from time import sleep
import os
import pandas as pd
import talib as ta
import numpy as np
from datetime import datetime as dt


class myind:
    """Pair a display name with a callable.

    ``repr()`` yields ``name``; calling the instance forwards the single
    argument to ``value``.
    """

    def __init__(self, name, value):
        self.name = name
        self.value = value

    def __repr__(self):
        return self.name

    def __call__(self, x):
        return self.value(x)


def myself_kdj(df):
    """Add ``k``, ``d`` and ``j`` columns to *df* in place and return *df*.

    Reads the ``low_back``, ``high_back`` and ``close_back`` columns and
    uses a 9-row rolling window for the lowest low / highest high.
    """
    # Until 9 rows are available, fall back to the expanding min/max so the
    # leading rows are not NaN.
    low_list = df['low_back'].rolling(9, min_periods=9).min()
    low_list = low_list.fillna(df['low_back'].expanding().min())
    high_list = df['high_back'].rolling(9, min_periods=9).max()
    high_list = high_list.fillna(df['high_back'].expanding().max())

    rsv = (df['close_back'] - low_list) / (high_list - low_list) * 100
    df['k'] = rsv.ewm(com=2).mean()
    df['d'] = df['k'].ewm(com=2).mean()
    df['j'] = 3 * df['k'] - 2 * df['d']
    return df


# MACD indicator
def get_macd_data(data, short=0, long1=0, mid=0):
    """Compute MACD columns on *data* in place.

    A value of 0 for ``short`` / ``long1`` / ``mid`` selects the defaults
    12 / 26 / 9. Adds ``sema``, ``lema``, ``dif``, ``dea`` and ``macd``
    columns, computed from ``close_back``.

    Side effect: every NaN in *data* (all columns, not only the new ones)
    is replaced by 0.

    Returns the ``dif``, ``dea`` and ``macd`` columns of *data*.
    """
    if short == 0:
        short = 12
    if long1 == 0:
        long1 = 26
    if mid == 0:
        mid = 9
    close = data['close_back']
    data['sema'] = close.ewm(span=short).mean()
    data['lema'] = close.ewm(span=long1).mean()
    data.fillna(0, inplace=True)
    data['dif'] = data['sema'] - data['lema']
    data['dea'] = data['dif'].ewm(span=mid).mean()
    data['macd'] = 2 * (data['dif'] - data['dea'])
    data.fillna(0, inplace=True)
    return data[['dif', 'dea', 'macd']]


# RSI indicator
def get_ris(data):
    """Add ``rsi_6``, ``rsi_12`` and ``rsi_24`` columns to *data* in place.

    Each column is ``ta.RSI`` of ``close_back`` with the matching
    ``timeperiod``. Returns ``None``.
    """
    data["rsi_6"] = ta.RSI(data['close_back'], timeperiod=6)
    data["rsi_12"] = ta.RSI(data['close_back'], timeperiod=12)
    data["rsi_24"] = ta.RSI(data['close_back'], timeperiod=24)


def get_bias(data):
    """Add ``bias_6``, ``bias_12`` and ``bias_24`` columns to *data* in place.

    N-period BIAS = (close - N-period mean close) / N-period mean close * 100,
    rounded to 2 decimals. The rolling mean uses ``min_periods=1``.
    Returns ``None``.
    """
    close = data['close_back']
    for period in (6, 12, 24):
        # Compute the rolling mean once; it is both subtrahend and divisor.
        mean_close = close.rolling(period, min_periods=1).mean()
        bias = (close - mean_close) / mean_close * 100
        data['bias_{}'.format(period)] = bias.round(2)


def get_wilr(data):
    """Add a ``willr`` column (``ta.WILLR``, ``timeperiod=14``) in place.

    Reads ``high_back``, ``low_back`` and ``close_back``. Returns ``None``.
    """
    data['willr'] = ta.WILLR(data['high_back'], data['low_back'],
                             data['close_back'], timeperiod=14)


