diff --git a/.gitignore b/.gitignore index 3b0c7d3614..4e71a7df0d 100644 --- a/.gitignore +++ b/.gitignore @@ -43,7 +43,3 @@ docs/generated_config.md # Code test coverage /cover /Elixir.*.coverdata - -.idea -pleroma.iml - diff --git a/config/config.exs b/config/config.exs index b1b98af93f..ab6e00c984 100644 --- a/config/config.exs +++ b/config/config.exs @@ -331,10 +331,6 @@ follow_handshake_timeout: 500, sign_object_fetches: true -config :pleroma, :streamer, - workers: 3, - overflow_workers: 2 - config :pleroma, :user, deny_follow_blocked: true config :pleroma, :mrf_normalize_markup, scrub_policy: Pleroma.HTML.Scrubber.Default diff --git a/lib/pleroma/activity/ir/topics.ex b/lib/pleroma/activity/ir/topics.ex deleted file mode 100644 index 010897abcc..0000000000 --- a/lib/pleroma/activity/ir/topics.ex +++ /dev/null @@ -1,63 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Activity.Ir.Topics do - alias Pleroma.Object - alias Pleroma.Web.ActivityPub.Visibility - - def get_activity_topics(activity) do - activity - |> Object.normalize() - |> generate_topics(activity) - |> List.flatten() - end - - defp generate_topics(%{data: %{"type" => "Answer"}}, _) do - [] - end - - defp generate_topics(object, activity) do - ["user", "list"] ++ visibility_tags(object, activity) - end - - defp visibility_tags(object, activity) do - case Visibility.get_visibility(activity) do - "public" -> - if activity.local do - ["public", "public:local"] - else - ["public"] - end - |> item_creation_tags(object, activity) - - "direct" -> - ["direct"] - - _ -> - [] - end - end - - defp item_creation_tags(tags, %{data: %{"type" => "Create"}} = object, activity) do - tags ++ hashtags_to_topics(object) ++ attachment_topics(object, activity) - end - - defp item_creation_tags(tags, _, _) do - tags - end - - defp hashtags_to_topics(%{data: %{"tag" => tags}}) do - tags - |> Enum.filter(&is_bitstring(&1)) - |> Enum.map(fn tag -> "hashtag:" <> tag end) - end - - defp hashtags_to_topics(_), do: [] - - defp attachment_topics(%{data: %{"attachment" => []}}, _act), do: [] - - defp attachment_topics(_object, %{local: true}), do: ["public:media", "public:local:media"] - - defp attachment_topics(_object, _act), do: ["public:media"] -end diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index 3b37ce630b..49094704b3 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -141,7 +141,7 @@ defp oauth_cleanup_enabled?, defp streamer_child(:test), do: [] defp streamer_child(_) do - [Pleroma.Web.Streamer.supervisor()] + [Pleroma.Web.Streamer] end defp oauth_cleanup_child(true), diff --git a/lib/pleroma/notification.ex b/lib/pleroma/notification.ex index 8012389ac3..b7c880c515 100644 --- a/lib/pleroma/notification.ex +++ b/lib/pleroma/notification.ex @@ -210,10 +210,8 @@ def create_notification(%Activity{} = activity, %User{} = user) do unless skip?(activity, user) do notification = %Notification{user_id: user.id, activity: activity} {:ok, notification} = Repo.insert(notification) - - ["user", "user:notification"] - |> Streamer.stream(notification) - + Streamer.stream("user", notification) + Streamer.stream("user:notification", notification) Push.send(notification) notification end diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index bc5ae7fbf9..41f6a0f1f7 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -4,7 +4,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do alias Pleroma.Activity - alias Pleroma.Activity.Ir.Topics alias Pleroma.Config alias Pleroma.Conversation alias Pleroma.Notification @@ -17,7 +16,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do alias Pleroma.User alias Pleroma.Web.ActivityPub.MRF alias Pleroma.Web.ActivityPub.Transmogrifier - alias Pleroma.Web.Streamer alias Pleroma.Web.WebFinger alias Pleroma.Workers.BackgroundWorker @@ -189,7 +187,9 @@ def stream_out_participations(participations) do participations |> Repo.preload(:user) - Streamer.stream("participation", participations) + Enum.each(participations, fn participation -> + Pleroma.Web.Streamer.stream("participation", participation) + end) end def stream_out_participations(%Object{data: %{"context" => context}}, user) do @@ -208,15 +208,41 @@ def stream_out_participations(%Object{data: %{"context" => context}}, user) do def stream_out_participations(_, _), do: :noop - def stream_out(%Activity{data: %{"type" => data_type}} = activity) - when data_type in ["Create", "Announce", "Delete"] do - activity - |> Topics.get_activity_topics() - |> Streamer.stream(activity) - end + def stream_out(activity) do + if activity.data["type"] in ["Create", "Announce", "Delete"] do + object = Object.normalize(activity) + # Do not stream out poll replies + unless object.data["type"] == "Answer" do + Pleroma.Web.Streamer.stream("user", activity) + Pleroma.Web.Streamer.stream("list", activity) - def stream_out(_activity) do - :noop + if get_visibility(activity) == "public" do + Pleroma.Web.Streamer.stream("public", activity) + + if activity.local do + Pleroma.Web.Streamer.stream("public:local", activity) + end + + if activity.data["type"] in ["Create"] do + object.data + |> Map.get("tag", []) + |> Enum.filter(fn tag -> is_bitstring(tag) end) + |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end) + + if object.data["attachment"] != [] do + Pleroma.Web.Streamer.stream("public:media", activity) + + if activity.local do + Pleroma.Web.Streamer.stream("public:local:media", activity) + end + end + end + else + if get_visibility(activity) == "direct", + do: Pleroma.Web.Streamer.stream("direct", activity) + end + end + end end def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex index 3c26eb4069..dbd3542ead 100644 --- a/lib/pleroma/web/mastodon_api/websocket_handler.ex +++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex @@ -8,7 +8,6 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do alias Pleroma.Repo alias Pleroma.User alias Pleroma.Web.OAuth.Token - alias Pleroma.Web.Streamer @behaviour :cowboy_websocket @@ -25,7 +24,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do ] @anonymous_streams ["public", "public:local", "hashtag"] - # Handled by periodic keepalive in Pleroma.Web.Streamer.Ping. + # Handled by periodic keepalive in Pleroma.Web.Streamer. @timeout :infinity def init(%{qs: qs} = req, state) do @@ -66,7 +65,7 @@ def websocket_info(:subscribe, state) do }, topic #{state.topic}" ) - Streamer.add_socket(state.topic, streamer_socket(state)) + Pleroma.Web.Streamer.add_socket(state.topic, streamer_socket(state)) {:ok, state} end @@ -81,7 +80,7 @@ def terminate(reason, _req, state) do }, topic #{state.topic || "?"}: #{inspect(reason)}" ) - Streamer.remove_socket(state.topic, streamer_socket(state)) + Pleroma.Web.Streamer.remove_socket(state.topic, streamer_socket(state)) :ok end diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex new file mode 100644 index 0000000000..587c43f401 --- /dev/null +++ b/lib/pleroma/web/streamer.ex @@ -0,0 +1,318 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Web.Streamer do + use GenServer + require Logger + alias Pleroma.Activity + alias Pleroma.Config + alias Pleroma.Conversation.Participation + alias Pleroma.Notification + alias Pleroma.Object + alias Pleroma.User + alias Pleroma.Web.ActivityPub.ActivityPub + alias Pleroma.Web.ActivityPub.Visibility + alias Pleroma.Web.CommonAPI + alias Pleroma.Web.MastodonAPI.NotificationView + + @keepalive_interval :timer.seconds(30) + + def start_link(_) do + GenServer.start_link(__MODULE__, %{}, name: __MODULE__) + end + + def add_socket(topic, socket) do + GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic}) + end + + def remove_socket(topic, socket) do + GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic}) + end + + def stream(topic, item) do + GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item}) + end + + def init(args) do + Process.send_after(self(), %{action: :ping}, @keepalive_interval) + + {:ok, args} + end + + def handle_info(%{action: :ping}, topics) do + topics + |> Map.values() + |> List.flatten() + |> Enum.each(fn socket -> + Logger.debug("Sending keepalive ping") + send(socket.transport_pid, {:text, ""}) + end) + + Process.send_after(self(), %{action: :ping}, @keepalive_interval) + + {:noreply, topics} + end + + def handle_cast(%{action: :stream, topic: "direct", item: item}, topics) do + recipient_topics = + User.get_recipients_from_activity(item) + |> Enum.map(fn %{id: id} -> "direct:#{id}" end) + + Enum.each(recipient_topics || [], fn user_topic -> + Logger.debug("Trying to push direct message to #{user_topic}\n\n") + push_to_socket(topics, user_topic, item) + end) + + {:noreply, topics} + end + + def handle_cast(%{action: :stream, topic: "participation", item: participation}, topics) do + user_topic = "direct:#{participation.user_id}" + Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n") + + push_to_socket(topics, user_topic, participation) + + {:noreply, topics} + end + + def handle_cast(%{action: :stream, topic: "list", item: item}, topics) do + # filter the recipient list if the activity is not public, see #270. + recipient_lists = + case Visibility.is_public?(item) do + true -> + Pleroma.List.get_lists_from_activity(item) + + _ -> + Pleroma.List.get_lists_from_activity(item) + |> Enum.filter(fn list -> + owner = User.get_cached_by_id(list.user_id) + + Visibility.visible_for_user?(item, owner) + end) + end + + recipient_topics = + recipient_lists + |> Enum.map(fn %{id: id} -> "list:#{id}" end) + + Enum.each(recipient_topics || [], fn list_topic -> + Logger.debug("Trying to push message to #{list_topic}\n\n") + push_to_socket(topics, list_topic, item) + end) + + {:noreply, topics} + end + + def handle_cast( + %{action: :stream, topic: topic, item: %Notification{} = item}, + topics + ) + when topic in ["user", "user:notification"] do + topics + |> Map.get("#{topic}:#{item.user_id}", []) + |> Enum.each(fn socket -> + with %User{} = user <- User.get_cached_by_ap_id(socket.assigns[:user].ap_id), + true <- should_send?(user, item) do + send( + socket.transport_pid, + {:text, represent_notification(socket.assigns[:user], item)} + ) + end + end) + + {:noreply, topics} + end + + def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do + Logger.debug("Trying to push to users") + + recipient_topics = + User.get_recipients_from_activity(item) + |> Enum.map(fn %{id: id} -> "user:#{id}" end) + + Enum.each(recipient_topics, fn topic -> + push_to_socket(topics, topic, item) + end) + + {:noreply, topics} + end + + def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do + Logger.debug("Trying to push to #{topic}") + Logger.debug("Pushing item to #{topic}") + push_to_socket(topics, topic, item) + {:noreply, topics} + end + + def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do + topic = internal_topic(topic, socket) + sockets_for_topic = sockets[topic] || [] + sockets_for_topic = Enum.uniq([socket | sockets_for_topic]) + sockets = Map.put(sockets, topic, sockets_for_topic) + Logger.debug("Got new conn for #{topic}") + {:noreply, sockets} + end + + def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do + topic = internal_topic(topic, socket) + sockets_for_topic = sockets[topic] || [] + sockets_for_topic = List.delete(sockets_for_topic, socket) + sockets = Map.put(sockets, topic, sockets_for_topic) + Logger.debug("Removed conn for #{topic}") + {:noreply, sockets} + end + + def handle_cast(m, state) do + Logger.info("Unknown: #{inspect(m)}, #{inspect(state)}") + {:noreply, state} + end + + defp represent_update(%Activity{} = activity, %User{} = user) do + %{ + event: "update", + payload: + Pleroma.Web.MastodonAPI.StatusView.render( + "status.json", + activity: activity, + for: user + ) + |> Jason.encode!() + } + |> Jason.encode!() + end + + defp represent_update(%Activity{} = activity) do + %{ + event: "update", + payload: + Pleroma.Web.MastodonAPI.StatusView.render( + "status.json", + activity: activity + ) + |> Jason.encode!() + } + |> Jason.encode!() + end + + def represent_conversation(%Participation{} = participation) do + %{ + event: "conversation", + payload: + Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{ + participation: participation, + for: participation.user + }) + |> Jason.encode!() + } + |> Jason.encode!() + end + + @spec represent_notification(User.t(), Notification.t()) :: binary() + defp represent_notification(%User{} = user, %Notification{} = notify) do + %{ + event: "notification", + payload: + NotificationView.render( + "show.json", + %{notification: notify, for: user} + ) + |> Jason.encode!() + } + |> Jason.encode!() + end + + defp should_send?(%User{} = user, %Activity{} = item) do + blocks = user.info.blocks || [] + mutes = user.info.mutes || [] + reblog_mutes = user.info.muted_reblogs || [] + domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks) + + with parent when not is_nil(parent) <- Object.normalize(item), + true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)), + true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)), + %{host: item_host} <- URI.parse(item.actor), + %{host: parent_host} <- URI.parse(parent.data["actor"]), + false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host), + false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host), + true <- thread_containment(item, user), + false <- CommonAPI.thread_muted?(user, item) do + true + else + _ -> false + end + end + + defp should_send?(%User{} = user, %Notification{activity: activity}) do + should_send?(user, activity) + end + + def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do + Enum.each(topics[topic] || [], fn socket -> + # Get the current user so we have up-to-date blocks etc. + if socket.assigns[:user] do + user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id) + + if should_send?(user, item) do + send(socket.transport_pid, {:text, represent_update(item, user)}) + end + else + send(socket.transport_pid, {:text, represent_update(item)}) + end + end) + end + + def push_to_socket(topics, topic, %Participation{} = participation) do + Enum.each(topics[topic] || [], fn socket -> + send(socket.transport_pid, {:text, represent_conversation(participation)}) + end) + end + + def push_to_socket(topics, topic, %Activity{ + data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id} + }) do + Enum.each(topics[topic] || [], fn socket -> + send( + socket.transport_pid, + {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()} + ) + end) + end + + def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop + + def push_to_socket(topics, topic, item) do + Enum.each(topics[topic] || [], fn socket -> + # Get the current user so we have up-to-date blocks etc. + if socket.assigns[:user] do + user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id) + blocks = user.info.blocks || [] + mutes = user.info.mutes || [] + + with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)), + true <- thread_containment(item, user) do + send(socket.transport_pid, {:text, represent_update(item, user)}) + end + else + send(socket.transport_pid, {:text, represent_update(item)}) + end + end) + end + + defp internal_topic(topic, socket) when topic in ~w[user user:notification direct] do + "#{topic}:#{socket.assigns[:user].id}" + end + + defp internal_topic(topic, _), do: topic + + @spec thread_containment(Activity.t(), User.t()) :: boolean() + defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true + + defp thread_containment(activity, user) do + if Config.get([:instance, :skip_thread_containment]) do + true + else + ActivityPub.contain_activity(activity, user) + end + end +end diff --git a/lib/pleroma/web/streamer/ping.ex b/lib/pleroma/web/streamer/ping.ex deleted file mode 100644 index f77cbb95c6..0000000000 --- a/lib/pleroma/web/streamer/ping.ex +++ /dev/null @@ -1,33 +0,0 @@ -defmodule Pleroma.Web.Streamer.Ping do - use GenServer - require Logger - - alias Pleroma.Web.Streamer.State - alias Pleroma.Web.Streamer.StreamerSocket - - @keepalive_interval :timer.seconds(30) - - def start_link(opts) do - ping_interval = Keyword.get(opts, :ping_interval, @keepalive_interval) - GenServer.start_link(__MODULE__, %{ping_interval: ping_interval}, name: __MODULE__) - end - - def init(%{ping_interval: ping_interval} = args) do - Process.send_after(self(), :ping, ping_interval) - {:ok, args} - end - - def handle_info(:ping, %{ping_interval: ping_interval} = state) do - State.get_sockets() - |> Map.values() - |> List.flatten() - |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid} -> - Logger.debug("Sending keepalive ping") - send(transport_pid, {:text, ""}) - end) - - Process.send_after(self(), :ping, ping_interval) - - {:noreply, state} - end -end diff --git a/lib/pleroma/web/streamer/state.ex b/lib/pleroma/web/streamer/state.ex deleted file mode 100644 index 7b5199068f..0000000000 --- a/lib/pleroma/web/streamer/state.ex +++ /dev/null @@ -1,68 +0,0 @@ -defmodule Pleroma.Web.Streamer.State do - use GenServer - require Logger - - alias Pleroma.Web.Streamer.StreamerSocket - - def start_link(_) do - GenServer.start_link(__MODULE__, %{sockets: %{}}, name: __MODULE__) - end - - def add_socket(topic, socket) do - GenServer.call(__MODULE__, {:add, socket, topic}) - end - - def remove_socket(topic, socket) do - GenServer.call(__MODULE__, {:remove, socket, topic}) - end - - def get_sockets do - %{sockets: stream_sockets} = GenServer.call(__MODULE__, :get_state) - stream_sockets - end - - def init(init_arg) do - {:ok, init_arg} - end - - def handle_call(:get_state, _from, state) do - {:reply, state, state} - end - - def handle_call({:add, socket, topic}, _from, %{sockets: sockets} = state) do - internal_topic = internal_topic(topic, socket) - stream_socket = StreamerSocket.from_socket(socket) - - sockets_for_topic = - sockets - |> Map.get(internal_topic, []) - |> List.insert_at(0, stream_socket) - |> Enum.uniq() - - state = put_in(state, [:sockets, internal_topic], sockets_for_topic) - Logger.debug("Got new conn for #{topic}") - {:reply, state, state} - end - - def handle_call({:remove, socket, topic}, _from, %{sockets: sockets} = state) do - internal_topic = internal_topic(topic, socket) - stream_socket = StreamerSocket.from_socket(socket) - - sockets_for_topic = - sockets - |> Map.get(internal_topic, []) - |> List.delete(stream_socket) - - state = Kernel.put_in(state, [:sockets, internal_topic], sockets_for_topic) - {:reply, state, state} - end - - defp internal_topic(topic, socket) - when topic in ~w[user user:notification direct] do - "#{topic}:#{socket.assigns[:user].id}" - end - - defp internal_topic(topic, _) do - topic - end -end diff --git a/lib/pleroma/web/streamer/streamer.ex b/lib/pleroma/web/streamer/streamer.ex deleted file mode 100644 index 8cf719277f..0000000000 --- a/lib/pleroma/web/streamer/streamer.ex +++ /dev/null @@ -1,55 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.Streamer do - alias Pleroma.Web.Streamer.State - alias Pleroma.Web.Streamer.Worker - - @timeout 60_000 - @mix_env Mix.env() - - def add_socket(topic, socket) do - State.add_socket(topic, socket) - end - - def remove_socket(topic, socket) do - State.remove_socket(topic, socket) - end - - def get_sockets do - State.get_sockets() - end - - def stream(topics, items) do - if should_send?() do - Task.async(fn -> - :poolboy.transaction( - :streamer_worker, - &Worker.stream(&1, topics, items), - @timeout - ) - end) - end - end - - def supervisor, do: Pleroma.Web.Streamer.Supervisor - - defp should_send? do - handle_should_send(@mix_env) - end - - defp handle_should_send(:test) do - case Process.whereis(:streamer_worker) do - nil -> - false - - pid -> - Process.alive?(pid) - end - end - - defp handle_should_send(_) do - true - end -end diff --git a/lib/pleroma/web/streamer/streamer_socket.ex b/lib/pleroma/web/streamer/streamer_socket.ex deleted file mode 100644 index f006c03061..0000000000 --- a/lib/pleroma/web/streamer/streamer_socket.ex +++ /dev/null @@ -1,31 +0,0 @@ -defmodule Pleroma.Web.Streamer.StreamerSocket do - defstruct transport_pid: nil, user: nil - - alias Pleroma.User - alias Pleroma.Web.Streamer.StreamerSocket - - def from_socket(%{ - transport_pid: transport_pid, - assigns: %{user: nil} - }) do - %StreamerSocket{ - transport_pid: transport_pid - } - end - - def from_socket(%{ - transport_pid: transport_pid, - assigns: %{user: %User{} = user} - }) do - %StreamerSocket{ - transport_pid: transport_pid, - user: user - } - end - - def from_socket(%{transport_pid: transport_pid}) do - %StreamerSocket{ - transport_pid: transport_pid - } - end -end diff --git a/lib/pleroma/web/streamer/supervisor.ex b/lib/pleroma/web/streamer/supervisor.ex deleted file mode 100644 index 6afe19323a..0000000000 --- a/lib/pleroma/web/streamer/supervisor.ex +++ /dev/null @@ -1,33 +0,0 @@ -defmodule Pleroma.Web.Streamer.Supervisor do - use Supervisor - - def start_link(opts) do - Supervisor.start_link(__MODULE__, opts, name: __MODULE__) - end - - def init(args) do - children = [ - {Pleroma.Web.Streamer.State, args}, - {Pleroma.Web.Streamer.Ping, args}, - :poolboy.child_spec(:streamer_worker, poolboy_config()) - ] - - opts = [strategy: :one_for_one, name: Pleroma.Web.Streamer.Supervisor] - Supervisor.init(children, opts) - end - - defp poolboy_config do - opts = - Pleroma.Config.get(:streamer, - workers: 3, - overflow_workers: 2 - ) - - [ - {:name, {:local, :streamer_worker}}, - {:worker_module, Pleroma.Web.Streamer.Worker}, - {:size, opts[:workers]}, - {:max_overflow, opts[:overflow_workers]} - ] - end -end diff --git a/lib/pleroma/web/streamer/worker.ex b/lib/pleroma/web/streamer/worker.ex deleted file mode 100644 index 5804508eb1..0000000000 --- a/lib/pleroma/web/streamer/worker.ex +++ /dev/null @@ -1,220 +0,0 @@ -defmodule Pleroma.Web.Streamer.Worker do - use GenServer - - require Logger - - alias Pleroma.Activity - alias Pleroma.Config - alias Pleroma.Conversation.Participation - alias Pleroma.Notification - alias Pleroma.Object - alias Pleroma.User - alias Pleroma.Web.ActivityPub.ActivityPub - alias Pleroma.Web.ActivityPub.Visibility - alias Pleroma.Web.CommonAPI - alias Pleroma.Web.Streamer.State - alias Pleroma.Web.Streamer.StreamerSocket - alias Pleroma.Web.StreamerView - - def start_link(_) do - GenServer.start_link(__MODULE__, %{}, []) - end - - def init(init_arg) do - {:ok, init_arg} - end - - def stream(pid, topics, items) do - GenServer.call(pid, {:stream, topics, items}) - end - - def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do - Enum.each(topics, fn t -> - do_stream(%{topic: t, item: item}) - end) - - {:reply, state, state} - end - - def handle_call({:stream, topic, items}, _from, state) when is_list(items) do - Enum.each(items, fn i -> - do_stream(%{topic: topic, item: i}) - end) - - {:reply, state, state} - end - - def handle_call({:stream, topic, item}, _from, state) do - do_stream(%{topic: topic, item: item}) - - {:reply, state, state} - end - - defp do_stream(%{topic: "direct", item: item}) do - recipient_topics = - User.get_recipients_from_activity(item) - |> Enum.map(fn %{id: id} -> "direct:#{id}" end) - - Enum.each(recipient_topics, fn user_topic -> - Logger.debug("Trying to push direct message to #{user_topic}\n\n") - push_to_socket(State.get_sockets(), user_topic, item) - end) - end - - defp do_stream(%{topic: "participation", item: participation}) do - user_topic = "direct:#{participation.user_id}" - Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n") - - push_to_socket(State.get_sockets(), user_topic, participation) - end - - defp do_stream(%{topic: "list", item: item}) do - # filter the recipient list if the activity is not public, see #270. - recipient_lists = - case Visibility.is_public?(item) do - true -> - Pleroma.List.get_lists_from_activity(item) - - _ -> - Pleroma.List.get_lists_from_activity(item) - |> Enum.filter(fn list -> - owner = User.get_cached_by_id(list.user_id) - - Visibility.visible_for_user?(item, owner) - end) - end - - recipient_topics = - recipient_lists - |> Enum.map(fn %{id: id} -> "list:#{id}" end) - - Enum.each(recipient_topics, fn list_topic -> - Logger.debug("Trying to push message to #{list_topic}\n\n") - push_to_socket(State.get_sockets(), list_topic, item) - end) - end - - defp do_stream(%{topic: topic, item: %Notification{} = item}) - when topic in ["user", "user:notification"] do - State.get_sockets() - |> Map.get("#{topic}:#{item.user_id}", []) - |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} -> - with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id), - true <- should_send?(user, item) do - send(transport_pid, {:text, StreamerView.render("notification.json", socket_user, item)}) - end - end) - end - - defp do_stream(%{topic: "user", item: item}) do - Logger.debug("Trying to push to users") - - recipient_topics = - User.get_recipients_from_activity(item) - |> Enum.map(fn %{id: id} -> "user:#{id}" end) - - Enum.each(recipient_topics, fn topic -> - push_to_socket(State.get_sockets(), topic, item) - end) - end - - defp do_stream(%{topic: topic, item: item}) do - Logger.debug("Trying to push to #{topic}") - Logger.debug("Pushing item to #{topic}") - push_to_socket(State.get_sockets(), topic, item) - end - - defp should_send?(%User{} = user, %Activity{} = item) do - blocks = user.info.blocks || [] - mutes = user.info.mutes || [] - reblog_mutes = user.info.muted_reblogs || [] - domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks) - - with parent when not is_nil(parent) <- Object.normalize(item), - true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)), - true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)), - %{host: item_host} <- URI.parse(item.actor), - %{host: parent_host} <- URI.parse(parent.data["actor"]), - false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host), - false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host), - true <- thread_containment(item, user), - false <- CommonAPI.thread_muted?(user, item) do - true - else - _ -> false - end - end - - defp should_send?(%User{} = user, %Notification{activity: activity}) do - should_send?(user, activity) - end - - def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do - Enum.each(topics[topic] || [], fn %StreamerSocket{ - transport_pid: transport_pid, - user: socket_user - } -> - # Get the current user so we have up-to-date blocks etc. - if socket_user do - user = User.get_cached_by_ap_id(socket_user.ap_id) - - if should_send?(user, item) do - send(transport_pid, {:text, StreamerView.render("update.json", item, user)}) - end - else - send(transport_pid, {:text, StreamerView.render("update.json", item)}) - end - end) - end - - def push_to_socket(topics, topic, %Participation{} = participation) do - Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} -> - send(transport_pid, {:text, StreamerView.render("conversation.json", participation)}) - end) - end - - def push_to_socket(topics, topic, %Activity{ - data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id} - }) do - Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} -> - send( - transport_pid, - {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()} - ) - end) - end - - def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop - - def push_to_socket(topics, topic, item) do - Enum.each(topics[topic] || [], fn %StreamerSocket{ - transport_pid: transport_pid, - user: socket_user - } -> - # Get the current user so we have up-to-date blocks etc. - if socket_user do - user = User.get_cached_by_ap_id(socket_user.ap_id) - blocks = user.info.blocks || [] - mutes = user.info.mutes || [] - - with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)), - true <- thread_containment(item, user) do - send(transport_pid, {:text, StreamerView.render("update.json", item, user)}) - end - else - send(transport_pid, {:text, StreamerView.render("update.json", item)}) - end - end) - end - - @spec thread_containment(Activity.t(), User.t()) :: boolean() - defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true - - defp thread_containment(activity, user) do - if Config.get([:instance, :skip_thread_containment]) do - true - else - ActivityPub.contain_activity(activity, user) - end - end -end diff --git a/lib/pleroma/web/views/streamer_view.ex b/lib/pleroma/web/views/streamer_view.ex deleted file mode 100644 index b13030fa0a..0000000000 --- a/lib/pleroma/web/views/streamer_view.ex +++ /dev/null @@ -1,66 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.StreamerView do - use Pleroma.Web, :view - - alias Pleroma.Activity - alias Pleroma.Conversation.Participation - alias Pleroma.Notification - alias Pleroma.User - alias Pleroma.Web.MastodonAPI.NotificationView - - def render("update.json", %Activity{} = activity, %User{} = user) do - %{ - event: "update", - payload: - Pleroma.Web.MastodonAPI.StatusView.render( - "status.json", - activity: activity, - for: user - ) - |> Jason.encode!() - } - |> Jason.encode!() - end - - def render("notification.json", %User{} = user, %Notification{} = notify) do - %{ - event: "notification", - payload: - NotificationView.render( - "show.json", - %{notification: notify, for: user} - ) - |> Jason.encode!() - } - |> Jason.encode!() - end - - def render("update.json", %Activity{} = activity) do - %{ - event: "update", - payload: - Pleroma.Web.MastodonAPI.StatusView.render( - "status.json", - activity: activity - ) - |> Jason.encode!() - } - |> Jason.encode!() - end - - def render("conversation.json", %Participation{} = participation) do - %{ - event: "conversation", - payload: - Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{ - participation: participation, - for: participation.user - }) - |> Jason.encode!() - } - |> Jason.encode!() - end -end diff --git a/mix.exs b/mix.exs index 911ebad1d4..f1e98585bd 100644 --- a/mix.exs +++ b/mix.exs @@ -144,7 +144,6 @@ defp deps do git: "https://git.pleroma.social/pleroma/http_signatures.git", ref: "293d77bb6f4a67ac8bde1428735c3b42f22cbb30"}, {:telemetry, "~> 0.3"}, - {:poolboy, "~> 1.5"}, {:prometheus_ex, "~> 3.0"}, {:prometheus_plugs, "~> 1.1"}, {:prometheus_phoenix, "~> 1.3"}, diff --git a/mix.lock b/mix.lock index 0bf6a811ee..41697dd5c0 100644 --- a/mix.lock +++ b/mix.lock @@ -73,7 +73,6 @@ "plug_crypto": {:hex, :plug_crypto, "1.0.0", "18e49317d3fa343f24620ed22795ec29d4a5e602d52d1513ccea0b07d8ea7d4d", [:mix], [], "hexpm"}, "plug_static_index_html": {:hex, :plug_static_index_html, "1.0.0", "840123d4d3975585133485ea86af73cb2600afd7f2a976f9f5fd8b3808e636a0", [:mix], [{:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"}, "poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"}, - "poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm"}, "postgrex": {:hex, :postgrex, "0.14.3", "5754dee2fdf6e9e508cbf49ab138df964278700b764177e8f3871e658b345a1e", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm"}, "prometheus": {:hex, :prometheus, "4.4.1", "1e96073b3ed7788053768fea779cbc896ddc3bdd9ba60687f2ad50b252ac87d6", [:mix, :rebar3], [], "hexpm"}, "prometheus_ecto": {:hex, :prometheus_ecto, "1.4.1", "6c768ea9654de871e5b32fab2eac348467b3021604ebebbcbd8bcbe806a65ed5", [:mix], [{:ecto, "~> 2.0 or ~> 3.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:prometheus_ex, "~> 1.1 or ~> 2.0 or ~> 3.0", [hex: :prometheus_ex, repo: "hexpm", optional: false]}], "hexpm"}, diff --git a/test/activity/ir/topics_test.exs b/test/activity/ir/topics_test.exs deleted file mode 100644 index e75f83586a..0000000000 --- a/test/activity/ir/topics_test.exs +++ /dev/null @@ -1,141 +0,0 @@ -defmodule Pleroma.Activity.Ir.TopicsTest do - use Pleroma.DataCase - - alias Pleroma.Activity - alias Pleroma.Activity.Ir.Topics - alias Pleroma.Object - - require Pleroma.Constants - - describe "poll answer" do - test "produce no topics" do - activity = %Activity{object: %Object{data: %{"type" => "Answer"}}} - - assert [] == Topics.get_activity_topics(activity) - end - end - - describe "non poll answer" do - test "always add user and list topics" do - activity = %Activity{object: %Object{data: %{"type" => "FooBar"}}} - topics = Topics.get_activity_topics(activity) - - assert Enum.member?(topics, "user") - assert Enum.member?(topics, "list") - end - end - - describe "public visibility" do - setup do - activity = %Activity{ - object: %Object{data: %{"type" => "Note"}}, - data: %{"to" => [Pleroma.Constants.as_public()]} - } - - {:ok, activity: activity} - end - - test "produces public topic", %{activity: activity} do - topics = Topics.get_activity_topics(activity) - - assert Enum.member?(topics, "public") - end - - test "local action produces public:local topic", %{activity: activity} do - activity = %{activity | local: true} - topics = Topics.get_activity_topics(activity) - - assert Enum.member?(topics, "public:local") - end - - test "non-local action does not produce public:local topic", %{activity: activity} do - activity = %{activity | local: false} - topics = Topics.get_activity_topics(activity) - - refute Enum.member?(topics, "public:local") - end - end - - describe "public visibility create events" do - setup do - activity = %Activity{ - object: %Object{data: %{"type" => "Create", "attachment" => []}}, - data: %{"to" => [Pleroma.Constants.as_public()]} - } - - {:ok, activity: activity} - end - - test "with no attachments doesn't produce public:media topics", %{activity: activity} do - topics = Topics.get_activity_topics(activity) - - refute Enum.member?(topics, "public:media") - refute Enum.member?(topics, "public:local:media") - end - - test "converts tags to hash tags", %{activity: %{object: %{data: data} = object} = activity} do - tagged_data = Map.put(data, "tag", ["foo", "bar"]) - activity = %{activity | object: %{object | data: tagged_data}} - - topics = Topics.get_activity_topics(activity) - - assert Enum.member?(topics, "hashtag:foo") - assert Enum.member?(topics, "hashtag:bar") - end - - test "only converts strinngs to hash tags", %{ - activity: %{object: %{data: data} = object} = activity - } do - tagged_data = Map.put(data, "tag", [2]) - activity = %{activity | object: %{object | data: tagged_data}} - - topics = Topics.get_activity_topics(activity) - - refute Enum.member?(topics, "hashtag:2") - end - end - - describe "public visibility create events with attachments" do - setup do - activity = %Activity{ - object: %Object{data: %{"type" => "Create", "attachment" => ["foo"]}}, - data: %{"to" => [Pleroma.Constants.as_public()]} - } - - {:ok, activity: activity} - end - - test "produce public:media topics", %{activity: activity} do - topics = Topics.get_activity_topics(activity) - - assert Enum.member?(topics, "public:media") - end - - test "local produces public:local:media topics", %{activity: activity} do - topics = Topics.get_activity_topics(activity) - - assert Enum.member?(topics, "public:local:media") - end - - test "non-local doesn't produce public:local:media topics", %{activity: activity} do - activity = %{activity | local: false} - - topics = Topics.get_activity_topics(activity) - - refute Enum.member?(topics, "public:local:media") - end - end - - describe "non-public visibility" do - test "produces direct topic" do - activity = %Activity{object: %Object{data: %{"type" => "Note"}}, data: %{"to" => []}} - topics = Topics.get_activity_topics(activity) - - assert Enum.member?(topics, "direct") - refute Enum.member?(topics, "public") - refute Enum.member?(topics, "public:local") - refute Enum.member?(topics, "public:media") - refute Enum.member?(topics, "public:local:media") - end - end -end diff --git a/test/integration/mastodon_websocket_test.exs b/test/integration/mastodon_websocket_test.exs index c04262808f..63bf73412f 100644 --- a/test/integration/mastodon_websocket_test.exs +++ b/test/integration/mastodon_websocket_test.exs @@ -11,6 +11,7 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do alias Pleroma.Integration.WebsocketClient alias Pleroma.Web.CommonAPI alias Pleroma.Web.OAuth + alias Pleroma.Web.Streamer @path Pleroma.Web.Endpoint.url() |> URI.parse() @@ -18,6 +19,16 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do |> Map.put(:path, "/api/v1/streaming") |> URI.to_string() + setup do + GenServer.start(Streamer, %{}, name: Streamer) + + on_exit(fn -> + if pid = Process.whereis(Streamer) do + Process.exit(pid, :kill) + end + end) + end + def start_socket(qs \\ nil, headers \\ []) do path = case qs do @@ -42,14 +53,12 @@ test "requires authentication and a valid token for protected streams" do end) end - @tag needs_streamer: true test "allows public streams without authentication" do assert {:ok, _} = start_socket("?stream=public") assert {:ok, _} = start_socket("?stream=public:local") assert {:ok, _} = start_socket("?stream=hashtag&tag=lain") end - @tag needs_streamer: true test "receives well formatted events" do user = insert(:user) {:ok, _} = start_socket("?stream=public") @@ -94,7 +103,6 @@ test "accepts valid tokens", state do assert {:ok, _} = start_socket("?stream=user&access_token=#{state.token.token}") end - @tag needs_streamer: true test "accepts the 'user' stream", %{token: token} = _state do assert {:ok, _} = start_socket("?stream=user&access_token=#{token.token}") @@ -103,7 +111,6 @@ test "accepts the 'user' stream", %{token: token} = _state do end) =~ ":badarg" end - @tag needs_streamer: true test "accepts the 'user:notification' stream", %{token: token} = _state do assert {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}") @@ -112,7 +119,6 @@ test "accepts the 'user:notification' stream", %{token: token} = _state do end) =~ ":badarg" end - @tag needs_streamer: true test "accepts valid token on Sec-WebSocket-Protocol header", %{token: token} do assert {:ok, _} = start_socket("?stream=user", [{"Sec-WebSocket-Protocol", token.token}]) diff --git a/test/notification_test.exs b/test/notification_test.exs index 3d2f9a8fcc..3be9db09b3 100644 --- a/test/notification_test.exs +++ b/test/notification_test.exs @@ -69,7 +69,16 @@ test "does not create a notification for subscribed users if status is a reply" end describe "create_notification" do - @tag needs_streamer: true + setup do + GenServer.start(Streamer, %{}, name: Streamer) + + on_exit(fn -> + if pid = Process.whereis(Streamer) do + Process.exit(pid, :kill) + end + end) + end + test "it creates a notification for user and send to the 'user' and the 'user:notification' stream" do user = insert(:user) task = Task.async(fn -> assert_receive {:text, _}, 4_000 end) diff --git a/test/support/conn_case.ex b/test/support/conn_case.ex index b39c706774..ec5892ff53 100644 --- a/test/support/conn_case.ex +++ b/test/support/conn_case.ex @@ -40,10 +40,6 @@ defmodule Pleroma.Web.ConnCase do Ecto.Adapters.SQL.Sandbox.mode(Pleroma.Repo, {:shared, self()}) end - if tags[:needs_streamer] do - start_supervised(Pleroma.Web.Streamer.supervisor()) - end - {:ok, conn: Phoenix.ConnTest.build_conn()} end end diff --git a/test/support/data_case.ex b/test/support/data_case.ex index 17fa152140..f3d98e7e31 100644 --- a/test/support/data_case.ex +++ b/test/support/data_case.ex @@ -39,10 +39,6 @@ defmodule Pleroma.DataCase do Ecto.Adapters.SQL.Sandbox.mode(Pleroma.Repo, {:shared, self()}) end - if tags[:needs_streamer] do - start_supervised(Pleroma.Web.Streamer.supervisor()) - end - :ok end diff --git a/test/web/activity_pub/activity_pub_test.exs b/test/web/activity_pub/activity_pub_test.exs index 4100108a56..d0118fefad 100644 --- a/test/web/activity_pub/activity_pub_test.exs +++ b/test/web/activity_pub/activity_pub_test.exs @@ -38,7 +38,9 @@ test "it streams them out" do stream: fn _, _ -> nil end do ActivityPub.stream_out_participations(conversation.participations) - assert called(Pleroma.Web.Streamer.stream("participation", participations)) + Enum.each(participations, fn participation -> + assert called(Pleroma.Web.Streamer.stream("participation", participation)) + end) end end end diff --git a/test/web/streamer/ping_test.exs b/test/web/streamer/ping_test.exs deleted file mode 100644 index 3d52c00e41..0000000000 --- a/test/web/streamer/ping_test.exs +++ /dev/null @@ -1,36 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.PingTest do - use Pleroma.DataCase - - import Pleroma.Factory - alias Pleroma.Web.Streamer - - setup do - start_supervised({Streamer.supervisor(), [ping_interval: 30]}) - - :ok - end - - describe "sockets" do - setup do - user = insert(:user) - {:ok, %{user: user}} - end - - test "it sends pings", %{user: user} do - task = - Task.async(fn -> - assert_receive {:text, received_event}, 40 - assert_receive {:text, received_event}, 40 - assert_receive {:text, received_event}, 40 - end) - - Streamer.add_socket("public", %{transport_pid: task.pid, assigns: %{user: user}}) - - Task.await(task) - end - end -end diff --git a/test/web/streamer/state_test.exs b/test/web/streamer/state_test.exs deleted file mode 100644 index d1aeac541a..0000000000 --- a/test/web/streamer/state_test.exs +++ /dev/null @@ -1,54 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.StateTest do - use Pleroma.DataCase - - import Pleroma.Factory - alias Pleroma.Web.Streamer - alias Pleroma.Web.Streamer.StreamerSocket - - @moduletag needs_streamer: true - - describe "sockets" do - setup do - user = insert(:user) - user2 = insert(:user) - {:ok, %{user: user, user2: user2}} - end - - test "it can add a socket", %{user: user} do - Streamer.add_socket("public", %{transport_pid: 1, assigns: %{user: user}}) - - assert(%{"public" => [%StreamerSocket{transport_pid: 1}]} = Streamer.get_sockets()) - end - - test "it can add multiple sockets per user", %{user: user} do - Streamer.add_socket("public", %{transport_pid: 1, assigns: %{user: user}}) - Streamer.add_socket("public", %{transport_pid: 2, assigns: %{user: user}}) - - assert( - %{ - "public" => [ - %StreamerSocket{transport_pid: 2}, - %StreamerSocket{transport_pid: 1} - ] - } = Streamer.get_sockets() - ) - end - - test "it will not add a duplicate socket", %{user: user} do - Streamer.add_socket("activity", %{transport_pid: 1, assigns: %{user: user}}) - Streamer.add_socket("activity", %{transport_pid: 1, assigns: %{user: user}}) - - assert( - %{ - "activity" => [ - %StreamerSocket{transport_pid: 1} - ] - } = Streamer.get_sockets() - ) - end - end -end diff --git a/test/web/streamer/streamer_test.exs b/test/web/streamer_test.exs similarity index 86% rename from test/web/streamer/streamer_test.exs rename to test/web/streamer_test.exs index 88847e20f5..96fa7645f9 100644 --- a/test/web/streamer/streamer_test.exs +++ b/test/web/streamer_test.exs @@ -5,20 +5,24 @@ defmodule Pleroma.Web.StreamerTest do use Pleroma.DataCase - import Pleroma.Factory - alias Pleroma.List alias Pleroma.User alias Pleroma.Web.CommonAPI alias Pleroma.Web.Streamer - alias Pleroma.Web.Streamer.StreamerSocket - alias Pleroma.Web.Streamer.Worker + import Pleroma.Factory - @moduletag needs_streamer: true clear_config_all([:instance, :skip_thread_containment]) describe "user streams" do setup do + GenServer.start(Streamer, %{}, name: Streamer) + + on_exit(fn -> + if pid = Process.whereis(Streamer) do + Process.exit(pid, :kill) + end + end) + user = insert(:user) notify = insert(:notification, user: user, activity: build(:note_activity)) {:ok, %{user: user, notify: notify}} @@ -121,9 +125,11 @@ test "it sends to public" do assert_receive {:text, _}, 4_000 end) - fake_socket = %StreamerSocket{ + fake_socket = %{ transport_pid: task.pid, - user: user + assigns: %{ + user: user + } } {:ok, activity} = CommonAPI.post(other_user, %{"status" => "Test"}) @@ -132,7 +138,7 @@ test "it sends to public" do "public" => [fake_socket] } - Worker.push_to_socket(topics, "public", activity) + Streamer.push_to_socket(topics, "public", activity) Task.await(task) @@ -149,9 +155,11 @@ test "it sends to public" do assert received_event == expected_event end) - fake_socket = %StreamerSocket{ + fake_socket = %{ transport_pid: task.pid, - user: user + assigns: %{ + user: user + } } {:ok, activity} = CommonAPI.delete(activity.id, other_user) @@ -160,7 +168,7 @@ test "it sends to public" do "public" => [fake_socket] } - Worker.push_to_socket(topics, "public", activity) + Streamer.push_to_socket(topics, "public", activity) Task.await(task) end @@ -181,9 +189,9 @@ test "it doesn't send to user if recipients invalid and thread containment is en ) task = Task.async(fn -> refute_receive {:text, _}, 1_000 end) - fake_socket = %StreamerSocket{transport_pid: task.pid, user: user} + fake_socket = %{transport_pid: task.pid, assigns: %{user: user}} topics = %{"public" => [fake_socket]} - Worker.push_to_socket(topics, "public", activity) + Streamer.push_to_socket(topics, "public", activity) Task.await(task) end @@ -203,9 +211,9 @@ test "it sends message if recipients invalid and thread containment is disabled" ) task = Task.async(fn -> assert_receive {:text, _}, 1_000 end) - fake_socket = %StreamerSocket{transport_pid: task.pid, user: user} + fake_socket = %{transport_pid: task.pid, assigns: %{user: user}} topics = %{"public" => [fake_socket]} - Worker.push_to_socket(topics, "public", activity) + Streamer.push_to_socket(topics, "public", activity) Task.await(task) end @@ -225,9 +233,9 @@ test "it sends message if recipients invalid and thread containment is enabled b ) task = Task.async(fn -> assert_receive {:text, _}, 1_000 end) - fake_socket = %StreamerSocket{transport_pid: task.pid, user: user} + fake_socket = %{transport_pid: task.pid, assigns: %{user: user}} topics = %{"public" => [fake_socket]} - Worker.push_to_socket(topics, "public", activity) + Streamer.push_to_socket(topics, "public", activity) Task.await(task) end @@ -243,9 +251,11 @@ test "it doesn't send to blocked users" do refute_receive {:text, _}, 1_000 end) - fake_socket = %StreamerSocket{ + fake_socket = %{ transport_pid: task.pid, - user: user + assigns: %{ + user: user + } } {:ok, activity} = CommonAPI.post(blocked_user, %{"status" => "Test"}) @@ -254,7 +264,7 @@ test "it doesn't send to blocked users" do "public" => [fake_socket] } - Worker.push_to_socket(topics, "public", activity) + Streamer.push_to_socket(topics, "public", activity) Task.await(task) end @@ -274,9 +284,11 @@ test "it doesn't send unwanted DMs to list" do refute_receive {:text, _}, 1_000 end) - fake_socket = %StreamerSocket{ + fake_socket = %{ transport_pid: task.pid, - user: user_a + assigns: %{ + user: user_a + } } {:ok, activity} = @@ -289,7 +301,7 @@ test "it doesn't send unwanted DMs to list" do "list:#{list.id}" => [fake_socket] } - Worker.handle_call({:stream, "list", activity}, self(), topics) + Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics) Task.await(task) end @@ -306,9 +318,11 @@ test "it doesn't send unwanted private posts to list" do refute_receive {:text, _}, 1_000 end) - fake_socket = %StreamerSocket{ + fake_socket = %{ transport_pid: task.pid, - user: user_a + assigns: %{ + user: user_a + } } {:ok, activity} = @@ -321,12 +335,12 @@ test "it doesn't send unwanted private posts to list" do "list:#{list.id}" => [fake_socket] } - Worker.handle_call({:stream, "list", activity}, self(), topics) + Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics) Task.await(task) end - test "it sends wanted private posts to list" do + test "it send wanted private posts to list" do user_a = insert(:user) user_b = insert(:user) @@ -340,9 +354,11 @@ test "it sends wanted private posts to list" do assert_receive {:text, _}, 1_000 end) - fake_socket = %StreamerSocket{ + fake_socket = %{ transport_pid: task.pid, - user: user_a + assigns: %{ + user: user_a + } } {:ok, activity} = @@ -351,12 +367,11 @@ test "it sends wanted private posts to list" do "visibility" => "private" }) - Streamer.add_socket( - "list:#{list.id}", - fake_socket - ) + topics = %{ + "list:#{list.id}" => [fake_socket] + } - Worker.handle_call({:stream, "list", activity}, self(), %{}) + Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics) Task.await(task) end @@ -372,9 +387,11 @@ test "it doesn't send muted reblogs" do refute_receive {:text, _}, 1_000 end) - fake_socket = %StreamerSocket{ + fake_socket = %{ transport_pid: task.pid, - user: user1 + assigns: %{ + user: user1 + } } {:ok, create_activity} = CommonAPI.post(user3, %{"status" => "I'm kawen"}) @@ -384,7 +401,7 @@ test "it doesn't send muted reblogs" do "public" => [fake_socket] } - Worker.push_to_socket(topics, "public", announce_activity) + Streamer.push_to_socket(topics, "public", announce_activity) Task.await(task) end @@ -400,8 +417,6 @@ test "it doesn't send posts from muted threads" do task = Task.async(fn -> refute_receive {:text, _}, 4_000 end) - Process.sleep(4000) - Streamer.add_socket( "user", %{transport_pid: task.pid, assigns: %{user: user2}} @@ -413,6 +428,14 @@ test "it doesn't send posts from muted threads" do describe "direct streams" do setup do + GenServer.start(Streamer, %{}, name: Streamer) + + on_exit(fn -> + if pid = Process.whereis(Streamer) do + Process.exit(pid, :kill) + end + end) + :ok end @@ -457,8 +480,6 @@ test "it doesn't send conversation update to the 'direct' streamj when the last refute_receive {:text, _}, 4_000 end) - Process.sleep(1000) - Streamer.add_socket( "direct", %{transport_pid: task.pid, assigns: %{user: user}} @@ -500,8 +521,6 @@ test "it sends conversation update to the 'direct' stream when a message is dele assert last_status["id"] == to_string(create_activity.id) end) - Process.sleep(1000) - Streamer.add_socket( "direct", %{transport_pid: task.pid, assigns: %{user: user}}