diff --git a/pyproject.toml b/pyproject.toml index 694fa08..39efb4d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,6 +13,10 @@ dependencies = [ "numpy>=1.24.0", "rich>=13.0.0", "python-dotenv>=1.0.0", + "tabulate>=0.9.0", + "scipy>=1.10.0", + "grpcio>=1.50.0", + "grpcio-tools>=1.50.0", ] [project.scripts] diff --git a/requirements.txt b/requirements.txt index d9eec01..ec725b2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,4 +6,6 @@ numpy>=1.24.0 rich>=13.0.0 python-dotenv>=1.0.0 tabulate>=0.9.0 -scipy>=1.10.0 \ No newline at end of file +scipy>=1.10.0 +grpcio>=1.50.0 +grpcio-tools>=1.50.0 \ No newline at end of file diff --git a/src/experiment/lnd_grpc_client.py b/src/experiment/lnd_grpc_client.py index 9a74361..635d983 100644 --- a/src/experiment/lnd_grpc_client.py +++ b/src/experiment/lnd_grpc_client.py @@ -434,11 +434,32 @@ class AsyncLNDgRPCClient: return await loop.run_in_executor(None, self.sync_client.list_channels) async def update_channel_policy(self, *args, **kwargs): - """Async version of update_channel_policy""" - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, self.sync_client.update_channel_policy, *args, **kwargs + """Async version of update_channel_policy with enhanced logging""" + logger.debug( + f"gRPC update_channel_policy called with:\n" + f" Args: {args}\n" + f" Kwargs: {kwargs}" ) + + try: + loop = asyncio.get_event_loop() + # Fix: Use lambda to properly pass kwargs to run_in_executor + result = await loop.run_in_executor( + None, lambda: self.sync_client.update_channel_policy(*args, **kwargs) + ) + + logger.debug(f"gRPC update_channel_policy succeeded: {result}") + return result + + except Exception as e: + logger.error( + f"gRPC update_channel_policy failed:\n" + f" Error: {str(e)}\n" + f" Exception Type: {type(e).__name__}\n" + f" Args: {args}\n" + f" Kwargs: {kwargs}" + ) + raise async def __aenter__(self): return self diff --git a/src/policy/engine.py b/src/policy/engine.py index 84a11ce..4027bbf 100644 --- a/src/policy/engine.py +++ b/src/policy/engine.py @@ -304,58 +304,99 @@ class PolicyEngine: def match_channel(self, channel_data: Dict[str, Any]) -> List[PolicyRule]: """Find matching policies for a channel""" matching_rules = [] + channel_id = channel_data.get('channel_id', 'unknown') + + logger.debug(f"Evaluating policies for channel {channel_id}:") + logger.debug(f" Channel Data: capacity={channel_data.get('capacity', 0):,}, " + f"balance_ratio={channel_data.get('local_balance_ratio', 0.5):.2%}, " + f"activity={channel_data.get('activity_level', 'unknown')}") for rule in self.rules: if not rule.enabled: + logger.debug(f" Skipping disabled policy: {rule.name}") continue if self._channel_matches(channel_data, rule.matcher): matching_rules.append(rule) + logger.debug(f" ✓ MATCHED policy: {rule.name} (priority {rule.priority})") # Stop if this is a final policy if rule.policy.policy_type == PolicyType.FINAL: + logger.debug(f" Stopping at final policy: {rule.name}") break + else: + logger.debug(f" ✗ SKIPPED policy: {rule.name}") + logger.debug(f" Final matches for {channel_id}: {[r.name for r in matching_rules]}") return matching_rules def _channel_matches(self, channel_data: Dict[str, Any], matcher: PolicyMatcher) -> bool: - """Check if channel matches policy criteria""" + """Check if channel matches policy criteria with detailed debug logging""" # Channel ID matching - if matcher.chan_id and channel_data.get('channel_id') not in matcher.chan_id: - return False + if matcher.chan_id: + channel_id = channel_data.get('channel_id', '') + if channel_id not in matcher.chan_id: + logger.debug(f" ✗ Channel ID mismatch: {channel_id} not in {matcher.chan_id}") + return False + logger.debug(f" ✓ Channel ID matches: {channel_id}") # Capacity matching capacity = channel_data.get('capacity', 0) - if matcher.chan_capacity_min and capacity < matcher.chan_capacity_min: - return False - if matcher.chan_capacity_max and capacity > matcher.chan_capacity_max: - return False + if matcher.chan_capacity_min: + if capacity < matcher.chan_capacity_min: + logger.debug(f" ✗ Capacity too small: {capacity:,} < {matcher.chan_capacity_min:,}") + return False + logger.debug(f" ✓ Capacity min OK: {capacity:,} >= {matcher.chan_capacity_min:,}") + if matcher.chan_capacity_max: + if capacity > matcher.chan_capacity_max: + logger.debug(f" ✗ Capacity too large: {capacity:,} > {matcher.chan_capacity_max:,}") + return False + logger.debug(f" ✓ Capacity max OK: {capacity:,} <= {matcher.chan_capacity_max:,}") # Balance ratio matching balance_ratio = channel_data.get('local_balance_ratio', 0.5) - if matcher.chan_balance_ratio_min and balance_ratio < matcher.chan_balance_ratio_min: - return False - if matcher.chan_balance_ratio_max and balance_ratio > matcher.chan_balance_ratio_max: - return False + if matcher.chan_balance_ratio_min: + if balance_ratio < matcher.chan_balance_ratio_min: + logger.debug(f" ✗ Balance ratio too low: {balance_ratio:.2%} < {matcher.chan_balance_ratio_min:.2%}") + return False + logger.debug(f" ✓ Balance ratio min OK: {balance_ratio:.2%} >= {matcher.chan_balance_ratio_min:.2%}") + if matcher.chan_balance_ratio_max: + if balance_ratio > matcher.chan_balance_ratio_max: + logger.debug(f" ✗ Balance ratio too high: {balance_ratio:.2%} > {matcher.chan_balance_ratio_max:.2%}") + return False + logger.debug(f" ✓ Balance ratio max OK: {balance_ratio:.2%} <= {matcher.chan_balance_ratio_max:.2%}") # Node ID matching - peer_id = channel_data.get('peer_pubkey', '') - if matcher.node_id and peer_id not in matcher.node_id: - return False + if matcher.node_id: + peer_id = channel_data.get('peer_pubkey', '') + if peer_id not in matcher.node_id: + logger.debug(f" ✗ Peer ID mismatch: {peer_id[:16]}... not in target list") + return False + logger.debug(f" ✓ Peer ID matches: {peer_id[:16]}...") # Activity level matching - activity = channel_data.get('activity_level', 'inactive') - if matcher.activity_level and activity not in matcher.activity_level: - return False + if matcher.activity_level: + activity = channel_data.get('activity_level', 'inactive') + if activity not in matcher.activity_level: + logger.debug(f" ✗ Activity level mismatch: '{activity}' not in {matcher.activity_level}") + return False + logger.debug(f" ✓ Activity level matches: '{activity}' in {matcher.activity_level}") # Flow matching flow_7d = channel_data.get('flow_7d', 0) - if matcher.flow_7d_min and flow_7d < matcher.flow_7d_min: - return False - if matcher.flow_7d_max and flow_7d > matcher.flow_7d_max: - return False + if matcher.flow_7d_min: + if flow_7d < matcher.flow_7d_min: + logger.debug(f" ✗ Flow too low: {flow_7d:,} < {matcher.flow_7d_min:,}") + return False + logger.debug(f" ✓ Flow min OK: {flow_7d:,} >= {matcher.flow_7d_min:,}") + if matcher.flow_7d_max: + if flow_7d > matcher.flow_7d_max: + logger.debug(f" ✗ Flow too high: {flow_7d:,} > {matcher.flow_7d_max:,}") + return False + logger.debug(f" ✓ Flow max OK: {flow_7d:,} <= {matcher.flow_7d_max:,}") + logger.debug(f" ✓ All criteria passed") return True def calculate_fees(self, channel_data: Dict[str, Any]) -> Tuple[int, int, int, int]: diff --git a/src/policy/manager.py b/src/policy/manager.py index 2114c19..8c697c0 100644 --- a/src/policy/manager.py +++ b/src/policy/manager.py @@ -191,8 +191,17 @@ class PolicyManager: rule.applied_count += 1 rule.last_applied = datetime.utcnow() - logger.info(f"Policy applied to {channel_id}: {[r.name for r in matching_rules]} " - f"→ {outbound_fee}ppm outbound, {inbound_fee}ppm inbound") + # Enhanced logging with detailed channel and policy information + peer_alias = enriched_channel.get('peer', {}).get('alias', 'Unknown') + capacity_btc = capacity / 100_000_000 + logger.info( + f"Policy applied to {channel_id} [{peer_alias}]:\n" + f" Capacity: {capacity_btc:.3f} BTC ({capacity:,} sats)\n" + f" Balance: {local_balance:,} / {remote_balance:,} (ratio: {balance_ratio:.2%})\n" + f" Policies: {[r.name for r in matching_rules]}\n" + f" Fee Change: {current_outbound} → {outbound_fee}ppm outbound, {current_inbound} → {inbound_fee}ppm inbound\n" + f" Base Fees: {outbound_base}msat outbound, {inbound_base}msat inbound" + ) except Exception as e: error_msg = f"Error processing channel {channel_id}: {e}" @@ -206,9 +215,22 @@ class PolicyManager: # Generate performance summary results['performance_summary'] = self.policy_engine.get_policy_performance_report() - logger.info(f"Policy application complete: {results['fee_changes']} changes, " - f"{results['policies_applied']} policies applied, " - f"{len(results['errors'])} errors") + # Enhanced summary logging + logger.info( + f"Policy Application Summary:\n" + f" Channels Processed: {results.get('channels_processed', 0)}\n" + f" Fee Changes Applied: {results['fee_changes']}\n" + f" Policies Applied: {results['policies_applied']}\n" + f" Errors: {len(results['errors'])}\n" + f" Session ID: {results.get('session_id', 'N/A')}" + ) + + if results['errors']: + logger.warning(f"Errors encountered during policy application:") + for i, error in enumerate(results['errors'][:5], 1): # Show first 5 errors + logger.warning(f" {i}. {error}") + if len(results['errors']) > 5: + logger.warning(f" ... and {len(results['errors']) - 5} more errors") return results @@ -220,6 +242,12 @@ class PolicyManager: channel_id = channel_info.get('channelIdCompact') capacity = int(channel_info.get('capacity', 0)) if channel_info.get('capacity') else 0 + logger.debug(f"Processing channel {channel_id}:") + logger.debug(f" Raw capacity: {channel_info.get('capacity')}") + logger.debug(f" Raw balance info: {channel_info.get('balance', {})}") + logger.debug(f" Raw policies: {channel_info.get('policies', {})}") + logger.debug(f" Raw peer info: {channel_info.get('peer', {})}") + # Get balance info balance_info = channel_info.get('balance', {}) local_balance = int(balance_info.get('localBalanceSat', 0)) if balance_info.get('localBalanceSat') else 0 @@ -321,12 +349,26 @@ class PolicyManager: time_lock_delta=80 ) - logger.info(f"Applied fees via {client_type} to {channel_id}: " - f"{outbound_fee}ppm outbound, {inbound_fee}ppm inbound") + logger.info( + f"Successfully applied fees via {client_type} to {channel_id}:\n" + f" Channel Point: {chan_point}\n" + f" Outbound: {outbound_fee}ppm (base: {outbound_base}msat)\n" + f" Inbound: {inbound_fee}ppm (base: {inbound_base}msat)\n" + f" Time Lock Delta: 80" + ) return True except Exception as e: - logger.error(f"Failed to apply fees to {channel_id} via {client_type}: {e}") + logger.error( + f"Failed to apply fees to {channel_id} via {client_type}:\n" + f" Error: {str(e)}\n" + f" Channel Point: {chan_point}\n" + f" Attempted Parameters:\n" + f" Outbound: {outbound_fee}ppm (base: {outbound_base}msat)\n" + f" Inbound: {inbound_fee}ppm (base: {inbound_base}msat)\n" + f" Time Lock Delta: 80\n" + f" Exception Type: {type(e).__name__}" + ) return False async def check_rollback_conditions(self) -> Dict[str, Any]: