Source code for rossum_mcp.tools.get.handler
from __future__ import annotations
import asyncio
import dataclasses
import logging
from enum import StrEnum
from typing import TYPE_CHECKING
from fastmcp.exceptions import ToolError
from rossum_api.models.engine import EngineField
from rossum_mcp.tools.get.annotations import _get_annotation_content
from rossum_mcp.tools.get.engines import _get_engine_fields
from rossum_mcp.tools.get.registry import EntityConfig, build_get_registry
from rossum_mcp.tools.get.related import _get_schema_tree_structure, fetch_related
from rossum_mcp.tools.search.models import (
SearchQuery, # noqa: TC001 - needed at runtime for FastMCP parameter serialization
)
from rossum_mcp.tools.search.registry import extract_search_kwargs
if TYPE_CHECKING:
from fastmcp import FastMCP
from rossum_api import AsyncRossumAPIClient
logger = logging.getLogger(__name__)
[docs]
class EntityType(StrEnum):
QUEUE = "queue"
SCHEMA = "schema"
HOOK = "hook"
ENGINE = "engine"
RULE = "rule"
USER = "user"
WORKSPACE = "workspace"
EMAIL_TEMPLATE = "email_template"
ORGANIZATION_GROUP = "organization_group"
ANNOTATION = "annotation"
RELATION = "relation"
DOCUMENT_RELATION = "document_relation"
ORGANIZATION_LIMIT = "organization_limit"
HOOK_SECRETS_KEYS = "hook_secrets_keys"
def _serialize(obj: object) -> object:
if dataclasses.is_dataclass(obj) and not isinstance(obj, type):
return dataclasses.asdict(obj)
return obj
async def _get_one(
client: AsyncRossumAPIClient,
config: EntityConfig,
entity: str,
entity_id: int,
include_related: bool,
) -> dict[str, object]:
if config.retrieve_fn is None:
raise RuntimeError(f"Entity '{entity}' has no retrieve_fn — use search instead")
result = await config.retrieve_fn(entity_id)
data = _serialize(result)
response: dict[str, object] = {"entity": entity, "id": entity_id, "data": data}
if include_related:
related = await fetch_related(client, entity, entity_id, result)
if related:
response["_related"] = related
return response
async def _get_many(
client: AsyncRossumAPIClient,
config: EntityConfig,
entity: str,
entity_ids: list[int],
include_related: bool,
) -> list[dict[str, object]]:
tasks = [_get_one(client, config, entity, eid, include_related) for eid in entity_ids]
return list(await asyncio.gather(*tasks))
[docs]
def register_get_tools(mcp: FastMCP, client: AsyncRossumAPIClient) -> None: # noqa: C901 - many tool registrations
registry = build_get_registry(client)
# Fail fast at startup if EntityType drifts from the registry
for _entity in EntityType:
if _entity not in registry or registry[_entity].retrieve_fn is None:
raise RuntimeError(
f"EntityType member '{_entity}' is missing from registry or has no retrieve_fn — "
"update EntityType or build_get_registry to keep them in sync"
)
@mcp.tool(
description=(
"Get entities by ID. Accepts a single ID or a list of IDs for batch retrieval. "
"include_related=True enriches with related data (queue→schema_tree+engine+hooks, schema→queues+rules, hook→queues+events). "
"hook_secrets_keys returns stored secret key names (values are write-only, never returned)."
),
tags={"read"},
annotations={"readOnlyHint": True},
)
async def get(
entity: EntityType, entity_id: int | list[int], include_related: bool = False
) -> dict[str, object] | list[dict[str, object]]:
config = registry.get(entity)
if config is None:
raise ToolError(f"Unknown entity type: {entity}")
if config.retrieve_fn is None:
raise ToolError(f"Entity '{entity}' does not support get by ID. Use search instead.")
if isinstance(entity_id, list):
return await _get_many(client, config, entity, entity_id, include_related)
return await _get_one(client, config, entity, entity_id, include_related)
@mcp.tool(
description="Search/list entities with typed, entity-specific filters. Pass a query object with `entity` discriminator.",
tags={"read"},
annotations={"readOnlyHint": True},
)
async def search(query: SearchQuery) -> list[object]:
entity = query.entity
config = registry.get(entity)
if config is None:
raise ToolError(f"Unknown entity type: {entity}")
if config.search_fn is None:
raise ToolError(f"Entity '{entity}' does not support search/list.")
kwargs = extract_search_kwargs(query)
result = await config.search_fn(**kwargs)
return [_serialize(item) for item in result]
@mcp.tool(
description="Fetch annotation extracted content and save to a local JSON file; returns the path for jq/grep.",
tags={"annotations"},
annotations={"readOnlyHint": True},
)
async def get_annotation_content(annotation_id: int) -> dict:
return await _get_annotation_content(client, annotation_id)
@mcp.tool(
description="Lightweight schema tree (ids/labels/categories/types/required/hidden); accepts schema_id or queue_id.",
tags={"schemas"},
annotations={"readOnlyHint": True},
)
async def get_schema_tree_structure(schema_id: int | None = None, queue_id: int | None = None) -> list[dict]:
return await _get_schema_tree_structure(client, schema_id=schema_id, queue_id=queue_id)
@mcp.tool(
description="Retrieve engine fields for a specific engine or all engine fields.",
tags={"engines"},
annotations={"readOnlyHint": True},
)
async def get_engine_fields(engine_id: int | None = None) -> list[EngineField]:
return await _get_engine_fields(client, engine_id)