qmt_get_indicators.py 14 KB


  1. # coding:utf-8
  2. from datetime import datetime as dt
  3. import numpy as np
  4. import os
  5. import pandas as pd
  6. import time
  7. from sqlalchemy import create_engine, text
  8. from jqdatasdk import *
  9. import pymysql
  10. import multiprocessing as mp
  11. import math
  12. import talib as ta
  13. from xtquant import xtdata
  14. import os
  15. import traceback
  16. from apscheduler.schedulers.blocking import BlockingScheduler
  17. import psutil
pd.set_option('display.max_columns', None)  # Do not truncate columns when a DataFrame is printed
  19. def err_call_back(err):
  20. print(f'出错啦~ error:{str(err)}')
  21. traceback.print_exc()
  22. def myself_kdj(df):
  23. low_list = df['low_back'].rolling(9, min_periods=9).min()
  24. low_list.fillna(value=df['low_back'].expanding().min(), inplace=True)
  25. high_list = df['high_back'].rolling(9, min_periods=9).max()
  26. high_list.fillna(value=df['high_back'].expanding().max(), inplace=True)
  27. rsv = (df['close_back'] - low_list) / (high_list - low_list) * 100
  28. df['k'] = pd.DataFrame(rsv).ewm(com=2).mean()
  29. df['d'] = df['k'].ewm(com=2).mean()
  30. df['j'] = 3 * df['k'] - 2 * df['d']
  31. return df
  32. # macd指标
  33. def get_macd_data(data, short=0, long1=0, mid=0):
  34. if short == 0:
  35. short = 12
  36. if long1 == 0:
  37. long1 = 26
  38. if mid == 0:
  39. mid = 9
  40. data['sema'] = pd.Series(data['close_back']).ewm(span=short).mean()
  41. data['lema'] = pd.Series(data['close_back']).ewm(span=long1).mean()
  42. data.fillna(0, inplace=True)
  43. data['dif'] = data['sema'] - data['lema']
  44. data['dea'] = pd.Series(data['dif']).ewm(span=mid).mean()
  45. data['macd'] = 2 * (data['dif'] - data['dea'])
  46. data.fillna(0, inplace=True)
  47. # return data[['dif', 'dea', 'macd']]
  48. # rsi指标
  49. def get_ris(data):
  50. data["rsi_6"] = ta.RSI(data['close_back'], timeperiod=6)
  51. data["rsi_12"] = ta.RSI(data['close_back'], timeperiod=12)
  52. data["rsi_24"] = ta.RSI(data['close_back'], timeperiod=24)
  53. def get_bias(data):
  54. # 计算方法:
  55. # bias指标
  56. # N期BIAS=(当日收盘价-N期平均收盘价)/N期平均收盘价*100%
  57. data['bias_6'] = (data['close_back'] - data['close_back'].rolling(6, min_periods=1).mean()) / \
  58. data['close_back'].rolling(6, min_periods=1).mean() * 100
  59. data['bias_12'] = (data['close_back'] - data['close_back'].rolling(12, min_periods=1).mean()) / \
  60. data['close_back'].rolling(12, min_periods=1).mean() * 100
  61. data['bias_24'] = (data['close_back'] - data['close_back'].rolling(24, min_periods=1).mean()) / \
  62. data['close_back'].rolling(24, min_periods=1).mean() * 100
  63. data['bias_6'] = round(data['bias_6'], 2)
  64. data['bias_12'] = round(data['bias_12'], 2)
  65. data['bias_24'] = round(data['bias_24'], 2)
  66. def get_wilr(data):
  67. # 威廉指标
  68. # 建议用talib库的WILLR方法,亲测有用
  69. data['willr'] = ta.WILLR(data['high_back'], data['low_back'], data['close_back'], timeperiod=14)
