Message Router
The Message Router is a central hub for capability-based message routing in WebAgents. It enables automatic wiring of handlers based on declared capabilities, supports custom event types, and provides extensibility through hooks.
Overview
The router provides:
- Auto-wiring — handlers declare
subscribesandproduces, the router wires them automatically. - Priority-based selection — preferred handlers run first.
- Loop prevention — three-layer protection (source tracking, seen set, TTL).
- Observers — non-consuming listeners for logging / analytics.
- System events — control flow (stop, cancel, error, ping/pong).
- Extensibility hooks —
onUnroutable,onError,beforeRoute,afterRoute.
Basic Usage
import { MessageRouter, BufferSink } from 'webagents';
import type { UAMPEvent, RouterContext } from 'webagents';
const router = new MessageRouter();
async function* processText(event: UAMPEvent, ctx: RouterContext) {
yield {
id: 'resp-1',
type: 'response.delta',
payload: { text: 'Hello!' },
} satisfies UAMPEvent;
}
router.registerHandler({
name: 'text-handler',
subscribes: ['input.text'],
produces: ['response.delta'],
priority: 0,
process: processText,
});
router.setDefault('text-handler');
const sink = new BufferSink();
router.registerSink(sink);
router.setActiveSink(sink.id);
await router.send({
id: 'msg-1',
type: 'input.text',
payload: { text: 'Hello' },
});
console.log(sink.getEvents());Handler Declaration
Using @handoff
import { Skill, handoff } from 'webagents';
class MySkill extends Skill {
readonly name = 'my-skill';
@handoff({
name: 'my-handler',
subscribes: ['input.text'],
produces: ['response.delta'],
priority: 50,
})
async *process(events) {
yield { type: 'response.delta', delta: 'Response' } as const;
}
}Regex Pattern Matching
@handoff({
name: 'translator',
subscribes: [/^translate\..+$/], // matches translate.en, translate.fr
produces: ['response.delta'],
})
async *translate(events) {
// event.type might be 'translate.en', 'translate.es', etc.
yield { type: 'response.delta', delta: '...' } as const;
}Default Values
| Parameter | Default | Description |
|---|---|---|
subscribes | ['input.text'] | Most handlers process text |
produces | ['response.delta'] | Most handlers stream responses |
priority | 50 (Python) / 0 (TS) | Lower runs first; in TS with priority 0 and the higher-priority interpretation, see router.ts |
Observers
Observers receive copies of events without consuming them:
import { Skill, observe } from 'webagents';
class LoggingSkill extends Skill {
readonly name = 'logging';
@observe({ name: 'message-logger', subscribes: ['*'] })
async logMessages(event) {
console.log(`[${event.type}]`, event.payload);
// Does NOT consume — message continues to handlers
}
}Transport Sinks
CallbackSink
import { CallbackSink } from 'webagents';
const events: unknown[] = [];
const sink = new CallbackSink((e) => events.push(e));
router.registerSink(sink);BufferSink
import { BufferSink } from 'webagents';
const sink = new BufferSink({ maxSize: 100 });
router.registerSink(sink);
const allEvents = sink.getEvents();Loop Prevention
The router implements three-layer protection:
- Source tracking — messages carry their source handler; the router won't route back to the producer.
- Seen set — tracks which handlers have already processed a message.
- TTL (Time-to-Live) — maximum hops a message can traverse (default: 10).
Extensibility Hooks
Error Handling
router.onError(async (error, event, handler, context) => {
console.error(`Handler ${handler.name} failed:`, error);
});Unroutable Events
router.onUnroutable(async (event, context) => {
console.warn(`No handler for ${event.type}`);
});Interceptors
router.beforeRoute(async (event, handler, context) => {
if (isBlocked(event)) return null; // Block
return event; // Continue
});
router.afterRoute(async (event, handler, context) => {
logMetric('routed', handler.name);
return event;
});
function isBlocked(_: unknown) { return false; }
function logMetric(_: string, __: string) {}System Events
| Event | Description |
|---|---|
system.error | Error occurred during processing |
system.stop | Request to stop current processing |
system.cancel | Cancel and cleanup resources |
system.ping | Keep-alive request |
system.pong | Keep-alive response |
system.unroutable | No handler found for message |
Backward Compatibility (Python)
The new subscribes / produces parameters are optional in Python. Existing code works unchanged.
# Before (still works)
@handoff(name='my-handler', priority=10)
async def process(self, messages, **kwargs):
pass
# Equivalent to:
@handoff(
name='my-handler',
priority=10,
subscribes=['input.text'],
produces=['response.delta'],
)
async def process(self, messages, **kwargs):
passAPI Reference
UAMPEvent
interface UAMPEvent {
id: string;
type: string;
payload: Record<string, unknown>;
source?: string; // Handler that produced this
ttl?: number; // Time-to-live
seen?: string[]; // Handlers that processed this
}Handler
interface Handler {
name: string;
subscribes: (string | RegExp)[];
produces: string[];
priority?: number;
process: (event: UAMPEvent, context?: RouterContext) => AsyncGenerator<UAMPEvent>;
}TransportSink
abstract class TransportSink {
readonly id: string;
readonly isActive: boolean;
abstract send(event: ServerEvent): Promise<void>;
abstract close(): void;
}MessageRouter
class MessageRouter {
send(event: UAMPEvent, context?: RouterContext): Promise<void>;
registerHandler(handler: Handler): void;
unregisterHandler(name: string): void;
registerObserver(observer: Observer): void;
unregisterObserver(name: string): void;
route(eventType: string, handlerName: string, priority?: number): void;
registerSink(sink: TransportSink): void;
registerDefaultSink(sink: TransportSink): void;
unregisterSink(sinkId: string): void;
setActiveSink(sinkId: string): void;
setDefault(handlerName: string): void;
onUnroutable(handler: Function): void;
onError(handler: Function): void;
beforeRoute(interceptor: Function): void;
afterRoute(interceptor: Function): void;
}