markdownImportId); if (!$markdownImport) { Log::error('MarkdownImport not found', [ 'id' => $this->markdownImportId ]); return; } // 更新状态为处理中 $markdownImport->update([ 'status' => 'processing', 'progress_stage' => MarkdownImport::STAGE_SPLITTING, 'progress_message' => '开始拆题…', 'progress_current' => 0, 'progress_total' => 0, 'progress_updated_at' => now(), 'processing_started_at' => $markdownImport->processing_started_at ?? now(), 'processing_finished_at' => null, 'error_message' => null, ]); Log::info('Starting Markdown pipeline (source->paper->part->question)', [ 'id' => $this->markdownImportId, ]); $fileParser = app(SourceFileParserService::class); $paperExtractor = app(SourcePaperExtractorService::class); $partExtractor = app(PaperPartExtractorService::class); $questionExtractor = app(QuestionExtractorService::class); // 建立 source_file $sourceFile = $fileParser->storeFromMarkdown( $markdownImport->file_name ?? ('import-' . $markdownImport->id . '.md'), $markdownImport->original_markdown, $markdownImport, [], null ); // 同步上传原始 Markdown 到春笋云 try { $storageService = app(PdfStorageService::class); $path = "imports/markdown/{$markdownImport->id}_" . ($markdownImport->file_name ?: 'import.md'); $remoteUrl = $storageService->put($path, (string)$markdownImport->original_markdown); if ($remoteUrl) { $markdownImport->update(['remote_url' => $remoteUrl]); } } catch (\Exception $e) { Log::warning('Failed to upload markdown to Chunsun', [ 'id' => $this->markdownImportId, 'error' => $e->getMessage(), ]); } // 拆分卷子和区块 $papers = $paperExtractor->extract($sourceFile); $parts = collect(); foreach ($papers as $paper) { $parts = $parts->merge($partExtractor->extract($paper)); } // 写入候选题 $markdownImport->update([ 'progress_stage' => MarkdownImport::STAGE_WRITING, 'progress_message' => '写入拆题结果…', 'progress_updated_at' => now(), ]); // 清理旧的队列任务,避免重复批次累积 DB::table('jobs') ->where('payload', 'like', '%\"markdownImportId\":' . $this->markdownImportId . 
'%') ->orWhere('payload', 'like', '%\"markdownImportId\";i:' . $this->markdownImportId . ';%') ->delete(); PreQuestionCandidate::where('import_id', $this->markdownImportId)->update([ 'status' => 'superseded', ]); $sequence = 1; $createdTotal = 0; foreach ($parts as $part) { $created = $questionExtractor->extractAndPersist($part, $markdownImport, $sequence); $createdTotal += $created->count(); } $markdownImport->update([ 'progress_total' => $createdTotal, 'progress_current' => 0, 'progress_updated_at' => now(), ]); if ($createdTotal === 0) { $markdownImport->update([ 'status' => 'failed', 'progress_stage' => MarkdownImport::STAGE_FAILED, 'progress_message' => '未解析出任何候选题', 'progress_updated_at' => now(), 'processing_finished_at' => now(), 'error_message' => 'No candidates found', ]); return; } // 进入并发 AI 解析阶段(方案 A:子 Job 批处理 + 多 worker 并行) $markdownImport->update([ 'progress_stage' => MarkdownImport::STAGE_AI_PARSING, 'progress_message' => 'AI 解析中…', 'progress_current' => 0, 'progress_updated_at' => now(), ]); $total = $createdTotal; $batchSize = 10; // 每批处理 10 题(并发由 worker 数控制) $batches = (int) ceil($total / $batchSize); for ($b = 0; $b < $batches; $b++) { $startSeq = ($b * $batchSize) + 1; $endSeq = min(($b + 1) * $batchSize, $total); ProcessMarkdownCandidateBatch::dispatch($this->markdownImportId, $startSeq, $endSeq); } Log::info('Markdown AI parsing batches dispatched', [ 'id' => $this->markdownImportId, 'total_blocks' => $total, 'batch_size' => $batchSize, 'batches' => $batches, ]); } catch (\Exception $e) { Log::error('Markdown split and AI analysis failed', [ 'id' => $this->markdownImportId, 'error' => $e->getMessage(), 'trace' => $e->getTraceAsString() ]); // 更新状态为失败 MarkdownImport::where('id', $this->markdownImportId)->update([ 'status' => 'failed', 'progress_stage' => MarkdownImport::STAGE_FAILED, 'progress_message' => '解析失败', 'progress_updated_at' => now(), 'processing_finished_at' => now(), 'error_message' => $e->getMessage() ]); } } /** * Handle a job 
failure: log the exception message and mark the import record as failed.
     *
     * @param \Throwable $exception The throwable whose message is logged and
     *                              stored in the record's error_message field.
     */
    public function failed(\Throwable $exception): void
    {
        $importId = $this->markdownImportId;
        $reason = $exception->getMessage();

        Log::error('Markdown split job failed', [
            'id' => $importId,
            'error' => $reason,
        ]);

        // Flip the import row to the failed state and stamp the finish time.
        $importQuery = MarkdownImport::where('id', $importId);
        $importQuery->update([
            'status' => 'failed',
            'progress_stage' => MarkdownImport::STAGE_FAILED,
            'progress_message' => '任务执行失败',
            'progress_updated_at' => now(),
            'processing_finished_at' => now(),
            'error_message' => $reason,
        ]);
    }
}