在电商数据分析领域,商品评论数据蕴含着用户对产品的真实反馈,对商家优化产品、提升服务质量具有重要价值。本文将详细介绍如何接入淘宝 API,实现商品评论的实时采集,从环境搭建到代码实现进行全流程讲解。
1. 淘宝api概述
淘宝平台(TOP)为第三方开发者提供了一系列 RESTful API 接口,覆盖商品、交易、用户、营销等核心业务场景。接入淘宝 API 需要完成以下准备工作:
- 注册淘宝api账号并通过实名认证
- 创建应用并申请所需权限(如商品评论读取权限)
- 获取 ApiKey 和 ApiSecret 用于 API 调用
- 了解 API 接口文档和调用规范
2. 开发环境准备
技术栈选择:
- 编程语言:Python 3.8+
- HTTP 请求库:requests
- 数据处理:pandas, json
- 存储方案:MySQL
- 定时任务:APScheduler
依赖安装:
pip install requests pandas pymysql python-dotenv apscheduler
3. 认证与授权流程
淘宝 API 采用 OAuth2.0 授权码模式,需引导用户完成授权获取访问令牌:
import os
import requests
import json
import time
import hmac
import hashlib
from urllib.parse import urlencode
from dotenv import load_dotenv# 加载环境变量
load_dotenv()# 配置信息
APP_KEY = os.getenv("APP_KEY")
APP_SECRET = os.getenv("APP_SECRET")
REDIRECT_URI = os.getenv("REDIRECT_URI")
TOKEN_FILE = "taobao_token.json"def get_authorization_url():"""生成授权URL,引导用户授权"""params = {"client_id": APP_KEY,"redirect_uri": REDIRECT_URI,"response_type": "code","state": "init"}return f"https://oauth.taobao.com/authorize?{urlencode(params)}"def get_access_token(auth_code):"""通过授权码获取Access Token"""url = "https://oauth.taobao.com/token"payload = {"grant_type": "authorization_code","client_id": APP_KEY,"client_secret": APP_SECRET,"code": auth_code,"redirect_uri": REDIRECT_URI}response = requests.post(url, data=payload)token_data = response.json()# 保存Token信息到文件with open(TOKEN_FILE, "w") as f:json.dump(token_data, f)return token_datadef refresh_access_token(refresh_token):"""刷新Access Token"""url = "https://oauth.taobao.com/token"payload = {"grant_type": "refresh_token","client_id": APP_KEY,"client_secret": APP_SECRET,"refresh_token": refresh_token}response = requests.post(url, data=payload)token_data = response.json()# 更新Token文件with open(TOKEN_FILE, "w") as f:json.dump(token_data, f)return token_datadef get_current_token():"""获取当前有效的Access Token"""try:with open(TOKEN_FILE, "r") as f:token_data = json.load(f)# 检查Token是否过期if time.time() > token_data.get("expires_in", 0):return refresh_access_token(token_data["refresh_token"])return token_dataexcept (FileNotFoundError, json.JSONDecodeError):print("请先完成授权流程获取Access Token")return None
4. API 请求签名生成
淘宝 API 要求所有请求参数必须经过 MD5 签名:
def generate_sign(params, app_secret):"""生成API请求签名"""# 按参数名排序sorted_params = sorted(params.items(), key=lambda x: x[0])# 拼接参数名和值string_to_sign = app_secretfor key, value in sorted_params:string_to_sign += f"{key}{value}"string_to_sign += app_secret# MD5加密signature = hashlib.md5(string_to_sign.encode("utf-8")).hexdigest().upper()return signature
5. 商品评论 API 调用实现
def get_item_comments(item_id, page_no=1, page_size=20):"""获取商品评论列表"""token_data = get_current_token()if not token_data:return Noneaccess_token = token_data["access_token"]# API请求参数params = {"method": "taobao.taobaoke.items.comments.get", # 淘宝客商品评论获取接口"app_key": APP_KEY,"session": access_token,"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),"format": "json","v": "2.0","sign_method": "md5","fields": "comment_id,item_id,content,rate,created,useful","num_iid": item_id,"page_no": page_no,"page_size": page_size}# 生成签名params["sign"] = generate_sign(params, APP_SECRET)# 发送请求url = "https://eco.taobao.com/router/rest"response = requests.post(url, data=params)if response.status_code == 200:return response.json()else:print(f"请求失败: {response.status_code} - {response.text}")return None
6. 评论数据解析与存储
import pandas as pd
from datetime import datetimedef parse_comments(api_response):"""解析商品评论数据"""if not api_response:return []# 检查是否有错误if "error_response" in api_response:error = api_response["error_response"]print(f"API错误: {error.get('code')} - {error.get('msg')}")return []# 提取评论列表comments = api_response.get("taobaoke_items_comments_get_response", {}) \.get("comments", {}) \.get("n_tbk_item_comment", [])parsed_comments = []for comment in comments:parsed = {"comment_id": comment.get("comment_id"),"item_id": comment.get("item_id"),"content": comment.get("content"),"rate": int(comment.get("rate", 5)), # 评分,默认5分"created": comment.get("created"),"useful": int(comment.get("useful", 0)), # 有用数"crawl_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}parsed_comments.append(parsed)return parsed_commentsdef save_comments_to_mysql(comments):"""将评论数据存入MySQL"""import pymysqlfrom pymysql.cursors import DictCursorif not comments:return# 连接数据库conn = pymysql.connect(host=os.getenv("DB_HOST"),user=os.getenv("DB_USER"),password=os.getenv("DB_PASSWORD"),database=os.getenv("DB_NAME"),charset="utf8mb4",cursorclass=DictCursor)try:with conn.cursor() as cursor:# 插入评论数据insert_sql = """INSERT INTO taobao_comments (comment_id, item_id, content, rate, created, useful, crawl_time)VALUES (%s, %s, %s, %s, %s, %s, %s)ON DUPLICATE KEY UPDATE content=VALUES(content), rate=VALUES(rate), useful=VALUES(useful), crawl_time=VALUES(crawl_time)"""for comment in comments:cursor.execute(insert_sql, (comment["comment_id"],comment["item_id"],comment["content"],comment["rate"],comment["created"],comment["useful"],comment["crawl_time"]))# 提交事务conn.commit()print(f"成功存储 {len(comments)} 条评论")except Exception as e:print(f"存储失败: {str(e)}")conn.rollback()finally:conn.close()
7. 批量采集与分页处理
def batch_collect_comments(item_id, max_pages=10):"""批量采集商品评论(支持分页)"""all_comments = []for page in range(1, max_pages + 1):print(f"正在采集第 {page}/{max_pages} 页评论...")result = get_item_comments(item_id, page_no=page)if not result:print("获取评论失败,跳过当前页")continuecomments = parse_comments(result)if not comments:print("没有更多评论,结束采集")breakall_comments.extend(comments)# 检查是否有下一页if len(comments) < 20: # 每页默认20条,如果不足20条说明没有下一页print("已获取全部评论,结束采集")break# 控制采集频率,避免触发限流time.sleep(1)# 保存所有评论到数据库if all_comments:save_comments_to_mysql(all_comments)print(f"商品 {item_id} 评论采集完成,共获取 {len(all_comments)} 条评论")return all_comments
8. 定时任务配置
from apscheduler.schedulers.blocking import BlockingSchedulerdef scheduled_comment_collection():"""定时采集商品评论"""print(f"开始定时采集评论: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")# 从数据库获取需要监控的商品列表# 这里简化为固定商品ID列表items_to_monitor = ["687674347856", # 示例商品ID,替换为实际商品ID"687674347857","687674347858"]# 遍历商品列表,采集评论for item_id in items_to_monitor:print(f"开始采集商品 {item_id} 的评论...")batch_collect_comments(item_id, max_pages=5) # 每商品采集前5页评论time.sleep(5) # 商品间间隔,避免触发限流print(f"定时采集完成: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")if __name__ == "__main__":# 首次运行需要先获取授权# auth_url = get_authorization_url()# print(f"请访问以下URL授权: {auth_url}")# # 用户授权后,获取code并调用:# token_data = get_access_token("YOUR_AUTH_CODE_HERE")# 设置定时任务(每天凌晨2点执行)scheduler = BlockingScheduler()scheduler.add_job(scheduled_comment_collection, 'cron', hour='2')print("定时任务已启动,按Ctrl+C退出")scheduler.start()
9. 数据库设计建议
商品评论表(taobao_comments):
CREATE TABLE `taobao_comments` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`comment_id` varchar(50) NOT NULL COMMENT '评论ID',`item_id` varchar(50) NOT NULL COMMENT '商品ID',`content` text COMMENT '评论内容',`rate` tinyint(4) DEFAULT '5' COMMENT '评分(1-5)',`created` datetime DEFAULT NULL COMMENT '评论时间',`useful` int(11) DEFAULT '0' COMMENT '有用数',`crawl_time` datetime DEFAULT NULL COMMENT '采集时间',PRIMARY KEY (`id`),UNIQUE KEY `idx_comment_id` (`comment_id`),KEY `idx_item_id` (`item_id`),KEY `idx_created` (`created`),KEY `idx_crawl_time` (`crawl_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='淘宝商品评论表';
10. 常见问题与解决方案
-
Q:如何处理 API 限流?
- A:控制请求频率,添加适当的延时,实现指数退避重试机制。
-
Q:授权过期如何处理?
- A:使用 Refresh Token 刷新 Access Token,若 Refresh Token 也过期,则需重新引导用户授权。
-
Q:如何处理评论中的特殊字符?
- A:使用 UTF-8 编码存储数据,确保数据库表和字段使用 utf8mb4 字符集。
-
Q:如何提高采集效率?
- A:使用异步请求、多线程或分布式架构,并行采集多个商品的评论。
总结
通过本文的实战指南,你可以完整实现淘宝商品评论的实时采集系统,包括:
- OAuth2.0 认证授权流程
- API 请求签名生成
- 商品评论数据解析
- 分页采集与批量存储
- 定时任务配置
实际应用中,建议根据业务需求调整采集频率和数据字段,同时注意遵守淘宝 API 使用规范,避免因过度请求导致 IP 封禁。通过合理的架构设计,可以构建出稳定、高效的电商评论数据采集系统。