From 52e9bec15655dd3ba75c133b85266a1ea65a8eef Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Fri, 28 Jun 2024 11:47:31 -0400 Subject: [PATCH 1/3] Remove WorkerHelper --- changelog.d/workerhelper.skip | 0 config/config.exs | 7 --- lib/mix/tasks/pleroma/database.ex | 10 ++-- lib/pleroma/emails/mailer.ex | 3 +- lib/pleroma/filter.ex | 11 +++-- lib/pleroma/instances/instance.ex | 3 +- lib/pleroma/mfa/token.ex | 13 +++-- lib/pleroma/object.ex | 3 +- lib/pleroma/search.ex | 6 ++- lib/pleroma/user.ex | 19 +++++--- lib/pleroma/user/import.ex | 30 +++++++----- lib/pleroma/web/activity_pub/activity_pub.ex | 18 ++++--- lib/pleroma/web/activity_pub/publisher.ex | 6 +-- lib/pleroma/web/activity_pub/side_effects.ex | 14 ++++-- lib/pleroma/web/common_api.ex | 6 +-- lib/pleroma/web/federator.ex | 27 +++++++---- lib/pleroma/web/o_auth/token.ex | 9 ++-- lib/pleroma/web/push.ex | 2 +- .../workers/attachments_cleanup_worker.ex | 6 +-- lib/pleroma/workers/background_worker.ex | 8 ++-- lib/pleroma/workers/backup_worker.ex | 4 +- .../workers/cron/digest_emails_worker.ex | 4 +- .../workers/cron/new_users_digest_worker.ex | 6 +-- lib/pleroma/workers/delete_worker.ex | 7 ++- lib/pleroma/workers/mailer_worker.ex | 6 +-- lib/pleroma/workers/mute_expire_worker.ex | 6 +-- lib/pleroma/workers/poll_worker.ex | 6 +-- lib/pleroma/workers/publisher_worker.ex | 22 ++++++--- lib/pleroma/workers/purge_expired_activity.ex | 11 ++--- lib/pleroma/workers/purge_expired_filter.ex | 2 +- lib/pleroma/workers/purge_expired_token.ex | 12 +---- lib/pleroma/workers/receiver_worker.ex | 6 +-- lib/pleroma/workers/remote_fetcher_worker.ex | 6 +-- lib/pleroma/workers/rich_media_worker.ex | 4 +- .../workers/scheduled_activity_worker.ex | 6 +-- lib/pleroma/workers/search_indexing_worker.ex | 6 +-- lib/pleroma/workers/user_refresh_worker.ex | 2 +- lib/pleroma/workers/web_pusher_worker.ex | 6 +-- lib/pleroma/workers/worker_helper.ex | 48 ------------------- test/mix/tasks/pleroma/database_test.exs | 10 ++-- .../workers/purge_expired_activity_test.exs | 30 +++++++----- 41 files changed, 200 insertions(+), 211 deletions(-) create mode 100644 changelog.d/workerhelper.skip delete mode 100644 lib/pleroma/workers/worker_helper.ex diff --git a/changelog.d/workerhelper.skip b/changelog.d/workerhelper.skip new file mode 100644 index 0000000000..e69de29bb2 diff --git a/config/config.exs b/config/config.exs index 4780892f7a..b835a7c807 100644 --- a/config/config.exs +++ b/config/config.exs @@ -600,13 +600,6 @@ {"0 0 * * *", Pleroma.Workers.Cron.NewUsersDigestWorker} ] -config :pleroma, :workers, - retries: [ - federator_incoming: 5, - federator_outgoing: 5, - search_indexing: 2 - ] - config :pleroma, Pleroma.Formatter, class: false, rel: "ugc", diff --git a/lib/mix/tasks/pleroma/database.ex b/lib/mix/tasks/pleroma/database.ex index b82d1f079d..e52b5e0a72 100644 --- a/lib/mix/tasks/pleroma/database.ex +++ b/lib/mix/tasks/pleroma/database.ex @@ -295,10 +295,12 @@ def run(["ensure_expiration"]) do |> DateTime.from_naive!("Etc/UTC") |> Timex.shift(days: days) - Pleroma.Workers.PurgeExpiredActivity.enqueue(%{ - activity_id: activity.id, - expires_at: expires_at - }) + Pleroma.Workers.PurgeExpiredActivity.enqueue( + %{ + activity_id: activity.id + }, + scheduled_at: expires_at + ) end) end) |> Stream.run() diff --git a/lib/pleroma/emails/mailer.ex b/lib/pleroma/emails/mailer.ex index 1014421308..2a80f8547c 100644 --- a/lib/pleroma/emails/mailer.ex +++ b/lib/pleroma/emails/mailer.ex @@ -25,7 +25,8 @@ def deliver_async(email, config \\ []) do |> :erlang.term_to_binary() |> Base.encode64() - MailerWorker.enqueue("email", %{"encoded_email" => encoded_email, "config" => config}) + MailerWorker.new(%{"op" => "email", "encoded_email" => encoded_email, "config" => config}) + |> Oban.insert() end @doc "callback to perform send email from queue" diff --git a/lib/pleroma/filter.ex b/lib/pleroma/filter.ex index e827d3cbcb..77ed64d4fc 100644 --- a/lib/pleroma/filter.ex +++ b/lib/pleroma/filter.ex @@ -133,10 +133,13 @@ defp maybe_add_expires_at(changeset, %{expires_in: nil}) do defp maybe_add_expires_at(changeset, _), do: changeset defp maybe_add_expiration_job(%{expires_at: %NaiveDateTime{} = expires_at} = filter) do - Pleroma.Workers.PurgeExpiredFilter.enqueue(%{ - filter_id: filter.id, - expires_at: DateTime.from_naive!(expires_at, "Etc/UTC") - }) + Pleroma.Workers.PurgeExpiredFilter.new( + %{ + filter_id: filter.id + }, + scheduled_at: DateTime.from_naive!(expires_at, "Etc/UTC") + ) + |> Oban.insert() end defp maybe_add_expiration_job(_), do: {:ok, nil} diff --git a/lib/pleroma/instances/instance.ex b/lib/pleroma/instances/instance.ex index 288555146e..33f1229d02 100644 --- a/lib/pleroma/instances/instance.ex +++ b/lib/pleroma/instances/instance.ex @@ -297,7 +297,8 @@ defp scrape_metadata(%URI{} = instance_uri) do all of those users' activities and notifications. """ def delete_users_and_activities(host) when is_binary(host) do - DeleteWorker.enqueue("delete_instance", %{"host" => host}) + DeleteWorker.new(%{"op" => "delete_instance", "host" => host}) + |> Oban.insert() end def perform(:delete_instance, host) when is_binary(host) do diff --git a/lib/pleroma/mfa/token.ex b/lib/pleroma/mfa/token.ex index 57bc11ed52..b53e1c7d0e 100644 --- a/lib/pleroma/mfa/token.ex +++ b/lib/pleroma/mfa/token.ex @@ -52,11 +52,14 @@ defp expired?(%__MODULE__{valid_until: valid_until}) do @spec create(User.t(), Authorization.t() | nil) :: {:ok, t()} | {:error, Ecto.Changeset.t()} def create(user, authorization \\ nil) do with {:ok, token} <- do_create(user, authorization) do - Pleroma.Workers.PurgeExpiredToken.enqueue(%{ - token_id: token.id, - valid_until: DateTime.from_naive!(token.valid_until, "Etc/UTC"), - mod: __MODULE__ - }) + Pleroma.Workers.PurgeExpiredToken.new( + %{ + token_id: token.id, + mod: __MODULE__ + }, + scheduled_at: DateTime.from_naive!(token.valid_until, "Etc/UTC") + ) + |> Oban.insert() {:ok, token} end diff --git a/lib/pleroma/object.ex b/lib/pleroma/object.ex index eb44b3855b..748f18e6cd 100644 --- a/lib/pleroma/object.ex +++ b/lib/pleroma/object.ex @@ -255,7 +255,8 @@ def delete(%Object{data: %{"id" => id}} = object) do @spec cleanup_attachments(boolean(), Object.t()) :: {:ok, Oban.Job.t() | nil} def cleanup_attachments(true, %Object{} = object) do - AttachmentsCleanupWorker.enqueue("cleanup_attachments", %{"object" => object}) + AttachmentsCleanupWorker.new(%{"op" => "cleanup_attachments", "object" => object}) + |> Oban.insert() end def cleanup_attachments(_, _), do: {:ok, nil} diff --git a/lib/pleroma/search.ex b/lib/pleroma/search.ex index b9d2a0188e..30b3ba9586 100644 --- a/lib/pleroma/search.ex +++ b/lib/pleroma/search.ex @@ -2,11 +2,13 @@ defmodule Pleroma.Search do alias Pleroma.Workers.SearchIndexingWorker def add_to_index(%Pleroma.Activity{id: activity_id}) do - SearchIndexingWorker.enqueue("add_to_index", %{"activity" => activity_id}) + SearchIndexingWorker.new(%{"op" => "add_to_index", "activity" => activity_id}) + |> Oban.insert() end def remove_from_index(%Pleroma.Object{id: object_id}) do - SearchIndexingWorker.enqueue("remove_from_index", %{"object" => object_id}) + SearchIndexingWorker.new(%{"op" => "remove_from_index", "object" => object_id}) + |> Oban.insert() end def search(query, options) do diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex index e28d76a7c8..0e9d708313 100644 --- a/lib/pleroma/user.ex +++ b/lib/pleroma/user.ex @@ -736,7 +736,8 @@ def update_password_reset_pending(user, value) do end def force_password_reset_async(user) do - BackgroundWorker.enqueue("force_password_reset", %{"user_id" => user.id}) + BackgroundWorker.new(%{"op" => "force_password_reset", "user_id" => user.id}) + |> Oban.insert() end @spec force_password_reset(User.t()) :: {:ok, User.t()} | {:error, Ecto.Changeset.t()} @@ -1218,7 +1219,8 @@ def update_and_set_cache(struct, params) do def update_and_set_cache(changeset) do with {:ok, user} <- Repo.update(changeset, stale_error_field: :id) do if get_change(changeset, :raw_fields) do - BackgroundWorker.enqueue("verify_fields_links", %{"user_id" => user.id}) + BackgroundWorker.new(%{"op" => "verify_fields_links", "user_id" => user.id}) + |> Oban.insert() end set_cache(user) @@ -1589,11 +1591,11 @@ def mute(%User{} = muter, %User{} = mutee, params \\ %{}) do )) || {:ok, nil} do if duration > 0 do - Pleroma.Workers.MuteExpireWorker.enqueue( - "unmute_user", - %{"muter_id" => muter.id, "mutee_id" => mutee.id}, + Pleroma.Workers.MuteExpireWorker.new( + %{"op" => "unmute_user", "muter_id" => muter.id, "mutee_id" => mutee.id}, scheduled_at: expires_at ) + |> Oban.insert() end @cachex.del(:user_cache, "muted_users_ap_ids:#{muter.ap_id}") @@ -1836,7 +1838,8 @@ defp maybe_filter_on_ap_id(query, ap_ids) when is_list(ap_ids) do defp maybe_filter_on_ap_id(query, _ap_ids), do: query def set_activation_async(user, status \\ true) do - BackgroundWorker.enqueue("user_activation", %{"user_id" => user.id, "status" => status}) + BackgroundWorker.new(%{"op" => "user_activation", "user_id" => user.id, "status" => status}) + |> Oban.insert() end @spec set_activation([User.t()], boolean()) :: {:ok, User.t()} | {:error, Ecto.Changeset.t()} @@ -1983,7 +1986,9 @@ def delete(users) when is_list(users) do def delete(%User{} = user) do # Purge the user immediately purge(user) - DeleteWorker.enqueue("delete_user", %{"user_id" => user.id}) + + DeleteWorker.new(%{"op" => "delete_user", "user_id" => user.id}) + |> Oban.insert() end # *Actually* delete the user from the DB diff --git a/lib/pleroma/user/import.ex b/lib/pleroma/user/import.ex index 53ffd1ab3d..11905237c3 100644 --- a/lib/pleroma/user/import.ex +++ b/lib/pleroma/user/import.ex @@ -63,23 +63,29 @@ defp handle_error(op, user_id, error) do end def blocks_import(%User{} = blocker, [_ | _] = identifiers) do - BackgroundWorker.enqueue( - "blocks_import", - %{"user_id" => blocker.id, "identifiers" => identifiers} - ) + BackgroundWorker.new(%{ + "op" => "blocks_import", + "user_id" => blocker.id, + "identifiers" => identifiers + }) + |> Oban.insert() end def follow_import(%User{} = follower, [_ | _] = identifiers) do - BackgroundWorker.enqueue( - "follow_import", - %{"user_id" => follower.id, "identifiers" => identifiers} - ) + BackgroundWorker.new(%{ + "op" => "follow_import", + "user_id" => follower.id, + "identifiers" => identifiers + }) + |> Oban.insert() end def mutes_import(%User{} = user, [_ | _] = identifiers) do - BackgroundWorker.enqueue( - "mutes_import", - %{"user_id" => user.id, "identifiers" => identifiers} - ) + BackgroundWorker.new(%{ + "op" => "mutes_import", + "user_id" => user.id, + "identifiers" => identifiers + }) + |> Oban.insert() end end diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index b30b0cabe5..a2a94a0ff4 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -222,10 +222,12 @@ defp maybe_create_activity_expiration( %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity ) do with {:ok, _job} <- - Pleroma.Workers.PurgeExpiredActivity.enqueue(%{ - activity_id: activity.id, - expires_at: expires_at - }) do + Pleroma.Workers.PurgeExpiredActivity.enqueue( + %{ + activity_id: activity.id + }, + scheduled_at: expires_at + ) do {:ok, activity} end end @@ -446,10 +448,12 @@ def move(%User{} = origin, %User{} = target, local \\ true) do _ <- notify_and_stream(activity) do maybe_federate(activity) - BackgroundWorker.enqueue("move_following", %{ + BackgroundWorker.new(%{ + "op" => "move_following", "origin_id" => origin.id, "target_id" => target.id }) + |> Oban.insert() {:ok, activity} else @@ -1797,10 +1801,12 @@ def enqueue_pin_fetches(%{pinned_objects: pins}) do # enqueue a task to fetch all pinned objects Enum.each(pins, fn {ap_id, _} -> if is_nil(Object.get_cached_by_ap_id(ap_id)) do - Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{ + Pleroma.Workers.RemoteFetcherWorker.new(%{ + "op" => "fetch_remote", "id" => ap_id, "depth" => 1 }) + |> Oban.insert() end end) end diff --git a/lib/pleroma/web/activity_pub/publisher.ex b/lib/pleroma/web/activity_pub/publisher.ex index e040753dc6..f71652cb74 100644 --- a/lib/pleroma/web/activity_pub/publisher.ex +++ b/lib/pleroma/web/activity_pub/publisher.ex @@ -30,11 +30,11 @@ defmodule Pleroma.Web.ActivityPub.Publisher do """ @spec enqueue_one(map(), Keyword.t()) :: {:ok, %Oban.Job{}} def enqueue_one(%{} = params, worker_args \\ []) do - PublisherWorker.enqueue( - "publish_one", - %{"params" => params}, + PublisherWorker.new( + %{"op" => "publish_one", "params" => params}, worker_args ) + |> Oban.insert() end @doc """ diff --git a/lib/pleroma/web/activity_pub/side_effects.ex b/lib/pleroma/web/activity_pub/side_effects.ex index cc1c7a0afb..d6d4036719 100644 --- a/lib/pleroma/web/activity_pub/side_effects.ex +++ b/lib/pleroma/web/activity_pub/side_effects.ex @@ -223,10 +223,12 @@ def handle(%{data: %{"type" => "Create"}} = activity, meta) do if Pleroma.Web.Federator.allowed_thread_distance?(reply_depth) and object.data["replies"] != nil do for reply_id <- object.data["replies"] do - Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{ + Pleroma.Workers.RemoteFetcherWorker.new(%{ + "op" => "fetch_remote", "id" => reply_id, "depth" => reply_depth }) + |> Oban.insert() end end @@ -410,10 +412,12 @@ def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do {:ok, expires_at} = Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(meta[:expires_at]) - Pleroma.Workers.PurgeExpiredActivity.enqueue(%{ - activity_id: meta[:activity_id], - expires_at: expires_at - }) + Pleroma.Workers.PurgeExpiredActivity.enqueue( + %{ + activity_id: meta[:activity_id] + }, + scheduled_at: expires_at + ) end {:ok, object, meta} diff --git a/lib/pleroma/web/common_api.ex b/lib/pleroma/web/common_api.ex index b90b6a6d90..1ed905d6cb 100644 --- a/lib/pleroma/web/common_api.ex +++ b/lib/pleroma/web/common_api.ex @@ -559,11 +559,11 @@ def add_mute(activity, user, params \\ %{}) do with {:ok, _} <- ThreadMute.add_mute(user.id, activity.data["context"]), _ <- Pleroma.Notification.mark_context_as_read(user, activity.data["context"]) do if expires_in > 0 do - Pleroma.Workers.MuteExpireWorker.enqueue( - "unmute_conversation", - %{"user_id" => user.id, "activity_id" => activity.id}, + Pleroma.Workers.MuteExpireWorker.new( + %{"op" => "unmute_conversation", "user_id" => user.id, "activity_id" => activity.id}, schedule_in: expires_in ) + |> Oban.insert() end {:ok, activity} diff --git a/lib/pleroma/web/federator.ex b/lib/pleroma/web/federator.ex index 3d3101d61e..c740fc85fa 100644 --- a/lib/pleroma/web/federator.ex +++ b/lib/pleroma/web/federator.ex @@ -35,22 +35,30 @@ def allowed_thread_distance?(distance) do end # Client API - def incoming_ap_doc(%{params: _params, req_headers: _req_headers} = args) do - job_args = Enum.into(args, %{}, fn {k, v} -> {Atom.to_string(k), v} end) - - ReceiverWorker.enqueue( - "incoming_ap_doc", - Map.put(job_args, "timeout", :timer.seconds(20)), + def incoming_ap_doc(%{params: params, req_headers: req_headers}) do + ReceiverWorker.new( + %{ + "op" => "incoming_ap_doc", + "req_headers" => req_headers, + "params" => params, + "timeout" => :timer.seconds(20) + }, priority: 2 ) + |> Oban.insert() end def incoming_ap_doc(%{"type" => "Delete"} = params) do - ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params}, priority: 3, queue: :slow) + ReceiverWorker.new(%{"op" => "incoming_ap_doc", "params" => params}, + priority: 3, + queue: :slow + ) + |> Oban.insert() end def incoming_ap_doc(params) do - ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params}) + ReceiverWorker.new(%{"op" => "incoming_ap_doc", "params" => params}) + |> Oban.insert() end @impl true @@ -60,9 +68,10 @@ def publish(%{id: "pleroma:fakeid"} = activity) do @impl true def publish(%Pleroma.Activity{data: %{"type" => type}} = activity) do - PublisherWorker.enqueue("publish", %{"activity_id" => activity.id}, + PublisherWorker.new(%{"op" => "publish", "activity_id" => activity.id}, priority: publish_priority(type) ) + |> Oban.insert() end defp publish_priority("Delete"), do: 3 diff --git a/lib/pleroma/web/o_auth/token.ex b/lib/pleroma/web/o_auth/token.ex index 9b1198b428..d964250944 100644 --- a/lib/pleroma/web/o_auth/token.ex +++ b/lib/pleroma/web/o_auth/token.ex @@ -100,11 +100,10 @@ defp put_valid_until(changeset, attrs) do def create(%App{} = app, %User{} = user, attrs \\ %{}) do with {:ok, token} <- do_create(app, user, attrs) do if Pleroma.Config.get([:oauth2, :clean_expired_tokens]) do - Pleroma.Workers.PurgeExpiredToken.enqueue(%{ - token_id: token.id, - valid_until: DateTime.from_naive!(token.valid_until, "Etc/UTC"), - mod: __MODULE__ - }) + Pleroma.Workers.PurgeExpiredToken.new(%{token_id: token.id, mod: __MODULE__}, + scheduled_at: DateTime.from_naive!(token.valid_until, "Etc/UTC") + ) + |> Oban.insert() end {:ok, token} diff --git a/lib/pleroma/web/push.ex b/lib/pleroma/web/push.ex index d4693f63e4..d783f776a5 100644 --- a/lib/pleroma/web/push.ex +++ b/lib/pleroma/web/push.ex @@ -28,6 +28,6 @@ def enabled, do: match?([subject: _, public_key: _, private_key: _], vapid_confi @spec send(Pleroma.Notification.t()) :: {:ok, Oban.Job.t()} | {:error, Oban.Job.changeset() | term()} def send(notification) do - WebPusherWorker.enqueue("web_push", %{"notification_id" => notification.id}) + WebPusherWorker.new(%{"op" => "web_push", "notification_id" => notification.id}) end end diff --git a/lib/pleroma/workers/attachments_cleanup_worker.ex b/lib/pleroma/workers/attachments_cleanup_worker.ex index 0b570b70b0..e2f92b1fd0 100644 --- a/lib/pleroma/workers/attachments_cleanup_worker.ex +++ b/lib/pleroma/workers/attachments_cleanup_worker.ex @@ -8,9 +8,9 @@ defmodule Pleroma.Workers.AttachmentsCleanupWorker do alias Pleroma.Object alias Pleroma.Repo - use Pleroma.Workers.WorkerHelper, queue: "slow" + use Oban.Worker, queue: :slow - @impl Oban.Worker + @impl true def perform(%Job{ args: %{ "op" => "cleanup_attachments", @@ -31,7 +31,7 @@ def perform(%Job{ def perform(%Job{args: %{"op" => "cleanup_attachments", "object" => _object}}), do: {:ok, :skip} - @impl Oban.Worker + @impl true def timeout(_job), do: :timer.seconds(900) defp do_clean({object_ids, attachment_urls}) do diff --git a/lib/pleroma/workers/background_worker.ex b/lib/pleroma/workers/background_worker.ex index 870aef3c6a..60da2d5ca7 100644 --- a/lib/pleroma/workers/background_worker.ex +++ b/lib/pleroma/workers/background_worker.ex @@ -5,9 +5,9 @@ defmodule Pleroma.Workers.BackgroundWorker do alias Pleroma.User - use Pleroma.Workers.WorkerHelper, queue: "background" + use Oban.Worker, queue: :background - @impl Oban.Worker + @impl true def perform(%Job{args: %{"op" => "user_activation", "user_id" => user_id, "status" => status}}) do user = User.get_cached_by_id(user_id) @@ -39,6 +39,6 @@ def perform(%Job{args: %{"op" => "verify_fields_links", "user_id" => user_id}}) User.perform(:verify_fields_links, user) end - @impl Oban.Worker - def timeout(_job), do: :timer.seconds(15) + @impl true + def timeout(_job), do: :timer.seconds(900) end diff --git a/lib/pleroma/workers/backup_worker.ex b/lib/pleroma/workers/backup_worker.ex index d1b6fcdadf..6466d8d73e 100644 --- a/lib/pleroma/workers/backup_worker.ex +++ b/lib/pleroma/workers/backup_worker.ex @@ -9,7 +9,7 @@ defmodule Pleroma.Workers.BackupWorker do alias Pleroma.Config.Getting, as: Config alias Pleroma.User.Backup - @impl Oban.Worker + @impl true def perform(%Job{ args: %{"op" => "process", "backup_id" => backup_id} }) do @@ -32,7 +32,7 @@ def perform(%Job{args: %{"op" => "delete", "backup_id" => backup_id}}) do end end - @impl Oban.Worker + @impl true def timeout(_job), do: Config.get([Backup, :timeout], :timer.minutes(30)) defp has_email?(user) do diff --git a/lib/pleroma/workers/cron/digest_emails_worker.ex b/lib/pleroma/workers/cron/digest_emails_worker.ex index 17e92d10b0..b50b52a7b0 100644 --- a/lib/pleroma/workers/cron/digest_emails_worker.ex +++ b/lib/pleroma/workers/cron/digest_emails_worker.ex @@ -18,7 +18,7 @@ defmodule Pleroma.Workers.Cron.DigestEmailsWorker do require Logger - @impl Oban.Worker + @impl true def perform(_job) do config = Config.get([:email_notifications, :digest]) @@ -59,6 +59,6 @@ def send_email(user) do User.touch_last_digest_emailed_at(user) end - @impl Oban.Worker + @impl true def timeout(_job), do: :timer.seconds(5) end diff --git a/lib/pleroma/workers/cron/new_users_digest_worker.ex b/lib/pleroma/workers/cron/new_users_digest_worker.ex index 1f57aad4a2..7876499831 100644 --- a/lib/pleroma/workers/cron/new_users_digest_worker.ex +++ b/lib/pleroma/workers/cron/new_users_digest_worker.ex @@ -9,9 +9,9 @@ defmodule Pleroma.Workers.Cron.NewUsersDigestWorker do import Ecto.Query - use Pleroma.Workers.WorkerHelper, queue: "background" + use Oban.Worker, queue: :background - @impl Oban.Worker + @impl true def perform(_job) do if Pleroma.Config.get([Pleroma.Emails.NewUsersDigestEmail, :enabled]) do today = NaiveDateTime.utc_now() |> Timex.beginning_of_day() @@ -61,6 +61,6 @@ def perform(_job) do :ok end - @impl Oban.Worker + @impl true def timeout(_job), do: :timer.seconds(5) end diff --git a/lib/pleroma/workers/delete_worker.ex b/lib/pleroma/workers/delete_worker.ex index 97003fb69e..6a1c7bb383 100644 --- a/lib/pleroma/workers/delete_worker.ex +++ b/lib/pleroma/workers/delete_worker.ex @@ -6,10 +6,9 @@ defmodule Pleroma.Workers.DeleteWorker do alias Pleroma.Instances.Instance alias Pleroma.User - use Pleroma.Workers.WorkerHelper, queue: "slow" - - @impl Oban.Worker + use Oban.Worker, queue: :slow + @impl true def perform(%Job{args: %{"op" => "delete_user", "user_id" => user_id}}) do user = User.get_cached_by_id(user_id) User.perform(:delete, user) @@ -19,6 +18,6 @@ def perform(%Job{args: %{"op" => "delete_instance", "host" => host}}) do Instance.perform(:delete_instance, host) end - @impl Oban.Worker + @impl true def timeout(_job), do: :timer.seconds(900) end diff --git a/lib/pleroma/workers/mailer_worker.ex b/lib/pleroma/workers/mailer_worker.ex index 652bf77e01..b0259b1917 100644 --- a/lib/pleroma/workers/mailer_worker.ex +++ b/lib/pleroma/workers/mailer_worker.ex @@ -3,9 +3,9 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Workers.MailerWorker do - use Pleroma.Workers.WorkerHelper, queue: "background" + use Oban.Worker, queue: :background - @impl Oban.Worker + @impl true def perform(%Job{args: %{"op" => "email", "encoded_email" => encoded_email, "config" => config}}) do encoded_email |> Base.decode64!() @@ -13,6 +13,6 @@ def perform(%Job{args: %{"op" => "email", "encoded_email" => encoded_email, "con |> Pleroma.Emails.Mailer.deliver(config) end - @impl Oban.Worker + @impl true def timeout(_job), do: :timer.seconds(5) end diff --git a/lib/pleroma/workers/mute_expire_worker.ex b/lib/pleroma/workers/mute_expire_worker.ex index a7ab5883af..8356a775d7 100644 --- a/lib/pleroma/workers/mute_expire_worker.ex +++ b/lib/pleroma/workers/mute_expire_worker.ex @@ -3,9 +3,9 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Workers.MuteExpireWorker do - use Pleroma.Workers.WorkerHelper, queue: "background" + use Oban.Worker, queue: :background - @impl Oban.Worker + @impl true def perform(%Job{args: %{"op" => "unmute_user", "muter_id" => muter_id, "mutee_id" => mutee_id}}) do Pleroma.User.unmute(muter_id, mutee_id) :ok @@ -18,6 +18,6 @@ def perform(%Job{ :ok end - @impl Oban.Worker + @impl true def timeout(_job), do: :timer.seconds(5) end diff --git a/lib/pleroma/workers/poll_worker.ex b/lib/pleroma/workers/poll_worker.ex index af8997e702..d263aa1b9e 100644 --- a/lib/pleroma/workers/poll_worker.ex +++ b/lib/pleroma/workers/poll_worker.ex @@ -6,13 +6,13 @@ defmodule Pleroma.Workers.PollWorker do @moduledoc """ Generates notifications when a poll ends. """ - use Pleroma.Workers.WorkerHelper, queue: "background" + use Oban.Worker, queue: :background alias Pleroma.Activity alias Pleroma.Notification alias Pleroma.Object - @impl Oban.Worker + @impl true def perform(%Job{args: %{"op" => "poll_end", "activity_id" => activity_id}}) do with %Activity{} = activity <- find_poll_activity(activity_id), {:ok, notifications} <- Notification.create_poll_notifications(activity) do @@ -23,7 +23,7 @@ def perform(%Job{args: %{"op" => "poll_end", "activity_id" => activity_id}}) do end end - @impl Oban.Worker + @impl true def timeout(_job), do: :timer.seconds(5) defp find_poll_activity(activity_id) do diff --git a/lib/pleroma/workers/publisher_worker.ex b/lib/pleroma/workers/publisher_worker.ex index 63fcf4ac22..7d9b022de3 100644 --- a/lib/pleroma/workers/publisher_worker.ex +++ b/lib/pleroma/workers/publisher_worker.ex @@ -6,13 +6,9 @@ defmodule Pleroma.Workers.PublisherWorker do alias Pleroma.Activity alias Pleroma.Web.Federator - use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing" + use Oban.Worker, queue: :federator_outgoing, max_attempts: 5 - def backoff(%Job{attempt: attempt}) when is_integer(attempt) do - Pleroma.Workers.WorkerHelper.sidekiq_backoff(attempt, 5) - end - - @impl Oban.Worker + @impl true def perform(%Job{args: %{"op" => "publish", "activity_id" => activity_id}}) do activity = Activity.get_by_id(activity_id) Federator.perform(:publish, activity) @@ -23,6 +19,18 @@ def perform(%Job{args: %{"op" => "publish_one", "params" => params}}) do Federator.perform(:publish_one, params) end - @impl Oban.Worker + @impl true def timeout(_job), do: :timer.seconds(10) + + @base_backoff 15 + @pow 5 + @impl true + def backoff(%Job{attempt: attempt}) when is_integer(attempt) do + backoff = + :math.pow(attempt, @pow) + + @base_backoff + + :rand.uniform(2 * @base_backoff) * attempt + + trunc(backoff) + end end diff --git a/lib/pleroma/workers/purge_expired_activity.ex b/lib/pleroma/workers/purge_expired_activity.ex index f48e340420..f05e75f46a 100644 --- a/lib/pleroma/workers/purge_expired_activity.ex +++ b/lib/pleroma/workers/purge_expired_activity.ex @@ -13,16 +13,13 @@ defmodule Pleroma.Workers.PurgeExpiredActivity do alias Pleroma.Activity - @spec enqueue(map()) :: + @spec enqueue(map(), list()) :: {:ok, Oban.Job.t()} | {:error, :expired_activities_disabled} | {:error, :expiration_too_close} - def enqueue(args) do + def enqueue(params, worker_args) do with true <- enabled?() do - {scheduled_at, args} = Map.pop(args, :expires_at) - - args - |> new(scheduled_at: scheduled_at) + new(params, worker_args) |> Oban.insert() end end @@ -35,7 +32,7 @@ def perform(%Oban.Job{args: %{"activity_id" => id}}) do end end - @impl Oban.Worker + @impl true def timeout(_job), do: :timer.seconds(5) defp enabled? do diff --git a/lib/pleroma/workers/purge_expired_filter.ex b/lib/pleroma/workers/purge_expired_filter.ex index 1f6931e4c2..0405f66847 100644 --- a/lib/pleroma/workers/purge_expired_filter.ex +++ b/lib/pleroma/workers/purge_expired_filter.ex @@ -31,7 +31,7 @@ def perform(%Job{args: %{"filter_id" => id}}) do |> Repo.delete() end - @impl Oban.Worker + @impl true def timeout(_job), do: :timer.seconds(5) @spec get_expiration(pos_integer()) :: Job.t() | nil diff --git a/lib/pleroma/workers/purge_expired_token.ex b/lib/pleroma/workers/purge_expired_token.ex index 1854bf5619..ff962f21b6 100644 --- a/lib/pleroma/workers/purge_expired_token.ex +++ b/lib/pleroma/workers/purge_expired_token.ex @@ -9,16 +9,6 @@ defmodule Pleroma.Workers.PurgeExpiredToken do use Oban.Worker, queue: :background, max_attempts: 1 - @spec enqueue(%{token_id: integer(), valid_until: DateTime.t(), mod: module()}) :: - {:ok, Oban.Job.t()} | {:error, Ecto.Changeset.t()} - def enqueue(args) do - {scheduled_at, args} = Map.pop(args, :valid_until) - - args - |> __MODULE__.new(scheduled_at: scheduled_at) - |> Oban.insert() - end - @impl true def perform(%Oban.Job{args: %{"token_id" => id, "mod" => module}}) do module @@ -27,6 +17,6 @@ def perform(%Oban.Job{args: %{"token_id" => id, "mod" => module}}) do |> Pleroma.Repo.delete() end - @impl Oban.Worker + @impl true def timeout(_job), do: :timer.seconds(5) end diff --git a/lib/pleroma/workers/receiver_worker.ex b/lib/pleroma/workers/receiver_worker.ex index fd5c13fca2..d4db97b639 100644 --- a/lib/pleroma/workers/receiver_worker.ex +++ b/lib/pleroma/workers/receiver_worker.ex @@ -7,9 +7,9 @@ defmodule Pleroma.Workers.ReceiverWorker do alias Pleroma.User alias Pleroma.Web.Federator - use Pleroma.Workers.WorkerHelper, queue: "federator_incoming" + use Oban.Worker, queue: :federator_incoming, max_attempts: 5 - @impl Oban.Worker + @impl true def perform(%Job{ args: %{ @@ -51,7 +51,7 @@ def perform(%Job{args: %{"op" => "incoming_ap_doc", "params" => params}}) do end end - @impl Oban.Worker + @impl true def timeout(%_{args: %{"timeout" => timeout}}), do: timeout def timeout(_job), do: :timer.seconds(5) diff --git a/lib/pleroma/workers/remote_fetcher_worker.ex b/lib/pleroma/workers/remote_fetcher_worker.ex index 60096e14b5..e43765733e 100644 --- a/lib/pleroma/workers/remote_fetcher_worker.ex +++ b/lib/pleroma/workers/remote_fetcher_worker.ex @@ -5,9 +5,9 @@ defmodule Pleroma.Workers.RemoteFetcherWorker do alias Pleroma.Object.Fetcher - use Pleroma.Workers.WorkerHelper, queue: "background" + use Oban.Worker, queue: :background - @impl Oban.Worker + @impl true def perform(%Job{args: %{"op" => "fetch_remote", "id" => id} = args}) do case Fetcher.fetch_object_from_id(id, depth: args["depth"]) do {:ok, _object} -> @@ -30,6 +30,6 @@ def perform(%Job{args: %{"op" => "fetch_remote", "id" => id} = args}) do end end - @impl Oban.Worker + @impl true def timeout(_job), do: :timer.seconds(15) end diff --git a/lib/pleroma/workers/rich_media_worker.ex b/lib/pleroma/workers/rich_media_worker.ex index 2ebf42d4f2..d5ba7b63ec 100644 --- a/lib/pleroma/workers/rich_media_worker.ex +++ b/lib/pleroma/workers/rich_media_worker.ex @@ -9,7 +9,7 @@ defmodule Pleroma.Workers.RichMediaWorker do use Oban.Worker, queue: :background, max_attempts: 3, unique: [period: 300] - @impl Oban.Worker + @impl true def perform(%Job{args: %{"op" => "expire", "url" => url} = _args}) do Card.delete(url) end @@ -33,7 +33,7 @@ def perform(%Job{args: %{"op" => "backfill", "url" => _url} = args}) do # a slow/infinite data stream and insert a negative cache entry for the URL # We pad it by 2 seconds to be certain a slow connection is detected and we # can inject a negative cache entry for the URL - @impl Oban.Worker + @impl true def timeout(_job) do Config.get!([:rich_media, :timeout]) + :timer.seconds(2) end diff --git a/lib/pleroma/workers/scheduled_activity_worker.ex b/lib/pleroma/workers/scheduled_activity_worker.ex index ab62686f42..da386e0c32 100644 --- a/lib/pleroma/workers/scheduled_activity_worker.ex +++ b/lib/pleroma/workers/scheduled_activity_worker.ex @@ -7,7 +7,7 @@ defmodule Pleroma.Workers.ScheduledActivityWorker do The worker to post scheduled activity. """ - use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing" + use Oban.Worker, queue: :federator_outgoing, max_attempts: 5 alias Pleroma.Repo alias Pleroma.ScheduledActivity @@ -15,7 +15,7 @@ defmodule Pleroma.Workers.ScheduledActivityWorker do require Logger - @impl Oban.Worker + @impl true def perform(%Job{args: %{"activity_id" => activity_id}}) do with %ScheduledActivity{} = scheduled_activity <- find_scheduled_activity(activity_id), %User{} = user <- find_user(scheduled_activity.user_id) do @@ -37,7 +37,7 @@ def perform(%Job{args: %{"activity_id" => activity_id}}) do end end - @impl Oban.Worker + @impl true def timeout(_job), do: :timer.seconds(5) defp find_scheduled_activity(id) do diff --git a/lib/pleroma/workers/search_indexing_worker.ex b/lib/pleroma/workers/search_indexing_worker.ex index 8969ae3788..001f5254d4 100644 --- a/lib/pleroma/workers/search_indexing_worker.ex +++ b/lib/pleroma/workers/search_indexing_worker.ex @@ -1,7 +1,7 @@ defmodule Pleroma.Workers.SearchIndexingWorker do - use Pleroma.Workers.WorkerHelper, queue: "search_indexing" + use Oban.Worker, queue: :search_indexing, max_attempts: 2 - @impl Oban.Worker + @impl true alias Pleroma.Config.Getting, as: Config @@ -21,6 +21,6 @@ def perform(%Job{args: %{"op" => "remove_from_index", "object" => object_id}}) d search_module.remove_from_index(object) end - @impl Oban.Worker + @impl true def timeout(_job), do: :timer.seconds(5) end diff --git a/lib/pleroma/workers/user_refresh_worker.ex b/lib/pleroma/workers/user_refresh_worker.ex index fb90e9c9c0..222a4a8f7e 100644 --- a/lib/pleroma/workers/user_refresh_worker.ex +++ b/lib/pleroma/workers/user_refresh_worker.ex @@ -12,6 +12,6 @@ def perform(%Job{args: %{"ap_id" => ap_id}}) do User.fetch_by_ap_id(ap_id) end - @impl Oban.Worker + @impl true def timeout(_job), do: :timer.seconds(15) end diff --git a/lib/pleroma/workers/web_pusher_worker.ex b/lib/pleroma/workers/web_pusher_worker.ex index c549d3cd65..f4232d02af 100644 --- a/lib/pleroma/workers/web_pusher_worker.ex +++ b/lib/pleroma/workers/web_pusher_worker.ex @@ -7,9 +7,9 @@ defmodule Pleroma.Workers.WebPusherWorker do alias Pleroma.Repo alias Pleroma.Web.Push.Impl - use Pleroma.Workers.WorkerHelper, queue: "web_push" + use Oban.Worker, queue: :web_push - @impl Oban.Worker + @impl true def perform(%Job{args: %{"op" => "web_push", "notification_id" => notification_id}}) do notification = Notification @@ -20,6 +20,6 @@ def perform(%Job{args: %{"op" => "web_push", "notification_id" => notification_i |> Enum.each(&Impl.deliver(&1)) end - @impl Oban.Worker + @impl true def timeout(_job), do: :timer.seconds(5) end diff --git a/lib/pleroma/workers/worker_helper.ex b/lib/pleroma/workers/worker_helper.ex deleted file mode 100644 index 1d20cbd893..0000000000 --- a/lib/pleroma/workers/worker_helper.ex +++ /dev/null @@ -1,48 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2022 Pleroma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Workers.WorkerHelper do - alias Pleroma.Config - alias Pleroma.Workers.WorkerHelper - - def worker_args(queue) do - case Config.get([:workers, :retries, queue]) do - nil -> [] - max_attempts -> [max_attempts: max_attempts] - end - end - - def sidekiq_backoff(attempt, pow \\ 4, base_backoff \\ 15) do - backoff = - :math.pow(attempt, pow) + - base_backoff + - :rand.uniform(2 * base_backoff) * attempt - - trunc(backoff) - end - - defmacro __using__(opts) do - caller_module = __CALLER__.module - queue = Keyword.fetch!(opts, :queue) - - quote do - # Note: `max_attempts` is intended to be overridden in `new/2` call - use Oban.Worker, - queue: unquote(queue), - max_attempts: 1 - - alias Oban.Job - - def enqueue(op, params, worker_args \\ []) do - params = Map.merge(%{"op" => op}, params) - queue_atom = String.to_atom(unquote(queue)) - worker_args = worker_args ++ WorkerHelper.worker_args(queue_atom) - - unquote(caller_module) - |> apply(:new, [params, worker_args]) - |> Oban.insert() - end - end - end -end diff --git a/test/mix/tasks/pleroma/database_test.exs b/test/mix/tasks/pleroma/database_test.exs index a51a3bf3dc..96a9255288 100644 --- a/test/mix/tasks/pleroma/database_test.exs +++ b/test/mix/tasks/pleroma/database_test.exs @@ -623,10 +623,12 @@ test "it adds to expiration old statuses" do expires_at = DateTime.add(DateTime.utc_now(), 60 * 61) - Pleroma.Workers.PurgeExpiredActivity.enqueue(%{ - activity_id: activity_id3, - expires_at: expires_at - }) + Pleroma.Workers.PurgeExpiredActivity.enqueue( + %{ + activity_id: activity_id3 + }, + scheduled_at: expires_at + ) Mix.Tasks.Pleroma.Database.run(["ensure_expiration"]) diff --git a/test/pleroma/workers/purge_expired_activity_test.exs b/test/pleroma/workers/purge_expired_activity_test.exs index 040ff6a51c..ea563d3d31 100644 --- a/test/pleroma/workers/purge_expired_activity_test.exs +++ b/test/pleroma/workers/purge_expired_activity_test.exs @@ -14,10 +14,12 @@ test "enqueue job" do activity = insert(:note_activity) assert {:ok, _} = - PurgeExpiredActivity.enqueue(%{ - activity_id: activity.id, - expires_at: DateTime.add(DateTime.utc_now(), 3601) - }) + PurgeExpiredActivity.enqueue( + %{ + activity_id: activity.id + }, + scheduled_at: DateTime.add(DateTime.utc_now(), 3601) + ) assert_enqueued( worker: Pleroma.Workers.PurgeExpiredActivity, @@ -34,10 +36,12 @@ test "error if user was not found" do activity = insert(:note_activity) assert {:ok, _} = - PurgeExpiredActivity.enqueue(%{ - activity_id: activity.id, - expires_at: DateTime.add(DateTime.utc_now(), 3601) - }) + PurgeExpiredActivity.enqueue( + %{ + activity_id: activity.id + }, + scheduled_at: DateTime.add(DateTime.utc_now(), 3601) + ) user = Pleroma.User.get_by_ap_id(activity.actor) Pleroma.Repo.delete(user) @@ -48,10 +52,12 @@ test "error if user was not found" do test "error if actiivity was not found" do assert {:ok, _} = - PurgeExpiredActivity.enqueue(%{ - activity_id: "some_id", - expires_at: DateTime.add(DateTime.utc_now(), 3601) - }) + PurgeExpiredActivity.enqueue( + %{ + activity_id: "some_id" + }, + scheduled_at: DateTime.add(DateTime.utc_now(), 3601) + ) assert {:cancel, :activity_not_found} = perform_job(Pleroma.Workers.PurgeExpiredActivity, %{activity_id: "some_if"}) From 1d3a92be1101b04d1b69b8b3fd030d5d06f4dbea Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Fri, 28 Jun 2024 12:06:43 -0400 Subject: [PATCH 2/3] Remove :workers config from ConfigDB --- config/description.exs | 17 ----------------- ...240628160536_deprecate_config_db_workers.exs | 7 +++++++ 2 files changed, 7 insertions(+), 17 deletions(-) create mode 100644 priv/repo/migrations/20240628160536_deprecate_config_db_workers.exs diff --git a/config/description.exs b/config/description.exs index 2809e91307..15faecb38c 100644 --- a/config/description.exs +++ b/config/description.exs @@ -2013,23 +2013,6 @@ } ] }, - %{ - group: :pleroma, - key: :workers, - type: :group, - description: "Includes custom worker options not interpretable directly by `Oban`", - children: [ - %{ - key: :retries, - type: {:keyword, :integer}, - description: "Max retry attempts for failed jobs, per `Oban` queue", - suggestions: [ - federator_incoming: 5, - federator_outgoing: 5 - ] - } - ] - }, %{ group: :pleroma, key: Pleroma.Web.Metadata, diff --git a/priv/repo/migrations/20240628160536_deprecate_config_db_workers.exs b/priv/repo/migrations/20240628160536_deprecate_config_db_workers.exs new file mode 100644 index 0000000000..549dd22e9e --- /dev/null +++ b/priv/repo/migrations/20240628160536_deprecate_config_db_workers.exs @@ -0,0 +1,7 @@ +defmodule Pleroma.Repo.Migrations.DeprecateConfigDBWorkers do + use Ecto.Migration + + def change do + execute("DELETE FROM config WHERE config.group = ':workers'") + end +end From d56b889cf1f530364ecd10836f8bd11b67eccb47 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Fri, 28 Jun 2024 12:21:27 -0400 Subject: [PATCH 3/3] Update changelog --- changelog.d/workerhelper.change | 1 + changelog.d/workerhelper.skip | 0 2 files changed, 1 insertion(+) create mode 100644 changelog.d/workerhelper.change delete mode 100644 changelog.d/workerhelper.skip diff --git a/changelog.d/workerhelper.change b/changelog.d/workerhelper.change new file mode 100644 index 0000000000..7a20c4af82 --- /dev/null +++ b/changelog.d/workerhelper.change @@ -0,0 +1 @@ +Worker configuration is no longer available. This only affects custom max_retries values for Oban queues. diff --git a/changelog.d/workerhelper.skip b/changelog.d/workerhelper.skip deleted file mode 100644 index e69de29bb2..0000000000