inference

Probabilistic inference modules for FPLX.

HMMInference

HMMInference(
    transition_matrix: Optional[ndarray] = None,
    emission_params: Optional[dict] = None,
    initial_dist: Optional[ndarray] = None,
)

Hidden Markov Model for discrete player form states.

Supports dynamic transition matrix perturbation so that external signals (news, injuries) can shift state probabilities mid-sequence.

PARAMETER DESCRIPTION
transition_matrix

transition_matrix[i,j] = P(S_{t+1}=j | S_t=i). Rows must sum to 1.

TYPE: (ndarray, shape(N, N)) DEFAULT: None

emission_params

{state_index: (mean, std)} for Gaussian emissions.

TYPE: dict DEFAULT: None

initial_dist

Prior over initial state.

TYPE: (ndarray, shape(N)) DEFAULT: None

Source code in fplx/inference/hmm.py
def __init__(
    self,
    transition_matrix: Optional[np.ndarray] = None,
    emission_params: Optional[dict] = None,
    initial_dist: Optional[np.ndarray] = None,
):
    """Initialize the HMM from explicit parameters or module defaults.

    Parameters
    ----------
    transition_matrix : np.ndarray, shape (N, N), optional
        Row-stochastic transition matrix; falls back to
        DEFAULT_TRANSITION_MATRIX when omitted.
    emission_params : dict, optional
        {state_index: (mean, std)} for Gaussian emissions.
    initial_dist : np.ndarray, shape (N,), optional
        Prior over the initial state.
    """
    if transition_matrix is None:
        self.transition_matrix = DEFAULT_TRANSITION_MATRIX.copy()
    else:
        self.transition_matrix = transition_matrix.copy()

    if emission_params:
        self.emission_params = emission_params
    else:
        self.emission_params = dict(DEFAULT_EMISSION_PARAMS)

    if initial_dist is None:
        self.pi = DEFAULT_INITIAL_DIST.copy()
    else:
        self.pi = initial_dist.copy()

    self.n_states = len(self.pi)

    # Per-timestep transition overrides (for news injection).
    # Key: timestep t; value: the modified transition matrix for that step.
    self._transition_overrides: dict[int, np.ndarray] = {}

inject_news_perturbation

inject_news_perturbation(
    timestep: int,
    state_boost: dict[int, float],
    confidence: float = 1.0,
)

Perturb transition matrix at a specific timestep based on news.

For each source state, the transition probability toward boosted target states is multiplied by the boost factor (scaled by confidence), then the row is renormalized.

PARAMETER DESCRIPTION
timestep

The gameweek at which the perturbation applies.

TYPE: int

state_boost

{target_state: multiplicative_boost}. E.g., {0: 10.0} means "10x more likely to transition to Injured."

TYPE: dict[int, float]

confidence

Scales the perturbation. 0 = no effect, 1 = full effect.

TYPE: float DEFAULT: 1.0

Source code in fplx/inference/hmm.py
def inject_news_perturbation(
    self,
    timestep: int,
    state_boost: dict[int, float],
    confidence: float = 1.0,
):
    """
    Store a news-perturbed transition matrix for one timestep.

    Every row of the base transition matrix has its mass toward the
    boosted target states multiplied by a confidence-scaled factor, and
    is then renormalized to sum to one.

    Parameters
    ----------
    timestep : int
        The gameweek at which the perturbation applies.
    state_boost : dict[int, float]
        {target_state: multiplicative_boost}. E.g., {0: 10.0} means
        "10x more likely to transition to Injured."
    confidence : float
        Scales the perturbation. 0 = no effect, 1 = full effect.
    """
    perturbed = self.transition_matrix.copy()

    for row in range(self.n_states):
        for target, boost in state_boost.items():
            # Interpolate between no effect (1.0) and the full boost.
            perturbed[row, target] *= 1.0 + confidence * (boost - 1.0)

        total = perturbed[row].sum()
        if total > 0:
            perturbed[row] /= total

    self._transition_overrides[timestep] = perturbed

clear_perturbations

clear_perturbations()

Remove all per-timestep transition overrides.

Source code in fplx/inference/hmm.py
def clear_perturbations(self):
    """Reset the override table so every timestep uses the base matrix."""
    self._transition_overrides.clear()

forward

forward(observations: ndarray)

Forward algorithm with dynamic transition matrices.

PARAMETER DESCRIPTION
observations

TYPE: (ndarray, shape(num_timesteps))

RETURNS DESCRIPTION
forward_messages

Normalized forward messages. forward_messages[t] = P(S_t | y_1:t)

TYPE: (ndarray, shape(num_timesteps, N))

scale

Per-timestep normalization constants.

TYPE: (ndarray, shape(num_timesteps))

Source code in fplx/inference/hmm.py
def forward(self, observations: np.ndarray):
    """
    Scaled forward pass supporting per-timestep transition matrices.

    Parameters
    ----------
    observations : np.ndarray, shape (num_timesteps,)

    Returns
    -------
    forward_messages : np.ndarray, shape (num_timesteps, N)
        Normalized forward messages. forward_messages[t] = P(S_t | y_1:t)
    scale : np.ndarray, shape (num_timesteps,)
        Per-timestep normalization constants.
    """
    T = len(observations)
    messages = np.zeros((T, self.n_states))
    scale = np.zeros(T)

    # Base case: prior belief weighted by the first emission likelihood.
    messages[0] = self.pi * self._emission_vector(observations[0])
    scale[0] = messages[0].sum()
    if scale[0] > 0:
        messages[0] /= scale[0]

    # Recursion: propagate through the (possibly overridden) transition
    # matrix, weight by the emission, then renormalize.
    for t in range(1, T):
        propagated = messages[t - 1] @ self._get_transition_matrix(t)
        messages[t] = propagated * self._emission_vector(observations[t])
        scale[t] = messages[t].sum()
        if scale[t] > 0:
            messages[t] /= scale[t]

    return messages, scale

forward_backward

forward_backward(observations: ndarray) -> ndarray

Compute smoothed posteriors P(S_t | y_1:num_timesteps).

PARAMETER DESCRIPTION
observations

TYPE: (ndarray, shape(num_timesteps))

RETURNS DESCRIPTION
smoothed_posteriors

smoothed_posteriors[t, s] = P(S_t=s | y_1:num_timesteps)

TYPE: (ndarray, shape(num_timesteps, N))

Source code in fplx/inference/hmm.py
def forward_backward(self, observations: np.ndarray) -> np.ndarray:
    """
    Compute smoothed posteriors P(S_t | y_1:num_timesteps).

    Parameters
    ----------
    observations : np.ndarray, shape (num_timesteps,)

    Returns
    -------
    smoothed_posteriors : np.ndarray, shape (num_timesteps, N)
        smoothed_posteriors[t, s] = P(S_t=s | y_1:num_timesteps)
    """
    T = len(observations)
    alpha, scale = self.forward(observations)

    # Backward messages, rescaled with the same constants as forward().
    beta = np.zeros((T, self.n_states))
    beta[-1] = 1.0
    for t in reversed(range(T - 1)):
        weighted = self._emission_vector(observations[t + 1]) * beta[t + 1]
        beta[t] = self._get_transition_matrix(t + 1) @ weighted
        if scale[t + 1] > 0:
            beta[t] /= scale[t + 1]

    # Combine and normalize each row; zero rows are left uniform-safe.
    posterior = alpha * beta
    totals = posterior.sum(axis=1, keepdims=True)
    totals[totals == 0] = 1.0
    return posterior / totals

viterbi

viterbi(observations: ndarray) -> ndarray

Most likely state sequence via Viterbi decoding.

PARAMETER DESCRIPTION
observations

TYPE: (ndarray, shape(num_timesteps))

RETURNS DESCRIPTION
best_path

TYPE: np.ndarray of int, shape (num_timesteps,)

