Преглед на файлове

修改多进程逻辑,改为map 5000个自动补齐

增加sell_real_time 将买|卖拆分
Daniel преди 1 година
родител
ревизия
570a387d44
променени са 7 файла, в които са добавени 1002 реда и са изтрити 63 реда
  1. 12 6
      QMT/230504_real_time.py
  2. 219 0
      QMT/230715_get_indicators.py
  3. 81 52
      QMT/download_data_whole.py
  4. 9 4
      QMT/qmt_get_indicators.py
  5. 1 1
      QMT/qmt_real_hlfx.py
  6. 300 0
      QMT/sell_real_time.py
  7. 380 0
      backtrader/230723 _bt.py

+ 12 - 6
QMT/230504_real_time.py

@@ -193,15 +193,19 @@ def sell_trader(data):
         f'目前持仓总数为:{len([positions[x].stock_code for x in range(0, len(positions)) if positions[x].volume != 0])}')
 
     for stock, can_use_volume in positions_dict.items():
-        if stock in data and can_use_volume != 0:
+        # if stock in data and can_use_volume != 0:
+        if stock in data:
             current_price = data[stock]['lastPrice']
             open_price = data[stock]['open']
             MA5 = ma(stock, 5, data)
             MA5_1 = ma_1(stock, 5)
+            print(
+                f"{data[stock]['time']}, {stock}\n当前时间为:{dt.now().strftime('%Y-%m-%d %H:%M:%S')},"
+                f"信号时间:{dt.fromtimestamp((data[stock]['time']) / 1000.0)}\n"
+                f"持仓量为{can_use_volume}当前价:{current_price},开盘价:{open_price},"f"MA5:{MA5},昨日MA5:{MA5_1},开始判断:")
             df = pd.read_sql_query(text(
                 'select close_front, high_front from `%s_1d`' % stock), engine_stock.connect())
-            print(f"{data[stock]['time']}, {stock},持仓量为{can_use_volume}当前价:{current_price},开盘价:{open_price},"
-                  f"MA5:{MA5},昨日MA5:{MA5_1},开始判断:")
+
             if current_price == xtdata.get_instrument_detail(stock).get('UpStopPrice') \
                     or (df['close_front'].iloc[-1] == df['high_front'].iloc[-1]
                         and df['close_front'].iloc[-1] / df['close_front'].iloc[-2] > 1.08):
@@ -381,8 +385,10 @@ def bridge():
     positions_dict = {positions[x].stock_code: positions[x].can_use_volume for x in range(0, len(positions))}
     print(f'今日可卖出个股总数:{len([value for value in positions_dict.values() if value != 0])}')
     stocks = xtdata.get_stock_list_in_sector('沪深A股')
-    seq = xtdata.subscribe_whole_quote(stocks, callback=trader)
-    run(seq, pid)
+    # seq_s = xtdata.subscribe_whole_quote(stocks, callback=sell_trader)
+    seq_b = xtdata.subscribe_whole_quote(stocks, callback=buy_trader)
+    # run(seq_s, pid)
+    run(seq_b, pid)
 
 
 def job_func():
@@ -402,7 +408,7 @@ if __name__ == '__main__':
     pus = psutil.Process()
     pus.cpu_affinity([12, 13, 14, 15])
 
