From 0912aec78e1ae318a198422214ea521b1be85c47 Mon Sep 17 00:00:00 2001 From: Aljaz Ceru Date: Mon, 8 Sep 2025 16:42:52 +0200 Subject: [PATCH] verification and duplicated outgoing rules fixy --- hfm.py | 130 +++++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 109 insertions(+), 21 deletions(-) diff --git a/hfm.py b/hfm.py index 37d30c0..ed927a1 100755 --- a/hfm.py +++ b/hfm.py @@ -9,6 +9,7 @@ import json import os import sys import time +from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional @@ -54,7 +55,24 @@ class HetznerFirewallManager: return {"profiles": {}} def save_config(self): - """Save configuration to file""" + """Save configuration to file with backup""" + # Create backup if config file exists + if self.config_file.exists(): + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + backup_file = self.config_file.parent / f"firewall_config_backup_{timestamp}.json" + + # Keep only last 5 backups + backups = sorted(self.config_file.parent.glob("firewall_config_backup_*.json")) + if len(backups) >= 5: + for old_backup in backups[:-4]: + old_backup.unlink() + + # Create new backup + with open(self.config_file, 'r') as src: + with open(backup_file, 'w') as dst: + dst.write(src.read()) + + # Save new config with open(self.config_file, 'w') as f: json.dump(self.config, f, indent=2) @@ -183,7 +201,8 @@ class HetznerFirewallManager: def add_ip_to_server(self, server_ip: str, ip_to_add: str, comment: str = "") -> bool: """Add an IP to a server's firewall""" - # Get current firewall + # ALWAYS get fresh firewall configuration from API + print(f" Fetching current firewall configuration...") firewall_response = self.get_firewall(server_ip) if not firewall_response or 'firewall' not in firewall_response: print(f" WARNING: No firewall configured on server {server_ip}") @@ -218,16 +237,40 @@ class HetznerFirewallManager: # Insert at beginning current_rules['input'].insert(0, new_rule) - # Update firewall - keep output rules EXACTLY as they are + # Clean output rules - remove ALL 'Block mail ports' rules and deduplicate + cleaned_output = [] + seen_rules = set() + for rule in current_rules.get('output', []): + # Skip ALL 'Block mail ports' rules - Hetzner blocks these ports by default + if rule.get('name') == 'Block mail ports': + continue + + # Create unique key for deduplication + rule_key = ( + rule.get('name'), + rule.get('ip_version'), + rule.get('protocol'), + rule.get('src_ip'), + rule.get('dst_ip'), + rule.get('src_port'), + rule.get('dst_port'), + rule.get('action') + ) + + if rule_key not in seen_rules: + seen_rules.add(rule_key) + cleaned_output.append(rule) + + # Update firewall with cleaned output rules update_data = { 'filter_ipv6': current_firewall.get('filter_ipv6', False), 'whitelist_hos': current_firewall.get('whitelist_hos', True), 'input': current_rules['input'], - 'output': current_rules['output'] # Don't modify output rules at all + 'output': cleaned_output # Use cleaned and deduplicated output rules } if self.update_firewall(server_ip, update_data): - print(f" SUCCESS: Added {ip_to_add}") + print(f" ACCEPTED: Request to add {ip_to_add} was accepted (processing...)") return True else: print(f" FAILED: Failed to add {ip_to_add}") @@ -235,7 +278,8 @@ class HetznerFirewallManager: def remove_ip_from_server(self, server_ip: str, ip_to_remove: str) -> bool: """Remove an IP from a server's firewall""" - # Get current firewall + # ALWAYS get fresh firewall configuration from API + print(f" Fetching current firewall configuration...") firewall_response = self.get_firewall(server_ip) if not firewall_response or 'firewall' not in firewall_response: print(f" WARNING: No firewall configured on server {server_ip}") @@ -259,16 +303,40 @@ class HetznerFirewallManager: print(f" WARNING: IP {ip_to_remove} not found") return False - # Update firewall - keep output rules EXACTLY as they are + # Clean output rules - remove ALL 'Block mail ports' rules and deduplicate + cleaned_output = [] + seen_rules = set() + for rule in current_rules.get('output', []): + # Skip ALL 'Block mail ports' rules - Hetzner blocks these ports by default + if rule.get('name') == 'Block mail ports': + continue + + # Create unique key for deduplication + rule_key = ( + rule.get('name'), + rule.get('ip_version'), + rule.get('protocol'), + rule.get('src_ip'), + rule.get('dst_ip'), + rule.get('src_port'), + rule.get('dst_port'), + rule.get('action') + ) + + if rule_key not in seen_rules: + seen_rules.add(rule_key) + cleaned_output.append(rule) + + # Update firewall with cleaned output rules update_data = { 'filter_ipv6': current_firewall.get('filter_ipv6', False), 'whitelist_hos': current_firewall.get('whitelist_hos', True), 'input': current_rules['input'], - 'output': current_rules['output'] # Don't modify output rules at all + 'output': cleaned_output # Use cleaned and deduplicated output rules } if self.update_firewall(server_ip, update_data): - print(f" SUCCESS: Removed {ip_to_remove}") + print(f" ACCEPTED: Request to remove {ip_to_remove} was accepted (processing...)") return True else: print(f" FAILED: Failed to remove {ip_to_remove}") @@ -449,12 +517,22 @@ class HetznerFirewallManager: # Verify if requested if verify: - print(" Waiting 5 seconds before verification...") - time.sleep(5) - if self.verify_ip_on_server(server_ip, current_ip): - print(f" VERIFIED: {current_ip} is whitelisted") - else: - print(f" NOT VERIFIED: {current_ip} may not be applied yet") + print(" Verifying change was applied...") + max_attempts = 12 # Try for up to 60 seconds (12 * 5) + verified = False + + for attempt in range(max_attempts): + time.sleep(5) + if self.verify_ip_on_server(server_ip, current_ip): + print(f" VERIFIED: {current_ip} is whitelisted (took {(attempt + 1) * 5} seconds)") + verified = True + break + else: + if attempt < max_attempts - 1: + print(f" Still applying... waiting ({(attempt + 1) * 5}s elapsed)") + + if not verified: + print(f" WARNING: {current_ip} not verified after 60 seconds - may still be applying") # Save updated config self.save_config() @@ -522,12 +600,22 @@ class HetznerFirewallManager: # Verify if requested if verify: - print(" Waiting 5 seconds before verification...") - time.sleep(5) - if not self.verify_ip_on_server(server_ip, current_ip): - print(f" VERIFIED: {current_ip} has been removed") - else: - print(f" NOT VERIFIED: {current_ip} may still be present") + print(" Verifying change was applied...") + max_attempts = 12 # Try for up to 60 seconds (12 * 5) + verified = False + + for attempt in range(max_attempts): + time.sleep(5) + if not self.verify_ip_on_server(server_ip, current_ip): + print(f" VERIFIED: {current_ip} has been removed (took {(attempt + 1) * 5} seconds)") + verified = True + break + else: + if attempt < max_attempts - 1: + print(f" Still applying... waiting ({(attempt + 1) * 5}s elapsed)") + + if not verified: + print(f" WARNING: {current_ip} still present after 60 seconds - may still be removing") # Save updated config self.save_config()