Sync from main server - 2026-05-13 01:06:32
This commit is contained in:
336
platform/modules/cloud_backup.py
Normal file
336
platform/modules/cloud_backup.py
Normal file
@@ -0,0 +1,336 @@
|
||||
"""
|
||||
modules/cloud_backup.py
|
||||
Cloudflare R2 cloud backup integration via boto3 (S3-compatible API)
|
||||
"""
|
||||
|
||||
import os
|
||||
import time
|
||||
import threading
|
||||
from datetime import datetime, timezone
|
||||
|
||||
try:
|
||||
import boto3
|
||||
from botocore.exceptions import ClientError, NoCredentialsError
|
||||
BOTO3_AVAILABLE = True
|
||||
except ImportError:
|
||||
BOTO3_AVAILABLE = False
|
||||
|
||||
# Upload progress tracking (in-memory)
# Maps job_id -> {"status", "log", "progress", "uploaded", "started", ...}.
# Written by r2_upload_async / _ProgressCallback, read by get_upload_job.
# Not persisted: jobs vanish on process restart.
_upload_jobs: dict = {}
|
||||
|
||||
|
||||
def _get_r2_config() -> dict:
|
||||
"""Read R2 config fresh from env vars every time (not cached at import)."""
|
||||
return {
|
||||
"account_id": os.environ.get("R2_ACCOUNT_ID", "35e00c230cc8066252a2d9890b69aea2"),
|
||||
"access_key_id": os.environ.get("R2_ACCESS_KEY_ID", ""),
|
||||
"secret_access_key": os.environ.get("R2_SECRET_ACCESS_KEY", ""),
|
||||
"bucket_name": os.environ.get("R2_BUCKET_NAME", "navitrends-backups"),
|
||||
}
|
||||
|
||||
|
||||
# Module-level name used in app.py template rendering
# NOTE(review): this is captured once at import time, unlike _get_r2_config()
# which re-reads the env per call — a runtime change to R2_BUCKET_NAME will
# not be reflected here. Confirm this is intentional for template display.
R2_BUCKET_NAME = os.environ.get("R2_BUCKET_NAME", "navitrends-backups")
|
||||
|
||||
|
||||
def _get_r2_client():
    """Build a boto3 S3 client configured for Cloudflare R2.

    Raises:
        RuntimeError: if boto3 is not installed, or if either of the two
            credential environment variables is missing.
    """
    if not BOTO3_AVAILABLE:
        raise RuntimeError("boto3 is not installed. Run: pip install boto3")

    cfg = _get_r2_config()
    key = cfg["access_key_id"]
    secret = cfg["secret_access_key"]
    if not (key and secret):
        raise RuntimeError(
            "R2 credentials not configured. "
            "Set R2_ACCESS_KEY_ID and R2_SECRET_ACCESS_KEY environment variables."
        )

    endpoint = f"https://{cfg['account_id']}.r2.cloudflarestorage.com"
    return boto3.client(
        "s3",
        endpoint_url=endpoint,
        aws_access_key_id=key,
        aws_secret_access_key=secret,
        region_name="auto",  # R2 is region-less; boto3 still requires a value
    )
|
||||
|
||||
|
||||
def r2_is_configured() -> bool:
    """True only when boto3 is importable and both credential vars are set."""
    if not BOTO3_AVAILABLE:
        return False
    cfg = _get_r2_config()
    return bool(cfg["access_key_id"] and cfg["secret_access_key"])
|
||||
|
||||
|
||||
def r2_test_connection() -> dict:
    """Probe R2 connectivity and report whether the configured bucket exists.

    Returns a dict with "success", "message" and "bucket_exists"; on a
    successful connection it also includes the full "buckets" name list.
    Never raises — all failures are folded into the returned dict.
    """
    def _fail(message: str) -> dict:
        return {"success": False, "message": message, "bucket_exists": False}

    if not BOTO3_AVAILABLE:
        return _fail("boto3 not installed (pip install boto3)")

    cfg = _get_r2_config()
    if not (cfg["access_key_id"] and cfg["secret_access_key"]):
        return _fail("R2 credentials not set")

    try:
        client = _get_r2_client()
        names = [b["Name"] for b in client.list_buckets().get("Buckets", [])]
        target = cfg["bucket_name"]
        found = target in names
        state = "exists" if found else "not created yet"
        return {
            "success": True,
            "message": f"Connected to R2. Bucket '{target}': {state}",
            "bucket_exists": found,
            "buckets": names,
        }
    except NoCredentialsError:
        return _fail("Invalid R2 credentials")
    except Exception as e:
        return _fail(str(e))
