"""Model point data -- the contracts to be projected."""
from __future__ import annotations
import warnings
from dataclasses import dataclass
import numpy as np
from fastcashflow._typing import FloatArray, IntArray
from fastcashflow.coverage import CalculationMethod
# Contract states -- a model point's in-force state at the valuation date.
# ACTIVE is the ordinary premium-paying contract. WAIVER (premium waived on a
# triggering event) and PAIDUP (the premium-paying term has ended) both keep
# the coverage in force while collecting no premium. The state places the
# model point's starting in-force on the active or the waiver track; during
# the projection active in-force can itself transition to waiver at the
# waiver-inception rate (IFRS 17 Sec. 33-34 -- the fulfilment cash flows
# reflect the contract's actual terms at the measurement date).
STATE_ACTIVE = 0
STATE_WAIVER = 1
STATE_PAIDUP = 2

# Names for the file layer -- a model-point ``state`` column reads and writes
# these strings, the readable form a practitioner edits in a spreadsheet.
STATE_NAMES = {
    "ACTIVE": STATE_ACTIVE,
    "WAIVER": STATE_WAIVER,
    "PAIDUP": STATE_PAIDUP,
}
# Reverse lookup (integer code -> name), derived so the two maps cannot drift.
STATE_LABELS = {code: name for name, code in STATE_NAMES.items()}

