Skip to content

multivariate_hmm

multivariate_hmm

Position-aware multivariate-emission HMM for player form inference.

Uses position-specific feature vectors extracted from the full vaastav dataset:

- GK: [saves/90, xGC/90, clean_sheet, bonus, mins_frac]
- DEF: [xG, xA, xGC/90, clean_sheet, influence/100, bonus, mins_frac]
- MID: [xG, xA, creativity/100, threat/100, bonus, mins_frac]
- FWD: [xG, xA, threat/100, bonus, mins_frac]

Each state emits a multivariate Gaussian with diagonal covariance. Baum-Welch learns per-player emission parameters from their history.

The minutes_fraction feature (0 or ~1) lets the HMM identify the Injured state from the feature vector alone, without NLP news signals.

MultivariateHMM

MultivariateHMM(
    position: str = "MID",
    transition_matrix: Optional[ndarray] = None,
    initial_dist: Optional[ndarray] = None,
)

Position-aware HMM with multivariate diagonal Gaussian emissions.

PARAMETER DESCRIPTION
position

GK, DEF, MID, FWD. Determines feature set and default emissions.

TYPE: str DEFAULT: 'MID'

Source code in fplx/inference/multivariate_hmm.py
def __init__(
    self,
    position: str = "MID",
    transition_matrix: Optional[np.ndarray] = None,
    initial_dist: Optional[np.ndarray] = None,
):
    """Set up a position-aware HMM with diagonal-Gaussian emissions.

    Parameters
    ----------
    position : str
        One of GK, DEF, MID, FWD; selects the feature set and the
        default emission parameters.
    transition_matrix : Optional[np.ndarray]
        Prior transition matrix; defaults to DEFAULT_TRANSITION.
    initial_dist : Optional[np.ndarray]
        Initial state distribution; defaults to DEFAULT_INITIAL.
    """
    self.position = position
    self.means, self.vars = _default_emissions(position)

    # Keep copies of the starting parameters so Baum-Welch can
    # regularize back toward them (MAP-style shrinkage).
    self.prior_means = self.means.copy()
    self.prior_vars = self.vars.copy()
    if transition_matrix is None:
        self.prior_A = DEFAULT_TRANSITION.copy()
    else:
        self.prior_A = transition_matrix.copy()
    self.A = self.prior_A.copy()

    if initial_dist is None:
        self.pi = DEFAULT_INITIAL.copy()
    else:
        self.pi = initial_dist.copy()

    self.n_states = N_STATES
    self.n_features = self.means.shape[1]
    # Per-timestep transition-matrix overrides set by news perturbations.
    self._transition_overrides: dict[int, np.ndarray] = {}

inject_news_perturbation

inject_news_perturbation(
    timestep: int,
    state_boost: dict,
    confidence: float = 1.0,
)

Perturb transition matrix at timestep (same API as scalar HMM).

Source code in fplx/inference/multivariate_hmm.py
def inject_news_perturbation(self, timestep: int, state_boost: dict, confidence: float = 1.0):
    """Store a perturbed transition matrix for one timestep (same API as scalar HMM).

    For each boosted target state, incoming probability mass is scaled by
    1 + confidence * (boost - 1); each row is then renormalized.
    """
    perturbed = self.A.copy()
    for row in range(self.n_states):
        for target_state, boost in state_boost.items():
            factor = 1.0 + confidence * (boost - 1.0)
            perturbed[row, target_state] *= factor
        row_mass = perturbed[row].sum()
        if row_mass > 0:
            perturbed[row] /= row_mass
    self._transition_overrides[timestep] = perturbed

forward

forward(observations: ndarray)

Forward algorithm. observations: (T, D).

