InitRunner

Flow

Agent Flow lets you define multiple agents in a single flow.yaml file, wire them together with delegate sinks, and run them all with one command. Each agent runs as an asyncio task on a shared event loop, with agent executions dispatched to a thread pool. Delegate sinks route output from one agent to another via in-memory queues.

Agents start in tiers based on needs. Each agent is a standalone unit connected to others via delegate sinks.

Quick Start

# flow.yaml
apiVersion: initrunner/v1
kind: Flow
metadata:
  name: my-pipeline
  description: Simple producer-consumer pipeline
spec:
  agents:
    producer:
      role: roles/producer.yaml
      sink:
        type: delegate
        target: consumer
    consumer:
      role: roles/consumer.yaml
      needs:
        - producer
# Validate
initrunner flow validate flow.yaml

# Start (foreground, Ctrl+C to stop)
initrunner flow up flow.yaml

Scaffold a Project

Use initrunner flow new to generate a complete multi-agent project with role files and a flow.yaml:

initrunner flow new my-pipeline                          # default: pipeline pattern
initrunner flow new my-pipeline --pattern fan-out        # dispatcher + parallel workers
initrunner flow new my-pipeline --pattern route          # intake with sense-based routing
initrunner flow new my-pipeline --agents 4               # customize agent count
initrunner flow new my-pipeline --shared-memory          # enable shared memory
initrunner flow new --list-patterns                      # show available patterns

Three patterns are available:

PatternDescription
pipelineLinear chain of agents. Configurable agent count.
fan-outA dispatcher fans work to parallel workers.
routeAn intake agent routes messages to specialized agents (researcher, responder, escalator) using sense-based scoring.

Each pattern generates a ready-to-run project directory with role YAML files and a flow.yaml. Review the generated files, customize as needed, then run with initrunner flow up flow.yaml.

Flow Definition

The top-level structure follows the apiVersion/kind/metadata/spec pattern:

FieldTypeDefaultDescription
apiVersionstr(required)e.g. initrunner/v1
kindstr(required)Must be "Flow"
metadata.namestr(required)Flow definition name
metadata.descriptionstr""Human-readable description
spec.agentsdict(required)Map of agent name to configuration

Agent Configuration

agents:
  my-agent:
    role: roles/my-role.yaml
    sink:
      type: delegate
      target: other-agent
    needs:
      - dependency-agent
    restart:
      condition: on-failure
      max_retries: 3
      delay_seconds: 5
    environment: {}
FieldTypeDefaultDescription
rolestr(required)Path to role YAML (relative to flow file)
sinkobject | nullnullDelegate sink for routing output
needslist[str][]Agents that must start first
restart.conditionstr"none""none", "on-failure", or "always"
restart.max_retriesint3Maximum restart attempts
restart.delay_secondsint5Seconds before restarting
environmentdict{}Additional environment variables

Delegate Sinks

Route an agent's output to other agents via in-memory queues.

# Single target
sink:
  type: delegate
  target: consumer
  queue_size: 100
  timeout_seconds: 60

# Fan-out to multiple targets
sink:
  type: delegate
  strategy: sense
  target:
    - researcher
    - responder
  keep_existing_sinks: true
FieldTypeDefaultDescription
typestr(required)Must be "delegate"
targetstr | list[str](required)Target agent name(s)
strategy"all" | "keyword" | "sense""all"Routing strategy for multi-target delegates
keep_existing_sinksboolfalseAlso activate role-level sinks
queue_sizeint100Max buffered events in target's inbox
timeout_secondsint60Block time when queue is full before dropping
circuit_breaker_thresholdint | nullnullConsecutive failures before circuit opens
circuit_breaker_reset_secondsint60Seconds before probe in open state

Only successful runs are forwarded. Failed runs are silently skipped.

Routing Strategy

When a delegate sink has multiple targets, the strategy field controls how messages are routed.

StrategyBehaviorAPI calls
allFan-out — every target receives every message (default, backward compatible)None
keywordIntent Sensing keyword scoring picks the best targetNone
senseKeyword scoring first; LLM tiebreaker when ambiguous0 or 1 per message

The keyword and sense strategies use the same two-pass Intent Sensing logic used by --sense in the CLI. They score the agent's output text against each target agent's metadata.name, metadata.description, and metadata.tags from its role definition.

Before (static fan-out): every message goes to ALL targets:

