Daniel 2 роки тому
батько
коміт
786e005278
1 змінених файлів з 92 додано та 66 видалено
  1. 92 66
      futures_backtrader.py

+ 92 - 66
futures_backtrader.py

@@ -1,3 +1,4 @@
+import os
 import numpy as np
 from sqlalchemy import create_engine
 import pandas as pd
@@ -10,6 +11,7 @@ from datetime import datetime as dt
 import multiprocessing as mp
 from backtrader.feeds import PandasData
 
+
 # import multiprocessing
 # import matplotlib
 
@@ -33,12 +35,14 @@ class MyPandasData(PandasData):
               )
     '''
 
+
 class TestStrategy(bt.Strategy):
     params = (
         ("num", 3),
         ('Volatility', 0),
-        ('rate', 5),# 注意要有逗号!!
+        ('rate', 5),  # 注意要有逗号!!
     )
+
     def log(self, txt, dt=None):
         ''' Logging function for this strategy'''
         dt = dt or self.datas[0].datetime.date(0)
@@ -112,19 +116,21 @@ class TestStrategy(bt.Strategy):
         # and (self.net_amount_main[-1] > 0) and (self.net_amount_main[0] > 0)
         if len(self) > self.params.num:
             lowest = np.min(self.low.get(size=self.params.num))
-            highest = np.max(self.high.get(size = self.params.num))
-            vola = self.params.Volatility/100
-            rate = self.params.rate/100
+            highest = np.max(self.high.get(size=self.params.num))
+            vola = self.params.Volatility / 100
+            rate = self.params.rate / 100
             # print(f'{self.params.num}日天最低值:{lowest},波动率为{self.params.Volatility/100}')
             if (self.dataclose[0] > self.dataopen[0]) \
-                    and (((lowest*(1-vola)) < self.low[-2] < (lowest*(1+vola))) or ((lowest*(1-vola)) < self.low[-1] < (lowest*(1+vola))))\
+                    and (((lowest * (1 - vola)) < self.low[-2] < (lowest * (1 + vola))) or (
+                    (lowest * (1 - vola)) < self.low[-1] < (lowest * (1 + vola)))) \
                     and (self.dataclose[0] > self.sma5[0]) and self.sma5[0] > self.sma5[-1] \
                     and (not self.position) and (self.sma5[0] > self.sma10[0]):
                 # self.log('BUY CREATE, %.2f' % self.dataclose[0])
                 self.order = self.buy()
             elif self.dataclose < self.sma5[0] or self.sma5[0] < self.sma10[0] \
-                    or (self.dataclose[0] > (self.sma5[0] * (1+rate))) or \
-                (((highest*(1-vola)) < self.high[-2] < (highest*(1+vola))) or ((highest*(1-vola)) < self.high[-1] < (highest*(1+vola)))):
+                    or (self.dataclose[0] > (self.sma5[0] * (1 + rate))) or \
+                    (((highest * (1 - vola)) < self.high[-2] < (highest * (1 + vola))) or (
+                            (highest * (1 - vola)) < self.high[-1] < (highest * (1 + vola)))):
                 self.order = self.close()
                 # self.log('Close, %.2f' % self.dataclose[0])
 
@@ -133,8 +139,22 @@ class TestStrategy(bt.Strategy):
         self.log(u'(MA趋势交易效果) Ending Value %.2f' % (self.broker.getvalue()))
 
 
+def err_call_back(err):
+    print(f'出错啦~ error:{str(err)}')
+
+
+def to_df(lt):
+    df = pd.DataFrame(list(lt), columns=['周期', '波动率', '乖离率', '盈利个数', '盈利比例', '总盈利', '平均盈利', '最大盈利',
+                               '最小盈利', '总亏损', '平均亏损', '最大亏损', '最小亏损'])
+    df.sort_values(by=['周期', '波动率', '乖离率'], ascending=True, inplace=True)
+    df = df.reset_index(drop=True)
+    df.to_csv(r'D:\Daniel\策略\策略穷举.csv', index=True, encoding='utf-8', mode='w')
+    print(df)
+
 
-def backtrader(table_list, result, result_change,result_change_fall, num, Volatility, rate,err_list):
+def backtrader(list_date, table_list, result, result_change, result_change_fall, num, Volatility, rate, err_list):
+    print(f'{num}天波动率为{Volatility}%乖离率为{rate}', 'myPID is ', os.getpid())
+    sttime = dt.now()
     engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks?charset=utf8')
     for stock in table_list:
         # print(stock)
@@ -142,12 +162,10 @@ def backtrader(table_list, result, result_change,result_change_fall, num, Volati
         stk_df.time = pd.to_datetime(stk_df.time)
         if len(stk_df) > 60:
             cerebro = bt.Cerebro()
-
-            cerebro.addstrategy(TestStrategy, num=num, Volatility=Volatility,rate=rate)
-
+            cerebro.addstrategy(TestStrategy, num=num, Volatility=Volatility, rate=rate)
             cerebro.addsizer(bt.sizers.FixedSize, stake=10000)
             data = MyPandasData(dataname=stk_df,
-                                fromdate=datetime.datetime(2010,1,1),
+                                fromdate=datetime.datetime(2010, 1, 1),
                                 todate=datetime.datetime(2022, 10, 30),
                                 datetime='time',
                                 open='open',
@@ -168,32 +186,54 @@ def backtrader(table_list, result, result_change,result_change_fall, num, Volati
                                 # net_pct_s='net_pct_s',
                                 )
             # print('取值完成')
-
             cerebro.adddata(data, name=stock)
-
             cerebro.broker.setcash(100000.0)
             cerebro.broker.setcommission(0.005)
             cerebro.addanalyzer(bt.analyzers.PyFolio)
             # 策略执行前的资金
             # print('启动资金: %.2f' % cerebro.broker.getvalue())
             try:
-            # 策略执行
+                # 策略执行
                 cerebro.run()
             except IndexError:
                 err_list.append(stock)
             else:
                 if cerebro.broker.getvalue() > 100000.0:
-                    # print('recode!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!')
-                    result_change.append((cerebro.broker.getvalue()/10000-1))
+                    result_change.append((cerebro.broker.getvalue() / 10000 - 1))
                     result.append(stock)
+                    # print('recode!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!')
+                    # print(result)
                 else:
-                    result_change_fall.append((1-cerebro.broker.getvalue() / 10000))
+                    result_change_fall.append((1 - cerebro.broker.getvalue() / 10000))
                     # print('aaaaaaaaaaa')
-
-
-            # cerebro.plot()
-
+                    # print(result_change_fall)
+
+    if len(result) * len(result_change) * len(result_change_fall) != 0:
+        print(f'以{num}内最低值波动{Volatility}为支撑、乖离率为{rate}%,结果状态为:')
+        print('正盈利的个股为:', len(result_change), '成功率为:', len(result) / len(table_list))
+        print(
+            f'总盈利:{np.sum(result_change)} 平均盈利:{np.mean(result_change)},最大盈利:{np.max(result_change)}, 最小盈利:{np.min(result_change)}')
+        print(
+            f'总亏损:{np.sum(result_change_fall)},平均亏损:{np.mean(result_change_fall)},最大亏损:{np.min(result_change_fall)} 最小亏损:{np.max(result_change_fall)}')
+
+        list_date.append([num, Volatility, rate, len(result), len(result) / len(table_list), np.nansum(result_change),
+                          np.nanmean(result_change), np.nanmax(result_change), np.min(result_change),
+                          np.nansum(result_change_fall), np.nanmean(result_change_fall),
+                          np.nanmin(result_change_fall), np.nanmax(result_change_fall)])
+        to_df(list_date)
+        endtime = dt.now()
+        print(f'{num}天波动率为{Volatility}%乖离率为{rate},myPID is {os.getpid()}.本轮耗时为{endtime - sttime}')
+    else:
+        print(result, result_change, result_change_fall, num, Volatility, rate, err_list)
+    # cerebro.plot()
+
+
+# df = pd.DataFrame(
+#     columns=['周期', '波动率', '盈利个数', '盈利比例', '总盈利', '平均盈利', '最大盈利', '最小盈利', '总亏损',
+#              '平均亏损', '最大亏损', '最小亏损'])
 if __name__ == '__main__':
+    starttime = dt.now()
+    print(starttime)
     # engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx?charset=utf8', poolclass=NullPool)
 
     # stocks = pd.read_sql_query(
@@ -209,54 +249,40 @@ if __name__ == '__main__':
     cursor.execute("show tables like '%%%s%%' " % fre)
     table_list = [tuple[0] for tuple in cursor.fetchall()]
     # print(table_list)
-    # table_list = table_list[0:500]
-
-    df = pd.DataFrame(columns=['周期', '波动率', '盈利个数', '盈利比例', '总盈利', '平均盈利', '最大盈利', '最小盈利', '总亏损',
-                               '平均亏损', '最大亏损', '最小亏损'])
+    table_list = table_list[0:100]
 
-
-    sttime = dt.now()
-
-    for num in range(60, 130, 20):
+    list_date = mp.Manager().list()
+    thread_list = []
+    pool = mp.Pool(processes=mp.cpu_count())
+    for num in range(60, 100, 20):
         for Volatility in range(3, 7, 1):
-            for rate in range(7, 10, 1):
+            for rate in range(7, 9, 1):
                 step = math.ceil(len(table_list) / mp.cpu_count())
-                thread_list = []
-                result = mp.Manager().list()
-                result_change = mp.Manager().list()
-                result_change_fall = mp.Manager().list()
-                err_list = mp.Manager().list()
+                result = []
+                result_change = []
+                result_change_fall = []
+                err_list = []
                 print(f'{num}天波动率为{Volatility}%乖离率为{rate}')
-                for i in range(0, len(table_list), step):
-
-
-                    stattime = dt.now()
-                    # thd = threading.local()
-                    # print(i)
-                    p = mp.Process(target=backtrader, args=(table_list[i:i + step], result,
-                                                            result_change, result_change_fall, num, Volatility, rate,err_list))
-                    thread_list.append(p)
-                    # p.start()
-                    # p.join()
-                    # print(thread_list)
-                for thread in thread_list:
-                    thread.start()
-                for thread in thread_list:
-                    thread.join()
+                # for i in range(0, len(table_list), step):
+                stattime = dt.now()
+                # thd = threading.local()
+                # print(i)
+                # p = mp.Process(target=backtrader, args=(df, table_list, result, result_change, result_change_fall,
+                #                                         num, Volatility, rate, err_list))
+                # thread_list.append(p)
+                pool.apply_async(func=backtrader,
+                                 args=(list_date, table_list, result, result_change, result_change_fall,
+                                       num, Volatility, rate, err_list,), error_callback=err_call_back)
+                # p.start()
+                # p.join()
+                # print(thread_list)
+    # for thread in thread_list:
+    #     thread.start()
+    # for thread in thread_list:
+    #     thread.join()
+    pool.close()
+    pool.join()
 
-                print(f'以{num}内最低值波动{Volatility}为支撑、乖离率为{rate}%,结果状态为:')
-                print('正盈利的个股为:', len(result_change), '成功率为:', len(result)/len(table_list))
-                print(f'总盈利:{np.sum(result_change)} 平均盈利:{np.mean(result_change)},最大盈利:{np.max(result_change)}, 最小盈利:{np.min(result_change)}')
-                print(
-                    f'总亏损:{np.sum(result_change_fall)},平均亏损:{np.mean(result_change_fall)},最大亏损:{np.min(result_change_fall)} 最小亏损:{np.max(result_change_fall)}')
-                endtime = dt.now()
-                df.loc[len(df)] = [num, Volatility, len(result), len(result)/len(table_list), np.sum(result_change),
-                                                np.mean(result_change), np.max(result_change), np.min(result_change),
-                                                np.sum(result_change_fall), np.mean(result_change_fall),
-                                                np.min(result_change_fall), np.max(result_change_fall)]
-                print(df)
-                print('每轮耗时:', endtime-stattime)
-                df.to_csv(r'D:\Daniel\策略\策略穷举.csv', index=True)
     edtime = dt.now()
-    print('总耗时:', edtime - sttime)
+    print('总耗时:', edtime - starttime)
     # df.to_csv(r'C:\Users\Daniel\Documents\策略穷举2.csv', index=True)