Source code in fplx/inference/hmm.py
def viterbi(self, observations: np.ndarray) -> np.ndarray:
    """
    Most likely state sequence via Viterbi decoding.

    Parameters
    ----------
    observations : np.ndarray, shape (num_timesteps,)

    Returns
    -------
    best_path : np.ndarray of int, shape (num_timesteps,)
    """
    T = len(observations)
    eps = 1e-300  # floor inside log() to avoid -inf on zero probabilities

    delta = np.zeros((T, self.n_states))
    psi = np.zeros((T, self.n_states), dtype=int)

    delta[0] = np.log(self.pi + eps) + np.log(self._emission_vector(observations[0]) + eps)

    for t in range(1, T):
        log_A = np.log(self._get_transition_matrix(t) + eps)
        log_b = np.log(self._emission_vector(observations[t]) + eps)
        # scores[i, j]: best log-prob of being in i at t-1 then moving to j.
        scores = delta[t - 1][:, None] + log_A
        psi[t] = np.argmax(scores, axis=0)
        delta[t] = scores[psi[t], np.arange(self.n_states)] + log_b

    # Backtrack from the best final state.
    path = np.empty(T, dtype=int)
    path[-1] = int(np.argmax(delta[-1]))
    for t in range(T - 2, -1, -1):
        path[t] = psi[t + 1, path[t + 1]]
    return path

predict_next

predict_next(
    observations: ndarray,
) -> tuple[float, float, ndarray]

Predict next timestep's points distribution.

Runs forward algorithm, then propagates one step ahead via the transition matrix.

PARAMETER DESCRIPTION
observations

TYPE: (ndarray, shape(num_timesteps))

RETURNS DESCRIPTION
expected_points

E[Y_{num_timesteps+1} | y_1:num_timesteps]

TYPE: float

variance

Var[Y_{num_timesteps+1} | y_1:num_timesteps] (from law of total variance)

TYPE: float

next_state_dist

P(S_{num_timesteps+1} | y_1:num_timesteps)

TYPE: (ndarray, shape(N))

Source code in fplx/inference/hmm.py
def predict_next(self, observations: np.ndarray) -> tuple[float, float, np.ndarray]:
    """
    Predict next timestep's points distribution.

    Runs the forward algorithm, then pushes the final belief one step
    ahead through the transition matrix.

    Parameters
    ----------
    observations : np.ndarray, shape (num_timesteps,)

    Returns
    -------
    expected_points : float
        E[Y_{T+1} | y_1:T]
    variance : float
        Var[Y_{T+1} | y_1:T], via the law of total variance.
    next_state_dist : np.ndarray, shape (N,)
        P(S_{T+1} | y_1:T)
    """
    alpha, _ = self.forward(observations)
    T = len(observations)

    # One-step-ahead belief: P(S_{T+1} | y_1:T).
    next_state_dist = alpha[-1] @ self._get_transition_matrix(T)

    means = np.array([self.emission_params[s][0] for s in range(self.n_states)])
    variances = np.array([self.emission_params[s][1] ** 2 for s in range(self.n_states)])

    expected_points = next_state_dist @ means

    # Law of total variance: Var = E[Var|S] + Var[E|S].
    second_moment = next_state_dist @ (means**2)
    variance = next_state_dist @ variances + second_moment - expected_points**2

    return expected_points, max(0.0, variance), next_state_dist

fit

fit(
    observations: ndarray,
    n_iter: int = 20,
    tol: float = 0.0001,
    verbose: bool = False,
)

Learn transition matrix and emission parameters via Baum-Welch EM.

PARAMETER DESCRIPTION
observations

Training sequence.

TYPE: (ndarray, shape(num_timesteps))

n_iter

Maximum EM iterations.

TYPE: int DEFAULT: 20

tol

Convergence tolerance on log-likelihood.

TYPE: float DEFAULT: 0.0001

verbose

Print progress.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
self
Source code in fplx/inference/hmm.py
def fit(
    self,
    observations: np.ndarray,
    n_iter: int = 20,
    tol: float = 1e-4,
    verbose: bool = False,
):
    """
    Learn transition matrix and emission parameters via Baum-Welch EM.

    Each iteration runs an E-step (scaled forward/backward smoothing plus
    pairwise transition posteriors) followed by an M-step (closed-form
    re-estimation of the initial distribution, transition matrix, and
    Gaussian emission parameters). Stops when the log-likelihood changes
    by less than ``tol`` or after ``n_iter`` iterations.

    Parameters
    ----------
    observations : np.ndarray, shape (num_timesteps,)
        Training sequence.
    n_iter : int
        Maximum EM iterations.
    tol : float
        Convergence tolerance on log-likelihood.
    verbose : bool
        Print progress.

    Returns
    -------
    self
    """
    num_timesteps = len(observations)
    prev_log_likelihood = -np.inf

    for iteration in range(n_iter):
        # E-step
        forward_messages, scale = self.forward(observations)

        # Backward pass using the same scaling factors as forward()
        # NOTE(review): per-timestep transition overrides (if any) are
        # honored here via _get_transition_matrix, yet the M-step below
        # re-estimates only the base matrix — confirm this is intended.
        backward_messages = np.zeros((num_timesteps, self.n_states))
        backward_messages[num_timesteps - 1] = 1.0
        for t in range(num_timesteps - 2, -1, -1):
            transition_matrix_t_plus_1 = self._get_transition_matrix(t + 1)
            b_next = self._emission_vector(observations[t + 1])
            backward_messages[t] = transition_matrix_t_plus_1 @ (b_next * backward_messages[t + 1])
            if scale[t + 1] > 0:
                backward_messages[t] /= scale[t + 1]

        # gamma_t(i) = P(S_t=i | y_1:T)
        smoothed_posteriors = forward_messages * backward_messages
        row_sums = smoothed_posteriors.sum(axis=1, keepdims=True)
        row_sums[row_sums == 0] = 1.0
        smoothed_posteriors /= row_sums

        # transition_posteriors: P(S_t=i, S_{t+1}=j | y_1:num_timesteps) for transition re-estimation
        transition_posteriors = np.zeros((num_timesteps - 1, self.n_states, self.n_states))
        for t in range(num_timesteps - 1):
            transition_matrix_t_plus_1 = self._get_transition_matrix(t + 1)
            b_next = self._emission_vector(observations[t + 1])

            # xi_t(i,j) = P(S_t=i, S_{t+1}=j | y_1:T)
            for i in range(self.n_states):
                for j in range(self.n_states):
                    transition_posteriors[t, i, j] = (
                        forward_messages[t, i]
                        * transition_matrix_t_plus_1[i, j]
                        * b_next[j]
                        * backward_messages[t + 1, j]
                    )

            # Normalize each xi slice so it is a joint distribution over (i, j).
            xi_sum = transition_posteriors[t].sum()
            if xi_sum > 0:
                transition_posteriors[t] /= xi_sum

        # M-step
        # Re-estimate initial distribution
        self.pi = smoothed_posteriors[0]

        # Re-estimate transition matrix
        # Row i: expected transitions out of i divided by expected time in i.
        for i in range(self.n_states):
            denom = smoothed_posteriors[:-1, i].sum()
            if denom > 0:
                for j in range(self.n_states):
                    self.transition_matrix[i, j] = transition_posteriors[:, i, j].sum() / denom
            # Renormalize
            row_sum = self.transition_matrix[i].sum()
            if row_sum > 0:
                self.transition_matrix[i] /= row_sum

        # re-estimate emission parameters
        # Weighted mean/variance of observations under each state's posterior.
        for s in range(self.n_states):
            weights = smoothed_posteriors[:, s]
            w_sum = weights.sum()
            if w_sum > 1e-10:
                mu = np.average(observations, weights=weights)
                var = np.average((observations - mu) ** 2, weights=weights)
                sigma = max(np.sqrt(var), 0.1)  # floor to prevent collapse
                self.emission_params[s] = (mu, sigma)

        # log-likelihood
        # scale[t] = P(y_t | y_1:t-1); the sum of logs is the sequence
        # log-likelihood. 1e-300 guards log(0) when a step had zero mass.
        log_likelihood = np.sum(np.log(scale + 1e-300))
        if verbose:
            logger.info("EM iteration %d: LL = %.4f", iteration, log_likelihood)

        if abs(log_likelihood - prev_log_likelihood) < tol:
            if verbose:
                logger.info("Converged at iteration %d", iteration)
            break
        prev_log_likelihood = log_likelihood

    return self

KalmanFilter

KalmanFilter(
    process_noise: float = 1.0,
    observation_noise: float = 4.0,
    initial_state_mean: float = 4.0,
    initial_state_covariance: float = 2.0,
)

1D Kalman Filter for tracking latent point potential.

PARAMETER DESCRIPTION
process_noise

Default process noise variance (form drift rate).

TYPE: float DEFAULT: 1.0

