#!/usr/bin/env php
<?php

/**
 * Interactive CLI script: run the AI parsing stage for one or more Markdown
 * imports, fanning the candidates out to forked worker processes.
 *
 * Flow:
 *   1. List every MarkdownImport with its number of non-superseded candidates.
 *   2. Ask which imports to process, how many workers to run and how many
 *      candidates each worker handles per batch.
 *   3. For each selected import, fork one child per batch (at most $workers at
 *      a time). Every child writes a JSON result file that the parent reads
 *      after reaping the child.
 *   4. Store the success/failure totals on the import record.
 */

use App\Models\MarkdownImport;
use App\Models\PreQuestionCandidate;
use App\Services\MarkdownQuestionParser;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Facade;

// NOTE(review): the opening tag and the bootstrap statements up to
// `$kernel = $app->` were missing from the reviewed text. They are
// reconstructed here to mirror the bootstrap sequence used in processBatch();
// confirm against the original file.
require_once __DIR__ . '/vendor/autoload.php';

$app = require __DIR__ . '/bootstrap/app.php';
$kernel = $app->make(Illuminate\Contracts\Console\Kernel::class);
$kernel->bootstrap();

// Fail early with a readable message instead of a fatal "undefined function"
// in the middle of a run.
if (!function_exists('pcntl_fork')) {
    fwrite(STDERR, "❌ 需要 pcntl 扩展才能并发执行。\n");
    exit(1);
}

echo "\n=== AI 解析同步执行脚本 (并发版) ===\n\n";

// Look up all import records, newest first.
$allImports = MarkdownImport::orderBy('created_at', 'desc')->get();

if ($allImports->isEmpty()) {
    echo "❌ 没有找到导入记录。\n\n";
    exit(0);
}

echo "📋 找到 {$allImports->count()} 个导入记录:\n\n";

foreach ($allImports as $index => $import) {
    $candidateCount = PreQuestionCandidate::where('import_id', $import->id)
        ->where('status', '!=', 'superseded')
        ->count();

    echo sprintf(
        "%d. ID %d: %s (%d 个候选题)\n",
        $index + 1,
        $import->id,
        $import->file_name,
        $candidateCount
    );
}

// NOTE(review): the numbers entered here are 1-based positions in the list
// printed above, not import IDs, although the prompt text says "ID".
echo "\n选择要解析的导入ID (输入数字,多个用逗号分隔,输入 'all' 全部执行): ";
$input = readInputLine();

$selectedImports = [];
if (strcasecmp($input, 'all') === 0) {
    $selectedImports = $allImports->all();
} else {
    // De-duplicate so that input such as "1,1" does not process an import twice.
    $indices = array_unique(array_map('intval', explode(',', $input)));
    foreach ($indices as $index) {
        if ($index >= 1 && $index <= $allImports->count()) {
            $selectedImports[] = $allImports[$index - 1];
        }
    }
}

if (empty($selectedImports)) {
    echo "❌ 没有选择有效的导入记录。\n";
    exit(1);
}

echo "\n设置并发参数:\n";
echo "并发进程数 (建议 4-8,默认为 4): ";
$workers = (int) readInputLine();
if ($workers <= 0) {
    $workers = 4;
}

echo "每个进程的批次大小 (默认 10): ";
$batchSize = (int) readInputLine();
if ($batchSize <= 0) {
    $batchSize = 10;
}

echo "\n=== 开始执行 AI 解析 (并发模式: {$workers} 进程) ===\n\n";

