Snapshot summaries for pipeline output stability verification.
A "snapshot" is a compact JSON file that records deterministic statistics
about every pipeline output: record counts, numeric field ranges, score
sums, and SHA-256 checksums of the serialised data. Two snapshots taken
from the same code + same input should be identical.
Usage::
# Create / update the reference snapshot
python -m src.snapshot --output_dir output/staging --update
# Compare current output against the saved snapshot (exit 1 on diff)
python -m src.snapshot --output_dir output/staging
The reference snapshot lives at tests/snapshots/pipeline_snapshot.json.
It is committed to the repo so that every PR shows exactly which numbers
changed.
MonotonicityViolation
A single monotonicity violation.
Source code in src/snapshot.py
255
256
257
258
259
260
261
262
263
264
265
266
267
class MonotonicityViolation:
    """A single monotonicity violation.

    Plain record describing one failed check. The four attributes are
    stored exactly as passed to the constructor:

    * ``file`` -- key of the snapshot entry the violation belongs to
    * ``check`` -- name of the check or field that failed
    * ``message`` -- human-readable description of the failure
    * ``severity`` -- free-form severity label, ``"error"`` by default
    """

    def __init__(self, file: str, check: str, message: str, *, severity: str = "error") -> None:
        self.file = file
        self.check = check
        self.message = message
        self.severity = severity

    def __str__(self) -> str:
        """Return ``[SEVERITY] file: check — message`` with the severity upper-cased."""
        return f"[{self.severity.upper()}] {self.file}: {self.check} — {self.message}"

    def __repr__(self) -> str:
        """Return a constructor-style representation that includes ``severity``."""
        # severity is part of the object's state; leaving it out made an
        # error-level and a warning-level violation indistinguishable in
        # debug output and assertion failure messages.
        return (
            f"MonotonicityViolation({self.file!r}, {self.check!r}, "
            f"{self.message!r}, severity={self.severity!r})"
        )
|
create_summary(output_dir: Path) -> dict
Scan pipeline output and return a snapshot summary dict.
The dict maps relative_path -> {sha256, record_count, ...}.
Source code in src/snapshot.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
def create_summary(output_dir: Path) -> dict:
    """Build a snapshot summary for the pipeline output under *output_dir*.

    Three locations are inspected, each only if the directory exists:

    * ``assets/data/*.json`` -- one entry per file, produced by ``_summarise_json``
    * ``_data/*.yml`` -- one entry per file, produced by ``_summarise_yaml``
    * ``assets/charts/*.svg`` -- a single ``"assets/charts/"`` entry holding
      the number of SVG files and their combined size in bytes

    A file whose summariser raises is logged at warning level and left out
    of the result; the scan continues with the next file.

    Returns:
        ``{"_version": 1, "files": {relative_path: summary, ...}}``.
    """
    entries: dict = {}

    # JSON data files
    data_root = output_dir / "assets" / "data"
    if data_root.is_dir():
        for json_path in sorted(data_root.glob("*.json")):
            rel = f"assets/data/{json_path.name}"
            try:
                entries[rel] = _summarise_json(json_path)
            except Exception as exc:
                logger.warning("Failed to summarise %s: %s", rel, exc)

    # YAML data files
    site_data_root = output_dir / "_data"
    if site_data_root.is_dir():
        for yaml_path in sorted(site_data_root.glob("*.yml")):
            rel = f"_data/{yaml_path.name}"
            try:
                entries[rel] = _summarise_yaml(yaml_path)
            except Exception as exc:
                logger.warning("Failed to summarise %s: %s", rel, exc)

    # Charts are only counted and sized; their content is not summarised.
    chart_root = output_dir / "assets" / "charts"
    if chart_root.is_dir():
        chart_paths = sorted(chart_root.glob("*.svg"))
        sizes = [chart.stat().st_size for chart in chart_paths]
        entries["assets/charts/"] = {
            "chart_count": len(chart_paths),
            "total_bytes": sum(sizes),
        }

    return {"_version": 1, "files": entries}
|
compare_summaries(old: dict, new: dict) -> list[str]
Compare two snapshot dicts and return a list of human-readable diffs.
Returns an empty list if the snapshots are identical.
Source code in src/snapshot.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
def compare_summaries(old: dict, new: dict) -> list[str]:
    """Describe every difference between two snapshot dicts.

    Files are visited in sorted key order. A file present on only one side
    yields a single ``+``/``-`` line. For a file present on both sides the
    function reports, in this order:

    1. changes in ``record_count``, ``key_count`` and ``chart_count``
    2. changes in the ``sum``/``min``/``max``/``mean``/``count`` of each
       field under ``numeric``
    3. a generic "content changed" line when the ``sha256`` values differ
       and neither of the steps above reported anything for that file

    A value is only compared when it is present (not ``None``) on both sides.

    Returns:
        Human-readable diff lines; an empty list when nothing differs.
    """
    report: list[str] = []
    reference = old.get("files", {})
    current = new.get("files", {})

    for path in sorted(set(reference).union(current)):
        if path not in reference:
            report.append(f"+ {path}: NEW file (not in reference snapshot)")
            continue
        if path not in current:
            report.append(f"- {path}: REMOVED (was in reference snapshot)")
            continue

        before, after = reference[path], current[path]
        # Remember how many lines existed before this file so we can tell
        # later whether any specific change was reported for it.
        checkpoint = len(report)

        # Count-style fields
        for counter in ("record_count", "key_count", "chart_count"):
            was = before.get(counter)
            now = after.get(counter)
            if was is None or now is None:
                continue
            if was != now:
                change = now - was
                plus = "+" if change > 0 else ""
                report.append(f" {path}: {counter} {was} → {now} ({plus}{change})")

        # Per-field numeric statistics (scores, rates)
        before_stats = before.get("numeric", {})
        after_stats = after.get("numeric", {})
        for name in sorted(set(before_stats).union(after_stats)):
            was_field = before_stats.get(name, {})
            now_field = after_stats.get(name, {})
            for stat in ("sum", "min", "max", "mean", "count"):
                was = was_field.get(stat)
                now = now_field.get(stat)
                if was is None or now is None:
                    continue
                if was != now:
                    report.append(f" {path}: {name}.{stat} {was} → {now}")

        # Catch-all: the hash moved but no specific change was listed above.
        if before.get("sha256") and after.get("sha256") and before["sha256"] != after["sha256"]:
            if len(report) == checkpoint:
                report.append(f" {path}: content changed (sha256 differs)")

    return report
