Skip to content

pipeline

pipeline

Per-player inference pipeline orchestrator.

This is the single entry point that FPLModel.fit() calls for each player. It coordinates HMM, Kalman Filter, signal injection, and fusion.

Usage: pipeline = PlayerInferencePipeline() pipeline.ingest_observations(points_array) pipeline.inject_news("Player ruled out for 3 weeks", timestep=20) pipeline.inject_fixture_difficulty(difficulty=4.5, timestep=21) results = pipeline.run() ep_mean, ep_var = pipeline.predict_next()

InferenceResult dataclass

InferenceResult(
    filtered_beliefs: ndarray,
    smoothed_beliefs: ndarray,
    viterbi_path: ndarray,
    hmm_predicted_mean: float = 0.0,
    hmm_predicted_var: float = 0.0,
    kalman_filtered: ndarray = (lambda: array([]))(),
    kalman_uncertainty: ndarray = (lambda: array([]))(),
    kf_predicted_mean: float = 0.0,
    kf_predicted_var: float = 0.0,
    fused_mean: ndarray = (lambda: array([]))(),
    fused_var: ndarray = (lambda: array([]))(),
    fusion_alpha: Optional[float] = None,
    predicted_mean: float = 0.0,
    predicted_var: float = 0.0,
)

Container for inference pipeline outputs.

PlayerInferencePipeline

PlayerInferencePipeline(
    hmm_params: Optional[dict] = None,
    kf_params: Optional[dict] = None,
    hmm_variance_floor: float = 1.0,
    news_params: Optional[dict] = None,
    fusion_mode: str = "precision",
    fusion_params: Optional[dict] = None,
)

Orchestrates HMM + Kalman inference for a single player.

PARAMETER DESCRIPTION
hmm_params

Override HMM parameters: transition_matrix, emission_params, initial_dist.

TYPE: dict DEFAULT: None

kf_params

Override Kalman parameters: Q, R, x0, P0.

TYPE: dict DEFAULT: None

Source code in fplx/inference/pipeline.py
def __init__(
    self,
    hmm_params: Optional[dict] = None,
    kf_params: Optional[dict] = None,
    hmm_variance_floor: float = 1.0,
    news_params: Optional[dict] = None,
    fusion_mode: str = "precision",
    fusion_params: Optional[dict] = None,
):
    hmm_params = hmm_params or {}
    kf_params = kf_params or {}

    self.hmm = HMMInference(
        transition_matrix=hmm_params.get("transition_matrix"),
        emission_params=hmm_params.get("emission_params"),
        initial_dist=hmm_params.get("initial_dist"),
    )
    self.kf = KalmanFilter(
        process_noise=kf_params.get("process_noise", 1.0),
        observation_noise=kf_params.get("observation_noise", 4.0),
        initial_state_mean=kf_params.get("initial_state_mean", 4.0),
        initial_state_covariance=kf_params.get("initial_state_covariance", 2.0),
    )
    self.hmm_variance_floor = max(float(hmm_variance_floor), 1e-6)
    self.news_params = _merge_nested_dicts(DEFAULT_NEWS_PARAMS, news_params or {})
    self.fusion_mode = fusion_mode
    self.fusion_params = _merge_nested_dicts(DEFAULT_FUSION_PARAMS, fusion_params or {})
    if self.fusion_mode not in {"precision", "calibrated_alpha"}:
        raise ValueError(
            f"Unknown fusion_mode '{self.fusion_mode}'. Expected one of: 'precision', 'calibrated_alpha'."
        )

    self.observations: Optional[np.ndarray] = None
    self._result: Optional[InferenceResult] = None

ingest_observations

ingest_observations(points: ndarray)

Set the player's historical points sequence.

PARAMETER DESCRIPTION
points

Weekly points history.

TYPE: (ndarray, shape(T))

Source code in fplx/inference/pipeline.py
def ingest_observations(self, points: np.ndarray):
    """
    Set the player's historical points sequence.

    Parameters
    ----------
    points : np.ndarray, shape (T,)
        Weekly points history.
    """
    self.observations = np.asarray(points, dtype=float)
    self._result = None  # invalidate cached result

inject_news

inject_news(news_signal: dict, timestep: int)

Inject a news signal into the inference at a specific gameweek.

Bridges from existing NewsSignal.generate_signal() output format.

PARAMETER DESCRIPTION
news_signal

Output from NewsSignal.generate_signal(). Must contain: 'availability', 'minutes_risk', 'confidence'.

TYPE: dict

timestep

The gameweek index to apply the perturbation.

TYPE: int

Source code in fplx/inference/pipeline.py
def inject_news(
    self,
    news_signal: dict,
    timestep: int,
):
    """
    Inject a news signal into the inference at a specific gameweek.

    Bridges from existing NewsSignal.generate_signal() output format.

    Parameters
    ----------
    news_signal : dict
        Output from NewsSignal.generate_signal(). Must contain:
        'availability', 'minutes_risk', 'confidence'.
    timestep : int
        The gameweek index to apply the perturbation.
    """
    category = _classify_news(
        news_signal.get("availability", 1.0),
        news_signal.get("minutes_risk", 0.0),
        self.news_params.get("classification_thresholds"),
    )
    confidence = news_signal.get(
        "confidence",
        float(self.news_params.get("default_confidence", 0.6)),
    )

    perturbation_map = self.news_params.get("perturbation_map", DEFAULT_NEWS_PERTURBATION_MAP)
    perturbation = perturbation_map.get(
        category,
        perturbation_map.get("neutral", {"state_boost": {}, "kalman_shock": 1.0}),
    )

    # Inject into HMM
    state_boost = perturbation.get("state_boost", {})
    if state_boost:
        self.hmm.inject_news_perturbation(
            timestep=timestep,
            state_boost=state_boost,
            confidence=confidence,
        )

    # Inject into Kalman
    kalman_shock = float(perturbation.get("kalman_shock", 1.0))
    if kalman_shock != 1.0:
        self.kf.inject_process_shock(
            timestep=timestep,
            multiplier=kalman_shock,
        )