def get_hlfx(data):
    """Label top/bottom fractals on a bar series and report a signal for the last bar.

    Works in two passes:

    1. Bars are copied into ``df_day`` one by one; a bar whose high/low range
       contains, or is contained by, the previous kept bar is merged into that
       bar instead of being appended ("inclusion removal").
    2. Every merged bar from row 2 onward receives an ``HL`` label:
       ``'L*'`` / ``'L'`` / ``'LL'`` for bottoms, ``'H*'`` / ``'H'`` / ``'HH'``
       for tops, ``'-'`` for none. Earlier labels may be revised while
       scanning backwards.

    Args:
        data: DataFrame with columns ``time``, ``open_back``, ``close_back``,
            ``high_back``, ``low_back``, ``dif``, ``dea`` and ``macd``. The
            first pass compares index labels with 0 and 1 and mixes ``iloc``
            with ``loc``, so a default 0..n-1 index is assumed.

    Returns:
        Tuple ``(df_temp, Trading_signals)``: ``df_temp`` holds the ``time``
        and ``HL`` columns of the merged bars; ``Trading_signals`` is 1 when
        the last merged bar was confirmed as a bottom, 2 when confirmed as a
        top, otherwise 0.

    NOTE(review): nesting below was reconstructed from a flattened listing.
    Confirm the placement of each ``break`` against the canonical file.
    """
    Trading_signals = 0
    data_temp = data[['time', 'open_back', 'close_back', 'high_back', 'low_back', 'dif', 'dea', 'macd']]
    data_temp.columns = ['time', 'open', 'close', 'high', 'low', 'dif', 'dea', 'macd']
    df_day = pd.DataFrame(columns=['time', 'open', 'close', 'high', 'low', 'volume', 'money', 'HL'])
    # Pass 1: remove inclusion relationships between adjacent bars.
    # In df_day, positional column 3 is 'high' and column 4 is 'low'.
    for i in data_temp.index:
        if i == 0 or i == 1:
            # The first two bars are always kept as they are.
            df_day = pd.concat([df_day, data_temp.iloc[[i]]], ignore_index=True)
        # No inclusion: high and low both moved in the same direction.
        elif (df_day.iloc[-1, 3] > data_temp.loc[i, 'high']
              and df_day.iloc[-1, 4] > data_temp.loc[i, 'low']) \
                or (df_day.iloc[-1, 3] < data_temp.loc[i, 'high']
                    and df_day.iloc[-1, 4] < data_temp.loc[i, 'low']):
            df_day = pd.concat([df_day, data_temp.loc[[i]]], ignore_index=True)
        # Inclusion: merge the new bar into the last kept bar.
        else:
            # Left bar is higher (falling): keep the lower high and lower low.
            if df_day.iloc[-2, 3] > df_day.iloc[-1, 3]:
                df_day.iloc[-1, 3] = min(df_day.iloc[-1, 3], data_temp.loc[i, 'high'])
                df_day.iloc[-1, 4] = min(df_day.iloc[-1, 4], data_temp.loc[i, 'low'])
            else:
                # Right bar is higher (rising): keep the higher high and higher low.
                df_day.iloc[-1, 3] = max(df_day.iloc[-1, 3], data_temp.loc[i, 'high'])
                df_day.iloc[-1, 4] = max(df_day.iloc[-1, 4], data_temp.loc[i, 'low'])
    # print('111', df_day, data_temp)
    if len(df_day.index) > 2:
        # Pass 2: look for top and bottom fractals.
        for x in range(2, len(df_day.index)):
            # m walks backwards from x - 1 looking for the previous labelled fractal.
            m = x - 1
            # Bottom
            # Bottom-fractal shape: the middle bar (x - 1) has a lower high than
            # both neighbours. The extra requirement that bars 2 and 3 close
            # above their open is disabled (see the commented-out lines).
            if ((df_day.loc[x, 'high'] > df_day.loc[x - 1, 'high']) and
                    (df_day.loc[x - 2, 'high'] > df_day.loc[x - 1, 'high'])):
                # and df_day.loc[x, 'close'] > df_day.loc[x, 'open'] and \
                # df_day.loc[x - 1, 'close'] > df_day.loc[x - 1, 'open']:
                df_day.loc[x, 'HL'] = 'L*'
                while m:
                    if df_day.loc[m, 'HL'] in ['H', 'HH', 'H*']:
                        if (x - m) > 3:
                            # Far enough from the previous top to form a stroke -> L
                            df_day.loc[x, 'HL'] = 'L'
                            # Signal: the last bar is a confirmed bottom (enters hlfx_pool).
                            if x == len(df_day.index) - 1:
                                Trading_signals = 1
                        else:
                            # No stroke (sub-level pivot): keep L*, downgrade the earlier top to H*.
                            df_day.loc[m, 'HL'] = 'H*'
                        break
                    elif df_day.loc[m, 'HL'] in ['L', 'LL', 'L*']:
                        if df_day.loc[m - 1, 'low'] > df_day.loc[x - 1, 'low']:
                            # The previous bottom is higher and no lower bottom lies
                            # in between: this one replaces it.
                            df_day.loc[x, 'HL'] = 'L'
                            df_day.loc[m, 'HL'] = '-'
                            # Signal: the last bar is a confirmed bottom (enters hlfx_pool).
                            if x == len(df_day.index) - 1:
                                Trading_signals = 1
                            # Fetch MACD values to test for divergence.
                            # NOTE(review): x and m are df_day row numbers but are used to
                            # index data_temp; these coincide only if no bars were merged.
                            x_macd_dif, x_macd_dea, x_macd_macd = data_temp.loc[x, 'dif'], data_temp.loc[x, 'dea'], \
                                data_temp.loc[x, 'macd']
                            m_macd_dif, m_macd_dea, m_macd_macd = data_temp.loc[m, 'dif'], data_temp.loc[m, 'dea'], \
                                data_temp.loc[m, 'macd']
                            # MACD bottom divergence
                            if m_macd_dif < x_macd_dif:
                                # Sub-level divergence bottom -> LL
                                df_day.loc[x, 'HL'] = 'LL'
                            break
                        else:
                            # The previous bottom is lower: this bottom is invalid.
                            df_day.loc[x, 'HL'] = '-'
                            break
                    m = m - 1
                    # Reached the start without meeting any labelled fractal.
                    if m == 0:
                        df_day.loc[x, 'HL'] = 'L'
            # Top
            elif ((df_day.loc[x, 'high'] < df_day.loc[x - 1, 'high']) and (
                    df_day.loc[x - 2, 'high'] < df_day.loc[x - 1, 'high'])):
                df_day.loc[x, 'HL'] = 'H*'
                while m:
                    if df_day.loc[m, 'HL'] in ['L', 'LL', 'L*']:
                        if x - m > 3:
                            # Far enough from the previous bottom to form a stroke -> H
                            df_day.loc[x, 'HL'] = 'H'
                            # Signal: the last bar is a confirmed top.
                            if x == len(df_day.index) - 1:
                                Trading_signals = 2
                        else:
                            # No stroke (sub-level pivot): keep H*, downgrade the earlier bottom to L*.
                            df_day.loc[m, 'HL'] = 'L*'
                        break
                    elif df_day.loc[m, 'HL'] in ['H', 'HH', 'H*']:
                        if df_day.loc[x - 1, 'high'] > df_day.loc[m - 1, 'high']:
                            # The previous fractal is a lower top: this one replaces it.
                            df_day.loc[x, 'HL'] = 'H'
                            df_day.loc[m, 'HL'] = '-'
                            # Signal: the last bar is a confirmed top.
                            if x == len(df_day.index) - 1:
                                Trading_signals = 2
                            # Fetch MACD values to test for divergence.
                            # NOTE(review): same df_day-vs-data_temp indexing caveat as above.
                            x_macd_dif, x_macd_dea, x_macd_macd = data_temp.loc[x, 'dif'], data_temp.loc[x, 'dea'], \
                                data_temp.loc[x, 'macd']
                            m_macd_dif, m_macd_dea, m_macd_macd = data_temp.loc[m, 'dif'], data_temp.loc[m, 'dea'], \
                                data_temp.loc[m, 'macd']
                            # MACD top divergence
                            if x_macd_dif < m_macd_dif:
                                # Sub-level divergence top -> HH
                                df_day.loc[x, 'HL'] = 'HH'
                            break
                        else:
                            # The previous top is higher: this top is invalid.
                            df_day.loc[x, 'HL'] = '-'
                            break
                    m = m - 1
                    # Reached the start without meeting any labelled fractal.
                    if m == 0:
                        df_day.loc[x, 'HL'] = 'H'
            else:
                # Neither a top nor a bottom shape.
                df_day.loc[x, 'HL'] = '-'
    df_temp = df_day[['time', 'HL']]
    return df_temp, Trading_signals
  189. def tech_anal(stocks, hlfx_pool, hlfx_pool_daily, err_list):
  190. print(f'{dt.now()}开始循环计算! MyPid is {os.getpid()},池子长度为{len(stocks)}')
  191. m = 0
  192. for stock in stocks:
  193. engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8',
  194. pool_size=1, pool_recycle=7200, max_overflow=1000, pool_timeout=60)
  195. engine_tech = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8',
  196. pool_size=1, pool_recycle=3600, max_overflow=1000, pool_timeout=60)
  197. # print(stock)
  198. try:
  199. df = pd.read_sql_table('%s_1d' % stock, con=engine.connect())
  200. df.dropna(axis=0, how='any')
  201. engine.dispose()
  202. except BaseException:
  203. print(f'{stock}读取有问题')
  204. traceback.print_exc()
  205. pass
  206. else:
  207. if len(df) != 0:
  208. try:
  209. get_macd_data(df)
  210. get_ris(df)
  211. get_bias(df)
  212. get_wilr(df)
  213. df_temp, T_signals = get_hlfx(df)
  214. df = pd.merge(df, df_temp, on='time', how='left')
  215. df['HL'].fillna(value='-', inplace=True)
  216. df = df.reset_index(drop=True)
  217. # print(stock, '\n', df[['open_front', 'HL']])
  218. df = df.replace([np.inf, -np.inf], np.nan)
  219. df.to_sql('%s_1d' % stock, con=engine_tech, index=False, if_exists='replace')
  220. engine_tech.dispose()
  221. # with engine.connect() as con:
  222. # con.execute("ALTER TABLE `%s_1d` ADD PRIMARY KEY (`time`);" % stock)
  223. except BaseException:
  224. print(f'{stock}存储有问题')
  225. traceback.print_exc()
  226. err_list.append(stock)
  227. pass
  228. else:
  229. # print(f"{stock} 成功!")
  230. m += 1
  231. else:
  232. err_list.append(stock)
  233. print(f'{stock}数据为空')
  234. if stock in hlfx_pool and T_signals == 2:
  235. hlfx_pool.remove(stock)
  236. elif stock not in hlfx_pool and T_signals == 1:
  237. hlfx_pool.append(stock)
  238. hlfx_pool_daily.append(stock)
  239. print(f'Pid:{os.getpid()}已经完工了,应处理{len(stocks)},共计算{m}支个股')
  240. def ind():
  241. sttime = dt.now()
  242. stocks = xtdata.get_stock_list_in_sector('沪深A股')
  243. print(len(stocks))
  244. stocks.sort()
  245. err_list = mp.Manager().list()
  246. fre = '1d'
  247. engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8',
  248. pool_size=1, pool_recycle=3600, max_overflow=1000, pool_timeout=60)
  249. hlfx_pool = mp.Manager().list()
  250. hlfx_pool_daily = mp.Manager().list()
  251. hlfx_pool.extend(pd.read_sql_query(
  252. text("select value from %s" % fre), engine_hlfx_pool.connect()).iloc[-1, 0].split(","))
  253. pool = mp.Pool(processes=int(mp.cpu_count()))
  254. step = math.ceil(len(stocks) / mp.cpu_count())
  255. # pool = mp.Pool(processes=16)
  256. # step = math.ceil(len(stocks) / 16)
  257. # step = 10000
  258. # tech_anal(stocks, hlfx_pool)
  259. for i in range(0, len(stocks), step):
  260. pool.apply_async(func=tech_anal, args=(stocks[i:i + step], hlfx_pool, hlfx_pool_daily, err_list,),
  261. error_callback=err_call_back)
  262. time.sleep(5)
  263. pool.close()
  264. pool.join()
  265. engine_hlfx_pool.dispose()
  266. print(f'当日信号:{len(hlfx_pool_daily)},持续检测为:{len(hlfx_pool)}')
  267. print(len(err_list), err_list)
  268. results_list = ','.join(set(hlfx_pool))
  269. results_list_daily = ','.join(set(hlfx_pool_daily))
  270. # 存档入库
  271. db_pool = pymysql.connect(host='localhost',
  272. user='root',
  273. port=3307,
  274. password='r6kEwqWU9!v3',
  275. database='hlfx_pool')
  276. cursor_pool = db_pool.cursor()
  277. sql = "INSERT INTO %s (date,value) VALUES('%s','%s')" % (fre, dt.now().strftime('%Y-%m-%d %H:%M:%S'), results_list)
  278. cursor_pool.execute(sql)
  279. db_pool.commit()
  280. # 存档入库daily_1d
  281. db_pool2 = pymysql.connect(host='localhost',
  282. user='root',
  283. port=3307,
  284. password='r6kEwqWU9!v3',
  285. database='hlfx_pool')
  286. cursor_pool2 = db_pool2.cursor()
  287. sql2 = "INSERT INTO daily_%s (date,value) VALUES('%s','%s')" % (fre, dt.now().strftime('%Y-%m-%d %H:%M:%S'),
  288. results_list_daily)
  289. cursor_pool2.execute(sql2)
  290. db_pool2.commit()
  291. edtime = dt.now()
  292. print(edtime - sttime)
  293. if __name__ == '__main__':
  294. pus = psutil.Process()
  295. # pus.cpu_affinity([12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23])
  296. # ind()
  297. scheduler = BlockingScheduler()
  298. scheduler.add_job(func=ind, trigger='cron', day_of_week='0-4', hour='20', minute='30',
  299. timezone="Asia/Shanghai", max_instances=10)
  300. try:
  301. scheduler.start()
  302. except (KeyboardInterrupt, SystemExit):
  303. pass