real_time.py 11 KB


  1. # coding:utf-8
  2. from datetime import datetime as dt
  3. import os
  4. import pandas as pd
  5. from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
  6. from xtquant.xttype import StockAccount
  7. from xtquant import xtdata, xtconstant
  8. import time
  9. from sqlalchemy import create_engine
  10. from jqdatasdk import *
  11. import pymysql
  12. import multiprocessing as mp
  13. import math
  14. auth('18616891214', 'Ea?*7f68nD.dafcW34d!')
  15. db_pool = pymysql.connect(host='localhost',
  16. user='root',
  17. port=3307,
  18. password='r6kEwqWU9!v3',
  19. database='hlfx_pool')
  20. cursor_pool = db_pool.cursor()
  21. engine_stock = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_front?charset=utf8')
  22. def real_price(datas):
  23. # i = '000001.SZ'
  24. for i in datas:
  25. if i == '000001.SZ':
  26. print(i, datas[i])
  27. # trader(datas)
  28. # return datas
  29. def ma(stock, num, data):
  30. global engine_stock
  31. try:
  32. i = (num - 1) * -1
  33. df = pd.read_sql_query(
  34. 'select close from `%s_1d`' % stock, engine_stock)
  35. except:
  36. return 9999999
  37. else:
  38. ma_num = (sum(df['close'][i:]) + data[stock]['lastPrice'])/num
  39. return ma_num
  40. def ma_1(stock, num):
  41. global engine_stock
  42. i = (num) * -1
  43. try:
  44. df = pd.read_sql_query(
  45. 'select close from `%s_1d`' % stock, engine_stock)
  46. except BaseException:
  47. return 9999999
  48. else:
  49. ma_num_1 = df['close'][i:].mean()
  50. return ma_num_1
  51. def his_vol(stock, num):
  52. global engine_stock
  53. num = num * -1
  54. try:
  55. df = pd.read_sql_query(
  56. 'select volume from `%s_1d`' % stock, engine_stock)
  57. except BaseException:
  58. return 9999999
  59. else:
  60. return df['volume'].iloc[num]
  61. def ma_judge(data, stock_list, results):
  62. print('这个ma_judge的PID为:', os.getpid())
  63. for stock in data:
  64. i = stock.replace('XSHG', 'SH').replace('XSHE', 'SZ')
  65. current_price, open_price = data[i]['lastPrice'], data[i]['open']
  66. MA5, MA10, MA20 = ma(i, 5, data), ma(i, 10, data), ma(i, 20, data)
  67. MA5_1 = ma_1(i, 5)
  68. # print(i, current_price, open_price, MA5, MA10, MA20, MA5_1)
  69. if (current_price > open_price) & (current_price > MA5) & (MA5 > MA5_1) & (current_price < MA5 * 1.03) & (
  70. MA20 < MA10):
  71. if his_vol(i, -1) > his_vol(i, -2):
  72. results.append(i.replace('SH', 'XSHG').replace('SZ', 'XSHE'))
  73. print('RRRRRRR,', results)
  74. def sell_trader(data, positions_dict):
  75. # for m in data:
  76. # print(m, data[m]['lastPrice'])
  77. print('卖出函数:', dt.now())
  78. # positions = xt_trader.query_stock_positions(acc)
  79. # print('持仓总数:', len(positions_list))
  80. for stock, volume in positions_dict.items():
  81. print('持仓', stock, volume)
  82. current_price = data[stock]['lastPrice']
  83. open_price = data[stock]['open']
  84. MA5 = ma(stock, 5, data)
  85. MA5_1 = ma_1(stock, 5)
  86. print('价格:', current_price, open_price, MA5, MA5_1)
  87. if current_price < MA5 or MA5 < MA5_1 or current_price > MA5 * 1.07:
  88. print('卖出信号!!!!!!', stock, current_price)
  89. order_id = xt_trader.order_stock(acc, stock, xtconstant.STOCK_SELL, volume,
  90. xtconstant.LATEST_PRICE, 0, 'strategy1', 'order_test')
  91. print(order_id, stock, volume)
  92. def buy_trader(data):
  93. print('买入函数:', dt.now())
  94. results = mp.Manager().list()
  95. mp_list = []
  96. engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8')
  97. try:
  98. stock_pool = pd.read_sql_query(
  99. 'select value from `%s`' % '1d', engine_hlfx_pool)
  100. stock_pool = stock_pool.iloc[-1, 0].split(",")
  101. stock_pool.sort()
  102. print('stock_pool', stock_pool)
  103. except BaseException:
  104. pass
  105. '''
  106. for stock in data:
  107. if stock.replace('SH', 'XSHG').replace('SZ', 'XSHE') in stock_pool:
  108. # 真实买入策略
  109. current_price, open_price = data[stock]['lastPrice'], data[stock]['open']
  110. MA5, MA10, MA20 = ma(stock, 5), ma(stock, 10), ma(stock, 20)
  111. MA5_1 = ma_1(stock, 5)
  112. print(stock, current_price, open_price, MA5, MA10, MA20, MA5_1)
  113. if (current_price > open_price) & (current_price > MA5) & (MA5 > MA5_1) & (current_price < MA5 * 1.03) & (MA20 < MA10):
  114. if his_vol(stock, -1) > his_vol(stock, -2):
  115. results.append(stock.replace('SH', 'XSHG').replace('SZ', 'XSHE'))
  116. print('append')
  117. '''
  118. step = math.ceil(len(stock_pool) / mp.cpu_count())
  119. print('step:', step)
  120. print('cpu_count =', mp.cpu_count())
  121. for i in range(0, len(stock_pool), math.ceil(len(stock_pool) / mp.cpu_count())):
  122. p = mp.Process(target=ma_judge, args=(data, stock_pool[i:i + step], results))
  123. mp_list.append(p)
  124. p.start()
  125. for j in mp_list:
  126. j.join()
  127. results = list(set(results))
  128. print('results!!!!', len(results))
  129. # 选择板块
  130. if len(results) != 0:
  131. num_industry = get_industry(results)
  132. print(num_industry)
  133. industry_list = []
  134. for key in num_industry.values():
  135. for key2 in key.values():
  136. industry_list.append(key2['industry_name'])
  137. industry_list = pd.value_counts(industry_list)
  138. # 最热集中的n个板块
  139. max_industry_list = list(industry_list[0:2].index)
  140. results_industry = []
  141. for key, value in num_industry.items():
  142. for key2 in value.values():
  143. if key2['industry_name'] in max_industry_list:
  144. results_industry.append(key)
  145. print('所有:', set(results_industry))
  146. results_industry = ','.join(set(results_industry))
  147. print('1d', '\n', results_industry)
  148. sql = "INSERT INTO MA5_%s (date,value) VALUES('%s','%s')" % ('1d', dt.now().strftime('%Y-%m-%d %H:%M:%S'),
  149. results_industry)
  150. cursor_pool.execute(sql)
  151. db_pool.commit()
  152. # print(len(results_industry), results_industry)
  153. print(dt.now(), '数据库数据已赋值!')
  154. # 取值交易
  155. keep_stocks = results_industry.split(",")
  156. new_keep_stock = [stock.replace('XSHG', 'SH').replace('XSHE', 'SZ') for stock in keep_stocks]
  157. print(new_keep_stock)
  158. for stock in data:
  159. asset = xt_trader.query_stock_asset(acc)
  160. cash = asset.cash
  161. print(cash)
  162. if stock in new_keep_stock:
  163. current_price = data[stock]['lastPrice']
  164. if cash > 2000:
  165. volume = int((cash / 3 / current_price) // 100 * 100)
  166. print('volume:', volume)
  167. print('买入信号!!!!!!', stock, volume, current_price)
  168. order_id = xt_trader.order_stock(acc, stock, xtconstant.STOCK_BUY, volume, xtconstant.LATEST_PRICE, current_price, 'strategy1', 'order_test')
  169. print(order_id)
  170. print('一轮结束了,现在时间是:', dt.now())
  171. def trader(data):
  172. print(len(data.keys()))
  173. # 先判断卖出条件
  174. positions = xt_trader.query_stock_positions(acc)
  175. print(type(positions))
  176. print('持仓数量', len(positions))
  177. if len(positions) != 0:
  178. positions_dict = {positions[x].stock_code:positions[x].volume for x in range(0, len(positions))}
  179. print(positions_dict)
  180. sell_trader(data, positions_dict)
  181. # 买入条件
  182. buy_trader(data)
  183. class MyXtQuantTraderCallback(XtQuantTraderCallback):
  184. def on_disconnected(self):
  185. """
  186. 连接断开
  187. :return:
  188. """
  189. print(datetime.datetime.now(), '连接断开回调')
  190. def on_stock_order(self, order):
  191. """
  192. 委托回报推送
  193. :param order: XtOrder对象
  194. :return:
  195. """
  196. print(datetime.datetime.now(), '委托回调', order.order_remark)
  197. def on_stock_trade(self, trade):
  198. """
  199. 成交变动推送
  200. :param trade: XtTrade对象
  201. :return:
  202. """
  203. print(datetime.datetime.now(), '成交回调', trade.order_remark)
  204. def on_order_error(self, order_error):
  205. """
  206. 委托失败推送
  207. :param order_error:XtOrderError 对象
  208. :return:
  209. """
  210. # print("on order_error callback")
  211. # print(order_error.order_id, order_error.error_id, order_error.error_msg)
  212. print(f"委托报错回调 {order_error.order_remark} {order_error.error_msg}")
  213. def on_cancel_error(self, cancel_error):
  214. """
  215. 撤单失败推送
  216. :param cancel_error: XtCancelError 对象
  217. :return:
  218. """
  219. print(datetime.datetime.now(), sys._getframe().f_code.co_name)
  220. def on_order_stock_async_response(self, response):
  221. """
  222. 异步下单回报推送
  223. :param response: XtOrderResponse 对象
  224. :return:
  225. """
  226. print(f"异步委托回调 {response.order_remark}")
  227. def on_cancel_order_stock_async_response(self, response):
  228. """
  229. :param response: XtCancelOrderResponse 对象
  230. :return:
  231. """
  232. print(datetime.datetime.now(), sys._getframe().f_code.co_name)
  233. def on_account_status(self, status):
  234. """
  235. :param response: XtAccountStatus 对象
  236. :return:
  237. """
  238. print(datetime.datetime.now(), sys._getframe().f_code.co_name)
  239. if __name__ == '__main__':
  240. auth('18616891214', 'Ea?*7f68nD.dafcW34d!')
  241. print("start")
  242. # 指定客户端所在路径
  243. path = r'c:\\qmt\\userdata_mini'
  244. # 生成session id 整数类型 同时运行的策略不能重复
  245. session_id = int(time.time())
  246. xt_trader = XtQuantTrader(path, session_id)
  247. # 创建资金账号为 800068 的证券账号对象
  248. acc = StockAccount('920000207040', 'SECURITY')
  249. # 创建交易回调类对象,并声明接收回调
  250. callback = MyXtQuantTraderCallback()
  251. xt_trader.register_callback(callback)
  252. # 启动交易线程
  253. xt_trader.start()
  254. # 建立交易连接,返回0表示连接成功
  255. connect_result = xt_trader.connect()
  256. print('建立交易连接,返回0表示连接成功', connect_result)
  257. # 对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功
  258. subscribe_result = xt_trader.subscribe(acc)
  259. print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)
  260. stocks = xtdata.get_stock_list_in_sector('沪深A股')
  261. xtdata.subscribe_whole_quote(stocks, callback=trader)
  262. xtdata.run()
  263. # xtdata.subscribe_quote('000001.SZ', '1d', '', '', count=1, callback=MA)