Browse Source

版本同步

Daniel 1 year ago
parent
commit
a4abde97ed

+ 1 - 1
.idea/misc.xml

@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <project version="4">
-  <component name="ProjectRootManager" version="2" project-jdk-name="Py38Quant" project-jdk-type="Python SDK" />
+  <component name="ProjectRootManager" version="2" project-jdk-name="py311Quant" project-jdk-type="Python SDK" />
   <component name="PyPackaging">
     <option name="earlyReleasesAsUpgrades" value="true" />
   </component>

+ 1 - 0
.idea/modules.xml

@@ -2,6 +2,7 @@
 <project version="4">
   <component name="ProjectModuleManager">
     <modules>
+      <module fileurl="file://$PROJECT_DIR$/../AI/.idea/AI.iml" filepath="$PROJECT_DIR$/../AI/.idea/AI.iml" />
       <module fileurl="file://$USER_HOME$/PycharmProjects/pythonProject/.idea/pythonProject.iml" filepath="$USER_HOME$/PycharmProjects/pythonProject/.idea/pythonProject.iml" />
       <module fileurl="file://$USER_HOME$/Library/CloudStorage/OneDrive-个人/个人/python_stocks/quantify01/.idea/quantify01.iml" filepath="$USER_HOME$/Library/CloudStorage/OneDrive-个人/个人/python_stocks/quantify01/.idea/quantify01.iml" />
       <module fileurl="file://$PROJECT_DIR$/.idea/stock.iml" filepath="$PROJECT_DIR$/.idea/stock.iml" />

+ 2 - 1
.idea/stock.iml

@@ -2,10 +2,11 @@
 <module type="PYTHON_MODULE" version="4">
   <component name="NewModuleRootManager">
     <content url="file://$MODULE_DIR$" />
-    <orderEntry type="jdk" jdkName="Py39PyTorch" jdkType="Python SDK" />
+    <orderEntry type="jdk" jdkName="py311Quant" jdkType="Python SDK" />
     <orderEntry type="sourceFolder" forTests="false" />
     <orderEntry type="module" module-name="stocks_to_sql" />
     <orderEntry type="module" module-name="quantify01" />
     <orderEntry type="module" module-name="pythonProject" />
+    <orderEntry type="module" module-name="AI" />
   </component>
 </module>

+ 1 - 1
.idea/vcs.xml

@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <project version="4">
   <component name="VcsDirectoryMappings">
-    <mapping directory="$PROJECT_DIR$" vcs="Git" />
+    <mapping directory="" vcs="Git" />
   </component>
 </project>

+ 392 - 0
QMT/230504_real_time.py

@@ -0,0 +1,392 @@
+# coding:utf-8
+from datetime import datetime as dt
+import os
+import pandas as pd
+from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
+from xtquant.xttype import StockAccount
+from xtquant import xtdata, xtconstant
+import time
+from sqlalchemy import create_engine
+from jqdatasdk import *
+import pymysql
+import multiprocessing as mp
+import math
+import psutil
+import datetime
+from apscheduler.schedulers.blocking import BlockingScheduler
+import sys
+
+# 指定客户端所在路径
+path = r'c:\\qmt\\userdata_mini'
+# 创建资金账号为 800068 的证券账号对象
+acc = StockAccount('920000207040', 'SECURITY')
+# 生成session id 整数类型 同时运行的策略不能重复
+session_id = 123456
+xt_trader = None
+engine_stock = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8')
+auth('18616891214', 'Ea?*7f68nD.dafcW34d!')
+
+
+class MyXtQuantTraderCallback(XtQuantTraderCallback):
+    def on_disconnected(self):
+        """
+        连接断开
+        :return:
+        """
+        print(datetime.datetime.now(), '连接断开回调')
+
+    def on_stock_order(self, order):
+        """
+        委托回报推送
+        :param order: XtOrder对象
+        :return:
+        """
+        print(datetime.datetime.now(), '委托回调', order.order_remark)
+
+    def on_stock_trade(self, trade):
+        """
+        成交变动推送
+        :param trade: XtTrade对象
+        :return:
+        """
+        print(datetime.datetime.now(), '成交回调', trade.order_remark)
+
+    def on_order_error(self, order_error):
+        """
+        委托失败推送
+        :param order_error:XtOrderError 对象
+        :return:
+        """
+        # print("on order_error callback")
+        # print(order_error.order_id, order_error.error_id, order_error.error_msg)
+        print(f"委托报错回调 {order_error.order_remark} {order_error.error_msg}")
+
+    def on_cancel_error(self, cancel_error):
+        """
+        撤单失败推送
+        :param cancel_error: XtCancelError 对象
+        :return:
+        """
+        print(datetime.datetime.now(), sys._getframe().f_code.co_name)
+
+    def on_order_stock_async_response(self, response):
+        """
+        异步下单回报推送
+        :param response: XtOrderResponse 对象
+        :return:
+        """
+        print(f"异步委托回调 {response.order_remark}")
+
+    def on_cancel_order_stock_async_response(self, response):
+        """
+        :param response: XtCancelOrderResponse 对象
+        :return:
+        """
+        print(datetime.datetime.now(), sys._getframe().f_code.co_name)
+
+    def on_account_status(self, status):
+        """
+        :param response: XtAccountStatus 对象
+        :return:
+        """
+        print(datetime.datetime.now(), sys._getframe().f_code.co_name)
+
+
+def run(seq, pid):
+    mor = datetime.datetime.strptime(
+        str(dt.now().date()) + '11:30', '%Y-%m-%d%H:%M')
+    afternoon = datetime.datetime.strptime(
+        str(dt.now().date()) + '15:00', '%Y-%m-%d%H:%M')
+    mor_1 = datetime.datetime.strptime(
+        str(dt.now().date()) + '12:59', '%Y-%m-%d%H:%M')
+    """阻塞线程接收行情回调"""
+    import time
+    client = xtdata.get_client()
+    while True:
+        time.sleep(3)
+        now_date = dt.now()
+        if not client.is_connected():
+            xtdata.unsubscribe_quote(seq)
+            raise Exception('行情服务连接断开')
+        # if mor < dt.now() < mor_1:
+        #     xtdata.unsubscribe_quote(seq)
+        #     print(f'现在时间:{dt.now()},已休市')
+        #     sys.exit()
+        #     break
+        #     return 0
+        elif dt.now() > afternoon:
+            xtdata.unsubscribe_quote(seq)
+            print(f'现在时间:{dt.now()},已收盘')
+            sys.exit()
+            break
+            # return 0
+    # return
+
+
+def get_fundamentals(results):
+    return results
+    pass
+
+
+def ma(stock, num, data):
+    global engine_stock
+    try:
+        i = (num - 1) * -1
+        df = pd.read_sql_query(
+            'select close_front from `%s_1d`' % stock, engine_stock)
+    except:
+        return 9999999
+    else:
+        ma_num = (sum(df['close_front'][i:]) + data[stock]['lastPrice']) / num
+        return ma_num
+
+
+def ma_1(stock, num):
+    global engine_stock
+    i = num * -1
+    try:
+        df = pd.read_sql_query(
+            'select close_front from `%s_1d`' % stock, engine_stock)
+    except BaseException:
+        return 9999999
+    else:
+        ma_num_1 = df['close_front'][i:].mean()
+        return ma_num_1
+
+
+def his_vol(stock, num):
+    global engine_stock
+    num = num * -1
+    try:
+        df = pd.read_sql_query(
+            'select volume_front from `%s_1d`' % stock, engine_stock)
+    except BaseException:
+        return 9999999
+    else:
+        return df['volume_front'].iloc[num]
+
+
+def ma_judge(data, list_judge, rate, results):
+    print(f'这个ma_judge的PID为:{os.getpid()},本轮计算:{len(list_judge)}个股')
+    for stock in list_judge:
+        current_price, open_price = data[stock]['lastPrice'], data[stock]['open']
+        MA5, MA10, MA20, MA30, MA60, MA120 = ma(stock, 5, data), ma(stock, 10, data), ma(stock, 20, data), ma(stock, 30,
+                                                                                                              data), \
+            ma(stock, 60, data), ma(stock, 120, data)
+        MA5_1 = ma_1(stock, 5)
+        # print(i, current_price, open_price, MA5, MA10, MA20, MA5_1)
+        # 入交易池标准:阳线\大于MA5\MA5向上\MA20<MA10\离120线有距离
+        if (current_price > open_price) & (current_price > MA5) & (MA5 > MA5_1) & (current_price < MA5 * 1.03) & (
+                MA20 < MA10) & (current_price > MA120 or current_price < MA120 * rate):
+            if his_vol(stock, -1) > his_vol(stock, -2):
+                results.append(stock.replace('SH', 'XSHG').replace('SZ', 'XSHE'))
+
+
+def sell_trader(data):
+    # print('卖出函数:', dt.now())
+    positions = xt_trader.query_stock_positions(acc)
+    positions_dict = {positions[x].stock_code: positions[x].can_use_volume for x in range(0, len(positions))}
+    print(f'今日可卖出个股总数:{len([value for value in positions_dict.values() if value != 0])}')
+    print(
+        f'目前持仓总数为:{len([positions[x].stock_code for x in range(0, len(positions)) if positions[x].volume != 0])}')
+
+    for stock, can_use_volume in positions_dict.items():
+        if stock in data and can_use_volume != 0:
+            current_price = data[stock]['lastPrice']
+            open_price = data[stock]['open']
+            MA5 = ma(stock, 5, data)
+            MA5_1 = ma_1(stock, 5)
+            print(f'{stock},持仓量为{can_use_volume}当前价:{current_price},MA5:{MA5},昨日MA5:{MA5_1},开始判断:')
+            if current_price < MA5 or MA5 < MA5_1:
+                print('卖出信号!!!!!!', stock, current_price)
+                order_id = xt_trader.order_stock(acc, stock, xtconstant.STOCK_SELL, can_use_volume,
+                                                 xtconstant.LATEST_PRICE, 0, 'MA5策略', '低于MA5趋势向下')
+                print('价格:', current_price, open_price, MA5, MA5_1, '低于MA5趋势向下')
+                print(order_id, stock, can_use_volume)
+            elif current_price > MA5 * 1.07:
+                print('盈利乖离率超7%!!!!!!', stock, current_price)
+                order_id = xt_trader.order_stock(acc, stock, xtconstant.STOCK_SELL, can_use_volume,
+                                                 xtconstant.LATEST_PRICE, 0, 'MA5策略', '盈利乖离率超7%')
+                print('价格:', current_price, open_price, MA5, MA5_1, '盈利乖离率超7%')
+                print(order_id, stock, can_use_volume)
+        else:
+            print(f'本轮没有持仓股票信息!')
+
+
+def buy_trader(data):
+    # print('买入函数:', dt.now(), f'接受到{len(data.keys())}个个股')
+    results = mp.Manager().list()
+    mp_list = []
+    engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8')
+
+    try:
+        stock_pool = pd.read_sql_query(
+            'select value from `%s` order by `index` desc limit 10' % '1d', engine_hlfx_pool)
+        stock_pool = stock_pool.iloc[0, 0].split(",")
+        stock_pool.sort()
+        # print('stock_pool', len(stock_pool))
+    except BaseException:
+        pass
+
+    list_judge = list(set(data.keys()) & set(stock_pool))
+    print(f'本轮有{len(data.keys())}条个股信息,而list_judge有:{len(list_judge)}')
+    step = math.ceil(len(list_judge) / 4)
+    rate = 0.8
+    if len(list_judge) != 0:
+        for i in range(0, len(list_judge), step):
+            p = mp.Process(target=ma_judge, args=(data, list_judge[i:i + step], rate, results))
+            mp_list.append(p)
+            p.start()
+        for j in mp_list:
+            j.join()
+    results = list(set(results))
+
+    # 选择板块
+    if len(results) != 0:
+        # 基本面过滤
+        results = get_fundamentals(results)
+        num_industry = get_industry(results)
+        industry_list = []
+        for key in num_industry.values():
+            for key2 in key.values():
+                industry_list.append(key2['industry_name'])
+        industry_list = pd.value_counts(industry_list)
+        # 最热集中的n个板块
+        max_industry_list = list(industry_list[0:2].index)
+        results_industry = []
+        for key, value in num_industry.items():
+            for key2 in value.values():
+                if key2['industry_name'] in max_industry_list:
+                    results_industry.append(key)
+        results_industry = ','.join(set(results_industry))
+        print('1d', '\n', results_industry)
+
+        db_pool = pymysql.connect(host='localhost',
+                                  user='root',
+                                  port=3307,
+                                  password='r6kEwqWU9!v3',
+                                  database='hlfx_pool')
+        cursor_pool = db_pool.cursor()
+        sql = "INSERT INTO MA5_%s (date,value) VALUES('%s','%s')" % ('1d', dt.now().strftime('%Y-%m-%d %H:%M:%S'),
+                                                                     results_industry)
+        cursor_pool.execute(sql)
+        db_pool.commit()
+
+        # print(len(results_industry), results_industry)
+        print(dt.now(), '数据库数据已赋值!')
+        cursor_pool.close()
+        db_pool.close()
+
+        # 取值交易
+        keep_stocks = results_industry.split(",")
+        new_keep_stock = [stock.replace('XSHG', 'SH').replace('XSHE', 'SZ') for stock in keep_stocks]
+        print(f'{dt.now()},new_keep_stock is:{len(new_keep_stock)}')
+
+        if len(new_keep_stock) != 0:
+            # 进入购买程序
+            max_pos = 7
+            for stock in new_keep_stock:
+                positions = xt_trader.query_stock_positions(acc)
+
+                asset = xt_trader.query_stock_asset(acc)
+                cash = asset.cash
+                positions_dict = {positions[x].stock_code: positions[x].volume for x in range(0, len(positions)) if
+                                  positions[x].volume > 0}
+                print(f'判断{stock}:cash={cash},持仓数量为{len(positions_dict)}')
+                current_price = data[stock]['lastPrice']
+                current_high = data[stock]['high']
+                if stock not in positions_dict:
+                    if len(positions_dict) < max_pos and current_price > 9 \
+                            and current_price > (current_high * 0.98):
+                        if 5000 > cash:
+                            i = 1
+                        else:
+                            i = 2
+                        volume = int((cash / i / current_price) // 100 * 100)
+                        print('买入信号!!!!!!', stock, volume, current_price)
+                        order_id = xt_trader.order_stock(acc, stock, xtconstant.STOCK_BUY, volume, xtconstant.LATEST_PRICE,
+                                                         current_price, 'MA5策略', 'MA5趋势向上')
+                        print(order_id)
+                    else:
+                        print(f'Cash只有:{cash} 或者 现有持仓{len(positions_dict)} 超过了{max_pos}')
+                else:
+                    print(f'{stock}已持仓!')
+            engine_hlfx_pool.dispose()
+    print('一轮结束了,现在时间是:', dt.now())
+
+
+def trader(data):
+    print(f'本轮订阅{len(data)}')
+    # print(f'xt_trader = {xt_trader},{session_id}')
+    # print(len(xt_trader.query_stock_positions(acc)))
+    # 卖出判断
+    sell_trader(data)
+    # 买入条件
+    buy_trader(data)
+
+
+def bridge():
+    global session_id, xt_trader
+    pid = os.getpid()
+    connect_result = -1
+    subscribe_result = -1
+
+    while True:
+        if connect_result != 0 or subscribe_result != 0:
+            session_id = int(time.time())
+            xt_trader = XtQuantTrader(path, session_id)
+            # 创建交易回调类对象,并声明接收回调
+            callback = MyXtQuantTraderCallback()
+            xt_trader.register_callback(callback)
+            # 启动交易线程
+            xt_trader.start()
+            # 建立交易连接,返回0表示连接成功
+            connect_result = xt_trader.connect()
+            print('建立交易连接,返回0表示连接成功', connect_result)
+            # 对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功
+            subscribe_result = xt_trader.subscribe(acc)
+            print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)
+            # 建立交易连接,返回0表示连接成功
+            connect_result = xt_trader.connect()
+            print('建立交易连接,返回0表示连接成功', connect_result)
+            # 对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功
+            subscribe_result = xt_trader.subscribe(acc)
+            print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)
+            time.sleep(3)
+        else:
+            break
+    print(f'MyPid is {os.getpid()}, now is {dt.now()},开盘了,session_id = {session_id}, \n, {xt_trader}')
+    stocks = xtdata.get_stock_list_in_sector('沪深A股')
+    seq = xtdata.subscribe_whole_quote(stocks, callback=trader)
+    run(seq, pid)
+
+
+def job_func():
+    print(f"Job started at {dt.now()}")
+    # 创建子进程
+    p = mp.Process(target=bridge)
+    # 启动子进程
+    p.start()
+    # 等待子进程结束
+    p.join()
+    print(f"Job finished at {dt.now()}")
+
+
+if __name__ == '__main__':
+    mp.freeze_support()
+    # print('cpu_count =', mp.cpu_count())
+    pus = psutil.Process()
+    pus.cpu_affinity([12, 13, 14, 15])
+
+    # job_func()
+
+    scheduler = BlockingScheduler()
+    scheduler.add_job(func=job_func, trigger='cron', day_of_week='0-4', hour='09', minute='40',
+                      timezone="Asia/Shanghai")
+    # scheduler.add_job(func=job_func, trigger='cron', day_of_week='0-4', hour='12', minute='35',
+    #                   timezone="Asia/Shanghai")
+    try:
+        scheduler.start()
+    except (KeyboardInterrupt, SystemExit):
+        pass
+

