Browse Source

增加前复权数据

Daniel 2 years ago
parent
commit
43b79b8599
5 changed files with 461 additions and 27 deletions
  1. 126 0
      QMT/demo.py
  2. 27 13
      QMT/download_data.py
  3. 18 0
      QMT/get_local_data.py
  4. 289 14
      QMT/real_time.py
  5. 1 0
      real_time_order_MA_HLFX_1025.py

+ 126 - 0
QMT/demo.py

@@ -0,0 +1,126 @@
+#coding:utf-8
+import time, datetime, traceback, sys
+from xtquant import xtdata
+from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
+from xtquant.xttype import StockAccount
+from xtquant import xtconstant
+
+#定义一个类 创建类的实例 作为状态的容器
+class _a():
+    pass
+A = _a()
+A.bought_list = []
+A.hsa = xtdata.get_stock_list_in_sector('沪深A股')
+
+def interact():
+    """执行后进入repl模式"""
+    import code
+    code.InteractiveConsole(locals=globals()).interact()
+xtdata.download_sector_data()
+
+def f(data):
+    now = datetime.datetime.now()
+    for stock in data:
+        if stock not in A.hsa:
+            continue
+        cuurent_price = data[stock]['lastPrice']
+        pre_price = data[stock]['lastClose']
+        ratio = cuurent_price / pre_price - 1 if pre_price > 0 else 0
+        if ratio > 0.09 and stock not in A.bought_list:
+            print(f"{now} 最新价 买入 {stock} 200股")
+            async_seq = xt_trader.order_stock_async(acc, stock, xtconstant.STOCK_BUY, 200, xtconstant.LATEST_PRICE, -1, 'strategy_name', stock)
+            A.bought_list.append(stock)
+    xt_trader.query_stock_orders()
+class MyXtQuantTraderCallback(XtQuantTraderCallback):
+    def on_disconnected(self):
+        """
+        连接断开
+        :return:
+        """
+        print(datetime.datetime.now(),'连接断开回调')
+
+    def on_stock_order(self, order):
+        """
+        委托回报推送
+        :param order: XtOrder对象
+        :return:
+        """
+        print(datetime.datetime.now(), '委托回调', order.order_remark)
+
+
+    def on_stock_trade(self, trade):
+        """
+        成交变动推送
+        :param trade: XtTrade对象
+        :return:
+        """
+        print(datetime.datetime.now(), '成交回调', trade.order_remark)
+
+
+    def on_order_error(self, order_error):
+        """
+        委托失败推送
+        :param order_error:XtOrderError 对象
+        :return:
+        """
+        # print("on order_error callback")
+        # print(order_error.order_id, order_error.error_id, order_error.error_msg)
+        print(f"委托报错回调 {order_error.order_remark} {order_error.error_msg}")
+
+    def on_cancel_error(self, cancel_error):
+        """
+        撤单失败推送
+        :param cancel_error: XtCancelError 对象
+        :return:
+        """
+        print(datetime.datetime.now(), sys._getframe().f_code.co_name)
+
+    def on_order_stock_async_response(self, response):
+        """
+        异步下单回报推送
+        :param response: XtOrderResponse 对象
+        :return:
+        """
+        print(f"异步委托回调 {response.order_remark}")
+
+    def on_cancel_order_stock_async_response(self, response):
+        """
+        :param response: XtCancelOrderResponse 对象
+        :return:
+        """
+        print(datetime.datetime.now(), sys._getframe().f_code.co_name)
+
+    def on_account_status(self, status):
+        """
+        :param response: XtAccountStatus 对象
+        :return:
+        """
+        print(datetime.datetime.now(), sys._getframe().f_code.co_name)
+
+
+if __name__ == '__main__':
+    print("start")
+    #指定客户端所在路径
+    path = r'c:\\qmt\\userdata_mini'
+    # 生成session id 整数类型 同时运行的策略不能重复
+    session_id = int(time.time())
+    xt_trader = XtQuantTrader(path, session_id)
+    # 创建资金账号为 800068 的证券账号对象
+    acc = StockAccount('920000207040', 'SECURITY')
+    # 创建交易回调类对象,并声明接收回调
+    callback = MyXtQuantTraderCallback()
+    xt_trader.register_callback(callback)
+    # 启动交易线程
+    xt_trader.start()
+    # 建立交易连接,返回0表示连接成功
+    connect_result = xt_trader.connect()
+    print('建立交易连接,返回0表示连接成功', connect_result)
+    # 对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功
+    subscribe_result = xt_trader.subscribe(acc)
+    print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)
+
+    xtdata.subscribe_whole_quote(["SH", "SZ"], callback=f)
+    xtdata.run()
+
+    #进入交互模式
+    # interact()

