123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346 |
- import os
- import traceback
- import numpy as np
- from sqlalchemy import create_engine
- import pandas as pd
- import pymysql
- import backtrader as bt
- import backtrader.indicators as btind
- import datetime
- import math
- from datetime import datetime as dt
- import multiprocessing as mp
- from backtrader.feeds import PandasData
- import platform
- import psutil
- # import multiprocessing
- # import matplotlib
class MyPandasData(PandasData):
    """Pandas data feed carrying seven extra, precomputed indicator lines.

    Besides whatever ``PandasData`` already provides, the feed declares the
    lines ``hl``, ``dif``, ``dea``, ``macd``, ``rsi_6``, ``rsi_12`` and
    ``rsi_24``.  ``params`` pairs every extra line with its default column
    position (7 through 13, in declaration order).
    """

    lines = (
        'hl',
        'dif', 'dea', 'macd',
        'rsi_6', 'rsi_12', 'rsi_24',
    )

    # (('hl', 7), ('dif', 8), ..., ('rsi_24', 13)): one default column
    # position per extra line, numbered consecutively from 7.
    params = tuple(zip(lines, range(7, 14)))

    # Alternative line/column layout kept for reference (currently disabled):
    #
    # lines = ('change_pct', 'net_amount_main', 'net_pct_main', 'net_amount_xl',
    #          'net_pct_xl', 'net_amount_l', 'net_pct_l', 'net_amount_m',
    #          'net_pct_m', 'net_amount_s', 'net_pct_s',)
    # params = (('change_pct', 7),
    #           ('net_amount_main', 8),
    #           ('net_pct_main', 9),
    #           ('net_amount_xl', 10),
    #           ('net_pct_xl', 11),
    #           ('net_amount_l', 12),
    #           ('net_pct_l', 13),
    #           ('net_amount_m', 14),
    #           ('net_pct_m', 15),
    #           ('net_amount_s', 16),
    #           ('net_pct_s', 17),
    #           )
