Merge branch 'remove/workerhelper' into 'develop'

Remove WorkerHelper

See merge request pleroma/pleroma!4166
This commit is contained in:
feld 2024-08-07 13:26:41 +00:00
commit a2490ddd9f
43 changed files with 208 additions and 228 deletions

View file

@ -0,0 +1 @@
Worker configuration is no longer available. This only affects custom max_retries values for Oban queues.

View file

@ -600,13 +600,6 @@
{"0 0 * * *", Pleroma.Workers.Cron.NewUsersDigestWorker} {"0 0 * * *", Pleroma.Workers.Cron.NewUsersDigestWorker}
] ]
config :pleroma, :workers,
retries: [
federator_incoming: 5,
federator_outgoing: 5,
search_indexing: 2
]
config :pleroma, Pleroma.Formatter, config :pleroma, Pleroma.Formatter,
class: false, class: false,
rel: "ugc", rel: "ugc",

View file

@ -2013,23 +2013,6 @@
} }
] ]
}, },
%{
group: :pleroma,
key: :workers,
type: :group,
description: "Includes custom worker options not interpretable directly by `Oban`",
children: [
%{
key: :retries,
type: {:keyword, :integer},
description: "Max retry attempts for failed jobs, per `Oban` queue",
suggestions: [
federator_incoming: 5,
federator_outgoing: 5
]
}
]
},
%{ %{
group: :pleroma, group: :pleroma,
key: Pleroma.Web.Metadata, key: Pleroma.Web.Metadata,

View file

@ -295,10 +295,12 @@ def run(["ensure_expiration"]) do
|> DateTime.from_naive!("Etc/UTC") |> DateTime.from_naive!("Etc/UTC")
|> Timex.shift(days: days) |> Timex.shift(days: days)
Pleroma.Workers.PurgeExpiredActivity.enqueue(%{ Pleroma.Workers.PurgeExpiredActivity.enqueue(
activity_id: activity.id, %{
expires_at: expires_at activity_id: activity.id
}) },
scheduled_at: expires_at
)
end) end)
end) end)
|> Stream.run() |> Stream.run()

View file

@ -25,7 +25,8 @@ def deliver_async(email, config \\ []) do
|> :erlang.term_to_binary() |> :erlang.term_to_binary()
|> Base.encode64() |> Base.encode64()
MailerWorker.enqueue("email", %{"encoded_email" => encoded_email, "config" => config}) MailerWorker.new(%{"op" => "email", "encoded_email" => encoded_email, "config" => config})
|> Oban.insert()
end end
@doc "callback to perform send email from queue" @doc "callback to perform send email from queue"

View file

@ -133,10 +133,13 @@ defp maybe_add_expires_at(changeset, %{expires_in: nil}) do
defp maybe_add_expires_at(changeset, _), do: changeset defp maybe_add_expires_at(changeset, _), do: changeset
defp maybe_add_expiration_job(%{expires_at: %NaiveDateTime{} = expires_at} = filter) do defp maybe_add_expiration_job(%{expires_at: %NaiveDateTime{} = expires_at} = filter) do
Pleroma.Workers.PurgeExpiredFilter.enqueue(%{ Pleroma.Workers.PurgeExpiredFilter.new(
filter_id: filter.id, %{
expires_at: DateTime.from_naive!(expires_at, "Etc/UTC") filter_id: filter.id
}) },
scheduled_at: DateTime.from_naive!(expires_at, "Etc/UTC")
)
|> Oban.insert()
end end
defp maybe_add_expiration_job(_), do: {:ok, nil} defp maybe_add_expiration_job(_), do: {:ok, nil}

View file

@ -297,7 +297,8 @@ defp scrape_metadata(%URI{} = instance_uri) do
all of those users' activities and notifications. all of those users' activities and notifications.
""" """
def delete_users_and_activities(host) when is_binary(host) do def delete_users_and_activities(host) when is_binary(host) do
DeleteWorker.enqueue("delete_instance", %{"host" => host}) DeleteWorker.new(%{"op" => "delete_instance", "host" => host})
|> Oban.insert()
end end
def perform(:delete_instance, host) when is_binary(host) do def perform(:delete_instance, host) when is_binary(host) do

View file

@ -52,11 +52,14 @@ defp expired?(%__MODULE__{valid_until: valid_until}) do
@spec create(User.t(), Authorization.t() | nil) :: {:ok, t()} | {:error, Ecto.Changeset.t()} @spec create(User.t(), Authorization.t() | nil) :: {:ok, t()} | {:error, Ecto.Changeset.t()}
def create(user, authorization \\ nil) do def create(user, authorization \\ nil) do
with {:ok, token} <- do_create(user, authorization) do with {:ok, token} <- do_create(user, authorization) do
Pleroma.Workers.PurgeExpiredToken.enqueue(%{ Pleroma.Workers.PurgeExpiredToken.new(
token_id: token.id, %{
valid_until: DateTime.from_naive!(token.valid_until, "Etc/UTC"), token_id: token.id,
mod: __MODULE__ mod: __MODULE__
}) },
scheduled_at: DateTime.from_naive!(token.valid_until, "Etc/UTC")
)
|> Oban.insert()
{:ok, token} {:ok, token}
end end

View file

