123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394 |
- # coding:utf-8
- from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
- from xtquant.xttype import StockAccount
- from xtquant import xtdata, xtconstant
- from datetime import datetime as dt
- import pandas as pd
- import time
- import datetime
- import sys
- import os
- pd.set_option('display.max_columns', None) # Show every column when printing DataFrames (no column truncation)
- """
- def run():
- '''阻塞线程接收行情回调'''
- import time
- client = xtdata.get_client()
- while True:
- time.sleep(3)
- now_date = dt.now()
- if not client.is_connected() :
- raise Exception('行情服务连接断开')
- break
- return
- def trader(data):
- # print('callback',os.getpid())
- print(f'{dt.now()},len={len(data.keys())}')
- print(data)
- # stocks = xtdata.get_stock_list_in_sector("沪深债券")
- # print(len(stocks))
- stocks = ['110043.SH', '110044.SH', '128075.SZ', '128079.SZ', '113017.SH', '128119.SZ']
- # # stocks = ['110055.SZ', '110061.SZ']
- stock_code = '127045.SZ'
- # seq2 = xtdata.subscribe_quote(stock_code, period='tick', start_time='20230101', end_time='20230407', count=10000, callback=trader)
- # # print(seq2)
- # seq = xtdata.subscribe_whole_quote(['沪深债券'], callback=trader)
- sss = xtdata.subscribe_whole_quote(stocks, callback=trader)
- # aaa = xtdata.subscribe_quote('128130.SZ', period='tick', start_time='', end_time='', count=0, callback=trader)
- #
- xtdata.run()
- exit()
- """  # End of a disabled whole-quote subscription experiment; it lives in a string literal, so none of it is executed
class MyXtQuantTraderCallback(XtQuantTraderCallback):
    """Trading callback handler that writes every pushed event to stdout."""

    def on_disconnected(self):
        """Report that the trading connection was dropped."""
        now = datetime.datetime.now()
        print(now, '连接断开回调')

    def on_stock_order(self, order):
        """Log an order push.

        :param order: XtOrder object; only ``order_remark`` is read.
        """
        now = datetime.datetime.now()
        print(now, '委托回调', order.order_remark)

    def on_stock_trade(self, trade):
        """Log a trade (fill) push.

        :param trade: XtTrade object; only ``order_remark`` is read.
        """
        now = datetime.datetime.now()
        print(now, '成交回调', trade.order_remark)

    def on_order_error(self, order_error):
        """Log a failed order.

        :param order_error: XtOrderError object; ``order_remark`` and
            ``error_msg`` are printed (no timestamp on this line).
        """
        remark = order_error.order_remark
        message = order_error.error_msg
        print(f"委托报错回调 {remark} {message}")

    def on_cancel_error(self, cancel_error):
        """Log a failed cancel request.

        :param cancel_error: XtCancelError object (not inspected).
        """
        # Prints the timestamp followed by this method's own name.
        print(datetime.datetime.now(), 'on_cancel_error')

    def on_order_stock_async_response(self, response):
        """Log the response to an asynchronous order submission.

        :param response: XtOrderResponse object; only ``order_remark`` is read.
        """
        remark = response.order_remark
        print(f"异步委托回调 {remark}")

    def on_cancel_order_stock_async_response(self, response):
        """Log the response to an asynchronous cancel request.

        :param response: XtCancelOrderResponse object (not inspected).
        """
        # Prints the timestamp followed by this method's own name.
        print(datetime.datetime.now(), 'on_cancel_order_stock_async_response')

    def on_account_status(self, status):
        """Log an account-status push.

        :param status: XtAccountStatus object (not inspected).
        """
        # Prints the timestamp followed by this method's own name.
        print(datetime.datetime.now(), 'on_account_status')
