Detail backup states
This commit is contained in:
parent
6bce88b9e7
commit
e4ac2a7cd6
5 changed files with 166 additions and 29 deletions
|
@ -27,3 +27,11 @@
|
||||||
failed: 4,
|
failed: 4,
|
||||||
manual: 5
|
manual: 5
|
||||||
)
|
)
|
||||||
|
|
||||||
|
defenum(Pleroma.User.Backup.State,
|
||||||
|
pending: 1,
|
||||||
|
running: 2,
|
||||||
|
complete: 3,
|
||||||
|
failed: 4,
|
||||||
|
invalid: 5
|
||||||
|
)
|
||||||
|
|
|
@ -15,6 +15,7 @@ defmodule Pleroma.User.Backup do
|
||||||
alias Pleroma.Bookmark
|
alias Pleroma.Bookmark
|
||||||
alias Pleroma.Repo
|
alias Pleroma.Repo
|
||||||
alias Pleroma.User
|
alias Pleroma.User
|
||||||
|
alias Pleroma.User.Backup.State
|
||||||
alias Pleroma.Web.ActivityPub.ActivityPub
|
alias Pleroma.Web.ActivityPub.ActivityPub
|
||||||
alias Pleroma.Web.ActivityPub.Transmogrifier
|
alias Pleroma.Web.ActivityPub.Transmogrifier
|
||||||
alias Pleroma.Web.ActivityPub.UserView
|
alias Pleroma.Web.ActivityPub.UserView
|
||||||
|
@ -25,12 +26,16 @@ defmodule Pleroma.User.Backup do
|
||||||
field(:file_name, :string)
|
field(:file_name, :string)
|
||||||
field(:file_size, :integer, default: 0)
|
field(:file_size, :integer, default: 0)
|
||||||
field(:processed, :boolean, default: false)
|
field(:processed, :boolean, default: false)
|
||||||
|
field(:state, State, default: :invalid)
|
||||||
|
field(:processed_number, :integer, default: 0)
|
||||||
|
|
||||||
belongs_to(:user, User, type: FlakeId.Ecto.CompatType)
|
belongs_to(:user, User, type: FlakeId.Ecto.CompatType)
|
||||||
|
|
||||||
timestamps()
|
timestamps()
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@report_every 100
|
||||||
|
|
||||||
def create(user, admin_id \\ nil) do
|
def create(user, admin_id \\ nil) do
|
||||||
with :ok <- validate_limit(user, admin_id),
|
with :ok <- validate_limit(user, admin_id),
|
||||||
{:ok, backup} <- user |> new() |> Repo.insert() do
|
{:ok, backup} <- user |> new() |> Repo.insert() do
|
||||||
|
@ -46,7 +51,8 @@ def new(user) do
|
||||||
%__MODULE__{
|
%__MODULE__{
|
||||||
user_id: user.id,
|
user_id: user.id,
|
||||||
content_type: "application/zip",
|
content_type: "application/zip",
|
||||||
file_name: name
|
file_name: name,
|
||||||
|
state: :pending
|
||||||
}
|
}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -109,27 +115,75 @@ def remove_outdated(%__MODULE__{id: latest_id, user_id: user_id}) do
|
||||||
|
|
||||||
def get(id), do: Repo.get(__MODULE__, id)
|
def get(id), do: Repo.get(__MODULE__, id)
|
||||||
|
|
||||||
|
defp set_state(backup, state, processed_number \\ nil) do
|
||||||
|
struct =
|
||||||
|
%{state: state}
|
||||||
|
|> Pleroma.Maps.put_if_present(:processed_number, processed_number)
|
||||||
|
|
||||||
|
backup
|
||||||
|
|> cast(struct, [:state, :processed_number])
|
||||||
|
|> Repo.update()
|
||||||
|
end
|
||||||
|
|
||||||
def process(%__MODULE__{} = backup) do
|
def process(%__MODULE__{} = backup) do
|
||||||
with {:ok, zip_file} <- export(backup),
|
set_state(backup, :running, 0)
|
||||||
{:ok, %{size: size}} <- File.stat(zip_file),
|
|
||||||
{:ok, _upload} <- upload(backup, zip_file) do
|
current_pid = self()
|
||||||
backup
|
|
||||||
|> cast(%{file_size: size, processed: true}, [:file_size, :processed])
|
Task.Supervisor.async_nolink(
|
||||||
|> Repo.update()
|
Pleroma.TaskSupervisor,
|
||||||
|
fn ->
|
||||||
|
with {:ok, zip_file} <- export(backup, current_pid),
|
||||||
|
{:ok, %{size: size}} <- File.stat(zip_file),
|
||||||
|
{:ok, _upload} <- upload(backup, zip_file) do
|
||||||
|
backup
|
||||||
|
|> cast(
|
||||||
|
%{
|
||||||
|
file_size: size,
|
||||||
|
processed: true,
|
||||||
|
state: :complete
|
||||||
|
},
|
||||||
|
[:file_size, :processed, :state]
|
||||||
|
)
|
||||||
|
|> Repo.update()
|
||||||
|
|
||||||
|
send(current_pid, :completed)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
)
|
||||||
|
|
||||||
|
wait_backup(backup, backup.processed_number)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp wait_backup(backup, current_processed) do
|
||||||
|
receive do
|
||||||
|
{:progress, new_processed} ->
|
||||||
|
total_processed = current_processed + new_processed
|
||||||
|
|
||||||
|
with {:ok, updated_backup} <- set_state(backup, :running, total_processed) do
|
||||||
|
wait_backup(updated_backup, total_processed)
|
||||||
|
else
|
||||||
|
_ -> wait_backup(backup, total_processed)
|
||||||
|
end
|
||||||
|
|
||||||
|
:completed ->
|
||||||
|
{:ok, get(backup.id)}
|
||||||
|
after
|
||||||
|
30_000 -> set_state(backup, :failed)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@files ['actor.json', 'outbox.json', 'likes.json', 'bookmarks.json']
|
@files ['actor.json', 'outbox.json', 'likes.json', 'bookmarks.json']
|
||||||
def export(%__MODULE__{} = backup) do
|
def export(%__MODULE__{} = backup, caller_pid \\ nil) do
|
||||||
backup = Repo.preload(backup, :user)
|
backup = Repo.preload(backup, :user)
|
||||||
name = String.trim_trailing(backup.file_name, ".zip")
|
name = String.trim_trailing(backup.file_name, ".zip")
|
||||||
dir = dir(name)
|
dir = dir(name)
|
||||||
|
|
||||||
with :ok <- File.mkdir(dir),
|
with :ok <- File.mkdir(dir),
|
||||||
:ok <- actor(dir, backup.user),
|
:ok <- actor(dir, backup.user, caller_pid),
|
||||||
:ok <- statuses(dir, backup.user),
|
:ok <- statuses(dir, backup.user, caller_pid),
|
||||||
:ok <- likes(dir, backup.user),
|
:ok <- likes(dir, backup.user, caller_pid),
|
||||||
:ok <- bookmarks(dir, backup.user),
|
:ok <- bookmarks(dir, backup.user, caller_pid),
|
||||||
{:ok, zip_path} <- :zip.create(String.to_charlist(dir <> ".zip"), @files, cwd: dir),
|
{:ok, zip_path} <- :zip.create(String.to_charlist(dir <> ".zip"), @files, cwd: dir),
|
||||||
{:ok, _} <- File.rm_rf(dir) do
|
{:ok, _} <- File.rm_rf(dir) do
|
||||||
{:ok, to_string(zip_path)}
|
{:ok, to_string(zip_path)}
|
||||||
|
@ -157,11 +211,12 @@ def upload(%__MODULE__{} = backup, zip_path) do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
defp actor(dir, user) do
|
defp actor(dir, user, caller_pid) do
|
||||||
with {:ok, json} <-
|
with {:ok, json} <-
|
||||||
UserView.render("user.json", %{user: user})
|
UserView.render("user.json", %{user: user})
|
||||||
|> Map.merge(%{"likes" => "likes.json", "bookmarks" => "bookmarks.json"})
|
|> Map.merge(%{"likes" => "likes.json", "bookmarks" => "bookmarks.json"})
|
||||||
|> Jason.encode() do
|
|> Jason.encode() do
|
||||||
|
send(caller_pid, {:progress, 1})
|
||||||
File.write(Path.join(dir, "actor.json"), json)
|
File.write(Path.join(dir, "actor.json"), json)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -180,7 +235,9 @@ defp write_header(file, name) do
|
||||||
)
|
)
|
||||||
end
|
end
|
||||||
|
|
||||||
defp write(query, dir, name, fun) do
|
defp should_report?(num), do: rem(num, @report_every) == 0
|
||||||
|
|
||||||
|
defp write(query, dir, name, fun, caller_pid) do
|
||||||
path = Path.join(dir, "#{name}.json")
|
path = Path.join(dir, "#{name}.json")
|
||||||
|
|
||||||
with {:ok, file} <- File.open(path, [:write, :utf8]),
|
with {:ok, file} <- File.open(path, [:write, :utf8]),
|
||||||
|
@ -192,35 +249,41 @@ defp write(query, dir, name, fun) do
|
||||||
with {:ok, data} <- fun.(i),
|
with {:ok, data} <- fun.(i),
|
||||||
{:ok, str} <- Jason.encode(data),
|
{:ok, str} <- Jason.encode(data),
|
||||||
:ok <- IO.write(file, str <> ",\n") do
|
:ok <- IO.write(file, str <> ",\n") do
|
||||||
|
if should_report?(acc + 1) do
|
||||||
|
send(caller_pid, {:progress, @report_every})
|
||||||
|
end
|
||||||
|
|
||||||
acc + 1
|
acc + 1
|
||||||
else
|
else
|
||||||
_ -> acc
|
_ -> acc
|
||||||
end
|
end
|
||||||
end)
|
end)
|
||||||
|
|
||||||
|
send(caller_pid, {:progress, rem(total, @report_every)})
|
||||||
|
|
||||||
with :ok <- :file.pwrite(file, {:eof, -2}, "\n],\n \"totalItems\": #{total}}") do
|
with :ok <- :file.pwrite(file, {:eof, -2}, "\n],\n \"totalItems\": #{total}}") do
|
||||||
File.close(file)
|
File.close(file)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
defp bookmarks(dir, %{id: user_id} = _user) do
|
defp bookmarks(dir, %{id: user_id} = _user, caller_pid) do
|
||||||
Bookmark
|
Bookmark
|
||||||
|> where(user_id: ^user_id)
|
|> where(user_id: ^user_id)
|
||||||
|> join(:inner, [b], activity in assoc(b, :activity))
|
|> join(:inner, [b], activity in assoc(b, :activity))
|
||||||
|> select([b, a], %{id: b.id, object: fragment("(?)->>'object'", a.data)})
|
|> select([b, a], %{id: b.id, object: fragment("(?)->>'object'", a.data)})
|
||||||
|> write(dir, "bookmarks", fn a -> {:ok, a.object} end)
|
|> write(dir, "bookmarks", fn a -> {:ok, a.object} end, caller_pid)
|
||||||
end
|
end
|
||||||
|
|
||||||
defp likes(dir, user) do
|
defp likes(dir, user, caller_pid) do
|
||||||
user.ap_id
|
user.ap_id
|
||||||
|> Activity.Queries.by_actor()
|
|> Activity.Queries.by_actor()
|
||||||
|> Activity.Queries.by_type("Like")
|
|> Activity.Queries.by_type("Like")
|
||||||
|> select([like], %{id: like.id, object: fragment("(?)->>'object'", like.data)})
|
|> select([like], %{id: like.id, object: fragment("(?)->>'object'", like.data)})
|
||||||
|> write(dir, "likes", fn a -> {:ok, a.object} end)
|
|> write(dir, "likes", fn a -> {:ok, a.object} end, caller_pid)
|
||||||
end
|
end
|
||||||
|
|
||||||
defp statuses(dir, user) do
|
defp statuses(dir, user, caller_pid) do
|
||||||
opts =
|
opts =
|
||||||
%{}
|
%{}
|
||||||
|> Map.put(:type, ["Create", "Announce"])
|
|> Map.put(:type, ["Create", "Announce"])
|
||||||
|
@ -233,10 +296,15 @@ defp statuses(dir, user) do
|
||||||
]
|
]
|
||||||
|> Enum.concat()
|
|> Enum.concat()
|
||||||
|> ActivityPub.fetch_activities_query(opts)
|
|> ActivityPub.fetch_activities_query(opts)
|
||||||
|> write(dir, "outbox", fn a ->
|
|> write(
|
||||||
with {:ok, activity} <- Transmogrifier.prepare_outgoing(a.data) do
|
dir,
|
||||||
{:ok, Map.delete(activity, "@context")}
|
"outbox",
|
||||||
end
|
fn a ->
|
||||||
end)
|
with {:ok, activity} <- Transmogrifier.prepare_outgoing(a.data) do
|
||||||
|
{:ok, Map.delete(activity, "@context")}
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
caller_pid
|
||||||
|
)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -51,7 +51,7 @@ def perform(%Job{args: %{"op" => "delete", "backup_id" => backup_id}}) do
|
||||||
end
|
end
|
||||||
|
|
||||||
@impl Oban.Worker
|
@impl Oban.Worker
|
||||||
def timeout(_job), do: :timer.seconds(900)
|
def timeout(_job), do: :infinity
|
||||||
|
|
||||||
defp has_email?(user) do
|
defp has_email?(user) do
|
||||||
not is_nil(user.email) and user.email != ""
|
not is_nil(user.email) and user.email != ""
|
||||||
|
|
21
priv/repo/migrations/20221216052127_add_state_to_backups.exs
Normal file
21
priv/repo/migrations/20221216052127_add_state_to_backups.exs
Normal file
|
@ -0,0 +1,21 @@
|
||||||
|
# Pleroma: A lightweight social networking server
|
||||||
|
# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
defmodule Pleroma.Repo.Migrations.AddStateToBackups do
|
||||||
|
use Ecto.Migration
|
||||||
|
|
||||||
|
def up do
|
||||||
|
alter table(:backups) do
|
||||||
|
add(:state, :integer, default: 5)
|
||||||
|
add(:processed_number, :integer, default: 0)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def down do
|
||||||
|
alter table(:backups) do
|
||||||
|
remove(:state)
|
||||||
|
remove(:processed_number)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
|
@ -39,7 +39,7 @@ test "it creates a backup record and an Oban job" do
|
||||||
assert_enqueued(worker: BackupWorker, args: args)
|
assert_enqueued(worker: BackupWorker, args: args)
|
||||||
|
|
||||||
backup = Backup.get(args["backup_id"])
|
backup = Backup.get(args["backup_id"])
|
||||||
assert %Backup{user_id: ^user_id, processed: false, file_size: 0} = backup
|
assert %Backup{user_id: ^user_id, processed: false, file_size: 0, state: :pending} = backup
|
||||||
end
|
end
|
||||||
|
|
||||||
test "it return an error if the export limit is over" do
|
test "it return an error if the export limit is over" do
|
||||||
|
@ -59,7 +59,30 @@ test "it process a backup record" do
|
||||||
assert {:ok, %Oban.Job{args: %{"backup_id" => backup_id} = args}} = Backup.create(user)
|
assert {:ok, %Oban.Job{args: %{"backup_id" => backup_id} = args}} = Backup.create(user)
|
||||||
assert {:ok, backup} = perform_job(BackupWorker, args)
|
assert {:ok, backup} = perform_job(BackupWorker, args)
|
||||||
assert backup.file_size > 0
|
assert backup.file_size > 0
|
||||||
assert %Backup{id: ^backup_id, processed: true, user_id: ^user_id} = backup
|
assert %Backup{id: ^backup_id, processed: true, user_id: ^user_id, state: :complete} = backup
|
||||||
|
|
||||||
|
delete_job_args = %{"op" => "delete", "backup_id" => backup_id}
|
||||||
|
|
||||||
|
assert_enqueued(worker: BackupWorker, args: delete_job_args)
|
||||||
|
assert {:ok, backup} = perform_job(BackupWorker, delete_job_args)
|
||||||
|
refute Backup.get(backup_id)
|
||||||
|
|
||||||
|
email = Pleroma.Emails.UserEmail.backup_is_ready_email(backup)
|
||||||
|
|
||||||
|
assert_email_sent(
|
||||||
|
to: {user.name, user.email},
|
||||||
|
html_body: email.html_body
|
||||||
|
)
|
||||||
|
end
|
||||||
|
|
||||||
|
test "it updates states of the backup" do
|
||||||
|
clear_config([Pleroma.Upload, :uploader], Pleroma.Uploaders.Local)
|
||||||
|
%{id: user_id} = user = insert(:user)
|
||||||
|
|
||||||
|
assert {:ok, %Oban.Job{args: %{"backup_id" => backup_id} = args}} = Backup.create(user)
|
||||||
|
assert {:ok, backup} = perform_job(BackupWorker, args)
|
||||||
|
assert backup.file_size > 0
|
||||||
|
assert %Backup{id: ^backup_id, processed: true, user_id: ^user_id, state: :complete} = backup
|
||||||
|
|
||||||
delete_job_args = %{"op" => "delete", "backup_id" => backup_id}
|
delete_job_args = %{"op" => "delete", "backup_id" => backup_id}
|
||||||
|
|
||||||
|
@ -148,7 +171,7 @@ test "it creates a zip archive with user data" do
|
||||||
Bookmark.create(user.id, status3.id)
|
Bookmark.create(user.id, status3.id)
|
||||||
|
|
||||||
assert {:ok, backup} = user |> Backup.new() |> Repo.insert()
|
assert {:ok, backup} = user |> Backup.new() |> Repo.insert()
|
||||||
assert {:ok, path} = Backup.export(backup)
|
assert {:ok, path} = Backup.export(backup, self())
|
||||||
assert {:ok, zipfile} = :zip.zip_open(String.to_charlist(path), [:memory])
|
assert {:ok, zipfile} = :zip.zip_open(String.to_charlist(path), [:memory])
|
||||||
assert {:ok, {'actor.json', json}} = :zip.zip_get('actor.json', zipfile)
|
assert {:ok, {'actor.json', json}} = :zip.zip_get('actor.json', zipfile)
|
||||||
|
|
||||||
|
@ -230,6 +253,23 @@ test "it creates a zip archive with user data" do
|
||||||
File.rm!(path)
|
File.rm!(path)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
test "it counts the correct number processed" do
|
||||||
|
user = insert(:user, %{nickname: "cofe", name: "Cofe", ap_id: "http://cofe.io/users/cofe"})
|
||||||
|
|
||||||
|
Enum.map(1..120, fn i ->
|
||||||
|
{:ok, status} = CommonAPI.post(user, %{status: "status #{i}"})
|
||||||
|
CommonAPI.favorite(user, status.id)
|
||||||
|
Bookmark.create(user.id, status.id)
|
||||||
|
end)
|
||||||
|
|
||||||
|
assert {:ok, backup} = user |> Backup.new() |> Repo.insert()
|
||||||
|
{:ok, backup} = Backup.process(backup)
|
||||||
|
|
||||||
|
assert backup.processed_number == 1 + 120 + 120 + 120
|
||||||
|
|
||||||
|
Backup.delete(backup)
|
||||||
|
end
|
||||||
|
|
||||||
describe "it uploads and deletes a backup archive" do
|
describe "it uploads and deletes a backup archive" do
|
||||||
setup do
|
setup do
|
||||||
clear_config([Pleroma.Upload, :base_url], "https://s3.amazonaws.com")
|
clear_config([Pleroma.Upload, :base_url], "https://s3.amazonaws.com")
|
||||||
|
@ -246,7 +286,7 @@ test "it creates a zip archive with user data" do
|
||||||
Bookmark.create(user.id, status3.id)
|
Bookmark.create(user.id, status3.id)
|
||||||
|
|
||||||
assert {:ok, backup} = user |> Backup.new() |> Repo.insert()
|
assert {:ok, backup} = user |> Backup.new() |> Repo.insert()
|
||||||
assert {:ok, path} = Backup.export(backup)
|
assert {:ok, path} = Backup.export(backup, self())
|
||||||
|
|
||||||
[path: path, backup: backup]
|
[path: path, backup: backup]
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in a new issue