from xtquant import xtdata, xttrader
from datetime import datetime as dt
import pandas as pd
import math
from sqlalchemy import create_engine
import multiprocessing as mp
from apscheduler.schedulers.blocking import BlockingScheduler

path = 'C:\\qmt\\userdata_mini'  # QMT mini-client data directory
field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']
cpu_count = mp.cpu_count()


def to_sql(stock_list, engine):
    print(dt.now(), 'Starting the loop to write to the database!')
    for stock in stock_list:
        print(stock)
        # Pull back-adjusted daily bars for this stock from the local QMT data
        data = xtdata.get_market_data(field, [stock], '1d', end_time='', count=-1, dividend_type='back')
        # get_market_data returns one DataFrame per field (stocks x timestamps);
        # take this stock's row from each field and stack them as columns
        df = pd.concat([data[i].loc[stock].T for i in ['time', 'open', 'high', 'low', 'close', 'volume', 'amount']], axis=1)
        df.columns = ['time', 'open', 'high', 'low', 'close', 'volume', 'amount']
        # timestamps come back in milliseconds
        df['time'] = df['time'].apply(lambda x: dt.fromtimestamp(x / 1000.0))
        df.reset_index(drop=True, inplace=True)
        print(df)
        # one table per stock, e.g. 000001.SZ_1d
        df.to_sql('%s_1d' % stock, con=engine, index=True, if_exists='append')


def to_df(key, values, engine):
    # placeholder, not used yet
    print('to_df')
    pass


def download_data(stocks, engine):
    print(dt.now(), 'Starting download!')
    xtdata.download_history_data2(stock_list=stocks, period='1d', start_time='', end_time='')
    print(dt.now(), 'Download finished, ready to write to the database!')
    to_sql(stocks, engine)


sttime = dt.now()

if __name__ == '__main__':
    stocks = xtdata.get_stock_list_in_sector('沪深A股')  # all Shanghai/Shenzhen A shares
    field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']
    cpu_count = mp.cpu_count()
    stocks.sort()
    step = math.ceil(len(stocks) / cpu_count)  # chunk size per process (not used yet)
    engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks?charset=utf8')

    # run download_data every day at 15:45, shortly after the market close
    sched = BlockingScheduler()
    sched.add_job(func=download_data, trigger='cron', hour='15', minute='45', args=[stocks, engine])
    try:
        sched.start()
    except (KeyboardInterrupt, SystemExit):
        pass
    print(dt.now() - sttime, 'Update finished, ready to write to the database!')
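The script imports multiprocessing and computes cpu_count and step but never uses them, which suggests the download was meant to be split across worker processes. Below is a minimal sketch of that idea, not the author's implementation: the helper names run_chunk and make_engine are made up for illustration, each worker builds its own SQLAlchemy engine (database connections should not be shared across processes), and it is an untested assumption that the QMT client tolerates concurrent download_history_data2 calls from several processes.

# Hypothetical parallel variant (sketch only): split the stock list into
# cpu_count chunks and let each worker process download and store its slice.
import math
import multiprocessing as mp

from sqlalchemy import create_engine
from xtquant import xtdata


def make_engine():
    # Each process creates its own engine; DB connections must not be
    # shared across process boundaries.
    return create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks?charset=utf8')


def run_chunk(chunk):
    # Reuses download_data() defined in the script above.
    download_data(chunk, make_engine())


if __name__ == '__main__':
    stocks = xtdata.get_stock_list_in_sector('沪深A股')
    stocks.sort()
    cpu_count = mp.cpu_count()
    step = math.ceil(len(stocks) / cpu_count)
    chunks = [stocks[i:i + step] for i in range(0, len(stocks), step)]
    with mp.Pool(processes=cpu_count) as pool:
        pool.map(run_chunk, chunks)

If a scheduled run is still wanted, the Pool section can simply replace the body of download_data's caller, with the BlockingScheduler job left unchanged.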