K线连续涨跌统计与分析工具
1. 概述
本工具是一个用于分析金融时间序列数据(特别是K线数据)的Python脚本,主要功能是统计连续n根同方向K线后,第n+1根K线的涨跌情况。该工具不仅提供统计分析功能,还支持图形化标记以验证结果,帮助交易者和量化分析师识别市场中的特定模式。
2. 功能需求
- 统计连续n根阳线或阴线后,第n+1根K线的涨跌情况
- 支持自定义n值(连续K线数量)和m值(统计次数)
- 提供图形化界面标记连续模式出现的位置
- 输出详细的统计报告
- 支持多种数据源输入(CSV、数据库、在线API等)
3. 实现代码
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from matplotlib.patches import Rectangle
from typing import List, Dict, Tuple, Optional
from enum import Enum
import argparse
import os
import sys
from datetime import datetimeclass CandleDirection(Enum):"""K线方向枚举"""BULLISH = 1 # 阳线BEARISH = -1 # 阴线NEUTRAL = 0 # 平盘class CandlePatternAnalyzer:"""K线模式分析器"""def __init__(self, data: pd.DataFrame, open_col: str = 'open',high_col: str = 'high',low_col: str = 'low',close_col: str = 'close',date_col: str = 'date'):"""初始化分析器参数:data: 包含K线数据的DataFrameopen_col: 开盘价列名high_col: 最高价列名low_col: 最低价列名close_col: 收盘价列名date_col: 日期列名"""self.data = data.copy()self.open_col = open_colself.high_col = high_colself.low_col = low_colself.close_col = close_colself.date_col = date_col# 预处理数据self._preprocess_data()def _preprocess_data(self):"""预处理数据,计算K线方向"""# 确保日期是datetime类型并设置为索引if not pd.api.types.is_datetime64_any_dtype(self.data[self.date_col]):self.data[self.date_col] = pd.to_datetime(self.data[self.date_col])self.data.set_index(self.date_col, inplace=True)# 计算K线方向self.data['direction'] = np.where(self.data[self.close_col] > self.data[self.open_col],CandleDirection.BULLISH.value,np.where(self.data[self.close_col] < self.data[self.open_col],CandleDirection.BEARISH.value,CandleDirection.NEUTRAL.value))# 计算涨跌幅 (百分比)self.data['pct_change'] = self.data[self.close_col].pct_change() * 100def _find_consecutive_directions(self, n: int) -> List[Tuple[int, int, CandleDirection]]:"""查找连续n根同方向K线的起始和结束位置参数:n: 连续K线数量返回:列表,每个元素是元组(start_index, end_index, direction)"""directions = self.data['direction'].valuessequences = []current_dir = Nonestart_idx = 0count = 0for i, dir_val in enumerate(directions):if dir_val == current_dir and dir_val != CandleDirection.NEUTRAL.value:count += 1else:if count >= n:sequences.append((start_idx, i-1, CandleDirection(current_dir)))current_dir = dir_val if dir_val != CandleDirection.NEUTRAL.value else Nonestart_idx = icount = 1 if current_dir is not None else 0# 检查最后一段序列if count >= n:sequences.append((start_idx, len(directions)-1, CandleDirection(current_dir)))return sequencesdef analyze_consecutive_patterns(self, n: int, m: Optional[int] = None) -> Dict:"""分析连续n根同方向K线后第n+1根K线的表现参数:n: 连续K线数量m: 可选,只分析前m次出现的情况返回:包含分析结果的字典"""sequences = self._find_consecutive_directions(n)if m is not None:sequences = sequences[:m]results = {'total_bullish_sequences': 0,'total_bearish_sequences': 0,'bullish_sequences_next_up': 0,'bullish_sequences_next_down': 0,'bullish_sequences_next_neutral': 0,'bearish_sequences_next_up': 0,'bearish_sequences_next_down': 0,'bearish_sequences_next_neutral': 0,'sequences_details': [],'n': n,