from xtquant import xtdata
from xtquant import xtdatacenter as xtdc
from datetime import datetime as dt
import pandas as pd
import math
from sqlalchemy import create_engine, text
import multiprocessing as mp
from multiprocessing import freeze_support
import os
from apscheduler.schedulers.blocking import BlockingScheduler
import traceback
import psutil
import pymysql
from tqdm import tqdm
import logging

# Connect to the local xtquant data service.
# NOTE(review): the original used port=586211, which is not a valid TCP port
# (maximum is 65535); 58611 is assumed to be the intended value — confirm
# against the running QMT client configuration.
xtdata.connect('', port=58611, remember_if_success=True)

pd.set_option('display.max_columns', None)  # show all columns when printing frames

# QMT client data directory (alternative deployments kept for reference)
# path = 'C:\\qmt\\userdata_mini'
# path = '\\DANIEL-NUC\\qmt\\userdata_mini'
path = 'C:\\方正证券FQT交易客户端\\userdata_mini'
# path = 'C:\\迅投极速交易终端睿智融科版'

# Bar fields requested from xtdata.
field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']
# Shared counter across worker processes (currently not incremented anywhere).
count = mp.Value('i', 0)

# Engine for the per-stock K-line tables (one table per stock, see to_sql).
eng_w = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8', )


# eng_w2 = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3308/qmt_stocks_whole?charset=utf8')


def err_call_back(err):
    """Error callback for ``pool.apply_async``: report a worker failure.

    Parameters
    ----------
    err : BaseException
        The exception raised inside the worker process.
    """
    print(f'问题在这里~ error:{str(err)}')
    # The original called traceback.print_exc(), but no exception is active in
    # this (parent) process when the callback fires, so it printed
    # "NoneType: None". Print the traceback attached to the passed exception
    # instead (may be None for exceptions pickled across processes).
    traceback.print_exception(type(err), err, err.__traceback__)


def to_sql(stock):
    """Fetch back- and front-adjusted daily bars for one stock and write them
    into MySQL table ``<stock>_1d`` (replacing any existing table).

    Parameters
    ----------
    stock : str
        Stock code, e.g. ``'000001.SZ'``.

    Returns
    -------
    None
        The caller counts ``None`` results as successful loads.
    """
    cols = ['time', 'open', 'high', 'low', 'close', 'volume', 'amount']

    def _fetch(dividend_type, suffix):
        # xtdata returns {field: DataFrame(stock x time)}; transpose each
        # field's row for this stock into a time-indexed column.
        data = xtdata.get_market_data(field, [stock], '1d', count=-1,
                                      dividend_type=dividend_type)
        df = pd.concat([data[c].loc[stock].T for c in cols], axis=1)
        df.columns = ['time'] + [f'{c}_{suffix}' for c in cols[1:]]
        # timestamps arrive in milliseconds since the epoch
        df['time'] = df['time'].apply(lambda x: dt.fromtimestamp(x / 1000.0))
        df.reset_index(drop=True, inplace=True)
        return df

    df_back = _fetch('back', 'back')    # 后复权数据
    df_front = _fetch('front', 'front')  # 前复权数据

    # Align front-adjusted bars onto the back-adjusted timeline.
    df = pd.merge_asof(df_back, df_front, on='time').round(2)
    try:
        df.to_sql('%s_1d' % stock, con=eng_w, index=False, if_exists='replace',
                  chunksize=20000)
    except Exception as e:  # was BaseException: would also swallow KeyboardInterrupt
        print(stock, e)
    else:
        # only report success when the write actually succeeded
        # (the original printed this from `finally`, even on failure)
        print(f'{stock}入库完成!')
    finally:
        # drop pooled connections held by this worker process
        eng_w.dispose()


def on_progress(data):
    """Progress callback for ``download_history_data2``: echo the payload."""
    print(data)
    return None


def download_data():
    """Download daily history for all A-share stocks and load it into MySQL.

    Steps:
      1. Fetch the A-share stock list and archive it into
         ``hlfx_pool.stocks_list``.
      2. Download 1d history for every stock via xtdata.
      3. Fan the per-stock DB load (``to_sql``) out over a process pool and
         report how many loads completed.
    """
    stock_list = xtdata.get_stock_list_in_sector('沪深A股')
    results_list = ','.join(set(stock_list))
    print(f'今日个股列表为{len(stock_list)}')

    # Archive today's stock list. Parameterized query replaces the original
    # string-formatted INSERT (injection/quoting hazard on the joined codes).
    db_pool = pymysql.connect(host='localhost',
                              user='root',
                              port=3307,
                              password='r6kEwqWU9!v3',
                              database='hlfx_pool')
    try:
        with db_pool.cursor() as cursor_pool:
            cursor_pool.execute(
                "INSERT INTO stocks_list (date,securities) VALUES(%s,%s)",
                (dt.now().strftime('%Y-%m-%d %H:%M:%S'), results_list))
        db_pool.commit()
    except Exception as e:
        print(e)
        db_pool.rollback()
    finally:
        # the original leaked the connection and cursor
        db_pool.close()

    print(dt.now().strftime('%Y-%m-%d %H:%M:%S'), '开始下载!')
    xtdata.download_history_data2(stock_list=stock_list, period='1d',
                                  start_time='', end_time='', callback=on_progress)
    print(dt.now().strftime('%Y-%m-%d %H:%M:%S'), '下载完成,准备入库!')

    async_results = []
    pool = mp.Pool(processes=8)
    for stock in tqdm(stock_list, desc='入库进度'):
        async_results.append(
            pool.apply_async(func=to_sql, args=(stock,),
                             error_callback=err_call_back))
    print(f'记录循环{len(async_results)}次!')
    pool.close()
    pool.join()

    # to_sql returns None on normal completion; count those as successes.
    # .get() re-raises worker exceptions, so guard it — otherwise one failed
    # worker would abort the whole summary loop.
    none_count = 0
    for result_async in async_results:
        try:
            if result_async.get() is None:
                none_count += 1
        except Exception:
            pass  # already reported via err_call_back

    print(f"{dt.now().strftime('%Y-%m-%d %H:%M:%S')}\n今日数据{len(async_results)}下载完毕,入库{none_count}条!")


if __name__ == '__main__':
    # Route multiprocessing's internal logging to stderr for debugging.
    logger = mp.log_to_stderr()
    logger.setLevel(logging.DEBUG)
    freeze_support()
    # Pin this process to all available logical CPUs. The original hard-coded
    # list(range(24)), which raises on machines with fewer than 24 CPUs.
    pus = psutil.Process()
    pus.cpu_affinity(list(range(os.cpu_count() or 1)))

    download_data()

# scheduler = BlockingScheduler()
# scheduler.add_job(func=download_data, trigger='cron', day_of_week='0-4', hour='20', minute='05',
#                   timezone="Asia/Shanghai", max_instances=10)
# try:
#     scheduler.start()
# except (KeyboardInterrupt, SystemExit):
#     pass