Allow subscribing to streams
This commit is contained in:
parent
2b5636bf12
commit
273cda63ad
3 changed files with 182 additions and 0 deletions
|
@ -9,6 +9,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
|
||||||
alias Pleroma.User
|
alias Pleroma.User
|
||||||
alias Pleroma.Web.OAuth.Token
|
alias Pleroma.Web.OAuth.Token
|
||||||
alias Pleroma.Web.Streamer
|
alias Pleroma.Web.Streamer
|
||||||
|
alias Pleroma.Web.StreamerView
|
||||||
|
|
||||||
@behaviour :cowboy_websocket
|
@behaviour :cowboy_websocket
|
||||||
|
|
||||||
|
@ -73,6 +74,16 @@ def websocket_handle(:pong, state) do
|
||||||
# We only receive pings for now
|
# We only receive pings for now
|
||||||
def websocket_handle(:ping, state), do: {:ok, state}
|
def websocket_handle(:ping, state), do: {:ok, state}
|
||||||
|
|
||||||
|
def websocket_handle({:text, text}, state) do
|
||||||
|
with {:ok, %{} = event} <- Jason.decode(text) do
|
||||||
|
handle_client_event(event, state)
|
||||||
|
else
|
||||||
|
_ ->
|
||||||
|
Logger.error("#{__MODULE__} received non-JSON event: #{inspect(text)}")
|
||||||
|
{:ok, state}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
def websocket_handle(frame, state) do
|
def websocket_handle(frame, state) do
|
||||||
Logger.error("#{__MODULE__} received frame: #{inspect(frame)}")
|
Logger.error("#{__MODULE__} received frame: #{inspect(frame)}")
|
||||||
{:ok, state}
|
{:ok, state}
|
||||||
|
@ -144,4 +155,67 @@ defp authenticate_request(access_token, sec_websocket) do
|
||||||
defp timer do
|
defp timer do
|
||||||
Process.send_after(self(), :tick, @tick)
|
Process.send_after(self(), :tick, @tick)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
defp handle_client_event(%{"type" => "subscribe", "stream" => _topic} = params, state) do
|
||||||
|
with {_, {:ok, topic}} <-
|
||||||
|
{:topic, Streamer.get_topic(params["stream"], state.user, state.oauth_token, params)},
|
||||||
|
{_, false} <- {:subscribed, topic in state.topics} do
|
||||||
|
Streamer.add_socket(topic, state.oauth_token)
|
||||||
|
|
||||||
|
{[
|
||||||
|
{:text,
|
||||||
|
StreamerView.render("pleroma_respond.json", %{type: "subscribe", result: "success"})}
|
||||||
|
], %{state | topics: [topic | state.topics]}}
|
||||||
|
else
|
||||||
|
{:subscribed, true} ->
|
||||||
|
{[
|
||||||
|
{:text,
|
||||||
|
StreamerView.render("pleroma_respond.json", %{type: "subscribe", result: "ignored"})}
|
||||||
|
], state}
|
||||||
|
|
||||||
|
{:topic, {:error, error}} ->
|
||||||
|
{[
|
||||||
|
{:text,
|
||||||
|
StreamerView.render("pleroma_respond.json", %{
|
||||||
|
type: "subscribe",
|
||||||
|
result: "error",
|
||||||
|
error: error
|
||||||
|
})}
|
||||||
|
], state}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp handle_client_event(%{"type" => "unsubscribe", "stream" => _topic} = params, state) do
|
||||||
|
with {_, {:ok, topic}} <-
|
||||||
|
{:topic, Streamer.get_topic(params["stream"], state.user, state.oauth_token, params)},
|
||||||
|
{_, true} <- {:subscribed, topic in state.topics} do
|
||||||
|
Streamer.remove_socket(topic)
|
||||||
|
|
||||||
|
{[
|
||||||
|
{:text,
|
||||||
|
StreamerView.render("pleroma_respond.json", %{type: "unsubscribe", result: "success"})}
|
||||||
|
], %{state | topics: List.delete(state.topics, topic)}}
|
||||||
|
else
|
||||||
|
{:subscribed, false} ->
|
||||||
|
{[
|
||||||
|
{:text,
|
||||||
|
StreamerView.render("pleroma_respond.json", %{type: "unsubscribe", result: "ignored"})}
|
||||||
|
], state}
|
||||||
|
|
||||||
|
{:topic, {:error, error}} ->
|
||||||
|
{[
|
||||||
|
{:text,
|
||||||
|
StreamerView.render("pleroma_respond.json", %{
|
||||||
|
type: "unsubscribe",
|
||||||
|
result: "error",
|
||||||
|
error: error
|
||||||
|
})}
|
||||||
|
], state}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp handle_client_event(params, state) do
|
||||||
|
Logger.error("#{__MODULE__} received unknown event: #{inspect(params)}")
|
||||||
|
{[], state}
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -135,4 +135,22 @@ def render("conversation.json", %Participation{} = participation) do
|
||||||
}
|
}
|
||||||
|> Jason.encode!()
|
|> Jason.encode!()
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def render("pleroma_respond.json", %{type: type, result: result} = params) do
|
||||||
|
%{
|
||||||
|
event: "pleroma.respond",
|
||||||
|
payload:
|
||||||
|
%{
|
||||||
|
result: result,
|
||||||
|
type: type
|
||||||
|
}
|
||||||
|
|> Map.merge(maybe_error(params))
|
||||||
|
|> Jason.encode!()
|
||||||
|
}
|
||||||
|
|> Jason.encode!()
|
||||||
|
end
|
||||||
|
|
||||||
|
defp maybe_error(%{error: :bad_topic}), do: %{error: "bad_topic"}
|
||||||
|
defp maybe_error(%{error: :unauthorized}), do: %{error: "unauthorized"}
|
||||||
|
defp maybe_error(_), do: %{}
|
||||||
end
|
end
|
||||||
|
|
|
@ -31,6 +31,13 @@ def start_socket(qs \\ nil, headers \\ []) do
|
||||||
WebsocketClient.start_link(self(), path, headers)
|
WebsocketClient.start_link(self(), path, headers)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
defp decode_json(json) do
|
||||||
|
with {:ok, %{"event" => event, "payload" => payload_text}} <- Jason.decode(json),
|
||||||
|
{:ok, payload} <- Jason.decode(payload_text) do
|
||||||
|
{:ok, %{"event" => event, "payload" => payload}}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
test "refuses invalid requests" do
|
test "refuses invalid requests" do
|
||||||
capture_log(fn ->
|
capture_log(fn ->
|
||||||
assert {:error, %WebSockex.RequestError{code: 404}} = start_socket("?stream=ncjdk")
|
assert {:error, %WebSockex.RequestError{code: 404}} = start_socket("?stream=ncjdk")
|
||||||
|
@ -79,6 +86,89 @@ test "receives well formatted events" do
|
||||||
assert json == view_json
|
assert json == view_json
|
||||||
end
|
end
|
||||||
|
|
||||||
|
describe "subscribing via WebSocket" do
|
||||||
|
test "can subscribe" do
|
||||||
|
user = insert(:user)
|
||||||
|
{:ok, pid} = start_socket()
|
||||||
|
WebsocketClient.send_text(pid, %{type: "subscribe", stream: "public"} |> Jason.encode!())
|
||||||
|
assert_receive {:text, raw_json}, 1_000
|
||||||
|
|
||||||
|
assert {:ok,
|
||||||
|
%{
|
||||||
|
"event" => "pleroma.respond",
|
||||||
|
"payload" => %{"type" => "subscribe", "result" => "success"}
|
||||||
|
}} = decode_json(raw_json)
|
||||||
|
|
||||||
|
{:ok, activity} = CommonAPI.post(user, %{status: "nice echo chamber"})
|
||||||
|
|
||||||
|
assert_receive {:text, raw_json}, 1_000
|
||||||
|
assert {:ok, json} = Jason.decode(raw_json)
|
||||||
|
|
||||||
|
assert "update" == json["event"]
|
||||||
|
assert json["payload"]
|
||||||
|
assert {:ok, json} = Jason.decode(json["payload"])
|
||||||
|
|
||||||
|
view_json =
|
||||||
|
Pleroma.Web.MastodonAPI.StatusView.render("show.json", activity: activity, for: nil)
|
||||||
|
|> Jason.encode!()
|
||||||
|
|> Jason.decode!()
|
||||||
|
|
||||||
|
assert json == view_json
|
||||||
|
end
|
||||||
|
|
||||||
|
test "won't double subscribe" do
|
||||||
|
user = insert(:user)
|
||||||
|
{:ok, pid} = start_socket()
|
||||||
|
WebsocketClient.send_text(pid, %{type: "subscribe", stream: "public"} |> Jason.encode!())
|
||||||
|
assert_receive {:text, raw_json}, 1_000
|
||||||
|
|
||||||
|
assert {:ok,
|
||||||
|
%{
|
||||||
|
"event" => "pleroma.respond",
|
||||||
|
"payload" => %{"type" => "subscribe", "result" => "success"}
|
||||||
|
}} = decode_json(raw_json)
|
||||||
|
|
||||||
|
WebsocketClient.send_text(pid, %{type: "subscribe", stream: "public"} |> Jason.encode!())
|
||||||
|
assert_receive {:text, raw_json}, 1_000
|
||||||
|
|
||||||
|
assert {:ok,
|
||||||
|
%{
|
||||||
|
"event" => "pleroma.respond",
|
||||||
|
"payload" => %{"type" => "subscribe", "result" => "ignored"}
|
||||||
|
}} = decode_json(raw_json)
|
||||||
|
|
||||||
|
{:ok, _activity} = CommonAPI.post(user, %{status: "nice echo chamber"})
|
||||||
|
|
||||||
|
assert_receive {:text, _}, 1_000
|
||||||
|
refute_receive {:text, _}, 1_000
|
||||||
|
end
|
||||||
|
|
||||||
|
test "can unsubscribe" do
|
||||||
|
user = insert(:user)
|
||||||
|
{:ok, pid} = start_socket()
|
||||||
|
WebsocketClient.send_text(pid, %{type: "subscribe", stream: "public"} |> Jason.encode!())
|
||||||
|
assert_receive {:text, raw_json}, 1_000
|
||||||
|
|
||||||
|
assert {:ok,
|
||||||
|
%{
|
||||||
|
"event" => "pleroma.respond",
|
||||||
|
"payload" => %{"type" => "subscribe", "result" => "success"}
|
||||||
|
}} = decode_json(raw_json)
|
||||||
|
|
||||||
|
WebsocketClient.send_text(pid, %{type: "unsubscribe", stream: "public"} |> Jason.encode!())
|
||||||
|
assert_receive {:text, raw_json}, 1_000
|
||||||
|
|
||||||
|
assert {:ok,
|
||||||
|
%{
|
||||||
|
"event" => "pleroma.respond",
|
||||||
|
"payload" => %{"type" => "unsubscribe", "result" => "success"}
|
||||||
|
}} = decode_json(raw_json)
|
||||||
|
|
||||||
|
{:ok, _activity} = CommonAPI.post(user, %{status: "nice echo chamber"})
|
||||||
|
refute_receive {:text, _}, 1_000
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
describe "with a valid user token" do
|
describe "with a valid user token" do
|
||||||
setup do
|
setup do
|
||||||
{:ok, app} =
|
{:ok, app} =
|
||||||
|
|
Loading…
Reference in a new issue