"""Management of a local git working copy that tracks a configured origin."""

import os
import shutil
import tempfile
import time

import git

from util import load_env


class GitManagerConfiguration:
    """Read-only settings consumed by :class:`GitManager`.

    Attributes (exposed as read-only properties):
        origin: URL of the git origin; must be non-empty.
        git_pw: Optional password handed to git through ``GIT_ASKPASS``.
        wc_path: Optional path of an existing directory to use as the
            working copy. When ``None`` a temporary directory is used.
        pull_intv: Minimum number of seconds between two non-forced pulls.
            Defaults to 30 when not given.
    """

    @classmethod
    def from_environment(cls):
        """Create a configuration from the ``GIT_*`` settings.

        Looks up ``GIT_ORIGIN``, ``GIT_WC_PATH``, ``GIT_PASSWORD`` and
        ``GIT_PULL_INTV`` through ``load_env``, passing ``None`` as the
        second argument (presumably the fallback value -- confirm against
        ``util.load_env``).

        Raises:
            ValueError: If no origin is configured, or the pull interval
                cannot be converted to ``int``.
        """
        origin = load_env("GIT_ORIGIN", None)
        wc_path = load_env("GIT_WC_PATH", None)
        git_pw = load_env("GIT_PASSWORD", None)
        pull_intv = load_env("GIT_PULL_INTV", None)
        return cls(origin=origin, git_pw=git_pw, wc_path=wc_path,
                   pull_intv=pull_intv)

    def __init__(self, origin, git_pw=None, wc_path=None, pull_intv=None):
        """Store the settings.

        Arguments:
            origin: URL of the git origin (required, non-empty).
            git_pw: Optional password for the origin.
            wc_path: Optional path of the working copy directory.
            pull_intv: Optional pull interval in seconds; anything accepted
                by ``int()``. ``None`` selects the default of 30.

        Raises:
            ValueError: If ``origin`` is empty or ``pull_intv`` is not
                convertible to ``int``.
        """
        if not origin:
            raise ValueError("Git origin cannot be empty!")

        self._origin = origin
        self._git_pw = git_pw
        self._wc_path = wc_path
        self._pull_intv = 30 if pull_intv is None else int(pull_intv)

    @property
    def origin(self):
        """URL of the git origin."""
        return self._origin

    @property
    def git_pw(self):
        """Password for the origin, or ``None`` if none is configured."""
        return self._git_pw

    @property
    def wc_path(self):
        """Externally configured working copy path, or ``None``."""
        return self._wc_path

    @property
    def pull_intv(self):
        """Minimum number of seconds between two non-forced pulls."""
        return self._pull_intv


