From 3e9399ec0b498c0c9783ccb0fea9f682c8b9d0ca Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Thu, 24 Jan 2019 19:15:23 +0300 Subject: [PATCH] [#534] Optimized bulk publish ops to filter on reachability early. `Instance` refactoring. --- lib/pleroma/instances.ex | 9 ++++ lib/pleroma/instances/instance.ex | 57 +++++++++++++------- lib/pleroma/web/activity_pub/activity_pub.ex | 5 +- lib/pleroma/web/salmon/salmon.ex | 8 ++- lib/pleroma/web/websub/websub.ex | 18 ++++--- 5 files changed, 69 insertions(+), 28 deletions(-) diff --git a/lib/pleroma/instances.ex b/lib/pleroma/instances.ex index 25b739520a..6d445d6b3a 100644 --- a/lib/pleroma/instances.ex +++ b/lib/pleroma/instances.ex @@ -3,10 +3,19 @@ defmodule Pleroma.Instances do @adapter Pleroma.Instances.Instance + defdelegate filter_reachable(urls), to: @adapter defdelegate reachable?(url), to: @adapter defdelegate set_reachable(url), to: @adapter defdelegate set_unreachable(url, unreachable_since \\ nil), to: @adapter def reachability_time_threshold, do: NaiveDateTime.add(NaiveDateTime.utc_now(), -30 * 24 * 3600, :second) + + def host(url_or_host) when is_binary(url_or_host) do + if url_or_host =~ ~r/^http/i do + URI.parse(url_or_host).host + else + url_or_host + end + end end diff --git a/lib/pleroma/instances/instance.ex b/lib/pleroma/instances/instance.ex index fe52331a35..a17c8dab10 100644 --- a/lib/pleroma/instances/instance.ex +++ b/lib/pleroma/instances/instance.ex @@ -18,12 +18,35 @@ defmodule Pleroma.Instances.Instance do timestamps() end - def update_changeset(struct, params \\ %{}) do + defdelegate host(url), to: Instances + + def changeset(struct, params \\ %{}) do struct |> cast(params, [:host, :unreachable_since, :reachability_checked_at]) + |> validate_required([:host]) |> unique_constraint(:host) end + def filter_reachable([]), do: [] + + def filter_reachable(urls) when is_list(urls) do + hosts = + urls + |> Enum.map(&(&1 && host(&1))) + |> Enum.filter(&(to_string(&1) != "")) + + unreachable_hosts = + Repo.all( + from(i in Instance, + where: + i.host in ^hosts and i.unreachable_since <= ^Instances.reachability_time_threshold(), + select: i.host + ) + ) + + Enum.filter(urls, &(&1 && host(&1) not in unreachable_hosts)) + end + def reachable?(url) when is_binary(url) do !Repo.one( from(i in Instance, @@ -37,13 +60,13 @@ def reachable?(url) when is_binary(url) do def reachable?(_), do: true def set_reachable(url) when is_binary(url) do - Repo.update_all( - from(i in Instance, where: i.host == ^host(url)), - set: [ - unreachable_since: nil, - reachability_checked_at: DateTime.utc_now() - ] - ) + with host <- host(url), + %Instance{} = existing_record <- Repo.get_by(Instance, %{host: host}) do + {:ok, _instance} = + existing_record + |> changeset(%{unreachable_since: nil, reachability_checked_at: DateTime.utc_now()}) + |> Repo.update() + end end def set_reachable(_), do: {0, :noop} @@ -67,19 +90,17 @@ def set_unreachable(url, unreachable_since) when is_binary(url) do do: Map.delete(changes, :unreachable_since), else: changes - {:ok, _instance} = Repo.update(update_changeset(existing_record, update_changes)) + {:ok, _instance} = + existing_record + |> changeset(update_changes) + |> Repo.update() else - {:ok, _instance} = Repo.insert(update_changeset(%Instance{}, Map.put(changes, :host, host))) + {:ok, _instance} = + %Instance{} + |> changeset(Map.put(changes, :host, host)) + |> Repo.insert() end end def set_unreachable(_, _), do: {0, :noop} - - defp host(url_or_host) do - if url_or_host =~ ~r/^http/i do - URI.parse(url_or_host).host - else - url_or_host - end - end end diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index 44c295d658..4b34334a09 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -689,7 +689,7 @@ def should_federate?(inbox, public) do end def publish(actor, activity) do - followers = + remote_followers = if actor.follower_address in activity.recipients do {:ok, followers} = User.get_followers(actor) followers |> Enum.filter(&(!&1.local)) @@ -700,13 +700,14 @@ def publish(actor, activity) do public = is_public?(activity) remote_inboxes = - (Pleroma.Web.Salmon.remote_users(activity) ++ followers) + (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers) |> Enum.filter(fn user -> User.ap_enabled?(user) end) |> Enum.map(fn %{info: %{source_data: data}} -> (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"] end) |> Enum.uniq() |> Enum.filter(fn inbox -> should_federate?(inbox, public) end) + |> Instances.filter_reachable() {:ok, data} = Transmogrifier.prepare_outgoing(activity.data) json = Jason.encode!(data) diff --git a/lib/pleroma/web/salmon/salmon.ex b/lib/pleroma/web/salmon/salmon.ex index e4d2d95171..848131d527 100644 --- a/lib/pleroma/web/salmon/salmon.ex +++ b/lib/pleroma/web/salmon/salmon.ex @@ -221,7 +221,13 @@ def publish(%{info: %{keys: keys}} = user, %{data: %{"type" => type}} = activity {:ok, private, _} = keys_from_pem(keys) {:ok, feed} = encode(private, feed) - remote_users(activity) + remote_users = remote_users(activity) + + salmon_urls = Enum.map(remote_users, & &1.info.salmon) + reachable_salmon_urls = Instances.filter_reachable(salmon_urls) + + remote_users + |> Enum.filter(&(&1.info.salmon in reachable_salmon_urls)) |> Enum.each(fn remote_user -> Task.start(fn -> Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end) diff --git a/lib/pleroma/web/websub/websub.ex b/lib/pleroma/web/websub/websub.ex index ac8903913c..bb44425916 100644 --- a/lib/pleroma/web/websub/websub.ex +++ b/lib/pleroma/web/websub/websub.ex @@ -54,7 +54,12 @@ def verify(subscription, getter \\ &@httpoison.get/3) do ] def publish(topic, user, %{data: %{"type" => type}} = activity) when type in @supported_activities do - # TODO: Only send to still valid subscriptions. + response = + user + |> FeedRepresenter.to_simple_form([activity], [user]) + |> :xmerl.export_simple(:xmerl_xml) + |> to_string + query = from( sub in WebsubServerSubscription, @@ -64,13 +69,12 @@ def publish(topic, user, %{data: %{"type" => type}} = activity) subscriptions = Repo.all(query) - Enum.each(subscriptions, fn sub -> - response = - user - |> FeedRepresenter.to_simple_form([activity], [user]) - |> :xmerl.export_simple(:xmerl_xml) - |> to_string + callbacks = Enum.map(subscriptions, & &1.callback) + reachable_callbacks = Instances.filter_reachable(callbacks) + subscriptions + |> Enum.filter(&(&1.callback in reachable_callbacks)) + |> Enum.each(fn sub -> data = %{ xml: response, topic: topic,