Improve Remote Object Fetcher error handling, Oban
This commit is contained in:
parent
ee26d85578
commit
089fa4d146
5 changed files with 63 additions and 63 deletions
1
changelog.d/remote-object-fetcher.fix
Normal file
1
changelog.d/remote-object-fetcher.fix
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Remote Fetcher Worker recognizes more permanent failure errors
|
|
@ -73,50 +73,22 @@ def fetch_object_from_id(id, options \\ []) do
|
||||||
{:object, data, Object.normalize(activity, fetch: false)} do
|
{:object, data, Object.normalize(activity, fetch: false)} do
|
||||||
{:ok, object}
|
{:ok, object}
|
||||||
else
|
else
|
||||||
{:allowed_depth, false} = e ->
|
|
||||||
log_fetch_error(id, e)
|
|
||||||
{:error, :allowed_depth}
|
|
||||||
|
|
||||||
{:containment, reason} = e ->
|
|
||||||
log_fetch_error(id, e)
|
|
||||||
{:error, reason}
|
|
||||||
|
|
||||||
{:transmogrifier, {:error, {:reject, reason}}} = e ->
|
|
||||||
log_fetch_error(id, e)
|
|
||||||
{:reject, reason}
|
|
||||||
|
|
||||||
{:transmogrifier, {:reject, reason}} = e ->
|
|
||||||
log_fetch_error(id, e)
|
|
||||||
{:reject, reason}
|
|
||||||
|
|
||||||
{:transmogrifier, reason} = e ->
|
|
||||||
log_fetch_error(id, e)
|
|
||||||
{:error, reason}
|
|
||||||
|
|
||||||
{:object, data, nil} ->
|
|
||||||
reinject_object(%Object{}, data)
|
|
||||||
|
|
||||||
{:normalize, object = %Object{}} ->
|
{:normalize, object = %Object{}} ->
|
||||||
{:ok, object}
|
{:ok, object}
|
||||||
|
|
||||||
{:fetch_object, %Object{} = object} ->
|
{:fetch_object, %Object{} = object} ->
|
||||||
{:ok, object}
|
{:ok, object}
|
||||||
|
|
||||||
{:fetch, {:error, reason}} = e ->
|
{:object, data, nil} ->
|
||||||
log_fetch_error(id, e)
|
reinject_object(%Object{}, data)
|
||||||
{:error, reason}
|
|
||||||
|
|
||||||
e ->
|
e ->
|
||||||
log_fetch_error(id, e)
|
Logger.metadata(object: id)
|
||||||
{:error, e}
|
Logger.error("Object rejected while fetching #{id} #{inspect(e)}")
|
||||||
|
e
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
defp log_fetch_error(id, error) do
|
|
||||||
Logger.metadata(object: id)
|
|
||||||
Logger.error("Object rejected while fetching #{id} #{inspect(error)}")
|
|
||||||
end
|
|
||||||
|
|
||||||
defp prepare_activity_params(data) do
|
defp prepare_activity_params(data) do
|
||||||
%{
|
%{
|
||||||
"type" => "Create",
|
"type" => "Create",
|
||||||
|
|
|
@ -13,17 +13,26 @@ def perform(%Job{args: %{"op" => "fetch_remote", "id" => id} = args}) do
|
||||||
{:ok, _object} ->
|
{:ok, _object} ->
|
||||||
:ok
|
:ok
|
||||||
|
|
||||||
{:reject, reason} ->
|
{:allowed_depth, false} ->
|
||||||
|
{:cancel, :allowed_depth}
|
||||||
|
|
||||||
|
{:containment, reason} ->
|
||||||
{:cancel, reason}
|
{:cancel, reason}
|
||||||
|
|
||||||
{:error, :forbidden} ->
|
{:transmogrifier, reason} ->
|
||||||
{:cancel, :forbidden}
|
{:cancel, reason}
|
||||||
|
|
||||||
{:error, :not_found} ->
|
{:fetch, {:error, :forbidden = reason}} ->
|
||||||
{:cancel, :not_found}
|
{:cancel, reason}
|
||||||
|
|
||||||
{:error, :allowed_depth} ->
|
{:fetch, {:error, :not_found = reason}} ->
|
||||||
{:cancel, :allowed_depth}
|
{:cancel, reason}
|
||||||
|
|
||||||
|
{:fetch, {:error, {:content_type, _}} = reason} ->
|
||||||
|
{:cancel, reason}
|
||||||
|
|
||||||
|
{:fetch, {:error, reason}} ->
|
||||||
|
{:error, reason}
|
||||||
|
|
||||||
{:error, _} = e ->
|
{:error, _} = e ->
|
||||||
e
|
e
|
||||||
|
|
|
@ -100,7 +100,7 @@ test "it works when fetching the OP actor errors out" do
|
||||||
test "it returns thread depth exceeded error if thread depth is exceeded" do
|
test "it returns thread depth exceeded error if thread depth is exceeded" do
|
||||||
clear_config([:instance, :federation_incoming_replies_max_depth], 0)
|
clear_config([:instance, :federation_incoming_replies_max_depth], 0)
|
||||||
|
|
||||||
assert {:error, :allowed_depth} = Fetcher.fetch_object_from_id(@ap_id, depth: 1)
|
assert {:allowed_depth, false} = Fetcher.fetch_object_from_id(@ap_id, depth: 1)
|
||||||
end
|
end
|
||||||
|
|
||||||
test "it fetches object if max thread depth is restricted to 0 and depth is not specified" do
|
test "it fetches object if max thread depth is restricted to 0 and depth is not specified" do
|
||||||
|
@ -118,15 +118,18 @@ test "it fetches object if requested depth does not exceed max thread depth" do
|
||||||
|
|
||||||
describe "actor origin containment" do
|
describe "actor origin containment" do
|
||||||
test "it rejects objects with a bogus origin" do
|
test "it rejects objects with a bogus origin" do
|
||||||
{:error, _} = Fetcher.fetch_object_from_id("https://info.pleroma.site/activity.json")
|
{:containment, :error} =
|
||||||
|
Fetcher.fetch_object_from_id("https://info.pleroma.site/activity.json")
|
||||||
end
|
end
|
||||||
|
|
||||||
test "it rejects objects when attributedTo is wrong (variant 1)" do
|
test "it rejects objects when attributedTo is wrong (variant 1)" do
|
||||||
{:error, _} = Fetcher.fetch_object_from_id("https://info.pleroma.site/activity2.json")
|
{:containment, :error} =
|
||||||
|
Fetcher.fetch_object_from_id("https://info.pleroma.site/activity2.json")
|
||||||
end
|
end
|
||||||
|
|
||||||
test "it rejects objects when attributedTo is wrong (variant 2)" do
|
test "it rejects objects when attributedTo is wrong (variant 2)" do
|
||||||
{:error, _} = Fetcher.fetch_object_from_id("https://info.pleroma.site/activity3.json")
|
{:containment, :error} =
|
||||||
|
Fetcher.fetch_object_from_id("https://info.pleroma.site/activity3.json")
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -150,14 +153,14 @@ test "Return MRF reason when fetched status is rejected by one" do
|
||||||
clear_config([:mrf_keyword, :reject], ["yeah"])
|
clear_config([:mrf_keyword, :reject], ["yeah"])
|
||||||
clear_config([:mrf, :policies], [Pleroma.Web.ActivityPub.MRF.KeywordPolicy])
|
clear_config([:mrf, :policies], [Pleroma.Web.ActivityPub.MRF.KeywordPolicy])
|
||||||
|
|
||||||
assert {:reject, "[KeywordPolicy] Matches with rejected keyword"} ==
|
assert {:transmogrifier, {:reject, "[KeywordPolicy] Matches with rejected keyword"}} ==
|
||||||
Fetcher.fetch_object_from_id(
|
Fetcher.fetch_object_from_id(
|
||||||
"http://mastodon.example.org/@admin/99541947525187367"
|
"http://mastodon.example.org/@admin/99541947525187367"
|
||||||
)
|
)
|
||||||
end
|
end
|
||||||
|
|
||||||
test "it does not fetch a spoofed object uploaded on an instance as an attachment" do
|
test "it does not fetch a spoofed object uploaded on an instance as an attachment" do
|
||||||
assert {:error, _} =
|
assert {:fetch, {:error, {:content_type, "application/json"}}} =
|
||||||
Fetcher.fetch_object_from_id(
|
Fetcher.fetch_object_from_id(
|
||||||
"https://patch.cx/media/03ca3c8b4ac3ddd08bf0f84be7885f2f88de0f709112131a22d83650819e36c2.json"
|
"https://patch.cx/media/03ca3c8b4ac3ddd08bf0f84be7885f2f88de0f709112131a22d83650819e36c2.json"
|
||||||
)
|
)
|
||||||
|
|
|
@ -12,6 +12,7 @@ defmodule Pleroma.Workers.RemoteFetcherWorkerTest do
|
||||||
@deleted_object_two "https://deleted-410.example.com/"
|
@deleted_object_two "https://deleted-410.example.com/"
|
||||||
@unauthorized_object "https://unauthorized.example.com/"
|
@unauthorized_object "https://unauthorized.example.com/"
|
||||||
@depth_object "https://depth.example.com/"
|
@depth_object "https://depth.example.com/"
|
||||||
|
@content_type_object "https://bad_content_type.example.com/"
|
||||||
|
|
||||||
describe "RemoteFetcherWorker" do
|
describe "RemoteFetcherWorker" do
|
||||||
setup do
|
setup do
|
||||||
|
@ -35,34 +36,48 @@ defmodule Pleroma.Workers.RemoteFetcherWorkerTest do
|
||||||
%Tesla.Env{
|
%Tesla.Env{
|
||||||
status: 200
|
status: 200
|
||||||
}
|
}
|
||||||
|
|
||||||
|
%{method: :get, url: @content_type_object} ->
|
||||||
|
%Tesla.Env{
|
||||||
|
status: 200,
|
||||||
|
headers: [{"content-type", "application/json"}],
|
||||||
|
body: File.read!("test/fixtures/spoofed-object.json")
|
||||||
|
}
|
||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
test "does not requeue a deleted object" do
|
test "does not retry jobs for a deleted object" do
|
||||||
assert {:cancel, _} =
|
[
|
||||||
RemoteFetcherWorker.perform(%Oban.Job{
|
%{"op" => "fetch_remote", "id" => @deleted_object_one},
|
||||||
args: %{"op" => "fetch_remote", "id" => @deleted_object_one}
|
%{"op" => "fetch_remote", "id" => @deleted_object_two}
|
||||||
})
|
]
|
||||||
|
|> Enum.each(fn job -> assert {:cancel, _} = perform_job(RemoteFetcherWorker, job) end)
|
||||||
|
end
|
||||||
|
|
||||||
|
test "does not retry jobs for an unauthorized object" do
|
||||||
assert {:cancel, _} =
|
assert {:cancel, _} =
|
||||||
RemoteFetcherWorker.perform(%Oban.Job{
|
perform_job(RemoteFetcherWorker, %{
|
||||||
args: %{"op" => "fetch_remote", "id" => @deleted_object_two}
|
"op" => "fetch_remote",
|
||||||
|
"id" => @unauthorized_object
|
||||||
})
|
})
|
||||||
end
|
end
|
||||||
|
|
||||||
test "does not requeue an unauthorized object" do
|
test "does not retry jobs for an an object that exceeded depth" do
|
||||||
assert {:cancel, _} =
|
|
||||||
RemoteFetcherWorker.perform(%Oban.Job{
|
|
||||||
args: %{"op" => "fetch_remote", "id" => @unauthorized_object}
|
|
||||||
})
|
|
||||||
end
|
|
||||||
|
|
||||||
test "does not requeue an object that exceeded depth" do
|
|
||||||
clear_config([:instance, :federation_incoming_replies_max_depth], 0)
|
clear_config([:instance, :federation_incoming_replies_max_depth], 0)
|
||||||
|
|
||||||
assert {:cancel, _} =
|
assert {:cancel, _} =
|
||||||
RemoteFetcherWorker.perform(%Oban.Job{
|
perform_job(RemoteFetcherWorker, %{
|
||||||
args: %{"op" => "fetch_remote", "id" => @depth_object, "depth" => 1}
|
"op" => "fetch_remote",
|
||||||
|
"id" => @depth_object,
|
||||||
|
"depth" => 1
|
||||||
|
})
|
||||||
|
end
|
||||||
|
|
||||||
|
test "does not retry jobs for when object returns wrong content type" do
|
||||||
|
assert {:cancel, _} =
|
||||||
|
perform_job(RemoteFetcherWorker, %{
|
||||||
|
"op" => "fetch_remote",
|
||||||
|
"id" => @content_type_object
|
||||||
})
|
})
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in a new issue