commit 36810505cd75123a60dab56662892bc271e147ab Author: Cutieguwu Date: Mon Sep 30 21:30:28 2024 -0400 Initial Commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2b8f446 --- /dev/null +++ b/.gitignore @@ -0,0 +1,13 @@ +.whisper +.cache +src/plugins/* +/cache +/assets +/temp + +### Python ### +# Byte-compiled / optimized / DLL files +__pycache__/ +*/__pycache__/ +*.py[cod] +*$py.class \ No newline at end of file diff --git a/README.adoc b/README.adoc new file mode 100644 index 0000000..e0aabab --- /dev/null +++ b/README.adoc @@ -0,0 +1,37 @@ +:source-highlighter: highlight.js +:highlightjs-languages: rust +:toc: auto + += *Andromeda* + +Andromeda is meant to be an extensible and flexible personal assistant. + +Andromeda will push my Python and Rust skills to their limits. Python for ease of use, Rust to speed up portions of Andromeda where possible. + +There will be bugs and there will be limitations. + +== *Background and Goals*: + +After not looking at my CutieAssistant project for a few months, I suddenly had a thought for a somewhat unique identifying name. One that had not been used before, and wouldn't come up in common speech. + +Andromeda. + +...and then I remembered https://en.wikipedia.org/wiki/Andromeda_(TV_series)[_Andromeda_] the TV show. + +Well, maybe not quite original, but the name gave the project a personality with a theme of Space (...also very original). This reinvigorated my energy to work on this project, unlike so many before that remain unfinished. + +== *Immediate plans*: + +* [ ] Automatic response map generation. +** Currently manually written in assets/service_response_map.json +* [ ] Build a plugin template. +*** [ ] Include Andromeda version compatibility. +** [ ] Mapping of commands. +** [ ] Mapping for base required functions and inherited classes. +** [ ] Support for automatic update fetching via git. +* [ ] Plugin installer. +* [ ] Plugin updater. 
+* [ ] Move multiple base Andromeda functionality to seperately packaged plugins.
+** [ ] andromeda-joke
+** [ ] andromeda-timer
+* [ ] Whatever I tackle afterward.
\ No newline at end of file
diff --git a/src/cutie_assistant/__init__.py b/src/cutie_assistant/__init__.py
new file mode 100644
index 0000000..3e9e278
--- /dev/null
+++ b/src/cutie_assistant/__init__.py
@@ -0,0 +1,223 @@
#!~/.pyenv/versions/3.11.6/bin/python
#
# Copyright (c) 2024 Cutieguwu | Olivia Brooks
#
# -*- coding: utf-8 -*-
# @Title: CutieAssistant
# @Author: Cutieguwu | Olivia Brooks
# @Description: Personal Voice Assistant
#
# @Script: __init__.py
# @Date Created: 12 Jul, 2024
# @Last Modified: 22 Jul, 2024
# @Last Modified by: Cutieguwu | Olivia Brooks
# --------------------------------------------

from .utils import install_dependencies, contains_keywords, clean_query, get_threads, convert_to_flac, get_audio_file_name, _load_plugins


# Must run before the third-party imports below: it pip-installs anything
# missing from the current interpreter at import time.
install_dependencies({"icecream", "SpeechRecognition", "coqui-tts", "openai-whisper", "pyaudio", "soundfile", "torch", "python-vlc"})

from icecream import ic
from TTS.api import TTS
from torch.cuda import is_available as cuda_available
import speech_recognition as sr
from speech_recognition import WaitTimeoutError
import vlc
from time import sleep
from os import remove as remove_file
from .base import Task, WaitTimeTrigger


# Prefix every ic() debug line so it matches the plain print() log style.
ic.configureOutput("INFO | ")