-    job_func()
+    # job_func()
 
     scheduler = BlockingScheduler()
     scheduler.add_job(func=job_func, trigger='cron', day_of_week='0-4', hour='09', minute='40',

+ 219 - 0
QMT/230715_get_indicators.py

@@ -0,0 +1,219 @@
+# coding:utf-8
+from datetime import datetime as dt
+import socket
+import pandas as pd
+import numpy as np
+from sqlalchemy import create_engine, text
+from jqdatasdk import *
+import pymysql
+import multiprocessing as mp
+from multiprocessing import freeze_support
+import math
+import talib as ta
+import os
+import traceback
+import random
+import logging
+from myindicator import myind
+import psutil
+from apscheduler.schedulers.blocking import BlockingScheduler
+
+# 显示最大行与列
+pd.set_option('display.max_rows', None)
+pd.set_option('display.max_columns', None)
+
+# 设置日志
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+
+# 创建连接池
+engine = create_engine(
+    'mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8')
+engine_tech = create_engine(
+    'mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8')
+engine_tech2 = create_engine(
+    'mysql+pymysql://root:r6kEwqWU9!v3@localhost:3308/qmt_stocks_tech?charset=utf8')
+
+
+def err_call_back(err):
+    logging.error(f'进程池出错~ error:{str(err)}')
+    traceback.print_exc()
+
+
+def tech_anal(stock, fre, hlfx_pool, hlfx_pool_daily, err_list):
+    t_signals = 0
+    global engine
+    global engine_tech
+    global engine_tech2
+
+    try:
+        con_engine = engine.connect()
+        con_engine_tech = engine_tech.connect()
+        con_engine_tech2 = engine_tech2.connect()
+        try:
+            table_name = f'{stock}_{fre}'
+            # 从engine中读取table_name表存入df
+            df = pd.read_sql_table(table_name, con=engine)
+            df.dropna(axis=0, how='any')
+        except BaseException as e:
+            print(f"{stock}读取有问题")
+            traceback.print_exc()
+            err_list.append(stock)
+        else:
+            if len(df) != 0:
+                # 计算技术指标
+                try:
+                    myind.get_macd_data(df)
+                    df_temp, t_signals = myind.get_hlfx(df)
+                    myind.get_ris(df)
+                    myind.get_bias(df)
+                    myind.get_wilr(df)
+                    df = pd.merge(df, df_temp, on='time', how='left')
+                    df['HL'].fillna(value='-', inplace=True)
+                    df = df.reset_index(drop=True)
+                except BaseException as e:
+                    print(f'{stock}计算有问题', e)
+                else:
+                    try:
+                        df = df.replace([np.inf, -np.inf], np.nan)
+                        df.to_sql('%s_1d' % stock, con=engine_tech, index=False, if_exists='replace')
+                        df.to_sql('%s_1d' % stock, con=engine_tech2, index=False, if_exists='replace')
+                    except BaseException as e:
+                        print(f'{stock}存储有问题', e)
+                        traceback.print_exc()
+                        err_list.append(stock)
+            else:
+                err_list.append(stock)
+                print(f'{stock}数据为空')
+        finally:
+            if stock in hlfx_pool and t_signals == 2:
+                hlfx_pool.remove(stock)
+            elif stock not in hlfx_pool and t_signals == 1:
+                hlfx_pool.append(stock)
+                hlfx_pool_daily.append(stock)
+            con_engine.close()
+            con_engine_tech.close()
+            con_engine_tech2.close()
+            # print(f"{stock}, {T_signals}, '\n', {df_temp.head(20)}")
+            # print(f'{stock}计算完成!')
+
+    except Exception as e:
+        logging.error(f'子进程{os.getpid()}问题在这里~~ error:{str(e)}')
+        traceback.print_exc()
+
+    engine.dispose()
+    engine_tech.dispose()
+    engine_tech2.dispose()
+
+
+# 分割列表
+def split_list(lst, num_parts):
+    avg = len(lst) // num_parts
+    rem = len(lst) % num_parts
+
+    partitions = []
+    start = 0
+    for i in range(num_parts):
+        end = start + avg + (1 if i < rem else 0)
+        partitions.append(lst[start:end])
+        start = end
+
+    return partitions
+
+
+# 多进程实现技术指标计算
+def ind():
+    # 记录开始时间
+    start_time = dt.now()
+    fre = '1d'
+    num_cpus = mp.cpu_count()
+    print(f"{socket.gethostname()}共有{num_cpus}个核心\n{start_time.strftime('%Y-%m-%d %H:%M:%S')}开始计算{fre}技术指标")
+    # 连接数据库 获取股票列表
+    conn_engine_hlfx_pool = create_engine(
+        'mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8')
+    con_engine_hlfx_pool = conn_engine_hlfx_pool.connect()
+
+    # stocks = xtdata.get_stock_list_in_sector('沪深A股')
+    stocks = pd.read_sql_query(
+        text("select securities from %s" % 'stocks_list'), con=con_engine_hlfx_pool).iloc[-1, 0].split(",")
+    con_engine_hlfx_pool.close()
+    print(f'股票列表长度为{len(stocks)}')
+    err_list, hlfx_pool, hlfx_pool_daily = mp.Manager().list(), mp.Manager().list(), mp.Manager().list()  # 定义共享列表
+
+    # 多进程执行tech_anal方法
+    pool = mp.Pool(processes=num_cpus)
+    # 保存AsyncResult对象的列表
+    async_results = []
+    for stock in stocks:
+        async_result = pool.apply_async(tech_anal, args=(stock, fre, hlfx_pool, hlfx_pool_daily, err_list,),
+                                        error_callback=err_call_back)
+        async_results.append(async_result)
+    pool.close()
+    pool.join()
+
+    # 统计返回为 None 的结果数量
+    none_count = 0
+    for i, result_async in enumerate(async_results):
+        result = result_async.get()  # 获取任务的结果
+        # print(f"The result of task {i} is: {result}")
+        if result is None:
+            none_count += 1
+
+    print(
+        f"共计算{none_count}/{i+1},\n当日信号:{len(hlfx_pool_daily)},\n持续检测为:{len(hlfx_pool)}, \n错误列表:{err_list}")
+    '''
+    # 将list转换为字符串
+    results_list = ','.join(set(hlfx_pool))
+    results_list_daily = ','.join(set(hlfx_pool_daily))
+
+    # 建立数据库连接
+
+    db_pool = pymysql.connect(host='localhost',
+                              user='root',
+                              port=3307,
+                              password='r6kEwqWU9!v3',
+                              database='hlfx_pool')
+
+    db_pool2 = pymysql.connect(host='localhost',
+                               user='root',
+                               port=3308,
+                               password='r6kEwqWU9!v3',
+                               database='hlfx_pool')
+
+    # 将list插入数据库
+    cursor = db_pool.cursor()
+    cursor2 = db_pool2.cursor()
+    sql = "INSERT INTO %s (date,value) VALUES('%s','%s')" % (fre, dt.now().strftime('%Y-%m-%d %H:%M:%S'), results_list)
+    sql2 = "INSERT INTO daily_%s (date,value) VALUES('%s','%s')" % (fre, dt.now().strftime('%Y-%m-%d %H:%M:%S'),
+                                                                    results_list_daily)
+    try:
+        cursor.execute(sql)
+        cursor.execute(sql2)
+        cursor2.execute(sql)
+        cursor2.execute(sql2)
+        db_pool.commit()
+        db_pool2.commit()
+    except Exception as e:
+        print(f'1d存入有问题', e)
+        # db_pool.rollback()
+    finally:
+        cursor.close()
+        db_pool.close()
+        cursor2.close()
+        db_pool2.close()
+    '''
+
+    # 记录结束时间
+    end_time = dt.now()
+    print(f"运行时间:{end_time - start_time}")
+
+
+if __name__ == '__main__':
+    logger = mp.log_to_stderr()
+    # logger.setLevel(logging.DEBUG)
+    freeze_support()
+    # 创建一个0-23的列表,用于设置cpu亲和度
+    cpu_list = list(range(24))
+    pus = psutil.Process()
+    pus.cpu_affinity(cpu_list)
+
+    ind()

+ 81 - 52
QMT/download_data_whole.py

@@ -4,20 +4,24 @@ import pandas as pd
 import math
 from sqlalchemy import create_engine, text
 import multiprocessing as mp
+from multiprocessing import freeze_support
 import os
 from apscheduler.schedulers.blocking import BlockingScheduler
 import traceback
 import psutil
 import pymysql
 
+pd.set_option('display.max_columns', None)  # 设置显示最大行
 
-pd.set_option('display.max_columns', None) # 设置显示最大行
-
-
-path = 'C:\\qmt\\userdata_mini'
+# path = 'C:\\qmt\\userdata_mini'
+path = '\\DANIEL-NUC\\qmt\\userdata_mini'
 
 field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']
-cpu_count = mp.cpu_count()
+# 创建共享计数器
+count = mp.Value('i', 0)
+
+eng_w = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8',)
+eng_w2 = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3308/qmt_stocks_whole?charset=utf8',)
 
 
 def err_call_back(err):
@@ -25,77 +29,102 @@ def err_call_back(err):
     traceback.print_exc()
 
 
-def to_sql(stock_list):
-    print(f'{dt.now()}开始循环入库! MyPid is {os.getpid()}')
-    m = 0
-    for stock in stock_list:
-        eng_w = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3308/qmt_stocks_whole?charset=utf8',
-                              pool_recycle=3600, pool_pre_ping=True, pool_size=1)
-        # 后复权数据
-        data_back = xtdata.get_market_data(field, [stock], '1d', end_time='', count=-1, dividend_type='back')
-        df_back = pd.concat([data_back[i].loc[stock].T for i in ['time', 'open', 'high', 'low', 'close', 'volume',
-                                                                 'amount']], axis=1)
-        df_back.columns = ['time', 'open_back', 'high_back', 'low_back', 'close_back', 'volume_back', 'amount_back']
-        df_back['time'] = df_back['time'].apply(lambda x: dt.fromtimestamp(x / 1000.0))
-        df_back.reset_index(drop=True, inplace=True)
-
-        # 前复权数据
-        data_front = xtdata.get_market_data(field, [stock], '1d', end_time='', count=-1, dividend_type='front')
-        df_front = pd.concat([data_front[i].loc[stock].T for i in ['time', 'open', 'high', 'low', 'close', 'volume',
-                                                                   'amount']], axis=1)
-        df_front.columns = ['time', 'open_front', 'high_front', 'low_front', 'close_front', 'volume_front',
-                            'amount_front']
-        df_front['time'] = df_front['time'].apply(lambda x: dt.fromtimestamp(x / 1000.0))
-        df = pd.merge_asof(df_back, df_front, 'time')
-        # print(df)
-        try:
-            # eng_w.connect().execute(text("truncate table `%s_1d`" % stock))
-            df.to_sql('%s_1d' % stock, con=eng_w, index=False, if_exists='replace', chunksize=20000)
-        except BaseException as e:
-            print(stock, e)
-            pass
-        else:
-            m += 1
-
+def to_sql(stock):
+    global eng_w, eng_w2
+
+    # 后复权数据
+    data_back = xtdata.get_market_data(field, [stock], '1d', end_time='', count=-1, dividend_type='back')
+    df_back = pd.concat([data_back[i].loc[stock].T for i in ['time', 'open', 'high', 'low', 'close', 'volume',
+                                                             'amount']], axis=1)
+    df_back.columns = ['time', 'open_back', 'high_back', 'low_back', 'close_back', 'volume_back', 'amount_back']
+    df_back['time'] = df_back['time'].apply(lambda x: dt.fromtimestamp(x / 1000.0))
+    df_back.reset_index(drop=True, inplace=True)
+
+    # 前复权数据
+    data_front = xtdata.get_market_data(field, [stock], '1d', end_time='', count=-1, dividend_type='front')
+    df_front = pd.concat([data_front[i].loc[stock].T for i in ['time', 'open', 'high', 'low', 'close', 'volume',
+                                                               'amount']], axis=1)
+    df_front.columns = ['time', 'open_front', 'high_front', 'low_front', 'close_front', 'volume_front',
+                        'amount_front']
+    df_front['time'] = df_front['time'].apply(lambda x: dt.fromtimestamp(x / 1000.0))
+    df = pd.merge_asof(df_back, df_front, 'time')
+    # print(df)
+    try:
+        # eng_w.connect().execute(text("truncate table `%s_1d`" % stock))
+        df.to_sql('%s_1d' % stock, con=eng_w, index=False, if_exists='replace', chunksize=20000)
+        df.to_sql('%s_1d' % stock, con=eng_w2, index=False, if_exists='replace', chunksize=20000)
+        with count.get_lock():
+            count.value += 1
+    except BaseException as e:
+        print(stock, e)
+        pass
+    finally:
         eng_w.dispose()