+ 27 - 13
QMT/download_data.py

@@ -15,7 +15,7 @@ field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']
 cpu_count = mp.cpu_count()
 
 
-def to_sql(stock_list, eng):
+def to_sql(stock_list, eng_b, eng_f):
     print(dt.now(), '开始循环入库!')
     for stock in stock_list:
         print(stock)
@@ -26,14 +26,25 @@ def to_sql(stock_list, eng):
         df['time'] = df['time'].apply(lambda x: dt.fromtimestamp(x / 1000.0))
         df.reset_index(drop=True, inplace=True)
         print(df)
-        df.to_sql('%s_1d' % stock, con=eng, index=True, if_exists='append')
+        df.to_sql('%s_1d' % stock, con=eng_b, index=True, if_exists='append')
+
+    for stock in stock_list:
+        print(stock)
+        data = xtdata.get_market_data(field, [stock], '1d', end_time='', count=-1, dividend_type='front')
+        df = pd.concat([data[i].loc[stock].T for i in ['time', 'open', 'high', 'low', 'close', 'volume', 'amount']],
+                       axis=1)
+        df.columns = ['time', 'open', 'high', 'low', 'close', 'volume', 'amount']
+        df['time'] = df['time'].apply(lambda x: dt.fromtimestamp(x / 1000.0))
+        df.reset_index(drop=True, inplace=True)
+        print(df)
+        df.to_sql('%s_1d' % stock, con=eng_f, index=True, if_exists='append')
 
 
-def download_data(stock_list, eng):
+def download_data(stock_list, eng_b, eng_f):
     print(dt.now(), '开始下载!')
     xtdata.download_history_data2(stock_list=stock_list, period='1d', start_time='', end_time='')
     print(dt.now(), '下载完成,准备入库!')
-    to_sql(stock_list, eng)
+    to_sql(stock_list, eng_b, eng_f)
 
 
 # def to_df(key, values, engine):
@@ -47,12 +58,15 @@ if __name__ == '__main__':
     cpu_count = mp.cpu_count()
     stocks.sort()
     step = math.ceil(len(stocks) / cpu_count)
-    engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks?charset=utf8')
-
-    scheduler = BlockingScheduler()
-    scheduler.add_job(func=download_data, trigger='cron', hour='15', minute='45', args=[stocks, engine],
-                      timezone="Asia/Shanghai")
-    try:
-        scheduler.start()
-    except (KeyboardInterrupt, SystemExit):
-        pass
+    eng_b = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks?charset=utf8')
+    eng_f = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_front?charset=utf8')
+
+    download_data(stocks, eng_b, eng_f)
+
+    # scheduler = BlockingScheduler()
+    # scheduler.add_job(func=download_data, trigger='cron', hour='15', minute='45', args=[stocks, engine],
+    #                   timezone="Asia/Shanghai")
+    # try:
+    #     scheduler.start()
+    # except (KeyboardInterrupt, SystemExit):
+    #     pass

+ 18 - 0
QMT/get_local_data.py

@@ -6,8 +6,18 @@ from datetime import datetime as dt
 import time
 
 import pandas as pd
+from xtquant import xtdata
+
+def real_price(data):
+    for i in data:
+        list.append(i)
+        xtdata.unsubscribe_quote(id)
+        print(i)
+        print(list)
+
 
 if __name__=='__main__':
+    '''
     from xtquant import xtdata
     s = '000001.SZ'
     # 实际使用时,该接口每天盘后执行一次即可
@@ -39,3 +49,11 @@ if __name__=='__main__':
     # for column in data2:
     #     print(f"              {column}\n {data2[column].head()}\n")
 
+'''
+
+    list=[]
+    stocks = xtdata.get_stock_list_in_sector('沪深A股')
+    id = xtdata.subscribe_whole_quote(stocks, callback=real_price)
+    print(list)
+    xtdata.run()
+    print(list)

