Source code for rossum_mcp.tools.create.handler

from __future__ import annotations

from collections.abc import Sequence  # noqa: TC003 - needed at runtime for FastMCP
from typing import TYPE_CHECKING

from rossum_api.models.email_template import EmailTemplate
from rossum_api.models.engine import Engine, EngineField, EngineFieldType
from rossum_api.models.hook import Hook, HookEventAndAction, HookType
from rossum_api.models.queue import Queue
from rossum_api.models.rule import Rule, RuleAction
from rossum_api.models.user import User
from rossum_api.models.workspace import Workspace

from rossum_mcp.tools.create.annotations import _copy_annotations, _upload_document
from rossum_mcp.tools.create.email_templates import _create_email_template
from rossum_mcp.tools.create.engines import _create_engine, _create_engine_field
from rossum_mcp.tools.create.hooks import _create_hook, _create_hook_from_template
from rossum_mcp.tools.create.queues import _create_queue_from_template
from rossum_mcp.tools.create.rules import _create_rule
from rossum_mcp.tools.create.users import _create_user
from rossum_mcp.tools.create.workspaces import _create_workspace
from rossum_mcp.tools.models import (
    EmailRecipient,
    EmailTemplateType,
    EngineType,
    HookSideload,
    QueueTemplateName,
)

if TYPE_CHECKING:
    from fastmcp import FastMCP
    from rossum_api import AsyncRossumAPIClient


[docs] def register_create_tools(mcp: FastMCP, client: AsyncRossumAPIClient, base_url: str) -> None: # --- Annotations --- @mcp.tool( description="Upload a document; use search(entity='annotation', queue_id=...) to find the created annotation.", tags={"annotations", "write"}, annotations={"readOnlyHint": False}, ) async def upload_document(file_path: str, queue_id: int) -> dict: return await _upload_document(client, file_path, queue_id) @mcp.tool( description="Copy annotations to another queue. reimport=True re-extracts data in the target queue (use when moving/uploading documents between queues). reimport=False preserves original extracted data as-is.", tags={"annotations", "write"}, annotations={"readOnlyHint": False}, ) async def copy_annotations( annotation_ids: Sequence[int], target_queue_id: int, target_status: str | None = None, reimport: bool = False, ) -> dict: return await _copy_annotations(client, base_url, annotation_ids, target_queue_id, target_status, reimport) # --- Queues --- @mcp.tool( description="Create a queue from a template (includes schema + engine defaults).", tags={"queues", "write"}, annotations={"readOnlyHint": False}, ) async def create_queue_from_template( name: str, template_name: QueueTemplateName, workspace_id: int, include_documents: bool = False, engine_id: int | None = None, ) -> Queue | dict: return await _create_queue_from_template( client, base_url, name, template_name, workspace_id, include_documents, engine_id ) # --- Engines --- @mcp.tool( description="Create an engine; create matching engine fields for the target schema immediately after.", tags={"engines", "write"}, annotations={"readOnlyHint": False}, ) async def create_engine(name: str, organization_id: int, engine_type: EngineType) -> Engine | dict: return await _create_engine(client, base_url, name, organization_id, engine_type) @mcp.tool( description="Create an engine field corresponding to a schema field (used during engine+schema setup).", tags={"engines", "write"}, annotations={"readOnlyHint": False}, ) async def create_engine_field( engine_id: int, name: str, label: str, field_type: EngineFieldType, schema_ids: list[int], tabular: bool = False, multiline: bool = False, subtype: str | None = None, pre_trained_field_id: str | None = None, ) -> EngineField | dict: return await _create_engine_field( client, base_url, engine_id, name, label, field_type, schema_ids, tabular, multiline, subtype, pre_trained_field_id, ) # --- Hooks --- @mcp.tool( description="Create a hook. Function hooks: config.source auto-renamed to config.code, default runtime python3.12, timeout_s capped at 60. secrets is a dict of key-value env vars for serverless functions (write-only, values never returned). token_owner is a User URL for API token generation (cannot be organization_group_admin). run_after is a list of hook URLs that must execute before this hook. sideload controls which related objects are included in hook request payloads.", tags={"hooks", "write"}, annotations={"readOnlyHint": False}, ) async def create_hook( name: str, type: HookType, queues: list[str] | None = None, events: list[HookEventAndAction] | None = None, config: dict | None = None, settings: dict | None = None, secrets: dict[str, str] | None = None, token_owner: str | None = None, run_after: list[str] | None = None, sideload: list[HookSideload] | None = None, ) -> Hook: return await _create_hook( client, name, type, queues, events, config, settings, secrets, token_owner, run_after, sideload ) @mcp.tool( description="Create a hook from a template; events may override template defaults. If template requires use_token_owner, provide token_owner (not an organization_group_admin user).", tags={"hooks", "write"}, annotations={"readOnlyHint": False}, ) async def create_hook_from_template( name: str, hook_template_id: int, queues: list[str], events: list[HookEventAndAction] | None = None, token_owner: str | None = None, ) -> Hook: return await _create_hook_from_template(client, name, hook_template_id, queues, events, token_owner) # --- Rules --- @mcp.tool( description="Create a rule: trigger is a TxScript condition; action includes id, type, event, payload. Scope with schema_id and/or queue_ids (at least one required).", tags={"rules", "write"}, annotations={"readOnlyHint": False}, ) async def create_rule( name: str, trigger_condition: str, actions: list[RuleAction], enabled: bool = True, schema_id: int | None = None, queue_ids: list[int] | None = None, ) -> Rule | dict: return await _create_rule(client, base_url, name, trigger_condition, actions, enabled, schema_id, queue_ids) # --- Users --- @mcp.tool( description="Create a user (requires username + email). Use list_user_roles for role/group URLs; queue/group fields take full API URLs.", tags={"users", "write"}, annotations={"readOnlyHint": False}, ) async def create_user( username: str, email: str, queues: list[str] | None = None, groups: list[str] | None = None, first_name: str | None = None, last_name: str | None = None, is_active: bool = True, metadata: dict | None = None, oidc_id: str | None = None, auth_type: str = "password", ) -> User | dict: return await _create_user( client, username, email, queues, groups, first_name, last_name, is_active, metadata, oidc_id, auth_type ) # --- Workspaces --- @mcp.tool( description="Create a new workspace.", tags={"workspaces", "write"}, annotations={"readOnlyHint": False}, ) async def create_workspace(name: str, organization_id: int, metadata: dict | None = None) -> Workspace | dict: return await _create_workspace(client, base_url, name, organization_id, metadata) # --- Email Templates --- @mcp.tool( description="Create an email template; set automate=true for automatic sending. to/cc/bcc are recipient objects {type: annotator|constant|datapoint, value: ...}.", tags={"email_templates", "write"}, annotations={"readOnlyHint": False}, ) async def create_email_template( name: str, queue: int, subject: str, message: str, type: EmailTemplateType = EmailTemplateType.CUSTOM, automate: bool = False, to: list[EmailRecipient] | None = None, cc: list[EmailRecipient] | None = None, bcc: list[EmailRecipient] | None = None, triggers: list[str] | None = None, ) -> EmailTemplate | dict: return await _create_email_template( client, base_url, name, queue, subject, message, type, automate, to, cc, bcc, triggers )