def get_tick(code, start_time, end_time, period='tick'):
    """Download history for one instrument and return its last 20 records.

    Calls ``xtdata.download_history_data`` for the requested range, then
    reads the data back with ``xtdata.get_local_data``. The read-back call
    is not given ``start_time``/``end_time``, so those two arguments only
    affect the download step.

    :param code: instrument code passed to xtdata (e.g. ``'127045.SZ'``).
    :param start_time: start of the download range, forwarded unchanged.
    :param end_time: end of the download range, forwarded unchanged.
    :param period: data period forwarded to both xtdata calls.
    :return: DataFrame holding the last 20 rows with the columns time,
        lastPrice, open, high, low, amount and volume; ``time`` is converted
        from epoch milliseconds to local ``datetime`` objects.
    """
    from xtquant import xtdata

    def _from_millis(ms):
        # Source timestamps are divided by 1000 before conversion.
        return datetime.datetime.fromtimestamp(ms / 1000.0)

    xtdata.download_history_data(code, period=period, start_time=start_time, end_time=end_time)
    local_data = xtdata.get_local_data(field_list=[], stock_code=[code], period=period)

    frame = pd.DataFrame(local_data[code])
    frame['time'] = frame['time'].apply(_from_millis)

    wanted = ['time', 'lastPrice', 'open', 'high', 'low', 'amount', 'volume']
    return frame.iloc[-20:].loc[:, wanted]
def process_timestamp(df, filename):
    """Resample a frame onto a 3-second grid, print it and return it.

    The frame is indexed by its ``time_str`` column, each 3-second bin keeps
    its first row, empty bins are forward-filled from the previous bin, and
    the index is turned back into a column.

    :param df: DataFrame with a datetime-like ``time_str`` column.
    :param filename: currently unused; kept for interface compatibility
        (the CSV export that used it is disabled).
    :return: the resampled DataFrame (earlier versions only printed it).
    """
    indexed = df.set_index('time_str')
    # Lower-case 's' is the supported seconds alias; upper-case 'S' is
    # deprecated in recent pandas releases.
    result = indexed.resample('3s').first().ffill().reset_index()
    print(result)
    return result
def get_macd_data(data, short=0, long1=0, mid=0):
    """Append MACD columns to ``data`` in place and return it.

    Adds ``sema``/``lema`` (exponential means of ``close``), ``dif``
    (``sema - lema``), ``dea`` (exponential mean of ``dif``) and ``macd``
    (``2 * (dif - dea)``). Missing values anywhere in the frame are
    replaced with 0.

    :param data: DataFrame with a ``close`` column; it is modified in place.
    :param short: span of the fast mean; 0 selects 12.
    :param long1: span of the slow mean; 0 selects 26.
    :param mid: span of the signal mean; 0 selects 9.
    :return: the same ``data`` object with the extra columns.
    """
    # A value of 0 means "use the conventional default span".
    short = 12 if short == 0 else short
    long1 = 26 if long1 == 0 else long1
    mid = 9 if mid == 0 else mid

    close = pd.Series(data['close'])
    fast = close.ewm(span=short).mean()
    slow = close.ewm(span=long1).mean()
    data['sema'] = fast
    data['lema'] = slow
    data.fillna(0, inplace=True)

    # dif is taken from the stored (NaN-filled) columns, not the raw means.
    data['dif'] = data['sema'] - data['lema']
    signal = pd.Series(data['dif']).ewm(span=mid).mean()
    data['dea'] = signal
    data['macd'] = (data['dif'] - data['dea']) * 2
    data.fillna(0, inplace=True)
    return data
