Pārlūkot izejas kodu

修改订阅callback后的分配计算问题
修正部分hlfx中break位置,减小计算量

Daniel 2 gadi atpakaļ
vecāks
revīzija
348f823444
1 mainītis faili ar 42 papildinājumiem un 43 dzēšanām
  1. 42 43
      QMT/qmt_real_hlfx.py

+ 42 - 43
QMT/qmt_real_hlfx.py

@@ -29,22 +29,6 @@ pd.set_option('display.max_columns', None) # 设置显示最大行
 fre = '1d'
 
 
-def run(seq):
-    print('seq=', seq)
-    '''阻塞线程接收行情回调'''
-    import time
-    client = xtdata.get_client()
-    while True:
-        time.sleep(3)
-        now_date = dt.now()
-        if not client.is_connected() or dt.now() > now_date.replace(hour=15, minute=0, second=0):
-            xtdata.unsubscribe_quote(seq)
-            print(f'现在时间:{dt.now()},已收盘')
-            raise Exception('行情服务连接断开')
-            break
-    return
-
-
 class MyXtQuantTraderCallback(XtQuantTraderCallback):
     def on_disconnected(self):
         """
@@ -115,8 +99,24 @@ def err_call_back(err):
     traceback.print_exc()
 
 
-def hlfx(data):
-    stock_list = list(data.keys())
+def run(seq):
+    """阻塞线程接收行情回调"""
+    import time
+    client = xtdata.get_client()
+    while True:
+        time.sleep(3)
+        now_date = dt.now()
+        if not client.is_connected():
+            xtdata.unsubscribe_quote(seq)
+            raise Exception('行情服务连接断开')
+        if dt.now() > now_date.replace(hour=15, minute=0, second=0):
+            print(f'现在时间:{dt.now()},已收盘')
+            return 0
+    return
+
+
+def hlfx(stock_list, data):
+    # stock_list = list(data.keys())
     print(f'def-->hlfx, MyPid is {os.getpid()}, 本次我需要计算{len(stock_list)},now is {dt.now()}')
 
     # 获得hlfx_pool池子
@@ -132,8 +132,8 @@ def hlfx(data):
         # 读取qmt_stocks_whole表-前复权-信息
         try:
             df_day = pd.read_sql_query(
-                'select time, open_front, close_front, high_front, low_front, volume_front, amount_front, dif, dea, macd,HL from `%s_%s`'
-                % (qmt_stock, fre), engine_stock)
+                'select time, open_front, close_front, high_front, low_front, volume_front, amount_front, '
+                'dif, dea, macd, HL from `%s_%s`' % (qmt_stock, fre), engine_stock)
             df_day.columns = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount', 'dif', 'dea', 'macd', 'HL']
         except BaseException as e:
             print(qmt_stock, '未能读取!', e)
@@ -189,8 +189,9 @@ def hlfx(data):
                         if (x - m) > 3:
                             # 成笔——>L
                             df_day.loc[x, 'HL'] = 'L'
+                            break
 
-                    elif df_day.loc[m, 'HL'] == 'L':
+                    elif df_day.loc[m, 'HL'] in ['L', 'LL', 'L*']:
                         if df_day.loc[m - 1, 'low'] > df_day.loc[x - 1, 'low']:
                             # pool_list.append(qmt_stock)
 
@@ -205,16 +206,16 @@ def hlfx(data):
                                 df_day.loc[x, 'HL'] = 'LL'
                                 # 产生信号,进入hlfx_pool
                                 results.append(qmt_stock)
+                                break
                             # 前一个为底更高,且中间不存在更低的底
                             else:
                                 df_day.loc[x, 'HL'] = 'L'
                                 # 产生信号,进入hlfx_pool
-                                results.append(qmt_stock)
-                            break
                         break
                     m = m - 1
                     if m == 0:
                         df_day.loc[x, 'HL'] = 'L'
+                        results.append(qmt_stock)
 
             # 顶
 
@@ -230,7 +231,7 @@ def hlfx(data):
                             results.remove(qmt_stock)
                             break
 
-                    elif (df_day.loc[m, 'HL'] == 'H'):
+                    elif df_day.loc[m, 'HL'] in ['H','HH', 'H*']:
                         if df_day.loc[x - 1, 'high'] > df_day.loc[m - 1, 'high']:
                             # 获得MACD,判断MACD判断背驰
                             x_macd_dif, x_macd_dea, x_macd_macd = df_day.loc[x, 'dif'], df_day.loc[x, 'dea'], \
@@ -243,18 +244,18 @@ def hlfx(data):
                                 df_day.loc[x, 'HL'] = 'HH'
                                 # 产生卖出信号,进入hlfx_pool
                                 results.remove(qmt_stock)
+                                break
 
                             # 前一个为顶,且中间存在不包含 or 更高的顶
                             else:
                                 df_day.loc[x, 'HL'] = 'H'
                                 # 产生卖出信号,进入hlfx_pool
                                 results.remove(qmt_stock)
-
-                            break
                         break
                     m = m - 1
                     if m == 0:
                         df_day.loc[x, 'HL'] = 'H'
+                        results.remove(qmt_stock)
     engine_stock.dispose()
 
     db_pool = pymysql.connect(host='localhost',
@@ -269,33 +270,31 @@ def hlfx(data):
     cursor_pool.execute(sql)
     db_pool.commit()
     print(f'{dt.now()}写入新的results{len(results_list)}个,hlfx_pool更新')
-    del df_day
-    gc.collect()
     engine_hlfx_pool.dispose()
 
 
-
-
-def bridge(list):
-    print(f'MyPid is {os.getpid()}, now is {dt.now()},我需要负责{len(list)}个个股数据')
-    seq = xtdata.subscribe_whole_quote(list, callback=hlfx)
-    run(seq)
-
-
-def prepare():
+def bridge():
+    print(f'MyPid is {os.getpid()}, now is {dt.now()},开盘了')
     stocks = xtdata.get_stock_list_in_sector('沪深A股')
+    seq = xtdata.subscribe_whole_quote(stocks, callback=prepare)
+    if run(seq) == 0:
+        xtdata.unsubscribe_quote(seq)
+        print(f'{dt.now()},收盘了,收工!')
 
+
+def prepare(data):
+    stock_list = list(data.keys())
     cpu_count = 4
     pool = mp.Pool(processes=cpu_count, maxtasksperchild=4)
-    step = math.ceil(len(stocks) / cpu_count)
+    step = math.ceil(len(stock_list) / cpu_count)
     to_hlfx_list = []
 
-    for i in range(0, len(stocks), step):
-        to_hlfx_list.append([x for x in stocks[i:i + step]])
+    for i in range(0, len(stock_list), step):
+        to_hlfx_list.append([x for x in stock_list[i:i + step]])
 
     for m in range(cpu_count):
-        pool.apply_async(func=bridge,
-                         args=(to_hlfx_list[m],), error_callback=err_call_back)
+        pool.apply_async(func=hlfx,
+                         args=(to_hlfx_list[m], data), error_callback=err_call_back)
     pool.close()
     pool.join()
 
@@ -324,7 +323,7 @@ if __name__ == '__main__':
     print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)
 
     scheduler = BlockingScheduler()
-    scheduler.add_job(func=prepare, trigger='cron', day_of_week='0-4', hour='14', minute='51',
+    scheduler.add_job(func=bridge, trigger='cron', day_of_week='0-4', hour='09', minute='25',
                       timezone="Asia/Shanghai")
     try:
         scheduler.start()