"""Manage local system users, their rootless Docker daemons and virtual home disks.

Every helper shells out to system tools (useradd, loginctl, docker, mount, ...),
so the functions that change state are expected to run with root privileges.
"""

import os
import subprocess
import pwd
import re
import tempfile
import stat

# Accepted login names: a lowercase letter followed by 1-30 of [a-z0-9_-].
# Used with fullmatch() so that a trailing newline cannot slip through.
_USERNAME_RE = re.compile(r'[a-z][a-z0-9_-]{1,30}')

# Sysctl entry applied (and persisted) before installing rootless Docker.
_USERNS_SYSCTL = 'kernel.apparmor_restrict_unprivileged_userns=0'

_FSTAB_PATH = '/etc/fstab'
_SYSCTL_CONF_PATH = '/etc/sysctl.conf'


def _run(cmd, timeout=20):
    """Run a shell command string and return ``(stdout, stderr)``, both stripped.

    The command goes through the shell, so it must only ever be built from
    trusted, fixed text. Never raises: a timeout yields ``('', 'timeout')`` and
    any other failure yields ``('', str(exception))``.
    """
    try:
        r = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=timeout)
        return r.stdout.strip(), r.stderr.strip()
    except subprocess.TimeoutExpired:
        return '', 'timeout'
    except Exception as e:
        return '', str(e)


def _exec(argv, timeout=20, input_text=None, env=None):
    """Run a command given as an argument list, without a shell.

    Returns ``(returncode, stdout, stderr)`` with both streams stripped.
    Never raises: a timeout yields ``(-1, '', 'timeout')`` and a failure to
    start the process yields ``(-1, '', str(exception))``.
    """
    try:
        r = subprocess.run(
            argv,
            capture_output=True,
            text=True,
            timeout=timeout,
            input=input_text,
            env=env,
        )
        return r.returncode, r.stdout.strip(), r.stderr.strip()
    except subprocess.TimeoutExpired:
        return -1, '', 'timeout'
    except (OSError, ValueError, subprocess.SubprocessError) as e:
        return -1, '', str(e)


def _failure_text(returncode, err):
    """Return stderr text if there is any, otherwise a generic exit-code note."""
    return err or f"exit code {returncode}"


def _docker_sock(uid):
    """Return the rootless Docker socket path used for the given uid."""
    return f"/run/user/{uid}/docker.sock"


def _docker_env(sock):
    """Return a copy of the current environment with DOCKER_HOST set to *sock*."""
    env = dict(os.environ)
    env['DOCKER_HOST'] = f"unix://{sock}"
    return env


def _du_human(path):
    """Return the size column of ``du -sh path``, or '' when du gives no output."""
    _, out, _ = _exec(['du', '-sh', path])
    if not out:
        return ''
    return out.splitlines()[-1].split('\t')[0].strip()


def _fstab_source(line):
    """Return the first field of an fstab line, or None for blank/comment lines."""
    stripped = line.strip()
    if not stripped or stripped.startswith('#'):
        return None
    return stripped.split()[0]


def get_all_users():
    """Return list of non-system users (uid >= 1000) with info.

    Each entry is a dict with the keys ``name``, ``uid``, ``home``,
    ``has_docker``, ``docker_socket``, ``disk_used``, ``linger``,
    ``container_count``, ``has_vdisk``, ``vdisk_mount`` and ``vdisk_size``.
    Any unexpected error is printed and the users collected so far are returned.
    """
    users = []
    try:
        for pw in pwd.getpwall():
            if pw.pw_uid < 1000 or pw.pw_name == 'nobody':
                continue

            uid = pw.pw_uid
            name = pw.pw_name
            home = pw.pw_dir
            sock = _docker_sock(uid)
            has_docker = os.path.exists(sock)

            disk_used = _du_human(home) or 'N/A'

            _, linger_out, _ = _exec(['loginctl', 'show-user', name, '--property=Linger'])
            linger = 'yes' in linger_out.lower()

            container_count = 0
            if has_docker:
                # One container id per output line; an error leaves the count at 0.
                _, ids_out, _ = _exec(['docker', 'ps', '-aq'], env=_docker_env(sock))
                container_count = sum(1 for row in ids_out.splitlines() if row.strip())

            # Check if user has a dedicated virtual disk image
            disk_img = f"/home/{name}.img"
            has_vdisk = os.path.exists(disk_img)
            vdisk_mount = None
            vdisk_size = None
            if has_vdisk:
                # Ask findmnt where (if anywhere) the image is mounted
                _, mnt_out, _ = _exec(
                    ['findmnt', '-S', disk_img, '-o', 'TARGET', '--noheadings']
                )
                vdisk_mount = mnt_out or None
                vdisk_size = _du_human(disk_img) or None

            users.append({
                'name': name,
                'uid': uid,
                'home': home,
                'has_docker': has_docker,
                'docker_socket': sock if has_docker else None,
                'disk_used': disk_used,
                'linger': linger,
                'container_count': container_count,
                'has_vdisk': has_vdisk,
                'vdisk_mount': vdisk_mount,
                'vdisk_size': vdisk_size,
            })
    except Exception as e:
        print(f"[users] Error listing users: {e}")
    return users