@ -255,7 +255,8 @@ def delete(%Object{data: %{"id" => id}} = object) do
@spec cleanup_attachments(boolean(), Object.t()) :: @spec cleanup_attachments(boolean(), Object.t()) ::
{:ok, Oban.Job.t() | nil} {:ok, Oban.Job.t() | nil}
def cleanup_attachments(true, %Object{} = object) do def cleanup_attachments(true, %Object{} = object) do
AttachmentsCleanupWorker.enqueue("cleanup_attachments", %{"object" => object}) AttachmentsCleanupWorker.new(%{"op" => "cleanup_attachments", "object" => object})
|> Oban.insert()
end end
def cleanup_attachments(_, _), do: {:ok, nil} def cleanup_attachments(_, _), do: {:ok, nil}

View file

@ -2,11 +2,13 @@ defmodule Pleroma.Search do
alias Pleroma.Workers.SearchIndexingWorker alias Pleroma.Workers.SearchIndexingWorker
def add_to_index(%Pleroma.Activity{id: activity_id}) do def add_to_index(%Pleroma.Activity{id: activity_id}) do
SearchIndexingWorker.enqueue("add_to_index", %{"activity" => activity_id}) SearchIndexingWorker.new(%{"op" => "add_to_index", "activity" => activity_id})
|> Oban.insert()
end end
def remove_from_index(%Pleroma.Object{id: object_id}) do def remove_from_index(%Pleroma.Object{id: object_id}) do
SearchIndexingWorker.enqueue("remove_from_index", %{"object" => object_id}) SearchIndexingWorker.new(%{"op" => "remove_from_index", "object" => object_id})
|> Oban.insert()
end end
def search(query, options) do def search(query, options) do

View file

@ -736,7 +736,8 @@ def update_password_reset_pending(user, value) do
end end
def force_password_reset_async(user) do def force_password_reset_async(user) do
BackgroundWorker.enqueue("force_password_reset", %{"user_id" => user.id}) BackgroundWorker.new(%{"op" => "force_password_reset", "user_id" => user.id})
|> Oban.insert()
end end
@spec force_password_reset(User.t()) :: {:ok, User.t()} | {:error, Ecto.Changeset.t()} @spec force_password_reset(User.t()) :: {:ok, User.t()} | {:error, Ecto.Changeset.t()}
@ -1218,7 +1219,8 @@ def update_and_set_cache(struct, params) do
def update_and_set_cache(changeset) do def update_and_set_cache(changeset) do
with {:ok, user} <- Repo.update(changeset, stale_error_field: :id) do with {:ok, user} <- Repo.update(changeset, stale_error_field: :id) do
if get_change(changeset, :raw_fields) do if get_change(changeset, :raw_fields) do
BackgroundWorker.enqueue("verify_fields_links", %{"user_id" => user.id}) BackgroundWorker.new(%{"op" => "verify_fields_links", "user_id" => user.id})
|> Oban.insert()
end end
set_cache(user) set_cache(user)
@ -1589,11 +1591,11 @@ def mute(%User{} = muter, %User{} = mutee, params \\ %{}) do
)) || )) ||
{:ok, nil} do {:ok, nil} do
if duration > 0 do if duration > 0 do
Pleroma.Workers.MuteExpireWorker.enqueue( Pleroma.Workers.MuteExpireWorker.new(
"unmute_user", %{"op" => "unmute_user", "muter_id" => muter.id, "mutee_id" => mutee.id},
%{"muter_id" => muter.id, "mutee_id" => mutee.id},
scheduled_at: expires_at scheduled_at: expires_at
) )
|> Oban.insert()
end end
@cachex.del(:user_cache, "muted_users_ap_ids:#{muter.ap_id}") @cachex.del(:user_cache, "muted_users_ap_ids:#{muter.ap_id}")
@ -1836,7 +1838,8 @@ defp maybe_filter_on_ap_id(query, ap_ids) when is_list(ap_ids) do
defp maybe_filter_on_ap_id(query, _ap_ids), do: query defp maybe_filter_on_ap_id(query, _ap_ids), do: query
def set_activation_async(user, status \\ true) do def set_activation_async(user, status \\ true) do
BackgroundWorker.enqueue("user_activation", %{"user_id" => user.id, "status" => status}) BackgroundWorker.new(%{"op" => "user_activation", "user_id" => user.id, "status" => status})
|> Oban.insert()
end end
@spec set_activation([User.t()], boolean()) :: {:ok, User.t()} | {:error, Ecto.Changeset.t()} @spec set_activation([User.t()], boolean()) :: {:ok, User.t()} | {:error, Ecto.Changeset.t()}
@ -1983,7 +1986,9 @@ def delete(users) when is_list(users) do
def delete(%User{} = user) do def delete(%User{} = user) do
# Purge the user immediately # Purge the user immediately
purge(user) purge(user)
DeleteWorker.enqueue("delete_user", %{"user_id" => user.id})
DeleteWorker.new(%{"op" => "delete_user", "user_id" => user.id})
|> Oban.insert()
end end
# *Actually* delete the user from the DB # *Actually* delete the user from the DB

View file

