# coding:utf-8
from jqdatasdk import *
import pandas as pd
import pymysql
from sqlalchemy import create_engine
import threading
from datetime import datetime as dt
from jqdatasdk.technical_analysis import *
from xtquant import xtdata, xtconstant
from xtquant.xttype import StockAccount
from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
import time
import math
import multiprocessing as mp
import os
import sys

# Original version.
# The JoinQuant login used by earlier revisions is intentionally not kept here:
# credentials must never be committed to source control.
# auth(<account>, <password>)
# stocks = list(get_all_securities(['stock'], date=dt.today().strftime('%Y-%m-%d')).index)
# stocks = stocks[0:200]

# Show every column when a DataFrame is printed.
pd.set_option('display.max_columns', None)

# Bar frequency; also used as the suffix / name of the MySQL tables read and written below.
fre = '1d'

# MySQL connection settings shared by every database access in this module.
DB_HOST = 'localhost'
DB_PORT = 3307
DB_USER = 'root'
# The password can be overridden through the environment; the fallback keeps the
# previous behaviour and should be dropped once the variable is provisioned.
DB_PASSWORD = os.environ.get('QMT_DB_PASSWORD', 'r6kEwqWU9!v3')

# Positional indexes of the 'high' / 'low' columns after the rename done in _read_daily_bars.
_HIGH_COL = 3
_LOW_COL = 4


def _engine_url(database):
    """Return the SQLAlchemy URL for ``database`` on the configured MySQL server."""
    return f'mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{database}?charset=utf8'


class MyXtQuantTraderCallback(XtQuantTraderCallback):
    """Trader callbacks that simply print each pushed event."""

    def on_disconnected(self):
        """
        Connection lost.
        :return:
        """
        print(dt.now(), '连接断开回调')

    def on_stock_order(self, order):
        """
        Order status push.
        :param order: XtOrder object
        :return:
        """
        print(dt.now(), '委托回调', order.order_remark)

    def on_stock_trade(self, trade):
        """
        Trade (fill) push.
        :param trade: XtTrade object
        :return:
        """
        print(dt.now(), '成交回调', trade.order_remark)

    def on_order_error(self, order_error):
        """
        Order failure push.
        :param order_error: XtOrderError object
        :return:
        """
        print(f"委托报错回调 {order_error.order_remark} {order_error.error_msg}")

    def on_cancel_error(self, cancel_error):
        """
        Cancel failure push.
        :param cancel_error: XtCancelError object
        :return:
        """
        print(dt.now(), sys._getframe().f_code.co_name)

    def on_order_stock_async_response(self, response):
        """
        Asynchronous order response push.
        :param response: XtOrderResponse object
        :return:
        """
        print(f"异步委托回调 {response.order_remark}")

    def on_cancel_order_stock_async_response(self, response):
        """
        Asynchronous cancel response push.
        :param response: XtCancelOrderResponse object
        :return:
        """
        print(dt.now(), sys._getframe().f_code.co_name)

    def on_account_status(self, status):
        """
        Account status push.
        :param status: XtAccountStatus object
        :return:
        """
        print(dt.now(), sys._getframe().f_code.co_name)


def err_call_back(err):
    """Print an exception raised inside a pool worker (used as ``error_callback``)."""
    print(f'问题在这里~ error:{str(err)}')


def _load_latest_pool(engine):
    """Return the codes stored in the last row of the ``fre`` table, or [] if there is none."""
    frame = pd.read_sql_query('select value from `%s`' % fre, engine)
    if frame.empty:
        return []
    value = frame.iloc[-1, 0]
    if not isinstance(value, str):
        # NULL in the database comes back as None/NaN.
        return []
    # Drop empty items so an empty stored value does not yield a '' code.
    return [code for code in value.split(",") if code]


def _read_daily_bars(engine, qmt_stock):
    """Read the forward-adjusted bars of ``qmt_stock``; return None if the table cannot be read."""
    try:
        df_day = pd.read_sql_query(
            'select time, open_front, close_front, high_front, low_front, volume_front, amount_front, '
            'dif, dea, macd,HL from `%s_%s`' % (qmt_stock, fre), engine)
        df_day.columns = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount',
                          'dif', 'dea', 'macd', 'HL']
    except Exception as err:
        # Report the failing code in its XSHG/XSHE form and move on to the next stock.
        stock = qmt_stock.replace('SH', 'XSHG').replace('SZ', 'XSHE')
        print(stock, repr(err))
        return None
    return df_day


