# 5m_data_whole.py (17 KB)


import math
import multiprocessing as mp
import os
import traceback
from datetime import datetime as dt

import numpy as np
import pandas as pd
import psutil
import pymysql
import talib as ta
from apscheduler.schedulers.blocking import BlockingScheduler
from sqlalchemy import create_engine, text
from xtquant import xtdata
  14. pd.set_option('display.max_columns', None) # 设置显示最大行
  15. path = 'C:\\qmt\\userdata_mini'
  16. field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']
  17. cpu_count = mp.cpu_count()
  18. def err_call_back(err):
  19. print(f'问题在这里~ error:{str(err)}')
  20. traceback.print_exc()
  21. def err_call_back(err):
  22. print(f'出错啦~ error:{str(err)}')
  23. traceback.print_exc()
  24. def myself_kdj(df):
  25. low_list = df['low_back'].rolling(9, min_periods=9).min()
  26. low_list.fillna(value=df['low_back'].expanding().min(), inplace=True)
  27. high_list = df['high_back'].rolling(9, min_periods=9).max()
  28. high_list.fillna(value=df['high_back'].expanding().max(), inplace=True)
  29. rsv = (df['close_back'] - low_list) / (high_list - low_list) * 100
  30. df['k'] = pd.DataFrame(rsv).ewm(com=2).mean()
  31. df['d'] = df['k'].ewm(com=2).mean()
  32. df['j'] = 3 * df['k'] - 2 * df['d']
  33. return df
# MACD indicator
  35. def get_macd_data(data, short=0, long1=0, mid=0):
  36. if short == 0:
  37. short = 12
  38. if long1 == 0:
  39. long1 = 26
  40. if mid == 0:
  41. mid = 9
  42. data['sema'] = pd.Series(data['close_back']).ewm(span=short).mean()
  43. data['lema'] = pd.Series(data['close_back']).ewm(span=long1).mean()
  44. data.fillna(0, inplace=True)
  45. data['dif'] = data['sema'] - data['lema']
  46. data['dea'] = pd.Series(data['dif']).ewm(span=mid).mean()
  47. data['macd'] = 2 * (data['dif'] - data['dea'])
  48. data.fillna(0, inplace=True)
  49. # return data[['dif', 'dea', 'macd']]
# RSI indicator
  51. def get_ris(data):
  52. data["rsi_6"] = ta.RSI(data['close_back'], timeperiod=6)
  53. data["rsi_12"] = ta.RSI(data['close_back'], timeperiod=12)
  54. data["rsi_24"] = ta.RSI(data['close_back'], timeperiod=24)
  55. def get_bias(data):
  56. # 计算方法:
  57. # bias指标
  58. # N期BIAS=(当日收盘价-N期平均收盘价)/N期平均收盘价*100%
  59. data['bias_6'] = (data['close_back'] - data['close_back'].rolling(6, min_periods=1).mean()) / \
  60. data['close_back'].rolling(6, min_periods=1).mean() * 100
  61. data['bias_12'] = (data['close_back'] - data['close_back'].rolling(12, min_periods=1).mean()) / \
  62. data['close_back'].rolling(12, min_periods=1).mean() * 100
  63. data['bias_24'] = (data['close_back'] - data['close_back'].rolling(24, min_periods=1).mean()) / \
  64. data['close_back'].rolling(24, min_periods=1).mean() * 100
  65. data['bias_6'] = round(data['bias_6'], 2)
  66. data['bias_12'] = round(data['bias_12'], 2)
  67. data['bias_24'] = round(data['bias_24'], 2)
  68. def get_wilr(data):
  69. # 威廉指标
  70. # 建议用talib库的WILLR方法,亲测有用
  71. data['willr'] = ta.WILLR(data['high_back'], data['low_back'], data['close_back'], timeperiod=14)
