import os import pymysql import requests import json import re import threading import urllib3 import fitz # PyMuPDF from flask import Flask, render_template, request, redirect, url_for, session, flash, jsonify, Response, stream_with_context from werkzeug.utils import secure_filename from oss_utils import upload_to_oss from ocr_utils import extract_page_number import time from datetime import datetime # Suppress InsecureRequestWarning urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) app = Flask(__name__, static_folder='static', static_url_path='/manager/static') app.secret_key = 'genealogy_secret_key' app.config['UPLOAD_FOLDER'] = 'uploads' os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True) # 数据库配置 DB_CONFIG = { "host": "rm-f8ze60yirdj8786u2wo.mysql.rds.aliyuncs.com", "port": 3306, "user": "root", "password": "csqz@20255", "db": "csqz-client", "charset": "utf8mb4", "cursorclass": pymysql.cursors.DictCursor } from PIL import Image def compress_image_if_needed(file_path, max_dim=2000): """Compress, resize and normalize image to JPEG for AI processing.""" try: # We always want to normalize to JPEG so AI doesn't complain about format with Image.open(file_path) as img: # Convert RGBA/P or any other mode to RGB for JPEG saving if img.mode != 'RGB': img = img.convert('RGB') width, height = img.size if max(width, height) > max_dim: ratio = max_dim / max(width, height) new_size = (int(width * ratio), int(height * ratio)) img = img.resize(new_size, Image.Resampling.LANCZOS) # Always save as JPEG to normalize the format new_path = os.path.splitext(file_path)[0] + '_normalized.jpg' img.save(new_path, 'JPEG', quality=85) return new_path except Exception as e: print(f"Warning: Image compression/normalization failed for {file_path}: {e}") return file_path # 尝试使用数据库连接池,如果不可用则使用普通连接 try: from DBUtils.PooledDB import PooledDB # 创建连接池 pool = PooledDB( creator=pymysql, maxconnections=10, # 连接池最大连接数 mincached=2, # 初始化时创建的空闲连接数 maxcached=5, # 最大空闲连接数 maxshared=3, # 最大共享连接数 blocking=True, # 连接池满时是否阻塞等待 maxusage=1000, # 一个连接最多被重复使用的次数,防止连接长时间使用失效 setsession=[], # 开始会话前执行的命令列表 ping=1, # 每次获取连接时都检查连接是否可用 **DB_CONFIG ) def get_db_connection(): conn = pool.connection() print(f"[Database] Got connection from pool: {id(conn)}") return conn print("[Database] Database connection pool initialized successfully") except ImportError: # 如果DBUtils不可用,使用普通连接 def get_db_connection(): conn = pymysql.connect(**DB_CONFIG) print(f"[Database] Created new connection: {id(conn)}") return conn print("[Database] DBUtils not available, using regular database connections") def verify_connection(conn): """Verify database connection is still alive""" try: cursor = conn.cursor() cursor.execute("SELECT 1") cursor.fetchone() cursor.close() return True except Exception as e: print(f"[Database] Connection verification failed: {e}") return False def safe_commit(conn): """Safely commit transaction with error handling""" try: conn.commit() print(f"[Database] Transaction committed successfully") return True except Exception as e: print(f"[Database] Commit failed: {e}") try: conn.rollback() print(f"[Database] Rollback completed") except Exception as rollback_err: print(f"[Database] Rollback also failed: {rollback_err}") return False def format_timestamp(ts): if not ts: return '未知' try: # 兼容秒和毫秒 if ts > 10000000000: # 超过2286年的秒数,通常认为是毫秒 ts = ts / 1000 return time.strftime('%Y-%m-%d', time.localtime(ts)) except: return '未知' def manual_simplify(text): """ Simple fallback for common Traditional to Simplified conversion if AI fails to convert specific characters. """ if not text: return text mapping = { '學': '学', '國': '国', '萬': '万', '寶': '宝', '興': '兴', '華': '华', '會': '会', '葉': '叶', '藝': '艺', '號': '号', '處': '处', '見': '见', '視': '视', '言': '言', '語': '语', '貝': '贝', '車': '车', '長': '长', '門': '门', '韋': '韦', '頁': '页', '風': '风', '飛': '飞', '食': '食', '馬': '马', '魚': '鱼', '鳥': '鸟', '麥': '麦', '黃': '黄', '齊': '齐', '齒': '齿', '龍': '龙', '龜': '龟', '壽': '寿', '榮': '荣', '愛': '爱', '慶': '庆', '衛': '卫', '賢': '贤', '義': '义', '禮': '礼', '樂': '乐', '靈': '灵', '滅': '灭', '氣': '气', '智': '智', '信': '信', '仁': '仁', '勇': '勇', '嚴': '严', '銳': '锐', '優': '优', '楊': '杨', '吳': '吴', '銀': '银' } result = "" for char in text: result += mapping.get(char, char) return result def convert_to_simplified(text): """繁体转简体,优先使用 zhconv 库,失败则降级到 manual_simplify""" if not text: return text try: import zhconv return zhconv.convert(text, 'zh-hans') except Exception: return manual_simplify(text) def _build_reverse_simplify_map(): """ Build a reverse map from simplified char -> list of traditional chars based on the fallback manual_simplify mapping. """ mapping = { '學': '学', '國': '国', '萬': '万', '寶': '宝', '興': '兴', '華': '华', '會': '会', '葉': '叶', '藝': '艺', '號': '号', '處': '处', '見': '见', '視': '视', '言': '言', '語': '语', '貝': '贝', '車': '车', '長': '长', '門': '门', '韋': '韦', '頁': '页', '風': '风', '飛': '飞', '食': '食', '馬': '马', '魚': '鱼', '鳥': '鸟', '麥': '麦', '黃': '黄', '齊': '齐', '齒': '齿', '龍': '龙', '龜': '龟', '壽': '寿', '榮': '荣', '愛': '爱', '慶': '庆', '衛': '卫', '賢': '贤', '義': '义', '禮': '礼', '樂': '乐', '靈': '灵', '滅': '灭', '氣': '气', '智': '智', '信': '信', '仁': '仁', '勇': '勇', '嚴': '严', '銳': '锐', '優': '优', '楊': '杨', '吳': '吴', '銀': '银' } rev = {} for trad, simp in mapping.items(): rev.setdefault(simp, []) if trad not in rev[simp]: rev[simp].append(trad) return rev _REVERSE_SIMPLIFY_MAP = _build_reverse_simplify_map() def expand_name_search_variants(keyword, max_variants=60): """ Expand keyword into a small set of variants so Simplified/Traditional searches can match both `name` and `simplified_name`. - Always includes original keyword - Includes fallback-trad->simp conversion - Includes best-effort simp->trad expansions based on reverse map """ if not keyword: return [] kw = str(keyword).strip() if not kw: return [] variants = set([kw]) variants.add(manual_simplify(kw)) # Build possible traditional variants when the input is simplified. # For each char, if we have traditional candidates, branch; otherwise keep itself. choices = [] for ch in kw: cand = _REVERSE_SIMPLIFY_MAP.get(ch) if cand: # include itself too (covers already-traditional or neutral chars) choices.append([ch] + cand) else: choices.append([ch]) # Cartesian product with early stop. results = [''] for opts in choices: new_results = [] for prefix in results: for opt in opts: new_results.append(prefix + opt) if len(new_results) >= max_variants: break if len(new_results) >= max_variants: break results = new_results if len(results) >= max_variants: break for r in results: if r: variants.add(r) variants.add(manual_simplify(r)) # Keep deterministic order for stable SQL params ordered = [] for v in variants: v2 = (v or '').strip() if v2 and v2 not in ordered: ordered.append(v2) if len(ordered) >= max_variants: break return ordered def clean_name(name): """ Clean name according to Liu family genealogy rules: 1. If name is '学公' or '留学公', keep 'Gong' (exception). 2. Otherwise, if name ends with '公', remove '公'. 3. If name does not start with '留', prepend '留'. """ if not name: return name name = name.strip() # Pre-process: Ensure Simplified Chinese for specific chars name = manual_simplify(name) # 1. Check exceptions (names that SHOULD keep 'Gong') exceptions = ['学公', '留学公'] if name in exceptions: if not name.startswith('留'): name = '留' + name return name # 2. General Rule: Remove 'Gong' suffix if name.endswith('公'): name = name[:-1] # 3. Ensure 'Liu' surname if not name.startswith('留'): name = '留' + name return name def is_female_value(sex_value): """Return True when sex value represents female.""" if sex_value is None: return False s = str(sex_value).strip().lower() return s in ('女', '2', 'female', 'f') def normalize_lookup_name(name): """Normalize names for loose matching in AI parsed content.""" if not name: return '' return manual_simplify(str(name)).strip() def should_skip_liu_prefix_for_person(person, spouse_name_set): """ Female spouse records should not auto-prepend '留' in simplified_name. We treat a person as female spouse if: 1) sex is female, and 2) has spouse_name field OR appears in another person's spouse_name list. """ if not isinstance(person, dict): return False if not is_female_value(person.get('sex')): return False own_names = set() own_names.add(normalize_lookup_name(person.get('name'))) own_names.add(normalize_lookup_name(person.get('original_name'))) own_names.discard('') has_spouse_name = bool(normalize_lookup_name(person.get('spouse_name'))) referenced_by_other = any(n in spouse_name_set for n in own_names) return has_spouse_name or referenced_by_other def get_normalized_base64_image(image_url): """Download image, normalize to JPEG, and return base64 data URI for AI payload.""" import io import base64 import requests from PIL import Image try: response = requests.get(image_url, timeout=30) response.raise_for_status() with Image.open(io.BytesIO(response.content)) as img: # Convert to RGB to ensure JPEG compatibility if img.mode != 'RGB': img = img.convert('RGB') # Resize if too large max_dim = 2000 if max(img.width, img.height) > max_dim: ratio = max_dim / max(img.width, img.height) new_size = (int(img.width * ratio), int(img.height * ratio)) img = img.resize(new_size, Image.Resampling.LANCZOS) # Save as JPEG in memory buffer = io.BytesIO() img.save(buffer, format='JPEG', quality=85) b64_str = base64.b64encode(buffer.getvalue()).decode('utf-8') return f"data:image/jpeg;base64,{b64_str}" except Exception as e: print(f"Error normalizing image from {image_url}: {e}") return image_url # Fallback to original URL if processing fails def process_ai_task(record_id, image_url): """Background task to process image with AI and store result.""" print(f"[AI Task] Starting task for record {record_id}...") conn = get_db_connection() try: with conn.cursor() as cursor: cursor.execute("UPDATE genealogy_records SET ai_status = 1 WHERE id = %s", (record_id,)) conn.commit() print(f"[AI Task] Status updated to 'Processing' for record {record_id}") api_key = "a1800657-9212-4afe-9b7c-b49f015c54d3" api_url = "https://ark.cn-beijing.volces.com/api/v3/responses" prompt = """ 请分析这张家谱图片,提取其中关于人物的信息。 请务必将繁体字转换为简体字(original_name 字段除外)。 特别注意:'name' 字段必须是纯简体中文,不能包含繁体字(例如:'學'应转换为'学','劉'应转换为'刘','萬'应转换为'万')。 请提取以下字段(如果存在): - original_name: 原始姓名(严格保持图片上的繁体字,不做任何修改或转换) - name: 简体姓名(必须转换为简体中文,去除不需要的敬称) - sex: 性别(男/女) - birthday: 出生日期(尝试转换为YYYY-MM-DD格式,如果无法确定年份可只填月日) - death_date: 逝世日期(如文本中出现“殁”、“葬”、“卒”等字眼及其对应的时间,请提取) - father_name: 父亲姓名 - spouse_name: 配偶姓名 - generation: 第几世/代数 - name_word: 字辈(例如名字为“学勤公”,“学”为字辈;提取名字中的字辈信息) - education: 学历/功名 - title: 官职/称号 请严格以JSON列表格式返回,不要包含Markdown代码块标记(如 ```json ... ```),直接返回JSON数组。 如果包含多个人物,请都提取出来。 Do not output any reasoning or explanation, just the JSON. """ ai_payload_url = get_normalized_base64_image(image_url) payload = { "model": "doubao-seed-1-8-251228", "stream": True, # Streaming for robust handling "input": [ { "role": "user", "content": [ {"type": "input_image", "image_url": ai_payload_url}, {"type": "input_text", "text": prompt} ] } ] } headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" } max_retries = 3 last_exception = None for attempt in range(max_retries): try: print(f"[AI Task] Attempt {attempt+1}/{max_retries} connecting to API for record {record_id}...") response = requests.post( api_url, json=payload, headers=headers, timeout=1200, stream=True, verify=False, proxies={"http": None, "https": None} ) if response.status_code == 200: print(f"[AI Task] Connection established for record {record_id}, receiving stream...") full_content = "" for line in response.iter_lines(): if not line: continue line_str = line.decode('utf-8') # Debug: Print full line to understand event flow print(f"[AI Task Debug] Raw Line: {line_str[:500]}") # Truncate very long lines if line_str.startswith('data: '): json_str = line_str[6:] if json_str.strip() == '[DONE]': print("[AI Task Debug] Received [DONE]") break try: chunk = json.loads(json_str) chunk_type = chunk.get('type') # Standard OpenAI format (choices) if 'choices' in chunk and len(chunk['choices']) > 0: delta = chunk['choices'][0].get('delta', {}) if 'content' in delta: full_content += delta['content'] # Doubao/Volcengine specific formats (delta) elif chunk_type == 'response.text.delta': full_content += chunk.get('delta', '') # Check response.completed if empty elif chunk_type == 'response.completed' and not full_content: output = chunk.get('response', {}).get('output', []) for item in output: # Also extract from reasoning if it contains JSON-like text if item.get('type') == 'reasoning': summary = item.get('summary', []) for sum_item in summary: if sum_item.get('type') == 'summary_text': full_content += sum_item.get('text', '') elif item.get('type') == 'message': content = item.get('content') if isinstance(content, str): full_content += content elif isinstance(content, list): for part in content: if isinstance(part, dict) and part.get('type') == 'text': full_content += part.get('text', '') # Fallback: output_item.added elif chunk_type == 'response.output_item.added': item = chunk.get('item', {}) if item.get('role') == 'assistant': content_field = item.get('content', []) if isinstance(content_field, str): full_content += content_field elif isinstance(content_field, list): for part in content_field: if isinstance(part, dict) and part.get('type') == 'text': full_content += part.get('text', '') except Exception as e: print(f"[AI Task] Chunk parse error: {e}") else: # Fallback for non-SSE try: chunk = json.loads(line_str) if 'choices' in chunk and len(chunk['choices']) > 0: content = chunk['choices'][0]['message']['content'] full_content += content except: pass print(f"[AI Task] Stream finished. Content length: {len(full_content)}") if len(full_content) == 0: print(f"[AI Task] WARNING: No content received from AI stream.") # Continue to JSON parse to fail gracefully # Clean JSON try: # 1. Try finding [...] array start = full_content.find('[') end = full_content.rfind(']') # 2. If not found, try finding {...} object and wrap it is_single_object = False if start == -1 or end == -1 or end <= start: start = full_content.find('{') end = full_content.rfind('}') is_single_object = True if start != -1 and end != -1 and end > start: content_clean = full_content[start:end+1] else: # Fallback to regex or raw content_clean = re.sub(r'^```json\s*', '', full_content) content_clean = re.sub(r'```$', '', content_clean) parsed = json.loads(content_clean) # Normalize single object to list if is_single_object and isinstance(parsed, dict): parsed = [parsed] content_clean = json.dumps(parsed, ensure_ascii=False) elif isinstance(parsed, dict) and not isinstance(parsed, list): # Just in case json.loads parsed a dict even if we looked for [] parsed = [parsed] content_clean = json.dumps(parsed, ensure_ascii=False) # Build spouse name lookup for "female spouse" detection spouse_name_set = set() if isinstance(parsed, list): for person in parsed: n = normalize_lookup_name(person.get('spouse_name')) if n: spouse_name_set.add(n) # Clean names in parsed content if isinstance(parsed, list): for person in parsed: # Process Name: 'name' is Simplified from AI, 'original_name' is Traditional/Raw from AI simplified_name = person.get('name', '') or person.get('original_name', '') original_name = person.get('original_name', '') # Female spouse: only simplify Chinese, do NOT prepend '留' if should_skip_liu_prefix_for_person(person, spouse_name_set): cleaned_simplified = manual_simplify(simplified_name) else: # Same-clan default: prepend '留' and handle trailing '公' cleaned_simplified = clean_name(simplified_name) person['simplified_name'] = cleaned_simplified # Store raw name in 'name' field (as requested) if original_name: person['name'] = original_name else: # Fallback: if no original_name returned, use the uncleaned name as 'name' # or keep existing logic. But user wants raw in 'name'. # If AI didn't return original_name, 'name' is likely simplified. pass # Keep 'name' as is (which is Simplified) if original_name missing # Father name:同族,需要按“留”姓规则清洗 if 'father_name' in person and person['father_name']: person['father_name'] = clean_name(person['father_name']) # Spouse name:只做繁转简,不拼接“留”姓,也不去“公” if 'spouse_name' in person and person['spouse_name']: person['spouse_name'] = manual_simplify(person['spouse_name']) # Re-serialize content_clean = json.dumps(parsed, ensure_ascii=False) with conn.cursor() as cursor: cursor.execute("UPDATE genealogy_records SET ai_status = 2, ai_content = %s WHERE id = %s", (content_clean, record_id)) conn.commit() print(f"[AI Task] SUCCESS: Record {record_id} processed and saved.") return # Success except json.JSONDecodeError as err: raise Exception(f"JSON Parse Error: {str(err)}. Raw: {full_content}") else: raise Exception(f"API Error {response.status_code}: {response.text}") except Exception as e: print(f"[AI Task] Attempt {attempt+1} failed for record {record_id}: {e}") last_exception = e if attempt < max_retries - 1: wait_time = 2 * (attempt + 1) print(f"[AI Task] Waiting {wait_time}s before retry...") time.sleep(wait_time) raise last_exception or Exception("Unknown error") except Exception as e: print(f"[AI Task] FINAL FAILURE for record {record_id}: {e}") try: with conn.cursor() as cursor: cursor.execute("UPDATE genealogy_records SET ai_status = 3, ai_content = %s WHERE id = %s", (f"Max Retries Exceeded. Error: {str(e)}", record_id)) conn.commit() except: pass finally: conn.close() print(f"[AI Task] Task finished for record {record_id}") def ensure_pdf_table(): conn = get_db_connection() try: with conn.cursor() as cursor: cursor.execute(""" CREATE TABLE IF NOT EXISTS genealogy_pdfs ( id INT AUTO_INCREMENT PRIMARY KEY, file_name VARCHAR(255) NOT NULL, oss_url TEXT NOT NULL, description VARCHAR(500) DEFAULT '', upload_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, uploader VARCHAR(100) DEFAULT '', version_name VARCHAR(255) DEFAULT '', version_source VARCHAR(255) DEFAULT '', file_provider VARCHAR(100) DEFAULT '', parse_status INT DEFAULT 0 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 """) # 检查是否存在parse_status字段,如果不存在则添加 cursor.execute("SHOW COLUMNS FROM genealogy_pdfs LIKE 'parse_status'") if not cursor.fetchone(): cursor.execute("ALTER TABLE genealogy_pdfs ADD COLUMN parse_status INT DEFAULT 0") # 检查是否存在version_name字段,如果不存在则添加 cursor.execute("SHOW COLUMNS FROM genealogy_pdfs LIKE 'version_name'") if not cursor.fetchone(): cursor.execute("ALTER TABLE genealogy_pdfs ADD COLUMN version_name VARCHAR(255) DEFAULT ''") # 检查是否存在version_source字段,如果不存在则添加 cursor.execute("SHOW COLUMNS FROM genealogy_pdfs LIKE 'version_source'") if not cursor.fetchone(): cursor.execute("ALTER TABLE genealogy_pdfs ADD COLUMN version_source VARCHAR(255) DEFAULT ''") # 检查是否存在file_provider字段,如果不存在则添加 cursor.execute("SHOW COLUMNS FROM genealogy_pdfs LIKE 'file_provider'") if not cursor.fetchone(): cursor.execute("ALTER TABLE genealogy_pdfs ADD COLUMN file_provider VARCHAR(100) DEFAULT ''") conn.commit() finally: conn.close() @app.route('/manager/pdf_management') def pdf_management(): if 'user_id' not in session: return redirect(url_for('login')) username = session.get('username', 'unknown') is_super_admin = session.get('is_super_admin', 'NOT_SET') print(f"[PDF Management Access] User: {username}, is_super_admin: {is_super_admin}") # Verify is_super_admin against database - always check latest status conn = get_db_connection() try: with conn.cursor() as cursor: cursor.execute("SELECT is_super_admin FROM users WHERE id = %s", (session['user_id'],)) db_result = cursor.fetchone() db_is_super = db_result['is_super_admin'] if db_result else 0 print(f"[PDF Management Access] DB is_super_admin: {db_is_super}") if not db_is_super: print(f"[PDF Management Access] Denied for {username} (DB check)") flash('无权限访问此页面') return redirect(url_for('home')) finally: conn.close() print(f"[PDF Management Access] Allowed for {username}") ensure_pdf_table() view_id = request.args.get('view', type=int) preview = request.args.get('preview', type=bool, default=False) selected_pdf = None conn = get_db_connection() try: with conn.cursor() as cursor: cursor.execute("SELECT * FROM genealogy_pdfs ORDER BY upload_time DESC") pdfs = cursor.fetchall() if view_id and preview: cursor.execute("SELECT * FROM genealogy_pdfs WHERE id = %s", (view_id,)) selected_pdf = cursor.fetchone() finally: conn.close() return render_template('pdf_management.html', pdfs=pdfs, selected_pdf=selected_pdf) @app.route('/manager/parse_pdf/', methods=['POST']) def parse_pdf(pdf_id): if 'user_id' not in session: return jsonify({"success": False, "message": "Unauthorized"}), 401 # 标记PDF为解析中 conn = get_db_connection() try: with conn.cursor() as cursor: cursor.execute("UPDATE genealogy_pdfs SET parse_status = 1 WHERE id = %s", (pdf_id,)) conn.commit() finally: conn.close() # 异步执行PDF解析 def parse_pdf_async(): try: # 获取PDF信息 conn = get_db_connection() pdf_info = None try: with conn.cursor() as cursor: cursor.execute("SELECT * FROM genealogy_pdfs WHERE id = %s", (pdf_id,)) pdf_info = cursor.fetchone() finally: conn.close() if not pdf_info: return # 下载PDF并拆分 pdf_url = pdf_info['oss_url'] response = requests.get(pdf_url) response.raise_for_status() # 保存临时PDF文件 temp_pdf_path = f"/tmp/{pdf_info['file_name']}" with open(temp_pdf_path, 'wb') as f: f.write(response.content) # 使用PyMuPDF拆分PDF doc = fitz.open(temp_pdf_path) page_count = doc.page_count # 每个PDF的页码从1开始计算 max_page = 0 # 逐页处理 for i in range(page_count): page = doc[i] pix = page.get_pixmap() image_path = f"/tmp/{pdf_info['file_name']}_page_{i+1}.png" pix.save(image_path) # 上传图片到OSS image_oss_url = upload_to_oss(image_path, f"{pdf_info['file_name']}_page_{i+1}.png") # 检查上传是否成功 if not image_oss_url: raise Exception(f"Failed to upload image to OSS: {image_path}") # 保存到genealogy_records表 conn = get_db_connection() try: with conn.cursor() as cursor: cursor.execute(""" INSERT INTO genealogy_records (file_name, oss_url, file_type, page_number, genealogy_version, genealogy_source, upload_person, upload_time) VALUES (%s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP) """, ( f"{pdf_info['file_name']}_page_{i+1}.png", image_oss_url, '图片', max_page + i + 1, pdf_info['version_name'], pdf_info['version_source'], pdf_info['file_provider'] )) conn.commit() finally: conn.close() # 删除临时图片文件 if os.path.exists(image_path): os.remove(image_path) # 删除临时PDF文件 if os.path.exists(temp_pdf_path): os.remove(temp_pdf_path) # 更新PDF解析状态为成功 conn = get_db_connection() try: with conn.cursor() as cursor: cursor.execute("UPDATE genealogy_pdfs SET parse_status = 2 WHERE id = %s", (pdf_id,)) conn.commit() finally: conn.close() except Exception as e: # 更新PDF解析状态为失败 conn = get_db_connection() try: with conn.cursor() as cursor: cursor.execute("UPDATE genealogy_pdfs SET parse_status = 3 WHERE id = %s", (pdf_id,)) conn.commit() finally: conn.close() print(f"PDF解析失败: {e}") # 启动异步任务 thread = threading.Thread(target=parse_pdf_async) thread.daemon = True thread.start() return jsonify({"success": True, "message": "PDF解析已开始,将在后台执行"}) @app.route('/manager/batch_ai_parse', methods=['GET']) def batch_ai_parse(): """Batch AI parse for unprocessed records.""" if 'user_id' not in session: return jsonify({"success": False, "message": "Unauthorized"}), 401 # Start background thread thread = threading.Thread(target=batch_ai_parse_async) thread.daemon = True thread.start() return jsonify({"success": True, "message": "批量AI解析已开始,请稍候查看结果"}) def batch_ai_parse_async(): """Background task to batch AI parse unprocessed records.""" timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') print(f"[{timestamp}] [Batch AI Parse] Starting batch AI parse task...") # Get unprocessed records (ai_status = 0) conn = None unprocessed_records = [] try: conn = get_db_connection() with conn.cursor() as cursor: cursor.execute("SELECT id, oss_url FROM genealogy_records WHERE ai_status = 0 order by page_number") unprocessed_records = cursor.fetchall() conn.close() conn = None total_records = len(unprocessed_records) timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') print(f"[{timestamp}] [Batch AI Parse] Found {total_records} unprocessed records") if total_records == 0: timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') print(f"[{timestamp}] [Batch AI Parse] No unprocessed records found") return # Control concurrency to 5 max_concurrency = 5 semaphore = threading.Semaphore(max_concurrency) threads = [] def process_record(record): """Process a single record with semaphore.""" with semaphore: try: record_id = record['id'] image_url = record['oss_url'] timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') print(f"[{timestamp}] [Batch AI Parse] Processing record {record_id}") process_ai_task(record_id, image_url) except Exception as e: timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') print(f"[{timestamp}] [Batch AI Parse] Error processing record {record['id']}: {e}") # If failed, we'll handle it in the next batch # Start threads for each record for record in unprocessed_records: thread = threading.Thread(target=process_record, args=(record,)) thread.daemon = True thread.start() threads.append(thread) # Wait for all threads to complete for thread in threads: thread.join() timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') print(f"[{timestamp}] [Batch AI Parse] Batch processing completed. Processed {total_records} records") # Check for failed records and restart them check_failed_records() except Exception as e: timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') print(f"[{timestamp}] [Batch AI Parse] Error: {e}") finally: if conn: try: conn.close() except: pass def check_failed_records(): """Check for failed records and restart them.""" timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') print(f"[{timestamp}] [Batch AI Parse] Checking for failed records...") conn = None failed_records = [] try: conn = get_db_connection() with conn.cursor() as cursor: cursor.execute("SELECT id, oss_url FROM genealogy_records WHERE ai_status = 3") failed_records = cursor.fetchall() conn.close() conn = None total_failed = len(failed_records) timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') print(f"[{timestamp}] [Batch AI Parse] Found {total_failed} failed records") if total_failed == 0: timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') print(f"[{timestamp}] [Batch AI Parse] No failed records found") return # Control concurrency to 5 for failed records max_concurrency = 5 semaphore = threading.Semaphore(max_concurrency) threads = [] def process_failed_record(record): """Process a failed record with semaphore.""" with semaphore: retry_conn = None try: record_id = record['id'] image_url = record['oss_url'] timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') print(f"[{timestamp}] [Batch AI Parse] Retrying failed record {record_id}") # Reset status to processing retry_conn = get_db_connection() with retry_conn.cursor() as cursor: cursor.execute("UPDATE genealogy_records SET ai_status = 1 WHERE id = %s", (record_id,)) retry_conn.commit() retry_conn.close() retry_conn = None process_ai_task(record_id, image_url) except Exception as e: timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') print(f"[{timestamp}] [Batch AI Parse] Error retrying record {record['id']}: {e}") finally: if retry_conn: try: retry_conn.close() except: pass # Start threads for each failed record for record in failed_records: thread = threading.Thread(target=process_failed_record, args=(record,)) thread.daemon = True thread.start() threads.append(thread) # Wait for all threads to complete for thread in threads: thread.join() timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') print(f"[{timestamp}] [Batch AI Parse] Retry processing completed. Retried {total_failed} failed records") except Exception as e: timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') print(f"[{timestamp}] [Batch AI Parse] Error checking failed records: {e}") finally: if conn: try: conn.close() except: pass @app.route('/manager/delete_pdf/', methods=['POST']) def delete_pdf(pdf_id): if 'user_id' not in session: return jsonify({"success": False, "message": "Unauthorized"}), 401 conn = get_db_connection() try: with conn.cursor() as cursor: cursor.execute("DELETE FROM genealogy_pdfs WHERE id = %s", (pdf_id,)) conn.commit() flash('PDF文件记录已删除') except Exception as e: flash(f'删除失败: {e}') finally: conn.close() return redirect(url_for('pdf_management')) @app.route('/manager/') def index(): if 'user_id' not in session: return redirect(url_for('login')) page = request.args.get('page', 1, type=int) version = request.args.get('version', '').strip() print(f"Received version parameter: '{version}'") source = request.args.get('source', '').strip() person = request.args.get('person', '').strip() file_type = request.args.get('file_type', '').strip() per_page = 10 offset = (page - 1) * per_page conn = get_db_connection() try: with conn.cursor() as cursor: query_conditions = [] params = [] if version: query_conditions.append("genealogy_version LIKE %s") params.append(f"%{version}%") if source: query_conditions.append("genealogy_source LIKE %s") params.append(f"%{source}%") if person: query_conditions.append("upload_person LIKE %s") params.append(f"%{person}%") if file_type: query_conditions.append("file_type = %s") params.append(file_type) where_clause = "" if query_conditions: where_clause = "WHERE " + " AND ".join(query_conditions) count_sql = f"SELECT COUNT(*) as count FROM genealogy_records {where_clause}" cursor.execute(count_sql, params) total = cursor.fetchone()['count'] sql = f"SELECT * FROM genealogy_records {where_clause} ORDER BY page_number ASC LIMIT %s OFFSET %s" cursor.execute(sql, params + [per_page, offset]) records = cursor.fetchall() total_pages = (total + per_page - 1) // per_page finally: conn.close() return render_template('index.html', records=records, page=page, total_pages=total_pages, version=version, source=source, person=person, file_type=file_type, total=total) @app.route('/manager/members') def members(): if 'user_id' not in session: return redirect(url_for('login')) search_name = request.args.get('name', '').strip() page = request.args.get('page', 1, type=int) per_page = 10 offset = (page - 1) * per_page print(f"[Members List] Fetching members page: {page}, search: '{search_name}', per_page: {per_page}") conn = get_db_connection() try: with conn.cursor() as cursor: # 1. Get total count if search_name: variants = expand_name_search_variants(search_name) where_parts = [] params = [] for v in variants: where_parts.append("(name LIKE %s OR simplified_name LIKE %s)") like = f"%{v}%" params.extend([like, like]) where_clause = " OR ".join(where_parts) if where_parts else "name LIKE %s" if not where_parts: params = [f"%{search_name}%"] count_sql = f"SELECT COUNT(*) as count FROM family_member_info WHERE {where_clause}" print(f"[Members List] Executing count SQL: {count_sql}") print(f"[Members List] Count SQL parameters: {params}") cursor.execute(count_sql, tuple(params)) else: count_sql = "SELECT COUNT(*) as count FROM family_member_info" print(f"[Members List] Executing count SQL: {count_sql}") cursor.execute(count_sql) result = cursor.fetchone() total = result['count'] if result else 0 total_pages = (total + per_page - 1) // per_page print(f"[Members List] Total members: {total}, total pages: {total_pages}") # 2. Get paginated results, ordered by modified_time DESC (or create_time if modified is null/same) # Using COALESCE to ensure sort works even if modified_time is NULL order_clause = "ORDER BY COALESCE(modified_time, create_time) DESC" if search_name: variants = expand_name_search_variants(search_name) where_parts = [] params = [] for v in variants: where_parts.append("(name LIKE %s OR simplified_name LIKE %s)") like = f"%{v}%" params.extend([like, like]) where_clause = " OR ".join(where_parts) if where_parts else "(name LIKE %s OR simplified_name LIKE %s)" if not where_parts: like = f"%{search_name}%" params = [like, like] sql = f"SELECT id, name, simplified_name, sex, name_word_generation, birthday, occupation, family_rank, branch_family_hall, residential_address, is_pass_away, create_time, modified_time FROM family_member_info WHERE {where_clause} {order_clause} LIMIT %s OFFSET %s" print(f"[Members List] Executing members SQL: {sql}") print(f"[Members List] Members SQL parameters: {params + [per_page, offset]}") cursor.execute(sql, tuple(params + [per_page, offset])) else: sql = f"SELECT id, name, simplified_name, sex, name_word_generation, birthday, occupation, family_rank, branch_family_hall, residential_address, is_pass_away, create_time, modified_time FROM family_member_info {order_clause} LIMIT %s OFFSET %s" print(f"[Members List] Executing members SQL: {sql}") print(f"[Members List] Members SQL parameters: {[per_page, offset]}") cursor.execute(sql, (per_page, offset)) members = cursor.fetchall() print(f"[Members List] Fetched {len(members)} members") # 格式化日期 for m in members: m['birthday_str'] = format_timestamp(m.get('birthday')) # 格式化创建时间 (针对 TIMESTAMP 字段) if m.get('create_time'): m['create_time_str'] = m['create_time'].strftime('%Y-%m-%d') if m.get('modified_time'): m['modified_time_str'] = m['modified_time'].strftime('%Y-%m-%d %H:%M') finally: print(f"[Members List] Closing database connection") conn.close() return render_template('members.html', members=members, search_name=search_name, page=page, total_pages=total_pages, total=total) @app.route('/manager/batch_genealogy') def batch_genealogy(): if 'user_id' not in session: return redirect(url_for('login')) return render_template('batch_genealogy.html') @app.route('/manager/suspected_errors') def suspected_errors(): if 'user_id' not in session: return redirect(url_for('login')) search_name = request.args.get('name', '').strip() page = request.args.get('page', 1, type=int) per_page = 20 offset = (page - 1) * per_page conn = get_db_connection() try: with conn.cursor() as cursor: # Base query with condition for non-empty suspected_error (using TRIM to remove whitespace) base_query = "SELECT id, name, simplified_name, sex, name_word_generation, birthday, suspected_error FROM family_member_info WHERE suspected_error IS NOT NULL AND TRIM(suspected_error) != ''" count_query = "SELECT COUNT(*) as count FROM family_member_info WHERE suspected_error IS NOT NULL AND TRIM(suspected_error) != ''" # Add search condition if provided params = [] if search_name: # Support both traditional and simplified name search base_query += " AND (name LIKE %s OR simplified_name LIKE %s)" count_query += " AND (name LIKE %s OR simplified_name LIKE %s)" search_param = f"%{search_name}%" params.extend([search_param, search_param]) # Get total count cursor.execute(count_query, params) result = cursor.fetchone() total = result['count'] if result else 0 total_pages = (total + per_page - 1) // per_page # Get members with pagination base_query += " ORDER BY name LIMIT %s OFFSET %s" params.extend([per_page, offset]) cursor.execute(base_query, params) members = cursor.fetchall() # Format birthday for display for member in members: if member['birthday']: member['birthday_str'] = format_timestamp(member['birthday']) else: member['birthday_str'] = '未知' finally: conn.close() return render_template('suspected_errors.html', members=members, search_name=search_name, page=page, total_pages=total_pages, total=total) @app.route('/manager/tree') def tree(): if 'user_id' not in session: return redirect(url_for('login')) return render_template('tree.html') @app.route('/manager/lineage_query') def lineage_query(): if 'user_id' not in session: return redirect(url_for('login')) return render_template('lineage_query.html') @app.route('/manager/tree_classic') def tree_classic(): if 'user_id' not in session: return redirect(url_for('login')) return render_template('tree_classic.html') @app.route('/manager/api/tree_data') def tree_data(): if 'user_id' not in session: return jsonify({"error": "Unauthorized"}), 401 conn = get_db_connection() try: with conn.cursor() as cursor: # 获取所有成员 cursor.execute("SELECT id, name, simplified_name, sex, family_rank, name_word_generation FROM family_member_info") members = cursor.fetchall() # 获取所有关系 (1:父子 2:母子 10:夫妻 11:兄弟 12:姐妹),包括子类型 cursor.execute("SELECT parent_mid, child_mid, relation_type, sub_relation_type FROM family_relation_info") relations = cursor.fetchall() return jsonify({"members": members, "relations": relations}) finally: conn.close() @app.route('/manager/api/search_member', methods=['POST']) def search_member(): if 'user_id' not in session: return jsonify({"success": False, "message": "Unauthorized"}), 401 data = request.get_json() keyword = data.get('keyword', '').strip() if not keyword: return jsonify({"success": False, "message": "请输入搜索关键词"}) conn = get_db_connection() try: with conn.cursor() as cursor: cursor.execute(""" SELECT id, name, simplified_name FROM family_member_info WHERE name LIKE %s OR simplified_name LIKE %s OR former_name LIKE %s ORDER BY CASE WHEN name = %s THEN 1 WHEN simplified_name = %s THEN 2 WHEN name LIKE %s THEN 3 WHEN simplified_name LIKE %s THEN 4 ELSE 5 END """, (f'%{keyword}%', f'%{keyword}%', f'%{keyword}%', keyword, keyword, f'{keyword}%', f'{keyword}%')) members = cursor.fetchall() if members: return jsonify({"success": True, "members": members}) else: return jsonify({"success": False, "message": "未找到匹配的成员"}) finally: conn.close() @app.route('/manager/api/get_lineage/') def get_lineage(member_id): if 'user_id' not in session: return jsonify({"success": False, "message": "Unauthorized"}), 401 import time start_time = time.time() print(f"[Lineage Query] Starting query for member_id: {member_id} at {time.strftime('%Y-%m-%d %H:%M:%S')}") conn = get_db_connection() try: with conn.cursor() as cursor: # Step 1: Get center person step_start = time.time() cursor.execute("SELECT id, name, simplified_name, name_word, name_word_generation FROM family_member_info WHERE id = %s", (member_id,)) center = cursor.fetchone() print(f"[Lineage Query] Step 1 - Get center: {time.time() - step_start:.3f}s") if not center: return jsonify({"success": False, "message": "成员不存在"}) # Step 2: Get ancestors with their siblings (generations) step_start = time.time() generations = [] # Array of generations, each with main ancestor and siblings current_id = member_id max_depth = 15 ancestor_ids = [] # Track ancestor IDs for exclusion when expanding displayed_ids = set() # Track IDs that are already displayed displayed_ids.add(member_id) # Center person is displayed for depth in range(max_depth): # 获取所有父母关系(支持出继/入继) cursor.execute(""" SELECT p.id, p.name, p.simplified_name, p.name_word, p.name_word_generation, EXISTS(SELECT 1 FROM family_relation_info WHERE parent_mid = p.id AND relation_type IN (1, 2)) as has_children, r.sub_relation_type FROM family_relation_info r JOIN family_member_info p ON r.parent_mid = p.id WHERE r.child_mid = %s AND r.relation_type IN (1, 2) """, (current_id,)) parents = cursor.fetchall() if not parents: break # 优先选择直系父母(非出继),如果都是出继/入继,选择入继 parent = None adoptive_parent = None for p in parents: if p['sub_relation_type'] == 2: # 出继(亲生父母) parent = p elif p['sub_relation_type'] == 3: # 入继(养父母) adoptive_parent = p else: # 普通关系(亲生) parent = p # 如果没有找到普通父母,使用入继父母 if not parent: parent = adoptive_parent ancestor_ids.append(parent['id']) displayed_ids.add(parent['id']) # Get siblings of this ancestor (father's brothers) # First get grandparent (parent's father) cursor.execute(""" SELECT gp.id FROM family_relation_info r JOIN family_member_info gp ON r.parent_mid = gp.id WHERE r.child_mid = %s AND r.relation_type IN (1, 2) LIMIT 1 """, (parent['id'],)) grandparent = cursor.fetchone() parent_siblings = [] if grandparent: # Get siblings of parent (father's brothers) cursor.execute(""" SELECT c.id, c.name, c.simplified_name, c.name_word, c.name_word_generation, EXISTS(SELECT 1 FROM family_relation_info WHERE parent_mid = c.id AND relation_type IN (1, 2)) as has_children FROM family_relation_info r JOIN family_member_info c ON r.child_mid = c.id WHERE r.parent_mid = %s AND r.relation_type IN (1, 2) AND c.id != %s ORDER BY COALESCE(r.child_order, 99999), c.id LIMIT 30 """, (grandparent['id'], parent['id'])) parent_siblings = cursor.fetchall() # Mark sibling IDs as displayed for sibling in parent_siblings: displayed_ids.add(sibling['id']) # Check if parent has any children NOT already displayed # Only show expand button if there are undisplayed children cursor.execute(""" SELECT COUNT(*) as count FROM family_relation_info r JOIN family_member_info c ON r.child_mid = c.id WHERE r.parent_mid = %s AND r.relation_type IN (1, 2) """, (parent['id'],)) total_children = cursor.fetchone()['count'] # Check if current child is displayed (current_id is the child of parent) child_displayed = current_id in displayed_ids # Show expand if there are children not displayed show_expand = total_children > (1 if child_displayed else 0) parent['show_expand'] = show_expand generations.append({ 'ancestor': parent, 'siblings': parent_siblings, 'depth': depth }) current_id = parent['id'] print(f"[Lineage Query] Step 2 - Get generations ({len(generations)}): {time.time() - step_start:.3f}s") # Step 3: Get immediate children only (limited count) step_start = time.time() # 获取所有子女(包括出继和入继) cursor.execute(""" SELECT c.id, c.name, c.simplified_name, c.name_word, c.name_word_generation, EXISTS(SELECT 1 FROM family_relation_info WHERE parent_mid = c.id AND relation_type IN (1, 2)) as has_children, r.sub_relation_type FROM family_relation_info r JOIN family_member_info c ON r.child_mid = c.id WHERE r.parent_mid = %s AND r.relation_type IN (1, 2) ORDER BY COALESCE(r.child_order, 99999), c.id LIMIT 30 """, (member_id,)) children = cursor.fetchall() # 对于出继的子女,需要获取他们入继到的家庭信息 for child in children: if child['sub_relation_type'] == 2: # 出继 # 查找该子女入继到的父母 cursor.execute(""" SELECT p.id, p.name, p.simplified_name FROM family_relation_info r JOIN family_member_info p ON r.parent_mid = p.id WHERE r.child_mid = %s AND r.sub_relation_type = 3 LIMIT 1 """, (child['id'],)) adoptive_parent = cursor.fetchone() if adoptive_parent: child['adoptive_parent_name'] = adoptive_parent['name'] if adoptive_parent['simplified_name'] and adoptive_parent['simplified_name'] != adoptive_parent['name']: child['adoptive_parent_name'] += f" ({adoptive_parent['simplified_name']})" # Initialize children array for child in children: child['children'] = [] print(f"[Lineage Query] Step 3 - Get children ({len(children)}): {time.time() - step_start:.3f}s") # Step 4: Get siblings of center person step_start = time.time() siblings = [] if generations: parent_id = generations[0]['ancestor']['id'] # Father cursor.execute(""" SELECT c.id, c.name, c.simplified_name, c.name_word, c.name_word_generation, EXISTS(SELECT 1 FROM family_relation_info WHERE parent_mid = c.id AND relation_type IN (1, 2)) as has_children, r.sub_relation_type FROM family_relation_info r JOIN family_member_info c ON r.child_mid = c.id WHERE r.parent_mid = %s AND r.relation_type IN (1, 2) AND c.id != %s ORDER BY COALESCE(r.child_order, 99999), c.id LIMIT 30 """, (parent_id, member_id)) siblings = cursor.fetchall() print(f"[Lineage Query] Step 4 - Get siblings ({len(siblings)}): {time.time() - step_start:.3f}s") total_time = time.time() - start_time print(f"[Lineage Query] Total time: {total_time:.3f}s") return jsonify({ "success": True, "data": { "center": center, "generations": generations, "ancestor_ids": ancestor_ids, "siblings": siblings, "children": children } }) except Exception as e: print(f"[Lineage Query] Error: {e}") return jsonify({"success": False, "message": str(e)}) finally: conn.close() @app.route('/manager/api/get_descendants/') def get_descendants(parent_id): if 'user_id' not in session: return jsonify({"success": False, "message": "Unauthorized"}), 401 # Get excluded IDs from query parameter excluded_ids = request.args.get('exclude', '') excluded_list = [] if excluded_ids: excluded_list = [int(id.strip()) for id in excluded_ids.split(',') if id.strip().isdigit()] print(f"[get_descendants] Parent ID: {parent_id}, Excluded IDs: {excluded_list}") conn = get_db_connection() try: with conn.cursor() as cursor: if excluded_list: # Build query with exclusion placeholders = ', '.join(['%s'] * len(excluded_list)) cursor.execute(f""" SELECT c.id, c.name, c.simplified_name, c.name_word, c.name_word_generation, EXISTS(SELECT 1 FROM family_relation_info WHERE parent_mid = c.id AND relation_type IN (1, 2)) as has_children FROM family_relation_info r JOIN family_member_info c ON r.child_mid = c.id WHERE r.parent_mid = %s AND r.relation_type IN (1, 2) AND c.id NOT IN ({placeholders}) ORDER BY COALESCE(r.child_order, 99999), c.id LIMIT 20 """, (parent_id,) + tuple(excluded_list)) else: cursor.execute(""" SELECT c.id, c.name, c.simplified_name, c.name_word, c.name_word_generation, EXISTS(SELECT 1 FROM family_relation_info WHERE parent_mid = c.id AND relation_type IN (1, 2)) as has_children FROM family_relation_info r JOIN family_member_info c ON r.child_mid = c.id WHERE r.parent_mid = %s AND r.relation_type IN (1, 2) ORDER BY COALESCE(r.child_order, 99999), c.id LIMIT 20 """, (parent_id,)) children = cursor.fetchall() for child in children: child['children'] = [] return jsonify({"success": True, "children": children}) finally: conn.close() @app.route('/manager/api/save_relation', methods=['POST']) def save_relation(): if 'user_id' not in session: return jsonify({"success": False, "message": "Unauthorized"}), 401 data = request.json source_mid = data.get('source_mid') # The member being dragged target_mid = data.get('target_mid') # The member being dropped onto rel_type = int(data.get('relation_type')) sub_rel_type = int(data.get('sub_relation_type', 0)) if not source_mid or not target_mid or not rel_type: return jsonify({"success": False, "message": "参数不完整"}), 400 conn = get_db_connection() try: with conn.cursor() as cursor: # 简单处理:如果是父子/母子关系 # target_mid 是父辈,source_mid 是子辈 parent_mid = target_mid child_mid = source_mid gen_diff = 1 if rel_type == 10: # 夫妻 # 夫妻关系中,我们通常把关联人设为 parent_mid parent_mid = target_mid child_mid = source_mid gen_diff = 0 elif rel_type in [11, 12]: # 兄弟姐妹 # 这里逻辑上比较复杂,通常兄弟姐妹有共同父母。 # 简化处理:暂时存为同级关系 (gen_diff=0) parent_mid = target_mid child_mid = source_mid gen_diff = 0 # 删除旧关系 cursor.execute("DELETE FROM family_relation_info WHERE source_mid = %s", (source_mid,)) # 插入新关系 sql = """ INSERT INTO family_relation_info (parent_mid, child_mid, relation_type, sub_relation_type, source_mid, generation_diff) VALUES (%s, %s, %s, %s, %s, %s) """ cursor.execute(sql, (parent_mid, child_mid, rel_type, sub_rel_type, source_mid, gen_diff)) conn.commit() return jsonify({"success": True, "message": "关系已保存"}) except Exception as e: return jsonify({"success": False, "message": str(e)}), 500 finally: conn.close() @app.route('/manager/api/members') def get_members(): if 'user_id' not in session: return jsonify({"success": False, "message": "Unauthorized"}), 401 page = int(request.args.get('page', 1)) search = request.args.get('search', '') per_page = 10 offset = (page - 1) * per_page conn = get_db_connection() try: with conn.cursor() as cursor: # Count total members if search: cursor.execute("SELECT COUNT(*) as total FROM family_member_info WHERE name LIKE %s OR simplified_name LIKE %s", (f'%{search}%', f'%{search}%')) else: cursor.execute("SELECT COUNT(*) as total FROM family_member_info") total_result = cursor.fetchone() total = total_result['total'] if total_result else 0 # Get members for current page with father information if search: cursor.execute(""" SELECT fmi.id, fmi.name, fmi.simplified_name, fmi.sex, fmi.name_word_generation, father.name as father_name, father.simplified_name as father_simplified_name, father.name_word_generation as father_generation FROM family_member_info fmi LEFT JOIN family_relation_info fri ON fmi.id = fri.child_mid AND fri.relation_type IN (1, 2) LEFT JOIN family_member_info father ON fri.parent_mid = father.id WHERE fmi.name LIKE %s OR fmi.simplified_name LIKE %s LIMIT %s OFFSET %s """, (f'%{search}%', f'%{search}%', per_page, offset)) else: cursor.execute(""" SELECT fmi.id, fmi.name, fmi.simplified_name, fmi.sex, fmi.name_word_generation, father.name as father_name, father.simplified_name as father_simplified_name, father.name_word_generation as father_generation FROM family_member_info fmi LEFT JOIN family_relation_info fri ON fmi.id = fri.child_mid AND fri.relation_type IN (1, 2) LEFT JOIN family_member_info father ON fri.parent_mid = father.id LIMIT %s OFFSET %s """, (per_page, offset)) members = cursor.fetchall() # Convert to list of dictionaries if needed members_list = [] for member in members: members_list.append({ 'id': member['id'], 'name': member['name'], 'simplified_name': member['simplified_name'], 'sex': member['sex'], 'name_word_generation': member.get('name_word_generation'), 'father_name': member.get('father_name'), 'father_simplified_name': member.get('father_simplified_name'), 'father_generation': member.get('father_generation') }) return jsonify({"success": True, "members": members_list, "total": total}) except Exception as e: return jsonify({"success": False, "message": f"获取成员失败: {e}"}), 500 finally: conn.close() def call_doubao_api(prompt, image_url=None): """调用豆包API处理文本""" api_key = "a1800657-9212-4afe-9b7c-b49f015c54d3" api_url = "https://ark.cn-beijing.volces.com/api/v3/responses" payload = { "model": "doubao-seed-1-8-251228", "stream": False, "input": [ { "role": "user", "content": [ {"type": "input_text", "text": prompt} ] } ] } headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" } try: response = requests.post( api_url, json=payload, headers=headers, timeout=120, verify=False, proxies={"http": None, "https": None} ) if response.status_code == 200: result = response.json() print(f"[AI API] Raw response: {result}") # 解析响应 - 尝试多种格式 if 'output' in result: for item in result['output']: if item.get('type') == 'message': content = item.get('content') if isinstance(content, str): return content elif isinstance(content, list): for part in content: if isinstance(part, dict) and part.get('type') == 'text': return part.get('text', '') elif isinstance(content, dict) and 'text' in content: return content.get('text', '') # 尝试其他响应格式 if 'choices' in result and len(result['choices']) > 0: message = result['choices'][0].get('message', {}) return message.get('content', '') # 尝试直接获取文本内容 if 'text' in result: return result['text'] # 尝试获取响应中的message if 'message' in result: msg = result['message'] if isinstance(msg, str): return msg elif isinstance(msg, dict) and 'content' in msg: return msg['content'] # 返回字符串形式 return str(result) else: print(f"[AI API] Error: {response.status_code} - {response.text}") return None except Exception as e: print(f"[AI API] Exception: {e}") return None def parse_ai_response(ai_response): """解析AI响应,提取族谱原文""" if not ai_response: return None, None # 尝试从响应中提取JSON try: # 移除可能的markdown代码块标记 text = ai_response.strip() if text.startswith('```json'): text = text[7:] if text.endswith('```'): text = text[:-3] text = text.strip() # 尝试解析JSON result = json.loads(text) traditional = result.get('genealogy_traditional', '') simplified = result.get('genealogy_simplified', '') if traditional or simplified: return traditional, simplified except json.JSONDecodeError: print(f"[AI Parse] JSON decode error: {ai_response[:200]}") # 如果JSON解析失败,尝试直接提取文本 # 尝试匹配模式 import re traditional_match = re.search(r'genealogy_traditional["\']?\s*[,:]\s*["\']([^"\']+)["\']', ai_response) simplified_match = re.search(r'genealogy_simplified["\']?\s*[,:]\s*["\']([^"\']+)["\']', ai_response) traditional = traditional_match.group(1) if traditional_match else '' simplified = simplified_match.group(1) if simplified_match else '' return traditional, simplified @app.route('/manager/api/members/empty_genealogy', methods=['GET']) def get_members_empty_genealogy(): """获取族谱原文为空的成员列表""" if 'user_id' not in session: return jsonify({"success": False, "message": "Unauthorized"}), 401 page = int(request.args.get('page', 1)) per_page = int(request.args.get('per_page', 20)) offset = (page - 1) * per_page conn = get_db_connection() try: with conn.cursor() as cursor: # Count total cursor.execute(""" SELECT COUNT(*) as total FROM family_member_info WHERE (genealogy_original_traditional IS NULL OR genealogy_original_traditional = '' OR genealogy_original_traditional = 'None') AND (genealogy_original_simplified IS NULL OR genealogy_original_simplified = '' OR genealogy_original_simplified = 'None') """) total_result = cursor.fetchone() total = total_result['total'] if total_result else 0 # Get members cursor.execute(""" SELECT id, name, simplified_name, name_word_generation, sex, occupation, notes, birth_place FROM family_member_info WHERE (genealogy_original_traditional IS NULL OR genealogy_original_traditional = '' OR genealogy_original_traditional = 'None') AND (genealogy_original_simplified IS NULL OR genealogy_original_simplified = '' OR genealogy_original_simplified = 'None') LIMIT %s OFFSET %s """, (per_page, offset)) members = cursor.fetchall() # 关联查询父亲信息 member_list = [] for member in members: cursor.execute(""" SELECT p.name, p.simplified_name, p.name_word_generation FROM family_relation_info r JOIN family_member_info p ON r.parent_mid = p.id WHERE r.child_mid = %s AND r.relation_type = 1 LIMIT 1 """, (member['id'],)) father = cursor.fetchone() cursor.execute(""" SELECT p.name, p.simplified_name FROM family_relation_info r JOIN family_member_info p ON r.parent_mid = p.id WHERE r.child_mid = %s AND r.relation_type = 2 LIMIT 1 """, (member['id'],)) mother = cursor.fetchone() member_list.append({ 'id': member['id'], 'name': member['name'], 'simplified_name': member['simplified_name'], 'name_word_generation': member['name_word_generation'], 'sex': member['sex'], 'occupation': member['occupation'], 'notes': member['notes'], 'birth_place': member['birth_place'], 'father_name': father['name'] if father else None, 'father_simplified_name': father['simplified_name'] if father else None, 'father_generation': father['name_word_generation'] if father else None, 'mother_name': mother['name'] if mother else None, 'mother_simplified_name': mother['simplified_name'] if mother else None }) return jsonify({"success": True, "members": member_list, "total": total}) except Exception as e: return jsonify({"success": False, "message": f"获取成员失败: {e}"}), 500 finally: conn.close() @app.route('/manager/api/members/batch_process_genealogy', methods=['POST']) def batch_process_genealogy(): """批量处理成员族谱原文""" if 'user_id' not in session: return jsonify({"success": False, "message": "Unauthorized"}), 401 data = request.get_json() member_ids = data.get('member_ids', []) if not member_ids or len(member_ids) > 10: return jsonify({"success": False, "message": "请选择1-10个成员进行处理"}), 400 conn = get_db_connection() results = [] try: for member_id in member_ids: with conn.cursor() as cursor: cursor.execute(""" SELECT id, name, simplified_name, name_word_generation, birth_place, occupation, notes, sex FROM family_member_info WHERE id = %s """, (member_id,)) member = cursor.fetchone() # 获取父亲信息 cursor.execute(""" SELECT p.name, p.simplified_name FROM family_relation_info r JOIN family_member_info p ON r.parent_mid = p.id WHERE r.child_mid = %s AND r.relation_type = 1 LIMIT 1 """, (member_id,)) father = cursor.fetchone() # 获取母亲信息 cursor.execute(""" SELECT p.name, p.simplified_name FROM family_relation_info r JOIN family_member_info p ON r.parent_mid = p.id WHERE r.child_mid = %s AND r.relation_type = 2 LIMIT 1 """, (member_id,)) mother = cursor.fetchone() member['father_name'] = father['name'] if father else None member['father_simplified_name'] = father['simplified_name'] if father else None member['mother_name'] = mother['name'] if mother else None member['mother_simplified_name'] = mother['simplified_name'] if mother else None if not member: results.append({"member_id": member_id, "success": False, "message": "成员不存在"}) continue # 构建AI提示词 member_info = f""" 姓名(繁体):{member['name']} 姓名(简体):{member['simplified_name'] or '未知'} 世系世代:{member['name_word_generation'] or '未知'} 父亲姓名:{member['father_name'] or '未知'} 母亲姓名:{member['mother_name'] or '未知'} 出生地:{member['birth_place'] or '未知'} 职业:{member['occupation'] or '未知'} 备注:{member['notes'] or '无'} """ prompt = f""" 请根据以下人员信息,模拟生成该人员的族谱原文: {member_info} 请输出两个字段: 1. genealogy_traditional: 族谱原文(繁体中文,模仿传统族谱格式) 2. genealogy_simplified: 族谱原文(简体中文,将繁体转换为简体) 请严格按照JSON格式输出,不要包含任何额外解释: {{ "genealogy_traditional": "繁体族谱原文内容", "genealogy_simplified": "简体族谱原文内容" }} """ ai_response = call_doubao_api(prompt) print(f"[AI Response] Member {member_id}: {ai_response}") if ai_response: # 使用新的解析函数 traditional, simplified = parse_ai_response(ai_response) if traditional or simplified: with conn.cursor() as cursor: cursor.execute(""" UPDATE family_member_info SET genealogy_original_traditional = %s, genealogy_original_simplified = %s WHERE id = %s """, (traditional, simplified, member_id)) conn.commit() results.append({ "member_id": member_id, "name": member['name'], "success": True, "traditional": traditional[:100] + "..." if len(traditional) > 100 else traditional, "simplified": simplified[:100] + "..." if len(simplified) > 100 else simplified }) else: results.append({ "member_id": member_id, "name": member['name'], "success": False, "message": "AI未返回有效数据" }) else: results.append({ "member_id": member_id, "name": member['name'], "success": False, "message": "AI调用失败" }) return jsonify({"success": True, "results": results}) except Exception as e: print(f"[Batch Process] Exception: {e}") return jsonify({"success": False, "message": f"批量处理失败: {e}"}), 500 finally: conn.close() @app.route('/manager/api/member/') def get_member(member_id): if 'user_id' not in session: return jsonify({"success": False, "message": "Unauthorized"}), 401 conn = get_db_connection() try: with conn.cursor() as cursor: cursor.execute("SELECT id, name, name_word_generation, source_record_id FROM family_member_info WHERE id = %s", (member_id,)) member = cursor.fetchone() if not member: return jsonify({"success": False, "message": "成员不存在"}), 404 return jsonify({"member": member}) except Exception as e: return jsonify({"success": False, "message": f"获取成员失败: {e}"}), 500 finally: conn.close() @app.route('/manager/api/check_relations', methods=['POST']) def check_relations(): if 'user_id' not in session: return jsonify({"success": False, "message": "Unauthorized"}), 401 data = request.json people = data.get('people', []) if not people: return jsonify({"success": False, "matches": {}}) conn = get_db_connection() matches = {} try: with conn.cursor() as cursor: # Collect all father names and spouse names to query names_to_check = set() for p in people: if p.get('father_name'): names_to_check.add(p['father_name']) if p.get('spouse_name'): names_to_check.add(p['spouse_name']) if not names_to_check: return jsonify({"success": True, "matches": {}}) # Query DB format_strings = ','.join(['%s'] * len(names_to_check)) if names_to_check: sql = "SELECT id, name, simplified_name, sex, birthday FROM family_member_info WHERE name IN (%s) OR simplified_name IN (%s)" % (format_strings, format_strings) cursor.execute(sql, tuple(names_to_check) * 2) results = cursor.fetchall() else: results = [] # Organize by name db_map = {} # name -> [list of members] for r in results: # Add under 'name' (Traditional/Old Simplified) if r['name'] not in db_map: db_map[r['name']] = [] db_map[r['name']].append(r) # Add under 'simplified_name' if exists if r.get('simplified_name'): sname = r['simplified_name'] if sname not in db_map: db_map[sname] = [] # Avoid duplicates if simplified_name is same as name? # The list might contain same object reference, which is fine. if sname != r['name']: db_map[sname].append(r) # Build matches for each input person for index, p in enumerate(people): p_match = {} # Check Father fname = p.get('father_name') if fname and fname in db_map: candidates = db_map[fname] # Filter: Father should be Male usually, and older than child (if birthday available) valid_fathers = [c for c in candidates if c['sex'] == 1] if valid_fathers: p_match['father'] = valid_fathers # Return all candidates # Check Spouse sname = p.get('spouse_name') if sname and sname in db_map: candidates = db_map[sname] # Filter: Spouse usually opposite sex target_sex = 1 if p.get('sex') == '女' else 2 valid_spouses = [c for c in candidates if c['sex'] == target_sex] if valid_spouses: p_match['spouse'] = valid_spouses if p_match: matches[index] = p_match return jsonify({"success": True, "matches": matches}) finally: conn.close() @app.route('/manager/add_member', methods=['GET', 'POST']) def add_member(): if 'user_id' not in session: return redirect(url_for('login')) conn = get_db_connection() try: # Check for source_record_id (from GET or POST) source_record_id = request.args.get('record_id') or request.form.get('source_record_id') prefilled_content = None source_oss_url = None if source_record_id: with conn.cursor() as cursor: cursor.execute("SELECT oss_url, ai_content, ai_status FROM genealogy_records WHERE id = %s", (source_record_id,)) rec = cursor.fetchone() if rec: source_oss_url = rec['oss_url'] # Check ai_status (2 = success) if rec['ai_status'] == 2 and rec['ai_content']: prefilled_content = rec['ai_content'] if request.method == 'POST': # 处理生日转换为 Unix 时间戳 birthday_str = request.form.get('birthday') birthday_ts = 0 if birthday_str: try: birthday_ts = int(datetime.strptime(birthday_str, '%Y-%m-%d').timestamp()) except ValueError: birthday_ts = 0 # 关系数据 - 支持多条关系 relations = [] # Parse relations from form data i = 0 while True: parent_mid = request.form.get(f'relations[{i}][parent_mid]') rel_type = request.form.get(f'relations[{i}][relation_type]') sub_rel_type = request.form.get(f'relations[{i}][sub_relation_type]', '0') if not parent_mid or not rel_type: break relations.append({ 'parent_mid': int(parent_mid), 'relation_type': int(rel_type), 'sub_relation_type': int(sub_rel_type) }) i += 1 # For backward compatibility, check old-style single relation if not relations: related_mid = request.form.get('related_mid') relation_type = request.form.get('relation_type') if related_mid and relation_type: relations.append({ 'parent_mid': int(related_mid), 'relation_type': int(relation_type), 'sub_relation_type': int(request.form.get('sub_relation_type', '0')) }) # 年龄校验逻辑 for rel in relations: if rel['relation_type'] in [1, 2]: # 1:父子 2:母子 with conn.cursor() as cursor: cursor.execute("SELECT name, birthday FROM family_member_info WHERE id = %s", (rel['parent_mid'],)) parent = cursor.fetchone() if parent and parent['birthday'] > 0 and birthday_ts > 0: if birthday_ts < parent['birthday']: error_msg = f"数据冲突:成员年龄不能比其父亲/母亲({parent['name']})大,请检查并修正出生日期。" flash(error_msg) # Re-fetch data for rendering cursor.execute("SELECT id, name FROM family_member_info ORDER BY name") all_members = cursor.fetchall() cursor.execute("SELECT * FROM genealogy_records ORDER BY page_number ASC") images = cursor.fetchall() if request.headers.get('X-Requested-With') == 'XMLHttpRequest' or request.is_json: return jsonify({ "success": False, "message": error_msg }), 400 selected_member_name = '' return render_template('add_member.html', all_members=all_members, images=images, prefilled_content=prefilled_content, source_oss_url=source_oss_url, source_record_id=source_record_id, selected_member_name=selected_member_name) break # 获取表单数据 data = { 'name': request.form['name'], 'simplified_name': request.form.get('simplified_name'), 'genealogy_original_traditional': request.form.get('genealogy_original_traditional'), 'genealogy_original_simplified': request.form.get('genealogy_original_simplified'), 'former_name': request.form.get('former_name'), 'childhood_name': request.form.get('childhood_name'), 'name_word': request.form.get('name_word'), 'name_word_generation': ';'.join([g.strip() for g in request.form.getlist('lineage_generations[]') if g.strip()]), 'name_title': request.form.get('name_title'), 'sex': request.form['sex'], 'birthday': birthday_ts, 'is_pass_away': request.form.get('is_pass_away', 0), 'marital_status': request.form.get('marital_status', 0), 'birth_place': request.form.get('birth_place'), 'branch_family_hall': request.form.get('branch_family_hall'), 'cluster_place': request.form.get('cluster_place'), 'nation': request.form.get('nation'), 'residential_address': request.form.get('residential_address'), 'phone': request.form.get('phone'), 'mail': request.form.get('mail'), 'wechat_account': request.form.get('wechat_account'), 'id_number': request.form.get('id_number'), 'occupation': request.form.get('occupation'), 'educational': request.form.get('educational'), 'blood_type': request.form.get('blood_type'), 'religion': request.form.get('religion'), 'hobbies': request.form.get('hobbies'), 'personal_achievements': request.form.get('personal_achievements'), 'family_rank': request.form.get('family_rank'), 'tags': request.form.get('tags'), 'notes': request.form.get('notes'), 'suspected_error': request.form.get('suspected_error').strip() if request.form.get('suspected_error') else '', 'source_record_id': request.form.get('source_record_id') or None, # Save source record ID 'create_uid': session['user_id'] # 记录当前操作人 } # ... (rest of logic) ... with conn.cursor() as cursor: print(f"[Add Member] Inserting member data: {data}") fields = ", ".join(data.keys()) placeholders = ", ".join(["%s"] * len(data)) sql = f"INSERT INTO family_member_info ({fields}) VALUES ({placeholders})" print(f"[Add Member] Executing SQL: {sql}") print(f"[Add Member] SQL parameters: {list(data.values())}") cursor.execute(sql, list(data.values())) member_id = cursor.lastrowid print(f"[Add Member] Inserted member with ID: {member_id}") # 录入关系(支持多条) sql_relation = """ INSERT INTO family_relation_info (parent_mid, child_mid, relation_type, sub_relation_type, source_mid, generation_diff, child_order) VALUES (%s, %s, %s, %s, %s, %s, %s) """ for rel in relations: rel_type = rel['relation_type'] parent_mid = rel['parent_mid'] sub_relation_type = rel['sub_relation_type'] child_order = rel.get('child_order') if rel_type in [1, 2] else None gen_diff = 1 if rel_type in [1, 2] else 0 print(f"[Add Member] Inserting relation: parent_mid={parent_mid}, child_mid={member_id}, relation_type={rel_type}, sub_relation_type={sub_relation_type}, child_order={child_order}") cursor.execute(sql_relation, (parent_mid, member_id, rel_type, sub_relation_type, member_id, gen_diff, child_order)) # Update AI Record Status if applicable source_record_id = data.get('source_record_id') source_index = request.form.get('source_index') if source_record_id and source_index and source_index.isdigit(): try: idx = int(source_index) print(f"[Add Member] Updating AI record status: record_id={source_record_id}, index={idx}") cursor.execute("SELECT ai_content FROM genealogy_records WHERE id = %s FOR UPDATE", (source_record_id,)) rec = cursor.fetchone() if rec and rec['ai_content']: import json content = json.loads(rec['ai_content']) # Ensure content is a list (it might be a dict if single object, though we try to normalize) if isinstance(content, dict): content = [content] if isinstance(content, list): updated = False if 0 <= idx < len(content): # Always update the status regardless of current value content[idx]['is_imported'] = True content[idx]['imported_member_id'] = member_id updated = True if updated: new_content = json.dumps(content, ensure_ascii=False) cursor.execute("UPDATE genealogy_records SET ai_content = %s WHERE id = %s", (new_content, source_record_id)) print(f"[Add Member] Updated AI record status") except Exception as e: print(f"[Add Member] Error updating AI content status: {e}") print(f"[Add Member] Committing transaction") if safe_commit(conn): print(f"[Add Member] Transaction committed successfully") if request.headers.get('X-Requested-With') == 'XMLHttpRequest' or request.is_json: return jsonify({"success": True, "message": "成员录入成功", "member_id": member_id}) flash('成员录入成功') return redirect(url_for('members')) else: print(f"[Add Member] Transaction commit failed!") if request.headers.get('X-Requested-With') == 'XMLHttpRequest' or request.is_json: return jsonify({"success": False, "message": "成员录入失败,事务提交失败"}), 500 flash('成员录入失败,事务提交失败') return redirect(url_for('add_member')) with conn.cursor() as cursor: cursor.execute("SELECT id, name FROM family_member_info ORDER BY name") all_members = cursor.fetchall() cursor.execute("SELECT * FROM genealogy_records ORDER BY page_number ASC") images = cursor.fetchall() except Exception as e: flash(f'发生错误: {e}') all_members = [] images = [] finally: conn.close() selected_member_name = '' return render_template('add_member.html', all_members=all_members, images=images, prefilled_content=prefilled_content, source_oss_url=source_oss_url, source_record_id=source_record_id, selected_member_name=selected_member_name) @app.route('/manager/edit_member/', methods=['GET', 'POST']) def edit_member(member_id): if 'user_id' not in session: return redirect(url_for('login')) conn = get_db_connection() try: if request.method == 'POST': birthday_str = request.form.get('birthday') birthday_ts = 0 if birthday_str: try: birthday_ts = int(datetime.strptime(birthday_str, '%Y-%m-%d').timestamp()) except ValueError: birthday_ts = 0 # 关系数据 - 支持多条关系 relations = [] i = 0 while True: parent_mid = request.form.get(f'relations[{i}][parent_mid]') rel_type = request.form.get(f'relations[{i}][relation_type]') sub_rel_type = request.form.get(f'relations[{i}][sub_relation_type]', '0') child_order_raw = request.form.get(f'relations[{i}][child_order]', '') if not parent_mid or not rel_type: break child_order = int(child_order_raw) if child_order_raw.strip().isdigit() else None relations.append({ 'parent_mid': int(parent_mid), 'relation_type': int(rel_type), 'sub_relation_type': int(sub_rel_type), 'child_order': child_order, }) i += 1 # For backward compatibility if not relations: related_mid = request.form.get('related_mid') relation_type = request.form.get('relation_type') if related_mid and relation_type: child_order_raw = request.form.get('child_order', '') relations.append({ 'parent_mid': int(related_mid), 'relation_type': int(relation_type), 'sub_relation_type': int(request.form.get('sub_relation_type', '0')), 'child_order': int(child_order_raw) if child_order_raw.strip().isdigit() else None, }) # 年龄校验逻辑 for rel in relations: if rel['relation_type'] in [1, 2]: with conn.cursor() as cursor: cursor.execute("SELECT name, birthday FROM family_member_info WHERE id = %s", (rel['parent_mid'],)) parent = cursor.fetchone() if parent and parent['birthday'] > 0 and birthday_ts > 0: if birthday_ts < parent['birthday']: flash(f"数据冲突:成员年龄不能比其父亲/母亲({parent['name']})大,请检查并修正出生日期。") # 重新加载编辑页所需数据 cursor.execute("SELECT * FROM family_member_info WHERE id = %s", (member_id,)) member = cursor.fetchone() member['birthday_date'] = birthday_str # 保持用户输入 cursor.execute("SELECT id, name FROM family_member_info WHERE id != %s ORDER BY name", (member_id,)) all_members = cursor.fetchall() cursor.execute("SELECT * FROM genealogy_records ORDER BY page_number ASC") images = cursor.fetchall() if request.headers.get('X-Requested-With') == 'XMLHttpRequest' or request.is_json: return jsonify({ "success": False, "message": f"数据冲突:成员年龄不能比其父亲/母亲({parent['name']})大,请检查并修正出生日期。" }), 400 selected_member_name = '' return render_template('add_member.html', member=member, images=images, all_members=all_members, selected_member_name=selected_member_name) break data = { 'name': request.form['name'], 'simplified_name': request.form.get('simplified_name'), 'genealogy_original_traditional': request.form.get('genealogy_original_traditional'), 'genealogy_original_simplified': request.form.get('genealogy_original_simplified'), 'former_name': request.form.get('former_name'), 'childhood_name': request.form.get('childhood_name'), 'name_word': request.form.get('name_word'), 'name_word_generation': ';'.join([g.strip() for g in request.form.getlist('lineage_generations[]') if g.strip()]), 'name_title': request.form.get('name_title'), 'sex': request.form['sex'], 'birthday': birthday_ts, 'is_pass_away': request.form.get('is_pass_away', 0), 'marital_status': request.form.get('marital_status', 0), 'birth_place': request.form.get('birth_place'), 'branch_family_hall': request.form.get('branch_family_hall'), 'cluster_place': request.form.get('cluster_place'), 'nation': request.form.get('nation'), 'residential_address': request.form.get('residential_address'), 'phone': request.form.get('phone'), 'mail': request.form.get('mail'), 'wechat_account': request.form.get('wechat_account'), 'id_number': request.form.get('id_number'), 'occupation': request.form.get('occupation'), 'educational': request.form.get('educational'), 'blood_type': request.form.get('blood_type'), 'religion': request.form.get('religion'), 'hobbies': request.form.get('hobbies'), 'personal_achievements': request.form.get('personal_achievements'), 'family_rank': request.form.get('family_rank'), 'tags': request.form.get('tags'), 'notes': request.form.get('notes'), 'suspected_error': request.form.get('suspected_error').strip() if request.form.get('suspected_error') else '', 'source_record_id': request.form.get('source_record_id') or None, 'create_uid': session['user_id'] # 记录当前操作人 } with conn.cursor() as cursor: print(f"[Edit Member] Updating member data: {data}") update_parts = [f"{k} = %s" for k in data.keys()] sql = f"UPDATE family_member_info SET {', '.join(update_parts)} WHERE id = %s" print(f"[Edit Member] Executing SQL: {sql}") print(f"[Edit Member] SQL parameters: {list(data.values()) + [member_id]}") cursor.execute(sql, list(data.values()) + [member_id]) print(f"[Edit Member] Updated member with ID: {member_id}") # 更新关系(支持多条) print(f"[Edit Member] Deleting existing relations for member ID: {member_id}") cursor.execute("DELETE FROM family_relation_info WHERE source_mid = %s", (member_id,)) sql_relation = """ INSERT INTO family_relation_info (parent_mid, child_mid, relation_type, sub_relation_type, source_mid, generation_diff, child_order) VALUES (%s, %s, %s, %s, %s, %s, %s) """ for rel in relations: rel_type = rel['relation_type'] parent_mid = rel['parent_mid'] sub_relation_type = rel['sub_relation_type'] child_order = rel.get('child_order') if rel_type in [1, 2] else None gen_diff = 1 if rel_type in [1, 2] else 0 print(f"[Edit Member] Inserting relation: parent_mid={parent_mid}, child_mid={member_id}, relation_type={rel_type}, sub_relation_type={sub_relation_type}, child_order={child_order}") cursor.execute(sql_relation, (parent_mid, member_id, rel_type, sub_relation_type, member_id, gen_diff, child_order)) # Update AI Record Status if applicable source_record_id = data.get('source_record_id') source_index = request.form.get('source_index') if source_record_id and source_index and source_index.isdigit(): try: idx = int(source_index) print(f"[Edit Member] Updating AI record status: record_id={source_record_id}, index={idx}") cursor.execute("SELECT ai_content FROM genealogy_records WHERE id = %s FOR UPDATE", (source_record_id,)) rec = cursor.fetchone() if rec and rec['ai_content']: import json content = json.loads(rec['ai_content']) if isinstance(content, dict): content = [content] if isinstance(content, list): updated = False if 0 <= idx < len(content): # Always update the status regardless of current value content[idx]['is_imported'] = True content[idx]['imported_member_id'] = member_id updated = True if updated: new_content = json.dumps(content, ensure_ascii=False) cursor.execute("UPDATE genealogy_records SET ai_content = %s WHERE id = %s", (new_content, source_record_id)) print(f"[Edit Member] Updated AI record status") except Exception as e: print(f"[Edit Member] Error updating AI content status: {e}") print(f"[Edit Member] Committing transaction") conn.commit() print(f"[Edit Member] Transaction committed successfully") if request.headers.get('X-Requested-With') == 'XMLHttpRequest' or request.is_json: return jsonify({"success": True, "message": "成员信息更新成功"}) flash('成员信息更新成功') return redirect(url_for('members')) with conn.cursor() as cursor: cursor.execute("SELECT * FROM family_member_info WHERE id = %s", (member_id,)) member = cursor.fetchone() if not member: flash('成员不存在') return redirect(url_for('members')) # 格式化日期供显示 if member.get('birthday'): member['birthday_date'] = format_timestamp(member['birthday']) # 获取现有关系 cursor.execute("SELECT * FROM family_relation_info WHERE source_mid = %s LIMIT 1", (member_id,)) current_relation = cursor.fetchone() cursor.execute("SELECT id, name FROM family_member_info WHERE id != %s ORDER BY name", (member_id,)) all_members = cursor.fetchall() cursor.execute("SELECT * FROM genealogy_records ORDER BY page_number ASC") images = cursor.fetchall() finally: conn.close() # Calculate selected_member_name based on current_relation selected_member_name = '' if current_relation and current_relation['parent_mid']: for m in all_members: if m['id'] == current_relation['parent_mid']: selected_member_name = m['name'] break # Get source_record_id from member data source_record_id = member.get('source_record_id') if member else None return render_template('add_member.html', member=member, images=images, all_members=all_members, current_relation=current_relation, selected_member_name=selected_member_name, source_record_id=source_record_id) @app.route('/manager/member_detail/') def member_detail(member_id): if 'user_id' not in session: return redirect(url_for('login')) conn = get_db_connection() try: with conn.cursor() as cursor: # Join with genealogy_records to get source image info sql = """ SELECT m.*, r.oss_url as source_image_url, r.page_number as source_page, r.genealogy_version, r.genealogy_source, r.upload_person FROM family_member_info m LEFT JOIN genealogy_records r ON m.source_record_id = r.id WHERE m.id = %s """ cursor.execute(sql, (member_id,)) member = cursor.fetchone() if not member: flash('成员不存在') return redirect(url_for('members')) member['birthday_str'] = format_timestamp(member.get('birthday')) # 获取关系(包含子类型) cursor.execute(""" SELECT m.id, m.name, r.relation_type, r.sub_relation_type FROM family_relation_info r JOIN family_member_info m ON r.parent_mid = m.id WHERE r.child_mid = %s """, (member_id,)) parents = cursor.fetchall() cursor.execute(""" SELECT m.id, m.name, r.relation_type, r.sub_relation_type FROM family_relation_info r JOIN family_member_info m ON r.child_mid = m.id WHERE r.parent_mid = %s """, (member_id,)) children = cursor.fetchall() finally: conn.close() return render_template('member_detail.html', member=member, parents=parents, children=children) @app.route('/manager/delete_member/', methods=['POST']) def delete_member(member_id): if 'user_id' not in session: return jsonify({"success": False, "message": "Unauthorized"}), 401 conn = get_db_connection() try: with conn.cursor() as cursor: # 1. 删除关系表中关联该成员的所有记录 cursor.execute("DELETE FROM family_relation_info WHERE parent_mid = %s OR child_mid = %s OR source_mid = %s", (member_id, member_id, member_id)) # 2. 删除成员本身 cursor.execute("DELETE FROM family_member_info WHERE id = %s", (member_id,)) conn.commit() flash('成员及其关系已成功删除') return redirect(url_for('members')) except Exception as e: conn.rollback() flash(f'删除失败: {e}') return redirect(url_for('members')) finally: conn.close() @app.route('/manager/home') def home(): """Home page - Dashboard for the genealogy management system""" if 'user_id' not in session: return redirect(url_for('login')) # Force re-login if is_super_admin not set in session (fresh login required) if 'is_super_admin' not in session: session.clear() flash('请重新登录以获取最新权限') return redirect(url_for('login')) conn = get_db_connection() try: with conn.cursor() as cursor: # Get member count cursor.execute("SELECT COUNT(*) as count FROM family_member_info") member_count = cursor.fetchone()['count'] # Get record count cursor.execute("SELECT COUNT(*) as count FROM genealogy_records") record_count = cursor.fetchone()['count'] # Get PDF count cursor.execute("SELECT COUNT(*) as count FROM genealogy_pdfs") pdf_count = cursor.fetchone()['count'] # Get suspected error count cursor.execute("SELECT COUNT(*) as count FROM family_member_info WHERE suspected_error IS NOT NULL AND TRIM(suspected_error) != ''") error_count = cursor.fetchone()['count'] finally: conn.close() return render_template('home.html', member_count=member_count, record_count=record_count, pdf_count=pdf_count, error_count=error_count) @app.route('/manager/login', methods=['GET', 'POST']) def login(): if request.method == 'POST': username = request.form['username'] password = request.form['password'] try: conn = get_db_connection() try: with conn.cursor() as cursor: cursor.execute("SELECT * FROM users WHERE username=%s AND password=%s", (username, password)) user = cursor.fetchone() if user: session['user_id'] = user['id'] session['username'] = user['username'] session['is_super_admin'] = user.get('is_super_admin', 0) == 1 return redirect(url_for('home')) else: flash('用户名或密码错误') finally: conn.close() except Exception as e: flash(f'数据库连接错误: {str(e)}') print(f'Login error: {str(e)}') return render_template('login.html') @app.route('/manager/logout') def logout(): session.clear() return redirect(url_for('login')) @app.route('/manager/api/check_name') def check_name(): if 'user_id' not in session: return jsonify({"success": False, "message": "Unauthorized"}), 401 name = request.args.get('name', '').strip() if not name: return jsonify({"success": True, "exists": False}) conn = get_db_connection() try: with conn.cursor() as cursor: # Check for name or simplified_name match cursor.execute("SELECT id, name, simplified_name, sex, birthday, is_pass_away FROM family_member_info WHERE name = %s OR simplified_name = %s", (name, name)) matches = cursor.fetchall() if matches: # Format birthday for display for m in matches: if m.get('birthday'): m['birthday_str'] = format_timestamp(m['birthday']) else: m['birthday_str'] = '未知' return jsonify({"success": True, "exists": True, "matches": matches}) else: return jsonify({"success": True, "exists": False}) except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 finally: conn.close() import requests import json import re @app.route('/manager/api/recognize_image', methods=['POST']) def recognize_image(): if 'user_id' not in session: return jsonify({"success": False, "message": "Unauthorized"}), 401 data = request.json image_url = data.get('image_url') if not image_url: return jsonify({"success": False, "message": "No image URL provided"}), 400 api_key = "a1800657-9212-4afe-9b7c-b49f015c54d3" api_url = "https://ark.cn-beijing.volces.com/api/v3/responses" prompt = """ 请分析这张家谱图片,提取其中关于人物的信息。 请务必将繁体字转换为简体字(original_name 字段除外)。 特别注意:'name' 字段必须是纯简体中文,不能包含繁体字(例如:'學'应转换为'学','劉'应转换为'刘','萬'应转换为'万')。 请提取以下字段(如果存在): - original_name: 原始姓名(严格保持图片上的繁体字,不做任何修改或转换) - name: 简体姓名(必须转换为简体中文,去除不需要的敬称) - sex: 性别(男/女) - birthday: 出生日期(尝试转换为YYYY-MM-DD格式,如果无法确定年份可只填月日) - death_date: 逝世日期(如文本中出现“殁”、“葬”、“卒”等字眼及其对应的时间,请提取) - father_name: 父亲姓名 - spouse_name: 配偶姓名 - generation: 第几世/代数 - name_word: 字辈(例如名字为“学勤公”,“学”为字辈;提取名字中的字辈信息) - education: 学历/功名 - title: 官职/称号 请严格以JSON列表格式返回,不要包含Markdown代码块标记(如 ```json ... ```),直接返回JSON数组。 如果包含多个人物,请都提取出来。 """ ai_payload_url = get_normalized_base64_image(image_url) payload = { "model": "doubao-seed-1-8-251228", "stream": True, "input": [ { "role": "user", "content": [ { "type": "input_image", "image_url": ai_payload_url }, { "type": "input_text", "text": prompt } ] } ] } headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" } def generate(): yield "正在连接 AI 服务...\n" try: # 使用 stream=True, timeout=120 # 增加 verify=False 以防 SSL 问题(开发环境) # 增加 proxies=None 以防本地代理干扰 with requests.post( api_url, json=payload, headers=headers, stream=True, timeout=1200, verify=False, proxies={"http": None, "https": None} ) as r: if r.status_code != 200: yield f"Error: API returned status code {r.status_code}. Response: {r.text}" return yield "连接成功,正在等待 AI 响应...\n" full_reasoning = "" json_started = False for line in r.iter_lines(): if line: line_str = line.decode('utf-8') if line_str.startswith('data: '): json_str = line_str[6:] if json_str.strip() == '[DONE]': break try: chunk = json.loads(json_str) # 处理 standard OpenAI choices format (content) if 'choices' in chunk and len(chunk['choices']) > 0: delta = chunk['choices'][0].get('delta', {}) if 'content' in delta: if not json_started: yield "|||JSON_START|||" json_started = True yield delta['content'] # 处理 standard OpenAI choices format (reasoning_content) if any if 'reasoning_content' in delta: yield f"\n[推理]: {delta['reasoning_content']}" # 处理 Doubao/Volcano specific formats # Type: response.reasoning_summary_text.delta if chunk.get('type') == 'response.reasoning_summary_text.delta': if 'delta' in chunk: yield chunk['delta'] # Type: response.text.delta if chunk.get('type') == 'response.text.delta': if 'delta' in chunk: if not json_started: yield "|||JSON_START|||" json_started = True yield chunk['delta'] # Type: response.output_item.added (May contain initial content or status) # Type: response.reasoning_summary_part.added except Exception as e: print(f"Chunk parse error: {e}") else: # 尝试直接解析非 data: 开头的行 try: chunk = json.loads(line_str) if 'choices' in chunk and len(chunk['choices']) > 0: content = chunk['choices'][0]['message']['content'] yield content except: pass except Exception as e: yield f"\n[Error: {str(e)}]" return Response(stream_with_context(generate()), mimetype='text/plain') @app.route('/manager/api/start_analysis/', methods=['POST']) def start_analysis(record_id): if 'user_id' not in session: return jsonify({"success": False, "message": "Unauthorized"}), 401 conn = get_db_connection() try: with conn.cursor() as cursor: # Check if record exists cursor.execute("SELECT oss_url, ai_status FROM genealogy_records WHERE id = %s", (record_id,)) record = cursor.fetchone() if not record: return jsonify({"success": False, "message": "Record not found"}), 404 # Update status to processing (1) cursor.execute("UPDATE genealogy_records SET ai_status = 1 WHERE id = %s", (record_id,)) conn.commit() # Start background task threading.Thread(target=process_ai_task, args=(record_id, record['oss_url'])).start() return jsonify({"success": True, "message": "Analysis started"}) except Exception as e: return jsonify({"success": False, "message": str(e)}), 500 finally: conn.close() def process_files_background(upload_folder, saved_files, manual_page, suggested_page, genealogy_version, genealogy_source, upload_person): current_suggested_page = int(manual_page) if manual_page and str(manual_page).isdigit() else suggested_page ensure_pdf_table() for item in saved_files: if len(item) >= 4: filename, file_path, file_page, original_filename = item[0], item[1], item[2], item[3] elif len(item) == 3: filename, file_path, file_page = item original_filename = filename else: filename, file_path = item[0], item[1] file_page = None original_filename = filename try: if filename.lower().endswith('.pdf'): import uuid display_pdf_name = (original_filename or filename).strip() or filename oss_pdf_name = secure_filename(display_pdf_name) if not oss_pdf_name or not oss_pdf_name.lower().endswith('.pdf'): oss_pdf_name = f"genealogy_pdf_{uuid.uuid4().hex[:8]}.pdf" pdf_oss_url = upload_to_oss(file_path, custom_filename=oss_pdf_name) if pdf_oss_url: desc_parts = [] if genealogy_version: desc_parts.append(genealogy_version) if genealogy_source: desc_parts.append(genealogy_source) pdf_description = ' · '.join(desc_parts) if desc_parts else '' conn_pdf = get_db_connection() try: with conn_pdf.cursor() as cursor: cursor.execute( "INSERT INTO genealogy_pdfs (file_name, oss_url, description, uploader) VALUES (%s, %s, %s, %s)", (display_pdf_name, pdf_oss_url, pdf_description, upload_person or '') ) conn_pdf.commit() except Exception as pdf_meta_e: print(f"Error inserting genealogy_pdfs for {display_pdf_name}: {pdf_meta_e}") finally: conn_pdf.close() else: print(f"Warning: full PDF upload to OSS failed for {filename}, scan pages will still be processed.") doc = fitz.open(file_path) for page_index in range(len(doc)): img_path = None try: page = doc.load_page(page_index) max_dim = max(page.rect.width, page.rect.height) zoom = 2000 / max_dim if max_dim > 0 else 2.0 if zoom > 2.5: zoom = 2.5 mat = fitz.Matrix(zoom, zoom) # Use get_pixmap with matrix directly pix = page.get_pixmap(matrix=mat) final_page = current_suggested_page if genealogy_version and genealogy_source: if final_page is not None and str(final_page).strip() != '': img_filename = f"{genealogy_version}_{genealogy_source}_{final_page}.jpg" else: img_filename = f"{genealogy_version}_{genealogy_source}.jpg" else: img_filename = f"{os.path.splitext(filename)[0]}_page_{page_index+1}.jpg" img_path = os.path.join(upload_folder, img_filename) # Save the pixmap to the image path pix.save(img_path) oss_url = upload_to_oss(img_path, custom_filename=img_filename) if oss_url: conn = get_db_connection() try: with conn.cursor() as cursor: sql = """INSERT INTO genealogy_records (file_name, oss_url, page_number, ai_status, genealogy_version, genealogy_source, upload_person, file_type) VALUES (%s, %s, %s, 1, %s, %s, %s, %s)""" cursor.execute(sql, (img_filename, oss_url, final_page, genealogy_version, genealogy_source, upload_person, 'PDF')) record_id = cursor.lastrowid conn.commit() threading.Thread(target=process_ai_task, args=(record_id, oss_url)).start() current_suggested_page += 1 finally: conn.close() except Exception as page_e: print(f"Error processing page {page_index} of {filename}: {page_e}") finally: if img_path and os.path.exists(img_path): try: os.remove(img_path) except: pass doc.close() else: img_path = compress_image_if_needed(file_path) # Use explicitly set page number if provided, otherwise extract from filename or auto-increment if file_page and str(file_page).isdigit(): final_page = int(file_page) current_suggested_page = final_page + 1 page_num = final_page else: page_num = extract_page_number(img_path) final_page = page_num if page_num else current_suggested_page ext = os.path.splitext(img_path)[1] if genealogy_version and genealogy_source: if final_page is not None and str(final_page).strip() != '': img_filename = f"{genealogy_version}_{genealogy_source}_{final_page}{ext}" else: img_filename = f"{genealogy_version}_{genealogy_source}{ext}" else: img_filename = os.path.basename(img_path) oss_url = upload_to_oss(img_path, custom_filename=img_filename) if oss_url: conn = get_db_connection() try: with conn.cursor() as cursor: sql = """INSERT INTO genealogy_records (file_name, oss_url, page_number, ai_status, genealogy_version, genealogy_source, upload_person, file_type) VALUES (%s, %s, %s, 1, %s, %s, %s, %s)""" cursor.execute(sql, (img_filename, oss_url, final_page, genealogy_version, genealogy_source, upload_person, '图片')) record_id = cursor.lastrowid conn.commit() threading.Thread(target=process_ai_task, args=(record_id, oss_url)).start() if page_num: current_suggested_page = page_num + 1 else: current_suggested_page += 1 finally: conn.close() if img_path and img_path != file_path and os.path.exists(img_path): try: os.remove(img_path) except: pass except Exception as e: print(f"Error processing file {filename}: {e}") finally: if os.path.exists(file_path): try: os.remove(file_path) except: pass @app.route('/manager/upload', methods=['GET', 'POST']) def upload(): if 'user_id' not in session: return redirect(url_for('login')) # 获取建议页码 (当前最大页码 + 1) conn = get_db_connection() suggested_page = 1 try: with conn.cursor() as cursor: cursor.execute("SELECT MAX(page_number) as max_p FROM genealogy_records") result = cursor.fetchone() if result and result['max_p']: suggested_page = result['max_p'] + 1 finally: conn.close() if request.method == 'POST': if 'file' not in request.files: flash('未选择文件') return redirect(request.url) files = request.files.getlist('file') if not files or files[0].filename == '': flash('未选择文件') return redirect(request.url) manual_page = request.form.get('manual_page') genealogy_version = request.form.get('genealogy_version', '') genealogy_source = request.form.get('genealogy_source', '') upload_person = request.form.get('upload_person', '') if not upload_person: upload_person = session.get('username', '') import uuid saved_files = [] for i, file in enumerate(files): if not file or not file.filename: continue original_filename = file.filename ext = os.path.splitext(original_filename)[1].lower() base_name = secure_filename(original_filename) # If secure_filename removes all characters (e.g., pure Chinese name) or just leaves 'pdf' if not base_name or base_name == ext.strip('.'): filename = f"upload_{uuid.uuid4().hex[:8]}{ext}" else: # Ensure the extension is preserved if not base_name.lower().endswith(ext): filename = f"{base_name}{ext}" else: filename = base_name file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename) file.save(file_path) # Fetch individual page number if it exists file_page = request.form.get(f'page_number_{i}') saved_files.append((filename, file_path, file_page, original_filename)) if saved_files: threading.Thread( target=process_files_background, args=(app.config['UPLOAD_FOLDER'], saved_files, manual_page, suggested_page, genealogy_version, genealogy_source, upload_person) ).start() flash('上传完成,AI解析中,稍后查看') time.sleep(1.5) return redirect(url_for('index')) return render_template('upload.html', suggested_page=suggested_page) @app.route('/manager/save_upload', methods=['POST']) def save_upload(): if 'user_id' not in session: return redirect(url_for('login')) filename = request.form.get('filename') oss_url = request.form.get('oss_url') page_number = request.form.get('page_number') genealogy_version = request.form.get('genealogy_version', '') genealogy_source = request.form.get('genealogy_source', '') upload_person = request.form.get('upload_person', session.get('username', '')) file_type = request.form.get('file_type', '图片') if not oss_url or not page_number: flash('页码不能为空') return redirect(url_for('upload')) conn = get_db_connection() try: with conn.cursor() as cursor: sql = """INSERT INTO genealogy_records (file_name, oss_url, page_number, ai_status, genealogy_version, genealogy_source, upload_person, file_type) VALUES (%s, %s, %s, 1, %s, %s, %s, %s)""" cursor.execute(sql, (filename, oss_url, page_number, genealogy_version, genealogy_source, upload_person, file_type)) record_id = cursor.lastrowid conn.commit() # Start AI Task threading.Thread(target=process_ai_task, args=(record_id, oss_url)).start() flash('上传完成,AI解析中,稍后查看') except Exception as e: flash(f'保存失败: {e}') finally: conn.close() return redirect(url_for('index')) @app.route('/manager/delete_upload/', methods=['POST']) def delete_upload(record_id): if 'user_id' not in session: return jsonify({"success": False, "message": "Unauthorized"}), 401 conn = get_db_connection() try: with conn.cursor() as cursor: # 删除记录 cursor.execute("DELETE FROM genealogy_records WHERE id = %s", (record_id,)) conn.commit() flash('文件记录已成功删除') return redirect(url_for('index')) except Exception as e: conn.rollback() flash(f'删除失败: {e}') return redirect(url_for('index')) finally: conn.close() @app.route('/manager/upload_pdf', methods=['GET', 'POST']) def upload_pdf(): if 'user_id' not in session: return redirect(url_for('login')) if request.method == 'GET': return render_template('upload_pdf.html') # POST请求处理 if 'file' not in request.files: flash('请选择要上传的PDF文件') return redirect(request.url) file = request.files['file'] if file.filename == '': flash('请选择要上传的PDF文件') return redirect(request.url) # 检查文件类型 if not file.filename.lower().endswith('.pdf'): flash('只支持PDF文件上传') return redirect(request.url) # 获取表单数据 version_name = request.form.get('version_name', '').strip() version_source = request.form.get('version_source', '').strip() file_provider = request.form.get('file_provider', '').strip() # 验证必填字段 if not version_name: flash('版本名称为必填项') return redirect(request.url) if not version_source: flash('版本来源为必填项') return redirect(request.url) # 如果未提供文件提供人,使用当前登录用户 if not file_provider: file_provider = session.get('user_id', '未知') import uuid original_filename = file.filename ext = os.path.splitext(original_filename)[1].lower() base_name = secure_filename(original_filename) if not base_name or base_name == ext.strip('.'): filename = f"genealogy_pdf_{uuid.uuid4().hex[:8]}{ext}" else: if not base_name.lower().endswith(ext): filename = f"{base_name}{ext}" else: filename = base_name file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename) file.save(file_path) try: # Upload to OSS oss_url = upload_to_oss(file_path, custom_filename=filename) if not oss_url: flash('文件上传失败') return redirect(request.url) # Save to database conn = get_db_connection() try: with conn.cursor() as cursor: cursor.execute( "INSERT INTO genealogy_pdfs (file_name, oss_url, version_name, version_source, file_provider, upload_time) VALUES (%s, %s, %s, %s, %s, CURRENT_TIMESTAMP)", (original_filename, oss_url, version_name, version_source, file_provider) ) conn.commit() flash('PDF文件上传成功') return redirect(url_for('pdf_management')) except Exception as e: flash(f'保存失败: {e}') return redirect(request.url) finally: conn.close() finally: if os.path.exists(file_path): try: os.remove(file_path) except: pass def process_pdf_pages(file_path, pdf_oss_url, uploader): """Process PDF pages and add them to genealogy records""" try: import fitz doc = fitz.open(file_path) # Get current max page number conn = get_db_connection() suggested_page = 1 try: with conn.cursor() as cursor: cursor.execute("SELECT MAX(page_number) as max_p FROM genealogy_records") result = cursor.fetchone() if result and result['max_p']: suggested_page = result['max_p'] + 1 finally: conn.close() for page_index in range(len(doc)): try: page = doc[page_index] pix = page.get_pixmap(dpi=150) # Save as image img_filename = f"{os.path.splitext(os.path.basename(file_path))[0]}_page_{page_index+1}.jpg" img_path = os.path.join(app.config['UPLOAD_FOLDER'], img_filename) pix.save(img_path) # Upload to OSS img_oss_url = upload_to_oss(img_path, custom_filename=img_filename) if img_oss_url: # Save to genealogy_records conn = get_db_connection() try: with conn.cursor() as cursor: cursor.execute( "INSERT INTO genealogy_records (file_name, oss_url, page_number, ai_status, upload_person, file_type) VALUES (%s, %s, %s, 1, %s, %s)", (img_filename, img_oss_url, suggested_page + page_index, uploader, '图片') ) record_id = cursor.lastrowid conn.commit() # Start AI processing threading.Thread(target=process_ai_task, args=(record_id, img_oss_url)).start() finally: conn.close() except Exception as e: print(f"Error processing page {page_index+1}: {e}") finally: if 'img_path' in locals() and os.path.exists(img_path): try: os.remove(img_path) except: pass except Exception as e: print(f"Error processing PDF: {e}") # --- Settlement Routes --- @app.route('/manager/settlements') def settlements(): if 'user_id' not in session: return redirect(url_for('login')) return render_template('settlements.html') @app.route('/manager/api/settlements', methods=['GET']) def get_settlements(): if 'user_id' not in session: return jsonify({"success": False, "message": "Unauthorized"}), 401 conn = get_db_connection() try: with conn.cursor() as cursor: cursor.execute(""" SELECT s.*, m.name as representative_name, m.simplified_name as representative_simplified_name FROM family_settlements s LEFT JOIN family_member_info m ON s.representative_id = m.id ORDER BY s.created_at DESC """) settlements = cursor.fetchall() # Convert Decimal to float/int for JSON serialization result = [] for s in settlements: item = dict(s) if item.get('latitude'): item['latitude'] = float(item['latitude']) if item.get('longitude'): item['longitude'] = float(item['longitude']) if item.get('population'): item['population'] = int(item['population']) result.append(item) return jsonify({"success": True, "settlements": result}) finally: conn.close() @app.route('/manager/api/settlements/', methods=['GET']) def get_settlement(id): if 'user_id' not in session: return jsonify({"success": False, "message": "Unauthorized"}), 401 conn = get_db_connection() try: with conn.cursor() as cursor: cursor.execute(""" SELECT s.*, m.name as representative_name, m.simplified_name as representative_simplified_name FROM family_settlements s LEFT JOIN family_member_info m ON s.representative_id = m.id WHERE s.id = %s """, (id,)) settlement = cursor.fetchone() if settlement: # Convert Decimal to float/int for JSON serialization item = dict(settlement) if item.get('latitude'): item['latitude'] = float(item['latitude']) if item.get('longitude'): item['longitude'] = float(item['longitude']) if item.get('population'): item['population'] = int(item['population']) return jsonify({"success": True, "settlement": item}) else: return jsonify({"success": False, "message": "聚落不存在"}) finally: conn.close() @app.route('/manager/api/settlements', methods=['POST']) def add_settlement(): if 'user_id' not in session: return jsonify({"success": False, "message": "Unauthorized"}), 401 if not session.get('is_super_admin'): return jsonify({"success": False, "message": "权限不足"}), 403 data = request.get_json() conn = get_db_connection() try: with conn.cursor() as cursor: cursor.execute(""" INSERT INTO family_settlements (name, region, latitude, longitude, population, representative_id, description, surname_type, new_surname, enthusiastic_members) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """, ( data.get('name'), data.get('region'), data.get('latitude') or None, data.get('longitude') or None, data.get('population') or 0, data.get('representative_id') or None, data.get('description'), data.get('surname_type') or 0, data.get('new_surname') or None, data.get('enthusiastic_members') or None )) conn.commit() return jsonify({"success": True, "message": "添加成功"}) finally: conn.close() @app.route('/manager/api/settlements/', methods=['PUT']) def update_settlement(id): if 'user_id' not in session: return jsonify({"success": False, "message": "Unauthorized"}), 401 if not session.get('is_super_admin'): return jsonify({"success": False, "message": "权限不足"}), 403 data = request.get_json() conn = get_db_connection() try: with conn.cursor() as cursor: cursor.execute(""" UPDATE family_settlements SET name=%s, region=%s, latitude=%s, longitude=%s, population=%s, representative_id=%s, description=%s, surname_type=%s, new_surname=%s, enthusiastic_members=%s WHERE id=%s """, ( data.get('name'), data.get('region'), data.get('latitude') or None, data.get('longitude') or None, data.get('population') or 0, data.get('representative_id') or None, data.get('description'), data.get('surname_type') or 0, data.get('new_surname') or None, data.get('enthusiastic_members') or None, id )) conn.commit() return jsonify({"success": True, "message": "更新成功"}) finally: conn.close() @app.route('/manager/api/settlements/', methods=['DELETE']) def delete_settlement(id): if 'user_id' not in session: return jsonify({"success": False, "message": "Unauthorized"}), 401 if not session.get('is_super_admin'): return jsonify({"success": False, "message": "权限不足"}), 403 conn = get_db_connection() try: with conn.cursor() as cursor: cursor.execute("DELETE FROM family_settlements WHERE id=%s", (id,)) conn.commit() return jsonify({"success": True, "message": "删除成功"}) finally: conn.close() # 异步批量处理族谱原文功能 import uuid def init_batch_task_table(): """初始化批量任务表(如果不存在)""" conn = get_db_connection() try: with conn.cursor() as cursor: cursor.execute(""" CREATE TABLE IF NOT EXISTS batch_genealogy_task ( id INT AUTO_INCREMENT PRIMARY KEY, task_id VARCHAR(64) UNIQUE NOT NULL, user_id INT NOT NULL, status VARCHAR(20) DEFAULT 'pending', total_count INT DEFAULT 0, completed_count INT DEFAULT 0, failed_count INT DEFAULT 0, last_processed_id INT DEFAULT 0, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, results TEXT ); """) # 检查是否存在last_processed_id字段,如果不存在则添加 cursor.execute("SHOW COLUMNS FROM batch_genealogy_task LIKE 'last_processed_id'") if not cursor.fetchone(): cursor.execute("ALTER TABLE batch_genealogy_task ADD COLUMN last_processed_id INT DEFAULT 0") conn.commit() print("[Database] batch_genealogy_task table initialized") except Exception as e: print(f"[Database] Error creating batch_genealogy_task table: {e}") finally: conn.close() # 初始化表 init_batch_task_table() def migrate_child_order_column(): """为 family_relation_info 表添加 child_order 字段(如不存在)""" conn = get_db_connection() try: with conn.cursor() as cursor: cursor.execute("SHOW COLUMNS FROM family_relation_info LIKE 'child_order'") if not cursor.fetchone(): cursor.execute( "ALTER TABLE family_relation_info ADD COLUMN child_order INT DEFAULT NULL COMMENT '第几子,用于兄弟排序'" ) conn.commit() print("[DB Migrate] Added child_order column to family_relation_info") else: print("[DB Migrate] child_order column already exists") except Exception as e: print(f"[DB Migrate] Error adding child_order: {e}") finally: conn.close() migrate_child_order_column() def migrate_enthusiastic_members_column(): """为 family_settlements 表添加 enthusiastic_members 字段(如不存在)""" conn = get_db_connection() try: with conn.cursor() as cursor: cursor.execute("SHOW COLUMNS FROM family_settlements LIKE 'enthusiastic_members'") if not cursor.fetchone(): cursor.execute( "ALTER TABLE family_settlements ADD COLUMN enthusiastic_members TEXT DEFAULT NULL COMMENT '热心宗亲,多人以逗号分隔'" ) conn.commit() print("[DB Migrate] Added enthusiastic_members column to family_settlements") else: print("[DB Migrate] enthusiastic_members column already exists") except Exception as e: print(f"[DB Migrate] Error adding enthusiastic_members: {e}") finally: conn.close() migrate_enthusiastic_members_column() def async_process_genealogy_task(task_id, member_ids, user_id): """异步处理族谱原文任务""" results = [] conn = get_db_connection() try: # 更新任务状态为处理中 with conn.cursor() as cursor: cursor.execute(""" UPDATE batch_genealogy_task SET status = 'processing', total_count = %s WHERE task_id = %s """, (len(member_ids), task_id)) conn.commit() completed_count = 0 failed_count = 0 for member_id in member_ids: try: with conn.cursor() as cursor: cursor.execute(""" SELECT id, name, simplified_name, name_word_generation, birth_place, occupation, notes, sex FROM family_member_info WHERE id = %s """, (member_id,)) member = cursor.fetchone() # 获取父亲信息 cursor.execute(""" SELECT p.name, p.simplified_name FROM family_relation_info r JOIN family_member_info p ON r.parent_mid = p.id WHERE r.child_mid = %s AND r.relation_type = 1 LIMIT 1 """, (member_id,)) father = cursor.fetchone() # 获取母亲信息 cursor.execute(""" SELECT p.name, p.simplified_name FROM family_relation_info r JOIN family_member_info p ON r.parent_mid = p.id WHERE r.child_mid = %s AND r.relation_type = 2 LIMIT 1 """, (member_id,)) mother = cursor.fetchone() member['father_name'] = father['name'] if father else None member['father_simplified_name'] = father['simplified_name'] if father else None member['mother_name'] = mother['name'] if mother else None member['mother_simplified_name'] = mother['simplified_name'] if mother else None except Exception as e: print(f"[Async Process] Error getting member {member_id}: {e}") results.append({ "member_id": member_id, "name": "未知", "success": False, "message": f"获取成员信息失败: {e}" }) failed_count += 1 continue if not member: results.append({ "member_id": member_id, "name": "未知", "success": False, "message": "成员不存在" }) failed_count += 1 continue # 构建AI提示词 member_info = f""" 姓名(繁体):{member['name']} 姓名(简体):{member['simplified_name'] or '未知'} 世系世代:{member['name_word_generation'] or '未知'} 父亲姓名:{member['father_name'] or '未知'} 母亲姓名:{member['mother_name'] or '未知'} 出生地:{member['birth_place'] or '未知'} 职业:{member['occupation'] or '未知'} 备注:{member['notes'] or '无'} """ prompt = f""" 请根据以下人员信息,模拟生成该人员的族谱原文: {member_info} 请输出两个字段: 1. genealogy_traditional: 族谱原文(繁体中文,模仿传统族谱格式) 2. genealogy_simplified: 族谱原文(简体中文,将繁体转换为简体) 请严格按照JSON格式输出,不要包含任何额外解释: {{ "genealogy_traditional": "繁体族谱原文内容", "genealogy_simplified": "简体族谱原文内容" }} """ ai_response = call_doubao_api(prompt) if ai_response: traditional, simplified = parse_ai_response(ai_response) if traditional or simplified: try: with conn.cursor() as cursor: cursor.execute(""" UPDATE family_member_info SET genealogy_original_traditional = %s, genealogy_original_simplified = %s WHERE id = %s """, (traditional, simplified, member_id)) conn.commit() results.append({ "member_id": member_id, "name": member['name'], "success": True, "traditional": traditional[:100] + "..." if len(traditional) > 100 else traditional, "simplified": simplified[:100] + "..." if len(simplified) > 100 else simplified }) completed_count += 1 except Exception as e: print(f"[Async Process] Error updating member {member_id}: {e}") results.append({ "member_id": member_id, "name": member['name'], "success": False, "message": f"保存失败: {e}" }) failed_count += 1 else: results.append({ "member_id": member_id, "name": member['name'], "success": False, "message": "AI未返回有效数据" }) failed_count += 1 else: results.append({ "member_id": member_id, "name": member['name'], "success": False, "message": "AI调用失败" }) failed_count += 1 # 更新任务状态 status = 'completed' if failed_count == 0 else 'completed_with_errors' with conn.cursor() as cursor: cursor.execute(""" UPDATE batch_genealogy_task SET status = %s, completed_count = %s, failed_count = %s, results = %s WHERE task_id = %s """, (status, completed_count, failed_count, json.dumps(results, ensure_ascii=False), task_id)) conn.commit() print(f"[Async Process] Task {task_id} completed: {completed_count} success, {failed_count} failed") except Exception as e: print(f"[Async Process] Error in task {task_id}: {e}") with conn.cursor() as cursor: cursor.execute(""" UPDATE batch_genealogy_task SET status = 'failed', results = %s WHERE task_id = %s """, (json.dumps({"error": str(e)}, ensure_ascii=False), task_id)) conn.commit() finally: conn.close() @app.route('/manager/api/members/batch_process_genealogy_async', methods=['POST']) def batch_process_genealogy_async(): """异步批量处理族谱原文""" if 'user_id' not in session: return jsonify({"success": False, "message": "Unauthorized"}), 401 data = request.get_json() member_ids = data.get('member_ids', []) if not member_ids: return jsonify({"success": False, "message": "请选择成员进行处理"}), 400 # 生成任务ID task_id = str(uuid.uuid4()) # 保存任务到数据库 conn = get_db_connection() try: with conn.cursor() as cursor: cursor.execute(""" INSERT INTO batch_genealogy_task (task_id, user_id, status, total_count) VALUES (%s, %s, 'pending', %s) """, (task_id, session['user_id'], len(member_ids))) conn.commit() finally: conn.close() # 启动异步线程处理 threading.Thread(target=async_process_genealogy_task, args=(task_id, member_ids, session['user_id'])).start() return jsonify({ "success": True, "task_id": task_id, "message": "任务已创建,正在后台处理中" }) @app.route('/manager/api/members/batch_task_status/', methods=['GET']) def get_batch_task_status(task_id): """获取批量任务状态""" if 'user_id' not in session: return jsonify({"success": False, "message": "Unauthorized"}), 401 conn = get_db_connection() try: with conn.cursor() as cursor: cursor.execute(""" SELECT task_id, status, total_count, completed_count, failed_count, created_at, updated_at, results FROM batch_genealogy_task WHERE task_id = %s AND user_id = %s """, (task_id, session['user_id'])) task = cursor.fetchone() if task: result = { "task_id": task['task_id'], "status": task['status'], "total_count": task['total_count'], "completed_count": task['completed_count'], "failed_count": task['failed_count'], "created_at": task['created_at'].isoformat() if task['created_at'] else None, "updated_at": task['updated_at'].isoformat() if task['updated_at'] else None } if task['results']: try: result['results'] = json.loads(task['results']) except: result['results'] = task['results'] return jsonify({"success": True, "task": result}) else: return jsonify({"success": False, "message": "任务不存在或无权访问"}), 404 finally: conn.close() @app.route('/manager/api/members/batch_tasks', methods=['GET']) def get_batch_tasks(): """获取用户的批量任务列表""" if 'user_id' not in session: return jsonify({"success": False, "message": "Unauthorized"}), 401 conn = get_db_connection() try: with conn.cursor() as cursor: cursor.execute(""" SELECT task_id, status, total_count, completed_count, failed_count, last_processed_id, created_at, updated_at FROM batch_genealogy_task WHERE user_id = %s ORDER BY created_at DESC LIMIT 20 """, (session['user_id'],)) tasks = cursor.fetchall() result = [] for task in tasks: result.append({ "task_id": task['task_id'], "status": task['status'], "total_count": task['total_count'], "completed_count": task['completed_count'], "failed_count": task['failed_count'], "last_processed_id": task['last_processed_id'], "created_at": task['created_at'].isoformat() if task['created_at'] else None, "updated_at": task['updated_at'].isoformat() if task['updated_at'] else None }) return jsonify({"success": True, "tasks": result}) finally: conn.close() def call_doubao_image_api(image_url, prompt): """调用豆包API处理图片,提取文本内容""" api_key = "a1800657-9212-4afe-9b7c-b49f015c54d3" api_url = "https://ark.cn-beijing.volces.com/api/v3/responses" ai_payload_url = get_normalized_base64_image(image_url) payload = { "model": "doubao-seed-1-8-251228", "stream": False, "input": [ { "role": "user", "content": [ {"type": "input_image", "image_url": ai_payload_url}, {"type": "input_text", "text": prompt} ] } ] } headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" } try: response = requests.post( api_url, json=payload, headers=headers, timeout=120, verify=False, proxies={"http": None, "https": None} ) if response.status_code == 200: return response.json() else: print(f"[Image AI API] Error: {response.status_code} - {response.text}") return None except Exception as e: print(f"[Image AI API] Exception: {e}") return None def extract_pure_text(response): """从API响应中提取纯文本内容,优先返回 message 类型的最终答案""" if not response: return '' # 优先从 output 列表中提取 message 类型(最终答案) if 'output' in response: # 第一遍:只找 message 类型 for item in response['output']: if item.get('type') == 'message': content = item.get('content') if isinstance(content, str): return content elif isinstance(content, list): text_parts = [] for part in content: if isinstance(part, dict) and part.get('type') == 'text': text_parts.append(part.get('text', '')) elif isinstance(part, str): text_parts.append(part) result = ''.join(text_parts) if result: return result # 第二遍:没有 message 时才使用 reasoning 内容作为兜底 for item in response['output']: if item.get('type') == 'reasoning': content = item.get('content') all_text = '' summary = item.get('summary', []) for part in summary: if isinstance(part, dict): if part.get('type') in ('summary_text', 'text'): all_text += part.get('text', '') elif isinstance(part, str): all_text += part if isinstance(content, str): all_text += content elif isinstance(content, list): for part in content: if isinstance(part, dict) and part.get('type') == 'text': all_text += part.get('text', '') elif isinstance(part, str): all_text += part if all_text: return all_text # 第三遍:content 直接是字符串的情况 for item in response['output']: content = item.get('content') if isinstance(content, str) and content: return content # 尝试从 choices 中提取(兼容 OpenAI 格式) if 'choices' in response and len(response['choices']) > 0: message = response['choices'][0].get('message', {}) return message.get('content', '') return str(response) def build_genealogy_prompt(member_name): """ 构建用于竖排繁体家谱图片 OCR 提取的 Prompt。 家谱图片为竖排版式(从上到下、从右到左),每位人物记录通常包含: 辈字+名讳、字号、行次、父子关系、配偶(配某氏)、生卒年、葬地、子嗣等。 """ return f"""这是一张竖排繁体中文家谱图片。图片文字采用竖排格式,从上到下、从右到左逐列阅读。 每位人物的记录通常包含以下内容(不一定全有): - 辈字加名讳(如:公諱光元) - 字号(如:字維亮) - 行次(如:行仁一) - 与父亲的关系(如:某某公長子、次子、三子) - 配偶(如:配李氏、娶王氏) - 生卒年月(如:生於某年某月、卒於某年某月) - 葬地(如:葬祖山某向、塟於某地) - 子嗣(如:子二:長某某、次某某) 任务:找到人物「{member_name}」在图片中的完整记录,将其繁体原文逐字准确复制输出。 要求: 1. 只输出「{member_name}」这一个人物的记录,不包含其他人的内容 2. 保持繁体字原貌,不要转换为简体 3. 保留原文中的标点符号 4. 不要添加任何解释、标注、序号或额外说明 5. 直接输出原文内容""" def _extract_from_thinking_output(text): """ 从推理模型的思维链输出中提取最终答案。 推理模型(如 doubao-seed 系列)会在 message 内容里写出完整思考过程: 反复写候选答案、说"不对"再修正,最后以"现在确认/所以输出这个内容"等结论收尾。 本函数的策略: 1. 找最后一个"答案引导词 + 冒号"之后的文本(如"准确的原文是:"、"准确复制:") 2. 若无引导词,则取"现在确认"/"所以输出这个内容"之前的最后一段文本 3. 以上均失败则原文返回 """ # 思维链特征词 THINKING_SIGNALS = ['不对,', '现在确认', '准确复制', '准确的原文是', '正确的输出是', '所以输出这个内容'] if not any(sig in text for sig in THINKING_SIGNALS): return text # 非思维链输出,原样返回 print(f"[CleanText] Detected thinking-model output, extracting final answer") # ---- 策略1:找最后一个答案引导词 ---- ANSWER_INTRO_PATTERNS = [ r'准确的原文是[::]\s*', r'正确的输出是[::]\s*', r'现在准确复制[::]\s*', r'准确复制[::]\s*', r'应该是[::]\s*', r'因此输出[::]\s*', r'所以正确.*?是[::]\s*', r'原文是[::]\s*', r'输出[::]\s*', ] last_end = -1 for pattern in ANSWER_INTRO_PATTERNS: for m in re.finditer(pattern, text): if m.end() > last_end: last_end = m.end() if last_end >= 0: remaining = text[last_end:] # 取到第一个"结束标志"前 END_MARKERS = ['不对', '现在确认', '但是', '然而', '\n\n'] end_pos = len(remaining) for marker in END_MARKERS: idx = remaining.find(marker) if 0 < idx < end_pos: end_pos = idx candidate = remaining[:end_pos].strip() if len(candidate) >= 5: print(f"[CleanText] Extracted via answer-intro pattern: '{candidate[:80]}'") return candidate # ---- 策略2:取"现在确认"之前的最后一段 ---- for end_phrase in ['现在确认', '所以输出这个内容', '这就是.*?的完整记录']: m = re.search(end_phrase, text) if m: before = text[:m.start()].rstrip() # 找最后一个换行符,取之后的内容 last_nl = before.rfind('\n') candidate = (before[last_nl + 1:] if last_nl >= 0 else before[-400:]).strip() if len(candidate) >= 5: print(f"[CleanText] Extracted before confirmation phrase: '{candidate[:80]}'") return candidate return text # 均失败则原样返回 def _apply_char_whitelist(text): """只保留汉字(含扩展A区)和常见中文标点""" return re.sub( r'[^\u4e00-\u9fff\u3400-\u4dbf\u3000-\u303f\uff00-\uffef,。;:、()【】「」『』〔〕·~—…《》]', '', text ).strip() def clean_genealogy_text(text): """ 清理从 AI 响应中提取的族谱文本。 - 处理 Markdown/JSON 格式噪声 - 自动识别思维链推理模型输出,提取最终答案段落 - 保留中文字符和中文标点,去除英文说明行 """ if not text: return '' text = text.strip() # 去除代码块标记 text = re.sub(r'^```[a-z]*\n?', '', text) text = re.sub(r'\n?```$', '', text) text = text.strip() # 尝试解析 JSON,从已知字段提取 try: result = json.loads(text) if isinstance(result, dict): for key in ['text', 'content', 'result', 'traditional', 'genealogy_traditional']: if key in result: text = str(result[key]) break except (json.JSONDecodeError, ValueError): pass # 针对思维链推理模型输出,提取最终答案(必须在行过滤之前,因为推理文本中含有必要的换行结构) text = _extract_from_thinking_output(text) # 按行过滤:去除纯英文/数字行、空行及明显解释性前缀行 lines = text.splitlines() kept_lines = [] for line in lines: line = line.strip() if not line: continue non_ascii = sum(1 for c in line if ord(c) > 127) if non_ascii == 0: continue if re.match(r'^(注[::]|说明[::]|Note[::]|备注[::])', line): continue kept_lines.append(line) text = ''.join(kept_lines) # 字符白名单:只保留汉字和中文标点 text = _apply_char_whitelist(text) return text def async_process_all_empty_genealogy(task_id, user_id): """ 异步批量处理族谱原文为空的成员,支持断点续跑。 连接管理原则:DB 连接仅在快速读写期间持有,AI 调用(最长120s)期间 不占用任何 DB 连接,避免影响其他用户的正常操作。 """ import time # ── 1. 读取断点位置,立即释放连接 ────────────────────────────────────── conn = get_db_connection() try: with conn.cursor() as cursor: cursor.execute( "SELECT last_processed_id FROM batch_genealogy_task WHERE task_id = %s", (task_id,) ) task = cursor.fetchone() last_processed_id = task['last_processed_id'] if task else 0 finally: conn.close() completed_count = 0 failed_count = 0 results = [] while True: # ── 2. 取下一条待处理成员(短暂占用连接后立即释放)──────────────── conn = get_db_connection() try: with conn.cursor() as cursor: cursor.execute(""" SELECT m.id, m.name, m.name_word_generation, m.source_record_id, r.oss_url AS image_url, r.ai_content AS record_ai_content FROM family_member_info m LEFT JOIN genealogy_records r ON m.source_record_id = r.id WHERE (m.genealogy_original_traditional IS NULL OR m.genealogy_original_traditional = '' OR m.genealogy_original_traditional = 'None') AND (m.genealogy_original_simplified IS NULL OR m.genealogy_original_simplified = '' OR m.genealogy_original_simplified = 'None') AND m.id > %s ORDER BY m.id ASC LIMIT 1 """, (last_processed_id,)) member = cursor.fetchone() finally: conn.close() if not member: break member_id = member['id'] member_name = member['name'] image_url = member['image_url'] record_ai_content = member['record_ai_content'] print(f"[Batch Process] Processing member {member_id}: {member_name}") traditional = "" simplified = "" extract_source = "basic_info" try: # ── 3. AI 提取(此阶段不持有任何 DB 连接)──────────────────── if image_url: print(f"[Batch Process] Extracting from image: {image_url}") prompt = build_genealogy_prompt(member_name) ai_response = call_doubao_image_api(image_url, prompt) print(f"[Batch Process] AI response for {member_id}: {str(ai_response)[:300]}") if ai_response: raw_text = extract_pure_text(ai_response) traditional = clean_genealogy_text(raw_text) print(f"[Batch Process] Cleaned traditional: {traditional[:100]}") name_chars = [c for c in member_name if '\u4e00' <= c <= '\u9fff'] name_found = any(c in traditional for c in name_chars) if traditional and len(traditional) >= 5 and name_found: simplified = convert_to_simplified(traditional) extract_source = "image" print(f"[Batch Process] Image extract OK - trad: {traditional[:80]}") else: traditional = "" simplified = "" print(f"[Batch Process] Image extract invalid " f"(name_found={name_found}, len={len(traditional)}), resetting") # ── 4. 回退:从 record AI content 拼装(内存操作,无需 DB)── if not (traditional and simplified) and record_ai_content: print(f"[Batch Process] Fallback: trying record AI content") try: ai_content = json.loads(record_ai_content) if isinstance(ai_content, list): current_person = None for person in ai_content: person_name = person.get('original_name', person.get('name', '')).strip() if person_name and ( member_name in person_name or person_name in member_name ): current_person = person break if current_person: name = current_person.get('original_name', current_person.get('name', member_name)) father_name = current_person.get('father_name', '') spouse_name = current_person.get('spouse_name', '') generation = current_person.get('generation', member['name_word_generation']) traditional = f"{name},{father_name}之子" if father_name else name if spouse_name: traditional += f",配{spouse_name}" if generation: traditional = f"第{generation}世 " + traditional simplified = convert_to_simplified(traditional) extract_source = "ai_content" print(f"[Batch Process] AI content fallback: {traditional[:80]}") else: print(f"[Batch Process] No matching person for '{member_name}' in AI content") except Exception as e: print(f"[Batch Process] Failed to parse record AI content: {e}") # ── 5. 最终回退:从关系表查父亲和配偶,短暂占用连接后立即释放 ── if not (traditional and simplified): print(f"[Batch Process] Fallback: basic info from DB") conn = get_db_connection() try: with conn.cursor() as cursor: cursor.execute(""" SELECT p.name FROM family_relation_info r JOIN family_member_info p ON r.parent_mid = p.id WHERE r.child_mid = %s AND r.relation_type = 1 LIMIT 1 """, (member_id,)) father = cursor.fetchone() cursor.execute(""" SELECT p.name FROM family_relation_info r JOIN family_member_info p ON r.parent_mid = p.id WHERE r.child_mid = %s AND r.relation_type = 2 LIMIT 1 """, (member_id,)) spouse = cursor.fetchone() finally: conn.close() father_name = father['name'] if father else '' spouse_name = spouse['name'] if spouse else '' generation = member['name_word_generation'] traditional = f"{member_name},{father_name}之子" if father_name else member_name if spouse_name: traditional += f",配{spouse_name}" if generation: traditional = f"第{generation}世 " + traditional simplified = convert_to_simplified(traditional) extract_source = "basic_info" print(f"[Batch Process] Basic info fallback: {traditional[:80]}") except Exception as extract_err: print(f"[Batch Process] Extraction error for member {member_id}: {extract_err}") traditional = "" simplified = "" # ── 6. 保存结果(短暂占用连接后立即释放)──────────────────────── last_processed_id = member_id conn = get_db_connection() try: if traditional and simplified: with conn.cursor() as cursor: cursor.execute(""" UPDATE family_member_info SET genealogy_original_traditional = %s, genealogy_original_simplified = %s WHERE id = %s """, (traditional, simplified, member_id)) completed_count += 1 results.append({ "member_id": member_id, "name": member_name, "success": True, "source": extract_source, "traditional_length": len(traditional), "simplified_length": len(simplified), }) print(f"[Batch Process] Saved member {member_id} (source={extract_source})") else: failed_count += 1 results.append({ "member_id": member_id, "name": member_name, "success": False, "message": "无法提取或生成族谱原文", }) print(f"[Batch Process] Skipped member {member_id}: no valid text extracted") with conn.cursor() as cursor: cursor.execute(""" UPDATE batch_genealogy_task SET completed_count = %s, failed_count = %s, last_processed_id = %s, status = 'processing' WHERE task_id = %s """, (completed_count, failed_count, last_processed_id, task_id)) conn.commit() except Exception as db_err: print(f"[Batch Process] DB save error for member {member_id}: {db_err}") failed_count += 1 finally: conn.close() # 每条处理完后短暂暂停,降低对 AI API 和服务器资源的压力 time.sleep(0.5) # ── 7. 任务完成,写入最终状态 ───────────────────────────────────────── conn = get_db_connection() try: status = 'completed' if failed_count == 0 else 'completed_with_errors' with conn.cursor() as cursor: cursor.execute(""" UPDATE batch_genealogy_task SET status = %s, completed_count = %s, failed_count = %s, results = %s WHERE task_id = %s """, (status, completed_count, failed_count, json.dumps(results, ensure_ascii=False), task_id)) conn.commit() print(f"[Batch Process] Task {task_id} done: " f"{completed_count} success, {failed_count} failed") except Exception as e: print(f"[Batch Process] Error writing final status for {task_id}: {e}") finally: conn.close() @app.route('/manager/api/members/extract_genealogy/', methods=['GET']) def extract_single_genealogy(member_id): """单人员提取族谱原文,核心逻辑与批量处理一致,提取后写入数据库""" if 'user_id' not in session: return jsonify({"success": False, "message": "Unauthorized"}), 401 conn = get_db_connection() try: # 查询成员信息 with conn.cursor() as cursor: cursor.execute(""" SELECT m.id, m.name, m.name_word_generation, m.source_record_id, r.oss_url as image_url, r.ai_content AS record_ai_content FROM family_member_info m LEFT JOIN genealogy_records r ON m.source_record_id = r.id WHERE m.id = %s """, (member_id,)) row = cursor.fetchone() if not row: return jsonify({"success": False, "message": "未找到成员"}), 404 # 处理字典或元组格式的返回 if isinstance(row, dict): member = row else: member = { 'id': row[0], 'name': row[1], 'name_word_generation': row[2], 'source_record_id': row[3], 'image_url': row[4], 'record_ai_content': row[5] } # 调试:打印查询结果 print(f"[Single Extract] Query result - id: {member['id']}, name: '{member['name']}', name_word_generation: '{member['name_word_generation']}', source_record_id: {member['source_record_id']}, image_url: '{member['image_url']}', record_ai_content: '{member['record_ai_content'][:50] if member['record_ai_content'] else None}'") traditional = "" simplified = "" source = "basic_info" image_url = member['image_url'] record_ai_content = member['record_ai_content'] print(f"[Single Extract] Processing member {member_id}: {member['name']}") # 优先从关联图片中提取族谱原文 if image_url: print(f"[Single Extract] Extracting from image: {image_url}") member_name = member['name'] prompt = build_genealogy_prompt(member_name) ai_response = call_doubao_image_api(image_url, prompt) print(f"[Single Extract] AI response: {str(ai_response)[:500]}") if ai_response: raw_text = extract_pure_text(ai_response) print(f"[Single Extract] Raw text from response: '{raw_text[:300]}'") traditional = clean_genealogy_text(raw_text) print(f"[Single Extract] Cleaned traditional: '{traditional[:200]}', length: {len(traditional)}") # 验证提取结果是否包含该人物的姓名(至少包含名字中的一个字) name_chars = [c for c in member_name if '\u4e00' <= c <= '\u9fff'] name_found = any(c in traditional for c in name_chars) if traditional and len(traditional) >= 5 and name_found: simplified = convert_to_simplified(traditional) source = "image" print(f"[Single Extract] Extracted from image - traditional: {traditional[:100]}, simplified: {simplified[:100]}") else: traditional = "" simplified = "" if not name_found: print(f"[Single Extract] Extracted text does not contain name '{member_name}', resetting") else: print(f"[Single Extract] Image extraction too short ({len(traditional)} chars), resetting") else: print(f"[Single Extract] AI response is None or empty") else: print(f"[Single Extract] No image URL found for member {member_id}") # 如果从图片提取失败或没有图片,尝试从已有的AI解析内容中提取 if not (traditional and simplified) and record_ai_content: print(f"[Single Extract] Trying to extract from record AI content") try: ai_content = json.loads(record_ai_content) if isinstance(ai_content, list) and len(ai_content) > 0: current_person = None member_name = member['name'] for person in ai_content: person_name = person.get('original_name', person.get('name', '')).strip() if person_name and (member_name in person_name or person_name in member_name): current_person = person break if current_person: name = current_person.get('original_name', current_person.get('name', member['name'])) father_name = current_person.get('father_name', '') spouse_name = current_person.get('spouse_name', '') generation = current_person.get('generation', member['name_word_generation']) traditional = f"{name},{father_name}之子" if spouse_name: traditional += f",配{spouse_name}" if generation: traditional = f"第{generation}世 " + traditional simplified = convert_to_simplified(traditional) source = "ai_content" print(f"[Single Extract] Generated from AI content: {traditional}") except Exception as e: print(f"[Single Extract] Failed to parse record AI content: {e}") # 如果还是没有内容,使用基本信息生成(标注来源为 basic_info) if not (traditional and simplified): print(f"[Single Extract] Generating from basic info") with conn.cursor() as cursor: cursor.execute(""" SELECT p.name, p.simplified_name FROM family_relation_info r JOIN family_member_info p ON r.parent_mid = p.id WHERE r.child_mid = %s AND r.relation_type = 1 LIMIT 1 """, (member_id,)) father_row = cursor.fetchone() father_name = father_row[0] if father_row else '' cursor.execute(""" SELECT p.name, p.simplified_name FROM family_relation_info r JOIN family_member_info p ON r.parent_mid = p.id WHERE r.child_mid = %s AND r.relation_type = 2 LIMIT 1 """, (member_id,)) spouse_row = cursor.fetchone() spouse_name = spouse_row[0] if spouse_row else '' generation = member['name_word_generation'] name = member['name'] traditional = f"{name},{father_name}之子" if father_name else name if spouse_name: traditional += f",配{spouse_name}" if generation: traditional = f"第{generation}世 " + traditional simplified = convert_to_simplified(traditional) source = "basic_info" print(f"[Single Extract] Generated from basic info: {traditional}") # 调试:打印最终结果 print(f"[Single Extract] Final result - traditional: '{traditional}', simplified: '{simplified}'") # 写入数据库 if traditional and simplified: with conn.cursor() as cursor: cursor.execute(""" UPDATE family_member_info SET genealogy_original_traditional = %s, genealogy_original_simplified = %s WHERE id = %s """, (traditional, simplified, member_id)) conn.commit() print(f"[Single Extract] Successfully saved to database") return jsonify({ "success": True, "member_id": member_id, "name": member['name'], "genealogy_traditional": traditional, "genealogy_simplified": simplified, "source": source }) else: return jsonify({ "success": False, "member_id": member_id, "message": "无法提取或生成族谱原文" }) except Exception as e: import traceback print(f"[Single Extract] Error: {e}") print(f"[Single Extract] Traceback: {traceback.format_exc()}") return jsonify({ "success": False, "member_id": member_id, "message": str(e), "error_type": type(e).__name__ }) finally: conn.close() @app.route('/manager/api/members/batch_resume_task', methods=['GET']) def batch_resume_task(): """ 恢复因服务重启而中断的批量任务(GET,方便浏览器直接访问)。 可选参数:?task_id=xxx 不传则自动找最近一条中断任务。 """ if 'user_id' not in session: return jsonify({"success": False, "message": "Unauthorized"}), 401 task_id = request.args.get('task_id') conn = get_db_connection() try: with conn.cursor() as cursor: if task_id: cursor.execute(""" SELECT task_id, status, last_processed_id, total_count, completed_count, failed_count FROM batch_genealogy_task WHERE task_id = %s AND user_id = %s """, (task_id, session['user_id'])) else: # 找最近一条中断的任务 cursor.execute(""" SELECT task_id, status, last_processed_id, total_count, completed_count, failed_count FROM batch_genealogy_task WHERE user_id = %s AND status IN ('pending', 'processing', 'interrupted') ORDER BY created_at DESC LIMIT 1 """, (session['user_id'],)) task = cursor.fetchone() if not task: return jsonify({"success": False, "message": "未找到可恢复的任务"}), 404 task_id = task['task_id'] # 重新标记为 processing,准备恢复线程 with conn.cursor() as cursor: cursor.execute(""" UPDATE batch_genealogy_task SET status = 'processing' WHERE task_id = %s """, (task_id,)) conn.commit() threading.Thread( target=async_process_all_empty_genealogy, args=(task_id, session['user_id']), daemon=True ).start() return jsonify({ "success": True, "task_id": task_id, "message": f"任务已从断点恢复(已完成 {task['completed_count']},从 last_processed_id={task['last_processed_id']} 继续)", "last_processed_id": task['last_processed_id'], "completed_count": task['completed_count'], "total_count": task['total_count'], }) finally: conn.close() @app.route('/manager/api/members/batch_process_all_empty', methods=['GET']) def batch_process_all_empty(): """简便批量处理接口:自动处理所有族谱原文为空的成员,支持断点续跑""" if 'user_id' not in session: return jsonify({"success": False, "message": "Unauthorized"}), 401 conn = get_db_connection() try: with conn.cursor() as cursor: cursor.execute(""" SELECT COUNT(*) as count FROM family_member_info WHERE (genealogy_original_traditional IS NULL OR genealogy_original_traditional = '' OR genealogy_original_traditional = 'None') AND (genealogy_original_simplified IS NULL OR genealogy_original_simplified = '' OR genealogy_original_simplified = 'None') """) result = cursor.fetchone() total_empty = result['count'] if result else 0 cursor.execute(""" SELECT task_id, status, last_processed_id, total_count, completed_count, failed_count FROM batch_genealogy_task WHERE user_id = %s AND status IN ('pending', 'processing') ORDER BY created_at DESC LIMIT 1 """, (session['user_id'],)) running_task = cursor.fetchone() if running_task: return jsonify({ "success": False, "message": "存在正在进行的任务,若服务已重启可调用 POST /manager/api/members/batch_resume_task 恢复", "task_id": running_task['task_id'], "status": running_task['status'], "last_processed_id": running_task['last_processed_id'], "completed_count": running_task['completed_count'], "total_count": running_task['total_count'], "resume_tip": "POST /manager/api/members/batch_resume_task body: {\"task_id\": \"" + running_task['task_id'] + "\"}" }) task_id = str(uuid.uuid4()) with conn.cursor() as cursor: cursor.execute(""" INSERT INTO batch_genealogy_task (task_id, user_id, status, total_count, last_processed_id) VALUES (%s, %s, 'processing', %s, 0) """, (task_id, session['user_id'], total_empty)) conn.commit() threading.Thread( target=async_process_all_empty_genealogy, args=(task_id, session['user_id']), daemon=True ).start() return jsonify({ "success": True, "task_id": task_id, "message": f"任务已创建,将处理 {total_empty} 个族谱原文为空的成员", "total_count": total_empty }) finally: conn.close() if __name__ == '__main__': app.run(debug=False, port=5001)