"""Download K-line history for one instrument through xtquant and store it in MySQL.

Running the file as a script pins the process to a set of CPUs, downloads the
history for the instrument named by ``x`` and writes it to a table named
``<instrument>_<period>`` in the database behind ``eng_w``.
"""
import logging
import math
import multiprocessing as mp
import os
import traceback
from datetime import datetime as dt
from multiprocessing import freeze_support

import pandas as pd
import psutil
import pymysql
from apscheduler.schedulers.blocking import BlockingScheduler
from sqlalchemy import create_engine, text
from tqdm import tqdm
from xtquant import xtdata

# Show every column when a DataFrame is printed.
pd.set_option('display.max_columns', None)

field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']

# Shared integer counter; not referenced by the code visible in this file.
count = mp.Value('i', 0)

# NOTE(review): the fallback URL embeds a plaintext password. Prefer setting
# QIHUO_DB_URL in the environment and rotating/removing the literal below.
eng_w = create_engine(
    os.environ.get(
        'QIHUO_DB_URL',
        'mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qihuo?charset=utf8',
    ),
)

# Instrument code that is downloaded and stored by default.
x = 'jmJQ00.DF'

# Single source of truth for the bar period. Previously the download used
# '5m', the read used '1h', and the table suffix was '_5m', so 1h bars (if any
# were cached) overwrote the 5m table.
# NOTE(review): assumes '5m' is the intended period - confirm.
PERIOD = '5m'

# (xtdata field name, destination column name), in output column order.
_BAR_COLUMNS = (
    ('time', 'time'),
    ('open', 'open_back'),
    ('high', 'high_back'),
    ('low', 'low_back'),
    ('close', 'close_back'),
    ('volume', 'volume'),
    ('amount', 'amount'),
)


def err_call_back(err):
    """Report an exception handed to a callback (e.g. a pool ``error_callback``).

    Args:
        err: The exception instance to report.
    """
    print(f'问题在这里~ error:{str(err)}')
    # A callback runs outside any ``except`` block, so ``traceback.print_exc()``
    # has no active exception to show; print the one we were given instead.
    traceback.print_exception(type(err), err, err.__traceback__)


def to_sql(stock_code=x, period=PERIOD):
    """Read locally cached bars for one instrument and replace its MySQL table.

    The bars are fetched with ``xtdata.get_market_data``, reshaped into one row
    per bar, and written to the table ``<stock_code>_<period>`` with
    ``if_exists='replace'``. Database errors are reported and swallowed so a
    failed write does not abort the caller.

    Args:
        stock_code: Instrument code to read; defaults to the module-level ``x``.
        period: Bar period passed to xtdata and used as the table suffix.
    """
    data = xtdata.get_market_data([], [stock_code], period, end_time='', count=-1)
    print(data)

    # Each field is looked up by instrument code; the per-field series are
    # placed side by side so that every row is one bar.
    df = pd.concat([data[src].loc[stock_code] for src, _ in _BAR_COLUMNS], axis=1)
    df.columns = [dst for _, dst in _BAR_COLUMNS]
    # 'time' is treated as epoch milliseconds and converted to local datetimes.
    df['time'] = df['time'].apply(lambda ms: dt.fromtimestamp(ms / 1000.0))
    df.reset_index(drop=True, inplace=True)
    print(df)

    if df.empty:
        # Writing with if_exists='replace' would wipe the existing table.
        print('无数据,跳过入库!')
        return

    try:
        df.to_sql(
            f'{stock_code}_{period}',
            con=eng_w,
            index=False,
            if_exists='replace',
            chunksize=20000,
        )
    except Exception as e:
        # Best-effort write: report the failure but let the caller continue.
        # KeyboardInterrupt / SystemExit are deliberately not caught.
        print(f'入库失败! error:{e}')
        traceback.print_exc()
    else:
        print('入库完成!')
    finally:
        eng_w.dispose()
        # eng_w2.dispose()


def download_data(stock_code=x, period=PERIOD):
    """Download the full history for one instrument, then store it via ``to_sql``.

    Args:
        stock_code: Instrument code to download; defaults to the module-level ``x``.
        period: Bar period to download and store.
    """
    print(dt.now().strftime('%Y-%m-%d %H:%M:%S'), '开始下载!')
    xtdata.download_history_data(stock_code, period, '', '')
    print(dt.now().strftime('%Y-%m-%d %H:%M:%S'), '下载完成,准备入库!')
    to_sql(stock_code, period)

    # Disabled: multi-process variant that stores a list of instruments.
    # async_results = []
    # pool = mp.Pool(processes=24)
    # for stock in tqdm(stock_list, desc='入库进度'):
    #     async_result = pool.apply_async(func=to_sql, args=(stock, ), error_callback=err_call_back)
    #     async_results.append(async_result)
    # print(f'记录循环{len(async_results)}次!')
    # pool.close()
    # pool.join()
    # Count the tasks whose result is None.
    # none_count = 0
    # for i, result_async in enumerate(async_results):
    #     _ = result_async.get()  # fetch the task result
    #     if _ is None:
    #         none_count += 1
    # print(f"{dt.now().strftime('%Y-%m-%d %H:%M:%S')}\n今日数据{len(async_results)}下载完毕,入库{none_count}条!")


if __name__ == '__main__':
    # Must be the first call under the guard for frozen Windows executables.
    freeze_support()
    logger = mp.log_to_stderr()
    logger.setLevel(logging.DEBUG)

    # Pin to at most the first 16 logical CPUs; requesting CPUs that do not
    # exist makes cpu_affinity() raise ValueError.
    cpu_total = psutil.cpu_count() or 1
    cpu_list = list(range(min(16, cpu_total)))
    pus = psutil.Process()
    pus.cpu_affinity(cpu_list)

    download_data()

    # Disabled: run download_data on weekdays at 20:05 Asia/Shanghai.
    # scheduler = BlockingScheduler()
    # scheduler.add_job(func=download_data, trigger='cron', day_of_week='0-4', hour='20', minute='05',
    #                   timezone="Asia/Shanghai", max_instances=10)
    # try:
    #     scheduler.start()
    # except (KeyboardInterrupt, SystemExit):
    #     pass