Mô hình định lượng

1. Tìm các điểm Pivot Point và Price Channel

Các điểm Pivot Point là mức giá quan trọng được tính toán dựa trên giá cao nhất (high), giá thấp nhất (low) và giá đóng cửa (close) của phiên giao dịch trước đó. Nó được sử dụng để xác định các mức hỗ trợ và kháng cự tiềm năng. Stock channel hay kênh giá là một phạm vi giá mà cổ phiếu dao động trong một khoảng thời gian nhất định. Kênh giá giúp xác định xu hướng hiện tại của thị trường. Có ba loại kênh giá phổ biến:

  1. Kênh giá tăng (Ascending Channel) – Giá dao động trong một xu hướng tăng với mức đáy và đỉnh cao dần.

  2. Kênh giá giảm (Descending Channel) – Giá dao động trong xu hướng giảm với mức đáy và đỉnh thấp dần.

  3. Kênh giá ngang (Sideways Channel) – Giá dao động trong một phạm vi hẹp, không có xu hướng rõ ràng.

Kênh giá được xác định bằng hai đường trendline:

  • Trendline trên kết nối các đỉnh

  • Trendline dưới kết nối các đáy

Khi giá chạm vào đường trendline trên hoặc dưới, nó có thể phản ứng theo hai cách:

  • Bật ngược lại nếu kênh giá tiếp tục giữ.

  • Phá vỡ (Breakout) nếu xu hướng thay đổi, có thể dẫn đến sự hình thành một kênh giá mới.

Stock channels thường được sử dụng để tìm điểm vào lệnh mua ở vùng hỗ trợ và bán ở vùng kháng cự.

Pivot Point (điểm màu tím) trong dữ liệu lích sử giá của VCB

Lấy Historical data từ thư viện Fiinquant

import pandas as pd
import numpy as np
import plotly.graph_objects as go

from scipy import stats
from FiinQuantX import FiinSession

username = 'REPLACE_WITH_YOUR_USER_NAME'
password = 'REPLACE_WITH_YOUR_PASS_WORD'

client = FiinSession(
    username=username,
    password=password
).login()

data = client.Fetch_Trading_Data(
    tickers='VCB',
    fields=['open', 'high', 'low', 'close', 'volume'],
    adjusted=True,
    period=1000,
    realtime=False,
    by='1d',
).get_data()

TÌm Pivot Point và plot giá cổ phiếu

# Function to identify pivot points in price data
# Returns: 0 for no pivot, 1 for pivot high, 2 for pivot low, 3 for both
# Parameters:
#   candle: index of the current candle being checked
#   window: number of candles to check on either side
def isPivot(candle, window):
    if candle-window < 0 or candle+window >= len(data):
        return 0
    
    pivotHigh = 1
    pivotLow = 2
    for i in range(candle-window, candle+window+1):
        if data.iloc[candle].low > data.iloc[i].low:
            pivotLow=0
        if data.iloc[candle].high < data.iloc[i].high:
            pivotHigh=0
    if (pivotHigh and pivotLow):
        return 3
    elif pivotHigh:
        return pivotHigh
    elif pivotLow:
        return pivotLow
    else:
        return 0

# Apply isPivot function to each row of data
window=10
data['isPivot'] = data.apply(lambda x: isPivot(x.name,window), axis=1)

# Function to determine plotting position for pivot points
# Returns slightly offset values for visual clarity on the chart
# Parameters:
#   x: row of dataframe containing price and pivot information
def pointpos(x):
    if x['isPivot']==2:
        return x['low']-1e-3  # Offset below pivot low
    elif x['isPivot']==1:
        return x['high']+1e-3 # Offset above pivot high
    else:
        return np.nan

# Apply pointpos function to create plotting positions
data['pointpos'] = data.apply(lambda row: pointpos(row), axis=1)


# Create candlestick chart
dfpl = data.copy()
fig = go.Figure(data=[go.Candlestick(x=dfpl.index,
                open=dfpl['open'],
                high=dfpl['high'],
                low=dfpl['low'],
                close=dfpl['close'])])