@ -63,23 +63,29 @@ defp handle_error(op, user_id, error) do
end end
def blocks_import(%User{} = blocker, [_ | _] = identifiers) do def blocks_import(%User{} = blocker, [_ | _] = identifiers) do
BackgroundWorker.enqueue( BackgroundWorker.new(%{
"blocks_import", "op" => "blocks_import",
%{"user_id" => blocker.id, "identifiers" => identifiers} "user_id" => blocker.id,
) "identifiers" => identifiers
})
|> Oban.insert()
end end
def follow_import(%User{} = follower, [_ | _] = identifiers) do def follow_import(%User{} = follower, [_ | _] = identifiers) do
BackgroundWorker.enqueue( BackgroundWorker.new(%{
"follow_import", "op" => "follow_import",
%{"user_id" => follower.id, "identifiers" => identifiers} "user_id" => follower.id,
) "identifiers" => identifiers
})
|> Oban.insert()
end end
def mutes_import(%User{} = user, [_ | _] = identifiers) do def mutes_import(%User{} = user, [_ | _] = identifiers) do
BackgroundWorker.enqueue( BackgroundWorker.new(%{
"mutes_import", "op" => "mutes_import",
%{"user_id" => user.id, "identifiers" => identifiers} "user_id" => user.id,
) "identifiers" => identifiers
})
|> Oban.insert()
end end
end end

View file

@ -222,10 +222,12 @@ defp maybe_create_activity_expiration(
%{data: %{"expires_at" => %DateTime{} = expires_at}} = activity %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
) do ) do
with {:ok, _job} <- with {:ok, _job} <-
Pleroma.Workers.PurgeExpiredActivity.enqueue(%{ Pleroma.Workers.PurgeExpiredActivity.enqueue(
activity_id: activity.id, %{
expires_at: expires_at activity_id: activity.id
}) do },
scheduled_at: expires_at
) do
{:ok, activity} {:ok, activity}
end end
end end
@ -446,10 +448,12 @@ def move(%User{} = origin, %User{} = target, local \\ true) do
_ <- notify_and_stream(activity) do _ <- notify_and_stream(activity) do
maybe_federate(activity) maybe_federate(activity)
BackgroundWorker.enqueue("move_following", %{ BackgroundWorker.new(%{
"op" => "move_following",
"origin_id" => origin.id, "origin_id" => origin.id,
"target_id" => target.id "target_id" => target.id
}) })
|> Oban.insert()
{:ok, activity} {:ok, activity}
else else
@ -1797,10 +1801,12 @@ def enqueue_pin_fetches(%{pinned_objects: pins}) do
# enqueue a task to fetch all pinned objects # enqueue a task to fetch all pinned objects
Enum.each(pins, fn {ap_id, _} -> Enum.each(pins, fn {ap_id, _} ->
if is_nil(Object.get_cached_by_ap_id(ap_id)) do if is_nil(Object.get_cached_by_ap_id(ap_id)) do
Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{ Pleroma.Workers.RemoteFetcherWorker.new(%{
"op" => "fetch_remote",
"id" => ap_id, "id" => ap_id,
"depth" => 1 "depth" => 1
}) })
|> Oban.insert()
end end
end) end)
end end

View file

@ -31,11 +31,11 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
""" """
@spec enqueue_one(map(), Keyword.t()) :: {:ok, %Oban.Job{}} @spec enqueue_one(map(), Keyword.t()) :: {:ok, %Oban.Job{}}
def enqueue_one(%{} = params, worker_args \\ []) do def enqueue_one(%{} = params, worker_args \\ []) do
PublisherWorker.enqueue( PublisherWorker.new(
"publish_one", %{"op" => "publish_one", "params" => params},
%{"params" => params},
worker_args worker_args
) )
|> Oban.insert()
end end
@doc """ @doc """

View file

@ -223,10 +223,12 @@ def handle(%{data: %{"type" => "Create"}} = activity, meta) do
if Pleroma.Web.Federator.allowed_thread_distance?(reply_depth) and if Pleroma.Web.Federator.allowed_thread_distance?(reply_depth) and
object.data["replies"] != nil do object.data["replies"] != nil do
for reply_id <- object.data["replies"] do for reply_id <- object.data["replies"] do
Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{ Pleroma.Workers.RemoteFetcherWorker.new(%{
"op" => "fetch_remote",
"id" => reply_id, "id" => reply_id,
"depth" => reply_depth "depth" => reply_depth
}) })
|> Oban.insert()
end end
end end
@ -410,10 +412,12 @@ def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
{:ok, expires_at} = {:ok, expires_at} =
Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(meta[:expires_at]) Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(meta[:expires_at])
Pleroma.Workers.PurgeExpiredActivity.enqueue(%{ Pleroma.Workers.PurgeExpiredActivity.enqueue(
activity_id: meta[:activity_id], %{
expires_at: expires_at activity_id: meta[:activity_id]
}) },
scheduled_at: expires_at
)
end end
{:ok, object, meta} {:ok, object, meta}

View file

@ -559,11 +559,11 @@ def add_mute(activity, user, params \\ %{}) do
with {:ok, _} <- ThreadMute.add_mute(user.id, activity.data["context"]), with {:ok, _} <- ThreadMute.add_mute(user.id, activity.data["context"]),
_ <- Pleroma.Notification.mark_context_as_read(user, activity.data["context"]) do _ <- Pleroma.Notification.mark_context_as_read(user, activity.data["context"]) do
if expires_in > 0 do if expires_in > 0 do
Pleroma.Workers.MuteExpireWorker.enqueue( Pleroma.Workers.MuteExpireWorker.new(
"unmute_conversation", %{"op" => "unmute_conversation", "user_id" => user.id, "activity_id" => activity.id},
%{"user_id" => user.id, "activity_id" => activity.id},
schedule_in: expires_in schedule_in: expires_in
) )
|> Oban.insert()
end end
{:ok, activity} {:ok, activity}

View file

