#!/usr/bin/env python3
"""
Gitea CI/CD Webhook Controller

Listens for push events from all Gitea repos and triggers automated build & deploy.

Pipeline for a push to ``main``/``master`` of a repository that has a Dockerfile:

1. Make sure a gitops environment exists for the repository (scaffold it if not).
2. Build and push the image with a Kaniko pod.
3. Write the new image tag into the gitops repository.
"""

import base64
import hashlib
import hmac
import json
import logging
import os
import re
import subprocess
import tempfile
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
logger = logging.getLogger('ci-controller')

# Configuration from environment
GITEA_URL = os.environ.get('GITEA_URL', 'http://gitea-http.gitea.svc:3000')
GITEA_USER = os.environ.get('GITEA_USER', 'gitea_admin')
GITEA_PASSWORD = os.environ.get('GITEA_PASSWORD', '')
REGISTRY = os.environ.get('REGISTRY', '10.0.0.3:31427')
GITOPS_REPO = os.environ.get('GITOPS_REPO', f'{GITEA_URL}/{GITEA_USER}/gitops-infra.git')
WEBHOOK_SECRET = os.environ.get('WEBHOOK_SECRET', '')
BUILD_NAMESPACE = os.environ.get('BUILD_NAMESPACE', 'build')
REGISTRY_SECRET = os.environ.get('REGISTRY_SECRET', 'gitea-registry')
# Whitespace around entries is stripped so "a, b" behaves like "a,b".
IGNORED_REPOS = {
    name.strip()
    for name in os.environ.get('IGNORED_REPOS', 'gitops-infra').split(',')
    if name.strip()
}

# Host used for authenticated git clones (controller and build pods).
# NOTE(review): this is hard-coded rather than derived from GITEA_URL or
# GITOPS_REPO; kept as-is to preserve existing behaviour -- confirm intent.
_GITEA_CLONE_HOST = 'gitea-http.gitea.svc:3000'

# Push payloads are untrusted when no WEBHOOK_SECRET is configured, so every
# value that ends up in a path, URL or manifest is validated first.
_NAME_RE = re.compile(r'[A-Za-z0-9._-]+')
_SHA_RE = re.compile(r'[0-9a-fA-F]{7,64}')
_NEW_TAG_RE = re.compile(r'newTag: ".*"')


def _redact(text):
    """Return ``str(text)`` with the Gitea password masked.

    Subprocess exceptions embed the full command line, which contains the
    credentialed clone URL; this keeps the password out of the logs.
    """
    text = str(text)
    if GITEA_PASSWORD:
        for secret in (GITEA_PASSWORD, urllib.parse.quote(GITEA_PASSWORD, safe='')):
            text = text.replace(secret, '***')
    return text


def _basic_auth_header():
    """Return the HTTP Basic ``Authorization`` header value for the Gitea user."""
    credentials = f'{GITEA_USER}:{GITEA_PASSWORD}'.encode()
    return 'Basic ' + base64.b64encode(credentials).decode()


def _authenticated_clone_url(repo_path):
    """Return a clone URL for *repo_path* with percent-encoded credentials."""
    user = urllib.parse.quote(GITEA_USER, safe='')
    password = urllib.parse.quote(GITEA_PASSWORD, safe='')
    return f'http://{user}:{password}@{_GITEA_CLONE_HOST}/{repo_path}'


def _is_safe_name(value):
    """Return True if *value* is a plain owner/repository name (no path tricks)."""
    return (
        isinstance(value, str)
        and value not in ('.', '..')
        and _NAME_RE.fullmatch(value) is not None
    )


def _pod_name(repo):
    """Return the Kaniko pod name for *repo*, sanitised to a valid Kubernetes name."""
    slug = re.sub(r'[^a-z0-9.-]+', '-', str(repo).lower()).strip('-.')
    return f'kaniko-{slug}'[:253].rstrip('-.')


def _gitea_path_exists(api_path):
    """Return True if ``GET {GITEA_URL}/api/v1/{api_path}`` succeeds.

    An HTTP error status (>= 400) yields False; connection-level errors
    propagate to the caller.
    """
    req = urllib.request.Request(f'{GITEA_URL}/api/v1/{api_path}')
    req.add_header('Authorization', _basic_auth_header())
    try:
        with urllib.request.urlopen(req, timeout=10):
            return True
    except urllib.error.HTTPError as e:
        e.close()
        return False


