/* * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * http://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ package com.amplifyframework.rx; import com.amplifyframework.core.Action; import com.amplifyframework.core.Consumer; import com.amplifyframework.core.NoOpConsumer; import com.amplifyframework.core.async.Cancelable; import io.reactivex.rxjava3.core.Completable; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.Single; /** * Utilities for modeling Amplify category behaviors, and converting * those category behaviors into Rx objects (Observable, Single, Completable). */ final class RxAdapters { private RxAdapters() {} /** * Cancelable behaviors are those Amplify category behaviors which return a cancelable * operation. For example, most behaviors in Storage and Predictions will return * a cancelable operation, whereas DataStore and Auth do not. */ static final class CancelableBehaviors { private CancelableBehaviors() {} static Completable toCompletable(ActionEmitter behavior) { return Completable.create(subscriber -> { Cancelable cancelable = behavior.emitTo(subscriber::onComplete, subscriber::tryOnError); subscriber.setDisposable(AmplifyDisposables.fromCancelable(cancelable)); }); } static Single toSingle(ResultEmitter behavior) { return Single.create(subscriber -> { Cancelable cancelable = behavior.emitTo(subscriber::onSuccess, subscriber::tryOnError); subscriber.setDisposable(AmplifyDisposables.fromCancelable(cancelable)); }); } static Observable toObservable(StreamEmitter behavior) { return Observable.create(subscriber -> { Cancelable cancelable = behavior.streamTo( NoOpConsumer.create(), subscriber::onNext, subscriber::tryOnError, subscriber::onComplete ); subscriber.setDisposable(AmplifyDisposables.fromCancelable(cancelable)); }); } /** * Describes a behavior which emits a notification of start to an {@link Consumer}, * then emits 0..n values to a value {@link Consumer}, and finally terminated * either by calling a successful {@link Action}, or emitting an error to an * error {@link Consumer}. May be canceled via a returned {@link Cancelable}. * Such a method may be wrapped into an {@link Observable} * by using the {@link CancelableBehaviors#toObservable(StreamEmitter)} utility. * @param Type emitted to the start consumer * @param Type of object being emitted to the value consumer * @param Type emitted to error consumer */ interface StreamEmitter { Cancelable streamTo(Consumer onStart, Consumer onItem, Consumer onError, Action onComplete); } /** * Describes a behavior which emits results to a result or error {@link Consumer}, * and can be canceled via an {@link Cancelable}. Such a method * may be wrapped into an {@link Single} in a uniform way by using the * {@link CancelableBehaviors#toSingle(ResultEmitter)} utility. * @param Type of result accepted by result consumer * @param Type of error accepted by error consumer */ interface ResultEmitter { Cancelable emitTo(Consumer onResult, Consumer onError); } /** * A behavior which terminates in a completion action or an error. * Returns a cancelable when the behavior starts. Such a method * may be wrapped into an {@link Completable} in a uniform way * by using the {@link CancelableBehaviors#toCompletable(ActionEmitter)} * utility. * @param Type of error emitted */ interface ActionEmitter { Cancelable emitTo(Action onComplete, Consumer onError); } } /** * Void behaviors are those Amplify category behaviors which have a void return type. * For example, most behaviors in Auth and DataStore have a void return. Unlike * {@link CancelableBehaviors}, such behaviors may not be canceled once begun. */ static final class VoidBehaviors { private VoidBehaviors() {} static Completable toCompletable(ActionEmitter behavior) { return Completable.create(subscriber -> behavior.emitTo(subscriber::onComplete, subscriber::tryOnError)); } static Single toSingle(ResultEmitter behavior) { return Single.create(subscriber -> behavior.emitTo(subscriber::onSuccess, subscriber::tryOnError)); } static Observable toObservable(StreamEmitter behavior) { return Observable.create(subscriber -> { behavior.streamTo( NoOpConsumer.create(), subscriber::onNext, subscriber::tryOnError, subscriber::onComplete ); }); } /** * A behavior which streams items to a consumer, and then ends with an error or completion signal. * The behavior does not itself return anything synchronously. * The behavior begins by emitting to a start consumer. * @param Type of token emitted on successful start * @param Type of item output to stream consumer * @param Type of error emitted */ interface StreamEmitter { void streamTo(Consumer onStart, Consumer onItem, Consumer onError, Action onComplete); } /** * A behavior which emits to a result listener, but returns no value, itself. * @param Type of result emitted * @param Type of error emitted */ interface ResultEmitter { void emitTo(Consumer onResult, Consumer onError); } /** * Describes behavior which emits a completion notification via an {@link Action}, * or alternately, emits an error to a {@link Consumer}. * @param Type of error emitted */ interface ActionEmitter { void emitTo(Action onComplete, Consumer onError); } } /** * Interface that should be implemented by reactive-style operations * wishing to return a {@link Single} as its result. * @param The type that represents the result of a given operation. */ interface RxSingleOperation extends Cancelable { /** * Maps the result of a callback-style operation to a {@link Single}. * @return A {@link Single} that emits a result or an error. */ Single observeResult(); } }