12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- from xtquant import xtdata
- from datetime import datetime as dt
- import pandas as pd
- import math
- from sqlalchemy import create_engine, text
- import multiprocessing as mp
- from multiprocessing import freeze_support
- import os
- from apscheduler.schedulers.blocking import BlockingScheduler
- import traceback
- import psutil
- import pymysql
- from tqdm import tqdm
- import logging
# Show every column when printing DataFrames (debug convenience).
pd.set_option('display.max_columns', None)
# Field order requested from xtquant for each bar.
field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']
# Shared integer counter for worker processes.
# NOTE(review): not referenced anywhere in the visible code — confirm it is still needed.
count = mp.Value('i', 0)
# Writer engine for the `qihuo` MySQL schema.
# NOTE(review): credentials are hard-coded in plaintext — consider moving the DSN
# to an environment variable or config file.
eng_w = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qihuo?charset=utf8',)
# Single instrument code this script downloads and stores.
x = 'jmJQ00.DF'
def err_call_back(err):
    """Error callback for ``Pool.apply_async``: report a worker failure.

    Parameters
    ----------
    err : BaseException
        The exception raised inside the worker process.
    """
    print(f'问题在这里~ error:{str(err)}')
    # Bug fix: traceback.print_exc() prints the *currently handled* exception,
    # and in the parent process's error callback there is none — it printed
    # "NoneType: None". Print the traceback attached to the passed-in error
    # instead (may be empty if the traceback did not survive pickling).
    traceback.print_exception(type(err), err, err.__traceback__)
def to_sql():
    """Fetch bars for the module-level symbol ``x`` from xtquant and write them
    into MySQL, replacing any existing table.

    Side effects: reads module globals ``x`` and ``eng_w``; drops and recreates
    the table ``<x>_5m``; disposes the engine's connection pool on exit (the
    engine stays usable and reconnects lazily on the next call).
    """
    global eng_w
    # NOTE(review): the period here is '1h', but the table is named '*_5m' and
    # download_data() downloads '5m' bars — confirm which period is intended.
    data = xtdata.get_market_data([], [x], '1h', end_time='', count=-1)
    print(data)
    # xtquant returns one frame per field; stack them into a single DataFrame
    # with one column per field for the single symbol ``x``.
    df = pd.concat(
        [data[f].loc[x].T for f in ['time', 'open', 'high', 'low', 'close', 'volume', 'amount']],
        axis=1,
    )
    df.columns = ['time', 'open_back', 'high_back', 'low_back', 'close_back', 'volume', 'amount']
    # Timestamps arrive as epoch milliseconds; convert to naive local datetimes.
    # (Renamed the lambda parameter: it previously shadowed the global ``x``.)
    df['time'] = df['time'].apply(lambda ts: dt.fromtimestamp(ts / 1000.0))
    df.reset_index(drop=True, inplace=True)
    print(df)
    try:
        df.to_sql(f'{x}_5m', con=eng_w, index=False, if_exists='replace', chunksize=20000)
    except Exception as e:
        # Narrowed from BaseException: the old clause also swallowed
        # KeyboardInterrupt/SystemExit. Log and continue — a failed insert
        # should not abort the whole run.
        print(e)
    finally:
        print('入库完成!')
        eng_w.dispose()
def download_data():
    """Download 5-minute history for the module-level symbol ``x``, then store it."""
    ts_fmt = '%Y-%m-%d %H:%M:%S'
    print(dt.now().strftime(ts_fmt), '开始下载!')
    xtdata.download_history_data(x, '5m', '', '')
    print(dt.now().strftime(ts_fmt), '下载完成,准备入库!')
    to_sql()
    # --- Previous multi-process variant, kept for reference: ---
    # async_results = []
    # pool = mp.Pool(processes=24)
    # for stock in tqdm(stock_list, desc='入库进度'):
    #     async_result = pool.apply_async(func=to_sql, args=(stock, ), error_callback=err_call_back)
    #     async_results.append(async_result)
    # print(f'记录循环{len(async_results)}次!')
    # pool.close()
    # pool.join()
    # Count how many tasks returned None (treated as success):
    # none_count = 0
    # for i, result_async in enumerate(async_results):
    #     _ = result_async.get()
    #     if _ is None:
    #         none_count += 1
    # print(f"{dt.now().strftime('%Y-%m-%d %H:%M:%S')}\n今日数据{len(async_results)}下载完毕,入库{none_count}条!")
if __name__ == '__main__':
    # Route multiprocessing's internal log output to stderr for debugging.
    logger = mp.log_to_stderr()
    logger.setLevel(logging.DEBUG)
    # Required for frozen Windows executables that spawn worker processes.
    freeze_support()
    # NOTE(review): shadows the identical module-level ``field`` — likely redundant.
    field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']
    # Bug fix: pinning to CPUs 0-15 unconditionally raises ValueError on
    # machines with fewer than 16 logical CPUs; clamp to what exists.
    cpu_list = list(range(min(16, psutil.cpu_count() or 1)))
    pus = psutil.Process()
    pus.cpu_affinity(cpu_list)
    download_data()
    # --- Scheduled variant, kept for reference (weekdays at 20:05 CST): ---
    # scheduler = BlockingScheduler()
    # scheduler.add_job(func=download_data, trigger='cron', day_of_week='0-4', hour='20', minute='05',
    #                   timezone="Asia/Shanghai", max_instances=10)
    # try:
    #     scheduler.start()
    # except (KeyboardInterrupt, SystemExit):
    #     pass
|