Kaynağa Gözat

控制CPU使用分布,关闭使用完成的MySQL链接

daniel-ali 2 yıl önce
ebeveyn
işleme
bc726d5d24
3 değiştirilmiş dosya ile 27 ekleme ve 16 silme
  1. 2 0
      QMT/download_data_whole.py
  2. 5 1
      QMT/qmt_real_hlfx.py
  3. 20 15
      QMT/real_time.py

+ 2 - 0
QMT/download_data_whole.py

@@ -64,6 +64,8 @@ def download_data(stock_list):
     pool.close()
     pool.join()
 
+    print(f'今日数据下载完毕 {dt.now()}')
+
 
 
 if __name__ == '__main__':

+ 5 - 1
QMT/qmt_real_hlfx.py

@@ -13,6 +13,7 @@ import time
 import math
 import multiprocessing as mp
 import os
+import psutil
 #原始版本
 
 # auth('18616891214', 'Ea?*7f68nD.dafcW34d!')
@@ -93,7 +94,7 @@ def err_call_back(err):
 
 
 def hlfx(data, stocks, pool_list):
-    print(f'MyPid is {os.getpid()}, now is {dt.now()}')
+    print(f'MyPid is {os.getpid()}, now is {dt.now()},本进程list长度为{len(stocks)}')
     engine_stock = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8')
     print(stocks)
     for qmt_stock in stocks:
@@ -222,6 +223,7 @@ def hlfx(data, stocks, pool_list):
                     m = m - 1
                     if m == 0:
                         df_day.loc[x, 'HL'] = 'H'
+    engine_stock.dispose()
 
 
 
@@ -300,6 +302,8 @@ def prepare():
 
 
 if __name__ == '__main__':
+    pus = psutil.Process()
+    pus.cpu_affinity([0, 1, 2, 3])
     path = r'c:\\qmt\\userdata_mini'
     # 生成session id 整数类型 同时运行的策略不能重复
     session_id = int(time.time())

+ 20 - 15
QMT/real_time.py

@@ -11,6 +11,7 @@ from jqdatasdk import *
 import pymysql
 import multiprocessing as mp
 import math
+import psutil
 
 auth('18616891214', 'Ea?*7f68nD.dafcW34d!')
 db_pool = pymysql.connect(host='localhost',
@@ -70,17 +71,20 @@ def his_vol(stock, num):
 
 
 def ma_judge(data, stock_list, results):
-    print('这个ma_judge的PID为:', os.getpid())
-    for stock in data:
-        i = stock.replace('XSHG', 'SH').replace('XSHE', 'SZ')
-        current_price, open_price = data[i]['lastPrice'], data[i]['open']
-        MA5, MA10, MA20 = ma(i, 5, data), ma(i, 10, data), ma(i, 20, data)
-        MA5_1 = ma_1(i, 5)
-        # print(i, current_price, open_price, MA5, MA10, MA20, MA5_1)
-        if (current_price > open_price) & (current_price > MA5) & (MA5 > MA5_1) & (current_price < MA5 * 1.03) & (
-                MA20 < MA10):
-            if his_vol(i, -1) > his_vol(i, -2):
-                results.append(i.replace('SH', 'XSHG').replace('SZ', 'XSHE'))
+    print('这个ma_judge的PID为:', os.getpid(), len(stock_list))
+    for stock in stock_list:
+        if stock in data:
+            i = stock.replace('XSHG', 'SH').replace('XSHE', 'SZ')
+            current_price, open_price = data[i]['lastPrice'], data[i]['open']
+            MA5, MA10, MA20 = ma(i, 5, data), ma(i, 10, data), ma(i, 20, data)
+            MA5_1 = ma_1(i, 5)
+            # print(i, current_price, open_price, MA5, MA10, MA20, MA5_1)
+            if (current_price > open_price) & (current_price > MA5) & (MA5 > MA5_1) & (current_price < MA5 * 1.03) & (
+                    MA20 < MA10):
+                if his_vol(i, -1) > his_vol(i, -2):
+                    results.append(i.replace('SH', 'XSHG').replace('SZ', 'XSHE'))
+        else:
+            continue
     print('RRRRRRR,', results)
 
 
@@ -134,16 +138,17 @@ def buy_trader(data):
                     print('append')
     '''
 
-    step = math.ceil(len(stock_pool) / mp.cpu_count())
+    step = math.ceil((len(stock_pool) / mp.cpu_count())*2)
     print('step:', step)
     print('cpu_count =', mp.cpu_count())
-    for i in range(0, len(stock_pool), math.ceil(len(stock_pool) / mp.cpu_count())):
+    for i in range(0, len(stock_pool), step):
         p = mp.Process(target=ma_judge, args=(data, stock_pool[i:i + step], results))
         mp_list.append(p)
         p.start()
     for j in mp_list:
         j.join()
     results = list(set(results))
+    engine_hlfx_pool.dispose()
     print('results!!!!', len(results))
 
     # 选择板块
@@ -183,12 +188,10 @@ def buy_trader(data):
         for stock in data:
             asset = xt_trader.query_stock_asset(acc)
             cash = asset.cash
-            print(cash)
             if stock in new_keep_stock:
                 current_price = data[stock]['lastPrice']
                 if cash > 2000:
                     volume = int((cash / 3 / current_price) // 100 * 100)
-                    print('volume:', volume)
                     print('买入信号!!!!!!', stock, volume, current_price)
                     order_id = xt_trader.order_stock(acc, stock, xtconstant.STOCK_BUY, volume, xtconstant.LATEST_PRICE, current_price, 'strategy1', 'order_test')
                     print(order_id)
@@ -276,6 +279,8 @@ class MyXtQuantTraderCallback(XtQuantTraderCallback):
 
 if __name__ == '__main__':
     auth('18616891214', 'Ea?*7f68nD.dafcW34d!')
+    pus = psutil.Process()
+    pus.cpu_affinity([4, 5, 6, 7])
 
     print("start")
     # 指定客户端所在路径