# Source code for fastcashflow.io

"""File I/O for model points, the actuarial basis and valuation results.

Model points and results go through polars; the actuarial basis -- read by
:func:`read_basis` -- comes from an Excel workbook via openpyxl.

Model points come in two shapes, both producing the same ``ModelPoints``:

* **wide** -- one row per policy, every benefit a column:
  ``<coverage>_benefit`` per rate-driven coverage, plus the survival
  benefits ``maturity_benefit`` and ``annuity_payment``. The convenient
  form for a single, homogeneous product.
* a policies frame (contract attributes) plus a coverages
  frame, one row per policy x coverage carrying ``amount`` and ``premium``.
  The form for a heterogeneous, multi-product portfolio.

:func:`read_model_points` reads either. The engine ships no ModelPoints ->
file exporter: both forms are lossy projections (they cannot carry per-coverage
waiting / reduction rules, ``issue_class``, ``elapsed_months`` or the VFA
account fields), so they are accepted only as external input, never produced.

The core engine stays identifier-free: the kernel never needs a policy id, so
none is carried through ``ModelPoints`` or ``GMMMeasurement``. Identifiers are a
file-boundary concern -- pass them to :func:`write_measurement` (or via
``measure_stream``'s ``id_column``) to join results back to policies.
"""
from __future__ import annotations

import importlib.resources as resources
import warnings
from dataclasses import replace
from functools import singledispatch
from pathlib import Path
from typing import TYPE_CHECKING

import numpy as np
import openpyxl
import polars as pl

from fastcashflow._typing import FloatArray
from fastcashflow.basis import (
    Basis, CoverageRate, ExpenseItem,
)
from fastcashflow.statemodel import STATE_MODELS
from fastcashflow.coverage import (
    CalculationMethod, RATE_DRIVEN_METHODS,
)
from fastcashflow.modelpoints import STATE_ACTIVE, STATE_NAMES, ModelPoints

# ``engine`` is the largest module in the package (codegen + the numba CPU
# kernels) and importing it at module load pulls all of that into any
# downstream that needs the I/O layer. The two engine names used here --
# ``GMMMeasurement`` for write_measurement's type hint and ``measure`` for the
# ``measure_stream`` stream -- are imported under TYPE_CHECKING (for the hint)
# and lazily inside ``measure_stream`` (for the call), so a script that only
# reads model points or writes a results frame never imports engine.py.
if TYPE_CHECKING:  # pragma: no cover -- import only for type hints
    from fastcashflow.engine import GMMMeasurement


def _read_frame(path) -> pl.DataFrame:
    p = str(path)
    if p.endswith(".parquet"):
        return pl.read_parquet(p)
    if p.endswith(".csv"):
        return pl.read_csv(p)
    if p.endswith(".xlsx"):
        return pl.read_excel(p, engine="openpyxl")
    if p.endswith((".feather", ".arrow")):
        return pl.read_ipc(p)
    raise ValueError(
        f"unsupported file type: {path!r} "
        "(expected .parquet, .csv, .xlsx or .feather)"
    )


def _write_frame(df: pl.DataFrame, path) -> None:
    p = str(path)
    if p.endswith(".parquet"):
        df.write_parquet(p)
    elif p.endswith(".csv"):
        df.write_csv(p)
    elif p.endswith((".feather", ".arrow")):
        df.write_ipc(p)
    elif p.endswith(".xlsx"):
        # Single-sheet xlsx via openpyxl (polars's own write_excel needs an
        # extra ``xlsxwriter`` dependency we do not require). The reader
        # side already accepts .xlsx via ``_read_frame``.
        import openpyxl
        wb = openpyxl.Workbook()
        ws = wb.active
        ws.append(list(df.columns))
        for row in df.iter_rows():
            ws.append(list(row))
        wb.save(p)
    else:
        raise ValueError(
            f"unsupported file type: {path!r} "
            "(expected .parquet, .csv, .xlsx or .feather)"
        )


# ---------------------------------------------------------------------------
# Actuarial basis -- the basis workbook
# ---------------------------------------------------------------------------
#
# A single workbook (``basis.xlsx``) carries every assumption the engine
# needs. Ten sheets:
#
#   * ``segments``       -- (product, channel) -> which tables + scalar params
#                           (a ``_DEFAULTS`` row that blank cells inherit).
#   * ``coverages``      -- (product) -> coverage, type, optional rate_table.
#   * ``mortality_tables``, ``incidence_rate_tables``, ``waiver_tables``,
#     ``lapse_tables``, ``discount_tables``, ``surrender_value_tables``,
#     ``expense_tables``, ``inflation_tables`` -- the named rate tables the
#     segments reference.
#
# See docs/basis-format.md for the column-level schema and
# docs/naming-conventions.md for the value-case rules.
#
# v1 limitation (refined in a later round): the discount, inflation and
# maintenance tables are read but used flat (their first entry). The reader
# returns ``{(product, channel): Basis}`` -- splitting model points by
# segment and valuing each is left to the caller.


def _sheet_dicts(ws):
    """Yield each data row of a worksheet as a dict keyed by the header row."""
    rows = ws.iter_rows(values_only=True)
    try:
        header = next(rows)
    except StopIteration:
        return
    names = [str(h).strip() if h is not None else "" for h in header]
    for row in rows:
        if all(c is None for c in row):
            continue
        yield {n: v for n, v in zip(names, row) if n}


def _require_sheet(wb, sheet_name):
    """Return ``wb[sheet_name]`` or raise a friendly error.

    Without this wrap a missing required sheet surfaces as a raw openpyxl
    ``KeyError(sheet_name)`` -- non-obvious to a non-programmer actuary
    reading the traceback.
    """
    if sheet_name not in wb.sheetnames:
        raise ValueError(
            f"basis workbook is missing required sheet "
            f"{sheet_name!r}; known sheets: {sorted(wb.sheetnames)}"
        )
    return wb[sheet_name]


def _require_row_cols(row, required, *, sheet, table_id=None, what="row"):
    """Raise a friendly error if any required column is missing from ``row``.

    ``row`` is a dict yielded by :func:`_sheet_dicts`. Without this wrap a
    missing column surfaces as a bare ``KeyError(col)`` deep in the
    reader -- the user gets a column name but no context.
    """
    missing = [c for c in required if c not in row]
    if not missing:
        return
    ctx = f" (table_id={table_id!r})" if table_id is not None else ""
    raise ValueError(
        f"sheet {sheet!r}{ctx}: {what} is missing required column(s) "
        f"{missing}; row has columns {sorted(row)}"
    )


def _truncate_list(items, cap=10):
    """Format a list capped at ``cap`` items with a ``(... and N more)`` suffix.

    Used for not-found errors that enumerate the registered alternatives --
    a workbook with 100+ table_ids would otherwise produce an unreadable
    multi-line traceback.
    """
    items = list(items)
    if len(items) <= cap:
        return repr(items)
    extra = len(items) - cap
    return f"{items[:cap]!r} (... and {extra} more)"


# Axes a rate table may carry, in the order they index the internal grid.
# A sheet may include any subset; missing axes broadcast (the rate is held
# flat over that axis at lookup time). ``age`` (attained) is mutually
# exclusive with ``issue_age`` / ``duration`` (select-and-ultimate schema).
# ``issue_class`` is the at-issue classification axis (직업class / UW
# class) -- absent from most tables, broadcasts to a no-op when absent.
# ``elapsed`` is the semi-Markov sojourn axis (state-duration in years
# since entering the source state) -- carried by re-incidence /
# post-event mortality tables; broadcasts to a no-op when absent.
_RATE_AXES = ("sex", "issue_age", "duration", "age", "issue_class", "elapsed")


def _flex_rate_table(ws, *, value_col="rate"):
    """Schema-detecting rate-table reader -- returns ``{table_id: callable}``.

    The sheet may carry any subset of ``_RATE_AXES`` plus ``table_id`` and
    ``value_col`` (``rate`` or ``amount``). The reader detects which axes are
    present and returns one callable per table, built by
    :func:`_build_rate_callable`, with the signature
    ``(sex, issue_age, duration, issue_class, elapsed)``. Axes not in the
    sheet broadcast (the rate is held flat over them), and lookups past the
    table's range clip to the edge.

    Example schemas:

    * ``[rate]``                          -- flat scalar
    * ``[age, rate]``                     -- by attained age, sex broadcast
    * ``[sex, age, rate]``                -- by sex x age (the historical default)
    * ``[duration, rate]``                -- by duration, sex / age broadcast (lapse)
    * ``[sex, issue_age, duration, rate]`` -- full select-and-ultimate

    ``issue_class`` and ``elapsed`` columns are detected the same way.

    An empty sheet (no data rows) returns ``{}``.

    Raises:
        ValueError: ``table_id`` or ``value_col`` is missing from the header;
            the sheet mixes ``age`` (attained) with ``issue_age`` /
            ``duration`` (select schema); an axis cell is missing, blank or
            not an integer; a value cell is blank or not numeric; or two
            rows of one table share the same axis combination.
    """
    rows = list(_sheet_dicts(ws))
    if not rows:
        return {}
    header = set(rows[0].keys())
    if "table_id" not in header:
        raise ValueError(
            f"sheet {ws.title!r} is missing required column 'table_id'; "
            f"row has columns {sorted(header)}"
        )
    if value_col not in header:
        raise ValueError(
            f"sheet {ws.title!r} is missing required column {value_col!r} "
            f"(every rate-table row carries the rate in this column); "
            f"row has columns {sorted(header)}"
        )
    axes = tuple(a for a in _RATE_AXES if a in header)
    if "age" in axes and ("issue_age" in axes or "duration" in axes):
        raise ValueError(
            f"sheet {ws.title!r} mixes 'age' (attained) with "
            "'issue_age' / 'duration' (select schema) -- pick one"
        )

    by_id: dict[str, dict[tuple, float]] = {}
    for r in rows:
        tid = str(r["table_id"]).strip()
        try:
            key = tuple(int(r[a]) for a in axes)
        except KeyError as exc:
            raise ValueError(
                f"sheet {ws.title!r} table {tid!r}: row is missing axis "
                f"column {exc.args[0]!r} (header declares axes {axes!r}, "
                "so every row must populate them)"
            ) from None
        except (TypeError, ValueError):
            # A blank cell arrives as None (TypeError), free text as a str
            # int() cannot parse (ValueError); report both with context.
            got = {a: r.get(a) for a in axes}
            raise ValueError(
                f"sheet {ws.title!r} table {tid!r}: axis columns {axes!r} "
                f"must hold integers, got {got!r}"
            ) from None
        bucket = by_id.setdefault(tid, {})
        if key in bucket:
            raise ValueError(
                f"sheet {ws.title!r} table {tid!r}: duplicate row at "
                f"{dict(zip(axes, key))} -- a rate table must have one "
                "entry per axis combination (the last row would silently "
                "overwrite the first)"
            )
        try:
            bucket[key] = float(r[value_col])
        except (KeyError, TypeError, ValueError):
            raise ValueError(
                f"sheet {ws.title!r} table {tid!r}: column {value_col!r} "
                f"must hold a number at {dict(zip(axes, key))}, "
                f"got {r.get(value_col)!r}"
            ) from None
    return {tid: _build_rate_callable(axes, list(entries.items()), ws.title, tid)
            for tid, entries in by_id.items()}


