123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372 |
- # coding:utf-8
- # from jqdatasdk import *
- import pandas as pd
- import pymysql
- from sqlalchemy import create_engine, text
- import threading
- from datetime import datetime as dt
- import datetime
- from jqdatasdk.technical_analysis import *
- from xtquant import xtdata, xtconstant
- from xtquant.xttype import StockAccount
- from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
- import time
- import math
- import multiprocessing as mp
- import os
- import psutil
- import traceback
- from apscheduler.schedulers.blocking import BlockingScheduler
- import sys
- import gc
- # 原始版本
- # auth('18616891214', 'Ea?*7f68nD.dafcW34d!')
- # auth('18521506014', 'Abc123!@#')
- # stocks = list(get_all_securities(['stock'], date=dt.today().strftime('%Y-%m-%d')).index)
- # stocks = stocks[0:200]
- pd.set_option('display.max_columns', None) # 设置显示最大行
- fre = '1d'
- class MyXtQuantTraderCallback(XtQuantTraderCallback):
- def on_disconnected(self):
- """
- 连接断开
- :return:
- """
- print(datetime.datetime.now(), '连接断开回调')
- def on_stock_order(self, order):
- """
- 委托回报推送
- :param order: XtOrder对象
- :return:
- """
- print(datetime.datetime.now(), '委托回调', order.order_remark)
- def on_stock_trade(self, trade):
- """
- 成交变动推送
- :param trade: XtTrade对象
- :return:
- """
- print(datetime.datetime.now(), '成交回调', trade.order_remark)
- def on_order_error(self, order_error):
- """
- 委托失败推送
- :param order_error:XtOrderError 对象
- :return:
- """
- # print("on order_error callback")
- # print(order_error.order_id, order_error.error_id, order_error.error_msg)
- print(f"委托报错回调 {order_error.order_remark} {order_error.error_msg}")
- def on_cancel_error(self, cancel_error):
- """
- 撤单失败推送
- :param cancel_error: XtCancelError 对象
- :return:
- """
- print(datetime.datetime.now(), sys._getframe().f_code.co_name)
- def on_order_stock_async_response(self, response):
- """
- 异步下单回报推送
- :param response: XtOrderResponse 对象
- :return:
- """
- print(f"异步委托回调 {response.order_remark}")
- def on_cancel_order_stock_async_response(self, response):
- """
- :param response: XtCancelOrderResponse 对象
- :return:
- """
- print(datetime.datetime.now(), sys._getframe().f_code.co_name)
- def on_account_status(self, status):
- """
- :param response: XtAccountStatus 对象
- :return:
- """
- print(datetime.datetime.now(), sys._getframe().f_code.co_name)
- def err_call_back(err):
- print(f'问题在这里~ error:{str(err)}')
- traceback.print_exc()
- def run(seq):
- mor = datetime.datetime.strptime(
- str(dt.now().date()) + '11:30', '%Y-%m-%d%H:%M')
- afternoon = datetime.datetime.strptime(
- str(dt.now().date()) + '15:00', '%Y-%m-%d%H:%M')
- mor_1 = datetime.datetime.strptime(
- str(dt.now().date()) + '11:10', '%Y-%m-%d%H:%M')
- """阻塞线程接收行情回调"""
- import time
- client = xtdata.get_client()
- while True:
- now_date = dt.now()
- if not client.is_connected():
- xtdata.unsubscribe_quote(seq)
- raise Exception('行情服务连接断开')
- # if mor < dt.now() < mor_1:
- # xtdata.unsubscribe_quote(seq)
- # print(f'现在时间:{dt.now()},已休市')
- # sys.exit()
- # break
- # return 0
- elif dt.now() > afternoon:
- xtdata.unsubscribe_quote(seq)
- print(f'现在时间:{dt.now()},已收盘')
- sys.exit()
- break
- return
- def hlfx(stock_list, data):
- # stock_list = list(data.keys())
- # print(f'def-->hlfx, MyPid is {os.getpid()}, 本次我需要计算{len(stock_list)},now is {dt.now()}')
- # 获得hlfx_pool池子
- engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8',
- pool_size=100, pool_recycle=60, max_overflow=50, pool_timeout=60)
- results = []
- results.extend(pd.read_sql_query(text(
- 'select value from `%s` order by `index` desc limit 10' % fre), engine_hlfx_pool.connect()).iloc[0, 0].split(","))
- # print(f'本次hlfx_pool有{len(results)}个个股')
- engine_stock = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8',
- pool_size=100, pool_recycle=60, max_overflow=50, pool_timeout=60)
- for qmt_stock in stock_list:
- # 读取qmt_stocks_whole表-前复权-信息
- try:
- df_day = pd.read_sql_query(text(
- 'select time, open_front, close_front, high_front, low_front, volume_front, amount_front, '
- 'dif, dea, macd, HL from `%s_%s`' % (qmt_stock, fre)), engine_stock.connect())
- df_day.columns = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount', 'dif', 'dea', 'macd', 'HL']
- except BaseException as e:
- print(qmt_stock, '未能读取!')
- pass
- else:
- # 获得最新价格信息
- get_price = data[qmt_stock]
- # print(get_price)
- # 调整time时间格式
- get_price['time'] = dt.fromtimestamp(get_price['time'] / 1000.0)
- # print('成功判定', get_price['time'])
- # 先处理去包含
- # 不包含
- if (df_day.iloc[-1, 3] > get_price['high']
- and df_day.iloc[-1, 4] > get_price['low']) \
- or (df_day.iloc[-1, 3] < get_price['high']
- and df_day.iloc[-1, 4] < get_price['low']):
- # print('lalallala', get_price['open'], get_price['lastPrice'], get_price['high'],
- # get_price['low'], get_price['volume'], get_price['amount'])
- qmt_df = pd.DataFrame(data=[[get_price['time'], get_price['open'], get_price['lastPrice'],
- get_price['high'], get_price['low'], get_price['volume'],
- get_price['amount']]],
- columns=['time', 'open', 'close', 'high', 'low', 'volume', 'amount'])
- # print('qmt_______', qmt_df)
- df_day = pd.concat([df_day, qmt_df], ignore_index=True)
- # print('不包含,合并完成', df_day)
- # 包含
- else:
- if len(df_day) > 2:
- # 左高,下降
- if df_day.iloc[-2, 3] > df_day.iloc[-1, 3]:
- df_day.iloc[-1, 3] = min(df_day.iloc[-1, 3], get_price['high'])
- df_day.iloc[-1, 4] = min(df_day.iloc[-1, 4], get_price['low'])
- # 右高,上升
- else:
- df_day.iloc[-1, 3] = max(df_day.iloc[-1, 3], get_price['high'])
- df_day.iloc[-1, 4] = max(df_day.iloc[-1, 4], get_price['low'])
- # print('包含', df_day)
- # 数合并完成,确认df_day
- # print(df_day)
- # 寻找顶底分型
- if len(df_day) > 2:
- x = len(df_day.index)-1
- m = x - 1
- # 底
- if ((df_day.loc[x, 'high'] > df_day.loc[x - 1, 'high']) and (
- df_day.loc[x - 2, 'high'] > df_day.loc[x - 1, 'high'])):
- df_day.loc[x, 'HL'] = 'L*'
- # 判断底的性质
- while m:
- if df_day.loc[m, 'HL'] in ['H', 'HH', 'H*']:
- if (x - m) > 3:
- # 成笔——>L
- df_day.loc[x, 'HL'] = 'L'
- break
- elif df_day.loc[m, 'HL'] in ['L', 'LL', 'L*']:
- if df_day.loc[m - 1, 'low'] > df_day.loc[x - 1, 'low']:
- # pool_list.append(qmt_stock)
- # 获得MACD,判断MACD判断背驰
- x_macd_dif, x_macd_dea, x_macd_macd = df_day.loc[x, 'dif'], df_day.loc[x, 'dea'], \
- df_day.loc[x, 'macd']
- m_macd_dif, m_macd_dea, m_macd_macd = df_day.loc[m, 'dif'], df_day.loc[m, 'dea'], \
- df_day.loc[m, 'macd']
- # 背驰底->LL
- if m_macd_dif < x_macd_dif:
- df_day.loc[x, 'HL'] = 'LL'
- # 产生信号,进入hlfx_pool
- results.append(qmt_stock)
- break
- # 前一个为底更高,且中间不存在更低的底
- else:
- df_day.loc[x, 'HL'] = 'L'
- # 产生信号,进入hlfx_pool
- break
- m = m - 1
- if m == 0:
- df_day.loc[x, 'HL'] = 'L'
- results.append(qmt_stock)
- # 顶
- elif (df_day.loc[x, 'high'] < df_day.loc[x - 1, 'high']) and (
- df_day.loc[x - 2, 'high'] < df_day.loc[x - 1, 'high']) and (qmt_stock in results):
- df_day.loc[x, 'HL'] = 'H*'
- while m:
- if df_day.loc[m, 'HL'] in ['L', 'LL', 'L*']:
- if x - m > 3:
- # 成笔->H
- df_day.loc[x, 'HL'] = 'H'
- # 产生卖出信号,进入hlfx_pool
- results.remove(qmt_stock)
- break
- elif df_day.loc[m, 'HL'] in ['H','HH', 'H*']:
- if df_day.loc[x - 1, 'high'] > df_day.loc[m - 1, 'high']:
- # 获得MACD,判断MACD判断背驰
- x_macd_dif, x_macd_dea, x_macd_macd = df_day.loc[x, 'dif'], df_day.loc[x, 'dea'], \
- df_day.loc[x, 'macd']
- m_macd_dif, m_macd_dea, m_macd_macd = df_day.loc[m, 'dif'], df_day.loc[m, 'dea'], \
- df_day.loc[m, 'macd']
- # MACD顶背驰
- if x_macd_dif < m_macd_dif:
- df_day.loc[x, 'HL'] = 'HH'
- # 产生卖出信号,进入hlfx_pool
- results.remove(qmt_stock)
- break
- # 前一个为顶,且中间存在不包含 or 更高的顶
- else:
- df_day.loc[x, 'HL'] = 'H'
- # 产生卖出信号,进入hlfx_pool
- results.remove(qmt_stock)
- break
- m = m - 1
- if m == 0:
- df_day.loc[x, 'HL'] = 'H'
- results.remove(qmt_stock)
- db_pool = pymysql.connect(host='localhost',
- user='root',
- port=3307,
- password='r6kEwqWU9!v3',
- database='hlfx_pool')
- cursor_pool = db_pool.cursor()
- results_list = ','.join(set(results))
- sql = "INSERT INTO %s (date,value) VALUES('%s','%s')" % (fre, dt.now().strftime('%Y-%m-%d %H:%M:%S'), results_list)
- cursor_pool.execute(sql)
- db_pool.commit()
- print(f'{dt.now()} 新的results有{len(set(results))}, \n {set(results)}')
- engine_stock.dispose()
- engine_hlfx_pool.dispose()
- def prepare(data):
- print(dt.now())
- stock_list = list(data.keys())
- if len(data.keys()) >= 12:
- cpu_count = 12
- else:
- cpu_count = len(data.keys())
- step = math.ceil(len(stock_list) / cpu_count)
- to_hlfx_list = []
- for i in range(0, len(stock_list), step):
- to_hlfx_list.append([x for x in stock_list[i:i + step]])
- pool = mp.Pool(processes=int(cpu_count/2))
- for m in range(len(to_hlfx_list)):
- pool.apply_async(func=hlfx,
- args=(to_hlfx_list[m], data), error_callback=err_call_back)
- pool.close()
- pool.join()
- def bridge():
- print(f'bridge is {os.getpid()}, now is {dt.now()},开盘了')
- stocks = xtdata.get_stock_list_in_sector('沪深A股')
- seq = xtdata.subscribe_whole_quote(stocks, callback=prepare)
- run(seq)
- def job_func():
- print(f"Job started at {dt.now()}")
- # 创建子进程
- p = mp.Process(target=bridge)
- # 启动子进程
- p.start()
- # 等待子进程结束
- p.join()
- print(f"Job finished at {dt.now()}")
- if __name__ == '__main__':
- print(f'总进程pid:{os.getpid()}')
- mp.freeze_support()
- pus = psutil.Process()
- # pus.cpu_affinity([0, 1, 2, 3, 4, 5, 6, 7])
- path = r'c:\\qmt\\userdata_mini'
- # 生成session id 整数类型 同时运行的策略不能重复
- session_id = int(time.time())
- xt_trader = XtQuantTrader(path, session_id)
- # 创建资金账号为 800068 的证券账号对象
- acc = StockAccount('920000207040', 'SECURITY')
- # 创建交易回调类对象,并声明接收回调
- callback = MyXtQuantTraderCallback()
- xt_trader.register_callback(callback)
- # 启动交易线程
- xt_trader.start()
- # 建立交易连接,返回0表示连接成功
- connect_result = xt_trader.connect()
- print('建立交易连接,返回0表示连接成功', connect_result)
- # 对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功
- subscribe_result = xt_trader.subscribe(acc)
- print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)
- job_func()
- scheduler = BlockingScheduler()
- scheduler.add_job(func=job_func, trigger='cron', day_of_week='0-4', hour='09', minute='25',
- timezone="Asia/Shanghai", max_instances=5)
- # # scheduler.add_job(func=job_func, trigger='cron', day_of_week='0-4', hour='13', minute='00',
- # # timezone="Asia/Shanghai")
- try:
- scheduler.start()
- except (KeyboardInterrupt, SystemExit):
- pass
|