# coding:utf-8
 
- from jqdatasdk import *
 
- import pandas as pd
 
- import pymysql
 
- from sqlalchemy import create_engine
 
- import threading
 
- from datetime import datetime as dt
 
- from jqdatasdk.technical_analysis import *
 
- from xtquant import xtdata, xtconstant
 
- from xtquant.xttype import StockAccount
 
- from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
 
- import time
 
- import math
 
- import multiprocessing as mp
 
- import os
 
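# Original version

# NOTE: account credentials that used to be hard-coded in the commented-out
# calls below have been redacted; never commit real credentials.
# auth('<jq-account>', '<jq-password>')

# auth('<jq-account>', '<jq-password>')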
 
- # stocks = list(get_all_securities(['stock'], date=dt.today().strftime('%Y-%m-%d')).index)
 
- # stocks = stocks[0:200]
 
pd.set_option('display.max_columns', None)  # show every column when printing DataFrames

# Bar frequency. Used as the table-name suffix (`<code>_<fre>`) when reading
# per-stock bars and as the table name of the signal pool in the SQL below.
fre = '1d'
 
class MyXtQuantTraderCallback(XtQuantTraderCallback):
    """Trader callback that logs every pushed event to stdout.

    Timestamps use ``dt`` (``datetime.datetime``, imported at the top of the
    file).  The previous implementation referenced ``datetime.datetime`` and
    ``sys``, neither of which is imported by name in this file, so those
    callbacks could fail with ``NameError`` when invoked.
    """

    def on_disconnected(self):
        """
        Connection lost.
        :return:
        """
        print(dt.now(), '连接断开回调')

    def on_stock_order(self, order):
        """
        Order status push.
        :param order: XtOrder object
        :return:
        """
        print(dt.now(), '委托回调', order.order_remark)

    def on_stock_trade(self, trade):
        """
        Trade (fill) push.
        :param trade: XtTrade object
        :return:
        """
        print(dt.now(), '成交回调', trade.order_remark)

    def on_order_error(self, order_error):
        """
        Order placement failure push.
        :param order_error: XtOrderError object
        :return:
        """
        print(f"委托报错回调 {order_error.order_remark} {order_error.error_msg}")

    def on_cancel_error(self, cancel_error):
        """
        Order cancellation failure push.
        :param cancel_error: XtCancelError object
        :return:
        """
        # Logs the callback name, as the original frame-introspection did.
        print(dt.now(), 'on_cancel_error')

    def on_order_stock_async_response(self, response):
        """
        Asynchronous order placement response push.
        :param response: XtOrderResponse object
        :return:
        """
        print(f"异步委托回调 {response.order_remark}")

    def on_cancel_order_stock_async_response(self, response):
        """
        Asynchronous order cancellation response push.
        :param response: XtCancelOrderResponse object
        :return:
        """
        print(dt.now(), 'on_cancel_order_stock_async_response')

    def on_account_status(self, status):
        """
        Account status push.
        :param status: XtAccountStatus object
        :return:
        """
        print(dt.now(), 'on_account_status')
 
def err_call_back(err):
    """Print the error raised inside a pool worker (used as ``error_callback``)."""
    message = '问题在这里~ error:' + str(err)
    print(message)
 
def hlfx(data, stocks, pool_list):
    """Update the signal pool from the latest tick of each stock.

    For every code in ``stocks`` the stored forward-adjusted bars are read
    from table ``<code>_<fre>``.  The latest tick from ``data`` is then merged
    into that frame: appended as a new bar when neither bar contains the
    other, otherwise folded into the last bar.  Finally the three newest bars
    are checked for a fractal:

    * bottom pattern: the code may be appended to ``pool_list``;
    * top pattern (only checked for codes already in ``pool_list``): the code
      may be removed from ``pool_list``.

    The ``HL`` labels written to the in-memory frame are not stored anywhere;
    the only lasting effect of this function is the mutation of ``pool_list``.

    :param data: mapping ``code -> tick``; each tick is a mapping with keys
        'time' (divided by 1000 before ``fromtimestamp``, i.e. treated as
        epoch milliseconds), 'open', 'lastPrice', 'high', 'low', 'volume',
        'amount'.  The mapping is not modified.
    :param stocks: iterable of stock codes to process (keys of ``data``).
    :param pool_list: list-like pool of codes, mutated in place.
    :return: None
    """
    # NOTE(review): database credentials are embedded in this URL; consider
    # moving them to configuration.
    engine_stock = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8')
    print(stocks)
    # Positional indices of 'high' and 'low' once the columns are renamed below.
    high_col, low_col = 3, 4
    try:
        for qmt_stock in stocks:
            # Read the forward-adjusted bars plus the MACD / fractal columns.
            try:
                df_day = pd.read_sql_query(
                    'select time, open_front, close_front, high_front, low_front, volume_front, amount_front, dif, dea, macd,HL from `%s_%s`'
                    % (qmt_stock, fre), engine_stock)
                df_day.columns = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount',
                                  'dif', 'dea', 'macd', 'HL']
            except Exception:
                # Best effort: a missing/unreadable table only skips this code.
                # (KeyboardInterrupt / SystemExit are no longer swallowed.)
                print(qmt_stock)
                continue

            # The merge below looks at the last two stored bars; with fewer
            # the original code raised and aborted the whole chunk.
            if len(df_day.index) < 2:
                print(f'{qmt_stock} skipped: fewer than 2 stored bars')
                continue

            # Latest tick; convert its timestamp without touching the caller's dict.
            get_price = data[qmt_stock]
            tick_time = dt.fromtimestamp(get_price['time'] / 1000.0)
            tick_high, tick_low = get_price['high'], get_price['low']
            last_high = df_day.iloc[-1, high_col]
            last_low = df_day.iloc[-1, low_col]

            # Containment handling first.
            if (last_high > tick_high and last_low > tick_low) \
                    or (last_high < tick_high and last_low < tick_low):
                # No containment: the tick becomes a new bar.  The appended row
                # has no dif/dea/macd/HL values (they are NaN after concat).
                qmt_df = pd.DataFrame(
                    data=[[tick_time, get_price['open'], get_price['lastPrice'], tick_high,
                           tick_low, get_price['volume'], get_price['amount']]],
                    columns=['time', 'open', 'close', 'high', 'low', 'volume', 'amount'])
                df_day = pd.concat([df_day, qmt_df], ignore_index=True)
            elif df_day.iloc[-2, high_col] > last_high:
                # Containment while falling (previous high is higher): keep the lower values.
                df_day.iloc[-1, high_col] = min(last_high, tick_high)
                df_day.iloc[-1, low_col] = min(last_low, tick_low)
            else:
                # Containment while rising: keep the higher values.
                df_day.iloc[-1, high_col] = max(last_high, tick_high)
                df_day.iloc[-1, low_col] = max(last_low, tick_low)

            # Look for a top/bottom fractal on the three newest bars.
            x = len(df_day.index) - 1
            if x < 2:
                # Needs bars x, x-1 and x-2.
                continue
            m = x - 1

            # Bottom.
            # NOTE(review): both comparisons use 'high' (not 'low') - confirm this is intended.
            if ((df_day.loc[x, 'high'] > df_day.loc[x - 1, 'high']) and (
                    df_day.loc[x - 2, 'high'] > df_day.loc[x - 1, 'high'])):
                df_day.loc[x, 'HL'] = 'L*'
                # Walk backwards to classify the bottom.
                while m:
                    if df_day.loc[m, 'HL'] in ['H', 'HH', 'H*']:
                        if (x - m) > 3:
                            # Far enough from the previous top to form a stroke -> L.
                            # The scan deliberately continues (no break), as before.
                            df_day.loc[x, 'HL'] = 'L'
                    elif df_day.loc[m, 'HL'] == 'L':
                        if df_day.loc[m - 1, 'low'] > df_day.loc[x - 1, 'low']:
                            # New bottom is lower than the previous one: use the
                            # MACD dif to test for divergence (LL) vs plain L.
                            # NOTE(review): dif at x is NaN when the tick was just
                            # appended, so this comparison is then always False.
                            if df_day.loc[m, 'dif'] < df_day.loc[x, 'dif']:
                                df_day.loc[x, 'HL'] = 'LL'
                            else:
                                df_day.loc[x, 'HL'] = 'L'
                            # Either way a signal is produced: enter the pool.
                            pool_list.append(qmt_stock)
                        break
                    m = m - 1
                    if m == 0:
                        df_day.loc[x, 'HL'] = 'L'

            # Top (only evaluated for codes currently in the pool).
            elif (df_day.loc[x, 'high'] < df_day.loc[x - 1, 'high']) and (
                    df_day.loc[x - 2, 'high'] < df_day.loc[x - 1, 'high']) and (qmt_stock in pool_list):
                df_day.loc[x, 'HL'] = 'H*'
                while m:
                    if df_day.loc[m, 'HL'] in ['L', 'LL', 'L*']:
                        if x - m > 3:
                            # Forms a stroke -> H: sell signal, leave the pool.
                            df_day.loc[x, 'HL'] = 'H'
                            pool_list.remove(qmt_stock)
                            break
                    elif df_day.loc[m, 'HL'] == 'H':
                        if df_day.loc[x - 1, 'high'] > df_day.loc[m - 1, 'high']:
                            # New top is higher than the previous one: MACD dif
                            # decides between divergence (HH) and plain H.
                            if df_day.loc[x, 'dif'] < df_day.loc[m, 'dif']:
                                df_day.loc[x, 'HL'] = 'HH'
                            else:
                                df_day.loc[x, 'HL'] = 'H'
                            # Either way a sell signal: leave the pool.
                            pool_list.remove(qmt_stock)
                        break
                    m = m - 1
                    if m == 0:
                        df_day.loc[x, 'HL'] = 'H'
    finally:
        # Release pooled DB connections held by this call.
        engine_stock.dispose()
 
def bridge(data):
    """Quote callback: fan the pushed ticks out to worker processes and store the new pool.

    Steps:

    1. Read the most recent pool (comma-separated codes) from table ``fre``
       of the ``hlfx_pool`` database.
    2. Split the codes in ``data`` into one chunk per worker process.
    3. Run ``hlfx(data, chunk, shared_pool)`` for every chunk; the workers
       append/remove codes in the shared pool.
    4. Insert the de-duplicated pool as a new row into table ``fre``.

    :param data: mapping ``code -> tick`` as pushed by the quote subscription.
    :return: None
    """
    keys = list(data.keys())
    if not keys:
        # Nothing to evaluate (previously this raised: range() step of 0).
        return

    # Half of the CPUs, but at least one worker (Pool rejects 0 processes).
    workers = max(1, mp.cpu_count() // 2)
    step = math.ceil(len(keys) / workers)
    to_hlfx_list = [keys[i:i + step] for i in range(0, len(keys), step)]

    # Fetch the current pool; an empty table starts from an empty pool.
    # NOTE(review): database credentials are embedded in the URL / connect
    # call below; consider moving them to configuration.
    engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8')
    try:
        stored = pd.read_sql_query('select value from `%s`' % fre, engine_hlfx_pool)
    finally:
        engine_hlfx_pool.dispose()
    previous = stored.iloc[-1, 0].split(",") if len(stored.index) else []

    # The manager (a separate server process) and the pool are both torn down
    # when their ``with`` blocks exit, so nothing leaks per callback.
    with mp.Manager() as manager:
        results = manager.list()
        results.extend(previous)
        print(results)
        with mp.Pool(processes=workers) as pool:
            # Dispatch exactly one task per chunk (chunk count can differ
            # from the worker count).
            for chunk in to_hlfx_list:
                pool.apply_async(func=hlfx,
                                 args=(data, chunk, results,), error_callback=err_call_back)
            pool.close()
            pool.join()
        # Copy out of the proxy before the manager shuts down.
        final_pool = set(results)

    print(final_pool)
    results_list = ','.join(final_pool)
    db_pool = pymysql.connect(host='localhost',
                              user='root',
                              port=3307,
                              password='r6kEwqWU9!v3',
                              database='hlfx_pool')
    try:
        with db_pool.cursor() as cursor_pool:
            # Table name cannot be a bound parameter; the values are.
            cursor_pool.execute(
                'INSERT INTO `%s` (date,value) VALUES(%%s,%%s)' % fre,
                (dt.now().strftime('%Y-%m-%d %H:%M:%S'), results_list))
        db_pool.commit()
    finally:
        db_pool.close()
    print(f'{dt.now()}写入新的results,hlfx_pool更新')
 
def prepare():
    """Return the codes of the newest pool row with exchange suffixes rewritten.

    Reads the last row of table ``fre`` in the ``hlfx_pool`` database, splits
    its comma-separated value and replaces 'XSHG' with 'SH' and 'XSHE' with
    'SZ' in every code.

    :return: list of converted code strings
    """
    pool_engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8')
    pool_rows = pd.read_sql_query('select value from `%s`' % fre, pool_engine)
    latest_value = pool_rows.iloc[-1, 0]
    converted = []
    for code in latest_value.split(","):
        converted.append(code.replace('XSHG', 'SH').replace('XSHE', 'SZ'))
    print('数据库读取,并转化后缀格式', len(converted))
    return converted
 
if __name__ == '__main__':
    # Client user-data directory handed to the trader.
    path = r'c:\\qmt\\userdata_mini'
    # Session id: an integer that must be unique among strategies running at the same time.
    session_id = int(time.time())
    xt_trader = XtQuantTrader(path, session_id)
    # Securities account object for the fund account used by this strategy.
    acc = StockAccount('920000207040', 'SECURITY')
    # Create the trading callback object and register it to receive pushes.
    callback = MyXtQuantTraderCallback()
    xt_trader.register_callback(callback)
    # Start the trading thread.
    xt_trader.start()
    # Open the trading connection; 0 means success.
    connect_result = xt_trader.connect()
    print('建立交易连接,返回0表示连接成功', connect_result)
    # Subscribe to trading pushes for the account; 0 means success.
    subscribe_result = xt_trader.subscribe(acc)
    print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)

    # Subscribe to whole-market quotes; every push is handled by bridge().
    stocks = xtdata.get_stock_list_in_sector('沪深A股')
    xtdata.subscribe_whole_quote(stocks, callback=bridge)
    # NOTE(review): xtdata.run() is expected to block this thread while quote
    # callbacks are delivered - confirm against the xtquant documentation.
    xtdata.run()
    # The legacy polling loop that used to follow was removed: it could not
    # run past its first iteration (it called exit() right after a debug
    # get_bars() probe) and the remainder referenced names that are never
    # defined (db, results, cursor_pool, db_pool, results_short) and called
    # hlfx() with an outdated signature.
 
 
  |