feat(github_connector): use PyGithub

This commit is contained in:
0ry5 2025-04-12 00:15:24 +02:00
parent a8a5fed56f
commit 21bae5d7e3

View file

@ -1,8 +1,7 @@
import requests
import json
from dotenv import load_dotenv from dotenv import load_dotenv
from os import getenv from os import getenv
import base64 import base64
from github import Github
load_dotenv() load_dotenv()
@ -12,60 +11,47 @@ REPO_NAME = getenv("REPO_NAME")
BASE_BRANCH = getenv("BASE_BRANCH") BASE_BRANCH = getenv("BASE_BRANCH")
GITHUB_TOKEN = getenv("GITHUB_TOKEN") GITHUB_TOKEN = getenv("GITHUB_TOKEN")
# GitHub API URL g = Github(GITHUB_TOKEN)
API_URL = f"https://api.github.com/repos/{REPO_OWNER}/{REPO_NAME}" repo = g.get_repo(f"{REPO_OWNER}/{REPO_NAME}")
# Headers for authentication
HEADERS = {
"Authorization": f"token {GITHUB_TOKEN}",
"Accept": "application/vnd.github.v3+json",
}
def get_file_sha(file_path): def similar_exists(search_text):
url = f"{API_URL}/contents/{file_path}?ref={BASE_BRANCH}" contents = repo.get_contents("_events")
response = requests.get(url, headers=HEADERS)
if response.status_code == 200: for content in contents:
return response.json()["sha"] if content.type == "dir":
else: return similar_exists(repo, content.path, search_text)
raise Exception(f"Error getting file SHA: {response.json()}") elif content.type == "file":
file_content = repo.get_contents(content.path)
decoded_content = base64.b64decode(file_content.content).decode(
"utf-8", errors="ignore"
)
if search_text in decoded_content:
return True
return False
def get_file(file_path):
return repo.get_contents(file_path)
def delete_file(file_path): def delete_file(file_path):
url = f"{API_URL}/contents/{file_path}" try:
data = { repo.delete_file(
"message": f"Deleting file: {file_path}", file_path,
"sha": get_file_sha(file_path), f"Deleting file: {file_path}",
"branch": BASE_BRANCH, get_file(file_path).sha,
} BASE_BRANCH,
response = requests.delete(url, headers=HEADERS, json=data) )
except Exception as e:
if not response.status_code == 200 and not response.status_code == 404: if "404" in str(e):
raise Exception(f"Error deleting file: {response.json()}") return
def create_or_update_file(file_path, content, commit_message): def create_or_update_file(file_path, content, commit_message):
url = f"{API_URL}/contents/{file_path}" try:
b = base64.b64encode(bytes(content, "utf-8")) existing = get_file(file_path)
base64_str = b.decode("utf-8") repo.update_file(file_path, commit_message, content, existing.sha, BASE_BRANCH)
except Exception as e:
# Check if file exists if "404" in str(e):
response = requests.get(url, headers=HEADERS) repo.create_file(file_path, commit_message, content, BASE_BRANCH)
if response.status_code == 200:
sha = response.json()["sha"]
else:
sha = None
data = {
"message": commit_message,
"content": base64_str,
"branch": BASE_BRANCH,
}
if sha:
data["sha"] = sha # If file exists, update it
response = requests.put(url, headers=HEADERS, json=data)
if not response.status_code in [200, 201]:
raise Exception(f"Error committing file: {response.json()}")