From 563e75f2b6561672a565dfc5718c247b733b5f8a Mon Sep 17 00:00:00 2001
From: Ian Fijolek
Date: Thu, 8 Feb 2018 09:36:09 -0800
Subject: [PATCH] Batch querying every 10 min and processing new messages

---
 crawler/crawler/main.py  | 75 ++++++++++++++++++++++++++++++++--------
 crawler/requirements.txt |  1 +
 2 files changed, 61 insertions(+), 15 deletions(-)

diff --git a/crawler/crawler/main.py b/crawler/crawler/main.py
index f1947e1..3f3f56b 100644
--- a/crawler/crawler/main.py
+++ b/crawler/crawler/main.py
@@ -2,10 +2,13 @@ from datetime import date
 from datetime import datetime
 from datetime import timedelta
 from getpass import getpass
+from time import sleep
 
 import email
 import json
 import os
 
+from dateutil import parser
+from dateutil.tz import tzutc
 from imbox import Imbox
 import requests
 
@@ -23,6 +26,7 @@ class MailCrawler(object):
         self.imap_pass = os.environ['IMAP_PASS']
 
     def get_parsers(self):
+        """Retrieves a list of parser hosts"""
         if self.parser_hosts is None:
             self.parser_hosts = []
             parser_format = 'PARSER_{}'
@@ -36,6 +40,7 @@ class MailCrawler(object):
         return self.parser_hosts
 
     def parse_message(self, message):
+        """Parses tokens from an email message"""
         text = self.get_email_text(message)
         if not text:
             print('No email text returned')
@@ -57,6 +62,7 @@ class MailCrawler(object):
         return results
 
     def get_server(self):
+        """Returns an active IMAP server"""
         return Imbox(
             self.imap_url,
             username=self.imap_user,
@@ -65,13 +71,15 @@ class MailCrawler(object):
         )
 
     def get_email_text(self, message):
+        """Retrieves the text body of an email message"""
         body = message.body.get('plain') or message.body.get('html')
         if not body:
             return None
         # Concat all known body content together since it doesn't really matter
         return ''.join([text for text in body if isinstance(text, str)])
 
-    def index_message(self, message):
+    def index_token(self, message):
+        """Sends a token from the parser to the indexer"""
         response = requests.post(
             self.indexer_host+'/token',
             json=message,
@@ -79,24 +87,61 @@ class MailCrawler(object):
         response.raise_for_status()
         return response.json()
 
+    def process_message(self, message):
+        """Process a single email message"""
+        for result in self.parse_message(message):
+            result.update({
+                'subject': message.subject,
+            })
+            print('Parsed result: ', result)
+            print('Indexed result: ', self.index_token(result))
+
+
+    def process_messages(self, server, since_date, last_message=0):
+        for uid, message in server.messages(date__gt=since_date):
+            uid = int(uid)
+            if uid <= last_message:
+                print('DDB Already seen message with uid {}. Skipping'.format(uid))
+                continue
+
+            print(
+                'Processing message uid {} message_id {} '
+                'with subject "{}"'.format(
+                    uid, message.message_id, message.subject
+                )
+            )
+            self.process_message(message)
+
+            # Update since_date
+            message_date = parser.parse(message.date)
+            print('DDB Processed message. Message date: {} Old date: {}'.format(
+                message_date, since_date
+            ))
+            since_date = max(since_date, message_date)
+            print('DDB Since date is now ', since_date)
+            last_message = max(uid, last_message)
+
+        return since_date, last_message
+
+
     def run(self):
         print('Starting crawler')
-
+        # TODO: Put server into some kind of context manager and property
         with self.get_server() as server:
-            since_date = datetime.now() - timedelta(days=30)
-            for uid, message in server.messages(date__gt=since_date):
-                print(
-                    'Processing message uid {} message_id {} '
-                    'with subject "{}"'.format(
-                        uid, message.message_id, message.subject
-                    )
+            # TODO: parameterize startup date, maybe relative
+            since_date = datetime.now(tzutc()) - timedelta(days=1)
+            last_message = 0
+            while True:
+                print("Let's process")
+                since_date, last_message = self.process_messages(
+                    server,
+                    since_date,
+                    last_message=last_message
                 )
-                for result in self.parse_message(message):
-                    result.update({
-                        'subject': message.subject,
-                    })
-                    print('Parsed result: ', result)
-                    print('Indexed result: ', self.index_message(result))
+                print('DDB Processed all. New since_date', since_date)
+                # TODO: parameterize sleep
+                # Sleep for 10 min
+                sleep(10 * 60)
 
 
 if __name__ == '__main__':
diff --git a/crawler/requirements.txt b/crawler/requirements.txt
index ad6cbc5..893e7a4 100644
--- a/crawler/requirements.txt
+++ b/crawler/requirements.txt
@@ -1,2 +1,3 @@
+python-dateutil
 imbox
 requests
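
Note on the switch from datetime.now() to datetime.now(tzutc()): dateutil's
parser returns an offset-aware datetime for a typical Date header, and
Python 3 refuses to compare aware and naive datetimes, so since_date must be
aware for the max() bookkeeping in process_messages to work. A minimal
sketch of the failure mode (the sample header value is made up):

    from datetime import datetime

    from dateutil import parser
    from dateutil.tz import tzutc

    # Parsing an RFC 2822 Date header yields an offset-aware datetime.
    message_date = parser.parse('Thu, 8 Feb 2018 09:36:09 -0800')

    try:
        message_date > datetime.now()  # naive "now" on the right side
    except TypeError as err:
        print(err)  # can't compare offset-naive and offset-aware datetimes

    # An aware "now", like the crawler's since_date, compares cleanly.
    print(message_date > datetime.now(tzutc()))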
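Note on the dedup bookkeeping: process_messages tracks two high-water marks
because the date filter alone is too coarse (assuming imbox's date__gt maps
to IMAP SEARCH SINCE, which only has day granularity), so the current day's
messages come back on every poll and the UID check is what prevents
reprocessing them. A self-contained sketch of the pattern, where
process_batch and the (uid, date_header) pairs are hypothetical stand-ins
for MailCrawler.process_messages and the imbox server, not code from this
repo:

    from datetime import datetime, timedelta

    from dateutil import parser
    from dateutil.tz import tzutc

    def process_batch(messages, since_date, last_message=0):
        """Skip already-seen UIDs, then advance both high-water marks."""
        for uid, date_header in messages:
            uid = int(uid)
            if uid <= last_message:
                continue  # already handled in an earlier poll
            # ... parse and index the message here ...
            since_date = max(since_date, parser.parse(date_header))
            last_message = max(last_message, uid)
        return since_date, last_message

    since_date = datetime.now(tzutc()) - timedelta(days=1)
    batch = [('101', 'Thu, 8 Feb 2018 09:36:09 -0800'),
             ('102', 'Thu, 8 Feb 2018 10:02:41 -0800')]

    # A second pass over the same batch skips every message.
    since_date, last_uid = process_batch(batch, since_date)
    since_date, last_uid = process_batch(batch, since_date, last_uid)
    print(since_date, last_uid)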