| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303 |
- <?php
- namespace App\Services;
- use Illuminate\Http\Client\Response;
- use Illuminate\Support\Facades\Cache;
- use Illuminate\Support\Facades\Http;
- use Illuminate\Support\Facades\Log;
/**
 * Unified asynchronous task manager.
 *
 * Creates asynchronous tasks, keeps their status and progress in the cache,
 * and POSTs a callback notification to the URL supplied with the task data.
 */
class TaskManager
{
    /** Task type: intelligent exam-paper generation. */
    public const TASK_TYPE_EXAM = 'exam';

    /** Task type: learning analysis. */
    public const TASK_TYPE_ANALYSIS = 'analysis';

    /** Task status values. STATUS_PENDING is declared but never assigned by this class. */
    public const STATUS_PENDING = 'pending';
    public const STATUS_PROCESSING = 'processing';
    public const STATUS_COMPLETED = 'completed';
    public const STATUS_FAILED = 'failed';

    /** Seconds after creation at which an exam task's `expires_at` is set. */
    private const EXAM_EXPIRY_SECONDS = 45;

    /** Seconds after creation at which any other task's `expires_at` is set. */
    private const DEFAULT_EXPIRY_SECONDS = 30;

    /** Seconds after a task finishes during which its callback may still be sent. */
    private const CALLBACK_GRACE_SECONDS = 15;

    /** HTTP timeout, in seconds, for the callback POST request. */
    private const CALLBACK_HTTP_TIMEOUT_SECONDS = 30;

    /**
     * Create an asynchronous task and store it in the cache.
     *
     * The task is stored with status "processing" and progress 0. For analysis
     * tasks a paper/student -> task id mapping is stored as well, so the task
     * can later be found through findAnalysisTaskByPaperId().
     *
     * @param string $type One of the TASK_TYPE_* constants (other values are accepted as-is).
     * @param array  $data Task input; `callback_url` is read from it when present.
     *
     * @return string The generated task id.
     */
    public function createTask(string $type, array $data): string
    {
        $taskId = $this->generateTaskId($type);
        $createdAt = now()->toISOString();
        $expirySeconds = $type === self::TASK_TYPE_EXAM
            ? self::EXAM_EXPIRY_SECONDS
            : self::DEFAULT_EXPIRY_SECONDS;

        $this->saveTask($taskId, [
            'task_id' => $taskId,
            'type' => $type,
            'status' => self::STATUS_PROCESSING,
            'progress' => 0,
            'message' => '任务已创建,正在处理...',
            'data' => $data,
            'created_at' => $createdAt,
            'updated_at' => $createdAt,
            'callback_url' => $data['callback_url'] ?? null,
            // The expiry window depends on the task type.
            'expires_at' => now()->addSeconds($expirySeconds)->toISOString(),
        ]);

        // Store the lookup mapping so the task can be found by paper id.
        $this->saveTaskMapping($type, $data, $taskId);

        Log::info('TaskManager: 任务已创建', [
            'task_id' => $taskId,
            'type' => $type,
            'data_keys' => array_keys($data),
        ]);

        return $taskId;
    }

    /**
     * Return the stored task record, or null when no such task is cached.
     */
    public function getTaskStatus(string $taskId): ?array
    {
        return $this->loadTask($taskId);
    }

    /**
     * Merge the given fields into a stored task and refresh its `updated_at`.
     *
     * A warning is logged and nothing is written when the task does not exist.
     * Keys in $updates overwrite the stored fields of the same name.
     * The load/merge/save sequence is not atomic.
     */
    public function updateTaskStatus(string $taskId, array $updates): void
    {
        $task = $this->loadTask($taskId);
        if (!$task) {
            Log::warning('TaskManager: 尝试更新不存在的任务', ['task_id' => $taskId]);

            return;
        }

        $this->saveTask($taskId, array_merge($task, $updates, [
            'updated_at' => now()->toISOString(),
        ]));

        Log::info('TaskManager: 任务状态已更新', [
            'task_id' => $taskId,
            'status' => $updates['status'] ?? 'N/A',
            'progress' => $updates['progress'] ?? 'N/A',
        ]);
    }

    /**
     * Update only the progress value and the progress message of a task.
     */
    public function updateTaskProgress(string $taskId, int $progress, string $message): void
    {
        $this->updateTaskStatus($taskId, [
            'progress' => $progress,
            'message' => $message,
        ]);
    }

