Aptos Indexer

Aptos Indexer

由于move的特性,数据是存储在用户地址下。比如

//alice拥有100DAI,bob有300DAI
alice 0xaa..ee => 0x1::coin::CoinStore<0x999..8::coins::DAI> {value: 100}
bob   0xbb..bb => 0x1::coin::CoinStore<0x999..8::coins::DAI> {value: 300}
//这个关系的意思是,链上建的索引是,你可以根据alice和bob的地址的去查询到地址下的100DAI和300DAI

如果想要统计DAI的Holder或者是Transfer,就不那么方便。

Aptos提供了Indexer的服务,在启动aptos node的时候可以配置indexer服务。

indexer服务的作用类似于ethereum里的The Graph,根据链上的行为来存储额外的表数据,比如存储在postgres里。

indexer服务默认会收集Coin/Token的信息,你也可以根据文档来收集自定义数据。

indexer服务提供的查询数据库比如postgres里的数据的方式跟The Graph一致,通过graphql来查询。

//query example

Coin表结构

列举几个indexer服务默认会收集的coin表结构。

#[diesel(primary_key(
    transaction_version,
    event_account_address,
    event_creation_number,
    event_sequence_number
))]
#[diesel(table_name = coin_activities)]
pub struct CoinActivity {  //存储发生的所有有coin变动的记录   --------- 表coin_activities
    pub transaction_version: i64,
    pub event_account_address: String,
    pub event_creation_number: i64,
    pub event_sequence_number: i64,
    pub owner_address: String, //owner address == event_account_address
    pub coin_type: String, //coin的类型
    pub amount: BigDecimal,
    pub activity_type: String, //withdraw || deposit || gasfee
    pub is_gas_fee: bool,     
    pub is_transaction_success: bool,
    pub entry_function_id_str: Option<String>,
    pub block_height: i64,
    pub transaction_timestamp: chrono::NaiveDateTime,
}


#[diesel(primary_key(owner_address, coin_type))]
#[diesel(table_name = current_coin_balances)]
pub struct CurrentCoinBalance { //存储当前状态 coin balance  --------- 表current_coin_balances
    pub owner_address: String,
    pub coin_type_hash: String,
    pub coin_type: String,
    pub amount: BigDecimal,
    pub last_transaction_version: i64,
    pub last_transaction_timestamp: chrono::NaiveDateTime,
}

#[diesel(primary_key(coin_type_hash))]
#[diesel(table_name = coin_infos)]
pub struct CoinInfo {  //存储coin基本信息 --------- 表coin_infos
    pub coin_type_hash: String,
    pub coin_type: String,
    pub transaction_version_created: i64,
    pub creator_address: String,
    pub name: String,
    pub symbol: String,
    pub decimals: i32,
    pub transaction_created_timestamp: chrono::NaiveDateTime,
}

//token表结构,就不一一粘贴出来占空间了,大概讲一下有哪几个表就行 -- 在crates/indexer/src/models下方有具体的定义
//1. ans -- 域名存储 存储域名子域名
//2. collection_data 集合信息
//3. token_activity 跟coin activity类似,就是存储token 转移的所有记录
//4. token_ownership ,
//..等

捋捋代码

自定义processor的方式在aptos的文档上有提到了。

主要是捋捋indexer的实现以及processor里可以得到些什么信息。

indexer的设计

post image

在aptos-core/indexer/src/indexer下,是indexer的实现,在runtime.rs入口,启动indexer服务。

aptos上的version,表示的是tx count all,所以每一个version都对应到一笔tx,在节点存储结构的设计中,是可以通过version来查询到tx。

这个思路大概讲几句就行

  • db 不仅需要存储业务数据,还需要存储处理过的进度,aptos上这里用的是indexer,ethereum上可能使用block number来标识。程序启动时,会根据处理过的进度已经配置设置来决定start version

  • 主要逻辑分成2部分, fetcher和processor。

  • fetcher负责从db中不断读出已commit的txs,通过channel传递给processor

  • processor 处理txs,筛选/构造数据,存储在自定义的postgres中

process_transactions

看一下coin processor的实现。

主要分两步

  • 构建数据

  • 存储数据 -- 这一块主要就是sql操作,就省略不看了。

来看看processor入口函数的定义,也就是上图中白色块部分

