如何实现一个支持监听 txpool 的以太坊客户端

官方的 go-ethereum 库通过 SubscribePendingTransactions 监听待打包(pending)交易时,只能得到交易的 hash 值。为了监听 txpool 中完整的交易内容,需要做以下改动:

参考下面这个 go-ethereum 的 pr

https://github.com/ethereum/go-ethereum/pull/25186

合并该 pr 的代码后,自己 build 出 docker 镜像

然后编写 docker-compose 文件

version: "3"

services:
  geth:
    # Locally built image that contains the merged PR. It is not published to a
    # registry, so Compose must use the local image instead of pulling.
    image: go-ethereum
    pull_policy: never
    container_name: gethnode
    restart: unless-stopped
    ports:
      # P2P (TCP and UDP discovery)
      - "30303:30303"
      - "30303:30303/udp"
      # HTTP-RPC, published on host port 38545
      - "38545:8545"
      # WebSocket-RPC, published on host port 38546
      - "38546:8546"
    volumes:
      - ./data:/root/.ethereum
    # Give geth time to flush its state to disk on shutdown.
    stop_signal: SIGINT
    stop_grace_period: 2m
    # NOTE: the RPC endpoints below listen on all interfaces with wildcard
    # vhosts / CORS / origins and expose the txpool and personal namespaces.
    # Restrict them before making the host reachable from untrusted networks.
    command:
      - --http
      - --http.api
      - "eth,net,web3,txpool,personal"
      - --http.addr=0.0.0.0
      - --http.vhosts=*
      - --http.corsdomain=*
      - --ws
      - --ws.origins=*
      - --ws.addr=0.0.0.0
      - --ws.api
      - "eth,net,web3,txpool,personal"
      - --graphql
      - --graphql.corsdomain=*
      - --graphql.vhosts=*
      # Minimum number of executable transaction slots guaranteed per account (default: 16)
      # - --txpool.accountslots=16
      # Maximum number of non-executable transaction slots for all accounts (default: 1024)
      # - --txpool.globalqueue=1024
      # Maximum number of executable transaction slots for all accounts (default: 4096)
      # - --txpool.globalslots=10000
      # Maximum amount of time non-executable transaction are queued (default: 3h0m0s)
      # - --txpool.lifetime=3h0m0s
      # Megabytes of memory allocated to internal caching (default: 1024)
      # - --cache=4096
      # Maximum number of network peers (network disabled if set to 0) (default: 25)
      # - --maxpeers=100
      # Blockchain sync mode ("snap", "full", or "light")
      # - --syncmode=snap
      # Blockchain garbage collection mode ("full", "archive") (default: "full")
      # - --gcmode=full
    logging:
      driver: "json-file"
      options:
        max-size: "20m"
        max-file: "10"

Go 项目中依赖的 go-ethereum 客户端库也要改为合并该 pr 之后的版本(例如在 go.mod 中用 replace 指向自己合并了该 pr 的分支)

然后通过节点的 WebSocket 地址 ws://ip:38546 连接 eth 客户端,即可使用 SubscribePendingTransactions 方法监听到完整的 tx 信息

// TestTxPool connects to a geth node over WebSocket, subscribes to its
// pending-transaction feed and logs the gas price and hash of every pending
// transaction addressed to the watched contract.
//
// The test listens until the 10-second context expires (raise the timeout to
// observe the pool for longer) and fails if dialing, subscribing, or the
// subscription itself reports an error.
//
// NOTE(review): this relies on the patched go-ethereum dependency, whose
// SubscribePendingTransactions delivers full transactions. Upstream releases
// appear to expose the full-transaction variant under a different name
// (SubscribeFullPendingTransactions); confirm against the version in use.
func TestTxPool(t *testing.T) {
    // Only pending transactions sent to this address are logged.
    const watched = "0x68b3465833fb72A70ecDF485E0e4C7bD8665Fc45"

    ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
    defer cancel()

    // DialContext lets the timeout bound connection establishment as well.
    rpcClient, err := rpc.DialContext(ctx, "ws://xxx:38546")
    if err != nil {
        t.Fatal(err)
    }
    defer rpcClient.Close()

    ec := gethclient.New(rpcClient)

    ch := make(chan *types.Transaction)
    sub, err := ec.SubscribePendingTransactions(ctx, ch)
    if err != nil {
        t.Fatal(err)
    }
    defer sub.Unsubscribe()

    for {
        select {
        case <-ctx.Done():
            // The context only bounds the subscribe request itself, so the
            // loop has to watch it explicitly or it would never terminate.
            return
        case err := <-sub.Err():
            t.Fatal(err)
        case tx := <-ch:
            to := tx.To()
            if to == nil {
                // Contract-creation transactions have no recipient.
                continue
            }
            if strings.EqualFold(to.String(), watched) {
                t.Logf("gas:%d hash: %s", tx.GasPrice(), tx.Hash())
            }
        }
    }
}