    /**
     * Mark a task as completed and open its callback window.
     *
     * Every key of $result is merged into the stored task record (the callback
     * payload later reads `pdfs`, `exam_content` and `pdf_url` from there).
     * The status, progress, message and timestamp fields set here take
     * precedence over same-named keys in $result.
     */
    public function markTaskCompleted(string $taskId, array $result): void
    {
        $this->updateTaskStatus($taskId, array_merge($result, [
            'status' => self::STATUS_COMPLETED,
            'progress' => 100,
            'message' => '任务已完成',
            'completed_at' => now()->toISOString(),
            // Give the callback its own window, counted from completion time.
            'callback_expires_at' => now()->addSeconds(self::CALLBACK_GRACE_SECONDS)->toISOString(),
        ]));
    }

    /**
     * Mark a task as failed, record the error and open its callback window.
     */
    public function markTaskFailed(string $taskId, string $error): void
    {
        $this->updateTaskStatus($taskId, [
            'status' => self::STATUS_FAILED,
            'progress' => 0,
            'message' => '任务失败: ' . $error,
            'error' => $error,
            // Same window as for completed tasks. Without it sendCallback() falls
            // back to `expires_at` and drops the failure notification of any task
            // that failed after its creation-time expiry.
            'callback_expires_at' => now()->addSeconds(self::CALLBACK_GRACE_SECONDS)->toISOString(),
        ]);

        Log::error('TaskManager: 任务执行失败', [
            'task_id' => $taskId,
            'error' => $error,
        ]);
    }

    /**
     * POST the callback notification for a task to its callback URL.
     *
     * Nothing is sent when the task is missing, has no usable callback URL, or
     * its callback deadline (`callback_expires_at`, else `expires_at`) has
     * passed. Delivery is best effort: non-2xx responses and exceptions raised
     * while sending are logged, never rethrown.
     */
    public function sendCallback(string $taskId): void
    {
        $task = $this->loadTask($taskId);
        $callbackUrl = $task['callback_url'] ?? null;

        // No task, or no callback URL to notify.
        if (!$task || !is_string($callbackUrl) || $callbackUrl === '') {
            return;
        }

        // The callback-specific deadline takes precedence over the task expiry.
        $deadline = $task['callback_expires_at'] ?? $task['expires_at'] ?? null;
        if ($deadline && now()->gt($deadline)) {
            Log::warning('TaskManager: 回调已超时,跳过发送', [
                'task_id' => $taskId,
                'callback_expires_at' => $deadline,
                'current_time' => now()->toISOString(),
                'task_status' => $task['status'] ?? null,
            ]);

            return;
        }

        try {
            $response = Http::timeout(self::CALLBACK_HTTP_TIMEOUT_SECONDS)
                ->post($callbackUrl, $this->buildCallbackPayload($task));

            if ($response->successful()) {
                Log::info('TaskManager: 回调通知发送成功', [
                    'task_id' => $taskId,
                    'callback_url' => $callbackUrl,
                    'status' => $response->status(),
                ]);

                return;
            }

            Log::warning('TaskManager: 回调通知发送失败', [
                'task_id' => $taskId,
                'callback_url' => $callbackUrl,
                'status' => $response->status(),
                'response_body' => $response->body(),
            ]);
        } catch (\Exception $e) {
            Log::error('TaskManager: 回调通知异常', [
                'task_id' => $taskId,
                'callback_url' => $callbackUrl,
                'error' => $e->getMessage(),
            ]);
        }
    }

    /**
     * Find the analysis task registered for a paper (and optionally a student).
     *
     * Returns null when no mapping exists. A mapping whose task record is gone
     * from the cache is removed before null is returned.
     */
    public function findAnalysisTaskByPaperId(string $paperId, ?string $studentId = null): ?array
    {
        $mappingKey = $this->getTaskMappingKey($paperId, $studentId);
        $taskId = Cache::get($mappingKey);
        if (!$taskId) {
            return null;
        }

        $task = $this->loadTask($taskId);
        if ($task) {
            return $task;
        }

        // The task record no longer exists: drop the stale mapping.
        Cache::forget($mappingKey);

        return null;
    }