Source code in fplx/inference/multivariate_hmm.py
def forward(self, observations: np.ndarray):
    """Scaled forward pass over a (T, D) observation matrix.

    Returns (alpha, scale): alpha[t] is the normalized filtered state
    distribution at t; scale[t] is its pre-normalization mass.
    """
    n_obs = len(observations)
    alpha = np.zeros((n_obs, self.n_states))
    scale = np.zeros(n_obs)

    alpha[0] = self.pi * self._emission_prob_vector(observations[0])
    scale[0] = alpha[0].sum()
    if scale[0] > 0:
        alpha[0] /= scale[0]

    for t in range(1, n_obs):
        # Propagate through the (possibly overridden) transition matrix,
        # then weight by the emission likelihood.
        predicted = alpha[t - 1] @ self._get_A(t)
        alpha[t] = predicted * self._emission_prob_vector(observations[t])
        scale[t] = alpha[t].sum()
        if scale[t] > 0:
            alpha[t] /= scale[t]
    return alpha, scale

forward_backward

forward_backward(observations: ndarray) -> ndarray

Smoothed posteriors P(S_t | y_{1:T}).

Source code in fplx/inference/multivariate_hmm.py
def forward_backward(self, observations: np.ndarray) -> np.ndarray:
    """Smoothed posteriors P(S_t | y_{1:T})."""
    n_obs = len(observations)
    alpha, scale = self.forward(observations)

    # Backward recursion, rescaled with the same constants as forward().
    beta = np.zeros((n_obs, self.n_states))
    beta[-1] = 1.0
    for t in reversed(range(n_obs - 1)):
        weighted = self._emission_prob_vector(observations[t + 1]) * beta[t + 1]
        beta[t] = self._get_A(t + 1) @ weighted
        if scale[t + 1] > 0:
            beta[t] /= scale[t + 1]

    posterior = alpha * beta
    totals = posterior.sum(axis=1, keepdims=True)
    totals[totals == 0] = 1.0  # avoid division by zero on degenerate rows
    return posterior / totals

viterbi

viterbi(observations: ndarray) -> ndarray

Most likely state sequence.

Source code in fplx/inference/multivariate_hmm.py
def viterbi(self, observations: np.ndarray) -> np.ndarray:
    """Return the most likely hidden-state sequence (log-space Viterbi)."""
    n_obs = len(observations)
    log_delta = np.zeros((n_obs, self.n_states))
    backptr = np.zeros((n_obs, self.n_states), dtype=int)

    def _log_b(obs):
        # Per-state emission log-likelihoods for one observation.
        return np.array([
            self._emission_log_prob(obs, s) for s in range(self.n_states)
        ])

    log_delta[0] = np.log(self.pi + 1e-300) + _log_b(observations[0])
    for t in range(1, n_obs):
        log_A = np.log(self._get_A(t) + 1e-300)
        # scores[i, j]: best log-prob of ending in i at t-1 then moving to j.
        scores = log_delta[t - 1][:, None] + log_A
        backptr[t] = np.argmax(scores, axis=0)
        best = scores[backptr[t], np.arange(self.n_states)]
        log_delta[t] = best + _log_b(observations[t])

    # Trace the optimal path backward from the best final state.
    path = np.zeros(n_obs, dtype=int)
    path[-1] = np.argmax(log_delta[-1])
    for t in range(n_obs - 2, -1, -1):
        path[t] = backptr[t + 1, path[t + 1]]
    return path

predict_next_features

predict_next_features(observations: ndarray)

Predict next gameweek's feature vector.

Returns mean, var (per feature), and state distribution.

Source code in fplx/inference/multivariate_hmm.py
def predict_next_features(self, observations: np.ndarray):
    """Predict the next gameweek's feature vector.

    Returns (mean, var, state_dist): the mixture mean, the per-feature
    variance from the law of total variance, and the one-step-ahead
    hidden-state distribution.
    """
    filtered, _ = self.forward(observations)
    state_dist = filtered[-1] @ self._get_A(len(observations))

    mixture_mean = state_dist @ self.means
    # Law of total variance over the state mixture: E[var] + E[mean^2] - (E[mean])^2.
    second_moment = state_dist @ self.vars + state_dist @ (self.means**2)
    mixture_var = second_moment - mixture_mean**2
    return mixture_mean, np.maximum(mixture_var, 1e-8), state_dist