# When True, ``ModelPoints.__post_init__`` skips the mp_id uniqueness re-check
# that ``subset`` would otherwise re-run for every segment of a large
# portfolio. ``subset`` slices an already-validated parent, so the slice is
# valid by construction (a subset of unique mp_id is unique); only ``subset``
# sets this, synchronously, around the construction call. Single-threaded at
# the Python level, so a module flag is safe -- the kernel's parallelism is in
# the @njit layer, not here.
_TRUST_SLICE = False
@dataclass(frozen=True, slots=True)
class ModelPoints:
"""Columnar model point data.
Every scalar field is a numpy array of length ``n_mp``; the model-point
axis is the vectorised dimension throughout the engine. Monetary amounts
are stated per single policy; ``count`` is how many policies the model
point stands for -- it defaults to one (one row per policy: seriatim),
and a larger value scales the policy linearly through the projection.
A policy's claim benefits are a variable-length list of *coverages* (see
:mod:`fastcashflow.coverage`), held in CSR (Compressed Sparse Row) form
so the kernels loop them generically -- new benefit types add no fields:
* ``coverage_index[k]`` -- the coverage code; an integer index into
:attr:`Basis.coverages` (entry ``i`` of that tuple lives at
code ``i``). No code is reserved.
* ``coverage_amount[k]`` -- the benefit amount of coverage ``k``.
* ``coverage_offset`` -- ``(n_mp+1,)``; policy ``mp``'s coverages are the
slice ``[coverage_offset[mp] : coverage_offset[mp+1]]``.
Each coverage may carry a benefit rule: ``coverage_waiting`` (months from
issue with no benefit) and ``coverage_reduction_end`` / ``coverage_reduction_factor``
(a benefit multiplier in force until a cut-off month). Both are CSR
arrays aligned with ``coverage_index`` and default to off -- no waiting, full
benefit.
The coverage list is built one of two ways. ``benefits`` is the general
form: a ``{cov_idx: amount array}`` map keyed by coverage code (the index
into :attr:`Basis.coverages`). Or pass the CSR arrays
``coverage_index`` / ``coverage_amount`` / ``coverage_offset`` directly --
the preferred form for a portfolio with per-coverage benefit rules
(waiting / reduction periods).
Premiums and survival benefits stay as plain fields -- they do not
proliferate the way claim benefits do:
* ``premium`` -- premium charged each payment occurrence.
* ``premium_term_months`` -- months the level premium is collected,
defaulting to the full coverage term.
* ``premium_frequency_months`` -- months between level-premium payments
(1 monthly, 3 quarterly, 6 half-yearly, 12 annual), defaulting to 1.
* ``maturity_benefit`` -- benefit on survival to the end of the term.
* ``annuity_payment`` -- survival income paid each payout occurrence.
* ``annuity_frequency_months`` -- months between annuity payouts,
defaulting to 1.
* ``disability_income`` -- income paid each month a benefit state
is occupied (disability income on a disabled state).
* ``disability_benefit`` -- lump sum paid when a lump-sum transition
fires (a disability lump sum on becoming disabled).
"""
issue_age: FloatArray # attained age at issue, in years
premium: FloatArray # premium charged each payment occurrence
term_months: IntArray # coverage term, in months
benefits: dict[int, FloatArray] | None = None # general {cov_idx: amount}
maturity_benefit: FloatArray | None = None # benefit on survival to term
annuity_payment: FloatArray | None = None # survival income, each month
disability_income: FloatArray | None = None # income while in a benefit state
disability_benefit: FloatArray | None = None # lump sum on a flagged transition
# Per-policy base amount the surrender value scales against under
# surrender_value_basis="amount_per_unit" (e.g. sum insured / basic
# premium): surrender_cf = lapse_flow * surrender_value_curve[t] *
# surrender_base_amount. Explicit -- no default base is inferred, since
# the right base differs by product. None unless that mode is used.
surrender_base_amount: FloatArray | None = None
# IFRS 17 contract boundary (Sec. 34): the month past which cash flows
# leave the current contract (e.g. a step-rated renewable's next renewal,
# where the insurer can reprice). The projection stops here; the maturity
# benefit is paid only when the boundary equals the coverage term. None
# defaults to ``term_months`` -- no boundary cut, the historical behaviour.
contract_boundary_months: IntArray | None = None
premium_term_months: IntArray | None = None # months premium is collected
premium_frequency_months: IntArray | None = None # months between premiums
annuity_frequency_months: IntArray | None = None # months between payouts
account_value: FloatArray | None = None # account value at issue (VFA)
# VFA contract terms -- locked at issue, per policy. A guaranteed minimum
# crediting rate (annual) credited to the account value when the
# underlying-items return falls short; cohort-dependent (a 4%-guarantee
# 2010 block vs a 1%-guarantee 2024 block can coexist in one portfolio,
# which a single Basis value could not represent). Default 0.0 = no
# guarantee; ignored by non-VFA measurements.
minimum_crediting_rate: FloatArray | None = None
# Guaranteed minimum death benefit (GMDB) -- the floor the death benefit
# cannot fall below. On death the VFA pays max(account value, GMDB); the
# excess over the account value is the guarantee's intrinsic cost. Locked
# at issue, per policy; cohort-dependent like the credit-rate guarantee.
# Default 0.0 = no floor (max(AV, 0) = AV); ignored by non-VFA measurements.
minimum_death_benefit: FloatArray | None = None
# Guaranteed minimum accumulation benefit (GMAB) -- the floor the maturity
# benefit cannot fall below. Survivors reaching term receive max(account
# value, GMAB); the excess over the account value is the guarantee's
# intrinsic cost. Locked at issue, per policy. Default 0.0 = no floor
# (max(AV, 0) = AV); ignored by non-VFA measurements.
minimum_accumulation_benefit: FloatArray | None = None
coverage_index: IntArray | None = None # CSR: coverage index
coverage_amount: FloatArray | None = None # CSR: coverage amount
coverage_offset: IntArray | None = None # CSR: per-policy slice bounds
coverage_waiting: IntArray | None = None # CSR: waiting period, months
coverage_reduction_end: IntArray | None = None # CSR: reduced-benefit end, months
coverage_reduction_factor: FloatArray | None = None # CSR: reduced-benefit factor
coverage_step_month: IntArray | None = None # CSR: benefit step-up month (0 = none)
coverage_step_factor: FloatArray | None = None # CSR: benefit factor from step_month on
coverage_escalation_annual: FloatArray | None = None # CSR: annual benefit growth (0 = level)
coverage_escalation_cap: FloatArray | None = None # CSR: max benefit multiple (0 = unbounded)
count: FloatArray | None = None # policies the row stands for
sex: IntArray | None = None # 0 = male, 1 = female
state: IntArray | None = None # contract state (STATE_*)
# At-issue classification axis (occupation class / UW class) -- one integer per
# model point, default 0 for every policy. Rate tables that key on
# ``issue_class`` look up the per-policy value; tables without the axis
# broadcast over it (no effect).
issue_class: IntArray | None = None
# In-force valuation -- months since policy inception at the valuation
# date. Default 0 reproduces the new-business behaviour (every contract
# treated as just issued). Set per-MP for an in-force portfolio: each
# contract has its own inception, so at a single valuation date the
# array carries different elapsed values across rows. Rate lookups, the
# premium-paying-window check and surrender's cumulative-premium basis
# all shift by ``elapsed_months[mp]``.
elapsed_months: IntArray | None = None
# Segment metadata -- the (product, channel) keys that map a
# model point to its assumption set when ``measure`` splits a
# portfolio. Object arrays of string labels (or None for a
# single-segment book). These are opaque routing keys: the engine
# never interprets them, so a code, a name, or any custom analysis
# group is equally valid. A human-friendly ``product_name`` /
# ``channel_name`` column may sit alongside in the input files for
# readability, but it is display-only -- the engine ignores it.
product: np.ndarray | None = None
channel: np.ndarray | None = None
# Portfolio-level taxonomy of coverage codes -- ``{coverage:
# CalculationMethod}``. The dict is the company catalogue (the
# ``calculation_methods.csv`` file): every code a contract may attach is
# registered here with its kernel-routing method (DEATH / MORBIDITY /
# DIAGNOSIS / ANNUITY / MATURITY). The engine derives
# ``(is_diagnosis, risk)`` from the method via
# :func:`fastcashflow.coverage.method_attrs`; the I/O reader
# routes coverage rows by it (annuity / maturity into scalar fields,
# rate-driven into the CSR). ``None`` lets the engine fall back to its
# default (every rate-driven coverage treated as a non-diagnosis
# morbidity claim) -- fine for a hand-written one-MP test that does
# not need the taxonomy.
calculation_methods: dict[str, "CalculationMethod"] | None = None
# Rate-driven coverage codes in registration order, captured at
# construction time. The integers in ``coverage_index`` are positional
# indices into this tuple (equivalently, into the ``Basis.coverages``
# the model points were built against). At engine entry the tuple is
# matched against the current ``Basis.coverages`` order; a swap or
# an insertion would silently shift the meaning of every ``coverage_index``
# value, so a mismatch is refused with a clear error. ``None`` skips the
# strict check (a hand-written one-MP test that did not pin a
# basis order); the catalogue-consistency check on
# ``calculation_methods`` still applies.
coverage_codes: tuple[str, ...] | None = None
# Source grouping attributes -- carried for aggregation, never read by the
# projection kernel. ``issue_date`` is the policy inception date
# (date-like / numpy datetime64), the source for the annual-cohort axis
# ``issue_year``. ``attributes`` holds any number of named per-MP label
# columns -- portfolio_id, profitability_group, risk_class, region,
# campaign_id, ... -- so :func:`fastcashflow.group` can aggregate on any
# axis. ``group_of_contracts`` is the IFRS 17 preset over the same machinery.
issue_date: np.ndarray | None = None
attributes: dict[str, np.ndarray] | None = None
# Contract identity -- the mp_id from the policies file, carried so
# ``apply_inforce_state`` can join the period-close state on it instead of
# trusting row order. A per-MP label, never read by the projection kernel;
# ``None`` for a hand-built set. Compared as a string (the uniqueness check
# and the in-force join both str-key it, so ``1`` and ``"1"`` are the same
# id); use a consistently-typed id column to avoid surprise.
mp_id: np.ndarray | None = None
def __post_init__(self) -> None:
    """Normalise every field to its canonical numpy form and validate it.

    The dataclass is frozen, so each normalised value is written back with
    ``object.__setattr__``. ``issue_age`` defines ``n_mp``; every other
    per-model-point column must have shape ``(n_mp,)`` and every
    per-coverage (CSR-aligned) column must have one entry per coverage.
    Optional columns that were left as ``None`` are filled with their
    documented default (zeros, ones, or a copy of ``term_months``);
    ``surrender_base_amount``, the segment keys, ``mp_id``, ``issue_date``,
    ``attributes``, ``calculation_methods`` and ``coverage_codes`` stay
    ``None`` when not supplied.

    Raises ``ValueError`` for a wrong-length column, a non-finite or
    negative amount, an out-of-range month / frequency / sex code, an
    inconsistent coverage CSR, or a duplicate ``mp_id``. Emits a
    ``UserWarning`` when ``issue_age`` carries fractional years.
    """

    def _store(field_name, value):
        # Frozen dataclass: ordinary attribute assignment is blocked.
        object.__setattr__(self, field_name, value)

    # Normalise the required fields to numpy arrays of the right dtype.
    for name, dtype in (
        ("issue_age", np.float64),
        ("premium", np.float64),
        ("term_months", np.int64),
    ):
        _store(name, np.asarray(getattr(self, name), dtype=dtype))
    # A scalar (0-d) issue_age has no model-point axis to read n_mp from;
    # say so instead of failing on ``shape[0]`` with a bare IndexError.
    if self.issue_age.ndim != 1:
        raise ValueError(
            "issue_age must be a 1-D array (one entry per model point), "
            f"got shape {self.issue_age.shape}"
        )
    n_mp = self.issue_age.shape[0]

    def _per_mp(field_name, value, dtype):
        # Coerce one per-model-point column and insist on shape (n_mp,).
        arr = np.asarray(value, dtype)
        if arr.shape != (n_mp,):
            raise ValueError(
                f"{field_name} has length {arr.size} but n_mp is {n_mp}"
            )
        return arr

    # Per-model-point arrays must all match issue_age's length (which
    # defines n_mp); a mismatch otherwise reads n_mp from one field and
    # silently ignores the rest. The rate / cash-flow inputs must also be
    # finite -- a NaN age or premium yields a NaN BEL with no error.
    for _nm in ("premium", "term_months"):
        _a = getattr(self, _nm)
        if _a.shape != (n_mp,):
            raise ValueError(
                f"{_nm} has length {_a.size} but n_mp is {n_mp} (from "
                f"issue_age); per-model-point arrays must match"
            )
    if not np.all(np.isfinite(self.issue_age)):
        raise ValueError("issue_age must be finite")
    if not np.all(np.isfinite(self.premium)):
        raise ValueError(
            "premium must be finite (a NaN premium yields a NaN BEL)"
        )
    # premium is a forward projection assumption (the contractual
    # premium each occurrence), not an accounting ledger entry. Accounting
    # adjustments (refunds, retrospective true-ups) are actual experience
    # and belong in roll_forward / reconcile, not the projection input, so
    # a negative here is a sign / data error.
    if np.any(self.premium < 0):
        raise ValueError(
            "premium must be >= 0 (a negative premium is a sign error; "
            "accounting adjustments belong in movement analysis, not the "
            "projection assumption)"
        )
    # Reject obviously-wrong scalar contract fields at construction time,
    # not at the bottom of a kernel where the error becomes a NaN BEL.
    if np.any(self.issue_age < 0):
        raise ValueError("issue_age must be >= 0")
    # issue_age carries through into the rate-table lookup as an int64
    # (rate grids are indexed by integer year). A fractional input is
    # silently truncated toward zero -- issue_age=40.7 looks up age 40
    # not 41. Warn so a stray .5 from a "midpoint of year" mistake or
    # a date-arithmetic bug does not slip through.
    if np.any(np.modf(self.issue_age)[0] != 0):
        warnings.warn(
            "issue_age has fractional values; the engine truncates "
            "toward zero at rate-table lookup (issue_age=40.7 -> 40). "
            "Round to whole years upstream if integer age was intended.",
            UserWarning,
            stacklevel=2,
        )
    if np.any(self.term_months < 1):
        raise ValueError("term_months must be >= 1")

    # Survival benefits / account terms default to zero (absent).
    for name in (
        "maturity_benefit", "annuity_payment", "disability_income",
        "disability_benefit", "account_value",
        "minimum_crediting_rate", "minimum_death_benefit",
        "minimum_accumulation_benefit",
    ):
        raw = getattr(self, name)
        value = (np.zeros(n_mp) if raw is None
                 else _per_mp(name, raw, np.float64))
        if not np.all(np.isfinite(value)):
            raise ValueError(f"{name} must be finite")
        # Benefit / premium / account amounts are non-negative; a negative
        # one is a sign or data error that flows silently into the BEL.
        # minimum_crediting_rate is a rate, not an amount -- skip it.
        if name != "minimum_crediting_rate" and np.any(value < 0):
            raise ValueError(f"{name} must be >= 0 (got a negative amount)")
        _store(name, value)

    # count defaults to one policy per model point (seriatim). It scales
    # the policy through the projection, so it must be finite as well as
    # non-negative (``NaN < 0`` is False, so the sign test alone misses it).
    cnt = (np.ones(n_mp) if self.count is None
           else _per_mp("count", self.count, np.float64))
    if not np.all(np.isfinite(cnt)):
        raise ValueError("count must be finite")
    if np.any(cnt < 0):
        raise ValueError("count must be >= 0")
    _store("count", cnt)

    # surrender_base_amount stays None unless provided (amount_per_unit
    # needs it; no default base is inferred). When given it is a per-MP
    # non-negative finite amount.
    if self.surrender_base_amount is not None:
        sba = _per_mp("surrender_base_amount",
                      self.surrender_base_amount, np.float64)
        if not np.all(np.isfinite(sba)) or np.any(sba < 0):
            raise ValueError(
                "surrender_base_amount must be finite and >= 0")
        _store("surrender_base_amount", sba)

    # sex defaults to 0 (male) for every model point.
    sex = (np.zeros(n_mp, np.int64) if self.sex is None
           else _per_mp("sex", self.sex, np.int64))
    if np.any((sex != 0) & (sex != 1)):
        raise ValueError("sex must be 0 (male) or 1 (female)")
    _store("sex", sex)

    # Integer per-MP columns that default to zero:
    #   state          -- ACTIVE, an ordinary premium-paying contract;
    #   issue_class    -- the 'no class distinction' fallback;
    #   elapsed_months -- every contract treated as just issued.
    for name in ("state", "issue_class", "elapsed_months"):
        raw = getattr(self, name)
        _store(name, np.zeros(n_mp, np.int64) if raw is None
               else _per_mp(name, raw, np.int64))

    # premium_term_months defaults to the full coverage term -- the level
    # premium is collected every in-force month, the ordinary case.
    raw = self.premium_term_months
    _store("premium_term_months",
           self.term_months.copy() if raw is None
           else _per_mp("premium_term_months", raw, np.int64))

    # contract_boundary_months defaults to the full coverage term -- no
    # Sec. 34 boundary cut (the historical behaviour). When supplied it
    # must be in [1, term]: the projection runs to the boundary and the
    # maturity benefit is withheld when the boundary is short of the term.
    raw = self.contract_boundary_months
    boundary = (self.term_months.copy() if raw is None
                else _per_mp("contract_boundary_months", raw, np.int64))
    if np.any(boundary < 1):
        raise ValueError("contract_boundary_months must be >= 1")
    if np.any(boundary > self.term_months):
        raise ValueError(
            "contract_boundary_months must not exceed term_months "
            "(the boundary cannot extend past the coverage term)")
    _store("contract_boundary_months", boundary)

    # Payment frequencies -- months between successive level-premium
    # payments and annuity payouts; default 1 (monthly), must be >= 1.
    for name in ("premium_frequency_months", "annuity_frequency_months"):
        raw = getattr(self, name)
        freq = (np.ones(n_mp, np.int64) if raw is None
                else _per_mp(name, raw, np.int64))
        if np.any(freq < 1):
            raise ValueError(f"{name} must be >= 1")
        _store(name, freq)

    # Coverage CSR: explicit arrays win; otherwise build from the
    # general benefits map. With no shortcut field, an empty input
    # yields an empty coverage list -- a portfolio with no rate-driven
    # claim benefits (premiums-only, or one with only survival
    # benefits via maturity_benefit / annuity_payment).
    if self.coverage_index is not None:
        if self.coverage_amount is None or self.coverage_offset is None:
            raise ValueError(
                "coverage_index was given without coverage_amount / "
                "coverage_offset; the CSR form needs all three arrays"
            )
        coverage_index = np.asarray(self.coverage_index, np.int64)
        coverage_amount = np.asarray(self.coverage_amount, np.float64)
        coverage_offset = np.asarray(self.coverage_offset, np.int64)
        # Structural checks: the offsets slice coverage_index /
        # coverage_amount per policy, so a malformed layout would read
        # the wrong (or out-of-range) coverages.
        if (coverage_index.ndim != 1
                or coverage_index.shape != coverage_amount.shape):
            raise ValueError(
                "coverage_index and coverage_amount must be 1-D arrays of "
                f"equal length, got shapes {coverage_index.shape} and "
                f"{coverage_amount.shape}"
            )
        if coverage_offset.shape != (n_mp + 1,):
            raise ValueError(
                f"coverage_offset must have shape ({n_mp + 1},) "
                f"(n_mp + 1), got {coverage_offset.shape}"
            )
        if (coverage_offset[0] < 0
                or np.any(np.diff(coverage_offset) < 0)
                or coverage_offset[-1] > coverage_index.shape[0]):
            raise ValueError(
                "coverage_offset must be non-decreasing and stay within "
                f"[0, {coverage_index.shape[0]}] (the coverage count)"
            )
        if np.any(coverage_index < 0):
            bad = int(np.argmax(coverage_index < 0))
            raise ValueError(
                "coverage_index must be >= 0 (it indexes the registered "
                f"coverages); coverage_index[{bad}] = {coverage_index[bad]}"
            )
    else:
        items = []  # (cov_idx, per-mp amount array), in coverage-list order
        if self.benefits is not None:
            for cov_idx, amount in self.benefits.items():
                amt = np.asarray(amount, np.float64)
                if amt.shape != (n_mp,):
                    raise ValueError(
                        f"benefits[{cov_idx}] has length {amt.size} but "
                        f"n_mp is {n_mp}"
                    )
                items.append((int(cov_idx), amt))
        coverage_index, coverage_amount, coverage_offset = _build_csr(items, n_mp)

    # Validate the packed benefit amounts in one place, after both input
    # forms (the benefits map and the CSR arrays the file reader fills)
    # land here. The amount feeds straight into the kernel (claim = rate x
    # amount), so a NaN / inf silently NaNs the BEL and a negative flips the
    # claim's sign.
    if coverage_amount.size:
        if not np.all(np.isfinite(coverage_amount)):
            raise ValueError(
                "coverage amounts must be finite (a NaN / inf amount yields "
                "a silently-NaN BEL)"
            )
        if np.any(coverage_amount < 0):
            bad = int(np.argmax(coverage_amount < 0))
            raise ValueError(
                "coverage amounts must be >= 0 (a negative flips the claim "
                f"sign); coverage_amount[{bad}] = {coverage_amount[bad]}"
            )
    _store("coverage_index", coverage_index)
    _store("coverage_amount", coverage_amount)
    _store("coverage_offset", coverage_offset)

    # Per-coverage benefit rules, CSR-aligned with coverage_index; each
    # row is (field, dtype, "off" default):
    #   waiting / reduction_end / reduction_factor -- months with no
    #     benefit, and a multiplier in force until a cut-off month;
    #   step_month / step_factor -- benefit step-up: the month the benefit
    #     steps to an absolute level (1.2 = benefit x1.2 from then on),
    #     0 = no step;
    #   escalation_annual / escalation_cap -- escalating benefit / annuity:
    #     grows (1 + escalation_annual) ** policy_year, capped at
    #     escalation_cap x base (0 = unbounded); 0 growth = level.
    n_cov = coverage_amount.shape[0]
    rule_specs = (
        ("coverage_waiting", np.int64, 0),
        ("coverage_reduction_end", np.int64, 0),
        ("coverage_reduction_factor", np.float64, 1.0),
        ("coverage_step_month", np.int64, 0),
        ("coverage_step_factor", np.float64, 1.0),
        ("coverage_escalation_annual", np.float64, 0.0),
        ("coverage_escalation_cap", np.float64, 0.0),
    )
    # Each CSR-aligned rule array carries one entry per coverage. A wrong
    # length would silently drop a coverage's rule (too short) or misread
    # it (too long); a non-finite or negative month / factor would silently
    # mis-time or flip a benefit. Months and multipliers are both >= 0
    # (escalation is the growth axis; the reduction rule handles cuts).
    for name, dtype, off_value in rule_specs:
        raw = getattr(self, name)
        arr = (np.full(n_cov, off_value, dtype) if raw is None
               else np.asarray(raw, dtype))
        if arr.shape != (n_cov,):
            raise ValueError(
                f"{name} must align with the coverage list (one entry per "
                f"coverage): shape ({n_cov},), got {arr.shape}"
            )
        if not np.all(np.isfinite(arr)):
            raise ValueError(f"{name} must be finite")
        if np.any(arr < 0):
            bad = int(np.argmax(arr < 0))
            raise ValueError(f"{name} must be >= 0; {name}[{bad}] = {arr[bad]}")
        _store(name, arr)

    # Segment metadata + mp_id -- normalise to object arrays so they slice
    # with the per-row fields. ``None`` stays None (a single-segment book).
    for name in ("product", "channel", "mp_id"):
        value = getattr(self, name)
        if value is not None:
            value = np.asarray(value, dtype=object)
            if value.shape != (n_mp,):
                raise ValueError(
                    f"{name} must have shape ({n_mp},), got {value.shape}"
                )
            _store(name, value)

    # mp_id is the contract identity the in-force / settlement joins key on
    # (apply_inforce_state, group_of_contracts). A duplicate makes that
    # join ambiguous, so reject it when mp_id is supplied. (The file reader
    # already rejects duplicate policy ids; this covers a hand-built set.)
    if self.mp_id is not None and not _TRUST_SLICE:
        # str-key the ids so the check matches apply_inforce_state's string
        # join (1 and "1" are the same id) and so a mixed-type column raises
        # a clear duplicate error, not a np.unique sort TypeError. A set is
        # O(n) -- no sort -- and runs once per build (subset skips it via
        # _TRUST_SLICE).
        keys = [str(v) for v in self.mp_id.tolist()]
        if len(set(keys)) != len(keys):
            # Collect repeated ids in order of their first repeat; the
            # dict doubles as an ordered set.
            seen, repeated = set(), {}
            for key in keys:
                if key in seen:
                    repeated.setdefault(key, None)
                else:
                    seen.add(key)
            dup = list(repeated)
            raise ValueError(
                f"ModelPoints.mp_id must be unique (it is the contract "
                f"identity / join key); duplicates: {dup[:5]}"
            )

    # Source grouping attributes -- per-row, sliced with the segment keys,
    # untouched by the kernel. issue_date -> datetime64[D]; attributes
    # values -> object label arrays, each of length n_mp.
    if self.issue_date is not None:
        issue_date = np.asarray(self.issue_date, dtype="datetime64[D]")
        if issue_date.shape != (n_mp,):
            raise ValueError(
                f"issue_date must have shape ({n_mp},), got {issue_date.shape}"
            )
        _store("issue_date", issue_date)
    if self.attributes is not None:
        attrs: dict[str, np.ndarray] = {}
        for k, v in self.attributes.items():
            v = np.asarray(v, dtype=object)
            if v.shape != (n_mp,):
                raise ValueError(
                    f"attributes[{k!r}] must have shape ({n_mp},), got {v.shape}"
                )
            attrs[str(k)] = v
        _store("attributes", attrs)

    # Benefit-pattern taxonomy -- normalise dict values to CalculationMethod
    # members so a CSV-derived ``{"CANCER": "DIAGNOSIS"}`` works the same
    # as a hand-built ``{"CANCER": CalculationMethod.DIAGNOSIS}``.
    methods = self.calculation_methods
    if methods is not None:
        _store("calculation_methods",
               {str(k): CalculationMethod(v) for k, v in methods.items()})

    # Registered coverage codes -- normalise to an immutable tuple of str
    # so a hand-built list or a polars Series passes through, and the
    # stored value can never drift out of sync with itself.
    codes = self.coverage_codes
    if codes is not None:
        _store("coverage_codes", tuple(str(c) for c in codes))
