Daniel 1 year ago
parent
commit
b431371ac5
7 changed files with 149 additions and 56 deletions
  1. BIN
      QMT/20230516105422482.dmp
  2. 16 12
      QMT/230504_real_time.py
  3. 18 13
      QMT/5m_data_whole.py
  4. 13 8
      QMT/download_data_whole.py
  5. 8 10
      QMT/qmt_get_indicators.py
  6. 5 5
      QMT/qmt_real_hlfx.py
  7. 89 8
      ttt.py

BIN
QMT/20230516105422482.dmp


+ 16 - 12
QMT/230504_real_time.py

@@ -134,7 +134,7 @@ def ma(stock, num, data):
     try:
         i = (num - 1) * -1
         df = pd.read_sql_query(text(
-            'select close_front from `%s_1d`'% stock), engine_stock.connect())
+            'select close_front from `%s_1d`' % stock), engine_stock.connect())
     except BaseException as e:
         print(e)
         return 9999999
@@ -170,7 +170,7 @@ def his_vol(stock, num):
 
 
 def ma_judge(data, list_judge, rate, results):
-    print(f'这个ma_judge的PID为:{os.getpid()},本轮计算:{len(list_judge)}个股')
+    # print(f'这个ma_judge的PID为:{os.getpid()},本轮计算:{len(list_judge)}个股')
     for stock in list_judge:
         current_price, open_price = data[stock]['lastPrice'], data[stock]['open']
         MA5, MA10, MA20, MA30, MA60, MA120 = ma(stock, 5, data), ma(stock, 10, data), ma(stock, 20, data), ma(stock, 30,
@@ -189,12 +189,9 @@ def sell_trader(data):
     # print('卖出函数:', dt.now())
     positions = xt_trader.query_stock_positions(acc)
     positions_dict = {positions[x].stock_code: positions[x].can_use_volume for x in range(0, len(positions))}
-    print(f'今日可卖出个股总数:{len([value for value in positions_dict.values() if value != 0])}')
     print(
         f'目前持仓总数为:{len([positions[x].stock_code for x in range(0, len(positions)) if positions[x].volume != 0])}')
 
-
-
     for stock, can_use_volume in positions_dict.items():
         if stock in data and can_use_volume != 0:
             current_price = data[stock]['lastPrice']
@@ -203,9 +200,12 @@ def sell_trader(data):
             MA5_1 = ma_1(stock, 5)
             df = pd.read_sql_query(text(
                 'select close_front, high_front from `%s_1d`' % stock), engine_stock.connect())
-            print(f'{stock},持仓量为{can_use_volume}当前价:{current_price},MA5:{MA5},昨日MA5:{MA5_1},开始判断:')
+            print(f"{data[stock]['time']}, {stock},持仓量为{can_use_volume}当前价:{current_price},开盘价:{open_price},"
+                  f"MA5:{MA5},昨日MA5:{MA5_1},开始判断:")
             if current_price == xtdata.get_instrument_detail(stock).get('UpStopPrice') \
-                or (df['close_front'][-1] == df['high_front'][-1] and df['close_front'][-1]/df['close_front'][-2] > 0.8):
+                    or (df['close_front'].iloc[-1] == df['high_front'].iloc[-1]
+                        and df['close_front'].iloc[-1] / df['close_front'].iloc[-2] > 1.08):
+                print(f"{stock}涨停或昨日涨幅超过8%,持股观察!{data[stock]['time']}")
                 continue
             elif current_price < MA5 or MA5 < MA5_1:
                 print('卖出信号!!!!!!', stock, current_price)
@@ -220,7 +220,8 @@ def sell_trader(data):
                 print('价格:', current_price, open_price, MA5, MA5_1, '盈利乖离率超7%')
                 print(order_id, stock, can_use_volume)
         else:
-            print(f'本轮没有持仓股票信息!')
+            # print(f'本轮没有持仓股票信息!')
+            pass
         engine_stock.dispose()
 
 
@@ -241,7 +242,7 @@ def buy_trader(data):
     except BaseException as e:
         print(e)
 
-    if len(stock_pool)!=0:
+    if len(stock_pool) != 0:
         list_judge = list(set(data.keys()) & set(stock_pool))
         print(f'本轮有{len(data.keys())}条个股信息,而list_judge有:{len(list_judge)}')
     else:
@@ -322,7 +323,8 @@ def buy_trader(data):
                             i = 2
                         volume = int((cash / i / current_price) // 100 * 100)
                         print('买入信号!!!!!!', stock, volume, current_price)
-                        order_id = xt_trader.order_stock(acc, stock, xtconstant.STOCK_BUY, volume, xtconstant.LATEST_PRICE,
+                        order_id = xt_trader.order_stock(acc, stock, xtconstant.STOCK_BUY, volume,
+                                                         xtconstant.LATEST_PRICE,
                                                          current_price, 'MA5策略', 'MA5趋势向上')
                         print(order_id)
                     else:
@@ -373,7 +375,10 @@ def bridge():
             time.sleep(3)
         else:
             break
-    print(f'MyPid is {os.getpid()}, now is {dt.now()},开盘了,session_id = {session_id}, \n, {xt_trader}')
+    print(f'MyPid is {os.getpid()}, now is {dt.now()},开盘了,session_id = {session_id}, \n')
+    positions = xt_trader.query_stock_positions(acc)
+    positions_dict = {positions[x].stock_code: positions[x].can_use_volume for x in range(0, len(positions))}
+    print(f'今日可卖出个股总数:{len([value for value in positions_dict.values() if value != 0])}')
     stocks = xtdata.get_stock_list_in_sector('沪深A股')
     seq = xtdata.subscribe_whole_quote(stocks, callback=trader)
     run(seq, pid)
@@ -407,4 +412,3 @@ if __name__ == '__main__':
         scheduler.start()
     except (KeyboardInterrupt, SystemExit):
         pass
-

+ 18 - 13
QMT/5m_data_whole.py

@@ -297,10 +297,10 @@ def ind():
     hlfx_pool.extend(pd.read_sql_query(
         text("select value from %s" % fre), engine_hlfx_pool.connect()).iloc[-1, 0].split(","))
 
-    pool = mp.Pool(processes=int(mp.cpu_count()))
-    step = math.ceil(len(stocks) / mp.cpu_count())
-    # pool = mp.Pool(processes=18)
-    # step = math.ceil(len(stocks) / 12)
+    # pool = mp.Pool(processes=int(mp.cpu_count()))
+    # step = math.ceil(len(stocks) / mp.cpu_count())
+    pool = mp.Pool(processes=12)
+    step = math.ceil(len(stocks) / 12)
     # step = 10000
     # tech_anal(stocks, hlfx_pool)
     for i in range(0, len(stocks), step):
@@ -368,7 +368,7 @@ def to_sql(stock_list):
         # print(df)
         try:
             # eng_w.connect().execute(text("truncate table `%s_5m`" % stock))
-            df.to_sql('%s_5m' % stock, con=eng_w, index=False, if_exists='replace', chunksize=20000)
+            df.to_sql('%s_5m' % stock, con=eng_w, index=False, if_exists='replace', chunksize=5000)
         except BaseException as e:
             print(stock, e)
             pass
@@ -382,12 +382,17 @@ def to_sql(stock_list):
 def download_data():
     stock_list = xtdata.get_stock_list_in_sector('沪深A股')
     stock_list.sort()
-    step = math.ceil(len(stock_list) / mp.cpu_count())
-    pool = mp.Pool(processes=mp.cpu_count())
-    for i in range(0, len(stock_list), step):
-        pool.apply_async(func=to_sql, args=(stock_list[i:i+step],), error_callback=err_call_back)
-    pool.close()
-    pool.join()
+    print(dt.now(), '开始下载!')
+    # xtdata.download_history_data2(stock_list=stock_list, period='5m', start_time='', end_time='')
+    print(dt.now(), '下载完成,准备入库!')
+    # step = math.ceil(len(stock_list) / mp.cpu_count())
+    # pool = mp.Pool(processes=mp.cpu_count())
+    # pool = mp.Pool(processes=12)
+    # step = math.ceil(len(stock_list) / 12)
+    # for i in range(0, len(stock_list), step):
+    #     pool.apply_async(func=to_sql, args=(stock_list[i:i+step],), error_callback=err_call_back)
+    # pool.close()
+    # pool.join()
     ind()
 
     print(f'今日数据下载完毕 {dt.now()}')
@@ -397,12 +402,12 @@ if __name__ == '__main__':
     field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']
     cpu_count = mp.cpu_count()
     pus = psutil.Process()
-    # pus.cpu_affinity([12, 13, 14, 15, 16, 17, 18, 19])
+    pus.cpu_affinity([8, 9, 10, 11, 16, 17, 18, 19, 20, 21, 22, 23])
 
     download_data()
 
     # scheduler = BlockingScheduler()
-    # scheduler.add_job(func=download_data, trigger='cron', day_of_week='0-4', hour='20', minute='05',
+    # scheduler.add_job(func=download_data, trigger='cron', day_of_week='0-4', hour='23', minute='05',
     #                   timezone="Asia/Shanghai", max_instances=10)
     # try:
     #     scheduler.start()

+ 13 - 8
QMT/download_data_whole.py

@@ -2,7 +2,7 @@ from xtquant import xtdata
 from datetime import datetime as dt
 import pandas as pd
 import math
-from sqlalchemy import create_engine
+from sqlalchemy import create_engine, text
 import multiprocessing as mp
 import os
 from apscheduler.schedulers.blocking import BlockingScheduler
@@ -17,8 +17,8 @@ path = 'C:\\qmt\\userdata_mini'
 
 field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']
 cpu_count = mp.cpu_count()
-eng_w = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8',
-                      pool_recycle=3600, pool_pre_ping=True, pool_size=100)
+
+
 
 
 def err_call_back(err):
@@ -30,6 +30,8 @@ def to_sql(stock_list):
     print(f'{dt.now()}开始循环入库! MyPid is {os.getpid()}')
     m = 0
     for stock in stock_list:
+        eng_w = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8',
+                              pool_recycle=3600, pool_pre_ping=True, pool_size=1)
         # 后复权数据
         data_back = xtdata.get_market_data(field, [stock], '1d', end_time='', count=-1, dividend_type='back')
         df_back = pd.concat([data_back[i].loc[stock].T for i in ['time', 'open', 'high', 'low', 'close', 'volume',
@@ -48,12 +50,15 @@ def to_sql(stock_list):
         df = pd.merge_asof(df_back, df_front, 'time')
         # print(df)
         try:
-            df.to_sql('%s_1d' % stock, con=eng_w, index=True, if_exists='replace')
-        except BaseException:
-            print(stock)
+            # eng_w.connect().execute(text("truncate table `%s_1d`" % stock))
+            df.to_sql('%s_1d' % stock, con=eng_w, index=False, if_exists='replace', chunksize=20000)
+        except BaseException as e:
+            print(stock, e)
             pass
         else:
             m += 1
+
+        eng_w.dispose()
     print(f'Pid:{os.getpid()}已经完工了.应入库{len(stock_list)},共入库{m}支个股')
 
 
@@ -81,11 +86,11 @@ if __name__ == '__main__':
     pus = psutil.Process()
     # pus.cpu_affinity([12, 13, 14, 15, 16, 17, 18, 19])
 
-    download_data()
+    # download_data()
 
     scheduler = BlockingScheduler()
     scheduler.add_job(func=download_data, trigger='cron', day_of_week='0-4', hour='20', minute='05',
-                      timezone="Asia/Shanghai", max_instances=250)
+                      timezone="Asia/Shanghai", max_instances=10)
     try:
         scheduler.start()
     except (KeyboardInterrupt, SystemExit):

+ 8 - 10
QMT/qmt_get_indicators.py

@@ -223,20 +223,19 @@ def tech_anal(stocks, hlfx_pool, hlfx_pool_daily, err_list):
 
     for stock in stocks:
         engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8',
-                               pool_size=5000, pool_recycle=7200, max_overflow=1000, pool_timeout=60)
+                               pool_size=1, pool_recycle=7200, max_overflow=1000, pool_timeout=60)
         engine_tech = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8',
-                                    pool_size=4000, pool_recycle=3600, max_overflow=1000, pool_timeout=60)
+                                    pool_size=1, pool_recycle=3600, max_overflow=1000, pool_timeout=60)
         # print(stock)
         try:
             df = pd.read_sql_table('%s_1d' % stock, con=engine.connect())
             df.dropna(axis=0, how='any')
