Skip to content

enrich_affiliations_author_search

Affiliation enrichment via co-author bridge in OpenAlex.

Strategy
  1. For an unaffiliated author X, find their co-authors from our dataset.
  2. Search each co-author in OpenAlex to get their OpenAlex author ID.
  3. Scan that co-author's works for any paper listing X by name → extract X's OpenAlex author ID.
  4. Validate the matched ID: check that X's OpenAlex works contain at least one paper title from our dataset (confirms correct disambiguation).
  5. Fetch X's own works sorted by publication_year desc and return the institution from their most recent paper that has one.

This avoids false positives from common names (e.g. "Kai Ye" matching a medical researcher) by requiring the found author's profile to overlap with our known paper titles.

Usage

python -m src.enrichers.enrich_affiliations_author_search --authors_file output/staging/_data/authors.yml --papers_file output/staging/assets/data/paper_authors_map.json [--output_file output/staging/_data/authors.yml] [--max_authors 100] [--verbose] [--dry_run]

resolve_via_coauthor_bridge(session: requests.Session, target_name: str, coauthor_names: list[str], verbose: bool = False) -> Optional[str]

For an unaffiliated author, find their OpenAlex ID via multiple co-authors' works. Require consensus: the same OpenAlex ID must be found via at least 2 independent co-authors (or 1 if the author has fewer than 2 co-authors in our dataset). Then return the most recent affiliation from that profile.

Source code in src/enrichers/enrich_affiliations_author_search.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
def resolve_via_coauthor_bridge(
    session: requests.Session,
    target_name: str,
    coauthor_names: list[str],
    verbose: bool = False,
) -> Optional[str]:
    """
    Resolve an unaffiliated author's institution through their co-authors.

    Each co-author is looked up in OpenAlex; the co-author's works are then
    searched for ``target_name`` to obtain a candidate OpenAlex ID for the
    target. Every candidate ID collects one "vote" per distinct co-author that
    led to it. The winning ID must reach ``_MIN_CONSENSUS`` votes, or as many
    votes as there are co-authors when the list is shorter than that.

    Args:
        session: HTTP session handed through to the OpenAlex helpers.
        target_name: Name of the author whose affiliation is missing.
        coauthor_names: Co-authors of the target, probed in the given order.
        verbose: When true, log the consensus outcome.

    Returns:
        The value returned by ``_get_most_recent_affiliation`` for the
        consensus ID, or ``None`` when no candidate was found or consensus
        was not reached.
    """
    # Candidate OpenAlex ID -> names of the co-authors that pointed to it.
    votes_by_id: dict[str, set[str]] = {}
    resolved_coauthors = 0

    for coauthor_name in coauthor_names:
        # The probe budget only counts co-authors we could resolve in OpenAlex.
        if resolved_coauthors >= _MAX_COAUTHORS_TO_PROBE:
            break

        coauthor_oa_id = _find_coauthor_openalex_id(session, coauthor_name)
        if not coauthor_oa_id:
            continue
        resolved_coauthors += 1

        candidate_id = _find_author_openalex_id_via_coauthor(session, target_name, coauthor_oa_id)
        if not candidate_id:
            continue

        supporters = votes_by_id.setdefault(candidate_id, set())
        supporters.add(coauthor_name)

        # Stop probing as soon as one candidate has enough independent votes.
        if len(supporters) >= _MIN_CONSENSUS:
            break

    if not votes_by_id:
        return None

    # Highest vote count wins; ties resolve to the candidate seen first.
    best_id, supporters = max(votes_by_id.items(), key=lambda entry: len(entry[1]))
    vote_count = len(supporters)

    # Authors with fewer co-authors than _MIN_CONSENSUS get a relaxed threshold.
    min_required = min(_MIN_CONSENSUS, len(coauthor_names))
    if vote_count < min_required:
        if verbose:
            logger.info(
                f"      Bridge: {target_name} no consensus (best={best_id}, votes={vote_count}, need={min_required})"
            )
        return None

    # Consensus reached: read the affiliation off the confirmed profile.
    institution = _get_most_recent_affiliation(session, best_id)
    if verbose and institution:
        voters = sorted(supporters)
        logger.info(f"      Bridge: {target_name} (OA:{best_id}) confirmed by {voters} -> {institution}")
    return institution

enrich(authors_file: str, papers_file: str, output_file: Optional[str] = None, max_authors: Optional[int] = None, verbose: bool = False, dry_run: bool = False, data_dir: Optional[str] = None) -> dict

Main entry point. Returns stats dict.

