Skip to content

stages

src.stages

Stage dependency graph for the pipeline.

Each stage declares its module, the files it produces, and which stages must run before it (depends_on). The orchestrator uses this graph to determine valid execution order and to parallelise independent stages.

Stage dataclass

A single pipeline stage.

Source code in `src/stages.py` (lines 13–32)
@dataclass(frozen=True)
class Stage:
    """A single pipeline stage.

    One node of the stage dependency graph. Instances are immutable
    (``frozen=True``): fields cannot be reassigned after construction.

    ``name``, ``module`` and ``description`` are required; every other field
    has a default. Field order is the positional constructor order, so new
    fields must be appended with defaults to stay backward-compatible.
    """

    #: Identifier of the stage. Other stages refer to it by this name in
    #: ``depends_on``, and the graph helpers in this module key stages by it,
    #: so it is expected to be unique within a stage collection.
    name: str
    #: Module implementing the stage -- presumably a dotted import path;
    #: TODO confirm against the orchestrator that loads it.
    module: str
    #: Short description of the stage (presumably human-readable text).
    description: str
    #: Names of stages that must run before this one. A name that is not
    #: present in the stage collection being ordered is skipped by the
    #: ordering helpers, i.e. treated as already satisfied.
    depends_on: tuple[str, ...] = ()
    #: Marks the stage as optional. NOTE(review): how optional stages are
    #: treated at run time is not visible here -- confirm in the orchestrator.
    optional: bool = False
    #: Files this stage produces.
    outputs: tuple[str, ...] = ()
    #: Files this stage reads. When non-empty the stage participates in the
    #: content-hash skip cache (see :mod:`src.cache`).  Paths are resolved
    #: relative to the pipeline ``output_dir`` unless absolute.
    inputs: tuple[str, ...] = ()
    #: Maximum age (seconds) of a cached result before the stage must re-run,
    #: even when input hashes have not changed.  ``None`` means the cache
    #: never expires by time alone (content-hash only).  Useful for stages
    #: that fetch live data (GitHub stats, citation counts) whose per-URL
    #: caches have their own TTL.
    ttl: int | None = None

topological_order(stages: tuple[Stage, ...] = STAGES) -> list[Stage]

Return stages in a valid execution order (Kahn's algorithm).

Source code in `src/stages.py` (lines 194–225)
def topological_order(stages: tuple[Stage, ...] = STAGES) -> list[Stage]:
    """Return stages in a valid execution order (Kahn's algorithm).

    Whenever several stages are ready at once, the one with the
    alphabetically smallest name is emitted first, so the result is
    deterministic for a given input.

    Args:
        stages: Stages to order. A ``depends_on`` entry naming a stage that is
            not in this collection is ignored (treated as already satisfied),
            which allows ordering a filtered subset of the pipeline.

    Returns:
        Every stage in *stages*, each placed after all of its in-collection
        dependencies.

    Raises:
        ValueError: If two stages share the same name, or if the dependency
            graph contains a cycle.
    """
    import heapq  # stdlib min-heap for the "ready" set

    names = [s.name for s in stages]
    stage_map = dict(zip(names, stages))
    if len(stage_map) != len(names):
        # Without this check, duplicate names collapse in stage_map and the
        # length comparison below misreports them as a cycle.
        duplicates = sorted({n for n in names if names.count(n) > 1})
        msg = f"Duplicate stage names: {duplicates}"
        raise ValueError(msg)

    in_degree = dict.fromkeys(stage_map, 0)
    dependants: dict[str, list[str]] = {name: [] for name in stage_map}

    for s in stages:
        for dep in s.depends_on:
            # Edges to stages outside this collection are ignored.
            if dep in stage_map:
                in_degree[s.name] += 1
                dependants[dep].append(s.name)

    # Popping the heap minimum yields the alphabetically-first ready stage,
    # giving deterministic output without re-sorting on every iteration.
    ready = [name for name, deg in in_degree.items() if deg == 0]
    heapq.heapify(ready)
    order: list[Stage] = []

    while ready:
        name = heapq.heappop(ready)
        order.append(stage_map[name])
        for child in dependants[name]:
            in_degree[child] -= 1
            if in_degree[child] == 0:
                heapq.heappush(ready, child)

    if len(order) != len(stage_map):
        # Stages never reaching in-degree 0 lie on, or downstream of, a cycle.
        remaining = sorted(set(stage_map) - {s.name for s in order})
        msg = f"Cycle detected in stage dependencies: {remaining}"
        raise ValueError(msg)

    return order

parallel_groups(stages: tuple[Stage, ...] = STAGES) -> list[list[Stage]]

Return stages grouped into parallel tiers.

Each tier contains stages whose dependencies are fully satisfied by earlier tiers, so all stages within a tier can execute concurrently.

Source code in `src/stages.py` (lines 228–257)
def parallel_groups(stages: tuple[Stage, ...] = STAGES) -> list[list[Stage]]:
    """Return stages grouped into parallel tiers.

    Each tier contains stages whose dependencies are fully satisfied by
    earlier tiers, so all stages within a tier can execute concurrently.
    A stage lands in the earliest tier possible: one past the latest tier
    of any of its dependencies. Stages within a tier are sorted by name so
    the output is deterministic.

    Args:
        stages: Stages to group. A ``depends_on`` entry naming a stage that
            is not in this collection is ignored (treated as already
            satisfied).

    Returns:
        A list of tiers (each a list of stages); together the tiers contain
        every stage in *stages* exactly once.

    Raises:
        ValueError: If two stages share the same name, or if the dependency
            graph contains a cycle.
    """
    names = [s.name for s in stages]
    stage_map = dict(zip(names, stages))
    if len(stage_map) != len(names):
        # Duplicate names would otherwise collapse silently in stage_map,
        # dropping all but the last stage with that name.
        duplicates = sorted({n for n in names if names.count(n) > 1})
        msg = f"Duplicate stage names: {duplicates}"
        raise ValueError(msg)

    in_degree = dict.fromkeys(stage_map, 0)
    dependants: dict[str, list[str]] = {name: [] for name in stage_map}

    for s in stages:
        for dep in s.depends_on:
            # Edges to stages outside this collection are ignored.
            if dep in stage_map:
                in_degree[s.name] += 1
                dependants[dep].append(s.name)

    groups: list[list[Stage]] = []
    placed = 0
    current = sorted(name for name, deg in in_degree.items() if deg == 0)

    while current:
        groups.append([stage_map[name] for name in current])
        placed += len(current)
        next_tier: list[str] = []
        for name in current:
            for child in dependants[name]:
                in_degree[child] -= 1
                if in_degree[child] == 0:
                    next_tier.append(child)
        current = sorted(next_tier)

    if placed != len(stage_map):
        # Stages that never reach in-degree 0 lie on, or downstream of, a
        # cycle; returning without them would mean they are never scheduled.
        remaining = sorted(name for name, deg in in_degree.items() if deg > 0)
        msg = f"Cycle detected in stage dependencies: {remaining}"
        raise ValueError(msg)

    return groups