+ 61 - 0
QMT/LSTM_1.py

@@ -0,0 +1,61 @@
+import pandas as pd
+import numpy as np
+from sklearn.preprocessing import MinMaxScaler
+from keras.models import Sequential
+from keras.layers import Dense, LSTM
+from ta.volatility import BollingerBands
+from ta.trend import MACD
+from ta.trend import EMAIndicator
+from ta.volume import MFIIndicator
+from sqlalchemy import create_engine
+
+# 加载数据
+engine_tech = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8')
+df = pd.read_sql_table('600000.SH_1d', con=engine_tech)
+
+
+df = df[['open_back', 'high_back', 'low_back', 'close_back', 'volume_back', 'close_back', 'macd', 'willr', 'volume_back', 'dif', 'dea', 'rsi_6', 'rsi_12', 'rsi_24']].values
+
+# df = df[~np.isnan(df).any(axis=1), :]
+
+# 归一化
+scaler = MinMaxScaler(feature_range=(0, 1))
+scaled_data = scaler.fit_transform(df)
+print(len(df), len(scaled_data))
+
+# 构造训练数据
+lookback = 90
+target_days = 30
+
+x_train = []
+y_train = []
+for i in range(lookback, len(df) - target_days):
+    x_train.append(scaled_data[i - lookback:i])
+    y_train.append(scaled_data[i:i + target_days, 0])
+
+x_train, y_train = np.array(x_train), np.array(y_train)
+print('11111111111111111111', x_train.shape, y_train.shape)
+
+
+# 构建模型
+model = Sequential()
+model.add(LSTM(units=50, return_sequences=True, input_shape=(x_train.shape[1], x_train.shape[2])))
+model.add(LSTM(units=50, return_sequences=True))
+model.add(LSTM(units=50))
+model.add(Dense(units=30))
+
+model.compile(optimizer='adam', loss='mean_squared_error')
+
+model.fit(x_train, y_train, epochs=10, batch_size=32)
+
+# 预测未来30天走势
+test_data = scaled_data[-lookback:]
+test_data = np.expand_dims(test_data, axis=0)
+print(test_data.shape)
+predictions = model.predict(test_data)
+
+# 将归一化的预测数据转换为原始数据
+predictions = scaler.inverse_transform(predictions)
+
+# 打印预测结果
+print(predictions)

+ 7 - 6
QMT/chongxie_run.py

@@ -282,6 +282,7 @@ def ma(df, num):
 
 
 def cont(data):
+    # print(dt.now())
     code = '127045.SZ'
     dd = xtdata.get_market_data(field_list=[], stock_list=[code], period='tick', start_time='20230101', end_time='',
                                 count=-1, dividend_type='none', fill_data=True)
