Source code for zfit.core.loss

#  Copyright (c) 2020 zfit

import abc
import warnings
from typing import Optional, Union, List, Callable, Iterable, Tuple

import tensorflow as tf
from ordered_set import OrderedSet

from .baseobject import BaseObject
from .constraint import BaseConstraint, SimpleConstraint
from .dependents import BaseDependentsMixin
from .interfaces import ZfitLoss, ZfitSpace, ZfitModel, ZfitData
from .. import z, settings
from ..util import ztyping
from ..util.cache import Cachable
from ..util.checks import NOT_SPECIFIED
from ..util.container import convert_to_container, is_container
from ..util.exception import IntentionAmbiguousError, NotExtendedPDFError, WorkInProgressError, \
from ..z.math import numerical_gradient, autodiff_gradient, autodiff_value_gradients, numerical_value_gradients, \
    automatic_value_gradients_hessian, numerical_value_gradients_hessian

# @z.function
def _unbinned_nll_tf(model: ztyping.PDFInputType, data: ztyping.DataInputType, fit_range: ZfitSpace):
    """Return unbinned negative log likelihood graph for a PDF

        model (ZfitModel): PDFs with a `.pdf` method. Has to be as many models as data
        data (ZfitData):
        fit_range ():

        graph: the unbinned nll

        ValueError: if both `probs` and `log_probs` are specified.

    if is_container(model):
        nlls = [_unbinned_nll_tf(model=p, data=d, fit_range=r)
                for p, d, r in zip(model, data, fit_range)]
        nll_finished = tf.reduce_sum(input_tensor=nlls, axis=0)
        with data.set_data_range(fit_range):
            probs = model.pdf(data, norm_range=fit_range)
        log_probs = tf.math.log(probs)
        if data.weights is not None:
            log_probs *= data.weights  # because it's prob ** weights
        nll = -tf.reduce_sum(input_tensor=log_probs, axis=0)
        nll_finished = nll
    return nll_finished

def _nll_constraints_tf(constraints):
    if not constraints:
        return z.constant(0.)  # adding 0 to nll
    probs = []
    for param, dist in constraints.items():
    # probs = [dist.pdf(param) for param, dist in constraints.items()]
    constraints_neg_log_prob = -tf.reduce_sum(input_tensor=tf.math.log(probs))
    return constraints_neg_log_prob

def _constraint_check_convert(constraints):
    checked_constraints = []
    for constr in constraints:
        if isinstance(constr, BaseConstraint):
            checked_constraints.append(SimpleConstraint(func=lambda: constr))
    return checked_constraints

[docs]class BaseLoss(BaseDependentsMixin, ZfitLoss, Cachable, BaseObject): def __init__(self, model: ztyping.ModelsInputType, data: ztyping.DataInputType, fit_range: ztyping.LimitsTypeInput = None, constraints: ztyping.ConstraintsTypeInput = None): # first doc line left blank on purpose, subclass adds class docstring (Sphinx autodoc adds the two) """ A "simultaneous fit" can be performed by giving one or more `model`, `data`, `fit_range` to the loss. The length of each has to match the length of the others. Args: model (Iterable[ZfitModel]): The model or models to evaluate the data on data (Iterable[ZfitData]): Data to use fit_range (Iterable[:py:class:`~zfit.Space`]): The fitting range. It's the norm_range for the models (if they have a norm_range) and the data_range for the data. constraints (Iterable[tf.Tensor): A Tensor representing a loss constraint. Using `zfit.constraint.*` allows for easy use of predefined constraints. """ super().__init__(name=type(self).__name__) if fit_range is not None: warnings.warn("The fit_range argument is depreceated and will maybe removed in future releases. " "It is preferred to define the range in the space" " when creating the data and the model.") self.computed_gradients = {} model, data, fit_range = self._input_check(pdf=model, data=data, fit_range=fit_range) self._model = model self._data = data self._fit_range = fit_range if constraints is None: constraints = [] self._constraints = _constraint_check_convert(convert_to_container(constraints, list)) def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs) cls._name = "UnnamedSubBaseLoss" def _input_check(self, pdf, data, fit_range): if is_container(pdf) ^ is_container(data): raise ValueError("`pdf` and `data` either both have to be a list or not.") if not is_container(pdf): if isinstance(fit_range, list): raise TypeError("`pdf` and `data` are not a `list`, `fit_range` can't be a `list` then.") if isinstance(pdf, tuple): raise TypeError("`pdf` has to be a pdf or a list of pdfs, not a tuple.") if isinstance(data, tuple): raise TypeError("`data` has to be a data or a list of data, not a tuple.") pdf, data = (convert_to_container(obj, non_containers=[tuple]) for obj in (pdf, data)) # TODO: data, range consistency? if fit_range is None: fit_range = [] for p, d in zip(pdf, data): if not p.norm_range == d.data_range: raise IntentionAmbiguousError(f"No `fit_range` is specified and `pdf` {p} as " f"well as `data` {d} have different ranges they " f"are defined in. Either make them (all) consistent " f"or specify the `fit_range`") fit_range.append(p.norm_range) else: fit_range = convert_to_container(fit_range, non_containers=[tuple]) # simultaneous fit # if is_container(pdf): # if not is_container(fit_range) or not isinstance(fit_range[0], Space): # raise ValueError( # "If several pdfs are specified, the `fit_range` has to be given as a list of `Space` " # "objects and not as pure tuples.") # else: # fit_range = pdf.convert_sort_space(limits=fit_range) # fit_range may be a tuple if not len(pdf) == len(data) == len(fit_range): raise ValueError("pdf, data and fit_range don't have the same number of components:" "\npdf: {}" "\ndata: {}" "\nfit_range: {}".format(pdf, data, fit_range)) # sanitize fit_range fit_range = [p.convert_sort_space(limits=range_) for p, range_ in zip(pdf, fit_range)] # TODO: sanitize pdf, data? self.add_cache_dependents(cache_dependents=pdf) self.add_cache_dependents(cache_dependents=data) self.add_cache_dependents(cache_dependents=fit_range) return pdf, data, fit_range
[docs] def gradients(self, params: ztyping.ParamTypeInput = None) -> List[tf.Tensor]: if params is None: params = list(self.get_dependents()) else: params = convert_to_container(params) return self._gradients(params=params)
[docs] def add_constraints(self, constraints): constraints = convert_to_container(constraints) return self._add_constraints(constraints)
def _add_constraints(self, constraints): constraints = _constraint_check_convert(convert_to_container(constraints, container=list)) self._constraints.extend(constraints) return constraints @property def name(self): return self._name @property def model(self): return self._model @property def data(self): return self._data @property def fit_range(self): fit_range = self._fit_range return fit_range @property def constraints(self): return self._constraints def _get_dependents(self): # TODO: fix, add constraints pdf_dependents = self._extract_dependents(self.model) pdf_dependents |= self._extract_dependents(self.constraints) return pdf_dependents @abc.abstractmethod def _loss_func(self, model, data, fit_range, constraints): raise NotImplementedError
[docs] def value(self): return self._value()
@property def errordef(self) -> Union[float, int]: return self._errordef def _value(self): try: return self._loss_func(model=self.model,, fit_range=self.fit_range, constraints=self.constraints) except NotImplementedError as error: raise NotImplementedError("_loss_func not properly defined!") from error def __add__(self, other): if not isinstance(other, BaseLoss): raise TypeError("Has to be a subclass of `BaseLoss` or overwrite `__add__`.") if not type(other) == type(self): raise ValueError("cannot safely add two different kind of loss.") model = self.model + other.model data = + fit_range = self.fit_range + other.fit_range constraints = self.constraints + other.constraints loss = type(self)(model=model, data=data, fit_range=fit_range, constraints=constraints) return loss def _gradients(self, params): if settings.options['numerical_grad']: gradients = numerical_gradient(self.value, params=params) else: gradients = autodiff_gradient(self.value, params=params) return gradients
[docs] def value_gradients(self, params: ztyping.ParamTypeInput) -> Tuple[tf.Tensor, tf.Tensor]: return self._value_gradients(params=params)
def _value_gradients(self, params): if settings.options['numerical_grad']: value, gradients = numerical_value_gradients(self.value, params=params) else: value, gradients = autodiff_value_gradients(self.value, params=params) return value, gradients
[docs] def value_gradients_hessian(self, params: ztyping.ParamTypeInput, hessian=None) -> Tuple[ tf.Tensor, tf.Tensor, tf.Tensor]: numerical = settings.options['numerical_grad'] vals = self._value_gradients_hessian(params=params, hessian=hessian, numerical=numerical) return vals
@z.function(wraps='loss') def _value_gradients_hessian(self, params, hessian, numerical=False): if numerical: result = numerical_value_gradients_hessian(self.value, params=params, hessian=hessian) else: result = automatic_value_gradients_hessian(self.value, params=params, hessian=hessian) return result def __repr__(self) -> str: class_name = repr(self.__class__)[:-2].split(".")[-1] string = f'<{class_name} ' \ f'model={one_two_many([ for model in self.model])} ' \ f'data={one_two_many([ for data in])} ' \ f'constraints={one_two_many(self.constraints, many="True")} ' \ f'>' return string def __str__(self) -> str: class_name = repr(self.__class__)[:-2].split(".")[-1] string = f'<{class_name}' \ f' model={one_two_many([model for model in self.model])}' \ f' data={one_two_many([data for data in])}' \ f' constraints={one_two_many(self.constraints, many="True")}' \ f'>' return string
[docs]def one_two_many(values, n=3, many='multiple'): values = convert_to_container(values) if len(values) > n: values = many return values
[docs]class CachedLoss(BaseLoss): def __init__(self, model, data, fit_range=None, constraints=None): raise WorkInProgressError("Currently, caching is not implemented in the loss and does not make" "sense, it is 'not yet upgraded to TF2'") super().__init__(model=model, data=data, fit_range=fit_range, constraints=constraints) @abc.abstractmethod def _cache_add_constraints(self, constraints): raise NotImplementedError def _value(self): if self._cache.get('loss') is None: loss = super()._value() self._cache['loss'] = loss else: loss = self._cache['loss'] return loss def _add_constraints(self, constraints): super()._add_constraints(constraints=constraints) self._cache_add_constraints(constraints=constraints) def _gradients(self, params): params_cache = self._cache.get('gradients', {}) params_todo = [] for param in params: if param not in params_cache: params_todo.append(param) if params_todo: gradients = {(p, grad) for p, grad in zip(params_todo, super()._gradients(params_todo))} params_cache.update(gradients) self._cache['gradients'] = params_cache param_gradients = [params_cache[param] for param in params] return param_gradients
# class UnbinnedNLL(CachedLoss):
[docs]class UnbinnedNLL(BaseLoss): """The Unbinned Negative Log Likelihood.""" _name = "UnbinnedNLL" def __init__(self, model, data, fit_range=None, constraints=None): super().__init__(model=model, data=data, fit_range=fit_range, constraints=constraints) self._errordef = 0.5 @z.function(wraps='loss') def _loss_func(self, model, data, fit_range, constraints): # with tf.GradientTape(persistent=True) as tape: nll = self._loss_func_watched(constraints, data, fit_range, model) # variables = tape.watched_variables() # gradients = tape.gradient(nll, sources=variables) # if any(grad is None for grad in tf.unstack(gradients, axis=0)): # none_dict = {var: grad for var, grad in zip(variables, tf.unstack(gradients, axis=0)) if grad is None} # raise LogicalUndefinedOperationError(f"One or more gradients are None and therefore the function does not" # f" depend on them:" # f" {none_dict}") # for param, grad in zip(variables, gradients): # if param in self.computed_gradients: # continue # self.computed_gradients[param] = grad return nll @z.function(wraps='loss') def _loss_func_watched(self, constraints, data, fit_range, model): nll = _unbinned_nll_tf(model=model, data=data, fit_range=fit_range) if constraints: constraints = z.reduce_sum([c.value() for c in constraints]) nll += constraints return nll
# def _cache_add_constraints(self, constraints): # if self._cache.get('loss') is not None: # constraints = [c.value() for c in constraints] # self._cache['loss'] += z.reduce_sum(constraints)
[docs]class ExtendedUnbinnedNLL(UnbinnedNLL): """An Unbinned Negative Log Likelihood with an additional poisson term for the""" @z.function(wraps='loss') def _loss_func(self, model, data, fit_range, constraints): nll = super()._loss_func(model=model, data=data, fit_range=fit_range, constraints=constraints) poisson_terms = [] for mod, dat in zip(model, data): if not mod.is_extended: raise NotExtendedPDFError("The pdf {} is not extended but has to be (for an extended fit)".format(mod)) nevents = dat.n_events if dat.weights is None else z.reduce_sum(dat.weights) poisson_terms.append(-mod.get_yield() + z.to_real(nevents) * tf.math.log(mod.get_yield())) nll -= tf.reduce_sum(input_tensor=poisson_terms) return nll
[docs]class SimpleLoss(BaseLoss): _name = "SimpleLoss" def __init__(self, func: Callable, dependents: Iterable["zfit.Parameter"] = NOT_SPECIFIED, errordef: Optional[float] = None): """Loss from a (function returning a ) Tensor. Args: func: Callable that constructs the loss and returns a tensor. dependents: The dependents (independent `zfit.Parameter`) of the loss. If not given, the dependents are figured out automatically. errordef: Definition of which change in the loss corresponds to a change of 1 sigma. For example, 1 for Chi squared, 0.5 for negative log-likelihood. """ if dependents is NOT_SPECIFIED: # depreceation raise BreakingAPIChangeError("Dependents need to be specified explicitly due to the upgrade to 0.4." "More information can be found in the upgrade guide on the website.") @z.function(wraps='loss') def wrapped_func(): return func() self._simple_func = wrapped_func self._simple_errordef = errordef self._errordef = errordef self.computed_gradients = {} dependents = convert_to_container(dependents, container=OrderedSet) self._simple_func_dependents = self._extract_dependents(dependents) super().__init__(model=[], data=[], fit_range=[]) def _get_dependents(self): dependents = self._simple_func_dependents return dependents @property def errordef(self): errordef = self._simple_errordef if errordef is None: errordef = -999 # raise RuntimeError("For this SimpleLoss, no error calculation is possible.") else: return errordef def _loss_func(self, model, data, fit_range, constraints=None): return self._simple_func() def __add__(self, other): raise IntentionAmbiguousError("Cannot add a SimpleLoss, 'addition' of losses can mean anything." "Add them manually") def _cache_add_constraints(self, constraints): raise WorkInProgressError("Needed? will probably provided in future")