// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import 'dart:async'; import 'dart:convert'; import 'dart:typed_data'; import 'package:async/async.dart'; import 'package:aws_common/aws_common.dart'; import 'package:collection/collection.dart'; import 'package:meta/meta.dart'; /// {@template aws_common.aws_http_response} /// A parameterized HTTP response. /// {@endtemplate} /// /// See also: /// - [AWSHttpResponse] /// - [AWSStreamedHttpResponse] sealed class AWSBaseHttpResponse implements StreamSplitter>, Closeable { AWSBaseHttpResponse._({ required this.statusCode, Map? headers, }) : headers = UnmodifiableMapView( CaseInsensitiveMap(headers ?? const {}), ); /// The response's status code. final int statusCode; /// The response's headers. final Map headers; /// The response's body. Stream> get body; /// The collected bytes of the response [body]. FutureOr> get bodyBytes; /// Decodes the response body using [encoding] (defaults to UTF-8). FutureOr decodeBody({Encoding encoding = utf8}); } /// {@macro aws_common.aws_http_response} @immutable class AWSHttpResponse extends AWSBaseHttpResponse { /// {@macro aws_common.aws_http_response} AWSHttpResponse({ required super.statusCode, super.headers, List? body, }) : bodyBytes = body ?? const [], super._(); @override Stream> get body => bodyBytes.isEmpty ? const Stream.empty() : Stream.value(bodyBytes); @override Stream> split() => body; @override final List bodyBytes; @override String decodeBody({Encoding encoding = utf8}) => encoding.decode(bodyBytes); @override Future close() async {} } /// {@template aws_common.aws_http_streamed_response} /// A streaming HTTP response. /// {@endtemplate} class AWSStreamedHttpResponse extends AWSBaseHttpResponse { /// @{macro aws_common.aws_http_streamed_response} AWSStreamedHttpResponse({ required super.statusCode, super.headers, required Stream> body, }) : _body = body, super._(); /// Handles splitting [_body] into multiple single-subscription streams. StreamSplitter>? _splitter; /// The original body. final Stream> _body; @override Stream> get body => _splitter == null ? _body : split(); @override Future decodeBody({Encoding encoding = utf8}) => encoding.decodeStream(body); @override Future get bodyBytes => collectBytes(body); /// Reads [body] fully and returns a flattened [AWSHttpResponse]. /// /// `this` will no longer be usable after this completes. Future read() async { try { return AWSHttpResponse( statusCode: statusCode, headers: headers, body: await bodyBytes, ); } finally { unawaited(close()); } } /// The number of times the body stream has been split. @visibleForTesting int debugNumSplits = 0; /// Returns a copy of [body] in cases where the stream must be read multiple /// times. @override Stream> split() { debugNumSplits++; return (_splitter ??= StreamSplitter(body)).split(); } @override Future close() => _splitter?.close() ?? Future.value(); }