TaskManager.php 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. <?php
  2. namespace App\Services;
  3. use Illuminate\Http\Client\Response;
  4. use Illuminate\Support\Facades\Cache;
  5. use Illuminate\Support\Facades\Http;
  6. use Illuminate\Support\Facades\Log;
  /**
   * Unified asynchronous task manager.
   *
   * Creates tasks, keeps their state in the cache, updates status and progress,
   * and posts a callback notification to the task's callback URL.
   *
   * Task records are plain arrays stored under "task:{id}". Updates are a
   * read-modify-write on that cache entry and are not atomic.
   */
  class TaskManager
  {
      /** Task type: exam paper generation. */
      public const TASK_TYPE_EXAM = 'exam';

      /** Task type: learning analysis. */
      public const TASK_TYPE_ANALYSIS = 'analysis';

      /** Task status values. */
      public const STATUS_PENDING = 'pending';
      public const STATUS_PROCESSING = 'processing';
      public const STATUS_COMPLETED = 'completed';
      public const STATUS_FAILED = 'failed';

      /** Seconds after creation before an exam task's "expires_at" is reached. */
      private const EXAM_TIMEOUT_SECONDS = 45;

      /** Seconds after creation before any other task's "expires_at" is reached. */
      private const DEFAULT_TIMEOUT_SECONDS = 30;

      /** Seconds after completion during which the callback may still be sent. */
      private const CALLBACK_GRACE_SECONDS = 15;

      /** HTTP timeout, in seconds, for the callback request. */
      private const CALLBACK_HTTP_TIMEOUT_SECONDS = 30;

      /**
       * Create an asynchronous task and persist its initial state.
       *
       * The task starts in STATUS_PROCESSING with progress 0. For analysis tasks
       * a paper/student -> task id mapping is stored as well.
       *
       * @param string $type One of the TASK_TYPE_* constants (other values are accepted).
       * @param array  $data Task payload; "callback_url" is read from it if present.
       *
       * @return string The generated task id.
       */
      public function createTask(string $type, array $data): string
      {
          $taskId = $this->generateTaskId($type, $data);

          // Take the clock once so created_at, updated_at and expires_at all
          // describe the same instant.
          $now = now();
          $timestamp = $now->toISOString();
          $timeoutSeconds = $type === self::TASK_TYPE_EXAM
              ? self::EXAM_TIMEOUT_SECONDS
              : self::DEFAULT_TIMEOUT_SECONDS;

          $taskData = [
              'task_id' => $taskId,
              'type' => $type,
              'status' => self::STATUS_PROCESSING,
              'progress' => 0,
              'message' => '任务已创建,正在处理...',
              'data' => $data,
              'created_at' => $timestamp,
              'updated_at' => $timestamp,
              'callback_url' => $data['callback_url'] ?? null,
              // The timeout depends on the task type.
              'expires_at' => $now->copy()->addSeconds($timeoutSeconds)->toISOString(),
          ];

          $this->saveTask($taskId, $taskData);

          // Store the lookup mapping so the task can be found by paper id.
          $this->saveTaskMapping($type, $data, $taskId);

          Log::info('TaskManager: 任务已创建', [
              'task_id' => $taskId,
              'type' => $type,
              'data_keys' => array_keys($data),
          ]);

          return $taskId;
      }

      /**
       * Return the stored task record, or null when it is not in the cache.
       */
      public function getTaskStatus(string $taskId): ?array
      {
          return $this->loadTask($taskId);
      }

      /**
       * Merge $updates into the stored task record and refresh "updated_at".
       *
       * Logs a warning and does nothing when the task does not exist.
       *
       * @param array $updates Keys to overwrite on the task record.
       */
      public function updateTaskStatus(string $taskId, array $updates): void
      {
          $task = $this->loadTask($taskId);
          if (!$task) {
              Log::warning('TaskManager: 尝试更新不存在的任务', ['task_id' => $taskId]);

              return;
          }

          // "updated_at" is merged last so callers cannot override it.
          $updatedTask = array_merge($task, $updates, [
              'updated_at' => now()->toISOString(),
          ]);

          $this->saveTask($taskId, $updatedTask);

          Log::info('TaskManager: 任务状态已更新', [
              'task_id' => $taskId,
              'status' => $updates['status'] ?? 'N/A',
              'progress' => $updates['progress'] ?? 'N/A',
          ]);
      }

      /**
       * Update only the progress value and message of a task.
       */
      public function updateTaskProgress(string $taskId, int $progress, string $message): void
      {
          $this->updateTaskStatus($taskId, [
              'progress' => $progress,
              'message' => $message,
          ]);
      }

      /**
       * Mark a task as completed and merge its result into the task record.
       *
       * The status fields are merged after $result, so they win over any
       * same-named keys in $result.
       *
       * @param array $result Result fields to store on the task record.
       */
      public function markTaskCompleted(string $taskId, array $result): void
      {
          $now = now();

          $this->updateTaskStatus($taskId, array_merge($result, [
              'status' => self::STATUS_COMPLETED,
              'progress' => 100,
              'message' => '任务已完成',
              'completed_at' => $now->toISOString(),
              // Give the callback its own window, measured from completion.
              'callback_expires_at' => $now->copy()
                  ->addSeconds(self::CALLBACK_GRACE_SECONDS)
                  ->toISOString(),
          ]));
      }

      /**
       * Mark a task as failed, store the error text and log it.
       */
      public function markTaskFailed(string $taskId, string $error): void
      {
          $this->updateTaskStatus($taskId, [
              'status' => self::STATUS_FAILED,
              'progress' => 0,
              'message' => '任务失败: ' . $error,
              'error' => $error,
          ]);

          Log::error('TaskManager: 任务执行失败', [
              'task_id' => $taskId,
              'error' => $error,
          ]);
      }

      /**
       * POST the callback payload to the task's callback URL.
       *
       * Does nothing when the task is missing or has no callback URL. The send
       * is skipped, with a warning, once "callback_expires_at" (or "expires_at"
       * when that is absent) has passed. HTTP failures and exceptions are
       * logged and never propagated.
       */
      public function sendCallback(string $taskId): void
      {
          $task = $this->loadTask($taskId);

          // empty() also covers a record without the key, which a direct read
          // would turn into an "undefined array key" warning.
          if (!$task || empty($task['callback_url'])) {
              return;
          }

          // NOTE(review): callback_url comes from the task payload. If that is
          // caller-supplied, confirm it is validated or allow-listed upstream.
          $callbackUrl = $task['callback_url'];

          // Prefer the callback-specific deadline over the task deadline.
          $callbackExpiresAt = $task['callback_expires_at'] ?? $task['expires_at'] ?? null;
          if ($callbackExpiresAt && now()->gt($callbackExpiresAt)) {
              Log::warning('TaskManager: 回调已超时,跳过发送', [
                  'task_id' => $taskId,
                  'callback_expires_at' => $callbackExpiresAt,
                  'current_time' => now()->toISOString(),
                  'task_status' => $task['status'] ?? null,
              ]);

              return;
          }

          try {
              $payload = $this->buildCallbackPayload($task);
              $response = Http::timeout(self::CALLBACK_HTTP_TIMEOUT_SECONDS)->post($callbackUrl, $payload);

              if ($response->successful()) {
                  Log::info('TaskManager: 回调通知发送成功', [
                      'task_id' => $taskId,
                      'callback_url' => $callbackUrl,
                      'status' => $response->status(),
                  ]);

                  return;
              }

              Log::warning('TaskManager: 回调通知发送失败', [
                  'task_id' => $taskId,
                  'callback_url' => $callbackUrl,
                  'status' => $response->status(),
                  'response_body' => $response->body(),
              ]);
          } catch (\Exception $e) {
              // Best effort: a failing callback must not break the caller.
              Log::error('TaskManager: 回调通知异常', [
                  'task_id' => $taskId,
                  'callback_url' => $callbackUrl,
                  'error' => $e->getMessage(),
              ]);
          }
      }

      /**
       * Generate a task id of the form "{prefix}_{uniqid}_{8 hex chars}".
       *
       * The suffix comes from random_bytes() rather than from a hash of the
       * serialized payload, so the payload is not serialized just to build an
       * id and the suffix is not derivable from the payload and clock.
       *
       * @param array $data Kept for call compatibility; not used for the id.
       */
      private function generateTaskId(string $type, array $data): string
      {
          $prefix = match ($type) {
              self::TASK_TYPE_EXAM => 'exam_task',
              self::TASK_TYPE_ANALYSIS => 'analysis_task',
              default => 'task_' . $type,
          };

          return $prefix . '_' . uniqid() . '_' . bin2hex(random_bytes(4));
      }

      /**
       * Store the task record in the cache for one day.
       */
      private function saveTask(string $taskId, array $taskData): void
      {
          Cache::put($this->getCacheKey($taskId), $taskData, now()->addDay());
      }

      /**
       * Load the task record from the cache.
       *
       * Returns null when the key is absent or holds something other than an
       * array, so the declared return type always holds.
       */
      private function loadTask(string $taskId): ?array
      {
          $task = Cache::get($this->getCacheKey($taskId));

          return is_array($task) ? $task : null;
      }

      /**
       * Build the cache key for a task record.
       */
      private function getCacheKey(string $taskId): string
      {
          return "task:{$taskId}";
      }

      /**
       * Store the paper/student -> task id mapping for analysis tasks.
       *
       * Reads the ids from "paperId"/"paper_id" and "studentId"/"student_id".
       * Nothing is stored for other task types or when no usable paper id is
       * present.
       */
      private function saveTaskMapping(string $type, array $data, string $taskId): void
      {
          if ($type !== self::TASK_TYPE_ANALYSIS) {
              return;
          }

          $paperId = $this->normalizeId($data['paperId'] ?? $data['paper_id'] ?? null);
          if ($paperId === null) {
              return;
          }

          $studentId = $this->normalizeId($data['studentId'] ?? $data['student_id'] ?? null);

          Cache::put($this->getTaskMappingKey($paperId, $studentId), $taskId, now()->addDay());
      }

      /**
       * Look up the analysis task mapped to a paper (and optionally a student).
       *
       * A mapping that points at a task no longer in the cache is removed.
       *
       * @return array|null The task record, or null when none is found.
       */
      public function findAnalysisTaskByPaperId(string $paperId, ?string $studentId = null): ?array
      {
          $mappingKey = $this->getTaskMappingKey($paperId, $studentId);
          $taskId = Cache::get($mappingKey);

          if (!is_string($taskId) || $taskId === '') {
              return null;
          }

          $task = $this->loadTask($taskId);
          if (!$task) {
              // The task is gone; drop the stale mapping.
              Cache::forget($mappingKey);

              return null;
          }

          return $task;
      }

      /**
       * Build the cache key for a paper/student -> task id mapping.
       *
       * Only null and '' count as "no student". A truthiness test would also
       * drop the id '0' and make it collide with the paper-only key.
       */
      private function getTaskMappingKey(string $paperId, ?string $studentId = null): string
      {
          $studentPart = ($studentId !== null && $studentId !== '') ? "_{$studentId}" : '';

          return "task_mapping:analysis:{$paperId}{$studentPart}";
      }

      /**
       * Convert a payload id to a non-empty string, or null when it is unusable.
       *
       * Integers, floats and strings are accepted; 0 and '0' are valid ids.
       */
      private function normalizeId(mixed $value): ?string
      {
          if (!is_int($value) && !is_float($value) && !is_string($value)) {
              return null;
          }

          $id = (string) $value;

          return $id === '' ? null : $id;
      }

      /**
       * Build the callback request body for a task.
       *
       * Ids are read from the task payload under either their snake_case or
       * camelCase key, for both task types.
       *
       * @param array $task Task record; must contain "task_id", "type" and "status".
       *
       * @return array Payload with the common fields plus type-specific ones.
       */
      private function buildCallbackPayload(array $task): array
      {
          $data = $task['data'] ?? [];

          $payload = [
              'task_id' => $task['task_id'],
              'type' => $task['type'],
              'status' => $task['status'],
              'completed_at' => $task['completed_at'] ?? null,
          ];

          // Add the fields specific to the task type.
          if ($task['type'] === self::TASK_TYPE_EXAM) {
              $payload['callback_type'] = 'exam_pdf_generated';
              $payload['paper_id'] = $data['paper_id'] ?? $data['paperId'] ?? null;
              $payload['pdfs'] = $task['pdfs'] ?? null;
              $payload['exam_content'] = $task['exam_content'] ?? null;
          } elseif ($task['type'] === self::TASK_TYPE_ANALYSIS) {
              $payload['callback_type'] = 'analysis_report_generated';
              $payload['paper_id'] = $data['paper_id'] ?? $data['paperId'] ?? null;
              $payload['student_id'] = $data['student_id'] ?? $data['studentId'] ?? null;
              $payload['pdf_url'] = $task['pdf_url'] ?? null;
          }

          return $payload;
      }
  }