#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# SPDX-License-Identifier: MIT-0
"""Voice front end for an Amazon Lex V2 bot.

Records a phrase from the microphone, sends it to Lex V2, plays the audio
reply, and stops the conversation once the bot reports a fulfilled intent.
Amazon Polly is used for spoken prompts and Amazon Translate for optional
translation. On non-macOS hosts an APA102 LED ring is driven as a status
indicator.
"""

from sys import byteorder
from array import array
import base64
import os
import uuid
import zlib
import json
import time
import struct
import platform
from ctypes import *
from contextlib import contextmanager

import pyaudio
import boto3
from dotenv import load_dotenv

load_dotenv()
session = boto3.Session(aws_access_key_id=os.getenv('AWS_KEY_ID'),
                        aws_secret_access_key=os.getenv('AWS_SECRET_KEY'))

# NOTE(review): kept after load_dotenv()/session creation exactly as in the
# original ordering, in case iot_sub reads the environment at import time.
import iot_sub
#from iot_sub import mqtt_subscribe, mqtt_publish

platform_sys = platform.system()
print('platform: ', platform_sys)

pa = None
audio_stream = None

# NOTE(review): the original file's indentation was lost; the whole LED setup
# is assumed to be guarded by this platform check (the only layout that imports
# cleanly on macOS) -- confirm against the upstream file.
if platform_sys != "Darwin":
    from gpiozero import LED
    from apa102 import APA102

    COLORS_RGB = dict(
        blue=(0, 0, 255),
        green=(0, 255, 0),
        orange=(255, 128, 0),
        pink=(255, 51, 153),
        purple=(128, 0, 128),
        red=(255, 0, 0),
        white=(255, 255, 255),
        yellow=(255, 255, 51),
        off=(0, 0, 0),
    )

    KEYWORDS_COLOR = {
        'picovoice': 'green',
        'porcupine': 'blue',
        'blueberry': 'orange',
        'terminator': 'off',
    }

    driver = APA102(num_led=12)
    power = LED(5)
    power.on()
    keywords = list(KEYWORDS_COLOR.keys())

#Change to reflect the bot and alias you created
#bot_id = 'AVLBAKUHJ9'
#bot_alias_id='4ZTMYYWUQA'
bot_id = 'ISXIO344FF'
bot_alias_id = 'ZWIGXTFIAU'

THRESHOLD = 500
CHUNK_SIZE = 2048
FORMAT = pyaudio.paInt16
RATE = 16000
#RATE = 44100
VOICE_ID = None

polly = session.client('polly')
lexv2 = session.client('lexv2-runtime')
translate = session.client(service_name='translate', region_name='us-east-1', use_ssl=True)

ERROR_HANDLER_FUNC = CFUNCTYPE(None, c_char_p, c_int, c_char_p, c_int, c_char_p)


def py_error_handler(filename, line, function, err, fmt):
    """No-op error callback installed into libasound by noalsaerr()."""
    pass


c_error_handler = ERROR_HANDLER_FUNC(py_error_handler)


@contextmanager
def noalsaerr():
    """Install the no-op libasound error handler for the duration of the block.

    On Darwin nothing is installed. The default handler is restored even when
    the wrapped block raises; the trailing status message is only printed on
    a clean exit.
    """
    if platform_sys != 'Darwin':
        asound = cdll.LoadLibrary('libasound.so')
        asound.snd_lib_error_set_handler(c_error_handler)
        try:
            yield
        finally:
            asound.snd_lib_error_set_handler(None)
        print("noalsoerr activated")
    else:
        yield
        print('no alsa err mode deactivated')


def is_silent(data):
    """Return True when the largest sample in *data* is below THRESHOLD.

    An empty chunk is treated as silent instead of raising ValueError.
    """
    return max(data, default=0) < THRESHOLD


def record_phrase():
    """Record from the default input device until the speaker stops.

    Recording ends once speech has started and either more than 15 silent
    chunks have been seen or the chunk counter exceeds 80.

    Returns:
        array('h') holding every captured 16-bit sample, including any
        leading silence.
    """
    with noalsaerr():
        num_silent = 0
        # Bound up front; the original only assigned it once speech started.
        sessionlength = 0
        start = False

        p = pyaudio.PyAudio()
        try:
            # The macOS and Raspberry Pi configurations were identical; on the
            # Pi an explicit device can be chosen with input_device_index=2.
            stream = p.open(format=FORMAT, channels=1, rate=RATE,
                            input=True, frames_per_buffer=CHUNK_SIZE)
            try:
                stream.start_stream()
                recording = array('h')

                while True:
                    data = array('h', stream.read(CHUNK_SIZE))
                    if byteorder == 'big':
                        data.byteswap()
                    recording.extend(data)

                    silent = is_silent(data)
                    if silent and start:
                        num_silent += 1
                        # NOTE(review): silent chunks are counted here and
                        # again below, as in the original statement order.
                        sessionlength += 1
                    elif not silent and not start:
                        start = True
                        num_silent = 0
                        sessionlength = 0

                    if start:
                        sessionlength += 1
                    if start and num_silent > 15:
                        break
                    if start and sessionlength > 80:
                        break
            finally:
                # Release the device even if reading fails part-way through.
                stream.stop_stream()
                stream.close()
        finally:
            p.terminate()

        if platform_sys != 'Darwin':
            set_color("off")
        print("turning off leds and returning recorded voice")
        return recording


