"""Nightly job: download daily bars through xtquant and store them in MySQL.

A blocking APScheduler cron job fires at 22:45. Each run downloads the '1d'
history for every stock in the '沪深A股' sector and then writes one table per
stock, named ``<stock code>_1d``.
"""
import math
import multiprocessing as mp
import os
from datetime import datetime as dt

import pandas as pd
from apscheduler.schedulers.blocking import BlockingScheduler
from sqlalchemy import create_engine
from xtquant import xtdata, xttrader

# pd.set_option('display.max_rows', None)  # show every row when printing a frame

# NOTE(review): `path` is not referenced by any code visible in this file.
path = 'C:\\qmt\\userdata_mini'
# Fields requested from xtdata.get_market_data().
field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']
cpu_count = mp.cpu_count()

# Column order of the stored tables (differs from the request order in `field`).
_BAR_COLUMNS = ['time', 'open', 'high', 'low', 'close', 'volume', 'amount']

# NOTE(security): credentials are embedded in source. The URL is kept as the
# default so existing deployments keep working; set QMT_DB_URL to override it
# and move the secret out of version control.
DEFAULT_DB_URL = 'mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks?charset=utf8'


def to_sql(stock_list, engine):
    """Store the complete back-adjusted daily history of each stock.

    For every code in ``stock_list`` the full '1d' history (``count=-1``) is
    fetched and written to the table ``<code>_1d``.

    The table is replaced on every call. The fetch always returns the whole
    history, so appending (the previous behaviour) inserted every bar again on
    each scheduled run.

    Args:
        stock_list: iterable of stock codes understood by xtdata.
        engine: SQLAlchemy engine (or connection) passed to ``DataFrame.to_sql``.
    """
    print(dt.now(), '开始循环入库!')
    for stock in stock_list:
        print(stock)
        data = xtdata.get_market_data(field, [stock], '1d', end_time='', count=-1, dividend_type='back')
        # One column per field, taken from the row for this stock.
        df = pd.concat([data[name].loc[stock].T for name in _BAR_COLUMNS], axis=1)
        df.columns = list(_BAR_COLUMNS)
        if df.empty:
            # Replacing with an empty frame would wipe previously stored bars.
            print(stock, 'no bars returned, existing table left untouched')
            continue
        # 'time' is treated as epoch milliseconds; fromtimestamp() yields a
        # naive datetime in the machine's local timezone.
        df['time'] = df['time'].apply(lambda x: dt.fromtimestamp(x / 1000.0))
        df.reset_index(drop=True, inplace=True)
        print(df)
        df.to_sql('%s_1d' % stock, con=engine, index=True, if_exists='replace')
        # Disabled: make `time` the primary key of the table.
        # with engine.connect() as con:
        #     con.execute("ALTER TABLE `%s_1d` ADD PRIMARY KEY (`time`);" % stock)
        # exit()


def to_df(key, valus, engine):
    """Placeholder that only prints a marker; the arguments are unused."""
    print('to_df')


def download_data(stocks, engine):
    """Download the '1d' history for ``stocks`` and then store it via ``to_sql``.

    Args:
        stocks: list of stock codes to download.
        engine: SQLAlchemy engine forwarded to ``to_sql``.
    """
    print(dt.now(), '开始下载!')
    xtdata.download_history_data2(stock_list=stocks, period='1d', start_time='', end_time='')
    print(dt.now(), '下载完成,准备入库!')
    to_sql(stocks, engine)


# Start time of the process; used to report total run time on shutdown.
sttime = dt.now()


def main():
    """Schedule ``download_data`` every day at 22:45 and block until stopped."""
    # NOTE(review): the stock list is read once at start-up, so codes added to
    # the sector later are not picked up until the process is restarted.
    stocks = xtdata.get_stock_list_in_sector('沪深A股')
    stocks.sort()

    # pool_pre_ping tests a pooled connection before use. The job runs about
    # once a day, so a connection left in the pool by the previous run may have
    # been closed by the server in the meantime.
    engine = create_engine(os.environ.get('QMT_DB_URL', DEFAULT_DB_URL), pool_pre_ping=True)

    sched = BlockingScheduler()
    sched.add_job(func=download_data, trigger='cron', hour='22', minute='45', args=[stocks, engine])
    try:
        sched.start()
    except (KeyboardInterrupt, SystemExit):
        # Ctrl-C / interpreter exit is the normal way to stop the scheduler.
        pass
    print(dt.now() - sttime, '更新完成,准备入库!')


if __name__ == '__main__':
    main()

# --- Disabled experiments kept for reference --------------------------------
# Earlier scheduling attempt:
# to_sql(stocks, engine)
# if sched.add_job(download_data, 'cron', hour=22, minute=00):
#     print(dt.now() - sttime, '更新完成,准备入库!')
#     to_sql(stocks, engine)
#
# Multi-process storing, one chunk of the stock list per CPU.
# NOTE(review): the slice `stocks[i:]` hands every process the whole tail of
# the list rather than one chunk of `step` items.
# step = math.ceil(len(stocks) / cpu_count)
# p_list = []
# # data = xtdata.get_market_data(field, stocks, '1d', end_time='', count=-1, dividend_type='back')
# for i in range(0, len(stocks), step):
#     p = mp.Process(target=to_sql, args=(stocks[i:], engine,))
#     p.start()
#     p_list.append(p)
# for m in p_list:
#     m.join()
# # exit()