|
- # coding:utf-8
- from datetime import datetime as dt
- import os
- import pandas as pd
- from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
- from xtquant.xttype import StockAccount
- from xtquant import xtdata, xtconstant
- import time
- from sqlalchemy import create_engine, text
- from jqdatasdk import *
- import pymysql
- import multiprocessing as mp
- import math
- import psutil
- import datetime
- from apscheduler.schedulers.blocking import BlockingScheduler
- import sys
# Path to the trading client's userdata directory.
# NOTE(review): this is a raw string, so the doubled backslashes are kept
# literally in the path — confirm the client accepts that form.
path = r'c:\\qmt\\userdata_mini'
# Securities account object used for all queries and orders in this file.
# NOTE(review): the original comment referred to account 800068, which does
# not match the account id actually passed below.
acc = StockAccount('920000207040', 'SECURITY')
# Integer session id; strategies running at the same time must not share one.
# bridge() overwrites this with a timestamp-based value before connecting.
session_id = 123456
# Trader handle; stays None until bridge() creates the XtQuantTrader.
xt_trader = None
# Order ids returned by the sell orders placed in sell_trader().
order_list = []
# SQLAlchemy engine for the database holding the per-stock `<code>_1d` tables.
# NOTE(review): database credentials are hard-coded in this URL — consider
# loading them from the environment or a config file kept out of the repo.
engine_stock = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8',
                             pool_size=5000, pool_recycle=50, max_overflow=-1)
class MyXtQuantTraderCallback(XtQuantTraderCallback):
    """Trading callback handler that logs every pushed event to stdout."""

    def on_disconnected(self):
        """
        Connection lost.
        :return:
        """
        stamp = datetime.datetime.now()
        print(stamp, '连接断开回调')

    def on_stock_order(self, order):
        """
        Order status push.
        :param order: XtOrder object
        :return:
        """
        stamp = datetime.datetime.now()
        print(stamp, '委托回调', order.order_remark)

    def on_stock_trade(self, trade):
        """
        Trade (fill) push.
        :param trade: XtTrade object
        :return:
        """
        stamp = datetime.datetime.now()
        print(stamp, '成交回调', trade.order_remark)

    def on_order_error(self, order_error):
        """
        Order placement failure push.
        :param order_error: XtOrderError object
        :return:
        """
        remark = order_error.order_remark
        message = order_error.error_msg
        print(f"委托报错回调 {remark} {message}")

    def on_cancel_error(self, cancel_error):
        """
        Order cancellation failure push.
        :param cancel_error: XtCancelError object
        :return:
        """
        # Only the timestamp and this method's own name are logged.
        stamp = datetime.datetime.now()
        print(stamp, 'on_cancel_error')

    def on_order_stock_async_response(self, response):
        """
        Asynchronous order placement response push.
        :param response: XtOrderResponse object
        :return:
        """
        remark = response.order_remark
        print(f"异步委托回调 {remark}")

    def on_cancel_order_stock_async_response(self, response):
        """
        Asynchronous order cancellation response push.
        :param response: XtCancelOrderResponse object
        :return:
        """
        # Only the timestamp and this method's own name are logged.
        stamp = datetime.datetime.now()
        print(stamp, 'on_cancel_order_stock_async_response')

    def on_account_status(self, status):
        """
        Account status push.
        :param status: XtAccountStatus object
        :return:
        """
        # Only the timestamp and this method's own name are logged.
        stamp = datetime.datetime.now()
        print(stamp, 'on_account_status')
def run(seq, pid):
    """Block the calling thread so quote callbacks keep arriving until the close.

    Polls every 3 seconds. The loop ends in one of two ways: the quote client
    reports it is no longer connected, or the local clock passes 15:00 of the
    day on which this function was entered.

    :param seq: subscription id passed to ``xtdata.unsubscribe_quote`` on exit
    :param pid: unused; kept so existing callers keep working
    :raises Exception: when the quote service connection is lost
    :raises SystemExit: once the current time is past 15:00
    """
    # Cut-off is fixed at entry: today's date at 15:00 local (naive) time.
    closing_time = datetime.datetime.combine(dt.now().date(), datetime.time(15, 0))
    client = xtdata.get_client()
    while True:
        time.sleep(3)
        if not client.is_connected():
            xtdata.unsubscribe_quote(seq)
            raise Exception('行情服务连接断开')
        if dt.now() > closing_time:
            xtdata.unsubscribe_quote(seq)
            print(f'现在时间:{dt.now()},已收盘')
            # Terminates the (child) process; nothing after this line runs.
            sys.exit()