def verify_signature(payload, signature):
    """Verify Gitea webhook signature.

    Args:
        payload: Raw request body (bytes).
        signature: Hex HMAC-SHA256 digest from the ``X-Gitea-Signature`` header.

    Returns:
        True if no WEBHOOK_SECRET is configured (verification disabled) or the
        signature matches; False otherwise.
    """
    if not WEBHOOK_SECRET:
        return True
    if not isinstance(signature, str) or not signature:
        return False
    expected = hmac.new(WEBHOOK_SECRET.encode(), payload, hashlib.sha256).hexdigest()
    # Compare as bytes: compare_digest() raises TypeError on non-ASCII str.
    return hmac.compare_digest(expected.encode(), signature.encode('utf-8', 'replace'))


def repo_has_dockerfile(owner, repo):
    """Check if repo has a Dockerfile via Gitea API.

    Returns False (and logs) on any error; never raises.
    """
    owner_q = urllib.parse.quote(str(owner), safe='')
    repo_q = urllib.parse.quote(str(repo), safe='')
    try:
        return _gitea_path_exists(f'repos/{owner_q}/{repo_q}/contents/Dockerfile')
    except Exception as e:  # best-effort probe: any failure means "no"
        logger.error('Error checking Dockerfile: %s', _redact(e))
        return False


def gitops_app_exists(repo_name):
    """Check if gitops environment exists for this repo.

    Returns False (and logs) on any error; never raises.
    """
    repo_q = urllib.parse.quote(str(repo_name), safe='')
    user_q = urllib.parse.quote(GITEA_USER, safe='')
    kustomization = f'environments/{repo_q}/overlays/production/kustomization.yaml'
    try:
        return _gitea_path_exists(f'repos/{user_q}/gitops-infra/contents/{kustomization}')
    except Exception as e:  # best-effort probe: any failure means "no"
        logger.error('Error checking gitops environment: %s', _redact(e))
        return False


def create_kaniko_pod(owner, repo, short_sha):
    """Create a Kaniko build pod.

    Any previous build pod for the same repository is deleted first. The image
    is pushed as ``{REGISTRY}/{owner}/{repo}`` with the tags *short_sha* and
    ``latest``.

    Returns:
        True if ``kubectl apply`` accepted the pod, False otherwise.
    """
    pod_name = _pod_name(repo)
    clone_url_auth = _authenticated_clone_url(f'{owner}/{repo}.git')
    image_dest = f'{REGISTRY}/{owner}/{repo}'

    # Built as data and sent as JSON so that no payload value can alter the
    # manifest structure; the clone runs via argv, not through a shell.
    manifest = {
        'apiVersion': 'v1',
        'kind': 'Pod',
        'metadata': {
            'name': pod_name,
            'namespace': BUILD_NAMESPACE,
            'labels': {
                'app': 'kaniko-build',
                'repo': repo,
                'commit': short_sha,
            },
        },
        'spec': {
            'restartPolicy': 'Never',
            'tolerations': [
                {
                    'key': 'node-role.kubernetes.io/control-plane',
                    'effect': 'NoSchedule',
                },
            ],
            'initContainers': [
                {
                    'name': 'git-clone',
                    'image': 'alpine/git:latest',
                    'command': ['git', 'clone', clone_url_auth, '/workspace'],
                    'volumeMounts': [
                        {'name': 'workspace', 'mountPath': '/workspace'},
                    ],
                },
            ],
            'containers': [
                {
                    'name': 'kaniko',
                    'image': 'gcr.io/kaniko-project/executor:latest',
                    'args': [
                        '--dockerfile=Dockerfile',
                        '--context=/workspace',
                        f'--destination={image_dest}:{short_sha}',
                        f'--destination={image_dest}:latest',
                        '--insecure',
                        '--insecure-pull',
                        '--skip-tls-verify',
                        '--skip-tls-verify-pull',
                        f'--insecure-registry={REGISTRY}',
                        '--snapshot-mode=redo',
                        '--use-new-run',
                        '--compressed-caching=false',
                    ],
                    'resources': {
                        'requests': {'cpu': '500m', 'memory': '1Gi'},
                        'limits': {'cpu': '2', 'memory': '3Gi'},
                    },
                    'volumeMounts': [
                        {'name': 'workspace', 'mountPath': '/workspace'},
                        {'name': 'docker-config', 'mountPath': '/kaniko/.docker'},
                    ],
                },
            ],
            'volumes': [
                {'name': 'workspace', 'emptyDir': {}},
                {
                    'name': 'docker-config',
                    'secret': {
                        'secretName': REGISTRY_SECRET,
                        'items': [
                            {'key': '.dockerconfigjson', 'path': 'config.json'},
                        ],
                    },
                },
            ],
        },
    }

    try:
        # Delete existing pod if any
        subprocess.run(
            ['kubectl', 'delete', 'pod', pod_name, '-n', BUILD_NAMESPACE,
             '--ignore-not-found=true'],
            capture_output=True, timeout=30,
        )
        time.sleep(2)
        result = subprocess.run(
            ['kubectl', 'apply', '-f', '-'],
            input=json.dumps(manifest), capture_output=True, text=True, timeout=30,
        )
    except (OSError, subprocess.TimeoutExpired) as e:
        logger.error('Failed to create Kaniko pod: %s', _redact(e))
        return False

    if result.returncode != 0:
        logger.error('Failed to create Kaniko pod: %s', _redact(result.stderr))
        return False

    logger.info('Kaniko pod %s created for %s/%s:%s', pod_name, owner, repo, short_sha)
    return True


