diff --git a/.gitignore b/.gitignore
index 4e71a7df0d..3b0c7d3614 100644
--- a/.gitignore
+++ b/.gitignore
@@ -43,3 +43,7 @@ docs/generated_config.md
# Code test coverage
/cover
/Elixir.*.coverdata
+
+.idea
+pleroma.iml
+
diff --git a/config/config.exs b/config/config.exs
index ab6e00c984..b1b98af93f 100644
--- a/config/config.exs
+++ b/config/config.exs
@@ -331,6 +331,10 @@
follow_handshake_timeout: 500,
sign_object_fetches: true
+config :pleroma, :streamer,
+ workers: 3,
+ overflow_workers: 2
+
config :pleroma, :user, deny_follow_blocked: true
config :pleroma, :mrf_normalize_markup, scrub_policy: Pleroma.HTML.Scrubber.Default
diff --git a/lib/pleroma/activity/ir/topics.ex b/lib/pleroma/activity/ir/topics.ex
new file mode 100644
index 0000000000..010897abcc
--- /dev/null
+++ b/lib/pleroma/activity/ir/topics.ex
@@ -0,0 +1,63 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Activity.Ir.Topics do
+ alias Pleroma.Object
+ alias Pleroma.Web.ActivityPub.Visibility
+
+ def get_activity_topics(activity) do
+ activity
+ |> Object.normalize()
+ |> generate_topics(activity)
+ |> List.flatten()
+ end
+
+ defp generate_topics(%{data: %{"type" => "Answer"}}, _) do
+ []
+ end
+
+ defp generate_topics(object, activity) do
+ ["user", "list"] ++ visibility_tags(object, activity)
+ end
+
+ defp visibility_tags(object, activity) do
+ case Visibility.get_visibility(activity) do
+ "public" ->
+ if activity.local do
+ ["public", "public:local"]
+ else
+ ["public"]
+ end
+ |> item_creation_tags(object, activity)
+
+ "direct" ->
+ ["direct"]
+
+ _ ->
+ []
+ end
+ end
+
+ defp item_creation_tags(tags, %{data: %{"type" => "Create"}} = object, activity) do
+ tags ++ hashtags_to_topics(object) ++ attachment_topics(object, activity)
+ end
+
+ defp item_creation_tags(tags, _, _) do
+ tags
+ end
+
+ defp hashtags_to_topics(%{data: %{"tag" => tags}}) do
+ tags
+ |> Enum.filter(&is_bitstring(&1))
+ |> Enum.map(fn tag -> "hashtag:" <> tag end)
+ end
+
+ defp hashtags_to_topics(_), do: []
+
+ defp attachment_topics(%{data: %{"attachment" => []}}, _act), do: []
+
+ defp attachment_topics(_object, %{local: true}), do: ["public:media", "public:local:media"]
+
+ defp attachment_topics(_object, _act), do: ["public:media"]
+end
diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex
index 49094704b3..3b37ce630b 100644
--- a/lib/pleroma/application.ex
+++ b/lib/pleroma/application.ex
@@ -141,7 +141,7 @@ defp oauth_cleanup_enabled?,
defp streamer_child(:test), do: []
defp streamer_child(_) do
- [Pleroma.Web.Streamer]
+ [Pleroma.Web.Streamer.supervisor()]
end
defp oauth_cleanup_child(true),
diff --git a/lib/pleroma/notification.ex b/lib/pleroma/notification.ex
index b7c880c515..8012389ac3 100644
--- a/lib/pleroma/notification.ex
+++ b/lib/pleroma/notification.ex
@@ -210,8 +210,10 @@ def create_notification(%Activity{} = activity, %User{} = user) do
unless skip?(activity, user) do
notification = %Notification{user_id: user.id, activity: activity}
{:ok, notification} = Repo.insert(notification)
- Streamer.stream("user", notification)
- Streamer.stream("user:notification", notification)
+
+ ["user", "user:notification"]
+ |> Streamer.stream(notification)
+
Push.send(notification)
notification
end
diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex
index 41f6a0f1f7..bc5ae7fbf9 100644
--- a/lib/pleroma/web/activity_pub/activity_pub.ex
+++ b/lib/pleroma/web/activity_pub/activity_pub.ex
@@ -4,6 +4,7 @@
defmodule Pleroma.Web.ActivityPub.ActivityPub do
alias Pleroma.Activity
+ alias Pleroma.Activity.Ir.Topics
alias Pleroma.Config
alias Pleroma.Conversation
alias Pleroma.Notification
@@ -16,6 +17,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
alias Pleroma.User
alias Pleroma.Web.ActivityPub.MRF
alias Pleroma.Web.ActivityPub.Transmogrifier
+ alias Pleroma.Web.Streamer
alias Pleroma.Web.WebFinger
alias Pleroma.Workers.BackgroundWorker
@@ -187,9 +189,7 @@ def stream_out_participations(participations) do
participations
|> Repo.preload(:user)
- Enum.each(participations, fn participation ->
- Pleroma.Web.Streamer.stream("participation", participation)
- end)
+ Streamer.stream("participation", participations)
end
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
@@ -208,41 +208,15 @@ def stream_out_participations(%Object{data: %{"context" => context}}, user) do
def stream_out_participations(_, _), do: :noop
- def stream_out(activity) do
- if activity.data["type"] in ["Create", "Announce", "Delete"] do
- object = Object.normalize(activity)
- # Do not stream out poll replies
- unless object.data["type"] == "Answer" do
- Pleroma.Web.Streamer.stream("user", activity)
- Pleroma.Web.Streamer.stream("list", activity)
+ def stream_out(%Activity{data: %{"type" => data_type}} = activity)
+ when data_type in ["Create", "Announce", "Delete"] do
+ activity
+ |> Topics.get_activity_topics()
+ |> Streamer.stream(activity)
+ end
- if get_visibility(activity) == "public" do
- Pleroma.Web.Streamer.stream("public", activity)
-
- if activity.local do
- Pleroma.Web.Streamer.stream("public:local", activity)
- end
-
- if activity.data["type"] in ["Create"] do
- object.data
- |> Map.get("tag", [])
- |> Enum.filter(fn tag -> is_bitstring(tag) end)
- |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)
-
- if object.data["attachment"] != [] do
- Pleroma.Web.Streamer.stream("public:media", activity)
-
- if activity.local do
- Pleroma.Web.Streamer.stream("public:local:media", activity)
- end
- end
- end
- else
- if get_visibility(activity) == "direct",
- do: Pleroma.Web.Streamer.stream("direct", activity)
- end
- end
- end
+ def stream_out(_activity) do
+ :noop
end
def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex
index dbd3542ead..3c26eb4069 100644
--- a/lib/pleroma/web/mastodon_api/websocket_handler.ex
+++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex
@@ -8,6 +8,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
alias Pleroma.Repo
alias Pleroma.User
alias Pleroma.Web.OAuth.Token
+ alias Pleroma.Web.Streamer
@behaviour :cowboy_websocket
@@ -24,7 +25,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
]
@anonymous_streams ["public", "public:local", "hashtag"]
- # Handled by periodic keepalive in Pleroma.Web.Streamer.
+ # Handled by periodic keepalive in Pleroma.Web.Streamer.Ping.
@timeout :infinity
def init(%{qs: qs} = req, state) do
@@ -65,7 +66,7 @@ def websocket_info(:subscribe, state) do
}, topic #{state.topic}"
)
- Pleroma.Web.Streamer.add_socket(state.topic, streamer_socket(state))
+ Streamer.add_socket(state.topic, streamer_socket(state))
{:ok, state}
end
@@ -80,7 +81,7 @@ def terminate(reason, _req, state) do
}, topic #{state.topic || "?"}: #{inspect(reason)}"
)
- Pleroma.Web.Streamer.remove_socket(state.topic, streamer_socket(state))
+ Streamer.remove_socket(state.topic, streamer_socket(state))
:ok
end
diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex
deleted file mode 100644
index 587c43f401..0000000000
--- a/lib/pleroma/web/streamer.ex
+++ /dev/null
@@ -1,318 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.Streamer do
- use GenServer
- require Logger
- alias Pleroma.Activity
- alias Pleroma.Config
- alias Pleroma.Conversation.Participation
- alias Pleroma.Notification
- alias Pleroma.Object
- alias Pleroma.User
- alias Pleroma.Web.ActivityPub.ActivityPub
- alias Pleroma.Web.ActivityPub.Visibility
- alias Pleroma.Web.CommonAPI
- alias Pleroma.Web.MastodonAPI.NotificationView
-
- @keepalive_interval :timer.seconds(30)
-
- def start_link(_) do
- GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
- end
-
- def add_socket(topic, socket) do
- GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic})
- end
-
- def remove_socket(topic, socket) do
- GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic})
- end
-
- def stream(topic, item) do
- GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item})
- end
-
- def init(args) do
- Process.send_after(self(), %{action: :ping}, @keepalive_interval)
-
- {:ok, args}
- end
-
- def handle_info(%{action: :ping}, topics) do
- topics
- |> Map.values()
- |> List.flatten()
- |> Enum.each(fn socket ->
- Logger.debug("Sending keepalive ping")
- send(socket.transport_pid, {:text, ""})
- end)
-
- Process.send_after(self(), %{action: :ping}, @keepalive_interval)
-
- {:noreply, topics}
- end
-
- def handle_cast(%{action: :stream, topic: "direct", item: item}, topics) do
- recipient_topics =
- User.get_recipients_from_activity(item)
- |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
-
- Enum.each(recipient_topics || [], fn user_topic ->
- Logger.debug("Trying to push direct message to #{user_topic}\n\n")
- push_to_socket(topics, user_topic, item)
- end)
-
- {:noreply, topics}
- end
-
- def handle_cast(%{action: :stream, topic: "participation", item: participation}, topics) do
- user_topic = "direct:#{participation.user_id}"
- Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
-
- push_to_socket(topics, user_topic, participation)
-
- {:noreply, topics}
- end
-
- def handle_cast(%{action: :stream, topic: "list", item: item}, topics) do
- # filter the recipient list if the activity is not public, see #270.
- recipient_lists =
- case Visibility.is_public?(item) do
- true ->
- Pleroma.List.get_lists_from_activity(item)
-
- _ ->
- Pleroma.List.get_lists_from_activity(item)
- |> Enum.filter(fn list ->
- owner = User.get_cached_by_id(list.user_id)
-
- Visibility.visible_for_user?(item, owner)
- end)
- end
-
- recipient_topics =
- recipient_lists
- |> Enum.map(fn %{id: id} -> "list:#{id}" end)
-
- Enum.each(recipient_topics || [], fn list_topic ->
- Logger.debug("Trying to push message to #{list_topic}\n\n")
- push_to_socket(topics, list_topic, item)
- end)
-
- {:noreply, topics}
- end
-
- def handle_cast(
- %{action: :stream, topic: topic, item: %Notification{} = item},
- topics
- )
- when topic in ["user", "user:notification"] do
- topics
- |> Map.get("#{topic}:#{item.user_id}", [])
- |> Enum.each(fn socket ->
- with %User{} = user <- User.get_cached_by_ap_id(socket.assigns[:user].ap_id),
- true <- should_send?(user, item) do
- send(
- socket.transport_pid,
- {:text, represent_notification(socket.assigns[:user], item)}
- )
- end
- end)
-
- {:noreply, topics}
- end
-
- def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do
- Logger.debug("Trying to push to users")
-
- recipient_topics =
- User.get_recipients_from_activity(item)
- |> Enum.map(fn %{id: id} -> "user:#{id}" end)
-
- Enum.each(recipient_topics, fn topic ->
- push_to_socket(topics, topic, item)
- end)
-
- {:noreply, topics}
- end
-
- def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
- Logger.debug("Trying to push to #{topic}")
- Logger.debug("Pushing item to #{topic}")
- push_to_socket(topics, topic, item)
- {:noreply, topics}
- end
-
- def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
- topic = internal_topic(topic, socket)
- sockets_for_topic = sockets[topic] || []
- sockets_for_topic = Enum.uniq([socket | sockets_for_topic])
- sockets = Map.put(sockets, topic, sockets_for_topic)
- Logger.debug("Got new conn for #{topic}")
- {:noreply, sockets}
- end
-
- def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do
- topic = internal_topic(topic, socket)
- sockets_for_topic = sockets[topic] || []
- sockets_for_topic = List.delete(sockets_for_topic, socket)
- sockets = Map.put(sockets, topic, sockets_for_topic)
- Logger.debug("Removed conn for #{topic}")
- {:noreply, sockets}
- end
-
- def handle_cast(m, state) do
- Logger.info("Unknown: #{inspect(m)}, #{inspect(state)}")
- {:noreply, state}
- end
-
- defp represent_update(%Activity{} = activity, %User{} = user) do
- %{
- event: "update",
- payload:
- Pleroma.Web.MastodonAPI.StatusView.render(
- "status.json",
- activity: activity,
- for: user
- )
- |> Jason.encode!()
- }
- |> Jason.encode!()
- end
-
- defp represent_update(%Activity{} = activity) do
- %{
- event: "update",
- payload:
- Pleroma.Web.MastodonAPI.StatusView.render(
- "status.json",
- activity: activity
- )
- |> Jason.encode!()
- }
- |> Jason.encode!()
- end
-
- def represent_conversation(%Participation{} = participation) do
- %{
- event: "conversation",
- payload:
- Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{
- participation: participation,
- for: participation.user
- })
- |> Jason.encode!()
- }
- |> Jason.encode!()
- end
-
- @spec represent_notification(User.t(), Notification.t()) :: binary()
- defp represent_notification(%User{} = user, %Notification{} = notify) do
- %{
- event: "notification",
- payload:
- NotificationView.render(
- "show.json",
- %{notification: notify, for: user}
- )
- |> Jason.encode!()
- }
- |> Jason.encode!()
- end
-
- defp should_send?(%User{} = user, %Activity{} = item) do
- blocks = user.info.blocks || []
- mutes = user.info.mutes || []
- reblog_mutes = user.info.muted_reblogs || []
- domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks)
-
- with parent when not is_nil(parent) <- Object.normalize(item),
- true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)),
- true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)),
- %{host: item_host} <- URI.parse(item.actor),
- %{host: parent_host} <- URI.parse(parent.data["actor"]),
- false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
- false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
- true <- thread_containment(item, user),
- false <- CommonAPI.thread_muted?(user, item) do
- true
- else
- _ -> false
- end
- end
-
- defp should_send?(%User{} = user, %Notification{activity: activity}) do
- should_send?(user, activity)
- end
-
- def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
- Enum.each(topics[topic] || [], fn socket ->
- # Get the current user so we have up-to-date blocks etc.
- if socket.assigns[:user] do
- user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
-
- if should_send?(user, item) do
- send(socket.transport_pid, {:text, represent_update(item, user)})
- end
- else
- send(socket.transport_pid, {:text, represent_update(item)})
- end
- end)
- end
-
- def push_to_socket(topics, topic, %Participation{} = participation) do
- Enum.each(topics[topic] || [], fn socket ->
- send(socket.transport_pid, {:text, represent_conversation(participation)})
- end)
- end
-
- def push_to_socket(topics, topic, %Activity{
- data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
- }) do
- Enum.each(topics[topic] || [], fn socket ->
- send(
- socket.transport_pid,
- {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
- )
- end)
- end
-
- def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
-
- def push_to_socket(topics, topic, item) do
- Enum.each(topics[topic] || [], fn socket ->
- # Get the current user so we have up-to-date blocks etc.
- if socket.assigns[:user] do
- user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
- blocks = user.info.blocks || []
- mutes = user.info.mutes || []
-
- with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)),
- true <- thread_containment(item, user) do
- send(socket.transport_pid, {:text, represent_update(item, user)})
- end
- else
- send(socket.transport_pid, {:text, represent_update(item)})
- end
- end)
- end
-
- defp internal_topic(topic, socket) when topic in ~w[user user:notification direct] do
- "#{topic}:#{socket.assigns[:user].id}"
- end
-
- defp internal_topic(topic, _), do: topic
-
- @spec thread_containment(Activity.t(), User.t()) :: boolean()
- defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true
-
- defp thread_containment(activity, user) do
- if Config.get([:instance, :skip_thread_containment]) do
- true
- else
- ActivityPub.contain_activity(activity, user)
- end
- end
-end
diff --git a/lib/pleroma/web/streamer/ping.ex b/lib/pleroma/web/streamer/ping.ex
new file mode 100644
index 0000000000..f77cbb95c6
--- /dev/null
+++ b/lib/pleroma/web/streamer/ping.ex
@@ -0,0 +1,33 @@
+defmodule Pleroma.Web.Streamer.Ping do
+ use GenServer
+ require Logger
+
+ alias Pleroma.Web.Streamer.State
+ alias Pleroma.Web.Streamer.StreamerSocket
+
+ @keepalive_interval :timer.seconds(30)
+
+ def start_link(opts) do
+ ping_interval = Keyword.get(opts, :ping_interval, @keepalive_interval)
+ GenServer.start_link(__MODULE__, %{ping_interval: ping_interval}, name: __MODULE__)
+ end
+
+ def init(%{ping_interval: ping_interval} = args) do
+ Process.send_after(self(), :ping, ping_interval)
+ {:ok, args}
+ end
+
+ def handle_info(:ping, %{ping_interval: ping_interval} = state) do
+ State.get_sockets()
+ |> Map.values()
+ |> List.flatten()
+ |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid} ->
+ Logger.debug("Sending keepalive ping")
+ send(transport_pid, {:text, ""})
+ end)
+
+ Process.send_after(self(), :ping, ping_interval)
+
+ {:noreply, state}
+ end
+end
diff --git a/lib/pleroma/web/streamer/state.ex b/lib/pleroma/web/streamer/state.ex
new file mode 100644
index 0000000000..7b5199068f
--- /dev/null
+++ b/lib/pleroma/web/streamer/state.ex
@@ -0,0 +1,68 @@
+defmodule Pleroma.Web.Streamer.State do
+ use GenServer
+ require Logger
+
+ alias Pleroma.Web.Streamer.StreamerSocket
+
+ def start_link(_) do
+ GenServer.start_link(__MODULE__, %{sockets: %{}}, name: __MODULE__)
+ end
+
+ def add_socket(topic, socket) do
+ GenServer.call(__MODULE__, {:add, socket, topic})
+ end
+
+ def remove_socket(topic, socket) do
+ GenServer.call(__MODULE__, {:remove, socket, topic})
+ end
+
+ def get_sockets do
+ %{sockets: stream_sockets} = GenServer.call(__MODULE__, :get_state)
+ stream_sockets
+ end
+
+ def init(init_arg) do
+ {:ok, init_arg}
+ end
+
+ def handle_call(:get_state, _from, state) do
+ {:reply, state, state}
+ end
+
+ def handle_call({:add, socket, topic}, _from, %{sockets: sockets} = state) do
+ internal_topic = internal_topic(topic, socket)
+ stream_socket = StreamerSocket.from_socket(socket)
+
+ sockets_for_topic =
+ sockets
+ |> Map.get(internal_topic, [])
+ |> List.insert_at(0, stream_socket)
+ |> Enum.uniq()
+
+ state = put_in(state, [:sockets, internal_topic], sockets_for_topic)
+ Logger.debug("Got new conn for #{topic}")
+ {:reply, state, state}
+ end
+
+ def handle_call({:remove, socket, topic}, _from, %{sockets: sockets} = state) do
+ internal_topic = internal_topic(topic, socket)
+ stream_socket = StreamerSocket.from_socket(socket)
+
+ sockets_for_topic =
+ sockets
+ |> Map.get(internal_topic, [])
+ |> List.delete(stream_socket)
+
+ state = Kernel.put_in(state, [:sockets, internal_topic], sockets_for_topic)
+ {:reply, state, state}
+ end
+
+ defp internal_topic(topic, socket)
+ when topic in ~w[user user:notification direct] do
+ "#{topic}:#{socket.assigns[:user].id}"
+ end
+
+ defp internal_topic(topic, _) do
+ topic
+ end
+end
diff --git a/lib/pleroma/web/streamer/streamer.ex b/lib/pleroma/web/streamer/streamer.ex
new file mode 100644
index 0000000000..8cf719277f
--- /dev/null
+++ b/lib/pleroma/web/streamer/streamer.ex
@@ -0,0 +1,55 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Web.Streamer do
+ alias Pleroma.Web.Streamer.State
+ alias Pleroma.Web.Streamer.Worker
+
+ @timeout 60_000
+ @mix_env Mix.env()
+
+ def add_socket(topic, socket) do
+ State.add_socket(topic, socket)
+ end
+
+ def remove_socket(topic, socket) do
+ State.remove_socket(topic, socket)
+ end
+
+ def get_sockets do
+ State.get_sockets()
+ end
+
+ def stream(topics, items) do
+ if should_send?() do
+ Task.async(fn ->
+ :poolboy.transaction(
+ :streamer_worker,
+ &Worker.stream(&1, topics, items),
+ @timeout
+ )
+ end)
+ end
+ end
+
+ def supervisor, do: Pleroma.Web.Streamer.Supervisor
+
+ defp should_send? do
+ handle_should_send(@mix_env)
+ end
+
+ defp handle_should_send(:test) do
+ case Process.whereis(:streamer_worker) do
+ nil ->
+ false
+
+ pid ->
+ Process.alive?(pid)
+ end
+ end
+
+ defp handle_should_send(_) do
+ true
+ end
+end
diff --git a/lib/pleroma/web/streamer/streamer_socket.ex b/lib/pleroma/web/streamer/streamer_socket.ex
new file mode 100644
index 0000000000..f006c03061
--- /dev/null
+++ b/lib/pleroma/web/streamer/streamer_socket.ex
@@ -0,0 +1,31 @@
+defmodule Pleroma.Web.Streamer.StreamerSocket do
+ defstruct transport_pid: nil, user: nil
+
+ alias Pleroma.User
+ alias Pleroma.Web.Streamer.StreamerSocket
+
+ def from_socket(%{
+ transport_pid: transport_pid,
+ assigns: %{user: nil}
+ }) do
+ %StreamerSocket{
+ transport_pid: transport_pid
+ }
+ end
+
+ def from_socket(%{
+ transport_pid: transport_pid,
+ assigns: %{user: %User{} = user}
+ }) do
+ %StreamerSocket{
+ transport_pid: transport_pid,
+ user: user
+ }
+ end
+
+ def from_socket(%{transport_pid: transport_pid}) do
+ %StreamerSocket{
+ transport_pid: transport_pid
+ }
+ end
+end
diff --git a/lib/pleroma/web/streamer/supervisor.ex b/lib/pleroma/web/streamer/supervisor.ex
new file mode 100644
index 0000000000..6afe19323a
--- /dev/null
+++ b/lib/pleroma/web/streamer/supervisor.ex
@@ -0,0 +1,33 @@
+defmodule Pleroma.Web.Streamer.Supervisor do
+ use Supervisor
+
+ def start_link(opts) do
+ Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
+ end
+
+ def init(args) do
+ children = [
+ {Pleroma.Web.Streamer.State, args},
+ {Pleroma.Web.Streamer.Ping, args},
+ :poolboy.child_spec(:streamer_worker, poolboy_config())
+ ]
+
+ opts = [strategy: :one_for_one, name: Pleroma.Web.Streamer.Supervisor]
+ Supervisor.init(children, opts)
+ end
+
+ defp poolboy_config do
+ opts =
+ Pleroma.Config.get(:streamer,
+ workers: 3,
+ overflow_workers: 2
+ )
+
+ [
+ {:name, {:local, :streamer_worker}},
+ {:worker_module, Pleroma.Web.Streamer.Worker},
+ {:size, opts[:workers]},
+ {:max_overflow, opts[:overflow_workers]}
+ ]
+ end
+end
diff --git a/lib/pleroma/web/streamer/worker.ex b/lib/pleroma/web/streamer/worker.ex
new file mode 100644
index 0000000000..5804508eb1
--- /dev/null
+++ b/lib/pleroma/web/streamer/worker.ex
@@ -0,0 +1,220 @@
+defmodule Pleroma.Web.Streamer.Worker do
+ use GenServer
+
+ require Logger
+
+ alias Pleroma.Activity
+ alias Pleroma.Config
+ alias Pleroma.Conversation.Participation
+ alias Pleroma.Notification
+ alias Pleroma.Object
+ alias Pleroma.User
+ alias Pleroma.Web.ActivityPub.ActivityPub
+ alias Pleroma.Web.ActivityPub.Visibility
+ alias Pleroma.Web.CommonAPI
+ alias Pleroma.Web.Streamer.State
+ alias Pleroma.Web.Streamer.StreamerSocket
+ alias Pleroma.Web.StreamerView
+
+ def start_link(_) do
+ GenServer.start_link(__MODULE__, %{}, [])
+ end
+
+ def init(init_arg) do
+ {:ok, init_arg}
+ end
+
+ def stream(pid, topics, items) do
+ GenServer.call(pid, {:stream, topics, items})
+ end
+
+ def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do
+ Enum.each(topics, fn t ->
+ do_stream(%{topic: t, item: item})
+ end)
+
+ {:reply, state, state}
+ end
+
+ def handle_call({:stream, topic, items}, _from, state) when is_list(items) do
+ Enum.each(items, fn i ->
+ do_stream(%{topic: topic, item: i})
+ end)
+
+ {:reply, state, state}
+ end
+
+ def handle_call({:stream, topic, item}, _from, state) do
+ do_stream(%{topic: topic, item: item})
+
+ {:reply, state, state}
+ end
+
+ defp do_stream(%{topic: "direct", item: item}) do
+ recipient_topics =
+ User.get_recipients_from_activity(item)
+ |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
+
+ Enum.each(recipient_topics, fn user_topic ->
+ Logger.debug("Trying to push direct message to #{user_topic}\n\n")
+ push_to_socket(State.get_sockets(), user_topic, item)
+ end)
+ end
+
+ defp do_stream(%{topic: "participation", item: participation}) do
+ user_topic = "direct:#{participation.user_id}"
+ Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
+
+ push_to_socket(State.get_sockets(), user_topic, participation)
+ end
+
+ defp do_stream(%{topic: "list", item: item}) do
+ # filter the recipient list if the activity is not public, see #270.
+ recipient_lists =
+ case Visibility.is_public?(item) do
+ true ->
+ Pleroma.List.get_lists_from_activity(item)
+
+ _ ->
+ Pleroma.List.get_lists_from_activity(item)
+ |> Enum.filter(fn list ->
+ owner = User.get_cached_by_id(list.user_id)
+
+ Visibility.visible_for_user?(item, owner)
+ end)
+ end
+
+ recipient_topics =
+ recipient_lists
+ |> Enum.map(fn %{id: id} -> "list:#{id}" end)
+
+ Enum.each(recipient_topics, fn list_topic ->
+ Logger.debug("Trying to push message to #{list_topic}\n\n")
+ push_to_socket(State.get_sockets(), list_topic, item)
+ end)
+ end
+
+ defp do_stream(%{topic: topic, item: %Notification{} = item})
+ when topic in ["user", "user:notification"] do
+ State.get_sockets()
+ |> Map.get("#{topic}:#{item.user_id}", [])
+ |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} ->
+ with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id),
+ true <- should_send?(user, item) do
+ send(transport_pid, {:text, StreamerView.render("notification.json", socket_user, item)})
+ end
+ end)
+ end
+
+ defp do_stream(%{topic: "user", item: item}) do
+ Logger.debug("Trying to push to users")
+
+ recipient_topics =
+ User.get_recipients_from_activity(item)
+ |> Enum.map(fn %{id: id} -> "user:#{id}" end)
+
+ Enum.each(recipient_topics, fn topic ->
+ push_to_socket(State.get_sockets(), topic, item)
+ end)
+ end
+
+ defp do_stream(%{topic: topic, item: item}) do
+ Logger.debug("Trying to push to #{topic}")
+ Logger.debug("Pushing item to #{topic}")
+ push_to_socket(State.get_sockets(), topic, item)
+ end
+
+ defp should_send?(%User{} = user, %Activity{} = item) do
+ blocks = user.info.blocks || []
+ mutes = user.info.mutes || []
+ reblog_mutes = user.info.muted_reblogs || []
+ domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks)
+
+ with parent when not is_nil(parent) <- Object.normalize(item),
+ true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)),
+ true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)),
+ %{host: item_host} <- URI.parse(item.actor),
+ %{host: parent_host} <- URI.parse(parent.data["actor"]),
+ false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
+ false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
+ true <- thread_containment(item, user),
+ false <- CommonAPI.thread_muted?(user, item) do
+ true
+ else
+ _ -> false
+ end
+ end
+
+ defp should_send?(%User{} = user, %Notification{activity: activity}) do
+ should_send?(user, activity)
+ end
+
+ def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
+ Enum.each(topics[topic] || [], fn %StreamerSocket{
+ transport_pid: transport_pid,
+ user: socket_user
+ } ->
+ # Get the current user so we have up-to-date blocks etc.
+ if socket_user do
+ user = User.get_cached_by_ap_id(socket_user.ap_id)
+
+ if should_send?(user, item) do
+ send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
+ end
+ else
+ send(transport_pid, {:text, StreamerView.render("update.json", item)})
+ end
+ end)
+ end
+
+ def push_to_socket(topics, topic, %Participation{} = participation) do
+ Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
+ send(transport_pid, {:text, StreamerView.render("conversation.json", participation)})
+ end)
+ end
+
+ def push_to_socket(topics, topic, %Activity{
+ data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
+ }) do
+ Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
+ send(
+ transport_pid,
+ {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
+ )
+ end)
+ end
+
+ def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
+
+ def push_to_socket(topics, topic, item) do
+ Enum.each(topics[topic] || [], fn %StreamerSocket{
+ transport_pid: transport_pid,
+ user: socket_user
+ } ->
+ # Get the current user so we have up-to-date blocks etc.
+ if socket_user do
+ user = User.get_cached_by_ap_id(socket_user.ap_id)
+ blocks = user.info.blocks || []
+ mutes = user.info.mutes || []
+
+ with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)),
+ true <- thread_containment(item, user) do
+ send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
+ end
+ else
+ send(transport_pid, {:text, StreamerView.render("update.json", item)})
+ end
+ end)
+ end
+
+ @spec thread_containment(Activity.t(), User.t()) :: boolean()
+ defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true
+
+ defp thread_containment(activity, user) do
+ if Config.get([:instance, :skip_thread_containment]) do
+ true
+ else
+ ActivityPub.contain_activity(activity, user)
+ end
+ end
+end
diff --git a/lib/pleroma/web/views/streamer_view.ex b/lib/pleroma/web/views/streamer_view.ex
new file mode 100644
index 0000000000..b13030fa0a
--- /dev/null
+++ b/lib/pleroma/web/views/streamer_view.ex
@@ -0,0 +1,66 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Web.StreamerView do
+ use Pleroma.Web, :view
+
+ alias Pleroma.Activity
+ alias Pleroma.Conversation.Participation
+ alias Pleroma.Notification
+ alias Pleroma.User
+ alias Pleroma.Web.MastodonAPI.NotificationView
+
+ def render("update.json", %Activity{} = activity, %User{} = user) do
+ %{
+ event: "update",
+ payload:
+ Pleroma.Web.MastodonAPI.StatusView.render(
+ "status.json",
+ activity: activity,
+ for: user
+ )
+ |> Jason.encode!()
+ }
+ |> Jason.encode!()
+ end
+
+ def render("notification.json", %User{} = user, %Notification{} = notify) do
+ %{
+ event: "notification",
+ payload:
+ NotificationView.render(
+ "show.json",
+ %{notification: notify, for: user}
+ )
+ |> Jason.encode!()
+ }
+ |> Jason.encode!()
+ end
+
+ def render("update.json", %Activity{} = activity) do
+ %{
+ event: "update",
+ payload:
+ Pleroma.Web.MastodonAPI.StatusView.render(
+ "status.json",
+ activity: activity
+ )
+ |> Jason.encode!()
+ }
+ |> Jason.encode!()
+ end
+
+ def render("conversation.json", %Participation{} = participation) do
+ %{
+ event: "conversation",
+ payload:
+ Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{
+ participation: participation,
+ for: participation.user
+ })
+ |> Jason.encode!()
+ }
+ |> Jason.encode!()
+ end
+end
diff --git a/mix.exs b/mix.exs
index f1e98585bd..911ebad1d4 100644
--- a/mix.exs
+++ b/mix.exs
@@ -144,6 +144,7 @@ defp deps do
git: "https://git.pleroma.social/pleroma/http_signatures.git",
ref: "293d77bb6f4a67ac8bde1428735c3b42f22cbb30"},
{:telemetry, "~> 0.3"},
+ {:poolboy, "~> 1.5"},
{:prometheus_ex, "~> 3.0"},
{:prometheus_plugs, "~> 1.1"},
{:prometheus_phoenix, "~> 1.3"},
diff --git a/mix.lock b/mix.lock
index 41697dd5c0..0bf6a811ee 100644
--- a/mix.lock
+++ b/mix.lock
@@ -73,6 +73,7 @@
"plug_crypto": {:hex, :plug_crypto, "1.0.0", "18e49317d3fa343f24620ed22795ec29d4a5e602d52d1513ccea0b07d8ea7d4d", [:mix], [], "hexpm"},
"plug_static_index_html": {:hex, :plug_static_index_html, "1.0.0", "840123d4d3975585133485ea86af73cb2600afd7f2a976f9f5fd8b3808e636a0", [:mix], [{:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"},
"poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"},
+ "poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm"},
"postgrex": {:hex, :postgrex, "0.14.3", "5754dee2fdf6e9e508cbf49ab138df964278700b764177e8f3871e658b345a1e", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm"},
"prometheus": {:hex, :prometheus, "4.4.1", "1e96073b3ed7788053768fea779cbc896ddc3bdd9ba60687f2ad50b252ac87d6", [:mix, :rebar3], [], "hexpm"},
"prometheus_ecto": {:hex, :prometheus_ecto, "1.4.1", "6c768ea9654de871e5b32fab2eac348467b3021604ebebbcbd8bcbe806a65ed5", [:mix], [{:ecto, "~> 2.0 or ~> 3.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:prometheus_ex, "~> 1.1 or ~> 2.0 or ~> 3.0", [hex: :prometheus_ex, repo: "hexpm", optional: false]}], "hexpm"},
diff --git a/test/activity/ir/topics_test.exs b/test/activity/ir/topics_test.exs
new file mode 100644
index 0000000000..e75f83586a
--- /dev/null
+++ b/test/activity/ir/topics_test.exs
@@ -0,0 +1,141 @@
+defmodule Pleroma.Activity.Ir.TopicsTest do
+ use Pleroma.DataCase
+
+ alias Pleroma.Activity
+ alias Pleroma.Activity.Ir.Topics
+ alias Pleroma.Object
+
+ require Pleroma.Constants
+
+ describe "poll answer" do
+ test "produce no topics" do
+ activity = %Activity{object: %Object{data: %{"type" => "Answer"}}}
+
+ assert [] == Topics.get_activity_topics(activity)
+ end
+ end
+
+ describe "non poll answer" do
+ test "always add user and list topics" do
+ activity = %Activity{object: %Object{data: %{"type" => "FooBar"}}}
+ topics = Topics.get_activity_topics(activity)
+
+ assert Enum.member?(topics, "user")
+ assert Enum.member?(topics, "list")
+ end
+ end
+
+ describe "public visibility" do
+ setup do
+ activity = %Activity{
+ object: %Object{data: %{"type" => "Note"}},
+ data: %{"to" => [Pleroma.Constants.as_public()]}
+ }
+
+ {:ok, activity: activity}
+ end
+
+ test "produces public topic", %{activity: activity} do
+ topics = Topics.get_activity_topics(activity)
+
+ assert Enum.member?(topics, "public")
+ end
+
+ test "local action produces public:local topic", %{activity: activity} do
+ activity = %{activity | local: true}
+ topics = Topics.get_activity_topics(activity)
+
+ assert Enum.member?(topics, "public:local")
+ end
+
+ test "non-local action does not produce public:local topic", %{activity: activity} do
+ activity = %{activity | local: false}
+ topics = Topics.get_activity_topics(activity)
+
+ refute Enum.member?(topics, "public:local")
+ end
+ end
+
+ describe "public visibility create events" do
+ setup do
+ activity = %Activity{
+ object: %Object{data: %{"type" => "Create", "attachment" => []}},
+ data: %{"to" => [Pleroma.Constants.as_public()]}
+ }
+
+ {:ok, activity: activity}
+ end
+
+ test "with no attachments doesn't produce public:media topics", %{activity: activity} do
+ topics = Topics.get_activity_topics(activity)
+
+ refute Enum.member?(topics, "public:media")
+ refute Enum.member?(topics, "public:local:media")
+ end
+
+ test "converts tags to hash tags", %{activity: %{object: %{data: data} = object} = activity} do
+ tagged_data = Map.put(data, "tag", ["foo", "bar"])
+ activity = %{activity | object: %{object | data: tagged_data}}
+
+ topics = Topics.get_activity_topics(activity)
+
+ assert Enum.member?(topics, "hashtag:foo")
+ assert Enum.member?(topics, "hashtag:bar")
+ end
+
+ test "only converts strinngs to hash tags", %{
+ activity: %{object: %{data: data} = object} = activity
+ } do
+ tagged_data = Map.put(data, "tag", [2])
+ activity = %{activity | object: %{object | data: tagged_data}}
+
+ topics = Topics.get_activity_topics(activity)
+
+ refute Enum.member?(topics, "hashtag:2")
+ end
+ end
+
+ describe "public visibility create events with attachments" do
+ setup do
+ activity = %Activity{
+ object: %Object{data: %{"type" => "Create", "attachment" => ["foo"]}},
+ data: %{"to" => [Pleroma.Constants.as_public()]}
+ }
+
+ {:ok, activity: activity}
+ end
+
+ test "produce public:media topics", %{activity: activity} do
+ topics = Topics.get_activity_topics(activity)
+
+ assert Enum.member?(topics, "public:media")
+ end
+
+ test "local produces public:local:media topics", %{activity: activity} do
+ topics = Topics.get_activity_topics(activity)
+
+ assert Enum.member?(topics, "public:local:media")
+ end
+
+ test "non-local doesn't produce public:local:media topics", %{activity: activity} do
+ activity = %{activity | local: false}
+
+ topics = Topics.get_activity_topics(activity)
+
+ refute Enum.member?(topics, "public:local:media")
+ end
+ end
+
+ describe "non-public visibility" do
+ test "produces direct topic" do
+ activity = %Activity{object: %Object{data: %{"type" => "Note"}}, data: %{"to" => []}}
+ topics = Topics.get_activity_topics(activity)
+
+ assert Enum.member?(topics, "direct")
+ refute Enum.member?(topics, "public")
+ refute Enum.member?(topics, "public:local")
+ refute Enum.member?(topics, "public:media")
+ refute Enum.member?(topics, "public:local:media")
+ end
+ end
+end
diff --git a/test/integration/mastodon_websocket_test.exs b/test/integration/mastodon_websocket_test.exs
index 63bf73412f..c04262808f 100644
--- a/test/integration/mastodon_websocket_test.exs
+++ b/test/integration/mastodon_websocket_test.exs
@@ -11,7 +11,6 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do
alias Pleroma.Integration.WebsocketClient
alias Pleroma.Web.CommonAPI
alias Pleroma.Web.OAuth
- alias Pleroma.Web.Streamer
@path Pleroma.Web.Endpoint.url()
|> URI.parse()
@@ -19,16 +18,6 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do
|> Map.put(:path, "/api/v1/streaming")
|> URI.to_string()
- setup do
- GenServer.start(Streamer, %{}, name: Streamer)
-
- on_exit(fn ->
- if pid = Process.whereis(Streamer) do
- Process.exit(pid, :kill)
- end
- end)
- end
-
def start_socket(qs \\ nil, headers \\ []) do
path =
case qs do
@@ -53,12 +42,14 @@ test "requires authentication and a valid token for protected streams" do
end)
end
+ @tag needs_streamer: true
test "allows public streams without authentication" do
assert {:ok, _} = start_socket("?stream=public")
assert {:ok, _} = start_socket("?stream=public:local")
assert {:ok, _} = start_socket("?stream=hashtag&tag=lain")
end
+ @tag needs_streamer: true
test "receives well formatted events" do
user = insert(:user)
{:ok, _} = start_socket("?stream=public")
@@ -103,6 +94,7 @@ test "accepts valid tokens", state do
assert {:ok, _} = start_socket("?stream=user&access_token=#{state.token.token}")
end
+ @tag needs_streamer: true
test "accepts the 'user' stream", %{token: token} = _state do
assert {:ok, _} = start_socket("?stream=user&access_token=#{token.token}")
@@ -111,6 +103,7 @@ test "accepts the 'user' stream", %{token: token} = _state do
end) =~ ":badarg"
end
+ @tag needs_streamer: true
test "accepts the 'user:notification' stream", %{token: token} = _state do
assert {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}")
@@ -119,6 +112,7 @@ test "accepts the 'user:notification' stream", %{token: token} = _state do
end) =~ ":badarg"
end
+ @tag needs_streamer: true
test "accepts valid token on Sec-WebSocket-Protocol header", %{token: token} do
assert {:ok, _} = start_socket("?stream=user", [{"Sec-WebSocket-Protocol", token.token}])
diff --git a/test/notification_test.exs b/test/notification_test.exs
index 3be9db09b3..3d2f9a8fcc 100644
--- a/test/notification_test.exs
+++ b/test/notification_test.exs
@@ -69,16 +69,7 @@ test "does not create a notification for subscribed users if status is a reply"
end
describe "create_notification" do
- setup do
- GenServer.start(Streamer, %{}, name: Streamer)
-
- on_exit(fn ->
- if pid = Process.whereis(Streamer) do
- Process.exit(pid, :kill)
- end
- end)
- end
-
+ @tag needs_streamer: true
test "it creates a notification for user and send to the 'user' and the 'user:notification' stream" do
user = insert(:user)
task = Task.async(fn -> assert_receive {:text, _}, 4_000 end)
diff --git a/test/support/conn_case.ex b/test/support/conn_case.ex
index ec5892ff53..b39c706774 100644
--- a/test/support/conn_case.ex
+++ b/test/support/conn_case.ex
@@ -40,6 +40,10 @@ defmodule Pleroma.Web.ConnCase do
Ecto.Adapters.SQL.Sandbox.mode(Pleroma.Repo, {:shared, self()})
end
+ if tags[:needs_streamer] do
+ start_supervised(Pleroma.Web.Streamer.supervisor())
+ end
+
{:ok, conn: Phoenix.ConnTest.build_conn()}
end
end
diff --git a/test/support/data_case.ex b/test/support/data_case.ex
index f3d98e7e31..17fa152140 100644
--- a/test/support/data_case.ex
+++ b/test/support/data_case.ex
@@ -39,6 +39,10 @@ defmodule Pleroma.DataCase do
Ecto.Adapters.SQL.Sandbox.mode(Pleroma.Repo, {:shared, self()})
end
+ if tags[:needs_streamer] do
+ start_supervised(Pleroma.Web.Streamer.supervisor())
+ end
+
:ok
end
diff --git a/test/web/activity_pub/activity_pub_test.exs b/test/web/activity_pub/activity_pub_test.exs
index d0118fefad..4100108a56 100644
--- a/test/web/activity_pub/activity_pub_test.exs
+++ b/test/web/activity_pub/activity_pub_test.exs
@@ -38,9 +38,7 @@ test "it streams them out" do
stream: fn _, _ -> nil end do
ActivityPub.stream_out_participations(conversation.participations)
- Enum.each(participations, fn participation ->
- assert called(Pleroma.Web.Streamer.stream("participation", participation))
- end)
+ assert called(Pleroma.Web.Streamer.stream("participation", participations))
end
end
end
diff --git a/test/web/streamer/ping_test.exs b/test/web/streamer/ping_test.exs
new file mode 100644
index 0000000000..3d52c00e41
--- /dev/null
+++ b/test/web/streamer/ping_test.exs
@@ -0,0 +1,36 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Web.PingTest do
+ use Pleroma.DataCase
+
+ import Pleroma.Factory
+ alias Pleroma.Web.Streamer
+
+ setup do
+ start_supervised({Streamer.supervisor(), [ping_interval: 30]})
+
+ :ok
+ end
+
+ describe "sockets" do
+ setup do
+ user = insert(:user)
+ {:ok, %{user: user}}
+ end
+
+ test "it sends pings", %{user: user} do
+ task =
+ Task.async(fn ->
+ assert_receive {:text, received_event}, 40
+ assert_receive {:text, received_event}, 40
+ assert_receive {:text, received_event}, 40
+ end)
+
+ Streamer.add_socket("public", %{transport_pid: task.pid, assigns: %{user: user}})
+
+ Task.await(task)
+ end
+ end
+end
diff --git a/test/web/streamer/state_test.exs b/test/web/streamer/state_test.exs
new file mode 100644
index 0000000000..d1aeac541a
--- /dev/null
+++ b/test/web/streamer/state_test.exs
@@ -0,0 +1,54 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Web.StateTest do
+ use Pleroma.DataCase
+
+ import Pleroma.Factory
+ alias Pleroma.Web.Streamer
+ alias Pleroma.Web.Streamer.StreamerSocket
+
+ @moduletag needs_streamer: true
+
+ describe "sockets" do
+ setup do
+ user = insert(:user)
+ user2 = insert(:user)
+ {:ok, %{user: user, user2: user2}}
+ end
+
+ test "it can add a socket", %{user: user} do
+ Streamer.add_socket("public", %{transport_pid: 1, assigns: %{user: user}})
+
+ assert(%{"public" => [%StreamerSocket{transport_pid: 1}]} = Streamer.get_sockets())
+ end
+
+ test "it can add multiple sockets per user", %{user: user} do
+ Streamer.add_socket("public", %{transport_pid: 1, assigns: %{user: user}})
+ Streamer.add_socket("public", %{transport_pid: 2, assigns: %{user: user}})
+
+ assert(
+ %{
+ "public" => [
+ %StreamerSocket{transport_pid: 2},
+ %StreamerSocket{transport_pid: 1}
+ ]
+ } = Streamer.get_sockets()
+ )
+ end
+
+ test "it will not add a duplicate socket", %{user: user} do
+ Streamer.add_socket("activity", %{transport_pid: 1, assigns: %{user: user}})
+ Streamer.add_socket("activity", %{transport_pid: 1, assigns: %{user: user}})
+
+ assert(
+ %{
+ "activity" => [
+ %StreamerSocket{transport_pid: 1}
+ ]
+ } = Streamer.get_sockets()
+ )
+ end
+ end
+end
diff --git a/test/web/streamer_test.exs b/test/web/streamer/streamer_test.exs
similarity index 86%
rename from test/web/streamer_test.exs
rename to test/web/streamer/streamer_test.exs
index 96fa7645f9..88847e20f5 100644
--- a/test/web/streamer_test.exs
+++ b/test/web/streamer/streamer_test.exs
@@ -5,24 +5,20 @@
defmodule Pleroma.Web.StreamerTest do
use Pleroma.DataCase
+ import Pleroma.Factory
+
alias Pleroma.List
alias Pleroma.User
alias Pleroma.Web.CommonAPI
alias Pleroma.Web.Streamer
- import Pleroma.Factory
+ alias Pleroma.Web.Streamer.StreamerSocket
+ alias Pleroma.Web.Streamer.Worker
+ @moduletag needs_streamer: true
clear_config_all([:instance, :skip_thread_containment])
describe "user streams" do
setup do
- GenServer.start(Streamer, %{}, name: Streamer)
-
- on_exit(fn ->
- if pid = Process.whereis(Streamer) do
- Process.exit(pid, :kill)
- end
- end)
-
user = insert(:user)
notify = insert(:notification, user: user, activity: build(:note_activity))
{:ok, %{user: user, notify: notify}}
@@ -125,11 +121,9 @@ test "it sends to public" do
assert_receive {:text, _}, 4_000
end)
- fake_socket = %{
+ fake_socket = %StreamerSocket{
transport_pid: task.pid,
- assigns: %{
- user: user
- }
+ user: user
}
{:ok, activity} = CommonAPI.post(other_user, %{"status" => "Test"})
@@ -138,7 +132,7 @@ test "it sends to public" do
"public" => [fake_socket]
}
- Streamer.push_to_socket(topics, "public", activity)
+ Worker.push_to_socket(topics, "public", activity)
Task.await(task)
@@ -155,11 +149,9 @@ test "it sends to public" do
assert received_event == expected_event
end)
- fake_socket = %{
+ fake_socket = %StreamerSocket{
transport_pid: task.pid,
- assigns: %{
- user: user
- }
+ user: user
}
{:ok, activity} = CommonAPI.delete(activity.id, other_user)
@@ -168,7 +160,7 @@ test "it sends to public" do
"public" => [fake_socket]
}
- Streamer.push_to_socket(topics, "public", activity)
+ Worker.push_to_socket(topics, "public", activity)
Task.await(task)
end
@@ -189,9 +181,9 @@ test "it doesn't send to user if recipients invalid and thread containment is en
)
task = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
- fake_socket = %{transport_pid: task.pid, assigns: %{user: user}}
+ fake_socket = %StreamerSocket{transport_pid: task.pid, user: user}
topics = %{"public" => [fake_socket]}
- Streamer.push_to_socket(topics, "public", activity)
+ Worker.push_to_socket(topics, "public", activity)
Task.await(task)
end
@@ -211,9 +203,9 @@ test "it sends message if recipients invalid and thread containment is disabled"
)
task = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
- fake_socket = %{transport_pid: task.pid, assigns: %{user: user}}
+ fake_socket = %StreamerSocket{transport_pid: task.pid, user: user}
topics = %{"public" => [fake_socket]}
- Streamer.push_to_socket(topics, "public", activity)
+ Worker.push_to_socket(topics, "public", activity)
Task.await(task)
end
@@ -233,9 +225,9 @@ test "it sends message if recipients invalid and thread containment is enabled b
)
task = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
- fake_socket = %{transport_pid: task.pid, assigns: %{user: user}}
+ fake_socket = %StreamerSocket{transport_pid: task.pid, user: user}
topics = %{"public" => [fake_socket]}
- Streamer.push_to_socket(topics, "public", activity)
+ Worker.push_to_socket(topics, "public", activity)
Task.await(task)
end
@@ -251,11 +243,9 @@ test "it doesn't send to blocked users" do
refute_receive {:text, _}, 1_000
end)
- fake_socket = %{
+ fake_socket = %StreamerSocket{
transport_pid: task.pid,
- assigns: %{
- user: user
- }
+ user: user
}
{:ok, activity} = CommonAPI.post(blocked_user, %{"status" => "Test"})
@@ -264,7 +254,7 @@ test "it doesn't send to blocked users" do
"public" => [fake_socket]
}
- Streamer.push_to_socket(topics, "public", activity)
+ Worker.push_to_socket(topics, "public", activity)
Task.await(task)
end
@@ -284,11 +274,9 @@ test "it doesn't send unwanted DMs to list" do
refute_receive {:text, _}, 1_000
end)
- fake_socket = %{
+ fake_socket = %StreamerSocket{
transport_pid: task.pid,
- assigns: %{
- user: user_a
- }
+ user: user_a
}
{:ok, activity} =
@@ -301,7 +289,7 @@ test "it doesn't send unwanted DMs to list" do
"list:#{list.id}" => [fake_socket]
}
- Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics)
+ Worker.handle_call({:stream, "list", activity}, self(), topics)
Task.await(task)
end
@@ -318,11 +306,9 @@ test "it doesn't send unwanted private posts to list" do
refute_receive {:text, _}, 1_000
end)
- fake_socket = %{
+ fake_socket = %StreamerSocket{
transport_pid: task.pid,
- assigns: %{
- user: user_a
- }
+ user: user_a
}
{:ok, activity} =
@@ -335,12 +321,12 @@ test "it doesn't send unwanted private posts to list" do
"list:#{list.id}" => [fake_socket]
}
- Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics)
+ Worker.handle_call({:stream, "list", activity}, self(), topics)
Task.await(task)
end
- test "it send wanted private posts to list" do
+ test "it sends wanted private posts to list" do
user_a = insert(:user)
user_b = insert(:user)
@@ -354,11 +340,9 @@ test "it send wanted private posts to list" do
assert_receive {:text, _}, 1_000
end)
- fake_socket = %{
+ fake_socket = %StreamerSocket{
transport_pid: task.pid,
- assigns: %{
- user: user_a
- }
+ user: user_a
}
{:ok, activity} =
@@ -367,11 +351,12 @@ test "it send wanted private posts to list" do
"visibility" => "private"
})
- topics = %{
- "list:#{list.id}" => [fake_socket]
- }
+ Streamer.add_socket(
+ "list:#{list.id}",
+ fake_socket
+ )
- Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics)
+ Worker.handle_call({:stream, "list", activity}, self(), %{})
Task.await(task)
end
@@ -387,11 +372,9 @@ test "it doesn't send muted reblogs" do
refute_receive {:text, _}, 1_000
end)
- fake_socket = %{
+ fake_socket = %StreamerSocket{
transport_pid: task.pid,
- assigns: %{
- user: user1
- }
+ user: user1
}
{:ok, create_activity} = CommonAPI.post(user3, %{"status" => "I'm kawen"})
@@ -401,7 +384,7 @@ test "it doesn't send muted reblogs" do
"public" => [fake_socket]
}
- Streamer.push_to_socket(topics, "public", announce_activity)
+ Worker.push_to_socket(topics, "public", announce_activity)
Task.await(task)
end
@@ -417,6 +400,8 @@ test "it doesn't send posts from muted threads" do
task = Task.async(fn -> refute_receive {:text, _}, 4_000 end)
+ Process.sleep(4000)
+
Streamer.add_socket(
"user",
%{transport_pid: task.pid, assigns: %{user: user2}}
@@ -428,14 +413,6 @@ test "it doesn't send posts from muted threads" do
describe "direct streams" do
setup do
- GenServer.start(Streamer, %{}, name: Streamer)
-
- on_exit(fn ->
- if pid = Process.whereis(Streamer) do
- Process.exit(pid, :kill)
- end
- end)
-
:ok
end
@@ -480,6 +457,8 @@ test "it doesn't send conversation update to the 'direct' streamj when the last
refute_receive {:text, _}, 4_000
end)
+ Process.sleep(1000)
+
Streamer.add_socket(
"direct",
%{transport_pid: task.pid, assigns: %{user: user}}
@@ -521,6 +500,8 @@ test "it sends conversation update to the 'direct' stream when a message is dele
assert last_status["id"] == to_string(create_activity.id)
end)
+ Process.sleep(1000)
+
Streamer.add_socket(
"direct",
%{transport_pid: task.pid, assigns: %{user: user}}