foreach ($selectedImports as $import) {
    echo "🔄 处理 ID {$import->id}: {$import->file_name}\n";

    // Fetch the ids once and derive the count from them, so the total and the
    // batches can never disagree.
    $candidateIds = PreQuestionCandidate::where('import_id', $import->id)
        ->where('status', '!=', 'superseded')
        ->orderBy('id')
        ->pluck('id')
        ->toArray();
    $candidateCount = count($candidateIds);

    if ($candidateCount === 0) {
        echo " ⚠️ 没有候选题,跳过\n\n";
        continue;
    }

    // Directory the children drop their result files into. The parent pid
    // makes it unique per run; anything left over from an earlier run is
    // removed so a stale file is never mistaken for a fresh result.
    $tmpDir = sys_get_temp_dir() . '/ai_parse_' . $import->id . '_' . getmypid();
    if (is_dir($tmpDir)) {
        removeResultFiles($tmpDir);
    } elseif (!mkdir($tmpDir, 0700, true) && !is_dir($tmpDir)) {
        fwrite(STDERR, "无法创建临时目录: {$tmpDir}\n");
        exit(1);
    }

    // Mark the import as being processed.
    $import->update([
        'status' => 'processing',
        'progress_stage' => MarkdownImport::STAGE_AI_PARSING,
        'progress_message' => "开始 AI 解析(本地脚本)...",
        'progress_current' => 0,
        'progress_total' => $candidateCount,
        'progress_updated_at' => now(),
        'processing_started_at' => $import->processing_started_at ?: now(),
        'processing_finished_at' => null,
        'error_message' => null,
    ]);

    echo " 📊 总计 {$candidateCount} 个候选题,使用 {$workers} 个进程并发处理\n";

    // Split the candidates into batches; one child process handles one batch.
    $batches = array_chunk($candidateIds, $batchSize);
    $batchCount = count($batches);
    $batchSizes = array_map('count', $batches);

    $activeWorkers = [];   // pid => batch index
    $batchIndex = 0;
    $processedTotal = 0;
    $failedTotal = 0;
    $batchDurations = [];

    // Drop the parent's database connection before forking so that no child
    // inherits (and later tears down) the parent's socket. The parent issues
    // no queries while workers run; the next query is expected to reconnect.
    DB::disconnect();

    // Main loop: keep up to $workers children running until every batch has
    // been started and every child has been reaped.
    while ($batchIndex < $batchCount || !empty($activeWorkers)) {
        // Start new workers while there is capacity.
        while ($batchIndex < $batchCount && count($activeWorkers) < $workers) {
            $batch = $batches[$batchIndex];
            $pid = pcntl_fork();

            if ($pid === -1) {
                if (!empty($activeWorkers)) {
                    // Possibly a transient resource limit: try again once a
                    // running child has exited.
                    break;
                }

                // Nothing is running, so waiting cannot help.
                fwrite(STDERR, "无法创建子进程\n");
                removeResultFiles($tmpDir);
                rmdir($tmpDir);
                $import->update([
                    'error_message' => '无法创建子进程',
                    'progress_updated_at' => now(),
                ]);
                exit(1);
            }

            if ($pid === 0) {
                // Child process: handle one batch, persist the outcome, exit.
                $start = microtime(true);
                try {
                    $result = processBatch($import->id, $batch);
                } catch (Throwable $e) {
                    $result = [
                        'processed' => 0,
                        'failed' => count($batch),
                        'error' => $e->getMessage(),
                        'duration_sec' => 0,
                    ];
                }
                $result['duration_sec'] = round(microtime(true) - $start, 3);

                // Write to a temporary name and rename, so the parent never
                // reads a half-written file. Invalid UTF-8 in an error message
                // is substituted rather than making the whole encode fail.
                $resultFile = $tmpDir . '/batch_' . $batchIndex . '.json';
                $tmpFile = $resultFile . '.tmp';
                file_put_contents(
                    $tmpFile,
                    json_encode($result, JSON_INVALID_UTF8_SUBSTITUTE),
                    LOCK_EX
                );
                rename($tmpFile, $resultFile);

                exit(($result['failed'] ?? 0) > 0 ? 1 : 0);
            }

            // Parent process: remember which batch this child is working on.
            $activeWorkers[$pid] = $batchIndex;
            $batchIndex++;
        }

        // Poll the running children without blocking.
        foreach ($activeWorkers as $pid => $batchIdx) {
            $res = pcntl_waitpid($pid, $status, WNOHANG);
            if ($res === 0) {
                continue; // still running
            }

            // $res === $pid: the child has exited. $res === -1: the child can
            // no longer be waited for. In both cases stop tracking it,
            // otherwise the outer loop would never terminate.
            unset($activeWorkers[$pid]);

            $expected = $batchSizes[$batchIdx] ?? 0;
            $result = readBatchResult($tmpDir . '/batch_' . $batchIdx . '.json', $expected);

            if ($result === null) {
                $failedTotal += $expected;
                echo " ⚠️ 批次 {$batchIdx} 未找到结果文件,已计为失败\n";
                continue;
            }

            $processedTotal += $result['processed'];
            $failedTotal += $result['failed'];

            if (!empty($result['error'])) {
                echo " ⚠️ 批次 {$batchIdx} 错误: {$result['error']}\n";
            }

            if (isset($result['duration_sec'])) {
                $duration = (float) $result['duration_sec'];
                $batchDurations[] = $duration;
                echo sprintf(" ⏱️ 批次 %d 用时: %.2fs\n", $batchIdx, $duration);

                // Every tenth finished batch, report the average of the last ten.
                if (count($batchDurations) % 10 === 0) {
                    $recent = array_slice($batchDurations, -10);
                    $avg = array_sum($recent) / max(count($recent), 1);
                    echo sprintf(" 📈 最近 10 批次平均用时: %.2fs (估算 10 批次)\n", $avg);
                }
            }

            $percent = round(($processedTotal / $candidateCount) * 100, 1);
            echo " ⏳ 进度: {$processedTotal}/{$candidateCount} ({$percent}%)\n";
        }

        // Sleep briefly (0.1 s) so the polling loop does not spin the CPU.
        usleep(100000);
    }

    // Remove the temporary result files.
    removeResultFiles($tmpDir);
    rmdir($tmpDir);

    // Store the final state.
    $import->update([
        'status' => 'parsed',
        'progress_stage' => MarkdownImport::STAGE_PARSED,
        'progress_message' => "解析完成,成功 {$processedTotal},失败 {$failedTotal}",
        'progress_current' => $processedTotal,
        'progress_total' => $candidateCount,
        'progress_updated_at' => now(),
        'processing_finished_at' => now(),
    ]);

    echo " ✅ 完成: 成功 {$processedTotal} 题,失败 {$failedTotal} 题\n\n";
}

