Merge branch 'poll-refresh' into 'develop'

Refactor Poll Refreshing

See merge request pleroma/pleroma!4278
This commit is contained in:
feld 2024-10-10 00:46:28 +00:00
commit 3f3f8bc57a
8 changed files with 143 additions and 179 deletions

View file

@ -0,0 +1 @@
Poll results refreshing is handled asynchronously and will not attempt to keep fetching updates to a closed poll.

View file

@ -99,27 +99,6 @@ defp hashtags_changed?(_, _), do: false
def get_by_id(nil), do: nil def get_by_id(nil), do: nil
def get_by_id(id), do: Repo.get(Object, id) def get_by_id(id), do: Repo.get(Object, id)
@spec get_by_id_and_maybe_refetch(integer(), list()) :: Object.t() | nil
def get_by_id_and_maybe_refetch(id, opts \\ []) do
with %Object{updated_at: updated_at} = object <- get_by_id(id) do
if opts[:interval] &&
NaiveDateTime.diff(NaiveDateTime.utc_now(), updated_at) > opts[:interval] do
case Fetcher.refetch_object(object) do
{:ok, %Object{} = object} ->
object
e ->
Logger.error("Couldn't refresh #{object.data["id"]}:\n#{inspect(e)}")
object
end
else
object
end
else
nil -> nil
end
end
def get_by_ap_id(nil), do: nil def get_by_ap_id(nil), do: nil
def get_by_ap_id(ap_id) do def get_by_ap_id(ap_id) do

View file

@ -12,6 +12,7 @@ defmodule Pleroma.Web.MastodonAPI.PollController do
alias Pleroma.Web.ActivityPub.Visibility alias Pleroma.Web.ActivityPub.Visibility
alias Pleroma.Web.CommonAPI alias Pleroma.Web.CommonAPI
alias Pleroma.Web.Plugs.OAuthScopesPlug alias Pleroma.Web.Plugs.OAuthScopesPlug
alias Pleroma.Workers.PollWorker
action_fallback(Pleroma.Web.MastodonAPI.FallbackController) action_fallback(Pleroma.Web.MastodonAPI.FallbackController)
@ -27,12 +28,16 @@ defmodule Pleroma.Web.MastodonAPI.PollController do
defdelegate open_api_operation(action), to: Pleroma.Web.ApiSpec.PollOperation defdelegate open_api_operation(action), to: Pleroma.Web.ApiSpec.PollOperation
@cachex Pleroma.Config.get([:cachex, :provider], Cachex) @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
@poll_refresh_interval 120
@doc "GET /api/v1/polls/:id" @doc "GET /api/v1/polls/:id"
def show(%{assigns: %{user: user}, private: %{open_api_spex: %{params: %{id: id}}}} = conn, _) do def show(%{assigns: %{user: user}, private: %{open_api_spex: %{params: %{id: id}}}} = conn, _) do
with %Object{} = object <- Object.get_by_id_and_maybe_refetch(id, interval: 60), with %Object{} = object <- Object.get_by_id(id),
%Activity{} = activity <- Activity.get_create_by_object_ap_id(object.data["id"]), %Activity{} = activity <-
Activity.get_create_by_object_ap_id_with_object(object.data["id"]),
true <- Visibility.visible_for_user?(activity, user) do true <- Visibility.visible_for_user?(activity, user) do
maybe_refresh_poll(activity)
try_render(conn, "show.json", %{object: object, for: user}) try_render(conn, "show.json", %{object: object, for: user})
else else
error when is_nil(error) or error == false -> error when is_nil(error) or error == false ->
@ -70,4 +75,13 @@ defp get_cached_vote_or_vote(object, user, choices) do
end end
end) end)
end end
defp maybe_refresh_poll(%Activity{object: %Object{} = object} = activity) do
with false <- activity.local,
{:ok, end_time} <- NaiveDateTime.from_iso8601(object.data["closed"]),
{_, :lt} <- {:closed_compare, NaiveDateTime.compare(object.updated_at, end_time)} do
PollWorker.new(%{"op" => "refresh", "activity_id" => activity.id})
|> Oban.insert(unique: [period: @poll_refresh_interval])
end
end
end end

