Skip to content

Backend API

main.py

Health check endpoint.

Returns:

Name Type Description
dict Dict[str, str]

A simple JSON message confirming that the backend is running.

Example response

{ "message": "Multimodal RAG backend is running!!!" }

Source code in backend/main.py
@app.get('/')
def root() -> Dict[str, str]:
    """Report that the backend process is up.

    Returns:
        Dict[str, str]: A single-key mapping whose ``message`` value is a
            fixed liveness string.

    Example response:
        {
            "message": "Multimodal RAG backend is running!!!"
        }
    """
    liveness_text = 'Multimodal RAG backend is running!!!'
    return {'message': liveness_text}

Prometheus scrape endpoint.

Source code in backend/main.py
@app.get('/metrics')
def metrics() -> Response:
    """Serve the current metrics snapshot for a Prometheus scraper.

    Returns:
        Response: The output of ``generate_latest()`` sent with the
            ``CONTENT_TYPE_LATEST`` media type.
    """
    snapshot = generate_latest()
    return Response(snapshot, media_type=CONTENT_TYPE_LATEST)

api/endpoints.py

Bases: BaseModel

Schema for user query requests.

Source code in backend/api/endpoints.py
class QueryRequest(BaseModel):
    """Validated payload describing a single user question."""

    # Question text: 1-4000 characters, trimmed by ``validate_query``.
    query: str = Field(min_length=1, max_length=4000)
    # Number of documents to retrieve: between 1 and 50, 5 by default.
    top_k: int = Field(default=5, ge=1, le=50)
    # Optional image reference forwarded to answer generation.
    image: Optional[str] = None
    # Optional model identifier, at most 256 characters.
    model: Optional[str] = Field(default=None, max_length=256)
    # Optional attachment identifier, at most 64 characters.
    attachment_id: Optional[str] = Field(default=None, max_length=64)
    # Folder and file identifiers used to scope retrieval; empty by default.
    folder_ids: list[str] = Field(default_factory=list)
    file_ids: list[str] = Field(default_factory=list)

    @field_validator('query')
    @classmethod
    def validate_query(cls, value: str) -> str:
        """Trim surrounding whitespace and reject a blank query.

        Args:
            value (str): Raw query text.

        Returns:
            str: The query with leading/trailing whitespace removed.

        Raises:
            ValueError: If nothing is left after trimming.
        """
        trimmed = value.strip()
        if trimmed:
            return trimmed
        raise ValueError('query must not be empty')

Bases: BaseModel

Schema for text embedding endpoint.

Source code in backend/api/endpoints.py
class TextEmbeddingRequest(BaseModel):
    """Validated payload for the text embedding endpoint."""

    # Text to embed: 1-4000 characters, trimmed by ``validate_text``.
    text: str = Field(min_length=1, max_length=4000)
    # Embedding provider name; defaults to the configured provider.
    provider: str = Field(
        default=Config.default_embedding_provider, min_length=1
    )

    @field_validator('text')
    @classmethod
    def validate_text(cls, value: str) -> str:
        """Trim surrounding whitespace and reject blank text.

        Args:
            value (str): Raw text from the request.

        Returns:
            str: The text with leading/trailing whitespace removed.

        Raises:
            ValueError: If nothing is left after trimming.
        """
        trimmed = value.strip()
        if trimmed:
            return trimmed
        raise ValueError('text must not be empty')

Bases: BaseModel

Schema for image embedding endpoint.

Source code in backend/api/endpoints.py
class ImageEmbeddingRequest(BaseModel):
    """Validated payload for the image embedding endpoint."""

    # Filesystem path of the image: 1-2048 characters, must be a real file.
    image_path: str = Field(min_length=1, max_length=2048)
    # Embedding provider name; defaults to the configured provider.
    provider: str = Field(
        default=Config.default_embedding_provider, min_length=1
    )

    @field_validator('image_path')
    @classmethod
    def validate_image_path(cls, value: str) -> str:
        """Accept the path only when it names an existing regular file.

        Args:
            value (str): Path supplied by the client.

        Returns:
            str: The path after a round trip through ``pathlib.Path``.

        Raises:
            ValueError: If the path is missing or is not a file.
        """
        candidate = Path(value)
        if candidate.exists() and candidate.is_file():
            return str(candidate)
        raise ValueError('image_path must point to an existing file')

Bases: BaseModel

Schema for video embedding endpoint.

Source code in backend/api/endpoints.py
class VideoEmbeddingRequest(BaseModel):
    """Validated payload for the video embedding endpoint."""

    # Filesystem path of the video: 1-2048 characters, must be a real file.
    video_path: str = Field(min_length=1, max_length=2048)
    # Frame sampling rate: greater than 0 and at most 10; configured default.
    sample_fps: float = Field(
        default=Config.embedding_video_sample_fps, gt=0, le=10
    )
    # Embedding provider name; defaults to the configured provider.
    provider: str = Field(
        default=Config.default_embedding_provider, min_length=1
    )

    @field_validator('video_path')
    @classmethod
    def validate_video_path(cls, value: str) -> str:
        """Accept the path only when it names an existing regular file.

        Args:
            value (str): Path supplied by the client.

        Returns:
            str: The path after a round trip through ``pathlib.Path``.

        Raises:
            ValueError: If the path is missing or is not a file.
        """
        candidate = Path(value)
        if candidate.exists() and candidate.is_file():
            return str(candidate)
        raise ValueError('video_path must point to an existing file')

Handle chat request with optional model selection and attachment.

Source code in backend/api/endpoints.py
@router.post('/ask')
async def ask_mixed(
    request: Request,
    query: str | None = Form(default=None),
    top_k: int = Form(default=5),
    image: str | None = Form(default=None),
    model: str | None = Form(default=None),
    attachment_id: str | None = Form(default=None),
    folder_ids: str | None = Form(default=None),
    file_ids: str | None = Form(default=None),
    attachment: UploadFile | None = OPTIONAL_UPLOAD_FILE,
) -> dict:
    """Answer a chat query without a user context.

    The raw form fields are normalised by ``_parse_query_request``; the
    optional attachment is prepared with ``user_id=None``. Retrieval is
    restricted to the requested file ids plus the attachment's file id when
    one is produced.

    Returns:
        dict: Whatever ``rag.generate_answer`` returns for the query.
    """
    payload = await _parse_query_request(
        request,
        query,
        top_k,
        image,
        model,
        attachment_id,
        folder_ids,
        file_ids,
    )

    prepared = await _prepare_attachment_data(
        request_payload=payload,
        attachment_file=attachment,
        user_id=None,
    )
    extra_docs, transient_storage_path, attachment_file_id = prepared

    # dict.fromkeys drops repeated ids while keeping first-seen order.
    scoped_file_ids = [*dict.fromkeys(payload.file_ids)]
    if attachment_file_id:
        scoped_file_ids.append(attachment_file_id)

    try:
        return rag.generate_answer(
            payload.query,
            top_k=payload.top_k,
            image=payload.image,
            model=payload.model,
            file_ids=scoped_file_ids or None,
            extra_docs=extra_docs,
        )
    finally:
        # Remove the stored attachment whether or not generation succeeded.
        if transient_storage_path:
            delete_stored_file(transient_storage_path)

Run RAG with user filtering and optional attachment/model.

Source code in backend/api/endpoints.py
@router.post('/ask_auth')
async def ask_mixed_auth(
    request: Request,
    user: Annotated[dict, Depends(get_current_user)],
    query: str | None = Form(default=None),
    top_k: int = Form(default=5),
    image: str | None = Form(default=None),
    model: str | None = Form(default=None),
    attachment_id: str | None = Form(default=None),
    folder_ids: str | None = Form(default=None),
    file_ids: str | None = Form(default=None),
    attachment: UploadFile | None = OPTIONAL_UPLOAD_FILE,
) -> dict:
    """Run RAG for the authenticated user and record the exchange.

    Retrieval is filtered by the caller's user id. Explicit file ids (plus
    the attachment's file id, when one is produced) take precedence; folder
    scoping is applied only when no file ids are in effect. After a
    successful generation the query, answer and retrieved documents are
    inserted into the ``query_history`` table.

    Args:
        request: Incoming request, forwarded to ``_parse_query_request``.
        user: Authenticated user mapping; its ``'id'`` entry is used.
        query, top_k, image, model, attachment_id, folder_ids, file_ids:
            Raw form fields normalised by ``_parse_query_request``.
        attachment: Optional uploaded file to use as extra context.

    Returns:
        dict: The result of ``rag.generate_answer``.
    """
    payload = await _parse_query_request(
        request,
        query,
        top_k,
        image,
        model,
        attachment_id,
        folder_ids,
        file_ids,
    )

    (
        extra_docs,
        transient_storage_path,
        attachment_file_id,
    ) = await _prepare_attachment_data(
        request_payload=payload,
        attachment_file=attachment,
        user_id=user['id'],
    )

    # A transient attachment may now be stored, so every step that can fail
    # (including folder expansion) runs inside the try block to guarantee
    # cleanup in the finally clause.
    try:
        effective_file_ids = list(dict.fromkeys(payload.file_ids))
        # Keep the list duplicate-free even if the attachment id was also
        # passed explicitly in file_ids.
        if attachment_file_id and attachment_file_id not in effective_file_ids:
            effective_file_ids.append(attachment_file_id)

        folder_scopes: list[str] | None = None
        if not effective_file_ids and payload.folder_ids:
            expanded = _kb_service().get_descendant_folder_ids(
                user_id=user['id'], folder_ids=payload.folder_ids
            )
            # 'root' is always included alongside the expanded folder ids.
            folder_scopes = ['root', *expanded]

        result = rag.generate_answer(
            payload.query,
            top_k=payload.top_k,
            image=payload.image,
            user_id=user['id'],
            model=payload.model,
            folder_scopes=folder_scopes,
            file_ids=effective_file_ids or None,
            extra_docs=extra_docs,
        )

        supabase = get_supabase_client(role='service')
        supabase.table('query_history').insert(
            {
                'user_id': user['id'],
                'query': payload.query,
                'answer': result.get('answer', ''),
                'retrieved_docs': result.get('retrieved_docs', []),
            }
        ).execute()

        return result
    finally:
        if transient_storage_path:
            delete_stored_file(transient_storage_path)

Generate embedding for a text payload.

Source code in backend/api/endpoints.py
@router.post('/embed/text')
def embed_text(request: TextEmbeddingRequest) -> dict:
    """Embed the request text with the requested provider.

    Returns:
        dict: Provider name, the ``'text'`` modality tag, the vector length
            and the vector itself.
    """
    embedding = text_embedding(request.text, provider_name=request.provider)
    response_body = {'provider': request.provider, 'modality': 'text'}
    response_body['dimension'] = len(embedding)
    response_body['embedding'] = embedding
    return response_body

Generate embedding for a local image file.

Source code in backend/api/endpoints.py
@router.post('/embed/image')
def embed_image(request: ImageEmbeddingRequest) -> dict:
    """Embed the image at the validated path with the requested provider.

    Returns:
        dict: Provider name, the ``'image'`` modality tag, the vector length
            and the vector itself.
    """
    embedding = image_embedding_from_path(
        request.image_path, provider_name=request.provider
    )
    response_body = {'provider': request.provider, 'modality': 'image'}
    response_body['dimension'] = len(embedding)
    response_body['embedding'] = embedding
    return response_body

Generate embedding for a local video file.

Source code in backend/api/endpoints.py
@router.post('/embed/video')
def embed_video(request: VideoEmbeddingRequest) -> dict:
    """Embed the video at the validated path with the requested provider.

    Returns:
        dict: Provider name, the ``'video'`` modality tag, the vector length,
            the vector itself and the sampling rate that was requested.
    """
    embedding = video_embedding_from_path(
        request.video_path,
        sample_fps=request.sample_fps,
        provider_name=request.provider,
    )
    response_body = {'provider': request.provider, 'modality': 'video'}
    response_body['dimension'] = len(embedding)
    response_body['embedding'] = embedding
    response_body['sample_fps'] = request.sample_fps
    return response_body

Upload a file from chat attachment and persist metadata.

Source code in backend/api/endpoints.py
@router.post('/files/upload')
async def upload_file(
    file: UploadFile = REQUIRED_UPLOAD_FILE,
    user: Annotated[dict, Depends(get_current_user)] = None,
) -> dict:
    """Store an uploaded file, register it for the user and ingest it.

    The steps run in order: persist the upload, create the file record in
    the knowledge-base service, then hand the stored file to the ingest
    service with no folder assigned.

    Returns:
        dict: The stored file's id, name, MIME type, size and storage path.
    """
    stored = await save_upload_file(file)

    kb_service = _kb_service()
    kb_service.create_uploaded_file_record(
        user_id=user['id'],
        file_id=stored.file_id,
        filename=stored.filename,
        mime=stored.mime,
        size=stored.size,
        storage_path=stored.storage_path,
    )

    ingest_service = _ingest_service()
    ingest_service.ingest_file(
        file_id=stored.file_id,
        file_path=stored.storage_path,
        filename=stored.filename,
        mime=stored.mime,
        user_id=user['id'],
        folder_id=None,
        folder_name=None,
    )

    exposed_fields = ('file_id', 'filename', 'mime', 'size', 'storage_path')
    return {name: getattr(stored, name) for name in exposed_fields}

Get KB tree (folders and attached files) for the current user.

Source code in backend/api/endpoints.py
@router.get('/kb/tree')
def kb_tree(user: Annotated[dict, Depends(get_current_user)]) -> dict:
    """Return the knowledge-base tree built for the authenticated user."""
    service = _kb_service()
    tree = service.build_tree(user_id=user['id'])
    return tree

Return latest query history for the current user.

Source code in backend/api/endpoints.py
@router.get('/history')
def get_history(user: Annotated[dict, Depends(get_current_user)]) -> dict:
    """Fetch the user's 50 most recent ``query_history`` rows, newest first.

    Returns:
        dict: ``{'data': rows}`` where ``rows`` is the response's ``data``
            attribute, or ``None`` when the response has no such attribute.
    """
    client = get_supabase_client(role='service')
    own_rows = client.table('query_history').select('*').eq(
        'user_id', user['id']
    )
    newest_first = own_rows.order('created_at', desc=True).limit(50)
    response = newest_first.execute()
    return {'data': getattr(response, 'data', None)}

core/embeddings.py

Generate an embedding vector for a given text string.

Parameters:

Name Type Description Default
text str

The input text to encode.

required
provider_name str | None

Optional provider name override.

None

Returns:

Type Description
list[float]

List[float]: A list of floats representing the text embedding.

Notes
  • This embedding can be stored in a vector database like Qdrant.
  • Ensure the model used for text embeddings is compatible with your retrieval pipeline.
Source code in backend/core/embeddings.py
def text_embedding(
    text: str,
    provider_name: str | None = None,
) -> list[float]:
    """Encode a text string with the selected embedding provider.

    Every call, successful or not, is reported through
    ``observe_embedding_request`` with its duration and an ``'ok'`` or
    ``'error'`` status.

    Args:
        text (str): The input text to encode.
        provider_name (str | None): Provider to use; a falsy value selects
            ``Config.default_embedding_provider``.

    Returns:
        list[float]: The vector returned by the provider's ``encode_text``.

    Raises:
        Exception: Any error from provider lookup or encoding is re-raised
            unchanged after being recorded.
    """
    t0 = time.perf_counter()
    provider_id = provider_name or Config.default_embedding_provider
    outcome = 'ok'
    try:
        return get_provider(provider_id).encode_text(text)
    except Exception:
        outcome = 'error'
        raise
    finally:
        elapsed = time.perf_counter() - t0
        observe_embedding_request(
            modality='text',
            provider=provider_id,
            status=outcome,
            duration_seconds=elapsed,
        )

Generate an embedding vector for an image from a file path.

Parameters:

Name Type Description Default
image_path str

Path to the image file to encode.

required
provider_name str | None

Optional provider name override.

None

Returns:

Type Description
List[float]

List[float]: A list of floats representing the image embedding.

Notes
  • The image is converted to RGB before encoding.
  • With the SentenceTransformers provider's default settings, a CLIP-based model ("clip-ViT-B-32") generates the visual embeddings; other providers may use a different model.
  • Embeddings can be stored in Qdrant or compared with other image embeddings.
Source code in backend/core/embeddings.py
def image_embedding_from_path(
    image_path: str,
    provider_name: str | None = None,
) -> List[float]:
    """Open an image file and encode it with the selected provider.

    Every call, successful or not, is reported through
    ``observe_embedding_request`` with its duration and an ``'ok'`` or
    ``'error'`` status.

    Args:
        image_path (str): Path to the image file to encode.
        provider_name (str | None): Provider to use; a falsy value selects
            ``Config.default_embedding_provider``.

    Returns:
        List[float]: The vector returned by the provider's ``encode_image``.

    Raises:
        Exception: Any error from provider lookup, opening the file or
            encoding is re-raised unchanged after being recorded.
    """
    t0 = time.perf_counter()
    provider_id = provider_name or Config.default_embedding_provider
    outcome = 'ok'
    try:
        encoder = get_provider(provider_id)
        # The file stays open only for the duration of the encode call.
        with Image.open(image_path) as picture:
            return encoder.encode_image(picture)
    except Exception:
        outcome = 'error'
        raise
    finally:
        elapsed = time.perf_counter() - t0
        observe_embedding_request(
            modality='image',
            provider=provider_id,
            status=outcome,
            duration_seconds=elapsed,
        )

Generate an embedding vector for a video from a file path.

Parameters:

Name Type Description Default
video_path str

Path to the video file to encode.

required
sample_fps float | None

Sampling FPS. Falls back to config when None.

None
provider_name str | None

Optional provider name override.

None
Source code in backend/core/embeddings.py
def video_embedding_from_path(
    video_path: str,
    sample_fps: float | None = None,
    provider_name: str | None = None,
) -> List[float]:
    """Encode a video file with the selected embedding provider.

    Every call, successful or not, is reported through
    ``observe_embedding_request`` with its duration and an ``'ok'`` or
    ``'error'`` status.

    Args:
        video_path (str): Path to the video file to encode.
        sample_fps (float | None): Sampling rate passed to the provider; a
            falsy value (``None`` or ``0``) selects
            ``Config.embedding_video_sample_fps``.
        provider_name (str | None): Provider to use; a falsy value selects
            ``Config.default_embedding_provider``.

    Returns:
        List[float]: The vector returned by the provider's ``encode_video``.

    Raises:
        Exception: Any error from provider lookup or encoding is re-raised
            unchanged after being recorded.
    """
    t0 = time.perf_counter()
    provider_id = provider_name or Config.default_embedding_provider
    fps = sample_fps or Config.embedding_video_sample_fps
    outcome = 'ok'
    try:
        encoder = get_provider(provider_id)
        return encoder.encode_video(video_path, sample_fps=fps)
    except Exception:
        outcome = 'error'
        raise
    finally:
        elapsed = time.perf_counter() - t0
        observe_embedding_request(
            modality='video',
            provider=provider_id,
            status=outcome,
            duration_seconds=elapsed,
        )

core/llm.py

Wrapper for a multimodal Vision-Text LLM (Qwen2-VL) to generate text from images and prompts.

Attributes:

Name Type Description
processor

Processor for preparing images and text for the model.

model

The loaded Vision2Seq model for multimodal inference.

Source code in backend/core/llm.py
class QwenVisionLLM:
    """Wrapper around a vision-text model that generates text from prompts and images.

    Attributes:
        processor: Processor loaded with ``AutoProcessor.from_pretrained``;
            builds model inputs and decodes generated tokens.
        model: Model loaded with ``AutoModelForVision2Seq.from_pretrained``.
    """

    def __init__(self) -> None:
        """Load the processor and model identified by ``MODEL_NAME``."""
        logger.info(f'Loading {Config.llm_model_name}...')
        self.processor = AutoProcessor.from_pretrained(MODEL_NAME)
        self.model = AutoModelForVision2Seq.from_pretrained(
            MODEL_NAME,
            # Half precision only when the MPS backend is available.
            torch_dtype=torch.float16
            if torch.backends.mps.is_available()
            else torch.float32,
            device_map='auto',
        )
        logger.info(f'{Config.llm_model_name} loaded successfully')

    @staticmethod
    def _load_image(image):
        """Turn a path or URL string into an RGB image; pass other objects through."""
        if not isinstance(image, str):
            return image

        if image.startswith(('http://', 'https://')):
            from io import BytesIO

            # ``response.content`` is decoded for gzip/deflate transfer
            # encodings, unlike the raw socket stream, and the context
            # manager releases the connection once the image is built.
            with requests.get(image, timeout=10) as response:
                response.raise_for_status()
                return Image.open(BytesIO(response.content)).convert('RGB')

        # ``convert`` returns an independent image, so the file handle can
        # be closed as soon as the conversion is done.
        with Image.open(image) as source:
            return source.convert('RGB')

    def build_messages(self, prompt, image=None) -> List[Dict[str, Any]]:
        """Build the single-turn chat payload expected by the processor.

        Args:
            prompt (str): Text prompt to send to the model.
            image (str or PIL.Image.Image, optional): Local path,
                ``http(s)`` URL, or an already-loaded image. Strings are
                loaded and converted to RGB; other objects are used as-is.

        Returns:
            list[dict]: One user message whose content holds the optional
                image entry followed by the text entry.

        Raises:
            requests.HTTPError: If a URL image responds with an error status.
            OSError: If the image cannot be opened.
        """
        content: List[Dict[str, Any]] = []
        if image is not None:
            content.append({'type': 'image', 'image': self._load_image(image)})
        content.append({'type': 'text', 'text': prompt})
        return [{'role': 'user', 'content': content}]

    def generate(self, prompt, context=None, image=None) -> str:
        """Generate a text response, optionally grounded in context and/or an image.

        Args:
            prompt (str): The main text prompt or question.
            context (list[dict], optional): Retrieved documents; each dict
                must contain a ``'text'`` key. When given, their texts are
                prepended to the prompt.
            image (str or PIL.Image.Image, optional): Path, URL, or image to
                include in generation.

        Returns:
            str: The decoded model output, excluding the prompt tokens.
        """
        full_prompt = prompt
        if context:
            context_text = '\n'.join([d['text'] for d in context])
            full_prompt = f'Используя следующий контекст:\n{context_text}\n\nОтветьте на вопрос:\n{prompt}'

        messages = self.build_messages(full_prompt, image)

        inputs = self.processor.apply_chat_template(
            messages,
            add_generation_prompt=True,
            tokenize=True,
            return_dict=True,
            return_tensors='pt',
        ).to(self.model.device)

        with torch.inference_mode():
            output = self.model.generate(
                **inputs,
                max_new_tokens=Config.llm_max_new_tokens,
                do_sample=False,
            )

        # Skip the echoed prompt tokens and decode only the newly generated ones.
        prompt_length = inputs['input_ids'].shape[-1]
        return self.processor.decode(
            output[0][prompt_length:],
            skip_special_tokens=True,
        )

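Helper function that generates a response using the LLM backend resolved for the requested model. When context documents are supplied, it returns one short summary line per source instead of a single free-form answer.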

Parameters:

Name Type Description Default
prompt str

User prompt or question.

required
context list[dict]

Retrieved documents for context.

None
image str or Image

Image to include in generation.

None
model str | None

Requested model identifier.

None

Returns:

Name Type Description
str str

Generated text from the LLM.

Source code in backend/core/llm.py
def get_llm_response(prompt, context=None, image=None, model=None) -> str:
    """Generate a response with the backend resolved for ``model``.

    Without context the prompt is sent to the backend as-is. With context,
    the documents are grouped by source and the backend is asked for one
    short summary sentence per source; the result is one ``- source: summary``
    line per source.

    Args:
        prompt (str): User prompt or question.
        context (list[dict], optional): Retrieved documents; ``'source'`` and
            ``'text'`` keys are read, both optional.
        image (str or PIL.Image.Image, optional): Image passed to every
            backend call.
        model (str | None, optional): Requested model identifier.

    Returns:
        str: The backend's answer, or the newline-joined per-source summaries.
    """
    backend = _resolve_llm_backend(model)

    if not context:
        return backend.generate(prompt, context=None, image=image)

    # Group document texts by source; a missing or empty source is 'unknown'.
    texts_by_source: Dict[str, List[str]] = {}
    for doc in context:
        source_key = doc.get('source') or 'unknown'
        if source_key not in texts_by_source:
            texts_by_source[source_key] = []
        texts_by_source[source_key].append(doc.get('text', ''))

    # Prompt fragments the model tends to echo back; removed in this order.
    echo_markers = ('Источник:', 'Текст:', 'Запрос пользователя:', 'Summary:')

    lines: List[str] = []
    for source_key, chunks in texts_by_source.items():
        merged_text = '\n'.join(chunks).strip()
        summary_prompt = (
            'Сформулируй ОДНО короткое предложение, о чем этот источник. '
            'Используй только информацию из текста. '
            'Не повторяй слова "Источник", "Текст", "Запрос", не цитируй текст. '
            'Не добавляй лишние заголовки.\n\n'
            f'Текст:\n{merged_text}\n\n'
            f'Запрос пользователя: {prompt}'
        )
        cleaned = backend.generate(summary_prompt, context=None, image=image)
        for marker in echo_markers:
            cleaned = cleaned.replace(marker, '')
        lines.append(f'- {source_key}: {cleaned.strip()}')

    return '\n'.join(lines)

core/multimodal_rag.py

Local Retrieval-Augmented Generation (RAG) pipeline using Qdrant and a local LLM.

Attributes:

Name Type Description
client QdrantClient

Client to connect to the local Qdrant vector database.

Source code in backend/core/multimodal_rag.py
class LocalRAG:
    """Retrieval-augmented generation pipeline backed by a Qdrant collection.

    Attributes:
        client (QdrantHandler): Handler for the configured text collection.
    """

    def __init__(self) -> None:
        """Create the Qdrant handler from the configured URL, collection and vector size."""
        self.client = QdrantHandler(
            url=Config.qdrant_url,
            collection_name=Config.qdrant_text_collection,
            vector_size=Config.text_vector_size,
        )

    def retrieve_data(
        self,
        query: str,
        top_k: int = 5,
        user_id: str | None = None,
        folder_scopes: list[str] | None = None,
        file_ids: list[str] | None = None,
    ) -> List[Dict[str, str]]:
        """Search the collection for the chunks most similar to the query.

        Args:
            query (str): User text query; embedded with ``text_embedding``.
            top_k (int, optional): Number of hits to request. Defaults to 5.
            user_id (str | None, optional): User filter passed to the search.
            folder_scopes (list[str] | None, optional): Folder filter passed to the search.
            file_ids (list[str] | None, optional): File filter passed to the search.

        Returns:
            List[Dict[str, str]]: One dict per hit with:
                - 'text' (str): The hit payload's text, or '' when absent.
                - 'source' (str): The hit payload's source, or '' when absent.
        """
        hits = self.client.search(
            query_vector=text_embedding(query),
            top_k=top_k,
            user_id=user_id,
            folder_scopes=folder_scopes,
            file_ids=file_ids,
        )
        payloads = (hit.get('payload', {}) for hit in hits)
        return [
            {
                'text': body.get('text', ''),
                'source': body.get('source', ''),
            }
            for body in payloads
        ]

    def generate_answer(
        self,
        query: str,
        top_k: int = 5,
        image=None,
        user_id: str | None = None,
        model: str | None = None,
        folder_scopes: list[str] | None = None,
        file_ids: list[str] | None = None,
        extra_docs: list[dict[str, str]] | None = None,
    ) -> Dict[str, Any]:
        """Retrieve context for the query and ask the LLM for an answer.

        Every call, successful or not, is reported through
        ``observe_rag_query`` with its duration, status and the number of
        retrieved (not extra) documents.

        Args:
            query (str): User question or prompt.
            top_k (int, optional): Number of documents to retrieve. Defaults to 5.
            image (optional): Image forwarded to the LLM; a truthy value marks the query as multimodal.
            user_id (str | None, optional): User filter for retrieval.
            model (str | None, optional): Model identifier forwarded to the LLM helper.
            folder_scopes (list[str] | None, optional): Folder filter for retrieval.
            file_ids (list[str] | None, optional): File filter for retrieval.
            extra_docs (list[dict[str, str]] | None, optional): Extra context docs appended after the retrieved ones.

        Returns:
            Dict[str, Any]: A dictionary with:
                - 'answer' (str): The text returned by the LLM helper.
                - 'retrieved_docs' (List[Dict[str, str]]): Retrieved plus extra documents used as context.
        """
        t0 = time.perf_counter()
        outcome = 'ok'
        kind = 'multimodal' if image else 'text'
        # Initialised up front so the metrics call works even if retrieval fails.
        docs: List[Dict[str, str]] = []

        try:
            # Imported at call time rather than at module import.
            from backend.core.llm import get_llm_response

            docs = self.retrieve_data(
                query,
                top_k,
                user_id=user_id,
                folder_scopes=folder_scopes,
                file_ids=file_ids,
            )
            context_docs = docs + (extra_docs or [])
            answer = get_llm_response(
                query, context=context_docs, image=image, model=model
            )
            return {'answer': answer, 'retrieved_docs': context_docs}
        except Exception:
            outcome = 'error'
            raise
        finally:
            observe_rag_query(
                query_type=kind,
                status=outcome,
                duration_seconds=time.perf_counter() - t0,
                retrieved_docs_count=len(docs),
            )

Retrieve top-K most similar documents from Qdrant for a given query.

Parameters:

Name Type Description Default
query str

User text query.

required
top_k int

Number of top documents to retrieve. Defaults to 5.

5
user_id str | None

Filter results by user id. Defaults to None.

None
folder_scopes list[str] | None

Optional folder scope filter.

None
file_ids list[str] | None

Optional file filter.

None

Returns:

Type Description
List[Dict[str, str]]

A list of dictionaries, one per retrieved document chunk. Each dictionary contains 'text' (str), the text content of the chunk, and 'source' (str), the source file path or metadata for the chunk.

Source code in backend/core/multimodal_rag.py
def retrieve_data(
    self,
    query: str,
    top_k: int = 5,
    user_id: str | None = None,
    folder_scopes: list[str] | None = None,
    file_ids: list[str] | None = None,
) -> List[Dict[str, str]]:
    """Search the collection for the chunks most similar to the query.

    Args:
        query (str): User text query; embedded with ``text_embedding``.
        top_k (int, optional): Number of hits to request. Defaults to 5.
        user_id (str | None, optional): User filter passed to the search.
        folder_scopes (list[str] | None, optional): Folder filter passed to the search.
        file_ids (list[str] | None, optional): File filter passed to the search.

    Returns:
        List[Dict[str, str]]: One dict per hit with:
            - 'text' (str): The hit payload's text, or '' when absent.
            - 'source' (str): The hit payload's source, or '' when absent.
    """
    hits = self.client.search(
        query_vector=text_embedding(query),
        top_k=top_k,
        user_id=user_id,
        folder_scopes=folder_scopes,
        file_ids=file_ids,
    )
    payloads = (hit.get('payload', {}) for hit in hits)
    return [
        {
            'text': body.get('text', ''),
            'source': body.get('source', ''),
        }
        for body in payloads
    ]

Generate an answer using the local LLM based on the query and optionally an image.

Parameters:

Name Type Description Default
query str

User question or prompt.

required
top_k int

Number of top documents to retrieve for context. Defaults to 5.

5
image Optional[str]

Optional image path or URL to include in the prompt. Defaults to None.

None
user_id str | None

Filter context by user id. Defaults to None.

None
model str | None

LLM model name hint for backend routing.

None
folder_scopes list[str] | None

Optional folder filter for retrieval.

None
file_ids list[str] | None

Optional file filter for retrieval.

None
extra_docs list[dict[str, str]] | None

Extra context docs from attachments.

None

Returns:

Type Description
Dict[str, Any]

A dictionary with two keys: 'answer' (str), the generated text answer from the LLM, and 'retrieved_docs' (List[Dict[str, str]]), the list of documents used as context (retrieved documents followed by any extra attachment documents).

Source code in backend/core/multimodal_rag.py
def generate_answer(
    self,
    query: str,
    top_k: int = 5,
    image=None,
    user_id: str | None = None,
    model: str | None = None,
    folder_scopes: list[str] | None = None,
    file_ids: list[str] | None = None,
    extra_docs: list[dict[str, str]] | None = None,
) -> Dict[str, Any]:
    """Retrieve context for the query and ask the LLM for an answer.

    Every call, successful or not, is reported through ``observe_rag_query``
    with its duration, status and the number of retrieved (not extra)
    documents.

    Args:
        query (str): User question or prompt.
        top_k (int, optional): Number of documents to retrieve. Defaults to 5.
        image (optional): Image forwarded to the LLM; a truthy value marks the query as multimodal.
        user_id (str | None, optional): User filter for retrieval.
        model (str | None, optional): Model identifier forwarded to the LLM helper.
        folder_scopes (list[str] | None, optional): Folder filter for retrieval.
        file_ids (list[str] | None, optional): File filter for retrieval.
        extra_docs (list[dict[str, str]] | None, optional): Extra context docs appended after the retrieved ones.

    Returns:
        Dict[str, Any]: A dictionary with:
            - 'answer' (str): The text returned by the LLM helper.
            - 'retrieved_docs' (List[Dict[str, str]]): Retrieved plus extra documents used as context.
    """
    t0 = time.perf_counter()
    outcome = 'ok'
    kind = 'multimodal' if image else 'text'
    # Initialised up front so the metrics call works even if retrieval fails.
    docs: List[Dict[str, str]] = []

    try:
        # Imported at call time rather than at module import.
        from backend.core.llm import get_llm_response

        docs = self.retrieve_data(
            query,
            top_k,
            user_id=user_id,
            folder_scopes=folder_scopes,
            file_ids=file_ids,
        )
        context_docs = docs + (extra_docs or [])
        answer = get_llm_response(
            query, context=context_docs, image=image, model=model
        )
        return {'answer': answer, 'retrieved_docs': context_docs}
    except Exception:
        outcome = 'error'
        raise
    finally:
        observe_rag_query(
            query_type=kind,
            status=outcome,
            duration_seconds=time.perf_counter() - t0,
            retrieved_docs_count=len(docs),
        )

core/embedding_providers.py

Bases: ABC

Base interface for embedding providers.

Source code in backend/core/embedding_providers.py
class EmbeddingProvider(ABC):
    """Base interface for embedding providers.

    Declares one abstract encoder per supported modality (text, image,
    video). Every encoder is annotated to return the embedding as a plain
    ``list[float]``. Concrete subclasses must implement all three methods
    before they can be instantiated.
    """

    @abstractmethod
    def encode_text(self, text: str) -> list[float]:
        """Encode text into an embedding vector.

        Args:
            text (str): Input text to embed.

        Returns:
            list[float]: Embedding vector for the text.
        """

    @abstractmethod
    def encode_image(self, image: Image.Image) -> list[float]:
        """Encode image into an embedding vector.

        Args:
            image (Image.Image): Image object to embed.

        Returns:
            list[float]: Embedding vector for the image.
        """

    @abstractmethod
    def encode_video(
        self, video_path: str, sample_fps: float = 1.0
    ) -> list[float]:
        """Encode video into an embedding vector.

        Args:
            video_path (str): Location of the video to embed.
            sample_fps (float): Frame sampling rate. Defaults to 1.0.

        Returns:
            list[float]: Embedding vector for the video.
        """

Bases: EmbeddingProvider

SentenceTransformers-based provider for text, image and video.

Source code in backend/core/embedding_providers.py
class SentenceTransformerProvider(EmbeddingProvider):
    """Embedding provider backed by two SentenceTransformer models.

    One model embeds text; a second model embeds images and is reused for
    video frames.
    """

    def __init__(
        self,
        text_model_name: str = 'all-MiniLM-L6-v2',
        image_model_name: str = 'clip-ViT-B-32',
    ) -> None:
        """Load the text encoder and the image encoder.

        Args:
            text_model_name (str): SentenceTransformers model name for text.
            image_model_name (str): SentenceTransformers model name used for
                images and video frames.
        """
        self._text_model = SentenceTransformer(text_model_name)
        self._image_model = SentenceTransformer(image_model_name)

    def encode_text(self, text: str) -> list[float]:
        """Embed a text string with the text model.

        Args:
            text (str): Input text.

        Returns:
            list[float]: Text embedding converted to a Python list.
        """
        text_vector = self._text_model.encode(text)
        return text_vector.tolist()

    def encode_image(self, image: Image.Image) -> list[float]:
        """Embed a PIL image with the image model.

        Args:
            image (Image.Image): Source image; it is converted to RGB first.

        Returns:
            list[float]: Image embedding converted to a Python list.
        """
        image_vector = self._image_model.encode(
            image.convert('RGB'),
            convert_to_numpy=True,
        )
        return image_vector.tolist()

    def encode_video(
        self, video_path: str, sample_fps: float = 1.0
    ) -> list[float]:
        """Embed a video as the mean of its sampled frame embeddings.

        Args:
            video_path (str): Path to a local video file.
            sample_fps (float): Target frame sampling rate.

        Returns:
            list[float]: Mean of the per-frame embeddings.

        Raises:
            ValueError: If ``sample_fps`` is not positive, or if no frames
                could be read from the video.
        """
        if sample_fps <= 0:
            raise ValueError('sample_fps must be > 0')

        # Imported lazily so text-only workloads never load the video stack.
        from torchvision.io import read_video

        all_frames, _, metadata = read_video(video_path, pts_unit='sec')
        if all_frames.numel() == 0:
            raise ValueError(f'No frames extracted from video: {video_path}')

        # Fall back to 1 fps when the reported frame rate is missing or zero.
        native_fps = float(metadata.get('video_fps') or 1.0)
        # Keep every `step`-th frame to approximate the requested rate.
        step = max(int(round(native_fps / sample_fps)), 1)
        kept_frames = all_frames[::step]
        if kept_frames.numel() == 0:
            kept_frames = all_frames[:1]

        rgb_frames = []
        for frame_tensor in kept_frames:
            frame_image = Image.fromarray(frame_tensor.numpy())
            rgb_frames.append(frame_image.convert('RGB'))

        per_frame_vectors = self._image_model.encode(
            rgb_frames,
            convert_to_numpy=True,
        )
        # Average over the frame axis to get one vector for the clip.
        return np.mean(per_frame_vectors, axis=0).tolist()

Resolve embedding provider by name.

Source code in backend/core/embedding_providers.py
def get_provider(name: str | None = None) -> EmbeddingProvider:
    """Return the embedding provider registered under a name.

    Providers are built on first request and cached in
    ``_PROVIDER_INSTANCES``, so later calls with the same name return the
    same instance.

    Args:
        name (str | None): Provider name. A falsy value selects
            ``Config.default_embedding_provider``.

    Returns:
        EmbeddingProvider: The cached or newly built provider.
    """
    provider_key = name or Config.default_embedding_provider
    if provider_key in _PROVIDER_INSTANCES:
        return _PROVIDER_INSTANCES[provider_key]

    provider = _build_provider(provider_key)
    _PROVIDER_INSTANCES[provider_key] = provider
    return provider

utils/load_data.py

Unified loader for text documents and images with optional Qdrant upsert.

Source code in backend/utils/load_data.py
class DataLoader:
    """Unified loader for text documents and images with optional Qdrant upsert.

    The helpers here cover reading supported files, splitting text into
    overlapping chunks, and listing image files in a folder.
    """

    @staticmethod
    def chunk_text(
        text: str,
        size: int = Config.chunk_size,
        overlap: int = Config.chunk_overlap,
    ) -> List[str]:
        """Split text into fixed-size chunks that overlap their neighbours.

        Args:
            text (str): Text to split.
            size (int): Maximum chunk length in characters.
            overlap (int): Number of characters shared by consecutive chunks.

        Returns:
            List[str]: Chunks in order; empty when ``text`` is empty.

        Raises:
            ValueError: If ``size`` is not positive, ``overlap`` is negative,
                or ``overlap`` is not smaller than ``size``.
        """
        if size <= 0:
            raise ValueError('chunk_size must be greater than 0')
        if overlap < 0:
            raise ValueError('chunk_overlap must be non-negative')
        if overlap >= size:
            raise ValueError('chunk_overlap must be smaller than chunk_size')

        # Each chunk starts `size - overlap` characters after the previous.
        step = size - overlap
        return [
            text[offset:offset + size]
            for offset in range(0, len(text), step)
        ]

    @staticmethod
    def load_file(file_path: Path) -> str:
        """Read the text content of one PDF or plain-text file.

        Args:
            file_path (Path): File to read. The loader is chosen by comparing
                the lowercased suffix with ``Config.pdf_extensions`` and
                ``Config.text_extensions``.

        Returns:
            str: Content of all loaded documents joined by newlines, or an
                empty string when the file type is unsupported.
        """
        suffix = file_path.suffix.lower()
        if suffix in Config.pdf_extensions:
            from langchain_community.document_loaders import PyPDFLoader

            documents = PyPDFLoader(str(file_path)).load()
        elif suffix in Config.text_extensions:
            from langchain_community.document_loaders import TextLoader

            documents = TextLoader(str(file_path)).load()
        else:
            logger.info(f'Skipped unsupported file {file_path.name}')
            return ''
        return '\n'.join(document.page_content for document in documents)

    @staticmethod
    def list_images(folder: Path) -> List[Path]:
        """Collect the image paths directly inside a folder.

        Args:
            folder (Path): Directory to scan (not recursive).

        Returns:
            List[Path]: Entries whose lowercased suffix is in
                ``Config.image_extensions``.
        """
        image_paths = []
        for entry in folder.glob('*'):
            if entry.suffix.lower() in Config.image_extensions:
                image_paths.append(entry)
        return image_paths

    def process_file(self, file_path: Path) -> List[str]:
        """Load one text document and split it into chunks.

        Args:
            file_path (Path): Document to process.

        Returns:
            List[str]: Text chunks, or an empty list when no text was loaded.
        """
        content = self.load_file(file_path)
        return self.chunk_text(content) if content else []

monitoring/metrics.py

Observe common HTTP metrics.

Source code in backend/monitoring/metrics.py
def observe_http_request(
    method: str,
    path: str,
    status: int,
    duration_seconds: float,
) -> None:
    """Record the request count and duration for one HTTP request.

    Args:
        method (str): HTTP method, used as a metric label.
        path (str): Request path; passed through ``normalize_http_path``
            before being used as a label.
        status (int): Response status code, stored as a string label.
        duration_seconds (float): Request duration in seconds.
    """
    route = normalize_http_path(path)

    request_counter = HTTP_REQUESTS_TOTAL.labels(
        method=method,
        path=route,
        status=str(status),
    )
    request_counter.inc()

    duration_metric = HTTP_REQUEST_DURATION_SECONDS.labels(
        method=method,
        path=route,
    )
    duration_metric.observe(duration_seconds)

Observe RAG query metrics.

Source code in backend/monitoring/metrics.py
def observe_rag_query(
    query_type: str,
    status: str,
    duration_seconds: float,
    retrieved_docs_count: int,
) -> None:
    """Record count, duration and retrieved-document metrics for a RAG query.

    Args:
        query_type (str): Query kind, used as a metric label.
        status (str): Outcome label for the query counter.
        duration_seconds (float): Query duration in seconds.
        retrieved_docs_count (int): Number of documents retrieved.
    """
    query_counter = RAG_QUERIES_TOTAL.labels(
        query_type=query_type,
        status=status,
    )
    query_counter.inc()

    duration_metric = RAG_QUERY_DURATION_SECONDS.labels(query_type=query_type)
    duration_metric.observe(duration_seconds)

    docs_metric = RAG_RETRIEVED_DOCS.labels(query_type=query_type)
    docs_metric.observe(retrieved_docs_count)

Observe embedding generation metrics.

Source code in backend/monitoring/metrics.py
def observe_embedding_request(
    modality: str,
    provider: str,
    status: str,
    duration_seconds: float,
) -> None:
    """Record the request count and duration for one embedding request.

    Args:
        modality (str): Input modality, used as a metric label.
        provider (str): Embedding provider name, used as a metric label.
        status (str): Outcome label for the request counter.
        duration_seconds (float): Embedding duration in seconds.
    """
    request_counter = EMBEDDING_REQUESTS_TOTAL.labels(
        modality=modality,
        provider=provider,
        status=status,
    )
    request_counter.inc()

    duration_metric = EMBEDDING_DURATION_SECONDS.labels(
        modality=modality,
        provider=provider,
    )
    duration_metric.observe(duration_seconds)