123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- from xtquant import xtdata
- from xtquant import xtdatacenter as xtdc
- from datetime import datetime as dt
- import pandas as pd
- import math
- from sqlalchemy import create_engine, text
- import multiprocessing as mp
- from multiprocessing import freeze_support
- import os
- from apscheduler.schedulers.blocking import BlockingScheduler
- import traceback
- import psutil
- import pymysql
- from tqdm import tqdm
- import logging
# Connect to the local xtquant (QMT) data service.
# NOTE(review): 586211 exceeds the maximum valid TCP port (65535); the QMT
# service typically listens on 58610/58611 — confirm the intended port.
xtdata.connect('', port=586211, remember_if_success=True)
pd.set_option('display.max_columns', None)  # show all DataFrame columns when printing
# path = 'C:\\qmt\\userdata_mini'
# path = '\\DANIEL-NUC\\qmt\\userdata_mini'
# Userdata directory of the trading client (defined but not referenced below)
path = 'C:\\方正证券FQT交易客户端\\userdata_mini'
# path = 'C:\\迅投极速交易终端睿智融科版'
# K-line fields requested from xtdata for every stock
field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']
# Shared cross-process counter (declared here; not incremented in this file)
count = mp.Value('i', 0)
# SQLAlchemy engine for the per-stock K-line warehouse (one table per stock)
eng_w = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8', )
# eng_w2 = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3308/qmt_stocks_whole?charset=utf8')
def err_call_back(err):
    """Error callback for Pool.apply_async: report a worker-side exception.

    Args:
        err: the exception instance raised inside the worker process.
    """
    print(f'问题在这里~ error:{str(err)}')
    # Bug fix: traceback.print_exc() prints the *currently handled* exception,
    # which is None inside an error callback — print the worker exception's
    # own traceback instead.
    traceback.print_exception(type(err), err, err.__traceback__)
def to_sql(stock):
    """Fetch back- and front-adjusted daily K-lines for *stock* from xtdata,
    merge them on timestamp, and write the result to table `<stock>_1d`
    (replacing any previous contents) in the qmt_stocks_whole database.

    Args:
        stock: stock code as used by xtdata (e.g. '000001.SZ').

    Returns:
        None. DB write errors are caught and printed, not raised.
    """
    global eng_w
    cols = ['time', 'open', 'high', 'low', 'close', 'volume', 'amount']
    # Back-adjusted (后复权) daily bars
    data_back = xtdata.get_market_data(field, [stock], '1d', count=-1, dividend_type='back')
    df_back = pd.concat([data_back[c].loc[stock].T for c in cols], axis=1)
    df_back.columns = ['time', 'open_back', 'high_back', 'low_back', 'close_back', 'volume_back', 'amount_back']
    # xtdata timestamps are epoch milliseconds
    df_back['time'] = df_back['time'].apply(lambda x: dt.fromtimestamp(x / 1000.0))
    df_back.reset_index(drop=True, inplace=True)
    # Front-adjusted (前复权) daily bars
    data_front = xtdata.get_market_data(field, [stock], '1d', end_time='', count=-1, dividend_type='front')
    df_front = pd.concat([data_front[c].loc[stock].T for c in cols], axis=1)
    df_front.columns = ['time', 'open_front', 'high_front', 'low_front', 'close_front', 'volume_front',
                        'amount_front']
    df_front['time'] = df_front['time'].apply(lambda x: dt.fromtimestamp(x / 1000.0))
    df_front.reset_index(drop=True, inplace=True)
    # Both frames are time-ordered, so merge_asof aligns the two series row-wise.
    df = pd.merge_asof(df_back, df_front, on='time')
    df = df.round(2)
    try:
        df.to_sql('%s_1d' % stock, con=eng_w, index=False, if_exists='replace', chunksize=20000)
    except Exception as e:
        # Was `except BaseException`, which also swallowed KeyboardInterrupt /
        # SystemExit; report the failure and fall through to cleanup.
        print(stock, e)
    else:
        # Bug fix: this message used to live in `finally`, so "done" was
        # printed even when the write had just failed.
        print(f'{stock}入库完成!')
    finally:
        # Drop pooled connections so each worker process starts clean.
        eng_w.dispose()
def on_progress(data):
    """Progress callback for xtdata.download_history_data2: echo the payload."""
    print(data)
    return
def download_data():
    """Download daily history for all Shanghai/Shenzhen A-shares via xtdata,
    record today's stock list in hlfx_pool.stocks_list, then write each
    stock's K-lines to the warehouse DB through a process pool.
    """
    stock_list = xtdata.get_stock_list_in_sector('沪深A股')
    # NOTE: set() deduplicates but makes the stored order nondeterministic.
    results_list = ','.join(set(stock_list))
    print(f'今日个股列表为{len(stock_list)}')
    db_pool = pymysql.connect(host='localhost',
                              user='root',
                              port=3307,
                              password='r6kEwqWU9!v3',
                              database='hlfx_pool')
    try:
        with db_pool.cursor() as cursor_pool:
            # Bug fix: the INSERT was built with %-string formatting, which
            # breaks on quotes and is injection-prone — use driver parameters.
            cursor_pool.execute(
                "INSERT INTO stocks_list (date,securities) VALUES(%s,%s)",
                (dt.now().strftime('%Y-%m-%d %H:%M:%S'), results_list))
        db_pool.commit()
    except Exception as e:
        print(e)
    finally:
        # Bug fix: the connection was never closed (leaked on every run).
        db_pool.close()
    print(dt.now().strftime('%Y-%m-%d %H:%M:%S'), '开始下载!')
    xtdata.download_history_data2(stock_list=stock_list, period='1d', start_time='', end_time='', callback=on_progress)
    print(dt.now().strftime('%Y-%m-%d %H:%M:%S'), '下载完成,准备入库!')
    async_results = []
    pool = mp.Pool(processes=8)
    for stock in tqdm(stock_list, desc='入库进度'):
        async_result = pool.apply_async(func=to_sql, args=(stock,), error_callback=err_call_back)
        async_results.append(async_result)
    print(f'记录循环{len(async_results)}次!')
    pool.close()
    pool.join()
    # Count tasks that completed normally (to_sql returns None on success).
    none_count = 0
    for result_async in async_results:
        try:
            if result_async.get() is None:
                none_count += 1
        except Exception:
            # Bug fix: .get() re-raises worker exceptions, so a single failed
            # stock used to abort this whole summary loop.
            pass
    print(f"{dt.now().strftime('%Y-%m-%d %H:%M:%S')}\n今日数据{len(async_results)}下载完毕,入库{none_count}条!")
if __name__ == '__main__':
    # Route multiprocessing's internal logging to stderr for debugging.
    logger = mp.log_to_stderr()
    logger.setLevel(logging.DEBUG)
    freeze_support()  # required for frozen Windows executables
    field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']
    # Bug fix: a hard-coded range(24) raises ValueError on machines with
    # fewer logical CPUs — pin affinity to the CPUs that actually exist.
    cpu_list = list(range(psutil.cpu_count() or 1))
    pus = psutil.Process()
    pus.cpu_affinity(cpu_list)
    download_data()
    # scheduler = BlockingScheduler()
    # scheduler.add_job(func=download_data, trigger='cron', day_of_week='0-4', hour='20', minute='05',
    #                   timezone="Asia/Shanghai", max_instances=10)
    # try:
    #     scheduler.start()
    # except (KeyboardInterrupt, SystemExit):
    #     pass
|