Browse Source

自动停止功能

Daniel 1 year ago
parent
commit
7cd0fe8d85
2 changed files with 46 additions and 19 deletions
  1. 21 11
      QMT/qmt_real_hlfx.py
  2. 25 8
      QMT/real_time.py

+ 21 - 11
QMT/qmt_real_hlfx.py

@@ -100,12 +100,11 @@ def err_call_back(err):
     traceback.print_exc()
 
 
-def run(seq):
+def run(seq, pid):
     mor = datetime.datetime.strptime(
         str(dt.now().date()) + '11:30', '%Y-%m-%d%H:%M')
     afternoon = datetime.datetime.strptime(
         str(dt.now().date()) + '15:00', '%Y-%m-%d%H:%M')
-    print(mor, afternoon)
     """阻塞线程接收行情回调"""
     import time
     client = xtdata.get_client()
@@ -118,11 +117,13 @@ def run(seq):
         if mor < dt.now():
             xtdata.unsubscribe_quote(seq)
             print(f'现在时间:{dt.now()},已休市')
+            sys.exit()
             break
             # return 0
         elif dt.now() > afternoon:
             xtdata.unsubscribe_quote(seq)
             print(f'现在时间:{dt.now()},已收盘')
+            sys.exit()
             break
             # return 0
     # return
@@ -155,6 +156,7 @@ def hlfx(stock_list, data):
         else:
             # 获得最新价格信息
             get_price = data[qmt_stock]
+            # print(get_price)
             # 调整time时间格式
             get_price['time'] = dt.fromtimestamp(get_price['time'] / 1000.0)
             # print('成功判定', get_price['time'])
@@ -290,14 +292,11 @@ def hlfx(stock_list, data):
 
 
 def bridge():
+    pid = os.getpid()
     print(f'bridge is {os.getpid()}, now is {dt.now()},开盘了')
     stocks = xtdata.get_stock_list_in_sector('沪深A股')
     seq = xtdata.subscribe_whole_quote(stocks, callback=prepare)
-    m = run(seq)
-    xtdata.unsubscribe_quote(seq)
-    if m == 1:
-        pass
-    print(f'{dt.now()},收盘了,收工!')
+    run(seq, pid)
 
 
 def prepare(data):
@@ -322,11 +321,22 @@ def prepare(data):
     pool.join()
 
 
+def job_func():
+    print(f"Job started at {dt.now()}")
+    # 创建子进程
+    p = mp.Process(target=bridge)
+    # 启动子进程
+    p.start()
+    # 等待子进程结束
+    p.join()
+    print(f"Job finished at {dt.now()}")
+
+
 if __name__ == '__main__':
     print(f'总进程pid:{os.getpid()}')
     mp.freeze_support()
     pus = psutil.Process()
-    pus.cpu_affinity([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11])
+    pus.cpu_affinity([0, 1, 2, 3, 4, 5, 6, 7])
 
     path = r'c:\\qmt\\userdata_mini'
     # 生成session id 整数类型 同时运行的策略不能重复
@@ -346,12 +356,12 @@ if __name__ == '__main__':
     subscribe_result = xt_trader.subscribe(acc)
     print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)
 
-    # bridge()
+    # job_func()
 
     scheduler = BlockingScheduler()
-    scheduler.add_job(func=bridge, trigger='cron', day_of_week='0-4', hour='09', minute='25',
+    scheduler.add_job(func=job_func, trigger='cron', day_of_week='0-4', hour='09', minute='25',
                       timezone="Asia/Shanghai")
-    scheduler.add_job(func=bridge, trigger='cron', day_of_week='0-4', hour='13', minute='00',
+    scheduler.add_job(func=job_func, trigger='cron', day_of_week='0-4', hour='13', minute='00',
                       timezone="Asia/Shanghai")
     try:
         scheduler.start()

+ 25 - 8
QMT/real_time.py

@@ -85,7 +85,11 @@ class MyXtQuantTraderCallback(XtQuantTraderCallback):
         print(datetime.datetime.now(), sys._getframe().f_code.co_name)
 
 
-def run(seq):
+def run(seq, pid):
+    mor = datetime.datetime.strptime(
+        str(dt.now().date()) + '11:30', '%Y-%m-%d%H:%M')
+    afternoon = datetime.datetime.strptime(
+        str(dt.now().date()) + '15:00', '%Y-%m-%d%H:%M')
     """阻塞线程接收行情回调"""
     import time
     client = xtdata.get_client()
@@ -95,15 +99,19 @@ def run(seq):
         if not client.is_connected():
             xtdata.unsubscribe_quote(seq)
             raise Exception('行情服务连接断开')
-        if now_date.replace(hour=11, minute=30, second=0) < dt.now() < now_date.replace(hour=13, minute=0, second=0):
+        if mor < dt.now():
+            xtdata.unsubscribe_quote(seq)
             print(f'现在时间:{dt.now()},已休市')
+            sys.exit()
             break
             # return 0
-        elif dt.now() > now_date.replace(hour=15, minute=0, second=0):
+        elif dt.now() > afternoon:
+            xtdata.unsubscribe_quote(seq)
             print(f'现在时间:{dt.now()},已收盘')
+            sys.exit()
             break
             # return 0
-    return
+    # return
 
 
 def real_price(datas):
@@ -328,8 +336,17 @@ def bridge():
     stocks = xtdata.get_stock_list_in_sector('沪深A股')
     seq = xtdata.subscribe_whole_quote(stocks, callback=trader)
     run(seq)
-    xtdata.unsubscribe_quote(seq)
-    print(f'{dt.now()},收盘了,收工!')
+
+
+def job_func():
+    print(f"Job started at {dt.now()}")
+    # 创建子进程
+    p = mp.Process(target=bridge)
+    # 启动子进程
+    p.start()
+    # 等待子进程结束
+    p.join()
+    print(f"Job finished at {dt.now()}")
 
 
 if __name__ == '__main__':
@@ -360,9 +377,9 @@ if __name__ == '__main__':
     print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)
 
     scheduler = BlockingScheduler()
-    scheduler.add_job(func=bridge, trigger='cron', day_of_week='0-4', hour='09', minute='40',
+    scheduler.add_job(func=job_func, trigger='cron', day_of_week='0-4', hour='09', minute='40',
                       timezone="Asia/Shanghai")
-    scheduler.add_job(func=bridge, trigger='cron', day_of_week='0-4', hour='13', minute='10',
+    scheduler.add_job(func=job_func, trigger='cron', day_of_week='0-4', hour='13', minute='10',
                       timezone="Asia/Shanghai")
     try:
         scheduler.start()