Перейти к содержанию

Utils

backend/utils/config_handler.py

Application settings loaded from backend_config.yaml.

Source code in backend/utils/config_handler.py
class Config:
    """Application settings loaded from backend_config.yaml.

    All attributes are class-level constants, evaluated once at import time
    from the module-level ``_config`` mapping (parsed YAML) and from
    environment variables. Import this class and read attributes directly.
    """

    # --- Qdrant vector store ---
    qdrant_text_collection: str = _config['qdrant']['text_collection']
    qdrant_image_collection: str = _config['qdrant']['image_collection']
    qdrant_video_collection: str = _config['qdrant']['video_collection']
    # Server URL may be overridden per deployment via the environment.
    qdrant_url: str = os.getenv('QDRANT_URL', 'http://localhost:6333')
    text_vector_size: int = _config['qdrant']['text_vector_size']
    image_vector_size: int = _config['qdrant']['image_vector_size']
    video_vector_size: int = _config['qdrant']['video_vector_size']
    # Minimum similarity score for search hits to be returned.
    score_threshold: float = _config['qdrant']['score_threshold']

    # --- Data ingestion ---
    data_folder: str = _config['data']['data_folder']
    chunk_size: int = _config['data']['chunk_size']
    chunk_overlap: int = _config['data']['chunk_overlap']
    # File-suffix allowlists (lower-case extensions, e.g. '.txt').
    text_extensions: list[str] = _config['data']['supported_text_extensions']
    pdf_extensions: list[str] = _config['data']['supported_pdf_extensions']
    image_extensions: list[str] = _config['data']['supported_image_extensions']
    video_extensions: list[str] = _config['data']['supported_video_extensions']

    # --- LLM ---
    llm_model_name: str = _config['llm']['model_name']
    llm_max_new_tokens: int = _config['llm']['max_new_tokens']
    # Comma-separated env override; falls back to the single configured model.
    # Blank entries are dropped after stripping whitespace.
    llm_available_models: list[str] = [
        model.strip()
        for model in os.getenv('LLM_AVAILABLE_MODELS', llm_model_name).split(
            ','
        )
        if model.strip()
    ]

    # --- Embeddings ---
    default_embedding_provider: str = _config['embeddings']['default_provider']
    embedding_video_sample_fps: float = _config['embeddings'][
        'video_sample_fps'
    ]
    # Provider-specific settings keyed by provider name (shape defined in YAML).
    embedding_providers: dict = _config['embeddings']['providers']

    # --- Logging ---
    log_dir: str = _config['logging']['log_dir']
    log_file: str = _config['logging']['log_file']
    max_bytes: int = _config['logging']['max_bytes']
    backup_count: int = _config['logging']['backup_count']

    # --- Supabase (optional; None when the env vars are unset) ---
    supabase_url: str | None = os.getenv('SUPABASE_URL')
    supabase_anon_key: str | None = os.getenv('SUPABASE_ANON_KEY')
    supabase_service_role_key: str | None = os.getenv(
        'SUPABASE_SERVICE_ROLE_KEY'
    )

backend/utils/load_data.py

Unified loader for text documents and images with optional Qdrant upsert.

Source code in backend/utils/load_data.py
class DataLoader:
    """Unified loader for text documents and images with optional Qdrant upsert."""

    @staticmethod
    def chunk_text(
        text: str,
        size: int = Config.chunk_size,
        overlap: int = Config.chunk_overlap,
    ) -> List[str]:
        """Split text into overlapping chunks.

        Consecutive chunks share ``overlap`` characters; the final chunk may
        be shorter than ``size``. Returns an empty list for empty input.
        """
        if size <= 0:
            raise ValueError('chunk_size must be greater than 0')
        if overlap < 0:
            raise ValueError('chunk_overlap must be non-negative')
        if overlap >= size:
            raise ValueError('chunk_overlap must be smaller than chunk_size')

        # Each chunk starts `size - overlap` characters after the previous one.
        step = size - overlap
        return [text[pos:pos + size] for pos in range(0, len(text), step)]

    @staticmethod
    def load_file(file_path: Path) -> str:
        """Load text from a single file (PDF, TXT, MD)."""
        suffix = file_path.suffix.lower()
        parts: List[str] = []
        if suffix in Config.pdf_extensions:
            # Imported lazily so the heavy dependency is only paid when needed.
            from langchain_community.document_loaders import PyPDFLoader

            docs = PyPDFLoader(str(file_path)).load()
            parts = [doc.page_content for doc in docs]
        elif suffix in Config.text_extensions:
            from langchain_community.document_loaders import TextLoader

            docs = TextLoader(str(file_path)).load()
            parts = [doc.page_content for doc in docs]
        else:
            logger.info(f'Skipped unsupported file {file_path.name}')
        return '\n'.join(parts)

    @staticmethod
    def list_images(folder: Path) -> List[Path]:
        """List all image paths in a folder."""
        images = []
        for candidate in folder.glob('*'):
            if candidate.suffix.lower() in Config.image_extensions:
                images.append(candidate)
        return images

    def process_file(self, file_path: Path) -> List[str]:
        """Process a single text document into chunks."""
        text = self.load_file(file_path)
        return self.chunk_text(text) if text else []

