From 115d08a7542b92c5e1d889da41c0ee6837a1235e Mon Sep 17 00:00:00 2001 From: lain Date: Fri, 5 Jun 2020 16:47:02 +0200 Subject: [PATCH] Pipeline: Add a side effects step after the transaction finishes This is to run things like streaming notifications out, which will sometimes need data that is created by the transaction, but is streamed out asynchronously. --- lib/pleroma/notification.ex | 26 ++++-- lib/pleroma/web/activity_pub/pipeline.ex | 4 + lib/pleroma/web/activity_pub/side_effects.ex | 30 ++++++- test/web/activity_pub/pipeline_test.exs | 9 +- test/web/activity_pub/side_effects_test.exs | 86 +++++++++++++++++--- test/web/common_api/common_api_test.exs | 43 ++++++++-- 6 files changed, 169 insertions(+), 29 deletions(-) diff --git a/lib/pleroma/notification.ex b/lib/pleroma/notification.ex index e5b880b105..49e27c05a4 100644 --- a/lib/pleroma/notification.ex +++ b/lib/pleroma/notification.ex @@ -334,30 +334,34 @@ def dismiss(%{id: user_id} = _user, id) do end end - def create_notifications(%Activity{data: %{"to" => _, "type" => "Create"}} = activity) do + def create_notifications(activity, options \\ []) + + def create_notifications(%Activity{data: %{"to" => _, "type" => "Create"}} = activity, options) do object = Object.normalize(activity, false) if object && object.data["type"] == "Answer" do {:ok, []} else - do_create_notifications(activity) + do_create_notifications(activity, options) end end - def create_notifications(%Activity{data: %{"type" => type}} = activity) + def create_notifications(%Activity{data: %{"type" => type}} = activity, options) when type in ["Follow", "Like", "Announce", "Move", "EmojiReact"] do - do_create_notifications(activity) + do_create_notifications(activity, options) end - def create_notifications(_), do: {:ok, []} + def create_notifications(_, _), do: {:ok, []} + + defp do_create_notifications(%Activity{} = activity, options) do + do_send = Keyword.get(options, :do_send, true) - defp do_create_notifications(%Activity{} = activity) do {enabled_receivers, disabled_receivers} = get_notified_from_activity(activity) potential_receivers = enabled_receivers ++ disabled_receivers notifications = Enum.map(potential_receivers, fn user -> - do_send = user in enabled_receivers + do_send = do_send && user in enabled_receivers create_notification(activity, user, do_send) end) @@ -623,4 +627,12 @@ def skip?(:recently_followed, %Activity{data: %{"type" => "Follow"}} = activity, end def skip?(_, _, _), do: false + + def for_user_and_activity(user, activity) do + from(n in __MODULE__, + where: n.user_id == ^user.id, + where: n.activity_id == ^activity.id + ) + |> Repo.one() + end end diff --git a/lib/pleroma/web/activity_pub/pipeline.ex b/lib/pleroma/web/activity_pub/pipeline.ex index 0c54c4b234..6875c47f67 100644 --- a/lib/pleroma/web/activity_pub/pipeline.ex +++ b/lib/pleroma/web/activity_pub/pipeline.ex @@ -17,6 +17,10 @@ defmodule Pleroma.Web.ActivityPub.Pipeline do {:ok, Activity.t() | Object.t(), keyword()} | {:error, any()} def common_pipeline(object, meta) do case Repo.transaction(fn -> do_common_pipeline(object, meta) end) do + {:ok, {:ok, activity, meta}} -> + SideEffects.handle_after_transaction(meta) + {:ok, activity, meta} + {:ok, value} -> value diff --git a/lib/pleroma/web/activity_pub/side_effects.ex b/lib/pleroma/web/activity_pub/side_effects.ex index b3aacff406..10136789a5 100644 --- a/lib/pleroma/web/activity_pub/side_effects.ex +++ b/lib/pleroma/web/activity_pub/side_effects.ex @@ -16,6 +16,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do alias Pleroma.Web.ActivityPub.Pipeline alias Pleroma.Web.ActivityPub.Utils alias Pleroma.Web.Streamer + alias Pleroma.Web.Push def handle(object, meta \\ []) @@ -37,7 +38,12 @@ def handle(%{data: %{"type" => "Like"}} = object, meta) do # - Set up notifications def handle(%{data: %{"type" => "Create"}} = activity, meta) do with {:ok, _object, _meta} <- handle_object_creation(meta[:object_data], meta) do - Notification.create_notifications(activity) + {:ok, notifications} = Notification.create_notifications(activity, do_send: false) + + meta = + meta + |> add_notifications(notifications) + {:ok, activity, meta} else e -> Repo.rollback(e) @@ -200,4 +206,26 @@ def handle_undoing( end def handle_undoing(object), do: {:error, ["don't know how to handle", object]} + + defp send_notifications(meta) do + Keyword.get(meta, :created_notifications, []) + |> Enum.each(fn notification -> + Streamer.stream(["user", "user:notification"], notification) + Push.send(notification) + end) + + meta + end + + defp add_notifications(meta, notifications) do + existing = Keyword.get(meta, :created_notifications, []) + + meta + |> Keyword.put(:created_notifications, notifications ++ existing) + end + + def handle_after_transaction(meta) do + meta + |> send_notifications() + end end diff --git a/test/web/activity_pub/pipeline_test.exs b/test/web/activity_pub/pipeline_test.exs index 26557720b5..8deb645013 100644 --- a/test/web/activity_pub/pipeline_test.exs +++ b/test/web/activity_pub/pipeline_test.exs @@ -33,7 +33,10 @@ test "it goes through validation, filtering, persisting, side effects and federa { Pleroma.Web.ActivityPub.SideEffects, [], - [handle: fn o, m -> {:ok, o, m} end] + [ + handle: fn o, m -> {:ok, o, m} end, + handle_after_transaction: fn m -> m end + ] }, { Pleroma.Web.Federator, @@ -71,7 +74,7 @@ test "it goes through validation, filtering, persisting, side effects without fe { Pleroma.Web.ActivityPub.SideEffects, [], - [handle: fn o, m -> {:ok, o, m} end] + [handle: fn o, m -> {:ok, o, m} end, handle_after_transaction: fn m -> m end] }, { Pleroma.Web.Federator, @@ -110,7 +113,7 @@ test "it goes through validation, filtering, persisting, side effects without fe { Pleroma.Web.ActivityPub.SideEffects, [], - [handle: fn o, m -> {:ok, o, m} end] + [handle: fn o, m -> {:ok, o, m} end, handle_after_transaction: fn m -> m end] }, { Pleroma.Web.Federator, diff --git a/test/web/activity_pub/side_effects_test.exs b/test/web/activity_pub/side_effects_test.exs index 40df664eb5..43ffe13375 100644 --- a/test/web/activity_pub/side_effects_test.exs +++ b/test/web/activity_pub/side_effects_test.exs @@ -22,6 +22,47 @@ defmodule Pleroma.Web.ActivityPub.SideEffectsTest do import Pleroma.Factory import Mock + describe "handle_after_transaction" do + test "it streams out notifications" do + author = insert(:user, local: true) + recipient = insert(:user, local: true) + + {:ok, chat_message_data, _meta} = Builder.chat_message(author, recipient.ap_id, "hey") + + {:ok, create_activity_data, _meta} = + Builder.create(author, chat_message_data["id"], [recipient.ap_id]) + + {:ok, create_activity, _meta} = ActivityPub.persist(create_activity_data, local: false) + + {:ok, _create_activity, meta} = + SideEffects.handle(create_activity, local: false, object_data: chat_message_data) + + assert [notification] = meta[:created_notifications] + + with_mocks([ + { + Pleroma.Web.Streamer, + [], + [ + stream: fn _, _ -> nil end + ] + }, + { + Pleroma.Web.Push, + [], + [ + send: fn _ -> nil end + ] + } + ]) do + SideEffects.handle_after_transaction(meta) + + assert called(Pleroma.Web.Streamer.stream(["user", "user:notification"], notification)) + assert called(Pleroma.Web.Push.send(notification)) + end + end + end + describe "delete objects" do setup do user = insert(:user) @@ -361,22 +402,47 @@ test "it creates a Chat and ChatMessageReferences for the local users and bumps {:ok, create_activity, _meta} = ActivityPub.persist(create_activity_data, local: false) - {:ok, _create_activity, _meta} = - SideEffects.handle(create_activity, local: false, object_data: chat_message_data) + with_mocks([ + { + Pleroma.Web.Streamer, + [], + [ + stream: fn _, _ -> nil end + ] + }, + { + Pleroma.Web.Push, + [], + [ + send: fn _ -> nil end + ] + } + ]) do + {:ok, _create_activity, meta} = + SideEffects.handle(create_activity, local: false, object_data: chat_message_data) - chat = Chat.get(author.id, recipient.ap_id) + # The notification gets created + assert [notification] = meta[:created_notifications] + assert notification.activity_id == create_activity.id - [cm_ref] = ChatMessageReference.for_chat_query(chat) |> Repo.all() + # But it is not sent out + refute called(Pleroma.Web.Streamer.stream(["user", "user:notification"], notification)) + refute called(Pleroma.Web.Push.send(notification)) - assert cm_ref.object.data["content"] == "hey" - assert cm_ref.unread == false + chat = Chat.get(author.id, recipient.ap_id) - chat = Chat.get(recipient.id, author.ap_id) + [cm_ref] = ChatMessageReference.for_chat_query(chat) |> Repo.all() - [cm_ref] = ChatMessageReference.for_chat_query(chat) |> Repo.all() + assert cm_ref.object.data["content"] == "hey" + assert cm_ref.unread == false - assert cm_ref.object.data["content"] == "hey" - assert cm_ref.unread == true + chat = Chat.get(recipient.id, author.ap_id) + + [cm_ref] = ChatMessageReference.for_chat_query(chat) |> Repo.all() + + assert cm_ref.object.data["content"] == "hey" + assert cm_ref.unread == true + end end test "it creates a Chat for the local users and bumps the unread count" do diff --git a/test/web/common_api/common_api_test.exs b/test/web/common_api/common_api_test.exs index 611a9ae66c..63b59820e9 100644 --- a/test/web/common_api/common_api_test.exs +++ b/test/web/common_api/common_api_test.exs @@ -7,6 +7,7 @@ defmodule Pleroma.Web.CommonAPITest do alias Pleroma.Activity alias Pleroma.Chat alias Pleroma.Conversation.Participation + alias Pleroma.Notification alias Pleroma.Object alias Pleroma.User alias Pleroma.Web.ActivityPub.ActivityPub @@ -39,15 +40,41 @@ test "it posts a chat message without content but with an attachment" do {:ok, upload} = ActivityPub.upload(file, actor: author.ap_id) - {:ok, activity} = - CommonAPI.post_chat_message( - author, - recipient, - nil, - media_id: upload.id - ) + with_mocks([ + { + Pleroma.Web.Streamer, + [], + [ + stream: fn _, _ -> + nil + end + ] + }, + { + Pleroma.Web.Push, + [], + [ + send: fn _ -> nil end + ] + } + ]) do + {:ok, activity} = + CommonAPI.post_chat_message( + author, + recipient, + nil, + media_id: upload.id + ) - assert activity + notification = + Notification.for_user_and_activity(recipient, activity) + |> Repo.preload(:activity) + + assert called(Pleroma.Web.Push.send(notification)) + assert called(Pleroma.Web.Streamer.stream(["user", "user:notification"], notification)) + + assert activity + end end test "it adds html newlines" do