mirror of
https://github.com/aljazceru/lnflow.git
synced 2026-02-11 23:14:22 +01:00
better logging
This commit is contained in:
@@ -13,6 +13,10 @@ dependencies = [
|
||||
"numpy>=1.24.0",
|
||||
"rich>=13.0.0",
|
||||
"python-dotenv>=1.0.0",
|
||||
"tabulate>=0.9.0",
|
||||
"scipy>=1.10.0",
|
||||
"grpcio>=1.50.0",
|
||||
"grpcio-tools>=1.50.0",
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
|
||||
@@ -6,4 +6,6 @@ numpy>=1.24.0
|
||||
rich>=13.0.0
|
||||
python-dotenv>=1.0.0
|
||||
tabulate>=0.9.0
|
||||
scipy>=1.10.0
|
||||
scipy>=1.10.0
|
||||
grpcio>=1.50.0
|
||||
grpcio-tools>=1.50.0
|
||||
@@ -434,11 +434,32 @@ class AsyncLNDgRPCClient:
|
||||
return await loop.run_in_executor(None, self.sync_client.list_channels)
|
||||
|
||||
async def update_channel_policy(self, *args, **kwargs):
|
||||
"""Async version of update_channel_policy"""
|
||||
loop = asyncio.get_event_loop()
|
||||
return await loop.run_in_executor(
|
||||
None, self.sync_client.update_channel_policy, *args, **kwargs
|
||||
"""Async version of update_channel_policy with enhanced logging"""
|
||||
logger.debug(
|
||||
f"gRPC update_channel_policy called with:\n"
|
||||
f" Args: {args}\n"
|
||||
f" Kwargs: {kwargs}"
|
||||
)
|
||||
|
||||
try:
|
||||
loop = asyncio.get_event_loop()
|
||||
# Fix: Use lambda to properly pass kwargs to run_in_executor
|
||||
result = await loop.run_in_executor(
|
||||
None, lambda: self.sync_client.update_channel_policy(*args, **kwargs)
|
||||
)
|
||||
|
||||
logger.debug(f"gRPC update_channel_policy succeeded: {result}")
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"gRPC update_channel_policy failed:\n"
|
||||
f" Error: {str(e)}\n"
|
||||
f" Exception Type: {type(e).__name__}\n"
|
||||
f" Args: {args}\n"
|
||||
f" Kwargs: {kwargs}"
|
||||
)
|
||||
raise
|
||||
|
||||
async def __aenter__(self):
|
||||
return self
|
||||
|
||||
@@ -304,58 +304,99 @@ class PolicyEngine:
|
||||
def match_channel(self, channel_data: Dict[str, Any]) -> List[PolicyRule]:
|
||||
"""Find matching policies for a channel"""
|
||||
matching_rules = []
|
||||
channel_id = channel_data.get('channel_id', 'unknown')
|
||||
|
||||
logger.debug(f"Evaluating policies for channel {channel_id}:")
|
||||
logger.debug(f" Channel Data: capacity={channel_data.get('capacity', 0):,}, "
|
||||
f"balance_ratio={channel_data.get('local_balance_ratio', 0.5):.2%}, "
|
||||
f"activity={channel_data.get('activity_level', 'unknown')}")
|
||||
|
||||
for rule in self.rules:
|
||||
if not rule.enabled:
|
||||
logger.debug(f" Skipping disabled policy: {rule.name}")
|
||||
continue
|
||||
|
||||
if self._channel_matches(channel_data, rule.matcher):
|
||||
matching_rules.append(rule)
|
||||
logger.debug(f" ✓ MATCHED policy: {rule.name} (priority {rule.priority})")
|
||||
|
||||
# Stop if this is a final policy
|
||||
if rule.policy.policy_type == PolicyType.FINAL:
|
||||
logger.debug(f" Stopping at final policy: {rule.name}")
|
||||
break
|
||||
else:
|
||||
logger.debug(f" ✗ SKIPPED policy: {rule.name}")
|
||||
|
||||
logger.debug(f" Final matches for {channel_id}: {[r.name for r in matching_rules]}")
|
||||
return matching_rules
|
||||
|
||||
def _channel_matches(self, channel_data: Dict[str, Any], matcher: PolicyMatcher) -> bool:
|
||||
"""Check if channel matches policy criteria"""
|
||||
"""Check if channel matches policy criteria with detailed debug logging"""
|
||||
|
||||
# Channel ID matching
|
||||
if matcher.chan_id and channel_data.get('channel_id') not in matcher.chan_id:
|
||||
return False
|
||||
if matcher.chan_id:
|
||||
channel_id = channel_data.get('channel_id', '')
|
||||
if channel_id not in matcher.chan_id:
|
||||
logger.debug(f" ✗ Channel ID mismatch: {channel_id} not in {matcher.chan_id}")
|
||||
return False
|
||||
logger.debug(f" ✓ Channel ID matches: {channel_id}")
|
||||
|
||||
# Capacity matching
|
||||
capacity = channel_data.get('capacity', 0)
|
||||
if matcher.chan_capacity_min and capacity < matcher.chan_capacity_min:
|
||||
return False
|
||||
if matcher.chan_capacity_max and capacity > matcher.chan_capacity_max:
|
||||
return False
|
||||
if matcher.chan_capacity_min:
|
||||
if capacity < matcher.chan_capacity_min:
|
||||
logger.debug(f" ✗ Capacity too small: {capacity:,} < {matcher.chan_capacity_min:,}")
|
||||
return False
|
||||
logger.debug(f" ✓ Capacity min OK: {capacity:,} >= {matcher.chan_capacity_min:,}")
|
||||
if matcher.chan_capacity_max:
|
||||
if capacity > matcher.chan_capacity_max:
|
||||
logger.debug(f" ✗ Capacity too large: {capacity:,} > {matcher.chan_capacity_max:,}")
|
||||
return False
|
||||
logger.debug(f" ✓ Capacity max OK: {capacity:,} <= {matcher.chan_capacity_max:,}")
|
||||
|
||||
# Balance ratio matching
|
||||
balance_ratio = channel_data.get('local_balance_ratio', 0.5)
|
||||
if matcher.chan_balance_ratio_min and balance_ratio < matcher.chan_balance_ratio_min:
|
||||
return False
|
||||
if matcher.chan_balance_ratio_max and balance_ratio > matcher.chan_balance_ratio_max:
|
||||
return False
|
||||
if matcher.chan_balance_ratio_min:
|
||||
if balance_ratio < matcher.chan_balance_ratio_min:
|
||||
logger.debug(f" ✗ Balance ratio too low: {balance_ratio:.2%} < {matcher.chan_balance_ratio_min:.2%}")
|
||||
return False
|
||||
logger.debug(f" ✓ Balance ratio min OK: {balance_ratio:.2%} >= {matcher.chan_balance_ratio_min:.2%}")
|
||||
if matcher.chan_balance_ratio_max:
|
||||
if balance_ratio > matcher.chan_balance_ratio_max:
|
||||
logger.debug(f" ✗ Balance ratio too high: {balance_ratio:.2%} > {matcher.chan_balance_ratio_max:.2%}")
|
||||
return False
|
||||
logger.debug(f" ✓ Balance ratio max OK: {balance_ratio:.2%} <= {matcher.chan_balance_ratio_max:.2%}")
|
||||
|
||||
# Node ID matching
|
||||
peer_id = channel_data.get('peer_pubkey', '')
|
||||
if matcher.node_id and peer_id not in matcher.node_id:
|
||||
return False
|
||||
if matcher.node_id:
|
||||
peer_id = channel_data.get('peer_pubkey', '')
|
||||
if peer_id not in matcher.node_id:
|
||||
logger.debug(f" ✗ Peer ID mismatch: {peer_id[:16]}... not in target list")
|
||||
return False
|
||||
logger.debug(f" ✓ Peer ID matches: {peer_id[:16]}...")
|
||||
|
||||
# Activity level matching
|
||||
activity = channel_data.get('activity_level', 'inactive')
|
||||
if matcher.activity_level and activity not in matcher.activity_level:
|
||||
return False
|
||||
if matcher.activity_level:
|
||||
activity = channel_data.get('activity_level', 'inactive')
|
||||
if activity not in matcher.activity_level:
|
||||
logger.debug(f" ✗ Activity level mismatch: '{activity}' not in {matcher.activity_level}")
|
||||
return False
|
||||
logger.debug(f" ✓ Activity level matches: '{activity}' in {matcher.activity_level}")
|
||||
|
||||
# Flow matching
|
||||
flow_7d = channel_data.get('flow_7d', 0)
|
||||
if matcher.flow_7d_min and flow_7d < matcher.flow_7d_min:
|
||||
return False
|
||||
if matcher.flow_7d_max and flow_7d > matcher.flow_7d_max:
|
||||
return False
|
||||
if matcher.flow_7d_min:
|
||||
if flow_7d < matcher.flow_7d_min:
|
||||
logger.debug(f" ✗ Flow too low: {flow_7d:,} < {matcher.flow_7d_min:,}")
|
||||
return False
|
||||
logger.debug(f" ✓ Flow min OK: {flow_7d:,} >= {matcher.flow_7d_min:,}")
|
||||
if matcher.flow_7d_max:
|
||||
if flow_7d > matcher.flow_7d_max:
|
||||
logger.debug(f" ✗ Flow too high: {flow_7d:,} > {matcher.flow_7d_max:,}")
|
||||
return False
|
||||
logger.debug(f" ✓ Flow max OK: {flow_7d:,} <= {matcher.flow_7d_max:,}")
|
||||
|
||||
logger.debug(f" ✓ All criteria passed")
|
||||
return True
|
||||
|
||||
def calculate_fees(self, channel_data: Dict[str, Any]) -> Tuple[int, int, int, int]:
|
||||
|
||||
@@ -191,8 +191,17 @@ class PolicyManager:
|
||||
rule.applied_count += 1
|
||||
rule.last_applied = datetime.utcnow()
|
||||
|
||||
logger.info(f"Policy applied to {channel_id}: {[r.name for r in matching_rules]} "
|
||||
f"→ {outbound_fee}ppm outbound, {inbound_fee}ppm inbound")
|
||||
# Enhanced logging with detailed channel and policy information
|
||||
peer_alias = enriched_channel.get('peer', {}).get('alias', 'Unknown')
|
||||
capacity_btc = capacity / 100_000_000
|
||||
logger.info(
|
||||
f"Policy applied to {channel_id} [{peer_alias}]:\n"
|
||||
f" Capacity: {capacity_btc:.3f} BTC ({capacity:,} sats)\n"
|
||||
f" Balance: {local_balance:,} / {remote_balance:,} (ratio: {balance_ratio:.2%})\n"
|
||||
f" Policies: {[r.name for r in matching_rules]}\n"
|
||||
f" Fee Change: {current_outbound} → {outbound_fee}ppm outbound, {current_inbound} → {inbound_fee}ppm inbound\n"
|
||||
f" Base Fees: {outbound_base}msat outbound, {inbound_base}msat inbound"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"Error processing channel {channel_id}: {e}"
|
||||
@@ -206,9 +215,22 @@ class PolicyManager:
|
||||
# Generate performance summary
|
||||
results['performance_summary'] = self.policy_engine.get_policy_performance_report()
|
||||
|
||||
logger.info(f"Policy application complete: {results['fee_changes']} changes, "
|
||||
f"{results['policies_applied']} policies applied, "
|
||||
f"{len(results['errors'])} errors")
|
||||
# Enhanced summary logging
|
||||
logger.info(
|
||||
f"Policy Application Summary:\n"
|
||||
f" Channels Processed: {results.get('channels_processed', 0)}\n"
|
||||
f" Fee Changes Applied: {results['fee_changes']}\n"
|
||||
f" Policies Applied: {results['policies_applied']}\n"
|
||||
f" Errors: {len(results['errors'])}\n"
|
||||
f" Session ID: {results.get('session_id', 'N/A')}"
|
||||
)
|
||||
|
||||
if results['errors']:
|
||||
logger.warning(f"Errors encountered during policy application:")
|
||||
for i, error in enumerate(results['errors'][:5], 1): # Show first 5 errors
|
||||
logger.warning(f" {i}. {error}")
|
||||
if len(results['errors']) > 5:
|
||||
logger.warning(f" ... and {len(results['errors']) - 5} more errors")
|
||||
|
||||
return results
|
||||
|
||||
@@ -220,6 +242,12 @@ class PolicyManager:
|
||||
channel_id = channel_info.get('channelIdCompact')
|
||||
capacity = int(channel_info.get('capacity', 0)) if channel_info.get('capacity') else 0
|
||||
|
||||
logger.debug(f"Processing channel {channel_id}:")
|
||||
logger.debug(f" Raw capacity: {channel_info.get('capacity')}")
|
||||
logger.debug(f" Raw balance info: {channel_info.get('balance', {})}")
|
||||
logger.debug(f" Raw policies: {channel_info.get('policies', {})}")
|
||||
logger.debug(f" Raw peer info: {channel_info.get('peer', {})}")
|
||||
|
||||
# Get balance info
|
||||
balance_info = channel_info.get('balance', {})
|
||||
local_balance = int(balance_info.get('localBalanceSat', 0)) if balance_info.get('localBalanceSat') else 0
|
||||
@@ -321,12 +349,26 @@ class PolicyManager:
|
||||
time_lock_delta=80
|
||||
)
|
||||
|
||||
logger.info(f"Applied fees via {client_type} to {channel_id}: "
|
||||
f"{outbound_fee}ppm outbound, {inbound_fee}ppm inbound")
|
||||
logger.info(
|
||||
f"Successfully applied fees via {client_type} to {channel_id}:\n"
|
||||
f" Channel Point: {chan_point}\n"
|
||||
f" Outbound: {outbound_fee}ppm (base: {outbound_base}msat)\n"
|
||||
f" Inbound: {inbound_fee}ppm (base: {inbound_base}msat)\n"
|
||||
f" Time Lock Delta: 80"
|
||||
)
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to apply fees to {channel_id} via {client_type}: {e}")
|
||||
logger.error(
|
||||
f"Failed to apply fees to {channel_id} via {client_type}:\n"
|
||||
f" Error: {str(e)}\n"
|
||||
f" Channel Point: {chan_point}\n"
|
||||
f" Attempted Parameters:\n"
|
||||
f" Outbound: {outbound_fee}ppm (base: {outbound_base}msat)\n"
|
||||
f" Inbound: {inbound_fee}ppm (base: {inbound_base}msat)\n"
|
||||
f" Time Lock Delta: 80\n"
|
||||
f" Exception Type: {type(e).__name__}"
|
||||
)
|
||||
return False
|
||||
|
||||
async def check_rollback_conditions(self) -> Dict[str, Any]:
|
||||
|
||||
Reference in New Issue
Block a user