class TestStrategy(bt.Strategy):
    """Long-only strategy keyed on the precomputed ``hl`` marks of the feed.

    Strategy parameters (``params``):
        num: look-back window, in bars, for the recent lowest low.
        Volatility: tolerance band around that lowest low, in percent.
        rate: how far below the 5-bar SMA a low must sit, in percent.

    NOTE(review): the indentation of the original listing was lost; the
    nesting used in ``next`` is the conventional reading of the statements
    and should be confirmed against the author's intent.
    """

    params = (
        ('num', 3),
        ('Volatility', 0),
        ('rate', 3),  # the trailing comma is required
    )

    def log(self, txt, dt=None):
        """Logging hook for this strategy; the actual output is disabled."""
        dt = dt or self.datas[0].datetime.date(0)
        # print('%s, %s' % (dt.isoformat(), txt))

    def __init__(self):
        """Bind shortcuts to the data lines and create the moving averages."""
        feed = self.datas[0]

        # Order bookkeeping, initialised here so the attributes exist
        # before the first notify_order() call.
        self.order = None
        self.buyprice = None
        self.buycomm = None
        self.bar_executed = None

        # Reference price recorded on entry and reset to 0 on exit.
        self.pos_price = 0

        # Price / volume lines.
        self.dataclose = feed.close
        self.dataopen = feed.open
        self.high = feed.high
        self.low = feed.low
        self.volume = feed.volume

        # Extra lines supplied by the data feed.
        self.hl = feed.hl
        self.dif = feed.dif
        self.dea = feed.dea
        self.macd = feed.macd
        self.rsi_6 = feed.rsi_6
        self.rsi_12 = feed.rsi_12
        self.rsi_24 = feed.rsi_24

        # Simple moving averages of the close.
        self.sma5 = btind.MovingAverageSimple(feed.close, period=5)
        self.sma10 = btind.MovingAverageSimple(feed.close, period=10)
        self.sma20 = btind.MovingAverageSimple(feed.close, period=20)
        # NOTE(review): sma60 is never read in next(); presumably it is kept
        # so that the strategy only starts after 60 bars -- confirm before
        # removing it.
        self.sma60 = btind.MovingAverageSimple(feed.close, period=60)

    def notify_order(self, order):
        """Track order status changes.

        Arguments:
            order {object} -- the order whose status changed
        """
        if order.status in [order.Submitted, order.Accepted]:
            # Still being processed by the broker: nothing to do yet.
            return

        if order.status in [order.Completed]:
            if order.isbuy():
                self.buyprice = order.executed.price
                self.buycomm = order.executed.comm
            self.bar_executed = len(self)
        elif order.status in [order.Canceled, order.Margin, order.Rejected]:
            # Order was not executed (e.g. insufficient cash); ignored.
            pass

        # The order reached a final state: clear the pending reference.
        self.order = None

    def notify_trade(self, trade):
        """Trade notification hook; reporting of closed trades is disabled.

        Arguments:
            trade {object} -- the trade whose state changed
        """
        if not trade.isclosed:
            return
        # self.log('OPERATION PROFIT, GROSS %.2f, NET %.2f' % (trade.pnl, trade.pnlcomm))

    def next(self):
        """Evaluate entry and exit rules for the current bar.

        Nothing happens until more than ``num`` bars have been seen.  When
        the previous bar's ``hl`` mark is 1 or 2, a buy is issued and the
        previous low is stored in ``pos_price``; earlier bars are then
        scanned backwards for another 1/2 mark with a higher ``macd`` that,
        together with the price / moving-average conditions, triggers a
        second buy.  Otherwise the position is closed when any exit
        condition holds.
        """
        if len(self) <= self.params.num:
            return

        vola = self.params.Volatility / 100
        rate = self.params.rate / 100
        lowest = np.min(self.low.get(size=self.params.num))
        bottom_marks = (1, 2)

        if self.hl[-1] in bottom_marks:
            # NOTE(review): this buy is unconditional once the previous bar
            # carries a 1/2 mark, and the scan below may buy again --
            # preserved as written, confirm it is intended.
            self.order = self.buy()
            self.pos_price = self.low[-1]

            # Walk back from two bars ago.  The original loop stopped once
            # m + len(self) == 2, so the last offset examined is
            # 3 - len(self).
            for m in range(-2, 2 - len(self), -1):
                if self.hl[m] in bottom_marks and self.macd[m] > self.macd[-1] \
                        and self.dataclose[0] > self.sma5[0] \
                        and self.dataclose[-1] > self.dataopen[-1] \
                        and (self.sma10[-2] - self.sma5[-2]) < (self.sma10[-1] - self.sma5[-1]) \
                        and self.low[-2] < self.sma5[-2] * (1 - rate) \
                        and self.sma5[-1] < self.sma10[-1] < self.sma20[-1] < self.sma20[-2] < self.sma20[-3] \
                        and lowest * (1 - vola) < self.low[-1] < lowest * (1 + vola):
                    self.order = self.buy()
                    self.pos_price = self.low[-1]
                    break
        elif self.dataclose[0] < self.sma5[0] or self.sma5[0] < self.sma5[-1] \
                or self.dataclose[0] < self.pos_price or self.high[0] > self.sma5[0] * (1 + vola):
            # Exit: close under SMA5, SMA5 turning down, close under the
            # entry reference price, or high stretched above SMA5.
            self.order = self.close()
            self.pos_price = 0

    def stop(self):
        """Report the final portfolio value through ``log``."""
        self.log(u'(MA趋势交易效果) Ending Value %.2f' % (self.broker.getvalue()))
def err_call_back(err):
    """Report an exception raised by a pool task.

    Used as the ``error_callback`` of ``Pool.apply_async``.  Prints a short
    message to stdout and the full traceback of *err* to stderr.

    Args:
        err: the exception instance delivered by the pool.
    """
    print(f'出错啦~ error:{str(err)}')
    # traceback.format_exc() takes a ``limit`` argument, not an exception,
    # and only returns a string; print the traceback of ``err`` itself.
    traceback.print_exception(type(err), err, err.__traceback__)