fig.add_scatter(x=dfpl.index, y=dfpl['pointpos'], mode="markers",
                marker=dict(size=5, color="MediumPurple"),
                name="pivot")
                
fig.update_layout(xaxis_rangeslider_visible=False)

fig.show()

Tìm Price Channel dựa trên Pivot Point

Một đoạn Sideway Channel
# Function to identify and calculate channel slopes using pivot points
# Parameters:
#   candle: Current candle position to analyze from
#   backcandles: Number of previous candles to look back
#   window: Window size for pivot point calculation
def collect_channel(candle, backcandles, window):
    # Get subset of data for analysis
    localdf = data[candle-backcandles-window:candle-window]
    localdf['isPivot'] = localdf.apply(lambda x: isPivot(x.name,window), axis=1)
    
    # Extract pivot highs and lows with their indices
    highs = localdf[localdf['isPivot']==1].high.values
    idxhighs = localdf[localdf['isPivot']==1].high.index
    lows = localdf[localdf['isPivot']==2].low.values
    idxlows = localdf[localdf['isPivot']==2].low.index
    
    # Calculate regression lines if enough pivot points exist
    if len(lows)>=2 and len(highs)>=2:
        # Calculate regression for lower channel (pivot lows)
        sl_lows, interc_lows, r_value_l, _, _ = stats.linregress(idxlows,lows)
        # Calculate regression for upper channel (pivot highs)
        sl_highs, interc_highs, r_value_h, _, _ = stats.linregress(idxhighs,highs)
    
        return(sl_lows, interc_lows, sl_highs, interc_highs, r_value_l**2, r_value_h**2)
    else:
        return(0,0,0,0,0,0)

# Set parameters for channel analysis
candle = 200          # Current candle position
backcandles = 100     # Number of candles to look back
window = 3            # Window size for pivot calculation

# Create candlestick chart
fig = go.Figure(data=[go.Candlestick(x=dfpl.index,
                open=dfpl['open'],
                high=dfpl['high'],
                low=dfpl['low'],
                close=dfpl['close'])])

# Add pivot points to chart
fig.add_scatter(x=dfpl.index, y=dfpl['pointpos'], mode="markers",
                marker=dict(size=5, color="MediumPurple"),
                name="pivot")

# Calculate channel slopes and angles
sl_lows, interc_lows, sl_highs, interc_highs, r_sq_l, r_sq_h = collect_channel(candle, backcandles, window)
print(sl_lows,sl_highs)
angle_lows = np.degrees(np.arctan(sl_lows))
angle_highs = np.degrees(np.arctan(sl_highs))

# Plot channel lines
x = np.array(range(candle-backcandles-window, candle+1))
fig.add_trace(go.Scatter(x=x, y=sl_lows*x + interc_lows, mode='lines', name='lower slope'))
fig.add_trace(go.Scatter(x=x, y=sl_highs*x + interc_highs, mode='lines', name='max slope'))
fig.update_layout(xaxis_rangeslider_visible=False)
fig.show()

2. Thống kê các giai đoạn tăng giá của VN-Index

Thông số có thể thay đổi:

Nếu đổi percent từ 5.0 lên 10.0 sẽ lọc những giai đoạn thị trường chỉnh nhiều hơn

df["zigzag"] = zigzag_percent(df["high"], df["low"], df["close"], percent=5.0)

Fulll Code

# Thống kê các giai đoạn giá tăng VNINDEX bằng ZigZag tự code (pivot chỉ tại điểm đảo chiều)
from FiinQuantX import FiinSession
import pandas as pd

# 1) Đăng nhập
client = FiinSession(username="YOUR_USER", password="YOUR_PASS").login()

# 2) Lấy dữ liệu VNINDEX (giá từ FiinQuant)
event = client.Fetch_Trading_Data(
    realtime=False,
    tickers=["VNINDEX"],
    fields=["high", "low", "close"],
    adjusted=True,
    by="1d",
    period=1500  # ~6 năm, tùy chỉnh
)
df = event.get_data().reset_index(drop=True)

