230715_get_indicators.py 12 KB


  1. # coding:utf-8
  2. import time
  3. from datetime import datetime as dt
  4. import socket
  5. import pandas as pd
  6. import numpy as np
  7. from sqlalchemy import create_engine, text
  8. from jqdatasdk import *
  9. import pymysql
  10. import multiprocessing as mp
  11. from multiprocessing import freeze_support
  12. import concurrent.futures
  13. import math
  14. import talib as ta
  15. import os
  16. import traceback
  17. import random
  18. import logging
  19. from myindicator import myind
  20. import psutil
  21. from tqdm import tqdm
  22. from itertools import islice
  23. from func_timeout import func_set_timeout, FunctionTimedOut
  24. from apscheduler.schedulers.blocking import BlockingScheduler
  25. # 显示最大行与列
  26. pd.set_option('display.max_rows', None)
  27. pd.set_option('display.max_columns', None)
  28. # 设置日志
  29. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
  30. # 创建连接池
  31. engine = create_engine(
  32. 'mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8', pool_recycle=3600, pool_size=100,
  33. max_overflow=20)
  34. engine_tech = create_engine(
  35. 'mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8', pool_size=100, pool_recycle=3600,
  36. max_overflow=20)
  37. # engine_tech2 = create_engine(
  38. # 'mysql+pymysql://root:r6kEwqWU9!v3@localhost:3308/qmt_stocks_tech?charset=utf8', pool_size=100, max_overflow=20)
  39. def err_call_back(err):
  40. logging.error(f'进程池出错~ error:{str(err)}')
  41. traceback.print_exc()
  42. def tech_anal(stock, df_stock, fre, hlfx_pool, hlfx_pool_daily, err_list):
  43. import pandas as pd
  44. t_signals = 0
  45. global engine
  46. global engine_tech
  47. # global engine_tech2
  48. try:
  49. # con_engine = engine.connect()
  50. # con_engine_tech = engine_tech.connect()
  51. # con_engine_tech2 = engine_tech2.connect()
  52. try:
  53. # table_name = f'{stock}_{fre}'
  54. # 从engine中读取table_name表存入df
  55. # df = pd.read_sql_table(table_name, con=engine)
  56. table_name = stock
  57. df = df_stock
  58. df.dropna(axis=0, how='any')
  59. except BaseException as e:
  60. print(f"{stock}读取有问题")
  61. traceback.print_exc()
  62. err_list.append(stock[0:9])
  63. else:
  64. if len(df) != 0:
  65. # 计算技术指标
  66. print(f'{stock}开始计算技术指标')
  67. try:
  68. myind.get_macd_data(df)
  69. myind.get_ris(df)
  70. myind.get_bias(df)
  71. myind.get_wilr(df)
  72. df = df.round(2)
  73. df_temp, t_signals = myind.get_hlfx(df)
  74. df = pd.merge(df, df_temp, on='time', how='left')
  75. df['HL'].fillna(value='-', inplace=True)
  76. df = df.reset_index(drop=True)
  77. df = df.replace([np.inf, -np.inf], np.nan)
  78. df = df.round(2)
  79. except BaseException as e:
  80. print(f'{stock}计算有问题', e)
  81. else:
  82. # 存入数据库
  83. try:
  84. # pass
  85. df.to_sql('%s' % stock, con=engine_tech, index=False, if_exists='replace')
  86. # df.to_sql('%s_1d' % stock, con=engine_tech2, index=False, if_exists='replace')
  87. except BaseException as e:
  88. print(f'{stock}存储有问题', e)
  89. traceback.print_exc()
  90. err_list.append(stock[0:9])
  91. else:
  92. err_list.append(stock[0:9])
  93. print(f'{stock}数据为空')
  94. finally:
  95. if stock in hlfx_pool and t_signals == 2:
  96. hlfx_pool.remove(stock)
  97. elif stock not in hlfx_pool and t_signals == 1:
  98. hlfx_pool.append(stock[0:9])
  99. hlfx_pool_daily.append(stock[0:9])
  100. # con_engine.close()
  101. # con_engine_tech.close()
  102. print(f'{stock}计算完成!')
  103. # con_engine_tech2.close()
  104. # print(f"{stock}, {T_signals}, '\n', {df_temp.head(20)}")
  105. # print(f'{stock}计算完成!')
  106. except Exception as e:
  107. logging.error(f'子进程{os.getpid()}问题在这里~~ error:{str(e)}')
  108. traceback.print_exc()
  109. engine.dispose()
  110. engine_tech.dispose()
  111. # engine_tech2.dispose()
  112. def query_database(table_name):
  113. engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8')
  114. df = pd.read_sql_table(table_name, engine)
  115. engine.dispose()
  116. return df
  117. def get_stock_data():
  118. while True:
  119. try:
  120. db = pymysql.connect(host='localhost',
  121. user='root',
  122. port=3307,
  123. password='r6kEwqWU9!v3',
  124. database='qmt_stocks_whole')
  125. cursor = db.cursor()
  126. cursor.execute("show tables like '%%%s%%' " % '1d')
  127. table_list = [tuple[0] for tuple in cursor.fetchall()]
  128. table_list = table_list
  129. cursor.close()
  130. db.close()
  131. print(f'开始数据库读取')
  132. with concurrent.futures.ProcessPoolExecutor(max_workers=24) as executor:
  133. # 使用executor.map方法实现多进程并行查询数据库,得到每个表的数据,并存储在一个字典中
  134. data_dict = {table_name: df for table_name, df in
  135. tqdm(zip(table_list, executor.map(query_database, table_list)))}
  136. print(f'数据库读取完成')
  137. break
  138. except BaseException as e:
  139. print(f'数据库读取错误{e}')
  140. time.sleep(30)
  141. continue
  142. return data_dict
  143. # 分割列表
  144. def split_list(lst, num_parts):
  145. avg = len(lst) // num_parts
  146. rem = len(lst) % num_parts
  147. partitions = []
  148. start = 0
  149. for i in range(num_parts):
  150. end = start + avg + (1 if i < rem else 0)
  151. partitions.append(lst[start:end])
  152. start = end
  153. return partitions
  154. def chunked_iterable(iterable, size):
  155. """将可迭代对象分割为指定大小的块"""
  156. it = iter(iterable)
  157. while True:
  158. chunk = tuple(islice(it, size))
  159. if not chunk:
  160. return
  161. yield chunk
  162. # 多进程实现技术指标计算
  163. def ind():
  164. # 记录开始时间
  165. start_time = dt.now()
  166. fre = '1d'
  167. if socket.gethostname() == 'DESKTOP-PC':
  168. num_cpus = mp.cpu_count()
  169. else:
  170. num_cpus = mp.cpu_count()
  171. print(
  172. f"{socket.gethostname()}共有{num_cpus}个核心\n{start_time.strftime('%Y-%m-%d %H:%M:%S')}开始计算{fre}技术指标")
  173. while True:
  174. try:
  175. # 连接数据库 获取股票列表
  176. conn_engine_hlfx_pool = create_engine(
  177. 'mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8')
  178. con_engine_hlfx_pool = conn_engine_hlfx_pool.connect()
  179. # stocks = xtdata.get_stock_list_in_sector('沪深A股')
  180. stocks = pd.read_sql_query(
  181. text("select securities from %s" % 'stocks_list'), con=con_engine_hlfx_pool).iloc[-1, 0].split(",")
  182. con_engine_hlfx_pool.close()
  183. conn_engine_hlfx_pool.dispose()
  184. except BaseException as e:
  185. print(f'股票列表读取错误{e}')
  186. continue
  187. else:
  188. print(f'股票列表长度为{len(stocks)}')
  189. break
  190. err_list, hlfx_pool, hlfx_pool_daily = mp.Manager().list(), mp.Manager().list(), mp.Manager().list() # 定义共享列表
  191. # 多进程执行tech_anal方法
  192. # 保存AsyncResult对象的列表
  193. async_results = []
  194. # m = 0
  195. # with concurrent.futures.ProcessPoolExecutor(max_workers=num_cpus) as executor:
  196. # for stock in tqdm(stocks):
  197. # executor.submit(tech_anal, stock, fre, hlfx_pool, hlfx_pool_daily, err_list)
  198. # m += 1
  199. # print(m)
  200. # 获取数据
  201. stock_data_dict = get_stock_data()
  202. # 设置每一轮的任务数
  203. CHUNK_SIZE = 200 # 您可以根据需要进行调整
  204. timeout = 120
  205. max_retries =3
  206. for chunk in chunked_iterable(stock_data_dict.items(), CHUNK_SIZE):
  207. retries = 0
  208. while True:
  209. print(f'chunk:{chunk[0][0]}-{chunk[-1][0]}')
  210. with mp.Pool(processes=min(CHUNK_SIZE, len(chunk), num_cpus)) as pool: # 使用最小值确保不会超出任务数或超过24核心
  211. for stock, df_stock in chunk:
  212. print('**************', stock)
  213. async_result = pool.apply_async(func=tech_anal, args=(stock, df_stock, fre, hlfx_pool, hlfx_pool_daily,
  214. err_list), error_callback=err_call_back)
  215. async_results.append(async_result)
  216. try:
  217. for async_result in async_results:
  218. result = async_result.get(timeout=timeout)
  219. except mp.TimeoutError:
  220. retries += 1
  221. print(f"Timeout occurred in pool. Retry {retries}/{max_retries}...")
  222. continue
  223. except FunctionTimedOut:
  224. retries += 1
  225. print(f"Timeout occurred in worker. Retry {retries}/{max_retries}...")
  226. continue
  227. except Exception as e:
  228. print(f"Error occurred: {e}")
  229. break
  230. else:
  231. pool.close()
  232. pool.join()
  233. break
  234. # with mp.Pool(processes=1) as pool:
  235. # for stock, df_stock in tqdm(stock_data_dict.items()):
  236. # # print(stock, df_stock.shape)
  237. # async_result = pool.apply_async(tech_anal, args=(stock, df_stock, fre, hlfx_pool, hlfx_pool_daily, err_list),
  238. # error_callback=err_call_back)
  239. # async_results.append(async_result)
  240. # pool.close()
  241. # pool.join()
  242. # 统计返回为 None 的结果数量
  243. none_count = 0
  244. for i, result_async in enumerate(async_results):
  245. result = result_async.get() # 获取任务的结果
  246. # print(f"The result of task {i} is: {result}")
  247. if result is None:
  248. none_count += 1
  249. print(
  250. f"共计算{none_count}/{len(async_results)},\n当日信号:{len(hlfx_pool_daily)},\n持续检测为:{len(hlfx_pool)}, \n错误列表:{err_list}")
  251. # 将list转换为字符串
  252. results_list = ','.join(set(hlfx_pool))
  253. results_list_daily = ','.join(set(hlfx_pool_daily))
  254. # 建立数据库连接
  255. db_pool = pymysql.connect(host='localhost',
  256. user='root',
  257. port=3307,
  258. password='r6kEwqWU9!v3',
  259. database='hlfx_pool')
  260. # db_pool2 = pymysql.connect(host='localhost',
  261. # user='root',b
  262. # port=3308,
  263. # password='r6kEwqWU9!v3',
  264. # database='hlfx_pool')
  265. # 将list插入数据库
  266. cursor = db_pool.cursor()
  267. # cursor2 = db_pool2.cursor()
  268. sql = "INSERT INTO %s (date,value) VALUES('%s','%s')" % (fre, dt.now().strftime('%Y-%m-%d %H:%M:%S'), results_list)
  269. sql2 = "INSERT INTO daily_%s (date,value) VALUES('%s','%s')" % (fre, dt.now().strftime('%Y-%m-%d %H:%M:%S'),
  270. results_list_daily)
  271. try:
  272. cursor.execute(sql)
  273. cursor.execute(sql2)
  274. # cursor2.execute(sql)
  275. # cursor2.execute(sql2)
  276. db_pool.commit()
  277. # db_pool2.commit()
  278. except Exception as e:
  279. print(f'1d存入有问题', e)
  280. # db_pool.rollback()
  281. finally:
  282. print(f"results_list_daily:{results_list_daily}")
  283. cursor.close()
  284. db_pool.close()
  285. # cursor2.close()
  286. # db_pool2.close()
  287. # 记录结束时间
  288. end_time = dt.now()
  289. print(f"运行时间:{end_time - start_time}")
  290. if __name__ == '__main__':
  291. logger = mp.log_to_stderr()
  292. logger.setLevel(logging.DEBUG)
  293. freeze_support()
  294. # 创建一个0-17的列表,用于设置cpu亲和度
  295. cpu_list = list(range(23))
  296. pus = psutil.Process()
  297. pus.cpu_affinity(cpu_list)
  298. ind()