InitRunner

Sinks

Sinks define where agent output goes after a run completes. They are most useful in daemon mode and flow pipelines, where agents run unattended and their results need to be routed somewhere: a webhook, a file, a custom function, or another agent.

Sinks are configured in the spec.sinks list.

Quick Example

spec:
  sinks:
    - type: webhook
      url: https://hooks.slack.com/services/T.../B.../xxx
      headers:
        Content-Type: application/json
    - type: file
      path: ./output/results.json
      format: json

Sink Types

| Type | Description |
| --- | --- |
| webhook | HTTP POST to a URL |
| file | Write to a local file |
| custom | Call a Python function |

Webhook

Sends a JSON payload to a URL via HTTP POST. Useful for Slack, Discord, PagerDuty, or any HTTP endpoint.

sinks:
  - type: webhook
    url: https://hooks.slack.com/services/T.../B.../xxx
    headers:
      Content-Type: application/json
      Authorization: Bearer ${WEBHOOK_TOKEN}
    timeout_seconds: 30
    retry_count: 3

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| url | str | (required) | Destination URL |
| method | str | "POST" | HTTP method |
| headers | dict | {} | HTTP headers (supports ${VAR} substitution) |
| timeout_seconds | int | 30 | Request timeout |
| retry_count | int | 0 | Number of retry attempts on failure |
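
To make the ${VAR} substitution in headers concrete, here is a minimal Python sketch of how such expansion could work. It is an illustration only: the helper name `expand_headers` is hypothetical, and raising an error on an unset variable is an assumption, since this page does not say how InitRunner treats missing variables.

```python
import os
import re
from typing import Mapping, Optional

# Matches ${NAME} placeholders only; a bare $NAME is left untouched.
_VAR = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def expand_headers(
    headers: Mapping[str, str],
    env: Optional[Mapping[str, str]] = None,
) -> dict:
    """Replace ${VAR} placeholders in header values (hypothetical helper).

    Assumption: an unset variable raises KeyError rather than expanding
    to an empty string.
    """
    source = os.environ if env is None else env
    return {
        name: _VAR.sub(lambda match: source[match.group(1)], value)
        for name, value in headers.items()
    }


expanded = expand_headers(
    {"Authorization": "Bearer ${WEBHOOK_TOKEN}"},
    env={"WEBHOOK_TOKEN": "s3cret"},
)
print(expanded)
```

Passing `env` explicitly keeps the sketch testable without touching the real process environment.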

Payload Format

The webhook POST body is a JSON object:

{
  "agent_name": "monitor-agent",
  "run_id": "a1b2c3d4e5f6",
  "prompt": "Check system health and report status.",
  "output": "All 3 services healthy. Response times: api=120ms, web=85ms, db=45ms.",
  "success": true,
  "error": null,
  "tokens_in": 850,
  "tokens_out": 400,
  "duration_ms": 4200,
  "model": "gpt-5-mini",
  "provider": "openai",
  "trigger_type": "cron",
  "trigger_metadata": {},
  "timestamp": "2025-01-15T09:00:05Z"
}
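
A receiving service can turn this payload into a short notification line. The sketch below shows one way to do that in Python, using only fields from the payload above. The function name `summarize_payload` and the message layout are illustrative assumptions, not part of InitRunner.

```python
import json


def summarize_payload(body: str) -> str:
    """Build a one-line summary from a sink payload (hypothetical helper)."""
    payload = json.loads(body)
    status = "OK" if payload["success"] else f"FAILED: {payload['error']}"
    total_tokens = payload["tokens_in"] + payload["tokens_out"]
    seconds = payload["duration_ms"] / 1000
    return (
        f"{payload['agent_name']} [{payload['run_id']}] {status} "
        f"({total_tokens} tokens, {seconds:.1f}s, {payload['model']})"
    )


# A subset of the example payload shown above.
example_body = json.dumps({
    "agent_name": "monitor-agent",
    "run_id": "a1b2c3d4e5f6",
    "success": True,
    "error": None,
    "tokens_in": 850,
    "tokens_out": 400,
    "duration_ms": 4200,
    "model": "gpt-5-mini",
})
print(summarize_payload(example_body))
# monitor-agent [a1b2c3d4e5f6] OK (1250 tokens, 4.2s, gpt-5-mini)
```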

File

Appends agent output to a local file. Parent directories are created if they do not exist. Supports JSON and plain text formats.

sinks:
  - type: file
    path: ./output/results.json
    format: json

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| path | str | (required) | Output file path |
| format | str | "json" | Output format: "json" or "text" |

  • json appends one JSON object per line (JSONL), same schema as the webhook payload
  • text appends one human-readable line per result: [timestamp] agent-name | OK | output
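
Because the json format writes one object per line, the file can be processed line by line. The following sketch aggregates such a file in Python. The helper name `summarize_runs` and the demo records are made up for illustration, and the code assumes every line carries the success, tokens_in, and tokens_out fields from the webhook payload schema.

```python
import json
import tempfile
from pathlib import Path


def summarize_runs(path: str) -> dict:
    """Aggregate a JSONL sink file: one run result per line."""
    runs = failures = tokens = 0
    for line in Path(path).read_text(encoding="utf-8").splitlines():
        if not line.strip():
            continue  # tolerate blank lines
        result = json.loads(line)
        runs += 1
        failures += 0 if result["success"] else 1
        tokens += result["tokens_in"] + result["tokens_out"]
    return {"runs": runs, "failures": failures, "tokens": tokens}


# Demo: write two illustrative records, then read them back.
with tempfile.TemporaryDirectory() as tmp:
    log = Path(tmp) / "results.json"
    records = [
        {"success": True, "tokens_in": 850, "tokens_out": 400},
        {"success": False, "tokens_in": 100, "tokens_out": 0},
    ]
    log.write_text(
        "".join(json.dumps(record) + "\n" for record in records),
        encoding="utf-8",
    )
    summary = summarize_runs(str(log))

print(summary)
# {'runs': 2, 'failures': 1, 'tokens': 1350}
```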

Custom

Calls a Python function with the run result. Use this for custom integrations like database writes, email, message queues, or anything else.

sinks:
  - type: custom
    module: my_sinks
    function: send_to_database

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| module | str | (required) | Python module path (must be importable) |
| function | str | (required) | Function name to call |

The function signature:

def send_to_database(result: dict) -> None:
    """Called by InitRunner after each agent run.

    Args:
        result: Run result dict (same schema as webhook payload).
    """
    # ... process result
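
As a fuller illustration, a custom sink that writes each result to SQLite might look like the sketch below. The table layout, the `DB_PATH` constant, and the optional `db_path` parameter are assumptions made for this example. The sketch also assumes InitRunner calls the function with the result dict as its only argument, as the signature above suggests.

```python
import sqlite3

DB_PATH = "runs.db"  # hypothetical location; adjust for your deployment


def send_to_database(result: dict, db_path: str = DB_PATH) -> None:
    """Store one run result in SQLite (illustrative sketch).

    db_path is an extra parameter added for testing; a caller that passes
    only the result dict gets the default path.
    """
    conn = sqlite3.connect(db_path)
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "CREATE TABLE IF NOT EXISTS runs ("
                "run_id TEXT PRIMARY KEY, agent_name TEXT, "
                "success INTEGER, output TEXT, timestamp TEXT)"
            )
            # Re-delivery of the same run_id overwrites the earlier row.
            conn.execute(
                "INSERT OR REPLACE INTO runs VALUES (?, ?, ?, ?, ?)",
                (
                    result["run_id"],
                    result["agent_name"],
                    int(result["success"]),
                    result["output"],
                    result["timestamp"],
                ),
            )
    finally:
        conn.close()
```

Keying the table on run_id makes the sink idempotent, which is useful if the same result is ever delivered twice.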

Multiple Sinks

An agent can have multiple sinks. All sinks fire after each run completes:

spec:
  sinks:
    # Log to file
    - type: file
      path: ./logs/runs.json
      format: json
    # Notify Slack
    - type: webhook
      url: ${SLACK_WEBHOOK_URL}
    # Store in database
    - type: custom
      module: my_sinks
      function: store_result

Sinks with Daemon Mode

Sinks are most commonly used with triggers and daemon mode. When a trigger fires and an agent run completes, all configured sinks receive the result:

spec:
  triggers:
    - type: cron
      schedule: "0 */6 * * *"
      prompt: "Check system health and report status."
  sinks:
    - type: webhook
      url: ${SLACK_WEBHOOK_URL}
    - type: file
      path: ./logs/health-checks.json
      format: json

initrunner run role.yaml --daemon

Every 6 hours, the agent runs, and the output is sent to both Slack and the log file.

Delegate Sink

The delegate sink routes one agent's output to one or more other agents. It is flow-only: you configure it under a flow agent's sink: field (spec.agents.<name>.sink), not in a role's spec.sinks list. Only successful runs are forwarded.

# Single target
spec:
  agents:
    writer:
      role: roles/writer.yaml
      sink:
        type: delegate
        target: editor

# Fan-out to multiple targets
spec:
  agents:
    triager:
      role: roles/triager.yaml
      sink:
        type: delegate
        target:
          - researcher
          - responder

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| type | str | (required) | Must be "delegate" |
| target | str \| list[str] | (required) | Target agent name(s) |
| strategy | "all" \| "keyword" \| "sense" \| "ensemble" | "all" | Routing strategy for multi-target delegates |
| ensemble | EnsembleConfig \| null | null | Voting config. Required when strategy is ensemble, rejected otherwise |
| loop_back | LoopBackConfig \| null | null | Bounded loop-back edge for critic/refine patterns |
| keep_existing_sinks | bool | false | When true, the agent's role-level sinks also fire alongside the delegate |
| queue_size | int | 100 | Daemon ingress queue capacity (bounded backpressure for trigger-driven runs) |
| timeout_seconds | int | 60 | Reserved (kept for schema compatibility) |

For startup ordering, fan-in wiring, and full worked pipelines, see Flow. For routing multiple agents as a coordinated unit, see Team Mode.

Routing Strategy

The strategy field only matters when a delegate has multiple targets. With a single target it has no effect.

| Strategy | Behavior | API calls |
| --- | --- | --- |
| all | Fan-out: every target receives every message (default) | None |
| keyword | Intent-sensing keyword scoring picks one target | None |
| sense | Keyword scoring first, LLM tiebreaker when ambiguous | 0 or 1 per message |
| ensemble | Fan-out to all targets, then vote and keep one winner | Depends on mode |

The keyword and sense strategies use the two-pass intent sensing logic. See Flow for the full routing walkthrough.

Ensemble Voting

With strategy: ensemble, the same prompt fans out to every target (like all), then a reducer keeps one winning answer that flows downstream as a single result. Ensemble requires at least two targets and an ensemble: block. The ensemble block is rejected for any other strategy.

spec:
  agents:
    drafter:
      role: roles/drafter.yaml
      sink:
        type: delegate
        strategy: ensemble
        target:
          - gpt
          - claude
          - gemini
        ensemble:
          mode: majority

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| mode | "majority" \| "weighted" \| "judge" | "majority" | How the winning answer is chosen |
| judge_model | str | "openai:gpt-4o-mini" | Model used to score candidates when mode is judge |
| judge_criteria | list[str] | [] | Criteria the judge checks. Empty list falls back to clarity, completeness, accuracy |
| weights | dict[str, float] \| null | null | Per-target weight for weighted mode. Keys must be target names, non-negative, not all zero |

The three modes:

  • majority: the most frequent identical answer wins. Ties break on the lowest topology index, so the result is deterministic. Resolves in-process with no extra API calls.
  • weighted: the highest-weight target wins, with ties breaking on the lowest index. Requires a non-empty weights map. Resolves in-process with no extra API calls.
  • judge: an LLM judge scores each candidate, and the answer passing the most criteria wins (ties break on lowest index). Costs one judge call per candidate.
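
The two in-process modes can be sketched in a few lines of Python. This is a reading of the rules above, not InitRunner's actual reducer: the function names are hypothetical, answers are assumed to be compared by exact string equality, and a target missing from the weights map is assumed to count as 0.0.

```python
from collections import Counter


def majority_vote(answers: list[str]) -> int:
    """Return the index of the winning answer under majority mode."""
    counts = Counter(answers)
    best = max(counts.values())
    # The first index whose answer reaches the top count wins,
    # so ties break on the lowest index.
    return next(i for i, answer in enumerate(answers) if counts[answer] == best)


def weighted_vote(targets: list[str], weights: dict[str, float]) -> int:
    """Return the index of the highest-weight target under weighted mode."""
    if not weights:
        raise ValueError("weighted mode requires a non-empty weights map")
    # max() returns the first maximal element, so ties break on the lowest index.
    # Assumption: targets absent from the map weigh 0.0.
    return max(range(len(targets)), key=lambda i: weights.get(targets[i], 0.0))


print(majority_vote(["42", "41", "42"]))  # 0: "42" appears twice
print(majority_vote(["a", "b"]))          # 0: tie, lowest index wins
print(weighted_vote(
    ["fast-model", "strong-model"],
    {"fast-model": 1.0, "strong-model": 2.0},
))                                        # 1: strong-model has the higher weight
```

Both functions return an index rather than the answer itself, so the caller can map the winner back to its target agent.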

Weighted mode example:

sink:
  type: delegate
  strategy: ensemble
  target:
    - fast-model
    - strong-model
  ensemble:
    mode: weighted
    weights:
      fast-model: 1.0
      strong-model: 2.0

Each vote is recorded on the audit chain with trigger_type ensemble_vote and a vote trace. See Audit for the audit details and Flow for fan-in behavior.

Loop-Back Routing

A loop_back edge turns a forward delegation into a bounded refine loop, the classic writer to critic to writer pattern. It is the only cycle a flow permits. Every other cycle is rejected at validation.

spec:
  agents:
    writer:
      role: roles/writer.yaml
      sink:
        type: delegate
        target: critic
    critic:
      role: roles/critic.yaml
      sink:
        type: delegate
        target: publisher
        loop_back:
          target: writer
          max_iterations: 4
          until:
            output: "contains:APPROVED"

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| type | str | "loop-back" | Discriminator. Note the hyphenated value loop-back differs from the loop_back field name |
| target | str | (required) | Agent the loop returns to. Must be a known agent and must not be one of the sink's forward targets |
| max_iterations | int | 3 | Hard cap on loop rounds, bounded 1 to 20 |
| until | dict[str, str] \| null | null | Optional early-exit predicate. Only the output key is supported |

The until value is one of:

  • contains:<text>: case-insensitive substring match against the latest output, for example a contains:APPROVED sentinel.
  • <op><number>: compares the first number parsed from the output, where <op> is one of >, >=, <, <=, ==. For example ">0.8" for a self-reported confidence score.

The loop stops when max_iterations rounds complete or the until predicate matches the latest output, whichever comes first. The flow depth limit remains a final backstop. See Flow for a full worked loop example.
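
The predicate rules and the stop condition above can be sketched in Python as follows. The function names and the number-parsing regex are assumptions for illustration. In particular, treating an output with no number as "no match" is a guess, since this page does not specify that case.

```python
import operator
import re
from typing import Optional

_OPS = {
    ">=": operator.ge,
    "<=": operator.le,
    "==": operator.eq,
    ">": operator.gt,
    "<": operator.lt,
}
_NUMBER = re.compile(r"-?\d+(?:\.\d+)?")
_COMPARISON = re.compile(r"(>=|<=|==|>|<)\s*(-?\d+(?:\.\d+)?)")


def until_matches(predicate: str, output: str) -> bool:
    """Evaluate an until predicate against the latest output (sketch)."""
    if predicate.startswith("contains:"):
        needle = predicate[len("contains:"):]
        return needle.lower() in output.lower()  # case-insensitive substring
    parsed = _COMPARISON.fullmatch(predicate.strip())
    if parsed is None:
        raise ValueError(f"unsupported until predicate: {predicate!r}")
    found = _NUMBER.search(output)  # first number in the output
    if found is None:
        return False  # assumption: no number means the predicate fails
    return _OPS[parsed.group(1)](float(found.group()), float(parsed.group(2)))


def should_stop(rounds_done: int, max_iterations: int,
                predicate: Optional[str], output: str) -> bool:
    """Stop at the iteration cap or on a predicate match, whichever is first."""
    if rounds_done >= max_iterations:
        return True
    return predicate is not None and until_matches(predicate, output)


print(until_matches("contains:APPROVED", "Looks good. approved."))  # True
print(until_matches(">0.8", "confidence: 0.85"))                     # True
print(should_stop(4, 4, "contains:APPROVED", "needs another pass"))  # True (cap)
```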

Validate a flow and inspect its sink summaries with:

initrunner flow validate flow.yaml

The Sink column renders the delegate summary, for example delegate: a, b [ensemble:majority] for an ensemble sink, with a (loop-back: writer x4) note when a loop-back edge is set.
