plugable-matrix-bot/matrix_bot_api/matrix_bot_api.py
Dennis eb2b887c0a Transitioned from JSON to Hjson. Closes #3
All configuration files and all message files should now preferably be
written in Hjson. This format supports, among other features, multi-line
strings and comments, which greatly improves readability.

All installation and documentation files are also updated in this
commit.
2019-01-14 23:47:38 +01:00

331 lines
11 KiB
Python

import traceback
import re
import os
import sys
import hjson
import importlib
import sqlite3
import datetime as dt
from pip._internal import main as pipmain
from matrix_client.client import MatrixClient
from matrix_client.api import MatrixRequestError, MatrixHttpApi
from matrix_client.user import User
from matrix_bot_api.mregex_handler import MRegexHandler
# Directory (next to this module) that holds the help text and message files.
MESSAGES_DIR = os.path.join(os.path.dirname(__file__), 'messages')
# SQLite database file, located in a "data" directory beside this package.
DATA_LOCATION = os.path.join(os.path.dirname(__file__), '../data/bot.db')
# General help text that is sent before the per-plugin help texts.
HELP_LOCATION = os.path.join(MESSAGES_DIR, 'help')
# Hjson file with the bot's own user-facing messages.
MESSAGES_LOCATION = os.path.join(MESSAGES_DIR, 'messages.dutch.hjson')
def eprint(*args, **kwargs):
    """Write the given values to standard error.

    Accepts the same positional and keyword arguments as the built-in
    ``print`` (``sep``, ``end``, ``flush``); the output stream is always
    ``sys.stderr`` as it is bound at call time.
    """
    error_stream = sys.stderr
    print(*args, file=error_stream, **kwargs)