@ -35,22 +35,30 @@ def allowed_thread_distance?(distance) do
end end
# Client API # Client API
def incoming_ap_doc(%{params: _params, req_headers: _req_headers} = args) do def incoming_ap_doc(%{params: params, req_headers: req_headers}) do
job_args = Enum.into(args, %{}, fn {k, v} -> {Atom.to_string(k), v} end) ReceiverWorker.new(
%{
ReceiverWorker.enqueue( "op" => "incoming_ap_doc",
"incoming_ap_doc", "req_headers" => req_headers,
Map.put(job_args, "timeout", :timer.seconds(20)), "params" => params,
"timeout" => :timer.seconds(20)
},
priority: 2 priority: 2
) )
|> Oban.insert()
end end
def incoming_ap_doc(%{"type" => "Delete"} = params) do def incoming_ap_doc(%{"type" => "Delete"} = params) do
ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params}, priority: 3, queue: :slow) ReceiverWorker.new(%{"op" => "incoming_ap_doc", "params" => params},
priority: 3,
queue: :slow
)
|> Oban.insert()
end end
def incoming_ap_doc(params) do def incoming_ap_doc(params) do
ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params}) ReceiverWorker.new(%{"op" => "incoming_ap_doc", "params" => params})
|> Oban.insert()
end end
@impl true @impl true
@ -60,9 +68,10 @@ def publish(%{id: "pleroma:fakeid"} = activity) do
@impl true @impl true
def publish(%Pleroma.Activity{data: %{"type" => type}} = activity) do def publish(%Pleroma.Activity{data: %{"type" => type}} = activity) do
PublisherWorker.enqueue("publish", %{"activity_id" => activity.id}, PublisherWorker.new(%{"op" => "publish", "activity_id" => activity.id},
priority: publish_priority(type) priority: publish_priority(type)
) )
|> Oban.insert()
end end
defp publish_priority("Delete"), do: 3 defp publish_priority("Delete"), do: 3

View file

@ -100,11 +100,10 @@ defp put_valid_until(changeset, attrs) do
def create(%App{} = app, %User{} = user, attrs \\ %{}) do def create(%App{} = app, %User{} = user, attrs \\ %{}) do
with {:ok, token} <- do_create(app, user, attrs) do with {:ok, token} <- do_create(app, user, attrs) do
if Pleroma.Config.get([:oauth2, :clean_expired_tokens]) do if Pleroma.Config.get([:oauth2, :clean_expired_tokens]) do
Pleroma.Workers.PurgeExpiredToken.enqueue(%{ Pleroma.Workers.PurgeExpiredToken.new(%{token_id: token.id, mod: __MODULE__},
token_id: token.id, scheduled_at: DateTime.from_naive!(token.valid_until, "Etc/UTC")
valid_until: DateTime.from_naive!(token.valid_until, "Etc/UTC"), )
mod: __MODULE__ |> Oban.insert()
})
end end
{:ok, token} {:ok, token}

View file