def get_hlfx(data):
    """Merge contained bars, then label top/bottom fractals.

    Parameters
    ----------
    data : pandas.DataFrame
        Must provide ``time``, ``open_back``, ``close_back``, ``high_back``,
        ``low_back``, ``dif``, ``dea`` and ``macd``. The rows labelled 0 and
        1 are always kept, so the index is expected to start with those
        labels (assumes a default 0..n-1 index -- TODO confirm with callers).

    Returns
    -------
    (pandas.DataFrame, int)
        The ``time`` and ``HL`` columns of the merged bars, and a signal:
        1 if the last merged bar was labelled ``'L'``, 2 if it was labelled
        ``'H'`` (in the branches below), otherwise 0.

    ``HL`` values written here: ``'L'``/``'LL'``/``'L*'`` for bottoms,
    ``'H'``/``'HH'``/``'H*'`` for tops, ``'-'`` for no fractal; the first
    two merged bars stay NaN.
    """
    trading_signals = 0
    data_temp = data[['time', 'open_back', 'close_back', 'high_back',
                      'low_back', 'dif', 'dea', 'macd']].copy()
    data_temp.columns = ['time', 'open', 'close', 'high', 'low',
                         'dif', 'dea', 'macd']

    # Step 1: remove containment. Rows are collected in a list and turned
    # into a frame once, instead of re-concatenating a frame per bar.
    merged = []
    for i in data_temp.index:
        bar = data_temp.loc[i].copy()
        if i == 0 or i == 1:
            merged.append(bar)
            continue
        last = merged[-1]
        moved_down = last['high'] > bar['high'] and last['low'] > bar['low']
        moved_up = last['high'] < bar['high'] and last['low'] < bar['low']
        if moved_down or moved_up:
            # No containment: keep the bar as is.
            merged.append(bar)
        elif merged[-2]['high'] > last['high']:
            # Contained while falling (left bar higher): keep the lower
            # high and the lower low.
            last['high'] = min(last['high'], bar['high'])
            last['low'] = min(last['low'], bar['low'])
        else:
            # Contained while rising: keep the higher high and higher low.
            last['high'] = max(last['high'], bar['high'])
            last['low'] = max(last['low'], bar['low'])

    # dtype=object keeps the columns untyped, as row-wise concatenation did.
    df_day = pd.DataFrame(merged, columns=data_temp.columns, dtype=object)
    df_day = df_day.reset_index(drop=True)

    # Object dtype: the column receives string labels below, and writing
    # strings into a float64 NaN column is deprecated upcasting.
    df_day['HL'] = pd.Series(np.nan, index=df_day.index, dtype=object)

    # Step 2: look for top/bottom fractals (needs at least 3 merged bars).
    # NOTE(review): x and m are row positions in the merged frame, but the
    # MACD values are read from data_temp (the unmerged input) at the same
    # labels; once any bar has been merged these are different bars.
    # Confirm whether df_day's own 'dif' was intended.
    bar_count = len(df_day.index)
    for x in range(2, bar_count):
        m = x - 1
        if (df_day.loc[x, 'high'] > df_day.loc[x - 1, 'high']
                and df_day.loc[x - 2, 'high'] > df_day.loc[x - 1, 'high']):
            # Bottom shape (judged on highs only): provisional bottom.
            df_day.loc[x, 'HL'] = 'L*'
            # Walk back to the nearest labelled fractal.
            while m:
                mark = df_day.loc[m, 'HL']
                if mark in ('H', 'HH', 'H*'):
                    if x - m > 3:
                        # Far enough from the previous top: confirmed 'L'.
                        df_day.loc[x, 'HL'] = 'L'
                        if x == bar_count - 1:
                            trading_signals = 1
                    else:
                        # Too close: keep 'L*' and demote that top to 'H*'.
                        df_day.loc[m, 'HL'] = 'H*'
                    break
                if mark in ('L', 'LL', 'L*'):
                    if df_day.loc[m - 1, 'low'] > df_day.loc[x - 1, 'low']:
                        # Previous bottom was higher: this one replaces it.
                        df_day.loc[x, 'HL'] = 'L'
                        df_day.loc[m, 'HL'] = '-'
                        if x == bar_count - 1:
                            trading_signals = 1
                        # MACD bottom divergence -> 'LL'.
                        if data_temp.loc[m, 'dif'] < data_temp.loc[x, 'dif']:
                            df_day.loc[x, 'HL'] = 'LL'
                    else:
                        # Previous bottom is lower: this bottom is invalid.
                        df_day.loc[x, 'HL'] = '-'
                    break
                m -= 1
                if m == 0:
                    # No earlier fractal found at positions >= 1.
                    df_day.loc[x, 'HL'] = 'L'
        elif (df_day.loc[x, 'high'] < df_day.loc[x - 1, 'high']
                and df_day.loc[x - 2, 'high'] < df_day.loc[x - 1, 'high']):
            # Top shape: provisional top.
            df_day.loc[x, 'HL'] = 'H*'
            while m:
                mark = df_day.loc[m, 'HL']
                if mark in ('L', 'LL', 'L*'):
                    if x - m > 3:
                        # Far enough from the previous bottom: confirmed 'H'.
                        df_day.loc[x, 'HL'] = 'H'
                        if x == bar_count - 1:
                            trading_signals = 2
                    else:
                        # Too close: keep 'H*' and demote that bottom.
                        df_day.loc[m, 'HL'] = 'L*'
                    break
                if mark in ('H', 'HH', 'H*'):
                    if df_day.loc[x - 1, 'high'] > df_day.loc[m - 1, 'high']:
                        # This top is higher: it replaces the previous top.
                        df_day.loc[x, 'HL'] = 'H'
                        df_day.loc[m, 'HL'] = '-'
                        if x == bar_count - 1:
                            trading_signals = 2
                        # MACD top divergence -> 'HH'.
                        if data_temp.loc[x, 'dif'] < data_temp.loc[m, 'dif']:
                            df_day.loc[x, 'HL'] = 'HH'
                    else:
                        # Previous top is higher: this top is invalid.
                        df_day.loc[x, 'HL'] = '-'
                    break
                m -= 1
                if m == 0:
                    df_day.loc[x, 'HL'] = 'H'
        else:
            df_day.loc[x, 'HL'] = '-'

    df_temp = df_day[['time', 'HL']]
    return df_temp, trading_signals
