Browse Source

板块信息

Daniel 2 years ago
parent
commit
3d0ddb13bc
2 changed files with 104 additions and 0 deletions
  1. 26 0
      QMT/chongxie_run.py
  2. 78 0
      QMT/test_fundamentals.py

+ 26 - 0
QMT/chongxie_run.py

@@ -0,0 +1,26 @@
+from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
+from xtquant.xttype import StockAccount
+from xtquant import xtdata, xtconstant
+from datetime import datetime as dt
+
+
+
+def run():
+    '''阻塞线程接收行情回调'''
+    import time
+    client = xtdata.get_client()
+    while True:
+        time.sleep(3)
+        now_date = dt.now()
+        if not client.is_connected() or dt.now() > now_date.replace(hour=11, minute=30, second=0):
+            raise Exception('行情服务连接断开')
+            break
+    return
+
+def trader(data):
+    print(dt.now(), len(data.keys()), data.keys())
+
+# stocks = stocks = xtdata.get_stock_list_in_sector('沪深A股')
+stocks = ['000001.SZ', '600000.SH', '300389.SZ', '001229.SZ', '600674.SH', '000895.SZ']
+xtdata.subscribe_whole_quote(stocks, callback=trader)
+run()

+ 78 - 0
QMT/test_fundamentals.py

@@ -0,0 +1,78 @@
+import os
+
+import pandas as pd
+import xtquant.xtdata
+from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
+from xtquant.xttype import StockAccount
+from xtquant import xtdata, xtconstant
+from functools import reduce
+from datetime import datetime as dt
+import pymysql
+import math
+import multiprocessing as mp
+import traceback
+
+
+db_pool = pymysql.connect(host='localhost',
+                          user='root',
+                          port=3307,
+                          password='r6kEwqWU9!v3',
+                          database='hlfx_pool')
+cursor_pool = db_pool.cursor()
+
+def err_call_back(err):
+    print(f'问题在这里~ error:{str(err)}')
+    traceback.print_exc()
+
+def to_sql(stocks, sector_list):
+    print(f'my pid is {os.getpid()}')
+    for stock in stocks:
+        sec_lt = []
+        for i in sector_list:
+            sector = xtdata.get_stock_list_in_sector(i)
+            if stock in sector:
+                sec_lt.append(i)
+        print(f'{stock}属于:')
+        print(f'{sec_lt}板块')
+        results_list = ','.join(set(sec_lt))
+        print(results_list)
+        sql = "INSERT INTO sector_data (stock_code, sector) VALUES('%s', '%s')" \
+              % (stock, results_list)
+        cursor_pool.execute(sql)
+        db_pool.commit()
+
+if __name__ == '__main__':
+    sttime = dt.now()
+    stock_list = xtdata.get_stock_list_in_sector('沪深A股')
+    sector_list = xtdata.get_sector_list()
+    step = math.ceil(len(stock_list) / mp.cpu_count())
+    pool = mp.Pool(processes=mp.cpu_count())
+    for i in range(0, len(stock_list), step):
+        pool.apply_async(func=to_sql, args=(stock_list[i:i+step], sector_list,), error_callback=err_call_back)
+    pool.close()
+    pool.join()
+
+    print(dt.now()-sttime)
+
+
+
+# exit()
+
+# pd.set_option('display.unicode.ambiguous_as_wide', True)
+# pd.set_option('display.unicode.east_asian_width', True)
+# pd.set_option('display.width', 1000)
+#
+# stocks = xtdata.get_stock_list_in_sector('沪深A股')
+# # xtdata.download_financial_data(stocks, ['Balance', 'Income', 'CashFlow'])
+# stocks.sort()
+# funda = xtdata.get_financial_data(stocks[0:10], ['Balance', 'Income', 'CashFlow'], start_time='20220101', end_time='20230220')
+# for stock in stocks[0:10]:
+#     bal = funda[stock]['Balance'][['m_timetag', 'goodwill']]
+#     profit = funda[stock]['Income'][['m_timetag', 'tot_profit', 'net_profit_incl_min_int_inc_after', 's_fa_eps_basic']]
+#     cflow = funda[stock]['CashFlow'][['m_timetag', 'net_profit']]
+#     dfs = [bal, profit, cflow]
+#     df = reduce(lambda x, y: pd.merge(x, y, on='m_timetag', how='inner'), dfs)
+#     df.columns = ['披露时间', '商誉', '利润总额', '净利润', '每股收益', '净利润']
+#
+#     print(stock, '\n', df)
+