class QdrantHandler:
    """Generic Qdrant collection handler for text, image, or video vectors.

    Wraps a ``QdrantClient`` bound to a single collection and provides
    idempotent collection creation, bulk folder ingestion, and filtered
    vector search.
    """

    def __init__(
        self, url: str, collection_name: str, vector_size: int
    ) -> None:
        """Initialize QdrantHandler instance.

        Args:
            url (str): Qdrant server URL.
            collection_name (str): Name of the collection to operate on.
            vector_size (int): Dimensionality of stored vectors.
        """
        self.client = QdrantClient(url=url)
        self.collection_name = collection_name
        self.vector_size = vector_size

    def create_collection(self) -> None:
        """Create the collection if it does not exist yet (idempotent)."""
        try:
            # EAFP: get_collection raises when the collection is missing.
            self.client.get_collection(self.collection_name)
            logger.info(f'Коллекция {self.collection_name} уже существует')
        except Exception:
            self.client.create_collection(
                collection_name=self.collection_name,
                vectors_config=VectorParams(
                    size=self.vector_size, distance=Distance.COSINE
                ),
            )
            logger.info(f'Коллекция {self.collection_name} успешно создана')

    def enrich_with_data(
        self,
        folder: str = Config.data_folder,
        embed_type: str = 'text',
        user_id: str | None = None,
    ) -> None:
        """Load all data from a folder into the current Qdrant collection.

        Args:
            folder (str): Path to the folder with documents/images/videos.
            embed_type (str): One of 'text', 'image', or 'video'.
            user_id (str | None, optional): Store user id in payload. Defaults to None.
        """
        self.create_collection()
        folder_path = Path(folder)
        if not folder_path.exists():
            logger.warning(
                f'Folder {folder} does not exist. Skipping data enrichment.'
            )
            return
        logger.info(
            f'Start uploading {embed_type} data from {folder} to collection {self.collection_name}...'
        )
        self.load_folder_to_qdrant(
            folder=folder_path,
            collection_name=self.collection_name,
            embed_type=embed_type,
            user_id=user_id,
        )
        logger.info(
            f'✅ Data enrichment completed for collection {self.collection_name}.'
        )

    def add_points(self, points: List[Dict[str, Any]]) -> None:
        """Upsert points into the collection.

        Args:
            points (List[Dict[str, Any]]): Points of the form
                ``{"id": str, "vector": List[float], "payload": {...}}``.
        """
        self.client.upsert(
            collection_name=self.collection_name,
            points=[PointStruct(**p) for p in points],
        )

    def search(
        self,
        query_vector: List[float],
        top_k: int = 5,
        user_id: str | None = None,
        folder_scopes: list[str] | None = None,
        file_ids: list[str] | None = None,
    ) -> List[Dict[str, Any]]:
        """Search nearest points by query vector.

        Args:
            query_vector (List[float]): Embedding to search with.
            top_k (int): Maximum number of hits to return.
            user_id (str | None): Restrict results to this user's points.
            folder_scopes (list[str] | None): Restrict to these folder scopes.
            file_ids (list[str] | None): Restrict to these file ids.

        Returns:
            List[Dict[str, Any]]: Hits as ``{'id', 'score', 'payload'}`` dicts.

        Note:
            This method only performs retrieval and does not trigger indexing.
            Data ingestion should be done explicitly via ``enrich_with_data``.
        """
        # Ensure the collection exists so an empty search does not error out.
        self.create_collection()
        must_conditions: list[FieldCondition] = []
        if user_id:
            must_conditions.append(
                FieldCondition(key='user_id', match=MatchValue(value=user_id))
            )
        if folder_scopes:
            must_conditions.append(
                FieldCondition(
                    key='folder_scope',
                    match=MatchAny(
                        any=[str(scope) for scope in folder_scopes]
                    ),
                )
            )
        if file_ids:
            must_conditions.append(
                FieldCondition(
                    key='file_id',
                    match=MatchAny(any=[str(file_id) for file_id in file_ids]),
                )
            )
        query_filter = (
            Filter(must=must_conditions) if must_conditions else None
        )
        hits = self.client.query_points(
            collection_name=self.collection_name,
            query=query_vector,
            limit=top_k,
            with_payload=True,
            score_threshold=Config.score_threshold,
            query_filter=query_filter,
        ).points
        return [
            {'id': hit.id, 'score': hit.score, 'payload': hit.payload}
            for hit in hits
        ]

    @staticmethod
    def _user_payload(user_id: str | None) -> Dict[str, Any]:
        """Return the optional ``user_id`` payload fragment."""
        return {'user_id': user_id} if user_id else {}

    def _ingest_text_file(
        self, file_path: Path, collection_name: str, user_id: str | None
    ) -> None:
        """Chunk one text/PDF file, embed each chunk, and upsert in one batch."""
        loader = DataLoader()
        chunks = loader.process_file(file_path)
        if not chunks:
            logger.info(f'⚠ Skipped empty text file {file_path.name}')
            return
        # Deterministic ids (uuid5 of path+chunk index) make re-ingestion idempotent.
        points = [
            PointStruct(
                id=str(
                    uuid.uuid5(uuid.NAMESPACE_URL, f'{file_path}:{chunk_idx}')
                ),
                vector=text_embedding(chunk),
                payload={
                    'text': chunk,
                    'source': str(file_path),
                    **self._user_payload(user_id),
                },
            )
            for chunk_idx, chunk in enumerate(chunks)
        ]
        # Single batched upsert per file instead of one network call per chunk.
        self.client.upsert(collection_name=collection_name, points=points)
        logger.info(
            f'✅ Uploaded {len(chunks)} text chunks from {file_path.name}'
        )

    def _ingest_image_file(
        self, file_path: Path, collection_name: str, user_id: str | None
    ) -> None:
        """Embed one image file and upsert it as a single point."""
        vector = image_embedding_from_path(str(file_path))
        self.client.upsert(
            collection_name=collection_name,
            points=[
                PointStruct(
                    id=str(
                        uuid.uuid5(uuid.NAMESPACE_URL, f'{file_path}:image')
                    ),
                    vector=vector,
                    payload={
                        'source': str(file_path),
                        **self._user_payload(user_id),
                    },
                )
            ],
        )
        logger.info(f'✅ Uploaded image {file_path.name}')

    def _ingest_video_file(
        self, file_path: Path, collection_name: str, user_id: str | None
    ) -> None:
        """Embed one video file and upsert it as a single point."""
        vector = video_embedding_from_path(str(file_path))
        self.client.upsert(
            collection_name=collection_name,
            points=[
                PointStruct(
                    id=str(
                        uuid.uuid5(uuid.NAMESPACE_URL, f'{file_path}:video')
                    ),
                    vector=vector,
                    payload={
                        'source': str(file_path),
                        'modality': 'video',
                        **self._user_payload(user_id),
                    },
                )
            ],
        )
        logger.info(f'✅ Uploaded video {file_path.name}')

    def load_folder_to_qdrant(
        self,
        folder: Path,
        collection_name: str,
        embed_type: str = 'text',  # "text", "image", or "video"
        user_id: str | None = None,
    ) -> None:
        """Load all documents, images, or videos from folder into Qdrant.

        Files whose extension does not match ``embed_type`` are silently
        skipped, mirroring the per-modality extension lists in ``Config``.
        """
        for file_path in folder.glob('*'):
            suffix = file_path.suffix.lower()
            if (
                embed_type == 'text'
                and suffix in Config.text_extensions + Config.pdf_extensions
            ):
                self._ingest_text_file(file_path, collection_name, user_id)
            elif (
                embed_type == 'image'
                and suffix in Config.image_extensions
            ):
                self._ingest_image_file(file_path, collection_name, user_id)
            elif (
                embed_type == 'video'
                and suffix in Config.video_extensions
            ):
                self._ingest_video_file(file_path, collection_name, user_id)