+            engine.dispose()
         except BaseException:
             print(f'{stock}读取有问题')
             traceback.print_exc()
             pass
         else:
-            engine.dispose()
-
             if len(df) != 0:
                 try:
                     get_macd_data(df)
@@ -250,6 +249,7 @@ def tech_anal(stocks, hlfx_pool, hlfx_pool_daily, err_list):
                     # print(stock, '\n', df[['open_front', 'HL']])
                     df = df.replace([np.inf, -np.inf], np.nan)
                     df.to_sql('%s_1d' % stock, con=engine_tech, index=False, if_exists='replace')
+                    engine_tech.dispose()
                 # with engine.connect() as con:
                 #     con.execute("ALTER TABLE `%s_1d` ADD PRIMARY KEY (`time`);" % stock)
                 except BaseException:
@@ -264,8 +264,6 @@ def tech_anal(stocks, hlfx_pool, hlfx_pool_daily, err_list):
                 err_list.append(stock)
                 print(f'{stock}数据为空')
 
-            engine_tech.dispose()
-
             if stock in hlfx_pool and T_signals == 2:
                 hlfx_pool.remove(stock)
             elif stock not in hlfx_pool and T_signals == 1:
@@ -287,7 +285,7 @@ def ind():
     err_list = mp.Manager().list()
     fre = '1d'
     engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8',
