fastcashflow.statemodel의 소스 코드

"""The in-force state machine -- a product's states and transitions as data.

Phase (b) generalises the in-force projection from a single survival track to
an N-state Markov occupancy model. In-force is an occupancy vector ``occ`` over
a small set of transient states; each month a transition matrix advances it,
``occ[t+1] = occ[t] @ P[t]``. The kernels -- ``projection._project_kernel``,
the engine's codegen fast kernel and the CUDA kernel -- run that recursion on a flat
edge list and are state-machine-agnostic: they carry no hardcoded state set.

This module is the product-facing layer. A :class:`StateModel` declares the
states, their transitions and which states pay premium or a benefit, all as
data. States can *be* data -- rather than a per-product DSL -- because the
occupancy recursion treats every state identically; there is no per-state
engine logic. (Coverage ``type``, by contrast, needs per-type logic and so
stays a fixed vocabulary.)

:func:`compile_state_model` turns a :class:`StateModel` plus the evaluated
assumption rates into the flat edge arrays the kernels consume.

The transition probabilities follow the standard ordered multiple-decrement
model. A state's transitions are applied IN ORDER as competing decrements:
transition ``i`` fires, among the entrants to the state, with the dependent
probability ``rate_i * prod_{j<i}(1 - rate_j)`` -- it acts on the survivors of
every earlier transition. The residual ``prod_j(1 - rate_j)`` stays in the
state. A transition either moves occupancy to another transient state (waiver
inception: active -> waiver; recovery: disabled -> active) or removes it from
the in-force set entirely (death, lapse). The fulfilment cash flows reflect
the contract's actual terms at the measurement date (IFRS 17 Sec. 33-34).
"""
from __future__ import annotations

from collections.abc import Mapping
from dataclasses import dataclass
from types import MappingProxyType

import numpy as np

from fastcashflow._typing import FloatArray, IntArray


@dataclass(frozen=True, slots=True)
class CompiledStateModel:
    """Flat edge-tensor view of a compiled :class:`StateModel`.

    Returned by both :func:`compile_state_model` (Markov) and
    :func:`compile_state_model_with_duration` (semi-Markov). The two
    paths share this shape; ``state_duration_max`` distinguishes them --
    ``None`` from the Markov compile, an ``(n_states,)`` int array from
    the semi-Markov compile (per-state cohort count, 1 for untracked
    states).

    Fields
    ------
    edge_from, edge_to
        ``(n_edges,)`` int arrays of source and destination state indices.
    edge_prob
        Transition probabilities. Markov: ``(n_edges, *grid)``.
        Semi-Markov: ``(n_edges, *grid, max_D)`` with cohort axis last.
    edge_lump_sum
        ``(n_edges,)`` bool, the lump-sum transitions.
    n_states
        The number of transient states.
    premium_state, benefit_state
        ``(n_states,)`` bool flags.
    state_duration_max
        ``None`` for Markov; ``(n_states,)`` int with the effective cohort
        count per state for semi-Markov.
    benefit_max_months
        ``None`` for Markov; ``(n_states,)`` int with the per-state monthly
        benefit cap (``0`` = unbounded). Sojourn cohorts ``tau >= cap`` stop
        being paid while the lives stay in force.
    """
    edge_from: IntArray
    edge_to: IntArray
    edge_prob: FloatArray
    edge_lump_sum: np.ndarray
    n_states: int
    premium_state: np.ndarray
    benefit_state: np.ndarray
    state_duration_max: IntArray | None = None
    benefit_max_months: IntArray | None = None
    # Per-state exact in-force death-exit probability -- ``survive x mortality``,
    # where ``survive`` is the product of ``(1 - rate)`` over the transitions
    # listed before mortality in the state. The deaths reporter multiplies it by
    # the occupancy, so the death count respects the within-month competing-risk
    # order (it equals the raw rate exactly when mortality is the first
    # transition, which every bundled model declares). ``(n_states, *grid)``.
    state_death_exit: FloatArray | None = None
    # Per-state death-benefit multiplier (``State.death_benefit_factor``),
    # ``(n_states,)`` float, all-ones default. Occupancy-weighted into the
    # aggregate death claim: ``claim = (sum_s occ[s]*factor[s]) * claim_rate``.
    state_death_benefit_factor: FloatArray | None = None
    # Per-state sojourn exit (``State.exit_after``), ``(n_states,)`` int, zeros
    # default. Semi-Markov only: a cohort reaching this sojourn leaves the
    # in-force set (no occ_next write). ``None`` for Markov models.
    state_exit_after: IntArray | None = None


