Browse Source

qmt 实时更新顶底分型
qmt 计算技术指标+顶底分型 ——>get_macd

Daniel 2 years ago
parent
commit
d8c3bee1a5
5 changed files with 750 additions and 124 deletions
  1. 83 0
      QMT/download_data_whole.py
  2. 0 116
      QMT/get_macd.py
  3. 266 0
      QMT/qmt_get_macd.py
  4. 400 0
      QMT/qmt_real_hlfx.py
  5. 1 8
      QMT/real_time.py

+ 83 - 0
QMT/download_data_whole.py

@@ -0,0 +1,83 @@
+from xtquant import xtdata
+from datetime import datetime as dt
+import pandas as pd
+import math
+from sqlalchemy import create_engine
+import multiprocessing as mp
+import os
+
+
+pd.set_option('display.max_columns', None) # 设置显示最大行
+
+
+path = 'C:\\qmt\\userdata_mini'
+
+field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']
+cpu_count = mp.cpu_count()
+eng_w = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8')
+
+
+def err_call_back(err):
+    print(f'问题在这里~ error:{str(err)}')
+
+
+def to_sql(stock_list):
+    print(f'{dt.now()}开始循环入库! MyPid is {os.getpid()}')
+    m = 0
+    for stock in stock_list:
+        # 后复权数据
+        data_back = xtdata.get_market_data(field, [stock], '1d', end_time='', count=-1, dividend_type='back')
+        df_back = pd.concat([data_back[i].loc[stock].T for i in ['time', 'open', 'high', 'low', 'close', 'volume',
+                                                                 'amount']], axis=1)
+        df_back.columns = ['time', 'open_back', 'high_back', 'low_back', 'close_back', 'volume_back', 'amount_back']
+        df_back['time'] = df_back['time'].apply(lambda x: dt.fromtimestamp(x / 1000.0))
+        df_back.reset_index(drop=True, inplace=True)
+
+        # 前复权数据
+        data_front = xtdata.get_market_data(field, [stock], '1d', end_time='', count=-1, dividend_type='front')
+        df_front = pd.concat([data_front[i].loc[stock].T for i in ['time', 'open', 'high', 'low', 'close', 'volume',
+                                                                   'amount']], axis=1)
+        df_front.columns = ['time', 'open_front', 'high_front', 'low_front', 'close_front', 'volume_front',
+                            'amount_front']
+        df_front['time'] = df_front['time'].apply(lambda x: dt.fromtimestamp(x / 1000.0))
+        df = pd.merge_asof(df_back, df_front, 'time')
+        # print(df)
+        try:
+            df.to_sql('%s_1d' % stock, con=eng_w, index=True, if_exists='replace')
+        except BaseException:
+            print(stock)
+            pass
+        else:
+            m += 1
+    print(f'Pid:{os.getpid()}已经完工了.应入库{len(stock_list)},共入库{m}支个股')
+
+
+def download_data(stock_list):
+    print(dt.now(), '开始下载!')
+    xtdata.download_history_data2(stock_list=stock_list, period='1d', start_time='', end_time='')
+    print(dt.now(), '下载完成,准备入库!')
+    step = math.ceil(len(stock_list) / mp.cpu_count())
+    pool = mp.Pool(processes=mp.cpu_count())
+    for i in range(0, len(stock_list), step):
+        pool.apply_async(func=to_sql, args=(stock_list[i:i+step],), error_callback=err_call_back)
+    pool.close()
+    pool.join()
+
+
+
+if __name__ == '__main__':
+    stocks = xtdata.get_stock_list_in_sector('沪深A股')
+    field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']
+    cpu_count = mp.cpu_count()
+    stocks.sort()
+    step = math.ceil(len(stocks) / cpu_count)
+
+    download_data(stocks)
+
+    # scheduler = BlockingScheduler()
+    # scheduler.add_job(func=download_data, trigger='cron', hour='15', minute='45', args=[stocks, eng_b, eng_f],
+    #                   timezone="Asia/Shanghai")
+    # try:
+    #     scheduler.start()
+    # except (KeyboardInterrupt, SystemExit):
+    #     pass

+ 0 - 116
QMT/get_macd.py