Source code in src/enrichers/enrich_affiliations_author_search.py
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
def enrich(
    authors_file: str,
    papers_file: str,
    output_file: Optional[str] = None,
    max_authors: Optional[int] = None,
    verbose: bool = False,
    dry_run: bool = False,
    data_dir: Optional[str] = None,
) -> dict:
    """Fill in missing author affiliations via the co-author bridge.

    Args:
        authors_file: Path to the authors YAML file; entries without an
            ``affiliation`` value become candidates.
        papers_file: Path to the paper-authors map used to build the
            co-author index.
        output_file: File the resolved affiliations are applied to. Defaults
            to ``authors_file``.
        max_authors: When truthy, only the first ``max_authors`` candidates
            are processed (``None`` or ``0`` means no limit).
        verbose: Log per-author progress and bridge details instead of the
            compact one-line-per-author output.
        dry_run: Resolve affiliations but write neither the YAML file nor the
            author index.
        data_dir: Optional author-index directory. When given and the index
            module is importable, resolved affiliations are also recorded in
            the index and saved after a successful write.

    Returns:
        Stats dict with the keys ``total``, ``candidates``, ``found``,
        ``not_found``, ``errors`` and ``updates_written``. The last one is the
        number of resolved authors (0 on a dry run), not the number of lines
        actually replaced in the file.
    """
    output_file = output_file or authors_file

    logger.info("Loading paper-authors map...")
    author_papers = _build_author_papers_index(papers_file)
    logger.info(f"  {len(author_papers)} unique author names")

    logger.info("Parsing authors.yml...")
    authors = _parse_authors_yml_fast(authors_file)
    total = len(authors)
    candidates = [a for a in authors if not a.get("affiliation")]
    logger.info(f"  {total} total, {len(candidates)} missing affiliations")

    if max_authors:
        candidates = candidates[:max_authors]
        logger.info(f"  Processing first {len(candidates)} (--max_authors)")

    stats = {"total": total, "candidates": len(candidates), "found": 0, "not_found": 0, "errors": 0}
    updates: dict[str, str] = {}

    # HTTP session; closed in the ``finally`` below so pooled connections are
    # released even if setup or the loop raises.
    session = create_session()
    try:
        http_proxy = os.environ.get("http_proxy") or os.environ.get("HTTP_PROXY", "")
        https_proxy = os.environ.get("https_proxy") or os.environ.get("HTTPS_PROXY", "")
        if https_proxy or http_proxy:
            # Register only the schemes that really have a proxy: an empty
            # string entry would shadow any broader fallback proxy setting.
            session.proxies = {
                scheme: proxy_url
                for scheme, proxy_url in (("http", http_proxy), ("https", https_proxy))
                if proxy_url
            }

        # Optional author index
        index_by_name = {}
        _update_index_fn = None
        _save_index_fn = None
        if data_dir:
            try:
                from src.utils.normalization.author_index import (
                    load_author_index,
                    save_author_index,
                    update_author_affiliation,
                )

                _, index_by_name = load_author_index(data_dir)
                _update_index_fn = update_author_affiliation

                def _save_index_fn():
                    return save_author_index(data_dir, sorted(index_by_name.values(), key=lambda e: e["id"]))
            except ImportError:
                # Best-effort feature: keep going, but tell the caller that the
                # index they asked for via data_dir will not be updated.
                logger.warning(
                    f"Author index support unavailable; index in {data_dir} will not be updated",
                    exc_info=True,
                )

        logger.info(f"\nEnriching {len(candidates)} authors via co-author bridge...")
        logger.info("=" * 70)

        for idx, author in enumerate(candidates, 1):
            name = author.get("name", "")
            if not name:
                # Nameless entries cannot be searched; they are skipped and
                # not counted in any of the stats buckets.
                continue

            try:
                coauthors = _get_coauthors(name, author_papers)
                if verbose:
                    logger.info(f"[{idx}/{len(candidates)}] {name} ({len(coauthors)} co-authors)")

                affiliation = resolve_via_coauthor_bridge(session, name, coauthors, verbose)

                if affiliation:
                    stats["found"] += 1
                    updates[name] = affiliation
                    if name in index_by_name and _update_index_fn:
                        _update_index_fn(index_by_name[name], affiliation, "coauthor_bridge")
                    if not verbose:
                        logger.info(f"[{idx}/{len(candidates)}] {name:40s}  +  {affiliation[:50]}")
                else:
                    stats["not_found"] += 1
                    if not verbose:
                        logger.info(f"[{idx}/{len(candidates)}] {name:40s}  -")
            except Exception:
                # Per-author boundary: one failing lookup must not abort the run.
                stats["errors"] += 1
                logger.warning(f"[{idx}/{len(candidates)}] {name}: error", exc_info=True)
    finally:
        session.close()

    logger.info("=" * 70)
    logger.info(f"\nResults: found {stats['found']}, not found {stats['not_found']}")
    if stats["errors"]:
        logger.warning(f"  {stats['errors']} authors failed with errors (see warnings above)")

    if not dry_run and updates:
        logger.info(f"\nWriting {len(updates)} updates to {output_file} ...")
        replaced = _update_authors_yml(output_file, updates)
        logger.info(f"  {replaced} lines updated.")
        # Persist the index only after the YAML write went through.
        if _save_index_fn and index_by_name:
            _save_index_fn()
    elif dry_run:
        logger.info(f"\n[DRY RUN] Would update {len(updates)} authors.")

    stats["updates_written"] = len(updates) if not dry_run else 0
    return stats