entities_service/gitmgr.py

189 lines
5.5 KiB
Python
Raw Normal View History

import git
import os
import shutil
import tempfile
2021-02-12 16:33:11 +01:00
import time
from util import load_env
class GitManagerConfiguration:
    """Immutable-by-convention settings consumed by ``GitManager``.

    Attributes are exposed through read-only properties:

    * ``origin``    -- URL of the git remote (required, must be truthy).
    * ``git_pw``    -- password handed to git, or ``None``.
    * ``wc_path``   -- path of an externally managed working copy, or
      ``None`` to let the manager create a temporary one.
    * ``pull_intv`` -- pull interval as an ``int`` (compared against
      ``time.time()`` differences by ``GitManager.pull``); defaults to 30.
    """

    @classmethod
    def from_environment(cls):
        """Build a configuration from values looked up via ``load_env``.

        The keys queried are ``GIT_ORIGIN``, ``GIT_WC_PATH``,
        ``GIT_PASSWORD`` and ``GIT_PULL_INTV``; ``None`` is passed as the
        second argument of every ``load_env`` call.

        Being a classmethod, this also works for subclasses (it returns an
        instance of the class it is called on).

        Raises:
            ValueError: if the origin looked up is empty (see ``__init__``).
        """
        origin = load_env("GIT_ORIGIN", None)
        wc_path = load_env("GIT_WC_PATH", None)
        git_pw = load_env("GIT_PASSWORD", None)
        pull_intv = load_env("GIT_PULL_INTV", None)
        return cls(origin=origin,
                   git_pw=git_pw,
                   wc_path=wc_path,
                   pull_intv=pull_intv)

    def __init__(self, origin, git_pw=None, wc_path=None, pull_intv=None):
        """Store the given settings.

        Arguments:
            origin: URL of the git remote; must not be empty or ``None``.
            git_pw: optional password.
            wc_path: optional path of an existing working-copy directory.
            pull_intv: optional pull interval; anything accepted by
                ``int()`` (e.g. a numeric string). ``None`` selects the
                default of 30.

        Raises:
            ValueError: if ``origin`` is falsy, or if ``pull_intv`` cannot
                be converted by ``int()``.
        """
        if not origin:
            raise ValueError("Git origin cannot be empty!")
        self._origin = origin
        self._git_pw = git_pw
        self._wc_path = wc_path
        # Values may arrive as strings, hence the int() conversion.
        self._pull_intv = 30 if pull_intv is None else int(pull_intv)

    @property
    def origin(self):
        """URL of the git remote."""
        return self._origin

    @property
    def git_pw(self):
        """Password for the remote, or ``None`` if none was given."""
        return self._git_pw

    @property
    def wc_path(self):
        """Configured working-copy path, or ``None`` if none was given."""
        return self._wc_path

    @property
    def pull_intv(self):
        """Pull interval as an ``int``."""
        return self._pull_intv
