123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342 |
- # coding:utf-8
- import time
- from datetime import datetime as dt
- import socket
- import pandas as pd
- import numpy as np
- from sqlalchemy import create_engine, text
- from jqdatasdk import *
- import pymysql
- import multiprocessing as mp
- from multiprocessing import freeze_support
- import concurrent.futures
- import math
- import talib as ta
- import os
- import traceback
- import random
- import logging
- from myindicator import myind
- import psutil
- from tqdm import tqdm
- from itertools import islice
- from func_timeout import func_set_timeout, FunctionTimedOut
- from apscheduler.schedulers.blocking import BlockingScheduler
# Show all rows and columns when printing DataFrames (debug convenience)
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
# Configure root logging for the parent process
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Connection pools shared at module level; worker processes re-create/dispose
# their own handles (see tech_anal / query_database).
engine = create_engine(
    'mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8', pool_recycle=3600, pool_size=100,
    max_overflow=20)
engine_tech = create_engine(
    'mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8', pool_size=100, pool_recycle=3600,
    max_overflow=20)
# engine_tech2 = create_engine(
#     'mysql+pymysql://root:r6kEwqWU9!v3@localhost:3308/qmt_stocks_tech?charset=utf8', pool_size=100, max_overflow=20)
def err_call_back(err):
    """Error callback for Pool.apply_async: log the worker's exception.

    Args:
        err: the exception raised inside the worker process.

    Fix: the original called traceback.print_exc(), which runs in the
    PARENT process where no exception is active, so it printed
    "NoneType: None".  Print the traceback attached to the exception
    object itself instead.
    """
    logging.error(f'进程池出错~ error:{str(err)}')
    traceback.print_exception(type(err), err, err.__traceback__)
def tech_anal(stock, df_stock, fre, hlfx_pool, hlfx_pool_daily, err_list):
    """Compute technical indicators for one stock and persist the result.

    Worker-process entry point (submitted via pool.apply_async in ind()).
    Adds MACD / RSI / BIAS / WILR columns and the HLFX ('HL') signal column
    to the quote DataFrame, writes the enriched table into the
    qmt_stocks_tech database, and updates the shared signal pools.

    Args:
        stock: table/stock identifier, e.g. '000001.SZ_1d'; stock[0:9] is
            the bare security code pushed into the shared lists.
        df_stock: DataFrame of raw quotes for this stock (must contain a
            'time' column used to merge the HL signal back in).
        fre: bar-frequency label; unused in the body, kept for the caller's
            signature.
        hlfx_pool: shared Manager list of codes with a persisting signal
            (t_signals == 1 appends, t_signals == 2 removes — see finally).
        hlfx_pool_daily: shared Manager list of codes that signalled today.
        err_list: shared Manager list collecting codes that failed a step.
    """
    import pandas as pd  # re-imported so each worker process has its own handle
    t_signals = 0  # HLFX signal from myind.get_hlfx: 0 = none (default on failure)
    global engine
    global engine_tech
    # global engine_tech2
    try:
        # con_engine = engine.connect()
        # con_engine_tech = engine_tech.connect()
        # con_engine_tech2 = engine_tech2.connect()
        try:
            # table_name = f'{stock}_{fre}'
            # (previously the table was read from the DB instead of passed in)
            # df = pd.read_sql_table(table_name, con=engine)
            table_name = stock
            df = df_stock
            # NOTE(review): return value is discarded — dropna() is not
            # in-place, so NaN rows are NOT actually removed here.
            df.dropna(axis=0, how='any')
        except BaseException as e:
            print(f"{stock}读取有问题")
            traceback.print_exc()
            err_list.append(stock[0:9])
        else:
            if len(df) != 0:
                # Compute the technical indicators (myind mutates df in place)
                print(f'{stock}开始计算技术指标')
                try:
                    myind.get_macd_data(df)
                    myind.get_ris(df)
                    myind.get_bias(df)
                    myind.get_wilr(df)
                    df = df.round(2)
                    # HLFX fractal signal; df_temp carries 'HL' keyed by 'time'
                    df_temp, t_signals = myind.get_hlfx(df)
                    df = pd.merge(df, df_temp, on='time', how='left')
                    df['HL'].fillna(value='-', inplace=True)
                    df = df.reset_index(drop=True)
                    # inf values would break the MySQL write — null them first
                    df = df.replace([np.inf, -np.inf], np.nan)
                    df = df.round(2)
                except BaseException as e:
                    print(f'{stock}计算有问题', e)
                else:
                    # Persist the enriched table, replacing any previous version
                    try:
                        # pass
                        df.to_sql('%s' % stock, con=engine_tech, index=False, if_exists='replace')
                        # df.to_sql('%s_1d' % stock, con=engine_tech2, index=False, if_exists='replace')
                    except BaseException as e:
                        print(f'{stock}存储有问题', e)
                        traceback.print_exc()
                        err_list.append(stock[0:9])
            else:
                err_list.append(stock[0:9])
                print(f'{stock}数据为空')
        finally:
            # Maintain the shared pools: 2 = signal gone -> remove,
            # 1 = new signal -> record bare code in both pools.
            # NOTE(review): membership is tested with the full `stock` name but
            # `stock[0:9]` is what gets appended, so these `in hlfx_pool` checks
            # can never match previously appended codes — confirm intent.
            if stock in hlfx_pool and t_signals == 2:
                hlfx_pool.remove(stock)
            elif stock not in hlfx_pool and t_signals == 1:
                hlfx_pool.append(stock[0:9])
                hlfx_pool_daily.append(stock[0:9])
            # con_engine.close()
            # con_engine_tech.close()
            print(f'{stock}计算完成!')
            # con_engine_tech2.close()
            # print(f"{stock}, {T_signals}, '\n', {df_temp.head(20)}")
            # print(f'{stock}计算完成!')
    except Exception as e:
        logging.error(f'子进程{os.getpid()}问题在这里~~ error:{str(e)}')
        traceback.print_exc()
    # Dispose this worker's copies of the pooled engines
    engine.dispose()
    engine_tech.dispose()
    # engine_tech2.dispose()