# Optimized rewrite of get_hlfx, plus the fractal scan shared with get_ddfx.
def _label_fractals(df_day, data_temp, swallow_errors=False):
    """Write top/bottom fractal labels into ``df_day['HL']`` in place.

    Parameters
    ----------
    df_day : pandas.DataFrame
        Bars with ``high`` and ``low`` columns, addressed by the labels
        0..len-1 through ``.loc``. ``HL`` is written (and created on first
        write if missing).
    data_temp : pandas.DataFrame
        Source of the ``dif`` values compared for the ``'LL'``/``'HH'``
        labels, read with ``.loc`` at the same labels as ``df_day``.
        NOTE(review): callers in this file pass the unmerged input here,
        so after any bar merge these labels address different bars --
        confirm this is intended.
    swallow_errors : bool
        When true, an ``Exception`` raised during the scan is printed and
        the labels/signal produced so far are kept; otherwise it propagates.

    Returns
    -------
    int
        1 if the last bar was labelled ``'L'``, 2 if it was labelled
        ``'H'`` (in the branches below), otherwise 0.
    """
    trading_signals = 0
    bar_count = len(df_day.index)
    try:
        # A fractal needs three bars, so scanning starts at position 2.
        for x in range(2, bar_count):
            m = x - 1
            if (df_day.loc[x, 'high'] > df_day.loc[x - 1, 'high']
                    and df_day.loc[x - 2, 'high'] > df_day.loc[x - 1, 'high']):
                # Bottom shape (judged on highs only): provisional bottom.
                df_day.loc[x, 'HL'] = 'L*'
                # Walk back to the nearest labelled fractal.
                while m:
                    mark = df_day.loc[m, 'HL']
                    if mark in ('H', 'HH', 'H*'):
                        if x - m > 3:
                            # Far enough from the previous top: 'L'.
                            df_day.loc[x, 'HL'] = 'L'
                            if x == bar_count - 1:
                                trading_signals = 1
                        else:
                            # Too close: keep 'L*', demote that top to 'H*'.
                            df_day.loc[m, 'HL'] = 'H*'
                        break
                    if mark in ('L', 'LL', 'L*'):
                        if df_day.loc[m - 1, 'low'] > df_day.loc[x - 1, 'low']:
                            # Previous bottom was higher: replace it.
                            df_day.loc[x, 'HL'] = 'L'
                            df_day.loc[m, 'HL'] = '-'
                            if x == bar_count - 1:
                                trading_signals = 1
                            # MACD bottom divergence -> 'LL'.
                            if data_temp.loc[m, 'dif'] < data_temp.loc[x, 'dif']:
                                df_day.loc[x, 'HL'] = 'LL'
                        else:
                            # Previous bottom is lower: this one is invalid.
                            df_day.loc[x, 'HL'] = '-'
                        break
                    m -= 1
                    if m == 0:
                        # No earlier fractal found at positions >= 1.
                        df_day.loc[x, 'HL'] = 'L'
            elif (df_day.loc[x, 'high'] < df_day.loc[x - 1, 'high']
                    and df_day.loc[x - 2, 'high'] < df_day.loc[x - 1, 'high']):
                # Top shape: provisional top.
                df_day.loc[x, 'HL'] = 'H*'
                while m:
                    mark = df_day.loc[m, 'HL']
                    if mark in ('L', 'LL', 'L*'):
                        if x - m > 3:
                            # Far enough from the previous bottom: 'H'.
                            df_day.loc[x, 'HL'] = 'H'
                            if x == bar_count - 1:
                                trading_signals = 2
                        else:
                            # Too close: keep 'H*', demote that bottom.
                            df_day.loc[m, 'HL'] = 'L*'
                        break
                    if mark in ('H', 'HH', 'H*'):
                        if df_day.loc[x - 1, 'high'] > df_day.loc[m - 1, 'high']:
                            # This top is higher: replace the previous top.
                            df_day.loc[x, 'HL'] = 'H'
                            df_day.loc[m, 'HL'] = '-'
                            if x == bar_count - 1:
                                trading_signals = 2
                            # MACD top divergence -> 'HH'.
                            if data_temp.loc[x, 'dif'] < data_temp.loc[m, 'dif']:
                                df_day.loc[x, 'HL'] = 'HH'
                        else:
                            # Previous top is higher: this top is invalid.
                            df_day.loc[x, 'HL'] = '-'
                        break
                    m -= 1
                    if m == 0:
                        df_day.loc[x, 'HL'] = 'H'
            else:
                df_day.loc[x, 'HL'] = '-'
    except Exception as e:
        # Deliberately not BaseException: KeyboardInterrupt and SystemExit
        # must still be able to stop the process.
        if not swallow_errors:
            raise
        print('errrrr', e)
    return trading_signals


