""" Telematrix App service for Matrix to bridge a room with a Telegram group. """ import asyncio import html import json import logging import mimetypes from datetime import datetime from time import time from urllib.parse import unquote, quote, urlparse, parse_qs from aiohttp import web, ClientSession from aiotg import Bot from bs4 import BeautifulSoup import database as db # Read the configuration file try: with open('config.json', 'r') as config_file: CONFIG = json.load(config_file) HS_TOKEN = CONFIG['tokens']['hs'] AS_TOKEN = CONFIG['tokens']['as'] TG_TOKEN = CONFIG['tokens']['telegram'] try: GOOGLE_TOKEN = CONFIG['tokens']['google'] except KeyError: GOOGLE_TOKEN = None MATRIX_HOST = CONFIG['hosts']['internal'] MATRIX_HOST_EXT = CONFIG['hosts']['external'] MATRIX_HOST_BARE = CONFIG['hosts']['bare'] MATRIX_PREFIX = MATRIX_HOST + '_matrix/client/r0/' MATRIX_MEDIA_PREFIX = MATRIX_HOST + '_matrix/media/r0/' USER_ID_FORMAT = CONFIG['user_id_format'] TELEGRAM_CHATS = CONFIG['chats'] MATRIX_ROOMS = {v: k for k, v in TELEGRAM_CHATS.items()} DATABASE_URL = CONFIG['db_url'] except (OSError, IOError) as exception: print('Error opening config file:') print(exception) exit(1) GOO_GL_URL = 'https://www.googleapis.com/urlshortener/v1/url' TG_BOT = Bot(api_token=TG_TOKEN) MATRIX_SESS = ClientSession() SHORTEN_SESS = ClientSession() def create_response(code, obj): """ Create an HTTP response with a JSON body. :param code: The status code of the response. :param obj: The object to serialize and include in the response. :return: A web.Response. """ return web.Response(text=json.dumps(obj), status=code, content_type='application/json', charset='utf-8') VALID_TAGS = ['b', 'strong', 'i', 'em', 'a', 'pre'] def sanitize_html(string): """ Sanitize an HTML string for the Telegram bot API. :param string: The HTML string to sanitized. :return: The sanitized HTML string. """ string = string.replace('
', '\n').replace('
', '\n') \ .replace('
', '\n') soup = BeautifulSoup(string, 'html.parser') for tag in soup.find_all(True): if tag.name == 'blockquote': tag.string = ('\n' + tag.text).replace('\n', '\n> ').rstrip('\n>') if tag.name not in VALID_TAGS: tag.hidden = True return soup.renderContents().decode('utf-8') def format_matrix_msg(form, username, content): """ Formats a matrix message for sending to Telegram :param form: The format string of the message, where the first parameter is the username and the second one the message. :param username: The username of the user. :param content: The content to be sent. :return: The formatted string. """ if 'format' in content and content['format'] == 'org.matrix.custom.html': sanitized = sanitize_html(content['formatted_body']) return html.escape(form).format(username, sanitized), 'HTML' else: return form.format(username, content['body']), None async def download_matrix_file(url, filename): """ Download a file from an MXC URL to /tmp/{filename} :param url: The MXC URL to download from. :param filename: The filename in /tmp/ to download into. """ m_url = MATRIX_MEDIA_PREFIX + 'download/{}{}'.format(url.netloc, url.path) async with MATRIX_SESS.get(m_url) as response: data = await response.read() with open('/tmp/{}'.format(filename), 'wb') as file: file.write(data) async def shorten_url(url): """ Shorten an URL using goo.gl. Returns the original URL if it fails. :param url: The URL to shorten. :return: The shortened URL. """ if not GOOGLE_TOKEN: return url headers = {'Content-Type': 'application/json'} async with SHORTEN_SESS.post(GOO_GL_URL, params={'key': GOOGLE_TOKEN}, data=json.dumps({'longUrl': url}), headers=headers) \ as response: obj = await response.json() if 'id' in obj: return obj['id'] else: return url def matrix_is_telegram(user_id): username = user_id.split(':')[0][1:] return username.startswith('telegram_') def get_username(user_id): return user_id.split(':')[0][1:] async def matrix_transaction(request): """ Handle a transaction sent by the homeserver. :param request: The request containing the transaction. :return: The response to send. """ body = await request.json() events = body['events'] for event in events: print(event) if event['type'] == 'm.room.aliases': aliases = event['content']['aliases'] links = db.session.query(db.ChatLink)\ .filter_by(matrix_room=event['room_id']).all() for link in links: db.session.delete(link) for alias in aliases: print(alias) if alias.split('_')[0] != '#telegram' \ or alias.split(':')[-1] != MATRIX_HOST_BARE: continue tg_id = alias.split('_')[1].split(':')[0] link = db.ChatLink(event['room_id'], tg_id, True) db.session.add(link) db.session.commit() continue link = db.session.query(db.ChatLink)\ .filter_by(matrix_room=event['room_id']).first() if not link: print('{} isn\'t linked!'.format(event['room_id'])) continue group = TG_BOT.group(link.tg_room) if event['type'] == 'm.room.message': user_id = event['user_id'] if matrix_is_telegram(user_id): continue sender = db.session.query(db.MatrixUser)\ .filter_by(matrix_id=user_id).first() if not sender: response = await matrix_get('client', 'profile/{}/displayname' .format(user_id), None) try: displayname = response['displayname'] except KeyError: displayname = get_username(user_id) sender = db.MatrixUser(user_id, displayname) db.session.add(sender) else: displayname = sender.name or get_username(user_id) content = event['content'] if content['msgtype'] == 'm.text': msg, mode = format_matrix_msg('<{}> {}', displayname, content) await group.send_text(msg, parse_mode=mode) elif content['msgtype'] == 'm.notice': msg, mode = format_matrix_msg('[{}] {}', displayname, content) await group.send_text(msg, parse_mode=mode) elif content['msgtype'] == 'm.emote': msg, mode = format_matrix_msg('* {} {}', displayname, content) await group.send_text(msg, parse_mode=mode) elif content['msgtype'] == 'm.image': url = urlparse(content['url']) await download_matrix_file(url, content['body']) with open('/tmp/{}'.format(content['body']), 'rb') as img_file: url_str = MATRIX_HOST_EXT + \ '_matrix/media/r0/download/{}{}' \ .format(url.netloc, quote(url.path)) url_str = await shorten_url(url_str) caption = '<{}> {} ({})'.format(displayname, content['body'], url_str) await group.send_photo(img_file, caption=caption) else: print('Unsupported message type {}'.format(content['msgtype'])) print(json.dumps(content, indent=4)) elif event['type'] == 'm.room.member': if matrix_is_telegram(event['state_key']): continue user_id = event['state_key'] content = event['content'] sender = db.session.query(db.MatrixUser)\ .filter_by(matrix_id=user_id).first() if sender: displayname = sender.name else: displayname = get_username(user_id) if content['membership'] == 'join': displayname = content['displayname'] or get_username(user_id) if not sender: sender = db.MatrixUser(user_id, displayname) else: sender.name = displayname db.session.add(sender) msg = '> {} has joined the room'.format(displayname) await group.send_text(msg) elif content['membership'] == 'leave': msg = '< {} has left the room'.format(displayname) await group.send_text(msg) elif content['membership'] == 'ban': msg = '{}'.format(x) for x in message.split('\n')]) quoted_msg = 'Forwarded from {}:\n{}' \ .format(msg_from, quoted_msg) quoted_html = '
{}
' \ .format(html.escape(message).replace('\n', '
')) quoted_html = 'Forwarded from {}:\n{}' \ .format(html.escape(msg_from), quoted_html) j = await send_matrix_message(room_id, user_id, txn_id, body=quoted_msg, formatted_body=quoted_html, format='org.matrix.custom.html', msgtype='m.text') elif 'reply_to_message' in chat.message: re_msg = chat.message['reply_to_message'] if 'last_name' in re_msg['from']: msg_from = '{} {} (Telegram)'.format(re_msg['from']['first_name'], re_msg['from']['last_name']) else: msg_from = '{} (Telegram)'.format(re_msg['from']['first_name']) date = datetime.fromtimestamp(re_msg['date']) \ .strftime('%Y-%m-%d %H:%M:%S') quoted_msg = '\n'.join(['>{}'.format(x) for x in re_msg['text'].split('\n')]) quoted_msg = 'Reply to {} ({}):\n{}\n\n{}' \ .format(msg_from, date, quoted_msg, message) html_message = html.escape(message).replace('\n', '
') quoted_html = '
{}
' \ .format(html.escape(re_msg['text']) .replace('\n', '
')) quoted_html = 'Reply to {} ({}):
{}

{}

' \ .format(html.escape(msg_from), html.escape(str(date)), quoted_html, html_message) j = await send_matrix_message(room_id, user_id, txn_id, body=quoted_msg, formatted_body=quoted_html, format='org.matrix.custom.html', msgtype='m.text') else: j = await send_matrix_message(room_id, user_id, txn_id, body=message, msgtype='m.text') if 'errcode' in j and j['errcode'] == 'M_FORBIDDEN': await asyncio.sleep(0.1) await register_join_matrix(chat, room_id, user_id) await asyncio.sleep(0.1) await send_matrix_message(room_id, user_id, txn_id, body=message, msgtype='m.text') def main(): """ Main function to get the entire ball rolling. """ logging.basicConfig(level=logging.WARNING) db.initialize(DATABASE_URL) loop = asyncio.get_event_loop() asyncio.ensure_future(TG_BOT.loop()) app = web.Application(loop=loop) app.router.add_route('GET', '/rooms/{room_alias}', matrix_room) app.router.add_route('PUT', '/transactions/{transaction}', matrix_transaction) web.run_app(app, port=5000) if __name__ == "__main__": main()