Initial meilisearch implementation, doesn't delete posts yet

This commit is contained in:
Ekaterina Vaartis 2021-08-15 21:53:04 +03:00
parent 8042e0ebe1
commit e154ebbf79
9 changed files with 123 additions and 6 deletions

View file

@ -866,9 +866,14 @@
config :pleroma, ConcurrentLimiter, [ config :pleroma, ConcurrentLimiter, [
{Pleroma.Web.RichMedia.Helpers, [max_running: 5, max_waiting: 5]}, {Pleroma.Web.RichMedia.Helpers, [max_running: 5, max_waiting: 5]},
{Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy, [max_running: 5, max_waiting: 5]} {Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy, [max_running: 5, max_waiting: 5]},
{Pleroma.Search, [max_running: 20, max_waiting: 50]}
] ]
config :pleroma, Pleroma.Search, module: Pleroma.Activity.Search
config :pleroma, Pleroma.Search.Meilisearch, url: "http://127.0.0.1:7700/"
# Import environment specific config. This must remain at the bottom # Import environment specific config. This must remain at the bottom
# of this file so it overrides the configuration defined above. # of this file so it overrides the configuration defined above.
import_config "#{Mix.env()}.exs" import_config "#{Mix.env()}.exs"

View file

@ -133,6 +133,8 @@
ap_streamer: Pleroma.Web.ActivityPub.ActivityPubMock, ap_streamer: Pleroma.Web.ActivityPub.ActivityPubMock,
logger: Pleroma.LoggerMock logger: Pleroma.LoggerMock
config :pleroma, Pleroma.Search, module: Pleroma.Activity.Search
# Reduce recompilation time # Reduce recompilation time
# https://dashbit.co/blog/speeding-up-re-compilation-of-elixir-projects # https://dashbit.co/blog/speeding-up-re-compilation-of-elixir-projects
config :phoenix, :plug_init_mode, :runtime config :phoenix, :plug_init_mode, :runtime

View file

@ -0,0 +1,38 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Mix.Tasks.Pleroma.Search.Meilisearch do
import Mix.Pleroma
import Ecto.Query
def run(["index"]) do
start_pleroma()
endpoint = Pleroma.Config.get([Pleroma.Search.Meilisearch, :url])
Pleroma.Repo.chunk_stream(
from(Pleroma.Object,
limit: 200,
where: fragment("data->>'type' = 'Note'") and fragment("LENGTH(data->>'source') > 0")
),
100,
:batches
)
|> Stream.map(fn objects ->
Enum.map(objects, fn object ->
data = object.data
%{id: object.id, source: data["source"], ap: data["id"]}
end)
end)
|> Stream.each(fn activities ->
{:ok, _} =
Pleroma.HTTP.post(
"#{endpoint}/indexes/objects/documents",
Jason.encode!(activities)
)
end)
|> Stream.run()
end
end

View file

@ -367,6 +367,7 @@ def restrict_deactivated_users(query) do
end end
defdelegate search(user, query, options \\ []), to: Pleroma.Activity.Search defdelegate search(user, query, options \\ []), to: Pleroma.Activity.Search
def add_to_index(_activity), do: nil
def direct_conversation_id(activity, for_user) do def direct_conversation_id(activity, for_user) do
alias Pleroma.Conversation.Participation alias Pleroma.Conversation.Participation

View file

@ -136,7 +136,7 @@ defp query_with(q, :rum, search_query, :websearch) do
) )
end end
defp maybe_restrict_local(q, user) do def maybe_restrict_local(q, user) do
limit = Pleroma.Config.get([:instance, :limit_to_local_content], :unauthenticated) limit = Pleroma.Config.get([:instance, :limit_to_local_content], :unauthenticated)
case {limit, user} do case {limit, user} do
@ -149,7 +149,7 @@ defp maybe_restrict_local(q, user) do
defp restrict_local(q), do: where(q, local: true) defp restrict_local(q), do: where(q, local: true)
defp maybe_fetch(activities, user, search_query) do def maybe_fetch(activities, user, search_query) do
with true <- Regex.match?(~r/https?:/, search_query), with true <- Regex.match?(~r/https?:/, search_query),
{:ok, object} <- Fetcher.fetch_object_from_id(search_query), {:ok, object} <- Fetcher.fetch_object_from_id(search_query),
%Activity{} = activity <- Activity.get_create_by_object_ap_id(object.data["id"]), %Activity{} = activity <- Activity.get_create_by_object_ap_id(object.data["id"]),