+ 289 - 14
QMT/real_time.py

@@ -1,24 +1,299 @@
+# coding:utf-8
+from datetime import datetime as dt
+import os
 import pandas as pd
+from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
+from xtquant.xttype import StockAccount
 from xtquant import xtdata
-from datetime import datetime as dt
+import time
+from sqlalchemy import create_engine
+from jqdatasdk import *
+import pymysql
+import multiprocessing as mp
+import math
+
+auth('18616891214', 'Ea?*7f68nD.dafcW34d!')
+db_pool = pymysql.connect(host='localhost',
+                          user='root',
+                          port=3307,
+                          password='r6kEwqWU9!v3',
+                          database='hlfx_pool')
+cursor_pool = db_pool.cursor()
+engine_stock = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_front?charset=utf8')
+
+
+def real_price(datas):
+    # i = '000001.SZ'
+    for i in datas:
+        if i == '000001.SZ':
+            print(i, datas[i])
+    # trader(datas)
+    # return datas
+
+
+def ma(stock, num, data):
+    global engine_stock
+    try:
+        i = (num - 1) * -1
+        df = pd.read_sql_query(
+            'select close from `%s_1d`' % stock, engine_stock)
+    except:
+        return 9999999
+    else:
+        ma_num = sum(df['close'][i:-1] + data[stock]['lastPrice'])/num
+        return ma_num
+
+
+def ma_1(stock, num):
+    global engine_stock
+    i = num  * -1
+    try:
+        df = pd.read_sql_query(
+            'select close from `%s_1d`' % stock, engine_stock)
+    except BaseException:
+        return 9999999
+    else:
+        ma_num_1 = df['close'][i:-1].mean()
+        return ma_num_1
+
+
+def his_vol(stock, num):
+    global engine_stock
+    num = num * -1
+    try:
+        df = pd.read_sql_query(
+            'select volume from `%s_1d`' % stock, engine_stock)
+    except BaseException:
+        return 9999999
+    else:
+        return df['volume'].iloc[num]
+
+
+def ma_judge(data, stock_list, results):
+    print('这个ma_judge的PID为:', os.getpid())
+    for stock in data:
+        i = stock.replace('XSHG', 'SH').replace('XSHE', 'SZ')
+        current_price, open_price = data[i]['lastPrice'], data[i]['open']
+        MA5, MA10, MA20 = ma(i, 5, data), ma(i, 10, data), ma(i, 20, data)
+        MA5_1 = ma_1(i, 5)
+        print(i, current_price, open_price, MA5, MA10, MA20, MA5_1)
+        if (current_price > open_price) & (current_price > MA5) & (MA5 > MA5_1) & (current_price < MA5 * 1.03) & (
+                MA20 < MA10):
+            if his_vol(i, -1) > his_vol(i, -2):
+                results.append(i.replace('SH', 'XSHG').replace('SZ', 'XSHE'))
+    print('RRRRRRR,', results)
+
+
+def sell_trader(data, positions):
+    # for m in data:
+    #     print(m, data[m]['lastPrice'])
+    print('卖出函数:', dt.now())
+    # positions = xt_trader.query_stock_positions(acc)
+    # print('持仓总数:', len(positions))
+    for stock in data:
+        if stock in positions:
+            print('持仓', stock, data[stock])
+            current_price = data[stock]['lastPrice']
+            open_price = data[stock]['open']
+            print('价格:', current_price, open_price)
+            MA5 = ma(stock, 5, data)
+            MA5_1 = ma_1(stock, 5)
+            if current_price < MA5 or MA5 < MA5_1 or current_price > MA5 * 1.07:
+                print('卖出信号!!!!!!', stock, current_price)
+
+            #     order_id = xt_trader.order_stock(acc, stock, xtconstant.STOCK_SELL,
+            #                                     i.volume, xtconstant.LATEST_PRICE, 0, 'strategy1', 'order_test')
+            # print(order_id, i)
+
+
+def buy_trader(data):
+    print('买入函数:', dt.now())
+    results = mp.Manager().list()
+    mp_list = []
+    engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8')
+
+    try:
+        stock_pool = pd.read_sql_query(
+            'select value from `%s`' % '1d', engine_hlfx_pool)
+        stock_pool = stock_pool.iloc[-1, 0].split(",")
+        stock_pool.sort()
+        print('stock_pool', stock_pool)
+    except BaseException:
+        pass
+    '''
+    for stock in data:
+        if stock.replace('SH', 'XSHG').replace('SZ', 'XSHE') in stock_pool:
+            # 真实买入策略
+            current_price, open_price = data[stock]['lastPrice'], data[stock]['open']
+            MA5, MA10, MA20 = ma(stock, 5), ma(stock, 10), ma(stock, 20)
+            MA5_1 = ma_1(stock, 5)
+            print(stock, current_price, open_price, MA5, MA10, MA20, MA5_1)
+            if (current_price > open_price) & (current_price > MA5) & (MA5 > MA5_1) & (current_price < MA5 * 1.03) & (MA20 < MA10):
+                if his_vol(stock, -1) > his_vol(stock, -2):
+                    results.append(stock.replace('SH', 'XSHG').replace('SZ', 'XSHE'))
+                    print('append')
+    '''
+
+    step = math.ceil(len(stock_pool) / mp.cpu_count())
+    print('step:', step)
+    print('cpu_count =', mp.cpu_count())
+    for i in range(0, len(stock_pool), math.ceil(len(stock_pool) / mp.cpu_count())):
+        p = mp.Process(target=ma_judge, args=(data, stock_pool[i:i + step], results))
+        mp_list.append(p)
+        p.start()
+    for j in mp_list:
+        j.join()
+    results = list(set(results))
+    print('results!!!!', len(results))
 