def _build_rate_callable(axes, entries, sheet_title, table_id):
    """Pack rows into a dense numpy grid and wrap in a lookup closure."""
    if not axes:
        # Flat scalar table -- one row, one rate.
        if len(entries) != 1:
            raise ValueError(
                f"sheet {sheet_title!r} table {table_id!r}: a flat (axis-less) "
                f"table must have exactly one row, got {len(entries)}"
            )
        val = entries[0][1]

        def rate(sex, issue_age, duration, issue_class, elapsed):
            shape = np.broadcast_shapes(
                np.asarray(sex).shape, np.asarray(issue_age).shape,
                np.asarray(duration).shape, np.asarray(issue_class).shape,
                np.asarray(elapsed).shape,
            )
            return np.full(shape, val, dtype=np.float64)
        rate._fcf_table_id = table_id
        rate._fcf_sheet = sheet_title
        rate._fcf_modifiers = ()
        return rate

    keys = np.array([k for k, _ in entries], dtype=np.int64)
    values = np.array([v for _, v in entries], dtype=np.float64)
    mins = keys.min(axis=0)
    maxs = keys.max(axis=0)
    shape = tuple(int(maxs[i] - mins[i] + 1) for i in range(len(axes)))
    grid = np.full(shape, np.nan, dtype=np.float64)
    for k, v in zip(keys, values):
        idx = tuple(int(k[i] - mins[i]) for i in range(len(axes)))
        grid[idx] = v
    if np.isnan(grid).any():
        raise ValueError(
            f"sheet {sheet_title!r} table {table_id!r} is not dense over its "
            f"axes {axes} -- some cells in the cartesian product are missing"
        )

    def rate(sex, issue_age, duration, issue_class, elapsed):
        sex = np.asarray(sex, dtype=np.int64)
        issue_age = np.asarray(issue_age, dtype=np.int64)
        duration = np.asarray(duration, dtype=np.int64)
        issue_class = np.asarray(issue_class, dtype=np.int64)
        elapsed = np.asarray(elapsed, dtype=np.int64)
        # One index array per axis present in the table.
        idxs = []
        for i, a in enumerate(axes):
            if a == "sex":
                raw = sex
            elif a == "age":
                raw = issue_age + duration                # attained age
            elif a == "issue_age":
                raw = issue_age
            elif a == "issue_class":
                raw = issue_class
            elif a == "elapsed":
                raw = elapsed
            else:                                          # duration
                raw = duration
            idxs.append(np.clip(raw - int(mins[i]), 0, shape[i] - 1))
        # Broadcast each index to the input's full broadcast shape so that
        # numpy fancy-indexing returns a result of that shape (axes absent
        # from the table contribute through broadcast, not indexing).
        target = np.broadcast_shapes(
            sex.shape, issue_age.shape, duration.shape,
            issue_class.shape, elapsed.shape,
        )
        return grid[tuple(np.broadcast_to(ix, target) for ix in idxs)]
    rate._fcf_table_id = table_id
    rate._fcf_sheet = sheet_title
    rate._fcf_modifiers = ()
    return rate


def _read_expense_tables(ws) -> dict[str, tuple[ExpenseItem, ...]]:
    """Parse the optional ``expense_tables`` sheet into expense ledgers.

    Every data row becomes one ``ExpenseItem`` built from the
    ``expense_type``, ``basis`` and ``value`` columns (the two text columns
    stripped, ``value`` converted with ``float``). Rows are grouped by their
    stripped ``table_id``, so one ``table_id`` may span several rows; row
    order within a group is preserved.

    The four required columns are checked on the first data row only, via
    :func:`_require_row_cols`.

    Returns ``{table_id: tuple[ExpenseItem, ...]}``; an empty sheet gives
    ``{}``. Inflation is not a column of this sheet.
    """
    required = ("table_id", "expense_type", "basis", "value")
    ledger: dict[str, list[ExpenseItem]] = {}
    for position, record in enumerate(_sheet_dicts(ws)):
        if position == 0:
            _require_row_cols(record, required, sheet=ws.title)
        table_id = str(record["table_id"]).strip()
        item = ExpenseItem(
            expense_type=str(record["expense_type"]).strip(),
            basis=str(record["basis"]).strip(),
            value=float(record["value"]),
        )
        ledger.setdefault(table_id, []).append(item)
    return {table_id: tuple(items) for table_id, items in ledger.items()}


def _read_ae_factors(ws):
    """Read the optional ``ae_factors`` sheet.

    Each row maps a segment + ``coverage`` to a ``factor`` (a runtime
    multiplier on the base rate). Optional rate-axis columns (any of
    ``_RATE_AXES``) let the factor vary along those dimensions, with the
    same ``age`` versus ``issue_age`` / ``duration`` exclusivity rule as the
    base rate tables; missing axes broadcast. Every other column except
    ``coverage``, ``factor`` and columns ending in ``_name`` is treated as a
    segment axis; a blank segment-axis cell is read as the empty string.

    Returns ``(factors, seg_axes)`` where ``factors`` is
    ``{(*seg_axis_values, coverage): callable(sex, issue_age, duration,
    issue_class, elapsed) -> factor}`` and ``seg_axes`` are the segment-axis
    columns the sheet declares. An empty sheet returns ``({}, ())``.

    Raises:
        ValueError: ``product``, ``coverage`` or ``factor`` is missing from
            the first row; the sheet mixes ``age`` with ``issue_age`` /
            ``duration``; a rate-axis cell is missing, blank or not an
            integer; ``factor`` is blank or not numeric; or two rows target
            the same segment, coverage and axis combination (the later row
            would otherwise silently overwrite the earlier one).
    """
    rows = list(_sheet_dicts(ws))
    if not rows:
        return {}, ()
    header = list(rows[0].keys())
    _require_row_cols(
        rows[0], ("product", "coverage", "factor"), sheet=ws.title,
    )
    # Rate axes (the factor's shape, like a rate table); everything else that is
    # not coverage / factor is a segment axis (which segment+coverage the
    # factor applies to).
    rate_axes = tuple(a for a in _RATE_AXES if a in header)
    if "age" in rate_axes and ("issue_age" in rate_axes or "duration" in rate_axes):
        raise ValueError(
            f"sheet {ws.title!r} mixes 'age' (attained) with "
            "'issue_age' / 'duration' (select schema) -- pick one"
        )
    seg_axes = tuple(c for c in header
                     if c not in _RATE_AXES and c not in ("coverage", "factor")
                     and not str(c).endswith("_name"))

    by_key: dict[tuple, list] = {}
    seen: set[tuple] = set()
    for r in rows:
        seg_key = tuple(str(r.get(a, "") or "").strip() for a in seg_axes)
        coverage = str(r["coverage"]).strip()
        key = seg_key + (coverage,)
        try:
            axes_key = tuple(int(r[a]) for a in rate_axes)
        except KeyError as exc:
            raise ValueError(
                f"sheet {ws.title!r} row for {key!r} is missing axis "
                f"column {exc.args[0]!r} (header declares rate axes {rate_axes!r})"
            ) from None
        except (TypeError, ValueError):
            # Blank cells arrive as None, free text as an unparseable str.
            got = {a: r.get(a) for a in rate_axes}
            raise ValueError(
                f"sheet {ws.title!r} row for {key!r}: rate-axis columns "
                f"{rate_axes!r} must hold integers, got {got!r}"
            ) from None
        # Match the rate-table reader: reject duplicates rather than let
        # the later row overwrite the earlier one in the grid.
        if (key, axes_key) in seen:
            raise ValueError(
                f"sheet {ws.title!r} row for {key!r}: duplicate factor at "
                f"{dict(zip(rate_axes, axes_key))} -- one row per segment x "
                "coverage x axis combination"
            )
        seen.add((key, axes_key))
        try:
            factor = float(r["factor"])
        except (KeyError, TypeError, ValueError):
            raise ValueError(
                f"sheet {ws.title!r} row for {key!r}: column 'factor' must "
                f"hold a number, got {r.get('factor')!r}"
            ) from None
        by_key.setdefault(key, []).append((axes_key, factor))
    factors = {
        key: _build_rate_callable(rate_axes, entries, ws.title,
                                  "/".join(map(str, key)))
        for key, entries in by_key.items()
    }
    return factors, seg_axes


def _propagate_table_id(wrapper, inner, modifier_tag):
    """Carry the source table_id from an inner rate callable to a wrapper."""
    tid = getattr(inner, "_fcf_table_id", None)
    if tid is None:
        return
    wrapper._fcf_table_id = tid
    wrapper._fcf_sheet = getattr(inner, "_fcf_sheet", None)
    wrapper._fcf_modifiers = getattr(inner, "_fcf_modifiers", ()) + (modifier_tag,)


def _with_improvement(rate_fn, improvement_curve):
    """Wrap a rate callable to multiply by an annual improvement factor.

    ``improvement_curve`` is a ``(n_years,)`` array indexed by policy year
    (= duration). ``factor[0] = 1.0`` typically, decreasing for genuine
    improvement (mortality falls). Held flat past the curve's end.
    ``None`` returns ``rate_fn`` unchanged.
    """
    if rate_fn is None or improvement_curve is None:
        return rate_fn
    n = improvement_curve.shape[0]

    def improved(sex, issue_age, duration, issue_class, elapsed):
        d = np.asarray(duration, dtype=np.int64)
        idx = np.clip(d, 0, n - 1)
        return (rate_fn(sex, issue_age, duration, issue_class, elapsed)
                * improvement_curve[idx])
    _propagate_table_id(improved, rate_fn, "improvement")
    return improved


def _with_ae_factor(rate_fn, factor_fn):
    """Wrap a rate callable to multiply by an A/E factor at call time.

    ``factor_fn`` shares the ``(sex, issue_age, duration) -> array``
    signature; ``None`` (no factor configured for this coverage) returns
    ``rate_fn`` unchanged.
    """
    if factor_fn is None or rate_fn is None:
        return rate_fn

    def adjusted(sex, issue_age, duration, issue_class, elapsed):
        return (rate_fn(sex, issue_age, duration, issue_class, elapsed)
                * factor_fn(sex, issue_age, duration, issue_class, elapsed))
    _propagate_table_id(adjusted, rate_fn, "ae")
    return adjusted


def _with_age_shift(rate_fn, shift):
    """Wrap a rate callable to shift its ``issue_age`` argument by ``shift``.

    A positive shift treats every life as ``shift`` years older when looking
    up the base table; negative shifts make them younger. Returns ``rate_fn``
    unchanged when ``shift == 0`` (no allocation cost). ``rate_fn`` may be
    ``None`` (an optional rate the segment did not configure), in which case
    the wrapper is a no-op too.
    """
    if rate_fn is None or shift == 0:
        return rate_fn

    def shifted(sex, issue_age, duration, issue_class, elapsed):
        return rate_fn(sex, issue_age + shift, duration, issue_class, elapsed)
    _propagate_table_id(shifted, rate_fn, f"shift{shift:+d}")
    return shifted


def _surrender_value_col(ws) -> str:
    """Detect the surrender table's value column. ``amount`` -- a surrender
    amount by duration (per policy or per unit of the MP's
    surrender_base_amount -> an amount mode); ``factor`` -- a factor on
    cumulative premium (cum_premium_factor, the legacy default)."""
    header = next(ws.iter_rows(min_row=1, max_row=1, values_only=True))
    cols = {str(c) for c in header if c is not None}
    return "amount" if "amount" in cols else "factor"


def _axis_tables(ws, axis, *, value_col="rate"):
    """``{table_id: value array}`` from a sheet keyed by ``axis`` (0-based).

    ``value_col`` names the column carrying the per-axis value -- ``"rate"``
    for rate / probability sheets, ``"amount"`` for currency sheets
    (maintenance expense). The column-name distinction documents units;
    a probability and a currency amount should not share a column name.

    Each table's axis values must run ``0 .. n-1`` without gaps; the
    returned float64 array holds the value for axis ``k`` at position ``k``.
    An empty sheet returns ``{}``.

    Raises:
        ValueError: a required column is missing, an axis / value cell is
            blank or not numeric, or a table's axis values do not cover
            ``0 .. n-1`` (e.g. the axis starts at 1 or skips a year).
    """
    by_id: dict[str, dict] = {}
    first = True
    for r in _sheet_dicts(ws):
        if first:
            _require_row_cols(
                r, ("table_id", axis, value_col), sheet=ws.title,
            )
            first = False
        tid = str(r["table_id"]).strip()
        try:
            by_id.setdefault(tid, {})[int(r[axis])] = float(r[value_col])
        except KeyError as exc:
            raise ValueError(
                f"sheet {ws.title!r} table {tid!r}: row is missing "
                f"column {exc.args[0]!r} (row has columns {sorted(r)})"
            ) from None
        except (TypeError, ValueError):
            # Blank cells arrive as None, free text as an unparseable str.
            raise ValueError(
                f"sheet {ws.title!r} table {tid!r}: columns {axis!r} / "
                f"{value_col!r} must be numeric, got {r.get(axis)!r} / "
                f"{r.get(value_col)!r}"
            ) from None

    tables = {}
    for tid, by_k in by_id.items():
        expected = range(len(by_k))
        # The array is positional, so the axis must be exactly 0..n-1;
        # say so instead of failing with a bare KeyError(k) below.
        missing = [k for k in expected if k not in by_k]
        if missing:
            raise ValueError(
                f"sheet {ws.title!r} table {tid!r}: {axis!r} must run "
                f"0..{len(by_k) - 1} without gaps (0-based); missing "
                f"{_truncate_list(missing)}, got {_truncate_list(sorted(by_k))}"
            )
        tables[tid] = np.asarray([by_k[k] for k in expected], np.float64)
    return tables