def get_hlfx(data):
    """Label top/bottom fractals on the bars of ``data`` and derive a signal.

    Step 1 merges "contained" bars (a bar whose high/low range neither rises
    nor falls strictly relative to the previous kept bar) into the previous
    kept bar. Step 2 walks the merged bars and writes a label into the ``HL``
    column: ``L``/``LL``/``L*`` for bottoms, ``H``/``HH``/``H*`` for tops and
    ``-`` for everything else.

    NOTE(review): the indentation of this function was lost in the source and
    has been reconstructed. The nesting of the MACD divergence checks and of
    the ``break`` statements should be confirmed against the author's copy.

    Args:
        data: DataFrame with columns ``time``, ``open_back``, ``close_back``,
            ``high_back``, ``low_back``, ``dif``, ``dea`` and ``macd``.
            Presumably indexed 0..n-1 (the loop compares index labels with
            0 and 1 and also uses them positionally) -- TODO confirm.

    Returns:
        A tuple ``(df_temp, Trading_signals)``: ``df_temp`` holds the ``time``
        and ``HL`` columns of the merged bars; ``Trading_signals`` is 1 when
        the last merged bar was labelled as a bottom ``L``, 2 when it was
        labelled as a top ``H``, and 0 otherwise.
    """
    Trading_signals = 0
    data_temp = data[['time', 'open_back', 'close_back', 'high_back', 'low_back', 'dif', 'dea', 'macd']]
    data_temp.columns = ['time', 'open', 'close', 'high', 'low', 'dif', 'dea', 'macd']
    df_day = pd.DataFrame(columns=['time', 'open', 'close', 'high', 'low', 'volume', 'money', 'HL'])
    # Step 1: remove containment relationships between neighbouring bars.
    # In df_day, positional column 3 is 'high' and column 4 is 'low'.
    for i in data_temp.index:
        if i == 0 or i == 1:
            df_day = pd.concat([df_day, data_temp.iloc[[i]]], ignore_index=True)
        # Not contained: both high and low moved strictly in the same direction.
        elif (df_day.iloc[-1, 3] > data_temp.loc[i, 'high']
              and df_day.iloc[-1, 4] > data_temp.loc[i, 'low']) \
                or (df_day.iloc[-1, 3] < data_temp.loc[i, 'high']
                    and df_day.iloc[-1, 4] < data_temp.loc[i, 'low']):
            df_day = pd.concat([df_day, data_temp.loc[[i]]], ignore_index=True)
        # Contained: fold the new bar into the last kept bar.
        else:
            # Left bar is higher, i.e. falling: keep the lower high and lower low.
            if df_day.iloc[-2, 3] > df_day.iloc[-1, 3]:
                df_day.iloc[-1, 3] = min(df_day.iloc[-1, 3], data_temp.loc[i, 'high'])
                df_day.iloc[-1, 4] = min(df_day.iloc[-1, 4], data_temp.loc[i, 'low'])
            else:
                # Right bar is higher, i.e. rising: keep the higher high and higher low.
                df_day.iloc[-1, 3] = max(df_day.iloc[-1, 3], data_temp.loc[i, 'high'])
                df_day.iloc[-1, 4] = max(df_day.iloc[-1, 4], data_temp.loc[i, 'low'])
    # print('111', df_day, data_temp)
    if len(df_day.index) > 2:
        # Step 2: look for top and bottom fractals.
        for x in range(2, len(df_day.index)):
            m = x - 1
            # Bottom.
            # The bar at x - 1 has a lower high than both neighbours. (The
            # original comment also required bars 2 and 3 to be bullish, but
            # that condition is commented out below.)
            if ((df_day.loc[x, 'high'] > df_day.loc[x - 1, 'high']) and
                    (df_day.loc[x - 2, 'high'] > df_day.loc[x - 1, 'high'])):
                # and df_day.loc[x, 'close'] > df_day.loc[x, 'open'] and \
                # df_day.loc[x - 1, 'close'] > df_day.loc[x - 1, 'open']:
                df_day.loc[x, 'HL'] = 'L*'
                # Walk backwards to the most recent labelled fractal.
                while m:
                    if df_day.loc[m, 'HL'] in ['H', 'HH', 'H*']:
                        if (x - m) > 3:
                            # Far enough from the previous top: confirmed stroke -> L.
                            df_day.loc[x, 'HL'] = 'L'
                            # Signal generated on the last bar; the caller adds the stock to hlfx_pool.
                            if x == len(df_day.index) - 1:
                                Trading_signals = 1
                        else:
                            # Too close to form a stroke (lower-level pivot): keep L* and
                            # downgrade the earlier H to H*.
                            df_day.loc[m, 'HL'] = 'H*'
                        break
                    elif df_day.loc[m, 'HL'] in ['L', 'LL', 'L*']:
                        if df_day.loc[m - 1, 'low'] > df_day.loc[x - 1, 'low']:
                            # The previous bottom is higher and no lower bottom lies in between.
                            df_day.loc[x, 'HL'] = 'L'
                            df_day.loc[m, 'HL'] = '-'
                            # Signal generated on the last bar; the caller adds the stock to hlfx_pool.
                            if x == len(df_day.index) - 1:
                                Trading_signals = 1
                            # Fetch MACD values to check for divergence.
                            # NOTE(review): x and m are positions in the merged df_day, but they
                            # are used here as labels of the unmerged data_temp -- confirm that
                            # this is intended.
                            x_macd_dif, x_macd_dea, x_macd_macd = data_temp.loc[x, 'dif'], data_temp.loc[x, 'dea'], \
                                data_temp.loc[x, 'macd']
                            m_macd_dif, m_macd_dea, m_macd_macd = data_temp.loc[m, 'dif'], data_temp.loc[m, 'dea'], \
                                data_temp.loc[m, 'macd']
                            # MACD bottom divergence.
                            if m_macd_dif < x_macd_dif:
                                # Lower-level divergence bottom -> LL.
                                df_day.loc[x, 'HL'] = 'LL'
                            break
                        else:
                            # The previous bottom is lower: this bottom is invalid.
                            df_day.loc[x, 'HL'] = '-'
                            break
                    m = m - 1
                    # Reached the start without meeting any labelled fractal.
                    if m == 0:
                        df_day.loc[x, 'HL'] = 'L'
            # Top: the bar at x - 1 has a higher high than both neighbours.
            elif ((df_day.loc[x, 'high'] < df_day.loc[x - 1, 'high']) and (
                    df_day.loc[x - 2, 'high'] < df_day.loc[x - 1, 'high'])):
                df_day.loc[x, 'HL'] = 'H*'
                # Walk backwards to the most recent labelled fractal.
                while m:
                    if df_day.loc[m, 'HL'] in ['L', 'LL', 'L*']:
                        if x - m > 3:
                            # Far enough from the previous bottom: confirmed stroke -> H.
                            df_day.loc[x, 'HL'] = 'H'
                            # Signal generated on the last bar; the caller removes the stock from hlfx_pool.
                            if x == len(df_day.index) - 1:
                                Trading_signals = 2
                        else:
                            # Too close to form a stroke (lower-level pivot): keep H* and
                            # downgrade the earlier L to L*.
                            df_day.loc[m, 'HL'] = 'L*'
                        break
                    elif df_day.loc[m, 'HL'] in ['H', 'HH', 'H*']:
                        if df_day.loc[x - 1, 'high'] > df_day.loc[m - 1, 'high']:
                            # The previous fractal is a top and the current top is higher.
                            df_day.loc[x, 'HL'] = 'H'
                            df_day.loc[m, 'HL'] = '-'
                            # Signal generated on the last bar; the caller removes the stock from hlfx_pool.
                            if x == len(df_day.index) - 1:
                                Trading_signals = 2
                            # Fetch MACD values to check for divergence.
                            # NOTE(review): same df_day-position vs data_temp-label concern as in
                            # the bottom branch.
                            x_macd_dif, x_macd_dea, x_macd_macd = data_temp.loc[x, 'dif'], data_temp.loc[x, 'dea'], \
                                data_temp.loc[x, 'macd']
                            m_macd_dif, m_macd_dea, m_macd_macd = data_temp.loc[m, 'dif'], data_temp.loc[m, 'dea'], \
                                data_temp.loc[m, 'macd']
                            # MACD top divergence.
                            if x_macd_dif < m_macd_dif:
                                # Lower-level divergence top -> HH.
                                df_day.loc[x, 'HL'] = 'HH'
                            break
                        else:
                            # The previous top is higher: this top is invalid.
                            df_day.loc[x, 'HL'] = '-'
                            break
                    m = m - 1
                    # Reached the start without meeting any labelled fractal.
                    if m == 0:
                        df_day.loc[x, 'HL'] = 'H'
            else:
                # Neither a top nor a bottom fractal.
                df_day.loc[x, 'HL'] = '-'
    df_temp = df_day[['time', 'HL']]
    return df_temp, Trading_signals
  191. def tech_anal(stocks, hlfx_pool, hlfx_pool_daily, err_list):
  192. print(f'{dt.now()}开始循环计算! MyPid is {os.getpid()},池子长度为{len(stocks)}')
  193. m = 0
  194. for stock in stocks:
  195. engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/5m_stocks_whole?charset=utf8',
  196. pool_size=1, pool_recycle=7200, max_overflow=1000, pool_timeout=60)
  197. engine_tech = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/5m_stocks_tech?charset=utf8',
  198. pool_size=1, pool_recycle=3600, max_overflow=1000, pool_timeout=60)
  199. # print(stock)
  200. try:
  201. df = pd.read_sql_table('%s_5m' % stock, con=engine.connect())
  202. df.dropna(axis=0, how='any')
  203. engine.dispose()
  204. except BaseException:
  205. print(f'{stock}读取有问题')
  206. traceback.print_exc()
  207. pass
  208. else:
  209. if len(df) != 0:
  210. try:
  211. get_macd_data(df)
  212. get_ris(df)
  213. get_bias(df)
  214. get_wilr(df)
  215. df_temp, T_signals = get_hlfx(df)
  216. df = pd.merge(df, df_temp, on='time', how='left')
  217. df['HL'].fillna(value='-', inplace=True)
  218. df = df.reset_index(drop=True)
  219. # print(stock, '\n', df[['open_front', 'HL']])
  220. df = df.replace([np.inf, -np.inf], np.nan)
  221. df.to_sql('%s_5m' % stock, con=engine_tech, index=False, if_exists='replace')
  222. engine_tech.dispose()
  223. # with engine.connect() as con:
  224. # con.execute("ALTER TABLE `%s_5m` ADD PRIMARY KEY (`time`);" % stock)
  225. except BaseException as e:
  226. print(f'{stock}存储有问题', e)
  227. traceback.print_exc()
  228. err_list.append(stock)
  229. pass
  230. else:
  231. # print(f"{stock} 成功!")
  232. m += 1
  233. else:
  234. err_list.append(stock)
  235. print(f'{stock}数据为空')
  236. if stock in hlfx_pool and T_signals == 2:
  237. hlfx_pool.remove(stock)
  238. elif stock not in hlfx_pool and T_signals == 1:
  239. hlfx_pool.append(stock)
  240. hlfx_pool_daily.append(stock)
  241. print(f'Pid:{os.getpid()}已经完工了,应处理{len(stocks)},共计算{m}支个股')
  242. def ind():
  243. sttime = dt.now()
  244. stocks = xtdata.get_stock_list_in_sector('沪深A股')
  245. print(len(stocks))
  246. stocks.sort()
  247. err_list = mp.Manager().list()
  248. fre = '5m'
  249. engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8',
  250. pool_size=1, pool_recycle=3600, max_overflow=1000, pool_timeout=60)
  251. hlfx_pool = mp.Manager().list()
  252. hlfx_pool_daily = mp.Manager().list()
  253. hlfx_pool.extend(pd.read_sql_query(
  254. text("select value from %s" % fre), engine_hlfx_pool.connect()).iloc[-1, 0].split(","))
  255. # pool = mp.Pool(processes=int(mp.cpu_count()))
  256. # step = math.ceil(len(stocks) / mp.cpu_count())
  257. pool = mp.Pool(processes=12)
  258. step = math.ceil(len(stocks) / 12)
  259. # step = 10000
  260. # tech_anal(stocks, hlfx_pool)
  261. for i in range(0, len(stocks), step):
  262. pool.apply_async(func=tech_anal, args=(stocks[i:i + step], hlfx_pool, hlfx_pool_daily, err_list,),
  263. error_callback=err_call_back)
  264. pool.close()
  265. pool.join()
  266. engine_hlfx_pool.dispose()
  267. print(f'当日信号:{len(hlfx_pool_daily)},持续检测为:{len(hlfx_pool)}')
  268. print(len(err_list), err_list)
  269. results_list = ','.join(set(hlfx_pool))
  270. results_list_daily = ','.join(set(hlfx_pool_daily))
  271. # 存档入库
  272. db_pool = pymysql.connect(host='localhost',
  273. user='root',
  274. port=3307,
  275. password='r6kEwqWU9!v3',
  276. database='hlfx_pool')
  277. cursor_pool = db_pool.cursor()
  278. sql = "INSERT INTO %s (date,value) VALUES('%s','%s')" % (fre, dt.now().strftime('%Y-%m-%d %H:%M:%S'), results_list)
  279. cursor_pool.execute(sql)
  280. db_pool.commit()
  281. # 存档入库daily_5m
  282. db_pool2 = pymysql.connect(host='localhost',
  283. user='root',
  284. port=3307,
  285. password='r6kEwqWU9!v3',
  286. database='hlfx_pool')
  287. cursor_pool2 = db_pool2.cursor()
  288. sql2 = "INSERT INTO daily_%s (date,value) VALUES('%s','%s')" % (fre, dt.now().strftime('%Y-%m-%d %H:%M:%S'),
  289. results_list_daily)
  290. cursor_pool2.execute(sql2)
  291. db_pool2.commit()
  292. edtime = dt.now()
  293. print(edtime - sttime)
  294. def to_sql(stock_list):
  295. print(f'{dt.now()}开始循环入库! MyPid is {os.getpid()}')
  296. m = 0
  297. for stock in stock_list:
  298. eng_w = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/5m_stocks_whole?charset=utf8',
  299. pool_recycle=3600, pool_pre_ping=True, pool_size=1)
  300. # 后复权数据
  301. data_back = xtdata.get_market_data(field, [stock], '5m', end_time='', count=-1, dividend_type='back')
  302. df_back = pd.concat([data_back[i].loc[stock].T for i in ['time', 'open', 'high', 'low', 'close', 'volume',
  303. 'amount']], axis=1)
  304. df_back.columns = ['time', 'open_back', 'high_back', 'low_back', 'close_back', 'volume_back', 'amount_back']
  305. df_back['time'] = df_back['time'].apply(lambda x: dt.fromtimestamp(x / 1000.0))
  306. df_back.reset_index(drop=True, inplace=True)
  307. # 前复权数据
  308. data_front = xtdata.get_market_data(field, [stock], '5m', end_time='', count=-1, dividend_type='front')
  309. df_front = pd.concat([data_front[i].loc[stock].T for i in ['time', 'open', 'high', 'low', 'close', 'volume',
  310. 'amount']], axis=1)
  311. df_front.columns = ['time', 'open_front', 'high_front', 'low_front', 'close_front', 'volume_front',
  312. 'amount_front']
  313. df_front['time'] = df_front['time'].apply(lambda x: dt.fromtimestamp(x / 1000.0))
  314. df = pd.merge_asof(df_back, df_front, 'time')
  315. # print(df)
  316. try:
  317. # eng_w.connect().execute(text("truncate table `%s_5m`" % stock))
  318. df.to_sql('%s_5m' % stock, con=eng_w, index=False, if_exists='replace', chunksize=5000)
  319. except BaseException as e:
  320. print(stock, e)
  321. pass
  322. else:
  323. m += 1
  324. eng_w.dispose()
  325. print(f'Pid:{os.getpid()}已经完工了.应入库{len(stock_list)},共入库{m}支个股')
  326. def download_data():
  327. stock_list = xtdata.get_stock_list_in_sector('沪深A股')
  328. stock_list.sort()
  329. print(dt.now(), '开始下载!')
  330. # xtdata.download_history_data2(stock_list=stock_list, period='5m', start_time='', end_time='')
  331. print(dt.now(), '下载完成,准备入库!')
  332. # step = math.ceil(len(stock_list) / mp.cpu_count())
  333. # pool = mp.Pool(processes=mp.cpu_count())
  334. # pool = mp.Pool(processes=12)
  335. # step = math.ceil(len(stock_list) / 12)
  336. # for i in range(0, len(stock_list), step):
  337. # pool.apply_async(func=to_sql, args=(stock_list[i:i+step],), error_callback=err_call_back)
  338. # pool.close()
  339. # pool.join()
  340. ind()
  341. print(f'今日数据下载完毕 {dt.now()}')
  342. if __name__ == '__main__':
  343. field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']
  344. cpu_count = mp.cpu_count()
  345. pus = psutil.Process()
  346. pus.cpu_affinity([8, 9, 10, 11, 16, 17, 18, 19, 20, 21, 22, 23])
  347. download_data()
  348. # scheduler = BlockingScheduler()
  349. # scheduler.add_job(func=download_data, trigger='cron', day_of_week='0-4', hour='23', minute='05',
  350. # timezone="Asia/Shanghai", max_instances=10)
  351. # try:
  352. # scheduler.start()
  353. # except (KeyboardInterrupt, SystemExit):
  354. # pass