qmt_get_indicators.py 14 KB


  1. # coding:utf-8
  2. from datetime import datetime as dt
  3. import numpy as np
  4. import os
  5. import pandas as pd
  6. import time
  7. from sqlalchemy import create_engine, text
  8. from jqdatasdk import *
  9. import pymysql
  10. import multiprocessing as mp
  11. import math
  12. import talib as ta
  13. from xtquant import xtdata
  14. import os
  15. import traceback
  16. from apscheduler.schedulers.blocking import BlockingScheduler
  17. import psutil
  18. pd.set_option('display.max_columns', None) # 设置显示最大行
  19. engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8',
  20. pool_size=4000, pool_recycle=3600, max_overflow=1000, pool_timeout=60)
  21. def err_call_back(err):
  22. print(f'出错啦~ error:{str(err)}')
  23. traceback.print_exc()
  24. def myself_kdj(df):
  25. low_list = df['low_back'].rolling(9, min_periods=9).min()
  26. low_list.fillna(value=df['low_back'].expanding().min(), inplace=True)
  27. high_list = df['high_back'].rolling(9, min_periods=9).max()
  28. high_list.fillna(value=df['high_back'].expanding().max(), inplace=True)
  29. rsv = (df['close_back'] - low_list) / (high_list - low_list) * 100
  30. df['k'] = pd.DataFrame(rsv).ewm(com=2).mean()
  31. df['d'] = df['k'].ewm(com=2).mean()
  32. df['j'] = 3 * df['k'] - 2 * df['d']
  33. return df
  34. # macd指标
  35. def get_macd_data(data, short=0, long1=0, mid=0):
  36. if short == 0:
  37. short = 12
  38. if long1 == 0:
  39. long1 = 26
  40. if mid == 0:
  41. mid = 9
  42. data['sema'] = pd.Series(data['close_back']).ewm(span=short).mean()
  43. data['lema'] = pd.Series(data['close_back']).ewm(span=long1).mean()
  44. data.fillna(0, inplace=True)
  45. data['dif'] = data['sema'] - data['lema']
  46. data['dea'] = pd.Series(data['dif']).ewm(span=mid).mean()
  47. data['macd'] = 2 * (data['dif'] - data['dea'])
  48. data.fillna(0, inplace=True)
  49. # return data[['dif', 'dea', 'macd']]
  50. # rsi指标
  51. # 建议用talib库的RSI方法,亲测有用
  52. def get_ris(data):
  53. data["rsi_6"] = ta.RSI(data['close_back'], timeperiod=6)
  54. data["rsi_12"] = ta.RSI(data['close_back'], timeperiod=12)
  55. data["rsi_24"] = ta.RSI(data['close_back'], timeperiod=24)
  56. def get_bias(data):
  57. # 计算方法:
  58. # bias指标
  59. # N期BIAS=(当日收盘价-N期平均收盘价)/N期平均收盘价*100%
  60. data['bias_6'] = (data['close_back'] - data['close_back'].rolling(6, min_periods=1).mean()) / \
  61. data['close_back'].rolling(6, min_periods=1).mean() * 100
  62. data['bias_12'] = (data['close_back'] - data['close_back'].rolling(12, min_periods=1).mean()) / \
  63. data['close_back'].rolling(12, min_periods=1).mean() * 100
  64. data['bias_24'] = (data['close_back'] - data['close_back'].rolling(24, min_periods=1).mean()) / \
  65. data['close_back'].rolling(24, min_periods=1).mean() * 100
  66. data['bias_6'] = round(data['bias_6'], 2)
  67. data['bias_12'] = round(data['bias_12'], 2)
  68. data['bias_24'] = round(data['bias_24'], 2)
  69. def get_wilr(data):
  70. # 威廉指标
  71. # 建议用talib库的WILLR方法,亲测有用
  72. data['willr'] = ta.WILLR(data['high_back'], data['low_back'], data['close_back'], timeperiod=14)
