# 用 Python 和币安 API 构建数字货币交易机器人（二）

By [quantbang](https://paragraph.com/@quantbang) · 2022-03-24

---

![](https://storage.googleapis.com/papyrus_images/509e0c629c951292af63b41997bdb884c1c064f184ca08d19ff9877d1bab9427.jpg)

在本系列的第一部分中，我们讨论了文件夹结构，添加了我们的第一个Price业务对象，并为策略抽象层进行编码。

下面将通过编码我们需要的货币和订单的业务对象开始。

此处，货币业务对象存储所有不同的加密货币或法定货币。

`./models/order.py`

    from models.model import AbstractModel
    
    
    class Currency(AbstractModel):
        name: str = ''
        symbol: str = ''
        fiat: bool
    
        def __init__(self, **kwargs):
            super().__init__(**kwargs)
    

然后是订单业务对象处理我们在交易所创建的订单。

`./models/order.py`

    from models.model import AbstractModel
    
    
    class Order(AbstractModel):
        BUY = 'BUY'
        SELL = 'SELL'
    
        TYPE_LIMIT = 'LIMIT'
        TYPE_MARKET = 'MARKET'
        TYPE_STOP_LOSS = 'STOP_LOSS'
        TYPE_STOP_LOSS_LIMIT = 'STOP_LOSS_LIMIT'
        TYPE_TAKE_PROFIT = 'TAKE_PROFIT'
        TYPE_TAKE_PROFIT_LIMIT = 'TAKE_PROFIT_LIMIT'
        TYPE_LIMIT_MAKER = 'LIMIT_MAKER'
    
        uuid = ''
        side: str = ''
        type: str = TYPE_LIMIT
        symbol: str = ''
        currency: str = ''
        asset: str = ''
        price: float = 0
        quantity: int = 0
        test: bool = False
    
        def __init__(self, **kwargs):
            super().__init__(**kwargs)
    

然后构建交换抽象层，并开发我们的第一个连接器。由于这里不是编写API包装器的重点，因此为了方便起见，我们将使用非官方的Binance API包装器库python-binance（`https://python-binance.readthedocs.io/en/latest/binance.html`）。

`./exchanges/exchange.py`

    import datetime
    from api import utils
    from abc import ABC, abstractmethod
    from twisted.internet import reactor
    from strategies.strategy import Strategy
    from models.order import Order
    
    
    class Exchange(ABC):
        currency: str
        asset: str
        strategy: Strategy
    
        def __init__(self, key: str, secret: str):
            self.apiKey = key
            self.apiSecret = secret
            self.name = None
            self.client = None
            self.socketManager = None
            self.socket = None
            self.currency = ''
            self.asset = ''
            self.strategy = None
    
        def set_currency(self, symbol: str):
            self.currency = symbol
    
        def set_asset(self, symbol: str):
            self.asset = symbol
    
        def set_strategy(self, strategy: Strategy):
            self.strategy = strategy
    
        def compute_symbol_pair(self):
            return utils.format_pair(self.currency, self.asset)
    
        # abstract methods
    
        # Override to set current exchange symbol pair notation (default with _ separator currency_asset ex: eur_btc)
        @abstractmethod
        def get_symbol(self):
            return self.compute_symbol_pair(self)
    
        # Get current symbol ticker
        @abstractmethod
        def symbol_ticker(self):
            pass
    
        # Get current symbol ticker candle for given interval
        @abstractmethod
        def symbol_ticker_candle(self, interval):
            pass
    
        # Get current symbol historic value
        @abstractmethod
        def historical_symbol_ticker_candle(self, start: datetime, end=None, interval=60):
            pass
    
        # Get balance for a given currency
        @abstractmethod
        def get_asset_balance(self, currency):
            pass
    
        # Create an exchange order
        @abstractmethod
        def order(self, order: Order):
            pass
    
        # Create an exchange test order
        @abstractmethod
        def test_order(self, order: Order):
            pass
    
        # Check an exchange order status
        @abstractmethod
        def check_order(self, orderId):
            pass
    
        # Cancel an exchange order
        @abstractmethod
        def cancel_order(self, orderId):
            pass
    
        # WebSocket related methods
    
        @abstractmethod
        def get_socket_manager(self, purchase):
            pass
    
        @abstractmethod
        def websocket_event_handler(self, msg):
            pass
    
        def start_socket(self):
            print('Starting WebSocket connection...')
            self.socketManager.start()
    
        def close_socket(self):
            self.socketManager.stop_socket(self.socket)
            self.socketManager.close()
            # properly terminate WebSocket
            reactor.stop()
    
        @abstractmethod
        def start_symbol_ticker_socket(self, symbol: str):
            pass
    

我们的第一个Binance API连接器。

`./exchanges/binance.py`

    from datetime import datetime
    from math import floor
    
    from binance.client import Client
    from binance.enums import *
    from binance.websockets import BinanceSocketManager
    
    from api import utils
    from exchanges import exchange
    from models.order import Order
    from models.price import Price
    
    
    class Binance(exchange.Exchange):
        def __init__(self, key: str, secret: str):
            super().__init__(key, secret)
    
            self.client = Client(self.apiKey, self.apiSecret)
            self.name = self.__class__.__name__
    
        def get_client(self):
            return self.client
    
        def get_symbol(self):
            return self.currency + self.asset
    
        def symbol_ticker(self):
            response = self.client.get_symbol_ticker(symbol=self.get_symbol())
            return Price(pair=self.get_symbol(), currency=self.currency.lower(), asset=self.asset.lower(), exchange=self.name.lower(),
                         current=response['price'])
    
        def symbol_ticker_candle(self, interval=Client.KLINE_INTERVAL_1MINUTE):
            return self.client.get_klines(symbol=self.get_symbol(), interval=interval)
    
        def historical_symbol_ticker_candle(self, start: datetime, end=None, interval=Client.KLINE_INTERVAL_1MINUTE):
            # Convert default seconds interval to string like "1m"
            if isinstance(interval, int):
                interval = str(floor(interval/60)) + 'm'
    
            output = []
            for candle in self.client.get_historical_klines_generator(self.get_symbol(), interval, start, end):
                output.append(
                    Price(pair=self.compute_symbol_pair(), currency=self.currency.lower(), asset=self.asset.lower(), exchange=self.name.lower(),
                          current=candle[1], lowest=candle[3], highest=candle[2], volume=candle[5], openAt=utils.format_date(datetime.fromtimestamp(int(candle[0])/1000)))
                )
    
            return output
    
        def get_asset_balance(self, currency):
            response = self.client.get_asset_balance(currency)
            return response['free']
    
        def order(self, order: Order):
            return self.client.create_order(
                symbol=order.symbol,
                side=order.side,
                type=order.type,
                timeInForce=TIME_IN_FORCE_GTC,
                quantity=order.quantity,
                price=order.price
            )
    
        def test_order(self, order: Order):
            return self.client.create_test_order(
                symbol=order.symbol,
                side=order.side,
                type=order.type,
                timeInForce=TIME_IN_FORCE_GTC,
                quantity=order.quantity,
                price=order.price
            )
    
        def check_order(self, orderId):
            return self.client.get_order(
                symbol=self.get_symbol(),
                orderId=orderId
            )
    
        def cancel_order(self, orderId):
            return self.client.cancel_order(
                symbol=self.get_symbol(),
                orderId=orderId
            )
    
        def get_socket_manager(self):
            return BinanceSocketManager(self.client)
    
        def start_symbol_ticker_socket(self, symbol: str):
            self.socketManager = self.get_socket_manager()
            self.socket = self.socketManager.start_symbol_ticker_socket(
                symbol=self.get_symbol(),
                callback=self.websocket_event_handler
            )
    
            self.start_socket()
    
        def websocket_event_handler(self, msg):
            if msg['e'] == 'error':
                print(msg)
                self.close_socket()
            else:
                self.strategy.set_price(
                    Price(pair=self.compute_symbol_pair(), currency=self.currency, asset=self.asset, exchange=self.name,
                          current=msg['b'], lowest=msg['l'], highest=msg['h'])
                )
                self.strategy.run()
    

现在，只需调用我们的策略启动方法，我们便拥有了一个极简而强大的交易机器人系统。使用自己的指标和策略，您可以通过传递订单来开始买卖。但是，在开始编写自己的策略代码之前，请确保对其进行测试并安全运行。所以你需要一个系统来做到这一点。

在下一部分中，我们将实现回测模式，以针对交易所的历史数据来测试您的策略，并提供导入这些数据的服务。

另外，我们将所有这些部分与全局配置和一些依赖项一起打包进命令行工具中。

最后我们将通过查看如何对该代码进行容器化和工业化实现来进一步介绍。

---

*Originally published on [quantbang](https://paragraph.com/@quantbang/python-api-2)*