def get_hlfx_optimization(data):
    """Merge contained bars, then label top/bottom fractals.

    Parameters
    ----------
    data : pandas.DataFrame
        Must provide ``time``, ``open_back``, ``close_back``, ``high_back``,
        ``low_back``, ``dif``, ``dea`` and ``macd``.

    Returns
    -------
    (pandas.DataFrame, int)
        The ``time`` and ``HL`` columns of the merged bars, and the signal
        from the fractal scan (1 = bottom on the last bar, 2 = top on the
        last bar, 0 = none). Empty input yields an empty frame and 0.

    Errors raised while labelling are printed and the partial result is
    returned (best effort).
    """
    data_temp = data[['time', 'open_back', 'close_back', 'high_back',
                      'low_back', 'dif', 'dea', 'macd']].copy()
    data_temp.columns = ['time', 'open', 'close', 'high', 'low',
                         'dif', 'dea', 'macd']

    if data_temp.empty:
        # Nothing to merge or label; iloc[0] below would raise.
        return pd.DataFrame(columns=['time', 'HL']), 0

    merged_data = [data_temp.iloc[0].copy()]
    for i in range(1, len(data_temp)):
        current = data_temp.iloc[i].copy()
        last = merged_data[-1]
        contained = (
            (last['high'] >= current['high'] and last['low'] <= current['low'])
            or (last['high'] <= current['high'] and last['low'] >= current['low'])
        )
        # NOTE(review): the second raw bar (i == 1) is always merged into
        # the first, even without containment; get_hlfx keeps both bars.
        # Confirm which behaviour is wanted.
        if i == 1 or contained:
            # Merge direction comes from the colour of the last kept bar.
            if last['close'] > last['open']:
                # Rising bar: keep the higher high and the higher low.
                last['high'] = max(last['high'], current['high'])
                last['low'] = max(last['low'], current['low'])
            else:
                # Falling bar: keep the lower high and the lower low.
                last['high'] = min(last['high'], current['high'])
                last['low'] = min(last['low'], current['low'])
        else:
            merged_data.append(current)

    df_day = pd.DataFrame(merged_data).reset_index(drop=True)

    # Object dtype: the column receives string labels, and writing strings
    # into a float64 NaN column is deprecated upcasting.
    df_day['HL'] = pd.Series(np.nan, index=df_day.index, dtype=object)
    trading_signals = _label_fractals(df_day, data_temp, swallow_errors=True)

    df_temp = df_day[['time', 'HL']]
    return df_temp, trading_signals


def get_ddfx(data, data_temp, u):
    """Label top/bottom fractals on *data* in place and print the result.

    Parameters
    ----------
    data : pandas.DataFrame
        Already-merged bars with ``high`` and ``low`` columns, addressed by
        the labels 0..len-1; its ``HL`` column is written in place.
    data_temp : pandas.DataFrame
        Frame whose ``dif`` column is read for the divergence labels.
    u
        Opaque tag, only printed next to the marker line.

    Returns ``None``; the computed trading signal is not returned. Errors
    raised during labelling propagate to the caller.
    """
    _label_fractals(data, data_temp)
    print('44444444444444444', u)
    print(data)