fastcashflow.grouping의 소스 코드

"""IFRS 17 level of aggregation -- grouping contracts into the unit of account.

IFRS 17 measures insurance contracts not one by one but in *groups* -- the
unit of account (paragraphs 14-24): a portfolio of contracts subject to
similar risks and managed together, divided into annual cohorts (issued no
more than a year apart) and then by profitability (onerous at inception, no
significant possibility of becoming onerous, and the rest).

The grouping is load-bearing for the CSM. The contractual service margin
cannot be negative, and that floor applies to the *group*: contracts within
a group are netted before the floor, contracts in different groups are not.
So a profitable contract's margin absorbs a slightly onerous one's loss
only when they share a group.

``group`` takes a per-model-point measurement and a group assignment and
re-expresses it at the group level -- BEL and RA summed, the CSM and loss
component re-derived on the group aggregate. The result is itself a
measurement, its rows the groups, so it flows on into ``roll_forward``,
``reconcile`` and ``report``.

The group assignment is the user's to make: the portfolio and the annual
cohort are known contract attributes, and a per-model-point measurement's
``loss_component`` flags the contracts that are onerous standalone.
"""
from __future__ import annotations

from functools import singledispatch

import numpy as np

from fastcashflow._typing import FloatArray, IntArray
from fastcashflow._paa import PAAMeasurement
from fastcashflow._reinsurance import ReinsuranceMeasurement
from fastcashflow._vfa import VFAMeasurement
from fastcashflow.engine import GMMMeasurement
from fastcashflow.numerics import _csm_kernel, _csm_roll
from fastcashflow.projection import Cashflows


# In-force floor for the segmented discount-curve check: a month counts as live
# only above this, so a numerical residual past maturity is not read as a live
# month. Legitimate in-force is orders of magnitude larger.
_INFORCE_EPS = 1e-12


class _GroupReducer:
    """Sum rows within each group -- the grouping structure built once, reused.

    ``group()`` sums ~14 arrays (BEL, RA, every cash-flow stream, LIC, ...) over
    the *same* grouping. Building the reduction structure once here and reusing
    it for every :meth:`sum` avoids rebuilding the one-hot / re-sorting per array
    (the dominant cost at portfolio scale). One of two vectorised paths, chosen
    once from the size:

    * **few groups** -- a one-hot ``(n_groups, n) @ arr`` matrix multiply (a
      single BLAS call, no per-element scatter). Skipped when the one-hot would
      be large (``n_groups x n`` elements).
    * **many groups** -- sort once and reduce contiguous runs
      (``np.add.reduceat``), so the one-hot is never materialised.

    Empty groups stay zero. Sums run in group / sorted order rather than input
    order, so the result matches an unbuffered scatter-add to round-off.
    """

    def __init__(self, inverse: IntArray, n_groups: int):
        self.inverse = inverse
        self.n_groups = n_groups
        self.n = inverse.shape[0]
        # The number of model points per group -- also the grouping counts the
        # reduceat path needs, so compute the single bincount here.
        self.sizes = np.bincount(inverse, minlength=n_groups)
        if self.n and n_groups * self.n <= 20_000_000:
            self._onehot = (
                np.arange(n_groups)[:, None] == inverse[None, :]
            ).astype(np.float64)
            self._order = self._starts = self._nonempty = None
        else:
            self._onehot = None
            self._nonempty = np.nonzero(self.sizes)[0]
            self._order = np.argsort(inverse, kind="stable")
            self._starts = np.concatenate(([0], np.cumsum(self.sizes)[:-1]))

    def sum(self, arr: FloatArray) -> FloatArray:
        """Sum the rows of ``arr`` within each group -- shape ``(n_groups, ...)``."""
        if self.n == 0:
            return np.zeros((self.n_groups, *arr.shape[1:]), dtype=np.float64)
        if self._onehot is not None:
            return self._onehot @ arr
        result = np.zeros((self.n_groups, *arr.shape[1:]), dtype=np.float64)
        result[self._nonempty] = np.add.reduceat(
            arr[self._order], self._starts[self._nonempty], axis=0
        )
        return result