@@ -310,12 +311,12 @@ def cont(data):
             order_id = xt_trader.order_stock(acc, code, xtconstant.STOCK_SELL, positions_dict[code],
                                              xtconstant.LATEST_PRICE, 0, '可转债', '滞涨')
             print(f'可转债卖出_{order_id},成交价格:{xtconstant.LATEST_PRICE}')
-    if df_day['close'].iloc[-1] > ma_now > ma_1 and ma_1 < ma_2 <= ma_3 and df_day['close'].iloc[-2] < ma_1:
-        print('buy:',  df_day['time'].iloc[-1], df_day['close'].iloc[-1], ma_now)
-        order_id = xt_trader.order_stock(acc, code, xtconstant.STOCK_BUY, 10, xtconstant.LATEST_PRICE,
-                                         0, '可转债', '跳多MA5')
-        print(f'可转债Buy——{order_id},成交价格:{xtconstant.LATEST_PRICE}')
-    elif df_day['close'].iloc[-1] > df_day['close'].iloc[-2] and df_day['close'].iloc[-4] >df_day['close'].iloc[-3] > df_day['close'].iloc[-2]:
+    # if df_day['close'].iloc[-1] > ma_now > ma_1 and ma_1 < ma_2 <= ma_3 and df_day['close'].iloc[-2] < ma_1:
+    #     print('buy:',  df_day['time'].iloc[-1], df_day['close'].iloc[-1], ma_now)
+    #     order_id = xt_trader.order_stock(acc, code, xtconstant.STOCK_BUY, 10, xtconstant.LATEST_PRICE,
+    #                                      0, '可转债', '跳多MA5')
+    #     print(f'可转债Buy——{order_id},成交价格:{xtconstant.LATEST_PRICE}')
+    if df_day['close'].iloc[-1] > df_day['close'].iloc[-2] and df_day['close'].iloc[-4] >df_day['close'].iloc[-3] > df_day['close'].iloc[-2]:
         print('buy:', df_day['time'].iloc[-1], df_day['close'].iloc[-1], ma_now)
         order_id = xt_trader.order_stock(acc, code, xtconstant.STOCK_BUY, 10, xtconstant.LATEST_PRICE, 0, '可转债', 'tick底分型')
         print(f'可转债Buy——{order_id},成交价格:{xtconstant.LATEST_PRICE}')

+ 23 - 3
QMT/demo.py

@@ -4,6 +4,8 @@ from xtquant import xtdata
 from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
 from xtquant.xttype import StockAccount
 from xtquant import xtconstant
+from sqlalchemy import create_engine
+import pandas as pd
 
 #定义一个类 创建类的实例 作为状态的容器
 class _a():
@@ -99,6 +101,17 @@ class MyXtQuantTraderCallback(XtQuantTraderCallback):
 
 
 if __name__ == '__main__':
+    engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8')
+    df = pd.read_sql_table('601155.SH_1d', engine)
+    df = df[df['HL'] != '-']
+    print(df)
+    df.to_csv(f"C:\Daniel\策略\601155.csv", index=True, encoding='utf_8_sig',
+              mode='w')
+
+
+
+
+    exit()
     print("start")
     #指定客户端所在路径
     path = r'c:\\qmt\\userdata_mini'
@@ -118,9 +131,16 @@ if __name__ == '__main__':
     # 对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功
     subscribe_result = xt_trader.subscribe(acc)
     print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)
-    # asset = xt_trader.query_stock_asset(acc)
-    # cash = asset.cash
-    # print(cash)
+    asset = xt_trader.query_stock_asset(acc)
+    cash = asset.cash
+    positions = xt_trader.query_stock_positions(acc)
+
+    print(cash, positions)
+    # xt_trader.run_forever()
+    print('run over')
+    if '600362.SH' == positions[0].stock_code:
+        print('aaa')
+    exit()
     # xtdata.subscribe_whole_quote(["SH", "SZ"], callback=f)
     xtdata.run()
 

+ 7 - 2
QMT/download_data_whole.py

@@ -7,6 +7,7 @@ import multiprocessing as mp
 import os
 from apscheduler.schedulers.blocking import BlockingScheduler
 import traceback
+import psutil
 
 
 pd.set_option('display.max_columns', None) # 设置显示最大行
@@ -61,8 +62,10 @@ def download_data():
     print(dt.now(), '开始下载!')
     xtdata.download_history_data2(stock_list=stock_list, period='1d', start_time='', end_time='')
     print(dt.now(), '下载完成,准备入库!')
-    step = math.ceil(len(stock_list) / mp.cpu_count())
-    pool = mp.Pool(processes=mp.cpu_count())
+    # step = math.ceil(len(stock_list) / mp.cpu_count())
+    # pool = mp.Pool(processes=mp.cpu_count())
+    pool = mp.Pool(processes=8)
+    step = math.ceil(len(stock_list) / 8)
     for i in range(0, len(stock_list), step):
         pool.apply_async(func=to_sql, args=(stock_list[i:i+step],), error_callback=err_call_back)
     pool.close()
@@ -74,6 +77,8 @@ def download_data():
 if __name__ == '__main__':
     field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']
     cpu_count = mp.cpu_count()
+    pus = psutil.Process()
+    pus.cpu_affinity([12, 13, 14, 15, 16, 17, 18, 19])
 
     # download_data()
 

+ 42 - 6
QMT/qmt_get_indicators.py

@@ -13,6 +13,8 @@ import talib as ta
 from xtquant import xtdata
 import os
 import traceback
+from apscheduler.schedulers.blocking import BlockingScheduler
+import psutil
 
 pd.set_option('display.max_columns', None)  # 设置显示最大行
 engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8')
@@ -121,6 +123,7 @@ def get_hlfx(data):
                     df_day.loc[x - 1, 'close'] > df_day.loc[x - 1, 'open']:
 
                 df_day.loc[x, 'HL'] = 'L*'
+
                 while m:
                     if df_day.loc[m, 'HL'] in ['H', 'HH', 'H*']:
                         if (x - m) > 3:
@@ -129,12 +132,16 @@ def get_hlfx(data):
                             # 产生信号,进入hlfx_pool
                             if x == len(df_day.index) - 1:
                                 Trading_signals = 1
+                        else:
+                            # 不成笔 次级别中枢,保持L* 修订原H为H*
+                            df_day.loc[m, 'HL'] = 'H*'
                         break
 
                     elif df_day.loc[m, 'HL'] in ['L', 'LL', 'L*']:
                         if df_day.loc[m - 1, 'low'] > df_day.loc[x - 1, 'low']:
                             # 前一个为底更高,且中间不存在更低的底
                             df_day.loc[x, 'HL'] = 'L'
+                            df_day.loc[m, 'HL'] = '-'
 
                             # 产生信号,进入hlfx_pool
                             if x == len(df_day.index) - 1:
@@ -148,10 +155,13 @@ def get_hlfx(data):
 
                             # MACD底背驰
                             if m_macd_dif < x_macd_dif:
-                                # 背驰底->LL
+                                # 次级别背驰底->LL
                                 df_day.loc[x, 'HL'] = 'LL'
                             break
-                        break
+                        else:
+                            # 前底更低,本底无效
+                            df_day.loc[x, 'HL'] = '-'
+                            break
                     m = m - 1
                     if m == 0:
                         df_day.loc[x, 'HL'] = 'L'
@@ -169,12 +179,16 @@ def get_hlfx(data):
                             # 产生信号,进入hlfx_pool
                             if x == len(df_day.index) - 1:
                                 Trading_signals = 2
+                        else:
+                            # 不成笔 次级别中枢,保持H* 修订原L为L*
+                            df_day.loc[m, 'HL'] = 'L*'
                         break
 
-                    elif df_day.loc[m, 'HL'] == 'H':
+                    elif df_day.loc[m, 'HL'] in ['H', 'HH', 'H*']:
                         if df_day.loc[x - 1, 'high'] > df_day.loc[m - 1, 'high']:
                             # 前一个为顶,且中间存在不包含 or 更高的顶
                             df_day.loc[x, 'HL'] = 'H'
+                            df_day.loc[m, 'HL'] = '-'
                             # 产生信号,进入hlfx_pool
                             if x == len(df_day.index) - 1:
                                 Trading_signals = 2
@@ -187,9 +201,13 @@ def get_hlfx(data):
 
                             # MACD顶背驰
                             if x_macd_dif < m_macd_dif:
+                                # 次级别背驰底->HH
                                 df_day.loc[x, 'HL'] = 'HH'
                             break
-                        break
+                        else:
+                            # 前顶更高,本顶无效
+                            df_day.loc[x, 'HL'] = '-'
+                            break
                     m = m - 1
                     if m == 0:
                         df_day.loc[x, 'HL'] = 'H'
@@ -253,7 +271,7 @@ def tech_anal(stocks, hlfx_pool, hlfx_pool_daily, err_list):
     print(f'Pid:{os.getpid()}已经完工了,应处理{len(stocks)},共计算{m}支个股')
 
 
-if __name__ == '__main__':
+def ind():
     sttime = dt.now()
 
     stocks = xtdata.get_stock_list_in_sector('沪深A股')
@@ -262,7 +280,8 @@ if __name__ == '__main__':
 
     err_list = mp.Manager().list()
     fre = '1d'
