Browse Source

调整MA5均线交易持仓数量和金额的实时性

Daniel 2 years ago
parent
commit
9b2c5b8c73
1 changed files with 116 additions and 113 deletions
  1. 116 113
      QMT/real_time.py

+ 116 - 113
QMT/real_time.py

@@ -25,18 +25,84 @@ cursor_pool = db_pool.cursor()
 engine_stock = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8')
 
 
+class MyXtQuantTraderCallback(XtQuantTraderCallback):
+    def on_disconnected(self):
+        """
+        连接断开
+        :return:
+        """
+        print(datetime.datetime.now(), '连接断开回调')
+
+    def on_stock_order(self, order):
+        """
+        委托回报推送
+        :param order: XtOrder对象
+        :return:
+        """
+        print(datetime.datetime.now(), '委托回调', order.order_remark)
+
+    def on_stock_trade(self, trade):
+        """
+        成交变动推送
+        :param trade: XtTrade对象
+        :return:
+        """
+        print(datetime.datetime.now(), '成交回调', trade.order_remark)
+
+    def on_order_error(self, order_error):
+        """
+        委托失败推送
+        :param order_error:XtOrderError 对象
+        :return:
+        """
+        # print("on order_error callback")
+        # print(order_error.order_id, order_error.error_id, order_error.error_msg)
+        print(f"委托报错回调 {order_error.order_remark} {order_error.error_msg}")
+
+    def on_cancel_error(self, cancel_error):
+        """
+        撤单失败推送
+        :param cancel_error: XtCancelError 对象
+        :return:
+        """
+        print(datetime.datetime.now(), sys._getframe().f_code.co_name)
+
+    def on_order_stock_async_response(self, response):
+        """
+        异步下单回报推送
+        :param response: XtOrderResponse 对象
+        :return:
+        """
+        print(f"异步委托回调 {response.order_remark}")
+
+    def on_cancel_order_stock_async_response(self, response):
+        """
+        :param response: XtCancelOrderResponse 对象
+        :return:
+        """
+        print(datetime.datetime.now(), sys._getframe().f_code.co_name)
+
+    def on_account_status(self, status):
+        """
+        :param response: XtAccountStatus 对象
+        :return:
+        """
+        print(datetime.datetime.now(), sys._getframe().f_code.co_name)
+
+
 def run(seq):
-    print('seq=', seq)
-    '''阻塞线程接收行情回调'''
+    """阻塞线程接收行情回调"""
     import time
     client = xtdata.get_client()
     while True:
         time.sleep(3)
         now_date = dt.now()
-        if not client.is_connected() or dt.now() > now_date.replace(hour=15, minute=00, second=0):
+        if not client.is_connected():
             xtdata.unsubscribe_quote(seq)
             raise Exception('行情服务连接断开')
-            break
+        if dt.now() > now_date.replace(hour=15, minute=0, second=0):
+            print(f'现在时间:{dt.now()},已收盘')
+            return 0
     return
 
 
@@ -59,14 +125,12 @@ def ma(stock, num, data):
         return 9999999
     else:
         ma_num = (sum(df['close_front'][i:]) + data[stock]['lastPrice'])/num
-        del df
-        gc.collect()
         return ma_num
 
 
 def ma_1(stock, num):
     global engine_stock
-    i = (num) * -1
+    i = num * -1
     try:
         df = pd.read_sql_query(
             'select close_front from `%s_1d`' % stock, engine_stock)
@@ -74,8 +138,6 @@ def ma_1(stock, num):
         return 9999999
     else:
         ma_num_1 = df['close_front'][i:].mean()
-        del df
-        gc.collect()
         return ma_num_1
 
 
@@ -96,48 +158,54 @@ def ma_judge(data, stock_list, rate, results):
     list_judge = list(set(data.keys()) & set(stock_list))
     print(f'这个ma_judge的PID为:{os.getpid()},本轮计算:{len(list_judge)}个股')
     for stock in list_judge:
