diff --git a/.dialyzer_ignore.exs b/.dialyzer_ignore.exs index 865e7d7820..692cac9234 100644 --- a/.dialyzer_ignore.exs +++ b/.dialyzer_ignore.exs @@ -2,5 +2,8 @@ {"lib/cachex.ex", "Unknown type: Spec.cache/0."}, {"lib/pleroma/web/plugs/rate_limiter.ex", "The pattern can never match the type {:commit, _} | {:ignore, _}."}, {"lib/pleroma/web/plugs/rate_limiter.ex", "Function get_scale/2 will never be called."}, -{"lib/pleroma/web/plugs/rate_limiter.ex", "Function initialize_buckets!/1 will never be called."} +{"lib/pleroma/web/plugs/rate_limiter.ex", "Function initialize_buckets!/1 will never be called."}, +{"lib/pleroma/workers/receiver_worker.ex", :call}, +{"lib/pleroma/workers/receiver_worker.ex", :pattern_match}, +{"lib/pleroma/workers/receiver_worker.ex", :pattern_match_cov}, ] diff --git a/changelog.d/backups-refactor.change b/changelog.d/backups-refactor.change new file mode 100644 index 0000000000..47fc741385 --- /dev/null +++ b/changelog.d/backups-refactor.change @@ -0,0 +1 @@ +Refactor the user backups code and improve test coverage diff --git a/changelog.d/disclose_client.fix b/changelog.d/disclose_client.fix new file mode 100644 index 0000000000..938abc930e --- /dev/null +++ b/changelog.d/disclose_client.fix @@ -0,0 +1 @@ +Client application data was always missing from the status diff --git a/changelog.d/federator_outgoing_increase.change b/changelog.d/federator_outgoing_increase.change new file mode 100644 index 0000000000..7e68a79bcd --- /dev/null +++ b/changelog.d/federator_outgoing_increase.change @@ -0,0 +1 @@ +Increase outgoing federation parallelism diff --git a/changelog.d/fix-migration.skip b/changelog.d/fix-migration.skip new file mode 100644 index 0000000000..e69de29bb2 diff --git a/changelog.d/hackney-pool-timeouts.change b/changelog.d/hackney-pool-timeouts.change new file mode 100644 index 0000000000..d763fe602e --- /dev/null +++ b/changelog.d/hackney-pool-timeouts.change @@ -0,0 +1 @@ +Change Hackney connection pool timeouts to align with the values Gun uses diff --git a/changelog.d/oban-transmogrifier.skip b/changelog.d/oban-transmogrifier.skip new file mode 100644 index 0000000000..e69de29bb2 diff --git a/changelog.d/optimistic-inbox-sigs.fix b/changelog.d/optimistic-inbox-sigs.fix new file mode 100644 index 0000000000..53ffe6b5bb --- /dev/null +++ b/changelog.d/optimistic-inbox-sigs.fix @@ -0,0 +1 @@ +Fix Optimistic Inbox for failed signatures diff --git a/changelog.d/publisher.change b/changelog.d/publisher.change new file mode 100644 index 0000000000..0d26d7b005 --- /dev/null +++ b/changelog.d/publisher.change @@ -0,0 +1 @@ +Publisher jobs now store the the activity id instead of inserting duplicate JSON data in the Oban queue for each delivery. diff --git a/config/config.exs b/config/config.exs index 4133c66b61..437116decf 100644 --- a/config/config.exs +++ b/config/config.exs @@ -598,10 +598,8 @@ queues: [ activity_expiration: 10, federator_incoming: 5, - federator_outgoing: 5, + federator_outgoing: 25, web_push: 50, - transmogrifier: 20, - notifications: 20, background: 20, search_indexing: [limit: 10, paused: true], slow: 5 @@ -882,19 +880,19 @@ config :pleroma, :hackney_pools, federation: [ max_connections: 50, - timeout: 150_000 + timeout: 10_000 ], media: [ max_connections: 50, - timeout: 150_000 + timeout: 15_000 ], rich_media: [ max_connections: 50, - timeout: 150_000 + timeout: 15_000 ], upload: [ max_connections: 25, - timeout: 300_000 + timeout: 15_000 ] config :pleroma, :majic_pool, size: 2 @@ -933,8 +931,8 @@ purge_after_days: 30, limit_days: 7, dir: nil, - process_wait_time: 30_000, - process_chunk_size: 100 + process_chunk_size: 100, + timeout: :timer.minutes(30) config :pleroma, ConcurrentLimiter, [ {Pleroma.Search, [max_running: 30, max_waiting: 50]}, diff --git a/config/description.exs b/config/description.exs index 7d7241c455..3720805c1e 100644 --- a/config/description.exs +++ b/config/description.exs @@ -3386,20 +3386,19 @@ description: "Limit user to export not more often than once per N days", suggestions: [7] }, - %{ - key: :process_wait_time, - type: :integer, - label: "Process Wait Time", - description: - "The amount of time to wait for backup to report progress, in milliseconds. If no progress is received from the backup job for that much time, terminate it and deem it failed.", - suggestions: [30_000] - }, %{ key: :process_chunk_size, type: :integer, label: "Process Chunk Size", description: "The number of activities to fetch in the backup job for each chunk.", suggestions: [100] + }, + %{ + key: :timeout, + type: :integer, + label: "Timeout", + description: "The amount of time to wait for backup to complete in seconds.", + suggestions: [1_800] } ] }, diff --git a/config/test.exs b/config/test.exs index 32d5082cf0..c71cb515eb 100644 --- a/config/test.exs +++ b/config/test.exs @@ -170,8 +170,7 @@ config :pleroma, Pleroma.Web.Plugs.HTTPSecurityPlug, config_impl: Pleroma.StaticStubbedConfigMock config :pleroma, Pleroma.Web.Plugs.HTTPSignaturePlug, config_impl: Pleroma.StaticStubbedConfigMock -config :pleroma, Pleroma.Web.Plugs.HTTPSignaturePlug, - http_signatures_impl: Pleroma.StubbedHTTPSignaturesMock +config :pleroma, Pleroma.Signature, http_signatures_impl: Pleroma.StubbedHTTPSignaturesMock peer_module = if String.to_integer(System.otp_release()) >= 25 do @@ -201,6 +200,8 @@ config :pleroma, Pleroma.Web.Plugs.HTTPSecurityPlug, enable: false +config :pleroma, Pleroma.User.Backup, tempdir: "test/tmp" + if File.exists?("./config/test.secret.exs") do import_config "test.secret.exs" else diff --git a/docs/configuration/cheatsheet.md b/docs/configuration/cheatsheet.md index c2f5556d0b..88b5cccb37 100644 --- a/docs/configuration/cheatsheet.md +++ b/docs/configuration/cheatsheet.md @@ -1171,6 +1171,7 @@ Control favicons for instances. 3. the directory named by the TMP environment variable 4. C:\TMP on Windows or /tmp on Unix-like operating systems 5. as a last resort, the current working directory +* `:timeout` an integer representing seconds ## Frontend management diff --git a/lib/pleroma/ecto_enums.ex b/lib/pleroma/ecto_enums.ex index b346b39d6e..a4890b489a 100644 --- a/lib/pleroma/ecto_enums.ex +++ b/lib/pleroma/ecto_enums.ex @@ -27,11 +27,3 @@ failed: 4, manual: 5 ) - -defenum(Pleroma.User.Backup.State, - pending: 1, - running: 2, - complete: 3, - failed: 4, - invalid: 5 -) diff --git a/lib/pleroma/emails/user_email.ex b/lib/pleroma/emails/user_email.ex index 95b963764e..10d89d2f34 100644 --- a/lib/pleroma/emails/user_email.ex +++ b/lib/pleroma/emails/user_email.ex @@ -345,37 +345,22 @@ def unsubscribe_url(user, notifications_type) do Router.Helpers.subscription_url(Endpoint, :unsubscribe, token) end - def backup_is_ready_email(backup, admin_user_id \\ nil) do + def backup_is_ready_email(backup) do %{user: user} = Pleroma.Repo.preload(backup, :user) Gettext.with_locale_or_default user.language do download_url = Pleroma.Web.PleromaAPI.BackupView.download_url(backup) html_body = - if is_nil(admin_user_id) do - Gettext.dpgettext( - "static_pages", - "account archive email body - self-requested", - """ -
You requested a full backup of your Pleroma account. It's ready for download:
- - """, - download_url: download_url - ) - else - admin = Pleroma.Repo.get(User, admin_user_id) - - Gettext.dpgettext( - "static_pages", - "account archive email body - admin requested", - """ -Admin @%{admin_nickname} requested a full backup of your Pleroma account. It's ready for download:
- - """, - admin_nickname: admin.nickname, - download_url: download_url - ) - end + Gettext.dpgettext( + "static_pages", + "account archive email body", + """ +A full backup of your Pleroma account was requested. It's ready for download:
+ + """, + download_url: download_url + ) new() |> to(recipient(user)) diff --git a/lib/pleroma/signature.ex b/lib/pleroma/signature.ex index 900d40c4bb..1955134788 100644 --- a/lib/pleroma/signature.ex +++ b/lib/pleroma/signature.ex @@ -10,6 +10,14 @@ defmodule Pleroma.Signature do alias Pleroma.User alias Pleroma.Web.ActivityPub.ActivityPub + import Plug.Conn, only: [put_req_header: 3] + + @http_signatures_impl Application.compile_env( + :pleroma, + [__MODULE__, :http_signatures_impl], + HTTPSignatures + ) + @known_suffixes ["/publickey", "/main-key"] def key_id_to_actor_id(key_id) do @@ -85,4 +93,48 @@ def signed_date, do: signed_date(NaiveDateTime.utc_now()) def signed_date(%NaiveDateTime{} = date) do Timex.format!(date, "{WDshort}, {0D} {Mshort} {YYYY} {h24}:{m}:{s} GMT") end + + @spec validate_signature(Plug.Conn.t(), String.t()) :: boolean() + def validate_signature(%Plug.Conn{} = conn, request_target) do + # Newer drafts for HTTP signatures now use @request-target instead of the + # old (request-target). We'll now support both for incoming signatures. + conn = + conn + |> put_req_header("(request-target)", request_target) + |> put_req_header("@request-target", request_target) + + @http_signatures_impl.validate_conn(conn) + end + + @spec validate_signature(Plug.Conn.t()) :: boolean() + def validate_signature(%Plug.Conn{} = conn) do + # This (request-target) is non-standard, but many implementations do it + # this way due to a misinterpretation of + # https://datatracker.ietf.org/doc/html/draft-cavage-http-signatures-06 + # "path" was interpreted as not having the query, though later examples + # show that it must be the absolute path + query. This behavior is kept to + # make sure most software (Pleroma itself, Mastodon, and probably others) + # do not break. + request_target = Enum.join([String.downcase(conn.method), conn.request_path], " ") + + # This is the proper way to build the @request-target, as expected by + # many HTTP signature libraries, clarified in the following draft: + # https://www.ietf.org/archive/id/draft-ietf-httpbis-message-signatures-11.html#section-2.2.6 + # It is the same as before, but containing the query part as well. + proper_target = Enum.join([request_target, "?", conn.query_string], "") + + cond do + # Normal, non-standard behavior but expected by Pleroma and more. + validate_signature(conn, request_target) -> + true + + # Has query string and the previous one failed: let's try the standard. + conn.query_string != "" -> + validate_signature(conn, proper_target) + + # If there's no query string and signature fails, it's rotten. + true -> + false + end + end end diff --git a/lib/pleroma/user/backup.ex b/lib/pleroma/user/backup.ex index 7db415a458..df6fbeada2 100644 --- a/lib/pleroma/user/backup.ex +++ b/lib/pleroma/user/backup.ex @@ -15,9 +15,10 @@ defmodule Pleroma.User.Backup do alias Pleroma.Activity alias Pleroma.Bookmark alias Pleroma.Chat + alias Pleroma.Config alias Pleroma.Repo + alias Pleroma.Uploaders.Uploader alias Pleroma.User - alias Pleroma.User.Backup.State alias Pleroma.Web.ActivityPub.ActivityPub alias Pleroma.Web.ActivityPub.Transmogrifier alias Pleroma.Web.ActivityPub.UserView @@ -30,71 +31,111 @@ defmodule Pleroma.User.Backup do field(:file_name, :string) field(:file_size, :integer, default: 0) field(:processed, :boolean, default: false) - field(:state, State, default: :invalid) - field(:processed_number, :integer, default: 0) + field(:tempdir, :string) belongs_to(:user, User, type: FlakeId.Ecto.CompatType) timestamps() end - @config_impl Application.compile_env(:pleroma, [__MODULE__, :config_impl], Pleroma.Config) + @doc """ + Schedules a job to backup a user if the number of backup requests has not exceeded the limit. - def create(user, admin_id \\ nil) do - with :ok <- validate_limit(user, admin_id), - {:ok, backup} <- user |> new() |> Repo.insert() do - BackupWorker.process(backup, admin_id) + Admins can directly call new/1 and schedule_backup/1 to bypass the limit. + """ + @spec user(User.t()) :: {:ok, t()} | {:error, any()} + def user(user) do + days = Config.get([__MODULE__, :limit_days]) + + with true <- permitted?(user), + %__MODULE__{} = backup <- new(user), + {:ok, inserted_backup} <- Repo.insert(backup), + {:ok, %Oban.Job{}} <- schedule_backup(inserted_backup) do + {:ok, inserted_backup} + else + false -> + {:error, + dngettext( + "errors", + "Last export was less than a day ago", + "Last export was less than %{days} days ago", + days, + days: days + )} + + e -> + {:error, e} end end + @doc "Generates a %Backup{} for a user with a random file name" + @spec new(User.t()) :: t() def new(user) do rand_str = :crypto.strong_rand_bytes(32) |> Base.url_encode64(padding: false) datetime = Calendar.NaiveDateTime.Format.iso8601_basic(NaiveDateTime.utc_now()) name = "archive-#{user.nickname}-#{datetime}-#{rand_str}.zip" %__MODULE__{ - user_id: user.id, content_type: "application/zip", file_name: name, - state: :pending + tempdir: tempdir(), + user: user } end - def delete(backup) do - uploader = Pleroma.Config.get([Pleroma.Upload, :uploader]) + @doc "Schedules the execution of the provided backup" + @spec schedule_backup(t()) :: {:ok, Oban.Job.t()} | {:error, any()} + def schedule_backup(backup) do + with false <- is_nil(backup.id) do + %{"op" => "process", "backup_id" => backup.id} + |> BackupWorker.new() + |> Oban.insert() + else + true -> + {:error, "Backup is missing id. Please insert it into the Repo first."} + + e -> + {:error, e} + end + end + + @doc "Deletes the backup archive file and removes the database record" + @spec delete_archive(t()) :: {:ok, Ecto.Schema.t()} | {:error, Ecto.Changeset.t()} + def delete_archive(backup) do + uploader = Config.get([Pleroma.Upload, :uploader]) with :ok <- uploader.delete_file(Path.join("backups", backup.file_name)) do Repo.delete(backup) end end - defp validate_limit(_user, admin_id) when is_binary(admin_id), do: :ok + @doc "Schedules a job to delete the backup archive" + @spec schedule_delete(t()) :: {:ok, Oban.Job.t()} | {:error, any()} + def schedule_delete(backup) do + days = Config.get([__MODULE__, :purge_after_days]) + time = 60 * 60 * 24 * days + scheduled_at = Calendar.NaiveDateTime.add!(backup.inserted_at, time) - defp validate_limit(user, nil) do - case get_last(user.id) do - %__MODULE__{inserted_at: inserted_at} -> - days = Pleroma.Config.get([__MODULE__, :limit_days]) - diff = Timex.diff(NaiveDateTime.utc_now(), inserted_at, :days) + %{"op" => "delete", "backup_id" => backup.id} + |> BackupWorker.new(scheduled_at: scheduled_at) + |> Oban.insert() + end - if diff > days do - :ok - else - {:error, - dngettext( - "errors", - "Last export was less than a day ago", - "Last export was less than %{days} days ago", - days, - days: days - )} - end - - nil -> - :ok + defp permitted?(user) do + with {_, %__MODULE__{inserted_at: inserted_at}} <- {:last, get_last(user)}, + days = Config.get([__MODULE__, :limit_days]), + diff = Timex.diff(NaiveDateTime.utc_now(), inserted_at, :days), + {_, true} <- {:diff, diff > days} do + true + else + {:last, nil} -> true + {:diff, false} -> false end end - def get_last(user_id) do + @doc "Returns last backup for the provided user" + @spec get_last(User.t()) :: t() + def get_last(%User{id: user_id}) do __MODULE__ |> where(user_id: ^user_id) |> order_by(desc: :id) @@ -102,6 +143,8 @@ def get_last(user_id) do |> Repo.one() end + @doc "Lists all existing backups for a user" + @spec list(User.t()) :: [Ecto.Schema.t() | term()] def list(%User{id: user_id}) do __MODULE__ |> where(user_id: ^user_id) @@ -109,94 +152,37 @@ def list(%User{id: user_id}) do |> Repo.all() end - def remove_outdated(%__MODULE__{id: latest_id, user_id: user_id}) do - __MODULE__ - |> where(user_id: ^user_id) - |> where([b], b.id != ^latest_id) - |> Repo.all() - |> Enum.each(&BackupWorker.delete/1) - end - - def get(id), do: Repo.get(__MODULE__, id) - - defp set_state(backup, state, processed_number \\ nil) do - struct = - %{state: state} - |> Pleroma.Maps.put_if_present(:processed_number, processed_number) - - backup - |> cast(struct, [:state, :processed_number]) - |> Repo.update() - end - - def process( - %__MODULE__{} = backup, - processor_module \\ __MODULE__.Processor - ) do - set_state(backup, :running, 0) - - current_pid = self() - - task = - Task.Supervisor.async_nolink( - Pleroma.TaskSupervisor, - processor_module, - :do_process, - [backup, current_pid] - ) - - wait_backup(backup, backup.processed_number, task) - end - - defp wait_backup(backup, current_processed, task) do - wait_time = @config_impl.get([__MODULE__, :process_wait_time]) - - receive do - {:progress, new_processed} -> - total_processed = current_processed + new_processed - - set_state(backup, :running, total_processed) - wait_backup(backup, total_processed, task) - - {:DOWN, _ref, _proc, _pid, reason} -> - backup = get(backup.id) - - if reason != :normal do - Logger.error("Backup #{backup.id} process ended abnormally: #{inspect(reason)}") - - {:ok, backup} = set_state(backup, :failed) - - cleanup(backup) - - {:error, - %{ - backup: backup, - reason: :exit, - details: reason - }} - else - {:ok, backup} - end - after - wait_time -> - Logger.error( - "Backup #{backup.id} timed out after no response for #{wait_time}ms, terminating" - ) - - Task.Supervisor.terminate_child(Pleroma.TaskSupervisor, task.pid) - - {:ok, backup} = set_state(backup, :failed) - - cleanup(backup) - - {:error, - %{ - backup: backup, - reason: :timeout - }} + @doc "Schedules deletion of all but the the most recent backup" + @spec remove_outdated(User.t()) :: :ok + def remove_outdated(user) do + with %__MODULE__{} = latest_backup <- get_last(user) do + __MODULE__ + |> where(user_id: ^user.id) + |> where([b], b.id != ^latest_backup.id) + |> Repo.all() + |> Enum.each(&schedule_delete/1) + else + _ -> :ok end end + def get_by_id(id), do: Repo.get(__MODULE__, id) + + @doc "Generates changeset for %Pleroma.User.Backup{}" + @spec changeset(%__MODULE__{}, map()) :: %Ecto.Changeset{} + def changeset(backup \\ %__MODULE__{}, attrs) do + backup + |> cast(attrs, [:content_type, :file_name, :file_size, :processed, :tempdir]) + end + + @doc "Updates the backup record" + @spec update_record(%__MODULE__{}, map()) :: {:ok, %__MODULE__{}} | {:error, %Ecto.Changeset{}} + def update_record(%__MODULE__{} = backup, attrs) do + backup + |> changeset(attrs) + |> Repo.update() + end + @files [ ~c"actor.json", ~c"outbox.json", @@ -207,55 +193,70 @@ defp wait_backup(backup, current_processed, task) do ~c"chats.json", ~c"chat_messages.json" ] - @spec export(Pleroma.User.Backup.t(), pid()) :: {:ok, String.t()} | :error - def export(%__MODULE__{} = backup, caller_pid) do - backup = Repo.preload(backup, :user) - dir = backup_tempdir(backup) - with :ok <- File.mkdir(dir), - :ok <- actor(dir, backup.user, caller_pid), - :ok <- statuses(dir, backup.user, caller_pid), - :ok <- likes(dir, backup.user, caller_pid), - :ok <- bookmarks(dir, backup.user, caller_pid), - :ok <- followers(dir, backup.user, caller_pid), - :ok <- following(dir, backup.user, caller_pid), - :ok <- chats(dir, backup.user, caller_pid), - :ok <- chat_messages(dir, backup.user, caller_pid), - {:ok, zip_path} <- :zip.create(backup.file_name, @files, cwd: dir), - {:ok, _} <- File.rm_rf(dir) do - {:ok, zip_path} + @spec run(t()) :: {:ok, t()} | {:error, :failed} + def run(%__MODULE__{} = backup) do + backup = Repo.preload(backup, :user) + tempfile = Path.join([backup.tempdir, backup.file_name]) + + with {_, :ok} <- {:mkdir, File.mkdir_p(backup.tempdir)}, + {_, :ok} <- {:actor, actor(backup.tempdir, backup.user)}, + {_, :ok} <- {:statuses, statuses(backup.tempdir, backup.user)}, + {_, :ok} <- {:likes, likes(backup.tempdir, backup.user)}, + {_, :ok} <- {:bookmarks, bookmarks(backup.tempdir, backup.user)}, + {_, :ok} <- {:followers, followers(backup.tempdir, backup.user)}, + {_, :ok} <- {:following, following(backup.tempdir, backup.user)}, + {_, :ok} <- {:chats, chats(backup.tempdir, backup.user)}, + {_, :ok} <- {:chat_messages, chat_messages(backup.tempdir, backup.user)}, + {_, {:ok, _zip_path}} <- + {:zip, :zip.create(to_charlist(tempfile), @files, cwd: to_charlist(backup.tempdir))}, + {_, {:ok, %File.Stat{size: zip_size}}} <- {:filestat, File.stat(tempfile)}, + {:ok, updated_backup} <- update_record(backup, %{file_size: zip_size}) do + {:ok, updated_backup} else - _ -> :error + _ -> + File.rm_rf(backup.tempdir) + {:error, :failed} end end - def dir(name) do - dir = Pleroma.Config.get([__MODULE__, :dir]) || System.tmp_dir!() - Path.join(dir, name) + defp tempdir do + rand = :crypto.strong_rand_bytes(8) |> Base.url_encode64(padding: false) + subdir = "backup-#{rand}" + + case Config.get([__MODULE__, :tempdir]) do + nil -> + Path.join([System.tmp_dir!(), subdir]) + + path -> + Path.join([path, subdir]) + end end - def upload(%__MODULE__{} = backup, zip_path) do - uploader = Pleroma.Config.get([Pleroma.Upload, :uploader]) + @doc "Uploads the completed backup and marks it as processed" + @spec upload(t()) :: {:ok, t()} + def upload(%__MODULE__{tempdir: tempdir} = backup) when is_binary(tempdir) do + uploader = Config.get([Pleroma.Upload, :uploader]) upload = %Pleroma.Upload{ name: backup.file_name, - tempfile: zip_path, + tempfile: Path.join([tempdir, backup.file_name]), content_type: backup.content_type, path: Path.join("backups", backup.file_name) } - with {:ok, _} <- Pleroma.Uploaders.Uploader.put_file(uploader, upload), - :ok <- File.rm(zip_path) do - {:ok, upload} + with {:ok, _} <- Uploader.put_file(uploader, upload), + {:ok, uploaded_backup} <- update_record(backup, %{processed: true}), + {:ok, _} <- File.rm_rf(tempdir) do + {:ok, uploaded_backup} end end - defp actor(dir, user, caller_pid) do + defp actor(dir, user) do with {:ok, json} <- UserView.render("user.json", %{user: user}) |> Map.merge(%{"likes" => "likes.json", "bookmarks" => "bookmarks.json"}) |> Jason.encode() do - send(caller_pid, {:progress, 1}) File.write(Path.join(dir, "actor.json"), json) end end @@ -274,22 +275,10 @@ defp write_header(file, name) do ) end - defp should_report?(num, chunk_size), do: rem(num, chunk_size) == 0 - - defp backup_tempdir(backup) do - name = String.trim_trailing(backup.file_name, ".zip") - dir(name) - end - - defp cleanup(backup) do - dir = backup_tempdir(backup) - File.rm_rf(dir) - end - - defp write(query, dir, name, fun, caller_pid) do + defp write(query, dir, name, fun) do path = Path.join(dir, "#{name}.json") - chunk_size = Pleroma.Config.get([__MODULE__, :process_chunk_size]) + chunk_size = Config.get([__MODULE__, :process_chunk_size]) with {:ok, file} <- File.open(path, [:write, :utf8]), :ok <- write_header(file, name) do @@ -305,10 +294,6 @@ defp write(query, dir, name, fun, caller_pid) do end), {:ok, str} <- Jason.encode(data), :ok <- IO.write(file, str <> ",\n") do - if should_report?(acc + 1, chunk_size) do - send(caller_pid, {:progress, chunk_size}) - end - acc + 1 else {:error, e} -> @@ -323,31 +308,29 @@ defp write(query, dir, name, fun, caller_pid) do end end) - send(caller_pid, {:progress, rem(total, chunk_size)}) - with :ok <- :file.pwrite(file, {:eof, -2}, "\n],\n \"totalItems\": #{total}}") do File.close(file) end end end - defp bookmarks(dir, %{id: user_id} = _user, caller_pid) do + defp bookmarks(dir, %{id: user_id} = _user) do Bookmark |> where(user_id: ^user_id) |> join(:inner, [b], activity in assoc(b, :activity)) |> select([b, a], %{id: b.id, object: fragment("(?)->>'object'", a.data)}) - |> write(dir, "bookmarks", fn a -> {:ok, a.object} end, caller_pid) + |> write(dir, "bookmarks", fn a -> {:ok, a.object} end) end - defp likes(dir, user, caller_pid) do + defp likes(dir, user) do user.ap_id |> Activity.Queries.by_actor() |> Activity.Queries.by_type("Like") |> select([like], %{id: like.id, object: fragment("(?)->>'object'", like.data)}) - |> write(dir, "likes", fn a -> {:ok, a.object} end, caller_pid) + |> write(dir, "likes", fn a -> {:ok, a.object} end) end - defp statuses(dir, user, caller_pid) do + defp statuses(dir, user) do opts = %{} |> Map.put(:type, ["Create", "Announce"]) @@ -367,22 +350,21 @@ defp statuses(dir, user, caller_pid) do with {:ok, activity} <- Transmogrifier.prepare_outgoing(a.data) do {:ok, Map.delete(activity, "@context")} end - end, - caller_pid + end ) end - defp followers(dir, user, caller_pid) do + defp followers(dir, user) do User.get_followers_query(user) - |> write(dir, "followers", fn a -> {:ok, a.ap_id} end, caller_pid) + |> write(dir, "followers", fn a -> {:ok, a.ap_id} end) end - defp following(dir, user, caller_pid) do + defp following(dir, user) do User.get_friends_query(user) - |> write(dir, "following", fn a -> {:ok, a.ap_id} end, caller_pid) + |> write(dir, "following", fn a -> {:ok, a.ap_id} end) end - defp chats(dir, user, caller_pid) do + defp chats(dir, user) do Chat.for_user_query(user.id) |> write( dir, @@ -397,12 +379,11 @@ defp chats(dir, user, caller_pid) do "published" => chat.inserted_at |> DateTime.from_naive!("Etc/UTC") |> DateTime.to_iso8601() }} - end, - caller_pid + end ) end - def chat_messages(dir, %{id: user_id}, caller_pid) do + def chat_messages(dir, %{id: user_id}) do chats_subquery = from(c in Chat, where: c.user_id == ^user_id, @@ -427,42 +408,7 @@ def chat_messages(dir, %{id: user_id}, caller_pid) do )} do {:ok, Map.delete(activity, "@context")} end - end, - caller_pid + end ) end end - -defmodule Pleroma.User.Backup.ProcessorAPI do - @callback do_process(%Pleroma.User.Backup{}, pid()) :: - {:ok, %Pleroma.User.Backup{}} | {:error, any()} -end - -defmodule Pleroma.User.Backup.Processor do - @behaviour Pleroma.User.Backup.ProcessorAPI - - alias Pleroma.Repo - alias Pleroma.User.Backup - - import Ecto.Changeset - - @impl true - def do_process(backup, current_pid) do - with {:ok, zip_file} <- Backup.export(backup, current_pid), - {:ok, %{size: size}} <- File.stat(zip_file), - {:ok, _upload} <- Backup.upload(backup, zip_file) do - backup - |> cast( - %{ - file_size: size, - processed: true, - state: :complete - }, - [:file_size, :processed, :state] - ) - |> Repo.update() - else - e -> {:error, e} - end - end -end diff --git a/lib/pleroma/web/activity_pub/activity_pub_controller.ex b/lib/pleroma/web/activity_pub/activity_pub_controller.ex index 7516f20fc7..1ee4647349 100644 --- a/lib/pleroma/web/activity_pub/activity_pub_controller.ex +++ b/lib/pleroma/web/activity_pub/activity_pub_controller.ex @@ -303,8 +303,15 @@ def inbox(%{assigns: %{valid_signature: true}} = conn, params) do json(conn, "ok") end - def inbox(%{assigns: %{valid_signature: false}, req_headers: req_headers} = conn, params) do - Federator.incoming_ap_doc(%{req_headers: req_headers, params: params}) + def inbox(%{assigns: %{valid_signature: false}} = conn, params) do + Federator.incoming_ap_doc(%{ + method: conn.method, + req_headers: conn.req_headers, + request_path: conn.request_path, + params: params, + query_string: conn.query_string + }) + json(conn, "ok") end diff --git a/lib/pleroma/web/activity_pub/publisher.ex b/lib/pleroma/web/activity_pub/publisher.ex index c8bdf2250b..e040753dc6 100644 --- a/lib/pleroma/web/activity_pub/publisher.ex +++ b/lib/pleroma/web/activity_pub/publisher.ex @@ -80,13 +80,26 @@ def representable?(%Activity{} = activity) do parameters set: * `inbox`: the inbox to publish to - * `json`: the JSON message body representing the ActivityPub message - * `actor`: the actor which is signing the message - * `id`: the ActivityStreams URI of the message + * `activity_id`: the internal activity id + * `cc`: the cc recipients relevant to this inbox (optional) """ - def publish_one(%{inbox: inbox, json: json, actor: %User{} = actor, id: id} = params) do - Logger.debug("Federating #{id} to #{inbox}") + def publish_one(%{inbox: inbox, activity_id: activity_id} = params) do + activity = Activity.get_by_id_with_user_actor(activity_id) + actor = activity.user_actor + + ap_id = activity.data["id"] + Logger.debug("Federating #{ap_id} to #{inbox}") uri = %{path: path} = URI.parse(inbox) + + {:ok, data} = Transmogrifier.prepare_outgoing(activity.data) + + cc = Map.get(params, :cc) + + json = + data + |> Map.put("cc", cc) + |> Jason.encode!() + digest = "SHA-256=" <> (:crypto.hash(:sha256, json) |> Base.encode64()) date = Pleroma.Signature.signed_date() @@ -119,7 +132,7 @@ def publish_one(%{inbox: inbox, json: json, actor: %User{} = actor, id: id} = pa else {_post_result, %{status: code} = response} = e -> unless params[:unreachable_since], do: Instances.set_unreachable(inbox) - Logger.metadata(activity: id, inbox: inbox, status: code) + Logger.metadata(activity: activity_id, inbox: inbox, status: code) Logger.error("Publisher failed to inbox #{inbox} with status #{code}") case response do @@ -136,21 +149,12 @@ def publish_one(%{inbox: inbox, json: json, actor: %User{} = actor, id: id} = pa e -> unless params[:unreachable_since], do: Instances.set_unreachable(inbox) - Logger.metadata(activity: id, inbox: inbox) + Logger.metadata(activity: activity_id, inbox: inbox) Logger.error("Publisher failed to inbox #{inbox} #{inspect(e)}") {:error, e} end end - def publish_one(%{actor_id: actor_id} = params) do - actor = User.get_cached_by_id(actor_id) - - params - |> Map.delete(:actor_id) - |> Map.put(:actor, actor) - |> publish_one() - end - defp signature_host(%URI{port: port, scheme: scheme, host: host}) do if port == URI.default_port(scheme) do host @@ -251,7 +255,6 @@ def determine_inbox( def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity) when is_list(bcc) and bcc != [] do public = public?(activity) - {:ok, data} = Transmogrifier.prepare_outgoing(activity.data) [priority_recipients, recipients] = recipients(actor, activity) @@ -276,16 +279,10 @@ def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity) # instance would only accept a first message for the first recipient and ignore the rest. cc = get_cc_ap_ids(ap_id, recipients) - json = - data - |> Map.put("cc", cc) - |> Jason.encode!() - __MODULE__.enqueue_one(%{ inbox: inbox, - json: json, - actor_id: actor.id, - id: activity.data["id"], + cc: cc, + activity_id: activity.id, unreachable_since: unreachable_since }) end) @@ -302,9 +299,6 @@ def publish(%User{} = actor, %Activity{} = activity) do Relay.publish(activity) end - {:ok, data} = Transmogrifier.prepare_outgoing(activity.data) - json = Jason.encode!(data) - [priority_inboxes, inboxes] = recipients(actor, activity) |> Enum.map(fn recipients -> @@ -326,9 +320,7 @@ def publish(%User{} = actor, %Activity{} = activity) do __MODULE__.enqueue_one( %{ inbox: inbox, - json: json, - actor_id: actor.id, - id: activity.data["id"], + activity_id: activity.id, unreachable_since: unreachable_since }, priority: priority diff --git a/lib/pleroma/web/admin_api/controllers/admin_api_controller.ex b/lib/pleroma/web/admin_api/controllers/admin_api_controller.ex index 1894000ff4..0f22dd5389 100644 --- a/lib/pleroma/web/admin_api/controllers/admin_api_controller.ex +++ b/lib/pleroma/web/admin_api/controllers/admin_api_controller.ex @@ -13,6 +13,7 @@ defmodule Pleroma.Web.AdminAPI.AdminAPIController do alias Pleroma.ModerationLog alias Pleroma.Stats alias Pleroma.User + alias Pleroma.User.Backup alias Pleroma.Web.ActivityPub.ActivityPub alias Pleroma.Web.AdminAPI alias Pleroma.Web.AdminAPI.AccountView @@ -429,7 +430,9 @@ def stats(conn, params) do def create_backup(%{assigns: %{user: admin}} = conn, %{"nickname" => nickname}) do with %User{} = user <- User.get_by_nickname(nickname), - {:ok, _} <- Pleroma.User.Backup.create(user, admin.id) do + %Backup{} = backup <- Backup.new(user), + {:ok, inserted_backup} <- Pleroma.Repo.insert(backup), + {:ok, %Oban.Job{}} <- Backup.schedule_backup(inserted_backup) do ModerationLog.insert_log(%{actor: admin, subject: user, action: "create_backup"}) json(conn, "") diff --git a/lib/pleroma/web/api_spec/operations/pleroma_backup_operation.ex b/lib/pleroma/web/api_spec/operations/pleroma_backup_operation.ex index 400f3825d8..86f7095159 100644 --- a/lib/pleroma/web/api_spec/operations/pleroma_backup_operation.ex +++ b/lib/pleroma/web/api_spec/operations/pleroma_backup_operation.ex @@ -65,12 +65,7 @@ defp backup do file_name: %Schema{type: :string}, file_size: %Schema{type: :integer}, processed: %Schema{type: :boolean, description: "whether this backup has succeeded"}, - state: %Schema{ - type: :string, - description: "the state of the backup", - enum: ["pending", "running", "complete", "failed"] - }, - processed_number: %Schema{type: :integer, description: "the number of records processed"} + tempdir: %Schema{type: :string} }, example: %{ "content_type" => "application/zip", @@ -79,8 +74,7 @@ defp backup do "file_size" => 4105, "inserted_at" => "2020-09-08T16:42:07.000Z", "processed" => true, - "state" => "complete", - "processed_number" => 20 + "tempdir" => "/tmp/PZIMw40vmpM" } } end diff --git a/lib/pleroma/web/common_api.ex b/lib/pleroma/web/common_api.ex index 1d035afb29..5f88b04532 100644 --- a/lib/pleroma/web/common_api.ex +++ b/lib/pleroma/web/common_api.ex @@ -850,11 +850,11 @@ defp make_update_event_data(user, orig_object, changes, location) do end end - defp maybe_cancel_jobs(%Activity{data: %{"id" => ap_id}}) do + defp maybe_cancel_jobs(%Activity{id: activity_id}) do Oban.Job |> where([j], j.worker == "Pleroma.Workers.PublisherWorker") |> where([j], j.args["op"] == "publish_one") - |> where([j], j.args["params"]["id"] == ^ap_id) + |> where([j], j.args["params"]["activity_id"] == ^activity_id) |> Oban.cancel_all_jobs() end diff --git a/lib/pleroma/web/federator.ex b/lib/pleroma/web/federator.ex index 4b30fd21d2..3d3101d61e 100644 --- a/lib/pleroma/web/federator.ex +++ b/lib/pleroma/web/federator.ex @@ -35,10 +35,12 @@ def allowed_thread_distance?(distance) do end # Client API - def incoming_ap_doc(%{params: params, req_headers: req_headers}) do + def incoming_ap_doc(%{params: _params, req_headers: _req_headers} = args) do + job_args = Enum.into(args, %{}, fn {k, v} -> {Atom.to_string(k), v} end) + ReceiverWorker.enqueue( "incoming_ap_doc", - %{"req_headers" => req_headers, "params" => params, "timeout" => :timer.seconds(20)}, + Map.put(job_args, "timeout", :timer.seconds(20)), priority: 2 ) end diff --git a/lib/pleroma/web/pleroma_api/controllers/backup_controller.ex b/lib/pleroma/web/pleroma_api/controllers/backup_controller.ex index b9daed22bc..0115ec6450 100644 --- a/lib/pleroma/web/pleroma_api/controllers/backup_controller.ex +++ b/lib/pleroma/web/pleroma_api/controllers/backup_controller.ex @@ -20,7 +20,7 @@ def index(%{assigns: %{user: user}} = conn, _params) do end def create(%{assigns: %{user: user}} = conn, _params) do - with {:ok, _} <- Backup.create(user) do + with {:ok, _} <- Backup.user(user) do backups = Backup.list(user) render(conn, "index.json", backups: backups) end diff --git a/lib/pleroma/web/pleroma_api/views/backup_view.ex b/lib/pleroma/web/pleroma_api/views/backup_view.ex index 20403aeee4..d778590f02 100644 --- a/lib/pleroma/web/pleroma_api/views/backup_view.ex +++ b/lib/pleroma/web/pleroma_api/views/backup_view.ex @@ -9,22 +9,12 @@ defmodule Pleroma.Web.PleromaAPI.BackupView do alias Pleroma.Web.CommonAPI.Utils def render("show.json", %{backup: %Backup{} = backup}) do - # To deal with records before the migration - state = - if backup.state == :invalid do - if backup.processed, do: :complete, else: :failed - else - backup.state - end - %{ id: backup.id, content_type: backup.content_type, url: download_url(backup), file_size: backup.file_size, processed: backup.processed, - state: to_string(state), - processed_number: backup.processed_number, inserted_at: Utils.to_masto_date(backup.inserted_at) } end diff --git a/lib/pleroma/web/plugs/http_signature_plug.ex b/lib/pleroma/web/plugs/http_signature_plug.ex index 6bf2dd432a..67974599a7 100644 --- a/lib/pleroma/web/plugs/http_signature_plug.ex +++ b/lib/pleroma/web/plugs/http_signature_plug.ex @@ -8,16 +8,12 @@ defmodule Pleroma.Web.Plugs.HTTPSignaturePlug do import Plug.Conn import Phoenix.Controller, only: [get_format: 1, text: 2] + alias Pleroma.Signature alias Pleroma.Web.ActivityPub.MRF require Logger @config_impl Application.compile_env(:pleroma, [__MODULE__, :config_impl], Pleroma.Config) - @http_signatures_impl Application.compile_env( - :pleroma, - [__MODULE__, :http_signatures_impl], - HTTPSignatures - ) def init(options) do options @@ -39,48 +35,6 @@ def call(conn, _opts) do end end - defp validate_signature(conn, request_target) do - # Newer drafts for HTTP signatures now use @request-target instead of the - # old (request-target). We'll now support both for incoming signatures. - conn = - conn - |> put_req_header("(request-target)", request_target) - |> put_req_header("@request-target", request_target) - - @http_signatures_impl.validate_conn(conn) - end - - defp validate_signature(conn) do - # This (request-target) is non-standard, but many implementations do it - # this way due to a misinterpretation of - # https://datatracker.ietf.org/doc/html/draft-cavage-http-signatures-06 - # "path" was interpreted as not having the query, though later examples - # show that it must be the absolute path + query. This behavior is kept to - # make sure most software (Pleroma itself, Mastodon, and probably others) - # do not break. - request_target = String.downcase("#{conn.method}") <> " #{conn.request_path}" - - # This is the proper way to build the @request-target, as expected by - # many HTTP signature libraries, clarified in the following draft: - # https://www.ietf.org/archive/id/draft-ietf-httpbis-message-signatures-11.html#section-2.2.6 - # It is the same as before, but containing the query part as well. - proper_target = request_target <> "?#{conn.query_string}" - - cond do - # Normal, non-standard behavior but expected by Pleroma and more. - validate_signature(conn, request_target) -> - true - - # Has query string and the previous one failed: let's try the standard. - conn.query_string != "" -> - validate_signature(conn, proper_target) - - # If there's no query string and signature fails, it's rotten. - true -> - false - end - end - defp maybe_assign_valid_signature(conn) do if has_signature_header?(conn) do # we replace the digest header with the one we computed in DigestPlug @@ -90,7 +44,7 @@ defp maybe_assign_valid_signature(conn) do conn -> conn end - assign(conn, :valid_signature, validate_signature(conn)) + assign(conn, :valid_signature, Signature.validate_signature(conn)) else Logger.debug("No signature header!") conn diff --git a/lib/pleroma/web/plugs/o_auth_plug.ex b/lib/pleroma/web/plugs/o_auth_plug.ex index b59ac9d3ea..488968691d 100644 --- a/lib/pleroma/web/plugs/o_auth_plug.ex +++ b/lib/pleroma/web/plugs/o_auth_plug.ex @@ -52,7 +52,7 @@ defp fetch_user_and_token(token) do where: t.token == ^token ) - with %Token{user_id: user_id} = token_record <- Repo.one(token_query), + with %Token{user_id: user_id} = token_record <- Repo.one(token_query) |> Repo.preload(:user), false <- is_nil(user_id), %User{} = user <- User.get_cached_by_id(user_id) do {:ok, user, token_record} diff --git a/lib/pleroma/workers/backup_worker.ex b/lib/pleroma/workers/backup_worker.ex index 54ac31a3c6..d1b6fcdadf 100644 --- a/lib/pleroma/workers/backup_worker.ex +++ b/lib/pleroma/workers/backup_worker.ex @@ -6,64 +6,46 @@ defmodule Pleroma.Workers.BackupWorker do use Oban.Worker, queue: :slow, max_attempts: 1 alias Oban.Job + alias Pleroma.Config.Getting, as: Config alias Pleroma.User.Backup - def process(backup, admin_user_id \\ nil) do - %{"op" => "process", "backup_id" => backup.id, "admin_user_id" => admin_user_id} - |> new() - |> Oban.insert() - end - - def schedule_deletion(backup) do - days = Pleroma.Config.get([Backup, :purge_after_days]) - time = 60 * 60 * 24 * days - scheduled_at = Calendar.NaiveDateTime.add!(backup.inserted_at, time) - - %{"op" => "delete", "backup_id" => backup.id} - |> new(scheduled_at: scheduled_at) - |> Oban.insert() - end - - def delete(backup) do - %{"op" => "delete", "backup_id" => backup.id} - |> new() - |> Oban.insert() - end - @impl Oban.Worker def perform(%Job{ - args: %{"op" => "process", "backup_id" => backup_id, "admin_user_id" => admin_user_id} + args: %{"op" => "process", "backup_id" => backup_id} }) do - with {:ok, %Backup{} = backup} <- - backup_id |> Backup.get() |> Backup.process(), - {:ok, _job} <- schedule_deletion(backup), - :ok <- Backup.remove_outdated(backup), - :ok <- maybe_deliver_email(backup, admin_user_id) do - {:ok, backup} + with {_, %Backup{} = backup} <- {:get, Backup.get_by_id(backup_id)}, + {_, {:ok, updated_backup}} <- {:run, Backup.run(backup)}, + {_, {:ok, uploaded_backup}} <- {:upload, Backup.upload(updated_backup)}, + {_, {:ok, _job}} <- {:delete, Backup.schedule_delete(uploaded_backup)}, + {_, :ok} <- {:outdated, Backup.remove_outdated(uploaded_backup.user)}, + {_, :ok} <- {:email, maybe_deliver_email(uploaded_backup)} do + {:ok, uploaded_backup} + else + e -> {:error, e} end end def perform(%Job{args: %{"op" => "delete", "backup_id" => backup_id}}) do - case Backup.get(backup_id) do - %Backup{} = backup -> Backup.delete(backup) + case Backup.get_by_id(backup_id) do + %Backup{} = backup -> Backup.delete_archive(backup) nil -> :ok end end @impl Oban.Worker - def timeout(_job), do: :infinity + def timeout(_job), do: Config.get([Backup, :timeout], :timer.minutes(30)) defp has_email?(user) do not is_nil(user.email) and user.email != "" end - defp maybe_deliver_email(backup, admin_user_id) do + defp maybe_deliver_email(backup) do has_mailer = Pleroma.Config.get([Pleroma.Emails.Mailer, :enabled]) backup = backup |> Pleroma.Repo.preload(:user) if has_email?(backup.user) and has_mailer do backup - |> Pleroma.Emails.UserEmail.backup_is_ready_email(admin_user_id) + |> Pleroma.Emails.UserEmail.backup_is_ready_email() |> Pleroma.Emails.Mailer.deliver() :ok diff --git a/lib/pleroma/workers/receiver_worker.ex b/lib/pleroma/workers/receiver_worker.ex index d01813c258..fd5c13fca2 100644 --- a/lib/pleroma/workers/receiver_worker.ex +++ b/lib/pleroma/workers/receiver_worker.ex @@ -12,17 +12,30 @@ defmodule Pleroma.Workers.ReceiverWorker do @impl Oban.Worker def perform(%Job{ - args: %{"op" => "incoming_ap_doc", "req_headers" => req_headers, "params" => params} + args: %{ + "op" => "incoming_ap_doc", + "method" => method, + "params" => params, + "req_headers" => req_headers, + "request_path" => request_path, + "query_string" => query_string + } }) do # Oban's serialization converts our tuple headers to lists. # Revert it for the signature validation. req_headers = Enum.into(req_headers, [], &List.to_tuple(&1)) - conn_data = %{params: params, req_headers: req_headers} + conn_data = %Plug.Conn{ + method: method, + params: params, + req_headers: req_headers, + request_path: request_path, + query_string: query_string + } with {:ok, %User{} = _actor} <- User.get_or_fetch_by_ap_id(conn_data.params["actor"]), {:ok, _public_key} <- Signature.refetch_public_key(conn_data), - {:signature, true} <- {:signature, HTTPSignatures.validate_conn(conn_data)}, + {:signature, true} <- {:signature, Signature.validate_signature(conn_data)}, {:ok, res} <- Federator.perform(:incoming_ap_doc, params) do {:ok, res} else diff --git a/priv/repo/migrations/20240622175346_backup_refactor.exs b/priv/repo/migrations/20240622175346_backup_refactor.exs new file mode 100644 index 0000000000..5dfc55789d --- /dev/null +++ b/priv/repo/migrations/20240622175346_backup_refactor.exs @@ -0,0 +1,19 @@ +defmodule Pleroma.Repo.Migrations.BackupRefactor do + use Ecto.Migration + + def up do + alter table("backups") do + remove(:state) + remove(:processed_number) + add(:tempdir, :string) + end + end + + def down do + alter table("backups") do + add(:state, :integer, default: 5) + add(:processed_number, :integer, default: 0) + remove(:tempdir) + end + end +end diff --git a/priv/repo/migrations/20240729163838_publisher_job_change.exs b/priv/repo/migrations/20240729163838_publisher_job_change.exs new file mode 100644 index 0000000000..cbea18205e --- /dev/null +++ b/priv/repo/migrations/20240729163838_publisher_job_change.exs @@ -0,0 +1,32 @@ +defmodule Pleroma.Repo.Migrations.PublisherJobChange do + use Ecto.Migration + + alias Pleroma.Activity + alias Pleroma.Repo + import Ecto.Query + + def up do + query = + from(j in Oban.Job, + where: j.worker == "Pleroma.Workers.PublisherWorker", + where: j.state in ["available", "retryable"] + ) + + jobs = Repo.all(query) + + Enum.each(jobs, fn job -> + args = job.args + + case Activity.get_by_ap_id(args["id"]) do + nil -> + :ok + + %Activity{id: activity_id} -> + updated_args = Map.put(args, "activity_id", activity_id) + + Pleroma.Workers.PublisherWorker.new(updated_args) + |> Oban.insert() + end + end) + end +end diff --git a/test/fixtures/bastianallgeier.json b/test/fixtures/bastianallgeier.json new file mode 100644 index 0000000000..6b47e7db9b --- /dev/null +++ b/test/fixtures/bastianallgeier.json @@ -0,0 +1,117 @@ +{ + "@context": [ + "https://www.w3.org/ns/activitystreams", + "https://w3id.org/security/v1", + { + "Curve25519Key": "toot:Curve25519Key", + "Device": "toot:Device", + "Ed25519Key": "toot:Ed25519Key", + "Ed25519Signature": "toot:Ed25519Signature", + "EncryptedMessage": "toot:EncryptedMessage", + "PropertyValue": "schema:PropertyValue", + "alsoKnownAs": { + "@id": "as:alsoKnownAs", + "@type": "@id" + }, + "cipherText": "toot:cipherText", + "claim": { + "@id": "toot:claim", + "@type": "@id" + }, + "deviceId": "toot:deviceId", + "devices": { + "@id": "toot:devices", + "@type": "@id" + }, + "discoverable": "toot:discoverable", + "featured": { + "@id": "toot:featured", + "@type": "@id" + }, + "featuredTags": { + "@id": "toot:featuredTags", + "@type": "@id" + }, + "fingerprintKey": { + "@id": "toot:fingerprintKey", + "@type": "@id" + }, + "focalPoint": { + "@container": "@list", + "@id": "toot:focalPoint" + }, + "identityKey": { + "@id": "toot:identityKey", + "@type": "@id" + }, + "indexable": "toot:indexable", + "manuallyApprovesFollowers": "as:manuallyApprovesFollowers", + "memorial": "toot:memorial", + "messageFranking": "toot:messageFranking", + "messageType": "toot:messageType", + "movedTo": { + "@id": "as:movedTo", + "@type": "@id" + }, + "publicKeyBase64": "toot:publicKeyBase64", + "schema": "http://schema.org#", + "suspended": "toot:suspended", + "toot": "http://joinmastodon.org/ns#", + "value": "schema:value" + } + ], + "attachment": [ + { + "name": "Website", + "type": "PropertyValue", + "value": "https://bastianallgeier.com" + }, + { + "name": "Project", + "type": "PropertyValue", + "value": "https://getkirby.com" + }, + { + "name": "Github", + "type": "PropertyValue", + "value": "https://github.com/bastianallgeier" + } + ], + "devices": "https://mastodon.social/users/bastianallgeier/collections/devices", + "discoverable": true, + "endpoints": { + "sharedInbox": "https://mastodon.social/inbox" + }, + "featured": "https://mastodon.social/users/bastianallgeier/collections/featured", + "featuredTags": "https://mastodon.social/users/bastianallgeier/collections/tags", + "followers": "https://mastodon.social/users/bastianallgeier/followers", + "following": "https://mastodon.social/users/bastianallgeier/following", + "icon": { + "mediaType": "image/jpeg", + "type": "Image", + "url": "https://files.mastodon.social/accounts/avatars/000/007/393/original/0180a20079617c71.jpg" + }, + "id": "https://mastodon.social/users/bastianallgeier", + "image": { + "mediaType": "image/jpeg", + "type": "Image", + "url": "https://files.mastodon.social/accounts/headers/000/007/393/original/13d644ab46d50478.jpeg" + }, + "inbox": "https://mastodon.social/users/bastianallgeier/inbox", + "indexable": false, + "manuallyApprovesFollowers": false, + "memorial": false, + "name": "Bastian Allgeier", + "outbox": "https://mastodon.social/users/bastianallgeier/outbox", + "preferredUsername": "bastianallgeier", + "publicKey": { + "id": "https://mastodon.social/users/bastianallgeier#main-key", + "owner": "https://mastodon.social/users/bastianallgeier", + "publicKeyPem": "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA3fz+hpgVztO9z6HUhyzv\nwP++ERBBoIwSLKf1TyIM8bvzGFm2YXaO5uxu1HvumYFTYc3ACr3q4j8VUb7NMxkQ\nlzu4QwPjOFJ43O+fY+HSPORXEDW5fXDGC5DGpox4+i08LxRmx7L6YPRUSUuPN8nI\nWyq1Qsq1zOQrNY/rohMXkBdSXxqC3yIRqvtLt4otCgay/5tMogJWkkS6ZKyFhb9z\nwVVy1fsbV10c9C+SHy4NH26CKaTtpTYLRBMjhTCS8bX8iDSjGIf2aZgYs1ir7gEz\n9wf5CvLiENmVWGwm64t6KSEAkA4NJ1hzgHUZPCjPHZE2SmhO/oHaxokTzqtbbENJ\n1QIDAQAB\n-----END PUBLIC KEY-----\n" + }, + "published": "2016-11-01T00:00:00Z", + "summary": "Designer & developer. Creator of Kirby CMS
", + "tag": [], + "type": "Person", + "url": "https://mastodon.social/@bastianallgeier" +} diff --git a/test/fixtures/denniskoch.json b/test/fixtures/denniskoch.json new file mode 100644 index 0000000000..7aa4de508e --- /dev/null +++ b/test/fixtures/denniskoch.json @@ -0,0 +1,112 @@ +{ + "@context": [ + "https://www.w3.org/ns/activitystreams", + "https://w3id.org/security/v1", + { + "Curve25519Key": "toot:Curve25519Key", + "Device": "toot:Device", + "Ed25519Key": "toot:Ed25519Key", + "Ed25519Signature": "toot:Ed25519Signature", + "EncryptedMessage": "toot:EncryptedMessage", + "PropertyValue": "schema:PropertyValue", + "alsoKnownAs": { + "@id": "as:alsoKnownAs", + "@type": "@id" + }, + "cipherText": "toot:cipherText", + "claim": { + "@id": "toot:claim", + "@type": "@id" + }, + "deviceId": "toot:deviceId", + "devices": { + "@id": "toot:devices", + "@type": "@id" + }, + "discoverable": "toot:discoverable", + "featured": { + "@id": "toot:featured", + "@type": "@id" + }, + "featuredTags": { + "@id": "toot:featuredTags", + "@type": "@id" + }, + "fingerprintKey": { + "@id": "toot:fingerprintKey", + "@type": "@id" + }, + "focalPoint": { + "@container": "@list", + "@id": "toot:focalPoint" + }, + "identityKey": { + "@id": "toot:identityKey", + "@type": "@id" + }, + "indexable": "toot:indexable", + "manuallyApprovesFollowers": "as:manuallyApprovesFollowers", + "memorial": "toot:memorial", + "messageFranking": "toot:messageFranking", + "messageType": "toot:messageType", + "movedTo": { + "@id": "as:movedTo", + "@type": "@id" + }, + "publicKeyBase64": "toot:publicKeyBase64", + "schema": "http://schema.org#", + "suspended": "toot:suspended", + "toot": "http://joinmastodon.org/ns#", + "value": "schema:value" + } + ], + "attachment": [ + { + "name": "GitHub", + "type": "PropertyValue", + "value": "https://github.com/pxlrbt/" + }, + { + "name": "Discord", + "type": "PropertyValue", + "value": "pxlrbt#6029" + } + ], + "devices": "https://phpc.social/users/denniskoch/collections/devices", + "discoverable": true, + "endpoints": { + "sharedInbox": "https://phpc.social/inbox" + }, + "featured": "https://phpc.social/users/denniskoch/collections/featured", + "featuredTags": "https://phpc.social/users/denniskoch/collections/tags", + "followers": "https://phpc.social/users/denniskoch/followers", + "following": "https://phpc.social/users/denniskoch/following", + "icon": { + "mediaType": "image/jpeg", + "type": "Image", + "url": "https://media.phpc.social/accounts/avatars/109/364/097/179/042/485/original/6e770c7b3f5ef72d.jpg" + }, + "id": "https://phpc.social/users/denniskoch", + "image": { + "mediaType": "image/jpeg", + "type": "Image", + "url": "https://media.phpc.social/accounts/headers/109/364/097/179/042/485/original/709da24705260c04.jpg" + }, + "inbox": "https://phpc.social/users/denniskoch/inbox", + "indexable": true, + "manuallyApprovesFollowers": false, + "memorial": false, + "name": "Dennis Koch", + "outbox": "https://phpc.social/users/denniskoch/outbox", + "preferredUsername": "denniskoch", + "publicKey": { + "id": "https://phpc.social/users/denniskoch#main-key", + "owner": "https://phpc.social/users/denniskoch", + "publicKeyPem": "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA4dmcSlqLj18gPvuslkmt\nQTniZ8ybO4pgvMvPLYtBuTBUjo49vJ/8Sw6jB5zcKb1haqIdny7Rv/vY3kCdCXcP\nloh1I+jthEgqLT8JpZWGwLGwg9piFhrMGADmt3N8du7HfglzuZ8LlVpnZ8feCw7I\nS2ua/ZCxE47mI45Z3ed2kkFYKWopWWqFn2lan/1OyHrcFKtCvaVjRdvo0UUt2tgl\nvyJI4+zN8FnrCbsMtcbI5nSzfJIrOc4LeaGmLJh+0o2rwoOQZc2487XWbeyfhjsq\nPRBpYN7pfHWQDvzQIN075LHTf9zDFsm6+HqY7Zs5rYxr72rvcX7d9JcP6CasIosY\nqwIDAQAB\n-----END PUBLIC KEY-----\n" + }, + "published": "2022-11-18T00:00:00Z", + "summary": "🧑💻 Full Stack Developer
🚀 Laravel, Filament, Livewire, Vue, Inertia
🌍 Germany
Favorite piece of anthropology meta discourse.
", + "contentMap": { + "en": "Favorite piece of anthropology meta discourse.
" + }, + "conversation": "tag:chaos.social,2022-11-13:objectId=71843781:objectType=Conversation", + "id": "https://chaos.social/users/distantnative/statuses/109336635639931467", + "inReplyTo": null, + "inReplyToAtomUri": null, + "published": "2022-11-13T13:04:20Z", + "replies": { + "first": { + "items": [], + "next": "https://chaos.social/users/distantnative/statuses/109336635639931467/replies?only_other_accounts=true&page=true", + "partOf": "https://chaos.social/users/distantnative/statuses/109336635639931467/replies", + "type": "CollectionPage" + }, + "id": "https://chaos.social/users/distantnative/statuses/109336635639931467/replies", + "type": "Collection" + }, + "sensitive": false, + "summary": null, + "tag": [], + "to": [ + "https://www.w3.org/ns/activitystreams#Public" + ], + "type": "Note", + "url": "https://chaos.social/@distantnative/109336635639931467" +} diff --git a/test/pleroma/repo/migrations/publisher_migration_change_test.exs b/test/pleroma/repo/migrations/publisher_migration_change_test.exs new file mode 100644 index 0000000000..9c035e604e --- /dev/null +++ b/test/pleroma/repo/migrations/publisher_migration_change_test.exs @@ -0,0 +1,43 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2022 Pleroma Authors@bastianallgeier @distantnative @kev Another main argument: Discord is popular. Many people have an account, so you can just join an server quickly. Also you know the app and how to get around.
", + "contentMap" => %{ + "en" => + "@bastianallgeier @distantnative @kev Another main argument: Discord is popular. Many people have an account, so you can just join an server quickly. Also you know the app and how to get around.
" + }, + "conversation" => + "tag:mastodon.social,2024-07-25:objectId=760068442:objectType=Conversation", + "id" => "https://phpc.social/users/denniskoch/statuses/112847382711461301", + "inReplyTo" => + "https://mastodon.social/users/bastianallgeier/statuses/112846516276907281", + "inReplyToAtomUri" => + "https://mastodon.social/users/bastianallgeier/statuses/112846516276907281", + "published" => "2024-07-25T13:33:29Z", + "replies" => %{ + "first" => %{ + "items" => [], + "next" => + "https://phpc.social/users/denniskoch/statuses/112847382711461301/replies?only_other_accounts=true&page=true", + "partOf" => + "https://phpc.social/users/denniskoch/statuses/112847382711461301/replies", + "type" => "CollectionPage" + }, + "id" => "https://phpc.social/users/denniskoch/statuses/112847382711461301/replies", + "type" => "Collection" + }, + "sensitive" => false, + "tag" => [ + %{ + "href" => "https://mastodon.social/users/bastianallgeier", + "name" => "@bastianallgeier@mastodon.social", + "type" => "Mention" + }, + %{ + "href" => "https://chaos.social/users/distantnative", + "name" => "@distantnative@chaos.social", + "type" => "Mention" + }, + %{ + "href" => "https://fosstodon.org/users/kev", + "name" => "@kev@fosstodon.org", + "type" => "Mention" + } + ], + "to" => ["https://www.w3.org/ns/activitystreams#Public"], + "type" => "Note", + "url" => "https://phpc.social/@denniskoch/112847382711461301" + }, + "published" => "2024-07-25T13:33:29Z", + "signature" => %{ + "created" => "2024-07-25T13:33:29Z", + "creator" => "https://phpc.social/users/denniskoch#main-key", + "signatureValue" => + "slz9BKJzd2n1S44wdXGOU+bV/wsskdgAaUpwxj8R16mYOL8+DTpE6VnfSKoZGsBBJT8uG5gnVfVEz1YsTUYtymeUgLMh7cvd8VnJnZPS+oixbmBRVky/Myf91TEgQQE7G4vDmTdB4ii54hZrHcOOYYf5FKPNRSkMXboKA6LMqNtekhbI+JTUJYIB02WBBK6PUyo15f6B1RJ6HGWVgud9NE0y1EZXfrkqUt682p8/9D49ORf7AwjXUJibKic2RbPvhEBj70qUGfBm4vvgdWhSUn1IG46xh+U0+NrTSUED82j1ZVOeua/2k/igkGs8cSBkY35quXTkPz6gbqCCH66CuA==", + "type" => "RsaSignature2017" + }, + "to" => ["https://www.w3.org/ns/activitystreams#Public"], + "type" => "Create" + } + + req_headers = [ + ["accept-encoding", "gzip"], + ["content-length", "5184"], + ["content-type", "application/activity+json"], + ["date", "Thu, 25 Jul 2024 13:33:31 GMT"], + ["digest", "SHA-256=ouge/6HP2/QryG6F3JNtZ6vzs/hSwMk67xdxe87eH7A="], + ["host", "bikeshed.party"], + [ + "signature", + "keyId=\"https://mastodon.social/users/bastianallgeier#main-key\",algorithm=\"rsa-sha256\",headers=\"(request-target) host date digest content-type\",signature=\"ymE3vn5Iw50N6ukSp8oIuXJB5SBjGAGjBasdTDvn+ahZIzq2SIJfmVCsIIzyqIROnhWyQoTbavTclVojEqdaeOx+Ejz2wBnRBmhz5oemJLk4RnnCH0lwMWyzeY98YAvxi9Rq57Gojuv/1lBqyGa+rDzynyJpAMyFk17XIZpjMKuTNMCbjMDy76ILHqArykAIL/v1zxkgwxY/+ELzxqMpNqtZ+kQ29znNMUBB3eVZ/mNAHAz6o33Y9VKxM2jw+08vtuIZOusXyiHbRiaj2g5HtN2WBUw1MzzfRfHF2/yy7rcipobeoyk5RvP5SyHV3WrIeZ3iyoNfmv33y8fxllF0EA==\"" + ], + [ + "user-agent", + "http.rb/5.2.0 (Mastodon/4.3.0-nightly.2024-07-25; +https://mastodon.social/)" + ] + ] + + {:ok, oban_job} = + Federator.incoming_ap_doc(%{ + method: "POST", + req_headers: req_headers, + request_path: "/inbox", + params: params, + query_string: "" + }) + + assert {:ok, %Pleroma.Activity{}} = ReceiverWorker.perform(oban_job) + end end diff --git a/test/support/mocks.ex b/test/support/mocks.ex index 63cbc49ab6..d84958e154 100644 --- a/test/support/mocks.ex +++ b/test/support/mocks.ex @@ -32,6 +32,4 @@ Mox.defmock(Pleroma.LoggerMock, for: Pleroma.Logging) -Mox.defmock(Pleroma.User.Backup.ProcessorMock, for: Pleroma.User.Backup.ProcessorAPI) - Mox.defmock(Pleroma.Uploaders.S3.ExAwsMock, for: Pleroma.Uploaders.S3.ExAwsAPI)