# coding:utf-8
"""Compute daily technical indicators for a pool of stocks and store them.

For every stock code in the pool, the script loads the ``<code>_1d`` table
from the ``qmt_stocks`` MySQL database, adds MACD, RSI, BIAS and Williams %R
columns, and appends the result to the same-named table in
``qmt_stocks_tech``.  The pool is split into fixed-size chunks that are
processed in parallel worker processes.
"""
from datetime import datetime as dt
import os
import pandas as pd
# from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
# from xtquant.xttype import StockAccount
# from xtquant import xtdata, xtconstant
import time
from sqlalchemy import create_engine
# NOTE(review): wildcard import kept in its original position because it may
# rebind names imported above; replace with explicit names once confirmed.
from jqdatasdk import *
import pymysql
import multiprocessing as mp
import math
import talib as ta

# Show every column when a DataFrame is printed.
pd.set_option('display.max_columns', None)

# Connection URL shared by all engines; ``{db}`` is replaced by the database
# name.  The QMT_MYSQL_URL environment variable, when set, overrides the
# built-in default so credentials need not live in the source file.
_DB_URL_TEMPLATE = os.environ.get(
    'QMT_MYSQL_URL',
    'mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/{db}?charset=utf8',
)


def _make_engine(db_name):
    """Return a SQLAlchemy engine for the database called *db_name*."""
    return create_engine(_DB_URL_TEMPLATE.format(db=db_name))


def err_call_back(err):
    """Print *err*; used as the ``error_callback`` of the worker pool."""
    print(f'出错啦~ error:{str(err)}')


def myself_kdj(df):
    """Add KDJ columns ``k``, ``d`` and ``j`` to *df* and return it.

    Args:
        df: DataFrame with ``low``, ``high`` and ``close`` columns.  It is
            modified in place.

    Returns:
        The same DataFrame, with the three new columns.
    """
    # 9-period extremes; the first 8 rows fall back to the running
    # (expanding) extreme because a full window is not yet available.
    low_list = df['low'].rolling(9, min_periods=9).min()
    low_list = low_list.fillna(df['low'].expanding().min())
    high_list = df['high'].rolling(9, min_periods=9).max()
    high_list = high_list.fillna(df['high'].expanding().max())

    # Raw stochastic value: position of the close inside the range, in %.
    rsv = (df['close'] - low_list) / (high_list - low_list) * 100

    df['k'] = rsv.ewm(com=2).mean()
    df['d'] = df['k'].ewm(com=2).mean()
    df['j'] = 3 * df['k'] - 2 * df['d']
    return df


# MACD indicator
def get_macd_data(data, short=0, long1=0, mid=0):
    """Add MACD columns to *data* and return the ``dif``/``dea``/``macd`` view.

    Args:
        data: DataFrame with a ``close`` column.  It is modified in place:
            ``sema``, ``lema``, ``dif``, ``dea`` and ``macd`` are added.
        short: Span of the fast EMA; ``0`` selects the default of 12.
        long1: Span of the slow EMA; ``0`` selects the default of 26.
        mid: Span of the signal-line EMA; ``0`` selects the default of 9.

    Returns:
        ``data[['dif', 'dea', 'macd']]``.
    """
    if short == 0:
        short = 12
    if long1 == 0:
        long1 = 26
    if mid == 0:
        mid = 9

    data['sema'] = data['close'].ewm(span=short).mean()
    data['lema'] = data['close'].ewm(span=long1).mean()
    # NOTE(review): this zero-fills NaN in EVERY column of ``data``, not only
    # the EMA columns -- confirm that is intended for the price columns too.
    data.fillna(0, inplace=True)
    data['dif'] = data['sema'] - data['lema']
    data['dea'] = data['dif'].ewm(span=mid).mean()
    data['macd'] = 2 * (data['dif'] - data['dea'])
    data.fillna(0, inplace=True)
    return data[['dif', 'dea', 'macd']]


