From 8d218ebaf5ab0b72e419068340c40a5ef9744924 Mon Sep 17 00:00:00 2001 From: Alexander Strizhakov Date: Thu, 10 Sep 2020 10:54:57 +0300 Subject: [PATCH] Moving some background jobs into simple tasks - fetching activity data - attachment prefetching - using limiter to prevent overload --- lib/pleroma/application.ex | 6 +++++ lib/pleroma/web/activity_pub/activity_pub.ex | 4 +++- .../mrf/media_proxy_warming_policy.ex | 19 +++++++++++----- lib/pleroma/web/activity_pub/side_effects.ex | 5 +++-- lib/pleroma/web/rich_media/helpers.ex | 5 ----- lib/pleroma/workers/background_worker.ex | 15 ------------- .../20200915095704_remove_background_jobs.exs | 22 +++++++++++++++++++ .../config/deprecation_warnings_test.exs | 2 +- .../mrf/media_proxy_warming_policy_test.exs | 12 +++++----- .../controllers/status_controller_test.exs | 6 ++--- .../chat_message_reference_view_test.exs | 2 +- 11 files changed, 58 insertions(+), 40 deletions(-) create mode 100644 priv/repo/migrations/20200915095704_remove_background_jobs.exs diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index 7c4cd9626d..769af1806c 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -57,6 +57,7 @@ def start(_type, _args) do setup_instrumenters() load_custom_modules() Pleroma.Docs.JSON.compile() + limiters_setup() adapter = Application.get_env(:tesla, :adapter) @@ -273,4 +274,9 @@ defp http_children(Tesla.Adapter.Gun, _) do end defp http_children(_, _), do: [] + + def limiters_setup do + [Pleroma.Web.RichMedia.Helpers, Pleroma.Web.MediaProxy] + |> Enum.each(&ConcurrentLimiter.new(&1, 1, 0)) + end end diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index d8f685d381..6008f2f4ab 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -123,7 +123,9 @@ def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when # Splice in the child object if we have one. activity = Maps.put_if_present(activity, :object, object) - BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id}) + ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn -> + Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end) + end) {:ok, activity} else diff --git a/lib/pleroma/web/activity_pub/mrf/media_proxy_warming_policy.ex b/lib/pleroma/web/activity_pub/mrf/media_proxy_warming_policy.ex index 0fb05d3c4b..816cc89bfe 100644 --- a/lib/pleroma/web/activity_pub/mrf/media_proxy_warming_policy.ex +++ b/lib/pleroma/web/activity_pub/mrf/media_proxy_warming_policy.ex @@ -8,7 +8,6 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do alias Pleroma.HTTP alias Pleroma.Web.MediaProxy - alias Pleroma.Workers.BackgroundWorker require Logger @@ -17,7 +16,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do recv_timeout: 10_000 ] - def perform(:prefetch, url) do + defp prefetch(url) do # Fetching only proxiable resources if MediaProxy.enabled?() and MediaProxy.url_proxiable?(url) do # If preview proxy is enabled, it'll also hit media proxy (so we're caching both requests) @@ -25,17 +24,25 @@ def perform(:prefetch, url) do Logger.debug("Prefetching #{inspect(url)} as #{inspect(prefetch_url)}") - HTTP.get(prefetch_url, [], @adapter_options) + if Pleroma.Config.get(:env) == :test do + fetch(prefetch_url) + else + ConcurrentLimiter.limit(MediaProxy, fn -> + Task.start(fn -> fetch(prefetch_url) end) + end) + end end end - def perform(:preload, %{"object" => %{"attachment" => attachments}} = _message) do + defp fetch(url), do: HTTP.get(url, [], @adapter_options) + + defp preload(%{"object" => %{"attachment" => attachments}} = _message) do Enum.each(attachments, fn %{"url" => url} when is_list(url) -> url |> Enum.each(fn %{"href" => href} -> - BackgroundWorker.enqueue("media_proxy_prefetch", %{"url" => href}) + prefetch(href) x -> Logger.debug("Unhandled attachment URL object #{inspect(x)}") @@ -51,7 +58,7 @@ def filter( %{"type" => "Create", "object" => %{"attachment" => attachments} = _object} = message ) when is_list(attachments) and length(attachments) > 0 do - BackgroundWorker.enqueue("media_proxy_preload", %{"message" => message}) + preload(message) {:ok, message} end diff --git a/lib/pleroma/web/activity_pub/side_effects.ex b/lib/pleroma/web/activity_pub/side_effects.ex index bbff35c360..4d8fb721e9 100644 --- a/lib/pleroma/web/activity_pub/side_effects.ex +++ b/lib/pleroma/web/activity_pub/side_effects.ex @@ -24,7 +24,6 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do alias Pleroma.Web.ActivityPub.Utils alias Pleroma.Web.Push alias Pleroma.Web.Streamer - alias Pleroma.Workers.BackgroundWorker require Logger @@ -191,7 +190,9 @@ def handle(%{data: %{"type" => "Create"}} = activity, meta) do Object.increase_replies_count(in_reply_to) end - BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id}) + ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn -> + Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end) + end) meta = meta diff --git a/lib/pleroma/web/rich_media/helpers.ex b/lib/pleroma/web/rich_media/helpers.ex index d67b594b57..442bf99957 100644 --- a/lib/pleroma/web/rich_media/helpers.ex +++ b/lib/pleroma/web/rich_media/helpers.ex @@ -78,11 +78,6 @@ def fetch_data_for_activity(%Activity{data: %{"type" => "Create"}} = activity) d def fetch_data_for_activity(_), do: %{} - def perform(:fetch, %Activity{} = activity) do - fetch_data_for_activity(activity) - :ok - end - def rich_media_get(url) do headers = [{"user-agent", Pleroma.Application.user_agent() <> "; Bot"}] diff --git a/lib/pleroma/workers/background_worker.ex b/lib/pleroma/workers/background_worker.ex index 55b5a13d9e..0647c65ae2 100644 --- a/lib/pleroma/workers/background_worker.ex +++ b/lib/pleroma/workers/background_worker.ex @@ -3,9 +3,7 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Workers.BackgroundWorker do - alias Pleroma.Activity alias Pleroma.User - alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy use Pleroma.Workers.WorkerHelper, queue: "background" @@ -32,19 +30,6 @@ def perform(%Job{args: %{"op" => op, "user_id" => user_id, "identifiers" => iden {:ok, User.Import.perform(String.to_atom(op), user, identifiers)} end - def perform(%Job{args: %{"op" => "media_proxy_preload", "message" => message}}) do - MediaProxyWarmingPolicy.perform(:preload, message) - end - - def perform(%Job{args: %{"op" => "media_proxy_prefetch", "url" => url}}) do - MediaProxyWarmingPolicy.perform(:prefetch, url) - end - - def perform(%Job{args: %{"op" => "fetch_data_for_activity", "activity_id" => activity_id}}) do - activity = Activity.get_by_id(activity_id) - Pleroma.Web.RichMedia.Helpers.perform(:fetch, activity) - end - def perform(%Job{ args: %{"op" => "move_following", "origin_id" => origin_id, "target_id" => target_id} }) do diff --git a/priv/repo/migrations/20200915095704_remove_background_jobs.exs b/priv/repo/migrations/20200915095704_remove_background_jobs.exs new file mode 100644 index 0000000000..9785bfb8a7 --- /dev/null +++ b/priv/repo/migrations/20200915095704_remove_background_jobs.exs @@ -0,0 +1,22 @@ +defmodule Pleroma.Repo.Migrations.RemoveBackgroundJobs do + use Ecto.Migration + + import Ecto.Query, only: [from: 2] + + def up do + from(j in "oban_jobs", + where: + j.queue == ^"background" and + fragment("?->>'op'", j.args) in ^[ + "fetch_data_for_activity", + "media_proxy_prefetch", + "media_proxy_preload" + ] and + j.worker == ^"Pleroma.Workers.BackgroundWorker", + select: [:id] + ) + |> Pleroma.Repo.delete_all() + end + + def down, do: :ok +end diff --git a/test/pleroma/config/deprecation_warnings_test.exs b/test/pleroma/config/deprecation_warnings_test.exs index 0cfed45557..f52629f8ab 100644 --- a/test/pleroma/config/deprecation_warnings_test.exs +++ b/test/pleroma/config/deprecation_warnings_test.exs @@ -12,7 +12,7 @@ defmodule Pleroma.Config.DeprecationWarningsTest do alias Pleroma.Config.DeprecationWarnings test "check_old_mrf_config/0" do - clear_config([:instance, :rewrite_policy], Pleroma.Web.ActivityPub.MRF.NoOpPolicy) + clear_config([:instance, :rewrite_policy], []) clear_config([:instance, :mrf_transparency], true) clear_config([:instance, :mrf_transparency_exclusions], []) diff --git a/test/pleroma/web/activity_pub/mrf/media_proxy_warming_policy_test.exs b/test/pleroma/web/activity_pub/mrf/media_proxy_warming_policy_test.exs index 1710c4d2ae..84362ce785 100644 --- a/test/pleroma/web/activity_pub/mrf/media_proxy_warming_policy_test.exs +++ b/test/pleroma/web/activity_pub/mrf/media_proxy_warming_policy_test.exs @@ -3,10 +3,10 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicyTest do - use Pleroma.DataCase + use ExUnit.Case + use Pleroma.Tests.Helpers alias Pleroma.HTTP - alias Pleroma.Tests.ObanHelpers alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy import Mock @@ -25,13 +25,13 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicyTest do setup do: clear_config([:media_proxy, :enabled], true) test "it prefetches media proxy URIs" do + Tesla.Mock.mock(fn %{method: :get, url: "http://example.com/image.jpg"} -> + {:ok, %Tesla.Env{status: 200, body: ""}} + end) + with_mock HTTP, get: fn _, _, _ -> {:ok, []} end do MediaProxyWarmingPolicy.filter(@message) - ObanHelpers.perform_all() - # Performing jobs which has been just enqueued - ObanHelpers.perform_all() - assert called(HTTP.get(:_, :_, :_)) end end diff --git a/test/pleroma/web/mastodon_api/controllers/status_controller_test.exs b/test/pleroma/web/mastodon_api/controllers/status_controller_test.exs index 436608e515..252cae6a93 100644 --- a/test/pleroma/web/mastodon_api/controllers/status_controller_test.exs +++ b/test/pleroma/web/mastodon_api/controllers/status_controller_test.exs @@ -328,7 +328,7 @@ test "fake statuses' preview card is not cached", %{conn: conn} do end test "posting a status with OGP link preview", %{conn: conn} do - Tesla.Mock.mock(fn env -> apply(HttpRequestMock, :request, [env]) end) + Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end) clear_config([:rich_media, :enabled], true) conn = @@ -1197,7 +1197,7 @@ test "on pin removes deletion job, on unpin reschedule deletion" do end test "returns rich-media card", %{conn: conn, user: user} do - Tesla.Mock.mock(fn env -> apply(HttpRequestMock, :request, [env]) end) + Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end) {:ok, activity} = CommonAPI.post(user, %{status: "https://example.com/ogp"}) @@ -1242,7 +1242,7 @@ test "returns rich-media card", %{conn: conn, user: user} do end test "replaces missing description with an empty string", %{conn: conn, user: user} do - Tesla.Mock.mock(fn env -> apply(HttpRequestMock, :request, [env]) end) + Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end) {:ok, activity} = CommonAPI.post(user, %{status: "https://example.com/ogp-missing-data"}) diff --git a/test/pleroma/web/pleroma_api/views/chat_message_reference_view_test.exs b/test/pleroma/web/pleroma_api/views/chat_message_reference_view_test.exs index ae82578708..93eef00a2c 100644 --- a/test/pleroma/web/pleroma_api/views/chat_message_reference_view_test.exs +++ b/test/pleroma/web/pleroma_api/views/chat_message_reference_view_test.exs @@ -48,7 +48,7 @@ test "it displays a chat message" do clear_config([:rich_media, :enabled], true) - Tesla.Mock.mock(fn + Tesla.Mock.mock_global(fn %{url: "https://example.com/ogp"} -> %Tesla.Env{status: 200, body: File.read!("test/fixtures/rich_media/ogp.html")} end)