def wait_for_build(repo, timeout=900):
    """Wait for Kaniko build to complete.

    Polls the pod phase every 15 seconds.

    Args:
        repo: Repository name the build pod was created for.
        timeout: Maximum number of seconds to wait.

    Returns:
        True if the pod reached ``Succeeded``; False on ``Failed``, on timeout,
        or if kubectl cannot be executed.
    """
    pod_name = _pod_name(repo)
    deadline = time.time() + timeout

    while time.time() < deadline:
        try:
            result = subprocess.run(
                ['kubectl', 'get', 'pod', pod_name, '-n', BUILD_NAMESPACE,
                 '-o', 'jsonpath={.status.phase}'],
                capture_output=True, text=True, timeout=10,
            )
            phase = result.stdout.strip()
        except subprocess.TimeoutExpired:
            # A slow API server is transient; keep polling until the deadline.
            logger.warning('Timed out querying pod %s, retrying', pod_name)
            phase = ''
        except OSError as e:
            logger.error('Cannot query build pod %s: %s', pod_name, e)
            return False

        if phase == 'Succeeded':
            logger.info('Build succeeded for %s', repo)
            return True
        if phase == 'Failed':
            logger.error('Build failed for %s', repo)
            try:
                logs = subprocess.run(
                    ['kubectl', 'logs', pod_name, '-n', BUILD_NAMESPACE, '--tail=20'],
                    capture_output=True, text=True, timeout=10,
                )
                logger.error('Build logs: %s', _redact(logs.stdout))
            except (OSError, subprocess.TimeoutExpired) as e:
                logger.error('Could not fetch build logs: %s', _redact(e))
            return False

        time.sleep(15)

    logger.error('Build timed out for %s', repo)
    return False


def _git(args, cwd, timeout):
    """Run ``git <args>`` in *cwd*, raising CalledProcessError on failure."""
    return subprocess.run(
        ['git', *args], cwd=cwd, check=True,
        capture_output=True, text=True, timeout=timeout,
    )


def _clone_gitops(dest):
    """Clone the gitops-infra repository into the (empty) directory *dest*."""
    url = _authenticated_clone_url(f'{GITEA_USER}/gitops-infra.git')
    subprocess.run(
        ['git', 'clone', url, dest],
        capture_output=True, text=True, timeout=30, check=True,
    )


def _commit_and_push(workdir, message):
    """Stage everything in *workdir*, then commit and push.

    Returns False (without committing) when there is nothing to commit.
    """
    _git(['config', 'user.name', 'CI Controller'], workdir, 5)
    _git(['config', 'user.email', 'ci@gitea.local'], workdir, 5)
    _git(['add', '.'], workdir, 5)

    # Exit status 0 from `git diff --cached --quiet` means nothing is staged.
    staged = subprocess.run(
        ['git', 'diff', '--cached', '--quiet'],
        cwd=workdir, capture_output=True, timeout=10,
    )
    if staged.returncode == 0:
        return False

    _git(['commit', '-m', message], workdir, 10)
    _git(['push'], workdir, 30)
    return True


