Skip to content

enrich_affiliations_csrankings

src.enrichers.enrich_affiliations_csrankings

Enrich author affiliations using CSRankings data.

CSRankings (http://csrankings.org) maintains a comprehensive database of computer science faculty affiliations. This script downloads the official csrankings.csv file and matches our authors to their faculty records.

download_csrankings(force_refresh: bool = False, verbose: bool = False) -> Path

Download CSRankings CSV file with caching.

Source code in src/enrichers/enrich_affiliations_csrankings.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def download_csrankings(force_refresh: bool = False, verbose: bool = False) -> Path:
    """Download the CSRankings CSV file, with an on-disk cache.

    Args:
        force_refresh: Re-download even if a fresh cached copy exists.
        verbose: Emit informational log messages about cache usage/downloads.

    Returns:
        Path to the (possibly cached) csrankings.csv file.

    Raises:
        Re-raises any download error when no cached copy exists to fall back on.
    """
    CACHE_DIR.mkdir(parents=True, exist_ok=True)

    # Serve from cache while it is younger than the TTL.
    if CACHE_FILE.exists() and not force_refresh:
        age_days = (time.time() - CACHE_FILE.stat().st_mtime) / _SECONDS_PER_DAY
        if age_days < CACHE_TTL_DAYS:
            if verbose:
                logger.info(f"Using cached CSRankings data (age: {age_days:.1f} days)")
            return CACHE_FILE

    # Download fresh data
    if verbose:
        logger.info(f"Downloading CSRankings data from {CSRANKINGS_URL}...")

    # Support lowercase proxy environment variables explicitly.
    # (requests also honors these via trust_env; passing them keeps intent clear.)
    proxies = {}
    if os.environ.get("http_proxy"):
        proxies["http"] = os.environ["http_proxy"]
    if os.environ.get("https_proxy"):
        proxies["https"] = os.environ["https_proxy"]

    try:
        response = requests.get(CSRANKINGS_URL, proxies=proxies, timeout=60)
        response.raise_for_status()

        CACHE_FILE.write_text(response.text, encoding="utf-8")
        if verbose:
            # len(response.content) is the actual byte count; len(response.text)
            # would report decoded characters, not bytes.
            logger.info(f"Downloaded {len(response.content)} bytes to {CACHE_FILE}")

        return CACHE_FILE
    except Exception as e:
        # Deliberate best-effort fallback: a stale cache beats a hard failure.
        if CACHE_FILE.exists():
            logger.warning(f"Download failed ({e}), using stale cache")
            return CACHE_FILE
        raise

load_csrankings(csv_path: Path, verbose: bool = False) -> dict[str, list[dict]]

Load CSRankings CSV and build name lookup index. Returns dict mapping normalized names to list of possible records.

Source code in src/enrichers/enrich_affiliations_csrankings.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def load_csrankings(csv_path: Path, verbose: bool = False) -> dict[str, list[dict]]:
    """
    Load the CSRankings CSV and build a name lookup index.

    Rows missing a name or affiliation are skipped. Each kept row is indexed
    under its normalized full name and, when the name has at least two parts,
    also under a "lastname:<surname>" key to support partial matching.

    Args:
        csv_path: Path to the csrankings.csv file.
        verbose: Log the number of records loaded.

    Returns:
        Dict mapping index keys to lists of record dicts with keys:
        name, affiliation, homepage, scholarid, orcid.
    """
    name_index = defaultdict(list)
    record_count = 0  # distinct CSV rows kept (a row may appear in two buckets)

    # newline="" lets the csv module correctly handle newlines embedded in
    # quoted fields (per the csv module docs).
    with open(csv_path, "r", encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        for row in reader:
            # "or ''" guards against DictReader filling short rows with None.
            name = (row.get("name") or "").strip()
            affiliation = (row.get("affiliation") or "").strip()

            if not name or not affiliation:
                continue

            # Store record with multiple name variations for matching
            record = {
                "name": name,
                "affiliation": affiliation,
                "homepage": (row.get("homepage") or "").strip(),
                "scholarid": (row.get("scholarid") or "").strip(),
                "orcid": (row.get("orcid") or "").strip(),
            }
            record_count += 1

            # Index by normalized full name
            name_index[normalize_name(name)].append(record)

            # Also index by last name for partial matching
            parts = name.split()
            if len(parts) >= 2:
                name_index[f"lastname:{parts[-1].lower()}"].append(record)

    if verbose:
        # Previous count tested 'lastname:' against each record dict's keys
        # (always absent), so it counted every bucket entry and double-counted
        # most rows. Counting rows directly gives the true figure.
        logger.info(f"Loaded {record_count} CSRankings records")

    return name_index

normalize_name(name: str) -> str

Normalize name for matching: lowercase, remove punctuation.

Source code in src/enrichers/enrich_affiliations_csrankings.py
115
116
117
118
def normalize_name(name: str) -> str:
    """Normalize a name for matching: drop bracketed tags, lowercase, strip punctuation."""
    # Remove bracketed annotations such as "[Stanford]" plus surrounding whitespace.
    without_tags = re.sub(r"\s*\[[^\]]+\]\s*", " ", name)
    kept = [ch.lower() for ch in without_tags if ch.isalnum() or ch.isspace()]
    return "".join(kept).strip()

fuzzy_name_match(author_name: str, csrankings_name: str) -> bool

Check if names match after normalization, handling common variations: - Middle names are ignored - A first name may match its single-letter initial (e.g. "J" vs "John"). Note: name-order reversal ("Last, First") and accent folding are not handled by the current implementation.

Source code in src/enrichers/enrich_affiliations_csrankings.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
def fuzzy_name_match(author_name: str, csrankings_name: str) -> bool:
    """
    Decide whether two names plausibly refer to the same person.

    Matching rules (applied after normalize_name()):
    - Identical normalized strings match.
    - Otherwise the last names must be identical, and the first names must
      either be identical or one must be the single-letter initial of the
      other (e.g. "j" vs "john").
    """
    left = normalize_name(author_name)
    right = normalize_name(csrankings_name)

    if left == right:
        return True

    left_parts = left.split()
    right_parts = right.split()
    if not left_parts or not right_parts:
        return False

    # Surnames must agree exactly.
    if left_parts[-1] != right_parts[-1]:
        return False

    first_a = left_parts[0]
    first_b = right_parts[0]

    if first_a == first_b:
        return True

    # Accept a one-letter initial that matches the start of the other first name.
    if len(first_a) == 1 and first_b.startswith(first_a):
        return True
    return len(first_b) == 1 and first_a.startswith(first_b)

match_author_to_csrankings(author_name: str, name_index: dict[str, list[dict]], verbose: bool = False) -> Optional[str]

Match author to CSRankings record and return affiliation. Returns None if no match found.

Source code in src/enrichers/enrich_affiliations_csrankings.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
def match_author_to_csrankings(
    author_name: str, name_index: dict[str, list[dict]], verbose: bool = False
) -> Optional[str]:
    """
    Look up an author in the CSRankings index and return their affiliation.

    Tries the normalized-full-name bucket first; when that is empty, falls
    back to the "lastname:" bucket. Candidates are then filtered with
    fuzzy_name_match(). Returns None when nothing matches.
    """
    candidates = name_index.get(normalize_name(author_name), [])

    if not candidates:
        # Fall back to the surname bucket when the full name misses.
        tokens = author_name.split()
        if len(tokens) >= 2:
            candidates = name_index.get(f"lastname:{tokens[-1].lower()}", [])

    # Return the first candidate that survives fuzzy matching.
    for candidate in candidates:
        if not fuzzy_name_match(author_name, candidate["name"]):
            continue
        if verbose:
            logger.info(f"    Matched '{author_name}' -> '{candidate['name']}' ({candidate['affiliation']})")
        return candidate["affiliation"]

    return None

enrich_affiliations(authors_file: Path, output_file: Path, name_index: dict[str, list[dict]], max_authors: Optional[int] = None, dry_run: bool = False, verbose: bool = False, data_dir: Optional[str] = None) -> dict[str, int]

Enrich author affiliations using CSRankings data. Returns statistics about enrichment.

Source code in src/enrichers/enrich_affiliations_csrankings.py
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
def enrich_affiliations(
    authors_file: Path,
    output_file: Path,
    name_index: dict[str, list[dict]],
    max_authors: Optional[int] = None,
    dry_run: bool = False,
    verbose: bool = False,
    data_dir: Optional[str] = None,
) -> dict[str, int]:
    """
    Enrich author affiliations using CSRankings data.

    Args:
        authors_file: JSON file containing a list of author dicts.
        output_file: Where to write the enriched author list.
        name_index: Lookup index produced by load_csrankings().
        max_authors: If set, only the first N authors are processed — and only
            those N are written to output_file (the remainder is dropped).
        dry_run: Compute statistics but write nothing.
        verbose: Per-author logging.
        data_dir: Optional data directory for loading/saving the author index.

    Returns:
        Statistics dict: total, already_has_affiliation, csrankings_match,
        no_match, enriched, overwritten, remaining, final_coverage.
    """
    # Load authors
    with open(authors_file, "r", encoding="utf-8") as f:
        authors = json.load(f)

    # Optional author-index support (soft dependency on src.utils.author_index).
    index_by_name = {}
    if data_dir:
        try:
            from src.utils.author_index import load_author_index, save_author_index, update_author_affiliation

            _, index_by_name = load_author_index(data_dir)
            if index_by_name:
                logger.info(f"Loaded author index ({len(index_by_name)} entries)")
        except ImportError:
            logger.debug("Author index not available, skipping author ID enrichment")

    # Track statistics
    stats = {"total": len(authors), "already_has_affiliation": 0, "csrankings_match": 0, "no_match": 0, "enriched": 0}

    # Track overwrites when CSRankings supersedes existing affiliations
    stats["overwritten"] = 0

    # NOTE(review): "total" reflects the full input, but truncation happens
    # here — so with max_authors set, "final_coverage" below is computed
    # against the untruncated count. Presumably intentional for sampling runs;
    # confirm before changing.
    if max_authors:
        authors = authors[:max_authors]

    logger.info(f"Processing {len(authors)} authors for CSRankings matches...")

    # Enrich affiliations (CSRankings takes precedence if available)
    enriched_count = 0
    for i, author in enumerate(authors, 1):
        name = author.get("name", "")
        current_affil = author.get("affiliation", "")
        has_affil = bool(current_affil and current_affil != "Unknown")
        if has_affil:
            stats["already_has_affiliation"] += 1

        if verbose:
            logger.info(f"  [{i}/{len(authors)}] Looking up: {name}")

        affiliation = match_author_to_csrankings(name, name_index, verbose)

        if affiliation:
            if affiliation != current_affil:
                author["affiliation"] = affiliation
                # Keep the external author index in sync with the new value.
                if name in index_by_name:
                    update_author_affiliation(index_by_name[name], affiliation, "csrankings")
                if has_affil:
                    stats["overwritten"] += 1
                else:
                    enriched_count += 1
            stats["csrankings_match"] += 1
            if verbose:
                logger.info(f"    ✓ Found: {affiliation}")
        else:
            stats["no_match"] += 1
            if verbose:
                logger.info("    ✗ No match in CSRankings")

        # Progress update every 100 authors. (Fix: previously this lived
        # inside the match branch, so it only fired when author #100/#200/...
        # happened to match; now it runs for every author.)
        if not verbose and i % 100 == 0:
            logger.info(f"  Processed {i}/{len(authors)}... (found {enriched_count} so far)")

    stats["enriched"] = enriched_count
    stats["remaining"] = sum(1 for a in authors if not a.get("affiliation") or a.get("affiliation") == "Unknown")
    stats["final_coverage"] = 100 * (stats["total"] - stats["remaining"]) / stats["total"] if stats["total"] else 0

    # Save results
    if not dry_run:
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(authors, f, indent=2, ensure_ascii=False)
        logger.info(f"\nEnriched authors saved to: {output_file}")
        # Save updated author index
        if data_dir and index_by_name:
            idx_path = save_author_index(
                data_dir, [index_by_name[n] for n in sorted(index_by_name, key=lambda n: index_by_name[n]["id"])]
            )
            logger.info(f"Author index updated: {idx_path}")
    else:
        logger.info(f"\nDry run - would save to: {output_file}")

    return stats