"""
modules/cloud_backup.py

Cloudflare R2 cloud backup integration via boto3 (S3-compatible API).
"""
|
|
|
|
import os
|
|
import time
|
|
import threading
|
|
from datetime import datetime, timezone
|
|
|
|
try:
    import boto3
    from botocore.exceptions import ClientError, NoCredentialsError

    # boto3 is an optional dependency; record its presence so callers can
    # degrade gracefully instead of failing at import time.
    BOTO3_AVAILABLE = True
except ImportError:
    BOTO3_AVAILABLE = False

# Upload progress tracking (in-memory): job_id -> mutable job-state dict
# with keys status/log/progress/uploaded/started.  Lost on process restart.
_upload_jobs: dict = {}
|
|
|
|
|
|
def _get_r2_config() -> dict:
|
|
"""Read R2 config fresh from env vars every time (not cached at import)."""
|
|
return {
|
|
"account_id": os.environ.get("R2_ACCOUNT_ID", "35e00c230cc8066252a2d9890b69aea2"),
|
|
"access_key_id": os.environ.get("R2_ACCESS_KEY_ID", ""),
|
|
"secret_access_key": os.environ.get("R2_SECRET_ACCESS_KEY", ""),
|
|
"bucket_name": os.environ.get("R2_BUCKET_NAME", "navitrends-backups"),
|
|
}
|
|
|
|
|
|
# Module-level name used in app.py template rendering.
# NOTE(review): unlike _get_r2_config(), this value is captured once at
# import time — later changes to the R2_BUCKET_NAME env var are not
# reflected here, only in the per-call config.
R2_BUCKET_NAME = os.environ.get("R2_BUCKET_NAME", "navitrends-backups")
|
|
|
|
|
|
def _get_r2_client():
    """Construct a boto3 S3 client pointed at Cloudflare R2.

    Raises:
        RuntimeError: when boto3 is not installed or credentials are unset.
    """
    if not BOTO3_AVAILABLE:
        raise RuntimeError("boto3 is not installed. Run: pip install boto3")
    cfg = _get_r2_config()
    key = cfg["access_key_id"]
    secret = cfg["secret_access_key"]
    if not (key and secret):
        raise RuntimeError(
            "R2 credentials not configured. "
            "Set R2_ACCESS_KEY_ID and R2_SECRET_ACCESS_KEY environment variables."
        )
    # R2 exposes an S3-compatible endpoint per account; region is always "auto".
    endpoint = f"https://{cfg['account_id']}.r2.cloudflarestorage.com"
    return boto3.client(
        "s3",
        endpoint_url=endpoint,
        aws_access_key_id=key,
        aws_secret_access_key=secret,
        region_name="auto",
    )
|
|
|
|
|
|
def r2_is_configured() -> bool:
    """Return True when boto3 is importable and both credential vars are set."""
    if not BOTO3_AVAILABLE:
        return False
    cfg = _get_r2_config()
    return bool(cfg["access_key_id"] and cfg["secret_access_key"])
|
|
|
|
|
|
def r2_test_connection() -> dict:
    """Probe R2 connectivity and whether the configured bucket exists.

    Returns a dict with ``success``, ``message`` and ``bucket_exists``
    (plus ``buckets`` on success); never raises.
    """
    def _fail(message: str) -> dict:
        return {"success": False, "message": message, "bucket_exists": False}

    if not BOTO3_AVAILABLE:
        return _fail("boto3 not installed (pip install boto3)")
    cfg = _get_r2_config()
    if not (cfg["access_key_id"] and cfg["secret_access_key"]):
        return _fail("R2 credentials not set")
    try:
        client = _get_r2_client()
        names = [entry["Name"] for entry in client.list_buckets().get("Buckets", [])]
        target = cfg["bucket_name"]
        found = target in names
        status = "exists" if found else "not created yet"
        return {
            "success": True,
            "message": f"Connected to R2. Bucket '{target}': {status}",
            "bucket_exists": found,
            "buckets": names,
        }
    except NoCredentialsError:
        return _fail("Invalid R2 credentials")
    except Exception as e:
        return _fail(str(e))
