Position-aware multivariate-emission HMM for player form inference.
Uses position-specific feature vectors extracted from the full vaastav dataset:
GK: [saves/90, xGC/90, clean_sheet, bonus, mins_frac]
DEF: [xG, xA, xGC/90, clean_sheet, influence/100, bonus, mins_frac]
MID: [xG, xA, creativity/100, threat/100, bonus, mins_frac]
FWD: [xG, xA, threat/100, bonus, mins_frac]
Each state emits a multivariate Gaussian with diagonal covariance.
Baum-Welch learns per-player emission parameters from their history.
The minutes_fraction feature (0 or ~1) lets the HMM identify the Injured
state from the feature vector alone, without NLP news signals.
MultivariateHMM
MultivariateHMM(
position: str = "MID",
transition_matrix: Optional[ndarray] = None,
initial_dist: Optional[ndarray] = None,
)
Position-aware HMM with multivariate diagonal Gaussian emissions.
| Parameter | Description | Type | Default |
|---|---|---|---|
| `position` | GK, DEF, MID, FWD. Determines feature set and default emissions. | `str` | `'MID'` |
Source code in fplx/inference/multivariate_hmm.py
def __init__(
    self,
    position: str = "MID",
    transition_matrix: Optional[np.ndarray] = None,
    initial_dist: Optional[np.ndarray] = None,
):
    """Position-aware HMM with multivariate diagonal-Gaussian emissions.

    Parameters
    ----------
    position : str
        One of GK/DEF/MID/FWD; selects the feature set and the default
        emission parameters.
    transition_matrix : Optional[np.ndarray]
        Prior transition matrix; defaults to DEFAULT_TRANSITION when None.
    initial_dist : Optional[np.ndarray]
        Initial state distribution; defaults to DEFAULT_INITIAL when None.
    """
    self.position = position
    self.means, self.vars = _default_emissions(position)
    # Keep copies of the position-level defaults as priors so Baum-Welch
    # can regularize toward them (MAP-style interpolation).
    self.prior_means = self.means.copy()
    self.prior_vars = self.vars.copy()
    if transition_matrix is None:
        self.prior_A = DEFAULT_TRANSITION.copy()
    else:
        self.prior_A = transition_matrix.copy()
    self.A = self.prior_A.copy()
    if initial_dist is None:
        self.pi = DEFAULT_INITIAL.copy()
    else:
        self.pi = initial_dist.copy()
    self.n_states = N_STATES
    self.n_features = self.means.shape[1]
    # timestep -> perturbed transition matrix (filled by news injections).
    self._transition_overrides: dict[int, np.ndarray] = {}
inject_news_perturbation
inject_news_perturbation(
timestep: int,
state_boost: dict,
confidence: float = 1.0,
)
Perturb transition matrix at timestep (same API as scalar HMM).
Source code in fplx/inference/multivariate_hmm.py
def inject_news_perturbation(self, timestep: int, state_boost: dict, confidence: float = 1.0):
    """Perturb transition matrix at timestep (same API as scalar HMM)."""
    perturbed = self.A.copy()
    for row in range(self.n_states):
        # Scale the boosted target columns; confidence interpolates between
        # no change (0.0) and the full boost factor (1.0).
        for target, boost in state_boost.items():
            perturbed[row, target] *= 1.0 + confidence * (boost - 1.0)
        row_total = perturbed[row].sum()
        if row_total > 0:
            # Renormalize so the row remains a probability distribution.
            perturbed[row] = perturbed[row] / row_total
    self._transition_overrides[timestep] = perturbed