triager:
  role: roles/triager.yaml
  sink:
    type: delegate
    target: [researcher, responder, escalator]

After (sense picks the right target):

triager:
  role: roles/triager.yaml
  sink:
    type: delegate
    strategy: sense              # ← one line added
    target: [researcher, responder, escalator]

How routing works

  1. The upstream agent's output is scored against each target's role metadata (name, description, tags) using keyword matching.
  2. If the output doesn't produce a confident match, the original user prompt (preserved from the head of the delegation chain) is also scored.
  3. For sense strategy, if both attempts are inconclusive, an LLM tiebreaker call selects the best target.
  4. The message is forwarded to the selected target only (not fanned out).

Routing diagnostics are injected into the payload's trigger metadata as _flow_route_reason for audit visibility.

Optimizing roles for routing

The same tips from Intent Sensing — Writing Roles That Sense Well apply. Each target agent's role should have specific, non-overlapping tags and a clear description:

# roles/researcher.yaml
metadata:
  name: researcher
  description: Researches topics in depth and gathers supporting evidence
  tags: [research, analysis, investigation, evidence]

# roles/responder.yaml
metadata:
  name: responder
  description: Responds directly to user queries with concise answers
  tags: [response, chat, answer, reply]

# roles/escalator.yaml
metadata:
  name: escalator
  description: Escalates complex issues to human operators
  tags: [escalation, support, human, complex]

Single target behavior

When only one target is specified, strategy has no effect — the message always goes to that target regardless of the strategy setting.

Startup Order

Agents start in topological order based on needs. Agents without dependencies start first, forming tiers of parallel startup. Shutdown happens in reverse order.

agents:
  inbox-watcher:
    role: roles/inbox-watcher.yaml
    sink: { type: delegate, target: triager }
  triager:
    role: roles/triager.yaml
    needs: [inbox-watcher]
    sink: { type: delegate, target: [researcher, responder] }
  researcher:
    role: roles/researcher.yaml
    needs: [triager]
  responder:
    role: roles/responder.yaml
    needs: [triager]
Tier 0:  inbox-watcher          (no dependencies)
Tier 1:  triager                (depends on inbox-watcher)
Tier 2:  researcher, responder  (both depend on triager)

Restart Policies

ConditionRestart when...
noneNever restart
on-failureRestart only if errors were recorded
alwaysRestart whenever the agent thread exits

A health monitor checks every 10 seconds and applies restart policies. In flow mode, it runs as an asyncio task on the shared event loop; in standalone mode, it runs as a thread.

Runtime Architecture

Graph-Based Execution

Since v2026.3.8, flow and team runners use pydantic-graph for orchestration instead of thread-per-agent. Agents are modeled as graph nodes with edges representing delegate sinks. Fan-out, routing, and delegation run as graph steps with native async agent execution.

Since v2026.4.8, tool call start/complete events and a usage event (with token counts and cost) are streamed via SSE for flow runs, matching the agent stream contract. The dashboard displays these in the unified bottom panel with live tool activity.

Flow YAML

  └── pydantic-graph
        ├── Step: agent-a (Tier 0)
        ├── Fork: agent-b, agent-c (Tier 1)
        ├── Join
        └── Step: agent-d (Tier 2)

The graph topology is derived from needs declarations. Sequential chains become linear step sequences, fan-out patterns become fork/join nodes, and sense routing is handled at graph edges. Agent executions run as native async calls within each graph step.

Async Tool Execution

Flow agents build with prefer_async=True, which gives I/O-bound tools (http, web_reader, web_scraper, search) async closures. When the agent runs via PydanticAI's async agent.run(), these tools execute natively on the event loop without thread-pool overhead. Sync tools are auto-wrapped by PydanticAI.

Shutdown Semantics

  1. First Ctrl+C (or SIGTERM) sets the stop event — agents stop accepting new inbox events.
  2. Each agent waits for any in-flight run to complete (grace period, default 30s).
  3. After the grace period, the agent detaches from the in-flight run.
  4. Agent tasks are cancelled on the shared loop.
  5. The ThreadPoolExecutor shuts down, letting in-flight runs complete.
  6. Delegate sinks flush their audit threads.

A second Ctrl+C force-exits immediately.

Executor Pool Sizing

The thread pool for agent runs defaults to min(32, len(agents) + 4). This is configurable via max_agent_workers on FlowOrchestrator.