[문서] @dataclass(frozen=True, slots=True) class Transition: """One transition out of a state. ``rate`` names an assumption rate -- ``"mortality"``, ``"lapse"``, ``"waiver_incidence"`` and so on -- evaluated by the engine and supplied to :func:`compile_state_model`. ``to`` is the destination state's name when the transition moves occupancy to another transient state (waiver inception, recovery, reincidence), or ``None`` when it removes occupancy from the in-force set entirely (death, lapse). ``lump_sum`` flags a transition that pays a one-off benefit when it fires -- the ``ModelPoints.disability_benefit`` amount times the transitioning occupancy. It applies only to a transition with a destination; death and diagnosis lump sums stay on the coverage list. ``duration_dependent`` flags a semi-Markov transition: the rate depends on the **sojourn time** in the source state (time since entering it), not just on the policy duration. The source state must have ``duration_max > 0`` -- the engine tracks per-cohort occupancy there. The rate function for a duration-dependent transition takes a fourth argument ``state_duration`` (months in source state). """ rate: str to: str | None = None lump_sum: bool = False duration_dependent: bool = False
[문서] @dataclass(frozen=True, slots=True) class State: """One transient state of the in-force model. ``premium`` flags a premium-paying state -- the level and single premium accrue on the occupancy of the states so flagged. ``benefit`` flags a benefit-paying state -- the ``ModelPoints.disability_income`` amount is paid each month its occupancy is held (disability income on a disabled state). ``transitions`` are the transitions out of the state, held in application order: the competing-decrement convention (see the module docstring) applies each in turn to the survivors of the previous. ``duration_max`` switches the state to a **semi-Markov** model. When set to ``D > 0``, the engine tracks ``D`` monthly cohorts of in-force in this state (cohort 0 entered this month, cohort 1 entered last month, and cohort ``D - 1`` absorbs everyone who has been here ``D - 1`` months or longer). Transitions with ``duration_dependent=True`` then receive a cohort index and may carry different rates per cohort -- the natural way to express recovery, reincidence, exclusion (면책) periods, and other duration-since-entry effects. The default ``0`` keeps the state Markov (a single cohort, identical to the pre-Phase-(c) behaviour). ``mortality_rate`` routes this state's in-force death decrement to a named rate (default ``"mortality"``, the global decrement). A post-diagnosis state can carry an elevated death rate by naming a different rate, supplied via ``Basis.state_mortality_annual``. ``benefit_max_months`` caps how many months a ``benefit`` state pays (``0`` = unbounded); see the field comment in ``__post_init__``. ``death_benefit_factor`` scales the death-coverage benefit paid for the lives residing in this state (default ``1.0`` = no change). The aggregate death claim is occupancy-weighted: ``claim = (sum_s occ[s]*factor[s]) * claim_rate``. It multiplies the benefit AMOUNT, not the decrement, so the death count is unchanged. A post-diagnosis state paying a richer death benefit (e.g. 2x after a cancer diagnosis) sets ``death_benefit_factor=2.0``. Supported on the full path only (``measure(full=True)``); the fast path and the VFA path reject a non-default factor. ``exit_after`` removes occupancy from the in-force set once a cohort's sojourn in this state reaches ``exit_after`` months (default ``0`` = never). Semi-Markov only -- it needs sojourn tracking, so the state must set ``duration_max``. Distinct from ``benefit_max_months`` (which stops the payment but keeps the lives in force): ``exit_after`` ends the cover. A guaranteed-payout state that pays a fixed term and then lapses sets ``benefit_max_months`` (pay window) and ``exit_after`` (cover end) together. """ name: str premium: bool = False benefit: bool = False transitions: tuple[Transition, ...] = () duration_max: int = 0 benefit_max_months: int = 0 mortality_rate: str = "mortality" death_benefit_factor: float = 1.0 exit_after: int = 0 def __post_init__(self) -> None: object.__setattr__(self, "transitions", tuple(self.transitions)) object.__setattr__(self, "duration_max", int(self.duration_max)) if self.duration_max < 0: raise ValueError( f"state {self.name!r}: duration_max must be non-negative, " f"got {self.duration_max}" ) # ``benefit_max_months`` caps how many months a benefit state pays -- # the monthly ``disability_income`` stops once a cohort's sojourn # reaches the cap, while the lives stay in force (a guaranteed-payout # LTC / dementia annuity: pay 36 months, then keep cover with no # payment). ``0`` means unbounded (the historical behaviour). cap = int(self.benefit_max_months) object.__setattr__(self, "benefit_max_months", cap) if cap < 0: raise ValueError( f"state {self.name!r}: benefit_max_months must be " f"non-negative, got {cap}" ) if cap > 0: if not self.benefit: raise ValueError( f"state {self.name!r}: benefit_max_months > 0 requires " f"benefit=True (a cap on a non-paying state has no effect)" ) # Strict ``duration_max > cap``: the absorbing cohort (D-1) holds # everyone with sojourn >= D-1, so if ``cap == duration_max`` the # capped lives pile into a cohort still inside ``tau < cap`` and # keep being paid forever. One guard cohort past the cap is needed. if self.duration_max <= cap: raise ValueError( f"state {self.name!r}: duration_max ({self.duration_max}) " f"must exceed benefit_max_months ({cap}); otherwise the " f"absorbing cohort re-accumulates capped lives and keeps " f"paying. Set duration_max > benefit_max_months." ) # ``death_benefit_factor`` multiplies the death-coverage benefit amount # for lives in this state. Non-negative; defaults 1.0 (no change). factor = float(self.death_benefit_factor) object.__setattr__(self, "death_benefit_factor", factor) if factor < 0.0: raise ValueError( f"state {self.name!r}: death_benefit_factor must be " f"non-negative, got {factor}" ) # ``exit_after`` drops the cohort from the in-force set at the given # sojourn. Semi-Markov only -- needs duration tracking; and strictly # below the absorbing cohort, mirroring the benefit_max_months guard: # at exit_after == duration_max the absorbing advance maxes next_tau at # D-1 < exit_after, so the gate never fires (silent hold-forever). exit_after = int(self.exit_after) object.__setattr__(self, "exit_after", exit_after) if exit_after < 0: raise ValueError( f"state {self.name!r}: exit_after must be non-negative, " f"got {exit_after}" ) if exit_after > 0: if self.duration_max <= exit_after: raise ValueError( f"state {self.name!r}: duration_max ({self.duration_max}) " f"must exceed exit_after ({exit_after}); otherwise the " f"absorbing cohort never reaches the exit boundary and the " f"cohort is held forever. Set duration_max > exit_after " f"(semi-Markov: exit_after needs duration tracking)." ) if cap > 0 and exit_after < cap: raise ValueError( f"state {self.name!r}: exit_after ({exit_after}) must be " f">= benefit_max_months ({cap}); pay the cap, then exit. " f"The valid stack is duration_max > exit_after >= " f"benefit_max_months." )
[문서] @dataclass(frozen=True, slots=True) class StateModel: """A product's in-force state machine, declared as data. ``states`` are the transient states; position fixes the kernel state index, and state 0 is the issue state. ``seating`` maps a model point's input contract state -- the ``ModelPoints.state`` code (``STATE_ACTIVE``, ``STATE_WAIVER``, ``STATE_PAIDUP``) -- to the index of the state its in-force is seated on at the valuation date: ``seating[code]`` is that index. It defaults to seating every model point on state 0. The occupancy recursion treats every state identically, so an arbitrary StateModel runs on the existing kernels with no per-product code -- see the module docstring and :func:`compile_state_model`. """ states: tuple[State, ...] seating: tuple[int, ...] = (0,) def __post_init__(self) -> None: states = tuple(self.states) object.__setattr__(self, "states", states) object.__setattr__(self, "seating", tuple(int(s) for s in self.seating)) if not states: raise ValueError("a StateModel needs at least one state") names = {s.name for s in states} if len(names) != len(states): raise ValueError("state names must be unique") for s in states: for tr in s.transitions: if tr.to is not None and tr.to not in names: raise ValueError( f"state {s.name!r} has a transition to an unknown " f"state {tr.to!r}" ) if tr.lump_sum and tr.to is None: raise ValueError( f"state {s.name!r} has a lump-sum transition with no " f"destination; a lump sum attaches to a transition" ) if tr.duration_dependent and s.duration_max <= 0: raise ValueError( f"state {s.name!r} has a duration_dependent " f"transition {tr.rate!r} but its duration_max is 0; " f"set duration_max > 0 to track cohorts" ) if any(not 0 <= i < len(states) for i in self.seating): raise ValueError( f"seating index out of range for a {len(states)}-state model" ) @property def n_states(self) -> int: """Number of transient states.""" return len(self.states)
# The default in-force model -- two transient states. ``active`` pays premium # and is subject to mortality, waiver inception and lapse; ``waiver`` (premium # waived on a triggering event) keeps the coverage in force, pays no premium # and is subject to mortality alone -- it does not lapse. The waiver-inception # transition moves active in-force onto the waiver state. ``seating`` seats # STATE_ACTIVE (code 0) on the active state and both STATE_WAIVER (1) and # STATE_PAIDUP (2) on the waiver state: a paid-up contract and a waiver # contract have identical cash flows, differing only in the cause premiums # ceased. WAIVER_MODEL = StateModel( states=( State("active", premium=True, transitions=( Transition("mortality"), Transition("waiver_incidence", to="waiver"), Transition("lapse"), )), State("waiver", premium=False, transitions=( Transition("mortality"), )), ), seating=(0, 1, 1), ) # Three-state variant -- active / waiver / paid-up as *separate* states. # Unlike WAIVER_MODEL (which seats paid-up onto the waiver state, giving the # two identical cash flows), this model keeps paid-up distinct so it can carry # its own lapse: the paid-up state references the ``lapse_paidup`` rate # (Basis.lapse_paidup_annual, falling back to lapse_annual). The Korean # post-payment (납입후) lapse jump is the motivating case -- a contract that # has finished paying premium typically surrenders at a different rate than a # premium-paying active. Paid-up still has no premium and is exposed to # mortality + its own lapse; there is no waiver-inception out of paid-up (you # cannot waive a premium you no longer pay). ``seating`` seats STATE_ACTIVE on # active (0), STATE_WAIVER on waiver (1) and STATE_PAIDUP on paid-up (2). # There is no active -> paid-up transition: paid-up contracts are seated on # the paid-up state at the valuation date (an in-force valuation of the # 납입후 cohort), since premium cessation is a ``premium_term_months`` control, # not a modelled transition. WAIVER_PAIDUP_MODEL = StateModel( states=( State("active", premium=True, transitions=( Transition("mortality"), Transition("waiver_incidence", to="waiver"), Transition("lapse"), )), State("waiver", premium=False, transitions=( Transition("mortality"), )), State("paidup", premium=False, transitions=( Transition("mortality"), Transition("lapse_paidup"), )), ), seating=(0, 1, 2), ) # Named registry of bundled StateModels. A non-programmer actuary can pick a # topology by name -- in the ``segments`` sheet's ``state_model`` column, in # Python via ``STATE_MODELS["WAIVER"]``, or anywhere else a string label is a # natural input. Additions land here as fixed-vocabulary entries -- the same # convention as the coverage CalculationMethods; users with a topology outside # the registry still build their own ``StateModel`` in code. # # Exposed through a read-only mapping so a stray ``STATE_MODELS["WAIVER"] = # my_custom_model`` from user / plugin code cannot silently swap the # bundled topology process-wide (which would change every later segment that # resolves "WAIVER" by name). Lookup (``STATE_MODELS["WAIVER"]``) and # iteration (``sorted(STATE_MODELS)``) work as before; mutation raises # ``TypeError``. STATE_MODELS: Mapping[str, StateModel] = MappingProxyType({ "WAIVER": WAIVER_MODEL, "WAIVER_PAIDUP": WAIVER_PAIDUP_MODEL, }) def model_references_rate(model: StateModel, rate_name: str) -> bool: """Return True if any transition in the model references ``rate_name``. The engine builds the rate dict it hands to :func:`compile_state_model` from the rates the resolved model actually references, so an optional rate (e.g. ``lapse_paidup``) is built only when the topology in play uses it. """ return any(tr.rate == rate_name for s in model.states for tr in s.transitions) def is_semi_markov(model: StateModel) -> bool: """Return True if any state in the model tracks duration cohorts. A semi-Markov state has ``duration_max > 0`` and tracks per-cohort occupancy; its outgoing transitions may then be ``duration_dependent``. A model with no such state is pure Markov and runs through the original :func:`compile_state_model` path. """ return any(s.duration_max > 0 for s in model.states) def resolve_state_model(basis) -> "StateModel": """Return the StateModel driving the projection for these basis. Uses the caller-supplied ``basis.state_model`` when set, and falls back to the bundled :data:`WAIVER_MODEL` -- the most common Korean protection topology, active / waiver / paid-up. Centralising the fallback keeps the engine and the projection layer from drifting. """ return basis.state_model or WAIVER_MODEL def compile_state_model( model: StateModel, rates: dict[str, FloatArray] ) -> CompiledStateModel: """Compile a StateModel and its rates into the kernel edge arrays. ``rates`` maps each rate name a transition references to its evaluated array; the arrays broadcast to a common grid shape -- the kernels index its trailing axes (per model point, or per sex / age / duration). Returns a :class:`CompiledStateModel` with ``state_duration_max=None``. Each state contributes one edge per transition with a transient destination -- carrying that transition's dependent probability -- plus one stay-in-state edge carrying the residual (see the module docstring). A transition that exits the in-force set contributes no edge: its occupancy simply leaves the recursion. This function is **Markov-only**: it raises ``ValueError`` if the model has any state with ``duration_max > 0``. Use :func:`compile_state_model_with_duration` for semi-Markov models. """ if is_semi_markov(model): raise ValueError( "compile_state_model is Markov-only; use " "compile_state_model_with_duration for a model with " "duration-tracked states" ) for s in model.states: if s.exit_after > 0: raise ValueError( f"state {s.name!r}: exit_after needs sojourn tracking and is " f"semi-Markov only; set duration_max to switch the state to a " f"semi-Markov model" ) arrays = {name: np.asarray(arr, dtype=np.float64) for name, arr in rates.items()} if not arrays: raise ValueError("compile_state_model needs at least one rate array") grid = np.broadcast_shapes(*(a.shape for a in arrays.values())) index = {s.name: i for i, s in enumerate(model.states)} edge_from: list[int] = [] edge_to: list[int] = [] edge_prob: list[FloatArray] = [] edge_lump: list[bool] = [] death_exit_rows: list[FloatArray] = [] for i, state in enumerate(model.states): # ``survive`` accumulates prod_{j}(1 - rate_j) across the transitions # applied so far; a leaving transition fires on those survivors. survive = np.ones(grid) death_exit = np.zeros(grid) # exact death exit for the deaths reporter for tr in state.transitions: # A state's mortality decrement is routed to its own rate name # (State.mortality_rate, default "mortality") so a post-diagnosis # state can carry an elevated mortality without re-declaring the # transition. Any other rate name passes through unchanged. rname = (state.mortality_rate if tr.rate == "mortality" else tr.rate) try: rate = arrays[rname] except KeyError: raise ValueError( f"state {state.name!r} references rate {rname!r}, " f"which was not supplied to compile_state_model" ) from None if tr.rate == "mortality": # The death exit fires on whoever survived the earlier # transitions this month -- ``survive`` here is that product. death_exit = survive * rate if tr.to is not None: edge_from.append(i) edge_to.append(index[tr.to]) edge_prob.append(survive * rate) edge_lump.append(tr.lump_sum) survive = survive * (1.0 - rate) edge_from.append(i) # the residual stays in the state edge_to.append(i) edge_prob.append(survive) edge_lump.append(False) death_exit_rows.append(death_exit) return CompiledStateModel( edge_from=np.array(edge_from, dtype=np.int64), edge_to=np.array(edge_to, dtype=np.int64), edge_prob=np.ascontiguousarray(np.stack(edge_prob)), edge_lump_sum=np.array(edge_lump, dtype=np.bool_), n_states=len(model.states), premium_state=np.array([s.premium for s in model.states], dtype=np.bool_), benefit_state=np.array([s.benefit for s in model.states], dtype=np.bool_), state_duration_max=None, state_death_exit=np.ascontiguousarray(np.stack(death_exit_rows)), state_death_benefit_factor=np.array( [s.death_benefit_factor for s in model.states], dtype=np.float64), state_exit_after=None, ) def compile_state_model_with_duration( model: StateModel, rates: dict[str, FloatArray] ) -> CompiledStateModel: """Compile a semi-Markov StateModel into duration-aware kernel arrays. The cohort-aware counterpart of :func:`compile_state_model`. States declared with ``duration_max > 0`` are tracked by monthly cohort: the occupancy is a length-``duration_max`` vector indexed by sojourn time (months since entering the state, with the last cohort absorbing everyone who has been there at least ``duration_max - 1`` months). Transitions marked ``duration_dependent=True`` may then carry different rates per cohort. ``rates`` carries one array per rate name referenced by the model's transitions. Static (non-duration-dependent) rates broadcast to the ``grid`` shape -- the same convention as the Markov path. A duration- dependent rate has an extra trailing axis of length ``duration_max`` for the source state (cohort axis). Returns a :class:`CompiledStateModel`: * ``edge_from`` / ``edge_to`` -- ``(n_edges,)`` state indices. ``edge_to == edge_from`` marks the residual stay edge (cohort advances by one). * ``edge_prob`` -- ``(n_edges, *grid, max_D)`` where ``max_D`` is the max ``duration_max`` across states (1 if no state is tracked). The tau axis carries the cohort index for the source state; for an edge out of an untracked state only ``tau = 0`` is meaningful. * ``edge_lump_sum`` -- ``(n_edges,)`` bool, the lump-sum transitions. * ``n_states`` -- the number of transient states. * ``premium_state`` / ``benefit_state`` -- ``(n_states,)`` bool. * ``state_duration_max`` -- ``(n_states,)`` int. The effective cohort count per state (``max(s.duration_max, 1)``). Untracked states have value 1; tracked states have the declared ``duration_max``. """ arrays = {name: np.asarray(arr, dtype=np.float64) for name, arr in rates.items()} if not arrays: raise ValueError( "compile_state_model_with_duration needs at least one rate array" ) index = {s.name: i for i, s in enumerate(model.states)} # Effective cohort count per state: untracked -> 1, tracked -> duration_max. state_duration_max = np.array( [max(s.duration_max, 1) for s in model.states], dtype=np.int64, ) max_D = int(state_duration_max.max()) # The grid (sex, age, year, ...) is the broadcast of the static-rate # shapes -- the duration-dependent rates share that grid with an extra # trailing cohort axis. Inferring it from the *static* rates avoids # baking the cohort axis into the grid. static_shapes = [] for name, arr in arrays.items(): any_dyn = any(tr.rate == name and tr.duration_dependent for s in model.states for tr in s.transitions) if any_dyn: static_shapes.append(arr.shape[:-1]) else: static_shapes.append(arr.shape) grid = np.broadcast_shapes(*static_shapes) grid_ndim = len(grid) def rate_at(rate_name: str, src_state: State, tau: int) -> FloatArray: """Evaluate a rate at cohort ``tau`` of the source state. For a non-duration-dependent transition the array is returned as is. For a duration-dependent one the tau-th slice of the trailing cohort axis is taken. """ arr = arrays[rate_name] # Determine whether *this* transition reads the rate as dynamic; # the same rate name may be referenced by both kinds across states. # We pass the source state to look up the transition's flag. for tr in src_state.transitions: if tr.rate == rate_name: if tr.duration_dependent: if arr.ndim != grid_ndim + 1: raise ValueError( f"rate {rate_name!r} is duration_dependent in " f"state {src_state.name!r} but its array shape " f"{arr.shape} has no cohort axis" ) if arr.shape[-1] < src_state.duration_max: raise ValueError( f"rate {rate_name!r} cohort axis " f"{arr.shape[-1]} shorter than state " f"{src_state.name!r} duration_max " f"{src_state.duration_max}" ) return arr[..., tau] return arr # Reached when ``rate_name`` is supplied for this state but is not the # rate of any of its transitions (e.g. a state's own mortality routed # by name without a matching transition row); the un-indexed array is # the correct static (non-cohort) fallback. NOT an invariant break -- # a review flagged this as "unreachable", but several models reach it. return arr edge_from: list[int] = [] edge_to: list[int] = [] edge_prob_blocks: list[FloatArray] = [] # one (max_D, *grid) per edge edge_lump: list[bool] = [] death_exit_rows: list[FloatArray] = [] # per-state exact death exit (cohort 0) for i, state in enumerate(model.states): # Validate this state's transitions reference rates we have. A # "mortality" transition routes to the state's own mortality rate # name (State.mortality_rate), so validate the effective name. for tr in state.transitions: rname = (state.mortality_rate if tr.rate == "mortality" else tr.rate) if rname not in arrays: raise ValueError( f"state {state.name!r} references rate {rname!r}, " f"which was not supplied to " f"compile_state_model_with_duration" ) D = max(state.duration_max, 1) # Edges produced by this state are emitted in declaration order: # first the transient transitions (one per Transition with `to`), # then the residual stay edge. We collect their per-edge cohort # blocks here and pad to max_D below. out_edges_to: list[int] = [] out_edges_lump: list[bool] = [] out_edges_blocks: list[np.ndarray] = [] # each shape (D, *grid) # Compose one cohort at a time. For each cohort tau, run the # ordered competing-decrement composition using rate values that # depend on tau when the transition is duration_dependent. # We accumulate per-edge probabilities along the tau axis. per_edge_per_tau: list[list[np.ndarray]] = [ [] for _ in range(len([tr for tr in state.transitions if tr.to is not None])) ] res_per_tau: list[np.ndarray] = [] death_exit = np.zeros(grid) # exact death exit (cohort 0) for the reporter for tau in range(D): survive = np.ones(grid) transient_idx = 0 for tr in state.transitions: rname = (state.mortality_rate if tr.rate == "mortality" else tr.rate) r = rate_at(rname, state, tau) if tr.rate == "mortality" and tau == 0: # The deaths reporter sums occupancy over cohorts and reads a # single per-state rate, so the death exit is taken at cohort # 0; exact whenever the pre-mortality transitions are # cohort-independent (every bundled model) or mortality is # the state's first transition. death_exit = survive * r if tr.to is not None: prob = survive * r per_edge_per_tau[transient_idx].append(prob) transient_idx += 1 survive = survive * (1.0 - r) res_per_tau.append(survive) death_exit_rows.append(death_exit) # Stack tau slices per transient edge to (D, *grid), pad to # max_D (extra cohorts hold zeros; codegen won't touch them). for tr_idx, tr in enumerate([t for t in state.transitions if t.to is not None]): stacked = np.stack(per_edge_per_tau[tr_idx]) # (D, *grid) if D < max_D: pad = np.zeros((max_D - D,) + grid) stacked = np.concatenate([stacked, pad], axis=0) out_edges_to.append(index[tr.to]) out_edges_lump.append(tr.lump_sum) out_edges_blocks.append(stacked) residual = np.stack(res_per_tau) if D < max_D: pad = np.zeros((max_D - D,) + grid) residual = np.concatenate([residual, pad], axis=0) out_edges_to.append(i) out_edges_lump.append(False) out_edges_blocks.append(residual) # All edges out of state i carry source index i. for block, dst, lump in zip(out_edges_blocks, out_edges_to, out_edges_lump): edge_from.append(i) edge_to.append(dst) edge_prob_blocks.append(block) edge_lump.append(lump) # Stack edges to (n_edges, max_D, *grid), then move max_D axis to the # *end* so the layout matches the Markov path's (..., edges) extension: # final shape is (n_edges, *grid, max_D), cohort innermost. Codegen # then transposes once more in engine.py to put edge index and cohort # last for cache-friendly inner-loop access. stacked = np.stack(edge_prob_blocks) # (n_edges, max_D, *grid) # Move axis 1 (max_D) to the end. perm = (0,) + tuple(range(2, stacked.ndim)) + (1,) edge_prob = np.ascontiguousarray(np.transpose(stacked, perm)) return CompiledStateModel( edge_from=np.array(edge_from, dtype=np.int64), edge_to=np.array(edge_to, dtype=np.int64), edge_prob=edge_prob, edge_lump_sum=np.array(edge_lump, dtype=np.bool_), n_states=len(model.states), premium_state=np.array([s.premium for s in model.states], dtype=np.bool_), benefit_state=np.array([s.benefit for s in model.states], dtype=np.bool_), state_duration_max=state_duration_max, benefit_max_months=np.array( [s.benefit_max_months for s in model.states], dtype=np.int64), state_death_exit=np.ascontiguousarray(np.stack(death_exit_rows)), state_death_benefit_factor=np.array( [s.death_benefit_factor for s in model.states], dtype=np.float64), state_exit_after=np.array( [s.exit_after for s in model.states], dtype=np.int64), )