From 8f285a787f7af0436de18ac140712801a69eff05 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Fri, 21 Jun 2024 13:24:47 -0400 Subject: [PATCH 01/23] Refactor backups to be fully controlled by Oban --- changelog.d/backups-refactor.change | 1 + config/test.exs | 2 + lib/pleroma/ecto_enums.ex | 8 - lib/pleroma/emails/user_email.ex | 35 +- lib/pleroma/user/backup.ex | 366 ++++++++---------- .../controllers/admin_api_controller.ex | 5 +- .../operations/pleroma_backup_operation.ex | 10 +- .../controllers/backup_controller.ex | 2 +- .../web/pleroma_api/views/backup_view.ex | 10 - lib/pleroma/workers/backup_worker.ex | 47 +-- .../20240622175346_backup_refactor.exs | 19 + 11 files changed, 209 insertions(+), 296 deletions(-) create mode 100644 changelog.d/backups-refactor.change create mode 100644 priv/repo/migrations/20240622175346_backup_refactor.exs diff --git a/changelog.d/backups-refactor.change b/changelog.d/backups-refactor.change new file mode 100644 index 0000000000..47fc741385 --- /dev/null +++ b/changelog.d/backups-refactor.change @@ -0,0 +1 @@ +Refactor the user backups code and improve test coverage diff --git a/config/test.exs b/config/test.exs index 410b9bbabb..2a007a22dd 100644 --- a/config/test.exs +++ b/config/test.exs @@ -187,6 +187,8 @@ config :pleroma, Pleroma.Web.RichMedia.Backfill, stream_out: Pleroma.Web.ActivityPub.ActivityPubMock +config :pleroma, Pleroma.User.Backup, tempdir: "test/tmp" + if File.exists?("./config/test.secret.exs") do import_config "test.secret.exs" else diff --git a/lib/pleroma/ecto_enums.ex b/lib/pleroma/ecto_enums.ex index b346b39d6e..a4890b489a 100644 --- a/lib/pleroma/ecto_enums.ex +++ b/lib/pleroma/ecto_enums.ex @@ -27,11 +27,3 @@ failed: 4, manual: 5 ) - -defenum(Pleroma.User.Backup.State, - pending: 1, - running: 2, - complete: 3, - failed: 4, - invalid: 5 -) diff --git a/lib/pleroma/emails/user_email.ex b/lib/pleroma/emails/user_email.ex index 95b963764e..10d89d2f34 100644 --- a/lib/pleroma/emails/user_email.ex +++ b/lib/pleroma/emails/user_email.ex @@ -345,37 +345,22 @@ def unsubscribe_url(user, notifications_type) do Router.Helpers.subscription_url(Endpoint, :unsubscribe, token) end - def backup_is_ready_email(backup, admin_user_id \\ nil) do + def backup_is_ready_email(backup) do %{user: user} = Pleroma.Repo.preload(backup, :user) Gettext.with_locale_or_default user.language do download_url = Pleroma.Web.PleromaAPI.BackupView.download_url(backup) html_body = - if is_nil(admin_user_id) do - Gettext.dpgettext( - "static_pages", - "account archive email body - self-requested", - """ -

You requested a full backup of your Pleroma account. It's ready for download:

-

%{download_url}

- """, - download_url: download_url - ) - else - admin = Pleroma.Repo.get(User, admin_user_id) - - Gettext.dpgettext( - "static_pages", - "account archive email body - admin requested", - """ -

Admin @%{admin_nickname} requested a full backup of your Pleroma account. It's ready for download:

-

%{download_url}

- """, - admin_nickname: admin.nickname, - download_url: download_url - ) - end + Gettext.dpgettext( + "static_pages", + "account archive email body", + """ +

A full backup of your Pleroma account was requested. It's ready for download:

+

%{download_url}

+ """, + download_url: download_url + ) new() |> to(recipient(user)) diff --git a/lib/pleroma/user/backup.ex b/lib/pleroma/user/backup.ex index 1821de6674..2568089a4c 100644 --- a/lib/pleroma/user/backup.ex +++ b/lib/pleroma/user/backup.ex @@ -14,9 +14,10 @@ defmodule Pleroma.User.Backup do alias Pleroma.Activity alias Pleroma.Bookmark + alias Pleroma.Config alias Pleroma.Repo + alias Pleroma.Uploaders.Uploader alias Pleroma.User - alias Pleroma.User.Backup.State alias Pleroma.Web.ActivityPub.ActivityPub alias Pleroma.Web.ActivityPub.Transmogrifier alias Pleroma.Web.ActivityPub.UserView @@ -29,71 +30,111 @@ defmodule Pleroma.User.Backup do field(:file_name, :string) field(:file_size, :integer, default: 0) field(:processed, :boolean, default: false) - field(:state, State, default: :invalid) - field(:processed_number, :integer, default: 0) + field(:tempdir, :string) belongs_to(:user, User, type: FlakeId.Ecto.CompatType) timestamps() end - @config_impl Application.compile_env(:pleroma, [__MODULE__, :config_impl], Pleroma.Config) + @doc """ + Schedules a job to backup a user if the number of backup requests has not exceeded the limit. - def create(user, admin_id \\ nil) do - with :ok <- validate_limit(user, admin_id), - {:ok, backup} <- user |> new() |> Repo.insert() do - BackupWorker.process(backup, admin_id) + Admins can directly call new/1 and schedule_backup/1 to bypass the limit. + """ + @spec user(User.t()) :: {:ok, t()} | {:error, any()} + def user(user) do + days = Config.get([__MODULE__, :limit_days]) + + with true <- permitted?(user), + %__MODULE__{} = backup <- new(user), + {:ok, inserted_backup} <- Repo.insert(backup), + {:ok, %Oban.Job{}} <- schedule_backup(inserted_backup) do + {:ok, inserted_backup} + else + false -> + {:error, + dngettext( + "errors", + "Last export was less than a day ago", + "Last export was less than %{days} days ago", + days, + days: days + )} + + e -> + {:error, e} end end + @doc "Generates a %Backup{} for a user with a random file name" + @spec new(User.t()) :: t() def new(user) do rand_str = :crypto.strong_rand_bytes(32) |> Base.url_encode64(padding: false) datetime = Calendar.NaiveDateTime.Format.iso8601_basic(NaiveDateTime.utc_now()) name = "archive-#{user.nickname}-#{datetime}-#{rand_str}.zip" %__MODULE__{ - user_id: user.id, content_type: "application/zip", file_name: name, - state: :pending + tempdir: tempdir(), + user: user } end - def delete(backup) do - uploader = Pleroma.Config.get([Pleroma.Upload, :uploader]) + @doc "Schedules the execution of the provided backup" + @spec schedule_backup(t()) :: {:ok, Oban.Job.t()} | {:error, any()} + def schedule_backup(backup) do + with false <- is_nil(backup.id) do + %{"op" => "process", "backup_id" => backup.id} + |> BackupWorker.new() + |> Oban.insert() + else + true -> + {:error, "Backup is missing id. Please insert it into the Repo first."} + + e -> + {:error, e} + end + end + + @doc "Deletes the backup archive file and removes the database record" + @spec delete_archive(t()) :: {:ok, Ecto.Schema.t()} | {:error, Ecto.Changeset.t()} + def delete_archive(backup) do + uploader = Config.get([Pleroma.Upload, :uploader]) with :ok <- uploader.delete_file(Path.join("backups", backup.file_name)) do Repo.delete(backup) end end - defp validate_limit(_user, admin_id) when is_binary(admin_id), do: :ok + @doc "Schedules a job to delete the backup archive" + @spec schedule_delete(t()) :: {:ok, Oban.Job.t()} | {:error, any()} + def schedule_delete(backup) do + days = Config.get([__MODULE__, :purge_after_days]) + time = 60 * 60 * 24 * days + scheduled_at = Calendar.NaiveDateTime.add!(backup.inserted_at, time) - defp validate_limit(user, nil) do - case get_last(user.id) do - %__MODULE__{inserted_at: inserted_at} -> - days = Pleroma.Config.get([__MODULE__, :limit_days]) - diff = Timex.diff(NaiveDateTime.utc_now(), inserted_at, :days) + %{"op" => "delete", "backup_id" => backup.id} + |> BackupWorker.new(scheduled_at: scheduled_at) + |> Oban.insert() + end - if diff > days do - :ok - else - {:error, - dngettext( - "errors", - "Last export was less than a day ago", - "Last export was less than %{days} days ago", - days, - days: days - )} - end - - nil -> - :ok + defp permitted?(user) do + with {_, %__MODULE__{inserted_at: inserted_at}} <- {:last, get_last(user)}, + days = Config.get([__MODULE__, :limit_days]), + diff = Timex.diff(NaiveDateTime.utc_now(), inserted_at, :days), + {_, true} <- {:diff, diff > days} do + true + else + {:last, nil} -> true + {:diff, false} -> false end end - def get_last(user_id) do + @doc "Returns last backup for the provided user" + @spec get_last(User.t()) :: t() + def get_last(%User{id: user_id}) do __MODULE__ |> where(user_id: ^user_id) |> order_by(desc: :id) @@ -101,6 +142,8 @@ def get_last(user_id) do |> Repo.one() end + @doc "Lists all existing backups for a user" + @spec list(User.t()) :: [Ecto.Schema.t() | term()] def list(%User{id: user_id}) do __MODULE__ |> where(user_id: ^user_id) @@ -108,94 +151,37 @@ def list(%User{id: user_id}) do |> Repo.all() end - def remove_outdated(%__MODULE__{id: latest_id, user_id: user_id}) do - __MODULE__ - |> where(user_id: ^user_id) - |> where([b], b.id != ^latest_id) - |> Repo.all() - |> Enum.each(&BackupWorker.delete/1) - end - - def get(id), do: Repo.get(__MODULE__, id) - - defp set_state(backup, state, processed_number \\ nil) do - struct = - %{state: state} - |> Pleroma.Maps.put_if_present(:processed_number, processed_number) - - backup - |> cast(struct, [:state, :processed_number]) - |> Repo.update() - end - - def process( - %__MODULE__{} = backup, - processor_module \\ __MODULE__.Processor - ) do - set_state(backup, :running, 0) - - current_pid = self() - - task = - Task.Supervisor.async_nolink( - Pleroma.TaskSupervisor, - processor_module, - :do_process, - [backup, current_pid] - ) - - wait_backup(backup, backup.processed_number, task) - end - - defp wait_backup(backup, current_processed, task) do - wait_time = @config_impl.get([__MODULE__, :process_wait_time]) - - receive do - {:progress, new_processed} -> - total_processed = current_processed + new_processed - - set_state(backup, :running, total_processed) - wait_backup(backup, total_processed, task) - - {:DOWN, _ref, _proc, _pid, reason} -> - backup = get(backup.id) - - if reason != :normal do - Logger.error("Backup #{backup.id} process ended abnormally: #{inspect(reason)}") - - {:ok, backup} = set_state(backup, :failed) - - cleanup(backup) - - {:error, - %{ - backup: backup, - reason: :exit, - details: reason - }} - else - {:ok, backup} - end - after - wait_time -> - Logger.error( - "Backup #{backup.id} timed out after no response for #{wait_time}ms, terminating" - ) - - Task.Supervisor.terminate_child(Pleroma.TaskSupervisor, task.pid) - - {:ok, backup} = set_state(backup, :failed) - - cleanup(backup) - - {:error, - %{ - backup: backup, - reason: :timeout - }} + @doc "Schedules deletion of all but the the most recent backup" + @spec remove_outdated(User.t()) :: :ok + def remove_outdated(user) do + with %__MODULE__{} = latest_backup <- get_last(user) do + __MODULE__ + |> where(user_id: ^user.id) + |> where([b], b.id != ^latest_backup.id) + |> Repo.all() + |> Enum.each(&schedule_delete/1) + else + _ -> :ok end end + def get_by_id(id), do: Repo.get(__MODULE__, id) + + @doc "Generates changeset for %Pleroma.User.Backup{}" + @spec changeset(%__MODULE__{}, map()) :: %Ecto.Changeset{} + def changeset(backup \\ %__MODULE__{}, attrs) do + backup + |> cast(attrs, [:content_type, :file_name, :file_size, :processed, :tempdir]) + end + + @doc "Updates the backup record" + @spec update_record(%__MODULE__{}, map()) :: {:ok, %__MODULE__{}} | {:error, %Ecto.Changeset{}} + def update_record(%__MODULE__{} = backup, attrs) do + backup + |> changeset(attrs) + |> Repo.update() + end + @files [ ~c"actor.json", ~c"outbox.json", @@ -204,53 +190,66 @@ defp wait_backup(backup, current_processed, task) do ~c"followers.json", ~c"following.json" ] - @spec export(Pleroma.User.Backup.t(), pid()) :: {:ok, String.t()} | :error - def export(%__MODULE__{} = backup, caller_pid) do - backup = Repo.preload(backup, :user) - dir = backup_tempdir(backup) - with :ok <- File.mkdir(dir), - :ok <- actor(dir, backup.user, caller_pid), - :ok <- statuses(dir, backup.user, caller_pid), - :ok <- likes(dir, backup.user, caller_pid), - :ok <- bookmarks(dir, backup.user, caller_pid), - :ok <- followers(dir, backup.user, caller_pid), - :ok <- following(dir, backup.user, caller_pid), - {:ok, zip_path} <- :zip.create(backup.file_name, @files, cwd: dir), - {:ok, _} <- File.rm_rf(dir) do - {:ok, zip_path} + @spec run(t()) :: {:ok, t()} | {:error, :failed} + def run(%__MODULE__{} = backup) do + backup = Repo.preload(backup, :user) + tempfile = Path.join([backup.tempdir, backup.file_name]) + + with {_, :ok} <- {:mkdir, File.mkdir_p(backup.tempdir)}, + {_, :ok} <- {:actor, actor(backup.tempdir, backup.user)}, + {_, :ok} <- {:statuses, statuses(backup.tempdir, backup.user)}, + {_, :ok} <- {:likes, likes(backup.tempdir, backup.user)}, + {_, :ok} <- {:bookmarks, bookmarks(backup.tempdir, backup.user)}, + {_, :ok} <- {:followers, followers(backup.tempdir, backup.user)}, + {_, :ok} <- {:following, following(backup.tempdir, backup.user)}, + {_, {:ok, _zip_path}} <- + {:zip, :zip.create(to_charlist(tempfile), @files, cwd: to_charlist(backup.tempdir))}, + {_, {:ok, %File.Stat{size: zip_size}}} <- {:filestat, File.stat(tempfile)}, + {:ok, updated_backup} <- update_record(backup, %{file_size: zip_size}) do + {:ok, updated_backup} else - _ -> :error + _ -> + File.rm_rf(backup.tempdir) + {:error, :failed} end end - def dir(name) do - dir = Pleroma.Config.get([__MODULE__, :dir]) || System.tmp_dir!() - Path.join(dir, name) + defp tempdir do + case Config.get([__MODULE__, :tempdir]) do + nil -> + System.tmp_dir!() + + path -> + rand = :crypto.strong_rand_bytes(8) |> Base.url_encode64(padding: false) + Path.join([path, rand]) + end end - def upload(%__MODULE__{} = backup, zip_path) do - uploader = Pleroma.Config.get([Pleroma.Upload, :uploader]) + @doc "Uploads the completed backup and marks it as processed" + @spec upload(t()) :: {:ok, t()} + def upload(%__MODULE__{tempdir: tempdir} = backup) when is_binary(tempdir) do + uploader = Config.get([Pleroma.Upload, :uploader]) upload = %Pleroma.Upload{ name: backup.file_name, - tempfile: zip_path, + tempfile: Path.join([tempdir, backup.file_name]), content_type: backup.content_type, path: Path.join("backups", backup.file_name) } - with {:ok, _} <- Pleroma.Uploaders.Uploader.put_file(uploader, upload), - :ok <- File.rm(zip_path) do - {:ok, upload} + with {:ok, _} <- Uploader.put_file(uploader, upload), + {:ok, uploaded_backup} <- update_record(backup, %{processed: true}), + {:ok, _} <- File.rm_rf(tempdir) do + {:ok, uploaded_backup} end end - defp actor(dir, user, caller_pid) do + defp actor(dir, user) do with {:ok, json} <- UserView.render("user.json", %{user: user}) |> Map.merge(%{"likes" => "likes.json", "bookmarks" => "bookmarks.json"}) |> Jason.encode() do - send(caller_pid, {:progress, 1}) File.write(Path.join(dir, "actor.json"), json) end end @@ -269,22 +268,10 @@ defp write_header(file, name) do ) end - defp should_report?(num, chunk_size), do: rem(num, chunk_size) == 0 - - defp backup_tempdir(backup) do - name = String.trim_trailing(backup.file_name, ".zip") - dir(name) - end - - defp cleanup(backup) do - dir = backup_tempdir(backup) - File.rm_rf(dir) - end - - defp write(query, dir, name, fun, caller_pid) do + defp write(query, dir, name, fun) do path = Path.join(dir, "#{name}.json") - chunk_size = Pleroma.Config.get([__MODULE__, :process_chunk_size]) + chunk_size = Config.get([__MODULE__, :process_chunk_size]) with {:ok, file} <- File.open(path, [:write, :utf8]), :ok <- write_header(file, name) do @@ -300,10 +287,6 @@ defp write(query, dir, name, fun, caller_pid) do end), {:ok, str} <- Jason.encode(data), :ok <- IO.write(file, str <> ",\n") do - if should_report?(acc + 1, chunk_size) do - send(caller_pid, {:progress, chunk_size}) - end - acc + 1 else {:error, e} -> @@ -318,31 +301,29 @@ defp write(query, dir, name, fun, caller_pid) do end end) - send(caller_pid, {:progress, rem(total, chunk_size)}) - with :ok <- :file.pwrite(file, {:eof, -2}, "\n],\n \"totalItems\": #{total}}") do File.close(file) end end end - defp bookmarks(dir, %{id: user_id} = _user, caller_pid) do + defp bookmarks(dir, %{id: user_id} = _user) do Bookmark |> where(user_id: ^user_id) |> join(:inner, [b], activity in assoc(b, :activity)) |> select([b, a], %{id: b.id, object: fragment("(?)->>'object'", a.data)}) - |> write(dir, "bookmarks", fn a -> {:ok, a.object} end, caller_pid) + |> write(dir, "bookmarks", fn a -> {:ok, a.object} end) end - defp likes(dir, user, caller_pid) do + defp likes(dir, user) do user.ap_id |> Activity.Queries.by_actor() |> Activity.Queries.by_type("Like") |> select([like], %{id: like.id, object: fragment("(?)->>'object'", like.data)}) - |> write(dir, "likes", fn a -> {:ok, a.object} end, caller_pid) + |> write(dir, "likes", fn a -> {:ok, a.object} end) end - defp statuses(dir, user, caller_pid) do + defp statuses(dir, user) do opts = %{} |> Map.put(:type, ["Create", "Announce"]) @@ -362,52 +343,17 @@ defp statuses(dir, user, caller_pid) do with {:ok, activity} <- Transmogrifier.prepare_outgoing(a.data) do {:ok, Map.delete(activity, "@context")} end - end, - caller_pid + end ) end - defp followers(dir, user, caller_pid) do + defp followers(dir, user) do User.get_followers_query(user) - |> write(dir, "followers", fn a -> {:ok, a.ap_id} end, caller_pid) + |> write(dir, "followers", fn a -> {:ok, a.ap_id} end) end - defp following(dir, user, caller_pid) do + defp following(dir, user) do User.get_friends_query(user) - |> write(dir, "following", fn a -> {:ok, a.ap_id} end, caller_pid) - end -end - -defmodule Pleroma.User.Backup.ProcessorAPI do - @callback do_process(%Pleroma.User.Backup{}, pid()) :: - {:ok, %Pleroma.User.Backup{}} | {:error, any()} -end - -defmodule Pleroma.User.Backup.Processor do - @behaviour Pleroma.User.Backup.ProcessorAPI - - alias Pleroma.Repo - alias Pleroma.User.Backup - - import Ecto.Changeset - - @impl true - def do_process(backup, current_pid) do - with {:ok, zip_file} <- Backup.export(backup, current_pid), - {:ok, %{size: size}} <- File.stat(zip_file), - {:ok, _upload} <- Backup.upload(backup, zip_file) do - backup - |> cast( - %{ - file_size: size, - processed: true, - state: :complete - }, - [:file_size, :processed, :state] - ) - |> Repo.update() - else - e -> {:error, e} - end + |> write(dir, "following", fn a -> {:ok, a.ap_id} end) end end diff --git a/lib/pleroma/web/admin_api/controllers/admin_api_controller.ex b/lib/pleroma/web/admin_api/controllers/admin_api_controller.ex index 1894000ff4..0f22dd5389 100644 --- a/lib/pleroma/web/admin_api/controllers/admin_api_controller.ex +++ b/lib/pleroma/web/admin_api/controllers/admin_api_controller.ex @@ -13,6 +13,7 @@ defmodule Pleroma.Web.AdminAPI.AdminAPIController do alias Pleroma.ModerationLog alias Pleroma.Stats alias Pleroma.User + alias Pleroma.User.Backup alias Pleroma.Web.ActivityPub.ActivityPub alias Pleroma.Web.AdminAPI alias Pleroma.Web.AdminAPI.AccountView @@ -429,7 +430,9 @@ def stats(conn, params) do def create_backup(%{assigns: %{user: admin}} = conn, %{"nickname" => nickname}) do with %User{} = user <- User.get_by_nickname(nickname), - {:ok, _} <- Pleroma.User.Backup.create(user, admin.id) do + %Backup{} = backup <- Backup.new(user), + {:ok, inserted_backup} <- Pleroma.Repo.insert(backup), + {:ok, %Oban.Job{}} <- Backup.schedule_backup(inserted_backup) do ModerationLog.insert_log(%{actor: admin, subject: user, action: "create_backup"}) json(conn, "") diff --git a/lib/pleroma/web/api_spec/operations/pleroma_backup_operation.ex b/lib/pleroma/web/api_spec/operations/pleroma_backup_operation.ex index 400f3825d8..86f7095159 100644 --- a/lib/pleroma/web/api_spec/operations/pleroma_backup_operation.ex +++ b/lib/pleroma/web/api_spec/operations/pleroma_backup_operation.ex @@ -65,12 +65,7 @@ defp backup do file_name: %Schema{type: :string}, file_size: %Schema{type: :integer}, processed: %Schema{type: :boolean, description: "whether this backup has succeeded"}, - state: %Schema{ - type: :string, - description: "the state of the backup", - enum: ["pending", "running", "complete", "failed"] - }, - processed_number: %Schema{type: :integer, description: "the number of records processed"} + tempdir: %Schema{type: :string} }, example: %{ "content_type" => "application/zip", @@ -79,8 +74,7 @@ defp backup do "file_size" => 4105, "inserted_at" => "2020-09-08T16:42:07.000Z", "processed" => true, - "state" => "complete", - "processed_number" => 20 + "tempdir" => "/tmp/PZIMw40vmpM" } } end diff --git a/lib/pleroma/web/pleroma_api/controllers/backup_controller.ex b/lib/pleroma/web/pleroma_api/controllers/backup_controller.ex index b9daed22bc..0115ec6450 100644 --- a/lib/pleroma/web/pleroma_api/controllers/backup_controller.ex +++ b/lib/pleroma/web/pleroma_api/controllers/backup_controller.ex @@ -20,7 +20,7 @@ def index(%{assigns: %{user: user}} = conn, _params) do end def create(%{assigns: %{user: user}} = conn, _params) do - with {:ok, _} <- Backup.create(user) do + with {:ok, _} <- Backup.user(user) do backups = Backup.list(user) render(conn, "index.json", backups: backups) end diff --git a/lib/pleroma/web/pleroma_api/views/backup_view.ex b/lib/pleroma/web/pleroma_api/views/backup_view.ex index 20403aeee4..d778590f02 100644 --- a/lib/pleroma/web/pleroma_api/views/backup_view.ex +++ b/lib/pleroma/web/pleroma_api/views/backup_view.ex @@ -9,22 +9,12 @@ defmodule Pleroma.Web.PleromaAPI.BackupView do alias Pleroma.Web.CommonAPI.Utils def render("show.json", %{backup: %Backup{} = backup}) do - # To deal with records before the migration - state = - if backup.state == :invalid do - if backup.processed, do: :complete, else: :failed - else - backup.state - end - %{ id: backup.id, content_type: backup.content_type, url: download_url(backup), file_size: backup.file_size, processed: backup.processed, - state: to_string(state), - processed_number: backup.processed_number, inserted_at: Utils.to_masto_date(backup.inserted_at) } end diff --git a/lib/pleroma/workers/backup_worker.ex b/lib/pleroma/workers/backup_worker.ex index 54ac31a3c6..41f404e69a 100644 --- a/lib/pleroma/workers/backup_worker.ex +++ b/lib/pleroma/workers/backup_worker.ex @@ -8,44 +8,25 @@ defmodule Pleroma.Workers.BackupWorker do alias Oban.Job alias Pleroma.User.Backup - def process(backup, admin_user_id \\ nil) do - %{"op" => "process", "backup_id" => backup.id, "admin_user_id" => admin_user_id} - |> new() - |> Oban.insert() - end - - def schedule_deletion(backup) do - days = Pleroma.Config.get([Backup, :purge_after_days]) - time = 60 * 60 * 24 * days - scheduled_at = Calendar.NaiveDateTime.add!(backup.inserted_at, time) - - %{"op" => "delete", "backup_id" => backup.id} - |> new(scheduled_at: scheduled_at) - |> Oban.insert() - end - - def delete(backup) do - %{"op" => "delete", "backup_id" => backup.id} - |> new() - |> Oban.insert() - end - @impl Oban.Worker def perform(%Job{ - args: %{"op" => "process", "backup_id" => backup_id, "admin_user_id" => admin_user_id} + args: %{"op" => "process", "backup_id" => backup_id} }) do - with {:ok, %Backup{} = backup} <- - backup_id |> Backup.get() |> Backup.process(), - {:ok, _job} <- schedule_deletion(backup), - :ok <- Backup.remove_outdated(backup), - :ok <- maybe_deliver_email(backup, admin_user_id) do - {:ok, backup} + with {_, %Backup{} = backup} <- {:get, Backup.get_by_id(backup_id)}, + {_, {:ok, updated_backup}} <- {:run, Backup.run(backup)}, + {_, {:ok, uploaded_backup}} <- {:upload, Backup.upload(updated_backup)}, + {_, {:ok, _job}} <- {:delete, Backup.schedule_delete(uploaded_backup)}, + {_, :ok} <- {:outdated, Backup.remove_outdated(uploaded_backup.user)}, + {_, :ok} <- {:email, maybe_deliver_email(uploaded_backup)} do + {:ok, uploaded_backup} + else + e -> {:error, e} end end def perform(%Job{args: %{"op" => "delete", "backup_id" => backup_id}}) do - case Backup.get(backup_id) do - %Backup{} = backup -> Backup.delete(backup) + case Backup.get_by_id(backup_id) do + %Backup{} = backup -> Backup.delete_archive(backup) nil -> :ok end end @@ -57,13 +38,13 @@ defp has_email?(user) do not is_nil(user.email) and user.email != "" end - defp maybe_deliver_email(backup, admin_user_id) do + defp maybe_deliver_email(backup) do has_mailer = Pleroma.Config.get([Pleroma.Emails.Mailer, :enabled]) backup = backup |> Pleroma.Repo.preload(:user) if has_email?(backup.user) and has_mailer do backup - |> Pleroma.Emails.UserEmail.backup_is_ready_email(admin_user_id) + |> Pleroma.Emails.UserEmail.backup_is_ready_email() |> Pleroma.Emails.Mailer.deliver() :ok diff --git a/priv/repo/migrations/20240622175346_backup_refactor.exs b/priv/repo/migrations/20240622175346_backup_refactor.exs new file mode 100644 index 0000000000..5dfc55789d --- /dev/null +++ b/priv/repo/migrations/20240622175346_backup_refactor.exs @@ -0,0 +1,19 @@ +defmodule Pleroma.Repo.Migrations.BackupRefactor do + use Ecto.Migration + + def up do + alter table("backups") do + remove(:state) + remove(:processed_number) + add(:tempdir, :string) + end + end + + def down do + alter table("backups") do + add(:state, :integer, default: 5) + add(:processed_number, :integer, default: 0) + remove(:tempdir) + end + end +end From e5a738d465fa3cb16a5737b03561523059ed727c Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Mon, 24 Jun 2024 10:10:53 -0400 Subject: [PATCH 02/23] Refactor tests for Backups --- test/pleroma/user/backup_async_test.exs | 49 ----- test/pleroma/user/backup_test.exs | 175 +++++++----------- .../controllers/admin_api_controller_test.exs | 8 +- .../controllers/backup_controller_test.exs | 4 +- .../pleroma_api/views/backup_view_test.exs | 35 +--- test/support/mocks.ex | 2 - 6 files changed, 80 insertions(+), 193 deletions(-) delete mode 100644 test/pleroma/user/backup_async_test.exs diff --git a/test/pleroma/user/backup_async_test.exs b/test/pleroma/user/backup_async_test.exs deleted file mode 100644 index b0e9413bed..0000000000 --- a/test/pleroma/user/backup_async_test.exs +++ /dev/null @@ -1,49 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2023 Pleroma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.User.BackupAsyncTest do - use Pleroma.DataCase, async: true - - import Pleroma.Factory - import Mox - - alias Pleroma.UnstubbedConfigMock, as: ConfigMock - alias Pleroma.User.Backup - alias Pleroma.User.Backup.ProcessorMock - - setup do - user = insert(:user, %{nickname: "cofe", name: "Cofe", ap_id: "http://cofe.io/users/cofe"}) - - {:ok, backup} = user |> Backup.new() |> Repo.insert() - %{backup: backup} - end - - test "it handles unrecoverable exceptions", %{backup: backup} do - ProcessorMock - |> expect(:do_process, fn _, _ -> - raise "mock exception" - end) - - ConfigMock - |> stub_with(Pleroma.Config) - - {:error, %{backup: backup, reason: :exit}} = Backup.process(backup, ProcessorMock) - - assert backup.state == :failed - end - - test "it handles timeouts", %{backup: backup} do - ProcessorMock - |> expect(:do_process, fn _, _ -> - Process.sleep(:timer.seconds(4)) - end) - - ConfigMock - |> expect(:get, fn [Pleroma.User.Backup, :process_wait_time] -> :timer.seconds(2) end) - - {:error, %{backup: backup, reason: :timeout}} = Backup.process(backup, ProcessorMock) - - assert backup.state == :failed - end -end diff --git a/test/pleroma/user/backup_test.exs b/test/pleroma/user/backup_test.exs index d82d1719bf..24fe09f7e8 100644 --- a/test/pleroma/user/backup_test.exs +++ b/test/pleroma/user/backup_test.exs @@ -6,7 +6,6 @@ defmodule Pleroma.User.BackupTest do use Oban.Testing, repo: Pleroma.Repo use Pleroma.DataCase - import Mock import Pleroma.Factory import Swoosh.TestAssertions import Mox @@ -16,7 +15,6 @@ defmodule Pleroma.User.BackupTest do alias Pleroma.UnstubbedConfigMock, as: ConfigMock alias Pleroma.Uploaders.S3.ExAwsMock alias Pleroma.User.Backup - alias Pleroma.User.Backup.ProcessorMock alias Pleroma.Web.CommonAPI alias Pleroma.Workers.BackupWorker @@ -28,79 +26,56 @@ defmodule Pleroma.User.BackupTest do ConfigMock |> stub_with(Pleroma.Config) - ProcessorMock - |> stub_with(Pleroma.User.Backup.Processor) - :ok end test "it does not requrie enabled email" do clear_config([Pleroma.Emails.Mailer, :enabled], false) user = insert(:user) - assert {:ok, _} = Backup.create(user) + assert {:ok, _} = Backup.user(user) end test "it does not require user's email" do user = insert(:user, %{email: nil}) - assert {:ok, _} = Backup.create(user) + assert {:ok, _} = Backup.user(user) end test "it creates a backup record and an Oban job" do - %{id: user_id} = user = insert(:user) - assert {:ok, %Oban.Job{args: args}} = Backup.create(user) + user = insert(:user) + assert {:ok, %Backup{} = backup} = Backup.user(user) + assert {:ok, %Oban.Job{args: args}} = Backup.schedule_backup(backup) assert_enqueued(worker: BackupWorker, args: args) - backup = Backup.get(args["backup_id"]) - assert %Backup{user_id: ^user_id, processed: false, file_size: 0, state: :pending} = backup + backup = Backup.get_by_id(args["backup_id"]) + assert %Backup{processed: false, file_size: 0} = backup end test "it return an error if the export limit is over" do - %{id: user_id} = user = insert(:user) + user = insert(:user) limit_days = Pleroma.Config.get([Backup, :limit_days]) - assert {:ok, %Oban.Job{args: args}} = Backup.create(user) - backup = Backup.get(args["backup_id"]) - assert %Backup{user_id: ^user_id, processed: false, file_size: 0} = backup + {:ok, first_backup} = Backup.user(user) + {:ok, _run_backup} = Backup.run(first_backup) - assert Backup.create(user) == {:error, "Last export was less than #{limit_days} days ago"} + assert Backup.user(user) == {:error, "Last export was less than #{limit_days} days ago"} end test "it process a backup record" do clear_config([Pleroma.Upload, :uploader], Pleroma.Uploaders.Local) %{id: user_id} = user = insert(:user) - assert {:ok, %Oban.Job{args: %{"backup_id" => backup_id} = args}} = Backup.create(user) - assert {:ok, backup} = perform_job(BackupWorker, args) + assert {:ok, %Backup{id: backup_id}} = Backup.user(user) + + oban_args = %{"op" => "process", "backup_id" => backup_id} + + assert {:ok, backup} = perform_job(BackupWorker, oban_args) assert backup.file_size > 0 - assert %Backup{id: ^backup_id, processed: true, user_id: ^user_id, state: :complete} = backup + assert match?(%Backup{id: ^backup_id, processed: true, user_id: ^user_id}, backup) delete_job_args = %{"op" => "delete", "backup_id" => backup_id} assert_enqueued(worker: BackupWorker, args: delete_job_args) assert {:ok, backup} = perform_job(BackupWorker, delete_job_args) - refute Backup.get(backup_id) - - email = Pleroma.Emails.UserEmail.backup_is_ready_email(backup) - - assert_email_sent( - to: {user.name, user.email}, - html_body: email.html_body - ) - end - - test "it updates states of the backup" do - clear_config([Pleroma.Upload, :uploader], Pleroma.Uploaders.Local) - %{id: user_id} = user = insert(:user) - - assert {:ok, %Oban.Job{args: %{"backup_id" => backup_id} = args}} = Backup.create(user) - assert {:ok, backup} = perform_job(BackupWorker, args) - assert backup.file_size > 0 - assert %Backup{id: ^backup_id, processed: true, user_id: ^user_id, state: :complete} = backup - - delete_job_args = %{"op" => "delete", "backup_id" => backup_id} - - assert_enqueued(worker: BackupWorker, args: delete_job_args) - assert {:ok, backup} = perform_job(BackupWorker, delete_job_args) - refute Backup.get(backup_id) + refute Backup.get_by_id(backup_id) email = Pleroma.Emails.UserEmail.backup_is_ready_email(backup) @@ -114,10 +89,15 @@ test "it does not send an email if the user does not have an email" do clear_config([Pleroma.Upload, :uploader], Pleroma.Uploaders.Local) %{id: user_id} = user = insert(:user, %{email: nil}) - assert {:ok, %Oban.Job{args: %{"backup_id" => backup_id} = args}} = Backup.create(user) - assert {:ok, backup} = perform_job(BackupWorker, args) - assert backup.file_size > 0 - assert %Backup{id: ^backup_id, processed: true, user_id: ^user_id} = backup + assert {:ok, %Backup{} = backup} = Backup.user(user) + + expected_args = %{"op" => "process", "backup_id" => backup.id} + + assert_enqueued(worker: BackupWorker, args: %{"backup_id" => backup.id}) + assert {:ok, completed_backup} = perform_job(BackupWorker, expected_args) + assert completed_backup.file_size > 0 + assert completed_backup.processed + assert completed_backup.user_id == user_id assert_no_email_sent() end @@ -127,10 +107,13 @@ test "it does not send an email if mailer is not on" do clear_config([Pleroma.Upload, :uploader], Pleroma.Uploaders.Local) %{id: user_id} = user = insert(:user) - assert {:ok, %Oban.Job{args: %{"backup_id" => backup_id} = args}} = Backup.create(user) - assert {:ok, backup} = perform_job(BackupWorker, args) + assert {:ok, %Backup{id: backup_id}} = Backup.user(user) + + oban_args = %{"op" => "process", "backup_id" => backup_id} + + assert {:ok, backup} = perform_job(BackupWorker, oban_args) assert backup.file_size > 0 - assert %Backup{id: ^backup_id, processed: true, user_id: ^user_id} = backup + assert match?(%Backup{id: ^backup_id, processed: true, user_id: ^user_id}, backup) assert_no_email_sent() end @@ -139,10 +122,15 @@ test "it does not send an email if the user has an empty email" do clear_config([Pleroma.Upload, :uploader], Pleroma.Uploaders.Local) %{id: user_id} = user = insert(:user, %{email: ""}) - assert {:ok, %Oban.Job{args: %{"backup_id" => backup_id} = args}} = Backup.create(user) - assert {:ok, backup} = perform_job(BackupWorker, args) + assert {:ok, %Backup{id: backup_id} = backup} = Backup.user(user) + + expected_args = %{"op" => "process", "backup_id" => backup.id} + + assert_enqueued(worker: BackupWorker, args: expected_args) + + assert {:ok, backup} = perform_job(BackupWorker, expected_args) assert backup.file_size > 0 - assert %Backup{id: ^backup_id, processed: true, user_id: ^user_id} = backup + assert match?(%Backup{id: ^backup_id, processed: true, user_id: ^user_id}, backup) assert_no_email_sent() end @@ -152,16 +140,13 @@ test "it removes outdated backups after creating a fresh one" do clear_config([Pleroma.Upload, :uploader], Pleroma.Uploaders.Local) user = insert(:user) - assert {:ok, job1} = Backup.create(user) - - assert {:ok, %Backup{}} = ObanHelpers.perform(job1) - assert {:ok, job2} = Backup.create(user) - assert Pleroma.Repo.aggregate(Backup, :count) == 2 - assert {:ok, backup2} = ObanHelpers.perform(job2) + assert {:ok, %{id: backup_one_id}} = Backup.user(user) + assert {:ok, %{id: _backup_two_id}} = Backup.user(user) + # Run the backups ObanHelpers.perform_all() - assert [^backup2] = Pleroma.Repo.all(Backup) + assert_enqueued(worker: BackupWorker, args: %{"op" => "delete", "backup_id" => backup_one_id}) end test "it creates a zip archive with user data" do @@ -185,9 +170,12 @@ test "it creates a zip archive with user data" do CommonAPI.follow(other_user, user) - assert {:ok, backup} = user |> Backup.new() |> Repo.insert() - assert {:ok, path} = Backup.export(backup, self()) - assert {:ok, zipfile} = :zip.zip_open(String.to_charlist(path), [:memory]) + assert {:ok, backup} = Backup.user(user) + assert {:ok, run_backup} = Backup.run(backup) + + tempfile = Path.join([run_backup.tempdir, run_backup.file_name]) + + assert {:ok, zipfile} = :zip.zip_open(String.to_charlist(tempfile), [:memory]) assert {:ok, {~c"actor.json", json}} = :zip.zip_get(~c"actor.json", zipfile) assert %{ @@ -275,10 +263,10 @@ test "it creates a zip archive with user data" do } = Jason.decode!(json) :zip.zip_close(zipfile) - File.rm!(path) + File.rm_rf!(run_backup.tempdir) end - test "it counts the correct number processed" do + test "correct number processed" do user = insert(:user, %{nickname: "cofe", name: "Cofe", ap_id: "http://cofe.io/users/cofe"}) Enum.map(1..120, fn i -> @@ -288,43 +276,21 @@ test "it counts the correct number processed" do end) assert {:ok, backup} = user |> Backup.new() |> Repo.insert() - {:ok, backup} = Backup.process(backup) + {:ok, backup} = Backup.run(backup) - assert backup.processed_number == 1 + 120 + 120 + 120 + zip_path = Path.join([backup.tempdir, backup.file_name]) - Backup.delete(backup) - end + assert {:ok, zipfile} = :zip.zip_open(String.to_charlist(zip_path), [:memory]) - test "it handles errors" do - user = insert(:user, %{nickname: "cofe", name: "Cofe", ap_id: "http://cofe.io/users/cofe"}) + backup_parts = [~c"likes.json", ~c"bookmarks.json", ~c"outbox.json"] - Enum.map(1..120, fn i -> - {:ok, _status} = CommonAPI.post(user, %{status: "status #{i}"}) + Enum.each(backup_parts, fn part -> + assert {:ok, {_part, part_json}} = :zip.zip_get(part, zipfile) + {:ok, decoded_part} = Jason.decode(part_json) + assert decoded_part["totalItems"] == 120 end) - assert {:ok, backup} = user |> Backup.new() |> Repo.insert() - - with_mock Pleroma.Web.ActivityPub.Transmogrifier, - [:passthrough], - prepare_outgoing: fn data -> - object = - data["object"] - |> Pleroma.Object.normalize(fetch: false) - |> Map.get(:data) - - data = data |> Map.put("object", object) - - if String.contains?(data["object"]["content"], "119"), - do: raise(%Postgrex.Error{}), - else: {:ok, data} - end do - {:ok, backup} = Backup.process(backup) - assert backup.processed - assert backup.state == :complete - assert backup.processed_number == 1 + 119 - - Backup.delete(backup) - end + Backup.delete_archive(backup) end describe "it uploads and deletes a backup archive" do @@ -343,12 +309,11 @@ test "it handles errors" do Bookmark.create(user.id, status3.id) assert {:ok, backup} = user |> Backup.new() |> Repo.insert() - assert {:ok, path} = Backup.export(backup, self()) - [path: path, backup: backup] + [backup: backup] end - test "S3", %{path: path, backup: backup} do + test "S3", %{backup: backup} do clear_config([Pleroma.Upload, :uploader], Pleroma.Uploaders.S3) clear_config([Pleroma.Uploaders.S3, :streaming_enabled], false) @@ -358,15 +323,17 @@ test "S3", %{path: path, backup: backup} do %{http_method: :delete} -> {:ok, %{status_code: 204}} end) - assert {:ok, %Pleroma.Upload{}} = Backup.upload(backup, path) - assert {:ok, _backup} = Backup.delete(backup) + assert {:ok, backup} = Backup.run(backup) + assert {:ok, %Backup{processed: true}} = Backup.upload(backup) + assert {:ok, _backup} = Backup.delete_archive(backup) end - test "Local", %{path: path, backup: backup} do + test "Local", %{backup: backup} do clear_config([Pleroma.Upload, :uploader], Pleroma.Uploaders.Local) - assert {:ok, %Pleroma.Upload{}} = Backup.upload(backup, path) - assert {:ok, _backup} = Backup.delete(backup) + assert {:ok, backup} = Backup.run(backup) + assert {:ok, %Backup{processed: true}} = Backup.upload(backup) + assert {:ok, _backup} = Backup.delete_archive(backup) end end end diff --git a/test/pleroma/web/admin_api/controllers/admin_api_controller_test.exs b/test/pleroma/web/admin_api/controllers/admin_api_controller_test.exs index a7ee8359d0..6614d1409b 100644 --- a/test/pleroma/web/admin_api/controllers/admin_api_controller_test.exs +++ b/test/pleroma/web/admin_api/controllers/admin_api_controller_test.exs @@ -1096,9 +1096,13 @@ test "it creates a backup", %{conn: conn} do ObanHelpers.perform_all() - email = Pleroma.Emails.UserEmail.backup_is_ready_email(backup, admin.id) + email = Pleroma.Emails.UserEmail.backup_is_ready_email(backup) + + assert String.contains?( + email.html_body, + "A full backup of your Pleroma account was requested" + ) - assert String.contains?(email.html_body, "Admin @#{admin.nickname} requested a full backup") assert_email_sent(to: {user.name, user.email}, html_body: email.html_body) log_message = "@#{admin_nickname} requested account backup for @#{user_nickname}" diff --git a/test/pleroma/web/pleroma_api/controllers/backup_controller_test.exs b/test/pleroma/web/pleroma_api/controllers/backup_controller_test.exs index 21e619fa4e..1e056adb59 100644 --- a/test/pleroma/web/pleroma_api/controllers/backup_controller_test.exs +++ b/test/pleroma/web/pleroma_api/controllers/backup_controller_test.exs @@ -20,9 +20,7 @@ defmodule Pleroma.Web.PleromaAPI.BackupControllerTest do end test "GET /api/v1/pleroma/backups", %{user: user, conn: conn} do - assert {:ok, %Oban.Job{args: %{"backup_id" => backup_id}}} = Backup.create(user) - - backup = Backup.get(backup_id) + assert {:ok, %Backup{} = backup} = Backup.user(user) response = conn diff --git a/test/pleroma/web/pleroma_api/views/backup_view_test.exs b/test/pleroma/web/pleroma_api/views/backup_view_test.exs index b125b88726..303547f3b1 100644 --- a/test/pleroma/web/pleroma_api/views/backup_view_test.exs +++ b/test/pleroma/web/pleroma_api/views/backup_view_test.exs @@ -27,42 +27,11 @@ test "it renders the ID" do assert result.id == backup.id end - test "it renders the state and processed_number" do + test "it renders the processed state" do user = insert(:user) backup = Backup.new(user) result = BackupView.render("show.json", backup: backup) - assert result.state == to_string(backup.state) - assert result.processed_number == backup.processed_number - end - - test "it renders failed state with legacy records" do - backup = %Backup{ - id: 0, - content_type: "application/zip", - file_name: "dummy", - file_size: 1, - state: :invalid, - processed: true, - processed_number: 1, - inserted_at: NaiveDateTime.utc_now() - } - - result = BackupView.render("show.json", backup: backup) - assert result.state == "complete" - - backup = %Backup{ - id: 0, - content_type: "application/zip", - file_name: "dummy", - file_size: 1, - state: :invalid, - processed: false, - processed_number: 1, - inserted_at: NaiveDateTime.utc_now() - } - - result = BackupView.render("show.json", backup: backup) - assert result.state == "failed" + refute result.processed end end diff --git a/test/support/mocks.ex b/test/support/mocks.ex index 63cbc49ab6..d84958e154 100644 --- a/test/support/mocks.ex +++ b/test/support/mocks.ex @@ -32,6 +32,4 @@ Mox.defmock(Pleroma.LoggerMock, for: Pleroma.Logging) -Mox.defmock(Pleroma.User.Backup.ProcessorMock, for: Pleroma.User.Backup.ProcessorAPI) - Mox.defmock(Pleroma.Uploaders.S3.ExAwsMock, for: Pleroma.Uploaders.S3.ExAwsAPI) From ece063586b5e2c2e8c4eb6e92e50b3dcc09d2836 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Mon, 24 Jun 2024 16:36:43 -0400 Subject: [PATCH 03/23] Limit backup jobs to 5 minutes --- lib/pleroma/workers/backup_worker.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/pleroma/workers/backup_worker.ex b/lib/pleroma/workers/backup_worker.ex index 41f404e69a..fb366de4af 100644 --- a/lib/pleroma/workers/backup_worker.ex +++ b/lib/pleroma/workers/backup_worker.ex @@ -32,7 +32,7 @@ def perform(%Job{args: %{"op" => "delete", "backup_id" => backup_id}}) do end @impl Oban.Worker - def timeout(_job), do: :infinity + def timeout(_job), do: :timer.minutes(5) defp has_email?(user) do not is_nil(user.email) and user.email != "" From 3f60d7bf644fc38e8e5d9f525a1675526ade0037 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Mon, 24 Jun 2024 22:52:21 -0400 Subject: [PATCH 04/23] Better random tempdir format --- lib/pleroma/user/backup.ex | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/lib/pleroma/user/backup.ex b/lib/pleroma/user/backup.ex index 2568089a4c..7feaa22bf5 100644 --- a/lib/pleroma/user/backup.ex +++ b/lib/pleroma/user/backup.ex @@ -216,13 +216,15 @@ def run(%__MODULE__{} = backup) do end defp tempdir do + rand = :crypto.strong_rand_bytes(8) |> Base.url_encode64(padding: false) + subdir = "backup-#{rand}" + case Config.get([__MODULE__, :tempdir]) do nil -> - System.tmp_dir!() + Path.join([System.tmp_dir!(), subdir]) path -> - rand = :crypto.strong_rand_bytes(8) |> Base.url_encode64(padding: false) - Path.join([path, rand]) + Path.join([path, subdir]) end end From e5cbbaf3f0385492580b27a624b936c1d74757a1 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Mon, 24 Jun 2024 22:58:38 -0400 Subject: [PATCH 05/23] Extend the backup job time limit to 30 minutes --- lib/pleroma/workers/backup_worker.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/pleroma/workers/backup_worker.ex b/lib/pleroma/workers/backup_worker.ex index fb366de4af..1791c558d8 100644 --- a/lib/pleroma/workers/backup_worker.ex +++ b/lib/pleroma/workers/backup_worker.ex @@ -32,7 +32,7 @@ def perform(%Job{args: %{"op" => "delete", "backup_id" => backup_id}}) do end @impl Oban.Worker - def timeout(_job), do: :timer.minutes(5) + def timeout(_job), do: :timer.minutes(30) defp has_email?(user) do not is_nil(user.email) and user.email != "" From 187897874bddc27ef23606730e79522e3583ec43 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Mon, 22 Jul 2024 14:00:54 -0400 Subject: [PATCH 06/23] Make backup timeout configurable --- config/config.exs | 4 ++-- config/description.exs | 15 +++++++-------- docs/configuration/cheatsheet.md | 1 + lib/pleroma/workers/backup_worker.ex | 3 ++- 4 files changed, 12 insertions(+), 11 deletions(-) diff --git a/config/config.exs b/config/config.exs index 044f951f6e..3058160ead 100644 --- a/config/config.exs +++ b/config/config.exs @@ -908,8 +908,8 @@ purge_after_days: 30, limit_days: 7, dir: nil, - process_wait_time: 30_000, - process_chunk_size: 100 + process_chunk_size: 100, + timeout: :timer.minutes(30) config :pleroma, ConcurrentLimiter, [ {Pleroma.Search, [max_running: 30, max_waiting: 50]} diff --git a/config/description.exs b/config/description.exs index b7d86dc63c..179eea3f7c 100644 --- a/config/description.exs +++ b/config/description.exs @@ -3355,20 +3355,19 @@ description: "Limit user to export not more often than once per N days", suggestions: [7] }, - %{ - key: :process_wait_time, - type: :integer, - label: "Process Wait Time", - description: - "The amount of time to wait for backup to report progress, in milliseconds. If no progress is received from the backup job for that much time, terminate it and deem it failed.", - suggestions: [30_000] - }, %{ key: :process_chunk_size, type: :integer, label: "Process Chunk Size", description: "The number of activities to fetch in the backup job for each chunk.", suggestions: [100] + }, + %{ + key: :timeout, + type: :integer, + label: "Timeout", + description: "The amount of time to wait for backup to complete in seconds.", + suggestions: [1_800] } ] }, diff --git a/docs/configuration/cheatsheet.md b/docs/configuration/cheatsheet.md index 9c56599884..5689d3be50 100644 --- a/docs/configuration/cheatsheet.md +++ b/docs/configuration/cheatsheet.md @@ -1171,6 +1171,7 @@ Control favicons for instances. 3. the directory named by the TMP environment variable 4. C:\TMP on Windows or /tmp on Unix-like operating systems 5. as a last resort, the current working directory +* `:timeout` an integer representing seconds ## Frontend management diff --git a/lib/pleroma/workers/backup_worker.ex b/lib/pleroma/workers/backup_worker.ex index 1791c558d8..d1b6fcdadf 100644 --- a/lib/pleroma/workers/backup_worker.ex +++ b/lib/pleroma/workers/backup_worker.ex @@ -6,6 +6,7 @@ defmodule Pleroma.Workers.BackupWorker do use Oban.Worker, queue: :slow, max_attempts: 1 alias Oban.Job + alias Pleroma.Config.Getting, as: Config alias Pleroma.User.Backup @impl Oban.Worker @@ -32,7 +33,7 @@ def perform(%Job{args: %{"op" => "delete", "backup_id" => backup_id}}) do end @impl Oban.Worker - def timeout(_job), do: :timer.minutes(30) + def timeout(_job), do: Config.get([Backup, :timeout], :timer.minutes(30)) defp has_email?(user) do not is_nil(user.email) and user.email != "" From 1a482a73c3b99f7fdc512b734dd746e9f9cd396d Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Thu, 25 Jul 2024 11:18:27 -0400 Subject: [PATCH 07/23] Fix Optimistic Inbox for failed signatures When signatures fail on incoming activities we put the job into Oban to be processed later instead of doing the user fetching and validation inline which is expensive and increases latency on the incoming POST request. Unfortunately we did not retain the :method, :request_path, and :query_string parameters from the conn so the signature validation and Oban Job would always fail. This was most obvious when Mastodon sends Deletes for users your server has never seen before. --- changelog.d/optimistic-inbox-sigs.fix | 1 + .../web/activity_pub/activity_pub_controller.ex | 11 +++++++++-- lib/pleroma/workers/receiver_worker.ex | 17 +++++++++++++++-- 3 files changed, 25 insertions(+), 4 deletions(-) create mode 100644 changelog.d/optimistic-inbox-sigs.fix diff --git a/changelog.d/optimistic-inbox-sigs.fix b/changelog.d/optimistic-inbox-sigs.fix new file mode 100644 index 0000000000..53ffe6b5bb --- /dev/null +++ b/changelog.d/optimistic-inbox-sigs.fix @@ -0,0 +1 @@ +Fix Optimistic Inbox for failed signatures diff --git a/lib/pleroma/web/activity_pub/activity_pub_controller.ex b/lib/pleroma/web/activity_pub/activity_pub_controller.ex index e6161455dc..cdd054e1a3 100644 --- a/lib/pleroma/web/activity_pub/activity_pub_controller.ex +++ b/lib/pleroma/web/activity_pub/activity_pub_controller.ex @@ -293,8 +293,15 @@ def inbox(%{assigns: %{valid_signature: true}} = conn, params) do json(conn, "ok") end - def inbox(%{assigns: %{valid_signature: false}, req_headers: req_headers} = conn, params) do - Federator.incoming_ap_doc(%{req_headers: req_headers, params: params}) + def inbox(%{assigns: %{valid_signature: false}} = conn, params) do + Federator.incoming_ap_doc(%{ + method: conn.method, + req_headers: conn.req_headers, + request_path: conn.request_path, + params: params, + query_string: conn.query_string + }) + json(conn, "ok") end diff --git a/lib/pleroma/workers/receiver_worker.ex b/lib/pleroma/workers/receiver_worker.ex index d01813c258..1a2881cd1a 100644 --- a/lib/pleroma/workers/receiver_worker.ex +++ b/lib/pleroma/workers/receiver_worker.ex @@ -12,13 +12,26 @@ defmodule Pleroma.Workers.ReceiverWorker do @impl Oban.Worker def perform(%Job{ - args: %{"op" => "incoming_ap_doc", "req_headers" => req_headers, "params" => params} + args: %{ + "op" => "incoming_ap_doc", + "method" => method, + "params" => params, + "req_headers" => req_headers, + "request_path" => request_path, + "query_string" => query_string + } }) do # Oban's serialization converts our tuple headers to lists. # Revert it for the signature validation. req_headers = Enum.into(req_headers, [], &List.to_tuple(&1)) - conn_data = %{params: params, req_headers: req_headers} + conn_data = %{ + method: method, + params: params, + req_headers: req_headers, + request_path: request_path, + query_string: query_string + } with {:ok, %User{} = _actor} <- User.get_or_fetch_by_ap_id(conn_data.params["actor"]), {:ok, _public_key} <- Signature.refetch_public_key(conn_data), From 1b9c887dbb8d87814f8d9cc11cfcbc8802348b22 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Thu, 25 Jul 2024 12:54:27 -0400 Subject: [PATCH 08/23] Extract validate_signature/2 from the HTTPSignaturePlug This logic only exists in the Plug, so attempting to validate the signature by calling the library function HTTPSignature.validate_conn/2 directly will never work because we do not attempt to construct the (request-target) and @request-target headers with both the commonly misinterpreted and correct implementation of this field. Therefore all attempts to validate a signature from an Oban Job will fail. --- config/test.exs | 3 +- lib/pleroma/signature.ex | 52 ++++++++++++++++++++ lib/pleroma/web/plugs/http_signature_plug.ex | 50 +------------------ lib/pleroma/workers/receiver_worker.ex | 2 +- 4 files changed, 56 insertions(+), 51 deletions(-) diff --git a/config/test.exs b/config/test.exs index 5d9541f437..8a56940547 100644 --- a/config/test.exs +++ b/config/test.exs @@ -158,8 +158,7 @@ config :pleroma, Pleroma.Web.Plugs.HTTPSecurityPlug, config_impl: Pleroma.StaticStubbedConfigMock config :pleroma, Pleroma.Web.Plugs.HTTPSignaturePlug, config_impl: Pleroma.StaticStubbedConfigMock -config :pleroma, Pleroma.Web.Plugs.HTTPSignaturePlug, - http_signatures_impl: Pleroma.StubbedHTTPSignaturesMock +config :pleroma, Pleroma.Signature, http_signatures_impl: Pleroma.StubbedHTTPSignaturesMock peer_module = if String.to_integer(System.otp_release()) >= 25 do diff --git a/lib/pleroma/signature.ex b/lib/pleroma/signature.ex index 900d40c4bb..51dac24024 100644 --- a/lib/pleroma/signature.ex +++ b/lib/pleroma/signature.ex @@ -10,6 +10,14 @@ defmodule Pleroma.Signature do alias Pleroma.User alias Pleroma.Web.ActivityPub.ActivityPub + import Plug.Conn, only: [put_req_header: 3] + + @http_signatures_impl Application.compile_env( + :pleroma, + [__MODULE__, :http_signatures_impl], + HTTPSignatures + ) + @known_suffixes ["/publickey", "/main-key"] def key_id_to_actor_id(key_id) do @@ -85,4 +93,48 @@ def signed_date, do: signed_date(NaiveDateTime.utc_now()) def signed_date(%NaiveDateTime{} = date) do Timex.format!(date, "{WDshort}, {0D} {Mshort} {YYYY} {h24}:{m}:{s} GMT") end + + @spec validate_signature(map(), String.t()) :: boolean() + def validate_signature(conn, request_target) do + # Newer drafts for HTTP signatures now use @request-target instead of the + # old (request-target). We'll now support both for incoming signatures. + conn = + conn + |> put_req_header("(request-target)", request_target) + |> put_req_header("@request-target", request_target) + + @http_signatures_impl.validate_conn(conn) + end + + @spec validate_signature(map()) :: boolean() + def validate_signature(conn) do + # This (request-target) is non-standard, but many implementations do it + # this way due to a misinterpretation of + # https://datatracker.ietf.org/doc/html/draft-cavage-http-signatures-06 + # "path" was interpreted as not having the query, though later examples + # show that it must be the absolute path + query. This behavior is kept to + # make sure most software (Pleroma itself, Mastodon, and probably others) + # do not break. + request_target = String.downcase("#{conn.method}") <> " #{conn.request_path}" + + # This is the proper way to build the @request-target, as expected by + # many HTTP signature libraries, clarified in the following draft: + # https://www.ietf.org/archive/id/draft-ietf-httpbis-message-signatures-11.html#section-2.2.6 + # It is the same as before, but containing the query part as well. + proper_target = request_target <> "?#{conn.query_string}" + + cond do + # Normal, non-standard behavior but expected by Pleroma and more. + validate_signature(conn, request_target) -> + true + + # Has query string and the previous one failed: let's try the standard. + conn.query_string != "" -> + validate_signature(conn, proper_target) + + # If there's no query string and signature fails, it's rotten. + true -> + false + end + end end diff --git a/lib/pleroma/web/plugs/http_signature_plug.ex b/lib/pleroma/web/plugs/http_signature_plug.ex index 6bf2dd432a..67974599a7 100644 --- a/lib/pleroma/web/plugs/http_signature_plug.ex +++ b/lib/pleroma/web/plugs/http_signature_plug.ex @@ -8,16 +8,12 @@ defmodule Pleroma.Web.Plugs.HTTPSignaturePlug do import Plug.Conn import Phoenix.Controller, only: [get_format: 1, text: 2] + alias Pleroma.Signature alias Pleroma.Web.ActivityPub.MRF require Logger @config_impl Application.compile_env(:pleroma, [__MODULE__, :config_impl], Pleroma.Config) - @http_signatures_impl Application.compile_env( - :pleroma, - [__MODULE__, :http_signatures_impl], - HTTPSignatures - ) def init(options) do options @@ -39,48 +35,6 @@ def call(conn, _opts) do end end - defp validate_signature(conn, request_target) do - # Newer drafts for HTTP signatures now use @request-target instead of the - # old (request-target). We'll now support both for incoming signatures. - conn = - conn - |> put_req_header("(request-target)", request_target) - |> put_req_header("@request-target", request_target) - - @http_signatures_impl.validate_conn(conn) - end - - defp validate_signature(conn) do - # This (request-target) is non-standard, but many implementations do it - # this way due to a misinterpretation of - # https://datatracker.ietf.org/doc/html/draft-cavage-http-signatures-06 - # "path" was interpreted as not having the query, though later examples - # show that it must be the absolute path + query. This behavior is kept to - # make sure most software (Pleroma itself, Mastodon, and probably others) - # do not break. - request_target = String.downcase("#{conn.method}") <> " #{conn.request_path}" - - # This is the proper way to build the @request-target, as expected by - # many HTTP signature libraries, clarified in the following draft: - # https://www.ietf.org/archive/id/draft-ietf-httpbis-message-signatures-11.html#section-2.2.6 - # It is the same as before, but containing the query part as well. - proper_target = request_target <> "?#{conn.query_string}" - - cond do - # Normal, non-standard behavior but expected by Pleroma and more. - validate_signature(conn, request_target) -> - true - - # Has query string and the previous one failed: let's try the standard. - conn.query_string != "" -> - validate_signature(conn, proper_target) - - # If there's no query string and signature fails, it's rotten. - true -> - false - end - end - defp maybe_assign_valid_signature(conn) do if has_signature_header?(conn) do # we replace the digest header with the one we computed in DigestPlug @@ -90,7 +44,7 @@ defp maybe_assign_valid_signature(conn) do conn -> conn end - assign(conn, :valid_signature, validate_signature(conn)) + assign(conn, :valid_signature, Signature.validate_signature(conn)) else Logger.debug("No signature header!") conn diff --git a/lib/pleroma/workers/receiver_worker.ex b/lib/pleroma/workers/receiver_worker.ex index 1a2881cd1a..94624579e0 100644 --- a/lib/pleroma/workers/receiver_worker.ex +++ b/lib/pleroma/workers/receiver_worker.ex @@ -35,7 +35,7 @@ def perform(%Job{ with {:ok, %User{} = _actor} <- User.get_or_fetch_by_ap_id(conn_data.params["actor"]), {:ok, _public_key} <- Signature.refetch_public_key(conn_data), - {:signature, true} <- {:signature, HTTPSignatures.validate_conn(conn_data)}, + {:signature, true} <- {:signature, Signature.validate_signature(conn_data)}, {:ok, res} <- Federator.perform(:incoming_ap_doc, params) do {:ok, res} else From a964368e31ddf8a22dd36bbf2aae51f7d91bbdaf Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Thu, 25 Jul 2024 14:33:51 -0400 Subject: [PATCH 09/23] Add test to fetch and validate an activity that originally failed signature --- lib/pleroma/web/federator.ex | 6 +- lib/pleroma/workers/receiver_worker.ex | 2 +- test/fixtures/bastianallgeier.json | 117 +++++++++++ test/fixtures/denniskoch.json | 112 ++++++++++ .../receiver_worker_signature_activity.json | 62 ++++++ test/pleroma/workers/receiver_worker_test.exs | 196 ++++++++++++++++++ 6 files changed, 492 insertions(+), 3 deletions(-) create mode 100644 test/fixtures/bastianallgeier.json create mode 100644 test/fixtures/denniskoch.json create mode 100644 test/fixtures/receiver_worker_signature_activity.json diff --git a/lib/pleroma/web/federator.ex b/lib/pleroma/web/federator.ex index 4b30fd21d2..3d3101d61e 100644 --- a/lib/pleroma/web/federator.ex +++ b/lib/pleroma/web/federator.ex @@ -35,10 +35,12 @@ def allowed_thread_distance?(distance) do end # Client API - def incoming_ap_doc(%{params: params, req_headers: req_headers}) do + def incoming_ap_doc(%{params: _params, req_headers: _req_headers} = args) do + job_args = Enum.into(args, %{}, fn {k, v} -> {Atom.to_string(k), v} end) + ReceiverWorker.enqueue( "incoming_ap_doc", - %{"req_headers" => req_headers, "params" => params, "timeout" => :timer.seconds(20)}, + Map.put(job_args, "timeout", :timer.seconds(20)), priority: 2 ) end diff --git a/lib/pleroma/workers/receiver_worker.ex b/lib/pleroma/workers/receiver_worker.ex index 94624579e0..fd5c13fca2 100644 --- a/lib/pleroma/workers/receiver_worker.ex +++ b/lib/pleroma/workers/receiver_worker.ex @@ -25,7 +25,7 @@ def perform(%Job{ # Revert it for the signature validation. req_headers = Enum.into(req_headers, [], &List.to_tuple(&1)) - conn_data = %{ + conn_data = %Plug.Conn{ method: method, params: params, req_headers: req_headers, diff --git a/test/fixtures/bastianallgeier.json b/test/fixtures/bastianallgeier.json new file mode 100644 index 0000000000..6b47e7db9b --- /dev/null +++ b/test/fixtures/bastianallgeier.json @@ -0,0 +1,117 @@ +{ + "@context": [ + "https://www.w3.org/ns/activitystreams", + "https://w3id.org/security/v1", + { + "Curve25519Key": "toot:Curve25519Key", + "Device": "toot:Device", + "Ed25519Key": "toot:Ed25519Key", + "Ed25519Signature": "toot:Ed25519Signature", + "EncryptedMessage": "toot:EncryptedMessage", + "PropertyValue": "schema:PropertyValue", + "alsoKnownAs": { + "@id": "as:alsoKnownAs", + "@type": "@id" + }, + "cipherText": "toot:cipherText", + "claim": { + "@id": "toot:claim", + "@type": "@id" + }, + "deviceId": "toot:deviceId", + "devices": { + "@id": "toot:devices", + "@type": "@id" + }, + "discoverable": "toot:discoverable", + "featured": { + "@id": "toot:featured", + "@type": "@id" + }, + "featuredTags": { + "@id": "toot:featuredTags", + "@type": "@id" + }, + "fingerprintKey": { + "@id": "toot:fingerprintKey", + "@type": "@id" + }, + "focalPoint": { + "@container": "@list", + "@id": "toot:focalPoint" + }, + "identityKey": { + "@id": "toot:identityKey", + "@type": "@id" + }, + "indexable": "toot:indexable", + "manuallyApprovesFollowers": "as:manuallyApprovesFollowers", + "memorial": "toot:memorial", + "messageFranking": "toot:messageFranking", + "messageType": "toot:messageType", + "movedTo": { + "@id": "as:movedTo", + "@type": "@id" + }, + "publicKeyBase64": "toot:publicKeyBase64", + "schema": "http://schema.org#", + "suspended": "toot:suspended", + "toot": "http://joinmastodon.org/ns#", + "value": "schema:value" + } + ], + "attachment": [ + { + "name": "Website", + "type": "PropertyValue", + "value": "https://bastianallgeier.com" + }, + { + "name": "Project", + "type": "PropertyValue", + "value": "https://getkirby.com" + }, + { + "name": "Github", + "type": "PropertyValue", + "value": "https://github.com/bastianallgeier" + } + ], + "devices": "https://mastodon.social/users/bastianallgeier/collections/devices", + "discoverable": true, + "endpoints": { + "sharedInbox": "https://mastodon.social/inbox" + }, + "featured": "https://mastodon.social/users/bastianallgeier/collections/featured", + "featuredTags": "https://mastodon.social/users/bastianallgeier/collections/tags", + "followers": "https://mastodon.social/users/bastianallgeier/followers", + "following": "https://mastodon.social/users/bastianallgeier/following", + "icon": { + "mediaType": "image/jpeg", + "type": "Image", + "url": "https://files.mastodon.social/accounts/avatars/000/007/393/original/0180a20079617c71.jpg" + }, + "id": "https://mastodon.social/users/bastianallgeier", + "image": { + "mediaType": "image/jpeg", + "type": "Image", + "url": "https://files.mastodon.social/accounts/headers/000/007/393/original/13d644ab46d50478.jpeg" + }, + "inbox": "https://mastodon.social/users/bastianallgeier/inbox", + "indexable": false, + "manuallyApprovesFollowers": false, + "memorial": false, + "name": "Bastian Allgeier", + "outbox": "https://mastodon.social/users/bastianallgeier/outbox", + "preferredUsername": "bastianallgeier", + "publicKey": { + "id": "https://mastodon.social/users/bastianallgeier#main-key", + "owner": "https://mastodon.social/users/bastianallgeier", + "publicKeyPem": "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA3fz+hpgVztO9z6HUhyzv\nwP++ERBBoIwSLKf1TyIM8bvzGFm2YXaO5uxu1HvumYFTYc3ACr3q4j8VUb7NMxkQ\nlzu4QwPjOFJ43O+fY+HSPORXEDW5fXDGC5DGpox4+i08LxRmx7L6YPRUSUuPN8nI\nWyq1Qsq1zOQrNY/rohMXkBdSXxqC3yIRqvtLt4otCgay/5tMogJWkkS6ZKyFhb9z\nwVVy1fsbV10c9C+SHy4NH26CKaTtpTYLRBMjhTCS8bX8iDSjGIf2aZgYs1ir7gEz\n9wf5CvLiENmVWGwm64t6KSEAkA4NJ1hzgHUZPCjPHZE2SmhO/oHaxokTzqtbbENJ\n1QIDAQAB\n-----END PUBLIC KEY-----\n" + }, + "published": "2016-11-01T00:00:00Z", + "summary": "