def get_fundamentals(results):
    """Return ``results`` unchanged (identity pass-through).

    :param results: any object
    :return: the very same object that was passed in
    """
    return results
def ma(stock, num, data):
    """Return the live ``num``-period moving average for ``stock``.

    The average combines the last ``num - 1`` stored ``close_front`` values
    from the ``<stock>_1d`` table with the current ``lastPrice`` taken from
    ``data``, divided by ``num``.

    :param stock: stock code; also the prefix of the ``<stock>_1d`` table name
    :param num: moving-average window length
    :param data: tick dict; ``data[stock]['lastPrice']`` is read
    :return: the moving average, or the sentinel ``9999999`` if the query fails
    :raises KeyError: if ``stock`` or ``'lastPrice'`` is missing from ``data``
    """
    try:
        # The context manager returns the connection to the pool; the previous
        # bare engine_stock.connect() was never closed.
        with engine_stock.connect() as conn:
            # NOTE(review): no ORDER BY — this relies on the table returning
            # rows oldest-first; confirm against the table definition.
            df = pd.read_sql_query(
                text('select close_front from `%s_1d`' % stock), conn)
    except Exception as e:
        print(e)
        return 9999999
    closes = df['close_front']
    # With num == 1 a slice of [-0:] would select the whole column, so take
    # no history at all in that case.
    history = closes.iloc[-(num - 1):] if num > 1 else closes.iloc[:0]
    return (sum(history) + data[stock]['lastPrice']) / num
def ma_1(stock, num):
    """Return the mean of the last ``num`` stored ``close_front`` values.

    Unlike ``ma`` this uses stored history only and no live price.

    :param stock: stock code; also the prefix of the ``<stock>_1d`` table name
    :param num: number of trailing rows to average
    :return: the mean, or the sentinel ``9999999`` if the query fails
    """
    try:
        # The context manager returns the connection to the pool; the previous
        # bare engine_stock.connect() was never closed.
        with engine_stock.connect() as conn:
            # NOTE(review): no ORDER BY — this relies on the table returning
            # rows oldest-first; confirm against the table definition.
            df = pd.read_sql_query(
                text('select close_front from `%s_1d`' % stock), conn)
    except Exception as e:
        print(e)
        return 9999999
    return df['close_front'].iloc[-num:].mean()
def his_vol(stock, num):
    """Return one stored ``volume_front`` value for ``stock``.

    ``num`` is negated before indexing, so a positive ``num`` counts back from
    the end of the table (``1`` is the last row, ``2`` the one before it),
    while a negative ``num`` indexes from the start of the table.

    :param stock: stock code; also the prefix of the ``<stock>_1d`` table name
    :param num: row offset, negated before being used as a positional index
    :return: the selected volume, or the sentinel ``9999999`` if the query fails
    :raises IndexError: if the table has too few rows for the requested offset
    """
    try:
        # The context manager returns the connection to the pool; the previous
        # bare engine_stock.connect() was never closed.
        with engine_stock.connect() as conn:
            df = pd.read_sql_query(
                text('select volume_front from `%s_1d`' % stock), conn)
    except Exception as e:
        # Report the failure the same way ma() and ma_1() do.
        print(e)
        return 9999999
    return df['volume_front'].iloc[-num]