forward
forward(observations: ndarray)
Forward algorithm. observations: (T, D).
Source code in fplx/inference/multivariate_hmm.py
def forward(self, observations: np.ndarray):
    """Scaled forward algorithm. observations: (T, D).

    Returns the (T, n_states) scaled alpha matrix and the (T,) per-step
    scale factors (normalizers).
    """
    n_steps = len(observations)
    alpha = np.zeros((n_steps, self.n_states))
    scale = np.zeros(n_steps)
    # Initialization: prior belief weighted by the first emission.
    alpha[0] = self.pi * self._emission_prob_vector(observations[0])
    scale[0] = alpha[0].sum()
    if scale[0] > 0:
        alpha[0] = alpha[0] / scale[0]
    # Recursion: propagate belief through (possibly overridden) transitions.
    for step in range(1, n_steps):
        likelihood = self._emission_prob_vector(observations[step])
        alpha[step] = (alpha[step - 1] @ self._get_A(step)) * likelihood
        scale[step] = alpha[step].sum()
        if scale[step] > 0:
            alpha[step] = alpha[step] / scale[step]
    return alpha, scale
forward_backward
forward_backward(observations: ndarray) -> ndarray
Smoothed posteriors P(S_t | y_{1:T}).
Source code in fplx/inference/multivariate_hmm.py
def forward_backward(self, observations: np.ndarray) -> np.ndarray:
    """Smoothed posteriors P(S_t | y_{1:T})."""
    n_steps = len(observations)
    alpha, scale = self.forward(observations)
    # Backward recursion; beta[T-1] = 1, earlier rows are overwritten below.
    beta = np.ones((n_steps, self.n_states))
    for step in range(n_steps - 2, -1, -1):
        emit_next = self._emission_prob_vector(observations[step + 1])
        beta[step] = self._get_A(step + 1) @ (emit_next * beta[step + 1])
        # Reuse the forward-pass scale factors to keep beta numerically stable.
        if scale[step + 1] > 0:
            beta[step] = beta[step] / scale[step + 1]
    gamma = alpha * beta
    totals = gamma.sum(axis=1, keepdims=True)
    totals[totals == 0] = 1.0  # guard against all-zero rows
    return gamma / totals
viterbi
viterbi(observations: ndarray) -> ndarray
Most likely state sequence.
Source code in fplx/inference/multivariate_hmm.py
def viterbi(self, observations: np.ndarray) -> np.ndarray:
    """Most likely state sequence (max-product decoding in log space)."""
    n_steps = len(observations)
    log_delta = np.zeros((n_steps, self.n_states))
    backptr = np.zeros((n_steps, self.n_states), dtype=int)
    # Initialization; 1e-300 floors avoid log(0).
    first_logb = np.array(
        [self._emission_log_prob(observations[0], s) for s in range(self.n_states)]
    )
    log_delta[0] = np.log(self.pi + 1e-300) + first_logb
    for step in range(1, n_steps):
        log_A = np.log(self._get_A(step) + 1e-300)
        log_b = np.array(
            [self._emission_log_prob(observations[step], s) for s in range(self.n_states)]
        )
        for state in range(self.n_states):
            scores = log_delta[step - 1] + log_A[:, state]
            best = int(np.argmax(scores))
            backptr[step, state] = best
            log_delta[step, state] = scores[best] + log_b[state]
    # Backtrack from the best terminal state.
    path = np.zeros(n_steps, dtype=int)
    path[-1] = np.argmax(log_delta[-1])
    for step in range(n_steps - 2, -1, -1):
        path[step] = backptr[step + 1, path[step + 1]]
    return path
predict_next_features
predict_next_features(observations: ndarray)
Predict next gameweek's feature vector.
Returns mean, var (per feature), and state distribution.
Source code in fplx/inference/multivariate_hmm.py
def predict_next_features(self, observations: np.ndarray):
    """
    Predict next gameweek's feature vector.
    Returns mean, var (per feature), and state distribution.
    """
    alpha, _ = self.forward(observations)
    # Propagate the filtered belief one step through the transition matrix.
    state_dist = alpha[-1] @ self._get_A(len(observations))
    mean = state_dist @ self.means
    # Mixture variance via second moments: E[X^2] - (E[X])^2 where
    # E[X^2] = sum_s p(s) * (var_s + mu_s^2).
    second_moment = state_dist @ (self.vars + self.means**2)
    var = second_moment - mean**2
    return mean, np.maximum(var, 1e-8), state_dist
