# download_data_whole.py
  1. from xtquant import xtdata
  2. from xtquant import xtdatacenter as xtdc
  3. from datetime import datetime as dt
  4. import pandas as pd
  5. import math
  6. from sqlalchemy import create_engine, text
  7. import multiprocessing as mp
  8. from multiprocessing import freeze_support
  9. import os
  10. from apscheduler.schedulers.blocking import BlockingScheduler
  11. import traceback
  12. import psutil
  13. import pymysql
  14. from tqdm import tqdm
  15. import logging
# NOTE(review): 586211 exceeds the valid TCP port range (max 65535) — confirm the intended port.
xtdata.connect('', port=586211, remember_if_success=True)
pd.set_option('display.max_columns', None)  # show all DataFrame columns when printing
# path = 'C:\\qmt\\userdata_mini'
# path = '\\DANIEL-NUC\\qmt\\userdata_mini'
path = 'C:\\方正证券FQT交易客户端\\userdata_mini'  # QMT mini-client data directory (currently unused below)
# path = 'C:\\迅投极速交易终端睿智融科版'
# Bar fields requested from xtdata for every stock.
field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']
# 创建共享计数器 — shared cross-process int counter, initialised to 0.
count = mp.Value('i', 0)
# Writer engine for qmt_stocks_whole (one `<stock>_1d` table per stock).
# NOTE(review): credentials are hard-coded here — consider moving to config/env.
eng_w = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8', )
# eng_w2 = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3308/qmt_stocks_whole?charset=utf8')
  27. def err_call_back(err):
  28. print(f'问题在这里~ error:{str(err)}')
  29. traceback.print_exc()
  30. def to_sql(stock):
  31. global eng_w
  32. # 后复权数据
  33. data_back = xtdata.get_market_data(field, [stock], '1d', count=-1, dividend_type='back')
  34. df_back = pd.concat([data_back[i].loc[stock].T for i in ['time', 'open', 'high', 'low', 'close', 'volume',
  35. 'amount']], axis=1)
  36. df_back.columns = ['time', 'open_back', 'high_back', 'low_back', 'close_back', 'volume_back', 'amount_back']
  37. df_back['time'] = df_back['time'].apply(lambda x: dt.fromtimestamp(x / 1000.0))
  38. df_back.reset_index(drop=True, inplace=True)
  39. # 前复权数据
  40. data_front = xtdata.get_market_data(field, [stock], '1d', end_time='', count=-1, dividend_type='front')
  41. df_front = pd.concat([data_front[i].loc[stock].T for i in ['time', 'open', 'high', 'low', 'close', 'volume',
  42. 'amount']], axis=1)
  43. df_front.columns = ['time', 'open_front', 'high_front', 'low_front', 'close_front', 'volume_front',
  44. 'amount_front']
  45. df_front['time'] = df_front['time'].apply(lambda x: dt.fromtimestamp(x / 1000.0))
  46. df = pd.merge_asof(df_back, df_front, 'time')
  47. df = df.round(2)
  48. # print(df)
  49. try:
  50. # eng_w.connect().execute(text("truncate table `%s_1d`" % stock))
  51. df.to_sql('%s_1d' % stock, con=eng_w, index=False, if_exists='replace', chunksize=20000)
  52. # df.to_sql('%s_1d' % stock, con=eng_w2, index=False, if_exists='replace', chunksize=20000)
  53. except BaseException as e:
  54. print(stock, e)
  55. pass
  56. finally:
  57. print(f'{stock}入库完成!')
  58. eng_w.dispose()
  59. # eng_w2.dispose()
  60. def on_progress(data):
  61. print(data)
  62. return
  63. def download_data():
  64. global count
  65. stock_list = xtdata.get_stock_list_in_sector('沪深A股')
  66. '''
  67. # 连接数据库 获取股票列表
  68. conn_engine_hlfx_pool = create_engine(
  69. 'mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8')
  70. con_engine_hlfx_pool = conn_engine_hlfx_pool.connect()
  71. stock_list = pd.read_sql_query(
  72. text("select securities from %s" % 'stocks_list'), con=con_engine_hlfx_pool).iloc[-1, 0].split(",")
  73. '''
  74. results_list = ','.join(set(stock_list))
  75. print(f'今日个股列表为{len(stock_list)}')
  76. db_pool = pymysql.connect(host='localhost',
  77. user='root',
  78. port=3307,
  79. password='r6kEwqWU9!v3',
  80. database='hlfx_pool')
  81. # db_pool2 = pymysql.connect(host='localhost',
  82. # user='root',
  83. # port=3308,
  84. # password='r6kEwqWU9!v3',
  85. # database='hlfx_pool')
  86. cursor_pool = db_pool.cursor()
  87. # cursor_pool2 = db_pool2.cursor()
  88. sql = "INSERT INTO %s (date,securities) VALUES('%s','%s')" % (
  89. 'stocks_list', dt.now().strftime('%Y-%m-%d %H:%M:%S'), results_list)
  90. cursor_pool.execute(sql)
  91. # cursor_pool2.execute(sql)
  92. try:
  93. db_pool.commit()
  94. except BaseException as e:
  95. print(e)
  96. # db_pool.rollback()
  97. # db_pool2.commit()
  98. print(dt.now().strftime('%Y-%m-%d %H:%M:%S'), '开始下载!')
  99. xtdata.download_history_data2(stock_list=stock_list, period='1d', start_time='', end_time='',callback=on_progress)
  100. print(dt.now().strftime('%Y-%m-%d %H:%M:%S'), '下载完成,准备入库!')
  101. async_results = []
  102. pool = mp.Pool(processes=8)
  103. for stock in tqdm(stock_list, desc='入库进度'):
  104. async_result = pool.apply_async(func=to_sql, args=(stock,), error_callback=err_call_back)
  105. async_results.append(async_result)
  106. print(f'记录循环{len(async_results)}次!')
  107. pool.close()
  108. pool.join()
  109. # 统计返回为 None 的结果数量
  110. none_count = 0
  111. for i, result_async in enumerate(async_results):
  112. _ = result_async.get() # 获取任务的结果
  113. if _ is None:
  114. none_count += 1
  115. print(f"{dt.now().strftime('%Y-%m-%d %H:%M:%S')}\n今日数据{len(async_results)}下载完毕,入库{none_count}条!")
  116. if __name__ == '__main__':
  117. logger = mp.log_to_stderr()
  118. logger.setLevel(logging.DEBUG)
  119. freeze_support()
  120. field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']
  121. cpu_list = list(range(24))
  122. pus = psutil.Process()
  123. pus.cpu_affinity(cpu_list)
  124. download_data()
  125. # scheduler = BlockingScheduler()
  126. # scheduler.add_job(func=download_data, trigger='cron', day_of_week='0-4', hour='20', minute='05',
  127. # timezone="Asia/Shanghai", max_instances=10)
  128. # try:
  129. # scheduler.start()
  130. # except (KeyboardInterrupt, SystemExit):
  131. # pass