Merge branch 'optimistic-inbox' into 'develop'
Optimistic Inbox See merge request pleroma/pleroma!3989
This commit is contained in:
commit
e7974afd37
7 changed files with 54 additions and 14 deletions
1
changelog.d/optimistic-inbox.change
Normal file
1
changelog.d/optimistic-inbox.change
Normal file
|
@ -0,0 +1 @@
|
|||
Optimistic Inbox reduces the processing overhead of incoming activities without instantly verifiable signatures.
|
|
@ -2136,7 +2136,7 @@ def public_key(%{public_key: public_key_pem}) when is_binary(public_key_pem) do
|
|||
def public_key(_), do: {:error, "key not found"}
|
||||
|
||||
def get_public_key_for_ap_id(ap_id) do
|
||||
with {:ok, %User{} = user} <- get_or_fetch_by_ap_id(ap_id),
|
||||
with %User{} = user <- get_cached_by_ap_id(ap_id),
|
||||
{:ok, public_key} <- public_key(user) do
|
||||
{:ok, public_key}
|
||||
else
|
||||
|
|
|
@ -287,10 +287,9 @@ def inbox(%{assigns: %{valid_signature: true}} = conn, params) do
|
|||
json(conn, "ok")
|
||||
end
|
||||
|
||||
def inbox(%{assigns: %{valid_signature: false}} = conn, _params) do
|
||||
conn
|
||||
|> put_status(:bad_request)
|
||||
|> json("Invalid HTTP Signature")
|
||||
def inbox(%{assigns: %{valid_signature: false}, req_headers: req_headers} = conn, params) do
|
||||
Federator.incoming_ap_doc(%{req_headers: req_headers, params: params})
|
||||
json(conn, "ok")
|
||||
end
|
||||
|
||||
# POST /relay/inbox -or- POST /internal/fetch/inbox
|
||||
|
|
|
@ -35,6 +35,17 @@ def allowed_thread_distance?(distance) do
|
|||
end
|
||||
|
||||
# Client API
|
||||
def incoming_ap_doc(%{params: params, req_headers: req_headers}) do
|
||||
ReceiverWorker.enqueue(
|
||||
"incoming_ap_doc",
|
||||
%{"req_headers" => req_headers, "params" => params, "timeout" => :timer.seconds(20)},
|
||||
priority: 2
|
||||
)
|
||||
end
|
||||
|
||||
def incoming_ap_doc(%{"type" => "Delete"} = params) do
|
||||
ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params}, priority: 3)
|
||||
end
|
||||
|
||||
def incoming_ap_doc(params) do
|
||||
ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params})
|
||||
|
|
|
@ -3,24 +3,56 @@
|
|||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Workers.ReceiverWorker do
|
||||
alias Pleroma.Signature
|
||||
alias Pleroma.User
|
||||
alias Pleroma.Web.Federator
|
||||
|
||||
use Pleroma.Workers.WorkerHelper, queue: "federator_incoming"
|
||||
|
||||
@impl Oban.Worker
|
||||
|
||||
def perform(%Job{
|
||||
args: %{"op" => "incoming_ap_doc", "req_headers" => req_headers, "params" => params}
|
||||
}) do
|
||||
# Oban's serialization converts our tuple headers to lists.
|
||||
# Revert it for the signature validation.
|
||||
req_headers = Enum.into(req_headers, [], &List.to_tuple(&1))
|
||||
|
||||
conn_data = %{params: params, req_headers: req_headers}
|
||||
|
||||
with {:ok, %User{} = _actor} <- User.get_or_fetch_by_ap_id(conn_data.params["actor"]),
|
||||
{:ok, _public_key} <- Signature.refetch_public_key(conn_data),
|
||||
{:signature, true} <- {:signature, HTTPSignatures.validate_conn(conn_data)},
|
||||
{:ok, res} <- Federator.perform(:incoming_ap_doc, params) do
|
||||
{:ok, res}
|
||||
else
|
||||
e -> process_errors(e)
|
||||
end
|
||||
end
|
||||
|
||||
def perform(%Job{args: %{"op" => "incoming_ap_doc", "params" => params}}) do
|
||||
with {:ok, res} <- Federator.perform(:incoming_ap_doc, params) do
|
||||
{:ok, res}
|
||||
else
|
||||
e -> process_errors(e)
|
||||
end
|
||||
end
|
||||
|
||||
@impl Oban.Worker
|
||||
def timeout(%_{args: %{"timeout" => timeout}}), do: timeout
|
||||
|
||||
def timeout(_job), do: :timer.seconds(5)
|
||||
|
||||
defp process_errors(errors) do
|
||||
case errors do
|
||||
{:error, :origin_containment_failed} -> {:cancel, :origin_containment_failed}
|
||||
{:error, :already_present} -> {:cancel, :already_present}
|
||||
{:error, {:validate_object, reason}} -> {:cancel, reason}
|
||||
{:error, {:error, {:validate, reason}}} -> {:cancel, reason}
|
||||
{:error, {:reject, reason}} -> {:cancel, reason}
|
||||
{:signature, false} -> {:cancel, :invalid_signature}
|
||||
{:error, {:error, reason = "Object has been deleted"}} -> {:cancel, reason}
|
||||
e -> e
|
||||
end
|
||||
end
|
||||
|
||||
@impl Oban.Worker
|
||||
def timeout(_job), do: :timer.seconds(5)
|
||||
end
|
||||
|
|
|
@ -43,10 +43,7 @@ test "it returns key" do
|
|||
end
|
||||
|
||||
test "it returns error when not found user" do
|
||||
assert capture_log(fn ->
|
||||
assert Signature.fetch_public_key(make_fake_conn("https://test-ap-id")) ==
|
||||
{:error, :error}
|
||||
end) =~ "[error] Could not decode user"
|
||||
assert Signature.fetch_public_key(make_fake_conn("https://test-ap-id")) == {:error, :error}
|
||||
end
|
||||
|
||||
test "it returns error if public key is nil" do
|
||||
|
|
|
@ -1951,8 +1951,8 @@ test "unsuggests a user" do
|
|||
end
|
||||
end
|
||||
|
||||
test "get_public_key_for_ap_id fetches a user that's not in the db" do
|
||||
assert {:ok, _key} = User.get_public_key_for_ap_id("http://mastodon.example.org/users/admin")
|
||||
test "get_public_key_for_ap_id returns correctly for user that's not in the db" do
|
||||
assert :error = User.get_public_key_for_ap_id("http://mastodon.example.org/users/admin")
|
||||
end
|
||||
|
||||
describe "per-user rich-text filtering" do
|
||||
|
|
Loading…
Reference in a new issue