Shared Memory

When spec.shared_memory.enabled is true, all agents in the flow share a single memory database. One agent's remember() calls become visible to every other agent's recall().

Configuration

spec:
  shared_memory:
    enabled: true
    store_path: null          # default: ~/.initrunner/memory/{name}-shared.db
    max_memories: 1000
    store_backend: lancedb
FieldTypeDefaultDescription
shared_memory.enabledboolfalseEnable shared memory across all agents.
shared_memory.store_pathstr | nullnullPath to the shared memory store. Default: ~/.initrunner/memory/{name}-shared.db.
shared_memory.max_memoriesint1000Maximum number of memories in the shared store.
shared_memory.store_backendstr"lancedb"Store backend.

All agents sharing a memory store must use compatible embedding models (same dimensions). Keep memory.embeddings consistent across roles, or omit it to let all agents derive from their spec.model.provider defaults.

Shared Documents

When spec.shared_documents.enabled is true, all agents in the flow share a single document store. This lets you ingest documents once (e.g. via one agent's ingest config) and have every agent's search_documents tool query the same store.

Unlike shared memory, shared documents requires explicit embedding configuration at the flow level. This prevents embedding model mismatches between roles querying the same store.

Configuration

spec:
  shared_documents:
    enabled: true
    store_path: ./shared-docs.lance   # optional, default: ~/.initrunner/stores/{name}-shared.lance
    embeddings:
      provider: openai                # required when enabled
      model: text-embedding-3-small   # required when enabled
  agents:
    researcher:
      role: roles/researcher.yaml     # has ingest config with sources
    writer:
      role: roles/writer.yaml         # no ingest config needed
FieldTypeDefaultDescription
shared_documents.enabledboolfalseEnable a shared document store across all agents.
shared_documents.store_pathstr | nullnullPath to the shared document store. Default: ~/.initrunner/stores/{name}-shared.lance.
shared_documents.store_backendstr"lancedb"Store backend.
shared_documents.embeddings.providerstr(required when enabled)Embedding provider. Must be set explicitly when enabled: true.
shared_documents.embeddings.modelstr(required when enabled)Embedding model. Must be set explicitly when enabled: true.

How It Works

At startup, apply_shared_documents() patches each agent's role definition:

  • Roles with ingest: configured: the existing store_path, store_backend, and embeddings are overridden with the shared values. All other ingest settings (sources, chunking) are preserved.
  • Roles without ingest:: a minimal IngestConfig is injected with empty sources and the shared store settings. This registers the search_documents retrieval tool so the role can query the shared store without needing its own ingest config.

Shared documents is a flow-time config patch only. It does not run ingestion automatically. Run initrunner ingest against the role that has sources configured to populate the shared store.

Embedding Consistency

The flow definition owns the embedding configuration for the shared store. When shared_documents.enabled is true, both embeddings.provider and embeddings.model must be set explicitly. This is validated at parse time and prevents the situation where different roles derive different embedding models from their spec.model.provider.

Usage Pattern

  1. Configure one role (e.g. researcher) with ingest.sources pointing at your documents.
  2. Enable shared_documents with the same embedding model the researcher would use.
  3. Run initrunner ingest roles/researcher.yaml to populate the shared store.
  4. Start the flow. All agents can now query the shared documents via search_documents.

Systemd Deployment

Install flow pipelines as systemd user services for production:

# Install the unit
initrunner flow install flow.yaml

# Start
initrunner flow start my-pipeline

# Enable on boot
systemctl --user enable initrunner-my-pipeline.service

# Monitor
initrunner flow status my-pipeline
initrunner flow logs my-pipeline -f

Environment Variables

Systemd services don't inherit shell exports. Provide secrets via environment files:

  • {flow_dir}/.env — project-level secrets
  • ~/.initrunner/.env — user-level defaults

Use --generate-env to create a template .env file:

initrunner flow install flow.yaml --generate-env

User Lingering

To keep services running after logout:

loginctl enable-linger $USER

Example: Email Pipeline

inbox-watcher ──> triager ──> researcher

                     └──────> responder
apiVersion: initrunner/v1
kind: Flow
metadata:
  name: email-pipeline
  description: Multi-agent email processing pipeline
spec:
  agents:
    inbox-watcher:
      role: roles/inbox-watcher.yaml
      sink:
        type: delegate
        target: triager
    triager:
      role: roles/triager.yaml
      needs: [inbox-watcher]
      sink:
        type: delegate
        target: [researcher, responder]
        circuit_breaker_threshold: 5
    researcher:
      role: roles/researcher.yaml
      needs: [triager]
    responder:
      role: roles/responder.yaml
      needs: [triager]
      restart: { condition: on-failure, max_retries: 3, delay_seconds: 5 }

Agent Roles

Each agent points to a standalone role YAML. Here are the two key roles in this pipeline:

roles/triager.yaml — routes emails to the right handler:

apiVersion: initrunner/v1
kind: Agent
metadata:
  name: triager
  description: Routes emails to the right handler
spec:
  role: >
    You are an email triage agent. Analyze the email summary and
    determine if it needs research (technical questions, data requests)
    or a direct response (simple inquiries, acknowledgments).
    Output your decision and reasoning clearly.
  model:
    provider: openai
    name: gpt-4o-mini
    temperature: 0.1
  guardrails:
    max_tokens_per_run: 2000
    timeout_seconds: 30

roles/responder.yaml — drafts email responses:

apiVersion: initrunner/v1
kind: Agent
metadata:
  name: responder
  description: Drafts email responses
spec:
  role: >
    You are an email response agent. Given a triaged email that needs
    a direct response, draft a professional, helpful reply. Keep the
    tone friendly and concise.
  model:
    provider: openai
    name: gpt-4o-mini
    temperature: 0.5
  guardrails:
    max_tokens_per_run: 3000
    timeout_seconds: 30

Agent roles are minimal — they focus on a single task and don't need triggers or sinks (the flow file handles routing). This keeps each agent simple and testable independently.

Example: CI Pipeline

A webhook-driven pipeline that processes CI events, diagnoses build failures, and sends notifications.

webhook-receiver ──> build-analyzer ──> notifier

flow.yaml

apiVersion: initrunner/v1
kind: Flow
metadata:
  name: ci-pipeline
  description: CI event processing pipeline
spec:
  agents:
    webhook-receiver:
      role: roles/webhook-receiver.yaml
      sink:
        type: delegate
        target: build-analyzer
    build-analyzer:
      role: roles/build-analyzer.yaml
      needs: [webhook-receiver]
      sink:
        type: delegate
        target: notifier
    notifier:
      role: roles/notifier.yaml
      needs: [build-analyzer]
      restart: { condition: on-failure, max_retries: 3, delay_seconds: 5 }

roles/notifier.yaml

The most interesting agent — it combines Slack messaging with the GitHub commit status API:

apiVersion: initrunner/v1
kind: Agent
metadata:
  name: ci-notifier
  description: Sends Slack notifications and updates GitHub commit status
spec:
  role: |
    You are a CI notification agent. You receive analyzed build events and:

    1. Send a formatted Slack notification:
       - Success: "✅ Build passed — [repo] @ [branch] ([sha])"
       - Failure: "❌ Build failed — [repo] @ [branch] ([sha])\n
         Diagnosis: [diagnosis]\nCategory: [category]"
       - Include the build URL as a link
       - Add a timestamp via get_current_time

    2. Update the GitHub commit status using the create_commit_status API
       endpoint:
       - state: "success" or "failure"
       - description: brief status message
       - context: "ci-pipeline/initrunner"

    Always send both the Slack message and the GitHub status update.
  model:
    provider: openai
    name: gpt-4o-mini
    temperature: 0.0
  tools:
    - type: slack
      webhook_url: "${SLACK_WEBHOOK_URL}"
      default_channel: "#ci-alerts"
      username: CI Pipeline
      icon_emoji: ":construction_worker:"
    - type: api
      name: github-status
      description: GitHub commit status API
      base_url: https://api.github.com
      headers:
        Accept: application/vnd.github.v3+json
      auth:
        Authorization: "Bearer ${GITHUB_TOKEN}"
      endpoints:
        - name: create_commit_status
          method: POST
          path: "/repos/{owner}/{repo}/statuses/{sha}"
          description: Create a commit status check
          parameters:
            - name: owner
              type: string
              required: true
            - name: repo
              type: string
              required: true
            - name: sha
              type: string
              required: true
            - name: state
              type: string
              required: true
              description: "pending, success, failure, or error"
            - name: description
              type: string
              required: false
            - name: context
              type: string
              required: false
              default: "ci-pipeline/initrunner"
          body_template:
            state: "{state}"
            description: "{description}"
            context: "{context}"
          timeout_seconds: 15
    - type: datetime
  guardrails:
    max_tokens_per_run: 15000
    max_tool_calls: 10
    timeout_seconds: 60

Test the webhook

# Start the pipeline
initrunner flow up flow.yaml

# In another terminal, send a test event
curl -X POST http://localhost:9090/ci-webhook \
  -H "Content-Type: application/json" \
  -d '{
    "source": "github-actions",
    "repo": "myorg/myapp",
    "branch": "main",
    "sha": "abc12345",
    "status": "failure",
    "author": "dev@example.com",
    "message": "fix: update auth middleware",
    "url": "https://github.com/myorg/myapp/actions/runs/12345"
  }'