# RSI indicator (computed with TA-Lib's RSI)
def get_ris(data):
    """Add ``rsi_6``, ``rsi_12`` and ``rsi_24`` columns to *data* in place.

    Args:
        data: DataFrame with a ``close`` column.
    """
    data["rsi_6"] = ta.RSI(data['close'], timeperiod=6)
    data["rsi_12"] = ta.RSI(data['close'], timeperiod=12)
    data["rsi_24"] = ta.RSI(data['close'], timeperiod=24)


def get_bias(df):
    """Add ``bias_6``, ``bias_12`` and ``bias_24`` columns to *df* in place.

    N-period BIAS = (close - N-period mean close) / N-period mean close * 100,
    rounded to two decimals.  ``min_periods=1`` means the first rows use the
    mean of however many closes are available so far.

    Args:
        df: DataFrame with a ``close`` column.
    """
    for window in (6, 12, 24):
        # Compute the rolling mean once and reuse it in both places.
        mean_close = df['close'].rolling(window, min_periods=1).mean()
        df[f'bias_{window}'] = ((df['close'] - mean_close) / mean_close * 100).round(2)


def get_wilr(df):
    """Add a 14-period Williams %R column ``willr`` to *df* in place.

    Computed with TA-Lib's WILLR.

    Args:
        df: DataFrame with ``high``, ``low`` and ``close`` columns.
    """
    df['willr'] = ta.WILLR(df['high'], df['low'], df['close'], timeperiod=14)


def tech_anal(datas):
    """Compute indicators for each stock in *datas* and append them to MySQL.

    For each code, ``XSHG``/``XSHE`` are rewritten to ``SH``/``SZ``, the
    ``<code>_1d`` table is read from ``qmt_stocks``, MACD, RSI, BIAS and
    Williams %R columns are added, and the frame is appended to the
    same-named table in ``qmt_stocks_tech``.

    A failure on one stock is reported through ``err_call_back`` and the
    remaining stocks of the chunk are still processed.

    Args:
        datas: Iterable of stock code strings.
    """
    engine = _make_engine('qmt_stocks')
    engine_tech = _make_engine('qmt_stocks_tech')
    try:
        for stock in datas:
            stock = stock.replace('XSHG', 'SH').replace('XSHE', 'SZ')
            table = '%s_1d' % stock
            try:
                df = pd.read_sql_table(table, con=engine)
                get_macd_data(df)
                get_ris(df)
                get_bias(df)
                get_wilr(df)
                df = df.reset_index(drop=True)
                # NOTE(review): 'append' means a re-run adds the same rows
                # again; the target table has no primary key on `time`.
                df.to_sql(table, con=engine_tech, index=False, if_exists='append')
            except Exception as err:
                # Worker boundary: report and move on so that one bad table
                # does not discard the rest of the chunk.
                err_call_back(f'{stock}: {err!r}')
                continue
            print(stock, df)
    finally:
        # Release pooled connections held by this worker.
        engine.dispose()
        engine_tech.dispose()


def main():
    """Load the stock pool and process it in parallel chunks."""
    sttime = dt.now()

    engine_hlfx_pool = _make_engine('hlfx_pool')
    try:
        stock_pool = pd.read_sql_query('select securities from stocks_list', engine_hlfx_pool)
    finally:
        # Dispose before the worker processes are created so that no open
        # connection is inherited by them.
        engine_hlfx_pool.dispose()

    if stock_pool.empty:
        print('stocks_list is empty, nothing to do')
        return

    # The last row holds the pool as a comma-separated string.
    stock_pool = stock_pool.iloc[-1, 0].split(",")
    stock_pool.sort()
    print(len(stock_pool))

    # Fixed chunk size; an alternative is one chunk per CPU, i.e.
    # math.ceil(len(stock_pool) / mp.cpu_count()).
    step = 100
    with mp.Pool(processes=mp.cpu_count()) as pool:
        for i in range(0, len(stock_pool), step):
            pool.apply_async(func=tech_anal, args=(stock_pool[i:i + step],),
                             error_callback=err_call_back)
        # close() + join() wait for all submitted chunks; leaving the
        # ``with`` block alone would terminate the workers immediately.
        pool.close()
        pool.join()

    edtime = dt.now()
    print(edtime - sttime)


if __name__ == '__main__':
    main()