Optimistic Inbox

Rework inbound federation to accept requests optimistically. The HTTP Signatures Plug will not attempt to fetch the actor or key and will fail early.

If the signature cannot be validated we pass the required data into the Oban job with a reduced priority and increase the timeout to 20 seconds. The Oban job will handle the actor and key fetching before attempting to validate the activity again. This job will be retried 5 times by default.

Another welcome side effect is that actors who change their keys can federate to Pleroma instances immediately instead of needing to wait the default value of 86400s / 24 hours before the key will be fetched again.
This commit is contained in:
Mark Felder 2023-12-07 22:27:19 -05:00
parent 5f74aadaaf
commit 074b31d9ab
6 changed files with 58 additions and 11 deletions

View file

@ -58,7 +58,7 @@ def refetch_public_key(conn) do
with %{"keyId" => kid} <- HTTPSignatures.signature_for_conn(conn), with %{"keyId" => kid} <- HTTPSignatures.signature_for_conn(conn),
{:ok, actor_id} <- key_id_to_actor_id(kid), {:ok, actor_id} <- key_id_to_actor_id(kid),
{:ok, _user} <- ActivityPub.make_user_from_ap_id(actor_id), {:ok, _user} <- ActivityPub.make_user_from_ap_id(actor_id),
{:ok, public_key} <- User.get_public_key_for_ap_id(actor_id) do {:ok, public_key} <- User.get_or_fetch_public_key_for_ap_id(actor_id) do
{:ok, public_key} {:ok, public_key}
else else
e -> e ->

View file

@ -2135,7 +2135,7 @@ def public_key(%{public_key: public_key_pem}) when is_binary(public_key_pem) do
def public_key(_), do: {:error, "key not found"} def public_key(_), do: {:error, "key not found"}
def get_public_key_for_ap_id(ap_id) do def get_or_fetch_public_key_for_ap_id(ap_id) do
with {:ok, %User{} = user} <- get_or_fetch_by_ap_id(ap_id), with {:ok, %User{} = user} <- get_or_fetch_by_ap_id(ap_id),
{:ok, public_key} <- public_key(user) do {:ok, public_key} <- public_key(user) do
{:ok, public_key} {:ok, public_key}
@ -2144,6 +2144,15 @@ def get_public_key_for_ap_id(ap_id) do
end end
end end
def get_public_key_for_ap_id(ap_id) do
with {:ok, %User{} = user} <- get_cached_by_ap_id(ap_id),
{:ok, public_key} <- public_key(user) do
{:ok, public_key}
else
_ -> :error
end
end
@doc "Gets or fetch a user by uri or nickname." @doc "Gets or fetch a user by uri or nickname."
@spec get_or_fetch(String.t()) :: {:ok, User.t()} | {:error, String.t()} @spec get_or_fetch(String.t()) :: {:ok, User.t()} | {:error, String.t()}
def get_or_fetch("http://" <> _host = uri), do: get_or_fetch_by_ap_id(uri) def get_or_fetch("http://" <> _host = uri), do: get_or_fetch_by_ap_id(uri)

View file

@ -287,10 +287,9 @@ def inbox(%{assigns: %{valid_signature: true}} = conn, params) do
json(conn, "ok") json(conn, "ok")
end end
def inbox(%{assigns: %{valid_signature: false}} = conn, _params) do def inbox(%{assigns: %{valid_signature: false}, req_headers: req_headers} = conn, params) do
conn Federator.incoming_ap_doc(%{req_headers: req_headers, params: params})
|> put_status(:bad_request) json(conn, "ok")
|> json("Invalid HTTP Signature")
end end
# POST /relay/inbox -or- POST /internal/fetch/inbox # POST /relay/inbox -or- POST /internal/fetch/inbox

View file

@ -35,6 +35,13 @@ def allowed_thread_distance?(distance) do
end end
# Client API # Client API
def incoming_ap_doc(%{params: params, req_headers: req_headers}) do
ReceiverWorker.enqueue(
"incoming_ap_doc",
%{"req_headers" => req_headers, "params" => params, "timeout" => :timer.seconds(20)},
priority: 5
)
end
def incoming_ap_doc(params) do def incoming_ap_doc(params) do
ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params}) ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params})

View file

@ -3,24 +3,51 @@
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.ReceiverWorker do defmodule Pleroma.Workers.ReceiverWorker do
alias Pleroma.Signature
alias Pleroma.User
alias Pleroma.Web.Federator alias Pleroma.Web.Federator
use Pleroma.Workers.WorkerHelper, queue: "federator_incoming" use Pleroma.Workers.WorkerHelper, queue: "federator_incoming"
@impl Oban.Worker @impl Oban.Worker
def perform(%Job{
args: %{"op" => "incoming_ap_doc", "req_headers" => req_headers, "params" => params}
}) do
conn_data = %{params: params, req_headers: req_headers}
with {:ok, %User{} = _actor} <- User.get_or_fetch_by_ap_id(conn_data.params["actor"]),
{:ok, _public_key} <- Signature.refetch_public_key(conn_data),
{:signature, true} <- {:signature, HTTPSignatures.validate_conn(conn_data)},
{:ok, res} <- Federator.perform(:incoming_ap_doc, params) do
{:ok, res}
else
e -> process_errors(e)
end
end
def perform(%Job{args: %{"op" => "incoming_ap_doc", "params" => params}}) do def perform(%Job{args: %{"op" => "incoming_ap_doc", "params" => params}}) do
with {:ok, res} <- Federator.perform(:incoming_ap_doc, params) do with {:ok, res} <- Federator.perform(:incoming_ap_doc, params) do
{:ok, res} {:ok, res}
else else
e -> process_errors(e)
end
end
@impl Oban.Worker
def timeout(%_{args: %{"timeout" => timeout}}), do: timeout
def timeout(_job), do: :timer.seconds(5)
defp process_errors(errors) do
case errors do
{:error, :origin_containment_failed} -> {:cancel, :origin_containment_failed} {:error, :origin_containment_failed} -> {:cancel, :origin_containment_failed}
{:error, :already_present} -> {:cancel, :already_present} {:error, :already_present} -> {:cancel, :already_present}
{:error, {:validate_object, reason}} -> {:cancel, reason} {:error, {:validate_object, reason}} -> {:cancel, reason}
{:error, {:error, {:validate, reason}}} -> {:cancel, reason} {:error, {:error, {:validate, reason}}} -> {:cancel, reason}
{:error, {:reject, reason}} -> {:cancel, reason} {:error, {:reject, reason}} -> {:cancel, reason}
{:signature, false} -> {:error, :invalid_signature}
e -> e e -> e
end end
end end
@impl Oban.Worker
def timeout(_job), do: :timer.seconds(5)
end end

View file

@ -1951,8 +1951,13 @@ test "unsuggests a user" do
end end
end end
test "get_public_key_for_ap_id fetches a user that's not in the db" do test "get_or_fetch_public_key_for_ap_id fetches a user that's not in the db" do
assert {:ok, _key} = User.get_public_key_for_ap_id("http://mastodon.example.org/users/admin") assert {:ok, _key} =
User.get_or_fetch_public_key_for_ap_id("http://mastodon.example.org/users/admin")
end
test "get_public_key_for_ap_id returns correctly for user that's not in the db" do
assert :error = User.get_public_key_for_ap_id("http://mastodon.example.org/users/admin")
end end
describe "per-user rich-text filtering" do describe "per-user rich-text filtering" do