-def MA(datas):
-    print(datas.keys())
-    print(pd.DataFrame(datas[datas.keys()]))
-    # return datas['close']
-    # for stock in datas:
-    #     df = pd.DataFrame(datas[stock])
-    #     df['time']= df['time'].apply(lambda x: dt.fromtimestamp(x / 1000.0))
-    #     print(stock, df['close'])
-    # return df['close']
+    # 选择板块
+    if len(results) != 0:
+        num_industry = get_industry(results)
+        print(num_industry)
+        industry_list = []
+        for key in num_industry.values():
+            for key2 in key.values():
+                industry_list.append(key2['industry_name'])
+        industry_list = pd.value_counts(industry_list)
+        # 最热集中的n个板块
+        max_industry_list = list(industry_list[0:3].index)
+        results_industry = []
+        for key, value in num_industry.items():
+            for key2 in value.values():
+                if key2['industry_name'] in max_industry_list:
+                    results_industry.append(key)
+        print('所有:', set(results_industry))
+        results_industry = ','.join(set(results_industry))
+        print('1d', '\n', results_industry)
 
+        sql = "INSERT INTO MA5_%s (date,value) VALUES('%s','%s')" % ('1d', dt.now().strftime('%Y-%m-%d %H:%M:%S'),
+                                                                     results_industry)
+        cursor_pool.execute(sql)
+        db_pool.commit()
 
