Observability
Agents emit structured events for every significant operation — RPC calls, state changes, schedule execution, workflow transitions, MCP connections, and more. These events are published to diagnostics channels and are silent by default (zero overhead when nobody is listening).
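The "silent by default" behavior can be seen with the Node.js diagnostics channel API on its own. The sketch below is an illustration only, not the Agents SDK's internal code: the channel name demo:events and the event objects are hypothetical.

```typescript
import { channel, subscribe, unsubscribe } from "node:diagnostics_channel";

// Hypothetical channel name, used only for this illustration.
const CHANNEL_NAME = "demo:events";
const demoChannel = channel(CHANNEL_NAME);

const received: unknown[] = [];
const listener = (message: unknown): void => {
  received.push(message);
};

// With no subscribers, a published message is delivered to nobody.
const hadSubscribersBefore = demoChannel.hasSubscribers;
demoChannel.publish({ type: "rpc", payload: { method: "ignored" } });

// After subscribing, published messages reach the listener synchronously.
subscribe(CHANNEL_NAME, listener);
demoChannel.publish({ type: "rpc", payload: { method: "getWeather" } });

// Unsubscribing makes the channel silent again.
unsubscribe(CHANNEL_NAME, listener);
demoChannel.publish({ type: "rpc", payload: { method: "alsoIgnored" } });

console.log(hadSubscribersBefore, received.length);
```

A publisher can check hasSubscribers before building an expensive message, which is one way to keep the cost near zero when nothing is listening.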
Every event has these fields:
    {
      type: "rpc", // what happened
      agent: "MyAgent", // which agent class emitted it
      name: "user-123", // which agent instance (Durable Object name)
      payload: { method: "getWeather" }, // details
      timestamp: 1758005142787, // when (ms since epoch)
    }

agent and name identify the source agent: agent is the class name, and name is the Durable Object instance name.
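As a rough sketch, the event shape above could be described with a local TypeScript interface and turned into a log line. The names AgentEvent and formatEvent are hypothetical and are not exports of the agents package; only the field names come from the example above.

```typescript
// Hypothetical type name; the field names follow the event shape shown above.
interface AgentEvent {
  type: string;
  agent: string;
  name: string;
  payload: Record<string, unknown>;
  timestamp: number; // milliseconds since epoch
}

// Render one event as a single log line: time, source, type, payload.
function formatEvent(event: AgentEvent): string {
  const when = new Date(event.timestamp).toISOString();
  const source = `${event.agent}/${event.name}`;
  return `${when} [${source}] ${event.type} ${JSON.stringify(event.payload)}`;
}

const sample: AgentEvent = {
  type: "rpc",
  agent: "MyAgent",
  name: "user-123",
  payload: { method: "getWeather" },
  timestamp: 1758005142787,
};

console.log(formatEvent(sample));
```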
Events are routed to eight named channels based on their type:
| Channel | Event types | Description |
|---|---|---|
| agents:state | state:update | State sync events |
| agents:rpc | rpc, rpc:error | RPC method calls and failures |
| agents:message | message:request, message:response, message:clear, message:cancel, message:error, tool:result, tool:approval | Chat message and tool lifecycle |
| agents:schedule | schedule:create, schedule:execute, schedule:cancel, schedule:retry, schedule:error, queue:create, queue:retry, queue:error | Scheduled and queued task lifecycle |
| agents:lifecycle | connect, disconnect, destroy | Agent connection and teardown |
| agents:workflow | workflow:start, workflow:event, workflow:approved, workflow:rejected, workflow:terminated, workflow:paused, workflow:resumed, workflow:restarted | Workflow state transitions |
| agents:mcp | mcp:client:preconnect, mcp:client:connect, mcp:client:authorize, mcp:client:discover | MCP client operations |
| agents:email | email:receive, email:reply | Email processing |
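The routing in the table can be sketched as a lookup from event type to channel name. This is an illustration derived from the table, not the library's routing code; channelFor is a hypothetical helper, and it matches on the prefix before the first colon, so it would also accept types that the table does not list.

```typescript
// Lifecycle event types have no prefix, so they are matched by exact name.
const LIFECYCLE_TYPES = new Set(["connect", "disconnect", "destroy"]);

// Prefix (text before the first ":") to channel name, following the table above.
const CHANNEL_BY_PREFIX = new Map<string, string>([
  ["state", "agents:state"],
  ["rpc", "agents:rpc"],
  ["message", "agents:message"],
  ["tool", "agents:message"],
  ["schedule", "agents:schedule"],
  ["queue", "agents:schedule"],
  ["workflow", "agents:workflow"],
  ["mcp", "agents:mcp"],
  ["email", "agents:email"],
]);

// Hypothetical helper: returns the channel for an event type, or undefined if unknown.
function channelFor(eventType: string): string | undefined {
  if (LIFECYCLE_TYPES.has(eventType)) return "agents:lifecycle";
  const [prefix = ""] = eventType.split(":");
  return CHANNEL_BY_PREFIX.get(prefix);
}

console.log(channelFor("queue:retry"), channelFor("disconnect"));
```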
The subscribe() function from agents/observability provides type-safe access to events on a specific channel:
    import { subscribe } from "agents/observability";

    const unsub = subscribe("rpc", (event) => {
      if (event.type === "rpc") {
        console.log(`RPC call: ${event.payload.method}`);
      }
      if (event.type === "rpc:error") {
        console.error(
          `RPC failed: ${event.payload.method} - ${event.payload.error}`,
        );
      }
    });

    // Clean up when done
    unsub();

The callback is fully typed: event is narrowed to only the event types that flow through that channel.
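The narrowing described above relies on TypeScript's discriminated unions. The sketch below uses local, hypothetical types (RpcEvent, RpcErrorEvent, RpcChannelEvent) rather than the package's own, and the payload field types (string, boolean) are assumptions; only the field names come from this page.

```typescript
// Hypothetical local types mirroring the two event types on the rpc channel.
type RpcEvent = {
  type: "rpc";
  payload: { method: string; streaming?: boolean };
};
type RpcErrorEvent = {
  type: "rpc:error";
  payload: { method: string; error: string };
};
type RpcChannelEvent = RpcEvent | RpcErrorEvent;

function summarize(event: RpcChannelEvent): string {
  switch (event.type) {
    case "rpc":
      // Narrowed to RpcEvent: payload.streaming exists, payload.error does not.
      return `call ${event.payload.method}${event.payload.streaming ? " (streaming)" : ""}`;
    case "rpc:error":
      // Narrowed to RpcErrorEvent: payload.error is available here.
      return `error ${event.payload.method}: ${event.payload.error}`;
  }
}

console.log(summarize({ type: "rpc", payload: { method: "getWeather" } }));
```

Checking event.type is what lets the compiler pick the matching payload shape, so reading payload.error inside the "rpc" branch would be a compile-time error.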
You can also subscribe directly using the Node.js API:
    import { subscribe } from "node:diagnostics_channel";

    subscribe("agents:schedule", (event) => {
      console.log(event);
    });

In production, all diagnostics channel messages are automatically forwarded to Tail Workers. No subscription code is needed in the agent itself. Attach a Tail Worker and access events via event.diagnosticsChannelEvents:
    export default {
      async tail(events) {
        for (const event of events) {
          for (const msg of event.diagnosticsChannelEvents) {
            // msg.channel is "agents:rpc", "agents:workflow", etc.
            // msg.message is the typed event payload
            console.log(msg.timestamp, msg.channel, msg.message);
          }
        }
      },
    };

This gives you structured, filterable observability in production with zero overhead in the agent hot path.
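Filtering inside a Tail Worker might look like the sketch below. The interfaces are minimal, hypothetical stand-ins that declare only the fields read by the handler above (timestamp, channel, message), and messagesOnChannel is an illustrative helper, not part of any Cloudflare API.

```typescript
// Hypothetical minimal shapes: only the fields the handler above reads.
interface DiagnosticsMessage {
  timestamp: number;
  channel: string;
  message: unknown;
}
interface TailItem {
  diagnosticsChannelEvents: DiagnosticsMessage[];
}

// Collect every diagnostics message that was published on one channel.
function messagesOnChannel(
  events: TailItem[],
  channelName: string,
): DiagnosticsMessage[] {
  const matches: DiagnosticsMessage[] = [];
  for (const event of events) {
    for (const msg of event.diagnosticsChannelEvents) {
      if (msg.channel === channelName) matches.push(msg);
    }
  }
  return matches;
}

// Made-up sample batch for illustration.
const batch: TailItem[] = [
  {
    diagnosticsChannelEvents: [
      { timestamp: 1, channel: "agents:rpc", message: { type: "rpc" } },
      { timestamp: 2, channel: "agents:workflow", message: { type: "workflow:start" } },
    ],
  },
  {
    diagnosticsChannelEvents: [
      { timestamp: 3, channel: "agents:rpc", message: { type: "rpc:error" } },
    ],
  },
];

console.log(messagesOnChannel(batch, "agents:rpc").length);
```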
You can override the default implementation by supplying your own object that implements the Observability interface:
    import { Agent } from "agents";
    import type { Observability } from "agents/observability";

    const myObservability: Observability = {
      emit(event) {
        // Send to your logging service, filter events, etc.
        if (event.type === "rpc:error") {
          console.error(event.payload.method, event.payload.error);
        }
      },
    };

    class MyAgent extends Agent {
      override observability = myObservability;
    }

Set observability to undefined to disable all event emission:
    import { Agent } from "agents";

    class MyAgent extends Agent {
      override observability = undefined;
    }

| Type | Payload | When |
|---|---|---|
| rpc | { method, streaming? } | A @callable method is invoked |
| rpc:error | { method, error } | A @callable method throws |
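Using the rpc:error payload from the table above, a custom emit function could tally failures per method. This is a minimal sketch: EmittedEvent and createErrorTally are hypothetical names, and it assumes that an object with an emit(event) method is all a custom observability implementation needs, which is the only member this page shows.

```typescript
// Hypothetical minimal event type: only the fields this sketch reads.
type EmittedEvent = { type: string; payload: Record<string, unknown> };

// Build an object with an emit() method that counts rpc:error events per method.
function createErrorTally() {
  const failures = new Map<string, number>();
  return {
    failures,
    emit(event: EmittedEvent): void {
      if (event.type !== "rpc:error") return; // ignore every other event type
      const method = String(event.payload["method"]);
      failures.set(method, (failures.get(method) ?? 0) + 1);
    },
  };
}

const tally = createErrorTally();
tally.emit({ type: "rpc", payload: { method: "getWeather" } });
tally.emit({ type: "rpc:error", payload: { method: "getWeather", error: "boom" } });
tally.emit({ type: "rpc:error", payload: { method: "getWeather", error: "boom" } });

console.log(tally.failures.get("getWeather"));
```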
| Type | Payload | When |
|---|---|---|
| state:update | {} | setState() is called |
These events are emitted by AIChatAgent from @cloudflare/ai-chat. They track the chat message lifecycle, including client-side tool interactions.
| Type | Payload | When |
|---|---|---|
| message:request | {} | A chat message is received |
| message:response | {} | A chat response stream completes |
| message:clear | {} | Chat history is cleared |
| message:cancel | { requestId } | A streaming request is cancelled |
| message:error | { error } | A chat stream fails |
| tool:result | { toolCallId, toolName } | A client tool result is received |
| tool:approval | { toolCallId, approved } | A tool call is approved or rejected |
| Type | Payload | When |
|---|---|---|
| schedule:create | { callback, id } | A schedule is created |
| schedule:execute | { callback, id } | A scheduled callback starts |
| schedule:cancel | { callback, id } | A schedule is cancelled |
| schedule:retry | { callback, id, attempt, maxAttempts } | A scheduled callback is retried |
| schedule:error | { callback, id, error, attempts } | A scheduled callback fails after all retries |
| queue:create | { callback, id } | A task is enqueued |
| queue:retry | { callback, id, attempt, maxAttempts } | A queued callback is retried |
| queue:error | { callback, id, error, attempts } | A queued callback fails after all retries |
| Type | Payload | When |
|---|---|---|
| connect | { connectionId } | A WebSocket connection is established |
| disconnect | { connectionId, code, reason } | A WebSocket connection is closed |
| destroy | {} | The agent is destroyed |
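The lifecycle events in the table above can be folded into a set of currently open connections. The sketch below is illustrative: LifecycleEvent and openConnections are hypothetical names, the payload field types are assumptions, and treating destroy as closing every connection is an assumption made for this example.

```typescript
// Hypothetical local types; payload field names follow the table above.
type LifecycleEvent =
  | { type: "connect"; payload: { connectionId: string } }
  | { type: "disconnect"; payload: { connectionId: string; code: number; reason: string } }
  | { type: "destroy"; payload: Record<string, never> };

// Replay lifecycle events in order and return the connection IDs still open.
function openConnections(events: LifecycleEvent[]): Set<string> {
  const open = new Set<string>();
  for (const event of events) {
    if (event.type === "connect") {
      open.add(event.payload.connectionId);
    } else if (event.type === "disconnect") {
      open.delete(event.payload.connectionId);
    } else {
      open.clear(); // destroy: assume no connection survives
    }
  }
  return open;
}

const history: LifecycleEvent[] = [
  { type: "connect", payload: { connectionId: "a" } },
  { type: "connect", payload: { connectionId: "b" } },
  { type: "disconnect", payload: { connectionId: "a", code: 1000, reason: "done" } },
];

console.log([...openConnections(history)]);
```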
| Type | Payload | When |
|---|---|---|
| workflow:start | { workflowId, workflowName? } | A workflow instance is started |
| workflow:event | { workflowId, eventType? } | An event is sent to a workflow |
| workflow:approved | { workflowId, reason? } | A workflow is approved |
| workflow:rejected | { workflowId, reason? } | A workflow is rejected |
| workflow:terminated | { workflowId, workflowName? } | A workflow is terminated |
| workflow:paused | { workflowId, workflowName? } | A workflow is paused |
| workflow:resumed | { workflowId, workflowName? } | A workflow is resumed |
| workflow:restarted | { workflowId, workflowName? } | A workflow is restarted |
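Because every workflow event carries a workflowId, a consumer can keep the most recent transition per workflow. The sketch below is a hypothetical helper (latestTransitions) over a simplified local event type; it assumes workflowId is a string and relies only on the workflowId payload field and the timestamp field described on this page.

```typescript
// Hypothetical simplified event type: the fields this sketch depends on.
type WorkflowEvent = {
  type: string; // e.g. "workflow:start", "workflow:paused"
  payload: { workflowId: string };
  timestamp: number; // milliseconds since epoch
};

// Map each workflowId to the type of its most recent event by timestamp.
function latestTransitions(events: WorkflowEvent[]): Map<string, string> {
  const latest = new Map<string, WorkflowEvent>();
  for (const event of events) {
    const id = event.payload.workflowId;
    const seen = latest.get(id);
    if (seen === undefined || event.timestamp >= seen.timestamp) {
      latest.set(id, event);
    }
  }
  const result = new Map<string, string>();
  for (const [id, event] of latest) result.set(id, event.type);
  return result;
}

// Made-up sample events for illustration.
const workflowEvents: WorkflowEvent[] = [
  { type: "workflow:start", payload: { workflowId: "wf-1" }, timestamp: 100 },
  { type: "workflow:start", payload: { workflowId: "wf-2" }, timestamp: 110 },
  { type: "workflow:paused", payload: { workflowId: "wf-1" }, timestamp: 200 },
];

console.log(latestTransitions(workflowEvents).get("wf-1"));
```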
| Type | Payload | When |
|---|---|---|
| mcp:client:preconnect | { serverId } | Before connecting to an MCP server |
| mcp:client:connect | { url, transport, state, error? } | An MCP connection attempt completes or fails |
| mcp:client:authorize | { serverId, authUrl, clientId? } | An MCP OAuth flow begins |
| mcp:client:discover | { url?, state?, error?, capability? } | MCP capability discovery succeeds or fails |
| Type | Payload | When |
|---|---|---|
| email:receive | { from, to, subject? } | An email is received |
| email:reply | { from, to, subject? } | A reply email is sent |