ImportStreamController.php 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. <?php
  2. namespace App\Http\Controllers;
  3. use Filament\Facades\Filament;
  4. use Illuminate\Http\Request;
  5. use Illuminate\Http\StreamedResponse as LaravelStreamedResponse;
  6. use Illuminate\Support\Facades\Redis;
  7. class ImportStreamController extends Controller
  8. {
  9. public function stream(Request $request): LaravelStreamedResponse
  10. {
  11. if (!Filament::auth()->check()) {
  12. abort(403);
  13. }
  14. $type = $request->query('type', 'markdown-imports');
  15. $importId = (int) $request->query('import_id', 0);
  16. $channel = $type === 'pre-question-candidates' ? 'pre-question-candidates' : 'markdown-imports';
  17. return response()->stream(function () use ($channel, $importId) {
  18. echo "retry: 2000\n\n";
  19. @ob_flush();
  20. flush();
  21. Redis::connection()->subscribe([$channel], function ($message) use ($importId) {
  22. $payload = is_string($message) ? $message : json_encode($message, JSON_UNESCAPED_UNICODE);
  23. $decoded = json_decode($payload, true);
  24. if ($importId > 0 && is_array($decoded)) {
  25. $messageImportId = (int) ($decoded['import_id'] ?? 0);
  26. if ($messageImportId !== $importId) {
  27. return;
  28. }
  29. }
  30. echo "event: update\n";
  31. echo 'data: ' . $payload . "\n\n";
  32. @ob_flush();
  33. flush();
  34. });
  35. }, 200, [
  36. 'Content-Type' => 'text/event-stream',
  37. 'Cache-Control' => 'no-cache',
  38. 'X-Accel-Buffering' => 'no',
  39. 'Connection' => 'keep-alive',
  40. ]);
  41. }
  42. }