From c077a14ce1343f5515fa11938df7d808f23a566c Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Mon, 30 Sep 2024 13:54:56 -0400 Subject: [PATCH] Add Oban job to handle poll refreshing and stream out the update --- .../controllers/poll_controller.ex | 4 +++ lib/pleroma/workers/poll_worker.ex | 36 ++++++++++++++----- 2 files changed, 32 insertions(+), 8 deletions(-) diff --git a/lib/pleroma/web/mastodon_api/controllers/poll_controller.ex b/lib/pleroma/web/mastodon_api/controllers/poll_controller.ex index 303b995f66..0d5a575184 100644 --- a/lib/pleroma/web/mastodon_api/controllers/poll_controller.ex +++ b/lib/pleroma/web/mastodon_api/controllers/poll_controller.ex @@ -9,6 +9,7 @@ defmodule Pleroma.Web.MastodonAPI.PollController do alias Pleroma.Activity alias Pleroma.Object + alias Pleroma.Workers.PollWorker alias Pleroma.Web.ActivityPub.Visibility alias Pleroma.Web.CommonAPI alias Pleroma.Web.Plugs.OAuthScopesPlug @@ -33,6 +34,9 @@ def show(%{assigns: %{user: user}, private: %{open_api_spex: %{params: %{id: id} with %Object{} = object <- Object.get_by_id(id), %Activity{} = activity <- Activity.get_create_by_object_ap_id(object.data["id"]), true <- Visibility.visible_for_user?(activity, user) do + PollWorker.new(%{"op" => "refresh", "activity_id" => activity.id}) + |> Oban.insert(unique: [period: 60]) + try_render(conn, "show.json", %{object: object, for: user}) else error when is_nil(error) or error == false -> diff --git a/lib/pleroma/workers/poll_worker.ex b/lib/pleroma/workers/poll_worker.ex index d263aa1b9e..0d2d67326a 100644 --- a/lib/pleroma/workers/poll_worker.ex +++ b/lib/pleroma/workers/poll_worker.ex @@ -11,14 +11,34 @@ defmodule Pleroma.Workers.PollWorker do alias Pleroma.Activity alias Pleroma.Notification alias Pleroma.Object + alias Pleroma.Object.Fetcher + + @stream_out_impl Pleroma.Config.get( + [__MODULE__, :stream_out], + Pleroma.Web.ActivityPub.ActivityPub + ) @impl true def perform(%Job{args: %{"op" => "poll_end", "activity_id" => activity_id}}) do - with %Activity{} = activity <- find_poll_activity(activity_id), + with {_, %Activity{} = activity} <- {:activity, Activity.get_by_id(activity_id)}, {:ok, notifications} <- Notification.create_poll_notifications(activity) do Notification.stream(notifications) else - {:error, :poll_activity_not_found} = e -> {:cancel, e} + {:activity, nil} -> {:cancel, :poll_activity_not_found} + e -> {:error, e} + end + end + + def perform(%Job{args: %{"op" => "refresh", "activity_id" => activity_id}}) do + with {_, %Activity{object: object}} <- + {:activity, Activity.get_by_id_with_object(activity_id)}, + {_, {:ok, _object}} <- {:refetch, Fetcher.refetch_object(object)} do + stream_update(activity_id) + + :ok + else + {:activity, nil} -> {:cancel, :poll_activity_not_found} + {:refetch, _} = e -> {:cancel, e} e -> {:error, e} end end @@ -26,12 +46,6 @@ def perform(%Job{args: %{"op" => "poll_end", "activity_id" => activity_id}}) do @impl true def timeout(_job), do: :timer.seconds(5) - defp find_poll_activity(activity_id) do - with nil <- Activity.get_by_id(activity_id) do - {:error, :poll_activity_not_found} - end - end - def schedule_poll_end(%Activity{data: %{"type" => "Create"}, id: activity_id} = activity) do with %Object{data: %{"type" => "Question", "closed" => closed}} when is_binary(closed) <- Object.normalize(activity), @@ -49,4 +63,10 @@ def schedule_poll_end(%Activity{data: %{"type" => "Create"}, id: activity_id} = end def schedule_poll_end(activity), do: {:error, activity} + + defp stream_update(activity_id) do + Activity.get_by_id(activity_id) + |> Activity.normalize() + |> @stream_out_impl.stream_out() + end end