Przeglądaj źródła

Merge remote-tracking branch 'origin/master'

daniel 2 lat temu
rodzic
commit
5bff396440
4 zmienionych plików z 209 dodań i 48 usunięć
  1. 26 0
      QMT/chongxie_run.py
  2. 43 25
      QMT/qmt_real_hlfx.py
  3. 62 23
      QMT/real_time.py
  4. 78 0
      QMT/test_fundamentals.py

+ 26 - 0
QMT/chongxie_run.py

@@ -0,0 +1,26 @@
+from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
+from xtquant.xttype import StockAccount
+from xtquant import xtdata, xtconstant
+from datetime import datetime as dt
+
+
+
+def run():
+    '''阻塞线程接收行情回调'''
+    import time
+    client = xtdata.get_client()
+    while True:
+        time.sleep(3)
+        now_date = dt.now()
+        if not client.is_connected() or dt.now() > now_date.replace(hour=11, minute=30, second=0):
+            raise Exception('行情服务连接断开')
+            break
+    return
+
+def trader(data):
+    print(dt.now(), len(data.keys()), data.keys())
+
+# stocks = stocks = xtdata.get_stock_list_in_sector('沪深A股')
+stocks = ['000001.SZ', '600000.SH', '300389.SZ', '001229.SZ', '600674.SH', '000895.SZ']
+xtdata.subscribe_whole_quote(stocks, callback=trader)
+run()

+ 43 - 25
QMT/qmt_real_hlfx.py

@@ -16,6 +16,8 @@ import multiprocessing as mp
 import os
 import psutil
 import traceback
+from apscheduler.schedulers.blocking import BlockingScheduler
+
 #原始版本
 
 # auth('18616891214', 'Ea?*7f68nD.dafcW34d!')
@@ -26,6 +28,21 @@ import traceback
 pd.set_option('display.max_columns', None) # 设置显示最大行
 fre = '1d'
 
