瀏覽代碼

限制进程数量

Daniel 2 年之前
父節點
當前提交
95a59f92fc
共有 1 個文件被更改,包括 42 次插入67 次删除
  1. 42 67
      QMT/qmt_real_hlfx.py

+ 42 - 67
QMT/qmt_real_hlfx.py

@@ -96,17 +96,26 @@ def err_call_back(err):
     traceback.print_exc()
 
 
-def hlfx(data, stocks, pool_list):
-    print(f'MyPid is {os.getpid()}, now is {dt.now()}')
+def hlfx(data):
+    stock_list = list(data.keys())
+    print(f'def-->hlfx, MyPid is {os.getpid()}, 本次我需要计算{len(stock_list)},now is {dt.now()}')
+
+    # 获得hlfx_pool池子
+    engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8')
+    results = []
+    results.extend(pd.read_sql_query(
+        'select value from `%s`' % fre, engine_hlfx_pool).iloc[-1, 0].split(","))
+    print(f'本次hlfx_pool有{len(results)}个个股')
+
     engine_stock = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8')
-    print(stocks)
-    for qmt_stock in stocks:
+
+    for qmt_stock in stock_list:
         # 读取qmt_stocks_whole表-前复权-信息
         try:
             df_day = pd.read_sql_query(
                 'select time, open_front, close_front, high_front, low_front, volume_front, amount_front, dif, dea, macd,HL from `%s_%s`'
                 % (qmt_stock, fre), engine_stock)
-            df_day.columns=['time', 'open', 'close', 'high', 'low', 'volume', 'amount', 'dif', 'dea', 'macd', 'HL']
+            df_day.columns = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount', 'dif', 'dea', 'macd', 'HL']
         except BaseException as e:
             print(qmt_stock, '未能读取!', e)
             pass
@@ -125,8 +134,9 @@ def hlfx(data, stocks, pool_list):
                         and df_day.iloc[-1, 4] < get_price['low']):
                 # print('lalallala', get_price['open'], get_price['lastPrice'], get_price['high'],
                 #                             get_price['low'], get_price['volume'], get_price['amount'])
-                qmt_df = pd.DataFrame(data=[[get_price['time'], get_price['open'], get_price['lastPrice'], get_price['high'],
-                                            get_price['low'], get_price['volume'], get_price['amount']]],
+                qmt_df = pd.DataFrame(data=[[get_price['time'], get_price['open'], get_price['lastPrice'],
+                                             get_price['high'], get_price['low'], get_price['volume'],
+                                             get_price['amount']]],
                                       columns=['time', 'open', 'close', 'high', 'low', 'volume', 'amount'])
                 # print('qmt_______', qmt_df)
                 df_day = pd.concat([df_day, qmt_df], ignore_index=True)
@@ -175,12 +185,12 @@ def hlfx(data, stocks, pool_list):
                             if m_macd_dif < x_macd_dif:
                                 df_day.loc[x, 'HL'] = 'LL'
                                 # 产生信号,进入hlfx_pool
-                                pool_list.append(qmt_stock)
+                                results.append(qmt_stock)
                             # 前一个为底更高,且中间不存在更低的底
                             else:
                                 df_day.loc[x, 'HL'] = 'L'
                                 # 产生信号,进入hlfx_pool
-                                pool_list.append(qmt_stock)
+                                results.append(qmt_stock)
                             break
                         break
                     m = m - 1
@@ -190,7 +200,7 @@ def hlfx(data, stocks, pool_list):
             # 顶
 
             elif (df_day.loc[x, 'high'] < df_day.loc[x - 1, 'high']) and (
-                    df_day.loc[x - 2, 'high'] < df_day.loc[x - 1, 'high']) and (qmt_stock in pool_list):
+                    df_day.loc[x - 2, 'high'] < df_day.loc[x - 1, 'high']) and (qmt_stock in results):
                 df_day.loc[x, 'HL'] = 'H*'
                 while m:
                     if df_day.loc[m, 'HL'] in ['L', 'LL', 'L*']:
@@ -198,7 +208,7 @@ def hlfx(data, stocks, pool_list):
                             # 成笔->H
                             df_day.loc[x, 'HL'] = 'H'
                             # 产生卖出信号,进入hlfx_pool
-                            pool_list.remove(qmt_stock)
+                            results.remove(qmt_stock)
                             break
 
                     elif (df_day.loc[m, 'HL'] == 'H'):