What to notice: The notifier combines two tool types — slack for human-readable alerts and api for machine-readable GitHub status updates. The webhook receiver uses a webhook trigger (port 9090), and the flow file wires all three agents together with delegate sinks.

Example: Support Desk

intake ──[sense]──> researcher | responder | escalator

A support desk pipeline where strategy: sense on the intake's delegate sink auto-routes each message to the best-matching handler — no static fan-out.

flow.yaml

apiVersion: initrunner/v1
kind: Flow
metadata:
  name: support-desk
  description: >
    Support desk pipeline with intelligent auto-routing. An intake agent
    summarizes incoming requests, then sense routing automatically sends
    each request to the right handler -- researcher for technical issues,
    responder for quick answers, or escalator for urgent/complex cases.
    No static fan-out: each message goes to exactly one target.
spec:
  agents:
    intake:
      role: roles/intake.yaml
      sink:
        type: delegate
        # strategy: sense uses keyword scoring + LLM tiebreak to pick the
        # best target for each message. Use "keyword" for zero API calls,
        # or "all" to fan out to every target (default).
        strategy: sense
        target:
          - researcher
          - responder
          - escalator

    researcher:
      role: roles/researcher.yaml
      needs:
        - intake

    responder:
      role: roles/responder.yaml
      needs:
        - intake
      restart:
        condition: on-failure
        max_retries: 3
        delay_seconds: 5

    escalator:
      role: roles/escalator.yaml
      needs:
        - intake