def ma_judge(data, list_judge, rate, results):
    """Append to ``results`` every stock in ``list_judge`` that passes the screen.

    A stock passes when all of the following hold:
      * the current price is above the open;
      * the current price is above MA5 but less than 5% above it;
      * MA5 is above the stored-history MA5 returned by ``ma_1``;
      * the current price is above MA120, or below ``MA120 * rate``;
      * the latest stored volume exceeds the one before it.

    Codes are appended with ``SH`` replaced by ``XSHG`` and ``SZ`` by ``XSHE``.

    :param data: tick dict; ``lastPrice`` and ``open`` are read for each stock
    :param list_judge: iterable of stock codes to evaluate
    :param rate: multiplier applied to MA120 for the "below MA120" alternative
    :param results: list that receives the passing codes (mutated in place)
    :return: None
    """
    for stock in list_judge:
        quote = data[stock]
        current_price, open_price = quote['lastPrice'], quote['open']
        # Guard clauses run the cheap checks first so the database is only
        # queried for stocks that are still candidates.
        if not current_price > open_price:
            continue
        ma5 = ma(stock, 5, data)
        if not (ma5 < current_price < ma5 * 1.05):
            continue
        if not ma5 > ma_1(stock, 5):
            continue
        ma120 = ma(stock, 120, data)
        if not (current_price > ma120 or current_price < ma120 * rate):
            continue
        # his_vol() negates its offset, so positive 1 and 2 select the last
        # and second-to-last stored rows.
        if his_vol(stock, 1) > his_vol(stock, 2):
            results.append(stock.replace('SH', 'XSHG').replace('SZ', 'XSHE'))
def sell_trader(data):
    """Quote callback: check each held stock in this tick and sell on exit signals.

    For every position whose code appears in ``data`` the function logs the
    current state and then:
      * keeps the position if the price equals the instrument's
        ``UpStopPrice``, or if the last stored close equals the last stored
        high and is more than 8% above the previous close;
      * otherwise sells the whole available volume at the latest price when
        the price is below MA5, when MA5 is below the stored-history MA5, or
        when the price is more than 7% above MA5.

    No order is placed when the moving averages could not be loaded or when
    the position has no sellable volume.

    :param data: tick dict keyed by stock code; ``lastPrice``, ``open`` and
        ``time`` are read for each held stock
    :return: None
    """
    # ma()/ma_1() return this sentinel when the history query fails. It would
    # satisfy `current_price < MA5` and trigger a sale, so it is checked below.
    ma_unavailable = 9999999

    positions = xt_trader.query_stock_positions(acc)
    positions_dict = {p.stock_code: p.can_use_volume for p in positions}
    print(f'目前持仓总数为:{sum(1 for p in positions if p.volume != 0)}')

    for stock, can_use_volume in positions_dict.items():
        if stock not in data:
            continue
        tick = data[stock]
        current_price = tick['lastPrice']
        open_price = tick['open']
        MA5 = ma(stock, 5, data)
        MA5_1 = ma_1(stock, 5)
        print(
            f"{tick['time']}, {stock}\n当前时间为:{dt.now().strftime('%Y-%m-%d %H:%M:%S')},"
            f"信号时间:{dt.fromtimestamp((tick['time']) / 1000.0)}\n"
            f"持仓量为{can_use_volume}当前价:{current_price},开盘价:{open_price},"f"MA5:{MA5},昨日MA5:{MA5_1},开始判断:")
        if MA5 == ma_unavailable or MA5_1 == ma_unavailable:
            print(f'{stock}均线数据不可用,跳过本轮判断')
            continue

        # The context manager returns the connection to the pool; the previous
        # bare engine_stock.connect() was never closed.
        with engine_stock.connect() as conn:
            df = pd.read_sql_query(
                text('select close_front, high_front from `%s_1d`' % stock), conn)
        closes = df['close_front']
        # get_instrument_detail() may return nothing for an unknown code.
        detail = xtdata.get_instrument_detail(stock) or {}
        if current_price == detail.get('UpStopPrice') \
                or (len(df) >= 2
                    and closes.iloc[-1] == df['high_front'].iloc[-1]
                    and closes.iloc[-1] / closes.iloc[-2] > 1.08):
            print(f"{stock}涨停或昨日涨幅超过8%,持股观察!{tick['time']}")
            continue

        if current_price < MA5 or MA5 < MA5_1:
            reason = '低于MA5趋势向下'
            print('卖出信号!!!!!!', stock, current_price)
        elif current_price > MA5 * 1.07:
            reason = '盈利乖离率超7%'
            print('盈利乖离率超7%!!!!!!', stock, current_price)
        else:
            continue

        if not can_use_volume:
            # Nothing sellable right now; a zero-volume order would only
            # produce an order error on every tick.
            print(f'{stock}可卖数量为0,跳过下单')
            continue
        order_id = xt_trader.order_stock(acc, stock, xtconstant.STOCK_SELL, can_use_volume,
                                         xtconstant.LATEST_PRICE, 0, 'MA5策略', reason)
        print('价格:', current_price, open_price, MA5, MA5_1, reason)
        print(order_id, stock, can_use_volume)
        order_list.append(order_id)
    # Drop pooled connections after each callback, as the original code did.
    engine_stock.dispose()
