Batch querying every 10 min and processing new messages

This commit is contained in:
IamTheFij 2018-02-08 09:36:09 -08:00
parent 814f57a2e4
commit 563e75f2b6
2 changed files with 61 additions and 15 deletions

View File

@ -2,10 +2,13 @@ from datetime import date
from datetime import datetime
from datetime import timedelta
from getpass import getpass
from time import sleep
import email
import json
import os
from dateutil import parser
from dateutil.tz import tzutc
from imbox import Imbox
import requests
@ -23,6 +26,7 @@ class MailCrawler(object):
self.imap_pass = os.environ['IMAP_PASS']
def get_parsers(self):
"""Retrieves a list of parser hosts"""
if self.parser_hosts is None:
self.parser_hosts = []
parser_format = 'PARSER_{}'
@ -36,6 +40,7 @@ class MailCrawler(object):
return self.parser_hosts
def parse_message(self, message):
"""Parses tokens from an email message"""
text = self.get_email_text(message)
if not text:
print('No email text returned')
@ -57,6 +62,7 @@ class MailCrawler(object):
return results
def get_server(self):
"""Returns an active IMAP server"""
return Imbox(
self.imap_url,
username=self.imap_user,
@ -65,13 +71,15 @@ class MailCrawler(object):
)
def get_email_text(self, message):
"""Retrieves the text body of an email message"""
body = message.body.get('plain') or message.body.get('html')
if not body:
return None
# Concat all known body content together since it doesn't really matter
return ''.join([text for text in body if isinstance(text, str)])
def index_message(self, message):
def index_token(self, message):
"""Sends a token from the parser to the indexer"""
response = requests.post(
self.indexer_host+'/token',
json=message,
@ -79,24 +87,61 @@ class MailCrawler(object):
response.raise_for_status()
return response.json()
def process_message(self, message):
"""Process a single email message"""
for result in self.parse_message(message):
result.update({
'subject': message.subject,
})
print('Parsed result: ', result)
print('Indexed result: ', self.index_token(result))
def process_messages(self, server, since_date, last_message=0):
for uid, message in server.messages(date__gt=since_date):
uid = int(uid)
if uid <= last_message:
print('DDB Already seen message with uid {}. Skipping'.format(uid))
continue
print(
'Processing message uid {} message_id {} '
'with subject "{}"'.format(
uid, message.message_id, message.subject
)
)
self.process_message(message)
# Update since_date
message_date = parser.parse(message.date)
print('DDB Processed message. Message date: {} Old date: {}'.format(
message_date, since_date
))
since_date = max(since_date, message_date)
print('DDB Since date is now ', since_date)
last_message = max(uid, last_message)
return since_date, last_message
def run(self):
print('Starting crawler')
# TODO: Put server into some kind of context manager and property
with self.get_server() as server:
since_date = datetime.now() - timedelta(days=30)
for uid, message in server.messages(date__gt=since_date):
print(
'Processing message uid {} message_id {} '
'with subject "{}"'.format(
uid, message.message_id, message.subject
)
# TODO: parameterize startup date, maybe relative
since_date = datetime.now(tzutc()) - timedelta(days=1)
last_message = 0
while True:
print('Lets process')
since_date, last_message = self.process_messages(
server,
since_date,
last_message=last_message
)
for result in self.parse_message(message):
result.update({
'subject': message.subject,
})
print('Parsed result: ', result)
print('Indexed result: ', self.index_message(result))
print('DDB Processed all. New since_date', since_date)
# TODO: parameterize sleep
# Sleep for 10 min
sleep(10 * 60)
if __name__ == '__main__':

View File

@ -1,2 +1,3 @@
python-dateutil
imbox
requests