-    print(f'Pid:{os.getpid()}已经完工了.应入库{len(stock_list)},共入库{m}支个股')
+        eng_w2.dispose()
 
 
 def download_data():
+    global count
     stock_list = xtdata.get_stock_list_in_sector('沪深A股')
-    stock_list.sort()
+    '''
+    # 连接数据库 获取股票列表
+    conn_engine_hlfx_pool = create_engine(
+        'mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8')
+
+    con_engine_hlfx_pool = conn_engine_hlfx_pool.connect()
+
+    stock_list = pd.read_sql_query(
+        text("select securities from %s" % 'stocks_list'), con=con_engine_hlfx_pool).iloc[-1, 0].split(",")
+    '''
+
     results_list = ','.join(set(stock_list))
-    print(type(results_list))
+    print(f'今日个股列表为{len(stock_list)}')
     db_pool = pymysql.connect(host='localhost',
                               user='root',
-                              port=3308,
+                              port=3307,
                               password='r6kEwqWU9!v3',
                               database='hlfx_pool')
+    db_pool2 = pymysql.connect(host='localhost',
+                               user='root',
+                               port=3308,
+                               password='r6kEwqWU9!v3',
+                               database='hlfx_pool')
     cursor_pool = db_pool.cursor()
-    sql = "INSERT INTO %s (date,securities) VALUES('%s','%s')" % ('stocks_list', dt.now().strftime('%Y-%m-%d %H:%M:%S'), results_list)
+    cursor_pool2 = db_pool2.cursor()
+    sql = "INSERT INTO %s (date,securities) VALUES('%s','%s')" % (
+        'stocks_list', dt.now().strftime('%Y-%m-%d %H:%M:%S'), results_list)
     cursor_pool.execute(sql)
+    cursor_pool2.execute(sql)
     db_pool.commit()
+    db_pool2.commit()
+
+    print(dt.now().strftime('%Y-%m-%d %H:%M:%S'), '开始下载!')
+    xtdata.download_history_data2(stock_list=stock_list, period='1d', start_time='', end_time='')
+    print(dt.now().strftime('%Y-%m-%d %H:%M:%S'), '下载完成,准备入库!')
 
-    print(dt.now(), '开始下载!')
-    # xtdata.download_history_data2(stock_list=stock_list, period='1d', start_time='', end_time='')
-    print(dt.now(), '下载完成,准备入库!')
-    step = math.ceil(len(stock_list) / mp.cpu_count())
+    async_results = []
     pool = mp.Pool(processes=mp.cpu_count())
-    # pool = mp.Pool(processes=8)
-    # step = math.ceil(len(stock_list) / 8)
-    for i in range(0, len(stock_list), step):
-        pool.apply_async(func=to_sql, args=(stock_list[i:i+step],), error_callback=err_call_back)
+    for stock in stock_list:
+        async_result = pool.apply_async(func=to_sql, args=(stock, ), error_callback=err_call_back)
+        async_results.append(async_result)
     pool.close()
     pool.join()
 