@property
def n_mp(self) -> int:
    """Model-point count -- the length of the ``issue_age`` axis."""
    axis_lengths = self.issue_age.shape
    return int(axis_lengths[0])
def axis(self, name: str) -> np.ndarray:
    """Resolve a grouping axis to a ``(n_mp,)`` label array by name.

    Used by :func:`fastcashflow.group` to aggregate on any axis. Resolution
    order:

    1. the derived ``issue_year`` (calendar year of ``issue_date``);
    2. the engine-native per-model-point fields ``product`` / ``channel`` /
       ``issue_date`` / ``issue_class`` / ``sex`` / ``state`` /
       ``elapsed_months`` -- these take precedence over a same-named
       attribute;
    3. any key in ``attributes`` (portfolio_id, profitability_group,
       risk_class, ...).

    Raises :class:`KeyError` when ``issue_year`` is requested without an
    ``issue_date``, when a native field is unset (``None``), or -- listing
    the available axes -- when the name is unknown.
    """
    # Engine-native per-MP fields. ``issue_class`` (risk grade), sex, state
    # and elapsed_months default to a filled array, so they always resolve;
    # product / channel / issue_date may be None.
    native_fields = ("product", "channel", "issue_date",
                     "issue_class", "sex", "state", "elapsed_months")
    labels = self.attributes

    if name == "issue_year":
        dates = self.issue_date
        if dates is None:
            raise KeyError("issue_year needs issue_date, which is not set")
        # datetime64[Y] counts years since the 1970 epoch.
        return dates.astype("datetime64[Y]").astype(int) + 1970

    if name in native_fields:
        column = getattr(self, name)
        if column is not None:
            return column
        raise KeyError(f"axis {name!r} is not set on these model points")

    if labels is not None and name in labels:
        return labels[name]

    known = {"issue_year", *native_fields}
    if labels:
        known.update(labels)
    raise KeyError(
        f"unknown grouping axis {name!r}; available: {sorted(known)}"
    )