def _join_keys(cols, names=None) -> np.ndarray:
    """Composite ``'|'``-joined label per row, rejecting ``'|'`` in any value.

    The separator must round-trip, so a value carrying ``'|'`` would collide two
    distinct axis tuples onto one label and silently merge groups -- the same
    guard the segmented routing applies. Each axis is converted to a string
    column once (``astype(str)``) and joined vectorised with ``np.char.add``; the
    ``'|'`` guard runs only on string-like axes, since a numeric axis (e.g. an
    integer ``issue_year``) can never carry the separator.
    """
    str_cols = []
    for i, col in enumerate(cols):
        col = np.asarray(col)
        s = col.astype(str)
        if col.dtype.kind in "OUS":      # object / unicode / bytes -- can carry '|'
            bad = sorted(set(s[np.char.find(s, "|") >= 0].tolist()))
            if bad:
                where = f" in axis {names[i]!r}" if names else ""
                raise ValueError(
                    f"group key value(s) {bad}{where} contain the '|' character, "
                    "which grouping uses as the key separator -- rename the value "
                    "or change the separator upstream."
                )
        str_cols.append(s)
    out = str_cols[0]
    for s in str_cols[1:]:
        out = np.char.add(np.char.add(out, "|"), s)
    return out.astype(object)


def _resolve_group_ids(measurement: GMMMeasurement, by) -> np.ndarray:
    """Build the per-MP group label array from ``by``.

    ``by`` is one of: a single axis **name**; a **list** of axis names and/or
    precomputed ``(n_mp,)`` label arrays (joined into one composite label); or a
    single precomputed label **array**. Names are resolved via
    :meth:`ModelPoints.axis` against the model points
    :func:`~fastcashflow.gmm.measure` stamped on the measurement.
    """
    def axis(name: str) -> np.ndarray:
        mp = measurement.model_points
        if mp is None:
            raise ValueError(
                f"group(by={name!r}) needs the model points to resolve the name "
                "-- use a measurement returned by measure() (which stamps them), "
                "or pass a precomputed label array instead of a name."
            )
        return np.asarray(mp.axis(name))

    if isinstance(by, str):
        return axis(by)
    if isinstance(by, (list, tuple)):
        cols = [axis(b) if isinstance(b, str) else np.asarray(b) for b in by]
        names = [b if isinstance(b, str) else None for b in by]
        if len(cols) == 1:
            return cols[0]
        return _join_keys(cols, names)
    return np.asarray(by)


def _group_plan(measurement, by, n_mp: int):
    """Resolve ``by`` to per-MP labels, then build the shared reduction plan.

    Returns ``(labels, reducer)`` where ``labels`` is the ascending-order
    per-group composite label and ``reducer`` is the :class:`_GroupReducer`
    every per-field sum in this ``group`` call reuses.
    """
    group_ids = _resolve_group_ids(measurement, by)
    if group_ids.shape != (n_mp,):
        raise ValueError(f"group ids must have one entry per model point ({n_mp})")
    labels, inverse = np.unique(group_ids, return_inverse=True)
    # numpy >= 2.0 can return a 2-D inverse for n-D input; flatten to (n_mp,).
    return labels, _GroupReducer(inverse.reshape(-1), labels.shape[0])


def _sum_cashflows(cf: Cashflows, reducer: _GroupReducer) -> Cashflows:
    """Sum every cash-flow stream within each group (all streams are additive)."""
    return Cashflows(
        inforce=reducer.sum(cf.inforce),
        deaths=reducer.sum(cf.deaths),
        premium_cf=reducer.sum(cf.premium_cf),
        claim_cf=reducer.sum(cf.claim_cf),
        morbidity_cf=reducer.sum(cf.morbidity_cf),
        expense_cf=reducer.sum(cf.expense_cf),
        annuity_cf=reducer.sum(cf.annuity_cf),
        disability_cf=reducer.sum(cf.disability_cf),
        maturity_cf=reducer.sum(cf.maturity_cf),
        maturity_survivors=reducer.sum(cf.maturity_survivors),
        surrender_cf=reducer.sum(cf.surrender_cf),
    )


