from multiprocessing import freeze_support, Value, Lock import backtrader as bt from backtrader.feeds import PandasData import backtrader.indicators as btind from sqlalchemy import create_engine, text import pymysql from tqdm import tqdm import concurrent.futures import pandas as pd import matplotlib import datetime from datetime import datetime as dt from itertools import product import psutil import logging import multiprocessing as mp from itertools import islice class MyPandasData(PandasData): lines = ('hl', 'dif', 'dea', 'macd', 'rsi_6', 'rsi_12', 'rsi_24',) params = (('hl', 7), ('dif', 8), ('dea', 9), ('macd', 10), ('rsi_6', 11), ('rsi_12', 12), ('rsi_24', 13), ) ''' lines = ('change_pct', 'net_amount_main', 'net_pct_main', 'net_amount_xl', 'net_pct_xl', 'net_amount_l', 'net_pct_l' , 'net_amount_m', 'net_pct_m', 'net_amount_s', 'net_pct_s',) params = (('change_pct', 7), ('net_amount_main', 8), ('net_pct_main', 9), ('net_amount_xl', 10), ('net_pct_xl', 11), ('net_amount_l', 12), ('net_pct_l', 13), ('net_amount_m', 14), ('net_pct_m', 15), ('net_amount_s', 16), ('net_pct_s', 17), ) ''' class TestStrategy(bt.Strategy): def log(self, txt, dt=None): # 记录策略的执行日志 dt = dt or self.datas[0].datetime.date(0) # print('%s, %s' % (dt.isoformat(), txt)) def __init__(self): # 保存收盘价的引用 self.dataclose = self.datas[0].close def next(self): # 记录收盘价 self.log('Close, %.2f' % self.dataclose[0]) # 今天的收盘价 < 昨天收盘价 if self.dataclose[0] < self.dataclose[-1]: # 昨天收盘价 < 前天的收盘价 if self.dataclose[-1] < self.dataclose[-2]: # 买入 self.log('买入, %.2f' % self.dataclose[0]) self.buy() def t(): print('tttt') def chunked_iterable(iterable, size): """将可迭代对象分割为指定大小的块""" it = iter(iterable) while True: chunk = tuple(islice(it, size)) if not chunk: return yield chunk def query_database(table_name): engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8') df = pd.read_sql_table(table_name, engine) return df def get_stock_data(): while True: try: db = pymysql.connect(host='localhost', user='root', port=3307, password='r6kEwqWU9!v3', database='qmt_stocks_tech') cursor = db.cursor() cursor.execute("show tables like '%%%s%%' " % '1d') table_list = [tuple[0] for tuple in cursor.fetchall()] # table_list = table_list[0: 10] cursor.close() db.close() print(f'开始数据库读取') with concurrent.futures.ProcessPoolExecutor(max_workers=24) as executor: # 使用executor.map方法实现多进程并行查询数据库,得到每个表的数据,并存储在一个字典中 data_dict = {table_name: df for table_name, df in tqdm(zip(table_list, executor.map(query_database, table_list)))} print(f'数据库读取完成') break except BaseException as e: print(f'数据库读取错误{e}') continue return data_dict def backtrader_test(stock_data, stock_name, num, vot, rate): cerebro = bt.Cerebro() stock_data.time = pd.to_datetime(stock_data.time) stock_data['HL'] = stock_data['HL'].map({'L': 1, 'LL': 2, 'L*': 3, 'H': 4, 'HH': 5, 'H*': 6, '-': 7}) data = MyPandasData(dataname=stock_data, fromdate=datetime.datetime(2017, 1, 1), todate=datetime.datetime(2022, 10, 30), datetime='time', open='open_back', close='close_back', high='high_back', low='low_back', volume='volume_back', hl='HL', dif='dif', dea='dea', macd='macd', rsi_6='rsi_6', rsi_12='rsi_12', rsi_24='rsi_24', ) cerebro.adddata(data) cerebro.addstrategy(TestStrategy) cerebro.broker.setcash(100000.0) cerebro.addsizer(bt.sizers.FixedSize, stake=100) cerebro.broker.setcommission(commission=0.001) cerebro.run() return cerebro.broker.getvalue() - 100000.0 def bbt(stock_data_dict, num, Volatility, rate): # while True: # exception_flag = False async_results = [] try: # 设置每一轮的任务数 CHUNK_SIZE = 200 # 您可以根据需要进行调整 for chunk in tqdm(chunked_iterable(stock_data_dict.items(), CHUNK_SIZE)): print(f'chunk:{chunk[0][0]}-{chunk[-1][0]}') with mp.Pool(processes=min(CHUNK_SIZE, len(chunk), 24)) as pool: # 使用最小值确保不会超出任务数或超过24核心 for stock, df_stock in chunk: async_result = pool.apply_async(func=backtrader_test, args=(df_stock, stock, num, Volatility, rate)) async_results.append(async_result) pool.close() pool.join() # with concurrent.futures.ProcessPoolExecutor(max_workers=18) as inner_executor: # print(f'开始计算{num},{Volatility},{rate}') # # 使用executor.map方法实现多进程并行计算不同参数组合的结果 # results = [result for result in # inner_executor.map(backtrader_test, stock_data_dict.values(), stock_data_dict.keys(), # [num] * len(stock_data_dict), # [Volatility] * len(stock_data_dict), [rate] * len(stock_data_dict), # timeout=1200)] # except concurrent.futures.TimeoutError as e: # print(f'计算超时{e}') # results = [] # exception_flag = True except BaseException as e: print(f'计算错误{e}') results = True outputs = [result.get() for result in async_results] print(outputs) return outputs if __name__ == '__main__': logger = mp.log_to_stderr() logger.setLevel(logging.DEBUG) cpu_list = list(range(24)) pus = psutil.Process() pus.cpu_affinity(cpu_list) # 定义需要穷举的参数值 nums = range(60, 80, 20) Volatilitys = range(5, 6, 1) rates = range(3, 4, 1) # 生成所有参数组合 all_combinations = list(product(nums, Volatilitys, rates)) print(f'共需计算{len(all_combinations)}次') # 获取数据 stock_data_dict = get_stock_data() results = [] # 获取stock_data_dict的第1个value,即第1个DataFrame # stock_data = next(iter(stock_data_dict.values())) # print(stock_data) for num, Volatility, rate in tqdm(all_combinations, desc='计算进度'): result = bbt(stock_data_dict, num, Volatility, rate) results.append(result) print(results, len(results), len(results[0])) df = pd.DataFrame( columns=['周期', '波动率', 'MA5斜率', '盈利个数', '盈利比例', '总盈利', '平均盈利', '最大盈利', '最小盈利', '总亏损', '平均亏损', '最大亏损', '最小亏损']) for tt in results: num_profits = len([r for r in tt if r > 0]) num_losses = len([r for r in tt if r < 0]) profit_ratio = num_profits / len(stock_data_dict) total_profit = sum([r for r in tt if r > 0]) avg_profit = total_profit / num_profits if num_profits else 0 max_profit = max(tt) min_profit = min([r for r in tt if r > 0]) if num_profits else 0 total_loss = sum([r for r in tt if r < 0]) avg_loss = total_loss / num_losses if num_losses else 0 max_loss = min(tt) min_loss = max([r for r in tt if r < 0]) if num_losses else 0 # Append the results into the DataFrame result_dict = {'周期': num, '波动率': Volatility, 'MA5斜率': rate, '盈利个数': num_profits, '盈利比例': profit_ratio, '总盈利': total_profit, '平均盈利': avg_profit, '最大盈利': max_profit, '最小盈利': min_profit, '总亏损': total_loss, '平均亏损': avg_loss, '最大亏损': max_loss, '最小亏损': min_loss} df_t = pd.Series(result_dict) print(df_t) df = pd.concat([df, df_t.to_frame().T], ignore_index=True) print(df) exit() num = 60 Volatility = 5 rate = 3 i = 0 st = dt.now() while True: i += 1 try: results = bbt(stock_data_dict, num, Volatility, rate) except BaseException as e: print(f'计算错误{e}') break print(results) if results is True: print(f'计算错误,重新计算') continue else: print(f'第{i}次计算完成,耗时{dt.now() - st}') print(f'计算结果为{len(results)}') print(results) print(f'全部计算完成,共{len(results)}次') exit() getvalue = backtrader_test(stock_data) if getvalue > 100000: print('盈利') else: print('亏损') # 绘制图像 # cerebro.plot()