@@ -1,116 +0,0 @@
-# coding:utf-8
-from datetime import datetime as dt
-import os
-import pandas as pd
-import time
-from sqlalchemy import create_engine
-from jqdatasdk import *
-import pymysql
-import multiprocessing as mp
-import math
-import talib as ta
-
-# pd.set_option('display.max_columns', None)  # 设置显示最大行
-
-
-def err_call_back(err):
-    print(f'出错啦~ error:{str(err)}')
-
-
-def myself_kdj(df):
-    low_list = df['low'].rolling(9, min_periods=9).min()
-    low_list.fillna(value=df['low'].expanding().min(), inplace=True)
-    high_list = df['high'].rolling(9, min_periods=9).max()
-    high_list.fillna(value=df['high'].expanding().max(), inplace=True)
-    rsv = (df['close'] - low_list) / (high_list - low_list) * 100
-    df['k'] = pd.DataFrame(rsv).ewm(com=2).mean()
-    df['d'] = df['k'].ewm(com=2).mean()
-    df['j'] = 3 * df['k'] - 2 * df['d']
-    return df
-
-
-# macd指标
-def get_macd_data(data, short=0, long1=0, mid=0):
-    if short == 0:
-        short = 12
-    if long1 == 0:
-        long1 = 26
-    if mid == 0:
-        mid = 9
-    data['sema'] = pd.Series(data['close']).ewm(span=short).mean()
-    data['lema'] = pd.Series(data['close']).ewm(span=long1).mean()
-    data.fillna(0, inplace=True)
-    data['dif'] = data['sema'] - data['lema']
-    data['dea'] = pd.Series(data['dif']).ewm(span=mid).mean()
-    data['macd'] = 2 * (data['dif'] - data['dea'])
-    data.fillna(0, inplace=True)
-    return data[['dif', 'dea', 'macd']]
-
-
-# rsi指标
-# 建议用talib库的RSI方法,亲测有用
-def get_ris(data):
-    data["rsi_6"] = ta.RSI(data['close'], timeperiod=6)
-    data["rsi_12"] = ta.RSI(data['close'], timeperiod=12)
-    data["rsi_24"] = ta.RSI(data['close'], timeperiod=24)
-
-
-def get_bias(df):
-    # 计算方法:
-    # bias指标
-    # N期BIAS=(当日收盘价-N期平均收盘价)/N期平均收盘价*100%
-    df['bias_6'] = (df['close'] - df['close'].rolling(6, min_periods=1).mean()) / \
-                    df['close'].rolling(6, min_periods=1).mean() * 100
-    df['bias_12'] = (df['close'] - df['close'].rolling(12, min_periods=1).mean()) / \
-                     df['close'].rolling(12, min_periods=1).mean() * 100
-    df['bias_24'] = (df['close'] - df['close'].rolling(24, min_periods=1).mean()) / \
-                     df['close'].rolling(24, min_periods=1).mean() * 100
-    df['bias_6'] = round(df['bias_6'], 2)
-    df['bias_12'] = round(df['bias_12'], 2)
-    df['bias_24'] = round(df['bias_24'], 2)
-
-
-def get_wilr(df):
-    # 威廉指标
-    # 建议用talib库的WILLR方法,亲测有用
-    df['willr'] = ta.WILLR(df['high'], df['low'], df['close'], timeperiod=14)
-
-
-def tech_anal(datas):
-    engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks?charset=utf8')
-    engine_tech = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8')
-    for stock in datas:
-        stock = stock.replace('XSHG', 'SH').replace('XSHE', 'SZ')
-        df = pd.read_sql_table('%s_1d' % stock, con=engine)
-        get_macd_data(df)
-        get_ris(df)
-        get_bias(df)
-        get_wilr(df)
-        df = df.reset_index(drop=True)
-        df.to_sql('%s_1d' % stock, con=engine_tech, index=False, if_exists='replace')
-        # with engine.connect() as con:
-        #     con.execute("ALTER TABLE `%s_1d` ADD PRIMARY KEY (`time`);" % stock)
-        print(stock, df)
-
-
-if __name__ == '__main__':
-    sttime = dt.now()
-
-    engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8')
-    stock_pool = pd.read_sql_query('select securities from stocks_list', engine_hlfx_pool)
-    stock_pool = stock_pool.iloc[-1, 0].split(",")
-    stock_pool.sort()
-    print(len(stock_pool))
-
-    pool = mp.Pool(processes=mp.cpu_count())
-    # step = math.ceil(len(stock_pool) / mp.cpu_count())
-    step = 100
-
-    for i in range(0, len(stock_pool), step):
-        pool.apply_async(func=tech_anal, args=(stock_pool[i:i + step],), error_callback=err_call_back)
-
-    pool.close()
-    pool.join()
-    edtime = dt.now()
-
-    print(edtime - sttime)

+ 266 - 0
QMT/qmt_get_macd.py