def _merge_tick(df_day, tick):
    """Merge the latest tick into ``df_day`` and return the resulting frame.

    If the tick's high and low are both above or both below the last bar, the
    tick is appended as a new row.  Otherwise the last bar's high/low are
    adjusted in place, using min() when the bar before it has a higher high
    and max() otherwise.  Requires at least two rows in ``df_day``.
    """
    tick_time = dt.fromtimestamp(tick['time'] / 1000.0)
    last_high = df_day.iloc[-1, _HIGH_COL]
    last_low = df_day.iloc[-1, _LOW_COL]

    tick_below = last_high > tick['high'] and last_low > tick['low']
    tick_above = last_high < tick['high'] and last_low < tick['low']
    if tick_below or tick_above:
        qmt_df = pd.DataFrame(
            data=[[tick_time, tick['open'], tick['lastPrice'], tick['high'],
                   tick['low'], tick['volume'], tick['amount']]],
            columns=['time', 'open', 'close', 'high', 'low', 'volume', 'amount'])
        return pd.concat([df_day, qmt_df], ignore_index=True)

    if df_day.iloc[-2, _HIGH_COL] > last_high:
        # Previous bar is higher: keep the lower high and lower low.
        df_day.iloc[-1, _HIGH_COL] = min(last_high, tick['high'])
        df_day.iloc[-1, _LOW_COL] = min(last_low, tick['low'])
    else:
        # Previous bar is not higher: keep the higher high and higher low.
        df_day.iloc[-1, _HIGH_COL] = max(last_high, tick['high'])
        df_day.iloc[-1, _LOW_COL] = max(last_low, tick['low'])
    return df_day


def _classify_bottom(df_day, x, qmt_stock, pool_list):
    """Label row ``x`` as a bottom and append ``qmt_stock`` to ``pool_list`` on a buy signal.

    NOTE(review): the source this was recovered from had lost its indentation;
    the nesting of the ``else`` and ``break`` statements below is the reading
    that matches the surrounding comments -- confirm against the original.
    NOTE(review): when the tick was appended as a new row, that row has no
    dif/dea/macd values, so the ``dif`` comparison is False for it -- confirm
    whether MACD should be recomputed before this check.
    """
    df_day.loc[x, 'HL'] = 'L*'
    m = x - 1
    while m:
        mark = df_day.loc[m, 'HL']
        if mark in ['H', 'HH', 'H*']:
            if (x - m) > 3:
                # Far enough from the previous top: plain bottom.
                df_day.loc[x, 'HL'] = 'L'
        elif mark == 'L':
            if df_day.loc[m - 1, 'low'] > df_day.loc[x - 1, 'low']:
                # Lower low than the previous bottom: signal only if dif is higher.
                if df_day.loc[m, 'dif'] < df_day.loc[x, 'dif']:
                    df_day.loc[x, 'HL'] = 'LL'
                    pool_list.append(qmt_stock)
            else:
                # Low is not below the previous bottom's low: signal.
                df_day.loc[x, 'HL'] = 'L'
                pool_list.append(qmt_stock)
            # Stop at the nearest earlier 'L' mark.
            break
        m = m - 1
        if m == 0:
            df_day.loc[x, 'HL'] = 'L'


def _classify_top(df_day, x, qmt_stock, pool_list):
    """Label row ``x`` as a top and remove ``qmt_stock`` from ``pool_list`` on a sell signal.

    The caller has already checked that ``qmt_stock`` is in ``pool_list``.
    Every removal is followed by a ``break`` so the code is removed at most once.

    NOTE(review): nesting of ``else``/``break`` was reconstructed from a source
    without indentation -- confirm against the original.
    """
    df_day.loc[x, 'HL'] = 'H*'
    m = x - 1
    while m:
        mark = df_day.loc[m, 'HL']
        if mark in ['L', 'LL', 'L*']:
            if x - m > 3:
                # Far enough from the previous bottom: plain top, sell.
                df_day.loc[x, 'HL'] = 'H'
                pool_list.remove(qmt_stock)
                break
        elif mark == 'H':
            if df_day.loc[x - 1, 'high'] > df_day.loc[m - 1, 'high']:
                # Higher high than the previous top: sell only if dif is lower.
                if df_day.loc[x, 'dif'] < df_day.loc[m, 'dif']:
                    df_day.loc[x, 'HL'] = 'HH'
                    pool_list.remove(qmt_stock)
            else:
                # High is not above the previous top's high: sell.
                df_day.loc[x, 'HL'] = 'H'
                pool_list.remove(qmt_stock)
            # Stop at the nearest earlier 'H' mark.
            break
        m = m - 1
        if m == 0:
            df_day.loc[x, 'HL'] = 'H'


def _process_stock(data, qmt_stock, engine_stock, pool_list):
    """Run the merge + top/bottom detection for one stock, updating ``pool_list``."""
    df_day = _read_daily_bars(engine_stock, qmt_stock)
    if df_day is None or len(df_day.index) < 2:
        # Unreadable table, or too little history to merge the tick.
        return

    df_day = _merge_tick(df_day, data[qmt_stock])
    if len(df_day.index) < 3:
        # The three-bar pattern below needs rows x-2, x-1 and x.
        return

    x = len(df_day.index) - 1
    high_x = df_day.loc[x, 'high']
    high_mid = df_day.loc[x - 1, 'high']
    high_left = df_day.loc[x - 2, 'high']

    if high_x > high_mid and high_left > high_mid:
        _classify_bottom(df_day, x, qmt_stock, pool_list)
    elif high_x < high_mid and high_left < high_mid and (qmt_stock in pool_list):
        _classify_top(df_day, x, qmt_stock, pool_list)


