ImportStreamController.php 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. <?php
  2. namespace App\Http\Controllers;
  3. use Filament\Facades\Filament;
  4. use Illuminate\Http\Request;
  5. use Illuminate\Support\Facades\Redis;
  6. class ImportStreamController extends Controller
  7. {
  8. public function stream(Request $request): \Symfony\Component\HttpFoundation\StreamedResponse
  9. {
  10. if (!Filament::auth()->check()) {
  11. abort(403);
  12. }
  13. $type = $request->query('type', 'markdown-imports');
  14. $importId = (int) $request->query('import_id', 0);
  15. $channel = $type === 'pre-question-candidates' ? 'pre-question-candidates' : 'markdown-imports';
  16. return response()->stream(function () use ($channel, $importId) {
  17. echo "retry: 2000\n\n";
  18. @ob_flush();
  19. flush();
  20. Redis::connection()->subscribe([$channel], function ($message) use ($importId) {
  21. $payload = is_string($message) ? $message : json_encode($message, JSON_UNESCAPED_UNICODE);
  22. $decoded = json_decode($payload, true);
  23. if ($importId > 0 && is_array($decoded)) {
  24. $messageImportId = (int) ($decoded['import_id'] ?? 0);
  25. if ($messageImportId !== $importId) {
  26. return;
  27. }
  28. }
  29. echo "event: update\n";
  30. echo 'data: ' . $payload . "\n\n";
  31. @ob_flush();
  32. flush();
  33. });
  34. }, 200, [
  35. 'Content-Type' => 'text/event-stream',
  36. 'Cache-Control' => 'no-cache',
  37. 'X-Accel-Buffering' => 'no',
  38. 'Connection' => 'keep-alive',
  39. ]);
  40. }
  41. }