|  | @@ -2,7 +2,7 @@
 | 
	
		
			
				|  |  |  # from jqdatasdk import *
 | 
	
		
			
				|  |  |  import pandas as pd
 | 
	
		
			
				|  |  |  import pymysql
 | 
	
		
			
				|  |  | -from sqlalchemy import create_engine
 | 
	
		
			
				|  |  | +from sqlalchemy import create_engine, text
 | 
	
		
			
				|  |  |  import threading
 | 
	
		
			
				|  |  |  from datetime import datetime as dt
 | 
	
		
			
				|  |  |  import datetime
 | 
	
	
		
			
				|  | @@ -100,18 +100,17 @@ def err_call_back(err):
 | 
	
		
			
				|  |  |      traceback.print_exc()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -def run(seq, pid):
 | 
	
		
			
				|  |  | +def run(seq):
 | 
	
		
			
				|  |  |      mor = datetime.datetime.strptime(
 | 
	
		
			
				|  |  |          str(dt.now().date()) + '11:30', '%Y-%m-%d%H:%M')
 | 
	
		
			
				|  |  |      afternoon = datetime.datetime.strptime(
 | 
	
		
			
				|  |  |          str(dt.now().date()) + '15:00', '%Y-%m-%d%H:%M')
 | 
	
		
			
				|  |  |      mor_1 = datetime.datetime.strptime(
 | 
	
		
			
				|  |  | -        str(dt.now().date()) + '12:59', '%Y-%m-%d%H:%M')
 | 
	
		
			
				|  |  | +        str(dt.now().date()) + '11:10', '%Y-%m-%d%H:%M')
 | 
	
		
			
				|  |  |      """阻塞线程接收行情回调"""
 | 
	
		
			
				|  |  |      import time
 | 
	
		
			
				|  |  |      client = xtdata.get_client()
 | 
	
		
			
				|  |  |      while True:
 | 
	
		
			
				|  |  | -        time.sleep(3)
 | 
	
		
			
				|  |  |          now_date = dt.now()
 | 
	
		
			
				|  |  |          if not client.is_connected():
 | 
	
		
			
				|  |  |              xtdata.unsubscribe_quote(seq)
 | 
	
	
		
			
				|  | @@ -127,8 +126,8 @@ def run(seq, pid):
 | 
	
		
			
				|  |  |              print(f'现在时间:{dt.now()},已收盘')
 | 
	
		
			
				|  |  |              sys.exit()
 | 
	
		
			
				|  |  |              break
 | 
	
		
			
				|  |  | -            # return 0
 | 
	
		
			
				|  |  | -    # return
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    return
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  def hlfx(stock_list, data):
 | 
	
	
		
			
				|  | @@ -136,20 +135,22 @@ def hlfx(stock_list, data):
 | 
	
		
			
				|  |  |      print(f'def-->hlfx, MyPid is {os.getpid()}, 本次我需要计算{len(stock_list)},now is {dt.now()}')
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      # 获得hlfx_pool池子
 | 
	
		
			
				|  |  | -    engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8')
 | 
	
		
			
				|  |  | +    engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8',
 | 
	
		
			
				|  |  | +                                     pool_size=100, pool_recycle=3600, max_overflow=50, pool_timeout=60)
 | 
	
		
			
				|  |  |      results = []
 | 
	
		
			
				|  |  | -    results.extend(pd.read_sql_query(
 | 
	
		
			
				|  |  | -        'select value from `%s` order by `index` desc limit 10' % fre, engine_hlfx_pool).iloc[0, 0].split(","))
 | 
	
		
			
				|  |  | +    results.extend(pd.read_sql_query(text(
 | 
	
		
			
				|  |  | +        'select value from `%s` order by `index` desc limit 10' % fre), engine_hlfx_pool.connect()).iloc[0, 0].split(","))
 | 
	
		
			
				|  |  |      print(f'本次hlfx_pool有{len(results)}个个股')
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    engine_stock = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8')
 | 
	
		
			
				|  |  | +    engine_stock = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8',
 | 
	
		
			
				|  |  | +                                 pool_size=100, pool_recycle=3600, max_overflow=50, pool_timeout=60)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      for qmt_stock in stock_list:
 | 
	
		
			
				|  |  |          # 读取qmt_stocks_whole表-前复权-信息
 | 
	
		
			
				|  |  |          try:
 | 
	
		
			
				|  |  | -            df_day = pd.read_sql_query(
 | 
	
		
			
				|  |  | +            df_day = pd.read_sql_query(text(
 | 
	
		
			
				|  |  |                  'select time, open_front, close_front, high_front, low_front, volume_front, amount_front, '
 | 
	
		
			
				|  |  | -                'dif, dea, macd, HL from `%s_%s`' % (qmt_stock, fre), engine_stock)
 | 
	
		
			
				|  |  | +                'dif, dea, macd, HL from `%s_%s`' % (qmt_stock, fre)), engine_stock.connect())
 | 
	
		
			
				|  |  |              df_day.columns = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount', 'dif', 'dea', 'macd', 'HL']
 | 
	
		
			
				|  |  |          except BaseException as e:
 | 
	
		
			
				|  |  |              print(qmt_stock, '未能读取!', e)
 | 
	
	
		
			
				|  | @@ -275,7 +276,7 @@ def hlfx(stock_list, data):
 | 
	
		
			
				|  |  |                          if m == 0:
 | 
	
		
			
				|  |  |                              df_day.loc[x, 'HL'] = 'H'
 | 
	
		
			
				|  |  |                              results.remove(qmt_stock)
 | 
	
		
			
				|  |  | -    engine_stock.dispose()
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      db_pool = pymysql.connect(host='localhost',
 | 
	
		
			
				|  |  |                                user='root',
 | 
	
	
		
			
				|  | @@ -288,20 +289,12 @@ def hlfx(stock_list, data):
 | 
	
		
			
				|  |  |      sql = "INSERT INTO %s (date,value) VALUES('%s','%s')" % (fre, dt.now().strftime('%Y-%m-%d %H:%M:%S'), results_list)
 | 
	
		
			
				|  |  |      cursor_pool.execute(sql)
 | 
	
		
			
				|  |  |      db_pool.commit()
 | 
	
		
			
				|  |  | -    print(f'{dt.now()}写入新的results{len(results_list)}个,hlfx_pool更新')
 | 
	
		
			
				|  |  | +    print(f'{dt.now()}写入新的results-{len(results_list)}个,hlfx_pool更新')
 | 
	
		
			
				|  |  | +    engine_stock.dispose()
 | 
	
		
			
				|  |  |      engine_hlfx_pool.dispose()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -def bridge():
 | 
	
		
			
				|  |  | -    pid = os.getpid()
 | 
	
		
			
				|  |  | -    print(f'bridge is {os.getpid()}, now is {dt.now()},开盘了')
 | 
	
		
			
				|  |  | -    stocks = xtdata.get_stock_list_in_sector('沪深A股')
 | 
	
		
			
				|  |  | -    seq = xtdata.subscribe_whole_quote(stocks, callback=prepare)
 | 
	
		
			
				|  |  | -    run(seq, pid)
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |  def prepare(data):
 | 
	
		
			
				|  |  | -    print(f'prepare is {os.getpid()}, now is {dt.now()},开盘了')
 | 
	
		
			
				|  |  |      stock_list = list(data.keys())
 | 
	
		
			
				|  |  |      if len(data.keys()) >= 12:
 | 
	
		
			
				|  |  |          cpu_count = 12
 | 
	
	
		
			
				|  | @@ -314,7 +307,6 @@ def prepare(data):
 | 
	
		
			
				|  |  |          to_hlfx_list.append([x for x in stock_list[i:i + step]])
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      pool = mp.Pool(processes=cpu_count, maxtasksperchild=12)
 | 
	
		
			
				|  |  | -    print(len(to_hlfx_list))
 | 
	
		
			
				|  |  |      for m in range(len(to_hlfx_list)):
 | 
	
		
			
				|  |  |          pool.apply_async(func=hlfx,
 | 
	
		
			
				|  |  |                           args=(to_hlfx_list[m], data), error_callback=err_call_back)
 | 
	
	
		
			
				|  | @@ -322,6 +314,13 @@ def prepare(data):
 | 
	
		
			
				|  |  |      pool.join()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +def bridge():
 | 
	
		
			
				|  |  | +    print(f'bridge is {os.getpid()}, now is {dt.now()},开盘了')
 | 
	
		
			
				|  |  | +    stocks = xtdata.get_stock_list_in_sector('沪深A股')
 | 
	
		
			
				|  |  | +    seq = xtdata.subscribe_whole_quote(stocks, callback=prepare)
 | 
	
		
			
				|  |  | +    run(seq)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  def job_func():
 | 
	
		
			
				|  |  |      print(f"Job started at {dt.now()}")
 | 
	
		
			
				|  |  |      # 创建子进程
 | 
	
	
		
			
				|  | @@ -362,8 +361,8 @@ if __name__ == '__main__':
 | 
	
		
			
				|  |  |      scheduler = BlockingScheduler()
 | 
	
		
			
				|  |  |      scheduler.add_job(func=job_func, trigger='cron', day_of_week='0-4', hour='09', minute='25',
 | 
	
		
			
				|  |  |                        timezone="Asia/Shanghai", max_instances=5)
 | 
	
		
			
				|  |  | -    # scheduler.add_job(func=job_func, trigger='cron', day_of_week='0-4', hour='13', minute='00',
 | 
	
		
			
				|  |  | -    #                   timezone="Asia/Shanghai")
 | 
	
		
			
				|  |  | +    # # scheduler.add_job(func=job_func, trigger='cron', day_of_week='0-4', hour='13', minute='00',
 | 
	
		
			
				|  |  | +    # #                   timezone="Asia/Shanghai")
 | 
	
		
			
				|  |  |      try:
 | 
	
		
			
				|  |  |          scheduler.start()
 | 
	
		
			
				|  |  |      except (KeyboardInterrupt, SystemExit):
 |