# Streamlining Cross-Chain Queries with Hypermanager


By [Defi, Data, Degen](https://paragraph.com/@evandekim) · 2024-11-01

hypersync, envio, python, polars, on-chain, query, hypermanager

---

Introduction
------------

In today's blockchain ecosystem, querying transaction data across multiple chains has become an essential task for developers seeking to analyze trends, monitor transactions, and build cross-chain applications. Hypermanager enters as a solution to the challenges inherent in cross-chain querying. Built on [Hypersync](https://github.com/enviodev/hypersync-client-python)—a client that operates atop Envio's [Hyperindex](https://docs.envio.dev/docs/HyperIndex/overview)—[Hypermanager](https://github.com/Evan-Kim2028/hypermanager-py/tree/master) enables seamless, efficient orchestration of cross-chain log queries across [EVM networks](https://docs.envio.dev/docs/HyperSync/hypersync-supported-networks).

Hypersync provides many powerful features, including querying transactions, blocks, logs, and traces and writing results directly to Polars DataFrames, decoding logs, and saving data as parquet files. However, re-using Hypersync queries across different applications can be tedious. Developers often need to repeat the same boilerplate code to fetch blocks, logs, and transactions, leading to redundancy and inefficiency.

Hypermanager addresses these challenges by serving as a query orchestration tool that enables reusable queries across various applications and multiple chains. By centralizing configurations and reducing boilerplate code, Hypermanager streamlines the process of cross-chain data analysis, offering developers a more efficient and scalable solution.

### Ease of Use

Because queries and their configuration are defined once and then reused, Hypermanager cuts down on repetitive coding across applications and chains, saving developers time and effort.

Developers new to Hypermanager will find the learning curve manageable, especially those already familiar with blockchain queries. The tool abstracts away the complexities of the Hypersync client, allowing users to focus on their data analysis tasks rather than worrying about low-level implementation details.

Hypermanager's intuitive API, comprehensive documentation, and practical examples accelerate the onboarding process for new users. The significant reduction in boilerplate code not only enhances efficiency but also lowers the entry barrier for developers venturing into blockchain data analysis.

The Power of Hypermanager: Efficiency in Action
-----------------------------------------------

The following example illustrates the contrast between using raw Hypersync code and Hypermanager to query blocks and transactions. Without Hypermanager, developers must continuously redefine transaction and block columns within each application, leading to verbose and redundant code.

    import asyncio
    import hypersync

    from hypersync import BlockField, ColumnMapping, DataType, TransactionField, TransactionSelection
    
    
    async def historical_blocks_txs_sync():
        client = hypersync.HypersyncClient(
            hypersync.ClientConfig(url='https://eth.hypersync.xyz')
        )
    
        to_block: int = 20035000
        from_block: int = 20000000
    
        # add +/-1 to the block range because the query is exclusive to the block number
        query = hypersync.Query(
            from_block=from_block-1,
            to_block=to_block+1,
            include_all_blocks=True,
            transactions=[TransactionSelection()],
            field_selection=hypersync.FieldSelection(
                block=[e.value for e in BlockField],
                transaction=[e.value for e in TransactionField],
            )
        )
        config = hypersync.StreamConfig(
            hex_output=hypersync.HexOutput.PREFIXED,
            column_mapping=ColumnMapping(
                transaction={
                    TransactionField.GAS_USED: DataType.FLOAT64,
                    TransactionField.MAX_FEE_PER_BLOB_GAS: DataType.FLOAT64,
                    TransactionField.MAX_PRIORITY_FEE_PER_GAS: DataType.FLOAT64,
                    TransactionField.GAS_PRICE: DataType.FLOAT64,
                    TransactionField.CUMULATIVE_GAS_USED: DataType.FLOAT64,
                    TransactionField.EFFECTIVE_GAS_PRICE: DataType.FLOAT64,
                    TransactionField.NONCE: DataType.INT64,
                    TransactionField.GAS: DataType.FLOAT64,
                    TransactionField.MAX_FEE_PER_GAS: DataType.FLOAT64,
                    TransactionField.VALUE: DataType.FLOAT64,
                },
                block={
                    BlockField.GAS_LIMIT: DataType.FLOAT64,
                    BlockField.GAS_USED: DataType.FLOAT64,
                    BlockField.SIZE: DataType.FLOAT64,
                    BlockField.BLOB_GAS_USED: DataType.FLOAT64,
                    BlockField.EXCESS_BLOB_GAS: DataType.FLOAT64,
                    BlockField.BASE_FEE_PER_GAS: DataType.FLOAT64,
                    BlockField.TIMESTAMP: DataType.INT64,
                }
            )
        )
    	
        # save data into parquet files in "data" folder
        return await client.collect_parquet("data", query, config)


    if __name__ == "__main__":
        asyncio.run(historical_blocks_txs_sync())


**Efficiency Gains with Hypermanager:**

By using Hypermanager, the same query can be completed in about 10 lines of code, reducing boilerplate by 6x and greatly improving readability:

    import asyncio
    import polars as pl
    
    from hypermanager.manager import HyperManager
    
    hypersync_client: str = "https://eth.hypersync.xyz"
    
    async def get_events():
        manager = HyperManager(url=hypersync_client)
    
        df: pl.DataFrame = await manager.get_blocks(
            from_block=20000000, to_block=20035000, save_data=True
        )
    
    if __name__ == "__main__":
        asyncio.run(get_events())

With Hypermanager, developers no longer need to manually define columns or repeatedly initialize data maps. The tool handles all standard configurations, so users can spend their time on data analysis and pipeline customization rather than on setup.
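
One way to picture what "handling standard configurations" means is a shared table of default column types that every query reuses instead of redeclaring. The sketch below is illustrative only and uses plain Python. `DEFAULT_TX_TYPES`, `DEFAULT_BLOCK_TYPES`, and `build_column_mapping` are hypothetical names, not part of Hypermanager or Hypersync, and the string type labels only loosely mirror the `DataType` values in the raw Hypersync example.

```python
# Hypothetical defaults, declared once and shared by every query.
DEFAULT_TX_TYPES = {
    "gas_used": "float64",
    "gas_price": "float64",
    "nonce": "int64",
    "value": "float64",
}
DEFAULT_BLOCK_TYPES = {
    "gas_limit": "float64",
    "gas_used": "float64",
    "timestamp": "int64",
}


def build_column_mapping(tx_overrides=None, block_overrides=None):
    """Return a fresh mapping: shared defaults plus optional per-query overrides."""
    return {
        "transaction": {**DEFAULT_TX_TYPES, **(tx_overrides or {})},
        "block": {**DEFAULT_BLOCK_TYPES, **(block_overrides or {})},
    }


# Most callers take the defaults; one caller changes a single column type.
default_mapping = build_column_mapping()
custom_mapping = build_column_mapping(tx_overrides={"value": "string"})
```

Because each call builds new dictionaries, an override in one application never leaks into the shared defaults used by another.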

Streamlined Event Decoding
--------------------------

When using Hypersync directly, developers must define and decode each event signature individually. Below is an example where each event signature is defined in Hypersync:

    import hypersync
    import asyncio
    
    
    # Define the event signatures to query
    event_1 = (
        "V3FundsDeposited(address inputToken,address outputToken,uint256 inputAmount,"
        "uint256 outputAmount,uint256 indexed destinationChainId,uint32 indexed depositId,"
        "uint32 quoteTimestamp,uint32 fillDeadline,uint32 exclusivityDeadline,"
        "address indexed depositor,address recipient,address exclusiveRelayer,bytes message)"
    )
    event_2 = (
        "RequestedSpeedUpV3Deposit(uint256 updatedOutputAmount,uint32 indexed depositId,"
        "address indexed depositor,address updatedRecipient,bytes updatedMessage,"
        "bytes depositorSignature)"
    )
    event_3 = (
        "FilledV3Relay(address inputToken,address outputToken,uint256 inputAmount,"
        "uint256 outputAmount,uint256 repaymentChainId,uint256 indexed originChainId,"
        "uint32 indexed depositId,uint32 fillDeadline,uint32 exclusivityDeadline,"
        "address exclusiveRelayer,address indexed relayer,address depositor,"
        "address recipient,bytes message,V3RelayExecutionEventInfo relayExecutionInfo)"
    )
    event_4 = (
        "RequestedV3SlowFill(address inputToken,address outputToken,uint256 inputAmount,"
        "uint256 outputAmount,uint256 indexed originChainId,uint32 indexed depositId,"
        "uint32 fillDeadline,uint32 exclusivityDeadline,address exclusiveRelayer,"
        "address depositor,address recipient,bytes message)"
    )
    
    # List of events
    events_set = [event_1, event_2, event_3, event_4]
    
    # Hypersync client configuration
    hypersync_client_url = "https://base.hypersync.xyz"
    contract = "0x09aea4b2242abC8bb4BB78D537A67a245A7bEC64"
    
    
    async def main():
        # Create Hypersync client using the provided base URL
        client = hypersync.HypersyncClient(hypersync.ClientConfig(url=hypersync_client_url))
    
        # Lowercase the smart contract address
        smart_contract = contract.lower()
    
        # Iterate over the list of events
        for event in events_set:
            try:
                # Convert event signature to topic0 (hash of the event signature)
                topic = hypersync.signature_to_topic0(event)
    
                # Prepare the query to fetch logs for the event
                query = hypersync.preset_query_logs_of_event(
                    smart_contract, topic, from_block=19_500_000
                )
    
                print("Running the query for event...")
    
                # Run the query and fetch the logs
                res = await client.get(query)
    
                # Check if logs were returned
                if len(res.data.logs) == 0:
                    print("No logs found for event")
                    continue
    
                # Process the logs (print the log count)
                print(f"Query returned {len(res.data.logs)} logs for event")
    
            except Exception as e:
                print(f"Error querying event, error: {e}")
                continue  # Continue to the next event
    
    
    # Run the asynchronous main function
    asyncio.run(main())
    

Each event also needs explicit data types for its decoded fields, in addition to the transaction and block mappings shown earlier. A column mapping fragment for one event looks something like this:

    decoded_log={
        "inputAmount": DataType.FLOAT64,
        "outputAmount": DataType.FLOAT64,
        "quoteTimestamp": DataType.INT64,
        "fillDeadline": DataType.UINT64,
        "exclusivityDeadline": DataType.INT64,
        "destinationChainId": DataType.UINT64,
        "depositId": DataType.UINT64,
    },

This setup requires manual event signature definitions and individual data type configurations for each event, increasing development time and complexity.
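
For context on what a "topic0" is: in the EVM, it is the Keccak-256 hash of the event's canonical signature, meaning the event name followed by its parameter types only, with names and `indexed` markers removed. The sketch below shows just that canonicalization step in plain Python. `canonical_signature` is a hypothetical helper written for this post, not a Hypersync function, and it assumes a flat parameter list with no nested tuples. Hashing is left to the library, since Python's `hashlib.sha3_256` implements standardized SHA-3 and produces different digests than Ethereum's Keccak-256.

```python
def canonical_signature(human_readable: str) -> str:
    """Strip parameter names and `indexed` markers from a flat event signature."""
    name, _, rest = human_readable.partition("(")
    params = rest.rsplit(")", 1)[0]
    # The first token of each parameter is its Solidity type.
    types = [p.split()[0] for p in params.split(",") if p.strip()]
    return f"{name.strip()}({','.join(types)})"


event = (
    "RequestedSpeedUpV3Deposit(uint256 updatedOutputAmount,uint32 indexed depositId,"
    "address indexed depositor,address updatedRecipient,bytes updatedMessage,"
    "bytes depositorSignature)"
)
print(canonical_signature(event))
# RequestedSpeedUpV3Deposit(uint256,uint32,address,address,bytes,bytes)
```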

**Hypermanager’s Simplified Decoding Process:**

Hypermanager centralizes event decoding, enabling developers to query events across multiple chains without needing to redefine them. This simplifies cross-chain queries, as the configuration only needs to be written once. The following Hypermanager example demonstrates querying multiple events with pre-loaded decoded values:

    import asyncio
    import polars as pl
    from hypermanager.manager import HyperManager
    from hypermanager.protocols.across import (
        client_config,
        across_config,
    )
    
    async def get_events():
        # Iterate through the different HyperSync clients and their associated SpokePool addresses
        for client, spoke_pool_address in client_config.items():
            print(f"Querying events for {client.name}...")
            print(f"SpokePool Address: {spoke_pool_address.value}")
    
            # Iterate over the event configurations in the across_config dictionary
            for base_event_config in across_config.values():
                try:
                    manager = HyperManager(url=client.client)
    
                    # Query events using the event configuration and return the result as a Polars DataFrame
                    df: pl.DataFrame = await manager.execute_event_query(
                        base_event_config,
                        tx_data=True,
                        block_range=10_000,  # query the most recent 10,000 blocks from each chain
                    )
    
                    # Check if any events were found
                    if df.is_empty():
                        print(
                            f"No events found for {base_event_config.name} on {client.name}, continuing..."
                        )
                        continue  # Move to the next event if none found
    
    
                    print(f"Events found for {base_event_config.name} on {client.name}:")
                    print(df.shape)  
    
    
                except Exception as e:
                    print(f"Error querying {base_event_config.name} on {client.name}: {e}")
    
    
    if __name__ == "__main__":
        asyncio.run(get_events())
    

With Hypermanager, events, transactions, and blocks can all be queried with decoded values already [pre-loaded](https://github.com/Evan-Kim2028/hypermanager-py/blob/master/src/hypermanager/protocols/across.py). This eliminates the need for repetitive event definitions and custom data type decoding across applications.
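
The `block_range=10_000` argument in the example above asks for the most recent 10,000 blocks on each chain. How Hypermanager turns that into concrete bounds is not shown in this post. A minimal sketch of the arithmetic, assuming inclusive bounds and a known chain height, could look like the following. `recent_block_window` and the chain heights are hypothetical and exist only for illustration.

```python
def recent_block_window(chain_height: int, block_range: int) -> tuple[int, int]:
    """Return inclusive (from_block, to_block) covering the latest `block_range` blocks."""
    if block_range <= 0:
        raise ValueError("block_range must be positive")
    # Clamp at genesis so a young chain does not produce a negative start block.
    from_block = max(0, chain_height - block_range + 1)
    return from_block, chain_height


# Hypothetical chain heights, for illustration only.
heights = {"chain_a": 20_035_000, "chain_b": 4_200}
windows = {name: recent_block_window(h, 10_000) for name, h in heights.items()}
```

Expressing the window relative to each chain's own height is what lets one setting work across chains whose block numbers differ by orders of magnitude.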

### Centralized Configuration and Reusability

One of Hypermanager’s core strengths is its centralized [data decoding setup](https://github.com/Evan-Kim2028/hypermanager-py/tree/master/src/hypermanager/protocols). Once configured, the event decoding information is reusable across queries, so Hypermanager not only reduces setup time but also makes cross-chain querying more efficient and scalable.
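
The idea of "configure once, reuse everywhere" can be sketched as a small registry that is paired with a list of chains. This is an illustration in plain Python, not Hypermanager's actual data model: `EventSpec`, `EVENTS`, `CHAINS`, and `plan_queries` are hypothetical names, and the column types for `updatedOutputAmount` are an assumption. The event signatures and endpoint URLs are the ones used earlier in this post.

```python
from dataclasses import dataclass, field
from itertools import product


@dataclass
class EventSpec:
    """Hypothetical stand-in for a centrally defined event configuration."""
    name: str
    signature: str
    column_types: dict = field(default_factory=dict)


# Written once: which events matter and how their decoded fields are typed.
EVENTS = {
    "RequestedSpeedUpV3Deposit": EventSpec(
        name="RequestedSpeedUpV3Deposit",
        signature=(
            "RequestedSpeedUpV3Deposit(uint256 updatedOutputAmount,uint32 indexed depositId,"
            "address indexed depositor,address updatedRecipient,bytes updatedMessage,"
            "bytes depositorSignature)"
        ),
        column_types={"updatedOutputAmount": "float64", "depositId": "uint64"},
    ),
    "RequestedV3SlowFill": EventSpec(
        name="RequestedV3SlowFill",
        signature=(
            "RequestedV3SlowFill(address inputToken,address outputToken,uint256 inputAmount,"
            "uint256 outputAmount,uint256 indexed originChainId,uint32 indexed depositId,"
            "uint32 fillDeadline,uint32 exclusivityDeadline,address exclusiveRelayer,"
            "address depositor,address recipient,bytes message)"
        ),
        column_types={"inputAmount": "float64", "outputAmount": "float64", "depositId": "uint64"},
    ),
}

# Also written once: the endpoint for each chain.
CHAINS = {
    "base": "https://base.hypersync.xyz",
    "ethereum": "https://eth.hypersync.xyz",
}


def plan_queries(chains, events):
    """Pair every chain with every event so nothing is redefined per chain."""
    return [
        {"chain": chain, "url": url, "event": spec.name, "signature": spec.signature}
        for (chain, url), spec in product(chains.items(), events.values())
    ]


plans = plan_queries(CHAINS, EVENTS)
for plan in plans:
    print(plan["chain"], plan["event"])
```

Adding a chain or an event is then a one-line change to a dictionary, and every application that imports the registry picks it up.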

In summary, Hypermanager’s orchestration abilities streamline the complexity of cross-chain log queries by centralizing configurations, reducing boilerplate, and enabling developers to quickly reuse queries across applications. With Hypermanager, the barrier to efficient cross-chain analysis is lowered, making it a vital tool for anyone working with blockchain data.

Hypermanager's centralized log repository is a collaborative effort, and developers are encouraged to contribute new logs to expand its coverage and utility. By submitting pull requests with new log configurations, developers can help grow the repository, benefiting the entire community.

### Hypermanager + DuckDB Example

To demonstrate the practicality and ease of integrating Hypermanager into a real-world data pipeline, I developed a backend MVP focused on key log events from the [mev-commit chain](https://docs.primev.xyz/get-started/welcome-to-primev). This proof of concept showcases the seamless integration of Hypermanager, Polars, and DuckDB, highlighting the efficiency and ergonomics of building a data pipeline for querying and storing sparse events from the mev-commit chain.

The data pipeline architecture consists of the following components:

1.  **Hypermanager**
    
2.  **Polars**
    
3.  **DuckDB**
    

By leveraging Hypermanager, the data pipeline benefits from simplified cross-chain querying and reduced boilerplate code. Hypermanager's centralized event decoding allows for seamless retrieval of log events from the mev-commit chain without the need for repetitive event definitions.

The retrieved log data is then processed using Polars, taking advantage of its high-performance DataFrame operations. Polars' integration with DuckDB enables efficient storage and querying of the processed data within the embedded OLAP database. To streamline the deployment and testing process, I dockerized the data pipeline using Docker Compose. This allows for easy setup and execution of the pipeline with a single command: `docker-compose up --build db`.

---

*Originally published on [Defi, Data, Degen](https://paragraph.com/@evandekim/streamlining-cross-chain-queries-with-hypermanager)*