@@ -0,0 +1,266 @@
+# coding:utf-8
+from datetime import datetime as dt
+import os
+import pandas as pd
+import time
+from sqlalchemy import create_engine
+from jqdatasdk import *
+import pymysql
+import multiprocessing as mp
+import math
+import talib as ta
+from xtquant import xtdata
+import os
+
+pd.set_option('display.max_columns', None)  # 设置显示最大行
+engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8')
+
+
+def err_call_back(err):
+    print(f'出错啦~ error:{str(err)}')
+
+
+def myself_kdj(df):
+    low_list = df['low_back'].rolling(9, min_periods=9).min()
+    low_list.fillna(value=df['low_back'].expanding().min(), inplace=True)
+    high_list = df['high_back'].rolling(9, min_periods=9).max()
+    high_list.fillna(value=df['high_back'].expanding().max(), inplace=True)
+    rsv = (df['close_back'] - low_list) / (high_list - low_list) * 100
+    df['k'] = pd.DataFrame(rsv).ewm(com=2).mean()
+    df['d'] = df['k'].ewm(com=2).mean()
+    df['j'] = 3 * df['k'] - 2 * df['d']
+    return df
+
+
+# macd指标
+def get_macd_data(data, short=0, long1=0, mid=0):
+    if short == 0:
+        short = 12
+    if long1 == 0:
+        long1 = 26
+    if mid == 0:
+        mid = 9
+    data['sema'] = pd.Series(data['close_back']).ewm(span=short).mean()
+    data['lema'] = pd.Series(data['close_back']).ewm(span=long1).mean()
+    data.fillna(0, inplace=True)
+    data['dif'] = data['sema'] - data['lema']
+    data['dea'] = pd.Series(data['dif']).ewm(span=mid).mean()
+    data['macd'] = 2 * (data['dif'] - data['dea'])
+    data.fillna(0, inplace=True)
+    # return data[['dif', 'dea', 'macd']]
+
+
+# rsi指标
+# 建议用talib库的RSI方法,亲测有用
+def get_ris(data):
+    data["rsi_6"] = ta.RSI(data['close_back'], timeperiod=6)
+    data["rsi_12"] = ta.RSI(data['close_back'], timeperiod=12)
+    data["rsi_24"] = ta.RSI(data['close_back'], timeperiod=24)
+
+
+def get_bias(data):
+    # 计算方法:
+    # bias指标
+    # N期BIAS=(当日收盘价-N期平均收盘价)/N期平均收盘价*100%
+    data['bias_6'] = (data['close_back'] - data['close_back'].rolling(6, min_periods=1).mean()) / \
+                     data['close_back'].rolling(6, min_periods=1).mean() * 100
+    data['bias_12'] = (data['close_back'] - data['close_back'].rolling(12, min_periods=1).mean()) / \
+                      data['close_back'].rolling(12, min_periods=1).mean() * 100
+    data['bias_24'] = (data['close_back'] - data['close_back'].rolling(24, min_periods=1).mean()) / \
+                      data['close_back'].rolling(24, min_periods=1).mean() * 100
+    data['bias_6'] = round(data['bias_6'], 2)
+    data['bias_12'] = round(data['bias_12'], 2)
+    data['bias_24'] = round(data['bias_24'], 2)
+
+
+def get_wilr(data):
+    # 威廉指标
+    # 建议用talib库的WILLR方法,亲测有用
+    data['willr'] = ta.WILLR(data['high_back'], data['low_back'], data['close_back'], timeperiod=14)
+
+
+def get_hlfx(data):
+    Trading_signals = 0
+    data_temp = data[['time', 'open_back', 'close_back', 'high_back', 'low_back', 'dif', 'dea', 'macd']]
+    data_temp.columns = ['time', 'open', 'close', 'high', 'low', 'dif', 'dea', 'macd']
+    df_day = pd.DataFrame(columns=['time', 'open', 'close', 'high', 'low', 'volume', 'money', 'HL'])
+    # 先处理去包含
+    for i in data_temp.index:
+        if i == 0 or i == 1:
+            df_day = pd.concat([df_day, data_temp.iloc[[i]]], ignore_index=True)
+        # 不包含
+        elif (df_day.iloc[-1, 3] > data_temp.loc[i, 'high']
+              and df_day.iloc[-1, 4] > data_temp.loc[i, 'low']) \
+                or (df_day.iloc[-1, 3] < data_temp.loc[i, 'high']
+                    and df_day.iloc[-1, 4] < data_temp.loc[i, 'low']):
+            df_day = pd.concat([df_day, data_temp.loc[[i]]], ignore_index=True)
+        # 包含
+        else:
+            # 左高,下降
+            if df_day.iloc[-2, 3] > df_day.iloc[-1, 3]:
+                df_day.iloc[-1, 3] = min(df_day.iloc[-1, 3], data_temp.loc[i, 'high'])
+                df_day.iloc[-1, 4] = min(df_day.iloc[-1, 4], data_temp.loc[i, 'low'])
+            else:
+                # 右高,上升
+                df_day.iloc[-1, 3] = max(df_day.iloc[-1, 3], data_temp.loc[i, 'high'])
+                df_day.iloc[-1, 4] = max(df_day.iloc[-1, 4], data_temp.loc[i, 'low'])
+    # print('111', df_day, data_temp)
+
+    if len(df_day.index) > 2:
+        # 寻找顶底分型
+        for x in range(2, len(df_day.index)):
+            m = x - 1
+            # 底
+            if ((df_day.loc[x, 'high'] > df_day.loc[x - 1, 'high']) and (
+                    df_day.loc[x - 2, 'high'] > df_day.loc[x - 1, 'high'])):
+
+                df_day.loc[x, 'HL'] = 'L*'
+                while m:
+                    if df_day.loc[m, 'HL'] in ['H', 'HH', 'H*']:
+                        if (x - m) > 3:
+                            # 成笔——>L
+                            df_day.loc[x, 'HL'] = 'L'
+                            # 产生信号,进入hlfx_pool
+                            if x == len(df_day.index) - 1:
+                                Trading_signals = 1
+
+                    elif df_day.loc[m, 'HL'] == 'L':
+                        if df_day.loc[m - 1, 'low'] > df_day.loc[x - 1, 'low']:
+                            # 前一个为底更高,且中间不存在更低的底
+                            df_day.loc[x, 'HL'] = 'L'
+
+                            # 产生信号,进入hlfx_pool
+                            if x == len(df_day.index) - 1:
+                                Trading_signals = 1
+
+                            # 获得MACD,判断MACD判断背驰
+                            x_macd_dif, x_macd_dea, x_macd_macd = data_temp.loc[x, 'dif'], data_temp.loc[x, 'dea'], data_temp.loc[x, 'macd']
+                            m_macd_dif, m_macd_dea, m_macd_macd = data_temp.loc[m, 'dif'], data_temp.loc[m, 'dea'], data_temp.loc[m, 'macd']
+
+                            # MACD底背驰
+                            if m_macd_dif < x_macd_dif:
+                                # 背驰底->LL
+                                df_day.loc[x, 'HL'] = 'LL'
+                            break
+                        break
+                    m = m - 1
+                    if m == 0:
+                        df_day.loc[x, 'HL'] = 'L'
+
+            # 顶
+            elif ((df_day.loc[x, 'high'] < df_day.loc[x - 1, 'high']) and (
+                    df_day.loc[x - 2, 'high'] < df_day.loc[x - 1, 'high'])):
+
+                df_day.loc[x, 'HL'] = 'H*'
+                while m:
+                    if df_day.loc[m, 'HL'] in ['L', 'LL', 'L*']:
+                        if x - m > 3:
+                            # 成笔->H
+                            df_day.loc[x, 'HL'] = 'H'
+                            # 产生信号,进入hlfx_pool
+                            if x == len(df_day.index) - 1:
+                                Trading_signals = 2
+
+                    elif (df_day.loc[m, 'HL'] == 'H'):
+                        if df_day.loc[x - 1, 'high'] > df_day.loc[m - 1, 'high']:
+                            # 前一个为顶,且中间存在不包含 or 更高的顶
+                            df_day.loc[x, 'HL'] = 'H'
+                            # 产生信号,进入hlfx_pool
+                            if x == len(df_day.index) - 1:
+                                Trading_signals = 2
+
+                            # 获得MACD,判断MACD判断背驰
+                            x_macd_dif, x_macd_dea, x_macd_macd = data_temp.loc[x, 'dif'], data_temp.loc[x, 'dea'], data_temp.loc[x, 'macd']
+                            m_macd_dif, m_macd_dea, m_macd_macd = data_temp.loc[m, 'dif'], data_temp.loc[m, 'dea'], data_temp.loc[m, 'macd']
+
+                            # MACD顶背驰
+                            if x_macd_dif < m_macd_dif:
+                                df_day.loc[x, 'HL'] = 'HH'
+                            break
+                        break
+                    m = m - 1
+                    if m == 0:
+                        df_day.loc[x, 'HL'] = 'H'
+
+            else:
+                df_day.loc[x, 'HL'] = '-'
+    df_temp = df_day[['time', 'HL']]
+
+    return df_temp, Trading_signals
+
+
+def tech_anal(stocks, hlfx_pool):
+
+    engine_tech = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8')
+    m = 0
+    print(f'{dt.now()}开始循环计算! MyPid is {os.getpid()}')
+    for stock in stocks[0:1]:
+        try:
+            df = pd.read_sql_table('%s_1d' % stock, con=engine)
+        except BaseException:
+            print(stock)
+            pass
+        else:
+            get_macd_data(df)
+            get_ris(df)
+            get_bias(df)
+            get_wilr(df)
+            df_temp, T_signals = get_hlfx(df)
+            print(T_signals)
+            df = pd.merge(df, df_temp, on='time', how='left')
+            df['HL'].fillna(value='-', inplace=True)
+            df = df.reset_index(drop=True)
+            if stock in hlfx_pool and T_signals == 2:
+                hlfx_pool.remove(stock)
+            elif stock not in hlfx_pool and T_signals == 1:
+                hlfx_pool.append(stock)
+            try:
+                df.to_sql('%s_1d' % stock, con=engine_tech, index=False, if_exists='replace')
+            # with engine.connect() as con:
+            #     con.execute("ALTER TABLE `%s_1d` ADD PRIMARY KEY (`time`);" % stock)
+            except BaseException:
+                print(stock)
+                pass
+            else:
+                print(f"{stock} 成功!")
+                m += 1
+    print(f'Pid:{os.getpid()}已经完工了,应处理{len(stocks)},共计算{m}支个股')
+
+
+if __name__ == '__main__':
+    sttime = dt.now()
+
+    stocks = xtdata.get_stock_list_in_sector('沪深A股')
+    print(len(stocks))
+
+    fre = '1d'
+    engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8')
+    hlfx_pool = mp.Manager().list()
+    hlfx_pool.extend(pd.read_sql_query(
+        'select value from `%s`' % fre, engine_hlfx_pool).iloc[-1, 0].split(","))
+
+    pool = mp.Pool(processes=mp.cpu_count())
+    step = math.ceil(len(stocks) / mp.cpu_count())
+    step = 10000
+    for i in range(0, len(stocks), step):
+        pool.apply_async(func=tech_anal, args=(stocks[i:i + step], hlfx_pool,), error_callback=err_call_back)
+    pool.close()
+    pool.join()
+
+    print(hlfx_pool)
+
+    # 存档入库
+    db_pool = pymysql.connect(host='localhost',
+                              user='root',
+                              port=3307,
+                              password='r6kEwqWU9!v3',
+                              database='hlfx_pool')
+    cursor_pool = db_pool.cursor()
+    results_list = ','.join(set(hlfx_pool))
+
+    sql = "INSERT INTO %s (date,value) VALUES('%s','%s')" % (fre, dt.now().strftime('%Y-%m-%d %H:%M:%S'), results_list)
+    cursor_pool.execute(sql)
+    db_pool.commit()
+    edtime = dt.now()
+
+    print(edtime - sttime)