observation_noise

Default observation noise variance (weekly point noise).

TYPE: float DEFAULT: 4.0

initial_state_mean

Initial state estimate.

TYPE: float DEFAULT: 4.0

initial_state_covariance

Initial state uncertainty (variance).

TYPE: float DEFAULT: 2.0

Source code in fplx/inference/kalman.py
def __init__(
    self,
    process_noise: float = 1.0,
    observation_noise: float = 4.0,
    initial_state_mean: float = 4.0,
    initial_state_covariance: float = 2.0,
):
    """Set up filter defaults plus empty override and result storage.

    Parameters
    ----------
    process_noise : float
        Default process noise variance (form drift rate).
    observation_noise : float
        Default observation noise variance (weekly point noise).
    initial_state_mean : float
        Initial state estimate.
    initial_state_covariance : float
        Initial state uncertainty (variance).
    """
    self.default_process_noise = process_noise
    self.default_observation_noise = observation_noise
    self.initial_state_mean = initial_state_mean
    self.initial_state_covariance = initial_state_covariance

    # Per-timestep noise overrides, keyed by gameweek index.
    self._process_noise_overrides: dict[int, float] = {}
    self._observation_noise_overrides: dict[int, float] = {}

    # Populated by filter(): per-timestep means, covariances, and gains.
    self.filtered_state_means: Optional[np.ndarray] = None
    self.filtered_state_covariances: Optional[np.ndarray] = None
    self.kalman_gains: Optional[np.ndarray] = None
inject_process_shock

inject_process_shock(timestep: int, multiplier: float)

Inflate process noise at a specific timestep.

Use when news indicates a sudden form change (injury, transfer). process_noise_t = default_process_noise * multiplier.

PARAMETER DESCRIPTION
timestep

Gameweek index.

TYPE: int

multiplier

Process noise multiplier (>1 = more uncertainty about form drift).

TYPE: float

Source code in fplx/inference/kalman.py
def inject_process_shock(self, timestep: int, multiplier: float):
    """
    Inflate process noise at a specific timestep.

    Useful when news signals a sudden form change (injury, transfer):
    process_noise_t = default_process_noise * multiplier.

    Parameters
    ----------
    timestep : int
        Gameweek index.
    multiplier : float
        Process noise multiplier (>1 = more uncertainty about form drift).
    """
    shocked = self.default_process_noise * multiplier
    self._process_noise_overrides[timestep] = shocked

inject_observation_noise

inject_observation_noise(timestep: int, factor: float)

Adjust observation noise at a specific timestep.

Use for fixture difficulty: harder opponents → less predictable points. observation_noise_t = default_observation_noise * factor.

PARAMETER DESCRIPTION
timestep

Gameweek index.

TYPE: int

factor

Observation noise factor (>1 = harder fixture, noisier observation).

TYPE: float

Source code in fplx/inference/kalman.py
def inject_observation_noise(self, timestep: int, factor: float):
    """
    Adjust observation noise at a specific timestep.

    Useful for fixture difficulty — harder opponents mean less
    predictable points: observation_noise_t = default_observation_noise * factor.

    Parameters
    ----------
    timestep : int
        Gameweek index.
    factor : float
        Observation noise factor (>1 = harder fixture, noisier observation).
    """
    scaled = self.default_observation_noise * factor
    self._observation_noise_overrides[timestep] = scaled

clear_overrides

clear_overrides()

Remove all per-timestep noise overrides.

Source code in fplx/inference/kalman.py
def clear_overrides(self):
    """Drop every per-timestep process/observation noise override."""
    for table in (self._process_noise_overrides, self._observation_noise_overrides):
        table.clear()

get_process_noise_override

get_process_noise_override(
    timestep: int,
) -> Optional[float]

Return explicit process noise override at timestep, if any.

Source code in fplx/inference/kalman.py
def get_process_noise_override(self, timestep: int) -> Optional[float]:
    """Return the explicit process-noise override at *timestep*, or None."""
    try:
        return self._process_noise_overrides[timestep]
    except KeyError:
        return None

set_noise_overrides

set_noise_overrides(
    process_noise_overrides: dict[int, float],
    observation_noise_overrides: dict[int, float],
)

Replace per-timestep noise overrides.

Source code in fplx/inference/kalman.py
def set_noise_overrides(
    self,
    process_noise_overrides: dict[int, float],
    observation_noise_overrides: dict[int, float],
):
    """Replace both override tables with copies of the given mappings."""
    # Copy so later mutation of the caller's dicts cannot leak in.
    self._process_noise_overrides = {**process_noise_overrides}
    self._observation_noise_overrides = {**observation_noise_overrides}

copy_with_overrides

copy_with_overrides(
    max_timestep: Optional[int] = None,
) -> KalmanFilter

Create a parameter-identical filter with copied noise overrides.

PARAMETER DESCRIPTION
max_timestep

If provided, only overrides for timesteps <= max_timestep are copied.

TYPE: int DEFAULT: None

Source code in fplx/inference/kalman.py
def copy_with_overrides(self, max_timestep: Optional[int] = None) -> "KalmanFilter":
    """Create a parameter-identical filter with copied noise overrides.

    Parameters
    ----------
    max_timestep : int, optional
        If provided, only overrides for timesteps <= max_timestep are copied.
    """
    clone = KalmanFilter(
        process_noise=self.default_process_noise,
        observation_noise=self.default_observation_noise,
        initial_state_mean=self.initial_state_mean,
        initial_state_covariance=self.initial_state_covariance,
    )

    def _kept(overrides: dict[int, float]) -> dict[int, float]:
        # No cutoff: copy everything; otherwise keep early timesteps only.
        if max_timestep is None:
            return dict(overrides)
        return {t: v for t, v in overrides.items() if t <= max_timestep}

    clone.set_noise_overrides(
        _kept(self._process_noise_overrides),
        _kept(self._observation_noise_overrides),
    )
    return clone

filter

filter(observations: ndarray)

Run Kalman filter on observations with per-timestep noise.

PARAMETER DESCRIPTION
observations

TYPE: (ndarray, shape(num_timesteps))

RETURNS DESCRIPTION
filtered_state_means

Filtered state estimates (posterior mean).

TYPE: (ndarray, shape(num_timesteps))

filtered_state_covariances

Filtered state uncertainties (posterior variance).

TYPE: (ndarray, shape(num_timesteps))

Source code in fplx/inference/kalman.py
def filter(self, observations: np.ndarray):
    """
    Run the Kalman filter with per-timestep noise on a 1D series.

    Parameters
    ----------
    observations : np.ndarray, shape (num_timesteps,)

    Returns
    -------
    filtered_state_means : np.ndarray, shape (num_timesteps,)
        Filtered state estimates (posterior mean).
    filtered_state_covariances : np.ndarray, shape (num_timesteps,)
        Filtered state uncertainties (posterior variance).
    """
    T = len(observations)
    means = np.zeros(T)
    covariances = np.zeros(T)
    gains = np.zeros(T)

    # Priors for t = 0 come from the constructor.
    prior_mean = self.initial_state_mean
    prior_cov = self.initial_state_covariance

    for t, y in enumerate(observations):
        process_noise_t = self._get_process_noise(t)
        observation_noise_t = self._get_observation_noise(t)

        # Predict step (random-walk model): previous posterior plus drift noise.
        if t:
            prior_mean = means[t - 1]
            prior_cov = covariances[t - 1] + process_noise_t

        # Update step: fold observation y in via the Kalman gain.
        innovation_cov = prior_cov + observation_noise_t
        gain = prior_cov / innovation_cov
        means[t] = prior_mean + gain * (y - prior_mean)
        covariances[t] = (1 - gain) * prior_cov
        gains[t] = gain

    self.filtered_state_means = means
    self.filtered_state_covariances = covariances
    self.kalman_gains = gains

    return means, covariances

predict_next

predict_next() -> tuple[float, float]

Predict next observation with uncertainty.

Returns the predictive distribution for Y_{t+1} (the observation), not X_{t+1} (the latent state). This ensures consistency with the HMM predict_next which also returns observation-level variance.

Var[Y_{t+1}] = Var[X_{t+1}|y_{1:t}] + R = (P_t + Q) + R

Must call filter() first.

RETURNS DESCRIPTION
predicted_mean

E[Y_{t+1} | y_{1:t}].

