mirror of
https://github.com/aljazceru/hfm.git
synced 2025-12-16 23:24:21 +01:00
900 lines
36 KiB
Python
Executable File
900 lines
36 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Hetzner Robot Firewall Manager
|
|
Complete solution for managing Hetzner server firewalls via Robot API
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import sys
|
|
import time
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
import requests
|
|
from dotenv import load_dotenv
|
|
from requests.auth import HTTPBasicAuth
|
|
|
|
# Load environment variables
|
|
load_dotenv()
|
|
|
|
|
|
class HetznerFirewallManager:
|
|
"""Manage Hetzner Robot Firewall configurations"""
|
|
|
|
def __init__(self, config_file: str = "firewall_config.json"):
|
|
self.config_file = Path(config_file)
|
|
self.base_url = "https://robot-ws.your-server.de"
|
|
|
|
# Get credentials from environment
|
|
self.username = os.getenv('HETZNER_USER')
|
|
self.password = os.getenv('HETZNER_PASS')
|
|
|
|
if not self.username or not self.password:
|
|
raise ValueError("Please set HETZNER_USER and HETZNER_PASS in .env file")
|
|
|
|
# Setup session
|
|
self.auth = HTTPBasicAuth(self.username, self.password)
|
|
self.session = requests.Session()
|
|
self.session.auth = self.auth
|
|
self.session.headers.update({
|
|
'Content-Type': 'application/json',
|
|
'Accept': 'application/json'
|
|
})
|
|
|
|
# Load or create config
|
|
self.config = self.load_config()
|
|
|
|
def load_config(self) -> Dict:
|
|
"""Load configuration from file"""
|
|
if self.config_file.exists():
|
|
with open(self.config_file, 'r') as f:
|
|
return json.load(f)
|
|
return {"profiles": {}}
|
|
|
|
def save_config(self):
|
|
"""Save configuration to file with backup"""
|
|
# Create backup if config file exists
|
|
if self.config_file.exists():
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
backup_file = self.config_file.parent / f"firewall_config_backup_{timestamp}.json"
|
|
|
|
# Keep only last 5 backups
|
|
backups = sorted(self.config_file.parent.glob("firewall_config_backup_*.json"))
|
|
if len(backups) >= 5:
|
|
for old_backup in backups[:-4]:
|
|
old_backup.unlink()
|
|
|
|
# Create new backup
|
|
with open(self.config_file, 'r') as src:
|
|
with open(backup_file, 'w') as dst:
|
|
dst.write(src.read())
|
|
|
|
# Save new config
|
|
with open(self.config_file, 'w') as f:
|
|
json.dump(self.config, f, indent=2)
|
|
|
|
def get_current_public_ip(self) -> Optional[str]:
|
|
"""Get the current public IP address"""
|
|
services = [
|
|
"https://ipinfo.io/ip",
|
|
"https://ifconfig.me",
|
|
"https://api.ipify.org",
|
|
"https://icanhazip.com"
|
|
]
|
|
|
|
for service in services:
|
|
try:
|
|
response = requests.get(service, timeout=5)
|
|
if response.status_code == 200:
|
|
ip = response.text.strip()
|
|
return ip
|
|
except Exception:
|
|
continue
|
|
|
|
return None
|
|
|
|
def get_all_servers(self) -> List[Dict]:
|
|
"""Get all servers from Hetzner API"""
|
|
try:
|
|
response = self.session.get(f"{self.base_url}/server")
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
|
|
# Extract servers from the response
|
|
servers = []
|
|
for item in data:
|
|
if isinstance(item, dict) and 'server' in item:
|
|
servers.append(item['server'])
|
|
|
|
return servers
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"Failed to get servers: {e}", file=sys.stderr)
|
|
return []
|
|
|
|
def get_firewall(self, server_ip: str) -> Optional[Dict]:
|
|
"""Get current firewall configuration for a server"""
|
|
try:
|
|
response = self.session.get(f"{self.base_url}/firewall/{server_ip}")
|
|
|
|
if response.status_code == 404:
|
|
return None
|
|
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"Failed to get firewall for {server_ip}: {e}", file=sys.stderr)
|
|
return None
|
|
|
|
def update_firewall(self, server_ip: str, rules: Dict[str, Any]) -> bool:
|
|
"""Update firewall configuration for a server with retry logic for FIREWALL_IN_PROCESS"""
|
|
# Build form data (URL-encoded format)
|
|
data = {
|
|
'filter_ipv6': str(rules.get('filter_ipv6', False)).lower(),
|
|
'whitelist_hos': str(rules.get('whitelist_hos', True)).lower(),
|
|
}
|
|
|
|
# Encode input rules
|
|
for i, rule in enumerate(rules.get('input', [])):
|
|
for key, value in rule.items():
|
|
if value is None:
|
|
continue
|
|
form_key = f'rules[input][{i}][{key}]'
|
|
if isinstance(value, bool):
|
|
data[form_key] = str(value).lower()
|
|
else:
|
|
data[form_key] = str(value)
|
|
|
|
# Encode output rules
|
|
for i, rule in enumerate(rules.get('output', [])):
|
|
for key, value in rule.items():
|
|
if value is None:
|
|
continue
|
|
form_key = f'rules[output][{i}][{key}]'
|
|
if isinstance(value, bool):
|
|
data[form_key] = str(value).lower()
|
|
else:
|
|
data[form_key] = str(value)
|
|
|
|
# Retry logic with exponential backoff
|
|
max_retries = 6 # Total wait time: ~63 seconds (1+2+4+8+16+32)
|
|
retry_delay = 1
|
|
|
|
for attempt in range(max_retries + 1):
|
|
try:
|
|
response = self.session.post(
|
|
f"{self.base_url}/firewall/{server_ip}",
|
|
data=data,
|
|
headers={'Content-Type': 'application/x-www-form-urlencoded'}
|
|
)
|
|
|
|
if response.status_code in [200, 202]:
|
|
return True
|
|
elif response.status_code == 409:
|
|
# Check if it's a FIREWALL_IN_PROCESS error
|
|
try:
|
|
error_data = response.json()
|
|
if error_data.get('error', {}).get('code') == 'FIREWALL_IN_PROCESS':
|
|
if attempt < max_retries:
|
|
print(f" Firewall update in progress, waiting {retry_delay}s before retry (attempt {attempt + 1}/{max_retries})...")
|
|
time.sleep(retry_delay)
|
|
retry_delay *= 2 # Exponential backoff
|
|
continue
|
|
else:
|
|
print(f" Firewall still busy after {max_retries} retries")
|
|
return False
|
|
except:
|
|
pass
|
|
|
|
print(f"Failed with status {response.status_code}: {response.text[:200]}")
|
|
return False
|
|
else:
|
|
print(f"Failed with status {response.status_code}: {response.text[:200]}")
|
|
return False
|
|
except Exception as e:
|
|
print(f"Error updating firewall: {e}")
|
|
return False
|
|
|
|
return False
|
|
|
|
def add_ip_to_server(self, server_ip: str, ip_to_add: str, comment: str = "") -> bool:
|
|
"""Add an IP to a server's firewall"""
|
|
# ALWAYS get fresh firewall configuration from API
|
|
print(f" Fetching current firewall configuration...")
|
|
firewall_response = self.get_firewall(server_ip)
|
|
if not firewall_response or 'firewall' not in firewall_response:
|
|
print(f" WARNING: No firewall configured on server {server_ip}")
|
|
return False
|
|
|
|
current_firewall = firewall_response['firewall']
|
|
current_rules = current_firewall.get('rules', {"input": [], "output": []})
|
|
|
|
# Normalize IP format
|
|
if '/' not in ip_to_add:
|
|
ip_to_add = f"{ip_to_add}/32"
|
|
|
|
# Check if IP already exists
|
|
for rule in current_rules['input']:
|
|
if rule.get('src_ip') and ip_to_add in rule['src_ip']:
|
|
print(f" INFO: IP {ip_to_add} already exists")
|
|
return True
|
|
|
|
# Add new rule
|
|
new_rule = {
|
|
"ip_version": "ipv4",
|
|
"name": comment[:50] if comment else f"Allow {ip_to_add}",
|
|
"dst_ip": None,
|
|
"src_ip": ip_to_add,
|
|
"dst_port": None,
|
|
"src_port": None,
|
|
"protocol": "tcp",
|
|
"tcp_flags": None,
|
|
"action": "accept"
|
|
}
|
|
|
|
# Insert at beginning
|
|
current_rules['input'].insert(0, new_rule)
|
|
|
|
# Clean output rules - remove ALL 'Block mail ports' rules and deduplicate
|
|
cleaned_output = []
|
|
seen_rules = set()
|
|
for rule in current_rules.get('output', []):
|
|
# Skip ALL 'Block mail ports' rules - Hetzner blocks these ports by default
|
|
if rule.get('name') == 'Block mail ports':
|
|
continue
|
|
|
|
# Create unique key for deduplication
|
|
rule_key = (
|
|
rule.get('name'),
|
|
rule.get('ip_version'),
|
|
rule.get('protocol'),
|
|
rule.get('src_ip'),
|
|
rule.get('dst_ip'),
|
|
rule.get('src_port'),
|
|
rule.get('dst_port'),
|
|
rule.get('action')
|
|
)
|
|
|
|
if rule_key not in seen_rules:
|
|
seen_rules.add(rule_key)
|
|
cleaned_output.append(rule)
|
|
|
|
# Update firewall with cleaned output rules
|
|
update_data = {
|
|
'filter_ipv6': current_firewall.get('filter_ipv6', False),
|
|
'whitelist_hos': current_firewall.get('whitelist_hos', True),
|
|
'input': current_rules['input'],
|
|
'output': cleaned_output # Use cleaned and deduplicated output rules
|
|
}
|
|
|
|
if self.update_firewall(server_ip, update_data):
|
|
print(f" ACCEPTED: Request to add {ip_to_add} was accepted (processing...)")
|
|
return True
|
|
else:
|
|
print(f" FAILED: Failed to add {ip_to_add}")
|
|
return False
|
|
|
|
def remove_ip_from_server(self, server_ip: str, ip_to_remove: str) -> bool:
|
|
"""Remove an IP from a server's firewall"""
|
|
# ALWAYS get fresh firewall configuration from API
|
|
print(f" Fetching current firewall configuration...")
|
|
firewall_response = self.get_firewall(server_ip)
|
|
if not firewall_response or 'firewall' not in firewall_response:
|
|
print(f" WARNING: No firewall configured on server {server_ip}")
|
|
return False
|
|
|
|
current_firewall = firewall_response['firewall']
|
|
current_rules = current_firewall.get('rules', {"input": [], "output": []})
|
|
|
|
# Normalize IP format
|
|
if '/' not in ip_to_remove:
|
|
ip_to_remove = f"{ip_to_remove}/32"
|
|
|
|
# Find and remove the IP
|
|
original_count = len(current_rules['input'])
|
|
current_rules['input'] = [
|
|
rule for rule in current_rules['input']
|
|
if not (rule.get('src_ip') and ip_to_remove in rule['src_ip'])
|
|
]
|
|
|
|
if len(current_rules['input']) == original_count:
|
|
print(f" WARNING: IP {ip_to_remove} not found")
|
|
return False
|
|
|
|
# Clean output rules - remove ALL 'Block mail ports' rules and deduplicate
|
|
cleaned_output = []
|
|
seen_rules = set()
|
|
for rule in current_rules.get('output', []):
|
|
# Skip ALL 'Block mail ports' rules - Hetzner blocks these ports by default
|
|
if rule.get('name') == 'Block mail ports':
|
|
continue
|
|
|
|
# Create unique key for deduplication
|
|
rule_key = (
|
|
rule.get('name'),
|
|
rule.get('ip_version'),
|
|
rule.get('protocol'),
|
|
rule.get('src_ip'),
|
|
rule.get('dst_ip'),
|
|
rule.get('src_port'),
|
|
rule.get('dst_port'),
|
|
rule.get('action')
|
|
)
|
|
|
|
if rule_key not in seen_rules:
|
|
seen_rules.add(rule_key)
|
|
cleaned_output.append(rule)
|
|
|
|
# Update firewall with cleaned output rules
|
|
update_data = {
|
|
'filter_ipv6': current_firewall.get('filter_ipv6', False),
|
|
'whitelist_hos': current_firewall.get('whitelist_hos', True),
|
|
'input': current_rules['input'],
|
|
'output': cleaned_output # Use cleaned and deduplicated output rules
|
|
}
|
|
|
|
if self.update_firewall(server_ip, update_data):
|
|
print(f" ACCEPTED: Request to remove {ip_to_remove} was accepted (processing...)")
|
|
return True
|
|
else:
|
|
print(f" FAILED: Failed to remove {ip_to_remove}")
|
|
return False
|
|
|
|
def verify_ip_on_server(self, server_ip: str, ip_to_check: str) -> bool:
|
|
"""Verify if an IP is whitelisted on a server"""
|
|
firewall_response = self.get_firewall(server_ip)
|
|
if not firewall_response or 'firewall' not in firewall_response:
|
|
return False
|
|
|
|
firewall = firewall_response['firewall']
|
|
rules = firewall.get('rules', {})
|
|
input_rules = rules.get('input', [])
|
|
|
|
# Normalize IP for comparison
|
|
if '/' not in ip_to_check:
|
|
ip_to_check_with_mask = f"{ip_to_check}/32"
|
|
else:
|
|
ip_to_check_with_mask = ip_to_check
|
|
|
|
# Check if IP is in any rule
|
|
for rule in input_rules:
|
|
if rule.get('action') == 'accept' and rule.get('src_ip'):
|
|
src_ip = rule['src_ip']
|
|
if ip_to_check in src_ip or ip_to_check_with_mask == src_ip:
|
|
return True
|
|
|
|
return False
|
|
|
|
def bootstrap_from_api(self) -> int:
|
|
"""Import existing firewall configurations from Hetzner API"""
|
|
print("Fetching servers from Hetzner API...")
|
|
servers = self.get_all_servers()
|
|
|
|
if not servers:
|
|
print("No servers found")
|
|
return 0
|
|
|
|
print(f"Found {len(servers)} server(s)")
|
|
imported_count = 0
|
|
|
|
for server in servers:
|
|
server_ip = server.get('server_ip')
|
|
server_name = server.get('server_name', '')
|
|
server_number = server.get('server_number')
|
|
|
|
if not server_ip:
|
|
continue
|
|
|
|
print(f"\nServer: {server_name or server_number} ({server_ip})")
|
|
|
|
# Get firewall configuration
|
|
firewall_response = self.get_firewall(server_ip)
|
|
if not firewall_response or 'firewall' not in firewall_response:
|
|
print(" ⚠ No firewall configured")
|
|
continue
|
|
|
|
firewall = firewall_response['firewall']
|
|
|
|
# Create profile name
|
|
if server_name:
|
|
profile_name = server_name.lower().replace(' ', '-').replace('.', '-')
|
|
else:
|
|
profile_name = f"server-{server_number}"
|
|
|
|
# Make sure profile name is unique
|
|
base_name = profile_name
|
|
counter = 1
|
|
while profile_name in self.config.get("profiles", {}):
|
|
profile_name = f"{base_name}-{counter}"
|
|
counter += 1
|
|
|
|
# Extract whitelisted IPs
|
|
permanent_whitelist = []
|
|
rules = firewall.get('rules', {})
|
|
input_rules = rules.get('input', [])
|
|
|
|
for rule in input_rules:
|
|
if rule.get('action') == 'accept' and rule.get('src_ip'):
|
|
src_ip = rule.get('src_ip')
|
|
rule_name = rule.get('name', '')
|
|
dst_port = rule.get('dst_port')
|
|
|
|
# Check if this IP already exists in whitelist
|
|
existing_entry = None
|
|
for entry in permanent_whitelist:
|
|
if entry['ip'] == src_ip:
|
|
existing_entry = entry
|
|
break
|
|
|
|
if existing_entry:
|
|
if dst_port and dst_port not in existing_entry['ports']:
|
|
existing_entry['ports'].append(dst_port)
|
|
else:
|
|
entry = {
|
|
'ip': src_ip,
|
|
'ports': [dst_port] if dst_port else [],
|
|
'comment': rule_name[:50] if rule_name else ''
|
|
}
|
|
permanent_whitelist.append(entry)
|
|
|
|
# Create profile configuration
|
|
profile_config = {
|
|
'server_ip': server_ip,
|
|
'server_name': server_name or f"Server {server_number}",
|
|
'permanent_whitelist': permanent_whitelist,
|
|
'filter_ipv6': firewall.get('filter_ipv6', False),
|
|
'whitelist_hos': firewall.get('whitelist_hos', True)
|
|
}
|
|
|
|
# Add profile to configuration
|
|
self.config.setdefault("profiles", {})[profile_name] = profile_config
|
|
|
|
print(f" Imported as profile '{profile_name}'")
|
|
print(f" - {len(permanent_whitelist)} whitelisted IPs")
|
|
imported_count += 1
|
|
|
|
# Save configuration
|
|
self.save_config()
|
|
print(f"\nImported {imported_count} server configuration(s)")
|
|
return imported_count
|
|
|
|
def whitelist_current_ip(self, comment: str = "Current location", verify: bool = False) -> int:
|
|
"""Add current public IP to all servers"""
|
|
# Get current IP
|
|
print("Getting current public IP...")
|
|
current_ip = self.get_current_public_ip()
|
|
|
|
if not current_ip:
|
|
print("Failed to determine current public IP!")
|
|
return 0
|
|
|
|
print(f"Current public IP: {current_ip}\n")
|
|
|
|
profiles = self.config.get("profiles", {})
|
|
if not profiles:
|
|
print("No profiles found. Run 'bootstrap' first.")
|
|
return 0
|
|
|
|
print(f"Found {len(profiles)} profile(s): {', '.join(profiles.keys())}")
|
|
print("\n" + "=" * 60)
|
|
print("Adding IP to all servers...")
|
|
print("=" * 60 + "\n")
|
|
|
|
success_count = 0
|
|
|
|
for profile_name, profile_config in profiles.items():
|
|
server_ip = profile_config.get("server_ip")
|
|
server_name = profile_config.get("server_name", "")
|
|
|
|
if not server_ip:
|
|
print(f"\n⚠ Profile '{profile_name}' has no server IP")
|
|
continue
|
|
|
|
print(f"\nProfile: {profile_name}")
|
|
print(f"Server: {server_name} ({server_ip})")
|
|
|
|
if self.add_ip_to_server(server_ip, current_ip, comment):
|
|
success_count += 1
|
|
|
|
# Update local config
|
|
ip_entry = {
|
|
"ip": f"{current_ip}/32" if '/' not in current_ip else current_ip,
|
|
"ports": [],
|
|
"comment": comment
|
|
}
|
|
|
|
# Check if IP already in config
|
|
already_in_config = False
|
|
for entry in profile_config.get("permanent_whitelist", []):
|
|
if isinstance(entry, dict) and current_ip in entry.get("ip", ""):
|
|
already_in_config = True
|
|
break
|
|
|
|
if not already_in_config:
|
|
profile_config.setdefault("permanent_whitelist", []).append(ip_entry)
|
|
|
|
# Verify if requested
|
|
if verify:
|
|
print(" Verifying change was applied...")
|
|
max_attempts = 12 # Try for up to 60 seconds (12 * 5)
|
|
verified = False
|
|
|
|
for attempt in range(max_attempts):
|
|
time.sleep(5)
|
|
if self.verify_ip_on_server(server_ip, current_ip):
|
|
print(f" VERIFIED: {current_ip} is whitelisted (took {(attempt + 1) * 5} seconds)")
|
|
verified = True
|
|
break
|
|
else:
|
|
if attempt < max_attempts - 1:
|
|
print(f" Still applying... waiting ({(attempt + 1) * 5}s elapsed)")
|
|
|
|
if not verified:
|
|
print(f" WARNING: {current_ip} not verified after 60 seconds - may still be applying")
|
|
|
|
# Save updated config
|
|
self.save_config()
|
|
|
|
# Summary
|
|
print("\n" + "=" * 60)
|
|
print("Summary")
|
|
print("=" * 60)
|
|
print(f"\nSuccessfully updated {success_count}/{len(profiles)} server(s)")
|
|
|
|
return success_count
|
|
|
|
def remove_current_ip(self, verify: bool = False) -> int:
|
|
"""Remove current public IP from all servers"""
|
|
print("Getting current public IP...")
|
|
current_ip = self.get_current_public_ip()
|
|
|
|
if not current_ip:
|
|
print("Failed to get current public IP")
|
|
return 0
|
|
|
|
print(f"Current public IP: {current_ip}")
|
|
|
|
profiles = self.config.get("profiles", {})
|
|
if not profiles:
|
|
print("No profiles configured. Run 'bootstrap' to import from API.")
|
|
return 0
|
|
|
|
print(f"\nFound {len(profiles)} profile(s): {', '.join(profiles.keys())}")
|
|
|
|
print("\n" + "=" * 60)
|
|
print("Removing IP from all servers...")
|
|
print("=" * 60)
|
|
print()
|
|
|
|
success_count = 0
|
|
for profile_name, profile_config in profiles.items():
|
|
server_ip = profile_config.get("server_ip")
|
|
server_name = profile_config.get("server_name", "")
|
|
|
|
if not server_ip:
|
|
print(f"\nWARNING: Profile '{profile_name}' has no server IP")
|
|
continue
|
|
|
|
print(f"\nProfile: {profile_name}")
|
|
print(f"Server: {server_name} ({server_ip})")
|
|
|
|
if self.remove_ip_from_server(server_ip, current_ip):
|
|
success_count += 1
|
|
|
|
# Update local config - remove from permanent_whitelist
|
|
whitelist = profile_config.get("permanent_whitelist", [])
|
|
updated_whitelist = []
|
|
for entry in whitelist:
|
|
if isinstance(entry, dict):
|
|
entry_ip = entry.get("ip", "")
|
|
if current_ip not in entry_ip:
|
|
updated_whitelist.append(entry)
|
|
else:
|
|
# Handle string entries
|
|
if current_ip not in str(entry):
|
|
updated_whitelist.append(entry)
|
|
|
|
profile_config["permanent_whitelist"] = updated_whitelist
|
|
|
|
# Verify if requested
|
|
if verify:
|
|
print(" Verifying change was applied...")
|
|
max_attempts = 12 # Try for up to 60 seconds (12 * 5)
|
|
verified = False
|
|
|
|
for attempt in range(max_attempts):
|
|
time.sleep(5)
|
|
if not self.verify_ip_on_server(server_ip, current_ip):
|
|
print(f" VERIFIED: {current_ip} has been removed (took {(attempt + 1) * 5} seconds)")
|
|
verified = True
|
|
break
|
|
else:
|
|
if attempt < max_attempts - 1:
|
|
print(f" Still applying... waiting ({(attempt + 1) * 5}s elapsed)")
|
|
|
|
if not verified:
|
|
print(f" WARNING: {current_ip} still present after 60 seconds - may still be removing")
|
|
|
|
# Save updated config
|
|
self.save_config()
|
|
|
|
# Summary
|
|
print("\n" + "=" * 60)
|
|
print("Summary")
|
|
print("=" * 60)
|
|
print(f"\nSuccessfully removed IP from {success_count}/{len(profiles)} server(s)")
|
|
|
|
return success_count
|
|
|
|
def list_profiles(self):
|
|
"""List all configured profiles"""
|
|
profiles = self.config.get("profiles", {})
|
|
|
|
if not profiles:
|
|
print("No profiles configured. Run 'bootstrap' to import from API.")
|
|
return
|
|
|
|
print("\nConfigured Profiles:")
|
|
print("-" * 60)
|
|
|
|
for profile_name, profile_config in profiles.items():
|
|
server_ip = profile_config.get("server_ip", "Not configured")
|
|
server_name = profile_config.get("server_name", "")
|
|
whitelist_count = len(profile_config.get("permanent_whitelist", []))
|
|
|
|
print(f"\nProfile: {profile_name}")
|
|
print(f" Server: {server_name} ({server_ip})")
|
|
print(f" Whitelisted IPs: {whitelist_count}")
|
|
|
|
def list_ips(self, profile: Optional[str] = None, from_api: bool = True):
|
|
"""List whitelisted IPs for a profile or all profiles"""
|
|
profiles = self.config.get("profiles", {})
|
|
|
|
if profile:
|
|
if profile not in profiles:
|
|
print(f"Profile '{profile}' not found")
|
|
return
|
|
profiles_to_show = {profile: profiles[profile]}
|
|
else:
|
|
profiles_to_show = profiles
|
|
|
|
for profile_name, profile_config in profiles_to_show.items():
|
|
server_ip = profile_config.get("server_ip")
|
|
server_name = profile_config.get("server_name", "")
|
|
|
|
print(f"\nProfile: {profile_name}")
|
|
print(f"Server: {server_name} ({server_ip})")
|
|
print("-" * 40)
|
|
|
|
if not server_ip:
|
|
print(" ⚠ No server IP configured")
|
|
continue
|
|
|
|
if from_api:
|
|
# Get current rules from API
|
|
print(" Fetching from API...")
|
|
firewall_response = self.get_firewall(server_ip)
|
|
|
|
if not firewall_response or 'firewall' not in firewall_response:
|
|
print(" ⚠ No firewall configured on server")
|
|
continue
|
|
|
|
firewall = firewall_response['firewall']
|
|
rules = firewall.get('rules', {})
|
|
input_rules = rules.get('input', [])
|
|
|
|
# Group IPs and their rules
|
|
ip_rules = {}
|
|
for rule in input_rules:
|
|
if rule.get('action') == 'accept' and rule.get('src_ip'):
|
|
src_ip = rule.get('src_ip')
|
|
rule_name = rule.get('name', '')
|
|
dst_port = rule.get('dst_port')
|
|
|
|
if src_ip not in ip_rules:
|
|
ip_rules[src_ip] = {
|
|
'names': [],
|
|
'ports': []
|
|
}
|
|
|
|
if rule_name and rule_name not in ip_rules[src_ip]['names']:
|
|
ip_rules[src_ip]['names'].append(rule_name)
|
|
|
|
if dst_port and dst_port not in ip_rules[src_ip]['ports']:
|
|
ip_rules[src_ip]['ports'].append(dst_port)
|
|
|
|
if not ip_rules:
|
|
print(" No whitelisted IPs found")
|
|
else:
|
|
print(f" Found {len(ip_rules)} whitelisted IP(s):\n")
|
|
for ip, info in sorted(ip_rules.items()):
|
|
# Build description
|
|
if info['ports']:
|
|
port_str = f" (ports: {', '.join(info['ports'])})"
|
|
else:
|
|
port_str = " (all TCP)"
|
|
|
|
if info['names']:
|
|
# Use the first meaningful name
|
|
name = next((n for n in info['names'] if n and not n.startswith('Allow')), info['names'][0])
|
|
print(f" - {ip}{port_str} - {name}")
|
|
else:
|
|
print(f" - {ip}{port_str}")
|
|
|
|
# Also show rules that allow specific ports from any IP
|
|
port_rules = []
|
|
for rule in input_rules:
|
|
if rule.get('action') == 'accept' and not rule.get('src_ip') and rule.get('dst_port'):
|
|
port_rules.append(rule)
|
|
|
|
if port_rules:
|
|
print(f"\n Open ports (any IP):")
|
|
for rule in port_rules:
|
|
port = rule.get('dst_port', '')
|
|
name = rule.get('name', '')
|
|
if name:
|
|
print(f" - Port {port} - {name}")
|
|
else:
|
|
print(f" - Port {port}")
|
|
else:
|
|
# Use local config (old behavior)
|
|
whitelist = profile_config.get("permanent_whitelist", [])
|
|
if not whitelist:
|
|
print(" No whitelisted IPs in local config")
|
|
continue
|
|
|
|
for entry in whitelist:
|
|
if isinstance(entry, dict):
|
|
ip = entry.get("ip", "")
|
|
comment = entry.get("comment", "")
|
|
ports = entry.get("ports", [])
|
|
|
|
if ports:
|
|
port_str = f" (ports: {', '.join(map(str, ports))})"
|
|
else:
|
|
port_str = " (all TCP)"
|
|
|
|
if comment:
|
|
print(f" • {ip}{port_str} - {comment}")
|
|
else:
|
|
print(f" - {ip}{port_str}")
|
|
else:
|
|
print(f" - {entry}")
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description="Hetzner Robot Firewall Manager",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog="""
|
|
Examples:
|
|
%(prog)s bootstrap # Import existing configurations from API
|
|
%(prog)s whitelist-current # Add current IP to all servers
|
|
%(prog)s add 1.2.3.4 --comment "Office" # Add specific IP to all servers
|
|
%(prog)s remove 1.2.3.4 # Remove IP from all servers
|
|
%(prog)s list # List all profiles
|
|
%(prog)s list-ips # List all whitelisted IPs
|
|
"""
|
|
)
|
|
|
|
subparsers = parser.add_subparsers(dest='command', help='Commands')
|
|
|
|
# Bootstrap command
|
|
subparsers.add_parser('bootstrap', help='Import existing firewall configurations from API')
|
|
|
|
# Whitelist current IP
|
|
whitelist_parser = subparsers.add_parser('whitelist-current', help='Add current public IP to all servers')
|
|
whitelist_parser.add_argument('--comment', '-c', default='Current location', help='Comment for the IP')
|
|
whitelist_parser.add_argument('--verify', '-v', action='store_true', help='Verify after adding')
|
|
|
|
# Remove current IP
|
|
remove_current_parser = subparsers.add_parser('remove-current', help='Remove current public IP from all servers')
|
|
remove_current_parser.add_argument('--verify', '-v', action='store_true', help='Verify after removing')
|
|
|
|
# Add IP
|
|
add_parser = subparsers.add_parser('add', help='Add an IP to all servers')
|
|
add_parser.add_argument('ip', help='IP address to add')
|
|
add_parser.add_argument('--comment', '-c', default='', help='Comment for the IP')
|
|
add_parser.add_argument('--profile', '-p', help='Specific profile (default: all)')
|
|
|
|
# Remove IP
|
|
remove_parser = subparsers.add_parser('remove', help='Remove an IP from all servers')
|
|
remove_parser.add_argument('ip', help='IP address to remove')
|
|
remove_parser.add_argument('--profile', '-p', help='Specific profile (default: all)')
|
|
|
|
# List profiles
|
|
subparsers.add_parser('list', help='List all configured profiles')
|
|
|
|
# List IPs
|
|
list_ips_parser = subparsers.add_parser('list-ips', help='List whitelisted IPs from API')
|
|
list_ips_parser.add_argument('--profile', '-p', help='Specific profile (default: all)')
|
|
list_ips_parser.add_argument('--local', action='store_true', help='Use local config instead of API')
|
|
|
|
# Verify IP
|
|
verify_parser = subparsers.add_parser('verify', help='Verify if an IP is whitelisted')
|
|
verify_parser.add_argument('ip', help='IP address to verify')
|
|
|
|
args = parser.parse_args()
|
|
|
|
if not args.command:
|
|
parser.print_help()
|
|
return
|
|
|
|
try:
|
|
manager = HetznerFirewallManager()
|
|
except ValueError as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
# Execute commands
|
|
if args.command == 'bootstrap':
|
|
manager.bootstrap_from_api()
|
|
|
|
elif args.command == 'whitelist-current':
|
|
manager.whitelist_current_ip(args.comment, args.verify)
|
|
|
|
elif args.command == 'remove-current':
|
|
manager.remove_current_ip(args.verify)
|
|
|
|
elif args.command == 'add':
|
|
profiles = manager.config.get("profiles", {})
|
|
if args.profile:
|
|
if args.profile not in profiles:
|
|
print(f"Profile '{args.profile}' not found")
|
|
return
|
|
profiles_to_update = {args.profile: profiles[args.profile]}
|
|
else:
|
|
profiles_to_update = profiles
|
|
|
|
success_count = 0
|
|
for profile_name, profile_config in profiles_to_update.items():
|
|
server_ip = profile_config.get("server_ip")
|
|
if server_ip:
|
|
print(f"\nProfile: {profile_name}")
|
|
if manager.add_ip_to_server(server_ip, args.ip, args.comment):
|
|
success_count += 1
|
|
|
|
print(f"\nAdded IP to {success_count}/{len(profiles_to_update)} server(s)")
|
|
|
|
elif args.command == 'remove':
|
|
profiles = manager.config.get("profiles", {})
|
|
if args.profile:
|
|
if args.profile not in profiles:
|
|
print(f"Profile '{args.profile}' not found")
|
|
return
|
|
profiles_to_update = {args.profile: profiles[args.profile]}
|
|
else:
|
|
profiles_to_update = profiles
|
|
|
|
success_count = 0
|
|
for profile_name, profile_config in profiles_to_update.items():
|
|
server_ip = profile_config.get("server_ip")
|
|
if server_ip:
|
|
print(f"\nProfile: {profile_name}")
|
|
if manager.remove_ip_from_server(server_ip, args.ip):
|
|
success_count += 1
|
|
|
|
print(f"\nRemoved IP from {success_count}/{len(profiles_to_update)} server(s)")
|
|
|
|
elif args.command == 'list':
|
|
manager.list_profiles()
|
|
|
|
elif args.command == 'list-ips':
|
|
manager.list_ips(args.profile, from_api=not args.local)
|
|
|
|
elif args.command == 'verify':
|
|
profiles = manager.config.get("profiles", {})
|
|
print(f"\nVerifying IP {args.ip} on all servers...")
|
|
print("-" * 60)
|
|
|
|
for profile_name, profile_config in profiles.items():
|
|
server_ip = profile_config.get("server_ip")
|
|
if server_ip:
|
|
status = "[WHITELISTED]" if manager.verify_ip_on_server(server_ip, args.ip) else "[NOT FOUND]"
|
|
print(f"{profile_name}: {status}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main() |