# 3) ZigZag theo % đảo chiều: chỉ gắn pivot khi đảo chiều >= threshold
def zigzag_percent(high: pd.Series, low: pd.Series, close: pd.Series, percent: float = 5.0) -> pd.Series:
    thr = percent / 100.0
    n = len(close)
    pivots = [None] * n

    # Trạng thái
    trend = 0          # 0: chưa xác định, 1: up, -1: down
    start_idx = 0
    start_px = close.iloc[0]
    extreme_idx = 0    # chỉ số của đỉnh/đáy cực trị trong xu hướng hiện tại
    extreme_px = close.iloc[0]

    for i in range(1, n):
        px = close.iloc[i]

        if trend == 0:
            # Chờ giá đi đủ biên để xác nhận hướng ban đầu
            if px >= start_px * (1 + thr):
                trend = 1
                extreme_idx, extreme_px = i, px
            elif px <= start_px * (1 - thr):
                trend = -1
                extreme_idx, extreme_px = i, px
            else:
                # chưa đủ biên, tiếp tục theo dõi, cập nhật cực trị tạm
                if px > extreme_px:
                    extreme_idx, extreme_px = i, px
                if px < extreme_px:
                    extreme_idx, extreme_px = i, px
            continue

        if trend == 1:
            # Đang uptrend: cập nhật đỉnh cực trị
            if px > extreme_px:
                extreme_idx, extreme_px = i, px
            # Đảo chiều đủ biên → chốt pivot tại đỉnh cực trị trước đó
            elif px <= extreme_px * (1 - thr):
                pivots[extreme_idx] = extreme_px  # pivot tại reversal
                trend = -1
                extreme_idx, extreme_px = i, px   # bắt đầu theo dõi đáy mới
        else:  # trend == -1
            # Đang downtrend: cập nhật đáy cực trị
            if px < extreme_px:
                extreme_idx, extreme_px = i, px
            # Đảo chiều đủ biên → chốt pivot tại đáy cực trị trước đó
            elif px >= extreme_px * (1 + thr):
                pivots[extreme_idx] = extreme_px
                trend = 1
                extreme_idx, extreme_px = i, px

    # Chốt pivot cuối cùng tại cực trị hiện tại (tùy chọn)
    pivots[extreme_idx] = extreme_px
    return pd.Series(pivots, index=close.index, name="zigzag")

# 4) Áp dụng ZigZag và tạo bảng các đoạn TĂNG
df["zigzag"] = zigzag_percent(df["high"], df["low"], df["close"], percent=5.0)

turn_points = df.dropna(subset=["zigzag"]).reset_index()  # chỉ còn các pivot tại điểm đảo chiều
uptrends = []

for i in range(1, len(turn_points)):
    i0 = turn_points.loc[i-1, "index"]
    i1 = turn_points.loc[i, "index"]
    p0 = turn_points.loc[i-1, "zigzag"]
    p1 = turn_points.loc[i, "zigzag"]

    if p1 > p0:  # đoạn tăng là từ pivot đáy → pivot đỉnh tiếp theo
        uptrends.append({
            "StartIndex": i0,
            "EndIndex": i1,
            "StartDate": df.loc[i0, "timestamp"],
            "EndDate": df.loc[i1, "timestamp"],
            "LengthInBars": i1 - i0,
            "StartPrice": float(p0),
            "EndPrice": float(p1),
            "PctChange": (p1 - p0) / p0 * 100.0
        })

uptrend_df = pd.DataFrame(uptrends)
# Ví dụ lọc các đợt tăng đủ dài và mạnh
# uptrend_df = uptrend_df[(uptrend_df["LengthInBars"] >= 20) & (uptrend_df["PctChange"] >= 10)]

print(uptrend_df.sort_values("StartIndex").reset_index(drop=True))

3. Thống kê diễn biến các phiên trước nghỉ lễ

from FiinQuantX import FiinSession
import pandas as pd
from datetime import datetime, timedelta