-    print(f'今日数据下载完毕 {dt.now()}')
+    # 统计返回为 None 的结果数量
+    none_count = 0
+    for i, result_async in enumerate(async_results):
+        _ = result_async.get()  # 获取任务的结果
+        if _ is None:
+            none_count += 1
+
+    print(f"{dt.now().strftime('%Y-%m-%d %H:%M:%S')}\n今日数据{len(async_results)}下载完毕,入库{none_count}条!")
 
 
 if __name__ == '__main__':
+    freeze_support()
     field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']
     cpu_count = mp.cpu_count()
     pus = psutil.Process()
-    # pus.cpu_affinity([12, 13, 14, 15, 16, 17, 18, 19])
 
     download_data()
 

+ 9 - 4
QMT/qmt_get_indicators.py

@@ -224,15 +224,19 @@ def tech_anal(stocks, hlfx_pool, hlfx_pool_daily, err_list):
     try:
         print(f'{dt.now()}开始循环计算! MyPid is {os.getpid()},父进程是{os.getppid()},池子长度为{len(stocks)}')
         m = 0
+        engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8',
+                               pool_recycle=3600, pool_pre_ping=True, pool_size=1000)
+        engine_tech = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8',
+                                    pool_recycle=3600, pool_pre_ping=True, pool_size=1000)
 
         for stock in stocks:
-            engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8')
-            engine_tech = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8')
+            engine.dispose()
+            engine_tech.dispose()
+
             # print(stock)
             try:
                 df = pd.read_sql_table('%s_1d' % stock, con=engine.connect())
                 df.dropna(axis=0, how='any')
-                engine.dispose()
             except BaseException:
                 print(f'{stock}读取有问题')
                 traceback.print_exc()
@@ -275,6 +279,7 @@ def tech_anal(stocks, hlfx_pool, hlfx_pool_daily, err_list):
                 elif stock not in hlfx_pool and T_signals == 1:
                     hlfx_pool.append(stock)
                     hlfx_pool_daily.append(stock)
+
     except Exception as e:
         logging.exception("子进程异常:", os.getpid(), e)
 
@@ -305,7 +310,7 @@ def ind():
     # mp.log_to_stderr()
 
     sttime = dt.now()
-    num_cpus = mp.cpu_count()
+    num_cpus = int(mp.cpu_count()/2)
     engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8')
 
     # stocks = xtdata.get_stock_list_in_sector('沪深A股')

+ 1 - 1
QMT/qmt_real_hlfx.py

@@ -358,7 +358,7 @@ if __name__ == '__main__':
     print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)
 
     # job_func()
-
+    #
     scheduler = BlockingScheduler()
     scheduler.add_job(func=job_func, trigger='cron', day_of_week='0-4', hour='09', minute='25',
                       timezone="Asia/Shanghai", max_instances=5)

+ 300 - 0
QMT/sell_real_time.py

