Add the GitManager with configuration and repository setup
This commit is contained in:
parent
186e5ac2ab
commit
234421fb64
1 changed files with 150 additions and 0 deletions
150
gitmgr.py
Normal file
150
gitmgr.py
Normal file
|
@ -0,0 +1,150 @@
|
||||||
|
import git
|
||||||
|
import os
|
||||||
|
import shutil
|
||||||
|
import tempfile
|
||||||
|
|
||||||
|
from util import load_env
|
||||||
|
|
||||||
|
|
||||||
|
class GitManagerConfiguration:
|
||||||
|
@staticmethod
|
||||||
|
def from_environment():
|
||||||
|
origin = load_env("GIT_ORIGIN", None)
|
||||||
|
wc_path = load_env("GIT_WC_PATH", None)
|
||||||
|
git_pw = load_env("GIT_PASSWORD", None)
|
||||||
|
|
||||||
|
return GitManagerConfiguration(origin=origin,
|
||||||
|
git_pw=git_pw,
|
||||||
|
wc_path=wc_path)
|
||||||
|
|
||||||
|
def __init__(self, origin, git_pw=None, wc_path=None):
|
||||||
|
if not origin:
|
||||||
|
raise ValueError("Git origin cannot be empty!")
|
||||||
|
|
||||||
|
self._origin = origin
|
||||||
|
self._git_pw = git_pw
|
||||||
|
self._wc_path = wc_path
|
||||||
|
|
||||||
|
@property
|
||||||
|
def origin(self):
|
||||||
|
return self._origin
|
||||||
|
|
||||||
|
@property
|
||||||
|
def git_pw(self):
|
||||||
|
return self._git_pw
|
||||||
|
|
||||||
|
@property
|
||||||
|
def wc_path(self):
|
||||||
|
return self._wc_path
|
||||||
|
|
||||||
|
|
||||||
|
class GitManager:
|
||||||
|
def __init__(self, configuration):
|
||||||
|
if configuration is None:
|
||||||
|
raise ValueError("GitManager must be initialized with a configuration!")
|
||||||
|
|
||||||
|
self._configuration = configuration
|
||||||
|
self._wc = None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def configuration(self):
|
||||||
|
return self._configuration
|
||||||
|
|
||||||
|
def _setup_wc(self):
|
||||||
|
if self._wc is not None:
|
||||||
|
return
|
||||||
|
|
||||||
|
_wc = self.configuration.wc_path
|
||||||
|
|
||||||
|
if _wc is None:
|
||||||
|
_wc = tempfile.mkdtemp(prefix='entities_git_')
|
||||||
|
|
||||||
|
if not os.path.isdir(_wc):
|
||||||
|
raise ValueError("Configured directory for the working copy does not exist!")
|
||||||
|
|
||||||
|
self._wc = _wc
|
||||||
|
|
||||||
|
def _teardown_wc(self):
|
||||||
|
if self._wc is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
if self.configuration.wc_path is not None:
|
||||||
|
print("NOTE: Not tearing down externally configured working copy.")
|
||||||
|
return
|
||||||
|
|
||||||
|
shutil.rmtree(self._wc)
|
||||||
|
|
||||||
|
self._wc = None
|
||||||
|
|
||||||
|
def _assert_wc(self):
|
||||||
|
"""Assert working copy matches origin and is a valid repository.
|
||||||
|
|
||||||
|
A failed assertion will throw exceptions and lead to service abort,
|
||||||
|
as this error is not recoverable.
|
||||||
|
|
||||||
|
Returns False if the WC path is an empty directory"""
|
||||||
|
|
||||||
|
# Check if WC is empty
|
||||||
|
if not os.listdir(self._wc):
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Create a repository object
|
||||||
|
# This fails if there is no valid repository
|
||||||
|
repo = git.Repo(self._wc)
|
||||||
|
|
||||||
|
# Assert that this is not a bare repo
|
||||||
|
if repo.bare:
|
||||||
|
raise ValueError("WC path points to a bare git repository!")
|
||||||
|
|
||||||
|
origin = repo.remote('origin')
|
||||||
|
if self.configuration.origin not in origin.urls:
|
||||||
|
raise ValueError("Origin URL does not match!")
|
||||||
|
|
||||||
|
# We're good here.
|
||||||
|
return True
|
||||||
|
|
||||||
|
def _askpass_script(self):
|
||||||
|
# Passwords are impossible to store in scripts, as they may contain any character ...
|
||||||
|
# We convert the password into a list of integers and create a little script
|
||||||
|
# that reconstructs the password and writes it to the console.
|
||||||
|
# Python will be installed anyways.
|
||||||
|
|
||||||
|
pw_chars = [ord(c) for c in self.configuration.git_pw]
|
||||||
|
|
||||||
|
script = "#!/usr/bin/env python3\n"
|
||||||
|
script += "l = %s\n" % str(list(pw_chars))
|
||||||
|
script += "p = [chr(c) for c in l]\n"
|
||||||
|
script += f"print(\"\".join(p))\n"
|
||||||
|
return script
|
||||||
|
|
||||||
|
def _init_repo(self):
|
||||||
|
# Assert working copy is valid,
|
||||||
|
# return false if cloning is necessary
|
||||||
|
if not self._assert_wc():
|
||||||
|
print("Cloning new git working copy ...")
|
||||||
|
|
||||||
|
# Create a temporary script file for GIT_ASKPASS
|
||||||
|
with tempfile.NamedTemporaryFile(mode='w+t') as askpass:
|
||||||
|
askpass.write(self._askpass_script())
|
||||||
|
askpass.file.close()
|
||||||
|
os.chmod(path=askpass.name, mode=0o700)
|
||||||
|
self.repo = git.Repo.clone_from(url=self.configuration.origin,
|
||||||
|
to_path=self._wc,
|
||||||
|
env={'GIT_ASKPASS': askpass.name})
|
||||||
|
else:
|
||||||
|
print("Reusing existing git working copy ...")
|
||||||
|
self.repo = git.Repo(self._wc)
|
||||||
|
|
||||||
|
def setup(self):
|
||||||
|
self._setup_wc()
|
||||||
|
self._init_repo()
|
||||||
|
|
||||||
|
def teardown(self):
|
||||||
|
self._teardown_wc()
|
||||||
|
|
||||||
|
def printout(self):
|
||||||
|
print("Git Manager:")
|
||||||
|
print(f"\tGit origin is %s" % self.configuration.origin)
|
||||||
|
print(f"\tUsing working copy path %s" % self._wc)
|
||||||
|
if not self._wc == self.configuration.wc_path:
|
||||||
|
print("\tUsing a temporary working copy.")
|
Loading…
Reference in a new issue