client = FiinSession(username= 'USERNAME', password='PASSWORD').login()
# Hàm truy xuất 10 phiên trước 2/9 của mỗi năm
def get_last_sessions_before_sep2(year, n=10):
    to_date = f"{year}-09-01"
    from_date = (datetime.strptime(to_date, "%Y-%m-%d") - timedelta(days=30)).strftime("%Y-%m-%d")

    event = client.Fetch_Trading_Data(
        realtime=False,
        tickers=["VNINDEX"],
        fields=["close"],
        adjusted=True,
        by="1d",
        from_date=from_date,
        to_date=to_date
    )
    df = event.get_data().sort_values("timestamp").reset_index(drop=True)
    df = df.tail(n).copy()
    df["return"] = df["close"].pct_change() * 100  # Tính % thay đổi
    df["label"] = [f"T-{i}" for i in reversed(range(1, n + 1))]
    df["year"] = year
    return df[["year", "label", "timestamp", "close", "return"]]

# Gom dữ liệu 10 năm gần nhất
all_data = []
for y in range(datetime.today().year - 10, datetime.today().year):
    df = get_last_sessions_before_sep2(y)
    all_data.append(df)

# Kết quả
final_df = pd.concat(all_data).reset_index(drop=True)

# Pivot để tạo bảng
pivot_table = final_df.pivot(index="year", columns="label", values="close")
returns_table = final_df.pivot(index="year", columns="label", values="return")

print("==== Giá đóng cửa (Close) ====")
print(pivot_table.round(2))

print("\n==== Tỷ suất sinh lời hàng ngày (%) ====")
print(returns_table.round(2))

4. Thống kê số lượng mã cổ phiếu cắt lên và cắt xuống đường MA20

from FiinQuantX import FiinSession
import pandas as pd

# Bước 1: Đăng nhập FiinQuant

client = FiinSession(username='username', password='password').login()

# Bước 2: Lấy danh sách cổ phiếu cần lọc (VD: VN30)
tickers = client.TickerList(ticker="VNIndex")

# Bước 3: Lấy dữ liệu lịch sử
event = client.Fetch_Trading_Data(
    realtime=False,
    tickers=tickers,
    fields=["close", "volume"],
    adjusted=True,
    by="1d",
    period=21
)
data = event.get_data()

# Bước 4: Phân tích SMA20 và tín hiệu cắt lên/xuống
fi = client.FiinIndicator()
cut_down = []
cut_up = []

for ticker in tickers:
    df = data[data['ticker'] == ticker].copy()
    df.sort_values("timestamp", inplace=True)
    df['sma_20'] = fi.sma(df['close'], window=20)

    if len(df) < 21:
        continue

    prev_close = df['close'].iloc[-2]
    curr_close = df['close'].iloc[-1]
    prev_sma = df['sma_20'].iloc[-2]
    curr_sma = df['sma_20'].iloc[-1]
    volume = df['volume'].iloc[-1]

    if prev_close > prev_sma and curr_close < curr_sma:
        cut_down.append((ticker, volume))
    elif prev_close < prev_sma and curr_close > curr_sma:
        cut_up.append((ticker, volume))

# Bước 5: Sắp xếp theo volume giảm dần
cut_down_sorted = sorted(cut_down, key=lambda x: x[1], reverse=True)
cut_up_sorted = sorted(cut_up, key=lambda x: x[1], reverse=True)

# Bước 6: In kết quả
print("=== CỔ PHIẾU CẮT XUỐNG SMA20 HÔM NAY ===")
print(f"Tổng cộng: {len(cut_down_sorted)} mã")
for ticker, vol in cut_down_sorted:
    print(f"{ticker}: {vol:,}")

print("\n=== CỔ PHIẾU CẮT LÊN SMA20 HÔM NAY ===")
print(f"Tổng cộng: {len(cut_up_sorted)} mã")
for ticker, vol in cut_up_sorted:
    print(f"{ticker}: {vol:,}")

Last updated