# Recognised assumption-slot columns on the segments sheet. Every *other*
# column is a routing axis -- product, channel by convention, but a
# workbook may use any axes (just channel for a pricing run, or
# product x channel x risk_class). Detection mirrors the policies-frame
# attributes rule.
_SEGMENT_ASSUMPTION_COLS = frozenset({
    "mortality_table", "mortality_improvement_table", "lapse_table",
    "waiver_table", "surrender_value_table", "discount_table", "expense_table",
    "inflation_table",
    "mortality_age_shift", "morbidity_age_shift", "waiver_age_shift",
    "ra_confidence", "mortality_cv", "morbidity_cv", "longevity_cv",
    "disability_cv", "expense_cv", "cost_of_capital_rate", "investment_return",
    "fund_fee", "ra_method", "state_model", "surrender_value_basis",
})


class SegmentedBasis(dict):
    """Per-segment ``Basis`` mapping that also records its key-axis names.

    This is what :func:`read_basis` hands back. It behaves as an ordinary
    ``dict`` keyed by segment tuples; the one addition is the
    ``segment_axes`` attribute, which :func:`~fastcashflow.gmm.measure`
    consults so the caller need not pass ``segment_by`` again. The axes are
    the segments-sheet columns that are not assumption slots --
    ``("product", "channel")`` unless the workbook declares others.
    """

    def __init__(self, *mapping_args, segment_axes=("product", "channel"),
                 **entries):
        # Positional / keyword entries go straight to ``dict``; only
        # ``segment_axes`` is consumed here, normalised to a tuple.
        super().__init__(*mapping_args, **entries)
        axes = tuple(segment_axes)
        self.segment_axes = axes


def read_basis(path: Path | str) -> "SegmentedBasis":
    """Read the basis workbook into a per-segment ``Basis`` dict.

    ``path`` is a single ``basis.xlsx`` workbook holding both the rate tables
    and the segment mapping (see the module header for the sheet layout).
    The ``segments`` sheet maps each (product, channel) to which tables it
    uses plus scalar parameters, with a ``_DEFAULTS`` row whose values blank
    cells inherit; the ``coverages`` sheet attaches rate-driven coverages to
    products.

    Required sheets are ``mortality_tables``, ``lapse_tables``,
    ``discount_tables`` and ``segments``; ``incidence_rate_tables``,
    ``waiver_tables``, ``inflation_tables``, ``ae_factors``,
    ``improvement_tables``, ``surrender_value_tables``, ``expense_tables``
    and ``coverages`` are read only when present.

    Returns a :class:`SegmentedBasis` (a ``dict`` subclass) keyed by the
    segment axes -- ``(product, channel)`` by default, or whatever
    non-assumption columns the segments sheet declares (one axis, or three);
    ``.segment_axes`` records the axis names so
    :func:`~fastcashflow.gmm.measure` routes without a ``segment_by``
    argument.

    The selected discount table is passed to ``Basis`` as its full per-year
    array (``discount_annual``), and the selected inflation table as
    ``expense_inflation`` (``0.0`` when the ``inflation_table`` cell is
    blank). The per-segment dict is returned for the caller to value segment
    by segment.

    Raises :class:`ValueError` for a missing required cell, a table id that
    is not registered, an ``ae_factors`` axis absent from the segments sheet,
    a legacy ``*_code`` axis column, a coverages row without a
    ``rate_table``, a surrender column / ``surrender_value_basis`` mismatch,
    or an unknown ``state_model`` key.
    """
    wb = openpyxl.load_workbook(path, data_only=True)

    def optional(sheet, reader):
        return reader(wb[sheet]) if sheet in wb.sheetnames else {}

    mortality_t = _flex_rate_table(_require_sheet(wb, "mortality_tables"))
    incidence_rate_t = optional("incidence_rate_tables", _flex_rate_table)
    waiver_t = optional("waiver_tables", _flex_rate_table)
    lapse_t = _flex_rate_table(_require_sheet(wb, "lapse_tables"))
    discount_t = _axis_tables(_require_sheet(wb, "discount_tables"), "year")
    inflation_t = optional(
        "inflation_tables", lambda w: _axis_tables(w, "year"),
    )
    if "ae_factors" in wb.sheetnames:
        ae_factors, ae_axes = _read_ae_factors(wb["ae_factors"])
    else:
        ae_factors, ae_axes = {}, ()
    improvement_t = optional(
        "improvement_tables",
        lambda w: _axis_tables(w, "year", value_col="factor"),
    )
    # Surrender value curves -- per-duration value, read from whichever
    # value column the sheet carries: ``amount`` (a currency amount, an
    # amount_per_* basis) or ``factor`` (a ratio on cumulative premium, the
    # cum_premium_factor basis). Optional; absent means lapse has no payout.
    # ``surrender_col_kind`` records which column was read so each segment's
    # ``surrender_value_basis`` can be checked against it (a factor read as
    # an amount, or vice versa, would silently mis-measure).
    surrender_col_kind = (
        _surrender_value_col(wb["surrender_value_tables"])
        if "surrender_value_tables" in wb.sheetnames else None
    )
    # The reader only runs when the sheet exists, in which case
    # ``surrender_col_kind`` already holds the detected column name -- reuse
    # it rather than sniffing the header a second time.
    surrender_t = optional(
        "surrender_value_tables",
        lambda w: _axis_tables(w, "duration_month",
                               value_col=surrender_col_kind),
    )
    # Expense ledger -- item form. Optional; per-segment ``expense_table``
    # in the segments sheet selects which table_id to attach.
    expense_t = optional("expense_tables", _read_expense_tables)

    defaults: dict = {}
    segments: list = []
    seg_rows = list(_sheet_dicts(_require_sheet(wb, "segments")))
    # Routing axes = every segments-sheet column that is not an assumption slot
    # and not a display label (``*_name``, report-only) -- order-independent, so
    # an axis column can sit anywhere among the assumption columns. The key tuple
    # reads them in column order; the default (no extra columns) is
    # (product, channel).
    axis_cols = tuple(
        c for c in (seg_rows[0].keys() if seg_rows else ())
        if c not in _SEGMENT_ASSUMPTION_COLS and not str(c).endswith("_name")
    )
    # An A/E axis that is not a segments-sheet column can never match a segment,
    # so the A/E would be silently discarded -- reject it up front.
    if ae_axes and seg_rows:
        seg_header = list(seg_rows[0].keys())
        unknown = [a for a in ae_axes if a not in seg_header]
        if unknown:
            raise ValueError(
                f"ae_factors sheet keys on axis column(s) {unknown} that are "
                "not in the segments sheet; the A/E would never match a segment "
                "and be silently discarded. Use segments-sheet axis columns "
                f"(have: {[c for c in seg_header if c in axis_cols]})."
            )
    if seg_rows:
        # A pre-rename ``*_code`` column would be taken for a custom routing
        # axis and leave product / channel empty -- fail with a rename hint.
        header = set(seg_rows[0].keys())
        for new, legacy in (("product", "product_code"),
                            ("channel", "channel_code")):
            if new not in header and legacy in header:
                raise ValueError(
                    f"segments sheet has column {legacy!r} but not {new!r} "
                    f"-- did you mean {new!r}? (the routing axes are now the "
                    "bare keys 'product' / 'channel', no '_code' suffix)"
                )
    for r in seg_rows:
        if str(r.get("product", "") or "").strip().lower() == "_defaults":
            defaults = r
        else:
            segments.append(r)

    # Coverages registry -- global, one row per coverage. The same code
    # plugs into any segment's contracts (a HEALTH policy and a TERM_LIFE
    # policy that both attach `CANCER` share the same incidence rate). When
    # a company genuinely needs product-specific calibrations of the same
    # disease, give them different coverage_codes (e.g. CANCER_HEALTH vs
    # CANCER_WHOLELIFE) -- the engine then treats them as separate coverages.
    #
    # Plan B (3-file split): this sheet carries only ``coverage`` +
    # ``rate_table`` -- the rate-driven entries. The pattern taxonomy
    # (DEATH / MORBIDITY / DIAGNOSIS / ANNUITY / MATURITY) moves to a
    # separate ``calculation_methods.csv`` file consumed by
    # :func:`read_model_points`, so the basis workbook is purely the
    # actuarial basis and the company catalogue lives elsewhere. Survival
    # entries (ANNUITY, MATURITY) never carry a ``rate_table`` and so do
    # not appear here. A death coverage's ``rate_table`` cell may point to
    # either an ``incidence_rate_tables`` entry or a ``mortality_tables``
    # entry: the engine reads the table as a per-coverage payment rate
    # independently of how the same id is also used for in-force decrement.
    rate_driven_coverages: list[tuple[str, str]] = []
    if "coverages" in wb.sheetnames:
        for r in _sheet_dicts(wb["coverages"]):
            rt = r.get("rate_table")
            code = str(r["coverage"]).strip()
            # A whitespace-only cell is blank too (same rule as ``cell``
            # below); otherwise it would surface later as the confusing
            # "rate_table '' is not registered".
            if rt is None or not str(rt).strip():
                raise ValueError(
                    f"coverages row {code!r} has no rate_table; the "
                    "basis workbook only lists rate-driven coverages "
                    "(survival entries belong in calculation_methods.csv, "
                    "not here)"
                )
            rate_driven_coverages.append((code, str(rt).strip()))

    result = {}
    for seg in segments:
        product = str(seg.get("product", "") or "").strip()
        seg_key = tuple(str(seg.get(c, "") or "").strip() for c in axis_cols)
        where = f"segments row {seg_key}"

        def cell(col):
            # Segment value, falling back to the _DEFAULTS row when blank;
            # a blank (None / whitespace-only) result is reported as None.
            v = seg.get(col)
            if v is None or (isinstance(v, str) and not v.strip()):
                v = defaults.get(col)
            return None if (isinstance(v, str) and not v.strip()) else v

        def lookup(registry, col, optional_ref=False):
            # Resolve the table id in ``col`` against ``registry``.
            tid = cell(col)
            tid = str(tid).strip() if tid is not None else None
            if tid is None:
                if optional_ref:
                    return None
                raise ValueError(f"{where}: {col!r} is required")
            if tid not in registry:
                raise ValueError(f"{where}: {col}={tid!r} is not registered")
            return registry[tid]

        def scalar(col, required=False):
            # Numeric cell as float, or None when blank and not required.
            v = cell(col)
            if v is None and required:
                raise ValueError(f"{where}: {col!r} is required")
            return None if v is None else float(v)

        shift_mort = int(scalar("mortality_age_shift") or 0)
        shift_morb = int(scalar("morbidity_age_shift") or 0)
        shift_wvr = int(scalar("waiver_age_shift") or 0)

        def ae(coverage):
            # A/E factor keyed by this segment's A/E axis values + coverage.
            ae_key = tuple(str(seg.get(a, "") or "").strip()
                           for a in ae_axes) + (coverage,)
            return ae_factors.get(ae_key)

        coverage_list = []
        for code, rate_table in rate_driven_coverages:
            # Death coverages share the mortality_tables namespace -- a
            # rate_table cell may name either an incidence_rate_tables or
            # a mortality_tables entry. Incidence wins on collision (the
            # convention rare in practice; calibrating the same code with
            # different tables under the same name is a workbook error).
            if rate_table in incidence_rate_t:
                rate_fn = incidence_rate_t[rate_table]
                shift = shift_morb
            elif rate_table in mortality_t:
                rate_fn = mortality_t[rate_table]
                shift = shift_mort
            else:
                raise ValueError(
                    f"coverage {code!r} of product {product!r}: "
                    f"rate_table {rate_table!r} is not registered. "
                    f"incidence_rate_tables has "
                    f"{_truncate_list(sorted(incidence_rate_t))}; "
                    f"mortality_tables has "
                    f"{_truncate_list(sorted(mortality_t))}"
                )
            rate_fn = _with_age_shift(rate_fn, shift)
            rate_fn = _with_ae_factor(rate_fn, ae(code))
            coverage_list.append(CoverageRate(code=code, rate=rate_fn))

        mortality_fn = lookup(mortality_t, "mortality_table")
        mortality_fn = _with_age_shift(mortality_fn, shift_mort)
        improvement_curve = lookup(
            improvement_t, "mortality_improvement_table", optional_ref=True,
        )
        mortality_fn = _with_improvement(mortality_fn, improvement_curve)
        waiver = lookup(waiver_t, "waiver_table", optional_ref=True)
        waiver_fn = _with_age_shift(waiver, shift_wvr)
        surrender_curve = lookup(
            surrender_t, "surrender_value_table", optional_ref=True,
        )
        # Row-form expense ledger -- the segments row points an
        # ``expense_table`` cell at one entry of the ``expense_tables``
        # sheet. Blank cell = no expense (empty ``expense_items`` tuple,
        # zero-expense projection).
        expense_items = lookup(expense_t, "expense_table", optional_ref=True)
        # Global economic inflation -- the segments row points an
        # ``inflation_table`` cell at one named scenario in the
        # ``inflation_tables`` sheet (analogous to ``discount_table``).
        # Blank cell = zero inflation (recurring expense items stay flat).
        inflation_curve = lookup(
            inflation_t, "inflation_table", optional_ref=True,
        )
        kwargs: dict = dict(
            mortality_annual=mortality_fn,
            lapse_annual=lookup(lapse_t, "lapse_table"),
            waiver_incidence_annual=waiver_fn,
            # Pass the full per-year discount through -- the engine
            # expands it to a per-month curve via fastcashflow.curves.
            # A one-row table reproduces the flat-scalar behaviour.
            discount_annual=lookup(discount_t, "discount_table"),
            expense_items=expense_items or (),
            expense_inflation=(
                inflation_curve if inflation_curve is not None else 0.0
            ),
            ra_confidence=scalar("ra_confidence", required=True),
            mortality_cv=scalar("mortality_cv", required=True),
            coverages=tuple(coverage_list),
            surrender_value_curve=surrender_curve,
        )
        # Optional scalars: set only when given, so a blank cell leaves the
        # ``Basis`` default in place.
        for opt_col in ("morbidity_cv", "longevity_cv", "disability_cv",
                        "expense_cv", "cost_of_capital_rate",
                        "investment_return", "fund_fee"):
            v = scalar(opt_col)
            if v is not None:
                kwargs[opt_col] = v
        method = cell("ra_method")
        if method is not None:
            kwargs["ra_method"] = str(method).strip()
        # Optional surrender_value_basis column -- how surrender_value_curve is
        # read: "cum_premium_factor" (default), "amount_per_policy", or
        # "amount_per_unit" (the latter needs a surrender_base_amount column on
        # the policies). Blank cell leaves the Basis default (cum_premium_factor).
        surr_basis = cell("surrender_value_basis")
        if surr_basis is not None:
            kwargs["surrender_value_basis"] = str(surr_basis).strip()
        # The surrender curve is read from one column kind (amount vs factor)
        # for the whole sheet, but each segment names how to interpret it via
        # surrender_value_basis. A mismatch -- an ``amount`` column used as a
        # cum_premium factor, or a ``factor`` column used as an amount --
        # silently mis-measures the surrender cash flow, so reject it here.
        if surrender_curve is not None and surrender_col_kind is not None:
            eff_basis = kwargs.get("surrender_value_basis",
                                   "cum_premium_factor")
            basis_wants_amount = eff_basis in ("amount_per_policy",
                                               "amount_per_unit")
            col_is_amount = surrender_col_kind == "amount"
            if basis_wants_amount != col_is_amount:
                raise ValueError(
                    f"{where}: surrender_value_basis={eff_basis!r} expects a "
                    f"{'amount' if basis_wants_amount else 'factor'} column "
                    f"but surrender_value_tables carries a "
                    f"{surrender_col_kind!r} column. Use a 'factor' column "
                    "for cum_premium_factor, or an 'amount' column for "
                    "amount_per_policy / amount_per_unit."
                )
        # Optional state_model column -- non-programmer actuary picks a
        # bundled topology by its registry key (e.g. "WAIVER"). Blank cell
        # leaves Basis.state_model = None; an unknown key is an
        # error with a hint listing the registered keys.
        state_model_key = cell("state_model")
        if state_model_key is not None:
            key = str(state_model_key).strip()
            try:
                kwargs["state_model"] = STATE_MODELS[key]
            except KeyError:
                raise ValueError(
                    f"{where}: state_model={key!r} is not in STATE_MODELS "
                    f"(known: {sorted(STATE_MODELS)})"
                ) from None
        result[seg_key] = Basis(**kwargs)
    return SegmentedBasis(result,
                          segment_axes=axis_cols or ("product", "channel"))
