qmt_get_indicators.py 14 KB


  1. # coding:utf-8
  2. from datetime import datetime as dt
  3. import numpy as np
  4. import os
  5. import pandas as pd
  6. import time
  7. from sqlalchemy import create_engine
  8. from jqdatasdk import *
  9. import pymysql
  10. import multiprocessing as mp
  11. import math
  12. import talib as ta
  13. from xtquant import xtdata
  14. import os
  15. import traceback
  16. from apscheduler.schedulers.blocking import BlockingScheduler
  17. import psutil
  # Never truncate columns when a DataFrame is printed.
  pd.options.display.max_columns = None
  # Module-level engine for the `qmt_stocks_whole` database; the per-stock
  # `<code>_1d` tables are read through it.
  engine = create_engine(
      'mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8'
  )
  def err_call_back(err):
      """Report an exception raised inside a worker-pool task.

      Used as the ``error_callback`` of ``Pool.apply_async``; it receives the
      exception instance raised by the task.

      Args:
          err: The exception object delivered by the pool.
      """
      print(f'出错啦~ error:{str(err)}')
      # The callback runs outside any ``except`` block, so
      # ``traceback.print_exc()`` has no active exception and would print only
      # "NoneType: None". Format the exception that was actually received.
      traceback.print_exception(type(err), err, err.__traceback__)
  def myself_kdj(df):
      """Add KDJ columns ``k``, ``d`` and ``j`` to ``df`` in place and return it.

      The raw stochastic value uses a 9-row rolling low/high; while fewer than
      9 rows are available the expanding low/high is used instead. ``k`` is the
      exponentially weighted mean (``com=2``) of that value, ``d`` the same
      smoothing applied to ``k``, and ``j`` is ``3 * k - 2 * d``.

      Args:
          df: DataFrame with ``low_back``, ``high_back`` and ``close_back``.

      Returns:
          The same DataFrame object, with the three columns added.
      """
      lows = df['low_back']
      highs = df['high_back']
      closes = df['close_back']

      # Rolling extremes, backed by expanding extremes for the warm-up rows.
      window_low = lows.rolling(9, min_periods=9).min().fillna(lows.expanding().min())
      window_high = highs.rolling(9, min_periods=9).max().fillna(highs.expanding().max())

      raw_stochastic = (closes - window_low) / (window_high - window_low) * 100
      k_line = raw_stochastic.ewm(com=2).mean()
      d_line = k_line.ewm(com=2).mean()

      df['k'] = k_line
      df['d'] = d_line
      df['j'] = 3 * k_line - 2 * d_line
      return df
  # MACD indicator
  def get_macd_data(data, short=0, long1=0, mid=0):
      """Add MACD columns to ``data`` in place.

      Writes ``sema`` and ``lema`` (exponentially weighted means of
      ``close_back``), ``dif`` (their difference), ``dea`` (exponentially
      weighted mean of ``dif``) and ``macd`` (``2 * (dif - dea)``).
      Every NaN in the whole frame is replaced by 0 along the way.

      Args:
          data: DataFrame with a ``close_back`` column; modified in place.
          short: Span of the fast average; 0 selects 12.
          long1: Span of the slow average; 0 selects 26.
          mid: Span of the signal average; 0 selects 9.
      """
      fast_span = 12 if short == 0 else short
      slow_span = 26 if long1 == 0 else long1
      signal_span = 9 if mid == 0 else mid

      close = pd.Series(data['close_back'])
      data['sema'] = close.ewm(span=fast_span).mean()
      data['lema'] = close.ewm(span=slow_span).mean()
      # Frame-wide fill: touches every column, not only the new ones.
      data.fillna(0, inplace=True)

      data['dif'] = data['sema'] - data['lema']
      data['dea'] = pd.Series(data['dif']).ewm(span=signal_span).mean()
      data['macd'] = 2 * (data['dif'] - data['dea'])
      data.fillna(0, inplace=True)
  # RSI indicator
  # Computed with talib's RSI function (the original author reports it works well).
  def get_ris(data):
      """Add ``rsi_6``, ``rsi_12`` and ``rsi_24`` columns to ``data`` in place.

      Each column is ``ta.RSI`` of ``close_back`` with the matching
      ``timeperiod``.

      Args:
          data: DataFrame with a ``close_back`` column; modified in place.
      """
      close = data['close_back']
      for column, period in (("rsi_6", 6), ("rsi_12", 12), ("rsi_24", 24)):
          data[column] = ta.RSI(close, timeperiod=period)
  def get_bias(data):
      """Add ``bias_6``, ``bias_12`` and ``bias_24`` columns to ``data`` in place.

      N-period BIAS = (close - N-period mean close) / N-period mean close * 100,
      rounded to two decimals. The rolling mean uses ``min_periods=1``, so the
      first rows are averaged over however many rows exist.

      Args:
          data: DataFrame with a ``close_back`` column; modified in place.
      """
      close = data['close_back']
      for column, window in (('bias_6', 6), ('bias_12', 12), ('bias_24', 24)):
          average = close.rolling(window, min_periods=1).mean()
          data[column] = round((close - average) / average * 100, 2)
  def get_wilr(data):
      """Add a ``willr`` column (Williams %R) to ``data`` in place.

      Computed with talib's ``WILLR`` over ``high_back``, ``low_back`` and
      ``close_back`` with ``timeperiod=14``.

      Args:
          data: DataFrame holding the three price columns; modified in place.
      """
      high, low, close = (data[name] for name in ('high_back', 'low_back', 'close_back'))
      data['willr'] = ta.WILLR(high, low, close, timeperiod=14)
  # get_hlfx(data) -> (df_temp, Trading_signals)
  #
  # Labels top/bottom fractals on a bar DataFrame and reports whether the newest
  # merged bar produced a signal.
  #
  # Reads the columns 'time', 'open_back', 'close_back', 'high_back', 'low_back',
  # 'dif', 'dea' and 'macd' from `data`.
  #
  # Returns:
  #   df_temp          the 'time' and 'HL' columns of the merged-bar frame `df_day`.
  #                    'HL' is only assigned from the third merged row onward and
  #                    takes the values 'L', 'LL', 'L*', 'H', 'HH', 'H*' or '-'.
  #   Trading_signals  0 by default; 1 when a bottom is confirmed on the last merged
  #                    row, 2 when a top is confirmed on the last merged row.
  #
  # NOTE(review): this listing has lost its indentation, so the exact nesting of the
  # `break` statements and of the `if m == 0` checks cannot be confirmed from here.
  72. def get_hlfx(data):
  73. Trading_signals = 0
  # Select the needed columns and give them short names.
  74. data_temp = data[['time', 'open_back', 'close_back', 'high_back', 'low_back', 'dif', 'dea', 'macd']]
  75. data_temp.columns = ['time', 'open', 'close', 'high', 'low', 'dif', 'dea', 'macd']
  # Merged-bar frame; with this column list, position 3 is 'high' and position 4 is 'low'.
  76. df_day = pd.DataFrame(columns=['time', 'open', 'close', 'high', 'low', 'volume', 'money', 'HL'])
  77. # First pass: remove inclusion relationships between neighbouring bars
  78. for i in data_temp.index:
  # The first two bars are copied unchanged.
  # NOTE(review): compares index labels to 0/1 but selects with iloc — assumes a default integer index; confirm.
  79. if i == 0 or i == 1:
  80. df_day = pd.concat([df_day, data_temp.iloc[[i]]], ignore_index=True)
  81. # Not contained: high and low are both below, or both above, the last kept bar
  82. elif (df_day.iloc[-1, 3] > data_temp.loc[i, 'high']
  83. and df_day.iloc[-1, 4] > data_temp.loc[i, 'low']) \
  84. or (df_day.iloc[-1, 3] < data_temp.loc[i, 'high']
  85. and df_day.iloc[-1, 4] < data_temp.loc[i, 'low']):
  86. df_day = pd.concat([df_day, data_temp.loc[[i]]], ignore_index=True)
  87. # Contained: fold the new bar into the last kept bar
  88. else:
  89. # Left bar is higher (falling): keep the lower high and the lower low
  90. if df_day.iloc[-2, 3] > df_day.iloc[-1, 3]:
  91. df_day.iloc[-1, 3] = min(df_day.iloc[-1, 3], data_temp.loc[i, 'high'])
  92. df_day.iloc[-1, 4] = min(df_day.iloc[-1, 4], data_temp.loc[i, 'low'])
  93. else:
  94. # Right bar is higher (rising): keep the higher high and the higher low
  95. df_day.iloc[-1, 3] = max(df_day.iloc[-1, 3], data_temp.loc[i, 'high'])
  96. df_day.iloc[-1, 4] = max(df_day.iloc[-1, 4], data_temp.loc[i, 'low'])
  97. # print('111', df_day, data_temp)
  # A fractal needs three merged bars, so nothing is labelled with fewer.
  98. if len(df_day.index) > 2:
  99. # Look for top and bottom fractals
  # x is the right-hand bar of the candidate fractal; m walks backwards from x - 1
  # looking for the most recent earlier label.
  100. for x in range(2, len(df_day.index)):
  101. m = x - 1
  102. # Bottom
  103. # Matches the bottom-fractal shape, and the 2nd and 3rd bars are bullish (close > open)
  # NOTE(review): the shape test compares 'high' values only; the top test below does the same.
  104. if ((df_day.loc[x, 'high'] > df_day.loc[x - 1, 'high']) and
  105. (df_day.loc[x - 2, 'high'] > df_day.loc[x - 1, 'high'])) and \
  106. df_day.loc[x, 'close'] > df_day.loc[x, 'open'] and \
  107. df_day.loc[x - 1, 'close'] > df_day.loc[x - 1, 'open']:
  # Provisional bottom until the backward scan confirms or rejects it.
  108. df_day.loc[x, 'HL'] = 'L*'
  109. while m:
  110. if df_day.loc[m, 'HL'] in ['H', 'HH', 'H*']:
  111. if (x - m) > 3:
  112. # Forms a stroke -> L
  113. df_day.loc[x, 'HL'] = 'L'
  114. # Signal on the newest bar (the caller uses it to update hlfx_pool)
  115. if x == len(df_day.index) - 1:
  116. Trading_signals = 1
  117. else:
  118. # No stroke formed (sub-level pivot): keep L* and downgrade the earlier H to H*
  119. df_day.loc[m, 'HL'] = 'H*'
  120. break
  121. elif df_day.loc[m, 'HL'] in ['L', 'LL', 'L*']:
  122. if df_day.loc[m - 1, 'low'] > df_day.loc[x - 1, 'low']:
  123. # The previous bottom is higher, and there is no lower bottom in between
  124. df_day.loc[x, 'HL'] = 'L'
  125. df_day.loc[m, 'HL'] = '-'
  126. # Signal on the newest bar (the caller uses it to update hlfx_pool)
  127. if x == len(df_day.index) - 1:
  128. Trading_signals = 1
  129. # Fetch the MACD values to check for divergence
  # NOTE(review): x and m are row labels of df_day, but the values are read from
  # data_temp; once bars have been merged the two frames may not line up — confirm.
  130. x_macd_dif, x_macd_dea, x_macd_macd = data_temp.loc[x, 'dif'], data_temp.loc[x, 'dea'], \
  131. data_temp.loc[x, 'macd']
  132. m_macd_dif, m_macd_dea, m_macd_macd = data_temp.loc[m, 'dif'], data_temp.loc[m, 'dea'], \
  133. data_temp.loc[m, 'macd']
  134. # MACD bottom divergence
  135. if m_macd_dif < x_macd_dif:
  136. # Sub-level divergence bottom -> LL
  137. df_day.loc[x, 'HL'] = 'LL'
  138. break
  139. else:
  140. # The previous bottom is lower; this bottom is invalid
  141. df_day.loc[x, 'HL'] = '-'
  142. break
  143. m = m - 1
  # Scan reached row 0 without meeting an earlier label: accept the bottom.
  144. if m == 0:
  145. df_day.loc[x, 'HL'] = 'L'
  146. # Top
  147. elif ((df_day.loc[x, 'high'] < df_day.loc[x - 1, 'high']) and (
  148. df_day.loc[x - 2, 'high'] < df_day.loc[x - 1, 'high'])):
  # Provisional top until the backward scan confirms or rejects it.
  149. df_day.loc[x, 'HL'] = 'H*'
  150. while m:
  151. if df_day.loc[m, 'HL'] in ['L', 'LL', 'L*']:
  152. if x - m > 3:
  153. # Forms a stroke -> H
  154. df_day.loc[x, 'HL'] = 'H'
  155. # Signal on the newest bar (the caller uses it to update hlfx_pool)
  156. if x == len(df_day.index) - 1:
  157. Trading_signals = 2
  158. else:
  159. # No stroke formed (sub-level pivot): keep H* and downgrade the earlier L to L*
  160. df_day.loc[m, 'HL'] = 'L*'
  161. break
  162. elif df_day.loc[m, 'HL'] in ['H', 'HH', 'H*']:
  163. if df_day.loc[x - 1, 'high'] > df_day.loc[m - 1, 'high']:
  164. # The previous label is a top, and this one is not contained by it / is a higher top
  165. df_day.loc[x, 'HL'] = 'H'
  166. df_day.loc[m, 'HL'] = '-'
  167. # Signal on the newest bar (the caller uses it to update hlfx_pool)
  168. if x == len(df_day.index) - 1:
  169. Trading_signals = 2
  170. # Fetch the MACD values to check for divergence
  # NOTE(review): same df_day-label / data_temp lookup as in the bottom branch — confirm.
  171. x_macd_dif, x_macd_dea, x_macd_macd = data_temp.loc[x, 'dif'], data_temp.loc[x, 'dea'], \
  172. data_temp.loc[x, 'macd']
  173. m_macd_dif, m_macd_dea, m_macd_macd = data_temp.loc[m, 'dif'], data_temp.loc[m, 'dea'], \
  174. data_temp.loc[m, 'macd']
  175. # MACD top divergence
  176. if x_macd_dif < m_macd_dif:
  177. # Sub-level divergence top -> HH
  178. df_day.loc[x, 'HL'] = 'HH'
  179. break
  180. else:
  181. # The previous top is higher; this top is invalid
  182. df_day.loc[x, 'HL'] = '-'
  183. break
  184. m = m - 1
  # Scan reached row 0 without meeting an earlier label: accept the top.
  185. if m == 0:
  186. df_day.loc[x, 'HL'] = 'H'
  # Neither fractal shape matched.
  187. else:
  188. df_day.loc[x, 'HL'] = '-'
  189. df_temp = df_day[['time', 'HL']]
  190. return df_temp, Trading_signals
  def tech_anal(stocks, hlfx_pool, hlfx_pool_daily, err_list):
      """Compute indicators for a batch of stocks and update the shared signal pools.

      For every code in ``stocks`` the ``<code>_1d`` table is read through the
      module-level ``engine``. The MACD, RSI, BIAS and WILLR columns and the
      fractal label column ``HL`` are added, and the result replaces the table
      of the same name in the ``qmt_stocks_tech`` database.

      Args:
          stocks: Sized iterable of stock codes handled by this call.
          hlfx_pool: Shared list of watched codes. A code is removed on
              signal 2 and appended on signal 1.
          hlfx_pool_daily: Shared list receiving the codes appended to
              ``hlfx_pool`` during this call.
          err_list: Shared list receiving codes whose table was empty or whose
              computation or storage failed.
      """
      print(f'{dt.now()}开始循环计算! MyPid is {os.getpid()},池子长度为{len(stocks)}')
      engine_tech = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8')
      m = 0
      try:
          for stock in stocks:
              # Reset per stock: a failure before get_hlfx() must not reuse the
              # previous stock's signal (or hit an unbound name on the first one).
              T_signals = 0

              try:
                  df = pd.read_sql_table('%s_1d' % stock, con=engine)
              except Exception:
                  print(f'{stock}读取有问题')
                  traceback.print_exc()
                  continue
              # NOTE(review): the original called df.dropna(axis=0, how='any')
              # here without keeping the result, so rows with NaN were never
              # dropped. That behaviour is preserved; confirm whether dropping
              # was intended before changing it.

              if len(df) == 0:
                  err_list.append(stock)
                  print(f'{stock}数据为空')
                  continue

              try:
                  get_macd_data(df)
                  get_ris(df)
                  get_bias(df)
                  get_wilr(df)
                  df_temp, T_signals = get_hlfx(df)
                  df = pd.merge(df, df_temp, on='time', how='left')
                  # Assign the result: an in-place fillna on df['HL'] does not
                  # reach df when pandas copy-on-write is active.
                  df['HL'] = df['HL'].fillna('-')
                  df = df.reset_index(drop=True)
                  # Replace infinities with NaN before the frame is written out.
                  df = df.replace([np.inf, -np.inf], np.nan)
                  df.to_sql('%s_1d' % stock, con=engine_tech, index=False, if_exists='replace')
              except Exception:
                  # Exception rather than BaseException, so Ctrl-C and
                  # interpreter shutdown are not swallowed.
                  print(f'{stock}存储有问题')
                  traceback.print_exc()
                  err_list.append(stock)
              else:
                  m += 1

              if stock in hlfx_pool and T_signals == 2:
                  hlfx_pool.remove(stock)
              elif stock not in hlfx_pool and T_signals == 1:
                  hlfx_pool.append(stock)
                  hlfx_pool_daily.append(stock)
      finally:
          # Release the connections this call opened.
          engine_tech.dispose()
      print(f'Pid:{os.getpid()}已经完工了,应处理{len(stocks)},共计算{m}支个股')
  def ind():
      """Run one indicator pass over all '沪深A股' stocks and archive the signal pools.

      Steps:
        1. Fetch and sort the stock list from ``xtdata``.
        2. Seed the watch pool from the last row of the ``1d`` table in the
           ``hlfx_pool`` database.
        3. Split the stock list into chunks and run ``tech_anal`` on each
           chunk in a process pool.
        4. Insert the watch pool into ``1d`` and the codes added during this
           run into ``daily_1d``, each as one comma-joined row.
      """
      sttime = dt.now()
      stocks = xtdata.get_stock_list_in_sector('沪深A股')
      print(len(stocks))
      stocks.sort()
      fre = '1d'

      engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8',
                                       pool_recycle=3600, pool_pre_ping=True)
      try:
          previous = pd.read_sql_query('select value from `%s`' % fre, engine_hlfx_pool)
      finally:
          engine_hlfx_pool.dispose()
      # An empty table or a NULL value starts the pool empty instead of crashing.
      last_value = previous.iloc[-1, 0] if len(previous) else ''
      seed_codes = [code for code in (last_value or '').split(',') if code]

      # At least 1, so an empty stock list does not make range() fail on step 0.
      step = max(1, math.ceil(len(stocks) / mp.cpu_count()))

      # One manager process serves all three shared lists; it shuts down on exit.
      with mp.Manager() as manager:
          err_list = manager.list()
          hlfx_pool = manager.list()
          hlfx_pool_daily = manager.list()
          hlfx_pool.extend(seed_codes)

          with mp.Pool(processes=18) as pool:
              for x, i in enumerate(range(0, len(stocks), step), start=1):
                  print(x)
                  pool.apply_async(func=tech_anal,
                                   args=(stocks[i:i + step], hlfx_pool, hlfx_pool_daily, err_list,),
                                   error_callback=err_call_back)
                  # Pause between submissions, as the original did.
                  time.sleep(5)
              pool.close()
              pool.join()

          # Copy out of the proxies before the manager goes away.
          watched = list(hlfx_pool)
          daily = list(hlfx_pool_daily)
          failed = list(err_list)

      print(f'当日信号:{len(daily)},持续检测为:{len(watched)}')
      print(len(failed), failed)
      # Sorted so the stored string is deterministic.
      results_list = ','.join(sorted(set(watched)))
      results_list_daily = ','.join(sorted(set(daily)))

      # Archive both pools over one connection, using parameterized values.
      db_pool = pymysql.connect(host='localhost',
                                user='root',
                                port=3307,
                                password='r6kEwqWU9!v3',
                                database='hlfx_pool')
      try:
          with db_pool.cursor() as cursor_pool:
              cursor_pool.execute(f"INSERT INTO `{fre}` (date,value) VALUES (%s,%s)",
                                  (dt.now().strftime('%Y-%m-%d %H:%M:%S'), results_list))
              # Committed separately, as before, so the first row survives a
              # failure of the second insert.
              db_pool.commit()
              cursor_pool.execute(f"INSERT INTO `daily_{fre}` (date,value) VALUES (%s,%s)",
                                  (dt.now().strftime('%Y-%m-%d %H:%M:%S'), results_list_daily))
              db_pool.commit()
      finally:
          db_pool.close()

      edtime = dt.now()
      print(edtime - sttime)
  if __name__ == '__main__':
      # Handle on the current process. It is only needed by the disabled
      # cpu_affinity pinning, which the original kept commented out:
      # pus.cpu_affinity([12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23])
      pus = psutil.Process()

      # Cron fields for the job: 20:30 Asia/Shanghai on day_of_week 0-4
      # (presumably Monday to Friday in APScheduler's numbering — verify).
      cron_settings = dict(day_of_week='0-4', hour='20', minute='30',
                           timezone="Asia/Shanghai")

      scheduler = BlockingScheduler()
      scheduler.add_job(func=ind, trigger='cron', **cron_settings)
      try:
          # Blocks until the scheduler is stopped.
          scheduler.start()
      except (KeyboardInterrupt, SystemExit):
          # Ctrl-C or interpreter exit ends the script quietly.
          pass