from jqdatasdk import * import pandas as pd import pymysql from sqlalchemy import create_engine import threading from datetime import datetime as dt auth('18019403367', 'Qwer4321') starttime = dt.now() # 连接数据库 # db_stk_sql = pymysql.connect(host='localhost', # user='root', # port=3307, # password='r6kEwqWU9!v3', # database='stocks', # connect_timeout=600) # # # db_qbh = pymysql.connect(host='localhost', # user='root', # port=3307, # password='r6kEwqWU9!v3', # database='qbh', # charset='utf8') # # # cursor = db_qbh.cursor() # engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qbh_hlfx?charset=utf8') engine2 = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/stocks?charset=utf8') stocks = list(get_all_securities(['stock'], date='2022-02-01').index) # stocks =stocks[0:70] thd = threading.local() # def qbh(stocks, engine, engine_backup): fre = '30m' stk = locals() # 获取数据存入DataFrame for stock in stocks: stk['stk'+stock] = pd.read_sql_query('select date,open,close,high,low,volume,money from `stk%s_%s`' % (stock, fre), engine2) # print(stock, stk['stk'+stock[:6]]) print("###############################################################################################################" "###############################################################################################################" "###############################################################################################################" "###############################################################################################################" "###############################################################################################################" "###############################################################################################################" "###############################################################################################################") # engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qbh?charset=utf8') def qbh(stocks, engine, engine_backup): for stock in stocks: thd.new_df = pd.DataFrame(columns=('date', 'open', 'close', 'high', 'low', 'volume', 'money', 'HL')) # print(new_df.head()) thd.df_day = stk['stk' + stock] for i in thd.df_day.index: if i == 0 or i == 1: thd.new_df = pd.concat([thd.new_df, thd.df_day.iloc[[i]]], ignore_index=True) # 不包含 elif (thd.new_df.iloc[-1, 3] > thd.df_day.loc[i, 'high'] and thd.new_df.iloc[-1, 4] > thd.df_day.loc[i, 'low']) \ or (thd.new_df.iloc[-1, 3] < thd.df_day.loc[i, 'high'] and thd.new_df.iloc[-1, 4] < thd.df_day.loc[i, 'low']): thd.new_df = pd.concat([thd.new_df, thd.df_day.iloc[[i]]], ignore_index=True) # 包含 else: # (new_df.iloc[-1,3]>=df_day.loc[i,'high'] and new_df.iloc[-1,4]<= df_day.loc[i,'low']): # 左高,下降 if thd.new_df.iloc[-2, 3] > thd.new_df.iloc[-1, 3]: thd.new_df.iloc[-1, 3] = min(thd.new_df.iloc[-1, 3], thd.df_day.loc[i, 'high']) thd.new_df.iloc[-1, 4] = min(thd.new_df.iloc[-1, 4], thd.df_day.loc[i, 'low']) else: # 右高,上升 thd.new_df.iloc[-1, 3] = max(thd.new_df.iloc[-1, 3], thd.df_day.loc[i, 'high']) thd.new_df.iloc[-1, 4] = max(thd.new_df.iloc[-1, 4], thd.df_day.loc[i, 'low']) thd.new_df.to_sql('stk%s_%s' % (stock, fre), con=engine, index=True, if_exists='append') with engine.connect() as con: con.execute('ALTER TABLE `stk%s_%s` ADD PRIMARY KEY (`date`);' % (stock, fre)) # thd.new_df.to_sql('stk%s_%s' % (stock[:6], u), con=engine_backup, index=True, if_exists='replace') # with engine_backup.connect() as con_backup: # con_backup.execute('ALTER TABLE stk%s_%s ADD PRIMARY KEY (`date`);' % (stock[:6], u)) # thd.new_df.to_csv( # '/Users/daniel/Library/CloudStorage/OneDrive-个人/个人/python_stocks/20220211qbh/qbh%s.csv' % stock[:6]) print(stock) print("**************") # # # new_df.to_csv('new_df.csv') # # #return new_df engine = [] engine_backup = [] # # # # 开始去包含 # qbh(stocks) thread_list = [] step = 100 times_engine = 0 for m in range(0, len(stocks), step): engine.append(create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qbh?charset=utf8')) engine_backup.append(create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qbh_hlfx_backup?charset=utf8')) thread = threading.Thread(target=qbh, args=(stocks[m:m + step], engine[times_engine], engine_backup[times_engine])) times_engine =times_engine + 1 thread.start() thread_list.append(thread) for thread in thread_list: thread.join() # endtime = dt.now() print((endtime-starttime).seconds)