backend/utils/qdrant_handler.py

Универсальный обработчик Qdrant коллекций для текста или изображений.

Source code in backend/utils/qdrant_handler.py
class QdrantHandler:
    """Universal handler for a single Qdrant collection of text, image, or video vectors."""

    def __init__(
        self, url: str, collection_name: str, vector_size: int
    ) -> None:
        """Initialize QdrantHandler instance.

        Args:
            url (str): URL of the Qdrant server.
            collection_name (str): Name of the collection this handler manages.
            vector_size (int): Dimensionality of vectors stored in the collection.
        """
        self.client = QdrantClient(url=url)
        self.collection_name = collection_name
        self.vector_size = vector_size

    def create_collection(self) -> None:
        """Create the collection if it does not already exist (idempotent)."""
        try:
            self.client.get_collection(self.collection_name)
            logger.info(f'Коллекция {self.collection_name} уже существует')
        except Exception:
            # get_collection failing is treated as "collection missing".
            # NOTE(review): this also swallows connectivity errors and then
            # attempts creation anyway — confirm that is intended.
            self.client.create_collection(
                collection_name=self.collection_name,
                vectors_config=VectorParams(
                    size=self.vector_size, distance=Distance.COSINE
                ),
            )
            logger.info(f'Коллекция {self.collection_name} успешно создана')

    def enrich_with_data(
        self,
        folder: str = Config.data_folder,
        embed_type: str = 'text',
        user_id: str | None = None,
    ) -> None:
        """Load all data from a folder into the current Qdrant collection.

        Args:
            folder (str): Path to the folder with documents/images/videos.
            embed_type (str): 'text', 'image', or 'video'.
            user_id (str | None, optional): Store user id in payload. Defaults to None.
        """
        # Ensure the target collection exists before any upsert.
        self.create_collection()

        folder_path = Path(folder)
        if not folder_path.exists():
            # Best-effort: a missing folder is logged, not raised.
            logger.warning(
                f'Folder {folder} does not exist. Skipping data enrichment.'
            )
            return

        logger.info(
            f'Start uploading {embed_type} data from {folder} to collection {self.collection_name}...'
        )
        self.load_folder_to_qdrant(
            folder=folder_path,
            collection_name=self.collection_name,
            embed_type=embed_type,
            user_id=user_id,
        )
        logger.info(
            f'✅ Data enrichment completed for collection {self.collection_name}.'
        )

    def add_points(self, points: List[Dict[str, Any]]) -> None:
        """Upsert points into the collection.

        Args:
            points (List[Dict[str, Any]]): Points shaped as
                {"id": str, "vector": List[float], "payload": {...}}.
        """
        self.client.upsert(
            collection_name=self.collection_name,
            points=[PointStruct(**p) for p in points],
        )

    def search(
        self,
        query_vector: List[float],
        top_k: int = 5,
        user_id: str | None = None,
        folder_scopes: list[str] | None = None,
        file_ids: list[str] | None = None,
    ) -> List[Dict[str, Any]]:
        """Search nearest points by query vector.

        Optional ``user_id``, ``folder_scopes`` and ``file_ids`` restrict the
        search via payload filters; results below ``Config.score_threshold``
        are dropped by the server.

        Note:
            This method only performs retrieval and does not trigger indexing.
            Data ingestion should be done explicitly via ``enrich_with_data``.
        """
        # Guarantees the collection exists so an empty result is returned
        # instead of an error on a fresh deployment.
        self.create_collection()

        must_conditions: list[FieldCondition] = []
        if user_id:
            must_conditions.append(
                FieldCondition(key='user_id', match=MatchValue(value=user_id))
            )
        if folder_scopes:
            must_conditions.append(
                FieldCondition(
                    key='folder_scope',
                    match=MatchAny(
                        any=[str(scope) for scope in folder_scopes]
                    ),
                )
            )
        if file_ids:
            must_conditions.append(
                FieldCondition(
                    key='file_id',
                    match=MatchAny(any=[str(file_id) for file_id in file_ids]),
                )
            )

        # No filter object at all when there are no conditions.
        query_filter = (
            Filter(must=must_conditions) if must_conditions else None
        )

        hits = self.client.query_points(
            collection_name=self.collection_name,
            query=query_vector,
            limit=top_k,
            with_payload=True,
            score_threshold=Config.score_threshold,
            query_filter=query_filter,
        ).points

        results = []
        for hit in hits:
            results.append(
                {'id': hit.id, 'score': hit.score, 'payload': hit.payload}
            )

        return results

    def load_folder_to_qdrant(
        self,
        folder: Path,
        collection_name: str,
        embed_type: str = 'text',  # 'text', 'image', or 'video'
        user_id: str | None = None,
    ) -> None:
        """Load all documents or images from folder into Qdrant.

        Files whose suffix does not match the selected ``embed_type`` are
        silently skipped. Point ids are deterministic (uuid5 of path + index),
        so re-running on the same folder upserts over the same points rather
        than duplicating them.
        """
        # Non-recursive: only direct children of the folder are considered.
        all_files = list(folder.glob('*'))

        for file_path in all_files:
            if (
                embed_type == 'text'
                and file_path.suffix.lower()
                in Config.text_extensions + Config.pdf_extensions
            ):
                loader = DataLoader()
                chunks = loader.process_file(file_path)
                # NOTE(review): one upsert call per chunk — batching the
                # chunks into a single upsert would cut round-trips.
                for chunk_idx, chunk in enumerate(chunks):
                    vector = text_embedding(chunk)
                    self.client.upsert(
                        collection_name=collection_name,
                        points=[
                            {
                                'id': str(
                                    uuid.uuid5(
                                        uuid.NAMESPACE_URL,
                                        f'{file_path}:{chunk_idx}',
                                    )
                                ),
                                'vector': vector,
                                'payload': {
                                    'text': chunk,
                                    'source': str(file_path),
                                    **(
                                        {'user_id': user_id} if user_id else {}
                                    ),
                                },
                            }
                        ],
                    )
                if chunks:
                    logger.info(
                        f'✅ Uploaded {len(chunks)} text chunks from {file_path.name}'
                    )
                else:
                    logger.info(f'⚠ Skipped empty text file {file_path.name}')

            elif (
                embed_type == 'image'
                and file_path.suffix.lower() in Config.image_extensions
            ):
                # One vector per image file.
                vector = image_embedding_from_path(str(file_path))
                self.client.upsert(
                    collection_name=collection_name,
                    points=[
                        {
                            'id': str(
                                uuid.uuid5(
                                    uuid.NAMESPACE_URL,
                                    f'{file_path}:image',
                                )
                            ),
                            'vector': vector,
                            'payload': {
                                'source': str(file_path),
                                **({'user_id': user_id} if user_id else {}),
                            },
                        }
                    ],
                )
                logger.info(f'✅ Uploaded image {file_path.name}')

            elif (
                embed_type == 'video'
                and file_path.suffix.lower() in Config.video_extensions
            ):
                # One vector per video file; payload is tagged with modality.
                vector = video_embedding_from_path(str(file_path))
                self.client.upsert(
                    collection_name=collection_name,
                    points=[
                        {
                            'id': str(
                                uuid.uuid5(
                                    uuid.NAMESPACE_URL,
                                    f'{file_path}:video',
                                )
                            ),
                            'vector': vector,
                            'payload': {
                                'source': str(file_path),
                                'modality': 'video',
                                **({'user_id': user_id} if user_id else {}),
                            },
                        }
                    ],
                )
                logger.info(f'✅ Uploaded video {file_path.name}')

