123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361 |
- # coding:utf-8
- import time
- from multiprocessing import freeze_support, Value, Lock
- import backtrader as bt
- from backtrader.feeds import PandasData
- import backtrader.indicators as btind
- from sqlalchemy import create_engine, text
- import pymysql
- from tqdm import tqdm
- import concurrent.futures
- import numpy as np
- import pandas as pd
- import platform
- import datetime
- from datetime import datetime as dt
- from itertools import product
- import psutil
- import logging
- import multiprocessing as mp
- from itertools import islice
- from func_timeout import func_set_timeout, FunctionTimedOut
- from functools import partial
- class MyPandasData(PandasData):
- lines = ('hl', 'dif', 'dea', 'macd', 'rsi_6', 'rsi_12', 'rsi_24',)
- params = (('hl', 7),
- ('dif', 8),
- ('dea', 9),
- ('macd', 10),
- ('rsi_6', 11),
- ('rsi_12', 12),
- ('rsi_24', 13),
- )
- class TestStrategy(bt.Strategy):
- params = (
- ("num", 3),
- ('Volatility', 0),
- ('rate', 3), # 注意要有逗号!!
- )
- def log(self, txt, dt=None):
- # 记录策略的执行日志
- dt = dt or self.datas[0].datetime.date(0)
- # print('%s, %s' % (dt.isoformat(), txt))
- def __init__(self):
- try:
- self.pos_price = 0
- self.dataclose = self.datas[0].close
- self.dataopen = self.datas[0].open
- self.high = self.datas[0].high
- self.low = self.datas[0].low
- self.volume = self.datas[0].volume
- self.hl = self.datas[0].hl
- self.dif = self.datas[0].dif
- self.dea = self.datas[0].dea
- self.macd = self.datas[0].macd
- self.rsi_6 = self.datas[0].rsi_6
- self.rsi_12 = self.datas[0].rsi_12
- self.rsi_24 = self.datas[0].rsi_24
- self.sma5 = btind.MovingAverageSimple(self.datas[0].close, period=5)
- self.sma10 = btind.MovingAverageSimple(self.datas[0].close, period=10)
- self.sma20 = btind.MovingAverageSimple(self.datas[0].close, period=20)
- self.sma60 = btind.MovingAverageSimple(self.datas[0].close, period=60)
- except BaseException as e:
- print(f'初始化错误{e}')
- def notify_order(self, order):
- """
- 订单状态处理
- Arguments:
- order {object} -- 订单状态
- """
- if order.status in [order.Submitted, order.Accepted]:
- # 如订单已被处理,则不用做任何事情
- return
- # 检查订单是否完成
- if order.status in [order.Completed]:
- if order.isbuy():
- self.buyprice = order.executed.price
- self.buycomm = order.executed.comm
- self.bar_executed = len(self)
- # 订单因为缺少资金之类的原因被拒绝执行
- elif order.status in [order.Canceled, order.Margin, order.Rejected]:
- pass
- # self.log('Order Canceled/Margin/Rejected')
- # 订单状态处理完成,设为空
- self.order = None
- def notify_trade(self, trade):
- """
- 交易成果
- Arguments:
- trade {object} -- 交易状态
- """
- if not trade.isclosed:
- return
- # 显示交易的毛利率和净利润
- # self.log('OPERATION PROFIT, GROSS %.2f, NET %.2f' % (trade.pnl, trade.pnlcomm))
- def next(self):
- if len(self) > self.params.num:
- vola = self.params.Volatility / 100
- rate = self.params.rate / 100
- lowest = np.min(self.low.get(size=self.params.num))
- highest = np.max(self.high.get(size=self.params.num))
- if self.hl[-1] == 2 or self.hl[-1] == 1:
- m = -2
- # self.order = self.buy()
- # self.pos_price = self.low[-1]
- while True:
- if (self.hl[m] == 2 or self.hl[m] == 1) and self.macd[m] > self.macd[-1] \
- and self.dataclose[0] > self.sma5[0] \
- and self.dataclose[-1] > self.dataopen[-1] \
- and (self.sma10[-2] - self.sma5[-2]) < (self.sma10[-1] - self.sma5[-1]) \
- and self.low[-2] < self.sma5[-2] * (1 - rate) \
- and self.sma5[-1] < self.sma10[-1] < self.sma20[-1] < self.sma20[-2] < self.sma20[-3] \
- and lowest * (1 - vola) < self.low[-1] < lowest * (1 + vola):
- self.order = self.buy()
- self.pos_price = self.low[-1]
- break
- m -= 1
- if m + len(self) == 2:
- break
- # elif (self.hl[0] == 5 or self.dataclose[0] < self.sma5[0]):
- elif self.dataclose[0] < self.sma5[0] or self.sma5[0] < self.sma5[-1] \
- or self.dataclose[0] < self.pos_price or self.high[0] > self.sma5[0] * (1 + vola):
- self.order = self.close()
- self.pos_price = 0
- def stop(self):
- # pass
- self.log(u'(MA趋势交易效果) Ending Value %.2f' % (self.broker.getvalue()))
- def to_df(df):
- print('开始存数据')
- df.sort_values(by=['周期', '波动率', 'MA5斜率'], ascending=True, inplace=True)
- df = df.reset_index(drop=True)
- if platform.node() == 'DanieldeMBP.lan':
- df.to_csv(f"/Users/daniel/Documents/策略/策略穷举-均线粘连后底分型{dt.now().strftime('%Y%m%d%H%m%S')}.csv",
- index=True,
- encoding='utf_8_sig', mode='w')
- else:
- df.to_csv(f"C:\策略结果\策略穷举底分型_均线缠绕_只买一次{dt.now().strftime('%Y%m%d%H%m%S')}.csv", index=True,
- encoding='utf_8_sig', mode='w')
- print(f'结果:, \n, {df}')
- def chunked_iterable(iterable, size):
- """将可迭代对象分割为指定大小的块"""
- it = iter(iterable)
- while True:
- chunk = tuple(islice(it, size))
- if not chunk:
- return
- yield chunk
- def query_database(table_name):
- engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8')
- df = pd.read_sql_table(table_name, engine)
- return df
- def get_stock_data():
- while True:
- try:
- db = pymysql.connect(host='localhost',
- user='root',
- port=3307,
- password='r6kEwqWU9!v3',
- database='qmt_stocks_tech')
- cursor = db.cursor()
- cursor.execute("show tables like '%%%s%%' " % '1d')
- table_list = [tuple[0] for tuple in cursor.fetchall()]
- # table_list = table_list[0: 10]
- cursor.close()
- db.close()
- print(f'开始数据库读取')
- with concurrent.futures.ProcessPoolExecutor(max_workers=16) as executor:
- # 使用executor.map方法实现多进程并行查询数据库,得到每个表的数据,并存储在一个字典中
- data_dict = {table_name: df for table_name, df in
- tqdm(zip(table_list, executor.map(query_database, table_list)))}
- print(f'数据库读取完成')
- break
- except BaseException as e:
- print(f'数据库读取错误{e}')
- continue
- return data_dict
- def backtrader_test(stock_data, stock_name, num, vot, rate):
- # print(f'开始回测{stock_name}')
- try:
- cerebro = bt.Cerebro()
- stock_data.time = pd.to_datetime(stock_data.time)
- stock_data['HL'] = stock_data['HL'].map({'L': 1,
- 'LL': 2,
- 'L*': 3,
- 'H': 4,
- 'HH': 5,
- 'H*': 6,
- '-': 7})
- cerebro.addstrategy(TestStrategy, num=num, Volatility=vot, rate=rate)
- data = MyPandasData(dataname=stock_data,
- fromdate=datetime.datetime(2017, 1, 1),
- todate=datetime.datetime(2022, 10, 30),
- datetime='time',
- open='open_back',
- close='close_back',
- high='high_back',
- low='low_back',
- volume='volume_back',
- hl='HL',
- dif='dif',
- dea='dea',
- macd='macd',
- rsi_6='rsi_6',
- rsi_12='rsi_12',
- rsi_24='rsi_24',
- )
- cerebro.adddata(data)
- cerebro.addstrategy(TestStrategy)
- cerebro.broker.setcash(100000.0)
- cerebro.addsizer(bt.sizers.FixedSize, stake=10000)
- cerebro.broker.setcommission(commission=0.001)
- cerebro.run()
- except BaseException as e:
- print(f'{stock_name}回测错误{e}')
- return np.nan
- # print(cerebro.broker.getvalue() - 100000.0)
- # print(stock_name)
- else:
- return cerebro.broker.getvalue() - 100000.0
- def bbt(stock_data_dict, num, Volatility, rate, err_list):
- async_results = []
- try:
- # 设置每一轮的任务数
- CHUNK_SIZE = 200 # 您可以根据需要进行调整
- timeout = 120
- max_retries = 3
- with concurrent.futures.ProcessPoolExecutor(max_workers=16, max_tasks_per_child=20000) as inner_executor:
- # 使用executor.map方法实现多进程并行计算不同参数组合的结果
- results = [result for result in tqdm(
- inner_executor.map(backtrader_test, stock_data_dict.values(), stock_data_dict.keys(), [num] * len(stock_data_dict),
- [Volatility] * len(stock_data_dict), [rate] * len(stock_data_dict)), desc='单轮计算进度')]
- except BaseException as e:
- print(f'计算错误{e}')
- # print(f'{num},{Volatility},{rate}计算完成,共计算{len(async_results)}个股票')
- # outputs = [result.get() for result in async_results]
- print(f'{num},{Volatility},{rate}计算完成,共计算{len(results)}个股票')
- print(f'计算结果{results}')
- return results
- def tdf(tt, num, Volatility, rate):
- num_nan = np.isnan(tt).sum() # Count NaN values
- print(f'num_nan={num_nan}')
- filtered_result = [r for r in tt if not np.isnan(r)] # Filter out NaN values
- print(f'filtered_result={filtered_result}')
- # Calculate statistics
- num_profits = len([r for r in tt if r > 0])
- num_losses = len([r for r in tt if r < 0])
- profit_ratio = num_profits / (len(filtered_result))
- total_profit = sum([r for r in tt if r > 0])
- avg_profit = total_profit / num_profits if num_profits else 0
- max_profit = max(tt)
- min_profit = min([r for r in tt if r > 0]) if num_profits else 0
- total_loss = sum([r for r in tt if r < 0])
- avg_loss = total_loss / num_losses if num_losses else 0
- max_loss = min(tt)
- min_loss = max([r for r in tt if r < 0]) if num_losses else 0
- # Append the results into the DataFrame
- result_dict = {'周期': num, '波动率': Volatility, 'MA5斜率': rate, '盈利个数': num_profits,
- '盈利比例': profit_ratio, '总盈利': total_profit, '平均盈利': avg_profit,
- '最大盈利': max_profit, '最小盈利': min_profit, '总亏损': total_loss,
- '平均亏损': avg_loss, '最大亏损': max_loss, '最小亏损': min_loss, '未计算个股数': num_nan}
- df_t = pd.Series(result_dict)
- return df_t
- if __name__ == '__main__':
- logger = mp.log_to_stderr()
- logger.setLevel(logging.DEBUG)
- # cpu_list = list(range(23))
- # pus = psutil.Process()
- # pus.cpu_affinity(cpu_list)
- start_time = dt.now()
- # 定义需要穷举的参数值
- nums = range(60, 120, 20)
- Volatilitys = range(5, 13, 1)
- rates = range(3, 8, 1)
- # 生成所有参数组合
- all_combinations = list(product(nums, Volatilitys, rates))
- print(f'共需计算{len(all_combinations)}次')
- # 获取数据
- stock_data_dict = get_stock_data()
- results = []
- df = pd.DataFrame(
- columns=['周期', '波动率', 'MA5斜率', '盈利个数', '盈利比例', '总盈利', '平均盈利', '最大盈利', '最小盈利',
- '总亏损',
- '平均亏损', '最大亏损', '最小亏损', '未计算个股数'])
- err_list = []
- # 设置每一轮的任务数
- CHUNK_SIZE = 200 # 您可以根据需要进行调整
- timeout = 120
- max_retries = 3
- with concurrent.futures.ProcessPoolExecutor(max_workers=24) as inner_executor:
- for num, Volatility, rate in tqdm(all_combinations, desc='计算进度'):
- while True:
- try:
- # 使用executor.map方法实现多进程并行计算不同参数组合的结果
- res = [result for result in tqdm(
- inner_executor.map(backtrader_test, stock_data_dict.values(), stock_data_dict.keys(),
- [num] * len(stock_data_dict),
- [Volatility] * len(stock_data_dict), [rate] * len(stock_data_dict)),
- desc='单轮计算进度')]
- except BaseException as e:
- print(f'计算错误{e}')
- inner_executor = concurrent.futures.ProcessPoolExecutor(max_workers=16)
- else:
- results.append(res)
- df_t = tdf(res, num, Volatility, rate)
- df = pd.concat([df, df_t.to_frame().T], ignore_index=True)
- break
- time.sleep(1)
- print(f'{num},{Volatility},{rate}计算完成,共计算{len(res)}个股票')
- print(df)
- print('循环结束')
- to_df(df)
- print(f'计算完成,共耗时{dt.now() - start_time}秒')
|