def hlfx(data, stocks, pool_list):
    """Update ``pool_list`` from the latest ticks of ``stocks``.

    :param data: mapping of stock code -> latest tick; each tick is indexed by
        'time', 'open', 'lastPrice', 'high', 'low', 'volume' and 'amount'
    :param stocks: codes (keys of ``data``) this call is responsible for
    :param pool_list: shared list of codes; bottom signals append to it, top
        signals remove from it
    :return: None
    """
    engine_stock = create_engine(_engine_url('qmt_stocks_tech'))
    print(stocks)
    try:
        for qmt_stock in stocks:
            _process_stock(data, qmt_stock, engine_stock, pool_list)
    finally:
        # Release the pooled connections held by this engine.
        engine_stock.dispose()


def bridge(data):
    """Whole-market quote callback: fan ``data`` out to worker processes and store the new pool.

    1. Load the latest stored pool from the ``hlfx_pool`` database.
    2. Split the codes in ``data`` into chunks, one task per chunk.
    3. Run ``hlfx`` on every chunk with a shared pool list.
    4. Insert the resulting pool as a new row.

    :param data: mapping of stock code -> latest tick, as passed by the quote subscription
    :return: None
    """
    if not data:
        return

    engine_hlfx_pool = create_engine(_engine_url('hlfx_pool'))
    try:
        seed = _load_latest_pool(engine_hlfx_pool)
    finally:
        engine_hlfx_pool.dispose()

    keys = list(data.keys())
    # Integer worker count, never zero, so chunking and dispatch always agree.
    workers = max(1, mp.cpu_count() // 2)
    step = math.ceil(len(keys) / workers)
    chunks = [keys[i:i + step] for i in range(0, len(keys), step)]

    # The manager owns a server process; the context manager shuts it down.
    with mp.Manager() as manager:
        results = manager.list(seed)
        print(results)
        pool = mp.Pool(processes=workers)
        try:
            for chunk in chunks:
                pool.apply_async(func=hlfx, args=(data, chunk, results,),
                                 error_callback=err_call_back)
        finally:
            pool.close()
            pool.join()
        # Copy out of the proxy before the manager goes away.
        final_pool = set(results)

    print(final_pool)
    results_list = ','.join(final_pool)
    # Only the table name is interpolated; the values are passed as parameters.
    sql = "INSERT INTO `%s` (date,value) VALUES(%%s,%%s)" % fre
    db_pool = pymysql.connect(host=DB_HOST, user=DB_USER, port=DB_PORT,
                              password=DB_PASSWORD, database='hlfx_pool')
    try:
        with db_pool.cursor() as cursor_pool:
            cursor_pool.execute(sql, (dt.now().strftime('%Y-%m-%d %H:%M:%S'), results_list))
        db_pool.commit()
    finally:
        db_pool.close()
    print(f'{dt.now()}写入新的results,hlfx_pool更新')


def prepare():
    """Return the latest stored pool with XSHG/XSHE suffixes converted to SH/SZ."""
    engine_hlfx_pool = create_engine(_engine_url('hlfx_pool'))
    try:
        results = _load_latest_pool(engine_hlfx_pool)
    finally:
        engine_hlfx_pool.dispose()
    results = [x.replace('XSHG', 'SH').replace('XSHE', 'SZ') for x in results]
    print('数据库读取,并转化后缀格式', len(results))
    return results


def main():
    """Connect the trader, subscribe to whole-market quotes and hand control to xtdata."""
    path = r'c:\\qmt\\userdata_mini'
    # Integer session id; it must differ between strategies running at the same time.
    session_id = int(time.time())
    xt_trader = XtQuantTrader(path, session_id)
    # Securities account object for the trading account.
    acc = StockAccount('920000207040', 'SECURITY')
    # Register the callback object that receives trading pushes.
    callback = MyXtQuantTraderCallback()
    xt_trader.register_callback(callback)
    # Start the trading thread.
    xt_trader.start()
    # Open the trading connection; 0 means success.
    connect_result = xt_trader.connect()
    print('建立交易连接,返回0表示连接成功', connect_result)
    # Subscribe to trading pushes for the account; 0 means success.
    subscribe_result = xt_trader.subscribe(acc)
    print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)

    stocks = xtdata.get_stock_list_in_sector('沪深A股')
    xtdata.subscribe_whole_quote(stocks, callback=bridge)
    # Blocks to keep the process alive so that `bridge` keeps being called.
    # The legacy polling loop that used to follow this call could never run
    # and referenced names that no longer exist, so it has been removed.
    xtdata.run()


if __name__ == '__main__':
    main()