-        i = stock.replace('XSHG', 'SH').replace('XSHE', 'SZ')
-        current_price, open_price = data[i]['lastPrice'], data[i]['open']
-        MA5, MA10, MA20, MA30, MA60, MA120 = ma(i, 5, data), ma(i, 10, data), ma(i, 20, data), ma(i, 30, data),\
-            ma(i, 60, data), ma(i, 120, data)
-        MA5_1 = ma_1(i, 5)
+        current_price, open_price = data[stock]['lastPrice'], data[stock]['open']
+        MA5, MA10, MA20, MA30, MA60, MA120 = ma(stock, 5, data), ma(stock, 10, data), ma(stock, 20, data), ma(stock, 30, data),\
+            ma(stock, 60, data), ma(stock, 120, data)
+        MA5_1 = ma_1(stock, 5)
         # print(i, current_price, open_price, MA5, MA10, MA20, MA5_1)
         # 入交易池标准:阳线\大于MA5\MA5向上\MA20<MA10\离120线有距离
         if (current_price > open_price) & (current_price > MA5) & (MA5 > MA5_1) & (current_price < MA5 * 1.03) & (
                 MA20 < MA10) & (current_price > MA120 or current_price < MA120*rate):
-            if his_vol(i, -1) > his_vol(i, -2):
-                results.append(i.replace('SH', 'XSHG').replace('SZ', 'XSHE'))
+            if his_vol(stock, -1) > his_vol(stock, -2):
+                results.append(stock.replace('SH', 'XSHG').replace('SZ', 'XSHE'))
 
 
-def sell_trader(data, positions_dict):
-    # for m in data:
-    #     print(m, data[m]['lastPrice'])
+def get_fundamentals(results):
+    return results
+    pass
+
+
+def sell_trader(data):
     print('卖出函数:', dt.now())
     positions = xt_trader.query_stock_positions(acc)
-    print('持仓总数:', len(positions))
+    positions_dict = {positions[x].stock_code: positions[x].can_use_volume for x in range(0, len(positions))}
+    print(f'今日可卖出个股总数:{len([value for value in positions_dict.values() if value != 0])}')
+    print(f'目前持仓总数为:{len(positions)}')
 
-    for stock, volume in positions_dict.items():
-        if stock in data:
+    for stock, can_use_volume in positions_dict.items():
+        if stock in data and can_use_volume != 0:
             current_price = data[stock]['lastPrice']
             open_price = data[stock]['open']
             MA5 = ma(stock, 5, data)
             MA5_1 = ma_1(stock, 5)
-            print(f'{stock},持仓量为{volume}当前价:{current_price},MA5:{MA5},昨日MA5:{MA5_1},开始判断:')
-            if current_price < MA5 or MA5 < MA5_1 or current_price > MA5 * 1.07:
+            print(f'{stock},持仓量为{can_use_volume}当前价:{current_price},MA5:{MA5},昨日MA5:{MA5_1},开始判断:')
+            if current_price < MA5 or MA5 < MA5_1:
                 print('卖出信号!!!!!!', stock, current_price)
-                order_id = xt_trader.order_stock(acc, stock, xtconstant.STOCK_SELL, volume,
-                                                 xtconstant.LATEST_PRICE, 0, 'strategy1', 'order_test')
+                order_id = xt_trader.order_stock(acc, stock, xtconstant.STOCK_SELL, can_use_volume,
+                                                 xtconstant.LATEST_PRICE, 0, 'MA5策略', '低于MA5趋势向下')
                 print('价格:', current_price, open_price, MA5, MA5_1)
-                print(order_id, stock, volume)
+                print(order_id, stock, can_use_volume)
+            elif current_price > MA5 * 1.07:
+                print('盈利乖离率超7%!!!!!!', stock, current_price)
+                order_id = xt_trader.order_stock(acc, stock, xtconstant.STOCK_SELL, can_use_volume,
+                                                 xtconstant.LATEST_PRICE, 0, 'MA5策略', '盈利乖离率超7%')
+                print('价格:', current_price, open_price, MA5, MA5_1)
+                print(order_id, stock, can_use_volume)
         else:
             print(f'本轮没有持仓股票信息!')
 
 
-def get_fundamentals(results):
-    return results
-    pass
-
-def buy_trader(data, positions):
+def buy_trader(data):
     print('买入函数:', dt.now(), f'接受到{len(data.keys())}个个股')
     results = mp.Manager().list()
     mp_list = []
@@ -215,6 +283,7 @@ def buy_trader(data, positions):
         #进入购买程序
         max_pos = 7
         for stock in new_keep_stock:
