| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235 |
- <?php
- namespace App\Services;
- use Illuminate\Http\Client\Response;
- use Illuminate\Support\Facades\Cache;
- use Illuminate\Support\Facades\Http;
- use Illuminate\Support\Facades\Log;
- /**
- * 统一异步任务管理器
- * 负责所有异步任务的创建、状态管理、进度更新和回调通知
- */
- class TaskManager
- {
- /**
- * 任务类型常量
- */
- const TASK_TYPE_EXAM = 'exam'; // 智能出卷任务
- const TASK_TYPE_ANALYSIS = 'analysis'; // 学情分析任务
- /**
- * 任务状态常量
- */
- const STATUS_PENDING = 'pending';
- const STATUS_PROCESSING = 'processing';
- const STATUS_COMPLETED = 'completed';
- const STATUS_FAILED = 'failed';
- /**
- * 创建异步任务
- */
- public function createTask(string $type, array $data): string
- {
- $taskId = $this->generateTaskId($type, $data);
- $taskData = [
- 'task_id' => $taskId,
- 'type' => $type,
- 'status' => self::STATUS_PROCESSING,
- 'progress' => 0,
- 'message' => '任务已创建,正在处理...',
- 'data' => $data,
- 'created_at' => now()->toISOString(),
- 'updated_at' => now()->toISOString(),
- 'callback_url' => $data['callback_url'] ?? null,
- ];
- $this->saveTask($taskId, $taskData);
- Log::info('TaskManager: 任务已创建', [
- 'task_id' => $taskId,
- 'type' => $type,
- 'data_keys' => array_keys($data),
- ]);
- return $taskId;
- }
- /**
- * 获取任务状态
- */
- public function getTaskStatus(string $taskId): ?array
- {
- return $this->loadTask($taskId);
- }
- /**
- * 更新任务状态
- */
- public function updateTaskStatus(string $taskId, array $updates): void
- {
- $task = $this->loadTask($taskId);
- if (!$task) {
- Log::warning('TaskManager: 尝试更新不存在的任务', ['task_id' => $taskId]);
- return;
- }
- $updatedTask = array_merge($task, $updates, [
- 'updated_at' => now()->toISOString(),
- ]);
- $this->saveTask($taskId, $updatedTask);
- Log::info('TaskManager: 任务状态已更新', [
- 'task_id' => $taskId,
- 'status' => $updates['status'] ?? 'N/A',
- 'progress' => $updates['progress'] ?? 'N/A',
- ]);
- }
- /**
- * 更新任务进度
- */
- public function updateTaskProgress(string $taskId, int $progress, string $message): void
- {
- $this->updateTaskStatus($taskId, [
- 'progress' => $progress,
- 'message' => $message,
- ]);
- }
- /**
- * 标记任务完成
- */
- public function markTaskCompleted(string $taskId, array $result): void
- {
- $this->updateTaskStatus($taskId, array_merge($result, [
- 'status' => self::STATUS_COMPLETED,
- 'progress' => 100,
- 'message' => '任务已完成',
- 'completed_at' => now()->toISOString(),
- ]));
- }
- /**
- * 标记任务失败
- */
- public function markTaskFailed(string $taskId, string $error): void
- {
- $this->updateTaskStatus($taskId, [
- 'status' => self::STATUS_FAILED,
- 'progress' => 0,
- 'message' => '任务失败: ' . $error,
- 'error' => $error,
- ]);
- Log::error('TaskManager: 任务执行失败', [
- 'task_id' => $taskId,
- 'error' => $error,
- ]);
- }
- /**
- * 发送回调通知
- */
- public function sendCallback(string $taskId): void
- {
- $task = $this->loadTask($taskId);
- if (!$task || !$task['callback_url']) {
- return; // 没有回调URL或任务不存在
- }
- try {
- $payload = $this->buildCallbackPayload($task);
- $response = Http::timeout(30)->post($task['callback_url'], $payload);
- if ($response->successful()) {
- Log::info('TaskManager: 回调通知发送成功', [
- 'task_id' => $taskId,
- 'callback_url' => $task['callback_url'],
- ]);
- } else {
- Log::warning('TaskManager: 回调通知发送失败', [
- 'task_id' => $taskId,
- 'callback_url' => $task['callback_url'],
- 'status' => $response->status(),
- ]);
- }
- } catch (\Exception $e) {
- Log::error('TaskManager: 回调通知异常', [
- 'task_id' => $taskId,
- 'callback_url' => $task['callback_url'] ?? 'unknown',
- 'error' => $e->getMessage(),
- ]);
- }
- }
- /**
- * 生成任务ID
- */
- private function generateTaskId(string $type, array $data): string
- {
- $prefix = match ($type) {
- self::TASK_TYPE_EXAM => 'exam_task',
- self::TASK_TYPE_ANALYSIS => 'analysis_task',
- default => 'task_' . $type,
- };
- return $prefix . '_' . uniqid() . '_' . substr(md5(serialize($data) . time()), 0, 8);
- }
- /**
- * 保存任务到缓存
- */
- private function saveTask(string $taskId, array $taskData): void
- {
- Cache::put($this->getCacheKey($taskId), $taskData, now()->addDay());
- }
- /**
- * 从缓存加载任务
- */
- private function loadTask(string $taskId): ?array
- {
- return Cache::get($this->getCacheKey($taskId));
- }
- /**
- * 获取缓存键
- */
- private function getCacheKey(string $taskId): string
- {
- return "task:{$taskId}";
- }
- /**
- * 构建回调负载
- */
- private function buildCallbackPayload(array $task): array
- {
- $basePayload = [
- 'task_id' => $task['task_id'],
- 'type' => $task['type'],
- 'status' => $task['status'],
- 'completed_at' => $task['completed_at'] ?? null,
- ];
- // 根据任务类型添加特定数据
- if ($task['type'] === self::TASK_TYPE_EXAM) {
- $basePayload['callback_type'] = 'exam_pdf_generated';
- $basePayload['paper_id'] = $task['data']['paper_id'] ?? null;
- $basePayload['pdfs'] = $task['pdfs'] ?? null;
- $basePayload['exam_content'] = $task['exam_content'] ?? null;
- } elseif ($task['type'] === self::TASK_TYPE_ANALYSIS) {
- $basePayload['callback_type'] = 'analysis_report_generated';
- $basePayload['paper_id'] = $task['data']['paper_id'] ?? $task['data']['paperId'] ?? null;
- $basePayload['student_id'] = $task['data']['student_id'] ?? $task['data']['studentId'] ?? null;
- $basePayload['pdf_url'] = $task['pdf_url'] ?? null;
- }
- return $basePayload;
- }
- }
|