230504_real_time.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414
  1. # coding:utf-8
  2. from datetime import datetime as dt
  3. import os
  4. import pandas as pd
  5. from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
  6. from xtquant.xttype import StockAccount
  7. from xtquant import xtdata, xtconstant
  8. import time
  9. from sqlalchemy import create_engine, text
  10. from jqdatasdk import *
  11. import pymysql
  12. import multiprocessing as mp
  13. import math
  14. import psutil
  15. import datetime
  16. from apscheduler.schedulers.blocking import BlockingScheduler
  17. import sys
  18. # 指定客户端所在路径
  19. path = r'c:\\qmt\\userdata_mini'
  20. # 创建资金账号为 800068 的证券账号对象
  21. acc = StockAccount('920000207040', 'SECURITY')
  22. # 生成session id 整数类型 同时运行的策略不能重复
  23. session_id = 123456
  24. xt_trader = None
  25. engine_stock = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8',
  26. pool_size=5000, pool_recycle=50, max_overflow=-1)
  27. auth('18616891214', 'Ea?*7f68nD.dafcW34d!')
  28. class MyXtQuantTraderCallback(XtQuantTraderCallback):
  29. def on_disconnected(self):
  30. """
  31. 连接断开
  32. :return:
  33. """
  34. print(datetime.datetime.now(), '连接断开回调')
  35. def on_stock_order(self, order):
  36. """
  37. 委托回报推送
  38. :param order: XtOrder对象
  39. :return:
  40. """
  41. print(datetime.datetime.now(), '委托回调', order.order_remark)
  42. def on_stock_trade(self, trade):
  43. """
  44. 成交变动推送
  45. :param trade: XtTrade对象
  46. :return:
  47. """
  48. print(datetime.datetime.now(), '成交回调', trade.order_remark)
  49. def on_order_error(self, order_error):
  50. """
  51. 委托失败推送
  52. :param order_error:XtOrderError 对象
  53. :return:
  54. """
  55. # print("on order_error callback")
  56. # print(order_error.order_id, order_error.error_id, order_error.error_msg)
  57. print(f"委托报错回调 {order_error.order_remark} {order_error.error_msg}")
  58. def on_cancel_error(self, cancel_error):
  59. """
  60. 撤单失败推送
  61. :param cancel_error: XtCancelError 对象
  62. :return:
  63. """
  64. print(datetime.datetime.now(), sys._getframe().f_code.co_name)
  65. def on_order_stock_async_response(self, response):
  66. """
  67. 异步下单回报推送
  68. :param response: XtOrderResponse 对象
  69. :return:
  70. """
  71. print(f"异步委托回调 {response.order_remark}")
  72. def on_cancel_order_stock_async_response(self, response):
  73. """
  74. :param response: XtCancelOrderResponse 对象
  75. :return:
  76. """
  77. print(datetime.datetime.now(), sys._getframe().f_code.co_name)
  78. def on_account_status(self, status):
  79. """
  80. :param response: XtAccountStatus 对象
  81. :return:
  82. """
  83. print(datetime.datetime.now(), sys._getframe().f_code.co_name)
  84. def run(seq, pid):
  85. mor = datetime.datetime.strptime(
  86. str(dt.now().date()) + '11:30', '%Y-%m-%d%H:%M')
  87. afternoon = datetime.datetime.strptime(
  88. str(dt.now().date()) + '15:00', '%Y-%m-%d%H:%M')
  89. mor_1 = datetime.datetime.strptime(
  90. str(dt.now().date()) + '12:59', '%Y-%m-%d%H:%M')
  91. """阻塞线程接收行情回调"""
  92. import time
  93. client = xtdata.get_client()
  94. while True:
  95. time.sleep(3)
  96. now_date = dt.now()
  97. if not client.is_connected():
  98. xtdata.unsubscribe_quote(seq)
  99. raise Exception('行情服务连接断开')
  100. # if mor < dt.now() < mor_1:
  101. # xtdata.unsubscribe_quote(seq)
  102. # print(f'现在时间:{dt.now()},已休市')
  103. # sys.exit()
  104. # break
  105. # return 0
  106. elif dt.now() > afternoon:
  107. xtdata.unsubscribe_quote(seq)
  108. print(f'现在时间:{dt.now()},已收盘')
  109. sys.exit()
  110. break
  111. # return 0
  112. return
  113. def get_fundamentals(results):
  114. return results
  115. pass
  116. def ma(stock, num, data):
  117. global engine_stock
  118. try:
  119. i = (num - 1) * -1
  120. df = pd.read_sql_query(text(
  121. 'select close_front from `%s_1d`' % stock), engine_stock.connect())
  122. except BaseException as e:
  123. print(e)
  124. return 9999999
  125. else:
  126. ma_num = (sum(df['close_front'][i:]) + data[stock]['lastPrice']) / num
  127. return ma_num
  128. def ma_1(stock, num):
  129. global engine_stock
  130. i = num * -1
  131. try:
  132. df = pd.read_sql_query(text(
  133. 'select close_front from `%s_1d`' % stock), engine_stock.connect())
  134. except BaseException as e:
  135. print(e)
  136. return 9999999
  137. else:
  138. ma_num_1 = df['close_front'][i:].mean()
  139. return ma_num_1
  140. def his_vol(stock, num):
  141. global engine_stock
  142. num = num * -1
  143. try:
  144. df = pd.read_sql_query(text(
  145. 'select volume_front from `%s_1d`' % stock), engine_stock.connect())
  146. except BaseException:
  147. return 9999999
  148. else:
  149. return df['volume_front'].iloc[num]
  150. def ma_judge(data, list_judge, rate, results):
  151. # print(f'这个ma_judge的PID为:{os.getpid()},本轮计算:{len(list_judge)}个股')
  152. for stock in list_judge:
  153. current_price, open_price = data[stock]['lastPrice'], data[stock]['open']
  154. MA5, MA10, MA20, MA30, MA60, MA120 = ma(stock, 5, data), ma(stock, 10, data), ma(stock, 20, data), ma(stock, 30,
  155. data), \
  156. ma(stock, 60, data), ma(stock, 120, data)
  157. MA5_1 = ma_1(stock, 5)
  158. # print(i, current_price, open_price, MA5, MA10, MA20, MA5_1)
  159. # 入交易池标准:阳线\大于MA5\MA5向上\MA20<MA10\离120线有距离
  160. if (current_price > open_price) & (current_price > MA5) & (MA5 > MA5_1) & (current_price < MA5 * 1.05) \
  161. & (current_price > MA120 or current_price < MA120 * rate):
  162. if his_vol(stock, -1) > his_vol(stock, -2):
  163. results.append(stock.replace('SH', 'XSHG').replace('SZ', 'XSHE'))
  164. def sell_trader(data):
  165. # print('卖出函数:', dt.now())
  166. positions = xt_trader.query_stock_positions(acc)
  167. positions_dict = {positions[x].stock_code: positions[x].can_use_volume for x in range(0, len(positions))}
  168. print(
  169. f'目前持仓总数为:{len([positions[x].stock_code for x in range(0, len(positions)) if positions[x].volume != 0])}')
  170. for stock, can_use_volume in positions_dict.items():
  171. if stock in data and can_use_volume != 0:
  172. current_price = data[stock]['lastPrice']
  173. open_price = data[stock]['open']
  174. MA5 = ma(stock, 5, data)
  175. MA5_1 = ma_1(stock, 5)
  176. df = pd.read_sql_query(text(
  177. 'select close_front, high_front from `%s_1d`' % stock), engine_stock.connect())
  178. print(f"{data[stock]['time']}, {stock},持仓量为{can_use_volume}当前价:{current_price},开盘价:{open_price},"
  179. f"MA5:{MA5},昨日MA5:{MA5_1},开始判断:")
  180. if current_price == xtdata.get_instrument_detail(stock).get('UpStopPrice') \
  181. or (df['close_front'].iloc[-1] == df['high_front'].iloc[-1]
  182. and df['close_front'].iloc[-1] / df['close_front'].iloc[-2] > 1.08):
  183. print(f"{stock}涨停或昨日涨幅超过8%,持股观察!{data[stock]['time']}")
  184. continue
  185. elif current_price < MA5 or MA5 < MA5_1:
  186. print('卖出信号!!!!!!', stock, current_price)
  187. order_id = xt_trader.order_stock(acc, stock, xtconstant.STOCK_SELL, can_use_volume,
  188. xtconstant.LATEST_PRICE, 0, 'MA5策略', '低于MA5趋势向下')
  189. print('价格:', current_price, open_price, MA5, MA5_1, '低于MA5趋势向下')
  190. print(order_id, stock, can_use_volume)
  191. elif current_price > MA5 * 1.07:
  192. print('盈利乖离率超7%!!!!!!', stock, current_price)
  193. order_id = xt_trader.order_stock(acc, stock, xtconstant.STOCK_SELL, can_use_volume,
  194. xtconstant.LATEST_PRICE, 0, 'MA5策略', '盈利乖离率超7%')
  195. print('价格:', current_price, open_price, MA5, MA5_1, '盈利乖离率超7%')
  196. print(order_id, stock, can_use_volume)
  197. else:
  198. # print(f'本轮没有持仓股票信息!')
  199. pass
  200. engine_stock.dispose()
  201. def buy_trader(data):
  202. # print('买入函数:', dt.now(), f'接受到{len(data.keys())}个个股')
  203. results = mp.Manager().list()
  204. mp_list = []
  205. engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8',
  206. pool_size=4000, pool_recycle=50, max_overflow=-1)
  207. try:
  208. stock_pool = pd.read_sql_query(
  209. text('select value from `%s` order by `index` desc limit 10' % '1d'), engine_hlfx_pool.connect())
  210. stock_pool = stock_pool.iloc[0, 0].split(",")
  211. stock_pool.sort()
  212. print('stock_pool', len(stock_pool))
  213. except BaseException as e:
  214. print(e)
  215. if len(stock_pool) != 0:
  216. list_judge = list(set(data.keys()) & set(stock_pool))
  217. print(f'本轮有{len(data.keys())}条个股信息,而list_judge有:{len(list_judge)}')
  218. else:
  219. print(f'stock_pool为{len(stock_pool)}个')
  220. step = math.ceil(len(list_judge) / 4)
  221. rate = 0.8
  222. if len(list_judge) != 0:
  223. print(f'list_judge:{list_judge}')
  224. for i in range(0, len(list_judge), step):
  225. p = mp.Process(target=ma_judge, args=(data, list_judge[i:i + step], rate, results))
  226. mp_list.append(p)
  227. p.start()
  228. for j in mp_list:
  229. j.join()
  230. results = list(set(results))
  231. # 选择板块
  232. if len(results) != 0:
  233. print(f'进入板块选择{results}')
  234. # 基本面过滤
  235. results = get_fundamentals(results)
  236. num_industry = get_industry(results)
  237. industry_list = []
  238. for key in num_industry.values():
  239. for key2 in key.values():
  240. industry_list.append(key2['industry_name'])
  241. industry_list = pd.value_counts(industry_list)
  242. # 最热集中的n个板块
  243. max_industry_list = list(industry_list[0:2].index)
  244. results_industry = []
  245. for key, value in num_industry.items():
  246. for key2 in value.values():
  247. if key2['industry_name'] in max_industry_list:
  248. results_industry.append(key)
  249. results_industry = ','.join(set(results_industry))
  250. print('1d', '\n', results_industry)
  251. db_pool = pymysql.connect(host='localhost',
  252. user='root',
  253. port=3307,
  254. password='r6kEwqWU9!v3',
  255. database='hlfx_pool')
  256. cursor_pool = db_pool.cursor()
  257. sql = "INSERT INTO MA5_%s (date,value) VALUES('%s','%s')" % ('1d', dt.now().strftime('%Y-%m-%d %H:%M:%S'),
  258. results_industry)
  259. cursor_pool.execute(sql)
  260. db_pool.commit()
  261. # print(len(results_industry), results_industry)
  262. print(dt.now(), '数据库数据已赋值!')
  263. cursor_pool.close()
  264. db_pool.close()
  265. # 取值交易
  266. keep_stocks = results_industry.split(",")
  267. new_keep_stock = [stock.replace('XSHG', 'SH').replace('XSHE', 'SZ') for stock in keep_stocks]
  268. print(f'{dt.now()},new_keep_stock is:{len(new_keep_stock)}')
  269. if len(new_keep_stock) != 0:
  270. # 进入购买程序
  271. max_pos = 7
  272. for stock in new_keep_stock:
  273. positions = xt_trader.query_stock_positions(acc)
  274. asset = xt_trader.query_stock_asset(acc)
  275. cash = asset.cash
  276. positions_dict = {positions[x].stock_code: positions[x].volume for x in range(0, len(positions)) if
  277. positions[x].volume > 0}
  278. print(f'判断{stock}:cash={cash},持仓数量为{len(positions_dict)}')
  279. current_price = data[stock]['lastPrice']
  280. current_high = data[stock]['high']
  281. if stock not in positions_dict:
  282. if len(positions_dict) < max_pos and current_price > 9 \
  283. and current_price > (current_high * 0.98):
  284. if 5000 > cash:
  285. i = 1
  286. else:
  287. i = 2
  288. volume = int((cash / i / current_price) // 100 * 100)
  289. print('买入信号!!!!!!', stock, volume, current_price)
  290. order_id = xt_trader.order_stock(acc, stock, xtconstant.STOCK_BUY, volume,
  291. xtconstant.LATEST_PRICE,
  292. current_price, 'MA5策略', 'MA5趋势向上')
  293. print(order_id)
  294. else:
  295. print(f'Cash只有:{cash} 或者 现有持仓{len(positions_dict)} 超过了{max_pos}')
  296. else:
  297. print(f'{stock}已持仓!')
  298. engine_hlfx_pool.dispose()
  299. print('一轮结束了,现在时间是:', dt.now())
  300. def trader(data):
  301. print(f'本轮订阅{len(data)}')
  302. # print(f'xt_trader = {xt_trader},{session_id}')
  303. # print(len(xt_trader.query_stock_positions(acc)))
  304. # 卖出判断
  305. sell_trader(data)
  306. # 买入条件
  307. buy_trader(data)
  308. def bridge():
  309. global session_id, xt_trader
  310. pid = os.getpid()
  311. connect_result = -1
  312. subscribe_result = -1
  313. while True:
  314. if connect_result != 0 or subscribe_result != 0:
  315. session_id = int(time.time())
  316. xt_trader = XtQuantTrader(path, session_id)
  317. # 创建交易回调类对象,并声明接收回调
  318. callback = MyXtQuantTraderCallback()
  319. xt_trader.register_callback(callback)
  320. # 启动交易线程
  321. xt_trader.start()
  322. # 建立交易连接,返回0表示连接成功
  323. connect_result = xt_trader.connect()
  324. print('建立交易连接,返回0表示连接成功', connect_result)
  325. # 对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功
  326. subscribe_result = xt_trader.subscribe(acc)
  327. print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)
  328. # 建立交易连接,返回0表示连接成功
  329. connect_result = xt_trader.connect()
  330. print('建立交易连接,返回0表示连接成功', connect_result)
  331. # 对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功
  332. subscribe_result = xt_trader.subscribe(acc)
  333. print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)
  334. time.sleep(3)
  335. else:
  336. break
  337. print(f'MyPid is {os.getpid()}, now is {dt.now()},开盘了,session_id = {session_id}, \n')
  338. positions = xt_trader.query_stock_positions(acc)
  339. positions_dict = {positions[x].stock_code: positions[x].can_use_volume for x in range(0, len(positions))}
  340. print(f'今日可卖出个股总数:{len([value for value in positions_dict.values() if value != 0])}')
  341. stocks = xtdata.get_stock_list_in_sector('沪深A股')
  342. seq = xtdata.subscribe_whole_quote(stocks, callback=trader)
  343. run(seq, pid)
  344. def job_func():
  345. print(f"Job started at {dt.now()}")
  346. # 创建子进程
  347. p = mp.Process(target=bridge)
  348. # 启动子进程
  349. p.start()
  350. # 等待子进程结束
  351. p.join()
  352. print(f"Job finished at {dt.now()}")
  353. if __name__ == '__main__':
  354. mp.freeze_support()
  355. # print('cpu_count =', mp.cpu_count())
  356. pus = psutil.Process()
  357. pus.cpu_affinity([12, 13, 14, 15])
  358. # job_func()
  359. scheduler = BlockingScheduler()
  360. scheduler.add_job(func=job_func, trigger='cron', day_of_week='0-4', hour='09', minute='40',
  361. timezone="Asia/Shanghai", max_instances=5)
  362. # scheduler.add_job(func=job_func, trigger='cron', day_of_week='0-4', hour='12', minute='35',
  363. # timezone="Asia/Shanghai")
  364. try:
  365. scheduler.start()
  366. except (KeyboardInterrupt, SystemExit):
  367. pass