""" modules/cloud_backup.py Cloudflare R2 cloud backup integration via boto3 (S3-compatible API) """ import os import time import threading from datetime import datetime, timezone try: import boto3 from botocore.exceptions import ClientError, NoCredentialsError BOTO3_AVAILABLE = True except ImportError: BOTO3_AVAILABLE = False # Upload progress tracking (in-memory) _upload_jobs: dict = {} def _get_r2_config() -> dict: """Read R2 config fresh from env vars every time (not cached at import).""" return { "account_id": os.environ.get("R2_ACCOUNT_ID", "35e00c230cc8066252a2d9890b69aea2"), "access_key_id": os.environ.get("R2_ACCESS_KEY_ID", ""), "secret_access_key": os.environ.get("R2_SECRET_ACCESS_KEY", ""), "bucket_name": os.environ.get("R2_BUCKET_NAME", "navitrends-backups"), } # Module-level name used in app.py template rendering R2_BUCKET_NAME = os.environ.get("R2_BUCKET_NAME", "navitrends-backups") def _get_r2_client(): """Return a boto3 S3 client pointed at Cloudflare R2.""" if not BOTO3_AVAILABLE: raise RuntimeError("boto3 is not installed. Run: pip install boto3") cfg = _get_r2_config() if not cfg["access_key_id"] or not cfg["secret_access_key"]: raise RuntimeError( "R2 credentials not configured. " "Set R2_ACCESS_KEY_ID and R2_SECRET_ACCESS_KEY environment variables." ) return boto3.client( "s3", endpoint_url=f"https://{cfg['account_id']}.r2.cloudflarestorage.com", aws_access_key_id=cfg["access_key_id"], aws_secret_access_key=cfg["secret_access_key"], region_name="auto", ) def r2_is_configured() -> bool: """Return True only if credentials are set and boto3 is available.""" cfg = _get_r2_config() return BOTO3_AVAILABLE and bool(cfg["access_key_id"]) and bool(cfg["secret_access_key"]) def r2_test_connection() -> dict: if not BOTO3_AVAILABLE: return {"success": False, "message": "boto3 not installed (pip install boto3)", "bucket_exists": False} cfg = _get_r2_config() if not cfg["access_key_id"] or not cfg["secret_access_key"]: return {"success": False, "message": "R2 credentials not set", "bucket_exists": False} try: client = _get_r2_client() resp = client.list_buckets() buckets = [b["Name"] for b in resp.get("Buckets", [])] bucket = cfg["bucket_name"] exists = bucket in buckets return { "success": True, "message": f"Connected to R2. Bucket '{bucket}': {'exists' if exists else 'not created yet'}", "bucket_exists": exists, "buckets": buckets, } except NoCredentialsError: return {"success": False, "message": "Invalid R2 credentials", "bucket_exists": False} except Exception as e: return {"success": False, "message": str(e), "bucket_exists": False} def r2_ensure_bucket() -> tuple[bool, str]: cfg = _get_r2_config() bucket = cfg["bucket_name"] try: client = _get_r2_client() try: client.head_bucket(Bucket=bucket) return True, f"Bucket '{bucket}' already exists" except ClientError as e: if e.response["Error"]["Code"] in ("404", "NoSuchBucket"): client.create_bucket(Bucket=bucket) return True, f"Bucket '{bucket}' created successfully" raise except Exception as e: return False, f"Failed to ensure bucket: {e}" def r2_list_backups() -> list[dict]: if not r2_is_configured(): return [] cfg = _get_r2_config() bucket = cfg["bucket_name"] try: client = _get_r2_client() resp = client.list_objects_v2(Bucket=bucket, Prefix="backups/") objects = [] for obj in resp.get("Contents", []): # Skip .sha256 checksum files — those are audit files, not backups if obj["Key"].endswith(".sha256"): continue size = obj["Size"] objects.append({ "key": obj["Key"], "name": obj["Key"].replace("backups/", ""), "size": size, "size_human": _human_size(size), "last_modified": obj["LastModified"].isoformat(), "last_modified_str": obj["LastModified"].strftime("%Y-%m-%d %H:%M UTC"), }) return sorted(objects, key=lambda x: x["last_modified"], reverse=True) except Exception: return [] def r2_delete_backup(key: str) -> tuple[bool, str]: cfg = _get_r2_config() bucket = cfg["bucket_name"] try: client = _get_r2_client() client.delete_object(Bucket=bucket, Key=key) return True, f"Deleted {key} from R2" except Exception as e: return False, str(e) def r2_get_bucket_stats() -> dict: if not r2_is_configured(): return {"count": 0, "total_size": 0, "total_size_human": "0 B", "configured": False} cfg = _get_r2_config() bucket = cfg["bucket_name"] try: client = _get_r2_client() resp = client.list_objects_v2(Bucket=bucket, Prefix="backups/") # Count and size only real archives, not .sha256 checksum files objects = [o for o in resp.get("Contents", []) if not o["Key"].endswith(".sha256")] total = sum(o["Size"] for o in objects) return { "count": len(objects), "total_size": total, "total_size_human": _human_size(total), "configured": True, } except Exception: return {"count": 0, "total_size": 0, "total_size_human": "0 B", "configured": True} class _ProgressCallback: def __init__(self, job_id: str, file_size: int): self._job_id = job_id self._size = file_size self._uploaded = 0 self._lock = threading.Lock() def __call__(self, bytes_amount: int): with self._lock: self._uploaded += bytes_amount pct = int(self._uploaded * 100 / self._size) if self._size else 0 if self._job_id in _upload_jobs: _upload_jobs[self._job_id]["uploaded"] = self._uploaded _upload_jobs[self._job_id]["progress"] = pct _upload_jobs[self._job_id]["log"].append( f" {_human_size(self._uploaded)} / {_human_size(self._size)} ({pct}%)" ) def r2_upload_async(local_path: str, job_id: str) -> None: _upload_jobs[job_id] = { "status": "running", "log": [f"Starting upload: {os.path.basename(local_path)}"], "progress": 0, "uploaded": 0, "started": time.time(), } try: if not r2_is_configured(): raise RuntimeError("R2 credentials not configured") cfg = _get_r2_config() bucket = cfg["bucket_name"] ok, msg = r2_ensure_bucket() if not ok: raise RuntimeError(msg) _upload_jobs[job_id]["log"].append(f"OK: {msg}") file_size = os.path.getsize(local_path) object_key = f"backups/{os.path.basename(local_path)}" client = _get_r2_client() callback = _ProgressCallback(job_id, file_size) _upload_jobs[job_id]["log"].append( f"Uploading {_human_size(file_size)} to r2://{bucket}/{object_key}" ) client.upload_file( local_path, bucket, object_key, Callback=callback, ExtraArgs={"Metadata": { "uploaded-by": "navitrends-platform", "uploaded-at": datetime.now(timezone.utc).isoformat(), "original-file": os.path.basename(local_path), }}, ) sha_path = local_path + ".sha256" if os.path.exists(sha_path): client.upload_file(sha_path, bucket, object_key + ".sha256") _upload_jobs[job_id]["log"].append("SHA256 checksum uploaded") elapsed = round(time.time() - _upload_jobs[job_id]["started"], 1) _upload_jobs[job_id]["log"].append( f"Upload complete in {elapsed}s — r2://{bucket}/{object_key}" ) _upload_jobs[job_id]["status"] = "done" _upload_jobs[job_id]["progress"] = 100 _upload_jobs[job_id]["object_key"] = object_key except Exception as e: _upload_jobs[job_id]["log"].append(f"Upload failed: {e}") _upload_jobs[job_id]["status"] = "error" def get_upload_job(job_id: str) -> dict | None: job = _upload_jobs.get(job_id) if not job: return None return { "status": job["status"], "log": job["log"], "progress": job["progress"], "elapsed": round(time.time() - job.get("started", time.time())), } def _human_size(n: int) -> str: for unit in ("B", "KB", "MB", "GB"): if n < 1024: return f"{n:.1f} {unit}" n /= 1024 return f"{n:.1f} TB" def r2_audit_backup(key: str) -> dict: """Audit a backup in R2 by verifying its SHA256 checksum and metadata.""" cfg = _get_r2_config() bucket = cfg["bucket_name"] report = { "backup": key, "status": "unknown", "checks": [], "warnings": [], "errors": [], } try: client = _get_r2_client() # Check 1: Object exists try: head = client.head_object(Bucket=bucket, Key=key) size = head["ContentLength"] last_mod = head["LastModified"].strftime("%Y-%m-%d %H:%M UTC") report["checks"].append(f"✅ Object exists in R2 ({_human_size(size)}, modified {last_mod})") report["size"] = size report["size_human"] = _human_size(size) report["last_modified"] = last_mod except ClientError as e: report["errors"].append(f"❌ Object not found in R2: {e}") report["status"] = "error" return report # Check 2: SHA256 companion file sha_key = key + ".sha256" try: sha_obj = client.get_object(Bucket=bucket, Key=sha_key) sha_content = sha_obj["Body"].read().decode("utf-8").strip() stored_hash = sha_content.split()[0] if sha_content else "" report["checks"].append("✅ SHA256 checksum file found") report["stored_hash"] = (stored_hash[:16] + "…") if stored_hash else "N/A" except ClientError: report["warnings"].append("⚠️ No SHA256 checksum file found — integrity cannot be verified") report["stored_hash"] = None # Check 3: Upload metadata metadata = head.get("Metadata", {}) uploaded_by = metadata.get("uploaded-by", "") uploaded_at = metadata.get("uploaded-at", "") if uploaded_by: report["checks"].append(f"✅ Uploaded by: {uploaded_by}") if uploaded_at: report["checks"].append(f"✅ Upload timestamp: {uploaded_at[:19]}") if not uploaded_by and not uploaded_at: report["warnings"].append("⚠️ No upload metadata found (may have been uploaded outside the platform)") # Check 4: File size sanity if size < 10 * 1024 * 1024: report["warnings"].append(f"⚠️ File is very small ({_human_size(size)}) — may be incomplete") else: report["checks"].append(f"✅ File size looks healthy ({_human_size(size)})") # Check 5: Filename format import re filename = key.replace("backups/", "") if re.match(r"myapps-backup-\d{8}_\d{6}\.tar\.gz", filename): report["checks"].append("✅ Filename format is valid") else: report["warnings"].append(f"⚠️ Unexpected filename format: {filename}") # Final status if report["errors"]: report["status"] = "error" elif report["warnings"]: report["status"] = "warning" else: report["status"] = "healthy" except Exception as e: report["errors"].append(f"❌ Audit failed: {e}") report["status"] = "error" return report