ProcessMarkdownCandidateBatch.php 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. <?php
  2. namespace App\Jobs;
  3. use App\Models\MarkdownImport;
  4. use App\Models\PreQuestionCandidate;
  5. use App\Services\MarkdownQuestionParser;
  6. use Illuminate\Bus\Queueable;
  7. use Illuminate\Contracts\Queue\ShouldQueue;
  8. use Illuminate\Foundation\Bus\Dispatchable;
  9. use Illuminate\Queue\InteractsWithQueue;
  10. use Illuminate\Queue\SerializesModels;
  11. use Illuminate\Support\Facades\DB;
  12. use Illuminate\Support\Facades\Log;
  13. class ProcessMarkdownCandidateBatch implements ShouldQueue
  14. {
  15. use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
  16. public int $timeout = 300; // 5分钟超时
  17. public int $tries = 3;
  18. public function __construct(
  19. public int $markdownImportId,
  20. public int $sequenceStart,
  21. public int $sequenceEnd
  22. ) {
  23. //
  24. }
  25. public function handle(MarkdownQuestionParser $parser): void
  26. {
  27. $import = MarkdownImport::find($this->markdownImportId);
  28. if (!$import) {
  29. Log::error('MarkdownImport not found for batch', [
  30. 'id' => $this->markdownImportId,
  31. 'sequence_start' => $this->sequenceStart,
  32. 'sequence_end' => $this->sequenceEnd,
  33. ]);
  34. return;
  35. }
  36. Log::info('Markdown batch started', [
  37. 'import_id' => $this->markdownImportId,
  38. 'sequence_start' => $this->sequenceStart,
  39. 'sequence_end' => $this->sequenceEnd,
  40. ]);
  41. $records = PreQuestionCandidate::query()
  42. ->where('import_id', $this->markdownImportId)
  43. ->whereBetween('sequence', [$this->sequenceStart, $this->sequenceEnd])
  44. ->orderBy('sequence')
  45. ->get();
  46. $processed = 0;
  47. $failed = 0;
  48. foreach ($records as $record) {
  49. try {
  50. // 已经处理过的不重复处理
  51. if (in_array($record->status, ['pending', 'reviewed', 'accepted', 'rejected'], true) && $record->stem !== null) {
  52. continue;
  53. }
  54. $parsed = $parser->parseRawMarkdown((string) $record->raw_markdown, (int) $record->index);
  55. $record->update([
  56. 'stem' => $parsed['stem'] ?? null,
  57. 'options' => $parsed['options'] ?? null,
  58. 'images' => $parsed['images'] ?? [],
  59. 'tables' => $parsed['tables'] ?? [],
  60. 'is_question_candidate' => (bool) ($parsed['is_question_candidate'] ?? false),
  61. 'ai_confidence' => $parsed['ai_confidence'] ?? null,
  62. 'status' => 'pending',
  63. ]);
  64. $processed++;
  65. } catch (\Throwable $e) {
  66. $failed++;
  67. Log::warning('Markdown batch item failed', [
  68. 'import_id' => $this->markdownImportId,
  69. 'candidate_id' => $record->id,
  70. 'sequence' => $record->sequence,
  71. 'index' => $record->index,
  72. 'error' => $e->getMessage(),
  73. ]);
  74. }
  75. }
  76. if ($processed > 0) {
  77. DB::table('markdown_imports')
  78. ->where('id', $this->markdownImportId)
  79. ->update([
  80. 'progress_current' => DB::raw('progress_current + ' . (int) $processed),
  81. 'progress_updated_at' => now(),
  82. 'progress_stage' => MarkdownImport::STAGE_AI_PARSING,
  83. 'progress_message' => 'AI 解析中…',
  84. ]);
  85. }
  86. Log::info('Markdown batch finished', [
  87. 'import_id' => $this->markdownImportId,
  88. 'sequence_start' => $this->sequenceStart,
  89. 'sequence_end' => $this->sequenceEnd,
  90. 'processed' => $processed,
  91. 'failed' => $failed,
  92. ]);
  93. $this->finalizeIfDone();
  94. }
  95. private function finalizeIfDone(): void
  96. {
  97. $import = MarkdownImport::find($this->markdownImportId);
  98. if (!$import) {
  99. return;
  100. }
  101. $total = (int) ($import->progress_total ?? 0);
  102. $current = (int) ($import->progress_current ?? 0);
  103. if ($total <= 0 || $current < $total) {
  104. return;
  105. }
  106. // 只要有一个 batch 到达“完成条件”,就尝试做一次幂等的最终状态更新
  107. $updated = DB::table('markdown_imports')
  108. ->where('id', $this->markdownImportId)
  109. ->where('status', 'processing')
  110. ->update([
  111. 'status' => 'parsed',
  112. 'progress_stage' => MarkdownImport::STAGE_PARSED,
  113. 'progress_message' => '解析完成,等待人工校对',
  114. 'progress_updated_at' => now(),
  115. 'processing_finished_at' => now(),
  116. ]);
  117. if ($updated) {
  118. Log::info('Markdown import finalized', [
  119. 'import_id' => $this->markdownImportId,
  120. 'progress_total' => $total,
  121. 'progress_current' => $current,
  122. ]);
  123. }
  124. }
  125. }