one_step_point_predictions

one_step_point_predictions(
    observations: ndarray,
) -> ndarray

One-step-ahead point predictions for each historical timestep.

Returns array preds where preds[t] predicts points at timestep t, using information up to t-1 (preds[0] is NaN).

Source code in fplx/inference/multivariate_hmm.py
def one_step_point_predictions(self, observations: np.ndarray) -> np.ndarray:
    """One-step-ahead point predictions for each historical timestep.

    preds[t] predicts points at timestep t using only information up to
    t-1; preds[0] is NaN because there is no prior information.
    """
    n_obs = len(observations)
    preds = np.full(n_obs, np.nan)
    if n_obs < 2:
        return preds

    filtered, _ = self.forward(observations)
    for t in range(1, n_obs):
        # Push yesterday's filtered belief one step through the dynamics.
        one_step = filtered[t - 1] @ self._get_A(t)
        preds[t] = self._expected_points_from_state_dist(one_step)
    return preds

predict_next_points

predict_next_points(
    observations: ndarray,
) -> tuple[float, float]

Convert predicted features → expected FPL points.

Uses FPL scoring rules applied to predicted feature rates.

Source code in fplx/inference/multivariate_hmm.py
def predict_next_points(self, observations: np.ndarray) -> tuple[float, float]:
    """
    Convert predicted features → expected FPL points.

    Uses FPL scoring rules applied to predicted feature rates.
    """
    feat_mean, feat_var, _ = self.predict_next_features(observations)
    xpts_pos = POSITION_FEATURES[self.position].index("xPts")

    # Expected points cannot be negative.
    expected = float(feat_mean[xpts_pos])
    if expected < 0.0:
        expected = 0.0
    # Floor the feature variance, then add 1.0 as a residual floor.
    variance = float(max(feat_var[xpts_pos], 1e-6) + 1.0)
    return expected, variance

fit

fit(
    observations: ndarray,
    n_iter: int = 20,
    tol: float = 0.0001,
    prior_weight: float = 0.85,
)

Baum-Welch EM with MAP-style prior interpolation.

PARAMETER DESCRIPTION
observations

Feature matrix with shape (T, D).

TYPE: ndarray

n_iter

Maximum EM iterations.

TYPE: int DEFAULT: 20

tol

Convergence tolerance on log-likelihood.

TYPE: float DEFAULT: 0.0001

prior_weight

Weight on prior parameters in [0, 1]. Higher values increase regularization toward position-level default emissions/transitions.

TYPE: float DEFAULT: 0.85

