plugable-matrix-bot/matrix_bot_api/matrix_bot_api.py

331 lines
11 KiB
Python

import traceback
import re
import os
import sys
import hjson
import importlib
import sqlite3
import datetime as dt
from pip._internal import main as pipmain
from matrix_client.client import MatrixClient
from matrix_client.api import MatrixRequestError, MatrixHttpApi
from matrix_client.user import User
from matrix_bot_api.mregex_handler import MRegexHandler
# Directory containing this module; every bundled resource path hangs off it.
_MODULE_DIR = os.path.dirname(__file__)
# Folder holding the general help text and the message templates.
MESSAGES_DIR = os.path.join(_MODULE_DIR, 'messages')
# SQLite database recording which plugins have already been installed.
DATA_LOCATION = os.path.join(_MODULE_DIR, '../data/bot.db')
# General help text; MatrixBotAPI.help() appends each plugin's help to it.
HELP_LOCATION = f'{MESSAGES_DIR}/help'
# HJSON file with the message templates loaded into MatrixBotAPI.messages.
MESSAGES_LOCATION = f'{MESSAGES_DIR}/messages.dutch.hjson'
def eprint(*args, **kwargs):
    """Write a message to standard error.

    Accepts the same positional and keyword arguments as the built-in
    ``print``, except ``file``, which is always ``sys.stderr``.
    """
    error_stream = sys.stderr
    print(*args, file=error_stream, **kwargs)