def __repr__(self) -> str:
    """Return the repr built by ``fastcashflow._display.model_points_repr``."""
    # Imported at call time rather than at module import.
    from fastcashflow import _display
    return _display.model_points_repr(self)
def __str__(self) -> str:
    """Return the text built by ``fastcashflow._display.model_points_str``."""
    # Imported at call time rather than at module import.
    from fastcashflow import _display
    return _display.model_points_str(self)
@classmethod
def single(
    cls,
    issue_age: float,
    premium: float,
    term_months: int,
    benefits: dict[int, float] | None = None,
    maturity_benefit: float = 0.0,
    annuity_payment: float = 0.0,
    disability_income: float = 0.0,
    disability_benefit: float = 0.0,
    premium_term_months: int | None = None,
    premium_frequency_months: int = 1,
    annuity_frequency_months: int = 1,
    account_value: float = 0.0,
    minimum_crediting_rate: float = 0.0,
    minimum_death_benefit: float = 0.0,
    minimum_accumulation_benefit: float = 0.0,
    count: float = 1.0,
    sex: int = 0,
    state: int = STATE_ACTIVE,
    calculation_methods: dict[str, "CalculationMethod"] | None = None,
) -> ModelPoints:
    """Build a single-model-point set -- a convenience for hand checks.

    Every scalar argument is wrapped into a length-1 array and handed to
    the constructor. ``premium_term_months=None`` is passed through as
    ``None`` so the constructor applies its own default.

    ``benefits`` is the per-coverage benefit-amount map keyed by
    coverage code (the index into :attr:`Basis.coverages`); pass
    ``{0: 1_000_000.0}`` to attach the benefit to the first registered
    coverage. None means no claim benefits.
    """
    scalars = {
        "issue_age": issue_age,
        "premium": premium,
        "term_months": term_months,
        "maturity_benefit": maturity_benefit,
        "annuity_payment": annuity_payment,
        "disability_income": disability_income,
        "disability_benefit": disability_benefit,
        "premium_frequency_months": premium_frequency_months,
        "annuity_frequency_months": annuity_frequency_months,
        "account_value": account_value,
        "minimum_crediting_rate": minimum_crediting_rate,
        "minimum_death_benefit": minimum_death_benefit,
        "minimum_accumulation_benefit": minimum_accumulation_benefit,
        "count": count,
        "sex": sex,
        "state": state,
    }
    # One model point: each scalar becomes a one-element column.
    kwargs: dict = {key: np.array([value]) for key, value in scalars.items()}

    # Optional inputs keep None as None rather than wrapping it.
    kwargs["premium_term_months"] = (
        None if premium_term_months is None
        else np.array([premium_term_months])
    )
    kwargs["benefits"] = (
        None if benefits is None
        else {code: np.array([amount]) for code, amount in benefits.items()}
    )
    kwargs["calculation_methods"] = calculation_methods
    return cls(**kwargs)
