From 2b5636bf127b1987736c281d346a5771c8168c09 Mon Sep 17 00:00:00 2001 From: tusooa Date: Fri, 31 Mar 2023 21:47:37 -0400 Subject: [PATCH 01/23] Allow unified streaming endpoint --- .../web/mastodon_api/websocket_handler.ex | 17 ++++++++++++----- lib/pleroma/web/streamer.ex | 8 ++++++-- .../integration/mastodon_websocket_test.exs | 5 ++++- test/pleroma/web/streamer_test.exs | 4 ++++ 4 files changed, 26 insertions(+), 8 deletions(-) diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex index 88444106d3..97a1f14012 100644 --- a/lib/pleroma/web/mastodon_api/websocket_handler.ex +++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex @@ -32,8 +32,15 @@ def init(%{qs: qs} = req, state) do req end + topics = + if topic do + [topic] + else + [] + end + {:cowboy_websocket, req, - %{user: user, topic: topic, oauth_token: oauth_token, count: 0, timer: nil}, + %{user: user, topics: topics, oauth_token: oauth_token, count: 0, timer: nil}, %{idle_timeout: @timeout}} else {:error, :bad_topic} -> @@ -50,10 +57,10 @@ def init(%{qs: qs} = req, state) do def websocket_init(state) do Logger.debug( - "#{__MODULE__} accepted websocket connection for user #{(state.user || %{id: "anonymous"}).id}, topic #{state.topic}" + "#{__MODULE__} accepted websocket connection for user #{(state.user || %{id: "anonymous"}).id}, topics #{state.topics}" ) - Streamer.add_socket(state.topic, state.oauth_token) + Enum.each(state.topics, fn topic -> Streamer.add_socket(topic, state.oauth_token) end) {:ok, %{state | timer: timer()}} end @@ -109,10 +116,10 @@ def terminate(_reason, _req, []), do: :ok def terminate(reason, _req, state) do Logger.debug( - "#{__MODULE__} terminating websocket connection for user #{(state.user || %{id: "anonymous"}).id}, topic #{state.topic || "?"}: #{inspect(reason)}" + "#{__MODULE__} terminating websocket connection for user #{(state.user || %{id: "anonymous"}).id}, topics #{state.topics || "?"}: #{inspect(reason)}" ) - Streamer.remove_socket(state.topic) + Enum.each(state.topics, fn topic -> Streamer.remove_socket(topic) end) :ok end diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex index b9a04cc767..6b5fdbf0c8 100644 --- a/lib/pleroma/web/streamer.ex +++ b/lib/pleroma/web/streamer.ex @@ -59,10 +59,14 @@ defp can_access_stream(user, oauth_token, kind) do end @doc "Expand and authorizes a stream" - @spec get_topic(stream :: String.t(), User.t() | nil, Token.t() | nil, Map.t()) :: - {:ok, topic :: String.t()} | {:error, :bad_topic} + @spec get_topic(stream :: String.t() | nil, User.t() | nil, Token.t() | nil, Map.t()) :: + {:ok, topic :: String.t() | nil} | {:error, :bad_topic} def get_topic(stream, user, oauth_token, params \\ %{}) + def get_topic(nil = _stream, _user, _oauth_token, _params) do + {:ok, nil} + end + # Allow all public steams if the instance allows unauthenticated access. # Otherwise, only allow users with valid oauth tokens. def get_topic(stream, user, oauth_token, _params) when stream in @public_streams do diff --git a/test/pleroma/integration/mastodon_websocket_test.exs b/test/pleroma/integration/mastodon_websocket_test.exs index 9be0445c0e..3b120c10e5 100644 --- a/test/pleroma/integration/mastodon_websocket_test.exs +++ b/test/pleroma/integration/mastodon_websocket_test.exs @@ -33,7 +33,6 @@ def start_socket(qs \\ nil, headers \\ []) do test "refuses invalid requests" do capture_log(fn -> - assert {:error, %WebSockex.RequestError{code: 404}} = start_socket() assert {:error, %WebSockex.RequestError{code: 404}} = start_socket("?stream=ncjdk") Process.sleep(30) end) @@ -49,6 +48,10 @@ test "requires authentication and a valid token for protected streams" do end) end + test "allows unified stream" do + assert {:ok, _} = start_socket() + end + test "allows public streams without authentication" do assert {:ok, _} = start_socket("?stream=public") assert {:ok, _} = start_socket("?stream=public:local") diff --git a/test/pleroma/web/streamer_test.exs b/test/pleroma/web/streamer_test.exs index 7ab0e379b4..da97b87d8f 100644 --- a/test/pleroma/web/streamer_test.exs +++ b/test/pleroma/web/streamer_test.exs @@ -22,6 +22,10 @@ defmodule Pleroma.Web.StreamerTest do setup do: clear_config([:instance, :skip_thread_containment]) describe "get_topic/_ (unauthenticated)" do + test "allows no stream" do + assert {:ok, nil} = Streamer.get_topic(nil, nil, nil) + end + test "allows public" do assert {:ok, "public"} = Streamer.get_topic("public", nil, nil) assert {:ok, "public:local"} = Streamer.get_topic("public:local", nil, nil) From 273cda63ad79b61f4d37e4b7603694908e894e4f Mon Sep 17 00:00:00 2001 From: tusooa Date: Fri, 31 Mar 2023 22:55:52 -0400 Subject: [PATCH 02/23] Allow subscribing to streams --- .../web/mastodon_api/websocket_handler.ex | 74 +++++++++++++++ lib/pleroma/web/views/streamer_view.ex | 18 ++++ .../integration/mastodon_websocket_test.exs | 90 +++++++++++++++++++ 3 files changed, 182 insertions(+) diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex index 97a1f14012..a42a9a63c0 100644 --- a/lib/pleroma/web/mastodon_api/websocket_handler.ex +++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex @@ -9,6 +9,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do alias Pleroma.User alias Pleroma.Web.OAuth.Token alias Pleroma.Web.Streamer + alias Pleroma.Web.StreamerView @behaviour :cowboy_websocket @@ -73,6 +74,16 @@ def websocket_handle(:pong, state) do # We only receive pings for now def websocket_handle(:ping, state), do: {:ok, state} + def websocket_handle({:text, text}, state) do + with {:ok, %{} = event} <- Jason.decode(text) do + handle_client_event(event, state) + else + _ -> + Logger.error("#{__MODULE__} received non-JSON event: #{inspect(text)}") + {:ok, state} + end + end + def websocket_handle(frame, state) do Logger.error("#{__MODULE__} received frame: #{inspect(frame)}") {:ok, state} @@ -144,4 +155,67 @@ defp authenticate_request(access_token, sec_websocket) do defp timer do Process.send_after(self(), :tick, @tick) end + + defp handle_client_event(%{"type" => "subscribe", "stream" => _topic} = params, state) do + with {_, {:ok, topic}} <- + {:topic, Streamer.get_topic(params["stream"], state.user, state.oauth_token, params)}, + {_, false} <- {:subscribed, topic in state.topics} do + Streamer.add_socket(topic, state.oauth_token) + + {[ + {:text, + StreamerView.render("pleroma_respond.json", %{type: "subscribe", result: "success"})} + ], %{state | topics: [topic | state.topics]}} + else + {:subscribed, true} -> + {[ + {:text, + StreamerView.render("pleroma_respond.json", %{type: "subscribe", result: "ignored"})} + ], state} + + {:topic, {:error, error}} -> + {[ + {:text, + StreamerView.render("pleroma_respond.json", %{ + type: "subscribe", + result: "error", + error: error + })} + ], state} + end + end + + defp handle_client_event(%{"type" => "unsubscribe", "stream" => _topic} = params, state) do + with {_, {:ok, topic}} <- + {:topic, Streamer.get_topic(params["stream"], state.user, state.oauth_token, params)}, + {_, true} <- {:subscribed, topic in state.topics} do + Streamer.remove_socket(topic) + + {[ + {:text, + StreamerView.render("pleroma_respond.json", %{type: "unsubscribe", result: "success"})} + ], %{state | topics: List.delete(state.topics, topic)}} + else + {:subscribed, false} -> + {[ + {:text, + StreamerView.render("pleroma_respond.json", %{type: "unsubscribe", result: "ignored"})} + ], state} + + {:topic, {:error, error}} -> + {[ + {:text, + StreamerView.render("pleroma_respond.json", %{ + type: "unsubscribe", + result: "error", + error: error + })} + ], state} + end + end + + defp handle_client_event(params, state) do + Logger.error("#{__MODULE__} received unknown event: #{inspect(params)}") + {[], state} + end end diff --git a/lib/pleroma/web/views/streamer_view.ex b/lib/pleroma/web/views/streamer_view.ex index 6a55242b09..19f0987833 100644 --- a/lib/pleroma/web/views/streamer_view.ex +++ b/lib/pleroma/web/views/streamer_view.ex @@ -135,4 +135,22 @@ def render("conversation.json", %Participation{} = participation) do } |> Jason.encode!() end + + def render("pleroma_respond.json", %{type: type, result: result} = params) do + %{ + event: "pleroma.respond", + payload: + %{ + result: result, + type: type + } + |> Map.merge(maybe_error(params)) + |> Jason.encode!() + } + |> Jason.encode!() + end + + defp maybe_error(%{error: :bad_topic}), do: %{error: "bad_topic"} + defp maybe_error(%{error: :unauthorized}), do: %{error: "unauthorized"} + defp maybe_error(_), do: %{} end diff --git a/test/pleroma/integration/mastodon_websocket_test.exs b/test/pleroma/integration/mastodon_websocket_test.exs index 3b120c10e5..9db0f714f4 100644 --- a/test/pleroma/integration/mastodon_websocket_test.exs +++ b/test/pleroma/integration/mastodon_websocket_test.exs @@ -31,6 +31,13 @@ def start_socket(qs \\ nil, headers \\ []) do WebsocketClient.start_link(self(), path, headers) end + defp decode_json(json) do + with {:ok, %{"event" => event, "payload" => payload_text}} <- Jason.decode(json), + {:ok, payload} <- Jason.decode(payload_text) do + {:ok, %{"event" => event, "payload" => payload}} + end + end + test "refuses invalid requests" do capture_log(fn -> assert {:error, %WebSockex.RequestError{code: 404}} = start_socket("?stream=ncjdk") @@ -79,6 +86,89 @@ test "receives well formatted events" do assert json == view_json end + describe "subscribing via WebSocket" do + test "can subscribe" do + user = insert(:user) + {:ok, pid} = start_socket() + WebsocketClient.send_text(pid, %{type: "subscribe", stream: "public"} |> Jason.encode!()) + assert_receive {:text, raw_json}, 1_000 + + assert {:ok, + %{ + "event" => "pleroma.respond", + "payload" => %{"type" => "subscribe", "result" => "success"} + }} = decode_json(raw_json) + + {:ok, activity} = CommonAPI.post(user, %{status: "nice echo chamber"}) + + assert_receive {:text, raw_json}, 1_000 + assert {:ok, json} = Jason.decode(raw_json) + + assert "update" == json["event"] + assert json["payload"] + assert {:ok, json} = Jason.decode(json["payload"]) + + view_json = + Pleroma.Web.MastodonAPI.StatusView.render("show.json", activity: activity, for: nil) + |> Jason.encode!() + |> Jason.decode!() + + assert json == view_json + end + + test "won't double subscribe" do + user = insert(:user) + {:ok, pid} = start_socket() + WebsocketClient.send_text(pid, %{type: "subscribe", stream: "public"} |> Jason.encode!()) + assert_receive {:text, raw_json}, 1_000 + + assert {:ok, + %{ + "event" => "pleroma.respond", + "payload" => %{"type" => "subscribe", "result" => "success"} + }} = decode_json(raw_json) + + WebsocketClient.send_text(pid, %{type: "subscribe", stream: "public"} |> Jason.encode!()) + assert_receive {:text, raw_json}, 1_000 + + assert {:ok, + %{ + "event" => "pleroma.respond", + "payload" => %{"type" => "subscribe", "result" => "ignored"} + }} = decode_json(raw_json) + + {:ok, _activity} = CommonAPI.post(user, %{status: "nice echo chamber"}) + + assert_receive {:text, _}, 1_000 + refute_receive {:text, _}, 1_000 + end + + test "can unsubscribe" do + user = insert(:user) + {:ok, pid} = start_socket() + WebsocketClient.send_text(pid, %{type: "subscribe", stream: "public"} |> Jason.encode!()) + assert_receive {:text, raw_json}, 1_000 + + assert {:ok, + %{ + "event" => "pleroma.respond", + "payload" => %{"type" => "subscribe", "result" => "success"} + }} = decode_json(raw_json) + + WebsocketClient.send_text(pid, %{type: "unsubscribe", stream: "public"} |> Jason.encode!()) + assert_receive {:text, raw_json}, 1_000 + + assert {:ok, + %{ + "event" => "pleroma.respond", + "payload" => %{"type" => "unsubscribe", "result" => "success"} + }} = decode_json(raw_json) + + {:ok, _activity} = CommonAPI.post(user, %{status: "nice echo chamber"}) + refute_receive {:text, _}, 1_000 + end + end + describe "with a valid user token" do setup do {:ok, app} = From 21395aa5090f2a53bdbe0ef5fac46693d16025ed Mon Sep 17 00:00:00 2001 From: tusooa Date: Fri, 31 Mar 2023 23:19:57 -0400 Subject: [PATCH 03/23] Allow authenticating via client-sent events --- .../web/mastodon_api/websocket_handler.ex | 36 +++++++++ lib/pleroma/web/views/streamer_view.ex | 1 + .../integration/mastodon_websocket_test.exs | 81 +++++++++++++++++++ 3 files changed, 118 insertions(+) diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex index a42a9a63c0..6233c3340a 100644 --- a/lib/pleroma/web/mastodon_api/websocket_handler.ex +++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex @@ -214,6 +214,42 @@ defp handle_client_event(%{"type" => "unsubscribe", "stream" => _topic} = params end end + defp handle_client_event( + %{"type" => "pleroma.authenticate", "token" => access_token} = _params, + state + ) do + with {:auth, nil, nil} <- {:auth, state.user, state.oauth_token}, + {:ok, user, oauth_token} <- authenticate_request(access_token, nil) do + {[ + {:text, + StreamerView.render("pleroma_respond.json", %{ + type: "pleroma.authenticate", + result: "success" + })} + ], %{state | user: user, oauth_token: oauth_token}} + else + {:auth, _, _} -> + {[ + {:text, + StreamerView.render("pleroma_respond.json", %{ + type: "pleroma.authenticate", + result: "error", + error: :already_authenticated + })} + ], state} + + _ -> + {[ + {:text, + StreamerView.render("pleroma_respond.json", %{ + type: "pleroma.authenticate", + result: "error", + error: :unauthorized + })} + ], state} + end + end + defp handle_client_event(params, state) do Logger.error("#{__MODULE__} received unknown event: #{inspect(params)}") {[], state} diff --git a/lib/pleroma/web/views/streamer_view.ex b/lib/pleroma/web/views/streamer_view.ex index 19f0987833..0cdcb19185 100644 --- a/lib/pleroma/web/views/streamer_view.ex +++ b/lib/pleroma/web/views/streamer_view.ex @@ -152,5 +152,6 @@ def render("pleroma_respond.json", %{type: type, result: result} = params) do defp maybe_error(%{error: :bad_topic}), do: %{error: "bad_topic"} defp maybe_error(%{error: :unauthorized}), do: %{error: "unauthorized"} + defp maybe_error(%{error: :already_authenticated}), do: %{error: "already_authenticated"} defp maybe_error(_), do: %{} end diff --git a/test/pleroma/integration/mastodon_websocket_test.exs b/test/pleroma/integration/mastodon_websocket_test.exs index 9db0f714f4..827c7b5b07 100644 --- a/test/pleroma/integration/mastodon_websocket_test.exs +++ b/test/pleroma/integration/mastodon_websocket_test.exs @@ -224,6 +224,87 @@ test "accepts valid token on Sec-WebSocket-Protocol header", %{token: token} do end) end + test "accepts valid token on client-sent event", %{token: token} do + assert {:ok, pid} = start_socket() + + WebsocketClient.send_text( + pid, + %{type: "pleroma.authenticate", token: token.token} |> Jason.encode!() + ) + + assert_receive {:text, raw_json}, 1_000 + + assert {:ok, + %{ + "event" => "pleroma.respond", + "payload" => %{"type" => "pleroma.authenticate", "result" => "success"} + }} = decode_json(raw_json) + + WebsocketClient.send_text(pid, %{type: "subscribe", stream: "user"} |> Jason.encode!()) + assert_receive {:text, raw_json}, 1_000 + + assert {:ok, + %{ + "event" => "pleroma.respond", + "payload" => %{"type" => "subscribe", "result" => "success"} + }} = decode_json(raw_json) + end + + test "rejects invalid token on client-sent event" do + assert {:ok, pid} = start_socket() + + WebsocketClient.send_text( + pid, + %{type: "pleroma.authenticate", token: "Something else"} |> Jason.encode!() + ) + + assert_receive {:text, raw_json}, 1_000 + + assert {:ok, + %{ + "event" => "pleroma.respond", + "payload" => %{ + "type" => "pleroma.authenticate", + "result" => "error", + "error" => "unauthorized" + } + }} = decode_json(raw_json) + end + + test "rejects new authenticate request if already logged-in", %{token: token} do + assert {:ok, pid} = start_socket() + + WebsocketClient.send_text( + pid, + %{type: "pleroma.authenticate", token: token.token} |> Jason.encode!() + ) + + assert_receive {:text, raw_json}, 1_000 + + assert {:ok, + %{ + "event" => "pleroma.respond", + "payload" => %{"type" => "pleroma.authenticate", "result" => "success"} + }} = decode_json(raw_json) + + WebsocketClient.send_text( + pid, + %{type: "pleroma.authenticate", token: "Something else"} |> Jason.encode!() + ) + + assert_receive {:text, raw_json}, 1_000 + + assert {:ok, + %{ + "event" => "pleroma.respond", + "payload" => %{ + "type" => "pleroma.authenticate", + "result" => "error", + "error" => "already_authenticated" + } + }} = decode_json(raw_json) + end + test "disconnect when token is revoked", %{app: app, user: user, token: token} do assert {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}") assert {:ok, _} = start_socket("?stream=user&access_token=#{token.token}") From 7d005e8c93b22dc3d7be1a66dd2d404b7f54306a Mon Sep 17 00:00:00 2001 From: tusooa Date: Sat, 1 Apr 2023 01:25:13 -0400 Subject: [PATCH 04/23] Return stream attribute in server-sent events --- lib/pleroma/constants.ex | 4 ++ .../web/mastodon_api/websocket_handler.ex | 4 +- lib/pleroma/web/streamer.ex | 27 +++++---- lib/pleroma/web/views/streamer_view.ex | 42 +++++++++++--- .../integration/mastodon_websocket_test.exs | 33 +++++++++++ test/pleroma/web/streamer_test.exs | 58 +++++++++---------- 6 files changed, 119 insertions(+), 49 deletions(-) diff --git a/lib/pleroma/constants.ex b/lib/pleroma/constants.ex index ed60bcc371..77bc4bfac6 100644 --- a/lib/pleroma/constants.ex +++ b/lib/pleroma/constants.ex @@ -94,4 +94,8 @@ defmodule Pleroma.Constants do "application/activity+json" ] ) + + const(public_streams, + do: ["public", "public:local", "public:media", "public:local:media"] + ) end diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex index 6233c3340a..2707673ba8 100644 --- a/lib/pleroma/web/mastodon_api/websocket_handler.ex +++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex @@ -89,11 +89,11 @@ def websocket_handle(frame, state) do {:ok, state} end - def websocket_info({:render_with_user, view, template, item}, state) do + def websocket_info({:render_with_user, view, template, item, topic}, state) do user = %User{} = User.get_cached_by_ap_id(state.user.ap_id) unless Streamer.filtered_by_user?(user, item) do - websocket_info({:text, view.render(template, item, user)}, %{state | user: user}) + websocket_info({:text, view.render(template, item, user, topic)}, %{state | user: user}) else {:ok, state} end diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex index 6b5fdbf0c8..70e46617ad 100644 --- a/lib/pleroma/web/streamer.ex +++ b/lib/pleroma/web/streamer.ex @@ -4,6 +4,7 @@ defmodule Pleroma.Web.Streamer do require Logger + require Pleroma.Constants alias Pleroma.Activity alias Pleroma.Chat.MessageReference @@ -24,7 +25,7 @@ defmodule Pleroma.Web.Streamer do def registry, do: @registry - @public_streams ["public", "public:local", "public:media", "public:local:media"] + @public_streams Pleroma.Constants.public_streams() @local_streams ["public:local", "public:local:media"] @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"] @@ -223,8 +224,8 @@ defp do_stream("direct", item) do end defp do_stream("follow_relationship", item) do - text = StreamerView.render("follow_relationships_update.json", item) user_topic = "user:#{item.follower.id}" + text = StreamerView.render("follow_relationships_update.json", item, user_topic) Logger.debug("Trying to push follow relationship update to #{user_topic}\n\n") @@ -270,9 +271,11 @@ defp do_stream("list", item) do defp do_stream(topic, %Notification{} = item) when topic in ["user", "user:notification"] do - Registry.dispatch(@registry, "#{topic}:#{item.user_id}", fn list -> + user_topic = "#{topic}:#{item.user_id}" + + Registry.dispatch(@registry, user_topic, fn list -> Enum.each(list, fn {pid, _auth} -> - send(pid, {:render_with_user, StreamerView, "notification.json", item}) + send(pid, {:render_with_user, StreamerView, "notification.json", item, user_topic}) end) end) end @@ -281,7 +284,7 @@ defp do_stream(topic, {user, %MessageReference{} = cm_ref}) when topic in ["user", "user:pleroma_chat"] do topic = "#{topic}:#{user.id}" - text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref}) + text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref}, topic) Registry.dispatch(@registry, topic, fn list -> Enum.each(list, fn {pid, _auth} -> @@ -309,7 +312,7 @@ defp do_stream(topic, item) do end defp push_to_socket(topic, %Participation{} = participation) do - rendered = StreamerView.render("conversation.json", participation) + rendered = StreamerView.render("conversation.json", participation, topic) Registry.dispatch(@registry, topic, fn list -> Enum.each(list, fn {pid, _} -> @@ -337,12 +340,15 @@ defp push_to_socket(topic, %Activity{data: %{"type" => "Update"}} = item) do Pleroma.Activity.get_create_by_object_ap_id(item.object.data["id"]) |> Map.put(:object, item.object) - anon_render = StreamerView.render("status_update.json", create_activity) + anon_render = StreamerView.render("status_update.json", create_activity, topic) Registry.dispatch(@registry, topic, fn list -> Enum.each(list, fn {pid, auth?} -> if auth? do - send(pid, {:render_with_user, StreamerView, "status_update.json", create_activity}) + send( + pid, + {:render_with_user, StreamerView, "status_update.json", create_activity, topic} + ) else send(pid, {:text, anon_render}) end @@ -351,12 +357,13 @@ defp push_to_socket(topic, %Activity{data: %{"type" => "Update"}} = item) do end defp push_to_socket(topic, item) do - anon_render = StreamerView.render("update.json", item) + Logger.debug("topic=#{topic}") + anon_render = StreamerView.render("update.json", item, topic) Registry.dispatch(@registry, topic, fn list -> Enum.each(list, fn {pid, auth?} -> if auth? do - send(pid, {:render_with_user, StreamerView, "update.json", item}) + send(pid, {:render_with_user, StreamerView, "update.json", item, topic}) else send(pid, {:text, anon_render}) end diff --git a/lib/pleroma/web/views/streamer_view.ex b/lib/pleroma/web/views/streamer_view.ex index 0cdcb19185..f591da9a6d 100644 --- a/lib/pleroma/web/views/streamer_view.ex +++ b/lib/pleroma/web/views/streamer_view.ex @@ -11,8 +11,11 @@ defmodule Pleroma.Web.StreamerView do alias Pleroma.User alias Pleroma.Web.MastodonAPI.NotificationView - def render("update.json", %Activity{} = activity, %User{} = user) do + require Pleroma.Constants + + def render("update.json", %Activity{} = activity, %User{} = user, topic) do %{ + stream: render("stream.json", %{topic: topic}), event: "update", payload: Pleroma.Web.MastodonAPI.StatusView.render( @@ -25,8 +28,9 @@ def render("update.json", %Activity{} = activity, %User{} = user) do |> Jason.encode!() end - def render("status_update.json", %Activity{} = activity, %User{} = user) do + def render("status_update.json", %Activity{} = activity, %User{} = user, topic) do %{ + stream: render("stream.json", %{topic: topic}), event: "status.update", payload: Pleroma.Web.MastodonAPI.StatusView.render( @@ -39,8 +43,9 @@ def render("status_update.json", %Activity{} = activity, %User{} = user) do |> Jason.encode!() end - def render("notification.json", %Notification{} = notify, %User{} = user) do + def render("notification.json", %Notification{} = notify, %User{} = user, topic) do %{ + stream: render("stream.json", %{topic: topic}), event: "notification", payload: NotificationView.render( @@ -52,8 +57,9 @@ def render("notification.json", %Notification{} = notify, %User{} = user) do |> Jason.encode!() end - def render("update.json", %Activity{} = activity) do + def render("update.json", %Activity{} = activity, topic) do %{ + stream: render("stream.json", %{topic: topic}), event: "update", payload: Pleroma.Web.MastodonAPI.StatusView.render( @@ -65,8 +71,9 @@ def render("update.json", %Activity{} = activity) do |> Jason.encode!() end - def render("status_update.json", %Activity{} = activity) do + def render("status_update.json", %Activity{} = activity, topic) do %{ + stream: render("stream.json", %{topic: topic}), event: "status.update", payload: Pleroma.Web.MastodonAPI.StatusView.render( @@ -78,7 +85,7 @@ def render("status_update.json", %Activity{} = activity) do |> Jason.encode!() end - def render("chat_update.json", %{chat_message_reference: cm_ref}) do + def render("chat_update.json", %{chat_message_reference: cm_ref}, topic) do # Explicitly giving the cmr for the object here, so we don't accidentally # send a later 'last_message' that was inserted between inserting this and # streaming it out @@ -93,6 +100,7 @@ def render("chat_update.json", %{chat_message_reference: cm_ref}) do ) %{ + stream: render("stream.json", %{topic: topic}), event: "pleroma:chat_update", payload: representation @@ -101,8 +109,9 @@ def render("chat_update.json", %{chat_message_reference: cm_ref}) do |> Jason.encode!() end - def render("follow_relationships_update.json", item) do + def render("follow_relationships_update.json", item, topic) do %{ + stream: render("stream.json", %{topic: topic}), event: "pleroma:follow_relationships_update", payload: %{ @@ -123,8 +132,9 @@ def render("follow_relationships_update.json", item) do |> Jason.encode!() end - def render("conversation.json", %Participation{} = participation) do + def render("conversation.json", %Participation{} = participation, topic) do %{ + stream: render("stream.json", %{topic: topic}), event: "conversation", payload: Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{ @@ -150,6 +160,22 @@ def render("pleroma_respond.json", %{type: type, result: result} = params) do |> Jason.encode!() end + def render("stream.json", %{topic: "user:pleroma_chat:" <> _}), do: ["user:pleroma_chat"] + def render("stream.json", %{topic: "user:notification:" <> _}), do: ["user:notification"] + def render("stream.json", %{topic: "user:" <> _}), do: ["user"] + def render("stream.json", %{topic: "direct:" <> _}), do: ["direct"] + def render("stream.json", %{topic: "list:" <> id}), do: ["list", id] + def render("stream.json", %{topic: "hashtag:" <> tag}), do: ["hashtag", tag] + + def render("stream.json", %{topic: "public:remote:media:" <> instance}), + do: ["public:remote:media", instance] + + def render("stream.json", %{topic: "public:remote:" <> instance}), + do: ["public:remote", instance] + + def render("stream.json", %{topic: stream}) when stream in Pleroma.Constants.public_streams(), + do: [stream] + defp maybe_error(%{error: :bad_topic}), do: %{error: "bad_topic"} defp maybe_error(%{error: :unauthorized}), do: %{error: "unauthorized"} defp maybe_error(%{error: :already_authenticated}), do: %{error: "already_authenticated"} diff --git a/test/pleroma/integration/mastodon_websocket_test.exs b/test/pleroma/integration/mastodon_websocket_test.exs index 827c7b5b07..d2e98df3b8 100644 --- a/test/pleroma/integration/mastodon_websocket_test.exs +++ b/test/pleroma/integration/mastodon_websocket_test.exs @@ -116,6 +116,39 @@ test "can subscribe" do assert json == view_json end + test "can subscribe to multiple streams" do + user = insert(:user) + {:ok, pid} = start_socket() + WebsocketClient.send_text(pid, %{type: "subscribe", stream: "public"} |> Jason.encode!()) + assert_receive {:text, raw_json}, 1_000 + + assert {:ok, + %{ + "event" => "pleroma.respond", + "payload" => %{"type" => "subscribe", "result" => "success"} + }} = decode_json(raw_json) + + WebsocketClient.send_text(pid, %{type: "subscribe", stream: "hashtag", tag: "mew"} |> Jason.encode!()) + assert_receive {:text, raw_json}, 1_000 + + assert {:ok, + %{ + "event" => "pleroma.respond", + "payload" => %{"type" => "subscribe", "result" => "success"} + }} = decode_json(raw_json) + + {:ok, _activity} = CommonAPI.post(user, %{status: "nice echo chamber #mew"}) + + assert_receive {:text, raw_json}, 1_000 + assert {:ok, %{"stream" => stream1}} = Jason.decode(raw_json) + assert_receive {:text, raw_json}, 1_000 + assert {:ok, %{"stream" => stream2}} = Jason.decode(raw_json) + + streams = [stream1, stream2] + assert ["hashtag", "mew"] in streams + assert ["public"] in streams + end + test "won't double subscribe" do user = insert(:user) {:ok, pid} = start_socket() diff --git a/test/pleroma/web/streamer_test.exs b/test/pleroma/web/streamer_test.exs index da97b87d8f..cc57a29891 100644 --- a/test/pleroma/web/streamer_test.exs +++ b/test/pleroma/web/streamer_test.exs @@ -246,7 +246,7 @@ test "it streams the user's post in the 'user' stream", %{user: user, token: oau Streamer.get_topic_and_add_socket("user", user, oauth_token) {:ok, activity} = CommonAPI.post(user, %{status: "hey"}) - assert_receive {:render_with_user, _, _, ^activity} + assert_receive {:render_with_user, _, _, ^activity, _} refute Streamer.filtered_by_user?(user, activity) end @@ -257,7 +257,7 @@ test "it streams boosts of the user in the 'user' stream", %{user: user, token: {:ok, activity} = CommonAPI.post(other_user, %{status: "hey"}) {:ok, announce} = CommonAPI.repeat(activity.id, user) - assert_receive {:render_with_user, Pleroma.Web.StreamerView, "update.json", ^announce} + assert_receive {:render_with_user, Pleroma.Web.StreamerView, "update.json", ^announce, _} refute Streamer.filtered_by_user?(user, announce) end @@ -310,7 +310,7 @@ test "it streams boosts of mastodon user in the 'user' stream", %{ {:ok, %Pleroma.Activity{data: _data, local: false} = announce} = Pleroma.Web.ActivityPub.Transmogrifier.handle_incoming(data) - assert_receive {:render_with_user, Pleroma.Web.StreamerView, "update.json", ^announce} + assert_receive {:render_with_user, Pleroma.Web.StreamerView, "update.json", ^announce, _} refute Streamer.filtered_by_user?(user, announce) end @@ -322,7 +322,7 @@ test "it sends notify to in the 'user' stream", %{ Streamer.get_topic_and_add_socket("user", user, oauth_token) Streamer.stream("user", notify) - assert_receive {:render_with_user, _, _, ^notify} + assert_receive {:render_with_user, _, _, ^notify, _} refute Streamer.filtered_by_user?(user, notify) end @@ -334,7 +334,7 @@ test "it sends notify to in the 'user:notification' stream", %{ Streamer.get_topic_and_add_socket("user:notification", user, oauth_token) Streamer.stream("user:notification", notify) - assert_receive {:render_with_user, _, _, ^notify} + assert_receive {:render_with_user, _, _, ^notify, _} refute Streamer.filtered_by_user?(user, notify) end @@ -355,7 +355,7 @@ test "it sends chat messages to the 'user:pleroma_chat' stream", %{ Streamer.get_topic_and_add_socket("user:pleroma_chat", user, oauth_token) Streamer.stream("user:pleroma_chat", {user, cm_ref}) - text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref}) + text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref}, "user:pleroma_chat:#{user.id}") assert text =~ "hey cirno" assert_receive {:text, ^text} @@ -373,7 +373,7 @@ test "it sends chat messages to the 'user' stream", %{user: user, token: oauth_t Streamer.get_topic_and_add_socket("user", user, oauth_token) Streamer.stream("user", {user, cm_ref}) - text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref}) + text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref}, "user:#{user.id}") assert text =~ "hey cirno" assert_receive {:text, ^text} @@ -394,7 +394,7 @@ test "it sends chat message notifications to the 'user:notification' stream", %{ Streamer.get_topic_and_add_socket("user:notification", user, oauth_token) Streamer.stream("user:notification", notify) - assert_receive {:render_with_user, _, _, ^notify} + assert_receive {:render_with_user, _, _, ^notify, _} refute Streamer.filtered_by_user?(user, notify) end @@ -440,7 +440,7 @@ test "it sends favorite to 'user:notification' stream'", %{ Streamer.get_topic_and_add_socket("user:notification", user, oauth_token) {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id) - assert_receive {:render_with_user, _, "notification.json", notif} + assert_receive {:render_with_user, _, "notification.json", notif, _} assert notif.activity.id == favorite_activity.id refute Streamer.filtered_by_user?(user, notif) end @@ -469,7 +469,7 @@ test "it sends follow activities to the 'user:notification' stream", %{ Streamer.get_topic_and_add_socket("user:notification", user, oauth_token) {:ok, _follower, _followed, follow_activity} = CommonAPI.follow(user2, user) - assert_receive {:render_with_user, _, "notification.json", notif} + assert_receive {:render_with_user, _, "notification.json", notif, _} assert notif.activity.id == follow_activity.id refute Streamer.filtered_by_user?(user, notif) end @@ -534,7 +534,7 @@ test "it streams edits in the 'user' stream", %{user: user, token: oauth_token} {:ok, edited} = CommonAPI.update(sender, activity, %{status: "mew mew"}) create = Pleroma.Activity.get_create_by_object_ap_id_with_object(activity.object.data["id"]) - assert_receive {:render_with_user, _, "status_update.json", ^create} + assert_receive {:render_with_user, _, "status_update.json", ^create, _} refute Streamer.filtered_by_user?(user, edited) end @@ -545,7 +545,7 @@ test "it streams own edits in the 'user' stream", %{user: user, token: oauth_tok {:ok, edited} = CommonAPI.update(user, activity, %{status: "mew mew"}) create = Pleroma.Activity.get_create_by_object_ap_id_with_object(activity.object.data["id"]) - assert_receive {:render_with_user, _, "status_update.json", ^create} + assert_receive {:render_with_user, _, "status_update.json", ^create, _} refute Streamer.filtered_by_user?(user, edited) end end @@ -558,7 +558,7 @@ test "it sends to public (authenticated)" do Streamer.get_topic_and_add_socket("public", user, oauth_token) {:ok, activity} = CommonAPI.post(other_user, %{status: "Test"}) - assert_receive {:render_with_user, _, _, ^activity} + assert_receive {:render_with_user, _, _, ^activity, _} refute Streamer.filtered_by_user?(other_user, activity) end @@ -658,7 +658,7 @@ test "it filters to user if recipients invalid and thread containment is enabled Streamer.get_topic_and_add_socket("public", user, oauth_token) Streamer.stream("public", activity) - assert_receive {:render_with_user, _, _, ^activity} + assert_receive {:render_with_user, _, _, ^activity, _} assert Streamer.filtered_by_user?(user, activity) end @@ -680,7 +680,7 @@ test "it sends message if recipients invalid and thread containment is disabled" Streamer.get_topic_and_add_socket("public", user, oauth_token) Streamer.stream("public", activity) - assert_receive {:render_with_user, _, _, ^activity} + assert_receive {:render_with_user, _, _, ^activity, _} refute Streamer.filtered_by_user?(user, activity) end @@ -703,7 +703,7 @@ test "it sends message if recipients invalid and thread containment is enabled b Streamer.get_topic_and_add_socket("public", user, oauth_token) Streamer.stream("public", activity) - assert_receive {:render_with_user, _, _, ^activity} + assert_receive {:render_with_user, _, _, ^activity, _} refute Streamer.filtered_by_user?(user, activity) end end @@ -717,7 +717,7 @@ test "it filters messages involving blocked users", %{user: user, token: oauth_t Streamer.get_topic_and_add_socket("public", user, oauth_token) {:ok, activity} = CommonAPI.post(blocked_user, %{status: "Test"}) - assert_receive {:render_with_user, _, _, ^activity} + assert_receive {:render_with_user, _, _, ^activity, _} assert Streamer.filtered_by_user?(user, activity) end @@ -734,17 +734,17 @@ test "it filters messages transitively involving blocked users", %{ {:ok, activity_one} = CommonAPI.post(friend, %{status: "hey! @#{blockee.nickname}"}) - assert_receive {:render_with_user, _, _, ^activity_one} + assert_receive {:render_with_user, _, _, ^activity_one, _} assert Streamer.filtered_by_user?(blocker, activity_one) {:ok, activity_two} = CommonAPI.post(blockee, %{status: "hey! @#{friend.nickname}"}) - assert_receive {:render_with_user, _, _, ^activity_two} + assert_receive {:render_with_user, _, _, ^activity_two, _} assert Streamer.filtered_by_user?(blocker, activity_two) {:ok, activity_three} = CommonAPI.post(blockee, %{status: "hey! @#{blocker.nickname}"}) - assert_receive {:render_with_user, _, _, ^activity_three} + assert_receive {:render_with_user, _, _, ^activity_three, _} assert Streamer.filtered_by_user?(blocker, activity_three) end end @@ -805,7 +805,7 @@ test "it sends wanted private posts to list", %{user: user_a, token: user_a_toke visibility: "private" }) - assert_receive {:render_with_user, _, _, ^activity} + assert_receive {:render_with_user, _, _, ^activity, _} refute Streamer.filtered_by_user?(user_a, activity) end end @@ -823,7 +823,7 @@ test "it filters muted reblogs", %{user: user1, token: user1_token} do Streamer.get_topic_and_add_socket("user", user1, user1_token) {:ok, announce_activity} = CommonAPI.repeat(create_activity.id, user2) - assert_receive {:render_with_user, _, _, ^announce_activity} + assert_receive {:render_with_user, _, _, ^announce_activity, _} assert Streamer.filtered_by_user?(user1, announce_activity) end @@ -839,7 +839,7 @@ test "it filters reblog notification for reblog-muted actors", %{ Streamer.get_topic_and_add_socket("user", user1, user1_token) {:ok, _announce_activity} = CommonAPI.repeat(create_activity.id, user2) - assert_receive {:render_with_user, _, "notification.json", notif} + assert_receive {:render_with_user, _, "notification.json", notif, _} assert Streamer.filtered_by_user?(user1, notif) end @@ -855,7 +855,7 @@ test "it send non-reblog notification for reblog-muted actors", %{ Streamer.get_topic_and_add_socket("user", user1, user1_token) {:ok, _favorite_activity} = CommonAPI.favorite(user2, create_activity.id) - assert_receive {:render_with_user, _, "notification.json", notif} + assert_receive {:render_with_user, _, "notification.json", notif, _} refute Streamer.filtered_by_user?(user1, notif) end end @@ -870,7 +870,7 @@ test "it filters posts from muted threads" do {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"}) {:ok, _} = CommonAPI.add_mute(user2, activity) - assert_receive {:render_with_user, _, _, ^activity} + assert_receive {:render_with_user, _, _, ^activity, _} assert Streamer.filtered_by_user?(user2, activity) end end @@ -912,7 +912,7 @@ test "it doesn't send conversation update to the 'direct' stream when the last m }) create_activity_id = create_activity.id - assert_receive {:render_with_user, _, _, ^create_activity} + assert_receive {:render_with_user, _, _, ^create_activity, _} assert_receive {:text, received_conversation1} assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1) @@ -947,8 +947,8 @@ test "it sends conversation update to the 'direct' stream when a message is dele visibility: "direct" }) - assert_receive {:render_with_user, _, _, ^create_activity} - assert_receive {:render_with_user, _, _, ^create_activity2} + assert_receive {:render_with_user, _, _, ^create_activity, _} + assert_receive {:render_with_user, _, _, ^create_activity2, _} assert_receive {:text, received_conversation1} assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1) assert_receive {:text, received_conversation1} @@ -977,7 +977,7 @@ test "it sends conversation update to the 'direct' stream when a message is dele receive do {StreamerTest, :ready} -> - assert_receive {:render_with_user, _, "update.json", _} + assert_receive {:render_with_user, _, "update.json", _, _} receive do {StreamerTest, :revoked} -> finalize.() From a348c2e4dd0551a603509501d718d9e0b995be90 Mon Sep 17 00:00:00 2001 From: tusooa Date: Sat, 1 Apr 2023 01:29:11 -0400 Subject: [PATCH 05/23] Use pleroma: instead of pleroma. for ws events --- .../web/mastodon_api/websocket_handler.ex | 8 ++-- lib/pleroma/web/views/streamer_view.ex | 2 +- .../integration/mastodon_websocket_test.exs | 46 ++++++++++--------- test/pleroma/web/streamer_test.exs | 14 +++++- 4 files changed, 42 insertions(+), 28 deletions(-) diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex index 2707673ba8..07c2b62e38 100644 --- a/lib/pleroma/web/mastodon_api/websocket_handler.ex +++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex @@ -215,7 +215,7 @@ defp handle_client_event(%{"type" => "unsubscribe", "stream" => _topic} = params end defp handle_client_event( - %{"type" => "pleroma.authenticate", "token" => access_token} = _params, + %{"type" => "pleroma:authenticate", "token" => access_token} = _params, state ) do with {:auth, nil, nil} <- {:auth, state.user, state.oauth_token}, @@ -223,7 +223,7 @@ defp handle_client_event( {[ {:text, StreamerView.render("pleroma_respond.json", %{ - type: "pleroma.authenticate", + type: "pleroma:authenticate", result: "success" })} ], %{state | user: user, oauth_token: oauth_token}} @@ -232,7 +232,7 @@ defp handle_client_event( {[ {:text, StreamerView.render("pleroma_respond.json", %{ - type: "pleroma.authenticate", + type: "pleroma:authenticate", result: "error", error: :already_authenticated })} @@ -242,7 +242,7 @@ defp handle_client_event( {[ {:text, StreamerView.render("pleroma_respond.json", %{ - type: "pleroma.authenticate", + type: "pleroma:authenticate", result: "error", error: :unauthorized })} diff --git a/lib/pleroma/web/views/streamer_view.ex b/lib/pleroma/web/views/streamer_view.ex index f591da9a6d..f97570b0a2 100644 --- a/lib/pleroma/web/views/streamer_view.ex +++ b/lib/pleroma/web/views/streamer_view.ex @@ -148,7 +148,7 @@ def render("conversation.json", %Participation{} = participation, topic) do def render("pleroma_respond.json", %{type: type, result: result} = params) do %{ - event: "pleroma.respond", + event: "pleroma:respond", payload: %{ result: result, diff --git a/test/pleroma/integration/mastodon_websocket_test.exs b/test/pleroma/integration/mastodon_websocket_test.exs index d2e98df3b8..21e7ca2b02 100644 --- a/test/pleroma/integration/mastodon_websocket_test.exs +++ b/test/pleroma/integration/mastodon_websocket_test.exs @@ -95,7 +95,7 @@ test "can subscribe" do assert {:ok, %{ - "event" => "pleroma.respond", + "event" => "pleroma:respond", "payload" => %{"type" => "subscribe", "result" => "success"} }} = decode_json(raw_json) @@ -124,16 +124,20 @@ test "can subscribe to multiple streams" do assert {:ok, %{ - "event" => "pleroma.respond", + "event" => "pleroma:respond", "payload" => %{"type" => "subscribe", "result" => "success"} }} = decode_json(raw_json) - WebsocketClient.send_text(pid, %{type: "subscribe", stream: "hashtag", tag: "mew"} |> Jason.encode!()) + WebsocketClient.send_text( + pid, + %{type: "subscribe", stream: "hashtag", tag: "mew"} |> Jason.encode!() + ) + assert_receive {:text, raw_json}, 1_000 assert {:ok, %{ - "event" => "pleroma.respond", + "event" => "pleroma:respond", "payload" => %{"type" => "subscribe", "result" => "success"} }} = decode_json(raw_json) @@ -157,7 +161,7 @@ test "won't double subscribe" do assert {:ok, %{ - "event" => "pleroma.respond", + "event" => "pleroma:respond", "payload" => %{"type" => "subscribe", "result" => "success"} }} = decode_json(raw_json) @@ -166,7 +170,7 @@ test "won't double subscribe" do assert {:ok, %{ - "event" => "pleroma.respond", + "event" => "pleroma:respond", "payload" => %{"type" => "subscribe", "result" => "ignored"} }} = decode_json(raw_json) @@ -184,7 +188,7 @@ test "can unsubscribe" do assert {:ok, %{ - "event" => "pleroma.respond", + "event" => "pleroma:respond", "payload" => %{"type" => "subscribe", "result" => "success"} }} = decode_json(raw_json) @@ -193,7 +197,7 @@ test "can unsubscribe" do assert {:ok, %{ - "event" => "pleroma.respond", + "event" => "pleroma:respond", "payload" => %{"type" => "unsubscribe", "result" => "success"} }} = decode_json(raw_json) @@ -262,15 +266,15 @@ test "accepts valid token on client-sent event", %{token: token} do WebsocketClient.send_text( pid, - %{type: "pleroma.authenticate", token: token.token} |> Jason.encode!() + %{type: "pleroma:authenticate", token: token.token} |> Jason.encode!() ) assert_receive {:text, raw_json}, 1_000 assert {:ok, %{ - "event" => "pleroma.respond", - "payload" => %{"type" => "pleroma.authenticate", "result" => "success"} + "event" => "pleroma:respond", + "payload" => %{"type" => "pleroma:authenticate", "result" => "success"} }} = decode_json(raw_json) WebsocketClient.send_text(pid, %{type: "subscribe", stream: "user"} |> Jason.encode!()) @@ -278,7 +282,7 @@ test "accepts valid token on client-sent event", %{token: token} do assert {:ok, %{ - "event" => "pleroma.respond", + "event" => "pleroma:respond", "payload" => %{"type" => "subscribe", "result" => "success"} }} = decode_json(raw_json) end @@ -288,16 +292,16 @@ test "rejects invalid token on client-sent event" do WebsocketClient.send_text( pid, - %{type: "pleroma.authenticate", token: "Something else"} |> Jason.encode!() + %{type: "pleroma:authenticate", token: "Something else"} |> Jason.encode!() ) assert_receive {:text, raw_json}, 1_000 assert {:ok, %{ - "event" => "pleroma.respond", + "event" => "pleroma:respond", "payload" => %{ - "type" => "pleroma.authenticate", + "type" => "pleroma:authenticate", "result" => "error", "error" => "unauthorized" } @@ -309,29 +313,29 @@ test "rejects new authenticate request if already logged-in", %{token: token} do WebsocketClient.send_text( pid, - %{type: "pleroma.authenticate", token: token.token} |> Jason.encode!() + %{type: "pleroma:authenticate", token: token.token} |> Jason.encode!() ) assert_receive {:text, raw_json}, 1_000 assert {:ok, %{ - "event" => "pleroma.respond", - "payload" => %{"type" => "pleroma.authenticate", "result" => "success"} + "event" => "pleroma:respond", + "payload" => %{"type" => "pleroma:authenticate", "result" => "success"} }} = decode_json(raw_json) WebsocketClient.send_text( pid, - %{type: "pleroma.authenticate", token: "Something else"} |> Jason.encode!() + %{type: "pleroma:authenticate", token: "Something else"} |> Jason.encode!() ) assert_receive {:text, raw_json}, 1_000 assert {:ok, %{ - "event" => "pleroma.respond", + "event" => "pleroma:respond", "payload" => %{ - "type" => "pleroma.authenticate", + "type" => "pleroma:authenticate", "result" => "error", "error" => "already_authenticated" } diff --git a/test/pleroma/web/streamer_test.exs b/test/pleroma/web/streamer_test.exs index cc57a29891..d85358fd4d 100644 --- a/test/pleroma/web/streamer_test.exs +++ b/test/pleroma/web/streamer_test.exs @@ -355,7 +355,12 @@ test "it sends chat messages to the 'user:pleroma_chat' stream", %{ Streamer.get_topic_and_add_socket("user:pleroma_chat", user, oauth_token) Streamer.stream("user:pleroma_chat", {user, cm_ref}) - text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref}, "user:pleroma_chat:#{user.id}") + text = + StreamerView.render( + "chat_update.json", + %{chat_message_reference: cm_ref}, + "user:pleroma_chat:#{user.id}" + ) assert text =~ "hey cirno" assert_receive {:text, ^text} @@ -373,7 +378,12 @@ test "it sends chat messages to the 'user' stream", %{user: user, token: oauth_t Streamer.get_topic_and_add_socket("user", user, oauth_token) Streamer.stream("user", {user, cm_ref}) - text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref}, "user:#{user.id}") + text = + StreamerView.render( + "chat_update.json", + %{chat_message_reference: cm_ref}, + "user:#{user.id}" + ) assert text =~ "hey cirno" assert_receive {:text, ^text} From 2d430679468a4c6a9b5c365a53f007cfa28679d9 Mon Sep 17 00:00:00 2001 From: tusooa Date: Sat, 1 Apr 2023 02:24:30 -0400 Subject: [PATCH 06/23] Document the streaming endpoint --- .../API/differences_in_mastoapi_responses.md | 116 ++++++++++++++++++ 1 file changed, 116 insertions(+) diff --git a/docs/development/API/differences_in_mastoapi_responses.md b/docs/development/API/differences_in_mastoapi_responses.md index 4007c63c82..ef6b3b3beb 100644 --- a/docs/development/API/differences_in_mastoapi_responses.md +++ b/docs/development/API/differences_in_mastoapi_responses.md @@ -357,6 +357,122 @@ The message payload consist of: - `follower_count`: follower count - `following_count`: following count +### Authenticating via `sec-websocket-protocol` header + +Pleroma allows to authenticate via the `sec-websocket-protocol` header, for example, if your access token is `your-access-token`, you can authenticate using the following: + +``` +sec-websocket-protocol: your-access-token +``` + +### Authenticating after connection via `pleroma:authenticate` event + +Pleroma allows to authenticate after connection is established, via the `pleroma:authenticate` event. For example, if your access token is `your-access-token`, you can send the following after the connection is established: + +``` +{"type": "pleroma:authenticate", "token": "your-access-token"} +``` + +### Response to client-sent events + +Pleroma will respond to client-sent events that it recognizes. Supported event types are: + +- `subscribe` +- `unsubscribe` +- `pleroma:authenticate` + +The reply will be in the following format: + +``` +{ + "event": "pleroma:respond", + "payload": "{\"type\": \"\", \"result\": \"\", \"error\": \"\"}" +} +``` + +Result of the action can be either `success`, `ignored` or `error`. If it is `error`, the `error` property will contain the error code. Otherwise, the `error` property will not be present. Below are some examples: + +``` +{ + "event": "pleroma:respond", + "payload": "{\"type\": \"pleroma:authenticate\", \"result\": \"success\"}" +} + +{ + "event": "pleroma:respond", + "payload": "{\"type\": \"subscribe\", \"result\": \"ignored\"}" +} + +{ + "event": "pleroma:respond", + "payload": "{\"type\": \"unsubscribe\", \"result\": \"error\", \"error\": \"bad_topic\"}" +} +``` + +If the sent event is not of a type that Pleroma supports, it will not reply. + +### The `stream` attribute of a server-sent event + +Technically, this is in Mastodon, but its documentation does nothing to specify its format. + +This attribute appears on every event type except `pleroma:respond` and `delete`. It helps clients determine where they should display the new statuses. + +The value of the attribute is an array containing one or two elements. The first element is the type of the stream. The second is the identifier related to that specific stream, if applicable. + +For the following stream types, there is a second element in the array: + +- `list`: The second element is the id of the list. +- `hashtag`: The second element is the name of the hashtag. +- `public:remote:media` and `public:remote`: The second element is the domain of the corresponding instance. + +For all other stream types, there is no second element. + +Some examples of valid `stream` values: + +- `list:1`: List of id 1. +- `hashtag:mew`: The hashtag #mew. +- `user:notifications`: Notifications for the current user. +- `user`: Home timeline. +- `public:remote:mew.moe`: Public posts from the instance mew.moe . + +### The unified streaming endpoint + +If you do not specify a stream to connect to when requesting `/api/v1/streaming`, you will enter a connection that subscribes to no streams. After the connection is established, you can authenticate and then subscribe to different streams. + +### List of supported streams + +Below is a list of supported streams by Pleroma. To make a single-stream WebSocket connection, append the string specified in "Query style" to the streaming endpoint url. +To subscribe to a stream after the connection is established, merge the JSON object specified in "Subscribe style" with `{"type": "subscribe"}`. To unsubscribe, merge it with `{"type": "unsubscribe"}`. + +For example, to receive updates on the list 1, you can connect to `/api/v1/streaming/?stream=list&list=1`, or send + +``` +{"type": "subscribe", "stream": "list", "list": 1} +``` + +upon establishing the websocket connection. + +To unsubscribe to list 1, send + +``` +{"type": "unsubscribe", "stream": "list", "list": 1} +``` + +Note that if you specify a stream that requires a logged-in user in the query string (for example, `user` or `list`), you have to specify the access token when you are trying to establish the connection, i.e. in the query string or via the `sec-websocket-protocol` header. + +- `list` + - Query style: `?stream=list&list=` + - Subscribe style: `{"stream": "list", "list": }` +- `public`, `public:local`, `public:media`, `public:local:media`, `user`, `user:pleroma_chat`, `user:notifications`, `direct` + - Query style: `?stream=` + - Subscribe style: `{"stream": ""}` +- `hashtag` + - Query style: `?stream=hashtag&tag=` + - Subscribe style: `{"stream": "hashtag", "tag": ""}` +- `public:remote`, `public:remote:media` + - Query style: `?stream=&instance=` + - Subscribe style: `{"stream": "", "instance": ""}` + ## User muting and thread muting Both user muting and thread muting can be done for only a certain time by adding an `expires_in` parameter to the API calls and giving the expiration time in seconds. From 9572be1e5fe0201cd069271c7025ef233e8ddf00 Mon Sep 17 00:00:00 2001 From: tusooa Date: Sat, 1 Apr 2023 02:35:12 -0400 Subject: [PATCH 07/23] Add tests for list streams --- .../integration/mastodon_websocket_test.exs | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/test/pleroma/integration/mastodon_websocket_test.exs b/test/pleroma/integration/mastodon_websocket_test.exs index 21e7ca2b02..8129645ec7 100644 --- a/test/pleroma/integration/mastodon_websocket_test.exs +++ b/test/pleroma/integration/mastodon_websocket_test.exs @@ -342,6 +342,43 @@ test "rejects new authenticate request if already logged-in", %{token: token} do }} = decode_json(raw_json) end + test "accepts the 'list' stream", %{token: token, user: user} do + posting_user = insert(:user) + + {:ok, list} = Pleroma.List.create("test", user) + Pleroma.List.follow(list, posting_user) + + assert {:ok, _} = start_socket("?stream=list&access_token=#{token.token}&list=#{list.id}") + + assert {:ok, pid} = start_socket("?access_token=#{token.token}") + + WebsocketClient.send_text( + pid, + %{type: "subscribe", stream: "list", list: list.id} |> Jason.encode!() + ) + + assert_receive {:text, raw_json}, 1_000 + + assert {:ok, + %{ + "event" => "pleroma:respond", + "payload" => %{"type" => "subscribe", "result" => "success"} + }} = decode_json(raw_json) + + WebsocketClient.send_text( + pid, + %{type: "subscribe", stream: "list", list: to_string(list.id)} |> Jason.encode!() + ) + + assert_receive {:text, raw_json}, 1_000 + + assert {:ok, + %{ + "event" => "pleroma:respond", + "payload" => %{"type" => "subscribe", "result" => "ignored"} + }} = decode_json(raw_json) + end + test "disconnect when token is revoked", %{app: app, user: user, token: token} do assert {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}") assert {:ok, _} = start_socket("?stream=user&access_token=#{token.token}") From 7f12ad6dccfe4c81fa7e0d4e66c43bedadbf4c6a Mon Sep 17 00:00:00 2001 From: tusooa Date: Sat, 1 Apr 2023 02:37:19 -0400 Subject: [PATCH 08/23] Fix docs wording --- .../API/differences_in_mastoapi_responses.md | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/docs/development/API/differences_in_mastoapi_responses.md b/docs/development/API/differences_in_mastoapi_responses.md index ef6b3b3beb..48a9c104c3 100644 --- a/docs/development/API/differences_in_mastoapi_responses.md +++ b/docs/development/API/differences_in_mastoapi_responses.md @@ -421,7 +421,7 @@ The value of the attribute is an array containing one or two elements. The first For the following stream types, there is a second element in the array: -- `list`: The second element is the id of the list. +- `list`: The second element is the id of the list, as a string. - `hashtag`: The second element is the name of the hashtag. - `public:remote:media` and `public:remote`: The second element is the domain of the corresponding instance. @@ -429,11 +429,11 @@ For all other stream types, there is no second element. Some examples of valid `stream` values: -- `list:1`: List of id 1. -- `hashtag:mew`: The hashtag #mew. -- `user:notifications`: Notifications for the current user. -- `user`: Home timeline. -- `public:remote:mew.moe`: Public posts from the instance mew.moe . +- `["list", "1"]`: List of id 1. +- `["hashtag", "mew"]`: The hashtag #mew. +- `["user:notifications"]`: Notifications for the current user. +- `["user"]`: Home timeline. +- `["public:remote", "mew.moe"]`: Public posts from the instance mew.moe . ### The unified streaming endpoint @@ -447,7 +447,7 @@ To subscribe to a stream after the connection is established, merge the JSON obj For example, to receive updates on the list 1, you can connect to `/api/v1/streaming/?stream=list&list=1`, or send ``` -{"type": "subscribe", "stream": "list", "list": 1} +{"type": "subscribe", "stream": "list", "list": "1"} ``` upon establishing the websocket connection. @@ -455,14 +455,14 @@ upon establishing the websocket connection. To unsubscribe to list 1, send ``` -{"type": "unsubscribe", "stream": "list", "list": 1} +{"type": "unsubscribe", "stream": "list", "list": "1"} ``` Note that if you specify a stream that requires a logged-in user in the query string (for example, `user` or `list`), you have to specify the access token when you are trying to establish the connection, i.e. in the query string or via the `sec-websocket-protocol` header. - `list` - Query style: `?stream=list&list=` - - Subscribe style: `{"stream": "list", "list": }` + - Subscribe style: `{"stream": "list", "list": ""}` - `public`, `public:local`, `public:media`, `public:local:media`, `user`, `user:pleroma_chat`, `user:notifications`, `direct` - Query style: `?stream=` - Subscribe style: `{"stream": ""}` From 949c4f01c6d5686ff1852e6439616c2069557ef0 Mon Sep 17 00:00:00 2001 From: tusooa Date: Sat, 1 Apr 2023 02:38:16 -0400 Subject: [PATCH 09/23] Fix NotificationTest --- test/pleroma/notification_test.exs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/pleroma/notification_test.exs b/test/pleroma/notification_test.exs index e55aa3a089..71af9acb81 100644 --- a/test/pleroma/notification_test.exs +++ b/test/pleroma/notification_test.exs @@ -252,7 +252,7 @@ test "it creates a notification for user and send to the 'user' and the 'user:no task = Task.async(fn -> {:ok, _topic} = Streamer.get_topic_and_add_socket("user", user, oauth_token) - assert_receive {:render_with_user, _, _, _}, 4_000 + assert_receive {:render_with_user, _, _, _, _}, 4_000 end) task_user_notification = @@ -260,7 +260,7 @@ test "it creates a notification for user and send to the 'user' and the 'user:no {:ok, _topic} = Streamer.get_topic_and_add_socket("user:notification", user, oauth_token) - assert_receive {:render_with_user, _, _, _}, 4_000 + assert_receive {:render_with_user, _, _, _, _}, 4_000 end) activity = insert(:note_activity) From eebc605bc25deead55c305d703c06ddb9d9b1107 Mon Sep 17 00:00:00 2001 From: tusooa Date: Sat, 1 Apr 2023 08:27:43 -0400 Subject: [PATCH 10/23] Clear up debug statement --- lib/pleroma/web/streamer.ex | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex index 70e46617ad..48ca82421b 100644 --- a/lib/pleroma/web/streamer.ex +++ b/lib/pleroma/web/streamer.ex @@ -357,7 +357,6 @@ defp push_to_socket(topic, %Activity{data: %{"type" => "Update"}} = item) do end defp push_to_socket(topic, item) do - Logger.debug("topic=#{topic}") anon_render = StreamerView.render("update.json", item, topic) Registry.dispatch(@registry, topic, fn list -> From 050227f11898f402f5888d53e6460b704bcd0a8b Mon Sep 17 00:00:00 2001 From: tusooa Date: Sat, 1 Apr 2023 08:39:38 -0400 Subject: [PATCH 11/23] Add test to cover error: bad_topic --- test/pleroma/integration/mastodon_websocket_test.exs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/test/pleroma/integration/mastodon_websocket_test.exs b/test/pleroma/integration/mastodon_websocket_test.exs index 8129645ec7..0e84137490 100644 --- a/test/pleroma/integration/mastodon_websocket_test.exs +++ b/test/pleroma/integration/mastodon_websocket_test.exs @@ -180,6 +180,18 @@ test "won't double subscribe" do refute_receive {:text, _}, 1_000 end + test "rejects invalid streams" do + {:ok, pid} = start_socket() + WebsocketClient.send_text(pid, %{type: "subscribe", stream: "nonsense"} |> Jason.encode!()) + assert_receive {:text, raw_json}, 1_000 + + assert {:ok, + %{ + "event" => "pleroma:respond", + "payload" => %{"type" => "subscribe", "result" => "error", "error" => "bad_topic"} + }} = decode_json(raw_json) + end + test "can unsubscribe" do user = insert(:user) {:ok, pid} = start_socket() From 4cf109d3c4093d788e6b2de229a9e4034146a5be Mon Sep 17 00:00:00 2001 From: tusooa Date: Sat, 1 Apr 2023 08:47:46 -0400 Subject: [PATCH 12/23] Add test to cover rendering update with user --- .../integration/mastodon_websocket_test.exs | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/test/pleroma/integration/mastodon_websocket_test.exs b/test/pleroma/integration/mastodon_websocket_test.exs index 0e84137490..5ec681ce30 100644 --- a/test/pleroma/integration/mastodon_websocket_test.exs +++ b/test/pleroma/integration/mastodon_websocket_test.exs @@ -406,5 +406,32 @@ test "disconnect when token is revoked", %{app: app, user: user, token: token} d assert_receive {:close, _} refute_receive {:close, _} end + + test "receives private statuses", %{user: reading_user, token: token} do + user = insert(:user) + CommonAPI.follow(reading_user, user) + + {:ok, _} = start_socket("?stream=user&access_token=#{token.token}") + + {:ok, activity} = + CommonAPI.post(user, %{status: "nice echo chamber", visibility: "private"}) + + assert_receive {:text, raw_json}, 1_000 + assert {:ok, json} = Jason.decode(raw_json) + + assert "update" == json["event"] + assert json["payload"] + assert {:ok, json} = Jason.decode(json["payload"]) + + view_json = + Pleroma.Web.MastodonAPI.StatusView.render("show.json", + activity: activity, + for: reading_user + ) + |> Jason.encode!() + |> Jason.decode!() + + assert json == view_json + end end end From 26f5caebae1581f1dcdc6d1352840d27009e061c Mon Sep 17 00:00:00 2001 From: tusooa Date: Sat, 1 Apr 2023 08:56:53 -0400 Subject: [PATCH 13/23] Add test to cover notifications streaming --- .../integration/mastodon_websocket_test.exs | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/test/pleroma/integration/mastodon_websocket_test.exs b/test/pleroma/integration/mastodon_websocket_test.exs index 5ec681ce30..96648264e1 100644 --- a/test/pleroma/integration/mastodon_websocket_test.exs +++ b/test/pleroma/integration/mastodon_websocket_test.exs @@ -433,5 +433,30 @@ test "receives private statuses", %{user: reading_user, token: token} do assert json == view_json end + + test "receives notifications", %{user: reading_user, token: token} do + user = insert(:user) + CommonAPI.follow(reading_user, user) + + {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}") + + {:ok, %Pleroma.Activity{id: activity_id} = _activity} = + CommonAPI.post(user, %{ + status: "nice echo chamber @#{reading_user.nickname}", + visibility: "private" + }) + + assert_receive {:text, raw_json}, 1_000 + + assert {:ok, + %{ + "event" => "notification", + "payload" => %{ + "status" => %{ + "id" => ^activity_id + } + } + }} = decode_json(raw_json) + end end end From 314360e5e3a4bdd13f152e94050cb8562e2ea0ed Mon Sep 17 00:00:00 2001 From: tusooa Date: Sat, 1 Apr 2023 09:02:43 -0400 Subject: [PATCH 14/23] Add test to cover edit streaming --- .../integration/mastodon_websocket_test.exs | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/test/pleroma/integration/mastodon_websocket_test.exs b/test/pleroma/integration/mastodon_websocket_test.exs index 96648264e1..7d5dfc2559 100644 --- a/test/pleroma/integration/mastodon_websocket_test.exs +++ b/test/pleroma/integration/mastodon_websocket_test.exs @@ -434,6 +434,34 @@ test "receives private statuses", %{user: reading_user, token: token} do assert json == view_json end + test "receives edits", %{user: reading_user, token: token} do + user = insert(:user) + CommonAPI.follow(reading_user, user) + + {:ok, _} = start_socket("?stream=user&access_token=#{token.token}") + + {:ok, activity} = + CommonAPI.post(user, %{status: "nice echo chamber", visibility: "private"}) + + assert_receive {:text, _raw_json}, 1_000 + + {:ok, _} = CommonAPI.update(user, activity, %{status: "mew mew", visibility: "private"}) + + assert_receive {:text, raw_json}, 1_000 + + activity = Pleroma.Activity.normalize(activity) + + view_json = + Pleroma.Web.MastodonAPI.StatusView.render("show.json", + activity: activity, + for: reading_user + ) + |> Jason.encode!() + |> Jason.decode!() + + assert {:ok, %{"event" => "status.update", "payload" => ^view_json}} = decode_json(raw_json) + end + test "receives notifications", %{user: reading_user, token: token} do user = insert(:user) CommonAPI.follow(reading_user, user) From 844d1a14e0b4aabbb61a6693fa6dd3a0aa0dbc5b Mon Sep 17 00:00:00 2001 From: tusooa Date: Sat, 1 Apr 2023 15:31:56 -0400 Subject: [PATCH 15/23] Start writing api docs for streaming endpoint --- lib/pleroma/web/api_spec.ex | 10 +- .../operations/streaming_operation.ex | 186 ++++++++++++++++++ 2 files changed, 195 insertions(+), 1 deletion(-) create mode 100644 lib/pleroma/web/api_spec/operations/streaming_operation.ex diff --git a/lib/pleroma/web/api_spec.ex b/lib/pleroma/web/api_spec.ex index 2d56dc6439..163226ce57 100644 --- a/lib/pleroma/web/api_spec.ex +++ b/lib/pleroma/web/api_spec.ex @@ -10,6 +10,14 @@ defmodule Pleroma.Web.ApiSpec do @behaviour OpenApi + defp streaming_paths do + %{ + "/api/v1/streaming" => %OpenApiSpex.PathItem{ + get: Pleroma.Web.ApiSpec.StreamingOperation.streaming_operation() + } + } + end + @impl OpenApi def spec(opts \\ []) do %OpenApi{ @@ -45,7 +53,7 @@ def spec(opts \\ []) do } }, # populate the paths from a phoenix router - paths: OpenApiSpex.Paths.from_router(Router), + paths: Map.merge(streaming_paths(), OpenApiSpex.Paths.from_router(Router)), components: %OpenApiSpex.Components{ parameters: %{ "accountIdOrNickname" => diff --git a/lib/pleroma/web/api_spec/operations/streaming_operation.ex b/lib/pleroma/web/api_spec/operations/streaming_operation.ex new file mode 100644 index 0000000000..1ef7d72efa --- /dev/null +++ b/lib/pleroma/web/api_spec/operations/streaming_operation.ex @@ -0,0 +1,186 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2022 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Web.ApiSpec.StreamingOperation do + alias OpenApiSpex.Operation + alias OpenApiSpex.Response + alias OpenApiSpex.Schema + alias Pleroma.Web.ApiSpec.Helpers + alias Pleroma.Web.ApiSpec.Schemas.Status + + @spec open_api_operation(atom) :: Operation.t() + def open_api_operation(action) do + operation = String.to_existing_atom("#{action}_operation") + apply(__MODULE__, operation, []) + end + + @spec streaming_operation() :: Operation.t() + def streaming_operation do + %Operation{ + tags: ["Timelines"], + summary: "Establish streaming connection", + description: "Receive statuses in real-time via WebSocket.", + security: [%{"oAuth" => ["read:statuses", "read:notifications"]}], + parameters: [ + Operation.parameter(:connection, :header, %Schema{type: :string}, "connection header", + required: true + ), + Operation.parameter(:upgrade, :header, %Schema{type: :string}, "upgrade header", + required: true + ), + Operation.parameter( + :"sec-websocket-key", + :header, + %Schema{type: :string}, + "sec-websocket-key header", + required: true + ), + Operation.parameter( + :"sec-websocket-version", + :header, + %Schema{type: :string}, + "sec-websocket-version header", + required: true + ) + ], + responses: %{ + 101 => switching_protocols_response(), + 200 => + Operation.response( + "Server-sent events", + "application/json", + server_sent_events() + ) + } + } + end + + defp switching_protocols_response do + %Response{ + description: "Switching protocols", + headers: %{ + "connection" => %OpenApiSpex.Header{required: true}, + "upgrade" => %OpenApiSpex.Header{required: true}, + "sec-websocket-accept" => %OpenApiSpex.Header{required: true} + } + } + end + + defp server_sent_events do + %Schema{ + oneOf: [ + update_event(), + status_update_event(), + pleroma_respond_event() + ] + } + end + + defp stream do + %Schema{ + type: :array, + title: "Stream", + description: """ + The stream identifier. + The first item is the name of the stream. If the stream needs a differentiator, the second item will be the corresponding identifier. + Currently, for the following stream types, there is a second element in the array: + + - `list`: The second element is the id of the list, as a string. + - `hashtag`: The second element is the name of the hashtag. + - `public:remote:media` and `public:remote`: The second element is the domain of the corresponding instance. + """, + maxItems: 2, + minItems: 1, + items: %Schema{type: :string}, + example: ["hashtag", "mew"] + } + end + + defp get_schema(%Schema{} = schema), do: schema + defp get_schema(schema), do: schema.schema + + defp server_sent_event_helper(name, description, type, payload, opts \\ []) do + payload_type = opts[:payload_type] || :json + has_stream = opts[:has_stream] || true + + stream_properties = + if has_stream do + %{stream: stream()} + else + %{} + end + + stream_example = if has_stream, do: %{"stream" => get_schema(stream()).example}, else: %{} + + stream_required = if has_stream, do: [:stream], else: [] + + %Schema{ + type: :object, + title: name, + description: description, + required: [:event, :payload] ++ stream_required, + properties: + %{ + event: %Schema{ + title: "Event type", + description: "Type of the event.", + type: :string, + required: true, + enum: [type] + }, + payload: + if payload_type == :json do + %Schema{ + title: "Event payload", + description: "JSON-encoded string of #{get_schema(payload).title}", + allOf: [payload] + } + else + payload + end + } + |> Map.merge(stream_properties), + example: + %{ + "event" => type, + "payload" => get_schema(payload).example |> Jason.encode!() + } + |> Map.merge(stream_example) + } + end + + defp update_event do + server_sent_event_helper("New status", "A newly-posted status.", "update", Status) + end + + defp status_update_event do + server_sent_event_helper("Edit", "A status that was just edited", "status.update", Status) + end + + defp pleroma_respond_event do + server_sent_event_helper( + "Server response", + "A response to a client-sent event.", + "pleroma:respond", + %Schema{ + type: :object, + title: "Results", + required: [:result], + properties: %{ + result: %Schema{ + type: :string, + title: "Result of the request", + enum: ["success", "error", "ignored"] + }, + error: %Schema{ + type: :string, + title: "Error code", + description: "An error identifier. Only appears if `result` is `error`." + } + }, + example: %{"result" => "success"} + } + ) + end +end From dcef33f5f0c93883a634d12dc662b83d7ef6abfa Mon Sep 17 00:00:00 2001 From: tusooa Date: Sat, 1 Apr 2023 16:07:27 -0400 Subject: [PATCH 16/23] Document server-sent events of streaming --- .../operations/streaming_operation.ex | 106 ++++++++++++++++++ 1 file changed, 106 insertions(+) diff --git a/lib/pleroma/web/api_spec/operations/streaming_operation.ex b/lib/pleroma/web/api_spec/operations/streaming_operation.ex index 1ef7d72efa..cdef416030 100644 --- a/lib/pleroma/web/api_spec/operations/streaming_operation.ex +++ b/lib/pleroma/web/api_spec/operations/streaming_operation.ex @@ -7,6 +7,10 @@ defmodule Pleroma.Web.ApiSpec.StreamingOperation do alias OpenApiSpex.Response alias OpenApiSpex.Schema alias Pleroma.Web.ApiSpec.Helpers + alias Pleroma.Web.ApiSpec.NotificationOperation + alias Pleroma.Web.ApiSpec.Schemas.Chat + alias Pleroma.Web.ApiSpec.Schemas.Conversation + alias Pleroma.Web.ApiSpec.Schemas.FlakeID alias Pleroma.Web.ApiSpec.Schemas.Status @spec open_api_operation(atom) :: Operation.t() @@ -22,6 +26,7 @@ def streaming_operation do summary: "Establish streaming connection", description: "Receive statuses in real-time via WebSocket.", security: [%{"oAuth" => ["read:statuses", "read:notifications"]}], + operationId: "WebsocketHandler.streaming", parameters: [ Operation.parameter(:connection, :header, %Schema{type: :string}, "connection header", required: true @@ -72,6 +77,11 @@ defp server_sent_events do oneOf: [ update_event(), status_update_event(), + notification_event(), + chat_update_event(), + follow_relationships_update_event(), + conversation_event(), + delete_event(), pleroma_respond_event() ] } @@ -158,6 +168,102 @@ defp status_update_event do server_sent_event_helper("Edit", "A status that was just edited", "status.update", Status) end + defp notification_event do + server_sent_event_helper( + "Notification", + "A new notification.", + "notification", + NotificationOperation.notification() + ) + end + + defp follow_relationships_update_event do + server_sent_event_helper( + "Follow relationships update", + "An update to follow relationships.", + "pleroma:follow_relationships_update", + %Schema{ + type: :object, + title: "Follow relationships update", + required: [:state, :follower, :following], + properties: %{ + state: %Schema{ + type: :string, + description: "Follow state of the relationship.", + enum: ["follow_pending", "follow_accept", "follow_reject", "unfollow"] + }, + follower: %Schema{ + type: :object, + description: "Information about the follower.", + required: [:id, :follower_count, :following_count], + properties: %{ + id: FlakeID, + follower_count: %Schema{type: :integer}, + following_count: %Schema{type: :integer} + } + }, + following: %Schema{ + type: :object, + description: "Information about the following person.", + required: [:id, :follower_count, :following_count], + properties: %{ + id: FlakeID, + follower_count: %Schema{type: :integer}, + following_count: %Schema{type: :integer} + } + } + }, + example: %{ + "state" => "follow_pending", + "follower" => %{ + "id" => "someUser1", + "follower_count" => 1, + "following_count" => 1 + }, + "following" => %{ + "id" => "someUser2", + "follower_count" => 1, + "following_count" => 1 + } + } + } + ) + end + + defp chat_update_event do + server_sent_event_helper( + "Chat update", + "A new chat message.", + "pleroma:chat_update", + Chat + ) + end + + defp conversation_event do + server_sent_event_helper( + "Conversation", + "An update about a conversation", + "conversation", + Conversation + ) + end + + defp delete_event do + server_sent_event_helper( + "Delete", + "A status that was just deleted.", + "delete", + %Schema{ + type: :string, + title: "Status id", + description: "Id of the deleted status", + allOf: [FlakeID], + example: "some-opaque-id" + }, + payload_type: :string + ) + end + defp pleroma_respond_event do server_sent_event_helper( "Server response", From 8829dcaee42b3ad1ee50f95b0586b22118771785 Mon Sep 17 00:00:00 2001 From: tusooa Date: Sat, 1 Apr 2023 16:33:22 -0400 Subject: [PATCH 17/23] Document client-sent events in streaming --- .../operations/streaming_operation.ex | 126 +++++++++++++++++- 1 file changed, 125 insertions(+), 1 deletion(-) diff --git a/lib/pleroma/web/api_spec/operations/streaming_operation.ex b/lib/pleroma/web/api_spec/operations/streaming_operation.ex index cdef416030..18674c9a10 100644 --- a/lib/pleroma/web/api_spec/operations/streaming_operation.ex +++ b/lib/pleroma/web/api_spec/operations/streaming_operation.ex @@ -6,13 +6,14 @@ defmodule Pleroma.Web.ApiSpec.StreamingOperation do alias OpenApiSpex.Operation alias OpenApiSpex.Response alias OpenApiSpex.Schema - alias Pleroma.Web.ApiSpec.Helpers alias Pleroma.Web.ApiSpec.NotificationOperation alias Pleroma.Web.ApiSpec.Schemas.Chat alias Pleroma.Web.ApiSpec.Schemas.Conversation alias Pleroma.Web.ApiSpec.Schemas.FlakeID alias Pleroma.Web.ApiSpec.Schemas.Status + require Pleroma.Constants + @spec open_api_operation(atom) :: Operation.t() def open_api_operation(action) do operation = String.to_existing_atom("#{action}_operation") @@ -49,6 +50,7 @@ def streaming_operation do required: true ) ], + requestBody: request_body("Client-sent events", client_sent_events()), responses: %{ 101 => switching_protocols_response(), 200 => @@ -289,4 +291,126 @@ defp pleroma_respond_event do } ) end + + defp client_sent_events do + %Schema{ + oneOf: [ + subscribe_event(), + unsubscribe_event(), + authenticate_event() + ] + } + end + + defp request_body(description, schema, opts \\ []) do + %OpenApiSpex.RequestBody{ + description: description, + content: %{ + "application/json" => %OpenApiSpex.MediaType{ + schema: schema, + example: opts[:example], + examples: opts[:examples] + } + } + } + end + + defp client_sent_event_helper(name, description, type, properties, opts) do + required = opts[:required] || [] + + %Schema{ + type: :object, + title: name, + required: [:type] ++ required, + description: description, + properties: + %{ + type: %Schema{type: :string, enum: [type], description: "Type of the event."} + } + |> Map.merge(properties), + example: opts[:example] + } + end + + defp subscribe_event do + client_sent_event_helper( + "Subscribe", + "Subscribe to a stream.", + "subscribe", + stream_specifier(), + required: [:stream], + example: %{"type" => "subscribe", "stream" => "list", "list" => "1"} + ) + end + + defp unsubscribe_event do + client_sent_event_helper( + "Unsubscribe", + "Unsubscribe from a stream.", + "subscribe", + stream_specifier(), + required: [:stream], + example: %{ + "type" => "unsubscribe", + "stream" => "public:remote:media", + "instance" => "example.org" + } + ) + end + + defp authenticate_event do + client_sent_event_helper( + "Authenticate", + "Authenticate via an access token.", + "pleroma:authenticate", + %{ + token: %Schema{ + type: :string, + description: "An OAuth access token with corresponding permissions.", + example: "some token" + } + }, + required: [:token] + ) + end + + defp stream_specifier do + %{ + stream: %Schema{ + type: :string, + description: "The name of the stream.", + enum: + Pleroma.Constants.public_streams() ++ + [ + "public:remote", + "public:remote:media", + "user", + "user:pleroma_chat", + "user:notification", + "direct", + "list", + "hashtag" + ] + }, + list: %Schema{ + type: :string, + title: "List id", + description: "The id of the list. Required when `stream` is `list`.", + example: "some-id" + }, + tag: %Schema{ + type: :string, + title: "Hashtag name", + description: "The name of the hashtag. Required when `stream` is `hashtag`.", + example: "mew" + }, + instance: %Schema{ + type: :string, + title: "Domain name", + description: + "Domain name of the instance. Required when `stream` is `public:remote` or `public:remote:media`.", + example: "example.org" + } + } + end end From f393a15dd1217a7f6aec9e9acc7b983e7b165a91 Mon Sep 17 00:00:00 2001 From: tusooa Date: Sat, 1 Apr 2023 16:46:32 -0400 Subject: [PATCH 18/23] Fix some specs about server-sent events in streaming --- .../operations/streaming_operation.ex | 49 ++++++++++++------- 1 file changed, 32 insertions(+), 17 deletions(-) diff --git a/lib/pleroma/web/api_spec/operations/streaming_operation.ex b/lib/pleroma/web/api_spec/operations/streaming_operation.ex index 18674c9a10..fa48b96131 100644 --- a/lib/pleroma/web/api_spec/operations/streaming_operation.ex +++ b/lib/pleroma/web/api_spec/operations/streaming_operation.ex @@ -113,8 +113,8 @@ defp get_schema(%Schema{} = schema), do: schema defp get_schema(schema), do: schema.schema defp server_sent_event_helper(name, description, type, payload, opts \\ []) do - payload_type = opts[:payload_type] || :json - has_stream = opts[:has_stream] || true + payload_type = Keyword.get(opts, :payload_type, :json) + has_stream = Keyword.get(opts, :has_stream, true) stream_properties = if has_stream do @@ -127,6 +127,24 @@ defp server_sent_event_helper(name, description, type, payload, opts \\ []) do stream_required = if has_stream, do: [:stream], else: [] + payload_schema = + if payload_type == :json do + %Schema{ + title: "Event payload", + description: "JSON-encoded string of #{get_schema(payload).title}", + allOf: [payload] + } + else + payload + end + + payload_example = + if payload_type == :json do + get_schema(payload).example |> Jason.encode!() + else + get_schema(payload).example + end + %Schema{ type: :object, title: name, @@ -141,22 +159,13 @@ defp server_sent_event_helper(name, description, type, payload, opts \\ []) do required: true, enum: [type] }, - payload: - if payload_type == :json do - %Schema{ - title: "Event payload", - description: "JSON-encoded string of #{get_schema(payload).title}", - allOf: [payload] - } - else - payload - end + payload: payload_schema } |> Map.merge(stream_properties), example: %{ "event" => type, - "payload" => get_schema(payload).example |> Jason.encode!() + "payload" => payload_example } |> Map.merge(stream_example) } @@ -262,7 +271,8 @@ defp delete_event do allOf: [FlakeID], example: "some-opaque-id" }, - payload_type: :string + payload_type: :string, + has_stream: false ) end @@ -274,7 +284,7 @@ defp pleroma_respond_event do %Schema{ type: :object, title: "Results", - required: [:result], + required: [:result, :type], properties: %{ result: %Schema{ type: :string, @@ -285,10 +295,15 @@ defp pleroma_respond_event do type: :string, title: "Error code", description: "An error identifier. Only appears if `result` is `error`." + }, + type: %Schema{ + type: :string, + description: "Type of the request." } }, - example: %{"result" => "success"} - } + example: %{"result" => "success", "type" => "pleroma:authenticate"} + }, + has_stream: false ) end From c13f0a8460853d8fe9aa04cb5d57ea06b692a3bf Mon Sep 17 00:00:00 2001 From: tusooa Date: Sat, 1 Apr 2023 17:00:35 -0400 Subject: [PATCH 19/23] Add meta-info and query strings to streaming doc --- .../operations/streaming_operation.ex | 89 +++++++++++++------ 1 file changed, 61 insertions(+), 28 deletions(-) diff --git a/lib/pleroma/web/api_spec/operations/streaming_operation.ex b/lib/pleroma/web/api_spec/operations/streaming_operation.ex index fa48b96131..4c4888d8ee 100644 --- a/lib/pleroma/web/api_spec/operations/streaming_operation.ex +++ b/lib/pleroma/web/api_spec/operations/streaming_operation.ex @@ -25,31 +25,46 @@ def streaming_operation do %Operation{ tags: ["Timelines"], summary: "Establish streaming connection", - description: "Receive statuses in real-time via WebSocket.", + description: """ + Receive statuses in real-time via WebSocket. + + You can specify the access token on the query string or through the `sec-websocket-protocol` header. Using + the query string to authenticate is considered unsafe and should not be used unless you have to (e.g. to maintain + your client's compatibility with Mastodon). + + You may specify a stream on the query string. If you do so and you are connecting to a stream that requires logged-in users, + you must specify the access token at the time of the connection (i.e. via query string or header). + + Otherwise, you have the option to authenticate after you have established the connection through client-sent events. + + The "Request body" section below describes what events clients can send through WebSocket, and the "Responses" section + describes what events server will send through WebSocket. + """, security: [%{"oAuth" => ["read:statuses", "read:notifications"]}], operationId: "WebsocketHandler.streaming", - parameters: [ - Operation.parameter(:connection, :header, %Schema{type: :string}, "connection header", - required: true - ), - Operation.parameter(:upgrade, :header, %Schema{type: :string}, "upgrade header", - required: true - ), - Operation.parameter( - :"sec-websocket-key", - :header, - %Schema{type: :string}, - "sec-websocket-key header", - required: true - ), - Operation.parameter( - :"sec-websocket-version", - :header, - %Schema{type: :string}, - "sec-websocket-version header", - required: true - ) - ], + parameters: + [ + Operation.parameter(:connection, :header, %Schema{type: :string}, "connection header", + required: true + ), + Operation.parameter(:upgrade, :header, %Schema{type: :string}, "upgrade header", + required: true + ), + Operation.parameter( + :"sec-websocket-key", + :header, + %Schema{type: :string}, + "sec-websocket-key header", + required: true + ), + Operation.parameter( + :"sec-websocket-version", + :header, + %Schema{type: :string}, + "sec-websocket-version header", + required: true + ) + ] ++ stream_params() ++ access_token_params(), requestBody: request_body("Client-sent events", client_sent_events()), responses: %{ 101 => switching_protocols_response(), @@ -63,6 +78,20 @@ def streaming_operation do } end + defp stream_params do + stream_specifier() + |> Enum.map(fn {name, schema} -> + Operation.parameter(name, :query, schema, get_schema(schema).description) + end) + end + + defp access_token_params do + [ + Operation.parameter(:access_token, :query, token(), token().description), + Operation.parameter(:"sec-websocket-protocol", :header, token(), token().description) + ] + end + defp switching_protocols_response do %Response{ description: "Switching protocols", @@ -379,16 +408,20 @@ defp authenticate_event do "Authenticate via an access token.", "pleroma:authenticate", %{ - token: %Schema{ - type: :string, - description: "An OAuth access token with corresponding permissions.", - example: "some token" - } + token: token() }, required: [:token] ) end + defp token do + %Schema{ + type: :string, + description: "An OAuth access token with corresponding permissions.", + example: "some token" + } + end + defp stream_specifier do %{ stream: %Schema{ From 32de0683c4069f5877722dce0b4f5a9a6825b7c7 Mon Sep 17 00:00:00 2001 From: tusooa Date: Sat, 1 Apr 2023 17:15:58 -0400 Subject: [PATCH 20/23] Fix unsubscribe event type in streaming doc --- lib/pleroma/web/api_spec/operations/streaming_operation.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/pleroma/web/api_spec/operations/streaming_operation.ex b/lib/pleroma/web/api_spec/operations/streaming_operation.ex index 4c4888d8ee..ae3aeb4abc 100644 --- a/lib/pleroma/web/api_spec/operations/streaming_operation.ex +++ b/lib/pleroma/web/api_spec/operations/streaming_operation.ex @@ -391,7 +391,7 @@ defp unsubscribe_event do client_sent_event_helper( "Unsubscribe", "Unsubscribe from a stream.", - "subscribe", + "unsubscribe", stream_specifier(), required: [:stream], example: %{ From 840dd01035581a37613f695facdd99fbf6ac8319 Mon Sep 17 00:00:00 2001 From: tusooa Date: Sat, 1 Apr 2023 18:33:43 -0400 Subject: [PATCH 21/23] Fix duplicated schema names --- lib/pleroma/web/api_spec/operations/streaming_operation.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/pleroma/web/api_spec/operations/streaming_operation.ex b/lib/pleroma/web/api_spec/operations/streaming_operation.ex index ae3aeb4abc..b580bc2f03 100644 --- a/lib/pleroma/web/api_spec/operations/streaming_operation.ex +++ b/lib/pleroma/web/api_spec/operations/streaming_operation.ex @@ -281,7 +281,7 @@ defp chat_update_event do defp conversation_event do server_sent_event_helper( - "Conversation", + "Conversation update", "An update about a conversation", "conversation", Conversation From 3e7d2e29b369535a9a942a4090cde9a21892f8c1 Mon Sep 17 00:00:00 2001 From: tusooa Date: Sat, 1 Jul 2023 23:07:07 -0400 Subject: [PATCH 22/23] Add changelog --- changelog.d/unified-streaming.add | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/unified-streaming.add diff --git a/changelog.d/unified-streaming.add b/changelog.d/unified-streaming.add new file mode 100644 index 0000000000..84821fcc8d --- /dev/null +++ b/changelog.d/unified-streaming.add @@ -0,0 +1 @@ +Add unified streaming endpoint From eb33a03d0ad8571e3f0f3e0c5e9af39158c72a09 Mon Sep 17 00:00:00 2001 From: tusooa Date: Tue, 18 Jul 2023 18:13:49 -0400 Subject: [PATCH 23/23] Explain the encode-decode roundtrip --- test/pleroma/integration/mastodon_websocket_test.exs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/test/pleroma/integration/mastodon_websocket_test.exs b/test/pleroma/integration/mastodon_websocket_test.exs index 7d5dfc2559..a2c20f0a6c 100644 --- a/test/pleroma/integration/mastodon_websocket_test.exs +++ b/test/pleroma/integration/mastodon_websocket_test.exs @@ -38,6 +38,13 @@ defp decode_json(json) do end end + # Turns atom keys to strings + defp atom_key_to_string(json) do + json + |> Jason.encode!() + |> Jason.decode!() + end + test "refuses invalid requests" do capture_log(fn -> assert {:error, %WebSockex.RequestError{code: 404}} = start_socket("?stream=ncjdk") @@ -80,8 +87,7 @@ test "receives well formatted events" do view_json = Pleroma.Web.MastodonAPI.StatusView.render("show.json", activity: activity, for: nil) - |> Jason.encode!() - |> Jason.decode!() + |> atom_key_to_string() assert json == view_json end