@@ -213,13 +223,13 @@ def hlfx(data, stocks, pool_list):
                             if x_macd_dif < m_macd_dif:
                                 df_day.loc[x, 'HL'] = 'HH'
                                 # 产生卖出信号,进入hlfx_pool
-                                pool_list.remove(qmt_stock)
+                                results.remove(qmt_stock)
 
                             # 前一个为顶,且中间存在不包含 or 更高的顶
                             else:
                                 df_day.loc[x, 'HL'] = 'H'
                                 # 产生卖出信号,进入hlfx_pool
-                                pool_list.remove(qmt_stock)
+                                results.remove(qmt_stock)
 
                             break
                         break
@@ -228,61 +238,12 @@ def hlfx(data, stocks, pool_list):
                         df_day.loc[x, 'HL'] = 'H'
     engine_stock.dispose()
 
-
-
-def bridge(data):
-    # 连接数据库
-    '''
-    db = pymysql.connect(host='localhost',
-                         user='root',
-                         port=3307,
-                         password='r6kEwqWU9!v3',
-                         database='hlfx')
-    cursor = db.cursor()
-    cursor.execute("show tables like '%%%s%%' " % fre)
-    pool_list = [tuple[0] for tuple in cursor.fetchall()]
-    print('取得 table_list %s' % fre)
-    '''
-    '''
-    
-    1.获取hlfx_pool中隔夜的标的
-    2.将本此的data均分,给到进程池
-    3.将data总数据、分配的任务stocklist、hlfx_pool 送入realtime_hlfx中进行计算
-    4.将实时刷新的hlfx存入hlfx_pool 以过滤出现顶分型的标的
-    
-    '''
-
-    # 获得hlfx_pool池子
-    engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8')
-    results = mp.Manager().list()
-    results.extend(pd.read_sql_query(
-        'select value from `%s`' % fre, engine_hlfx_pool).iloc[-1, 0].split(","))
-    print(results)
-
-    to_hlfx_list = []
-    keys = list(data.keys())
-    print(len(keys))
-    cpu_count = 4
-    step = math.ceil(len(keys) / cpu_count)
-    for i in range(0, len(keys), step):
-        to_hlfx_list.append([x for x in keys[i:i+step]])
-
-    pool = mp.Pool(processes=cpu_count)
-    for m in range(cpu_count):
-        pool.apply_async(func=hlfx,
-                         args=(data, to_hlfx_list[m], results,), error_callback=err_call_back)
-    pool.close()
-    pool.join()
-
-
-
     db_pool = pymysql.connect(host='localhost',
                               user='root',
                               port=3307,
                               password='r6kEwqWU9!v3',
                               database='hlfx_pool')
     cursor_pool = db_pool.cursor()
-
     print(set(results))
     results_list = ','.join(set(results))
     sql = "INSERT INTO %s (date,value) VALUES('%s','%s')" % (fre, dt.now().strftime('%Y-%m-%d %H:%M:%S'), results_list)
@@ -290,8 +251,12 @@ def bridge(data):
     db_pool.commit()
     print(f'{dt.now()}写入新的results,hlfx_pool更新')
     engine_hlfx_pool.dispose()
-    # hlfx(data, engine_stock, engine_hlfx)
-    pass
+
+
+def bridge(list):
+    print(f'MyPid is {os.getpid()}, now is {dt.now()},我需要负责{len(list)}个个股数据')
+    xtdata.subscribe_whole_quote(list, callback=hlfx)
+    xtdata.run()
 
 
 def prepare():
@@ -328,9 +293,19 @@ if __name__ == '__main__':
     # 对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功
     subscribe_result = xt_trader.subscribe(acc)
     print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)
-
     stocks = xtdata.get_stock_list_in_sector('沪深A股')
-    xtdata.subscribe_whole_quote(stocks, callback=bridge)
 
-    xtdata.run()
+    cpu_count = 4
+    pool = mp.Pool(processes=cpu_count, maxtasksperchild=8)
+    step = math.ceil(len(stocks) / cpu_count)
+    to_hlfx_list = []
+
+    for i in range(0, len(stocks), step):
+        to_hlfx_list.append([x for x in stocks[i:i+step]])
+
+    for m in range(cpu_count):
+        pool.apply_async(func=bridge,
+                         args=(to_hlfx_list[m],), error_callback=err_call_back)
+    pool.close()
+    pool.join()