def subset(self, indices) -> ModelPoints:
    """Return a new ``ModelPoints`` carrying the rows at ``indices``.

    Per-row fields (issue_age, premium, ...) and the segment metadata
    (product, channel) are sliced. The coverage CSR is rebuilt: each
    selected row's coverage slice
    ``coverage_index[coverage_offset[i]:coverage_offset[i+1]]`` is
    concatenated, and ``coverage_offset`` is reset to the new running
    cumulative sum. Used by :func:`fastcashflow.gmm.measure` to split a
    portfolio by (product, channel) before per-segment measurement.

    ``indices`` is either a sequence of integer row numbers (negative
    values count from the end, as in numpy indexing) or a boolean mask
    with exactly one entry per model point. An out-of-range row number or
    a mask of the wrong shape raises ``IndexError``.

    ``indices`` is expected to select **distinct** rows -- it is a row
    selection, not a gather. As an optimisation the result skips the
    re-validation the constructor runs (the parent was already validated),
    so a repeated index (``subset([0, 0])``) would carry a duplicate mp_id
    the constructor would otherwise reject. Every engine caller passes a
    unique segment index, so this is safe on the hot path; pass distinct
    indices when calling it directly.
    """
    n_mp = int(self.issue_age.shape[0])
    raw = np.asarray(indices)
    if raw.dtype == np.bool_:
        # A mask cast straight to int64 would read as row numbers 0 and 1;
        # turn it into the row numbers it actually selects.
        if raw.shape != (n_mp,):
            raise IndexError(
                f"ModelPoints.subset: boolean mask has shape {raw.shape} "
                f"but there are {n_mp} model points"
            )
        idx = np.flatnonzero(raw)
    else:
        idx = np.asarray(indices, dtype=np.int64)
        if idx.size and (idx.min() < -n_mp or idx.max() >= n_mp):
            raise IndexError(
                f"ModelPoints.subset: index out of range for {n_mp} "
                "model points"
            )
        # Normalise negative row numbers. Plain fancy indexing accepts
        # them, but ``coverage_offset[idx + 1]`` below would read from the
        # front of the offset array and yield negative slice lengths -- a
        # corrupt CSR that the skipped validation would never catch.
        idx = np.where(idx < 0, idx + n_mp, idx)

    # Per-row scalar fields.
    per_row = (
        "issue_age", "premium", "term_months",
        "maturity_benefit", "annuity_payment", "disability_income",
        "disability_benefit", "premium_term_months",
        "contract_boundary_months",
        "premium_frequency_months", "annuity_frequency_months",
        "account_value", "minimum_crediting_rate", "minimum_death_benefit",
        "minimum_accumulation_benefit",
        "count", "sex", "state", "issue_class", "elapsed_months",
    )
    kwargs: dict = {name: getattr(self, name)[idx] for name in per_row}

    # CSR coverage arrays -- gather each selected row's slice and rebuild
    # coverage_offset as the new cumulative count. The gather index
    # ``[start_i .. end_i)`` for every row is built vectorised (a repeat of
    # each start plus a per-row ramp) rather than a Python ``np.arange`` per
    # row, so there is no per-model-point call overhead -- the hot path
    # when ``measure`` subsets a large portfolio per segment.
    starts = self.coverage_offset[idx]
    lengths = self.coverage_offset[idx + 1] - starts
    total = int(lengths.sum())
    if total > 0:
        block_start = np.repeat(np.cumsum(lengths) - lengths, lengths)
        ramp = np.arange(total, dtype=np.int64) - block_start
        cov_idx = np.repeat(starts, lengths) + ramp
    else:
        cov_idx = np.zeros(0, dtype=np.int64)
    for name in (
        "coverage_index", "coverage_amount",
        "coverage_step_month", "coverage_step_factor",
        "coverage_escalation_annual", "coverage_escalation_cap",
        "coverage_waiting", "coverage_reduction_end",
        "coverage_reduction_factor",
    ):
        kwargs[name] = getattr(self, name)[cov_idx]
    kwargs["coverage_offset"] = np.concatenate(
        ([0], np.cumsum(lengths, dtype=np.int64))
    )

    # Segment metadata + mp_id + optional surrender base + source grouping
    # date -- slice if set; otherwise stay None.
    for name in ("product", "channel", "mp_id",
                 "surrender_base_amount", "issue_date"):
        value = getattr(self, name)
        kwargs[name] = None if value is None else value[idx]
    kwargs["attributes"] = (
        None if self.attributes is None
        else {k: v[idx] for k, v in self.attributes.items()}
    )

    # Taxonomy carries through unchanged -- subsetting drops rows, not
    # the company-level catalogue of coverage codes.
    kwargs["calculation_methods"] = self.calculation_methods
    # The registered coverage-code order is a property of the basis
    # the model points were built against, not of the row subset.
    kwargs["coverage_codes"] = self.coverage_codes

    # The slice is valid by construction (the parent was validated), so skip
    # the redundant re-validation -- this is the hot path when measure splits
    # a large portfolio into per-segment subsets. The flag is always reset,
    # even if construction raises.
    global _TRUST_SLICE
    _TRUST_SLICE = True
    try:
        return ModelPoints(**kwargs)
    finally:
        _TRUST_SLICE = False