def get_user_containers(username):
    """List all containers (running or not) in a user's rootless docker.

    Returns a list of dicts with ``name``, ``status``, ``image``, ``ports`` and
    ``owner``. Returns ``[]`` when the user does not exist or has no docker
    socket under ``/run/user/<uid>``.
    """
    try:
        pw = pwd.getpwnam(username)
    except KeyError:
        return []

    sock = _docker_sock(pw.pw_uid)
    if not os.path.exists(sock):
        return []

    _, out, _ = _exec(
        ['docker', 'ps', '-a', '--format',
         '{{.Names}}|{{.Status}}|{{.Image}}|{{.Ports}}'],
        env=_docker_env(sock),
    )

    containers = []
    for line in out.split('\n') if out else ():
        if '|' not in line:
            continue
        # Pad so that a short row still yields all four fields.
        parts = line.split('|') + ['', '', '', '']
        containers.append({
            'name': parts[0],
            'status': parts[1],
            'image': parts[2],
            'ports': parts[3],
            'owner': username,
        })
    return containers


def get_all_users_containers():
    """Get containers from ALL users' rootless docker instances."""
    all_containers = []
    for user in get_all_users():
        if user['has_docker']:
            all_containers.extend(get_user_containers(user['name']))
    return all_containers


def create_user(username, password=None, setup_docker=True, disk_quota_mb=None):
    """
    Create a new system user and optionally set up rootless docker + virtual disk.

    username: must be a lowercase letter followed by 1-30 of ``[a-z0-9_-]``.
    password: if given, set through ``chpasswd``; a failure is only logged.
    setup_docker: if true, install rootless docker for the new user.
    disk_quota_mb: if set, creates a loop-device virtual disk of that size (MB)
                   and mounts it as the user's home directory.

    Returns (success: bool, log_text: str). Failures of the optional steps
    (password, virtual disk, docker) are reported in the log text but still
    return success once the account itself exists.
    """
    logs = []

    # Validate. fullmatch() is required: with match() and '$' a name such as
    # "ab\n" would be accepted because '$' also matches before a final newline.
    if not isinstance(username, str) or not _USERNAME_RE.fullmatch(username):
        return False, "Invalid username. Use lowercase letters, numbers, _ or -"

    # Check existence
    try:
        pwd.getpwnam(username)
        return False, f"User '{username}' already exists"
    except KeyError:
        pass

    # Create user
    rc, _, err = _exec(['useradd', '-m', '-s', '/bin/bash', username])
    if rc != 0 and 'already exists' not in err:
        return False, f"useradd failed: {_failure_text(rc, err)}"
    logs.append(f"✅ User {username} created")

    # Set password. It is fed to chpasswd on stdin so that quotes or other shell
    # metacharacters in the password are never interpreted by a shell.
    if password:
        if '\n' in password or '\r' in password:
            # A line break would let the value smuggle in a second user:password row.
            logs.append("⚠️ Password set failed: password must not contain line breaks")
        else:
            rc, _, err = _exec(['chpasswd'], input_text=f"{username}:{password}\n")
            if rc != 0:
                logs.append(f"⚠️ Password set failed: {_failure_text(rc, err)}")
            else:
                logs.append("✅ Password set")

    # Install prerequisites (best effort, result not checked)
    _run("apt-get install -y uidmap dbus-user-session curl 2>/dev/null", timeout=60)
    logs.append("✅ Prerequisites ready")

    # Enable linger (best effort, result not checked)
    _exec(['loginctl', 'enable-linger', username])
    logs.append("✅ Linger enabled")

    # Virtual disk (loop device) instead of quota
    if disk_quota_mb:
        success, msg = _setup_virtual_disk(username, disk_quota_mb, logs)
        if not success:
            logs.append(f"⚠️ Virtual disk setup failed: {msg}")

    # Setup rootless docker
    if setup_docker:
        success, msg = _setup_rootless_docker_via_script(username, logs)
        if not success:
            logs.append(f"⚠️ Docker setup incomplete: {msg}")

    return True, '\n'.join(logs)