# ---------------------------------------------------------------------------
# Model points -- built from policies + coverages frames
# ---------------------------------------------------------------------------

def _read_state(col: pl.Series) -> np.ndarray:
    """Convert a model-point ``state`` column to engine state codes.

    Accepts the readable names a practitioner edits in a spreadsheet --
    ``active`` / ``waiver`` / ``paidup`` -- or the integer codes directly.
    Case, spaces, hyphens and underscores are ignored, so ``Paid-up`` and
    ``paid up`` read the same. A blank cell means an ordinary active contract.

    Returns an ``int64`` array, one code per row. Raises :class:`ValueError`
    for a name that does not normalise to a ``STATE_NAMES`` key, or for an
    integer that is not one of the ``STATE_NAMES`` values.
    """
    if col.dtype == pl.String:
        # Normalised lookup -- canonical STATE_NAMES keys ("ACTIVE", "WAIVER",
        # "PAIDUP") are uppercase, but any spelling (case, spaces, hyphens,
        # underscores ignored) of the canonical name maps to the same code.
        normalised = {
            k.lower().replace("_", "").replace("-", "").replace(" ", ""): v
            for k, v in STATE_NAMES.items()
        }
        out = np.empty(len(col), dtype=np.int64)
        for i, v in enumerate(col):
            # Apply the same normalisation to the cell as to the keys above.
            name = "" if v is None else str(v).strip().lower()
            name = name.replace(" ", "").replace("-", "").replace("_", "")
            if name == "":
                out[i] = STATE_ACTIVE
            elif name in normalised:
                out[i] = normalised[name]
            else:
                raise ValueError(
                    f"unknown contract state {v!r}; "
                    f"expected one of {sorted(STATE_NAMES)}"
                )
        return out
    # Non-string column: treat the values as codes; a null is an active
    # contract, and every code must be a known STATE_NAMES value.
    out = col.fill_null(STATE_ACTIVE).to_numpy().astype(np.int64)
    valid = set(STATE_NAMES.values())
    bad = sorted(set(int(v) for v in out) - valid)
    if bad:
        raise ValueError(
            f"state column has unknown integer value(s) {bad}; "
            f"expected one of {sorted(valid)} (see STATE_NAMES)"
        )
    return out


def _warn_if_elapsed_months(columns) -> None:
    """Warn that ``elapsed_months`` on a policies frame is silently dropped.

    The static-spec policies frame holds inception-time facts (issue_age,
    term, sex ...). The in-force closing state lives in a separate
    ``inforce_state`` file -- :func:`read_inforce_state` is the only surface
    that fills the ``elapsed_months`` field of :class:`ModelPoints`. A column
    on the policies side is a common mistake (mixed roles) and would be
    silently ignored; the warning makes the source-of-truth boundary explicit.

    ``columns`` is the frame's column-name collection. The warning is issued
    with ``stacklevel=3``, i.e. attributed two frames above this helper.
    """
    if "elapsed_months" in columns:
        warnings.warn(
            "policies frame has 'elapsed_months' column; this reader "
            "ignores it. elapsed_months belongs in the in-force state file "
            "(see read_inforce_state / apply_inforce_state) -- the policies "
            "frame is the inception-time static spec.",
            UserWarning, stacklevel=3,
        )


def _parse_calculation_methods(path: Path | str) -> dict[str, CalculationMethod]:
    """Read a ``calculation_methods.csv`` taxonomy file into a dict.

    The file has two required columns -- ``coverage`` and
    ``calculation_method``. Any other column (e.g. a human-friendly label)
    is ignored, since the engine routes by the bare ``coverage`` key.

    Returns ``{coverage: CalculationMethod}``, in file row order. Raises
    :class:`ValueError` for a legacy ``coverage_code`` column, a missing
    required column, an unknown pattern (V1) and a duplicate code (V2); the
    messages name the offending row so the operator can fix the file without
    scrolling through it.
    """
    df = _read_frame(path)
    if "coverage" not in df.columns and "coverage_code" in df.columns:
        raise ValueError(
            "the calculation_methods file has column 'coverage_code' but not "
            "'coverage' -- the coverage key is now the bare 'coverage' "
            "(no '_code' suffix)"
        )
    for need in ("coverage", "calculation_method"):
        if need not in df.columns:
            raise ValueError(
                f"the calculation_methods file is missing required column "
                f"{need!r}"
            )
    result: dict[str, CalculationMethod] = {}
    # Pre-render the accepted values once for the V1 error message.
    valid = ", ".join(p.value for p in CalculationMethod)
    for row in df.iter_rows(named=True):
        code = str(row["coverage"]).strip()
        raw = str(row["calculation_method"]).strip()
        try:
            pattern = CalculationMethod(raw)
        except ValueError as exc:
            raise ValueError(
                f"calculation_methods row {code!r}: calculation_method={raw!r} "
                f"is not one of {{{valid}}}"
            ) from exc
        if code in result:
            raise ValueError(
                f"calculation_methods row {code!r}: duplicate coverage "
                "(every code may appear exactly once in the taxonomy)"
            )
        result[code] = pattern
    return result


# Policies-frame columns the engine recognises as fields (or the mp_id join
# key). Every *other* column on the policies frame is a grouping attribute --
# portfolio_id, profitability_group, risk_class, region, campaign_id, ... --
# read into ModelPoints.attributes for group()/group_of_contracts.
_POLICY_RESERVED_COLS = frozenset({
    "mp_id", "issue_age", "term_months", "premium", "sex", "count", "state",
    "issue_class", "elapsed_months", "issue_date",
    "premium_term_months", "premium_frequency_months",
    "annuity_frequency_months", "disability_income", "disability_benefit",
    "account_value", "minimum_crediting_rate", "minimum_death_benefit",
    "minimum_accumulation_benefit", "surrender_base_amount",
    "contract_boundary_months", "product", "channel",
})

# The columns ``read_vfa_model_points`` recognises (its allow-list plus the
# required fields). A column outside this set is ignored, so a typo is dropped.
_VFA_POLICY_COLS = frozenset({
    "mp_id", "issue_age", "term_months", "premium", "state", "sex", "count",
    "premium_term_months", "premium_frequency_months",
    "annuity_frequency_months", "maturity_benefit", "annuity_payment",
    "disability_income", "disability_benefit", "account_value",
    "minimum_crediting_rate", "minimum_death_benefit",
    "minimum_accumulation_benefit", "surrender_base_amount",
    "contract_boundary_months", "product", "channel",
})


def _within_edit_distance_1(a: str, b: str) -> bool:
    """True if ``a`` and ``b`` differ by at most one insert / delete /
    substitute (a Levenshtein distance <= 1) -- a cheap typo detector.

    Equal strings count as a match. Transpositions (``ab`` vs ``ba``) are two
    substitutions and therefore do not match.
    """
    if a == b:
        return True
    la, lb = len(a), len(b)
    if abs(la - lb) > 1:
        return False
    if la == lb:  # one substitution
        return sum(x != y for x, y in zip(a, b)) == 1
    if la > lb:  # make ``a`` the shorter
        a, b = b, a
    # Two-pointer scan: the longer string may carry exactly one extra
    # character, which is skipped once; a second mismatch means distance > 1.
    i = j = 0
    skipped = False
    while i < len(a) and j < len(b):  # one insertion in the longer
        if a[i] == b[j]:
            i += 1
            j += 1
        elif skipped:
            return False
        else:
            skipped = True
            j += 1
    return True


def _warn_near_reserved_columns(columns, reserved, *, context: str) -> None:
    """Warn for a column that looks like a typo of a recognised engine field
    -- a case-only difference or one edit away. Such a column is silently
    dropped (read as a grouping attribute, or ignored), so ``coun`` ->
    ``count`` would take its default (1) with no error. A warning, not an
    error, so a genuine attribute that happens to sit near a reserved name
    still reads.

    ``columns`` are the names to check, ``reserved`` the recognised field
    names, and ``context`` the reader name that prefixes the message. Exact
    reserved names and names starting with ``_`` are skipped. At most one
    warning is issued per column (the first near-match found while iterating
    ``reserved``).
    """
    for col in columns:
        c = str(col)
        if c in reserved or c.startswith("_"):
            continue
        # Compare case-insensitively so ``Count`` is caught as well.
        cl = c.lower()
        for r in reserved:
            if _within_edit_distance_1(cl, r.lower()):
                # NOTE(review): the wording always says "grouping attribute",
                # although the VFA reader's own comment says it drops an
                # unrecognised column entirely -- confirm the message is
                # acceptable for that caller.
                warnings.warn(
                    f"{context}: column {c!r} looks like a typo of the field "
                    f"{r!r}; it is read as a grouping attribute, not as {r!r} "
                    f"(so {r!r} takes its default). Rename it if you meant {r!r}.",
                    UserWarning, stacklevel=3,
                )
                break