- def get_hlfx(data):  # Label top/bottom fractals in an 'HL' column after merging contained bars; returns (frame of time + HL, Trading_signals)
- Trading_signals = 0  # stays 0 unless the last merged bar is set to 'L' (-> 1) or 'H' (-> 2) below
- data_temp = data[['time', 'open', 'close', 'high', 'low', 'dif', 'dea', 'macd']]
- data_temp.columns = ['time', 'open', 'close', 'high', 'low', 'dif', 'dea', 'macd']
- df_day = pd.DataFrame(columns=['time', 'open', 'close', 'high', 'low', 'volume', 'money', 'HL'])  # positional column 3 is 'high' and 4 is 'low' (used via iloc below)
- # First pass: merge bars that are in a containment relationship
- for i in data_temp.index:
- if i == 0 or i == 1:  # index labels 0 and 1 are copied as-is (assumes a 0-based integer index -- TODO confirm)
- df_day = pd.concat([df_day, data_temp.iloc[[i]]], ignore_index=True)
- # No containment
- elif (df_day.iloc[-1, 3] > data_temp.loc[i, 'high']
- and df_day.iloc[-1, 4] > data_temp.loc[i, 'low']) \
- or (df_day.iloc[-1, 3] < data_temp.loc[i, 'high']
- and df_day.iloc[-1, 4] < data_temp.loc[i, 'low']):
- df_day = pd.concat([df_day, data_temp.loc[[i]]], ignore_index=True)
- # Containment
- else:
- # Left bar is higher (falling): keep the lower high and the lower low
- if df_day.iloc[-2, 3] > df_day.iloc[-1, 3]:
- df_day.iloc[-1, 3] = min(df_day.iloc[-1, 3], data_temp.loc[i, 'high'])
- df_day.iloc[-1, 4] = min(df_day.iloc[-1, 4], data_temp.loc[i, 'low'])
- else:
- # Right bar is higher (rising): keep the higher high and the higher low
- df_day.iloc[-1, 3] = max(df_day.iloc[-1, 3], data_temp.loc[i, 'high'])
- df_day.iloc[-1, 4] = max(df_day.iloc[-1, 4], data_temp.loc[i, 'low'])
- # print('111', df_day, data_temp)
- if len(df_day.index) > 2:
- # Look for top and bottom fractals
- for x in range(2, len(df_day.index)):
- m = x - 1
- # Bottom
- # Matches the bottom-fractal shape and bars 2 and 3 are bullish (close > open). NOTE(review): the shape test compares 'high' values, not 'low' -- confirm intent
- if ((df_day.loc[x, 'high'] > df_day.loc[x - 1, 'high']) and
- (df_day.loc[x - 2, 'high'] > df_day.loc[x - 1, 'high'])) and \
- df_day.loc[x, 'close'] > df_day.loc[x, 'open'] and \
- df_day.loc[x - 1, 'close'] > df_day.loc[x - 1, 'open']:
- df_day.loc[x, 'HL'] = 'L*'
- while m:
- if df_day.loc[m, 'HL'] in ['H', 'HH', 'H*']:
- if (x - m) > 3:
- # Forms a stroke -> L
- df_day.loc[x, 'HL'] = 'L'
- # Signal generated; goes into hlfx_pool
- if x == len(df_day.index) - 1:
- Trading_signals = 1
- elif df_day.loc[m, 'HL'] == 'L':
- if df_day.loc[m - 1, 'low'] > df_day.loc[x - 1, 'low']:
- # The previous bottom is higher and there is no lower bottom in between
- df_day.loc[x, 'HL'] = 'L'
- # Signal generated; goes into hlfx_pool
- if x == len(df_day.index) - 1:
- Trading_signals = 1
- # Fetch MACD values to test for divergence. NOTE(review): x and m are positions in df_day but are used as labels of data_temp here -- confirm they still line up after merging
- x_macd_dif, x_macd_dea, x_macd_macd = data_temp.loc[x, 'dif'], data_temp.loc[x, 'dea'], \
- data_temp.loc[x, 'macd']
- m_macd_dif, m_macd_dea, m_macd_macd = data_temp.loc[m, 'dif'], data_temp.loc[m, 'dea'], \
- data_temp.loc[m, 'macd']
- # MACD bottom divergence
- if m_macd_dif < x_macd_dif:
- # Divergent bottom -> LL
- df_day.loc[x, 'HL'] = 'LL'
- break  # NOTE(review): the nesting level of this break and the next cannot be recovered from this de-indented text -- restore from the original file
- break
- m = m - 1
- if m == 0:
- df_day.loc[x, 'HL'] = 'L'
- # Top
- elif ((df_day.loc[x, 'high'] < df_day.loc[x - 1, 'high']) and (
- df_day.loc[x - 2, 'high'] < df_day.loc[x - 1, 'high'])):
- df_day.loc[x, 'HL'] = 'H*'
- while m:
- if df_day.loc[m, 'HL'] in ['L', 'LL', 'L*']:
- if x - m > 3:
- # Forms a stroke -> H
- df_day.loc[x, 'HL'] = 'H'
- # Signal generated; goes into hlfx_pool
- if x == len(df_day.index) - 1:
- Trading_signals = 2
- elif df_day.loc[m, 'HL'] == 'H':
- if df_day.loc[x - 1, 'high'] > df_day.loc[m - 1, 'high']:
- # The previous one is a top, and there is a non-contained or higher top in between
- df_day.loc[x, 'HL'] = 'H'
- # Signal generated; goes into hlfx_pool
- if x == len(df_day.index) - 1:
- Trading_signals = 2
- # Fetch MACD values to test for divergence (same label/position caveat as in the bottom branch)
- x_macd_dif, x_macd_dea, x_macd_macd = data_temp.loc[x, 'dif'], data_temp.loc[x, 'dea'], \
- data_temp.loc[x, 'macd']
- m_macd_dif, m_macd_dea, m_macd_macd = data_temp.loc[m, 'dif'], data_temp.loc[m, 'dea'], \
- data_temp.loc[m, 'macd']
- # MACD top divergence
- if x_macd_dif < m_macd_dif:
- df_day.loc[x, 'HL'] = 'HH'
- break  # NOTE(review): same ambiguity about break nesting as in the bottom branch
- break
- m = m - 1
- if m == 0:
- df_day.loc[x, 'HL'] = 'H'
- else:
- df_day.loc[x, 'HL'] = '-'
- df_temp = df_day[['time', 'HL']]
- return df_temp, Trading_signals
def ma(df, num, window=5):
    """Return the simple moving average of ``close`` ending ``num`` bars ago.

    With ``num == 0`` the window covers the last ``window`` values; with
    ``num == 1`` it is shifted back by one value, and so on.

    :param df: mapping or DataFrame whose ``'close'`` entry supports slicing.
    :param num: how many of the most recent values to skip.
    :param window: number of values in the average (default 5, the value
        that used to be hard-coded).
    :return: ``sum(window values) / window``, or the sentinel ``9999999``
        when the values cannot be read or summed.

    NOTE: slices are truncated silently, so when fewer than ``window``
    values are available the sum of what exists is still divided by
    ``window`` (unchanged historical behaviour).
    """
    start = -1 * (window + num)
    stop = start + window
    try:
        # A stop of 0 would produce an empty slice, so the most recent
        # window has to be open-ended instead.
        closes = df['close'][start:] if stop == 0 else df['close'][start:stop]
        ma_num = sum(closes) / window
    except Exception:
        # Only ordinary errors map to the sentinel; KeyboardInterrupt and
        # SystemExit are allowed to propagate.
        return 9999999
    else:
        return ma_num
