Browse Source

修正自动启动和暂停时间

Daniel 2 years ago
parent
commit
449b84eb5f
2 changed files with 34 additions and 26 deletions
  1. 15 8
      QMT/qmt_real_hlfx.py
  2. 19 18
      QMT/real_time.py

+ 15 - 8
QMT/qmt_real_hlfx.py

@@ -17,6 +17,7 @@ import os
 import psutil
 import traceback
 from apscheduler.schedulers.blocking import BlockingScheduler
+import sys
 import gc
 
 # 原始版本
@@ -111,10 +112,12 @@ def run(seq):
             raise Exception('行情服务连接断开')
         if now_date.replace(hour=11, minute=30, second=0) < dt.now() < now_date.replace(hour=13, minute=0, second=0):
             print(f'现在时间:{dt.now()},已休市')
-            return 0
+            break
+            # return 0
         elif dt.now() > now_date.replace(hour=15, minute=0, second=0):
             print(f'现在时间:{dt.now()},已收盘')
-            return 0
+            break
+            # return 0
     return
 
 
@@ -277,15 +280,16 @@ def hlfx(stock_list, data):
 
 
 def bridge():
-    print(f'MyPid is {os.getpid()}, now is {dt.now()},开盘了')
+    print(f'bridge is {os.getpid()}, now is {dt.now()},开盘了')
     stocks = xtdata.get_stock_list_in_sector('沪深A股')
     seq = xtdata.subscribe_whole_quote(stocks, callback=prepare)
-    if run(seq) == 0:
-        xtdata.unsubscribe_quote(seq)
-        print(f'{dt.now()},收盘了,收工!')
+    run(seq)
+    xtdata.unsubscribe_quote(seq)
+    print(f'{dt.now()},收盘了,收工!')
 
 
 def prepare(data):
+    print(f'prepare is {os.getpid()}, now is {dt.now()},开盘了')
     stock_list = list(data.keys())
     cpu_count = 6
     pool = mp.Pool(processes=cpu_count, maxtasksperchild=6)
@@ -299,10 +303,13 @@ def prepare(data):
         pool.apply_async(func=hlfx,
                          args=(to_hlfx_list[m], data), error_callback=err_call_back)
     pool.close()
+    time.sleep(8000)
+    pool.terminate()
     pool.join()
 
 
 if __name__ == '__main__':
+    print(f'总进程pid:{os.getpid()}')
     mp.freeze_support()
     pus = psutil.Process()
     pus.cpu_affinity([0, 1, 2, 3, 4, 5])
@@ -326,9 +333,9 @@ if __name__ == '__main__':
     print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)
 
     scheduler = BlockingScheduler()