def _model_points_from_frames(pol: pl.DataFrame, cov: pl.DataFrame,
                              calculation_methods=None) -> ModelPoints:
    """Build a ``ModelPoints`` from a policies + coverages pair.

    The rate-driven coverage order is taken from the ``calculation_methods``
    catalogue, so the portfolio is read without the actuarial basis. The
    engine aligns its coverages to that order at measure time.

    ``pol`` carries one row per policy (``mp_id``, ``issue_age``,
    ``term_months`` required); ``cov`` carries one row per policy x coverage
    (``mp_id``, ``coverage``, ``amount`` required). ``calculation_methods``
    maps each coverage code to its calculation method and is mandatory
    despite the ``None`` default.

    Raises :class:`ValueError` for a missing taxonomy, a missing required or
    legacy ``*_code`` column, an empty frame, duplicate ``mp_id`` values, a
    premium given on both frames, ``reduction_factor`` without
    ``reduction_end``, or a coverages row whose ``mp_id`` / ``coverage`` is
    unknown.
    """
    if calculation_methods is None:
        raise ValueError(
            "model points need the calculation_methods taxonomy -- "
            "the per-code pattern routes survival rows (ANNUITY / MATURITY) "
            "to scalar fields and rate-driven rows to the coverage CSR. "
            "Pass a calculation_methods.csv path to read_model_points."
        )
    for need in ("mp_id", "issue_age", "term_months"):
        if need not in pol.columns:
            raise ValueError(
                f"the policies frame is missing required column {need!r}"
            )
    # A leftover *_code column from the pre-rename schema would otherwise be
    # absorbed as a grouping attribute, leaving product / channel empty and
    # silently mis-routing under a segmented basis. Fail with the same hint
    # read_basis gives.
    for new, legacy in (("product", "product_code"),
                        ("channel", "channel_code")):
        if new not in pol.columns and legacy in pol.columns:
            raise ValueError(
                f"the policies frame has column {legacy!r} but not {new!r} "
                f"-- the routing axes are now the bare keys 'product' / "
                f"'channel' (no '_code' suffix)"
            )
    if "coverage" not in cov.columns and "coverage_code" in cov.columns:
        raise ValueError(
            "the coverages frame has column 'coverage_code' but not 'coverage' "
            "-- the coverage key is now the bare 'coverage' (no '_code' suffix)"
        )
    for need in ("mp_id", "coverage", "amount"):
        if need not in cov.columns:
            raise ValueError(
                f"the coverages frame is missing required column {need!r}"
            )
    # An empty coverages frame otherwise fails cryptically at the policies
    # join (the all-null mp_id column infers a string dtype that does not
    # match the integer policies key). Every model point needs at least one
    # coverage row -- the benefit amounts and per-coverage premium live there.
    if cov.height == 0:
        raise ValueError(
            "the coverages frame is empty (0 rows); every model point needs "
            "at least one coverage row"
        )
    _warn_if_elapsed_months(pol.columns)

    n_mp = pol.height
    if n_mp == 0:
        raise ValueError(
            "the policies frame is empty (0 rows); there is nothing to measure"
        )
    # mp_id uniqueness -- a duplicate id would fan out the coverages join
    # (one-to-many) and silently inflate per-policy benefits.
    if pol["mp_id"].n_unique() != n_mp:
        dups = (pol.group_by("mp_id").len()
                .filter(pl.col("len") > 1)["mp_id"].to_list())
        raise ValueError(
            f"policies frame has duplicate mp_id value(s) {dups[:10]}"
            f"{' (...)' if len(dups) > 10 else ''} -- mp_id must be unique"
        )
    # Premium double-source -- if both the coverages frame's ``premium``
    # column and the policies frame's ``premium`` are present, the
    # cov-side branch would silently win below. Reject up front so the
    # operator picks one source.
    if "premium" in cov.columns and "premium" in pol.columns:
        raise ValueError(
            "premium is specified twice -- 'premium' in the coverages "
            "frame and 'premium' in the policies frame. Pick one: "
            "the coverages-side column sums per coverage to the policy, "
            "the policies-side column is a flat per-policy amount."
        )
    # Coverage-rule columns -- only one combination is rejected here: a
    # reduction_factor without a reduction_end. It is silently inert (the
    # factor applies for ``t < 0`` months, i.e. never) and almost certainly
    # a user oversight.
    has_rend = "reduction_end" in cov.columns
    has_rfac = "reduction_factor" in cov.columns
    if has_rfac and not has_rend:
        raise ValueError(
            "coverages frame has 'reduction_factor' without 'reduction_end' "
            "-- the factor would never fire (reduction_end defaults to 0). "
            "Add a reduction_end column (months) or drop the factor column."
        )

    # Coerce the taxonomy values to enum members (accepts members or values).
    ctypes = {k: CalculationMethod(v) for k, v in calculation_methods.items()}
    # Rate-driven coverage order comes from the *catalogue* (calculation_methods),
    # not the basis -- so reading the portfolio needs no basis.
    # coverage_index integers index this order; the engine aligns
    # Basis.coverages to it at measure time (coverage.align_coverages).
    # Only the rate-driven codes that actually appear in this portfolio are
    # kept, in catalogue order.
    present_codes = set(cov["coverage"].to_list())
    rate_driven_codes = [c for c, m in ctypes.items()
                         if m in RATE_DRIVEN_METHODS and c in present_codes]
    code_to_cov_idx = {c: i for i, c in enumerate(rate_driven_codes)}

    # Resolve every coverage row to its policy index and coverage type.
    # ``_mp`` is the policy's row position; ``_cov_idx`` is -1 for a code that
    # is not a present rate-driven coverage (e.g. survival entries).
    pol = pol.with_row_index("_mp")
    cmap = pl.DataFrame({
        "coverage": list(ctypes.keys()),
        "_type": [str(v) for v in ctypes.values()],
        "_cov_idx": [code_to_cov_idx.get(c, -1) for c in ctypes],
    })
    cov = (cov.join(pol.select("mp_id", "_mp"), on="mp_id", how="left")
           .join(cmap, on="coverage", how="left"))
    # A null after the left join means the key had no match on the right.
    if cov["_mp"].null_count():
        bad = sorted({v for v in cov.filter(pl.col("_mp").is_null())
                      ["mp_id"].to_list() if v is not None})
        raise ValueError(
            f"coverages frame references {len(bad)} unknown mp_id "
            f"value(s) not present in the policies frame: "
            f"{_truncate_list(bad)}"
        )
    if cov["_type"].null_count():
        bad = sorted({v for v in cov.filter(pl.col("_type").is_null())
                      ["coverage"].to_list() if v is not None})
        raise ValueError(
            f"coverages frame references {len(bad)} coverage "
            f"value(s) not in the calculation_methods taxonomy: "
            f"{_truncate_list(bad)}"
        )

    # Per-coverage-row arrays, aligned with the joined ``cov`` frame.
    mp = cov["_mp"].to_numpy()
    ctype = cov["_type"].to_numpy()
    cov_idx = cov["_cov_idx"].to_numpy().astype(np.int64)
    amount = cov["amount"].to_numpy().astype(np.float64)

    fields: dict[str, object] = dict(
        issue_age=pol["issue_age"].to_numpy(),
        term_months=pol["term_months"].to_numpy(),
    )
    # Optional per-policy columns are copied through only when present.
    for opt in ("sex", "count", "premium_term_months",
                "premium_frequency_months", "annuity_frequency_months",
                "disability_income", "disability_benefit", "issue_class",
                "surrender_base_amount", "contract_boundary_months"):
        if opt in pol.columns:
            fields[opt] = pol[opt].to_numpy()
    for opt in ("product", "channel"):
        if opt in pol.columns:
            fields[opt] = pol[opt].to_numpy()
    if "issue_date" in pol.columns:
        fields["issue_date"] = pol["issue_date"].to_numpy()
    if "state" in pol.columns:
        fields["state"] = _read_state(pol["state"])
    # Any policies column that is not a recognised engine field is a grouping
    # attribute (portfolio_id, profitability_group, risk_class, region, ...) --
    # one value per policy = one per model point. Available to group() /
    # group_of_contracts via ModelPoints.axis. Columns starting with ``_``
    # (such as the ``_mp`` row index added above) are excluded.
    attributes = {c: pol[c].to_numpy() for c in pol.columns
                  if c not in _POLICY_RESERVED_COLS
                  and not str(c).startswith("_")}
    # A column one edit away from a reserved field is almost certainly a typo
    # (``coun`` -> count, which would otherwise default to 1 = a 1000x error).
    _warn_near_reserved_columns(attributes, _POLICY_RESERVED_COLS,
                                context="read_model_points")
    if attributes:
        fields["attributes"] = attributes
    # Carry mp_id (the contract identity) as a dedicated field so
    # apply_inforce_state can join the period-close state on it instead of
    # trusting row order. It is a label, never read by the kernel.
    fields["mp_id"] = pol["mp_id"].to_numpy()

    def _by_policy(mask) -> np.ndarray:
        # Sum ``amount`` over the masked coverage rows, per policy index.
        return np.bincount(mp[mask], weights=amount[mask], minlength=n_mp)

    fields["maturity_benefit"] = _by_policy(ctype == CalculationMethod.MATURITY)
    fields["annuity_payment"] = _by_policy(ctype == CalculationMethod.ANNUITY)
    # Premium -- the coverages frame carries it per coverage; sum to the policy.
    if "premium" in cov.columns:
        prem = cov["premium"].fill_null(0.0).to_numpy().astype(np.float64)
        fields["premium"] = np.bincount(mp, weights=prem, minlength=n_mp)
    elif "premium" in pol.columns:
        fields["premium"] = pol["premium"].to_numpy()
    else:
        # Neither source provided -- premium is silently zero. A genuine
        # paid-up portfolio is one valid case; a forgotten column is the
        # other. Warn so the latter doesn't slip through.
        warnings.warn(
            "model points have no premium source -- neither "
            "'premium' on the coverages frame nor 'premium' on the "
            "policies frame was found. premium defaults to zero; "
            "if this portfolio is not fully paid-up, add the column.",
            UserWarning, stacklevel=3,
        )
        fields["premium"] = np.zeros(n_mp)

    # Coverage list: the rate-driven coverages (codes 0..n-1 indexing
    # ``coverage_codes`` below). annuity / maturity are survival scalars and
    # not part of the CSR. Every rate-driven present code is in
    # ``rate_driven_codes`` by construction, so ``cov_idx >= 0`` here; a code
    # absent from the catalogue was already rejected (the ``_type`` null
    # check above). Whether the basis registers a rate for each code is
    # checked at measure time (coverage.align_coverages, the V4 guard).
    is_cov = np.isin(ctype, RATE_DRIVEN_METHODS)
    # Stable sort by policy index groups each policy's coverages together
    # while keeping their original relative order.
    order = np.argsort(mp[is_cov], kind="stable")
    cov_mp = mp[is_cov][order]
    fields["coverage_index"] = cov_idx[is_cov][order]
    fields["coverage_amount"] = amount[is_cov][order]
    # Optional per-coverage benefit rules -- a waiting period and a
    # reduced-benefit period, each CSR-aligned with coverage_index.
    for col, field, default in (("waiting", "coverage_waiting", 0),
                                ("reduction_end", "coverage_reduction_end", 0),
                                ("reduction_factor", "coverage_reduction_factor", 1.0),
                                ("step_month", "coverage_step_month", 0),
                                ("step_factor", "coverage_step_factor", 1.0),
                                ("escalation_annual", "coverage_escalation_annual", 0.0),
                                ("escalation_cap", "coverage_escalation_cap", 0.0)):
        if col in cov.columns:
            rule = cov[col].fill_null(default).to_numpy()
            fields[field] = rule[is_cov][order]
    # CSR row pointer: a leading 0 followed by the running count of
    # rate-driven coverage rows per policy (length n_mp + 1).
    fields["coverage_offset"] = np.concatenate((
        np.zeros(1, np.int64),
        np.cumsum(np.bincount(cov_mp, minlength=n_mp), dtype=np.int64),
    ))
    fields["calculation_methods"] = ctypes
    # The catalogue order the coverage_index integers were built against.
    # The engine aligns Basis.coverages to this at measure time.
    fields["coverage_codes"] = tuple(rate_driven_codes)
    return ModelPoints(**fields)
