|
- import os
- import numpy as np
- from sqlalchemy import create_engine
- import pandas as pd
- import pymysql
- import backtrader as bt
- import backtrader.indicators as btind
- import backtrader.analyzers as btanalyzers
- import datetime
- import math
- from datetime import datetime as dt
- import multiprocessing as mp
- from backtrader.feeds import PandasData
- from numba import jit, cuda, njit
- # import multiprocessing
- import matplotlib
- pd.set_option('display.max_columns', None) # 设置显示最大行
- # global result
- # result = pd.DataFrame(columns=['code', 'result', 'num', 'Volatility', 'rate'])
- class MyPandasData(PandasData):
- lines = ()
- params = ()
- '''
- lines = ('change_pct', 'net_amount_main', 'net_pct_main', 'net_amount_xl', 'net_pct_xl', 'net_amount_l', 'net_pct_l'
- , 'net_amount_m', 'net_pct_m', 'net_amount_s', 'net_pct_s',)
- params = (('change_pct', 7),
- ('net_amount_main', 8),
- ('net_pct_main', 9),
- ('net_amount_xl', 10),
- ('net_pct_xl', 11),
- ('net_amount_l', 12),
- ('net_pct_l', 13),
- ('net_amount_m', 14),
- ('net_pct_m', 15),
- ('net_amount_s', 16),
- ('net_pct_s', 17),
- )
- '''
- class TestStrategy(bt.Strategy):
- params = (
- ("num", 3),
- ('Volatility', 0),
- ('rate', 5), # 注意要有逗号!!
- )
- def log(self, txt, dt=None):
- ''' Logging function for this strategy'''
- dt = dt or self.datas[0].datetime.date(0)
- print('%s, %s' % (dt.isoformat(), txt))
- def notify_order(self, order):
- """
- 订单状态处理
- Arguments:
- order {object} -- 订单状态
- """
- if order.status in [order.Submitted, order.Accepted]:
- # 如订单已被处理,则不用做任何事情
- return
- # 检查订单是否完成
- if order.status in [order.Completed]:
- if order.isbuy():
- self.buyprice = order.executed.price
- self.buycomm = order.executed.comm
- self.bar_executed = len(self)
- # 订单因为缺少资金之类的原因被拒绝执行
- elif order.status in [order.Canceled, order.Margin, order.Rejected]:
- pass
- self.log('Order Canceled/Margin/Rejected')
- # 订单状态处理完成,设为空
- self.order = None
- def notify_trade(self, trade):
- """
- 交易成果
- Arguments:
- trade {object} -- 交易状态
- """
- if not trade.isclosed:
- return
- # 显示交易的毛利率和净利润
- # self.log('OPERATION PROFIT, GROSS %.2f, NET %.2f' % (trade.pnl, trade.pnlcomm))
- def __init__(self):
- # print('__init__', dt.now())
- # print(f'{self.params.num}天波动率为{self.params.Volatility}%乖离率为{self.params.rate}', 'myPID is ', os.getpid())
- self.dataclose = self.datas[0].close
- self.dataopen = self.datas[0].open
- self.high = self.datas[0].high
- self.low = self.datas[0].low
- self.volume = self.datas[0].volume
- # self.change_pct = self.datas[0].change_pct
- # self.net_amount_main = self.datas[0].net_amount_main
- # self.net_pct_main = self.datas[0].net_pct_main
- # self.net_amount_xl = self.datas[0].net_amount_xl
- # self.net_pct_xl = self.datas[0].net_pct_xl
- # self.net_amount_l = self.datas[0].net_amount_l
- # self.net_pct_l = self.datas[0].net_pct_l
- self.sma5 = btind.MovingAverageSimple(self.datas[0].close, period=5)
- self.sma10 = btind.MovingAverageSimple(self.datas[0].close, period=10)
- self.sma20 = btind.MovingAverageSimple(self.datas[0].close, period=20)
- self.yx = self.dataclose > self.dataopen
- self.lowest = min(self.dataclose.get(ago=-1, size=self.params.num))
- self.highest = max(self.high.get(ago=-1, size=self.params.num))
- self.vola = self.params.Volatility / 100
- self.rate = self.params.rate / 100
- # print('初始化完成', dt.now())
- # @njit
- def next(self):
- '''
- if self.yx and (self.dataclose[0] > self.dataclose[-1] > self.dataclose[-2])\
- and self.sma5[0] > self.sma10[0] and self.sma5[-1] < self.sma10[-1]:
- print('next', self.yx[0], (self.dataclose[0] > self.dataclose[-1] > self.dataclose[-2]),
- self.sma5[0] > self.sma10[0], self.sma5[-1] < self.sma10[-1])
- :return:
- '''
- if self.yx and ((self.lowest * (1 - self.vola)) < self.low[-2] < (self.lowest * (1 + self.vola)))\
- and self.dataclose[0] > self.dataclose[-1] > self.dataclose[-2] and self.dataclose[0] > self.sma5[0]:
- # print(f'buy, {self.lowest},{self.vola},{self.low[-2]}, {self.rate}')
- # self.log('BUY CREATE, %.2f' % self.dataclose[0])
- self.order = self.buy()
- elif self.dataclose < self.sma5[0]:
- self.order = self.close()
- # self.log('close<ma5 Close, %.2f' % self.dataclose[0])
- elif self.sma5[0] < self.sma10[0]:
- self.order = self.close()
- # self.log('ma5<ma10 Close, %.2f' % self.dataclose[0])
- elif self.dataclose[0] > self.sma5[0]*(1+self.rate):
- self.order = self.close()
- # self.log('close>rate Close, %.2f' % self.dataclose[0])
- '''
- if self.yx \
- and (((self.lowest[0] * (1 - self.vola)) < self.low[-2] < (self.lowest[0] * (1 + self.vola))) or (
- (self.lowest[0] * (1 - self.vola)) < self.low[-1] < (self.lowest[0] * (1 + self.vola)))) \
- and (self.dataclose[0] > self.sma5[0]) and self.sma5[0] > self.sma5[-1] \
- and (not self.position) and (self.sma5[0] > self.sma10[0]):
- # self.log('BUY CREATE, %.2f' % self.dataclose[0])
- self.order = self.buy()
- elif self.dataclose < self.sma5[0] or self.sma5[0] < self.sma10[0] \
- or (self.dataclose[0] > (self.sma5[0] * (1 + self.rate))) or \
- (((self.highest[0] * (1 - self.vola)) < self.high[-2] < (self.highest[0] * (1 + self.vola))) or (
- (self.highest[0] * (1 - self.vola)) < self.high[-1] < (self.highest[0] * (1 + self.vola)))):
- self.order = self.close()
- # self.log('Close, %.2f' % self.dataclose[0])
- '''
- def stop(self):
- # pass
- global result
- self.log(u'(MA趋势交易效果) Ending Value %.2f num %d Vol %d rate %d' % (self.broker.getvalue(),
- self.params.num,
- self.params.Volatility,
- self.params.rate))
- # self.log(f'time:{dt.now()}')
- # temp = pd.DataFrame(columns=['code', 'result', 'num', 'Volatility', 'rate'],
- # data=[self.getdatanames(), self.broker.getvalue(), self.params.num, self.params.rate])
- # result = pd.concat([result,temp],axis=0)
- def err_call_back(err):
- print(f'出错啦~ error:{str(err)}')
- def to_df(lt):
- df = pd.DataFrame(list(lt), columns=['周期', '波动率', '乖离率', '盈利个数', '盈利比例', '总盈利', '平均盈利', '最大盈利',
- '最小盈利', '总亏损', '平均亏损', '最大亏损', '最小亏损'])
- df.sort_values(by=['周期', '波动率', '乖离率'], ascending=True, inplace=True)
- df = df.reset_index(drop=True)
- df.to_csv(r'D:\Daniel\策略\策略穷举.csv', index=True, encoding='utf-8', mode='w')
- print(df)
- # 打印结果
- def get_my_analyzer(result):
- analyzer = {}
- # 返回参数
- analyzer['num'] = result.params.num
- analyzer['Volatility'] = result.params.Volatility
- analyzer['rate'] = result.params.rate
- # 提取年化收益
- analyzer['年化收益率'] = result.analyzers._Returns.get_analysis()['rnorm']
- analyzer['年化收益率(%)'] = result.analyzers._Returns.get_analysis()['rnorm100']
- # 提取最大回撤(习惯用负的做大回撤,所以加了负号)
- analyzer['最大回撤(%)'] = result.analyzers._DrawDown.get_analysis()['max']['drawdown'] * (-1)
- # 提取夏普比率
- analyzer['年化夏普比率'] = result.analyzers._SharpeRatio_A.get_analysis()['sharperatio']
- return analyzer
- def backtrader(table_list, result_change, result_change_fall, err_list):
- sttime = dt.now()
- engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_front?charset=utf8')
- cerebro = bt.Cerebro(stdstats=False)
- # cerebro.addobserver(bt.observers.Broker)
- # cerebro.addobserver(bt.observers.Trades)
- # cerebro.addobserver(bt.observers.BuySell)
- # cerebro.addobserver(bt.observers.DrawDown)
- # cerebro.addobserver(bt.observers.TimeReturn)
- # cerebro.addstrategy(TestStrategy)
- cerebro.addsizer(bt.sizers.FixedSize, stake=1000)
- cerebro.broker.setcash(100000.0)
- cerebro.broker.setcommission(0.005)
- for stock in table_list:
- print(stock)
- stk_df = pd.read_sql_table(stock, engine)
- stk_df.time = pd.to_datetime(stk_df.time)
- data = MyPandasData(dataname=stk_df,
- fromdate=datetime.datetime(2022, 1, 1),
- todate=datetime.datetime(2023, 2, 1),
- datetime='time',
- open='open',
- close='close',
- high='high',
- low='low',
- volume='volume',
- # change_pct='change_pct',
- # net_amount_main='net_amount_main',
- # net_pct_main='net_pct_main',
- # net_amount_xl='net_amount_xl',
- # net_pct_xl='net_pct_xl',
- # net_amount_l='net_amount_l',
- # net_pct_l='net_pct_l',
- # net_amount_m='net_amount_m',
- # net_pct_m='net_pct_m',
- # net_amount_s='net_amount_s',
- # net_pct_s='net_pct_s',
- )
- cerebro.adddata(data, name=stock)
- cerebro.optstrategy(TestStrategy, num=range(40, 130, 10), Volatility=range(5, 8), rate=range(5, 8))
- print('最优参定义', dt.now())
- # 添加分析指标
- # 返回年初至年末的年度收益率
- # cerebro.addanalyzer(bt.analyzers.AnnualReturn, _name='_AnnualReturn')
- # 计算最大回撤相关指标
- cerebro.addanalyzer(bt.analyzers.DrawDown, _name='_DrawDown')
- # 计算年化收益:日度收益
- cerebro.addanalyzer(bt.analyzers.Returns, _name='_Returns', tann=252)
- # 计算年化夏普比率:日度收益
- cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='_SharpeRatio', timeframe=bt.TimeFrame.Days, annualize=True,
- riskfreerate=0)
- # 计算夏普比率
- cerebro.addanalyzer(bt.analyzers.SharpeRatio_A, _name='_SharpeRatio_A')
- # 返回收益率时序
- cerebro.addanalyzer(bt.analyzers.TimeReturn, _name='_TimeReturn')
- # 策略执行前的资金
- # print('启动资金: %.2f' % cerebro.broker.getvalue())
- cerebro.addsizer(bt.sizers.PercentSizer, percents=10)
- cerebro.addanalyzer(btanalyzers.SharpeRatio, _name="sharpe")
- cerebro.addanalyzer(btanalyzers.DrawDown, _name="drawdown")
- cerebro.addanalyzer(btanalyzers.Returns, _name="returns")
- cerebro.addanalyzer(btanalyzers.TradeAnalyzer, _name='TradeAnalyzer')
- try:
- # 策略执行
- print('开始执行', dt.now())
- results = cerebro.run(maxcpus=None)
- print('回测结束', dt.now())
- except IndexError:
- err_list.append(stock)
- else:
- par_list = [[x[0].params.num,
- x[0].params.Volatility,
- x[0].params.rate,
- x[0].analyzers.returns.get_analysis()['rnorm100'],
- x[0].analyzers.drawdown.get_analysis()['max']['drawdown'],
- x[0].analyzers.sharpe.get_analysis()['sharperatio'],
- x[0].analyzers.TradeAnalyzer.get_analysis().won.total,
- ] for x in results]
- par_df = pd.DataFrame(par_list, columns=['num', 'Volatility', 'rate', 'return', 'drawdown', 'sharpe','TradeAnalyzer'])
- print(par_df)
- # par_df.to_csv('result.csv')
- ret = []
- for i in results:
- ret.append(get_my_analyzer(i[0]))
- pd.DataFrame(ret)
- '''
- if cerebro.broker.getvalue() > 100000.0:
- result_change.append((cerebro.broker.getvalue() / 10000 - 1))
- result.append(stock)
- # print('recode!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!')
- # print(result)
- else:
- result_change_fall.append((1 - cerebro.broker.getvalue() / 10000))
- # print('aaaaaaaaaaa')
- # print(result_change_fall)
- '''
- # if len(result) * len(result_change) * len(result_change_fall) != 0:
- # print(f'以{num}内最低值波动{Volatility}为支撑、乖离率为{rate}%,结果状态为:')
- # print('正盈利的个股为:', len(result_change), '成功率为:', len(result) / len(table_list))
- # print(
- # f'总盈利:{np.sum(result_change)} 平均盈利:{np.mean(result_change)},最大盈利:{np.max(result_change)}, 最小盈利:{np.min(result_change)}')
- # print(
- # f'总亏损:{np.sum(result_change_fall)},平均亏损:{np.mean(result_change_fall)},最大亏损:{np.min(result_change_fall)} 最小亏损:{np.max(result_change_fall)}')
- #
- # list_date.append([num, Volatility, rate, len(result), len(result) / len(table_list), np.nansum(result_change),
- # np.nanmean(result_change), np.nanmax(result_change), np.min(result_change),
- # np.nansum(result_change_fall), np.nanmean(result_change_fall),
- # np.nanmin(result_change_fall), np.nanmax(result_change_fall)])
- # to_df(list_date)
- # endtime = dt.now()
- # print(f'{num}天波动率为{Volatility}%乖离率为{rate},myPID is {os.getpid()}.本轮耗时为{endtime - sttime}')
- # else:
- # print(result, result_change, result_change_fall, num, Volatility, rate, err_list)
- # cerebro.plot()
- # df = pd.DataFrame(
- # columns=['周期', '波动率', '盈利个数', '盈利比例', '总盈利', '平均盈利', '最大盈利', '最小盈利', '总亏损',
- # '平均亏损', '最大亏损', '最小亏损'])
- if __name__ == '__main__':
- starttime = dt.now()
- print(starttime)
- fre = '1d'
- db = pymysql.connect(host='localhost',
- user='root',
- port=3307,
- password='r6kEwqWU9!v3',
- database='qmt_stocks')
- cursor = db.cursor()
- cursor.execute("show tables like '%%%s%%' " % fre)
- table_list = [tuple[0] for tuple in cursor.fetchall()]
- # print(table_list)
- table_list = table_list[0:2]
- result_change = []
- result_change_fall = []
- err_list = []
- stattime = dt.now()
- # print(f'{num}天波动率为{Volatility}%乖离率为{rate}')
- backtrader(table_list, result_change, result_change_fall, err_list)
- edtime = dt.now()
- print('总耗时:', edtime - starttime)
- # df.to_csv(r'C:\Users\Daniel\Documents\策略穷举2.csv', index=True)
|