Source code for astromodels.functions.functions_1D.functions

from __future__ import division

import astropy.units as astropy_units
import numpy as np
from past.utils import old_div

from astromodels.core.units import get_units
from astromodels.functions.function import Function1D, FunctionMeta
from astromodels.utils.configuration import astromodels_config
from astromodels.utils.logging import setup_logger

log = setup_logger(__name__)

__author__ = "giacomov"
# DMFitFunction and DMSpectra add by Andrea Albert (aalbert@slac.stanford.edu) Oct 26, 2016

erg2keV = 6.24151e8


[docs] class GSLNotAvailable(ImportWarning): pass
[docs] class NaimaNotAvailable(ImportWarning): pass
[docs] class InvalidUsageForFunction(Exception): pass
# Now let's try and import optional dependencies import astropy.units as u try: # Naima is for numerical computation of Synch. and Inverse compton spectra in randomly oriented # magnetic fields import naima except ImportError: if astromodels_config.logging.startup_warnings: log.warning( "The naima package is not available. Models that depend on it will not be available" ) has_naima = False else: has_naima = True try: # GSL is the GNU Scientific Library. Pygsl is the python wrapper for it. It is used by some # functions for faster computation from pygsl.testing.sf import gamma_inc except ImportError: if astromodels_config.logging.startup_warnings: log.warning( "The GSL library or the pygsl wrapper cannot be loaded. Models that depend on it will not be " "available." ) has_gsl = False else: has_gsl = True
[docs] class GenericFunction(Function1D, metaclass=FunctionMeta): r""" description : Return k*f(x) latex : $ k $ parameters : k : desc : Constant value initial value : 1 """
[docs] def set_function(self, f): self._function = f
[docs] def get_function(self): return self._function
def _set_units(self, x_unit, y_unit): self.k.unit = y_unit function = property( get_function, set_function, doc="""Get/set function""", )
[docs] def evaluate(self, x, k): try: return k * self._function(x) except TypeError: if isinstance(x, u.Quantity): ones = np.ones_like(x).value else: ones = np.ones_like(x) return k * ones except AttributeError: log.error('You must define a function with set_function!')
[docs] def to_dict(self, minimal=False): data = super(Function1D, self).to_dict(minimal) if not minimal: try: f = self._function except AttributeError: f = None data["extra_setup"] = {"function": f} return data
[docs] class StepFunction(Function1D, metaclass=FunctionMeta): r""" description : A function which is constant on the interval lower_bound - upper_bound and 0 outside the interval. The extremes of the interval are counted as part of the interval. latex : $ f(x)=\begin{cases}0 & x < \text{lower_bound} \\\text{value} & \text{lower_bound} \le x \le \text{upper_bound} \\ 0 & x > \text{upper_bound} \end{cases}$ parameters : lower_bound : desc : Lower bound for the interval initial value : 0 upper_bound : desc : Upper bound for the interval initial value : 1 value : desc : Value in the interval initial value : 1.0 tests : - { x : 0.5, function value: 1.0, tolerance: 1e-20} - { x : -0.5, function value: 0, tolerance: 1e-20} """ def _set_units(self, x_unit, y_unit): # Lower and upper bound has the same unit as x self.lower_bound.unit = x_unit self.upper_bound.unit = x_unit # value has the same unit as y self.value.unit = y_unit
[docs] def evaluate(self, x, lower_bound, upper_bound, value): # The value * 0 is to keep the units right result = np.zeros(x.shape) * value * 0 idx = (x >= lower_bound) & (x <= upper_bound) result[idx] = value return result
[docs] class StepFunctionUpper(Function1D, metaclass=FunctionMeta): r""" description : A function which is constant on the interval lower_bound - upper_bound and 0 outside the interval. The upper interval is open. latex : $ f(x)=\begin{cases}0 & x < \text{lower_bound} \\\text{value} & \text{lower_bound} \le x \le \text{upper_bound} \\ 0 & x > \text{upper_bound} \end{cases}$ parameters : lower_bound : desc : Lower bound for the interval initial value : 0 fix : yes upper_bound : desc : Upper bound for the interval initial value : 1 fix : yes value : desc : Value in the interval initial value : 1.0 tests : - { x : 0.5, function value: 1.0, tolerance: 1e-20} - { x : -0.5, function value: 0, tolerance: 1e-20} """ def _set_units(self, x_unit, y_unit): # Lower and upper bound has the same unit as x self.lower_bound.unit = x_unit self.upper_bound.unit = x_unit # value has the same unit as y self.value.unit = y_unit
[docs] def evaluate(self, x, lower_bound, upper_bound, value): # The value * 0 is to keep the units right result = np.zeros(x.shape) * value * 0 idx = (x >= lower_bound) & (x < upper_bound) result[idx] = value return result
# noinspection PyPep8Naming # noinspection PyPep8Naming
[docs] class Sin(Function1D, metaclass=FunctionMeta): r""" description : A sinusodial function latex : $ K~\sin{(2\pi f x + \phi)} $ parameters : K : desc : Normalization initial value : 1 is_normalization : True f : desc : frequency initial value : 1.0 / (2 * np.pi) min : 0 phi : desc : phase initial value : 0 min : -np.pi max : +np.pi unit: rad tests : - { x : 0.0, function value: 0.0, tolerance: 1e-10} - { x : 1.5707963267948966, function value: 1.0, tolerance: 1e-10} """ def _set_units(self, x_unit, y_unit): # The normalization has the same unit of y self.K.unit = y_unit # The unit of f is 1 / [x] because fx must be a pure number. However, # np.pi of course doesn't have units, so we add a rad self.f.unit = x_unit ** (-1) * astropy_units.rad # The unit of phi is always the same (radians) self.phi.unit = astropy_units.rad # noinspection PyPep8Naming
[docs] def evaluate(self, x, K, f, phi): return K * np.sin(2 * np.pi * f * x + phi)
[docs] class DiracDelta(Function1D, metaclass=FunctionMeta): r""" description : return at zero_point latex : $ value $ parameters : value : desc : Constant value initial value : 0 zero_point: desc: value at which function is non-zero initial value : 0 fix : yes """ def _set_units(self, x_unit, y_unit): self.value.unit = y_unit self.zero_point.unit = x_unit
[docs] def evaluate(self, x, value, zero_point): out = np.zeros(x.shape) * value * 0 out[x == zero_point] = value return out
if has_naima: class Synchrotron(Function1D, metaclass=FunctionMeta): r""" description : Synchrotron spectrum from an input particle distribution, using Naima (naima.readthedocs.org) latex: not available parameters : B : desc : magnetic field initial value : 3.24e-6 unit: Gauss distance : desc : distance of the source initial value : 1.0 unit : kpc emin : desc : minimum energy for the particle distribution initial value : 1 fix : yes unit: GeV emax : desc : maximum energy for the particle distribution initial value : 510e3 fix : yes unit: GeV need: desc: number of points per decade in which to evaluate the function initial value : 10 min : 2 max : 100 fix : yes """ def _setup(self): # this is needed to load from # yaml for k, v in self._children.items(): if k not in self._parameters: # ok, this is probably a # particle distribution self.set_particle_distribution(v) break def _set_units(self, x_unit, y_unit): # This function can only be used as a spectrum, # so let's check that x_unit is a energy and y_unit is # differential flux if hasattr(x_unit, "physical_type") and x_unit.physical_type == "energy": # Now check that y is a differential flux current_units = get_units() should_be_unitless = y_unit * ( current_units.energy * current_units.time * current_units.area ) if ( not hasattr(should_be_unitless, "physical_type") or should_be_unitless.decompose().physical_type != "dimensionless" ): # y is not a differential flux raise InvalidUsageForFunction( "Unit for y is not differential flux. The function synchrotron " "can only be used as a spectrum." ) else: raise InvalidUsageForFunction( "Unit for x is not an energy. The function synchrotron can only be used " "as a spectrum" ) # we actually don't need to do anything as the units are already set up def set_particle_distribution(self, function): if "particle_distribution" in self._external_functions: self.unlink_external_function("particle_distribution") self.link_external_function(function, "particle_distribution") self._particle_distribution_name = function.name self._particle_distribution = function # Now set the units for the function current_units = get_units() self._particle_distribution.set_units( current_units.energy, current_units.energy ** (-1) ) # Naima wants a function which accepts a quantity as x (in units of eV) and returns an astropy quantity, # so we need to create a wrapper which will remove the unit from x and add the unit to the return # value self._particle_distribution_wrapper = lambda x: old_div( function(x.value), current_units.energy ) def get_particle_distribution(self): return self._external_functions["particle_distribution"] particle_distribution = property( get_particle_distribution, set_particle_distribution, doc="""Get/set particle distribution for electrons""", ) def fix_units(self, x, B, distance, emin, emax): if isinstance(x, u.Quantity): return ( True, x.to(get_units().energy), B.to(u.Gauss), distance.to(u.kpc), emin.to(u.GeV), emax.to(u.GeV), ) else: return ( False, x * (get_units().energy), B * (u.Gauss), distance * (u.kpc), emin * (u.GeV), emax * (u.GeV), ) # noinspection PyPep8Naming def evaluate(self, x, B, distance, emin, emax, need): has_units, x, B, distance, emin, emax = self.fix_units( x, B, distance, emin, emax ) _synch = naima.models.Synchrotron( self._particle_distribution_wrapper, B, Eemin=emin, Eemax=emax, nEed=need, ) if has_units: return _synch.flux(x, distance=distance) else: return _synch.flux(x, distance=distance).value def to_dict(self, minimal=False): data = super(Function1D, self).to_dict(minimal) # if not minimal: # data["extra_setup"] = { # "particle_distribution": self.particle_distribution.path # } return data class _ComplexTestFunction(Function1D, metaclass=FunctionMeta): r""" description : A useless function to be used during automatic tests latex: not available parameters : A : desc : none initial value : 3.24e-6 min : 1e-6 max : 1e-5 B : desc : none initial value : -10 min : -100 max : 100 delta : 0.1 properties: file_name: desc: a file name defer: True dummy: desc: a dummy property initial value: test allowed values: - test - love """ def _set_units(self, x_unit, y_unit): self.A.unit = y_unit self.B.unit = old_div(y_unit, x_unit) def set_particle_distribution(self, function): if "particle_distribution" in self._external_functions: self.unlink_external_function("particle_distribution") self.link_external_function(function, "particle_distribution") self._particle_distribution_name = function.name self._particle_distribution = function def get_particle_distribution(self): return self._external_functions["particle_distribution"] particle_distribution = property( get_particle_distribution, set_particle_distribution, doc="""Get/set particle distribution for electrons""", ) # noinspection PyPep8Naming def evaluate(self, x, A, B): return A + B * x def to_dict(self, minimal=False): data = super(Function1D, self).to_dict(minimal) # if not minimal: # data["extra_setup"] = { # "particle_distribution": self.particle_distribution.name # } return data
[docs] class Log_parabola(Function1D, metaclass=FunctionMeta): r""" description : A log-parabolic function. NOTE that we use the high-energy convention of using the natural log in place of the base-10 logarithm. This means that beta is a factor 1 / log10(e) larger than what returned by those software using the other convention. latex : $ K \left( \frac{x}{piv} \right)^{\alpha -\beta \log{\left( \frac{x}{piv} \right)}} $ parameters : K : desc : Normalization initial value : 1.0 is_normalization : True transformation : log10 min : 1e-30 max : 1e5 piv : desc : Pivot (keep this fixed) initial value : 1 fix : yes alpha : desc : index initial value : -2.0 beta : desc : curvature (positive is concave, negative is convex) initial value : 1.0 """ def _set_units(self, x_unit, y_unit): # K has units of y self.K.unit = y_unit # piv has the same dimension as x self.piv.unit = x_unit # alpha and beta are dimensionless self.alpha.unit = astropy_units.dimensionless_unscaled self.beta.unit = astropy_units.dimensionless_unscaled
[docs] def evaluate(self, x, K, piv, alpha, beta): # print("Receiving %s" % ([K, piv, alpha, beta])) xx = np.divide(x, piv) try: return K * xx ** (alpha - beta * np.log(xx)) except ValueError: # The current version of astropy (1.1.x) has a bug for which quantities that have become # dimensionless because of a division (like xx here) are not recognized as such by the power # operator, which throws an exception: ValueError: Quantities and Units may only be raised to a scalar power # This is a quick fix, waiting for astropy 1.2 which will fix this xx = xx.to("") return K * xx ** (alpha - beta * np.log(xx))
@property def peak_energy(self): """ Returns the peak energy in the nuFnu spectrum :return: peak energy in keV """ # Eq. 6 in Massaro et al. 2004 # (http://adsabs.harvard.edu/abs/2004A%26A...413..489M) return self.piv.value * pow( 10, old_div(((2 + self.alpha.value) * np.log(10)), (2 * self.beta.value)), )
if has_gsl: class Cutoff_powerlaw_flux(Function1D, metaclass=FunctionMeta): r""" description : A cutoff power law having the flux as normalization, which should reduce the correlation among parameters. latex : $ \frac{F}{T(b)-T(a)} ~x^{index}~\exp{(-x/x_{c})}~\text{with}~T(x)=-x_{c}^{index+1} \Gamma(index+1, x/C)~\text{(}\Gamma\text{ is the incomplete gamma function)} $ parameters : F : desc : Integral between a and b initial value : 1e-5 is_normalization : True index : desc : photon index initial value : -2.0 xc : desc : cutoff position initial value : 50.0 a : desc : lower bound for the band in which computing the integral F initial value : 1.0 fix : yes b : desc : upper bound for the band in which computing the integral F initial value : 100.0 fix : yes """ def _set_units(self, x_unit, y_unit): # K has units of y * x self.F.unit = y_unit * x_unit # alpha is dimensionless self.index.unit = astropy_units.dimensionless_unscaled # xc, a and b have the same dimension as x self.xc.unit = x_unit self.a.unit = x_unit self.b.unit = x_unit @staticmethod def _integral(a, b, index, ec): ap1 = index + 1 def integrand(x): return -pow(ec, ap1) * gamma_inc(ap1, old_div(x, ec)) return integrand(b) - integrand(a) def evaluate(self, x, F, index, xc, a, b): this_integral = self._integral(a, b, index, xc) return ( F / this_integral * np.power(x, index) * np.exp(-1 * np.divide(x, xc)) )
[docs] class Exponential_cutoff(Function1D, metaclass=FunctionMeta): r""" description : An exponential cutoff latex : $ K \exp{(-x/xc)} $ parameters : K : desc : Normalization initial value : 1.0 fix : no is_normalization : True xc : desc : cutoff initial value : 100 min : 1 """ def _set_units(self, x_unit, y_unit): # K has units of y self.K.unit = y_unit # piv has the same dimension as x self.xc.unit = x_unit
[docs] def evaluate(self, x, K, xc): return K * np.exp(np.divide(x, -xc))