|
check_monotonicity(old: dict, new: dict) -> list[MonotonicityViolation]
Compare two snapshots and flag any fields that decreased unexpectedly.
Returns a list of `MonotonicityViolation` objects (empty = OK).
Designed to catch regressions like:
- A conference or author disappearing due to a scraper bug
- Badge/artifact counts dropping because of a parser change
- Record counts shrinking when they should be append-only
Source code in src/snapshot.py
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
def check_monotonicity(old: dict, new: dict) -> list[MonotonicityViolation]:
    """Flag tracked values that are lower in *new* than in *old*.

    Four families of checks run, each driven by a module-level table:

    * ``_MONOTONIC_RECORD_COUNT`` -- the file's ``record_count``
    * ``_MONOTONIC_SUMS`` -- the ``sum`` of selected fields under ``numeric``
    * ``_MONOTONIC_NAMES`` -- entries of the file's ``names`` list
    * ``_MONOTONIC_DICT_NUMERIC`` -- selected keys under ``dict_numeric``

    A numeric value is only compared when it is present (not ``None``) in
    both snapshots. The names check only runs when both snapshots hold a
    non-empty name list for the file.

    Intended to catch regressions such as:
    - A conference or author disappearing due to a scraper bug
    - Badge/artifact counts dropping because of a parser change
    - Record counts shrinking when they should be append-only

    Returns:
        A list of :class:`MonotonicityViolation` objects (empty = OK).
    """

    def shrank(prev, curr) -> bool:
        # True only when both values exist and the new one is strictly lower.
        return prev is not None and curr is not None and curr < prev

    found: list[MonotonicityViolation] = []
    before = old.get("files", {})
    after = new.get("files", {})

    # ── record_count must not decrease ───────────────────────────────────
    for path in _MONOTONIC_RECORD_COUNT:
        prev = before.get(path, {}).get("record_count")
        curr = after.get(path, {}).get("record_count")
        if shrank(prev, curr):
            detail = f"decreased from {prev} to {curr} (lost {prev - curr} records)"
            found.append(MonotonicityViolation(path, "record_count", detail))

    # ── numeric sums must not decrease ───────────────────────────────────
    for path, tracked in _MONOTONIC_SUMS.items():
        prev_numeric = before.get(path, {}).get("numeric", {})
        curr_numeric = after.get(path, {}).get("numeric", {})
        for name in tracked:
            prev = prev_numeric.get(name, {}).get("sum")
            curr = curr_numeric.get(name, {}).get("sum")
            if shrank(prev, curr):
                detail = f"decreased from {prev} to {curr}"
                found.append(MonotonicityViolation(path, f"{name}.sum", detail))

    # ── author/entity names must not vanish ──────────────────────────────
    for path in _MONOTONIC_NAMES:
        prev_names = set(before.get(path, {}).get("names", []))
        curr_names = set(after.get(path, {}).get("names", []))
        if not (prev_names and curr_names):
            continue
        missing = prev_names - curr_names
        if not missing:
            continue
        # Show at most five names and summarise the rest as a count.
        shown = sorted(missing)[:5]
        overflow = len(missing) - 5
        suffix = f" (and {overflow} more)" if overflow > 0 else ""
        detail = f"{len(missing)} name(s) vanished: {shown}{suffix}"
        found.append(MonotonicityViolation(path, "names", detail))

    # ── dict-level numeric keys must not decrease ────────────────────────
    for path, tracked in _MONOTONIC_DICT_NUMERIC.items():
        prev_values = before.get(path, {}).get("dict_numeric", {})
        curr_values = after.get(path, {}).get("dict_numeric", {})
        for name in tracked:
            prev = prev_values.get(name)
            curr = curr_values.get(name)
            if shrank(prev, curr):
                detail = f"decreased from {prev} to {curr}"
                found.append(MonotonicityViolation(path, name, detail))

    return found
|
save_snapshot(summary: dict, path: Path | None = None) -> Path
Write a snapshot summary to disk.
Source code in src/snapshot.py
352
353
354
355
356
357
def save_snapshot(summary: dict, path: Path | None = None) -> Path:
    """Serialise *summary* as JSON and write it to disk.

    The target is *path* when given, otherwise ``SNAPSHOT_PATH``. Missing
    parent directories are created. The JSON is written with a two-space
    indent, sorted keys and a trailing newline.

    Returns:
        The path the snapshot was written to.
    """
    target = SNAPSHOT_PATH if not path else path
    target.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(summary, indent=2, sort_keys=True)
    target.write_text(f"{payload}\n")
    logger.info("Snapshot saved to %s", target)
    return target
|
load_snapshot(path: Path | None = None) -> dict | None
Load a snapshot from disk, or None if it doesn't exist.
Source code in src/snapshot.py
361
362
363
364
365
366
367 | def load_snapshot(path: Path | None = None) -> dict | None:
"""Load a snapshot from disk, or None if it doesn't exist."""
path = path or SNAPSHOT_PATH
if not path.is_file():
return None
result: dict = json.loads(path.read_text())
return result
|