-    engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8')
+    engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8',
+                                     pool_recycle=3600, pool_pre_ping=True)
     hlfx_pool = mp.Manager().list()
     hlfx_pool_daily = mp.Manager().list()
     hlfx_pool.extend(pd.read_sql_query(
@@ -270,6 +289,8 @@ if __name__ == '__main__':
 
     pool = mp.Pool(processes=mp.cpu_count())
     step = math.ceil(len(stocks) / mp.cpu_count())
+    # pool = mp.Pool(processes=12)
+    # step = math.ceil(len(stocks) / 12)
     # step = 10000
     x = 1
     # tech_anal(stocks, hlfx_pool)
@@ -313,3 +334,18 @@ if __name__ == '__main__':
 
     edtime = dt.now()
     print(edtime - sttime)
+
+
+if __name__ == '__main__':
+    pus = psutil.Process()
+    # pus.cpu_affinity([12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23])
+
+    # ind()
+
+    scheduler = BlockingScheduler()
+    scheduler.add_job(func=ind, trigger='cron', day_of_week='0-4', hour='20', minute='30',
+                      timezone="Asia/Shanghai")
+    try:
+        scheduler.start()
+    except (KeyboardInterrupt, SystemExit):
+        pass

+ 10 - 9
QMT/qmt_real_hlfx.py

@@ -105,6 +105,8 @@ def run(seq, pid):
         str(dt.now().date()) + '11:30', '%Y-%m-%d%H:%M')
     afternoon = datetime.datetime.strptime(
         str(dt.now().date()) + '15:00', '%Y-%m-%d%H:%M')
+    mor_1 = datetime.datetime.strptime(
+        str(dt.now().date()) + '12:59', '%Y-%m-%d%H:%M')
     """阻塞线程接收行情回调"""
     import time
     client = xtdata.get_client()
@@ -114,12 +116,12 @@ def run(seq, pid):
         if not client.is_connected():
             xtdata.unsubscribe_quote(seq)
             raise Exception('行情服务连接断开')
-        if mor < dt.now():
-            xtdata.unsubscribe_quote(seq)
-            print(f'现在时间:{dt.now()},已休市')
-            sys.exit()
-            break
-            # return 0
+        # if mor < dt.now() < mor_1:
+        #     xtdata.unsubscribe_quote(seq)
+        #     print(f'现在时间:{dt.now()},已休市')
+        #     sys.exit()
+        #     break
+        #     return 0
         elif dt.now() > afternoon:
             xtdata.unsubscribe_quote(seq)
             print(f'现在时间:{dt.now()},已收盘')
@@ -140,7 +142,6 @@ def hlfx(stock_list, data):
         'select value from `%s` order by `index` desc limit 10' % fre, engine_hlfx_pool).iloc[0, 0].split(","))
     print(f'本次hlfx_pool有{len(results)}个个股')
 
-
     engine_stock = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8')
 
     for qmt_stock in stock_list:
@@ -361,8 +362,8 @@ if __name__ == '__main__':
     scheduler = BlockingScheduler()
     scheduler.add_job(func=job_func, trigger='cron', day_of_week='0-4', hour='09', minute='25',
                       timezone="Asia/Shanghai")
-    scheduler.add_job(func=job_func, trigger='cron', day_of_week='0-4', hour='13', minute='00',
-                      timezone="Asia/Shanghai")
+    # scheduler.add_job(func=job_func, trigger='cron', day_of_week='0-4', hour='13', minute='00',
+    #                   timezone="Asia/Shanghai")
     try:
         scheduler.start()
     except (KeyboardInterrupt, SystemExit):

+ 37 - 34
QMT/real_time.py

@@ -19,6 +19,7 @@ import gc
 
 # auth('18616891214', 'Ea?*7f68nD.dafcW34d!')
 engine_stock = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8')
+auth('18616891214', 'Ea?*7f68nD.dafcW34d!')
 
 
 class MyXtQuantTraderCallback(XtQuantTraderCallback):
@@ -86,11 +87,33 @@ class MyXtQuantTraderCallback(XtQuantTraderCallback):
         print(datetime.datetime.now(), sys._getframe().f_code.co_name)
 
 
+# 指定客户端所在路径
+path = r'c:\\qmt\\userdata_mini'
+# 生成session id 整数类型 同时运行的策略不能重复
+session_id = int(time.time())
+xt_trader = XtQuantTrader(path, session_id)
+# 创建资金账号为 800068 的证券账号对象
+acc = StockAccount('920000207040', 'SECURITY')
+# 创建交易回调类对象,并声明接收回调
+callback = MyXtQuantTraderCallback()
+xt_trader.register_callback(callback)
+# 启动交易线程
+xt_trader.start()
+# 建立交易连接,返回0表示连接成功
+connect_result = xt_trader.connect()
+print('建立交易连接,返回0表示连接成功', connect_result)
+# 对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功
+subscribe_result = xt_trader.subscribe(acc)
+print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)
+
+
 def run(seq, pid):
     mor = datetime.datetime.strptime(
         str(dt.now().date()) + '11:30', '%Y-%m-%d%H:%M')
     afternoon = datetime.datetime.strptime(
         str(dt.now().date()) + '15:00', '%Y-%m-%d%H:%M')
+    mor_1 = datetime.datetime.strptime(
+        str(dt.now().date()) + '12:59', '%Y-%m-%d%H:%M')
     """阻塞线程接收行情回调"""
     import time
     client = xtdata.get_client()
@@ -100,12 +123,12 @@ def run(seq, pid):
         if not client.is_connected():
             xtdata.unsubscribe_quote(seq)
             raise Exception('行情服务连接断开')
-        if mor < dt.now():
-            xtdata.unsubscribe_quote(seq)
-            print(f'现在时间:{dt.now()},已休市')
-            sys.exit()
-            break
-            # return 0
+        # if mor < dt.now() < mor_1:
+        #     xtdata.unsubscribe_quote(seq)
+        #     print(f'现在时间:{dt.now()},已休市')
+        #     sys.exit()
+        #     break
+        #     return 0
         elif dt.now() > afternoon:
             xtdata.unsubscribe_quote(seq)
             print(f'现在时间:{dt.now()},已收盘')
@@ -218,7 +241,6 @@ def sell_trader(data):
 
 def buy_trader(data):
     print('买入函数:', dt.now(), f'接受到{len(data.keys())}个个股')
-
     results = mp.Manager().list()
     mp_list = []
     engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8')
@@ -303,7 +325,9 @@ def buy_trader(data):
         max_pos = 7
         for stock in new_keep_stock:
             positions = xt_trader.query_stock_positions(acc)
+
             asset = xt_trader.query_stock_asset(acc)
+            print('bbbb', positions, asset)
             cash = asset.cash
             positions_dict = {positions[x].stock_code: positions[x].volume for x in range(0, len(positions)) if
                               positions[x].volume > 0}
@@ -324,19 +348,17 @@ def buy_trader(data):
 
 
 def trader(data):
-    # 先判断卖出条件
-    positions = xt_trader.query_stock_positions(acc)
-    if len(positions) != 0:
-        sell_trader(data)
+    sell_trader(data)
     # 买入条件
     buy_trader(data)
 
 
 def bridge():
+    pid = os.getpid()
     print(f'MyPid is {os.getpid()}, now is {dt.now()},开盘了')
     stocks = xtdata.get_stock_list_in_sector('沪深A股')
     seq = xtdata.subscribe_whole_quote(stocks, callback=trader)
-    run(seq)
+    run(seq, pid)
 
 
 def job_func():
@@ -351,37 +373,18 @@ def job_func():
 
 
 if __name__ == '__main__':
-    auth('18616891214', 'Ea?*7f68nD.dafcW34d!')
-
     mp.freeze_support()
     print('cpu_count =', mp.cpu_count())
     pus = psutil.Process()
     pus.cpu_affinity([12, 13, 14, 15, 16, 17])
 
-    # 指定客户端所在路径
-    path = r'c:\\qmt\\userdata_mini'
-    # 生成session id 整数类型 同时运行的策略不能重复
-    session_id = int(time.time())
-    xt_trader = XtQuantTrader(path, session_id)
-    # 创建资金账号为 800068 的证券账号对象
-    acc = StockAccount('920000207040', 'SECURITY')
-    # 创建交易回调类对象,并声明接收回调
-    callback = MyXtQuantTraderCallback()
-    xt_trader.register_callback(callback)
-    # 启动交易线程
-    xt_trader.start()
-    # 建立交易连接,返回0表示连接成功
-    connect_result = xt_trader.connect()
-    print('建立交易连接,返回0表示连接成功', connect_result)
-    # 对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功
-    subscribe_result = xt_trader.subscribe(acc)
-    print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)
+    # job_func()
 
     scheduler = BlockingScheduler()
     scheduler.add_job(func=job_func, trigger='cron', day_of_week='0-4', hour='09', minute='40',
                       timezone="Asia/Shanghai")
-    scheduler.add_job(func=job_func, trigger='cron', day_of_week='0-4', hour='13', minute='10',
-                      timezone="Asia/Shanghai")
+    # scheduler.add_job(func=job_func, trigger='cron', day_of_week='0-4', hour='13', minute='05',
+    #                   timezone="Asia/Shanghai")
     try:
         scheduler.start()
     except (KeyboardInterrupt, SystemExit):

+ 65 - 99
QMT/test_kzz.py

@@ -1,115 +1,81 @@
 # coding:utf-8
-from datetime import datetime as dt
-import os
 import pandas as pd
-from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
-from xtquant.xttype import StockAccount
-from xtquant import xtdata, xtconstant
-import time
+import numpy as np
+from sklearn.preprocessing import MinMaxScaler
+from keras.models import Sequential
+from keras.layers import Dense, LSTM
+from keras.callbacks import EarlyStopping
+import matplotlib.pyplot as plt
 from sqlalchemy import create_engine
-from jqdatasdk import *
-import pymysql
-import multiprocessing as mp
-import math
-import psutil
-from apscheduler.schedulers.blocking import BlockingScheduler
-import sys
 
-class MyXtQuantTraderCallback(XtQuantTraderCallback):
-    def on_disconnected(self):
-        """
-        连接断开
-        :return:
-        """
-        print(datetime.datetime.now(), '连接断开回调')
+# 加载数据
+engine_tech = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8')
 
-    def on_stock_order(self, order):
-        """
-        委托回报推送
-        :param order: XtOrder对象
-        :return:
-        """
-        print(datetime.datetime.now(), '委托回调', order.order_remark)
 
-    def on_stock_trade(self, trade):
-        """
-        成交变动推送
-        :param trade: XtTrade对象
-        :return:
-        """
-        print(datetime.datetime.now(), '成交回调', trade.order_remark)
+df = pd.read_sql_table('600000.SH_1d', con=engine_tech)
+print(df)
 
-    def on_order_error(self, order_error):
-        """
-        委托失败推送
-        :param order_error:XtOrderError 对象
-        :return:
-        """
-        # print("on order_error callback")
-        # print(order_error.order_id, order_error.error_id, order_error.error_msg)
-        print(f"委托报错回调 {order_error.order_remark} {order_error.error_msg}")
+# 计算MACD指标
+ema12 = df['close_back'].ewm(span=12, adjust=False).mean()
+ema26 = df['close_back'].ewm(span=26, adjust=False).mean()
+macd = ema12 - ema26
+signal = macd.ewm(span=9, adjust=False).mean()
+df['macd'] = macd
+df['signal'] = signal
 
