diff --git a/lib/pleroma/web/activity_pub/publisher.ex b/lib/pleroma/web/activity_pub/publisher.ex index af6aa0781e..373ceef322 100644 --- a/lib/pleroma/web/activity_pub/publisher.ex +++ b/lib/pleroma/web/activity_pub/publisher.ex @@ -118,7 +118,7 @@ defp should_federate?(inbox, public) do end end - @spec recipients(User.t(), Activity.t()) :: list(User.t()) | [] + @spec recipients(User.t(), Activity.t()) :: {[User.t()], [User.t()]} defp recipients(actor, activity) do followers = if actor.follower_address in activity.recipients do @@ -138,7 +138,10 @@ defp recipients(actor, activity) do [] end - Pleroma.Web.Federator.Publisher.remote_users(actor, activity) ++ followers ++ fetchers + mentioned = Pleroma.Web.Federator.Publisher.remote_users(actor, activity) + non_mentioned = (followers ++ fetchers) -- mentioned + + {mentioned, non_mentioned} end defp get_cc_ap_ids(ap_id, recipients) do @@ -195,34 +198,39 @@ def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity) public = is_public?(activity) {:ok, data} = Transmogrifier.prepare_outgoing(activity.data) - recipients = recipients(actor, activity) + {priority_recipients, recipients} = recipients(actor, activity) inboxes = - recipients - |> Enum.map(fn actor -> actor.inbox end) - |> Enum.filter(fn inbox -> should_federate?(inbox, public) end) - |> Instances.filter_reachable() + [priority_recipients, recipients] + |> Enum.map(fn recipients -> + recipients + |> Enum.map(fn actor -> actor.inbox end) + |> Enum.filter(fn inbox -> should_federate?(inbox, public) end) + |> Instances.filter_reachable() + end) Repo.checkout(fn -> - Enum.each(inboxes, fn {inbox, unreachable_since} -> - %User{ap_id: ap_id} = Enum.find(recipients, fn actor -> actor.inbox == inbox end) + Enum.each(inboxes, fn inboxes -> + Enum.each(inboxes, fn {inbox, unreachable_since} -> + %User{ap_id: ap_id} = Enum.find(recipients, fn actor -> actor.inbox == inbox end) - # Get all the recipients on the same host and add them to cc. Otherwise, a remote - # instance would only accept a first message for the first recipient and ignore the rest. - cc = get_cc_ap_ids(ap_id, recipients) + # Get all the recipients on the same host and add them to cc. Otherwise, a remote + # instance would only accept a first message for the first recipient and ignore the rest. + cc = get_cc_ap_ids(ap_id, recipients) - json = - data - |> Map.put("cc", cc) - |> Jason.encode!() + json = + data + |> Map.put("cc", cc) + |> Jason.encode!() - Pleroma.Web.Federator.Publisher.enqueue_one(__MODULE__, %{ - inbox: inbox, - json: json, - actor_id: actor.id, - id: activity.data["id"], - unreachable_since: unreachable_since - }) + Pleroma.Web.Federator.Publisher.enqueue_one(__MODULE__, %{ + inbox: inbox, + json: json, + actor_id: actor.id, + id: activity.data["id"], + unreachable_since: unreachable_since + }) + end) end) end) end @@ -239,25 +247,33 @@ def publish(%User{} = actor, %Activity{} = activity) do {:ok, data} = Transmogrifier.prepare_outgoing(activity.data) json = Jason.encode!(data) - recipients(actor, activity) - |> Enum.map(fn %User{} = user -> - determine_inbox(activity, user) - end) - |> Enum.uniq() - |> Enum.filter(fn inbox -> should_federate?(inbox, public) end) - |> Instances.filter_reachable() - |> Enum.each(fn {inbox, unreachable_since} -> - Pleroma.Web.Federator.Publisher.enqueue_one( - __MODULE__, - %{ - inbox: inbox, - json: json, - actor_id: actor.id, - id: activity.data["id"], - unreachable_since: unreachable_since - } - ) + {priority_recipients, recipients} = recipients(actor, activity) + + [{priority_recipients, 0}, {recipients, 1}] + |> Enum.map(fn {recipients, priority} -> + recipients + |> Enum.map(fn %User{} = user -> + determine_inbox(activity, user) + end) + |> Enum.uniq() + |> Enum.filter(fn inbox -> should_federate?(inbox, public) end) + |> Instances.filter_reachable() + |> Enum.each(fn {inbox, unreachable_since} -> + Pleroma.Web.Federator.Publisher.enqueue_one( + __MODULE__, + %{ + inbox: inbox, + json: json, + actor_id: actor.id, + id: activity.data["id"], + unreachable_since: unreachable_since + }, + priority: priority + ) + end) end) + + :ok end def gather_webfinger_links(%User{} = user) do diff --git a/lib/pleroma/web/federator/publisher.ex b/lib/pleroma/web/federator/publisher.ex index a45796e9de..8c6547208b 100644 --- a/lib/pleroma/web/federator/publisher.ex +++ b/lib/pleroma/web/federator/publisher.ex @@ -29,11 +29,12 @@ defmodule Pleroma.Web.Federator.Publisher do @doc """ Enqueue publishing a single activity. """ - @spec enqueue_one(module(), Map.t()) :: :ok - def enqueue_one(module, %{} = params) do + @spec enqueue_one(module(), Map.t(), Keyword.t()) :: {:ok, %Oban.Job{}} + def enqueue_one(module, %{} = params, worker_args \\ []) do PublisherWorker.enqueue( "publish_one", - %{"module" => to_string(module), "params" => params} + %{"module" => to_string(module), "params" => params}, + worker_args ) end diff --git a/test/pleroma/web/activity_pub/publisher_test.exs b/test/pleroma/web/activity_pub/publisher_test.exs index c5137cbb75..9800144b50 100644 --- a/test/pleroma/web/activity_pub/publisher_test.exs +++ b/test/pleroma/web/activity_pub/publisher_test.exs @@ -331,11 +331,40 @@ test "publish to url with with different ports" do assert res == :ok assert called( - Pleroma.Web.Federator.Publisher.enqueue_one(Publisher, %{ - inbox: "https://domain.com/users/nick1/inbox", - actor_id: actor.id, - id: note_activity.data["id"] - }) + Pleroma.Web.Federator.Publisher.enqueue_one( + Publisher, + %{ + inbox: "https://domain.com/users/nick1/inbox", + actor_id: actor.id, + id: note_activity.data["id"] + }, + priority: 1 + ) + ) + end + + test_with_mock "Publishes to directly addressed actors with higher priority.", + Pleroma.Web.Federator.Publisher, + [:passthrough], + [] do + note_activity = insert(:direct_note_activity) + + actor = Pleroma.User.get_by_ap_id(note_activity.data["actor"]) + + res = Publisher.publish(actor, note_activity) + + assert res == :ok + + assert called( + Pleroma.Web.Federator.Publisher.enqueue_one( + Publisher, + %{ + inbox: :_, + actor_id: actor.id, + id: note_activity.data["id"] + }, + priority: 0 + ) ) end @@ -414,19 +443,27 @@ test "publish to url with with different ports" do assert res == :ok assert called( - Pleroma.Web.Federator.Publisher.enqueue_one(Publisher, %{ - inbox: "https://domain.com/users/nick1/inbox", - actor_id: actor.id, - id: delete.data["id"] - }) + Pleroma.Web.Federator.Publisher.enqueue_one( + Publisher, + %{ + inbox: "https://domain.com/users/nick1/inbox", + actor_id: actor.id, + id: delete.data["id"] + }, + priority: 1 + ) ) assert called( - Pleroma.Web.Federator.Publisher.enqueue_one(Publisher, %{ - inbox: "https://domain2.com/users/nick1/inbox", - actor_id: actor.id, - id: delete.data["id"] - }) + Pleroma.Web.Federator.Publisher.enqueue_one( + Publisher, + %{ + inbox: "https://domain2.com/users/nick1/inbox", + actor_id: actor.id, + id: delete.data["id"] + }, + priority: 1 + ) ) end end diff --git a/test/support/factory.ex b/test/support/factory.ex index d945447170..20bc5162e1 100644 --- a/test/support/factory.ex +++ b/test/support/factory.ex @@ -212,7 +212,7 @@ def listen_factory do end def direct_note_factory do - user2 = insert(:user) + user2 = insert(:user, local: false, inbox: "http://example.com/inbox") %Pleroma.Object{data: data} = note_factory() %Pleroma.Object{data: Map.merge(data, %{"to" => [user2.ap_id]})}