ProcessMarkdownSplit.php 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. <?php
  2. namespace App\Jobs;
  3. use App\Models\MarkdownImport;
  4. use App\Models\PreQuestionCandidate;
  5. use App\Services\AsyncMarkdownSplitter;
  6. use App\Jobs\ProcessMarkdownCandidateBatch;
  7. use Illuminate\Bus\Queueable;
  8. use Illuminate\Contracts\Queue\ShouldQueue;
  9. use Illuminate\Foundation\Bus\Dispatchable;
  10. use Illuminate\Queue\InteractsWithQueue;
  11. use Illuminate\Queue\SerializesModels;
  12. use Illuminate\Support\Facades\DB;
  13. use Illuminate\Support\Facades\Log;
  /**
   * Orchestrator job for a Markdown import.
   *
   * Splits the import's original Markdown into blocks, stores one
   * PreQuestionCandidate row per block (raw Markdown and ordering only), and
   * then dispatches ProcessMarkdownCandidateBatch jobs that perform the AI
   * parsing. Progress and failure information is written back to the
   * MarkdownImport row at every stage.
   */
  class ProcessMarkdownSplit implements ShouldQueue
  {
      use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

      /** Number of sequence positions covered by each dispatched batch job. */
      private const BATCH_SIZE = 10;

      /** Job timeout in seconds (5 minutes). */
      public int $timeout = 300;

      /** Maximum number of attempts for this job. */
      public int $tries = 3;

      /**
       * Create a new job instance.
       *
       * @param int $markdownImportId Primary key of the MarkdownImport to process.
       */
      public function __construct(
          public int $markdownImportId
      ) {
      }

      /**
       * Execute the job.
       *
       * Any \Exception raised while processing is logged and recorded on the
       * import row as a failure; it is NOT rethrown, so this method returns
       * normally in that case. A \Error is not caught here and propagates to
       * the caller.
       *
       * @param AsyncMarkdownSplitter $splitter Service used to split and validate the Markdown.
       */
      public function handle(AsyncMarkdownSplitter $splitter): void
      {
          try {
              // Load the Markdown import record.
              $markdownImport = MarkdownImport::find($this->markdownImportId);
              if ($markdownImport === null) {
                  Log::error('MarkdownImport not found', [
                      'id' => $this->markdownImportId
                  ]);
                  return;
              }

              // Mark the import as processing and reset progress/error fields.
              // processing_started_at is kept if it was already set.
              $markdownImport->update([
                  'status' => 'processing',
                  'progress_stage' => MarkdownImport::STAGE_SPLITTING,
                  'progress_message' => '开始拆题…',
                  'progress_current' => 0,
                  'progress_total' => 0,
                  'progress_updated_at' => now(),
                  'processing_started_at' => $markdownImport->processing_started_at ?? now(),
                  'processing_finished_at' => null,
                  'error_message' => null,
              ]);

              Log::info('Starting Markdown split (orchestrator)', [
                  'id' => $this->markdownImportId
              ]);

              $blocks = $splitter->split($markdownImport->original_markdown);
              $splitter->validate($blocks);
              $total = count($blocks);

              Log::info('Markdown split completed', [
                  'id' => $this->markdownImportId,
                  'blocks_count' => $total,
              ]);

              $markdownImport->update([
                  'progress_total' => $total,
                  'progress_current' => 0,
                  'progress_updated_at' => now(),
              ]);

              if (empty($blocks)) {
                  Log::warning('No candidates found from Markdown parsing', [
                      'id' => $this->markdownImportId
                  ]);
                  $markdownImport->update([
                      'status' => 'failed',
                      'progress_stage' => MarkdownImport::STAGE_FAILED,
                      'progress_message' => '未解析出任何候选题',
                      'progress_updated_at' => now(),
                      'processing_finished_at' => now(),
                      'error_message' => 'No candidates found'
                  ]);
                  return;
              }

              Log::info('Markdown split done, seeding candidates to database', [
                  'id' => $this->markdownImportId,
                  'blocks_count' => $total,
              ]);

              $this->seedCandidates($markdownImport, $blocks, $total);

              // Enter the concurrent AI-parsing stage: child jobs process the
              // candidates in batches, parallelism comes from the worker count.
              $markdownImport->update([
                  'progress_stage' => MarkdownImport::STAGE_AI_PARSING,
                  'progress_message' => 'AI 解析中…',
                  'progress_current' => 0,
                  'progress_updated_at' => now(),
              ]);

              $batches = $this->dispatchBatches($total);

              Log::info('Markdown AI parsing batches dispatched', [
                  'id' => $this->markdownImportId,
                  'total_blocks' => $total,
                  'batch_size' => self::BATCH_SIZE,
                  'batches' => $batches,
              ]);
          } catch (\Exception $e) {
              Log::error('Markdown split and AI analysis failed', [
                  'id' => $this->markdownImportId,
                  'error' => $e->getMessage(),
                  'trace' => $e->getTraceAsString()
              ]);

              // Record the failure on the import row; the exception is swallowed.
              $this->markFailed('解析失败', $e->getMessage());
          }
      }

      /**
       * Handle a job failure.
       *
       * @param \Throwable $exception The error that caused the job to fail.
       */
      public function failed(\Throwable $exception): void
      {
          Log::error('Markdown split job failed', [
              'id' => $this->markdownImportId,
              'error' => $exception->getMessage()
          ]);

          $this->markFailed('任务执行失败', $exception->getMessage());
      }

      /**
       * Write the split blocks to pre_question_candidates inside one transaction.
       *
       * Only raw_markdown and ordering are stored; the parsed fields are reset
       * and the status is set to 'ai_pending' for the later batch jobs.
       *
       * @param MarkdownImport $markdownImport Import row whose progress stage is updated.
       * @param iterable       $blocks         Split blocks; each is read via the keys
       *                                       'index', 'sequence' and 'raw_markdown'.
       * @param int            $total          Number of blocks, used for logging only.
       *
       * @throws \Throwable Rethrows whatever failed, after rolling the transaction back.
       */
      private function seedCandidates(MarkdownImport $markdownImport, iterable $blocks, int $total): void
      {
          DB::beginTransaction();
          try {
              $markdownImport->update([
                  'progress_stage' => MarkdownImport::STAGE_WRITING,
                  'progress_message' => '写入拆题结果…',
                  'progress_updated_at' => now(),
              ]);

              // Do not delete history: mark existing rows as superseded so a
              // re-run cannot be confused with earlier results.
              PreQuestionCandidate::where('import_id', $this->markdownImportId)->update([
                  'status' => 'superseded',
              ]);

              foreach ($blocks as $block) {
                  // NOTE(review): blocks without a 'sequence' key all fall back to 0
                  // and would overwrite each other below — confirm the splitter
                  // always supplies a distinct sequence.
                  $candidateIndex = (int) ($block['index'] ?? 0);
                  $sequence = (int) ($block['sequence'] ?? 0);

                  PreQuestionCandidate::updateOrCreate(
                      [
                          'import_id' => $this->markdownImportId,
                          // Key on sequence rather than the question number, so
                          // duplicate question numbers cannot overwrite each other.
                          'sequence' => $sequence,
                      ],
                      [
                          'index' => $candidateIndex,
                          'raw_markdown' => (string) ($block['raw_markdown'] ?? ''),
                          'stem' => null,
                          'options' => null,
                          'images' => [],
                          'tables' => [],
                          'is_question_candidate' => false,
                          'ai_confidence' => null,
                          'status' => 'ai_pending',
                      ]
                  );
              }

              DB::commit();
          } catch (\Throwable $e) {
              // Catch \Throwable, not just \Exception: a PHP \Error (e.g. a
              // TypeError) must also roll back, otherwise the transaction would
              // stay open on this connection.
              DB::rollBack();
              throw $e;
          }

          Log::info('Successfully wrote candidates to pre_question_candidates', [
              'id' => $this->markdownImportId,
              'candidates_count' => $total
          ]);
      }

      /**
       * Dispatch one ProcessMarkdownCandidateBatch job per BATCH_SIZE sequences.
       *
       * Ranges are inclusive and cover 1..$total.
       * NOTE(review): this presumes the stored sequences are contiguous and
       * 1-based — confirm against AsyncMarkdownSplitter.
       *
       * @param int $total Number of candidates to cover.
       *
       * @return int Number of batch jobs dispatched.
       */
      private function dispatchBatches(int $total): int
      {
          $batches = (int) ceil($total / self::BATCH_SIZE);

          for ($batch = 0; $batch < $batches; $batch++) {
              $startSeq = ($batch * self::BATCH_SIZE) + 1;
              $endSeq = min(($batch + 1) * self::BATCH_SIZE, $total);
              ProcessMarkdownCandidateBatch::dispatch($this->markdownImportId, $startSeq, $endSeq);
          }

          return $batches;
      }

      /**
       * Mark the import row as failed using a direct query (no model instance needed).
       *
       * @param string $progressMessage Value stored in progress_message.
       * @param string $errorMessage    Value stored in error_message.
       */
      private function markFailed(string $progressMessage, string $errorMessage): void
      {
          MarkdownImport::where('id', $this->markdownImportId)->update([
              'status' => 'failed',
              'progress_stage' => MarkdownImport::STAGE_FAILED,
              'progress_message' => $progressMessage,
              'progress_updated_at' => now(),
              'processing_finished_at' => now(),
              'error_message' => $errorMessage
          ]);
      }
  }