    /**
     * Generate a task id of the form `<prefix>_<uniqid>_<8 hex chars>`.
     *
     * The suffix is random rather than a hash of the task data: hashing needed
     * serialize(), which throws for closures and other unserializable values,
     * and a data/time hash repeats for identical data created at the same time.
     */
    private function generateTaskId(string $type): string
    {
        $prefix = match ($type) {
            self::TASK_TYPE_EXAM => 'exam_task',
            self::TASK_TYPE_ANALYSIS => 'analysis_task',
            default => 'task_' . $type,
        };

        return $prefix . '_' . uniqid() . '_' . bin2hex(random_bytes(4));
    }

    /**
     * Store a task record in the cache for one day.
     */
    private function saveTask(string $taskId, array $taskData): void
    {
        Cache::put($this->getCacheKey($taskId), $taskData, now()->addDay());
    }

    /**
     * Load a task record from the cache.
     *
     * Any cached value that is not an array is treated as a missing task
     * instead of violating the return type.
     */
    private function loadTask(string $taskId): ?array
    {
        $task = Cache::get($this->getCacheKey($taskId));

        return is_array($task) ? $task : null;
    }

    /**
     * Build the cache key under which a task record is stored.
     */
    private function getCacheKey(string $taskId): string
    {
        return "task:{$taskId}";
    }

    /**
     * Store the paper/student -> task id mapping of an analysis task for one day.
     *
     * Other task types are not mapped. Ids are read from the camelCase key
     * first, then from the snake_case key; no mapping is stored without a
     * paper id.
     */
    private function saveTaskMapping(string $type, array $data, string $taskId): void
    {
        if ($type !== self::TASK_TYPE_ANALYSIS) {
            return;
        }

        $paperId = $this->normalizeId($data['paperId'] ?? $data['paper_id'] ?? null);
        if ($paperId === null) {
            return;
        }

        $studentId = $this->normalizeId($data['studentId'] ?? $data['student_id'] ?? null);

        Cache::put($this->getTaskMappingKey($paperId, $studentId), $taskId, now()->addDay());
    }

    /**
     * Build the cache key of an analysis-task mapping.
     *
     * Only a null or empty student id means "no student": the id "0" is a real
     * id and gets its own key.
     */
    private function getTaskMappingKey(string $paperId, ?string $studentId = null): string
    {
        $studentPart = ($studentId !== null && $studentId !== '') ? "_{$studentId}" : '';

        return "task_mapping:analysis:{$paperId}{$studentPart}";
    }

    /**
     * Convert an id taken from task data into a non-empty string.
     *
     * Returns null for null, for values that are neither scalar nor Stringable
     * (arrays, plain objects), and for values whose string form is empty.
     */
    private function normalizeId(mixed $value): ?string
    {
        if (!is_scalar($value) && !($value instanceof \Stringable)) {
            return null;
        }

        $id = (string) $value;

        return $id === '' ? null : $id;
    }

    /**
     * Build the JSON payload sent to the callback URL.
     *
     * Ids are read from the snake_case key first, then from the camelCase key.
     * An `error` entry is added only when the task status is "failed".
     */
    private function buildCallbackPayload(array $task): array
    {
        $type = $task['type'] ?? null;
        $status = $task['status'] ?? null;
        $data = is_array($task['data'] ?? null) ? $task['data'] : [];

        $payload = [
            'task_id' => $task['task_id'] ?? null,
            'type' => $type,
            'status' => $status,
            'completed_at' => $task['completed_at'] ?? null,
        ];

        // Type-specific fields.
        if ($type === self::TASK_TYPE_EXAM) {
            $payload['callback_type'] = 'exam_pdf_generated';
            $payload['paper_id'] = $data['paper_id'] ?? $data['paperId'] ?? null;
            $payload['pdfs'] = $task['pdfs'] ?? null;
            $payload['exam_content'] = $task['exam_content'] ?? null;
        } elseif ($type === self::TASK_TYPE_ANALYSIS) {
            $payload['callback_type'] = 'analysis_report_generated';
            $payload['paper_id'] = $data['paper_id'] ?? $data['paperId'] ?? null;
            $payload['student_id'] = $data['student_id'] ?? $data['studentId'] ?? null;
            $payload['pdf_url'] = $task['pdf_url'] ?? null;
        }

        // Tell the receiver why the task failed.
        if ($status === self::STATUS_FAILED) {
            $payload['error'] = $task['error'] ?? null;
        }

        return $payload;
    }
}
|