[문서] @singledispatch def group(measurement, by): """Aggregate a per-model-point measurement to any axis. A general aggregation primitive -- not IFRS 17-specific. ``by`` is one of: * a single **axis name** (e.g. ``"product"``); * a **list** of axis names and/or precomputed ``(n_mp,)`` label arrays (e.g. ``["product", "issue_year"]``, or ``["product", onerous_array]``), joined into one composite label; * a single precomputed ``(n_mp,)`` **array** of group labels. Names are resolved per model point via :meth:`ModelPoints.axis` against the model points the measure stamped on the result, so no re-passing is needed; a computed axis with no source column (e.g. an onerous flag from ``loss_component``) is passed as an array instead -- an ``np.ndarray``, since a Python *list* is read as a list of axes, not a single label vector. BEL and RA are summed within each group; the CSM and the loss component are re-derived on the group aggregate, so the ``max(0, ...)`` floor nets the contracts within a group but not across groups. The IFRS 17 unit of account (portfolio x annual cohort x profitability) is one choice of axes -- :func:`group_of_contracts` is the preset for it; management-accounting, profitability and validation views are other choices of ``by``. Dispatches on the measurement type (``GMMMeasurement``, ``VFAMeasurement``, ``ReinsuranceMeasurement``, ``PAAMeasurement``). Returns a measurement of the same type whose rows are the groups, in ascending label order -- usable in turn by :func:`~fastcashflow.roll_forward`, :func:`~fastcashflow.reconcile` and :func:`~fastcashflow.report`. Its ``group_labels`` attribute carries the composite label of each row, so a caller can map a group back to its key (e.g. ``"|"``-split a :func:`group_of_contracts` label into portfolio / cohort / profitability) without rebuilding the keys; ``group_sizes`` carries the number of model points in each group (model-point rows, not the policy count -- they differ when a model point's ``count`` stands for several policies). """ raise TypeError( f"group is not implemented for {type(measurement).__name__}; supported: " "GMMMeasurement, VFAMeasurement, ReinsuranceMeasurement, PAAMeasurement." )
@group.register def _(measurement: GMMMeasurement, by) -> GMMMeasurement: if measurement.bel_path is None: raise ValueError( "group() requires a full=True measurement; the trajectory fields " "are None on the full=False fast path. Call measure(..., full=True)." ) labels, r = _group_plan(measurement, by, measurement.bel_path.shape[0]) bel = r.sum(measurement.bel_path) ra = r.sum(measurement.ra_path) grouped_cf = _sum_cashflows(measurement.cashflows, r) # The CSM and the loss component are re-derived on the group aggregate -- # the max(0, ...) floor applies to the group, not the contract. fcf0 = bel[:, 0] + ra[:, 0] csm0 = np.maximum(0.0, -fcf0) loss_component = np.maximum(0.0, fcf0) bom = measurement.discount_bom if bom.ndim == 2: # Segmented: each model point discounts on its own curve, and a group # must sit in one curve. But the curves are padded to the portfolio's # longest horizon -- a flat tail past each contract's maturity -- so two # contracts on the *same* curve with different terms have different # tails. Compare each row only over its live horizon (where it is still # in force; the padded tail discounts zero in-force and never reaches the # CSM), and represent the group by its longest-horizon curve so the # discounting is correct for every contract's whole term. cols = np.arange(bom.shape[1]) inforce = measurement.cashflows.inforce # Live = still in force. A small floor (not exact > 0) so a numerical # residual past maturity is not read as a live month, which would extend # the compared horizon into the padded tail and falsely reject the group. # Legitimate in-force is orders of magnitude above this floor. live = np.where(inforce > _INFORCE_EPS, np.arange(inforce.shape[1])[None, :], -1).max(axis=1) out_bom = np.empty((r.n_groups, bom.shape[1])) out_mid = np.empty((r.n_groups, measurement.discount_mid.shape[1])) # group -> its row indices, from a single sort rather than a full # ``inverse == g`` scan per group (which would be O(n_groups x n_mp)). group_rows = np.split(np.argsort(r.inverse, kind="stable"), np.cumsum(r.sizes)[:-1]) for g in range(r.n_groups): rows = group_rows[g] rep = rows[np.argmax(live[rows])] livemask = cols[None, :] < (live[rows] + 2)[:, None] if not np.allclose(np.where(livemask, bom[rows] - bom[rep], 0.0), 0.0): raise ValueError( f"group {labels[g]!r} mixes model points with different " "discount curves -- a group must sit in one portfolio " "(basis). Split it by basis before grouping." ) out_bom[g] = bom[rep] out_mid[g] = measurement.discount_mid[rep] monthly_rate = out_bom[:, :-1] / out_bom[:, 1:] - 1.0 else: out_bom, out_mid = bom, measurement.discount_mid monthly_rate = bom[:-1] / bom[1:] - 1.0 # _csm_roll dispatches on the rate's ndim: a segmented result carries a 2-D # per-group discount curve, a single basis a 1-D one. (VFA and reinsurance # are single-basis only, so they call the 1-D _csm_kernel directly.) csm, csm_accretion, csm_release = _csm_roll( csm0, np.ascontiguousarray(grouped_cf.inforce), monthly_rate ) return GMMMeasurement( bel=bel[:, 0], ra=ra[:, 0], csm=csm[:, 0], loss_component=loss_component, bel_path=bel, ra_path=ra, csm_path=csm, csm_accretion=csm_accretion, csm_release=csm_release, lic=r.sum(measurement.lic), cashflows=grouped_cf, discount_bom=out_bom, discount_mid=out_mid, group_labels=labels, group_sizes=r.sizes, ) @group.register def _(measurement: VFAMeasurement, by) -> VFAMeasurement: if measurement.bel_path is None: raise ValueError( "group() requires a full measurement; the trajectory fields are " "None. Re-run vfa.measure()." ) labels, r = _group_plan(measurement, by, measurement.bel_path.shape[0]) bel = r.sum(measurement.bel_path) ra = r.sum(measurement.ra_path) grouped_cf = _sum_cashflows(measurement.cashflows, r) # variable_fee (PV of the fee) and time_value (a cost) are per-MP amounts -- # additive. account_value is a per-policy level, not a group quantity (the # group's fund would be sum(inforce x av), a different field), so it does not # carry to the grouped result. time_value = r.sum(measurement.time_value) variable_fee = r.sum(measurement.variable_fee) # The CSM and loss component are re-derived on the group aggregate. The VFA # inception fulfilment cash flows fold in the guarantee time value, and the # CSM accretes at the underlying-items return (the single VFA curve, so no # per-MP curve to reconcile), released by coverage units. fcf0 = bel[:, 0] + ra[:, 0] + time_value csm0 = np.maximum(0.0, -fcf0) loss_component = np.maximum(0.0, fcf0) bom = measurement.discount_bom monthly_rate = bom[:-1] / bom[1:] - 1.0 csm, csm_accretion, csm_release = _csm_kernel( csm0, np.ascontiguousarray(grouped_cf.inforce), monthly_rate ) return VFAMeasurement( bel=bel[:, 0], ra=ra[:, 0], csm=csm[:, 0], variable_fee=variable_fee, time_value=time_value, loss_component=loss_component, bel_path=bel, ra_path=ra, csm_path=csm, account_value_path=None, csm_accretion=csm_accretion, csm_release=csm_release, lic=r.sum(measurement.lic), cashflows=grouped_cf, discount_bom=bom, model_points=None, group_labels=labels, group_sizes=r.sizes, ) @group.register def _(measurement: ReinsuranceMeasurement, by) -> ReinsuranceMeasurement: if measurement.cashflows is None or measurement.discount_bom is None: raise ValueError( "group() requires a full reinsurance measurement (cash flows and " "discount curve). Re-run reinsurance.measure()." ) labels, r = _group_plan(measurement, by, measurement.bel.shape[0]) bel = r.sum(measurement.bel) ra = r.sum(measurement.ra) recovery = r.sum(measurement.recovery) reinsurance_premium = r.sum(measurement.reinsurance_premium) grouped_cf = _sum_cashflows(measurement.cashflows, r) # Reinsurance held has no loss component and no floor (paragraph 65): the # CSM is the net cost or gain, csm0 = -(BEL - RA). That is linear, so the # grouped CSM equals the sum of the per-contract CSMs; only the accretion / # release trajectory changes, re-derived at the single discount curve and # released by the grouped coverage units. csm0 = -(bel - ra) bom = measurement.discount_bom monthly_rate = bom[:-1] / bom[1:] - 1.0 csm, csm_accretion, csm_release = _csm_kernel( csm0, np.ascontiguousarray(grouped_cf.inforce), monthly_rate ) return ReinsuranceMeasurement( bel=bel, ra=ra, csm=csm[:, 0], csm_path=csm, csm_accretion=csm_accretion, csm_release=csm_release, recovery=recovery, reinsurance_premium=reinsurance_premium, cashflows=grouped_cf, discount_bom=bom, model_points=None, group_labels=labels, group_sizes=r.sizes, ) @group.register def _(measurement: PAAMeasurement, by) -> PAAMeasurement: if measurement.lrc_path is None or measurement.fcf is None: raise ValueError( "group() requires a full PAA measurement; the trajectory fields are " "None. Re-run paa.measure()." ) labels, r = _group_plan(measurement, by, measurement.lrc_path.shape[0]) lrc_path = r.sum(measurement.lrc_path) revenue = r.sum(measurement.revenue) service_expense = r.sum(measurement.service_expense) lic = r.sum(measurement.lic) grouped_cf = _sum_cashflows(measurement.cashflows, r) # The LRC, revenue, service expense and LIC are all undiscounted and # additive -- there is no CSM (paragraphs 53-59). The only non-linear part # is the onerous loss (paragraph 57): re-derive it on the group's aggregate # fulfilment cash flows, so a profitable contract nets a marginally onerous # one within the group. fcf = r.sum(measurement.fcf) loss_component = np.maximum(0.0, fcf) return PAAMeasurement( lrc=lrc_path[:, 0], loss_component=loss_component, fcf=fcf, lrc_path=lrc_path, revenue=revenue, service_expense=service_expense, lic=lic, cashflows=grouped_cf, model_points=None, group_labels=labels, group_sizes=r.sizes, )
[문서] @singledispatch def group_of_contracts(measurement, *, portfolio: str = "product", cohort: str = "issue_year", profitability=None) -> GMMMeasurement: """Aggregate a measurement to the IFRS 17 group of insurance contracts. The unit of account (paragraphs 14-24) is a portfolio (14) x annual cohort (22) x profitability (16). This preset builds that grouping from the model points :func:`~fastcashflow.gmm.measure` stamped on the measurement and runs :func:`group`, so the CSM floor nets within a group but not across. Dispatches on the measurement type; the profitability axis differs by type (a new measurement registers with ``@group_of_contracts.register``): * ``GMMMeasurement`` / ``VFAMeasurement`` / ``PAAMeasurement`` -- insurance contracts issued, direct-participating, and short-coverage (PAA) contracts; profitability is the onerous / remaining split (paragraph 16, and 57 for the PAA). The per-type re-derivation differs (VFA accretes the CSM at the underlying-items return; the PAA has no CSM, only the LRC and the onerous loss), handled by :func:`group`'s own dispatch. * ``ReinsuranceMeasurement`` -- reinsurance contracts held; profitability is the net-gain split (paragraph 61, ``csm > 0``), and there is no loss component or floor (paragraph 65), so the grouped CSM is the sum of the contract CSMs. Arguments (keyword-only): * ``portfolio`` -- the column naming the portfolio axis (default ``"product"``: paragraph 14's product line). Pass another column name to group on a different portfolio definition. * ``cohort`` -- the column naming the annual-cohort axis (default ``"issue_year"``, derived from ``issue_date``: paragraph 22). Pass another column (e.g. ``"issue_quarter"`` carried in the data) for a finer cohort; paragraph 22 caps the span at one year, so a cohort may be finer than annual but not coarser. * ``profitability`` -- the profitability classification. ``None`` (default) derives it from the measurement, since it is an output, not a known input (paragraph 16 / 47's net-outflow test). Pass a precomputed ``(n_mp,)`` array for a custom split (e.g. the paragraph-16 three-way split using a CSM-vs-RA threshold), or a column name for a locked classification carried in the data (paragraph 24: the group is fixed at inception). Requires a ``full=True`` measurement. """ raise TypeError( "group_of_contracts is not implemented for " f"{type(measurement).__name__}; supported: GMMMeasurement, " "VFAMeasurement, ReinsuranceMeasurement, PAAMeasurement." )
def _portfolio_cohort(measurement, portfolio, cohort): """Resolve the portfolio and annual-cohort label arrays (shared by all presets).""" mp = measurement.model_points if mp is None: raise ValueError( "group_of_contracts needs the model points -- use a measurement " "returned by measure() (which stamps them)." ) portfolio_arr = mp.axis(portfolio) # cohort: issue_year (from issue_date) by default. With the default left in # place but no issue_date set, fall back to a single cohort -- all new # business sits within one year (paragraph 22). An explicit cohort column # that is missing is a typo, so let its KeyError propagate. if cohort == "issue_year": try: cohort_arr = mp.axis("issue_year") except KeyError: cohort_arr = np.zeros(mp.n_mp, dtype=np.int64) else: cohort_arr = mp.axis(cohort) return mp, portfolio_arr, cohort_arr def _resolve_profitability(mp, profitability, default): """profitability override: a column name, a custom array, or ``None`` -> default. ``default`` is the engine-derived split (an output, not a known input). A string names a stored (locked, paragraph 24) classification; an array is a custom split (e.g. the paragraph-16 three-way split). """ if profitability is None: return default if isinstance(profitability, str): return mp.axis(profitability) return np.asarray(profitability) def _group_of_contracts_onerous(measurement, *, portfolio="product", cohort="issue_year", profitability=None): """Shared GMM / VFA / PAA preset -- profitability is the onerous split. Insurance contracts issued (GMM), direct-participating contracts (VFA) and short-coverage contracts (PAA) use the same onerous / remaining classification, derived from the measurement's ``loss_component`` (paragraph 16, and 57 for the PAA); only ``group``'s per-type re-derivation differs. """ mp, portfolio_arr, cohort_arr = _portfolio_cohort(measurement, portfolio, cohort) default = np.where(measurement.loss_component > 0.0, "onerous", "remaining") prof = _resolve_profitability(mp, profitability, default) return group(measurement, [portfolio_arr, cohort_arr, prof]) group_of_contracts.register(GMMMeasurement, _group_of_contracts_onerous) group_of_contracts.register(VFAMeasurement, _group_of_contracts_onerous) group_of_contracts.register(PAAMeasurement, _group_of_contracts_onerous) @group_of_contracts.register def _(measurement: ReinsuranceMeasurement, *, portfolio: str = "product", cohort: str = "issue_year", profitability=None) -> ReinsuranceMeasurement: # Reinsurance held replaces the onerous test with a net gain at initial # recognition (paragraph 61). The CSM is the net cost (negative) or net gain # (positive), so csm > 0 is the net-gain group. mp, portfolio_arr, cohort_arr = _portfolio_cohort(measurement, portfolio, cohort) default = np.where(measurement.csm > 0.0, "net_gain", "no_net_gain") prof = _resolve_profitability(mp, profitability, default) return group(measurement, [portfolio_arr, cohort_arr, prof])