Split Federator.publish_one/1 into a second function called prepare_one/1
This commit is contained in:
parent
16ba2742b7
commit
f8bdcaa161
3 changed files with 65 additions and 19 deletions
|
@ -76,14 +76,12 @@ def representable?(%Activity{} = activity) do
|
||||||
end
|
end
|
||||||
|
|
||||||
@doc """
|
@doc """
|
||||||
Publish a single message to a peer. Takes a struct with the following
|
Prepare an activity for publishing from an Oban job
|
||||||
parameters set:
|
|
||||||
|
|
||||||
* `inbox`: the inbox to publish to
|
* `inbox`: the inbox to publish to
|
||||||
* `activity_id`: the internal activity id
|
* `activity_id`: the internal activity id
|
||||||
* `cc`: the cc recipients relevant to this inbox (optional)
|
* `cc`: the cc recipients relevant to this inbox (optional)
|
||||||
"""
|
"""
|
||||||
def publish_one(%{inbox: inbox, activity_id: activity_id} = params) do
|
def prepare_one(%{inbox: inbox, activity_id: activity_id} = params) do
|
||||||
activity = Activity.get_by_id_with_user_actor(activity_id)
|
activity = Activity.get_by_id_with_user_actor(activity_id)
|
||||||
actor = activity.user_actor
|
actor = activity.user_actor
|
||||||
|
|
||||||
|
@ -113,6 +111,38 @@ def publish_one(%{inbox: inbox, activity_id: activity_id} = params) do
|
||||||
date: date
|
date: date
|
||||||
})
|
})
|
||||||
|
|
||||||
|
%{
|
||||||
|
activity_id: activity_id,
|
||||||
|
json: json,
|
||||||
|
date: date,
|
||||||
|
signature: signature,
|
||||||
|
digest: digest,
|
||||||
|
inbox: inbox,
|
||||||
|
unreachable_since: params[:unreachable_since]
|
||||||
|
}
|
||||||
|
end
|
||||||
|
|
||||||
|
@doc """
|
||||||
|
Publish a single message to a peer. Takes a struct with the following
|
||||||
|
parameters set:
|
||||||
|
* `activity_id`: the activity id
|
||||||
|
* `json`: the json payload
|
||||||
|
* `date`: the signed date from Pleroma.Signature.signed_date()
|
||||||
|
* `signature`: the signature from Pleroma.Signature.sign/2
|
||||||
|
* `digest`: base64 encoded the hash of the json payload prefixed with "SHA-256="
|
||||||
|
* `inbox`: the inbox URI of this delivery
|
||||||
|
* `unreachable_since`: timestamp the instance was marked unreachable
|
||||||
|
|
||||||
|
"""
|
||||||
|
def publish_one(%{
|
||||||
|
activity_id: activity_id,
|
||||||
|
json: json,
|
||||||
|
date: date,
|
||||||
|
signature: signature,
|
||||||
|
digest: digest,
|
||||||
|
inbox: inbox,
|
||||||
|
unreachable_since: unreachable_since
|
||||||
|
}) do
|
||||||
with {:ok, %{status: code}} = result when code in 200..299 <-
|
with {:ok, %{status: code}} = result when code in 200..299 <-
|
||||||
HTTP.post(
|
HTTP.post(
|
||||||
inbox,
|
inbox,
|
||||||
|
@ -124,14 +154,12 @@ def publish_one(%{inbox: inbox, activity_id: activity_id} = params) do
|
||||||
{"digest", digest}
|
{"digest", digest}
|
||||||
]
|
]
|
||||||
) do
|
) do
|
||||||
if not Map.has_key?(params, :unreachable_since) || params[:unreachable_since] do
|
maybe_set_reachable(unreachable_since, inbox)
|
||||||
Instances.set_reachable(inbox)
|
|
||||||
end
|
|
||||||
|
|
||||||
result
|
result
|
||||||
else
|
else
|
||||||
{_post_result, %{status: code} = response} = e ->
|
{_post_result, %{status: code} = response} = e ->
|
||||||
unless params[:unreachable_since], do: Instances.set_unreachable(inbox)
|
maybe_set_unreachable(unreachable_since, inbox)
|
||||||
Logger.metadata(activity: activity_id, inbox: inbox, status: code)
|
Logger.metadata(activity: activity_id, inbox: inbox, status: code)
|
||||||
Logger.error("Publisher failed to inbox #{inbox} with status #{code}")
|
Logger.error("Publisher failed to inbox #{inbox} with status #{code}")
|
||||||
|
|
||||||
|
@ -152,7 +180,7 @@ def publish_one(%{inbox: inbox, activity_id: activity_id} = params) do
|
||||||
connection_pool_snooze()
|
connection_pool_snooze()
|
||||||
|
|
||||||
e ->
|
e ->
|
||||||
unless params[:unreachable_since], do: Instances.set_unreachable(inbox)
|
maybe_set_unreachable(unreachable_since, inbox)
|
||||||
Logger.metadata(activity: activity_id, inbox: inbox)
|
Logger.metadata(activity: activity_id, inbox: inbox)
|
||||||
Logger.error("Publisher failed to inbox #{inbox} #{inspect(e)}")
|
Logger.error("Publisher failed to inbox #{inbox} #{inspect(e)}")
|
||||||
{:error, e}
|
{:error, e}
|
||||||
|
@ -161,6 +189,12 @@ def publish_one(%{inbox: inbox, activity_id: activity_id} = params) do
|
||||||
|
|
||||||
defp connection_pool_snooze, do: {:snooze, 3}
|
defp connection_pool_snooze, do: {:snooze, 3}
|
||||||
|
|
||||||
|
defp maybe_set_reachable(%NaiveDateTime{}, inbox), do: Instances.set_reachable(inbox)
|
||||||
|
defp maybe_set_reachable(_, _), do: :ok
|
||||||
|
|
||||||
|
defp maybe_set_unreachable(nil, inbox), do: Instances.set_unreachable(inbox)
|
||||||
|
defp maybe_set_unreachable(%NaiveDateTime{}, _), do: :ok
|
||||||
|
|
||||||
defp signature_host(%URI{port: port, scheme: scheme, host: host}) do
|
defp signature_host(%URI{port: port, scheme: scheme, host: host}) do
|
||||||
if port == URI.default_port(scheme) do
|
if port == URI.default_port(scheme) do
|
||||||
host
|
host
|
||||||
|
|
|
@ -71,7 +71,10 @@ defp publish_priority(_), do: 0
|
||||||
# Job Worker Callbacks
|
# Job Worker Callbacks
|
||||||
|
|
||||||
@spec perform(atom(), any()) :: {:ok, any()} | {:error, any()}
|
@spec perform(atom(), any()) :: {:ok, any()} | {:error, any()}
|
||||||
def perform(:publish_one, params), do: Publisher.publish_one(params)
|
def perform(:publish_one, params) do
|
||||||
|
Publisher.prepare_one(params)
|
||||||
|
|> Publisher.publish_one()
|
||||||
|
end
|
||||||
|
|
||||||
def perform(:publish, activity) do
|
def perform(:publish, activity) do
|
||||||
Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end)
|
Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end)
|
||||||
|
|
|
@ -151,18 +151,20 @@ test "publish to url with with different ports" do
|
||||||
_actor = insert(:user)
|
_actor = insert(:user)
|
||||||
|
|
||||||
assert {:ok, %{body: "port 42"}} =
|
assert {:ok, %{body: "port 42"}} =
|
||||||
Publisher.publish_one(%{
|
Publisher.prepare_one(%{
|
||||||
inbox: inbox42,
|
inbox: inbox42,
|
||||||
activity_id: activity.id,
|
activity_id: activity.id,
|
||||||
unreachable_since: true
|
unreachable_since: true
|
||||||
})
|
})
|
||||||
|
|> Publisher.publish_one()
|
||||||
|
|
||||||
assert {:ok, %{body: "port 80"}} =
|
assert {:ok, %{body: "port 80"}} =
|
||||||
Publisher.publish_one(%{
|
Publisher.prepare_one(%{
|
||||||
inbox: inbox80,
|
inbox: inbox80,
|
||||||
activity_id: activity.id,
|
activity_id: activity.id,
|
||||||
unreachable_since: true
|
unreachable_since: true
|
||||||
})
|
})
|
||||||
|
|> Publisher.publish_one()
|
||||||
end
|
end
|
||||||
|
|
||||||
test_with_mock "calls `Instances.set_reachable` on successful federation if `unreachable_since` is not specified",
|
test_with_mock "calls `Instances.set_reachable` on successful federation if `unreachable_since` is not specified",
|
||||||
|
@ -174,7 +176,8 @@ test "publish to url with with different ports" do
|
||||||
activity = insert(:note_activity)
|
activity = insert(:note_activity)
|
||||||
|
|
||||||
assert {:ok, _} =
|
assert {:ok, _} =
|
||||||
Publisher.publish_one(%{inbox: inbox, activity_id: activity.id})
|
Publisher.prepare_one(%{inbox: inbox, activity_id: activity.id})
|
||||||
|
|> Publisher.publish_one()
|
||||||
|
|
||||||
assert called(Instances.set_reachable(inbox))
|
assert called(Instances.set_reachable(inbox))
|
||||||
end
|
end
|
||||||
|
@ -188,11 +191,12 @@ test "publish to url with with different ports" do
|
||||||
activity = insert(:note_activity)
|
activity = insert(:note_activity)
|
||||||
|
|
||||||
assert {:ok, _} =
|
assert {:ok, _} =
|
||||||
Publisher.publish_one(%{
|
Publisher.prepare_one(%{
|
||||||
inbox: inbox,
|
inbox: inbox,
|
||||||
activity_id: activity.id,
|
activity_id: activity.id,
|
||||||
unreachable_since: NaiveDateTime.utc_now()
|
unreachable_since: NaiveDateTime.utc_now()
|
||||||
})
|
})
|
||||||
|
|> Publisher.publish_one()
|
||||||
|
|
||||||
assert called(Instances.set_reachable(inbox))
|
assert called(Instances.set_reachable(inbox))
|
||||||
end
|
end
|
||||||
|
@ -206,11 +210,12 @@ test "publish to url with with different ports" do
|
||||||
activity = insert(:note_activity)
|
activity = insert(:note_activity)
|
||||||
|
|
||||||
assert {:ok, _} =
|
assert {:ok, _} =
|
||||||
Publisher.publish_one(%{
|
Publisher.prepare_one(%{
|
||||||
inbox: inbox,
|
inbox: inbox,
|
||||||
activity_id: activity.id,
|
activity_id: activity.id,
|
||||||
unreachable_since: nil
|
unreachable_since: nil
|
||||||
})
|
})
|
||||||
|
|> Publisher.publish_one()
|
||||||
|
|
||||||
refute called(Instances.set_reachable(inbox))
|
refute called(Instances.set_reachable(inbox))
|
||||||
end
|
end
|
||||||
|
@ -224,7 +229,8 @@ test "publish to url with with different ports" do
|
||||||
activity = insert(:note_activity)
|
activity = insert(:note_activity)
|
||||||
|
|
||||||
assert {:cancel, _} =
|
assert {:cancel, _} =
|
||||||
Publisher.publish_one(%{inbox: inbox, activity_id: activity.id})
|
Publisher.prepare_one(%{inbox: inbox, activity_id: activity.id})
|
||||||
|
|> Publisher.publish_one()
|
||||||
|
|
||||||
assert called(Instances.set_unreachable(inbox))
|
assert called(Instances.set_unreachable(inbox))
|
||||||
end
|
end
|
||||||
|
@ -239,10 +245,11 @@ test "publish to url with with different ports" do
|
||||||
|
|
||||||
assert capture_log(fn ->
|
assert capture_log(fn ->
|
||||||
assert {:error, _} =
|
assert {:error, _} =
|
||||||
Publisher.publish_one(%{
|
Publisher.prepare_one(%{
|
||||||
inbox: inbox,
|
inbox: inbox,
|
||||||
activity_id: activity.id
|
activity_id: activity.id
|
||||||
})
|
})
|
||||||
|
|> Publisher.publish_one()
|
||||||
end) =~ "connrefused"
|
end) =~ "connrefused"
|
||||||
|
|
||||||
assert called(Instances.set_unreachable(inbox))
|
assert called(Instances.set_unreachable(inbox))
|
||||||
|
@ -257,7 +264,8 @@ test "publish to url with with different ports" do
|
||||||
activity = insert(:note_activity)
|
activity = insert(:note_activity)
|
||||||
|
|
||||||
assert {:ok, _} =
|
assert {:ok, _} =
|
||||||
Publisher.publish_one(%{inbox: inbox, activity_id: activity.id})
|
Publisher.prepare_one(%{inbox: inbox, activity_id: activity.id})
|
||||||
|
|> Publisher.publish_one()
|
||||||
|
|
||||||
refute called(Instances.set_unreachable(inbox))
|
refute called(Instances.set_unreachable(inbox))
|
||||||
end
|
end
|
||||||
|
@ -272,11 +280,12 @@ test "publish to url with with different ports" do
|
||||||
|
|
||||||
assert capture_log(fn ->
|
assert capture_log(fn ->
|
||||||
assert {:error, _} =
|
assert {:error, _} =
|
||||||
Publisher.publish_one(%{
|
Publisher.prepare_one(%{
|
||||||
inbox: inbox,
|
inbox: inbox,
|
||||||
activity_id: activity.id,
|
activity_id: activity.id,
|
||||||
unreachable_since: NaiveDateTime.utc_now()
|
unreachable_since: NaiveDateTime.utc_now()
|
||||||
})
|
})
|
||||||
|
|> Publisher.publish_one()
|
||||||
end) =~ "connrefused"
|
end) =~ "connrefused"
|
||||||
|
|
||||||
refute called(Instances.set_unreachable(inbox))
|
refute called(Instances.set_unreachable(inbox))
|
||||||
|
|
Loading…
Reference in a new issue