Event-stream engine

agentgrep’s search and find engines produce typed event streams — sync generators that yield pydantic discriminated-union events as they walk the user’s stores. The same producer feeds the CLI’s live output path, the Textual TUI’s worker, and the MCP server’s response collector. Three frontends, one engine.

Why a stream

A short scan completes before the user notices. A long one — broad patterns, deep history, slow stores — can take seconds. The legacy list-return path (agentgrep.run_search_query) buffers every match until the scan finishes, then returns the list. That hides the engine’s progress from the consumer and forces a “wait, then dump” UX in the CLI.

The event stream fixes both problems and adds one more benefit:

  • Per-record liveness. Each match is emitted as a RecordEmitted event the moment the engine decides it is unique and included. The CLI's grep / find text paths consume the stream and print and flush each record, so users see the first matches within milliseconds.

  • Single source of truth. Search progress (which source is active, how many records seen / matched) and the matches themselves are the same event stream, not two parallel side channels.

  • Decoupling. The engine doesn’t know about stdout, Textual, or fastmcp. It yields events. Consumers translate.

Architecture

┌────────────────────────────────────────────────────────────────┐
│                   PRODUCER  (agentgrep._engine)                │
│                                                                │
│   def iter_search_events(home, query, *, control=None)         │
│       -> Iterator[SearchEvent]:                                │
│                                                                │
│     yield SearchStarted(source_count=...)                      │
│     for source in discovered:                                  │
│         yield SourceStarted(adapter_id, index, total)          │
│         for record in iter_source_records(source):             │
│             if matches(record, query) and unique:              │
│                 yield RecordEmitted(record=record)             │
│         yield SourceFinished(adapter_id, records_seen, ...)    │
│     yield SearchFinished(match_count, elapsed_seconds)         │
└──────────────────────────┬─────────────────────────────────────┘
                           │
       ┌───────────────────┼───────────────────┐
       ▼                   ▼                   ▼
┌──────────────┐  ┌──────────────────┐  ┌──────────────────┐
│  CLI (sync)  │  │  TUI (Textual)   │  │   MCP (sync)     │
│              │  │                  │  │                  │
│ for ev in    │  │ @work(thread=    │  │ list(records     │
│   stream:    │  │  True) consumes  │  │   for ev in      │
│   if Record  │  │  via to_thread   │  │   stream if      │
│      print() │  │                  │  │   isinstance     │
│      flush() │  │                  │  │   RecordEmitted) │
└──────────────┘  └──────────────────┘  └──────────────────┘
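The diagram can be made concrete with a self-contained toy. Everything in it is hypothetical: the event classes are plain frozen dataclasses rather than agentgrep's pydantic models, iter_toy_events takes in-memory (adapter_id, records) pairs instead of discovering stores, and matching is a substring test with a set standing in for the per-session dedup. Only the event names and the order they are yielded follow the diagram.

```python
import time
from dataclasses import dataclass
from typing import Iterator, Union


# Hypothetical stand-ins for agentgrep's event classes (illustration only).
@dataclass(frozen=True)
class SearchStarted:
    source_count: int


@dataclass(frozen=True)
class SourceStarted:
    adapter_id: str
    index: int  # 0-based in this toy
    total: int


@dataclass(frozen=True)
class RecordEmitted:
    record: str


@dataclass(frozen=True)
class SourceFinished:
    adapter_id: str
    records_seen: int
    matches_seen: int


@dataclass(frozen=True)
class SearchFinished:
    match_count: int
    elapsed_seconds: float


Event = Union[SearchStarted, SourceStarted, RecordEmitted, SourceFinished, SearchFinished]


def iter_toy_events(sources: list[tuple[str, list[str]]], query: str) -> Iterator[Event]:
    """Yield events in the diagram's order; matching is a plain substring test."""
    start = time.monotonic()
    seen: set[str] = set()  # toy dedup: skip records already emitted
    emitted = 0
    yield SearchStarted(source_count=len(sources))
    for index, (adapter_id, records) in enumerate(sources):
        yield SourceStarted(adapter_id, index, len(sources))
        matches = 0
        for record in records:
            if query in record:
                matches += 1  # counted before dedup, like matches_seen
                if record not in seen:
                    seen.add(record)
                    emitted += 1
                    yield RecordEmitted(record=record)
        yield SourceFinished(adapter_id, records_seen=len(records), matches_seen=matches)
    yield SearchFinished(match_count=emitted, elapsed_seconds=time.monotonic() - start)


def print_matches(events: Iterator[Event]) -> int:
    """CLI-style consumer: print and flush each match as soon as it arrives."""
    count = 0
    for event in events:
        if isinstance(event, RecordEmitted):
            print(event.record, flush=True)
            count += 1
    return count
```

Because print_matches flushes on every RecordEmitted, each line appears as soon as the producer yields it rather than after SearchFinished.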

Sync producer

The engine is a synchronous generator. Async consumers wrap it in asyncio.to_thread() with one line; sync consumers iterate directly. Tests exercise the producer without an event loop, which keeps the test surface small.

Pydantic events

Events are frozen pydantic.BaseModel subclasses tagged with a Literal["..."] discriminator field. The union types SearchEvent and FindEvent carry pydantic.Field(discriminator="type") so runtime validation routes each payload to the correct variant and isinstance narrowing works in consumer loops.
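A minimal sketch of the pattern, assuming pydantic v2. The two variants borrow names from the events described below, but the "source_started" / "source_finished" tag values, the _Event base class, and the parse_event / describe helpers are hypothetical; agentgrep's real classes may differ.

```python
from typing import Annotated, Literal, Union

from pydantic import BaseModel, ConfigDict, Field, TypeAdapter


class _Event(BaseModel):
    """Shared base: frozen=True makes every event immutable once built."""

    model_config = ConfigDict(frozen=True)


class SourceStarted(_Event):
    type: Literal["source_started"] = "source_started"
    adapter_id: str
    index: int
    total: int


class SourceFinished(_Event):
    type: Literal["source_finished"] = "source_finished"
    adapter_id: str
    records_seen: int
    matches_seen: int


# Field(discriminator="type") makes pydantic pick the variant from the "type" key
# instead of trying each member of the union in turn.
ToyEvent = Annotated[Union[SourceStarted, SourceFinished], Field(discriminator="type")]
_event_adapter = TypeAdapter(ToyEvent)


def parse_event(payload: dict) -> Union[SourceStarted, SourceFinished]:
    """Validate a raw dict (e.g. decoded JSON) into the matching event class."""
    return _event_adapter.validate_python(payload)


def describe(event: Union[SourceStarted, SourceFinished]) -> str:
    # isinstance narrowing: each branch sees the concrete variant's fields.
    if isinstance(event, SourceStarted):
        return f"scanning {event.adapter_id} [{event.index} of {event.total}]"
    return f"{event.adapter_id}: {event.matches_seen}/{event.records_seen} matched"
```

A payload whose "type" matches no variant is rejected with a ValidationError rather than being coerced into the wrong class.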

Events embed agentgrep’s existing SearchRecord / FindRecord dataclasses directly via arbitrary_types_allowed=True. Consumers read record attributes without an extra conversion step. Transport-layer consumers (a future HTTP SSE endpoint, for example) should serialise records through SearchRecordModel / FindRecordModel at the boundary so the dataclass-typed field doesn’t block model_dump_json().
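A transport boundary might look like the following minimal sketch, assuming pydantic v2. ToyRecord, ToyRecordModel, and to_sse are hypothetical stand-ins for SearchRecord, SearchRecordModel, and an SSE endpoint; the real models' fields aren't documented here. ConfigDict(from_attributes=True) lets model_validate() read a dataclass instance's attributes, after which model_dump_json() only has plain types to serialise.

```python
from dataclasses import dataclass

from pydantic import BaseModel, ConfigDict


@dataclass(frozen=True)
class ToyRecord:
    """Hypothetical stand-in for agentgrep's SearchRecord dataclass."""

    adapter_id: str
    text: str


class ToyRecordModel(BaseModel):
    """Hypothetical stand-in for SearchRecordModel: a pydantic mirror of ToyRecord."""

    # from_attributes lets model_validate() read fields off any object, including a dataclass.
    model_config = ConfigDict(from_attributes=True)

    adapter_id: str
    text: str


def to_sse(record: ToyRecord) -> str:
    """Serialise one record at the transport boundary as a Server-Sent Events message."""
    payload = ToyRecordModel.model_validate(record).model_dump_json()
    return f"event: record\ndata: {payload}\n\n"
```

Converting at the edge keeps the in-process events cheap (no copy per record) while the wire format stays plain JSON.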

Search events

The SearchEvent union has five members. Their guaranteed sequence:

SearchStarted → (SourceStarted → RecordEmitted* → SourceFinished)* → SearchFinished

  • SearchStarted — exactly once at the head. Carries source_count (the number of candidate sources after prefiltering).

  • SourceStarted — once per source, in source-discovery order (mtime descending). Carries adapter_id, index, total.

  • RecordEmitted — the hot-path event. Fires only after the per-session dedup has decided the record is unique and included.

  • SourceFinished — once per source, paired with its SourceStarted. Carries records_seen (every record parsed) and matches_seen (the subset that matched before dedup).

  • SearchFinished — exactly once at the tail. Carries match_count (total emitted) and elapsed_seconds.

Even on empty input, the SearchStarted / SearchFinished envelope still fires, so cleanup code stays uniform.
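The grammar above is regular, so a consumer or a test can check it with a small state machine. This sketch works on event class names (for example [type(e).__name__ for e in stream]); the helper name is hypothetical, not part of agentgrep.

```python
# Allowed transitions: (current state, event name) -> next state.
_TRANSITIONS = {
    ("start", "SearchStarted"): "between_sources",
    ("between_sources", "SourceStarted"): "in_source",
    ("between_sources", "SearchFinished"): "done",
    ("in_source", "RecordEmitted"): "in_source",
    ("in_source", "SourceFinished"): "between_sources",
}


def is_valid_search_sequence(names: list[str]) -> bool:
    """Return True if names match
    SearchStarted (SourceStarted RecordEmitted* SourceFinished)* SearchFinished."""
    state = "start"
    for name in names:
        next_state = _TRANSITIONS.get((state, name))
        if next_state is None:  # no transition: out-of-order or unknown event
            return False
        state = next_state
    return state == "done"  # must end right after SearchFinished
```

The empty-input envelope (SearchStarted immediately followed by SearchFinished) is accepted, while any event after SearchFinished is rejected because "done" has no outgoing transitions.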

Find events

The FindEvent union has three members. Find has no per-source scan loop — each discovered source produces exactly one record — so the sequence simplifies:

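FindStarted → FindRecordEmitted* → FindFinished

  • FindStarted — exactly once at the head.

  • FindRecordEmitted — zero or more; each embeds the FindRecord for one discovered source.

  • FindFinished — exactly once at the tail.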

Consumer recipes

Collect to a list (the MCP / TUI snapshot pattern)

import agentgrep
from agentgrep import events


def collect_records(home, query):
    return [
        event.record
        for event in agentgrep.iter_search_events(home, query)
        if isinstance(event, events.RecordEmitted)
    ]

Update a UI off the event loop (the Textual TUI pattern)

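import asyncio

import agentgrep
from agentgrep import events


async def update_ui(home, query, render_record):
    # Run the blocking generator in a worker thread so the event loop stays responsive.
    # This drains the whole stream first; rendering starts after the scan completes.
    def _drain() -> list[events.SearchEvent]:
        return list(agentgrep.iter_search_events(home, query))

    for event in await asyncio.to_thread(_drain):
        if isinstance(event, events.RecordEmitted):
            render_record(event.record)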

For finer-grained live updates inside Textual, run the generator on a @work(thread=True)-decorated method and post a message per event rather than draining first.
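Outside Textual, an async consumer can get per-event updates without draining by advancing the generator one step at a time with asyncio.to_thread(next, it, sentinel). Each step runs off the event loop, and the awaits are sequential, so the generator is never resumed from two threads at once. This is a sketch: aiter_from_sync is a hypothetical helper, not agentgrep API, and toy_events stands in for agentgrep.iter_search_events.

```python
import asyncio
from typing import AsyncIterator, Iterator, TypeVar

T = TypeVar("T")

_DONE = object()  # sentinel: next() returns it once the iterator is exhausted


async def aiter_from_sync(it: Iterator[T]) -> AsyncIterator[T]:
    """Yield items from a blocking iterator, running each next() call in a worker thread."""
    while True:
        item = await asyncio.to_thread(next, it, _DONE)
        if item is _DONE:
            return
        yield item


def toy_events() -> Iterator[str]:
    # Stand-in for agentgrep.iter_search_events(home, query).
    yield "SearchStarted"
    yield "RecordEmitted"
    yield "SearchFinished"


async def main() -> list[str]:
    rendered: list[str] = []
    async for event in aiter_from_sync(toy_events()):
        rendered.append(event)  # a real UI would render each event here
    return rendered
```

Passing a default to next() matters: asyncio futures refuse to carry a StopIteration raised in the worker thread. Each event costs one thread-pool round trip, which suits interactive rates; a queue-based bridge amortises that cost for very large result sets.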

Cancel mid-scan

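Pass a SearchControl through the control= keyword and call its request_answer_now() method; the engine stops at the next per-record boundary:

import agentgrep

control = agentgrep.SearchControl()
stream = agentgrep.iter_search_events(home, query, control=control)

# … on a keypress / timeout / user action:
control.request_answer_now()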

The generator still emits SearchFinished so cleanup runs.
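The mechanism can be illustrated with a toy producer that checks a flag before each record. ToyControl stands in for SearchControl, and its answer_now_requested() query method is hypothetical; events are (name, payload) tuples for brevity.

```python
import threading
from typing import Iterator, Tuple


class ToyControl:
    """Hypothetical stand-in for agentgrep.SearchControl."""

    def __init__(self) -> None:
        self._answer_now = threading.Event()  # thread-safe, so any thread may set it

    def request_answer_now(self) -> None:
        self._answer_now.set()

    def answer_now_requested(self) -> bool:
        return self._answer_now.is_set()


def toy_search(records: list[str], control: ToyControl) -> Iterator[Tuple[str, object]]:
    """Yield (event name, payload) pairs, checking the control flag before each record."""
    yield ("SearchStarted", len(records))
    emitted = 0
    for record in records:
        if control.answer_now_requested():
            break  # per-record boundary: stop scanning but still reach the tail event
        emitted += 1
        yield ("RecordEmitted", record)
    yield ("SearchFinished", emitted)
```

Checking a flag, instead of having the consumer abandon the loop, is what lets the generator reach its tail event: a generator that is closed (explicitly or when garbage-collected) receives GeneratorExit at its paused yield and cannot yield again.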

Slice boundaries

This page documents Slice 1 — the sync iterator surface used by the CLI’s live streaming. Two follow-up slices are planned:

  • Slice 2: an aiter_search_events async wrapper that bridges the sync producer via a bounded asyncio.Queue and a thread-backed producer task. Cancellation propagates through CancelledError. The TUI moves to the async surface; the CLI keeps using the sync iterator.

  • Slice 3: source-level parallelism via asyncio.TaskGroup over asyncio.to_thread(parse_source, src). Each source’s events merge into a single output stream via the queue. Cancellation propagates through task cancellation.
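Slice 2 is only planned, so what follows is a sketch of one way a bounded-queue bridge could work, not agentgrep's implementation. aiter_via_queue and its make_iter / maxsize parameters are hypothetical. The worker thread blocks on run_coroutine_threadsafe(queue.put(...)), so a full queue throttles the producer; when the consumer is cancelled or stops early, a threading.Event tells the worker to give up.

```python
import asyncio
import concurrent.futures
import threading
from typing import Any, AsyncIterator, Callable, Iterator

_DONE = object()  # end-of-stream marker placed on the queue by the worker


async def aiter_via_queue(
    make_iter: Callable[[], Iterator[Any]], maxsize: int = 64
) -> AsyncIterator[Any]:
    """Drive a sync iterator in a worker thread and yield its items through a bounded queue."""
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    stop = threading.Event()  # set by the consumer to tell the worker to quit

    def put(item: Any) -> bool:
        """Block the worker until the item is queued; give up if the consumer stopped."""
        fut = asyncio.run_coroutine_threadsafe(queue.put(item), loop)
        while True:
            try:
                fut.result(timeout=0.05)
                return True
            except concurrent.futures.TimeoutError:
                if stop.is_set():
                    fut.cancel()
                    return False

    def produce() -> None:
        try:
            for item in make_iter():
                if stop.is_set() or not put(item):
                    return
        finally:
            if not stop.is_set():
                put(_DONE)  # also sent when make_iter() raises

    producer = asyncio.create_task(asyncio.to_thread(produce))
    try:
        while (item := await queue.get()) is not _DONE:
            yield item
        await producer  # re-raises any exception from inside make_iter()
    finally:
        stop.set()
        await asyncio.wait({producer})  # let the worker notice stop and exit
```

With agentgrep, the call might read aiter_via_queue(lambda: agentgrep.iter_search_events(home, query)). The 50 ms poll in put is an arbitrary choice that bounds how long the worker lingers after the consumer stops.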

Both slices preserve the public event surface — consumers written today continue to work without changes.

Reference

The events module’s full API is documented at agentgrep.events. The iterators are at agentgrep.iter_search_events() and agentgrep.iter_find_events().