Designer & developer. Creator of Kirby CMS

", + "tag": [], + "type": "Person", + "url": "https://mastodon.social/@bastianallgeier" +} diff --git a/test/fixtures/denniskoch.json b/test/fixtures/denniskoch.json new file mode 100644 index 0000000000..7aa4de508e --- /dev/null +++ b/test/fixtures/denniskoch.json @@ -0,0 +1,112 @@ +{ + "@context": [ + "https://www.w3.org/ns/activitystreams", + "https://w3id.org/security/v1", + { + "Curve25519Key": "toot:Curve25519Key", + "Device": "toot:Device", + "Ed25519Key": "toot:Ed25519Key", + "Ed25519Signature": "toot:Ed25519Signature", + "EncryptedMessage": "toot:EncryptedMessage", + "PropertyValue": "schema:PropertyValue", + "alsoKnownAs": { + "@id": "as:alsoKnownAs", + "@type": "@id" + }, + "cipherText": "toot:cipherText", + "claim": { + "@id": "toot:claim", + "@type": "@id" + }, + "deviceId": "toot:deviceId", + "devices": { + "@id": "toot:devices", + "@type": "@id" + }, + "discoverable": "toot:discoverable", + "featured": { + "@id": "toot:featured", + "@type": "@id" + }, + "featuredTags": { + "@id": "toot:featuredTags", + "@type": "@id" + }, + "fingerprintKey": { + "@id": "toot:fingerprintKey", + "@type": "@id" + }, + "focalPoint": { + "@container": "@list", + "@id": "toot:focalPoint" + }, + "identityKey": { + "@id": "toot:identityKey", + "@type": "@id" + }, + "indexable": "toot:indexable", + "manuallyApprovesFollowers": "as:manuallyApprovesFollowers", + "memorial": "toot:memorial", + "messageFranking": "toot:messageFranking", + "messageType": "toot:messageType", + "movedTo": { + "@id": "as:movedTo", + "@type": "@id" + }, + "publicKeyBase64": "toot:publicKeyBase64", + "schema": "http://schema.org#", + "suspended": "toot:suspended", + "toot": "http://joinmastodon.org/ns#", + "value": "schema:value" + } + ], + "attachment": [ + { + "name": "GitHub", + "type": "PropertyValue", + "value": "https://github.com/pxlrbt/" + }, + { + "name": "Discord", + "type": "PropertyValue", + "value": "pxlrbt#6029" + } + ], + "devices": "https://phpc.social/users/denniskoch/collections/devices", + "discoverable": true, + "endpoints": { + "sharedInbox": "https://phpc.social/inbox" + }, + "featured": "https://phpc.social/users/denniskoch/collections/featured", + "featuredTags": "https://phpc.social/users/denniskoch/collections/tags", + "followers": "https://phpc.social/users/denniskoch/followers", + "following": "https://phpc.social/users/denniskoch/following", + "icon": { + "mediaType": "image/jpeg", + "type": "Image", + "url": "https://media.phpc.social/accounts/avatars/109/364/097/179/042/485/original/6e770c7b3f5ef72d.jpg" + }, + "id": "https://phpc.social/users/denniskoch", + "image": { + "mediaType": "image/jpeg", + "type": "Image", + "url": "https://media.phpc.social/accounts/headers/109/364/097/179/042/485/original/709da24705260c04.jpg" + }, + "inbox": "https://phpc.social/users/denniskoch/inbox", + "indexable": true, + "manuallyApprovesFollowers": false, + "memorial": false, + "name": "Dennis Koch", + "outbox": "https://phpc.social/users/denniskoch/outbox", + "preferredUsername": "denniskoch", + "publicKey": { + "id": "https://phpc.social/users/denniskoch#main-key", + "owner": "https://phpc.social/users/denniskoch", + "publicKeyPem": "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA4dmcSlqLj18gPvuslkmt\nQTniZ8ybO4pgvMvPLYtBuTBUjo49vJ/8Sw6jB5zcKb1haqIdny7Rv/vY3kCdCXcP\nloh1I+jthEgqLT8JpZWGwLGwg9piFhrMGADmt3N8du7HfglzuZ8LlVpnZ8feCw7I\nS2ua/ZCxE47mI45Z3ed2kkFYKWopWWqFn2lan/1OyHrcFKtCvaVjRdvo0UUt2tgl\nvyJI4+zN8FnrCbsMtcbI5nSzfJIrOc4LeaGmLJh+0o2rwoOQZc2487XWbeyfhjsq\nPRBpYN7pfHWQDvzQIN075LHTf9zDFsm6+HqY7Zs5rYxr72rvcX7d9JcP6CasIosY\nqwIDAQAB\n-----END PUBLIC KEY-----\n" + }, + "published": "2022-11-18T00:00:00Z", + "summary": "

