# Aptos Indexer

By [zoie](https://paragraph.com/@zoie) · 2022-11-10

---

Aptos Indexer
-------------

由于move的特性，数据是存储在用户地址下。比如

    //alice拥有100DAI,bob有300DAI
    alice 0xaa..ee => 0x1::coin::CoinStore<0x999..8::coins::DAI> {value: 100}
    bob   0xbb..bb => 0x1::coin::CoinStore<0x999..8::coins::DAI> {value: 300}
    //这个关系的意思是，链上建的索引是，你可以根据alice和bob的地址的去查询到地址下的100DAI和300DAI
    

如果想要统计DAI的Holder或者是Transfer，就不那么方便。

Aptos提供了Indexer的服务，在启动aptos node的时候可以配置indexer服务。

[indexer服务](https://aptos.dev/guides/indexing)的作用类似于ethereum里的The Graph，根据链上的行为来存储额外的表数据，比如存储在postgres里。

indexer服务默认会收集Coin/Token的信息，你也可以根据[文档](https://aptos.dev/guides/indexing)来收集自定义数据。

indexer服务提供的查询数据库比如postgres里的数据的方式跟The Graph一致，通过graphql来查询。

> //query example
> 
> *   Click on [Mainnet GraphQL server](https://cloud.hasura.io/public/graphiql?endpoint=https://indexer.mainnet.aptoslabs.com/v1/graphql) or [Testnet GraphQL server](https://cloud.hasura.io/public/graphiql?endpoint=https://indexer-testnet.staging.gcp.aptosdev.com/v1/graphql).
>     

### Coin表结构

列举几个indexer服务默认会收集的coin表结构。

    #[diesel(primary_key(
        transaction_version,
        event_account_address,
        event_creation_number,
        event_sequence_number
    ))]
    #[diesel(table_name = coin_activities)]
    pub struct CoinActivity {  //存储发生的所有有coin变动的记录   --------- 表coin_activities
        pub transaction_version: i64,
        pub event_account_address: String,
        pub event_creation_number: i64,
        pub event_sequence_number: i64,
        pub owner_address: String, //owner address == event_account_address
        pub coin_type: String, //coin的类型
        pub amount: BigDecimal,
        pub activity_type: String, //withdraw || deposit || gasfee
        pub is_gas_fee: bool,     
        pub is_transaction_success: bool,
        pub entry_function_id_str: Option<String>,
        pub block_height: i64,
        pub transaction_timestamp: chrono::NaiveDateTime,
    }
    
    
    #[diesel(primary_key(owner_address, coin_type))]
    #[diesel(table_name = current_coin_balances)]
    pub struct CurrentCoinBalance { //存储当前状态 coin balance  --------- 表current_coin_balances
        pub owner_address: String,
        pub coin_type_hash: String,
        pub coin_type: String,
        pub amount: BigDecimal,
        pub last_transaction_version: i64,
        pub last_transaction_timestamp: chrono::NaiveDateTime,
    }
    
    #[diesel(primary_key(coin_type_hash))]
    #[diesel(table_name = coin_infos)]
    pub struct CoinInfo {  //存储coin基本信息 --------- 表coin_infos
        pub coin_type_hash: String,
        pub coin_type: String,
        pub transaction_version_created: i64,
        pub creator_address: String,
        pub name: String,
        pub symbol: String,
        pub decimals: i32,
        pub transaction_created_timestamp: chrono::NaiveDateTime,
    }
    
    //token表结构，就不一一粘贴出来占空间了，大概讲一下有哪几个表就行 -- 在crates/indexer/src/models下方有具体的定义
    //1. ans -- 域名存储 存储域名子域名
    //2. collection_data 集合信息
    //3. token_activity 跟coin activity类似，就是存储token 转移的所有记录
    //4. token_ownership ，
    //..等
    

### 捋捋代码

自定义processor的方式在aptos的文档上有提到了。

主要是捋捋indexer的实现以及processor里可以得到些什么信息。

#### indexer的设计

![](https://storage.googleapis.com/papyrus_images/94191fcd6070e56153858766e272f9218ea8cd6d164b408978bcfa97d8a25154.png)

在aptos-core/indexer/src/indexer下，是indexer的实现，在runtime.rs入口，启动indexer服务。

aptos上的version，表示的是tx count all，所以每一个version都对应到一笔tx，在节点存储结构的设计中，是可以通过version来查询到tx。

这个思路大概讲几句就行

*   db 不仅需要存储业务数据，还需要存储处理过的进度，aptos上这里用的是indexer，ethereum上可能使用block number来标识。程序启动时，会根据处理过的进度已经配置设置来决定start version
    
*   主要逻辑分成2部分， fetcher和processor。
    
*   fetcher负责从**db**中不断读出已commit的txs，通过channel传递给processor
    
*   processor 处理txs，筛选/构造数据，存储在自定义的postgres中
    

#### process\_transactions

看一下coin processor的实现。

主要分两步

*   构建数据
    
*   存储数据 -- 这一块主要就是sql操作，就省略不看了。
    

来看看processor入口函数的定义，也就是上图中白色块部分

    //入参有，一批tx.   
    async fn process_transactions(
            &self,
            transactions: Vec<APITransaction>,
            start_version: u64,
            end_version: u64,
        ) -> Result<ProcessingResult, TransactionProcessingError> 
    

我们主要也是通过这一批`Vec<APITransaction>`来构造数据。

下面的代码来自[models/coin\_models/coin\_activities.rs](https://github.com/aptos-labs/aptos-core/blob/main/crates/indexer/src/models/coin_models/coin_activities.rs)。通过下面的逻辑可以构造出想要的数据

    //1. APITransaction类型，可以用Match只处理想要的交易类型
     pub enum Transaction {
        PendingTransaction(PendingTransaction),
        UserTransaction(Box<UserTransaction>),
        GenesisTransaction(GenesisTransaction),
        BlockMetadataTransaction(BlockMetadataTransaction),
        StateCheckpointTransaction(StateCheckpointTransaction),
    }
    
    
    //2. 可以从从APITransaction解析出， -- 到这里已经是交易被添加到区块后的时候了，receipt/event都已经产生了
    //a.inner.info - 相当于receipt+writesets(changes),
    //b. writesets - Account下Module/Resource的删除/写 集合
    //c.产生的events
    //d. inner.request 用户提交的原始tx 的信息
    let (txn_info, writesets, events, maybe_user_request, txn_timestamp) = match &transaction {
         APITransaction::GenesisTransaction(inner) => (
                    &inner.info,
                    &inner.info.changes,
                    &inner.events,
                    None,
                    chrono::NaiveDateTime::from_timestamp(0, 0),
                ),
          APITransaction::UserTransaction(inner) => (
                    &inner.info,
                    &inner.info.changes,
                    &inner.events,
                    Some(&inner.request),
                    parse_timestamp(inner.timestamp.0, inner.info.version.0 as i64),
                ),
                _ => return Default::default(),
    };
    
    
    //3. 拿到以上信息后，我们可以
    //a. 如果有 inner.request， 也就是这笔交易是由用户发起，会产生gas fee，可以记录gas fee    
     if let Some(user_request) = maybe_user_request {
                entry_function_id_str = match &user_request.payload {
                    TransactionPayload::EntryFunctionPayload(payload) => Some(truncate_str(
                        &payload.function.to_string(),
                        MAX_ENTRY_FUNCTION_LENGTH,
                    )),
                    _ => None,
                };
               //记录是gas fee的APT coin activities
                coin_activities.push(Self::get_gas_event(
                    txn_info,
                    user_request,
                    &entry_function_id_str,
                    txn_timestamp,
                ));
    }
    
    //b. 可以从writesets里，判断如果是WriteResource的话 && write 的resource类型是 CoinInfo或者是CoinStore的话，可以拿到对应的CoinInfo或者CoinStore的信息，（比如CoinInfo的话，就可以记录name/decimal等信息，CoinStore的话就可以记录这个人当前的balance之类的
    
    for wsc in writesets {
                let (maybe_coin_info, maybe_coin_balance_data) =
                    if let APIWriteSetChange::WriteResource(write_resource) = wsc {
                        (
                           //在from_write_resource的实现里，可以通过
                          //format!("{}::{}::{}",write_resource.data.typ.address,write_resource.data.typ.module,
               // write_resource.data.typ.name) matches! "0x1::coin::CoinInfo" | "0x1::coin::CoinStore"
                          //来判断当前这个write_resource是不是想要的resource（根据地址::module::type) 
                          //然后就可以将resource下的数据解析出来成对象CoinInfo/CoinStore
                            CoinInfo::from_write_resource(write_resource, txn_version, txn_timestamp)
                                .unwrap(),
                            CoinBalance::from_write_resource(
                                write_resource,
                                txn_version,
                                txn_timestamp,
                            )
                            .unwrap(),
                        )
                    } else {
                        (None, None)
                    };
    }
    
    
    //c.前面我们用到了writesets（Resource）, 我们也可以使用event来记录Deposit/Withdraw事件
    //同样是根据event_type来筛选我们想要的event，event_type == 0x1::coin::WithdrawEvent ｜｜0x1::coin::DepositEvent
    //记录产生Deposit|Withdraw事件的coin activities
     for event in events {
                let event_type = event.typ.to_string();
                match CoinEvent::from_event(event_type.as_str(), &event.data, txn_version).unwrap() {
                    Some(parsed_event) => coin_activities.push(Self::from_parsed_event(
                        &event_type,
                        event,
                        &parsed_event,
                        txn_version,
                        &all_event_to_coin_type,
                        txn_info.block_height.unwrap().0 as i64,
                        &entry_function_id_str,
                        txn_timestamp,
                    )),
                    None => {}
                };
    }
    

最后将数据存储在db即可。

---

*Originally published on [zoie](https://paragraph.com/@zoie/aptos-indexer)*