-                                     pool_size=4000, pool_recycle=3600, max_overflow=1000, pool_timeout=60)
+                                     pool_size=1, pool_recycle=3600, max_overflow=1000, pool_timeout=60)
     hlfx_pool = mp.Manager().list()
     hlfx_pool_daily = mp.Manager().list()
     hlfx_pool.extend(pd.read_sql_query(
@@ -295,8 +293,8 @@ def ind():
 
     pool = mp.Pool(processes=int(mp.cpu_count()))
     step = math.ceil(len(stocks) / mp.cpu_count())
-    # pool = mp.Pool(processes=18)
-    # step = math.ceil(len(stocks) / 12)
+    # pool = mp.Pool(processes=16)
+    # step = math.ceil(len(stocks) / 16)
     # step = 10000
     # tech_anal(stocks, hlfx_pool)
     for i in range(0, len(stocks), step):
@@ -348,7 +346,7 @@ if __name__ == '__main__':
 
     scheduler = BlockingScheduler()
     scheduler.add_job(func=ind, trigger='cron', day_of_week='0-4', hour='20', minute='30',
-                      timezone="Asia/Shanghai", max_instances=250)
+                      timezone="Asia/Shanghai", max_instances=10)
     try:
         scheduler.start()
     except (KeyboardInterrupt, SystemExit):

+ 5 - 5
QMT/qmt_real_hlfx.py

@@ -132,7 +132,7 @@ def run(seq):
 
 def hlfx(stock_list, data):
     # stock_list = list(data.keys())
-    print(f'def-->hlfx, MyPid is {os.getpid()}, 本次我需要计算{len(stock_list)},now is {dt.now()}')
+    # print(f'def-->hlfx, MyPid is {os.getpid()}, 本次我需要计算{len(stock_list)},now is {dt.now()}')
 
     # 获得hlfx_pool池子
     engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8',
@@ -140,7 +140,7 @@ def hlfx(stock_list, data):
     results = []
     results.extend(pd.read_sql_query(text(
         'select value from `%s` order by `index` desc limit 10' % fre), engine_hlfx_pool.connect()).iloc[0, 0].split(","))
-    print(f'本次hlfx_pool有{len(results)}个个股')
+    # print(f'本次hlfx_pool有{len(results)}个个股')
 
     engine_stock = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8',
                                  pool_size=100, pool_recycle=3600, max_overflow=50, pool_timeout=60)
