# download_data_whole.py — download A-share daily bars via QMT/xtquant and store them in MySQL.
  1. from xtquant import xtdata
  2. from datetime import datetime as dt
  3. import pandas as pd
  4. import math
  5. from sqlalchemy import create_engine, text
  6. import multiprocessing as mp
  7. from multiprocessing import freeze_support
  8. import os
  9. from apscheduler.schedulers.blocking import BlockingScheduler
  10. import traceback
  11. import psutil
  12. import pymysql
  13. pd.set_option('display.max_columns', None) # 设置显示最大行
  14. # path = 'C:\\qmt\\userdata_mini'
  15. path = '\\DANIEL-NUC\\qmt\\userdata_mini'
  16. field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']
  17. # 创建共享计数器
  18. count = mp.Value('i', 0)
  19. eng_w = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8',)
  20. eng_w2 = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3308/qmt_stocks_whole?charset=utf8',)
  21. def err_call_back(err):
  22. print(f'问题在这里~ error:{str(err)}')
  23. traceback.print_exc()
  24. def to_sql(stock):
  25. global eng_w, eng_w2
  26. # 后复权数据
  27. data_back = xtdata.get_market_data(field, [stock], '1d', end_time='', count=-1, dividend_type='back')
  28. df_back = pd.concat([data_back[i].loc[stock].T for i in ['time', 'open', 'high', 'low', 'close', 'volume',
  29. 'amount']], axis=1)
  30. df_back.columns = ['time', 'open_back', 'high_back', 'low_back', 'close_back', 'volume_back', 'amount_back']
  31. df_back['time'] = df_back['time'].apply(lambda x: dt.fromtimestamp(x / 1000.0))
  32. df_back.reset_index(drop=True, inplace=True)
  33. # 前复权数据
  34. data_front = xtdata.get_market_data(field, [stock], '1d', end_time='', count=-1, dividend_type='front')
  35. df_front = pd.concat([data_front[i].loc[stock].T for i in ['time', 'open', 'high', 'low', 'close', 'volume',
  36. 'amount']], axis=1)
  37. df_front.columns = ['time', 'open_front', 'high_front', 'low_front', 'close_front', 'volume_front',
  38. 'amount_front']
  39. df_front['time'] = df_front['time'].apply(lambda x: dt.fromtimestamp(x / 1000.0))
  40. df = pd.merge_asof(df_back, df_front, 'time')
  41. # print(df)
  42. try:
  43. # eng_w.connect().execute(text("truncate table `%s_1d`" % stock))
  44. df.to_sql('%s_1d' % stock, con=eng_w, index=False, if_exists='replace', chunksize=20000)
  45. df.to_sql('%s_1d' % stock, con=eng_w2, index=False, if_exists='replace', chunksize=20000)
  46. with count.get_lock():
  47. count.value += 1
  48. except BaseException as e:
  49. print(stock, e)
  50. pass
  51. finally:
  52. eng_w.dispose()
  53. eng_w2.dispose()
  54. def download_data():
  55. global count
  56. stock_list = xtdata.get_stock_list_in_sector('沪深A股')
  57. '''
  58. # 连接数据库 获取股票列表
  59. conn_engine_hlfx_pool = create_engine(
  60. 'mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8')
  61. con_engine_hlfx_pool = conn_engine_hlfx_pool.connect()
  62. stock_list = pd.read_sql_query(
  63. text("select securities from %s" % 'stocks_list'), con=con_engine_hlfx_pool).iloc[-1, 0].split(",")
  64. '''
  65. results_list = ','.join(set(stock_list))
  66. print(f'今日个股列表为{len(stock_list)}')
  67. db_pool = pymysql.connect(host='localhost',
  68. user='root',
  69. port=3307,
  70. password='r6kEwqWU9!v3',
  71. database='hlfx_pool')
  72. db_pool2 = pymysql.connect(host='localhost',
  73. user='root',
  74. port=3308,
  75. password='r6kEwqWU9!v3',
  76. database='hlfx_pool')
  77. cursor_pool = db_pool.cursor()
  78. cursor_pool2 = db_pool2.cursor()
  79. sql = "INSERT INTO %s (date,securities) VALUES('%s','%s')" % (
  80. 'stocks_list', dt.now().strftime('%Y-%m-%d %H:%M:%S'), results_list)
  81. cursor_pool.execute(sql)
  82. cursor_pool2.execute(sql)
  83. db_pool.commit()
  84. db_pool2.commit()
  85. print(dt.now().strftime('%Y-%m-%d %H:%M:%S'), '开始下载!')
  86. xtdata.download_history_data2(stock_list=stock_list, period='1d', start_time='', end_time='')
  87. print(dt.now().strftime('%Y-%m-%d %H:%M:%S'), '下载完成,准备入库!')
  88. async_results = []
  89. pool = mp.Pool(processes=mp.cpu_count())
  90. for stock in stock_list:
  91. async_result = pool.apply_async(func=to_sql, args=(stock, ), error_callback=err_call_back)
  92. async_results.append(async_result)
  93. pool.close()
  94. pool.join()
  95. # 统计返回为 None 的结果数量
  96. none_count = 0
  97. for i, result_async in enumerate(async_results):
  98. _ = result_async.get() # 获取任务的结果
  99. if _ is None:
  100. none_count += 1
  101. print(f"{dt.now().strftime('%Y-%m-%d %H:%M:%S')}\n今日数据{len(async_results)}下载完毕,入库{none_count}条!")
  102. if __name__ == '__main__':
  103. freeze_support()
  104. field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']
  105. cpu_count = mp.cpu_count()
  106. pus = psutil.Process()
  107. download_data()
  108. # scheduler = BlockingScheduler()
  109. # scheduler.add_job(func=download_data, trigger='cron', day_of_week='0-4', hour='20', minute='05',
  110. # timezone="Asia/Shanghai", max_instances=10)
  111. # try:
  112. # scheduler.start()
  113. # except (KeyboardInterrupt, SystemExit):
  114. # pass