one_step_point_predictions
one_step_point_predictions(
observations: ndarray,
) -> ndarray
One-step-ahead point predictions for each historical timestep.
Returns array preds where preds[t] predicts points at timestep t,
using information up to t-1 (preds[0] is NaN).
Source code in fplx/inference/multivariate_hmm.py
def one_step_point_predictions(self, observations: np.ndarray) -> np.ndarray:
    """One-step-ahead point predictions for each historical timestep.

    preds[t] predicts points at timestep t using information up to t-1;
    preds[0] is NaN because there is no prior information.
    """
    n_steps = len(observations)
    preds = np.full(n_steps, np.nan)
    if n_steps >= 2:
        alpha, _ = self.forward(observations)
        for step in range(1, n_steps):
            # Belief about timestep `step` given observations up to step-1.
            belief = alpha[step - 1] @ self._get_A(step)
            preds[step] = self._expected_points_from_state_dist(belief)
    return preds
predict_next_points
predict_next_points(
observations: ndarray,
) -> tuple[float, float]
Convert predicted features → expected FPL points.
Uses FPL scoring rules applied to predicted feature rates.
Source code in fplx/inference/multivariate_hmm.py
def predict_next_points(self, observations: np.ndarray) -> tuple[float, float]:
    """
    Convert predicted features → expected FPL points.
    Uses FPL scoring rules applied to predicted feature rates.
    """
    mean_vec, var_vec, _ = self.predict_next_features(observations)
    # Locate the structural-xPts column for this position's feature layout.
    idx = POSITION_FEATURES[self.position].index("xPts")
    expected = float(mean_vec[idx])
    if expected < 0.0:
        expected = 0.0  # expected points cannot be negative
    variance = float(max(var_vec[idx], 1e-6) + 1.0)  # residual floor
    return expected, variance
