||
- import os
- import sys
- import json
- import re
- import tempfile
- import urllib.request
- import urllib.error
- import urllib.parse
- import mysql.connector
- import datetime
- import random
- import base64
- import socket
- from typing import Optional, List
- from flask import Flask, render_template, request, jsonify, url_for, send_file, redirect
- from html.parser import HTMLParser
- from urllib.parse import urljoin, urlparse
- BASE_DIR = os.path.dirname(os.path.abspath(__file__))
- def _load_dotenv_if_exists():
- """
- 允许把数据库/AI 等配置放在同目录 config.env 里(不改代码即可生效)。
- 重要:需要在读取 os.getenv(...) 之前执行。
- """
- try:
- from dotenv import load_dotenv # type: ignore
- except Exception:
- return
- # 兼容源码运行 / PyInstaller 打包运行
- candidates = [
- os.path.join(BASE_DIR, "config.env"),
- os.path.join(os.path.dirname(os.path.abspath(sys.argv[0])), "config.env"),
- ]
- for p in candidates:
- if os.path.exists(p):
- load_dotenv(p, override=True)
- break
- _load_dotenv_if_exists()
- def resource_path(*parts: str) -> str:
- """
- 资源路径兼容:
- - 源码运行:基于当前文件目录
- - PyInstaller 打包:基于 _MEIPASS
- """
- base = getattr(sys, "_MEIPASS", BASE_DIR)
- return os.path.join(base, *parts)
- app = Flask(
- __name__,
- template_folder=resource_path("templates"),
- static_folder=resource_path("static"),
- )
- # 可能的主键列名(不同库/表可能不一致)
- _PK_CANDIDATE_COLS = ["id", "question_id", "pk_id", "qid"]
- _PK_COLS_CACHE: Optional[List[str]] = None
- # 数据库配置
- # 默认值:按你刚刚给的 JDBC 配置来(也支持用环境变量覆盖)
- DB_CONFIG = {
- "host": os.getenv("DB_HOST", "rm-f8ze60yirdj8786u2wo.mysql.rds.aliyuncs.com"),
- "port": int(os.getenv("DB_PORT", "3306")),
- "database": os.getenv("DB_DATABASE", "math-conten-online2"),
- "user": os.getenv("DB_USERNAME", "root"),
- "password": os.getenv("DB_PASSWORD", "csqz@20255"),
- "charset": "utf8mb4",
- }
- # 远程 PDF 生成接口(你提供的)
- PDF_API_URL = os.getenv("PDF_API_URL", "https://teaching-content.chunsunqiuzhu.com/api/questions/pdf")
- PDF_STUDENT_ID = os.getenv("PDF_STUDENT_ID", "44") # 默认写死 44(也支持用环境变量覆盖)
- # AI 优化题干(通过环境变量配置,避免把 key 写进代码)
- AI_API_KEY = os.getenv("AI_API_KEY", "").strip()
- AI_BASE_URL = (os.getenv("AI_BASE_URL") or "").strip() # 为空则使用官方默认
- AI_MODEL_NAME = (os.getenv("AI_MODEL_NAME") or "gpt-5.2").strip()
- # 难度评分配置(使用相同的 AI 配置)
- DIFFICULTY_SCORING_API_KEY = os.getenv("DIFFICULTY_SCORING_API_KEY", AI_API_KEY).strip()
- DIFFICULTY_SCORING_MODEL = os.getenv("DIFFICULTY_SCORING_MODEL", AI_MODEL_NAME).strip()
- DIFFICULTY_SCORING_TEMPERATURE = float(os.getenv("DIFFICULTY_SCORING_TEMPERATURE", "0.0"))
- DIFFICULTY_SCORING_TOP_P = float(os.getenv("DIFFICULTY_SCORING_TOP_P", "0.3"))
- DIFFICULTY_SCORING_PRESENCE_PENALTY = float(os.getenv("DIFFICULTY_SCORING_PRESENCE_PENALTY", "0.0"))
- DIFFICULTY_SCORING_FREQUENCY_PENALTY = float(os.getenv("DIFFICULTY_SCORING_FREQUENCY_PENALTY", "0.0"))
- # 默认难度评分提示词
- DIFFICULTY_SCORING_PROMPT = """你是一名数学题库难度建模专家。你的任务不是解题,而是基于题目文本、公式及图像信息,对题目难度进行结构化量化评估,用于企业级题库难度算法。本评估包含四个评分维度,四个维度等权重参与最终得分,但在判断时需遵循重要性顺序:推理与运算步数最高,知识点数量第二,抽象与构造要求第三,题型常规性最低。每个维度的取值只能是 0.33、0.66 或 1.00,不允许出现其他数值。(1)推理与运算步数:直接或少量步骤取 0.33,中等连续步骤取 0.66,多步链式推理或存在中间结论取 1.00;(2)知识点数量:单一核心知识点取 0.33,两个知识点取 0.66,三个及以上或跨模块综合取 1.00;(3)抽象与构造要求:无需抽象或构造取 0.33,需要一定抽象或简单构造取 0.66,需要明显构造或较高层次建模取 1.00;(4)题型常规性:高度常规模板题取 0.33,存在一定变式取 0.66,非典型或创新题型取 1.00。四个维度得分相加后需归一化为 0~1 的 total_score,并按以下规则映射难度等级:0~0.25 为"筑基",0.25~0.5 为"提分",0.5 以上为"培优"。请仅以严格 JSON 格式输出 dimension_scores、total_score 和 difficulty_level,不得输出其他字段,不得展开解题过程。"""
- AI_STEM_PROMPT_DEFAULT = r"""你是“题干格式修复器”,不是解题老师。
- 你的任务:把输入题干(stem)做最小幅度的格式清洗,使其更适合 PDF / 系统展示。
- 【核心原则:原文已兼容就不要改】(最重要,违反此条视为失败)
- - 如果原文已经是 PDF 兼容格式(例如:数字、单位、上标³、普通文本混合),就不要做任何改动。
- - **严禁添加**:反斜杠 `\`、逗号 `,`、LaTeX 命令(如 `\,`、`\n`、`^3` 等)、换行符 `\n` 等原本不存在的内容。
- - **严禁改动**:不要把上标³改成 `^3`,不要把普通文本改成 LaTeX 格式,不要把 Unicode 上标改成 LaTeX 上标。
- - **严禁添加换行**:不要添加 `\n` 或任何换行符,保持原文的换行格式不变。如果原文是连续文本,输出也必须是连续文本。
- 【严禁解题/严禁新增内容】(非常重要)
- - 不要解题:不要推导、不要解释、不要补充答案、不要给出步骤、不要增加任何原文中不存在的内容。
- - 不要把题干改写成解析/答案。
- 【必须保留】(非常重要)
- 1) 保留原题干的结构与排版:不调整顺序、不合并/拆分段落、不新增/删除句子。
- 2) 保留所有 HTML 标签与结构原样不动(例如 <p>、<br>、<span> 等)。
- 3) 题干中出现的所有 <svg>...</svg> 片段必须在输出中**逐字复制**(字符、空格、换行、属性顺序、大小写都必须完全一致),不得省略、重排、格式化。
- 4) **保留上标格式**:如果原文是 `dm³`、`cm³`、`m²` 等 Unicode 上标,必须保持原样,严禁改成 `dm^3`、`cm^3`、`m^2` 等 LaTeX 格式。
- 5) **保留单位格式**:如果原文是 `2.5 dm³` 这种格式(数字+空格+单位),保持原样,不要添加 `\,`、`\n` 等 LaTeX 命令。
- 【允许的唯一改动】(只允许做下面这些替换;除此之外任何字符都不要改)
- 6) 公式包裹:对行内公式 `$...$`,仅去掉外层 `$`,并保留内部内容的原顺序与原字符(不要删改里面的数字/变量/符号)。
- 7) 符号替换(只替换命中的这些 LaTeX 命令,其它所有内容保持原样):
- - \triangle → △
- - \text{cm} → cm
- - \text{cm²} → cm²
- - \neq → !=
- - \ge → ≥
- - \le → ≤
- - \Rightarrow => =>
- - \sqrt → sqrt
- - \perp → ⟂
- 8) 填空线:把所有“连续下划线填空线”(长度不固定)统一成:________(8 个下划线)。
- 9) **问号填空**:把题干中的问号 `?`(用于表示填空)替换成:________(8 个下划线)。注意:不要删除问号,要替换成填空线。
- 【输出格式】
- 10) 输出必须是一段“可直接写回 stem 字段”的文本/HTML。
- 11) 只输出结果,不要加任何解释、标题、Markdown 标记(不要使用 **、列表编号等)。
- 下面是题干原文,请按规则输出清洗后的题干:
- """
- _env_prompt = os.getenv("AI_STEM_PROMPT")
- # 注意:如果环境变量存在但为空(例如 AI_STEM_PROMPT=),os.getenv 会返回空字符串,
- # 这会把默认提示词覆盖掉,导致 AI 没有收到规则。这里显式做“空值回退默认”。
- AI_STEM_PROMPT = (_env_prompt.strip() if isinstance(_env_prompt, str) else "").strip() or AI_STEM_PROMPT_DEFAULT
- _RE_SVG_BLOCK = re.compile(r"<svg\b[\s\S]*?</svg>", re.IGNORECASE)
- def _env_bool(name: str, default: bool) -> bool:
- val = os.getenv(name)
- if val is None:
- return default
- return str(val).strip().lower() in {"1", "true", "yes", "y", "on"}
- def get_db_connection():
- """
- use_pure=True:避免 C 扩展在部分环境抛出 “RuntimeError: Failed raising error.”
- connection_timeout:避免卡死
- ssl:默认按 JDBC 里的 useSSL=true 走;如果你本地/网络环境不支持,也可以用环境变量关闭
- DB_USE_SSL=false
- """
- use_ssl = _env_bool("DB_USE_SSL", True)
- try:
- return mysql.connector.connect(
- **DB_CONFIG,
- use_pure=True,
- connection_timeout=5,
- ssl_disabled=(not use_ssl),
- )
- except Exception:
- # 兜底:有些环境 SSL 握手会失败,尝试降级不启用 SSL,让你至少能先用起来
- if use_ssl:
- return mysql.connector.connect(
- **DB_CONFIG,
- use_pure=True,
- connection_timeout=5,
- ssl_disabled=True,
- )
- raise
- def render_db_error(e: Exception):
- return render_template(
- "db_error.html",
- error=str(e),
- db_host=DB_CONFIG.get("host"),
- db_port=DB_CONFIG.get("port"),
- db_name=DB_CONFIG.get("database"),
- db_user=DB_CONFIG.get("user"),
- ), 500
- def get_pk_columns(conn) -> List[str]:
- """
- 获取 questions_tem 表里"可能的主键列名"中实际存在的列。
- 只要命中一个就能用主键 ID 搜索跳转。
- """
- global _PK_COLS_CACHE
- if _PK_COLS_CACHE is not None:
- return _PK_COLS_CACHE
- cols: List[str] = []
- try:
- cursor = conn.cursor()
- for c in _PK_CANDIDATE_COLS:
- cursor.execute("SHOW COLUMNS FROM questions_tem LIKE %s", (c,))
- if cursor.fetchone():
- cols.append(c)
- except Exception:
- cols = []
- _PK_COLS_CACHE = cols
- return cols
- def get_question_pk(question: dict) -> Optional[str]:
- """
- 从题目记录里提取“题目主键ID”(用于远程导出 PDF 接口的 question_ids)。
- 兼容不同库的列名:id / question_id / pk_id / qid
- """
- for c in _PK_CANDIDATE_COLS:
- v = question.get(c)
- if v is None:
- continue
- s = str(v).strip()
- if s:
- return s
- return None
- def request_remote_pdf_url(question_id: str, include_grading: bool = True) -> List[str]:
- """
- 调用你给的接口生成 PDF,返回 pdf_url 列表(可能是一个或多个)。
- 返回结构示例:
- {"success": true, "data": {"pdf_url": "https://...pdf"}} # 单个
- 或
- {"success": true, "data": {"pdf_url": ["https://...pdf1", "https://...pdf2"]}} # 多个
- """
- payload = {
- "question_ids": [str(question_id)],
- "student_id": str(PDF_STUDENT_ID),
- "include_grading": include_grading
- }
- data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
- req = urllib.request.Request(
- PDF_API_URL,
- data=data,
- headers={"Content-Type": "application/json"},
- method="POST",
- )
- try:
- with urllib.request.urlopen(req, timeout=20) as resp:
- raw = resp.read().decode("utf-8", errors="replace")
- except urllib.error.HTTPError as e:
- detail = ""
- try:
- detail = e.read().decode("utf-8", errors="replace")
- except Exception:
- detail = ""
- raise RuntimeError(f"PDF 接口 HTTP 错误:{e.code} {e.reason}. {detail}".strip())
- except Exception as e:
- raise RuntimeError(f"调用 PDF 接口失败:{e}")
- try:
- obj = json.loads(raw)
- except Exception:
- raise RuntimeError(f"PDF 接口返回不是合法 JSON:{raw[:300]}")
- if not isinstance(obj, dict) or not obj.get("success"):
- msg = obj.get("message") if isinstance(obj, dict) else ""
- raise RuntimeError(f"PDF 生成失败:{msg or raw[:300]}")
- data_obj = obj.get("data") if isinstance(obj, dict) else None
- if not isinstance(data_obj, dict):
- raise RuntimeError(f"PDF 接口返回的 data 不是对象:{raw[:300]}")
-
- # 接口返回格式:{"pdf_url": "...", "grading_pdf_url": "..."}
- # 只打开 grading_pdf_url(评分PDF),不打开 pdf_url(题目PDF)
- grading_pdf_url = data_obj.get("grading_pdf_url")
- if not grading_pdf_url or not isinstance(grading_pdf_url, str) or not grading_pdf_url.strip():
- raise RuntimeError(f"PDF 接口未返回有效的 grading_pdf_url:{raw[:300]}")
-
- return [str(grading_pdf_url).strip()]
- def _extract_svg_blocks(text: str) -> List[str]:
- if not text:
- return []
- return _RE_SVG_BLOCK.findall(text)
- def _svg_blocks_equal(a: str, b: str) -> bool:
- """
- 严格校验:新旧题干中的 <svg>...</svg> 片段列表必须完全一致(数量、顺序、内容都一致)。
- """
- return _extract_svg_blocks(a) == _extract_svg_blocks(b)
- def call_ai_optimize_stem(stem_html: str) -> str:
- """
- 调用 AI,把 stem 原文优化成“安全版 stem”,返回一段可直接写入 stem 的文本/HTML。
- 注意:AI 返回的是一段文本,我们直接当作新的 stem 处理。
- """
- if not AI_API_KEY:
- raise RuntimeError("缺少 AI_API_KEY:请在 config.env 里配置 AI_API_KEY")
- base = AI_BASE_URL.rstrip("/")
- if not base:
- base = "https://api.openai.com/v1"
- url = f"{base}/chat/completions"
- # 把“规则”放 system,把题干原文放 user
- payload = {
- "model": AI_MODEL_NAME,
- "messages": [
- {"role": "system", "content": AI_STEM_PROMPT},
- {"role": "user", "content": stem_html or ""},
- ],
- # 关键:不设置时,部分兼容实现会默认只生成很短的内容(看起来像“没读 prompt”)。
- # 为了让模型能完整输出“带 SVG 的题干”,这里给足 token 预算。
- # 注意:部分 API 端点只支持 max_completion_tokens,不支持 max_tokens,所以只传前者。
- "max_completion_tokens": int(os.getenv("AI_MAX_TOKENS", "4096")),
- "temperature": float(os.getenv("AI_TEMPERATURE", "0.0")),
- }
- data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
- req = urllib.request.Request(
- url,
- data=data,
- headers={
- "Content-Type": "application/json",
- "Authorization": f"Bearer {AI_API_KEY}",
- },
- method="POST",
- )
- try:
- with urllib.request.urlopen(req, timeout=60) as resp:
- raw = resp.read().decode("utf-8", errors="replace")
- except urllib.error.HTTPError as e:
- detail = ""
- try:
- detail = e.read().decode("utf-8", errors="replace")
- except Exception:
- detail = ""
- raise RuntimeError(f"AI 接口 HTTP 错误:{e.code} {e.reason}. {detail}".strip())
- except Exception as e:
- raise RuntimeError(f"调用 AI 接口失败:{e}")
- try:
- obj = json.loads(raw)
- except Exception:
- raise RuntimeError(f"AI 接口返回不是合法 JSON:{raw[:300]}")
- try:
- content = obj["choices"][0]["message"]["content"]
- except Exception:
- raise RuntimeError(f"AI 接口返回结构不符合预期:{raw[:300]}")
- s = str(content or "").strip()
- if not s:
- raise RuntimeError("AI 返回为空")
- # 后处理:清理 AI 误加的无意义换行符 `\n`
- # 规则:只清理那些在连续文本中间的单个 `\n`(前后都是字母/数字/中文,没有空格或标点)
- # 保留 HTML 标签内的换行(如 <svg> 内的换行)和原本有意义的段落换行
- # 匹配:字母/数字/中文 + \n + 字母/数字/中文(这种通常是误加的)
- s = re.sub(r"([\w\u4e00-\u9fff])\n([\w\u4e00-\u9fff])", r"\1 \2", s)
- # 清理多余的连续空格(避免替换后产生多个空格)
- s = re.sub(r" +", " ", s)
- # 保护:如果输入很长但输出极短,通常是 token 上限/模型拒答/接口兼容问题。
- # 直接抛错,避免用户误以为“优化成功”。
- if len((stem_html or "").strip()) >= 200 and len(s) <= 40:
- raise RuntimeError(
- "AI 返回内容过短(疑似被 max_tokens 截断或模型未按要求输出)。请重试;"
- "如仍复现,可把 AI_MODEL_NAME / AI_BASE_URL 发我进一步定位。"
- )
- return s
- # 加载知识点映射和层级结构
- # ==================== 难度评分相关函数 ====================
- class ImageExtractor(HTMLParser):
- """从HTML中提取图片URL"""
- def __init__(self):
- super().__init__()
- self.image_urls = []
-
- def handle_starttag(self, tag, attrs):
- if tag == 'img':
- for attr in attrs:
- if attr[0] in ['src', 'data-src']:
- url = attr[1]
- if url:
- self.image_urls.append(url)
- def extract_images_from_html(html_content, base_url=None):
- """从HTML内容中提取所有图片URL"""
- parser = ImageExtractor()
- parser.feed(html_content)
-
- image_urls = parser.image_urls
-
- # 如果是相对路径,转换为绝对路径
- if base_url and image_urls:
- image_urls = [urljoin(base_url, url) if not urlparse(url).netloc else url
- for url in image_urls]
-
- return image_urls
- def extract_text_from_html(html_content):
- """从HTML中提取纯文本(去除标签)"""
- # 简单的HTML标签去除
- text = re.sub(r'<[^>]+>', '', html_content)
- # 去除多余的空白字符
- text = re.sub(r'\s+', ' ', text).strip()
- return text
- def parse_json_response(text):
- """从AI响应中提取JSON"""
- # 尝试直接解析
- try:
- return json.loads(text)
- except:
- pass
-
- # 尝试提取JSON部分(如果响应包含其他文本)
- json_match = re.search(r'\{[\s\S]*\}', text)
- if json_match:
- try:
- return json.loads(json_match.group())
- except:
- pass
-
- # 如果都失败,返回None
- return None
- def get_openai_client():
- """获取OpenAI客户端"""
- try:
- from openai import OpenAI
- api_key = DIFFICULTY_SCORING_API_KEY or AI_API_KEY
- if not api_key:
- return None
-
- base_url = AI_BASE_URL.rstrip("/") if AI_BASE_URL else None
- if base_url:
- return OpenAI(api_key=api_key, base_url=base_url)
- else:
- return OpenAI(api_key=api_key)
- except ImportError:
- return None
- except Exception as e:
- print(f"警告: OpenAI客户端初始化失败: {e}")
- return None
- def load_kp_structure():
- """
- Load knowledge point structure from database
- Returns:
- - kp_map: {id: label} mapping
- - kp_hierarchy: hierarchical structure list, each element contains chapter, section, subsection info
- """
- # Load from database instead of JSON file
- _, kp_tree = load_kp_structure_from_db()
-
- kp_map = {}
- kp_hierarchy = []
-
- def process_node(node, parent_chapter=None, parent_section=None):
- """Process node recursively, only keep three levels: chapter, section, subsection"""
- if not isinstance(node, dict):
- return
-
- node_id = node.get("id")
- node_label = node.get("label", "")
- kp_level = node.get("kp_level", "")
-
- # Record mapping for all nodes
- if node_id and node_label:
- kp_map[str(node_id)] = node_label
-
- # Only process three levels
- if kp_level == "chapter":
- # Chapter level
- chapter_info = {
- "id": node_id,
- "label": node_label,
- "sections": []
- }
- kp_hierarchy.append(chapter_info)
-
- # Process child nodes (sections)
- children = node.get("children", [])
- for child in children:
- process_node(child, parent_chapter=chapter_info, parent_section=None)
-
- elif kp_level == "section":
- # Section level
- if parent_chapter:
- section_info = {
- "id": node_id,
- "label": node_label,
- "subsections": []
- }
- parent_chapter["sections"].append(section_info)
-
- # Process child nodes (subsections)
- children = node.get("children", [])
- for child in children:
- process_node(child, parent_chapter=parent_chapter, parent_section=section_info)
-
- elif kp_level == "subsection":
- # Subsection level
- if parent_section:
- subsection_info = {
- "id": node_id,
- "label": node_label
- }
- parent_section["subsections"].append(subsection_info)
-
- # If node has children, continue recursive processing (but only process matching levels)
- children = node.get("children", [])
- if children and kp_level not in ["chapter", "section", "subsection"]:
- # For other levels (like lesson), continue searching down
- for child in children:
- process_node(child, parent_chapter=parent_chapter, parent_section=parent_section)
-
- # Process root nodes
- if isinstance(kp_tree, dict):
- children = kp_tree.get("children", [])
- for child in children:
- process_node(child)
- elif isinstance(kp_tree, list):
- for item in kp_tree:
- process_node(item)
-
- return kp_map, kp_hierarchy
- def load_kp_structure_from_db():
- """
- 从MySQL数据库加载知识点结构,构建树形结构
- 返回:
- - kp_map: {kp_code: name} 映射
- - kp_tree: 树形结构列表,每个节点包含 children 列表和 question_count
- """
- kp_map = {}
- kp_tree = []
-
- try:
- conn = get_db_connection()
- cursor = conn.cursor(dictionary=True)
-
- # 查询所有知识点及其题目数量
- cursor.execute("""
- SELECT
- kp.kp_code,
- kp.name,
- kp.parent_kp_code,
- kp.grade,
- COUNT(q.question_code) as question_count
- FROM knowledge_points_copy1 kp
- LEFT JOIN questions_tem q ON q.kp_code = kp.kp_code
- GROUP BY kp.kp_code, kp.name, kp.parent_kp_code, kp.grade
- ORDER BY kp.kp_code
- """)
- all_kps = cursor.fetchall()
-
- # 构建映射
- for kp in all_kps:
- kp_map[kp['kp_code']] = kp['name']
-
- # 构建节点字典
- nodes = {}
- for kp in all_kps:
- question_count = int(kp.get('question_count', 0) or 0)
- nodes[kp['kp_code']] = {
- 'kp_code': kp['kp_code'],
- 'name': kp['name'],
- 'parent_kp_code': kp['parent_kp_code'],
- 'grade': kp['grade'],
- 'question_count': question_count,
- 'children': []
- }
-
- # 构建树形结构
- for kp_code, node in nodes.items():
- parent_code = node['parent_kp_code']
- if parent_code and parent_code in nodes:
- # 有父节点,添加到父节点的children中
- nodes[parent_code]['children'].append(node)
- else:
- # 没有父节点,是根节点
- kp_tree.append(node)
-
- # 递归计算父节点的题目数量(包括子节点)
- def calculate_total_count(node):
- total = node.get('question_count', 0)
- for child in node['children']:
- total += calculate_total_count(child)
- node['total_question_count'] = total
- return total
-
- # 对每个节点的children按kp_code排序,并计算总题目数
- def sort_children(node):
- node['children'].sort(key=lambda x: x['kp_code'])
- for child in node['children']:
- sort_children(child)
- # 计算总题目数(包括子节点)
- calculate_total_count(node)
-
- for root in kp_tree:
- sort_children(root)
-
- conn.close()
-
- except Exception as e:
- print(f"Error loading knowledge points from database: {e}")
- import traceback
- traceback.print_exc()
-
- return kp_map, kp_tree
- def load_kp_structure_with_pending_count():
- """
- 从MySQL数据库加载知识点结构,构建树形结构,只包含有未审核题目的节点
- 返回:
- - kp_map: {kp_code: name} 映射
- - kp_tree: 树形结构列表,每个节点包含 children 列表和 pending_count(未审核题目数量)
- - other_questions_count: 其他题目(未关联知识点)的未审核题目数量
- """
- kp_map = {}
- kp_tree = []
- other_questions_count = 0
-
- try:
- conn = get_db_connection()
- cursor = conn.cursor(dictionary=True)
-
- # 查询所有知识点及其未审核题目数量(只查询有未审核题目的知识点)
- cursor.execute("""
- SELECT
- kp.kp_code,
- kp.name,
- kp.parent_kp_code,
- kp.grade,
- COUNT(q.question_code) as pending_count
- FROM knowledge_points_copy1 kp
- INNER JOIN questions_tem q ON q.kp_code = kp.kp_code
- WHERE (q.audit_reason IS NULL OR q.audit_reason = '')
- GROUP BY kp.kp_code, kp.name, kp.parent_kp_code, kp.grade
- HAVING pending_count > 0
- ORDER BY kp.kp_code
- """)
- all_kps = cursor.fetchall()
-
- # 查询其他题目(未关联知识点)的未审核题目数量
- cursor.execute("""
- SELECT COUNT(*) as count
- FROM questions_tem
- WHERE (kp_code IS NULL OR kp_code = '')
- AND (audit_reason IS NULL OR audit_reason = '')
- """)
- other_result = cursor.fetchone()
- other_questions_count = int(other_result['count'] or 0) if other_result else 0
-
- # 构建映射
- for kp in all_kps:
- kp_map[kp['kp_code']] = kp['name']
-
- # 构建节点字典
- nodes = {}
- for kp in all_kps:
- pending_count = int(kp.get('pending_count', 0) or 0)
- nodes[kp['kp_code']] = {
- 'kp_code': kp['kp_code'],
- 'name': kp['name'],
- 'parent_kp_code': kp['parent_kp_code'],
- 'grade': kp['grade'],
- 'pending_count': pending_count,
- 'children': []
- }
-
- # 构建树形结构(只包含有未审核题目的节点)
- for kp_code, node in nodes.items():
- parent_code = node['parent_kp_code']
- if parent_code and parent_code in nodes:
- # 有父节点,添加到父节点的children中
- nodes[parent_code]['children'].append(node)
- else:
- # 没有父节点,是根节点
- kp_tree.append(node)
-
- # 递归计算父节点的未审核题目数量(包括子节点)
- def calculate_total_pending_count(node):
- total = node.get('pending_count', 0)
- for child in node['children']:
- total += calculate_total_pending_count(child)
- node['total_pending_count'] = total
- return total
-
- # 对每个节点的children按kp_code排序,并计算总未审核题目数
- def sort_children(node):
- node['children'].sort(key=lambda x: x['kp_code'])
- for child in node['children']:
- sort_children(child)
- # 计算总未审核题目数(包括子节点)
- calculate_total_pending_count(node)
-
- for root in kp_tree:
- sort_children(root)
-
- conn.close()
-
- except Exception as e:
- print(f"Error loading knowledge points with pending count from database: {e}")
- import traceback
- traceback.print_exc()
-
- return kp_map, kp_tree, other_questions_count
- KP_MAP, KP_HIERARCHY = load_kp_structure()
- def find_kp_hierarchy(kp_id):
- """
- 根据知识点ID查找其层级信息(chapter, section, subsection)
- 返回: {
- 'chapter': {'id': xxx, 'label': 'xxx'},
- 'section': {'id': xxx, 'label': 'xxx'} 或 None,
- 'subsection': {'id': xxx, 'label': 'xxx'} 或 None
- }
- """
- kp_id_str = str(kp_id)
- result = {
- 'chapter': None,
- 'section': None,
- 'subsection': None
- }
-
- for chapter in KP_HIERARCHY:
- # 检查是否是chapter本身
- if str(chapter["id"]) == kp_id_str:
- result['chapter'] = {'id': chapter["id"], 'label': chapter["label"]}
- return result
-
- # 检查sections
- for section in chapter.get("sections", []):
- # 检查是否是section本身
- if str(section["id"]) == kp_id_str:
- result['chapter'] = {'id': chapter["id"], 'label': chapter["label"]}
- result['section'] = {'id': section["id"], 'label': section["label"]}
- return result
-
- # 检查subsections
- for subsection in section.get("subsections", []):
- if str(subsection["id"]) == kp_id_str:
- result['chapter'] = {'id': chapter["id"], 'label': chapter["label"]}
- result['section'] = {'id': section["id"], 'label': section["label"]}
- result['subsection'] = {'id': subsection["id"], 'label': subsection["label"]}
- return result
-
- return result
- @app.route('/question_management')
- def question_management():
- """题目管理页面:显示知识点目录和题目列表"""
- # 从数据库加载知识点树形结构
- _, kp_tree = load_kp_structure_from_db()
-
- # 查询"其他题目"(未关联kp_code的题目)数量
- try:
- conn = get_db_connection()
- cursor = conn.cursor(dictionary=True)
- cursor.execute("""
- SELECT COUNT(*) as count
- FROM questions_tem
- WHERE (kp_code IS NULL OR kp_code = '')
- """)
- other_questions_count = cursor.fetchone()['count'] if cursor.rowcount > 0 else 0
- conn.close()
- except Exception as e:
- print(f"查询其他题目数量失败: {e}")
- other_questions_count = 0
-
- return render_template('question_management.html',
- kp_tree=kp_tree,
- is_audit_mode=False,
- other_questions_count=other_questions_count)
- @app.route('/api/questions_by_kp/<kp_code>')
- def api_questions_by_kp(kp_code):
- """根据知识点代码返回题目列表(JSON格式)"""
- return _api_questions_by_kp(kp_code, audit_only=False)
- @app.route('/api/pending_questions_by_kp/<kp_code>')
- def api_pending_questions_by_kp(kp_code):
- """根据知识点代码返回未审核题目列表(JSON格式,用于审核页面)"""
- return _api_questions_by_kp(kp_code, audit_only=True)
- def _api_questions_by_kp(kp_code, audit_only=False):
- """根据知识点代码返回题目列表(JSON格式)"""
- try:
- conn = get_db_connection()
- except Exception as e:
- return jsonify({'success': False, 'error': str(e)}), 500
-
- try:
- cursor = conn.cursor(dictionary=True)
-
- # 获取年级筛选参数(可选)
- grade_filter = request.args.get('grade', type=int)
- # 获取创建人筛选参数(可选)
- create_by_filter = request.args.get('create_by', type=str)
- # 获取创建时间筛选参数(可选,格式:YYYY-MM-DD)
- created_at_filter = request.args.get('created_at', type=str)
- # 获取分页参数(可选)
- page = request.args.get('page', type=int, default=1)
- page_size = 20 # 每页显示20道题
-
- # 先获取知识点名称(如果kp_code不为空)
- kp_name = kp_code
- if kp_code != 'null' and kp_code != '':
- cursor.execute("SELECT name FROM knowledge_points_copy1 WHERE kp_code = %s", (kp_code,))
- kp_row = cursor.fetchone()
- kp_name = kp_row['name'] if kp_row else kp_code
-
- # 构建查询条件
- where_conditions = []
- params = []
-
- # 知识点条件
- # 如果kp_code为'all',表示查询所有题目(不限制知识点)
- if kp_code == 'all':
- kp_name = "全部题目"
- # 不添加知识点条件,查询所有题目
- elif kp_code == 'null' or kp_code == '':
- where_conditions.append("(kp_code IS NULL OR kp_code = '' OR kp_code = 'null')")
- kp_name = "其他题目"
- else:
- where_conditions.append("kp_code = %s")
- params.append(kp_code)
-
- # 审核状态条件
- if audit_only:
- where_conditions.append("(audit_reason IS NULL OR audit_reason = '')")
-
- # 年级筛选条件(仅对"其他题目"有效,因为其他题目有grade字段)
- # grade字段是int类型,使用CAST确保类型匹配
- # 注意:如果grade为NULL,CAST会返回NULL,无法匹配,所以需要确保grade不为NULL
- if grade_filter is not None and (kp_code == 'null' or kp_code == ''):
- where_conditions.append("grade IS NOT NULL AND CAST(grade AS UNSIGNED) = %s")
- params.append(grade_filter)
-
- # 创建人筛选条件
- if create_by_filter:
- where_conditions.append("create_by = %s")
- params.append(create_by_filter)
-
- # 创建时间筛选条件(按天筛选)
- if created_at_filter:
- where_conditions.append("DATE(created_at) = %s")
- params.append(created_at_filter)
-
- where_clause = " AND ".join(where_conditions)
-
- # 先查询总数(用于分页)
- count_query = f"SELECT COUNT(*) as total FROM questions_tem WHERE {where_clause}"
- cursor.execute(count_query, params)
- total_count = cursor.fetchone()['total']
- total_pages = (total_count + page_size - 1) // page_size # 向上取整
-
- # 确保页码有效
- if page < 1:
- page = 1
- if page > total_pages and total_pages > 0:
- page = total_pages
-
- # 计算偏移量
- offset = (page - 1) * page_size
-
- # 查询题目(带分页)
- # 使用 id DESC 排序,确保列表顺序正确(按创建时间倒序,最新的在前)
- query = f"""
- SELECT
- question_code,
- stem,
- kp_code,
- kp_id,
- audit_reason,
- audit_status,
- difficulty,
- question_type,
- answer,
- solution,
- options,
- grade,
- create_by,
- created_at
- FROM questions_tem
- WHERE {where_clause}
- ORDER BY id DESC
- LIMIT %s OFFSET %s
- """
-
- params_with_pagination = params + [page_size, offset]
- cursor.execute(query, params_with_pagination)
-
- questions = cursor.fetchall()
-
- # 为了统计,需要查询所有题目(不分页)
- # 统计查询不需要排序,但为了保持一致性也使用 id DESC
- stats_query = f"""
- SELECT
- question_code,
- audit_reason,
- difficulty,
- question_type
- FROM questions_tem
- WHERE {where_clause}
- ORDER BY id DESC
- """
- cursor.execute(stats_query, params)
- all_questions_for_stats = cursor.fetchall()
-
- # 处理选项JSON
- for q in questions:
- if q.get('options'):
- try:
- opt_data = q['options']
- if isinstance(opt_data, str):
- opt_data = opt_data.replace("'", '"')
- q['options'] = json.loads(opt_data)
- except:
- q['options'] = {}
-
- # 计算统计数据(基于所有题目,不分页)
- # 审核统计
- pass_count = sum(1 for q in all_questions_for_stats if q.get('audit_reason') == '合格')
- fail_count = sum(1 for q in all_questions_for_stats if q.get('audit_reason') == '不合格')
- pending_count = sum(1 for q in all_questions_for_stats if not q.get('audit_reason') or q.get('audit_reason') == '')
-
- # 难度统计
- difficulty_jichu = 0 # 筑基 0.2
- difficulty_tifen = 0 # 提分 0.4
- difficulty_peiyou = 0 # 培优 0.7
- difficulty_unknown = 0 # 未设置难度
-
- for q in all_questions_for_stats:
- diff = q.get('difficulty')
- if diff is not None:
- try:
- diff_float = float(diff)
- if abs(diff_float - 0.2) < 0.1:
- difficulty_jichu += 1
- elif abs(diff_float - 0.4) < 0.1:
- difficulty_tifen += 1
- elif abs(diff_float - 0.7) < 0.1:
- difficulty_peiyou += 1
- else:
- difficulty_unknown += 1
- except:
- difficulty_unknown += 1
- else:
- difficulty_unknown += 1
-
- # 题型统计
- question_type_stats = {}
- for q in all_questions_for_stats:
- q_type = q.get('question_type') or '未分类'
- question_type_stats[q_type] = question_type_stats.get(q_type, 0) + 1
-
- conn.close()
-
- return jsonify({
- 'success': True,
- 'kp_code': kp_code,
- 'kp_name': kp_name,
- 'questions': questions,
- 'count': total_count,
- 'pagination': {
- 'page': page,
- 'page_size': page_size,
- 'total_pages': total_pages,
- 'total_count': total_count
- },
- 'stats': {
- 'total': total_count,
- 'audit': {
- 'pass': pass_count,
- 'fail': fail_count,
- 'pending': pending_count,
- 'pass_rate': round((pass_count / (pass_count + fail_count) * 100) if (pass_count + fail_count) > 0 else 0, 1),
- 'audit_rate': round(((pass_count + fail_count) / total_count * 100) if total_count > 0 else 0, 1)
- },
- 'difficulty': {
- 'jichu': difficulty_jichu,
- 'tifen': difficulty_tifen,
- 'peiyou': difficulty_peiyou,
- 'unknown': difficulty_unknown
- },
- 'question_type': question_type_stats
- }
- })
- except Exception as e:
- try:
- conn.close()
- except:
- pass
- return jsonify({'success': False, 'error': str(e)}), 500
- @app.route('/')
- def index():
- """首页:显示题目统计信息"""
- try:
- conn = get_db_connection()
- except Exception as e:
- return render_db_error(e)
- cursor = conn.cursor(dictionary=True)
-
- # 总体统计
- cursor.execute("""
- SELECT
- COUNT(*) AS total_count,
- SUM(CASE WHEN audit_reason = '合格' THEN 1 ELSE 0 END) AS pass_count,
- SUM(CASE WHEN audit_reason = '不合格' OR audit_status = 1 THEN 1 ELSE 0 END) AS fail_count,
- SUM(CASE WHEN audit_reason IS NULL OR audit_reason = '' THEN 1 ELSE 0 END) AS pending_count
- FROM questions_tem
- """)
- overall_stats = cursor.fetchone()
-
- total = int(overall_stats['total_count'] or 0)
- pass_count = int(overall_stats['pass_count'] or 0)
- fail_count = int(overall_stats['fail_count'] or 0)
- pending_count = int(overall_stats['pending_count'] or 0)
- audited_count = pass_count + fail_count
-
- # 计算审核通过率
- pass_rate = (pass_count / audited_count * 100) if audited_count > 0 else 0
- audit_rate = (audited_count / total * 100) if total > 0 else 0
-
- # 教材系列统计
- cursor.execute("""
- SELECT
- COUNT(*) AS total_series,
- SUM(CASE WHEN is_active = 1 THEN 1 ELSE 0 END) AS active_series
- FROM textbook_series_copy1
- """)
- series_stats = cursor.fetchone()
- total_series = int(series_stats['total_series'] or 0)
- active_series = int(series_stats['active_series'] or 0)
-
- # 教材统计
- cursor.execute("SELECT COUNT(*) AS total_textbooks FROM textbooks_copy1")
- textbook_stats = cursor.fetchone()
- total_textbooks = int(textbook_stats['total_textbooks'] or 0)
-
- # 知识点统计
- cursor.execute("SELECT COUNT(*) AS total_kp FROM knowledge_points_copy1")
- kp_stats = cursor.fetchone()
- total_kp = int(kp_stats['total_kp'] or 0)
-
- # 知识点层级统计(按级别统计)
- cursor.execute("""
- SELECT
- CASE
- WHEN parent_kp_code IS NULL OR parent_kp_code = '' THEN 'level_0'
- WHEN parent_kp_code IN (SELECT kp_code FROM knowledge_points_copy1 WHERE parent_kp_code IS NULL OR parent_kp_code = '') THEN 'level_1'
- ELSE 'level_2_plus'
- END AS level_type,
- COUNT(*) AS count
- FROM knowledge_points_copy1
- GROUP BY level_type
- """)
- kp_level_stats = cursor.fetchall()
- kp_level_0 = 0
- kp_level_1 = 0
- kp_level_2_plus = 0
- for row in kp_level_stats:
- if row['level_type'] == 'level_0':
- kp_level_0 = int(row['count'] or 0)
- elif row['level_type'] == 'level_1':
- kp_level_1 = int(row['count'] or 0)
- else:
- kp_level_2_plus = int(row['count'] or 0)
-
- # 最近添加的题目(如果有创建时间字段)
- cursor.execute("""
- SELECT question_code, stem, kp_id, audit_reason
- FROM questions_tem
- ORDER BY id DESC
- LIMIT 5
- """)
- recent_questions = cursor.fetchall()
-
- # 按审核状态统计
- status_stats = {
- 'pass': pass_count,
- 'fail': fail_count,
- 'pending': pending_count,
- }
-
- conn.close()
-
- return render_template('index.html',
- total=total,
- pass_count=pass_count,
- fail_count=fail_count,
- pending_count=pending_count,
- audited_count=audited_count,
- pass_rate=round(pass_rate, 1),
- audit_rate=round(audit_rate, 1),
- total_series=total_series,
- active_series=active_series,
- total_textbooks=total_textbooks,
- total_kp=total_kp,
- kp_level_0=kp_level_0,
- kp_level_1=kp_level_1,
- kp_level_2_plus=kp_level_2_plus,
- recent_questions=recent_questions,
- status_stats=status_stats)
- @app.route('/audit_questions')
- def audit_questions():
- """审核题目页面:显示所有未审核的题目,按知识点分类"""
- try:
- # 从数据库加载知识点树形结构(只包含有未审核题目的节点)
- _, kp_tree, other_questions_count = load_kp_structure_with_pending_count()
-
- # 查询总体统计
- conn = get_db_connection()
- cursor = conn.cursor(dictionary=True)
-
- cursor.execute("""
- SELECT
- COUNT(*) AS total_count,
- SUM(CASE WHEN audit_reason = '合格' THEN 1 ELSE 0 END) AS pass_count,
- SUM(CASE WHEN audit_reason = '不合格' OR audit_status = 1 THEN 1 ELSE 0 END) AS fail_count,
- SUM(CASE WHEN audit_reason IS NULL OR audit_reason = '' THEN 1 ELSE 0 END) AS pending_count
- FROM questions_tem
- """)
- overall_stats = cursor.fetchone()
- total = int(overall_stats['total_count'] or 0)
- pass_count = int(overall_stats['pass_count'] or 0)
- fail_count = int(overall_stats['fail_count'] or 0)
- pending_count = int(overall_stats['pending_count'] or 0)
- audited_count = pass_count + fail_count
- pass_rate = (pass_count / audited_count * 100) if audited_count > 0 else 0
- audit_rate = (audited_count / total * 100) if total > 0 else 0
-
- # 查询所有创建人列表(用于筛选)
- cursor.execute("""
- SELECT DISTINCT create_by
- FROM questions_tem
- WHERE create_by IS NOT NULL AND create_by != ''
- ORDER BY create_by
- """)
- creators = [row['create_by'] for row in cursor.fetchall()]
-
- conn.close()
-
- return render_template('audit_questions.html',
- kp_tree=kp_tree,
- other_questions_count=other_questions_count,
- total=total,
- pass_count=pass_count,
- fail_count=fail_count,
- pending_count=pending_count,
- audited_count=audited_count,
- pass_rate=round(pass_rate, 1),
- audit_rate=round(audit_rate, 1),
- creators=creators)
- except Exception as e:
- return render_db_error(e)
- @app.route('/search')
- def search_by_question_code():
- """
- 输入 question_code 直接跳转题目详情。
- GET /search?q=xxxx
- """
- q = (request.args.get("q") or "").strip()
- if not q:
- return redirect(url_for("index"))
- try:
- conn = get_db_connection()
- except Exception as e:
- return render_db_error(e)
- cursor = conn.cursor(dictionary=True)
- cursor.execute("SELECT question_code FROM questions_tem WHERE question_code = %s LIMIT 1", (q,))
- row = cursor.fetchone()
- conn.close()
- if row:
- return redirect(url_for("detail", question_code=q))
- return render_template("search_not_found.html", q=q)
- @app.route('/search_id')
- def search_by_question_id():
- """
- 输入主键ID(比如 id / question_id / pk_id / qid)查题。
- GET /search_id?id=123
- """
- raw = (request.args.get("id") or "").strip()
- if not raw:
- return redirect(url_for("index"))
- try:
- conn = get_db_connection()
- except Exception as e:
- return render_db_error(e)
- pk_cols = get_pk_columns(conn)
- if not pk_cols:
- conn.close()
- return render_template("search_id_not_supported.html", q=raw)
- cursor = conn.cursor(dictionary=True)
- found_code = None
- for col in pk_cols:
- try:
- cursor.execute(f"SELECT question_code FROM questions_tem WHERE {col} = %s LIMIT 1", (raw,))
- row = cursor.fetchone()
- if row and row.get("question_code"):
- found_code = row["question_code"]
- break
- except Exception:
- continue
- conn.close()
- if found_code:
- return redirect(url_for("detail", question_code=found_code))
- return render_template("search_id_not_found.html", q=raw, pk_cols=pk_cols)
- @app.route('/questions/<kp_code>')
- def question_list(kp_code):
- try:
- conn = get_db_connection()
- except Exception as e:
- return render_db_error(e)
- cursor = conn.cursor(dictionary=True)
-
- # 检查是否是审核模式(只显示未审核题目)
- audit_only = request.args.get('audit_only', '').lower() == 'true'
-
- # 处理 kp_code 为 'null' 或空字符串的情况(表示没有 kp_id 的题目)
- # 注意:URL参数仍使用 kp_code,但数据库字段已改为 kp_id
- # 使用别名 kp_id AS kp_code 以兼容模板代码
- if kp_code == 'null' or kp_code == '':
- if audit_only:
- cursor.execute("SELECT *, kp_id AS kp_code FROM questions_tem WHERE (kp_id IS NULL OR kp_id = '') AND (audit_reason IS NULL OR audit_reason = '') ORDER BY question_code DESC")
- else:
- cursor.execute("SELECT *, kp_id AS kp_code FROM questions_tem WHERE kp_id IS NULL OR kp_id = '' ORDER BY question_code DESC")
- kp_name = "未分类题目"
- hierarchy_info = {'chapter': None, 'section': None, 'subsection': None}
- else:
- # 查找层级信息,判断是否是章节级别
- hierarchy_info = find_kp_hierarchy(kp_code)
-
- # 如果是章节级别,需要查询该章节下所有 section 和 subsection 的题目
- if hierarchy_info['chapter'] and not hierarchy_info['section'] and not hierarchy_info['subsection']:
- # 这是章节级别,需要收集该章节下所有的 kp_id
- chapter_id = hierarchy_info['chapter']['id']
- kp_ids = [str(chapter_id)] # 包含章节本身
-
- # 查找该章节下的所有 section 和 subsection
- for chapter in KP_HIERARCHY:
- if chapter["id"] == chapter_id:
- for section in chapter.get("sections", []):
- kp_ids.append(str(section["id"]))
- for subsection in section.get("subsections", []):
- kp_ids.append(str(subsection["id"]))
- break
-
- # 使用 IN 查询该章节下所有知识点的题目
- placeholders = ','.join(['%s'] * len(kp_ids))
- if audit_only:
- query = f"SELECT *, kp_id AS kp_code FROM questions_tem WHERE kp_id IN ({placeholders}) AND (audit_reason IS NULL OR audit_reason = '') ORDER BY question_code DESC"
- else:
- query = f"SELECT *, kp_id AS kp_code FROM questions_tem WHERE kp_id IN ({placeholders}) ORDER BY question_code DESC"
- cursor.execute(query, tuple(kp_ids))
- kp_name = hierarchy_info['chapter']['label']
- else:
- # 普通的知识点查询(section 或 subsection)
- if audit_only:
- cursor.execute("SELECT *, kp_id AS kp_code FROM questions_tem WHERE kp_id = %s AND (audit_reason IS NULL OR audit_reason = '') ORDER BY question_code DESC", (kp_code,))
- else:
- cursor.execute("SELECT *, kp_id AS kp_code FROM questions_tem WHERE kp_id = %s ORDER BY question_code DESC", (kp_code,))
- kp_name = KP_MAP.get(str(kp_code), kp_code)
-
- questions = cursor.fetchall()
-
- # 构建录入题目页面的URL
- add_question_url = f'/add_question?kp_code={kp_code}'
- if hierarchy_info['chapter']:
- chapter_label_encoded = urllib.parse.quote(hierarchy_info['chapter']['label'])
- add_question_url += f"&chapter={hierarchy_info['chapter']['id']}&chapter_label={chapter_label_encoded}"
- if hierarchy_info['section']:
- section_label_encoded = urllib.parse.quote(hierarchy_info['section']['label'])
- add_question_url += f"§ion={hierarchy_info['section']['id']}§ion_label={section_label_encoded}"
- if hierarchy_info['subsection']:
- subsection_label_encoded = urllib.parse.quote(hierarchy_info['subsection']['label'])
- add_question_url += f"&subsection={hierarchy_info['subsection']['id']}&subsection_label={subsection_label_encoded}"
-
- conn.close()
- return render_template('questions.html',
- questions=questions,
- kp_code=kp_code,
- kp_name=kp_name,
- node_id=None,
- hierarchy_info=hierarchy_info,
- add_question_url=add_question_url,
- audit_only=audit_only)
- @app.route('/textbook/<node_id>')
- def textbook_question_list(node_id):
- """教材节点题目列表"""
- try:
- conn = get_db_connection()
- except Exception as e:
- return render_db_error(e)
- cursor = conn.cursor(dictionary=True)
-
- # 获取教材节点标题
- try:
- cursor.execute("SELECT title FROM textbook_catalog_nodes WHERE id = %s", (node_id,))
- node_row = cursor.fetchone()
- title = node_row.get('title') if node_row else f'节点{node_id}'
- except Exception:
- # 如果表不存在,使用默认标题
- title = f'节点{node_id}'
-
- # 查询该节点下的所有题目(使用别名 kp_id AS kp_code 以兼容模板代码)
- cursor.execute(
- "SELECT *, kp_id AS kp_code FROM questions_tem WHERE textbook_catalog_nodes_id = %s ORDER BY question_code DESC",
- (node_id,)
- )
- questions = cursor.fetchall()
- conn.close()
-
- return render_template('questions.html', questions=questions, kp_code=None, kp_name=title, node_id=node_id)
- @app.route('/detail/<question_code>')
- def detail(question_code):
- try:
- conn = get_db_connection()
- except Exception as e:
- return render_db_error(e)
- cursor = conn.cursor(dictionary=True)
- # 使用别名 kp_id AS kp_code 以兼容模板代码
- cursor.execute("SELECT *, kp_id AS kp_code FROM questions_tem WHERE question_code = %s", (question_code,))
- question = cursor.fetchone()
-
- if not question:
- conn.close()
- return render_template("search_not_found.html", q=question_code), 404
-
- # 获取上下题索引(从 kp_id 字段读取知识点ID,但模板中使用 kp_code)
- # 优先使用 URL 参数中的 kp_code(用于返回列表时定位到正确的知识点)
- kp_code_from_url = request.args.get('kp_code')
- kp_code_db = kp_code_from_url or question.get('kp_id') or question.get('kp_code') # 兼容处理
- textbook_node_id = question.get('textbook_catalog_nodes_id')
- prev_code = None
- next_code = None
- curr_idx = 0
- total = 1
-
- if kp_code_db:
- # 有 kp_id:查询同知识点下的所有题目
- cursor.execute("SELECT question_code FROM questions_tem WHERE kp_id = %s ORDER BY question_code DESC", (kp_code_db,))
- all_codes = [r['question_code'] for r in cursor.fetchall()]
-
- if question_code in all_codes:
- curr_idx = all_codes.index(question_code)
- prev_code = all_codes[curr_idx - 1] if curr_idx > 0 else None
- next_code = all_codes[curr_idx + 1] if curr_idx < len(all_codes) - 1 else None
- total = len(all_codes)
- elif textbook_node_id:
- # 没有 kp_code 但有 textbook_catalog_nodes_id:查询同教材节点下的所有题目
- cursor.execute(
- "SELECT question_code FROM questions_tem WHERE textbook_catalog_nodes_id = %s ORDER BY question_code DESC",
- (textbook_node_id,)
- )
- all_codes = [r['question_code'] for r in cursor.fetchall()]
-
- if question_code in all_codes:
- curr_idx = all_codes.index(question_code)
- prev_code = all_codes[curr_idx - 1] if curr_idx > 0 else None
- next_code = all_codes[curr_idx + 1] if curr_idx < len(all_codes) - 1 else None
- total = len(all_codes)
- else:
- # 既没有 kp_code 也没有 textbook_catalog_nodes_id:不显示上下题导航
- pass
-
- conn.close()
-
- # 处理选项 JSON
- options = []
- if question.get('options'):
- try:
- opt_data = question['options']
- if isinstance(opt_data, str):
- # 兼容单引号 JSON
- opt_data = opt_data.replace("'", '"')
- d = json.loads(opt_data)
- else:
- d = opt_data
- options = sorted(d.items())
- except:
- options = []
- # 处理知识点名称:如果没有 kp_code,显示"未分类"
- kp_name = "未分类题目"
- hierarchy_info = {'chapter': None, 'section': None, 'subsection': None}
- add_question_url = None
-
- if kp_code_db:
- kp_name = KP_MAP.get(str(kp_code_db), kp_code_db)
- # 查找层级信息
- hierarchy_info = find_kp_hierarchy(kp_code_db)
- # 构建录入题目页面的URL
- add_question_url = f'/add_question?kp_code={kp_code_db}'
- if hierarchy_info['chapter']:
- chapter_label_encoded = urllib.parse.quote(hierarchy_info['chapter']['label'])
- add_question_url += f"&chapter={hierarchy_info['chapter']['id']}&chapter_label={chapter_label_encoded}"
- if hierarchy_info['section']:
- section_label_encoded = urllib.parse.quote(hierarchy_info['section']['label'])
- add_question_url += f"§ion={hierarchy_info['section']['id']}§ion_label={section_label_encoded}"
- if hierarchy_info['subsection']:
- subsection_label_encoded = urllib.parse.quote(hierarchy_info['subsection']['label'])
- add_question_url += f"&subsection={hierarchy_info['subsection']['id']}&subsection_label={subsection_label_encoded}"
- elif textbook_node_id:
- # 查询教材节点标题
- try:
- cursor.execute("SELECT title FROM textbook_catalog_nodes WHERE id = %s", (textbook_node_id,))
- node_row = cursor.fetchone()
- if node_row:
- kp_name = node_row.get('title') or f'节点{textbook_node_id}'
- else:
- kp_name = f'节点{textbook_node_id}'
- except Exception as e:
- # 如果表不存在或查询失败,使用默认标题
- print(f"Warning: 无法查询教材节点标题: {e}")
- kp_name = f'节点{textbook_node_id}'
- return render_template('detail.html',
- q=question,
- options=options,
- kp_name=kp_name,
- kp_code=kp_code_db,
- node_id=str(textbook_node_id) if textbook_node_id else None,
- prev_code=prev_code,
- next_code=next_code,
- total=total,
- curr_num=curr_idx + 1 if total > 0 else 1,
- add_question_url=add_question_url)
- @app.route('/audit', methods=['POST'])
- def audit():
- data = request.json
- q_code = data.get('question_code')
- status_text = data.get('audit_reason') # "合格" or "不合格"
- status_val = 1 if status_text == "不合格" else 0
-
- try:
- conn = get_db_connection()
- cursor = conn.cursor(dictionary=True)
-
- # 更新 questions_tem 表的审核状态
- # 注意:所有操作只在 questions_tem 表,不再同步到 questions 表
- cursor.execute("UPDATE questions_tem SET audit_reason=%s, audit_status=%s WHERE question_code=%s",
- (status_text, status_val, q_code))
-
- # 已移除同步到 questions 表的逻辑,所有操作只在 questions_tem 表
-
- conn.commit()
- conn.close()
- return jsonify({'success': True})
- except Exception as e:
- import traceback
- traceback.print_exc()
- return jsonify({'success': False, 'error': str(e)})
- @app.route("/export_pdf_remote/<question_code>")
- def export_pdf_remote(question_code):
- """
- 远程导出:调用你给的接口拿到 pdf_url(可能是一个或多个),然后全部打开。
- """
- try:
- conn = get_db_connection()
- except Exception as e:
- return render_db_error(e)
- try:
- cursor = conn.cursor(dictionary=True)
- # 使用别名 kp_id AS kp_code 以兼容模板代码
- cursor.execute("SELECT *, kp_id AS kp_code FROM questions_tem WHERE question_code = %s", (question_code,))
- question = cursor.fetchone()
- finally:
- try:
- conn.close()
- except Exception:
- pass
- if not question:
- return render_template("search_not_found.html", q=question_code), 404
- qid = get_question_pk(question)
- if not qid:
- # 没有常见主键列名,无法给 question_ids
- return render_template("search_id_not_supported.html", q=str(question_code)), 400
- try:
- pdf_urls = request_remote_pdf_url(qid, include_grading=True)
- except Exception as e:
- # PDF接口失败,不显示数据库连接信息
- return render_template(
- "db_error.html",
- error=str(e),
- db_host=None,
- db_port=None,
- db_name=None,
- db_user=None,
- ), 500
- if not pdf_urls:
- # PDF接口未返回有效数据,不显示数据库连接信息
- return render_template(
- "db_error.html",
- error="PDF 接口未返回有效的 pdf_url",
- db_host=None,
- db_port=None,
- db_name=None,
- db_user=None,
- ), 500
- # 如果有多个 PDF,返回一个 HTML 页面,用 JavaScript 全部打开
- if len(pdf_urls) > 1:
- return render_template("open_multiple_pdfs.html", pdf_urls=pdf_urls)
- else:
- # 只有一个 PDF,直接重定向
- return redirect(pdf_urls[0])
- @app.route("/api/optimize_stem/<question_code>", methods=["POST"])
- def api_optimize_stem(question_code):
- """
- 生成“优化后的 stem”,并返回左右对比所需内容。
- """
- try:
- conn = get_db_connection()
- except Exception as e:
- return jsonify({"success": False, "error": str(e)}), 500
- try:
- cursor = conn.cursor(dictionary=True)
- cursor.execute("SELECT question_code, stem FROM questions_tem WHERE question_code = %s", (question_code,))
- row = cursor.fetchone()
- finally:
- try:
- conn.close()
- except Exception:
- pass
- if not row:
- return jsonify({"success": False, "error": "题目不存在"}), 404
- old_stem = row.get("stem") or ""
- try:
- new_stem = call_ai_optimize_stem(old_stem)
- except Exception as e:
- return jsonify({"success": False, "error": str(e)}), 500
- svg_ok = _svg_blocks_equal(old_stem, new_stem)
- return jsonify(
- {
- "success": True,
- "question_code": question_code,
- "old_stem": old_stem,
- "new_stem": new_stem,
- "svg_ok": bool(svg_ok),
- }
- )
- @app.route("/api/replace_stem/<question_code>", methods=["POST"])
- def api_replace_stem(question_code):
- """
- 将新 stem 覆写到 questions_tem.stem(仅改 stem)。
- 额外安全:替换前校验 SVG 不被改动。
- """
- data = request.json or {}
- new_stem = data.get("new_stem")
- if new_stem is None:
- return jsonify({"success": False, "error": "缺少 new_stem"}), 400
- try:
- conn = get_db_connection()
- except Exception as e:
- return jsonify({"success": False, "error": str(e)}), 500
- try:
- cursor = conn.cursor(dictionary=True)
- cursor.execute("SELECT question_code, stem FROM questions_tem WHERE question_code = %s", (question_code,))
- row = cursor.fetchone()
- if not row:
- return jsonify({"success": False, "error": "题目不存在"}), 404
- old_stem = row.get("stem") or ""
- if not _svg_blocks_equal(old_stem, str(new_stem)):
- return jsonify({"success": False, "error": "安全拦截:检测到 SVG 被改动,已禁止替换。"}), 400
- cur2 = conn.cursor()
- cur2.execute("UPDATE questions_tem SET stem=%s WHERE question_code=%s", (str(new_stem), question_code))
- conn.commit()
- return jsonify({"success": True, "question_code": question_code})
- except Exception as e:
- try:
- conn.rollback()
- except Exception:
- pass
- return jsonify({"success": False, "error": str(e)}), 500
- finally:
- try:
- conn.close()
- except Exception:
- pass
- @app.route('/api/score', methods=['POST'])
- def api_score_question():
- """
- 题目难度评分接口(支持文本、HTML和图片)
-
- 请求格式:
- {
- "stem": "题目文本内容或HTML(可选,HTML中可包含<img>标签)",
- "image_url": "图片URL(可选)",
- "image_base64": "图片base64编码(可选,格式:)",
- "base_url": "基础URL(可选,用于处理HTML中相对路径的图片)",
- "custom_prompt": "可选的自定义提示词"
- }
-
- 返回格式:
- {
- "success": true,
- "data": {
- "difficulty_level": "筑基|提分|培优",
- "final_score": 0.00,
- "dimension_scores": {
- "推理与运算步数": 0.33,
- "知识点数量": 0.33,
- "抽象与构造要求": 0.33,
- "题型常规性": 0.33
- }
- },
- "message": "评分成功"
- }
- """
- client = get_openai_client()
- if client is None:
- return jsonify({
- "success": False,
- "error": "OpenAI客户端未初始化,请检查配置"
- }), 500
-
- try:
- # 获取请求数据
- data = request.get_json()
- if not data:
- return jsonify({
- "success": False,
- "error": "请求体不能为空"
- }), 400
-
- stem = data.get('stem', '').strip()
- image_url = data.get('image_url', '').strip()
- image_base64 = data.get('image_base64', '').strip()
- base_url = data.get('base_url', '').strip() # 用于处理相对路径的图片URL
-
- # 如果stem包含HTML标签,尝试提取图片
- html_images = []
- stem_text = stem
- if stem and ('<img' in stem.lower() or '<image' in stem.lower()):
- html_images = extract_images_from_html(stem, base_url)
- # 同时提取纯文本(保留LaTeX公式等数学符号)
- stem_text = extract_text_from_html(stem)
-
- # 至少需要提供一种输入
- if not stem_text and not image_url and not image_base64 and not html_images:
- return jsonify({
- "success": False,
- "error": "至少需要提供以下之一:stem(文本或HTML)、image_url(图片URL)或image_base64(图片base64)"
- }), 400
-
- # 获取提示词
- custom_prompt = data.get('custom_prompt', '').strip()
- prompt = custom_prompt if custom_prompt else DIFFICULTY_SCORING_PROMPT
-
- # 构建消息内容
- content = []
-
- # 添加文本提示词
- if stem_text:
- full_prompt = f"{prompt}\n\n题目内容:\n{stem_text}"
- else:
- full_prompt = prompt
-
- content.append({
- "type": "text",
- "text": full_prompt
- })
-
- # 添加图片(优先级:base64 > 直接URL > HTML中提取的图片)
- images_to_add = []
-
- if image_base64:
- # 处理base64图片
- if not image_base64.startswith('data:'):
- image_base64 = f"data:image/png;base64,{image_base64}"
- images_to_add.append(image_base64)
- elif image_url:
- # 使用直接提供的图片URL
- images_to_add.append(image_url)
- elif html_images:
- # 使用从HTML中提取的图片URL
- images_to_add.extend(html_images)
-
- # 添加所有图片到content
- for img in images_to_add:
- if img.startswith('data:'):
- # base64图片
- content.append({
- "type": "image_url",
- "image_url": {
- "url": img
- }
- })
- else:
- # URL图片
- content.append({
- "type": "image_url",
- "image_url": {
- "url": img
- }
- })
-
- # 调用OpenAI API
- model_name = DIFFICULTY_SCORING_MODEL or AI_MODEL_NAME
- try:
- response = client.chat.completions.create(
- model=model_name,
- messages=[
- {
- "role": "user",
- "content": content
- }
- ],
- temperature=DIFFICULTY_SCORING_TEMPERATURE,
- top_p=DIFFICULTY_SCORING_TOP_P,
- presence_penalty=DIFFICULTY_SCORING_PRESENCE_PENALTY,
- frequency_penalty=DIFFICULTY_SCORING_FREQUENCY_PENALTY
- )
- except Exception as api_error:
- print(f"OpenAI API调用失败: {str(api_error)}")
- return jsonify({
- "success": False,
- "error": f"AI服务调用失败: {str(api_error)}",
- "error_type": type(api_error).__name__
- }), 500
-
- if not response or not response.choices or len(response.choices) == 0:
- return jsonify({
- "success": False,
- "error": "AI服务返回空响应"
- }), 500
-
- result_text = response.choices[0].message.content
-
- # 调试:打印AI返回的原始内容(前500字符)
- print(f"AI返回原始内容(前500字符): {result_text[:500] if result_text else 'None'}")
-
- # 解析JSON响应
- result_json = parse_json_response(result_text)
-
- if result_json is None:
- return jsonify({
- "success": False,
- "error": "AI返回格式不正确,无法解析JSON",
- "raw_response": result_text[:500] if result_text else "None" # 只返回前500字符,避免响应过大
- }), 500
-
- # 验证返回的JSON结构
- required_fields = ["difficulty_level", "total_score", "dimension_scores"]
- for field in required_fields:
- if field not in result_json:
- return jsonify({
- "success": False,
- "error": f"返回JSON缺少必需字段: {field}",
- "raw_response": result_text
- }), 500
-
- # 验证维度分值(放宽验证,只记录警告,不阻止返回)
- dimension_scores = result_json.get("dimension_scores", {})
- valid_dimension_values = [0.33, 0.66, 1.00]
-
- # 维度名称映射(支持中文和英文键名)
- dimension_name_mapping = {
- # 中文键名(标准)
- "推理与运算步数": "推理与运算步数",
- "知识点数量": "知识点数量",
- "抽象与构造要求": "抽象与构造要求",
- "题型常规性": "题型常规性",
- # 英文键名(兼容)
- "reasoning_steps": "推理与运算步数",
- "knowledge_points": "知识点数量",
- "abstraction_construction": "抽象与构造要求",
- "typicality": "题型常规性",
- # 其他可能的英文键名
- "reasoning": "推理与运算步数",
- "knowledge": "知识点数量",
- "abstraction": "抽象与构造要求",
- "conventionality": "题型常规性"
- }
-
- # 标准化维度分值(统一转换为中文键名)
- normalized_dimension_scores = {}
- for key, value in dimension_scores.items():
- # 查找对应的中文键名
- chinese_key = dimension_name_mapping.get(key, key)
- normalized_dimension_scores[chinese_key] = value
-
- # 更新 result_json 中的 dimension_scores 为标准格式
- result_json["dimension_scores"] = normalized_dimension_scores
-
- # 验证维度分值
- required_dimensions = ["推理与运算步数", "知识点数量", "抽象与构造要求", "题型常规性"]
- validation_errors = []
-
- for dim in required_dimensions:
- if dim not in normalized_dimension_scores:
- validation_errors.append(f"缺少维度评分: {dim}")
- continue
-
- dim_value = normalized_dimension_scores[dim]
- # 允许浮点数精度误差
- if not any(abs(dim_value - v) < 0.01 for v in valid_dimension_values):
- validation_errors.append(f"维度 {dim} 的值 {dim_value} 不在允许范围内(0.33、0.66、1.00)")
-
- # 验证 total_score 计算(四个维度相加后归一化)
- if len(normalized_dimension_scores) == 4:
- calculated_total = sum(normalized_dimension_scores.values()) / 4.0
- total_score = result_json.get("total_score", 0)
-
- # 允许小的浮点数误差(0.05,放宽验证)
- if abs(calculated_total - total_score) > 0.05:
- validation_errors.append(f"total_score ({total_score}) 与计算值 ({calculated_total:.2f}) 不一致")
- else:
- total_score = result_json.get("total_score", 0)
-
- # 验证 difficulty_level 映射(如果 total_score 有效)
- difficulty_level = result_json.get("difficulty_level", "")
- if total_score >= 0 and total_score <= 1:
- if total_score <= 0.25:
- expected_level = "筑基"
- elif total_score <= 0.5:
- expected_level = "提分"
- else:
- expected_level = "培优"
-
- if difficulty_level != expected_level:
- validation_errors.append(f"difficulty_level ({difficulty_level}) 与 total_score ({total_score}) 的映射不一致,应为 {expected_level}")
-
- # 如果有验证错误,记录但不阻止返回(放宽验证)
- if validation_errors:
- print(f"难度评分验证警告: {', '.join(validation_errors)}")
- # 仍然返回结果,但添加警告信息
- result_json["_validation_warnings"] = validation_errors
-
- return jsonify({
- "success": True,
- "data": result_json,
- "message": "评分成功"
- })
-
- except Exception as e:
- import traceback
- error_trace = traceback.format_exc()
- print(f"难度评分接口错误: {error_trace}")
- return jsonify({
- "success": False,
- "error": f"评分过程中出现错误: {str(e)}",
- "error_type": type(e).__name__
- }), 500
- @app.route('/kp_management')
- def kp_management():
- """知识点管理页面:从 knowledge_points_copy1 表读取数据,按层级结构展示"""
- try:
- conn = get_db_connection()
- except Exception as e:
- return render_db_error(e)
- cursor = conn.cursor(dictionary=True)
-
- # 获取所有知识点(显示小学、初中、高中)
- cursor.execute("""
- SELECT
- id, kp_code, name, subject, grade, parent_kp_code,
- prerequisite_kp_codes, dependent_kp_codes, related_kp_codes,
- stats, created_at, updated_at, skills, direct_score, related_score
- FROM knowledge_points_copy1
- WHERE grade IN ('小学', '初中', '高中')
- ORDER BY kp_code ASC
- """)
- all_kps = cursor.fetchall()
-
- # 处理JSON字段
- for kp in all_kps:
- for field in ['prerequisite_kp_codes', 'dependent_kp_codes', 'related_kp_codes', 'stats', 'skills', 'direct_score', 'related_score']:
- if kp[field] and isinstance(kp[field], str):
- try:
- kp[field] = json.loads(kp[field])
- except:
- kp[field] = None
-
- # 构建层级结构
- kp_dict = {kp['kp_code']: kp for kp in all_kps}
- root_kps = []
-
- # 为每个知识点添加children列表
- for kp in all_kps:
- kp['children'] = []
- kp['level'] = 0
-
- # 构建树形结构
- for kp in all_kps:
- if kp['parent_kp_code'] and kp['parent_kp_code'] in kp_dict:
- parent = kp_dict[kp['parent_kp_code']]
- parent['children'].append(kp)
- # 计算层级深度
- kp['level'] = parent['level'] + 1
- else:
- root_kps.append(kp)
-
- # 递归排序:按kp_code排序
- def sort_kp_tree(kp_list):
- kp_list.sort(key=lambda x: x['kp_code'])
- for kp in kp_list:
- if kp['children']:
- sort_kp_tree(kp['children'])
-
- sort_kp_tree(root_kps)
-
- # 扁平化列表(用于表格展示,保持层级顺序)
- # 默认显示:level 0 和 level 1,level 2 及以上默认隐藏
- def flatten_tree(kp_list, result=None, level=0):
- if result is None:
- result = []
- for kp in kp_list:
- kp['display_level'] = level
- kp['has_children'] = len(kp['children']) > 0
- # level 0 和 level 1 默认显示,level 2 及以上默认隐藏
- kp['default_visible'] = level <= 1
- result.append(kp)
- if kp['children']:
- flatten_tree(kp['children'], result, level + 1)
- return result
-
- knowledge_points = flatten_tree(root_kps)
-
- # 获取所有知识点代码和名称,用于下拉选择(显示小学、初中、高中)
- cursor.execute("""
- SELECT kp_code, name
- FROM knowledge_points_copy1
- WHERE grade IN ('小学', '初中', '高中')
- ORDER BY kp_code ASC
- """)
- kp_options = cursor.fetchall()
-
- # 统计每个知识点关联的题目数量(统计小学、初中、高中)
- # questions_tem 表使用 kp_code 字段,关联 knowledge_points_copy1 的 kp_code 字段
- cursor.execute("""
- SELECT
- kp.id,
- kp.kp_code,
- COUNT(q.question_code) as question_count
- FROM knowledge_points_copy1 kp
- LEFT JOIN questions_tem q ON q.kp_code = kp.kp_code
- WHERE kp.grade IN ('小学', '初中', '高中')
- GROUP BY kp.id, kp.kp_code
- """)
- question_counts_raw = cursor.fetchall()
- question_counts = {row['kp_code']: row['question_count'] for row in question_counts_raw}
-
- # 递归计算父节点的题目数量(包括所有子节点的题目数量)
- def calculate_total_question_count(kp):
- # 先计算当前节点直接关联的题目数量
- direct_count = question_counts.get(kp['kp_code'], 0)
-
- # 递归计算所有子节点的题目数量总和
- children_count = 0
- if kp['children']:
- for child in kp['children']:
- children_count += calculate_total_question_count(child)
-
- # 父节点的题目数量 = 直接关联的题目 + 所有子节点的题目总和
- total_count = direct_count + children_count
- kp['question_count'] = total_count
- kp['direct_question_count'] = direct_count # 保存直接关联的题目数量
-
- return total_count
-
- # 计算所有节点的题目数量(包括子节点)
- for kp in root_kps:
- calculate_total_question_count(kp)
-
- # 创建知识点代码到题目数量的映射(用于扁平列表)
- kp_code_to_question_count = {}
- def build_question_count_map(kp_list):
- for kp in kp_list:
- kp_code_to_question_count[kp['kp_code']] = kp.get('question_count', 0)
- if kp.get('children'):
- build_question_count_map(kp['children'])
-
- build_question_count_map(root_kps)
-
- # 更新扁平列表的题目数量
- for kp in knowledge_points:
- kp['question_count'] = kp_code_to_question_count.get(kp['kp_code'], 0)
-
- # 获取各学段的统计信息(显示小学、初中、高中)
- cursor.execute("""
- SELECT
- grade,
- COUNT(*) as count
- FROM knowledge_points_copy1
- WHERE grade IN ('小学', '初中', '高中')
- GROUP BY grade
- """)
- grade_stats_raw = cursor.fetchall()
- grade_stats = {row['grade']: row['count'] for row in grade_stats_raw}
-
- # 显示小学、初中、高中
- grade_info = {
- '小学': {'count': grade_stats.get('小学', 0), 'color': 'from-pink-500 to-rose-600', 'icon': 'ri-book-open-line', 'bg': 'bg-gradient-to-br from-pink-50 to-rose-50'},
- '初中': {'count': grade_stats.get('初中', 0), 'color': 'from-blue-500 to-indigo-600', 'icon': 'ri-graduation-cap-line', 'bg': 'bg-gradient-to-br from-blue-50 to-indigo-50'},
- '高中': {'count': grade_stats.get('高中', 0), 'color': 'from-purple-500 to-violet-600', 'icon': 'ri-school-line', 'bg': 'bg-gradient-to-br from-purple-50 to-violet-50'}
- }
-
- conn.close()
-
- return render_template('kp_management.html',
- knowledge_points=knowledge_points,
- kp_tree=root_kps,
- kp_options=kp_options,
- grade_info=grade_info)
- @app.route('/textbook_management')
- def textbook_management():
- """教材管理页面"""
- try:
- conn = get_db_connection()
- except Exception as e:
- return render_db_error(e)
- cursor = conn.cursor(dictionary=True)
-
- # 获取所有教材系列
- cursor.execute("SELECT * FROM textbook_series_copy1 ORDER BY sort_order ASC, id ASC")
- series_list = cursor.fetchall()
-
- # 获取所有教材
- cursor.execute("SELECT * FROM textbooks_copy1 ORDER BY series_id ASC, grade ASC, semester ASC")
- textbooks_list = cursor.fetchall()
-
- # 获取所有知识点(用于关联)
- cursor.execute("SELECT kp_code, name FROM knowledge_points_copy1 ORDER BY kp_code ASC")
- kp_options = cursor.fetchall()
-
- conn.close()
-
- return render_template('textbook_management.html',
- series_list=series_list,
- textbooks_list=textbooks_list,
- kp_options=kp_options)
- # ==================== 教材系列 API ====================
- @app.route('/api/textbook/series/create', methods=['POST'])
- def api_textbook_series_create():
- """创建教材系列"""
- try:
- conn = get_db_connection()
- cursor = conn.cursor(dictionary=True)
-
- data = request.get_json()
- name = (data.get('name') or '').strip()
- slug = (data.get('slug') or '').strip() or None
- publisher = (data.get('publisher') or '').strip() or None
- region = (data.get('region') or '').strip() or None
- is_active = 1 if data.get('is_active') else 0
-
- if not name:
- return jsonify({'success': False, 'error': '系列名称不能为空'})
-
- cursor.execute("""
- INSERT INTO textbook_series_copy1
- (name, slug, publisher, region, is_active, created_at, updated_at)
- VALUES (%s, %s, %s, %s, %s, NOW(), NOW())
- """, (name, slug, publisher, region, is_active))
-
- conn.commit()
- new_id = cursor.lastrowid
- conn.close()
- return jsonify({'success': True, 'message': '创建成功', 'id': new_id})
- except Exception as e:
- if conn:
- conn.rollback()
- conn.close()
- return jsonify({'success': False, 'error': str(e)})
- @app.route('/api/textbook/series/update/<int:series_id>', methods=['POST'])
- def api_textbook_series_update(series_id):
- """更新教材系列"""
- try:
- conn = get_db_connection()
- cursor = conn.cursor(dictionary=True)
-
- data = request.get_json()
- name = (data.get('name') or '').strip()
- slug = (data.get('slug') or '').strip() or None
- publisher = (data.get('publisher') or '').strip() or None
- region = (data.get('region') or '').strip() or None
- is_active = 1 if data.get('is_active') else 0
-
- if not name:
- return jsonify({'success': False, 'error': '系列名称不能为空'})
-
- cursor.execute("""
- UPDATE textbook_series_copy1
- SET name = %s, slug = %s, publisher = %s, region = %s, is_active = %s, updated_at = NOW()
- WHERE id = %s
- """, (name, slug, publisher, region, is_active, series_id))
-
- conn.commit()
- conn.close()
- return jsonify({'success': True, 'message': '更新成功'})
- except Exception as e:
- if conn:
- conn.rollback()
- conn.close()
- return jsonify({'success': False, 'error': str(e)})
- @app.route('/api/textbook/series/delete/<int:series_id>', methods=['POST'])
- def api_textbook_series_delete(series_id):
- """删除教材系列"""
- try:
- conn = get_db_connection()
- cursor = conn.cursor(dictionary=True)
-
- # 检查是否有关联的教材
- cursor.execute("SELECT COUNT(*) as count FROM textbooks_copy1 WHERE series_id = %s", (series_id,))
- result = cursor.fetchone()
- if result['count'] > 0:
- return jsonify({'success': False, 'error': '该系列下存在教材,无法删除'})
-
- cursor.execute("DELETE FROM textbook_series_copy1 WHERE id = %s", (series_id,))
- conn.commit()
- conn.close()
- return jsonify({'success': True, 'message': '删除成功'})
- except Exception as e:
- if conn:
- conn.rollback()
- conn.close()
- return jsonify({'success': False, 'error': str(e)})
- @app.route('/api/textbook/series/get/<int:series_id>', methods=['GET'])
- def api_textbook_series_get(series_id):
- """获取教材系列详情"""
- try:
- conn = get_db_connection()
- cursor = conn.cursor(dictionary=True)
-
- cursor.execute("SELECT * FROM textbook_series_copy1 WHERE id = %s", (series_id,))
- series = cursor.fetchone()
- conn.close()
-
- if not series:
- return jsonify({'success': False, 'error': '系列不存在'})
-
- return jsonify({'success': True, 'data': series})
- except Exception as e:
- return jsonify({'success': False, 'error': str(e)})
- @app.route('/api/textbook/series/toggle_active/<int:series_id>', methods=['POST'])
- def api_textbook_series_toggle_active(series_id):
- """切换教材系列激活状态"""
- try:
- conn = get_db_connection()
- cursor = conn.cursor(dictionary=True)
-
- # 获取当前状态
- cursor.execute("SELECT is_active FROM textbook_series_copy1 WHERE id = %s", (series_id,))
- series = cursor.fetchone()
-
- if not series:
- return jsonify({'success': False, 'error': '系列不存在'})
-
- # 切换状态:1变0,0变1
- new_status = 1 if series['is_active'] == 0 else 0
-
- cursor.execute("""
- UPDATE textbook_series_copy1
- SET is_active = %s, updated_at = NOW()
- WHERE id = %s
- """, (new_status, series_id))
-
- conn.commit()
- conn.close()
- return jsonify({
- 'success': True,
- 'message': '状态已更新',
- 'is_active': new_status
- })
- except Exception as e:
- if conn:
- conn.rollback()
- conn.close()
- return jsonify({'success': False, 'error': str(e)})
- # ==================== 教材 API ====================
- @app.route('/api/textbook/create', methods=['POST'])
- def api_textbook_create():
- """创建教材"""
- try:
- conn = get_db_connection()
- cursor = conn.cursor(dictionary=True)
-
- data = request.get_json()
- series_id = data.get('series_id')
- official_title = (data.get('official_title') or '').strip()
- stage = (data.get('stage') or '').strip() or None
- grade = (data.get('grade') or '').strip() or None
- semester = data.get('semester')
-
- if not series_id:
- return jsonify({'success': False, 'error': '教材系列不能为空'})
- if not official_title:
- return jsonify({'success': False, 'error': '教材名称不能为空'})
-
- cursor.execute("""
- INSERT INTO textbooks_copy1
- (series_id, official_title, stage, grade, semester, created_at, updated_at)
- VALUES (%s, %s, %s, %s, %s, NOW(), NOW())
- """, (series_id, official_title, stage, grade, semester))
-
- conn.commit()
- new_id = cursor.lastrowid
- conn.close()
- return jsonify({'success': True, 'message': '创建成功', 'id': new_id})
- except Exception as e:
- if conn:
- conn.rollback()
- conn.close()
- return jsonify({'success': False, 'error': str(e)})
- @app.route('/api/textbook/update/<int:textbook_id>', methods=['POST'])
- def api_textbook_update(textbook_id):
- """更新教材"""
- try:
- conn = get_db_connection()
- cursor = conn.cursor(dictionary=True)
-
- data = request.get_json()
- official_title = (data.get('official_title') or '').strip()
- stage = (data.get('stage') or '').strip() or None
- grade = (data.get('grade') or '').strip() or None
- semester = data.get('semester')
-
- if not official_title:
- return jsonify({'success': False, 'error': '教材名称不能为空'})
-
- cursor.execute("""
- UPDATE textbooks_copy1
- SET official_title = %s, stage = %s, grade = %s, semester = %s, updated_at = NOW()
- WHERE id = %s
- """, (official_title, stage, grade, semester, textbook_id))
-
- conn.commit()
- conn.close()
- return jsonify({'success': True, 'message': '更新成功'})
- except Exception as e:
- if conn:
- conn.rollback()
- conn.close()
- return jsonify({'success': False, 'error': str(e)})
- @app.route('/api/textbook/delete/<int:textbook_id>', methods=['POST'])
- def api_textbook_delete(textbook_id):
- """删除教材"""
- try:
- conn = get_db_connection()
- cursor = conn.cursor(dictionary=True)
-
- # 检查是否有目录节点
- cursor.execute("SELECT COUNT(*) as count FROM textbook_catalog_nodes_copy1 WHERE textbook_id = %s", (textbook_id,))
- result = cursor.fetchone()
- if result['count'] > 0:
- return jsonify({'success': False, 'error': '该教材下存在目录节点,无法删除'})
-
- cursor.execute("DELETE FROM textbooks_copy1 WHERE id = %s", (textbook_id,))
- conn.commit()
- conn.close()
- return jsonify({'success': True, 'message': '删除成功'})
- except Exception as e:
- if conn:
- conn.rollback()
- conn.close()
- return jsonify({'success': False, 'error': str(e)})
- @app.route('/api/textbook/get/<int:textbook_id>', methods=['GET'])
- def api_textbook_get(textbook_id):
- """获取教材详情"""
- try:
- conn = get_db_connection()
- cursor = conn.cursor(dictionary=True)
-
- cursor.execute("SELECT * FROM textbooks_copy1 WHERE id = %s", (textbook_id,))
- textbook = cursor.fetchone()
- conn.close()
-
- if not textbook:
- return jsonify({'success': False, 'error': '教材不存在'})
-
- return jsonify({'success': True, 'data': textbook})
- except Exception as e:
- return jsonify({'success': False, 'error': str(e)})
- @app.route('/api/textbook/list/<int:series_id>', methods=['GET'])
- def api_textbook_list(series_id):
- """获取指定系列下的所有教材"""
- try:
- conn = get_db_connection()
- cursor = conn.cursor(dictionary=True)
-
- cursor.execute("SELECT * FROM textbooks_copy1 WHERE series_id = %s ORDER BY grade ASC, semester ASC", (series_id,))
- textbooks = cursor.fetchall()
- conn.close()
-
- return jsonify({'success': True, 'data': textbooks})
- except Exception as e:
- return jsonify({'success': False, 'error': str(e)})
- # ==================== 目录节点 API ====================
- @app.route('/api/textbook/catalog/create', methods=['POST'])
- def api_textbook_catalog_create():
- """创建目录节点"""
- try:
- conn = get_db_connection()
- cursor = conn.cursor(dictionary=True)
-
- data = request.get_json()
- textbook_id = data.get('textbook_id')
- parent_id = data.get('parent_id') # 可以为None(顶级节点)
- node_type = (data.get('node_type') or 'chapter').strip()
- title = (data.get('title') or '').strip()
- display_no = (data.get('display_no') or '').strip() or None
-
- if not textbook_id:
- return jsonify({'success': False, 'error': '教材ID不能为空'})
- if not title:
- return jsonify({'success': False, 'error': '节点标题不能为空'})
-
- # 验证节点类型层级关系
- if parent_id:
- # 有父节点,需要验证节点类型
- cursor.execute("SELECT node_type FROM textbook_catalog_nodes_copy1 WHERE id = %s", (parent_id,))
- parent = cursor.fetchone()
- if not parent:
- return jsonify({'success': False, 'error': '父节点不存在'})
-
- parent_node_type = parent['node_type']
- # 章节下只能创建 section
- if parent_node_type == 'chapter' and node_type != 'section':
- return jsonify({'success': False, 'error': '章节下只能创建小节(section)'})
- # section 下只能创建 subsection
- elif parent_node_type == 'section' and node_type != 'subsection':
- return jsonify({'success': False, 'error': '小节下只能创建子小节(subsection)'})
- # subsection 下不能再创建子节点
- elif parent_node_type == 'subsection':
- return jsonify({'success': False, 'error': '子小节下不能再创建子节点'})
- else:
- # 没有父节点,只能创建顶级节点(chapter)
- if node_type != 'chapter':
- return jsonify({'success': False, 'error': '顶级节点只能是章节(chapter)'})
-
- # 计算depth
- depth = 1
- if parent_id:
- cursor.execute("SELECT depth FROM textbook_catalog_nodes_copy1 WHERE id = %s", (parent_id,))
- parent_depth_result = cursor.fetchone()
- if parent_depth_result:
- depth = parent_depth_result['depth'] + 1
-
- # 查询最大id并手动递增(因为id字段可能不是auto_increment)
- cursor.execute("SELECT MAX(id) as max_id FROM textbook_catalog_nodes_copy1")
- max_id_result = cursor.fetchone()
- new_id = (max_id_result['max_id'] or 0) + 1
-
- cursor.execute("""
- INSERT INTO textbook_catalog_nodes_copy1
- (id, textbook_id, parent_id, node_type, title, display_no, depth, sort_order, created_at, updated_at)
- VALUES (%s, %s, %s, %s, %s, %s, %s, 0, NOW(), NOW())
- """, (new_id, textbook_id, parent_id, node_type, title, display_no, depth))
-
- conn.commit()
- conn.close()
- return jsonify({'success': True, 'message': '创建成功', 'id': new_id})
- except Exception as e:
- if conn:
- conn.rollback()
- conn.close()
- return jsonify({'success': False, 'error': str(e)})
- @app.route('/api/textbook/catalog/update/<int:node_id>', methods=['POST'])
- def api_textbook_catalog_update(node_id):
- """更新目录节点"""
- try:
- conn = get_db_connection()
- cursor = conn.cursor(dictionary=True)
-
- data = request.get_json()
- title = (data.get('title') or '').strip()
- display_no = (data.get('display_no') or '').strip() or None
- node_type = (data.get('node_type') or '').strip() or None
-
- if not title:
- return jsonify({'success': False, 'error': '节点标题不能为空'})
-
- # 如果更新了节点类型,需要验证是否符合层级关系
- if node_type:
- # 获取当前节点的父节点信息
- cursor.execute("SELECT parent_id FROM textbook_catalog_nodes_copy1 WHERE id = %s", (node_id,))
- current_node = cursor.fetchone()
- if current_node and current_node.get('parent_id'):
- parent_id = current_node['parent_id']
- cursor.execute("SELECT node_type FROM textbook_catalog_nodes_copy1 WHERE id = %s", (parent_id,))
- parent = cursor.fetchone()
- if parent:
- parent_node_type = parent['node_type']
- # 验证节点类型层级关系
- if parent_node_type == 'chapter' and node_type != 'section':
- return jsonify({'success': False, 'error': '章节下只能创建小节(section)'})
- elif parent_node_type == 'section' and node_type != 'subsection':
- return jsonify({'success': False, 'error': '小节下只能创建子小节(subsection)'})
- elif parent_node_type == 'subsection':
- return jsonify({'success': False, 'error': '子小节下不能再创建子节点'})
- else:
- # 没有父节点,只能是顶级节点(chapter)
- if node_type != 'chapter':
- return jsonify({'success': False, 'error': '顶级节点只能是章节(chapter)'})
-
- cursor.execute("""
- UPDATE textbook_catalog_nodes_copy1
- SET title = %s, display_no = %s, node_type = COALESCE(%s, node_type), updated_at = NOW()
- WHERE id = %s
- """, (title, display_no, node_type, node_id))
-
- conn.commit()
- conn.close()
- return jsonify({'success': True, 'message': '更新成功'})
- except Exception as e:
- if conn:
- conn.rollback()
- conn.close()
- return jsonify({'success': False, 'error': str(e)})
- @app.route('/api/textbook/catalog/delete/<int:node_id>', methods=['POST'])
- def api_textbook_catalog_delete(node_id):
- """删除目录节点"""
- try:
- conn = get_db_connection()
- cursor = conn.cursor(dictionary=True)
-
- # 检查是否有子节点
- cursor.execute("SELECT COUNT(*) as count FROM textbook_catalog_nodes_copy1 WHERE parent_id = %s", (node_id,))
- result = cursor.fetchone()
- if result['count'] > 0:
- return jsonify({'success': False, 'error': '该节点下存在子节点,无法删除'})
-
- # 检查是否有关联的知识点
- cursor.execute("SELECT COUNT(*) as count FROM textbook_chapter_knowledge_relation_copy1 WHERE catalog_chapter_id = %s AND is_deleted = 0", (node_id,))
- result = cursor.fetchone()
- if result['count'] > 0:
- return jsonify({'success': False, 'error': '该节点下存在关联的知识点,无法删除'})
-
- cursor.execute("DELETE FROM textbook_catalog_nodes_copy1 WHERE id = %s", (node_id,))
- conn.commit()
- conn.close()
- return jsonify({'success': True, 'message': '删除成功'})
- except Exception as e:
- if conn:
- conn.rollback()
- conn.close()
- return jsonify({'success': False, 'error': str(e)})
- @app.route('/api/textbook/catalog/get/<int:node_id>', methods=['GET'])
- def api_textbook_catalog_get(node_id):
- """获取目录节点详情"""
- try:
- conn = get_db_connection()
- cursor = conn.cursor(dictionary=True)
-
- cursor.execute("SELECT * FROM textbook_catalog_nodes_copy1 WHERE id = %s", (node_id,))
- node = cursor.fetchone()
- conn.close()
-
- if not node:
- return jsonify({'success': False, 'error': '节点不存在'})
-
- return jsonify({'success': True, 'data': node})
- except Exception as e:
- return jsonify({'success': False, 'error': str(e)})
- @app.route('/api/textbook/catalog/tree/<int:textbook_id>', methods=['GET'])
- def api_textbook_catalog_tree(textbook_id):
- """获取教材的目录树"""
- try:
- conn = get_db_connection()
- cursor = conn.cursor(dictionary=True)
-
- # 获取所有节点
- cursor.execute("""
- SELECT * FROM textbook_catalog_nodes_copy1
- WHERE textbook_id = %s
- ORDER BY depth ASC, sort_order ASC, id ASC
- """, (textbook_id,))
- all_nodes = cursor.fetchall()
-
- # 构建树形结构
- node_dict = {node['id']: node for node in all_nodes}
- root_nodes = []
-
- for node in all_nodes:
- node['children'] = []
- if node['parent_id'] and node['parent_id'] in node_dict:
- parent = node_dict[node['parent_id']]
- parent['children'].append(node)
- else:
- root_nodes.append(node)
-
- conn.close()
- return jsonify({'success': True, 'data': root_nodes})
- except Exception as e:
- return jsonify({'success': False, 'error': str(e)})
- # ==================== 章节-知识点关联 API ====================
- @app.route('/api/textbook/relation/create', methods=['POST'])
- def api_textbook_relation_create():
- """创建章节-知识点关联"""
- try:
- conn = get_db_connection()
- cursor = conn.cursor(dictionary=True)
-
- data = request.get_json()
- catalog_chapter_id = data.get('catalog_chapter_id')
- kp_code = (data.get('kp_code') or '').strip()
-
- if not catalog_chapter_id:
- return jsonify({'success': False, 'error': '章节ID不能为空'})
- if not kp_code:
- return jsonify({'success': False, 'error': '知识点代码不能为空'})
-
- # 检查是否已存在
- cursor.execute("""
- SELECT id FROM textbook_chapter_knowledge_relation_copy1
- WHERE catalog_chapter_id = %s AND kp_code = %s AND is_deleted = 0
- """, (catalog_chapter_id, kp_code))
- if cursor.fetchone():
- return jsonify({'success': False, 'error': '该关联已存在'})
-
- cursor.execute("""
- INSERT INTO textbook_chapter_knowledge_relation_copy1
- (catalog_chapter_id, kp_code, is_deleted, gmt_create, gmt_modified)
- VALUES (%s, %s, 0, NOW(), NOW())
- """, (catalog_chapter_id, kp_code))
-
- conn.commit()
- new_id = cursor.lastrowid
- conn.close()
- return jsonify({'success': True, 'message': '创建成功', 'id': new_id})
- except Exception as e:
- if conn:
- conn.rollback()
- conn.close()
- return jsonify({'success': False, 'error': str(e)})
- @app.route('/api/textbook/relation/delete/<int:relation_id>', methods=['POST'])
- def api_textbook_relation_delete(relation_id):
- """删除章节-知识点关联(软删除)"""
- try:
- conn = get_db_connection()
- cursor = conn.cursor(dictionary=True)
-
- cursor.execute("""
- UPDATE textbook_chapter_knowledge_relation_copy1
- SET is_deleted = 1, gmt_modified = NOW()
- WHERE id = %s
- """, (relation_id,))
-
- conn.commit()
- conn.close()
- return jsonify({'success': True, 'message': '删除成功'})
- except Exception as e:
- if conn:
- conn.rollback()
- conn.close()
- return jsonify({'success': False, 'error': str(e)})
- @app.route('/api/textbook/relation/list/<int:catalog_chapter_id>', methods=['GET'])
- def api_textbook_relation_list(catalog_chapter_id):
- """获取章节关联的知识点列表"""
- try:
- conn = get_db_connection()
- cursor = conn.cursor(dictionary=True)
-
- cursor.execute("""
- SELECT tckr.*, kp.name as kp_name
- FROM textbook_chapter_knowledge_relation_copy1 tckr
- LEFT JOIN knowledge_points_copy1 kp ON kp.kp_code = tckr.kp_code
- WHERE tckr.catalog_chapter_id = %s AND tckr.is_deleted = 0
- ORDER BY tckr.id ASC
- """, (catalog_chapter_id,))
-
- relations = cursor.fetchall()
- conn.close()
- return jsonify({'success': True, 'data': relations})
- except Exception as e:
- return jsonify({'success': False, 'error': str(e)})
- @app.route('/material_management')
- def material_management():
- """资料管理页面"""
- return render_template('material_management.html')
- @app.route('/add_question')
- def add_question_page():
- """显示录入新题目页面"""
- # 从URL参数获取层级信息
- kp_code = request.args.get('kp_code')
- chapter_id = request.args.get('chapter')
- chapter_label = request.args.get('chapter_label', '')
- section_id = request.args.get('section')
- section_label = request.args.get('section_label', '')
- subsection_id = request.args.get('subsection')
- subsection_label = request.args.get('subsection_label', '')
-
- # 解码URL编码的标签
- if chapter_label:
- chapter_label = urllib.parse.unquote(chapter_label)
- if section_label:
- section_label = urllib.parse.unquote(section_label)
- if subsection_label:
- subsection_label = urllib.parse.unquote(subsection_label)
-
- return render_template('add_question.html',
- kp_code=kp_code,
- chapter_id=chapter_id,
- chapter_label=chapter_label,
- section_id=section_id,
- section_label=section_label,
- subsection_id=subsection_id,
- subsection_label=subsection_label)
- @app.route('/edit/<question_code>')
- def edit_page(question_code):
- try:
- conn = get_db_connection()
- except Exception as e:
- return render_db_error(e)
- cursor = conn.cursor(dictionary=True)
- # 使用别名 kp_id AS kp_code 以兼容模板代码
- cursor.execute("SELECT *, kp_id AS kp_code FROM questions_tem WHERE question_code = %s", (question_code,))
- question = cursor.fetchone()
- conn.close()
- return render_template('edit.html', q=question)
- @app.route('/update_question', methods=['POST'])
- def update_question():
- data = request.json
- q_code = data.get('question_code')
-
- if not q_code:
- return jsonify({'success': False, 'error': '缺少 question_code'}), 400
-
- try:
- conn = get_db_connection()
- cursor = conn.cursor()
-
- # 只更新用户实际提供的字段(如果字段在 data 中存在,就更新)
- updates = []
- params = []
-
- if 'stem' in data:
- updates.append("stem=%s")
- params.append(data.get('stem', ''))
-
- if 'options' in data:
- # 如果 options 是空字符串,保存为 NULL
- # 如果 options 是无效 JSON(如 "Invalid value."),也保存为 NULL,避免数据库报错
- options_value = data.get('options')
- if options_value == '' or (isinstance(options_value, str) and options_value.strip().lower() in ['invalid value.', 'null', 'none']):
- options_value = None
- updates.append("options=%s")
- params.append(options_value)
-
- if 'answer' in data:
- updates.append("answer=%s")
- params.append(data.get('answer', ''))
-
- if 'solution' in data:
- updates.append("solution=%s")
- params.append(data.get('solution', ''))
-
- if 'question_type' in data:
- updates.append("question_type=%s")
- params.append(data.get('question_type', ''))
-
- if 'kp_code' in data:
- # 前端参数名是 kp_code,但数据库字段是 kp_id
- updates.append("kp_id=%s")
- params.append(data.get('kp_code'))
-
- if 'difficulty' in data:
- # 处理 difficulty 字段:转换为浮点数,保留两位小数
- difficulty_value = data.get('difficulty')
- if difficulty_value is not None and difficulty_value != '':
- try:
- difficulty_value = round(float(difficulty_value), 2)
- updates.append("difficulty=%s")
- params.append(difficulty_value)
- except (ValueError, TypeError):
- pass # 如果转换失败,跳过该字段
-
- if not updates:
- conn.close()
- return jsonify({'success': False, 'error': '没有提供要更新的字段'}), 400
-
- # 添加 updated_at 时间戳
- updates.append("updated_at=NOW()")
-
- # 添加 WHERE 条件
- params.append(q_code)
- query = f"UPDATE questions_tem SET {', '.join(updates)} WHERE question_code=%s"
- cursor.execute(query, tuple(params))
- conn.commit()
- conn.close()
- return jsonify({'success': True})
- except Exception as e:
- return jsonify({'success': False, 'error': str(e)})
- @app.route('/create_question', methods=['POST'])
- def create_question():
- """创建新题目"""
- data = request.json
-
- try:
- conn = get_db_connection()
- cursor = conn.cursor()
-
- # 自动生成唯一的 question_code
- # 格式:Q + 年月日时分秒 + 3位随机数,例如:Q20250101120045123
- max_attempts = 10 # 最多尝试10次生成唯一编号
- q_code = None
-
- for _ in range(max_attempts):
- timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
- random_suffix = random.randint(100, 999) # 3位随机数
- q_code = f"Q{timestamp}{random_suffix}"
-
- # 检查是否已存在
- cursor.execute("SELECT question_code FROM questions_tem WHERE question_code = %s", (q_code,))
- if not cursor.fetchone():
- break # 找到唯一编号
- q_code = None
-
- if not q_code:
- conn.close()
- return jsonify({'success': False, 'error': '无法生成唯一的题号,请稍后重试'}), 500
-
- # 构建插入字段和值
- fields = []
- values = []
- placeholders = []
-
- # 必填字段:question_code(自动生成)
- fields.append('question_code')
- values.append(q_code)
- placeholders.append('%s')
-
- # 可选字段(前端参数 kp_code 保存到数据库的 kp_code 字段)
- optional_fields = {
- 'stem': 'stem',
- 'options': 'options',
- 'answer': 'answer',
- 'solution': 'solution',
- 'question_type': 'question_type',
- 'kp_code': 'kp_code', # 前端参数名 kp_code,保存到数据库的 kp_code 字段(与查询保持一致)
- 'textbook_catalog_nodes_id': 'textbook_catalog_nodes_id',
- 'chapter': 'title_1', # chapter 映射到 title_1
- 'section': 'title_2', # section 映射到 title_2
- 'subsection': 'title_3', # subsection 映射到 title_3
- 'difficulty': 'difficulty', # 难度字段
- 'create_by': 'create_by', # 创建者字段
- 'grade': 'grade', # 年级字段:1=小学,2=初中,3=高中
- }
-
- for key, field_name in optional_fields.items():
- if key in data and data[key] is not None:
- value = data[key]
- # 处理 options:如果是空字符串,设为 None
- if key == 'options' and (value == '' or (isinstance(value, str) and value.strip().lower() in ['null', 'none'])):
- value = None
- # 处理 textbook_catalog_nodes_id:转换为整数
- elif key == 'textbook_catalog_nodes_id' and value:
- try:
- value = int(value)
- except (ValueError, TypeError):
- value = None
- # 处理 difficulty:转换为浮点数,保留两位小数
- elif key == 'difficulty' and value:
- try:
- value = round(float(value), 2)
- except (ValueError, TypeError):
- value = None
- # 处理 grade:转换为整数(1=小学,2=初中,3=高中)
- elif key == 'grade' and value:
- try:
- value = int(value)
- # 验证年级值是否在有效范围内
- if value not in [1, 2, 3]:
- value = None
- except (ValueError, TypeError):
- value = None
-
- if value is not None and value != '':
- fields.append(field_name)
- values.append(value)
- placeholders.append('%s')
-
- # 验证:所有题目都必须有年级字段
- has_grade = 'grade' in fields
- if not has_grade:
- conn.close()
- return jsonify({'success': False, 'error': '所有题目都必须选择年级'}), 400
-
- # 添加时间戳字段
- fields.append('created_at')
- fields.append('updated_at')
- values.append(datetime.datetime.now())
- values.append(datetime.datetime.now())
- placeholders.append('%s')
- placeholders.append('%s')
-
- # 执行插入
- if len(fields) == 3: # 只有 question_code, created_at, updated_at
- conn.close()
- return jsonify({'success': False, 'error': '至少需要填写题号以外的其他字段'}), 400
-
- query = f"INSERT INTO questions_tem ({', '.join(fields)}) VALUES ({', '.join(placeholders)})"
- cursor.execute(query, tuple(values))
- # 获取插入的题目ID
- question_id = cursor.lastrowid
- conn.commit()
- conn.close()
-
- return jsonify({'success': True, 'question_code': q_code, 'question_id': question_id})
- except Exception as e:
- try:
- conn.rollback()
- except:
- pass
- try:
- conn.close()
- except:
- pass
- return jsonify({'success': False, 'error': str(e)})
- @app.route('/api/question_by_id/<int:question_id>')
- def api_question_by_id(question_id):
- """根据题目ID返回题目详情(用于查重比对)"""
- try:
- conn = get_db_connection()
- cursor = conn.cursor(dictionary=True)
-
- # 查询题目详情
- cursor.execute("""
- SELECT
- id,
- question_code,
- stem,
- kp_code,
- kp_id,
- audit_reason,
- audit_status,
- difficulty,
- question_type,
- answer,
- solution,
- options
- FROM questions_tem
- WHERE id = %s
- """, (question_id,))
-
- question = cursor.fetchone()
- conn.close()
-
- if not question:
- return jsonify({'success': False, 'error': '题目不存在'}), 404
-
- # 处理选项JSON
- if question.get('options'):
- try:
- opt_data = question['options']
- if isinstance(opt_data, str):
- opt_data = opt_data.replace("'", '"')
- question['options'] = json.loads(opt_data)
- except:
- question['options'] = {}
-
- return jsonify({'success': True, 'question': question})
- except Exception as e:
- return jsonify({'success': False, 'error': str(e)}), 500
- @app.route('/api/check_duplicate', methods=['POST', 'OPTIONS'])
- def api_check_duplicate():
- """代理查重检测接口,解决CORS问题"""
- if request.method == 'OPTIONS':
- # 处理CORS预检请求
- response = jsonify({})
- response.headers.add('Access-Control-Allow-Origin', '*')
- response.headers.add('Access-Control-Allow-Methods', 'POST, OPTIONS')
- response.headers.add('Access-Control-Allow-Headers', 'Content-Type')
- return response
-
- try:
- # 获取请求数据
- data = request.get_json()
-
- # 转发请求到查重服务
- import urllib.request
- import urllib.parse
-
- url = 'http://47.77.199.85:8888/api/check_duplicate'
- req_data = json.dumps(data).encode('utf-8')
-
- req = urllib.request.Request(
- url,
- data=req_data,
- headers={
- 'Content-Type': 'application/json',
- 'Cookie': request.headers.get('Cookie', 'MATH-LOGIN-AUTH={{authToken}}'),
- 'MATH-DEBUG': request.headers.get('MATH-DEBUG', '1'),
- 'MATH-UID': request.headers.get('MATH-UID', '12')
- }
- )
-
- with urllib.request.urlopen(req, timeout=30) as response:
- result = json.loads(response.read().decode('utf-8'))
-
- # 返回结果,添加CORS头
- flask_response = jsonify(result)
- flask_response.headers.add('Access-Control-Allow-Origin', '*')
- return flask_response
-
- except urllib.error.HTTPError as e:
- error_body = e.read().decode('utf-8') if e.fp else 'Unknown error'
- return jsonify({'code': -1, 'error': f'查重服务错误: {e.code} - {error_body}'}), e.code
- except Exception as e:
- return jsonify({'code': -1, 'error': f'查重检测失败: {str(e)}'}), 500
- @app.route('/api/confirm_repeat', methods=['POST', 'OPTIONS'])
- def api_confirm_repeat():
- """代理确认查重结果接口,解决CORS问题"""
- if request.method == 'OPTIONS':
- # 处理CORS预检请求
- response = jsonify({})
- response.headers.add('Access-Control-Allow-Origin', '*')
- response.headers.add('Access-Control-Allow-Methods', 'POST, OPTIONS')
- response.headers.add('Access-Control-Allow-Headers', 'Content-Type')
- return response
-
- try:
- # 获取请求数据
- data = request.get_json()
-
- # 转发请求到查重服务
- import urllib.request
-
- url = 'http://47.77.199.85:8888/api/confirm_repeat'
- req_data = json.dumps(data).encode('utf-8')
-
- req = urllib.request.Request(
- url,
- data=req_data,
- headers={
- 'Content-Type': 'application/json',
- 'Cookie': request.headers.get('Cookie', 'MATH-LOGIN-AUTH={{authToken}}'),
- 'MATH-DEBUG': request.headers.get('MATH-DEBUG', '1'),
- 'MATH-UID': request.headers.get('MATH-UID', '12')
- }
- )
-
- with urllib.request.urlopen(req, timeout=30) as response:
- result = json.loads(response.read().decode('utf-8'))
-
- # 返回结果,添加CORS头
- flask_response = jsonify(result)
- flask_response.headers.add('Access-Control-Allow-Origin', '*')
- return flask_response
-
- except urllib.error.HTTPError as e:
- error_body = e.read().decode('utf-8') if e.fp else 'Unknown error'
- return jsonify({'code': -1, 'error': f'查重服务错误: {e.code} - {error_body}'}), e.code
- except Exception as e:
- return jsonify({'code': -1, 'error': f'确认查重结果失败: {str(e)}'}), 500
- @app.route('/api/delete_question/<question_code>', methods=['POST'])
- def delete_question(question_code):
- """
- 删除题目(需要确认),返回下一题的 question_code(用于跳转)
- """
- try:
- conn = get_db_connection()
- cursor = conn.cursor(dictionary=True)
-
- # 先获取题目信息(包括知识点和教材节点,使用 kp_id 字段)
- cursor.execute("SELECT question_code, kp_id, textbook_catalog_nodes_id FROM questions_tem WHERE question_code = %s", (question_code,))
- row = cursor.fetchone()
- if not row:
- conn.close()
- return jsonify({'success': False, 'error': '题目不存在'}), 404
-
- kp_code = row.get('kp_id') # 数据库字段已改为 kp_id
- textbook_node_id = row.get('textbook_catalog_nodes_id')
-
- # 获取同分类下的所有题目(按 question_code 降序排列)
- all_codes = []
- if kp_code:
- # 有知识点:在同知识点内查找
- cursor.execute("SELECT question_code FROM questions_tem WHERE kp_id = %s ORDER BY question_code DESC", (kp_code,))
- all_codes = [r['question_code'] for r in cursor.fetchall()]
- elif textbook_node_id:
- # 有教材节点:在同教材节点内查找
- cursor.execute(
- "SELECT question_code FROM questions_tem WHERE textbook_catalog_nodes_id = %s ORDER BY question_code DESC",
- (textbook_node_id,)
- )
- all_codes = [r['question_code'] for r in cursor.fetchall()]
-
- # 找到当前题目在列表中的位置
- try:
- curr_idx = all_codes.index(question_code)
- except ValueError:
- curr_idx = -1
-
- # 删除题目
- cursor.execute("DELETE FROM questions_tem WHERE question_code = %s", (question_code,))
- conn.commit()
-
- # 确定跳转目标:优先下一题,没有则上一题,都没有则返回 None(前端跳转列表)
- next_code = None
- if curr_idx >= 0 and curr_idx < len(all_codes) - 1:
- # 有下一题
- next_code = all_codes[curr_idx + 1]
- elif curr_idx > 0:
- # 没有下一题,但有上一题
- next_code = all_codes[curr_idx - 1]
-
- conn.close()
- return jsonify({
- 'success': True,
- 'message': '题目已删除',
- 'next_code': next_code,
- 'kp_code': kp_code,
- 'node_id': str(textbook_node_id) if textbook_node_id else None
- })
- except Exception as e:
- try:
- conn.rollback()
- except Exception:
- pass
- return jsonify({'success': False, 'error': str(e)}), 500
- finally:
- try:
- conn.close()
- except Exception:
- pass
- # 知识点管理 API(knowledge_points_copy1 表)
- @app.route('/api/kp/create', methods=['POST'])
- def api_kp_create():
- """创建知识点"""
- try:
- conn = get_db_connection()
- cursor = conn.cursor(dictionary=True)
-
- data = request.get_json()
- kp_code = (data.get('kp_code') or '').strip()
- name = (data.get('name') or '').strip()
- subject = (data.get('subject') or '').strip() or None
- grade = (data.get('grade') or '').strip() or None
- parent_kp_code = (data.get('parent_kp_code') or '').strip() or None
-
- if not kp_code or not name:
- return jsonify({'success': False, 'error': '知识点代码和名称不能为空'})
-
- # 检查kp_code是否已存在
- cursor.execute("SELECT id FROM knowledge_points_copy1 WHERE kp_code = %s", (kp_code,))
- if cursor.fetchone():
- return jsonify({'success': False, 'error': '知识点代码已存在'})
-
- # 查询最大id并手动递增(确保id正确自增)
- cursor.execute("SELECT MAX(id) as max_id FROM knowledge_points_copy1")
- max_id_result = cursor.fetchone()
- new_id = (max_id_result['max_id'] or 0) + 1
-
- # 插入新知识点
- cursor.execute("""
- INSERT INTO knowledge_points_copy1
- (id, kp_code, name, subject, grade, parent_kp_code, created_at, updated_at)
- VALUES (%s, %s, %s, %s, %s, %s, NOW(), NOW())
- """, (new_id, kp_code, name, subject, grade, parent_kp_code))
-
- conn.commit()
- conn.close()
- return jsonify({'success': True, 'message': '创建成功'})
- except Exception as e:
- if conn:
- conn.rollback()
- conn.close()
- return jsonify({'success': False, 'error': str(e)})
- @app.route('/api/kp/update/<int:kp_id>', methods=['POST'])
- def api_kp_update(kp_id):
- """更新知识点"""
- try:
- conn = get_db_connection()
- cursor = conn.cursor(dictionary=True)
-
- data = request.get_json()
- name = (data.get('name') or '').strip()
- subject = (data.get('subject') or '').strip() or None
- grade = (data.get('grade') or '').strip() or None
- parent_kp_code = (data.get('parent_kp_code') or '').strip() or None
-
- if not name:
- return jsonify({'success': False, 'error': '知识点名称不能为空'})
-
- # 更新知识点
- cursor.execute("""
- UPDATE knowledge_points_copy1
- SET name = %s, subject = %s, grade = %s, parent_kp_code = %s, updated_at = NOW()
- WHERE id = %s
- """, (name, subject, grade, parent_kp_code, kp_id))
-
- conn.commit()
- conn.close()
- return jsonify({'success': True, 'message': '更新成功'})
- except Exception as e:
- if conn:
- conn.rollback()
- conn.close()
- return jsonify({'success': False, 'error': str(e)})
- @app.route('/api/kp/delete/<int:kp_id>', methods=['POST'])
- def api_kp_delete(kp_id):
- """删除知识点"""
- try:
- conn = get_db_connection()
- cursor = conn.cursor(dictionary=True)
-
- # 检查是否有子知识点
- cursor.execute("SELECT id FROM knowledge_points_copy1 WHERE parent_kp_code = (SELECT kp_code FROM knowledge_points_copy1 WHERE id = %s)", (kp_id,))
- if cursor.fetchone():
- return jsonify({'success': False, 'error': '该知识点下存在子知识点,无法删除'})
-
- # 删除知识点
- cursor.execute("DELETE FROM knowledge_points_copy1 WHERE id = %s", (kp_id,))
-
- conn.commit()
- conn.close()
- return jsonify({'success': True, 'message': '删除成功'})
- except Exception as e:
- if conn:
- conn.rollback()
- conn.close()
- return jsonify({'success': False, 'error': str(e)})
- @app.route('/api/kp/get/<int:kp_id>', methods=['GET'])
- def api_kp_get(kp_id):
- """获取单个知识点详情"""
- try:
- conn = get_db_connection()
- cursor = conn.cursor(dictionary=True)
-
- cursor.execute("""
- SELECT * FROM knowledge_points_copy1 WHERE id = %s
- """, (kp_id,))
-
- kp = cursor.fetchone()
- conn.close()
-
- if not kp:
- return jsonify({'success': False, 'error': '知识点不存在'})
-
- # 处理JSON字段
- for field in ['prerequisite_kp_codes', 'dependent_kp_codes', 'related_kp_codes', 'stats', 'skills', 'direct_score', 'related_score']:
- if kp[field] and isinstance(kp[field], str):
- try:
- kp[field] = json.loads(kp[field])
- except:
- kp[field] = None
-
- return jsonify({'success': True, 'data': kp})
- except Exception as e:
- return jsonify({'success': False, 'error': str(e)})
- if __name__ == '__main__':
- # 自动创建缺失列
- try:
- conn = get_db_connection()
- cursor = conn.cursor()
- for field, ftype in [('audit_status', 'TINYINT DEFAULT 0'), ('audit_reason', 'TEXT')]:
- cursor.execute(f"SHOW COLUMNS FROM questions_tem LIKE '{field}'")
- if not cursor.fetchone():
- cursor.execute(f"ALTER TABLE questions_tem ADD COLUMN {field} {ftype}")
- conn.commit()
- conn.close()
- except: pass
-
- # Ensure static directory exists and copy cat images (if exists in root directory)
- try:
- static_dir = resource_path("static")
- if not os.path.exists(static_dir):
- os.makedirs(static_dir, exist_ok=True)
-
- import shutil
- # Prefer PNG format, fallback to JPG if PNG doesn't exist (for backward compatibility)
- cat_img_src_png = os.path.join(BASE_DIR, "cat.png")
- cat_img_dst_png = os.path.join(static_dir, "cat.png")
- cat_img_src_jpg = os.path.join(BASE_DIR, "cat.jpg")
- cat_img_dst_jpg = os.path.join(static_dir, "cat.jpg")
-
- if os.path.exists(cat_img_src_png) and not os.path.exists(cat_img_dst_png):
- shutil.copy2(cat_img_src_png, cat_img_dst_png)
- elif os.path.exists(cat_img_src_jpg) and not os.path.exists(cat_img_dst_jpg):
- shutil.copy2(cat_img_src_jpg, cat_img_dst_jpg)
- except Exception:
- pass # 如果复制失败,不影响启动
-
- app.run(host="0.0.0.0", port=5000, debug=True)
|