Merge branch 'fix-imports' into 'develop'

Refactor Oban jobs for imports

See merge request pleroma/pleroma!4234
This commit is contained in:
feld 2024-08-22 18:10:47 +00:00
commit 0f3920f791
6 changed files with 166 additions and 108 deletions

View file

@ -0,0 +1 @@
Imports of blocks, mutes, and follows would retry repeatedly due to incorrect error handling and all work executed in a single job

View file

@ -5,6 +5,7 @@
defmodule Pleroma.User.Import do
use Ecto.Schema
alias Pleroma.Repo
alias Pleroma.User
alias Pleroma.Web.CommonAPI
alias Pleroma.Workers.BackgroundWorker
@ -12,80 +13,99 @@ defmodule Pleroma.User.Import do
require Logger
@spec perform(atom(), User.t(), list()) :: :ok | list() | {:error, any()}
def perform(:mutes_import, %User{} = user, [_ | _] = identifiers) do
Enum.map(
identifiers,
fn identifier ->
with {:ok, %User{} = muted_user} <- User.get_or_fetch(identifier),
{:ok, _} <- User.mute(user, muted_user) do
muted_user
else
error -> handle_error(:mutes_import, identifier, error)
end
end
)
def perform(:mute_import, %User{} = user, actor) do
with {:ok, %User{} = muted_user} <- User.get_or_fetch(actor),
{_, false} <- {:existing_mute, User.mutes_user?(user, muted_user)},
{:ok, _} <- User.mute(user, muted_user) do
{:ok, muted_user}
else
{:existing_mute, true} -> :ok
error -> handle_error(:mutes_import, actor, error)
end
end
def perform(:blocks_import, %User{} = blocker, [_ | _] = identifiers) do
Enum.map(
identifiers,
fn identifier ->
with {:ok, %User{} = blocked} <- User.get_or_fetch(identifier),
{:ok, _block} <- CommonAPI.block(blocked, blocker) do
blocked
else
error -> handle_error(:blocks_import, identifier, error)
end
end
)
def perform(:block_import, %User{} = user, actor) do
with {:ok, %User{} = blocked} <- User.get_or_fetch(actor),
{_, false} <- {:existing_block, User.blocks_user?(user, blocked)},
{:ok, _block} <- CommonAPI.block(blocked, user) do
{:ok, blocked}
else
{:existing_block, true} -> :ok
error -> handle_error(:blocks_import, actor, error)
end
end
def perform(:follow_import, %User{} = follower, [_ | _] = identifiers) do
Enum.map(
identifiers,
fn identifier ->
with {:ok, %User{} = followed} <- User.get_or_fetch(identifier),
{:ok, follower, followed} <- User.maybe_direct_follow(follower, followed),
{:ok, _, _, _} <- CommonAPI.follow(followed, follower) do
followed
else
error -> handle_error(:follow_import, identifier, error)
end
end
)
def perform(:follow_import, %User{} = user, actor) do
with {:ok, %User{} = followed} <- User.get_or_fetch(actor),
{_, false} <- {:existing_follow, User.following?(user, followed)},
{:ok, user, followed} <- User.maybe_direct_follow(user, followed),
{:ok, _, _, _} <- CommonAPI.follow(followed, user) do
{:ok, followed}
else
{:existing_follow, true} -> :ok
error -> handle_error(:follow_import, actor, error)
end
end
def perform(_, _, _), do: :ok
defp handle_error(op, user_id, error) do
Logger.debug("#{op} failed for #{user_id} with: #{inspect(error)}")
error
end
def blocks_import(%User{} = blocker, [_ | _] = identifiers) do
BackgroundWorker.new(%{
"op" => "blocks_import",
"user_id" => blocker.id,
"identifiers" => identifiers
})
|> Oban.insert()
def blocks_import(%User{} = user, [_ | _] = actors) do
jobs =
Repo.checkout(fn ->
Enum.reduce(actors, [], fn actor, acc ->
{:ok, job} =
BackgroundWorker.new(%{
"op" => "block_import",
"user_id" => user.id,
"actor" => actor
})
|> Oban.insert()
acc ++ [job]
end)
end)
{:ok, jobs}
end
def follow_import(%User{} = follower, [_ | _] = identifiers) do
BackgroundWorker.new(%{
"op" => "follow_import",
"user_id" => follower.id,
"identifiers" => identifiers
})
|> Oban.insert()
def follows_import(%User{} = user, [_ | _] = actors) do
jobs =
Repo.checkout(fn ->
Enum.reduce(actors, [], fn actor, acc ->
{:ok, job} =
BackgroundWorker.new(%{
"op" => "follow_import",
"user_id" => user.id,
"actor" => actor
})
|> Oban.insert()
acc ++ [job]
end)
end)
{:ok, jobs}
end
def mutes_import(%User{} = user, [_ | _] = identifiers) do
BackgroundWorker.new(%{
"op" => "mutes_import",
"user_id" => user.id,
"identifiers" => identifiers
})
|> Oban.insert()
def mutes_import(%User{} = user, [_ | _] = actors) do
jobs =
Repo.checkout(fn ->
Enum.reduce(actors, [], fn actor, acc ->
{:ok, job} =
BackgroundWorker.new(%{
"op" => "mute_import",
"user_id" => user.id,
"actor" => actor
})
|> Oban.insert()
acc ++ [job]
end)
end)
{:ok, jobs}
end
end

