generateTaskId($type, $data);

        $taskData = [
            'task_id' => $taskId,
            'type' => $type,
            'status' => self::STATUS_PROCESSING,
            'progress' => 0,
            'message' => '任务已创建,正在处理...',
            'data' => $data,
            'created_at' => now()->toISOString(),
            'updated_at' => now()->toISOString(),
            'callback_url' => $data['callback_url'] ?? null,
            // [Optimization] Timeout depends on the task type: 45s for exam tasks, 30s otherwise.
            'expires_at' => now()->addSeconds($type === self::TASK_TYPE_EXAM ? 45 : 30)->toISOString(),
        ];

        $this->saveTask($taskId, $taskData);

        // Save the task mapping (used for fast lookup).
        $this->saveTaskMapping($type, $data, $taskId);

        Log::info('TaskManager: 任务已创建', [
            'task_id' => $taskId,
            'type' => $type,
            'data_keys' => array_keys($data),
        ]);

        return $taskId;
    }

    /**
     * Get the stored state of a task.
     *
     * @return array|null The task record, or null when no usable record is cached.
     */
    public function getTaskStatus(string $taskId): ?array
    {
        return $this->loadTask($taskId);
    }

    /**
     * Merge $updates into the stored task record and refresh `updated_at`.
     *
     * Logs a warning and does nothing when the task cannot be loaded.
     *
     * @param array $updates Keys to overwrite on the task record.
     */
    public function updateTaskStatus(string $taskId, array $updates): void
    {
        $task = $this->loadTask($taskId);

        if (!$task) {
            Log::warning('TaskManager: 尝试更新不存在的任务', ['task_id' => $taskId]);

            return;
        }

        // `updated_at` is merged last so callers cannot override it.
        $updatedTask = array_merge($task, $updates, [
            'updated_at' => now()->toISOString(),
        ]);

        $this->saveTask($taskId, $updatedTask);

        Log::info('TaskManager: 任务状态已更新', [
            'task_id' => $taskId,
            'status' => $updates['status'] ?? 'N/A',
            'progress' => $updates['progress'] ?? 'N/A',
        ]);
    }

    /**
     * Update the progress value and progress message of a task.
     */
    public function updateTaskProgress(string $taskId, int $progress, string $message): void
    {
        $this->updateTaskStatus($taskId, [
            'progress' => $progress,
            'message' => $message,
        ]);
    }

    /**
     * Mark a task as completed and store its result fields on the task record.
     *
     * The status fields are merged after $result, so they win over any
     * same-named keys in $result.
     *
     * @param array $result Result fields to store alongside the task.
     */
    public function markTaskCompleted(string $taskId, array $result): void
    {
        $this->updateTaskStatus($taskId, array_merge($result, [
            'status' => self::STATUS_COMPLETED,
            'progress' => 100,
            'message' => '任务已完成',
            'completed_at' => now()->toISOString(),
            // [Added] On completion, open a fresh 15-second window for the callback.
            'callback_expires_at' => now()->addSeconds(15)->toISOString(),
        ]));
    }

    /**
     * Mark a task as failed, record the error on the task and log it.
     */
    public function markTaskFailed(string $taskId, string $error): void
    {
        $this->updateTaskStatus($taskId, [
            'status' => self::STATUS_FAILED,
            'progress' => 0,
            'message' => '任务失败: ' . $error,
            'error' => $error,
        ]);

        Log::error('TaskManager: 任务执行失败', [
            'task_id' => $taskId,
            'error' => $error,
        ]);
    }

    /**
     * POST the callback notification for a task to its callback URL.
     *
     * Returns silently when the task is missing or has no callback URL, and
     * skips sending (with a warning) once the callback deadline has passed.
     * HTTP failures and exceptions are logged, never rethrown.
     */
    public function sendCallback(string $taskId): void
    {
        $task = $this->loadTask($taskId);

        // empty() also covers records that lack the key entirely, avoiding an
        // undefined-index warning.
        if (!$task || empty($task['callback_url'])) {
            return; // Task does not exist or has no callback URL.
        }

        // [Optimization] Check the deadline, preferring callback_expires_at over expires_at.
        $callbackExpiresAt = $task['callback_expires_at'] ?? $task['expires_at'] ?? null;

        if ($callbackExpiresAt && now()->gt($callbackExpiresAt)) {
            Log::warning('TaskManager: 回调已超时,跳过发送', [
                'task_id' => $taskId,
                'callback_expires_at' => $callbackExpiresAt,
                'current_time' => now()->toISOString(),
                'task_status' => $task['status'],
            ]);

            return;
        }

        try {
            $payload = $this->buildCallbackPayload($task);

            $response = Http::timeout(30)->post($task['callback_url'], $payload);

            if ($response->successful()) {
                Log::info('TaskManager: 回调通知发送成功', [
                    'task_id' => $taskId,
                    'callback_url' => $task['callback_url'],
                    'status' => $response->status(),
                ]);
            } else {
                Log::warning('TaskManager: 回调通知发送失败', [
                    'task_id' => $taskId,
                    'callback_url' => $task['callback_url'],
                    'status' => $response->status(),
                    'response_body' => $response->body(),
                ]);
            }
        } catch (\Exception $e) {
            // Best effort: a failing callback is logged and not propagated.
            Log::error('TaskManager: 回调通知异常', [
                'task_id' => $taskId,
                'callback_url' => $task['callback_url'] ?? 'unknown',
                'error' => $e->getMessage(),
            ]);
        }
    }

    /**
     * Generate a task ID of the form `<prefix>_<uniqid>_<8 hex chars>`.
     *
     * The suffix comes from random_bytes() rather than a hash of the payload:
     * serialize() throws on payloads that contain unserialisable values, and a
     * hash of payload + time() adds no entropy beyond uniqid() itself.
     *
     * @param array $data Task payload; no longer read, kept so the signature is unchanged.
     */
    private function generateTaskId(string $type, array $data): string
    {
        $prefix = match ($type) {
            self::TASK_TYPE_EXAM => 'exam_task',
            self::TASK_TYPE_ANALYSIS => 'analysis_task',
            default => 'task_' . $type,
        };

        return $prefix . '_' . uniqid() . '_' . bin2hex(random_bytes(4));
    }

    /**
     * Save a task record to the cache with a one-day lifetime.
     */
    private function saveTask(string $taskId, array $taskData): void
    {
        Cache::put($this->getCacheKey($taskId), $taskData, now()->addDay());
    }

    /**
     * Load a task record from the cache.
     *
     * @return array|null The record, or null when the key is missing or holds a non-array value.
     */
    private function loadTask(string $taskId): ?array
    {
        $task = Cache::get($this->getCacheKey($taskId));

        // A non-array entry would violate the ?array return type; treat it as missing.
        return is_array($task) ? $task : null;
    }

    /**
     * Build the cache key for a task record.
     */
    private function getCacheKey(string $taskId): string
    {
        return "task:{$taskId}";
    }

    /**
     * Save the (paper, student) -> task ID mapping for analysis tasks.
     *
     * Other task types are ignored, as are analysis tasks without a usable
     * paper ID. Both camelCase and snake_case payload keys are accepted.
     */
    private function saveTaskMapping(string $type, array $data, string $taskId): void
    {
        if ($type !== self::TASK_TYPE_ANALYSIS) {
            return;
        }

        // IDs are normalised to strings because payload values may be ints,
        // while the key builder takes strings.
        $paperId = $this->normalizeMappingId($data['paperId'] ?? $data['paper_id'] ?? null);
        $studentId = $this->normalizeMappingId($data['studentId'] ?? $data['student_id'] ?? null);

        if ($paperId === null) {
            return;
        }

        Cache::put($this->getTaskMappingKey($paperId, $studentId), $taskId, now()->addDay());
    }

    /**
     * Normalise a raw payload ID to a non-empty string.
     *
     * @return string|null The ID as a string; null for null, '', and non-string/non-numeric values.
     */
    private function normalizeMappingId(mixed $value): ?string
    {
        if (is_string($value)) {
            return $value === '' ? null : $value;
        }

        if (is_int($value) || is_float($value)) {
            return (string) $value;
        }

        return null;
    }

    /**
     * Find the analysis task mapped to a paper ID (and optional student ID).
     *
     * A mapping that no longer resolves to a task record is removed.
     *
     * @return array|null The task record, or null when no live task is mapped.
     */
    public function findAnalysisTaskByPaperId(string $paperId, ?string $studentId = null): ?array
    {
        // Try to resolve the task ID through the mapping cache.
        $mappingKey = $this->getTaskMappingKey($paperId, $studentId);
        $taskId = Cache::get($mappingKey);

        if (!$taskId) {
            return null;
        }

        $task = is_string($taskId) ? $this->loadTask($taskId) : null;

        if ($task) {
            return $task;
        }

        // The task no longer exists (or the entry is unusable): clear the mapping.
        Cache::forget($mappingKey);

        return null;
    }

    /**
     * Build the cache key of the analysis-task mapping.
     *
     * Only a null or empty student ID omits the student part, so the ID "0"
     * gets its own key instead of colliding with the paper-level one.
     */
    private function getTaskMappingKey(string $paperId, ?string $studentId = null): string
    {
        $studentPart = ($studentId !== null && $studentId !== '') ? "_{$studentId}" : '';

        return "task_mapping:analysis:{$paperId}{$studentPart}";
    }

    /**
     * Build the callback payload for a task.
     *
     * Every payload carries task_id, type, status and completed_at; exam and
     * analysis tasks add their type-specific fields. Other types get only the
     * base fields.
     */
    private function buildCallbackPayload(array $task): array
    {
        $basePayload = [
            'task_id' => $task['task_id'],
            'type' => $task['type'],
            'status' => $task['status'],
            'completed_at' => $task['completed_at'] ?? null,
        ];

        // Add the fields specific to the task type.
        if ($task['type'] === self::TASK_TYPE_EXAM) {
            $basePayload['callback_type'] = 'exam_pdf_generated';
            $basePayload['paper_id'] = $task['data']['paper_id'] ?? null;
            $basePayload['pdfs'] = $task['pdfs'] ?? null;
            $basePayload['exam_content'] = $task['exam_content'] ?? null;
        } elseif ($task['type'] === self::TASK_TYPE_ANALYSIS) {
            $basePayload['callback_type'] = 'analysis_report_generated';
            $basePayload['paper_id'] = $task['data']['paper_id'] ?? $task['data']['paperId'] ?? null;
            $basePayload['student_id'] = $task['data']['student_id'] ?? $task['data']['studentId'] ?? null;
            $basePayload['pdf_url'] = $task['pdf_url'] ?? null;
        }

        return $basePayload;
    }
}