TYPE: float

predicted_var

Var[Y_{t+1} | y_{1:t}] (observation-level, includes R).

TYPE: float

Source code in fplx/inference/kalman.py
def predict_next(self) -> tuple[float, float]:
    """
    One-step-ahead predictive distribution for the next observation.

    Returns the distribution of Y_{t+1} (observation level), not the
    latent state X_{t+1}, matching the HMM predict_next convention:

        Var[Y_{t+1}] = Var[X_{t+1}|y_{1:t}] + R = (P_t + Q) + R

    Must call filter() first.

    Returns
    -------
    predicted_mean : float
        E[Y_{t+1} | y_{1:t}].
    predicted_var : float
        Var[Y_{t+1} | y_{1:t}] (observation-level, includes R).
    """
    if self.filtered_state_means is None or self.filtered_state_covariances is None:
        raise RuntimeError("Must call filter() before predict_next().")

    next_t = len(self.filtered_state_means)
    # State-level predictive variance P_{t+1|t} = P_{t|t} + Q ...
    state_var = self.filtered_state_covariances[-1] + self._get_process_noise(next_t)
    # ... plus R for the observation-level spread.
    observation_var = state_var + self._get_observation_noise(next_t)

    return self.filtered_state_means[-1], observation_var

smooth

smooth(observations: ndarray)

Run RTS smoother (backward pass after forward Kalman filter).

PARAMETER DESCRIPTION
observations

TYPE: (ndarray, shape(num_timesteps))

RETURNS DESCRIPTION
smoothed_state_means

Smoothed state estimates.

TYPE: (ndarray, shape(num_timesteps))

smoothed_state_covariances

Smoothed state uncertainties.

TYPE: (ndarray, shape(num_timesteps))

Source code in fplx/inference/kalman.py
def smooth(self, observations: np.ndarray):
    """
    RTS smoother: forward Kalman filter followed by a backward pass.

    Parameters
    ----------
    observations : np.ndarray, shape (num_timesteps,)

    Returns
    -------
    smoothed_state_means : np.ndarray, shape (num_timesteps,)
        Smoothed state estimates.
    smoothed_state_covariances : np.ndarray, shape (num_timesteps,)
        Smoothed state uncertainties.
    """
    filtered_means, filtered_covs = self.filter(observations)
    T = len(observations)

    smoothed_means = np.zeros(T)
    smoothed_covs = np.zeros(T)
    # The final smoothed estimate equals the final filtered estimate.
    smoothed_means[-1] = filtered_means[-1]
    smoothed_covs[-1] = filtered_covs[-1]

    for t in reversed(range(T - 1)):
        predicted_cov = filtered_covs[t] + self._get_process_noise(t + 1)
        # Smoother gain; a degenerate (zero) predicted variance gets gain 0.
        gain = filtered_covs[t] / predicted_cov if predicted_cov > 0 else 0.0

        smoothed_means[t] = filtered_means[t] + gain * (
            smoothed_means[t + 1] - filtered_means[t]
        )
        smoothed_covs[t] = filtered_covs[t] + gain**2 * (
            smoothed_covs[t + 1] - predicted_cov
        )

    return smoothed_means, smoothed_covs

MultivariateHMM

MultivariateHMM(
    position: str = "MID",
    transition_matrix: Optional[ndarray] = None,
    initial_dist: Optional[ndarray] = None,
)

Position-aware HMM with multivariate diagonal Gaussian emissions.

PARAMETER DESCRIPTION
position

GK, DEF, MID, FWD. Determines feature set and default emissions.

TYPE: str DEFAULT: 'MID'

Source code in fplx/inference/multivariate_hmm.py
def __init__(
    self,
    position: str = "MID",
    transition_matrix: Optional[np.ndarray] = None,
    initial_dist: Optional[np.ndarray] = None,
):
    """Build a position-aware HMM with diagonal-Gaussian emissions.

    Parameters
    ----------
    position : str
        GK, DEF, MID, FWD. Determines feature set and default emissions.
    transition_matrix : np.ndarray, optional
        Base transition matrix; defaults to DEFAULT_TRANSITION.
    initial_dist : np.ndarray, optional
        Prior over the initial state; defaults to DEFAULT_INITIAL.
    """
    self.position = position
    self.means, self.vars = _default_emissions(position)

    # Priors used for MAP-style regularization in Baum-Welch.
    self.prior_means = self.means.copy()
    self.prior_vars = self.vars.copy()
    if transition_matrix is None:
        self.prior_A = DEFAULT_TRANSITION.copy()
    else:
        self.prior_A = transition_matrix.copy()

    self.A = self.prior_A.copy()
    if initial_dist is None:
        self.pi = DEFAULT_INITIAL.copy()
    else:
        self.pi = initial_dist.copy()
    self.n_states = N_STATES
    self.n_features = self.means.shape[1]
    self._transition_overrides: dict[int, np.ndarray] = {}

inject_news_perturbation

inject_news_perturbation(
    timestep: int,
    state_boost: dict,
    confidence: float = 1.0,
)

Perturb transition matrix at timestep (same API as scalar HMM).

Source code in fplx/inference/multivariate_hmm.py
def inject_news_perturbation(self, timestep: int, state_boost: dict, confidence: float = 1.0):
    """Perturb transition matrix at timestep (same API as scalar HMM)."""
    perturbed = self.A.copy()
    for row in range(self.n_states):
        for target, boost in state_boost.items():
            # Confidence interpolates between no change (1.0) and full boost.
            perturbed[row, target] *= 1.0 + confidence * (boost - 1.0)
        total = perturbed[row].sum()
        if total > 0:
            perturbed[row] /= total
    self._transition_overrides[timestep] = perturbed

forward

forward(observations: ndarray)

Forward algorithm. observations: (T, D).

Source code in fplx/inference/multivariate_hmm.py
def forward(self, observations: np.ndarray):
    """Scaled forward algorithm over multivariate observations (T, D)."""
    T = len(observations)
    alpha = np.zeros((T, self.n_states))
    scale = np.zeros(T)

    # t = 0: prior belief weighted by the first emission likelihood.
    alpha[0] = self.pi * self._emission_prob_vector(observations[0])
    scale[0] = alpha[0].sum()
    if scale[0] > 0:
        alpha[0] /= scale[0]

    # t >= 1: propagate, weight by the emission, renormalize.
    for t in range(1, T):
        alpha[t] = (alpha[t - 1] @ self._get_A(t)) * self._emission_prob_vector(observations[t])
        scale[t] = alpha[t].sum()
        if scale[t] > 0:
            alpha[t] /= scale[t]

    return alpha, scale

forward_backward

forward_backward(observations: ndarray) -> ndarray

Smoothed posteriors P(S_t | y_{1:T}).

Source code in fplx/inference/multivariate_hmm.py
def forward_backward(self, observations: np.ndarray) -> np.ndarray:
    """Smoothed posteriors P(S_t | y_{1:T})."""
    T = len(observations)
    alpha, scale = self.forward(observations)

    # Backward messages, rescaled with the forward pass constants.
    beta = np.zeros((T, self.n_states))
    beta[-1] = 1.0
    for t in reversed(range(T - 1)):
        weighted = self._emission_prob_vector(observations[t + 1]) * beta[t + 1]
        beta[t] = self._get_A(t + 1) @ weighted
        if scale[t + 1] > 0:
            beta[t] /= scale[t + 1]

    posterior = alpha * beta
    totals = posterior.sum(axis=1, keepdims=True)
    totals[totals == 0] = 1.0
    return posterior / totals

viterbi

viterbi(observations: ndarray) -> ndarray

Most likely state sequence.

Source code in fplx/inference/multivariate_hmm.py
def viterbi(self, observations: np.ndarray) -> np.ndarray:
    """Most likely state sequence (log-space Viterbi decoding)."""
    T = len(observations)
    eps = 1e-300  # avoids log(0)

    delta = np.zeros((T, self.n_states))
    psi = np.zeros((T, self.n_states), dtype=int)

    first_logs = [self._emission_log_prob(observations[0], s) for s in range(self.n_states)]
    delta[0] = np.log(self.pi + eps) + np.array(first_logs)

    for t in range(1, T):
        log_A = np.log(self._get_A(t) + eps)
        log_b = np.array([self._emission_log_prob(observations[t], s) for s in range(self.n_states)])
        # scores[i, j]: best log-prob of being in i at t-1 then moving to j.
        scores = delta[t - 1][:, None] + log_A
        psi[t] = np.argmax(scores, axis=0)
        delta[t] = scores[psi[t], np.arange(self.n_states)] + log_b

    # Backtrack from the best final state.
    path = np.empty(T, dtype=int)
    path[-1] = int(np.argmax(delta[-1]))
    for t in range(T - 2, -1, -1):
        path[t] = psi[t + 1, path[t + 1]]
    return path