def _discard_image(img_path):
    """Best-effort removal of a half-built image so a failed setup leaves no file behind."""
    try:
        os.remove(img_path)
    except OSError:
        pass


def _setup_virtual_disk(username, disk_mb, logs):
    """
    Create a loop-device virtual disk for a user and mount it as their home.

    Steps:
      1. Create a blank image file at /home/<username>.img
      2. Format it as ext4
      3. Back up the existing home contents
      4. Mount the image over the user's home
      5. Restore the home contents into the new disk
      6. Fix ownership
      7. Add to /etc/fstab for persistence across reboots

    Progress lines are appended to *logs*.
    Returns (success: bool, message: str)
    """
    try:
        pw = pwd.getpwnam(username)
    except KeyError as e:
        return False, str(e)

    try:
        size_mb = int(disk_mb)
    except (TypeError, ValueError):
        return False, f"Invalid disk size: {disk_mb!r}"
    if size_mb <= 0:
        return False, f"Invalid disk size: {disk_mb!r}"

    home = pw.pw_dir  # e.g. /home/secuser4
    img_path = f"/home/{username}.img"

    logs.append(f"📦 Creating {size_mb} MB virtual disk at {img_path} ...")

    # ── Step 1: Create the blank image ──────────────────────────────────────
    # Try fallocate first, fall back to dd when it fails for any reason.
    rc, _, err = _exec(['fallocate', '-l', f"{size_mb}M", img_path], timeout=60)
    if rc != 0:
        logs.append(" ↳ fallocate not available, using dd (this may take a moment)...")
        rc, _, err = _exec(
            ['dd', 'if=/dev/zero', f"of={img_path}", 'bs=1M',
             f"count={size_mb}", 'status=none'],
            timeout=600,
        )
        if rc != 0:
            _discard_image(img_path)
            return False, f"Failed to create image: {_failure_text(rc, err)}"
    logs.append(f" ✅ Image file created ({size_mb} MB)")

    # ── Step 2: Format as ext4 ───────────────────────────────────────────────
    # Success is judged by exit status only; text on stderr alone is not
    # treated as a failure.
    rc, _, err = _exec(['mkfs.ext4', '-F', img_path], timeout=60)
    if rc != 0:
        _discard_image(img_path)
        return False, f"mkfs.ext4 failed: {_failure_text(rc, err)}"
    logs.append(" ✅ Formatted as ext4")

    # ── Step 3: Back up current home contents ────────────────────────────────
    # mkdtemp gives a private, unpredictable directory instead of a fixed
    # /tmp/<user>_home_backup path.
    try:
        tmp_backup = tempfile.mkdtemp(prefix=f"{username}_home_backup_")
    except OSError as e:
        _discard_image(img_path)
        return False, f"Could not create backup directory: {e}"

    try:
        _exec(['cp', '-a', f"{home}/.", f"{tmp_backup}/"])

        # ── Step 4: Mount the image over the user's home ─────────────────────
        rc, _, err = _exec(['mount', '-o', 'loop', img_path, home])
        if rc != 0:
            _discard_image(img_path)
            return False, f"mount failed: {_failure_text(rc, err)}"
        logs.append(f" ✅ Mounted at {home}")

        # ── Step 5: Restore home contents into the new disk ──────────────────
        _exec(['cp', '-a', f"{tmp_backup}/.", f"{home}/"])
    finally:
        # Remove the backup on every path, including a failed mount.
        _exec(['rm', '-rf', tmp_backup])

    # ── Step 6: Fix ownership ────────────────────────────────────────────────
    # Numeric uid:gid from the passwd entry, so this does not depend on a
    # group being named after the user.
    _exec(['chown', '-R', f"{pw.pw_uid}:{pw.pw_gid}", home])
    logs.append(" ✅ Ownership set")

    # ── Step 7: Add to /etc/fstab for persistence ────────────────────────────
    fstab_line = f"{img_path} {home} ext4 loop,defaults 0 0\n"
    try:
        with open(_FSTAB_PATH, 'r') as f:
            fstab = f.read()
        # Compare the source field exactly; a substring test would confuse
        # /home/bob.img with /home/bob.img2.
        if any(_fstab_source(row) == img_path for row in fstab.splitlines()):
            logs.append(" ℹ️ Already in /etc/fstab")
        else:
            with open(_FSTAB_PATH, 'a') as f:
                if fstab and not fstab.endswith('\n'):
                    # Keep the new entry from being glued onto the last line.
                    f.write('\n')
                f.write(fstab_line)
            logs.append(" ✅ Added to /etc/fstab (persistent across reboots)")
    except (OSError, ValueError) as e:
        logs.append(f" ⚠️ Could not update /etc/fstab: {e}")

    logs.append(f"✅ Virtual disk ready: {size_mb} MB dedicated to {username}")
    return True, "ok"


