diff --git a/lib/pleroma/web/mastodon_api/mastodon_socket.ex b/lib/pleroma/web/mastodon_api/mastodon_socket.ex index c27d025c48..f9c8cec321 100644 --- a/lib/pleroma/web/mastodon_api/mastodon_socket.ex +++ b/lib/pleroma/web/mastodon_api/mastodon_socket.ex @@ -1,12 +1,18 @@ defmodule Pleroma.Web.MastodonAPI.MastodonSocket do use Phoenix.Socket - transport :streaming, Phoenix.Transports.WebSocket.Raw + transport :streaming, Phoenix.Transports.WebSocket.Raw, + timeout: :infinity # We never receive data. def connect(params, socket) do - IO.inspect(params) - Pleroma.Web.Streamer.add_socket(params["stream"], socket) - {:ok, socket} + if params["stream"] == "public" do + socket = socket + |> assign(:topic, params["stream"]) + Pleroma.Web.Streamer.add_socket(params["stream"], socket) + {:ok, socket} + else + :error + end end def id(socket), do: nil @@ -21,7 +27,8 @@ def handle(:text, message, state) do {:text, message} end - def handle(:closed, reason, _state) do - IO.inspect reason + def handle(:closed, reason, %{socket: socket}) do + topic = socket.assigns[:topic] + Pleroma.Web.Streamer.remove_socket(topic, socket) end end diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex index cc38058946..3a7b917432 100644 --- a/lib/pleroma/web/streamer.ex +++ b/lib/pleroma/web/streamer.ex @@ -4,6 +4,10 @@ defmodule Pleroma.Web.Streamer do import Plug.Conn def start_link do + spawn(fn -> + Process.sleep(1000 * 30) # 30 seconds + GenServer.cast(__MODULE__, %{action: :ping}) + end) GenServer.start_link(__MODULE__, %{}, name: __MODULE__) end @@ -11,10 +15,28 @@ def add_socket(topic, socket) do GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic}) end + def remove_socket(topic, socket) do + GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic}) + end + def stream(topic, item) do GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item}) end + def handle_cast(%{action: :ping}, topics) do + Map.values(topics) + |> List.flatten + |> Enum.each(fn (socket) -> + Logger.debug("Sending keepalive ping") + send socket.transport_pid, {:text, ""} + end) + spawn(fn -> + Process.sleep(1000 * 30) # 30 seconds + GenServer.cast(__MODULE__, %{action: :ping}) + end) + {:noreply, topics} + end + def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do Logger.debug("Trying to push to #{topic}") Logger.debug("Pushing item to #{topic}") @@ -38,6 +60,15 @@ def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do {:noreply, sockets} end + def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do + sockets_for_topic = sockets[topic] || [] + sockets_for_topic = List.delete(sockets_for_topic, socket) + sockets = Map.put(sockets, topic, sockets_for_topic) + Logger.debug("Removed conn for #{topic}") + IO.inspect(sockets) + {:noreply, sockets} + end + def handle_cast(m, state) do IO.inspect("Unknown: #{inspect(m)}, #{inspect(state)}") {:noreply, state}