| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214 |
- <?php
- namespace App\Jobs;
- use App\Models\MarkdownImport;
- use App\Models\PreQuestionCandidate;
- use App\Services\AsyncMarkdownSplitter;
- use App\Jobs\ProcessMarkdownCandidateBatch;
- use Illuminate\Bus\Queueable;
- use Illuminate\Contracts\Queue\ShouldQueue;
- use Illuminate\Foundation\Bus\Dispatchable;
- use Illuminate\Queue\InteractsWithQueue;
- use Illuminate\Queue\SerializesModels;
- use Illuminate\Support\Facades\DB;
- use Illuminate\Support\Facades\Log;
/**
 * Queue job that orchestrates the first stage of a Markdown import.
 *
 * Steps performed by handle():
 *  1. Load the MarkdownImport row and mark it as processing.
 *  2. Split the stored Markdown into blocks with AsyncMarkdownSplitter and validate them.
 *  3. Persist one PreQuestionCandidate row per block (raw markdown and ordering only).
 *  4. Dispatch ProcessMarkdownCandidateBatch jobs that cover the candidates by sequence range.
 *
 * Progress and failure state are written back to the MarkdownImport row throughout.
 */
class ProcessMarkdownSplit implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /** Number of candidate sequences covered by each dispatched batch job. */
    private const BATCH_SIZE = 10;

    /** Job timeout in seconds (5 minutes). */
    public int $timeout = 300;

    /** Maximum number of attempts for this job. */
    public int $tries = 3;

    /**
     * Create a new job instance.
     *
     * @param int $markdownImportId Primary key of the MarkdownImport row to process.
     */
    public function __construct(
        public int $markdownImportId
    ) {
        //
    }

    /**
     * Execute the job.
     *
     * Any \Exception raised while splitting, persisting or dispatching is logged,
     * recorded on the import row as a failure, and not rethrown. Non-Exception
     * throwables (\Error) are not caught here and propagate to the caller.
     *
     * @param AsyncMarkdownSplitter $splitter Service used to split and validate the Markdown.
     */
    public function handle(AsyncMarkdownSplitter $splitter): void
    {
        try {
            // Load the Markdown import record.
            $markdownImport = MarkdownImport::find($this->markdownImportId);
            if (!$markdownImport) {
                Log::error('MarkdownImport not found', [
                    'id' => $this->markdownImportId
                ]);
                return;
            }

            // Mark the import as in progress and reset progress/error fields.
            // processing_started_at is preserved if a previous run already set it.
            $markdownImport->update([
                'status' => 'processing',
                'progress_stage' => MarkdownImport::STAGE_SPLITTING,
                'progress_message' => '开始拆题…',
                'progress_current' => 0,
                'progress_total' => 0,
                'progress_updated_at' => now(),
                'processing_started_at' => $markdownImport->processing_started_at ?? now(),
                'processing_finished_at' => null,
                'error_message' => null,
            ]);

            Log::info('Starting Markdown split (orchestrator)', [
                'id' => $this->markdownImportId
            ]);

            $blocks = $splitter->split($markdownImport->original_markdown);
            $splitter->validate($blocks);
            $total = count($blocks);

            Log::info('Markdown split completed', [
                'id' => $this->markdownImportId,
                'blocks_count' => $total,
            ]);

            $markdownImport->update([
                'progress_total' => $total,
                'progress_current' => 0,
                'progress_updated_at' => now(),
            ]);

            // Nothing to import: record the failure and stop without dispatching batches.
            if (empty($blocks)) {
                Log::warning('No candidates found from Markdown parsing', [
                    'id' => $this->markdownImportId
                ]);
                $markdownImport->update([
                    'status' => 'failed',
                    'progress_stage' => MarkdownImport::STAGE_FAILED,
                    'progress_message' => '未解析出任何候选题',
                    'progress_updated_at' => now(),
                    'processing_finished_at' => now(),
                    'error_message' => 'No candidates found'
                ]);
                return;
            }

            Log::info('Markdown split done, seeding candidates to database', [
                'id' => $this->markdownImportId,
                'blocks_count' => $total,
            ]);

            $this->seedCandidates($markdownImport, $blocks, $total);

            // Enter the concurrent AI parsing stage: child jobs process batches,
            // and parallelism comes from the number of queue workers.
            $markdownImport->update([
                'progress_stage' => MarkdownImport::STAGE_AI_PARSING,
                'progress_message' => 'AI 解析中…',
                'progress_current' => 0,
                'progress_updated_at' => now(),
            ]);

            $batches = $this->dispatchCandidateBatches($total);

            Log::info('Markdown AI parsing batches dispatched', [
                'id' => $this->markdownImportId,
                'total_blocks' => $total,
                'batch_size' => self::BATCH_SIZE,
                'batches' => $batches,
            ]);
        } catch (\Exception $e) {
            Log::error('Markdown split and AI analysis failed', [
                'id' => $this->markdownImportId,
                'error' => $e->getMessage(),
                'trace' => $e->getTraceAsString()
            ]);

            // Record the failure on the import row.
            $this->markImportFailed('解析失败', $e->getMessage());
        }
    }

    /**
     * Handle a job failure.
     *
     * Logs the failure and records it on the import row.
     *
     * @param \Throwable $exception The error that caused the job to fail.
     */
    public function failed(\Throwable $exception): void
    {
        Log::error('Markdown split job failed', [
            'id' => $this->markdownImportId,
            'error' => $exception->getMessage()
        ]);

        // Record the failure on the import row.
        $this->markImportFailed('任务执行失败', $exception->getMessage());
    }

    /**
     * Write the split blocks into pre_question_candidates inside one transaction.
     *
     * Only raw_markdown and ordering are stored here; AI-derived fields are reset
     * and filled in later by the batch jobs. DB::transaction() rolls back on any
     * \Throwable, so an \Error raised while iterating the blocks cannot leave a
     * transaction open on the worker's connection.
     *
     * @param MarkdownImport  $markdownImport Import row whose progress is updated.
     * @param iterable<mixed> $blocks         Blocks returned by the splitter; each is read
     *                                        via the keys 'index', 'sequence', 'raw_markdown'.
     * @param int             $total          Number of blocks, used for logging only.
     */
    private function seedCandidates(MarkdownImport $markdownImport, iterable $blocks, int $total): void
    {
        DB::transaction(function () use ($markdownImport, $blocks): void {
            $markdownImport->update([
                'progress_stage' => MarkdownImport::STAGE_WRITING,
                'progress_message' => '写入拆题结果…',
                'progress_updated_at' => now(),
            ]);

            // Keep historical rows: flag existing candidates as superseded so a
            // re-run cannot be confused with data from an earlier run.
            PreQuestionCandidate::where('import_id', $this->markdownImportId)->update([
                'status' => 'superseded',
            ]);

            $position = 0;
            foreach ($blocks as $block) {
                $position++;
                $candidateIndex = (int) ($block['index'] ?? 0);
                // Fall back to the 1-based position when the block carries no sequence.
                // A constant fallback would make every such block hit the same
                // (import_id, sequence) key and overwrite the previous one.
                $sequence = (int) ($block['sequence'] ?? $position);

                PreQuestionCandidate::updateOrCreate(
                    [
                        'import_id' => $this->markdownImportId,
                        // sequence is the unique key, so duplicate question numbers
                        // cannot overwrite each other and lose questions.
                        'sequence' => $sequence,
                    ],
                    [
                        'index' => $candidateIndex,
                        'raw_markdown' => (string) ($block['raw_markdown'] ?? ''),
                        'stem' => null,
                        'options' => null,
                        'images' => [],
                        'tables' => [],
                        'is_question_candidate' => false,
                        'ai_confidence' => null,
                        'status' => 'ai_pending',
                    ]
                );
            }
        });

        // Logged only after the transaction has committed.
        Log::info('Successfully wrote candidates to pre_question_candidates', [
            'id' => $this->markdownImportId,
            'candidates_count' => $total
        ]);
    }

    /**
     * Dispatch one ProcessMarkdownCandidateBatch job per BATCH_SIZE-sized range.
     *
     * Ranges are inclusive and 1-based: [1..10], [11..20], ... up to $total.
     *
     * @param int $total Total number of candidates to cover.
     * @return int Number of batch jobs dispatched.
     */
    private function dispatchCandidateBatches(int $total): int
    {
        $batches = (int) ceil($total / self::BATCH_SIZE);

        for ($b = 0; $b < $batches; $b++) {
            $startSeq = ($b * self::BATCH_SIZE) + 1;
            $endSeq = min(($b + 1) * self::BATCH_SIZE, $total);
            ProcessMarkdownCandidateBatch::dispatch($this->markdownImportId, $startSeq, $endSeq);
        }

        return $batches;
    }

    /**
     * Mark the import row as failed using a query-level update.
     *
     * @param string $progressMessage Value stored in progress_message.
     * @param string $errorMessage    Value stored in error_message.
     */
    private function markImportFailed(string $progressMessage, string $errorMessage): void
    {
        MarkdownImport::where('id', $this->markdownImportId)->update([
            'status' => 'failed',
            'progress_stage' => MarkdownImport::STAGE_FAILED,
            'progress_message' => $progressMessage,
            'progress_updated_at' => now(),
            'processing_finished_at' => now(),
            'error_message' => $errorMessage
        ]);
    }
}
|