🧑‍💻 Full Stack Developer
🚀 Laravel, Filament, Livewire, Vue, Inertia
🌍 Germany

", + "tag": [], + "type": "Person", + "url": "https://phpc.social/@denniskoch" +} diff --git a/test/fixtures/receiver_worker_signature_activity.json b/test/fixtures/receiver_worker_signature_activity.json new file mode 100644 index 0000000000..3c3fb3fd2f --- /dev/null +++ b/test/fixtures/receiver_worker_signature_activity.json @@ -0,0 +1,62 @@ +{ + "@context": [ + "https://www.w3.org/ns/activitystreams", + { + "atomUri": "ostatus:atomUri", + "blurhash": "toot:blurhash", + "conversation": "ostatus:conversation", + "focalPoint": { + "@container": "@list", + "@id": "toot:focalPoint" + }, + "inReplyToAtomUri": "ostatus:inReplyToAtomUri", + "ostatus": "http://ostatus.org#", + "sensitive": "as:sensitive", + "toot": "http://joinmastodon.org/ns#", + "votersCount": "toot:votersCount" + } + ], + "atomUri": "https://chaos.social/users/distantnative/statuses/109336635639931467", + "attachment": [ + { + "blurhash": "UAK1zS00OXIUxuMxIUM{?b-:-;W:Di?b%2M{", + "height": 960, + "mediaType": "image/jpeg", + "name": null, + "type": "Document", + "url": "https://assets.chaos.social/media_attachments/files/109/336/634/286/114/657/original/2e6122063d8bfb26.jpeg", + "width": 346 + } + ], + "attributedTo": "https://chaos.social/users/distantnative", + "cc": [ + "https://chaos.social/users/distantnative/followers" + ], + "content": "

