123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306 |
- import os
- import numpy as np
- from sqlalchemy import create_engine
- import pandas as pd
- import pymysql
- import backtrader as bt
- import backtrader.indicators as btind
- import datetime
- import math
- from datetime import datetime as dt
- import multiprocessing as mp
- from backtrader.feeds import PandasData
- from numba import jit, cuda, njit
class MyPandasData(PandasData):
    """Pandas-backed data feed used by the backtests in this file.

    No extra lines or parameters are declared at the moment, so the class
    exposes exactly what the base ``PandasData`` feed exposes.
    """

    # Additional line names (beyond the base feed's) -- none for now.
    lines = ()
    # Column mappings for the additional lines -- none for now.
    params = ()

    # Disabled extension, kept for reference (money-flow columns and the
    # DataFrame column index each one would be read from):
    #
    # lines = ('change_pct', 'net_amount_main', 'net_pct_main', 'net_amount_xl', 'net_pct_xl', 'net_amount_l', 'net_pct_l'
    #          , 'net_amount_m', 'net_pct_m', 'net_amount_s', 'net_pct_s',)
    # params = (('change_pct', 7),
    #           ('net_amount_main', 8),
    #           ('net_pct_main', 9),
    #           ('net_amount_xl', 10),
    #           ('net_pct_xl', 11),
    #           ('net_amount_l', 12),
    #           ('net_pct_l', 13),
    #           ('net_amount_m', 14),
    #           ('net_pct_m', 15),
    #           ('net_amount_s', 16),
    #           ('net_pct_s', 17),
    #           )
class TestStrategy(bt.Strategy):
    """Moving-average trend strategy with a rolling low/high proximity filter.

    Parameters (``params``):
        num: look-back length, in bars, of the rolling lowest-low /
            highest-high window.
        Volatility: width, in percent, of the band around the rolling
            extreme inside which a recent low/high counts as "near" it.
        rate: percentage distance of the close above the 5-bar SMA at which
            an open position is closed.

    Entry: bullish bar (close > open), one of the two previous lows near the
    rolling lowest low, close above a rising SMA5, SMA5 above SMA10, and no
    open position.
    Exit: close below SMA5, SMA5 below SMA10, close stretched more than
    ``rate`` percent above SMA5, or one of the two previous highs near the
    rolling highest high.
    """

    params = (
        ("num", 3),
        ('Volatility', 0),
        ('rate', 5),
    )

    def log(self, txt, dt=None):
        """Logging hook for this strategy.

        Only the bar date is resolved; nothing is emitted, so calls such as
        the one in ``stop`` are silent.
        """
        dt = dt or self.datas[0].datetime.date(0)

    def __init__(self):
        print('__init__', dt.now())
        print(f'{self.params.num}天波动率为{self.params.Volatility}%乖离率为{self.params.rate}', 'myPID is ', os.getpid())

        # Shortcuts to the lines of the first data feed.
        self.dataclose = self.datas[0].close
        self.dataopen = self.datas[0].open
        self.high = self.datas[0].high
        self.low = self.datas[0].low
        self.volume = self.datas[0].volume

        # Order bookkeeping; filled in by next() / notify_order().
        self.order = None
        self.buyprice = None
        self.buycomm = None

        self.sma5 = btind.MovingAverageSimple(self.datas[0].close, period=5)
        self.sma10 = btind.MovingAverageSimple(self.datas[0].close, period=10)
        self.sma20 = btind.MovingAverageSimple(self.datas[0].close, period=20)

        # Build a lazily evaluated line (no [0] here): indexing inside
        # __init__ would freeze a single scalar instead of tracking each bar.
        self.yx = self.dataclose > self.dataopen

        # The window length must be passed as ``period``; a bare positional
        # number would be taken as the indicator's input data.
        self.lowest = btind.Lowest(self.low, period=self.params.num)
        self.highest = btind.Highest(self.high, period=self.params.num)

        # Percent parameters converted to fractions once.
        self.vola = self.params.Volatility / 100
        self.rate = self.params.rate / 100

    def notify_order(self, order):
        """Handle an order status change.

        Arguments:
            order {object} -- the order whose status changed
        """
        if order.status in [order.Submitted, order.Accepted]:
            # Still in flight: nothing to record yet.
            return

        if order.status in [order.Completed]:
            if order.isbuy():
                self.buyprice = order.executed.price
                self.buycomm = order.executed.comm
            self.bar_executed = len(self)
        elif order.status in [order.Canceled, order.Margin, order.Rejected]:
            pass

        # Whatever the outcome, there is no pending order any more.
        self.order = None

    def notify_trade(self, trade):
        """Handle a trade update.

        Arguments:
            trade {object} -- the trade being reported
        """
        if not trade.isclosed:
            return

    def _within_band(self, level, value):
        """Return True when ``value`` lies strictly within +/-vola of ``level``."""
        return level * (1 - self.vola) < value < level * (1 + self.vola)

    # NOTE: next() must stay a plain Python method. It works on backtrader
    # objects, which numba's nopython mode cannot type, so decorating it with
    # @njit fails as soon as the method is first called.
    def next(self):
        """Evaluate the entry/exit rules for the current bar."""
        if len(self) <= self.params.num:
            return

        close = self.dataclose[0]
        sma5 = self.sma5[0]
        sma10 = self.sma10[0]
        floor = self.lowest[0]
        ceiling = self.highest[0]

        near_low = (self._within_band(floor, self.low[-2])
                    or self._within_band(floor, self.low[-1]))
        near_high = (self._within_band(ceiling, self.high[-2])
                     or self._within_band(ceiling, self.high[-1]))

        wants_entry = (self.yx[0]
                       and near_low
                       and close > sma5
                       and sma5 > self.sma5[-1]
                       and not self.position
                       and sma5 > sma10)
        wants_exit = (close < sma5
                      or sma5 < sma10
                      or close > sma5 * (1 + self.rate)
                      or near_high)

        if wants_entry:
            self.order = self.buy()
        elif wants_exit:
            self.order = self.close()

    def stop(self):
        """Report the final portfolio value through ``log`` when the run ends."""
        self.log(u'(MA趋势交易效果) Ending Value %.2f' % (self.broker.getvalue()))
