Daniel 1 jaar geleden
bovenliggende
commit
aed1618245

+ 30 - 0
.idea/deployment.xml

@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="PublishConfigData" autoUpload="Always" serverName="root@localhost:8722 password (3)" remoteFilesAllowedToDisappearOnAutoupload="false" confirmBeforeUploading="false">
+    <option name="confirmBeforeUploading" value="false" />
+    <serverData>
+      <paths name="root@localhost:8722 password">
+        <serverdata>
+          <mappings>
+            <mapping deploy="/tmp/pycharm_project_662" local="$PROJECT_DIR$" />
+          </mappings>
+        </serverdata>
+      </paths>
+      <paths name="root@localhost:8722 password (2)">
+        <serverdata>
+          <mappings>
+            <mapping deploy="/tmp/pycharm_project_952" local="$PROJECT_DIR$" />
+          </mappings>
+        </serverdata>
+      </paths>
+      <paths name="root@localhost:8722 password (3)">
+        <serverdata>
+          <mappings>
+            <mapping deploy="/Git/tmp" local="$PROJECT_DIR$/../backtest" />
+          </mappings>
+        </serverdata>
+      </paths>
+    </serverData>
+    <option name="myAutoUpload" value="ALWAYS" />
+  </component>
+</project>

+ 1 - 1
.idea/misc.xml

@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <project version="4">
-  <component name="ProjectRootManager" version="2" project-jdk-name="Quant" project-jdk-type="Python SDK" />
+  <component name="ProjectRootManager" version="2" project-jdk-name="Remote Python 3.10.11 (sftp://root@localhost:8722/conda/envs/rapids/bin/python3.10)" project-jdk-type="Python SDK" />
   <component name="PyPackaging">
     <option name="earlyReleasesAsUpgrades" value="true" />
   </component>

+ 1 - 0
.idea/modules.xml

@@ -3,6 +3,7 @@
   <component name="ProjectModuleManager">
     <modules>
       <module fileurl="file://$PROJECT_DIR$/../AI/.idea/AI.iml" filepath="$PROJECT_DIR$/../AI/.idea/AI.iml" />
+      <module fileurl="file://$PROJECT_DIR$/../backtest/.idea/backtest.iml" filepath="$PROJECT_DIR$/../backtest/.idea/backtest.iml" />
       <module fileurl="file://$USER_HOME$/PycharmProjects/pythonProject/.idea/pythonProject.iml" filepath="$USER_HOME$/PycharmProjects/pythonProject/.idea/pythonProject.iml" />
       <module fileurl="file://$USER_HOME$/Library/CloudStorage/OneDrive-个人/个人/python_stocks/quantify01/.idea/quantify01.iml" filepath="$USER_HOME$/Library/CloudStorage/OneDrive-个人/个人/python_stocks/quantify01/.idea/quantify01.iml" />
       <module fileurl="file://$PROJECT_DIR$/.idea/stock.iml" filepath="$PROJECT_DIR$/.idea/stock.iml" />

+ 6 - 0
.idea/other.xml

@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="PySciProjectComponent">
+    <option name="PY_SCI_VIEW_SUGGESTED" value="true" />
+  </component>
+</project>

+ 1 - 0
.idea/stock.iml

@@ -8,5 +8,6 @@
     <orderEntry type="module" module-name="quantify01" />
     <orderEntry type="module" module-name="pythonProject" />
     <orderEntry type="module" module-name="AI" />
+    <orderEntry type="module" module-name="backtest" />
   </component>
 </module>

+ 1 - 0
.idea/vcs.xml

@@ -2,5 +2,6 @@
 <project version="4">
   <component name="VcsDirectoryMappings">
     <mapping directory="" vcs="Git" />
+    <mapping directory="$PROJECT_DIR$/../backtest" vcs="Git" />
   </component>
 </project>

BIN
QMT/20230516105422482.dmp


+ 4 - 3
QMT/230504_real_time.py

@@ -230,7 +230,7 @@ def buy_trader(data):
     results = mp.Manager().list()
     mp_list = []
     engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8',
-                                     pool_size=4000, pool_recycle=50, max_overflow=-1)
+                                     pool_pre_ping=True, pool_size=20, pool_timeout=60)
 
     try:
         stock_pool = pd.read_sql_query(
@@ -258,6 +258,7 @@ def buy_trader(data):
         for j in mp_list:
             j.join()
     results = list(set(results))
+    engine_hlfx_pool.dispose()
 
     # 选择板块
     if len(results) != 0:
@@ -336,7 +337,7 @@ def buy_trader(data):
 
 
 def trader(data):
-    print(f'本轮订阅{len(data)}')
+    print(f'本轮订阅{len(data)}', dt.now())
     # print(f'xt_trader = {xt_trader},{session_id}')
     # print(len(xt_trader.query_stock_positions(acc)))
     # 卖出判断
@@ -401,7 +402,7 @@ if __name__ == '__main__':
     pus = psutil.Process()
     pus.cpu_affinity([12, 13, 14, 15])
 
-    # job_func()
+    job_func()
 
     scheduler = BlockingScheduler()
     scheduler.add_job(func=job_func, trigger='cron', day_of_week='0-4', hour='09', minute='40',

+ 23 - 12
QMT/download_data_whole.py

@@ -8,6 +8,7 @@ import os
 from apscheduler.schedulers.blocking import BlockingScheduler
 import traceback
 import psutil
+import pymysql
 
 
 pd.set_option('display.max_columns', None) # 设置显示最大行
@@ -19,8 +20,6 @@ field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']
 cpu_count = mp.cpu_count()
 
 
-
-
 def err_call_back(err):
     print(f'问题在这里~ error:{str(err)}')
     traceback.print_exc()
@@ -30,7 +29,7 @@ def to_sql(stock_list):
     print(f'{dt.now()}开始循环入库! MyPid is {os.getpid()}')
     m = 0
     for stock in stock_list:
-        eng_w = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8',
+        eng_w = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3308/qmt_stocks_whole?charset=utf8',
                               pool_recycle=3600, pool_pre_ping=True, pool_size=1)
         # 后复权数据
         data_back = xtdata.get_market_data(field, [stock], '1d', end_time='', count=-1, dividend_type='back')
@@ -65,8 +64,20 @@ def to_sql(stock_list):
 def download_data():
     stock_list = xtdata.get_stock_list_in_sector('沪深A股')
     stock_list.sort()
+    results_list = ','.join(set(stock_list))
+    print(type(results_list))
+    db_pool = pymysql.connect(host='localhost',
+                              user='root',
+                              port=3308,
+                              password='r6kEwqWU9!v3',
+                              database='hlfx_pool')
+    cursor_pool = db_pool.cursor()
+    sql = "INSERT INTO %s (date,securities) VALUES('%s','%s')" % ('stocks_list', dt.now().strftime('%Y-%m-%d %H:%M:%S'), results_list)
+    cursor_pool.execute(sql)
+    db_pool.commit()
+
     print(dt.now(), '开始下载!')
-    xtdata.download_history_data2(stock_list=stock_list, period='1d', start_time='', end_time='')
+    # xtdata.download_history_data2(stock_list=stock_list, period='1d', start_time='', end_time='')
     print(dt.now(), '下载完成,准备入库!')
     step = math.ceil(len(stock_list) / mp.cpu_count())
     pool = mp.Pool(processes=mp.cpu_count())
@@ -86,12 +97,12 @@ if __name__ == '__main__':
     pus = psutil.Process()
     # pus.cpu_affinity([12, 13, 14, 15, 16, 17, 18, 19])
 
-    # download_data()
+    download_data()
 
-    scheduler = BlockingScheduler()
-    scheduler.add_job(func=download_data, trigger='cron', day_of_week='0-4', hour='20', minute='05',
-                      timezone="Asia/Shanghai", max_instances=10)
-    try:
-        scheduler.start()
-    except (KeyboardInterrupt, SystemExit):
-        pass
+    # scheduler = BlockingScheduler()
+    # scheduler.add_job(func=download_data, trigger='cron', day_of_week='0-4', hour='20', minute='05',
+    #                   timezone="Asia/Shanghai", max_instances=10)
+    # try:
+    #     scheduler.start()
+    # except (KeyboardInterrupt, SystemExit):
+    #     pass

+ 111 - 78
QMT/qmt_get_indicators.py

@@ -8,6 +8,7 @@ from sqlalchemy import create_engine, text
 from jqdatasdk import *
 import pymysql
 import multiprocessing as mp
+from multiprocessing import freeze_support
 import math
 import talib as ta
 from xtquant import xtdata
@@ -16,6 +17,7 @@ import traceback
 from apscheduler.schedulers.blocking import BlockingScheduler
 import psutil
 import random
+import logging
 
 pd.set_option('display.max_columns', None)  # 设置显示最大行
 
@@ -117,7 +119,7 @@ def get_hlfx(data):
             # 底
             # 符合底分型形态,且第2、3根k线是阳线
             if ((df_day.loc[x, 'high'] > df_day.loc[x - 1, 'high']) and
-                (df_day.loc[x - 2, 'high'] > df_day.loc[x - 1, 'high'])):
+                    (df_day.loc[x - 2, 'high'] > df_day.loc[x - 1, 'high'])):
                 # and df_day.loc[x, 'close'] > df_day.loc[x, 'open'] and \
                 #     df_day.loc[x - 1, 'close'] > df_day.loc[x - 1, 'open']:
 