Favorite piece of anthropology meta discourse.

", + "contentMap": { + "en": "

Favorite piece of anthropology meta discourse.

" + }, + "conversation": "tag:chaos.social,2022-11-13:objectId=71843781:objectType=Conversation", + "id": "https://chaos.social/users/distantnative/statuses/109336635639931467", + "inReplyTo": null, + "inReplyToAtomUri": null, + "published": "2022-11-13T13:04:20Z", + "replies": { + "first": { + "items": [], + "next": "https://chaos.social/users/distantnative/statuses/109336635639931467/replies?only_other_accounts=true&page=true", + "partOf": "https://chaos.social/users/distantnative/statuses/109336635639931467/replies", + "type": "CollectionPage" + }, + "id": "https://chaos.social/users/distantnative/statuses/109336635639931467/replies", + "type": "Collection" + }, + "sensitive": false, + "summary": null, + "tag": [], + "to": [ + "https://www.w3.org/ns/activitystreams#Public" + ], + "type": "Note", + "url": "https://chaos.social/@distantnative/109336635639931467" +} diff --git a/test/pleroma/workers/receiver_worker_test.exs b/test/pleroma/workers/receiver_worker_test.exs index 2b8bd3c40b..33be910853 100644 --- a/test/pleroma/workers/receiver_worker_test.exs +++ b/test/pleroma/workers/receiver_worker_test.exs @@ -9,6 +9,7 @@ defmodule Pleroma.Workers.ReceiverWorkerTest do import Mock import Pleroma.Factory + alias Pleroma.Web.Federator alias Pleroma.Workers.ReceiverWorker test "it does not retry MRF reject" do @@ -49,4 +50,199 @@ test "it does not retry duplicates" do args: %{"op" => "incoming_ap_doc", "params" => params} }) end + + test "it can validate the signature" do + Tesla.Mock.mock(fn + %{url: "https://mastodon.social/users/bastianallgeier"} -> + %Tesla.Env{ + status: 200, + body: File.read!("test/fixtures/bastianallgeier.json"), + headers: [{"content-type", "application/activity+json"}] + } + + %{url: "https://mastodon.social/users/bastianallgeier/collections/featured"} -> + %Tesla.Env{ + status: 200, + headers: [{"content-type", "application/activity+json"}], + body: + File.read!("test/fixtures/users_mock/masto_featured.json") + |> String.replace("{{domain}}", "mastodon.social") + |> String.replace("{{nickname}}", "bastianallgeier") + } + + %{url: "https://phpc.social/users/denniskoch"} -> + %Tesla.Env{ + status: 200, + body: File.read!("test/fixtures/denniskoch.json"), + headers: [{"content-type", "application/activity+json"}] + } + + %{url: "https://phpc.social/users/denniskoch/collections/featured"} -> + %Tesla.Env{ + status: 200, + headers: [{"content-type", "application/activity+json"}], + body: + File.read!("test/fixtures/users_mock/masto_featured.json") + |> String.replace("{{domain}}", "phpc.social") + |> String.replace("{{nickname}}", "denniskoch") + } + + %{url: "https://mastodon.social/users/bastianallgeier/statuses/112846516276907281"} -> + %Tesla.Env{ + status: 200, + headers: [{"content-type", "application/activity+json"}], + body: File.read!("test/fixtures/receiver_worker_signature_activity.json") + } + end) + + params = %{ + "@context" => [ + "https://www.w3.org/ns/activitystreams", + "https://w3id.org/security/v1", + %{ + "claim" => %{"@id" => "toot:claim", "@type" => "@id"}, + "memorial" => "toot:memorial", + "atomUri" => "ostatus:atomUri", + "manuallyApprovesFollowers" => "as:manuallyApprovesFollowers", + "blurhash" => "toot:blurhash", + "ostatus" => "http://ostatus.org#", + "discoverable" => "toot:discoverable", + "focalPoint" => %{"@container" => "@list", "@id" => "toot:focalPoint"}, + "votersCount" => "toot:votersCount", + "Hashtag" => "as:Hashtag", + "Emoji" => "toot:Emoji", + "alsoKnownAs" => %{"@id" => "as:alsoKnownAs", "@type" => "@id"}, + "sensitive" => "as:sensitive", + "movedTo" => %{"@id" => "as:movedTo", "@type" => "@id"}, + "inReplyToAtomUri" => "ostatus:inReplyToAtomUri", + "conversation" => "ostatus:conversation", + "Device" => "toot:Device", + "schema" => "http://schema.org#", + "toot" => "http://joinmastodon.org/ns#", + "cipherText" => "toot:cipherText", + "suspended" => "toot:suspended", + "messageType" => "toot:messageType", + "featuredTags" => %{"@id" => "toot:featuredTags", "@type" => "@id"}, + "Curve25519Key" => "toot:Curve25519Key", + "deviceId" => "toot:deviceId", + "Ed25519Signature" => "toot:Ed25519Signature", + "featured" => %{"@id" => "toot:featured", "@type" => "@id"}, + "devices" => %{"@id" => "toot:devices", "@type" => "@id"}, + "value" => "schema:value", + "PropertyValue" => "schema:PropertyValue", + "messageFranking" => "toot:messageFranking", + "publicKeyBase64" => "toot:publicKeyBase64", + "identityKey" => %{"@id" => "toot:identityKey", "@type" => "@id"}, + "Ed25519Key" => "toot:Ed25519Key", + "indexable" => "toot:indexable", + "EncryptedMessage" => "toot:EncryptedMessage", + "fingerprintKey" => %{"@id" => "toot:fingerprintKey", "@type" => "@id"} + } + ], + "actor" => "https://phpc.social/users/denniskoch", + "cc" => [ + "https://phpc.social/users/denniskoch/followers", + "https://mastodon.social/users/bastianallgeier", + "https://chaos.social/users/distantnative", + "https://fosstodon.org/users/kev" + ], + "id" => "https://phpc.social/users/denniskoch/statuses/112847382711461301/activity", + "object" => %{ + "atomUri" => "https://phpc.social/users/denniskoch/statuses/112847382711461301", + "attachment" => [], + "attributedTo" => "https://phpc.social/users/denniskoch", + "cc" => [ + "https://phpc.social/users/denniskoch/followers", + "https://mastodon.social/users/bastianallgeier", + "https://chaos.social/users/distantnative", + "https://fosstodon.org/users/kev" + ], + "content" => + "