class MatrixBotAPI:
# rooms - List of rooms ids to operate in, or None to accept all rooms
def __init__(self, config, rooms=None):
self.config = config
self.username = self.config['bot_credentials']['username']
# Authenticate with given credentials
self.client = MatrixClient(self.config['bot_credentials']['server'])
try:
self.token = self.client.login_with_password(
self.config['bot_credentials']['username'],
self.config['bot_credentials']['password'])
except MatrixRequestError as e:
print(e)
if e.code == 403:
print("Bad username/password")
except Exception as e:
print("Invalid server URL")
traceback.print_exc()
self.api = MatrixHttpApi(
self.config['bot_credentials']['server'],
token=self.token)
# Store allowed rooms
self.rooms = rooms
# Store empty list of handlers
self.handlers = []
# If rooms is None, we should listen for invites and automatically
# accept them
if rooms is None:
self.client.add_invite_listener(self.handle_invite)
self.rooms = []
# Add all rooms we're currently in to self.rooms and add their
# callbacks
for room_id, room in self.client.get_rooms().items():
room.add_listener(self.handle_message)
self.rooms.append(room_id)
else:
# Add the message callback for all specified rooms
for room in self.rooms:
room.add_listener(self.handle_message)
# This flag can be set by the calling function to cancel all threads
# of all plugins
self.cancel = False
# Run setup
self.setup()
# Add plugins
self.add_plugins()
# Add callback for help function
self.help_handler = MRegexHandler(
self.config['triggers']['help'], self.help)
self.add_handler(self.help_handler)
# Load messages
with open(MESSAGES_LOCATION) as hjson_data:
self.messages = hjson.load(hjson_data)
def add_plugins(self):
"""Acquire list of plugins from configuration, load them,
initialize them, and add them to a list of object
"""
# Store empty list of plugins
self.plugin_objects = []
# Try to import plugins. All plugins must be located in the
# ./plugins directory
modules = []
#try:
# Loop through the available plugins, install their requirements,
# load them as module, run their setup, append them to a list,
# and add their handler variables
for i, plugin in enumerate(self.config['plugins']):
# Install requirements
self.install_requirements(plugin)
# Dynamically load the module
modules.append(
importlib.import_module(
"plugins.{0}.{0}".format(plugin), package = None))
# Run the module's setup function and save that it got installed
self.setup_plugin(modules[i])
# Create new instance of plugin and append to plugin_objects array
self.plugin_objects.append(modules[i].Plugin(self))
# Add handler of newly created instance to bot
self.add_handler(self.plugin_objects[i].handler)
#except:
# eprint("Importing one or more of the plugins did not go well!")
# sys.exit()
def check_installation_plugin(self, module_name):
"""Function returns 0 if a plugin with that name is not
currently installed
"""
# Define query to SELECT event from SQLite DB
select_sql = f"""SELECT module_name
FROM plugins
WHERE module_name = "{module_name}" """
# Open SQLite database and run query
sqlite_db = sqlite3.connect(DATA_LOCATION)
sqlite_cursor = sqlite_db.cursor()
sqlite_cursor.execute(select_sql)
number_fetched = sqlite_cursor.fetchall()
sqlite_db.close()
return number_fetched
def install_requirements(self, module_name):
"""Install requirements for a given plugin."""
if self.check_installation_plugin(module_name):
# Do nothing, this was already installed
print(f"Check: {module_name} already installed.")
else:
# Install requirements
pipmain(
['install',
'-r',
f'plugins/{module_name}/requirements.txt'])
def setup_plugin(self, module):
"""Run the setup for a given plugin and subsequently save that
this plugin was installed.
"""
module_name = module.__name__.split('.')[-1]
if not self.check_installation_plugin(module_name):
# This appears to be new, insert it, process requirements.txt
# and run the plugin's setup file.
print(f"Running installation of {module_name}.")
# Run module's install method
#try:
module.setup()
#except:
# print(f"{module_name} did not specify setup(). Skipping...")
# Save in database that we installed this plugin
datetime_added = dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# Open SQLite database
insert_sql = f"""INSERT INTO plugins(module_name, datetime_added)
VALUES('{module_name}', '{datetime_added}')"""
sqlite_db = sqlite3.connect(DATA_LOCATION)
sqlite_cursor = sqlite_db.cursor()
sqlite_cursor.execute(insert_sql)
sqlite_db.commit()
sqlite_db.close()
# Tell the user how awesome we are
print(f"Successfully installed {module_name}.")
def setup(self):
"""This function only runs once, ever. It initializes an SQLite
database, sets the bot's avatar, and name
"""
try:
# Try to make new directory
os.mkdir("data")
# Create SQLite database
# Define query to INSERT event table to SQLite DB
sql = """CREATE TABLE 'plugins' (
'id' INTEGER PRIMARY KEY AUTOINCREMENT,
'datetime_added' DATTIME,
'module_name' TEXT);"""
# Open, execute, commit, and close SQLite database
sqlite_db = sqlite3.connect(DATA_LOCATION)
sqlite_cursor = sqlite_db.cursor()
sqlite_cursor.execute(sql)
sqlite_db.commit()
sqlite_db.close()
# Get user instance
self.user = self.client.get_user(self.username)
## Set username
self.user.set_display_name(self.config['character']['name'])
## Open, upload, and set avatar
f = open(self.config['character']['avatar'], mode="rb")
avatar = self.client.upload(
f, self.config['character']['avatar_mime'])
self.user.set_avatar_url(avatar)
except FileExistsError:
# Do nothing, data directory already exists
pass
def send_message_private_public(self, room, event, message):
"""This method takes a room, event, and message and makes sure
that the message is sent in a private room and not in a public
room. If no private room exists, it will create a private room
with the sender of the event.
"""
orig_room = room
found_room = False
for room_id, room in self.client.get_rooms().items():
joined_members = room.get_joined_members()
# Check for rooms with only two members
if len(joined_members) == 2:
# Check if sender is in that room
for member in joined_members:
if event['sender'] == member.user_id:
found_room = True
if found_room:
# If the flag is set, we do not need to check further rooms
break
# Send help message to an existing room or to a new room
if found_room:
room.send_html(message)
else:
room = self.client.create_room(invitees=[event['sender']]);
room.send_html(message)
if room != orig_room:
display_name_sender = self.api.get_display_name(event['sender'])
orig_room.send_text(self.messages['private_message'].format(
display_name_sender))
def help(self, room, event):
"""Prints a general help message and then grabs all help
messages from the different plugins
"""
help_text = open(HELP_LOCATION, mode="r").read()
for plugin in self.plugin_objects:
try:
help_text += plugin.help()
except TypeError:
# The plugin probably returned 0, ignore it
pass
self.send_message_private_public(room, event, help_text)
def add_handler(self, handler):
try:
# Assume it's a list and not a single handler
for handler_obj in handler:
self.handlers.append(handler_obj)
except TypeError:
# If it is not a list, TypeError: not iterable will occur
self.handlers.append(handler)
def handle_message(self, room, event):
# Make sure we didn't send this message
if re.match(self.username, event['sender']):
return
# Loop through all installed handlers and see if they need to be called
for handler in self.handlers:
if handler.test_callback(room, event):
# This handler needs to be called
try:
handler.handle_callback(room, event)
except:
traceback.print_exc()
def handle_invite(self, room_id, state):
print("Got invite to room: " + str(room_id))
print("Joining...")
room = self.client.join_room(room_id)
# Add message callback for this room
room.add_listener(self.handle_message)
# Add room to list
self.rooms.append(room)
def start_polling(self):
# Starts polling for messages
self.client.start_listener_thread()
return self.client.sync_thread