def read_model_points(
    path: Path | str,
    coverages: Path | str | None = None,
    calculation_methods: Path | str | dict[str, CalculationMethod] | None = None,
) -> ModelPoints:
    """Read model points from a parquet, CSV, Excel or feather file.

    Reads the portfolio **without any basis** -- the model points and the
    actuarial basis are separate inputs. The basis enters only at the engine
    call (``measure``), which aligns its coverages to the portfolio's
    coverage order.

    The portfolio is two frames -- a policies frame plus a coverages frame:

    * a policies frame (``mp_id``, ``issue_age``, ``term_months``, optional
      ``sex`` / ``count`` / ``state`` / ``issue_class`` / ``issue_date`` /
      ``premium`` / ``premium_term_months`` / ``premium_frequency_months`` /
      ``annuity_frequency_months`` / ``contract_boundary_months`` /
      ``product`` / ``channel``), one row per policy. Any *other* column is
      read as a grouping attribute (``portfolio_id``,
      ``profitability_group``, ``risk_class``, ``region``, ...) into
      :attr:`ModelPoints.attributes`, for :func:`~fastcashflow.group` /
      :func:`~fastcashflow.group_of_contracts`;
    * a coverages frame (``mp_id``, ``coverage``, ``amount``, optional
      ``premium`` / ``waiting`` / ``reduction_end`` / ``reduction_factor``
      and the benefit step-up / escalation columns ``step_month`` /
      ``step_factor`` (a benefit step at a duration) / ``escalation_annual``
      / ``escalation_cap`` (annual compounding growth, capped -- the
      escalating-benefits recipe in the cookbook)), one row per policy x
      coverage -- so per-coverage rules (waiting, reduction and escalation)
      ride along, which a flat one-row-per-policy file cannot carry.

    Pass them as ``read_model_points(policies, coverages=coverages_path,
    calculation_methods=...)``, or as a single ``.xlsx`` carrying
    ``policies`` and ``coverages`` sheets (the extension is matched
    case-insensitively). The normalised two-frame shape mirrors a policy
    table joined to a coverage table -- the form data arrives in from a
    policy system.

    ``calculation_methods`` is the company taxonomy file (CSV / parquet /
    feather / xlsx) or an already-parsed ``{coverage: CalculationMethod}``
    dict -- the third side of the split between *portfolio* (policies +
    coverages), *basis* (basis.xlsx) and *catalogue*
    (calculation_methods.csv).

    The policies frame is the **inception-time static spec** -- issue_age,
    term, sex, and so on. The in-force closing state (elapsed_months,
    prior_csm, lock_in_rate) belongs in a separate file read by
    :func:`read_inforce_state`. An ``elapsed_months`` column on the policies
    side is ignored and a :class:`UserWarning` is emitted; do not encode the
    as-of date by mixing it into the static spec.

    Raises :class:`ValueError` when no coverages frame can be found (neither
    ``coverages=`` nor a two-sheet workbook), plus everything the frame
    validation rejects.
    """
    if isinstance(calculation_methods, (str, Path)):
        methods_dict = _parse_calculation_methods(calculation_methods)
    else:
        methods_dict = calculation_methods
    p = str(path)
    # Match the extension case-insensitively so ``BOOK.XLSX`` is recognised
    # as a workbook too.
    if coverages is None and p.lower().endswith(".xlsx"):
        # Peek at the sheet names only. A read-only workbook keeps its file
        # handle open until closed, so release it even if the peek fails.
        wb = openpyxl.load_workbook(p, read_only=True)
        try:
            sheets = wb.sheetnames
        finally:
            wb.close()
        if "policies" in sheets and "coverages" in sheets:
            return _model_points_from_frames(
                pl.read_excel(p, sheet_name="policies", engine="openpyxl"),
                pl.read_excel(p, sheet_name="coverages", engine="openpyxl"),
                methods_dict,
            )
    pol = _read_frame(path)
    if coverages is None:
        raise ValueError(
            f"{p!r} was read without a coverages frame. read_model_points "
            "needs a coverages frame: pass coverages=<path> (an mp_id / coverage / "
            "amount frame), or a single .xlsx carrying 'policies' and "
            "'coverages' sheets. A flat one-row-per-policy (wide) file cannot "
            "carry per-coverage waiting / reduction rules and is not accepted."
        )
    return _model_points_from_frames(
        pol, _read_frame(coverages), methods_dict,
    )
def read_vfa_model_points(
    path: Path | str,
    *,
    calculation_methods: "Path | str | dict[str, CalculationMethod] | None" = None,
) -> ModelPoints:
    """Read the account-value base of variable (VFA) contracts from a policies file.

    This covers the part measured under the VFA model: the account value and
    its guarantee floors (GMDB / GMAB), all named policy columns
    (``account_value``, ``minimum_death_benefit``,
    ``minimum_accumulation_benefit``, ``minimum_crediting_rate``). That base
    carries no coverage-code coverages, so the input is a single policies
    frame. ``issue_age`` and ``term_months`` are required; the other named
    policy / account columns are picked up when present.

    Protection riders attached to a variable product (death / cancer /
    hospitalisation riders) are separate coverages, read and measured on
    their own -- a policies + coverages book through
    :func:`read_model_points` (GMM). A ``<coverage>_benefit`` column is
    therefore rejected here: a coverage encoded as a column is the lossy wide
    form, and coverages belong in their own frame, which can hold the
    per-coverage waiting / reduction rules a flat column cannot.

    Raises:
        ValueError: on a ``*_benefit`` column that is not one of the named
            account-level benefits, or when ``issue_age`` / ``term_months``
            is missing.
    """
    frame = _read_frame(path)
    columns = frame.columns

    # ``*_benefit`` columns that are legitimate policy-level fields; any other
    # ``*_benefit`` column is a coverage smuggled in as a wide column.
    account_level_benefits = {
        "maturity_benefit",
        "disability_benefit",
        "minimum_death_benefit",
        "minimum_accumulation_benefit",
    }
    stray_coverages = sorted(
        name
        for name in columns
        if name.endswith("_benefit") and name not in account_level_benefits
    )
    if stray_coverages:
        raise ValueError(
            f"read_vfa_model_points got coverage benefit column(s) "
            f"{stray_coverages} -- account-value (VFA) contracts carry no "
            "coverage-code coverages. For a product with coverages, use "
            "read_model_points with a coverages frame."
        )

    for required in ("issue_age", "term_months"):
        if required in columns:
            continue
        raise ValueError(
            f"the VFA policies file is missing required column {required!r}"
        )

    _warn_if_elapsed_months(frame.columns)

    # Premium defaults to zero for every row when the column is absent.
    if "premium" in columns:
        premium = frame["premium"].to_numpy()
    else:
        premium = np.zeros(frame.height)
    kwargs: dict[str, object] = {
        "issue_age": frame["issue_age"].to_numpy(),
        "term_months": frame["term_months"].to_numpy(),
        "premium": premium,
    }

    optional_columns = (
        "sex", "count", "premium_term_months",
        "premium_frequency_months", "annuity_frequency_months",
        "maturity_benefit", "annuity_payment",
        "disability_income", "disability_benefit",
        "account_value", "minimum_crediting_rate",
        "minimum_death_benefit", "minimum_accumulation_benefit",
        "surrender_base_amount", "contract_boundary_months",
        "product", "channel", "mp_id",
    )
    kwargs.update(
        {name: frame[name].to_numpy() for name in optional_columns if name in columns}
    )
    if "state" in columns:
        kwargs["state"] = _read_state(frame["state"])

    # The VFA reader has no grouping-attribute catch-all -- an unrecognised
    # column is dropped entirely, so a typo'd ``account_valu`` would leave
    # account_value 0 and the guarantee floor becomes the whole payout. Warn
    # on a near-match the same way the GMM reader does.
    _warn_near_reserved_columns(frame.columns, _VFA_POLICY_COLS,
                                context="read_vfa_model_points")

    # A path is parsed into the taxonomy dict; a dict is handed over as-is;
    # ``None`` leaves the field off so ModelPoints applies its own default.
    if isinstance(calculation_methods, (str, Path)):
        kwargs["calculation_methods"] = _parse_calculation_methods(calculation_methods)
    elif calculation_methods is not None:
        kwargs["calculation_methods"] = calculation_methods
    return ModelPoints(**kwargs)
def read_inforce_policies(
    path: Path | str,
    coverages: Path | str | None = None,
    calculation_methods: "Path | str | dict[str, CalculationMethod] | None" = None,
) -> "tuple[ModelPoints, InforceState]":
    """Read a single combined policies + in-force state file.

    A self-contained snapshot at a settlement date -- one file per valuation
    date, one row per surviving contract. Columns combine the permanent
    contract spec (``issue_age``, ``sex``, ``term_months``, premiums, ...)
    and the closing state from the prior period (``elapsed_months``,
    ``count``, ``prior_csm``, ``lock_in_rate``). This matches the Korean
    industry "in-force contracts closing file" pattern -- one self-contained
    snapshot per period, no separate state file to keep in sync.

    Returns a ``(ModelPoints, InforceState)`` tuple. The ``ModelPoints`` has
    the state's ``elapsed_months`` and ``count`` already folded in; the
    ``InforceState`` carries ``prior_csm`` and ``lock_in_rate`` for the
    in-force valuation call::

        mp, state = fcf.read_inforce_policies(
            "inforce_2026Q1.csv",
            coverages="coverages.csv",
            calculation_methods="calculation_methods.csv",
        )
        val = fcf.gmm.measure_inforce(mp, basis, state, period_months=3)

    For the two-file equivalent (separate ``policies.csv`` +
    ``inforce_state.csv``), see :func:`read_model_points` +
    :func:`read_inforce_state` + :func:`apply_inforce_state`. Both workflows
    produce valuation-ready inputs that give the same valuation:
    ``measure_inforce`` re-aligns the state by mp_id internally, so the answer
    does not depend on the state file's row order. The returned
    ``InforceState`` is itself row-aligned to the model points here; in the
    two-file path the state object keeps its file order (only the model
    points are reordered by :func:`apply_inforce_state`), so call
    :func:`align_inforce_state` before slicing or reading ``state.prior_csm``
    directly. Pick the form that fits the company's extract pipeline.

    Required columns: ``mp_id``, ``elapsed_months``, ``count``,
    ``prior_csm``, ``lock_in_rate``, plus whatever the policies side of
    :func:`read_model_points` needs (``issue_age``, ``term_months``, optional
    ``sex``, premiums, ...). Benefit amounts come from the ``coverages``
    frame, which is required -- the flat wide form is not accepted.

    Variance / movement analysis (:func:`roll_forward`, :func:`reconcile`)
    is unaffected -- mp_id-based matching across periods works the same
    regardless of which reader built each snapshot.

    Raises:
        ValueError: when ``coverages`` is ``None`` (checked before any file
            is read), or when a required closing-state column is missing.
        NotImplementedError: when ``lock_in_rate`` is not constant across
            rows.
    """
    # Fail fast: without a coverages frame the call can never succeed, so do
    # not read and validate the (possibly large) snapshot first.
    if coverages is None:
        raise ValueError(
            "read_inforce_policies needs a coverages frame: pass coverages=<path> "
            "(an mp_id / coverage / amount frame). A flat one-row-per-policy "
            "(wide) file cannot carry per-coverage waiting / reduction rules and "
            "is not accepted."
        )

    from fastcashflow.modelpoints import InforceState, apply_inforce_state

    df = _read_frame(path)
    needed = ("mp_id", "elapsed_months", "count", "prior_csm", "lock_in_rate")
    for col in needed:
        if col not in df.columns:
            raise ValueError(
                f"the in-force policies file is missing required column "
                f"{col!r}. The combined file carries the policies spec "
                "plus the closing-state columns "
                "(elapsed_months, count, prior_csm, lock_in_rate)."
            )

    # The state holds a single scalar lock-in rate, so a column that varies
    # by row cannot be represented and is rejected rather than truncated.
    lock = df["lock_in_rate"].to_numpy().astype(np.float64)
    if lock.size and not np.all(lock == lock[0]):
        raise NotImplementedError(
            "lock_in_rate must be uniform across rows in v1; per-MP "
            "(cohort-aware) lock-in rates are a future extension"
        )
    state = InforceState(
        mp_id=df["mp_id"].to_numpy(),
        elapsed_months=df["elapsed_months"].to_numpy().astype(np.int64),
        count=df["count"].to_numpy().astype(np.float64),
        prior_csm=df["prior_csm"].to_numpy().astype(np.float64),
        lock_in_rate=float(lock[0]) if lock.size else 0.0,
    )

    # Drop the state-only columns before handing the frame to the
    # standard policies reader, which would otherwise warn about
    # ``elapsed_months`` on a policies frame and ignore the rest. ``count``
    # stays -- it is a valid policies column too, and ``apply_inforce_state``
    # will overwrite it with the state value below anyway.
    spec_df = df.drop("elapsed_months", "prior_csm", "lock_in_rate")

    if isinstance(calculation_methods, (str, Path)):
        methods_dict = _parse_calculation_methods(calculation_methods)
    else:
        methods_dict = calculation_methods

    mp = _model_points_from_frames(
        spec_df, _read_frame(coverages), methods_dict,
    )
    mp = apply_inforce_state(mp, state)
    return mp, state
