一、程序功能
程序基于 baostock 接口实现 A 股股票数据的获取与存储,主要功能包括股票列表更新、数据下载与处理。程序通过三个核心函数协同工作:
-
update_stk_list(date=None)
:获取指定日期的 A 股股票列表,默认使用当日。自动处理周末日期,尝试最多 3 次 API 调用。筛选出沪市(sh.600000 及以上)和深市(sz.399000 及以下)的股票,保存完整列表和筛选后的列表到 CSV 文件,返回股票代码列表。 -
load_stk_list()
:从本地 CSV 文件加载股票列表,用于非首次运行时避免重复获取。 -
download_data(stk_list, fromdate, todate, datas, frequency, adjustflag)
:下载指定股票的历史 K 线数据,支持日线、周线、月线及分钟线(5/15/30/60 分钟)。可设置复权类型(默认前复权),自动跳过停牌数据,将时间戳转换为可读格式,按股票代码分文件存储。
二、主程序流程
- 初始化数据目录
- 登录 baostock 接口
- 更新或加载股票列表
- 下载日线数据(其他周期数据下载被注释)
- 登出接口
关键参数说明:
DROP_SUSPENSION
:是否删除停牌数据frequency
:数据周期(d/w/m/5/15/30/60)adjustflag
:复权类型(1: 后复权,2: 前复权,3: 不复权)datas
:需要获取的字段列表
程序具备完善的异常处理机制,包括 API 调用失败重试、数据为空处理、网络异常捕获等,确保数据获取的稳定性。
三、程序源代码
# -*- coding: utf-8 -*-
"""
Created on Wed Jun 4 11:26:27 2025@author: Administrator
"""# 导入必要的库
import baostock as bs # 导入baostock接口库用于获取股票数据
import pandas as pd # 导入pandas用于数据处理
import datetime # 导入datetime处理日期
import time # 导入time用于延时处理'''
日线指标参数包括:'date,code,open,high,low,close,preclose,volume,amount,adjustflag,turn,tradestatus,pctChg,peTTM,pbMRQ,psTTM,pcfNcfTTM,isST'
周、月线指标参数包括:'date,code,open,high,low,close,volume,amount,adjustflag,turn,pctChg'
分钟指标参数包括:'date,time,code,open,high,low,close,volume,amount,adjustflag'adjustflag:复权类型,默认不复权:3;1:后复权;2:前复权。已支持分钟线、日线、周线、月线前后复权。
'''# 是否删除停盘数据
DROP_SUSPENSION = Truedef update_stk_list(date = None):# 如果没有提供日期,使用最近的交易日if date is None:# 获取最近的交易日today = datetime.date.today()# 尝试使用今天作为日期date = today# 如果今天是周末,往前找最近的工作日if today.weekday() >= 5: # 5是周六,6是周日date = today - datetime.timedelta(days=(today.weekday() - 4))print(f"尝试获取 {date} 的股票列表")# 获取指定日期的指数、股票数据max_retries = 3for attempt in range(max_retries):try:# 调用baostock接口获取股票列表stock_rs = bs.query_all_stock(date.strftime('%Y-%m-%d'))stock_df = stock_rs.get_data()# 检查API返回状态if stock_rs.error_code != '0':print(f"API调用错误: {stock_rs.error_code} - {stock_rs.error_msg}")if attempt < max_retries - 1:print(f"尝试重试 ({attempt+1}/{max_retries})...")time.sleep(2) # 等待2秒后重试continueelse:print("达到最大重试次数,退出")return []# 检查是否成功获取数据if stock_df.empty:print(f"未获取到 {date} 的股票列表数据")# 如果今天没数据,尝试昨天if date == today:print("尝试获取昨天的数据...")return update_stk_list(today - datetime.timedelta(days=1))else:print("请检查日期或网络连接")return []# 打印列名以确认实际列名print(f"获取到的股票列表列名: {list(stock_df.columns)}")# 保存完整股票列表stock_df.to_csv('./stk_data/all_list.csv', encoding = 'gbk', index = False)# 筛选股票代码范围,使用实际列名try:# 尝试使用stock_df['code']而不是stock_df.codestock_df.drop(stock_df[stock_df['code'] < 'sh.600000'].index, inplace = True)stock_df.drop(stock_df[stock_df['code'] > 'sz.399000'].index, inplace = True)stock_df = stock_df[['code']] # 确保只保留code列stock_df.to_csv('./stk_data/stk_list.csv', encoding = 'gbk', index = False)print(f"成功获取 {len(stock_df)} 支股票")return stock_df['code'].tolist()except KeyError as e:print(f"列名错误: {e}")print("请检查baostock返回的数据结构和列名")return []break # 如果成功,跳出重试循环except Exception as e:print(f"获取股票列表时发生异常: {e}")if attempt < max_retries - 1:print(f"尝试重试 ({attempt+1}/{max_retries})...")time.sleep(2) # 等待2秒后重试else:print("达到最大重试次数,退出")return []return []def load_stk_list():try:# 从CSV文件加载股票列表df = pd.read_csv('./stk_data/stk_list.csv')return df['code'].tolist()except FileNotFoundError:print("股票列表文件不存在,请先运行update_stk_list函数")return []def convert_time(t):# 将时间戳转换为可读格式H = t[8:10]M = t[10:12]S = t[12:14]return H + ':' + M + ':' + Sdef download_data(stk_list = [], fromdate = '1990-12-19', todate = datetime.date.today(), datas = 'date,open,high,low,close,volume,amount,turn,pctChg', frequency = 'd', adjustflag = '2'):# 确保stk_list不为空if not stk_list:print("股票列表为空,无法下载数据")return# 创建目录(如果不存在)import osos.makedirs(f'./stk_data/{frequency}', exist_ok=True)# 统计成功和失败的股票数量success_count = 0fail_count = 0for code in stk_list:print(f"Downloading ({success_count+fail_count+1}/{len(stk_list)}): {code}")try:# 调用baostock接口获取K线数据k_rs = bs.query_history_k_data_plus(code, datas, start_date = fromdate, end_date = todate.strftime('%Y-%m-%d'),frequency = frequency, adjustflag = adjustflag)datapath = f'./stk_data/{frequency}/{code}.csv'out_df = k_rs.get_data()# 检查API返回状态if k_rs.error_code != '0':print(f"API调用错误: {k_rs.error_code} - {k_rs.error_msg}")fail_count += 1continue# 检查是否成功获取数据if out_df.empty:print(f"未获取到{code}的数据")fail_count += 1continue# 如果需要,删除停牌数据(成交量为0)if DROP_SUSPENSION and 'volume' in list(out_df):out_df.drop(out_df[out_df.volume == '0'].index, inplace = True)# 做time转换if frequency in ['5', '15', '30', '60'] and 'time' in list(out_df):out_df['time'] = out_df['time'].apply(convert_time)# 保存数据到CSV文件out_df.to_csv(datapath, encoding = 'gbk', index = False)success_count += 1except Exception as e:print(f"下载{code}数据时出错: {e}")fail_count += 1print(f"数据下载完成: 成功 {success_count}/{len(stk_list)}, 失败 {fail_count}/{len(stk_list)}")if __name__ == '__main__':# 创建数据目录(如果不存在)import osos.makedirs('./stk_data', exist_ok=True)os.makedirs('./stk_data/d', exist_ok=True)# os.makedirs('./stk_data/w', exist_ok=True)# os.makedirs('./stk_data/m', exist_ok=True)# os.makedirs('./stk_data/5', exist_ok=True)# os.makedirs('./stk_data/15', exist_ok=True)# os.makedirs('./stk_data/30', exist_ok=True)# os.makedirs('./stk_data/60', exist_ok=True)# 登录baostock接口lg = bs.login()if lg.error_code != '0':print(f"登录失败: {lg.error_code} - {lg.error_msg}")exit(1)print("登录成功")try:# 首次运行#stk_list = update_stk_list()# 非首次运行stk_list = load_stk_list()if stk_list:# 下载日线download_data(stk_list)# # 下载周线# download_data(stk_list, frequency = 'w')# # 下载月线# download_data(stk_list, frequency = 'm')# # 下载5分钟线# download_data(stk_list, fromdate = '2020-6-1', frequency = '5', datas = 'date,time,open,high,low,close,volume,amount,adjustflag')# # 下载15分钟线# download_data(stk_list, fromdate = '2020-6-1', frequency = '15', datas = 'date,time,open,high,low,close,volume,amount,adjustflag')# # 下载30分钟线# download_data(stk_list, fromdate = '2020-6-1', frequency = '30', datas = 'date,time,open,high,low,close,volume,amount,adjustflag')# # 下载60分钟线# download_data(stk_list, fromdate = '2020-6-1', frequency = '60', datas = 'date,time,open,high,low,close,volume,amount,adjustflag')else:print("没有获取到股票列表,无法继续下载数据")finally:# 登出baostock接口bs.logout()print("登出成功")