fit
fit(
observations: ndarray,
n_iter: int = 20,
tol: float = 0.0001,
prior_weight: float = 0.85,
)
Baum-Welch EM with MAP-style prior interpolation.
| Parameter | Description | Type | Default |
|---|---|---|---|
| `observations` | Feature matrix with shape (T, D). | `ndarray` | required |
| `n_iter` | Maximum EM iterations. | `int` | `20` |
| `tol` | Convergence tolerance on log-likelihood. | `float` | `0.0001` |
| `prior_weight` | Weight on prior parameters in [0, 1]. Higher values increase regularization toward position-level default emissions/transitions. | `float` | `0.85` |
Source code in fplx/inference/multivariate_hmm.py
def fit(
    self,
    observations: np.ndarray,
    n_iter: int = 20,
    tol: float = 1e-4,
    prior_weight: float = 0.85,
):
    """Baum-Welch EM with MAP-style prior interpolation.

    Parameters
    ----------
    observations : np.ndarray
        Feature matrix with shape (T, D).
    n_iter : int
        Maximum EM iterations.
    tol : float
        Convergence tolerance on log-likelihood.
    prior_weight : float
        Weight on prior parameters in [0, 1]. Higher values increase
        regularization toward position-level default emissions/transitions.
    """
    T = observations.shape[0]
    prev_ll = -np.inf
    prior_weight = float(np.clip(prior_weight, 0.0, 1.0))
    for _ in range(n_iter):
        # E-step: scaled forward pass over the full sequence.
        alpha, scale = self.forward(observations)
        # Backward pass with scaling aligned to forward()
        beta = np.zeros((T, self.n_states))
        beta[T - 1] = 1.0
        for t in range(T - 2, -1, -1):
            b_next = self._emission_prob_vector(observations[t + 1])
            beta[t] = self._get_A(t + 1) @ (b_next * beta[t + 1])
            if scale[t + 1] > 0:
                beta[t] /= scale[t + 1]
        # Smoothed posteriors gamma[t, s] = P(S_t = s | y_{1:T}).
        gamma = alpha * beta
        rs = gamma.sum(axis=1, keepdims=True)
        rs[rs == 0] = 1.0  # guard against all-zero rows (underflow)
        gamma /= rs
        # M-step: initial
        self.pi = np.maximum(gamma[0], 1e-10)
        self.pi /= self.pi.sum()
        # M-step: transitions
        # xi[t, i, j] ∝ P(S_t = i, S_{t+1} = j | y_{1:T}); normalized per t.
        xi = np.zeros((T - 1, self.n_states, self.n_states))
        for t in range(T - 1):
            b_next = self._emission_prob_vector(observations[t + 1])
            for i in range(self.n_states):
                for j in range(self.n_states):
                    xi[t, i, j] = alpha[t, i] * self._get_A(t + 1)[i, j] * b_next[j] * beta[t + 1, j]
            xs = xi[t].sum()
            if xs > 0:
                xi[t] /= xs
        for i in range(self.n_states):
            # Expected number of visits to state i (excluding the last step).
            d = gamma[:-1, i].sum()
            if d > 1e-10:
                # Blend the MLE row with the prior row, then renormalize.
                mle_A = xi[:, i, :].sum(axis=0) / d
                self.A[i] = prior_weight * self.prior_A[i] + (1.0 - prior_weight) * mle_A
                rs = self.A[i].sum()
                if rs > 0:
                    self.A[i] /= rs
        # M-step: emissions
        for s in range(self.n_states):
            w = gamma[:, s]
            ws = w.sum()
            if ws > 1e-10:
                # Responsibility-weighted mean/variance for state s,
                # interpolated toward the position-level priors.
                mle_mu = np.average(observations, axis=0, weights=w)
                diff = observations - mle_mu
                mle_var = np.average(diff**2, axis=0, weights=w)
                self.means[s] = prior_weight * self.prior_means[s] + (1.0 - prior_weight) * mle_mu
                self.vars[s] = np.maximum(
                    prior_weight * self.prior_vars[s] + (1.0 - prior_weight) * mle_var,
                    1e-4,  # variance floor keeps Gaussians well-conditioned
                )
        # Log-likelihood from the forward-pass scale factors (pre-update params).
        ll = np.sum(np.log(scale + 1e-300))
        if abs(ll - prev_ll) < tol:
            break
        prev_ll = ll
    return self
build_feature_matrix
build_feature_matrix(
timeseries: DataFrame, position: str
) -> ndarray
Extract position-specific feature matrix from player timeseries.
| Parameter | Description | Type |
|---|---|---|
| `timeseries` | Player gameweek history from vaastav dataset. | `DataFrame` |
| `position` | GK, DEF, MID, or FWD. | `str` |

| Returns |
|---|
| `np.ndarray`, shape (T, D). |
Source code in fplx/inference/multivariate_hmm.py
def build_feature_matrix(timeseries: pd.DataFrame, position: str) -> np.ndarray:
    """
    Extract the feature matrix for one player's gameweek history.

    Parameters
    ----------
    timeseries : pd.DataFrame
        Player gameweek history from vaastav dataset.
    position : str
        GK, DEF, MID, or FWD (passed through to compute_xpoints).

    Returns
    -------
    np.ndarray
        Shape (T, 2); columns are [xPts, mins_frac]. Note: the previous
        docstring claimed D varies by position, but this implementation
        always emits exactly two features for every position.
    """
    n = len(timeseries)
    features = np.zeros((n, 2))
    mins = _safe_col(timeseries, "minutes")
    # Fraction of a full 90 minutes played, clipped to [0, 1].
    features[:, 1] = np.clip(mins / 90.0, 0.0, 1.0)  # mins_frac
    # Domain-specific projection from rich event space to structural xPts.
    features[:, 0] = compute_xpoints(timeseries, position)
    return features