# 用 Python 和币安 API 构建数字货币交易机器人（三）

By [quantbang](https://paragraph.com/@quantbang) · 2022-03-24

---

欢迎来到本系列的第三部分。

**数据集创建**

首先，让我们介绍一个新的“数据集”业务对象来对价格进行分组。

`./models/dataset.py`

    from datetime import datetime
    
    from api import utils
    from models.model import AbstractModel
    from models.exchange import Exchange
    from models.currency import Currency
    
    class Dataset(AbstractModel):
        resource_name = 'datasets'
    
        pair: str = ''
        exchange: str = ''
        period_start: str = ''
        period_end: str = ''
        currency: str = ''
        asset: str = ''
    
        relations = {'exchange': Exchange, 'currency': Currency, 'asset': Currency}
    
        def __init__(self, **kwargs):
            super().__init__(**kwargs)
            self.pair = self.get_pair()
    
        def get_pair(self):
            return utils.format_pair(self.currency, self.asset)
    

**引入服务对象**

我们需要构建一个服务来解析和加载Binance交易所或任何其他具有API和此类历史行情自动收录器端点的历史数据。

`./services/importer.py`

    import sys
    from datetime import datetime
    from models.dataset import Dataset
    
    class Importer:
        def __init__(self, exchange, period_start: datetime, period_end=None, interval=60, *args, **kwargs):
            self.exchange = exchange
            self.interval = interval
            self.period_start = period_start
            self.period_end = period_end
            self.start = datetime.now()
            self.dataset = Dataset().create(
                data={'exchange': '/api/exchanges/'+self.exchange.name.lower(), 'periodStart': self.period_start, 'periodEnd': self.period_end,
                      'candleSize': 60,
                      'currency': '/api/currencies/'+self.exchange.currency.lower(), 'asset': '/api/currencies/'+self.exchange.asset.lower()})
    
        def process(self):
            for price in self.exchange.historical_symbol_ticker_candle(self.period_start, self.period_end, self.interval):
                print(price.create({'dataset': '/api/datasets/'+self.dataset.uuid}))
    
            execution_time = datetime.now() - self.start
            print('Execution time: ' + str(execution_time.total_seconds()) + ' seconds')
            sys.exit()
    

这项服务职责非常简单明了，他的名字说明了一切，我们从交易所导入和存储历史行情自动收录器数据。

在这里，您可以将对象直接存储在诸如PostgreSQL之类的关系数据库中，也可以构建内部REST API并将其用作数据库的代理，以实现高性能。

**回测**

回测是编写您未来的机器人并根据历史行情自动收录器数据针对所有市场情况进行测试的最重要工具。

为此，我们将创建一个回测服务，他的职责是从您当前的本地数据中加载数据集，如果找不到，则直接从交易所加载（默认情况下为Binance）。然后针对历史数据集中的每个价格数据运行给定策略。

`/services/backtest.py`

    import sys
    from datetime import datetime
    
    from exchanges.exchange import Exchange
    from models.dataset import Dataset
    from models.price import Price
    
    class Backtest:
        def __init__(self, exchange: Exchange, period_start: datetime, period_end=None, interval=60):
            self.launchedAt = datetime.now()
            # Try to find dataset
            dataset = Dataset().query('get', {"exchange": '/api/exchanges/' + exchange.name.lower(),
                                              "currency": '/api/currencies/' + exchange.currency.lower(),
                                              "asset": '/api/currencies/' + exchange.asset.lower(),
                                              "period_start": period_start, "period_end": period_end, "candleSize": interval})
    
            if dataset and len(dataset) > 0:
                print(dataset[0])
                price = Price()
                for price in price.query('get', {"dataset": dataset[0]['uuid']}):
                    newPrice = Price()
                    newPrice.populate(price)
                    exchange.strategy.set_price(newPrice)
                    exchange.strategy.run()
            else:
                print("Dataset not found, external API call to " + exchange.name)
                for price in exchange.historical_symbol_ticker_candle(period_start, period_end, interval):
                    exchange.strategy.set_price(price)
                    exchange.strategy.run()
    
            execution_time = datetime.now() - self.launchedAt
            print('Execution time: ' + str(execution_time.total_seconds()) + ' seconds')
            sys.exit()
    

**项目配置**

我们将使用dotenv库来管理环境变量。这是项目的默认值：

`./.env.local`

    AVAILABLE_EXCHANGES="coinbase,binance"
    EXCHANGE="binance"
    
    BINANCE_API_KEY="Your Binance API KEY"
    BINANCE_API_SECRET="Your Binance API SECRET"
    
    COINBASE_API_KEY="Your Coinbase API KEY""
    COINBASE_API_SECRET="Your Coinbase API SECRET""
    
    # Available modes
    # "trade" to trade on candlesticks
    # "live" to live trade throught WebSocket
    # "backtest" to test a strategy for a given symbol pair and a period
    # "import" to import dataset from exchanges for a given symbol pair and a period
    MODE="trade"
    STRATEGY="logger"
    # Allow trading "test" mode or "real" trading
    TRADING_MODE="test"
    # Default candle size in seconds
    CANDLE_INTERVAL=60
    CURRENCY="BTC"
    ASSET="EUR"
    # Default period for backtesting: string in UTC format
    PERIOD_START="2021-02-28T08:49"
    PERIOD_END="2021-03-09T08:49"
    
    DATABASE_URL="postgresql://postgres:password@127.0.0.1:15432/cryptobot"
    

**主线程**

然后将所有这些部分放到一个主线程上，主要是使用args以及环境变量的CLI命令。

这样，我们可以覆盖任何默认环境设置，并直接使用基于命令行的客户端直接调整所有输入参数。

例如，在使用诸如Docker之类的容器化工具时，它也确实非常有用，只需启动此主线程，它将与特定容器的环境变量一起运行。

我们将根据设置动态加载和导入我们创建的每个组件。

`./main.py`

    #!/usr/bin/python3
    
    import importlib
    import signal
    import sys
    import threading
    from decouple import config
    
    from services.backtest import Backtest
    from services.importer import Importer
    
    exchange_name = config('EXCHANGE')
    available_exchanges = config('AVAILABLE_EXCHANGES').split(',')
    mode: str = config('MODE')
    strategy: str = config('STRATEGY')
    trading_mode: str = config('TRADING_MODE')
    interval: int = int(config('CANDLE_INTERVAL'))
    currency: str = config('CURRENCY')
    asset: str = config('ASSET')
    
    if trading_mode == 'real':
        print("*** Caution: Real trading mode activated ***")
    else:
        print("Test mode")
    
    # Parse symbol pair from first command argument
    if len(sys.argv) > 1:
        currencies = sys.argv[1].split('_')
        if len(currencies) > 1:
            currency = currencies[0]
            asset = currencies[1]
    
    # Load exchange
    print("Connecting to {} exchange...".format(exchange_name[0].upper() + exchange_name[1:]))
    exchangeModule = importlib.import_module('exchanges.' + exchange_name, package=None)
    exchangeClass = getattr(exchangeModule, exchange_name[0].upper() + exchange_name[1:])
    exchange = exchangeClass(config(exchange_name.upper() + '_API_KEY'), config(exchange_name.upper() + '_API_SECRET'))
    
    # Load currencies
    exchange.set_currency(currency)
    exchange.set_asset(asset)
    
    # Load strategy
    strategyModule = importlib.import_module('strategies.' + strategy, package=None)
    strategyClass = getattr(strategyModule, strategy[0].upper() + strategy[1:])
    exchange.set_strategy(strategyClass(exchange, interval))
    
    # mode
    print("{} mode on {} symbol".format(mode, exchange.get_symbol()))
    if mode == 'trade':
        exchange.strategy.start()
    
    elif mode == 'live':
        exchange.start_symbol_ticker_socket(exchange.get_symbol())
    
    elif mode == 'backtest':
        period_start = config('PERIOD_START')
        period_end = config('PERIOD_END')
    
        print(
            "Backtest period from {} to {} with {} seconds candlesticks.".format(
                period_start,
                period_end,
                interval
            )
        )
        Backtest(exchange, period_start, period_end, interval)
    
    elif mode == 'import':
        period_start = config('PERIOD_START')
        period_end = config('PERIOD_END')
    
        print(
            "Import mode on {} symbol for period from {} to {} with {} seconds candlesticks.".format(
                exchange.get_symbol(),
                period_start,
                period_end,
                interval
            )
        )
        importer = Importer(exchange, period_start, period_end, interval)
        importer.process()
    
    else:
        print('Not supported mode.')
    
    def signal_handler(signal, frame):
        if (exchange.socket):
            print('Closing WebSocket connection...')
            exchange.close_socket()
            sys.exit(0)
        else:
            print('stopping strategy...')
            exchange.strategy.stop()
            sys.exit(0)
    
    # Listen for keyboard interrupt event
    signal.signal(signal.SIGINT, signal_handler)
    forever = threading.Event()
    forever.wait()
    exchange.strategy.stop()
    sys.exit(0)
    

**用法**

    # Real time trading mode via WebSocket
    MODE=live ./main.py BTC_EUR
    
    # Trading mode with default 1 minute candle
    MODE=trade ./main.py BTC_EUR
    
    # Import data from Exchange
    MODE=import ./main.py BTC_EUR
    
    # Backtest with an imported dataset or Binance Exchange API
    MODE=backtest ./main.py BTC_EUR
    

您可以轻松地覆盖任何设置，如下所示：

    PERIOD_START="2021-04-16 00:00" PERIOD_END="2021-04-16 00:00" STRATEGY=myCustomStrategy MODE=backtest ./main.py BTC_EUR
    

要退出测试模式并进行真实交易，只需将“ trading\_mode”从“ test”切换为“ real”。谨慎使用，后果自负。

    TRADING_MODE=real ./main.py BTC_EUR
    

**集成化项目**

我们可以使用Docker对该程序进行容器化。这是一个简单的自我解释Docker构建文件的例子。

    FROM python:3.9
    
    WORKDIR /usr/src/app
    
    COPY requirements.txt ./
    RUN pip install --no-cache-dir -r requirements.txt
    
    COPY . .
    
    CMD [ "python", "./main.py" ]
    

**基准**

使用带有16go DDR3 ram的旧AMD Phenom II 955四核CPU，并运行其他进程。

**导入**

导入价格并将其部署到内部API

1天的行情

    Execution time: 82.716666 seconds
    

一周的行情

    Execution time: 9,423079183 minutes
    

一个月的行情

    Execution time: 27,48139456 minutes
    

六个月的行情

    Execution time: 3.032364739 hours
    

**回测**

从导入的数据集

1天的行情

    Execution time: 3.746787 seconds
    

一周的行情

    Execution time: 46.900068 seconds
    

一个月的行情

    Execution time: 1.8953 seconds
    

六个月的行情

    Execution time: 12,15175435 minutes
    

**结论**

我们构建了一个kickass性能的实时加密交易机器人。他能够使用少量的CPU和RAM在大型市场数据集上对您的策略进行回测。从交易所导入数据集，甚至使用WebSocket实时执行实时交易。

**更进一步**

*   编写一个涵盖所有程序行为的测试套件，以确保将来不会退化。
    
*   构建并使用内部Rest API实时持久保存所有加密货币交易所市场数据。
    
*   建立最终用户客户端，例如移动应用或网络应用。使用WebSocket或服务器发送事件，以显示实时指标。
    

感谢您阅读这三部分的文章，内容涉及如何使用python 3和Binance API构建加密机器人。

---

*Originally published on [quantbang](https://paragraph.com/@quantbang/python-api-3)*
