Mô hình định lượng
1. Tìm các điểm Pivot Point và Price Channel

Lấy Historical data từ thư viện Fiinquant
import pandas as pd
import numpy as np
import plotly.graph_objects as go
from scipy import stats
from FiinQuantX import FiinSession
username = 'REPLACE_WITH_YOUR_USER_NAME'
password = 'REPLACE_WITH_YOUR_PASS_WORD'
client = FiinSession(
username=username,
password=password
).login()
data = client.Fetch_Trading_Data(
tickers='VCB',
fields=['open', 'high', 'low', 'close', 'volume'],
adjusted=True,
period=1000,
realtime=False,
by='1d',
).get_data()
TÌm Pivot Point và plot giá cổ phiếu
# Function to identify pivot points in price data
# Returns: 0 for no pivot, 1 for pivot high, 2 for pivot low, 3 for both
# Parameters:
# candle: index of the current candle being checked
# window: number of candles to check on either side
def isPivot(candle, window):
if candle-window < 0 or candle+window >= len(data):
return 0
pivotHigh = 1
pivotLow = 2
for i in range(candle-window, candle+window+1):
if data.iloc[candle].low > data.iloc[i].low:
pivotLow=0
if data.iloc[candle].high < data.iloc[i].high:
pivotHigh=0
if (pivotHigh and pivotLow):
return 3
elif pivotHigh:
return pivotHigh
elif pivotLow:
return pivotLow
else:
return 0
# Apply isPivot function to each row of data
window=10
data['isPivot'] = data.apply(lambda x: isPivot(x.name,window), axis=1)
# Function to determine plotting position for pivot points
# Returns slightly offset values for visual clarity on the chart
# Parameters:
# x: row of dataframe containing price and pivot information
def pointpos(x):
if x['isPivot']==2:
return x['low']-1e-3 # Offset below pivot low
elif x['isPivot']==1:
return x['high']+1e-3 # Offset above pivot high
else:
return np.nan
# Apply pointpos function to create plotting positions
data['pointpos'] = data.apply(lambda row: pointpos(row), axis=1)
# Create candlestick chart
dfpl = data.copy()
fig = go.Figure(data=[go.Candlestick(x=dfpl.index,
open=dfpl['open'],
high=dfpl['high'],
low=dfpl['low'],
close=dfpl['close'])])
fig.add_scatter(x=dfpl.index, y=dfpl['pointpos'], mode="markers",
marker=dict(size=5, color="MediumPurple"),
name="pivot")
fig.update_layout(xaxis_rangeslider_visible=False)
fig.show()
Tìm Price Channel dựa trên Pivot Point

