Amplify also provides a set of APIs that expose [Reactive Extensions](http://reactivex.io/), a cross-platform library for asynchronous and event-based programs.

To use them, interact with the **`RxAmplify`** facade instead of the default `Amplify` facade.

```java
import com.amplifyframework.rx.RxAmplify;
// ...

Post post = Post.builder()
    .title("My First Post")
    .build();

RxAmplify.DataStore.save(post)
    .subscribe(
        () -> Log.i("RxAmplifyDemo", "Saved a post"),
        failure -> Log.e("RxAmplifyDemo", "Save failed", failure)
    );
```

Compared to the traditional callback API, this doesn't make a big difference for a single method call. However, it greatly improves readability when chaining asynchronous calls. Moreover, you can use standard RxJava operators to compose complex functionality into readable chunks.

Let's revisit your nested example where you saved `Post`, `Editor`, and `PostEditor`. With Amplify's RxJava interface you can merge these operations together.

```java
// Save post and editor (in no guaranteed order), then save postEditor
// once both saves have completed.
Completable.mergeArray(
    RxAmplify.DataStore.save(post),
    RxAmplify.DataStore.save(editor)
).andThen(
    RxAmplify.DataStore.save(postEditor)
).subscribe(
    () -> Log.i("RxAmplifyDemo", "Post, Editor, and PostEditor saved"),
    failure -> Log.e("RxAmplifyDemo", "One or more items not saved", failure)
);
```

Compared to nesting these dependent calls in callbacks, this is a much more readable pattern.

## Installation

Amplify's RxJava support is included in an optional module, `rxbindings`. To start using the Rx APIs, add the dependency to your application's Gradle file.

Under **Gradle Scripts**, open **build.gradle (Module :app)** and add the following line in `dependencies`:

```groovy
dependencies {
    // Add the below line in `dependencies`
    implementation 'com.amplifyframework:rxbindings:ANDROID_VERSION'
}
```

## Usage

Amplify tries to map the behavior of your callback-based APIs to well-known Rx primitives in an intuitive way.
Functions whose callbacks emit a single value (or an error) return an Rx `Single` instead. Functions whose callbacks emit no particular value return an Rx `Completable`. Lastly, functions whose callbacks emit a stream of values return an `Observable`.

## Special cases

Some APIs return an operation that can be cancelled. Examples include subscribing to an API and uploading or downloading objects from Storage.

### API subscriptions

The API category's `subscribe()` method exposes two `Observable`s: one for subscription data, and one for connection state. You can access these `Observable`s using `observeConnectionState()` and `observeSubscriptionData()` on the returned operation:

```java
RxSubscriptionOperation<? extends GraphQLResponse<?>> subscription =
    RxAmplify.API.subscribe(request);

// Connection state changes arrive on their own stream.
subscription
    .observeConnectionState()
    .subscribe(
        connectionStateEvent -> Log.i("RxAmplifyDemo", String.valueOf(connectionStateEvent))
    );

// Subscription payloads, errors, and completion arrive on the data stream.
subscription
    .observeSubscriptionData()
    .subscribe(
        data -> Log.i("RxAmplifyDemo", "Data on subscription = " + data),
        failure -> Log.e("RxAmplifyDemo", "Subscription failed", failure),
        () -> Log.i("RxAmplifyDemo", "Subscription completed")
    );
```

### Storage upload & download operations

The Storage category's `downloadFile()` and `uploadFile()` work largely the same way. Both return an operation containing a `Single` and an `Observable`. The `Single` can be used to obtain the result of the download or upload, and the `Observable` can be used to monitor its progress.

```java
// Download
RxProgressAwareSingleOperation<StorageDownloadFileResult> download =
    RxAmplify.Storage.downloadFile(remoteKey, localFile);

download
    .observeProgress()
    .subscribe(
        progress -> Log.i("RxAmplifyDemo", "Download progress = " + progress.toString())
    );

download
    .observeResult()
    .subscribe(
        result -> Log.i("RxAmplifyDemo", "Download finished! " + result.getFile().getPath()),
        failure -> Log.e("RxAmplifyDemo", "Download failed", failure)
    );

// Upload
RxProgressAwareSingleOperation<StorageUploadFileResult> upload =
    RxAmplify.Storage.uploadFile(remoteKey, localFile);

upload
    .observeProgress()
    .subscribe(
        progress -> Log.i("RxAmplifyDemo", "Upload progress = " + progress.toString())
    );

upload
    .observeResult()
    .subscribe(
        result -> Log.i("RxAmplifyDemo", "Upload finished! " + result.getKey()),
        failure -> Log.e("RxAmplifyDemo", "Upload failed", failure)
    );
```