/* * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Permission is hereby granted, free of charge, to any person obtaining a copy of this * software and associated documentation files (the "Software"), to deal in the Software * without restriction, including without limitation the rights to use, copy, modify, * merge, publish, distribute, sublicense, and/or sell copies of the Software, and to * permit persons to whom the Software is furnished to do so. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A * PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ package com.amazonaws.transcribestreaming; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.services.transcribestreaming.model.AudioEvent; import software.amazon.awssdk.services.transcribestreaming.model.AudioStream; import java.io.IOException; import java.io.InputStream; import java.io.UncheckedIOException; import java.nio.ByteBuffer; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; /** * This is an example Subscription implementation that converts bytes read from an AudioStream into AudioEvents * that can be sent to the Transcribe service. It implements a simple demand system that will read chunks of bytes * from an input stream containing audio data * * To read more about how Subscriptions and reactive streams work, please see * https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.2/README.md */ public class ByteToAudioEventSubscription implements Subscription { private static final int CHUNK_SIZE_IN_BYTES = 1024 * 4; private ExecutorService executor = Executors.newFixedThreadPool(1); private AtomicLong demand = new AtomicLong(0); private final Subscriber subscriber; private final InputStream inputStream; public ByteToAudioEventSubscription(Subscriber s, InputStream inputStream) { this.subscriber = s; this.inputStream = inputStream; } @Override public void request(long n) { if (n <= 0) { subscriber.onError(new IllegalArgumentException("Demand must be positive")); } demand.getAndAdd(n); //We need to invoke this in a separate thread because the call to subscriber.onNext(...) is recursive executor.submit(() -> { try { do { ByteBuffer audioBuffer = getNextEvent(); if (audioBuffer.remaining() > 0) { AudioEvent audioEvent = audioEventFromBuffer(audioBuffer); subscriber.onNext(audioEvent); } else { subscriber.onComplete(); break; } } while (demand.decrementAndGet() > 0); } catch (Exception e) { subscriber.onError(e); } }); } @Override public void cancel() { executor.shutdown(); } private ByteBuffer getNextEvent() { ByteBuffer audioBuffer = null; byte[] audioBytes = new byte[CHUNK_SIZE_IN_BYTES]; try { int len = inputStream.read(audioBytes); if (len <= 0) { audioBuffer = ByteBuffer.allocate(0); } else { audioBuffer = ByteBuffer.wrap(audioBytes, 0, len); } } catch (IOException e) { throw new UncheckedIOException(e); } return audioBuffer; } private AudioEvent audioEventFromBuffer(ByteBuffer bb) { return AudioEvent.builder() .audioChunk(SdkBytes.fromByteBuffer(bb)) .build(); } }