|
||||
|
||||
|
||||
def r2_ensure_bucket() -> tuple[bool, str]:
    """Create the configured R2 bucket if it does not already exist.

    Returns:
        (True, detail message) on success; (False, error text) on failure.
    """
    bucket = _get_r2_config()["bucket_name"]
    try:
        client = _get_r2_client()
        try:
            client.head_bucket(Bucket=bucket)
        except ClientError as e:
            code = e.response["Error"]["Code"]
            if code not in ("404", "NoSuchBucket"):
                raise  # unexpected failure (auth, network, ...) — surface it
            client.create_bucket(Bucket=bucket)
            return True, f"Bucket '{bucket}' created successfully"
        return True, f"Bucket '{bucket}' already exists"
    except Exception as e:
        return False, f"Failed to ensure bucket: {e}"
|
||||
|
||||
|
||||
def r2_list_backups() -> list[dict]:
    """List backup archives stored under the "backups/" prefix in R2.

    Checksum companions (*.sha256) are excluded — they are audit files,
    not backups. Entries are returned newest-first. Best-effort: any
    failure (unconfigured, network, permissions) yields an empty list.
    """
    if not r2_is_configured():
        return []

    bucket = _get_r2_config()["bucket_name"]
    try:
        client = _get_r2_client()
        listing = client.list_objects_v2(Bucket=bucket, Prefix="backups/")
        entries: list[dict] = []
        for item in listing.get("Contents", []):
            key = item["Key"]
            if key.endswith(".sha256"):
                continue  # checksum companion, not an archive
            nbytes = item["Size"]
            modified = item["LastModified"]
            entries.append({
                "key": key,
                "name": key.replace("backups/", ""),
                "size": nbytes,
                "size_human": _human_size(nbytes),
                "last_modified": modified.isoformat(),
                "last_modified_str": modified.strftime("%Y-%m-%d %H:%M UTC"),
            })
        entries.sort(key=lambda e: e["last_modified"], reverse=True)
        return entries
    except Exception:
        return []
|
||||
|
||||
|
||||
def r2_delete_backup(key: str) -> tuple[bool, str]:
    """Delete a single object from the R2 bucket by full key.

    Returns:
        (True, confirmation) on success; (False, error text) otherwise.
    """
    bucket = _get_r2_config()["bucket_name"]
    try:
        _get_r2_client().delete_object(Bucket=bucket, Key=key)
    except Exception as e:
        return False, str(e)
    return True, f"Deleted {key} from R2"
|
||||
|
||||
|
||||
def r2_get_bucket_stats() -> dict:
    """Summarize the count and total size of backup archives in R2.

    Only real archives are counted — .sha256 checksum companions are
    excluded. Best-effort: returns zeroed stats when unconfigured or on
    any listing error (with "configured" reflecting credential state).
    """
    zeroed = {"count": 0, "total_size": 0, "total_size_human": "0 B"}
    if not r2_is_configured():
        return {**zeroed, "configured": False}

    bucket = _get_r2_config()["bucket_name"]
    try:
        listing = _get_r2_client().list_objects_v2(Bucket=bucket, Prefix="backups/")
        archives = [
            o for o in listing.get("Contents", [])
            if not o["Key"].endswith(".sha256")
        ]
        total = sum(o["Size"] for o in archives)
        return {
            "count": len(archives),
            "total_size": total,
            "total_size_human": _human_size(total),
            "configured": True,
        }
    except Exception:
        return {**zeroed, "configured": True}
