From a9aa810d3dadaac5a40d18f56ab41b6276206db1 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Thu, 22 Aug 2024 12:49:32 -0400 Subject: [PATCH] Change imports to generate an Oban job per each task --- changelog.d/user-imports.fix | 2 +- lib/pleroma/user/import.ex | 144 ++++++++++-------- .../controllers/user_import_controller.ex | 8 +- lib/pleroma/workers/background_worker.ex | 6 +- test/pleroma/user/import_test.exs | 27 ++-- .../user_import_controller_test.exs | 92 +++++++---- 6 files changed, 170 insertions(+), 109 deletions(-) diff --git a/changelog.d/user-imports.fix b/changelog.d/user-imports.fix index 9f39dfedad..0076c73d77 100644 --- a/changelog.d/user-imports.fix +++ b/changelog.d/user-imports.fix @@ -1 +1 @@ -Imports of blocks, mutes, and following would retry until Oban runs out of attempts due to incorrect return value being considered an error. +Imports of blocks, mutes, and follows would retry repeatedly due to incorrect error handling and all work executed in a single job diff --git a/lib/pleroma/user/import.ex b/lib/pleroma/user/import.ex index bee5862343..400e621535 100644 --- a/lib/pleroma/user/import.ex +++ b/lib/pleroma/user/import.ex @@ -5,6 +5,7 @@ defmodule Pleroma.User.Import do use Ecto.Schema + alias Pleroma.Repo alias Pleroma.User alias Pleroma.Web.CommonAPI alias Pleroma.Workers.BackgroundWorker @@ -12,80 +13,103 @@ defmodule Pleroma.User.Import do require Logger @spec perform(atom(), User.t(), list()) :: :ok | list() | {:error, any()} - def perform(:mutes_import, %User{} = user, [_ | _] = identifiers) do - Enum.map( - identifiers, - fn identifier -> - with {:ok, %User{} = muted_user} <- User.get_or_fetch(identifier), - {:ok, _} <- User.mute(user, muted_user) do - {:ok, muted_user} - else - error -> handle_error(:mutes_import, identifier, error) - end - end - ) + def perform(:mute_import, %User{} = user, actor) do + with {:ok, %User{} = muted_user} <- User.get_or_fetch(actor), + {_, false} <- {:existing_mute, User.mutes_user?(user, muted_user)}, + {:ok, _} <- User.mute(user, muted_user), + # User.mute/2 returns a FollowingRelationship not a %User{} like we get + # from CommonAPI.block/2 or CommonAPI.follow/2, so we fetch again to + # return the target actor for consistency + {:ok, muted_user} <- User.get_or_fetch(actor) do + {:ok, muted_user} + else + {:existing_mute, true} -> :ok + error -> handle_error(:mutes_import, actor, error) + end end - def perform(:blocks_import, %User{} = blocker, [_ | _] = identifiers) do - Enum.map( - identifiers, - fn identifier -> - with {:ok, %User{} = blocked} <- User.get_or_fetch(identifier), - {:ok, _block} <- CommonAPI.block(blocked, blocker) do - {:ok, blocked} - else - error -> handle_error(:blocks_import, identifier, error) - end - end - ) + def perform(:block_import, %User{} = user, actor) do + with {:ok, %User{} = blocked} <- User.get_or_fetch(actor), + {_, false} <- {:existing_block, User.blocks_user?(user, blocked)}, + {:ok, _block} <- CommonAPI.block(blocked, user) do + {:ok, blocked} + else + {:existing_block, true} -> :ok + error -> handle_error(:blocks_import, actor, error) + end end - def perform(:follow_import, %User{} = follower, [_ | _] = identifiers) do - Enum.map( - identifiers, - fn identifier -> - with {:ok, %User{} = followed} <- User.get_or_fetch(identifier), - {:ok, follower, followed} <- User.maybe_direct_follow(follower, followed), - {:ok, _, _, _} <- CommonAPI.follow(followed, follower) do - {:ok, followed} - else - error -> handle_error(:follow_import, identifier, error) - end - end - ) + def perform(:follow_import, %User{} = user, actor) do + with {:ok, %User{} = followed} <- User.get_or_fetch(actor), + {_, false} <- {:existing_follow, User.following?(user, followed)}, + {:ok, user, followed} <- User.maybe_direct_follow(user, followed), + {:ok, _, _, _} <- CommonAPI.follow(followed, user) do + {:ok, followed} + else + {:existing_follow, true} -> :ok + error -> handle_error(:follow_import, actor, error) + end end - def perform(_, _, _), do: :ok - defp handle_error(op, user_id, error) do Logger.debug("#{op} failed for #{user_id} with: #{inspect(error)}") error end - def blocks_import(%User{} = blocker, [_ | _] = identifiers) do - BackgroundWorker.new(%{ - "op" => "blocks_import", - "user_id" => blocker.id, - "identifiers" => identifiers - }) - |> Oban.insert() + def blocks_import(%User{} = user, [_ | _] = actors) do + jobs = + Repo.checkout(fn -> + Enum.reduce(actors, [], fn actor, acc -> + {:ok, job} = + BackgroundWorker.new(%{ + "op" => "block_import", + "user_id" => user.id, + "actor" => actor + }) + |> Oban.insert() + + acc ++ [job] + end) + end) + + {:ok, jobs} end - def follow_import(%User{} = follower, [_ | _] = identifiers) do - BackgroundWorker.new(%{ - "op" => "follow_import", - "user_id" => follower.id, - "identifiers" => identifiers - }) - |> Oban.insert() + def follows_import(%User{} = user, [_ | _] = actors) do + jobs = + Repo.checkout(fn -> + Enum.reduce(actors, [], fn actor, acc -> + {:ok, job} = + BackgroundWorker.new(%{ + "op" => "follow_import", + "user_id" => user.id, + "actor" => actor + }) + |> Oban.insert() + + acc ++ [job] + end) + end) + + {:ok, jobs} end - def mutes_import(%User{} = user, [_ | _] = identifiers) do - BackgroundWorker.new(%{ - "op" => "mutes_import", - "user_id" => user.id, - "identifiers" => identifiers - }) - |> Oban.insert() + def mutes_import(%User{} = user, [_ | _] = actors) do + jobs = + Repo.checkout(fn -> + Enum.reduce(actors, [], fn actor, acc -> + {:ok, job} = + BackgroundWorker.new(%{ + "op" => "mute_import", + "user_id" => user.id, + "actor" => actor + }) + |> Oban.insert() + + acc ++ [job] + end) + end) + + {:ok, jobs} end end diff --git a/lib/pleroma/web/pleroma_api/controllers/user_import_controller.ex b/lib/pleroma/web/pleroma_api/controllers/user_import_controller.ex index 96466f1921..d65c30dab5 100644 --- a/lib/pleroma/web/pleroma_api/controllers/user_import_controller.ex +++ b/lib/pleroma/web/pleroma_api/controllers/user_import_controller.ex @@ -38,8 +38,8 @@ def do_follow(%{assigns: %{user: follower}} = conn, list) do |> Enum.map(&(&1 |> String.trim() |> String.trim_leading("@"))) |> Enum.reject(&(&1 == "")) - User.Import.follow_import(follower, identifiers) - json(conn, "job started") + User.Import.follows_import(follower, identifiers) + json(conn, "jobs started") end def blocks( @@ -55,7 +55,7 @@ def blocks(%{private: %{open_api_spex: %{body_params: %{list: list}}}} = conn, _ defp do_block(%{assigns: %{user: blocker}} = conn, list) do User.Import.blocks_import(blocker, prepare_user_identifiers(list)) - json(conn, "job started") + json(conn, "jobs started") end def mutes( @@ -71,7 +71,7 @@ def mutes(%{private: %{open_api_spex: %{body_params: %{list: list}}}} = conn, _) defp do_mute(%{assigns: %{user: user}} = conn, list) do User.Import.mutes_import(user, prepare_user_identifiers(list)) - json(conn, "job started") + json(conn, "jobs started") end defp prepare_user_identifiers(list) do diff --git a/lib/pleroma/workers/background_worker.ex b/lib/pleroma/workers/background_worker.ex index 60da2d5ca7..4737c6ea24 100644 --- a/lib/pleroma/workers/background_worker.ex +++ b/lib/pleroma/workers/background_worker.ex @@ -19,10 +19,10 @@ def perform(%Job{args: %{"op" => "force_password_reset", "user_id" => user_id}}) User.perform(:force_password_reset, user) end - def perform(%Job{args: %{"op" => op, "user_id" => user_id, "identifiers" => identifiers}}) - when op in ["blocks_import", "follow_import", "mutes_import"] do + def perform(%Job{args: %{"op" => op, "user_id" => user_id, "actor" => actor}}) + when op in ["block_import", "follow_import", "mute_import"] do user = User.get_cached_by_id(user_id) - {:ok, User.Import.perform(String.to_existing_atom(op), user, identifiers)} + User.Import.perform(String.to_existing_atom(op), user, actor) end def perform(%Job{ diff --git a/test/pleroma/user/import_test.exs b/test/pleroma/user/import_test.exs index 54c521698f..1d6469a4f6 100644 --- a/test/pleroma/user/import_test.exs +++ b/test/pleroma/user/import_test.exs @@ -25,11 +25,12 @@ test "it imports user followings from list" do user3.nickname ] - {:ok, job} = User.Import.follow_import(user1, identifiers) + {:ok, jobs} = User.Import.follows_import(user1, identifiers) + + for job <- jobs do + assert {:ok, %User{}} = ObanHelpers.perform(job) + end - assert {:ok, result} = ObanHelpers.perform(job) - assert is_list(result) - assert result == [{:ok, refresh_record(user2)}, {:ok, refresh_record(user3)}] assert User.following?(user1, user2) assert User.following?(user1, user3) end @@ -44,11 +45,12 @@ test "it imports user blocks from list" do user3.nickname ] - {:ok, job} = User.Import.blocks_import(user1, identifiers) + {:ok, jobs} = User.Import.blocks_import(user1, identifiers) + + for job <- jobs do + assert {:ok, %User{}} = ObanHelpers.perform(job) + end - assert {:ok, result} = ObanHelpers.perform(job) - assert is_list(result) - assert result == [{:ok, user2}, {:ok, user3}] assert User.blocks?(user1, user2) assert User.blocks?(user1, user3) end @@ -63,11 +65,12 @@ test "it imports user mutes from list" do user3.nickname ] - {:ok, job} = User.Import.mutes_import(user1, identifiers) + {:ok, jobs} = User.Import.mutes_import(user1, identifiers) + + for job <- jobs do + assert {:ok, %User{}} = ObanHelpers.perform(job) + end - assert {:ok, result} = ObanHelpers.perform(job) - assert is_list(result) - assert result == [{:ok, user2}, {:ok, user3}] assert User.mutes?(user1, user2) assert User.mutes?(user1, user3) end diff --git a/test/pleroma/web/pleroma_api/controllers/user_import_controller_test.exs b/test/pleroma/web/pleroma_api/controllers/user_import_controller_test.exs index 52a62e4166..efdc743e3e 100644 --- a/test/pleroma/web/pleroma_api/controllers/user_import_controller_test.exs +++ b/test/pleroma/web/pleroma_api/controllers/user_import_controller_test.exs @@ -22,7 +22,7 @@ defmodule Pleroma.Web.PleromaAPI.UserImportControllerTest do test "it returns HTTP 200", %{conn: conn} do user2 = insert(:user) - assert "job started" == + assert "jobs started" == conn |> put_req_header("content-type", "application/json") |> post("/api/pleroma/follow_import", %{"list" => "#{user2.ap_id}"}) @@ -38,7 +38,7 @@ test "it imports follow lists from file", %{conn: conn} do "Account address,Show boosts\n#{user2.ap_id},true" end} ]) do - assert "job started" == + assert "jobs started" == conn |> put_req_header("content-type", "application/json") |> post("/api/pleroma/follow_import", %{ @@ -46,9 +46,9 @@ test "it imports follow lists from file", %{conn: conn} do }) |> json_response_and_validate_schema(200) - assert [{:ok, job_result}] = ObanHelpers.perform_all() - assert job_result == [refresh_record(user2)] - assert [%Pleroma.User{follower_count: 1}] = job_result + assert [{:ok, updated_user}] = ObanHelpers.perform_all() + assert updated_user.id == user2.id + assert updated_user.follower_count == 1 end end @@ -63,7 +63,7 @@ test "it imports new-style mastodon follow lists", %{conn: conn} do }) |> json_response_and_validate_schema(200) - assert response == "job started" + assert response == "jobs started" end test "requires 'follow' or 'write:follows' permissions" do @@ -102,14 +102,20 @@ test "it imports follows with different nickname variations", %{conn: conn} do ] |> Enum.join("\n") - assert "job started" == + assert "jobs started" == conn |> put_req_header("content-type", "application/json") |> post("/api/pleroma/follow_import", %{"list" => identifiers}) |> json_response_and_validate_schema(200) - assert [{:ok, job_result}] = ObanHelpers.perform_all() - assert job_result == Enum.map(users, &refresh_record/1) + results = ObanHelpers.perform_all() + + returned_users = + for {_, returned_user} <- results do + returned_user + end + + assert returned_users == Enum.map(users, &refresh_record/1) end end @@ -120,7 +126,7 @@ test "it imports follows with different nickname variations", %{conn: conn} do test "it returns HTTP 200", %{conn: conn} do user2 = insert(:user) - assert "job started" == + assert "jobs started" == conn |> put_req_header("content-type", "application/json") |> post("/api/pleroma/blocks_import", %{"list" => "#{user2.ap_id}"}) @@ -133,7 +139,7 @@ test "it imports blocks users from file", %{conn: conn} do with_mocks([ {File, [], read!: fn "blocks_list.txt" -> "#{user2.ap_id} #{user3.ap_id}" end} ]) do - assert "job started" == + assert "jobs started" == conn |> put_req_header("content-type", "application/json") |> post("/api/pleroma/blocks_import", %{ @@ -141,8 +147,14 @@ test "it imports blocks users from file", %{conn: conn} do }) |> json_response_and_validate_schema(200) - assert [{:ok, job_result}] = ObanHelpers.perform_all() - assert job_result == users + results = ObanHelpers.perform_all() + + returned_users = + for {_, returned_user} <- results do + returned_user + end + + assert returned_users == users end end @@ -159,14 +171,25 @@ test "it imports blocks with different nickname variations", %{conn: conn} do ] |> Enum.join(" ") - assert "job started" == + assert "jobs started" == conn |> put_req_header("content-type", "application/json") |> post("/api/pleroma/blocks_import", %{"list" => identifiers}) |> json_response_and_validate_schema(200) - assert [{:ok, job_result}] = ObanHelpers.perform_all() - assert job_result == users + results = ObanHelpers.perform_all() + + returned_user_ids = + for {_, user} <- results do + user.id + end + + original_user_ids = + for user <- users do + user.id + end + + assert match?(^original_user_ids, returned_user_ids) end end @@ -177,24 +200,25 @@ test "it imports blocks with different nickname variations", %{conn: conn} do test "it returns HTTP 200", %{user: user, conn: conn} do user2 = insert(:user) - assert "job started" == + assert "jobs started" == conn |> put_req_header("content-type", "application/json") |> post("/api/pleroma/mutes_import", %{"list" => "#{user2.ap_id}"}) |> json_response_and_validate_schema(200) - assert [{:ok, job_result}] = ObanHelpers.perform_all() - assert job_result == [user2] + [{:ok, result_user}] = ObanHelpers.perform_all() + + assert result_user == refresh_record(user2) assert Pleroma.User.mutes?(user, user2) end test "it imports mutes users from file", %{user: user, conn: conn} do - users = [user2, user3] = insert_list(2, :user) + [user2, user3] = insert_list(2, :user) with_mocks([ {File, [], read!: fn "mutes_list.txt" -> "#{user2.ap_id} #{user3.ap_id}" end} ]) do - assert "job started" == + assert "jobs started" == conn |> put_req_header("content-type", "application/json") |> post("/api/pleroma/mutes_import", %{ @@ -202,14 +226,19 @@ test "it imports mutes users from file", %{user: user, conn: conn} do }) |> json_response_and_validate_schema(200) - assert [{:ok, job_result}] = ObanHelpers.perform_all() - assert job_result == users - assert Enum.all?(users, &Pleroma.User.mutes?(user, &1)) + results = ObanHelpers.perform_all() + + returned_users = + for {_, returned_user} <- results do + returned_user + end + + assert Enum.all?(returned_users, &Pleroma.User.mutes?(user, &1)) end end test "it imports mutes with different nickname variations", %{user: user, conn: conn} do - users = [user2, user3, user4, user5, user6] = insert_list(5, :user) + [user2, user3, user4, user5, user6] = insert_list(5, :user) identifiers = [ @@ -221,15 +250,20 @@ test "it imports mutes with different nickname variations", %{user: user, conn: ] |> Enum.join(" ") - assert "job started" == + assert "jobs started" == conn |> put_req_header("content-type", "application/json") |> post("/api/pleroma/mutes_import", %{"list" => identifiers}) |> json_response_and_validate_schema(200) - assert [{:ok, job_result}] = ObanHelpers.perform_all() - assert job_result == users - assert Enum.all?(users, &Pleroma.User.mutes?(user, &1)) + results = ObanHelpers.perform_all() + + returned_users = + for {_, returned_user} <- results do + returned_user + end + + assert Enum.all?(returned_users, &Pleroma.User.mutes?(user, &1)) end end end