# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0. from awscrt.http import HttpClientConnection, HttpRequest, HttpHeaders, HttpProxyOptions from awscrt.io import ClientBootstrap, ClientTlsContext, is_alpn_available, SocketOptions, TlsConnectionOptions import awsiot from concurrent.futures import Future import json from typing import Any, Dict, List, Optional class DiscoveryClient: """ Client which performs Greengrass discovery. Args: bootstrap: Client bootstrap socket_options: Socket options tls_context: Client TLS context region: AWS region (not used if gg_server_name is set) gg_server_name: optional full server name proxy_options: Proxy options (if None is provided then a proxy is not used) """ __slots__ = [ '_bootstrap', '_tls_context', '_socket_options', '_region', '_tls_connection_options', '_gg_server_name', 'gg_url', 'port', "_proxy_options"] def __init__( self, bootstrap: ClientBootstrap, socket_options: SocketOptions, tls_context: ClientTlsContext, region: str, gg_server_name: str = None, proxy_options: HttpProxyOptions = None): assert isinstance(bootstrap, ClientBootstrap) assert isinstance(socket_options, SocketOptions) assert isinstance(tls_context, ClientTlsContext) assert isinstance(region, str) if gg_server_name is not None: assert isinstance(gg_server_name, str) self._bootstrap = bootstrap self._socket_options = socket_options self._region = region if gg_server_name is None: self._gg_server_name = 'greengrass-ats.iot.{}.amazonaws.com'.format(region) else: self._gg_server_name = gg_server_name self._tls_connection_options = tls_context.new_connection_options() self._tls_connection_options.set_server_name(self._gg_server_name) self.port = 8443 self._proxy_options = proxy_options if is_alpn_available(): self._tls_connection_options.set_alpn_list(['x-amzn-http-ca']) self.port = 443 def discover(self, thing_name: str) -> Future: """ Perform discovery. This is an asynchronous operation. Returns: Future a Future which will contain a result of :class:`DiscoverResponse` on success, or an exception on failure. """ discovery = dict( future=Future(), response_body=bytearray()) def on_incoming_body(http_stream, chunk, **kwargs): discovery['response_body'].extend(chunk) def on_request_complete(completion_future): try: response_code = completion_future.result() if response_code == 200: payload_str = discovery['response_body'].decode('utf-8') discover_res = DiscoverResponse.from_payload(json.loads(payload_str)) discovery['future'].set_result(discover_res) else: discovery['future'].set_exception( DiscoveryException( 'Error during discover call: response_code={}'.format(response_code), response_code)) except Exception as e: discovery['future'].set_exception(e) def on_connection_completed(conn_future): try: connection = conn_future.result() headers = HttpHeaders() headers.add('host', self._gg_server_name) request = HttpRequest( method='GET', path='/greengrass/discover/thing/{}'.format(thing_name), headers=headers) http_stream = connection.request( request=request, on_body=on_incoming_body) http_stream.activate() http_stream.completion_future.add_done_callback(on_request_complete) except Exception as e: discovery['future'].set_exception(e) connect_future = HttpClientConnection.new( host_name=self._gg_server_name, port=self.port, socket_options=self._socket_options, tls_connection_options=self._tls_connection_options, bootstrap=self._bootstrap, proxy_options=self._proxy_options) connect_future.add_done_callback(on_connection_completed) return discovery['future'] class DiscoveryException(Exception): """ Discovery response was an error. """ _slots_ = ['http_response_code', 'message'] def __init__(self, message: str, response_code:int): #: HTTP response code self.http_response_code = response_code # type: int #: Message self.message = message # type: str class ConnectivityInfo(awsiot.ModeledClass): """ Connectivity info """ __slots__ = ['id', 'host_address', 'metadata', 'port'] def __init__(self): #: ID self.id = None #: Port address self.host_address = None #: Metadata self.metadata = None #: Port self.port = None @classmethod def from_payload(cls, payload: Dict[str, Any]) -> 'ConnectivityInfo': new = cls() val = payload.get('Id') if val is not None: new.id = val val = payload.get('HostAddress') if val is not None: new.host_address = val val = payload.get('PortNumber') if val is not None: new.port = val val = payload.get('Metadata') if val is not None: new.metadata = val return new class GGCore(awsiot.ModeledClass): """ Greengrass Core """ __slots__ = ['thing_arn', 'connectivity'] def __init__(self): #: Thing ARN self.thing_arn = None #: List of :class:`ConnectivityInfo` self.connectivity = None @classmethod def from_payload(cls, payload: Dict[str, Any]) -> 'GGCore': new = cls() val = payload.get('thingArn') if val is not None: new.thing_arn = val val = payload.get('Connectivity') if val is not None: new.connectivity = [ConnectivityInfo.from_payload(i) for i in val] return new class GGGroup(awsiot.ModeledClass): """ Greengrass group """ __slots__ = ['gg_group_id', 'cores', 'certificate_authorities'] def __init__(self): #: Greengrass group ID self.gg_group_id = None #: List of :class:`GGCore` self.cores = None #: List of strings self.certificate_authorities = None @classmethod def from_payload(cls, payload: Dict[str, Any]) -> 'GGGroup': new = cls() val = payload.get('GGGroupId') if val is not None: new.gg_group_id = val val = payload.get('Cores') if val is not None: new.cores = [GGCore.from_payload(i) for i in val] val = payload.get('CAs') if val is not None: new.certificate_authorities = val return new class DiscoverResponse(awsiot.ModeledClass): """ Discovery response """ __slots__ = ['gg_groups'] def __init__(self): #: List of :class:`GGGroup` self.gg_groups = None # type: Optional[List[GGGroup]] @classmethod def from_payload(cls, payload: Dict[str, Any]) -> 'DiscoverResponse': new = cls() val = payload.get('GGGroups') if val is not None: new.gg_groups = [GGGroup.from_payload(i) for i in val] return new