From 1f986ec7138ce95a102a84c75a5b39dd885bf451 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Thu, 1 Aug 2024 22:02:04 -0400 Subject: [PATCH] Gun: Publisher job behavior improvement Gun's connection pool also returns an error if duplicate workers are launched simultaneously. Snooze on this error as well, and lower the snooze to 3 seconds with the optimism that the connection will still be open by then and the delivery can be completed quickly. The original setting of 30 seconds is pretty high and means there's an unnatural lag between deliveries of activities destined to the same server that were created at nearly the same time. This configuration should be more efficient. --- changelog.d/oban_gun_snooze.change | 1 + lib/pleroma/web/activity_pub/publisher.ex | 8 +++++++- 2 files changed, 8 insertions(+), 1 deletion(-) create mode 100644 changelog.d/oban_gun_snooze.change diff --git a/changelog.d/oban_gun_snooze.change b/changelog.d/oban_gun_snooze.change new file mode 100644 index 0000000000..c94525b2ae --- /dev/null +++ b/changelog.d/oban_gun_snooze.change @@ -0,0 +1 @@ +Publisher behavior improvement when snoozing Oban jobs due to Gun connection pool contention. diff --git a/lib/pleroma/web/activity_pub/publisher.ex b/lib/pleroma/web/activity_pub/publisher.ex index e040753dc6..e63b8ff1f6 100644 --- a/lib/pleroma/web/activity_pub/publisher.ex +++ b/lib/pleroma/web/activity_pub/publisher.ex @@ -143,9 +143,13 @@ def publish_one(%{inbox: inbox, activity_id: activity_id} = params) do _ -> {:error, e} end + {:error, {:already_started, _}} -> + Logger.debug("Publisher snoozing worker job due worker :already_started race condition") + connection_pool_snooze() + {:error, :pool_full} -> Logger.debug("Publisher snoozing worker job due to full connection pool") - {:snooze, 30} + connection_pool_snooze() e -> unless params[:unreachable_since], do: Instances.set_unreachable(inbox) @@ -155,6 +159,8 @@ def publish_one(%{inbox: inbox, activity_id: activity_id} = params) do end end + defp connection_pool_snooze, do: {:snooze, 3} + defp signature_host(%URI{port: port, scheme: scheme, host: host}) do if port == URI.default_port(scheme) do host