moved ActivityExpiration to Oban Periodic jobs
This commit is contained in:
parent
6f202a401b
commit
c5766a8100
7 changed files with 66 additions and 103 deletions
|
@ -509,7 +509,8 @@
|
||||||
crontab: [
|
crontab: [
|
||||||
{"0 0 * * *", Pleroma.Workers.Cron.ClearOauthTokenWorker},
|
{"0 0 * * *", Pleroma.Workers.Cron.ClearOauthTokenWorker},
|
||||||
{"0 * * * *", Pleroma.Workers.Cron.StatsWorker},
|
{"0 * * * *", Pleroma.Workers.Cron.StatsWorker},
|
||||||
{"* * * * *", Pleroma.Workers.Cron.ScheduledActivityWorker}
|
{"* * * * *", Pleroma.Workers.Cron.ScheduledActivityWorker},
|
||||||
|
{"* * * * *", Pleroma.Workers.Cron.PurgeExpiredActivitiesWorker}
|
||||||
]
|
]
|
||||||
|
|
||||||
config :pleroma, :workers,
|
config :pleroma, :workers,
|
||||||
|
|
|
@ -35,7 +35,6 @@ def start(_type, _args) do
|
||||||
Pleroma.Config.TransferTask,
|
Pleroma.Config.TransferTask,
|
||||||
Pleroma.Emoji,
|
Pleroma.Emoji,
|
||||||
Pleroma.Captcha,
|
Pleroma.Captcha,
|
||||||
Pleroma.Daemons.ActivityExpirationDaemon,
|
|
||||||
Pleroma.Plugs.RateLimiter.Supervisor
|
Pleroma.Plugs.RateLimiter.Supervisor
|
||||||
] ++
|
] ++
|
||||||
cachex_children() ++
|
cachex_children() ++
|
||||||
|
|
|
@ -1,66 +0,0 @@
|
||||||
# Pleroma: A lightweight social networking server
|
|
||||||
# Copyright © 2019 Pleroma Authors <https://pleroma.social/>
|
|
||||||
# SPDX-License-Identifier: AGPL-3.0-only
|
|
||||||
|
|
||||||
defmodule Pleroma.Daemons.ActivityExpirationDaemon do
|
|
||||||
alias Pleroma.Activity
|
|
||||||
alias Pleroma.ActivityExpiration
|
|
||||||
alias Pleroma.Config
|
|
||||||
alias Pleroma.Repo
|
|
||||||
alias Pleroma.User
|
|
||||||
alias Pleroma.Web.CommonAPI
|
|
||||||
|
|
||||||
require Logger
|
|
||||||
use GenServer
|
|
||||||
import Ecto.Query
|
|
||||||
|
|
||||||
@schedule_interval :timer.minutes(1)
|
|
||||||
|
|
||||||
def start_link(_) do
|
|
||||||
GenServer.start_link(__MODULE__, nil)
|
|
||||||
end
|
|
||||||
|
|
||||||
@impl true
|
|
||||||
def init(_) do
|
|
||||||
if Config.get([ActivityExpiration, :enabled]) do
|
|
||||||
schedule_next()
|
|
||||||
{:ok, nil}
|
|
||||||
else
|
|
||||||
:ignore
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def perform(:execute, expiration_id) do
|
|
||||||
try do
|
|
||||||
expiration =
|
|
||||||
ActivityExpiration
|
|
||||||
|> where([e], e.id == ^expiration_id)
|
|
||||||
|> Repo.one!()
|
|
||||||
|
|
||||||
activity = Activity.get_by_id_with_object(expiration.activity_id)
|
|
||||||
user = User.get_by_ap_id(activity.object.data["actor"])
|
|
||||||
CommonAPI.delete(activity.id, user)
|
|
||||||
rescue
|
|
||||||
error ->
|
|
||||||
Logger.error("#{__MODULE__} Couldn't delete expired activity: #{inspect(error)}")
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
@impl true
|
|
||||||
def handle_info(:perform, state) do
|
|
||||||
ActivityExpiration.due_expirations(@schedule_interval)
|
|
||||||
|> Enum.each(fn expiration ->
|
|
||||||
Pleroma.Workers.ActivityExpirationWorker.enqueue(
|
|
||||||
"activity_expiration",
|
|
||||||
%{"activity_expiration_id" => expiration.id}
|
|
||||||
)
|
|
||||||
end)
|
|
||||||
|
|
||||||
schedule_next()
|
|
||||||
{:noreply, state}
|
|
||||||
end
|
|
||||||
|
|
||||||
defp schedule_next do
|
|
||||||
Process.send_after(self(), :perform, @schedule_interval)
|
|
||||||
end
|
|
||||||
end
|
|
|
@ -1,18 +0,0 @@
|
||||||
# Pleroma: A lightweight social networking server
|
|
||||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
|
||||||
# SPDX-License-Identifier: AGPL-3.0-only
|
|
||||||
|
|
||||||
defmodule Pleroma.Workers.ActivityExpirationWorker do
|
|
||||||
use Pleroma.Workers.WorkerHelper, queue: "activity_expiration"
|
|
||||||
|
|
||||||
@impl Oban.Worker
|
|
||||||
def perform(
|
|
||||||
%{
|
|
||||||
"op" => "activity_expiration",
|
|
||||||
"activity_expiration_id" => activity_expiration_id
|
|
||||||
},
|
|
||||||
_job
|
|
||||||
) do
|
|
||||||
Pleroma.Daemons.ActivityExpirationDaemon.perform(:execute, activity_expiration_id)
|
|
||||||
end
|
|
||||||
end
|
|
39
lib/pleroma/workers/cron/purge_expired_activities_worker.ex
Normal file
39
lib/pleroma/workers/cron/purge_expired_activities_worker.ex
Normal file
|
@ -0,0 +1,39 @@
|
||||||
|
# Pleroma: A lightweight social networking server
|
||||||
|
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
defmodule Pleroma.Workers.Cron.PurgeExpiredActivitiesWorker do
|
||||||
|
@moduledoc """
|
||||||
|
The worker to purge expired activities.
|
||||||
|
"""
|
||||||
|
|
||||||
|
use Oban.Worker, queue: "activity_expiration"
|
||||||
|
|
||||||
|
alias Pleroma.Activity
|
||||||
|
alias Pleroma.ActivityExpiration
|
||||||
|
alias Pleroma.Config
|
||||||
|
alias Pleroma.User
|
||||||
|
alias Pleroma.Web.CommonAPI
|
||||||
|
|
||||||
|
require Logger
|
||||||
|
|
||||||
|
@interval :timer.minutes(1)
|
||||||
|
|
||||||
|
@impl Oban.Worker
|
||||||
|
def perform(_opts, _job) do
|
||||||
|
if Config.get([ActivityExpiration, :enabled]) do
|
||||||
|
Enum.each(ActivityExpiration.due_expirations(@interval), &delete_activity/1)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def delete_activity(expiration) do
|
||||||
|
try do
|
||||||
|
activity = Activity.get_by_id_with_object(expiration.activity_id)
|
||||||
|
user = User.get_by_ap_id(activity.object.data["actor"])
|
||||||
|
CommonAPI.delete(activity.id, user)
|
||||||
|
rescue
|
||||||
|
error ->
|
||||||
|
Logger.error("#{__MODULE__} Couldn't delete expired activity: #{inspect(error)}")
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
|
@ -7,6 +7,8 @@ defmodule Pleroma.ActivityExpirationTest do
|
||||||
alias Pleroma.ActivityExpiration
|
alias Pleroma.ActivityExpiration
|
||||||
import Pleroma.Factory
|
import Pleroma.Factory
|
||||||
|
|
||||||
|
clear_config([ActivityExpiration, :enabled])
|
||||||
|
|
||||||
test "finds activities due to be deleted only" do
|
test "finds activities due to be deleted only" do
|
||||||
activity = insert(:note_activity)
|
activity = insert(:note_activity)
|
||||||
expiration_due = insert(:expiration_in_the_past, %{activity_id: activity.id})
|
expiration_due = insert(:expiration_in_the_past, %{activity_id: activity.id})
|
||||||
|
@ -24,4 +26,27 @@ test "denies expirations that don't live long enough" do
|
||||||
now = NaiveDateTime.utc_now()
|
now = NaiveDateTime.utc_now()
|
||||||
assert {:error, _} = ActivityExpiration.create(activity, now)
|
assert {:error, _} = ActivityExpiration.create(activity, now)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
test "deletes an expiration activity" do
|
||||||
|
Pleroma.Config.put([ActivityExpiration, :enabled], true)
|
||||||
|
activity = insert(:note_activity)
|
||||||
|
|
||||||
|
naive_datetime =
|
||||||
|
NaiveDateTime.add(
|
||||||
|
NaiveDateTime.utc_now(),
|
||||||
|
-:timer.minutes(2),
|
||||||
|
:millisecond
|
||||||
|
)
|
||||||
|
|
||||||
|
expiration =
|
||||||
|
insert(
|
||||||
|
:expiration_in_the_past,
|
||||||
|
%{activity_id: activity.id, scheduled_at: naive_datetime}
|
||||||
|
)
|
||||||
|
|
||||||
|
Pleroma.Workers.Cron.PurgeExpiredActivitiesWorker.perform(:ops, :pid)
|
||||||
|
|
||||||
|
refute Pleroma.Repo.get(Pleroma.Activity, activity.id)
|
||||||
|
refute Pleroma.Repo.get(Pleroma.ActivityExpiration, expiration.id)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -1,17 +0,0 @@
|
||||||
# Pleroma: A lightweight social networking server
|
|
||||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
|
||||||
# SPDX-License-Identifier: AGPL-3.0-only
|
|
||||||
|
|
||||||
defmodule Pleroma.ActivityExpirationWorkerTest do
|
|
||||||
use Pleroma.DataCase
|
|
||||||
alias Pleroma.Activity
|
|
||||||
import Pleroma.Factory
|
|
||||||
|
|
||||||
test "deletes an activity" do
|
|
||||||
activity = insert(:note_activity)
|
|
||||||
expiration = insert(:expiration_in_the_past, %{activity_id: activity.id})
|
|
||||||
Pleroma.Daemons.ActivityExpirationDaemon.perform(:execute, expiration.id)
|
|
||||||
|
|
||||||
refute Repo.get(Activity, activity.id)
|
|
||||||
end
|
|
||||||
end
|
|
Loading…
Reference in a new issue