def play_audio(audio):
    """Play 16-bit mono PCM read from the file-like object *audio*.

    The output stream and the PyAudio instance are always released, which the
    original skipped for the PyAudio instance.
    """
    p = pyaudio.PyAudio()
    try:
        stream = p.open(format=FORMAT, channels=1, rate=RATE, output=True)
        try:
            stream.write(audio.read())
            stream.stop_stream()
        finally:
            stream.close()
    finally:
        p.terminate()


def set_color(colori):
    """Set all 12 LEDs to one colour.

    Args:
        colori: a key of COLORS_RGB (for example "off"), or an (r, g, b)
            tuple. Tuples are accepted so callers that already resolved the
            colour do not fail with KeyError.
    """
    color = COLORS_RGB[colori] if isinstance(colori, str) else colori
    for i in range(12):
        driver.set_pixel(i, color[0], color[1], color[2])
    driver.show()


def color_start():
    """Play the start-up animation on the LED ring, then switch it off."""
    pixels_number = 12
    pixels = [0, 0, 12, 12, 0, 0, 0, 24] * pixels_number

    for _ in range(21):
        for i in range(12):
            driver.set_pixel(i, pixels[4 * i + 1], pixels[4 * i + 2], pixels[4 * i + 3])
        driver.show()
        time.sleep(0.2)
        # Rotate the pattern by one pixel (four values) per frame.
        pixels = pixels[-4:] + pixels[:-4]
    set_color("off")


def translatelang(text, lang):
    """Translate English *text* into *lang* with Amazon Translate.

    Returns:
        The translated text from the service response.
    """
    result = translate.translate_text(Text=text,
                                      SourceLanguageCode="en",
                                      TargetLanguageCode=lang)
    translated = result.get('TranslatedText')
    print('TranslatedText: ' + translated)
    print('SourceLanguageCode: ' + result.get('SourceLanguageCode'))
    print('TargetLanguageCode: ' + result.get('TargetLanguageCode'))
    return translated


def word():
    """Block until a Porcupine wake word is heard.

    Returns:
        "hotword" once a wake word is detected, or None when detection fails
        (the error is printed).
    """
    # Bound before the try so the cleanup below never hits an unbound local.
    porcupine = None
    pa = None
    audio_stream = None
    # The index returned by process() refers to this list, not to the
    # module-level `keywords`.
    wake_words = ["picovoice", "blueberry"]
    try:
        # The original used pvporcupine without importing it anywhere.
        import pvporcupine

        with noalsaerr():
            porcupine = pvporcupine.create(keywords=wake_words)
            #porcupine = pvporcupine.create(keyword_paths=['picovoice/hey-t-s-a-bot__en_mac_2021-10-21-utc_v1_9_0.ppn'])
            pa = pyaudio.PyAudio()
            audio_stream = pa.open(
                rate=porcupine.sample_rate,
                channels=1,
                format=pyaudio.paInt16,
                input=True,
                #input_device_index=2,
                frames_per_buffer=porcupine.frame_length)
            print('[Listening...]')

            while True:
                pcm = audio_stream.read(porcupine.frame_length)
                pcm = struct.unpack_from("h" * porcupine.frame_length, pcm)
                keyword_index = porcupine.process(pcm)
                if keyword_index >= 0:
                    detected = wake_words[keyword_index]
                    print("Hotword '%s' Detected" % detected)
                    if platform_sys != 'Darwin':
                        # set_color expects a colour name.
                        set_color(KEYWORDS_COLOR[detected])
                    return "hotword"
    except Exception as e:
        print('word exception: ', e)
    finally:
        if porcupine is not None:
            porcupine.delete()
        if audio_stream is not None:
            audio_stream.close()
            print('audio stream closed')
        if pa is not None:
            pa.terminate()
            print("pa terminated")


def synthesize_speech(message, text_type='text'):
    """Speak *message* through Amazon Polly.

    Args:
        message: the text (or SSML) to synthesize.
        text_type: Polly TextType; defaults to 'text' so one-argument calls,
            such as the error prompt in call_lexv2, work.
    """
    global VOICE_ID
    if VOICE_ID is None:
        VOICE_ID = get_voiceid()

    response = polly.synthesize_speech(
        Engine='standard',
        OutputFormat='pcm',
        Text=message,
        VoiceId=VOICE_ID,
        TextType=text_type
    )
    audio = response['AudioStream']
    play_audio(audio)


