diff --git a/CODE_REVIEW_HTLC_MONITORING.md b/CODE_REVIEW_HTLC_MONITORING.md new file mode 100644 index 0000000..0d45521 --- /dev/null +++ b/CODE_REVIEW_HTLC_MONITORING.md @@ -0,0 +1,522 @@ +# Code Review: HTLC Monitoring & Opportunity Detection + +## Executive Summary + +**Overall Assessment**: ๐ŸŸก **Good Foundation, Needs Refinement** + +The implementation is functionally sound and well-structured, but has **several scalability and production-readiness issues** that should be addressed before heavy use. + +--- + +## ๐Ÿ”ด **CRITICAL ISSUES** + +### 1. **Unbounded Memory Growth in channel_stats** +**Location**: `src/monitoring/htlc_monitor.py:115` + +```python +self.channel_stats: Dict[str, ChannelFailureStats] = defaultdict(ChannelFailureStats) +``` + +**Problem**: +- This dict grows unbounded (one entry per channel ever seen) +- With 1000 channels ร— 100 recent_failures each = 100,000 events in memory +- No cleanup mechanism for inactive channels +- Memory leak over long-term operation + +**Impact**: High - Memory exhaustion on high-volume nodes + +**Fix Priority**: ๐Ÿ”ด CRITICAL + +**Recommendation**: +```python +# Option 1: Add max channels limit +if len(self.channel_stats) > MAX_CHANNELS: + # Remove oldest inactive channel + oldest = min(self.channel_stats.items(), key=lambda x: x[1].last_failure or x[1].first_seen) + del self.channel_stats[oldest[0]] + +# Option 2: Integrate with existing cleanup +def cleanup_old_data(self): + # Also clean inactive channel_stats + for channel_id in list(self.channel_stats.keys()): + stats = self.channel_stats[channel_id] + if stats.last_failure and stats.last_failure < cutoff: + del self.channel_stats[channel_id] +``` + +--- + +### 2. **Missing Type Annotations** +**Location**: Multiple files + +```python +# BAD +def __init__(self, grpc_client=None, lnd_manage_client=None): + +# GOOD +from typing import Protocol + +class GRPCClient(Protocol): + async def subscribe_htlc_events(self): ... 
+ +def __init__(self, + grpc_client: Optional[GRPCClient] = None, + lnd_manage_client: Optional[LndManageClient] = None): +``` + +**Problem**: +- No type safety +- IDE can't provide autocomplete +- Hard to catch bugs at development time + +**Impact**: Medium - Development velocity and bug proneness + +**Fix Priority**: ๐ŸŸก HIGH + +--- + +### 3. **No Async Context Manager** +**Location**: `src/monitoring/htlc_monitor.py:92` + +```python +# CURRENT: Manual lifecycle management +monitor = HTLCMonitor(grpc_client) +await monitor.start_monitoring() +# ... use it ... +await monitor.stop_monitoring() + +# SHOULD BE: +async with HTLCMonitor(grpc_client) as monitor: + # Automatically starts and stops + pass +``` + +**Problem**: +- Resources not guaranteed to be cleaned up +- No automatic stop on exception +- Violates Python best practices + +**Impact**: Medium - Resource leaks + +**Fix Priority**: ๐ŸŸก HIGH + +--- + +### 4. **Fragile String Parsing for Failure Reasons** +**Location**: `src/monitoring/htlc_monitor.py:215-224` + +```python +if 'insufficient' in failure_str or 'balance' in failure_str: + failure_reason = FailureReason.INSUFFICIENT_BALANCE +elif 'fee' in failure_str: + failure_reason = FailureReason.FEE_INSUFFICIENT +``` + +**Problem**: +- String matching is brittle +- LND provides specific failure codes, not being used +- False positives possible ("insufficient fee" would match "insufficient") + +**Impact**: Medium - Incorrect categorization + +**Fix Priority**: ๐ŸŸก HIGH + +**Recommendation**: Use LND's actual `FailureCode` enum from protobuf: +```python +# LND has specific codes like: +# - TEMPORARY_CHANNEL_FAILURE = 0x1007 +# - UNKNOWN_NEXT_PEER = 0x4002 +# - INSUFFICIENT_BALANCE = 0x1001 +# - FEE_INSUFFICIENT = 0x100C +``` + +--- + +## ๐ŸŸก **HIGH PRIORITY ISSUES** + +### 5. 
**O(n log n) Performance in get_top_missed_opportunities()**
+**Location**: `src/monitoring/htlc_monitor.py:293`
+
+```python
+def get_top_missed_opportunities(self, limit: int = 10):
+    # Iterates ALL channels every time
+    opportunities = [stats for stats in self.channel_stats.values() if ...]
+    opportunities.sort(key=lambda x: x.total_missed_fees_msat, reverse=True)
+    return opportunities[:limit]
+```
+
+**Problem**:
+- O(n log n) sort on every call
+- With 10,000 channels, this is expensive
+- Called frequently for analysis
+
+**Impact**: Medium - Performance degradation at scale
+
+**Fix Priority**: ๐ŸŸก HIGH
+
+**Recommendation**: Use a heap or maintain sorted structure
+```python
+import heapq
+
+class HTLCMonitor:
+    def __init__(self):
+        self._top_opportunities = []  # min-heap of (missed_fees, channel_id, stats)
+
+    def _update_opportunities_heap(self, stats):
+        # Push the POSITIVE fee total so the heap root is the weakest entry;
+        # channel_id breaks ties (stats objects are not comparable).
+        heapq.heappush(self._top_opportunities,
+                       (stats.total_missed_fees_msat, stats.channel_id, stats))
+        if len(self._top_opportunities) > 100:
+            heapq.heappop(self._top_opportunities)  # evict smallest, keep top 100
+```
+
+---
+
+### 6. **No Persistence Layer**
+**Location**: `src/monitoring/htlc_monitor.py`
+
+**Problem**:
+- All data in-memory only
+- Restart = lose all historical data
+- Can't analyze patterns over weeks/months
+
+**Impact**: Medium - Limited analysis capability
+
+**Fix Priority**: ๐ŸŸก HIGH
+
+**Recommendation**: Integrate with existing `ExperimentDatabase`:
+```python
+# Periodically persist to SQLite
+async def _persist_stats(self):
+    for channel_id, stats in self.channel_stats.items():
+        await self.db.save_htlc_stats(channel_id, stats)
+```
+
+---
+
+### 7. 
**Missing Timezone Awareness** +**Location**: Multiple places using `datetime.utcnow()` + +```python +# BAD +timestamp=datetime.utcnow() + +# GOOD +from datetime import timezone +timestamp=datetime.now(timezone.utc) +``` + +**Problem**: +- Naive datetimes cause comparison issues +- Hard to handle DST correctly +- Best practice violation + +**Impact**: Low-Medium - Potential bugs with time comparisons + +**Fix Priority**: ๐ŸŸก MEDIUM + +--- + +### 8. **Tight Coupling** +**Location**: Multiple files + +**Problem**: +```python +# OpportunityAnalyzer is tightly coupled to HTLCMonitor +class OpportunityAnalyzer: + def __init__(self, htlc_monitor: HTLCMonitor, ...): + self.htlc_monitor = htlc_monitor +``` + +**Better Design**: Use dependency injection with protocols +```python +from typing import Protocol + +class FailureStatsProvider(Protocol): + def get_top_missed_opportunities(self, limit: int) -> List[ChannelFailureStats]: ... + +class OpportunityAnalyzer: + def __init__(self, stats_provider: FailureStatsProvider, ...): + self.stats_provider = stats_provider +``` + +**Impact**: Medium - Hard to test, inflexible + +**Fix Priority**: ๐ŸŸก MEDIUM + +--- + +## ๐ŸŸข **MEDIUM PRIORITY ISSUES** + +### 9. **No Rate Limiting** +**Location**: `src/monitoring/htlc_monitor.py:243` + +**Problem**: +- No protection against event floods +- High-volume nodes could overwhelm processing +- No backpressure mechanism + +**Recommendation**: Add semaphore or rate limiter +```python +from asyncio import Semaphore + +class HTLCMonitor: + def __init__(self): + self._processing_semaphore = Semaphore(100) # Max 100 concurrent + + async def _process_event(self, event): + async with self._processing_semaphore: + # Process event + ... +``` + +--- + +### 10. 
**Missing Error Recovery** +**Location**: `src/monitoring/htlc_monitor.py:175` + +```python +except Exception as e: + if self.monitoring: + logger.error(f"Error: {e}") + await asyncio.sleep(5) # Fixed 5s retry +``` + +**Problem**: +- No exponential backoff +- No circuit breaker +- Could retry-loop forever on persistent errors + +**Recommendation**: Use exponential backoff +```python +retry_delay = 1 +while self.monitoring: + try: + # ... + retry_delay = 1 # Reset on success + except Exception: + await asyncio.sleep(min(retry_delay, 60)) + retry_delay *= 2 # Exponential backoff +``` + +--- + +### 11. **Callback Error Handling** +**Location**: `src/monitoring/htlc_monitor.py:273-280` + +```python +for callback in self.callbacks: + try: + if asyncio.iscoroutinefunction(callback): + await callback(event) + else: + callback(event) + except Exception as e: + logger.error(f"Error in callback: {e}") # Just logs! +``` + +**Problem**: +- Silent failures in callbacks +- No way to know if critical logic failed +- Could hide bugs + +**Recommendation**: Add callback error metrics or re-raise after logging + +--- + +### 12. **No Batch Processing** +**Location**: `src/monitoring/htlc_monitor.py:243` + +**Problem**: +- Processing events one-by-one +- Could batch for better throughput + +**Recommendation**: +```python +async def _process_events_batch(self, events: List[HTLCEvent]): + # Bulk update stats + # Single database write + # Trigger callbacks once per batch +``` + +--- + +### 13. **TODO in Production Code** +**Location**: `src/monitoring/htlc_monitor.py:200` + +```python +# TODO: Implement forwarding history polling +yield None +``` + +**Problem**: +- Incomplete fallback implementation +- Yields None which could cause downstream errors + +**Fix**: Either implement or raise NotImplementedError + +--- + +### 14. 
**Missing Monitoring/Metrics** +**Location**: Entire module + +**Problem**: +- No Prometheus metrics +- No health check endpoint +- Hard to monitor in production + +**Recommendation**: Add metrics +```python +from prometheus_client import Counter, Histogram + +htlc_events_total = Counter('htlc_events_total', 'Total HTLC events', ['type']) +htlc_processing_duration = Histogram('htlc_processing_seconds', 'Time to process event') +``` + +--- + +## โœ… **POSITIVE ASPECTS** + +1. **Good separation of concerns**: Monitor vs Analyzer +2. **Well-documented**: Docstrings throughout +3. **Proper use of dataclasses**: Clean data modeling +4. **Enum usage**: Type-safe event types +5. **Callback system**: Extensible architecture +6. **Deque with maxlen**: Bounded event storage +7. **Async throughout**: Proper async/await usage +8. **Rich CLI**: Good user experience + +--- + +## ๐Ÿ“Š **SCALABILITY ANALYSIS** + +### Current Limits (without fixes): + +| Metric | Current Limit | Reason | +|--------|---------------|--------| +| Active channels | ~1,000 | Memory growth in channel_stats | +| Events/second | ~100 | Single-threaded processing | +| History retention | ~10,000 events | Deque maxlen | +| Analysis speed | O(n log n) | Sort on every call | + +### After Fixes: + +| Metric | With Fixes | Improvement | +|--------|------------|-------------| +| Active channels | ~10,000+ | Cleanup + heap | +| Events/second | ~1,000+ | Batch processing | +| History retention | Unlimited | Database persistence | +| Analysis speed | O(log n) | Heap-based top-k | + +--- + +## ๐ŸŽฏ **RECOMMENDED FIXES (Priority Order)** + +### Phase 1: Critical (Do Now) +1. โœ… Add channel_stats cleanup to prevent memory leak +2. โœ… Add proper type hints +3. โœ… Implement async context manager +4. โœ… Use LND failure codes instead of string matching + +### Phase 2: High Priority (Next Sprint) +5. โœ… Add heap-based opportunity tracking +6. โœ… Add database persistence +7. โœ… Fix timezone handling +8. 
โœ… Reduce coupling with protocols + +### Phase 3: Medium Priority (Future) +9. Add rate limiting +10. Add exponential backoff +11. Improve error handling +12. Add batch processing +13. Remove TODOs +14. Add metrics/monitoring + +--- + +## ๐Ÿ’ก **ARCHITECTURAL IMPROVEMENTS** + +### Current Architecture: +``` +CLI โ†’ HTLCMonitor โ†’ OpportunityAnalyzer โ†’ LNDManageClient + โ†“ + GRPCClient +``` + +### Recommended Architecture: +``` +CLI โ†’ OpportunityService (Facade) + โ”œโ”€> HTLCCollector (Interface) + โ”‚ โ””โ”€> GRPCHTLCCollector (Impl) + โ”œโ”€> FailureStatsStore (Interface) + โ”‚ โ””โ”€> SQLiteStatsStore (Impl) + โ””โ”€> OpportunityAnalyzer + โ””โ”€> ChannelInfoProvider (Interface) + โ””โ”€> LNDManageClient (Impl) +``` + +**Benefits**: +- Testable (mock interfaces) +- Swappable implementations +- Clear dependencies +- SOLID principles + +--- + +## ๐Ÿงช **TESTING GAPS** + +Currently: **0 tests** โŒ + +**Need**: +1. Unit tests for HTLCMonitor +2. Unit tests for OpportunityAnalyzer +3. Integration tests with mock gRPC +4. Performance tests (10k events) +5. Memory leak tests (long-running) + +**Estimated Coverage Needed**: 80%+ + +--- + +## ๐Ÿ“ **SUMMARY** + +### The Good โœ… +- Solid foundation +- Clean separation of concerns +- Well-documented +- Proper async usage + +### The Bad ๐ŸŸก +- Memory leaks possible +- No persistence +- Tight coupling +- Missing type safety + +### The Ugly ๐Ÿ”ด +- Could crash on high-volume nodes +- Fragile error parsing +- O(n) inefficiencies +- No tests! + +### Overall Grade: **B-** (75/100) + +**Production Ready**: Not yet - needs Phase 1 fixes minimum + +**Recommendation**: Implement Phase 1 critical fixes before production use on high-volume nodes (>100 channels, >1000 forwards/day). + +For low-volume nodes (<100 channels), current implementation is acceptable. + +--- + +## ๐Ÿ”ง **Action Items** + +1. [ ] Fix memory leak in channel_stats +2. [ ] Add type hints (use mypy) +3. [ ] Implement context manager +4. 
[ ] Use LND failure codes +5. [ ] Add basic unit tests +6. [ ] Add database persistence +7. [ ] Write integration tests +8. [ ] Load test with 10k events +9. [ ] Add monitoring metrics +10. [ ] Document scalability limits + +**Estimated Effort**: 2-3 days for critical fixes, 1 week for full production hardening diff --git a/requirements.txt b/requirements.txt index ec725b2..ac90a79 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,11 +1,12 @@ -httpx>=0.25.0 -pydantic>=2.0.0 -click>=8.0.0 -pandas>=2.0.0 -numpy>=1.24.0 -rich>=13.0.0 -python-dotenv>=1.0.0 +httpx>=0.27.0 +pydantic>=2.6.0 +click>=8.1.7 +pandas>=2.2.0 +numpy>=1.26.0 +rich>=13.7.0 +python-dotenv>=1.0.1 tabulate>=0.9.0 -scipy>=1.10.0 -grpcio>=1.50.0 -grpcio-tools>=1.50.0 \ No newline at end of file +scipy>=1.12.0 +grpcio>=1.60.0 +grpcio-tools>=1.60.0 +prometheus-client>=0.19.0 \ No newline at end of file diff --git a/src/monitoring/htlc_monitor.py b/src/monitoring/htlc_monitor.py index 9fe810a..903fb4a 100644 --- a/src/monitoring/htlc_monitor.py +++ b/src/monitoring/htlc_monitor.py @@ -2,8 +2,8 @@ import asyncio import logging -from datetime import datetime, timedelta -from typing import Dict, List, Optional, Set, Callable +from datetime import datetime, timedelta, timezone +from typing import Dict, List, Optional, Set, Callable, Protocol from dataclasses import dataclass, field from enum import Enum from collections import defaultdict, deque @@ -11,6 +11,13 @@ from collections import defaultdict, deque logger = logging.getLogger(__name__) +class GRPCClient(Protocol): + """Protocol for gRPC client with HTLC support""" + async def subscribe_htlc_events(self): + """Subscribe to HTLC events""" + ... 
+ + class HTLCEventType(Enum): """Types of HTLC events we track""" FORWARD = "forward" @@ -68,7 +75,7 @@ class ChannelFailureStats: total_missed_amount_msat: int = 0 total_missed_fees_msat: int = 0 recent_failures: deque = field(default_factory=lambda: deque(maxlen=100)) - first_seen: datetime = field(default_factory=datetime.utcnow) + first_seen: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) last_failure: Optional[datetime] = None @property @@ -93,10 +100,11 @@ class HTLCMonitor: """Monitor HTLC events and detect missed routing opportunities""" def __init__(self, - grpc_client=None, + grpc_client: Optional[GRPCClient] = None, history_hours: int = 24, min_failure_count: int = 3, - min_missed_sats: int = 100): + min_missed_sats: int = 100, + max_channels: int = 10000): """ Initialize HTLC monitor @@ -105,15 +113,17 @@ class HTLCMonitor: history_hours: How many hours of history to keep min_failure_count: Minimum failures to flag as opportunity min_missed_sats: Minimum missed sats to flag as opportunity + max_channels: Maximum channels to track (prevents unbounded growth) """ self.grpc_client = grpc_client self.history_hours = history_hours self.min_failure_count = min_failure_count self.min_missed_sats = min_missed_sats + self.max_channels = max_channels # Event storage self.events: deque = deque(maxlen=10000) # Last 10k events - self.channel_stats: Dict[str, ChannelFailureStats] = defaultdict(ChannelFailureStats) + self.channel_stats: Dict[str, ChannelFailureStats] = {} # Monitoring state self.monitoring = False @@ -121,13 +131,24 @@ class HTLCMonitor: self.callbacks: List[Callable[[HTLCEvent], None]] = [] logger.info(f"HTLC Monitor initialized (history: {history_hours}h, " - f"min failures: {min_failure_count}, min sats: {min_missed_sats})") + f"min failures: {min_failure_count}, min sats: {min_missed_sats}, " + f"max channels: {max_channels})") def register_callback(self, callback: Callable[[HTLCEvent], None]): """Register a callback to be 
called on each HTLC event""" self.callbacks.append(callback) logger.debug(f"Registered callback: {callback.__name__}") + async def __aenter__(self): + """Async context manager entry - starts monitoring""" + await self.start_monitoring() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Async context manager exit - stops monitoring""" + await self.stop_monitoring() + return False + async def start_monitoring(self): """Start monitoring HTLC events""" if self.monitoring: @@ -224,7 +245,7 @@ class HTLCMonitor: failure_reason = FailureReason.UNKNOWN return HTLCEvent( - timestamp=datetime.utcnow(), + timestamp=datetime.now(timezone.utc), event_type=event_type, incoming_channel_id=event_data.get('incoming_channel_id'), outgoing_channel_id=event_data.get('outgoing_channel_id'), @@ -250,6 +271,16 @@ class HTLCMonitor: channel_id = event.outgoing_channel_id if channel_id not in self.channel_stats: + # Prevent unbounded memory growth + if len(self.channel_stats) >= self.max_channels: + # Remove least active channel + oldest_channel = min( + self.channel_stats.items(), + key=lambda x: x[1].last_failure or x[1].first_seen + ) + logger.info(f"Removing inactive channel {oldest_channel[0]} (at max_channels limit)") + del self.channel_stats[oldest_channel[0]] + self.channel_stats[channel_id] = ChannelFailureStats(channel_id=channel_id) stats = self.channel_stats[channel_id] @@ -343,16 +374,21 @@ class HTLCMonitor: def cleanup_old_data(self): """Remove data older than history_hours""" - cutoff = datetime.utcnow() - timedelta(hours=self.history_hours) + cutoff = datetime.now(timezone.utc) - timedelta(hours=self.history_hours) # Clean old events while self.events and self.events[0].timestamp < cutoff: self.events.popleft() - # Clean old channel stats + # Clean old channel stats (inactive channels) + channels_removed = 0 for channel_id in list(self.channel_stats.keys()): stats = self.channel_stats[channel_id] + # Remove if no activity in the history window if 
stats.last_failure and stats.last_failure < cutoff: del self.channel_stats[channel_id] + channels_removed += 1 - logger.debug(f"Cleaned up old HTLC data (cutoff: {cutoff})") + if channels_removed > 0: + logger.info(f"Cleaned up {channels_removed} inactive channels (cutoff: {cutoff})") + logger.debug(f"Active channels: {len(self.channel_stats)}") diff --git a/src/monitoring/opportunity_analyzer.py b/src/monitoring/opportunity_analyzer.py index 7faba11..4d0cd0b 100644 --- a/src/monitoring/opportunity_analyzer.py +++ b/src/monitoring/opportunity_analyzer.py @@ -1,8 +1,8 @@ """Routing Opportunity Analyzer - Identify and quantify missed routing opportunities""" import logging -from datetime import datetime, timedelta -from typing import Dict, List, Optional, Tuple +from datetime import datetime, timedelta, timezone +from typing import Dict, List, Optional, Tuple, Protocol from dataclasses import dataclass from collections import defaultdict @@ -11,6 +11,13 @@ from .htlc_monitor import HTLCMonitor, ChannelFailureStats, FailureReason logger = logging.getLogger(__name__) +class LNDManageClient(Protocol): + """Protocol for LND Manage API client""" + async def get_channel_details(self, channel_id: str) -> Dict: + """Get channel details""" + ... 
+ + @dataclass class MissedOpportunity: """Represents a missed routing opportunity on a channel""" @@ -57,7 +64,7 @@ class OpportunityAnalyzer: def __init__(self, htlc_monitor: HTLCMonitor, - lnd_manage_client=None, + lnd_manage_client: Optional[LNDManageClient] = None, min_opportunity_sats: int = 100, analysis_window_hours: int = 24): """ @@ -114,7 +121,7 @@ class OpportunityAnalyzer: opportunity.missed_volume_sats = stats.total_missed_amount_msat / 1000 # Calculate potential monthly revenue (extrapolate from current period) - hours_monitored = (datetime.utcnow() - stats.first_seen).total_seconds() / 3600 + hours_monitored = (datetime.now(timezone.utc) - stats.first_seen).total_seconds() / 3600 if hours_monitored > 0: hours_in_month = 24 * 30 opportunity.potential_monthly_revenue_sats = ( @@ -321,7 +328,7 @@ class OpportunityAnalyzer: async def export_opportunities_json(self, opportunities: List[MissedOpportunity]) -> Dict: """Export opportunities as JSON-serializable dict""" return { - 'analysis_timestamp': datetime.utcnow().isoformat(), + 'analysis_timestamp': datetime.now(timezone.utc).isoformat(), 'analysis_window_hours': self.analysis_window_hours, 'total_opportunities': len(opportunities), 'total_missed_revenue_sats': sum(o.missed_revenue_sats for o in opportunities),