TaskManager.php 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. <?php
  2. namespace App\Services;
  3. use Illuminate\Http\Client\Response;
  4. use Illuminate\Support\Facades\Cache;
  5. use Illuminate\Support\Facades\Http;
  6. use Illuminate\Support\Facades\Log;
  7. /**
  8. * 统一异步任务管理器
  9. * 负责所有异步任务的创建、状态管理、进度更新和回调通知
  10. */
  11. class TaskManager
  12. {
  13. /**
  14. * 任务类型常量
  15. */
  16. const TASK_TYPE_EXAM = 'exam'; // 智能出卷任务
  17. const TASK_TYPE_ANALYSIS = 'analysis'; // 学情分析任务
  18. /**
  19. * 任务状态常量
  20. */
  21. const STATUS_PENDING = 'pending';
  22. const STATUS_PROCESSING = 'processing';
  23. const STATUS_COMPLETED = 'completed';
  24. const STATUS_FAILED = 'failed';
  25. /**
  26. * 创建异步任务
  27. */
  28. public function createTask(string $type, array $data): string
  29. {
  30. $taskId = $this->generateTaskId($type, $data);
  31. $taskData = [
  32. 'task_id' => $taskId,
  33. 'type' => $type,
  34. 'status' => self::STATUS_PROCESSING,
  35. 'progress' => 0,
  36. 'message' => '任务已创建,正在处理...',
  37. 'data' => $data,
  38. 'created_at' => now()->toISOString(),
  39. 'updated_at' => now()->toISOString(),
  40. 'callback_url' => $data['callback_url'] ?? null,
  41. ];
  42. $this->saveTask($taskId, $taskData);
  43. Log::info('TaskManager: 任务已创建', [
  44. 'task_id' => $taskId,
  45. 'type' => $type,
  46. 'data_keys' => array_keys($data),
  47. ]);
  48. return $taskId;
  49. }
  50. /**
  51. * 获取任务状态
  52. */
  53. public function getTaskStatus(string $taskId): ?array
  54. {
  55. return $this->loadTask($taskId);
  56. }
  57. /**
  58. * 更新任务状态
  59. */
  60. public function updateTaskStatus(string $taskId, array $updates): void
  61. {
  62. $task = $this->loadTask($taskId);
  63. if (!$task) {
  64. Log::warning('TaskManager: 尝试更新不存在的任务', ['task_id' => $taskId]);
  65. return;
  66. }
  67. $updatedTask = array_merge($task, $updates, [
  68. 'updated_at' => now()->toISOString(),
  69. ]);
  70. $this->saveTask($taskId, $updatedTask);
  71. Log::info('TaskManager: 任务状态已更新', [
  72. 'task_id' => $taskId,
  73. 'status' => $updates['status'] ?? 'N/A',
  74. 'progress' => $updates['progress'] ?? 'N/A',
  75. ]);
  76. }
  77. /**
  78. * 更新任务进度
  79. */
  80. public function updateTaskProgress(string $taskId, int $progress, string $message): void
  81. {
  82. $this->updateTaskStatus($taskId, [
  83. 'progress' => $progress,
  84. 'message' => $message,
  85. ]);
  86. }
  87. /**
  88. * 标记任务完成
  89. */
  90. public function markTaskCompleted(string $taskId, array $result): void
  91. {
  92. $this->updateTaskStatus($taskId, array_merge($result, [
  93. 'status' => self::STATUS_COMPLETED,
  94. 'progress' => 100,
  95. 'message' => '任务已完成',
  96. 'completed_at' => now()->toISOString(),
  97. ]));
  98. }
  99. /**
  100. * 标记任务失败
  101. */
  102. public function markTaskFailed(string $taskId, string $error): void
  103. {
  104. $this->updateTaskStatus($taskId, [
  105. 'status' => self::STATUS_FAILED,
  106. 'progress' => 0,
  107. 'message' => '任务失败: ' . $error,
  108. 'error' => $error,
  109. ]);
  110. Log::error('TaskManager: 任务执行失败', [
  111. 'task_id' => $taskId,
  112. 'error' => $error,
  113. ]);
  114. }
  115. /**
  116. * 发送回调通知
  117. */
  118. public function sendCallback(string $taskId): void
  119. {
  120. $task = $this->loadTask($taskId);
  121. if (!$task || !$task['callback_url']) {
  122. return; // 没有回调URL或任务不存在
  123. }
  124. try {
  125. $payload = $this->buildCallbackPayload($task);
  126. $response = Http::timeout(30)->post($task['callback_url'], $payload);
  127. if ($response->successful()) {
  128. Log::info('TaskManager: 回调通知发送成功', [
  129. 'task_id' => $taskId,
  130. 'callback_url' => $task['callback_url'],
  131. ]);
  132. } else {
  133. Log::warning('TaskManager: 回调通知发送失败', [
  134. 'task_id' => $taskId,
  135. 'callback_url' => $task['callback_url'],
  136. 'status' => $response->status(),
  137. ]);
  138. }
  139. } catch (\Exception $e) {
  140. Log::error('TaskManager: 回调通知异常', [
  141. 'task_id' => $taskId,
  142. 'callback_url' => $task['callback_url'] ?? 'unknown',
  143. 'error' => $e->getMessage(),
  144. ]);
  145. }
  146. }
  147. /**
  148. * 生成任务ID
  149. */
  150. private function generateTaskId(string $type, array $data): string
  151. {
  152. $prefix = match ($type) {
  153. self::TASK_TYPE_EXAM => 'exam_task',
  154. self::TASK_TYPE_ANALYSIS => 'analysis_task',
  155. default => 'task_' . $type,
  156. };
  157. return $prefix . '_' . uniqid() . '_' . substr(md5(serialize($data) . time()), 0, 8);
  158. }
  159. /**
  160. * 保存任务到缓存
  161. */
  162. private function saveTask(string $taskId, array $taskData): void
  163. {
  164. Cache::put($this->getCacheKey($taskId), $taskData, now()->addDay());
  165. }
  166. /**
  167. * 从缓存加载任务
  168. */
  169. private function loadTask(string $taskId): ?array
  170. {
  171. return Cache::get($this->getCacheKey($taskId));
  172. }
  173. /**
  174. * 获取缓存键
  175. */
  176. private function getCacheKey(string $taskId): string
  177. {
  178. return "task:{$taskId}";
  179. }
  180. /**
  181. * 构建回调负载
  182. */
  183. private function buildCallbackPayload(array $task): array
  184. {
  185. $basePayload = [
  186. 'task_id' => $task['task_id'],
  187. 'type' => $task['type'],
  188. 'status' => $task['status'],
  189. 'completed_at' => $task['completed_at'] ?? null,
  190. ];
  191. // 根据任务类型添加特定数据
  192. if ($task['type'] === self::TASK_TYPE_EXAM) {
  193. $basePayload['callback_type'] = 'exam_pdf_generated';
  194. $basePayload['paper_id'] = $task['data']['paper_id'] ?? null;
  195. $basePayload['pdfs'] = $task['pdfs'] ?? null;
  196. $basePayload['exam_content'] = $task['exam_content'] ?? null;
  197. } elseif ($task['type'] === self::TASK_TYPE_ANALYSIS) {
  198. $basePayload['callback_type'] = 'analysis_report_generated';
  199. $basePayload['paper_id'] = $task['data']['paper_id'] ?? $task['data']['paperId'] ?? null;
  200. $basePayload['student_id'] = $task['data']['student_id'] ?? $task['data']['studentId'] ?? null;
  201. $basePayload['pdf_url'] = $task['pdf_url'] ?? null;
  202. }
  203. return $basePayload;
  204. }
  205. }