predict_next_features

predict_next_features(observations: ndarray)

Predict next gameweek's feature vector.

Returns mean, var (per feature), and state distribution.

Source code in fplx/inference/multivariate_hmm.py
def predict_next_features(self, observations: np.ndarray):
    """
    Forecast the next gameweek's feature vector.

    Returns
    -------
    mean : per-feature expected value
    var : per-feature variance (law of total variance, floored at 1e-8)
    next_dist : predicted state distribution for the next step
    """
    alpha, _ = self.forward(observations)
    # One-step state prediction from the final filtered belief.
    next_dist = alpha[-1] @ self._get_A(len(observations))
    mean = next_dist @ self.means
    # Var[y] = E[var | S] + E[mean^2 | S] - (E[mean | S])^2
    var = next_dist @ self.vars + next_dist @ (self.means**2) - mean**2
    return mean, np.maximum(var, 1e-8), next_dist

one_step_point_predictions

one_step_point_predictions(
    observations: ndarray,
) -> ndarray

One-step-ahead point predictions for each historical timestep.

Returns array preds where preds[t] predicts points at timestep t, using information up to t-1 (preds[0] is NaN).

Source code in fplx/inference/multivariate_hmm.py
def one_step_point_predictions(self, observations: np.ndarray) -> np.ndarray:
    """One-step-ahead expected points for every historical timestep.

    preds[t] is the forecast for timestep t using information up to t-1;
    preds[0] is NaN since there is no prior information.
    """
    n_obs = len(observations)
    preds = np.full(n_obs, np.nan)
    if n_obs < 2:
        return preds

    alpha, _ = self.forward(observations)
    for t in range(1, n_obs):
        # Propagate the filtered belief at t-1 one step forward.
        state_forecast = alpha[t - 1] @ self._get_A(t)
        preds[t] = self._expected_points_from_state_dist(state_forecast)
    return preds

predict_next_points

predict_next_points(
    observations: ndarray,
) -> tuple[float, float]

Convert predicted features → expected FPL points.

Uses FPL scoring rules applied to predicted feature rates.

Source code in fplx/inference/multivariate_hmm.py
def predict_next_points(self, observations: np.ndarray) -> tuple[float, float]:
    """
    Map the predicted feature vector to an expected-FPL-points forecast.

    Returns (expected_points, variance); the variance carries a +1.0
    residual floor on top of the xPts feature variance.
    """
    feat_mean, feat_var, _ = self.predict_next_features(observations)
    xpts_idx = POSITION_FEATURES[self.position].index("xPts")

    expected = float(feat_mean[xpts_idx])
    if expected < 0.0:
        expected = 0.0
    variance = float(max(feat_var[xpts_idx], 1e-6) + 1.0)  # residual floor
    return expected, variance

fit

fit(
    observations: ndarray,
    n_iter: int = 20,
    tol: float = 0.0001,
    prior_weight: float = 0.85,
)

Baum-Welch EM with MAP-style prior interpolation.

PARAMETER DESCRIPTION
observations

Feature matrix with shape (T, D).

TYPE: ndarray

n_iter

Maximum EM iterations.

TYPE: int DEFAULT: 20

tol

Convergence tolerance on log-likelihood.

TYPE: float DEFAULT: 0.0001

prior_weight

Weight on prior parameters in [0, 1]. Higher values increase regularization toward position-level default emissions/transitions.

TYPE: float DEFAULT: 0.85

Source code in fplx/inference/multivariate_hmm.py
def fit(
    self,
    observations: np.ndarray,
    n_iter: int = 20,
    tol: float = 1e-4,
    prior_weight: float = 0.85,
):
    """Baum-Welch EM with MAP-style prior interpolation.

    Each M-step blends the maximum-likelihood update with the position-level
    prior parameters (transition rows, emission means/variances) using
    ``prior_weight``, which regularizes fits on short sequences.

    Parameters
    ----------
    observations : np.ndarray
        Feature matrix with shape (T, D).
    n_iter : int
        Maximum EM iterations.
    tol : float
        Convergence tolerance on log-likelihood.
    prior_weight : float
        Weight on prior parameters in [0, 1]. Higher values increase
        regularization toward position-level default emissions/transitions.
    """
    T = observations.shape[0]
    prev_ll = -np.inf
    prior_weight = float(np.clip(prior_weight, 0.0, 1.0))

    for _ in range(n_iter):
        # E-step: scaled forward pass (alpha rows normalized by `scale`).
        alpha, scale = self.forward(observations)

        # Backward pass with scaling aligned to forward()
        beta = np.zeros((T, self.n_states))
        beta[T - 1] = 1.0
        for t in range(T - 2, -1, -1):
            b_next = self._emission_prob_vector(observations[t + 1])
            beta[t] = self._get_A(t + 1) @ (b_next * beta[t + 1])
            if scale[t + 1] > 0:
                beta[t] /= scale[t + 1]

        # State posteriors: gamma[t, s] = P(S_t = s | y_{1:T}).
        gamma = alpha * beta
        rs = gamma.sum(axis=1, keepdims=True)
        rs[rs == 0] = 1.0
        gamma /= rs

        # M-step: initial
        self.pi = np.maximum(gamma[0], 1e-10)
        self.pi /= self.pi.sum()

        # M-step: transitions
        # xi[t, i, j] is proportional to P(S_t = i, S_{t+1} = j | y_{1:T}),
        # normalized per timestep below.
        xi = np.zeros((T - 1, self.n_states, self.n_states))
        for t in range(T - 1):
            b_next = self._emission_prob_vector(observations[t + 1])
            for i in range(self.n_states):
                for j in range(self.n_states):
                    xi[t, i, j] = alpha[t, i] * self._get_A(t + 1)[i, j] * b_next[j] * beta[t + 1, j]
            xs = xi[t].sum()
            if xs > 0:
                xi[t] /= xs
        for i in range(self.n_states):
            d = gamma[:-1, i].sum()
            # Only rows with enough posterior mass get updated; the new row
            # blends the MLE estimate with the prior row, then renormalizes.
            if d > 1e-10:
                mle_A = xi[:, i, :].sum(axis=0) / d
                self.A[i] = prior_weight * self.prior_A[i] + (1.0 - prior_weight) * mle_A
            rs = self.A[i].sum()
            if rs > 0:
                self.A[i] /= rs

        # M-step: emissions
        # Per-state weighted mean/variance, blended with the prior and
        # variance-floored to keep Gaussian emissions well-conditioned.
        for s in range(self.n_states):
            w = gamma[:, s]
            ws = w.sum()
            if ws > 1e-10:
                mle_mu = np.average(observations, axis=0, weights=w)
                diff = observations - mle_mu
                mle_var = np.average(diff**2, axis=0, weights=w)
                self.means[s] = prior_weight * self.prior_means[s] + (1.0 - prior_weight) * mle_mu
                self.vars[s] = np.maximum(
                    prior_weight * self.prior_vars[s] + (1.0 - prior_weight) * mle_var,
                    1e-4,
                )

        # Log-likelihood recovered from forward scaling constants; stop once
        # the improvement falls below tol.
        ll = np.sum(np.log(scale + 1e-300))
        if abs(ll - prev_ll) < tol:
            break
        prev_ll = ll
    return self

InferenceResult dataclass

InferenceResult(
    filtered_beliefs: ndarray,
    smoothed_beliefs: ndarray,
    viterbi_path: ndarray,
    hmm_predicted_mean: float = 0.0,
    hmm_predicted_var: float = 0.0,
    kalman_filtered: ndarray = (lambda: array([]))(),
    kalman_uncertainty: ndarray = (lambda: array([]))(),
    kf_predicted_mean: float = 0.0,
    kf_predicted_var: float = 0.0,
    fused_mean: ndarray = (lambda: array([]))(),
    fused_var: ndarray = (lambda: array([]))(),
    fusion_alpha: Optional[float] = None,
    predicted_mean: float = 0.0,
    predicted_var: float = 0.0,
)

