Stage dependency graph for the pipeline.
Each stage declares its module, the files it produces, and which stages
must run before it (depends_on). The orchestrator uses this graph to
determine valid execution order and to parallelise independent stages.
Stage
dataclass
A single pipeline stage.
Source code in src/stages.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 | @dataclass(frozen=True)
class Stage:
"""A single pipeline stage."""
name: str
module: str
description: str
depends_on: tuple[str, ...] = ()
optional: bool = False
outputs: tuple[str, ...] = ()
#: Files this stage reads. When non-empty the stage participates in the
#: content-hash skip cache (see :mod:`src.cache`). Paths are resolved
#: relative to the pipeline ``output_dir`` unless absolute.
inputs: tuple[str, ...] = ()
#: Maximum age (seconds) of a cached result before the stage must re-run,
#: even when input hashes have not changed. ``None`` means the cache
#: never expires by time alone (content-hash only). Useful for stages
#: that fetch live data (GitHub stats, citation counts) whose per-URL
#: caches have their own TTL.
ttl: int | None = None
|
topological_order(stages: tuple[Stage, ...] = STAGES) -> list[Stage]
Return stages in a valid execution order (Kahn's algorithm).
Source code in src/stages.py
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
def topological_order(stages: tuple[Stage, ...] = STAGES) -> list[Stage]:
    """Return stages in a valid execution order (Kahn's algorithm).

    Whenever several stages are ready, the one with the lexicographically
    smallest name is emitted first, so the result is deterministic for a
    given input. Dependencies naming a stage that is not in *stages* are
    ignored, which allows ordering a subset of the pipeline.

    Args:
        stages: Stages to order. Defaults to the module-level ``STAGES``.

    Returns:
        The stages arranged so that each one appears after all of its
        in-scope dependencies.

    Raises:
        ValueError: If two stages share a name, or if the dependencies
            contain a cycle (the message lists the stages that could not
            be scheduled, sorted by name).
    """
    import heapq  # local import keeps this block self-contained

    stage_map = {s.name: s for s in stages}
    if len(stage_map) != len(stages):
        # Without this check, duplicates collapse in stage_map and then
        # surface below as a misleading "cycle" with no stages listed.
        names = [s.name for s in stages]
        dupes = sorted({n for n in names if names.count(n) > 1})
        msg = f"Duplicate stage names: {dupes}"
        raise ValueError(msg)

    in_degree = dict.fromkeys(stage_map, 0)
    dependants: dict[str, list[str]] = {name: [] for name in stage_map}
    for s in stages:
        for dep in s.depends_on:
            if dep in stage_map:  # out-of-scope dependencies are ignored
                in_degree[s.name] += 1
                dependants[dep].append(s.name)

    # Min-heap on name: popping always yields the smallest ready name, the
    # same choice as sorting the ready list each step, without the re-sort.
    ready = [name for name, deg in in_degree.items() if deg == 0]
    heapq.heapify(ready)
    order: list[Stage] = []
    while ready:
        name = heapq.heappop(ready)
        order.append(stage_map[name])
        for child in dependants[name]:
            in_degree[child] -= 1
            if in_degree[child] == 0:
                heapq.heappush(ready, child)

    if len(order) != len(stage_map):
        executed = {s.name for s in order}
        # Sorted so the error message is stable across runs.
        missing = sorted(stage_map.keys() - executed)
        msg = f"Cycle detected in stage dependencies: {missing}"
        raise ValueError(msg)
    return order
|
parallel_groups(stages: tuple[Stage, ...] = STAGES) -> list[list[Stage]]
Return stages grouped into parallel tiers.
Each tier contains stages whose dependencies are fully satisfied by
earlier tiers, so all stages within a tier can execute concurrently.
Source code in src/stages.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
def parallel_groups(stages: tuple[Stage, ...] = STAGES) -> list[list[Stage]]:
    """Return stages grouped into parallel tiers.

    Each tier contains stages whose dependencies are fully satisfied by
    earlier tiers, so all stages within a tier can execute concurrently.
    Stages inside a tier are sorted by name for deterministic output, and
    dependencies naming a stage that is not in *stages* are ignored.

    Args:
        stages: Stages to group. Defaults to the module-level ``STAGES``.

    Returns:
        Tiers in execution order; together they contain every stage
        exactly once.

    Raises:
        ValueError: If two stages share a name, or if the dependencies
            contain a cycle (the message lists the stages that could not
            be scheduled, sorted by name).
    """
    stage_map = {s.name: s for s in stages}
    if len(stage_map) != len(stages):
        # Duplicates would otherwise collapse in stage_map and one of the
        # same-named stages would silently vanish from the tiers.
        names = [s.name for s in stages]
        dupes = sorted({n for n in names if names.count(n) > 1})
        msg = f"Duplicate stage names: {dupes}"
        raise ValueError(msg)

    in_degree = dict.fromkeys(stage_map, 0)
    dependants: dict[str, list[str]] = {name: [] for name in stage_map}
    for s in stages:
        for dep in s.depends_on:
            if dep in stage_map:  # out-of-scope dependencies are ignored
                in_degree[s.name] += 1
                dependants[dep].append(s.name)

    groups: list[list[Stage]] = []
    scheduled = 0
    current = sorted(name for name, deg in in_degree.items() if deg == 0)
    while current:
        groups.append([stage_map[name] for name in current])
        scheduled += len(current)
        next_tier: list[str] = []
        for name in current:
            for child in dependants[name]:
                in_degree[child] -= 1
                if in_degree[child] == 0:
                    next_tier.append(child)
        current = sorted(next_tier)

    # Stages on, or downstream of, a cycle never reach in-degree zero.
    # Without this check they would be silently left out of every tier.
    if scheduled != len(stage_map):
        placed = {s.name for tier in groups for s in tier}
        missing = sorted(stage_map.keys() - placed)
        msg = f"Cycle detected in stage dependencies: {missing}"
        raise ValueError(msg)
    return groups
|