def _remove_virtual_disk(username, logs):
    """Unmount and remove the virtual disk image for a user.

    Does nothing for unknown users. The home is only unmounted when the image
    file /home/<username>.img exists, and /etc/fstab is only rewritten when it
    actually holds an entry for that image. Progress is appended to *logs*.
    """
    try:
        home = pwd.getpwnam(username).pw_dir
    except KeyError:
        return

    img_path = f"/home/{username}.img"

    if os.path.exists(img_path):
        # Unmount
        rc, _, err = _exec(['umount', home])
        if rc == 0:
            logs.append(f" ↳ Unmounted {home}")
        elif 'not mounted' not in err:
            logs.append(f" ⚠️ Could not unmount {home}: {_failure_text(rc, err)}")

        # Remove image
        try:
            os.remove(img_path)
            logs.append(f" ↳ Removed {img_path}")
        except OSError as e:
            logs.append(f" ⚠️ Could not remove {img_path}: {e}")

    # Remove from fstab
    try:
        with open(_FSTAB_PATH, 'r') as f:
            lines = f.readlines()
        kept = [row for row in lines if _fstab_source(row) != img_path]
        if len(kept) != len(lines):
            with open(_FSTAB_PATH, 'w') as f:
                f.writelines(kept)
            logs.append(" ↳ Removed from /etc/fstab")
    except (OSError, ValueError) as e:
        logs.append(f" ⚠️ fstab cleanup error: {e}")


def _persist_userns_sysctl(logs):
    """Append the userns sysctl entry to /etc/sysctl.conf unless already present."""
    try:
        # 'a+' lets us read the current content and still append at the end.
        with open(_SYSCTL_CONF_PATH, 'a+') as f:
            f.seek(0)
            content = f.read()
            if any(row.strip() == _USERNS_SYSCTL for row in content.splitlines()):
                return
            if content and not content.endswith('\n'):
                f.write('\n')
            f.write(_USERNS_SYSCTL + '\n')
    except (OSError, ValueError) as e:
        logs.append(f"⚠️ Could not update {_SYSCTL_CONF_PATH}: {e}")


def _setup_rootless_docker_via_script(username, logs):
    """
    Setup rootless Docker for a user by running the official installer.
    This must be done AS the user in a proper login shell (``su - <user>``).

    Output of the installer is appended to *logs*.
    Returns (success: bool, message: str). A finished run whose socket has not
    appeared yet is reported as ``(True, "socket_pending")``.
    """
    try:
        pw = pwd.getpwnam(username)
    except KeyError as e:
        return False, str(e)
    uid = pw.pw_uid

    # First, ensure the sysctl setting is applied (critical!)
    _run(f"sysctl -w {_USERNS_SYSCTL}")
    _persist_userns_sysctl(logs)

    # Ensure XDG_RUNTIME_DIR exists with correct permissions
    runtime_dir = f"/run/user/{uid}"
    try:
        os.makedirs(runtime_dir, exist_ok=True)
    except OSError as e:
        logs.append(f"⚠️ Setup failed: {e}")
        return False, str(e)
    try:
        os.chown(runtime_dir, uid, pw.pw_gid)
        os.chmod(runtime_dir, stat.S_IRWXU)  # 0700
    except OSError as e:
        logs.append(f"⚠️ Could not set permissions on {runtime_dir}: {e}")

    logs.append(f"📝 Installing rootless Docker for {username}...")

    # Installation script that runs as the user. The heredoc terminator must
    # start in column 0, so the script text is deliberately not indented.
    install_cmd = f"""bash -c '
export XDG_RUNTIME_DIR=/run/user/{uid}
export PATH=$HOME/bin:$PATH
mkdir -p $XDG_RUNTIME_DIR
chmod 700 $XDG_RUNTIME_DIR

# Install rootless Docker
curl -fsSL https://get.docker.com/rootless | sh

# Add environment variables to .bashrc
echo "export PATH=$HOME/bin:\\$PATH" >> ~/.bashrc
echo "export DOCKER_HOST=unix:///run/user/{uid}/docker.sock" >> ~/.bashrc
echo "export XDG_RUNTIME_DIR=/run/user/{uid}" >> ~/.bashrc

# Create systemd service
mkdir -p ~/.config/systemd/user
cat > ~/.config/systemd/user/docker.service << EOF
[Unit]
Description=Docker Rootless Daemon
After=network.target

[Service]
Type=simple
ExecStart=$HOME/bin/dockerd-rootless.sh
Restart=always
RestartSec=10
Environment=PATH=$HOME/bin:/usr/local/bin:/usr/bin:/bin
Environment=XDG_RUNTIME_DIR=/run/user/{uid}

[Install]
WantedBy=default.target
EOF

# Enable and start the service
systemctl --user daemon-reload
systemctl --user enable docker.service
systemctl --user start docker.service

# Wait for socket
sleep 5
'"""

    try:
        # Run the installation as the user
        result = subprocess.run(
            ['su', '-', username, '-c', install_cmd],
            capture_output=True,
            text=True,
            timeout=300,
        )

        # Log output
        for line in result.stdout.split('\n'):
            if line.strip():
                logs.append(line.strip())

        if result.stderr:
            # Only the tail of stderr is kept to avoid flooding the log.
            for line in result.stderr.split('\n')[-5:]:
                if line.strip():
                    logs.append(f" stderr: {line.strip()}")

        # Check if socket exists
        sock = _docker_sock(uid)
        if os.path.exists(sock):
            logs.append(f"✅ Rootless Docker ready for {username}")
            return True, "ok"
        logs.append(f"⚠️ Socket not found at {sock}")
        return True, "socket_pending"

    except subprocess.TimeoutExpired:
        logs.append("⚠️ Setup timed out after 300s")
        return False, "timeout"
    except Exception as e:
        logs.append(f"⚠️ Setup failed: {e}")
        return False, str(e)


