From 05a20c8d6d45315effe2fdd9f20c04dba2328696 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Mon, 5 Sep 2022 12:24:14 -0500 Subject: [PATCH 1/3] Create notifications async with NotificationWorker, #91 --- config/config.exs | 1 + lib/pleroma/web/activity_pub/activity_pub.ex | 4 +-- lib/pleroma/workers/notification_worker.ex | 27 ++++++++++++++++++++ 3 files changed, 30 insertions(+), 2 deletions(-) create mode 100644 lib/pleroma/workers/notification_worker.ex diff --git a/config/config.exs b/config/config.exs index 321ea40d0d..786f52c1b8 100644 --- a/config/config.exs +++ b/config/config.exs @@ -578,6 +578,7 @@ transmogrifier: 20, scheduled_activities: 10, poll_notifications: 10, + notifications: 20, background: 5, remote_fetcher: 2, attachments_cleanup: 1, diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index 317c23e219..1bbde160ef 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -12,7 +12,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do alias Pleroma.Filter alias Pleroma.Hashtag alias Pleroma.Maps - alias Pleroma.Notification alias Pleroma.Object alias Pleroma.Object.Containment alias Pleroma.Object.Fetcher @@ -25,6 +24,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do alias Pleroma.Web.Streamer alias Pleroma.Web.WebFinger alias Pleroma.Workers.BackgroundWorker + alias Pleroma.Workers.NotificationWorker alias Pleroma.Workers.PollWorker import Ecto.Changeset @@ -201,7 +201,7 @@ defp insert_activity_with_expiration(data, local, recipients) do end def notify_and_stream(activity) do - Notification.create_notifications(activity) + NotificationWorker.enqueue("create", %{"activity_id" => activity.id}) conversation = create_or_bump_conversation(activity, activity.actor) participations = get_participations(conversation) diff --git a/lib/pleroma/workers/notification_worker.ex b/lib/pleroma/workers/notification_worker.ex new file mode 100644 index 0000000000..a94c5a70e5 --- /dev/null +++ b/lib/pleroma/workers/notification_worker.ex @@ -0,0 +1,27 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2022 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.NotificationWorker do + @moduledoc """ + Creates notifications for an Activity. + """ + use Pleroma.Workers.WorkerHelper, queue: "notifications" + + alias Pleroma.Activity + alias Pleroma.Notification + + @impl Oban.Worker + @spec perform(Oban.Job.t()) :: {:error, :activity_not_found} | {:ok, [Pleroma.Notification.t()]} + def perform(%Job{args: %{"op" => "create", "activity_id" => activity_id}}) do + with %Activity{} = activity <- find_activity(activity_id) do + Notification.create_notifications(activity) + end + end + + defp find_activity(activity_id) do + with nil <- Activity.get_by_id(activity_id) do + {:error, :activity_not_found} + end + end +end From 5b8b21b9e967cfc12ca30a6884ff659d71e3ab4f Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Mon, 5 Sep 2022 14:16:27 -0500 Subject: [PATCH 2/3] Fix tests for async notifications --- test/mix/tasks/pleroma/digest_test.exs | 2 + .../notification_backfill_test.exs | 2 + test/pleroma/notification_test.exs | 37 ++++++++++++++++++- test/pleroma/user_test.exs | 4 ++ .../web/activity_pub/activity_pub_test.exs | 2 +- test/pleroma/web/common_api_test.exs | 2 + .../notification_controller_test.exs | 18 ++++++++- .../notification_controller_test.exs | 1 + test/pleroma/web/push/impl_test.exs | 3 ++ .../cron/digest_emails_worker_test.exs | 1 + 10 files changed, 68 insertions(+), 4 deletions(-) diff --git a/test/mix/tasks/pleroma/digest_test.exs b/test/mix/tasks/pleroma/digest_test.exs index d2a8606c7d..37ac10738f 100644 --- a/test/mix/tasks/pleroma/digest_test.exs +++ b/test/mix/tasks/pleroma/digest_test.exs @@ -35,6 +35,8 @@ test "Sends digest to the given user" do }) end) + ObanHelpers.perform_all() + yesterday = NaiveDateTime.add( NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second), diff --git a/test/pleroma/migration_helper/notification_backfill_test.exs b/test/pleroma/migration_helper/notification_backfill_test.exs index 6d47bb6a84..946931bba1 100644 --- a/test/pleroma/migration_helper/notification_backfill_test.exs +++ b/test/pleroma/migration_helper/notification_backfill_test.exs @@ -24,6 +24,8 @@ test "it fills in missing notification types" do {:ok, like} = CommonAPI.favorite(other_user, post.id) {:ok, react_2} = CommonAPI.react_with_emoji(post.id, other_user, "☕") + Pleroma.Tests.ObanHelpers.perform_all() + data = react_2.data |> Map.put("type", "EmojiReaction") diff --git a/test/pleroma/notification_test.exs b/test/pleroma/notification_test.exs index 3169d06c1a..89e95d6a4b 100644 --- a/test/pleroma/notification_test.exs +++ b/test/pleroma/notification_test.exs @@ -84,6 +84,8 @@ test "notifies someone when they are directly addressed" do {:ok, [notification, other_notification]} = Notification.create_notifications(activity) + Pleroma.Tests.ObanHelpers.perform_all() + notified_ids = Enum.sort([notification.user_id, other_notification.user_id]) assert notified_ids == [other_user.id, third_user.id] assert notification.activity_id == activity.id @@ -122,6 +124,8 @@ test "does not create a notification for subscribed users if status is a reply" in_reply_to_status_id: activity.id }) + Pleroma.Tests.ObanHelpers.perform_all() + user_notifications = Notification.for_user(user) assert length(user_notifications) == 1 @@ -154,6 +158,8 @@ test "create_poll_notifications/1" do {:ok, _activity} = CommonAPI.post(user, %{status: "hey @#{blocker.nickname}!"}) + Pleroma.Tests.ObanHelpers.perform_all() + blocker_id = blocker.id assert [%Notification{user_id: ^blocker_id}] = Repo.all(Notification) refute called(Push.send(:_)) @@ -168,6 +174,7 @@ test "create_poll_notifications/1" do {:ok, _user_relationships} = User.mute(muter, user) {:ok, _activity} = CommonAPI.post(user, %{status: "hey @#{muter.nickname}!"}) + Pleroma.Tests.ObanHelpers.perform_all() muter_id = muter.id assert [%Notification{user_id: ^muter_id}] = Repo.all(Notification) @@ -183,6 +190,9 @@ test "create_poll_notifications/1" do {:ok, activity} = CommonAPI.post(user, %{status: "hey @#{thread_muter.nickname}!"}) + Pleroma.Tests.ObanHelpers.perform_all() + [pre_mute_notification] = Repo.all(Notification) + {:ok, _} = CommonAPI.add_mute(thread_muter, activity) {:ok, _same_context_activity} = @@ -191,8 +201,15 @@ test "create_poll_notifications/1" do in_reply_to_status_id: activity.id }) - [pre_mute_notification, post_mute_notification] = - Repo.all(from(n in Notification, where: n.user_id == ^thread_muter.id, order_by: n.id)) + Pleroma.Tests.ObanHelpers.perform_all() + + [post_mute_notification] = + Repo.all( + from(n in Notification, + where: n.id != ^pre_mute_notification.id and n.user_id == ^thread_muter.id, + order_by: n.id + ) + ) pre_mute_notification_id = pre_mute_notification.id post_mute_notification_id = post_mute_notification.id @@ -535,6 +552,7 @@ test "clears all notifications of a certain type for a given user" do Notification.destroy_multiple_from_types(user1, ["pleroma:report"]) + Pleroma.Tests.ObanHelpers.perform_all() assert [%Pleroma.Notification{type: "mention"}] = Notification.for_user(user1) assert [%Pleroma.Notification{type: "pleroma:report"}] = Notification.for_user(user2) end @@ -554,6 +572,8 @@ test "it sets all notifications as read up to a specified notification ID" do status: "hey again @#{other_user.nickname}!" }) + Pleroma.Tests.ObanHelpers.perform_all() + [n2, n1] = Notification.for_user(other_user) assert n2.id > n1.id @@ -563,6 +583,8 @@ test "it sets all notifications as read up to a specified notification ID" do status: "hey yet again @#{other_user.nickname}!" }) + Pleroma.Tests.ObanHelpers.perform_all() + [_, read_notification] = Notification.set_read_up_to(other_user, n2.id) assert read_notification.activity.object @@ -594,6 +616,8 @@ test "it send updated marker to the 'user' and the 'user:notification' stream" d status: "hi @#{user.nickname}!" }) + Pleroma.Tests.ObanHelpers.perform_all() + [%{id: notification_id}] = Notification.for_user(user) notification_id = to_string(notification_id) @@ -986,6 +1010,7 @@ test "replying to a deleted post without tagging does not generate a notificatio in_reply_to_status_id: activity.id }) + Pleroma.Tests.ObanHelpers.perform_all() assert Enum.empty?(Notification.for_user(user)) end @@ -996,6 +1021,7 @@ test "notifications are deleted if a local user is deleted" do {:ok, _activity} = CommonAPI.post(user, %{status: "hi @#{other_user.nickname}", visibility: "direct"}) + Pleroma.Tests.ObanHelpers.perform_all() refute Enum.empty?(Notification.for_user(other_user)) {:ok, job} = User.delete(user) @@ -1100,6 +1126,8 @@ test "it returns notifications for muted user without notifications", %{user: us {:ok, _activity} = CommonAPI.post(muted, %{status: "hey @#{user.nickname}"}) + Pleroma.Tests.ObanHelpers.perform_all() + [notification] = Notification.for_user(user) assert notification.activity.object @@ -1142,6 +1170,7 @@ test "it returns notifications for domain-blocked but followed user" do {:ok, _activity} = CommonAPI.post(blocked, %{status: "hey @#{user.nickname}"}) + Pleroma.Tests.ObanHelpers.perform_all() assert length(Notification.for_user(user)) == 1 end @@ -1160,6 +1189,8 @@ test "it returns notifications from a muted user when with_muted is set", %{user {:ok, _activity} = CommonAPI.post(muted, %{status: "hey @#{user.nickname}"}) + Pleroma.Tests.ObanHelpers.perform_all() + assert length(Notification.for_user(user, %{with_muted: true})) == 1 end @@ -1189,6 +1220,7 @@ test "it returns notifications from muted threads when with_muted is set", %{use another_user = insert(:user) {:ok, activity} = CommonAPI.post(another_user, %{status: "hey @#{user.nickname}"}) + Pleroma.Tests.ObanHelpers.perform_all() {:ok, _} = Pleroma.ThreadMute.add_mute(user.id, activity.data["context"]) assert length(Notification.for_user(user, %{with_muted: true})) == 1 @@ -1208,6 +1240,7 @@ test "it returns notifications about mentions with not hidden filtered word", %{ another_user = insert(:user) {:ok, _} = CommonAPI.post(another_user, %{status: "@#{user.nickname} test"}) + Pleroma.Tests.ObanHelpers.perform_all() assert length(Notification.for_user(user)) == 1 end diff --git a/test/pleroma/user_test.exs b/test/pleroma/user_test.exs index 063f168f64..a9ec280b3a 100644 --- a/test/pleroma/user_test.exs +++ b/test/pleroma/user_test.exs @@ -1504,6 +1504,8 @@ test "hide a user's statuses from timelines and notifications" do {:ok, activity} = CommonAPI.post(user, %{status: "hey @#{user2.nickname}"}) + Pleroma.Tests.ObanHelpers.perform_all() + activity = Repo.preload(activity, :bookmark) [notification] = Pleroma.Notification.for_user(user2) @@ -2142,6 +2144,8 @@ test "Only includes users with no read notifications" do }) end) + Pleroma.Tests.ObanHelpers.perform_all() + Enum.each(active, fn user -> [n1, _n2] = Pleroma.Notification.for_user(user) {:ok, _} = Pleroma.Notification.read_one(user, n1.id) diff --git a/test/pleroma/web/activity_pub/activity_pub_test.exs b/test/pleroma/web/activity_pub/activity_pub_test.exs index 01c02dab3e..d1e8777166 100644 --- a/test/pleroma/web/activity_pub/activity_pub_test.exs +++ b/test/pleroma/web/activity_pub/activity_pub_test.exs @@ -1927,7 +1927,7 @@ test "create" do assert_enqueued(worker: Pleroma.Workers.BackgroundWorker, args: params) - Pleroma.Workers.BackgroundWorker.perform(%Oban.Job{args: params}) + Pleroma.Tests.ObanHelpers.perform_all() refute User.following?(follower, old_user) assert User.following?(follower, new_user) diff --git a/test/pleroma/web/common_api_test.exs b/test/pleroma/web/common_api_test.exs index de13f4d580..d799a91d0d 100644 --- a/test/pleroma/web/common_api_test.exs +++ b/test/pleroma/web/common_api_test.exs @@ -1052,6 +1052,8 @@ test "marks notifications as read after mute" do {:ok, favorite_activity} = CommonAPI.favorite(friend2, activity.id) {:ok, repeat_activity} = CommonAPI.repeat(activity.id, friend1) + Pleroma.Tests.ObanHelpers.perform_all() + assert Repo.aggregate( from(n in Notification, where: n.seen == false and n.user_id == ^friend1.id), :count diff --git a/test/pleroma/web/mastodon_api/controllers/notification_controller_test.exs b/test/pleroma/web/mastodon_api/controllers/notification_controller_test.exs index 4036284886..57ef7e330f 100644 --- a/test/pleroma/web/mastodon_api/controllers/notification_controller_test.exs +++ b/test/pleroma/web/mastodon_api/controllers/notification_controller_test.exs @@ -86,6 +86,8 @@ test "by default, does not contain pleroma:report" do {:ok, _report} = CommonAPI.report(third_user, %{account_id: other_user.id, status_ids: [activity.id]}) + Pleroma.Tests.ObanHelpers.perform_all() + result = conn |> get("/api/v1/notifications") @@ -196,6 +198,7 @@ test "paginates notifications using min_id, since_id, max_id, and limit" do {:ok, activity2} = CommonAPI.post(other_user, %{status: "hi @#{user.nickname}"}) {:ok, activity3} = CommonAPI.post(other_user, %{status: "hi @#{user.nickname}"}) {:ok, activity4} = CommonAPI.post(other_user, %{status: "hi @#{user.nickname}"}) + Pleroma.Tests.ObanHelpers.perform_all() notification1_id = get_notification_id_by_activity(activity1) notification2_id = get_notification_id_by_activity(activity2) @@ -246,6 +249,8 @@ test "filters notifications for mentions" do {:ok, private_activity} = CommonAPI.post(other_user, %{status: "@#{user.nickname}", visibility: "private"}) + Pleroma.Tests.ObanHelpers.perform_all() + query = params_to_query(%{exclude_visibilities: ["public", "unlisted", "private"]}) conn_res = get(conn, "/api/v1/notifications?" <> query) @@ -375,13 +380,15 @@ test "doesn't return less than the requested amount of records when the user's r {:ok, _favorite} = CommonAPI.favorite(user, reply.id) + Pleroma.Tests.ObanHelpers.perform_all() + activity_ids = conn |> get("/api/v1/notifications?exclude_visibilities[]=direct&limit=2") |> json_response_and_validate_schema(200) |> Enum.map(& &1["status"]["id"]) - assert [reply.id, mention.id] == activity_ids + assert MapSet.new([reply.id, mention.id]) == MapSet.new(activity_ids) end end @@ -394,6 +401,7 @@ test "filters notifications using exclude_types" do {:ok, favorite_activity} = CommonAPI.favorite(other_user, create_activity.id) {:ok, reblog_activity} = CommonAPI.repeat(create_activity.id, other_user) {:ok, _, _, follow_activity} = CommonAPI.follow(other_user, user) + Pleroma.Tests.ObanHelpers.perform_all() mention_notification_id = get_notification_id_by_activity(mention_activity) favorite_notification_id = get_notification_id_by_activity(favorite_activity) @@ -432,6 +440,7 @@ test "filters notifications using types" do {:ok, favorite_activity} = CommonAPI.favorite(other_user, create_activity.id) {:ok, reblog_activity} = CommonAPI.repeat(create_activity.id, other_user) {:ok, _, _, follow_activity} = CommonAPI.follow(other_user, user) + Pleroma.Tests.ObanHelpers.perform_all() mention_notification_id = get_notification_id_by_activity(mention_activity) favorite_notification_id = get_notification_id_by_activity(favorite_activity) @@ -495,6 +504,7 @@ test "destroy multiple" do {:ok, activity2} = CommonAPI.post(other_user, %{status: "hi @#{user.nickname}"}) {:ok, activity3} = CommonAPI.post(user, %{status: "hi @#{other_user.nickname}"}) {:ok, activity4} = CommonAPI.post(user, %{status: "hi @#{other_user.nickname}"}) + Pleroma.Tests.ObanHelpers.perform_all() notification1_id = get_notification_id_by_activity(activity1) notification2_id = get_notification_id_by_activity(activity2) @@ -539,6 +549,7 @@ test "doesn't see notifications after muting user with notifications" do {:ok, _, _, _} = CommonAPI.follow(user, user2) {:ok, _} = CommonAPI.post(user2, %{status: "hey @#{user.nickname}"}) + Pleroma.Tests.ObanHelpers.perform_all() ret_conn = get(conn, "/api/v1/notifications") @@ -557,6 +568,7 @@ test "see notifications after muting user without notifications" do {:ok, _, _, _} = CommonAPI.follow(user, user2) {:ok, _} = CommonAPI.post(user2, %{status: "hey @#{user.nickname}"}) + Pleroma.Tests.ObanHelpers.perform_all() ret_conn = get(conn, "/api/v1/notifications") @@ -575,6 +587,7 @@ test "see notifications after muting user with notifications and with_muted para {:ok, _, _, _} = CommonAPI.follow(user, user2) {:ok, _} = CommonAPI.post(user2, %{status: "hey @#{user.nickname}"}) + Pleroma.Tests.ObanHelpers.perform_all() ret_conn = get(conn, "/api/v1/notifications") @@ -618,6 +631,8 @@ test "preserves parameters in link headers" do visibility: "public" }) + Pleroma.Tests.ObanHelpers.perform_all() + notification1 = Repo.get_by(Notification, activity_id: activity1.id) notification2 = Repo.get_by(Notification, activity_id: activity2.id) @@ -642,6 +657,7 @@ test "account_id" do {:ok, _activity} = CommonAPI.post(other_user1, %{status: "hi @#{user.nickname}"}) {:ok, _activity} = CommonAPI.post(other_user2, %{status: "bye @#{user.nickname}"}) + Pleroma.Tests.ObanHelpers.perform_all() assert [%{"account" => %{"id" => ^account_id}}] = conn diff --git a/test/pleroma/web/pleroma_api/controllers/notification_controller_test.exs b/test/pleroma/web/pleroma_api/controllers/notification_controller_test.exs index b8c7964f9c..76a01bcd84 100644 --- a/test/pleroma/web/pleroma_api/controllers/notification_controller_test.exs +++ b/test/pleroma/web/pleroma_api/controllers/notification_controller_test.exs @@ -37,6 +37,7 @@ test "it marks multiple notifications as read", %{user: user1, conn: conn} do {:ok, _activity1} = CommonAPI.post(user2, %{status: "hi @#{user1.nickname}"}) {:ok, _activity2} = CommonAPI.post(user2, %{status: "hi @#{user1.nickname}"}) {:ok, _activity3} = CommonAPI.post(user2, %{status: "HIE @#{user1.nickname}"}) + Pleroma.Tests.ObanHelpers.perform_all() [notification3, notification2, notification1] = Notification.for_user(user1, %{limit: 3}) diff --git a/test/pleroma/web/push/impl_test.exs b/test/pleroma/web/push/impl_test.exs index b8112cce5e..54f7529b02 100644 --- a/test/pleroma/web/push/impl_test.exs +++ b/test/pleroma/web/push/impl_test.exs @@ -246,6 +246,8 @@ test "builds content for chat messages with no content" do {:ok, chat} = CommonAPI.post_chat_message(user, recipient, nil, media_id: upload.id) object = Object.normalize(chat, fetch: false) + + Pleroma.Tests.ObanHelpers.perform_all() [notification] = Notification.for_user(recipient) res = Impl.build_content(notification, user, object) @@ -347,6 +349,7 @@ test "returns regular content when hiding contents option disabled" do } {:ok, activity} = CommonAPI.favorite(user, activity.id) + Pleroma.Tests.ObanHelpers.perform_all() notif = insert(:notification, user: user2, activity: activity, type: "favourite") diff --git a/test/pleroma/workers/cron/digest_emails_worker_test.exs b/test/pleroma/workers/cron/digest_emails_worker_test.exs index 851f4d63ab..11ff36aed5 100644 --- a/test/pleroma/workers/cron/digest_emails_worker_test.exs +++ b/test/pleroma/workers/cron/digest_emails_worker_test.exs @@ -30,6 +30,7 @@ defmodule Pleroma.Workers.Cron.DigestEmailsWorkerTest do user2 = insert(:user, last_digest_emailed_at: date) {:ok, _} = User.switch_email_notifications(user2, "digest", true) CommonAPI.post(user, %{status: "hey @#{user2.nickname}!"}) + ObanHelpers.perform_all() {:ok, user2: user2} end From 7375535af24cc5bf153935f701cdfd8259932be7 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Tue, 6 Sep 2022 11:04:22 -0500 Subject: [PATCH 3/3] Update description.exs with :poll_notifications and :notifications oban queues --- config/description.exs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/config/description.exs b/config/description.exs index 40053144cf..b394de24eb 100644 --- a/config/description.exs +++ b/config/description.exs @@ -1917,6 +1917,8 @@ federator_outgoing: 50, mailer: 10, scheduled_activities: 10, + poll_notifications: 10, + notifications: 20, transmogrifier: 20, web_push: 50 ], @@ -1969,6 +1971,18 @@ description: "Scheduled activities queue, see Pleroma.ScheduledActivities", suggestions: [10] }, + %{ + key: :poll_notifications, + type: :integer, + description: "Stores poll expirations so it can notify users when a poll ends", + suggestions: [10] + }, + %{ + key: :notifications, + type: :integer, + description: "Creates notifications for activities in the background", + suggestions: [20] + }, %{ key: :transmogrifier, type: :integer,