get_macd.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. # coding:utf-8
  2. from datetime import datetime as dt
  3. import os
  4. import pandas as pd
  5. # from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
  6. # from xtquant.xttype import StockAccount
  7. # from xtquant import xtdata, xtconstant
  8. import time
  9. from sqlalchemy import create_engine
  10. from jqdatasdk import *
  11. import pymysql
  12. import multiprocessing as mp
  13. import math
  14. import talib as ta
  15. pd.set_option('display.max_columns', None) # 设置显示最大行
  16. def err_call_back(err):
  17. print(f'出错啦~ error:{str(err)}')
  18. def myself_kdj(df):
  19. low_list = df['low'].rolling(9, min_periods=9).min()
  20. low_list.fillna(value=df['low'].expanding().min(), inplace=True)
  21. high_list = df['high'].rolling(9, min_periods=9).max()
  22. high_list.fillna(value=df['high'].expanding().max(), inplace=True)
  23. rsv = (df['close'] - low_list) / (high_list - low_list) * 100
  24. df['k'] = pd.DataFrame(rsv).ewm(com=2).mean()
  25. df['d'] = df['k'].ewm(com=2).mean()
  26. df['j'] = 3 * df['k'] - 2 * df['d']
  27. return df
  28. # macd指标
  29. def get_macd_data(data, short=0, long1=0, mid=0):
  30. if short == 0:
  31. short = 12
  32. if long1 == 0:
  33. long1 = 26
  34. if mid == 0:
  35. mid = 9
  36. data['sema'] = pd.Series(data['close']).ewm(span=short).mean()
  37. data['lema'] = pd.Series(data['close']).ewm(span=long1).mean()
  38. data.fillna(0, inplace=True)
  39. data['dif'] = data['sema'] - data['lema']
  40. data['dea'] = pd.Series(data['dif']).ewm(span=mid).mean()
  41. data['macd'] = 2 * (data['dif'] - data['dea'])
  42. data.fillna(0, inplace=True)
  43. return data[['dif', 'dea', 'macd']]
  44. # rsi指标
  45. # 建议用talib库的RSI方法,亲测有用
  46. def get_ris(data):
  47. data["rsi_6"] = ta.RSI(data['close'], timeperiod=6)
  48. data["rsi_12"] = ta.RSI(data['close'], timeperiod=12)
  49. data["rsi_24"] = ta.RSI(data['close'], timeperiod=24)
  50. def get_bias(df):
  51. # 计算方法:
  52. # bias指标
  53. # N期BIAS=(当日收盘价-N期平均收盘价)/N期平均收盘价*100%
  54. df['bias_6'] = (df['close'] - df['close'].rolling(6, min_periods=1).mean()) / df['close'].rolling(6,
  55. min_periods=1).mean() * 100
  56. df['bias_12'] = (df['close'] - df['close'].rolling(12, min_periods=1).mean()) / df['close'].rolling(12,
  57. min_periods=1).mean() * 100
  58. df['bias_24'] = (df['close'] - df['close'].rolling(24, min_periods=1).mean()) / df['close'].rolling(24,
  59. min_periods=1).mean() * 100
  60. df['bias_6'] = round(df['bias_6'], 2)
  61. df['bias_12'] = round(df['bias_12'], 2)
  62. df['bias_24'] = round(df['bias_24'], 2)
  63. def get_wilr(df):
  64. # 威廉指标
  65. # 建议用talib库的WILLR方法,亲测有用
  66. df['willr'] = ta.WILLR(df['high'], df['low'], df['close'], timeperiod=14)
  67. def tech_anal(datas):
  68. engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks?charset=utf8')
  69. engine_tech = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8')
  70. for stock in datas:
  71. stock = stock.replace('XSHG', 'SH').replace('XSHE', 'SZ')
  72. df = pd.read_sql_table('%s_1d' % stock, con=engine)
  73. get_macd_data(df)
  74. get_ris(df)
  75. get_bias(df)
  76. get_wilr(df)
  77. df = df.reset_index(drop=True)
  78. df.to_sql('%s_1d' % stock, con=engine_tech, index=False, if_exists='append')
  79. # with engine.connect() as con:
  80. # con.execute("ALTER TABLE `%s_1d` ADD PRIMARY KEY (`time`);" % stock)
  81. print(stock, df)
  82. if __name__ == '__main__':
  83. sttime = dt.now()
  84. engine_hlfx_pool = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8')
  85. stock_pool = pd.read_sql_query('select securities from stocks_list', engine_hlfx_pool)
  86. stock_pool = stock_pool.iloc[-1, 0].split(",")
  87. stock_pool.sort()
  88. print(len(stock_pool))
  89. pool = mp.Pool(processes=mp.cpu_count())
  90. # step = math.ceil(len(stock_pool) / mp.cpu_count())
  91. step = 100
  92. for i in range(0, len(stock_pool), step):
  93. pool.apply_async(func=tech_anal, args=(stock_pool[i:i+step],), error_callback=err_call_back)
  94. pool.close()
  95. pool.join()
  96. edtime = dt.now()
  97. print(edtime-sttime)