B ActivityPub.Publisher: Prioritize direct mentions
This commit is contained in:
parent
147b37b893
commit
3fbc80eb58
4 changed files with 114 additions and 60 deletions
|
@ -118,7 +118,7 @@ defp should_federate?(inbox, public) do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@spec recipients(User.t(), Activity.t()) :: list(User.t()) | []
|
@spec recipients(User.t(), Activity.t()) :: {[User.t()], [User.t()]}
|
||||||
defp recipients(actor, activity) do
|
defp recipients(actor, activity) do
|
||||||
followers =
|
followers =
|
||||||
if actor.follower_address in activity.recipients do
|
if actor.follower_address in activity.recipients do
|
||||||
|
@ -138,7 +138,10 @@ defp recipients(actor, activity) do
|
||||||
[]
|
[]
|
||||||
end
|
end
|
||||||
|
|
||||||
Pleroma.Web.Federator.Publisher.remote_users(actor, activity) ++ followers ++ fetchers
|
mentioned = Pleroma.Web.Federator.Publisher.remote_users(actor, activity)
|
||||||
|
non_mentioned = (followers ++ fetchers) -- mentioned
|
||||||
|
|
||||||
|
{mentioned, non_mentioned}
|
||||||
end
|
end
|
||||||
|
|
||||||
defp get_cc_ap_ids(ap_id, recipients) do
|
defp get_cc_ap_ids(ap_id, recipients) do
|
||||||
|
@ -195,34 +198,39 @@ def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity)
|
||||||
public = is_public?(activity)
|
public = is_public?(activity)
|
||||||
{:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
|
{:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
|
||||||
|
|
||||||
recipients = recipients(actor, activity)
|
{priority_recipients, recipients} = recipients(actor, activity)
|
||||||
|
|
||||||
inboxes =
|
inboxes =
|
||||||
recipients
|
[priority_recipients, recipients]
|
||||||
|> Enum.map(fn actor -> actor.inbox end)
|
|> Enum.map(fn recipients ->
|
||||||
|> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
|
recipients
|
||||||
|> Instances.filter_reachable()
|
|> Enum.map(fn actor -> actor.inbox end)
|
||||||
|
|> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
|
||||||
|
|> Instances.filter_reachable()
|
||||||
|
end)
|
||||||
|
|
||||||
Repo.checkout(fn ->
|
Repo.checkout(fn ->
|
||||||
Enum.each(inboxes, fn {inbox, unreachable_since} ->
|
Enum.each(inboxes, fn inboxes ->
|
||||||
%User{ap_id: ap_id} = Enum.find(recipients, fn actor -> actor.inbox == inbox end)
|
Enum.each(inboxes, fn {inbox, unreachable_since} ->
|
||||||
|
%User{ap_id: ap_id} = Enum.find(recipients, fn actor -> actor.inbox == inbox end)
|
||||||
|
|
||||||
# Get all the recipients on the same host and add them to cc. Otherwise, a remote
|
# Get all the recipients on the same host and add them to cc. Otherwise, a remote
|
||||||
# instance would only accept a first message for the first recipient and ignore the rest.
|
# instance would only accept a first message for the first recipient and ignore the rest.
|
||||||
cc = get_cc_ap_ids(ap_id, recipients)
|
cc = get_cc_ap_ids(ap_id, recipients)
|
||||||
|
|
||||||
json =
|
json =
|
||||||
data
|
data
|
||||||
|> Map.put("cc", cc)
|
|> Map.put("cc", cc)
|
||||||
|> Jason.encode!()
|
|> Jason.encode!()
|
||||||
|
|
||||||
Pleroma.Web.Federator.Publisher.enqueue_one(__MODULE__, %{
|
Pleroma.Web.Federator.Publisher.enqueue_one(__MODULE__, %{
|
||||||
inbox: inbox,
|
inbox: inbox,
|
||||||
json: json,
|
json: json,
|
||||||
actor_id: actor.id,
|
actor_id: actor.id,
|
||||||
id: activity.data["id"],
|
id: activity.data["id"],
|
||||||
unreachable_since: unreachable_since
|
unreachable_since: unreachable_since
|
||||||
})
|
})
|
||||||
|
end)
|
||||||
end)
|
end)
|
||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
@ -239,25 +247,33 @@ def publish(%User{} = actor, %Activity{} = activity) do
|
||||||
{:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
|
{:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
|
||||||
json = Jason.encode!(data)
|
json = Jason.encode!(data)
|
||||||
|
|
||||||
recipients(actor, activity)
|
{priority_recipients, recipients} = recipients(actor, activity)
|
||||||
|> Enum.map(fn %User{} = user ->
|
|
||||||
determine_inbox(activity, user)
|
[{priority_recipients, 0}, {recipients, 1}]
|
||||||
end)
|
|> Enum.map(fn {recipients, priority} ->
|
||||||
|> Enum.uniq()
|
recipients
|
||||||
|> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
|
|> Enum.map(fn %User{} = user ->
|
||||||
|> Instances.filter_reachable()
|
determine_inbox(activity, user)
|
||||||
|> Enum.each(fn {inbox, unreachable_since} ->
|
end)
|
||||||
Pleroma.Web.Federator.Publisher.enqueue_one(
|
|> Enum.uniq()
|
||||||
__MODULE__,
|
|> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
|
||||||
%{
|
|> Instances.filter_reachable()
|
||||||
inbox: inbox,
|
|> Enum.each(fn {inbox, unreachable_since} ->
|
||||||
json: json,
|
Pleroma.Web.Federator.Publisher.enqueue_one(
|
||||||
actor_id: actor.id,
|
__MODULE__,
|
||||||
id: activity.data["id"],
|
%{
|
||||||
unreachable_since: unreachable_since
|
inbox: inbox,
|
||||||
}
|
json: json,
|
||||||
)
|
actor_id: actor.id,
|
||||||
|
id: activity.data["id"],
|
||||||
|
unreachable_since: unreachable_since
|
||||||
|
},
|
||||||
|
priority: priority
|
||||||
|
)
|
||||||
|
end)
|
||||||
end)
|
end)
|
||||||
|
|
||||||
|
:ok
|
||||||
end
|
end
|
||||||
|
|
||||||
def gather_webfinger_links(%User{} = user) do
|
def gather_webfinger_links(%User{} = user) do
|
||||||
|
|
|
@ -29,11 +29,12 @@ defmodule Pleroma.Web.Federator.Publisher do
|
||||||
@doc """
|
@doc """
|
||||||
Enqueue publishing a single activity.
|
Enqueue publishing a single activity.
|
||||||
"""
|
"""
|
||||||
@spec enqueue_one(module(), Map.t()) :: :ok
|
@spec enqueue_one(module(), Map.t(), Keyword.t()) :: {:ok, %Oban.Job{}}
|
||||||
def enqueue_one(module, %{} = params) do
|
def enqueue_one(module, %{} = params, worker_args \\ []) do
|
||||||
PublisherWorker.enqueue(
|
PublisherWorker.enqueue(
|
||||||
"publish_one",
|
"publish_one",
|
||||||
%{"module" => to_string(module), "params" => params}
|
%{"module" => to_string(module), "params" => params},
|
||||||
|
worker_args
|
||||||
)
|
)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -331,11 +331,40 @@ test "publish to url with with different ports" do
|
||||||
assert res == :ok
|
assert res == :ok
|
||||||
|
|
||||||
assert called(
|
assert called(
|
||||||
Pleroma.Web.Federator.Publisher.enqueue_one(Publisher, %{
|
Pleroma.Web.Federator.Publisher.enqueue_one(
|
||||||
inbox: "https://domain.com/users/nick1/inbox",
|
Publisher,
|
||||||
actor_id: actor.id,
|
%{
|
||||||
id: note_activity.data["id"]
|
inbox: "https://domain.com/users/nick1/inbox",
|
||||||
})
|
actor_id: actor.id,
|
||||||
|
id: note_activity.data["id"]
|
||||||
|
},
|
||||||
|
priority: 1
|
||||||
|
)
|
||||||
|
)
|
||||||
|
end
|
||||||
|
|
||||||
|
test_with_mock "Publishes to directly addressed actors with higher priority.",
|
||||||
|
Pleroma.Web.Federator.Publisher,
|
||||||
|
[:passthrough],
|
||||||
|
[] do
|
||||||
|
note_activity = insert(:direct_note_activity)
|
||||||
|
|
||||||
|
actor = Pleroma.User.get_by_ap_id(note_activity.data["actor"])
|
||||||
|
|
||||||
|
res = Publisher.publish(actor, note_activity)
|
||||||
|
|
||||||
|
assert res == :ok
|
||||||
|
|
||||||
|
assert called(
|
||||||
|
Pleroma.Web.Federator.Publisher.enqueue_one(
|
||||||
|
Publisher,
|
||||||
|
%{
|
||||||
|
inbox: :_,
|
||||||
|
actor_id: actor.id,
|
||||||
|
id: note_activity.data["id"]
|
||||||
|
},
|
||||||
|
priority: 0
|
||||||
|
)
|
||||||
)
|
)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -414,19 +443,27 @@ test "publish to url with with different ports" do
|
||||||
assert res == :ok
|
assert res == :ok
|
||||||
|
|
||||||
assert called(
|
assert called(
|
||||||
Pleroma.Web.Federator.Publisher.enqueue_one(Publisher, %{
|
Pleroma.Web.Federator.Publisher.enqueue_one(
|
||||||
inbox: "https://domain.com/users/nick1/inbox",
|
Publisher,
|
||||||
actor_id: actor.id,
|
%{
|
||||||
id: delete.data["id"]
|
inbox: "https://domain.com/users/nick1/inbox",
|
||||||
})
|
actor_id: actor.id,
|
||||||
|
id: delete.data["id"]
|
||||||
|
},
|
||||||
|
priority: 1
|
||||||
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
assert called(
|
assert called(
|
||||||
Pleroma.Web.Federator.Publisher.enqueue_one(Publisher, %{
|
Pleroma.Web.Federator.Publisher.enqueue_one(
|
||||||
inbox: "https://domain2.com/users/nick1/inbox",
|
Publisher,
|
||||||
actor_id: actor.id,
|
%{
|
||||||
id: delete.data["id"]
|
inbox: "https://domain2.com/users/nick1/inbox",
|
||||||
})
|
actor_id: actor.id,
|
||||||
|
id: delete.data["id"]
|
||||||
|
},
|
||||||
|
priority: 1
|
||||||
|
)
|
||||||
)
|
)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -212,7 +212,7 @@ def listen_factory do
|
||||||
end
|
end
|
||||||
|
|
||||||
def direct_note_factory do
|
def direct_note_factory do
|
||||||
user2 = insert(:user)
|
user2 = insert(:user, local: false, inbox: "http://example.com/inbox")
|
||||||
|
|
||||||
%Pleroma.Object{data: data} = note_factory()
|
%Pleroma.Object{data: data} = note_factory()
|
||||||
%Pleroma.Object{data: Map.merge(data, %{"to" => [user2.ap_id]})}
|
%Pleroma.Object{data: Map.merge(data, %{"to" => [user2.ap_id]})}
|
||||||
|
|
Loading…
Reference in a new issue