#!/usr/bin/python3
"""Tornado application serving the ``health``, ``oas3`` and ``validate`` endpoints.

Every route is mounted below a single-digit version prefix (``/v0`` .. ``/v9``).
"""
from abc import ABCMeta
from datetime import datetime
import json
import os
import subprocess

import isodate
import tornado.web

import util

# Captured once at import time; the health endpoint derives "uptime" from it.
startup_timestamp = datetime.now()

# Marks "git version not looked up yet". None cannot serve as the marker
# because it is itself a valid lookup result (version unknown).
_NOT_LOADED = object()


class HealthHandler(tornado.web.RequestHandler, metaclass=ABCMeta):
    """Report API version, git version (if known), current time and uptime."""

    # Tornado creates a new handler and calls initialize() for every request,
    # so the lookup result is shared on the class to avoid spawning a blocking
    # git subprocess on each health check.
    _cached_git_version = _NOT_LOADED

    # noinspection PyAttributeOutsideInit
    def initialize(self):
        """Set ``self.git_version``, resolving it at most once per process."""
        if HealthHandler._cached_git_version is _NOT_LOADED:
            HealthHandler._cached_git_version = self._load_git_version()
        self.git_version = HealthHandler._cached_git_version

    @staticmethod
    def _load_git_version():
        """Return the git version string, or ``None`` if it cannot be found.

        The first line of ``git-version.txt`` (resolved against the current
        working directory) is preferred. If that file is absent or its first
        line is blank, ``git describe --always --dirty`` is run in the
        directory containing this module. Failures of the git call are
        printed and result in ``None``.
        """
        v = None

        # try file git-version.txt first
        gitversion_file = "git-version.txt"
        if os.path.exists(gitversion_file):
            with open(gitversion_file, encoding="utf-8") as f:
                # A blank first line carries no version; fall through to git.
                v = f.readline().strip() or None

        # if not available, try git
        if v is None:
            try:
                v = subprocess.check_output(
                    ["git", "describe", "--always", "--dirty"],
                    # abspath first: dirname(__file__) can be "" when the
                    # script is started via a bare relative path, and an
                    # empty string is not a usable working directory.
                    cwd=os.path.dirname(os.path.abspath(__file__)),
                ).strip().decode()
            except subprocess.CalledProcessError as e:
                print("Checking git version lead to non-null return code ", e.returncode)
            except OSError as e:
                # Raised e.g. when the git executable is not installed; the
                # version is optional, so report it and carry on without one.
                print("Checking git version failed: ", e)

        return v

    def get(self):
        """Write the health document as indented JSON with status 200."""
        # Single clock reading so "timestamp" and "uptime" describe the
        # same instant.
        now = datetime.now()

        health = {'api-version': 'v0'}
        if self.git_version is not None:
            health['git-version'] = self.git_version
        health['timestamp'] = isodate.datetime_isoformat(now)
        health['uptime'] = isodate.duration_isoformat(now - startup_timestamp)

        self.set_header("Content-Type", "application/json")
        self.write(json.dumps(health, indent=4))
        self.set_status(200)


class Oas3Handler(tornado.web.RequestHandler, metaclass=ABCMeta):
    """Serve the contents of ``OAS3.yml`` as plain text."""

    def get(self):
        """Send ``OAS3.yml`` (read from the current working directory)."""
        # "text/vnd.yml" is the proposed content type, but browsers like
        # Firefox then try to download the content instead of displaying it,
        # so "text/plain" is sent instead.
        self.set_header("Content-Type", "text/plain")
        with open('OAS3.yml', 'r', encoding="utf-8") as f:
            oas3 = f.read()
        self.write(oas3)
        self.finish()


class ValidateHandler(tornado.web.RequestHandler, metaclass=ABCMeta):
    """Validation endpoint stub: every request is currently reported valid."""

    def post(self):
        """Decode the request body and answer with a fixed validation result."""
        self.set_header("Content-Type", "application/json")
        # Decoded here for the validator that is still to be wired in; the
        # value is not inspected yet.
        entity = self.request.body.decode()
        # TODO call validator
        # NOTE(review): "valid" is the string "true", not a JSON boolean -
        # confirm against the API contract before changing it.
        validation_result = {
            "valid": "true"
        }
        # Tornado serializes a dict passed to write() as JSON.
        self.write(validation_result)
        self.finish()


def make_app():
    """Build the Tornado application with all routes under ``/v<digit>``."""
    version_path = r"/v[0-9]"
    return tornado.web.Application([
        (version_path + r"/health", HealthHandler),
        (version_path + r"/oas3", Oas3Handler),
        (version_path + r"/validate", ValidateHandler),
    ])


def main():
    """Run the server on the port given by ``PORT`` (default 8080).

    The port is obtained through ``util.load_env`` and the application is
    handed to ``util.run_tornado_server``; a message is printed once that
    call returns.
    """
    port = util.load_env('PORT', 8080)

    # Setup

    util.run_tornado_server(make_app(), server_port=port)

    # Teardown

    print("Server stopped")


if __name__ == "__main__":
    main()