Subscribe to mutations for creating real-time clients.

## Set up subscription with callbacks

When you create a subscription, a [`Stream`](https://api.dart.dev/stable/dart-async/Stream-class.html) object is returned. This `Stream` continues producing events until either the subscription encounters an error or you cancel the subscription. To limit the amount of data that is emitted, you can use the `Stream`'s helper functions such as `take`. The subscription is canceled once the specified number of events has occurred:

```dart
Stream<GraphQLResponse<Todo>> subscribe() {
  final subscriptionRequest = ModelSubscriptions.onCreate(Todo.classType);
  final Stream<GraphQLResponse<Todo>> operation = Amplify.API
      .subscribe(
        subscriptionRequest,
        onEstablished: () => print('Subscription established'),
      )
      // Listens to only 5 elements
      .take(5)
      .handleError(
    (error) {
      print('Error in subscription stream: $error');
    },
  );
  return operation;
}
```

Alternatively, you can call [`Stream.listen`](https://api.dart.dev/stable/dart-async/Stream/listen.html) to create a [`StreamSubscription`](https://api.dart.dev/stable/dart-async/StreamSubscription-class.html) object, which can be canceled programmatically.

```dart
// Be sure to import this
import 'dart:async';

...

StreamSubscription<GraphQLResponse<Todo>>? subscription;

void subscribe() {
  final subscriptionRequest = ModelSubscriptions.onCreate(Todo.classType);
  final Stream<GraphQLResponse<Todo>> operation = Amplify.API.subscribe(
    subscriptionRequest,
    onEstablished: () => print('Subscription established'),
  );
  subscription = operation.listen(
    (event) {
      print('Subscription event data received: ${event.data}');
    },
    onError: (Object e) => print('Error in subscription stream: $e'),
  );
}

void unsubscribe() {
  subscription?.cancel();
}
```

Note that in addition to an `onCreate` subscription, you can also call `.onUpdate()` or `.onDelete()`.

```dart
final onUpdateSubscriptionRequest = ModelSubscriptions.onUpdate(Todo.classType);
// or
final onDeleteSubscriptionRequest = ModelSubscriptions.onDelete(Todo.classType);
```
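
The two cancellation styles described above (limiting events with `take`, or canceling a `StreamSubscription` by hand) can be tried out without a backend. The following is a minimal sketch that uses only `dart:async`. `fakeSubscription`, `collectLimited`, and `collectUntilCancelled` are hypothetical names introduced for illustration, and the periodic stream is an assumed stand-in for the stream returned by `Amplify.API.subscribe`, not the real thing.

```dart
import 'dart:async';

// Hypothetical stand-in for a subscription stream (not an Amplify API):
// emits 'event 1', 'event 2', ... every 10 ms and never completes on its own.
Stream<String> fakeSubscription() => Stream<String>.periodic(
      const Duration(milliseconds: 10),
      (count) => 'event ${count + 1}',
    );

// `take` closes the stream and cancels the source after `limit` events.
Future<List<String>> collectLimited(int limit) =>
    fakeSubscription().take(limit).toList();

// Listens for `window`, then cancels manually and returns what arrived.
Future<List<String>> collectUntilCancelled(Duration window) async {
  final received = <String>[];
  final StreamSubscription<String> subscription =
      fakeSubscription().listen(received.add);
  await Future<void>.delayed(window);
  // After cancel() completes, no further events are delivered.
  await subscription.cancel();
  return received;
}

Future<void> main() async {
  print(await collectLimited(3)); // [event 1, event 2, event 3]
  final received =
      await collectUntilCancelled(const Duration(milliseconds: 55));
  print('Received ${received.length} events before cancel()');
}
```

`take` is convenient when the number of events is known up front, while holding on to the `StreamSubscription` is likely the better fit when the stop condition depends on something external, such as a widget being disposed.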