Robutler

Message Router

The Message Router is a central hub for capability-based message routing in WebAgents. It enables automatic wiring of handlers based on declared capabilities, supports custom event types, and provides extensibility through hooks.

Overview

The router provides:

  • Auto-wiring — handlers declare subscribes and produces, the router wires them automatically.
  • Priority-based selection — preferred handlers run first.
  • Loop prevention — three-layer protection (source tracking, seen set, TTL).
  • Observers — non-consuming listeners for logging / analytics.
  • System events — control flow (stop, cancel, error, ping/pong).
  • Extensibility hooksonUnroutable, onError, beforeRoute, afterRoute.

Basic Usage

import { MessageRouter, BufferSink } from 'webagents';
import type { UAMPEvent, RouterContext } from 'webagents';

const router = new MessageRouter();

async function* processText(event: UAMPEvent, ctx: RouterContext) {
  yield {
    id: 'resp-1',
    type: 'response.delta',
    payload: { text: 'Hello!' },
  } satisfies UAMPEvent;
}

router.registerHandler({
  name: 'text-handler',
  subscribes: ['input.text'],
  produces: ['response.delta'],
  priority: 0,
  process: processText,
});

router.setDefault('text-handler');

const sink = new BufferSink();
router.registerSink(sink);
router.setActiveSink(sink.id);

await router.send({
  id: 'msg-1',
  type: 'input.text',
  payload: { text: 'Hello' },
});

console.log(sink.getEvents());

Handler Declaration

Using @handoff

import { Skill, handoff } from 'webagents';

class MySkill extends Skill {
  readonly name = 'my-skill';

  @handoff({
    name: 'my-handler',
    subscribes: ['input.text'],
    produces: ['response.delta'],
    priority: 50,
  })
  async *process(events) {
    yield { type: 'response.delta', delta: 'Response' } as const;
  }
}

Regex Pattern Matching

@handoff({
  name: 'translator',
  subscribes: [/^translate\..+$/],   // matches translate.en, translate.fr
  produces: ['response.delta'],
})
async *translate(events) {
  // event.type might be 'translate.en', 'translate.es', etc.
  yield { type: 'response.delta', delta: '...' } as const;
}

Default Values

ParameterDefaultDescription
subscribes['input.text']Most handlers process text
produces['response.delta']Most handlers stream responses
priority50 (Python) / 0 (TS)Lower runs first; in TS with priority 0 and the higher-priority interpretation, see router.ts

Observers

Observers receive copies of events without consuming them:

import { Skill, observe } from 'webagents';

class LoggingSkill extends Skill {
  readonly name = 'logging';

  @observe({ name: 'message-logger', subscribes: ['*'] })
  async logMessages(event) {
    console.log(`[${event.type}]`, event.payload);
    // Does NOT consume — message continues to handlers
  }
}

Transport Sinks

CallbackSink

import { CallbackSink } from 'webagents';

const events: unknown[] = [];
const sink = new CallbackSink((e) => events.push(e));
router.registerSink(sink);

BufferSink

import { BufferSink } from 'webagents';

const sink = new BufferSink({ maxSize: 100 });
router.registerSink(sink);

const allEvents = sink.getEvents();

Loop Prevention

The router implements three-layer protection:

  1. Source tracking — messages carry their source handler; the router won't route back to the producer.
  2. Seen set — tracks which handlers have already processed a message.
  3. TTL (Time-to-Live) — maximum hops a message can traverse (default: 10).

Extensibility Hooks

Error Handling

router.onError(async (error, event, handler, context) => {
  console.error(`Handler ${handler.name} failed:`, error);
});

Unroutable Events

router.onUnroutable(async (event, context) => {
  console.warn(`No handler for ${event.type}`);
});

Interceptors

router.beforeRoute(async (event, handler, context) => {
  if (isBlocked(event)) return null;  // Block
  return event;                       // Continue
});

router.afterRoute(async (event, handler, context) => {
  logMetric('routed', handler.name);
  return event;
});

function isBlocked(_: unknown) { return false; }
function logMetric(_: string, __: string) {}

System Events

EventDescription
system.errorError occurred during processing
system.stopRequest to stop current processing
system.cancelCancel and cleanup resources
system.pingKeep-alive request
system.pongKeep-alive response
system.unroutableNo handler found for message

Backward Compatibility (Python)

The new subscribes / produces parameters are optional in Python. Existing code works unchanged.

# Before (still works)
@handoff(name='my-handler', priority=10)
async def process(self, messages, **kwargs):
    pass

# Equivalent to:
@handoff(
    name='my-handler',
    priority=10,
    subscribes=['input.text'],
    produces=['response.delta'],
)
async def process(self, messages, **kwargs):
    pass

API Reference

UAMPEvent

interface UAMPEvent {
  id: string;
  type: string;
  payload: Record<string, unknown>;
  source?: string;       // Handler that produced this
  ttl?: number;          // Time-to-live
  seen?: string[];       // Handlers that processed this
}

Handler

interface Handler {
  name: string;
  subscribes: (string | RegExp)[];
  produces: string[];
  priority?: number;
  process: (event: UAMPEvent, context?: RouterContext) => AsyncGenerator<UAMPEvent>;
}

TransportSink

abstract class TransportSink {
  readonly id: string;
  readonly isActive: boolean;
  abstract send(event: ServerEvent): Promise<void>;
  abstract close(): void;
}

MessageRouter

class MessageRouter {
  send(event: UAMPEvent, context?: RouterContext): Promise<void>;
  registerHandler(handler: Handler): void;
  unregisterHandler(name: string): void;
  registerObserver(observer: Observer): void;
  unregisterObserver(name: string): void;
  route(eventType: string, handlerName: string, priority?: number): void;
  registerSink(sink: TransportSink): void;
  registerDefaultSink(sink: TransportSink): void;
  unregisterSink(sinkId: string): void;
  setActiveSink(sinkId: string): void;
  setDefault(handlerName: string): void;
  onUnroutable(handler: Function): void;
  onError(handler: Function): void;
  beforeRoute(interceptor: Function): void;
  afterRoute(interceptor: Function): void;
}

On this page