ProcessMarkdownSplit.php 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. <?php
  2. namespace App\Jobs;
  3. use App\Models\MarkdownImport;
  4. use App\Models\PreQuestionCandidate;
  5. use App\Services\SourceFileParserService;
  6. use App\Services\SourcePaperExtractorService;
  7. use App\Services\PaperPartExtractorService;
  8. use App\Services\QuestionExtractorService;
  9. use App\Services\PdfStorageService;
  10. use App\Jobs\ProcessMarkdownCandidateBatch;
  11. use Illuminate\Bus\Queueable;
  12. use Illuminate\Contracts\Queue\ShouldQueue;
  13. use Illuminate\Foundation\Bus\Dispatchable;
  14. use Illuminate\Queue\InteractsWithQueue;
  15. use Illuminate\Queue\SerializesModels;
  16. use Illuminate\Support\Facades\DB;
  17. use Illuminate\Support\Facades\Log;
  /**
   * Queued job that runs the Markdown import pipeline for one MarkdownImport row:
   * source file -> papers -> parts -> question candidates, followed by the
   * dispatch of ProcessMarkdownCandidateBatch jobs for the AI parsing stage.
   *
   * Progress and failure state are written back to the MarkdownImport record
   * at every stage.
   */
  class ProcessMarkdownSplit implements ShouldQueue
  {
      use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

      /** Per-attempt timeout in seconds (5 minutes). */
      public int $timeout = 300;

      /** Maximum number of attempts for this job. */
      public int $tries = 3;

      /**
       * Create a new job instance.
       *
       * @param int $markdownImportId Primary key of the MarkdownImport to process.
       */
      public function __construct(
          public int $markdownImportId
      ) {
          //
      }

      /**
       * Execute the job.
       *
       * Any \Exception raised inside the pipeline is caught here, logged, and
       * recorded on the import as a failure; it is not rethrown, so handle()
       * returns normally in that case.
       * NOTE(review): because the exception is swallowed, the queue's retry
       * mechanism ($tries) presumably only applies to \Error throwables and
       * timeouts — confirm this is intended.
       */
      public function handle(): void
      {
          try {
              // Load the Markdown import record; nothing to do if it is gone.
              $markdownImport = MarkdownImport::find($this->markdownImportId);
              if (!$markdownImport) {
                  Log::error('MarkdownImport not found', [
                      'id' => $this->markdownImportId
                  ]);
                  return;
              }

              // Mark the import as processing and reset progress counters.
              // processing_started_at is kept if a previous run already set it.
              $markdownImport->update([
                  'status' => 'processing',
                  'progress_stage' => MarkdownImport::STAGE_SPLITTING,
                  'progress_message' => '开始拆题…',
                  'progress_current' => 0,
                  'progress_total' => 0,
                  'progress_updated_at' => now(),
                  'processing_started_at' => $markdownImport->processing_started_at ?? now(),
                  'processing_finished_at' => null,
                  'error_message' => null,
              ]);

              Log::info('Starting Markdown pipeline (source->paper->part->question)', [
                  'id' => $this->markdownImportId,
              ]);

              $fileParser = app(SourceFileParserService::class);
              $paperExtractor = app(SourcePaperExtractorService::class);
              $partExtractor = app(PaperPartExtractorService::class);
              $questionExtractor = app(QuestionExtractorService::class);

              // Create the source_file record from the raw Markdown.
              $sourceFile = $fileParser->storeFromMarkdown(
                  $markdownImport->file_name ?? ('import-' . $markdownImport->id . '.md'),
                  $markdownImport->original_markdown,
                  $markdownImport,
                  [],
                  null
              );

              // Best-effort upload of the original Markdown to remote storage.
              $this->uploadOriginalMarkdown($markdownImport);

              // Split the source file into papers, then every paper into parts.
              $papers = $paperExtractor->extract($sourceFile);
              $parts = collect();
              foreach ($papers as $paper) {
                  $parts = $parts->merge($partExtractor->extract($paper));
              }

              // Switch to the "writing candidates" stage.
              $markdownImport->update([
                  'progress_stage' => MarkdownImport::STAGE_WRITING,
                  'progress_message' => '写入拆题结果…',
                  'progress_updated_at' => now(),
              ]);

              // Remove queued jobs left over from earlier runs of this import so
              // duplicate batches do not pile up, then retire the old candidates.
              $this->purgeQueuedJobsForImport();
              PreQuestionCandidate::where('import_id', $this->markdownImportId)->update([
                  'status' => 'superseded',
              ]);

              // NOTE(review): $sequence is handed to the extractor for every part;
              // presumably extractAndPersist() takes it by reference and advances
              // it so numbering continues across parts — TODO confirm.
              $sequence = 1;
              $createdTotal = 0;
              foreach ($parts as $part) {
                  $created = $questionExtractor->extractAndPersist($part, $markdownImport, $sequence);
                  $createdTotal += $created->count();
              }

              $markdownImport->update([
                  'progress_total' => $createdTotal,
                  'progress_current' => 0,
                  'progress_updated_at' => now(),
              ]);

              // An import that yields no candidates is recorded as failed.
              if ($createdTotal === 0) {
                  $markdownImport->update([
                      'status' => 'failed',
                      'progress_stage' => MarkdownImport::STAGE_FAILED,
                      'progress_message' => '未解析出任何候选题',
                      'progress_updated_at' => now(),
                      'processing_finished_at' => now(),
                      'error_message' => 'No candidates found',
                  ]);
                  return;
              }

              // Enter the AI parsing stage: child jobs process the candidates in
              // batches; parallelism is determined by the number of queue workers.
              $markdownImport->update([
                  'progress_stage' => MarkdownImport::STAGE_AI_PARSING,
                  'progress_message' => 'AI 解析中…',
                  'progress_current' => 0,
                  'progress_updated_at' => now(),
              ]);

              $total = $createdTotal;
              $batchSize = 10; // candidates per batch job
              $batches = $this->dispatchCandidateBatches($total, $batchSize);

              Log::info('Markdown AI parsing batches dispatched', [
                  'id' => $this->markdownImportId,
                  'total_blocks' => $total,
                  'batch_size' => $batchSize,
                  'batches' => $batches,
              ]);
          } catch (\Exception $e) {
              Log::error('Markdown split and AI analysis failed', [
                  'id' => $this->markdownImportId,
                  'error' => $e->getMessage(),
                  'trace' => $e->getTraceAsString()
              ]);

              // Record the failure on the import.
              $this->markFailed('解析失败', $e->getMessage());
          }
      }

      /**
       * Handle a job failure.
       *
       * Logs the failure and records it on the import record.
       */
      public function failed(\Throwable $exception): void
      {
          Log::error('Markdown split job failed', [
              'id' => $this->markdownImportId,
              'error' => $exception->getMessage()
          ]);

          // Record the failure on the import.
          $this->markFailed('任务执行失败', $exception->getMessage());
      }

      /**
       * Upload the original Markdown to remote storage and store the returned
       * URL on the import. Failures are logged as warnings and never abort the
       * pipeline.
       */
      private function uploadOriginalMarkdown(MarkdownImport $markdownImport): void
      {
          try {
              $storageService = app(PdfStorageService::class);
              $path = "imports/markdown/{$markdownImport->id}_" . ($markdownImport->file_name ?: 'import.md');
              $remoteUrl = $storageService->put($path, (string)$markdownImport->original_markdown);
              if ($remoteUrl) {
                  $markdownImport->update(['remote_url' => $remoteUrl]);
              }
          } catch (\Exception $e) {
              Log::warning('Failed to upload markdown to Chunsun', [
                  'id' => $this->markdownImportId,
                  'error' => $e->getMessage(),
              ]);
          }
      }

      /**
       * Delete rows from the `jobs` table whose payload references this import.
       *
       * The JSON-style pattern is terminated with `,` or `}` (and the
       * serialized-style pattern with `;`) so that, for example, import 1 does
       * not also match imports 10, 11 or 100 and delete their queued jobs.
       *
       * NOTE(review): the patterns are not restricted to a job class, so any
       * queued job carrying this id matches. The `\"` sequences are kept as in
       * the original patterns; how LIKE treats the backslash depends on the
       * database — confirm they match real payload rows.
       */
      private function purgeQueuedJobsForImport(): void
      {
          $id = $this->markdownImportId;

          DB::table('jobs')
              ->where('payload', 'like', '%\"markdownImportId\":' . $id . ',%')
              ->orWhere('payload', 'like', '%\"markdownImportId\":' . $id . '}%')
              ->orWhere('payload', 'like', '%\"markdownImportId\";i:' . $id . ';%')
              ->delete();
      }

      /**
       * Dispatch one ProcessMarkdownCandidateBatch job per consecutive range of
       * $batchSize sequence numbers covering 1..$total (the last range may be
       * shorter).
       *
       * NOTE(review): assumes candidate sequence numbers are contiguous from 1
       * to $total — confirm against the extractor.
       *
       * @return int Number of batch jobs dispatched.
       */
      private function dispatchCandidateBatches(int $total, int $batchSize): int
      {
          $batches = (int) ceil($total / $batchSize);
          for ($b = 0; $b < $batches; $b++) {
              $startSeq = ($b * $batchSize) + 1;
              $endSeq = min(($b + 1) * $batchSize, $total);
              ProcessMarkdownCandidateBatch::dispatch($this->markdownImportId, $startSeq, $endSeq);
          }

          return $batches;
      }

      /**
       * Write the failed state to the import row identified by this job's id.
       *
       * @param string $progressMessage Text stored in progress_message.
       * @param string $errorMessage    Text stored in error_message.
       */
      private function markFailed(string $progressMessage, string $errorMessage): void
      {
          MarkdownImport::where('id', $this->markdownImportId)->update([
              'status' => 'failed',
              'progress_stage' => MarkdownImport::STAGE_FAILED,
              'progress_message' => $progressMessage,
              'progress_updated_at' => now(),
              'processing_finished_at' => now(),
              'error_message' => $errorMessage,
          ]);
      }
  }