# download_data_whole.py
# Daily downloader: pulls A-share history via xtdata and bulk-loads it into MySQL.
  1. from xtquant import xtdata
  2. from datetime import datetime as dt
  3. import pandas as pd
  4. import math
  5. from sqlalchemy import create_engine
  6. import multiprocessing as mp
  7. import os
  8. from apscheduler.schedulers.blocking import BlockingScheduler
  9. pd.set_option('display.max_columns', None) # 设置显示最大行
  10. path = 'C:\\qmt\\userdata_mini'
  11. field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']
  12. cpu_count = mp.cpu_count()
  13. eng_w = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8')
  14. def err_call_back(err):
  15. print(f'问题在这里~ error:{str(err)}')
  16. def to_sql(stock_list):
  17. print(f'{dt.now()}开始循环入库! MyPid is {os.getpid()}')
  18. m = 0
  19. for stock in stock_list:
  20. # 后复权数据
  21. data_back = xtdata.get_market_data(field, [stock], '1d', end_time='', count=-1, dividend_type='back')
  22. df_back = pd.concat([data_back[i].loc[stock].T for i in ['time', 'open', 'high', 'low', 'close', 'volume',
  23. 'amount']], axis=1)
  24. df_back.columns = ['time', 'open_back', 'high_back', 'low_back', 'close_back', 'volume_back', 'amount_back']
  25. df_back['time'] = df_back['time'].apply(lambda x: dt.fromtimestamp(x / 1000.0))
  26. df_back.reset_index(drop=True, inplace=True)
  27. # 前复权数据
  28. data_front = xtdata.get_market_data(field, [stock], '1d', end_time='', count=-1, dividend_type='front')
  29. df_front = pd.concat([data_front[i].loc[stock].T for i in ['time', 'open', 'high', 'low', 'close', 'volume',
  30. 'amount']], axis=1)
  31. df_front.columns = ['time', 'open_front', 'high_front', 'low_front', 'close_front', 'volume_front',
  32. 'amount_front']
  33. df_front['time'] = df_front['time'].apply(lambda x: dt.fromtimestamp(x / 1000.0))
  34. df = pd.merge_asof(df_back, df_front, 'time')
  35. # print(df)
  36. try:
  37. df.to_sql('%s_1d' % stock, con=eng_w, index=True, if_exists='replace')
  38. except BaseException:
  39. print(stock)
  40. pass
  41. else:
  42. m += 1
  43. print(f'Pid:{os.getpid()}已经完工了.应入库{len(stock_list)},共入库{m}支个股')
  44. def download_data():
  45. stock_list = xtdata.get_stock_list_in_sector('沪深A股')
  46. stock_list.sort()
  47. print(dt.now(), '开始下载!')
  48. xtdata.download_history_data2(stock_list=stock_list, period='1d', start_time='', end_time='')
  49. print(dt.now(), '下载完成,准备入库!')
  50. step = math.ceil(len(stock_list) / mp.cpu_count())
  51. pool = mp.Pool(processes=mp.cpu_count())
  52. for i in range(0, len(stock_list), step):
  53. pool.apply_async(func=to_sql, args=(stock_list[i:i+step],), error_callback=err_call_back)
  54. pool.close()
  55. pool.join()
  56. print(f'今日数据下载完毕 {dt.now()}')
  57. if __name__ == '__main__':
  58. field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']
  59. cpu_count = mp.cpu_count()
  60. # download_data()
  61. scheduler = BlockingScheduler()
  62. scheduler.add_job(func=download_data, trigger='cron', day_of_week='0-4', hour='15', minute='40',
  63. timezone="Asia/Shanghai")
  64. try:
  65. scheduler.start()
  66. except (KeyboardInterrupt, SystemExit):
  67. pass