Source code in fplx/inference/multivariate_hmm.py
def fit(
    self,
    observations: np.ndarray,
    n_iter: int = 20,
    tol: float = 1e-4,
    prior_weight: float = 0.85,
):
    """Baum-Welch EM with MAP-style prior interpolation.

    Updates pi, A, means, and vars in place and returns self.

    Parameters
    ----------
    observations : np.ndarray
        Feature matrix with shape (T, D).
    n_iter : int
        Maximum EM iterations.
    tol : float
        Convergence tolerance on log-likelihood.
    prior_weight : float
        Weight on prior parameters in [0, 1]. Higher values increase
        regularization toward position-level default emissions/transitions.
    """
    T = observations.shape[0]
    prev_ll = -np.inf
    prior_weight = float(np.clip(prior_weight, 0.0, 1.0))

    for _ in range(n_iter):
        # E-step: scaled forward pass; alpha is normalized per timestep
        # and scale holds the normalization constants.
        alpha, scale = self.forward(observations)

        # Backward pass with scaling aligned to forward()
        beta = np.zeros((T, self.n_states))
        beta[T - 1] = 1.0
        for t in range(T - 2, -1, -1):
            b_next = self._emission_prob_vector(observations[t + 1])
            # _get_A honors any per-timestep news-perturbation overrides.
            beta[t] = self._get_A(t + 1) @ (b_next * beta[t + 1])
            if scale[t + 1] > 0:
                beta[t] /= scale[t + 1]

        # Smoothed posteriors gamma[t, s] = P(S_t = s | y_{1:T}).
        gamma = alpha * beta
        rs = gamma.sum(axis=1, keepdims=True)
        rs[rs == 0] = 1.0  # guard all-zero rows against division by zero
        gamma /= rs

        # M-step: initial
        self.pi = np.maximum(gamma[0], 1e-10)
        self.pi /= self.pi.sum()

        # M-step: transitions
        # xi[t, i, j] ∝ P(S_t = i, S_{t+1} = j | y_{1:T}), normalized per t.
        xi = np.zeros((T - 1, self.n_states, self.n_states))
        for t in range(T - 1):
            b_next = self._emission_prob_vector(observations[t + 1])
            for i in range(self.n_states):
                for j in range(self.n_states):
                    xi[t, i, j] = alpha[t, i] * self._get_A(t + 1)[i, j] * b_next[j] * beta[t + 1, j]
            xs = xi[t].sum()
            if xs > 0:
                xi[t] /= xs
        for i in range(self.n_states):
            # Expected number of visits to state i (excluding the last step).
            d = gamma[:-1, i].sum()
            if d > 1e-10:
                # Shrink the MLE row toward the prior row, then renormalize.
                mle_A = xi[:, i, :].sum(axis=0) / d
                self.A[i] = prior_weight * self.prior_A[i] + (1.0 - prior_weight) * mle_A
            rs = self.A[i].sum()
            if rs > 0:
                self.A[i] /= rs

        # M-step: emissions
        for s in range(self.n_states):
            w = gamma[:, s]
            ws = w.sum()
            if ws > 1e-10:
                # Posterior-weighted mean/variance, shrunk toward the priors.
                mle_mu = np.average(observations, axis=0, weights=w)
                diff = observations - mle_mu
                mle_var = np.average(diff**2, axis=0, weights=w)
                self.means[s] = prior_weight * self.prior_means[s] + (1.0 - prior_weight) * mle_mu
                self.vars[s] = np.maximum(
                    prior_weight * self.prior_vars[s] + (1.0 - prior_weight) * mle_var,
                    1e-4,  # variance floor keeps the Gaussians well-defined
                )

        # Log-likelihood is the sum of the log forward scaling constants.
        ll = np.sum(np.log(scale + 1e-300))
        if abs(ll - prev_ll) < tol:
            break
        prev_ll = ll
    return self

build_feature_matrix

build_feature_matrix(
    timeseries: DataFrame, position: str
) -> ndarray

Extract position-specific feature matrix from player timeseries.

PARAMETER DESCRIPTION
timeseries

Player gameweek history from vaastav dataset.

TYPE: DataFrame

position

GK, DEF, MID, or FWD.

TYPE: str

RETURNS DESCRIPTION
np.ndarray, shape (T, D) where D depends on position.
Source code in fplx/inference/multivariate_hmm.py
def build_feature_matrix(timeseries: pd.DataFrame, position: str) -> np.ndarray:
    """
    Extract position-specific feature matrix from player timeseries.

    Parameters
    ----------
    timeseries : pd.DataFrame
        Player gameweek history from vaastav dataset.
    position : str
        GK, DEF, MID, or FWD.

    Returns
    -------
    np.ndarray, shape (T, D) where D depends on position.
    """
    n_weeks = len(timeseries)
    matrix = np.zeros((n_weeks, 2))

    # Column 1: fraction of a full 90 minutes actually played, clipped to [0, 1].
    minutes = _safe_col(timeseries, "minutes")
    matrix[:, 1] = np.clip(minutes / 90.0, 0.0, 1.0)  # mins_frac

    # Column 0: domain-specific projection from rich event space to structural xPts.
    matrix[:, 0] = compute_xpoints(timeseries, position)
    return matrix