@@ -0,0 +1,300 @@
+# coding:utf-8
+from datetime import datetime as dt
+import os
+import pandas as pd
+from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
+from xtquant.xttype import StockAccount
+from xtquant import xtdata, xtconstant
+import time
+from sqlalchemy import create_engine, text
+from jqdatasdk import *
+import pymysql
+import multiprocessing as mp
+import math
+import psutil
+import datetime
+from apscheduler.schedulers.blocking import BlockingScheduler
+import sys
+
+# 指定客户端所在路径
+path = r'c:\\qmt\\userdata_mini'
+# 创建资金账号为 800068 的证券账号对象
+acc = StockAccount('920000207040', 'SECURITY')
+# 生成session id 整数类型 同时运行的策略不能重复
+session_id = 123456
+xt_trader = None
+engine_stock = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8',
+                             pool_size=5000, pool_recycle=50, max_overflow=-1)
+
+
+
+class MyXtQuantTraderCallback(XtQuantTraderCallback):
+    def on_disconnected(self):
+        """
+        连接断开
+        :return:
+        """
+        print(datetime.datetime.now(), '连接断开回调')
+
+    def on_stock_order(self, order):
+        """
+        委托回报推送
+        :param order: XtOrder对象
+        :return:
+        """
+        print(datetime.datetime.now(), '委托回调', order.order_remark)
+
+    def on_stock_trade(self, trade):
+        """
+        成交变动推送
+        :param trade: XtTrade对象
+        :return:
+        """
+        print(datetime.datetime.now(), '成交回调', trade.order_remark)
+
+    def on_order_error(self, order_error):
+        """
+        委托失败推送
+        :param order_error:XtOrderError 对象
+        :return:
+        """
+        # print("on order_error callback")
+        # print(order_error.order_id, order_error.error_id, order_error.error_msg)
+        print(f"委托报错回调 {order_error.order_remark} {order_error.error_msg}")
+
+    def on_cancel_error(self, cancel_error):
+        """
+        撤单失败推送
+        :param cancel_error: XtCancelError 对象
+        :return:
+        """
+        print(datetime.datetime.now(), sys._getframe().f_code.co_name)
+
+    def on_order_stock_async_response(self, response):
+        """
+        异步下单回报推送
+        :param response: XtOrderResponse 对象
+        :return:
+        """
+        print(f"异步委托回调 {response.order_remark}")
+
+    def on_cancel_order_stock_async_response(self, response):
+        """
+        :param response: XtCancelOrderResponse 对象
+        :return:
+        """
+        print(datetime.datetime.now(), sys._getframe().f_code.co_name)
+
+    def on_account_status(self, status):
+        """
+        :param response: XtAccountStatus 对象
+        :return:
+        """
+        print(datetime.datetime.now(), sys._getframe().f_code.co_name)
+
+
+def run(seq, pid):
+    mor = datetime.datetime.strptime(
+        str(dt.now().date()) + '11:30', '%Y-%m-%d%H:%M')
+    afternoon = datetime.datetime.strptime(
+        str(dt.now().date()) + '15:00', '%Y-%m-%d%H:%M')
+    mor_1 = datetime.datetime.strptime(
+        str(dt.now().date()) + '12:59', '%Y-%m-%d%H:%M')
+    """阻塞线程接收行情回调"""
+    import time
+    client = xtdata.get_client()
+    while True:
+        time.sleep(3)
+        now_date = dt.now()
+        if not client.is_connected():
+            xtdata.unsubscribe_quote(seq)
+            raise Exception('行情服务连接断开')
+        # if mor < dt.now() < mor_1:
+        #     xtdata.unsubscribe_quote(seq)
+        #     print(f'现在时间:{dt.now()},已休市')
+        #     sys.exit()
+        #     break
+        #     return 0
+        elif dt.now() > afternoon:
+            xtdata.unsubscribe_quote(seq)
+            print(f'现在时间:{dt.now()},已收盘')
+            sys.exit()
+            break
+            # return 0
+    return
+
+
+def get_fundamentals(results):
+    return results
+    pass
+
+
+def ma(stock, num, data):
+    global engine_stock
+    try:
+        i = (num - 1) * -1
+        df = pd.read_sql_query(text(
+            'select close_front from `%s_1d`' % stock), engine_stock.connect())
+    except BaseException as e:
+        print(e)
+        return 9999999
+    else:
+        ma_num = (sum(df['close_front'][i:]) + data[stock]['lastPrice']) / num
+        return ma_num
+
+
+def ma_1(stock, num):
+    global engine_stock
+    i = num * -1
+    try:
+        df = pd.read_sql_query(text(
+            'select close_front from `%s_1d`' % stock), engine_stock.connect())
+    except BaseException as e:
+        print(e)
+        return 9999999
+    else:
+        ma_num_1 = df['close_front'][i:].mean()
+        return ma_num_1
+
+
+def his_vol(stock, num):
+    global engine_stock
+    num = num * -1
+    try:
+        df = pd.read_sql_query(text(
+            'select volume_front from `%s_1d`' % stock), engine_stock.connect())
+    except BaseException:
+        return 9999999
+    else:
+        return df['volume_front'].iloc[num]
+
+
+def ma_judge(data, list_judge, rate, results):
+    # print(f'这个ma_judge的PID为:{os.getpid()},本轮计算:{len(list_judge)}个股')
+    for stock in list_judge:
+        current_price, open_price = data[stock]['lastPrice'], data[stock]['open']
+        MA5, MA10, MA20, MA30, MA60, MA120 = ma(stock, 5, data), ma(stock, 10, data), ma(stock, 20, data), ma(stock, 30,
+                                                                                                              data), \
+            ma(stock, 60, data), ma(stock, 120, data)
+        MA5_1 = ma_1(stock, 5)
+        # print(i, current_price, open_price, MA5, MA10, MA20, MA5_1)
+        # 入交易池标准:阳线\大于MA5\MA5向上\MA20<MA10\离120线有距离
+        if (current_price > open_price) & (current_price > MA5) & (MA5 > MA5_1) & (current_price < MA5 * 1.05) \
+                & (current_price > MA120 or current_price < MA120 * rate):
+            if his_vol(stock, -1) > his_vol(stock, -2):
+                results.append(stock.replace('SH', 'XSHG').replace('SZ', 'XSHE'))
+
+
+def sell_trader(data):
+    # print('卖出函数:', dt.now())
+    positions = xt_trader.query_stock_positions(acc)
+    positions_dict = {positions[x].stock_code: positions[x].can_use_volume for x in range(0, len(positions))}
+    print(
+        f'目前持仓总数为:{len([positions[x].stock_code for x in range(0, len(positions)) if positions[x].volume != 0])}')
+
+    for stock, can_use_volume in positions_dict.items():
+        # if stock in data and can_use_volume != 0:
+        if stock in data:
+            current_price = data[stock]['lastPrice']
+            open_price = data[stock]['open']
+            MA5 = ma(stock, 5, data)
+            MA5_1 = ma_1(stock, 5)
+            print(
+                f"{data[stock]['time']}, {stock}\n当前时间为:{dt.now().strftime('%Y-%m-%d %H:%M:%S')},"
+                f"信号时间:{dt.fromtimestamp((data[stock]['time']) / 1000.0)}\n"
+                f"持仓量为{can_use_volume}当前价:{current_price},开盘价:{open_price},"f"MA5:{MA5},昨日MA5:{MA5_1},开始判断:")
+            df = pd.read_sql_query(text(
+                'select close_front, high_front from `%s_1d`' % stock), engine_stock.connect())
+
+            if current_price == xtdata.get_instrument_detail(stock).get('UpStopPrice') \
+                    or (df['close_front'].iloc[-1] == df['high_front'].iloc[-1]
+                        and df['close_front'].iloc[-1] / df['close_front'].iloc[-2] > 1.08):
+                print(f"{stock}涨停或昨日涨幅超过8%,持股观察!{data[stock]['time']}")
+                continue
+            elif current_price < MA5 or MA5 < MA5_1:
+                print('卖出信号!!!!!!', stock, current_price)
+                order_id = xt_trader.order_stock(acc, stock, xtconstant.STOCK_SELL, can_use_volume,
+                                                 xtconstant.LATEST_PRICE, 0, 'MA5策略', '低于MA5趋势向下')
+                print('价格:', current_price, open_price, MA5, MA5_1, '低于MA5趋势向下')
+                print(order_id, stock, can_use_volume)
+            elif current_price > MA5 * 1.07:
+                print('盈利乖离率超7%!!!!!!', stock, current_price)
+                order_id = xt_trader.order_stock(acc, stock, xtconstant.STOCK_SELL, can_use_volume,
+                                                 xtconstant.LATEST_PRICE, 0, 'MA5策略', '盈利乖离率超7%')
+                print('价格:', current_price, open_price, MA5, MA5_1, '盈利乖离率超7%')
+                print(order_id, stock, can_use_volume)
+        else:
+            # print(f'本轮没有持仓股票信息!')
+            pass
+        engine_stock.dispose()
+
+
+def bridge():
+    global session_id, xt_trader
+    pid = os.getpid()
+    connect_result = -1
+    subscribe_result = -1
+
+    while True:
+        if connect_result != 0 or subscribe_result != 0:
+            session_id = int(time.time())
+            xt_trader = XtQuantTrader(path, session_id)
+            # 创建交易回调类对象,并声明接收回调
+            callback = MyXtQuantTraderCallback()
+            xt_trader.register_callback(callback)
+            # 启动交易线程
+            xt_trader.start()
+            # 建立交易连接,返回0表示连接成功
+            connect_result = xt_trader.connect()
+            print('建立交易连接,返回0表示连接成功', connect_result)
+            # 对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功
+            subscribe_result = xt_trader.subscribe(acc)
+            print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)
+            # 建立交易连接,返回0表示连接成功
+            connect_result = xt_trader.connect()
+            print('建立交易连接,返回0表示连接成功', connect_result)
+            # 对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功
+            subscribe_result = xt_trader.subscribe(acc)
+            print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)
+            time.sleep(3)
+        else:
+            break
+    print(f'MyPid is {os.getpid()}, now is {dt.now()},开盘了,session_id = {session_id}, \n')
+    positions = xt_trader.query_stock_positions(acc)
+    positions_dict = {positions[x].stock_code: positions[x].can_use_volume for x in range(0, len(positions))}
+    print(f'今日可卖出个股总数:{len([value for value in positions_dict.values() if value != 0])}')
+    stocks = xtdata.get_stock_list_in_sector('沪深A股')
+    seq_s = xtdata.subscribe_whole_quote(stocks, callback=sell_trader)
+    # seq_b = xtdata.subscribe_whole_quote(stocks, callback=buy_trader)
+    run(seq_s, pid)
+    # run(seq_b, pid)
+
+
+def job_func():
+    print(f"Job started at {dt.now()}")
+    # 创建子进程
+    p = mp.Process(target=bridge)
+    # 启动子进程
+    p.start()
+    # 等待子进程结束
+    p.join()
+    print(f"Job finished at {dt.now()}")
+
+
+if __name__ == '__main__':
+    mp.freeze_support()
+    # print('cpu_count =', mp.cpu_count())
+    pus = psutil.Process()
+    pus.cpu_affinity([16, 17, 18, 19])
+
+    # job_func()
+
+    scheduler = BlockingScheduler()
+    scheduler.add_job(func=job_func, trigger='cron', day_of_week='0-4', hour='09', minute='30',
+                      timezone="Asia/Shanghai", max_instances=5)
+    # scheduler.add_job(func=job_func, trigger='cron', day_of_week='0-4', hour='12', minute='35',
+    #                   timezone="Asia/Shanghai")
+    try:
+        scheduler.start()
+    except (KeyboardInterrupt, SystemExit):
+        pass

