Collectors
Overview
Collectors form the bedrock of the solving infrastructure, continuously streaming on-chain data from multiple protocols. The system uses real-time blockchain indexing to capture state changes across DEX protocols, enabling immediate route evaluation and arbitrage detection.
Using our Solution
Our collectors provide solvers and market makers with:
- Real-time Data Streaming: Monitor blockchain state changes across multiple protocols in real-time
- Multi-protocol Support: Index data from Uniswap V2/V3/V4, Balancer, Curve, SushiSwap, and more
- TVL-based Filtering: Focus on pools with significant liquidity to optimize processing
- State Change Triggers: Automatically trigger route evaluations when pool states change
Solution Overview
The collection architecture is built on the Tycho Indexer using Substreams technology, which provides real-time state updates for multiple protocols, filtered by configurable TVL thresholds.
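The TVL filtering can be pictured as a simple predicate over incoming pool states. The sketch below is illustrative only; `PoolState` and its fields are placeholders, not the actual Tycho schema:

```rust
// `PoolState` and its fields are illustrative placeholders, not the Tycho schema.
#[derive(Debug)]
struct PoolState {
    address: String,
    protocol: String,
    tvl_eth: f64,
}

/// Keep only pools whose TVL clears the per-chain threshold, mirroring the
/// TVL-based filtering applied to incoming protocol streams.
fn filter_by_tvl(pools: Vec<PoolState>, min_tvl_eth: f64) -> Vec<PoolState> {
    pools.into_iter().filter(|p| p.tvl_eth >= min_tvl_eth).collect()
}

fn main() {
    let pools = vec![
        PoolState { address: "0xabc".into(), protocol: "uniswap_v3".into(), tvl_eth: 120.0 },
        PoolState { address: "0xdef".into(), protocol: "curve".into(), tvl_eth: 0.4 },
    ];
    // With a 1 ETH floor, only the first pool is kept for route evaluation.
    let kept = filter_by_tvl(pools, 1.0);
    assert_eq!(kept.len(), 1);
    println!("{kept:?}");
}
```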
Key Components
Liquidity Mapping Collector
The liquidity mapping collector integrates with Tycho protocol streams to index each supported chain, monitoring blockchain state changes in real time.
Streaming Engine Collector
The MinimalStreamingEngine serves as the central orchestrator for the entire arbitrage system, coordinating real-time data ingestion, route discovery, profitability evaluation, and transaction execution.
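As a rough picture of what this orchestrator holds, here is an illustrative sketch; the field names and types are assumptions based on the responsibilities described in this section, not the actual MinimalStreamingEngine definition:

```rust
use std::collections::HashMap;

// Placeholder payload types; the real engine's types are richer.
struct PoolState;
struct Route;
struct TokenGraph;

/// Illustrative orchestrator shape; fields are assumptions, not the
/// actual MinimalStreamingEngine definition.
struct MinimalStreamingEngine {
    pool_states: HashMap<String, PoolState>, // latest known state per pool address
    graph: TokenGraph,                       // token graph driving route discovery
    routes: Vec<Route>,                      // in-memory route cache
    pool_index: HashMap<String, Vec<usize>>, // pool address -> indices of routes using it
}
```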
Architecture
The collectors follow a modular design with:
- Data Collection: Real-time streaming from the blockchain via WebSocket connections
- State Processing: Live protocol state updates with incremental changes
- Graph Building: Dynamic graph construction as new pools are discovered (sketched after this list)
- Route Discovery: Automatic route calculation for new trading pairs
- Performance Optimization: Microsecond-level processing with memory caching
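The graph-building step can be sketched as incremental adjacency-list updates. The `CompactEdge` fields below are assumptions loosely based on the value format listed in the Persistence table; the real edge type likely carries more data:

```rust
use std::collections::HashMap;

/// Compact adjacency entry; fields are assumptions inspired by the
/// `Vec<CompactEdge>` value format in the Persistence table below.
#[derive(Clone)]
struct CompactEdge {
    to_token: String, // counterpart token address
    pool: String,     // pool address providing this edge
}

/// Token graph as adjacency lists keyed by token address.
#[derive(Default)]
struct TokenGraph {
    edges: HashMap<String, Vec<CompactEdge>>,
}

impl TokenGraph {
    /// Incrementally add both directions of a newly discovered pool,
    /// so routes can traverse it either way.
    fn add_pool(&mut self, token_a: &str, token_b: &str, pool: &str) {
        self.edges.entry(token_a.to_string()).or_default().push(CompactEdge {
            to_token: token_b.to_string(),
            pool: pool.to_string(),
        });
        self.edges.entry(token_b.to_string()).or_default().push(CompactEdge {
            to_token: token_a.to_string(),
            pool: pool.to_string(),
        });
    }
}
```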
Persistence
The system uses RocksDB as its primary database engine with a column family architecture optimized for high-performance operations:
| Column Family | Purpose | Key Format | Value Format |
|---|---|---|---|
| tokens | Token metadata | `token:<address>` | Serialized `Token` struct |
| graph | Graph edges | `graph:<token_address>` | Serialized `Vec<CompactEdge>` |
| routes | Calculated routes | `route:<route_id>` | Serialized `Route` struct |
| signals | Route signals | `signal:<signal_id>` | Serialized `RouteSignal` struct |
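As a minimal sketch of how these column families might be opened and written with the rust-rocksdb crate (per-CF tuning and the serialization format are out of scope here):

```rust
use rocksdb::{ColumnFamilyDescriptor, DB, Options};

/// Open the store with the four column families from the table above.
fn open_store(path: &str) -> Result<DB, rocksdb::Error> {
    let mut opts = Options::default();
    opts.create_if_missing(true);
    opts.create_missing_column_families(true);

    // One descriptor per column family; per-CF tuning (block cache,
    // compression) is omitted for brevity.
    let cfs = ["tokens", "graph", "routes", "signals"]
        .into_iter()
        .map(|name| ColumnFamilyDescriptor::new(name, Options::default()))
        .collect::<Vec<_>>();

    DB::open_cf_descriptors(&opts, path, cfs)
}

/// Write one token record using the key format from the table.
fn store_token(db: &DB, address: &str, serialized: &[u8]) -> Result<(), rocksdb::Error> {
    let cf = db.cf_handle("tokens").expect("tokens column family exists");
    db.put_cf(cf, format!("token:{address}"), serialized)
}
```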
Performance Optimizations:
- Write Batching: 100 operations per batch with a 100ms flush interval
- Asynchronous Writes: Non-blocking writes via a dedicated writer thread (see the sketch after this list)
- Memory Caching: In-memory route storage with O(1) pool index lookup
- Incremental Updates: Only recalculates affected routes on state changes
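The batching and dedicated-writer behavior can be sketched with a plain channel and a timeout loop, using the batch size and flush interval quoted above. The flush here is a stub; in the real system each flush would presumably become one RocksDB write batch:

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::thread;
use std::time::{Duration, Instant};

// A queued write as (key, value) bytes; the real system targets RocksDB
// column families, but the flush is stubbed so the batching stays in focus.
type WriteOp = (Vec<u8>, Vec<u8>);

const MAX_BATCH: usize = 100;                                // flush after 100 ops...
const FLUSH_INTERVAL: Duration = Duration::from_millis(100); // ...or after 100ms

fn spawn_writer(rx: Receiver<WriteOp>) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        let mut batch: Vec<WriteOp> = Vec::with_capacity(MAX_BATCH);
        let mut deadline = Instant::now() + FLUSH_INTERVAL;
        loop {
            // Wait for the next op, but never past the flush deadline.
            let timeout = deadline.saturating_duration_since(Instant::now());
            match rx.recv_timeout(timeout) {
                Ok(op) => batch.push(op),
                Err(RecvTimeoutError::Timeout) => {}
                Err(RecvTimeoutError::Disconnected) => break,
            }
            if batch.len() >= MAX_BATCH || Instant::now() >= deadline {
                if !batch.is_empty() {
                    flush(&batch); // e.g. one RocksDB WriteBatch per flush
                    batch.clear();
                }
                deadline = Instant::now() + FLUSH_INTERVAL;
            }
        }
        if !batch.is_empty() {
            flush(&batch); // drain anything left on shutdown
        }
    })
}

fn flush(batch: &[WriteOp]) {
    println!("flushing {} ops", batch.len());
}
```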
Technical Reference
Stream Message Processing
```rust
async fn process_stream_message(&mut self, message: StreamMessage) -> Result<()> {
    // Update pool states with the incremental changes in this message
    self.update_pool_states(&message).await?;

    // Check for new trading pairs and extend the graph if any are found
    if self.has_new_pairs(&message) {
        self.process_new_pools(&message).await?;
    }

    // Re-evaluate existing routes if any pool state changed
    if self.has_state_updates(&message) {
        self.re_evaluate_routes().await?;
    }

    Ok(())
}
```
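The stages run sequentially: a message's pool-state updates always land before new-pool discovery and route re-evaluation for that same message, and a failure at any stage short-circuits the rest via `?`.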
State Update Flow
The streaming system processes real-time blockchain data through a five-stage pipeline (the incremental re-evaluation step is sketched after this list):
- State Updates: Pool states updated in real-time
- Graph Updates: Trading graph rebuilt incrementally
- Route Calculation: New routes calculated for affected pools
- Route Evaluation: Routes evaluated for profitability
- Execution: Profitable routes executed automatically
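The route-evaluation step pairs with the "Incremental Updates" optimization above: only routes touching changed pools are re-evaluated, via the pool-to-route index. A sketch under those assumptions, with `Route` and `evaluate` as placeholder stand-ins:

```rust
use std::collections::HashMap;

// Placeholder route type; the real evaluation simulates amounts and gas.
struct Route {
    profitable: bool,
}

/// Re-evaluate only the routes that traverse pools whose state just changed,
/// using the pool -> route-index map for O(1) lookup per pool.
fn re_evaluate_affected(
    changed_pools: &[String],
    pool_index: &HashMap<String, Vec<usize>>,
    routes: &mut [Route],
) {
    for pool in changed_pools {
        if let Some(indices) = pool_index.get(pool) {
            for &i in indices {
                // Stubbed profitability check; the real system re-simulates
                // the route against fresh pool states.
                let is_profitable = evaluate(&routes[i]);
                routes[i].profitable = is_profitable;
            }
        }
    }
}

fn evaluate(_route: &Route) -> bool {
    false
}
```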
Performance Metrics
The collectors achieve high-performance data processing:
- Route Evaluation: Over 1,000 routes per second
- Average Processing Time: 424 microseconds per route
- Memory Usage: ~657 MB for full operation
- Database Operations: High-throughput with batched writes
- Success Rate: 100% execution success in production testing
Configuration
Key configuration parameters for tuning performance (an illustrative example follows this list):
- TVL Filtering: Minimum TVL thresholds (1-50 ETH depending on chain)
- Protocol Selection: 2-7 protocols per chain depending on liquidity
- Batch Processing: Configurable batch sizes for write operations
- Memory Management: Adjustable cache sizes and flush intervals
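An illustrative configuration shape tying these parameters together; the struct name, fields, and defaults are assumptions for the sketch, not the actual config schema:

```rust
use std::time::Duration;

/// Illustrative collector configuration mirroring the parameters above;
/// field names and defaults are assumptions, not the actual schema.
struct CollectorConfig {
    min_tvl_eth: f64,            // per-chain TVL floor, e.g. 1.0-50.0 ETH
    protocols: Vec<String>,      // 2-7 protocols depending on chain liquidity
    write_batch_size: usize,     // operations per RocksDB write batch
    flush_interval: Duration,    // max time a batch may wait before flushing
    route_cache_capacity: usize, // bound on the in-memory route cache
}

impl Default for CollectorConfig {
    fn default() -> Self {
        Self {
            min_tvl_eth: 1.0,
            protocols: vec!["uniswap_v3".into(), "balancer".into()],
            write_batch_size: 100,
            flush_interval: Duration::from_millis(100),
            route_cache_capacity: 10_000,
        }
    }
}
```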