backend/utils/supabase_client.py

Create a cached Supabase client for the given role.

Source code in backend/utils/supabase_client.py
@lru_cache(maxsize=2)
def get_supabase_client(role: str = 'service') -> Client:
    """Create a cached Supabase client for the given role.

    The 'anon' role uses the anonymous key; any other role falls back to
    the service-role key. At most one client per role is kept alive.
    """
    key_env = 'SUPABASE_ANON_KEY' if role == 'anon' else 'SUPABASE_SERVICE_ROLE_KEY'
    return create_client(_require('SUPABASE_URL'), _require(key_env))

backend/utils/log_config.py

Sets up rotating root logger to log everything into ./logs/app.log file.

Returns:

    logging.Logger: The configured root logger.

Source code in backend/utils/log_config.py
def setup_logging() -> logging.Logger:
    """Sets up rotating root logger to log everything into ./logs/app.log file.

    Installs a DEBUG-level rotating file handler and an ERROR-level console
    handler on the root logger, replacing any handlers already attached.

    Returns:
        logging.Logger: Root logger
    """
    root = logging.getLogger()
    # Drop pre-existing handlers so repeated calls don't duplicate output.
    for existing in list(root.handlers):
        root.removeHandler(existing)

    log_dir = Path(__file__).parent.parent.parent / Config.log_dir
    log_dir.mkdir(exist_ok=True)

    fmt = logging.Formatter(
        '%(asctime)s %(levelname)s %(name)s %(funcName)s(%(lineno)d) - %(message)s',
        datefmt='%d.%m.%Y %H:%M:%S',
    )

    file_handler = RotatingFileHandler(
        log_dir / Config.log_file,
        maxBytes=Config.max_bytes,
        backupCount=Config.backup_count,
        encoding='utf-8',
    )
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(fmt)

    console = logging.StreamHandler()
    console.setLevel(logging.ERROR)
    console.setFormatter(fmt)

    root.setLevel(logging.DEBUG)
    root.addHandler(file_handler)
    root.addHandler(console)
    return root