echo "=== 所有任务完成 ===\n\n";

/**
 * Read one line from STDIN with surrounding whitespace removed.
 *
 * @return string The trimmed line, or '' when STDIN is at end-of-file.
 */
function readInputLine(): string
{
    $line = fgets(STDIN);

    return $line === false ? '' : trim($line);
}

/**
 * Delete every regular file directly inside $dir (the directory itself stays).
 *
 * @param string $dir Directory holding the per-batch result files.
 */
function removeResultFiles(string $dir): void
{
    // glob() returns false on error; treat that as "nothing to delete".
    foreach (glob($dir . '/*') ?: [] as $path) {
        if (is_file($path)) {
            unlink($path);
        }
    }
}

/**
 * Load and normalise the result file written by a worker.
 *
 * An unreadable or malformed file counts the whole batch as failed. Candidates
 * that the file does not account for are added to the failure count.
 *
 * @param string $resultFile Path of the batch's JSON result file.
 * @param int    $expected   Number of candidates that were in the batch.
 *
 * @return array|null Result with integer 'processed' and 'failed' entries and,
 *                    when the file had them, 'error' / 'duration_sec';
 *                    null when the file does not exist.
 */
function readBatchResult(string $resultFile, int $expected): ?array
{
    if (!file_exists($resultFile)) {
        return null;
    }

    $raw = file_get_contents($resultFile);
    $result = json_decode((string) $raw, true);

    if (!is_array($result)) {
        $result = [
            'processed' => 0,
            'failed' => $expected,
            'error' => 'invalid result file: ' . ($raw === '' ? 'empty' : 'malformed json'),
        ];
    }

    $processed = (int) ($result['processed'] ?? 0);
    $failed = (int) ($result['failed'] ?? $expected);

    $accounted = $processed + $failed;
    if ($expected > 0 && $accounted < $expected) {
        $failed += $expected - $accounted;
    }

    $result['processed'] = $processed;
    $result['failed'] = $failed;

    return $result;
}

