import os import sys import json import re import tempfile import urllib.request import urllib.error import urllib.parse import mysql.connector import datetime import random import base64 import socket from typing import Optional, List from flask import Flask, render_template, request, jsonify, url_for, send_file, redirect from html.parser import HTMLParser from urllib.parse import urljoin, urlparse BASE_DIR = os.path.dirname(os.path.abspath(__file__)) def _load_dotenv_if_exists(): """ 允许把数据库/AI 等配置放在同目录 config.env 里(不改代码即可生效)。 重要:需要在读取 os.getenv(...) 之前执行。 """ try: from dotenv import load_dotenv # type: ignore except Exception: return # 兼容源码运行 / PyInstaller 打包运行 candidates = [ os.path.join(BASE_DIR, "config.env"), os.path.join(os.path.dirname(os.path.abspath(sys.argv[0])), "config.env"), ] for p in candidates: if os.path.exists(p): load_dotenv(p, override=True) break _load_dotenv_if_exists() def resource_path(*parts: str) -> str: """ 资源路径兼容: - 源码运行:基于当前文件目录 - PyInstaller 打包:基于 _MEIPASS """ base = getattr(sys, "_MEIPASS", BASE_DIR) return os.path.join(base, *parts) app = Flask( __name__, template_folder=resource_path("templates"), static_folder=resource_path("static"), ) # 可能的主键列名(不同库/表可能不一致) _PK_CANDIDATE_COLS = ["id", "question_id", "pk_id", "qid"] _PK_COLS_CACHE: Optional[List[str]] = None # 数据库配置 # 默认值:按你刚刚给的 JDBC 配置来(也支持用环境变量覆盖) DB_CONFIG = { "host": os.getenv("DB_HOST", "rm-f8ze60yirdj8786u2wo.mysql.rds.aliyuncs.com"), "port": int(os.getenv("DB_PORT", "3306")), "database": os.getenv("DB_DATABASE", "math-conten-online2"), "user": os.getenv("DB_USERNAME", "root"), "password": os.getenv("DB_PASSWORD", "csqz@20255"), "charset": "utf8mb4", } # 远程 PDF 生成接口(你提供的) PDF_API_URL = os.getenv("PDF_API_URL", "https://teaching-content.chunsunqiuzhu.com/api/questions/pdf") PDF_STUDENT_ID = os.getenv("PDF_STUDENT_ID", "44") # 默认写死 44(也支持用环境变量覆盖) # AI 优化题干(通过环境变量配置,避免把 key 写进代码) AI_API_KEY = os.getenv("AI_API_KEY", "").strip() AI_BASE_URL = (os.getenv("AI_BASE_URL") or "").strip() # 为空则使用官方默认 AI_MODEL_NAME = (os.getenv("AI_MODEL_NAME") or "gpt-5.2").strip() # 难度评分配置(使用相同的 AI 配置) DIFFICULTY_SCORING_API_KEY = os.getenv("DIFFICULTY_SCORING_API_KEY", AI_API_KEY).strip() DIFFICULTY_SCORING_MODEL = os.getenv("DIFFICULTY_SCORING_MODEL", AI_MODEL_NAME).strip() DIFFICULTY_SCORING_TEMPERATURE = float(os.getenv("DIFFICULTY_SCORING_TEMPERATURE", "0.0")) DIFFICULTY_SCORING_TOP_P = float(os.getenv("DIFFICULTY_SCORING_TOP_P", "0.3")) DIFFICULTY_SCORING_PRESENCE_PENALTY = float(os.getenv("DIFFICULTY_SCORING_PRESENCE_PENALTY", "0.0")) DIFFICULTY_SCORING_FREQUENCY_PENALTY = float(os.getenv("DIFFICULTY_SCORING_FREQUENCY_PENALTY", "0.0")) # 默认难度评分提示词 DIFFICULTY_SCORING_PROMPT = """你是一名数学题库难度建模专家。你的任务不是解题,而是基于题目文本、公式及图像信息,对题目难度进行结构化量化评估,用于企业级题库难度算法。本评估包含四个评分维度,四个维度等权重参与最终得分,但在判断时需遵循重要性顺序:推理与运算步数最高,知识点数量第二,抽象与构造要求第三,题型常规性最低。每个维度的取值只能是 0.33、0.66 或 1.00,不允许出现其他数值。(1)推理与运算步数:直接或少量步骤取 0.33,中等连续步骤取 0.66,多步链式推理或存在中间结论取 1.00;(2)知识点数量:单一核心知识点取 0.33,两个知识点取 0.66,三个及以上或跨模块综合取 1.00;(3)抽象与构造要求:无需抽象或构造取 0.33,需要一定抽象或简单构造取 0.66,需要明显构造或较高层次建模取 1.00;(4)题型常规性:高度常规模板题取 0.33,存在一定变式取 0.66,非典型或创新题型取 1.00。四个维度得分相加后需归一化为 0~1 的 total_score,并按以下规则映射难度等级:0~0.25 为"筑基",0.25~0.5 为"提分",0.5 以上为"培优"。请仅以严格 JSON 格式输出 dimension_scores、total_score 和 difficulty_level,不得输出其他字段,不得展开解题过程。""" AI_STEM_PROMPT_DEFAULT = r"""你是“题干格式修复器”,不是解题老师。 你的任务:把输入题干(stem)做最小幅度的格式清洗,使其更适合 PDF / 系统展示。 【核心原则:原文已兼容就不要改】(最重要,违反此条视为失败) - 如果原文已经是 PDF 兼容格式(例如:数字、单位、上标³、普通文本混合),就不要做任何改动。 - **严禁添加**:反斜杠 `\`、逗号 `,`、LaTeX 命令(如 `\,`、`\n`、`^3` 等)、换行符 `\n` 等原本不存在的内容。 - **严禁改动**:不要把上标³改成 `^3`,不要把普通文本改成 LaTeX 格式,不要把 Unicode 上标改成 LaTeX 上标。 - **严禁添加换行**:不要添加 `\n` 或任何换行符,保持原文的换行格式不变。如果原文是连续文本,输出也必须是连续文本。 【严禁解题/严禁新增内容】(非常重要) - 不要解题:不要推导、不要解释、不要补充答案、不要给出步骤、不要增加任何原文中不存在的内容。 - 不要把题干改写成解析/答案。 【必须保留】(非常重要) 1) 保留原题干的结构与排版:不调整顺序、不合并/拆分段落、不新增/删除句子。 2) 保留所有 HTML 标签与结构原样不动(例如