Container for inference pipeline outputs.

PlayerInferencePipeline

PlayerInferencePipeline(
    hmm_params: Optional[dict] = None,
    kf_params: Optional[dict] = None,
    hmm_variance_floor: float = 1.0,
    news_params: Optional[dict] = None,
    fusion_mode: str = "precision",
    fusion_params: Optional[dict] = None,
)

Orchestrates HMM + Kalman inference for a single player.

PARAMETER DESCRIPTION
hmm_params

Override HMM parameters: transition_matrix, emission_params, initial_dist.

TYPE: dict DEFAULT: None

kf_params

Override Kalman parameters: Q, R, x0, P0.

TYPE: dict DEFAULT: None

Source code in fplx/inference/pipeline.py
def __init__(
    self,
    hmm_params: Optional[dict] = None,
    kf_params: Optional[dict] = None,
    hmm_variance_floor: float = 1.0,
    news_params: Optional[dict] = None,
    fusion_mode: str = "precision",
    fusion_params: Optional[dict] = None,
):
    """
    Orchestrates HMM + Kalman inference for a single player.

    Parameters
    ----------
    hmm_params : dict, optional
        Override HMM parameters: transition_matrix, emission_params, initial_dist.
    kf_params : dict, optional
        Override Kalman parameters: process/observation noise and initial state.
    hmm_variance_floor : float
        Lower bound applied to HMM variances during fusion (floored at 1e-6).
    news_params : dict, optional
        Overrides merged on top of DEFAULT_NEWS_PARAMS.
    fusion_mode : str
        Either 'precision' or 'calibrated_alpha'.
    fusion_params : dict, optional
        Overrides merged on top of DEFAULT_FUSION_PARAMS.

    Raises
    ------
    ValueError
        If fusion_mode is not one of the recognized modes.
    """
    # Fail fast on an invalid fusion mode, BEFORE constructing any
    # components. Previously this check ran last, so an invalid mode
    # raised only after the HMM/Kalman objects had been built, leaving
    # a partially-initialized object behind.
    if fusion_mode not in {"precision", "calibrated_alpha"}:
        raise ValueError(
            f"Unknown fusion_mode '{fusion_mode}'. Expected one of: 'precision', 'calibrated_alpha'."
        )
    self.fusion_mode = fusion_mode

    hmm_params = hmm_params or {}
    kf_params = kf_params or {}

    self.hmm = HMMInference(
        transition_matrix=hmm_params.get("transition_matrix"),
        emission_params=hmm_params.get("emission_params"),
        initial_dist=hmm_params.get("initial_dist"),
    )
    self.kf = KalmanFilter(
        process_noise=kf_params.get("process_noise", 1.0),
        observation_noise=kf_params.get("observation_noise", 4.0),
        initial_state_mean=kf_params.get("initial_state_mean", 4.0),
        initial_state_covariance=kf_params.get("initial_state_covariance", 2.0),
    )
    self.hmm_variance_floor = max(float(hmm_variance_floor), 1e-6)
    self.news_params = _merge_nested_dicts(DEFAULT_NEWS_PARAMS, news_params or {})
    self.fusion_params = _merge_nested_dicts(DEFAULT_FUSION_PARAMS, fusion_params or {})

    # Inference state stays empty until ingest_observations()/run().
    self.observations: Optional[np.ndarray] = None
    self._result: Optional[InferenceResult] = None

ingest_observations

ingest_observations(points: ndarray)

Set the player's historical points sequence.

PARAMETER DESCRIPTION
points

Weekly points history.

TYPE: (ndarray, shape(T))

Source code in fplx/inference/pipeline.py
def ingest_observations(self, points: np.ndarray):
    """Store the player's weekly points history.

    Parameters
    ----------
    points : np.ndarray, shape (T,)
        Weekly points history; coerced to a float array.
    """
    self.observations = np.asarray(points, dtype=float)
    # Any previously computed inference result is now stale.
    self._result = None

inject_news

inject_news(news_signal: dict, timestep: int)

Inject a news signal into the inference at a specific gameweek.

Bridges from existing NewsSignal.generate_signal() output format.

PARAMETER DESCRIPTION
news_signal

Output from NewsSignal.generate_signal(). Must contain: 'availability', 'minutes_risk', 'confidence'.

TYPE: dict

timestep

The gameweek index to apply the perturbation.

TYPE: int

Source code in fplx/inference/pipeline.py
def inject_news(
    self,
    news_signal: dict,
    timestep: int,
):
    """
    Apply a news signal to both inference engines at a given gameweek.

    Bridges from existing NewsSignal.generate_signal() output format.

    Parameters
    ----------
    news_signal : dict
        Output from NewsSignal.generate_signal(). Must contain:
        'availability', 'minutes_risk', 'confidence'.
    timestep : int
        The gameweek index to apply the perturbation.
    """
    params = self.news_params
    category = _classify_news(
        news_signal.get("availability", 1.0),
        news_signal.get("minutes_risk", 0.0),
        params.get("classification_thresholds"),
    )
    default_conf = float(params.get("default_confidence", 0.6))
    confidence = news_signal.get("confidence", default_conf)

    # Look up the perturbation for this category; fall back to a no-op
    # "neutral" entry when the category is unknown.
    pert_map = params.get("perturbation_map", DEFAULT_NEWS_PERTURBATION_MAP)
    neutral = pert_map.get("neutral", {"state_boost": {}, "kalman_shock": 1.0})
    perturbation = pert_map.get(category, neutral)

    # HMM side: boost transitions toward the flagged states.
    boosts = perturbation.get("state_boost", {})
    if boosts:
        self.hmm.inject_news_perturbation(
            timestep=timestep,
            state_boost=boosts,
            confidence=confidence,
        )

    # Kalman side: widen process noise when the news implies a shock.
    shock = float(perturbation.get("kalman_shock", 1.0))
    if shock != 1.0:
        self.kf.inject_process_shock(
            timestep=timestep,
            multiplier=shock,
        )

inject_fixture_difficulty

inject_fixture_difficulty(difficulty: float, timestep: int)

Inject fixture difficulty into Kalman observation noise.

PARAMETER DESCRIPTION
difficulty

Fixture difficulty score (1-5, from FixtureSignal).

TYPE: float

timestep

The gameweek index.

TYPE: int

Source code in fplx/inference/pipeline.py
def inject_fixture_difficulty(self, difficulty: float, timestep: int):
    """
    Scale Kalman observation noise according to fixture difficulty.

    Parameters
    ----------
    difficulty : float
        Fixture difficulty score (1-5, from FixtureSignal).
    timestep : int
        The gameweek index.
    """
    # Translate the difficulty score into a noise multiplier and hand it
    # to the Kalman filter for that timestep.
    factor = _difficulty_to_noise_factor(difficulty)
    self.kf.inject_observation_noise(timestep=timestep, factor=factor)

run

run() -> InferenceResult

Run full inference pipeline: HMM + Kalman + Fusion.

RETURNS DESCRIPTION
InferenceResult

All inference outputs.

