# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this
# software and associated documentation files (the "Software"), to deal in the Software
# without restriction, including without limitation the rights to use, copy, modify,
# merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
# INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
# PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

import os
import json
import time
from datetime import datetime
import logging
import sys

from awscrt.io import (
    ClientBootstrap,
    DefaultHostResolver,
    EventLoopGroup,
    SocketDomain,
    SocketOptions,
)
from awsiot.eventstreamrpc import Connection, LifecycleHandler, MessageAmendment
from awsiot.greengrasscoreipc.model import PublishToIoTCoreRequest
import awsiot.greengrasscoreipc.client as client

# Seconds to wait for the IPC connection attempt to complete.
TIMEOUT = 10

# Root logger, mirrored to stdout at INFO level.
logger = logging.getLogger()
handler = logging.StreamHandler(sys.stdout)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logging.basicConfig()


class IPCUtils:
    """Helper that opens an event-stream RPC connection over a local socket."""

    def connect(self):
        """Open and return an established ``Connection``.

        The auth token is read from the ``SVCUID`` environment variable and
        the socket path from
        ``AWS_GG_NUCLEUS_DOMAIN_SOCKET_FILEPATH_FOR_COMPONENT``. The
        ``LifecycleHandler`` used for the connection is stored on
        ``self.lifecycle_handler``.

        Returns:
            The ``Connection`` object once ``connect_future.result`` has
            returned, i.e. after waiting at most ``TIMEOUT`` seconds.

        Raises:
            RuntimeError: If either required environment variable is unset
                or empty.
            Exception: Whatever ``connect_future.result(TIMEOUT)`` raises
                when the connection attempt fails or does not complete in
                time.
        """
        auth_token = os.getenv("SVCUID")
        socket_path = os.getenv("AWS_GG_NUCLEUS_DOMAIN_SOCKET_FILEPATH_FOR_COMPONENT")

        # Fail fast with a clear message instead of passing None further down.
        missing = [
            name
            for name, value in (
                ("SVCUID", auth_token),
                ("AWS_GG_NUCLEUS_DOMAIN_SOCKET_FILEPATH_FOR_COMPONENT", socket_path),
            )
            if not value
        ]
        if missing:
            raise RuntimeError(
                "Missing required environment variable(s): " + ", ".join(missing)
            )

        # SVCUID is a credential: record that it is present, never its value.
        logger.info("SVCUID auth token found in environment (value not logged)")
        logger.info(str(socket_path))

        elg = EventLoopGroup()
        resolver = DefaultHostResolver(elg)
        bootstrap = ClientBootstrap(elg, resolver)

        socket_options = SocketOptions()
        socket_options.domain = SocketDomain.Local

        amender = MessageAmendment.create_static_authtoken_amender(auth_token)

        connection = Connection(
            host_name=socket_path,
            # NOTE(review): presumably unused for a local-domain socket, kept
            # as in the original call -- confirm before changing.
            port=8033,
            bootstrap=bootstrap,
            socket_options=socket_options,
            connect_message_amender=amender,
        )

        self.lifecycle_handler = LifecycleHandler()
        connect_future = connection.connect(self.lifecycle_handler)
        # Wait for the connection attempt to finish, for at most TIMEOUT seconds.
        connect_future.result(TIMEOUT)
        return connection