def delete_user(username, remove_home=False):
    """Remove a user. Returns (success, message).

    Before calling ``userdel`` this stops the user's docker service, disables
    linger and removes the user's virtual disk (if any). With *remove_home*
    set, ``userdel -r`` is used.
    """
    logs = []
    try:
        pw = pwd.getpwnam(username)
    except KeyError:
        return False, f"User '{username}' does not exist"

    # Stop their docker service first (best effort). XDG_RUNTIME_DIR is
    # exported inside the login shell, mirroring the install script.
    # docker.service is the unit installed by this module; the older
    # docker-rootless.service name is stopped as well for compatibility.
    stop_cmd = (
        f"export XDG_RUNTIME_DIR=/run/user/{pw.pw_uid}; "
        "systemctl --user stop docker.service; "
        "systemctl --user stop docker-rootless.service"
    )
    _exec(['su', '--login', username, '--command', stop_cmd])

    _exec(['loginctl', 'disable-linger', username])

    # Clean up virtual disk BEFORE userdel (userdel might complain if home is busy)
    _remove_virtual_disk(username, logs)

    argv = ['userdel']
    if remove_home:
        argv.append('-r')
    argv.append(username)
    rc, _, err = _exec(argv)
    if rc != 0 and 'does not exist' not in err and 'mail spool' not in err:
        return False, f"userdel error: {_failure_text(rc, err)}"

    msg = f"✅ User {username} deleted" + (" (home removed)" if remove_home else "")
    logs.append(msg)
    return True, '\n'.join(logs)


def get_user_disk_usage(username):
    """Return disk usage information for a user's home directory.

    Returns ``{}`` for unknown users, otherwise a dict with ``home``, ``total``
    (``du -sh`` size or ``'N/A'``) and ``vdisk``. ``vdisk`` is filled from
    ``df -h`` on the home directory (``size``, ``used``, ``available``,
    ``use_pct``) only when /home/<username>.img exists; otherwise it is ``{}``.
    """
    try:
        pw = pwd.getpwnam(username)
    except KeyError:
        return {}

    total_out = _du_human(pw.pw_dir)

    # Also check if there's a virtual disk and its capacity
    img_path = f"/home/{username}.img"
    vdisk_info = {}
    if os.path.exists(img_path):
        # Get mounted filesystem usage from the last line of df's output
        _, df_out, _ = _exec(['df', '-h', pw.pw_dir])
        if df_out:
            parts = df_out.splitlines()[-1].split()
            if len(parts) >= 4:
                vdisk_info = {
                    'size': parts[1],
                    'used': parts[2],
                    'available': parts[3],
                    'use_pct': parts[4] if len(parts) > 4 else '?',
                }

    return {
        'home': pw.pw_dir,
        'total': total_out or 'N/A',
        'vdisk': vdisk_info,
    }