From 88c1c76d3eca3412d1e02008f1b8d96fe8fe0b96 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A9l=C3=A8ne?= Date: Mon, 15 Aug 2022 01:15:23 +0200 Subject: [PATCH] Migrations: delete contexts with BaseMigrator Due to the lengthiness of this task, the migration has been adapted into a BaseMigrator migration, running in the background instead. --- config/config.exs | 2 + config/description.exs | 21 +++ lib/pleroma/application.ex | 3 +- lib/pleroma/data_migration.ex | 1 + .../context_objects_deletion_migrator.ex | 139 ++++++++++++++++++ ..._data_migration_delete_context_objects.exs | 13 +- 6 files changed, 173 insertions(+), 6 deletions(-) create mode 100644 lib/pleroma/migrators/context_objects_deletion_migrator.ex diff --git a/config/config.exs b/config/config.exs index 0fc9598077..eadc255ccd 100644 --- a/config/config.exs +++ b/config/config.exs @@ -673,6 +673,8 @@ config :pleroma, :populate_hashtags_table, fault_rate_allowance: 0.01 +config :pleroma, :delete_context_objects, fault_rate_allowance: 0.01 + config :pleroma, :env, Mix.env() config :http_signatures, diff --git a/config/description.exs b/config/description.exs index c6c6b1b5d2..c28447b371 100644 --- a/config/description.exs +++ b/config/description.exs @@ -495,6 +495,27 @@ } ] }, + %{ + group: :pleroma, + key: :delete_context_objects, + type: :group, + description: "`delete_context_objects` background migration settings", + children: [ + %{ + key: :fault_rate_allowance, + type: :float, + description: + "Max accepted rate of objects that failed in the migration. Any value from 0.0 which tolerates no errors to 1.0 which will enable the feature even if context object deletion failed for all records.", + suggestions: [0.01] + }, + %{ + key: :sleep_interval_ms, + type: :integer, + description: + "Sleep interval between each chunk of processed records in order to decrease the load on the system (defaults to 0 and should be keep default on most instances)." + } + ] + }, %{ group: :pleroma, key: :instance, diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index d808bc732b..c546713caf 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -238,7 +238,8 @@ defp dont_run_in_test(_) do defp background_migrators do [ - Pleroma.Migrators.HashtagsTableMigrator + Pleroma.Migrators.HashtagsTableMigrator, + Pleroma.Migrators.ContextObjectsDeletionMigrator ] end diff --git a/lib/pleroma/data_migration.ex b/lib/pleroma/data_migration.ex index 59d891d8db..8451678fc4 100644 --- a/lib/pleroma/data_migration.ex +++ b/lib/pleroma/data_migration.ex @@ -42,4 +42,5 @@ def get_by_name(name) do end def populate_hashtags_table, do: get_by_name("populate_hashtags_table") + def delete_context_objects, do: get_by_name("delete_context_objects") end diff --git a/lib/pleroma/migrators/context_objects_deletion_migrator.ex b/lib/pleroma/migrators/context_objects_deletion_migrator.ex new file mode 100644 index 0000000000..fb224795a4 --- /dev/null +++ b/lib/pleroma/migrators/context_objects_deletion_migrator.ex @@ -0,0 +1,139 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2022 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Migrators.ContextObjectsDeletionMigrator do + defmodule State do + use Pleroma.Migrators.Support.BaseMigratorState + + @impl Pleroma.Migrators.Support.BaseMigratorState + defdelegate data_migration(), to: Pleroma.DataMigration, as: :delete_context_objects + end + + use Pleroma.Migrators.Support.BaseMigrator + + alias Pleroma.Migrators.Support.BaseMigrator + alias Pleroma.Object + + @doc "This migration removes objects created exclusively for contexts, containing only an `id` field." + + @impl BaseMigrator + def feature_config_path, do: [:features, :delete_context_objects] + + @impl BaseMigrator + def fault_rate_allowance, do: Config.get([:delete_context_objects, :fault_rate_allowance], 0) + + @impl BaseMigrator + def perform do + data_migration_id = data_migration_id() + max_processed_id = get_stat(:max_processed_id, 0) + + Logger.info("Deleting context objects from `objects` (from oid: #{max_processed_id})...") + + query() + |> where([object], object.id > ^max_processed_id) + |> Repo.chunk_stream(100, :batches, timeout: :infinity) + |> Stream.each(fn objects -> + object_ids = Enum.map(objects, & &1.id) + + results = Enum.map(object_ids, &delete_context_object(&1)) + + failed_ids = + results + |> Enum.filter(&(elem(&1, 0) == :error)) + |> Enum.map(&elem(&1, 1)) + + chunk_affected_count = + results + |> Enum.filter(&(elem(&1, 0) == :ok)) + |> length() + + for failed_id <- failed_ids do + _ = + Repo.query( + "INSERT INTO data_migration_failed_ids(data_migration_id, record_id) " <> + "VALUES ($1, $2) ON CONFLICT DO NOTHING;", + [data_migration_id, failed_id] + ) + end + + _ = + Repo.query( + "DELETE FROM data_migration_failed_ids " <> + "WHERE data_migration_id = $1 AND record_id = ANY($2)", + [data_migration_id, object_ids -- failed_ids] + ) + + max_object_id = Enum.at(object_ids, -1) + + put_stat(:max_processed_id, max_object_id) + increment_stat(:iteration_processed_count, length(object_ids)) + increment_stat(:processed_count, length(object_ids)) + increment_stat(:failed_count, length(failed_ids)) + increment_stat(:affected_count, chunk_affected_count) + put_stat(:records_per_second, records_per_second()) + persist_state() + + # A quick and dirty approach to controlling the load this background migration imposes + sleep_interval = Config.get([:delete_context_objects, :sleep_interval_ms], 0) + Process.sleep(sleep_interval) + end) + |> Stream.run() + end + + @impl BaseMigrator + def query do + # Context objects have no activity type, and only one field, `id`. + # Only those context objects are without types. + from( + object in Object, + where: fragment("(?)->'type' IS NULL", object.data), + select: %{ + id: object.id + } + ) + end + + @spec delete_context_object(integer()) :: {:ok | :error, integer()} + defp delete_context_object(id) do + result = + %Object{id: id} + |> Repo.delete() + |> elem(0) + + {result, id} + end + + @impl BaseMigrator + def retry_failed do + data_migration_id = data_migration_id() + + failed_objects_query() + |> Repo.chunk_stream(100, :one) + |> Stream.each(fn object -> + with {res, _} when res != :error <- delete_context_object(object.id) do + _ = + Repo.query( + "DELETE FROM data_migration_failed_ids " <> + "WHERE data_migration_id = $1 AND record_id = $2", + [data_migration_id, object.id] + ) + end + end) + |> Stream.run() + + put_stat(:failed_count, failures_count()) + persist_state() + + force_continue() + end + + defp failed_objects_query do + from(o in Object) + |> join(:inner, [o], dmf in fragment("SELECT * FROM data_migration_failed_ids"), + on: dmf.record_id == o.id + ) + |> where([_o, dmf], dmf.data_migration_id == ^data_migration_id()) + |> order_by([o], asc: o.id) + end +end diff --git a/priv/repo/migrations/20220807125023_data_migration_delete_context_objects.exs b/priv/repo/migrations/20220807125023_data_migration_delete_context_objects.exs index debb474b2b..84365dbe33 100644 --- a/priv/repo/migrations/20220807125023_data_migration_delete_context_objects.exs +++ b/priv/repo/migrations/20220807125023_data_migration_delete_context_objects.exs @@ -3,13 +3,16 @@ defmodule Pleroma.Repo.Migrations.DataMigrationDeleteContextObjects do require Logger - @doc "This migration removes objects created exclusively for contexts, containing only an `id` field." + def up do + dt = NaiveDateTime.utc_now() - def change do - Logger.warn( - "This migration can take a very long time to execute, depending on your database size. Please be patient, Pleroma-tan is doing her best!\n" + execute( + "INSERT INTO data_migrations(name, inserted_at, updated_at) " <> + "VALUES ('delete_context_objects', '#{dt}', '#{dt}') ON CONFLICT DO NOTHING;" ) + end - execute("DELETE FROM objects WHERE (data->>'type') IS NULL;") + def down do + execute("DELETE FROM data_migrations WHERE name = 'delete_context_objects';") end end