View file

@ -11,27 +11,46 @@ defmodule Pleroma.Workers.PollWorker do
alias Pleroma.Activity alias Pleroma.Activity
alias Pleroma.Notification alias Pleroma.Notification
alias Pleroma.Object alias Pleroma.Object
alias Pleroma.Object.Fetcher
@stream_out_impl Pleroma.Config.get(
[__MODULE__, :stream_out],
Pleroma.Web.ActivityPub.ActivityPub
)
@impl true @impl true
def perform(%Job{args: %{"op" => "poll_end", "activity_id" => activity_id}}) do def perform(%Job{args: %{"op" => "poll_end", "activity_id" => activity_id}}) do
with %Activity{} = activity <- find_poll_activity(activity_id), with {_, %Activity{} = activity} <- {:activity, Activity.get_by_id(activity_id)},
{:ok, notifications} <- Notification.create_poll_notifications(activity) do {:ok, notifications} <- Notification.create_poll_notifications(activity) do
unless activity.local do
# Schedule a final refresh
__MODULE__.new(%{"op" => "refresh", "activity_id" => activity_id})
|> Oban.insert()
end
Notification.stream(notifications) Notification.stream(notifications)
else else
{:error, :poll_activity_not_found} = e -> {:cancel, e} {:activity, nil} -> {:cancel, :poll_activity_not_found}
e -> {:error, e} e -> {:error, e}
end end
end end
def perform(%Job{args: %{"op" => "refresh", "activity_id" => activity_id}}) do
with {_, %Activity{object: object}} <-
{:activity, Activity.get_by_id_with_object(activity_id)},
{_, {:ok, _object}} <- {:refetch, Fetcher.refetch_object(object)} do
stream_update(activity_id)
:ok
else
{:activity, nil} -> {:cancel, :poll_activity_not_found}
{:refetch, _} = e -> {:cancel, e}
end
end
@impl true @impl true
def timeout(_job), do: :timer.seconds(5) def timeout(_job), do: :timer.seconds(5)
defp find_poll_activity(activity_id) do
with nil <- Activity.get_by_id(activity_id) do
{:error, :poll_activity_not_found}
end
end
def schedule_poll_end(%Activity{data: %{"type" => "Create"}, id: activity_id} = activity) do def schedule_poll_end(%Activity{data: %{"type" => "Create"}, id: activity_id} = activity) do
with %Object{data: %{"type" => "Question", "closed" => closed}} when is_binary(closed) <- with %Object{data: %{"type" => "Question", "closed" => closed}} when is_binary(closed) <-
Object.normalize(activity), Object.normalize(activity),
@ -49,4 +68,10 @@ def schedule_poll_end(%Activity{data: %{"type" => "Create"}, id: activity_id} =
end end
def schedule_poll_end(activity), do: {:error, activity} def schedule_poll_end(activity), do: {:error, activity}
defp stream_update(activity_id) do
Activity.get_by_id(activity_id)
|> Activity.normalize()
|> @stream_out_impl.stream_out()
end
end end

View file

@ -6,12 +6,10 @@ defmodule Pleroma.ObjectTest do
use Pleroma.DataCase use Pleroma.DataCase
use Oban.Testing, repo: Pleroma.Repo use Oban.Testing, repo: Pleroma.Repo
import ExUnit.CaptureLog
import Mox import Mox
import Pleroma.Factory import Pleroma.Factory
import Tesla.Mock import Tesla.Mock
alias Pleroma.Activity
alias Pleroma.Hashtag alias Pleroma.Hashtag
alias Pleroma.Object alias Pleroma.Object
alias Pleroma.Repo alias Pleroma.Repo
@ -282,148 +280,6 @@ test "does not fetch unknown objects when fetch is false" do
end end
end end
describe "get_by_id_and_maybe_refetch" do
setup do
mock(fn
%{method: :get, url: "https://patch.cx/objects/9a172665-2bc5-452d-8428-2361d4c33b1d"} ->
%Tesla.Env{
status: 200,
body: File.read!("test/fixtures/tesla_mock/poll_original.json"),
headers: HttpRequestMock.activitypub_object_headers()
}
env ->
apply(HttpRequestMock, :request, [env])
end)
mock_modified = fn resp ->
mock(fn
%{method: :get, url: "https://patch.cx/objects/9a172665-2bc5-452d-8428-2361d4c33b1d"} ->
resp
env ->
apply(HttpRequestMock, :request, [env])
end)
end
on_exit(fn -> mock(fn env -> apply(HttpRequestMock, :request, [env]) end) end)
[mock_modified: mock_modified]
end
test "refetches if the time since the last refetch is greater than the interval", %{
mock_modified: mock_modified
} do
%Object{} =
object =
Object.normalize("https://patch.cx/objects/9a172665-2bc5-452d-8428-2361d4c33b1d",
fetch: true
)
Object.set_cache(object)
assert Enum.at(object.data["oneOf"], 0)["replies"]["totalItems"] == 4
assert Enum.at(object.data["oneOf"], 1)["replies"]["totalItems"] == 0
mock_modified.(%Tesla.Env{
status: 200,
body: File.read!("test/fixtures/tesla_mock/poll_modified.json"),
headers: HttpRequestMock.activitypub_object_headers()
})
updated_object = Object.get_by_id_and_maybe_refetch(object.id, interval: -1)
object_in_cache = Object.get_cached_by_ap_id(object.data["id"])
assert updated_object == object_in_cache
assert Enum.at(updated_object.data["oneOf"], 0)["replies"]["totalItems"] == 8
assert Enum.at(updated_object.data["oneOf"], 1)["replies"]["totalItems"] == 3
end
test "returns the old object if refetch fails", %{mock_modified: mock_modified} do
%Object{} =
object =
Object.normalize("https://patch.cx/objects/9a172665-2bc5-452d-8428-2361d4c33b1d",
fetch: true
)
Object.set_cache(object)
assert Enum.at(object.data["oneOf"], 0)["replies"]["totalItems"] == 4
assert Enum.at(object.data["oneOf"], 1)["replies"]["totalItems"] == 0
assert capture_log(fn ->
mock_modified.(%Tesla.Env{status: 404, body: ""})
updated_object = Object.get_by_id_and_maybe_refetch(object.id, interval: -1)
object_in_cache = Object.get_cached_by_ap_id(object.data["id"])
assert updated_object == object_in_cache
assert Enum.at(updated_object.data["oneOf"], 0)["replies"]["totalItems"] == 4
assert Enum.at(updated_object.data["oneOf"], 1)["replies"]["totalItems"] == 0
end) =~
"[error] Couldn't refresh https://patch.cx/objects/9a172665-2bc5-452d-8428-2361d4c33b1d"
end
test "does not refetch if the time since the last refetch is greater than the interval", %{
mock_modified: mock_modified
} do
%Object{} =
object =
Object.normalize("https://patch.cx/objects/9a172665-2bc5-452d-8428-2361d4c33b1d",
fetch: true
)
Object.set_cache(object)
assert Enum.at(object.data["oneOf"], 0)["replies"]["totalItems"] == 4
assert Enum.at(object.data["oneOf"], 1)["replies"]["totalItems"] == 0
mock_modified.(%Tesla.Env{
status: 200,
body: File.read!("test/fixtures/tesla_mock/poll_modified.json"),
headers: HttpRequestMock.activitypub_object_headers()
})
updated_object = Object.get_by_id_and_maybe_refetch(object.id, interval: 100)
object_in_cache = Object.get_cached_by_ap_id(object.data["id"])
assert updated_object == object_in_cache
assert Enum.at(updated_object.data["oneOf"], 0)["replies"]["totalItems"] == 4
assert Enum.at(updated_object.data["oneOf"], 1)["replies"]["totalItems"] == 0
end
test "preserves internal fields on refetch", %{mock_modified: mock_modified} do
%Object{} =
object =
Object.normalize("https://patch.cx/objects/9a172665-2bc5-452d-8428-2361d4c33b1d",
fetch: true
)
Object.set_cache(object)
assert Enum.at(object.data["oneOf"], 0)["replies"]["totalItems"] == 4
assert Enum.at(object.data["oneOf"], 1)["replies"]["totalItems"] == 0
user = insert(:user)
activity = Activity.get_create_by_object_ap_id(object.data["id"])
{:ok, activity} = CommonAPI.favorite(activity.id, user)
object = Object.get_by_ap_id(activity.data["object"])
assert object.data["like_count"] == 1
mock_modified.(%Tesla.Env{
status: 200,
body: File.read!("test/fixtures/tesla_mock/poll_modified.json"),
headers: HttpRequestMock.activitypub_object_headers()
})
updated_object = Object.get_by_id_and_maybe_refetch(object.id, interval: -1)
object_in_cache = Object.get_cached_by_ap_id(object.data["id"])
assert updated_object == object_in_cache
assert Enum.at(updated_object.data["oneOf"], 0)["replies"]["totalItems"] == 8
assert Enum.at(updated_object.data["oneOf"], 1)["replies"]["totalItems"] == 3
assert updated_object.data["like_count"] == 1
end
end
describe ":hashtags association" do describe ":hashtags association" do
test "Hashtag records are created with Object record and updated on its change" do test "Hashtag records are created with Object record and updated on its change" do
user = insert(:user) user = insert(:user)

View file

@ -3,6 +3,7 @@
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.MastodonAPI.PollControllerTest do defmodule Pleroma.Web.MastodonAPI.PollControllerTest do
use Oban.Testing, repo: Pleroma.Repo
use Pleroma.Web.ConnCase, async: true use Pleroma.Web.ConnCase, async: true
alias Pleroma.Object alias Pleroma.Object
@ -27,6 +28,33 @@ test "returns poll entity for object id", %{user: user, conn: conn} do
response = json_response_and_validate_schema(conn, 200) response = json_response_and_validate_schema(conn, 200)
id = to_string(object.id) id = to_string(object.id)
assert %{"id" => ^id, "expired" => false, "multiple" => false} = response assert %{"id" => ^id, "expired" => false, "multiple" => false} = response
# Local activities should not generate an Oban job to refresh
assert activity.local
refute_enqueued(
worker: Pleroma.Workers.PollWorker,
args: %{"op" => "refresh", "activity_id" => activity.id}
)
end
test "creates an oban job to refresh poll if activity is remote", %{conn: conn} do
user = insert(:user, local: false)
question = insert(:question, user: user)
activity = insert(:question_activity, question: question, local: false)
# Ensure this is not represented as a local activity
refute activity.local
object = Object.normalize(activity, fetch: false)
get(conn, "/api/v1/polls/#{object.id}")
|> json_response_and_validate_schema(200)
assert_enqueued(
worker: Pleroma.Workers.PollWorker,
args: %{"op" => "refresh", "activity_id" => activity.id}
)
end end
test "does not expose polls for private statuses", %{conn: conn} do test "does not expose polls for private statuses", %{conn: conn} do

View file

@ -11,10 +11,10 @@ defmodule Pleroma.Workers.PollWorkerTest do
alias Pleroma.Workers.PollWorker alias Pleroma.Workers.PollWorker
test "poll notification job" do test "local poll ending notification job" do
user = insert(:user) user = insert(:user)
question = insert(:question, user: user) question = insert(:question, user: user)
activity = insert(:question_activity, question: question) activity = insert(:question_activity, question: question, user: user)
PollWorker.schedule_poll_end(activity) PollWorker.schedule_poll_end(activity)
@ -44,6 +44,65 @@ test "poll notification job" do
# Ensure notifications were streamed out when job executes # Ensure notifications were streamed out when job executes
assert called(Pleroma.Web.Streamer.stream(["user", "user:notification"], :_)) assert called(Pleroma.Web.Streamer.stream(["user", "user:notification"], :_))
assert called(Pleroma.Web.Push.send(:_)) assert called(Pleroma.Web.Push.send(:_))
# Skip refreshing polls for local activities
assert activity.local
refute_enqueued(
worker: PollWorker,
args: %{"op" => "refresh", "activity_id" => activity.id}
)
end
end
test "remote poll ending notification job schedules refresh" do
user = insert(:user, local: false)
question = insert(:question, user: user)
activity = insert(:question_activity, question: question, user: user)
PollWorker.schedule_poll_end(activity)
expected_job_args = %{"activity_id" => activity.id, "op" => "poll_end"}
assert_enqueued(args: expected_job_args)
[job] = all_enqueued(worker: PollWorker)
PollWorker.perform(job)
refute activity.local
assert_enqueued(
worker: PollWorker,
args: %{"op" => "refresh", "activity_id" => activity.id}
)
end
test "poll refresh" do
user = insert(:user, local: false)
question = insert(:question, user: user)
activity = insert(:question_activity, question: question)
PollWorker.new(%{"op" => "refresh", "activity_id" => activity.id})
|> Oban.insert()
expected_job_args = %{"activity_id" => activity.id, "op" => "refresh"}
assert_enqueued(args: expected_job_args)
with_mocks([
{
Pleroma.Web.Streamer,
[],
[
stream: fn _, _ -> nil end
]
}
]) do
[job] = all_enqueued(worker: PollWorker)
PollWorker.perform(job)
# Ensure updates are streamed out
assert called(Pleroma.Web.Streamer.stream(["user", "list", "public", "public:local"], :_))
end end
end end
end end

View file

@ -241,6 +241,7 @@ def tombstone_factory do
def question_factory(attrs \\ %{}) do def question_factory(attrs \\ %{}) do
user = attrs[:user] || insert(:user) user = attrs[:user] || insert(:user)
closed = attrs[:closed] || DateTime.utc_now() |> DateTime.add(86_400) |> DateTime.to_iso8601()
data = %{ data = %{
"id" => Pleroma.Web.ActivityPub.Utils.generate_object_id(), "id" => Pleroma.Web.ActivityPub.Utils.generate_object_id(),
@ -251,7 +252,7 @@ def question_factory(attrs \\ %{}) do
"to" => ["https://www.w3.org/ns/activitystreams#Public"], "to" => ["https://www.w3.org/ns/activitystreams#Public"],
"cc" => [user.follower_address], "cc" => [user.follower_address],
"context" => Pleroma.Web.ActivityPub.Utils.generate_context_id(), "context" => Pleroma.Web.ActivityPub.Utils.generate_context_id(),
"closed" => DateTime.utc_now() |> DateTime.add(86_400) |> DateTime.to_iso8601(), "closed" => closed,
"content" => "Which flavor of ice cream do you prefer?", "content" => "Which flavor of ice cream do you prefer?",
"oneOf" => [ "oneOf" => [
%{ %{
@ -509,7 +510,8 @@ def question_activity_factory(attrs \\ %{}) do
%Pleroma.Activity{ %Pleroma.Activity{
data: data, data: data,
actor: data["actor"], actor: data["actor"],
recipients: data["to"] recipients: data["to"],
local: user.local
} }
|> Map.merge(attrs) |> Map.merge(attrs)
end end