@@ -153,7 +153,7 @@ def hlfx(stock_list, data):
                 'dif, dea, macd, HL from `%s_%s`' % (qmt_stock, fre)), engine_stock.connect())
             df_day.columns = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount', 'dif', 'dea', 'macd', 'HL']
         except BaseException as e:
-            print(qmt_stock, '未能读取!', e)
+            print(qmt_stock, '未能读取!')
             pass
         else:
             # 获得最新价格信息
@@ -284,12 +284,12 @@ def hlfx(stock_list, data):
                               password='r6kEwqWU9!v3',
                               database='hlfx_pool')
     cursor_pool = db_pool.cursor()
-    # print(set(results))
+
     results_list = ','.join(set(results))
     sql = "INSERT INTO %s (date,value) VALUES('%s','%s')" % (fre, dt.now().strftime('%Y-%m-%d %H:%M:%S'), results_list)
     cursor_pool.execute(sql)
     db_pool.commit()
-    print(f'{dt.now()}写入新的results-{len(results_list)}个,hlfx_pool更新')
+    print(f'{dt.now()} 新的results有{len(set(results))}, \n {set(results)}')
     engine_stock.dispose()
     engine_hlfx_pool.dispose()
 

+ 89 - 8
ttt.py

@@ -1,9 +1,90 @@
 m = 0