def _log_git_failure(prefix, error):
    """Log a failed git/file operation with credentials redacted."""
    stderr = getattr(error, 'stderr', None) or ''
    logger.error('%s: %s %s', prefix, _redact(error), _redact(stderr).strip())


def update_gitops(owner, repo, short_sha):
    """Update gitops-infra with new image tag.

    Rewrites every ``newTag: "..."`` entry in the repository's production
    kustomization to *short_sha*, then commits and pushes.

    Returns:
        True if the tag was pushed or was already current; False if the
        environment does not exist or a git/file operation failed.
    """
    try:
        with tempfile.TemporaryDirectory() as gitops_dir:
            # Clone gitops repo
            _clone_gitops(gitops_dir)

            kustomization = os.path.join(
                gitops_dir, 'environments', repo,
                'overlays', 'production', 'kustomization.yaml',
            )
            if not os.path.exists(kustomization):
                logger.warning('No gitops environment for %s, skipping update', repo)
                return False

            # Update image tag (newline='' keeps the file's line endings intact)
            with open(kustomization, encoding='utf-8', newline='') as f:
                content = f.read()
            updated = _NEW_TAG_RE.sub(lambda _m: f'newTag: "{short_sha}"', content)
            if updated != content:
                with open(kustomization, 'w', encoding='utf-8', newline='') as f:
                    f.write(updated)

            # Commit and push
            if not _commit_and_push(gitops_dir, f'Deploy {repo} {short_sha}'):
                logger.info('No changes for %s, already at %s', repo, short_sha)
                return True

            logger.info('GitOps updated: %s -> %s', repo, short_sha)
            return True
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired, OSError) as e:
        _log_git_failure('GitOps update failed', e)
        return False


def _write_text(path, text):
    """Write *text* to *path* as UTF-8."""
    with open(path, 'w', encoding='utf-8') as f:
        f.write(text)


def scaffold_gitops(owner, repo):
    """Create gitops scaffolding for a new project.

    Writes a kustomize base (deployment + service), a production overlay and
    an ArgoCD Application for *repo* into gitops-infra, then commits and pushes.

    Returns:
        True on success (or if nothing needed committing), False on failure.
    """
    image = f'{REGISTRY}/{owner}/{repo}'

    try:
        with tempfile.TemporaryDirectory() as gitops_dir:
            _clone_gitops(gitops_dir)

            env_base = os.path.join(gitops_dir, 'environments', repo, 'base')
            env_prod = os.path.join(
                gitops_dir, 'environments', repo, 'overlays', 'production')
            apps_dir = os.path.join(gitops_dir, 'apps')

            # apps/ may not exist yet in a fresh gitops repository.
            for directory in (env_base, env_prod, apps_dir):
                os.makedirs(directory, exist_ok=True)

            # Base kustomization
            _write_text(os.path.join(env_base, 'kustomization.yaml'), f"""\
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
namespace: {repo}
resources:
  - deployment.yaml
  - service.yaml
commonLabels:
  app.kubernetes.io/managed-by: argocd
""")

            # Deployment
            _write_text(os.path.join(env_base, 'deployment.yaml'), f"""\
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {repo}
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: {repo}
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxUnavailable: 0
      maxSurge: 1
  template:
    metadata:
      labels:
        app.kubernetes.io/name: {repo}
    spec:
      automountServiceAccountToken: false
      imagePullSecrets:
        - name: gitea-registry
      containers:
        - name: {repo}
          image: {image}:latest
          ports:
            - name: http
              containerPort: 3000
              protocol: TCP
          resources:
            requests:
              cpu: 100m
              memory: 128Mi
            limits:
              cpu: 500m
              memory: 512Mi
          startupProbe:
            httpGet:
              path: /
              port: http
            initialDelaySeconds: 10
            periodSeconds: 5
            failureThreshold: 30
          livenessProbe:
            httpGet:
              path: /
              port: http
            periodSeconds: 30
            failureThreshold: 3
          readinessProbe:
            httpGet:
              path: /
              port: http
            periodSeconds: 10
            failureThreshold: 2
""")

            # Service
            _write_text(os.path.join(env_base, 'service.yaml'), f"""\
apiVersion: v1
kind: Service
metadata:
  name: {repo}
spec:
  type: ClusterIP
  ports:
    - port: 80
      targetPort: http
      protocol: TCP
      name: http
  selector:
    app.kubernetes.io/name: {repo}
""")

            # Production overlay
            _write_text(os.path.join(env_prod, 'kustomization.yaml'), f"""\
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
  - ../../base
images:
  - name: {image}
    newTag: "latest"
""")

            # ArgoCD Application
            _write_text(os.path.join(apps_dir, f'{repo}.yaml'), f"""\
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
  name: {repo}
  namespace: argocd
spec:
  project: infinicaretech
  source:
    repoURL: http://gitea-http.gitea.svc:3000/{GITEA_USER}/gitops-infra.git
    targetRevision: main
    path: environments/{repo}/overlays/production
  destination:
    server: https://kubernetes.default.svc
    namespace: {repo}
  syncPolicy:
    automated:
      prune: true
      selfHeal: true
    syncOptions:
      - CreateNamespace=true
      - PrunePropagationPolicy=foreground
      - PruneLast=true
    retry:
      limit: 3
      backoff:
        duration: 5s
        factor: 2
        maxDuration: 3m
""")

            # Commit and push
            if not _commit_and_push(
                    gitops_dir, f'Scaffold gitops for new project: {repo}'):
                logger.info('GitOps scaffolding for %s already up to date', repo)
                return True

            logger.info('GitOps scaffolding created for %s', repo)
            return True
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired, OSError) as e:
        _log_git_failure('Scaffold failed', e)
        return False