|
|
|
|
|
|
def r2_ensure_bucket() -> tuple[bool, str]:
    """Create the configured bucket if it does not already exist.

    Returns:
        (ok, human-readable message); never raises.
    """
    bucket = _get_r2_config()["bucket_name"]
    try:
        client = _get_r2_client()
        try:
            client.head_bucket(Bucket=bucket)
        except ClientError as err:
            code = err.response["Error"]["Code"]
            if code not in ("404", "NoSuchBucket"):
                # Anything other than "missing bucket" is a real failure.
                raise
            client.create_bucket(Bucket=bucket)
            return True, f"Bucket '{bucket}' created successfully"
        return True, f"Bucket '{bucket}' already exists"
    except Exception as e:
        return False, f"Failed to ensure bucket: {e}"
|
|
|
|
|
def r2_list_backups() -> list[dict]:
    """List backup archives under the ``backups/`` prefix, newest first.

    Checksum companions (``*.sha256``) are excluded.  Returns [] when R2 is
    not configured or any API call fails (best-effort listing).
    """
    if not r2_is_configured():
        return []
    bucket = _get_r2_config()["bucket_name"]
    try:
        client = _get_r2_client()
        listing = client.list_objects_v2(Bucket=bucket, Prefix="backups/")
        entries = []
        for item in listing.get("Contents", []):
            key = item["Key"]
            # Checksum companions are audit artifacts, not backups.
            if key.endswith(".sha256"):
                continue
            nbytes = item["Size"]
            modified = item["LastModified"]
            entries.append({
                "key": key,
                "name": key.replace("backups/", ""),
                "size": nbytes,
                "size_human": _human_size(nbytes),
                "last_modified": modified.isoformat(),
                "last_modified_str": modified.strftime("%Y-%m-%d %H:%M UTC"),
            })
        entries.sort(key=lambda entry: entry["last_modified"], reverse=True)
        return entries
    except Exception:
        return []
|
|
|
|
|
|
def r2_delete_backup(key: str) -> tuple[bool, str]:
    """Delete a backup object (and its SHA256 companion, if any) from R2.

    Args:
        key: Full object key, e.g. ``backups/myapps-backup-....tar.gz``.

    Returns:
        (ok, message); never raises.
    """
    cfg = _get_r2_config()
    bucket = cfg["bucket_name"]
    try:
        client = _get_r2_client()
        client.delete_object(Bucket=bucket, Key=key)
        # FIX: also remove the checksum companion uploaded by r2_upload_async
        # so it is not orphaned in the bucket.  S3/R2 DeleteObject succeeds
        # even when the key does not exist, so this is safe for backups that
        # never had a checksum.
        if not key.endswith(".sha256"):
            client.delete_object(Bucket=bucket, Key=key + ".sha256")
        return True, f"Deleted {key} from R2"
    except Exception as e:
        return False, str(e)
|
|
|
|
|
|
def r2_get_bucket_stats() -> dict:
    """Summarize backup archives in the bucket (count and total size).

    ``configured`` tells the caller whether credentials were present;
    API failures degrade to zeroed stats rather than raising.
    """
    if not r2_is_configured():
        return {"count": 0, "total_size": 0, "total_size_human": "0 B", "configured": False}
    bucket = _get_r2_config()["bucket_name"]
    try:
        client = _get_r2_client()
        listing = client.list_objects_v2(Bucket=bucket, Prefix="backups/")
        # Only real archives count toward the stats, not .sha256 companions.
        archives = [
            item for item in listing.get("Contents", [])
            if not item["Key"].endswith(".sha256")
        ]
        total_bytes = sum(item["Size"] for item in archives)
        return {
            "count": len(archives),
            "total_size": total_bytes,
            "total_size_human": _human_size(total_bytes),
            "configured": True,
        }
    except Exception:
        return {"count": 0, "total_size": 0, "total_size_human": "0 B", "configured": True}