def query_database(table_name):
    """Read an entire MySQL table into a DataFrame.

    A fresh engine is created per call because this function runs in
    ProcessPoolExecutor worker processes, where pooled connections cannot
    be shared with the parent.

    Args:
        table_name: name of the table in qmt_stocks_whole.
    Returns:
        DataFrame with the full table contents.

    Fix: the original leaked the engine if read_sql_table raised; the
    try/finally guarantees dispose() in all cases.
    """
    engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8')
    try:
        return pd.read_sql_table(table_name, engine)
    finally:
        engine.dispose()
def get_stock_data():
    """Load every '1d' table from qmt_stocks_whole into memory.

    Lists matching tables via `SHOW TABLES LIKE '%1d%'`, then reads them in
    parallel worker processes (query_database).  Retries forever with a
    30 s backoff on any error.

    Returns:
        dict mapping table name -> DataFrame.

    Fixes vs. original: removed the no-op `table_list = table_list`,
    renamed the loop variable that shadowed the builtin `tuple`, and
    ensured the pymysql connection is closed even when the query fails.
    """
    while True:
        try:
            db = pymysql.connect(host='localhost',
                                 user='root',
                                 port=3307,
                                 password='r6kEwqWU9!v3',
                                 database='qmt_stocks_whole')
            try:
                cursor = db.cursor()
                cursor.execute("show tables like '%%%s%%' " % '1d')
                table_list = [row[0] for row in cursor.fetchall()]
                cursor.close()
            finally:
                db.close()
            print(f'开始数据库读取')
            with concurrent.futures.ProcessPoolExecutor(max_workers=24) as executor:
                # Parallel reads; zip keeps each DataFrame paired with its table name
                data_dict = {table_name: df for table_name, df in
                             tqdm(zip(table_list, executor.map(query_database, table_list)))}
            print(f'数据库读取完成')
            return data_dict
        except BaseException as e:
            # Broad on purpose: any failure (DB down, worker crash) retries
            print(f'数据库读取错误{e}')
            time.sleep(30)
