ai_parse_sync.php 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377
  1. #!/usr/bin/env php
  2. <?php
  // Abort early when the pcntl extension (which provides pcntl_fork) is not loaded:
  // the concurrent mode below cannot work without it.
  if (function_exists('pcntl_fork') === false) {
      echo "\n❌ 错误:PHP pcntl 扩展未安装,无法使用并发模式。\n",
          "请安装 pcntl 扩展或使用单进程模式。\n\n";
      exit(1);
  }

  // Load Composer's autoloader, then create the Laravel application and
  // bootstrap its console kernel so models and helpers are usable below.
  require __DIR__ . '/vendor/autoload.php';
  $app = require_once __DIR__ . '/bootstrap/app.php';
  $kernel = $app->make(\Illuminate\Contracts\Console\Kernel::class);
  $kernel->bootstrap();
  13. use App\Models\MarkdownImport;
  14. use App\Models\PreQuestionCandidate;
  15. use App\Services\MarkdownQuestionParser;
  16. use Illuminate\Support\Facades\DB;
  17. use Illuminate\Support\Facades\Facade;
  echo "\n=== AI 解析同步执行脚本 (并发版) ===\n\n";

  // Load every import record, newest first.
  $allImports = MarkdownImport::orderBy('created_at', 'desc')->get();
  if ($allImports->isEmpty()) {
      echo "❌ 没有找到导入记录。\n\n";
      exit(0);
  }

  echo "📋 找到 {$allImports->count()} 个导入记录:\n\n";

  // Plain 0-based list of the imports; the menu below shows position + 1.
  $importList = $allImports->values()->all();
  foreach ($importList as $index => $import) {
      // Candidates marked "superseded" are excluded from the displayed count.
      $candidateCount = PreQuestionCandidate::where('import_id', $import->id)
          ->where('status', '!=', 'superseded')
          ->count();
      echo sprintf(
          "%d. ID %d: %s (%d 个候选题)\n",
          $index + 1,
          $import->id,
          $import->file_name,
          $candidateCount
      );
  }

  // NOTE(review): the prompt text mentions the import ID, but the value entered
  // is interpreted as the 1-based position in the list printed above.
  echo "\n选择要解析的导入ID:\n";
  echo "输入数字,多个用逗号分隔,或输入 'all' 全部执行: ";

  // The selection comes from AI_PARSE_SELECTION when set, otherwise from STDIN.
  // A closed/empty STDIN (non-interactive run) falls back to "all".
  $input = trim((string) env('AI_PARSE_SELECTION', ''));
  if ($input !== '') {
      echo $input . "\n";
  } else {
      $line = fgets(STDIN);
      if ($line === false) {
          $input = 'all';
          echo $input . "\n";
      } else {
          $input = trim($line);
      }
  }

  // Resolve the selection into a plain array of import models.
  $selectedImports = [];
  if (strtolower($input) === 'all') {
      $selectedImports = $importList;
  } else {
      // Accept both the ASCII comma and the full-width comma as separators.
      $tokens = preg_split('/[,，]/u', $input);
      if ($tokens === false) {
          // Input was not valid UTF-8; fall back to the ASCII-only split.
          $tokens = explode(',', $input);
      }

      $alreadyPicked = [];
      foreach ($tokens as $token) {
          $position = (int) $token;
          if ($position < 1 || $position > count($importList)) {
              continue; // out of range or not a number
          }
          if (isset($alreadyPicked[$position])) {
              continue; // the same entry listed twice must not be processed twice
          }
          $alreadyPicked[$position] = true;
          $selectedImports[] = $importList[$position - 1];
      }
  }

  if (empty($selectedImports)) {
      echo "❌ 没有选择有效的导入记录。\n";
      exit(1);
  }
  echo "\n设置并发参数:\n";

  // Worker count and batch size come from the environment (defaults 2 and 10).
  // Both are clamped to at least 1: with zero workers the scheduling loop below
  // would never start a child and never finish, and array_chunk() rejects a
  // chunk length below 1.
  $workers = max(1, (int) env('AI_PARSE_WORKERS', 2));
  $batchSize = max(1, (int) env('AI_PARSE_BATCH', 10));

  echo "并发进程数: {$workers}\n";
  echo "每个进程的批次大小: {$batchSize}\n";
  echo "\n=== 开始执行 AI 解析 (并发模式: {$workers} 进程) ===\n\n";
  // For each selected import: split its candidates into batches, process the
  // batches in forked children (at most $workers at a time), collect each
  // child's JSON result file, then store the totals on the import record.
  foreach ($selectedImports as $import) {
      echo "🔄 处理 ID {$import->id}: {$import->file_name}\n";

      // One ordered ID query provides both the work list and the total, so the
      // two can never disagree.
      $candidateIds = PreQuestionCandidate::where('import_id', $import->id)
          ->where('status', '!=', 'superseded')
          ->orderBy('id')
          ->pluck('id')
          ->toArray();
      $candidateCount = count($candidateIds);
      if ($candidateCount === 0) {
          echo " ⚠️ 没有候选题,跳过\n\n";
          continue;
      }

      // Mark the import as being processed and reset its progress fields.
      $import->update([
          'status' => 'processing',
          'progress_stage' => MarkdownImport::STAGE_AI_PARSING,
          'progress_message' => "开始 AI 解析(本地脚本)...",
          'progress_current' => 0,
          'progress_total' => $candidateCount,
          'progress_updated_at' => now(),
          'processing_started_at' => $import->processing_started_at ?: now(),
          'processing_finished_at' => null,
          'error_message' => null,
      ]);
      echo " 📊 总计 {$candidateCount} 个候选题,使用 {$workers} 个进程并发处理\n";

      // Split the candidate IDs into batches; one child process handles one batch.
      $batches = array_chunk($candidateIds, $batchSize);
      $batchCount = count($batches);
      $batchSizes = array_map('count', $batches);
      $activeWorkers = []; // pid => batch index

      // Children hand their result back through one JSON file per batch.
      $tmpDir = sys_get_temp_dir() . '/ai_parse_' . $import->id;
      if (!is_dir($tmpDir)) {
          mkdir($tmpDir, 0777, true);
      }
      // Remove leftovers of an earlier interrupted run: a stale batch_N.json
      // would otherwise be read as the result of a child that died silently.
      foreach (glob($tmpDir . '/*') ?: [] as $staleFile) {
          if (is_file($staleFile)) {
              unlink($staleFile);
          }
      }

      $batchIndex = 0;
      $processedTotal = 0;
      $failedTotal = 0;
      $batchDurations = [];

      // Scheduler loop: runs until every batch was started and every child reaped.
      while ($batchIndex < $batchCount || !empty($activeWorkers)) {
          // Start new workers while batches remain and a slot is free.
          while ($batchIndex < $batchCount && count($activeWorkers) < $workers) {
              $batch = $batches[$batchIndex];

              // Close the parent's DB connections before forking so a child
              // does not inherit an open connection from the parent.
              // NOTE(review): relies on Laravel reconnecting on the parent's
              // next query after disconnect() — confirm for the Laravel version in use.
              foreach (array_keys(DB::getConnections()) as $connectionName) {
                  DB::disconnect($connectionName);
              }

              $pid = pcntl_fork();
              if ($pid === -1) {
                  if (!empty($activeWorkers)) {
                      // Possibly a temporary resource limit: let the running
                      // children finish and retry on a later iteration.
                      break;
                  }
                  // Nothing is running and nothing can be started: record the
                  // error and stop with a failing exit status.
                  $import->update([
                      'error_message' => '无法创建子进程',
                      'progress_updated_at' => now(),
                  ]);
                  fwrite(STDERR, "无法创建子进程\n");
                  exit(1);
              }

              if ($pid === 0) {
                  // Child process: handle one batch, write the result file, exit.
                  $start = microtime(true);
                  try {
                      $result = processBatch($import->id, $batch);
                  } catch (Throwable $e) {
                      $result = [
                          'processed' => 0,
                          'failed' => count($batch),
                          'error' => $e->getMessage(),
                          'duration_sec' => 0,
                      ];
                  }
                  $result['duration_sec'] = round(microtime(true) - $start, 3);

                  // Write to a temp name and rename, so the parent never sees
                  // a half-written file. Invalid UTF-8 in an error message is
                  // substituted instead of making json_encode() fail.
                  $resultFile = $tmpDir . '/batch_' . $batchIndex . '.json';
                  $tmpFile = $resultFile . '.tmp';
                  file_put_contents(
                      $tmpFile,
                      json_encode($result, JSON_INVALID_UTF8_SUBSTITUTE),
                      LOCK_EX
                  );
                  rename($tmpFile, $resultFile);
                  exit(($result['failed'] ?? 0) > 0 ? 1 : 0);
              }

              // Parent process: remember which batch this child is working on.
              $activeWorkers[$pid] = $batchIndex;
              $batchIndex++;
          }

          // Poll every tracked child without blocking.
          foreach ($activeWorkers as $pid => $batchIdx) {
              $res = pcntl_waitpid($pid, $status, WNOHANG);
              if ($res === 0) {
                  continue; // still running
              }
              if ($res === -1 && pcntl_get_last_error() === PCNTL_EINTR) {
                  continue; // interrupted call; poll again next round
              }

              // Either the child exited ($res === $pid) or it can no longer be
              // waited on ($res === -1). In both cases stop tracking it;
              // keeping it would make this loop run forever.
              unset($activeWorkers[$pid]);

              $resultFile = $tmpDir . '/batch_' . $batchIdx . '.json';
              if (!file_exists($resultFile)) {
                  $failedTotal += $batchSizes[$batchIdx] ?? 0;
                  echo " ⚠️ 批次 {$batchIdx} 未找到结果文件,已计为失败\n";
                  continue;
              }

              $raw = file_get_contents($resultFile);
              if ($raw === false) {
                  $raw = '';
              }
              $result = json_decode($raw, true);
              $expected = $batchSizes[$batchIdx] ?? 0;
              if (!is_array($result)) {
                  $result = [
                      'processed' => 0,
                      'failed' => $expected,
                      'error' => 'invalid result file: ' . ($raw === '' ? 'empty' : 'malformed json'),
                  ];
              }

              $processed = (int) ($result['processed'] ?? 0);
              $failed = (int) ($result['failed'] ?? $expected);
              // Candidates the child did not account for are counted as failed.
              $accounted = $processed + $failed;
              if ($expected > 0 && $accounted < $expected) {
                  $failed += ($expected - $accounted);
              }
              $processedTotal += $processed;
              $failedTotal += $failed;

              if (!empty($result['error'])) {
                  echo " ⚠️ 批次 {$batchIdx} 错误: {$result['error']}\n";
              }

              if (isset($result['duration_sec'])) {
                  $duration = (float) $result['duration_sec'];
                  $batchDurations[] = $duration;
                  $count = count($batchDurations);
                  echo sprintf(" ⏱️ 批次 %d 用时: %.2fs\n", $batchIdx, $duration);
                  // Every tenth finished batch, print the average of the last ten.
                  if ($count % 10 === 0) {
                      $recent = array_slice($batchDurations, -10);
                      $avg = array_sum($recent) / max(count($recent), 1);
                      echo sprintf(" 📈 最近 10 批次平均用时: %.2fs (估算 10 批次)\n", $avg);
                  }
              }

              $percent = round(($processedTotal / $candidateCount) * 100, 1);
              echo " ⏳ 进度: {$processedTotal}/{$candidateCount} ({$percent}%)\n";
          }

          // Short sleep so the polling loop does not burn CPU (0.1 s).
          usleep(100000);
      }

      // Remove the result files and the temporary directory.
      foreach (glob($tmpDir . '/*') ?: [] as $leftover) {
          if (is_file($leftover)) {
              unlink($leftover);
          }
      }
      if (is_dir($tmpDir)) {
          rmdir($tmpDir);
      }

      // Store the final totals on the import record.
      $import->update([
          'status' => 'parsed',
          'progress_stage' => MarkdownImport::STAGE_PARSED,
          'progress_message' => "解析完成,成功 {$processedTotal},失败 {$failedTotal}",
          'progress_current' => $processedTotal,
          'progress_total' => $candidateCount,
          'progress_updated_at' => now(),
          'processing_finished_at' => now(),
      ]);
      echo " ✅ 完成: 成功 {$processedTotal} 题,失败 {$failedTotal} 题\n\n";
  }

  echo "=== 所有任务完成 ===\n\n";
  /**
   * Worker-side batch handler: runs the AI parser over a list of candidates.
   *
   * Creates and bootstraps a fresh Laravel application, reconnects the
   * database, resolves the parser service and then handles the candidates one
   * by one. A candidate whose meta already has "ai_parsed" set is counted as
   * processed without being parsed again. A failure of a single candidate is
   * written to STDERR and counted; it does not stop the batch.
   *
   * @param mixed $importId     ID of the import the batch belongs to (used in log output only).
   * @param array $candidateIds IDs of the PreQuestionCandidate rows to handle.
   *
   * @return array "processed" and "failed" counts; when the whole batch could
   *               not be started, also an "error" string with the reason.
   */
  function processBatch($importId, $candidateIds): array
  {
      // Every setup failure reports the complete batch as failed with a reason.
      $batchFailure = function ($message) use ($candidateIds) {
          return [
              'processed' => 0,
              'failed' => count($candidateIds),
              'error' => $message,
          ];
      };

      // Make sure the Composer autoloader is available.
      if (!file_exists(__DIR__ . '/vendor/autoload.php')) {
          return $batchFailure('Composer autoload not found');
      }
      require_once __DIR__ . '/vendor/autoload.php';

      // Build a new Laravel application instance for this process.
      $app = require __DIR__ . '/bootstrap/app.php';
      if (!$app instanceof Illuminate\Foundation\Application) {
          return $batchFailure('Laravel app initialization failed in child process');
      }

      // Point the facades at the new application and bootstrap the console kernel.
      try {
          Facade::clearResolvedInstances();
          Facade::setFacadeApplication($app);
          $kernel = $app->make(Illuminate\Contracts\Console\Kernel::class);
          $kernel->bootstrap();
      } catch (Throwable $e) {
          return $batchFailure('Failed to bootstrap kernel: ' . $e->getMessage());
      }

      // Open a database connection of this process's own; query logging is
      // switched off for the batch run.
      try {
          DB::disconnect();
          DB::reconnect();
          DB::disableQueryLog();
      } catch (Throwable $e) {
          return $batchFailure('Failed to reconnect DB: ' . $e->getMessage());
      }

      // Resolve the parser service from the container.
      try {
          $parser = $app->make(App\Services\MarkdownQuestionParser::class);
      } catch (Throwable $e) {
          return $batchFailure('Failed to resolve services: ' . $e->getMessage());
      }

      $processed = 0;
      $failed = 0;
      foreach ($candidateIds as $candidateId) {
          try {
              $candidate = App\Models\PreQuestionCandidate::find($candidateId);
              if (!$candidate) {
                  $failed++;
                  // Leave a trace for rows that no longer exist instead of
                  // failing them without any message.
                  fwrite(STDERR, sprintf(
                      "[Child PID %d] Candidate %d not found (import %s)\n",
                      getmypid(),
                      $candidateId,
                      $importId
                  ));
                  continue;
              }

              // Already parsed earlier: count it and move on.
              $meta = $candidate->meta ?? [];
              if (!empty($meta['ai_parsed'])) {
                  $processed++;
                  continue;
              }

              // Run the AI parser on the raw markdown of this candidate.
              $parsed = $parser->parseRawMarkdown((string) $candidate->raw_markdown, (int) $candidate->index);

              // A result that cannot be read by key must not be stored: every
              // field would silently become null while the row is flagged as
              // parsed, and later runs would skip it.
              if (!is_array($parsed) && !($parsed instanceof ArrayAccess)) {
                  throw new RuntimeException(
                      'parser returned ' . gettype($parsed) . ' instead of a result array'
                  );
              }

              $meta['ai_parsed'] = true;
              $meta['ai_parsed_at'] = now()->toDateTimeString();
              $candidate->update([
                  'stem' => $parsed['stem'] ?? null,
                  'options' => $parsed['options'] ?? null,
                  'images' => $parsed['images'] ?? [],
                  'tables' => $parsed['tables'] ?? [],
                  'is_question_candidate' => (bool) ($parsed['is_question_candidate'] ?? false),
                  'ai_confidence' => $parsed['ai_confidence'] ?? null,
                  'status' => 'pending',
                  'meta' => $meta,
              ]);
              $processed++;
          } catch (\Throwable $e) {
              $failed++;
              // Write the details of the failure to this process's STDERR.
              fwrite(STDERR, sprintf(
                  "[Child PID %d] Candidate %d failed: %s in %s:%d\n%s\n",
                  getmypid(),
                  $candidateId,
                  $e->getMessage(),
                  basename($e->getFile()),
                  $e->getLine(),
                  $e->getTraceAsString()
              ));
          }
      }

      return [
          'processed' => $processed,
          'failed' => $failed,
      ];
  }