123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117 |
- # coding:utf-8
- from datetime import datetime as dt
- import os
- import pandas as pd
- # from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
- # from xtquant.xttype import StockAccount
- # from xtquant import xtdata, xtconstant
- import time
- from sqlalchemy import create_engine
- from jqdatasdk import *
- import pymysql
- import multiprocessing as mp
- import math
- import talib as ta
- pd.set_option('display.max_columns', None) # Show every column when printing a DataFrame (None removes the column limit)
def err_call_back(err):
    """Print the error raised by a failed worker task.

    Used as the ``error_callback`` of ``Pool.apply_async``.

    Args:
        err: The exception (or any object) describing the failure.
    """
    message = '出错啦~ error:' + str(err)
    print(message)
def myself_kdj(df):
    """Add KDJ columns ``k``, ``d`` and ``j`` to *df* and return it.

    Args:
        df: DataFrame with ``high``, ``low`` and ``close`` columns.

    Returns:
        The same DataFrame object, mutated in place with the new columns.
    """
    window = 9
    lows = df['low']
    highs = df['high']

    # Rolling extremes require a full window; wherever that yields NaN
    # the running (expanding) extreme is used instead.
    lowest = lows.rolling(window, min_periods=window).min()
    lowest = lowest.fillna(lows.expanding().min())
    highest = highs.rolling(window, min_periods=window).max()
    highest = highest.fillna(highs.expanding().max())

    # RSV: where the close sits inside the [lowest, highest] range, in percent.
    rsv = (df['close'] - lowest) / (highest - lowest) * 100

    # K smooths RSV, D smooths K, J = 3K - 2D.
    df['k'] = rsv.ewm(com=2).mean()
    df['d'] = df['k'].ewm(com=2).mean()
    df['j'] = 3 * df['k'] - 2 * df['d']
    return df
# MACD indicator
def get_macd_data(data, short=0, long1=0, mid=0):
    """Compute MACD columns on *data* and return the three result columns.

    Side effects on *data*: the columns ``sema``, ``lema``, ``dif``, ``dea``
    and ``macd`` are added, and every NaN anywhere in the frame is replaced
    with 0 (the fill is applied to the whole DataFrame, not only to the new
    columns).

    Args:
        data: DataFrame with a ``close`` column.
        short: Span of the fast EMA; 0 selects 12.
        long1: Span of the slow EMA; 0 selects 26.
        mid: Span of the signal-line EMA; 0 selects 9.

    Returns:
        ``data[['dif', 'dea', 'macd']]``.
    """
    # A value of 0 is the "use the default" sentinel for each span.
    short = 12 if short == 0 else short
    long1 = 26 if long1 == 0 else long1
    mid = 9 if mid == 0 else mid

    close = data['close']
    data['sema'] = close.ewm(span=short).mean()
    data['lema'] = close.ewm(span=long1).mean()
    data.fillna(0, inplace=True)

    # DIF = fast EMA - slow EMA; DEA = EMA of DIF; MACD bar = 2 * (DIF - DEA).
    fast_minus_slow = data['sema'] - data['lema']
    data['dif'] = fast_minus_slow
    data['dea'] = data['dif'].ewm(span=mid).mean()
    data['macd'] = 2 * (data['dif'] - data['dea'])
    data.fillna(0, inplace=True)

    result_columns = ['dif', 'dea', 'macd']
    return data[result_columns]
# RSI indicator
# Using talib's RSI function is recommended; it has been verified to work.
def get_ris(data):
    """Add ``rsi_6``, ``rsi_12`` and ``rsi_24`` columns to *data* in place.

    Each column is ``ta.RSI`` of the ``close`` column with the matching
    ``timeperiod``. Nothing is returned.

    Args:
        data: DataFrame with a ``close`` column.
    """
    periods_by_column = (("rsi_6", 6), ("rsi_12", 12), ("rsi_24", 24))
    for column, period in periods_by_column:
        data[column] = ta.RSI(data['close'], timeperiod=period)
