diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex index 393d093e5b..94e4595d84 100644 --- a/lib/pleroma/web/mastodon_api/websocket_handler.ex +++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex @@ -19,26 +19,12 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do # Hibernate every X messages @hibernate_every 100 - @streams [ - "public", - "public:local", - "public:media", - "public:local:media", - "user", - "user:notification", - "direct", - "list", - "hashtag" - ] - @anonymous_streams ["public", "public:local", "hashtag"] - def init(%{qs: qs} = req, state) do - with params <- :cow_qs.parse_qs(qs), + with params <- Enum.into(:cow_qs.parse_qs(qs), %{}), sec_websocket <- :cowboy_req.header("sec-websocket-protocol", req, nil), - access_token <- List.keyfind(params, "access_token", 0), - {_, stream} <- List.keyfind(params, "stream", 0), - {:ok, user} <- allow_request(stream, [access_token, sec_websocket]), - topic when is_binary(topic) <- expand_topic(stream, params) do + access_token <- Map.get(params, "access_token"), + {:ok, user} <- authenticate_request(access_token, sec_websocket), + {:ok, topic} <- Streamer.get_topic(Map.get(params, "stream"), user, params) do req = if sec_websocket do :cowboy_req.set_resp_header("sec-websocket-protocol", sec_websocket, req) @@ -49,14 +35,14 @@ def init(%{qs: qs} = req, state) do {:cowboy_websocket, req, %{user: user, topic: topic, count: 0, timer: nil}, %{idle_timeout: @timeout}} else - {:error, code} -> - Logger.debug("#{__MODULE__} denied connection: #{inspect(code)} - #{inspect(req)}") - {:ok, req} = :cowboy_req.reply(code, req) + {:error, :bad_topic} -> + Logger.debug("#{__MODULE__} bad topic #{inspect(req)}") + {:ok, req} = :cowboy_req.reply(404, req) {:ok, req, state} - error -> - Logger.debug("#{__MODULE__} denied connection: #{inspect(error)} - #{inspect(req)}") - {:ok, req} = :cowboy_req.reply(400, req) + {:error, :unauthorized} -> + Logger.debug("#{__MODULE__} authentication error: #{inspect(req)}") + {:ok, req} = :cowboy_req.reply(401, req) {:ok, req, state} end end @@ -124,50 +110,23 @@ def terminate(reason, _req, state) do end # Public streams without authentication. - defp allow_request(stream, [nil, nil]) when stream in @anonymous_streams do + defp authenticate_request(nil, nil) do {:ok, nil} end # Authenticated streams. - defp allow_request(stream, [access_token, sec_websocket]) when stream in @streams do - token = - with {"access_token", token} <- access_token do - token - else - _ -> sec_websocket - end + defp authenticate_request(access_token, sec_websocket) do + token = access_token || sec_websocket with true <- is_bitstring(token), %Token{user_id: user_id} <- Repo.get_by(Token, token: token), user = %User{} <- User.get_cached_by_id(user_id) do {:ok, user} else - _ -> {:error, 403} + _ -> {:error, :unauthorized} end end - # Not authenticated. - defp allow_request(stream, _) when stream in @streams, do: {:error, 403} - - # No matching stream. - defp allow_request(_, _), do: {:error, 404} - - defp expand_topic("hashtag", params) do - case List.keyfind(params, "tag", 0) do - {_, tag} -> "hashtag:#{tag}" - _ -> nil - end - end - - defp expand_topic("list", params) do - case List.keyfind(params, "list", 0) do - {_, list} -> "list:#{list}" - _ -> nil - end - end - - defp expand_topic(topic, _), do: topic - defp timer do Process.send_after(self(), :tick, @tick) end diff --git a/lib/pleroma/web/streamer/streamer.ex b/lib/pleroma/web/streamer/streamer.ex index 5ad4aa9367..49a400df7e 100644 --- a/lib/pleroma/web/streamer/streamer.ex +++ b/lib/pleroma/web/streamer/streamer.ex @@ -21,12 +21,68 @@ defmodule Pleroma.Web.Streamer do def registry, do: @registry - def add_socket(topic, %User{} = user) do - if should_env_send?(), do: Registry.register(@registry, user_topic(topic, user), true) + @public_streams ["public", "public:local", "public:media", "public:local:media"] + @user_streams ["user", "user:notification", "direct"] + + @doc "Expands and authorizes a stream, and registers the process for streaming." + @spec get_topic_and_add_socket(stream :: String.t(), User.t() | nil, Map.t() | nil) :: + {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized} + def get_topic_and_add_socket(stream, user, params \\ %{}) do + case get_topic(stream, user, params) do + {:ok, topic} -> add_socket(topic, user) + error -> error + end end - def add_socket(topic, _) do - if should_env_send?(), do: Registry.register(@registry, topic, false) + @doc "Expand and authorizes a stream" + @spec get_topic(stream :: String.t(), User.t() | nil, Map.t()) :: + {:ok, topic :: String.t()} | {:error, :bad_topic} + def get_topic(stream, user, params \\ %{}) + + # Allow all public steams. + def get_topic(stream, _, _) when stream in @public_streams do + {:ok, stream} + end + + # Allow all hashtags streams. + def get_topic("hashtag", _, %{"tag" => tag}) do + {:ok, "hashtag:" <> tag} + end + + # Expand user streams. + def get_topic(stream, %User{} = user, _) when stream in @user_streams do + {:ok, stream <> ":" <> to_string(user.id)} + end + + def get_topic(stream, _, _) when stream in @user_streams do + {:error, :unauthorized} + end + + # List streams. + def get_topic("list", %User{} = user, %{"list" => id}) do + if Pleroma.List.get(id, user) do + {:ok, "list:" <> to_string(id)} + else + {:error, :bad_topic} + end + end + + def get_topic("list", _, _) do + {:error, :unauthorized} + end + + def get_topic(_, _, _) do + {:error, :bad_topic} + end + + @doc "Registers the process for streaming. Use `get_topic/3` to get the full authorized topic." + def add_socket(topic, user) do + if should_env_send?() do + auth? = if user, do: true + Registry.register(@registry, topic, auth?) + end + + {:ok, topic} end def remove_socket(topic) do @@ -231,13 +287,4 @@ def should_env_send?, do: false true -> def should_env_send?, do: true end - - defp user_topic(topic, user) - when topic in ~w[user user:notification direct] do - "#{topic}:#{user.id}" - end - - defp user_topic(topic, _) do - topic - end end diff --git a/test/integration/mastodon_websocket_test.exs b/test/integration/mastodon_websocket_test.exs index 109c7b4cb6..f61150cd27 100644 --- a/test/integration/mastodon_websocket_test.exs +++ b/test/integration/mastodon_websocket_test.exs @@ -32,7 +32,7 @@ def start_socket(qs \\ nil, headers \\ []) do test "refuses invalid requests" do capture_log(fn -> - assert {:error, {400, _}} = start_socket() + assert {:error, {404, _}} = start_socket() assert {:error, {404, _}} = start_socket("?stream=ncjdk") Process.sleep(30) end) @@ -40,8 +40,8 @@ test "refuses invalid requests" do test "requires authentication and a valid token for protected streams" do capture_log(fn -> - assert {:error, {403, _}} = start_socket("?stream=user&access_token=aaaaaaaaaaaa") - assert {:error, {403, _}} = start_socket("?stream=user") + assert {:error, {401, _}} = start_socket("?stream=user&access_token=aaaaaaaaaaaa") + assert {:error, {401, _}} = start_socket("?stream=user") Process.sleep(30) end) end @@ -100,7 +100,7 @@ test "accepts the 'user' stream", %{token: token} = _state do assert {:ok, _} = start_socket("?stream=user&access_token=#{token.token}") assert capture_log(fn -> - assert {:error, {403, "Forbidden"}} = start_socket("?stream=user") + assert {:error, {401, _}} = start_socket("?stream=user") Process.sleep(30) end) =~ ":badarg" end @@ -109,7 +109,7 @@ test "accepts the 'user:notification' stream", %{token: token} = _state do assert {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}") assert capture_log(fn -> - assert {:error, {403, "Forbidden"}} = start_socket("?stream=user:notification") + assert {:error, {401, _}} = start_socket("?stream=user:notification") Process.sleep(30) end) =~ ":badarg" end @@ -118,7 +118,7 @@ test "accepts valid token on Sec-WebSocket-Protocol header", %{token: token} do assert {:ok, _} = start_socket("?stream=user", [{"Sec-WebSocket-Protocol", token.token}]) assert capture_log(fn -> - assert {:error, {403, "Forbidden"}} = + assert {:error, {401, _}} = start_socket("?stream=user", [{"Sec-WebSocket-Protocol", "I am a friend"}]) Process.sleep(30) diff --git a/test/notification_test.exs b/test/notification_test.exs index 24e5f0c73d..4dfbc10190 100644 --- a/test/notification_test.exs +++ b/test/notification_test.exs @@ -170,13 +170,13 @@ test "it creates a notification for user and send to the 'user' and the 'user:no task = Task.async(fn -> - Streamer.add_socket("user", user) + Streamer.get_topic_and_add_socket("user", user) assert_receive {:render_with_user, _, _, _}, 4_000 end) task_user_notification = Task.async(fn -> - Streamer.add_socket("user:notification", user) + Streamer.get_topic_and_add_socket("user:notification", user) assert_receive {:render_with_user, _, _, _}, 4_000 end) diff --git a/test/web/streamer/streamer_test.exs b/test/web/streamer/streamer_test.exs index ee530f4e94..db07c5df5e 100644 --- a/test/web/streamer/streamer_test.exs +++ b/test/web/streamer/streamer_test.exs @@ -17,6 +17,76 @@ defmodule Pleroma.Web.StreamerTest do setup do: clear_config([:instance, :skip_thread_containment]) + describe "get_topic without an user" do + test "allows public" do + assert {:ok, "public"} = Streamer.get_topic("public", nil) + assert {:ok, "public:local"} = Streamer.get_topic("public:local", nil) + assert {:ok, "public:media"} = Streamer.get_topic("public:media", nil) + assert {:ok, "public:local:media"} = Streamer.get_topic("public:local:media", nil) + end + + test "allows hashtag streams" do + assert {:ok, "hashtag:cofe"} = Streamer.get_topic("hashtag", nil, %{"tag" => "cofe"}) + end + + test "disallows user streams" do + assert {:error, _} = Streamer.get_topic("user", nil) + assert {:error, _} = Streamer.get_topic("user:notification", nil) + assert {:error, _} = Streamer.get_topic("direct", nil) + end + + test "disallows list streams" do + assert {:error, _} = Streamer.get_topic("list", nil, %{"list" => 42}) + end + end + + describe "get_topic with an user" do + setup do + user = insert(:user) + {:ok, %{user: user}} + end + + test "allows public streams", %{user: user} do + assert {:ok, "public"} = Streamer.get_topic("public", user) + assert {:ok, "public:local"} = Streamer.get_topic("public:local", user) + assert {:ok, "public:media"} = Streamer.get_topic("public:media", user) + assert {:ok, "public:local:media"} = Streamer.get_topic("public:local:media", user) + end + + test "allows user streams", %{user: user} do + expected_user_topic = "user:#{user.id}" + expected_notif_topic = "user:notification:#{user.id}" + expected_direct_topic = "direct:#{user.id}" + assert {:ok, ^expected_user_topic} = Streamer.get_topic("user", user) + assert {:ok, ^expected_notif_topic} = Streamer.get_topic("user:notification", user) + assert {:ok, ^expected_direct_topic} = Streamer.get_topic("direct", user) + end + + test "allows hashtag streams", %{user: user} do + assert {:ok, "hashtag:cofe"} = Streamer.get_topic("hashtag", user, %{"tag" => "cofe"}) + end + + test "disallows registering to an user stream", %{user: user} do + another_user = insert(:user) + assert {:error, _} = Streamer.get_topic("user:#{another_user.id}", user) + assert {:error, _} = Streamer.get_topic("user:notification:#{another_user.id}", user) + assert {:error, _} = Streamer.get_topic("direct:#{another_user.id}", user) + end + + test "allows list stream that are owned by the user", %{user: user} do + {:ok, list} = List.create("Test", user) + assert {:error, _} = Streamer.get_topic("list:#{list.id}", user) + assert {:ok, _} = Streamer.get_topic("list", user, %{"list" => list.id}) + end + + test "disallows list stream that are not owned by the user", %{user: user} do + another_user = insert(:user) + {:ok, list} = List.create("Test", another_user) + assert {:error, _} = Streamer.get_topic("list:#{list.id}", user) + assert {:error, _} = Streamer.get_topic("list", user, %{"list" => list.id}) + end + end + describe "user streams" do setup do user = insert(:user) @@ -25,14 +95,14 @@ defmodule Pleroma.Web.StreamerTest do end test "it streams the user's post in the 'user' stream", %{user: user} do - Streamer.add_socket("user", user) + Streamer.get_topic_and_add_socket("user", user) {:ok, activity} = CommonAPI.post(user, %{"status" => "hey"}) assert_receive {:render_with_user, _, _, ^activity} refute Streamer.filtered_by_user?(user, activity) end test "it streams boosts of the user in the 'user' stream", %{user: user} do - Streamer.add_socket("user", user) + Streamer.get_topic_and_add_socket("user", user) other_user = insert(:user) {:ok, activity} = CommonAPI.post(other_user, %{"status" => "hey"}) @@ -43,14 +113,14 @@ test "it streams boosts of the user in the 'user' stream", %{user: user} do end test "it sends notify to in the 'user' stream", %{user: user, notify: notify} do - Streamer.add_socket("user", user) + Streamer.get_topic_and_add_socket("user", user) Streamer.stream("user", notify) assert_receive {:render_with_user, _, _, ^notify} refute Streamer.filtered_by_user?(user, notify) end test "it sends notify to in the 'user:notification' stream", %{user: user, notify: notify} do - Streamer.add_socket("user:notification", user) + Streamer.get_topic_and_add_socket("user:notification", user) Streamer.stream("user:notification", notify) assert_receive {:render_with_user, _, _, ^notify} refute Streamer.filtered_by_user?(user, notify) @@ -62,7 +132,7 @@ test "it doesn't send notify to the 'user:notification' stream when a user is bl blocked = insert(:user) {:ok, _user_relationship} = User.block(user, blocked) - Streamer.add_socket("user:notification", user) + Streamer.get_topic_and_add_socket("user:notification", user) {:ok, activity} = CommonAPI.post(user, %{"status" => ":("}) {:ok, _} = CommonAPI.favorite(blocked, activity.id) @@ -78,7 +148,7 @@ test "it doesn't send notify to the 'user:notification' stream when a thread is {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"}) {:ok, _} = CommonAPI.add_mute(user, activity) - Streamer.add_socket("user:notification", user) + Streamer.get_topic_and_add_socket("user:notification", user) {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id) @@ -92,7 +162,7 @@ test "it sends favorite to 'user:notification' stream'", %{ user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"}) {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"}) - Streamer.add_socket("user:notification", user) + Streamer.get_topic_and_add_socket("user:notification", user) {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id) assert_receive {:render_with_user, _, "notification.json", notif} @@ -107,7 +177,7 @@ test "it doesn't send the 'user:notification' stream' when a domain is blocked", {:ok, user} = User.block_domain(user, "hecking-lewd-place.com") {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"}) - Streamer.add_socket("user:notification", user) + Streamer.get_topic_and_add_socket("user:notification", user) {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id) refute_receive _ @@ -130,7 +200,7 @@ test "it sends follow activities to the 'user:notification' stream", %{ %Tesla.Env{status: 200, body: body} end) - Streamer.add_socket("user:notification", user) + Streamer.get_topic_and_add_socket("user:notification", user) {:ok, _follower, _followed, follow_activity} = CommonAPI.follow(user2, user) assert_receive {:render_with_user, _, "notification.json", notif} @@ -143,7 +213,7 @@ test "it sends to public authenticated" do user = insert(:user) other_user = insert(:user) - Streamer.add_socket("public", other_user) + Streamer.get_topic_and_add_socket("public", other_user) {:ok, activity} = CommonAPI.post(user, %{"status" => "Test"}) assert_receive {:render_with_user, _, _, ^activity} @@ -155,7 +225,7 @@ test "works for deletions" do other_user = insert(:user) {:ok, activity} = CommonAPI.post(other_user, %{"status" => "Test"}) - Streamer.add_socket("public", user) + Streamer.get_topic_and_add_socket("public", user) {:ok, _} = CommonAPI.delete(activity.id, other_user) activity_id = activity.id @@ -166,7 +236,7 @@ test "works for deletions" do test "it sends to public unauthenticated" do user = insert(:user) - Streamer.add_socket("public", nil) + Streamer.get_topic_and_add_socket("public", nil) {:ok, activity} = CommonAPI.post(user, %{"status" => "Test"}) activity_id = activity.id @@ -195,7 +265,7 @@ test "it filters to user if recipients invalid and thread containment is enabled ) ) - Streamer.add_socket("public", user) + Streamer.get_topic_and_add_socket("public", user) Streamer.stream("public", activity) assert_receive {:render_with_user, _, _, ^activity} assert Streamer.filtered_by_user?(user, activity) @@ -216,7 +286,7 @@ test "it sends message if recipients invalid and thread containment is disabled" ) ) - Streamer.add_socket("public", user) + Streamer.get_topic_and_add_socket("public", user) Streamer.stream("public", activity) assert_receive {:render_with_user, _, _, ^activity} @@ -238,7 +308,7 @@ test "it sends message if recipients invalid and thread containment is enabled b ) ) - Streamer.add_socket("public", user) + Streamer.get_topic_and_add_socket("public", user) Streamer.stream("public", activity) assert_receive {:render_with_user, _, _, ^activity} @@ -252,7 +322,7 @@ test "it filters messages involving blocked users" do blocked_user = insert(:user) {:ok, _user_relationship} = User.block(user, blocked_user) - Streamer.add_socket("public", user) + Streamer.get_topic_and_add_socket("public", user) {:ok, activity} = CommonAPI.post(blocked_user, %{"status" => "Test"}) assert_receive {:render_with_user, _, _, ^activity} assert Streamer.filtered_by_user?(user, activity) @@ -263,7 +333,7 @@ test "it filters messages transitively involving blocked users" do blockee = insert(:user) friend = insert(:user) - Streamer.add_socket("public", blocker) + Streamer.get_topic_and_add_socket("public", blocker) {:ok, _user_relationship} = User.block(blocker, blockee) @@ -295,7 +365,7 @@ test "it doesn't send unwanted DMs to list" do {:ok, list} = List.create("Test", user_a) {:ok, list} = List.follow(list, user_b) - Streamer.add_socket("list:#{list.id}", user_a) + Streamer.get_topic_and_add_socket("list", user_a, %{"list" => list.id}) {:ok, _activity} = CommonAPI.post(user_b, %{ @@ -313,7 +383,7 @@ test "it doesn't send unwanted private posts to list" do {:ok, list} = List.create("Test", user_a) {:ok, list} = List.follow(list, user_b) - Streamer.add_socket("list:#{list.id}", user_a) + Streamer.get_topic_and_add_socket("list", user_a, %{"list" => list.id}) {:ok, _activity} = CommonAPI.post(user_b, %{ @@ -333,7 +403,7 @@ test "it sends wanted private posts to list" do {:ok, list} = List.create("Test", user_a) {:ok, list} = List.follow(list, user_b) - Streamer.add_socket("list:#{list.id}", user_a) + Streamer.get_topic_and_add_socket("list", user_a, %{"list" => list.id}) {:ok, activity} = CommonAPI.post(user_b, %{ @@ -356,7 +426,7 @@ test "it filters muted reblogs" do {:ok, create_activity} = CommonAPI.post(user3, %{"status" => "I'm kawen"}) - Streamer.add_socket("user", user1) + Streamer.get_topic_and_add_socket("user", user1) {:ok, announce_activity, _} = CommonAPI.repeat(create_activity.id, user2) assert_receive {:render_with_user, _, _, ^announce_activity} assert Streamer.filtered_by_user?(user1, announce_activity) @@ -369,7 +439,7 @@ test "it filters reblog notification for reblog-muted actors" do CommonAPI.hide_reblogs(user1, user2) {:ok, create_activity} = CommonAPI.post(user1, %{"status" => "I'm kawen"}) - Streamer.add_socket("user", user1) + Streamer.get_topic_and_add_socket("user", user1) {:ok, _favorite_activity, _} = CommonAPI.repeat(create_activity.id, user2) assert_receive {:render_with_user, _, "notification.json", notif} @@ -383,7 +453,7 @@ test "it send non-reblog notification for reblog-muted actors" do CommonAPI.hide_reblogs(user1, user2) {:ok, create_activity} = CommonAPI.post(user1, %{"status" => "I'm kawen"}) - Streamer.add_socket("user", user1) + Streamer.get_topic_and_add_socket("user", user1) {:ok, _favorite_activity} = CommonAPI.favorite(user2, create_activity.id) assert_receive {:render_with_user, _, "notification.json", notif} @@ -394,7 +464,7 @@ test "it send non-reblog notification for reblog-muted actors" do test "it filters posts from muted threads" do user = insert(:user) user2 = insert(:user) - Streamer.add_socket("user", user2) + Streamer.get_topic_and_add_socket("user", user2) {:ok, user2, user, _activity} = CommonAPI.follow(user2, user) {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"}) {:ok, _} = CommonAPI.add_mute(user2, activity) @@ -411,7 +481,7 @@ test "it sends conversation update to the 'direct' stream", %{} do user = insert(:user) another_user = insert(:user) - Streamer.add_socket("direct", user) + Streamer.get_topic_and_add_socket("direct", user) {:ok, _create_activity} = CommonAPI.post(another_user, %{ @@ -433,7 +503,7 @@ test "it doesn't send conversation update to the 'direct' stream when the last m user = insert(:user) another_user = insert(:user) - Streamer.add_socket("direct", user) + Streamer.get_topic_and_add_socket("direct", user) {:ok, create_activity} = CommonAPI.post(another_user, %{ @@ -459,7 +529,7 @@ test "it doesn't send conversation update to the 'direct' stream when the last m test "it sends conversation update to the 'direct' stream when a message is deleted" do user = insert(:user) another_user = insert(:user) - Streamer.add_socket("direct", user) + Streamer.get_topic_and_add_socket("direct", user) {:ok, create_activity} = CommonAPI.post(another_user, %{