+def run(seq):
+    '''阻塞线程接收行情回调'''
+    import time
+    client = xtdata.get_client()
+    while True:
+        time.sleep(3)
+        now_date = dt.now()
+        if not client.is_connected() or dt.now() > now_date.replace(hour=15, minute=0, second=0):
+            xtdata.unsubscribe_quote(seq)
+            print(f'现在时间:{dt.now()},已收盘')
+            raise Exception('行情服务连接断开')
+            break
+    return
+
+
 class MyXtQuantTraderCallback(XtQuantTraderCallback):
     def on_disconnected(self):
         """
@@ -244,31 +261,38 @@ def hlfx(data):
                               password='r6kEwqWU9!v3',
                               database='hlfx_pool')
     cursor_pool = db_pool.cursor()
-    print(set(results))
+    # print(set(results))
     results_list = ','.join(set(results))
     sql = "INSERT INTO %s (date,value) VALUES('%s','%s')" % (fre, dt.now().strftime('%Y-%m-%d %H:%M:%S'), results_list)
     cursor_pool.execute(sql)
     db_pool.commit()
-    print(f'{dt.now()}写入新的results,hlfx_pool更新')
+    print(f'{dt.now()}写入新的results{len(results_list)}个,hlfx_pool更新')
     engine_hlfx_pool.dispose()
 
 
+
 def bridge(list):
     print(f'MyPid is {os.getpid()}, now is {dt.now()},我需要负责{len(list)}个个股数据')
-    xtdata.subscribe_whole_quote(list, callback=hlfx)
-    xtdata.run()
+    seq = xtdata.subscribe_whole_quote(list, callback=hlfx)
+    run(seq)
 
 
 def prepare():
-    engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8')
+    stocks = xtdata.get_stock_list_in_sector('沪深A股')
 
-    results = pd.read_sql_query(
-        'select value from `%s`' % fre, engine_hlfx_pool).iloc[-1, 0].split(",")
-    results = [x.replace('XSHG', 'SH').replace('XSHE', 'SZ') for x in results]
-    print('数据库读取,并转化后缀格式', len(results))
-    # print(results[0:10])
-    engine_hlfx_pool.dispose()
-    return results
+    cpu_count = 4
+    pool = mp.Pool(processes=cpu_count, maxtasksperchild=8)
+    step = math.ceil(len(stocks) / cpu_count)
+    to_hlfx_list = []
+
+    for i in range(0, len(stocks), step):
+        to_hlfx_list.append([x for x in stocks[i:i + step]])
+
+    for m in range(cpu_count):
+        pool.apply_async(func=bridge,
+                         args=(to_hlfx_list[m],), error_callback=err_call_back)
+    pool.close()
+    pool.join()
 
 
 if __name__ == '__main__':
@@ -293,19 +317,13 @@ if __name__ == '__main__':
     # 对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功
     subscribe_result = xt_trader.subscribe(acc)
     print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)
-    stocks = xtdata.get_stock_list_in_sector('沪深A股')
 
-    cpu_count = 4
-    pool = mp.Pool(processes=cpu_count, maxtasksperchild=8)
-    step = math.ceil(len(stocks) / cpu_count)
-    to_hlfx_list = []
+    scheduler = BlockingScheduler()
+    scheduler.add_job(func=prepare, trigger='cron', day_of_week='0-4', hour='9', minute='25',
+                      timezone="Asia/Shanghai")
+    try:
+        scheduler.start()
+    except (KeyboardInterrupt, SystemExit):
+        pass
 
-    for i in range(0, len(stocks), step):
-        to_hlfx_list.append([x for x in stocks[i:i+step]])
-
-    for m in range(cpu_count):
-        pool.apply_async(func=bridge,
-                         args=(to_hlfx_list[m],), error_callback=err_call_back)
-    pool.close()
-    pool.join()
 

+ 62 - 23
QMT/real_time.py

@@ -12,6 +12,7 @@ import pymysql
 import multiprocessing as mp
 import math
 import psutil
+from apscheduler.schedulers.blocking import BlockingScheduler
 
 auth('18616891214', 'Ea?*7f68nD.dafcW34d!')
 db_pool = pymysql.connect(host='localhost',
@@ -23,6 +24,21 @@ cursor_pool = db_pool.cursor()
 engine_stock = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8')
 
 
+def run(seq):
+    print(seq)
+    '''阻塞线程接收行情回调'''
+    import time
+    client = xtdata.get_client()
+    while True:
+        time.sleep(3)
+        now_date = dt.now()
+        if not client.is_connected() or dt.now() > now_date.replace(hour=15, minute=00, second=0):
+            xtdata.unsubscribe_quote(seq)
+            raise Exception('行情服务连接断开')
+            break
+    return
+
+
 def real_price(datas):
     # i = '000001.SZ'
     for i in datas:
@@ -70,21 +86,22 @@ def his_vol(stock, num):
         return df['volume_front'].iloc[num]
 
 
-def ma_judge(data, stock_list, results):
-    print(f'这个ma_judge的PID为:{os.getpid()},收到的data数据为:{len(data.keys())},stock_pool长度为{len(stock_list)},now is {dt.now()}')
+def ma_judge(data, stock_list, rate, results):
+    # print(f',收到的data数据为:{len(data.keys())},stock_pool长度为{len(stock_list)},now is {dt.now()}')
     list_judge = list(set(data.keys()) & set(stock_list))
-    print(f'本轮计算:{len(list_judge)}个股')
+    print(f'这个ma_judge的PID为:{os.getpid()},本轮计算:{len(list_judge)}个股')
     for stock in list_judge:
         i = stock.replace('XSHG', 'SH').replace('XSHE', 'SZ')
         current_price, open_price = data[i]['lastPrice'], data[i]['open']
-        MA5, MA10, MA20 = ma(i, 5, data), ma(i, 10, data), ma(i, 20, data)
+        MA5, MA10, MA20, MA30, MA60, MA120 = ma(i, 5, data), ma(i, 10, data), ma(i, 20, data), ma(i, 30, data),\
+            ma(i, 60, data), ma(i, 120, data)
         MA5_1 = ma_1(i, 5)
         # print(i, current_price, open_price, MA5, MA10, MA20, MA5_1)
+        # 入交易池标准:阳线\大于MA5\MA5向上\MA20<MA10\离120线有距离
         if (current_price > open_price) & (current_price > MA5) & (MA5 > MA5_1) & (current_price < MA5 * 1.03) & (
-                MA20 < MA10):
+                MA20 < MA10) & (current_price > MA120 or current_price < MA120*rate):
             if his_vol(i, -1) > his_vol(i, -2):
                 results.append(i.replace('SH', 'XSHG').replace('SZ', 'XSHE'))
-    print('RRRRRRR,', results)
 
 
 def sell_trader(data, positions_dict):
@@ -111,6 +128,10 @@ def sell_trader(data, positions_dict):
             print(f'本轮没有持仓股票信息!')
 
 
+def get_fundamentals(results):
+    return results
+    pass
+
 def buy_trader(data, positions):
     print('买入函数:', dt.now(), f'接受到{len(data.keys())}个个股')
     results = mp.Manager().list()
@@ -141,8 +162,9 @@ def buy_trader(data, positions):
 
     step = math.ceil(len(stock_pool) / (mp.cpu_count()/2))
     print('step:', step)
+    rate = 0.8
     for i in range(0, len(stock_pool), step):
-        p = mp.Process(target=ma_judge, args=(data, stock_pool[i:i + step], results))
+        p = mp.Process(target=ma_judge, args=(data, stock_pool[i:i + step], rate, results))
         mp_list.append(p)
         p.start()
     for j in mp_list:
@@ -152,8 +174,9 @@ def buy_trader(data, positions):
 
     # 选择板块
     if len(results) != 0:
+        # 基本面过滤
+        results = get_fundamentals(results)
         num_industry = get_industry(results)
-        print(num_industry)
         industry_list = []
         for key in num_industry.values():
             for key2 in key.values():
@@ -166,7 +189,6 @@ def buy_trader(data, positions):
             for key2 in value.values():
                 if key2['industry_name'] in max_industry_list:
                     results_industry.append(key)
-        print('所有:', set(results_industry))
         results_industry = ','.join(set(results_industry))
         print('1d', '\n', results_industry)
 
@@ -182,20 +204,26 @@ def buy_trader(data, positions):
 
         keep_stocks = results_industry.split(",")
         new_keep_stock = [stock.replace('XSHG', 'SH').replace('XSHE', 'SZ') for stock in keep_stocks]
-        print(f'new_keep_stock is:{len(new_keep_stock)},{new_keep_stock}')
+        print(f'new_keep_stock is:{len(new_keep_stock)}')
+
 
-        max_pos = 15
+        #进入购买程序
+        max_pos = 7
         for stock in new_keep_stock:
             asset = xt_trader.query_stock_asset(acc)
             cash = asset.cash
-            if cash > 2000 and len(positions) < max_pos:
-                if stock in new_keep_stock:
-                    current_price = data[stock]['lastPrice']
-                    volume = int((cash / 2 / current_price) // 100 * 100)
-                    print('买入信号!!!!!!', stock, volume, current_price)
-                    order_id = xt_trader.order_stock(acc, stock, xtconstant.STOCK_BUY, volume, xtconstant.LATEST_PRICE,
-                                                     current_price, 'strategy1', 'order_test')
-                    print(order_id)
+            positions_dict = {positions[x].stock_code: positions[x].volume for x in range(0, len(positions)) if
+                              positions[x].volume > 0}
+            print(f'判断{stock}:cash={cash},持仓数量为{len(positions_dict)}')
+            current_price = data[stock]['lastPrice']
+            current_high = data[stock]['high']
+            if cash > 5000 and len(positions_dict) < max_pos and current_price > 9 \
+                    and current_price > (current_high*0.98):
+                volume = int((cash / 3 / current_price) // 100 * 100)
+                print('买入信号!!!!!!', stock, volume, current_price)
+                order_id = xt_trader.order_stock(acc, stock, xtconstant.STOCK_BUY, volume, xtconstant.LATEST_PRICE,
+                                                 current_price, 'strategy1', 'order_test')
+                print(order_id)
             else:
                 print(f'Cash只有:{cash} 或者 现有持仓{len(positions)} 超过了{max_pos}')
     engine_hlfx_pool.dispose()
@@ -216,6 +244,13 @@ def trader(data):
     buy_trader(data, positions)
 
 
+def bridge():
+    print("start")
+    stocks = xtdata.get_stock_list_in_sector('沪深A股')
+    seq = xtdata.subscribe_whole_quote(stocks, callback=trader)
+    run(seq)
+
+
 class MyXtQuantTraderCallback(XtQuantTraderCallback):
     def on_disconnected(self):
         """