Agent Roles

roles/intake.yaml — receives and summarizes support requests:

apiVersion: initrunner/v1
kind: Agent
metadata:
  name: intake
  description: Receives support requests and summarizes them for triage
  tags:
    - support
    - intake
spec:
  role: >
    You are a support intake agent. When you receive a support request,
    produce a concise summary including: the customer's issue, urgency
    level, and the type of action needed (research, direct response,
    or human escalation). Be factual and brief.
  model:
    provider: openai
    name: gpt-5-mini
    temperature: 0.1
  guardrails:
    max_tokens_per_run: 1000
    timeout_seconds: 30

roles/researcher.yaml — investigates technical issues:

apiVersion: initrunner/v1
kind: Agent
metadata:
  name: researcher
  description: Investigates technical issues and gathers diagnostic information
  tags:
    - research
    - analysis
    - investigation
    - technical
    - diagnose
spec:
  role: >
    You are a technical research agent for a support desk. When you
    receive a triaged support request that requires investigation,
    research the issue thoroughly. Produce a structured report with:
    root cause analysis, relevant documentation references, and
    recommended resolution steps.
  model:
    provider: openai
    name: gpt-5-mini
    temperature: 0.3
  guardrails:
    max_tokens_per_run: 4000
    timeout_seconds: 60
initrunner flow up flow.yaml

What to notice: The strategy: sense on the intake's delegate sink means each message is scored against the three targets' role metadata (name, description, tags). Because the tags are non-overlapping — researcher uses [research, analysis, investigation, technical, diagnose] while responder and escalator cover different domains — keyword scoring alone resolves most messages without an LLM call. See Routing Strategy for details.

Example: Content Pipeline

content-watcher ──> researcher ──> writer

                        └──────> reviewer

Uses process_existing: true on the file watch trigger to handle files already in the directory on startup. See Triggers for details.

See also: Team Mode for single-file multi-persona collaboration — simpler than Flow when you need multiple perspectives on the same task rather than independent agents.

On this page