ProcessMarkdownSplit.php 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
<?php

namespace App\Jobs;

use App\Jobs\ProcessMarkdownCandidateBatch;
use App\Models\MarkdownImport;
use App\Models\PreQuestionCandidate;
use App\Services\PaperPartExtractorService;
use App\Services\QuestionExtractorService;
use App\Services\SourceFileParserService;
use App\Services\SourcePaperExtractorService;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Collection;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Log;

  17. class ProcessMarkdownSplit implements ShouldQueue
  18. {
  19. use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
  20. public int $timeout = 300; // 5分钟超时
  21. public int $tries = 3;
  22. /**
  23. * Create a new job instance.
  24. */
  25. public function __construct(
  26. public int $markdownImportId
  27. ) {
  28. //
  29. }
  30. /**
  31. * Execute the job.
  32. */
  33. public function handle(): void
  34. {
  35. try {
  36. // 获取 Markdown 导入记录
  37. $markdownImport = MarkdownImport::find($this->markdownImportId);
  38. if (!$markdownImport) {
  39. Log::error('MarkdownImport not found', [
  40. 'id' => $this->markdownImportId
  41. ]);
  42. return;
  43. }
  44. // 更新状态为处理中
  45. $markdownImport->update([
  46. 'status' => 'processing',
  47. 'progress_stage' => MarkdownImport::STAGE_SPLITTING,
  48. 'progress_message' => '开始拆题…',
  49. 'progress_current' => 0,
  50. 'progress_total' => 0,
  51. 'progress_updated_at' => now(),
  52. 'processing_started_at' => $markdownImport->processing_started_at ?? now(),
  53. 'processing_finished_at' => null,
  54. 'error_message' => null,
  55. ]);
  56. Log::info('Starting Markdown pipeline (source->paper->part->question)', [
  57. 'id' => $this->markdownImportId,
  58. ]);
  59. $fileParser = app(SourceFileParserService::class);
  60. $paperExtractor = app(SourcePaperExtractorService::class);
  61. $partExtractor = app(PaperPartExtractorService::class);
  62. $questionExtractor = app(QuestionExtractorService::class);
  63. // 建立 source_file
  64. $sourceFile = $fileParser->storeFromMarkdown(
  65. $markdownImport->file_name ?? ('import-' . $markdownImport->id . '.md'),
  66. $markdownImport->original_markdown,
  67. $markdownImport,
  68. [],
  69. null
  70. );
  71. // 拆分卷子和区块
  72. $papers = $paperExtractor->extract($sourceFile);
  73. $parts = collect();
  74. foreach ($papers as $paper) {
  75. $parts = $parts->merge($partExtractor->extract($paper));
  76. }
  77. // 写入候选题
  78. $markdownImport->update([
  79. 'progress_stage' => MarkdownImport::STAGE_WRITING,
  80. 'progress_message' => '写入拆题结果…',
  81. 'progress_updated_at' => now(),
  82. ]);
  83. PreQuestionCandidate::where('import_id', $this->markdownImportId)->update([
  84. 'status' => 'superseded',
  85. ]);
  86. $sequence = 1;
  87. $createdTotal = 0;
  88. foreach ($parts as $part) {
  89. $created = $questionExtractor->extractAndPersist($part, $markdownImport, $sequence);
  90. $createdTotal += $created->count();
  91. }
  92. $markdownImport->update([
  93. 'progress_total' => $createdTotal,
  94. 'progress_current' => 0,
  95. 'progress_updated_at' => now(),
  96. ]);
  97. if ($createdTotal === 0) {
  98. $markdownImport->update([
  99. 'status' => 'failed',
  100. 'progress_stage' => MarkdownImport::STAGE_FAILED,
  101. 'progress_message' => '未解析出任何候选题',
  102. 'progress_updated_at' => now(),
  103. 'processing_finished_at' => now(),
  104. 'error_message' => 'No candidates found',
  105. ]);
  106. return;
  107. }
  108. // 进入并发 AI 解析阶段(方案 A:子 Job 批处理 + 多 worker 并行)
  109. $markdownImport->update([
  110. 'progress_stage' => MarkdownImport::STAGE_AI_PARSING,
  111. 'progress_message' => 'AI 解析中…',
  112. 'progress_current' => 0,
  113. 'progress_updated_at' => now(),
  114. ]);
  115. $total = $createdTotal;
  116. $batchSize = 10; // 每批处理 10 题(并发由 worker 数控制)
  117. $batches = (int) ceil($total / $batchSize);
  118. for ($b = 0; $b < $batches; $b++) {
  119. $startSeq = ($b * $batchSize) + 1;
  120. $endSeq = min(($b + 1) * $batchSize, $total);
  121. ProcessMarkdownCandidateBatch::dispatch($this->markdownImportId, $startSeq, $endSeq);
  122. }
  123. Log::info('Markdown AI parsing batches dispatched', [
  124. 'id' => $this->markdownImportId,
  125. 'total_blocks' => $total,
  126. 'batch_size' => $batchSize,
  127. 'batches' => $batches,
  128. ]);
  129. } catch (\Exception $e) {
  130. Log::error('Markdown split and AI analysis failed', [
  131. 'id' => $this->markdownImportId,
  132. 'error' => $e->getMessage(),
  133. 'trace' => $e->getTraceAsString()
  134. ]);
  135. // 更新状态为失败
  136. MarkdownImport::where('id', $this->markdownImportId)->update([
  137. 'status' => 'failed',
  138. 'progress_stage' => MarkdownImport::STAGE_FAILED,
  139. 'progress_message' => '解析失败',
  140. 'progress_updated_at' => now(),
  141. 'processing_finished_at' => now(),
  142. 'error_message' => $e->getMessage()
  143. ]);
  144. }
  145. }
  146. /**
  147. * Handle a job failure.
  148. */
  149. public function failed(\Throwable $exception): void
  150. {
  151. Log::error('Markdown split job failed', [
  152. 'id' => $this->markdownImportId,
  153. 'error' => $exception->getMessage()
  154. ]);
  155. // 更新状态为失败
  156. MarkdownImport::where('id', $this->markdownImportId)->update([
  157. 'status' => 'failed',
  158. 'progress_stage' => MarkdownImport::STAGE_FAILED,
  159. 'progress_message' => '任务执行失败',
  160. 'progress_updated_at' => now(),
  161. 'processing_finished_at' => now(),
  162. 'error_message' => $exception->getMessage()
  163. ]);
  164. }
  165. }