def cont(data):
    """Quote callback: re-evaluate the tick strategy for one bond and trade.

    On every push the full tick history of ``127045.SZ`` since 2023-01-01
    is re-read through ``xtdata.get_market_data``. Two independent checks
    then run:

    * sell the whole available position when the last price is below the
      current 5-tick average, or that average is below the previous one;
    * buy 10 units when the last price rose after two consecutive falls.

    Relies on the module-level ``xt_trader`` and ``acc`` objects created in
    the ``__main__`` section, and on the sibling helper ``ma``.

    :param data: payload pushed by the subscription; not used, the data is
        re-queried instead.
    :return: None.
    """
    code = '127045.SZ'
    dd = xtdata.get_market_data(field_list=[], stock_list=[code], period='tick',
                                start_time='20230101', end_time='', count=-1,
                                dividend_type='none', fill_data=True)
    ticks = dd[code]
    df_day = pd.DataFrame({
        'time': ticks['time'],
        'open': ticks['open'],
        'close': ticks['lastPrice'],
        'high': ticks['high'],
        'low': ticks['low'],
    })
    # The buy check looks four ticks back; with less history it used to
    # raise IndexError inside the callback.
    if len(df_day) < 4:
        return

    df_day['time'] = df_day['time'].apply(lambda x: dt.fromtimestamp(x / 1000.0))
    close = df_day['close']
    ma_now = ma(df_day, 0)
    ma_1 = ma(df_day, 1)

    # The position query can yield None (failed query / nothing held)
    # instead of a list; treat that as "no positions".
    positions = xt_trader.query_stock_positions(acc) or []
    positions_dict = {pos.stock_code: pos.can_use_volume for pos in positions}
    sellable = positions_dict.get(code, 0)

    if sellable != 0 and (close.iloc[-1] < ma_now or ma_now < ma_1):
        order_id = xt_trader.order_stock(acc, code, xtconstant.STOCK_SELL, sellable,
                                         xtconstant.LATEST_PRICE, 0, '可转债', '滞涨')
        # NOTE(review): the value printed after the price label is the
        # price-type constant, not an actual fill price.
        print(f'可转债卖出_{order_id},成交价格:{xtconstant.LATEST_PRICE}')

    # Tick-level bottom pattern: two consecutive falls followed by a rise.
    rebounded = close.iloc[-1] > close.iloc[-2]
    was_falling = close.iloc[-4] > close.iloc[-3] > close.iloc[-2]
    if rebounded and was_falling:
        print('buy:', df_day['time'].iloc[-1], close.iloc[-1], ma_now)
        order_id = xt_trader.order_stock(acc, code, xtconstant.STOCK_BUY, 10,
                                         xtconstant.LATEST_PRICE, 0, '可转债', 'tick底分型')
        print(f'可转债Buy——{order_id},成交价格:{xtconstant.LATEST_PRICE}')