inject_fixture_difficulty

inject_fixture_difficulty(difficulty: float, timestep: int)

Inject fixture difficulty into Kalman observation noise.

PARAMETER DESCRIPTION
difficulty

Fixture difficulty score (1-5, from FixtureSignal).

TYPE: float

timestep

The gameweek index.

TYPE: int

Source code in fplx/inference/pipeline.py
def inject_fixture_difficulty(self, difficulty: float, timestep: int):
    """
    Inject fixture difficulty into Kalman observation noise.

    Parameters
    ----------
    difficulty : float
        Fixture difficulty score (1-5, from FixtureSignal).
    timestep : int
        The gameweek index.
    """
    noise_factor = _difficulty_to_noise_factor(difficulty)
    self.kf.inject_observation_noise(timestep=timestep, factor=noise_factor)

run

run() -> InferenceResult

Run full inference pipeline: HMM + Kalman + Fusion.

RETURNS DESCRIPTION
InferenceResult

All inference outputs.

Source code in fplx/inference/pipeline.py
def run(self) -> InferenceResult:
    """
    Run full inference pipeline: HMM + Kalman + Fusion.

    Returns
    -------
    InferenceResult
        All inference outputs.
    """
    if self.observations is None or len(self.observations) == 0:
        raise RuntimeError("No observations ingested. Call ingest_observations().")

    obs = self.observations

    # HMM
    alpha, _ = self.hmm.forward(obs)
    gamma = self.hmm.forward_backward(obs)
    viterbi_path = self.hmm.viterbi(obs)
    hmm_pred_mean, hmm_pred_var, _ = self.hmm.predict_next(obs)

    # Kalman
    kf_x, kf_P = self.kf.filter(obs)
    kf_pred_mean, kf_pred_var = self.kf.predict_next()

    fusion_alpha = None
    if self.fusion_mode == "calibrated_alpha":
        fusion_alpha = self._estimate_fusion_alpha(obs)
        hmm_seq_mean, hmm_seq_var = self._hmm_sequence_moments(gamma)

        fused_mean = fusion_alpha * kf_x + (1.0 - fusion_alpha) * hmm_seq_mean
        fused_var = fusion_alpha**2 * np.maximum(kf_P, 1e-6) + (1.0 - fusion_alpha) ** 2 * np.maximum(
            hmm_seq_var, self.hmm_variance_floor
        )

        pred_mean = fusion_alpha * kf_pred_mean + (1.0 - fusion_alpha) * hmm_pred_mean
        pred_var = fusion_alpha**2 * max(kf_pred_var, 1e-6) + (1.0 - fusion_alpha) ** 2 * max(
            hmm_pred_var, self.hmm_variance_floor
        )
    else:
        # Fusion (full sequence, smoothed)
        # Apply an HMM variance floor so HMM does not become unrealistically
        # overconfident and dominate precision-weighted fusion.
        emission_params_for_fusion = {
            s: (mu, max(std, np.sqrt(self.hmm_variance_floor)))
            for s, (mu, std) in self.hmm.emission_params.items()
        }
        fused_mean, fused_var = fuse_sequences(gamma, kf_x, kf_P, emission_params_for_fusion)

        # Fused one-step-ahead prediction
        pred_mean, pred_var = fuse_estimates(
            hmm_pred_mean,
            max(hmm_pred_var, self.hmm_variance_floor),
            kf_pred_mean,
            kf_pred_var,
        )

    self._result = InferenceResult(
        filtered_beliefs=alpha,
        smoothed_beliefs=gamma,
        viterbi_path=viterbi_path,
        hmm_predicted_mean=hmm_pred_mean,
        hmm_predicted_var=hmm_pred_var,
        kalman_filtered=kf_x,
        kalman_uncertainty=kf_P,
        kf_predicted_mean=kf_pred_mean,
        kf_predicted_var=kf_pred_var,
        fused_mean=fused_mean,
        fused_var=fused_var,
        fusion_alpha=fusion_alpha,
        predicted_mean=pred_mean,
        predicted_var=pred_var,
    )

    return self._result

predict_next

predict_next() -> tuple[float, float]

Get the fused one-step-ahead forecast.

RETURNS DESCRIPTION
expected_points

TYPE: float

variance

TYPE: float

Source code in fplx/inference/pipeline.py
def predict_next(self) -> tuple[float, float]:
    """
    Get the fused one-step-ahead forecast.

    Returns
    -------
    expected_points : float
    variance : float
    """
    if self._result is None:
        self.run()
    return self._result.predicted_mean, self._result.predicted_var

learn_parameters

learn_parameters(n_iter: int = 20)

Run Baum-Welch to learn HMM parameters from current observations.

Call this before run() if you want data-driven parameters.

Source code in fplx/inference/pipeline.py
def learn_parameters(self, n_iter: int = 20):
    """
    Run Baum-Welch to learn HMM parameters from current observations.

    Call this before run() if you want data-driven parameters.
    """
    if self.observations is None:
        raise RuntimeError("No observations. Call ingest_observations() first.")
    self.hmm.fit(self.observations, n_iter=n_iter)