def to_df(lt):
    """Dump the accumulated result rows to a timestamped CSV file.

    The rows are loaded into a DataFrame, sorted by the three parameter
    columns, re-indexed and written to a host-specific directory; the
    DataFrame is printed afterwards.

    Args:
        lt: iterable of rows, one value per column listed below.
    """
    print('开始存数据')
    df = pd.DataFrame(list(lt),
                      columns=['周期', '波动率', 'MA5斜率', '盈利个数', '盈利比例', '总盈利', '平均盈利', '最大盈利',
                               '最小盈利', '总亏损', '平均亏损', '最大亏损', '最小亏损', '盈亏对比'])
    df.sort_values(by=['周期', '波动率', 'MA5斜率'], ascending=True, inplace=True)
    df = df.reset_index(drop=True)

    # %M is minutes; the previous '%m' repeated the month in the file name.
    stamp = dt.now().strftime('%Y%m%d%H%M%S')
    if platform.node() == 'DanieldeMBP.lan':
        target = f"/Users/daniel/Documents/策略/策略穷举{stamp}.csv"
    else:
        # Raw string: the backslashes are path separators, not escapes.
        target = rf"C:\Daniel\策略\策略穷举_底分且均线扩散_rate低点{stamp}.csv"
    df.to_csv(target, index=True, encoding='utf_8_sig', mode='w')
    print(f'结果:{df}')
def backtrader(list_date, table_list, result, result_change, result_change_fall, num, Volatility, rate, err_list):
    """Back-test one parameter combination over every stock table.

    Each table in *table_list* is read from the database, fed to
    ``TestStrategy`` and run with a starting cash of 100000.  Stocks that
    end above the starting cash are recorded in *result* / *result_change*,
    the others in *result_change_fall*.  A 14-value summary row is appended
    to *list_date* and the whole list is written out through ``to_df``.

    Args:
        list_date: shared list receiving one summary row per call.
        table_list: names of the stock tables to test.
        result: list collecting the names of profitable stocks.
        result_change: list collecting the profits of those stocks.
        result_change_fall: list collecting the non-positive changes.
        num: strategy look-back window, in bars.
        Volatility: strategy tolerance band, in percent.
        rate: strategy SMA5 gap, in percent.
        err_list: list collecting stocks whose run raised ``IndexError``.
    """
    print(f'{num}天波动率为{Volatility}%MA5斜率为{rate}', 'myPID is ', os.getpid())
    sttime = dt.now()
    # NOTE(review): database credentials are hard-coded in the URL; consider
    # moving them to configuration or environment variables.
    engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8',pool_size=4000)
    hl_codes = {'L': 1,
                'LL': 2,
                'L*': 3,
                'H': 4,
                'HH': 5,
                'H*': 6,
                '-': 7}
    start_cash = 100000.0
    try:
        for stock in table_list:
            # Release the connection after every read instead of leaking
            # one connection per stock.
            with engine.connect() as conn:
                stk_df = pd.read_sql_table(stock, conn)
            stk_df.time = pd.to_datetime(stk_df.time)
            try:
                stk_df['HL'] = stk_df['HL'].map(hl_codes)
            except KeyError:
                # Table without an 'HL' column: skip it.
                print(f'{stock}数据不全,不做测试')
                continue
            if len(stk_df) <= 60:
                continue

            cerebro = bt.Cerebro()
            cerebro.addstrategy(TestStrategy, num=num, Volatility=Volatility, rate=rate)
            cerebro.addsizer(bt.sizers.FixedSize, stake=10000)
            data = MyPandasData(dataname=stk_df,
                                fromdate=datetime.datetime(2017, 1, 1),
                                todate=datetime.datetime(2022, 10, 30),
                                datetime='time',
                                open='open_back',
                                close='close_back',
                                high='high_back',
                                low='low_back',
                                volume='volume_back',
                                hl='HL',
                                dif='dif',
                                dea='dea',
                                macd='macd',
                                rsi_6='rsi_6',
                                rsi_12='rsi_12',
                                rsi_24='rsi_24',
                                )
            cerebro.adddata(data, name=stock)
            cerebro.broker.setcash(start_cash)
            cerebro.broker.setcommission(0.005)
            cerebro.addanalyzer(bt.analyzers.PyFolio)
            try:
                cerebro.run()
            except IndexError:
                err_list.append(stock)
                continue

            change = cerebro.broker.getvalue() - 100000
            if cerebro.broker.getvalue() > start_cash:
                result_change.append(change)
                result.append(stock)
            else:
                result_change_fall.append(change)
    finally:
        engine.dispose()

    print(f'计算总数={len(result) + len(result_change_fall)}')
    total = len(table_list)
    win_ratio = len(result) / total if total else 0.0
    complete = bool(result and result_change and result_change_fall)
    if complete:
        print(f'以{num}内最低值波动{Volatility}为支撑、MA5斜率为{rate}%,结果状态为:')
        print('正盈利的个股为:', len(result), '成功率为:', win_ratio)
        # np.mean already is the average; it used to be divided by the
        # count a second time.
        print(
            f'总盈利:{np.sum(result_change)} 平均盈利:{np.mean(result_change)},最大盈利:{np.max(result_change)}, 最小盈利:{np.min(result_change)}')
        print(
            f'总亏损:{np.sum(result_change_fall)},平均亏损:{np.mean(result_change_fall)},最大亏损:{np.min(result_change_fall)} 最小亏损:{np.max(result_change_fall)}')
    else:
        print('阿欧', len(result), len(result_change), len(result_change_fall), num, Volatility, rate, err_list)

    list_date.append(_summary_row(num, Volatility, rate, result, result_change, result_change_fall, win_ratio))
    to_df(list_date)
    if complete:
        endtime = dt.now()
        print(f'{num}天波动率为{Volatility}%MA5斜率为{rate},myPID is {os.getpid()}.本轮耗时为{endtime - sttime}')