Source code in fplx/inference/pipeline.py
def run(self) -> InferenceResult:
    """
    Run full inference pipeline: HMM + Kalman + Fusion.

    Returns
    -------
    InferenceResult
        All inference outputs.

    Raises
    ------
    RuntimeError
        If no observations have been ingested yet.
    """
    if self.observations is None or len(self.observations) == 0:
        raise RuntimeError("No observations ingested. Call ingest_observations().")

    obs = self.observations

    # HMM: filtered beliefs, smoothed posteriors, MAP state path, and a
    # one-step-ahead mean/variance forecast.
    alpha, _ = self.hmm.forward(obs)
    gamma = self.hmm.forward_backward(obs)
    viterbi_path = self.hmm.viterbi(obs)
    hmm_pred_mean, hmm_pred_var, _ = self.hmm.predict_next(obs)

    # Kalman: filtered track and one-step-ahead forecast.
    kf_x, kf_P = self.kf.filter(obs)
    kf_pred_mean, kf_pred_var = self.kf.predict_next()

    fusion_alpha = None
    if self.fusion_mode == "calibrated_alpha":
        # Convex combination with a data-calibrated weight:
        #   fused = alpha * Kalman + (1 - alpha) * HMM
        # Variances combine with alpha^2 / (1 - alpha)^2 factors, treating
        # the two estimators as independent; HMM variance is floored.
        fusion_alpha = self._estimate_fusion_alpha(obs)
        hmm_seq_mean, hmm_seq_var = self._hmm_sequence_moments(gamma)

        fused_mean = fusion_alpha * kf_x + (1.0 - fusion_alpha) * hmm_seq_mean
        fused_var = fusion_alpha**2 * np.maximum(kf_P, 1e-6) + (1.0 - fusion_alpha) ** 2 * np.maximum(
            hmm_seq_var, self.hmm_variance_floor
        )

        pred_mean = fusion_alpha * kf_pred_mean + (1.0 - fusion_alpha) * hmm_pred_mean
        pred_var = fusion_alpha**2 * max(kf_pred_var, 1e-6) + (1.0 - fusion_alpha) ** 2 * max(
            hmm_pred_var, self.hmm_variance_floor
        )
    else:
        # Fusion (full sequence, smoothed)
        # Apply an HMM variance floor so HMM does not become unrealistically
        # overconfident and dominate precision-weighted fusion.
        emission_params_for_fusion = {
            s: (mu, max(std, np.sqrt(self.hmm_variance_floor)))
            for s, (mu, std) in self.hmm.emission_params.items()
        }
        fused_mean, fused_var = fuse_sequences(gamma, kf_x, kf_P, emission_params_for_fusion)

        # Fused one-step-ahead prediction
        pred_mean, pred_var = fuse_estimates(
            hmm_pred_mean,
            max(hmm_pred_var, self.hmm_variance_floor),
            kf_pred_mean,
            kf_pred_var,
        )

    # Cache the bundle so predict_next() can reuse it without re-running.
    self._result = InferenceResult(
        filtered_beliefs=alpha,
        smoothed_beliefs=gamma,
        viterbi_path=viterbi_path,
        hmm_predicted_mean=hmm_pred_mean,
        hmm_predicted_var=hmm_pred_var,
        kalman_filtered=kf_x,
        kalman_uncertainty=kf_P,
        kf_predicted_mean=kf_pred_mean,
        kf_predicted_var=kf_pred_var,
        fused_mean=fused_mean,
        fused_var=fused_var,
        fusion_alpha=fusion_alpha,
        predicted_mean=pred_mean,
        predicted_var=pred_var,
    )

    return self._result

predict_next

predict_next() -> tuple[float, float]

Get the fused one-step-ahead forecast.

RETURNS DESCRIPTION
expected_points

TYPE: float

variance

TYPE: float

Source code in fplx/inference/pipeline.py
def predict_next(self) -> tuple[float, float]:
    """
    Return the fused one-step-ahead forecast, running inference if needed.

    Returns
    -------
    expected_points : float
    variance : float
    """
    # Lazily run the pipeline on first use.
    if self._result is None:
        self.run()
    result = self._result
    return result.predicted_mean, result.predicted_var

learn_parameters

learn_parameters(n_iter: int = 20)

Run Baum-Welch to learn HMM parameters from current observations.

Call this before run() if you want data-driven parameters.

Source code in fplx/inference/pipeline.py
def learn_parameters(self, n_iter: int = 20):
    """
    Fit the HMM parameters to the ingested observations via Baum-Welch.

    Call this before run() if you want data-driven parameters.
    """
    obs = self.observations
    if obs is None:
        raise RuntimeError("No observations. Call ingest_observations() first.")
    self.hmm.fit(obs, n_iter=n_iter)

batch_enriched_predict

batch_enriched_predict(
    players, alpha=0.3, fixture_info=None
)

Run enriched prediction for all players. Returns ep, var, downside_risk dicts.

Source code in fplx/inference/enriched.py
def batch_enriched_predict(players, alpha=0.3, fixture_info=None):
    """Run enriched prediction for every player.

    Returns three dicts keyed by player id: expected points, variance,
    and downside risk.
    """
    expected, variance, downside = {}, {}, {}
    for player in players:
        fixture = fixture_info.get(player.id) if fixture_info else None
        mu, var, dsr = enriched_predict(
            player.timeseries, player.position, alpha=alpha, upcoming_fixture=fixture
        )
        expected[player.id] = mu
        variance[player.id] = var
        downside[player.id] = dsr
    return expected, variance, downside

compute_xpoints

compute_xpoints(timeseries, position)

Compute per-GW expected points from ALL underlying components.

Source code in fplx/inference/enriched.py
def compute_xpoints(timeseries, position):
    """Compute per-GW expected points from ALL underlying components."""
    n = len(timeseries)
    if n == 0:
        return np.array([])

    mins = _safe_col(timeseries, "minutes")
    played = mins > 0
    # FPL appearance points: 2 for >= 60 minutes, 1 for any minutes, else 0.
    appearance = np.where(mins >= 60, 2.0, np.where(played, 1.0, 0.0))

    # Prefer expected-goal metrics; fall back to realized counts when the
    # xG/xA columns are entirely zero.
    xg = _safe_col(timeseries, "xG")
    if np.all(xg == 0):
        xg = _safe_col(timeseries, "goals").astype(float)
    xa = _safe_col(timeseries, "xA")
    if np.all(xa == 0):
        xa = _safe_col(timeseries, "assists").astype(float)

    components = [
        appearance,
        xg * GOAL_PTS.get(position, 4),
        xa * ASSIST_PTS,
        _safe_col(timeseries, "clean_sheets") * CS_PTS.get(position, 0),
        np.floor(_safe_col(timeseries, "goals_conceded") / 2.0) * GC_PTS.get(position, 0),
        _safe_col(timeseries, "bonus"),
        _safe_col(timeseries, "yellow_cards") * (-1),
        _safe_col(timeseries, "red_cards") * (-3),
        _safe_col(timeseries, "own_goals") * (-2),
        _safe_col(timeseries, "penalties_missed") * (-2),
    ]
    if position == "GK":
        # Goalkeeper-only scoring: 1 pt per 3 saves, 5 pts per penalty save.
        components.append(np.floor(_safe_col(timeseries, "saves") / 3.0))
        components.append(_safe_col(timeseries, "penalties_saved") * 5)

    # Zero out gameweeks with no minutes.
    return sum(components) * played

enriched_predict

enriched_predict(
    timeseries,
    position,
    alpha=0.3,
    lookback=15,
    upcoming_fixture=None,
)

Predict expected points with fixture awareness and semi-variance.

PARAMETER DESCRIPTION
timeseries

TYPE: DataFrame

position

TYPE: str

alpha

EWMA decay.

TYPE: float DEFAULT: 0.3

lookback

Max recent GWs (increased from 10 to 15 for more data).

TYPE: int DEFAULT: 15

upcoming_fixture

{"was_home": bool, "opponent_team": int, "xP": float}

TYPE: dict DEFAULT: None

RETURNS DESCRIPTION
expected_points

TYPE: float

variance

TYPE: float

downside_risk

TYPE: float (semi-deviation below E[P])

