Browse Source

新增启动时间
新增中午休市时间

daniel-ali 2 years ago
parent
commit
fd22ce4621
2 changed files with 27 additions and 17 deletions
  1. 10 5
      QMT/qmt_real_hlfx.py
  2. 17 12
      QMT/real_time.py

+ 10 - 5
QMT/qmt_real_hlfx.py

@@ -109,7 +109,10 @@ def run(seq):
         if not client.is_connected():
             xtdata.unsubscribe_quote(seq)
             raise Exception('行情服务连接断开')
-        if dt.now() > now_date.replace(hour=15, minute=0, second=0):
+        if now_date.replace(hour=11, minute=30, second=0) < dt.now() < now_date.replace(hour=13, minute=0, second=0):
+            print(f'现在时间:{dt.now()},已休市')
+            return 0
+        elif dt.now() > now_date.replace(hour=15, minute=0, second=0):
             print(f'现在时间:{dt.now()},已收盘')
             return 0
     return
@@ -123,7 +126,7 @@ def hlfx(stock_list, data):
     engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8')
     results = []
     results.extend(pd.read_sql_query(
-        'select value from `%s`' % fre, engine_hlfx_pool).iloc[-1, 0].split(","))
+        'select value from `%s` order by `index` desc limit 10' % fre, engine_hlfx_pool).iloc[-1, 0].split(","))
     print(f'本次hlfx_pool有{len(results)}个个股')
 
     engine_stock = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8')
@@ -284,8 +287,8 @@ def bridge():
 
 def prepare(data):
     stock_list = list(data.keys())
-    cpu_count = 4
-    pool = mp.Pool(processes=cpu_count, maxtasksperchild=4)
+    cpu_count = 6
+    pool = mp.Pool(processes=cpu_count, maxtasksperchild=6)
     step = math.ceil(len(stock_list) / cpu_count)
     to_hlfx_list = []
 
@@ -302,7 +305,7 @@ def prepare(data):
 if __name__ == '__main__':
     mp.freeze_support()
     pus = psutil.Process()
-    pus.cpu_affinity([0, 1, 2, 3])
+    pus.cpu_affinity([0, 1, 2, 3, 4, 5])
 
     path = r'c:\\qmt\\userdata_mini'
     # 生成session id 整数类型 同时运行的策略不能重复
@@ -325,6 +328,8 @@ if __name__ == '__main__':
     scheduler = BlockingScheduler()
     scheduler.add_job(func=bridge, trigger='cron', day_of_week='0-4', hour='09', minute='25',
                       timezone="Asia/Shanghai")
+    scheduler.add_job(func=bridge, trigger='cron', day_of_week='0-4', hour='13', minute='00',
+                      timezone="Asia/Shanghai")
     try:
         scheduler.start()
     except (KeyboardInterrupt, SystemExit):

+ 17 - 12
QMT/real_time.py

@@ -100,7 +100,10 @@ def run(seq):
         if not client.is_connected():
             xtdata.unsubscribe_quote(seq)
             raise Exception('行情服务连接断开')
-        if dt.now() > now_date.replace(hour=15, minute=0, second=0):
+        if now_date.replace(hour=11, minute=30, second=0) < dt.now() < now_date.replace(hour=13, minute=0, second=0):
+            print(f'现在时间:{dt.now()},已休市')
+            return 0
+        elif dt.now() > now_date.replace(hour=15, minute=0, second=0):
             print(f'现在时间:{dt.now()},已收盘')
             return 0
     return
@@ -153,9 +156,9 @@ def his_vol(stock, num):
         return df['volume_front'].iloc[num]
 
 
-def ma_judge(data, stock_list, rate, results):
+def ma_judge(data, list_judge, rate, results):
     # print(f',收到的data数据为:{len(data.keys())},stock_pool长度为{len(stock_list)},now is {dt.now()}')
-    list_judge = list(set(data.keys()) & set(stock_list))
+
     print(f'这个ma_judge的PID为:{os.getpid()},本轮计算:{len(list_judge)}个股')
     for stock in list_judge:
         current_price, open_price = data[stock]['lastPrice'], data[stock]['open']
@@ -213,10 +216,10 @@ def buy_trader(data):
 
     try:
         stock_pool = pd.read_sql_query(
-            'select value from `%s`' % '1d', engine_hlfx_pool)
+            'select value from `%s` order by `index` desc limit 10' % '1d', engine_hlfx_pool)
         stock_pool = stock_pool.iloc[-1, 0].split(",")
         stock_pool.sort()
-        print('stock_pool',len(stock_pool))
+        print('stock_pool', len(stock_pool))
     except BaseException:
         pass
     '''
@@ -232,12 +235,13 @@ def buy_trader(data):
                     results.append(stock.replace('SH', 'XSHG').replace('SZ', 'XSHE'))
                     print('append')
     '''
-
-    step = math.ceil(len(stock_pool) / (mp.cpu_count()/2))
+    list_judge = list(set(data.keys()) & set(stock_pool))
+    print(f'本轮有{len(data.keys())}条个股信息,而list_judge有:{len(list_judge)}')
+    step = math.ceil(len(list_judge) / 2)
     print('step:', step)
     rate = 0.8
-    for i in range(0, len(stock_pool), step):
-        p = mp.Process(target=ma_judge, args=(data, stock_pool[i:i + step], rate, results))
+    for i in range(0, len(list_judge), step):
+        p = mp.Process(target=ma_judge, args=(data, list_judge[i:i + step], rate, results))
         mp_list.append(p)
         p.start()
     for j in mp_list:
@@ -311,14 +315,13 @@ def trader(data):
         sell_trader(data)
 
     # 买入条件
-    buy_trader(data, positions)
+    buy_trader(data)
 
 
 def bridge():
     print(f'MyPid is {os.getpid()}, now is {dt.now()},开盘了')
     stocks = xtdata.get_stock_list_in_sector('沪深A股')
     seq = xtdata.subscribe_whole_quote(stocks, callback=trader)
-    time.sleep(100000)
     if run(seq) == 0:
         xtdata.unsubscribe_quote(seq)
         print(f'{dt.now()},收盘了,收工!')
@@ -330,7 +333,7 @@ if __name__ == '__main__':
     mp.freeze_support()
     print('cpu_count =', mp.cpu_count())
     pus = psutil.Process()
-    pus.cpu_affinity([4, 5, 6, 7])
+    pus.cpu_affinity([6, 7])
 
     # 指定客户端所在路径
     path = r'c:\\qmt\\userdata_mini'
@@ -354,6 +357,8 @@ if __name__ == '__main__':
     scheduler = BlockingScheduler()
     scheduler.add_job(func=bridge, trigger='cron', day_of_week='0-4', hour='09', minute='40',
                       timezone="Asia/Shanghai")
+    scheduler.add_job(func=bridge, trigger='cron', day_of_week='0-4', hour='13', minute='00',
+                      timezone="Asia/Shanghai")
     try:
         scheduler.start()
     except (KeyboardInterrupt, SystemExit):