+        # print(len(results_industry), results_industry)
+        print(dt.now(), '数据库数据已赋值!')
+
+        # 取值交易
+
+        keep_stocks = results_industry.split(",")
+        new_keep_stock = [stock.replace('XSHG', 'SH').replace('XSHE', 'SZ') for stock in keep_stocks]
+        print(new_keep_stock)
+
+        for stock in data:
+            if stock in new_keep_stock:
+                current_price = data[stock]['lastPrice']
+                if acc.cash > 2000:
+                    volume = int((acc.cash / 2 / current_price) // 100 * 100)
+                    print('volume:', volume)
+                    print('买入信号!!!!!!', stock, volume, current_price)
+                    # order_id = xt_trader.order_stock(acc, stock, xtconstant.STOCK_BUY, volume, xtconstant.LATEST_PRICE, current_price, 'strategy1', 'order_test')
+                    # print(order_id)
+    print('一轮结束了,现在时间是:', dt.now())
+
+
+def trader(data):
+    print(len(data.keys()))
+
+    # 先判断卖出条件
+    positions = xt_trader.query_stock_positions(acc)
+    print('持仓数量', len(positions))
+    if len(positions) != 0:
+        sell_trader(data, positions)
+
+    # 买入条件
+    buy_trader(data)
+
+
+class MyXtQuantTraderCallback(XtQuantTraderCallback):
+    def on_disconnected(self):
+        """
+        连接断开
+        :return:
+        """
+        print(datetime.datetime.now(), '连接断开回调')
+
+    def on_stock_order(self, order):
+        """
+        委托回报推送
+        :param order: XtOrder对象
+        :return:
+        """
+        print(datetime.datetime.now(), '委托回调', order.order_remark)
+
+    def on_stock_trade(self, trade):
+        """
+        成交变动推送
+        :param trade: XtTrade对象
+        :return:
+        """
+        print(datetime.datetime.now(), '成交回调', trade.order_remark)
+
+    def on_order_error(self, order_error):
+        """
+        委托失败推送
+        :param order_error:XtOrderError 对象
+        :return:
+        """
+        # print("on order_error callback")
+        # print(order_error.order_id, order_error.error_id, order_error.error_msg)
+        print(f"委托报错回调 {order_error.order_remark} {order_error.error_msg}")
+
+    def on_cancel_error(self, cancel_error):
+        """
+        撤单失败推送
+        :param cancel_error: XtCancelError 对象
+        :return:
+        """
+        print(datetime.datetime.now(), sys._getframe().f_code.co_name)
+
+    def on_order_stock_async_response(self, response):
+        """
+        异步下单回报推送
+        :param response: XtOrderResponse 对象
+        :return:
+        """
+        print(f"异步委托回调 {response.order_remark}")
+
+    def on_cancel_order_stock_async_response(self, response):
+        """
+        :param response: XtCancelOrderResponse 对象
+        :return:
+        """
+        print(datetime.datetime.now(), sys._getframe().f_code.co_name)
+
+    def on_account_status(self, status):
+        """
+        :param response: XtAccountStatus 对象
+        :return:
+        """
+        print(datetime.datetime.now(), sys._getframe().f_code.co_name)
 
 
 if __name__ == '__main__':
+    auth('18616891214', 'Ea?*7f68nD.dafcW34d!')
+
+    print("start")
+    # 指定客户端所在路径
+    path = r'c:\\qmt\\userdata_mini'
+    # 生成session id 整数类型 同时运行的策略不能重复
+    session_id = int(time.time())
+    xt_trader = XtQuantTrader(path, session_id)
+    # 创建资金账号为 800068 的证券账号对象
+    acc = StockAccount('920000207040', 'SECURITY')
+    # 创建交易回调类对象,并声明接收回调
+    callback = MyXtQuantTraderCallback()
+    xt_trader.register_callback(callback)
+    # 启动交易线程
+    xt_trader.start()
+    # 建立交易连接,返回0表示连接成功
+    connect_result = xt_trader.connect()
+    print('建立交易连接,返回0表示连接成功', connect_result)
+    # 对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功
+    subscribe_result = xt_trader.subscribe(acc)
+    print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)
+
     stocks = xtdata.get_stock_list_in_sector('沪深A股')
-    xtdata.subscribe_whole_quote(stocks, callback=real_price)
-    # xtdata.subscribe_quote('000001.SZ', '1d', '', '', count=1, callback=MA)
-    price = MA
-    print(price)
+    xtdata.subscribe_whole_quote(stocks, callback=trader)
     xtdata.run()
+    # xtdata.subscribe_quote('000001.SZ', '1d', '', '', count=1, callback=MA)

+ 1 - 0
real_time_order_MA_HLFX_1025.py

@@ -61,6 +61,7 @@ def XtTrader(new_keep_stock):
     asset = xt_trader.query_stock_asset(account)
     xtdata.subscribe_whole_quote(new_keep_stock, callback=real_price)
     positions = xt_trader.query_stock_positions(account)
+
     if asset:
         print("asset:")
         print(asset.account_type, asset.account_id, asset.cash, asset.frozen_cash, asset.market_value,