//入参有,一批tx.   
async fn process_transactions(
        &self,
        transactions: Vec<APITransaction>,
        start_version: u64,
        end_version: u64,
    ) -> Result<ProcessingResult, TransactionProcessingError> 

我们主要也是通过这一批Vec<APITransaction>来构造数据。

下面的代码来自models/coin_models/coin_activities.rs。通过下面的逻辑可以构造出想要的数据

//1. APITransaction类型,可以用Match只处理想要的交易类型
 pub enum Transaction {
    PendingTransaction(PendingTransaction),
    UserTransaction(Box<UserTransaction>),
    GenesisTransaction(GenesisTransaction),
    BlockMetadataTransaction(BlockMetadataTransaction),
    StateCheckpointTransaction(StateCheckpointTransaction),
}


//2. 可以从从APITransaction解析出, -- 到这里已经是交易被添加到区块后的时候了,receipt/event都已经产生了
//a.inner.info - 相当于receipt+writesets(changes),
//b. writesets - Account下Module/Resource的删除/写 集合
//c.产生的events
//d. inner.request 用户提交的原始tx 的信息
let (txn_info, writesets, events, maybe_user_request, txn_timestamp) = match &transaction {
     APITransaction::GenesisTransaction(inner) => (
                &inner.info,
                &inner.info.changes,
                &inner.events,
                None,
                chrono::NaiveDateTime::from_timestamp(0, 0),
            ),
      APITransaction::UserTransaction(inner) => (
                &inner.info,
                &inner.info.changes,
                &inner.events,
                Some(&inner.request),
                parse_timestamp(inner.timestamp.0, inner.info.version.0 as i64),
            ),
            _ => return Default::default(),
};


//3. 拿到以上信息后,我们可以
//a. 如果有 inner.request, 也就是这笔交易是由用户发起,会产生gas fee,可以记录gas fee    
 if let Some(user_request) = maybe_user_request {
            entry_function_id_str = match &user_request.payload {
                TransactionPayload::EntryFunctionPayload(payload) => Some(truncate_str(
                    &payload.function.to_string(),
                    MAX_ENTRY_FUNCTION_LENGTH,
                )),
                _ => None,
            };
           //记录是gas fee的APT coin activities
            coin_activities.push(Self::get_gas_event(
                txn_info,
                user_request,
                &entry_function_id_str,
                txn_timestamp,
            ));
}

//b. 可以从writesets里,判断如果是WriteResource的话 && write 的resource类型是 CoinInfo或者是CoinStore的话,可以拿到对应的CoinInfo或者CoinStore的信息,(比如CoinInfo的话,就可以记录name/decimal等信息,CoinStore的话就可以记录这个人当前的balance之类的

for wsc in writesets {
            let (maybe_coin_info, maybe_coin_balance_data) =
                if let APIWriteSetChange::WriteResource(write_resource) = wsc {
                    (
                       //在from_write_resource的实现里,可以通过
                      //format!("{}::{}::{}",write_resource.data.typ.address,write_resource.data.typ.module,
           // write_resource.data.typ.name) matches! "0x1::coin::CoinInfo" | "0x1::coin::CoinStore"
                      //来判断当前这个write_resource是不是想要的resource(根据地址::module::type) 
                      //然后就可以将resource下的数据解析出来成对象CoinInfo/CoinStore
                        CoinInfo::from_write_resource(write_resource, txn_version, txn_timestamp)
                            .unwrap(),
                        CoinBalance::from_write_resource(
                            write_resource,
                            txn_version,
                            txn_timestamp,
                        )
                        .unwrap(),
                    )
                } else {
                    (None, None)
                };
}


//c.前面我们用到了writesets(Resource), 我们也可以使用event来记录Deposit/Withdraw事件
//同样是根据event_type来筛选我们想要的event,event_type == 0x1::coin::WithdrawEvent ||0x1::coin::DepositEvent
//记录产生Deposit|Withdraw事件的coin activities
 for event in events {
            let event_type = event.typ.to_string();
            match CoinEvent::from_event(event_type.as_str(), &event.data, txn_version).unwrap() {
                Some(parsed_event) => coin_activities.push(Self::from_parsed_event(
                    &event_type,
                    event,
                    &parsed_event,
                    txn_version,
                    &all_event_to_coin_type,
                    txn_info.block_height.unwrap().0 as i64,
                    &entry_function_id_str,
                    txn_timestamp,
                )),
                None => {}
            };
}

最后将数据存储在db即可。