Skip to content

cache

src.cache

Content-hash skip cache for idempotent stages.

A stage is skipped when all of the following hold:

1. Every declared input file exists and has the same SHA-256 as in the last successful run.
2. The stage's module source file has the same SHA-256 as last time.
3. Every declared output exists.
4. If the stage declares a ttl (in seconds), the cache entry is younger than that. This forces periodic re-runs for stages that fetch live data (e.g. GitHub stats) whose per-URL caches expire.

Cache entries live in <output_dir>/_build/.cache/<stage>.hash as plain text — a single line holding the stage's content-hash key — so they survive cleanly across pipeline runs but are easy to inspect or delete by hand.

Only stages that opt in (by declaring inputs in :class:`src.stages.Stage`) are eligible for skipping.

compute_key(stage: Stage, output_dir: Path) -> str | None

Return a content-hash key for stage or None if any input is missing.

Source code in src/cache.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def compute_key(stage: Stage, output_dir: Path) -> str | None:
    """Build the content-hash cache key for ``stage``.

    The key is a SHA-256 hex digest fed, in order, with the stage name, the
    hash of the stage's module source file, and the path and hash of every
    resolved input (in the order ``_resolve_paths`` yields them).

    Returns ``None`` when the module source cannot be located or when any
    resolved input is not a regular file.
    """
    digest = hashlib.sha256(stage.name.encode())

    module_file = _module_source_path(stage.module)
    if module_file is None or not module_file.is_file():
        return None
    # Editing the stage's own code changes the key.
    digest.update(b"code:" + _hash_file(module_file).encode())

    for input_path in _resolve_paths(stage.inputs, output_dir):
        if not input_path.is_file():
            return None
        # Both the path and the content hash of each input feed the key.
        digest.update(
            b"in:"
            + str(input_path).encode()
            + b"="
            + _hash_file(input_path).encode()
        )

    return digest.hexdigest()

should_skip(stage: Stage, output_dir: Path) -> bool

Return True if a previous run with identical inputs already produced every declared output file.

When the stage declares a ttl, the cache entry must also be younger than that many seconds — otherwise the stage re-runs even if inputs are unchanged (so that live-data stages periodically refresh).

Source code in src/cache.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def should_skip(stage: Stage, output_dir: Path) -> bool:
    """Decide whether ``stage`` can be skipped on this run.

    Returns ``True`` only if all of the following hold:

    * the stage declares inputs,
    * a cache file for the stage exists,
    * when ``stage.ttl`` is set, the cache file's mtime is less than ``ttl``
      seconds old (so live-data stages refresh periodically),
    * the stored key equals the freshly computed key, and
    * every declared output is present.
    """
    if not stage.inputs:
        return False

    entry = _cache_file(stage, output_dir)
    if not entry.is_file():
        return False

    # An entry past its TTL forces a re-run even when nothing has changed.
    ttl = stage.ttl
    if ttl is not None:
        age_seconds = time.time() - entry.stat().st_mtime
        if age_seconds >= ttl:
            logger.info(
                "↻ %s: cache expired (%.1f days old, ttl=%.1f days)",
                stage.name,
                age_seconds / 86400,
                ttl / 86400,
            )
            return False

    expected = compute_key(stage, output_dir)
    if expected is None:
        return False
    if entry.read_text().strip() != expected:
        return False

    def _is_present(out_path) -> bool:
        # Directory outputs (trailing /) just need to exist as a directory.
        # NOTE(review): if these are pathlib paths, str() normally drops a
        # trailing slash, so this branch may rarely apply — confirm against
        # what _resolve_paths returns.
        if out_path.suffix == "" and str(out_path).endswith("/"):
            return out_path.is_dir()
        return out_path.exists()

    return all(
        _is_present(out_path)
        for out_path in _resolve_paths(stage.outputs, output_dir)
    )

mark_done(stage: Stage, output_dir: Path) -> None

Record a successful run so a subsequent identical run can be skipped.

Source code in src/cache.py
128
129
130
131
132
133
134
135
136
137
def mark_done(stage: Stage, output_dir: Path) -> None:
    """Persist the current cache key for ``stage`` after a successful run.

    Does nothing when the stage declares no inputs or when no key can be
    computed. Otherwise the key is written, followed by a newline, to the
    stage's cache file, creating parent directories as needed.
    """
    digest = compute_key(stage, output_dir) if stage.inputs else None
    if digest is None:
        return

    target = _cache_file(stage, output_dir)
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(f"{digest}\n")
invalidate(stage: Stage, output_dir: Path) -> None

Remove the cache entry for stage, forcing a re-run next time.

Source code in src/cache.py
140
141
142
143
144
def invalidate(stage: Stage, output_dir: Path) -> None:
    """Delete the cache entry for ``stage`` so that its next run is not skipped.

    Does nothing when no cache file exists for the stage.
    """
    entry = _cache_file(stage, output_dir)
    if not entry.is_file():
        return
    entry.unlink()