-    def on_cancel_error(self, cancel_error):
-        """
-        撤单失败推送
-        :param cancel_error: XtCancelError 对象
-        :return:
-        """
-        print(datetime.datetime.now(), sys._getframe().f_code.co_name)
+# 准备数据
+data = df[['close_back', 'macd', 'signal', 'willr', 'volume_back', 'dif', 'dea', 'rsi_6', 'rsi_12', 'rsi_24']].values
+print(data)
+data = data[~np.isnan(data).any(axis=1), :]
+print('处理后', data)
 
-    def on_order_stock_async_response(self, response):
-        """
-        异步下单回报推送
-        :param response: XtOrderResponse 对象
-        :return:
-        """
-        print(f"异步委托回调 {response.order_remark}")
 
-    def on_cancel_order_stock_async_response(self, response):
-        """
-        :param response: XtCancelOrderResponse 对象
-        :return:
-        """
-        print(datetime.datetime.now(), sys._getframe().f_code.co_name)
+scaler = MinMaxScaler(feature_range=(0, 1))
+data = scaler.fit_transform(data)
+train_size = int(len(data) * 0.7)
+train_data = data[:train_size, :]
+test_data = data[train_size:, :]
 
-    def on_account_status(self, status):
-        """
-        :param response: XtAccountStatus 对象
-        :return:
-        """
-        print(datetime.datetime.now(), sys._getframe().f_code.co_name)
+def create_dataset(dataset, look_back):
+    X, Y = [], []
+    for i in range(len(dataset) - look_back - 1):
+        X.append(dataset[i:(i + look_back), :])
+        Y.append(dataset[i + look_back, 0])
+    return np.array(X), np.array(Y)
 
+look_back = 90
+train_X, train_Y = create_dataset(train_data, look_back)
+test_X, test_Y = create_dataset(test_data, look_back)
 
-if __name__ == '__main__':
-    auth('18616891214', 'Ea?*7f68nD.dafcW34d!')
+# LSTM模型
+model = Sequential()
+model.add(LSTM(units=50, input_shape=(train_X.shape[1], train_X.shape[2])))
+model.add(Dense(units=1))
+model.compile(optimizer='adam', loss='mean_squared_error')
 
-    mp.freeze_support()
-    print('cpu_count =', mp.cpu_count())
-    pus = psutil.Process()
-    pus.cpu_affinity([6, 7])
+# 训练模型
+early_stop = EarlyStopping(monitor='val_loss', patience=10)
+history = model.fit(train_X, train_Y, epochs=100, batch_size=1, validation_data=(test_X, test_Y), callbacks=[early_stop], verbose=2)
 
-    # 指定客户端所在路径
-    path = r'c:\\qmt\\userdata_mini'
-    # 生成session id 整数类型 同时运行的策略不能重复
-    session_id = int(time.time())
-    xt_trader = XtQuantTrader(path, session_id)
-    # 创建资金账号为 800068 的证券账号对象
-    acc = StockAccount('920000207040')
-    # 创建交易回调类对象,并声明接收回调
-    callback = MyXtQuantTraderCallback()
-    xt_trader.register_callback(callback)
-    # 启动交易线程
-    xt_trader.start()
-    # 建立交易连接,返回0表示连接成功
-    connect_result = xt_trader.connect()
-    print('建立交易连接,返回0表示连接成功', connect_result)
-    # 对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功
-    subscribe_result = xt_trader.subscribe(acc)
-    print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)
+# 绘制损失函数
+plt.plot(history.history['loss'], label='train')
+plt.plot(history.history['val_loss'], label='test')
+plt.legend()
+plt.show()
+
+# 预测未来30天的涨跌
+last_data = data[-look_back:, :]
+last_data = np.expand_dims(last_data, axis=0)
+predictions = []
+for i in range(30):
+    prediction = model.predict(last_data)
+    predictions.append(prediction)
+    last_data = np.concatenate([last_data[:, 1:, :], prediction.reshape(1, 1, 3)], axis=1)
+predictions = scaler.inverse_transform(np.array(predictions).reshape(-1, 1))
+
+# 绘制预测结果
+plt.plot(predictions, label='predicted')
+plt.plot(df['back_close'].tail(30).reset_index(drop=True), label='actual')
+plt.legend()
+plt.show()
 
-    stock = '110074.SZ'
-    volume = 10
-    current_price = xtconstant.LATEST_PRICE
-    order_id = xt_trader.order_stock(acc, stock, xtconstant.STOCK_BUY, volume,
-                                     xtconstant.LATEST_PRICE, 0, 'KZZ', 'TEST')
-    print(order_id)

+ 17 - 11
backtrader/230202_backtrader.py

@@ -12,6 +12,8 @@ import math
 from datetime import datetime as dt
 import multiprocessing as mp
 from backtrader.feeds import PandasData
+import platform
+import psutil
 
 
 # import multiprocessing
@@ -151,18 +153,22 @@ def err_call_back(err):
 
 
 def to_df(lt):