class GitManager:
    """Maintain a local git working copy that follows a configured origin.

    Typical life cycle: construct with a configuration object, call
    ``setup()`` once (prepares the working copy, clones or reuses the
    repository and pulls), call ``pull()`` as often as desired (it is
    rate-limited by the configured pull interval) and finally
    ``teardown()``.

    The configuration object must provide the attributes ``origin``,
    ``git_pw``, ``wc_path`` and ``pull_intv``.
    """

    def __init__(self, configuration):
        """Create a manager; no file-system or git access happens here.

        Raises:
            ValueError: if ``configuration`` is ``None``.
        """
        if configuration is None:
            raise ValueError("GitManager must be initialized with a configuration!")
        self._configuration = configuration
        self._wc = None
        # Set by _init_repo(). Defined up front so that head_sha can report
        # None before setup() instead of failing with AttributeError.
        self.repo = None
        # Timestamp (time.time()) of the most recent pull attempt.
        self._last_pull = 0

    @property
    def configuration(self):
        """The configuration object passed to the constructor."""
        return self._configuration

    def _setup_wc(self):
        """Determine the working-copy directory (idempotent).

        Uses the configured ``wc_path`` if there is one, otherwise creates
        a fresh temporary directory.

        Raises:
            ValueError: if the chosen path is not an existing directory.
        """
        if self._wc is not None:
            return
        _wc = self.configuration.wc_path
        if _wc is None:
            _wc = tempfile.mkdtemp(prefix='entities_git_')
        if not os.path.isdir(_wc):
            raise ValueError("Configured directory for the working copy does not exist!")
        self._wc = _wc

    def _teardown_wc(self):
        """Delete the working copy if (and only if) it is a temporary one."""
        if self._wc is None:
            return
        if self.configuration.wc_path is not None:
            print("NOTE: Not tearing down externally configured working copy.")
            return
        shutil.rmtree(self._wc)
        self._wc = None

    def _assert_wc(self):
        """Assert working copy matches origin and is a valid repository.

        A failed assertion will throw exceptions and lead to service abort,
        as this error is not recoverable.

        Returns:
            False if the WC path is an empty directory (i.e. a clone is
            required), True if it holds a usable repository.

        Raises:
            ValueError: if the repository is bare or its ``origin`` remote
                does not list the configured origin URL.
        """
        # Check if WC is empty
        if not os.listdir(self._wc):
            return False
        # Create a repository object
        # This fails if there is no valid repository
        repo = git.Repo(self._wc)
        # Assert that this is not a bare repo
        if repo.bare:
            raise ValueError("WC path points to a bare git repository!")
        origin = repo.remote('origin')
        if self.configuration.origin not in origin.urls:
            raise ValueError("Origin URL does not match!")
        # We're good here.
        return True

    def _askpass_script(self):
        """Return the source of a Python script that prints the password.

        Passwords may contain any character, which makes quoting them in a
        script fragile. The password is therefore embedded as a list of
        code points that the generated script turns back into a string
        and prints.

        If no password is configured (``git_pw`` is ``None``) the script
        prints an empty line instead of this method raising ``TypeError``.
        """
        password = self.configuration.git_pw or ""
        pw_chars = [ord(c) for c in password]
        return (
            "#!/usr/bin/env python3\n"
            f"l = {pw_chars}\n"
            "p = [chr(c) for c in l]\n"
            "print(\"\".join(p))\n"
        )

    def _init_repo(self):
        """Populate ``self.repo`` by reusing or cloning the working copy.

        An empty working-copy directory triggers a clone from the
        configured origin; otherwise the existing repository is opened
        (after ``_assert_wc`` validated it).
        """
        # Assert working copy is valid,
        # return false if cloning is necessary
        if not self._assert_wc():
            print("Cloning new git working copy ...")
            # Create a temporary script file for GIT_ASKPASS
            with tempfile.NamedTemporaryFile(mode='w+t') as askpass:
                askpass.write(self._askpass_script())
                # Close only the underlying file object, not the wrapper:
                # the content gets written out while removal of the file is
                # left to the end of the with-block (presumably so that git
                # can execute the script in the meantime).
                askpass.file.close()
                os.chmod(path=askpass.name, mode=0o700)
                self.repo = git.Repo.clone_from(url=self.configuration.origin,
                                                to_path=self._wc,
                                                env={'GIT_ASKPASS': askpass.name})
        else:
            print("Reusing existing git working copy ...")
            self.repo = git.Repo(self._wc)

    def setup(self):
        """Prepare the working copy, open/clone the repository and pull."""
        self._setup_wc()
        self._init_repo()
        self.pull(force=True)

    def teardown(self):
        """Release the working copy (temporary ones are deleted)."""
        self._teardown_wc()

    def printout(self):
        """Print a short summary of origin and working copy to stdout."""
        print("Git Manager:")
        print(f"\tGit origin is {self.configuration.origin}")
        print(f"\tUsing working copy path {self._wc}")
        if self._wc != self.configuration.wc_path:
            print("\tUsing a temporary working copy.")

    @property
    def head_sha(self):
        """Hex SHA of the current HEAD, or ``None`` before ``setup()``."""
        return None if self.repo is None else self.repo.head.object.hexsha

    def pull(self, force=False):
        """Pull from origin (with rebase).

        Arguments:
            `force` -- Do a pull even though the pull interval has not elapsed

        Returns: False if the pull was skipped because the pull interval
        has not elapsed yet; otherwise True if HEAD changed as a result of
        the pull and False if it did not.
        """
        if not force and (time.time() - self._last_pull < self.configuration.pull_intv):
            return False
        # Recorded before pulling, so a failing pull also counts as an attempt.
        self._last_pull = time.time()
        old_head = self.head_sha
        # get the origin
        # (We verified during initialization that this origin exists.)
        # NOTE(review): unlike the clone, no GIT_ASKPASS is passed here --
        # confirm pulls work for password-protected origins.
        origin = self.repo.remote('origin')
        origin.pull(rebase=True)
        return self.head_sha != old_head