[문서]
@dataclass(frozen=True, slots=True)
class InforceState:
"""Per-MP closing state from the prior reporting period.
The input layer for in-force / subsequent-measurement workflows. A
fresh ``inforce_state.csv`` is produced at each period close from the
company's policy administration system and joined onto the static
``policies.csv`` to value the in-force at the next reporting date.
Fields:
* ``mp_id`` -- join key, matches the ``mp_id`` column on the policies
file.
* ``elapsed_months`` -- months since each contract's inception as of
the valuation date (= valuation date - inception date).
* ``count`` -- in-force at the valuation date (the user has already
scaled it down for past lapses); seats the projection.
* ``prior_csm`` -- closing CSM at month
``elapsed_months - period_months``, the prior reporting date's
result carried into this period.
* ``lock_in_rate`` -- annual locked-in discount rate (Sec. B72(b)).
Scalar in v1; per-MP cohort-aware rates are a future extension.
"""
mp_id: np.ndarray
elapsed_months: IntArray
count: FloatArray
prior_csm: FloatArray
lock_in_rate: float
def __post_init__(self) -> None:
# Coerce each array to its canonical dtype so a hand-built state
# (or a reader using a different default dtype) feeds the engine
# with the dtypes the kernels expect -- without this, an int64
# ``count`` or a float32 ``elapsed_months`` reaches the kernel and
# silently triggers a slow path or a numba dispatch error.
object.__setattr__(
self, "elapsed_months",
np.asarray(self.elapsed_months, dtype=np.int64),
)
object.__setattr__(
self, "count", np.asarray(self.count, dtype=np.float64),
)
object.__setattr__(
self, "prior_csm", np.asarray(self.prior_csm, dtype=np.float64),
)
object.__setattr__(self, "lock_in_rate", float(self.lock_in_rate))
# Validate: a negative elapsed month indexes backwards into the
# trajectory (silently wrong); a NaN prior CSM / lock-in rate makes the
# carried-forward CSM NaN with no error; a ragged array reads n from
# one field and ignores the rest.
n = self.elapsed_months.shape[0]
for nm in ("mp_id", "count", "prior_csm"):
a = np.asarray(getattr(self, nm))
if a.shape[0] != n:
raise ValueError(
f"InforceState.{nm} has length {a.shape[0]} but "
f"elapsed_months has {n}; per-MP arrays must match"
)
if np.any(self.elapsed_months < 0):
raise ValueError("InforceState.elapsed_months must be >= 0")
if np.any(self.count < 0):
raise ValueError("InforceState.count must be >= 0")
if not np.all(np.isfinite(self.prior_csm)):
raise ValueError("InforceState.prior_csm must be finite")
if not np.isfinite(self.lock_in_rate):
raise ValueError("InforceState.lock_in_rate must be finite")
# mp_id is the identity key the period-close state is joined on
# (align_inforce_state / apply_inforce_state). A duplicate id makes
# that join ambiguous -- the dict lookup keeps one row and silently
# drops the other -- so reject it here. str-key the ids (matching the
# string join) so a mixed-type column raises a clear duplicate error,
# not a np.unique sort TypeError.
keys = [str(v) for v in np.asarray(self.mp_id).tolist()]
if len(set(keys)) != len(keys):
seen, dup = set(), []
for k in keys:
if k in seen and k not in dup:
dup.append(k)
seen.add(k)
raise ValueError(
f"InforceState.mp_id must be unique (it is the join key); "
f"duplicates: {dup[:5]}"
)
def subset(self, indices) -> "InforceState":
"""Return a new ``InforceState`` carrying the rows at ``indices``.
The per-MP fields (``mp_id``, ``elapsed_months``, ``count``,
``prior_csm``) are sliced together and the scalar ``lock_in_rate``
is carried, so the result stays internally consistent. Use it
alongside :meth:`ModelPoints.subset` to split a period-close state
by segment before a per-segment
:func:`fastcashflow.gmm.measure_inforce` (slicing only ``prior_csm``
would leave the state ragged).
"""
idx = np.asarray(indices, dtype=np.int64)
return InforceState(
mp_id=np.asarray(self.mp_id)[idx],
elapsed_months=self.elapsed_months[idx],
count=self.count[idx],
prior_csm=self.prior_csm[idx],
lock_in_rate=self.lock_in_rate,
)
[문서]
def align_inforce_state(
    model_points: "ModelPoints", state: InforceState,
) -> InforceState:
    """Return ``state`` with its rows ordered to match ``model_points``.

    The row counts must agree, otherwise ``ValueError`` is raised. When
    the model points carry ``mp_id`` the match is **by mp_id** (both id
    columns compared as strings): the state is reordered if the two are in
    different orders, and ``ValueError`` is raised if their id sets
    differ -- so a misaligned period-close file cannot silently assign one
    contract's state (including its prior CSM) to another. When the model
    points have no ``mp_id`` (a hand-built set), the state is returned
    as-is after the length check; align the rows yourself in that case.

    Reordering goes through :meth:`InforceState.subset`, so every per-MP
    field (``elapsed_months``, ``count``, ``prior_csm``, ``mp_id``) moves
    together.
    """
    state_rows = state.elapsed_months.shape[0]
    n_mp = int(model_points.issue_age.shape[0])
    if state_rows != n_mp:
        raise ValueError(
            f"state has {state_rows} rows; the "
            f"model points have {n_mp}. The state must cover exactly the "
            "valued contracts."
        )

    # No ids on the model points: nothing to join on, positional match.
    if model_points.mp_id is None:
        return state

    wanted = np.asarray(model_points.mp_id).astype(str)
    held = np.asarray(state.mp_id).astype(str)
    wanted_ids, held_ids = set(wanted), set(held)
    if wanted_ids != held_ids:
        missing = sorted(wanted_ids - held_ids)[:5]
        extra = sorted(held_ids - wanted_ids)[:5]
        raise ValueError(
            "align_inforce_state: model points and state carry different "
            f"mp_id sets (in model points only: {missing}; in state only: "
            f"{extra}). The state must cover exactly the valued contracts."
        )

    # Same ids in the same order: the state is already aligned.
    if np.array_equal(wanted, held):
        return state

    # Same ids, different order: look up each model point's row in the state.
    row_of = {mid: row for row, mid in enumerate(held)}
    return state.subset(np.array([row_of[mid] for mid in wanted]))