@ -28,6 +28,6 @@ def enabled, do: match?([subject: _, public_key: _, private_key: _], vapid_confi
@spec send(Pleroma.Notification.t()) :: @spec send(Pleroma.Notification.t()) ::
{:ok, Oban.Job.t()} | {:error, Oban.Job.changeset() | term()} {:ok, Oban.Job.t()} | {:error, Oban.Job.changeset() | term()}
def send(notification) do def send(notification) do
WebPusherWorker.enqueue("web_push", %{"notification_id" => notification.id}) WebPusherWorker.new(%{"op" => "web_push", "notification_id" => notification.id})
end end
end end

View file

@ -8,9 +8,9 @@ defmodule Pleroma.Workers.AttachmentsCleanupWorker do
alias Pleroma.Object alias Pleroma.Object
alias Pleroma.Repo alias Pleroma.Repo
use Pleroma.Workers.WorkerHelper, queue: "slow" use Oban.Worker, queue: :slow
@impl Oban.Worker @impl true
def perform(%Job{ def perform(%Job{
args: %{ args: %{
"op" => "cleanup_attachments", "op" => "cleanup_attachments",
@ -31,7 +31,7 @@ def perform(%Job{
def perform(%Job{args: %{"op" => "cleanup_attachments", "object" => _object}}), do: {:ok, :skip} def perform(%Job{args: %{"op" => "cleanup_attachments", "object" => _object}}), do: {:ok, :skip}
@impl Oban.Worker @impl true
def timeout(_job), do: :timer.seconds(900) def timeout(_job), do: :timer.seconds(900)
defp do_clean({object_ids, attachment_urls}) do defp do_clean({object_ids, attachment_urls}) do

View file

@ -5,9 +5,9 @@
defmodule Pleroma.Workers.BackgroundWorker do defmodule Pleroma.Workers.BackgroundWorker do
alias Pleroma.User alias Pleroma.User
use Pleroma.Workers.WorkerHelper, queue: "background" use Oban.Worker, queue: :background
@impl Oban.Worker @impl true
def perform(%Job{args: %{"op" => "user_activation", "user_id" => user_id, "status" => status}}) do def perform(%Job{args: %{"op" => "user_activation", "user_id" => user_id, "status" => status}}) do
user = User.get_cached_by_id(user_id) user = User.get_cached_by_id(user_id)
@ -39,6 +39,6 @@ def perform(%Job{args: %{"op" => "verify_fields_links", "user_id" => user_id}})
User.perform(:verify_fields_links, user) User.perform(:verify_fields_links, user)
end end
@impl Oban.Worker @impl true
def timeout(_job), do: :timer.seconds(15) def timeout(_job), do: :timer.seconds(900)
end end

View file

@ -9,7 +9,7 @@ defmodule Pleroma.Workers.BackupWorker do
alias Pleroma.Config.Getting, as: Config alias Pleroma.Config.Getting, as: Config
alias Pleroma.User.Backup alias Pleroma.User.Backup
@impl Oban.Worker @impl true
def perform(%Job{ def perform(%Job{
args: %{"op" => "process", "backup_id" => backup_id} args: %{"op" => "process", "backup_id" => backup_id}
}) do }) do
@ -32,7 +32,7 @@ def perform(%Job{args: %{"op" => "delete", "backup_id" => backup_id}}) do
end end
end end
@impl Oban.Worker @impl true
def timeout(_job), do: Config.get([Backup, :timeout], :timer.minutes(30)) def timeout(_job), do: Config.get([Backup, :timeout], :timer.minutes(30))
defp has_email?(user) do defp has_email?(user) do

View file

@ -18,7 +18,7 @@ defmodule Pleroma.Workers.Cron.DigestEmailsWorker do
require Logger require Logger
@impl Oban.Worker @impl true
def perform(_job) do def perform(_job) do
config = Config.get([:email_notifications, :digest]) config = Config.get([:email_notifications, :digest])
@ -59,6 +59,6 @@ def send_email(user) do
User.touch_last_digest_emailed_at(user) User.touch_last_digest_emailed_at(user)
end end
@impl Oban.Worker @impl true
def timeout(_job), do: :timer.seconds(5) def timeout(_job), do: :timer.seconds(5)
end end

View file

@ -9,9 +9,9 @@ defmodule Pleroma.Workers.Cron.NewUsersDigestWorker do
import Ecto.Query import Ecto.Query
use Pleroma.Workers.WorkerHelper, queue: "background" use Oban.Worker, queue: :background
@impl Oban.Worker @impl true
def perform(_job) do def perform(_job) do
if Pleroma.Config.get([Pleroma.Emails.NewUsersDigestEmail, :enabled]) do if Pleroma.Config.get([Pleroma.Emails.NewUsersDigestEmail, :enabled]) do
today = NaiveDateTime.utc_now() |> Timex.beginning_of_day() today = NaiveDateTime.utc_now() |> Timex.beginning_of_day()
@ -61,6 +61,6 @@ def perform(_job) do
:ok :ok
end end
@impl Oban.Worker @impl true
def timeout(_job), do: :timer.seconds(5) def timeout(_job), do: :timer.seconds(5)
end end

View file

@ -6,10 +6,9 @@ defmodule Pleroma.Workers.DeleteWorker do
alias Pleroma.Instances.Instance alias Pleroma.Instances.Instance
alias Pleroma.User alias Pleroma.User
use Pleroma.Workers.WorkerHelper, queue: "slow" use Oban.Worker, queue: :slow
@impl Oban.Worker
@impl true
def perform(%Job{args: %{"op" => "delete_user", "user_id" => user_id}}) do def perform(%Job{args: %{"op" => "delete_user", "user_id" => user_id}}) do
user = User.get_cached_by_id(user_id) user = User.get_cached_by_id(user_id)
User.perform(:delete, user) User.perform(:delete, user)
@ -19,6 +18,6 @@ def perform(%Job{args: %{"op" => "delete_instance", "host" => host}}) do
Instance.perform(:delete_instance, host) Instance.perform(:delete_instance, host)
end end
@impl Oban.Worker @impl true
def timeout(_job), do: :timer.seconds(900) def timeout(_job), do: :timer.seconds(900)
end end

View file

@ -3,9 +3,9 @@
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.MailerWorker do defmodule Pleroma.Workers.MailerWorker do
use Pleroma.Workers.WorkerHelper, queue: "background" use Oban.Worker, queue: :background
@impl Oban.Worker @impl true
def perform(%Job{args: %{"op" => "email", "encoded_email" => encoded_email, "config" => config}}) do def perform(%Job{args: %{"op" => "email", "encoded_email" => encoded_email, "config" => config}}) do
encoded_email encoded_email
|> Base.decode64!() |> Base.decode64!()
@ -13,6 +13,6 @@ def perform(%Job{args: %{"op" => "email", "encoded_email" => encoded_email, "con
|> Pleroma.Emails.Mailer.deliver(config) |> Pleroma.Emails.Mailer.deliver(config)
end end
@impl Oban.Worker @impl true
def timeout(_job), do: :timer.seconds(5) def timeout(_job), do: :timer.seconds(5)
end end

View file

@ -3,9 +3,9 @@
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.MuteExpireWorker do defmodule Pleroma.Workers.MuteExpireWorker do
use Pleroma.Workers.WorkerHelper, queue: "background" use Oban.Worker, queue: :background
@impl Oban.Worker @impl true
def perform(%Job{args: %{"op" => "unmute_user", "muter_id" => muter_id, "mutee_id" => mutee_id}}) do def perform(%Job{args: %{"op" => "unmute_user", "muter_id" => muter_id, "mutee_id" => mutee_id}}) do
Pleroma.User.unmute(muter_id, mutee_id) Pleroma.User.unmute(muter_id, mutee_id)
:ok :ok
@ -18,6 +18,6 @@ def perform(%Job{
:ok :ok
end end
@impl Oban.Worker @impl true
def timeout(_job), do: :timer.seconds(5) def timeout(_job), do: :timer.seconds(5)
end end

View file

@ -6,13 +6,13 @@ defmodule Pleroma.Workers.PollWorker do
@moduledoc """ @moduledoc """
Generates notifications when a poll ends. Generates notifications when a poll ends.
""" """
use Pleroma.Workers.WorkerHelper, queue: "background" use Oban.Worker, queue: :background
alias Pleroma.Activity alias Pleroma.Activity
alias Pleroma.Notification alias Pleroma.Notification
alias Pleroma.Object alias Pleroma.Object
@impl Oban.Worker @impl true
def perform(%Job{args: %{"op" => "poll_end", "activity_id" => activity_id}}) do def perform(%Job{args: %{"op" => "poll_end", "activity_id" => activity_id}}) do
with %Activity{} = activity <- find_poll_activity(activity_id), with %Activity{} = activity <- find_poll_activity(activity_id),
{:ok, notifications} <- Notification.create_poll_notifications(activity) do {:ok, notifications} <- Notification.create_poll_notifications(activity) do
@ -23,7 +23,7 @@ def perform(%Job{args: %{"op" => "poll_end", "activity_id" => activity_id}}) do
end end
end end
@impl Oban.Worker @impl true
def timeout(_job), do: :timer.seconds(5) def timeout(_job), do: :timer.seconds(5)
defp find_poll_activity(activity_id) do defp find_poll_activity(activity_id) do

View file

@ -6,13 +6,9 @@ defmodule Pleroma.Workers.PublisherWorker do
alias Pleroma.Activity alias Pleroma.Activity
alias Pleroma.Web.Federator alias Pleroma.Web.Federator
use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing" use Oban.Worker, queue: :federator_outgoing, max_attempts: 5
def backoff(%Job{attempt: attempt}) when is_integer(attempt) do @impl true
Pleroma.Workers.WorkerHelper.sidekiq_backoff(attempt, 5)
end
@impl Oban.Worker
def perform(%Job{args: %{"op" => "publish", "activity_id" => activity_id}}) do def perform(%Job{args: %{"op" => "publish", "activity_id" => activity_id}}) do
activity = Activity.get_by_id(activity_id) activity = Activity.get_by_id(activity_id)
Federator.perform(:publish, activity) Federator.perform(:publish, activity)
@ -23,6 +19,18 @@ def perform(%Job{args: %{"op" => "publish_one", "params" => params}}) do
Federator.perform(:publish_one, params) Federator.perform(:publish_one, params)
end end
@impl Oban.Worker @impl true
def timeout(_job), do: :timer.seconds(10) def timeout(_job), do: :timer.seconds(10)
@base_backoff 15
@pow 5
@impl true
def backoff(%Job{attempt: attempt}) when is_integer(attempt) do
backoff =
:math.pow(attempt, @pow) +
@base_backoff +
:rand.uniform(2 * @base_backoff) * attempt
trunc(backoff)
end
end end

View file

@ -13,16 +13,13 @@ defmodule Pleroma.Workers.PurgeExpiredActivity do
alias Pleroma.Activity alias Pleroma.Activity
@spec enqueue(map()) :: @spec enqueue(map(), list()) ::
{:ok, Oban.Job.t()} {:ok, Oban.Job.t()}
| {:error, :expired_activities_disabled} | {:error, :expired_activities_disabled}
| {:error, :expiration_too_close} | {:error, :expiration_too_close}
def enqueue(args) do def enqueue(params, worker_args) do
with true <- enabled?() do with true <- enabled?() do
{scheduled_at, args} = Map.pop(args, :expires_at) new(params, worker_args)
args
|> new(scheduled_at: scheduled_at)
|> Oban.insert() |> Oban.insert()
end end
end end
@ -35,7 +32,7 @@ def perform(%Oban.Job{args: %{"activity_id" => id}}) do
end end
end end
@impl Oban.Worker @impl true
def timeout(_job), do: :timer.seconds(5) def timeout(_job), do: :timer.seconds(5)
defp enabled? do defp enabled? do

View file

@ -31,7 +31,7 @@ def perform(%Job{args: %{"filter_id" => id}}) do
|> Repo.delete() |> Repo.delete()
end end
@impl Oban.Worker @impl true
def timeout(_job), do: :timer.seconds(5) def timeout(_job), do: :timer.seconds(5)
@spec get_expiration(pos_integer()) :: Job.t() | nil @spec get_expiration(pos_integer()) :: Job.t() | nil

View file

@ -9,16 +9,6 @@ defmodule Pleroma.Workers.PurgeExpiredToken do
use Oban.Worker, queue: :background, max_attempts: 1 use Oban.Worker, queue: :background, max_attempts: 1
@spec enqueue(%{token_id: integer(), valid_until: DateTime.t(), mod: module()}) ::
{:ok, Oban.Job.t()} | {:error, Ecto.Changeset.t()}
def enqueue(args) do
{scheduled_at, args} = Map.pop(args, :valid_until)
args
|> __MODULE__.new(scheduled_at: scheduled_at)
|> Oban.insert()
end
@impl true @impl true
def perform(%Oban.Job{args: %{"token_id" => id, "mod" => module}}) do def perform(%Oban.Job{args: %{"token_id" => id, "mod" => module}}) do
module module
@ -27,6 +17,6 @@ def perform(%Oban.Job{args: %{"token_id" => id, "mod" => module}}) do
|> Pleroma.Repo.delete() |> Pleroma.Repo.delete()
end end
@impl Oban.Worker @impl true
def timeout(_job), do: :timer.seconds(5) def timeout(_job), do: :timer.seconds(5)
end end

View file

@ -7,9 +7,9 @@ defmodule Pleroma.Workers.ReceiverWorker do
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.Federator alias Pleroma.Web.Federator
use Pleroma.Workers.WorkerHelper, queue: "federator_incoming" use Oban.Worker, queue: :federator_incoming, max_attempts: 5
@impl Oban.Worker @impl true
def perform(%Job{ def perform(%Job{
args: %{ args: %{
@ -51,7 +51,7 @@ def perform(%Job{args: %{"op" => "incoming_ap_doc", "params" => params}}) do
end end
end end
@impl Oban.Worker @impl true
def timeout(%_{args: %{"timeout" => timeout}}), do: timeout def timeout(%_{args: %{"timeout" => timeout}}), do: timeout
def timeout(_job), do: :timer.seconds(5) def timeout(_job), do: :timer.seconds(5)

View file

@ -5,9 +5,9 @@
defmodule Pleroma.Workers.RemoteFetcherWorker do defmodule Pleroma.Workers.RemoteFetcherWorker do
alias Pleroma.Object.Fetcher alias Pleroma.Object.Fetcher
use Pleroma.Workers.WorkerHelper, queue: "background" use Oban.Worker, queue: :background
@impl Oban.Worker @impl true
def perform(%Job{args: %{"op" => "fetch_remote", "id" => id} = args}) do def perform(%Job{args: %{"op" => "fetch_remote", "id" => id} = args}) do
case Fetcher.fetch_object_from_id(id, depth: args["depth"]) do case Fetcher.fetch_object_from_id(id, depth: args["depth"]) do
{:ok, _object} -> {:ok, _object} ->
@ -30,6 +30,6 @@ def perform(%Job{args: %{"op" => "fetch_remote", "id" => id} = args}) do
end end
end end
@impl Oban.Worker @impl true
def timeout(_job), do: :timer.seconds(15) def timeout(_job), do: :timer.seconds(15)
end end

View file

@ -9,7 +9,7 @@ defmodule Pleroma.Workers.RichMediaWorker do
use Oban.Worker, queue: :background, max_attempts: 3, unique: [period: 300] use Oban.Worker, queue: :background, max_attempts: 3, unique: [period: 300]
@impl Oban.Worker @impl true
def perform(%Job{args: %{"op" => "expire", "url" => url} = _args}) do def perform(%Job{args: %{"op" => "expire", "url" => url} = _args}) do
Card.delete(url) Card.delete(url)
end end
@ -33,7 +33,7 @@ def perform(%Job{args: %{"op" => "backfill", "url" => _url} = args}) do
# a slow/infinite data stream and insert a negative cache entry for the URL # a slow/infinite data stream and insert a negative cache entry for the URL
# We pad it by 2 seconds to be certain a slow connection is detected and we # We pad it by 2 seconds to be certain a slow connection is detected and we
# can inject a negative cache entry for the URL # can inject a negative cache entry for the URL
@impl Oban.Worker @impl true
def timeout(_job) do def timeout(_job) do
Config.get!([:rich_media, :timeout]) + :timer.seconds(2) Config.get!([:rich_media, :timeout]) + :timer.seconds(2)
end end

View file

@ -7,7 +7,7 @@ defmodule Pleroma.Workers.ScheduledActivityWorker do
The worker to post scheduled activity. The worker to post scheduled activity.
""" """
use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing" use Oban.Worker, queue: :federator_outgoing, max_attempts: 5
alias Pleroma.Repo alias Pleroma.Repo
alias Pleroma.ScheduledActivity alias Pleroma.ScheduledActivity
@ -15,7 +15,7 @@ defmodule Pleroma.Workers.ScheduledActivityWorker do
require Logger require Logger
@impl Oban.Worker @impl true
def perform(%Job{args: %{"activity_id" => activity_id}}) do def perform(%Job{args: %{"activity_id" => activity_id}}) do
with %ScheduledActivity{} = scheduled_activity <- find_scheduled_activity(activity_id), with %ScheduledActivity{} = scheduled_activity <- find_scheduled_activity(activity_id),
%User{} = user <- find_user(scheduled_activity.user_id) do %User{} = user <- find_user(scheduled_activity.user_id) do
@ -37,7 +37,7 @@ def perform(%Job{args: %{"activity_id" => activity_id}}) do
end end
end end
@impl Oban.Worker @impl true
def timeout(_job), do: :timer.seconds(5) def timeout(_job), do: :timer.seconds(5)
defp find_scheduled_activity(id) do defp find_scheduled_activity(id) do

View file

@ -1,7 +1,7 @@
defmodule Pleroma.Workers.SearchIndexingWorker do defmodule Pleroma.Workers.SearchIndexingWorker do
use Pleroma.Workers.WorkerHelper, queue: "search_indexing" use Oban.Worker, queue: :search_indexing, max_attempts: 2
@impl Oban.Worker @impl true
alias Pleroma.Config.Getting, as: Config alias Pleroma.Config.Getting, as: Config
@ -21,6 +21,6 @@ def perform(%Job{args: %{"op" => "remove_from_index", "object" => object_id}}) d
search_module.remove_from_index(object) search_module.remove_from_index(object)
end end
@impl Oban.Worker @impl true
def timeout(_job), do: :timer.seconds(5) def timeout(_job), do: :timer.seconds(5)
end end

View file

@ -12,6 +12,6 @@ def perform(%Job{args: %{"ap_id" => ap_id}}) do
User.fetch_by_ap_id(ap_id) User.fetch_by_ap_id(ap_id)
end end
@impl Oban.Worker @impl true
def timeout(_job), do: :timer.seconds(15) def timeout(_job), do: :timer.seconds(15)
end end

View file

@ -7,9 +7,9 @@ defmodule Pleroma.Workers.WebPusherWorker do
alias Pleroma.Repo alias Pleroma.Repo
alias Pleroma.Web.Push.Impl alias Pleroma.Web.Push.Impl
use Pleroma.Workers.WorkerHelper, queue: "web_push" use Oban.Worker, queue: :web_push
@impl Oban.Worker @impl true
def perform(%Job{args: %{"op" => "web_push", "notification_id" => notification_id}}) do def perform(%Job{args: %{"op" => "web_push", "notification_id" => notification_id}}) do
notification = notification =
Notification Notification
@ -20,6 +20,6 @@ def perform(%Job{args: %{"op" => "web_push", "notification_id" => notification_i
|> Enum.each(&Impl.deliver(&1)) |> Enum.each(&Impl.deliver(&1))
end end
@impl Oban.Worker @impl true
def timeout(_job), do: :timer.seconds(5) def timeout(_job), do: :timer.seconds(5)
end end

View file

@ -1,48 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.WorkerHelper do
alias Pleroma.Config
alias Pleroma.Workers.WorkerHelper
def worker_args(queue) do
case Config.get([:workers, :retries, queue]) do
nil -> []
max_attempts -> [max_attempts: max_attempts]
end
end
def sidekiq_backoff(attempt, pow \\ 4, base_backoff \\ 15) do
backoff =
:math.pow(attempt, pow) +
base_backoff +
:rand.uniform(2 * base_backoff) * attempt
trunc(backoff)
end
defmacro __using__(opts) do
caller_module = __CALLER__.module
queue = Keyword.fetch!(opts, :queue)
quote do
# Note: `max_attempts` is intended to be overridden in `new/2` call
use Oban.Worker,
queue: unquote(queue),
max_attempts: 1
alias Oban.Job
def enqueue(op, params, worker_args \\ []) do
params = Map.merge(%{"op" => op}, params)
queue_atom = String.to_atom(unquote(queue))
worker_args = worker_args ++ WorkerHelper.worker_args(queue_atom)
unquote(caller_module)
|> apply(:new, [params, worker_args])
|> Oban.insert()
end
end
end
end

View file

@ -0,0 +1,7 @@
defmodule Pleroma.Repo.Migrations.DeprecateConfigDBWorkers do
use Ecto.Migration
def change do
execute("DELETE FROM config WHERE config.group = ':workers'")
end
end

View file

@ -623,10 +623,12 @@ test "it adds to expiration old statuses" do
expires_at = DateTime.add(DateTime.utc_now(), 60 * 61) expires_at = DateTime.add(DateTime.utc_now(), 60 * 61)
Pleroma.Workers.PurgeExpiredActivity.enqueue(%{ Pleroma.Workers.PurgeExpiredActivity.enqueue(
activity_id: activity_id3, %{
expires_at: expires_at activity_id: activity_id3
}) },
scheduled_at: expires_at
)
Mix.Tasks.Pleroma.Database.run(["ensure_expiration"]) Mix.Tasks.Pleroma.Database.run(["ensure_expiration"])

View file

@ -14,10 +14,12 @@ test "enqueue job" do
activity = insert(:note_activity) activity = insert(:note_activity)
assert {:ok, _} = assert {:ok, _} =
PurgeExpiredActivity.enqueue(%{ PurgeExpiredActivity.enqueue(
activity_id: activity.id, %{
expires_at: DateTime.add(DateTime.utc_now(), 3601) activity_id: activity.id
}) },
scheduled_at: DateTime.add(DateTime.utc_now(), 3601)
)
assert_enqueued( assert_enqueued(
worker: Pleroma.Workers.PurgeExpiredActivity, worker: Pleroma.Workers.PurgeExpiredActivity,
@ -34,10 +36,12 @@ test "error if user was not found" do
activity = insert(:note_activity) activity = insert(:note_activity)
assert {:ok, _} = assert {:ok, _} =
PurgeExpiredActivity.enqueue(%{ PurgeExpiredActivity.enqueue(
activity_id: activity.id, %{
expires_at: DateTime.add(DateTime.utc_now(), 3601) activity_id: activity.id
}) },
scheduled_at: DateTime.add(DateTime.utc_now(), 3601)
)
user = Pleroma.User.get_by_ap_id(activity.actor) user = Pleroma.User.get_by_ap_id(activity.actor)
Pleroma.Repo.delete(user) Pleroma.Repo.delete(user)
@ -48,10 +52,12 @@ test "error if user was not found" do
test "error if actiivity was not found" do test "error if actiivity was not found" do
assert {:ok, _} = assert {:ok, _} =
PurgeExpiredActivity.enqueue(%{ PurgeExpiredActivity.enqueue(
activity_id: "some_id", %{
expires_at: DateTime.add(DateTime.utc_now(), 3601) activity_id: "some_id"
}) },
scheduled_at: DateTime.add(DateTime.utc_now(), 3601)
)
assert {:cancel, :activity_not_found} = assert {:cancel, :activity_not_found} =
perform_job(Pleroma.Workers.PurgeExpiredActivity, %{activity_id: "some_if"}) perform_job(Pleroma.Workers.PurgeExpiredActivity, %{activity_id: "some_if"})