/**
 * Worker routine: run the AI parser over one batch of candidates.
 *
 * Called inside a forked child. It boots a fresh Laravel application, opens a
 * new database connection, then parses every candidate that is not already
 * flagged `ai_parsed` in its meta and stores the parsed fields on the record.
 *
 * @param int|string $importId     Id of the import the batch belongs to
 *                                 (not used by the routine itself).
 * @param array      $candidateIds Ids of the PreQuestionCandidate rows to parse.
 *
 * @return array 'processed' and 'failed' counts; when set-up fails, the whole
 *               batch is reported as failed together with an 'error' message.
 */
function processBatch($importId, array $candidateIds): array
{
    // Builds the result returned when the batch cannot be started at all.
    $abort = static function (string $message) use ($candidateIds): array {
        return [
            'processed' => 0,
            'failed' => count($candidateIds),
            'error' => $message,
        ];
    };

    // Make sure the Composer autoloader is available.
    if (!file_exists(__DIR__ . '/vendor/autoload.php')) {
        return $abort('Composer autoload not found');
    }
    require_once __DIR__ . '/vendor/autoload.php';

    // Re-initialise the Laravel application inside the child process.
    $app = require __DIR__ . '/bootstrap/app.php';

    if (!$app instanceof Illuminate\Foundation\Application) {
        return $abort('Laravel app initialization failed in child process');
    }

    try {
        // Point the facades at the new application before booting it.
        Facade::clearResolvedInstances();
        Facade::setFacadeApplication($app);

        $kernel = $app->make(Illuminate\Contracts\Console\Kernel::class);
        $kernel->bootstrap();
    } catch (Throwable $e) {
        return $abort('Failed to bootstrap kernel: ' . $e->getMessage());
    }

    try {
        // Open a connection that belongs to this process only.
        DB::disconnect();
        DB::reconnect();
        DB::disableQueryLog();
    } catch (Throwable $e) {
        return $abort('Failed to reconnect DB: ' . $e->getMessage());
    }

    // Resolve the parser straight from the container.
    try {
        $parser = $app->make(MarkdownQuestionParser::class);
    } catch (Throwable $e) {
        return $abort('Failed to resolve services: ' . $e->getMessage());
    }

    $processed = 0;
    $failed = 0;

    foreach ($candidateIds as $candidateId) {
        try {
            $candidate = PreQuestionCandidate::find($candidateId);
            if (!$candidate) {
                $failed++;
                continue;
            }

            // Already parsed earlier: count it as done and move on.
            $meta = $candidate->meta ?? [];
            if (!empty($meta['ai_parsed'])) {
                $processed++;
                continue;
            }

            // Run the AI parser on the candidate's raw Markdown.
            $parsed = $parser->parseRawMarkdown(
                (string) $candidate->raw_markdown,
                (int) $candidate->index
            );

            $meta['ai_parsed'] = true;
            $meta['ai_parsed_at'] = now()->toDateTimeString();

            $candidate->update([
                'stem' => $parsed['stem'] ?? null,
                'options' => $parsed['options'] ?? null,
                'images' => $parsed['images'] ?? [],
                'tables' => $parsed['tables'] ?? [],
                'is_question_candidate' => (bool) ($parsed['is_question_candidate'] ?? false),
                'ai_confidence' => $parsed['ai_confidence'] ?? null,
                'status' => 'pending',
                'meta' => $meta,
            ]);

            $processed++;
        } catch (Throwable $e) {
            $failed++;

            // Write the details to the child's stderr; the batch carries on.
            fwrite(STDERR, sprintf(
                "[Child PID %d] Candidate %d failed: %s in %s:%d\n%s\n",
                getmypid(),
                $candidateId,
                $e->getMessage(),
                basename($e->getFile()),
                $e->getLine(),
                $e->getTraceAsString()
            ));
        }
    }

    return [
        'processed' => $processed,
        'failed' => $failed,
    ];
}