# download_data.py
import math
import multiprocessing as mp
import os
from datetime import datetime as dt

import pandas as pd
from apscheduler.schedulers.blocking import BlockingScheduler
from sqlalchemy import create_engine
from xtquant import xtdata
# pd.set_option('display.max_rows', None)  # set the maximum number of displayed rows
# Presumably the MiniQMT user-data directory; not referenced by the code
# visible in this file -- TODO confirm whether it is still needed.
path = 'C:\\qmt\\userdata_mini'
# Field names passed to xtdata.get_market_data when requesting bars.
field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']
# CPU count as reported by multiprocessing.
cpu_count = mp.cpu_count()
  12. def to_sql(stock_list, eng_back, eng_front):
  13. print(dt.now(), '开始循环入库!')
  14. for stock in stock_list:
  15. print(stock)
  16. data = xtdata.get_market_data(field, [stock], '1d', end_time='', count=-1, dividend_type='back')
  17. df = pd.concat([data[i].loc[stock].T for i in ['time', 'open', 'high', 'low', 'close', 'volume', 'amount']],
  18. axis=1)
  19. df.columns = ['time', 'open', 'high', 'low', 'close', 'volume', 'amount']
  20. df['time'] = df['time'].apply(lambda x: dt.fromtimestamp(x / 1000.0))
  21. df.reset_index(drop=True, inplace=True)
  22. print(df)
  23. df.to_sql('%s_1d' % stock, con=eng_back, index=True, if_exists='append')
  24. for stock in stock_list:
  25. print(stock)
  26. data = xtdata.get_market_data(field, [stock], '1d', end_time='', count=-1, dividend_type='front')
  27. df = pd.concat([data[i].loc[stock].T for i in ['time', 'open', 'high', 'low', 'close', 'volume', 'amount']],
  28. axis=1)
  29. df.columns = ['time', 'open', 'high', 'low', 'close', 'volume', 'amount']
  30. df['time'] = df['time'].apply(lambda x: dt.fromtimestamp(x / 1000.0))
  31. df.reset_index(drop=True, inplace=True)
  32. print(df)
  33. df.to_sql('%s_1d' % stock, con=eng_front, index=True, if_exists='append')
  34. def download_data(stock_list, eng_back, eng_front):
  35. print(dt.now(), '开始下载!')
  36. xtdata.download_history_data2(stock_list=stock_list, period='1d', start_time='', end_time='')
  37. print(dt.now(), '下载完成,准备入库!')
  38. to_sql(stock_list, eng_back, eng_front)
# def to_df(key, values, engine):
#     print('to_df')
#     pass
  42. if __name__ == '__main__':
  43. stocks = xtdata.get_stock_list_in_sector('沪深A股')
  44. field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']
  45. cpu_count = mp.cpu_count()
  46. stocks.sort()
  47. step = math.ceil(len(stocks) / cpu_count)
  48. eng_b = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks?charset=utf8')
  49. eng_f = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_front?charset=utf8')
  50. download_data(stocks, eng_b, eng_f)
  51. # scheduler = BlockingScheduler()
  52. # scheduler.add_job(func=download_data, trigger='cron', hour='15', minute='45', args=[stocks, eng_b, eng_f],
  53. # timezone="Asia/Shanghai")
  54. # try:
  55. # scheduler.start()
  56. # except (KeyboardInterrupt, SystemExit):
  57. # pass