def call_lexv2(session_id, recording, locale_Id='en_US'):
    """Send recorded audio to the Lex V2 bot.

    Args:
        session_id: Lex session identifier.
        recording: raw 16 kHz, 16-bit mono PCM bytes.
        locale_Id: bot locale; defaults to 'en_US' (the locale get_voiceid
            uses) so the two-argument call in main() works.

    Returns:
        The recognize_utterance response, or the string "none" when the
        request fails (a spoken apology is played first).
    """
    try:
        print('locale_Id: ', locale_Id)
        response = lexv2.recognize_utterance(
            botId=bot_id,
            botAliasId=bot_alias_id,
            localeId=locale_Id,
            sessionId=session_id,
            requestContentType='audio/l16; rate=16000; channels=1',
            responseContentType='audio/pcm',
            inputStream=recording
        )
        print("Lex response ", response)
        return response
    except Exception as e:
        print("there is an error: ", e)
        synthesize_speech("I could not understand", 'text')
        return "none"


def decode(data):
    """Decode a base64-encoded, gzip-compressed JSON field into Python data."""
    decoded_data = base64.b64decode(data)
    # 16 + MAX_WBITS tells zlib to expect a gzip header.
    decompressed_data = zlib.decompress(decoded_data, 16 + zlib.MAX_WBITS)
    return json.loads(decompressed_data)


def get_voiceid():
    """Return the Polly voice id configured for the bot's en_US locale."""
    lexv2_model = session.client('lexv2-models')
    bot_alias = lexv2_model.describe_bot_alias(
        botAliasId=bot_alias_id,
        botId=bot_id
    )
    bot_locale = lexv2_model.describe_bot_locale(
        botId=bot_id,
        botVersion=bot_alias['botVersion'],
        localeId='en_US'
    )
    return bot_locale['voiceSettings']['voiceId']


def main():
    """Run one conversation with the bot.

    Loops while iot_sub.launchlextt is truthy: record a phrase, send it to
    Lex, play the reply, and stop when the dialog is closed and fulfilled or
    when the Lex call fails.
    """
    print("starting")
    #word()
    #print ('bot: ', bot)
    print("going to record")
    session_id = str(uuid.uuid4())
    #while 1:
    while iot_sub.launchlextt:
        try:
            if platform_sys != 'Darwin':
                # Pass the colour name; an RGB tuple made set_color raise
                # KeyError, which kept this loop spinning on the error.
                set_color(KEYWORDS_COLOR['porcupine'])
            print('iot_sub.launchlextt2: ', iot_sub.launchlextt)
            recording = record_phrase()
            if not iot_sub.launchlextt:
                print('stop in middle')
                break

            response = call_lexv2(session_id, bytes(recording))
            if response == 'none':
                break
            play_audio(response['audioStream'])

            session_state = decode(response['sessionState'])
            dialog_action = session_state['dialogAction']['type']
            state = session_state['intent']['state']
            if dialog_action == 'Close' and state == 'Fulfilled':
                break
        except Exception as e:
            print(e)


if __name__ == "__main__":
    bot = True
    launchlex = False
    #sub = mqtt_subscribe()
    intro = 0
    close = 0
    if platform_sys != 'Darwin':
        color_start()
    while bot:
        # Idle without pinning a CPU core; the original body was a bare `pass`.
        time.sleep(0.1)
        # Disabled conversation loop, kept for reference:
        #print('bot: ',bot)
        #print('iot_sub.launchlextt: ', iot_sub.launchlextt)
        #print('sub: ', sub)
        #if iot_sub.launchlextt:
            #if intro == 0:
            #    synthesize_speech("Hello. I am T. S. A. Bot. How can I help?")
            #    main()
            #    intro +=1
            #    close += 1
            #else:
            #    time.sleep(4)
            #    print("Do you have a question?.")
            #    synthesize_speech("Do you have a question?.")
            #    main()
            #synthesize_speech("Do you have another question?.")
            #synthesize_speech("Would you like to try again?.")
            #    if platform_sys !='Darwin':
            #        set_color(COLORS_RGB["white"])
            #        time.sleep(0.5)
            #        set_color(COLORS_RGB["off"])
            #iot_sub.launchlextt = False
        #    else:
        #        if close != 0:
        #            synthesize_speech("Thank you. Have a nice day!")
        #            close = 0
        #            intro = 0
        #        pass
        #        time.sleep(2)
        #while 1:
        #    key = input("Please input y/n: ")
        #    if key == 'y':
        #        break
        #    elif key == 'n':
        #        bot = False
        #        break
        #    else:
        #        synthesize_speech("Please input y or n.")
    #synthesize_speech("Thank you. Have a nice day!")