222.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. # coding:utf-8
  2. import pandas as pd
  3. import pymysql
  4. from sqlalchemy import create_engine, text
  5. import threading
  6. from datetime import datetime as dt
  7. import datetime
  8. from jqdatasdk.technical_analysis import *
  9. from xtquant import xtdata, xtconstant
  10. from xtquant.xttype import StockAccount
  11. from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
  12. import time
  13. import math
  14. import multiprocessing as mp
  15. import os
  16. import psutil
  17. import traceback
  18. from apscheduler.schedulers.blocking import BlockingScheduler
  19. import sys
  20. pd.set_option('display.max_columns', None) # 设置显示最大行
  21. # 全局变量 stocks_dict
  22. stocks_dict = {}
  23. stocks = xtdata.get_stock_list_in_sector('沪深A股')
  24. for stock in stocks:
  25. stocks_dict[stock] = []
  26. print(stocks_dict)
  27. class MyXtQuantTraderCallback(XtQuantTraderCallback):
  28. def on_disconnected(self):
  29. """
  30. 连接断开
  31. :return:
  32. """
  33. print(datetime.datetime.now(), '连接断开回调')
  34. def on_stock_order(self, order):
  35. """
  36. 委托回报推送
  37. :param order: XtOrder对象
  38. :return:
  39. """
  40. print(datetime.datetime.now(), '委托回调', order.order_remark)
  41. def on_stock_trade(self, trade):
  42. """
  43. 成交变动推送
  44. :param trade: XtTrade对象
  45. :return:
  46. """
  47. print(datetime.datetime.now(), '成交回调', trade.order_remark)
  48. def on_order_error(self, order_error):
  49. """
  50. 委托失败推送
  51. :param order_error:XtOrderError 对象
  52. :return:
  53. """
  54. # print("on order_error callback")
  55. # print(order_error.order_id, order_error.error_id, order_error.error_msg)
  56. print(f"委托报错回调 {order_error.order_remark} {order_error.error_msg}")
  57. def on_cancel_error(self, cancel_error):
  58. """
  59. 撤单失败推送
  60. :param cancel_error: XtCancelError 对象
  61. :return:
  62. """
  63. print(datetime.datetime.now(), sys._getframe().f_code.co_name)
  64. def on_order_stock_async_response(self, response):
  65. """
  66. 异步下单回报推送
  67. :param response: XtOrderResponse 对象
  68. :return:
  69. """
  70. print(f"异步委托回调 {response.order_remark}")
  71. def on_cancel_order_stock_async_response(self, response):
  72. """
  73. :param response: XtCancelOrderResponse 对象
  74. :return:
  75. """
  76. print(datetime.datetime.now(), sys._getframe().f_code.co_name)
  77. def on_account_status(self, status):
  78. """
  79. :param response: XtAccountStatus 对象
  80. :return:
  81. """
  82. print(datetime.datetime.now(), sys._getframe().f_code.co_name)
  83. def err_call_back(err):
  84. print(f'问题在这里~ error:{str(err)}')
  85. traceback.print_exc()
  86. def run(seq):
  87. mor = datetime.datetime.strptime(
  88. str(dt.now().date()) + '11:30', '%Y-%m-%d%H:%M')
  89. afternoon = datetime.datetime.strptime(
  90. str(dt.now().date()) + '15:00', '%Y-%m-%d%H:%M')
  91. mor_1 = datetime.datetime.strptime(
  92. str(dt.now().date()) + '11:10', '%Y-%m-%d%H:%M')
  93. """阻塞线程接收行情回调"""
  94. import time
  95. client = xtdata.get_client()
  96. while True:
  97. now_date = dt.now()
  98. if not client.is_connected():
  99. xtdata.unsubscribe_quote(seq)
  100. raise Exception('行情服务连接断开')
  101. # if mor < dt.now() < mor_1:
  102. # xtdata.unsubscribe_quote(seq)
  103. # print(f'现在时间:{dt.now()},已休市')
  104. # sys.exit()
  105. # break
  106. # return 0
  107. elif dt.now() > afternoon:
  108. xtdata.unsubscribe_quote(seq)
  109. print(f'现在时间:{dt.now()},已收盘')
  110. sys.exit()
  111. break
  112. return
  113. def speed(stock_list, datas):
  114. global stocks_dict
  115. print('..............................',stocks_dict)
  116. for i in range(len(stock_list)):
  117. stock = stock_list[i]
  118. try:
  119. stocks_dict[stock].append(datas[stock]['lastPrice'])
  120. except BaseException as e:
  121. print(e)
  122. print(stock, stocks_dict[stock], datas[stock]['lastPrice'])
  123. # print(datas[i]['lastPrice'], stocks_dict[i])
  124. '''
  125. for stock in stock_list:
  126. print(stock, datas[stock]['lastPrice'])
  127. print(stocks_dict[stock])
  128. try:
  129. stocks_dict[stock].append(datas[stock]['lastPrice'])
  130. except BaseException as e:
  131. print(e)
  132. print(stock, stocks_dict[stock], datas[stock]['lastPrice'])
  133. '''
  134. def prepare(datas):
  135. for k, v in datas.items():
  136. # print(k)
  137. stocks_dict[k].append(v['lastPrice'])
  138. print(k, stocks_dict[k], v['lastPrice'])
  139. # stocks_dict[stock] = datas[stock]['lastPrice']
  140. # 将数据添加至全局变量 stocks_dict[stock] key为股票代码,值为最新价
  141. # print(stocks_dict[stock])
  142. '''
  143. stock_list = list(datas.keys())
  144. if len(datas.keys()) >= 12:
  145. cpu_count = 12
  146. else:
  147. cpu_count = len(datas.keys())
  148. step = math.ceil(len(stock_list) / cpu_count)
  149. to_list = []
  150. for i in range(0, len(stock_list), step):
  151. to_list.append([x for x in stock_list[i:i + step]])
  152. pool = mp.Pool(processes=int(cpu_count/2))
  153. for m in range(len(to_list)):
  154. pool.apply_async(func=speed,
  155. args=(to_list[m], datas), error_callback=err_call_back)
  156. pool.close()
  157. pool.join()
  158. '''
  159. def bridge():
  160. # 全局变量 stocks_dict
  161. global stocks
  162. print(f'bridge is {os.getpid()}, now is {dt.now()},开盘了')
  163. # stocks_dict = dict.fromkeys(stocks, [])
  164. # print(stocks_dict)
  165. seq = xtdata.subscribe_whole_quote(stocks, callback=prepare)
  166. # 建立一个以stocks中值为键,值为空列表的字典
  167. # print(f'stocks_dict is {stocks_dict}')
  168. run(seq)
  169. if __name__ == '__main__':
  170. print(f'总进程pid:{os.getpid()}')
  171. mp.freeze_support()
  172. pus = psutil.Process()
  173. # pus.cpu_affinity([0, 1, 2, 3, 4, 5, 6, 7])
  174. path = r'c:\\qmt\\userdata_mini'
  175. # 生成session id 整数类型 同时运行的策略不能重复
  176. session_id = int(time.time())
  177. xt_trader = XtQuantTrader(path, session_id)
  178. # 创建资金账号为 800068 的证券账号对象
  179. acc = StockAccount('920000207040', 'SECURITY')
  180. # 创建交易回调类对象,并声明接收回调
  181. callback = MyXtQuantTraderCallback()
  182. xt_trader.register_callback(callback)
  183. # 启动交易线程
  184. xt_trader.start()
  185. # 建立交易连接,返回0表示连接成功
  186. connect_result = xt_trader.connect()
  187. print('建立交易连接,返回0表示连接成功', connect_result)
  188. # 对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功
  189. subscribe_result = xt_trader.subscribe(acc)
  190. print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)
  191. bridge()
  192. scheduler = BlockingScheduler()
  193. scheduler.add_job(func=bridge, trigger='cron', day_of_week='0-4', hour='09', minute='25',
  194. timezone="Asia/Shanghai", max_instances=5)
  195. # # scheduler.add_job(func=job_func, trigger='cron', day_of_week='0-4', hour='13', minute='00',
  196. # # timezone="Asia/Shanghai")
  197. try:
  198. scheduler.start()
  199. except (KeyboardInterrupt, SystemExit):
  200. pass