diff --git a/lib/pleroma/ecto_enums.ex b/lib/pleroma/ecto_enums.ex index a4890b489a..b346b39d6e 100644 --- a/lib/pleroma/ecto_enums.ex +++ b/lib/pleroma/ecto_enums.ex @@ -27,3 +27,11 @@ failed: 4, manual: 5 ) + +defenum(Pleroma.User.Backup.State, + pending: 1, + running: 2, + complete: 3, + failed: 4, + invalid: 5 +) diff --git a/lib/pleroma/user/backup.ex b/lib/pleroma/user/backup.ex index 9df0106057..74001f9c3a 100644 --- a/lib/pleroma/user/backup.ex +++ b/lib/pleroma/user/backup.ex @@ -15,6 +15,7 @@ defmodule Pleroma.User.Backup do alias Pleroma.Bookmark alias Pleroma.Repo alias Pleroma.User + alias Pleroma.User.Backup.State alias Pleroma.Web.ActivityPub.ActivityPub alias Pleroma.Web.ActivityPub.Transmogrifier alias Pleroma.Web.ActivityPub.UserView @@ -25,12 +26,16 @@ defmodule Pleroma.User.Backup do field(:file_name, :string) field(:file_size, :integer, default: 0) field(:processed, :boolean, default: false) + field(:state, State, default: :invalid) + field(:processed_number, :integer, default: 0) belongs_to(:user, User, type: FlakeId.Ecto.CompatType) timestamps() end + @report_every 100 + def create(user, admin_id \\ nil) do with :ok <- validate_limit(user, admin_id), {:ok, backup} <- user |> new() |> Repo.insert() do @@ -46,7 +51,8 @@ def new(user) do %__MODULE__{ user_id: user.id, content_type: "application/zip", - file_name: name + file_name: name, + state: :pending } end @@ -109,27 +115,75 @@ def remove_outdated(%__MODULE__{id: latest_id, user_id: user_id}) do def get(id), do: Repo.get(__MODULE__, id) + defp set_state(backup, state, processed_number \\ nil) do + struct = + %{state: state} + |> Pleroma.Maps.put_if_present(:processed_number, processed_number) + + backup + |> cast(struct, [:state, :processed_number]) + |> Repo.update() + end + def process(%__MODULE__{} = backup) do - with {:ok, zip_file} <- export(backup), - {:ok, %{size: size}} <- File.stat(zip_file), - {:ok, _upload} <- upload(backup, zip_file) do - backup - |> cast(%{file_size: size, processed: true}, [:file_size, :processed]) - |> Repo.update() + set_state(backup, :running, 0) + + current_pid = self() + + Task.Supervisor.async_nolink( + Pleroma.TaskSupervisor, + fn -> + with {:ok, zip_file} <- export(backup, current_pid), + {:ok, %{size: size}} <- File.stat(zip_file), + {:ok, _upload} <- upload(backup, zip_file) do + backup + |> cast( + %{ + file_size: size, + processed: true, + state: :complete + }, + [:file_size, :processed, :state] + ) + |> Repo.update() + + send(current_pid, :completed) + end + end + ) + + wait_backup(backup, backup.processed_number) + end + + defp wait_backup(backup, current_processed) do + receive do + {:progress, new_processed} -> + total_processed = current_processed + new_processed + + with {:ok, updated_backup} <- set_state(backup, :running, total_processed) do + wait_backup(updated_backup, total_processed) + else + _ -> wait_backup(backup, total_processed) + end + + :completed -> + {:ok, get(backup.id)} + after + 30_000 -> set_state(backup, :failed) end end @files ['actor.json', 'outbox.json', 'likes.json', 'bookmarks.json'] - def export(%__MODULE__{} = backup) do + def export(%__MODULE__{} = backup, caller_pid \\ nil) do backup = Repo.preload(backup, :user) name = String.trim_trailing(backup.file_name, ".zip") dir = dir(name) with :ok <- File.mkdir(dir), - :ok <- actor(dir, backup.user), - :ok <- statuses(dir, backup.user), - :ok <- likes(dir, backup.user), - :ok <- bookmarks(dir, backup.user), + :ok <- actor(dir, backup.user, caller_pid), + :ok <- statuses(dir, backup.user, caller_pid), + :ok <- likes(dir, backup.user, caller_pid), + :ok <- bookmarks(dir, backup.user, caller_pid), {:ok, zip_path} <- :zip.create(String.to_charlist(dir <> ".zip"), @files, cwd: dir), {:ok, _} <- File.rm_rf(dir) do {:ok, to_string(zip_path)} @@ -157,11 +211,12 @@ def upload(%__MODULE__{} = backup, zip_path) do end end - defp actor(dir, user) do + defp actor(dir, user, caller_pid) do with {:ok, json} <- UserView.render("user.json", %{user: user}) |> Map.merge(%{"likes" => "likes.json", "bookmarks" => "bookmarks.json"}) |> Jason.encode() do + send(caller_pid, {:progress, 1}) File.write(Path.join(dir, "actor.json"), json) end end @@ -180,7 +235,9 @@ defp write_header(file, name) do ) end - defp write(query, dir, name, fun) do + defp should_report?(num), do: rem(num, @report_every) == 0 + + defp write(query, dir, name, fun, caller_pid) do path = Path.join(dir, "#{name}.json") with {:ok, file} <- File.open(path, [:write, :utf8]), @@ -192,35 +249,41 @@ defp write(query, dir, name, fun) do with {:ok, data} <- fun.(i), {:ok, str} <- Jason.encode(data), :ok <- IO.write(file, str <> ",\n") do + if should_report?(acc + 1) do + send(caller_pid, {:progress, @report_every}) + end + acc + 1 else _ -> acc end end) + send(caller_pid, {:progress, rem(total, @report_every)}) + with :ok <- :file.pwrite(file, {:eof, -2}, "\n],\n \"totalItems\": #{total}}") do File.close(file) end end end - defp bookmarks(dir, %{id: user_id} = _user) do + defp bookmarks(dir, %{id: user_id} = _user, caller_pid) do Bookmark |> where(user_id: ^user_id) |> join(:inner, [b], activity in assoc(b, :activity)) |> select([b, a], %{id: b.id, object: fragment("(?)->>'object'", a.data)}) - |> write(dir, "bookmarks", fn a -> {:ok, a.object} end) + |> write(dir, "bookmarks", fn a -> {:ok, a.object} end, caller_pid) end - defp likes(dir, user) do + defp likes(dir, user, caller_pid) do user.ap_id |> Activity.Queries.by_actor() |> Activity.Queries.by_type("Like") |> select([like], %{id: like.id, object: fragment("(?)->>'object'", like.data)}) - |> write(dir, "likes", fn a -> {:ok, a.object} end) + |> write(dir, "likes", fn a -> {:ok, a.object} end, caller_pid) end - defp statuses(dir, user) do + defp statuses(dir, user, caller_pid) do opts = %{} |> Map.put(:type, ["Create", "Announce"]) @@ -233,10 +296,15 @@ defp statuses(dir, user) do ] |> Enum.concat() |> ActivityPub.fetch_activities_query(opts) - |> write(dir, "outbox", fn a -> - with {:ok, activity} <- Transmogrifier.prepare_outgoing(a.data) do - {:ok, Map.delete(activity, "@context")} - end - end) + |> write( + dir, + "outbox", + fn a -> + with {:ok, activity} <- Transmogrifier.prepare_outgoing(a.data) do + {:ok, Map.delete(activity, "@context")} + end + end, + caller_pid + ) end end diff --git a/lib/pleroma/workers/backup_worker.ex b/lib/pleroma/workers/backup_worker.ex index 12ee70f00b..a485ddb4b4 100644 --- a/lib/pleroma/workers/backup_worker.ex +++ b/lib/pleroma/workers/backup_worker.ex @@ -51,7 +51,7 @@ def perform(%Job{args: %{"op" => "delete", "backup_id" => backup_id}}) do end @impl Oban.Worker - def timeout(_job), do: :timer.seconds(900) + def timeout(_job), do: :infinity defp has_email?(user) do not is_nil(user.email) and user.email != "" diff --git a/priv/repo/migrations/20221216052127_add_state_to_backups.exs b/priv/repo/migrations/20221216052127_add_state_to_backups.exs new file mode 100644 index 0000000000..73b30fc358 --- /dev/null +++ b/priv/repo/migrations/20221216052127_add_state_to_backups.exs @@ -0,0 +1,21 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2022 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Repo.Migrations.AddStateToBackups do + use Ecto.Migration + + def up do + alter table(:backups) do + add(:state, :integer, default: 5) + add(:processed_number, :integer, default: 0) + end + end + + def down do + alter table(:backups) do + remove(:state) + remove(:processed_number) + end + end +end diff --git a/test/pleroma/user/backup_test.exs b/test/pleroma/user/backup_test.exs index 5c9b940007..a536b4a4a8 100644 --- a/test/pleroma/user/backup_test.exs +++ b/test/pleroma/user/backup_test.exs @@ -39,7 +39,7 @@ test "it creates a backup record and an Oban job" do assert_enqueued(worker: BackupWorker, args: args) backup = Backup.get(args["backup_id"]) - assert %Backup{user_id: ^user_id, processed: false, file_size: 0} = backup + assert %Backup{user_id: ^user_id, processed: false, file_size: 0, state: :pending} = backup end test "it return an error if the export limit is over" do @@ -59,7 +59,30 @@ test "it process a backup record" do assert {:ok, %Oban.Job{args: %{"backup_id" => backup_id} = args}} = Backup.create(user) assert {:ok, backup} = perform_job(BackupWorker, args) assert backup.file_size > 0 - assert %Backup{id: ^backup_id, processed: true, user_id: ^user_id} = backup + assert %Backup{id: ^backup_id, processed: true, user_id: ^user_id, state: :complete} = backup + + delete_job_args = %{"op" => "delete", "backup_id" => backup_id} + + assert_enqueued(worker: BackupWorker, args: delete_job_args) + assert {:ok, backup} = perform_job(BackupWorker, delete_job_args) + refute Backup.get(backup_id) + + email = Pleroma.Emails.UserEmail.backup_is_ready_email(backup) + + assert_email_sent( + to: {user.name, user.email}, + html_body: email.html_body + ) + end + + test "it updates states of the backup" do + clear_config([Pleroma.Upload, :uploader], Pleroma.Uploaders.Local) + %{id: user_id} = user = insert(:user) + + assert {:ok, %Oban.Job{args: %{"backup_id" => backup_id} = args}} = Backup.create(user) + assert {:ok, backup} = perform_job(BackupWorker, args) + assert backup.file_size > 0 + assert %Backup{id: ^backup_id, processed: true, user_id: ^user_id, state: :complete} = backup delete_job_args = %{"op" => "delete", "backup_id" => backup_id} @@ -148,7 +171,7 @@ test "it creates a zip archive with user data" do Bookmark.create(user.id, status3.id) assert {:ok, backup} = user |> Backup.new() |> Repo.insert() - assert {:ok, path} = Backup.export(backup) + assert {:ok, path} = Backup.export(backup, self()) assert {:ok, zipfile} = :zip.zip_open(String.to_charlist(path), [:memory]) assert {:ok, {'actor.json', json}} = :zip.zip_get('actor.json', zipfile) @@ -230,6 +253,23 @@ test "it creates a zip archive with user data" do File.rm!(path) end + test "it counts the correct number processed" do + user = insert(:user, %{nickname: "cofe", name: "Cofe", ap_id: "http://cofe.io/users/cofe"}) + + Enum.map(1..120, fn i -> + {:ok, status} = CommonAPI.post(user, %{status: "status #{i}"}) + CommonAPI.favorite(user, status.id) + Bookmark.create(user.id, status.id) + end) + + assert {:ok, backup} = user |> Backup.new() |> Repo.insert() + {:ok, backup} = Backup.process(backup) + + assert backup.processed_number == 1 + 120 + 120 + 120 + + Backup.delete(backup) + end + describe "it uploads and deletes a backup archive" do setup do clear_config([Pleroma.Upload, :base_url], "https://s3.amazonaws.com") @@ -246,7 +286,7 @@ test "it creates a zip archive with user data" do Bookmark.create(user.id, status3.id) assert {:ok, backup} = user |> Backup.new() |> Repo.insert() - assert {:ok, path} = Backup.export(backup) + assert {:ok, path} = Backup.export(backup, self()) [path: path, backup: backup] end