|  | @@ -101,6 +101,11 @@ def err_call_back(err):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  def run(seq):
 | 
	
		
			
				|  |  | +    mor = datetime.datetime.strptime(
 | 
	
		
			
				|  |  | +        str(dt.now().date()) + '11:30', '%Y-%m-%d%H:%M')
 | 
	
		
			
				|  |  | +    afternoon = datetime.datetime.strptime(
 | 
	
		
			
				|  |  | +        str(dt.now().date()) + '15:00', '%Y-%m-%d%H:%M')
 | 
	
		
			
				|  |  | +    print(mor, afternoon)
 | 
	
		
			
				|  |  |      """阻塞线程接收行情回调"""
 | 
	
		
			
				|  |  |      import time
 | 
	
		
			
				|  |  |      client = xtdata.get_client()
 | 
	
	
		
			
				|  | @@ -110,15 +115,17 @@ def run(seq):
 | 
	
		
			
				|  |  |          if not client.is_connected():
 | 
	
		
			
				|  |  |              xtdata.unsubscribe_quote(seq)
 | 
	
		
			
				|  |  |              raise Exception('行情服务连接断开')
 | 
	
		
			
				|  |  | -        if now_date.replace(hour=11, minute=30, second=0) < dt.now() < now_date.replace(hour=13, minute=0, second=0):
 | 
	
		
			
				|  |  | +        if mor < dt.now():
 | 
	
		
			
				|  |  | +            xtdata.unsubscribe_quote(seq)
 | 
	
		
			
				|  |  |              print(f'现在时间:{dt.now()},已休市')
 | 
	
		
			
				|  |  |              break
 | 
	
		
			
				|  |  |              # return 0
 | 
	
		
			
				|  |  | -        elif dt.now() > now_date.replace(hour=15, minute=0, second=0):
 | 
	
		
			
				|  |  | +        elif dt.now() > afternoon:
 | 
	
		
			
				|  |  | +            xtdata.unsubscribe_quote(seq)
 | 
	
		
			
				|  |  |              print(f'现在时间:{dt.now()},已收盘')
 | 
	
		
			
				|  |  |              break
 | 
	
		
			
				|  |  |              # return 0
 | 
	
		
			
				|  |  | -    return
 | 
	
		
			
				|  |  | +    # return
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  def hlfx(stock_list, data):
 | 
	
	
		
			
				|  | @@ -132,6 +139,7 @@ def hlfx(stock_list, data):
 | 
	
		
			
				|  |  |          'select value from `%s` order by `index` desc limit 10' % fre, engine_hlfx_pool).iloc[0, 0].split(","))
 | 
	
		
			
				|  |  |      print(f'本次hlfx_pool有{len(results)}个个股')
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      engine_stock = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8')
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      for qmt_stock in stock_list:
 | 
	
	
		
			
				|  | @@ -169,99 +177,101 @@ def hlfx(stock_list, data):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |              # 包含
 | 
	
		
			
				|  |  |              else:
 | 
	
		
			
				|  |  | -                # 左高,下降
 | 
	
		
			
				|  |  | -                if df_day.iloc[-2, 3] > df_day.iloc[-1, 3]:
 | 
	
		
			
				|  |  | -                    df_day.iloc[-1, 3] = min(df_day.iloc[-1, 3], get_price['high'])
 | 
	
		
			
				|  |  | -                    df_day.iloc[-1, 4] = min(df_day.iloc[-1, 4], get_price['low'])
 | 
	
		
			
				|  |  | -                # 右高,上升
 | 
	
		
			
				|  |  | -                else:
 | 
	
		
			
				|  |  | -                    df_day.iloc[-1, 3] = max(df_day.iloc[-1, 3], get_price['high'])
 | 
	
		
			
				|  |  | -                    df_day.iloc[-1, 4] = max(df_day.iloc[-1, 4], get_price['low'])
 | 
	
		
			
				|  |  | +                if len(df_day) > 2:
 | 
	
		
			
				|  |  | +                    # 左高,下降
 | 
	
		
			
				|  |  | +                    if df_day.iloc[-2, 3] > df_day.iloc[-1, 3]:
 | 
	
		
			
				|  |  | +                        df_day.iloc[-1, 3] = min(df_day.iloc[-1, 3], get_price['high'])
 | 
	
		
			
				|  |  | +                        df_day.iloc[-1, 4] = min(df_day.iloc[-1, 4], get_price['low'])
 | 
	
		
			
				|  |  | +                    # 右高,上升
 | 
	
		
			
				|  |  | +                    else:
 | 
	
		
			
				|  |  | +                        df_day.iloc[-1, 3] = max(df_day.iloc[-1, 3], get_price['high'])
 | 
	
		
			
				|  |  | +                        df_day.iloc[-1, 4] = max(df_day.iloc[-1, 4], get_price['low'])
 | 
	
		
			
				|  |  |                  # print('包含', df_day)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |              # 数合并完成,确认df_day
 | 
	
		
			
				|  |  |              # print(df_day)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |              # 寻找顶底分型
 | 
	
		
			
				|  |  | -            x = len(df_day.index)-1
 | 
	
		
			
				|  |  | -            m = x - 1
 | 
	
		
			
				|  |  | -            # 底
 | 
	
		
			
				|  |  | -            if ((df_day.loc[x, 'high'] > df_day.loc[x - 1, 'high']) and (
 | 
	
		
			
				|  |  | -                    df_day.loc[x - 2, 'high'] > df_day.loc[x - 1, 'high'])):
 | 
	
		
			
				|  |  | -                df_day.loc[x, 'HL'] = 'L*'
 | 
	
		
			
				|  |  | -                # 判断底的性质
 | 
	
		
			
				|  |  | -                while m:
 | 
	
		
			
				|  |  | -                    if df_day.loc[m, 'HL'] in ['H', 'HH', 'H*']:
 | 
	
		
			
				|  |  | -                        if (x - m) > 3:
 | 
	
		
			
				|  |  | -                            # 成笔——>L
 | 
	
		
			
				|  |  | -                            df_day.loc[x, 'HL'] = 'L'
 | 
	
		
			
				|  |  | -                            break
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -                    elif df_day.loc[m, 'HL'] in ['L', 'LL', 'L*']:
 | 
	
		
			
				|  |  | -                        if df_day.loc[m - 1, 'low'] > df_day.loc[x - 1, 'low']:
 | 
	
		
			
				|  |  | -                            # pool_list.append(qmt_stock)
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -                            # 获得MACD,判断MACD判断背驰
 | 
	
		
			
				|  |  | -                            x_macd_dif, x_macd_dea, x_macd_macd = df_day.loc[x, 'dif'], df_day.loc[x, 'dea'], \
 | 
	
		
			
				|  |  | -                            df_day.loc[x, 'macd']
 | 
	
		
			
				|  |  | -                            m_macd_dif, m_macd_dea, m_macd_macd = df_day.loc[m, 'dif'], df_day.loc[m, 'dea'], \
 | 
	
		
			
				|  |  | -                            df_day.loc[m, 'macd']
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -                            # 背驰底->LL
 | 
	
		
			
				|  |  | -                            if m_macd_dif < x_macd_dif:
 | 
	
		
			
				|  |  | -                                df_day.loc[x, 'HL'] = 'LL'
 | 
	
		
			
				|  |  | -                                # 产生信号,进入hlfx_pool
 | 
	
		
			
				|  |  | -                                results.append(qmt_stock)
 | 
	
		
			
				|  |  | -                                break
 | 
	
		
			
				|  |  | -                            # 前一个为底更高,且中间不存在更低的底
 | 
	
		
			
				|  |  | -                            else:
 | 
	
		
			
				|  |  | +            if len(df_day) > 2:
 | 
	
		
			
				|  |  | +                x = len(df_day.index)-1
 | 
	
		
			
				|  |  | +                m = x - 1
 | 
	
		
			
				|  |  | +                # 底
 | 
	
		
			
				|  |  | +                if ((df_day.loc[x, 'high'] > df_day.loc[x - 1, 'high']) and (
 | 
	
		
			
				|  |  | +                        df_day.loc[x - 2, 'high'] > df_day.loc[x - 1, 'high'])):
 | 
	
		
			
				|  |  | +                    df_day.loc[x, 'HL'] = 'L*'
 | 
	
		
			
				|  |  | +                    # 判断底的性质
 | 
	
		
			
				|  |  | +                    while m:
 | 
	
		
			
				|  |  | +                        if df_day.loc[m, 'HL'] in ['H', 'HH', 'H*']:
 | 
	
		
			
				|  |  | +                            if (x - m) > 3:
 | 
	
		
			
				|  |  | +                                # 成笔——>L
 | 
	
		
			
				|  |  |                                  df_day.loc[x, 'HL'] = 'L'
 | 
	
		
			
				|  |  | -                                # 产生信号,进入hlfx_pool
 | 
	
		
			
				|  |  | -                        break
 | 
	
		
			
				|  |  | -                    m = m - 1
 | 
	
		
			
				|  |  | -                    if m == 0:
 | 
	
		
			
				|  |  | -                        df_day.loc[x, 'HL'] = 'L'
 | 
	
		
			
				|  |  | -                        results.append(qmt_stock)
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -            # 顶
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -            elif (df_day.loc[x, 'high'] < df_day.loc[x - 1, 'high']) and (
 | 
	
		
			
				|  |  | -                    df_day.loc[x - 2, 'high'] < df_day.loc[x - 1, 'high']) and (qmt_stock in results):
 | 
	
		
			
				|  |  | -                df_day.loc[x, 'HL'] = 'H*'
 | 
	
		
			
				|  |  | -                while m:
 | 
	
		
			
				|  |  | -                    if df_day.loc[m, 'HL'] in ['L', 'LL', 'L*']:
 | 
	
		
			
				|  |  | -                        if x - m > 3:
 | 
	
		
			
				|  |  | -                            # 成笔->H
 | 
	
		
			
				|  |  | -                            df_day.loc[x, 'HL'] = 'H'
 | 
	
		
			
				|  |  | -                            # 产生卖出信号,进入hlfx_pool
 | 
	
		
			
				|  |  | -                            results.remove(qmt_stock)
 | 
	
		
			
				|  |  | -                            break
 | 
	
		
			
				|  |  | +                                break
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -                    elif df_day.loc[m, 'HL'] in ['H','HH', 'H*']:
 | 
	
		
			
				|  |  | -                        if df_day.loc[x - 1, 'high'] > df_day.loc[m - 1, 'high']:
 | 
	
		
			
				|  |  | -                            # 获得MACD,判断MACD判断背驰
 | 
	
		
			
				|  |  | -                            x_macd_dif, x_macd_dea, x_macd_macd = df_day.loc[x, 'dif'], df_day.loc[x, 'dea'], \
 | 
	
		
			
				|  |  | +                        elif df_day.loc[m, 'HL'] in ['L', 'LL', 'L*']:
 | 
	
		
			
				|  |  | +                            if df_day.loc[m - 1, 'low'] > df_day.loc[x - 1, 'low']:
 | 
	
		
			
				|  |  | +                                # pool_list.append(qmt_stock)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                                # 获得MACD,判断MACD判断背驰
 | 
	
		
			
				|  |  | +                                x_macd_dif, x_macd_dea, x_macd_macd = df_day.loc[x, 'dif'], df_day.loc[x, 'dea'], \
 | 
	
		
			
				|  |  |                                  df_day.loc[x, 'macd']
 | 
	
		
			
				|  |  | -                            m_macd_dif, m_macd_dea, m_macd_macd = df_day.loc[m, 'dif'], df_day.loc[m, 'dea'], \
 | 
	
		
			
				|  |  | +                                m_macd_dif, m_macd_dea, m_macd_macd = df_day.loc[m, 'dif'], df_day.loc[m, 'dea'], \
 | 
	
		
			
				|  |  |                                  df_day.loc[m, 'macd']
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -                            # MACD顶背驰
 | 
	
		
			
				|  |  | -                            if x_macd_dif < m_macd_dif:
 | 
	
		
			
				|  |  | -                                df_day.loc[x, 'HL'] = 'HH'
 | 
	
		
			
				|  |  | -                                # 产生卖出信号,进入hlfx_pool
 | 
	
		
			
				|  |  | -                                results.remove(qmt_stock)
 | 
	
		
			
				|  |  | -                                break
 | 
	
		
			
				|  |  | +                                # 背驰底->LL
 | 
	
		
			
				|  |  | +                                if m_macd_dif < x_macd_dif:
 | 
	
		
			
				|  |  | +                                    df_day.loc[x, 'HL'] = 'LL'
 | 
	
		
			
				|  |  | +                                    # 产生信号,进入hlfx_pool
 | 
	
		
			
				|  |  | +                                    results.append(qmt_stock)
 | 
	
		
			
				|  |  | +                                    break
 | 
	
		
			
				|  |  | +                                # 前一个为底更高,且中间不存在更低的底
 | 
	
		
			
				|  |  | +                                else:
 | 
	
		
			
				|  |  | +                                    df_day.loc[x, 'HL'] = 'L'
 | 
	
		
			
				|  |  | +                                    # 产生信号,进入hlfx_pool
 | 
	
		
			
				|  |  | +                            break
 | 
	
		
			
				|  |  | +                        m = m - 1
 | 
	
		
			
				|  |  | +                        if m == 0:
 | 
	
		
			
				|  |  | +                            df_day.loc[x, 'HL'] = 'L'
 | 
	
		
			
				|  |  | +                            results.append(qmt_stock)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                # 顶
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -                            # 前一个为顶,且中间存在不包含 or 更高的顶
 | 
	
		
			
				|  |  | -                            else:
 | 
	
		
			
				|  |  | +                elif (df_day.loc[x, 'high'] < df_day.loc[x - 1, 'high']) and (
 | 
	
		
			
				|  |  | +                        df_day.loc[x - 2, 'high'] < df_day.loc[x - 1, 'high']) and (qmt_stock in results):
 | 
	
		
			
				|  |  | +                    df_day.loc[x, 'HL'] = 'H*'
 | 
	
		
			
				|  |  | +                    while m:
 | 
	
		
			
				|  |  | +                        if df_day.loc[m, 'HL'] in ['L', 'LL', 'L*']:
 | 
	
		
			
				|  |  | +                            if x - m > 3:
 | 
	
		
			
				|  |  | +                                # 成笔->H
 | 
	
		
			
				|  |  |                                  df_day.loc[x, 'HL'] = 'H'
 | 
	
		
			
				|  |  |                                  # 产生卖出信号,进入hlfx_pool
 | 
	
		
			
				|  |  |                                  results.remove(qmt_stock)
 | 
	
		
			
				|  |  | -                        break
 | 
	
		
			
				|  |  | -                    m = m - 1
 | 
	
		
			
				|  |  | -                    if m == 0:
 | 
	
		
			
				|  |  | -                        df_day.loc[x, 'HL'] = 'H'
 | 
	
		
			
				|  |  | -                        results.remove(qmt_stock)
 | 
	
		
			
				|  |  | +                                break
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                        elif df_day.loc[m, 'HL'] in ['H','HH', 'H*']:
 | 
	
		
			
				|  |  | +                            if df_day.loc[x - 1, 'high'] > df_day.loc[m - 1, 'high']:
 | 
	
		
			
				|  |  | +                                # 获得MACD,判断MACD判断背驰
 | 
	
		
			
				|  |  | +                                x_macd_dif, x_macd_dea, x_macd_macd = df_day.loc[x, 'dif'], df_day.loc[x, 'dea'], \
 | 
	
		
			
				|  |  | +                                    df_day.loc[x, 'macd']
 | 
	
		
			
				|  |  | +                                m_macd_dif, m_macd_dea, m_macd_macd = df_day.loc[m, 'dif'], df_day.loc[m, 'dea'], \
 | 
	
		
			
				|  |  | +                                    df_day.loc[m, 'macd']
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                                # MACD顶背驰
 | 
	
		
			
				|  |  | +                                if x_macd_dif < m_macd_dif:
 | 
	
		
			
				|  |  | +                                    df_day.loc[x, 'HL'] = 'HH'
 | 
	
		
			
				|  |  | +                                    # 产生卖出信号,进入hlfx_pool
 | 
	
		
			
				|  |  | +                                    results.remove(qmt_stock)
 | 
	
		
			
				|  |  | +                                    break
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                                # 前一个为顶,且中间存在不包含 or 更高的顶
 | 
	
		
			
				|  |  | +                                else:
 | 
	
		
			
				|  |  | +                                    df_day.loc[x, 'HL'] = 'H'
 | 
	
		
			
				|  |  | +                                    # 产生卖出信号,进入hlfx_pool
 | 
	
		
			
				|  |  | +                                    results.remove(qmt_stock)
 | 
	
		
			
				|  |  | +                            break
 | 
	
		
			
				|  |  | +                        m = m - 1
 | 
	
		
			
				|  |  | +                        if m == 0:
 | 
	
		
			
				|  |  | +                            df_day.loc[x, 'HL'] = 'H'
 | 
	
		
			
				|  |  | +                            results.remove(qmt_stock)
 | 
	
		
			
				|  |  |      engine_stock.dispose()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      db_pool = pymysql.connect(host='localhost',
 | 
	
	
		
			
				|  | @@ -283,28 +293,32 @@ def bridge():
 | 
	
		
			
				|  |  |      print(f'bridge is {os.getpid()}, now is {dt.now()},开盘了')
 | 
	
		
			
				|  |  |      stocks = xtdata.get_stock_list_in_sector('沪深A股')
 | 
	
		
			
				|  |  |      seq = xtdata.subscribe_whole_quote(stocks, callback=prepare)
 | 
	
		
			
				|  |  | -    run(seq)
 | 
	
		
			
				|  |  | +    m = run(seq)
 | 
	
		
			
				|  |  |      xtdata.unsubscribe_quote(seq)
 | 
	
		
			
				|  |  | +    if m == 1:
 | 
	
		
			
				|  |  | +        pass
 | 
	
		
			
				|  |  |      print(f'{dt.now()},收盘了,收工!')
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  def prepare(data):
 | 
	
		
			
				|  |  |      print(f'prepare is {os.getpid()}, now is {dt.now()},开盘了')
 | 
	
		
			
				|  |  |      stock_list = list(data.keys())
 | 
	
		
			
				|  |  | -    cpu_count = 6
 | 
	
		
			
				|  |  | -    pool = mp.Pool(processes=cpu_count, maxtasksperchild=6)
 | 
	
		
			
				|  |  | +    if len(data.keys()) >= 12:
 | 
	
		
			
				|  |  | +        cpu_count = 12
 | 
	
		
			
				|  |  | +    else:
 | 
	
		
			
				|  |  | +        cpu_count = len(data.keys())
 | 
	
		
			
				|  |  |      step = math.ceil(len(stock_list) / cpu_count)
 | 
	
		
			
				|  |  | -    to_hlfx_list = []
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    to_hlfx_list = []
 | 
	
		
			
				|  |  |      for i in range(0, len(stock_list), step):
 | 
	
		
			
				|  |  |          to_hlfx_list.append([x for x in stock_list[i:i + step]])
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    for m in range(cpu_count):
 | 
	
		
			
				|  |  | +    pool = mp.Pool(processes=cpu_count, maxtasksperchild=12)
 | 
	
		
			
				|  |  | +    print(len(to_hlfx_list))
 | 
	
		
			
				|  |  | +    for m in range(len(to_hlfx_list)):
 | 
	
		
			
				|  |  |          pool.apply_async(func=hlfx,
 | 
	
		
			
				|  |  |                           args=(to_hlfx_list[m], data), error_callback=err_call_back)
 | 
	
		
			
				|  |  |      pool.close()
 | 
	
		
			
				|  |  | -    time.sleep(8000)
 | 
	
		
			
				|  |  | -    pool.terminate()
 | 
	
		
			
				|  |  |      pool.join()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -312,7 +326,7 @@ if __name__ == '__main__':
 | 
	
		
			
				|  |  |      print(f'总进程pid:{os.getpid()}')
 | 
	
		
			
				|  |  |      mp.freeze_support()
 | 
	
		
			
				|  |  |      pus = psutil.Process()
 | 
	
		
			
				|  |  | -    pus.cpu_affinity([0, 1, 2, 3, 4, 5])
 | 
	
		
			
				|  |  | +    pus.cpu_affinity([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11])
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      path = r'c:\\qmt\\userdata_mini'
 | 
	
		
			
				|  |  |      # 生成session id 整数类型 同时运行的策略不能重复
 | 
	
	
		
			
				|  | @@ -332,6 +346,8 @@ if __name__ == '__main__':
 | 
	
		
			
				|  |  |      subscribe_result = xt_trader.subscribe(acc)
 | 
	
		
			
				|  |  |      print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    # bridge()
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      scheduler = BlockingScheduler()
 | 
	
		
			
				|  |  |      scheduler.add_job(func=bridge, trigger='cron', day_of_week='0-4', hour='09', minute='25',
 | 
	
		
			
				|  |  |                        timezone="Asia/Shanghai")
 |