class Assistant:
    """
    Personal Assistant
    """

    def __init__(self):
        print("INFO | Starting...")

        # Assistant version as [major, minor, patch]; compared against the
        # min/max versions each plugin declares (see base.Plugin).
        self.VERSION = [0, 0, 0]

        self.tracked_tasks = []  # Task instances polled each run_checks() pass
        self.plugins = {}        # populated by plugin discovery below

        print("INFO | Loading discovered plugins...")
        _load_plugins(self)
        print("INFO | Done.")

        ic(self.plugins)

        self._set_tts()

        self._set_mic_source()

        # Main-loop flag; run() spins while this is truthy.
        self.assistantOn = True

        print("INFO | Started.")

    def run(self):
        """
        Main run loop.
+ """ + + while self.assistantOn: + self.run_checks() + + query = self.listen() + + if query != ("" or None) and contains_keywords(["execute"], query): + self.check_query(query) + + def speak(self, response_map:dict): + """ + Assistant Responses. + """ + + try: + match response_map["response_type"]: + case "rare": + output_path = "temp/output.wav" + raise FileNotFoundError # rare types are saved as output.wav + + case "asset": + audio_path = "assets/effects/" + + case _: # common and builtin types. + audio_path = f"cache/responses/{response_map['response_type']}/" + + audio_path = f"{audio_path}{get_audio_file_name(response_map)}" + output_path = f"{audio_path}.wav" + + with open(f"{audio_path}.flac"): + playback_path = f"{audio_path}.flac" + + except FileNotFoundError: + + print("INFO | Generating response as none was found...") + + self.TTS.tts_to_file(text=response_map["response"], speaker_wav="assets/speakers/venti.wav", file_path=output_path, language=self.TTS_LANGUAGE) + + print("INFO | Done.") + + if output_path != "temp/output.wav": + print("INFO | Response is not rare.\nINFO | Converting to flac...") + convert_to_flac(output_path) + print("INFO | Done.") + playback_path = f"{audio_path}.flac" + else: + playback_path = output_path + + media_player = vlc.MediaPlayer(playback_path) + + media_player.play() + sleep(media_player.get_length() / 1000) + + if response_map["response_type"] == "common": + TimedCache(self, playback_path) + + def listen(self): + """ + Listens for a command set. 
+ """ + + try: + recognizer = sr.Recognizer() + + with sr.Microphone(device_index=self.MICROPHONE_INDEX) as microphone: + recognizer.pause_threshold = 1 + print("INFO | Adjusting for ambient noise...") + recognizer.adjust_for_ambient_noise(microphone) + print("INFO | Done.") + print("INFO | Recording...") + audio = recognizer.listen(microphone, timeout=2, phrase_time_limit=5) + print("INFO | Done.") + + print("INFO | Recognizing...") + try: + if self.TTS_LANGUAGE == "en": + query = recognizer.recognize_whisper(audio, model="small.en", language="en") + else: + query = recognizer.recognize_whisper(audio, model="small", language="en") + ic(query) + + print("INFO | Done.") + return clean_query(query) + + except Exception as err: + ic(err) + except WaitTimeoutError: + print("INFO | Heard nothing.") + except AttributeError: + print("Failed to open microphone.") + + def check_query(self, query:str): + """ + Checks the query and calls an appropriate function. + """ + + pass + + def run_checks(self): + """ + Runs some checks. + """ + + self.check_tasks() + + def check_tasks(self): + """ + Checks and executes triggered tasks. + """ + + if len(self.tracked_tasks) == 0: + return + + for t in self.tracked_tasks: + t.check() + + def _set_tts(self): + """ + Autoconfigures the TTS engine. + """ + + self.TTS_DEVICE = "cuda" if cuda_available() else "cpu" + if self.TTS_DEVICE != "cuda": + self.CPU_THREADS = get_threads() + ic(f"{self.TTS_DEVICE} - {self.CPU_THREADS}") + else: + ic(self.TTS_DEVICE) + + self.TTS_LANGUAGE = "en" + + print("INFO | Loading Model...") + self.TTS = TTS("tts_models/multilingual/multi-dataset/xtts_v2").to(self.TTS_DEVICE) + print("INFO | Loaded Model.") + + def _set_mic_source(self): + """ + Configures the microphone source. + """ + + print("--------------------------------------------") + for index, device in enumerate(sr.Microphone.list_microphone_names()): # Listing only working breaks on most systems. 
+ print("Microphone(device_index={0}) - '{1}' ".format(index, device)) + + print("--------------------------------------------") + self.MICROPHONE_INDEX = int(input("Enter Microphone Index: ")) + +class TimedCache(Task): + def __init__(self, assistant, path, days:float = 30.0, lifespan=1): + Task.__init__(self, assistant) + + self.path = path + self.trigger = WaitTimeTrigger(days * 86400, lifespan) + + def run(self): + """ + Runs the task's function. + """ + + remove_file(self.path) diff --git a/src/cutie_assistant/base.py b/src/cutie_assistant/base.py new file mode 100644 index 0000000..7494fd5 --- /dev/null +++ b/src/cutie_assistant/base.py @@ -0,0 +1,182 @@ +#!~/.pyenv/versions/3.11.6/bin/python +# +# Copyright (c) 2024 Cutieguwu | Olivia Brooks +# +# -*- coding: utf-8 -*- +# @Title: Base Classes +# @Author: Cutieguwu | Olivia Brooks +# @Description: Base classes to inherit from for functions in CutieAssistant. +# +# @Script: base.py +# @Date Created: 22 Jul, 2024 +# @Last Modified: 24 Jul, 2024 +# @Last Modified by: Cutieguwu | Olivia Brooks +# -------------------------------------------- + +from time import time +from json import load as load_json +from os.path import dirname +from tomllib import load as load_toml +from enum import Enum + + +class Plugin(): + def __init__(self, plugin, assistant, plugin_file): + self.assistant = assistant + + plugin_path = dirname(plugin_file) + + self.EXPERIMENTAL_FEATURES = [].clear() # Reduce memory allocation instead of using None + + self._load_plugin_properties(plugin_path) + self._load_keywords(plugin_path) + self._register(plugin.__module__) + + self.is_active_background = False + + def _get_compatability(self) -> Enum: + """ + Checks to ensure that the plugin is compatible with the assistant. 
+ """ + + version_max = self.ASSISTANT_VERSION_SUPPORT["max"] + version_min = self.ASSISTANT_VERSION_SUPPORT["min"] + + if version_max is None: + if version_min is None or self.assistant.VERSION > version_min: + return PluginSupport.supported_unknown_future + + elif self.assistant.VERSION < version_min: + return PluginSupport.unsupported_old + + elif self.assistant.VERSION > version_max: + return PluginSupport.unsupported_new + + else: + return PluginSupport.supported + + + def _load_keywords(self, path): + """ + Loads keywords from file. + """ + + with open(path + "/keywords.json") as f: + self.KEYWORDS = load_json(f) + + def _load_plugin_properties(self, path): + """ + Loads the plugin properties from properties.toml + """ + + with open(path + "/properties.toml", "rb") as f: + properties = load_toml(f) + + self.NAME = properties["plugin"]["name"] + self.VERSION = properties["plugin"]["version"] + + self.ASSISTANT_VERSION_SUPPORT:dict = {} + + for ver in ["min", "max"]: + try: + self.ASSISTANT_VERSION_SUPPORT[ver] = properties["assistant"]["version"][ver] + except KeyError: + self.ASSISTANT_VERSION_SUPPORT[ver] = None + + self.IS_SUPPORTED = self._get_compatability() + + try: + self.EXPERIMENTAL_FEATURES = properties["assistant"]["features"] + except KeyError: + pass + + def _register(self, plugin_name): + """ + Adds `self` as plugin for `self.parent` + """ + + self.assistant.plugins[plugin_name] = self + +class Trigger: + def reset(self): + """ + Resets the trigger. + """ + + if self.lifespan == 1: + raise TriggerLifespanException + elif self.lifespan != -1: + self.lifespan = self.lifespan - 1 + + self.build() + +class Task: + def __init__(self, assistant): + self.assistant = assistant + + self._register() + + def remove(self): + """ + Removes the task. + """ + + self.assistant.tracked_tasks.remove(self) + + def check(self): + """ + Checks the task and runs and resets, or removes as needed. 
+ """ + + try: + self.run() + self.trigger.reset() + except TriggerLifespanException: + self.remove() + + def _register(self): + """ + Registers the task with the assistant. + """ + + self.assistant.tracked_tasks.append(self) + +class WaitTimeTrigger(): + def __init__(self, wait_duration, lifespan = 1): + + self.build() + self.wait_duration = wait_duration + self.lifespan = lifespan + + def check(self): + """ + returns `True` if trigger condition is met. + """ + + return True if time() - self.wait_duration >= self.wait_duration else False + + def build(self): + """ + Builds trigger condition. + """ + + self.start_time = time() + +class TriggerLifespanException(Exception): + def __init__(self): + self.message = "Lifespan of a trigger was spent." + +class PluginSupport(Enum): + """ + `supported` Enabled; Stable. Plugin is compatible with the assistant.\n + `supported_unknown_future` Enabled; Potentially Unstable. Plugin support is unknown; no max version was set in its properties.\n + `unsupported_new` Disabled; Stable. Plugin is too new; will not function with the assistant.\n + `unsupported_old` Disabled; Stable. Plugin is too old and will not function with the assistant.\n + `overridden` Enabled; Potentially Unstable. Only available if plugin is determined as `unsupported_old` so that if max version set and unmaintained, can be made to work. 
+ """ + + supported = None + supported_unknown_future = None + unsupported_new = None + unsupported_old = None + overridden = None diff --git a/src/cutie_assistant/utils.py b/src/cutie_assistant/utils.py new file mode 100644 index 0000000..b180198 --- /dev/null +++ b/src/cutie_assistant/utils.py @@ -0,0 +1,143 @@ +#!~/.pyenv/versions/3.11.6/bin/python +# +# Copyright (c) 2024 Cutieguwu | Olivia Brooks +# +# -*- coding: utf-8 -*- +# @Title: CutieAssistant System Utilities +# @Author: Cutieguwu | Olivia Brooks +# @Description: Some utilities for running CutieAssistant +# +# @Script: system_utils.py +# @Date Created: 20 Jul, 2024 +# @Last Modified: 24 Jul, 2024 +# @Last Modified by: Cutieguwu | Olivia Brooks +# -------------------------------------------- + +from pkg_resources import working_set +from subprocess import run, CalledProcessError +from sys import executable +from icecream import ic +from os import cpu_count +from speech_recognition.audio import get_flac_converter +from json import load +from importlib import import_module +from pkgutil import iter_modules +import plugins + + +def install_dependencies(dependencies:set): + """ + Tries to install and import any missing dependencies from set. + """ + + libraries_installed = { + pkg.key for pkg in working_set + } + + libraries_missing = list(dependencies - libraries_installed) # Lists are faster to iterate over due to lack of hash table. + + try: + library = "pip" + if len(libraries_missing) != 0: + run([executable, "-m", "pip", "install", "--upgrade", "pip"], check=True) + for library in libraries_missing: + run([executable, "-m", "pip", "install", library], check=True) + + except CalledProcessError: + print(f"Error | Cannot find or install {library}") + raise SystemExit + +def contains_keywords(keywords:list, query:str) -> bool: + """ + Checks if a string contains one of the given keywords. 
+ """ + + for k in keywords: + if f" {k} " in f" {query} ": + return True + + return False + +def clean_query(query:str) -> str: + """ + Cleans a query. + """ + + query_clean = "" + + for c in query.lower(): + if c.isalpha() or c == " ": + query_clean = query_clean + c + + return query_clean + +def run_command(command_list:list): + """ + Runs a command and returns its output. + """ + + try: + return run(command_list, capture_output=True, check=True).stdout.decode() + except Exception as err: + ic(f"Error | {command_list} raised {err}") + return err + +def get_threads() -> int: + """ + Gets the number of threads on the system. + If `os.cpu_count` returns `None`, sets thread count to `1`. + """ + + threads = cpu_count() + + return threads if threads is not None else 1 + +def convert_to_flac(source_path:str): + """ + Converts an audio file to flac. + Deletes original. + """ + + run_command( + [ + get_flac_converter(), + "--delete-input-file", + "--best", + source_path + ] + ) + +def get_response_map(service:str, response:str) -> dict: + """ + Returns a `dict` of response commonality. + """ + + with open("assets/service_response_map.json", "r") as f: + response_map = load(f)[service] + + response_map["service"] = service + response_map["response"] = response + + return response_map + +def get_audio_file_name(response_map) -> str: + """ + Names and formats and audio file name. + """ + + file_name_response = "" + + for c in response_map["response"]: + if c.isalpha(): + file_name_response = file_name_response + c.lower() + else: + file_name_response = file_name_response + "-" + + return f'{response_map["service"].upper()}_{file_name_response}' + +def _load_plugins(assistant): + """ + Loads all discovered plugins. 
+ """ + + assistant.plugins = {import_module(name).Plugin(assistant) for finder, name, ispkg in iter_modules(plugins.__path__, plugins.__name__ + ".")} \ No newline at end of file diff --git a/src/main.py b/src/main.py new file mode 100644 index 0000000..760ba89 --- /dev/null +++ b/src/main.py @@ -0,0 +1,23 @@ +#!~/.pyenv/versions/3.11.6/bin/python +# +# Copyright (c) 2024 Cutieguwu | Olivia Brooks +# +# -*- coding: utf-8 -*- +# @Title: Personal Assistant +# @Author: Cutieguwu | Olivia Brooks +# @Description: Personal Assistant. +# +# @Script: main.py +# @Date Created: 22 Jul, 2024 +# @Last Modified: 22 Jul, 2024 +# @Last Modified by: Cutieguwu | Olivia Brooks +# -------------------------------------------- + +from cutie_assistant import Assistant + +if __name__ == "__main__": + try: + assistant = Assistant() + assistant.run() + except KeyboardInterrupt: + pass \ No newline at end of file