def _process_push(payload):
    """Validate a push payload and run the build/deploy pipeline for it."""
    try:
        repo_name = payload['repository']['name']
        owner = payload['repository']['owner']['login']
        ref = payload.get('ref', '')
        after = payload.get('after', '')
    except (KeyError, TypeError, AttributeError):
        logger.error('Malformed push payload: missing repository information')
        return
    if not isinstance(repo_name, str) or not isinstance(owner, str):
        logger.error('Malformed push payload: repository/owner is not a string')
        return

    # Only build on push to main/master
    if ref not in ('refs/heads/main', 'refs/heads/master'):
        logger.info('Ignoring push to %s for %s/%s', ref, owner, repo_name)
        return

    # Skip ignored repos
    if repo_name in IGNORED_REPOS:
        logger.info('Ignoring push to %s (in ignore list)', repo_name)
        return

    # Skip empty commits (branch delete)
    if after == '0' * 40:
        logger.info('Ignoring branch delete for %s/%s', owner, repo_name)
        return

    # These values are interpolated into paths, URLs, YAML and image tags.
    if not (_is_safe_name(owner) and _is_safe_name(repo_name)):
        logger.error('Ignoring push with invalid owner/repository name')
        return
    if not isinstance(after, str) or _SHA_RE.fullmatch(after) is None:
        logger.error('Ignoring push with invalid commit id for %s/%s', owner, repo_name)
        return

    short_sha = after[:7]
    logger.info('Processing push: %s/%s@%s', owner, repo_name, short_sha)

    # Check if repo has a Dockerfile
    if not repo_has_dockerfile(owner, repo_name):
        logger.info('%s/%s has no Dockerfile, skipping', owner, repo_name)
        return

    # Check if gitops environment exists, if not scaffold it
    if not gitops_app_exists(repo_name):
        logger.info('No gitops environment for %s, creating scaffold...', repo_name)
        if not scaffold_gitops(owner, repo_name):
            return

    # Build with Kaniko
    if not create_kaniko_pod(owner, repo_name, short_sha):
        return

    # Wait for build
    if not wait_for_build(repo_name):
        return

    # Update gitops
    update_gitops(owner, repo_name, short_sha)


def handle_push(payload):
    """Handle a push event.

    Runs in a background thread, so it never raises: malformed payloads are
    logged and ignored, and unexpected errors are logged (with credentials
    redacted) instead of killing the thread with a traceback.

    Args:
        payload: Decoded JSON body of a Gitea ``push`` webhook.
    """
    try:
        _process_push(payload)
    except Exception as e:  # thread boundary: log, never propagate
        logger.error('Unhandled error while processing push: %s: %s',
                     type(e).__name__, _redact(e))