def dump_single_code_tick():
    """Fetch and print recent tick data for a single convertible bond.

    Builds the exchange-qualified code for ``127045`` (codes starting with
    ``12`` get the ``SZ`` suffix, all others ``SH``), prints it, then
    prints the frame returned by ``get_tick``.

    :return: None.
    """
    code = '127045'
    start_date = '20230101'
    # The previous literal was '202304010' (nine digits, not a valid
    # YYYYMMDD date). Assumed to mean 2023-04-10 -- TODO confirm the
    # intended end date.
    end_date = '20230410'
    suffix = 'SZ' if code.startswith('12') else 'SH'
    code = f'{code}.{suffix}'
    print(code)
    df = get_tick(code, start_date, end_date)
    print(df)
if __name__ == '__main__':
    # Script entry point: connect the trader, subscribe to ticks for one
    # bond and block while the `cont` callback handles each push.
    # `xt_trader` and `acc` stay module-level because `cont` reads them as
    # globals.

    # Path of the QMT client's userdata directory. In a raw string the
    # backslashes must not be doubled.
    path = r'c:\qmt\userdata_mini'
    # Integer session id; strategies running at the same time must not
    # share one.
    session_id = int(time.time())
    xt_trader = XtQuantTrader(path, session_id)
    # Securities account object for the given account id.
    acc = StockAccount('920000207040', 'SECURITY')
    # Register the callback object so pushes are delivered to it.
    callback = MyXtQuantTraderCallback()
    xt_trader.register_callback(callback)
    # Start the trading thread.
    xt_trader.start()
    # Open the trading connection; 0 means success.
    connect_result = xt_trader.connect()
    print('建立交易连接,返回0表示连接成功', connect_result)
    if connect_result != 0:
        # Without a trading connection neither the position query nor the
        # orders issued from the callback can work, so stop here.
        sys.exit(f'trade connection failed (code {connect_result})')
    # Subscribe to trade pushes for the account; 0 means success.
    subscribe_result = xt_trader.subscribe(acc)
    print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)

    code = '127045.SZ'
    st = dt.now()
    xtdata.subscribe_quote(code, 'tick', start_time='', end_time='', count=1000, callback=cont)
    try:
        # Blocks while callbacks are delivered. NOTE(review): this call is
        # expected to end only by raising (e.g. when the quote service
        # disconnects) -- confirm against the xtdata documentation.
        xtdata.run()
    finally:
        # Report the session length even when run() leaves via an exception.
        et = dt.now()
        print(et - st)
|