Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 86 additions & 124 deletions lib/flutter_client_sse.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,37 @@ import 'dart:async';
import 'dart:convert';
import 'package:flutter_client_sse/constants/sse_request_type_enum.dart';
import 'package:http/http.dart' as http;

part 'sse_event_model.dart';

/// A client for subscribing to Server-Sent Events (SSE).
class SSEClient {
static http.Client _client = new http.Client();
static StreamController<SSEModel> _streamController = StreamController();
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Static stream limits the client to be used only once

final http.Client _client;

SSEClient(this._client);
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clients of the library will be responsible for managing the client


/// Retry the SSE connection after a delay.
///
/// [method] is the request method (GET or POST).
/// [url] is the URL of the SSE endpoint.
/// [header] is a map of request headers.
/// [body] is an optional request body for POST requests.
static void _retryConnection(
{required SSERequestType method,
required String url,
required Map<String, String> header,
Map<String, dynamic>? body}) {
print('---RETRY CONNECTION---');
Future.delayed(Duration(seconds: 5), () {
subscribeToSSE(
method: method,
url: url,
header: header,
body: body,
);
});
Stream<SSEModel> _retryConnection({
required SSERequestType method,
required String url,
required Map<String, String> header,
Map<String, dynamic>? body,
}) async* {
/// Delay for 5 seconds
await Future.delayed(Duration(seconds: 5));

/// Retry the connection
yield* subscribeToSSE(
method: method,
url: url,
header: header,
body: body,
);
}

/// Subscribe to Server-Sent Events.
Expand All @@ -41,124 +45,82 @@ class SSEClient {
/// [body] is an optional request body for POST requests.
///
/// Returns a [Stream] of [SSEModel] representing the SSE events.
static Stream<SSEModel> subscribeToSSE(
{required SSERequestType method,
required String url,
required Map<String, String> header,
Map<String, dynamic>? body}) {
Stream<SSEModel> subscribeToSSE({
required SSERequestType method,
required String url,
required Map<String, String> header,
Map<String, dynamic>? body,
}) async* {
var lineRegex = RegExp(r'^([^:]*)(?::)?(?: )?(.*)?$');
var currentSSEModel = SSEModel(data: '', id: '', event: '');

print("--SUBSCRIBING TO SSE---");
while (true) {
try {
_client = http.Client();
var request = new http.Request(
method == SSERequestType.GET ? "GET" : "POST",
Uri.parse(url),
);

/// Adding headers to the request
header.forEach((key, value) {
request.headers[key] = value;
});
try {
var request = http.Request(
method == SSERequestType.GET ? "GET" : "POST",
Uri.parse(url),
);

/// Adding body to the request if exists
if (body != null) {
request.body = jsonEncode(body);
}
// Adding headers to the request
header.forEach((key, value) {
request.headers[key] = value;
});

Future<http.StreamedResponse> response = _client.send(request);
// Adding body to the request if exists
if (body != null) {
request.body = jsonEncode(body);
}

var response = await _client.send(request);

/// Listening to the response as a stream
response.asStream().listen((data) {
/// Applying transforms and listening to it
data.stream
..transform(Utf8Decoder()).transform(LineSplitter()).listen(
(dataLine) {
if (dataLine.isEmpty) {
/// This means that the complete event set has been read.
/// We then add the event to the stream
_streamController.add(currentSSEModel);
currentSSEModel = SSEModel(data: '', id: '', event: '');
return;
}
// Listening to the response as a stream
await for (var data in response.stream.transform(Utf8Decoder()).transform(LineSplitter())) {
if (data.isEmpty) {
// This means that the complete event set has been read.
// We then yield the event to the stream
yield currentSSEModel;
currentSSEModel = SSEModel(data: '', id: '', event: '');
continue;
}

/// Get the match of each line through the regex
Match match = lineRegex.firstMatch(dataLine)!;
var field = match.group(1);
if (field!.isEmpty) {
return;
}
var value = '';
if (field == 'data') {
// If the field is data, we get the data through the substring
value = dataLine.substring(
5,
);
} else {
value = match.group(2) ?? '';
}
switch (field) {
case 'event':
currentSSEModel.event = value;
break;
case 'data':
currentSSEModel.data =
(currentSSEModel.data ?? '') + value + '\n';
break;
case 'id':
currentSSEModel.id = value;
break;
case 'retry':
break;
default:
print('---ERROR---');
print(dataLine);
_retryConnection(
method: method,
url: url,
header: header,
);
}
},
onError: (e, s) {
print('---ERROR---');
print(e);
_retryConnection(
method: method,
url: url,
header: header,
body: body,
);
},
);
}, onError: (e, s) {
print('---ERROR---');
print(e);
_retryConnection(
method: method,
url: url,
header: header,
body: body,
);
});
} catch (e) {
print('---ERROR---');
print(e);
_retryConnection(
method: method,
url: url,
header: header,
body: body,
);
// Get the match of each line through the regex
Match match = lineRegex.firstMatch(data)!;
var field = match.group(1);
if (field!.isEmpty) {
continue;
}
var value = '';
if (field == 'data') {
// If the field is data, we get the data through the substring
value = data.substring(5);
} else {
value = match.group(2) ?? '';
}
switch (field) {
case 'event':
currentSSEModel.event = value;
break;
case 'data':
currentSSEModel.data = (currentSSEModel.data ?? '') + value + '\n';
break;
case 'id':
currentSSEModel.id = value;
break;
case 'retry':
break;
default:
print('---ERROR---');
print(data);
yield* _retryConnection(method: method, url: url, header: header, body: body);
}
}
return _streamController.stream;
} catch (e) {
print('---ERROR---');
print(e);
/// Handle errors by yielding an empty stream or specific error handling logic
yield* _retryConnection(method: method, url: url, header: header, body: body);
}
}

/// Unsubscribe from the SSE.
static void unsubscribeFromSSE() {
_streamController.close();
_client.close();
}
}
32 changes: 16 additions & 16 deletions pubspec.lock
Original file line number Diff line number Diff line change
Expand Up @@ -87,26 +87,26 @@ packages:
dependency: transitive
description:
name: leak_tracker
sha256: "78eb209deea09858f5269f5a5b02be4049535f568c07b275096836f01ea323fa"
sha256: "3f87a60e8c63aecc975dda1ceedbc8f24de75f09e4856ea27daf8958f2f0ce05"
url: "https://pub.dev"
source: hosted
version: "10.0.0"
version: "10.0.5"
leak_tracker_flutter_testing:
dependency: transitive
description:
name: leak_tracker_flutter_testing
sha256: b46c5e37c19120a8a01918cfaf293547f47269f7cb4b0058f21531c2465d6ef0
sha256: "932549fb305594d82d7183ecd9fa93463e9914e1b67cacc34bc40906594a1806"
url: "https://pub.dev"
source: hosted
version: "2.0.1"
version: "3.0.5"
leak_tracker_testing:
dependency: transitive
description:
name: leak_tracker_testing
sha256: a597f72a664dbd293f3bfc51f9ba69816f84dcd403cdac7066cb3f6003f3ab47
sha256: "6ba465d5d76e67ddf503e1161d1f4a6bc42306f9d66ca1e8f079a47290fb06d3"
url: "https://pub.dev"
source: hosted
version: "2.0.1"
version: "3.0.1"
matcher:
dependency: transitive
description:
Expand All @@ -119,18 +119,18 @@ packages:
dependency: transitive
description:
name: material_color_utilities
sha256: "0e0a020085b65b6083975e499759762399b4475f766c21668c4ecca34ea74e5a"
sha256: f7142bb1154231d7ea5f96bc7bde4bda2a0945d2806bb11670e30b850d56bdec
url: "https://pub.dev"
source: hosted
version: "0.8.0"
version: "0.11.1"
meta:
dependency: transitive
description:
name: meta
sha256: d584fa6707a52763a52446f02cc621b077888fb63b93bbcb1143a7be5a0c0c04
sha256: bdb68674043280c3428e9ec998512fb681678676b3c54e773629ffe74419f8c7
url: "https://pub.dev"
source: hosted
version: "1.11.0"
version: "1.15.0"
path:
dependency: transitive
description:
Expand Down Expand Up @@ -188,10 +188,10 @@ packages:
dependency: transitive
description:
name: test_api
sha256: "5c2f730018264d276c20e4f1503fd1308dfbbae39ec8ee63c5236311ac06954b"
sha256: "5b8a98dafc4d5c4c9c72d8b31ab2b23fc13422348d2997120294d3bac86b4ddb"
url: "https://pub.dev"
source: hosted
version: "0.6.1"
version: "0.7.2"
typed_data:
dependency: transitive
description:
Expand All @@ -212,10 +212,10 @@ packages:
dependency: transitive
description:
name: vm_service
sha256: b3d56ff4341b8f182b96aceb2fa20e3dcb336b9f867bc0eafc0de10f1048e957
sha256: f652077d0bdf60abe4c1f6377448e8655008eef28f128bc023f7b5e8dfeb48fc
url: "https://pub.dev"
source: hosted
version: "13.0.0"
version: "14.2.4"
sdks:
dart: ">=3.2.0-0 <4.0.0"
flutter: ">=1.17.0"
dart: ">=3.3.0 <4.0.0"
flutter: ">=3.18.0-18.0.pre.54"
20 changes: 17 additions & 3 deletions sse-sample-backend/src/app.controller.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Controller, Sse } from '@nestjs/common';
import { Controller, Get, Sse, Res, HttpStatus } from '@nestjs/common';
import { Response } from 'express';
import { AppService } from './app.service';
import { interval, map, Observable } from 'rxjs';
import { interval, map, Observable, take } from 'rxjs';
import { MessageEvent } from '@nestjs/common';

@Controller()
Expand All @@ -9,6 +10,19 @@ export class AppController {

@Sse('sse')
sse(): Observable<MessageEvent> {
return interval(1000).pipe(map((_) => ({ data: 'world' })));
return interval(10).pipe(map((_) => ({ data: 'world' })));
}

@Sse('sse-limited')
sseLimited(): Observable<MessageEvent> {
return interval(10).pipe(
take(3),
map(() => ({ data: 'world' }))
);
}

@Get('sse-error')
httpError(@Res() res: Response): void {
res.status(HttpStatus.INTERNAL_SERVER_ERROR).json({ message: 'Simulated Error' });
}
}
Loading