Bladeren bron

qmt-download history data 入数据库

Daniel 2 jaren geleden
bovenliggende
commit
8523e3d051
2 gewijzigde bestanden met toevoegingen van 80 en 15 verwijderingen
  1. 58 8
      QMT/download_history_data2.py
  2. 22 7
      QMT/get_local_data.py

+ 58 - 8
QMT/download_history_data2.py

@@ -1,11 +1,61 @@
 from xtquant import xtdata, xttrader
+from datetime import datetime as dt
+import pandas as pd
+import math
+from sqlalchemy import create_engine
+import multiprocessing as mp
+# pd.set_option('display.max_rows', None) # 设置显示最大行
 
-path = 'c:\\qmt\\userdata_mini'
 
-stocks = xtdata.get_stock_list_in_sector('沪深A股')
-stocks.sort()
-def f(data):
-    print('完成了!')
-# xtdata.download_history_data('000001.SH', '1d','','')
-num = xtdata.download_history_data2(stock_list=stocks, period='1d', start_time='', end_time='', callback=f)
-print(num)
+path = 'C:\\qmt\\userdata_mini'
+
+
+field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']
+cpu_count = mp.cpu_count()
+
+
+def to_sql(stock_list, engine):
+    for stock in stock_list:
+        print(stock)
+        data = xtdata.get_market_data(field, [stock], '1d', end_time='', count=-1, dividend_type='back')
+        df = pd.concat([data[i].loc[stock].T for i in ['time', 'open', 'high', 'low', 'close', 'volume', 'amount']], axis=1)
+        df.columns = ['time', 'open', 'high', 'low', 'close', 'volume', 'amount']
+        df['time'] = df['time'].apply(lambda x: dt.fromtimestamp(x / 1000.0))
+        df.reset_index(drop=True, inplace=True)
+        print(df)
+        df.to_sql('%s_1d' %stock, con=engine, index=True, if_exists='append')
+
+        # with engine.connect() as con:
+        #     con.execute("ALTER TABLE `%s_1d` ADD PRIMARY KEY (`time`);" %stock)
+        # exit()
+
+
+def to_df(key,valus,engine):
+    print('to_df')
+    pass
+
+sttime = dt.now()
+
+if __name__ == '__main__':
+    stocks = xtdata.get_stock_list_in_sector('沪深A股')
+    field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']
+    cpu_count = mp.cpu_count()
+    stocks.sort()
+    step = math.ceil(len(stocks) / cpu_count)
+    engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks?charset=utf8')
+
+    num = xtdata.download_history_data2(stock_list=stocks, period='1d', start_time='', end_time='')
+    # num = 1
+    if num:
+        print(dt.now() - sttime, '更新完成,准备入库!', )
+        to_sql(stocks, engine)
+        # p_list = []
+        # # data = xtdata.get_market_data(field, stocks, '1d', end_time='', count=-1, dividend_type='back')
+        # for i in range(0, len(stocks), step):
+        #     p = mp.Process(target=to_sql, args=(stocks[i:], engine,))
+        #     p.start()
+        #     p_list.append(p)
+        # for m in p_list:
+        #     m.join()
+        #
+        # exit()

+ 22 - 7
QMT/get_local_data.py

@@ -1,10 +1,11 @@
 #coding:utf-8
-
+from datetime import datetime as dt
 
 # 用get_market_data订阅历史和实时行情
 
 import time
 
+import pandas as pd
 
 if __name__=='__main__':
     from xtquant import xtdata
@@ -14,13 +15,27 @@ if __name__=='__main__':
     # 同时间、同股票、同周期的行情下载一次即可,
     xtdata.download_history_data(s, '1d','','')
     xtdata.download_history_data('000001.SH', '1d','','')  
-    data = xtdata.get_market_data([], [s], '1d', end_time='', count=2,dividend_type='back')
-    data2 = xtdata.get_local_data([], [s], '1d', end_time='', count=2,dividend_type='back')
+    data = xtdata.get_market_data([], [s], '1d', end_time='', count=-1, dividend_type='back')
+    # data2 = xtdata.get_local_data([], [s], '1d', end_time='', count=2,dividend_type='back')
     print('data from get_local_data:\n')
-   
+    df = pd.DataFrame()
     for column in data:
-        print(f"              {column}\n {data[column].head()}\n")
+        if column in ['time', 'open', 'high', 'low', 'close', 'volume', 'amount']:
+            # print(f"              {column}\n {data[column].head()}\n")
+            print(column, data[column].T)
+            df=pd.concat([df, data[column].T],axis=1)
+    df.columns = ['time', 'open', 'high', 'low', 'close', 'volume', 'amount']
+    df['time']=df['time'].apply(lambda x: dt.fromtimestamp(x / 1000.0))
+    print('!!!!!!!', df)
+    # print(df['time'].apply(lambda x: dt.fromtimestamp(x / 1000.0)))
 
-    for column in data2:
-        print(f"              {column}\n {data2[column].head()}\n")
+    df2 = pd.concat([data[i].T for i in ['time', 'open', 'high', 'low', 'close', 'volume', 'amount']], axis=1)
+    df2.columns = ['time', 'open', 'high', 'low', 'close', 'volume', 'amount']
+    df2['time']=df2['time'].apply(lambda x: dt.fromtimestamp(x / 1000.0))
+    print('>>>>>>>>>>>', df2)
+    print(df.index)
+    exit()
+    #
+    # for column in data2:
+    #     print(f"              {column}\n {data2[column].head()}\n")