def get_hlfx(data):
    """Mark top/bottom fractals on the daily bars of *data*.

    Expects *data* to contain ``time``, ``open_back``, ``close_back``,
    ``high_back``, ``low_back`` and the MACD columns ``dif``, ``dea``, ``macd``
    (as written by ``get_macd_data``).

    Pass 1 merges "contained" bars (one bar's high/low range inside its
    neighbour's) into ``df_day``. Pass 2 scans the merged bars:
    - a bottom candidate is marked ``L*``;
    - a top candidate is marked ``H*``.
    Each candidate is then compared with the nearest earlier marked fractal
    and becomes ``L``/``LL``/``H``/``HH`` or ``'-'`` (invalid / no fractal).

    Returns:
        (df_temp, Trading_signals): ``df_temp`` holds ``time`` and ``HL`` of
        the merged bars. ``Trading_signals`` is 1 when a bottom (``L``) is
        confirmed on the last merged bar in either comparison branch, and 2
        when a top (``H``) is. Otherwise it is 0; the ``m == 0`` fallback
        marks ``L``/``H`` without setting a signal.

    NOTE(review): ``data_temp.loc[x, ...]`` / ``data_temp.loc[m, ...]`` index
    the ORIGINAL bars with positions of the MERGED bars (``df_day``). Once any
    bars have been merged, these refer to different bars — confirm intent.
    The ``i == 0 or i == 1`` test assumes *data* has a 0-based RangeIndex.
    NOTE(review): indentation of this block was reconstructed from a
    whitespace-stripped copy; verify the placement of the ``break`` statements
    against the upstream source.
    """
    Trading_signals = 0
    data_temp = data[['time', 'open_back', 'close_back', 'high_back', 'low_back', 'dif', 'dea', 'macd']]
    data_temp.columns = ['time', 'open', 'close', 'high', 'low', 'dif', 'dea', 'macd']
    # Column positions in df_day: 3 == 'high', 4 == 'low' (used via iloc below).
    df_day = pd.DataFrame(columns=['time', 'open', 'close', 'high', 'low', 'volume', 'money', 'HL'])
    # Pass 1: remove containment (merge contained bars).
    for i in data_temp.index:
        if i == 0 or i == 1:
            df_day = pd.concat([df_day, data_temp.iloc[[i]]], ignore_index=True)
        # Not contained: the new bar is strictly lower or strictly higher -> keep it as a new bar.
        elif (df_day.iloc[-1, 3] > data_temp.loc[i, 'high']
              and df_day.iloc[-1, 4] > data_temp.loc[i, 'low']) \
                or (df_day.iloc[-1, 3] < data_temp.loc[i, 'high']
                    and df_day.iloc[-1, 4] < data_temp.loc[i, 'low']):
            df_day = pd.concat([df_day, data_temp.loc[[i]]], ignore_index=True)
        # Contained: fold the new bar into the last merged bar.
        else:
            # Left bar higher -> falling: keep the lower high and lower low.
            if df_day.iloc[-2, 3] > df_day.iloc[-1, 3]:
                df_day.iloc[-1, 3] = min(df_day.iloc[-1, 3], data_temp.loc[i, 'high'])
                df_day.iloc[-1, 4] = min(df_day.iloc[-1, 4], data_temp.loc[i, 'low'])
            else:
                # Right bar higher -> rising: keep the higher high and higher low.
                df_day.iloc[-1, 3] = max(df_day.iloc[-1, 3], data_temp.loc[i, 'high'])
                df_day.iloc[-1, 4] = max(df_day.iloc[-1, 4], data_temp.loc[i, 'low'])
    # print('111', df_day, data_temp)
    if len(df_day.index) > 2:
        # Pass 2: find top/bottom fractals; bar x-1 is the middle bar, the mark goes on x.
        for x in range(2, len(df_day.index)):
            m = x - 1
            # Bottom
            # Bottom-fractal shape (the "2nd and 3rd bars are bullish" condition below is disabled).
            if ((df_day.loc[x, 'high'] > df_day.loc[x - 1, 'high']) and
                    (df_day.loc[x - 2, 'high'] > df_day.loc[x - 1, 'high'])):
                # and df_day.loc[x, 'close'] > df_day.loc[x, 'open'] and \
                # df_day.loc[x - 1, 'close'] > df_day.loc[x - 1, 'open']:
                df_day.loc[x, 'HL'] = 'L*'
                # Walk back to the nearest earlier marked fractal.
                while m:
                    if df_day.loc[m, 'HL'] in ['H', 'HH', 'H*']:
                        if (x - m) > 3:
                            # Far enough from the previous top to form a stroke -> L
                            df_day.loc[x, 'HL'] = 'L'
                            # Signal on the newest bar -> enters hlfx_pool
                            if x == len(df_day.index) - 1:
                                Trading_signals = 1
                        else:
                            # No stroke (lower-level consolidation): keep L*, demote the previous H to H*
                            df_day.loc[m, 'HL'] = 'H*'
                        break
                    elif df_day.loc[m, 'HL'] in ['L', 'LL', 'L*']:
                        if df_day.loc[m - 1, 'low'] > df_day.loc[x - 1, 'low']:
                            # Previous bottom is higher and no lower bottom lies in between
                            df_day.loc[x, 'HL'] = 'L'
                            df_day.loc[m, 'HL'] = '-'
                            # Signal on the newest bar -> enters hlfx_pool
                            if x == len(df_day.index) - 1:
                                Trading_signals = 1
                            # Fetch MACD values to check for divergence
                            x_macd_dif, x_macd_dea, x_macd_macd = data_temp.loc[x, 'dif'], data_temp.loc[x, 'dea'], \
                                data_temp.loc[x, 'macd']
                            m_macd_dif, m_macd_dea, m_macd_macd = data_temp.loc[m, 'dif'], data_temp.loc[m, 'dea'], \
                                data_temp.loc[m, 'macd']
                            # MACD bottom divergence: lower price low but higher dif
                            if m_macd_dif < x_macd_dif:
                                # Lower-level divergent bottom -> LL
                                df_day.loc[x, 'HL'] = 'LL'
                            break
                        else:
                            # Previous bottom is lower: this bottom is invalid
                            df_day.loc[x, 'HL'] = '-'
                            break
                    m = m - 1
                    # No earlier fractal found: accept this bottom (no signal is set here)
                    if m == 0:
                        df_day.loc[x, 'HL'] = 'L'
            # Top
            elif ((df_day.loc[x, 'high'] < df_day.loc[x - 1, 'high']) and (
                    df_day.loc[x - 2, 'high'] < df_day.loc[x - 1, 'high'])):
                df_day.loc[x, 'HL'] = 'H*'
                # Walk back to the nearest earlier marked fractal.
                while m:
                    if df_day.loc[m, 'HL'] in ['L', 'LL', 'L*']:
                        if x - m > 3:
                            # Far enough from the previous bottom to form a stroke -> H
                            df_day.loc[x, 'HL'] = 'H'
                            # Signal on the newest bar -> leaves hlfx_pool
                            if x == len(df_day.index) - 1:
                                Trading_signals = 2
                        else:
                            # No stroke (lower-level consolidation): keep H*, demote the previous L to L*
                            df_day.loc[m, 'HL'] = 'L*'
                        break
                    elif df_day.loc[m, 'HL'] in ['H', 'HH', 'H*']:
                        if df_day.loc[x - 1, 'high'] > df_day.loc[m - 1, 'high']:
                            # Previous mark is also a top and this top is higher
                            df_day.loc[x, 'HL'] = 'H'
                            df_day.loc[m, 'HL'] = '-'
                            # Signal on the newest bar -> leaves hlfx_pool
                            if x == len(df_day.index) - 1:
                                Trading_signals = 2
                            # Fetch MACD values to check for divergence
                            x_macd_dif, x_macd_dea, x_macd_macd = data_temp.loc[x, 'dif'], data_temp.loc[x, 'dea'], \
                                data_temp.loc[x, 'macd']
                            m_macd_dif, m_macd_dea, m_macd_macd = data_temp.loc[m, 'dif'], data_temp.loc[m, 'dea'], \
                                data_temp.loc[m, 'macd']
                            # MACD top divergence: higher price high but lower dif
                            if x_macd_dif < m_macd_dif:
                                # Lower-level divergent top -> HH
                                df_day.loc[x, 'HL'] = 'HH'
                            break
                        else:
                            # Previous top is higher: this top is invalid
                            df_day.loc[x, 'HL'] = '-'
                            break
                    m = m - 1
                    # No earlier fractal found: accept this top (no signal is set here)
                    if m == 0:
                        df_day.loc[x, 'HL'] = 'H'
            else:
                df_day.loc[x, 'HL'] = '-'
    df_temp = df_day[['time', 'HL']]
    return df_temp, Trading_signals
  192. def tech_anal(stocks, hlfx_pool, hlfx_pool_daily, err_list):
  193. print(f'{dt.now()}开始循环计算! MyPid is {os.getpid()},池子长度为{len(stocks)}')
  194. m = 0
  195. for stock in stocks:
  196. engine_tech = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8',
  197. pool_size=4000, pool_recycle=3600, max_overflow=1000, pool_timeout=60)
  198. # print(stock)
  199. try:
  200. df = pd.read_sql_table('%s_1d' % stock, con=engine.connect())
  201. df.dropna(axis=0, how='any')
  202. except BaseException:
  203. print(f'{stock}读取有问题')
  204. traceback.print_exc()
  205. pass
  206. else:
  207. if len(df) != 0:
  208. try:
  209. get_macd_data(df)
  210. get_ris(df)
  211. get_bias(df)
  212. get_wilr(df)
  213. df_temp, T_signals = get_hlfx(df)
  214. df = pd.merge(df, df_temp, on='time', how='left')
  215. df['HL'].fillna(value='-', inplace=True)
  216. df = df.reset_index(drop=True)
  217. # print(stock, '\n', df[['open_front', 'HL']])
  218. df = df.replace([np.inf, -np.inf], np.nan)
  219. df.to_sql('%s_1d' % stock, con=engine_tech, index=False, if_exists='replace')
  220. # with engine.connect() as con:
  221. # con.execute("ALTER TABLE `%s_1d` ADD PRIMARY KEY (`time`);" % stock)
  222. except BaseException:
  223. print(f'{stock}存储有问题')
  224. traceback.print_exc()
  225. err_list.append(stock)
  226. pass
  227. else:
  228. # print(f"{stock} 成功!")
  229. m += 1
  230. else:
  231. err_list.append(stock)
  232. print(f'{stock}数据为空')
  233. if stock in hlfx_pool and T_signals == 2:
  234. hlfx_pool.remove(stock)
  235. elif stock not in hlfx_pool and T_signals == 1:
  236. hlfx_pool.append(stock)
  237. hlfx_pool_daily.append(stock)
  238. engine_tech.dispose()
  239. print(f'Pid:{os.getpid()}已经完工了,应处理{len(stocks)},共计算{m}支个股')
  240. def ind():
  241. sttime = dt.now()
  242. stocks = xtdata.get_stock_list_in_sector('沪深A股')
  243. print(len(stocks))
  244. stocks.sort()
  245. err_list = mp.Manager().list()
  246. fre = '1d'
  247. engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8',
  248. pool_size=4000, pool_recycle=3600, max_overflow=1000, pool_timeout=60)
  249. hlfx_pool = mp.Manager().list()
  250. hlfx_pool_daily = mp.Manager().list()
  251. hlfx_pool.extend(pd.read_sql_query(
  252. text("select value from %s" % fre), engine_hlfx_pool.connect()).iloc[-1, 0].split(","))
  253. pool = mp.Pool(processes=int(mp.cpu_count()))
  254. step = math.ceil(len(stocks) / mp.cpu_count())
  255. # pool = mp.Pool(processes=18)
  256. # step = math.ceil(len(stocks) / 12)
  257. # step = 10000
  258. x = 1
  259. # tech_anal(stocks, hlfx_pool)
  260. for i in range(0, len(stocks), step):
  261. print(x)
  262. pool.apply_async(func=tech_anal, args=(stocks[i:i + step], hlfx_pool, hlfx_pool_daily, err_list,),
  263. error_callback=err_call_back)
  264. x += 1
  265. time.sleep(5)
  266. pool.close()
  267. pool.join()
  268. print(f'当日信号:{len(hlfx_pool_daily)},持续检测为:{len(hlfx_pool)}')
  269. print(len(err_list), err_list)
  270. results_list = ','.join(set(hlfx_pool))
  271. results_list_daily = ','.join(set(hlfx_pool_daily))
  272. # 存档入库
  273. db_pool = pymysql.connect(host='localhost',
  274. user='root',
  275. port=3307,
  276. password='r6kEwqWU9!v3',
  277. database='hlfx_pool')
  278. cursor_pool = db_pool.cursor()
  279. sql = "INSERT INTO %s (date,value) VALUES('%s','%s')" % (fre, dt.now().strftime('%Y-%m-%d %H:%M:%S'), results_list)
  280. cursor_pool.execute(sql)
  281. db_pool.commit()
  282. # 存档入库daily_1d
  283. db_pool2 = pymysql.connect(host='localhost',
  284. user='root',
  285. port=3307,
  286. password='r6kEwqWU9!v3',
  287. database='hlfx_pool')
  288. cursor_pool2 = db_pool2.cursor()
  289. sql2 = "INSERT INTO daily_%s (date,value) VALUES('%s','%s')" % (fre, dt.now().strftime('%Y-%m-%d %H:%M:%S'),
  290. results_list_daily)
  291. cursor_pool2.execute(sql2)
  292. db_pool2.commit()
  293. edtime = dt.now()
  294. print(edtime - sttime)
  295. if __name__ == '__main__':
  296. pus = psutil.Process()
  297. # pus.cpu_affinity([12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23])
  298. # ind()
  299. scheduler = BlockingScheduler()
  300. scheduler.add_job(func=ind, trigger='cron', day_of_week='0-4', hour='20', minute='30',
  301. timezone="Asia/Shanghai")
  302. try:
  303. scheduler.start()
  304. except (KeyboardInterrupt, SystemExit):
  305. pass