@@ -148,9 +150,9 @@ def get_hlfx(data):
 
                             # 获得MACD,判断MACD判断背驰
                             x_macd_dif, x_macd_dea, x_macd_macd = data_temp.loc[x, 'dif'], data_temp.loc[x, 'dea'], \
-                            data_temp.loc[x, 'macd']
+                                data_temp.loc[x, 'macd']
                             m_macd_dif, m_macd_dea, m_macd_macd = data_temp.loc[m, 'dif'], data_temp.loc[m, 'dea'], \
-                            data_temp.loc[m, 'macd']
+                                data_temp.loc[m, 'macd']
 
                             # MACD底背驰
                             if m_macd_dif < x_macd_dif:
@@ -194,9 +196,9 @@ def get_hlfx(data):
 
                             # 获得MACD,判断MACD判断背驰
                             x_macd_dif, x_macd_dea, x_macd_macd = data_temp.loc[x, 'dif'], data_temp.loc[x, 'dea'], \
-                            data_temp.loc[x, 'macd']
+                                data_temp.loc[x, 'macd']
                             m_macd_dif, m_macd_dea, m_macd_macd = data_temp.loc[m, 'dif'], data_temp.loc[m, 'dea'], \
-                            data_temp.loc[m, 'macd']
+                                data_temp.loc[m, 'macd']
 
                             # MACD顶背驰
                             if x_macd_dif < m_macd_dif:
@@ -219,95 +221,126 @@ def get_hlfx(data):
 
 
 def tech_anal(stocks, hlfx_pool, hlfx_pool_daily, err_list):
-    print(f'{dt.now()}开始循环计算! MyPid is {os.getpid()},父进程是{os.getppid},池子长度为{len(stocks)}')
-    m = 0
-
-    for stock in stocks:
-        engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8',
-                               )
-        engine_tech = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8',
-                                    )
-        # print(stock)
-        try:
-            df = pd.read_sql_table('%s_1d' % stock, con=engine.connect())
-            df.dropna(axis=0, how='any')
-            engine.dispose()
-        except BaseException:
-            print(f'{stock}读取有问题')
-            traceback.print_exc()
-            pass
-        else:
-            if len(df) != 0:
-                try:
-                    get_macd_data(df)
-                    get_ris(df)
-                    get_bias(df)
-                    get_wilr(df)
-                    df_temp, T_signals = get_hlfx(df)
-                    df = pd.merge(df, df_temp, on='time', how='left')
-                    df['HL'].fillna(value='-', inplace=True)
-                    df = df.reset_index(drop=True)
-                    # print(stock, '\n', df[['open_front', 'HL']])
-                    df = df.replace([np.inf, -np.inf], np.nan)
-                    df.to_sql('%s_1d' % stock, con=engine_tech, index=False, if_exists='replace')
-                    engine_tech.dispose()
-                # with engine.connect() as con:
-                #     con.execute("ALTER TABLE `%s_1d` ADD PRIMARY KEY (`time`);" % stock)
-                except BaseException:
-                    print(f'{stock}存储有问题')
-                    traceback.print_exc()
-                    err_list.append(stock)
-                    pass
-                else:
-                    # print(f"{stock} 成功!")
-                    m += 1
+    try:
+        print(f'{dt.now()}开始循环计算! MyPid is {os.getpid()},父进程是{os.getppid()},池子长度为{len(stocks)}')
+        m = 0
+
+        for stock in stocks:
+            engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8')
+            engine_tech = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8')
+            # print(stock)
+            try:
+                df = pd.read_sql_table('%s_1d' % stock, con=engine.connect())
+                df.dropna(axis=0, how='any')
+                engine.dispose()
+            except BaseException:
+                print(f'{stock}读取有问题')
+                traceback.print_exc()
+                pass
             else:
-                err_list.append(stock)
-                print(f'{stock}数据为空')
+                if len(df) != 0:
+                    try:
+                        get_macd_data(df)
+                        get_ris(df)
+                        get_bias(df)
+                        get_wilr(df)
+                        df_temp, T_signals = get_hlfx(df)
+                        df = pd.merge(df, df_temp, on='time', how='left')
+                        df['HL'].fillna(value='-', inplace=True)
+                        df = df.reset_index(drop=True)
+                    except BaseException:
+                        print(f'{stock}计算指标有问题')
+                        # print(stock, '\n', df[['open_front', 'HL']])
+                        pass
+                    try:
+                        df = df.replace([np.inf, -np.inf], np.nan)
+                        df.to_sql('%s_1d' % stock, con=engine_tech, index=False, if_exists='replace')
+                        # engine_tech.dispose()
+                    # with engine.connect() as con:
+                    #     con.execute("ALTER TABLE `%s_1d` ADD PRIMARY KEY (`time`);" % stock)
+                    except BaseException:
+                        print(f'{stock}存储有问题')
+                        traceback.print_exc()
+                        err_list.append(stock)
+                        pass
+                    else:
+                        # print(f"{stock} 成功!")
+                        m += 1
+                else:
+                    err_list.append(stock)
+                    print(f'{stock}数据为空')
+
+                if stock in hlfx_pool and T_signals == 2:
+                    hlfx_pool.remove(stock)
+                elif stock not in hlfx_pool and T_signals == 1:
+                    hlfx_pool.append(stock)
+                    hlfx_pool_daily.append(stock)
+    except Exception as e:
+        logging.exception("子进程异常:", os.getpid(), e)
 
-            if stock in hlfx_pool and T_signals == 2:
-                hlfx_pool.remove(stock)
-            elif stock not in hlfx_pool and T_signals == 1:
-                hlfx_pool.append(stock)
-                hlfx_pool_daily.append(stock)
+    print(f'{dt.now()}, Pid:{os.getpid()}已经完工了,应处理{len(stocks)},共计算{m}支个股')
 
 
+def split_list(lst, num_parts):
+    avg = len(lst) // num_parts
+    rem = len(lst) % num_parts
 