+            positions = xt_trader.query_stock_positions(acc)
             asset = xt_trader.query_stock_asset(acc)
             cash = asset.cash
             positions_dict = {positions[x].stock_code: positions[x].volume for x in range(0, len(positions)) if
@@ -227,7 +296,7 @@ def buy_trader(data, positions):
                 volume = int((cash / 3 / current_price) // 100 * 100)
                 print('买入信号!!!!!!', stock, volume, current_price)
                 order_id = xt_trader.order_stock(acc, stock, xtconstant.STOCK_BUY, volume, xtconstant.LATEST_PRICE,
-                                                 current_price, 'strategy1', 'order_test')
+                                                 current_price, 'MA5策略', 'MA5趋势向上')
                 print(order_id)
             else:
                 print(f'Cash只有:{cash} 或者 现有持仓{len(positions)} 超过了{max_pos}')
@@ -236,90 +305,23 @@ def buy_trader(data, positions):
 
 
 def trader(data):
-    gc.collect()
-    print(len(data.keys()))
-
     # 先判断卖出条件
     positions = xt_trader.query_stock_positions(acc)
-    print('持仓数量', len(positions))
     if len(positions) != 0:
-        positions_dict = {positions[x].stock_code: positions[x].volume for x in range(0, len(positions))}
-        sell_trader(data, positions_dict)
+        sell_trader(data)
 
     # 买入条件
     buy_trader(data, positions)
 
 
 def bridge():
-    print("start")
+    print(f'MyPid is {os.getpid()}, now is {dt.now()},开盘了')
     stocks = xtdata.get_stock_list_in_sector('沪深A股')
     seq = xtdata.subscribe_whole_quote(stocks, callback=trader)
-    run(seq)
-
-
-class MyXtQuantTraderCallback(XtQuantTraderCallback):
-    def on_disconnected(self):
-        """
-        连接断开
-        :return:
-        """
-        print(datetime.datetime.now(), '连接断开回调')
-
-    def on_stock_order(self, order):
-        """
-        委托回报推送
-        :param order: XtOrder对象
-        :return:
-        """
-        print(datetime.datetime.now(), '委托回调', order.order_remark)
-
-    def on_stock_trade(self, trade):
-        """
-        成交变动推送
-        :param trade: XtTrade对象
-        :return:
-        """
-        print(datetime.datetime.now(), '成交回调', trade.order_remark)
-
-    def on_order_error(self, order_error):
-        """
-        委托失败推送
-        :param order_error:XtOrderError 对象
-        :return:
-        """
-        # print("on order_error callback")
-        # print(order_error.order_id, order_error.error_id, order_error.error_msg)
-        print(f"委托报错回调 {order_error.order_remark} {order_error.error_msg}")
-
-    def on_cancel_error(self, cancel_error):
-        """
-        撤单失败推送
-        :param cancel_error: XtCancelError 对象
-        :return:
-        """
-        print(datetime.datetime.now(), sys._getframe().f_code.co_name)
-
-    def on_order_stock_async_response(self, response):
-        """
-        异步下单回报推送
-        :param response: XtOrderResponse 对象
-        :return:
-        """
-        print(f"异步委托回调 {response.order_remark}")
-
-    def on_cancel_order_stock_async_response(self, response):
-        """
-        :param response: XtCancelOrderResponse 对象
-        :return:
-        """
-        print(datetime.datetime.now(), sys._getframe().f_code.co_name)
-
-    def on_account_status(self, status):
-        """
-        :param response: XtAccountStatus 对象
-        :return:
-        """
-        print(datetime.datetime.now(), sys._getframe().f_code.co_name)
+    time.sleep(100000)
+    if run(seq) == 0:
+        xtdata.unsubscribe_quote(seq)
+        print(f'{dt.now()},收盘了,收工!')
 
 
 if __name__ == '__main__':
@@ -349,13 +351,14 @@ if __name__ == '__main__':
     subscribe_result = xt_trader.subscribe(acc)
     print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)
 
-    scheduler = BlockingScheduler()
-    scheduler.add_job(func=bridge, trigger='cron', day_of_week='0-4', hour='09', minute='40',
-                      timezone="Asia/Shanghai")
-    try:
-        scheduler.start()
-    except (KeyboardInterrupt, SystemExit):
-        pass
+    bridge()
+    # scheduler = BlockingScheduler()
+    # scheduler.add_job(func=bridge, trigger='cron', day_of_week='0-4', hour='09', minute='40',
+    #                   timezone="Asia/Shanghai")
+    # try:
+    #     scheduler.start()
+    # except (KeyboardInterrupt, SystemExit):
+    #     pass
 
 
     # xtdata.subscribe_quote('000001.SZ', '1d', '', '', count=1, callback=MA)