def err_call_back(err):
    """Print an error passed to this callback to standard output.

    Args:
        err: the error object; it is rendered with ``str()``.
    """
    message = f'出错啦~ error:{err!s}'
    print(message)
def to_df(lt, csv_path=r'D:\Daniel\策略\策略穷举.csv'):
    """Tabulate parameter-sweep results, save them as CSV and print them.

    Args:
        lt: iterable of 13-field rows, in the column order listed below.
        csv_path: destination of the CSV file. Defaults to the location this
            script has always written to, so existing callers are unaffected.

    The rows are sorted ascending by period, volatility and deviation rate,
    re-indexed from 0, written (index included, UTF-8, overwriting any
    existing file) and finally printed.
    """
    columns = ['周期', '波动率', '乖离率', '盈利个数', '盈利比例', '总盈利', '平均盈利', '最大盈利',
               '最小盈利', '总亏损', '平均亏损', '最大亏损', '最小亏损']
    sort_keys = ['周期', '波动率', '乖离率']

    df = pd.DataFrame(list(lt), columns=columns)
    df = df.sort_values(by=sort_keys, ascending=True).reset_index(drop=True)
    df.to_csv(csv_path, index=True, encoding='utf-8', mode='w')
    print(df)
def backtrader(list_date, table_list, result, result_change, result_change_fall, err_list):
    """Load every table as a data feed and run the strategy parameter sweep.

    Args:
        list_date: not used by this function; kept for interface
            compatibility.
        table_list: names of the MySQL tables to load, one data feed each.
        result: list that receives a table name when the final broker value
            exceeds the starting cash.
        result_change: list that receives the gain ratio in that case.
        result_change_fall: list that receives the loss ratio otherwise.
        err_list: list that receives a table name when the run raises
            ``IndexError``.

    NOTE(review): the indentation of the original was lost; the loading loop
    is assumed to end after ``adddata`` so the sweep runs once over all
    feeds -- confirm.
    """
    initial_cash = 100000.0

    # NOTE(review): credentials are hard-coded in the URL; consider moving
    # them to configuration or the environment.
    engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks?charset=utf8')
    cerebro = bt.Cerebro()

    cerebro.addsizer(bt.sizers.FixedSize, stake=10000)
    cerebro.broker.setcash(initial_cash)
    cerebro.broker.setcommission(0.005)

    try:
        for stock in table_list:
            stk_df = pd.read_sql_table(stock, engine)
            stk_df.time = pd.to_datetime(stk_df.time)
            data = MyPandasData(dataname=stk_df,
                                fromdate=datetime.datetime(2010, 1, 1),
                                todate=datetime.datetime(2022, 12, 31),
                                datetime='time',
                                open='open',
                                close='close',
                                high='high',
                                low='low',
                                volume='volume',
                                )
            cerebro.adddata(data, name=stock)
    finally:
        # All frames are in memory now; release the pooled DB connections.
        engine.dispose()
    print('取值完成', dt.now())

    cerebro.optstrategy(TestStrategy, num=range(60, 80, 20), Volatility=range(3, 7), rate=range(5, 12))
    print('最优参定义', dt.now())
    cerebro.addanalyzer(bt.analyzers.PyFolio)

    try:
        print('开始执行', dt.now())
        cerebro.run(maxcpus=None)
    except IndexError:
        # ``stock`` is the last table loaded by the loop above.
        err_list.append(stock)
    else:
        # NOTE(review): confirm that the broker value read here reflects the
        # optimisation runs started by optstrategy().
        final_value = cerebro.broker.getvalue()
        if final_value > initial_cash:
            # Ratios are relative to the starting cash (previously divided
            # by 10000, which did not match the 100000 that was set).
            result_change.append(final_value / initial_cash - 1)
            result.append(stock)
        else:
            result_change_fall.append(1 - final_value / initial_cash)
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
if __name__ == '__main__':
    # Script entry point: list the daily-bar tables, run the sweep over the
    # first 100 of them and report the elapsed time.
    starttime = dt.now()
    print(starttime)

    fre = '1d'
    # NOTE(review): credentials are hard-coded; consider moving them to
    # configuration or the environment.
    db = pymysql.connect(host='localhost',
                         user='root',
                         port=3307,
                         password='r6kEwqWU9!v3',
                         database='qmt_stocks')
    try:
        # Cursor and connection are closed even if the query fails.
        with db.cursor() as cursor:
            cursor.execute("show tables like '%%%s%%' " % fre)
            table_list = [row[0] for row in cursor.fetchall()]
    finally:
        db.close()

    table_list = table_list[0:100]

    # backtrader() ignores list_date, and parallelism is handled inside
    # cerebro.run(), so no Manager list or extra process pool is created.
    list_date = []
    result = []
    result_change = []
    result_change_fall = []
    err_list = []

    backtrader(list_date, table_list, result, result_change, result_change_fall,
               err_list)

    stattime = dt.now()
    print(stattime)

    edtime = dt.now()
    print('总耗时:', edtime - starttime)
-
|