Explorar el Código

修改mysql链接方式

daniel hace 1 año
padre
commit
70d99a1df7
Se han modificado 1 ficheros con 9 adiciones y 8 borrados
  1. 9 8
      QMT/qmt_get_indicators.py

+ 9 - 8
QMT/qmt_get_indicators.py

@@ -17,8 +17,6 @@ from apscheduler.schedulers.blocking import BlockingScheduler
 import psutil
 
 pd.set_option('display.max_columns', None)  # 设置显示最大行
-engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8',
-                       pool_size=4000, pool_recycle=3600, max_overflow=1000, pool_timeout=60)
 
 
 def err_call_back(err):
@@ -57,7 +55,6 @@ def get_macd_data(data, short=0, long1=0, mid=0):
 
 
 # rsi指标
-# 建议用talib库的RSI方法,亲测有用
 def get_ris(data):
     data["rsi_6"] = ta.RSI(data['close_back'], timeperiod=6)
     data["rsi_12"] = ta.RSI(data['close_back'], timeperiod=12)
@@ -225,6 +222,8 @@ def tech_anal(stocks, hlfx_pool, hlfx_pool_daily, err_list):
     m = 0
 
     for stock in stocks:
+        engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8',
+                               pool_size=5000, pool_recycle=7200, max_overflow=1000, pool_timeout=60)
         engine_tech = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8',
                                     pool_size=4000, pool_recycle=3600, max_overflow=1000, pool_timeout=60)
         # print(stock)
@@ -236,6 +235,8 @@ def tech_anal(stocks, hlfx_pool, hlfx_pool_daily, err_list):
             traceback.print_exc()
             pass
         else:
+            engine.dispose()
+
             if len(df) != 0:
                 try:
                     get_macd_data(df)
@@ -248,7 +249,7 @@ def tech_anal(stocks, hlfx_pool, hlfx_pool_daily, err_list):
                     df = df.reset_index(drop=True)
                     # print(stock, '\n', df[['open_front', 'HL']])
                     df = df.replace([np.inf, -np.inf], np.nan)
-                    df.to_sql('%s_1d' % stock, con=engine_tech, index=False, if_exists='replace')
+                    df.to_sql('%s_1d' % stock, con=engine_tech.connect(), index=False, if_exists='replace')
                 # with engine.connect() as con:
                 #     con.execute("ALTER TABLE `%s_1d` ADD PRIMARY KEY (`time`);" % stock)
                 except BaseException:
@@ -263,12 +264,14 @@ def tech_anal(stocks, hlfx_pool, hlfx_pool_daily, err_list):
                 err_list.append(stock)
                 print(f'{stock}数据为空')
 
+            engine_tech.dispose()
+
             if stock in hlfx_pool and T_signals == 2:
                 hlfx_pool.remove(stock)
             elif stock not in hlfx_pool and T_signals == 1:
                 hlfx_pool.append(stock)
                 hlfx_pool_daily.append(stock)
-        engine_tech.dispose()
+
 
 
     print(f'Pid:{os.getpid()}已经完工了,应处理{len(stocks)},共计算{m}支个股')
@@ -295,16 +298,14 @@ def ind():
     # pool = mp.Pool(processes=18)
     # step = math.ceil(len(stocks) / 12)
     # step = 10000
-    x = 1
     # tech_anal(stocks, hlfx_pool)
     for i in range(0, len(stocks), step):
-        print(x)
         pool.apply_async(func=tech_anal, args=(stocks[i:i + step], hlfx_pool, hlfx_pool_daily, err_list,),
                          error_callback=err_call_back)
-        x += 1
     time.sleep(5)
     pool.close()
     pool.join()
+    engine_hlfx_pool.dispose()
 
     print(f'当日信号:{len(hlfx_pool_daily)},持续检测为:{len(hlfx_pool)}')
     print(len(err_list), err_list)