def split_list(lst, num_parts):
    """Split *lst* into *num_parts* contiguous pieces of near-equal size.

    The first ``len(lst) % num_parts`` pieces receive one extra element,
    so all elements are distributed and order is preserved.
    """
    base, extra = divmod(len(lst), num_parts)
    pieces = []
    offset = 0
    for part in range(num_parts):
        step = base + (1 if part < extra else 0)
        pieces.append(lst[offset:offset + step])
        offset += step
    return pieces
def chunked_iterable(iterable, size):
    """Yield consecutive tuples of at most *size* items from *iterable*.

    The last chunk may be shorter; an exhausted iterable yields nothing.
    """
    it = iter(iterable)
    chunk = tuple(islice(it, size))
    while chunk:
        yield chunk
        chunk = tuple(islice(it, size))
# Multiprocess driver for the technical-indicator computation
def ind():
    """Orchestrate a full '1d' indicator run.

    Steps: read the stock list from hlfx_pool.stocks_list, load all daily
    tables up front (get_stock_data), fan the per-stock work out to a
    process pool in chunks of CHUNK_SIZE, then write the aggregated signal
    pools into the hlfx_pool database tables `1d` and `daily_1d`.
    """
    # Record the start time
    start_time = dt.now()
    fre = '1d'
    # NOTE(review): both branches are identical — this hostname check is a no-op.
    if socket.gethostname() == 'DESKTOP-PC':
        num_cpus = mp.cpu_count()
    else:
        num_cpus = mp.cpu_count()
    print(
        f"{socket.gethostname()}共有{num_cpus}个核心\n{start_time.strftime('%Y-%m-%d %H:%M:%S')}开始计算{fre}技术指标")
    # Retry until the latest stock list can be read from hlfx_pool.stocks_list
    while True:
        try:
            # Connect and fetch the stock list
            conn_engine_hlfx_pool = create_engine(
                'mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8')
            con_engine_hlfx_pool = conn_engine_hlfx_pool.connect()
            # stocks = xtdata.get_stock_list_in_sector('沪深A股')
            # The last row of stocks_list.securities is a comma-joined code list
            stocks = pd.read_sql_query(
                text("select securities from %s" % 'stocks_list'), con=con_engine_hlfx_pool).iloc[-1, 0].split(",")
            con_engine_hlfx_pool.close()
            conn_engine_hlfx_pool.dispose()
        except BaseException as e:
            print(f'股票列表读取错误{e}')
            continue
        else:
            print(f'股票列表长度为{len(stocks)}')
            break
    # Shared lists visible to all worker processes
    err_list, hlfx_pool, hlfx_pool_daily = mp.Manager().list(), mp.Manager().list(), mp.Manager().list()
    # Run tech_anal across processes; keep the AsyncResult handles
    async_results = []
    # m = 0
    # with concurrent.futures.ProcessPoolExecutor(max_workers=num_cpus) as executor:
    #     for stock in tqdm(stocks):
    #         executor.submit(tech_anal, stock, fre, hlfx_pool, hlfx_pool_daily, err_list)
    #         m += 1
    # print(m)
    # Load all per-stock DataFrames up front
    stock_data_dict = get_stock_data()
    # Number of tasks submitted per pool round (adjust as needed)
    CHUNK_SIZE = 200
    timeout = 120  # seconds to wait for each AsyncResult
    # NOTE(review): max_retries is printed but never enforced — the retry
    # loop below has no upper bound.
    max_retries = 3
    for chunk in chunked_iterable(stock_data_dict.items(), CHUNK_SIZE):
        retries = 0
        while True:
            print(f'chunk:{chunk[0][0]}-{chunk[-1][0]}')
            # Cap workers at the smaller of chunk size and available cores
            with mp.Pool(processes=min(CHUNK_SIZE, len(chunk), num_cpus)) as pool:
                for stock, df_stock in chunk:
                    print('**************', stock)
                    async_result = pool.apply_async(func=tech_anal, args=(stock, df_stock, fre, hlfx_pool, hlfx_pool_daily,
                                                                          err_list), error_callback=err_call_back)
                    async_results.append(async_result)
                try:
                    # NOTE(review): async_results accumulates across chunks AND
                    # retries, so this waits on every result gathered so far,
                    # and a retried chunk gets submitted (and counted) twice.
                    for async_result in async_results:
                        result = async_result.get(timeout=timeout)
                except mp.TimeoutError:
                    retries += 1
                    print(f"Timeout occurred in pool. Retry {retries}/{max_retries}...")
                    continue
                except FunctionTimedOut:
                    retries += 1
                    print(f"Timeout occurred in worker. Retry {retries}/{max_retries}...")
                    continue
                except Exception as e:
                    print(f"Error occurred: {e}")
                    break
                else:
                    pool.close()
                    pool.join()
                    break
    # with mp.Pool(processes=1) as pool:
    #     for stock, df_stock in tqdm(stock_data_dict.items()):
    #         # print(stock, df_stock.shape)
    #         async_result = pool.apply_async(tech_anal, args=(stock, df_stock, fre, hlfx_pool, hlfx_pool_daily, err_list),
    #                                         error_callback=err_call_back)
    #         async_results.append(async_result)
    #     pool.close()
    #     pool.join()
    # Count how many tasks returned None (tech_anal has no return statement,
    # so None here means "completed without an unhandled crash")
    none_count = 0
    for i, result_async in enumerate(async_results):
        result = result_async.get()  # blocks until the task result is available
        # print(f"The result of task {i} is: {result}")
        if result is None:
            none_count += 1
    print(
        f"共计算{none_count}/{len(async_results)},\n当日信号:{len(hlfx_pool_daily)},\n持续检测为:{len(hlfx_pool)}, \n错误列表:{err_list}")
    # Deduplicate the pools and serialise them as comma-joined strings
    results_list = ','.join(set(hlfx_pool))
    results_list_daily = ','.join(set(hlfx_pool_daily))
    # Open the result database connection
    db_pool = pymysql.connect(host='localhost',
                              user='root',
                              port=3307,
                              password='r6kEwqWU9!v3',
                              database='hlfx_pool')
    # db_pool2 = pymysql.connect(host='localhost',
    #                            user='root',
    #                            port=3308,
    #                            password='r6kEwqWU9!v3',
    #                            database='hlfx_pool')
    # Insert the pools into the hlfx_pool DB
    cursor = db_pool.cursor()
    # cursor2 = db_pool2.cursor()
    # NOTE(review): SQL built by string interpolation; values are internally
    # generated here, but parameterized queries would be safer.
    sql = "INSERT INTO %s (date,value) VALUES('%s','%s')" % (fre, dt.now().strftime('%Y-%m-%d %H:%M:%S'), results_list)
    sql2 = "INSERT INTO daily_%s (date,value) VALUES('%s','%s')" % (fre, dt.now().strftime('%Y-%m-%d %H:%M:%S'),
                                                                    results_list_daily)
    try:
        cursor.execute(sql)
        cursor.execute(sql2)
        # cursor2.execute(sql)
        # cursor2.execute(sql2)
        db_pool.commit()
        # db_pool2.commit()
    except Exception as e:
        print(f'1d存入有问题', e)
        # db_pool.rollback()
    finally:
        print(f"results_list_daily:{results_list_daily}")
        cursor.close()
        db_pool.close()
        # cursor2.close()
        # db_pool2.close()
    # Record the end time and report total runtime
    end_time = dt.now()
    print(f"运行时间:{end_time - start_time}")
if __name__ == '__main__':
    # Route multiprocessing's own log records to stderr at DEBUG level
    logger = mp.log_to_stderr()
    logger.setLevel(logging.DEBUG)
    freeze_support()
    # Pin this process to CPU cores 0-22 (range(23) yields 23 cores —
    # the original comment claiming "0-17" was wrong)
    cpu_list = list(range(23))
    pus = psutil.Process()
    pus.cpu_affinity(cpu_list)
    ind()
|