-def mm(m):
-    while True:
-        print('m=', m)
-        m = m + 1
-        if m == 10:
-            break
-    return
-print(mm(m))
+# def mm(m):
+#     while True:
+#         print('m=', m)
+#         m = m + 1
+#         if m == 10:
+#             break
+#     return
+# print(mm(m))
+
+import pandas as pd
+import pymysql
+from sqlalchemy import create_engine, text
+import threading
+from datetime import datetime as dt
+import datetime
+from jqdatasdk.technical_analysis import *
+from xtquant import xtdata, xtconstant
+from xtquant.xttype import StockAccount
+from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
+import time
+import math
+import multiprocessing as mp
+import os
+import psutil
+import traceback
+from apscheduler.schedulers.blocking import BlockingScheduler
+import sys
+
+# p = xtdata.get_instrument_detail('000001.sz')
+# print(p.get('UpStopPrice'))
+
+
+# stock = '000001.SZ'
+# engine_stock = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8',
+#                              pool_size=5000, pool_recycle=50, max_overflow=-1)
+# df = pd.read_sql_query(text(
+#                 'select close_front, high_front from `%s_1d`' % stock), engine_stock.connect())
+#
+# print(df['close_front'].iloc[-1])
+# print(df['close_front'].iloc[-2])
+
+
+def sell_trader(data):
+    stock = '002645.SZ'
+    # print(data[stock][0]['close'])
+    print(xtdata.get_instrument_detail(stock).get('UpStopPrice'))
+
+    current_price = data[stock]['lastPrice']
+    print('aaa', current_price, data[stock]['time'])
+    print(data[stock])
+
+'''
+    if stock in data and can_use_volume != 0:
+        current_price = data[stock]['lastPrice']
+        open_price = data[stock]['open']
+        MA5 = ma(stock, 5, data)
+        MA5_1 = ma_1(stock, 5)
+        df = pd.read_sql_query(text(
+            'select close_front, high_front from `%s_1d`' % stock), engine_stock.connect())
+        print(f'{stock},持仓量为{can_use_volume}当前价:{current_price},MA5:{MA5},昨日MA5:{MA5_1},开始判断:')
+        if current_price == xtdata.get_instrument_detail(stock).get('UpStopPrice') \
+                or (df['close_front'].iloc[-1] == df['high_front'].iloc[-1]
+                    and df['close_front'].iloc[-1] / df['close_front'].iloc[-2] > 1.08):
+            continue
+        elif current_price < MA5 or MA5 < MA5_1:
+            print('卖出信号!!!!!!', stock, current_price)
+            order_id = xt_trader.order_stock(acc, stock, xtconstant.STOCK_SELL, can_use_volume,
+                                             xtconstant.LATEST_PRICE, 0, 'MA5策略', '低于MA5趋势向下')
+            print('价格:', current_price, open_price, MA5, MA5_1, '低于MA5趋势向下')
+            print(order_id, stock, can_use_volume)
+        elif current_price > MA5 * 1.07:
+            print('盈利乖离率超7%!!!!!!', stock, current_price)
+            order_id = xt_trader.order_stock(acc, stock, xtconstant.STOCK_SELL, can_use_volume,
+                                             xtconstant.LATEST_PRICE, 0, 'MA5策略', '盈利乖离率超7%')
+            print('价格:', current_price, open_price, MA5, MA5_1, '盈利乖离率超7%')
+            print(order_id, stock, can_use_volume)
+    else:
+        print(f'本轮没有持仓股票信息!')
+'''
+
+
+
+
+# xtdata.subscribe_quote('301125.SZ',callback=sell_trader)
+stocks = xtdata.get_stock_list_in_sector('沪深A股')
+xtdata.download_history_data2(stock_list=stocks, period='30m', start_time='', end_time='')
+print(stocks)
+seq = xtdata.subscribe_whole_quote(stocks, callback=sell_trader)
+xtdata.run()