Explorar o código

qmt-download history data 自动化入数据库,修正apscheduler报错警告,增加时区

Daniel %!s(int64=2) %!d(string=hai) anos
pai
achega
b6ff6b5e41
Modificáronse 1 ficheiros con 18 adicións e 36 borrados
  1. 18 36
      QMT/download_history_data2.py

+ 18 - 36
QMT/download_history_data2.py

@@ -1,47 +1,45 @@
-from xtquant import xtdata, xttrader
+from xtquant import xtdata
 from datetime import datetime as dt
 import pandas as pd
 import math
 from sqlalchemy import create_engine
 import multiprocessing as mp
 from apscheduler.schedulers.blocking import BlockingScheduler
+
 # pd.set_option('display.max_rows', None) # 设置显示最大行
 
 
 path = 'C:\\qmt\\userdata_mini'
 
-
 field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']
 cpu_count = mp.cpu_count()
 
 
-def to_sql(stock_list, engine):
+def to_sql(stock_list, eng):
     print(dt.now(), '开始循环入库!')
     for stock in stock_list:
         print(stock)
         data = xtdata.get_market_data(field, [stock], '1d', end_time='', count=-1, dividend_type='back')
-        df = pd.concat([data[i].loc[stock].T for i in ['time', 'open', 'high', 'low', 'close', 'volume', 'amount']], axis=1)
+        df = pd.concat([data[i].loc[stock].T for i in ['time', 'open', 'high', 'low', 'close', 'volume', 'amount']],
+                       axis=1)
         df.columns = ['time', 'open', 'high', 'low', 'close', 'volume', 'amount']
         df['time'] = df['time'].apply(lambda x: dt.fromtimestamp(x / 1000.0))
         df.reset_index(drop=True, inplace=True)
         print(df)
-        df.to_sql('%s_1d' %stock, con=engine, index=True, if_exists='append')
-
-        # with engine.connect() as con:
-        #     con.execute("ALTER TABLE `%s_1d` ADD PRIMARY KEY (`time`);" %stock)
-        # exit()
+        df.to_sql('%s_1d' % stock, con=eng, index=True, if_exists='append')
 
 
-def to_df(key,valus,engine):
-    print('to_df')
-    pass
-def download_data(stocks, engine):
+def download_data(stock_list, eng):
     print(dt.now(), '开始下载!')
-    xtdata.download_history_data2(stock_list=stocks, period='1d', start_time='', end_time='')
+    xtdata.download_history_data2(stock_list=stock_list, period='1d', start_time='', end_time='')
     print(dt.now(), '下载完成,准备入库!')
-    to_sql(stocks, engine)
+    to_sql(stock_list, eng)
+
+
+# def to_df(key, values, engine):
+#     print('to_df')
+#     pass
 
-sttime = dt.now()
 
 if __name__ == '__main__':
     stocks = xtdata.get_stock_list_in_sector('沪深A股')
@@ -51,26 +49,10 @@ if __name__ == '__main__':
     step = math.ceil(len(stocks) / cpu_count)
     engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks?charset=utf8')
 
-
-    # num = 1
-    sched = BlockingScheduler()
-    sched.add_job(func=download_data, trigger='cron', hour='22', minute='45', args=[stocks, engine])
+    scheduler = BlockingScheduler()
+    scheduler.add_job(func=download_data, trigger='cron', hour='15', minute='45', args=[stocks, engine],
+                      timezone="Asia/Shanghai")
     try:
-        sched.start()
+        scheduler.start()
     except (KeyboardInterrupt, SystemExit):
         pass
-    print(dt.now() - sttime, '更新完成,准备入库!')
-    # to_sql(stocks, engine)
-    # if sched.add_job(download_data, 'cron', hour=22,minute=00):
-    #     print(dt.now() - sttime, '更新完成,准备入库!' )
-    #     to_sql(stocks, engine)
-        # p_list = []
-        # # data = xtdata.get_market_data(field, stocks, '1d', end_time='', count=-1, dividend_type='back')
-        # for i in range(0, len(stocks), step):
-        #     p = mp.Process(target=to_sql, args=(stocks[i:], engine,))
-        #     p.start()
-        #     p_list.append(p)
-        # for m in p_list:
-        #     m.join()
-        #
-        # exit()