# 230723 _bt.py (17 KB)


import datetime
import itertools
import logging
import math
import multiprocessing as mp
import os
import platform
import traceback
from datetime import datetime as dt
from multiprocessing import Pool, Lock, Value

import backtrader as bt
import backtrader.indicators as btind
import numpy as np
import pandas as pd
import psutil
import pymysql
from backtrader.feeds import PandasData
from sqlalchemy import create_engine
# SQLAlchemy engine for the MySQL schema that holds one table per stock.
# max_overflow=-1 lifts the limit on connections opened beyond the pool size.
# NOTE(review): the database credentials are hard-coded in this URL; consider
# loading them from the environment or a config file instead.
engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8',
                       max_overflow=-1)
# Lock taken by the worker function while it updates `counter`.
lock = Lock()
# Integer ('i') progress counter, starting at 0.
# NOTE(review): these objects are created at import time; under the 'spawn'
# start method each worker re-imports this module and would get its own
# lock/counter rather than sharing the parent's — confirm the start method.
counter = Value('i', 0)
  22. class MyPandasData(PandasData):
  23. lines = ('hl', 'dif', 'dea', 'macd', 'rsi_6', 'rsi_12', 'rsi_24',)
  24. params = (('hl', 7),
  25. ('dif', 8),
  26. ('dea', 9),
  27. ('macd', 10),
  28. ('rsi_6', 11),
  29. ('rsi_12', 12),
  30. ('rsi_24', 13),
  31. )
  32. '''
  33. lines = ('change_pct', 'net_amount_main', 'net_pct_main', 'net_amount_xl', 'net_pct_xl', 'net_amount_l', 'net_pct_l'
  34. , 'net_amount_m', 'net_pct_m', 'net_amount_s', 'net_pct_s',)
  35. params = (('change_pct', 7),
  36. ('net_amount_main', 8),
  37. ('net_pct_main', 9),
  38. ('net_amount_xl', 10),
  39. ('net_pct_xl', 11),
  40. ('net_amount_l', 12),
  41. ('net_pct_l', 13),
  42. ('net_amount_m', 14),
  43. ('net_pct_m', 15),
  44. ('net_amount_s', 16),
  45. ('net_pct_s', 17),
  46. )
  47. '''
  48. class TestStrategy(bt.Strategy):
  49. params = (
  50. ("num", 3),
  51. ('Volatility', 0),
  52. ('rate', 3), # 注意要有逗号!!
  53. )
  54. def log(self, txt, dt=None):
  55. ''' Logging function for this strategy'''
  56. dt = dt or self.datas[0].datetime.date(0)
  57. # print('%s, %s' % (dt.isoformat(), txt))
  58. def __init__(self):
  59. # self.num = num
  60. # self.Volatility = Volatility/100
  61. # Keep a reference to the "close" line in the data[0] dataseries
  62. self.pos_price = 0
  63. self.dataclose = self.datas[0].close
  64. self.dataopen = self.datas[0].open
  65. self.high = self.datas[0].high
  66. self.low = self.datas[0].low
  67. self.volume = self.datas[0].volume
  68. self.hl = self.datas[0].hl
  69. self.dif = self.datas[0].dif
  70. self.dea = self.datas[0].dea
  71. self.macd = self.datas[0].macd
  72. self.rsi_6 = self.datas[0].rsi_6
  73. self.rsi_12 = self.datas[0].rsi_12
  74. self.rsi_24 = self.datas[0].rsi_24
  75. # self.change_pct = self.datas[0].change_pct
  76. # self.net_amount_main = self.datas[0].net_amount_main
  77. # self.net_pct_main = self.datas[0].net_pct_main
  78. # self.net_amount_xl = self.datas[0].net_amount_xl
  79. # self.net_pct_xl = self.datas[0].net_pct_xl
  80. # self.net_amount_l = self.datas[0].net_amount_l
  81. # self.net_pct_l = self.datas[0].net_pct_l
  82. self.sma5 = btind.MovingAverageSimple(self.datas[0].close, period=5)
  83. self.sma10 = btind.MovingAverageSimple(self.datas[0].close, period=10)
  84. self.sma20 = btind.MovingAverageSimple(self.datas[0].close, period=20)
  85. self.sma60 = btind.MovingAverageSimple(self.datas[0].close, period=60)
  86. def notify_order(self, order):
  87. """
  88. 订单状态处理
  89. Arguments:
  90. order {object} -- 订单状态
  91. """
  92. if order.status in [order.Submitted, order.Accepted]:
  93. # 如订单已被处理,则不用做任何事情
  94. return
  95. # 检查订单是否完成
  96. if order.status in [order.Completed]:
  97. if order.isbuy():
  98. self.buyprice = order.executed.price
  99. self.buycomm = order.executed.comm
  100. self.bar_executed = len(self)
  101. # 订单因为缺少资金之类的原因被拒绝执行
  102. elif order.status in [order.Canceled, order.Margin, order.Rejected]:
  103. pass
  104. # self.log('Order Canceled/Margin/Rejected')
  105. # 订单状态处理完成,设为空
  106. self.order = None
  107. def notify_trade(self, trade):
  108. """
  109. 交易成果
  110. Arguments:
  111. trade {object} -- 交易状态
  112. """
  113. if not trade.isclosed:
  114. return
  115. # 显示交易的毛利率和净利润
  116. # self.log('OPERATION PROFIT, GROSS %.2f, NET %.2f' % (trade.pnl, trade.pnlcomm))
  117. def next(self):
  118. # print(self.num,self.Volatility)
  119. # Simply log the closing price of the series from the reference
  120. # self.sma20[-2] < self.sma20[-1] < self.sma20[0] and self.sma10[-2] < self.sma10[-1] < self.sma10[0]
  121. # and (self.sma5[-1] < self.sma10[-1])
  122. # and (self.net_pct_l[0] > 10) and (self.net_pct_xl[0] > 3) \
  123. # and (self.net_amount_main[-1] > 0) and (self.net_amount_main[0] > 0)
  124. if len(self) > self.params.num:
  125. vola = self.params.Volatility / 100
  126. rate = self.params.rate / 100
  127. lowest = np.min(self.low.get(size=self.params.num))
  128. highest = np.max(self.high.get(size=self.params.num))
  129. # > self.sma5[-1]
  130. # and (((lowest * (1 - vola)) < self.low[-2] < (lowest * (1 + vola))) or (
  131. # (lowest * (1 - vola)) < self.low[-1] < (lowest * (1 + vola)))) \
  132. if self.hl[-1] == 2 or self.hl[-1] == 1:
  133. m = -2
  134. self.order = self.buy()
  135. self.pos_price = self.low[-1]
  136. while True:
  137. if (self.hl[m] == 2 or self.hl[m] == 1) and self.macd[m] > self.macd[-1] \
  138. and self.dataclose[0] > self.sma5[0] \
  139. and self.dataclose[-1] > self.dataopen[-1] \
  140. and (self.sma10[-2] - self.sma5[-2]) < (self.sma10[-1] - self.sma5[-1]) \
  141. and self.low[-2] < self.sma5[-2] * (1 - rate) \
  142. and self.sma5[-1] < self.sma10[-1] < self.sma20[-1] < self.sma20[-2] < self.sma20[-3] \
  143. and lowest * (1 - vola) < self.low[-1] < lowest * (1 + vola):
  144. self.order = self.buy()
  145. self.pos_price = self.low[-1]
  146. break
  147. m -= 1
  148. if m + len(self) == 2:
  149. break
  150. # elif (self.hl[0] == 5 or self.dataclose[0] < self.sma5[0]):
  151. elif self.dataclose[0] < self.sma5[0] or self.sma5[0] < self.sma5[-1] \
  152. or self.dataclose[0] < self.pos_price or self.high[0] > self.sma5[0] * (1 + vola):
  153. self.order = self.close()
  154. self.pos_price = 0
  155. def stop(self):
  156. # pass
  157. self.log(u'(MA趋势交易效果) Ending Value %.2f' % (self.broker.getvalue()))
  158. def err_call_back(err):
  159. print(f'出错啦~ error:{str(err)}')
  160. traceback.format_exc(err)
  161. def to_df(lt):
  162. print('开始存数据')
  163. df = pd.DataFrame(list(lt),
  164. columns=['周期', '波动率', 'MA5斜率', '盈利个数', '盈利比例', '总盈利', '平均盈利', '最大盈利',
  165. '最小盈利', '总亏损', '平均亏损', '最大亏损', '最小亏损', '盈亏对比'])
  166. df.sort_values(by=['周期', '波动率', 'MA5斜率'], ascending=True, inplace=True)
  167. df = df.reset_index(drop=True)
  168. if platform.node() == 'DanieldeMBP.lan':
  169. df.to_csv(f"/Users/daniel/Documents/策略/策略穷举-均线粘连后底分型{dt.now().strftime('%Y%m%d%H%m%S')}.csv",
  170. index=True,
  171. encoding='utf_8_sig', mode='w')
  172. else:
  173. df.to_csv(f"C:\Daniel\策略\策略穷举底分型_均线缠绕_只买一次{dt.now().strftime('%Y%m%d%H%m%S')}.csv", index=True,
  174. encoding='utf_8_sig', mode='w')
  175. print(f'结果:, \n, {df}')
  176. def backtrader(table_list, stock, result, result_change, result_change_fall, num, Volatility, rate, err_list):
  177. global engine, counter, lock
  178. conn = engine.connect()
  179. stk_df = pd.read_sql_table(stock, conn)
  180. stk_df.time = pd.to_datetime(stk_df.time)
  181. # stk_df = stk_df[stk_df['HL'] != '-']
  182. try:
  183. stk_df['HL'] = stk_df['HL'].map({'L': 1,
  184. 'LL': 2,
  185. 'L*': 3,
  186. 'H': 4,
  187. 'HH': 5,
  188. 'H*': 6,
  189. '-': 7})
  190. except BaseException:
  191. print(f'{stock}数据不全,不做测试')
  192. else:
  193. conn.close()
  194. if len(stk_df) > 60:
  195. cerebro = bt.Cerebro()
  196. cerebro.addstrategy(TestStrategy, num=num, Volatility=Volatility, rate=rate)
  197. cerebro.addsizer(bt.sizers.FixedSize, stake=10000)
  198. data = MyPandasData(dataname=stk_df,
  199. fromdate=datetime.datetime(2017, 1, 1),
  200. todate=datetime.datetime(2022, 10, 30),
  201. datetime='time',
  202. open='open_back',
  203. close='close_back',
  204. high='high_back',
  205. low='low_back',
  206. volume='volume_back',
  207. hl='HL',
  208. dif='dif',
  209. dea='dea',
  210. macd='macd',
  211. rsi_6='rsi_6',
  212. rsi_12='rsi_12',
  213. rsi_24='rsi_24',
  214. )
  215. # print('取值完成')
  216. cerebro.adddata(data, name=stock)
  217. cerebro.broker.setcash(100000.0)
  218. cerebro.broker.setcommission(0.005)
  219. cerebro.addanalyzer(bt.analyzers.PyFolio)
  220. # 策略执行前的资金
  221. # print('启动资金: %.2f' % cerebro.broker.getvalue())
  222. try:
  223. # 策略执行
  224. cerebro.run()
  225. except IndexError as e:
  226. err_list.append(stock)
  227. # print(f'{num}天波动率为{Volatility}%MA5斜率为{rate}的{stock}错误')
  228. # print(e)
  229. else:
  230. if cerebro.broker.getvalue() > 100000.0:
  231. result_change.append(cerebro.broker.getvalue() - 100000)
  232. result.append(stock)
  233. # print('recode!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!')
  234. # print(result)
  235. elif cerebro.broker.getvalue() <= 100000.0:
  236. result_change_fall.append(cerebro.broker.getvalue() - 100000)
  237. # print('aaaaaaaaaaa')
  238. # print(result_change_fall)
  239. # print('最终资金: %.2f' % cerebro.broker.getvalue())
  240. finally:
  241. with lock:
  242. counter.value += 1
  243. logging.info('执行完成:(%d / %d) 进程号: %d --------------- %s', counter.value, len(table_list), os.getpid(), stock)
  244. # print(f'已计算{counter.value}/{len(table_list)}只股票')
  245. # print(f'已计算{(len(result) + len(result_change_fall)+len(err_list))}/{len(table_list)}只股票')
  246. '''
  247. if len(result) * len(result_change) * len(result_change_fall) != 0:
  248. print(f'以{num}内最低值波动{Volatility}为支撑、MA5斜率为{rate}%,结果状态为:')
  249. print('正盈利的个股为:', len(result), '成功率为:', len(result) / len(table_list))
  250. print(
  251. f'总盈利:{np.sum(result_change)} 平均盈利:{np.mean(result_change) / len(result)},最大盈利:{np.max(result_change)}, 最小盈利:{np.min(result_change)}')
  252. print(
  253. f'总亏损:{np.sum(result_change_fall)},平均亏损:{np.mean(result_change_fall) / len(result_change_fall)},最大亏损:{np.min(result_change_fall)} 最小亏损:{np.max(result_change_fall)}')
  254. # '周期', '波动率', 'MA5斜率', '盈利个数', '盈利比例', '总盈利', '平均盈利', '最大盈利', '最小盈利', '总亏损', '平均亏损', '最大亏损', '最小亏损', '盈亏对比']
  255. list_date.append([num, Volatility, rate, len(result), len(result) / len(table_list), np.nansum(result_change),
  256. np.nanmean(result_change), np.nanmax(result_change), np.min(result_change),
  257. np.nansum(result_change_fall), np.nanmean(result_change_fall),
  258. np.nanmin(result_change_fall), np.nanmax(result_change_fall),
  259. len(result_change) / len(result_change_fall)])
  260. # to_df(list_date)
  261. endtime = dt.now()
  262. print(f'{num}天波动率为{Volatility}%MA5斜率为{rate},myPID is {os.getpid()}.本轮耗时为{endtime - sttime}')
  263. else:
  264. print('阿欧', len(result), len(result_change), len(result_change_fall), num, Volatility, rate, err_list)
  265. list_date.append([num, Volatility, rate, 0, len(result) / len(table_list), len(result),
  266. len(result), len(result), len(result), len(result), len(result), len(result), 0])
  267. '''
  268. # list_date.append([num, Volatility, rate, len(result), len(result) / len(table_list), np.nansum(result_change),
  269. # np.nanmean(result_change), np.nanmax(result_change), np.min(result_change),
  270. # np.nansum(result_change_fall), np.nanmean(result_change_fall),
  271. # np.nanmin(result_change_fall), np.nanmax(result_change_fall),
  272. # len(result_change) / len(result_change_fall)])
  273. # cerebro.plot()
  274. # df = pd.DataFrame(
  275. # columns=['周期', '波动率', 'MA5斜率', '盈利个数', '盈利比例', '总盈利', '平均盈利', '最大盈利', '最小盈利', '总亏损',
  276. # '平均亏损', '最大亏损', '最小亏损'])
  277. if __name__ == '__main__':
  278. logger = mp.log_to_stderr()
  279. logger.setLevel(logging.INFO)
  280. starttime = dt.now()
  281. print(starttime)
  282. pus = psutil.Process()
  283. fre = '1d'
  284. db = pymysql.connect(host='localhost',
  285. user='root',
  286. port=3307,
  287. password='r6kEwqWU9!v3',
  288. database='qmt_stocks_tech')
  289. cursor = db.cursor()
  290. cursor.execute("show tables like '%%%s%%' " % fre)
  291. table_list = [tuple[0] for tuple in cursor.fetchall()]
  292. # print(table_list)
  293. # table_list = table_list[0:500]
  294. print(f'计算个股数为:{len(table_list)}')
  295. list_date = []
  296. thread_list = []
  297. pool = mp.Pool(processes=mp.cpu_count())
  298. # pool = mp.Pool(processes=8)
  299. for num in range(60, 80, 20):
  300. for Volatility in range(7, 8, 1):
  301. for rate in range(3, 4, 1):
  302. # step = math.ceil(len(table_list) / mp.cpu_count())
  303. result = mp.Manager().list()
  304. result_change = mp.Manager().list()
  305. result_change_fall = mp.Manager().list()
  306. err_list = mp.Manager().list()
  307. print(f'{num}天波动率为{Volatility}%MA5斜率为{rate}')
  308. # for i in range(0, len(table_list), step):
  309. stattime = dt.now()
  310. # 保存AsyncResult对象的列表
  311. async_results = []
  312. # thd = threading.local()
  313. # print(i)
  314. # p = mp.Process(target=backtrader, args=(df, table_list, result, result_change, result_change_fall,
  315. # num, Volatility, rate, err_list))
  316. # thread_list.append(p)
  317. for stock in table_list:
  318. async_result = pool.apply_async(func=backtrader,
  319. args=(table_list, stock, result, result_change, result_change_fall,
  320. num, Volatility, rate, err_list,),
  321. error_callback=err_call_back)
  322. async_results.append(async_result)
  323. # p.start()
  324. pool.close()
  325. pool.join()
  326. # 统计返回为 None 的结果数量
  327. none_count = 0
  328. for i, result_async in enumerate(async_results):
  329. _ = result_async.get() # 获取任务的结果
  330. if _ is None:
  331. none_count += 1
  332. print(f'计算总数={len(result) + len(result_change_fall)}\n计数为:{none_count}')
  333. list_date.append(
  334. [num, Volatility, rate, len(result), len(result) / len(table_list), np.nansum(result_change),
  335. np.nanmean(result_change), np.nanmax(result_change), np.min(result_change),
  336. np.nansum(result_change_fall), np.nanmean(result_change_fall),
  337. np.nanmin(result_change_fall), np.nanmax(result_change_fall),
  338. len(result_change) / len(result_change_fall)])
  339. print(list_date)
  340. # to_df(list_date)
  341. edtime = dt.now()
  342. print('总耗时:', edtime - starttime)
  343. # df.to_csv(r'C:\Users\Daniel\Documents\策略穷举2.csv', index=True)