verification and duplicated outgoing rules fixy

This commit is contained in:
2025-09-08 16:42:52 +02:00
parent afa80aaa01
commit 0912aec78e

118
hfm.py
View File

@@ -9,6 +9,7 @@ import json
import os
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
@@ -54,7 +55,24 @@ class HetznerFirewallManager:
return {"profiles": {}}
def save_config(self):
"""Save configuration to file"""
"""Save configuration to file with backup"""
# Create backup if config file exists
if self.config_file.exists():
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_file = self.config_file.parent / f"firewall_config_backup_{timestamp}.json"
# Keep only last 5 backups
backups = sorted(self.config_file.parent.glob("firewall_config_backup_*.json"))
if len(backups) >= 5:
for old_backup in backups[:-4]:
old_backup.unlink()
# Create new backup
with open(self.config_file, 'r') as src:
with open(backup_file, 'w') as dst:
dst.write(src.read())
# Save new config
with open(self.config_file, 'w') as f:
json.dump(self.config, f, indent=2)
@@ -183,7 +201,8 @@ class HetznerFirewallManager:
def add_ip_to_server(self, server_ip: str, ip_to_add: str, comment: str = "") -> bool:
"""Add an IP to a server's firewall"""
# Get current firewall
# ALWAYS get fresh firewall configuration from API
print(f" Fetching current firewall configuration...")
firewall_response = self.get_firewall(server_ip)
if not firewall_response or 'firewall' not in firewall_response:
print(f" WARNING: No firewall configured on server {server_ip}")
@@ -218,16 +237,40 @@ class HetznerFirewallManager:
# Insert at beginning
current_rules['input'].insert(0, new_rule)
# Update firewall - keep output rules EXACTLY as they are
# Clean output rules - remove ALL 'Block mail ports' rules and deduplicate
cleaned_output = []
seen_rules = set()
for rule in current_rules.get('output', []):
# Skip ALL 'Block mail ports' rules - Hetzner blocks these ports by default
if rule.get('name') == 'Block mail ports':
continue
# Create unique key for deduplication
rule_key = (
rule.get('name'),
rule.get('ip_version'),
rule.get('protocol'),
rule.get('src_ip'),
rule.get('dst_ip'),
rule.get('src_port'),
rule.get('dst_port'),
rule.get('action')
)
if rule_key not in seen_rules:
seen_rules.add(rule_key)
cleaned_output.append(rule)
# Update firewall with cleaned output rules
update_data = {
'filter_ipv6': current_firewall.get('filter_ipv6', False),
'whitelist_hos': current_firewall.get('whitelist_hos', True),
'input': current_rules['input'],
'output': current_rules['output'] # Don't modify output rules at all
'output': cleaned_output # Use cleaned and deduplicated output rules
}
if self.update_firewall(server_ip, update_data):
print(f" SUCCESS: Added {ip_to_add}")
print(f" ACCEPTED: Request to add {ip_to_add} was accepted (processing...)")
return True
else:
print(f" FAILED: Failed to add {ip_to_add}")
@@ -235,7 +278,8 @@ class HetznerFirewallManager:
def remove_ip_from_server(self, server_ip: str, ip_to_remove: str) -> bool:
"""Remove an IP from a server's firewall"""
# Get current firewall
# ALWAYS get fresh firewall configuration from API
print(f" Fetching current firewall configuration...")
firewall_response = self.get_firewall(server_ip)
if not firewall_response or 'firewall' not in firewall_response:
print(f" WARNING: No firewall configured on server {server_ip}")
@@ -259,16 +303,40 @@ class HetznerFirewallManager:
print(f" WARNING: IP {ip_to_remove} not found")
return False
# Update firewall - keep output rules EXACTLY as they are
# Clean output rules - remove ALL 'Block mail ports' rules and deduplicate
cleaned_output = []
seen_rules = set()
for rule in current_rules.get('output', []):
# Skip ALL 'Block mail ports' rules - Hetzner blocks these ports by default
if rule.get('name') == 'Block mail ports':
continue
# Create unique key for deduplication
rule_key = (
rule.get('name'),
rule.get('ip_version'),
rule.get('protocol'),
rule.get('src_ip'),
rule.get('dst_ip'),
rule.get('src_port'),
rule.get('dst_port'),
rule.get('action')
)
if rule_key not in seen_rules:
seen_rules.add(rule_key)
cleaned_output.append(rule)
# Update firewall with cleaned output rules
update_data = {
'filter_ipv6': current_firewall.get('filter_ipv6', False),
'whitelist_hos': current_firewall.get('whitelist_hos', True),
'input': current_rules['input'],
'output': current_rules['output'] # Don't modify output rules at all
'output': cleaned_output # Use cleaned and deduplicated output rules
}
if self.update_firewall(server_ip, update_data):
print(f" SUCCESS: Removed {ip_to_remove}")
print(f" ACCEPTED: Request to remove {ip_to_remove} was accepted (processing...)")
return True
else:
print(f" FAILED: Failed to remove {ip_to_remove}")
@@ -449,12 +517,22 @@ class HetznerFirewallManager:
# Verify if requested
if verify:
print(" Waiting 5 seconds before verification...")
print(" Verifying change was applied...")
max_attempts = 12 # Try for up to 60 seconds (12 * 5)
verified = False
for attempt in range(max_attempts):
time.sleep(5)
if self.verify_ip_on_server(server_ip, current_ip):
print(f" VERIFIED: {current_ip} is whitelisted")
print(f" VERIFIED: {current_ip} is whitelisted (took {(attempt + 1) * 5} seconds)")
verified = True
break
else:
print(f" NOT VERIFIED: {current_ip} may not be applied yet")
if attempt < max_attempts - 1:
print(f" Still applying... waiting ({(attempt + 1) * 5}s elapsed)")
if not verified:
print(f" WARNING: {current_ip} not verified after 60 seconds - may still be applying")
# Save updated config
self.save_config()
@@ -522,12 +600,22 @@ class HetznerFirewallManager:
# Verify if requested
if verify:
print(" Waiting 5 seconds before verification...")
print(" Verifying change was applied...")
max_attempts = 12 # Try for up to 60 seconds (12 * 5)
verified = False
for attempt in range(max_attempts):
time.sleep(5)
if not self.verify_ip_on_server(server_ip, current_ip):
print(f" VERIFIED: {current_ip} has been removed")
print(f" VERIFIED: {current_ip} has been removed (took {(attempt + 1) * 5} seconds)")
verified = True
break
else:
print(f" NOT VERIFIED: {current_ip} may still be present")
if attempt < max_attempts - 1:
print(f" Still applying... waiting ({(attempt + 1) * 5}s elapsed)")
if not verified:
print(f" WARNING: {current_ip} still present after 60 seconds - may still be removing")
# Save updated config
self.save_config()