# qmt_real_hlfx.py (15 KB)
  1. # coding:utf-8
  2. from jqdatasdk import *
  3. import pandas as pd
  4. import pymysql
  5. from sqlalchemy import create_engine
  6. import threading
  7. from datetime import datetime as dt
  8. from jqdatasdk.technical_analysis import *
  9. from xtquant import xtdata, xtconstant
  10. from xtquant.xttype import StockAccount
  11. from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
  12. import time
  13. import math
  14. import multiprocessing as mp
  15. import os
  16. #原始版本
  17. # auth('18616891214', 'Ea?*7f68nD.dafcW34d!')
  18. # auth('18521506014', 'Abc123!@#')
  19. # stocks = list(get_all_securities(['stock'], date=dt.today().strftime('%Y-%m-%d')).index)
  20. # stocks = stocks[0:200]
  21. pd.set_option('display.max_columns', None) # 设置显示最大行
  22. fre = '1d'
  23. class MyXtQuantTraderCallback(XtQuantTraderCallback):
  24. def on_disconnected(self):
  25. """
  26. 连接断开
  27. :return:
  28. """
  29. print(datetime.datetime.now(), '连接断开回调')
  30. def on_stock_order(self, order):
  31. """
  32. 委托回报推送
  33. :param order: XtOrder对象
  34. :return:
  35. """
  36. print(datetime.datetime.now(), '委托回调', order.order_remark)
  37. def on_stock_trade(self, trade):
  38. """
  39. 成交变动推送
  40. :param trade: XtTrade对象
  41. :return:
  42. """
  43. print(datetime.datetime.now(), '成交回调', trade.order_remark)
  44. def on_order_error(self, order_error):
  45. """
  46. 委托失败推送
  47. :param order_error:XtOrderError 对象
  48. :return:
  49. """
  50. # print("on order_error callback")
  51. # print(order_error.order_id, order_error.error_id, order_error.error_msg)
  52. print(f"委托报错回调 {order_error.order_remark} {order_error.error_msg}")
  53. def on_cancel_error(self, cancel_error):
  54. """
  55. 撤单失败推送
  56. :param cancel_error: XtCancelError 对象
  57. :return:
  58. """
  59. print(datetime.datetime.now(), sys._getframe().f_code.co_name)
  60. def on_order_stock_async_response(self, response):
  61. """
  62. 异步下单回报推送
  63. :param response: XtOrderResponse 对象
  64. :return:
  65. """
  66. print(f"异步委托回调 {response.order_remark}")
  67. def on_cancel_order_stock_async_response(self, response):
  68. """
  69. :param response: XtCancelOrderResponse 对象
  70. :return:
  71. """
  72. print(datetime.datetime.now(), sys._getframe().f_code.co_name)
  73. def on_account_status(self, status):
  74. """
  75. :param response: XtAccountStatus 对象
  76. :return:
  77. """
  78. print(datetime.datetime.now(), sys._getframe().f_code.co_name)
  79. def err_call_back(err):
  80. print(f'问题在这里~ error:{str(err)}')
  81. def hlfx(data, stocks, pool_list):
  82. engine_stock = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8')
  83. print(stocks)
  84. for qmt_stock in stocks:
  85. # 读取qmt_stocks_whole表-前复权-信息
  86. try:
  87. df_day = pd.read_sql_query(
  88. 'select time, open_front, close_front, high_front, low_front, volume_front, amount_front, dif, dea, macd,HL from `%s_%s`'
  89. % (qmt_stock, fre), engine_stock)
  90. df_day.columns=['time', 'open', 'close', 'high', 'low', 'volume', 'amount', 'dif', 'dea', 'macd','HL']
  91. except BaseException:
  92. print(qmt_stock)
  93. pass
  94. else:
  95. # 获得最新价格信息
  96. get_price = data[qmt_stock]
  97. # 调整time时间格式
  98. get_price['time'] = dt.fromtimestamp(get_price['time'] / 1000.0)
  99. # print('成功判定', get_price['time'])
  100. # 先处理去包含
  101. # 不包含
  102. if (df_day.iloc[-1, 3] > get_price['high']
  103. and df_day.iloc[-1, 4] > get_price['low']) \
  104. or (df_day.iloc[-1, 3] < get_price['high']
  105. and df_day.iloc[-1, 4] < get_price['low']):
  106. # print('lalallala', get_price['open'], get_price['lastPrice'], get_price['high'],
  107. # get_price['low'], get_price['volume'], get_price['amount'])
  108. qmt_df = pd.DataFrame(data=[[get_price['time'], get_price['open'], get_price['lastPrice'], get_price['high'],
  109. get_price['low'], get_price['volume'], get_price['amount']]],
  110. columns=['time', 'open', 'close', 'high', 'low', 'volume', 'amount'])
  111. # print('qmt_______', qmt_df)
  112. df_day = pd.concat([df_day, qmt_df], ignore_index=True)
  113. # print('不包含,合并完成', df_day)
  114. # 包含
  115. else:
  116. # 左高,下降
  117. if df_day.iloc[-2, 3] > df_day.iloc[-1, 3]:
  118. df_day.iloc[-1, 3] = min(df_day.iloc[-1, 3], get_price['high'])
  119. df_day.iloc[-1, 4] = min(df_day.iloc[-1, 4], get_price['low'])
  120. # 右高,上升
  121. else:
  122. df_day.iloc[-1, 3] = max(df_day.iloc[-1, 3], get_price['high'])
  123. df_day.iloc[-1, 4] = max(df_day.iloc[-1, 4], get_price['low'])
  124. # print('包含', df_day)
  125. # 数合并完成,确认df_day
  126. # print(df_day)
  127. # 寻找顶底分型
  128. x = len(df_day.index)-1
  129. m = x - 1
  130. # 底
  131. if ((df_day.loc[x, 'high'] > df_day.loc[x - 1, 'high']) and (
  132. df_day.loc[x - 2, 'high'] > df_day.loc[x - 1, 'high'])):
  133. df_day.loc[x, 'HL'] = 'L*'
  134. # 判断底的性质
  135. while m:
  136. if df_day.loc[m, 'HL'] in ['H', 'HH', 'H*']:
  137. if (x - m) > 3:
  138. # 成笔——>L
  139. df_day.loc[x, 'HL'] = 'L'
  140. elif df_day.loc[m, 'HL'] == 'L':
  141. if df_day.loc[m - 1, 'low'] > df_day.loc[x - 1, 'low']:
  142. # pool_list.append(qmt_stock)
  143. # 获得MACD,判断MACD判断背驰
  144. x_macd_dif, x_macd_dea, x_macd_macd = df_day.loc[x, 'dif'], df_day.loc[x, 'dea'], \
  145. df_day.loc[x, 'macd']
  146. m_macd_dif, m_macd_dea, m_macd_macd = df_day.loc[m, 'dif'], df_day.loc[m, 'dea'], \
  147. df_day.loc[m, 'macd']
  148. # 背驰底->LL
  149. if m_macd_dif < x_macd_dif:
  150. df_day.loc[x, 'HL'] = 'LL'
  151. # 产生信号,进入hlfx_pool
  152. pool_list.append(qmt_stock)
  153. # 前一个为底更高,且中间不存在更低的底
  154. else:
  155. df_day.loc[x, 'HL'] = 'L'
  156. # 产生信号,进入hlfx_pool
  157. pool_list.append(qmt_stock)
  158. break
  159. break
  160. m = m - 1
  161. if m == 0:
  162. df_day.loc[x, 'HL'] = 'L'
  163. # 顶
  164. elif (df_day.loc[x, 'high'] < df_day.loc[x - 1, 'high']) and (
  165. df_day.loc[x - 2, 'high'] < df_day.loc[x - 1, 'high']) and (qmt_stock in pool_list):
  166. df_day.loc[x, 'HL'] = 'H*'
  167. while m:
  168. if df_day.loc[m, 'HL'] in ['L', 'LL', 'L*']:
  169. if x - m > 3:
  170. # 成笔->H
  171. df_day.loc[x, 'HL'] = 'H'
  172. # 产生卖出信号,进入hlfx_pool
  173. pool_list.remove(qmt_stock)
  174. break
  175. elif (df_day.loc[m, 'HL'] == 'H'):
  176. if df_day.loc[x - 1, 'high'] > df_day.loc[m - 1, 'high']:
  177. # 获得MACD,判断MACD判断背驰
  178. x_macd_dif, x_macd_dea, x_macd_macd = df_day.loc[x, 'dif'], df_day.loc[x, 'dea'], \
  179. df_day.loc[x, 'macd']
  180. m_macd_dif, m_macd_dea, m_macd_macd = df_day.loc[m, 'dif'], df_day.loc[m, 'dea'], \
  181. df_day.loc[m, 'macd']
  182. # MACD顶背驰
  183. if x_macd_dif < m_macd_dif:
  184. df_day.loc[x, 'HL'] = 'HH'
  185. # 产生卖出信号,进入hlfx_pool
  186. pool_list.remove(qmt_stock)
  187. # 前一个为顶,且中间存在不包含 or 更高的顶
  188. else:
  189. df_day.loc[x, 'HL'] = 'H'
  190. # 产生卖出信号,进入hlfx_pool
  191. pool_list.remove(qmt_stock)
  192. break
  193. break
  194. m = m - 1
  195. if m == 0:
  196. df_day.loc[x, 'HL'] = 'H'
  197. def bridge(data):
  198. # 连接数据库
  199. '''
  200. db = pymysql.connect(host='localhost',
  201. user='root',
  202. port=3307,
  203. password='r6kEwqWU9!v3',
  204. database='hlfx')
  205. cursor = db.cursor()
  206. cursor.execute("show tables like '%%%s%%' " % fre)
  207. pool_list = [tuple[0] for tuple in cursor.fetchall()]
  208. print('取得 table_list %s' % fre)
  209. '''
  210. '''
  211. 1.获取hlfx_pool中隔夜的标的
  212. 2.将本此的data均分,给到进程池
  213. 3.将data总数据、分配的任务stocklist、hlfx_pool 送入realtime_hlfx中进行计算
  214. 4.将实时刷新的hlfx存入hlfx_pool 以过滤出现顶分型的标的
  215. '''
  216. # 获得hlfx_pool池子
  217. engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8')
  218. results = mp.Manager().list()
  219. results.extend(pd.read_sql_query(
  220. 'select value from `%s`' % fre, engine_hlfx_pool).iloc[-1, 0].split(","))
  221. print(results)
  222. to_hlfx_list = []
  223. keys = list(data.keys())
  224. step = math.ceil(len(keys) / (mp.cpu_count()/2))
  225. for i in range(0, len(keys), step):
  226. to_hlfx_list.append([x for x in keys[i:i+step]])
  227. pool = mp.Pool(processes=int(mp.cpu_count()/2))
  228. for m in range(int(mp.cpu_count()/2)):
  229. pool.apply_async(func=hlfx,
  230. args=(data, to_hlfx_list[m], results,), error_callback=err_call_back)
  231. pool.close()
  232. pool.join()
  233. db_pool = pymysql.connect(host='localhost',
  234. user='root',
  235. port=3307,
  236. password='r6kEwqWU9!v3',
  237. database='hlfx_pool')
  238. cursor_pool = db_pool.cursor()
  239. print(set(results))
  240. results_list = ','.join(set(results))
  241. sql = "INSERT INTO %s (date,value) VALUES('%s','%s')" % (fre, dt.now().strftime('%Y-%m-%d %H:%M:%S'), results_list)
  242. cursor_pool.execute(sql)
  243. db_pool.commit()
  244. print(f'{dt.now()}写入新的results,hlfx_pool更新')
  245. # hlfx(data, engine_stock, engine_hlfx)
  246. pass
  247. def prepare():
  248. engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8')
  249. results = pd.read_sql_query(
  250. 'select value from `%s`' % fre, engine_hlfx_pool).iloc[-1, 0].split(",")
  251. results = [x.replace('XSHG', 'SH').replace('XSHE', 'SZ') for x in results]
  252. print('数据库读取,并转化后缀格式', len(results))
  253. # print(results[0:10])
  254. return results
  255. if __name__ == '__main__':
  256. path = r'c:\\qmt\\userdata_mini'
  257. # 生成session id 整数类型 同时运行的策略不能重复
  258. session_id = int(time.time())
  259. xt_trader = XtQuantTrader(path, session_id)
  260. # 创建资金账号为 800068 的证券账号对象
  261. acc = StockAccount('920000207040', 'SECURITY')
  262. # 创建交易回调类对象,并声明接收回调
  263. callback = MyXtQuantTraderCallback()
  264. xt_trader.register_callback(callback)
  265. # 启动交易线程
  266. xt_trader.start()
  267. # 建立交易连接,返回0表示连接成功
  268. connect_result = xt_trader.connect()
  269. print('建立交易连接,返回0表示连接成功', connect_result)
  270. # 对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功
  271. subscribe_result = xt_trader.subscribe(acc)
  272. print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)
  273. stocks = xtdata.get_stock_list_in_sector('沪深A股')
  274. xtdata.subscribe_whole_quote(stocks, callback=bridge)
  275. xtdata.run()
  276. start = dt.now()
  277. while True:
  278. now_date = dt.now()
  279. date_morning_begin = now_date.replace(hour=9, minute=25, second=0)
  280. date_morning_end = now_date.replace(hour=11, minute=31, second=0)
  281. date_afternooe_begin = now_date.replace(hour=13, minute=0, second=0)
  282. date_afternooe_end = now_date.replace(hour=15, minute=0, second=0)
  283. # print(now_date,date_morning_begin,date_morning_end,date_afternooe_begin,date_afternooe_end)
  284. # if date_morning_begin < now_date < date_afternooe_end:
  285. if True:
  286. for fre in ['1d']:
  287. start = dt.now()
  288. stk = locals()
  289. thd = threading.local()
  290. # 进程准备
  291. step = 400
  292. thread_list = []
  293. engine_stock = []
  294. engine_hlfx = []
  295. times_engine = 0
  296. df = get_bars(stocks, count=5, unit=fre,
  297. fields=['date', 'open', 'close', 'high', 'low', 'volume', 'money'], include_now=True, df=True)
  298. print(df, type(df))
  299. print(df.loc['603566.XSHG'])
  300. print(dt.now(), 'get_bars 成功')
  301. exit()
  302. for i in range(0, len(stocks), step):
  303. engine_stock.append(create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/stocks?charset=utf8'))
  304. engine_hlfx.append(create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx?charset=utf8'))
  305. thread = threading.Thread(target=hlfx, args=(stocks[i:i + step], engine_stock[times_engine], engine_hlfx[times_engine]))
  306. times_engine = times_engine + 1
  307. thread.start()
  308. thread_list.append(thread)
  309. for thread in thread_list:
  310. thread.join()
  311. db.close()
  312. time = dt.now().strftime('%Y-%m-%d %H:%M:%S')
  313. results_list =','.join(set(results))
  314. print(set(results))
  315. sql = "INSERT INTO %s (date,value) VALUES('%s','%s')" % (fre, dt.now().strftime('%Y-%m-%d %H:%M:%S'), results_list)
  316. cursor_pool.execute(sql)
  317. db_pool.commit()
  318. print(fre, '\n', '做多:', len(set(results)), set(results))
  319. print('做空', len(set(results_short)), set(results_short))
  320. end= dt.now()
  321. print('总时长:', (end - start).seconds)
  322. elif now_date>date_afternooe_end:
  323. pass
  324. # print("HLFX_收盘了",now_date)
  325. # break