def get_bias(df):
    """Add ``bias_6``, ``bias_12`` and ``bias_24`` columns to *df* in place.

    N-period BIAS = (close - N-period mean close) / N-period mean close * 100,
    rounded to two decimals. The rolling mean uses ``min_periods=1``, so the
    leading rows average over however many closes are available. Nothing is
    returned.

    Args:
        df: DataFrame with a ``close`` column.
    """
    windows_by_column = (('bias_6', 6), ('bias_12', 12), ('bias_24', 24))
    for column, window in windows_by_column:
        average = df['close'].rolling(window, min_periods=1).mean()
        deviation_pct = (df['close'] - average) / average * 100
        df[column] = deviation_pct.round(2)
def get_wilr(df):
    """Add a ``willr`` column (Williams %R, ``timeperiod=14``) to *df* in place.

    The value is computed with talib's ``WILLR`` from the ``high``, ``low``
    and ``close`` columns. Nothing is returned.

    Args:
        df: DataFrame with ``high``, ``low`` and ``close`` columns.
    """
    highs, lows, closes = df['high'], df['low'], df['close']
    df['willr'] = ta.WILLR(highs, lows, closes, timeperiod=14)
def tech_anal(datas):
    """Compute technical indicators for a batch of stocks and store them.

    For every code in *datas* the table ``<code>_1d`` is read from the
    ``qmt_stocks`` database, the columns produced by ``get_macd_data``,
    ``get_ris``, ``get_bias`` and ``get_wilr`` are added, and the frame is
    appended to the same-named table in ``qmt_stocks_tech``. The code and
    the resulting frame are printed after each write.

    Both engines are disposed when the batch finishes or fails, so a pool
    worker that runs many batches does not accumulate open connections.

    Args:
        datas: Iterable of security codes. ``XSHG`` / ``XSHE`` in a code are
            rewritten to ``SH`` / ``SZ`` before the table name is built.

    Raises:
        Any exception from reading, computing or writing propagates and
        aborts the remaining codes of the batch.
    """
    # NOTE(review): database credentials are hard-coded in these URLs;
    # consider loading them from configuration or the environment.
    engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks?charset=utf8')
    engine_tech = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8')
    try:
        for stock in datas:
            stock = stock.replace('XSHG', 'SH').replace('XSHE', 'SZ')
            table = '%s_1d' % stock
            df = pd.read_sql_table(table, con=engine)

            # Each helper adds its indicator columns to df in place.
            get_macd_data(df)
            get_ris(df)
            get_bias(df)
            get_wilr(df)

            df = df.reset_index(drop=True)
            # NOTE: rows are appended and no primary key is created on the
            # target table, so re-running a batch writes its rows again.
            df.to_sql(table, con=engine_tech, index=False, if_exists='append')
            print(stock, df)
    finally:
        # Release pooled connections even when a stock in the batch fails.
        engine.dispose()
        engine_tech.dispose()
if __name__ == '__main__':
    # Script entry point: load the stock list, fan the codes out to a process
    # pool in fixed-size batches, then print the total elapsed time.
    sttime = dt.now()

    engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8')
    securities = pd.read_sql_query('select securities from stocks_list', engine_hlfx_pool)
    # The last row of the result holds the codes as one comma-separated string.
    stock_pool = sorted(securities.iloc[-1, 0].split(","))
    print(len(stock_pool))

    pool = mp.Pool(processes=mp.cpu_count())
    # Fixed batch size per task (an even split across CPUs was the alternative).
    step = 100
    batches = [stock_pool[start:start + step] for start in range(0, len(stock_pool), step)]
    for batch in batches:
        pool.apply_async(func=tech_anal, args=(batch,), error_callback=err_call_back)
    # No more tasks will be submitted; wait for all workers to finish.
    pool.close()
    pool.join()

    elapsed = dt.now() - sttime
    print(elapsed)
|