from xtquant import xtdata
from datetime import datetime as dt
import pandas as pd
import math
from sqlalchemy import create_engine
import multiprocessing as mp
import os
from apscheduler.schedulers.blocking import BlockingScheduler

pd.set_option('display.max_columns', None)  # show all columns when printing DataFrames

path = 'C:\\qmt\\userdata_mini'
field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']
cpu_count = mp.cpu_count()
eng_w = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8')


def err_call_back(err):
    print(f'Worker raised an error: {err}')


def to_sql(stock_list):
    """Fetch back- and front-adjusted daily bars for each stock and write them to MySQL."""
    print(f'{dt.now()} start writing to DB, pid {os.getpid()}')
    m = 0
    for stock in stock_list:
        # Backward-adjusted (后复权) daily bars
        data_back = xtdata.get_market_data(field, [stock], '1d', end_time='', count=-1, dividend_type='back')
        df_back = pd.concat([data_back[i].loc[stock].T for i in
                             ['time', 'open', 'high', 'low', 'close', 'volume', 'amount']], axis=1)
        df_back.columns = ['time', 'open_back', 'high_back', 'low_back', 'close_back', 'volume_back', 'amount_back']
        df_back['time'] = df_back['time'].apply(lambda x: dt.fromtimestamp(x / 1000.0))
        df_back.reset_index(drop=True, inplace=True)

        # Forward-adjusted (前复权) daily bars
        data_front = xtdata.get_market_data(field, [stock], '1d', end_time='', count=-1, dividend_type='front')
        df_front = pd.concat([data_front[i].loc[stock].T for i in
                              ['time', 'open', 'high', 'low', 'close', 'volume', 'amount']], axis=1)
        df_front.columns = ['time', 'open_front', 'high_front', 'low_front', 'close_front', 'volume_front', 'amount_front']
        df_front['time'] = df_front['time'].apply(lambda x: dt.fromtimestamp(x / 1000.0))

        # Align the two adjusted series on the timestamp column
        df = pd.merge_asof(df_back, df_front, on='time')
        # print(df)
        try:
            # One table per stock, e.g. 000001.SZ_1d
            df.to_sql('%s_1d' % stock, con=eng_w, index=True, if_exists='replace')
        except Exception as e:
            print(stock, e)
        else:
            m += 1
    print(f'Pid {os.getpid()} finished: {m} of {len(stock_list)} stocks written to DB')


def download_data(stock_list):
    print(dt.now(), 'download started')
    xtdata.download_history_data2(stock_list=stock_list, period='1d', start_time='', end_time='')
    print(dt.now(), 'download finished, writing to DB')
    # Split the stock list into one chunk per CPU core and write the chunks in parallel
    step = math.ceil(len(stock_list) / mp.cpu_count())
    pool = mp.Pool(processes=mp.cpu_count())
    for i in range(0, len(stock_list), step):
        pool.apply_async(func=to_sql, args=(stock_list[i:i + step],), error_callback=err_call_back)
    pool.close()
    pool.join()


if __name__ == '__main__':
    stocks = xtdata.get_stock_list_in_sector('沪深A股')
    field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']
    cpu_count = mp.cpu_count()
    stocks.sort()
    step = math.ceil(len(stocks) / cpu_count)
    # download_data(stocks)

    # Run the download/ingest job every day at 15:45, after the market close
    scheduler = BlockingScheduler()
    scheduler.add_job(func=download_data, trigger='cron', hour='15', minute='45', args=[stocks],
                      timezone='Asia/Shanghai')
    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        pass