Schedule a final poll refresh before streaming out the notifications

This commit is contained in:
Mark Felder 2024-09-30 14:17:35 -04:00
parent 4b3f604f95
commit 47ce3a4a96
2 changed files with 12 additions and 2 deletions

View file

@ -22,6 +22,10 @@ defmodule Pleroma.Workers.PollWorker do
def perform(%Job{args: %{"op" => "poll_end", "activity_id" => activity_id}}) do def perform(%Job{args: %{"op" => "poll_end", "activity_id" => activity_id}}) do
with {_, %Activity{} = activity} <- {:activity, Activity.get_by_id(activity_id)}, with {_, %Activity{} = activity} <- {:activity, Activity.get_by_id(activity_id)},
{:ok, notifications} <- Notification.create_poll_notifications(activity) do {:ok, notifications} <- Notification.create_poll_notifications(activity) do
# Schedule a final refresh
__MODULE__.new(%{"op" => "refresh", "activity_id" => activity_id})
|> Oban.insert()
Notification.stream(notifications) Notification.stream(notifications)
else else
{:activity, nil} -> {:cancel, :poll_activity_not_found} {:activity, nil} -> {:cancel, :poll_activity_not_found}
@ -32,8 +36,8 @@ def perform(%Job{args: %{"op" => "poll_end", "activity_id" => activity_id}}) do
def perform(%Job{args: %{"op" => "refresh", "activity_id" => activity_id}}) do def perform(%Job{args: %{"op" => "refresh", "activity_id" => activity_id}}) do
with {_, %Activity{object: object}} <- with {_, %Activity{object: object}} <-
{:activity, Activity.get_by_id_with_object(activity_id)}, {:activity, Activity.get_by_id_with_object(activity_id)},
{:ok, naive_closed} <- NaiveDateTime.from_iso8601(object.data["closed"]), {:ok, naive_closed} <- NaiveDateTime.from_iso8601(object.data["closed"]),
{_, :lt} <- {:closed_compare, NaiveDateTime.compare(object.updated_at, naive_closed)}, {_, :lt} <- {:closed_compare, NaiveDateTime.compare(object.updated_at, naive_closed)},
{_, {:ok, _object}} <- {:refetch, Fetcher.refetch_object(object)} do {_, {:ok, _object}} <- {:refetch, Fetcher.refetch_object(object)} do
stream_update(activity_id) stream_update(activity_id)

View file

@ -44,6 +44,12 @@ test "poll notification job" do
# Ensure notifications were streamed out when job executes # Ensure notifications were streamed out when job executes
assert called(Pleroma.Web.Streamer.stream(["user", "user:notification"], :_)) assert called(Pleroma.Web.Streamer.stream(["user", "user:notification"], :_))
assert called(Pleroma.Web.Push.send(:_)) assert called(Pleroma.Web.Push.send(:_))
# Ensure we scheduled a final refresh of the poll
assert_enqueued(
worker: PollWorker,
args: %{"op" => "refresh", "activity_id" => activity.id}
)
end end
end end
end end