@@ -289,7 +324,6 @@ if __name__ == '__main__':
     pus = psutil.Process()
     pus.cpu_affinity([4, 5, 6, 7])
 
-    print("start")
     # 指定客户端所在路径
     path = r'c:\\qmt\\userdata_mini'
     # 生成session id 整数类型 同时运行的策略不能重复
@@ -309,8 +343,13 @@ if __name__ == '__main__':
     subscribe_result = xt_trader.subscribe(acc)
     print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)
 
+    scheduler = BlockingScheduler()
+    scheduler.add_job(func=bridge, trigger='cron', day_of_week='0-4', hour='9', minute='40',
+                      timezone="Asia/Shanghai")
+    try:
+        scheduler.start()
+    except (KeyboardInterrupt, SystemExit):
+        pass
+
 
-    stocks = xtdata.get_stock_list_in_sector('沪深A股')
-    xtdata.subscribe_whole_quote(stocks, callback=trader)
-    xtdata.run()
     # xtdata.subscribe_quote('000001.SZ', '1d', '', '', count=1, callback=MA)

+ 78 - 0
QMT/test_fundamentals.py

@@ -0,0 +1,78 @@
+import os
+
+import pandas as pd
+import xtquant.xtdata
+from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
+from xtquant.xttype import StockAccount
+from xtquant import xtdata, xtconstant
+from functools import reduce
+from datetime import datetime as dt
+import pymysql
+import math
+import multiprocessing as mp
+import traceback
+
+
+db_pool = pymysql.connect(host='localhost',
+                          user='root',
+                          port=3307,
+                          password='r6kEwqWU9!v3',
+                          database='hlfx_pool')
+cursor_pool = db_pool.cursor()
+
+def err_call_back(err):
+    print(f'问题在这里~ error:{str(err)}')
+    traceback.print_exc()
+
+def to_sql(stocks, sector_list):
+    print(f'my pid is {os.getpid()}')
+    for stock in stocks:
+        sec_lt = []
+        for i in sector_list:
+            sector = xtdata.get_stock_list_in_sector(i)
+            if stock in sector:
+                sec_lt.append(i)
+        print(f'{stock}属于:')
+        print(f'{sec_lt}板块')
+        results_list = ','.join(set(sec_lt))
+        print(results_list)
+        sql = "INSERT INTO sector_data (stock_code, sector) VALUES('%s', '%s')" \
+              % (stock, results_list)
+        cursor_pool.execute(sql)
+        db_pool.commit()
+
+if __name__ == '__main__':
+    sttime = dt.now()
+    stock_list = xtdata.get_stock_list_in_sector('沪深A股')
+    sector_list = xtdata.get_sector_list()
+    step = math.ceil(len(stock_list) / mp.cpu_count())
+    pool = mp.Pool(processes=mp.cpu_count())
+    for i in range(0, len(stock_list), step):
+        pool.apply_async(func=to_sql, args=(stock_list[i:i+step], sector_list,), error_callback=err_call_back)
+    pool.close()
+    pool.join()
+
+    print(dt.now()-sttime)
+
+
+
+# exit()
+
+# pd.set_option('display.unicode.ambiguous_as_wide', True)
+# pd.set_option('display.unicode.east_asian_width', True)
+# pd.set_option('display.width', 1000)
+#
+# stocks = xtdata.get_stock_list_in_sector('沪深A股')
+# # xtdata.download_financial_data(stocks, ['Balance', 'Income', 'CashFlow'])
+# stocks.sort()
+# funda = xtdata.get_financial_data(stocks[0:10], ['Balance', 'Income', 'CashFlow'], start_time='20220101', end_time='20230220')
+# for stock in stocks[0:10]:
+#     bal = funda[stock]['Balance'][['m_timetag', 'goodwill']]
+#     profit = funda[stock]['Income'][['m_timetag', 'tot_profit', 'net_profit_incl_min_int_inc_after', 's_fa_eps_basic']]
+#     cflow = funda[stock]['CashFlow'][['m_timetag', 'net_profit']]
+#     dfs = [bal, profit, cflow]
+#     df = reduce(lambda x, y: pd.merge(x, y, on='m_timetag', how='inner'), dfs)
+#     df.columns = ['披露时间', '商誉', '利润总额', '净利润', '每股收益', '净利润']
+#
+#     print(stock, '\n', df)
+