class GitManager:
    """Keeps a local working copy of the configured origin up to date.

    Typical use is ``setup()`` once, ``pull()`` repeatedly and
    ``teardown()`` at the end.
    """

    def __init__(self, configuration):
        """Create a manager; no file system or git access happens here.

        Arguments:
            configuration: A :class:`GitManagerConfiguration`-like object.

        Raises:
            ValueError: If ``configuration`` is ``None``.
        """
        if configuration is None:
            raise ValueError("GitManager must be initialized with a configuration!")

        self._configuration = configuration
        self._wc = None
        self._last_pull = 0
        # Initialised here so that `head_sha` can be read before `setup()`
        # without raising AttributeError.
        self.repo = None

    @property
    def configuration(self):
        """The configuration this manager was created with."""
        return self._configuration

    def _setup_wc(self):
        """Select the working copy directory (idempotent).

        Uses the configured path if there is one, otherwise creates a
        temporary directory.

        Raises:
            ValueError: If the chosen path is not an existing directory.
        """
        if self._wc is not None:
            return

        _wc = self.configuration.wc_path
        if _wc is None:
            _wc = tempfile.mkdtemp(prefix='entities_git_')

        if not os.path.isdir(_wc):
            raise ValueError("Configured directory for the working copy does not exist!")

        self._wc = _wc

    def _teardown_wc(self):
        """Delete the working copy if it is a temporary one.

        An externally configured working copy is left untouched.
        """
        if self._wc is None:
            return

        if self.configuration.wc_path is not None:
            print("NOTE: Not tearing down externally configured working copy.")
            return

        shutil.rmtree(self._wc)
        self._wc = None
        # The repository object would otherwise keep pointing at the
        # directory that was just removed.
        self.repo = None

    def _assert_wc(self):
        """Assert working copy matches origin and is a valid repository.

        A failed assertion will throw exceptions and lead to service abort,
        as this error is not recoverable.

        Returns:
            False if the WC path is an empty directory, True if it holds a
            valid, non-bare repository whose ``origin`` remote has the
            configured URL.

        Raises:
            ValueError: If the repository is bare or the origin URL does not
                match. Errors raised by ``git.Repo`` for an invalid
                repository propagate unchanged.
        """
        # Check if WC is empty
        if not os.listdir(self._wc):
            return False

        # Create a repository object
        # This fails if there is no valid repository
        repo = git.Repo(self._wc)

        # Assert that this is not a bare repo
        if repo.bare:
            raise ValueError("WC path points to a bare git repository!")

        origin = repo.remote('origin')
        if self.configuration.origin not in origin.urls:
            raise ValueError("Origin URL does not match!")

        # We're good here.
        return True

    def _askpass_script(self):
        """Return the source of a Python script that prints the password.

        Passwords are impossible to store in scripts verbatim, as they may
        contain any character. The password is therefore converted into a
        list of integer code points, and the script reconstructs it and
        writes it to the console. Python will be installed anyways.

        Must only be called when a password is configured.
        """
        code_points = [ord(c) for c in self.configuration.git_pw]
        return (
            "#!/usr/bin/env python3\n"
            f"l = {code_points}\n"
            "p = [chr(c) for c in l]\n"
            'print("".join(p))\n'
        )

    def _init_repo(self):
        """Open the existing working copy or clone the origin into it.

        When a password is configured it is passed to git through a
        temporary ``GIT_ASKPASS`` script. Without a password the clone runs
        without that script.
        """
        # Assert working copy is valid; a False result means that cloning
        # is necessary.
        if self._assert_wc():
            print("Reusing existing git working copy ...")
            self.repo = git.Repo(self._wc)
            return

        print("Cloning new git working copy ...")

        if self.configuration.git_pw is None:
            # No password configured: there is nothing an askpass script
            # could answer with, so clone without one.
            self.repo = git.Repo.clone_from(url=self.configuration.origin,
                                            to_path=self._wc)
            return

        # Create a temporary script file for GIT_ASKPASS
        with tempfile.NamedTemporaryFile(mode='w+t') as askpass:
            askpass.write(self._askpass_script())
            # Close only the underlying handle: this flushes the script to
            # disk while the wrapper still removes the file when the `with`
            # block exits.
            askpass.file.close()
            os.chmod(path=askpass.name, mode=0o700)
            self.repo = git.Repo.clone_from(url=self.configuration.origin,
                                            to_path=self._wc,
                                            env={'GIT_ASKPASS': askpass.name})

    def setup(self):
        """Prepare the working copy, open or clone the repo and pull once."""
        self._setup_wc()
        self._init_repo()
        self.pull(force=True)

    def teardown(self):
        """Release the working copy (temporary working copies are deleted)."""
        self._teardown_wc()

    def printout(self):
        """Print the origin and working copy path to stdout."""
        print("Git Manager:")
        print(f"\tGit origin is {self.configuration.origin}")
        print(f"\tUsing working copy path {self._wc}")
        if self._wc != self.configuration.wc_path:
            print("\tUsing a temporary working copy.")

    @property
    def head_sha(self):
        """Hex SHA of the current HEAD, or ``None`` if no repo is open."""
        return None if self.repo is None else self.repo.head.object.hexsha

    def pull(self, force=False):
        """Pull from origin.

        Arguments:
            `force` -- Do a pull even though the pull interval has not elapsed

        Returns:
            False if the pull was skipped because the pull interval has not
            elapsed; otherwise True if HEAD changed as a result of the pull.
        """
        if not force and (time.time() - self._last_pull < self.configuration.pull_intv):
            return False

        # The timestamp is recorded before the pull is attempted.
        self._last_pull = time.time()
        old_head = self.head_sha

        # get the origin
        # (We verified during initialization that this origin exists.)
        origin = self.repo.remote('origin')
        origin.pull(rebase=True)

        return self.head_sha != old_head