@bastianallgeier @distantnative @kev Another main argument: Discord is popular. Many people have an account, so you can just join an server quickly. Also you know the app and how to get around.

", + "contentMap" => %{ + "en" => + "

@bastianallgeier @distantnative @kev Another main argument: Discord is popular. Many people have an account, so you can just join an server quickly. Also you know the app and how to get around.

" + }, + "conversation" => + "tag:mastodon.social,2024-07-25:objectId=760068442:objectType=Conversation", + "id" => "https://phpc.social/users/denniskoch/statuses/112847382711461301", + "inReplyTo" => + "https://mastodon.social/users/bastianallgeier/statuses/112846516276907281", + "inReplyToAtomUri" => + "https://mastodon.social/users/bastianallgeier/statuses/112846516276907281", + "published" => "2024-07-25T13:33:29Z", + "replies" => %{ + "first" => %{ + "items" => [], + "next" => + "https://phpc.social/users/denniskoch/statuses/112847382711461301/replies?only_other_accounts=true&page=true", + "partOf" => + "https://phpc.social/users/denniskoch/statuses/112847382711461301/replies", + "type" => "CollectionPage" + }, + "id" => "https://phpc.social/users/denniskoch/statuses/112847382711461301/replies", + "type" => "Collection" + }, + "sensitive" => false, + "tag" => [ + %{ + "href" => "https://mastodon.social/users/bastianallgeier", + "name" => "@bastianallgeier@mastodon.social", + "type" => "Mention" + }, + %{ + "href" => "https://chaos.social/users/distantnative", + "name" => "@distantnative@chaos.social", + "type" => "Mention" + }, + %{ + "href" => "https://fosstodon.org/users/kev", + "name" => "@kev@fosstodon.org", + "type" => "Mention" + } + ], + "to" => ["https://www.w3.org/ns/activitystreams#Public"], + "type" => "Note", + "url" => "https://phpc.social/@denniskoch/112847382711461301" + }, + "published" => "2024-07-25T13:33:29Z", + "signature" => %{ + "created" => "2024-07-25T13:33:29Z", + "creator" => "https://phpc.social/users/denniskoch#main-key", + "signatureValue" => + "slz9BKJzd2n1S44wdXGOU+bV/wsskdgAaUpwxj8R16mYOL8+DTpE6VnfSKoZGsBBJT8uG5gnVfVEz1YsTUYtymeUgLMh7cvd8VnJnZPS+oixbmBRVky/Myf91TEgQQE7G4vDmTdB4ii54hZrHcOOYYf5FKPNRSkMXboKA6LMqNtekhbI+JTUJYIB02WBBK6PUyo15f6B1RJ6HGWVgud9NE0y1EZXfrkqUt682p8/9D49ORf7AwjXUJibKic2RbPvhEBj70qUGfBm4vvgdWhSUn1IG46xh+U0+NrTSUED82j1ZVOeua/2k/igkGs8cSBkY35quXTkPz6gbqCCH66CuA==", + "type" => "RsaSignature2017" + }, + "to" => ["https://www.w3.org/ns/activitystreams#Public"], + "type" => "Create" + } + + req_headers = [ + ["accept-encoding", "gzip"], + ["content-length", "5184"], + ["content-type", "application/activity+json"], + ["date", "Thu, 25 Jul 2024 13:33:31 GMT"], + ["digest", "SHA-256=ouge/6HP2/QryG6F3JNtZ6vzs/hSwMk67xdxe87eH7A="], + ["host", "bikeshed.party"], + [ + "signature", + "keyId=\"https://mastodon.social/users/bastianallgeier#main-key\",algorithm=\"rsa-sha256\",headers=\"(request-target) host date digest content-type\",signature=\"ymE3vn5Iw50N6ukSp8oIuXJB5SBjGAGjBasdTDvn+ahZIzq2SIJfmVCsIIzyqIROnhWyQoTbavTclVojEqdaeOx+Ejz2wBnRBmhz5oemJLk4RnnCH0lwMWyzeY98YAvxi9Rq57Gojuv/1lBqyGa+rDzynyJpAMyFk17XIZpjMKuTNMCbjMDy76ILHqArykAIL/v1zxkgwxY/+ELzxqMpNqtZ+kQ29znNMUBB3eVZ/mNAHAz6o33Y9VKxM2jw+08vtuIZOusXyiHbRiaj2g5HtN2WBUw1MzzfRfHF2/yy7rcipobeoyk5RvP5SyHV3WrIeZ3iyoNfmv33y8fxllF0EA==\"" + ], + [ + "user-agent", + "http.rb/5.2.0 (Mastodon/4.3.0-nightly.2024-07-25; +https://mastodon.social/)" + ] + ] + + {:ok, oban_job} = + Federator.incoming_ap_doc(%{ + method: "POST", + req_headers: req_headers, + request_path: "/inbox", + params: params, + query_string: "" + }) + + assert {:ok, %Pleroma.Activity{}} = ReceiverWorker.perform(oban_job) + end end From 84b15ac1119396eeb9827fc5242489a4f5cb820b Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Thu, 25 Jul 2024 16:18:31 -0400 Subject: [PATCH 10/23] Improve specs and matching --- lib/pleroma/signature.ex | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/pleroma/signature.ex b/lib/pleroma/signature.ex index 51dac24024..0f3362ebec 100644 --- a/lib/pleroma/signature.ex +++ b/lib/pleroma/signature.ex @@ -94,8 +94,8 @@ def signed_date(%NaiveDateTime{} = date) do Timex.format!(date, "{WDshort}, {0D} {Mshort} {YYYY} {h24}:{m}:{s} GMT") end - @spec validate_signature(map(), String.t()) :: boolean() - def validate_signature(conn, request_target) do + @spec validate_signature(Plug.Conn.t(), String.t()) :: boolean() + def validate_signature(%Plug.Conn{} = conn, request_target) do # Newer drafts for HTTP signatures now use @request-target instead of the # old (request-target). We'll now support both for incoming signatures. conn = @@ -106,8 +106,8 @@ def validate_signature(conn, request_target) do @http_signatures_impl.validate_conn(conn) end - @spec validate_signature(map()) :: boolean() - def validate_signature(conn) do + @spec validate_signature(Plug.Conn.t()) :: boolean() + def validate_signature(%Plug.Conn{} = conn) do # This (request-target) is non-standard, but many implementations do it # this way due to a misinterpretation of # https://datatracker.ietf.org/doc/html/draft-cavage-http-signatures-06 From c19d55cabb4932b9786fa8a4571d7b92e3925e00 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Thu, 25 Jul 2024 16:18:45 -0400 Subject: [PATCH 11/23] Safer string concatenation --- lib/pleroma/signature.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/pleroma/signature.ex b/lib/pleroma/signature.ex index 0f3362ebec..1955134788 100644 --- a/lib/pleroma/signature.ex +++ b/lib/pleroma/signature.ex @@ -115,13 +115,13 @@ def validate_signature(%Plug.Conn{} = conn) do # show that it must be the absolute path + query. This behavior is kept to # make sure most software (Pleroma itself, Mastodon, and probably others) # do not break. - request_target = String.downcase("#{conn.method}") <> " #{conn.request_path}" + request_target = Enum.join([String.downcase(conn.method), conn.request_path], " ") # This is the proper way to build the @request-target, as expected by # many HTTP signature libraries, clarified in the following draft: # https://www.ietf.org/archive/id/draft-ietf-httpbis-message-signatures-11.html#section-2.2.6 # It is the same as before, but containing the query part as well. - proper_target = request_target <> "?#{conn.query_string}" + proper_target = Enum.join([request_target, "?", conn.query_string], "") cond do # Normal, non-standard behavior but expected by Pleroma and more. From 21cf321f7454edc1a7e00436f66f470edc222775 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Thu, 25 Jul 2024 16:36:34 -0400 Subject: [PATCH 12/23] Quiet Dialyzer It is angry we are making a fake %Plug.Conn{} to pass through Signature.validate_signature/1. We can work around it by making the code support a map, but then we lose the benefit of being able to use put_req_header/3 --- .dialyzer_ignore.exs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.dialyzer_ignore.exs b/.dialyzer_ignore.exs index 865e7d7820..692cac9234 100644 --- a/.dialyzer_ignore.exs +++ b/.dialyzer_ignore.exs @@ -2,5 +2,8 @@ {"lib/cachex.ex", "Unknown type: Spec.cache/0."}, {"lib/pleroma/web/plugs/rate_limiter.ex", "The pattern can never match the type {:commit, _} | {:ignore, _}."}, {"lib/pleroma/web/plugs/rate_limiter.ex", "Function get_scale/2 will never be called."}, -{"lib/pleroma/web/plugs/rate_limiter.ex", "Function initialize_buckets!/1 will never be called."} +{"lib/pleroma/web/plugs/rate_limiter.ex", "Function initialize_buckets!/1 will never be called."}, +{"lib/pleroma/workers/receiver_worker.ex", :call}, +{"lib/pleroma/workers/receiver_worker.ex", :pattern_match}, +{"lib/pleroma/workers/receiver_worker.ex", :pattern_match_cov}, ] From 59309a9eff5c2e61b2195945eca21c5126eb3f5f Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Sun, 28 Jul 2024 20:41:21 -0400 Subject: [PATCH 13/23] Publisher job simplification Publisher jobs now store the the activity id instead of inserting duplicate JSON data in the Oban queue for each delivery. --- changelog.d/publisher.change | 1 + lib/pleroma/web/activity_pub/publisher.ex | 41 ++++++++++--------- .../web/activity_pub/publisher_test.exs | 40 +++++++++++------- 3 files changed, 48 insertions(+), 34 deletions(-) create mode 100644 changelog.d/publisher.change diff --git a/changelog.d/publisher.change b/changelog.d/publisher.change new file mode 100644 index 0000000000..0d26d7b005 --- /dev/null +++ b/changelog.d/publisher.change @@ -0,0 +1 @@ +Publisher jobs now store the the activity id instead of inserting duplicate JSON data in the Oban queue for each delivery. diff --git a/lib/pleroma/web/activity_pub/publisher.ex b/lib/pleroma/web/activity_pub/publisher.ex index c8bdf2250b..cb436e8d80 100644 --- a/lib/pleroma/web/activity_pub/publisher.ex +++ b/lib/pleroma/web/activity_pub/publisher.ex @@ -80,13 +80,26 @@ def representable?(%Activity{} = activity) do parameters set: * `inbox`: the inbox to publish to - * `json`: the JSON message body representing the ActivityPub message * `actor`: the actor which is signing the message - * `id`: the ActivityStreams URI of the message + * `activity_id`: the internal activity id + * `cc`: the cc recipients relevant to this inbox (optional) """ - def publish_one(%{inbox: inbox, json: json, actor: %User{} = actor, id: id} = params) do - Logger.debug("Federating #{id} to #{inbox}") + def publish_one(%{inbox: inbox, actor: %User{} = actor, activity_id: activity_id} = params) do + activity = Activity.get_by_id(activity_id) + + ap_id = activity.data["id"] + Logger.debug("Federating #{ap_id} to #{inbox}") uri = %{path: path} = URI.parse(inbox) + + {:ok, data} = Transmogrifier.prepare_outgoing(activity.data) + + cc = Map.get(params, :cc) + + json = + data + |> Map.put("cc", cc) + |> Jason.encode!() + digest = "SHA-256=" <> (:crypto.hash(:sha256, json) |> Base.encode64()) date = Pleroma.Signature.signed_date() @@ -119,7 +132,7 @@ def publish_one(%{inbox: inbox, json: json, actor: %User{} = actor, id: id} = pa else {_post_result, %{status: code} = response} = e -> unless params[:unreachable_since], do: Instances.set_unreachable(inbox) - Logger.metadata(activity: id, inbox: inbox, status: code) + Logger.metadata(id: activity_id, inbox: inbox, status: code) Logger.error("Publisher failed to inbox #{inbox} with status #{code}") case response do @@ -136,7 +149,7 @@ def publish_one(%{inbox: inbox, json: json, actor: %User{} = actor, id: id} = pa e -> unless params[:unreachable_since], do: Instances.set_unreachable(inbox) - Logger.metadata(activity: id, inbox: inbox) + Logger.metadata(activity: activity_id, inbox: inbox) Logger.error("Publisher failed to inbox #{inbox} #{inspect(e)}") {:error, e} end @@ -251,7 +264,6 @@ def determine_inbox( def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity) when is_list(bcc) and bcc != [] do public = public?(activity) - {:ok, data} = Transmogrifier.prepare_outgoing(activity.data) [priority_recipients, recipients] = recipients(actor, activity) @@ -276,16 +288,11 @@ def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity) # instance would only accept a first message for the first recipient and ignore the rest. cc = get_cc_ap_ids(ap_id, recipients) - json = - data - |> Map.put("cc", cc) - |> Jason.encode!() - __MODULE__.enqueue_one(%{ inbox: inbox, - json: json, + cc: cc, actor_id: actor.id, - id: activity.data["id"], + activity_id: activity.id, unreachable_since: unreachable_since }) end) @@ -302,9 +309,6 @@ def publish(%User{} = actor, %Activity{} = activity) do Relay.publish(activity) end - {:ok, data} = Transmogrifier.prepare_outgoing(activity.data) - json = Jason.encode!(data) - [priority_inboxes, inboxes] = recipients(actor, activity) |> Enum.map(fn recipients -> @@ -326,9 +330,8 @@ def publish(%User{} = actor, %Activity{} = activity) do __MODULE__.enqueue_one( %{ inbox: inbox, - json: json, actor_id: actor.id, - id: activity.data["id"], + activity_id: activity.id, unreachable_since: unreachable_since }, priority: priority diff --git a/test/pleroma/web/activity_pub/publisher_test.exs b/test/pleroma/web/activity_pub/publisher_test.exs index 6f48a02279..15f29a3860 100644 --- a/test/pleroma/web/activity_pub/publisher_test.exs +++ b/test/pleroma/web/activity_pub/publisher_test.exs @@ -137,6 +137,7 @@ test "it returns inbox for messages involving single recipients in total" do test "publish to url with with different ports" do inbox80 = "http://42.site/users/nick1/inbox" inbox42 = "http://42.site:42/users/nick1/inbox" + activity = insert(:note_activity) mock(fn %{method: :post, url: "http://42.site:42/users/nick1/inbox"} -> @@ -151,18 +152,16 @@ test "publish to url with with different ports" do assert {:ok, %{body: "port 42"}} = Publisher.publish_one(%{ inbox: inbox42, - json: "{}", actor: actor, - id: 1, + activity_id: activity.id, unreachable_since: true }) assert {:ok, %{body: "port 80"}} = Publisher.publish_one(%{ inbox: inbox80, - json: "{}", actor: actor, - id: 1, + activity_id: activity.id, unreachable_since: true }) end @@ -173,8 +172,11 @@ test "publish to url with with different ports" do [] do actor = insert(:user) inbox = "http://200.site/users/nick1/inbox" + activity = insert(:note_activity) + + assert {:ok, _} = + Publisher.publish_one(%{inbox: inbox, actor: actor, activity_id: activity.id}) - assert {:ok, _} = Publisher.publish_one(%{inbox: inbox, json: "{}", actor: actor, id: 1}) assert called(Instances.set_reachable(inbox)) end @@ -184,13 +186,13 @@ test "publish to url with with different ports" do [] do actor = insert(:user) inbox = "http://200.site/users/nick1/inbox" + activity = insert(:note_activity) assert {:ok, _} = Publisher.publish_one(%{ inbox: inbox, - json: "{}", actor: actor, - id: 1, + activity_id: activity.id, unreachable_since: NaiveDateTime.utc_now() }) @@ -203,13 +205,13 @@ test "publish to url with with different ports" do [] do actor = insert(:user) inbox = "http://200.site/users/nick1/inbox" + activity = insert(:note_activity) assert {:ok, _} = Publisher.publish_one(%{ inbox: inbox, - json: "{}", actor: actor, - id: 1, + activity_id: activity.id, unreachable_since: nil }) @@ -222,9 +224,10 @@ test "publish to url with with different ports" do [] do actor = insert(:user) inbox = "http://404.site/users/nick1/inbox" + activity = insert(:note_activity) assert {:cancel, _} = - Publisher.publish_one(%{inbox: inbox, json: "{}", actor: actor, id: 1}) + Publisher.publish_one(%{inbox: inbox, actor: actor, activity_id: activity.id}) assert called(Instances.set_unreachable(inbox)) end @@ -235,10 +238,15 @@ test "publish to url with with different ports" do [] do actor = insert(:user) inbox = "http://connrefused.site/users/nick1/inbox" + activity = insert(:note_activity) assert capture_log(fn -> assert {:error, _} = - Publisher.publish_one(%{inbox: inbox, json: "{}", actor: actor, id: 1}) + Publisher.publish_one(%{ + inbox: inbox, + actor: actor, + activity_id: activity.id + }) end) =~ "connrefused" assert called(Instances.set_unreachable(inbox)) @@ -250,8 +258,10 @@ test "publish to url with with different ports" do [] do actor = insert(:user) inbox = "http://200.site/users/nick1/inbox" + activity = insert(:note_activity) - assert {:ok, _} = Publisher.publish_one(%{inbox: inbox, json: "{}", actor: actor, id: 1}) + assert {:ok, _} = + Publisher.publish_one(%{inbox: inbox, actor: actor, activity_id: activity.id}) refute called(Instances.set_unreachable(inbox)) end @@ -262,14 +272,14 @@ test "publish to url with with different ports" do [] do actor = insert(:user) inbox = "http://connrefused.site/users/nick1/inbox" + activity = insert(:note_activity) assert capture_log(fn -> assert {:error, _} = Publisher.publish_one(%{ inbox: inbox, - json: "{}", actor: actor, - id: 1, + activity_id: activity.id, unreachable_since: NaiveDateTime.utc_now() }) end) =~ "connrefused" @@ -406,7 +416,7 @@ test "publish to url with with different ports" do Publisher.enqueue_one(%{ inbox: "https://domain.com/users/nick1/inbox", actor_id: actor.id, - id: note_activity.data["id"] + activity_id: note_activity.id }) ) end From 74072622e08dd1efdc7bf69c3278250ea1efb22e Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Mon, 29 Jul 2024 09:52:13 -0400 Subject: [PATCH 14/23] Remove actor and actor_id from the job as it can be inferred by the activity --- lib/pleroma/web/activity_pub/publisher.ex | 17 ++----- .../web/activity_pub/publisher_test.exs | 44 +++++++------------ 2 files changed, 19 insertions(+), 42 deletions(-) diff --git a/lib/pleroma/web/activity_pub/publisher.ex b/lib/pleroma/web/activity_pub/publisher.ex index cb436e8d80..373bf6e416 100644 --- a/lib/pleroma/web/activity_pub/publisher.ex +++ b/lib/pleroma/web/activity_pub/publisher.ex @@ -80,12 +80,12 @@ def representable?(%Activity{} = activity) do parameters set: * `inbox`: the inbox to publish to - * `actor`: the actor which is signing the message * `activity_id`: the internal activity id * `cc`: the cc recipients relevant to this inbox (optional) """ - def publish_one(%{inbox: inbox, actor: %User{} = actor, activity_id: activity_id} = params) do - activity = Activity.get_by_id(activity_id) + def publish_one(%{inbox: inbox, activity_id: activity_id} = params) do + activity = Activity.get_by_id_with_user_actor(activity_id) + actor = activity.user_actor ap_id = activity.data["id"] Logger.debug("Federating #{ap_id} to #{inbox}") @@ -155,15 +155,6 @@ def publish_one(%{inbox: inbox, actor: %User{} = actor, activity_id: activity_id end end - def publish_one(%{actor_id: actor_id} = params) do - actor = User.get_cached_by_id(actor_id) - - params - |> Map.delete(:actor_id) - |> Map.put(:actor, actor) - |> publish_one() - end - defp signature_host(%URI{port: port, scheme: scheme, host: host}) do if port == URI.default_port(scheme) do host @@ -291,7 +282,6 @@ def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity) __MODULE__.enqueue_one(%{ inbox: inbox, cc: cc, - actor_id: actor.id, activity_id: activity.id, unreachable_since: unreachable_since }) @@ -330,7 +320,6 @@ def publish(%User{} = actor, %Activity{} = activity) do __MODULE__.enqueue_one( %{ inbox: inbox, - actor_id: actor.id, activity_id: activity.id, unreachable_since: unreachable_since }, diff --git a/test/pleroma/web/activity_pub/publisher_test.exs b/test/pleroma/web/activity_pub/publisher_test.exs index 15f29a3860..569b6af1a6 100644 --- a/test/pleroma/web/activity_pub/publisher_test.exs +++ b/test/pleroma/web/activity_pub/publisher_test.exs @@ -147,12 +147,11 @@ test "publish to url with with different ports" do {:ok, %Tesla.Env{status: 200, body: "port 80"}} end) - actor = insert(:user) + _actor = insert(:user) assert {:ok, %{body: "port 42"}} = Publisher.publish_one(%{ inbox: inbox42, - actor: actor, activity_id: activity.id, unreachable_since: true }) @@ -160,7 +159,6 @@ test "publish to url with with different ports" do assert {:ok, %{body: "port 80"}} = Publisher.publish_one(%{ inbox: inbox80, - actor: actor, activity_id: activity.id, unreachable_since: true }) @@ -170,12 +168,12 @@ test "publish to url with with different ports" do Instances, [:passthrough], [] do - actor = insert(:user) + _actor = insert(:user) inbox = "http://200.site/users/nick1/inbox" activity = insert(:note_activity) assert {:ok, _} = - Publisher.publish_one(%{inbox: inbox, actor: actor, activity_id: activity.id}) + Publisher.publish_one(%{inbox: inbox, activity_id: activity.id}) assert called(Instances.set_reachable(inbox)) end @@ -184,14 +182,13 @@ test "publish to url with with different ports" do Instances, [:passthrough], [] do - actor = insert(:user) + _actor = insert(:user) inbox = "http://200.site/users/nick1/inbox" activity = insert(:note_activity) assert {:ok, _} = Publisher.publish_one(%{ inbox: inbox, - actor: actor, activity_id: activity.id, unreachable_since: NaiveDateTime.utc_now() }) @@ -203,14 +200,13 @@ test "publish to url with with different ports" do Instances, [:passthrough], [] do - actor = insert(:user) + _actor = insert(:user) inbox = "http://200.site/users/nick1/inbox" activity = insert(:note_activity) assert {:ok, _} = Publisher.publish_one(%{ inbox: inbox, - actor: actor, activity_id: activity.id, unreachable_since: nil }) @@ -222,12 +218,12 @@ test "publish to url with with different ports" do Instances, [:passthrough], [] do - actor = insert(:user) + _actor = insert(:user) inbox = "http://404.site/users/nick1/inbox" activity = insert(:note_activity) assert {:cancel, _} = - Publisher.publish_one(%{inbox: inbox, actor: actor, activity_id: activity.id}) + Publisher.publish_one(%{inbox: inbox, activity_id: activity.id}) assert called(Instances.set_unreachable(inbox)) end @@ -236,7 +232,7 @@ test "publish to url with with different ports" do Instances, [:passthrough], [] do - actor = insert(:user) + _actor = insert(:user) inbox = "http://connrefused.site/users/nick1/inbox" activity = insert(:note_activity) @@ -244,7 +240,6 @@ test "publish to url with with different ports" do assert {:error, _} = Publisher.publish_one(%{ inbox: inbox, - actor: actor, activity_id: activity.id }) end) =~ "connrefused" @@ -256,12 +251,12 @@ test "publish to url with with different ports" do Instances, [:passthrough], [] do - actor = insert(:user) + _actor = insert(:user) inbox = "http://200.site/users/nick1/inbox" activity = insert(:note_activity) assert {:ok, _} = - Publisher.publish_one(%{inbox: inbox, actor: actor, activity_id: activity.id}) + Publisher.publish_one(%{inbox: inbox, activity_id: activity.id}) refute called(Instances.set_unreachable(inbox)) end @@ -270,7 +265,7 @@ test "publish to url with with different ports" do Instances, [:passthrough], [] do - actor = insert(:user) + _actor = insert(:user) inbox = "http://connrefused.site/users/nick1/inbox" activity = insert(:note_activity) @@ -278,7 +273,6 @@ test "publish to url with with different ports" do assert {:error, _} = Publisher.publish_one(%{ inbox: inbox, - actor: actor, activity_id: activity.id, unreachable_since: NaiveDateTime.utc_now() }) @@ -319,8 +313,7 @@ test "publish to url with with different ports" do assert not called( Publisher.enqueue_one(%{ inbox: "https://domain.com/users/nick1/inbox", - actor_id: actor.id, - id: note_activity.data["id"] + activity_id: note_activity.id }) ) end @@ -356,8 +349,7 @@ test "publish to url with with different ports" do Publisher.enqueue_one( %{ inbox: "https://domain.com/users/nick1/inbox", - actor_id: actor.id, - id: note_activity.data["id"] + activity_id: note_activity.id }, priority: 1 ) @@ -380,8 +372,7 @@ test "publish to url with with different ports" do Publisher.enqueue_one( %{ inbox: :_, - actor_id: actor.id, - id: note_activity.data["id"] + activity_id: note_activity.id }, priority: 0 ) @@ -415,7 +406,6 @@ test "publish to url with with different ports" do assert called( Publisher.enqueue_one(%{ inbox: "https://domain.com/users/nick1/inbox", - actor_id: actor.id, activity_id: note_activity.id }) ) @@ -466,8 +456,7 @@ test "publish to url with with different ports" do Publisher.enqueue_one( %{ inbox: "https://domain.com/users/nick1/inbox", - actor_id: actor.id, - id: delete.data["id"] + activity_id: delete.id }, priority: 1 ) @@ -477,8 +466,7 @@ test "publish to url with with different ports" do Publisher.enqueue_one( %{ inbox: "https://domain2.com/users/nick1/inbox", - actor_id: actor.id, - id: delete.data["id"] + activity_id: delete.id }, priority: 1 ) From 8893ad98997197bd89e98f7dd18825dcb1206aa4 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Mon, 29 Jul 2024 09:59:35 -0400 Subject: [PATCH 15/23] Fix cancelling jobs --- lib/pleroma/web/common_api.ex | 4 ++-- test/pleroma/web/common_api_test.exs | 32 ++++++++++++++-------------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/lib/pleroma/web/common_api.ex b/lib/pleroma/web/common_api.ex index 06faf845eb..b90b6a6d90 100644 --- a/lib/pleroma/web/common_api.ex +++ b/lib/pleroma/web/common_api.ex @@ -714,11 +714,11 @@ def get_user(ap_id, fake_record_fallback \\ true) do end end - defp maybe_cancel_jobs(%Activity{data: %{"id" => ap_id}}) do + defp maybe_cancel_jobs(%Activity{id: activity_id}) do Oban.Job |> where([j], j.worker == "Pleroma.Workers.PublisherWorker") |> where([j], j.args["op"] == "publish_one") - |> where([j], j.args["params"]["id"] == ^ap_id) + |> where([j], j.args["params"]["activity_id"] == ^activity_id) |> Oban.cancel_all_jobs() end diff --git a/test/pleroma/web/common_api_test.exs b/test/pleroma/web/common_api_test.exs index b6fba69995..4cdd3cffaa 100644 --- a/test/pleroma/web/common_api_test.exs +++ b/test/pleroma/web/common_api_test.exs @@ -1957,7 +1957,7 @@ test "when deleting posts", %{ {:ok, _, _} = Pleroma.User.follow(remote_one, local_user) {:ok, _, _} = Pleroma.User.follow(remote_two, local_user) - {:ok, %{data: %{"id" => ap_id}} = activity} = + {:ok, %{id: activity_id} = _activity} = CommonAPI.post(local_user, %{status: "Happy Friday everyone!"}) # Generate the publish_one jobs @@ -1971,7 +1971,7 @@ test "when deleting posts", %{ state: "available", queue: "federator_outgoing", worker: "Pleroma.Workers.PublisherWorker", - args: %{"op" => "publish_one", "params" => %{"id" => ^ap_id}} + args: %{"op" => "publish_one", "params" => %{"activity_id" => ^activity_id}} }, job ) @@ -1980,7 +1980,7 @@ test "when deleting posts", %{ assert length(publish_one_jobs) == 2 # The delete should have triggered cancelling the publish_one jobs - assert {:ok, _delete} = CommonAPI.delete(activity.id, local_user) + assert {:ok, _delete} = CommonAPI.delete(activity_id, local_user) # all_enqueued/1 will not return cancelled jobs cancelled_jobs = @@ -1988,7 +1988,7 @@ test "when deleting posts", %{ |> where([j], j.worker == "Pleroma.Workers.PublisherWorker") |> where([j], j.state == "cancelled") |> where([j], j.args["op"] == "publish_one") - |> where([j], j.args["params"]["id"] == ^ap_id) + |> where([j], j.args["params"]["activity_id"] == ^activity_id) |> Pleroma.Repo.all() assert length(cancelled_jobs) == 2 @@ -2001,7 +2001,7 @@ test "when unfavoriting posts", %{ {:ok, activity} = CommonAPI.post(remote_user, %{status: "I like turtles!"}) - {:ok, %{data: %{"id" => ap_id}} = _favorite} = + {:ok, %{id: favorite_id} = _favorite} = CommonAPI.favorite(activity.id, local_user) # Generate the publish_one jobs @@ -2015,7 +2015,7 @@ test "when unfavoriting posts", %{ state: "available", queue: "federator_outgoing", worker: "Pleroma.Workers.PublisherWorker", - args: %{"op" => "publish_one", "params" => %{"id" => ^ap_id}} + args: %{"op" => "publish_one", "params" => %{"activity_id" => ^favorite_id}} }, job ) @@ -2032,7 +2032,7 @@ test "when unfavoriting posts", %{ |> where([j], j.worker == "Pleroma.Workers.PublisherWorker") |> where([j], j.state == "cancelled") |> where([j], j.args["op"] == "publish_one") - |> where([j], j.args["params"]["id"] == ^ap_id) + |> where([j], j.args["params"]["activity_id"] == ^favorite_id) |> Pleroma.Repo.all() assert length(cancelled_jobs) == 1 @@ -2049,7 +2049,7 @@ test "when unboosting posts", %{ {:ok, activity} = CommonAPI.post(remote_one, %{status: "This is an unpleasant post"}) - {:ok, %{data: %{"id" => ap_id}} = _repeat} = + {:ok, %{id: repeat_id} = _repeat} = CommonAPI.repeat(activity.id, local_user) # Generate the publish_one jobs @@ -2063,7 +2063,7 @@ test "when unboosting posts", %{ state: "available", queue: "federator_outgoing", worker: "Pleroma.Workers.PublisherWorker", - args: %{"op" => "publish_one", "params" => %{"id" => ^ap_id}} + args: %{"op" => "publish_one", "params" => %{"activity_id" => ^repeat_id}} }, job ) @@ -2080,7 +2080,7 @@ test "when unboosting posts", %{ |> where([j], j.worker == "Pleroma.Workers.PublisherWorker") |> where([j], j.state == "cancelled") |> where([j], j.args["op"] == "publish_one") - |> where([j], j.args["params"]["id"] == ^ap_id) + |> where([j], j.args["params"]["activity_id"] == ^repeat_id) |> Pleroma.Repo.all() assert length(cancelled_jobs) == 2 @@ -2094,11 +2094,11 @@ test "when unreacting to posts", %{ {:ok, _, _} = Pleroma.User.follow(remote_one, local_user) {:ok, _, _} = Pleroma.User.follow(remote_two, local_user) - {:ok, activity} = + {:ok, %{id: activity_id}} = CommonAPI.post(remote_one, %{status: "Gang gang!!!!"}) - {:ok, %{data: %{"id" => ap_id}} = _react} = - CommonAPI.react_with_emoji(activity.id, local_user, "👍") + {:ok, %{id: react_id} = _react} = + CommonAPI.react_with_emoji(activity_id, local_user, "👍") # Generate the publish_one jobs ObanHelpers.perform_all() @@ -2111,7 +2111,7 @@ test "when unreacting to posts", %{ state: "available", queue: "federator_outgoing", worker: "Pleroma.Workers.PublisherWorker", - args: %{"op" => "publish_one", "params" => %{"id" => ^ap_id}} + args: %{"op" => "publish_one", "params" => %{"activity_id" => ^react_id}} }, job ) @@ -2120,7 +2120,7 @@ test "when unreacting to posts", %{ assert length(publish_one_jobs) == 2 # The unreact should have triggered cancelling the publish_one jobs - assert {:ok, _unreact} = CommonAPI.unreact_with_emoji(activity.id, local_user, "👍") + assert {:ok, _unreact} = CommonAPI.unreact_with_emoji(activity_id, local_user, "👍") # all_enqueued/1 will not return cancelled jobs cancelled_jobs = @@ -2128,7 +2128,7 @@ test "when unreacting to posts", %{ |> where([j], j.worker == "Pleroma.Workers.PublisherWorker") |> where([j], j.state == "cancelled") |> where([j], j.args["op"] == "publish_one") - |> where([j], j.args["params"]["id"] == ^ap_id) + |> where([j], j.args["params"]["activity_id"] == ^react_id) |> Pleroma.Repo.all() assert length(cancelled_jobs) == 2 From b48fd89a41ad766b79a7a2336737196216cede22 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Mon, 29 Jul 2024 10:03:22 -0400 Subject: [PATCH 16/23] Revert unintended change to the Logger metadata tag name --- lib/pleroma/web/activity_pub/publisher.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/pleroma/web/activity_pub/publisher.ex b/lib/pleroma/web/activity_pub/publisher.ex index 373bf6e416..e040753dc6 100644 --- a/lib/pleroma/web/activity_pub/publisher.ex +++ b/lib/pleroma/web/activity_pub/publisher.ex @@ -132,7 +132,7 @@ def publish_one(%{inbox: inbox, activity_id: activity_id} = params) do else {_post_result, %{status: code} = response} = e -> unless params[:unreachable_since], do: Instances.set_unreachable(inbox) - Logger.metadata(id: activity_id, inbox: inbox, status: code) + Logger.metadata(activity: activity_id, inbox: inbox, status: code) Logger.error("Publisher failed to inbox #{inbox} with status #{code}") case response do From 05d4989795b79683933aa602cd427c57ded10e2f Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Mon, 29 Jul 2024 13:54:26 -0400 Subject: [PATCH 17/23] Insert replacement jobs in the new format if any remain undelivered The old jobs remain and will fail gracefully --- .../20240729163838_publisher_job_change.exs | 27 ++++++++++++ .../publisher_migration_change_test.exs | 43 +++++++++++++++++++ 2 files changed, 70 insertions(+) create mode 100644 priv/repo/migrations/20240729163838_publisher_job_change.exs create mode 100644 test/pleroma/repo/migrations/publisher_migration_change_test.exs diff --git a/priv/repo/migrations/20240729163838_publisher_job_change.exs b/priv/repo/migrations/20240729163838_publisher_job_change.exs new file mode 100644 index 0000000000..08d73b5ad1 --- /dev/null +++ b/priv/repo/migrations/20240729163838_publisher_job_change.exs @@ -0,0 +1,27 @@ +defmodule Pleroma.Repo.Migrations.PublisherJobChange do + use Ecto.Migration + + alias Pleroma.Activity + import Ecto.Query + + def up do + query = + from(j in Oban.Job, + where: j.worker == "Pleroma.Workers.PublisherWorker", + where: j.state in ["available", "retryable"] + ) + + jobs = + Oban |> Oban.config() |> Oban.Repo.all(query) + + Enum.each(jobs, fn job -> + args = job.args + activity = Activity.get_by_ap_id(args["id"]) + + updated_args = Map.put(args, "activity_id", activity.id) + + Pleroma.Workers.PublisherWorker.new(updated_args) + |> Oban.insert() + end) + end +end diff --git a/test/pleroma/repo/migrations/publisher_migration_change_test.exs b/test/pleroma/repo/migrations/publisher_migration_change_test.exs new file mode 100644 index 0000000000..9c035e604e --- /dev/null +++ b/test/pleroma/repo/migrations/publisher_migration_change_test.exs @@ -0,0 +1,43 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2022 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Repo.Migrations.PublisherMigrationChangeTest do + use Oban.Testing, repo: Pleroma.Repo + use Pleroma.DataCase + import Pleroma.Factory + import Pleroma.Tests.Helpers + + alias Pleroma.Activity + alias Pleroma.Workers.PublisherWorker + + setup_all do: require_migration("20240729163838_publisher_job_change") + + describe "up/0" do + test "migrates publisher jobs to new format", %{migration: migration} do + user = insert(:user) + + %Activity{id: activity_id, data: %{"id" => ap_id}} = + insert(:note_activity, user: user) + + {:ok, %{id: job_id}} = + PublisherWorker.new(%{ + "actor_id" => user.id, + "json" => "{}", + "id" => ap_id, + "inbox" => "https://example.com/inbox", + "unreachable_since" => nil + }) + |> Oban.insert() + + assert [%{id: ^job_id, args: %{"id" => ^ap_id}}] = all_enqueued(worker: PublisherWorker) + + assert migration.up() == :ok + + assert_enqueued( + worker: PublisherWorker, + args: %{"id" => ap_id, "activity_id" => activity_id} + ) + end + end +end From 66649e1dcd95035ba3eb04c59f359c2ee4b965dc Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Tue, 30 Jul 2024 09:49:09 -0400 Subject: [PATCH 18/23] Remove unused Oban queue --- changelog.d/oban-transmogrifier.skip | 0 config/config.exs | 1 - 2 files changed, 1 deletion(-) create mode 100644 changelog.d/oban-transmogrifier.skip diff --git a/changelog.d/oban-transmogrifier.skip b/changelog.d/oban-transmogrifier.skip new file mode 100644 index 0000000000..e69de29bb2 diff --git a/config/config.exs b/config/config.exs index 2f0d9d7d2e..4780892f7a 100644 --- a/config/config.exs +++ b/config/config.exs @@ -590,7 +590,6 @@ federator_incoming: 5, federator_outgoing: 5, web_push: 50, - transmogrifier: 20, background: 20, search_indexing: [limit: 10, paused: true], slow: 5 From 1bce582f0de896b2a84cc2ef44f82646276dc255 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Tue, 30 Jul 2024 10:48:06 -0400 Subject: [PATCH 19/23] Fix migration crashing due to Oban not running We can use Pleroma.Repo to fetch the jobs --- priv/repo/migrations/20240729163838_publisher_job_change.exs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/priv/repo/migrations/20240729163838_publisher_job_change.exs b/priv/repo/migrations/20240729163838_publisher_job_change.exs index 08d73b5ad1..3449e3b3b4 100644 --- a/priv/repo/migrations/20240729163838_publisher_job_change.exs +++ b/priv/repo/migrations/20240729163838_publisher_job_change.exs @@ -2,6 +2,7 @@ defmodule Pleroma.Repo.Migrations.PublisherJobChange do use Ecto.Migration alias Pleroma.Activity + alias Pleroma.Repo import Ecto.Query def up do @@ -11,8 +12,7 @@ def up do where: j.state in ["available", "retryable"] ) - jobs = - Oban |> Oban.config() |> Oban.Repo.all(query) + jobs = Repo.all(query) Enum.each(jobs, fn job -> args = job.args From a6119210b73aab7071c57f8b6500fe3237ade720 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Tue, 30 Jul 2024 11:06:55 -0400 Subject: [PATCH 20/23] Increase federator outgoing job parallelism --- changelog.d/federator_outgoing_increase.change | 1 + config/config.exs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) create mode 100644 changelog.d/federator_outgoing_increase.change diff --git a/changelog.d/federator_outgoing_increase.change b/changelog.d/federator_outgoing_increase.change new file mode 100644 index 0000000000..7e68a79bcd --- /dev/null +++ b/changelog.d/federator_outgoing_increase.change @@ -0,0 +1 @@ +Increase outgoing federation parallelism diff --git a/config/config.exs b/config/config.exs index 4780892f7a..226d73df41 100644 --- a/config/config.exs +++ b/config/config.exs @@ -588,7 +588,7 @@ queues: [ activity_expiration: 10, federator_incoming: 5, - federator_outgoing: 5, + federator_outgoing: 25, web_push: 50, background: 20, search_indexing: [limit: 10, paused: true], From 49f46220ff670c68363da43db135c5d35229cc5d Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Tue, 30 Jul 2024 11:11:30 -0400 Subject: [PATCH 21/23] Align Hackney and Gun connection pool timeouts --- changelog.d/hackney-pool-timeouts.change | 1 + config/config.exs | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) create mode 100644 changelog.d/hackney-pool-timeouts.change diff --git a/changelog.d/hackney-pool-timeouts.change b/changelog.d/hackney-pool-timeouts.change new file mode 100644 index 0000000000..d763fe602e --- /dev/null +++ b/changelog.d/hackney-pool-timeouts.change @@ -0,0 +1 @@ +Change Hackney connection pool timeouts to align with the values Gun uses diff --git a/config/config.exs b/config/config.exs index 4780892f7a..d9754d9c8c 100644 --- a/config/config.exs +++ b/config/config.exs @@ -858,19 +858,19 @@ config :pleroma, :hackney_pools, federation: [ max_connections: 50, - timeout: 150_000 + timeout: 10_000 ], media: [ max_connections: 50, - timeout: 150_000 + timeout: 15_000 ], rich_media: [ max_connections: 50, - timeout: 150_000 + timeout: 15_000 ], upload: [ max_connections: 25, - timeout: 300_000 + timeout: 15_000 ] config :pleroma, :majic_pool, size: 2 From b50261262ebc90739b8868ca588d3a8d6ed43b7d Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Tue, 30 Jul 2024 11:48:10 -0400 Subject: [PATCH 22/23] Fix publisher job migration error --- changelog.d/fix-migration.skip | 0 .../20240729163838_publisher_job_change.exs | 13 +++++++++---- 2 files changed, 9 insertions(+), 4 deletions(-) create mode 100644 changelog.d/fix-migration.skip diff --git a/changelog.d/fix-migration.skip b/changelog.d/fix-migration.skip new file mode 100644 index 0000000000..e69de29bb2 diff --git a/priv/repo/migrations/20240729163838_publisher_job_change.exs b/priv/repo/migrations/20240729163838_publisher_job_change.exs index 3449e3b3b4..cbea18205e 100644 --- a/priv/repo/migrations/20240729163838_publisher_job_change.exs +++ b/priv/repo/migrations/20240729163838_publisher_job_change.exs @@ -16,12 +16,17 @@ def up do Enum.each(jobs, fn job -> args = job.args - activity = Activity.get_by_ap_id(args["id"]) - updated_args = Map.put(args, "activity_id", activity.id) + case Activity.get_by_ap_id(args["id"]) do + nil -> + :ok - Pleroma.Workers.PublisherWorker.new(updated_args) - |> Oban.insert() + %Activity{id: activity_id} -> + updated_args = Map.put(args, "activity_id", activity_id) + + Pleroma.Workers.PublisherWorker.new(updated_args) + |> Oban.insert() + end end) end end From e6951e7e409471a5fa68bd2387a758d6abb1f728 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Wed, 31 Jul 2024 14:14:31 -0400 Subject: [PATCH 23/23] Fix User.disclose_client never working correctly Our test environment cheats by constructing a conn with a custom oauth_access/2 function. This assigns a :token to the conn but due to the way it is constructed it has the :user preloaded. When the OAuth Plug fetches a token it does not preload the user, so the check for user.disclose_client was always nil and assumed to be false. Preloading the :user ensures the test environment matches reality. --- changelog.d/disclose_client.fix | 1 + lib/pleroma/web/plugs/o_auth_plug.ex | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) create mode 100644 changelog.d/disclose_client.fix diff --git a/changelog.d/disclose_client.fix b/changelog.d/disclose_client.fix new file mode 100644 index 0000000000..938abc930e --- /dev/null +++ b/changelog.d/disclose_client.fix @@ -0,0 +1 @@ +Client application data was always missing from the status diff --git a/lib/pleroma/web/plugs/o_auth_plug.ex b/lib/pleroma/web/plugs/o_auth_plug.ex index b59ac9d3ea..488968691d 100644 --- a/lib/pleroma/web/plugs/o_auth_plug.ex +++ b/lib/pleroma/web/plugs/o_auth_plug.ex @@ -52,7 +52,7 @@ defp fetch_user_and_token(token) do where: t.token == ^token ) - with %Token{user_id: user_id} = token_record <- Repo.one(token_query), + with %Token{user_id: user_id} = token_record <- Repo.one(token_query) |> Repo.preload(:user), false <- is_nil(user_id), %User{} = user <- User.get_cached_by_id(user_id) do {:ok, user, token_record}