View file

@ -321,7 +321,11 @@ defp http_children(_, _), do: []
def limiters_setup do def limiters_setup do
config = Config.get(ConcurrentLimiter, []) config = Config.get(ConcurrentLimiter, [])
[Pleroma.Web.RichMedia.Helpers, Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy] [
Pleroma.Web.RichMedia.Helpers,
Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy,
Pleroma.Search
]
|> Enum.each(fn module -> |> Enum.each(fn module ->
mod_config = Keyword.get(config, module, []) mod_config = Keyword.get(config, module, [])

View file

@ -0,0 +1,60 @@
defmodule Pleroma.Search.Meilisearch do
require Logger
alias Pleroma.Activity
import Pleroma.Activity.Search
import Ecto.Query
def search(user, query, options \\ []) do
limit = Enum.min([Keyword.get(options, :limit), 40])
offset = Keyword.get(options, :offset, 0)
author = Keyword.get(options, :author)
endpoint = Pleroma.Config.get([Pleroma.Search.Meilisearch, :url])
{:ok, result} =
Pleroma.HTTP.post(
"#{endpoint}/indexes/objects/search",
Jason.encode!(%{q: query, offset: offset, limit: limit})
)
hits = Jason.decode!(result.body)["hits"] |> Enum.map(& &1["ap"])
try do
hits
|> Activity.create_by_object_ap_id()
|> Activity.with_preloaded_object()
|> Activity.with_preloaded_object()
|> Activity.restrict_deactivated_users()
|> maybe_restrict_local(user)
|> maybe_restrict_author(author)
|> maybe_restrict_blocked(user)
|> maybe_fetch(user, query)
|> order_by([activity], desc: activity.id)
|> Pleroma.Repo.all()
rescue
_ -> maybe_fetch([], user, query)
end
end
def add_to_index(activity) do
object = activity.object
if activity.data["type"] == "Create" and not is_nil(object) and object.data["type"] == "Note" do
data = object.data
endpoint = Pleroma.Config.get([Pleroma.Search.Meilisearch, :url])
{:ok, result} =
Pleroma.HTTP.post(
"#{endpoint}/indexes/objects/documents",
Jason.encode!([%{id: object.id, source: data["source"], ap: data["id"]}])
)
if not Map.has_key?(Jason.decode!(result.body), "updateId") do
Logger.error("Failed to add activity #{activity.id} to index: #{result.body}")
end
end
end
end

View file

@ -140,6 +140,12 @@ def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when
Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end) Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
end) end)
search_module = Pleroma.Config.get([Pleroma.Search, :module])
ConcurrentLimiter.limit(Pleroma.Search, fn ->
Task.start(fn -> search_module.add_to_index(activity) end)
end)
{:ok, activity} {:ok, activity}
else else
%Activity{} = activity -> %Activity{} = activity ->

View file

@ -5,7 +5,6 @@
defmodule Pleroma.Web.MastodonAPI.SearchController do defmodule Pleroma.Web.MastodonAPI.SearchController do
use Pleroma.Web, :controller use Pleroma.Web, :controller
alias Pleroma.Activity
alias Pleroma.Repo alias Pleroma.Repo
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.ControllerHelper alias Pleroma.Web.ControllerHelper
@ -100,7 +99,9 @@ defp resource_search(_, "accounts", query, options) do
end end
defp resource_search(_, "statuses", query, options) do defp resource_search(_, "statuses", query, options) do
statuses = with_fallback(fn -> Activity.search(options[:for_user], query, options) end) search_module = Pleroma.Config.get([Pleroma.Search, :module], Pleroma.Activity)
statuses = with_fallback(fn -> search_module.search(options[:for_user], query, options) end)
StatusView.render("index.json", StatusView.render("index.json",
activities: statuses, activities: statuses,