def bridge():
    """Connect the trader, subscribe to whole-market quotes and block until close.

    Steps:
      1. Create, start, connect and subscribe an ``XtQuantTrader`` with a
         timestamp-based session id, retrying until both the connect and the
         subscribe call return 0.
      2. Log how many positions have a non-zero sellable volume.
      3. Subscribe ``sell_trader`` to whole-market quotes for the '沪深A股'
         sector and hand control to ``run``.

    Rebinds the module-level ``session_id`` and ``xt_trader``.

    :return: None
    """
    global session_id, xt_trader
    pid = os.getpid()
    while True:
        # A fresh session id for every attempt.
        session_id = int(time.time())
        xt_trader = XtQuantTrader(path, session_id)
        # Register the callback object that receives trading pushes.
        callback = MyXtQuantTraderCallback()
        xt_trader.register_callback(callback)
        # Start the trading thread.
        xt_trader.start()
        # Both calls return 0 on success. Each is issued once per attempt;
        # the original repeated the connect/subscribe pair back to back.
        connect_result = xt_trader.connect()
        print('建立交易连接,返回0表示连接成功', connect_result)
        subscribe_result = xt_trader.subscribe(acc)
        print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)
        time.sleep(3)
        if connect_result == 0 and subscribe_result == 0:
            break
        # NOTE(review): a trader from a failed attempt is replaced without
        # being stopped — confirm whether it should be shut down first.
    print(f'MyPid is {os.getpid()}, now is {dt.now()},开盘了,session_id = {session_id}, \n')
    positions = xt_trader.query_stock_positions(acc)
    positions_dict = {p.stock_code: p.can_use_volume for p in positions}
    print(f'今日可卖出个股总数:{sum(1 for volume in positions_dict.values() if volume != 0)}')
    stocks = xtdata.get_stock_list_in_sector('沪深A股')
    seq_s = xtdata.subscribe_whole_quote(stocks, callback=sell_trader)
    run(seq_s, pid)
def job_func():
    """Run ``bridge`` in a child process and wait for that process to exit."""
    print(f"Job started at {dt.now()}")
    worker = mp.Process(target=bridge)
    worker.start()
    # Block here until the child process terminates.
    worker.join()
    print(f"Job finished at {dt.now()}")
if __name__ == '__main__':
    # Script entry point: run one session right away, then start the scheduler.
    mp.freeze_support()
    # print('cpu_count =', mp.cpu_count())
    # Handle to the current process; only needed by the disabled
    # cpu_affinity call below.
    pus = psutil.Process()
    # pus.cpu_affinity([16, 17, 18, 19])
    print('sell real time start at', dt.now())
    # Runs immediately and blocks until the child process started by
    # job_func() has exited; the scheduler below is only created afterwards.
    job_func()
    scheduler = BlockingScheduler()
    # Cron trigger: day_of_week 0-4 at 09:40, Asia/Shanghai time.
    scheduler.add_job(func=job_func, trigger='cron', day_of_week='0-4', hour='09', minute='40',
                      timezone="Asia/Shanghai", max_instances=5)
    # scheduler.add_job(func=job_func, trigger='cron', day_of_week='0-4', hour='12', minute='35',
    # timezone="Asia/Shanghai")
    try:
        # Blocks the main thread until the scheduler is stopped.
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        # Exit quietly on Ctrl+C or interpreter shutdown.
        pass
|