Unify notification push and streaming events for both local and federated activities

This also removes generation of notifications for blocked/filtered/muted users and threads.
This commit is contained in:
Mark Felder 2024-01-29 10:18:11 -05:00
parent f775a1931b
commit 291d531e4c
6 changed files with 68 additions and 246 deletions

View file

@ -0,0 +1 @@
Web Push notifications are no longer generated for muted/blocked threads and users.

View file

@ -361,36 +361,32 @@ def dismiss(%{id: user_id} = _user, id) do
end
end
@spec create_notifications(Activity.t(), keyword()) :: {:ok, [Notification.t()] | []}
def create_notifications(activity, options \\ [])
@spec create_notifications(Activity.t()) :: {:ok, [Notification.t()] | []}
def create_notifications(activity)
def create_notifications(%Activity{data: %{"to" => _, "type" => "Create"}} = activity, options) do
def create_notifications(%Activity{data: %{"to" => _, "type" => "Create"}} = activity) do
object = Object.normalize(activity, fetch: false)
if object && object.data["type"] == "Answer" do
{:ok, []}
else
do_create_notifications(activity, options)
do_create_notifications(activity)
end
end
def create_notifications(%Activity{data: %{"type" => type}} = activity, options)
def create_notifications(%Activity{data: %{"type" => type}} = activity)
when type in ["Follow", "Like", "Announce", "Move", "EmojiReact", "Flag", "Update"] do
do_create_notifications(activity, options)
do_create_notifications(activity)
end
def create_notifications(_, _), do: {:ok, []}
def create_notifications(_), do: {:ok, []}
defp do_create_notifications(%Activity{} = activity, options) do
do_send = Keyword.get(options, :do_send, true)
{enabled_receivers, disabled_receivers} = get_notified_from_activity(activity)
potential_receivers = enabled_receivers ++ disabled_receivers
defp do_create_notifications(%Activity{} = activity) do
enabled_receivers = get_notified_from_activity(activity)
notifications =
Enum.map(potential_receivers, fn user ->
do_send = do_send && user in enabled_receivers
create_notification(activity, user, do_send: do_send)
Enum.map(enabled_receivers, fn user ->
create_notification(activity, user)
end)
|> Enum.reject(&is_nil/1)
@ -450,7 +446,6 @@ defp type_from_activity_object(%{data: %{"type" => "Create"}} = activity) do
# TODO move to sql, too.
def create_notification(%Activity{} = activity, %User{} = user, opts \\ []) do
do_send = Keyword.get(opts, :do_send, true)
type = Keyword.get(opts, :type, type_from_activity(activity))
unless skip?(activity, user, opts) do
@ -465,11 +460,6 @@ def create_notification(%Activity{} = activity, %User{} = user, opts \\ []) do
|> Marker.multi_set_last_read_id(user, "notifications")
|> Repo.transaction()
if do_send do
Streamer.stream(["user", "user:notification"], notification)
Push.send(notification)
end
notification
end
end
@ -527,10 +517,7 @@ def get_notified_from_activity(%Activity{data: %{"type" => type}} = activity, lo
|> exclude_relationship_restricted_ap_ids(activity)
|> exclude_thread_muter_ap_ids(activity)
notification_enabled_users =
Enum.filter(potential_receivers, fn u -> u.ap_id in notification_enabled_ap_ids end)
{notification_enabled_users, potential_receivers -- notification_enabled_users}
Enum.filter(potential_receivers, fn u -> u.ap_id in notification_enabled_ap_ids end)
end
def get_notified_from_activity(_, _local_only), do: {[], []}
@ -748,4 +735,12 @@ def mark_context_as_read(%User{id: id}, context) do
)
|> Repo.update_all(set: [seen: true])
end
@spec send(list(Notification.t())) :: :ok
def send(notifications) do
Enum.each(notifications, fn notification ->
Streamer.stream(["user", "user:notification"], notification)
Push.send(notification)
end)
end
end

View file

@ -202,7 +202,8 @@ defp insert_activity_with_expiration(data, local, recipients) do
end
def notify_and_stream(activity) do
Notification.create_notifications(activity)
{:ok, notifications} = Notification.create_notifications(activity)
Notification.send(notifications)
original_activity =
case activity do

View file

@ -21,7 +21,6 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
alias Pleroma.Web.ActivityPub.Builder
alias Pleroma.Web.ActivityPub.Pipeline
alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.Push
alias Pleroma.Web.Streamer
alias Pleroma.Workers.PollWorker
@ -125,7 +124,7 @@ def handle(
nil
end
{:ok, notifications} = Notification.create_notifications(object, do_send: false)
{:ok, notifications} = Notification.create_notifications(object)
meta =
meta
@ -184,7 +183,11 @@ def handle(%{data: %{"type" => "Like"}} = object, meta) do
liked_object = Object.get_by_ap_id(object.data["object"])
Utils.add_like_to_object(object, liked_object)
Notification.create_notifications(object)
{:ok, notifications} = Notification.create_notifications(object)
meta =
meta
|> add_notifications(notifications)
{:ok, object, meta}
end
@ -202,7 +205,7 @@ def handle(%{data: %{"type" => "Like"}} = object, meta) do
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
with {:ok, object, meta} <- handle_object_creation(meta[:object_data], activity, meta),
%User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
{:ok, notifications} = Notification.create_notifications(activity, do_send: false)
{:ok, notifications} = Notification.create_notifications(activity)
{:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
{:ok, _user} = ActivityPub.update_last_status_at_if_public(user, object)
@ -258,11 +261,20 @@ def handle(%{data: %{"type" => "Announce"}} = object, meta) do
Utils.add_announce_to_object(object, announced_object)
if !User.internal?(user) do
Notification.create_notifications(object)
notifications =
if !User.is_internal_user?(user) do
{:ok, notifications} = Notification.create_notifications(object)
ap_streamer().stream_out(object)
end
ap_streamer().stream_out(object)
notifications
else
[]
end
meta =
meta
|> add_notifications(notifications)
{:ok, object, meta}
end
@ -283,7 +295,11 @@ def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
reacted_object = Object.get_by_ap_id(object.data["object"])
Utils.add_emoji_reaction_to_object(object, reacted_object)
Notification.create_notifications(object)
{:ok, notifications} = Notification.create_notifications(object)
meta =
meta
|> add_notifications(notifications)
{:ok, object, meta}
end
@ -587,10 +603,7 @@ defp delete_object(object) do
defp send_notifications(meta) do
Keyword.get(meta, :notifications, [])
|> Enum.each(fn notification ->
Streamer.stream(["user", "user:notification"], notification)
Push.send(notification)
end)
|> Notification.send()
meta
end

View file

@ -6,7 +6,6 @@ defmodule Pleroma.NotificationTest do
use Pleroma.DataCase, async: false
import Pleroma.Factory
import Mock
alias Pleroma.FollowingRelationship
alias Pleroma.Notification
@ -18,8 +17,6 @@ defmodule Pleroma.NotificationTest do
alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.CommonAPI
alias Pleroma.Web.MastodonAPI.NotificationView
alias Pleroma.Web.Push
alias Pleroma.Web.Streamer
setup do
Mox.stub_with(Pleroma.UnstubbedConfigMock, Pleroma.Config)
@ -175,158 +172,7 @@ test "create_poll_notifications/1" do
assert [user2.id, user3.id, user1.id] == Enum.map(notifications, & &1.user_id)
end
describe "CommonApi.post/2 notification-related functionality" do
test_with_mock "creates but does NOT send notification to blocker user",
Push,
[:passthrough],
[] do
user = insert(:user)
blocker = insert(:user)
{:ok, _user_relationship} = User.block(blocker, user)
{:ok, _activity} = CommonAPI.post(user, %{status: "hey @#{blocker.nickname}!"})
blocker_id = blocker.id
assert [%Notification{user_id: ^blocker_id}] = Repo.all(Notification)
refute called(Push.send(:_))
end
test_with_mock "creates but does NOT send notification to notification-muter user",
Push,
[:passthrough],
[] do
user = insert(:user)
muter = insert(:user)
{:ok, _user_relationships} = User.mute(muter, user)
{:ok, _activity} = CommonAPI.post(user, %{status: "hey @#{muter.nickname}!"})
muter_id = muter.id
assert [%Notification{user_id: ^muter_id}] = Repo.all(Notification)
refute called(Push.send(:_))
end
test_with_mock "creates but does NOT send notification to thread-muter user",
Push,
[:passthrough],
[] do
user = insert(:user)
thread_muter = insert(:user)
{:ok, activity} = CommonAPI.post(user, %{status: "hey @#{thread_muter.nickname}!"})
{:ok, _} = CommonAPI.add_mute(thread_muter, activity)
{:ok, _same_context_activity} =
CommonAPI.post(user, %{
status: "hey-hey-hey @#{thread_muter.nickname}!",
in_reply_to_status_id: activity.id
})
[pre_mute_notification, post_mute_notification] =
Repo.all(from(n in Notification, where: n.user_id == ^thread_muter.id, order_by: n.id))
pre_mute_notification_id = pre_mute_notification.id
post_mute_notification_id = post_mute_notification.id
assert called(
Push.send(
:meck.is(fn
%Notification{id: ^pre_mute_notification_id} -> true
_ -> false
end)
)
)
refute called(
Push.send(
:meck.is(fn
%Notification{id: ^post_mute_notification_id} -> true
_ -> false
end)
)
)
end
end
describe "create_notification" do
@tag needs_streamer: true
test "it creates a notification for user and send to the 'user' and the 'user:notification' stream" do
%{user: user, token: oauth_token} = oauth_access(["read"])
task =
Task.async(fn ->
{:ok, _topic} = Streamer.get_topic_and_add_socket("user", user, oauth_token)
assert_receive {:render_with_user, _, _, _, _}, 4_000
end)
task_user_notification =
Task.async(fn ->
{:ok, _topic} =
Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
assert_receive {:render_with_user, _, _, _, _}, 4_000
end)
activity = insert(:note_activity)
notify = Notification.create_notification(activity, user)
assert notify.user_id == user.id
Task.await(task)
Task.await(task_user_notification)
end
test "it creates a notification for user if the user blocks the activity author" do
activity = insert(:note_activity)
author = User.get_cached_by_ap_id(activity.data["actor"])
user = insert(:user)
{:ok, _user_relationship} = User.block(user, author)
assert Notification.create_notification(activity, user)
end
test "it creates a notification for the user if the user mutes the activity author" do
muter = insert(:user)
muted = insert(:user)
{:ok, _} = User.mute(muter, muted)
muter = Repo.get(User, muter.id)
{:ok, activity} = CommonAPI.post(muted, %{status: "Hi @#{muter.nickname}"})
notification = Notification.create_notification(activity, muter)
assert notification.id
assert notification.seen
end
test "notification created if user is muted without notifications" do
muter = insert(:user)
muted = insert(:user)
{:ok, _user_relationships} = User.mute(muter, muted, %{notifications: false})
{:ok, activity} = CommonAPI.post(muted, %{status: "Hi @#{muter.nickname}"})
assert Notification.create_notification(activity, muter)
end
test "it creates a notification for an activity from a muted thread" do
muter = insert(:user)
other_user = insert(:user)
{:ok, activity} = CommonAPI.post(muter, %{status: "hey"})
CommonAPI.add_mute(muter, activity)
{:ok, activity} =
CommonAPI.post(other_user, %{
status: "Hi @#{muter.nickname}",
in_reply_to_status_id: activity.id
})
notification = Notification.create_notification(activity, muter)
assert notification.id
assert notification.seen
end
test "it disables notifications from strangers" do
follower = insert(:user)
@ -680,7 +526,7 @@ test "it sends notifications to addressed users in new messages" do
status: "hey @#{other_user.nickname}!"
})
{enabled_receivers, _disabled_receivers} = Notification.get_notified_from_activity(activity)
enabled_receivers = Notification.get_notified_from_activity(activity)
assert other_user in enabled_receivers
end
@ -712,7 +558,7 @@ test "it sends notifications to mentioned users in new messages" do
{:ok, activity} = Transmogrifier.handle_incoming(create_activity)
{enabled_receivers, _disabled_receivers} = Notification.get_notified_from_activity(activity)
enabled_receivers = Notification.get_notified_from_activity(activity)
assert other_user in enabled_receivers
end
@ -739,7 +585,7 @@ test "it does not send notifications to users who are only cc in new messages" d
{:ok, activity} = Transmogrifier.handle_incoming(create_activity)
{enabled_receivers, _disabled_receivers} = Notification.get_notified_from_activity(activity)
enabled_receivers = Notification.get_notified_from_activity(activity)
assert other_user not in enabled_receivers
end
@ -756,8 +602,7 @@ test "it does not send notification to mentioned users in likes" do
{:ok, activity_two} = CommonAPI.favorite(third_user, activity_one.id)
{enabled_receivers, _disabled_receivers} =
Notification.get_notified_from_activity(activity_two)
enabled_receivers = Notification.get_notified_from_activity(activity_two)
assert other_user not in enabled_receivers
end
@ -779,7 +624,7 @@ test "it only notifies the post's author in likes" do
|> Map.put("to", [other_user.ap_id | like_data["to"]])
|> ActivityPub.persist(local: true)
{enabled_receivers, _disabled_receivers} = Notification.get_notified_from_activity(like)
enabled_receivers = Notification.get_notified_from_activity(like)
assert other_user not in enabled_receivers
end
@ -796,39 +641,36 @@ test "it does not send notification to mentioned users in announces" do
{:ok, activity_two} = CommonAPI.repeat(activity_one.id, third_user)
{enabled_receivers, _disabled_receivers} =
Notification.get_notified_from_activity(activity_two)
enabled_receivers = Notification.get_notified_from_activity(activity_two)
assert other_user not in enabled_receivers
end
test "it returns blocking recipient in disabled recipients list" do
test "it does not return blocking recipient in recipients list" do
user = insert(:user)
other_user = insert(:user)
{:ok, _user_relationship} = User.block(other_user, user)
{:ok, activity} = CommonAPI.post(user, %{status: "hey @#{other_user.nickname}!"})
{enabled_receivers, disabled_receivers} = Notification.get_notified_from_activity(activity)
enabled_receivers = Notification.get_notified_from_activity(activity)
assert [] == enabled_receivers
assert [other_user] == disabled_receivers
end
test "it returns notification-muting recipient in disabled recipients list" do
test "it does not return notification-muting recipient in recipients list" do
user = insert(:user)
other_user = insert(:user)
{:ok, _user_relationships} = User.mute(other_user, user)
{:ok, activity} = CommonAPI.post(user, %{status: "hey @#{other_user.nickname}!"})
{enabled_receivers, disabled_receivers} = Notification.get_notified_from_activity(activity)
enabled_receivers = Notification.get_notified_from_activity(activity)
assert [] == enabled_receivers
assert [other_user] == disabled_receivers
end
test "it returns thread-muting recipient in disabled recipients list" do
test "it does not return thread-muting recipient in recipients list" do
user = insert(:user)
other_user = insert(:user)
@ -842,14 +684,12 @@ test "it returns thread-muting recipient in disabled recipients list" do
in_reply_to_status_id: activity.id
})
{enabled_receivers, disabled_receivers} =
Notification.get_notified_from_activity(same_context_activity)
enabled_receivers = Notification.get_notified_from_activity(same_context_activity)
assert [other_user] == disabled_receivers
refute other_user in enabled_receivers
end
test "it returns non-following domain-blocking recipient in disabled recipients list" do
test "it does not return non-following domain-blocking recipient in recipients list" do
blocked_domain = "blocked.domain"
user = insert(:user, %{ap_id: "https://#{blocked_domain}/@actor"})
other_user = insert(:user)
@ -858,10 +698,9 @@ test "it returns non-following domain-blocking recipient in disabled recipients
{:ok, activity} = CommonAPI.post(user, %{status: "hey @#{other_user.nickname}!"})
{enabled_receivers, disabled_receivers} = Notification.get_notified_from_activity(activity)
enabled_receivers = Notification.get_notified_from_activity(activity)
assert [] == enabled_receivers
assert [other_user] == disabled_receivers
end
test "it returns following domain-blocking recipient in enabled recipients list" do
@ -874,10 +713,9 @@ test "it returns following domain-blocking recipient in enabled recipients list"
{:ok, activity} = CommonAPI.post(user, %{status: "hey @#{other_user.nickname}!"})
{enabled_receivers, disabled_receivers} = Notification.get_notified_from_activity(activity)
enabled_receivers = Notification.get_notified_from_activity(activity)
assert [other_user] == enabled_receivers
assert [] == disabled_receivers
end
test "it sends edited notifications to those who repeated a status" do
@ -897,11 +735,10 @@ test "it sends edited notifications to those who repeated a status" do
status: "hey @#{other_user.nickname}! mew mew"
})
{enabled_receivers, _disabled_receivers} =
Notification.get_notified_from_activity(edit_activity)
enabled_receivers = Notification.get_notified_from_activity(edit_activity)
assert repeated_user in enabled_receivers
assert other_user not in enabled_receivers
refute other_user in enabled_receivers
end
end
@ -1189,13 +1026,13 @@ test "it doesn't return notifications for muted thread", %{user: user} do
assert Notification.for_user(user) == []
end
test "it returns notifications from a muted user when with_muted is set", %{user: user} do
test "it doesn't return notifications from a muted user when with_muted is set", %{user: user} do
muted = insert(:user)
{:ok, _user_relationships} = User.mute(user, muted)
{:ok, _activity} = CommonAPI.post(muted, %{status: "hey @#{user.nickname}"})
assert length(Notification.for_user(user, %{with_muted: true})) == 1
assert Enum.empty?(Notification.for_user(user, %{with_muted: true}))
end
test "it doesn't return notifications from a blocked user when with_muted is set", %{

View file

@ -827,31 +827,6 @@ test "creates a notification", %{announce: announce, poster: poster} do
{:ok, announce, _} = SideEffects.handle(announce)
assert Repo.get_by(Notification, user_id: poster.id, activity_id: announce.id)
end
test "it streams out the announce", %{announce: announce} do
with_mocks([
{
Pleroma.Web.Streamer,
[],
[
stream: fn _, _ -> nil end
]
},
{
Pleroma.Web.Push,
[],
[
send: fn _ -> nil end
]
}
]) do
{:ok, announce, _} = SideEffects.handle(announce)
assert called(Pleroma.Web.Streamer.stream(["user", "list"], announce))
assert called(Pleroma.Web.Push.send(:_))
end
end
end
describe "removing a follower" do