-    df = pd.DataFrame(list(lt), columns=['周期', '波动率', '乖离率', '盈利个数', '盈利比例', '总盈利', '平均盈利', '最大盈利',
+    print('开始存数据')
+    df = pd.DataFrame(list(lt), columns=['周期', '波动率', 'MA5斜率', '盈利个数', '盈利比例', '总盈利', '平均盈利', '最大盈利',
                                '最小盈利', '总亏损', '平均亏损', '最大亏损', '最小亏损'])
-    df.sort_values(by=['周期', '波动率', '乖离率'], ascending=True, inplace=True)
+    df.sort_values(by=['周期', '波动率', 'MA5斜率'], ascending=True, inplace=True)
     df = df.reset_index(drop=True)
-    df.to_csv(f"D:\Daniel\策略\策略穷举{dt.now().strftime('%Y%m%d')}.csv", index=True, encoding='utf-8', mode='w')
-    print(df)
+    if platform.node() == 'DanieldeMBP.lan':
+        df.to_csv(f"/Users/daniel/Documents/策略/策略穷举{dt.now().strftime('%Y%m%d%H%m%S')}.csv", index=True, encoding='utf_8_sig', mode='w')
+    else:
+        df.to_csv(f"C:\Daniel\策略\策略穷举{dt.now().strftime('%Y%m%d%H%m%S' )}.csv", index=True, encoding='utf_8_sig', mode='w')
+    print(f'结果:{df}')
 
 
 def backtrader(list_date, table_list, result, result_change, result_change_fall, num, Volatility, rate, err_list):
     print(f'{num}天波动率为{Volatility}%乖离率为{rate}', 'myPID is ', os.getpid())
     sttime = dt.now()
-    engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8')
+    engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8')
     for stock in table_list:
         # print(stock)
         stk_df = pd.read_sql_table(stock, engine)
@@ -207,12 +213,12 @@ def backtrader(list_date, table_list, result, result_change, result_change_fall,
                 err_list.append(stock)
             else:
                 if cerebro.broker.getvalue() > 100000.0:
-                    result_change.append((cerebro.broker.getvalue() / 10000 - 1))
+                    result_change.append((cerebro.broker.getvalue() / 100000 - 1))
                     result.append(stock)
                     # print('recode!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!')
                     # print(result)
                 else:
-                    result_change_fall.append((1 - cerebro.broker.getvalue() / 10000))
+                    result_change_fall.append((1 - cerebro.broker.getvalue() / 100000))
                     # print('aaaaaaaaaaa')
                     # print(result_change_fall)
 
@@ -252,18 +258,18 @@ if __name__ == '__main__':
                          user='root',
                          port=3307,
                          password='r6kEwqWU9!v3',
-                         database='qmt_stocks')
+                         database='qmt_stocks_whole')
     cursor = db.cursor()
     cursor.execute("show tables like '%%%s%%' " % fre)
     table_list = [tuple[0] for tuple in cursor.fetchall()]
     # print(table_list)
-    # table_list = table_list[0:100]
+    table_list = table_list[0:500]
 
     list_date = mp.Manager().list()
     thread_list = []
     pool = mp.Pool(processes=mp.cpu_count())
-    for num in range(60, 100, 20):
-        for Volatility in range(5, 7, 1):
+    for num in range(60, 80, 20):
+        for Volatility in range(5, 8, 1):
             for rate in range(7, 9, 1):
                 step = math.ceil(len(table_list) / mp.cpu_count())
                 result = []

+ 3 - 3
backtrader/230215_backtrader.py

@@ -128,7 +128,7 @@ class TestStrategy(bt.Strategy):
                     and (((lowest * (1 - vola)) < self.low[-2] < (lowest * (1 + vola))) or (
                     (lowest * (1 - vola)) < self.low[-1] < (lowest * (1 + vola)))):
                 self.order = self.buy()
-            elif self.hl[0] == 5 and ((highest * (1 - vola)) < self.high[-2] < (highest * (1 + vola))):
+            elif self.hl[0] == 5 or ((highest * (1 - vola)) < self.high[-2] < (highest * (1 + vola))):
                 self.order = self.close()
         '''
                 if len(self) > self.params.num:
@@ -279,11 +279,11 @@ if __name__ == '__main__':
     cursor.execute("show tables like '%%%s%%' " % fre)
     table_list = [tuple[0] for tuple in cursor.fetchall()]
     # print(table_list)
-    # table_list = table_list[0:100]
+    table_list = table_list[0:100]
 
     list_date = mp.Manager().list()
     thread_list = []
-    pool = mp.Pool(processes=mp.cpu_count())
+    pool = mp.Pool(processes=int(mp.cpu_count()/2))
     for num in range(60, 180, 20):
         for Volatility in range(5, 8, 1):
             for rate in range(7, 8, 1):

+ 2 - 2
backtrader/230219_backtrader.py

@@ -187,8 +187,8 @@ def to_df(lt):
                                '最小盈利', '总亏损', '平均亏损', '最大亏损', '最小亏损'])
     df.sort_values(by=['周期', '波动率', '量能增长率'], ascending=True, inplace=True)
     df = df.reset_index(drop=True)
-    # df.to_csv(f"D:\Daniel\策略\策略穷举{dt.now().strftime('%Y%m%d')}.csv", index=True, encoding='utf-8', mode='w')
-    df.to_csv(f"/Users/daniel/Documents/策略/策略穷举{dt.now().strftime('%Y%m%d')}.csv", index=True, encoding='utf-8', mode='w')
+    df.to_csv(f"C:\Daniel\策略\策略穷举{dt.now().strftime('%Y%m%d')}.csv", index=True, encoding='utf-8', mode='w')
+    # df.to_csv(f"/Users/daniel/Documents/策略/策略穷举{dt.now().strftime('%Y%m%d')}.csv", index=True, encoding='utf-8', mode='w')
     print(df)
 
 

+ 1 - 1
backtrader/230224_backtrader.py

@@ -174,7 +174,7 @@ def to_df(lt):
     if platform.node() == 'DanieldeMBP.lan':
         df.to_csv(f"/Users/daniel/Documents/策略/策略穷举{dt.now().strftime('%Y%m%d')}.csv", index=True, encoding='utf-8', mode='w')
     else:
-        df.to_csv(f"D:\Daniel\策略\策略穷举{dt.now().strftime('%Y%m%d')}.csv", index=True, encoding='utf-8', mode='w')
+        df.to_csv(f"C:\Daniel\策略\策略穷举{dt.now().strftime('%Y%m%d')}.csv", index=True, encoding='utf-8', mode='w')
     print(f'结果:{df}')
 
 

+ 5 - 3
backtrader/230429_bt.py

@@ -150,6 +150,8 @@ class TestStrategy(bt.Strategy):
                         self.pos_price = self.low[-1]
                         break
                     m -= 1
+                    if m < (-len(self)):
+                        break
             elif ((self.hl[0] == 5 or self.dataclose[0] < self.sma5[0]) and ((highest * (1 - vola)) < self.high[-2] < (highest * (1 + vola)))) \
                     or self.dataclose[0] < self.pos_price:
                 self.order = self.close()
@@ -293,9 +295,9 @@ if __name__ == '__main__':
     list_date = mp.Manager().list()
     thread_list = []
     pool = mp.Pool(processes=mp.cpu_count())
-    for num in range(20, 180, 20):
-        for Volatility in range(5, 11, 1):
-            for rate in range(1, 5, 1):
+    for num in range(60, 180, 20):
+        for Volatility in range(5, 10, 1):
+            for rate in range(1, 4, 1):
                 step = math.ceil(len(table_list) / mp.cpu_count())
                 result = []
                 result_change = []

+ 49 - 29
backtrader/230503_bt.py

@@ -49,7 +49,7 @@ class TestStrategy(bt.Strategy):
     params = (
         ("num", 3),
         ('Volatility', 0),
-        ('rate', 5),  # 注意要有逗号!!
+        ('rate', 3),  # 注意要有逗号!!
     )
 
     def log(self, txt, dt=None):
@@ -133,27 +133,40 @@ class TestStrategy(bt.Strategy):
         # and (self.net_pct_l[0] > 10) and (self.net_pct_xl[0] > 3)  \
         # and (self.net_amount_main[-1] > 0) and (self.net_amount_main[0] > 0)
 
+
+
         if len(self) > self.params.num:
             vola = self.params.Volatility / 100
             rate = self.params.rate / 100
             lowest = np.min(self.low.get(size=self.params.num))
             highest = np.max(self.high.get(size=self.params.num))
+
+            #
+            # > self.sma5[-1]
+            # and (((lowest * (1 - vola)) < self.low[-2] < (lowest * (1 + vola))) or (
+            #         (lowest * (1 - vola)) < self.low[-1] < (lowest * (1 + vola)))) \
             '''
-            if (self.hl[-1] == 2 or self.hl[-1] == 1) and self.dataclose[0] > self.sma5[0] > self.sma5[-1] \
-                    and self.sma5[-4]/self.sma5[-3] > self.sma5[-3]/self.sma5[-2]  \
-                    and self.sma10[-4]/self.sma10[-3] > self.sma10[-3]/self.sma10[-2] \
-                    and self.sma20[-4] < self.sma20[-3] < self.sma20[-2] < self.sma20[-1] < self.sma20[0] \
-                    and (self.low[-2] < self.sma5[-2]*(1-rate) or self.low[-1] < self.sma5[-1]*(1-rate))\
-                    and self.dataclose[0] > self.dataopen[0] \
-                    and self.sma5[-1] < self.sma10[-1] < self.sma20[-1]:
+                        if (self.hl[-1] == 2 or self.hl[-1] == 1) and self.dataclose[0] > self.sma5[0]  \
+                    and self.dataclose[-1] > self.dataopen[-1] \
+                    and (self.sma10[-2] - self.sma5[-2]) < (self.sma10[-1] - self.sma5[-1]) \
+                    and self.low[-2] < self.sma5[-2] * (1 - rate) \
+                    and self.sma5[-1] < self.sma10[-1] < self.sma20[-1] < self.sma20[-2] < self.sma20[-3] < self.sma20[-4]:
             '''
-            if self.dataclose[0] > self.sma5[0] > self.sma10[0] and self.sma5[-1] < self.sma10[-1]:
+            if (self.hl[0] == 1 or self.hl[0] == 2 or self.hl[0] == 3) and (1-vola)*lowest < self.low[-1] < (1+vola)*lowest:
+            # if self.dataclose[0] > self.sma5[0] > self.sma10[0] and self.sma5[-1] < self.sma10[-1]:
                 self.order = self.buy()
                 self.pos_price = self.low[-1]
 
-            elif (self.hl[0] == 5 or self.dataclose[0] < self.sma5[0]) or self.dataclose[0] < self.pos_price:
-                self.order = self.close()
-                self.pos_price = 0
+            elif ((self.hl[0] == 4 or self.hl[0] == 5 or self.hl[0] == 6) and self.dataclose < self.sma5[0]) or self.dataclose[0] < self.pos_price:
+                    self.order = self.close()
+                    self.pos_price = 0
+
+
+            # elif (self.hl[0] == 5 or self.dataclose[0] < self.sma5[0]):
+            '''
+                        elif self.dataclose[0] < self.sma10[0] or self.sma5[0] < self.sma5[-1] or self.dataclose[0] < self.pos_price\
+                    or len(self) == (self.buflen()-1):
+            '''
 
     def stop(self):
         # pass
@@ -168,13 +181,13 @@ def err_call_back(err):
 def to_df(lt):
     print('开始存数据')
     df = pd.DataFrame(list(lt), columns=['周期', '波动率', 'MA5斜率', '盈利个数', '盈利比例', '总盈利', '平均盈利', '最大盈利',
-                               '最小盈利', '总亏损', '平均亏损', '最大亏损', '最小亏损'])
+                               '最小盈利', '总亏损', '平均亏损', '最大亏损', '最小亏损', '盈亏对比'])
     df.sort_values(by=['周期', '波动率', 'MA5斜率'], ascending=True, inplace=True)
     df = df.reset_index(drop=True)
     if platform.node() == 'DanieldeMBP.lan':
-        df.to_csv(f"/Users/daniel/Documents/策略/策略穷举{dt.now().strftime('%Y%m%d')}.csv", index=True, encoding='utf-8', mode='w')
+        df.to_csv(f"/Users/daniel/Documents/策略/策略穷举{dt.now().strftime('%Y%m%d%H%m%S')}.csv", index=True, encoding='utf_8_sig', mode='w')
     else:
-        df.to_csv(f"C:\Daniel\策略\策略穷举{dt.now().strftime('%Y%m%d')}.csv", index=True, encoding='utf-8', mode='w')
+        df.to_csv(f"C:\Daniel\策略\策略穷举{dt.now().strftime('%Y%m%d%H%m%S' )}.csv", index=True, encoding='utf_8_sig', mode='w')
     print(f'结果:{df}')
 
 
@@ -186,6 +199,7 @@ def backtrader(list_date, table_list, result, result_change, result_change_fall,
         # print(stock)
         stk_df = pd.read_sql_table(stock, engine)
         stk_df.time = pd.to_datetime(stk_df.time)
+        stk_df = stk_df[stk_df['HL'] != '-']
         try:
             stk_df['HL'] = stk_df['HL'].map({'L': 1,
                                              'LL': 2,
@@ -228,31 +242,35 @@ def backtrader(list_date, table_list, result, result_change, result_change_fall,
                 try:
                     # 策略执行
                     cerebro.run()
-                except IndexError:
+                except IndexError as e:
                     err_list.append(stock)
+                    # print(e)
                 else:
                     if cerebro.broker.getvalue() > 100000.0:
-                        result_change.append((cerebro.broker.getvalue() / 10000 - 1))
+                        result_change.append(cerebro.broker.getvalue()-100000)
                         result.append(stock)
                         # print('recode!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!')
                         # print(result)
                     else:
-                        result_change_fall.append((1 - cerebro.broker.getvalue() / 10000))
+                        result_change_fall.append(cerebro.broker.getvalue()-100000)
+                        # if (cerebro.broker.getvalue()-100000) == np.min(result_change_fall):
+                        #     print(stock, (cerebro.broker.getvalue()-100000))
                         # print('aaaaaaaaaaa')
                         # print(result_change_fall)
-
+    print(f'计算总数={len(result)+len(result_change_fall)}')
     if len(result) * len(result_change) * len(result_change_fall) != 0:
         print(f'以{num}内最低值波动{Volatility}为支撑、MA5斜率为{rate}%,结果状态为:')
-        print('正盈利的个股为:', len(result_change), '成功率为:', len(result) / len(table_list))
+        print('正盈利的个股为:', len(result), '成功率为:', len(result) / len(table_list))
         print(
-            f'总盈利:{np.sum(result_change)} 平均盈利:{np.mean(result_change)},最大盈利:{np.max(result_change)}, 最小盈利:{np.min(result_change)}')
+            f'总盈利:{np.sum(result_change)} 平均盈利:{np.mean(result_change)/len(result)},最大盈利:{np.max(result_change)}, 最小盈利:{np.min(result_change)}')
         print(
-            f'总亏损:{np.sum(result_change_fall)},平均亏损:{np.mean(result_change_fall)},最大亏损:{np.min(result_change_fall)} 最小亏损:{np.max(result_change_fall)}')
+            f'总亏损:{np.sum(result_change_fall)},平均亏损:{np.mean(result_change_fall)/len(result_change_fall)},最大亏损:{np.min(result_change_fall)} 最小亏损:{np.max(result_change_fall)}')
 
+        # '周期', '波动率', 'MA5斜率', '盈利个数', '盈利比例', '总盈利', '平均盈利', '最大盈利', '最小盈利', '总亏损', '平均亏损', '最大亏损', '最小亏损', '盈亏对比']
         list_date.append([num, Volatility, rate, len(result), len(result) / len(table_list), np.nansum(result_change),
                           np.nanmean(result_change), np.nanmax(result_change), np.min(result_change),
                           np.nansum(result_change_fall), np.nanmean(result_change_fall),
-                          np.nanmin(result_change_fall), np.nanmax(result_change_fall)])
+                          np.nanmin(result_change_fall), np.nanmax(result_change_fall), len(result_change)/len(result_change_fall)])
         to_df(list_date)
         endtime = dt.now()
         print(f'{num}天波动率为{Volatility}%MA5斜率为{rate},myPID is {os.getpid()}.本轮耗时为{endtime - sttime}')
@@ -271,7 +289,8 @@ if __name__ == '__main__':
     starttime = dt.now()
     print(starttime)
     pus = psutil.Process()
-    pus.cpu_affinity([0, 1, 2, 3, 4, 5, 6, 7])
+    pus.cpu_affinity([23, 16, 17, 18, 19, 20, 21, 22])
+    # pus.cpu_affinity([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15])
     # print(type(platform.node()))
     # engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx?charset=utf8', poolclass=NullPool)
 
@@ -288,15 +307,16 @@ if __name__ == '__main__':
     cursor.execute("show tables like '%%%s%%' " % fre)
     table_list = [tuple[0] for tuple in cursor.fetchall()]
     # print(table_list)
-    table_list = table_list[0:500]
+    # table_list = table_list[0:500]
     print(f'计算个股数为:{len(table_list)}')
 
     list_date = mp.Manager().list()
     thread_list = []
-    pool = mp.Pool(processes=mp.cpu_count())
-    for num in range(60, 70, 20):
-        for Volatility in range(5, 6, 1):
-            for rate in range(3, 11, 1):
+    # pool = mp.Pool(processes=mp.cpu_count())
+    pool = mp.Pool(processes=8)
+    for num in range(20, 200, 20):
+        for Volatility in range(3, 11, 1):
+            for rate in range(1, 2, 1):
                 # step = math.ceil(len(table_list) / mp.cpu_count())
                 result = []
                 result_change = []

+ 346 - 0
backtrader/230505_bt.py

@@ -0,0 +1,346 @@
+import os
+import traceback
+import numpy as np
+from sqlalchemy import create_engine
+import pandas as pd
+import pymysql
+import backtrader as bt
+import backtrader.indicators as btind
+import datetime
+import math
+from datetime import datetime as dt
+import multiprocessing as mp
+from backtrader.feeds import PandasData
+import platform
+import psutil
+
+
+# import multiprocessing
+# import matplotlib
+
+
+class MyPandasData(PandasData):
+    lines = ('hl', 'dif', 'dea', 'macd', 'rsi_6', 'rsi_12', 'rsi_24',)
+    params = (('hl', 7),
+              ('dif', 8),
+              ('dea', 9),
+              ('macd', 10),
+              ('rsi_6', 11),
+              ('rsi_12', 12),
+              ('rsi_24', 13),
+              )
+    '''
+    lines = ('change_pct', 'net_amount_main', 'net_pct_main', 'net_amount_xl', 'net_pct_xl', 'net_amount_l', 'net_pct_l'
+             , 'net_amount_m', 'net_pct_m', 'net_amount_s', 'net_pct_s',)
+    params = (('change_pct', 7),
+              ('net_amount_main', 8),
+              ('net_pct_main', 9),
+              ('net_amount_xl', 10),
+              ('net_pct_xl', 11),
+              ('net_amount_l', 12),
+              ('net_pct_l', 13),
+              ('net_amount_m', 14),
+              ('net_pct_m', 15),
+              ('net_amount_s', 16),
+              ('net_pct_s', 17),
+              )
+    '''
+
+
+class TestStrategy(bt.Strategy):
+    params = (
+        ("num", 3),
+        ('Volatility', 0),
+        ('rate', 3),  # 注意要有逗号!!
+    )
+
+    def log(self, txt, dt=None):
+        ''' Logging function for this strategy'''
+        dt = dt or self.datas[0].datetime.date(0)
+        # print('%s, %s' % (dt.isoformat(), txt))
+
+    def __init__(self):
+        # self.num = num
+        # self.Volatility = Volatility/100
+        # Keep a reference to the "close" line in the data[0] dataseries
+        self.pos_price = 0
+        self.dataclose = self.datas[0].close
+        self.dataopen = self.datas[0].open
+        self.high = self.datas[0].high
+        self.low = self.datas[0].low
+        self.volume = self.datas[0].volume
+        self.hl = self.datas[0].hl
+        self.dif = self.datas[0].dif
+        self.dea = self.datas[0].dea
+        self.macd = self.datas[0].macd
+        self.rsi_6 = self.datas[0].rsi_6
+        self.rsi_12 = self.datas[0].rsi_12
+        self.rsi_24 = self.datas[0].rsi_24
+        # self.change_pct = self.datas[0].change_pct
+        # self.net_amount_main = self.datas[0].net_amount_main
+        # self.net_pct_main = self.datas[0].net_pct_main
+        # self.net_amount_xl = self.datas[0].net_amount_xl
+        # self.net_pct_xl = self.datas[0].net_pct_xl
+        # self.net_amount_l = self.datas[0].net_amount_l
+        # self.net_pct_l = self.datas[0].net_pct_l
+        self.sma5 = btind.MovingAverageSimple(self.datas[0].close, period=5)
+        self.sma10 = btind.MovingAverageSimple(self.datas[0].close, period=10)
+        self.sma20 = btind.MovingAverageSimple(self.datas[0].close, period=20)
+        self.sma60 = btind.MovingAverageSimple(self.datas[0].close, period=60)
+
+    def notify_order(self, order):
+        """
+        订单状态处理
+
+        Arguments:
+            order {object} -- 订单状态
+        """
+        if order.status in [order.Submitted, order.Accepted]:
+            # 如订单已被处理,则不用做任何事情
+            return
+
+        # 检查订单是否完成
+        if order.status in [order.Completed]:
+            if order.isbuy():
+                self.buyprice = order.executed.price
+                self.buycomm = order.executed.comm
+            self.bar_executed = len(self)
+
+        # 订单因为缺少资金之类的原因被拒绝执行
+        elif order.status in [order.Canceled, order.Margin, order.Rejected]:
+            pass
+            # self.log('Order Canceled/Margin/Rejected')
+
+        # 订单状态处理完成,设为空
+        self.order = None
+
+    def notify_trade(self, trade):
+        """
+        交易成果
+
+        Arguments:
+            trade {object} -- 交易状态
+        """
+        if not trade.isclosed:
+            return
+
+        # 显示交易的毛利率和净利润
+        # self.log('OPERATION PROFIT, GROSS %.2f, NET %.2f' % (trade.pnl, trade.pnlcomm))
+
+    def next(self):
+        # print(self.num,self.Volatility)
+        # Simply log the closing price of the series from the reference
+        # self.sma20[-2] < self.sma20[-1] < self.sma20[0] and self.sma10[-2] < self.sma10[-1] < self.sma10[0]
+        # and (self.sma5[-1] < self.sma10[-1])
+        # and (self.net_pct_l[0] > 10) and (self.net_pct_xl[0] > 3)  \
+        # and (self.net_amount_main[-1] > 0) and (self.net_amount_main[0] > 0)
+
+        if len(self) > self.params.num:
+            vola = self.params.Volatility / 100
+            rate = self.params.rate / 100
+            lowest = np.min(self.low.get(size=self.params.num))
+            highest = np.max(self.high.get(size=self.params.num))
+
+            # > self.sma5[-1]
+            # and (((lowest * (1 - vola)) < self.low[-2] < (lowest * (1 + vola))) or (
+            #         (lowest * (1 - vola)) < self.low[-1] < (lowest * (1 + vola)))) \
+            if self.hl[-1] == 2 or self.hl[-1] == 1:
+                m = -2
+                self.order = self.buy()
+                self.pos_price = self.low[-1]
+
+                while True:
+                    if (self.hl[m] == 2 or self.hl[m] == 1) and self.macd[m] > self.macd[-1] \
+                            and self.dataclose[0] > self.sma5[0] \
+                            and self.dataclose[-1] > self.dataopen[-1] \
+                            and (self.sma10[-2] - self.sma5[-2]) < (self.sma10[-1] - self.sma5[-1]) \
+                            and self.low[-2] < self.sma5[-2] * (1 - rate) \
+                            and self.sma5[-1] < self.sma10[-1] < self.sma20[-1] < self.sma20[-2] < self.sma20[-3]\
+                            and lowest*(1-vola) < self.low[-1] < lowest * (1 + vola):
+                        self.order = self.buy()
+                        self.pos_price = self.low[-1]
+                        break
+                    m -= 1
+                    if m + len(self) == 2:
+                        break
+
+            # elif (self.hl[0] == 5 or self.dataclose[0] < self.sma5[0]):
+            elif self.dataclose[0] < self.sma5[0] or self.sma5[0] < self.sma5[-1] \
+                    or self.dataclose[0] < self.pos_price or self.high[0] > self.sma5[0] * (1 + vola):
+                self.order = self.close()
+                self.pos_price = 0
+
+    def stop(self):
+        # pass
+        self.log(u'(MA趋势交易效果) Ending Value %.2f' % (self.broker.getvalue()))
+
+
+def err_call_back(err):
+    print(f'出错啦~ error:{str(err)}')
+    traceback.format_exc(err)
+
+
+def to_df(lt):
+    print('开始存数据')
+    df = pd.DataFrame(list(lt),
+                      columns=['周期', '波动率', 'MA5斜率', '盈利个数', '盈利比例', '总盈利', '平均盈利', '最大盈利',
+                               '最小盈利', '总亏损', '平均亏损', '最大亏损', '最小亏损', '盈亏对比'])
+    df.sort_values(by=['周期', '波动率', 'MA5斜率'], ascending=True, inplace=True)
+    df = df.reset_index(drop=True)
+    if platform.node() == 'DanieldeMBP.lan':
+        df.to_csv(f"/Users/daniel/Documents/策略/策略穷举{dt.now().strftime('%Y%m%d%H%m%S')}.csv", index=True,
+                  encoding='utf_8_sig', mode='w')
+    else:
+        df.to_csv(f"C:\Daniel\策略\策略穷举{dt.now().strftime('%Y%m%d%H%m%S')}.csv", index=True, encoding='utf_8_sig',
+                  mode='w')
+    print(f'结果:{df}')
+
+
+def backtrader(list_date, table_list, result, result_change, result_change_fall, num, Volatility, rate, err_list):
+    print(f'{num}天波动率为{Volatility}%MA5斜率为{rate}', 'myPID is ', os.getpid())
+    sttime = dt.now()
+    engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8')
+    for stock in table_list:
+        # print(stock)
+        stk_df = pd.read_sql_table(stock, engine)
+        stk_df.time = pd.to_datetime(stk_df.time)
+        # stk_df = stk_df[stk_df['HL'] != '-']
+        try:
+            stk_df['HL'] = stk_df['HL'].map({'L': 1,
+                                             'LL': 2,
+                                             'L*': 3,
+                                             'H': 4,
+                                             'HH': 5,
+                                             'H*': 6,
+                                             '-': 7})
+        except BaseException:
+            print(f'{stock}数据不全,不做测试')
+        else:
+            if len(stk_df) > 60:
+                cerebro = bt.Cerebro()
+                cerebro.addstrategy(TestStrategy, num=num, Volatility=Volatility, rate=rate)
+                cerebro.addsizer(bt.sizers.FixedSize, stake=10000)
+                data = MyPandasData(dataname=stk_df,
+                                    fromdate=datetime.datetime(2017, 1, 1),
+                                    todate=datetime.datetime(2022, 10, 30),
+                                    datetime='time',
+                                    open='open_back',
+                                    close='close_back',
+                                    high='high_back',
+                                    low='low_back',
+                                    volume='volume_back',
+                                    hl='HL',
+                                    dif='dif',
+                                    dea='dea',
+                                    macd='macd',
+                                    rsi_6='rsi_6',
+                                    rsi_12='rsi_12',
+                                    rsi_24='rsi_24',
+                                    )
+                # print('取值完成')
+                cerebro.adddata(data, name=stock)
+                cerebro.broker.setcash(100000.0)
+                cerebro.broker.setcommission(0.005)
+                cerebro.addanalyzer(bt.analyzers.PyFolio)
+                # 策略执行前的资金
+                # print('启动资金: %.2f' % cerebro.broker.getvalue())
+                try:
+                    # 策略执行
+                    cerebro.run()
+                except IndexError as e:
+                    err_list.append(stock)
+                    # print(f'{num}天波动率为{Volatility}%MA5斜率为{rate}的{stock}错误')
+                    # print(e)
+                else:
+                    if cerebro.broker.getvalue() > 100000.0:
+                        result_change.append(cerebro.broker.getvalue() - 100000)
+                        result.append(stock)
+                        # print('recode!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!')
+                        # print(result)
+                    else:
+                        result_change_fall.append(cerebro.broker.getvalue() - 100000)
+                        # print('aaaaaaaaaaa')
+                        # print(result_change_fall)
+    print(f'计算总数={len(result) + len(result_change_fall)}')
+    if len(result) * len(result_change) * len(result_change_fall) != 0:
+        print(f'以{num}内最低值波动{Volatility}为支撑、MA5斜率为{rate}%,结果状态为:')
+        print('正盈利的个股为:', len(result), '成功率为:', len(result) / len(table_list))
+        print(
+            f'总盈利:{np.sum(result_change)} 平均盈利:{np.mean(result_change) / len(result)},最大盈利:{np.max(result_change)}, 最小盈利:{np.min(result_change)}')
+        print(
+            f'总亏损:{np.sum(result_change_fall)},平均亏损:{np.mean(result_change_fall) / len(result_change_fall)},最大亏损:{np.min(result_change_fall)} 最小亏损:{np.max(result_change_fall)}')
+
+        # '周期', '波动率', 'MA5斜率', '盈利个数', '盈利比例', '总盈利', '平均盈利', '最大盈利', '最小盈利', '总亏损', '平均亏损', '最大亏损', '最小亏损', '盈亏对比']
+        list_date.append([num, Volatility, rate, len(result), len(result) / len(table_list), np.nansum(result_change),
+                          np.nanmean(result_change), np.nanmax(result_change), np.min(result_change),
+                          np.nansum(result_change_fall), np.nanmean(result_change_fall),
+                          np.nanmin(result_change_fall), np.nanmax(result_change_fall),
+                          len(result_change) / len(result_change_fall)])
+        to_df(list_date)
+        endtime = dt.now()
+        print(f'{num}天波动率为{Volatility}%MA5斜率为{rate},myPID is {os.getpid()}.本轮耗时为{endtime - sttime}')
+    else:
+        print('阿欧', len(result), len(result_change), len(result_change_fall), num, Volatility, rate, err_list)
+        list_date.append([num, Volatility, rate, 0, len(result) / len(table_list), len(result),
+                          len(result), len(result), len(result), len(result), len(result), len(result), 0])
+    to_df(list_date)
+    # cerebro.plot()
+
+
+# df = pd.DataFrame(
+#     columns=['周期', '波动率', 'MA5斜率', '盈利个数', '盈利比例', '总盈利', '平均盈利', '最大盈利', '最小盈利', '总亏损',
+#              '平均亏损', '最大亏损', '最小亏损'])
+if __name__ == '__main__':
+    starttime = dt.now()
+    print(starttime)
+    pus = psutil.Process()
+    # pus.cpu_affinity([0, 1, 2, 3, 4, 5, 6, 7])
+    # pus.cpu_affinity([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15])
+    # print(type(platform.node()))
+    # engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx?charset=utf8', poolclass=NullPool)
+
+    # stocks = pd.read_sql_query(
+    #                     'select value from MA5_1d', engine_hlfx)
+
+    fre = '1d'
+    db = pymysql.connect(host='localhost',
+                         user='root',
+                         port=3307,
+                         password='r6kEwqWU9!v3',
+                         database='qmt_stocks_tech')
+    cursor = db.cursor()
+    cursor.execute("show tables like '%%%s%%' " % fre)
+    table_list = [tuple[0] for tuple in cursor.fetchall()]
+    # print(table_list)
+    # table_list = table_list[0:500]
+    print(f'计算个股数为:{len(table_list)}')
+
+    list_date = mp.Manager().list()
+    thread_list = []
+    # pool = mp.Pool(processes=mp.cpu_count())
+    pool = mp.Pool(processes=24)
+    for num in range(20, 200, 20):
+        for Volatility in range(3, 13, 1):
+            for rate in range(3, 13, 1):
+                # step = math.ceil(len(table_list) / mp.cpu_count())
+                result = []
+                result_change = []
+                result_change_fall = []
+                err_list = []
+                print(f'{num}天波动率为{Volatility}%MA5斜率为{rate}')
+                # for i in range(0, len(table_list), step):
+                stattime = dt.now()
+                # thd = threading.local()
+                # print(i)
+                # p = mp.Process(target=backtrader, args=(df, table_list, result, result_change, result_change_fall,
+                #                                         num, Volatility, rate, err_list))
+                # thread_list.append(p)
+                pool.apply_async(func=backtrader,
+                                 args=(list_date, table_list, result, result_change, result_change_fall,
+                                       num, Volatility, rate, err_list,), error_callback=err_call_back)
+    pool.close()
+    pool.join()
+
+    edtime = dt.now()
+    print('总耗时:', edtime - starttime)
+    # df.to_csv(r'C:\Users\Daniel\Documents\策略穷举2.csv', index=True)

+ 13 - 0
docker.start.txt

@@ -6,3 +6,16 @@ docker run -itd --name mysql82 -p 3312:3306 -e MYSQL_ROOT_PASSWORD=r6kEwqWU9!v3
 mysqldump -uroot -pr6kEwqWU9!v3 -P3307 --databases qmt_stocks_whole >d:/qmt_stocks_whole.sql
 
 # version order 1
+
+
+docker run -itd --name mysql8 -p 3307:3306 -e MYSQL_ROOT_PASSWORD=r6kEwqWU9!v3 -v C:/docker_mysql/stock_data:/var/lib/mysql -v C:/docker_mysql/config/mysql.cnf:/etc/my.cnf daocloud.io/library/mysql:8.0.21
+
+docker run -itd --name mysql8033 -p 3308:3306 -e MYSQL_ROOT_PASSWORD=r6kEwqWU9!v3 -v C:/docker_mysql/0833:/var/lib/mysql -v C:/docker_mysql/conf:/etc/mysql mysql:8.0.33
+
+docker run -itd --name mysql8033 -p 3308:3306 -e MYSQL_ROOT_PASSWORD=r6kEwqWU9!v3 -v C:/docker_mysql/mysql.cnf:/etc/my.cnf mysql:8.0.33-debian
+
+docker run  -itd  -p 3307:3306 --name mysql8033 -e character-set-server=utf8mb4 --privileged=true  --restart unless-stopped -v C:/docker_mysql/stock_data:/var/lib/mysql  -e MYSQL_ROOT_PASSWORD=r6kEwqWU9!v3  -d mysql:8.0.33 --skip-log-bin --disable-log-bin --lower_case_table_names=1
+
+
+docker run  -itd  -p 3307:3306 --name mysql8033 -e character-set-server=utf8mb4 --privileged=true  --restart unless-stopped -v C:/docker_mysql/stock_data:/var/lib/mysql  -e MYSQL_ROOT_PASSWORD=r6kEwqWU9!v3  -d mysql:8.0.33 --lower_case_table_names=1 --skip-log-bin --disable-log-bin
+

+ 9 - 0
ttt.py

@@ -0,0 +1,9 @@
+m = 0
+def mm(m):
+    while True:
+        print('m=', m)
+        m = m + 1
+        if m == 10:
+            break
+    return
+print(mm(m))