Source code for rossum_mcp.tools.update.handler
from __future__ import annotations
from typing import TYPE_CHECKING
from rossum_api.models.engine import Engine
from rossum_api.models.hook import Hook, HookAction, HookEvent, HookEventAndAction
from rossum_api.models.queue import Queue
from rossum_api.models.rule import Rule, RuleAction
from rossum_api.models.user import User
from rossum_mcp.tools.models import (
HookSideload, # noqa: TC001 - needed at runtime for FastMCP parameter serialization
SchemaNode, # noqa: TC001 - needed at runtime for FastMCP parameter serialization
)
from rossum_mcp.tools.update.annotations import _bulk_update_annotation_fields, _confirm_annotation, _start_annotation
from rossum_mcp.tools.update.engines import _update_engine
from rossum_mcp.tools.update.hooks import _test_hook, _update_hook
from rossum_mcp.tools.update.models import ( # noqa: TC001 - needed at runtime for FastMCP parameter serialization
EngineUpdateData,
QueueUpdateData,
SchemaNodeUpdate,
)
from rossum_mcp.tools.update.queues import _update_queue
from rossum_mcp.tools.update.rules import _patch_rule
from rossum_mcp.tools.update.schemas.handler import _patch_schema, _prune_schema_fields
from rossum_mcp.tools.update.schemas.patching import (
PatchOperation, # noqa: TC001 - needed at runtime for FastMCP parameter serialization
)
from rossum_mcp.tools.update.users import _update_user
if TYPE_CHECKING:
from fastmcp import FastMCP
from rossum_api import AsyncRossumAPIClient
[docs]
def register_update_tools(mcp: FastMCP, client: AsyncRossumAPIClient, base_url: str) -> None:
# --- Annotations ---
@mcp.tool(
description="Set annotation status to 'reviewing' (from 'to_review').",
tags={"annotations", "write"},
annotations={"readOnlyHint": False},
)
async def start_annotation(annotation_id: int) -> dict:
return await _start_annotation(client, annotation_id)
@mcp.tool(
description="Bulk update extracted fields. Requires annotation in 'reviewing' status. Use datapoint IDs from content, not schema_id.",
tags={"annotations", "write"},
annotations={"readOnlyHint": False},
)
async def bulk_update_annotation_fields(annotation_id: int, operations: list[dict]) -> dict:
return await _bulk_update_annotation_fields(client, annotation_id, operations)
@mcp.tool(
description="Set annotation status to 'confirmed' (typically after field updates).",
tags={"annotations", "write"},
annotations={"readOnlyHint": False},
)
async def confirm_annotation(annotation_id: int) -> dict:
return await _confirm_annotation(client, annotation_id)
# --- Queues ---
@mcp.tool(
description="Update queue settings.",
tags={"queues", "write"},
annotations={"readOnlyHint": False},
)
async def update_queue(queue_id: int, queue_data: QueueUpdateData) -> Queue | dict:
return await _update_queue(client, queue_id, queue_data)
# --- Schemas ---
@mcp.tool(
description="Patch schema nodes (add/update/remove). Prereq: load schema-patching skill. Ops: add (parent_id + node_data), update (node_id + node_data), remove (node_id only). Tuple datapoints require explicit id; section-level datapoints use the passed node_id.",
tags={"schemas", "write"},
annotations={"readOnlyHint": False},
)
async def patch_schema(
schema_id: int,
operation: PatchOperation,
node_id: str,
node_data: SchemaNode | SchemaNodeUpdate | None = None,
parent_id: str | None = None,
position: int | None = None,
) -> dict:
return await _patch_schema(client, schema_id, operation, node_id, node_data, parent_id, position)
@mcp.tool(
description="Remove many fields at once. Provide fields_to_keep (keep only these leaf IDs; parent containers preserved automatically; list section IDs to preserve them as empty containers) or fields_to_remove (remove these leaf IDs). Returns {removed_fields, remaining_fields}.",
tags={"schemas", "write"},
annotations={"readOnlyHint": False},
)
async def prune_schema_fields(
schema_id: int,
fields_to_keep: list[str] | None = None,
fields_to_remove: list[str] | None = None,
) -> dict:
return await _prune_schema_fields(client, schema_id, fields_to_keep, fields_to_remove)
# --- Engines ---
@mcp.tool(
description="Update engine settings.",
tags={"engines", "write"},
annotations={"readOnlyHint": False},
)
async def update_engine(engine_id: int, engine_data: EngineUpdateData) -> Engine | dict:
return await _update_engine(client, engine_id, engine_data)
# --- Hooks ---
@mcp.tool(
description="Patch a hook; only provided fields change. secrets is a dict of key-value env vars for serverless functions (write-only, values never returned). token_owner is a User URL for API token generation (cannot be organization_group_admin). run_after is a list of hook URLs that must execute before this hook. sideload controls which related objects are included in hook request payloads.",
tags={"hooks", "write"},
annotations={"readOnlyHint": False},
)
async def update_hook(
hook_id: int,
name: str | None = None,
queues: list[str] | None = None,
events: list[HookEventAndAction] | None = None,
config: dict | None = None,
settings: dict | None = None,
active: bool | None = None,
secrets: dict[str, str] | None = None,
token_owner: str | None = None,
run_after: list[str] | None = None,
sideload: list[HookSideload] | None = None,
) -> Hook:
return await _update_hook(
client, hook_id, name, queues, events, config, settings, active, secrets, token_owner, run_after, sideload
)
@mcp.tool(
description="Test a hook by auto-generating a realistic payload and executing it. For annotation_content/annotation_status events, annotation and status are auto-resolved from the hook's queues if not provided. If no annotations exist on the hook's queues, ask the user to upload a document first — never upload documents yourself.",
tags={"hooks", "write"},
annotations={"readOnlyHint": False},
)
async def test_hook(
hook_id: int,
event: HookEvent,
action: HookAction,
annotation: str | None = None,
status: str | None = None,
previous_status: str | None = None,
config: dict | None = None,
) -> dict:
return await _test_hook(client, hook_id, event, action, annotation, status, previous_status, config)
# --- Rules ---
@mcp.tool(
description="Patch a rule (PATCH); only provided fields change. queue_ids=[] clears queue scoping.",
tags={"rules", "write"},
annotations={"readOnlyHint": False},
)
async def patch_rule(
rule_id: int,
name: str | None = None,
trigger_condition: str | None = None,
actions: list[RuleAction] | None = None,
enabled: bool | None = None,
queue_ids: list[int] | None = None,
) -> Rule | dict:
return await _patch_rule(client, base_url, rule_id, name, trigger_condition, actions, enabled, queue_ids)
# --- Users ---
@mcp.tool(
description="Patch a user; only provided fields change. Use list_user_roles for role/group URLs.",
tags={"users", "write"},
annotations={"readOnlyHint": False},
)
async def update_user(
user_id: int,
username: str | None = None,
email: str | None = None,
first_name: str | None = None,
last_name: str | None = None,
queues: list[str] | None = None,
groups: list[str] | None = None,
is_active: bool | None = None,
metadata: dict | None = None,
oidc_id: str | None = None,
auth_type: str | None = None,
ui_settings: dict | None = None,
) -> User | dict:
return await _update_user(
client,
user_id,
username,
email,
first_name,
last_name,
queues,
groups,
is_active,
metadata,
oidc_id,
auth_type,
ui_settings,
)