Refactor backups to be fully controlled by Oban
This commit is contained in:
parent
ff663c0aeb
commit
8f285a787f
11 changed files with 209 additions and 296 deletions
1
changelog.d/backups-refactor.change
Normal file
1
changelog.d/backups-refactor.change
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Refactor the user backups code and improve test coverage
|
|
@ -187,6 +187,8 @@
|
||||||
config :pleroma, Pleroma.Web.RichMedia.Backfill,
|
config :pleroma, Pleroma.Web.RichMedia.Backfill,
|
||||||
stream_out: Pleroma.Web.ActivityPub.ActivityPubMock
|
stream_out: Pleroma.Web.ActivityPub.ActivityPubMock
|
||||||
|
|
||||||
|
config :pleroma, Pleroma.User.Backup, tempdir: "test/tmp"
|
||||||
|
|
||||||
if File.exists?("./config/test.secret.exs") do
|
if File.exists?("./config/test.secret.exs") do
|
||||||
import_config "test.secret.exs"
|
import_config "test.secret.exs"
|
||||||
else
|
else
|
||||||
|
|
|
@ -27,11 +27,3 @@
|
||||||
failed: 4,
|
failed: 4,
|
||||||
manual: 5
|
manual: 5
|
||||||
)
|
)
|
||||||
|
|
||||||
defenum(Pleroma.User.Backup.State,
|
|
||||||
pending: 1,
|
|
||||||
running: 2,
|
|
||||||
complete: 3,
|
|
||||||
failed: 4,
|
|
||||||
invalid: 5
|
|
||||||
)
|
|
||||||
|
|
|
@ -345,37 +345,22 @@ def unsubscribe_url(user, notifications_type) do
|
||||||
Router.Helpers.subscription_url(Endpoint, :unsubscribe, token)
|
Router.Helpers.subscription_url(Endpoint, :unsubscribe, token)
|
||||||
end
|
end
|
||||||
|
|
||||||
def backup_is_ready_email(backup, admin_user_id \\ nil) do
|
def backup_is_ready_email(backup) do
|
||||||
%{user: user} = Pleroma.Repo.preload(backup, :user)
|
%{user: user} = Pleroma.Repo.preload(backup, :user)
|
||||||
|
|
||||||
Gettext.with_locale_or_default user.language do
|
Gettext.with_locale_or_default user.language do
|
||||||
download_url = Pleroma.Web.PleromaAPI.BackupView.download_url(backup)
|
download_url = Pleroma.Web.PleromaAPI.BackupView.download_url(backup)
|
||||||
|
|
||||||
html_body =
|
html_body =
|
||||||
if is_nil(admin_user_id) do
|
Gettext.dpgettext(
|
||||||
Gettext.dpgettext(
|
"static_pages",
|
||||||
"static_pages",
|
"account archive email body",
|
||||||
"account archive email body - self-requested",
|
"""
|
||||||
"""
|
<p>A full backup of your Pleroma account was requested. It's ready for download:</p>
|
||||||
<p>You requested a full backup of your Pleroma account. It's ready for download:</p>
|
<p><a href="%{download_url}">%{download_url}</a></p>
|
||||||
<p><a href="%{download_url}">%{download_url}</a></p>
|
""",
|
||||||
""",
|
download_url: download_url
|
||||||
download_url: download_url
|
)
|
||||||
)
|
|
||||||
else
|
|
||||||
admin = Pleroma.Repo.get(User, admin_user_id)
|
|
||||||
|
|
||||||
Gettext.dpgettext(
|
|
||||||
"static_pages",
|
|
||||||
"account archive email body - admin requested",
|
|
||||||
"""
|
|
||||||
<p>Admin @%{admin_nickname} requested a full backup of your Pleroma account. It's ready for download:</p>
|
|
||||||
<p><a href="%{download_url}">%{download_url}</a></p>
|
|
||||||
""",
|
|
||||||
admin_nickname: admin.nickname,
|
|
||||||
download_url: download_url
|
|
||||||
)
|
|
||||||
end
|
|
||||||
|
|
||||||
new()
|
new()
|
||||||
|> to(recipient(user))
|
|> to(recipient(user))
|
||||||
|
|
|
@ -14,9 +14,10 @@ defmodule Pleroma.User.Backup do
|
||||||
|
|
||||||
alias Pleroma.Activity
|
alias Pleroma.Activity
|
||||||
alias Pleroma.Bookmark
|
alias Pleroma.Bookmark
|
||||||
|
alias Pleroma.Config
|
||||||
alias Pleroma.Repo
|
alias Pleroma.Repo
|
||||||
|
alias Pleroma.Uploaders.Uploader
|
||||||
alias Pleroma.User
|
alias Pleroma.User
|
||||||
alias Pleroma.User.Backup.State
|
|
||||||
alias Pleroma.Web.ActivityPub.ActivityPub
|
alias Pleroma.Web.ActivityPub.ActivityPub
|
||||||
alias Pleroma.Web.ActivityPub.Transmogrifier
|
alias Pleroma.Web.ActivityPub.Transmogrifier
|
||||||
alias Pleroma.Web.ActivityPub.UserView
|
alias Pleroma.Web.ActivityPub.UserView
|
||||||
|
@ -29,71 +30,111 @@ defmodule Pleroma.User.Backup do
|
||||||
field(:file_name, :string)
|
field(:file_name, :string)
|
||||||
field(:file_size, :integer, default: 0)
|
field(:file_size, :integer, default: 0)
|
||||||
field(:processed, :boolean, default: false)
|
field(:processed, :boolean, default: false)
|
||||||
field(:state, State, default: :invalid)
|
field(:tempdir, :string)
|
||||||
field(:processed_number, :integer, default: 0)
|
|
||||||
|
|
||||||
belongs_to(:user, User, type: FlakeId.Ecto.CompatType)
|
belongs_to(:user, User, type: FlakeId.Ecto.CompatType)
|
||||||
|
|
||||||
timestamps()
|
timestamps()
|
||||||
end
|
end
|
||||||
|
|
||||||
@config_impl Application.compile_env(:pleroma, [__MODULE__, :config_impl], Pleroma.Config)
|
@doc """
|
||||||
|
Schedules a job to backup a user if the number of backup requests has not exceeded the limit.
|
||||||
|
|
||||||
def create(user, admin_id \\ nil) do
|
Admins can directly call new/1 and schedule_backup/1 to bypass the limit.
|
||||||
with :ok <- validate_limit(user, admin_id),
|
"""
|
||||||
{:ok, backup} <- user |> new() |> Repo.insert() do
|
@spec user(User.t()) :: {:ok, t()} | {:error, any()}
|
||||||
BackupWorker.process(backup, admin_id)
|
def user(user) do
|
||||||
|
days = Config.get([__MODULE__, :limit_days])
|
||||||
|
|
||||||
|
with true <- permitted?(user),
|
||||||
|
%__MODULE__{} = backup <- new(user),
|
||||||
|
{:ok, inserted_backup} <- Repo.insert(backup),
|
||||||
|
{:ok, %Oban.Job{}} <- schedule_backup(inserted_backup) do
|
||||||
|
{:ok, inserted_backup}
|
||||||
|
else
|
||||||
|
false ->
|
||||||
|
{:error,
|
||||||
|
dngettext(
|
||||||
|
"errors",
|
||||||
|
"Last export was less than a day ago",
|
||||||
|
"Last export was less than %{days} days ago",
|
||||||
|
days,
|
||||||
|
days: days
|
||||||
|
)}
|
||||||
|
|
||||||
|
e ->
|
||||||
|
{:error, e}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@doc "Generates a %Backup{} for a user with a random file name"
|
||||||
|
@spec new(User.t()) :: t()
|
||||||
def new(user) do
|
def new(user) do
|
||||||
rand_str = :crypto.strong_rand_bytes(32) |> Base.url_encode64(padding: false)
|
rand_str = :crypto.strong_rand_bytes(32) |> Base.url_encode64(padding: false)
|
||||||
datetime = Calendar.NaiveDateTime.Format.iso8601_basic(NaiveDateTime.utc_now())
|
datetime = Calendar.NaiveDateTime.Format.iso8601_basic(NaiveDateTime.utc_now())
|
||||||
name = "archive-#{user.nickname}-#{datetime}-#{rand_str}.zip"
|
name = "archive-#{user.nickname}-#{datetime}-#{rand_str}.zip"
|
||||||
|
|
||||||
%__MODULE__{
|
%__MODULE__{
|
||||||
user_id: user.id,
|
|
||||||
content_type: "application/zip",
|
content_type: "application/zip",
|
||||||
file_name: name,
|
file_name: name,
|
||||||
state: :pending
|
tempdir: tempdir(),
|
||||||
|
user: user
|
||||||
}
|
}
|
||||||
end
|
end
|
||||||
|
|
||||||
def delete(backup) do
|
@doc "Schedules the execution of the provided backup"
|
||||||
uploader = Pleroma.Config.get([Pleroma.Upload, :uploader])
|
@spec schedule_backup(t()) :: {:ok, Oban.Job.t()} | {:error, any()}
|
||||||
|
def schedule_backup(backup) do
|
||||||
|
with false <- is_nil(backup.id) do
|
||||||
|
%{"op" => "process", "backup_id" => backup.id}
|
||||||
|
|> BackupWorker.new()
|
||||||
|
|> Oban.insert()
|
||||||
|
else
|
||||||
|
true ->
|
||||||
|
{:error, "Backup is missing id. Please insert it into the Repo first."}
|
||||||
|
|
||||||
|
e ->
|
||||||
|
{:error, e}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
@doc "Deletes the backup archive file and removes the database record"
|
||||||
|
@spec delete_archive(t()) :: {:ok, Ecto.Schema.t()} | {:error, Ecto.Changeset.t()}
|
||||||
|
def delete_archive(backup) do
|
||||||
|
uploader = Config.get([Pleroma.Upload, :uploader])
|
||||||
|
|
||||||
with :ok <- uploader.delete_file(Path.join("backups", backup.file_name)) do
|
with :ok <- uploader.delete_file(Path.join("backups", backup.file_name)) do
|
||||||
Repo.delete(backup)
|
Repo.delete(backup)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
defp validate_limit(_user, admin_id) when is_binary(admin_id), do: :ok
|
@doc "Schedules a job to delete the backup archive"
|
||||||
|
@spec schedule_delete(t()) :: {:ok, Oban.Job.t()} | {:error, any()}
|
||||||
|
def schedule_delete(backup) do
|
||||||
|
days = Config.get([__MODULE__, :purge_after_days])
|
||||||
|
time = 60 * 60 * 24 * days
|
||||||
|
scheduled_at = Calendar.NaiveDateTime.add!(backup.inserted_at, time)
|
||||||
|
|
||||||
defp validate_limit(user, nil) do
|
%{"op" => "delete", "backup_id" => backup.id}
|
||||||
case get_last(user.id) do
|
|> BackupWorker.new(scheduled_at: scheduled_at)
|
||||||
%__MODULE__{inserted_at: inserted_at} ->
|
|> Oban.insert()
|
||||||
days = Pleroma.Config.get([__MODULE__, :limit_days])
|
end
|
||||||
diff = Timex.diff(NaiveDateTime.utc_now(), inserted_at, :days)
|
|
||||||
|
|
||||||
if diff > days do
|
defp permitted?(user) do
|
||||||
:ok
|
with {_, %__MODULE__{inserted_at: inserted_at}} <- {:last, get_last(user)},
|
||||||
else
|
days = Config.get([__MODULE__, :limit_days]),
|
||||||
{:error,
|
diff = Timex.diff(NaiveDateTime.utc_now(), inserted_at, :days),
|
||||||
dngettext(
|
{_, true} <- {:diff, diff > days} do
|
||||||
"errors",
|
true
|
||||||
"Last export was less than a day ago",
|
else
|
||||||
"Last export was less than %{days} days ago",
|
{:last, nil} -> true
|
||||||
days,
|
{:diff, false} -> false
|
||||||
days: days
|
|
||||||
)}
|
|
||||||
end
|
|
||||||
|
|
||||||
nil ->
|
|
||||||
:ok
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def get_last(user_id) do
|
@doc "Returns last backup for the provided user"
|
||||||
|
@spec get_last(User.t()) :: t()
|
||||||
|
def get_last(%User{id: user_id}) do
|
||||||
__MODULE__
|
__MODULE__
|
||||||
|> where(user_id: ^user_id)
|
|> where(user_id: ^user_id)
|
||||||
|> order_by(desc: :id)
|
|> order_by(desc: :id)
|
||||||
|
@ -101,6 +142,8 @@ def get_last(user_id) do
|
||||||
|> Repo.one()
|
|> Repo.one()
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@doc "Lists all existing backups for a user"
|
||||||
|
@spec list(User.t()) :: [Ecto.Schema.t() | term()]
|
||||||
def list(%User{id: user_id}) do
|
def list(%User{id: user_id}) do
|
||||||
__MODULE__
|
__MODULE__
|
||||||
|> where(user_id: ^user_id)
|
|> where(user_id: ^user_id)
|
||||||
|
@ -108,94 +151,37 @@ def list(%User{id: user_id}) do
|
||||||
|> Repo.all()
|
|> Repo.all()
|
||||||
end
|
end
|
||||||
|
|
||||||
def remove_outdated(%__MODULE__{id: latest_id, user_id: user_id}) do
|
@doc "Schedules deletion of all but the the most recent backup"
|
||||||
__MODULE__
|
@spec remove_outdated(User.t()) :: :ok
|
||||||
|> where(user_id: ^user_id)
|
def remove_outdated(user) do
|
||||||
|> where([b], b.id != ^latest_id)
|
with %__MODULE__{} = latest_backup <- get_last(user) do
|
||||||
|> Repo.all()
|
__MODULE__
|
||||||
|> Enum.each(&BackupWorker.delete/1)
|
|> where(user_id: ^user.id)
|
||||||
end
|
|> where([b], b.id != ^latest_backup.id)
|
||||||
|
|> Repo.all()
|
||||||
def get(id), do: Repo.get(__MODULE__, id)
|
|> Enum.each(&schedule_delete/1)
|
||||||
|
else
|
||||||
defp set_state(backup, state, processed_number \\ nil) do
|
_ -> :ok
|
||||||
struct =
|
|
||||||
%{state: state}
|
|
||||||
|> Pleroma.Maps.put_if_present(:processed_number, processed_number)
|
|
||||||
|
|
||||||
backup
|
|
||||||
|> cast(struct, [:state, :processed_number])
|
|
||||||
|> Repo.update()
|
|
||||||
end
|
|
||||||
|
|
||||||
def process(
|
|
||||||
%__MODULE__{} = backup,
|
|
||||||
processor_module \\ __MODULE__.Processor
|
|
||||||
) do
|
|
||||||
set_state(backup, :running, 0)
|
|
||||||
|
|
||||||
current_pid = self()
|
|
||||||
|
|
||||||
task =
|
|
||||||
Task.Supervisor.async_nolink(
|
|
||||||
Pleroma.TaskSupervisor,
|
|
||||||
processor_module,
|
|
||||||
:do_process,
|
|
||||||
[backup, current_pid]
|
|
||||||
)
|
|
||||||
|
|
||||||
wait_backup(backup, backup.processed_number, task)
|
|
||||||
end
|
|
||||||
|
|
||||||
defp wait_backup(backup, current_processed, task) do
|
|
||||||
wait_time = @config_impl.get([__MODULE__, :process_wait_time])
|
|
||||||
|
|
||||||
receive do
|
|
||||||
{:progress, new_processed} ->
|
|
||||||
total_processed = current_processed + new_processed
|
|
||||||
|
|
||||||
set_state(backup, :running, total_processed)
|
|
||||||
wait_backup(backup, total_processed, task)
|
|
||||||
|
|
||||||
{:DOWN, _ref, _proc, _pid, reason} ->
|
|
||||||
backup = get(backup.id)
|
|
||||||
|
|
||||||
if reason != :normal do
|
|
||||||
Logger.error("Backup #{backup.id} process ended abnormally: #{inspect(reason)}")
|
|
||||||
|
|
||||||
{:ok, backup} = set_state(backup, :failed)
|
|
||||||
|
|
||||||
cleanup(backup)
|
|
||||||
|
|
||||||
{:error,
|
|
||||||
%{
|
|
||||||
backup: backup,
|
|
||||||
reason: :exit,
|
|
||||||
details: reason
|
|
||||||
}}
|
|
||||||
else
|
|
||||||
{:ok, backup}
|
|
||||||
end
|
|
||||||
after
|
|
||||||
wait_time ->
|
|
||||||
Logger.error(
|
|
||||||
"Backup #{backup.id} timed out after no response for #{wait_time}ms, terminating"
|
|
||||||
)
|
|
||||||
|
|
||||||
Task.Supervisor.terminate_child(Pleroma.TaskSupervisor, task.pid)
|
|
||||||
|
|
||||||
{:ok, backup} = set_state(backup, :failed)
|
|
||||||
|
|
||||||
cleanup(backup)
|
|
||||||
|
|
||||||
{:error,
|
|
||||||
%{
|
|
||||||
backup: backup,
|
|
||||||
reason: :timeout
|
|
||||||
}}
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def get_by_id(id), do: Repo.get(__MODULE__, id)
|
||||||
|
|
||||||
|
@doc "Generates changeset for %Pleroma.User.Backup{}"
|
||||||
|
@spec changeset(%__MODULE__{}, map()) :: %Ecto.Changeset{}
|
||||||
|
def changeset(backup \\ %__MODULE__{}, attrs) do
|
||||||
|
backup
|
||||||
|
|> cast(attrs, [:content_type, :file_name, :file_size, :processed, :tempdir])
|
||||||
|
end
|
||||||
|
|
||||||
|
@doc "Updates the backup record"
|
||||||
|
@spec update_record(%__MODULE__{}, map()) :: {:ok, %__MODULE__{}} | {:error, %Ecto.Changeset{}}
|
||||||
|
def update_record(%__MODULE__{} = backup, attrs) do
|
||||||
|
backup
|
||||||
|
|> changeset(attrs)
|
||||||
|
|> Repo.update()
|
||||||
|
end
|
||||||
|
|
||||||
@files [
|
@files [
|
||||||
~c"actor.json",
|
~c"actor.json",
|
||||||
~c"outbox.json",
|
~c"outbox.json",
|
||||||
|
@ -204,53 +190,66 @@ defp wait_backup(backup, current_processed, task) do
|
||||||
~c"followers.json",
|
~c"followers.json",
|
||||||
~c"following.json"
|
~c"following.json"
|
||||||
]
|
]
|
||||||
@spec export(Pleroma.User.Backup.t(), pid()) :: {:ok, String.t()} | :error
|
|
||||||
def export(%__MODULE__{} = backup, caller_pid) do
|
|
||||||
backup = Repo.preload(backup, :user)
|
|
||||||
dir = backup_tempdir(backup)
|
|
||||||
|
|
||||||
with :ok <- File.mkdir(dir),
|
@spec run(t()) :: {:ok, t()} | {:error, :failed}
|
||||||
:ok <- actor(dir, backup.user, caller_pid),
|
def run(%__MODULE__{} = backup) do
|
||||||
:ok <- statuses(dir, backup.user, caller_pid),
|
backup = Repo.preload(backup, :user)
|
||||||
:ok <- likes(dir, backup.user, caller_pid),
|
tempfile = Path.join([backup.tempdir, backup.file_name])
|
||||||
:ok <- bookmarks(dir, backup.user, caller_pid),
|
|
||||||
:ok <- followers(dir, backup.user, caller_pid),
|
with {_, :ok} <- {:mkdir, File.mkdir_p(backup.tempdir)},
|
||||||
:ok <- following(dir, backup.user, caller_pid),
|
{_, :ok} <- {:actor, actor(backup.tempdir, backup.user)},
|
||||||
{:ok, zip_path} <- :zip.create(backup.file_name, @files, cwd: dir),
|
{_, :ok} <- {:statuses, statuses(backup.tempdir, backup.user)},
|
||||||
{:ok, _} <- File.rm_rf(dir) do
|
{_, :ok} <- {:likes, likes(backup.tempdir, backup.user)},
|
||||||
{:ok, zip_path}
|
{_, :ok} <- {:bookmarks, bookmarks(backup.tempdir, backup.user)},
|
||||||
|
{_, :ok} <- {:followers, followers(backup.tempdir, backup.user)},
|
||||||
|
{_, :ok} <- {:following, following(backup.tempdir, backup.user)},
|
||||||
|
{_, {:ok, _zip_path}} <-
|
||||||
|
{:zip, :zip.create(to_charlist(tempfile), @files, cwd: to_charlist(backup.tempdir))},
|
||||||
|
{_, {:ok, %File.Stat{size: zip_size}}} <- {:filestat, File.stat(tempfile)},
|
||||||
|
{:ok, updated_backup} <- update_record(backup, %{file_size: zip_size}) do
|
||||||
|
{:ok, updated_backup}
|
||||||
else
|
else
|
||||||
_ -> :error
|
_ ->
|
||||||
|
File.rm_rf(backup.tempdir)
|
||||||
|
{:error, :failed}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def dir(name) do
|
defp tempdir do
|
||||||
dir = Pleroma.Config.get([__MODULE__, :dir]) || System.tmp_dir!()
|
case Config.get([__MODULE__, :tempdir]) do
|
||||||
Path.join(dir, name)
|
nil ->
|
||||||
|
System.tmp_dir!()
|
||||||
|
|
||||||
|
path ->
|
||||||
|
rand = :crypto.strong_rand_bytes(8) |> Base.url_encode64(padding: false)
|
||||||
|
Path.join([path, rand])
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def upload(%__MODULE__{} = backup, zip_path) do
|
@doc "Uploads the completed backup and marks it as processed"
|
||||||
uploader = Pleroma.Config.get([Pleroma.Upload, :uploader])
|
@spec upload(t()) :: {:ok, t()}
|
||||||
|
def upload(%__MODULE__{tempdir: tempdir} = backup) when is_binary(tempdir) do
|
||||||
|
uploader = Config.get([Pleroma.Upload, :uploader])
|
||||||
|
|
||||||
upload = %Pleroma.Upload{
|
upload = %Pleroma.Upload{
|
||||||
name: backup.file_name,
|
name: backup.file_name,
|
||||||
tempfile: zip_path,
|
tempfile: Path.join([tempdir, backup.file_name]),
|
||||||
content_type: backup.content_type,
|
content_type: backup.content_type,
|
||||||
path: Path.join("backups", backup.file_name)
|
path: Path.join("backups", backup.file_name)
|
||||||
}
|
}
|
||||||
|
|
||||||
with {:ok, _} <- Pleroma.Uploaders.Uploader.put_file(uploader, upload),
|
with {:ok, _} <- Uploader.put_file(uploader, upload),
|
||||||
:ok <- File.rm(zip_path) do
|
{:ok, uploaded_backup} <- update_record(backup, %{processed: true}),
|
||||||
{:ok, upload}
|
{:ok, _} <- File.rm_rf(tempdir) do
|
||||||
|
{:ok, uploaded_backup}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
defp actor(dir, user, caller_pid) do
|
defp actor(dir, user) do
|
||||||
with {:ok, json} <-
|
with {:ok, json} <-
|
||||||
UserView.render("user.json", %{user: user})
|
UserView.render("user.json", %{user: user})
|
||||||
|> Map.merge(%{"likes" => "likes.json", "bookmarks" => "bookmarks.json"})
|
|> Map.merge(%{"likes" => "likes.json", "bookmarks" => "bookmarks.json"})
|
||||||
|> Jason.encode() do
|
|> Jason.encode() do
|
||||||
send(caller_pid, {:progress, 1})
|
|
||||||
File.write(Path.join(dir, "actor.json"), json)
|
File.write(Path.join(dir, "actor.json"), json)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -269,22 +268,10 @@ defp write_header(file, name) do
|
||||||
)
|
)
|
||||||
end
|
end
|
||||||
|
|
||||||
defp should_report?(num, chunk_size), do: rem(num, chunk_size) == 0
|
defp write(query, dir, name, fun) do
|
||||||
|
|
||||||
defp backup_tempdir(backup) do
|
|
||||||
name = String.trim_trailing(backup.file_name, ".zip")
|
|
||||||
dir(name)
|
|
||||||
end
|
|
||||||
|
|
||||||
defp cleanup(backup) do
|
|
||||||
dir = backup_tempdir(backup)
|
|
||||||
File.rm_rf(dir)
|
|
||||||
end
|
|
||||||
|
|
||||||
defp write(query, dir, name, fun, caller_pid) do
|
|
||||||
path = Path.join(dir, "#{name}.json")
|
path = Path.join(dir, "#{name}.json")
|
||||||
|
|
||||||
chunk_size = Pleroma.Config.get([__MODULE__, :process_chunk_size])
|
chunk_size = Config.get([__MODULE__, :process_chunk_size])
|
||||||
|
|
||||||
with {:ok, file} <- File.open(path, [:write, :utf8]),
|
with {:ok, file} <- File.open(path, [:write, :utf8]),
|
||||||
:ok <- write_header(file, name) do
|
:ok <- write_header(file, name) do
|
||||||
|
@ -300,10 +287,6 @@ defp write(query, dir, name, fun, caller_pid) do
|
||||||
end),
|
end),
|
||||||
{:ok, str} <- Jason.encode(data),
|
{:ok, str} <- Jason.encode(data),
|
||||||
:ok <- IO.write(file, str <> ",\n") do
|
:ok <- IO.write(file, str <> ",\n") do
|
||||||
if should_report?(acc + 1, chunk_size) do
|
|
||||||
send(caller_pid, {:progress, chunk_size})
|
|
||||||
end
|
|
||||||
|
|
||||||
acc + 1
|
acc + 1
|
||||||
else
|
else
|
||||||
{:error, e} ->
|
{:error, e} ->
|
||||||
|
@ -318,31 +301,29 @@ defp write(query, dir, name, fun, caller_pid) do
|
||||||
end
|
end
|
||||||
end)
|
end)
|
||||||
|
|
||||||
send(caller_pid, {:progress, rem(total, chunk_size)})
|
|
||||||
|
|
||||||
with :ok <- :file.pwrite(file, {:eof, -2}, "\n],\n \"totalItems\": #{total}}") do
|
with :ok <- :file.pwrite(file, {:eof, -2}, "\n],\n \"totalItems\": #{total}}") do
|
||||||
File.close(file)
|
File.close(file)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
defp bookmarks(dir, %{id: user_id} = _user, caller_pid) do
|
defp bookmarks(dir, %{id: user_id} = _user) do
|
||||||
Bookmark
|
Bookmark
|
||||||
|> where(user_id: ^user_id)
|
|> where(user_id: ^user_id)
|
||||||
|> join(:inner, [b], activity in assoc(b, :activity))
|
|> join(:inner, [b], activity in assoc(b, :activity))
|
||||||
|> select([b, a], %{id: b.id, object: fragment("(?)->>'object'", a.data)})
|
|> select([b, a], %{id: b.id, object: fragment("(?)->>'object'", a.data)})
|
||||||
|> write(dir, "bookmarks", fn a -> {:ok, a.object} end, caller_pid)
|
|> write(dir, "bookmarks", fn a -> {:ok, a.object} end)
|
||||||
end
|
end
|
||||||
|
|
||||||
defp likes(dir, user, caller_pid) do
|
defp likes(dir, user) do
|
||||||
user.ap_id
|
user.ap_id
|
||||||
|> Activity.Queries.by_actor()
|
|> Activity.Queries.by_actor()
|
||||||
|> Activity.Queries.by_type("Like")
|
|> Activity.Queries.by_type("Like")
|
||||||
|> select([like], %{id: like.id, object: fragment("(?)->>'object'", like.data)})
|
|> select([like], %{id: like.id, object: fragment("(?)->>'object'", like.data)})
|
||||||
|> write(dir, "likes", fn a -> {:ok, a.object} end, caller_pid)
|
|> write(dir, "likes", fn a -> {:ok, a.object} end)
|
||||||
end
|
end
|
||||||
|
|
||||||
defp statuses(dir, user, caller_pid) do
|
defp statuses(dir, user) do
|
||||||
opts =
|
opts =
|
||||||
%{}
|
%{}
|
||||||
|> Map.put(:type, ["Create", "Announce"])
|
|> Map.put(:type, ["Create", "Announce"])
|
||||||
|
@ -362,52 +343,17 @@ defp statuses(dir, user, caller_pid) do
|
||||||
with {:ok, activity} <- Transmogrifier.prepare_outgoing(a.data) do
|
with {:ok, activity} <- Transmogrifier.prepare_outgoing(a.data) do
|
||||||
{:ok, Map.delete(activity, "@context")}
|
{:ok, Map.delete(activity, "@context")}
|
||||||
end
|
end
|
||||||
end,
|
end
|
||||||
caller_pid
|
|
||||||
)
|
)
|
||||||
end
|
end
|
||||||
|
|
||||||
defp followers(dir, user, caller_pid) do
|
defp followers(dir, user) do
|
||||||
User.get_followers_query(user)
|
User.get_followers_query(user)
|
||||||
|> write(dir, "followers", fn a -> {:ok, a.ap_id} end, caller_pid)
|
|> write(dir, "followers", fn a -> {:ok, a.ap_id} end)
|
||||||
end
|
end
|
||||||
|
|
||||||
defp following(dir, user, caller_pid) do
|
defp following(dir, user) do
|
||||||
User.get_friends_query(user)
|
User.get_friends_query(user)
|
||||||
|> write(dir, "following", fn a -> {:ok, a.ap_id} end, caller_pid)
|
|> write(dir, "following", fn a -> {:ok, a.ap_id} end)
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
defmodule Pleroma.User.Backup.ProcessorAPI do
|
|
||||||
@callback do_process(%Pleroma.User.Backup{}, pid()) ::
|
|
||||||
{:ok, %Pleroma.User.Backup{}} | {:error, any()}
|
|
||||||
end
|
|
||||||
|
|
||||||
defmodule Pleroma.User.Backup.Processor do
|
|
||||||
@behaviour Pleroma.User.Backup.ProcessorAPI
|
|
||||||
|
|
||||||
alias Pleroma.Repo
|
|
||||||
alias Pleroma.User.Backup
|
|
||||||
|
|
||||||
import Ecto.Changeset
|
|
||||||
|
|
||||||
@impl true
|
|
||||||
def do_process(backup, current_pid) do
|
|
||||||
with {:ok, zip_file} <- Backup.export(backup, current_pid),
|
|
||||||
{:ok, %{size: size}} <- File.stat(zip_file),
|
|
||||||
{:ok, _upload} <- Backup.upload(backup, zip_file) do
|
|
||||||
backup
|
|
||||||
|> cast(
|
|
||||||
%{
|
|
||||||
file_size: size,
|
|
||||||
processed: true,
|
|
||||||
state: :complete
|
|
||||||
},
|
|
||||||
[:file_size, :processed, :state]
|
|
||||||
)
|
|
||||||
|> Repo.update()
|
|
||||||
else
|
|
||||||
e -> {:error, e}
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -13,6 +13,7 @@ defmodule Pleroma.Web.AdminAPI.AdminAPIController do
|
||||||
alias Pleroma.ModerationLog
|
alias Pleroma.ModerationLog
|
||||||
alias Pleroma.Stats
|
alias Pleroma.Stats
|
||||||
alias Pleroma.User
|
alias Pleroma.User
|
||||||
|
alias Pleroma.User.Backup
|
||||||
alias Pleroma.Web.ActivityPub.ActivityPub
|
alias Pleroma.Web.ActivityPub.ActivityPub
|
||||||
alias Pleroma.Web.AdminAPI
|
alias Pleroma.Web.AdminAPI
|
||||||
alias Pleroma.Web.AdminAPI.AccountView
|
alias Pleroma.Web.AdminAPI.AccountView
|
||||||
|
@ -429,7 +430,9 @@ def stats(conn, params) do
|
||||||
|
|
||||||
def create_backup(%{assigns: %{user: admin}} = conn, %{"nickname" => nickname}) do
|
def create_backup(%{assigns: %{user: admin}} = conn, %{"nickname" => nickname}) do
|
||||||
with %User{} = user <- User.get_by_nickname(nickname),
|
with %User{} = user <- User.get_by_nickname(nickname),
|
||||||
{:ok, _} <- Pleroma.User.Backup.create(user, admin.id) do
|
%Backup{} = backup <- Backup.new(user),
|
||||||
|
{:ok, inserted_backup} <- Pleroma.Repo.insert(backup),
|
||||||
|
{:ok, %Oban.Job{}} <- Backup.schedule_backup(inserted_backup) do
|
||||||
ModerationLog.insert_log(%{actor: admin, subject: user, action: "create_backup"})
|
ModerationLog.insert_log(%{actor: admin, subject: user, action: "create_backup"})
|
||||||
|
|
||||||
json(conn, "")
|
json(conn, "")
|
||||||
|
|
|
@ -65,12 +65,7 @@ defp backup do
|
||||||
file_name: %Schema{type: :string},
|
file_name: %Schema{type: :string},
|
||||||
file_size: %Schema{type: :integer},
|
file_size: %Schema{type: :integer},
|
||||||
processed: %Schema{type: :boolean, description: "whether this backup has succeeded"},
|
processed: %Schema{type: :boolean, description: "whether this backup has succeeded"},
|
||||||
state: %Schema{
|
tempdir: %Schema{type: :string}
|
||||||
type: :string,
|
|
||||||
description: "the state of the backup",
|
|
||||||
enum: ["pending", "running", "complete", "failed"]
|
|
||||||
},
|
|
||||||
processed_number: %Schema{type: :integer, description: "the number of records processed"}
|
|
||||||
},
|
},
|
||||||
example: %{
|
example: %{
|
||||||
"content_type" => "application/zip",
|
"content_type" => "application/zip",
|
||||||
|
@ -79,8 +74,7 @@ defp backup do
|
||||||
"file_size" => 4105,
|
"file_size" => 4105,
|
||||||
"inserted_at" => "2020-09-08T16:42:07.000Z",
|
"inserted_at" => "2020-09-08T16:42:07.000Z",
|
||||||
"processed" => true,
|
"processed" => true,
|
||||||
"state" => "complete",
|
"tempdir" => "/tmp/PZIMw40vmpM"
|
||||||
"processed_number" => 20
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
end
|
end
|
||||||
|
|
|
@ -20,7 +20,7 @@ def index(%{assigns: %{user: user}} = conn, _params) do
|
||||||
end
|
end
|
||||||
|
|
||||||
def create(%{assigns: %{user: user}} = conn, _params) do
|
def create(%{assigns: %{user: user}} = conn, _params) do
|
||||||
with {:ok, _} <- Backup.create(user) do
|
with {:ok, _} <- Backup.user(user) do
|
||||||
backups = Backup.list(user)
|
backups = Backup.list(user)
|
||||||
render(conn, "index.json", backups: backups)
|
render(conn, "index.json", backups: backups)
|
||||||
end
|
end
|
||||||
|
|
|
@ -9,22 +9,12 @@ defmodule Pleroma.Web.PleromaAPI.BackupView do
|
||||||
alias Pleroma.Web.CommonAPI.Utils
|
alias Pleroma.Web.CommonAPI.Utils
|
||||||
|
|
||||||
def render("show.json", %{backup: %Backup{} = backup}) do
|
def render("show.json", %{backup: %Backup{} = backup}) do
|
||||||
# To deal with records before the migration
|
|
||||||
state =
|
|
||||||
if backup.state == :invalid do
|
|
||||||
if backup.processed, do: :complete, else: :failed
|
|
||||||
else
|
|
||||||
backup.state
|
|
||||||
end
|
|
||||||
|
|
||||||
%{
|
%{
|
||||||
id: backup.id,
|
id: backup.id,
|
||||||
content_type: backup.content_type,
|
content_type: backup.content_type,
|
||||||
url: download_url(backup),
|
url: download_url(backup),
|
||||||
file_size: backup.file_size,
|
file_size: backup.file_size,
|
||||||
processed: backup.processed,
|
processed: backup.processed,
|
||||||
state: to_string(state),
|
|
||||||
processed_number: backup.processed_number,
|
|
||||||
inserted_at: Utils.to_masto_date(backup.inserted_at)
|
inserted_at: Utils.to_masto_date(backup.inserted_at)
|
||||||
}
|
}
|
||||||
end
|
end
|
||||||
|
|
|
@ -8,44 +8,25 @@ defmodule Pleroma.Workers.BackupWorker do
|
||||||
alias Oban.Job
|
alias Oban.Job
|
||||||
alias Pleroma.User.Backup
|
alias Pleroma.User.Backup
|
||||||
|
|
||||||
def process(backup, admin_user_id \\ nil) do
|
|
||||||
%{"op" => "process", "backup_id" => backup.id, "admin_user_id" => admin_user_id}
|
|
||||||
|> new()
|
|
||||||
|> Oban.insert()
|
|
||||||
end
|
|
||||||
|
|
||||||
def schedule_deletion(backup) do
|
|
||||||
days = Pleroma.Config.get([Backup, :purge_after_days])
|
|
||||||
time = 60 * 60 * 24 * days
|
|
||||||
scheduled_at = Calendar.NaiveDateTime.add!(backup.inserted_at, time)
|
|
||||||
|
|
||||||
%{"op" => "delete", "backup_id" => backup.id}
|
|
||||||
|> new(scheduled_at: scheduled_at)
|
|
||||||
|> Oban.insert()
|
|
||||||
end
|
|
||||||
|
|
||||||
def delete(backup) do
|
|
||||||
%{"op" => "delete", "backup_id" => backup.id}
|
|
||||||
|> new()
|
|
||||||
|> Oban.insert()
|
|
||||||
end
|
|
||||||
|
|
||||||
@impl Oban.Worker
|
@impl Oban.Worker
|
||||||
def perform(%Job{
|
def perform(%Job{
|
||||||
args: %{"op" => "process", "backup_id" => backup_id, "admin_user_id" => admin_user_id}
|
args: %{"op" => "process", "backup_id" => backup_id}
|
||||||
}) do
|
}) do
|
||||||
with {:ok, %Backup{} = backup} <-
|
with {_, %Backup{} = backup} <- {:get, Backup.get_by_id(backup_id)},
|
||||||
backup_id |> Backup.get() |> Backup.process(),
|
{_, {:ok, updated_backup}} <- {:run, Backup.run(backup)},
|
||||||
{:ok, _job} <- schedule_deletion(backup),
|
{_, {:ok, uploaded_backup}} <- {:upload, Backup.upload(updated_backup)},
|
||||||
:ok <- Backup.remove_outdated(backup),
|
{_, {:ok, _job}} <- {:delete, Backup.schedule_delete(uploaded_backup)},
|
||||||
:ok <- maybe_deliver_email(backup, admin_user_id) do
|
{_, :ok} <- {:outdated, Backup.remove_outdated(uploaded_backup.user)},
|
||||||
{:ok, backup}
|
{_, :ok} <- {:email, maybe_deliver_email(uploaded_backup)} do
|
||||||
|
{:ok, uploaded_backup}
|
||||||
|
else
|
||||||
|
e -> {:error, e}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def perform(%Job{args: %{"op" => "delete", "backup_id" => backup_id}}) do
|
def perform(%Job{args: %{"op" => "delete", "backup_id" => backup_id}}) do
|
||||||
case Backup.get(backup_id) do
|
case Backup.get_by_id(backup_id) do
|
||||||
%Backup{} = backup -> Backup.delete(backup)
|
%Backup{} = backup -> Backup.delete_archive(backup)
|
||||||
nil -> :ok
|
nil -> :ok
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -57,13 +38,13 @@ defp has_email?(user) do
|
||||||
not is_nil(user.email) and user.email != ""
|
not is_nil(user.email) and user.email != ""
|
||||||
end
|
end
|
||||||
|
|
||||||
defp maybe_deliver_email(backup, admin_user_id) do
|
defp maybe_deliver_email(backup) do
|
||||||
has_mailer = Pleroma.Config.get([Pleroma.Emails.Mailer, :enabled])
|
has_mailer = Pleroma.Config.get([Pleroma.Emails.Mailer, :enabled])
|
||||||
backup = backup |> Pleroma.Repo.preload(:user)
|
backup = backup |> Pleroma.Repo.preload(:user)
|
||||||
|
|
||||||
if has_email?(backup.user) and has_mailer do
|
if has_email?(backup.user) and has_mailer do
|
||||||
backup
|
backup
|
||||||
|> Pleroma.Emails.UserEmail.backup_is_ready_email(admin_user_id)
|
|> Pleroma.Emails.UserEmail.backup_is_ready_email()
|
||||||
|> Pleroma.Emails.Mailer.deliver()
|
|> Pleroma.Emails.Mailer.deliver()
|
||||||
|
|
||||||
:ok
|
:ok
|
||||||
|
|
19
priv/repo/migrations/20240622175346_backup_refactor.exs
Normal file
19
priv/repo/migrations/20240622175346_backup_refactor.exs
Normal file
|
@ -0,0 +1,19 @@
|
||||||
|
defmodule Pleroma.Repo.Migrations.BackupRefactor do
|
||||||
|
use Ecto.Migration
|
||||||
|
|
||||||
|
def up do
|
||||||
|
alter table("backups") do
|
||||||
|
remove(:state)
|
||||||
|
remove(:processed_number)
|
||||||
|
add(:tempdir, :string)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def down do
|
||||||
|
alter table("backups") do
|
||||||
|
add(:state, :integer, default: 5)
|
||||||
|
add(:processed_number, :integer, default: 0)
|
||||||
|
remove(:tempdir)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
Loading…
Reference in a new issue