def sample_data_dir() -> Path:
    """Return the on-disk path of the bundled sample data directory.

    The directory holds ``sample_basis.xlsx``, ``sample_policies.csv`` and
    ``sample_coverages.csv`` -- the inputs behind :func:`load_sample_basis`
    and :func:`load_sample_model_points`. Open the workbook in Excel from
    here to see what a complete fastcashflow input looks like before
    preparing your own.
    """
    package_root = resources.files("fastcashflow")
    # Go through ``str`` so the traversable resource becomes a plain Path.
    return Path(str(package_root / "sample_data"))
def load_sample_basis() -> dict[tuple[str, str], Basis]:
    """Read fastcashflow's bundled sample basis workbook.

    A filled-in workbook packaged with the library, the companion to
    :func:`load_sample_model_points`. See :func:`read_basis` for the workbook
    format. The bundled sample has two segments (``("term_a", "GA")`` and
    ``("term_a", "FC")``); pick one to use it as a single ``Basis``.
    """
    # NOTE(review): the segment list above disagrees with the
    # ``_save_sample_basis`` docstring (seven segments across three products)
    # -- confirm against the packaged workbook and align the two.
    source = resources.files("fastcashflow") / "sample_data" / "sample_basis.xlsx"
    with resources.as_file(source) as path:
        return read_basis(path)


def load_sample_calculation_methods() -> dict[str, CalculationMethod]:
    """Read fastcashflow's bundled sample calculation-method taxonomy.

    The companion to :func:`load_sample_basis` and
    :func:`load_sample_model_points` -- the company-level catalogue that maps
    each ``coverage`` to its :class:`CalculationMethod`. The same file format
    every portfolio uses (see the ``calculation_methods`` argument of
    :func:`read_model_points`).
    """
    source = (
        resources.files("fastcashflow") / "sample_data"
        / "sample_calculation_methods.csv"
    )
    with resources.as_file(source) as path:
        return _parse_calculation_methods(path)


def load_sample_model_points() -> ModelPoints:
    """Read fastcashflow's bundled sample portfolio.

    A small portfolio -- a policies file, a coverages file and the
    calculation-method taxonomy -- packaged with the library, so the engine
    can be tried without preparing an input file. See
    :func:`read_model_points` for the file format. The coverage order comes
    from the ``calculation_methods`` catalogue; no basis is needed to read
    the portfolio.
    """
    patterns = load_sample_calculation_methods()
    base = resources.files("fastcashflow") / "sample_data"
    with resources.as_file(base / "sample_policies.csv") as policies, \
            resources.as_file(base / "sample_coverages.csv") as coverages:
        return read_model_points(
            policies, coverages=coverages, calculation_methods=patterns,
        )


def load_sample_inforce_state() -> "InforceState":
    """Read fastcashflow's bundled sample in-force state.

    Aligned row-for-row with :func:`load_sample_model_points`. Pair with
    :func:`apply_inforce_state` to fold ``elapsed_months`` and ``count`` into
    the sample model points, then call
    :func:`fastcashflow.gmm.measure_inforce` with the returned
    :class:`InforceState`.
    """
    base = resources.files("fastcashflow") / "sample_data"
    with resources.as_file(base / "sample_inforce_state.csv") as path:
        return read_inforce_state(path)


def load_sample_vfa_basis() -> Basis:
    """Bundled VFA (variable, account-value) basis -- a single basis.

    Built from the sample ``TERM_LIFE_A`` / ``FC`` basis (same mortality,
    lapse and discount) with the protection coverages dropped and the two VFA
    economic inputs set: ``investment_return`` (the underlying-items return
    the account value grows at) and ``fund_fee`` (the variable fee the entity
    keeps, which is the source of the CSM). Pair with
    :func:`load_sample_vfa_model_points`; ``measure_vfa`` takes a single
    :class:`Basis`.
    """
    source = resources.files("fastcashflow") / "sample_data" / "sample_vfa_basis.xlsx"
    with resources.as_file(source) as path:
        # The workbook is read as a segment dict; hand back its one VFA segment.
        return read_basis(path)[("VAR_ANNUITY_A", "BANCA")]


def load_sample_vfa_model_points() -> ModelPoints:
    """Bundled VFA sample -- variable annuities with minimum-rate, death and
    maturity guarantees.

    Three single-premium account-value contracts that share a 2% minimum
    credited rate (``minimum_crediting_rate``, uniform across the rows so the
    stochastic time-value pass applies) and differ in their floors: one
    carries both a death (GMDB) and a maturity (GMAB) guarantee, one a
    maturity floor, one a death floor. Pair with
    :func:`load_sample_vfa_basis`; generate underlying-return scenarios to
    value the time value of the guarantees (see ``examples/vfa.py``).
    """
    source = resources.files("fastcashflow") / "sample_data" / "sample_vfa_policies.csv"
    with resources.as_file(source) as path:
        # Carry the coverage taxonomy so a basis's coverages align at measure
        # time, even though these account-value contracts hold no coverages.
        return read_vfa_model_points(
            path, calculation_methods=load_sample_calculation_methods(),
        )


def _drop_sample_table(filename: str, dest: Path | str) -> Path:
    """Drop a packaged single-table sample file at ``dest``.

    The output format follows ``dest``'s extension and is produced by
    :func:`_write_frame`. ``dest`` may be a file path (used as-is) or an
    existing directory (the file lands inside under its packaged
    ``filename``). Returns the resolved destination path.
    """
    src = resources.files("fastcashflow") / "sample_data" / filename
    dest_path = Path(dest)
    if dest_path.is_dir():
        dest_path = dest_path / filename
    with resources.as_file(src) as src_path:
        # Compare extensions case-insensitively, as ``_save_sample_basis``
        # does, so ``out.CSV`` is still recognised as the source format.
        if dest_path.suffix.lower() == src_path.suffix.lower():
            # Same format as the source -- a byte-for-byte copy preserves
            # the packaged file exactly (no parse / re-serialise round trip).
            import shutil
            shutil.copy2(src_path, dest_path)
        else:
            # Different format -- read the source as a polars frame and
            # let _write_frame route to the right writer by extension.
            _write_frame(_read_frame(src_path), dest_path)
    return dest_path


def _save_sample_basis(path: Path | str) -> Path:
    """Drop the packaged sample basis workbook on disk at ``path``.

    Use this to bootstrap a workbook a reader can open in Excel, inspect,
    and then re-read with :func:`read_basis` -- the same call shape a real
    user types against their own file. The bundled sample carries seven
    (product, channel) segments across three products (``TERM_LIFE_A``,
    ``HEALTH_A``, ``WHOLE_LIFE_A``).

    Supported extension: ``.xlsx`` (the workbook carries multiple sheets, so
    single-table formats like CSV are not appropriate here). ``path`` may be
    a file (the workbook lands there) or a directory (the workbook lands
    inside with its original ``sample_basis.xlsx`` name). Returns the
    resolved destination path.

    Raises:
        ValueError: when the resolved destination does not end in ``.xlsx``.
    """
    import shutil
    src = (resources.files("fastcashflow") / "sample_data"
           / "sample_basis.xlsx")
    dest_path = Path(path)
    if dest_path.is_dir():
        dest_path = dest_path / "sample_basis.xlsx"
    if dest_path.suffix.lower() != ".xlsx":
        raise ValueError(
            f"_save_sample_basis: expected an .xlsx path, got "
            f"{str(path)!r}. The basis workbook carries multiple "
            "sheets (mortality_tables, lapse_tables, segments, ...) and "
            "single-table formats (csv / parquet / feather) cannot "
            "represent it. Use .xlsx."
        )
    with resources.as_file(src) as src_path:
        shutil.copy2(src_path, dest_path)
    return dest_path


def _save_sample_policies(path: Path | str) -> Path:
    """Drop the packaged sample policies file on disk at ``path``.

    The companion to :func:`_save_sample_coverages` and
    :func:`_save_sample_calculation_methods`. Use the three together with
    :func:`read_model_points` for a copy-paste workflow that mirrors how you
    would read your own files.

    Supported extensions: ``.csv``, ``.xlsx``, ``.parquet``, ``.feather`` /
    ``.arrow``. The conversion runs through polars when the requested
    extension differs from the packaged ``.csv`` source. ``.xlsx`` is capped
    at 1,048,576 rows per sheet -- for production-scale portfolios use
    ``.parquet`` or ``.feather``.
    """
    return _drop_sample_table("sample_policies.csv", path)


def _save_sample_coverages(path: Path | str) -> Path:
    """Drop the packaged sample coverages file on disk at ``path``.

    Coverage entries -- one row per (model point, coverage) -- the companion
    to :func:`_save_sample_policies`. A portfolio has roughly
    ``n_mp x avg_coverages_per_mp`` rows here, so this is the file most
    likely to exceed the 1,048,576 row cap of ``.xlsx``.

    Supported extensions: ``.csv``, ``.xlsx``, ``.parquet``, ``.feather`` /
    ``.arrow``. Prefer ``.parquet`` or ``.feather`` at production scale.
    """
    return _drop_sample_table("sample_coverages.csv", path)


def _save_sample_calculation_methods(path: Path | str) -> Path:
    """Drop the packaged sample calculation-method catalogue on disk at ``path``.

    The company catalogue file -- one row per ``coverage`` mapping it to its
    :class:`CalculationMethod`. Tens-to-hundreds of rows in practice; the
    ``.xlsx`` row cap never binds.

    Supported extensions: ``.csv``, ``.xlsx``, ``.parquet``, ``.feather`` /
    ``.arrow``.
    """
    return _drop_sample_table("sample_calculation_methods.csv", path)


def _save_sample_inforce_state(path: Path | str) -> Path:
    """Drop the packaged sample in-force state file on disk at ``path``.

    The dynamic state-at-valuation companion to the static
    :func:`_save_sample_policies` file: one row per ``mp_id`` carrying the
    closing state from the prior reporting period (``elapsed_months``,
    ``count``, ``prior_csm``, ``lock_in_rate``). Pair the dropped file with
    :func:`read_inforce_state` and feed the result through
    :func:`apply_inforce_state` before
    :func:`fastcashflow.gmm.measure_inforce` -- the subsequent-measurement
    workflow at each period close.

    Supported extensions: ``.csv``, ``.xlsx``, ``.parquet``, ``.feather`` /
    ``.arrow``. One row per contract, so the ``.xlsx`` row cap (~1M / sheet)
    binds at the same scale as the policies file.
    """
    return _drop_sample_table("sample_inforce_state.csv", path)


