# download_futures.py — download futures 5-minute bars via xtquant and store them in MySQL.
  1. from xtquant import xtdata
  2. from datetime import datetime as dt
  3. import pandas as pd
  4. import math
  5. from sqlalchemy import create_engine, text
  6. import multiprocessing as mp
  7. from multiprocessing import freeze_support
  8. import os
  9. from apscheduler.schedulers.blocking import BlockingScheduler
  10. import traceback
  11. import psutil
  12. import pymysql
  13. from tqdm import tqdm
  14. import logging
pd.set_option('display.max_columns', None)  # show all columns when printing DataFrames
# Bar fields returned by xtquant for each symbol.
field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']
# Shared inter-process counter ('i' = signed int).
# NOTE(review): not referenced anywhere in this file — possibly leftover; verify before removing.
count = mp.Value('i', 0)
# MySQL engine for the `qihuo` schema.
# NOTE(review): credentials are hardcoded in plain text — consider moving to env/config.
eng_w = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qihuo?charset=utf8',)
# The single futures contract code this script downloads (presumably JM coking-coal main contract).
x = 'jmJQ00.DF'
  21. def err_call_back(err):
  22. print(f'问题在这里~ error:{str(err)}')
  23. traceback.print_exc()
  24. def to_sql():
  25. global eng_w
  26. data = xtdata.get_market_data([], [x], '1h', end_time='', count=-1)
  27. print(data)
  28. df = pd.concat([data[i].loc[x].T for i in ['time', 'open', 'high', 'low', 'close', 'volume',
  29. 'amount']], axis=1)
  30. df.columns = ['time', 'open_back', 'high_back', 'low_back', 'close_back', 'volume', 'amount']
  31. df['time'] = df['time'].apply(lambda x: dt.fromtimestamp(x / 1000.0))
  32. df.reset_index(drop=True, inplace=True)
  33. print(df)
  34. try:
  35. df.to_sql('%s_5m' % x, con=eng_w, index=False, if_exists='replace', chunksize=20000)
  36. except BaseException as e:
  37. print( e)
  38. pass
  39. finally:
  40. print(f'入库完成!')
  41. eng_w.dispose()
  42. # eng_w2.dispose()
  43. def download_data():
  44. print(dt.now().strftime('%Y-%m-%d %H:%M:%S'), '开始下载!')
  45. xtdata.download_history_data(x, '5m', '', '')
  46. print(dt.now().strftime('%Y-%m-%d %H:%M:%S'), '下载完成,准备入库!')
  47. to_sql()
  48. # async_results = []
  49. # pool = mp.Pool(processes=24)
  50. # for stock in tqdm(stock_list, desc='入库进度'):
  51. # async_result = pool.apply_async(func=to_sql, args=(stock, ), error_callback=err_call_back)
  52. # async_results.append(async_result)
  53. # print(f'记录循环{len(async_results)}次!')
  54. # pool.close()
  55. # pool.join()
  56. # 统计返回为 None 的结果数量
  57. # none_count = 0
  58. # for i, result_async in enumerate(async_results):
  59. # _ = result_async.get() # 获取任务的结果
  60. # if _ is None:
  61. # none_count += 1
  62. # print(f"{dt.now().strftime('%Y-%m-%d %H:%M:%S')}\n今日数据{len(async_results)}下载完毕,入库{none_count}条!")
  63. if __name__ == '__main__':
  64. logger = mp.log_to_stderr()
  65. logger.setLevel(logging.DEBUG)
  66. freeze_support()
  67. field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']
  68. cpu_list = list(range(16))
  69. pus = psutil.Process()
  70. pus.cpu_affinity(cpu_list)
  71. download_data()
  72. # scheduler = BlockingScheduler()
  73. # scheduler.add_job(func=download_data, trigger='cron', day_of_week='0-4', hour='20', minute='05',
  74. # timezone="Asia/Shanghai", max_instances=10)
  75. # try:
  76. # scheduler.start()
  77. # except (KeyboardInterrupt, SystemExit):
  78. # pass