<100 subscribers
Share Dialog
Share Dialog

所以对于大部分脚本任务,协程是最简单也是最高效的方案,虽然还是单线程,但是创建开销小
单线程,事件循环轮询:“等网络?切下一个任务”
脚本中链上余额查询使用异步的示例
import asyncio # → 导入异步框架,用于协程和事件循环
import aiohttp # → 导入异步 HTTP 客户端库(比 requests 快 10 倍)
# 异步函数:查询单个地址的 ETH 余额
async def get_balance(session, addr):
"""
使用 JSON-RPC 调用 Alchemy 节点,查询指定地址的余额
参数:
session: 复用的 aiohttp.ClientSession(避免重复创建连接)
addr: 以太坊地址(字符串)
返回:
(地址, 余额 ETH) 元组
"""
url = "https://eth-mainnet.g.alchemy.com/v2/xxx" # → Alchemy RPC 节点地址(需替换为你的 API Key)
# JSON-RPC 请求体
payload = {
"jsonrpc": "2.0", # → JSON-RPC 协议版本
"method": "eth_getBalance", # → 以太坊方法:获取余额
"params": [addr, "latest"], # → 参数:[地址, 区块标签]
"id": 1 # → 请求 ID,用于匹配响应
}
# 异步发送 POST 请求(关键:async with + await)
async with session.post(url, json=payload) as resp:
# → session.post:异步发起请求
# → async with:自动关闭响应对象
# → resp: 响应对象(类似 requests.Response)
data = await resp.json() # → 异步解析 JSON 响应(不会阻塞)
# 提取余额(16 进制字符串)→ 转为整数 → 除以 1e18 转为 ETH
balance_wei = int(data["result"], 16) # → "0x..." → 十进制 Wei
balance_eth = balance_wei / 1e18 # → 1 ETH = 10^18 Wei
return addr, balance_eth # → 返回 (地址, 余额)
# 主异步函数:并发查询多个地址
async def main():
"""
1. 创建一个 HTTP 会话(连接池)
2. 创建多个查询任务
3. 并发执行所有任务
4. 收集并打印结果
"""
# 创建一个异步 HTTP 会话(推荐:整个程序只创建一个)
async with aiohttp.ClientSession() as session:
# → 自动管理连接池、Keep-Alive、DNS 缓存
# → 所有请求共享同一个 session,性能更高
# 创建任务列表:每个地址一个协程任务
tasks = [get_balance(session, addr) for addr in ADDRESSES]
# → 列表推导式,不执行,仅创建协程对象
# 并发执行所有任务(真正并发的关键!)
results = await asyncio.gather(*tasks)
# → *tasks:解包列表为多个参数
# → gather:并发运行所有协程,任意一个完成就返回
# → 等待所有任务完成,返回结果列表
# 遍历结果并打印
for addr, bal in results:
print(f"{addr}: {bal:.6f} ETH") # → 保留 6 位小数
# 地址列表(可扩展到 1000 个)
ADDRESSES = [
"0x1234567890123456789012345678901234567890",
"0xAbcDefGhiJklMnoPqrStuVwxYz1234567890abcd",
# ... 更多地址
]
# 启动事件循环,运行主协程
if __name__ == "__main__":
asyncio.run(main())
# → 创建事件循环 → 运行 main() → 结束后关闭循环函数 | 作用 | 位置 |
启动整个异步程序(入口) | 只能在同步代码中调用 | |
asyncio.gather() | 并发运行多个协程(并行) | 只能在 async def 函数内用 await |
其中get_balance是async 修饰的异步函数所以他只返回协程对象(coroutine)而不会立即执行,生成的数组丢给gather函数后才会批量执行
此外如果需要设置最大协程数量可以用下面这个方法
import asyncio
import aiohttp
# 1. 创建信号量:最多 20 个协程同时运行
MAX_CONCURRENT = 20
sem = asyncio.Semaphore(MAX_CONCURRENT)
async def get_balance(session, addr):
async with sem: # ← 自动获取/释放“通行证”
url = "https://eth-mainnet.g.alchemy.com/v2/xxx"
payload = {"jsonrpc": "2.0", "method": "eth_getBalance", "params": [addr, "latest"], "id": 1}
async with session.post(url, json=payload) as resp:
data = await resp.json()
balance = int(data["result"], 16) / 1e18
print(f"{addr}: {balance} ETH")
return addr, balance
async def main():
async with aiohttp.ClientSession() as session:
tasks = [get_balance(session, addr) for addr in ADDRESSES]
await asyncio.gather(*tasks) # ← 自动限流
ADDRESSES = ["0x1...", "0x2...", ...] * 100 # 模拟 100 个地址
asyncio.run(main())asyncio.run() 只能接收一个协程对象,所以不能直接传递gather()方法返回的协程对象数组或者解包后的多个协程参数,必须使用一个异步函数(协程对象)进行包装后交给run方法使用,就像下面这样
import asyncio
import random
from eth_typing import ChecksumAddress
from loguru import logger
from web3 import AsyncWeb3, AsyncHTTPProvider, Web3
from web3.middleware import async_geth_poa_middleware
from eth_account import Account as EthereumAccount
from tabulate import tabulate
from utils.password_handler import get_wallet_data
from config import RPC
async def get_nonce(address: ChecksumAddress):
web3 = AsyncWeb3(
AsyncHTTPProvider(random.choice(RPC["base"]["rpc"])),
middlewares=[async_geth_poa_middleware],
)
nonce = await web3.eth.get_transaction_count(address)
return nonce
if __name__ == '__main__':
address1 = Web3.to_checksum_address('0x0060dc9ddf206f1a400af95aace88b61c377508f')
address2 = Web3.to_checksum_address('0x9ffee66938e40df8255eebdf3b3d1f1db1ec8931')
# 正确的方式:创建一个包装协程来运行 gather
async def run_multiple():
tasks = await asyncio.gather(get_nonce(address1), get_nonce(address2))
return tasks
nonce = asyncio.run(run_multiple())
print(nonce)
所以对于大部分脚本任务,协程是最简单也是最高效的方案,虽然还是单线程,但是创建开销小
单线程,事件循环轮询:“等网络?切下一个任务”
脚本中链上余额查询使用异步的示例
import asyncio # → 导入异步框架,用于协程和事件循环
import aiohttp # → 导入异步 HTTP 客户端库(比 requests 快 10 倍)
# 异步函数:查询单个地址的 ETH 余额
async def get_balance(session, addr):
"""
使用 JSON-RPC 调用 Alchemy 节点,查询指定地址的余额
参数:
session: 复用的 aiohttp.ClientSession(避免重复创建连接)
addr: 以太坊地址(字符串)
返回:
(地址, 余额 ETH) 元组
"""
url = "https://eth-mainnet.g.alchemy.com/v2/xxx" # → Alchemy RPC 节点地址(需替换为你的 API Key)
# JSON-RPC 请求体
payload = {
"jsonrpc": "2.0", # → JSON-RPC 协议版本
"method": "eth_getBalance", # → 以太坊方法:获取余额
"params": [addr, "latest"], # → 参数:[地址, 区块标签]
"id": 1 # → 请求 ID,用于匹配响应
}
# 异步发送 POST 请求(关键:async with + await)
async with session.post(url, json=payload) as resp:
# → session.post:异步发起请求
# → async with:自动关闭响应对象
# → resp: 响应对象(类似 requests.Response)
data = await resp.json() # → 异步解析 JSON 响应(不会阻塞)
# 提取余额(16 进制字符串)→ 转为整数 → 除以 1e18 转为 ETH
balance_wei = int(data["result"], 16) # → "0x..." → 十进制 Wei
balance_eth = balance_wei / 1e18 # → 1 ETH = 10^18 Wei
return addr, balance_eth # → 返回 (地址, 余额)
# 主异步函数:并发查询多个地址
async def main():
"""
1. 创建一个 HTTP 会话(连接池)
2. 创建多个查询任务
3. 并发执行所有任务
4. 收集并打印结果
"""
# 创建一个异步 HTTP 会话(推荐:整个程序只创建一个)
async with aiohttp.ClientSession() as session:
# → 自动管理连接池、Keep-Alive、DNS 缓存
# → 所有请求共享同一个 session,性能更高
# 创建任务列表:每个地址一个协程任务
tasks = [get_balance(session, addr) for addr in ADDRESSES]
# → 列表推导式,不执行,仅创建协程对象
# 并发执行所有任务(真正并发的关键!)
results = await asyncio.gather(*tasks)
# → *tasks:解包列表为多个参数
# → gather:并发运行所有协程,任意一个完成就返回
# → 等待所有任务完成,返回结果列表
# 遍历结果并打印
for addr, bal in results:
print(f"{addr}: {bal:.6f} ETH") # → 保留 6 位小数
# 地址列表(可扩展到 1000 个)
ADDRESSES = [
"0x1234567890123456789012345678901234567890",
"0xAbcDefGhiJklMnoPqrStuVwxYz1234567890abcd",
# ... 更多地址
]
# 启动事件循环,运行主协程
if __name__ == "__main__":
asyncio.run(main())
# → 创建事件循环 → 运行 main() → 结束后关闭循环函数 | 作用 | 位置 |
启动整个异步程序(入口) | 只能在同步代码中调用 | |
asyncio.gather() | 并发运行多个协程(并行) | 只能在 async def 函数内用 await |
其中get_balance是async 修饰的异步函数所以他只返回协程对象(coroutine)而不会立即执行,生成的数组丢给gather函数后才会批量执行
此外如果需要设置最大协程数量可以用下面这个方法
import asyncio
import aiohttp
# 1. 创建信号量:最多 20 个协程同时运行
MAX_CONCURRENT = 20
sem = asyncio.Semaphore(MAX_CONCURRENT)
async def get_balance(session, addr):
async with sem: # ← 自动获取/释放“通行证”
url = "https://eth-mainnet.g.alchemy.com/v2/xxx"
payload = {"jsonrpc": "2.0", "method": "eth_getBalance", "params": [addr, "latest"], "id": 1}
async with session.post(url, json=payload) as resp:
data = await resp.json()
balance = int(data["result"], 16) / 1e18
print(f"{addr}: {balance} ETH")
return addr, balance
async def main():
async with aiohttp.ClientSession() as session:
tasks = [get_balance(session, addr) for addr in ADDRESSES]
await asyncio.gather(*tasks) # ← 自动限流
ADDRESSES = ["0x1...", "0x2...", ...] * 100 # 模拟 100 个地址
asyncio.run(main())asyncio.run() 只能接收一个协程对象,所以不能直接传递gather()方法返回的协程对象数组或者解包后的多个协程参数,必须使用一个异步函数(协程对象)进行包装后交给run方法使用,就像下面这样
import asyncio
import random
from eth_typing import ChecksumAddress
from loguru import logger
from web3 import AsyncWeb3, AsyncHTTPProvider, Web3
from web3.middleware import async_geth_poa_middleware
from eth_account import Account as EthereumAccount
from tabulate import tabulate
from utils.password_handler import get_wallet_data
from config import RPC
async def get_nonce(address: ChecksumAddress):
web3 = AsyncWeb3(
AsyncHTTPProvider(random.choice(RPC["base"]["rpc"])),
middlewares=[async_geth_poa_middleware],
)
nonce = await web3.eth.get_transaction_count(address)
return nonce
if __name__ == '__main__':
address1 = Web3.to_checksum_address('0x0060dc9ddf206f1a400af95aace88b61c377508f')
address2 = Web3.to_checksum_address('0x9ffee66938e40df8255eebdf3b3d1f1db1ec8931')
# 正确的方式:创建一个包装协程来运行 gather
async def run_multiple():
tasks = await asyncio.gather(get_nonce(address1), get_nonce(address2))
return tasks
nonce = asyncio.run(run_multiple())
print(nonce)
No comments yet