View file

@ -38,8 +38,8 @@ def do_follow(%{assigns: %{user: follower}} = conn, list) do
|> Enum.map(&(&1 |> String.trim() |> String.trim_leading("@")))
|> Enum.reject(&(&1 == ""))
User.Import.follow_import(follower, identifiers)
json(conn, "job started")
User.Import.follows_import(follower, identifiers)
json(conn, "jobs started")
end
def blocks(
@ -55,7 +55,7 @@ def blocks(%{private: %{open_api_spex: %{body_params: %{list: list}}}} = conn, _
defp do_block(%{assigns: %{user: blocker}} = conn, list) do
User.Import.blocks_import(blocker, prepare_user_identifiers(list))
json(conn, "job started")
json(conn, "jobs started")
end
def mutes(
@ -71,7 +71,7 @@ def mutes(%{private: %{open_api_spex: %{body_params: %{list: list}}}} = conn, _)
defp do_mute(%{assigns: %{user: user}} = conn, list) do
User.Import.mutes_import(user, prepare_user_identifiers(list))
json(conn, "job started")
json(conn, "jobs started")
end
defp prepare_user_identifiers(list) do

View file

@ -19,10 +19,10 @@ def perform(%Job{args: %{"op" => "force_password_reset", "user_id" => user_id}})
User.perform(:force_password_reset, user)
end
def perform(%Job{args: %{"op" => op, "user_id" => user_id, "identifiers" => identifiers}})
when op in ["blocks_import", "follow_import", "mutes_import"] do
def perform(%Job{args: %{"op" => op, "user_id" => user_id, "actor" => actor}})
when op in ["block_import", "follow_import", "mute_import"] do
user = User.get_cached_by_id(user_id)
{:ok, User.Import.perform(String.to_existing_atom(op), user, identifiers)}
User.Import.perform(String.to_existing_atom(op), user, actor)
end
def perform(%Job{

View file

@ -25,11 +25,12 @@ test "it imports user followings from list" do
user3.nickname
]
{:ok, job} = User.Import.follow_import(user1, identifiers)
{:ok, jobs} = User.Import.follows_import(user1, identifiers)
for job <- jobs do
assert {:ok, %User{}} = ObanHelpers.perform(job)
end
assert {:ok, result} = ObanHelpers.perform(job)
assert is_list(result)
assert result == [refresh_record(user2), refresh_record(user3)]
assert User.following?(user1, user2)
assert User.following?(user1, user3)
end
@ -44,11 +45,12 @@ test "it imports user blocks from list" do
user3.nickname
]
{:ok, job} = User.Import.blocks_import(user1, identifiers)
{:ok, jobs} = User.Import.blocks_import(user1, identifiers)
for job <- jobs do
assert {:ok, %User{}} = ObanHelpers.perform(job)
end
assert {:ok, result} = ObanHelpers.perform(job)
assert is_list(result)
assert result == [user2, user3]
assert User.blocks?(user1, user2)
assert User.blocks?(user1, user3)
end
@ -63,11 +65,12 @@ test "it imports user mutes from list" do
user3.nickname
]
{:ok, job} = User.Import.mutes_import(user1, identifiers)
{:ok, jobs} = User.Import.mutes_import(user1, identifiers)
for job <- jobs do
assert {:ok, %User{}} = ObanHelpers.perform(job)
end
assert {:ok, result} = ObanHelpers.perform(job)
assert is_list(result)
assert result == [user2, user3]
assert User.mutes?(user1, user2)
assert User.mutes?(user1, user3)
end

View file

@ -22,7 +22,7 @@ defmodule Pleroma.Web.PleromaAPI.UserImportControllerTest do
test "it returns HTTP 200", %{conn: conn} do
user2 = insert(:user)
assert "job started" ==
assert "jobs started" ==
conn
|> put_req_header("content-type", "application/json")
|> post("/api/pleroma/follow_import", %{"list" => "#{user2.ap_id}"})
@ -38,7 +38,7 @@ test "it imports follow lists from file", %{conn: conn} do
"Account address,Show boosts\n#{user2.ap_id},true"
end}
]) do
assert "job started" ==
assert "jobs started" ==
conn
|> put_req_header("content-type", "application/json")
|> post("/api/pleroma/follow_import", %{
@ -46,9 +46,9 @@ test "it imports follow lists from file", %{conn: conn} do
})
|> json_response_and_validate_schema(200)
assert [{:ok, job_result}] = ObanHelpers.perform_all()
assert job_result == [refresh_record(user2)]
assert [%Pleroma.User{follower_count: 1}] = job_result
assert [{:ok, updated_user}] = ObanHelpers.perform_all()
assert updated_user.id == user2.id
assert updated_user.follower_count == 1
end
end
@ -63,7 +63,7 @@ test "it imports new-style mastodon follow lists", %{conn: conn} do
})
|> json_response_and_validate_schema(200)
assert response == "job started"
assert response == "jobs started"
end
test "requires 'follow' or 'write:follows' permissions" do
@ -102,14 +102,20 @@ test "it imports follows with different nickname variations", %{conn: conn} do
]
|> Enum.join("\n")
assert "job started" ==
assert "jobs started" ==
conn
|> put_req_header("content-type", "application/json")
|> post("/api/pleroma/follow_import", %{"list" => identifiers})
|> json_response_and_validate_schema(200)
assert [{:ok, job_result}] = ObanHelpers.perform_all()
assert job_result == Enum.map(users, &refresh_record/1)
results = ObanHelpers.perform_all()
returned_users =
for {_, returned_user} <- results do
returned_user
end
assert returned_users == Enum.map(users, &refresh_record/1)
end
end
@ -120,7 +126,7 @@ test "it imports follows with different nickname variations", %{conn: conn} do
test "it returns HTTP 200", %{conn: conn} do
user2 = insert(:user)
assert "job started" ==
assert "jobs started" ==
conn
|> put_req_header("content-type", "application/json")
|> post("/api/pleroma/blocks_import", %{"list" => "#{user2.ap_id}"})
@ -133,7 +139,7 @@ test "it imports blocks users from file", %{conn: conn} do
with_mocks([
{File, [], read!: fn "blocks_list.txt" -> "#{user2.ap_id} #{user3.ap_id}" end}
]) do
assert "job started" ==
assert "jobs started" ==
conn
|> put_req_header("content-type", "application/json")
|> post("/api/pleroma/blocks_import", %{
@ -141,8 +147,14 @@ test "it imports blocks users from file", %{conn: conn} do
})
|> json_response_and_validate_schema(200)
assert [{:ok, job_result}] = ObanHelpers.perform_all()
assert job_result == users
results = ObanHelpers.perform_all()
returned_users =
for {_, returned_user} <- results do
returned_user
end
assert returned_users == users
end
end
@ -159,14 +171,25 @@ test "it imports blocks with different nickname variations", %{conn: conn} do
]
|> Enum.join(" ")
assert "job started" ==
assert "jobs started" ==
conn
|> put_req_header("content-type", "application/json")
|> post("/api/pleroma/blocks_import", %{"list" => identifiers})
|> json_response_and_validate_schema(200)
assert [{:ok, job_result}] = ObanHelpers.perform_all()
assert job_result == users
results = ObanHelpers.perform_all()
returned_user_ids =
for {_, user} <- results do
user.id
end
original_user_ids =
for user <- users do
user.id
end
assert match?(^original_user_ids, returned_user_ids)
end
end
@ -177,24 +200,25 @@ test "it imports blocks with different nickname variations", %{conn: conn} do
test "it returns HTTP 200", %{user: user, conn: conn} do
user2 = insert(:user)
assert "job started" ==
assert "jobs started" ==
conn
|> put_req_header("content-type", "application/json")
|> post("/api/pleroma/mutes_import", %{"list" => "#{user2.ap_id}"})
|> json_response_and_validate_schema(200)
assert [{:ok, job_result}] = ObanHelpers.perform_all()
assert job_result == [user2]
[{:ok, result_user}] = ObanHelpers.perform_all()
assert result_user == refresh_record(user2)
assert Pleroma.User.mutes?(user, user2)
end
test "it imports mutes users from file", %{user: user, conn: conn} do
users = [user2, user3] = insert_list(2, :user)
[user2, user3] = insert_list(2, :user)
with_mocks([
{File, [], read!: fn "mutes_list.txt" -> "#{user2.ap_id} #{user3.ap_id}" end}
]) do
assert "job started" ==
assert "jobs started" ==
conn
|> put_req_header("content-type", "application/json")
|> post("/api/pleroma/mutes_import", %{
@ -202,14 +226,19 @@ test "it imports mutes users from file", %{user: user, conn: conn} do
})
|> json_response_and_validate_schema(200)
assert [{:ok, job_result}] = ObanHelpers.perform_all()
assert job_result == users
assert Enum.all?(users, &Pleroma.User.mutes?(user, &1))
results = ObanHelpers.perform_all()
returned_users =
for {_, returned_user} <- results do
returned_user
end
assert Enum.all?(returned_users, &Pleroma.User.mutes?(user, &1))
end
end
test "it imports mutes with different nickname variations", %{user: user, conn: conn} do
users = [user2, user3, user4, user5, user6] = insert_list(5, :user)
[user2, user3, user4, user5, user6] = insert_list(5, :user)
identifiers =
[
@ -221,15 +250,20 @@ test "it imports mutes with different nickname variations", %{user: user, conn:
]
|> Enum.join(" ")
assert "job started" ==
assert "jobs started" ==
conn
|> put_req_header("content-type", "application/json")
|> post("/api/pleroma/mutes_import", %{"list" => identifiers})
|> json_response_and_validate_schema(200)
assert [{:ok, job_result}] = ObanHelpers.perform_all()
assert job_result == users
assert Enum.all?(users, &Pleroma.User.mutes?(user, &1))
results = ObanHelpers.perform_all()
returned_users =
for {_, returned_user} <- results do
returned_user
end
assert Enum.all?(returned_users, &Pleroma.User.mutes?(user, &1))
end
end
end