ProcessMarkdownCandidateBatch.php 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354
  1. <?php
  2. namespace App\Jobs;
  3. use App\Models\MarkdownImport;
  4. use App\Models\PreQuestionCandidate;
  5. use App\Services\MarkdownQuestionParser;
  6. use App\Services\PdfStorageService;
  7. use Illuminate\Bus\Queueable;
  8. use Illuminate\Contracts\Queue\ShouldQueue;
  9. use Illuminate\Foundation\Bus\Dispatchable;
  10. use Illuminate\Queue\InteractsWithQueue;
  11. use Illuminate\Queue\SerializesModels;
  12. use Illuminate\Support\Facades\DB;
  13. use Illuminate\Support\Facades\Log;
  14. class ProcessMarkdownCandidateBatch implements ShouldQueue
  15. {
  16. use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
  17. public int $timeout = 300; // 5分钟超时
  18. public int $tries = 3;
  19. public function __construct(
  20. public int $markdownImportId,
  21. public int $sequenceStart,
  22. public int $sequenceEnd
  23. ) {
  24. //
  25. }
  26. public function handle(MarkdownQuestionParser $parser, PdfStorageService $uploader): void
  27. {
  28. $import = MarkdownImport::find($this->markdownImportId);
  29. if (!$import) {
  30. Log::error('MarkdownImport not found for batch', [
  31. 'id' => $this->markdownImportId,
  32. 'sequence_start' => $this->sequenceStart,
  33. 'sequence_end' => $this->sequenceEnd,
  34. ]);
  35. return;
  36. }
  37. Log::info('Markdown batch started', [
  38. 'import_id' => $this->markdownImportId,
  39. 'sequence_start' => $this->sequenceStart,
  40. 'sequence_end' => $this->sequenceEnd,
  41. ]);
  42. $records = PreQuestionCandidate::query()
  43. ->where('import_id', $this->markdownImportId)
  44. ->whereBetween('sequence', [$this->sequenceStart, $this->sequenceEnd])
  45. ->orderBy('sequence')
  46. ->get();
  47. $processed = 0;
  48. $failed = 0;
  49. $totalRecords = $records->count();
  50. // 初始化进度
  51. $this->refreshProgress();
  52. foreach ($records as $index => $record) {
  53. try {
  54. $meta = $record->meta ?? [];
  55. if (!empty($meta['ai_parsed'])) {
  56. $processed++;
  57. continue;
  58. }
  59. $existingConfidence = $record->confidence ?? $record->ai_confidence;
  60. // 置信度高(>=0.85)时跳过再次 AI 解析,直接计入进度
  61. if ($existingConfidence !== null && (float) $existingConfidence >= 0.85) {
  62. $this->markParsed($record);
  63. $processed++;
  64. continue;
  65. }
  66. // 快速过滤卷子/区块标题,避免误判为题目再次走 AI
  67. if (!$this->isLikelyQuestion((string) $record->raw_markdown)) {
  68. // 标记为已过滤,但不标记为已解析
  69. $meta['ai_parsed'] = true;
  70. $meta['ai_parsed_at'] = now()->toDateTimeString();
  71. $meta['filtered_out'] = true; // 添加过滤标记
  72. $record->update([
  73. 'is_question_candidate' => false,
  74. 'ai_confidence' => 0.0,
  75. 'confidence' => 0.0,
  76. 'is_valid_question' => false,
  77. 'status' => 'rejected',
  78. 'meta' => $meta,
  79. ]);
  80. $processed++;
  81. continue;
  82. }
  83. // 已经处理过的不重复处理
  84. if (in_array($record->status, ['pending', 'reviewed', 'accepted', 'rejected'], true) && $record->stem !== null) {
  85. $this->markParsed($record);
  86. continue;
  87. }
  88. $parsed = $parser->parseRawMarkdown((string) $record->raw_markdown, (int) $record->index);
  89. $meta = $record->meta ?? [];
  90. $meta['ai_parsed'] = true;
  91. $meta['ai_parsed_at'] = now()->toDateTimeString();
  92. // 结构化后立即回传图片
  93. $uploadedImages = [];
  94. if (!empty($parsed['images'])) {
  95. foreach ($parsed['images'] as $idx => $imgUrl) {
  96. $path = "imports/images/{$record->id}_{$idx}.jpg";
  97. $uploadedImages[] = $uploader->put($path, (string)@file_get_contents($imgUrl)) ?: $imgUrl;
  98. }
  99. }
  100. $meta['images_uploaded'] = !empty($uploadedImages);
  101. $record->update([
  102. 'stem' => $parsed['stem'] ?? null,
  103. 'options' => $parsed['options'] ?? null,
  104. 'images' => !empty($uploadedImages) ? $uploadedImages : ($parsed['images'] ?? []),
  105. 'tables' => $parsed['tables'] ?? [],
  106. 'is_question_candidate' => (bool) ($parsed['is_question_candidate'] ?? false),
  107. 'ai_confidence' => $parsed['ai_confidence'] ?? null,
  108. 'status' => 'pending',
  109. 'meta' => $meta,
  110. ]);
  111. $processed++;
  112. // 每处理5个记录就更新一次进度,避免过于频繁
  113. if (($index + 1) % 5 === 0 || ($index + 1) === $totalRecords) {
  114. $this->refreshProgress();
  115. }
  116. } catch (\Throwable $e) {
  117. $failed++;
  118. Log::error('Markdown batch item failed', [
  119. 'import_id' => $this->markdownImportId,
  120. 'candidate_id' => $record->id,
  121. 'sequence' => $record->sequence,
  122. 'index' => $record->index,
  123. 'error' => $e->getMessage(),
  124. 'trace' => $e->getTraceAsString(),
  125. ]);
  126. // 失败时也更新进度
  127. if (($index + 1) % 5 === 0 || ($index + 1) === $totalRecords) {
  128. $this->refreshProgress();
  129. }
  130. }
  131. }
  132. $this->refreshProgress();
  133. Log::info('Markdown batch finished', [
  134. 'import_id' => $this->markdownImportId,
  135. 'sequence_start' => $this->sequenceStart,
  136. 'sequence_end' => $this->sequenceEnd,
  137. 'processed' => $processed,
  138. 'failed' => $failed,
  139. ]);
  140. $this->finalizeIfDone();
  141. }
  142. /**
  143. * 任务失败时的处理
  144. */
  145. public function failed(\Throwable $exception): void
  146. {
  147. Log::error('Markdown batch job failed permanently', [
  148. 'import_id' => $this->markdownImportId,
  149. 'sequence_start' => $this->sequenceStart,
  150. 'sequence_end' => $this->sequenceEnd,
  151. 'attempts' => $this->attempts(),
  152. 'error' => $exception->getMessage(),
  153. 'trace' => $exception->getTraceAsString(),
  154. ]);
  155. // 标记导入记录为失败状态,避免卡住
  156. $import = MarkdownImport::find($this->markdownImportId);
  157. if ($import) {
  158. $import->update([
  159. 'status' => MarkdownImport::STATUS_FAILED,
  160. 'progress_stage' => MarkdownImport::STAGE_FAILED,
  161. 'progress_message' => 'AI 解析任务失败',
  162. 'error_message' => '队列任务执行失败,已超过最大重试次数',
  163. 'processing_finished_at' => now(),
  164. ]);
  165. }
  166. }
  167. private function finalizeIfDone(): void
  168. {
  169. $import = MarkdownImport::find($this->markdownImportId);
  170. if (!$import) {
  171. return;
  172. }
  173. // 统一使用与 refreshProgress 相同的查询逻辑
  174. [$total, $parsed, $batchInfo] = $this->calculateProgress();
  175. // 只有当所有候选题都已处理(解析或过滤)完成时才更新状态
  176. if ($total > 0 && $parsed >= $total) {
  177. $updated = $import->update([
  178. 'status' => MarkdownImport::STATUS_PARSED,
  179. 'progress_stage' => MarkdownImport::STAGE_PARSED,
  180. 'progress_message' => "解析完成,等待人工校对 ({$parsed}/{$total})",
  181. 'progress_total' => $total,
  182. 'progress_current' => $parsed,
  183. 'progress_updated_at' => now(),
  184. 'processing_finished_at' => now(),
  185. ]);
  186. if ($updated) {
  187. Log::info('Markdown import finalized', [
  188. 'import_id' => $this->markdownImportId,
  189. 'total_candidates' => $total,
  190. 'parsed_candidates' => $parsed,
  191. ]);
  192. }
  193. }
  194. }
  195. /**
  196. * 计算进度:返回 [总数量, 已处理数量]
  197. */
  198. private function calculateProgress(): array
  199. {
  200. // 总候选题数(排除被过滤的和已废弃的)
  201. $total = PreQuestionCandidate::query()
  202. ->where('import_id', $this->markdownImportId)
  203. ->where('status', '!=', PreQuestionCandidate::STATUS_SUPERSEDED)
  204. ->where(function ($query) {
  205. $query->whereNull('meta')
  206. ->orWhereRaw("JSON_UNQUOTE(JSON_EXTRACT(meta, '$.filtered_out')) != 'true'");
  207. })
  208. ->count();
  209. // 真正完成AI解析的判断:有 stem 字段且不为空,或有有效的 ai_confidence
  210. // 或已经被过滤 (filtered_out=true)
  211. $parsed = PreQuestionCandidate::query()
  212. ->where('import_id', $this->markdownImportId)
  213. ->where('status', '!=', PreQuestionCandidate::STATUS_SUPERSEDED)
  214. ->where(function ($query) {
  215. $query->whereNull('meta')
  216. ->orWhereRaw("JSON_UNQUOTE(JSON_EXTRACT(meta, '$.filtered_out')) != 'true'");
  217. })
  218. ->where(function ($query) {
  219. $query->where(function ($q) {
  220. // 有题目内容
  221. $q->whereNotNull('stem')
  222. ->where('stem', '!=', '');
  223. })->orWhere(function ($q) {
  224. // 或有有效的AI置信度
  225. $q->whereNotNull('ai_confidence')
  226. ->where('ai_confidence', '>', 0);
  227. });
  228. })
  229. ->count();
  230. // 计算当前正在处理的批次信息
  231. $batchInfo = sprintf(
  232. '批次 %d-%d',
  233. $this->sequenceStart,
  234. $this->sequenceEnd
  235. );
  236. return [$total, $parsed, $batchInfo];
  237. }
  238. private function refreshProgress(): void
  239. {
  240. [$total, $parsed, $batchInfo] = $this->calculateProgress();
  241. // 计算有stem但AI置信度为0的数量(可能是非题目被错误解析)
  242. $stemOnlyCount = PreQuestionCandidate::query()
  243. ->where('import_id', $this->markdownImportId)
  244. ->where('status', '!=', PreQuestionCandidate::STATUS_SUPERSEDED)
  245. ->where(function ($query) {
  246. $query->whereNull('meta')
  247. ->orWhereRaw("JSON_UNQUOTE(JSON_EXTRACT(meta, '$.filtered_out')) != 'true'");
  248. })
  249. ->whereNotNull('stem')
  250. ->where('stem', '!=', '')
  251. ->where(function ($query) {
  252. $query->whereNull('ai_confidence')
  253. ->orWhere('ai_confidence', '=', 0);
  254. })
  255. ->count();
  256. // 被过滤的记录数
  257. $filteredCount = PreQuestionCandidate::query()
  258. ->where('import_id', $this->markdownImportId)
  259. ->where('status', '!=', PreQuestionCandidate::STATUS_SUPERSEDED)
  260. ->whereRaw("JSON_UNQUOTE(JSON_EXTRACT(meta, '$.filtered_out')) = 'true'")
  261. ->count();
  262. $import = MarkdownImport::find($this->markdownImportId);
  263. if ($import) {
  264. $import->update([
  265. 'progress_total' => $total,
  266. 'progress_current' => min($parsed, $total),
  267. 'progress_updated_at' => now(),
  268. 'progress_stage' => MarkdownImport::STAGE_AI_PARSING,
  269. 'progress_message' => "{$batchInfo} | AI 解析中… {$parsed}/{$total}" .
  270. ($stemOnlyCount > 0 ? " (含{$stemOnlyCount}个待筛选)" : '') .
  271. ($filteredCount > 0 ? " (已过滤{$filteredCount}个非题目)" : ''),
  272. ]);
  273. }
  274. }
  275. private function markParsed(PreQuestionCandidate $record): void
  276. {
  277. // 只有在记录真正有解析结果时才标记为已解析
  278. if (!empty($record->stem) || (!empty($record->ai_confidence) && $record->ai_confidence > 0)) {
  279. $meta = $record->meta ?? [];
  280. if (empty($meta['ai_parsed'])) {
  281. $meta['ai_parsed'] = true;
  282. $meta['ai_parsed_at'] = now()->toDateTimeString();
  283. $record->update(['meta' => $meta]);
  284. }
  285. }
  286. }
  287. /**
  288. * 轻量启发式判断是否像一道题目,过滤卷子/部分标题和说明文字。
  289. */
  290. private function isLikelyQuestion(string $raw): bool
  291. {
  292. $text = trim(strip_tags($raw));
  293. $length = mb_strlen($text);
  294. // Markdown 标题或“卷/部分/说明”且文本很短,视为非题
  295. if (preg_match('/^#+\\s+/m', $raw)) {
  296. return false;
  297. }
  298. if (preg_match('/(第[一二三四五六七八九十IVX]+[卷部分]|题型|说明|试卷)/u', $text) && $length <= 80) {
  299. return false;
  300. }
  301. // 过短且无问句/命令词/选项特征
  302. if ($length < 25 && !preg_match('/[\\??求解求证计算]|[A-D]\\.|(本小题满分/u', $text)) {
  303. return false;
  304. }
  305. return true;
  306. }
  307. }