From f8bdcaa161575e40097a82481009620edc5a0696 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Tue, 6 Aug 2024 11:15:35 -0400 Subject: [PATCH] Split Federator.publish_one/1 into a second function called prepare_one/1 --- lib/pleroma/web/activity_pub/publisher.ex | 52 +++++++++++++++---- lib/pleroma/web/federator.ex | 5 +- .../web/activity_pub/publisher_test.exs | 27 ++++++---- 3 files changed, 65 insertions(+), 19 deletions(-) diff --git a/lib/pleroma/web/activity_pub/publisher.ex b/lib/pleroma/web/activity_pub/publisher.ex index e63b8ff1f6..2d2c09f1c8 100644 --- a/lib/pleroma/web/activity_pub/publisher.ex +++ b/lib/pleroma/web/activity_pub/publisher.ex @@ -76,14 +76,12 @@ def representable?(%Activity{} = activity) do end @doc """ - Publish a single message to a peer. Takes a struct with the following - parameters set: - + Prepare an activity for publishing from an Oban job * `inbox`: the inbox to publish to * `activity_id`: the internal activity id * `cc`: the cc recipients relevant to this inbox (optional) """ - def publish_one(%{inbox: inbox, activity_id: activity_id} = params) do + def prepare_one(%{inbox: inbox, activity_id: activity_id} = params) do activity = Activity.get_by_id_with_user_actor(activity_id) actor = activity.user_actor @@ -113,6 +111,38 @@ def publish_one(%{inbox: inbox, activity_id: activity_id} = params) do date: date }) + %{ + activity_id: activity_id, + json: json, + date: date, + signature: signature, + digest: digest, + inbox: inbox, + unreachable_since: params[:unreachable_since] + } + end + + @doc """ + Publish a single message to a peer. Takes a struct with the following + parameters set: + * `activity_id`: the activity id + * `json`: the json payload + * `date`: the signed date from Pleroma.Signature.signed_date() + * `signature`: the signature from Pleroma.Signature.sign/2 + * `digest`: base64 encoded the hash of the json payload prefixed with "SHA-256=" + * `inbox`: the inbox URI of this delivery + * `unreachable_since`: timestamp the instance was marked unreachable + + """ + def publish_one(%{ + activity_id: activity_id, + json: json, + date: date, + signature: signature, + digest: digest, + inbox: inbox, + unreachable_since: unreachable_since + }) do with {:ok, %{status: code}} = result when code in 200..299 <- HTTP.post( inbox, @@ -124,14 +154,12 @@ def publish_one(%{inbox: inbox, activity_id: activity_id} = params) do {"digest", digest} ] ) do - if not Map.has_key?(params, :unreachable_since) || params[:unreachable_since] do - Instances.set_reachable(inbox) - end + maybe_set_reachable(unreachable_since, inbox) result else {_post_result, %{status: code} = response} = e -> - unless params[:unreachable_since], do: Instances.set_unreachable(inbox) + maybe_set_unreachable(unreachable_since, inbox) Logger.metadata(activity: activity_id, inbox: inbox, status: code) Logger.error("Publisher failed to inbox #{inbox} with status #{code}") @@ -152,7 +180,7 @@ def publish_one(%{inbox: inbox, activity_id: activity_id} = params) do connection_pool_snooze() e -> - unless params[:unreachable_since], do: Instances.set_unreachable(inbox) + maybe_set_unreachable(unreachable_since, inbox) Logger.metadata(activity: activity_id, inbox: inbox) Logger.error("Publisher failed to inbox #{inbox} #{inspect(e)}") {:error, e} @@ -161,6 +189,12 @@ def publish_one(%{inbox: inbox, activity_id: activity_id} = params) do defp connection_pool_snooze, do: {:snooze, 3} + defp maybe_set_reachable(%NaiveDateTime{}, inbox), do: Instances.set_reachable(inbox) + defp maybe_set_reachable(_, _), do: :ok + + defp maybe_set_unreachable(nil, inbox), do: Instances.set_unreachable(inbox) + defp maybe_set_unreachable(%NaiveDateTime{}, _), do: :ok + defp signature_host(%URI{port: port, scheme: scheme, host: host}) do if port == URI.default_port(scheme) do host diff --git a/lib/pleroma/web/federator.ex b/lib/pleroma/web/federator.ex index 3d3101d61e..242cf4bfd4 100644 --- a/lib/pleroma/web/federator.ex +++ b/lib/pleroma/web/federator.ex @@ -71,7 +71,10 @@ defp publish_priority(_), do: 0 # Job Worker Callbacks @spec perform(atom(), any()) :: {:ok, any()} | {:error, any()} - def perform(:publish_one, params), do: Publisher.publish_one(params) + def perform(:publish_one, params) do + Publisher.prepare_one(params) + |> Publisher.publish_one() + end def perform(:publish, activity) do Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end) diff --git a/test/pleroma/web/activity_pub/publisher_test.exs b/test/pleroma/web/activity_pub/publisher_test.exs index 1992fb2283..370b4b4d28 100644 --- a/test/pleroma/web/activity_pub/publisher_test.exs +++ b/test/pleroma/web/activity_pub/publisher_test.exs @@ -151,18 +151,20 @@ test "publish to url with with different ports" do _actor = insert(:user) assert {:ok, %{body: "port 42"}} = - Publisher.publish_one(%{ + Publisher.prepare_one(%{ inbox: inbox42, activity_id: activity.id, unreachable_since: true }) + |> Publisher.publish_one() assert {:ok, %{body: "port 80"}} = - Publisher.publish_one(%{ + Publisher.prepare_one(%{ inbox: inbox80, activity_id: activity.id, unreachable_since: true }) + |> Publisher.publish_one() end test_with_mock "calls `Instances.set_reachable` on successful federation if `unreachable_since` is not specified", @@ -174,7 +176,8 @@ test "publish to url with with different ports" do activity = insert(:note_activity) assert {:ok, _} = - Publisher.publish_one(%{inbox: inbox, activity_id: activity.id}) + Publisher.prepare_one(%{inbox: inbox, activity_id: activity.id}) + |> Publisher.publish_one() assert called(Instances.set_reachable(inbox)) end @@ -188,11 +191,12 @@ test "publish to url with with different ports" do activity = insert(:note_activity) assert {:ok, _} = - Publisher.publish_one(%{ + Publisher.prepare_one(%{ inbox: inbox, activity_id: activity.id, unreachable_since: NaiveDateTime.utc_now() }) + |> Publisher.publish_one() assert called(Instances.set_reachable(inbox)) end @@ -206,11 +210,12 @@ test "publish to url with with different ports" do activity = insert(:note_activity) assert {:ok, _} = - Publisher.publish_one(%{ + Publisher.prepare_one(%{ inbox: inbox, activity_id: activity.id, unreachable_since: nil }) + |> Publisher.publish_one() refute called(Instances.set_reachable(inbox)) end @@ -224,7 +229,8 @@ test "publish to url with with different ports" do activity = insert(:note_activity) assert {:cancel, _} = - Publisher.publish_one(%{inbox: inbox, activity_id: activity.id}) + Publisher.prepare_one(%{inbox: inbox, activity_id: activity.id}) + |> Publisher.publish_one() assert called(Instances.set_unreachable(inbox)) end @@ -239,10 +245,11 @@ test "publish to url with with different ports" do assert capture_log(fn -> assert {:error, _} = - Publisher.publish_one(%{ + Publisher.prepare_one(%{ inbox: inbox, activity_id: activity.id }) + |> Publisher.publish_one() end) =~ "connrefused" assert called(Instances.set_unreachable(inbox)) @@ -257,7 +264,8 @@ test "publish to url with with different ports" do activity = insert(:note_activity) assert {:ok, _} = - Publisher.publish_one(%{inbox: inbox, activity_id: activity.id}) + Publisher.prepare_one(%{inbox: inbox, activity_id: activity.id}) + |> Publisher.publish_one() refute called(Instances.set_unreachable(inbox)) end @@ -272,11 +280,12 @@ test "publish to url with with different ports" do assert capture_log(fn -> assert {:error, _} = - Publisher.publish_one(%{ + Publisher.prepare_one(%{ inbox: inbox, activity_id: activity.id, unreachable_since: NaiveDateTime.utc_now() }) + |> Publisher.publish_one() end) =~ "connrefused" refute called(Instances.set_unreachable(inbox))