| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- <?php
- namespace App\Jobs;
- use App\Models\MarkdownImport;
- use App\Models\PreQuestionCandidate;
- use App\Services\SourceFileParserService;
- use App\Services\SourcePaperExtractorService;
- use App\Services\PaperPartExtractorService;
- use App\Services\QuestionExtractorService;
- use App\Jobs\ProcessMarkdownCandidateBatch;
- use Illuminate\Bus\Queueable;
- use Illuminate\Contracts\Queue\ShouldQueue;
- use Illuminate\Foundation\Bus\Dispatchable;
- use Illuminate\Queue\InteractsWithQueue;
- use Illuminate\Queue\SerializesModels;
- use Illuminate\Support\Facades\DB;
- use Illuminate\Support\Facades\Log;
/**
 * Queued job that splits an imported Markdown document into question candidates
 * and then dispatches ProcessMarkdownCandidateBatch jobs for the AI parsing stage.
 *
 * Pipeline driven by handle(): source file -> papers -> parts -> question candidates.
 * Progress and failure state are written to the MarkdownImport row throughout.
 */
class ProcessMarkdownSplit implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /** Number of candidate sequence numbers covered by each dispatched batch job. */
    private const CANDIDATE_BATCH_SIZE = 10;

    /** Job timeout in seconds (5 minutes). */
    public int $timeout = 300;

    /** Maximum number of attempts for this job. */
    public int $tries = 3;

    /**
     * Create a new job instance.
     *
     * @param int $markdownImportId Id of the MarkdownImport row to process.
     */
    public function __construct(
        public int $markdownImportId
    ) {
        //
    }

    /**
     * Execute the job.
     *
     * Loads the import, stores it as a source file, extracts papers and parts,
     * supersedes the import's existing candidates, persists new candidates and
     * finally dispatches the AI parsing batches. Any throwable raised along the
     * way is logged and recorded on the import as a failure; it is not rethrown.
     */
    public function handle(): void
    {
        try {
            // Load the Markdown import record.
            $markdownImport = MarkdownImport::find($this->markdownImportId);

            if (!$markdownImport) {
                Log::error('MarkdownImport not found', [
                    'id' => $this->markdownImportId
                ]);

                return;
            }

            // Mark the import as processing and reset progress / error fields.
            // processing_started_at is only set if it has no value yet.
            $markdownImport->update([
                'status' => 'processing',
                'progress_stage' => MarkdownImport::STAGE_SPLITTING,
                'progress_message' => '开始拆题…',
                'progress_current' => 0,
                'progress_total' => 0,
                'progress_updated_at' => now(),
                'processing_started_at' => $markdownImport->processing_started_at ?? now(),
                'processing_finished_at' => null,
                'error_message' => null,
            ]);

            Log::info('Starting Markdown pipeline (source->paper->part->question)', [
                'id' => $this->markdownImportId,
            ]);

            $fileParser = app(SourceFileParserService::class);
            $paperExtractor = app(SourcePaperExtractorService::class);
            $partExtractor = app(PaperPartExtractorService::class);
            $questionExtractor = app(QuestionExtractorService::class);

            // Create the source_file record from the raw Markdown.
            $sourceFile = $fileParser->storeFromMarkdown(
                $markdownImport->file_name ?? ('import-' . $markdownImport->id . '.md'),
                $markdownImport->original_markdown,
                $markdownImport,
                [],
                null
            );

            // Split into papers, then collect the parts of every paper.
            $papers = $paperExtractor->extract($sourceFile);
            $parts = collect();
            foreach ($papers as $paper) {
                $parts = $parts->merge($partExtractor->extract($paper));
            }

            // Writing stage: persist the question candidates.
            $markdownImport->update([
                'progress_stage' => MarkdownImport::STAGE_WRITING,
                'progress_message' => '写入拆题结果…',
                'progress_updated_at' => now(),
            ]);

            // Candidates already stored for this import are marked as superseded.
            PreQuestionCandidate::where('import_id', $this->markdownImportId)->update([
                'status' => 'superseded',
            ]);

            // NOTE(review): $sequence is handed over as a variable; presumably
            // extractAndPersist() advances it by reference so that numbering
            // continues across parts - confirm against the service signature.
            $sequence = 1;
            $createdTotal = 0;
            foreach ($parts as $part) {
                $created = $questionExtractor->extractAndPersist($part, $markdownImport, $sequence);
                $createdTotal += $created->count();
            }

            $markdownImport->update([
                'progress_total' => $createdTotal,
                'progress_current' => 0,
                'progress_updated_at' => now(),
            ]);

            // Nothing was extracted: record the failure and stop before dispatching.
            if ($createdTotal === 0) {
                $markdownImport->update([
                    'status' => 'failed',
                    'progress_stage' => MarkdownImport::STAGE_FAILED,
                    'progress_message' => '未解析出任何候选题',
                    'progress_updated_at' => now(),
                    'processing_finished_at' => now(),
                    'error_message' => 'No candidates found',
                ]);

                return;
            }

            // Enter the AI parsing stage: child jobs handle the candidates in
            // batches; parallelism comes from the number of queue workers.
            $markdownImport->update([
                'progress_stage' => MarkdownImport::STAGE_AI_PARSING,
                'progress_message' => 'AI 解析中…',
                'progress_current' => 0,
                'progress_updated_at' => now(),
            ]);

            $batches = $this->dispatchCandidateBatches($createdTotal);

            Log::info('Markdown AI parsing batches dispatched', [
                'id' => $this->markdownImportId,
                'total_blocks' => $createdTotal,
                'batch_size' => self::CANDIDATE_BATCH_SIZE,
                'batches' => $batches,
            ]);
        } catch (\Throwable $e) {
            // \Throwable instead of \Exception: engine errors such as TypeError
            // extend \Error, not \Exception, and would otherwise bypass this
            // bookkeeping and leave the import in the "processing" state.
            Log::error('Markdown split and AI analysis failed', [
                'id' => $this->markdownImportId,
                'error' => $e->getMessage(),
                'trace' => $e->getTraceAsString()
            ]);

            $this->markFailed('解析失败', $e->getMessage());
        }
    }

    /**
     * Handle a job failure.
     *
     * Logs the failure and records it on the import row.
     *
     * @param \Throwable $exception The throwable that caused the job to fail.
     */
    public function failed(\Throwable $exception): void
    {
        Log::error('Markdown split job failed', [
            'id' => $this->markdownImportId,
            'error' => $exception->getMessage()
        ]);

        $this->markFailed('任务执行失败', $exception->getMessage());
    }

    /**
     * Dispatch one ProcessMarkdownCandidateBatch job per sequence range.
     *
     * Ranges are inclusive and cover 1..$total in steps of CANDIDATE_BATCH_SIZE;
     * the last range is clamped to $total.
     * NOTE(review): assumes the persisted candidates are numbered 1..$total - confirm.
     *
     * @param int $total Number of candidates created for this import.
     * @return int Number of batch jobs dispatched.
     */
    private function dispatchCandidateBatches(int $total): int
    {
        $batches = (int) ceil($total / self::CANDIDATE_BATCH_SIZE);

        for ($batch = 0; $batch < $batches; $batch++) {
            $startSeq = ($batch * self::CANDIDATE_BATCH_SIZE) + 1;
            $endSeq = min(($batch + 1) * self::CANDIDATE_BATCH_SIZE, $total);

            ProcessMarkdownCandidateBatch::dispatch($this->markdownImportId, $startSeq, $endSeq);
        }

        return $batches;
    }

    /**
     * Write the failed state onto the import row identified by $markdownImportId.
     *
     * Uses a query-level update so it works without a loaded model instance.
     *
     * @param string $progressMessage Text stored in progress_message.
     * @param string $errorMessage    Text stored in error_message.
     */
    private function markFailed(string $progressMessage, string $errorMessage): void
    {
        MarkdownImport::where('id', $this->markdownImportId)->update([
            'status' => 'failed',
            'progress_stage' => MarkdownImport::STAGE_FAILED,
            'progress_message' => $progressMessage,
            'progress_updated_at' => now(),
            'processing_finished_at' => now(),
            'error_message' => $errorMessage,
        ]);
    }
}
|