def _summary_row(num, Volatility, rate, result, result_change, result_change_fall, win_ratio):
    """Build the 14-value summary row; empty groups contribute zeros."""
    if len(result_change) > 0:
        gains = [np.nansum(result_change), np.nanmean(result_change),
                 np.nanmax(result_change), np.nanmin(result_change)]
    else:
        gains = [0, 0, 0, 0]
    if len(result_change_fall) > 0:
        # Largest loss is the most negative value, smallest loss the least.
        losses = [np.nansum(result_change_fall), np.nanmean(result_change_fall),
                  np.nanmin(result_change_fall), np.nanmax(result_change_fall)]
        win_loss = len(result_change) / len(result_change_fall)
    else:
        losses = [0, 0, 0, 0]
        win_loss = 0
    return [num, Volatility, rate, len(result), win_ratio, *gains, *losses, win_loss]
- # cerebro.plot()
- # df = pd.DataFrame(
- # columns=['周期', '波动率', 'MA5斜率', '盈利个数', '盈利比例', '总盈利', '平均盈利', '最大盈利', '最小盈利', '总亏损',
- # '平均亏损', '最大亏损', '最小亏损'])
if __name__ == '__main__':
    # Script entry point: list the stock tables whose name contains the
    # frequency tag, then back-test every (num, Volatility, rate)
    # combination in a process pool and print the total run time.
    starttime = dt.now()
    print(starttime)

    fre = '1d'
    # NOTE(review): database credentials are hard-coded; consider moving
    # them to configuration or environment variables.
    db = pymysql.connect(host='localhost',
                         user='root',
                         port=3307,
                         password='r6kEwqWU9!v3',
                         database='qmt_stocks_tech')
    try:
        with db.cursor() as cursor:
            # Let the driver quote the pattern instead of formatting it
            # into the statement by hand.
            cursor.execute('show tables like %s', (f'%{fre}%',))
            table_list = [row[0] for row in cursor.fetchall()]
    finally:
        db.close()
    print(f'计算个股数为:{len(table_list)}')

    with mp.Manager() as manager:
        # Shared across workers; every task appends one summary row.
        list_date = manager.list()
        with mp.Pool(processes=24) as pool:
            for num in range(80, 200, 20):
                for Volatility in range(3, 13, 1):
                    for rate in range(3, 13, 1):
                        print(f'{num}天波动率为{Volatility}%MA5斜率为{rate}')
                        # The four empty lists are per-task accumulators;
                        # they are filled inside the worker process.
                        pool.apply_async(func=backtrader,
                                         args=(list_date, table_list, [], [], [],
                                               num, Volatility, rate, [],),
                                         error_callback=err_call_back)
            # Wait for every task before the pool is torn down on exit.
            pool.close()
            pool.join()

    edtime = dt.now()
    print('总耗时:', edtime - starttime)
- # df.to_csv(r'C:\Users\Daniel\Documents\策略穷举2.csv', index=True)
|