TaskManager.php 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327
  1. <?php
  2. namespace App\Services;
  3. use Illuminate\Http\Client\Response;
  4. use Illuminate\Support\Facades\Cache;
  5. use Illuminate\Support\Facades\Http;
  6. use Illuminate\Support\Facades\Log;
  7. /**
  8. * 统一异步任务管理器
  9. * 负责所有异步任务的创建、状态管理、进度更新和回调通知
  10. */
  11. class TaskManager
  12. {
  13. /**
  14. * 任务类型常量
  15. */
  16. const TASK_TYPE_EXAM = 'exam'; // 智能出卷任务
  17. const TASK_TYPE_ANALYSIS = 'analysis'; // 学情分析任务
  18. /**
  19. * 任务状态常量
  20. */
  21. const STATUS_PENDING = 'pending';
  22. const STATUS_PROCESSING = 'processing';
  23. const STATUS_COMPLETED = 'completed';
  24. const STATUS_FAILED = 'failed';
  25. /**
  26. * 创建异步任务
  27. */
  28. public function createTask(string $type, array $data): string
  29. {
  30. $taskId = $this->generateTaskId($type, $data);
  31. $taskData = [
  32. 'task_id' => $taskId,
  33. 'type' => $type,
  34. 'status' => self::STATUS_PROCESSING,
  35. 'progress' => 0,
  36. 'message' => '任务已创建,正在处理...',
  37. 'data' => $data,
  38. 'created_at' => now()->toISOString(),
  39. 'updated_at' => now()->toISOString(),
  40. 'callback_url' => $data['callback_url'] ?? null,
  41. // 分析任务已异步化,给分析/PDF链路留足回调窗口。
  42. 'expires_at' => now()->addSeconds($type === self::TASK_TYPE_EXAM ? 240 : 600)->toISOString(),
  43. ];
  44. $this->saveTask($taskId, $taskData);
  45. // 保存任务映射(用于快速查找)
  46. $this->saveTaskMapping($type, $data, $taskId);
  47. Log::info('TaskManager: 任务已创建', [
  48. 'task_id' => $taskId,
  49. 'type' => $type,
  50. 'data_keys' => array_keys($data),
  51. ]);
  52. return $taskId;
  53. }
  54. /**
  55. * 获取任务状态
  56. */
  57. public function getTaskStatus(string $taskId): ?array
  58. {
  59. return $this->loadTask($taskId);
  60. }
  61. /**
  62. * 更新任务状态
  63. */
  64. public function updateTaskStatus(string $taskId, array $updates): void
  65. {
  66. $task = $this->loadTask($taskId);
  67. if (!$task) {
  68. Log::warning('TaskManager: 尝试更新不存在的任务', ['task_id' => $taskId]);
  69. return;
  70. }
  71. $updatedTask = array_merge($task, $updates, [
  72. 'updated_at' => now()->toISOString(),
  73. ]);
  74. $this->saveTask($taskId, $updatedTask);
  75. Log::info('TaskManager: 任务状态已更新', [
  76. 'task_id' => $taskId,
  77. 'status' => $updates['status'] ?? 'N/A',
  78. 'progress' => $updates['progress'] ?? 'N/A',
  79. ]);
  80. }
  81. /**
  82. * 更新任务进度
  83. */
  84. public function updateTaskProgress(string $taskId, int $progress, string $message): void
  85. {
  86. $this->updateTaskStatus($taskId, [
  87. 'progress' => $progress,
  88. 'message' => $message,
  89. ]);
  90. }
  91. /**
  92. * 标记任务完成
  93. */
  94. public function markTaskCompleted(string $taskId, array $result): void
  95. {
  96. $this->updateTaskStatus($taskId, array_merge($result, [
  97. 'status' => self::STATUS_COMPLETED,
  98. 'progress' => 100,
  99. 'message' => '任务已完成',
  100. 'completed_at' => now()->toISOString(),
  101. // 【新增】任务完成时延长回调时间(给回调15秒时间)
  102. 'callback_expires_at' => now()->addSeconds(15)->toISOString(),
  103. ]));
  104. }
  105. /**
  106. * 标记任务失败
  107. */
  108. public function markTaskFailed(string $taskId, string $error): void
  109. {
  110. $this->updateTaskStatus($taskId, [
  111. 'status' => self::STATUS_FAILED,
  112. 'progress' => 0,
  113. 'message' => '任务失败: ' . $error,
  114. 'error' => $error,
  115. ]);
  116. Log::error('TaskManager: 任务执行失败', [
  117. 'task_id' => $taskId,
  118. 'error' => $error,
  119. ]);
  120. }
  121. /**
  122. * 发送回调通知
  123. */
  124. public function sendCallback(string $taskId): void
  125. {
  126. $task = $this->loadTask($taskId);
  127. if (!$task || !$task['callback_url']) {
  128. return; // 没有回调URL或任务不存在
  129. }
  130. // 【优化】检查任务是否超时(优先检查callback_expires_at)
  131. $callbackExpiresAt = $task['callback_expires_at'] ?? $task['expires_at'] ?? null;
  132. if ($callbackExpiresAt && now()->gt($callbackExpiresAt)) {
  133. Log::warning('TaskManager: 回调已超时,跳过发送', [
  134. 'task_id' => $taskId,
  135. 'callback_expires_at' => $callbackExpiresAt,
  136. 'current_time' => now()->toISOString(),
  137. 'task_status' => $task['status'],
  138. ]);
  139. return;
  140. }
  141. try {
  142. $payload = $this->buildCallbackPayload($task);
  143. Log::info('TaskManager: 回调请求负载', [
  144. 'task_id' => $taskId,
  145. 'callback_url' => $task['callback_url'],
  146. 'callback_type' => $payload['callback_type'] ?? null,
  147. 'status' => $payload['status'] ?? null,
  148. 'paper_id' => $payload['paper_id'] ?? null,
  149. 'knowledge_id' => $payload['knowledge_id'] ?? null,
  150. 'pdf_url' => $payload['pdf_url'] ?? null,
  151. 'grading_pdf_url' => $payload['grading_pdf_url'] ?? null,
  152. 'pdfs' => $payload['pdfs'] ?? null,
  153. 'payload_keys' => array_keys($payload),
  154. ]);
  155. $response = Http::timeout(30)->post($task['callback_url'], $payload);
  156. if ($response->successful()) {
  157. Log::info('TaskManager: 回调通知发送成功', [
  158. 'task_id' => $taskId,
  159. 'callback_url' => $task['callback_url'],
  160. 'status' => $response->status(),
  161. ]);
  162. } else {
  163. Log::warning('TaskManager: 回调通知发送失败', [
  164. 'task_id' => $taskId,
  165. 'callback_url' => $task['callback_url'],
  166. 'status' => $response->status(),
  167. 'response_body' => $response->body(),
  168. ]);
  169. }
  170. } catch (\Exception $e) {
  171. Log::error('TaskManager: 回调通知异常', [
  172. 'task_id' => $taskId,
  173. 'callback_url' => $task['callback_url'] ?? 'unknown',
  174. 'error' => $e->getMessage(),
  175. ]);
  176. }
  177. }
  178. /**
  179. * 生成任务ID
  180. */
  181. private function generateTaskId(string $type, array $data): string
  182. {
  183. $prefix = match ($type) {
  184. self::TASK_TYPE_EXAM => 'exam_task',
  185. self::TASK_TYPE_ANALYSIS => 'analysis_task',
  186. default => 'task_' . $type,
  187. };
  188. return $prefix . '_' . uniqid() . '_' . substr(md5(serialize($data) . time()), 0, 8);
  189. }
  190. /**
  191. * 保存任务到缓存
  192. */
  193. private function saveTask(string $taskId, array $taskData): void
  194. {
  195. Cache::put($this->getCacheKey($taskId), $taskData, now()->addDay());
  196. }
  197. /**
  198. * 从缓存加载任务
  199. */
  200. private function loadTask(string $taskId): ?array
  201. {
  202. return Cache::get($this->getCacheKey($taskId));
  203. }
  204. /**
  205. * 获取缓存键
  206. */
  207. private function getCacheKey(string $taskId): string
  208. {
  209. return "task:{$taskId}";
  210. }
  211. /**
  212. * 保存任务映射
  213. */
  214. private function saveTaskMapping(string $type, array $data, string $taskId): void
  215. {
  216. if ($type === self::TASK_TYPE_ANALYSIS) {
  217. $paperId = $data['paperId'] ?? $data['paper_id'] ?? null;
  218. $studentId = $data['studentId'] ?? $data['student_id'] ?? null;
  219. if ($paperId) {
  220. $mappingKey = $this->getTaskMappingKey($paperId, $studentId);
  221. Cache::put($mappingKey, $taskId, now()->addDay());
  222. }
  223. }
  224. }
  225. /**
  226. * 根据试卷ID查找分析任务
  227. */
  228. public function findAnalysisTaskByPaperId(string $paperId, ?string $studentId = null): ?array
  229. {
  230. // 尝试从映射缓存中获取任务ID
  231. $mappingKey = $this->getTaskMappingKey($paperId, $studentId);
  232. $taskId = Cache::get($mappingKey);
  233. if ($taskId) {
  234. $task = $this->loadTask($taskId);
  235. if ($task) {
  236. return $task;
  237. } else {
  238. // 任务不存在,清理映射缓存
  239. Cache::forget($mappingKey);
  240. }
  241. }
  242. return null;
  243. }
  244. /**
  245. * 获取任务映射缓存键
  246. */
  247. private function getTaskMappingKey(string $paperId, ?string $studentId = null): string
  248. {
  249. $studentPart = $studentId ? "_{$studentId}" : '';
  250. return "task_mapping:analysis:{$paperId}{$studentPart}";
  251. }
  252. /**
  253. * 构建回调负载
  254. */
  255. private function buildCallbackPayload(array $task): array
  256. {
  257. $basePayload = [
  258. 'task_id' => $task['task_id'],
  259. 'type' => $task['type'],
  260. 'status' => $task['status'],
  261. 'completed_at' => $task['completed_at'] ?? null,
  262. ];
  263. // 根据任务类型添加特定数据
  264. if ($task['type'] === self::TASK_TYPE_EXAM) {
  265. $basePayload['callback_type'] = 'exam_pdf_generated';
  266. $basePayload['paper_id'] = $task['data']['paper_id']
  267. ?? ($task['paper_id'] ?? null)
  268. ?? ($task['data']['knowledge_id'] ?? null)
  269. ?? ($task['knowledge_id'] ?? null);
  270. // 兼容历史调用方(新调用方统一读取 paper_id)
  271. $basePayload['knowledge_id'] = $task['data']['knowledge_id'] ?? ($task['knowledge_id'] ?? null);
  272. $basePayload['pdfs'] = $task['pdfs'] ?? null;
  273. // 兼容旧回调消费方:同时提供顶层 URL 字段,避免只读 pdf_url 导致“回调成功但前端无链接”
  274. $basePayload['pdf_url'] = $task['pdfs']['all_pdf']
  275. ?? $task['pdfs']['exam_paper_pdf']
  276. ?? ($task['pdf_url'] ?? null);
  277. $basePayload['grading_pdf_url'] = $task['pdfs']['grading_pdf']
  278. ?? ($task['grading_pdf_url'] ?? null);
  279. $basePayload['exam_content'] = $task['exam_content'] ?? null;
  280. $basePayload['stats'] = $task['stats'] ?? null;
  281. } elseif ($task['type'] === self::TASK_TYPE_ANALYSIS) {
  282. $basePayload['callback_type'] = 'analysis_report_generated';
  283. $basePayload['paper_id'] = $task['data']['paper_id'] ?? $task['data']['paperId'] ?? null;
  284. $basePayload['student_id'] = $task['data']['student_id'] ?? $task['data']['studentId'] ?? null;
  285. $basePayload['pdf_url'] = $task['pdf_url'] ?? null;
  286. }
  287. return $basePayload;
  288. }
  289. }