فهرست منبع

处理df数据异常情况,并避免inf写入MySQL

Daniel 2 سال پیش
والد
کامیت
2481fd2960
1فایلهای تغییر یافته به همراه45 افزوده شده و 23 حذف شده
  1. 45 23
      QMT/qmt_get_indicators.py

+ 45 - 23
QMT/qmt_get_indicators.py

@@ -1,5 +1,6 @@
 # coding:utf-8
 # coding:utf-8
 from datetime import datetime as dt
 from datetime import datetime as dt
+import numpy as np
 import os
 import os
 import pandas as pd
 import pandas as pd
 import time
 import time
@@ -11,6 +12,7 @@ import math
 import talib as ta
 import talib as ta
 from xtquant import xtdata
 from xtquant import xtdata
 import os
 import os
+import traceback
 
 
 pd.set_option('display.max_columns', None)  # 设置显示最大行
 pd.set_option('display.max_columns', None)  # 设置显示最大行
 engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8')
 engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8')
@@ -18,6 +20,7 @@ engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_sto
 
 
 def err_call_back(err):
 def err_call_back(err):
     print(f'出错啦~ error:{str(err)}')
     print(f'出错啦~ error:{str(err)}')
+    traceback.print_exc()
 
 
 
 
 def myself_kdj(df):
 def myself_kdj(df):
@@ -189,42 +192,56 @@ def get_hlfx(data):
     return df_temp, Trading_signals
     return df_temp, Trading_signals
 
 
 
 
-def tech_anal(stocks, hlfx_pool):
-    print(f'{dt.now()}开始循环计算! MyPid is {os.getpid()}')
+def tech_anal(stocks, hlfx_pool,err_list):
+    print(f'{dt.now()}开始循环计算! MyPid is {os.getpid()},池子长度为{len(stocks)}')
+
     engine_tech = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8')
     engine_tech = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8')
     m = 0
     m = 0
 
 
     for stock in stocks:
     for stock in stocks:
+        # print(stock)
         try:
         try:
             df = pd.read_sql_table('%s_1d' % stock, con=engine)
             df = pd.read_sql_table('%s_1d' % stock, con=engine)
+            df.dropna(axis=0, how='any')
         except BaseException:
         except BaseException:
-            print(stock)
+            print(f'{stock}读取有问题')
+            traceback.print_exc()
             pass
             pass
         else:
         else:
-            get_macd_data(df)
-            get_ris(df)
-            get_bias(df)
-            get_wilr(df)
-            df_temp, T_signals = get_hlfx(df)
-            df = pd.merge(df, df_temp, on='time', how='left')
-            df['HL'].fillna(value='-', inplace=True)
-            df = df.reset_index(drop=True)
+            if len(df) != 0:
+                try:
+                    get_macd_data(df)
+                    get_ris(df)
+                    get_bias(df)
+                    get_wilr(df)
+                    df_temp, T_signals = get_hlfx(df)
+                    df = pd.merge(df, df_temp, on='time', how='left')
+                    df['HL'].fillna(value='-', inplace=True)
+                    df = df.reset_index(drop=True)
+                    # print(stock, '\n', df[['open_front', 'HL']])
+                    df = df.replace([np.inf, -np.inf], np.nan)
+                    df.to_sql('%s_1d' % stock, con=engine_tech, index=False, if_exists='replace')
+                # with engine.connect() as con:
+                #     con.execute("ALTER TABLE `%s_1d` ADD PRIMARY KEY (`time`);" % stock)
+                except BaseException:
+                    print(f'{stock}存储有问题')
+                    traceback.print_exc()
+                    err_list.append(stock)
+                    pass
+                else:
+                    # print(f"{stock} 成功!")
+                    m += 1
+            else:
+                err_list.append(stock)
+                print(f'{stock}数据为空')
+
             if stock in hlfx_pool and T_signals == 2:
             if stock in hlfx_pool and T_signals == 2:
                 hlfx_pool.remove(stock)
                 hlfx_pool.remove(stock)
             elif stock not in hlfx_pool and T_signals == 1:
             elif stock not in hlfx_pool and T_signals == 1:
                 hlfx_pool.append(stock)
                 hlfx_pool.append(stock)
-            try:
-                df.to_sql('%s_1d' % stock, con=engine_tech, index=False, if_exists='replace')
-            # with engine.connect() as con:
-            #     con.execute("ALTER TABLE `%s_1d` ADD PRIMARY KEY (`time`);" % stock)
-            except BaseException:
-                print(stock)
-                pass
-            else:
-                print(f"{stock} 成功!")
-                m += 1
 
 
-    # print(f'Pid:{os.getpid()}已经完工了,应处理{len(stocks)},共计算{m}支个股')
+
+    print(f'Pid:{os.getpid()}已经完工了,应处理{len(stocks)},共计算{m}支个股')
 
 
 
 
 if __name__ == '__main__':
 if __name__ == '__main__':
@@ -232,7 +249,9 @@ if __name__ == '__main__':
 
 
     stocks = xtdata.get_stock_list_in_sector('沪深A股')
     stocks = xtdata.get_stock_list_in_sector('沪深A股')
     print(len(stocks))
     print(len(stocks))
+    stocks.sort()
 
 
+    err_list = mp.Manager().list()
     fre = '1d'
     fre = '1d'
     engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8')
     engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8')
     hlfx_pool = mp.Manager().list()
     hlfx_pool = mp.Manager().list()
@@ -243,15 +262,17 @@ if __name__ == '__main__':
     step = math.ceil(len(stocks) / mp.cpu_count())
     step = math.ceil(len(stocks) / mp.cpu_count())
     # step = 10000
     # step = 10000
     x = 1
     x = 1
+    # tech_anal(stocks, hlfx_pool)
     for i in range(0, len(stocks), step):
     for i in range(0, len(stocks), step):
         print(x)
         print(x)
-        pool.apply_async(func=tech_anal, args=(stocks[i:i + step], hlfx_pool, ), error_callback=err_call_back)
+        pool.apply_async(func=tech_anal, args=(stocks[i:i + step], hlfx_pool, err_list,), error_callback=err_call_back)
         x += 1
         x += 1
     time.sleep(5)
     time.sleep(5)
     pool.close()
     pool.close()
     pool.join()
     pool.join()
 
 
     print(hlfx_pool)
     print(hlfx_pool)
+    print(len(err_list, err_list))
 
 
     # 存档入库
     # 存档入库
     db_pool = pymysql.connect(host='localhost',
     db_pool = pymysql.connect(host='localhost',
@@ -267,4 +288,5 @@ if __name__ == '__main__':
     db_pool.commit()
     db_pool.commit()
     edtime = dt.now()
     edtime = dt.now()
 
 
+
     print(edtime - sttime)
     print(edtime - sttime)