get_macd.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. # coding:utf-8
  2. from datetime import datetime as dt
  3. import os
  4. import pandas as pd
  5. import time
  6. from sqlalchemy import create_engine
  7. from jqdatasdk import *
  8. import pymysql
  9. import multiprocessing as mp
  10. import math
  11. import talib as ta
  12. # pd.set_option('display.max_columns', None) # 设置显示最大行
  13. def err_call_back(err):
  14. print(f'出错啦~ error:{str(err)}')
  15. def myself_kdj(df):
  16. low_list = df['low'].rolling(9, min_periods=9).min()
  17. low_list.fillna(value=df['low'].expanding().min(), inplace=True)
  18. high_list = df['high'].rolling(9, min_periods=9).max()
  19. high_list.fillna(value=df['high'].expanding().max(), inplace=True)
  20. rsv = (df['close'] - low_list) / (high_list - low_list) * 100
  21. df['k'] = pd.DataFrame(rsv).ewm(com=2).mean()
  22. df['d'] = df['k'].ewm(com=2).mean()
  23. df['j'] = 3 * df['k'] - 2 * df['d']
  24. return df
  25. # macd指标
  26. def get_macd_data(data, short=0, long1=0, mid=0):
  27. if short == 0:
  28. short = 12
  29. if long1 == 0:
  30. long1 = 26
  31. if mid == 0:
  32. mid = 9
  33. data['sema'] = pd.Series(data['close']).ewm(span=short).mean()
  34. data['lema'] = pd.Series(data['close']).ewm(span=long1).mean()
  35. data.fillna(0, inplace=True)
  36. data['dif'] = data['sema'] - data['lema']
  37. data['dea'] = pd.Series(data['dif']).ewm(span=mid).mean()
  38. data['macd'] = 2 * (data['dif'] - data['dea'])
  39. data.fillna(0, inplace=True)
  40. return data[['dif', 'dea', 'macd']]
  41. # rsi指标
  42. # 建议用talib库的RSI方法,亲测有用
  43. def get_ris(data):
  44. data["rsi_6"] = ta.RSI(data['close'], timeperiod=6)
  45. data["rsi_12"] = ta.RSI(data['close'], timeperiod=12)
  46. data["rsi_24"] = ta.RSI(data['close'], timeperiod=24)
  47. def get_bias(df):
  48. # 计算方法:
  49. # bias指标
  50. # N期BIAS=(当日收盘价-N期平均收盘价)/N期平均收盘价*100%
  51. df['bias_6'] = (df['close'] - df['close'].rolling(6, min_periods=1).mean()) / \
  52. df['close'].rolling(6, min_periods=1).mean() * 100
  53. df['bias_12'] = (df['close'] - df['close'].rolling(12, min_periods=1).mean()) / \
  54. df['close'].rolling(12, min_periods=1).mean() * 100
  55. df['bias_24'] = (df['close'] - df['close'].rolling(24, min_periods=1).mean()) / \
  56. df['close'].rolling(24, min_periods=1).mean() * 100
  57. df['bias_6'] = round(df['bias_6'], 2)
  58. df['bias_12'] = round(df['bias_12'], 2)
  59. df['bias_24'] = round(df['bias_24'], 2)
  60. def get_wilr(df):
  61. # 威廉指标
  62. # 建议用talib库的WILLR方法,亲测有用
  63. df['willr'] = ta.WILLR(df['high'], df['low'], df['close'], timeperiod=14)
  64. def tech_anal(datas):
  65. engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks?charset=utf8')
  66. engine_tech = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8')
  67. for stock in datas:
  68. stock = stock.replace('XSHG', 'SH').replace('XSHE', 'SZ')
  69. df = pd.read_sql_table('%s_1d' % stock, con=engine)
  70. get_macd_data(df)
  71. get_ris(df)
  72. get_bias(df)
  73. get_wilr(df)
  74. df = df.reset_index(drop=True)
  75. df.to_sql('%s_1d' % stock, con=engine_tech, index=False, if_exists='replace')
  76. # with engine.connect() as con:
  77. # con.execute("ALTER TABLE `%s_1d` ADD PRIMARY KEY (`time`);" % stock)
  78. print(stock, df)
  79. if __name__ == '__main__':
  80. sttime = dt.now()
  81. engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8')
  82. stock_pool = pd.read_sql_query('select securities from stocks_list', engine_hlfx_pool)
  83. stock_pool = stock_pool.iloc[-1, 0].split(",")
  84. stock_pool.sort()
  85. print(len(stock_pool))
  86. pool = mp.Pool(processes=mp.cpu_count())
  87. # step = math.ceil(len(stock_pool) / mp.cpu_count())
  88. step = 100
  89. for i in range(0, len(stock_pool), step):
  90. pool.apply_async(func=tech_anal, args=(stock_pool[i:i + step],), error_callback=err_call_back)
  91. pool.close()
  92. pool.join()
  93. edtime = dt.now()
  94. print(edtime - sttime)