Source code for rossum_mcp.tools.get.related

"""include_related fetchers for enriched get responses.

Only meaningful for a few entities. Uses asyncio.gather for parallel fetches.
"""

from __future__ import annotations

import asyncio
import logging
from typing import TYPE_CHECKING

from fastmcp.exceptions import ToolError
from rossum_api import APIClientError
from rossum_api.domain_logic.resources import Resource
from rossum_api.models import deserialize_default
from rossum_api.models.engine import Engine
from rossum_api.models.hook import Hook
from rossum_api.models.queue import Queue

from rossum_mcp.tools.base import build_filters, extract_id_from_url, graceful_list
from rossum_mcp.tools.get.schemas import extract_schema_tree
from rossum_mcp.tools.search.registry import _list_hooks

if TYPE_CHECKING:
    from collections.abc import Awaitable, Callable

    from rossum_api import AsyncRossumAPIClient

logger = logging.getLogger(__name__)






async def _get_queue_engine(client: AsyncRossumAPIClient, queue_id: int) -> Engine | dict:
    """Retrieve the engine assigned to a queue."""
    logger.debug(f"Retrieving queue engine: queue_id={queue_id}")
    queue: Queue = await client.retrieve_queue(queue_id)

    engine_url = None
    if queue.dedicated_engine:
        engine_url = queue.dedicated_engine
    elif queue.generic_engine:
        engine_url = queue.generic_engine
    elif queue.engine:
        engine_url = queue.engine

    if not engine_url:
        return {"message": "No engine assigned to this queue"}

    try:
        if isinstance(engine_url, str):
            engine_id = extract_id_from_url(engine_url)
            engine: Engine = await client.retrieve_engine(engine_id)
        else:
            engine = deserialize_default(Resource.Engine, engine_url)
    except APIClientError as e:
        if e.status_code == 404:
            return {"message": f"Engine not found (engine URL: {engine_url})"}
        raise

    return engine


async def _get_schema_tree_structure(
    client: AsyncRossumAPIClient, schema_id: int | None = None, queue_id: int | None = None
) -> list[dict]:
    """Get lightweight schema tree structure. Used by related fetchers."""
    # Import here to avoid circular import: get.related → get.registry → search.registry → ... → get.related
    from rossum_mcp.tools.get.registry import _get_schema  # noqa: PLC0415 - circular import avoidance

    if schema_id is None and queue_id is None:
        raise ToolError("Provide schema_id or queue_id")
    if schema_id is not None and queue_id is not None:
        raise ToolError("Provide schema_id or queue_id, not both")
    if queue_id:
        queue = await client.retrieve_queue(queue_id)
        schema_id = extract_id_from_url(queue.schema)
    schema = await _get_schema(client, schema_id)  # type: ignore[arg-type]
    return extract_schema_tree(schema)


async def _fetch_queue_related(
    client: AsyncRossumAPIClient, queue_id: int, _obj: object | None = None
) -> dict[str, object]:
    schema_tree, engine, hooks = await asyncio.gather(
        _get_schema_tree_structure(client, queue_id=queue_id),
        _get_queue_engine(client, queue_id),
        _list_hooks(client, queue_id=queue_id),
        return_exceptions=True,
    )

    related: dict[str, object] = {}

    if isinstance(schema_tree, BaseException):
        logger.warning(f"Failed to fetch schema tree for queue {queue_id}: {schema_tree}")
    else:
        related["schema_tree"] = schema_tree

    if isinstance(engine, BaseException):
        logger.warning(f"Failed to fetch engine for queue {queue_id}: {engine}")
    else:
        related["engine"] = engine

    if isinstance(hooks, BaseException):
        logger.warning(f"Failed to fetch hooks for queue {queue_id}: {hooks}")
    else:
        related["hooks"] = [{"id": h.id, "name": h.name, "active": h.active} for h in hooks]
        related["hooks_count"] = len(hooks)

    return related


async def _fetch_schema_related(
    client: AsyncRossumAPIClient, schema_id: int, _obj: object | None = None
) -> dict[str, object]:
    queue_result, rules_result = await asyncio.gather(
        graceful_list(client, Resource.Queue, "queue", **build_filters(schema=schema_id)),
        graceful_list(client, Resource.Rule, "rule", **build_filters(schema=schema_id)),
    )

    return {
        "queues": [q.url for q in queue_result.items],
        "rules": [{"id": r.id, "name": r.name, "enabled": r.enabled} for r in rules_result.items],
    }


async def _fetch_hook_related(
    client: AsyncRossumAPIClient, hook_id: int, obj: object | None = None
) -> dict[str, object]:
    # Use the already-fetched hook if available; avoids a redundant network call.
    hook = obj if isinstance(obj, Hook) else await client.retrieve_hook(hook_id)
    return {
        "queues": list(hook.queues) if hook.queues else [],
        "events": list(hook.events) if hook.events else [],
    }


RELATED_FETCHERS: dict[str, Callable[..., Awaitable[dict[str, object]]]] = {
    "queue": _fetch_queue_related,
    "schema": _fetch_schema_related,
    "hook": _fetch_hook_related,
}