from xtquant import xtdata
from datetime import datetime as dt
import pandas as pd
import math
from sqlalchemy import create_engine, text
import multiprocessing as mp
from multiprocessing import freeze_support
import os
from apscheduler.schedulers.blocking import BlockingScheduler
import traceback
import psutil
import pymysql

# Show every column when printing DataFrames (debugging aid).
pd.set_option('display.max_columns', None)

# QMT mini userdata directory.
# path = 'C:\\qmt\\userdata_mini'
# NOTE(review): this literal renders as '\DANIEL-NUC\qmt\userdata_mini'
# (single leading backslash). A UNC network share would need
# '\\\\DANIEL-NUC\\qmt\\userdata_mini'. Left unchanged because `path`
# is not used in the visible code — confirm intent before relying on it.
path = '\\DANIEL-NUC\\qmt\\userdata_mini'

# Columns requested from xtdata for every download.
field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']

# Shared counter of stocks successfully written to the databases.
# NOTE(review): under the Windows 'spawn' start method each worker process
# re-imports this module and gets its OWN Value, so the parent's counter is
# never incremented by workers — confirm on the target platform.
count = mp.Value('i', 0)

# Two destination MySQL instances; every per-stock table is written to both.
eng_w = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8',)
eng_w2 = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3308/qmt_stocks_whole?charset=utf8',)


def err_call_back(err):
    """error_callback for Pool.apply_async: log the error and its traceback."""
    print(f'问题在这里~ error:{str(err)}')
    traceback.print_exc()


def _fetch_daily(stock, dividend_type, suffix):
    """Fetch the full daily bar history of *stock* with one dividend adjustment.

    Parameters
    ----------
    stock : str
        Stock code as used by xtdata.
    dividend_type : str
        'back' or 'front' — the adjustment mode passed to xtdata.
    suffix : str
        Suffix appended to the OHLCVA column names (e.g. 'open_back').

    Returns a DataFrame with columns ['time', 'open_<suffix>', 'high_<suffix>',
    'low_<suffix>', 'close_<suffix>', 'volume_<suffix>', 'amount_<suffix>'],
    where 'time' is converted from epoch milliseconds to datetime.
    """
    data = xtdata.get_market_data(field, [stock], '1d', end_time='', count=-1,
                                  dividend_type=dividend_type)
    df = pd.concat(
        [data[k].loc[stock].T
         for k in ['time', 'open', 'high', 'low', 'close', 'volume', 'amount']],
        axis=1)
    df.columns = ['time'] + [f'{c}_{suffix}'
                             for c in ['open', 'high', 'low', 'close',
                                       'volume', 'amount']]
    df['time'] = df['time'].apply(lambda x: dt.fromtimestamp(x / 1000.0))
    df.reset_index(drop=True, inplace=True)
    return df


def to_sql(stock):
    """Merge back- and front-adjusted daily bars for *stock* and replace the
    table '<stock>_1d' in both qmt_stocks_whole databases.

    Returns None; success is signalled by incrementing the shared counter.
    """
    global eng_w, eng_w2
    df_back = _fetch_daily(stock, 'back', 'back')
    df_front = _fetch_daily(stock, 'front', 'front')
    # merge_asof needs both frames sorted on the key; xtdata returns bars
    # in chronological order.
    df = pd.merge_asof(df_back, df_front, on='time')
    try:
        df.to_sql('%s_1d' % stock, con=eng_w, index=False,
                  if_exists='replace', chunksize=20000)
        df.to_sql('%s_1d' % stock, con=eng_w2, index=False,
                  if_exists='replace', chunksize=20000)
        with count.get_lock():
            count.value += 1
    # Was `except BaseException`: that also swallowed KeyboardInterrupt /
    # SystemExit inside pool workers. Best-effort per-stock handling is
    # deliberate, so the failure is logged and the loop goes on.
    except Exception as e:
        print(stock, e)
    finally:
        # Drop pooled connections so worker processes never reuse sockets
        # inherited from the parent.
        eng_w.dispose()
        eng_w2.dispose()


def download_data():
    """Download the full daily history of all 沪深A股 stocks, record today's
    stock list in both hlfx_pool databases, then store every stock's merged
    bars via a process pool calling `to_sql`.

    (The stock list can alternatively be read back from the latest row of
    hlfx_pool.stocks_list instead of the xtdata sector query.)
    """
    global count
    stock_list = xtdata.get_stock_list_in_sector('沪深A股')
    results_list = ','.join(set(stock_list))
    print(f'今日个股列表为{len(stock_list)}')

    # Record today's list in both pool databases. Values are bound as
    # parameters (the original interpolated them into the SQL string),
    # and connections are closed even on failure (they leaked before).
    db_pool = pymysql.connect(host='localhost', user='root', port=3307,
                              password='r6kEwqWU9!v3', database='hlfx_pool')
    db_pool2 = pymysql.connect(host='localhost', user='root', port=3308,
                               password='r6kEwqWU9!v3', database='hlfx_pool')
    try:
        sql = "INSERT INTO stocks_list (date,securities) VALUES(%s,%s)"
        stamp = dt.now().strftime('%Y-%m-%d %H:%M:%S')
        with db_pool.cursor() as cursor_pool:
            cursor_pool.execute(sql, (stamp, results_list))
        with db_pool2.cursor() as cursor_pool2:
            cursor_pool2.execute(sql, (stamp, results_list))
        db_pool.commit()
        db_pool2.commit()
    finally:
        db_pool.close()
        db_pool2.close()

    print(dt.now().strftime('%Y-%m-%d %H:%M:%S'), '开始下载!')
    xtdata.download_history_data2(stock_list=stock_list, period='1d',
                                  start_time='', end_time='')
    print(dt.now().strftime('%Y-%m-%d %H:%M:%S'), '下载完成,准备入库!')

    async_results = []
    pool = mp.Pool(processes=mp.cpu_count())
    for stock in stock_list:
        async_results.append(
            pool.apply_async(func=to_sql, args=(stock,),
                             error_callback=err_call_back))
    pool.close()
    pool.join()

    # `to_sql` returns None whenever it completes (errors are caught inside
    # it), so counting None results counts finished tasks.
    none_count = sum(1 for r in async_results if r.get() is None)
    print(f"{dt.now().strftime('%Y-%m-%d %H:%M:%S')}\n今日数据{len(async_results)}下载完毕,入库{none_count}条!")


if __name__ == '__main__':
    freeze_support()
    download_data()
    # Uncomment to run on a weekday-evening schedule instead of once:
    # scheduler = BlockingScheduler()
    # scheduler.add_job(func=download_data, trigger='cron', day_of_week='0-4', hour='20', minute='05',
    #                   timezone="Asia/Shanghai", max_instances=10)
    # try:
    #     scheduler.start()
    # except (KeyboardInterrupt, SystemExit):
    #     pass