Merge branch 'qdrant-search-2' into 'develop'

Search: Basic Qdrant/Ollama search

See merge request pleroma/pleroma!4109
This commit is contained in:
lain 2024-05-27 18:41:20 +00:00
commit 3316a7ab70
14 changed files with 572 additions and 0 deletions

View file

@ -0,0 +1 @@
Add Qdrant/OpenAI embedding search

View file

@ -933,6 +933,19 @@
config :pleroma, Pleroma.Uploaders.Uploader, timeout: 30_000
config :pleroma, Pleroma.Search.QdrantSearch,
qdrant_url: "http://127.0.0.1:6333/",
qdrant_api_key: "",
openai_url: "http://127.0.0.1:11345",
# The healthcheck url has to be set to nil when used with the real openai
# API, as it doesn't have a healthcheck endpoint.
openai_healthcheck_url: "http://127.0.0.1:11345/health",
openai_model: "snowflake/snowflake-arctic-embed-xs",
openai_api_key: "",
qdrant_index_configuration: %{
vectors: %{size: 384, distance: "Cosine"}
}
# Import environment specific config. This must remain at the bottom
# of this file so it overrides the configuration defined above.
import_config "#{Mix.env()}.exs"

View file

@ -10,6 +10,30 @@ To use built-in search that has no external dependencies, set the search module
While it has no external dependencies, it has problems with performance and relevancy.
## QdrantSearch
This uses the vector search engine [Qdrant](https://qdrant.tech) to search the posts in a vector space. This needs a way to generate embeddings and uses the [OpenAI API](https://platform.openai.com/docs/guides/embeddings/what-are-embeddings). This is implemented by several project besides OpenAI itself, including the python-based fastembed-server found in `supplemental/search/fastembed-api`.
The default settings will support a setup where both the fastembed server and Qdrant run on the same system as pleroma. To use it, set the search provider and run the fastembed server, see the README in `supplemental/search/fastembed-api`:
> config :pleroma, Pleroma.Search, module: Pleroma.Search.QdrantSearch
Then, start the Qdrant server, see [here](https://qdrant.tech/documentation/quick-start/) for instructions.
You will also need to create the Qdrant index once by running `mix pleroma.search.indexer create_index`. Running `mix pleroma.search.indexer index` will retroactively index the last 100_000 activities.
### Indexing and model options
To see the available configuration options, check out the QdrantSearch section in `config/config.exs`.
The default indexing option work for the default model (`snowflake-arctic-embed-xs`). To optimize for a low memory footprint, adjust the index configuration as described in the [Qdrant docs](https://qdrant.tech/documentation/guides/optimize/). See also [this blog post](https://qdrant.tech/articles/memory-consumption/) that goes into detail.
Different embedding models will need different vector size settings. You can see a list of the models supported by the fastembed server [here](https://qdrant.github.io/fastembed/examples/Supported_Models), including their vector dimensions. These vector dimensions need to be set in the `qdrant_index_configuration`.
E.g, If you want to use `sentence-transformers/all-MiniLM-L6-v2` as a model, you will not need to adjust things, because it and `snowflake-arctic-embed-xs` are both 384 dimensional models. If you want to use `snowflake/snowflake-arctic-embed-l`, you will need to adjust the `size` parameter in the `qdrant_index_configuration` to 1024, as it has a dimension of 1024.
When using a different model, you will need do drop the index and recreate it (`mix pleroma.search.indexer drop_index` and `mix pleroma.search.indexer create_index`), as the different embeddings are not compatible with each other.
## Meilisearch
Note that it's quite a bit more memory hungry than PostgreSQL (around 4-5G for ~1.2 million

View file

@ -0,0 +1,80 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Mix.Tasks.Pleroma.Search.Indexer do
import Mix.Pleroma
import Ecto.Query
alias Pleroma.Workers.SearchIndexingWorker
def run(["create_index"]) do
start_pleroma()
with :ok <- Pleroma.Config.get([Pleroma.Search, :module]).create_index() do
IO.puts("Index created")
else
e -> IO.puts("Could not create index: #{inspect(e)}")
end
end
def run(["drop_index"]) do
start_pleroma()
with :ok <- Pleroma.Config.get([Pleroma.Search, :module]).drop_index() do
IO.puts("Index dropped")
else
e -> IO.puts("Could not drop index: #{inspect(e)}")
end
end
def run(["index" | options]) do
{options, [], []} =
OptionParser.parse(
options,
strict: [
limit: :integer
]
)
start_pleroma()
limit = Keyword.get(options, :limit, 100_000)
per_step = 1000
chunks = max(div(limit, per_step), 1)
1..chunks
|> Enum.each(fn step ->
q =
from(a in Pleroma.Activity,
limit: ^per_step,
offset: ^per_step * (^step - 1),
select: [:id],
order_by: [desc: :id]
)
{:ok, ids} =
Pleroma.Repo.transaction(fn ->
Pleroma.Repo.stream(q, timeout: :infinity)
|> Enum.map(fn a ->
a.id
end)
end)
IO.puts("Got #{length(ids)} activities, adding to indexer")
ids
|> Enum.chunk_every(100)
|> Enum.each(fn chunk ->
IO.puts("Adding #{length(chunk)} activities to indexing queue")
chunk
|> Enum.map(fn id ->
SearchIndexingWorker.new(%{"op" => "add_to_index", "activity" => id})
end)
|> Oban.insert_all()
end)
end)
end
end

View file

@ -48,6 +48,12 @@ def add_to_index(_activity), do: :ok
@impl true
def remove_from_index(_object), do: :ok
@impl true
def create_index, do: :ok
@impl true
def drop_index, do: :ok
@impl true
def healthcheck_endpoints, do: nil

View file

@ -10,6 +10,12 @@ defmodule Pleroma.Search.Meilisearch do
@behaviour Pleroma.Search.SearchBackend
@impl true
def create_index, do: :ok
@impl true
def drop_index, do: :ok
defp meili_headers do
private_key = Config.get([Pleroma.Search.Meilisearch, :private_key])

View file

@ -0,0 +1,182 @@
defmodule Pleroma.Search.QdrantSearch do
@behaviour Pleroma.Search.SearchBackend
import Ecto.Query
alias Pleroma.Activity
alias Pleroma.Config.Getting, as: Config
alias __MODULE__.OpenAIClient
alias __MODULE__.QdrantClient
import Pleroma.Search.Meilisearch, only: [object_to_search_data: 1]
import Pleroma.Search.DatabaseSearch, only: [maybe_fetch: 3]
@impl true
def create_index do
payload = Config.get([Pleroma.Search.QdrantSearch, :qdrant_index_configuration])
with {:ok, %{status: 200}} <- QdrantClient.put("/collections/posts", payload) do
:ok
else
e -> {:error, e}
end
end
@impl true
def drop_index do
with {:ok, %{status: 200}} <- QdrantClient.delete("/collections/posts") do
:ok
else
e -> {:error, e}
end
end
def get_embedding(text) do
with {:ok, %{body: %{"data" => [%{"embedding" => embedding}]}}} <-
OpenAIClient.post("/v1/embeddings", %{
input: text,
model: Config.get([Pleroma.Search.QdrantSearch, :openai_model])
}) do
{:ok, embedding}
else
_ ->
{:error, "Failed to get embedding"}
end
end
defp actor_from_activity(%{data: %{"actor" => actor}}) do
actor
end
defp actor_from_activity(_), do: nil
defp build_index_payload(activity, embedding) do
actor = actor_from_activity(activity)
published_at = activity.data["published"]
%{
points: [
%{
id: activity.id |> FlakeId.from_string() |> Ecto.UUID.cast!(),
vector: embedding,
payload: %{actor: actor, published_at: published_at}
}
]
}
end
defp build_search_payload(embedding, options) do
base = %{
vector: embedding,
limit: options[:limit] || 20,
offset: options[:offset] || 0
}
if author = options[:author] do
Map.put(base, :filter, %{
must: [%{key: "actor", match: %{value: author.ap_id}}]
})
else
base
end
end
@impl true
def add_to_index(activity) do
# This will only index public or unlisted notes
maybe_search_data = object_to_search_data(activity.object)
if activity.data["type"] == "Create" and maybe_search_data do
with {:ok, embedding} <- get_embedding(maybe_search_data.content),
{:ok, %{status: 200}} <-
QdrantClient.put(
"/collections/posts/points",
build_index_payload(activity, embedding)
) do
:ok
else
e -> {:error, e}
end
else
:ok
end
end
@impl true
def remove_from_index(object) do
activity = Activity.get_by_object_ap_id_with_object(object.data["id"])
id = activity.id |> FlakeId.from_string() |> Ecto.UUID.cast!()
with {:ok, %{status: 200}} <-
QdrantClient.post("/collections/posts/points/delete", %{"points" => [id]}) do
:ok
else
e -> {:error, e}
end
end
@impl true
def search(user, original_query, options) do
query = "Represent this sentence for searching relevant passages: #{original_query}"
with {:ok, embedding} <- get_embedding(query),
{:ok, %{body: %{"result" => result}}} <-
QdrantClient.post(
"/collections/posts/points/search",
build_search_payload(embedding, options)
) do
ids =
Enum.map(result, fn %{"id" => id} ->
Ecto.UUID.dump!(id)
end)
from(a in Activity, where: a.id in ^ids)
|> Activity.with_preloaded_object()
|> Activity.restrict_deactivated_users()
|> Ecto.Query.order_by([a], fragment("array_position(?, ?)", ^ids, a.id))
|> Pleroma.Repo.all()
|> maybe_fetch(user, original_query)
else
_ ->
[]
end
end
@impl true
def healthcheck_endpoints do
qdrant_health =
Config.get([Pleroma.Search.QdrantSearch, :qdrant_url])
|> URI.parse()
|> Map.put(:path, "/healthz")
|> URI.to_string()
openai_health = Config.get([Pleroma.Search.QdrantSearch, :openai_healthcheck_url])
[qdrant_health, openai_health] |> Enum.filter(& &1)
end
end
defmodule Pleroma.Search.QdrantSearch.OpenAIClient do
use Tesla
alias Pleroma.Config.Getting, as: Config
plug(Tesla.Middleware.BaseUrl, Config.get([Pleroma.Search.QdrantSearch, :openai_url]))
plug(Tesla.Middleware.JSON)
plug(Tesla.Middleware.Headers, [
{"Authorization",
"Bearer #{Pleroma.Config.get([Pleroma.Search.QdrantSearch, :openai_api_key])}"}
])
end
defmodule Pleroma.Search.QdrantSearch.QdrantClient do
use Tesla
alias Pleroma.Config.Getting, as: Config
plug(Tesla.Middleware.BaseUrl, Config.get([Pleroma.Search.QdrantSearch, :qdrant_url]))
plug(Tesla.Middleware.JSON)
plug(Tesla.Middleware.Headers, [
{"api-key", Pleroma.Config.get([Pleroma.Search.QdrantSearch, :qdrant_api_key])}
])
end

View file

@ -22,6 +22,16 @@ defmodule Pleroma.Search.SearchBackend do
"""
@callback remove_from_index(object :: Pleroma.Object.t()) :: :ok | {:error, any()}
@doc """
Create the index
"""
@callback create_index() :: :ok | {:error, any()}
@doc """
Drop the index
"""
@callback drop_index() :: :ok | {:error, any()}
@doc """
Healthcheck endpoints of search backend infrastructure to monitor for controlling
processing of jobs in the Oban queue.

View file

@ -0,0 +1,9 @@
FROM python:3.9
WORKDIR /code
COPY fastembed-server.py /workdir/fastembed-server.py
COPY requirements.txt /workdir/requirements.txt
RUN pip install -r /workdir/requirements.txt
CMD ["python", "/workdir/fastembed-server.py"]

View file

@ -0,0 +1,6 @@
# About
This is a minimal implementation of the [OpenAI Embeddings API](https://platform.openai.com/docs/guides/embeddings/what-are-embeddings) meant to be used with the QdrantSearch backend.
# Usage
The easiest way to run it is to just use docker compose with `docker compose up`. This starts the server on the default configured port. Different models can be used, for a full list of supported models, check the [fastembed documentation](https://qdrant.github.io/fastembed/examples/Supported_Models/). The first time a model is requested it will be downloaded, which can take a few seconds.

View file

@ -0,0 +1,5 @@
services:
web:
build: .
ports:
- "11345:11345"

View file

@ -0,0 +1,27 @@
from fastembed import TextEmbedding
from fastapi import FastAPI
from pydantic import BaseModel
models = {}
app = FastAPI()
class EmbeddingRequest(BaseModel):
model: str
input: str
@app.post("/v1/embeddings")
def embeddings(request: EmbeddingRequest):
model = models.get(request.model) or TextEmbedding(request.model)
models[request.model] = model
embeddings = next(model.embed(request.input)).tolist()
return {"data": [{"embedding": embeddings}]}
@app.get("/health")
def health():
return {"status": "ok"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=11345)

View file

@ -0,0 +1,4 @@
fastapi==0.111.0
fastembed==0.2.7
pydantic==1.10.15
uvicorn==0.29.0

View file

@ -0,0 +1,199 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Search.QdrantSearchTest do
use Pleroma.DataCase, async: true
use Oban.Testing, repo: Pleroma.Repo
import Pleroma.Factory
import Mox
alias Pleroma.Search.QdrantSearch
alias Pleroma.UnstubbedConfigMock, as: Config
alias Pleroma.Web.CommonAPI
alias Pleroma.Workers.SearchIndexingWorker
describe "Qdrant search" do
test "returns the correct healthcheck endpoints" do
# No openai healthcheck URL
Config
|> expect(:get, 2, fn
[Pleroma.Search.QdrantSearch, key], nil ->
%{qdrant_url: "https://qdrant.url"}[key]
end)
[health_endpoint] = QdrantSearch.healthcheck_endpoints()
assert "https://qdrant.url/healthz" == health_endpoint
# Set openai healthcheck URL
Config
|> expect(:get, 2, fn
[Pleroma.Search.QdrantSearch, key], nil ->
%{qdrant_url: "https://qdrant.url", openai_healthcheck_url: "https://openai.url/health"}[
key
]
end)
[_, health_endpoint] = QdrantSearch.healthcheck_endpoints()
assert "https://openai.url/health" == health_endpoint
end
test "searches for a term by encoding it and sending it to qdrant" do
user = insert(:user)
{:ok, activity} =
CommonAPI.post(user, %{
status: "guys i just don't wanna leave the swamp",
visibility: "public"
})
Config
|> expect(:get, 3, fn
[Pleroma.Search, :module], nil ->
QdrantSearch
[Pleroma.Search.QdrantSearch, key], nil ->
%{
openai_model: "a_model",
openai_url: "https://openai.url",
qdrant_url: "https://qdrant.url"
}[key]
end)
Tesla.Mock.mock(fn
%{url: "https://openai.url/v1/embeddings", method: :post} ->
Tesla.Mock.json(%{
data: [%{embedding: [1, 2, 3]}]
})
%{url: "https://qdrant.url/collections/posts/points/search", method: :post, body: body} ->
data = Jason.decode!(body)
refute data["filter"]
Tesla.Mock.json(%{
result: [%{"id" => activity.id |> FlakeId.from_string() |> Ecto.UUID.cast!()}]
})
end)
results = QdrantSearch.search(nil, "guys i just don't wanna leave the swamp", %{})
assert results == [activity]
end
test "for a given actor, ask for only relevant matches" do
user = insert(:user)
{:ok, activity} =
CommonAPI.post(user, %{
status: "guys i just don't wanna leave the swamp",
visibility: "public"
})
Config
|> expect(:get, 3, fn
[Pleroma.Search, :module], nil ->
QdrantSearch
[Pleroma.Search.QdrantSearch, key], nil ->
%{
openai_model: "a_model",
openai_url: "https://openai.url",
qdrant_url: "https://qdrant.url"
}[key]
end)
Tesla.Mock.mock(fn
%{url: "https://openai.url/v1/embeddings", method: :post} ->
Tesla.Mock.json(%{
data: [%{embedding: [1, 2, 3]}]
})
%{url: "https://qdrant.url/collections/posts/points/search", method: :post, body: body} ->
data = Jason.decode!(body)
assert data["filter"] == %{
"must" => [%{"key" => "actor", "match" => %{"value" => user.ap_id}}]
}
Tesla.Mock.json(%{
result: [%{"id" => activity.id |> FlakeId.from_string() |> Ecto.UUID.cast!()}]
})
end)
results =
QdrantSearch.search(nil, "guys i just don't wanna leave the swamp", %{author: user})
assert results == [activity]
end
test "indexes a public post on creation, deletes from the index on deletion" do
user = insert(:user)
Tesla.Mock.mock(fn
%{method: :post, url: "https://openai.url/v1/embeddings"} ->
send(self(), "posted_to_openai")
Tesla.Mock.json(%{
data: [%{embedding: [1, 2, 3]}]
})
%{method: :put, url: "https://qdrant.url/collections/posts/points", body: body} ->
send(self(), "posted_to_qdrant")
data = Jason.decode!(body)
%{"points" => [%{"vector" => vector, "payload" => payload}]} = data
assert vector == [1, 2, 3]
assert payload["actor"]
assert payload["published_at"]
Tesla.Mock.json("ok")
%{method: :post, url: "https://qdrant.url/collections/posts/points/delete"} ->
send(self(), "deleted_from_qdrant")
Tesla.Mock.json("ok")
end)
Config
|> expect(:get, 6, fn
[Pleroma.Search, :module], nil ->
QdrantSearch
[Pleroma.Search.QdrantSearch, key], nil ->
%{
openai_model: "a_model",
openai_url: "https://openai.url",
qdrant_url: "https://qdrant.url"
}[key]
end)
{:ok, activity} =
CommonAPI.post(user, %{
status: "guys i just don't wanna leave the swamp",
visibility: "public"
})
args = %{"op" => "add_to_index", "activity" => activity.id}
assert_enqueued(
worker: SearchIndexingWorker,
args: args
)
assert :ok = perform_job(SearchIndexingWorker, args)
assert_received("posted_to_openai")
assert_received("posted_to_qdrant")
{:ok, _} = CommonAPI.delete(activity.id, user)
delete_args = %{"op" => "remove_from_index", "object" => activity.object.id}
assert_enqueued(worker: SearchIndexingWorker, args: delete_args)
assert :ok = perform_job(SearchIndexingWorker, delete_args)
assert_received("deleted_from_qdrant")
end
end
end