-    scheduler.add_job(func=bridge, trigger='cron', day_of_week='0-4', hour='09', minute='25',
+    scheduler.add_job(func=bridge, trigger='cron', day_of_week='0-4', hour='9', minute='25',
                       timezone="Asia/Shanghai")
-    scheduler.add_job(func=bridge, trigger='cron', day_of_week='0-4', hour='14', minute='43',
+    scheduler.add_job(func=bridge, trigger='cron', day_of_week='0-4', hour='13', minute='00',
                       timezone="Asia/Shanghai")
     try:
         scheduler.start()

+ 19 - 18
QMT/real_time.py

@@ -13,6 +13,7 @@ import multiprocessing as mp
 import math
 import psutil
 from apscheduler.schedulers.blocking import BlockingScheduler
+import sys
 import gc
 
 # auth('18616891214', 'Ea?*7f68nD.dafcW34d!')
@@ -96,10 +97,12 @@ def run(seq):
             raise Exception('行情服务连接断开')
         if now_date.replace(hour=11, minute=30, second=0) < dt.now() < now_date.replace(hour=13, minute=0, second=0):
             print(f'现在时间:{dt.now()},已休市')
-            return 0
-        elif dt.now() > now_date.replace(hour=16, minute=0, second=0):
+            break
+            # return 0
+        elif dt.now() > now_date.replace(hour=15, minute=0, second=0):
             print(f'现在时间:{dt.now()},已收盘')
-            return 0
+            break
+            # return 0
     return
 
 
@@ -121,7 +124,7 @@ def ma(stock, num, data):
     except:
         return 9999999
     else:
-        ma_num = (sum(df['close_front'][i:]) + data[stock]['lastPrice'])/num
+        ma_num = (sum(df['close_front'][i:]) + data[stock]['lastPrice']) / num
         return ma_num
 
 
@@ -156,13 +159,14 @@ def ma_judge(data, list_judge, rate, results):
     print(f'这个ma_judge的PID为:{os.getpid()},本轮计算:{len(list_judge)}个股')
     for stock in list_judge:
         current_price, open_price = data[stock]['lastPrice'], data[stock]['open']
-        MA5, MA10, MA20, MA30, MA60, MA120 = ma(stock, 5, data), ma(stock, 10, data), ma(stock, 20, data), ma(stock, 30, data),\
+        MA5, MA10, MA20, MA30, MA60, MA120 = ma(stock, 5, data), ma(stock, 10, data), ma(stock, 20, data), ma(stock, 30,
+                                                                                                              data), \
             ma(stock, 60, data), ma(stock, 120, data)
         MA5_1 = ma_1(stock, 5)
         # print(i, current_price, open_price, MA5, MA10, MA20, MA5_1)
         # 入交易池标准:阳线\大于MA5\MA5向上\MA20<MA10\离120线有距离
         if (current_price > open_price) & (current_price > MA5) & (MA5 > MA5_1) & (current_price < MA5 * 1.03) & (
-                MA20 < MA10) & (current_price > MA120 or current_price < MA120*rate):
+                MA20 < MA10) & (current_price > MA120 or current_price < MA120 * rate):
             if his_vol(stock, -1) > his_vol(stock, -2):
                 results.append(stock.replace('SH', 'XSHG').replace('SZ', 'XSHE'))
 
@@ -177,7 +181,8 @@ def sell_trader(data):
     positions = xt_trader.query_stock_positions(acc)
     positions_dict = {positions[x].stock_code: positions[x].can_use_volume for x in range(0, len(positions))}
     print(f'今日可卖出个股总数:{len([value for value in positions_dict.values() if value != 0])}')
-    print(f'目前持仓总数为:{len([positions[x].stock_code for x in range(0, len(positions)) if positions[x].volume  != 0])}')
+    print(
+        f'目前持仓总数为:{len([positions[x].stock_code for x in range(0, len(positions)) if positions[x].volume != 0])}')
 
     for stock, can_use_volume in positions_dict.items():
         if stock in data and can_use_volume != 0:
@@ -204,6 +209,7 @@ def sell_trader(data):
 
 def buy_trader(data):
     print('买入函数:', dt.now(), f'接受到{len(data.keys())}个个股')
+
     results = mp.Manager().list()
     mp_list = []
     engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8')
@@ -279,15 +285,12 @@ def buy_trader(data):
         cursor_pool.close()
         db_pool.close()
 
-
         # 取值交易
-
         keep_stocks = results_industry.split(",")
         new_keep_stock = [stock.replace('XSHG', 'SH').replace('XSHE', 'SZ') for stock in keep_stocks]
         print(f'new_keep_stock is:{len(new_keep_stock)}')
 
-
-        #进入购买程序
+        # 进入购买程序
         max_pos = 7
         for stock in new_keep_stock:
             positions = xt_trader.query_stock_positions(acc)
@@ -299,7 +302,7 @@ def buy_trader(data):
             current_price = data[stock]['lastPrice']
             current_high = data[stock]['high']
             if cash > 5000 and len(positions_dict) < max_pos and current_price > 9 \
-                    and current_price > (current_high*0.98):
+                    and current_price > (current_high * 0.98):
                 volume = int((cash / 3 / current_price) // 100 * 100)
                 print('买入信号!!!!!!', stock, volume, current_price)
                 order_id = xt_trader.order_stock(acc, stock, xtconstant.STOCK_BUY, volume, xtconstant.LATEST_PRICE,
@@ -316,7 +319,6 @@ def trader(data):
     positions = xt_trader.query_stock_positions(acc)
     if len(positions) != 0:
         sell_trader(data)
-
     # 买入条件
     buy_trader(data)
 
@@ -325,9 +327,9 @@ def bridge():
     print(f'MyPid is {os.getpid()}, now is {dt.now()},开盘了')
     stocks = xtdata.get_stock_list_in_sector('沪深A股')
     seq = xtdata.subscribe_whole_quote(stocks, callback=trader)
-    if run(seq) == 0:
-        xtdata.unsubscribe_quote(seq)
-        print(f'{dt.now()},收盘了,收工!')
+    run(seq)
+    xtdata.unsubscribe_quote(seq)
+    print(f'{dt.now()},收盘了,收工!')
 
 
 if __name__ == '__main__':
@@ -360,12 +362,11 @@ if __name__ == '__main__':
     scheduler = BlockingScheduler()
     scheduler.add_job(func=bridge, trigger='cron', day_of_week='0-4', hour='09', minute='40',
                       timezone="Asia/Shanghai")
-    scheduler.add_job(func=bridge, trigger='cron', day_of_week='0-4', hour='13', minute='00',
+    scheduler.add_job(func=bridge, trigger='cron', day_of_week='0-4', hour='13', minute='10',
                       timezone="Asia/Shanghai")
     try:
         scheduler.start()
     except (KeyboardInterrupt, SystemExit):
         pass
 
-
     # xtdata.subscribe_quote('000001.SZ', '1d', '', '', count=1, callback=MA)