Source code in fplx/inference/enriched.py
def enriched_predict(timeseries, position, alpha=0.3, lookback=15, upcoming_fixture=None):
    """
    Predict expected points with fixture awareness and semi-variance.

    Parameters
    ----------
    timeseries : pd.DataFrame
    position : str
    alpha : float
        EWMA decay.
    lookback : int
        Max recent GWs (increased from 10 to 15 for more data).
    upcoming_fixture : dict, optional
        {"was_home": bool, "opponent_team": int, "xP": float}

    Returns
    -------
    expected_points : float
    variance : float
    downside_risk : float  (semi-deviation below E[P])
    """
    # No usable history: fall back to a zero forecast with wide variance.
    if timeseries.empty or "minutes" not in timeseries.columns:
        return 0.0, 4.0, 0.0

    ts = timeseries.tail(lookback).copy()
    mins = _safe_col(ts, "minutes")
    played_mask = mins > 0
    n_played = int(played_mask.sum())

    # Need at least two played matches to estimate rates and residuals.
    if n_played < 2:
        return 0.0, 4.0, 0.0

    # Recent availability: fraction of the last (up to) 3 GWs with minutes.
    # NOTE(review): relies on negative slicing of played_mask — assumes
    # _safe_col yields a numpy array (or a container where this slices
    # positionally); confirm against _safe_col's return type.
    avail = float(played_mask[-min(3, len(played_mask)) :].mean())
    if avail < 0.1:
        return 0.0, 1.0, 0.0

    # xPoints from all components
    xpts = compute_xpoints(ts, position)
    played_xpts = xpts[played_mask]

    # EWMA on played xPoints
    conditional_ep = max(0.0, _ewma(played_xpts, alpha))

    # Fixture adjustments: scale by the home/away split and, when the
    # opponent is known, an opponent-specific multiplier.
    fixture_mult = 1.0
    if upcoming_fixture:
        hf, af = _home_away_factor(timeseries)
        fixture_mult = hf if upcoming_fixture.get("was_home", False) else af
        opp_id = upcoming_fixture.get("opponent_team", 0)
        if opp_id > 0:
            fixture_mult *= _opponent_mult(timeseries, opp_id)
    conditional_ep *= fixture_mult

    # Ensemble with xP: blend 70/30 with the provider's xP when available.
    if upcoming_fixture and upcoming_fixture.get("xP", 0) > 0:
        conditional_ep = 0.7 * conditional_ep + 0.3 * upcoming_fixture["xP"]

    # Variance and semi-variance from residuals (actual minus expected pts).
    downside_risk = 0.0
    if "points" in ts.columns:
        pts = _safe_col(ts, "points")
        played_pts = pts[played_mask]
        residuals = played_pts - played_xpts
        var_estimate = float(np.var(residuals)) + 1.0

        # Semi-variance: only negative residuals (actual < expected)
        neg_residuals = residuals[residuals < 0]
        if len(neg_residuals) >= 2:
            downside_risk = float(np.sqrt(np.mean(neg_residuals**2)))
        else:
            # Too few downside samples: approximate with half the full sd.
            downside_risk = float(np.sqrt(var_estimate)) * 0.5
    else:
        var_estimate = 4.0
        downside_risk = 1.0

    # Fold availability into both moments: the Bernoulli "plays at all"
    # term contributes avail*(1-avail)*ep^2 to the variance.
    ep = conditional_ep * avail
    var_out = avail * var_estimate + avail * (1 - avail) * conditional_ep**2
    dr_out = downside_risk * avail

    return ep, var_out, dr_out

fuse_estimates

fuse_estimates(
    hmm_mean: float,
    hmm_var: float,
    kf_mean: float,
    kf_var: float,
) -> tuple[float, float]

Fuse a single HMM estimate with a single Kalman estimate.

Uses inverse-variance weighting: fused_mean = (hmm_mean/hmm_var + kf_mean/kf_var) / (1/hmm_var + 1/kf_var) fused_var = 1 / (1/hmm_var + 1/kf_var)

PARAMETER DESCRIPTION
hmm_mean

HMM expected points (from state posterior weighted emission means).

TYPE: float

hmm_var

HMM variance (law of total variance over state posterior).

TYPE: float

kf_mean

Kalman filtered point estimate.

TYPE: float

kf_var

Kalman filtered uncertainty (posterior variance).

TYPE: float

RETURNS DESCRIPTION
fused_mean

TYPE: float

fused_var

TYPE: float

Source code in fplx/inference/fusion.py
def fuse_estimates(
    hmm_mean: float,
    hmm_var: float,
    kf_mean: float,
    kf_var: float,
) -> tuple[float, float]:
    """
    Combine one HMM estimate with one Kalman estimate by inverse-variance
    (precision) weighting:

        fused_mean = (hmm_mean/hmm_var + kf_mean/kf_var) / (1/hmm_var + 1/kf_var)
        fused_var  = 1 / (1/hmm_var + 1/kf_var)

    Parameters
    ----------
    hmm_mean : float
        HMM expected points (from state posterior weighted emission means).
    hmm_var : float
        HMM variance (law of total variance over state posterior).
    kf_mean : float
        Kalman filtered point estimate.
    kf_var : float
        Kalman filtered uncertainty (posterior variance).

    Returns
    -------
    fused_mean : float
    fused_var : float
    """
    # Floor both variances so neither precision can blow up.
    safe_hmm_var = max(hmm_var, 1e-6)
    safe_kf_var = max(kf_var, 1e-6)

    weight_hmm = 1.0 / safe_hmm_var
    weight_kf = 1.0 / safe_kf_var
    weight_total = weight_hmm + weight_kf

    fused = (weight_hmm * hmm_mean + weight_kf * kf_mean) / weight_total
    return fused, 1.0 / weight_total

fuse_sequences

fuse_sequences(
    hmm_gamma: ndarray,
    kalman_x: ndarray,
    kalman_P: ndarray,
    emission_params: dict,
) -> tuple[ndarray, ndarray]

Fuse full sequences of HMM posteriors and Kalman estimates.

PARAMETER DESCRIPTION
hmm_gamma

Smoothed state posteriors from HMM.

TYPE: (ndarray, shape(T, N))

kalman_x

Kalman filtered estimates.

TYPE: (ndarray, shape(T))

kalman_P

Kalman filtered uncertainties.

TYPE: (ndarray, shape(T))

emission_params

{state_index: (mean, std)} from HMM.

TYPE: dict

RETURNS DESCRIPTION
fused_mean

TYPE: (ndarray, shape(T))

fused_var

TYPE: (ndarray, shape(T))

Source code in fplx/inference/fusion.py
def fuse_sequences(
    hmm_gamma: np.ndarray,
    kalman_x: np.ndarray,
    kalman_P: np.ndarray,
    emission_params: dict,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Precision-weighted fusion of HMM posteriors with a Kalman track, per timestep.

    Parameters
    ----------
    hmm_gamma : np.ndarray, shape (T, N)
        Smoothed state posteriors from HMM.
    kalman_x : np.ndarray, shape (T,)
        Kalman filtered estimates.
    kalman_P : np.ndarray, shape (T,)
        Kalman filtered uncertainties.
    emission_params : dict
        {state_index: (mean, std)} from HMM.

    Returns
    -------
    fused_mean : np.ndarray, shape (T,)
    fused_var : np.ndarray, shape (T,)
    """
    n_states = hmm_gamma.shape[1]
    mu = np.array([emission_params[s][0] for s in range(n_states)])
    sigma2 = np.array([emission_params[s][1] ** 2 for s in range(n_states)])

    # Mixture moments per timestep (law of total variance), floored for safety.
    hmm_mean = hmm_gamma @ mu
    hmm_var = hmm_gamma @ sigma2 + hmm_gamma @ (mu**2) - hmm_mean**2
    hmm_var = np.maximum(hmm_var, 1e-6)
    kf_var = np.maximum(kalman_P, 1e-6)

    # Inverse-variance weighting.
    weight_hmm = 1.0 / hmm_var
    weight_kf = 1.0 / kf_var
    weight_total = weight_hmm + weight_kf

    fused_mean = (weight_hmm * hmm_mean + weight_kf * kalman_x) / weight_total
    return fused_mean, 1.0 / weight_total

build_feature_matrix

build_feature_matrix(
    timeseries: DataFrame, position: str
) -> ndarray

Extract position-specific feature matrix from player timeseries.

PARAMETER DESCRIPTION
timeseries

Player gameweek history from vaastav dataset.

TYPE: DataFrame

position

GK, DEF, MID, or FWD.

TYPE: str

RETURNS DESCRIPTION
np.ndarray, shape (T, 2): column 0 is structural xPts (position-dependent scoring), column 1 is the minutes fraction.
Source code in fplx/inference/multivariate_hmm.py
def build_feature_matrix(timeseries: pd.DataFrame, position: str) -> np.ndarray:
    """
    Extract a position-specific feature matrix from player timeseries.

    Parameters
    ----------
    timeseries : pd.DataFrame
        Player gameweek history from vaastav dataset.
    position : str
        GK, DEF, MID, or FWD.

    Returns
    -------
    np.ndarray, shape (T, 2): column 0 is structural xPts (whose scoring
    weights depend on position), column 1 is the fraction of 90 minutes
    played.
    """
    n = len(timeseries)
    features = np.zeros((n, 2))

    # Minutes fraction, clipped to [0, 1].
    mins = _safe_col(timeseries, "minutes")
    features[:, 1] = np.clip(mins / 90.0, 0.0, 1.0)  # mins_frac

    # Domain-specific projection from rich event space to structural xPts.
    features[:, 0] = compute_xpoints(timeseries, position)
    return features