class WebhookHandler(BaseHTTPRequestHandler):
    """HTTP handler: POST receives Gitea webhooks, GET is a health check."""

    def _reply(self, status, body):
        """Send a minimal response with *status* and the bytes *body*."""
        self.send_response(status)
        self.end_headers()
        self.wfile.write(body)

    def do_POST(self):
        """Validate a webhook delivery and start the pipeline in the background.

        Responds 400 for a bad Content-Length or body, 403 for a bad
        signature, 200 for non-push events and 202 once a push is accepted.
        """
        try:
            content_length = int(self.headers.get('Content-Length', 0))
        except ValueError:
            content_length = -1
        if content_length < 0:
            self._reply(400, b'Invalid Content-Length')
            return
        body = self.rfile.read(content_length)

        # Verify signature
        signature = self.headers.get('X-Gitea-Signature', '')
        if WEBHOOK_SECRET and not verify_signature(body, signature):
            self._reply(403, b'Invalid signature')
            return

        event = self.headers.get('X-Gitea-Event', '')
        if event != 'push':
            self._reply(200, b'OK - ignored event')
            return

        try:
            payload = json.loads(body)
        except ValueError as e:  # JSONDecodeError and UnicodeDecodeError
            logger.error('Error processing webhook: %s', e)
            self._reply(400, b'Invalid JSON payload')
            return

        # Process in background thread to not block webhook response
        try:
            threading.Thread(target=handle_push, args=(payload,), daemon=True).start()
        except RuntimeError as e:
            logger.error('Error processing webhook: %s', e)
            self._reply(500, b'Could not start build')
            return
        self._reply(202, b'Accepted')

    def do_GET(self):
        """Health check endpoint."""
        self._reply(200, b'CI Controller OK')

    def log_message(self, format, *args):
        """Route the server's access log through the module logger."""
        logger.info('%s - %s', self.client_address[0], format % args)


def setup_gitea_webhook():
    """Register this controller's push webhook in Gitea if it is missing.

    Uses the ``/api/v1/user/hooks`` endpoint as GITEA_USER.
    NOTE(review): that is the per-user hook endpoint, not the admin/system
    one -- confirm it covers every repository that should be built.

    Returns:
        True if the webhook already exists or was created, False otherwise.
    """
    webhook_url = 'http://ci-controller.build.svc:8080/webhook'
    api_url = f'{GITEA_URL}/api/v1/user/hooks'

    # Check existing webhooks
    try:
        req = urllib.request.Request(api_url)
        req.add_header('Authorization', _basic_auth_header())
        with urllib.request.urlopen(req, timeout=10) as resp:
            hooks = json.loads(resp.read())
        for hook in hooks:
            if hook.get('config', {}).get('url') == webhook_url:
                logger.info('Webhook already exists')
                return True
    except Exception as e:  # best-effort: fall through and try to create it
        logger.warning('Could not check existing webhooks: %s', _redact(e))

    # Create webhook
    hook_data = json.dumps({
        'type': 'gitea',
        'config': {
            'url': webhook_url,
            'content_type': 'json',
            'secret': WEBHOOK_SECRET,
        },
        'events': ['push'],
        'active': True,
    }).encode()

    try:
        req = urllib.request.Request(api_url, data=hook_data, method='POST')
        req.add_header('Authorization', _basic_auth_header())
        req.add_header('Content-Type', 'application/json')
        with urllib.request.urlopen(req, timeout=10) as resp:
            logger.info('Webhook created: %s', resp.status)
        return True
    except urllib.error.HTTPError as e:
        detail = e.read().decode(errors='replace')
        logger.error('Failed to create webhook: %s %s', e.code, detail)
        return False
    except (OSError, ValueError) as e:
        # URLError (Gitea unreachable) must not crash the controller at startup.
        logger.error('Failed to create webhook: %s', _redact(e))
        return False


def main():
    """Register the webhook and serve requests forever."""
    port = int(os.environ.get('PORT', '8080'))

    if not WEBHOOK_SECRET:
        logger.warning('WEBHOOK_SECRET is not set; webhook signatures are not verified')

    # Register webhook on startup
    logger.info('Setting up Gitea webhook...')
    setup_gitea_webhook()

    logger.info('Starting CI Controller on port %s', port)
    server = HTTPServer(('0.0.0.0', port), WebhookHandler)
    server.serve_forever()


if __name__ == '__main__':
    main()