diff --git a/test/pleroma/web/streamer_test.exs b/test/pleroma/web/streamer_test.exs index 5426467e5e..7c4b9e2887 100644 --- a/test/pleroma/web/streamer_test.exs +++ b/test/pleroma/web/streamer_test.exs @@ -815,7 +815,47 @@ test "it sends conversation update to the 'direct' stream when a message is dele end describe "stop streaming if token got revoked" do - test "do not revoke other tokens" do + setup do + child_proc = fn start, finalize -> + fn -> + start.() + + receive do + {StreamerTest, :ready} -> + assert_receive {:render_with_user, _, "update.json", _} + + receive do + {StreamerTest, :revoked} -> finalize.() + end + end + end + end + + starter = fn user, token -> + fn -> Streamer.get_topic_and_add_socket("user", user, token) end + end + + hit = fn -> assert_receive :close end + miss = fn -> refute_receive :close end + + send_all = fn tasks, thing -> Enum.each(tasks, &send(&1.pid, thing)) end + + %{ + child_proc: child_proc, + starter: starter, + hit: hit, + miss: miss, + send_all: send_all + } + end + + test "do not revoke other tokens", %{ + child_proc: child_proc, + starter: starter, + hit: hit, + miss: miss, + send_all: send_all + } do %{user: user, token: token} = oauth_access(["read"]) %{token: token2} = oauth_access(["read"], user: user) %{user: user2, token: user2_token} = oauth_access(["read"]) @@ -824,47 +864,54 @@ test "do not revoke other tokens" do CommonAPI.follow(user, post_user) CommonAPI.follow(user2, post_user) - Streamer.get_topic_and_add_socket("user", user, token) - Streamer.get_topic_and_add_socket("user", user, token2) - Streamer.get_topic_and_add_socket("user", user2, user2_token) + tasks = [ + Task.async(child_proc.(starter.(user, token), hit)), + Task.async(child_proc.(starter.(user, token2), miss)), + Task.async(child_proc.(starter.(user2, user2_token), miss)) + ] {:ok, _} = CommonAPI.post(post_user, %{ status: "hi" }) - assert_receive {:render_with_user, _, "update.json", _} - assert_receive {:render_with_user, _, "update.json", _} - assert_receive {:render_with_user, _, "update.json", _} + send_all.(tasks, {StreamerTest, :ready}) Pleroma.Web.OAuth.Token.Strategy.Revoke.revoke(token) - assert_receive :close - refute_receive :close + send_all.(tasks, {StreamerTest, :revoked}) + + Enum.each(tasks, &Task.await/1) end - test "revoke all streams for this token" do + test "revoke all streams for this token", %{ + child_proc: child_proc, + starter: starter, + hit: hit, + send_all: send_all + } do %{user: user, token: token} = oauth_access(["read"]) post_user = insert(:user) CommonAPI.follow(user, post_user) - Streamer.get_topic_and_add_socket("user", user, token) - Streamer.get_topic_and_add_socket("user", user, token) + tasks = [ + Task.async(child_proc.(starter.(user, token), hit)), + Task.async(child_proc.(starter.(user, token), hit)) + ] {:ok, _} = CommonAPI.post(post_user, %{ status: "hi" }) - assert_receive {:render_with_user, _, "update.json", _} - assert_receive {:render_with_user, _, "update.json", _} + send_all.(tasks, {StreamerTest, :ready}) Pleroma.Web.OAuth.Token.Strategy.Revoke.revoke(token) - assert_receive :close - assert_receive :close - refute_receive :close + send_all.(tasks, {StreamerTest, :revoked}) + + Enum.each(tasks, &Task.await/1) end end end