From 2134bf72f85d5b67aa263990120a388fc15db66e Mon Sep 17 00:00:00 2001 From: Jan Romann Date: Sat, 15 Oct 2022 15:29:12 +0200 Subject: [PATCH] feat: add experimental CoAP server implementation --- example/coap_server.dart | 36 +++++++++ lib/coap.dart | 1 + lib/src/coap_server.dart | 163 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 200 insertions(+) create mode 100644 example/coap_server.dart create mode 100644 lib/src/coap_server.dart diff --git a/example/coap_server.dart b/example/coap_server.dart new file mode 100644 index 00000000..a92d4771 --- /dev/null +++ b/example/coap_server.dart @@ -0,0 +1,36 @@ +// ignore_for_file: avoid_print + +/* + * Package : Coap + * Author : J. Romann + * Date : 10/15/2022 + * Copyright : J. Romann + * + * CoAP Server example + */ + +import 'dart:async'; +import 'dart:io'; +import 'package:coap/coap.dart'; + +FutureOr main() async { + final server = await CoapServer.bind( + InternetAddress.anyIPv4, + CoapUriScheme.coap, + ); + server.listen( + (final request) async { + print('Received the following request: $request\n'); + final response = CoapResponse.createResponse( + request, + CoapCode.content, + CoapMessageType.ack, + )..id = request.id; + print('Sending response: $response\n'); + server + ..sendResponse(response, request.source!, request.uriPort) + ..close(); + }, + onDone: () => print('Done!'), + ); +} diff --git a/lib/coap.dart b/lib/coap.dart index 097d8511..f8320987 100644 --- a/lib/coap.dart +++ b/lib/coap.dart @@ -29,6 +29,7 @@ export 'src/coap_option.dart'; export 'src/coap_option_type.dart'; export 'src/coap_request.dart'; export 'src/coap_response.dart'; +export 'src/coap_server.dart'; export 'src/deduplication/crop_rotation_deduplicator.dart'; export 'src/deduplication/deduplicator.dart'; export 'src/deduplication/noop_deduplicator.dart'; diff --git a/lib/src/coap_server.dart b/lib/src/coap_server.dart new file mode 100644 index 00000000..d839166d --- /dev/null +++ b/lib/src/coap_server.dart @@ -0,0 +1,163 @@ +/* + * Package : Coap + * Author : J. Romann + * Date : 10/15/2022 + * Copyright : J. Romann + * + * CoAP Server implementation + */ + +import 'dart:async'; +import 'dart:io'; +import 'dart:typed_data'; + +import 'package:typed_data/typed_data.dart'; + +import '../config/coap_config_default.dart'; +import 'coap_config.dart'; +import 'coap_request.dart'; +import 'coap_response.dart'; +import 'stack/layer_stack.dart'; + +enum CoapUriScheme { + coap, + coaps, + coapWs, + coapsWs, + coapTcp, + coapsTcp, +} + +abstract class CoapServer extends Stream { + CoapServer(); + + static Future bind( + final Object? host, + final CoapUriScheme uriScheme, { + final DefaultCoapConfig? config, + final bool reuseAddress = true, + final bool reusePort = false, + }) async { + switch (uriScheme) { + case CoapUriScheme.coap: + return _createUdpServer( + host, + config: config, + reuseAddress: reuseAddress, + reusePort: reusePort, + ); + case CoapUriScheme.coaps: + case CoapUriScheme.coapWs: + case CoapUriScheme.coapsWs: + case CoapUriScheme.coapTcp: + case CoapUriScheme.coapsTcp: + throw UnimplementedError(); + } + } + + static Future _createUdpServer( + final Object? host, { + final DefaultCoapConfig? config, + final bool reuseAddress = true, + final bool reusePort = false, + }) async { + final serverConfig = config ?? CoapConfigDefault(); + final coapPort = serverConfig.defaultPort; + + final socket = await RawDatagramSocket.bind( + host, + coapPort, + reuseAddress: reuseAddress, + reusePort: reusePort, + ); + + return _CoapUdpServer(socket, coapPort, serverConfig); + } + + int get port; + + String get uriScheme; + + void sendResponse( + final CoapResponse response, + final InternetAddress address, + final int port, + ); + + void close(); +} + +class _CoapUdpServer extends CoapServer { + final DefaultCoapConfig _config; + + final streamController = StreamController(); + + final stack = LayerStack(CoapConfigDefault()); + + @override + void close() { + streamController.close(); + _socket.close(); + } + + _CoapUdpServer(this._socket, this.port, this._config) { + _socket.listen((final event) { + if (event != RawSocketEvent.read) { + return; + } + + final datagram = _socket.receive(); + if (datagram == null) { + return; + } + final data = Uint8Buffer()..addAll(datagram.data); + final decoder = _config.spec.newMessageDecoder(data); + if (decoder.isRequest) { + final request = decoder.decodeRequest(); + if (request == null) { + return; + } + request + ..source = datagram.address + ..uriPort = datagram.port; + streamController.sink.add(request); + } + }); + } + + @override + final uriScheme = 'coap'; + + @override + final int port; + + final RawDatagramSocket _socket; + + @override + StreamSubscription listen( + final void Function(CoapRequest event)? onData, { + final Function? onError, + final void Function()? onDone, + final bool? cancelOnError, + }) => + streamController.stream.listen( + onData, + onError: onError, + onDone: () { + onDone?.call(); + }, + cancelOnError: cancelOnError, + ); + + @override + void sendResponse( + final CoapResponse response, + final InternetAddress address, + final int port, + ) { + final encoder = _config.spec.newMessageEncoder(); + final bytes = encoder.encodeResponse(response); + final buffer = Uint8List.fromList(bytes.toList(growable: false)); + _socket.send(buffer, address, port); + } +}