From 382426e0338d7918cd2db7c72ede446a2a8f7f4f Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Mon, 30 Sep 2024 12:41:06 -0400 Subject: [PATCH 01/19] Remove Object.get_by_id_and_maybe_refetch/2 This was only used for poll refreshing and is not a good approach to the problem. --- lib/pleroma/object.ex | 21 --- .../controllers/poll_controller.ex | 2 +- test/pleroma/object_test.exs | 144 ------------------ 3 files changed, 1 insertion(+), 166 deletions(-) diff --git a/lib/pleroma/object.ex b/lib/pleroma/object.ex index 748f18e6cd..77dfda8510 100644 --- a/lib/pleroma/object.ex +++ b/lib/pleroma/object.ex @@ -99,27 +99,6 @@ defp hashtags_changed?(_, _), do: false def get_by_id(nil), do: nil def get_by_id(id), do: Repo.get(Object, id) - @spec get_by_id_and_maybe_refetch(integer(), list()) :: Object.t() | nil - def get_by_id_and_maybe_refetch(id, opts \\ []) do - with %Object{updated_at: updated_at} = object <- get_by_id(id) do - if opts[:interval] && - NaiveDateTime.diff(NaiveDateTime.utc_now(), updated_at) > opts[:interval] do - case Fetcher.refetch_object(object) do - {:ok, %Object{} = object} -> - object - - e -> - Logger.error("Couldn't refresh #{object.data["id"]}:\n#{inspect(e)}") - object - end - else - object - end - else - nil -> nil - end - end - def get_by_ap_id(nil), do: nil def get_by_ap_id(ap_id) do diff --git a/lib/pleroma/web/mastodon_api/controllers/poll_controller.ex b/lib/pleroma/web/mastodon_api/controllers/poll_controller.ex index a2af8148ca..303b995f66 100644 --- a/lib/pleroma/web/mastodon_api/controllers/poll_controller.ex +++ b/lib/pleroma/web/mastodon_api/controllers/poll_controller.ex @@ -30,7 +30,7 @@ defmodule Pleroma.Web.MastodonAPI.PollController do @doc "GET /api/v1/polls/:id" def show(%{assigns: %{user: user}, private: %{open_api_spex: %{params: %{id: id}}}} = conn, _) do - with %Object{} = object <- Object.get_by_id_and_maybe_refetch(id, interval: 60), + with %Object{} = object <- Object.get_by_id(id), %Activity{} = activity <- Activity.get_create_by_object_ap_id(object.data["id"]), true <- Visibility.visible_for_user?(activity, user) do try_render(conn, "show.json", %{object: object, for: user}) diff --git a/test/pleroma/object_test.exs b/test/pleroma/object_test.exs index 48d4d86ebd..b3c528e32f 100644 --- a/test/pleroma/object_test.exs +++ b/test/pleroma/object_test.exs @@ -6,12 +6,10 @@ defmodule Pleroma.ObjectTest do use Pleroma.DataCase use Oban.Testing, repo: Pleroma.Repo - import ExUnit.CaptureLog import Mox import Pleroma.Factory import Tesla.Mock - alias Pleroma.Activity alias Pleroma.Hashtag alias Pleroma.Object alias Pleroma.Repo @@ -282,148 +280,6 @@ test "does not fetch unknown objects when fetch is false" do end end - describe "get_by_id_and_maybe_refetch" do - setup do - mock(fn - %{method: :get, url: "https://patch.cx/objects/9a172665-2bc5-452d-8428-2361d4c33b1d"} -> - %Tesla.Env{ - status: 200, - body: File.read!("test/fixtures/tesla_mock/poll_original.json"), - headers: HttpRequestMock.activitypub_object_headers() - } - - env -> - apply(HttpRequestMock, :request, [env]) - end) - - mock_modified = fn resp -> - mock(fn - %{method: :get, url: "https://patch.cx/objects/9a172665-2bc5-452d-8428-2361d4c33b1d"} -> - resp - - env -> - apply(HttpRequestMock, :request, [env]) - end) - end - - on_exit(fn -> mock(fn env -> apply(HttpRequestMock, :request, [env]) end) end) - - [mock_modified: mock_modified] - end - - test "refetches if the time since the last refetch is greater than the interval", %{ - mock_modified: mock_modified - } do - %Object{} = - object = - Object.normalize("https://patch.cx/objects/9a172665-2bc5-452d-8428-2361d4c33b1d", - fetch: true - ) - - Object.set_cache(object) - - assert Enum.at(object.data["oneOf"], 0)["replies"]["totalItems"] == 4 - assert Enum.at(object.data["oneOf"], 1)["replies"]["totalItems"] == 0 - - mock_modified.(%Tesla.Env{ - status: 200, - body: File.read!("test/fixtures/tesla_mock/poll_modified.json"), - headers: HttpRequestMock.activitypub_object_headers() - }) - - updated_object = Object.get_by_id_and_maybe_refetch(object.id, interval: -1) - object_in_cache = Object.get_cached_by_ap_id(object.data["id"]) - assert updated_object == object_in_cache - assert Enum.at(updated_object.data["oneOf"], 0)["replies"]["totalItems"] == 8 - assert Enum.at(updated_object.data["oneOf"], 1)["replies"]["totalItems"] == 3 - end - - test "returns the old object if refetch fails", %{mock_modified: mock_modified} do - %Object{} = - object = - Object.normalize("https://patch.cx/objects/9a172665-2bc5-452d-8428-2361d4c33b1d", - fetch: true - ) - - Object.set_cache(object) - - assert Enum.at(object.data["oneOf"], 0)["replies"]["totalItems"] == 4 - assert Enum.at(object.data["oneOf"], 1)["replies"]["totalItems"] == 0 - - assert capture_log(fn -> - mock_modified.(%Tesla.Env{status: 404, body: ""}) - - updated_object = Object.get_by_id_and_maybe_refetch(object.id, interval: -1) - object_in_cache = Object.get_cached_by_ap_id(object.data["id"]) - assert updated_object == object_in_cache - assert Enum.at(updated_object.data["oneOf"], 0)["replies"]["totalItems"] == 4 - assert Enum.at(updated_object.data["oneOf"], 1)["replies"]["totalItems"] == 0 - end) =~ - "[error] Couldn't refresh https://patch.cx/objects/9a172665-2bc5-452d-8428-2361d4c33b1d" - end - - test "does not refetch if the time since the last refetch is greater than the interval", %{ - mock_modified: mock_modified - } do - %Object{} = - object = - Object.normalize("https://patch.cx/objects/9a172665-2bc5-452d-8428-2361d4c33b1d", - fetch: true - ) - - Object.set_cache(object) - - assert Enum.at(object.data["oneOf"], 0)["replies"]["totalItems"] == 4 - assert Enum.at(object.data["oneOf"], 1)["replies"]["totalItems"] == 0 - - mock_modified.(%Tesla.Env{ - status: 200, - body: File.read!("test/fixtures/tesla_mock/poll_modified.json"), - headers: HttpRequestMock.activitypub_object_headers() - }) - - updated_object = Object.get_by_id_and_maybe_refetch(object.id, interval: 100) - object_in_cache = Object.get_cached_by_ap_id(object.data["id"]) - assert updated_object == object_in_cache - assert Enum.at(updated_object.data["oneOf"], 0)["replies"]["totalItems"] == 4 - assert Enum.at(updated_object.data["oneOf"], 1)["replies"]["totalItems"] == 0 - end - - test "preserves internal fields on refetch", %{mock_modified: mock_modified} do - %Object{} = - object = - Object.normalize("https://patch.cx/objects/9a172665-2bc5-452d-8428-2361d4c33b1d", - fetch: true - ) - - Object.set_cache(object) - - assert Enum.at(object.data["oneOf"], 0)["replies"]["totalItems"] == 4 - assert Enum.at(object.data["oneOf"], 1)["replies"]["totalItems"] == 0 - - user = insert(:user) - activity = Activity.get_create_by_object_ap_id(object.data["id"]) - {:ok, activity} = CommonAPI.favorite(activity.id, user) - object = Object.get_by_ap_id(activity.data["object"]) - - assert object.data["like_count"] == 1 - - mock_modified.(%Tesla.Env{ - status: 200, - body: File.read!("test/fixtures/tesla_mock/poll_modified.json"), - headers: HttpRequestMock.activitypub_object_headers() - }) - - updated_object = Object.get_by_id_and_maybe_refetch(object.id, interval: -1) - object_in_cache = Object.get_cached_by_ap_id(object.data["id"]) - assert updated_object == object_in_cache - assert Enum.at(updated_object.data["oneOf"], 0)["replies"]["totalItems"] == 8 - assert Enum.at(updated_object.data["oneOf"], 1)["replies"]["totalItems"] == 3 - - assert updated_object.data["like_count"] == 1 - end - end - describe ":hashtags association" do test "Hashtag records are created with Object record and updated on its change" do user = insert(:user) From 2380ae6dcc267d7d6ff81a55ae95eed718176563 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Mon, 30 Sep 2024 13:38:13 -0400 Subject: [PATCH 02/19] Validate an Oban job is inserted for poll refreshes --- .../web/mastodon_api/controllers/poll_controller_test.exs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/test/pleroma/web/mastodon_api/controllers/poll_controller_test.exs b/test/pleroma/web/mastodon_api/controllers/poll_controller_test.exs index 7912b1d5f7..b2cceec513 100644 --- a/test/pleroma/web/mastodon_api/controllers/poll_controller_test.exs +++ b/test/pleroma/web/mastodon_api/controllers/poll_controller_test.exs @@ -3,6 +3,7 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.MastodonAPI.PollControllerTest do + use Oban.Testing, repo: Pleroma.Repo use Pleroma.Web.ConnCase, async: true alias Pleroma.Object @@ -27,6 +28,11 @@ test "returns poll entity for object id", %{user: user, conn: conn} do response = json_response_and_validate_schema(conn, 200) id = to_string(object.id) assert %{"id" => ^id, "expired" => false, "multiple" => false} = response + + assert_enqueued( + worker: Pleroma.Workers.PollWorker, + args: %{"op" => "refresh", "activity_id" => activity.id} + ) end test "does not expose polls for private statuses", %{conn: conn} do From c077a14ce1343f5515fa11938df7d808f23a566c Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Mon, 30 Sep 2024 13:54:56 -0400 Subject: [PATCH 03/19] Add Oban job to handle poll refreshing and stream out the update --- .../controllers/poll_controller.ex | 4 +++ lib/pleroma/workers/poll_worker.ex | 36 ++++++++++++++----- 2 files changed, 32 insertions(+), 8 deletions(-) diff --git a/lib/pleroma/web/mastodon_api/controllers/poll_controller.ex b/lib/pleroma/web/mastodon_api/controllers/poll_controller.ex index 303b995f66..0d5a575184 100644 --- a/lib/pleroma/web/mastodon_api/controllers/poll_controller.ex +++ b/lib/pleroma/web/mastodon_api/controllers/poll_controller.ex @@ -9,6 +9,7 @@ defmodule Pleroma.Web.MastodonAPI.PollController do alias Pleroma.Activity alias Pleroma.Object + alias Pleroma.Workers.PollWorker alias Pleroma.Web.ActivityPub.Visibility alias Pleroma.Web.CommonAPI alias Pleroma.Web.Plugs.OAuthScopesPlug @@ -33,6 +34,9 @@ def show(%{assigns: %{user: user}, private: %{open_api_spex: %{params: %{id: id} with %Object{} = object <- Object.get_by_id(id), %Activity{} = activity <- Activity.get_create_by_object_ap_id(object.data["id"]), true <- Visibility.visible_for_user?(activity, user) do + PollWorker.new(%{"op" => "refresh", "activity_id" => activity.id}) + |> Oban.insert(unique: [period: 60]) + try_render(conn, "show.json", %{object: object, for: user}) else error when is_nil(error) or error == false -> diff --git a/lib/pleroma/workers/poll_worker.ex b/lib/pleroma/workers/poll_worker.ex index d263aa1b9e..0d2d67326a 100644 --- a/lib/pleroma/workers/poll_worker.ex +++ b/lib/pleroma/workers/poll_worker.ex @@ -11,14 +11,34 @@ defmodule Pleroma.Workers.PollWorker do alias Pleroma.Activity alias Pleroma.Notification alias Pleroma.Object + alias Pleroma.Object.Fetcher + + @stream_out_impl Pleroma.Config.get( + [__MODULE__, :stream_out], + Pleroma.Web.ActivityPub.ActivityPub + ) @impl true def perform(%Job{args: %{"op" => "poll_end", "activity_id" => activity_id}}) do - with %Activity{} = activity <- find_poll_activity(activity_id), + with {_, %Activity{} = activity} <- {:activity, Activity.get_by_id(activity_id)}, {:ok, notifications} <- Notification.create_poll_notifications(activity) do Notification.stream(notifications) else - {:error, :poll_activity_not_found} = e -> {:cancel, e} + {:activity, nil} -> {:cancel, :poll_activity_not_found} + e -> {:error, e} + end + end + + def perform(%Job{args: %{"op" => "refresh", "activity_id" => activity_id}}) do + with {_, %Activity{object: object}} <- + {:activity, Activity.get_by_id_with_object(activity_id)}, + {_, {:ok, _object}} <- {:refetch, Fetcher.refetch_object(object)} do + stream_update(activity_id) + + :ok + else + {:activity, nil} -> {:cancel, :poll_activity_not_found} + {:refetch, _} = e -> {:cancel, e} e -> {:error, e} end end @@ -26,12 +46,6 @@ def perform(%Job{args: %{"op" => "poll_end", "activity_id" => activity_id}}) do @impl true def timeout(_job), do: :timer.seconds(5) - defp find_poll_activity(activity_id) do - with nil <- Activity.get_by_id(activity_id) do - {:error, :poll_activity_not_found} - end - end - def schedule_poll_end(%Activity{data: %{"type" => "Create"}, id: activity_id} = activity) do with %Object{data: %{"type" => "Question", "closed" => closed}} when is_binary(closed) <- Object.normalize(activity), @@ -49,4 +63,10 @@ def schedule_poll_end(%Activity{data: %{"type" => "Create"}, id: activity_id} = end def schedule_poll_end(activity), do: {:error, activity} + + defp stream_update(activity_id) do + Activity.get_by_id(activity_id) + |> Activity.normalize() + |> @stream_out_impl.stream_out() + end end From 4b3f604f9529c9ced23f747cb6f6d82fedfadab0 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Mon, 30 Sep 2024 14:02:41 -0400 Subject: [PATCH 04/19] Skip refetching poll results if the object's updated_at is newer than the poll closed timestamp --- lib/pleroma/workers/poll_worker.ex | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/pleroma/workers/poll_worker.ex b/lib/pleroma/workers/poll_worker.ex index 0d2d67326a..a61c5eac17 100644 --- a/lib/pleroma/workers/poll_worker.ex +++ b/lib/pleroma/workers/poll_worker.ex @@ -32,6 +32,8 @@ def perform(%Job{args: %{"op" => "poll_end", "activity_id" => activity_id}}) do def perform(%Job{args: %{"op" => "refresh", "activity_id" => activity_id}}) do with {_, %Activity{object: object}} <- {:activity, Activity.get_by_id_with_object(activity_id)}, + {:ok, naive_closed} <- NaiveDateTime.from_iso8601(object.data["closed"]), + {_, :lt} <- {:closed_compare, NaiveDateTime.compare(object.updated_at, naive_closed)}, {_, {:ok, _object}} <- {:refetch, Fetcher.refetch_object(object)} do stream_update(activity_id) @@ -39,6 +41,7 @@ def perform(%Job{args: %{"op" => "refresh", "activity_id" => activity_id}}) do else {:activity, nil} -> {:cancel, :poll_activity_not_found} {:refetch, _} = e -> {:cancel, e} + {:closed_compare, _} -> {:cancel, :poll_finalized} e -> {:error, e} end end From 47ce3a4a961bd7496f8105bc957dbf958b77d342 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Mon, 30 Sep 2024 14:17:35 -0400 Subject: [PATCH 05/19] Schedule a final poll refresh before streaming out the notifications --- lib/pleroma/workers/poll_worker.ex | 8 ++++++-- test/pleroma/workers/poll_worker_test.exs | 6 ++++++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/lib/pleroma/workers/poll_worker.ex b/lib/pleroma/workers/poll_worker.ex index a61c5eac17..574daa9ba7 100644 --- a/lib/pleroma/workers/poll_worker.ex +++ b/lib/pleroma/workers/poll_worker.ex @@ -22,6 +22,10 @@ defmodule Pleroma.Workers.PollWorker do def perform(%Job{args: %{"op" => "poll_end", "activity_id" => activity_id}}) do with {_, %Activity{} = activity} <- {:activity, Activity.get_by_id(activity_id)}, {:ok, notifications} <- Notification.create_poll_notifications(activity) do + # Schedule a final refresh + __MODULE__.new(%{"op" => "refresh", "activity_id" => activity_id}) + |> Oban.insert() + Notification.stream(notifications) else {:activity, nil} -> {:cancel, :poll_activity_not_found} @@ -32,8 +36,8 @@ def perform(%Job{args: %{"op" => "poll_end", "activity_id" => activity_id}}) do def perform(%Job{args: %{"op" => "refresh", "activity_id" => activity_id}}) do with {_, %Activity{object: object}} <- {:activity, Activity.get_by_id_with_object(activity_id)}, - {:ok, naive_closed} <- NaiveDateTime.from_iso8601(object.data["closed"]), - {_, :lt} <- {:closed_compare, NaiveDateTime.compare(object.updated_at, naive_closed)}, + {:ok, naive_closed} <- NaiveDateTime.from_iso8601(object.data["closed"]), + {_, :lt} <- {:closed_compare, NaiveDateTime.compare(object.updated_at, naive_closed)}, {_, {:ok, _object}} <- {:refetch, Fetcher.refetch_object(object)} do stream_update(activity_id) diff --git a/test/pleroma/workers/poll_worker_test.exs b/test/pleroma/workers/poll_worker_test.exs index 749df8affd..e1c67f0571 100644 --- a/test/pleroma/workers/poll_worker_test.exs +++ b/test/pleroma/workers/poll_worker_test.exs @@ -44,6 +44,12 @@ test "poll notification job" do # Ensure notifications were streamed out when job executes assert called(Pleroma.Web.Streamer.stream(["user", "user:notification"], :_)) assert called(Pleroma.Web.Push.send(:_)) + + # Ensure we scheduled a final refresh of the poll + assert_enqueued( + worker: PollWorker, + args: %{"op" => "refresh", "activity_id" => activity.id} + ) end end end From a2e7db43aa3636569f4d770df980347a03c957fe Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Mon, 30 Sep 2024 14:23:04 -0400 Subject: [PATCH 06/19] Rename assignment for consistency --- lib/pleroma/workers/poll_worker.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/pleroma/workers/poll_worker.ex b/lib/pleroma/workers/poll_worker.ex index 574daa9ba7..f70ab48a4d 100644 --- a/lib/pleroma/workers/poll_worker.ex +++ b/lib/pleroma/workers/poll_worker.ex @@ -36,8 +36,8 @@ def perform(%Job{args: %{"op" => "poll_end", "activity_id" => activity_id}}) do def perform(%Job{args: %{"op" => "refresh", "activity_id" => activity_id}}) do with {_, %Activity{object: object}} <- {:activity, Activity.get_by_id_with_object(activity_id)}, - {:ok, naive_closed} <- NaiveDateTime.from_iso8601(object.data["closed"]), - {_, :lt} <- {:closed_compare, NaiveDateTime.compare(object.updated_at, naive_closed)}, + {:ok, end_time} <- NaiveDateTime.from_iso8601(object.data["closed"]), + {_, :lt} <- {:closed_compare, NaiveDateTime.compare(object.updated_at, end_time)}, {_, {:ok, _object}} <- {:refetch, Fetcher.refetch_object(object)} do stream_update(activity_id) From 766edfe5b2b19f4819704540341b8fcc92f133bd Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Mon, 30 Sep 2024 14:32:28 -0400 Subject: [PATCH 07/19] Test Poll refresh jobs stream out updates after refetching the object --- test/pleroma/workers/poll_worker_test.exs | 29 +++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/test/pleroma/workers/poll_worker_test.exs b/test/pleroma/workers/poll_worker_test.exs index e1c67f0571..56a338bac3 100644 --- a/test/pleroma/workers/poll_worker_test.exs +++ b/test/pleroma/workers/poll_worker_test.exs @@ -52,4 +52,33 @@ test "poll notification job" do ) end end + + test "poll refresh job" do + user = insert(:user, local: false) + question = insert(:question, user: user) + activity = insert(:question_activity, question: question) + + PollWorker.new(%{"op" => "refresh", "activity_id" => activity.id}) + |> Oban.insert() + + expected_job_args = %{"activity_id" => activity.id, "op" => "refresh"} + + assert_enqueued(args: expected_job_args) + + with_mocks([ + { + Pleroma.Web.Streamer, + [], + [ + stream: fn _, _ -> nil end + ] + } + ]) do + [job] = all_enqueued(worker: PollWorker) + PollWorker.perform(job) + + # Ensure updates are streamed out + assert called(Pleroma.Web.Streamer.stream(["user", "list", "public", "public:local"], :_)) + end + end end From b2340b5b776d243f6cf12971393783cc3b7c2dc2 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Mon, 30 Sep 2024 14:45:13 -0400 Subject: [PATCH 08/19] Permit backdating the poll closed timestamp --- test/support/factory.ex | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/support/factory.ex b/test/support/factory.ex index 8f1c6faf97..732ea3143c 100644 --- a/test/support/factory.ex +++ b/test/support/factory.ex @@ -241,6 +241,7 @@ def tombstone_factory do def question_factory(attrs \\ %{}) do user = attrs[:user] || insert(:user) + closed = attrs[:closed] || DateTime.utc_now() |> DateTime.add(86_400) |> DateTime.to_iso8601() data = %{ "id" => Pleroma.Web.ActivityPub.Utils.generate_object_id(), @@ -251,7 +252,7 @@ def question_factory(attrs \\ %{}) do "to" => ["https://www.w3.org/ns/activitystreams#Public"], "cc" => [user.follower_address], "context" => Pleroma.Web.ActivityPub.Utils.generate_context_id(), - "closed" => DateTime.utc_now() |> DateTime.add(86_400) |> DateTime.to_iso8601(), + "closed" => closed, "content" => "Which flavor of ice cream do you prefer?", "oneOf" => [ %{ From a1b384f63c3587d0463109b74b0bbcc5c5ae82ee Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Mon, 30 Sep 2024 14:45:41 -0400 Subject: [PATCH 09/19] Test that a poll refresh is cancelled if updated_at on the object is newer than the poll closing time --- test/pleroma/workers/poll_worker_test.exs | 61 +++++++++++++++-------- 1 file changed, 40 insertions(+), 21 deletions(-) diff --git a/test/pleroma/workers/poll_worker_test.exs b/test/pleroma/workers/poll_worker_test.exs index 56a338bac3..0fafcae111 100644 --- a/test/pleroma/workers/poll_worker_test.exs +++ b/test/pleroma/workers/poll_worker_test.exs @@ -53,32 +53,51 @@ test "poll notification job" do end end - test "poll refresh job" do - user = insert(:user, local: false) - question = insert(:question, user: user) - activity = insert(:question_activity, question: question) + describe "poll refresh" do + test "normal job" do + user = insert(:user, local: false) + question = insert(:question, user: user) + activity = insert(:question_activity, question: question) - PollWorker.new(%{"op" => "refresh", "activity_id" => activity.id}) - |> Oban.insert() + PollWorker.new(%{"op" => "refresh", "activity_id" => activity.id}) + |> Oban.insert() - expected_job_args = %{"activity_id" => activity.id, "op" => "refresh"} + expected_job_args = %{"activity_id" => activity.id, "op" => "refresh"} - assert_enqueued(args: expected_job_args) + assert_enqueued(args: expected_job_args) + + with_mocks([ + { + Pleroma.Web.Streamer, + [], + [ + stream: fn _, _ -> nil end + ] + } + ]) do + [job] = all_enqueued(worker: PollWorker) + PollWorker.perform(job) + + # Ensure updates are streamed out + assert called(Pleroma.Web.Streamer.stream(["user", "list", "public", "public:local"], :_)) + end + end + + test "when updated_at is after poll closing" do + poll_closed = DateTime.utc_now() |> DateTime.add(-86_400) |> DateTime.to_iso8601() + user = insert(:user, local: false) + question = insert(:question, user: user, closed: poll_closed) + activity = insert(:question_activity, question: question) + + PollWorker.new(%{"op" => "refresh", "activity_id" => activity.id}) + |> Oban.insert() + + expected_job_args = %{"activity_id" => activity.id, "op" => "refresh"} + + assert_enqueued(args: expected_job_args) - with_mocks([ - { - Pleroma.Web.Streamer, - [], - [ - stream: fn _, _ -> nil end - ] - } - ]) do [job] = all_enqueued(worker: PollWorker) - PollWorker.perform(job) - - # Ensure updates are streamed out - assert called(Pleroma.Web.Streamer.stream(["user", "list", "public", "public:local"], :_)) + assert {:cancel, :poll_finalized} = PollWorker.perform(job) end end end From 2ab4049508148756076853bae26279b698740597 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Mon, 30 Sep 2024 14:47:30 -0400 Subject: [PATCH 10/19] Poll refreshing changelog --- changelog.d/poll-refresh.change | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/poll-refresh.change diff --git a/changelog.d/poll-refresh.change b/changelog.d/poll-refresh.change new file mode 100644 index 0000000000..b755128a12 --- /dev/null +++ b/changelog.d/poll-refresh.change @@ -0,0 +1 @@ +Poll results refreshing is handled asynchronously and will not attempt to keep fetching updates to a closed poll. From b735d9e6e19a1c64f43428e6342e3d172728c736 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Mon, 30 Sep 2024 14:55:38 -0400 Subject: [PATCH 11/19] Improve assertion --- test/pleroma/workers/poll_worker_test.exs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/pleroma/workers/poll_worker_test.exs b/test/pleroma/workers/poll_worker_test.exs index 0fafcae111..c34647f1ba 100644 --- a/test/pleroma/workers/poll_worker_test.exs +++ b/test/pleroma/workers/poll_worker_test.exs @@ -97,7 +97,7 @@ test "when updated_at is after poll closing" do assert_enqueued(args: expected_job_args) [job] = all_enqueued(worker: PollWorker) - assert {:cancel, :poll_finalized} = PollWorker.perform(job) + assert {:cancel, :poll_finalized} == PollWorker.perform(job) end end end From 9ff57946e7d6fa7dabaf90457e11041ce46991c4 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Mon, 30 Sep 2024 15:25:13 -0400 Subject: [PATCH 12/19] Credo --- lib/pleroma/web/mastodon_api/controllers/poll_controller.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/pleroma/web/mastodon_api/controllers/poll_controller.ex b/lib/pleroma/web/mastodon_api/controllers/poll_controller.ex index 0d5a575184..f89bfa7f29 100644 --- a/lib/pleroma/web/mastodon_api/controllers/poll_controller.ex +++ b/lib/pleroma/web/mastodon_api/controllers/poll_controller.ex @@ -9,10 +9,10 @@ defmodule Pleroma.Web.MastodonAPI.PollController do alias Pleroma.Activity alias Pleroma.Object - alias Pleroma.Workers.PollWorker alias Pleroma.Web.ActivityPub.Visibility alias Pleroma.Web.CommonAPI alias Pleroma.Web.Plugs.OAuthScopesPlug + alias Pleroma.Workers.PollWorker action_fallback(Pleroma.Web.MastodonAPI.FallbackController) From 0a42a3f2eaf53fa87d934226874de5919320de26 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Wed, 2 Oct 2024 11:05:17 -0400 Subject: [PATCH 13/19] Do not attempt to schedule poll refresh jobs for local activities --- .../controllers/poll_controller.ex | 6 ++++-- .../controllers/poll_controller_test.exs | 19 +++++++++++++++++++ 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/lib/pleroma/web/mastodon_api/controllers/poll_controller.ex b/lib/pleroma/web/mastodon_api/controllers/poll_controller.ex index f89bfa7f29..495f89278f 100644 --- a/lib/pleroma/web/mastodon_api/controllers/poll_controller.ex +++ b/lib/pleroma/web/mastodon_api/controllers/poll_controller.ex @@ -34,8 +34,10 @@ def show(%{assigns: %{user: user}, private: %{open_api_spex: %{params: %{id: id} with %Object{} = object <- Object.get_by_id(id), %Activity{} = activity <- Activity.get_create_by_object_ap_id(object.data["id"]), true <- Visibility.visible_for_user?(activity, user) do - PollWorker.new(%{"op" => "refresh", "activity_id" => activity.id}) - |> Oban.insert(unique: [period: 60]) + unless activity.local do + PollWorker.new(%{"op" => "refresh", "activity_id" => activity.id}) + |> Oban.insert(unique: [period: 60]) + end try_render(conn, "show.json", %{object: object, for: user}) else diff --git a/test/pleroma/web/mastodon_api/controllers/poll_controller_test.exs b/test/pleroma/web/mastodon_api/controllers/poll_controller_test.exs index b2cceec513..4b236678c0 100644 --- a/test/pleroma/web/mastodon_api/controllers/poll_controller_test.exs +++ b/test/pleroma/web/mastodon_api/controllers/poll_controller_test.exs @@ -29,6 +29,25 @@ test "returns poll entity for object id", %{user: user, conn: conn} do id = to_string(object.id) assert %{"id" => ^id, "expired" => false, "multiple" => false} = response + refute_enqueued( + worker: Pleroma.Workers.PollWorker, + args: %{"op" => "refresh", "activity_id" => activity.id} + ) + end + + test "does not create oban job to refresh poll if activity is local", %{conn: conn} do + user = insert(:user, local: false) + question = insert(:question, user: user) + activity = insert(:question_activity, question: question, local: false) + + # Ensure this is not represented as a local activity + refute activity.local + + object = Object.normalize(activity, fetch: false) + + get(conn, "/api/v1/polls/#{object.id}") + |> json_response_and_validate_schema(200) + assert_enqueued( worker: Pleroma.Workers.PollWorker, args: %{"op" => "refresh", "activity_id" => activity.id} From ba2ae5e40bbe98d20be083d331222a9aea8b61de Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Thu, 3 Oct 2024 10:14:02 -0400 Subject: [PATCH 14/19] Check if a refresh is permitted by comparing timestamps before attempting to insert an Oban job It's better to avoid inserting an Oban job that will just be rejected if it's not expensive to check. --- .../mastodon_api/controllers/poll_controller.ex | 17 ++++++++++++----- lib/pleroma/workers/poll_worker.ex | 3 --- .../controllers/poll_controller_test.exs | 5 ++++- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/lib/pleroma/web/mastodon_api/controllers/poll_controller.ex b/lib/pleroma/web/mastodon_api/controllers/poll_controller.ex index 495f89278f..4b347a6a7b 100644 --- a/lib/pleroma/web/mastodon_api/controllers/poll_controller.ex +++ b/lib/pleroma/web/mastodon_api/controllers/poll_controller.ex @@ -32,12 +32,10 @@ defmodule Pleroma.Web.MastodonAPI.PollController do @doc "GET /api/v1/polls/:id" def show(%{assigns: %{user: user}, private: %{open_api_spex: %{params: %{id: id}}}} = conn, _) do with %Object{} = object <- Object.get_by_id(id), - %Activity{} = activity <- Activity.get_create_by_object_ap_id(object.data["id"]), + %Activity{} = activity <- + Activity.get_create_by_object_ap_id_with_object(object.data["id"]), true <- Visibility.visible_for_user?(activity, user) do - unless activity.local do - PollWorker.new(%{"op" => "refresh", "activity_id" => activity.id}) - |> Oban.insert(unique: [period: 60]) - end + maybe_refresh_poll(activity) try_render(conn, "show.json", %{object: object, for: user}) else @@ -76,4 +74,13 @@ defp get_cached_vote_or_vote(object, user, choices) do end end) end + + defp maybe_refresh_poll(%Activity{object: %Object{} = object} = activity) do + with false <- activity.local, + {:ok, end_time} <- NaiveDateTime.from_iso8601(object.data["closed"]), + {_, :lt} <- {:closed_compare, NaiveDateTime.compare(object.updated_at, end_time)} do + PollWorker.new(%{"op" => "refresh", "activity_id" => activity.id}) + |> Oban.insert(unique: [period: 60]) + end + end end diff --git a/lib/pleroma/workers/poll_worker.ex b/lib/pleroma/workers/poll_worker.ex index f70ab48a4d..bb92634c98 100644 --- a/lib/pleroma/workers/poll_worker.ex +++ b/lib/pleroma/workers/poll_worker.ex @@ -36,8 +36,6 @@ def perform(%Job{args: %{"op" => "poll_end", "activity_id" => activity_id}}) do def perform(%Job{args: %{"op" => "refresh", "activity_id" => activity_id}}) do with {_, %Activity{object: object}} <- {:activity, Activity.get_by_id_with_object(activity_id)}, - {:ok, end_time} <- NaiveDateTime.from_iso8601(object.data["closed"]), - {_, :lt} <- {:closed_compare, NaiveDateTime.compare(object.updated_at, end_time)}, {_, {:ok, _object}} <- {:refetch, Fetcher.refetch_object(object)} do stream_update(activity_id) @@ -45,7 +43,6 @@ def perform(%Job{args: %{"op" => "refresh", "activity_id" => activity_id}}) do else {:activity, nil} -> {:cancel, :poll_activity_not_found} {:refetch, _} = e -> {:cancel, e} - {:closed_compare, _} -> {:cancel, :poll_finalized} e -> {:error, e} end end diff --git a/test/pleroma/web/mastodon_api/controllers/poll_controller_test.exs b/test/pleroma/web/mastodon_api/controllers/poll_controller_test.exs index 4b236678c0..51af877424 100644 --- a/test/pleroma/web/mastodon_api/controllers/poll_controller_test.exs +++ b/test/pleroma/web/mastodon_api/controllers/poll_controller_test.exs @@ -29,13 +29,16 @@ test "returns poll entity for object id", %{user: user, conn: conn} do id = to_string(object.id) assert %{"id" => ^id, "expired" => false, "multiple" => false} = response + # Local activities should not generate an Oban job to refresh + assert activity.local + refute_enqueued( worker: Pleroma.Workers.PollWorker, args: %{"op" => "refresh", "activity_id" => activity.id} ) end - test "does not create oban job to refresh poll if activity is local", %{conn: conn} do + test "creates an oban job to refresh poll if activity is remote", %{conn: conn} do user = insert(:user, local: false) question = insert(:question, user: user) activity = insert(:question_activity, question: question, local: false) From fa8de790dfbdb2cc7de212be4ecdd2823048ba8f Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Thu, 3 Oct 2024 10:19:10 -0400 Subject: [PATCH 15/19] Remove test superceded by logic change We will not be inserting jobs that should be skipped due to updated_at --- test/pleroma/workers/poll_worker_test.exs | 61 ++++++++--------------- 1 file changed, 21 insertions(+), 40 deletions(-) diff --git a/test/pleroma/workers/poll_worker_test.exs b/test/pleroma/workers/poll_worker_test.exs index c34647f1ba..70eb7c4226 100644 --- a/test/pleroma/workers/poll_worker_test.exs +++ b/test/pleroma/workers/poll_worker_test.exs @@ -53,51 +53,32 @@ test "poll notification job" do end end - describe "poll refresh" do - test "normal job" do - user = insert(:user, local: false) - question = insert(:question, user: user) - activity = insert(:question_activity, question: question) + test "poll refresh" do + user = insert(:user, local: false) + question = insert(:question, user: user) + activity = insert(:question_activity, question: question) - PollWorker.new(%{"op" => "refresh", "activity_id" => activity.id}) - |> Oban.insert() + PollWorker.new(%{"op" => "refresh", "activity_id" => activity.id}) + |> Oban.insert() - expected_job_args = %{"activity_id" => activity.id, "op" => "refresh"} + expected_job_args = %{"activity_id" => activity.id, "op" => "refresh"} - assert_enqueued(args: expected_job_args) - - with_mocks([ - { - Pleroma.Web.Streamer, - [], - [ - stream: fn _, _ -> nil end - ] - } - ]) do - [job] = all_enqueued(worker: PollWorker) - PollWorker.perform(job) - - # Ensure updates are streamed out - assert called(Pleroma.Web.Streamer.stream(["user", "list", "public", "public:local"], :_)) - end - end - - test "when updated_at is after poll closing" do - poll_closed = DateTime.utc_now() |> DateTime.add(-86_400) |> DateTime.to_iso8601() - user = insert(:user, local: false) - question = insert(:question, user: user, closed: poll_closed) - activity = insert(:question_activity, question: question) - - PollWorker.new(%{"op" => "refresh", "activity_id" => activity.id}) - |> Oban.insert() - - expected_job_args = %{"activity_id" => activity.id, "op" => "refresh"} - - assert_enqueued(args: expected_job_args) + assert_enqueued(args: expected_job_args) + with_mocks([ + { + Pleroma.Web.Streamer, + [], + [ + stream: fn _, _ -> nil end + ] + } + ]) do [job] = all_enqueued(worker: PollWorker) - assert {:cancel, :poll_finalized} == PollWorker.perform(job) + PollWorker.perform(job) + + # Ensure updates are streamed out + assert called(Pleroma.Web.Streamer.stream(["user", "list", "public", "public:local"], :_)) end end end From b854e3836fd22a2589a6a6b97478998675d72048 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Thu, 3 Oct 2024 10:30:32 -0400 Subject: [PATCH 16/19] Remove pattern that can never match --- lib/pleroma/workers/poll_worker.ex | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/pleroma/workers/poll_worker.ex b/lib/pleroma/workers/poll_worker.ex index bb92634c98..7d69bea548 100644 --- a/lib/pleroma/workers/poll_worker.ex +++ b/lib/pleroma/workers/poll_worker.ex @@ -43,7 +43,6 @@ def perform(%Job{args: %{"op" => "refresh", "activity_id" => activity_id}}) do else {:activity, nil} -> {:cancel, :poll_activity_not_found} {:refetch, _} = e -> {:cancel, e} - e -> {:error, e} end end From a3038aa6a2189ced1e5c394a4e6e8be76f2644d0 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Thu, 3 Oct 2024 11:01:33 -0400 Subject: [PATCH 17/19] Increase poll refresh interval to 120 seconds --- lib/pleroma/web/mastodon_api/controllers/poll_controller.ex | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/pleroma/web/mastodon_api/controllers/poll_controller.ex b/lib/pleroma/web/mastodon_api/controllers/poll_controller.ex index 4b347a6a7b..6526457df3 100644 --- a/lib/pleroma/web/mastodon_api/controllers/poll_controller.ex +++ b/lib/pleroma/web/mastodon_api/controllers/poll_controller.ex @@ -28,6 +28,7 @@ defmodule Pleroma.Web.MastodonAPI.PollController do defdelegate open_api_operation(action), to: Pleroma.Web.ApiSpec.PollOperation @cachex Pleroma.Config.get([:cachex, :provider], Cachex) + @poll_refresh_interval 120 @doc "GET /api/v1/polls/:id" def show(%{assigns: %{user: user}, private: %{open_api_spex: %{params: %{id: id}}}} = conn, _) do @@ -80,7 +81,7 @@ defp maybe_refresh_poll(%Activity{object: %Object{} = object} = activity) do {:ok, end_time} <- NaiveDateTime.from_iso8601(object.data["closed"]), {_, :lt} <- {:closed_compare, NaiveDateTime.compare(object.updated_at, end_time)} do PollWorker.new(%{"op" => "refresh", "activity_id" => activity.id}) - |> Oban.insert(unique: [period: 60]) + |> Oban.insert(unique: [period: @poll_refresh_interval]) end end end From 03a6e33b81281256f2e9b6ffb75910fdd1a7894f Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Wed, 9 Oct 2024 16:25:58 -0400 Subject: [PATCH 18/19] Skip the final refresh job if the activity is local --- lib/pleroma/workers/poll_worker.ex | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/lib/pleroma/workers/poll_worker.ex b/lib/pleroma/workers/poll_worker.ex index 7d69bea548..a9afe9d63a 100644 --- a/lib/pleroma/workers/poll_worker.ex +++ b/lib/pleroma/workers/poll_worker.ex @@ -22,9 +22,11 @@ defmodule Pleroma.Workers.PollWorker do def perform(%Job{args: %{"op" => "poll_end", "activity_id" => activity_id}}) do with {_, %Activity{} = activity} <- {:activity, Activity.get_by_id(activity_id)}, {:ok, notifications} <- Notification.create_poll_notifications(activity) do - # Schedule a final refresh - __MODULE__.new(%{"op" => "refresh", "activity_id" => activity_id}) - |> Oban.insert() + unless activity.local do + # Schedule a final refresh + __MODULE__.new(%{"op" => "refresh", "activity_id" => activity_id}) + |> Oban.insert() + end Notification.stream(notifications) else From 5b04c2bf131f70120c407f5b4c242e3d245151f8 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Wed, 9 Oct 2024 20:15:00 -0400 Subject: [PATCH 19/19] Test the final refresh behavior of a PollWorker poll_end job --- test/pleroma/workers/poll_worker_test.exs | 32 ++++++++++++++++++++--- test/support/factory.ex | 3 ++- 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/test/pleroma/workers/poll_worker_test.exs b/test/pleroma/workers/poll_worker_test.exs index 70eb7c4226..a7cbbdb83c 100644 --- a/test/pleroma/workers/poll_worker_test.exs +++ b/test/pleroma/workers/poll_worker_test.exs @@ -11,10 +11,10 @@ defmodule Pleroma.Workers.PollWorkerTest do alias Pleroma.Workers.PollWorker - test "poll notification job" do + test "local poll ending notification job" do user = insert(:user) question = insert(:question, user: user) - activity = insert(:question_activity, question: question) + activity = insert(:question_activity, question: question, user: user) PollWorker.schedule_poll_end(activity) @@ -45,14 +45,38 @@ test "poll notification job" do assert called(Pleroma.Web.Streamer.stream(["user", "user:notification"], :_)) assert called(Pleroma.Web.Push.send(:_)) - # Ensure we scheduled a final refresh of the poll - assert_enqueued( + # Skip refreshing polls for local activities + assert activity.local + + refute_enqueued( worker: PollWorker, args: %{"op" => "refresh", "activity_id" => activity.id} ) end end + test "remote poll ending notification job schedules refresh" do + user = insert(:user, local: false) + question = insert(:question, user: user) + activity = insert(:question_activity, question: question, user: user) + + PollWorker.schedule_poll_end(activity) + + expected_job_args = %{"activity_id" => activity.id, "op" => "poll_end"} + + assert_enqueued(args: expected_job_args) + + [job] = all_enqueued(worker: PollWorker) + PollWorker.perform(job) + + refute activity.local + + assert_enqueued( + worker: PollWorker, + args: %{"op" => "refresh", "activity_id" => activity.id} + ) + end + test "poll refresh" do user = insert(:user, local: false) question = insert(:question, user: user) diff --git a/test/support/factory.ex b/test/support/factory.ex index 732ea3143c..91e5805c8e 100644 --- a/test/support/factory.ex +++ b/test/support/factory.ex @@ -510,7 +510,8 @@ def question_activity_factory(attrs \\ %{}) do %Pleroma.Activity{ data: data, actor: data["actor"], - recipients: data["to"] + recipients: data["to"], + local: user.local } |> Map.merge(attrs) end