plugable-matrix-bot/matrix_bot_api/matrix_bot_api.py

300 lines
9.8 KiB
Python

import traceback
import re
import os
import sys
import hjson
import importlib
import sqlite3
import datetime as dt
from pip._internal import main as pipmain
from matrix_client.api import MatrixRequestError
from matrix_bot_api.mregex_handler import MRegexHandler
from .custom_matrix_client.api import CustomMatrixHttpApi
from .custom_matrix_client.client import CustomMatrixClient
ALIASES_LOCATION = 'aliases.hjson'
CONFIG_LOCATION = 'config.hjson'
DATA_LOCATION = 'data/bot.db'
MESSAGES_DIR = os.path.join(os.path.dirname(__file__), 'messages')
HELP_LOCATION = MESSAGES_DIR + '/help'
def eprint(*args, **kwargs):
"""Print error messages to stderr"""
print(*args, file=sys.stderr, **kwargs)
class MatrixBotAPI:
# rooms - List of rooms ids to operate in, or None to accept all rooms
def __init__(self, rooms=None):
# Load the configuration
with open(CONFIG_LOCATION) as hjson_data:
self.config = hjson.load(hjson_data)
# Load the aliases
with open(ALIASES_LOCATION) as hjson_data:
self.aliases = hjson.load(hjson_data)
print(self.aliases)
self.username = self.config['bot_credentials']['username']
# Authenticate with given credentials
self.client = CustomMatrixClient(
self.config['bot_credentials']['server'])
try:
self.token = self.client.login_with_password(
self.config['bot_credentials']['username'],
self.config['bot_credentials']['password'])
except MatrixRequestError as e:
print(e)
if e.code == 403:
print("Bad username/password")
sys.exit()
except Exception as e:
print("Invalid server URL")
traceback.print_exc()
sys.exit()
# Store allowed rooms
self.rooms = rooms
# Store empty list of handlers
self.handlers = []
# If rooms is None, we should listen for invites and automatically
# accept them
if rooms is None:
self.client.add_invite_listener(self.handle_invite)
self.rooms = []
# Add all rooms we're currently in to self.rooms and add their
# callbacks
for room_id, room in self.client.rooms.items():
room.add_listener(self.handle_message)
self.rooms.append(room_id)
else:
# Add the message callback for all specified rooms
for room in self.rooms:
room.add_listener(self.handle_message)
# This flag can be set by the calling function to cancel all threads
# of all plugins
self.cancel = False
# Run setup
self.setup()
# Add plugins
self.add_plugins()
# Add callback for help function
self.help_handler = MRegexHandler("help", self.help, self)
self.add_handler(self.help_handler)
def add_plugins(self):
"""Acquire list of plugins from configuration, load them,
initialize them, and add them to a list of object
"""
# Store empty list of plugins
self.plugin_objects = []
# Try to import plugins. All plugins must be located in the
# ./plugins directory
modules = []
try:
# Loop through the available plugins, install their requirements,
# load them as module, run their setup, append them to a list,
# and add their handler variables
for i, plugin in enumerate(self.config['plugins']):
# Install requirements
self.install_requirements(plugin)
# Dynamically load the module
modules.append(
importlib.import_module(
"plugins.{0}.{0}".format(plugin), package = None))
# Run the module's setup function and save that it got installed
self.setup_plugin(modules[i])
# Create new instance of plugin and append to plugin_objects array
self.plugin_objects.append(modules[i].Plugin(self))
# Add handler of newly created instance to bot
self.add_handler(self.plugin_objects[i].handler)
except:
eprint("Importing one or more of the plugins did not go well!")
traceback.print_exc()
sys.exit()
def check_installation_plugin(self, module_name):
"""Function returns 0 if a plugin with that name is not
currently installed
"""
# Define query to SELECT event from SQLite DB
select_sql = f"""SELECT module_name
FROM plugins
WHERE module_name = "{module_name}" """
# Open SQLite database and run query
sqlite_db = sqlite3.connect(DATA_LOCATION)
sqlite_cursor = sqlite_db.cursor()
sqlite_cursor.execute(select_sql)
number_fetched = sqlite_cursor.fetchall()
sqlite_db.close()
return number_fetched
def install_requirements(self, module_name):
"""Install requirements for a given plugin."""
if self.check_installation_plugin(module_name):
# Do nothing, this was already installed
print(f"Check: {module_name} already installed.")
else:
# Install requirements
pipmain(
['install',
'-r',
f'plugins/{module_name}/requirements.txt'])
def setup_plugin(self, module):
"""Run the setup for a given plugin and subsequently save that
this plugin was installed.
"""
module_name = module.__name__.split('.')[-1]
if not self.check_installation_plugin(module_name):
# This appears to be new, insert it, process requirements.txt
# and run the plugin's setup file.
print(f"Running installation of {module_name}.")
# Run module's install method
try:
module.setup()
except:
print(f"{module_name} did not specify setup(). Skipping...")
# Save in database that we installed this plugin
datetime_added = dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# Open SQLite database
insert_sql = f"""INSERT INTO plugins(module_name, datetime_added)
VALUES('{module_name}', '{datetime_added}')"""
sqlite_db = sqlite3.connect(DATA_LOCATION)
sqlite_cursor = sqlite_db.cursor()
sqlite_cursor.execute(insert_sql)
sqlite_db.commit()
sqlite_db.close()
# Tell the user how awesome we are
print(f"Successfully installed {module_name}.")
def setup(self):
"""This function only runs once, ever. It initializes an SQLite
database, sets the bot's avatar, and name
"""
try:
# Try to make new directory
os.mkdir("data")
# Create SQLite database
# Define query to INSERT event table to SQLite DB
sql = """CREATE TABLE 'plugins' (
'id' INTEGER PRIMARY KEY AUTOINCREMENT,
'datetime_added' DATTIME,
'module_name' TEXT);"""
# Open, execute, commit, and close SQLite database
sqlite_db = sqlite3.connect(DATA_LOCATION)
sqlite_cursor = sqlite_db.cursor()
sqlite_cursor.execute(sql)
sqlite_db.commit()
sqlite_db.close()
# Get user instance
self.user = self.client.get_user(self.username)
## Set username
self.user.set_display_name(self.config['character']['name'])
## Open, upload, and set avatar
f = open(self.config['character']['avatar'], mode="rb")
avatar = self.client.upload(
f, self.config['character']['avatar_mime'])
self.user.set_avatar_url(avatar)
except FileExistsError:
# Do nothing, data directory already exists
pass
def help(self, room, event):
"""Prints a general help message and then grabs all help
messages from the different plugins
"""
help_text = open(HELP_LOCATION, mode="r").read()
for plugin in self.plugin_objects:
try:
help_text += plugin.help()
except TypeError:
# The plugin probably returned 0, ignore it
pass
self.client.send_message_private_public(room, event, help_text)
def add_handler(self, handler):
try:
# Assume it's a list and not a single handler
for handler_obj in handler:
self.handlers.append(handler_obj)
except TypeError:
# If it is not a list, TypeError: not iterable will occur
self.handlers.append(handler)
def handle_message(self, room, event):
# Make sure we didn't send this message
if re.match(self.username, event['sender']):
return
# Loop through all installed handlers and see if they need to be called
for handler in self.handlers:
if handler.test_callback(room, event):
# This handler needs to be called
try:
handler.handle_callback(room, event)
except:
traceback.print_exc()
def handle_invite(self, room_id, state):
print("Got invite to room: " + str(room_id))
print("Joining...")
room = self.client.join_room(room_id)
# Add message callback for this room
room.add_listener(self.handle_message)
# Add room to list
self.rooms.append(room)
def start_polling(self):
# Starts polling for messages
self.client.start_listener_thread()
return self.client.sync_thread