def _save_sample_inforce_policies(path: Path | str) -> Path:
    """Drop a combined policies + in-force state sample file on disk at ``path``.

    The companion to :func:`read_inforce_policies`. Each row carries the
    permanent spec (issue_age, sex, term_months, premium_term_months,
    product, channel) and the closing state from the prior period
    (elapsed_months, count, prior_csm, lock_in_rate). Built on the fly by
    joining the packaged ``sample_policies.csv`` and
    ``sample_inforce_state.csv`` on ``mp_id``; ``count`` is the state value
    (post-decrement), not the inception count.

    Supported extensions: ``.csv``, ``.xlsx``, ``.parquet``, ``.feather`` /
    ``.arrow``. ``.xlsx`` is capped at ~1M rows / sheet. ``path`` may be a
    file or an existing directory (the file then lands inside as
    ``sample_inforce_policies.csv``). Returns the resolved destination path.
    """
    base = resources.files("fastcashflow") / "sample_data"
    with resources.as_file(base / "sample_policies.csv") as policies, \
            resources.as_file(base / "sample_inforce_state.csv") as state:
        pol = pl.read_csv(policies)
        st = pl.read_csv(state)
    # ``count`` is on both files; drop the inception count from policies so
    # the state's post-decrement count is the one that survives the join.
    pol = pol.drop("count")
    combined = pol.join(st, on="mp_id", how="inner")
    dest_path = Path(path)
    if dest_path.is_dir():
        dest_path = dest_path / "sample_inforce_policies.csv"
    _write_frame(combined, dest_path)
    return dest_path


# ---------------------------------------------------------------------------
# Economic scenarios
# ---------------------------------------------------------------------------
def read_inforce_state(path: Path | str) -> "InforceState":
    """Read an in-force state file -- the per-MP closing state from the prior
    reporting period.

    One row per model point, with columns ``mp_id``, ``elapsed_months``,
    ``count``, ``prior_csm`` and ``lock_in_rate``. Reads ``.parquet``,
    ``.csv``, ``.xlsx`` or ``.feather`` / ``.arrow`` via :func:`_read_frame`.

    Pair with :func:`apply_inforce_state` to join the state onto a
    :class:`ModelPoints` built from the static policies file, then pass the
    result to :func:`fastcashflow.gmm.measure_inforce` with the returned
    :class:`InforceState`.

    ``lock_in_rate`` must be uniform across rows in v1 -- the engine takes a
    scalar locked-in rate. Cohort-aware per-MP rates are a future extension;
    for now the reader errors out if the column is not constant rather than
    silently dropping the per-row detail.

    Raises:
        ValueError: when a required column is missing.
        NotImplementedError: when ``lock_in_rate`` varies across rows.
    """
    from fastcashflow.modelpoints import InforceState

    frame = _read_frame(path)

    required = ("mp_id", "elapsed_months", "count", "prior_csm", "lock_in_rate")
    for name in required:
        if name in frame.columns:
            continue
        raise ValueError(
            f"the in-force state file is missing required column {name!r}"
        )

    rates = frame["lock_in_rate"].to_numpy().astype(np.float64)
    has_rows = bool(rates.size)
    if has_rows and not np.all(rates == rates[0]):
        raise NotImplementedError(
            "lock_in_rate must be uniform across rows in v1; per-MP "
            "(cohort-aware) lock-in rates are a future extension"
        )
    # An empty file has no rate to read; fall back to 0.0.
    scalar_rate = float(rates[0]) if has_rows else 0.0

    return InforceState(
        mp_id=frame["mp_id"].to_numpy(),
        elapsed_months=frame["elapsed_months"].to_numpy().astype(np.int64),
        count=frame["count"].to_numpy().astype(np.float64),
        prior_csm=frame["prior_csm"].to_numpy().astype(np.float64),
        lock_in_rate=scalar_rate,
    )
def read_scenarios(path: Path | str) -> FloatArray:
    """Read a stochastic scenario set from a file.

    The file is a 2-D table -- one row per scenario, one column per
    projection month, every cell a rate or return. Reads ``.parquet``,
    ``.csv``, ``.xlsx`` or ``.feather`` / ``.arrow`` via :func:`_read_frame`.

    Returns a numpy ``float64`` array of shape ``(n_scenarios, n_time)``, or
    ``(n_scenarios,)`` when the file has a single column (flat-rate
    scenarios). The result is what :func:`measure_stochastic` and
    :func:`measure_tvog` accept as their ``scenarios`` / ``return_scenarios``
    input.

    Calibration -- Hull-White, Vasicek, regime-switching, climate paths,
    etc. -- is left to a separate scenario-generator step; this reader is
    just the storage / handover layer. For large scenario sets (thousands of
    paths) prefer ``.parquet`` or ``.feather`` over ``.xlsx``.
    """
    table = _read_frame(path)
    values = table.to_numpy().astype(np.float64)
    n_columns = values.shape[1]
    # A single column is handed back flattened to one dimension.
    return values[:, 0] if n_columns == 1 else values
# --------------------------------------------------------------------------- # Measurement results # ---------------------------------------------------------------------------
@singledispatch
def write_measurement(
    measurement,
    path: Path | str,
    *,
    ids: np.ndarray | None = None,
) -> None:
    """Write a measurement's per-model-point headline results to parquet / CSV.

    One row per model point, in model-point order. Pass ``ids`` for a leading
    ``id`` column so the results join back to policies.

    Dispatches on the measurement type -- GMM writes ``bel`` / ``ra`` /
    ``csm`` / ``loss_component``, PAA writes ``lrc`` / ``loss_component``,
    VFA adds ``variable_fee`` / ``time_value``. A new model registers its
    columns with ``@write_measurement.register`` in the module that defines
    its measurement type (so io.py stays free of the engine import).

    Raises:
        TypeError: from this fallback, reached when no implementation is
            registered for the measurement's type.
    """
    type_name = type(measurement).__name__
    raise TypeError(
        f"write_measurement does not handle {type_name}; "
        "pass a GMM / PAA / VFA measurement"
    )
def _write_measurement_columns(
    columns: dict[str, np.ndarray], path: Path | str, ids: np.ndarray | None
) -> None:
    """Shared writer behind the registered ``write_measurement``
    implementations: an optional leading ``id`` column, followed by the
    model's headline columns, written through :func:`_write_frame`."""
    if ids is None:
        payload: dict[str, np.ndarray] = dict(columns)
    else:
        # ``id`` goes first so it is the leading column of the written frame.
        payload = {"id": np.asarray(ids), **columns}
    _write_frame(pl.DataFrame(payload), path)
def measure_stream(
    input_path: Path | str,
    output_dir: Path | str,
    basis: Basis | dict[tuple[str, str], Basis],
    *,
    coverages: Path | str | None = None,
    calculation_methods: Path | str | dict[str, CalculationMethod] | None = None,
    chunk_size: int = 20_000_000,
    backend: str = "cpu",
    id_column: str | None = None,
    validate_unique_mp_id: bool = True,
) -> int:
    """Stream a valuation through a parquet file one chunk at a time.

    Reads the input in chunks of ``chunk_size`` model points, values each
    chunk with the fused fast path (``measure(..., full=False)``), and writes
    the results as a parquet dataset -- one ``part-NNNNN.parquet`` file per
    chunk -- under ``output_dir``. Peak memory is a single chunk, so this
    scales past what an in-memory run could hold.

    The input is a policies + coverages pair, mirroring
    :func:`read_model_points`: ``input_path`` is the policies parquet and
    ``coverages`` the coverages parquet. Each chunk of policies pulls its
    coverage rows by ``mp_id``, so sorting the coverages file by ``mp_id``
    lets the parquet reader prune row groups. A flat one-row-per-policy
    (wide) file is not accepted -- it cannot carry the per-coverage waiting
    and reduction rules.

    ``basis`` may be a single :class:`Basis` (uniform portfolio) or a
    ``{(product, channel): Basis}`` dict, exactly as ``measure``. With a dict
    each chunk routes its model points to their segment's basis, so the
    policies parquet must carry ``product`` / ``channel`` columns.

    ``id_column`` names the policies column written as the result ``id`` (so
    the output parquet joins back to a business key); it defaults to
    ``mp_id``. The coverages are always joined on ``mp_id`` regardless.

    ``validate_unique_mp_id`` (default ``True``) scans the whole policies
    file once up front and rejects a duplicate ``mp_id`` -- the same data
    error :func:`read_model_points` raises, which a chunk-by-chunk read would
    otherwise miss when the same id falls in different chunks. Set it
    ``False`` to skip the scan when the upstream extract already guarantees
    uniqueness.

    Returns the total number of model points processed.

    Raises:
        ValueError: when ``input_path`` is not a ``.parquet`` file,
            ``coverages`` is missing, ``chunk_size`` is not positive,
            ``output_dir`` already holds part files, the policies file lacks
            ``mp_id`` or ``id_column``, or a duplicate ``mp_id`` is found.
            The first three are checked before anything is created on disk
            or scanned.
    """
    input_path = Path(input_path)
    output_dir = Path(output_dir)

    # Argument checks come first: a call that cannot succeed must not create
    # the output directory or pay for the whole-file uniqueness scan.
    if input_path.suffix.lower() != ".parquet":
        raise ValueError(
            f"measure_stream streams parquet input only; got {str(input_path)!r}"
        )
    if coverages is None:
        raise ValueError(
            "measure_stream needs a coverages frame: pass coverages=<parquet path> "
            "(an mp_id / coverage / amount frame). A flat one-row-per-policy "
            "(wide) file cannot carry per-coverage waiting / reduction rules and is "
            "not accepted."
        )
    if chunk_size < 1:
        # A negative step would make the chunk range empty and return 0 with
        # nothing valued; reject it instead of succeeding silently.
        raise ValueError(
            f"measure_stream: chunk_size must be a positive integer; "
            f"got {chunk_size!r}"
        )

    # Lazy import -- only ``measure_stream`` actually drives a valuation, so we
    # keep the engine import off the I/O hot path. A script that only reads
    # model points or writes results never pays the engine import cost.
    from fastcashflow.engine import measure

    output_dir.mkdir(parents=True, exist_ok=True)
    if any(output_dir.glob("part-*.parquet")):
        raise ValueError(
            f"output directory {str(output_dir)!r} already contains part "
            "files; use a fresh directory"
        )

    if isinstance(calculation_methods, (str, Path)):
        methods_dict = _parse_calculation_methods(calculation_methods)
    else:
        methods_dict = calculation_methods

    scan = pl.scan_parquet(input_path)
    n_total = scan.select(pl.len()).collect().item()

    # The result id is written from ``id_column`` (a business key the output
    # joins back on); coverages still join on ``mp_id``. Validate up front so a
    # typo'd id_column -- or a missing mp_id -- is a clear error, not a polars
    # ColumnNotFoundError leaking from a per-chunk read.
    schema_names = scan.collect_schema().names()
    if "mp_id" not in schema_names:
        raise ValueError(
            f"measure_stream: the policies file {str(input_path)!r} has no "
            "'mp_id' column; mp_id is the contract identity and the coverages "
            "join key (it is required even when id_column names a different "
            "result id)."
        )
    id_col = id_column if id_column is not None else "mp_id"
    if id_col not in schema_names:
        raise ValueError(
            f"measure_stream: id_column {id_col!r} is not a column of the "
            f"policies file {str(input_path)!r}"
        )

    # mp_id is the contract identity and the coverages join key. A chunk-by-chunk
    # read only sees one chunk's ids at a time, so a duplicate that straddles two
    # chunks would pass silently (and write a duplicate result id). Scan the
    # whole file once -- the same uniqueness read_model_points enforces in memory.
    if validate_unique_mp_id:
        dups = (
            scan.select("mp_id").group_by("mp_id").len()
            .filter(pl.col("len") > 1).head(5).collect()
        )
        if dups.height:
            raise ValueError(
                f"measure_stream: duplicate mp_id in {str(input_path)!r} (e.g. "
                f"{dups['mp_id'].to_list()}); mp_id is the contract identity / "
                "coverages join key and must be unique across the whole file. "
                "Pass validate_unique_mp_id=False to skip this scan when the "
                "upstream extract already guarantees uniqueness."
            )

    # Chunk the policies and pull each chunk's coverage rows with a semi-join
    # on mp_id, so only the matching coverage rows are collected.
    cov_scan = pl.scan_parquet(Path(coverages))
    processed = 0
    for part, offset in enumerate(range(0, n_total, chunk_size)):
        pol = scan.slice(offset, chunk_size).collect()
        ids = pol[id_col]
        cov = cov_scan.join(
            pol.lazy().select("mp_id"), on="mp_id", how="semi"
        ).collect()
        model_points = _model_points_from_frames(pol, cov, methods_dict)
        write_measurement(
            measure(model_points, basis, full=False, backend=backend),
            output_dir / f"part-{part:05d}.parquet",
            ids=ids.to_numpy(),
        )
        processed += model_points.n_mp
    return processed