|
|
|
|
|
|
class _ProgressCallback:
    """boto3 transfer callback that records upload progress in _upload_jobs.

    boto3 invokes the callback once per transferred chunk, which for large
    files means a very large number of calls.  FIX: previously every call
    appended a log line, growing the job's in-memory log unboundedly (and
    get_upload_job returns the whole log on every poll); now a line is
    appended only when the integer percentage advances, bounding the log
    at ~101 progress lines per upload.
    """

    def __init__(self, job_id: str, file_size: int):
        self._job_id = job_id          # key into the module-level _upload_jobs dict
        self._size = file_size         # total bytes expected, for percentage math
        self._uploaded = 0             # running byte count
        self._last_logged_pct = -1     # last percentage a log line was written for
        self._lock = threading.Lock()  # boto3 may invoke from transfer worker threads

    def __call__(self, bytes_amount: int):
        with self._lock:
            self._uploaded += bytes_amount
            pct = int(self._uploaded * 100 / self._size) if self._size else 0
            job = _upload_jobs.get(self._job_id)
            if job is None:
                # Job entry was removed; nothing to record.
                return
            job["uploaded"] = self._uploaded
            job["progress"] = pct
            if pct != self._last_logged_pct:
                self._last_logged_pct = pct
                job["log"].append(
                    f" {_human_size(self._uploaded)} / {_human_size(self._size)} ({pct}%)"
                )
|
|
|
|
|
|
def r2_upload_async(local_path: str, job_id: str) -> None:
    """Upload *local_path* to R2, recording progress under *job_id*.

    Intended to run in a background thread: every outcome, including
    failure, is reported through the _upload_jobs entry rather than raised.
    A companion ``<file>.sha256`` is uploaded alongside the archive when
    it exists on disk.
    """
    job = {
        "status": "running",
        "log": [f"Starting upload: {os.path.basename(local_path)}"],
        "progress": 0,
        "uploaded": 0,
        "started": time.time(),
    }
    _upload_jobs[job_id] = job
    try:
        if not r2_is_configured():
            raise RuntimeError("R2 credentials not configured")

        bucket = _get_r2_config()["bucket_name"]

        ok, msg = r2_ensure_bucket()
        if not ok:
            raise RuntimeError(msg)
        job["log"].append(f"OK: {msg}")

        filename = os.path.basename(local_path)
        object_key = f"backups/{filename}"
        file_size = os.path.getsize(local_path)
        client = _get_r2_client()

        job["log"].append(
            f"Uploading {_human_size(file_size)} to r2://{bucket}/{object_key}"
        )
        client.upload_file(
            local_path,
            bucket,
            object_key,
            Callback=_ProgressCallback(job_id, file_size),
            ExtraArgs={
                # Metadata is later surfaced by r2_audit_backup (check 3).
                "Metadata": {
                    "uploaded-by": "navitrends-platform",
                    "uploaded-at": datetime.now(timezone.utc).isoformat(),
                    "original-file": filename,
                }
            },
        )

        # Ship the checksum companion alongside the archive when present.
        sha_path = local_path + ".sha256"
        if os.path.exists(sha_path):
            client.upload_file(sha_path, bucket, object_key + ".sha256")
            job["log"].append("SHA256 checksum uploaded")

        elapsed = round(time.time() - job["started"], 1)
        job["log"].append(
            f"Upload complete in {elapsed}s — r2://{bucket}/{object_key}"
        )
        job["status"] = "done"
        job["progress"] = 100
        job["object_key"] = object_key
    except Exception as e:
        job["log"].append(f"Upload failed: {e}")
        job["status"] = "error"
|
|
|
|
|
|
def get_upload_job(job_id: str) -> dict | None:
    """Return a progress snapshot for *job_id*, or None if unknown."""
    job = _upload_jobs.get(job_id)
    if not job:
        return None
    started = job.get("started", time.time())
    return {
        "status": job["status"],
        "log": job["log"],
        "progress": job["progress"],
        "elapsed": round(time.time() - started),
    }
