Browse Source

real_time 自动启动,买入条件 未执行原因信息提示
real_hlfx 自动启动

Daniel 2 years ago
parent
commit
33a737f065
2 changed files with 75 additions and 29 deletions
  1. 42 23
      QMT/qmt_real_hlfx.py
  2. 33 6
      QMT/real_time.py

+ 42 - 23
QMT/qmt_real_hlfx.py

@@ -16,6 +16,8 @@ import multiprocessing as mp
 import os
 import psutil
 import traceback
+from apscheduler.schedulers.blocking import BlockingScheduler
+
 #原始版本
 
 # auth('18616891214', 'Ea?*7f68nD.dafcW34d!')
@@ -26,6 +28,20 @@ import traceback
 pd.set_option('display.max_columns', None) # 设置显示最大行
 fre = '1d'
 
+def run(seq):
+    '''阻塞线程接收行情回调'''
+    import time
+    client = xtdata.get_client()
+    while True:
+        time.sleep(3)
+        now_date = dt.now()
+        if not client.is_connected() or dt.now() > now_date.replace(hour=15, minute=0, second=0):
+            xtdata.unsubscribe_quote(seq)
+            raise Exception('行情服务连接断开')
+            break
+    return
+
+
 class MyXtQuantTraderCallback(XtQuantTraderCallback):
     def on_disconnected(self):
         """
@@ -253,22 +269,31 @@ def hlfx(data):
     engine_hlfx_pool.dispose()
 
 
+def trader(data):
+    print(dt.now(), len(data.keys()), data.keys())
+
 def bridge(list):
     print(f'MyPid is {os.getpid()}, now is {dt.now()},我需要负责{len(list)}个个股数据')
-    xtdata.subscribe_whole_quote(list, callback=hlfx)
-    xtdata.run()
+    seq = xtdata.subscribe_whole_quote(list, callback=trader)
+    run(seq)
 
 
 def prepare():
-    engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8')
+    stocks = xtdata.get_stock_list_in_sector('沪深A股')
 
-    results = pd.read_sql_query(
-        'select value from `%s`' % fre, engine_hlfx_pool).iloc[-1, 0].split(",")
-    results = [x.replace('XSHG', 'SH').replace('XSHE', 'SZ') for x in results]
-    print('数据库读取,并转化后缀格式', len(results))
-    # print(results[0:10])
-    engine_hlfx_pool.dispose()
-    return results
+    cpu_count = 4
+    pool = mp.Pool(processes=cpu_count, maxtasksperchild=8)
+    step = math.ceil(len(stocks) / cpu_count)
+    to_hlfx_list = []
+
+    for i in range(0, len(stocks), step):
+        to_hlfx_list.append([x for x in stocks[i:i + step]])
+
+    for m in range(cpu_count):
+        pool.apply_async(func=bridge,
+                         args=(to_hlfx_list[m],), error_callback=err_call_back)
+    pool.close()
+    pool.join()
 
 
 if __name__ == '__main__':
@@ -293,19 +318,13 @@ if __name__ == '__main__':
     # 对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功
     subscribe_result = xt_trader.subscribe(acc)
     print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)
-    stocks = xtdata.get_stock_list_in_sector('沪深A股')
 
-    cpu_count = 4
-    pool = mp.Pool(processes=cpu_count, maxtasksperchild=8)
-    step = math.ceil(len(stocks) / cpu_count)
-    to_hlfx_list = []
+    scheduler = BlockingScheduler()
+    scheduler.add_job(func=prepare, trigger='cron', day_of_week='0-4', hour='9', minute='25',
+                      timezone="Asia/Shanghai")
+    try:
+        scheduler.start()
+    except (KeyboardInterrupt, SystemExit):
+        pass
 
-    for i in range(0, len(stocks), step):
-        to_hlfx_list.append([x for x in stocks[i:i+step]])
-
-    for m in range(cpu_count):
-        pool.apply_async(func=bridge,
-                         args=(to_hlfx_list[m],), error_callback=err_call_back)
-    pool.close()
-    pool.join()
 

+ 33 - 6
QMT/real_time.py

@@ -12,6 +12,7 @@ import pymysql
 import multiprocessing as mp
 import math
 import psutil
+from apscheduler.schedulers.blocking import BlockingScheduler
 
 auth('18616891214', 'Ea?*7f68nD.dafcW34d!')
 db_pool = pymysql.connect(host='localhost',
@@ -23,6 +24,21 @@ cursor_pool = db_pool.cursor()
 engine_stock = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8')
 
 
+def run(seq):
+    print(seq)
+    '''阻塞线程接收行情回调'''
+    import time
+    client = xtdata.get_client()
+    while True:
+        time.sleep(3)
+        now_date = dt.now()
+        if not client.is_connected() or dt.now() > now_date.replace(hour=13, minute=26, second=0):
+            xtdata.unsubscribe_quote(seq)
+            raise Exception('行情服务连接断开')
+            break
+    return
+
+
 def real_price(datas):
     # i = '000001.SZ'
     for i in datas:
@@ -211,8 +227,8 @@ def buy_trader(data, positions):
                     order_id = xt_trader.order_stock(acc, stock, xtconstant.STOCK_BUY, volume, xtconstant.LATEST_PRICE,
                                                      current_price, 'strategy1', 'order_test')
                     print(order_id)
-            else:
-                print(f'Cash只有:{cash} 或者 现有持仓{len(positions)} 超过了{max_pos}')
+                else:
+                    print(f'Cash只有:{cash} 或者 现有持仓{len(positions)} 超过了{max_pos}')
     engine_hlfx_pool.dispose()
     print('一轮结束了,现在时间是:', dt.now())
 
@@ -231,6 +247,13 @@ def trader(data):
     buy_trader(data, positions)
 
 
+def bridge():
+    print("start")
+    stocks = xtdata.get_stock_list_in_sector('沪深A股')
+    seq = xtdata.subscribe_whole_quote(stocks, callback=trader)
+    run(seq)
+
+
 class MyXtQuantTraderCallback(XtQuantTraderCallback):
     def on_disconnected(self):
         """
@@ -304,7 +327,6 @@ if __name__ == '__main__':
     pus = psutil.Process()
     pus.cpu_affinity([4, 5, 6, 7])
 
-    print("start")
     # 指定客户端所在路径
     path = r'c:\\qmt\\userdata_mini'
     # 生成session id 整数类型 同时运行的策略不能重复
@@ -324,8 +346,13 @@ if __name__ == '__main__':
     subscribe_result = xt_trader.subscribe(acc)
     print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)
 
+    scheduler = BlockingScheduler()
+    scheduler.add_job(func=bridge, trigger='cron', day_of_week='0-4', hour='9', minute='40',
+                      timezone="Asia/Shanghai")
+    try:
+        scheduler.start()
+    except (KeyboardInterrupt, SystemExit):
+        pass
+
 
-    stocks = xtdata.get_stock_list_in_sector('沪深A股')
-    xtdata.subscribe_whole_quote(stocks, callback=trader)
-    xtdata.run()
     # xtdata.subscribe_quote('000001.SZ', '1d', '', '', count=1, callback=MA)