|
||||
|
||||
|
||||
class _ProgressCallback:
    """Thread-safe boto3 transfer callback that mirrors progress into _upload_jobs.

    boto3's transfer manager may invoke the callback concurrently from
    multiple worker threads, hence the lock around the shared counters.

    BUGFIX: the original appended a log line on EVERY chunk callback, so the
    in-memory job log grew unboundedly with file size (thousands of near-
    duplicate lines for a large archive). We now log only when the integer
    percentage actually advances, while still updating "uploaded"/"progress"
    on every callback.
    """

    def __init__(self, job_id: str, file_size: int):
        self._job_id = job_id
        self._size = file_size   # total bytes expected; 0 is tolerated
        self._uploaded = 0       # bytes transferred so far
        self._last_pct = -1      # last percentage written to the log
        self._lock = threading.Lock()

    def __call__(self, bytes_amount: int):
        """Accumulate *bytes_amount* and publish progress to the job record."""
        with self._lock:
            self._uploaded += bytes_amount
            pct = int(self._uploaded * 100 / self._size) if self._size else 0
            job = _upload_jobs.get(self._job_id)
            if job is None:
                return  # job record was removed; nothing to publish
            job["uploaded"] = self._uploaded
            job["progress"] = pct
            # Log at most once per percentage point to keep the log bounded.
            if pct != self._last_pct:
                self._last_pct = pct
                job["log"].append(
                    f"  {_human_size(self._uploaded)} / {_human_size(self._size)} ({pct}%)"
                )
|
||||
|
||||
|
||||
def r2_upload_async(local_path: str, job_id: str) -> None:
    """Upload *local_path* (plus its optional .sha256 companion) to R2.

    Intended to run in a background thread: progress, log lines and the
    final status are published into the module-level _upload_jobs dict
    under *job_id* rather than returned. All exceptions are captured into
    the job record ("status" = "error") instead of propagating.
    """
    job = {
        "status": "running",
        "log": [f"Starting upload: {os.path.basename(local_path)}"],
        "progress": 0,
        "uploaded": 0,
        "started": time.time(),
    }
    _upload_jobs[job_id] = job

    try:
        if not r2_is_configured():
            raise RuntimeError("R2 credentials not configured")

        bucket = _get_r2_config()["bucket_name"]

        ok, msg = r2_ensure_bucket()
        if not ok:
            raise RuntimeError(msg)
        job["log"].append(f"OK: {msg}")

        filename = os.path.basename(local_path)
        file_size = os.path.getsize(local_path)
        object_key = f"backups/{filename}"
        client = _get_r2_client()

        job["log"].append(
            f"Uploading {_human_size(file_size)} to r2://{bucket}/{object_key}"
        )

        # Tag the object so r2_audit_backup can verify platform provenance.
        client.upload_file(
            local_path,
            bucket,
            object_key,
            Callback=_ProgressCallback(job_id, file_size),
            ExtraArgs={"Metadata": {
                "uploaded-by": "navitrends-platform",
                "uploaded-at": datetime.now(timezone.utc).isoformat(),
                "original-file": filename,
            }},
        )

        # Ship the checksum companion too, if the backup step produced one.
        sha_path = local_path + ".sha256"
        if os.path.exists(sha_path):
            client.upload_file(sha_path, bucket, object_key + ".sha256")
            job["log"].append("SHA256 checksum uploaded")

        elapsed = round(time.time() - job["started"], 1)
        job["log"].append(
            f"Upload complete in {elapsed}s — r2://{bucket}/{object_key}"
        )
        job["status"] = "done"
        job["progress"] = 100
        job["object_key"] = object_key

    except Exception as e:
        job["log"].append(f"Upload failed: {e}")
        job["status"] = "error"
|
||||
|
||||
|
||||
def get_upload_job(job_id: str) -> dict | None:
    """Return a polling snapshot of an upload job, or None if unknown."""
    job = _upload_jobs.get(job_id)
    if not job:
        return None
    started = job.get("started", time.time())
    return {
        "status": job["status"],
        "log": job["log"],
        "progress": job["progress"],
        "elapsed": round(time.time() - started),
    }
