230822_bt.py 14 KB


  1. # coding:utf-8
  2. import time
  3. from multiprocessing import freeze_support, Value, Lock
  4. import backtrader as bt
  5. from backtrader.feeds import PandasData
  6. import backtrader.indicators as btind
  7. from sqlalchemy import create_engine, text
  8. import pymysql
  9. from tqdm import tqdm
  10. import concurrent.futures
  11. import numpy as np
  12. import pandas as pd
  13. import platform
  14. import datetime
  15. from datetime import datetime as dt
  16. from itertools import product
  17. import psutil
  18. import logging
  19. import multiprocessing as mp
  20. from itertools import islice
  21. from func_timeout import func_set_timeout, FunctionTimedOut
  22. from functools import partial
  23. class MyPandasData(PandasData):
  24. lines = ('hl', 'dif', 'dea', 'macd', 'rsi_6', 'rsi_12', 'rsi_24',)
  25. params = (('hl', 7),
  26. ('dif', 8),
  27. ('dea', 9),
  28. ('macd', 10),
  29. ('rsi_6', 11),
  30. ('rsi_12', 12),
  31. ('rsi_24', 13),
  32. )
  33. class TestStrategy(bt.Strategy):
  34. params = (
  35. ("num", 3),
  36. ('Volatility', 0),
  37. ('rate', 3), # 注意要有逗号!!
  38. )
  39. def log(self, txt, dt=None):
  40. # 记录策略的执行日志
  41. dt = dt or self.datas[0].datetime.date(0)
  42. # print('%s, %s' % (dt.isoformat(), txt))
  43. def __init__(self):
  44. try:
  45. self.pos_price = 0
  46. self.dataclose = self.datas[0].close
  47. self.dataopen = self.datas[0].open
  48. self.high = self.datas[0].high
  49. self.low = self.datas[0].low
  50. self.volume = self.datas[0].volume
  51. self.hl = self.datas[0].hl
  52. self.dif = self.datas[0].dif
  53. self.dea = self.datas[0].dea
  54. self.macd = self.datas[0].macd
  55. self.rsi_6 = self.datas[0].rsi_6
  56. self.rsi_12 = self.datas[0].rsi_12
  57. self.rsi_24 = self.datas[0].rsi_24
  58. self.sma5 = btind.MovingAverageSimple(self.datas[0].close, period=5)
  59. self.sma10 = btind.MovingAverageSimple(self.datas[0].close, period=10)
  60. self.sma20 = btind.MovingAverageSimple(self.datas[0].close, period=20)
  61. self.sma60 = btind.MovingAverageSimple(self.datas[0].close, period=60)
  62. except BaseException as e:
  63. print(f'初始化错误{e}')
  64. def notify_order(self, order):
  65. """
  66. 订单状态处理
  67. Arguments:
  68. order {object} -- 订单状态
  69. """
  70. if order.status in [order.Submitted, order.Accepted]:
  71. # 如订单已被处理,则不用做任何事情
  72. return
  73. # 检查订单是否完成
  74. if order.status in [order.Completed]:
  75. if order.isbuy():
  76. self.buyprice = order.executed.price
  77. self.buycomm = order.executed.comm
  78. self.bar_executed = len(self)
  79. # 订单因为缺少资金之类的原因被拒绝执行
  80. elif order.status in [order.Canceled, order.Margin, order.Rejected]:
  81. pass
  82. # self.log('Order Canceled/Margin/Rejected')
  83. # 订单状态处理完成,设为空
  84. self.order = None
  85. def notify_trade(self, trade):
  86. """
  87. 交易成果
  88. Arguments:
  89. trade {object} -- 交易状态
  90. """
  91. if not trade.isclosed:
  92. return
  93. # 显示交易的毛利率和净利润
  94. # self.log('OPERATION PROFIT, GROSS %.2f, NET %.2f' % (trade.pnl, trade.pnlcomm))
  95. def next(self):
  96. if len(self) > self.params.num:
  97. vola = self.params.Volatility / 100
  98. rate = self.params.rate / 100
  99. lowest = np.min(self.low.get(size=self.params.num))
  100. highest = np.max(self.high.get(size=self.params.num))
  101. if self.hl[-1] == 2 or self.hl[-1] == 1:
  102. m = -2
  103. # self.order = self.buy()
  104. # self.pos_price = self.low[-1]
  105. while True:
  106. if (self.hl[m] == 2 or self.hl[m] == 1) and self.macd[m] < self.macd[-1] \
  107. and self.dataclose[0] > self.sma5[0] \
  108. and self.dataclose[-1] > self.dataopen[-1] \
  109. and (self.sma10[-2] - self.sma5[-2]) > (self.sma10[-1] - self.sma5[-1]) \
  110. and self.low[-2] < self.sma5[-2] * (1 - rate) \
  111. and self.sma5[-1] < self.sma10[-1] < self.sma20[-1] < self.sma20[-2] < self.sma20[-3] \
  112. and lowest * (1 - vola) < self.low[-1] < lowest * (1 + vola):
  113. self.order = self.buy()
  114. self.pos_price = self.low[-1]
  115. break
  116. m -= 1
  117. if m + len(self) == 2:
  118. break
  119. # elif (self.hl[0] == 5 or self.dataclose[0] < self.sma5[0]):
  120. elif self.dataclose[0] < self.sma5[0] or self.sma5[0] < self.sma5[-1] \
  121. or self.dataclose[0] < self.pos_price or self.high[0] > self.sma5[0] * (1 + vola):
  122. self.order = self.close()
  123. self.pos_price = 0
  124. def stop(self):
  125. # pass
  126. self.log(u'(MA趋势交易效果) Ending Value %.2f' % (self.broker.getvalue()))
  127. def to_df(df):
  128. print('开始存数据')
  129. df.sort_values(by=['周期', '波动率', 'MA5斜率'], ascending=True, inplace=True)
  130. df = df.reset_index(drop=True)
  131. if platform.node() == 'DanieldeMBP.lan':
  132. df.to_csv(f"/Users/daniel/Documents/策略/策略穷举-均线粘连后底分型{dt.now().strftime('%Y%m%d%H%m%S')}.csv",
  133. index=True,
  134. encoding='utf_8_sig', mode='w')
  135. else:
  136. df.to_csv(f"C:\策略结果\策略穷举底分型_均线缠绕_只买一次{dt.now().strftime('%Y%m%d%H%m%S')}.csv", index=True,
  137. encoding='utf_8_sig', mode='w')
  138. print(f'结果:, \n, {df}')
  139. def chunked_iterable(iterable, size):
  140. """将可迭代对象分割为指定大小的块"""
  141. it = iter(iterable)
  142. while True:
  143. chunk = tuple(islice(it, size))
  144. if not chunk:
  145. return
  146. yield chunk
  147. def query_database(table_name):
  148. engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8')
  149. df = pd.read_sql_table(table_name, engine)
  150. return df
  151. def get_stock_data():
  152. while True:
  153. try:
  154. db = pymysql.connect(host='localhost',
  155. user='root',
  156. port=3307,
  157. password='r6kEwqWU9!v3',
  158. database='qmt_stocks_tech')
  159. cursor = db.cursor()
  160. cursor.execute("show tables like '%%%s%%' " % '1d')
  161. table_list = [tuple[0] for tuple in cursor.fetchall()]
  162. # table_list = table_list[0: 10]
  163. cursor.close()
  164. db.close()
  165. print(f'开始数据库读取')
  166. with concurrent.futures.ProcessPoolExecutor(max_workers=16) as executor:
  167. # 使用executor.map方法实现多进程并行查询数据库,得到每个表的数据,并存储在一个字典中
  168. data_dict = {table_name: df for table_name, df in
  169. tqdm(zip(table_list, executor.map(query_database, table_list)))}
  170. print(f'数据库读取完成')
  171. break
  172. except BaseException as e:
  173. print(f'数据库读取错误{e}')
  174. continue
  175. return data_dict
  176. def backtrader_test(stock_data, stock_name, num, vot, rate):
  177. # print(f'开始回测{stock_name}')
  178. try:
  179. cerebro = bt.Cerebro()
  180. stock_data.time = pd.to_datetime(stock_data.time)
  181. stock_data['HL'] = stock_data['HL'].map({'L': 1,
  182. 'LL': 2,
  183. 'L*': 3,
  184. 'H': 4,
  185. 'HH': 5,
  186. 'H*': 6,
  187. '-': 7})
  188. cerebro.addstrategy(TestStrategy, num=num, Volatility=vot, rate=rate)
  189. data = MyPandasData(dataname=stock_data,
  190. fromdate=datetime.datetime(2017, 1, 1),
  191. todate=datetime.datetime(2022, 10, 30),
  192. datetime='time',
  193. open='open_back',
  194. close='close_back',
  195. high='high_back',
  196. low='low_back',
  197. volume='volume_back',
  198. hl='HL',
  199. dif='dif',
  200. dea='dea',
  201. macd='macd',
  202. rsi_6='rsi_6',
  203. rsi_12='rsi_12',
  204. rsi_24='rsi_24',
  205. )
  206. cerebro.adddata(data)
  207. cerebro.addstrategy(TestStrategy)
  208. cerebro.broker.setcash(100000.0)
  209. cerebro.addsizer(bt.sizers.FixedSize, stake=10000)
  210. cerebro.broker.setcommission(commission=0.001)
  211. cerebro.run()
  212. except BaseException as e:
  213. print(f'{stock_name}回测错误{e}')
  214. return np.nan
  215. # print(cerebro.broker.getvalue() - 100000.0)
  216. # print(stock_name)
  217. else:
  218. return cerebro.broker.getvalue() - 100000.0
  219. def bbt(stock_data_dict, num, Volatility, rate, err_list):
  220. async_results = []
  221. try:
  222. # 设置每一轮的任务数
  223. CHUNK_SIZE = 200 # 您可以根据需要进行调整
  224. timeout = 120
  225. max_retries = 3
  226. with concurrent.futures.ProcessPoolExecutor(max_workers=16, max_tasks_per_child=20000) as inner_executor:
  227. # 使用executor.map方法实现多进程并行计算不同参数组合的结果
  228. results = [result for result in tqdm(
  229. inner_executor.map(backtrader_test, stock_data_dict.values(), stock_data_dict.keys(), [num] * len(stock_data_dict),
  230. [Volatility] * len(stock_data_dict), [rate] * len(stock_data_dict)), desc='单轮计算进度')]
  231. except BaseException as e:
  232. print(f'计算错误{e}')
  233. # print(f'{num},{Volatility},{rate}计算完成,共计算{len(async_results)}个股票')
  234. # outputs = [result.get() for result in async_results]
  235. print(f'{num},{Volatility},{rate}计算完成,共计算{len(results)}个股票')
  236. print(f'计算结果{results}')
  237. return results
  238. def tdf(tt, num, Volatility, rate):
  239. num_nan = np.isnan(tt).sum() # Count NaN values
  240. print(f'num_nan={num_nan}')
  241. filtered_result = [r for r in tt if not np.isnan(r)] # Filter out NaN values
  242. print(f'filtered_result={filtered_result}')
  243. # Calculate statistics
  244. num_profits = len([r for r in tt if r > 0])
  245. num_losses = len([r for r in tt if r < 0])
  246. profit_ratio = num_profits / (len(filtered_result))
  247. total_profit = sum([r for r in tt if r > 0])
  248. avg_profit = total_profit / num_profits if num_profits else 0
  249. max_profit = max(tt)
  250. min_profit = min([r for r in tt if r > 0]) if num_profits else 0
  251. total_loss = sum([r for r in tt if r < 0])
  252. avg_loss = total_loss / num_losses if num_losses else 0
  253. max_loss = min(tt)
  254. min_loss = max([r for r in tt if r < 0]) if num_losses else 0
  255. # Append the results into the DataFrame
  256. result_dict = {'周期': num, '波动率': Volatility, 'MA5斜率': rate, '盈利个数': num_profits,
  257. '盈利比例': profit_ratio, '总盈利': total_profit, '平均盈利': avg_profit,
  258. '最大盈利': max_profit, '最小盈利': min_profit, '总亏损': total_loss,
  259. '平均亏损': avg_loss, '最大亏损': max_loss, '最小亏损': min_loss, '未计算个股数': num_nan}
  260. df_t = pd.Series(result_dict)
  261. return df_t
  262. if __name__ == '__main__':
  263. logger = mp.log_to_stderr()
  264. logger.setLevel(logging.DEBUG)
  265. # cpu_list = list(range(23))
  266. # pus = psutil.Process()
  267. # pus.cpu_affinity(cpu_list)
  268. start_time = dt.now()
  269. # 定义需要穷举的参数值
  270. nums = range(60, 120, 20)
  271. Volatilitys = range(5, 13, 1)
  272. rates = range(3, 8, 1)
  273. # 生成所有参数组合
  274. all_combinations = list(product(nums, Volatilitys, rates))
  275. print(f'共需计算{len(all_combinations)}次')
  276. # 获取数据
  277. stock_data_dict = get_stock_data()
  278. results = []
  279. df = pd.DataFrame(
  280. columns=['周期', '波动率', 'MA5斜率', '盈利个数', '盈利比例', '总盈利', '平均盈利', '最大盈利', '最小盈利',
  281. '总亏损',
  282. '平均亏损', '最大亏损', '最小亏损', '未计算个股数'])
  283. err_list = []
  284. # 设置每一轮的任务数
  285. CHUNK_SIZE = 200 # 您可以根据需要进行调整
  286. timeout = 120
  287. max_retries = 3
  288. with concurrent.futures.ProcessPoolExecutor(max_workers=24) as inner_executor:
  289. for num, Volatility, rate in tqdm(all_combinations, desc='计算进度'):
  290. while True:
  291. try:
  292. # 使用executor.map方法实现多进程并行计算不同参数组合的结果
  293. res = [result for result in tqdm(
  294. inner_executor.map(backtrader_test, stock_data_dict.values(), stock_data_dict.keys(),
  295. [num] * len(stock_data_dict),
  296. [Volatility] * len(stock_data_dict), [rate] * len(stock_data_dict)),
  297. desc='单轮计算进度')]
  298. except BaseException as e:
  299. print(f'计算错误{e}')
  300. inner_executor = concurrent.futures.ProcessPoolExecutor(max_workers=16)
  301. else:
  302. results.append(res)
  303. df_t = tdf(res, num, Volatility, rate)
  304. df = pd.concat([df, df_t.to_frame().T], ignore_index=True)
  305. break
  306. time.sleep(1)
  307. print(f'{num},{Volatility},{rate}计算完成,共计算{len(res)}个股票')
  308. print(df)
  309. print('循环结束')
  310. to_df(df)
  311. print(f'计算完成,共耗时{dt.now() - start_time}秒')