-    print(f'Pid:{os.getpid()}已经完工了,应处理{len(stocks)},共计算{m}支个股')
+    partitions = []
+    start = 0
+    for i in range(num_parts):
+        end = start + avg + (1 if i < rem else 0)
+        partitions.append(lst[start:end])
+        start = end
+
+    return partitions
 
 
 def ind():
+    fre = '1d'
+    logging.basicConfig(filename='error.log', level=logging.ERROR)
+    logger = mp.log_to_stderr()
+    logger.setLevel(logging.DEBUG)
+    logger.warning('doomed')
+
+    # mp.log_to_stderr()
+
     sttime = dt.now()
+    num_cpus = mp.cpu_count()
+    engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8')
 
-    stocks = xtdata.get_stock_list_in_sector('沪深A股')
+    # stocks = xtdata.get_stock_list_in_sector('沪深A股')
+    stocks = pd.read_sql_query(
+        text("select securities from %s" % 'stocks_list'), engine_hlfx_pool.connect()).iloc[-1, 0].split(",")
     print(len(stocks))
     # stocks.sort()
+    # print(type(stocks))
     random.shuffle(stocks)
+    partitions = split_list(stocks, num_cpus)
 
+    print(len(partitions))
+    # random.shuffle(stocks)
 
     err_list = mp.Manager().list()
-    fre = '1d'
-    engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8',
-                                     )
+
+
     hlfx_pool = mp.Manager().list()
     hlfx_pool_daily = mp.Manager().list()
     hlfx_pool.extend(pd.read_sql_query(
         text("select value from %s" % fre), engine_hlfx_pool.connect()).iloc[-1, 0].split(","))
     engine_hlfx_pool.dispose()
 
-    pool = mp.Pool(processes=int(mp.cpu_count()))
-    step = math.ceil(len(stocks) / mp.cpu_count())
-    # pool = mp.Pool(processes=18)
-    # step = math.ceil(len(stocks) / 18)
+    pool = mp.Pool(processes=int(num_cpus))
+    step = math.ceil(len(stocks) / num_cpus)
+    # pool = mp.Pool(processes=20)
+    # step = math.ceil(len(stocks) / 20)
     # step = 10000
     # tech_anal(stocks, hlfx_pool)