等)。 3) 题干中出现的所有 ... 片段必须在输出中**逐字复制**(字符、空格、换行、属性顺序、大小写都必须完全一致),不得省略、重排、格式化。 4) **保留上标格式**:如果原文是 `dm³`、`cm³`、`m²` 等 Unicode 上标,必须保持原样,严禁改成 `dm^3`、`cm^3`、`m^2` 等 LaTeX 格式。 5) **保留单位格式**:如果原文是 `2.5 dm³` 这种格式(数字+空格+单位),保持原样,不要添加 `\,`、`\n` 等 LaTeX 命令。 【允许的唯一改动】(只允许做下面这些替换;除此之外任何字符都不要改) 6) 公式包裹:对行内公式 `$...$`,仅去掉外层 `$`,并保留内部内容的原顺序与原字符(不要删改里面的数字/变量/符号)。 7) 符号替换(只替换命中的这些 LaTeX 命令,其它所有内容保持原样): - \triangle → △ - \text{cm} → cm - \text{cm²} → cm² - \neq → != - \ge → ≥ - \le → ≤ - \Rightarrow => => - \sqrt → sqrt - \perp → ⟂ 8) 填空线:把所有“连续下划线填空线”(长度不固定)统一成:________(8 个下划线)。 9) **问号填空**:把题干中的问号 `?`(用于表示填空)替换成:________(8 个下划线)。注意:不要删除问号,要替换成填空线。 【输出格式】 10) 输出必须是一段“可直接写回 stem 字段”的文本/HTML。 11) 只输出结果,不要加任何解释、标题、Markdown 标记(不要使用 **、列表编号等)。 下面是题干原文,请按规则输出清洗后的题干: """ _env_prompt = os.getenv("AI_STEM_PROMPT") # 注意:如果环境变量存在但为空(例如 AI_STEM_PROMPT=),os.getenv 会返回空字符串, # 这会把默认提示词覆盖掉,导致 AI 没有收到规则。这里显式做“空值回退默认”。 AI_STEM_PROMPT = (_env_prompt.strip() if isinstance(_env_prompt, str) else "").strip() or AI_STEM_PROMPT_DEFAULT _RE_SVG_BLOCK = re.compile(r"", re.IGNORECASE) def _env_bool(name: str, default: bool) -> bool: val = os.getenv(name) if val is None: return default return str(val).strip().lower() in {"1", "true", "yes", "y", "on"} def get_db_connection(): """ use_pure=True:避免 C 扩展在部分环境抛出 “RuntimeError: Failed raising error.” connection_timeout:避免卡死 ssl:默认按 JDBC 里的 useSSL=true 走;如果你本地/网络环境不支持,也可以用环境变量关闭 DB_USE_SSL=false """ use_ssl = _env_bool("DB_USE_SSL", True) try: return mysql.connector.connect( **DB_CONFIG, use_pure=True, connection_timeout=5, ssl_disabled=(not use_ssl), ) except Exception: # 兜底:有些环境 SSL 握手会失败,尝试降级不启用 SSL,让你至少能先用起来 if use_ssl: return mysql.connector.connect( **DB_CONFIG, use_pure=True, connection_timeout=5, ssl_disabled=True, ) raise def render_db_error(e: Exception): return render_template( "db_error.html", error=str(e), db_host=DB_CONFIG.get("host"), db_port=DB_CONFIG.get("port"), db_name=DB_CONFIG.get("database"), db_user=DB_CONFIG.get("user"), ), 500 def get_pk_columns(conn) -> List[str]: """ 获取 questions_tem 表里"可能的主键列名"中实际存在的列。 只要命中一个就能用主键 ID 搜索跳转。 """ global _PK_COLS_CACHE if _PK_COLS_CACHE is not None: return _PK_COLS_CACHE cols: List[str] = [] try: cursor = conn.cursor() for c in _PK_CANDIDATE_COLS: cursor.execute("SHOW COLUMNS FROM questions_tem LIKE %s", (c,)) if cursor.fetchone(): cols.append(c) except Exception: cols = [] _PK_COLS_CACHE = cols return cols def get_question_pk(question: dict) -> Optional[str]: """ 从题目记录里提取“题目主键ID”(用于远程导出 PDF 接口的 question_ids)。 兼容不同库的列名:id / question_id / pk_id / qid """ for c in _PK_CANDIDATE_COLS: v = question.get(c) if v is None: continue s = str(v).strip() if s: return s return None def request_remote_pdf_url(question_id: str, include_grading: bool = True) -> List[str]: """ 调用你给的接口生成 PDF,返回 pdf_url 列表(可能是一个或多个)。 返回结构示例: {"success": true, "data": {"pdf_url": "https://...pdf"}} # 单个 或 {"success": true, "data": {"pdf_url": ["https://...pdf1", "https://...pdf2"]}} # 多个 """ payload = { "question_ids": [str(question_id)], "student_id": str(PDF_STUDENT_ID), "include_grading": include_grading } data = json.dumps(payload, ensure_ascii=False).encode("utf-8") req = urllib.request.Request( PDF_API_URL, data=data, headers={"Content-Type": "application/json"}, method="POST", ) try: with urllib.request.urlopen(req, timeout=20) as resp: raw = resp.read().decode("utf-8", errors="replace") except urllib.error.HTTPError as e: detail = "" try: detail = e.read().decode("utf-8", errors="replace") except Exception: detail = "" raise RuntimeError(f"PDF 接口 HTTP 错误:{e.code} {e.reason}. {detail}".strip()) except Exception as e: raise RuntimeError(f"调用 PDF 接口失败:{e}") try: obj = json.loads(raw) except Exception: raise RuntimeError(f"PDF 接口返回不是合法 JSON:{raw[:300]}") if not isinstance(obj, dict) or not obj.get("success"): msg = obj.get("message") if isinstance(obj, dict) else "" raise RuntimeError(f"PDF 生成失败:{msg or raw[:300]}") data_obj = obj.get("data") if isinstance(obj, dict) else None if not isinstance(data_obj, dict): raise RuntimeError(f"PDF 接口返回的 data 不是对象:{raw[:300]}") # 接口返回格式:{"pdf_url": "...", "grading_pdf_url": "..."} # 只打开 grading_pdf_url(评分PDF),不打开 pdf_url(题目PDF) grading_pdf_url = data_obj.get("grading_pdf_url") if not grading_pdf_url or not isinstance(grading_pdf_url, str) or not grading_pdf_url.strip(): raise RuntimeError(f"PDF 接口未返回有效的 grading_pdf_url:{raw[:300]}") return [str(grading_pdf_url).strip()] def _extract_svg_blocks(text: str) -> List[str]: if not text: return [] return _RE_SVG_BLOCK.findall(text) def _svg_blocks_equal(a: str, b: str) -> bool: """ 严格校验:新旧题干中的 ... 片段列表必须完全一致(数量、顺序、内容都一致)。 """ return _extract_svg_blocks(a) == _extract_svg_blocks(b) def call_ai_optimize_stem(stem_html: str) -> str: """ 调用 AI,把 stem 原文优化成“安全版 stem”,返回一段可直接写入 stem 的文本/HTML。 注意:AI 返回的是一段文本,我们直接当作新的 stem 处理。 """ if not AI_API_KEY: raise RuntimeError("缺少 AI_API_KEY:请在 config.env 里配置 AI_API_KEY") base = AI_BASE_URL.rstrip("/") if not base: base = "https://api.openai.com/v1" url = f"{base}/chat/completions" # 把“规则”放 system,把题干原文放 user payload = { "model": AI_MODEL_NAME, "messages": [ {"role": "system", "content": AI_STEM_PROMPT}, {"role": "user", "content": stem_html or ""}, ], # 关键:不设置时,部分兼容实现会默认只生成很短的内容(看起来像“没读 prompt”)。 # 为了让模型能完整输出“带 SVG 的题干”,这里给足 token 预算。 # 注意:部分 API 端点只支持 max_completion_tokens,不支持 max_tokens,所以只传前者。 "max_completion_tokens": int(os.getenv("AI_MAX_TOKENS", "4096")), "temperature": float(os.getenv("AI_TEMPERATURE", "0.0")), } data = json.dumps(payload, ensure_ascii=False).encode("utf-8") req = urllib.request.Request( url, data=data, headers={ "Content-Type": "application/json", "Authorization": f"Bearer {AI_API_KEY}", }, method="POST", ) try: with urllib.request.urlopen(req, timeout=60) as resp: raw = resp.read().decode("utf-8", errors="replace") except urllib.error.HTTPError as e: detail = "" try: detail = e.read().decode("utf-8", errors="replace") except Exception: detail = "" raise RuntimeError(f"AI 接口 HTTP 错误:{e.code} {e.reason}. {detail}".strip()) except Exception as e: raise RuntimeError(f"调用 AI 接口失败:{e}") try: obj = json.loads(raw) except Exception: raise RuntimeError(f"AI 接口返回不是合法 JSON:{raw[:300]}") try: content = obj["choices"][0]["message"]["content"] except Exception: raise RuntimeError(f"AI 接口返回结构不符合预期:{raw[:300]}") s = str(content or "").strip() if not s: raise RuntimeError("AI 返回为空") # 后处理:清理 AI 误加的无意义换行符 `\n` # 规则:只清理那些在连续文本中间的单个 `\n`(前后都是字母/数字/中文,没有空格或标点) # 保留 HTML 标签内的换行(如 内的换行)和原本有意义的段落换行 # 匹配:字母/数字/中文 + \n + 字母/数字/中文(这种通常是误加的) s = re.sub(r"([\w\u4e00-\u9fff])\n([\w\u4e00-\u9fff])", r"\1 \2", s) # 清理多余的连续空格(避免替换后产生多个空格) s = re.sub(r" +", " ", s) # 保护:如果输入很长但输出极短,通常是 token 上限/模型拒答/接口兼容问题。 # 直接抛错,避免用户误以为“优化成功”。 if len((stem_html or "").strip()) >= 200 and len(s) <= 40: raise RuntimeError( "AI 返回内容过短(疑似被 max_tokens 截断或模型未按要求输出)。请重试;" "如仍复现,可把 AI_MODEL_NAME / AI_BASE_URL 发我进一步定位。" ) return s # 加载知识点映射和层级结构 # ==================== 难度评分相关函数 ==================== class ImageExtractor(HTMLParser): """从HTML中提取图片URL""" def __init__(self): super().__init__() self.image_urls = [] def handle_starttag(self, tag, attrs): if tag == 'img': for attr in attrs: if attr[0] in ['src', 'data-src']: url = attr[1] if url: self.image_urls.append(url) def extract_images_from_html(html_content, base_url=None): """从HTML内容中提取所有图片URL""" parser = ImageExtractor() parser.feed(html_content) image_urls = parser.image_urls # 如果是相对路径,转换为绝对路径 if base_url and image_urls: image_urls = [urljoin(base_url, url) if not urlparse(url).netloc else url for url in image_urls] return image_urls def extract_text_from_html(html_content): """从HTML中提取纯文本(去除标签)""" # 简单的HTML标签去除 text = re.sub(r'<[^>]+>', '', html_content) # 去除多余的空白字符 text = re.sub(r'\s+', ' ', text).strip() return text def parse_json_response(text): """从AI响应中提取JSON""" # 尝试直接解析 try: return json.loads(text) except: pass # 尝试提取JSON部分(如果响应包含其他文本) json_match = re.search(r'\{[\s\S]*\}', text) if json_match: try: return json.loads(json_match.group()) except: pass # 如果都失败,返回None return None def get_openai_client(): """获取OpenAI客户端""" try: from openai import OpenAI api_key = DIFFICULTY_SCORING_API_KEY or AI_API_KEY if not api_key: return None base_url = AI_BASE_URL.rstrip("/") if AI_BASE_URL else None if base_url: return OpenAI(api_key=api_key, base_url=base_url) else: return OpenAI(api_key=api_key) except ImportError: return None except Exception as e: print(f"警告: OpenAI客户端初始化失败: {e}") return None def load_kp_structure(): """ Load knowledge point structure from database Returns: - kp_map: {id: label} mapping - kp_hierarchy: hierarchical structure list, each element contains chapter, section, subsection info """ # Load from database instead of JSON file _, kp_tree = load_kp_structure_from_db() kp_map = {} kp_hierarchy = [] def process_node(node, parent_chapter=None, parent_section=None): """Process node recursively, only keep three levels: chapter, section, subsection""" if not isinstance(node, dict): return node_id = node.get("id") node_label = node.get("label", "") kp_level = node.get("kp_level", "") # Record mapping for all nodes if node_id and node_label: kp_map[str(node_id)] = node_label # Only process three levels if kp_level == "chapter": # Chapter level chapter_info = { "id": node_id, "label": node_label, "sections": [] } kp_hierarchy.append(chapter_info) # Process child nodes (sections) children = node.get("children", []) for child in children: process_node(child, parent_chapter=chapter_info, parent_section=None) elif kp_level == "section": # Section level if parent_chapter: section_info = { "id": node_id, "label": node_label, "subsections": [] } parent_chapter["sections"].append(section_info) # Process child nodes (subsections) children = node.get("children", []) for child in children: process_node(child, parent_chapter=parent_chapter, parent_section=section_info) elif kp_level == "subsection": # Subsection level if parent_section: subsection_info = { "id": node_id, "label": node_label } parent_section["subsections"].append(subsection_info) # If node has children, continue recursive processing (but only process matching levels) children = node.get("children", []) if children and kp_level not in ["chapter", "section", "subsection"]: # For other levels (like lesson), continue searching down for child in children: process_node(child, parent_chapter=parent_chapter, parent_section=parent_section) # Process root nodes if isinstance(kp_tree, dict): children = kp_tree.get("children", []) for child in children: process_node(child) elif isinstance(kp_tree, list): for item in kp_tree: process_node(item) return kp_map, kp_hierarchy def load_kp_structure_from_db(): """ 从MySQL数据库加载知识点结构,构建树形结构 返回: - kp_map: {kp_code: name} 映射 - kp_tree: 树形结构列表,每个节点包含 children 列表和 question_count """ kp_map = {} kp_tree = [] try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) # 查询所有知识点及其题目数量 cursor.execute(""" SELECT kp.kp_code, kp.name, kp.parent_kp_code, kp.grade, COUNT(q.question_code) as question_count FROM knowledge_points_copy1 kp LEFT JOIN questions_tem q ON q.kp_code = kp.kp_code GROUP BY kp.kp_code, kp.name, kp.parent_kp_code, kp.grade ORDER BY kp.kp_code """) all_kps = cursor.fetchall() # 构建映射 for kp in all_kps: kp_map[kp['kp_code']] = kp['name'] # 构建节点字典 nodes = {} for kp in all_kps: question_count = int(kp.get('question_count', 0) or 0) nodes[kp['kp_code']] = { 'kp_code': kp['kp_code'], 'name': kp['name'], 'parent_kp_code': kp['parent_kp_code'], 'grade': kp['grade'], 'question_count': question_count, 'children': [] } # 构建树形结构 for kp_code, node in nodes.items(): parent_code = node['parent_kp_code'] if parent_code and parent_code in nodes: # 有父节点,添加到父节点的children中 nodes[parent_code]['children'].append(node) else: # 没有父节点,是根节点 kp_tree.append(node) # 递归计算父节点的题目数量(包括子节点) def calculate_total_count(node): total = node.get('question_count', 0) for child in node['children']: total += calculate_total_count(child) node['total_question_count'] = total return total # 对每个节点的children按kp_code排序,并计算总题目数 def sort_children(node): node['children'].sort(key=lambda x: x['kp_code']) for child in node['children']: sort_children(child) # 计算总题目数(包括子节点) calculate_total_count(node) for root in kp_tree: sort_children(root) conn.close() except Exception as e: print(f"Error loading knowledge points from database: {e}") import traceback traceback.print_exc() return kp_map, kp_tree def load_kp_structure_with_pending_count(): """ 从MySQL数据库加载知识点结构,构建树形结构,只包含有未审核题目的节点 返回: - kp_map: {kp_code: name} 映射 - kp_tree: 树形结构列表,每个节点包含 children 列表和 pending_count(未审核题目数量) - other_questions_count: 其他题目(未关联知识点)的未审核题目数量 """ kp_map = {} kp_tree = [] other_questions_count = 0 try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) # 查询所有知识点及其未审核题目数量(只查询有未审核题目的知识点) cursor.execute(""" SELECT kp.kp_code, kp.name, kp.parent_kp_code, kp.grade, COUNT(q.question_code) as pending_count FROM knowledge_points_copy1 kp INNER JOIN questions_tem q ON q.kp_code = kp.kp_code WHERE (q.audit_reason IS NULL OR q.audit_reason = '') GROUP BY kp.kp_code, kp.name, kp.parent_kp_code, kp.grade HAVING pending_count > 0 ORDER BY kp.kp_code """) all_kps = cursor.fetchall() # 查询其他题目(未关联知识点)的未审核题目数量 cursor.execute(""" SELECT COUNT(*) as count FROM questions_tem WHERE (kp_code IS NULL OR kp_code = '') AND (audit_reason IS NULL OR audit_reason = '') """) other_result = cursor.fetchone() other_questions_count = int(other_result['count'] or 0) if other_result else 0 # 构建映射 for kp in all_kps: kp_map[kp['kp_code']] = kp['name'] # 构建节点字典 nodes = {} for kp in all_kps: pending_count = int(kp.get('pending_count', 0) or 0) nodes[kp['kp_code']] = { 'kp_code': kp['kp_code'], 'name': kp['name'], 'parent_kp_code': kp['parent_kp_code'], 'grade': kp['grade'], 'pending_count': pending_count, 'children': [] } # 构建树形结构(只包含有未审核题目的节点) for kp_code, node in nodes.items(): parent_code = node['parent_kp_code'] if parent_code and parent_code in nodes: # 有父节点,添加到父节点的children中 nodes[parent_code]['children'].append(node) else: # 没有父节点,是根节点 kp_tree.append(node) # 递归计算父节点的未审核题目数量(包括子节点) def calculate_total_pending_count(node): total = node.get('pending_count', 0) for child in node['children']: total += calculate_total_pending_count(child) node['total_pending_count'] = total return total # 对每个节点的children按kp_code排序,并计算总未审核题目数 def sort_children(node): node['children'].sort(key=lambda x: x['kp_code']) for child in node['children']: sort_children(child) # 计算总未审核题目数(包括子节点) calculate_total_pending_count(node) for root in kp_tree: sort_children(root) conn.close() except Exception as e: print(f"Error loading knowledge points with pending count from database: {e}") import traceback traceback.print_exc() return kp_map, kp_tree, other_questions_count KP_MAP, KP_HIERARCHY = load_kp_structure() def find_kp_hierarchy(kp_id): """ 根据知识点ID查找其层级信息(chapter, section, subsection) 返回: { 'chapter': {'id': xxx, 'label': 'xxx'}, 'section': {'id': xxx, 'label': 'xxx'} 或 None, 'subsection': {'id': xxx, 'label': 'xxx'} 或 None } """ kp_id_str = str(kp_id) result = { 'chapter': None, 'section': None, 'subsection': None } for chapter in KP_HIERARCHY: # 检查是否是chapter本身 if str(chapter["id"]) == kp_id_str: result['chapter'] = {'id': chapter["id"], 'label': chapter["label"]} return result # 检查sections for section in chapter.get("sections", []): # 检查是否是section本身 if str(section["id"]) == kp_id_str: result['chapter'] = {'id': chapter["id"], 'label': chapter["label"]} result['section'] = {'id': section["id"], 'label': section["label"]} return result # 检查subsections for subsection in section.get("subsections", []): if str(subsection["id"]) == kp_id_str: result['chapter'] = {'id': chapter["id"], 'label': chapter["label"]} result['section'] = {'id': section["id"], 'label': section["label"]} result['subsection'] = {'id': subsection["id"], 'label': subsection["label"]} return result return result @app.route('/question_management') def question_management(): """题目管理页面:显示知识点目录和题目列表""" # 从数据库加载知识点树形结构 _, kp_tree = load_kp_structure_from_db() # 查询"其他题目"(未关联kp_code的题目)数量 try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute(""" SELECT COUNT(*) as count FROM questions_tem WHERE (kp_code IS NULL OR kp_code = '') """) other_questions_count = cursor.fetchone()['count'] if cursor.rowcount > 0 else 0 conn.close() except Exception as e: print(f"查询其他题目数量失败: {e}") other_questions_count = 0 return render_template('question_management.html', kp_tree=kp_tree, is_audit_mode=False, other_questions_count=other_questions_count) @app.route('/api/questions_by_kp/') def api_questions_by_kp(kp_code): """根据知识点代码返回题目列表(JSON格式)""" return _api_questions_by_kp(kp_code, audit_only=False) @app.route('/api/pending_questions_by_kp/') def api_pending_questions_by_kp(kp_code): """根据知识点代码返回未审核题目列表(JSON格式,用于审核页面)""" return _api_questions_by_kp(kp_code, audit_only=True) def _api_questions_by_kp(kp_code, audit_only=False): """根据知识点代码返回题目列表(JSON格式)""" try: conn = get_db_connection() except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 try: cursor = conn.cursor(dictionary=True) # 获取年级筛选参数(可选) grade_filter = request.args.get('grade', type=int) # 获取分页参数(可选) page = request.args.get('page', type=int, default=1) page_size = 20 # 每页显示20道题 # 先获取知识点名称(如果kp_code不为空) kp_name = kp_code if kp_code != 'null' and kp_code != '': cursor.execute("SELECT name FROM knowledge_points_copy1 WHERE kp_code = %s", (kp_code,)) kp_row = cursor.fetchone() kp_name = kp_row['name'] if kp_row else kp_code # 构建查询条件 where_conditions = [] params = [] # 知识点条件 if kp_code == 'null' or kp_code == '': where_conditions.append("(kp_code IS NULL OR kp_code = '' OR kp_code = 'null')") kp_name = "其他题目" else: where_conditions.append("kp_code = %s") params.append(kp_code) # 审核状态条件 if audit_only: where_conditions.append("(audit_reason IS NULL OR audit_reason = '')") # 年级筛选条件(仅对"其他题目"有效,因为其他题目有grade字段) # grade字段是int类型,使用CAST确保类型匹配 # 注意:如果grade为NULL,CAST会返回NULL,无法匹配,所以需要确保grade不为NULL if grade_filter is not None and (kp_code == 'null' or kp_code == ''): where_conditions.append("grade IS NOT NULL AND CAST(grade AS UNSIGNED) = %s") params.append(grade_filter) where_clause = " AND ".join(where_conditions) # 先查询总数(用于分页) count_query = f"SELECT COUNT(*) as total FROM questions_tem WHERE {where_clause}" cursor.execute(count_query, params) total_count = cursor.fetchone()['total'] total_pages = (total_count + page_size - 1) // page_size # 向上取整 # 确保页码有效 if page < 1: page = 1 if page > total_pages and total_pages > 0: page = total_pages # 计算偏移量 offset = (page - 1) * page_size # 查询题目(带分页) # 使用 id DESC 排序,确保列表顺序正确(按创建时间倒序,最新的在前) query = f""" SELECT question_code, stem, kp_code, kp_id, audit_reason, audit_status, difficulty, question_type, answer, solution, options, grade FROM questions_tem WHERE {where_clause} ORDER BY id DESC LIMIT %s OFFSET %s """ params_with_pagination = params + [page_size, offset] cursor.execute(query, params_with_pagination) questions = cursor.fetchall() # 为了统计,需要查询所有题目(不分页) # 统计查询不需要排序,但为了保持一致性也使用 id DESC stats_query = f""" SELECT question_code, audit_reason, difficulty, question_type FROM questions_tem WHERE {where_clause} ORDER BY id DESC """ cursor.execute(stats_query, params) all_questions_for_stats = cursor.fetchall() # 处理选项JSON for q in questions: if q.get('options'): try: opt_data = q['options'] if isinstance(opt_data, str): opt_data = opt_data.replace("'", '"') q['options'] = json.loads(opt_data) except: q['options'] = {} # 计算统计数据(基于所有题目,不分页) # 审核统计 pass_count = sum(1 for q in all_questions_for_stats if q.get('audit_reason') == '合格') fail_count = sum(1 for q in all_questions_for_stats if q.get('audit_reason') == '不合格') pending_count = sum(1 for q in all_questions_for_stats if not q.get('audit_reason') or q.get('audit_reason') == '') # 难度统计 difficulty_jichu = 0 # 筑基 0.2 difficulty_tifen = 0 # 提分 0.4 difficulty_peiyou = 0 # 培优 0.7 difficulty_unknown = 0 # 未设置难度 for q in all_questions_for_stats: diff = q.get('difficulty') if diff is not None: try: diff_float = float(diff) if abs(diff_float - 0.2) < 0.1: difficulty_jichu += 1 elif abs(diff_float - 0.4) < 0.1: difficulty_tifen += 1 elif abs(diff_float - 0.7) < 0.1: difficulty_peiyou += 1 else: difficulty_unknown += 1 except: difficulty_unknown += 1 else: difficulty_unknown += 1 # 题型统计 question_type_stats = {} for q in all_questions_for_stats: q_type = q.get('question_type') or '未分类' question_type_stats[q_type] = question_type_stats.get(q_type, 0) + 1 conn.close() return jsonify({ 'success': True, 'kp_code': kp_code, 'kp_name': kp_name, 'questions': questions, 'count': total_count, 'pagination': { 'page': page, 'page_size': page_size, 'total_pages': total_pages, 'total_count': total_count }, 'stats': { 'total': total_count, 'audit': { 'pass': pass_count, 'fail': fail_count, 'pending': pending_count, 'pass_rate': round((pass_count / (pass_count + fail_count) * 100) if (pass_count + fail_count) > 0 else 0, 1), 'audit_rate': round(((pass_count + fail_count) / total_count * 100) if total_count > 0 else 0, 1) }, 'difficulty': { 'jichu': difficulty_jichu, 'tifen': difficulty_tifen, 'peiyou': difficulty_peiyou, 'unknown': difficulty_unknown }, 'question_type': question_type_stats } }) except Exception as e: try: conn.close() except: pass return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/') def index(): """首页:显示题目统计信息""" try: conn = get_db_connection() except Exception as e: return render_db_error(e) cursor = conn.cursor(dictionary=True) # 总体统计 cursor.execute(""" SELECT COUNT(*) AS total_count, SUM(CASE WHEN audit_reason = '合格' THEN 1 ELSE 0 END) AS pass_count, SUM(CASE WHEN audit_reason = '不合格' OR audit_status = 1 THEN 1 ELSE 0 END) AS fail_count, SUM(CASE WHEN audit_reason IS NULL OR audit_reason = '' THEN 1 ELSE 0 END) AS pending_count FROM questions_tem """) overall_stats = cursor.fetchone() total = int(overall_stats['total_count'] or 0) pass_count = int(overall_stats['pass_count'] or 0) fail_count = int(overall_stats['fail_count'] or 0) pending_count = int(overall_stats['pending_count'] or 0) audited_count = pass_count + fail_count # 计算审核通过率 pass_rate = (pass_count / audited_count * 100) if audited_count > 0 else 0 audit_rate = (audited_count / total * 100) if total > 0 else 0 # 教材系列统计 cursor.execute(""" SELECT COUNT(*) AS total_series, SUM(CASE WHEN is_active = 1 THEN 1 ELSE 0 END) AS active_series FROM textbook_series_copy1 """) series_stats = cursor.fetchone() total_series = int(series_stats['total_series'] or 0) active_series = int(series_stats['active_series'] or 0) # 教材统计 cursor.execute("SELECT COUNT(*) AS total_textbooks FROM textbooks_copy1") textbook_stats = cursor.fetchone() total_textbooks = int(textbook_stats['total_textbooks'] or 0) # 知识点统计 cursor.execute("SELECT COUNT(*) AS total_kp FROM knowledge_points_copy1") kp_stats = cursor.fetchone() total_kp = int(kp_stats['total_kp'] or 0) # 知识点层级统计(按级别统计) cursor.execute(""" SELECT CASE WHEN parent_kp_code IS NULL OR parent_kp_code = '' THEN 'level_0' WHEN parent_kp_code IN (SELECT kp_code FROM knowledge_points_copy1 WHERE parent_kp_code IS NULL OR parent_kp_code = '') THEN 'level_1' ELSE 'level_2_plus' END AS level_type, COUNT(*) AS count FROM knowledge_points_copy1 GROUP BY level_type """) kp_level_stats = cursor.fetchall() kp_level_0 = 0 kp_level_1 = 0 kp_level_2_plus = 0 for row in kp_level_stats: if row['level_type'] == 'level_0': kp_level_0 = int(row['count'] or 0) elif row['level_type'] == 'level_1': kp_level_1 = int(row['count'] or 0) else: kp_level_2_plus = int(row['count'] or 0) # 最近添加的题目(如果有创建时间字段) cursor.execute(""" SELECT question_code, stem, kp_id, audit_reason FROM questions_tem ORDER BY id DESC LIMIT 5 """) recent_questions = cursor.fetchall() # 按审核状态统计 status_stats = { 'pass': pass_count, 'fail': fail_count, 'pending': pending_count, } conn.close() return render_template('index.html', total=total, pass_count=pass_count, fail_count=fail_count, pending_count=pending_count, audited_count=audited_count, pass_rate=round(pass_rate, 1), audit_rate=round(audit_rate, 1), total_series=total_series, active_series=active_series, total_textbooks=total_textbooks, total_kp=total_kp, kp_level_0=kp_level_0, kp_level_1=kp_level_1, kp_level_2_plus=kp_level_2_plus, recent_questions=recent_questions, status_stats=status_stats) @app.route('/audit_questions') def audit_questions(): """审核题目页面:显示所有未审核的题目,按知识点分类""" try: # 从数据库加载知识点树形结构(只包含有未审核题目的节点) _, kp_tree, other_questions_count = load_kp_structure_with_pending_count() # 查询总体统计 conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute(""" SELECT COUNT(*) AS total_count, SUM(CASE WHEN audit_reason = '合格' THEN 1 ELSE 0 END) AS pass_count, SUM(CASE WHEN audit_reason = '不合格' OR audit_status = 1 THEN 1 ELSE 0 END) AS fail_count, SUM(CASE WHEN audit_reason IS NULL OR audit_reason = '' THEN 1 ELSE 0 END) AS pending_count FROM questions_tem """) overall_stats = cursor.fetchone() total = int(overall_stats['total_count'] or 0) pass_count = int(overall_stats['pass_count'] or 0) fail_count = int(overall_stats['fail_count'] or 0) pending_count = int(overall_stats['pending_count'] or 0) audited_count = pass_count + fail_count pass_rate = (pass_count / audited_count * 100) if audited_count > 0 else 0 audit_rate = (audited_count / total * 100) if total > 0 else 0 conn.close() return render_template('audit_questions.html', kp_tree=kp_tree, other_questions_count=other_questions_count, total=total, pass_count=pass_count, fail_count=fail_count, pending_count=pending_count, audited_count=audited_count, pass_rate=round(pass_rate, 1), audit_rate=round(audit_rate, 1)) except Exception as e: return render_db_error(e) @app.route('/search') def search_by_question_code(): """ 输入 question_code 直接跳转题目详情。 GET /search?q=xxxx """ q = (request.args.get("q") or "").strip() if not q: return redirect(url_for("index")) try: conn = get_db_connection() except Exception as e: return render_db_error(e) cursor = conn.cursor(dictionary=True) cursor.execute("SELECT question_code FROM questions_tem WHERE question_code = %s LIMIT 1", (q,)) row = cursor.fetchone() conn.close() if row: return redirect(url_for("detail", question_code=q)) return render_template("search_not_found.html", q=q) @app.route('/search_id') def search_by_question_id(): """ 输入主键ID(比如 id / question_id / pk_id / qid)查题。 GET /search_id?id=123 """ raw = (request.args.get("id") or "").strip() if not raw: return redirect(url_for("index")) try: conn = get_db_connection() except Exception as e: return render_db_error(e) pk_cols = get_pk_columns(conn) if not pk_cols: conn.close() return render_template("search_id_not_supported.html", q=raw) cursor = conn.cursor(dictionary=True) found_code = None for col in pk_cols: try: cursor.execute(f"SELECT question_code FROM questions_tem WHERE {col} = %s LIMIT 1", (raw,)) row = cursor.fetchone() if row and row.get("question_code"): found_code = row["question_code"] break except Exception: continue conn.close() if found_code: return redirect(url_for("detail", question_code=found_code)) return render_template("search_id_not_found.html", q=raw, pk_cols=pk_cols) @app.route('/questions/') def question_list(kp_code): try: conn = get_db_connection() except Exception as e: return render_db_error(e) cursor = conn.cursor(dictionary=True) # 检查是否是审核模式(只显示未审核题目) audit_only = request.args.get('audit_only', '').lower() == 'true' # 处理 kp_code 为 'null' 或空字符串的情况(表示没有 kp_id 的题目) # 注意:URL参数仍使用 kp_code,但数据库字段已改为 kp_id # 使用别名 kp_id AS kp_code 以兼容模板代码 if kp_code == 'null' or kp_code == '': if audit_only: cursor.execute("SELECT *, kp_id AS kp_code FROM questions_tem WHERE (kp_id IS NULL OR kp_id = '') AND (audit_reason IS NULL OR audit_reason = '') ORDER BY question_code DESC") else: cursor.execute("SELECT *, kp_id AS kp_code FROM questions_tem WHERE kp_id IS NULL OR kp_id = '' ORDER BY question_code DESC") kp_name = "未分类题目" hierarchy_info = {'chapter': None, 'section': None, 'subsection': None} else: # 查找层级信息,判断是否是章节级别 hierarchy_info = find_kp_hierarchy(kp_code) # 如果是章节级别,需要查询该章节下所有 section 和 subsection 的题目 if hierarchy_info['chapter'] and not hierarchy_info['section'] and not hierarchy_info['subsection']: # 这是章节级别,需要收集该章节下所有的 kp_id chapter_id = hierarchy_info['chapter']['id'] kp_ids = [str(chapter_id)] # 包含章节本身 # 查找该章节下的所有 section 和 subsection for chapter in KP_HIERARCHY: if chapter["id"] == chapter_id: for section in chapter.get("sections", []): kp_ids.append(str(section["id"])) for subsection in section.get("subsections", []): kp_ids.append(str(subsection["id"])) break # 使用 IN 查询该章节下所有知识点的题目 placeholders = ','.join(['%s'] * len(kp_ids)) if audit_only: query = f"SELECT *, kp_id AS kp_code FROM questions_tem WHERE kp_id IN ({placeholders}) AND (audit_reason IS NULL OR audit_reason = '') ORDER BY question_code DESC" else: query = f"SELECT *, kp_id AS kp_code FROM questions_tem WHERE kp_id IN ({placeholders}) ORDER BY question_code DESC" cursor.execute(query, tuple(kp_ids)) kp_name = hierarchy_info['chapter']['label'] else: # 普通的知识点查询(section 或 subsection) if audit_only: cursor.execute("SELECT *, kp_id AS kp_code FROM questions_tem WHERE kp_id = %s AND (audit_reason IS NULL OR audit_reason = '') ORDER BY question_code DESC", (kp_code,)) else: cursor.execute("SELECT *, kp_id AS kp_code FROM questions_tem WHERE kp_id = %s ORDER BY question_code DESC", (kp_code,)) kp_name = KP_MAP.get(str(kp_code), kp_code) questions = cursor.fetchall() # 构建录入题目页面的URL add_question_url = f'/add_question?kp_code={kp_code}' if hierarchy_info['chapter']: chapter_label_encoded = urllib.parse.quote(hierarchy_info['chapter']['label']) add_question_url += f"&chapter={hierarchy_info['chapter']['id']}&chapter_label={chapter_label_encoded}" if hierarchy_info['section']: section_label_encoded = urllib.parse.quote(hierarchy_info['section']['label']) add_question_url += f"§ion={hierarchy_info['section']['id']}§ion_label={section_label_encoded}" if hierarchy_info['subsection']: subsection_label_encoded = urllib.parse.quote(hierarchy_info['subsection']['label']) add_question_url += f"&subsection={hierarchy_info['subsection']['id']}&subsection_label={subsection_label_encoded}" conn.close() return render_template('questions.html', questions=questions, kp_code=kp_code, kp_name=kp_name, node_id=None, hierarchy_info=hierarchy_info, add_question_url=add_question_url, audit_only=audit_only) @app.route('/textbook/') def textbook_question_list(node_id): """教材节点题目列表""" try: conn = get_db_connection() except Exception as e: return render_db_error(e) cursor = conn.cursor(dictionary=True) # 获取教材节点标题 try: cursor.execute("SELECT title FROM textbook_catalog_nodes WHERE id = %s", (node_id,)) node_row = cursor.fetchone() title = node_row.get('title') if node_row else f'节点{node_id}' except Exception: # 如果表不存在,使用默认标题 title = f'节点{node_id}' # 查询该节点下的所有题目(使用别名 kp_id AS kp_code 以兼容模板代码) cursor.execute( "SELECT *, kp_id AS kp_code FROM questions_tem WHERE textbook_catalog_nodes_id = %s ORDER BY question_code DESC", (node_id,) ) questions = cursor.fetchall() conn.close() return render_template('questions.html', questions=questions, kp_code=None, kp_name=title, node_id=node_id) @app.route('/detail/') def detail(question_code): try: conn = get_db_connection() except Exception as e: return render_db_error(e) cursor = conn.cursor(dictionary=True) # 使用别名 kp_id AS kp_code 以兼容模板代码 cursor.execute("SELECT *, kp_id AS kp_code FROM questions_tem WHERE question_code = %s", (question_code,)) question = cursor.fetchone() if not question: conn.close() return render_template("search_not_found.html", q=question_code), 404 # 获取上下题索引(从 kp_id 字段读取知识点ID,但模板中使用 kp_code) # 优先使用 URL 参数中的 kp_code(用于返回列表时定位到正确的知识点) kp_code_from_url = request.args.get('kp_code') kp_code_db = kp_code_from_url or question.get('kp_id') or question.get('kp_code') # 兼容处理 textbook_node_id = question.get('textbook_catalog_nodes_id') prev_code = None next_code = None curr_idx = 0 total = 1 if kp_code_db: # 有 kp_id:查询同知识点下的所有题目 cursor.execute("SELECT question_code FROM questions_tem WHERE kp_id = %s ORDER BY question_code DESC", (kp_code_db,)) all_codes = [r['question_code'] for r in cursor.fetchall()] if question_code in all_codes: curr_idx = all_codes.index(question_code) prev_code = all_codes[curr_idx - 1] if curr_idx > 0 else None next_code = all_codes[curr_idx + 1] if curr_idx < len(all_codes) - 1 else None total = len(all_codes) elif textbook_node_id: # 没有 kp_code 但有 textbook_catalog_nodes_id:查询同教材节点下的所有题目 cursor.execute( "SELECT question_code FROM questions_tem WHERE textbook_catalog_nodes_id = %s ORDER BY question_code DESC", (textbook_node_id,) ) all_codes = [r['question_code'] for r in cursor.fetchall()] if question_code in all_codes: curr_idx = all_codes.index(question_code) prev_code = all_codes[curr_idx - 1] if curr_idx > 0 else None next_code = all_codes[curr_idx + 1] if curr_idx < len(all_codes) - 1 else None total = len(all_codes) else: # 既没有 kp_code 也没有 textbook_catalog_nodes_id:不显示上下题导航 pass conn.close() # 处理选项 JSON options = [] if question.get('options'): try: opt_data = question['options'] if isinstance(opt_data, str): # 兼容单引号 JSON opt_data = opt_data.replace("'", '"') d = json.loads(opt_data) else: d = opt_data options = sorted(d.items()) except: options = [] # 处理知识点名称:如果没有 kp_code,显示"未分类" kp_name = "未分类题目" hierarchy_info = {'chapter': None, 'section': None, 'subsection': None} add_question_url = None if kp_code_db: kp_name = KP_MAP.get(str(kp_code_db), kp_code_db) # 查找层级信息 hierarchy_info = find_kp_hierarchy(kp_code_db) # 构建录入题目页面的URL add_question_url = f'/add_question?kp_code={kp_code_db}' if hierarchy_info['chapter']: chapter_label_encoded = urllib.parse.quote(hierarchy_info['chapter']['label']) add_question_url += f"&chapter={hierarchy_info['chapter']['id']}&chapter_label={chapter_label_encoded}" if hierarchy_info['section']: section_label_encoded = urllib.parse.quote(hierarchy_info['section']['label']) add_question_url += f"§ion={hierarchy_info['section']['id']}§ion_label={section_label_encoded}" if hierarchy_info['subsection']: subsection_label_encoded = urllib.parse.quote(hierarchy_info['subsection']['label']) add_question_url += f"&subsection={hierarchy_info['subsection']['id']}&subsection_label={subsection_label_encoded}" elif textbook_node_id: # 查询教材节点标题 try: cursor.execute("SELECT title FROM textbook_catalog_nodes WHERE id = %s", (textbook_node_id,)) node_row = cursor.fetchone() if node_row: kp_name = node_row.get('title') or f'节点{textbook_node_id}' else: kp_name = f'节点{textbook_node_id}' except Exception as e: # 如果表不存在或查询失败,使用默认标题 print(f"Warning: 无法查询教材节点标题: {e}") kp_name = f'节点{textbook_node_id}' return render_template('detail.html', q=question, options=options, kp_name=kp_name, kp_code=kp_code_db, node_id=str(textbook_node_id) if textbook_node_id else None, prev_code=prev_code, next_code=next_code, total=total, curr_num=curr_idx + 1 if total > 0 else 1, add_question_url=add_question_url) @app.route('/audit', methods=['POST']) def audit(): data = request.json q_code = data.get('question_code') status_text = data.get('audit_reason') # "合格" or "不合格" status_val = 1 if status_text == "不合格" else 0 try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) # 更新 questions_tem 表的审核状态 cursor.execute("UPDATE questions_tem SET audit_reason=%s, audit_status=%s WHERE question_code=%s", (status_text, status_val, q_code)) # 如果审核通过(合格),将题目插入到 questions 表 if status_text == "合格": # 从 questions_tem 表获取所有字段数据 cursor.execute("SELECT * FROM questions_tem WHERE question_code = %s", (q_code,)) question_data = cursor.fetchone() if question_data: # 检查 questions 表是否已存在该 question_code cursor.execute("SELECT question_code FROM questions WHERE question_code = %s", (q_code,)) existing = cursor.fetchone() # 准备要插入/更新的字段(排除 id 和 is_repeat,因为 questions 表没有 is_repeat 字段) fields_to_copy = [ 'question_code', 'kp_id', 'textbook_catalog_nodes_id', 'stem', 'options', 'answer', 'solution', 'difficulty', 'question_category', 'source', 'tags', 'question_type', 'source_file_id', 'source_paper_id', 'paper_part_id', 'textbook_id', 'meta', 'created_at', 'updated_at', 'audit_status', 'audit_reason', 'title_1', 'title_2', 'title_3', 'create_by', 'kp_code', 'kp_name', 'kp_reference' ] if existing: # 如果已存在,更新数据 update_fields = [] update_values = [] for field in fields_to_copy: if field in question_data: update_fields.append(f"{field} = %s") update_values.append(question_data[field]) update_values.append(q_code) update_sql = f"UPDATE questions SET {', '.join(update_fields)} WHERE question_code = %s" cursor.execute(update_sql, tuple(update_values)) else: # 如果不存在,插入新数据 insert_fields = [] insert_values = [] placeholders = [] for field in fields_to_copy: if field in question_data: insert_fields.append(field) insert_values.append(question_data[field]) placeholders.append('%s') insert_sql = f"INSERT INTO questions ({', '.join(insert_fields)}) VALUES ({', '.join(placeholders)})" cursor.execute(insert_sql, tuple(insert_values)) conn.commit() conn.close() return jsonify({'success': True}) except Exception as e: import traceback traceback.print_exc() return jsonify({'success': False, 'error': str(e)}) @app.route("/export_pdf_remote/") def export_pdf_remote(question_code): """ 远程导出:调用你给的接口拿到 pdf_url(可能是一个或多个),然后全部打开。 """ try: conn = get_db_connection() except Exception as e: return render_db_error(e) try: cursor = conn.cursor(dictionary=True) # 使用别名 kp_id AS kp_code 以兼容模板代码 cursor.execute("SELECT *, kp_id AS kp_code FROM questions_tem WHERE question_code = %s", (question_code,)) question = cursor.fetchone() finally: try: conn.close() except Exception: pass if not question: return render_template("search_not_found.html", q=question_code), 404 qid = get_question_pk(question) if not qid: # 没有常见主键列名,无法给 question_ids return render_template("search_id_not_supported.html", q=str(question_code)), 400 try: pdf_urls = request_remote_pdf_url(qid, include_grading=True) except Exception as e: # PDF接口失败,不显示数据库连接信息 return render_template( "db_error.html", error=str(e), db_host=None, db_port=None, db_name=None, db_user=None, ), 500 if not pdf_urls: # PDF接口未返回有效数据,不显示数据库连接信息 return render_template( "db_error.html", error="PDF 接口未返回有效的 pdf_url", db_host=None, db_port=None, db_name=None, db_user=None, ), 500 # 如果有多个 PDF,返回一个 HTML 页面,用 JavaScript 全部打开 if len(pdf_urls) > 1: return render_template("open_multiple_pdfs.html", pdf_urls=pdf_urls) else: # 只有一个 PDF,直接重定向 return redirect(pdf_urls[0]) @app.route("/api/optimize_stem/", methods=["POST"]) def api_optimize_stem(question_code): """ 生成“优化后的 stem”,并返回左右对比所需内容。 """ try: conn = get_db_connection() except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 try: cursor = conn.cursor(dictionary=True) cursor.execute("SELECT question_code, stem FROM questions_tem WHERE question_code = %s", (question_code,)) row = cursor.fetchone() finally: try: conn.close() except Exception: pass if not row: return jsonify({"success": False, "error": "题目不存在"}), 404 old_stem = row.get("stem") or "" try: new_stem = call_ai_optimize_stem(old_stem) except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 svg_ok = _svg_blocks_equal(old_stem, new_stem) return jsonify( { "success": True, "question_code": question_code, "old_stem": old_stem, "new_stem": new_stem, "svg_ok": bool(svg_ok), } ) @app.route("/api/replace_stem/", methods=["POST"]) def api_replace_stem(question_code): """ 将新 stem 覆写到 questions_tem.stem(仅改 stem)。 额外安全:替换前校验 SVG 不被改动。 """ data = request.json or {} new_stem = data.get("new_stem") if new_stem is None: return jsonify({"success": False, "error": "缺少 new_stem"}), 400 try: conn = get_db_connection() except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 try: cursor = conn.cursor(dictionary=True) cursor.execute("SELECT question_code, stem FROM questions_tem WHERE question_code = %s", (question_code,)) row = cursor.fetchone() if not row: return jsonify({"success": False, "error": "题目不存在"}), 404 old_stem = row.get("stem") or "" if not _svg_blocks_equal(old_stem, str(new_stem)): return jsonify({"success": False, "error": "安全拦截:检测到 SVG 被改动,已禁止替换。"}), 400 cur2 = conn.cursor() cur2.execute("UPDATE questions_tem SET stem=%s WHERE question_code=%s", (str(new_stem), question_code)) conn.commit() return jsonify({"success": True, "question_code": question_code}) except Exception as e: try: conn.rollback() except Exception: pass return jsonify({"success": False, "error": str(e)}), 500 finally: try: conn.close() except Exception: pass @app.route('/api/score', methods=['POST']) def api_score_question(): """ 题目难度评分接口(支持文本、HTML和图片) 请求格式: { "stem": "题目文本内容或HTML(可选,HTML中可包含标签)", "image_url": "图片URL(可选)", "image_base64": "图片base64编码(可选,格式:)", "base_url": "基础URL(可选,用于处理HTML中相对路径的图片)", "custom_prompt": "可选的自定义提示词" } 返回格式: { "success": true, "data": { "difficulty_level": "筑基|提分|培优", "final_score": 0.00, "dimension_scores": { "推理与运算步数": 0.33, "知识点数量": 0.33, "抽象与构造要求": 0.33, "题型常规性": 0.33 } }, "message": "评分成功" } """ client = get_openai_client() if client is None: return jsonify({ "success": False, "error": "OpenAI客户端未初始化,请检查配置" }), 500 try: # 获取请求数据 data = request.get_json() if not data: return jsonify({ "success": False, "error": "请求体不能为空" }), 400 stem = data.get('stem', '').strip() image_url = data.get('image_url', '').strip() image_base64 = data.get('image_base64', '').strip() base_url = data.get('base_url', '').strip() # 用于处理相对路径的图片URL # 如果stem包含HTML标签,尝试提取图片 html_images = [] stem_text = stem if stem and (' 直接URL > HTML中提取的图片) images_to_add = [] if image_base64: # 处理base64图片 if not image_base64.startswith('data:'): image_base64 = f"data:image/png;base64,{image_base64}" images_to_add.append(image_base64) elif image_url: # 使用直接提供的图片URL images_to_add.append(image_url) elif html_images: # 使用从HTML中提取的图片URL images_to_add.extend(html_images) # 添加所有图片到content for img in images_to_add: if img.startswith('data:'): # base64图片 content.append({ "type": "image_url", "image_url": { "url": img } }) else: # URL图片 content.append({ "type": "image_url", "image_url": { "url": img } }) # 调用OpenAI API model_name = DIFFICULTY_SCORING_MODEL or AI_MODEL_NAME try: response = client.chat.completions.create( model=model_name, messages=[ { "role": "user", "content": content } ], temperature=DIFFICULTY_SCORING_TEMPERATURE, top_p=DIFFICULTY_SCORING_TOP_P, presence_penalty=DIFFICULTY_SCORING_PRESENCE_PENALTY, frequency_penalty=DIFFICULTY_SCORING_FREQUENCY_PENALTY ) except Exception as api_error: print(f"OpenAI API调用失败: {str(api_error)}") return jsonify({ "success": False, "error": f"AI服务调用失败: {str(api_error)}", "error_type": type(api_error).__name__ }), 500 if not response or not response.choices or len(response.choices) == 0: return jsonify({ "success": False, "error": "AI服务返回空响应" }), 500 result_text = response.choices[0].message.content # 调试:打印AI返回的原始内容(前500字符) print(f"AI返回原始内容(前500字符): {result_text[:500] if result_text else 'None'}") # 解析JSON响应 result_json = parse_json_response(result_text) if result_json is None: return jsonify({ "success": False, "error": "AI返回格式不正确,无法解析JSON", "raw_response": result_text[:500] if result_text else "None" # 只返回前500字符,避免响应过大 }), 500 # 验证返回的JSON结构 required_fields = ["difficulty_level", "total_score", "dimension_scores"] for field in required_fields: if field not in result_json: return jsonify({ "success": False, "error": f"返回JSON缺少必需字段: {field}", "raw_response": result_text }), 500 # 验证维度分值(放宽验证,只记录警告,不阻止返回) dimension_scores = result_json.get("dimension_scores", {}) valid_dimension_values = [0.33, 0.66, 1.00] # 维度名称映射(支持中文和英文键名) dimension_name_mapping = { # 中文键名(标准) "推理与运算步数": "推理与运算步数", "知识点数量": "知识点数量", "抽象与构造要求": "抽象与构造要求", "题型常规性": "题型常规性", # 英文键名(兼容) "reasoning_steps": "推理与运算步数", "knowledge_points": "知识点数量", "abstraction_construction": "抽象与构造要求", "typicality": "题型常规性", # 其他可能的英文键名 "reasoning": "推理与运算步数", "knowledge": "知识点数量", "abstraction": "抽象与构造要求", "conventionality": "题型常规性" } # 标准化维度分值(统一转换为中文键名) normalized_dimension_scores = {} for key, value in dimension_scores.items(): # 查找对应的中文键名 chinese_key = dimension_name_mapping.get(key, key) normalized_dimension_scores[chinese_key] = value # 更新 result_json 中的 dimension_scores 为标准格式 result_json["dimension_scores"] = normalized_dimension_scores # 验证维度分值 required_dimensions = ["推理与运算步数", "知识点数量", "抽象与构造要求", "题型常规性"] validation_errors = [] for dim in required_dimensions: if dim not in normalized_dimension_scores: validation_errors.append(f"缺少维度评分: {dim}") continue dim_value = normalized_dimension_scores[dim] # 允许浮点数精度误差 if not any(abs(dim_value - v) < 0.01 for v in valid_dimension_values): validation_errors.append(f"维度 {dim} 的值 {dim_value} 不在允许范围内(0.33、0.66、1.00)") # 验证 total_score 计算(四个维度相加后归一化) if len(normalized_dimension_scores) == 4: calculated_total = sum(normalized_dimension_scores.values()) / 4.0 total_score = result_json.get("total_score", 0) # 允许小的浮点数误差(0.05,放宽验证) if abs(calculated_total - total_score) > 0.05: validation_errors.append(f"total_score ({total_score}) 与计算值 ({calculated_total:.2f}) 不一致") else: total_score = result_json.get("total_score", 0) # 验证 difficulty_level 映射(如果 total_score 有效) difficulty_level = result_json.get("difficulty_level", "") if total_score >= 0 and total_score <= 1: if total_score <= 0.25: expected_level = "筑基" elif total_score <= 0.5: expected_level = "提分" else: expected_level = "培优" if difficulty_level != expected_level: validation_errors.append(f"difficulty_level ({difficulty_level}) 与 total_score ({total_score}) 的映射不一致,应为 {expected_level}") # 如果有验证错误,记录但不阻止返回(放宽验证) if validation_errors: print(f"难度评分验证警告: {', '.join(validation_errors)}") # 仍然返回结果,但添加警告信息 result_json["_validation_warnings"] = validation_errors return jsonify({ "success": True, "data": result_json, "message": "评分成功" }) except Exception as e: import traceback error_trace = traceback.format_exc() print(f"难度评分接口错误: {error_trace}") return jsonify({ "success": False, "error": f"评分过程中出现错误: {str(e)}", "error_type": type(e).__name__ }), 500 @app.route('/kp_management') def kp_management(): """知识点管理页面:从 knowledge_points_copy1 表读取数据,按层级结构展示""" try: conn = get_db_connection() except Exception as e: return render_db_error(e) cursor = conn.cursor(dictionary=True) # 获取所有知识点(显示小学、初中、高中) cursor.execute(""" SELECT id, kp_code, name, subject, grade, parent_kp_code, prerequisite_kp_codes, dependent_kp_codes, related_kp_codes, stats, created_at, updated_at, skills, direct_score, related_score FROM knowledge_points_copy1 WHERE grade IN ('小学', '初中', '高中') ORDER BY kp_code ASC """) all_kps = cursor.fetchall() # 处理JSON字段 for kp in all_kps: for field in ['prerequisite_kp_codes', 'dependent_kp_codes', 'related_kp_codes', 'stats', 'skills', 'direct_score', 'related_score']: if kp[field] and isinstance(kp[field], str): try: kp[field] = json.loads(kp[field]) except: kp[field] = None # 构建层级结构 kp_dict = {kp['kp_code']: kp for kp in all_kps} root_kps = [] # 为每个知识点添加children列表 for kp in all_kps: kp['children'] = [] kp['level'] = 0 # 构建树形结构 for kp in all_kps: if kp['parent_kp_code'] and kp['parent_kp_code'] in kp_dict: parent = kp_dict[kp['parent_kp_code']] parent['children'].append(kp) # 计算层级深度 kp['level'] = parent['level'] + 1 else: root_kps.append(kp) # 递归排序:按kp_code排序 def sort_kp_tree(kp_list): kp_list.sort(key=lambda x: x['kp_code']) for kp in kp_list: if kp['children']: sort_kp_tree(kp['children']) sort_kp_tree(root_kps) # 扁平化列表(用于表格展示,保持层级顺序) # 默认显示:level 0 和 level 1,level 2 及以上默认隐藏 def flatten_tree(kp_list, result=None, level=0): if result is None: result = [] for kp in kp_list: kp['display_level'] = level kp['has_children'] = len(kp['children']) > 0 # level 0 和 level 1 默认显示,level 2 及以上默认隐藏 kp['default_visible'] = level <= 1 result.append(kp) if kp['children']: flatten_tree(kp['children'], result, level + 1) return result knowledge_points = flatten_tree(root_kps) # 获取所有知识点代码和名称,用于下拉选择(显示小学、初中、高中) cursor.execute(""" SELECT kp_code, name FROM knowledge_points_copy1 WHERE grade IN ('小学', '初中', '高中') ORDER BY kp_code ASC """) kp_options = cursor.fetchall() # 统计每个知识点关联的题目数量(统计小学、初中、高中) # questions_tem 表使用 kp_code 字段,关联 knowledge_points_copy1 的 kp_code 字段 cursor.execute(""" SELECT kp.id, kp.kp_code, COUNT(q.question_code) as question_count FROM knowledge_points_copy1 kp LEFT JOIN questions_tem q ON q.kp_code = kp.kp_code WHERE kp.grade IN ('小学', '初中', '高中') GROUP BY kp.id, kp.kp_code """) question_counts_raw = cursor.fetchall() question_counts = {row['kp_code']: row['question_count'] for row in question_counts_raw} # 递归计算父节点的题目数量(包括所有子节点的题目数量) def calculate_total_question_count(kp): # 先计算当前节点直接关联的题目数量 direct_count = question_counts.get(kp['kp_code'], 0) # 递归计算所有子节点的题目数量总和 children_count = 0 if kp['children']: for child in kp['children']: children_count += calculate_total_question_count(child) # 父节点的题目数量 = 直接关联的题目 + 所有子节点的题目总和 total_count = direct_count + children_count kp['question_count'] = total_count kp['direct_question_count'] = direct_count # 保存直接关联的题目数量 return total_count # 计算所有节点的题目数量(包括子节点) for kp in root_kps: calculate_total_question_count(kp) # 创建知识点代码到题目数量的映射(用于扁平列表) kp_code_to_question_count = {} def build_question_count_map(kp_list): for kp in kp_list: kp_code_to_question_count[kp['kp_code']] = kp.get('question_count', 0) if kp.get('children'): build_question_count_map(kp['children']) build_question_count_map(root_kps) # 更新扁平列表的题目数量 for kp in knowledge_points: kp['question_count'] = kp_code_to_question_count.get(kp['kp_code'], 0) # 获取各学段的统计信息(显示小学、初中、高中) cursor.execute(""" SELECT grade, COUNT(*) as count FROM knowledge_points_copy1 WHERE grade IN ('小学', '初中', '高中') GROUP BY grade """) grade_stats_raw = cursor.fetchall() grade_stats = {row['grade']: row['count'] for row in grade_stats_raw} # 显示小学、初中、高中 grade_info = { '小学': {'count': grade_stats.get('小学', 0), 'color': 'from-pink-500 to-rose-600', 'icon': 'ri-book-open-line', 'bg': 'bg-gradient-to-br from-pink-50 to-rose-50'}, '初中': {'count': grade_stats.get('初中', 0), 'color': 'from-blue-500 to-indigo-600', 'icon': 'ri-graduation-cap-line', 'bg': 'bg-gradient-to-br from-blue-50 to-indigo-50'}, '高中': {'count': grade_stats.get('高中', 0), 'color': 'from-purple-500 to-violet-600', 'icon': 'ri-school-line', 'bg': 'bg-gradient-to-br from-purple-50 to-violet-50'} } conn.close() return render_template('kp_management.html', knowledge_points=knowledge_points, kp_tree=root_kps, kp_options=kp_options, grade_info=grade_info) @app.route('/textbook_management') def textbook_management(): """教材管理页面""" try: conn = get_db_connection() except Exception as e: return render_db_error(e) cursor = conn.cursor(dictionary=True) # 获取所有教材系列 cursor.execute("SELECT * FROM textbook_series_copy1 ORDER BY sort_order ASC, id ASC") series_list = cursor.fetchall() # 获取所有教材 cursor.execute("SELECT * FROM textbooks_copy1 ORDER BY series_id ASC, grade ASC, semester ASC") textbooks_list = cursor.fetchall() # 获取所有知识点(用于关联) cursor.execute("SELECT kp_code, name FROM knowledge_points_copy1 ORDER BY kp_code ASC") kp_options = cursor.fetchall() conn.close() return render_template('textbook_management.html', series_list=series_list, textbooks_list=textbooks_list, kp_options=kp_options) # ==================== 教材系列 API ==================== @app.route('/api/textbook/series/create', methods=['POST']) def api_textbook_series_create(): """创建教材系列""" try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) data = request.get_json() name = (data.get('name') or '').strip() slug = (data.get('slug') or '').strip() or None publisher = (data.get('publisher') or '').strip() or None region = (data.get('region') or '').strip() or None is_active = 1 if data.get('is_active') else 0 if not name: return jsonify({'success': False, 'error': '系列名称不能为空'}) cursor.execute(""" INSERT INTO textbook_series_copy1 (name, slug, publisher, region, is_active, created_at, updated_at) VALUES (%s, %s, %s, %s, %s, NOW(), NOW()) """, (name, slug, publisher, region, is_active)) conn.commit() new_id = cursor.lastrowid conn.close() return jsonify({'success': True, 'message': '创建成功', 'id': new_id}) except Exception as e: if conn: conn.rollback() conn.close() return jsonify({'success': False, 'error': str(e)}) @app.route('/api/textbook/series/update/', methods=['POST']) def api_textbook_series_update(series_id): """更新教材系列""" try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) data = request.get_json() name = (data.get('name') or '').strip() slug = (data.get('slug') or '').strip() or None publisher = (data.get('publisher') or '').strip() or None region = (data.get('region') or '').strip() or None is_active = 1 if data.get('is_active') else 0 if not name: return jsonify({'success': False, 'error': '系列名称不能为空'}) cursor.execute(""" UPDATE textbook_series_copy1 SET name = %s, slug = %s, publisher = %s, region = %s, is_active = %s, updated_at = NOW() WHERE id = %s """, (name, slug, publisher, region, is_active, series_id)) conn.commit() conn.close() return jsonify({'success': True, 'message': '更新成功'}) except Exception as e: if conn: conn.rollback() conn.close() return jsonify({'success': False, 'error': str(e)}) @app.route('/api/textbook/series/delete/', methods=['POST']) def api_textbook_series_delete(series_id): """删除教材系列""" try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) # 检查是否有关联的教材 cursor.execute("SELECT COUNT(*) as count FROM textbooks_copy1 WHERE series_id = %s", (series_id,)) result = cursor.fetchone() if result['count'] > 0: return jsonify({'success': False, 'error': '该系列下存在教材,无法删除'}) cursor.execute("DELETE FROM textbook_series_copy1 WHERE id = %s", (series_id,)) conn.commit() conn.close() return jsonify({'success': True, 'message': '删除成功'}) except Exception as e: if conn: conn.rollback() conn.close() return jsonify({'success': False, 'error': str(e)}) @app.route('/api/textbook/series/get/', methods=['GET']) def api_textbook_series_get(series_id): """获取教材系列详情""" try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM textbook_series_copy1 WHERE id = %s", (series_id,)) series = cursor.fetchone() conn.close() if not series: return jsonify({'success': False, 'error': '系列不存在'}) return jsonify({'success': True, 'data': series}) except Exception as e: return jsonify({'success': False, 'error': str(e)}) @app.route('/api/textbook/series/toggle_active/', methods=['POST']) def api_textbook_series_toggle_active(series_id): """切换教材系列激活状态""" try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) # 获取当前状态 cursor.execute("SELECT is_active FROM textbook_series_copy1 WHERE id = %s", (series_id,)) series = cursor.fetchone() if not series: return jsonify({'success': False, 'error': '系列不存在'}) # 切换状态:1变0,0变1 new_status = 1 if series['is_active'] == 0 else 0 cursor.execute(""" UPDATE textbook_series_copy1 SET is_active = %s, updated_at = NOW() WHERE id = %s """, (new_status, series_id)) conn.commit() conn.close() return jsonify({ 'success': True, 'message': '状态已更新', 'is_active': new_status }) except Exception as e: if conn: conn.rollback() conn.close() return jsonify({'success': False, 'error': str(e)}) # ==================== 教材 API ==================== @app.route('/api/textbook/create', methods=['POST']) def api_textbook_create(): """创建教材""" try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) data = request.get_json() series_id = data.get('series_id') official_title = (data.get('official_title') or '').strip() stage = (data.get('stage') or '').strip() or None grade = (data.get('grade') or '').strip() or None semester = data.get('semester') if not series_id: return jsonify({'success': False, 'error': '教材系列不能为空'}) if not official_title: return jsonify({'success': False, 'error': '教材名称不能为空'}) cursor.execute(""" INSERT INTO textbooks_copy1 (series_id, official_title, stage, grade, semester, created_at, updated_at) VALUES (%s, %s, %s, %s, %s, NOW(), NOW()) """, (series_id, official_title, stage, grade, semester)) conn.commit() new_id = cursor.lastrowid conn.close() return jsonify({'success': True, 'message': '创建成功', 'id': new_id}) except Exception as e: if conn: conn.rollback() conn.close() return jsonify({'success': False, 'error': str(e)}) @app.route('/api/textbook/update/', methods=['POST']) def api_textbook_update(textbook_id): """更新教材""" try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) data = request.get_json() official_title = (data.get('official_title') or '').strip() stage = (data.get('stage') or '').strip() or None grade = (data.get('grade') or '').strip() or None semester = data.get('semester') if not official_title: return jsonify({'success': False, 'error': '教材名称不能为空'}) cursor.execute(""" UPDATE textbooks_copy1 SET official_title = %s, stage = %s, grade = %s, semester = %s, updated_at = NOW() WHERE id = %s """, (official_title, stage, grade, semester, textbook_id)) conn.commit() conn.close() return jsonify({'success': True, 'message': '更新成功'}) except Exception as e: if conn: conn.rollback() conn.close() return jsonify({'success': False, 'error': str(e)}) @app.route('/api/textbook/delete/', methods=['POST']) def api_textbook_delete(textbook_id): """删除教材""" try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) # 检查是否有目录节点 cursor.execute("SELECT COUNT(*) as count FROM textbook_catalog_nodes_copy1 WHERE textbook_id = %s", (textbook_id,)) result = cursor.fetchone() if result['count'] > 0: return jsonify({'success': False, 'error': '该教材下存在目录节点,无法删除'}) cursor.execute("DELETE FROM textbooks_copy1 WHERE id = %s", (textbook_id,)) conn.commit() conn.close() return jsonify({'success': True, 'message': '删除成功'}) except Exception as e: if conn: conn.rollback() conn.close() return jsonify({'success': False, 'error': str(e)}) @app.route('/api/textbook/get/', methods=['GET']) def api_textbook_get(textbook_id): """获取教材详情""" try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM textbooks_copy1 WHERE id = %s", (textbook_id,)) textbook = cursor.fetchone() conn.close() if not textbook: return jsonify({'success': False, 'error': '教材不存在'}) return jsonify({'success': True, 'data': textbook}) except Exception as e: return jsonify({'success': False, 'error': str(e)}) @app.route('/api/textbook/list/', methods=['GET']) def api_textbook_list(series_id): """获取指定系列下的所有教材""" try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM textbooks_copy1 WHERE series_id = %s ORDER BY grade ASC, semester ASC", (series_id,)) textbooks = cursor.fetchall() conn.close() return jsonify({'success': True, 'data': textbooks}) except Exception as e: return jsonify({'success': False, 'error': str(e)}) # ==================== 目录节点 API ==================== @app.route('/api/textbook/catalog/create', methods=['POST']) def api_textbook_catalog_create(): """创建目录节点""" try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) data = request.get_json() textbook_id = data.get('textbook_id') parent_id = data.get('parent_id') # 可以为None(顶级节点) node_type = (data.get('node_type') or 'chapter').strip() title = (data.get('title') or '').strip() display_no = (data.get('display_no') or '').strip() or None if not textbook_id: return jsonify({'success': False, 'error': '教材ID不能为空'}) if not title: return jsonify({'success': False, 'error': '节点标题不能为空'}) # 验证节点类型层级关系 if parent_id: # 有父节点,需要验证节点类型 cursor.execute("SELECT node_type FROM textbook_catalog_nodes_copy1 WHERE id = %s", (parent_id,)) parent = cursor.fetchone() if not parent: return jsonify({'success': False, 'error': '父节点不存在'}) parent_node_type = parent['node_type'] # 章节下只能创建 section if parent_node_type == 'chapter' and node_type != 'section': return jsonify({'success': False, 'error': '章节下只能创建小节(section)'}) # section 下只能创建 subsection elif parent_node_type == 'section' and node_type != 'subsection': return jsonify({'success': False, 'error': '小节下只能创建子小节(subsection)'}) # subsection 下不能再创建子节点 elif parent_node_type == 'subsection': return jsonify({'success': False, 'error': '子小节下不能再创建子节点'}) else: # 没有父节点,只能创建顶级节点(chapter) if node_type != 'chapter': return jsonify({'success': False, 'error': '顶级节点只能是章节(chapter)'}) # 计算depth depth = 1 if parent_id: cursor.execute("SELECT depth FROM textbook_catalog_nodes_copy1 WHERE id = %s", (parent_id,)) parent_depth_result = cursor.fetchone() if parent_depth_result: depth = parent_depth_result['depth'] + 1 # 查询最大id并手动递增(因为id字段可能不是auto_increment) cursor.execute("SELECT MAX(id) as max_id FROM textbook_catalog_nodes_copy1") max_id_result = cursor.fetchone() new_id = (max_id_result['max_id'] or 0) + 1 cursor.execute(""" INSERT INTO textbook_catalog_nodes_copy1 (id, textbook_id, parent_id, node_type, title, display_no, depth, sort_order, created_at, updated_at) VALUES (%s, %s, %s, %s, %s, %s, %s, 0, NOW(), NOW()) """, (new_id, textbook_id, parent_id, node_type, title, display_no, depth)) conn.commit() conn.close() return jsonify({'success': True, 'message': '创建成功', 'id': new_id}) except Exception as e: if conn: conn.rollback() conn.close() return jsonify({'success': False, 'error': str(e)}) @app.route('/api/textbook/catalog/update/', methods=['POST']) def api_textbook_catalog_update(node_id): """更新目录节点""" try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) data = request.get_json() title = (data.get('title') or '').strip() display_no = (data.get('display_no') or '').strip() or None node_type = (data.get('node_type') or '').strip() or None if not title: return jsonify({'success': False, 'error': '节点标题不能为空'}) # 如果更新了节点类型,需要验证是否符合层级关系 if node_type: # 获取当前节点的父节点信息 cursor.execute("SELECT parent_id FROM textbook_catalog_nodes_copy1 WHERE id = %s", (node_id,)) current_node = cursor.fetchone() if current_node and current_node.get('parent_id'): parent_id = current_node['parent_id'] cursor.execute("SELECT node_type FROM textbook_catalog_nodes_copy1 WHERE id = %s", (parent_id,)) parent = cursor.fetchone() if parent: parent_node_type = parent['node_type'] # 验证节点类型层级关系 if parent_node_type == 'chapter' and node_type != 'section': return jsonify({'success': False, 'error': '章节下只能创建小节(section)'}) elif parent_node_type == 'section' and node_type != 'subsection': return jsonify({'success': False, 'error': '小节下只能创建子小节(subsection)'}) elif parent_node_type == 'subsection': return jsonify({'success': False, 'error': '子小节下不能再创建子节点'}) else: # 没有父节点,只能是顶级节点(chapter) if node_type != 'chapter': return jsonify({'success': False, 'error': '顶级节点只能是章节(chapter)'}) cursor.execute(""" UPDATE textbook_catalog_nodes_copy1 SET title = %s, display_no = %s, node_type = COALESCE(%s, node_type), updated_at = NOW() WHERE id = %s """, (title, display_no, node_type, node_id)) conn.commit() conn.close() return jsonify({'success': True, 'message': '更新成功'}) except Exception as e: if conn: conn.rollback() conn.close() return jsonify({'success': False, 'error': str(e)}) @app.route('/api/textbook/catalog/delete/', methods=['POST']) def api_textbook_catalog_delete(node_id): """删除目录节点""" try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) # 检查是否有子节点 cursor.execute("SELECT COUNT(*) as count FROM textbook_catalog_nodes_copy1 WHERE parent_id = %s", (node_id,)) result = cursor.fetchone() if result['count'] > 0: return jsonify({'success': False, 'error': '该节点下存在子节点,无法删除'}) # 检查是否有关联的知识点 cursor.execute("SELECT COUNT(*) as count FROM textbook_chapter_knowledge_relation_copy1 WHERE catalog_chapter_id = %s AND is_deleted = 0", (node_id,)) result = cursor.fetchone() if result['count'] > 0: return jsonify({'success': False, 'error': '该节点下存在关联的知识点,无法删除'}) cursor.execute("DELETE FROM textbook_catalog_nodes_copy1 WHERE id = %s", (node_id,)) conn.commit() conn.close() return jsonify({'success': True, 'message': '删除成功'}) except Exception as e: if conn: conn.rollback() conn.close() return jsonify({'success': False, 'error': str(e)}) @app.route('/api/textbook/catalog/get/', methods=['GET']) def api_textbook_catalog_get(node_id): """获取目录节点详情""" try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM textbook_catalog_nodes_copy1 WHERE id = %s", (node_id,)) node = cursor.fetchone() conn.close() if not node: return jsonify({'success': False, 'error': '节点不存在'}) return jsonify({'success': True, 'data': node}) except Exception as e: return jsonify({'success': False, 'error': str(e)}) @app.route('/api/textbook/catalog/tree/', methods=['GET']) def api_textbook_catalog_tree(textbook_id): """获取教材的目录树""" try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) # 获取所有节点 cursor.execute(""" SELECT * FROM textbook_catalog_nodes_copy1 WHERE textbook_id = %s ORDER BY depth ASC, sort_order ASC, id ASC """, (textbook_id,)) all_nodes = cursor.fetchall() # 构建树形结构 node_dict = {node['id']: node for node in all_nodes} root_nodes = [] for node in all_nodes: node['children'] = [] if node['parent_id'] and node['parent_id'] in node_dict: parent = node_dict[node['parent_id']] parent['children'].append(node) else: root_nodes.append(node) conn.close() return jsonify({'success': True, 'data': root_nodes}) except Exception as e: return jsonify({'success': False, 'error': str(e)}) # ==================== 章节-知识点关联 API ==================== @app.route('/api/textbook/relation/create', methods=['POST']) def api_textbook_relation_create(): """创建章节-知识点关联""" try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) data = request.get_json() catalog_chapter_id = data.get('catalog_chapter_id') kp_code = (data.get('kp_code') or '').strip() if not catalog_chapter_id: return jsonify({'success': False, 'error': '章节ID不能为空'}) if not kp_code: return jsonify({'success': False, 'error': '知识点代码不能为空'}) # 检查是否已存在 cursor.execute(""" SELECT id FROM textbook_chapter_knowledge_relation_copy1 WHERE catalog_chapter_id = %s AND kp_code = %s AND is_deleted = 0 """, (catalog_chapter_id, kp_code)) if cursor.fetchone(): return jsonify({'success': False, 'error': '该关联已存在'}) cursor.execute(""" INSERT INTO textbook_chapter_knowledge_relation_copy1 (catalog_chapter_id, kp_code, is_deleted, gmt_create, gmt_modified) VALUES (%s, %s, 0, NOW(), NOW()) """, (catalog_chapter_id, kp_code)) conn.commit() new_id = cursor.lastrowid conn.close() return jsonify({'success': True, 'message': '创建成功', 'id': new_id}) except Exception as e: if conn: conn.rollback() conn.close() return jsonify({'success': False, 'error': str(e)}) @app.route('/api/textbook/relation/delete/', methods=['POST']) def api_textbook_relation_delete(relation_id): """删除章节-知识点关联(软删除)""" try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute(""" UPDATE textbook_chapter_knowledge_relation_copy1 SET is_deleted = 1, gmt_modified = NOW() WHERE id = %s """, (relation_id,)) conn.commit() conn.close() return jsonify({'success': True, 'message': '删除成功'}) except Exception as e: if conn: conn.rollback() conn.close() return jsonify({'success': False, 'error': str(e)}) @app.route('/api/textbook/relation/list/', methods=['GET']) def api_textbook_relation_list(catalog_chapter_id): """获取章节关联的知识点列表""" try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute(""" SELECT tckr.*, kp.name as kp_name FROM textbook_chapter_knowledge_relation_copy1 tckr LEFT JOIN knowledge_points_copy1 kp ON kp.kp_code = tckr.kp_code WHERE tckr.catalog_chapter_id = %s AND tckr.is_deleted = 0 ORDER BY tckr.id ASC """, (catalog_chapter_id,)) relations = cursor.fetchall() conn.close() return jsonify({'success': True, 'data': relations}) except Exception as e: return jsonify({'success': False, 'error': str(e)}) @app.route('/material_management') def material_management(): """资料管理页面""" return render_template('material_management.html') @app.route('/add_question') def add_question_page(): """显示录入新题目页面""" # 从URL参数获取层级信息 kp_code = request.args.get('kp_code') chapter_id = request.args.get('chapter') chapter_label = request.args.get('chapter_label', '') section_id = request.args.get('section') section_label = request.args.get('section_label', '') subsection_id = request.args.get('subsection') subsection_label = request.args.get('subsection_label', '') # 解码URL编码的标签 if chapter_label: chapter_label = urllib.parse.unquote(chapter_label) if section_label: section_label = urllib.parse.unquote(section_label) if subsection_label: subsection_label = urllib.parse.unquote(subsection_label) return render_template('add_question.html', kp_code=kp_code, chapter_id=chapter_id, chapter_label=chapter_label, section_id=section_id, section_label=section_label, subsection_id=subsection_id, subsection_label=subsection_label) @app.route('/edit/') def edit_page(question_code): try: conn = get_db_connection() except Exception as e: return render_db_error(e) cursor = conn.cursor(dictionary=True) # 使用别名 kp_id AS kp_code 以兼容模板代码 cursor.execute("SELECT *, kp_id AS kp_code FROM questions_tem WHERE question_code = %s", (question_code,)) question = cursor.fetchone() conn.close() return render_template('edit.html', q=question) @app.route('/update_question', methods=['POST']) def update_question(): data = request.json q_code = data.get('question_code') if not q_code: return jsonify({'success': False, 'error': '缺少 question_code'}), 400 try: conn = get_db_connection() cursor = conn.cursor() # 只更新用户实际提供的字段(如果字段在 data 中存在,就更新) updates = [] params = [] if 'stem' in data: updates.append("stem=%s") params.append(data.get('stem', '')) if 'options' in data: # 如果 options 是空字符串,保存为 NULL # 如果 options 是无效 JSON(如 "Invalid value."),也保存为 NULL,避免数据库报错 options_value = data.get('options') if options_value == '' or (isinstance(options_value, str) and options_value.strip().lower() in ['invalid value.', 'null', 'none']): options_value = None updates.append("options=%s") params.append(options_value) if 'answer' in data: updates.append("answer=%s") params.append(data.get('answer', '')) if 'solution' in data: updates.append("solution=%s") params.append(data.get('solution', '')) if 'question_type' in data: updates.append("question_type=%s") params.append(data.get('question_type', '')) if 'kp_code' in data: # 前端参数名是 kp_code,但数据库字段是 kp_id updates.append("kp_id=%s") params.append(data.get('kp_code')) if 'difficulty' in data: # 处理 difficulty 字段:转换为浮点数,保留两位小数 difficulty_value = data.get('difficulty') if difficulty_value is not None and difficulty_value != '': try: difficulty_value = round(float(difficulty_value), 2) updates.append("difficulty=%s") params.append(difficulty_value) except (ValueError, TypeError): pass # 如果转换失败,跳过该字段 if not updates: conn.close() return jsonify({'success': False, 'error': '没有提供要更新的字段'}), 400 # 添加 updated_at 时间戳 updates.append("updated_at=NOW()") # 添加 WHERE 条件 params.append(q_code) query = f"UPDATE questions_tem SET {', '.join(updates)} WHERE question_code=%s" cursor.execute(query, tuple(params)) conn.commit() conn.close() return jsonify({'success': True}) except Exception as e: return jsonify({'success': False, 'error': str(e)}) @app.route('/create_question', methods=['POST']) def create_question(): """创建新题目""" data = request.json try: conn = get_db_connection() cursor = conn.cursor() # 自动生成唯一的 question_code # 格式:Q + 年月日时分秒 + 3位随机数,例如:Q20250101120045123 max_attempts = 10 # 最多尝试10次生成唯一编号 q_code = None for _ in range(max_attempts): timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S') random_suffix = random.randint(100, 999) # 3位随机数 q_code = f"Q{timestamp}{random_suffix}" # 检查是否已存在 cursor.execute("SELECT question_code FROM questions_tem WHERE question_code = %s", (q_code,)) if not cursor.fetchone(): break # 找到唯一编号 q_code = None if not q_code: conn.close() return jsonify({'success': False, 'error': '无法生成唯一的题号,请稍后重试'}), 500 # 构建插入字段和值 fields = [] values = [] placeholders = [] # 必填字段:question_code(自动生成) fields.append('question_code') values.append(q_code) placeholders.append('%s') # 可选字段(前端参数 kp_code 保存到数据库的 kp_code 字段) optional_fields = { 'stem': 'stem', 'options': 'options', 'answer': 'answer', 'solution': 'solution', 'question_type': 'question_type', 'kp_code': 'kp_code', # 前端参数名 kp_code,保存到数据库的 kp_code 字段(与查询保持一致) 'textbook_catalog_nodes_id': 'textbook_catalog_nodes_id', 'chapter': 'title_1', # chapter 映射到 title_1 'section': 'title_2', # section 映射到 title_2 'subsection': 'title_3', # subsection 映射到 title_3 'difficulty': 'difficulty', # 难度字段 'create_by': 'create_by', # 创建者字段 'grade': 'grade', # 年级字段:1=小学,2=初中,3=高中 } for key, field_name in optional_fields.items(): if key in data and data[key] is not None: value = data[key] # 处理 options:如果是空字符串,设为 None if key == 'options' and (value == '' or (isinstance(value, str) and value.strip().lower() in ['null', 'none'])): value = None # 处理 textbook_catalog_nodes_id:转换为整数 elif key == 'textbook_catalog_nodes_id' and value: try: value = int(value) except (ValueError, TypeError): value = None # 处理 difficulty:转换为浮点数,保留两位小数 elif key == 'difficulty' and value: try: value = round(float(value), 2) except (ValueError, TypeError): value = None # 处理 grade:转换为整数(1=小学,2=初中,3=高中) elif key == 'grade' and value: try: value = int(value) # 验证年级值是否在有效范围内 if value not in [1, 2, 3]: value = None except (ValueError, TypeError): value = None if value is not None and value != '': fields.append(field_name) values.append(value) placeholders.append('%s') # 验证:所有题目都必须有年级字段 has_grade = 'grade' in fields if not has_grade: conn.close() return jsonify({'success': False, 'error': '所有题目都必须选择年级'}), 400 # 添加时间戳字段 fields.append('created_at') fields.append('updated_at') values.append(datetime.datetime.now()) values.append(datetime.datetime.now()) placeholders.append('%s') placeholders.append('%s') # 执行插入 if len(fields) == 3: # 只有 question_code, created_at, updated_at conn.close() return jsonify({'success': False, 'error': '至少需要填写题号以外的其他字段'}), 400 query = f"INSERT INTO questions_tem ({', '.join(fields)}) VALUES ({', '.join(placeholders)})" cursor.execute(query, tuple(values)) # 获取插入的题目ID question_id = cursor.lastrowid conn.commit() conn.close() return jsonify({'success': True, 'question_code': q_code, 'question_id': question_id}) except Exception as e: try: conn.rollback() except: pass try: conn.close() except: pass return jsonify({'success': False, 'error': str(e)}) @app.route('/api/question_by_id/') def api_question_by_id(question_id): """根据题目ID返回题目详情(用于查重比对)""" try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) # 查询题目详情 cursor.execute(""" SELECT id, question_code, stem, kp_code, kp_id, audit_reason, audit_status, difficulty, question_type, answer, solution, options FROM questions_tem WHERE id = %s """, (question_id,)) question = cursor.fetchone() conn.close() if not question: return jsonify({'success': False, 'error': '题目不存在'}), 404 # 处理选项JSON if question.get('options'): try: opt_data = question['options'] if isinstance(opt_data, str): opt_data = opt_data.replace("'", '"') question['options'] = json.loads(opt_data) except: question['options'] = {} return jsonify({'success': True, 'question': question}) except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/check_duplicate', methods=['POST', 'OPTIONS']) def api_check_duplicate(): """代理查重检测接口,解决CORS问题""" if request.method == 'OPTIONS': # 处理CORS预检请求 response = jsonify({}) response.headers.add('Access-Control-Allow-Origin', '*') response.headers.add('Access-Control-Allow-Methods', 'POST, OPTIONS') response.headers.add('Access-Control-Allow-Headers', 'Content-Type') return response try: # 获取请求数据 data = request.get_json() # 转发请求到查重服务 import urllib.request import urllib.parse url = 'http://47.77.199.85:8888/api/check_duplicate' req_data = json.dumps(data).encode('utf-8') req = urllib.request.Request( url, data=req_data, headers={ 'Content-Type': 'application/json', 'Cookie': request.headers.get('Cookie', 'MATH-LOGIN-AUTH={{authToken}}'), 'MATH-DEBUG': request.headers.get('MATH-DEBUG', '1'), 'MATH-UID': request.headers.get('MATH-UID', '12') } ) with urllib.request.urlopen(req, timeout=30) as response: result = json.loads(response.read().decode('utf-8')) # 返回结果,添加CORS头 flask_response = jsonify(result) flask_response.headers.add('Access-Control-Allow-Origin', '*') return flask_response except urllib.error.HTTPError as e: error_body = e.read().decode('utf-8') if e.fp else 'Unknown error' return jsonify({'code': -1, 'error': f'查重服务错误: {e.code} - {error_body}'}), e.code except Exception as e: return jsonify({'code': -1, 'error': f'查重检测失败: {str(e)}'}), 500 @app.route('/api/confirm_repeat', methods=['POST', 'OPTIONS']) def api_confirm_repeat(): """代理确认查重结果接口,解决CORS问题""" if request.method == 'OPTIONS': # 处理CORS预检请求 response = jsonify({}) response.headers.add('Access-Control-Allow-Origin', '*') response.headers.add('Access-Control-Allow-Methods', 'POST, OPTIONS') response.headers.add('Access-Control-Allow-Headers', 'Content-Type') return response try: # 获取请求数据 data = request.get_json() # 转发请求到查重服务 import urllib.request url = 'http://47.77.199.85:8888/api/confirm_repeat' req_data = json.dumps(data).encode('utf-8') req = urllib.request.Request( url, data=req_data, headers={ 'Content-Type': 'application/json', 'Cookie': request.headers.get('Cookie', 'MATH-LOGIN-AUTH={{authToken}}'), 'MATH-DEBUG': request.headers.get('MATH-DEBUG', '1'), 'MATH-UID': request.headers.get('MATH-UID', '12') } ) with urllib.request.urlopen(req, timeout=30) as response: result = json.loads(response.read().decode('utf-8')) # 返回结果,添加CORS头 flask_response = jsonify(result) flask_response.headers.add('Access-Control-Allow-Origin', '*') return flask_response except urllib.error.HTTPError as e: error_body = e.read().decode('utf-8') if e.fp else 'Unknown error' return jsonify({'code': -1, 'error': f'查重服务错误: {e.code} - {error_body}'}), e.code except Exception as e: return jsonify({'code': -1, 'error': f'确认查重结果失败: {str(e)}'}), 500 @app.route('/api/delete_question/', methods=['POST']) def delete_question(question_code): """ 删除题目(需要确认),返回下一题的 question_code(用于跳转) """ try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) # 先获取题目信息(包括知识点和教材节点,使用 kp_id 字段) cursor.execute("SELECT question_code, kp_id, textbook_catalog_nodes_id FROM questions_tem WHERE question_code = %s", (question_code,)) row = cursor.fetchone() if not row: conn.close() return jsonify({'success': False, 'error': '题目不存在'}), 404 kp_code = row.get('kp_id') # 数据库字段已改为 kp_id textbook_node_id = row.get('textbook_catalog_nodes_id') # 获取同分类下的所有题目(按 question_code 降序排列) all_codes = [] if kp_code: # 有知识点:在同知识点内查找 cursor.execute("SELECT question_code FROM questions_tem WHERE kp_id = %s ORDER BY question_code DESC", (kp_code,)) all_codes = [r['question_code'] for r in cursor.fetchall()] elif textbook_node_id: # 有教材节点:在同教材节点内查找 cursor.execute( "SELECT question_code FROM questions_tem WHERE textbook_catalog_nodes_id = %s ORDER BY question_code DESC", (textbook_node_id,) ) all_codes = [r['question_code'] for r in cursor.fetchall()] # 找到当前题目在列表中的位置 try: curr_idx = all_codes.index(question_code) except ValueError: curr_idx = -1 # 删除题目 cursor.execute("DELETE FROM questions_tem WHERE question_code = %s", (question_code,)) conn.commit() # 确定跳转目标:优先下一题,没有则上一题,都没有则返回 None(前端跳转列表) next_code = None if curr_idx >= 0 and curr_idx < len(all_codes) - 1: # 有下一题 next_code = all_codes[curr_idx + 1] elif curr_idx > 0: # 没有下一题,但有上一题 next_code = all_codes[curr_idx - 1] conn.close() return jsonify({ 'success': True, 'message': '题目已删除', 'next_code': next_code, 'kp_code': kp_code, 'node_id': str(textbook_node_id) if textbook_node_id else None }) except Exception as e: try: conn.rollback() except Exception: pass return jsonify({'success': False, 'error': str(e)}), 500 finally: try: conn.close() except Exception: pass # 知识点管理 API(knowledge_points_copy1 表) @app.route('/api/kp/create', methods=['POST']) def api_kp_create(): """创建知识点""" try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) data = request.get_json() kp_code = (data.get('kp_code') or '').strip() name = (data.get('name') or '').strip() subject = (data.get('subject') or '').strip() or None grade = (data.get('grade') or '').strip() or None parent_kp_code = (data.get('parent_kp_code') or '').strip() or None if not kp_code or not name: return jsonify({'success': False, 'error': '知识点代码和名称不能为空'}) # 检查kp_code是否已存在 cursor.execute("SELECT id FROM knowledge_points_copy1 WHERE kp_code = %s", (kp_code,)) if cursor.fetchone(): return jsonify({'success': False, 'error': '知识点代码已存在'}) # 查询最大id并手动递增(确保id正确自增) cursor.execute("SELECT MAX(id) as max_id FROM knowledge_points_copy1") max_id_result = cursor.fetchone() new_id = (max_id_result['max_id'] or 0) + 1 # 插入新知识点 cursor.execute(""" INSERT INTO knowledge_points_copy1 (id, kp_code, name, subject, grade, parent_kp_code, created_at, updated_at) VALUES (%s, %s, %s, %s, %s, %s, NOW(), NOW()) """, (new_id, kp_code, name, subject, grade, parent_kp_code)) conn.commit() conn.close() return jsonify({'success': True, 'message': '创建成功'}) except Exception as e: if conn: conn.rollback() conn.close() return jsonify({'success': False, 'error': str(e)}) @app.route('/api/kp/update/', methods=['POST']) def api_kp_update(kp_id): """更新知识点""" try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) data = request.get_json() name = (data.get('name') or '').strip() subject = (data.get('subject') or '').strip() or None grade = (data.get('grade') or '').strip() or None parent_kp_code = (data.get('parent_kp_code') or '').strip() or None if not name: return jsonify({'success': False, 'error': '知识点名称不能为空'}) # 更新知识点 cursor.execute(""" UPDATE knowledge_points_copy1 SET name = %s, subject = %s, grade = %s, parent_kp_code = %s, updated_at = NOW() WHERE id = %s """, (name, subject, grade, parent_kp_code, kp_id)) conn.commit() conn.close() return jsonify({'success': True, 'message': '更新成功'}) except Exception as e: if conn: conn.rollback() conn.close() return jsonify({'success': False, 'error': str(e)}) @app.route('/api/kp/delete/', methods=['POST']) def api_kp_delete(kp_id): """删除知识点""" try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) # 检查是否有子知识点 cursor.execute("SELECT id FROM knowledge_points_copy1 WHERE parent_kp_code = (SELECT kp_code FROM knowledge_points_copy1 WHERE id = %s)", (kp_id,)) if cursor.fetchone(): return jsonify({'success': False, 'error': '该知识点下存在子知识点,无法删除'}) # 删除知识点 cursor.execute("DELETE FROM knowledge_points_copy1 WHERE id = %s", (kp_id,)) conn.commit() conn.close() return jsonify({'success': True, 'message': '删除成功'}) except Exception as e: if conn: conn.rollback() conn.close() return jsonify({'success': False, 'error': str(e)}) @app.route('/api/kp/get/', methods=['GET']) def api_kp_get(kp_id): """获取单个知识点详情""" try: conn = get_db_connection() cursor = conn.cursor(dictionary=True) cursor.execute(""" SELECT * FROM knowledge_points_copy1 WHERE id = %s """, (kp_id,)) kp = cursor.fetchone() conn.close() if not kp: return jsonify({'success': False, 'error': '知识点不存在'}) # 处理JSON字段 for field in ['prerequisite_kp_codes', 'dependent_kp_codes', 'related_kp_codes', 'stats', 'skills', 'direct_score', 'related_score']: if kp[field] and isinstance(kp[field], str): try: kp[field] = json.loads(kp[field]) except: kp[field] = None return jsonify({'success': True, 'data': kp}) except Exception as e: return jsonify({'success': False, 'error': str(e)}) if __name__ == '__main__': # 自动创建缺失列 try: conn = get_db_connection() cursor = conn.cursor() for field, ftype in [('audit_status', 'TINYINT DEFAULT 0'), ('audit_reason', 'TEXT')]: cursor.execute(f"SHOW COLUMNS FROM questions_tem LIKE '{field}'") if not cursor.fetchone(): cursor.execute(f"ALTER TABLE questions_tem ADD COLUMN {field} {ftype}") conn.commit() conn.close() except: pass # Ensure static directory exists and copy cat images (if exists in root directory) try: static_dir = resource_path("static") if not os.path.exists(static_dir): os.makedirs(static_dir, exist_ok=True) import shutil # Prefer PNG format, fallback to JPG if PNG doesn't exist (for backward compatibility) cat_img_src_png = os.path.join(BASE_DIR, "cat.png") cat_img_dst_png = os.path.join(static_dir, "cat.png") cat_img_src_jpg = os.path.join(BASE_DIR, "cat.jpg") cat_img_dst_jpg = os.path.join(static_dir, "cat.jpg") if os.path.exists(cat_img_src_png) and not os.path.exists(cat_img_dst_png): shutil.copy2(cat_img_src_png, cat_img_dst_png) elif os.path.exists(cat_img_src_jpg) and not os.path.exists(cat_img_dst_jpg): shutil.copy2(cat_img_src_jpg, cat_img_dst_jpg) except Exception: pass # 如果复制失败,不影响启动 app.run(host="0.0.0.0", port=5000, debug=True)