Merge branch 'fix-subscriptions' into 'develop'

Create notifications async with NotificationWorker, #91

See merge request soapbox-pub/rebased!173
This commit is contained in:
Alex Gleason 2022-09-06 16:33:02 +00:00
commit ce9bacc3be
14 changed files with 112 additions and 6 deletions

View file

@ -578,6 +578,7 @@
transmogrifier: 20,
scheduled_activities: 10,
poll_notifications: 10,
notifications: 20,
background: 5,
remote_fetcher: 2,
attachments_cleanup: 1,

View file

@ -1939,6 +1939,8 @@
federator_outgoing: 50,
mailer: 10,
scheduled_activities: 10,
poll_notifications: 10,
notifications: 20,
transmogrifier: 20,
web_push: 50
],
@ -1991,6 +1993,18 @@
description: "Scheduled activities queue, see Pleroma.ScheduledActivities",
suggestions: [10]
},
%{
key: :poll_notifications,
type: :integer,
description: "Stores poll expirations so it can notify users when a poll ends",
suggestions: [10]
},
%{
key: :notifications,
type: :integer,
description: "Creates notifications for activities in the background",
suggestions: [20]
},
%{
key: :transmogrifier,
type: :integer,

View file

@ -12,7 +12,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
alias Pleroma.Filter
alias Pleroma.Hashtag
alias Pleroma.Maps
alias Pleroma.Notification
alias Pleroma.Object
alias Pleroma.Object.Containment
alias Pleroma.Object.Fetcher
@ -25,6 +24,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
alias Pleroma.Web.Streamer
alias Pleroma.Web.WebFinger
alias Pleroma.Workers.BackgroundWorker
alias Pleroma.Workers.NotificationWorker
alias Pleroma.Workers.PollWorker
import Ecto.Query
@ -200,7 +200,7 @@ defp insert_activity_with_expiration(data, local, recipients) do
end
def notify_and_stream(activity) do
Notification.create_notifications(activity)
NotificationWorker.enqueue("create", %{"activity_id" => activity.id})
original_activity =
case activity do

View file

@ -0,0 +1,27 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.NotificationWorker do
@moduledoc """
Creates notifications for an Activity.
"""
use Pleroma.Workers.WorkerHelper, queue: "notifications"
alias Pleroma.Activity
alias Pleroma.Notification
@impl Oban.Worker
@spec perform(Oban.Job.t()) :: {:error, :activity_not_found} | {:ok, [Pleroma.Notification.t()]}
def perform(%Job{args: %{"op" => "create", "activity_id" => activity_id}}) do
with %Activity{} = activity <- find_activity(activity_id) do
Notification.create_notifications(activity)
end
end
defp find_activity(activity_id) do
with nil <- Activity.get_by_id(activity_id) do
{:error, :activity_not_found}
end
end
end

View file

@ -35,6 +35,8 @@ test "Sends digest to the given user" do
})
end)
ObanHelpers.perform_all()
yesterday =
NaiveDateTime.add(
NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second),

View file

@ -24,6 +24,8 @@ test "it fills in missing notification types" do
{:ok, like} = CommonAPI.favorite(other_user, post.id)
{:ok, react_2} = CommonAPI.react_with_emoji(post.id, other_user, "")
Pleroma.Tests.ObanHelpers.perform_all()
data =
react_2.data
|> Map.put("type", "EmojiReaction")

View file