|
||||
|
||||
|
||||
def _human_size(n: int) -> str:
|
||||
for unit in ("B", "KB", "MB", "GB"):
|
||||
if n < 1024:
|
||||
return f"{n:.1f} {unit}"
|
||||
n /= 1024
|
||||
return f"{n:.1f} TB"
|
||||
|
||||
def r2_audit_backup(key: str) -> dict:
    """Audit a backup object in R2: existence, checksum companion, upload
    metadata, size sanity and filename format.

    Args:
        key: Full object key, e.g. "backups/myapps-backup-20240101_120000.tar.gz".

    Returns:
        Report dict with "backup", "status" ("healthy" / "warning" / "error")
        and the lists "checks", "warnings", "errors"; size and stored-hash
        details are added when available. Never raises — unexpected failures
        are folded into the report.
    """
    import re

    cfg = _get_r2_config()
    bucket = cfg["bucket_name"]
    report = {
        "backup": key,
        "status": "unknown",
        "checks": [],
        "warnings": [],
        "errors": [],
    }

    try:
        client = _get_r2_client()

        # Check 1: Object exists (head_object also gives us size + metadata)
        try:
            head = client.head_object(Bucket=bucket, Key=key)
            size = head["ContentLength"]
            last_mod = head["LastModified"].strftime("%Y-%m-%d %H:%M UTC")
            report["checks"].append(f"✅ Object exists in R2 ({_human_size(size)}, modified {last_mod})")
            report["size"] = size
            report["size_human"] = _human_size(size)
            report["last_modified"] = last_mod
        except ClientError as e:
            # Missing object makes every other check meaningless — bail out.
            report["errors"].append(f"❌ Object not found in R2: {e}")
            report["status"] = "error"
            return report

        # Check 2: SHA256 companion file
        sha_key = key + ".sha256"
        try:
            sha_obj = client.get_object(Bucket=bucket, Key=sha_key)
            sha_content = sha_obj["Body"].read().decode("utf-8").strip()
            stored_hash = sha_content.split()[0] if sha_content else ""
            report["checks"].append("✅ SHA256 checksum file found")
            report["stored_hash"] = (stored_hash[:16] + "…") if stored_hash else "N/A"
        except ClientError:
            report["warnings"].append("⚠️ No SHA256 checksum file found — integrity cannot be verified")
            report["stored_hash"] = None

        # Check 3: Upload metadata (written by r2_upload_async)
        metadata = head.get("Metadata", {})
        uploaded_by = metadata.get("uploaded-by", "")
        uploaded_at = metadata.get("uploaded-at", "")
        if uploaded_by:
            report["checks"].append(f"✅ Uploaded by: {uploaded_by}")
        if uploaded_at:
            report["checks"].append(f"✅ Upload timestamp: {uploaded_at[:19]}")
        if not uploaded_by and not uploaded_at:
            report["warnings"].append("⚠️ No upload metadata found (may have been uploaded outside the platform)")

        # Check 4: File size sanity — an archive under 10 MB is suspicious
        if size < 10 * 1024 * 1024:
            report["warnings"].append(f"⚠️ File is very small ({_human_size(size)}) — may be incomplete")
        else:
            report["checks"].append(f"✅ File size looks healthy ({_human_size(size)})")

        # Check 5: Filename format.
        # BUGFIX: the warning used to print the literal "(unknown)" instead of
        # the actual filename. Also use fullmatch (not match) so trailing junk
        # such as ".tar.gz.partial" is not accepted as a valid name.
        filename = key.replace("backups/", "")
        if re.fullmatch(r"myapps-backup-\d{8}_\d{6}\.tar\.gz", filename):
            report["checks"].append("✅ Filename format is valid")
        else:
            report["warnings"].append(f"⚠️ Unexpected filename format: {filename}")

        # Final status: errors dominate, then warnings, else healthy.
        if report["errors"]:
            report["status"] = "error"
        elif report["warnings"]:
            report["status"] = "warning"
        else:
            report["status"] = "healthy"

    except Exception as e:
        report["errors"].append(f"❌ Audit failed: {e}")
        report["status"] = "error"

    return report
|
||||
Reference in New Issue
Block a user