-    for i in range(0, len(stocks), step):
-        pool.apply_async(func=tech_anal, args=(stocks[i:i + step], hlfx_pool, hlfx_pool_daily, err_list,),
+    for lst in partitions:
+        pool.apply_async(func=tech_anal, args=(lst, hlfx_pool, hlfx_pool_daily, err_list,),
                          error_callback=err_call_back)
-    time.sleep(5)
+    # time.sleep(3)
     pool.close()
     pool.join()
-    
 
     print(f'当日信号:{len(hlfx_pool_daily)},持续检测为:{len(hlfx_pool)}')
     print(len(err_list), err_list)
@@ -343,15 +376,15 @@ def ind():
 
 
 if __name__ == '__main__':
-    pus = psutil.Process()
+    # pus = psutil.Process()
     # pus.cpu_affinity([12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23])
-
-    # ind()
-
-    scheduler = BlockingScheduler()
-    scheduler.add_job(func=ind, trigger='cron', day_of_week='0-4', hour='20', minute='30',
-                      timezone="Asia/Shanghai", max_instances=10)
-    try:
-        scheduler.start()
-    except (KeyboardInterrupt, SystemExit):
-        pass
+    freeze_support()
+    ind()
+
+    # scheduler = BlockingScheduler()
+    # scheduler.add_job(func=ind, trigger='cron', day_of_week='0-4', hour='20', minute='30',
+    #                   timezone="Asia/Shanghai", max_instances=10)
+    # try:
+    #     scheduler.start()
+    # except (KeyboardInterrupt, SystemExit):
+    #     pass

+ 388 - 0
QMT/qmt_get_indicators_3308.py

@@ -0,0 +1,388 @@
+# coding:utf-8
+from datetime import datetime as dt
+import numpy as np
+import os
+import pandas as pd
+import time
+from sqlalchemy import create_engine, text
+from jqdatasdk import *
+import pymysql
+import multiprocessing as mp
+from multiprocessing import freeze_support
+import math
+import talib as ta
+from xtquant import xtdata
+import os
+import traceback
+from apscheduler.schedulers.blocking import BlockingScheduler
+import psutil
+import random
+import logging
+
+pd.set_option('display.max_columns', None)  # 设置显示最大行
+
+
+def err_call_back(err):
+    print(f'出错啦~ error:{str(err)}')
+    traceback.print_exc()
+
+
+def myself_kdj(df):
+    low_list = df['low_back'].rolling(9, min_periods=9).min()
+    low_list.fillna(value=df['low_back'].expanding().min(), inplace=True)
+    high_list = df['high_back'].rolling(9, min_periods=9).max()
+    high_list.fillna(value=df['high_back'].expanding().max(), inplace=True)
+    rsv = (df['close_back'] - low_list) / (high_list - low_list) * 100
+    df['k'] = pd.DataFrame(rsv).ewm(com=2).mean()
+    df['d'] = df['k'].ewm(com=2).mean()
+    df['j'] = 3 * df['k'] - 2 * df['d']
+    return df
+
+
+# macd指标
+def get_macd_data(data, short=0, long1=0, mid=0):
+    if short == 0:
+        short = 12
+    if long1 == 0:
+        long1 = 26
+    if mid == 0:
+        mid = 9
+    data['sema'] = pd.Series(data['close_back']).ewm(span=short).mean()
+    data['lema'] = pd.Series(data['close_back']).ewm(span=long1).mean()
+    data.fillna(0, inplace=True)
+    data['dif'] = data['sema'] - data['lema']
+    data['dea'] = pd.Series(data['dif']).ewm(span=mid).mean()
+    data['macd'] = 2 * (data['dif'] - data['dea'])
+    data.fillna(0, inplace=True)
+    # return data[['dif', 'dea', 'macd']]
+
+
+# rsi指标
+def get_ris(data):
+    data["rsi_6"] = ta.RSI(data['close_back'], timeperiod=6)
+    data["rsi_12"] = ta.RSI(data['close_back'], timeperiod=12)
+    data["rsi_24"] = ta.RSI(data['close_back'], timeperiod=24)
+
+
+def get_bias(data):
+    # 计算方法:
+    # bias指标
+    # N期BIAS=(当日收盘价-N期平均收盘价)/N期平均收盘价*100%
+    data['bias_6'] = (data['close_back'] - data['close_back'].rolling(6, min_periods=1).mean()) / \
+                     data['close_back'].rolling(6, min_periods=1).mean() * 100
+    data['bias_12'] = (data['close_back'] - data['close_back'].rolling(12, min_periods=1).mean()) / \
+                      data['close_back'].rolling(12, min_periods=1).mean() * 100
+    data['bias_24'] = (data['close_back'] - data['close_back'].rolling(24, min_periods=1).mean()) / \
+                      data['close_back'].rolling(24, min_periods=1).mean() * 100
+    data['bias_6'] = round(data['bias_6'], 2)
+    data['bias_12'] = round(data['bias_12'], 2)
+    data['bias_24'] = round(data['bias_24'], 2)
+
+
+def get_wilr(data):
+    # 威廉指标
+    # 建议用talib库的WILLR方法,亲测有用
+    data['willr'] = ta.WILLR(data['high_back'], data['low_back'], data['close_back'], timeperiod=14)
+
+
+def get_hlfx(data):
+    Trading_signals = 0
+    data_temp = data[['time', 'open_back', 'close_back', 'high_back', 'low_back', 'dif', 'dea', 'macd']]
+    data_temp.columns = ['time', 'open', 'close', 'high', 'low', 'dif', 'dea', 'macd']
+    df_day = pd.DataFrame(columns=['time', 'open', 'close', 'high', 'low', 'volume', 'money', 'HL'])
+    # 先处理去包含
+    for i in data_temp.index:
+        if i == 0 or i == 1:
+            df_day = pd.concat([df_day, data_temp.iloc[[i]]], ignore_index=True)
+        # 不包含
+        elif (df_day.iloc[-1, 3] > data_temp.loc[i, 'high']
+              and df_day.iloc[-1, 4] > data_temp.loc[i, 'low']) \
+                or (df_day.iloc[-1, 3] < data_temp.loc[i, 'high']
+                    and df_day.iloc[-1, 4] < data_temp.loc[i, 'low']):
+            df_day = pd.concat([df_day, data_temp.loc[[i]]], ignore_index=True)
+        # 包含
+        else:
+            # 左高,下降
+            if df_day.iloc[-2, 3] > df_day.iloc[-1, 3]:
+                df_day.iloc[-1, 3] = min(df_day.iloc[-1, 3], data_temp.loc[i, 'high'])
+                df_day.iloc[-1, 4] = min(df_day.iloc[-1, 4], data_temp.loc[i, 'low'])
+            else:
+                # 右高,上升
+                df_day.iloc[-1, 3] = max(df_day.iloc[-1, 3], data_temp.loc[i, 'high'])
+                df_day.iloc[-1, 4] = max(df_day.iloc[-1, 4], data_temp.loc[i, 'low'])
+    # print('111', df_day, data_temp)
+
+    if len(df_day.index) > 2:
+        # 寻找顶底分型
+        for x in range(2, len(df_day.index)):
+            m = x - 1
+            # 底
+            # 符合底分型形态,且第2、3根k线是阳线
+            if ((df_day.loc[x, 'high'] > df_day.loc[x - 1, 'high']) and
+                    (df_day.loc[x - 2, 'high'] > df_day.loc[x - 1, 'high'])):
+                # and df_day.loc[x, 'close'] > df_day.loc[x, 'open'] and \
+                #     df_day.loc[x - 1, 'close'] > df_day.loc[x - 1, 'open']:
+
+                df_day.loc[x, 'HL'] = 'L*'
+
+                while m:
+                    if df_day.loc[m, 'HL'] in ['H', 'HH', 'H*']:
+                        if (x - m) > 3:
+                            # 成笔——>L
+                            df_day.loc[x, 'HL'] = 'L'
+                            # 产生信号,进入hlfx_pool
+                            if x == len(df_day.index) - 1:
+                                Trading_signals = 1
+                        else:
+                            # 不成笔 次级别中枢,保持L* 修订原H为H*
+                            df_day.loc[m, 'HL'] = 'H*'
+                        break
+
+                    elif df_day.loc[m, 'HL'] in ['L', 'LL', 'L*']:
+                        if df_day.loc[m - 1, 'low'] > df_day.loc[x - 1, 'low']:
+                            # 前一个为底更高,且中间不存在更低的底
+                            df_day.loc[x, 'HL'] = 'L'
+                            df_day.loc[m, 'HL'] = '-'
+
+                            # 产生信号,进入hlfx_pool
+                            if x == len(df_day.index) - 1:
+                                Trading_signals = 1
+
+                            # 获得MACD,判断MACD判断背驰
+                            x_macd_dif, x_macd_dea, x_macd_macd = data_temp.loc[x, 'dif'], data_temp.loc[x, 'dea'], \
+                                data_temp.loc[x, 'macd']
+                            m_macd_dif, m_macd_dea, m_macd_macd = data_temp.loc[m, 'dif'], data_temp.loc[m, 'dea'], \
+                                data_temp.loc[m, 'macd']
+
+                            # MACD底背驰
+                            if m_macd_dif < x_macd_dif:
+                                # 次级别背驰底->LL
+                                df_day.loc[x, 'HL'] = 'LL'
+                            break
+                        else:
+                            # 前底更低,本底无效
+                            df_day.loc[x, 'HL'] = '-'
+                            break
+                    m = m - 1
+                    if m == 0:
+                        df_day.loc[x, 'HL'] = 'L'
+
+            # 顶
+            elif ((df_day.loc[x, 'high'] < df_day.loc[x - 1, 'high']) and (
+                    df_day.loc[x - 2, 'high'] < df_day.loc[x - 1, 'high'])):
+
+                df_day.loc[x, 'HL'] = 'H*'
+                while m:
+                    if df_day.loc[m, 'HL'] in ['L', 'LL', 'L*']:
+                        if x - m > 3:
+                            # 成笔->H
+                            df_day.loc[x, 'HL'] = 'H'
+                            # 产生信号,进入hlfx_pool
+                            if x == len(df_day.index) - 1:
+                                Trading_signals = 2
+                        else:
+                            # 不成笔 次级别中枢,保持H* 修订原L为L*
+                            df_day.loc[m, 'HL'] = 'L*'
+                        break
+
+                    elif df_day.loc[m, 'HL'] in ['H', 'HH', 'H*']:
+                        if df_day.loc[x - 1, 'high'] > df_day.loc[m - 1, 'high']:
+                            # 前一个为顶,且中间存在不包含 or 更高的顶
+                            df_day.loc[x, 'HL'] = 'H'
+                            df_day.loc[m, 'HL'] = '-'
+                            # 产生信号,进入hlfx_pool
+                            if x == len(df_day.index) - 1:
+                                Trading_signals = 2
+
+                            # 获得MACD,判断MACD判断背驰
+                            x_macd_dif, x_macd_dea, x_macd_macd = data_temp.loc[x, 'dif'], data_temp.loc[x, 'dea'], \
+                                data_temp.loc[x, 'macd']
+                            m_macd_dif, m_macd_dea, m_macd_macd = data_temp.loc[m, 'dif'], data_temp.loc[m, 'dea'], \
+                                data_temp.loc[m, 'macd']
+
+                            # MACD顶背驰
+                            if x_macd_dif < m_macd_dif:
+                                # 次级别背驰底->HH
+                                df_day.loc[x, 'HL'] = 'HH'
+                            break
+                        else:
+                            # 前顶更高,本顶无效
+                            df_day.loc[x, 'HL'] = '-'
+                            break
+                    m = m - 1
+                    if m == 0:
+                        df_day.loc[x, 'HL'] = 'H'
+
+            else:
+                df_day.loc[x, 'HL'] = '-'
+    df_temp = df_day[['time', 'HL']]
+
+    return df_temp, Trading_signals
+
+
+def tech_anal(stocks, hlfx_pool, hlfx_pool_daily, err_list):
+    try:
+        print(f'{dt.now()}开始循环计算! MyPid is {os.getpid()},父进程是{os.getppid()},池子长度为{len(stocks)}')
+        m = 0
+
+        for stock in stocks:
+            engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3308/qmt_stocks_whole?charset=utf8')
+            engine_tech = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3308/qmt_stocks_tech?charset=utf8')
+            # print(stock)
+            try:
+                df = pd.read_sql_table('%s_1d' % stock, con=engine.connect())
+                df.dropna(axis=0, how='any')
+                engine.dispose()
+            except BaseException:
+                print(f'{stock}读取有问题')
+                traceback.print_exc()
+                pass
+            else:
+                if len(df) != 0:
+                    try:
+                        get_macd_data(df)
+                        get_ris(df)
+                        get_bias(df)
+                        get_wilr(df)
+                        df_temp, T_signals = get_hlfx(df)
+                        df = pd.merge(df, df_temp, on='time', how='left')
+                        df['HL'].fillna(value='-', inplace=True)
+                        df = df.reset_index(drop=True)
+                    except BaseException:
+                        print(f'{stock}计算指标有问题')
+                        # print(stock, '\n', df[['open_front', 'HL']])
+                        pass
+                    try:
+                        df = df.replace([np.inf, -np.inf], np.nan)
+                        df.to_sql('%s_1d' % stock, con=engine_tech, index=False, if_exists='replace')
+                        # engine_tech.dispose()
+                    # with engine.connect() as con:
+                    #     con.execute("ALTER TABLE `%s_1d` ADD PRIMARY KEY (`time`);" % stock)
+                    except BaseException:
+                        print(f'{stock}存储有问题')
+                        traceback.print_exc()
+                        err_list.append(stock)
+                        pass
+                    else:
+                        # print(f"{stock} 成功!")
+                        m += 1
+                else:
+                    err_list.append(stock)
+                    print(f'{stock}数据为空')
+
+                if stock in hlfx_pool and T_signals == 2:
+                    hlfx_pool.remove(stock)
+                elif stock not in hlfx_pool and T_signals == 1:
+                    hlfx_pool.append(stock)
+                    hlfx_pool_daily.append(stock)
+    except Exception as e:
+        logging.exception("子进程异常:", os.getpid(), e)
+
+    print(f'{dt.now()}, Pid:{os.getpid()}已经完工了,应处理{len(stocks)},共计算{m}支个股')
+
+
+def split_list(lst, num_parts):
+    avg = len(lst) // num_parts
+    rem = len(lst) % num_parts
+
+    partitions = []
+    start = 0
+    for i in range(num_parts):
+        end = start + avg + (1 if i < rem else 0)
+        partitions.append(lst[start:end])
+        start = end
+
+    return partitions
+
+
+def ind():
+    logging.basicConfig(filename='error.log', level=logging.ERROR)
+    logger = mp.log_to_stderr()
+    logger.setLevel(logging.DEBUG)
+    logger.warning('doomed')
+
+    # mp.log_to_stderr()
+
+    sttime = dt.now()
+    num_cpus = mp.cpu_count()
+
+    engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8')
+
+    # stocks = xtdata.get_stock_list_in_sector('沪深A股')
+    stocks = pd.read_sql_query(
+        text("select securities from %s" % 'stocks_list'), engine_hlfx_pool.connect()).iloc[-1, 0].split(",")
+    print(len(stocks))
+    # stocks.sort()
+    # print(type(stocks))
+    random.shuffle(stocks)
+    print(type(stocks))
+    partitions = split_list(stocks, num_cpus)
+
+    print(len(partitions))
+
+    err_list = mp.Manager().list()
+    fre = '1d'
+    hlfx_pool = mp.Manager().list()
+    hlfx_pool_daily = mp.Manager().list()
+    hlfx_pool.extend(pd.read_sql_query(
+        text("select value from %s" % fre), engine_hlfx_pool.connect()).iloc[-1, 0].split(","))
+    engine_hlfx_pool.dispose()
+    pool = mp.Pool(processes=int(num_cpus))
+    step = math.ceil(len(stocks) / num_cpus)
+    # pool = mp.Pool(processes=20)
+    # step = math.ceil(len(stocks) / 20)
+    # step = 10000
+    # tech_anal(stocks, hlfx_pool)
+    for lst in partitions:
+        pool.apply_async(func=tech_anal, args=(lst, hlfx_pool, hlfx_pool_daily, err_list,),
+                         error_callback=err_call_back)
+    # time.sleep(3)
+    pool.close()
+    pool.join()
+
+    print(f'当日信号:{len(hlfx_pool_daily)},持续检测为:{len(hlfx_pool)}')
+    print(len(err_list), err_list)
+
+    results_list = ','.join(set(hlfx_pool))
+    results_list_daily = ','.join(set(hlfx_pool_daily))
+
+    # 存档入库
+    db_pool = pymysql.connect(host='localhost',
+                              user='root',
+                              port=3308,
+                              password='r6kEwqWU9!v3',
+                              database='hlfx_pool')
+    cursor_pool = db_pool.cursor()
+    sql = "INSERT INTO %s (date,value) VALUES('%s','%s')" % (fre, dt.now().strftime('%Y-%m-%d %H:%M:%S'), results_list)
+    cursor_pool.execute(sql)
+    db_pool.commit()
+
+    # 存档入库daily_1d
+    db_pool2 = pymysql.connect(host='localhost',
+                               user='root',
+                               port=3308,
+                               password='r6kEwqWU9!v3',
+                               database='hlfx_pool')
+    cursor_pool2 = db_pool2.cursor()
+    sql2 = "INSERT INTO daily_%s (date,value) VALUES('%s','%s')" % (fre, dt.now().strftime('%Y-%m-%d %H:%M:%S'),
+                                                                    results_list_daily)
+    cursor_pool2.execute(sql2)
+    db_pool2.commit()
+
+    edtime = dt.now()
+    print(edtime - sttime)
+
+
+if __name__ == '__main__':
+    # pus = psutil.Process()
+    # pus.cpu_affinity([12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23])
+    freeze_support()
+    ind()
+
+    # scheduler = BlockingScheduler()
+    # scheduler.add_job(func=ind, trigger='cron', day_of_week='0-4', hour='20', minute='30',
+    #                   timezone="Asia/Shanghai", max_instances=10)
+    # try:
+    #     scheduler.start()
+    # except (KeyboardInterrupt, SystemExit):
+    #     pass

+ 3 - 2
QMT/qmt_real_hlfx.py

@@ -295,6 +295,7 @@ def hlfx(stock_list, data):
 
 
 def prepare(data):
+    print(dt.now())
     stock_list = list(data.keys())
     if len(data.keys()) >= 12:
         cpu_count = 12
@@ -306,7 +307,7 @@ def prepare(data):
     for i in range(0, len(stock_list), step):
         to_hlfx_list.append([x for x in stock_list[i:i + step]])
 
-    pool = mp.Pool(processes=cpu_count, maxtasksperchild=12)
+    pool = mp.Pool(processes=int(cpu_count/2))
     for m in range(len(to_hlfx_list)):
         pool.apply_async(func=hlfx,
                          args=(to_hlfx_list[m], data), error_callback=err_call_back)
@@ -336,7 +337,7 @@ if __name__ == '__main__':
     print(f'总进程pid:{os.getpid()}')
     mp.freeze_support()
     pus = psutil.Process()
-    pus.cpu_affinity([0, 1, 2, 3, 4, 5, 6, 7])
+    # pus.cpu_affinity([0, 1, 2, 3, 4, 5, 6, 7])
 
     path = r'c:\\qmt\\userdata_mini'
     # 生成session id 整数类型 同时运行的策略不能重复

+ 392 - 0
cudf/qmt_cudf_get_indicators.py

@@ -0,0 +1,392 @@
+# coding:utf-8
+from datetime import datetime as dt
+import numpy as np
+import os
+import pandas as pd
+import time
+from sqlalchemy import create_engine, text
+from jqdatasdk import *
+import pymysql
+import multiprocessing as mp
+from multiprocessing import freeze_support
+import math
+import talib as ta
+from xtquant import xtdata
+import os
+import traceback
+from apscheduler.schedulers.blocking import BlockingScheduler
+import psutil
+import random
+import logging
+
+pd.set_option('display.max_columns', None)  # 设置显示最大行
+
+
+def err_call_back(err):
+    """Error callback for mp.Pool.apply_async: report a worker's exception."""
+    print(f'出错啦~ error:{str(err)}')
+    # NOTE(review): print_exc() here prints the callback's own (empty) stack,
+    # not the worker's traceback; `err` already carries the exception message.
+    traceback.print_exc()
+
+
+def myself_kdj(df):
+    """Compute KDJ columns ('k', 'd', 'j') on `df` in place and return it.
+
+    Uses the back-adjusted OHLC columns ('*_back'). RSV is based on a 9-bar
+    rolling low/high; the first 8 bars (NaN with min_periods=9) fall back to
+    the expanding min/max so early rows still get a value.
+    """
+    low_list = df['low_back'].rolling(9, min_periods=9).min()
+    low_list.fillna(value=df['low_back'].expanding().min(), inplace=True)
+    high_list = df['high_back'].rolling(9, min_periods=9).max()
+    high_list.fillna(value=df['high_back'].expanding().max(), inplace=True)
+    rsv = (df['close_back'] - low_list) / (high_list - low_list) * 100
+    # ewm(com=2) reproduces the classic 1/3 smoothing for K and D.
+    df['k'] = pd.DataFrame(rsv).ewm(com=2).mean()
+    df['d'] = df['k'].ewm(com=2).mean()
+    df['j'] = 3 * df['k'] - 2 * df['d']
+    return df
+
+
+# MACD indicator
+def get_macd_data(data, short=0, long1=0, mid=0):
+    """Add sema/lema/dif/dea/macd columns to `data` in place (returns None).
+
+    Passing 0 (the defaults) for short/long1/mid selects the standard
+    12/26/9 MACD periods.
+    """
+    if short == 0:
+        short = 12
+    if long1 == 0:
+        long1 = 26
+    if mid == 0:
+        mid = 9
+    # Fast and slow EMAs of the back-adjusted close.
+    data['sema'] = pd.Series(data['close_back']).ewm(span=short).mean()
+    data['lema'] = pd.Series(data['close_back']).ewm(span=long1).mean()
+    data.fillna(0, inplace=True)
+    data['dif'] = data['sema'] - data['lema']
+    data['dea'] = pd.Series(data['dif']).ewm(span=mid).mean()
+    # Histogram is 2 * (DIF - DEA) — the convention used by Chinese brokers.
+    data['macd'] = 2 * (data['dif'] - data['dea'])
+    data.fillna(0, inplace=True)
+    # return data[['dif', 'dea', 'macd']]
+
+
+# RSI indicator
+def get_ris(data):
+    """Add RSI columns for 6/12/24 periods to `data` in place via TA-Lib.
+
+    NOTE(review): the name looks like a typo for get_rsi, but callers in this
+    file use get_ris, so renaming needs a coordinated change.
+    """
+    data["rsi_6"] = ta.RSI(data['close_back'], timeperiod=6)
+    data["rsi_12"] = ta.RSI(data['close_back'], timeperiod=12)
+    data["rsi_24"] = ta.RSI(data['close_back'], timeperiod=24)
+
+
+def get_bias(data):
+    """Add BIAS columns (6/12/24 periods), rounded to 2 decimals, in place."""
+    # Calculation:
+    # BIAS indicator
+    # N-period BIAS = (close - N-period mean close) / N-period mean close * 100%
+    # min_periods=1 keeps the early rows defined instead of NaN.
+    data['bias_6'] = (data['close_back'] - data['close_back'].rolling(6, min_periods=1).mean()) / \
+                     data['close_back'].rolling(6, min_periods=1).mean() * 100
+    data['bias_12'] = (data['close_back'] - data['close_back'].rolling(12, min_periods=1).mean()) / \
+                      data['close_back'].rolling(12, min_periods=1).mean() * 100
+    data['bias_24'] = (data['close_back'] - data['close_back'].rolling(24, min_periods=1).mean()) / \
+                      data['close_back'].rolling(24, min_periods=1).mean() * 100
+    data['bias_6'] = round(data['bias_6'], 2)
+    data['bias_12'] = round(data['bias_12'], 2)
+    data['bias_24'] = round(data['bias_24'], 2)
+
+
+def get_wilr(data):
+    """Add a 14-period Williams %R column ('willr') to `data` in place."""
+    # Williams indicator; TA-Lib's WILLR is the tried-and-tested implementation.
+    data['willr'] = ta.WILLR(data['high_back'], data['low_back'], data['close_back'], timeperiod=14)
+
+
+def get_hlfx(data):
+    """Label top/bottom fractals (顶底分型) on merged K-lines and emit a signal.
+
+    Works on the back-adjusted OHLC plus MACD columns of `data`. Returns
+    (df_temp, Trading_signals): df_temp holds ['time', 'HL'] per merged bar,
+    where HL is 'H'/'L' (confirmed top/bottom), 'HH'/'LL' (top/bottom with
+    MACD divergence), 'H*'/'L*' (provisional), or '-'; Trading_signals is
+    0 = none, 1 = bottom on the latest bar, 2 = top on the latest bar.
+    """
+    Trading_signals = 0
+    data_temp = data[['time', 'open_back', 'close_back', 'high_back', 'low_back', 'dif', 'dea', 'macd']]
+    data_temp.columns = ['time', 'open', 'close', 'high', 'low', 'dif', 'dea', 'macd']
+    # NOTE(review): 'volume'/'money' are declared but never supplied by
+    # data_temp rows, so those columns stay NaN after the concats below.
+    df_day = pd.DataFrame(columns=['time', 'open', 'close', 'high', 'low', 'volume', 'money', 'HL'])
+    # First pass: merge "contained" bars (去包含), Chan-theory style.
+    # Positional refs below: iloc[-1, 3] == 'high', iloc[-1, 4] == 'low'.
+    for i in data_temp.index:
+        if i == 0 or i == 1:
+            # Seed with the first two bars unconditionally.
+            df_day = pd.concat([df_day, data_temp.iloc[[i]]], ignore_index=True)
+        # No containment: high and low both move in the same direction.
+        elif (df_day.iloc[-1, 3] > data_temp.loc[i, 'high']
+              and df_day.iloc[-1, 4] > data_temp.loc[i, 'low']) \
+                or (df_day.iloc[-1, 3] < data_temp.loc[i, 'high']
+                    and df_day.iloc[-1, 4] < data_temp.loc[i, 'low']):
+            df_day = pd.concat([df_day, data_temp.loc[[i]]], ignore_index=True)
+        # Containment: fold the new bar into the previous merged bar.
+        else:
+            # Earlier bar higher -> downtrend: keep the lower high/low.
+            if df_day.iloc[-2, 3] > df_day.iloc[-1, 3]:
+                df_day.iloc[-1, 3] = min(df_day.iloc[-1, 3], data_temp.loc[i, 'high'])
+                df_day.iloc[-1, 4] = min(df_day.iloc[-1, 4], data_temp.loc[i, 'low'])
+            else:
+                # Later bar higher -> uptrend: keep the higher high/low.
+                df_day.iloc[-1, 3] = max(df_day.iloc[-1, 3], data_temp.loc[i, 'high'])
+                df_day.iloc[-1, 4] = max(df_day.iloc[-1, 4], data_temp.loc[i, 'low'])
+    # print('111', df_day, data_temp)
+
+    if len(df_day.index) > 2:
+        # Second pass: scan merged bars for top/bottom fractals.
+        for x in range(2, len(df_day.index)):
+            m = x - 1
+            # Bottom fractal: bar x-1 has the lowest high of the triple; the
+            # label is written on bar x.
+            # NOTE(review): both branches compare the 'high' column only —
+            # presumably intentional on merged bars; confirm 'low' was not
+            # meant for the bottom case.
+            if ((df_day.loc[x, 'high'] > df_day.loc[x - 1, 'high']) and
+                    (df_day.loc[x - 2, 'high'] > df_day.loc[x - 1, 'high'])):
+                # and df_day.loc[x, 'close'] > df_day.loc[x, 'open'] and \
+                #     df_day.loc[x - 1, 'close'] > df_day.loc[x - 1, 'open']:
+
+                df_day.loc[x, 'HL'] = 'L*'
+
+                # Walk backwards to the most recent labelled fractal.
+                while m:
+                    if df_day.loc[m, 'HL'] in ['H', 'HH', 'H*']:
+                        if (x - m) > 3:
+                            # Enough bars since the top: a valid stroke -> L.
+                            df_day.loc[x, 'HL'] = 'L'
+                            # Emit the signal only if the bottom is the latest bar.
+                            if x == len(df_day.index) - 1:
+                                Trading_signals = 1
+                        else:
+                            # Too close: minor-level pivot; keep L*, demote H to H*.
+                            df_day.loc[m, 'HL'] = 'H*'
+                        break
+
+                    elif df_day.loc[m, 'HL'] in ['L', 'LL', 'L*']:
+                        if df_day.loc[m - 1, 'low'] > df_day.loc[x - 1, 'low']:
+                            # Previous bottom is higher: this lower one replaces it.
+                            df_day.loc[x, 'HL'] = 'L'
+                            df_day.loc[m, 'HL'] = '-'
+
+                            # Emit the signal only if the bottom is the latest bar.
+                            if x == len(df_day.index) - 1:
+                                Trading_signals = 1
+
+                            # Read MACD at both bottoms to test for divergence.
+                            x_macd_dif, x_macd_dea, x_macd_macd = data_temp.loc[x, 'dif'], data_temp.loc[x, 'dea'], \
+                                data_temp.loc[x, 'macd']
+                            m_macd_dif, m_macd_dea, m_macd_macd = data_temp.loc[m, 'dif'], data_temp.loc[m, 'dea'], \
+                                data_temp.loc[m, 'macd']
+
+                            # Bullish MACD divergence
+                            if m_macd_dif < x_macd_dif:
+                                # divergent bottom at the minor level -> LL
+                                df_day.loc[x, 'HL'] = 'LL'
+                            break
+                        else:
+                            # Previous bottom is lower: this bottom is void.
+                            df_day.loc[x, 'HL'] = '-'
+                            break
+                    m = m - 1
+                    if m == 0:
+                        # No earlier fractal found: accept as a bottom.
+                        df_day.loc[x, 'HL'] = 'L'
+
+            # Top fractal: bar x-1 has the highest high of the triple.
+            elif ((df_day.loc[x, 'high'] < df_day.loc[x - 1, 'high']) and (
+                    df_day.loc[x - 2, 'high'] < df_day.loc[x - 1, 'high'])):
+
+                df_day.loc[x, 'HL'] = 'H*'
+                while m:
+                    if df_day.loc[m, 'HL'] in ['L', 'LL', 'L*']:
+                        if x - m > 3:
+                            # A valid stroke -> H.
+                            df_day.loc[x, 'HL'] = 'H'
+                            # Emit the signal only if the top is the latest bar.
+                            if x == len(df_day.index) - 1:
+                                Trading_signals = 2
+                        else:
+                            # Too close: minor-level pivot; keep H*, demote L to L*.
+                            df_day.loc[m, 'HL'] = 'L*'
+                        break
+
+                    elif df_day.loc[m, 'HL'] in ['H', 'HH', 'H*']:
+                        if df_day.loc[x - 1, 'high'] > df_day.loc[m - 1, 'high']:
+                            # This top is higher: it replaces the previous one.
+                            df_day.loc[x, 'HL'] = 'H'
+                            df_day.loc[m, 'HL'] = '-'
+                            # Emit the signal only if the top is the latest bar.
+                            if x == len(df_day.index) - 1:
+                                Trading_signals = 2
+
+                            # Read MACD at both tops to test for divergence.
+                            x_macd_dif, x_macd_dea, x_macd_macd = data_temp.loc[x, 'dif'], data_temp.loc[x, 'dea'], \
+                                data_temp.loc[x, 'macd']
+                            m_macd_dif, m_macd_dea, m_macd_macd = data_temp.loc[m, 'dif'], data_temp.loc[m, 'dea'], \
+                                data_temp.loc[m, 'macd']
+
+                            # Bearish MACD divergence
+                            if x_macd_dif < m_macd_dif:
+                                # divergent top at the minor level -> HH
+                                df_day.loc[x, 'HL'] = 'HH'
+                            break
+                        else:
+                            # Previous top is higher: this top is void.
+                            df_day.loc[x, 'HL'] = '-'
+                            break
+                    m = m - 1
+                    if m == 0:
+                        # No earlier fractal found: accept as a top.
+                        df_day.loc[x, 'HL'] = 'H'
+
+            else:
+                # Not a fractal bar.
+                df_day.loc[x, 'HL'] = '-'
+    df_temp = df_day[['time', 'HL']]
+
+    return df_temp, Trading_signals
+
+
+def tech_anal(stocks, hlfx_pool, hlfx_pool_daily, err_list):
+    """Worker: compute indicators per stock, persist them, update signal pools.
+
+    Args:
+        stocks: list of stock codes assigned to this worker.
+        hlfx_pool: shared Manager list of codes with a live bottom signal.
+        hlfx_pool_daily: shared Manager list of codes that signalled today.
+        err_list: shared Manager list collecting codes that failed.
+    """
+    m = 0  # count of successfully processed stocks, reported in the final log
+    try:
+        print(f'{dt.now()}开始循环计算! MyPid is {os.getpid()},父进程是{os.getppid()},池子长度为{len(stocks)}')
+
+        # Hoisted out of the loop: one engine pair per worker, not per stock.
+        engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8')
+        engine_tech = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8')
+
+        for stock in stocks:
+            # Default: no signal unless get_hlfx() below succeeds. Fixes a
+            # NameError when the indicator step raised and T_signals was unbound.
+            T_signals = 0
+            try:
+                df = pd.read_sql_table('%s_1d' % stock, con=engine.connect())
+                # Bug fix: dropna() returned a discarded copy; drop in place.
+                df.dropna(axis=0, how='any', inplace=True)
+            except BaseException:
+                print(f'{stock}读取有问题')
+                traceback.print_exc()
+            else:
+                if len(df) != 0:
+                    try:
+                        get_macd_data(df)
+                        get_ris(df)
+                        get_bias(df)
+                        get_wilr(df)
+                        df_temp, T_signals = get_hlfx(df)
+                        df = pd.merge(df, df_temp, on='time', how='left')
+                        df['HL'].fillna(value='-', inplace=True)
+                        df = df.reset_index(drop=True)
+                    except BaseException:
+                        print(f'{stock}计算指标有问题')
+                    try:
+                        # Drop infinities so MySQL accepts the float columns.
+                        df = df.replace([np.inf, -np.inf], np.nan)
+                        df.to_sql('%s_1d' % stock, con=engine_tech, index=False, if_exists='replace')
+                    except BaseException:
+                        print(f'{stock}存储有问题')
+                        traceback.print_exc()
+                        err_list.append(stock)
+                    else:
+                        m += 1
+                else:
+                    err_list.append(stock)
+                    print(f'{stock}数据为空')
+
+                # Maintain the shared pools: 2 = top (drop), 1 = bottom (add).
+                if stock in hlfx_pool and T_signals == 2:
+                    hlfx_pool.remove(stock)
+                elif stock not in hlfx_pool and T_signals == 1:
+                    hlfx_pool.append(stock)
+                    hlfx_pool_daily.append(stock)
+        engine.dispose()
+        engine_tech.dispose()
+    except Exception:
+        # Bug fix: logging.exception takes a message (plus %-args), not a
+        # tuple of positional objects; the old call broke log formatting.
+        logging.exception(f'子进程异常: pid={os.getpid()}')
+
+    print(f'{dt.now()}, Pid:{os.getpid()}已经完工了,应处理{len(stocks)},共计算{m}支个股')
+
+
+def split_list(lst, num_parts):
+    """Split `lst` into `num_parts` contiguous chunks of near-equal size.
+
+    The first len(lst) % num_parts chunks receive one extra element, so chunk
+    sizes differ by at most one; trailing chunks are empty when
+    num_parts > len(lst).
+    """
+    base, extra = divmod(len(lst), num_parts)
+    chunks = []
+    offset = 0
+    for idx in range(num_parts):
+        size = base + (1 if idx < extra else 0)
+        chunks.append(lst[offset:offset + size])
+        offset += size
+    return chunks
+
+
+def ind():
+    """Driver: fan the stock universe out to worker processes, archive signals.
+
+    Reads the universe from MySQL, computes indicators in a process pool via
+    tech_anal, then inserts the surviving pool and the daily pool into the
+    `1d` and `daily_1d` tables of the hlfx_pool database.
+    """
+    fre = '1d'
+    logging.basicConfig(filename='error.log', level=logging.ERROR)
+
+    sttime = dt.now()
+    num_cpus = mp.cpu_count()
+    engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8')
+
+    # Latest universe: last row of stocks_list, stored as one CSV string.
+    stocks = pd.read_sql_query(
+        text("select securities from %s" % 'stocks_list'), engine_hlfx_pool.connect()).iloc[-1, 0].split(",")
+    print(len(stocks))
+    # Shuffle so slow symbols spread evenly across the workers.
+    random.shuffle(stocks)
+    partitions = split_list(stocks, num_cpus)
+    # Bug fix: a leftover debug `exit()` here aborted the run before any work
+    # was done; removed along with the debug prints around it.
+
+    err_list = mp.Manager().list()
+    hlfx_pool = mp.Manager().list()
+    hlfx_pool_daily = mp.Manager().list()
+    # Seed the persistent pool with the last archived value.
+    hlfx_pool.extend(pd.read_sql_query(
+        text("select value from %s" % fre), engine_hlfx_pool.connect()).iloc[-1, 0].split(","))
+    engine_hlfx_pool.dispose()
+
+    # One pre-partitioned task per CPU.
+    pool = mp.Pool(processes=int(num_cpus))
+    for lst in partitions:
+        pool.apply_async(func=tech_anal, args=(lst, hlfx_pool, hlfx_pool_daily, err_list,),
+                         error_callback=err_call_back)
+    pool.close()
+    pool.join()
+
+    print(f'当日信号:{len(hlfx_pool_daily)},持续检测为:{len(hlfx_pool)}')
+    print(len(err_list), err_list)
+
+    results_list = ','.join(set(hlfx_pool))
+    results_list_daily = ','.join(set(hlfx_pool_daily))
+
+    # 存档入库: one connection for both inserts instead of two identical ones.
+    db_pool = pymysql.connect(host='localhost',
+                              user='root',
+                              port=3307,
+                              password='r6kEwqWU9!v3',
+                              database='hlfx_pool')
+    cursor_pool = db_pool.cursor()
+    now_str = dt.now().strftime('%Y-%m-%d %H:%M:%S')
+    # Table names cannot be bound parameters; the values are bound so quotes
+    # or special characters in the joined lists cannot break the statement.
+    cursor_pool.execute("INSERT INTO %s (date,value) VALUES(%%s,%%s)" % fre,
+                        (now_str, results_list))
+    # 存档入库daily_1d
+    cursor_pool.execute("INSERT INTO daily_%s (date,value) VALUES(%%s,%%s)" % fre,
+                        (now_str, results_list_daily))
+    db_pool.commit()
+    db_pool.close()
+
+    edtime = dt.now()
+    print(edtime - sttime)
+
+
+# Script entry point: run the indicator pipeline once.
+if __name__ == '__main__':
+    # CPU-affinity pinning kept for reference (disabled).
+    # pus = psutil.Process()
+    # pus.cpu_affinity([12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23])
+    freeze_support()  # required for multiprocessing in frozen Windows builds
+    ind()
+
+    # Alternative: APScheduler cron run, Mon-Fri 20:30 Asia/Shanghai (disabled).
+    # scheduler = BlockingScheduler()
+    # scheduler.add_job(func=ind, trigger='cron', day_of_week='0-4', hour='20', minute='30',
+    #                   timezone="Asia/Shanghai", max_instances=10)
+    # try:
+    #     scheduler.start()
+    # except (KeyboardInterrupt, SystemExit):
+    #     pass

+ 7 - 4
docker.start.txt

@@ -20,6 +20,7 @@ docker run  -itd  -p 3307:3306 --name mysql8033 -e character-set-server=utf8mb4
 docker run  -itd  -p 3307:3306 --name mysql8033 -e character-set-server=utf8mb4 --privileged=true  --restart unless-stopped -v C:/docker_mysql/stock_data:/var/lib/mysql  -e MYSQL_ROOT_PASSWORD=r6kEwqWU9!v3  -d mysql:8.0.33 --lower_case_table_names=1 --skip-log-bin --disable-log-bin
 
 
+docker run --restart=always --log-opt max-size=40960m --log-opt max-file=2 -p 6379:6379 --name redis -v C:/docker_redis:/data --network stk-net -d redislabs/redisearch:2.8.3 redis-server --appendonly yes
 
 
 20230515
@@ -29,12 +30,14 @@ docker run  -itd  -p 3309:3306 --name mysqltest -e character-set-server=utf8mb4
 20230626
 COMMAND
 docker pull nvcr.io/nvidia/rapidsai/rapidsai-core:23.06-cuda11.8-runtime-ubuntu22.04-py3.10
-docker run --name cudf --gpus all --rm -it --shm-size=1g --ulimit memlock=-1 --ulimit stack=67108864 --network stk-net -p 8888:8888 -p 8787:8787 -p 8786:8786 -p 8722:22 nvcr.io/nvidia/rapidsai/rapidsai-core:23.06-cuda11.8-runtime-ubuntu22.04-py3.10
+docker run -d --name cudf --gpus all -it --shm-size=1g --ulimit memlock=-1 --ulimit stack=67108864 --restart unless-stopped --network stk-net -p 8888:8888 -p 8787:8787 -p 8786:8786 -p 8722:22 nvcr.io/nvidia/rapidsai/rapidsai-core:23.06-cuda11.8-runtime-ubuntu22.04-py3.10
 
 
-20230627
-test
-test 1314
+20230706
+docker run --name tidb -d -v C:/docker_tidb:/tmp/tidb --privileged=true --restart unless-stopped -p 3308:4000 -p 10080:10080 --network stk-net pingcap/tidb:latest
+docker run --name pgsql -d -v C:/docker_pgsql:/var/lib/postgresql --privileged=true --restart unless-stopped -p 3309:5432 -e POSTGRES_PASSWORD='r6kEwqWU9!v3' --network stk-net postgres:13
+docker run --name tidb -d -v C:/docker_tidb:/tmp/tidb --privileged=true --restart unless-stopped -p 3308:4000 -p 10080:10080 --network stk-net pingcap/tidb:latest
+
 
 # For advice on how to change settings please see
 # http://dev.mysql.com/doc/refman/8.0/en/server-configuration-defaults.html