+ 380 - 0
backtrader/230723 _bt.py

@@ -0,0 +1,380 @@
+import os
+import traceback
+import numpy as np
+from sqlalchemy import create_engine
+import pandas as pd
+import pymysql
+import backtrader as bt
+import backtrader.indicators as btind
+import datetime
+import math
+from datetime import datetime as dt
+import multiprocessing as mp
+from multiprocessing import Pool, Lock, Value
+from backtrader.feeds import PandasData
+import platform
+import psutil
+import logging
+
+
+
+engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8',
+                       max_overflow=-1)
+lock = Lock()
+counter = Value('i', 0)
+
+class MyPandasData(PandasData):
+    lines = ('hl', 'dif', 'dea', 'macd', 'rsi_6', 'rsi_12', 'rsi_24',)
+    params = (('hl', 7),
+              ('dif', 8),
+              ('dea', 9),
+              ('macd', 10),
+              ('rsi_6', 11),
+              ('rsi_12', 12),
+              ('rsi_24', 13),
+              )
+    '''
+    lines = ('change_pct', 'net_amount_main', 'net_pct_main', 'net_amount_xl', 'net_pct_xl', 'net_amount_l', 'net_pct_l'
+             , 'net_amount_m', 'net_pct_m', 'net_amount_s', 'net_pct_s',)
+    params = (('change_pct', 7),
+              ('net_amount_main', 8),
+              ('net_pct_main', 9),
+              ('net_amount_xl', 10),
+              ('net_pct_xl', 11),
+              ('net_amount_l', 12),
+              ('net_pct_l', 13),
+              ('net_amount_m', 14),
+              ('net_pct_m', 15),
+              ('net_amount_s', 16),
+              ('net_pct_s', 17),
+              )
+    '''
+
+
+class TestStrategy(bt.Strategy):
+    params = (
+        ("num", 3),
+        ('Volatility', 0),
+        ('rate', 3),  # 注意要有逗号!!
+    )
+
+    def log(self, txt, dt=None):
+        ''' Logging function for this strategy'''
+        dt = dt or self.datas[0].datetime.date(0)
+        # print('%s, %s' % (dt.isoformat(), txt))
+
+    def __init__(self):
+        # self.num = num
+        # self.Volatility = Volatility/100
+        # Keep a reference to the "close" line in the data[0] dataseries
+        self.pos_price = 0
+        self.dataclose = self.datas[0].close
+        self.dataopen = self.datas[0].open
+        self.high = self.datas[0].high
+        self.low = self.datas[0].low
+        self.volume = self.datas[0].volume
+        self.hl = self.datas[0].hl
+        self.dif = self.datas[0].dif
+        self.dea = self.datas[0].dea
+        self.macd = self.datas[0].macd
+        self.rsi_6 = self.datas[0].rsi_6
+        self.rsi_12 = self.datas[0].rsi_12
+        self.rsi_24 = self.datas[0].rsi_24
+        # self.change_pct = self.datas[0].change_pct
+        # self.net_amount_main = self.datas[0].net_amount_main
+        # self.net_pct_main = self.datas[0].net_pct_main
+        # self.net_amount_xl = self.datas[0].net_amount_xl
+        # self.net_pct_xl = self.datas[0].net_pct_xl
+        # self.net_amount_l = self.datas[0].net_amount_l
+        # self.net_pct_l = self.datas[0].net_pct_l
+        self.sma5 = btind.MovingAverageSimple(self.datas[0].close, period=5)
+        self.sma10 = btind.MovingAverageSimple(self.datas[0].close, period=10)
+        self.sma20 = btind.MovingAverageSimple(self.datas[0].close, period=20)
+        self.sma60 = btind.MovingAverageSimple(self.datas[0].close, period=60)
+
+    def notify_order(self, order):
+        """
+        订单状态处理
+
+        Arguments:
+            order {object} -- 订单状态
+        """
+        if order.status in [order.Submitted, order.Accepted]:
+            # 如订单已被处理,则不用做任何事情
+            return
+
+        # 检查订单是否完成
+        if order.status in [order.Completed]:
+            if order.isbuy():
+                self.buyprice = order.executed.price
+                self.buycomm = order.executed.comm
+            self.bar_executed = len(self)
+
+        # 订单因为缺少资金之类的原因被拒绝执行
+        elif order.status in [order.Canceled, order.Margin, order.Rejected]:
+            pass
+            # self.log('Order Canceled/Margin/Rejected')
+
+        # 订单状态处理完成,设为空
+        self.order = None
+
+    def notify_trade(self, trade):
+        """
+        交易成果
+
+        Arguments:
+            trade {object} -- 交易状态
+        """
+        if not trade.isclosed:
+            return
+
+        # 显示交易的毛利率和净利润
+        # self.log('OPERATION PROFIT, GROSS %.2f, NET %.2f' % (trade.pnl, trade.pnlcomm))
+
+    def next(self):
+        # print(self.num,self.Volatility)
+        # Simply log the closing price of the series from the reference
+        # self.sma20[-2] < self.sma20[-1] < self.sma20[0] and self.sma10[-2] < self.sma10[-1] < self.sma10[0]
+        # and (self.sma5[-1] < self.sma10[-1])
+        # and (self.net_pct_l[0] > 10) and (self.net_pct_xl[0] > 3)  \
+        # and (self.net_amount_main[-1] > 0) and (self.net_amount_main[0] > 0)
+
+        if len(self) > self.params.num:
+            vola = self.params.Volatility / 100
+            rate = self.params.rate / 100
+            lowest = np.min(self.low.get(size=self.params.num))
+            highest = np.max(self.high.get(size=self.params.num))
+
+            # > self.sma5[-1]
+            # and (((lowest * (1 - vola)) < self.low[-2] < (lowest * (1 + vola))) or (
+            #         (lowest * (1 - vola)) < self.low[-1] < (lowest * (1 + vola)))) \
+            if self.hl[-1] == 2 or self.hl[-1] == 1:
+                m = -2
+                self.order = self.buy()
+                self.pos_price = self.low[-1]
+
+                while True:
+                    if (self.hl[m] == 2 or self.hl[m] == 1) and self.macd[m] > self.macd[-1] \
+                            and self.dataclose[0] > self.sma5[0] \
+                            and self.dataclose[-1] > self.dataopen[-1] \
+                            and (self.sma10[-2] - self.sma5[-2]) < (self.sma10[-1] - self.sma5[-1]) \
+                            and self.low[-2] < self.sma5[-2] * (1 - rate) \
+                            and self.sma5[-1] < self.sma10[-1] < self.sma20[-1] < self.sma20[-2] < self.sma20[-3] \
+                            and lowest * (1 - vola) < self.low[-1] < lowest * (1 + vola):
+                        self.order = self.buy()
+                        self.pos_price = self.low[-1]
+                        break
+                    m -= 1
+                    if m + len(self) == 2:
+                        break
+
+            # elif (self.hl[0] == 5 or self.dataclose[0] < self.sma5[0]):
+            elif self.dataclose[0] < self.sma5[0] or self.sma5[0] < self.sma5[-1] \
+                    or self.dataclose[0] < self.pos_price or self.high[0] > self.sma5[0] * (1 + vola):
+                self.order = self.close()
+                self.pos_price = 0
+
+    def stop(self):
+        # pass
+        self.log(u'(MA趋势交易效果) Ending Value %.2f' % (self.broker.getvalue()))
+
+
+def err_call_back(err):
+    print(f'出错啦~ error:{str(err)}')
+    traceback.format_exc(err)
+
+
+def to_df(lt):
+    print('开始存数据')
+    df = pd.DataFrame(list(lt),
+                      columns=['周期', '波动率', 'MA5斜率', '盈利个数', '盈利比例', '总盈利', '平均盈利', '最大盈利',
+                               '最小盈利', '总亏损', '平均亏损', '最大亏损', '最小亏损', '盈亏对比'])
+    df.sort_values(by=['周期', '波动率', 'MA5斜率'], ascending=True, inplace=True)
+    df = df.reset_index(drop=True)
+    if platform.node() == 'DanieldeMBP.lan':
+        df.to_csv(f"/Users/daniel/Documents/策略/策略穷举-均线粘连后底分型{dt.now().strftime('%Y%m%d%H%m%S')}.csv",
+                  index=True,
+                  encoding='utf_8_sig', mode='w')
+    else:
+        df.to_csv(f"C:\Daniel\策略\策略穷举底分型_均线缠绕_只买一次{dt.now().strftime('%Y%m%d%H%m%S')}.csv", index=True,
+                  encoding='utf_8_sig', mode='w')
+    print(f'结果:, \n, {df}')
+
+
+def backtrader(table_list, stock, result, result_change, result_change_fall, num, Volatility, rate, err_list):
+    global engine, counter, lock
+    conn = engine.connect()
+    stk_df = pd.read_sql_table(stock, conn)
+    stk_df.time = pd.to_datetime(stk_df.time)
+    # stk_df = stk_df[stk_df['HL'] != '-']
+    try:
+        stk_df['HL'] = stk_df['HL'].map({'L': 1,
+                                         'LL': 2,
+                                         'L*': 3,
+                                         'H': 4,
+                                         'HH': 5,
+                                         'H*': 6,
+                                         '-': 7})
+    except BaseException:
+        print(f'{stock}数据不全,不做测试')
+    else:
+        conn.close()
+        if len(stk_df) > 60:
+            cerebro = bt.Cerebro()
+            cerebro.addstrategy(TestStrategy, num=num, Volatility=Volatility, rate=rate)
+            cerebro.addsizer(bt.sizers.FixedSize, stake=10000)
+            data = MyPandasData(dataname=stk_df,
+                                fromdate=datetime.datetime(2017, 1, 1),
+                                todate=datetime.datetime(2022, 10, 30),
+                                datetime='time',
+                                open='open_back',
+                                close='close_back',
+                                high='high_back',
+                                low='low_back',
+                                volume='volume_back',
+                                hl='HL',
+                                dif='dif',
+                                dea='dea',
+                                macd='macd',
+                                rsi_6='rsi_6',
+                                rsi_12='rsi_12',
+                                rsi_24='rsi_24',
+                                )
+            # print('取值完成')
+            cerebro.adddata(data, name=stock)
+            cerebro.broker.setcash(100000.0)
+            cerebro.broker.setcommission(0.005)
+            cerebro.addanalyzer(bt.analyzers.PyFolio)
+            # 策略执行前的资金
+            # print('启动资金: %.2f' % cerebro.broker.getvalue())
+            try:
+                # 策略执行
+                cerebro.run()
+            except IndexError as e:
+                err_list.append(stock)
+                # print(f'{num}天波动率为{Volatility}%MA5斜率为{rate}的{stock}错误')
+                # print(e)
+            else:
+                if cerebro.broker.getvalue() > 100000.0:
+                    result_change.append(cerebro.broker.getvalue() - 100000)
+                    result.append(stock)
+                    # print('recode!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!')
+                    # print(result)
+                elif cerebro.broker.getvalue() <= 100000.0:
+                    result_change_fall.append(cerebro.broker.getvalue() - 100000)
+                    # print('aaaaaaaaaaa')
+                    # print(result_change_fall)
+                # print('最终资金: %.2f' % cerebro.broker.getvalue())
+            finally:
+                with lock:
+                    counter.value += 1
+                logging.info('执行完成:(%d / %d) 进程号: %d --------------- %s', counter.value, len(table_list), os.getpid(), stock)
+
+        # print(f'已计算{counter.value}/{len(table_list)}只股票')
+    # print(f'已计算{(len(result) + len(result_change_fall)+len(err_list))}/{len(table_list)}只股票')
+    '''
+        if len(result) * len(result_change) * len(result_change_fall) != 0:
+        print(f'以{num}内最低值波动{Volatility}为支撑、MA5斜率为{rate}%,结果状态为:')
+        print('正盈利的个股为:', len(result), '成功率为:', len(result) / len(table_list))
+        print(
+            f'总盈利:{np.sum(result_change)} 平均盈利:{np.mean(result_change) / len(result)},最大盈利:{np.max(result_change)}, 最小盈利:{np.min(result_change)}')
+        print(
+            f'总亏损:{np.sum(result_change_fall)},平均亏损:{np.mean(result_change_fall) / len(result_change_fall)},最大亏损:{np.min(result_change_fall)} 最小亏损:{np.max(result_change_fall)}')
+
+        # '周期', '波动率', 'MA5斜率', '盈利个数', '盈利比例', '总盈利', '平均盈利', '最大盈利', '最小盈利', '总亏损', '平均亏损', '最大亏损', '最小亏损', '盈亏对比']
+        list_date.append([num, Volatility, rate, len(result), len(result) / len(table_list), np.nansum(result_change),
+                          np.nanmean(result_change), np.nanmax(result_change), np.min(result_change),
+                          np.nansum(result_change_fall), np.nanmean(result_change_fall),
+                          np.nanmin(result_change_fall), np.nanmax(result_change_fall),
+                          len(result_change) / len(result_change_fall)])
+        # to_df(list_date)
+        endtime = dt.now()
+        print(f'{num}天波动率为{Volatility}%MA5斜率为{rate},myPID is {os.getpid()}.本轮耗时为{endtime - sttime}')
+    else:
+        print('阿欧', len(result), len(result_change), len(result_change_fall), num, Volatility, rate, err_list)
+        list_date.append([num, Volatility, rate, 0, len(result) / len(table_list), len(result),
+                          len(result), len(result), len(result), len(result), len(result), len(result), 0])
+    '''
+    # list_date.append([num, Volatility, rate, len(result), len(result) / len(table_list), np.nansum(result_change),
+    #                   np.nanmean(result_change), np.nanmax(result_change), np.min(result_change),
+    #                   np.nansum(result_change_fall), np.nanmean(result_change_fall),
+    #                   np.nanmin(result_change_fall), np.nanmax(result_change_fall),
+    #                   len(result_change) / len(result_change_fall)])
+    # cerebro.plot()
+
+
+# df = pd.DataFrame(
+#     columns=['周期', '波动率', 'MA5斜率', '盈利个数', '盈利比例', '总盈利', '平均盈利', '最大盈利', '最小盈利', '总亏损',
+#              '平均亏损', '最大亏损', '最小亏损'])
+if __name__ == '__main__':
+    logger = mp.log_to_stderr()
+    logger.setLevel(logging.INFO)
+    starttime = dt.now()
+    print(starttime)
+    pus = psutil.Process()
+
+    fre = '1d'
+    db = pymysql.connect(host='localhost',
+                         user='root',
+                         port=3307,
+                         password='r6kEwqWU9!v3',
+                         database='qmt_stocks_tech')
+    cursor = db.cursor()
+    cursor.execute("show tables like '%%%s%%' " % fre)
+    table_list = [tuple[0] for tuple in cursor.fetchall()]
+    # print(table_list)
+    # table_list = table_list[0:500]
+    print(f'计算个股数为:{len(table_list)}')
+
+    list_date = []
+    thread_list = []
+    pool = mp.Pool(processes=mp.cpu_count())
+    # pool = mp.Pool(processes=8)
+    for num in range(60, 80, 20):
+        for Volatility in range(7, 8, 1):
+            for rate in range(3, 4, 1):
+                # step = math.ceil(len(table_list) / mp.cpu_count())
+                result = mp.Manager().list()
+                result_change = mp.Manager().list()
+                result_change_fall = mp.Manager().list()
+                err_list = mp.Manager().list()
+                print(f'{num}天波动率为{Volatility}%MA5斜率为{rate}')
+                # for i in range(0, len(table_list), step):
+                stattime = dt.now()
+                # 保存AsyncResult对象的列表
+                async_results = []
+                # thd = threading.local()
+                # print(i)
+                # p = mp.Process(target=backtrader, args=(df, table_list, result, result_change, result_change_fall,
+                #                                         num, Volatility, rate, err_list))
+                # thread_list.append(p)
+                for stock in table_list:
+                    async_result = pool.apply_async(func=backtrader,
+                                                    args=(table_list, stock, result, result_change, result_change_fall,
+                                                          num, Volatility, rate, err_list,),
+                                                    error_callback=err_call_back)
+                    async_results.append(async_result)
+                # p.start()
+                pool.close()
+                pool.join()
+
+                # 统计返回为 None 的结果数量
+                none_count = 0
+                for i, result_async in enumerate(async_results):
+                    _ = result_async.get()  # 获取任务的结果
+                    if _ is None:
+                        none_count += 1
+                print(f'计算总数={len(result) + len(result_change_fall)}\n计数为:{none_count}')
+                list_date.append(
+                    [num, Volatility, rate, len(result), len(result) / len(table_list), np.nansum(result_change),
+                     np.nanmean(result_change), np.nanmax(result_change), np.min(result_change),
+                     np.nansum(result_change_fall), np.nanmean(result_change_fall),
+                     np.nanmin(result_change_fall), np.nanmax(result_change_fall),
+                     len(result_change) / len(result_change_fall)])
+                print(list_date)
+
+    # to_df(list_date)
+
+    edtime = dt.now()
+    print('总耗时:', edtime - starttime)
+    # df.to_csv(r'C:\Users\Daniel\Documents\策略穷举2.csv', index=True)