Jelajahi Sumber

v0.9999999版

Daniel 3 tahun lalu
induk
melakukan
f50bf4cf80
2 mengubah file dengan 33 tambahan dan 18 penghapusan
  1. 13 10
      hlfx.py
  2. 20 8
      qbh.py

+ 13 - 10
hlfx.py

@@ -2,8 +2,9 @@ import threading
 import pymysql
 import pandas as pd
 from sqlalchemy import create_engine
+from datetime import datetime as dt
 
-
+starttime = dt.now()
 # 数据库引擎
 # engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qbh_hlfx?charset=utf8')
 
@@ -32,9 +33,7 @@ def hlfx(table_list, engine, tosql):
     for table in table_list:
         # stk.fxdf = pd.DataFrame(columns=('date', 'open', 'close', 'high', 'low', 'volume', 'money', 'HL'))
         stk.df_day = pd.read_sql_query('select date,open,close,high,low,volume,money,HL from %s' % table, engine)
-        stk.df_day.to_sql(name='%s' % table, con=tosql, index=True, if_exists='replace')
-        with tosql.connect() as con_backup:
-            con_backup.execute('ALTER TABLE %s ADD PRIMARY KEY (`date`);' % table)
+
         for i in stk.df_day.index:
             m = i - 1
             if i <= 3:
@@ -83,32 +82,36 @@ def hlfx(table_list, engine, tosql):
                     m = m-1
             else:
                 stk.df_day.loc[i, 'HL'] = '-'
-        stk.df_day.to_sql('%s' % table, con=engine, index=True, if_exists='replace', chunksize=20000)
-        print(table, '\n', stk.df_day)
         stk.df_day.to_csv('/Users/daniel/Library/CloudStorage/OneDrive-个人/个人/python_stocks/20220212hlfx2/hlfx%s.csv' % table)
-        stk.df_day.to_sql(name='%s' % table, con=tosql, index=True, if_exists='replace')
+        stk.df_day.to_sql('%s' % table, con=tosql, index=True, if_exists='replace')
         with tosql.connect() as con_backup:
             con_backup.execute('ALTER TABLE %s ADD PRIMARY KEY (`date`);' % table)
+        print(table, '\n', '**********************************')
 
 # table_list = ['stk002237_1d','stk000004_1d']
 # engine = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qbh_hlfx_backup?charset=utf8')
 # tosql = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/bb22?charset=utf8')
 # hlfx(table_list, engine, tosql)
 
-step = 50
+step = 100
 thread_list = []
 engine = []
 tosql = []
+times_engine = 0
 for i in range(0, len(table_list), step):
-    engine.append(create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qbh_hlfx_backup?charset=utf8'))
+    engine.append(create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qbh_hlfx?charset=utf8'))
     tosql.append(create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_backup?charset=utf8'))
-    thread = threading.Thread(target=hlfx, args=(table_list[i:i + step], engine[i], tosql[i]))
+    thread = threading.Thread(target=hlfx, args=(table_list[i:i + step], engine[times_engine], tosql[times_engine]))
+    times_engine = times_engine + 1
     thread.start()
     thread_list.append(thread)
 
 for thread in thread_list:
     thread.join()
 
+endtime = dt.now()
+print('总时长:', (endtime - starttime).seconds)
+
 
 
 

+ 20 - 8
qbh.py

@@ -4,8 +4,9 @@ import pandas as pd
 import pymysql
 from sqlalchemy import create_engine
 import threading
+from datetime import datetime as dt
 
-
+starttime = dt.now()
 
 # 连接数据库
 # db_stk_sql = pymysql.connect(host='localhost',
@@ -59,16 +60,16 @@ def qbh(stocks, engine, engine_backup):
                     # 右高,上升
                     thd.new_df.iloc[-1, 3] = max(thd.new_df.iloc[-1, 3], thd.df_day.loc[i, 'high'])
                     thd.new_df.iloc[-1, 4] = max(thd.new_df.iloc[-1, 4], thd.df_day.loc[i, 'low'])
-        thd.new_df.to_sql('stk%s_%s' % (stock[:6], u), con=engine, index=True, if_exists='replace')
-        with engine.connect() as con:
-            con.execute('ALTER TABLE stk%s_%s ADD PRIMARY KEY (`date`);' % (stock[:6], u))
+        # thd.new_df.to_sql('stk%s_%s' % (stock[:6], u), con=engine, index=True, if_exists='replace')
+        # with engine.connect() as con:
+        #     con.execute('ALTER TABLE stk%s_%s ADD PRIMARY KEY (`date`);' % (stock[:6], u))
         thd.new_df.to_sql('stk%s_%s' % (stock[:6], u), con=engine_backup, index=True, if_exists='replace')
         with engine_backup.connect() as con_backup:
             con_backup.execute('ALTER TABLE stk%s_%s ADD PRIMARY KEY (`date`);' % (stock[:6], u))
         # thd.new_df.to_csv(
         #     '/Users/daniel/Library/CloudStorage/OneDrive-个人/个人/python_stocks/20220211qbh/qbh%s.csv' % stock[:6])
-        # print(stock)
-        # print("**************")
+        print(stock)
+        print("**************")
         #
         # # new_df.to_csv('new_df.csv')
         #
@@ -95,8 +96,19 @@ print("#########################################################################
 
 # 开始去包含
 # qbh(stocks)
-step = 100
+thread_list = []
+step = 1000
+times_engine = 0
 for m in range(0, len(stocks), step):
     engine.append(create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qbh_hlfx?charset=utf8', pool_recycle= 3600))
     engine_backup.append(create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qbh_hlfx_backup?charset=utf8', pool_recycle= 3600))
-    threading.Thread(target=qbh, args=(stocks[m:m + step], engine[m], engine_backup[m])).start()
+    thread = threading.Thread(target=qbh, args=(stocks[m:m + step], engine[times_engine], engine_backup[times_engine]))
+    times_engine =times_engine + 1
+    thread.start()
+    thread_list.append(thread)
+
+for thread in thread_list:
+    thread.join()
+
+endtime = dt.now()
+print((endtime-starttime).seconds)