# Function to identify and calculate channel slopes using pivot points
# Parameters:
# candle: Current candle position to analyze from
# backcandles: Number of previous candles to look back
# window: Window size for pivot point calculation
def collect_channel(candle, backcandles, window):
# Get subset of data for analysis
localdf = data[candle-backcandles-window:candle-window]
localdf['isPivot'] = localdf.apply(lambda x: isPivot(x.name,window), axis=1)
# Extract pivot highs and lows with their indices
highs = localdf[localdf['isPivot']==1].high.values
idxhighs = localdf[localdf['isPivot']==1].high.index
lows = localdf[localdf['isPivot']==2].low.values
idxlows = localdf[localdf['isPivot']==2].low.index
# Calculate regression lines if enough pivot points exist
if len(lows)>=2 and len(highs)>=2:
# Calculate regression for lower channel (pivot lows)
sl_lows, interc_lows, r_value_l, _, _ = stats.linregress(idxlows,lows)
# Calculate regression for upper channel (pivot highs)
sl_highs, interc_highs, r_value_h, _, _ = stats.linregress(idxhighs,highs)
return(sl_lows, interc_lows, sl_highs, interc_highs, r_value_l**2, r_value_h**2)
else:
return(0,0,0,0,0,0)
# Set parameters for channel analysis
candle = 200 # Current candle position
backcandles = 100 # Number of candles to look back
window = 3 # Window size for pivot calculation
# Create candlestick chart
fig = go.Figure(data=[go.Candlestick(x=dfpl.index,
open=dfpl['open'],
high=dfpl['high'],
low=dfpl['low'],
close=dfpl['close'])])
# Add pivot points to chart
fig.add_scatter(x=dfpl.index, y=dfpl['pointpos'], mode="markers",
marker=dict(size=5, color="MediumPurple"),
name="pivot")
# Calculate channel slopes and angles
sl_lows, interc_lows, sl_highs, interc_highs, r_sq_l, r_sq_h = collect_channel(candle, backcandles, window)
print(sl_lows,sl_highs)
angle_lows = np.degrees(np.arctan(sl_lows))
angle_highs = np.degrees(np.arctan(sl_highs))
# Plot channel lines
x = np.array(range(candle-backcandles-window, candle+1))
fig.add_trace(go.Scatter(x=x, y=sl_lows*x + interc_lows, mode='lines', name='lower slope'))
fig.add_trace(go.Scatter(x=x, y=sl_highs*x + interc_highs, mode='lines', name='max slope'))
fig.update_layout(xaxis_rangeslider_visible=False)
fig.show()
2. Thống kê các giai đoạn tăng giá của VN-Index
Thông số có thể thay đổi:
Nếu đổi percent từ 5.0 lên 10.0 sẽ lọc những giai đoạn thị trường chỉnh nhiều hơn
df["zigzag"] = zigzag_percent(df["high"], df["low"], df["close"], percent=5.0)
Fulll Code
# Thống kê các giai đoạn giá tăng VNINDEX bằng ZigZag tự code (pivot chỉ tại điểm đảo chiều)
from FiinQuantX import FiinSession
import pandas as pd
# 1) Đăng nhập
client = FiinSession(username="YOUR_USER", password="YOUR_PASS").login()
# 2) Lấy dữ liệu VNINDEX (giá từ FiinQuant)
event = client.Fetch_Trading_Data(
realtime=False,
tickers=["VNINDEX"],
fields=["high", "low", "close"],
adjusted=True,
by="1d",
period=1500 # ~6 năm, tùy chỉnh
)
df = event.get_data().reset_index(drop=True)
# 3) ZigZag theo % đảo chiều: chỉ gắn pivot khi đảo chiều >= threshold
def zigzag_percent(high: pd.Series, low: pd.Series, close: pd.Series, percent: float = 5.0) -> pd.Series:
thr = percent / 100.0
n = len(close)
pivots = [None] * n
# Trạng thái
trend = 0 # 0: chưa xác định, 1: up, -1: down
start_idx = 0
start_px = close.iloc[0]
extreme_idx = 0 # chỉ số của đỉnh/đáy cực trị trong xu hướng hiện tại
extreme_px = close.iloc[0]
for i in range(1, n):
px = close.iloc[i]
if trend == 0:
# Chờ giá đi đủ biên để xác nhận hướng ban đầu
if px >= start_px * (1 + thr):
trend = 1
extreme_idx, extreme_px = i, px
elif px <= start_px * (1 - thr):
trend = -1
extreme_idx, extreme_px = i, px
else:
# chưa đủ biên, tiếp tục theo dõi, cập nhật cực trị tạm
if px > extreme_px:
extreme_idx, extreme_px = i, px
if px < extreme_px:
extreme_idx, extreme_px = i, px
continue
if trend == 1:
# Đang uptrend: cập nhật đỉnh cực trị
if px > extreme_px:
extreme_idx, extreme_px = i, px
# Đảo chiều đủ biên → chốt pivot tại đỉnh cực trị trước đó
elif px <= extreme_px * (1 - thr):
pivots[extreme_idx] = extreme_px # pivot tại reversal
trend = -1
extreme_idx, extreme_px = i, px # bắt đầu theo dõi đáy mới
else: # trend == -1
# Đang downtrend: cập nhật đáy cực trị
if px < extreme_px:
extreme_idx, extreme_px = i, px
# Đảo chiều đủ biên → chốt pivot tại đáy cực trị trước đó
elif px >= extreme_px * (1 + thr):
pivots[extreme_idx] = extreme_px
trend = 1
extreme_idx, extreme_px = i, px
# Chốt pivot cuối cùng tại cực trị hiện tại (tùy chọn)
pivots[extreme_idx] = extreme_px
return pd.Series(pivots, index=close.index, name="zigzag")
# 4) Áp dụng ZigZag và tạo bảng các đoạn TĂNG
df["zigzag"] = zigzag_percent(df["high"], df["low"], df["close"], percent=5.0)
turn_points = df.dropna(subset=["zigzag"]).reset_index() # chỉ còn các pivot tại điểm đảo chiều
uptrends = []
for i in range(1, len(turn_points)):
i0 = turn_points.loc[i-1, "index"]
i1 = turn_points.loc[i, "index"]
p0 = turn_points.loc[i-1, "zigzag"]
p1 = turn_points.loc[i, "zigzag"]
if p1 > p0: # đoạn tăng là từ pivot đáy → pivot đỉnh tiếp theo
uptrends.append({
"StartIndex": i0,
"EndIndex": i1,
"StartDate": df.loc[i0, "timestamp"],
"EndDate": df.loc[i1, "timestamp"],
"LengthInBars": i1 - i0,
"StartPrice": float(p0),
"EndPrice": float(p1),
"PctChange": (p1 - p0) / p0 * 100.0
})
uptrend_df = pd.DataFrame(uptrends)
# Ví dụ lọc các đợt tăng đủ dài và mạnh
# uptrend_df = uptrend_df[(uptrend_df["LengthInBars"] >= 20) & (uptrend_df["PctChange"] >= 10)]
print(uptrend_df.sort_values("StartIndex").reset_index(drop=True))
3. Thống kê diễn biến các phiên trước nghỉ lễ
from FiinQuantX import FiinSession
import pandas as pd
from datetime import datetime, timedelta
client = FiinSession(username= 'USERNAME', password='PASSWORD').login()
# Hàm truy xuất 10 phiên trước 2/9 của mỗi năm
def get_last_sessions_before_sep2(year, n=10):
to_date = f"{year}-09-01"
from_date = (datetime.strptime(to_date, "%Y-%m-%d") - timedelta(days=30)).strftime("%Y-%m-%d")
event = client.Fetch_Trading_Data(
realtime=False,
tickers=["VNINDEX"],
fields=["close"],
adjusted=True,
by="1d",
from_date=from_date,
to_date=to_date
)
df = event.get_data().sort_values("timestamp").reset_index(drop=True)
df = df.tail(n).copy()
df["return"] = df["close"].pct_change() * 100 # Tính % thay đổi
df["label"] = [f"T-{i}" for i in reversed(range(1, n + 1))]
df["year"] = year
return df[["year", "label", "timestamp", "close", "return"]]
# Gom dữ liệu 10 năm gần nhất
all_data = []
for y in range(datetime.today().year - 10, datetime.today().year):
df = get_last_sessions_before_sep2(y)
all_data.append(df)
# Kết quả
final_df = pd.concat(all_data).reset_index(drop=True)
# Pivot để tạo bảng
pivot_table = final_df.pivot(index="year", columns="label", values="close")
returns_table = final_df.pivot(index="year", columns="label", values="return")
print("==== Giá đóng cửa (Close) ====")
print(pivot_table.round(2))
print("\n==== Tỷ suất sinh lời hàng ngày (%) ====")
print(returns_table.round(2))
4. Thống kê số lượng mã cổ phiếu cắt lên và cắt xuống đường MA20
from FiinQuantX import FiinSession
import pandas as pd
# Bước 1: Đăng nhập FiinQuant
client = FiinSession(username='username', password='password').login()
# Bước 2: Lấy danh sách cổ phiếu cần lọc (VD: VN30)
tickers = client.TickerList(ticker="VNIndex")
# Bước 3: Lấy dữ liệu lịch sử
event = client.Fetch_Trading_Data(
realtime=False,
tickers=tickers,
fields=["close", "volume"],
adjusted=True,
by="1d",
period=21
)
data = event.get_data()
# Bước 4: Phân tích SMA20 và tín hiệu cắt lên/xuống
fi = client.FiinIndicator()
cut_down = []
cut_up = []
for ticker in tickers:
df = data[data['ticker'] == ticker].copy()
df.sort_values("timestamp", inplace=True)
df['sma_20'] = fi.sma(df['close'], window=20)
if len(df) < 21:
continue
prev_close = df['close'].iloc[-2]
curr_close = df['close'].iloc[-1]
prev_sma = df['sma_20'].iloc[-2]
curr_sma = df['sma_20'].iloc[-1]
volume = df['volume'].iloc[-1]
if prev_close > prev_sma and curr_close < curr_sma:
cut_down.append((ticker, volume))
elif prev_close < prev_sma and curr_close > curr_sma:
cut_up.append((ticker, volume))
# Bước 5: Sắp xếp theo volume giảm dần
cut_down_sorted = sorted(cut_down, key=lambda x: x[1], reverse=True)
cut_up_sorted = sorted(cut_up, key=lambda x: x[1], reverse=True)
# Bước 6: In kết quả
print("=== CỔ PHIẾU CẮT XUỐNG SMA20 HÔM NAY ===")
print(f"Tổng cộng: {len(cut_down_sorted)} mã")
for ticker, vol in cut_down_sorted:
print(f"{ticker}: {vol:,}")
print("\n=== CỔ PHIẾU CẮT LÊN SMA20 HÔM NAY ===")
print(f"Tổng cộng: {len(cut_up_sorted)} mã")
for ticker, vol in cut_up_sorted:
print(f"{ticker}: {vol:,}")
Last updated