|
|
|
|
|
|
def _human_size(n: int) -> str:
|
|
for unit in ("B", "KB", "MB", "GB"):
|
|
if n < 1024:
|
|
return f"{n:.1f} {unit}"
|
|
n /= 1024
|
|
return f"{n:.1f} TB"
|
|
|
|
def r2_audit_backup(key: str) -> dict:
    """Audit a backup stored in R2: existence, checksum companion, upload
    metadata, size sanity, and filename format.

    Args:
        key: Full object key, e.g. ``backups/myapps-backup-....tar.gz``.

    Returns:
        Report dict with ``status`` ("healthy" / "warning" / "error") plus
        ``checks``, ``warnings`` and ``errors`` message lists; never raises.
    """
    import re  # hoisted from mid-function; only used here in this module

    cfg = _get_r2_config()
    bucket = cfg["bucket_name"]
    report = {
        "backup": key,
        "status": "unknown",
        "checks": [],
        "warnings": [],
        "errors": [],
    }

    try:
        client = _get_r2_client()

        # Check 1: Object exists
        try:
            head = client.head_object(Bucket=bucket, Key=key)
            size = head["ContentLength"]
            last_mod = head["LastModified"].strftime("%Y-%m-%d %H:%M UTC")
            report["checks"].append(f"✅ Object exists in R2 ({_human_size(size)}, modified {last_mod})")
            report["size"] = size
            report["size_human"] = _human_size(size)
            report["last_modified"] = last_mod
        except ClientError as e:
            # Without the object, none of the other checks are meaningful.
            report["errors"].append(f"❌ Object not found in R2: {e}")
            report["status"] = "error"
            return report

        # Check 2: SHA256 companion file
        sha_key = key + ".sha256"
        try:
            sha_obj = client.get_object(Bucket=bucket, Key=sha_key)
            sha_content = sha_obj["Body"].read().decode("utf-8").strip()
            # `sha256sum`-style content: "<hash>  <filename>"; take the hash.
            stored_hash = sha_content.split()[0] if sha_content else ""
            report["checks"].append("✅ SHA256 checksum file found")
            report["stored_hash"] = (stored_hash[:16] + "…") if stored_hash else "N/A"
        except ClientError:
            report["warnings"].append("⚠️ No SHA256 checksum file found — integrity cannot be verified")
            report["stored_hash"] = None

        # Check 3: Upload metadata (written by r2_upload_async)
        metadata = head.get("Metadata", {})
        uploaded_by = metadata.get("uploaded-by", "")
        uploaded_at = metadata.get("uploaded-at", "")
        if uploaded_by:
            report["checks"].append(f"✅ Uploaded by: {uploaded_by}")
        if uploaded_at:
            report["checks"].append(f"✅ Upload timestamp: {uploaded_at[:19]}")
        if not uploaded_by and not uploaded_at:
            report["warnings"].append("⚠️ No upload metadata found (may have been uploaded outside the platform)")

        # Check 4: File size sanity (archives below 10 MB look truncated)
        if size < 10 * 1024 * 1024:
            report["warnings"].append(f"⚠️ File is very small ({_human_size(size)}) — may be incomplete")
        else:
            report["checks"].append(f"✅ File size looks healthy ({_human_size(size)})")

        # Check 5: Filename format.
        # FIX: use fullmatch (re.match only anchors the start, so trailing
        # garbage like "....tar.gz.bak" previously passed validation).
        filename = key.replace("backups/", "")
        if re.fullmatch(r"myapps-backup-\d{8}_\d{6}\.tar\.gz", filename):
            report["checks"].append("✅ Filename format is valid")
        else:
            # FIX: the f-string had no placeholder and always printed the
            # literal "(unknown)"; include the offending filename.
            report["warnings"].append(f"⚠️ Unexpected filename format: {filename}")

        # Final status: errors dominate, then warnings, else healthy.
        if report["errors"]:
            report["status"] = "error"
        elif report["warnings"]:
            report["status"] = "warning"
        else:
            report["status"] = "healthy"

    except Exception as e:
        report["errors"].append(f"❌ Audit failed: {e}")
        report["status"] = "error"

    return report
|