+ 400 - 0
QMT/qmt_real_hlfx.py

@@ -0,0 +1,400 @@
+# coding:utf-8
+from jqdatasdk import *
+import pandas as pd
+import pymysql
+from sqlalchemy import create_engine
+import threading
+from datetime import datetime as dt
+from jqdatasdk.technical_analysis import *
+from xtquant import xtdata, xtconstant
+from xtquant.xttype import StockAccount
+from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
+import time
+import math
+import multiprocessing as mp
+import os
+#原始版本
+
+# auth('18616891214', 'Ea?*7f68nD.dafcW34d!')
+# auth('18521506014', 'Abc123!@#')
+# stocks = list(get_all_securities(['stock'], date=dt.today().strftime('%Y-%m-%d')).index)
+# stocks = stocks[0:200]
+
+pd.set_option('display.max_columns', None) # 设置显示最大行
+fre = '1d'
+
+class MyXtQuantTraderCallback(XtQuantTraderCallback):
+    def on_disconnected(self):
+        """
+        连接断开
+        :return:
+        """
+        print(datetime.datetime.now(), '连接断开回调')
+
+    def on_stock_order(self, order):
+        """
+        委托回报推送
+        :param order: XtOrder对象
+        :return:
+        """
+        print(datetime.datetime.now(), '委托回调', order.order_remark)
+
+    def on_stock_trade(self, trade):
+        """
+        成交变动推送
+        :param trade: XtTrade对象
+        :return:
+        """
+        print(datetime.datetime.now(), '成交回调', trade.order_remark)
+
+    def on_order_error(self, order_error):
+        """
+        委托失败推送
+        :param order_error:XtOrderError 对象
+        :return:
+        """
+        # print("on order_error callback")
+        # print(order_error.order_id, order_error.error_id, order_error.error_msg)
+        print(f"委托报错回调 {order_error.order_remark} {order_error.error_msg}")
+
+    def on_cancel_error(self, cancel_error):
+        """
+        撤单失败推送
+        :param cancel_error: XtCancelError 对象
+        :return:
+        """
+        print(datetime.datetime.now(), sys._getframe().f_code.co_name)
+
+    def on_order_stock_async_response(self, response):
+        """
+        异步下单回报推送
+        :param response: XtOrderResponse 对象
+        :return:
+        """
+        print(f"异步委托回调 {response.order_remark}")
+
+    def on_cancel_order_stock_async_response(self, response):
+        """
+        :param response: XtCancelOrderResponse 对象
+        :return:
+        """
+        print(datetime.datetime.now(), sys._getframe().f_code.co_name)
+
+    def on_account_status(self, status):
+        """
+        :param response: XtAccountStatus 对象
+        :return:
+        """
+        print(datetime.datetime.now(), sys._getframe().f_code.co_name)
+
+
+def err_call_back(err):
+    print(f'问题在这里~ error:{str(err)}')
+
+
+def hlfx(data, stocks, pool_list):
+    engine_stock = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8')
+    print(stocks)
+    for qmt_stock in stocks[0:2]:
+        stock = qmt_stock.replace('SH', 'XSHG').replace('SZ', 'XSHE')
+        # 读取qmt_stocks_whole表-前复权-信息
+        try:
+            df_day = pd.read_sql_query(
+                'select time, open_front, close_front, high_front, low_front, volume_front, amount_front, dif, dea, macd,HL from `%s_%s`'
+                % (qmt_stock, fre), engine_stock)
+            df_day.columns=['time', 'open', 'close', 'high', 'low', 'volume', 'amount', 'dif', 'dea', 'macd','HL']
+        except BaseException:
+            print(stock)
+            pass
+        else:
+            # 获得最新价格信息
+            get_price = data[qmt_stock]
+            # 调整time时间格式
+            get_price['time'] = dt.fromtimestamp(get_price['time'] / 1000.0)
+            # print('成功判定', get_price['time'])
+
+            # 先处理去包含
+            # 不包含
+            if (df_day.iloc[-1, 3] > get_price['high']
+                and df_day.iloc[-1, 4] > get_price['low']) \
+                    or (df_day.iloc[-1, 3] < get_price['high']
+                        and df_day.iloc[-1, 4] < get_price['low']):
+                # print('lalallala', get_price['open'], get_price['lastPrice'], get_price['high'],
+                #                             get_price['low'], get_price['volume'], get_price['amount'])
+                qmt_df = pd.DataFrame(data=[[get_price['time'], get_price['open'], get_price['lastPrice'], get_price['high'],
+                                            get_price['low'], get_price['volume'], get_price['amount']]],
+                                      columns=['time', 'open', 'close', 'high', 'low', 'volume', 'amount'])
+                # print('qmt_______', qmt_df)
+                df_day = pd.concat([df_day, qmt_df], ignore_index=True)
+                # print('不包含,合并完成', df_day)
+
+            # 包含
+            else:
+                # 左高,下降
+                if df_day.iloc[-2, 3] > df_day.iloc[-1, 3]:
+                    df_day.iloc[-1, 3] = min(df_day.iloc[-1, 3], get_price['high'])
+                    df_day.iloc[-1, 4] = min(df_day.iloc[-1, 4], get_price['low'])
+                # 右高,上升
+                else:
+                    df_day.iloc[-1, 3] = max(df_day.iloc[-1, 3], get_price['high'])
+                    df_day.iloc[-1, 4] = max(df_day.iloc[-1, 4], get_price['low'])
+                # print('包含', df_day)
+
+            # 数合并完成,确认df_day
+            # print(df_day)
+
+            # 寻找顶底分型
+            x = len(df_day.index)-1
+            m = x - 1
+            # 底
+            if ((df_day.loc[x, 'high'] > df_day.loc[x - 1, 'high']) and (
+                    df_day.loc[x - 2, 'high'] > df_day.loc[x - 1, 'high'])):
+                df_day.loc[x, 'HL'] = 'L*'
+                # 判断底的性质
+                while m:
+                    if df_day.loc[m, 'HL'] in ['H', 'HH', 'H*']:
+                        if (x - m) > 3:
+                            # 成笔——>L
+                            df_day.loc[x, 'HL'] = 'L'
+
+                    elif df_day.loc[m, 'HL'] == 'L':
+                        if df_day.loc[m - 1, 'low'] > df_day.loc[x - 1, 'low']:
+                            # pool_list.append(qmt_stock)
+
+                            # 获得MACD,判断MACD判断背驰
+                            x_macd_dif, x_macd_dea, x_macd_macd = df_day.loc[x, 'dif'], df_day.loc[x, 'dea'], \
+                            df_day.loc[x, 'macd']
+                            m_macd_dif, m_macd_dea, m_macd_macd = df_day.loc[m, 'dif'], df_day.loc[m, 'dea'], \
+                            df_day.loc[m, 'macd']
+
+                            # 背驰底->LL
+                            if m_macd_dif < x_macd_dif:
+                                df_day.loc[x, 'HL'] = 'LL'
+                                # 产生信号,进入hlfx_pool
+                                pool_list.append(qmt_stock)
+                            # 前一个为底更高,且中间不存在更低的底
+                            else:
+                                df_day.loc[x, 'HL'] = 'L'
+                                # 产生信号,进入hlfx_pool
+                                pool_list.append(qmt_stock)
+                            break
+                        break
+                    m = m - 1
+                    if m == 0:
+                        df_day.loc[x, 'HL'] = 'L'
+
+            # 顶
+
+            elif (df_day.loc[x, 'high'] < df_day.loc[x - 1, 'high']) and (
+                    df_day.loc[x - 2, 'high'] < df_day.loc[x - 1, 'high']) and (qmt_stock in pool_list):
+                df_day.loc[x, 'HL'] = 'H*'
+                while m:
+                    if df_day.loc[m, 'HL'] in ['L', 'LL', 'L*']:
+                        if x - m > 3:
+                            # 成笔->H
+                            df_day.loc[x, 'HL'] = 'H'
+                            # 产生卖出信号,进入hlfx_pool
+                            pool_list.remove(qmt_stock)
+                            break
+
+                    elif (df_day.loc[m, 'HL'] == 'H'):
+                        if df_day.loc[x - 1, 'high'] > df_day.loc[m - 1, 'high']:
+                            # 获得MACD,判断MACD判断背驰
+                            x_macd_dif, x_macd_dea, x_macd_macd = df_day.loc[x, 'dif'], df_day.loc[x, 'dea'], \
+                                df_day.loc[x, 'macd']
+                            m_macd_dif, m_macd_dea, m_macd_macd = df_day.loc[m, 'dif'], df_day.loc[m, 'dea'], \
+                                df_day.loc[m, 'macd']
+
+                            # MACD顶背驰
+                            if x_macd_dif < m_macd_dif:
+                                df_day.loc[x, 'HL'] = 'HH'
+                                # 产生卖出信号,进入hlfx_pool
+                                pool_list.remove(qmt_stock)
+
+                            # 前一个为顶,且中间存在不包含 or 更高的顶
+                            else:
+                                df_day.loc[x, 'HL'] = 'H'
+                                # 产生卖出信号,进入hlfx_pool
+                                pool_list.remove(qmt_stock)
+
+                            break
+                        break
+                    m = m - 1
+                    if m == 0:
+                        df_day.loc[x, 'HL'] = 'H'
+
+
+
+def bridge(data):
+    # 连接数据库
+    '''
+    db = pymysql.connect(host='localhost',
+                         user='root',
+                         port=3307,
+                         password='r6kEwqWU9!v3',
+                         database='hlfx')
+    cursor = db.cursor()
+    cursor.execute("show tables like '%%%s%%' " % fre)
+    pool_list = [tuple[0] for tuple in cursor.fetchall()]
+    print('取得 table_list %s' % fre)
+    '''
+    '''
+    
+    1.获取hlfx_pool中隔夜的标的
+    2.将本此的data均分,给到进程池
+    3.将data总数据、分配的任务stocklist、hlfx_pool 送入realtime_hlfx中进行计算
+    4.将实时刷新的hlfx存入hlfx_pool 以过滤出现顶分型的标的
+    
+    '''
+
+    # 获得hlfx_pool池子
+    engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8')
+    results = mp.Manager().list()
+    results.extend(pd.read_sql_query(
+        'select value from `%s`' % fre, engine_hlfx_pool).iloc[-1, 0].split(","))
+    print(results)
+
+    to_hlfx_list = []
+    keys = list(data.keys())
+    step = math.ceil(len(keys) / (mp.cpu_count()/2))
+    for i in range(0, len(keys), step):
+        to_hlfx_list.append([x for x in keys[i:i+step]])
+
+    pool = mp.Pool(processes=int(mp.cpu_count()/2))
+    for m in range(int(mp.cpu_count()/2)):
+        pool.apply_async(func=hlfx,
+                         args=(data, to_hlfx_list[m], results,), error_callback=err_call_back)
+    pool.close()
+    pool.join()
+
+
+
+    db_pool = pymysql.connect(host='localhost',
+                              user='root',
+                              port=3307,
+                              password='r6kEwqWU9!v3',
+                              database='hlfx_pool')
+    cursor_pool = db_pool.cursor()
+    print('建立-HLFX-池链接')
+
+    print(results)
+    print(set(results))
+    results_list = ','.join(set(results))
+    sql = "INSERT INTO %s (date,value) VALUES('%s','%s')" % (fre, dt.now().strftime('%Y-%m-%d %H:%M:%S'), results_list)
+    cursor_pool.execute(sql)
+    db_pool.commit()
+    print(f'{dt.now()}写入新的results,hlfx_pool更新')
+    # hlfx(data, engine_stock, engine_hlfx)
+    pass
+
+
+def prepare():
+    engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8')
+
+    results = pd.read_sql_query(
+        'select value from `%s`' % fre, engine_hlfx_pool).iloc[-1, 0].split(",")
+    results = [x.replace('XSHG', 'SH').replace('XSHE', 'SZ') for x in results]
+    print('数据库读取,并转化后缀格式', len(results))
+    # print(results[0:10])
+    return results
+
+
+if __name__ == '__main__':
+    path = r'c:\\qmt\\userdata_mini'
+    # 生成session id 整数类型 同时运行的策略不能重复
+    session_id = int(time.time())
+    xt_trader = XtQuantTrader(path, session_id)
+    # 创建资金账号为 800068 的证券账号对象
+    acc = StockAccount('920000207040', 'SECURITY')
+    # 创建交易回调类对象,并声明接收回调
+    callback = MyXtQuantTraderCallback()
+    xt_trader.register_callback(callback)
+    # 启动交易线程
+    xt_trader.start()
+    # 建立交易连接,返回0表示连接成功
+    connect_result = xt_trader.connect()
+    print('建立交易连接,返回0表示连接成功', connect_result)
+    # 对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功
+    subscribe_result = xt_trader.subscribe(acc)
+    print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)
+
+    stocks = xtdata.get_stock_list_in_sector('沪深A股')
+    xtdata.subscribe_whole_quote(stocks, callback=bridge)
+
+    xtdata.run()
+
+
+
+
+    start = dt.now()
+    while True:
+        now_date = dt.now()
+        date_morning_begin = now_date.replace(hour=9, minute=25, second=0)
+        date_morning_end = now_date.replace(hour=11, minute=31, second=0)
+        date_afternooe_begin = now_date.replace(hour=13, minute=0, second=0)
+        date_afternooe_end = now_date.replace(hour=15, minute=0, second=0)
+        # print(now_date,date_morning_begin,date_morning_end,date_afternooe_begin,date_afternooe_end)
+        # if date_morning_begin < now_date < date_afternooe_end:
+        if True:
+
+
+
+
+
+
+
+
+
+            for fre in ['1d']:
+                start = dt.now()
+
+
+
+
+                stk = locals()
+                thd = threading.local()
+                # 进程准备
+                step = 400
+                thread_list = []
+                engine_stock = []
+                engine_hlfx = []
+                times_engine = 0
+
+
+
+                df = get_bars(stocks, count=5, unit=fre,
+                              fields=['date', 'open', 'close', 'high', 'low', 'volume', 'money'], include_now=True, df=True)
+                print(df, type(df))
+                print(df.loc['603566.XSHG'])
+                print(dt.now(), 'get_bars 成功')
+                exit()
+                for i in range(0, len(stocks), step):
+                    engine_stock.append(create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/stocks?charset=utf8'))
+                    engine_hlfx.append(create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx?charset=utf8'))
+                    thread = threading.Thread(target=hlfx, args=(stocks[i:i + step], engine_stock[times_engine], engine_hlfx[times_engine]))
+                    times_engine = times_engine + 1
+                    thread.start()
+                    thread_list.append(thread)
+
+                for thread in thread_list:
+                    thread.join()
+                db.close()
+
+
+
+                time = dt.now().strftime('%Y-%m-%d %H:%M:%S')
+                results_list =','.join(set(results))
+                print(set(results))
+                sql = "INSERT INTO %s (date,value) VALUES('%s','%s')" % (fre, dt.now().strftime('%Y-%m-%d %H:%M:%S'), results_list)
+                cursor_pool.execute(sql)
+                db_pool.commit()
+                print(fre, '\n', '做多:', len(set(results)),  set(results))
+                print('做空', len(set(results_short)), set(results_short))
+
+
+                end= dt.now()
+                print('总时长:', (end - start).seconds)
+        elif now_date>date_afternooe_end:
+            pass
+            # print("HLFX_收盘了",now_date)
+            # break

+ 1 - 8
QMT/real_time.py

@@ -90,9 +90,6 @@ def sell_trader(data, positions_dict):
     print('卖出函数:', dt.now())
     # positions = xt_trader.query_stock_positions(acc)
     # print('持仓总数:', len(positions_list))
-    for i in data:
-        print(i, type(i))
-        break
 
     for stock, volume in positions_dict.items():
         if stock in data:
@@ -200,16 +197,12 @@ def buy_trader(data):
 
 def trader(data):
     print(len(data.keys()))
+
     # 先判断卖出条件
     positions = xt_trader.query_stock_positions(acc)
     print('持仓数量', len(positions))
     if len(positions) != 0:
         positions_dict = {positions[x].stock_code: positions[x].volume for x in range(0, len(positions))}
-        print(type(positions_dict.keys()))
-        # print(data[positions_dict.keys()[0]])
-
-
-
         sell_trader(data, positions_dict)
 
     # 买入条件