"""Small Flask service that stores, de-duplicates and lists indexed tokens.

Endpoints:
    GET  /                  liveness check
    POST /token             create a token (or return the existing match)
    GET  /token             list tokens (``filter_type`` / ``desc`` query args)
    GET  /token/<token_id>  fetch one token by primary key
"""
import json
import os
import sys

import flask
from flask import jsonify
from flask import request

try:
    from flask_sqlalchemy import SQLAlchemy
except ImportError:
    # ``flask.ext.*`` was removed in Flask 1.0; kept only as a fallback for
    # very old installs that still rely on the legacy import path.
    from flask.ext.sqlalchemy import SQLAlchemy

app = flask.Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = os.environ.get(
    "SQLALCHEMY_DATABASE_URI", "sqlite:///../tokens.db"
)
app.config["SQLALCHEMY_ECHO"] = True
# NOTE(review): DEBUG is unconditionally on and the server binds to 0.0.0.0
# in the ``__main__`` guard below; the interactive debugger must not be
# reachable from untrusted networks -- confirm this is development-only.
app.config["DEBUG"] = True
db = SQLAlchemy(app)

# Query-string spellings that mean "false" for boolean flags such as ``desc``.
_FALSE_FLAG_VALUES = frozenset({"", "0", "false", "no", "off"})


def _is_truthy_flag(raw):
    """Return True unless *raw* is missing or spells a false value.

    Query-string values are always strings, so plain truthiness would treat
    ``"false"`` and ``"0"`` as true.
    """
    if raw is None:
        return False
    return str(raw).strip().lower() not in _FALSE_FLAG_VALUES


class EmailToken(db.Model):
    """Model to store the indexed tokens"""

    id = db.Column(db.Integer, primary_key=True)
    subject = db.Column(db.String(1024))
    token = db.Column(db.String(1024))
    token_type = db.Column(db.String(1024))
    # JSON-encoded text; decode it with ``get_token_metadata``.
    token_metadata = db.Column(db.String(2048))
    disabled = db.Column(db.Boolean, default=False)

    def get_token_metadata(self):
        """Return the decoded metadata, or None when nothing is stored."""
        if self.token_metadata:
            return json.loads(self.token_metadata)
        return None

    def as_dict(self):
        """Return a JSON-serialisable dict describing this token."""
        return {
            "id": self.id,
            "subject": self.subject,
            "token": self.token,
            "type": self.token_type,
            "metadata": self.get_token_metadata(),
            "disabled": self.disabled,
        }

    @classmethod
    def from_json(cls, data):
        """Build an unsaved ``EmailToken`` from a decoded JSON object.

        Args:
            data: mapping with optional keys ``subject``, ``token``, ``type``,
                ``metadata`` and ``disabled``.

        Returns:
            A new instance that has not been added to the session.
        """
        try:
            metadata = json.dumps(data.get("metadata"))
        except TypeError as err:
            print("Error dumping metadata", err, file=sys.stderr)
            # Never hand an unserialised object to the string column; that
            # would only fail later, at flush time.
            metadata = None
        return cls(
            subject=data.get("subject"),
            token=data.get("token"),
            token_type=data.get("type"),
            token_metadata=metadata,
            disabled=data.get("disabled", False),
        )

    @classmethod
    def jsonify_all(cls, token_type=None, desc=False):
        """Return a JSON response listing tokens.

        Args:
            token_type: when truthy, only tokens of this type are returned.
            desc: when truthy, order by ``id`` descending.
        """
        query = cls.query
        if token_type:
            print("Filtering query by token type", file=sys.stderr)
            query = query.filter_by(token_type=token_type)
        if desc:
            query = query.order_by(cls.id.desc())
        return jsonify(tokens=[token.as_dict() for token in query.all()])


@app.route("/")
def check():
    """Liveness check; always answers ``OK``."""
    return "OK"


@app.route("/token", methods=["POST"])
def create_tokens():
    """Creates a token from posted JSON request

    If a token with the same value and type already exists, that record is
    returned with ``created=False`` instead of inserting a duplicate.
    """
    payload = request.get_json(force=True)
    if not isinstance(payload, dict):
        # Lists, numbers and null are valid JSON but have no ``.get``.
        return jsonify(success=False, error="Expected a JSON object"), 400

    new_token = EmailToken.from_json(payload)
    existing_token = EmailToken.query.filter_by(
        token=new_token.token,
        token_type=new_token.token_type,
    ).first()
    print(
        "Received token with value {} and type {}".format(
            new_token.token, new_token.token_type
        ),
        file=sys.stderr,
    )
    print("Existing token? ", existing_token, file=sys.stderr)

    if existing_token:
        print("Found an existing token", file=sys.stderr)
        return jsonify(
            success=True, created=False, record=existing_token.as_dict()
        )

    print("No existing token, creating a new one", file=sys.stderr)
    db.session.add(new_token)
    db.session.commit()
    # Reload so the generated primary key and defaults are populated.
    db.session.refresh(new_token)
    return jsonify(success=True, created=True, record=new_token.as_dict())


@app.route("/token", methods=["GET"])
def list_all_tokens():
    """Lists all tokens with an optional type filter

    Query args:
        filter_type: only return tokens of this type.
        desc: order by id descending unless missing or one of
            ``""``, ``0``, ``false``, ``no``, ``off`` (case-insensitive).
    """
    token_type = request.args.get("filter_type")
    desc = _is_truthy_flag(request.args.get("desc"))
    print("Asked to filter by ", token_type, file=sys.stderr)
    return EmailToken.jsonify_all(token_type=token_type, desc=desc)


# The URL variable is required: the view takes ``token_id`` as an argument.
@app.route("/token/<int:token_id>", methods=["GET"])
def get_token(token_id):
    """Gets a token by its primary key id

    Returns a 404 JSON error when no token has that id.
    """
    token = EmailToken.query.get(token_id)
    if token is None:
        return jsonify(success=False, error="Token not found"), 404
    return jsonify(token.as_dict())


if __name__ == "__main__":
    # Newer Flask-SQLAlchemy releases require an application context here;
    # older releases accept it as well.
    with app.app_context():
        db.create_all()
    app.run(host="0.0.0.0", port=5000)