class MatrixBotAPI:
    """Matrix bot that dispatches room events to pluggable handlers.

    On construction the bot logs in, registers message listeners on its
    rooms, performs a one-time setup (SQLite bookkeeping database, display
    name, avatar), loads the plugins named in the configuration, and
    registers a help handler.
    """

    def __init__(self, config, rooms=None):
        """Log in and wire up listeners, plugins and the help handler.

        Args:
            config: Mapping providing the keys read by this class:
                ``bot_credentials`` (``username``, ``password``,
                ``server``), ``triggers`` (``help``), ``plugins`` and
                ``character`` (``name``, ``avatar``, ``avatar_mime``).
            rooms: Rooms to operate in, or ``None`` to listen in every
                room the bot is in and to accept all invites.

        Raises:
            MatrixRequestError: If the homeserver rejects the login.
            Exception: Any other error raised while logging in is
                re-raised after a hint has been printed.
        """
        self.config = config
        credentials = self.config['bot_credentials']
        self.username = credentials['username']

        # Authenticate with given credentials
        self.client = MatrixClient(credentials['server'])
        try:
            self.token = self.client.login_with_password(
                credentials['username'], credentials['password'])
        except MatrixRequestError as e:
            print(e)
            if e.code == 403:
                print("Bad username/password")
            # Without a token nothing below can work; surface the real
            # error instead of failing later on a missing attribute.
            raise
        except Exception:
            print("Invalid server URL")
            traceback.print_exc()
            raise

        self.api = MatrixHttpApi(credentials['server'], token=self.token)

        # Store allowed rooms
        self.rooms = rooms

        # Store empty list of handlers
        self.handlers = []

        if rooms is None:
            # No explicit room list: accept every invite and listen in all
            # rooms we are already a member of.
            self.client.add_invite_listener(self.handle_invite)
            self.rooms = []
            for room_id, room in self.client.get_rooms().items():
                room.add_listener(self.handle_message)
                self.rooms.append(room_id)
        else:
            # NOTE(review): this calls add_listener() on each entry, so the
            # entries must be room objects rather than plain room id
            # strings -- confirm against callers.
            for room in self.rooms:
                room.add_listener(self.handle_message)

        # This flag can be set by the calling function to cancel all threads
        # of all plugins
        self.cancel = False

        # Load the message templates before any plugin is constructed, so
        # self.messages is already set when Plugin(self) runs.
        with open(MESSAGES_LOCATION, encoding="utf-8") as hjson_data:
            self.messages = hjson.load(hjson_data)

        # Run one-time setup, then load the plugins
        self.setup()
        self.add_plugins()

        # Add callback for help function
        self.help_handler = MRegexHandler(
            self.config['triggers']['help'], self.help)
        self.add_handler(self.help_handler)

    def add_plugins(self):
        """Load, set up and instantiate every plugin listed in the config.

        For each name in ``config['plugins']`` the requirements are
        installed, the module ``plugins.<name>.<name>`` is imported, its
        setup is run if it was not installed before, a ``Plugin`` instance
        is created with this bot as argument, and the instance's
        ``handler`` attribute is registered. The instances are kept in
        ``self.plugin_objects``.
        """
        self.plugin_objects = []
        for plugin in self.config['plugins']:
            # Install requirements
            self.install_requirements(plugin)
            # Dynamically load the module
            module = importlib.import_module(
                "plugins.{0}.{0}".format(plugin))
            # Run the module's setup function and save that it got installed
            self.setup_plugin(module)
            # Create the plugin instance and register its handler(s)
            plugin_object = module.Plugin(self)
            self.plugin_objects.append(plugin_object)
            self.add_handler(plugin_object.handler)

    def check_installation_plugin(self, module_name):
        """Look up a plugin in the installation database.

        Args:
            module_name: Name of the plugin to look for.

        Returns:
            The list of matching rows from the ``plugins`` table; it is
            empty (falsy) when the plugin has not been installed yet.
        """
        sqlite_db = sqlite3.connect(DATA_LOCATION)
        try:
            # Bound parameter instead of string interpolation, so the name
            # cannot alter the query.
            rows = sqlite_db.execute(
                "SELECT module_name FROM plugins WHERE module_name = ?",
                (module_name,)).fetchall()
        finally:
            sqlite_db.close()
        return rows

    def install_requirements(self, module_name):
        """Install the pip requirements of a plugin unless already installed.

        Args:
            module_name: Name of the plugin; its requirements are read from
                ``plugins/<module_name>/requirements.txt`` relative to the
                current working directory.
        """
        if self.check_installation_plugin(module_name):
            # Do nothing, this was already installed
            print(f"Check: {module_name} already installed.")
            return

        requirements = f'plugins/{module_name}/requirements.txt'
        if not os.path.isfile(requirements):
            # Nothing to install; avoid invoking pip on a missing file.
            print(f"Check: {module_name} has no requirements.txt.")
            return

        # NOTE(review): pip._internal is not a supported programmatic API;
        # pip's documentation recommends running
        # `sys.executable -m pip` in a subprocess instead.
        pipmain(['install', '-r', requirements])

    def setup_plugin(self, module):
        """Run a plugin module's setup once and record the installation.

        Args:
            module: Imported plugin module; its ``setup()`` function is
                called when the plugin is not yet in the database.
        """
        module_name = module.__name__.split('.')[-1]
        if self.check_installation_plugin(module_name):
            return

        print(f"Running installation of {module_name}.")
        # Run module's install method
        module.setup()

        # Save in database that we installed this plugin
        datetime_added = dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        sqlite_db = sqlite3.connect(DATA_LOCATION)
        try:
            sqlite_db.execute(
                "INSERT INTO plugins(module_name, datetime_added) "
                "VALUES(?, ?)",
                (module_name, datetime_added))
            sqlite_db.commit()
        finally:
            sqlite_db.close()

        # Tell the user how awesome we are
        print(f"Successfully installed {module_name}.")

    def setup(self):
        """Perform the one-time initialisation of the bot.

        Creating the data directory doubles as the "first run" marker: when
        it already exists nothing is done. Otherwise the ``plugins`` table
        is created and the bot's display name and avatar are set from
        ``config['character']``. If a later step fails the directory stays
        in place, so the remaining steps are not retried on the next run.
        """
        try:
            # Create the directory that actually holds the database, so the
            # marker and the connection below refer to the same place.
            os.mkdir(os.path.dirname(DATA_LOCATION))
        except FileExistsError:
            # Do nothing, data directory already exists
            return

        sql = """CREATE TABLE 'plugins' (
            'id' INTEGER PRIMARY KEY AUTOINCREMENT,
            'datetime_added' DATETIME,
            'module_name' TEXT);"""
        sqlite_db = sqlite3.connect(DATA_LOCATION)
        try:
            sqlite_db.execute(sql)
            sqlite_db.commit()
        finally:
            sqlite_db.close()

        # Get user instance and set the display name
        self.user = self.client.get_user(self.username)
        self.user.set_display_name(self.config['character']['name'])

        # Open, upload, and set avatar
        with open(self.config['character']['avatar'], mode="rb") as f:
            avatar = self.client.upload(
                f, self.config['character']['avatar_mime'])
        self.user.set_avatar_url(avatar)

    def send_message_private_public(self, room, event, message):
        """Send an HTML message to the event's sender in a private room.

        The first room with exactly two joined members that includes the
        sender is used; if there is none, a new room is created with the
        sender invited. When the message ends up in a different room than
        the one the event came from, a short notice is posted in the
        original room.

        Args:
            room: Room the triggering event was received in.
            event: Event dict; its ``sender`` key is read.
            message: HTML text to send.
        """
        sender = event['sender']
        private_room = None
        for candidate in self.client.get_rooms().values():
            joined_members = candidate.get_joined_members()
            # Only rooms with exactly two members count as private
            if len(joined_members) != 2:
                continue
            if any(member.user_id == sender for member in joined_members):
                private_room = candidate
                break

        if private_room is None:
            private_room = self.client.create_room(invitees=[sender])
        private_room.send_html(message)

        if private_room != room:
            # Tell the original room that the answer was sent privately
            display_name_sender = self.api.get_display_name(sender)
            room.send_text(self.messages['private_message'].format(
                display_name_sender))

    def help(self, room, event):
        """Send the general help text followed by every plugin's help.

        Args:
            room: Room the help request was received in.
            event: Event dict of the help request.
        """
        with open(HELP_LOCATION, mode="r", encoding="utf-8") as help_file:
            help_text = help_file.read()
        for plugin in self.plugin_objects:
            try:
                help_text += plugin.help()
            except TypeError:
                # The plugin probably returned 0, ignore it
                pass
        self.send_message_private_public(room, event, help_text)

    def add_handler(self, handler):
        """Register one handler or an iterable of handlers.

        Args:
            handler: A single handler object, or an iterable of them.
        """
        try:
            # Assume it's a list and not a single handler
            handler_iterator = iter(handler)
        except TypeError:
            # Not iterable: it is a single handler
            self.handlers.append(handler)
        else:
            self.handlers.extend(handler_iterator)

    def handle_message(self, room, event):
        """Dispatch a room event to every handler that claims it.

        Events whose sender starts with the bot's own username are ignored.
        Exceptions raised by a handler are printed and do not stop the
        remaining handlers.

        Args:
            room: Room the event was received in.
            event: Event dict; its ``sender`` key is read.
        """
        # Make sure we didn't send this message. A literal prefix test is
        # used so characters such as '.' or '+' in the username are not
        # interpreted as regular-expression syntax.
        if event['sender'].startswith(self.username):
            return

        # Loop through all installed handlers and see if they need to be called
        for handler in self.handlers:
            if handler.test_callback(room, event):
                # This handler needs to be called
                try:
                    handler.handle_callback(room, event)
                except Exception:
                    traceback.print_exc()

    def handle_invite(self, room_id, state):
        """Join a room the bot was invited to and start listening in it.

        Args:
            room_id: Identifier of the room the invite is for.
            state: Invite state passed by the client; unused.
        """
        print("Got invite to room: " + str(room_id))
        print("Joining...")
        room = self.client.join_room(room_id)

        # Add message callback for this room
        room.add_listener(self.handle_message)

        # Track the room by id, matching what __init__ stores for the rooms
        # the bot was already in.
        self.rooms.append(room_id)

    def start_polling(self):
        """Start the client's listener thread and return it."""
        self.client.start_listener_thread()
        return self.client.sync_thread