[문서]
def apply_inforce_state(
    model_points: "ModelPoints", state: InforceState,
) -> "ModelPoints":
    """Return a copy of ``model_points`` seated at the state's valuation date.

    The state is first aligned to the model points with
    :func:`align_inforce_state` (joined on ``mp_id``; see that function for
    the join rules), then its ``elapsed_months`` and ``count`` replace the
    corresponding model-point fields.

    Only ``elapsed_months`` / ``count`` are substituted; the state's
    ``prior_csm`` rides on the (separately passed) :class:`InforceState`.
    :func:`~fastcashflow.gmm.measure_inforce` re-aligns that state by mp_id
    internally, so prior_csm cannot drift out of order.
    """
    import dataclasses

    aligned = align_inforce_state(model_points, state)
    months = np.asarray(aligned.elapsed_months, dtype=np.int64)
    in_force = np.asarray(aligned.count, dtype=np.float64)
    return dataclasses.replace(
        model_points, elapsed_months=months, count=in_force,
    )
def _build_csr(
items: list[tuple[int, FloatArray]], n_mp: int
) -> tuple[IntArray, FloatArray, IntArray]:
"""Pack ``(cov_idx, per-mp amount)`` items into a coverage CSR.
A zero amount is no coverage. Coverages are ordered by model point, and
within a model point by the order the cov_idx values appear in ``items``.
An empty ``items`` list yields an empty coverage list -- no claim
coverages on any policy.
"""
if not items:
return (
np.zeros(0, np.int64),
np.zeros(0, np.float64),
np.zeros(n_mp + 1, np.int64),
)
mp_parts, cov_idx_parts, amount_parts = [], [], []
for cov_idx, amount in items:
present = amount != 0.0
mp_idx = np.nonzero(present)[0]
mp_parts.append(mp_idx)
cov_idx_parts.append(np.full(mp_idx.size, cov_idx, np.int64))
amount_parts.append(amount[present])
all_mp = np.concatenate(mp_parts)
all_cov_idx = np.concatenate(cov_idx_parts)
all_amount = np.concatenate(amount_parts)
order = np.argsort(all_mp, kind="stable") # group by mp, keep cov_idx order
coverage_index = np.ascontiguousarray(all_cov_idx[order])
coverage_amount = np.ascontiguousarray(all_amount[order])
coverage_offset = np.concatenate((
np.zeros(1, np.int64),
np.cumsum(np.bincount(all_mp, minlength=n_mp), dtype=np.int64),
))
return coverage_index, coverage_amount, coverage_offset