| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 批量补录族谱人员缺失字段:name_word(字辈)、former_name(曾用名)、childhood_name(幼名/乳名)
- 用法:
- python3 fill_missing_fields.py # dry-run,仅输出预览 CSV,不写数据库
- python3 fill_missing_fields.py --commit # 真正写入数据库
- python3 fill_missing_fields.py --stats # 仅统计字段填写率,不做任何修改
- """
from __future__ import annotations

import csv
import json
import os
import re
import sys
from datetime import datetime

import pymysql
# ─── Database configuration ──────────────────────────────────────────────────
# Each connection setting can be overridden through an environment variable so
# that credentials do not have to live in the source tree:
#   CSQZ_DB_HOST, CSQZ_DB_PORT, CSQZ_DB_USER, CSQZ_DB_PASSWORD, CSQZ_DB_NAME
# NOTE(review): the literal fallbacks below (including the password of the
# "root" account) are kept only so existing invocations keep working; they
# should be rotated and removed from the source once the environment variables
# are in place.
DB_CONFIG = {
    "host": os.environ.get(
        "CSQZ_DB_HOST", "rm-f8ze60yirdj8786u2wo.mysql.rds.aliyuncs.com"
    ),
    # Environment values are strings, while the port is passed as an integer.
    "port": int(os.environ.get("CSQZ_DB_PORT", "3306")),
    "user": os.environ.get("CSQZ_DB_USER", "root"),
    "password": os.environ.get("CSQZ_DB_PASSWORD", "csqz@20255"),
    "db": os.environ.get("CSQZ_DB_NAME", "csqz-client"),
    "charset": "utf8mb4",
    # Rows come back as dicts; the rest of the script indexes them by column name.
    "cursorclass": pymysql.cursors.DictCursor,
}

# Command-line switches, evaluated once at import time.
COMMIT_MODE = "--commit" in sys.argv
STATS_ONLY = "--stats" in sys.argv
# ─── Regex rules for the original genealogy text ─────────────────────────────
# Lookahead appended to every rule: the captured name must be followed by one
# of the listed punctuation marks / whitespace / keyword characters / numerals /
# closing brackets, or by the end of the text.
# NOTE(review): several character classes below contain the same character
# twice (e.g. "::", ",,", "\))"). This looks like full-width variants that were
# folded to ASCII when the file was transcribed - confirm against the upstream
# copy before relying on the exact classes.
_BOUNDARY = r'(?=[,,。.\s公字幼又乳小二三四五六七八九十\))\]】]|$)'
# Maps a target column to the ordered list of patterns tried for it. Every
# pattern has the shape: keyword, optional separators (colon / comma /
# whitespace), then a captured run of 1-10 characters that excludes whitespace,
# punctuation and a few keyword characters, followed by _BOUNDARY.
REGEX_RULES = {
    # Keywords introducing a former / alternative name.
    "former_name": [
        re.compile(r'曾用名[::,,\s]*([^\s,,。.公字幼又乳小]{1,10})' + _BOUNDARY),
        re.compile(r'又名[::,,\s]*([^\s,,。.公字幼乳小]{1,10})' + _BOUNDARY),
        re.compile(r'曾名[::,,\s]*([^\s,,。.公字幼乳小]{1,10})' + _BOUNDARY),
        re.compile(r'一名[::,,\s]*([^\s,,。.公字幼乳小]{1,10})' + _BOUNDARY),
    ],
    # Keywords introducing a childhood / infant / pet name.
    "childhood_name": [
        re.compile(r'幼名[::,,\s]*([^\s,,。.公字又曾]{1,10})' + _BOUNDARY),
        re.compile(r'乳名[::,,\s]*([^\s,,。.公字又曾]{1,10})' + _BOUNDARY),
        re.compile(r'小名[::,,\s]*([^\s,,。.公字又曾]{1,10})' + _BOUNDARY),
    ],
}
- def _normalize_text(text):
- """全角标点→半角,统一处理"""
- if not text:
- return ""
- text = text.replace('\uff1a', ':').replace('\uff0c', ',').replace('\u3002', '.')
- text = text.replace('\u3000', ' ').replace('\xa0', ' ')
- return text
def _extract_from_original(text_trad, text_simp):
    """Pull ``former_name`` / ``childhood_name`` out of the original genealogy text.

    For every field in ``REGEX_RULES`` the traditional text is searched first,
    then the simplified text; within one text the field's patterns are tried in
    their listed order. The first non-empty capture (with any trailing "公"
    removed) wins.

    Returns:
        A dict containing only the fields for which a value was found.
    """
    # Normalize each source once; traditional text takes precedence.
    sources = [_normalize_text(raw or "") for raw in (text_trad, text_simp)]

    def first_capture(patterns):
        """Return the first usable capture over all sources, or None."""
        for source in sources:
            for pattern in patterns:
                found = pattern.search(source)
                if not found:
                    continue
                value = found.group(1).strip().rstrip('公')
                if value:
                    return value
        return None

    extracted = {}
    for field, patterns in REGEX_RULES.items():
        value = first_capture(patterns)
        if value is not None:
            extracted[field] = value
    return extracted
- def _match_ai_name_word(ai_content_str, member_name, member_simp):
- """从 ai_content JSON 中按姓名匹配 name_word,返回字符串或 None"""
- if not ai_content_str:
- return None
- try:
- records = json.loads(ai_content_str)
- if not isinstance(records, list):
- records = [records]
- for r in records:
- orig = (r.get('original_name') or '').strip()
- nm = (r.get('name') or '').strip()
- nw = (r.get('name_word') or '').strip()
- if not nw:
- continue
- targets = set()
- if member_name: targets.add(member_name)
- if member_simp: targets.add(member_simp)
- # 精确匹配
- if orig in targets or nm in targets:
- return nw
- # 模糊匹配(名字互含)
- for t in targets:
- if t and (t in orig or t in nm or orig in t or nm in t):
- return nw
- except Exception:
- pass
- return None
- def _guess_name_word(simplified_name, name):
- """从姓名推算字辈:去掉"公"后缀,取第1个字;返回字符串或 None"""
- raw = (simplified_name or name or "").strip()
- raw = re.sub(r'公$', '', raw)
- raw = re.sub(r'\s+', '', raw)
- if len(raw) >= 2:
- return raw[0]
- return None
def get_conn():
    """Open and return a new pymysql connection built from ``DB_CONFIG``."""
    connection = pymysql.connect(**DB_CONFIG)
    return connection
def print_stats(conn):
    """Print, for each tracked column, how many ``family_member_info`` rows are filled.

    A row counts as filled when the column is neither NULL nor blank after
    TRIM. One line per column is printed with the filled count, the table's
    total row count and the percentage (0 when the table is empty).
    """
    tracked = (
        ('name_word', '字辈'),
        ('former_name', '曾用名'),
        ('childhood_name', '幼名/乳名'),
    )
    # Column names come from the fixed tuple above, never from user input.
    count_sql = """
        SELECT COUNT(*) AS cnt FROM family_member_info
        WHERE {} IS NOT NULL AND TRIM({}) != ''
    """
    line_template = " {:<12s} ({:<15s}): 已填 {:4d} / {} ({:.1f}%)"

    with conn.cursor() as cursor:
        cursor.execute("SELECT COUNT(*) AS total FROM family_member_info")
        total = cursor.fetchone()['total']
        for column, label in tracked:
            cursor.execute(count_sql.format(column, column))
            filled = cursor.fetchone()['cnt']
            if total:
                pct = filled / total * 100
            else:
                pct = 0
            print(line_template.format(label, column, filled, total, pct))
def collect_updates(conn):
    """Scan every member and work out which empty fields can be filled.

    Loads all rows of ``family_member_info`` plus the ``ai_content`` of every
    ``genealogy_records`` row with ``ai_status = 2``, then for each member:

    * ``name_word`` (only if currently blank): taken from the member's source
      record via ``_match_ai_name_word``; failing that, from
      ``_guess_name_word``. The origin is recorded as ``'ai_content'`` or
      ``'name_guess'``.
    * ``former_name`` / ``childhood_name`` (only if currently blank): taken
      from ``_extract_from_original`` applied to the original genealogy text.

    Nothing is written to the database.

    Returns:
        A list of dicts, one per member with at least one proposed value,
        carrying the id, names, current values, proposed values (``''`` when
        there is none) and ``name_word_source``.
    """
    print(" 正在加载成员数据…")
    with conn.cursor() as cursor:
        cursor.execute("""
            SELECT id, name, simplified_name,
            name_word, former_name, childhood_name,
            genealogy_original_traditional,
            genealogy_original_simplified,
            source_record_id
            FROM family_member_info
        """)
        rows = cursor.fetchall()
        print(" 成员总数:{}".format(len(rows)))

        print(" 正在加载 AI 解析记录…")
        cursor.execute("""
            SELECT id, ai_content
            FROM genealogy_records
            WHERE ai_status = 2 AND ai_content IS NOT NULL
        """)
        ai_by_record = {}
        for rec in cursor.fetchall():
            ai_by_record[rec['id']] = rec['ai_content']
        print(" AI 记录数:{}".format(len(ai_by_record)))

    def cleaned(value):
        """Return the column value stripped, treating NULL as an empty string."""
        return (value or '').strip()

    pending = []
    for row in rows:
        name = cleaned(row['name'])
        simplified = cleaned(row['simplified_name'])
        current_word = cleaned(row['name_word'])
        current_former = cleaned(row['former_name'])
        current_childhood = cleaned(row['childhood_name'])
        text_trad = row['genealogy_original_traditional'] or ''
        text_simp = row['genealogy_original_simplified'] or ''
        record_id = row['source_record_id']

        # Generation character: the AI-parsed record is preferred, the guess
        # from the name is the fallback.
        proposed_word, word_source = None, ''
        if not current_word:
            from_ai = None
            if record_id and record_id in ai_by_record:
                from_ai = _match_ai_name_word(
                    ai_by_record[record_id], name, simplified)
            if from_ai:
                proposed_word, word_source = from_ai, 'ai_content'
            else:
                guessed = _guess_name_word(simplified, name)
                if guessed:
                    proposed_word, word_source = guessed, 'name_guess'

        # Former / childhood name: extracted from the original genealogy text.
        found = _extract_from_original(text_trad, text_simp)
        proposed_former = None
        if not current_former and found.get('former_name'):
            proposed_former = found['former_name']
        proposed_childhood = None
        if not current_childhood and found.get('childhood_name'):
            proposed_childhood = found['childhood_name']

        # Members with nothing to propose are left out of the result.
        if not (proposed_word or proposed_former or proposed_childhood):
            continue
        pending.append({
            'id': row['id'],
            'name': name,
            'simplified_name': simplified,
            'name_word_current': current_word,
            'name_word_new': proposed_word or '',
            'name_word_source': word_source,
            'former_name_current': current_former,
            'former_name_new': proposed_former or '',
            'childhood_name_current': current_childhood,
            'childhood_name_new': proposed_childhood or '',
        })
    return pending
def write_csv(updates, path):
    """Write the proposed updates to *path* as a CSV file and report the path.

    Args:
        updates: Iterable of dicts as produced by ``collect_updates``; each
            dict's keys must be among the columns listed below.
        path: Destination file; it is overwritten if it already exists.
    """
    columns = [
        'id', 'name', 'simplified_name',
        'name_word_current', 'name_word_new', 'name_word_source',
        'former_name_current', 'former_name_new',
        'childhood_name_current', 'childhood_name_new',
    ]
    # utf-8-sig prefixes the file with a BOM (presumably so spreadsheet tools
    # detect UTF-8); newline='' leaves line endings to the csv module.
    with open(path, 'w', newline='', encoding='utf-8-sig') as handle:
        out = csv.DictWriter(handle, fieldnames=columns)
        out.writeheader()
        for row in updates:
            out.writerow(row)
    print(" 已写出预览 CSV:{}".format(path))
def apply_updates(conn, updates):
    """Write the proposed values to ``family_member_info`` and commit.

    For each entry, one UPDATE sets exactly those columns whose ``*_new`` value
    is non-empty; entries without any new value are skipped. The UPDATE itself
    does not re-check that the column is still empty - that guarantee comes
    from how the entries were prepared.

    Args:
        conn: Open database connection.
        updates: Iterable of dicts as produced by ``collect_updates``.

    Returns:
        A dict with the number of values written per column and, under
        ``'total_members'``, the number of rows updated.
    """
    columns = ('name_word', 'former_name', 'childhood_name')
    stats = dict.fromkeys(columns, 0)
    stats['total_members'] = 0

    with conn.cursor() as cursor:
        for entry in updates:
            changed = [col for col in columns if entry[col + '_new']]
            if not changed:
                continue
            for col in changed:
                stats[col] += 1
            assignments = ', '.join("{} = %s".format(col) for col in changed)
            # Values go through driver placeholders; only fixed column names
            # are formatted into the statement.
            params = [entry[col + '_new'] for col in changed]
            params.append(entry['id'])
            cursor.execute(
                "UPDATE family_member_info SET {} WHERE id = %s".format(assignments),
                params
            )
            stats['total_members'] += 1
    conn.commit()
    return stats
def main():
    """Run the tool: print statistics, preview the updates and optionally commit.

    Flow:
      1. Print a banner and the current fill-rate statistics.
      2. With ``--stats``, stop there.
      3. Collect the proposed updates, print a summary and write a preview CSV
         (plus a second CSV holding only the guessed generation characters).
      4. With ``--commit``, write the updates and print the statistics again;
         otherwise explain how to commit.

    The connection is always closed, and the closing banner always printed,
    on the way out.
    """
    banner = "=" * 60
    print(banner)
    if COMMIT_MODE:
        mode_label = '[COMMIT 模式 - 真正写入]'
    else:
        mode_label = '[DRY-RUN 模式 - 仅预览]'
    print(" 族谱字段批量补录工具 {}".format(mode_label))
    print(" 运行时间:{}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    print(banner)

    conn = get_conn()
    try:
        print("\n【字段填写率统计(运行前)】")
        print_stats(conn)
        if STATS_ONLY:
            return

        print("\n【扫描成员数据…】")
        updates = collect_updates(conn)

        def with_source(source):
            """Return the updates whose name_word came from *source*."""
            return [u for u in updates if u['name_word_source'] == source]

        from_ai = with_source('ai_content')
        guessed = with_source('name_guess')
        former_count = sum(1 for u in updates if u['former_name_new'])
        childhood_count = sum(1 for u in updates if u['childhood_name_new'])

        summary = (
            ("\n 字辈(来自 ai_content) : {:4d} 条 ← 高可信度", len(from_ai)),
            (" 字辈(来自姓名推算) : {:4d} 条 ← 建议人工抽检", len(guessed)),
            (" 曾用名(来自族谱原文) : {:4d} 条", former_count),
            (" 幼名/乳名(来自族谱原文): {:4d} 条", childhood_count),
            (" 涉及成员总计 : {:4d} 名", len(updates)),
        )
        for template, count in summary:
            print(template.format(count))

        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        # Full preview of every proposed change.
        write_csv(updates, "fill_preview_{}.csv".format(stamp))
        # Guessed generation characters get their own file for manual review.
        if guessed:
            guess_csv = "fill_preview_guess_nameword_{}.csv".format(stamp)
            write_csv(guessed, guess_csv)
            print(" ⚠ 推算字辈单独导出:{}(请优先人工核查)".format(guess_csv))

        if not COMMIT_MODE:
            print("\n DRY-RUN 完成,未写入数据库。")
            print(" 确认 CSV 内容无误后,执行真正写入:")
            print(" python3 fill_missing_fields.py --commit")
        else:
            print("\n【开始写入数据库…】")
            stats = apply_updates(conn, updates)
            report = (
                (" 字辈写入 : {} 条", 'name_word'),
                (" 曾用名写入 : {} 条", 'former_name'),
                (" 幼名/乳名写入 : {} 条", 'childhood_name'),
                (" 影响成员 : {} 名", 'total_members'),
            )
            for template, key in report:
                print(template.format(stats[key]))
            print("\n【字段填写率统计(运行后)】")
            print_stats(conn)
    finally:
        conn.close()
        print("\n" + banner)
# Run the tool only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()
|