Merge branch 'publisher-cc-fix' into 'develop'
Fix follow requests which get stuck pending See merge request pleroma/pleroma!4208
This commit is contained in:
commit
c0195895d2
6 changed files with 170 additions and 82 deletions
1
changelog.d/follow-request.fix
Normal file
1
changelog.d/follow-request.fix
Normal file
|
@ -0,0 +1 @@
|
|||
Fixed malformed follow requests that cause them to appear stuck pending due to the recipient being unable to process them.
|
|
@ -11,6 +11,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
|
|||
alias Pleroma.Object
|
||||
alias Pleroma.Repo
|
||||
alias Pleroma.User
|
||||
alias Pleroma.Web.ActivityPub.Publisher.Prepared
|
||||
alias Pleroma.Web.ActivityPub.Relay
|
||||
alias Pleroma.Web.ActivityPub.Transmogrifier
|
||||
alias Pleroma.Workers.PublisherWorker
|
||||
|
@ -76,14 +77,13 @@ def representable?(%Activity{} = activity) do
|
|||
end
|
||||
|
||||
@doc """
|
||||
Publish a single message to a peer. Takes a struct with the following
|
||||
parameters set:
|
||||
|
||||
Prepare an activity for publishing from an Oban job
|
||||
* `inbox`: the inbox to publish to
|
||||
* `activity_id`: the internal activity id
|
||||
* `cc`: the cc recipients relevant to this inbox (optional)
|
||||
"""
|
||||
def publish_one(%{inbox: inbox, activity_id: activity_id} = params) do
|
||||
@spec prepare_one(map()) :: Prepared.t()
|
||||
def prepare_one(%{inbox: inbox, activity_id: activity_id} = params) do
|
||||
activity = Activity.get_by_id_with_user_actor(activity_id)
|
||||
actor = activity.user_actor
|
||||
|
||||
|
@ -93,7 +93,7 @@ def publish_one(%{inbox: inbox, activity_id: activity_id} = params) do
|
|||
|
||||
{:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
|
||||
|
||||
cc = Map.get(params, :cc)
|
||||
cc = Map.get(params, :cc, [])
|
||||
|
||||
json =
|
||||
data
|
||||
|
@ -113,27 +113,49 @@ def publish_one(%{inbox: inbox, activity_id: activity_id} = params) do
|
|||
date: date
|
||||
})
|
||||
|
||||
%Prepared{
|
||||
activity_id: activity_id,
|
||||
json: json,
|
||||
date: date,
|
||||
signature: signature,
|
||||
digest: digest,
|
||||
inbox: inbox,
|
||||
unreachable_since: params[:unreachable_since]
|
||||
}
|
||||
end
|
||||
|
||||
@doc """
|
||||
Publish a single message to a peer. Takes a struct with the following
|
||||
parameters set:
|
||||
* `activity_id`: the activity id
|
||||
* `json`: the json payload
|
||||
* `date`: the signed date from Pleroma.Signature.signed_date()
|
||||
* `signature`: the signature from Pleroma.Signature.sign/2
|
||||
* `digest`: base64 encoded the hash of the json payload prefixed with "SHA-256="
|
||||
* `inbox`: the inbox URI of this delivery
|
||||
* `unreachable_since`: timestamp the instance was marked unreachable
|
||||
|
||||
"""
|
||||
def publish_one(%Prepared{} = p) do
|
||||
with {:ok, %{status: code}} = result when code in 200..299 <-
|
||||
HTTP.post(
|
||||
inbox,
|
||||
json,
|
||||
p.inbox,
|
||||
p.json,
|
||||
[
|
||||
{"Content-Type", "application/activity+json"},
|
||||
{"Date", date},
|
||||
{"signature", signature},
|
||||
{"digest", digest}
|
||||
{"Date", p.date},
|
||||
{"signature", p.signature},
|
||||
{"digest", p.digest}
|
||||
]
|
||||
) do
|
||||
if not Map.has_key?(params, :unreachable_since) || params[:unreachable_since] do
|
||||
Instances.set_reachable(inbox)
|
||||
end
|
||||
maybe_set_reachable(p.unreachable_since, p.inbox)
|
||||
|
||||
result
|
||||
else
|
||||
{_post_result, %{status: code} = response} = e ->
|
||||
unless params[:unreachable_since], do: Instances.set_unreachable(inbox)
|
||||
Logger.metadata(activity: activity_id, inbox: inbox, status: code)
|
||||
Logger.error("Publisher failed to inbox #{inbox} with status #{code}")
|
||||
maybe_set_unreachable(p.unreachable_since, p.inbox)
|
||||
Logger.metadata(activity: p.activity_id, inbox: p.inbox, status: code)
|
||||
Logger.error("Publisher failed to inbox #{p.inbox} with status #{code}")
|
||||
|
||||
case response do
|
||||
%{status: 400} -> {:cancel, :bad_request}
|
||||
|
@ -152,15 +174,21 @@ def publish_one(%{inbox: inbox, activity_id: activity_id} = params) do
|
|||
connection_pool_snooze()
|
||||
|
||||
e ->
|
||||
unless params[:unreachable_since], do: Instances.set_unreachable(inbox)
|
||||
Logger.metadata(activity: activity_id, inbox: inbox)
|
||||
Logger.error("Publisher failed to inbox #{inbox} #{inspect(e)}")
|
||||
maybe_set_unreachable(p.unreachable_since, p.inbox)
|
||||
Logger.metadata(activity: p.activity_id, inbox: p.inbox)
|
||||
Logger.error("Publisher failed to inbox #{p.inbox} #{inspect(e)}")
|
||||
{:error, e}
|
||||
end
|
||||
end
|
||||
|
||||
defp connection_pool_snooze, do: {:snooze, 3}
|
||||
|
||||
defp maybe_set_reachable(%NaiveDateTime{}, inbox), do: Instances.set_reachable(inbox)
|
||||
defp maybe_set_reachable(_, _), do: :ok
|
||||
|
||||
defp maybe_set_unreachable(nil, inbox), do: Instances.set_unreachable(inbox)
|
||||
defp maybe_set_unreachable(%NaiveDateTime{}, _), do: :ok
|
||||
|
||||
defp signature_host(%URI{port: port, scheme: scheme, host: host}) do
|
||||
if port == URI.default_port(scheme) do
|
||||
host
|
||||
|
|
8
lib/pleroma/web/activity_pub/publisher/prepared.ex
Normal file
8
lib/pleroma/web/activity_pub/publisher/prepared.ex
Normal file
|
@ -0,0 +1,8 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Web.ActivityPub.Publisher.Prepared do
|
||||
@type t :: %__MODULE__{}
|
||||
defstruct [:activity_id, :json, :date, :signature, :digest, :inbox, :unreachable_since]
|
||||
end
|
|
@ -71,7 +71,10 @@ defp publish_priority(_), do: 0
|
|||
# Job Worker Callbacks
|
||||
|
||||
@spec perform(atom(), any()) :: {:ok, any()} | {:error, any()}
|
||||
def perform(:publish_one, params), do: Publisher.publish_one(params)
|
||||
def perform(:publish_one, params) do
|
||||
Publisher.prepare_one(params)
|
||||
|> Publisher.publish_one()
|
||||
end
|
||||
|
||||
def perform(:publish, activity) do
|
||||
Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end)
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Web.ActivityPub.PublisherTest do
|
||||
use Oban.Testing, repo: Pleroma.Repo
|
||||
use Pleroma.Web.ConnCase
|
||||
|
||||
import ExUnit.CaptureLog
|
||||
|
@ -13,6 +14,7 @@ defmodule Pleroma.Web.ActivityPub.PublisherTest do
|
|||
alias Pleroma.Activity
|
||||
alias Pleroma.Instances
|
||||
alias Pleroma.Object
|
||||
alias Pleroma.Tests.ObanHelpers
|
||||
alias Pleroma.Web.ActivityPub.Publisher
|
||||
alias Pleroma.Web.CommonAPI
|
||||
|
||||
|
@ -150,32 +152,20 @@ test "publish to url with with different ports" do
|
|||
_actor = insert(:user)
|
||||
|
||||
assert {:ok, %{body: "port 42"}} =
|
||||
Publisher.publish_one(%{
|
||||
Publisher.prepare_one(%{
|
||||
inbox: inbox42,
|
||||
activity_id: activity.id,
|
||||
unreachable_since: true
|
||||
})
|
||||
|> Publisher.publish_one()
|
||||
|
||||
assert {:ok, %{body: "port 80"}} =
|
||||
Publisher.publish_one(%{
|
||||
Publisher.prepare_one(%{
|
||||
inbox: inbox80,
|
||||
activity_id: activity.id,
|
||||
unreachable_since: true
|
||||
})
|
||||
end
|
||||
|
||||
test_with_mock "calls `Instances.set_reachable` on successful federation if `unreachable_since` is not specified",
|
||||
Instances,
|
||||
[:passthrough],
|
||||
[] do
|
||||
_actor = insert(:user)
|
||||
inbox = "http://200.site/users/nick1/inbox"
|
||||
activity = insert(:note_activity)
|
||||
|
||||
assert {:ok, _} =
|
||||
Publisher.publish_one(%{inbox: inbox, activity_id: activity.id})
|
||||
|
||||
assert called(Instances.set_reachable(inbox))
|
||||
|> Publisher.publish_one()
|
||||
end
|
||||
|
||||
test_with_mock "calls `Instances.set_reachable` on successful federation if `unreachable_since` is set",
|
||||
|
@ -187,11 +177,12 @@ test "publish to url with with different ports" do
|
|||
activity = insert(:note_activity)
|
||||
|
||||
assert {:ok, _} =
|
||||
Publisher.publish_one(%{
|
||||
Publisher.prepare_one(%{
|
||||
inbox: inbox,
|
||||
activity_id: activity.id,
|
||||
unreachable_since: NaiveDateTime.utc_now()
|
||||
})
|
||||
|> Publisher.publish_one()
|
||||
|
||||
assert called(Instances.set_reachable(inbox))
|
||||
end
|
||||
|
@ -205,11 +196,12 @@ test "publish to url with with different ports" do
|
|||
activity = insert(:note_activity)
|
||||
|
||||
assert {:ok, _} =
|
||||
Publisher.publish_one(%{
|
||||
Publisher.prepare_one(%{
|
||||
inbox: inbox,
|
||||
activity_id: activity.id,
|
||||
unreachable_since: nil
|
||||
})
|
||||
|> Publisher.publish_one()
|
||||
|
||||
refute called(Instances.set_reachable(inbox))
|
||||
end
|
||||
|
@ -223,7 +215,8 @@ test "publish to url with with different ports" do
|
|||
activity = insert(:note_activity)
|
||||
|
||||
assert {:cancel, _} =
|
||||
Publisher.publish_one(%{inbox: inbox, activity_id: activity.id})
|
||||
Publisher.prepare_one(%{inbox: inbox, activity_id: activity.id})
|
||||
|> Publisher.publish_one()
|
||||
|
||||
assert called(Instances.set_unreachable(inbox))
|
||||
end
|
||||
|
@ -238,10 +231,11 @@ test "publish to url with with different ports" do
|
|||
|
||||
assert capture_log(fn ->
|
||||
assert {:error, _} =
|
||||
Publisher.publish_one(%{
|
||||
Publisher.prepare_one(%{
|
||||
inbox: inbox,
|
||||
activity_id: activity.id
|
||||
})
|
||||
|> Publisher.publish_one()
|
||||
end) =~ "connrefused"
|
||||
|
||||
assert called(Instances.set_unreachable(inbox))
|
||||
|
@ -256,7 +250,8 @@ test "publish to url with with different ports" do
|
|||
activity = insert(:note_activity)
|
||||
|
||||
assert {:ok, _} =
|
||||
Publisher.publish_one(%{inbox: inbox, activity_id: activity.id})
|
||||
Publisher.prepare_one(%{inbox: inbox, activity_id: activity.id})
|
||||
|> Publisher.publish_one()
|
||||
|
||||
refute called(Instances.set_unreachable(inbox))
|
||||
end
|
||||
|
@ -271,11 +266,12 @@ test "publish to url with with different ports" do
|
|||
|
||||
assert capture_log(fn ->
|
||||
assert {:error, _} =
|
||||
Publisher.publish_one(%{
|
||||
Publisher.prepare_one(%{
|
||||
inbox: inbox,
|
||||
activity_id: activity.id,
|
||||
unreachable_since: NaiveDateTime.utc_now()
|
||||
})
|
||||
|> Publisher.publish_one()
|
||||
end) =~ "connrefused"
|
||||
|
||||
refute called(Instances.set_unreachable(inbox))
|
||||
|
@ -310,12 +306,15 @@ test "publish to url with with different ports" do
|
|||
|
||||
assert res == :ok
|
||||
|
||||
assert not called(
|
||||
Publisher.enqueue_one(%{
|
||||
inbox: "https://domain.com/users/nick1/inbox",
|
||||
activity_id: note_activity.id
|
||||
})
|
||||
)
|
||||
refute_enqueued(
|
||||
worker: "Pleroma.Workers.PublisherWorker",
|
||||
args: %{
|
||||
"params" => %{
|
||||
inbox: "https://domain.com/users/nick1/inbox",
|
||||
activity_id: note_activity.id
|
||||
}
|
||||
}
|
||||
)
|
||||
end
|
||||
|
||||
test_with_mock "Publishes a non-public activity to non-quarantined instances.",
|
||||
|
@ -345,15 +344,16 @@ test "publish to url with with different ports" do
|
|||
|
||||
assert res == :ok
|
||||
|
||||
assert called(
|
||||
Publisher.enqueue_one(
|
||||
%{
|
||||
inbox: "https://domain.com/users/nick1/inbox",
|
||||
activity_id: note_activity.id
|
||||
},
|
||||
priority: 1
|
||||
)
|
||||
)
|
||||
assert_enqueued(
|
||||
worker: "Pleroma.Workers.PublisherWorker",
|
||||
args: %{
|
||||
"params" => %{
|
||||
inbox: "https://domain.com/users/nick1/inbox",
|
||||
activity_id: note_activity.id
|
||||
}
|
||||
},
|
||||
priority: 1
|
||||
)
|
||||
end
|
||||
|
||||
test_with_mock "Publishes to directly addressed actors with higher priority.",
|
||||
|
@ -403,12 +403,15 @@ test "publish to url with with different ports" do
|
|||
res = Publisher.publish(actor, note_activity)
|
||||
assert res == :ok
|
||||
|
||||
assert called(
|
||||
Publisher.enqueue_one(%{
|
||||
inbox: "https://domain.com/users/nick1/inbox",
|
||||
activity_id: note_activity.id
|
||||
})
|
||||
)
|
||||
assert_enqueued(
|
||||
worker: "Pleroma.Workers.PublisherWorker",
|
||||
args: %{
|
||||
"params" => %{
|
||||
inbox: "https://domain.com/users/nick1/inbox",
|
||||
activity_id: note_activity.id
|
||||
}
|
||||
}
|
||||
)
|
||||
end
|
||||
|
||||
test_with_mock "publishes a delete activity to peers who signed fetch requests to the create acitvity/object.",
|
||||
|
@ -452,25 +455,69 @@ test "publish to url with with different ports" do
|
|||
res = Publisher.publish(actor, delete)
|
||||
assert res == :ok
|
||||
|
||||
assert called(
|
||||
Publisher.enqueue_one(
|
||||
%{
|
||||
inbox: "https://domain.com/users/nick1/inbox",
|
||||
activity_id: delete.id
|
||||
},
|
||||
priority: 1
|
||||
)
|
||||
)
|
||||
assert_enqueued(
|
||||
worker: "Pleroma.Workers.PublisherWorker",
|
||||
args: %{
|
||||
"params" => %{
|
||||
inbox: "https://domain.com/users/nick1/inbox",
|
||||
activity_id: delete.id
|
||||
}
|
||||
},
|
||||
priority: 1
|
||||
)
|
||||
|
||||
assert called(
|
||||
Publisher.enqueue_one(
|
||||
%{
|
||||
inbox: "https://domain2.com/users/nick1/inbox",
|
||||
activity_id: delete.id
|
||||
},
|
||||
priority: 1
|
||||
)
|
||||
)
|
||||
assert_enqueued(
|
||||
worker: "Pleroma.Workers.PublisherWorker",
|
||||
args: %{
|
||||
"params" => %{
|
||||
inbox: "https://domain2.com/users/nick1/inbox",
|
||||
activity_id: delete.id
|
||||
}
|
||||
},
|
||||
priority: 1
|
||||
)
|
||||
end
|
||||
end
|
||||
|
||||
test "cc in prepared json for a follow request is an empty list" do
|
||||
user = insert(:user)
|
||||
remote_user = insert(:user, local: false)
|
||||
|
||||
{:ok, _, _, activity} = CommonAPI.follow(remote_user, user)
|
||||
|
||||
assert_enqueued(
|
||||
worker: "Pleroma.Workers.PublisherWorker",
|
||||
args: %{
|
||||
"activity_id" => activity.id,
|
||||
"op" => "publish"
|
||||
}
|
||||
)
|
||||
|
||||
ObanHelpers.perform_all()
|
||||
|
||||
expected_params =
|
||||
%{
|
||||
"activity_id" => activity.id,
|
||||
"inbox" => remote_user.inbox,
|
||||
"unreachable_since" => nil
|
||||
}
|
||||
|
||||
assert_enqueued(
|
||||
worker: "Pleroma.Workers.PublisherWorker",
|
||||
args: %{
|
||||
"op" => "publish_one",
|
||||
"params" => expected_params
|
||||
}
|
||||
)
|
||||
|
||||
# params need to be atom keys for Publisher.prepare_one.
|
||||
# this is done in the Oban job.
|
||||
expected_params = Map.new(expected_params, fn {k, v} -> {String.to_atom(k), v} end)
|
||||
|
||||
%{json: json} = Publisher.prepare_one(expected_params)
|
||||
|
||||
{:ok, decoded} = Jason.decode(json)
|
||||
|
||||
assert decoded["cc"] == []
|
||||
end
|
||||
end
|
||||
|
|
|
@ -63,7 +63,8 @@ def user_factory(attrs \\ %{}) do
|
|||
ap_id: ap_id,
|
||||
follower_address: ap_id <> "/followers",
|
||||
following_address: ap_id <> "/following",
|
||||
featured_address: ap_id <> "/collections/featured"
|
||||
featured_address: ap_id <> "/collections/featured",
|
||||
inbox: "https://#{base_domain}/inbox"
|
||||
}
|
||||
else
|
||||
%{
|
||||
|
|
Loading…
Reference in a new issue