123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081 |
- # coding:utf-8
- import pandas as pd
- import numpy as np
- from sklearn.preprocessing import MinMaxScaler
- from keras.models import Sequential
- from keras.layers import Dense, LSTM
- from keras.callbacks import EarlyStopping
- import matplotlib.pyplot as plt
- from sqlalchemy import create_engine
# Load the '600000.SH_1d' table from the qmt_stocks_tech MySQL database.
# NOTE(review): the connection URL embeds the database credentials in source;
# consider moving them to configuration outside the repository.
engine_tech = create_engine(
    'mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8'
)
df = pd.read_sql_table(table_name='600000.SH_1d', con=engine_tech)
print(df)
# Compute the MACD indicator: the difference between the 12- and 26-span
# exponential moving averages of 'close_back', plus its 9-span EMA as the
# signal line. Both series are stored as new columns on df.
close_prices = df['close_back']
ema12, ema26 = (
    close_prices.ewm(span=span, adjust=False).mean() for span in (12, 26)
)
macd = ema12 - ema26
signal = macd.ewm(span=9, adjust=False).mean()
df['macd'], df['signal'] = macd, signal
# Prepare the data: select the feature columns, drop rows containing NaN,
# scale to [0, 1] and split chronologically 70/30 into train and test.
feature_columns = [
    'close_back', 'macd', 'signal', 'willr', 'volume_back',
    'dif', 'dea', 'rsi_6', 'rsi_12', 'rsi_24',
]
data = df[feature_columns].values
print(data)
# Keep only rows in which every feature is present.
data = data[~np.isnan(data).any(axis=1), :]
print('处理后', data)
train_size = int(len(data) * 0.7)
# Fit the scaler on the training rows only. Fitting on the whole array would
# let the test period's min/max influence the training inputs (data leakage).
# Test values may therefore fall slightly outside [0, 1], which is expected.
scaler = MinMaxScaler(feature_range=(0, 1))
scaler.fit(data[:train_size, :])
data = scaler.transform(data)
train_data = data[:train_size, :]
test_data = data[train_size:, :]
def create_dataset(dataset, look_back):
    """Slice a 2-D array into supervised (window, next-target) samples.

    Args:
        dataset: 2-D array indexed as ``[row, feature]``; column 0 is the
            value to predict.
        look_back: Number of consecutive rows in each input window.

    Returns:
        Tuple ``(X, Y)`` of NumPy arrays. ``X[i]`` is rows
        ``i .. i + look_back - 1`` of ``dataset`` and ``Y[i]`` is column 0 of
        row ``i + look_back``. Both are empty when ``dataset`` has no more
        than ``look_back`` rows.
    """
    # A window starting at i needs row i + look_back as its target, so valid
    # starts are 0 .. len(dataset) - look_back - 1 inclusive. The previous
    # bound subtracted one more and silently dropped the last sample.
    n_samples = max(len(dataset) - look_back, 0)
    X = [dataset[i:i + look_back, :] for i in range(n_samples)]
    Y = [dataset[i + look_back, 0] for i in range(n_samples)]
    return np.array(X), np.array(Y)
# Each sample is a window of 90 consecutive rows; build the windowed
# train and test sets with the same window length.
look_back = 90
(train_X, train_Y), (test_X, test_Y) = (
    create_dataset(split, look_back) for split in (train_data, test_data)
)
# LSTM model: a 50-unit LSTM layer over each (time steps, features) window,
# followed by a single-output Dense layer, trained with Adam on MSE.
window_shape = (train_X.shape[1], train_X.shape[2])
model = Sequential([
    LSTM(units=50, input_shape=window_shape),
    Dense(units=1),
])
model.compile(loss='mean_squared_error', optimizer='adam')
# Train the model for up to 100 epochs; the EarlyStopping callback monitors
# 'val_loss' with a patience of 10 epochs.
# NOTE(review): the test split doubles as the validation set here, so the
# stopping point is tuned on the same data later reported as "test".
early_stop = EarlyStopping(monitor='val_loss', patience=10)
fit_options = dict(
    epochs=100,
    batch_size=1,
    validation_data=(test_X, test_Y),
    callbacks=[early_stop],
    verbose=2,
)
history = model.fit(train_X, train_Y, **fit_options)
# Plot the training and validation loss curves recorded during fitting.
for history_key, legend_label in (('loss', 'train'), ('val_loss', 'test')):
    plt.plot(history.history[history_key], label=legend_label)
plt.legend()
plt.show()
# Forecast the next 30 days by repeatedly predicting one step ahead and
# feeding the prediction back into the input window.
forecast_days = 30
last_data = data[-look_back:, :]
last_data = np.expand_dims(last_data, axis=0)  # add the batch axis
predictions = []
for _ in range(forecast_days):
    prediction = model.predict(last_data)
    predictions.append(prediction)
    # The model outputs a single value (the scaled column-0 close), so it
    # cannot be reshaped into a full feature row. Build the next row by
    # copying the most recent row and overwriting column 0 with the forecast.
    # NOTE(review): the remaining indicator columns are simply carried
    # forward unchanged, which is a simplification; recompute them if a more
    # faithful multi-step forecast is needed.
    next_step = last_data[:, -1:, :].copy()
    next_step[0, 0, 0] = prediction[0, 0]
    last_data = np.concatenate([last_data[:, 1:, :], next_step], axis=1)
# The scaler was fitted on every feature column, so inverse_transform rejects
# a single-column array. MinMaxScaler applies scaled = raw * scale_ + min_
# per feature; invert that by hand for column 0 only.
predictions = (
    np.array(predictions).reshape(-1, 1) - scaler.min_[0]
) / scaler.scale_[0]
# Plot the forecast next to the most recent 30 observed closing prices.
plt.plot(predictions, label='predicted')
# The closing-price column is 'close_back' everywhere else in this script;
# the previous key 'back_close' matches no column the script reads or creates.
# NOTE(review): this series is the last 30 observed closes, not the actual
# prices for the forecast horizon, so the two curves cover different dates.
plt.plot(df['close_back'].tail(30).reset_index(drop=True), label='actual')
plt.legend()
plt.show()
|