@ -84,6 +84,8 @@ test "notifies someone when they are directly addressed" do
{:ok, [notification, other_notification]} = Notification.create_notifications(activity)
Pleroma.Tests.ObanHelpers.perform_all()
notified_ids = Enum.sort([notification.user_id, other_notification.user_id])
assert notified_ids == [other_user.id, third_user.id]
assert notification.activity_id == activity.id
@ -122,6 +124,8 @@ test "does not create a notification for subscribed users if status is a reply"
in_reply_to_status_id: activity.id
})
Pleroma.Tests.ObanHelpers.perform_all()
user_notifications = Notification.for_user(user)
assert length(user_notifications) == 1
@ -176,6 +180,8 @@ test "create_poll_notifications/1" do
{:ok, _activity} = CommonAPI.post(user, %{status: "hey @#{blocker.nickname}!"})
Pleroma.Tests.ObanHelpers.perform_all()
blocker_id = blocker.id
assert [%Notification{user_id: ^blocker_id}] = Repo.all(Notification)
refute called(Push.send(:_))
@ -190,6 +196,7 @@ test "create_poll_notifications/1" do
{:ok, _user_relationships} = User.mute(muter, user)
{:ok, _activity} = CommonAPI.post(user, %{status: "hey @#{muter.nickname}!"})
Pleroma.Tests.ObanHelpers.perform_all()
muter_id = muter.id
assert [%Notification{user_id: ^muter_id}] = Repo.all(Notification)
@ -205,6 +212,9 @@ test "create_poll_notifications/1" do
{:ok, activity} = CommonAPI.post(user, %{status: "hey @#{thread_muter.nickname}!"})
Pleroma.Tests.ObanHelpers.perform_all()
[pre_mute_notification] = Repo.all(Notification)
{:ok, _} = CommonAPI.add_mute(thread_muter, activity)
{:ok, _same_context_activity} =
@ -213,8 +223,15 @@ test "create_poll_notifications/1" do
in_reply_to_status_id: activity.id
})
[pre_mute_notification, post_mute_notification] =
Repo.all(from(n in Notification, where: n.user_id == ^thread_muter.id, order_by: n.id))
Pleroma.Tests.ObanHelpers.perform_all()
[post_mute_notification] =
Repo.all(
from(n in Notification,
where: n.id != ^pre_mute_notification.id and n.user_id == ^thread_muter.id,
order_by: n.id
)
)
pre_mute_notification_id = pre_mute_notification.id
post_mute_notification_id = post_mute_notification.id
@ -557,6 +574,7 @@ test "clears all notifications of a certain type for a given user" do
Notification.destroy_multiple_from_types(user1, ["pleroma:report"])
Pleroma.Tests.ObanHelpers.perform_all()
assert [%Pleroma.Notification{type: "mention"}] = Notification.for_user(user1)
assert [%Pleroma.Notification{type: "pleroma:report"}] = Notification.for_user(user2)
end
@ -576,6 +594,8 @@ test "it sets all notifications as read up to a specified notification ID" do
status: "hey again @#{other_user.nickname}!"
})
Pleroma.Tests.ObanHelpers.perform_all()
[n2, n1] = Notification.for_user(other_user)
assert n2.id > n1.id
@ -585,6 +605,8 @@ test "it sets all notifications as read up to a specified notification ID" do
status: "hey yet again @#{other_user.nickname}!"
})
Pleroma.Tests.ObanHelpers.perform_all()
[_, read_notification] = Notification.set_read_up_to(other_user, n2.id)
assert read_notification.activity.object
@ -616,6 +638,8 @@ test "it send updated marker to the 'user' and the 'user:notification' stream" d
status: "hi @#{user.nickname}!"
})
Pleroma.Tests.ObanHelpers.perform_all()
[%{id: notification_id}] = Notification.for_user(user)
notification_id = to_string(notification_id)
@ -1032,6 +1056,7 @@ test "replying to a deleted post without tagging does not generate a notificatio
in_reply_to_status_id: activity.id
})
Pleroma.Tests.ObanHelpers.perform_all()
assert Enum.empty?(Notification.for_user(user))
end
@ -1042,6 +1067,7 @@ test "notifications are deleted if a local user is deleted" do
{:ok, _activity} =
CommonAPI.post(user, %{status: "hi @#{other_user.nickname}", visibility: "direct"})
Pleroma.Tests.ObanHelpers.perform_all()
refute Enum.empty?(Notification.for_user(other_user))
{:ok, job} = User.delete(user)
@ -1146,6 +1172,8 @@ test "it returns notifications for muted user without notifications", %{user: us
{:ok, _activity} = CommonAPI.post(muted, %{status: "hey @#{user.nickname}"})
Pleroma.Tests.ObanHelpers.perform_all()
[notification] = Notification.for_user(user)
assert notification.activity.object
@ -1188,6 +1216,7 @@ test "it returns notifications for domain-blocked but followed user" do
{:ok, _activity} = CommonAPI.post(blocked, %{status: "hey @#{user.nickname}"})
Pleroma.Tests.ObanHelpers.perform_all()
assert length(Notification.for_user(user)) == 1
end
@ -1206,6 +1235,8 @@ test "it returns notifications from a muted user when with_muted is set", %{user
{:ok, _activity} = CommonAPI.post(muted, %{status: "hey @#{user.nickname}"})
Pleroma.Tests.ObanHelpers.perform_all()
assert length(Notification.for_user(user, %{with_muted: true})) == 1
end
@ -1235,6 +1266,7 @@ test "it returns notifications from muted threads when with_muted is set", %{use
another_user = insert(:user)
{:ok, activity} = CommonAPI.post(another_user, %{status: "hey @#{user.nickname}"})
Pleroma.Tests.ObanHelpers.perform_all()
{:ok, _} = Pleroma.ThreadMute.add_mute(user.id, activity.data["context"])
assert length(Notification.for_user(user, %{with_muted: true})) == 1
@ -1254,6 +1286,7 @@ test "it returns notifications about mentions with not hidden filtered word", %{
another_user = insert(:user)
{:ok, _} = CommonAPI.post(another_user, %{status: "@#{user.nickname} test"})
Pleroma.Tests.ObanHelpers.perform_all()
assert length(Notification.for_user(user)) == 1
end

View file

@ -1504,6 +1504,8 @@ test "hide a user's statuses from timelines and notifications" do
{:ok, activity} = CommonAPI.post(user, %{status: "hey @#{user2.nickname}"})
Pleroma.Tests.ObanHelpers.perform_all()
activity = Repo.preload(activity, :bookmark)
[notification] = Pleroma.Notification.for_user(user2)
@ -2142,6 +2144,8 @@ test "Only includes users with no read notifications" do
})
end)
Pleroma.Tests.ObanHelpers.perform_all()
Enum.each(active, fn user ->
[n1, _n2] = Pleroma.Notification.for_user(user)
{:ok, _} = Pleroma.Notification.read_one(user, n1.id)

View file

@ -1919,7 +1919,7 @@ test "create" do
assert_enqueued(worker: Pleroma.Workers.BackgroundWorker, args: params)
Pleroma.Workers.BackgroundWorker.perform(%Oban.Job{args: params})
Pleroma.Tests.ObanHelpers.perform_all()
refute User.following?(follower, old_user)
assert User.following?(follower, new_user)

View file

@ -1052,6 +1052,8 @@ test "marks notifications as read after mute" do
{:ok, favorite_activity} = CommonAPI.favorite(friend2, activity.id)
{:ok, repeat_activity} = CommonAPI.repeat(activity.id, friend1)
Pleroma.Tests.ObanHelpers.perform_all()
assert Repo.aggregate(
from(n in Notification, where: n.seen == false and n.user_id == ^friend1.id),
:count

View file

@ -86,6 +86,8 @@ test "by default, does not contain pleroma:report" do
{:ok, _report} =
CommonAPI.report(third_user, %{account_id: other_user.id, status_ids: [activity.id]})
Pleroma.Tests.ObanHelpers.perform_all()
result =
conn
|> get("/api/v1/notifications")
@ -196,6 +198,7 @@ test "paginates notifications using min_id, since_id, max_id, and limit" do
{:ok, activity2} = CommonAPI.post(other_user, %{status: "hi @#{user.nickname}"})
{:ok, activity3} = CommonAPI.post(other_user, %{status: "hi @#{user.nickname}"})
{:ok, activity4} = CommonAPI.post(other_user, %{status: "hi @#{user.nickname}"})
Pleroma.Tests.ObanHelpers.perform_all()
notification1_id = get_notification_id_by_activity(activity1)
notification2_id = get_notification_id_by_activity(activity2)
@ -246,6 +249,8 @@ test "filters notifications for mentions" do
{:ok, private_activity} =
CommonAPI.post(other_user, %{status: "@#{user.nickname}", visibility: "private"})
Pleroma.Tests.ObanHelpers.perform_all()
query = params_to_query(%{exclude_visibilities: ["public", "unlisted", "private"]})
conn_res = get(conn, "/api/v1/notifications?" <> query)
@ -375,13 +380,15 @@ test "doesn't return less than the requested amount of records when the user's r
{:ok, _favorite} = CommonAPI.favorite(user, reply.id)
Pleroma.Tests.ObanHelpers.perform_all()
activity_ids =
conn
|> get("/api/v1/notifications?exclude_visibilities[]=direct&limit=2")
|> json_response_and_validate_schema(200)
|> Enum.map(& &1["status"]["id"])
assert [reply.id, mention.id] == activity_ids
assert MapSet.new([reply.id, mention.id]) == MapSet.new(activity_ids)
end
end
@ -394,6 +401,7 @@ test "filters notifications using exclude_types" do
{:ok, favorite_activity} = CommonAPI.favorite(other_user, create_activity.id)
{:ok, reblog_activity} = CommonAPI.repeat(create_activity.id, other_user)
{:ok, _, _, follow_activity} = CommonAPI.follow(other_user, user)
Pleroma.Tests.ObanHelpers.perform_all()
mention_notification_id = get_notification_id_by_activity(mention_activity)
favorite_notification_id = get_notification_id_by_activity(favorite_activity)
@ -432,6 +440,7 @@ test "filters notifications using types" do
{:ok, favorite_activity} = CommonAPI.favorite(other_user, create_activity.id)
{:ok, reblog_activity} = CommonAPI.repeat(create_activity.id, other_user)
{:ok, _, _, follow_activity} = CommonAPI.follow(other_user, user)
Pleroma.Tests.ObanHelpers.perform_all()
mention_notification_id = get_notification_id_by_activity(mention_activity)
favorite_notification_id = get_notification_id_by_activity(favorite_activity)
@ -495,6 +504,7 @@ test "destroy multiple" do
{:ok, activity2} = CommonAPI.post(other_user, %{status: "hi @#{user.nickname}"})
{:ok, activity3} = CommonAPI.post(user, %{status: "hi @#{other_user.nickname}"})
{:ok, activity4} = CommonAPI.post(user, %{status: "hi @#{other_user.nickname}"})
Pleroma.Tests.ObanHelpers.perform_all()
notification1_id = get_notification_id_by_activity(activity1)
notification2_id = get_notification_id_by_activity(activity2)
@ -539,6 +549,7 @@ test "doesn't see notifications after muting user with notifications" do
{:ok, _, _, _} = CommonAPI.follow(user, user2)
{:ok, _} = CommonAPI.post(user2, %{status: "hey @#{user.nickname}"})
Pleroma.Tests.ObanHelpers.perform_all()
ret_conn = get(conn, "/api/v1/notifications")
@ -557,6 +568,7 @@ test "see notifications after muting user without notifications" do
{:ok, _, _, _} = CommonAPI.follow(user, user2)
{:ok, _} = CommonAPI.post(user2, %{status: "hey @#{user.nickname}"})
Pleroma.Tests.ObanHelpers.perform_all()
ret_conn = get(conn, "/api/v1/notifications")
@ -575,6 +587,7 @@ test "see notifications after muting user with notifications and with_muted para
{:ok, _, _, _} = CommonAPI.follow(user, user2)
{:ok, _} = CommonAPI.post(user2, %{status: "hey @#{user.nickname}"})
Pleroma.Tests.ObanHelpers.perform_all()
ret_conn = get(conn, "/api/v1/notifications")
@ -618,6 +631,8 @@ test "preserves parameters in link headers" do
visibility: "public"
})
Pleroma.Tests.ObanHelpers.perform_all()
notification1 = Repo.get_by(Notification, activity_id: activity1.id)
notification2 = Repo.get_by(Notification, activity_id: activity2.id)
@ -642,6 +657,7 @@ test "account_id" do
{:ok, _activity} = CommonAPI.post(other_user1, %{status: "hi @#{user.nickname}"})
{:ok, _activity} = CommonAPI.post(other_user2, %{status: "bye @#{user.nickname}"})
Pleroma.Tests.ObanHelpers.perform_all()
assert [%{"account" => %{"id" => ^account_id}}] =
conn

View file

@ -37,6 +37,7 @@ test "it marks multiple notifications as read", %{user: user1, conn: conn} do
{:ok, _activity1} = CommonAPI.post(user2, %{status: "hi @#{user1.nickname}"})
{:ok, _activity2} = CommonAPI.post(user2, %{status: "hi @#{user1.nickname}"})
{:ok, _activity3} = CommonAPI.post(user2, %{status: "HIE @#{user1.nickname}"})
Pleroma.Tests.ObanHelpers.perform_all()
[notification3, notification2, notification1] = Notification.for_user(user1, %{limit: 3})

View file

@ -246,6 +246,8 @@ test "builds content for chat messages with no content" do
{:ok, chat} = CommonAPI.post_chat_message(user, recipient, nil, media_id: upload.id)
object = Object.normalize(chat, fetch: false)
Pleroma.Tests.ObanHelpers.perform_all()
[notification] = Notification.for_user(recipient)
res = Impl.build_content(notification, user, object)
@ -347,6 +349,7 @@ test "returns regular content when hiding contents option disabled" do
}
{:ok, activity} = CommonAPI.favorite(user, activity.id)
Pleroma.Tests.ObanHelpers.perform_all()
notif = insert(:notification, user: user2, activity: activity, type: "favourite")

View file

@ -30,6 +30,7 @@ defmodule Pleroma.Workers.Cron.DigestEmailsWorkerTest do
user2 = insert(:user, last_digest_emailed_at: date)
{:ok, _} = User.switch_email_notifications(user2, "digest", true)
CommonAPI.post(user, %{status: "hey @#{user2.nickname}!"})
ObanHelpers.perform_all()
{:ok, user2: user2}
end