#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Backfill missing fields on genealogy member rows.

Fields handled: ``name_word`` (generation character), ``former_name`` and
``childhood_name`` (childhood / milk name).

Usage:
    python3 fill_missing_fields.py            # dry-run: write preview CSVs only, no DB writes
    python3 fill_missing_fields.py --commit   # actually write to the database
    python3 fill_missing_fields.py --stats    # only print field fill rates, change nothing
"""

from __future__ import annotations

import csv
import json
import os
import re
import sys
from datetime import datetime

# NOTE: ``pymysql`` is imported inside ``get_conn`` so that the pure parsing
# helpers in this module can be imported (and unit-tested) without the driver.

# ─── Database configuration ──────────────────────────────────────────────────
# SECURITY NOTE(review): the fallback values below (including a root password)
# are committed to source. They are kept only so existing invocations keep
# working; rotate the credential and supply it through the CSQZ_DB_*
# environment variables instead.
DB_CONFIG = {
    "host": os.environ.get(
        "CSQZ_DB_HOST", "rm-f8ze60yirdj8786u2wo.mysql.rds.aliyuncs.com"
    ),
    "port": int(os.environ.get("CSQZ_DB_PORT", "3306")),
    "user": os.environ.get("CSQZ_DB_USER", "root"),
    "password": os.environ.get("CSQZ_DB_PASSWORD", "csqz@20255"),
    "db": os.environ.get("CSQZ_DB_NAME", "csqz-client"),
    "charset": "utf8mb4",
}

COMMIT_MODE = "--commit" in sys.argv
STATS_ONLY = "--stats" in sys.argv

# ─── Regex rules applied to the original genealogy text ──────────────────────
# Lookahead that must follow a captured name: punctuation, whitespace, one of
# the listed marker characters, a closing bracket, or end of text.
_BOUNDARY = r'(?=[,,。.\s公字幼又乳小二三四五六七八九十\))\]】]|$)'

REGEX_RULES = {
    "former_name": [
        re.compile(r'曾用名[::,,\s]*([^\s,,。.公字幼又乳小]{1,10})' + _BOUNDARY),
        re.compile(r'又名[::,,\s]*([^\s,,。.公字幼乳小]{1,10})' + _BOUNDARY),
        re.compile(r'曾名[::,,\s]*([^\s,,。.公字幼乳小]{1,10})' + _BOUNDARY),
        re.compile(r'一名[::,,\s]*([^\s,,。.公字幼乳小]{1,10})' + _BOUNDARY),
    ],
    "childhood_name": [
        re.compile(r'幼名[::,,\s]*([^\s,,。.公字又曾]{1,10})' + _BOUNDARY),
        re.compile(r'乳名[::,,\s]*([^\s,,。.公字又曾]{1,10})' + _BOUNDARY),
        re.compile(r'小名[::,,\s]*([^\s,,。.公字又曾]{1,10})' + _BOUNDARY),
    ],
}

# Full-width colon / comma / full stop and the two non-ASCII spaces, mapped to
# their ASCII counterparts in a single translate() pass.
_PUNCT_TABLE = str.maketrans({
    '\uff1a': ':',
    '\uff0c': ',',
    '\u3002': '.',
    '\u3000': ' ',
    '\xa0': ' ',
})

# Column order of the preview CSV files.
_CSV_FIELDS = [
    'id', 'name', 'simplified_name',
    'name_word_current', 'name_word_new', 'name_word_source',
    'former_name_current', 'former_name_new',
    'childhood_name_current', 'childhood_name_new',
]


def _normalize_text(text: str | None) -> str:
    """Return *text* with full-width punctuation and spaces made ASCII.

    Falsy input (``None`` or ``""``) yields ``""``.
    """
    if not text:
        return ""
    return text.translate(_PUNCT_TABLE)


def _first_match(patterns, text: str | None) -> str | None:
    """Return the first non-empty capture of *patterns* in *text*, else None."""
    norm = _normalize_text(text)
    for pat in patterns:
        m = pat.search(norm)
        if m:
            # Drop a trailing honorific "公" that may cling to the name.
            val = m.group(1).strip().rstrip('公')
            if val:
                return val
    return None


def _extract_from_original(text_trad, text_simp) -> dict:
    """Extract former_name / childhood_name from the original genealogy text.

    For each field in ``REGEX_RULES`` the traditional text is tried first and
    the simplified text second; the first hit wins.

    Returns:
        A dict containing only the fields that were found.
    """
    result = {}
    for field, patterns in REGEX_RULES.items():
        for text in (text_trad, text_simp):
            val = _first_match(patterns, text)
            if val:
                result[field] = val
                break
    return result


def _as_clean_str(value) -> str:
    """Return *value* as a stripped string; falsy values become ``""``."""
    return str(value).strip() if value else ""


def _match_ai_name_word(ai_content_str, member_name, member_simp):
    """Look up a member's ``name_word`` in an ``ai_content`` JSON document.

    The JSON may be a single object or a list of objects carrying
    ``original_name``, ``name`` and ``name_word`` keys. Records without a
    ``name_word`` or without any name are ignored.

    Matching runs in two passes over all records so that an exact hit always
    beats a fuzzy one:
      1. exact: a record name equals ``member_name`` or ``member_simp``;
      2. fuzzy: a record name contains, or is contained in, a member name.

    Empty record names never take part in the fuzzy pass: ``"" in s`` is
    always true, which would otherwise match every member.

    Returns:
        The matched ``name_word`` string, or ``None`` when nothing matches or
        the JSON cannot be parsed.
    """
    if not ai_content_str:
        return None
    try:
        records = json.loads(ai_content_str)
    except (TypeError, ValueError):
        # Unparseable AI output: treat as "no information" (best effort).
        return None
    if not isinstance(records, list):
        records = [records]

    targets = {t for t in (member_name, member_simp) if t}
    if not targets:
        return None

    candidates = []
    for rec in records:
        if not isinstance(rec, dict):
            continue
        name_word = _as_clean_str(rec.get('name_word'))
        if not name_word:
            continue
        names = [
            n
            for n in (
                _as_clean_str(rec.get('original_name')),
                _as_clean_str(rec.get('name')),
            )
            if n
        ]
        if names:
            candidates.append((names, name_word))

    # Pass 1: exact match.
    for names, name_word in candidates:
        if any(n in targets for n in names):
            return name_word

    # Pass 2: fuzzy match (one name contains the other).
    for names, name_word in candidates:
        if any(t in n or n in t for t in targets for n in names):
            return name_word
    return None


def _guess_name_word(simplified_name, name):
    """Guess the generation character from the member's name.

    Uses ``simplified_name`` when set, else ``name``; strips a trailing "公"
    and all whitespace, then returns the first character if at least two
    characters remain, otherwise ``None``.

    NOTE(review): this presumes stored names do not start with the surname;
    confirm against the data. Callers label these results as 'name_guess'.
    """
    raw = (simplified_name or name or "").strip()
    raw = re.sub(r'公$', '', raw)
    raw = re.sub(r'\s+', '', raw)
    if len(raw) >= 2:
        return raw[0]
    return None


def get_conn():
    """Open a pymysql connection from ``DB_CONFIG`` using dict-style rows."""
    import pymysql  # deferred: see the note next to the module imports

    # DictCursor is the default here; an explicit key in DB_CONFIG still wins.
    config = {"cursorclass": pymysql.cursors.DictCursor, **DB_CONFIG}
    return pymysql.connect(**config)


def print_stats(conn) -> None:
    """Print, for each backfilled field, how many member rows have it set."""
    with conn.cursor() as cur:
        cur.execute("SELECT COUNT(*) AS total FROM family_member_info")
        total = cur.fetchone()['total']
        # Column names come from this fixed list, never from user input, so
        # formatting them into the SQL text is safe.
        for field, label in [
            ('name_word', '字辈'),
            ('former_name', '曾用名'),
            ('childhood_name', '幼名/乳名'),
        ]:
            cur.execute("""
                SELECT COUNT(*) AS cnt FROM family_member_info
                WHERE {} IS NOT NULL AND TRIM({}) != ''
            """.format(field, field))
            filled = cur.fetchone()['cnt']
            pct = filled / total * 100 if total else 0
            print(" {:<12s} ({:<15s}): 已填 {:4d} / {} ({:.1f}%)".format(
                label, field, filled, total, pct))


def collect_updates(conn) -> list:
    """Scan every member and work out which empty fields can be filled.

    ``name_word`` is taken from the member's AI parse record when one matches
    (source 'ai_content'), else guessed from the name (source 'name_guess').
    ``former_name`` and ``childhood_name`` come from the original genealogy
    text. Fields that already hold a value are never proposed.

    Returns:
        A list of dicts (one per member with at least one proposed value)
        using the preview-CSV column names as keys.
    """
    print(" 正在加载成员数据…")
    with conn.cursor() as cur:
        cur.execute("""
            SELECT id, name, simplified_name, name_word, former_name,
                   childhood_name, genealogy_original_traditional,
                   genealogy_original_simplified, source_record_id
            FROM family_member_info
        """)
        members = cur.fetchall()
        print(" 成员总数:{}".format(len(members)))

        print(" 正在加载 AI 解析记录…")
        cur.execute("""
            SELECT id, ai_content FROM genealogy_records
            WHERE ai_status = 2 AND ai_content IS NOT NULL
        """)
        ai_map = {r['id']: r['ai_content'] for r in cur.fetchall()}
        print(" AI 记录数:{}".format(len(ai_map)))

    updates = []
    for m in members:
        mid = m['id']
        name = (m['name'] or '').strip()
        simp = (m['simplified_name'] or '').strip()
        cur_nw = (m['name_word'] or '').strip()
        cur_fn = (m['former_name'] or '').strip()
        cur_cn = (m['childhood_name'] or '').strip()
        orig_trad = m['genealogy_original_traditional'] or ''
        orig_simp = m['genealogy_original_simplified'] or ''
        source_rec_id = m['source_record_id']

        new_nw = None
        nw_source = ''
        new_fn = None
        new_cn = None

        # ── name_word: prefer the AI parse record ────────────────────────
        if not cur_nw:
            if source_rec_id and source_rec_id in ai_map:
                ai_nw = _match_ai_name_word(ai_map[source_rec_id], name, simp)
                if ai_nw:
                    new_nw = ai_nw
                    nw_source = 'ai_content'
            # Fallback: guess from the name itself.
            if not new_nw:
                guessed = _guess_name_word(simp, name)
                if guessed:
                    new_nw = guessed
                    nw_source = 'name_guess'

        # ── former / childhood name: from the original genealogy text ────
        extracted = _extract_from_original(orig_trad, orig_simp)
        if not cur_fn and extracted.get('former_name'):
            new_fn = extracted['former_name']
        if not cur_cn and extracted.get('childhood_name'):
            new_cn = extracted['childhood_name']

        # Only keep members for which something would change.
        if new_nw or new_fn or new_cn:
            updates.append({
                'id': mid,
                'name': name,
                'simplified_name': simp,
                'name_word_current': cur_nw,
                'name_word_new': new_nw or '',
                'name_word_source': nw_source,
                'former_name_current': cur_fn,
                'former_name_new': new_fn or '',
                'childhood_name_current': cur_cn,
                'childhood_name_new': new_cn or '',
            })
    return updates


def write_csv(updates, path) -> None:
    """Write *updates* to *path* as a UTF-8 (with BOM) preview CSV."""
    # utf-8-sig writes a byte-order mark at the start of the file.
    with open(path, 'w', newline='', encoding='utf-8-sig') as f:
        writer = csv.DictWriter(f, fieldnames=_CSV_FIELDS)
        writer.writeheader()
        writer.writerows(updates)
    print(" 已写出预览 CSV:{}".format(path))


def apply_updates(conn, updates) -> dict:
    """Run one UPDATE per member in *updates* and commit.

    Only the ``*_new`` values that are non-empty are written. The UPDATE
    itself does not re-check that the column is still empty; that guarantee
    comes from how ``collect_updates`` builds *updates*.

    If any statement fails the transaction is rolled back and the exception
    is re-raised, so a partial batch is never committed.

    Returns:
        Counters: values written per field plus ``total_members`` updated.
    """
    stats = {'name_word': 0, 'former_name': 0, 'childhood_name': 0,
             'total_members': 0}
    columns = (
        ('name_word', 'name_word_new'),
        ('former_name', 'former_name_new'),
        ('childhood_name', 'childhood_name_new'),
    )
    try:
        with conn.cursor() as cur:
            for u in updates:
                sets = []
                params = []
                for column, key in columns:
                    if u[key]:
                        sets.append("{} = %s".format(column))
                        params.append(u[key])
                        stats[column] += 1
                if sets:
                    params.append(u['id'])
                    cur.execute(
                        "UPDATE family_member_info SET {} WHERE id = %s".format(
                            ', '.join(sets)),
                        params,
                    )
                    stats['total_members'] += 1
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    return stats


def main() -> None:
    """Entry point: print stats, build preview CSVs, optionally write to DB."""
    print("=" * 60)
    mode_label = '[COMMIT 模式 - 真正写入]' if COMMIT_MODE else '[DRY-RUN 模式 - 仅预览]'
    print(" 族谱字段批量补录工具 {}".format(mode_label))
    print(" 运行时间:{}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    print("=" * 60)

    conn = get_conn()
    try:
        print("\n【字段填写率统计(运行前)】")
        print_stats(conn)
        if STATS_ONLY:
            return

        print("\n【扫描成员数据…】")
        updates = collect_updates(conn)

        ai_nw = [u for u in updates if u['name_word_source'] == 'ai_content']
        guess_nw = [u for u in updates if u['name_word_source'] == 'name_guess']
        fn_hits = [u for u in updates if u['former_name_new']]
        cn_hits = [u for u in updates if u['childhood_name_new']]

        print("\n 字辈(来自 ai_content) : {:4d} 条 ← 高可信度".format(len(ai_nw)))
        print(" 字辈(来自姓名推算) : {:4d} 条 ← 建议人工抽检".format(len(guess_nw)))
        print(" 曾用名(来自族谱原文) : {:4d} 条".format(len(fn_hits)))
        print(" 幼名/乳名(来自族谱原文): {:4d} 条".format(len(cn_hits)))
        print(" 涉及成员总计 : {:4d} 名".format(len(updates)))

        ts = datetime.now().strftime('%Y%m%d_%H%M%S')

        # Full preview CSV.
        all_csv = "fill_preview_{}.csv".format(ts)
        write_csv(updates, all_csv)

        # Guessed generation characters get their own CSV for manual review.
        if guess_nw:
            guess_csv = "fill_preview_guess_nameword_{}.csv".format(ts)
            write_csv(guess_nw, guess_csv)
            print(" ⚠ 推算字辈单独导出:{}(请优先人工核查)".format(guess_csv))

        if COMMIT_MODE:
            print("\n【开始写入数据库…】")
            stats = apply_updates(conn, updates)
            print(" 字辈写入 : {} 条".format(stats['name_word']))
            print(" 曾用名写入 : {} 条".format(stats['former_name']))
            print(" 幼名/乳名写入 : {} 条".format(stats['childhood_name']))
            print(" 影响成员 : {} 名".format(stats['total_members']))
            print("\n【字段填写率统计(运行后)】")
            print_stats(conn)
        else:
            print("\n DRY-RUN 完成,未写入数据库。")
            print(" 确认 CSV 内容无误后,执行真正写入:")
            print(" python3 fill_missing_fields.py --commit")
    finally:
        conn.close()
        print("\n" + "=" * 60)


if __name__ == '__main__':
    main()