# Source code for refl1d.garefl

"""
Load garefl models into refl1d.

The models themselves don't need to be modified.  See the garefl documentation
for setting up the model.

One extension provided to refl1d that is not available in garefl is the use
of penalty values in the constraints.  The model's constraints function is
able to set::

    fit[0].penalty = FIT_REJECT_PENALTY + distance

Here, *distance* is the distance to the valid region of the search space so
that any fitter that gets lost in a penalty region can more quickly return
to the valid region.  Any penalty value above *FIT_REJECT_PENALTY* will
suppress the evaluation of the model at that point during the fit.

Consider a model with layers (Si | Au | FeNi | air) and the constraint that
d_Au + d_FeNi < 200 A.  The constraints function would be written something
like::

    double excess = fit[0].m.d[1] + fit[0].m.d[2] - 200;
    fit[0].penalty = excess > 0 ? excess*excess+FIT_REJECT_PENALTY : 0.;

Then, if the fit algorithm proposes a value such as Au=125, FeNi=90, the
excess will be 15, and the penalty will be FIT_REJECT_PENALTY+225.

You can use penalties less than *FIT_REJECT_PENALTY*, but these should
correspond to the negative log likelihood of seeing that constraint value
within the model in order for the MCMC uncertainty analysis to work correctly.
*FIT_REJECT_PENALTY* is set to 1e6, which should be high enough that it
doesn't perturb the fit.
"""
__all__ = ["load"]

import sys
import os
from os import getpid
from ctypes import CDLL, c_int, c_double, c_void_p, c_char_p, byref
from threading import current_thread

import numpy as np
from numpy import empty, zeros, array

from bumps.parameter import Parameter, to_dict
from bumps.fitproblem import FitProblem

from .probe import QProbe, PolarizedNeutronQProbe
from .experiment import Experiment
from .model import Stack
from .profile import Microslabs
from .material import SLD, Vacuum

def tostr(s):
    """
    Convert *s* to the native string type of the running interpreter.

    On Python 3 and later *s* is decoded as ASCII; on Python 2 it is
    returned unchanged.
    """
    if sys.version_info[0] < 3:
        return s
    return s.decode('ascii')

def trace(fn, enabled=False):
    """
    Simple function-call tracer, usable as a decorator.

    With the default *enabled=False* the function *fn* is returned
    unchanged, so a bare ``@trace`` adds no overhead.  With *enabled=True*
    the returned wrapper prints a message tagged with the process id and
    the current thread before and after every call to *fn*, and passes the
    return value of *fn* through.

    To trace every decorated function, flip the default of *enabled*.
    """
    if not enabled:
        return fn

    from functools import wraps

    # fn.func_name exists only on Python 2; __name__ works on 2 and 3.
    name = getattr(fn, '__name__', repr(fn))

    @wraps(fn)
    def wrapper(*args, **kw):
        print("entering %s for thread %s:%s"
              % (name, getpid(), current_thread()))
        ret = fn(*args, **kw)
        print("leaving %s for thread %s:%s"
              % (name, getpid(), current_thread()))
        return ret
    return wrapper

def load(modelfile, probes=None):
    """
    Load a garefl model file as an experiment.

    *modelfile* is a model.so file created from setup.c.

    *probes* is a list of datasets to fit to the models in the model
    file, or None if the model file provides its own data.

    Returns a FitProblem wrapping the single experiment when the model
    file defines one model, or the list of experiments when it defines
    several.  The penalty function of the first experiment is used as
    the constraints callback.
    """
    models = experiment(modelfile, probes)
    penalty = models[0]._get_penalty
    target = models if len(models) > 1 else models[0]
    return FitProblem(target, constraints=penalty)
def experiment(modelfile, probes=None):
    """
    Load the garefl model library *modelfile* and return a list with one
    :class:`GareflExperiment` per model defined in it.

    *probes*, if given, is a sequence of probes, one per model, which
    replace the data provided by the model file.

    The fit parameters reported by the library are attached to the first
    experiment only (as ``M[0]._pars``).

    Raises ValueError if the number of probes does not match the number
    of models.
    """
    setup = GareflModel(modelfile)
    if probes:
        if len(probes) != setup.num_models:
            raise ValueError("Number of datasets must match number of models")
        M = [GareflExperiment(setup, k, probe=probes[k])
             for k in range(setup.num_models)]
    else:
        M = [GareflExperiment(setup, k) for k in range(setup.num_models)]
    names = setup.par_names()
    low, high = setup.par_bounds()
    value = setup.par_values()
    pars = [Parameter(v, name=s, bounds=(L, H))
            for v, s, L, H in zip(value, names, low, high)]
    M[0]._pars = pars
    return M


# Placeholder material used to fill the dummy two-layer sample stack.
NOTHING = Vacuum()
NOTHING.name = ''


class GareflExperiment(Experiment):
    """
    Experiment whose profile and reflectivity are computed by a garefl
    model library rather than by refl1d itself.

    *model* is the shared :class:`GareflModel` and *index* selects which
    of its models this experiment represents.  *probe* optionally replaces
    the data supplied by the model file.
    """
    def __init__(self, model, index, dz=1, step_interfaces=None, probe=None):
        self.model = model
        self.index = index
        if probe is None:
            probe = model.get_probe(index)
        else:
            # set_probe needs the model index as well as the probe.
            model.set_probe(index, probe)
        self.probe = probe
        self.sample = Stack([NOTHING, NOTHING])
        self.sample[0].interface.fittable = False
        # NOTE: the step_interfaces argument is accepted but not used;
        # the attribute is always True.
        self.step_interfaces = True
        self._slabs = Microslabs(1, dz=dz)
        self._cache = {}  # Cache calculated profiles/reflectivities
        # Filled in by experiment() for the first model only.
        self._pars = None
        self.roughness_limit = 2.35
        self._substrate = SLD(name='substrate', rho=0)
        self._surface = SLD(name='surface', rho=0)
        self._name = None
        self.interpolation = 0

    def parameters(self):
        """Return the list of fit parameters, or None if this experiment has none."""
        return self._pars

    def to_dict(self):
        """Return a dictionary description of the experiment."""
        return to_dict({
            'type': type(self).__name__,
            'dll_path': self.model._dll_path,
            'index': self.index,
            'parameters': self.parameters(),
        })

    def _render_slabs(self):
        """
        Build a slab description of the model from the individual layers.

        The result is cached; the garefl model is only updated and the
        profile only fetched when the 'rendered' cache entry is missing.
        """
        key = 'rendered'
        if key not in self._cache:
            if self._pars is not None:
                # Push the current parameter values into the garefl model.
                pvec = array([p.value for p in self._pars], 'd')
                self._chisq = self.model.update_model(pvec, forced=True)
            self._slabs.clear()
            w, rho, irho, rhoM, thetaM = self.model.get_profile(self.index)
            rho, irho, rhoM = 1e6*rho, 1e6*irho, 1e6*rhoM  # remove zeros
            self._slabs.extend(w=w, rho=rho[None, :], irho=irho[None, :])
            # TODO: What about rhoM, thetaM

            # Set values for the Fresnel-normalized reflectivity plot
            self._substrate.rho.value = rho[0]
            self._substrate.irho.value = irho[0]
            self._surface.rho.value = rho[-1]
            self._surface.irho.value = irho[-1]
            self._cache[key] = True
        return self._slabs

    def _get_penalty(self):
        """
        Update the model if necessary and return the penalty value for
        the point.
        """
        self._render_slabs()
        return self.model.get_penalty()

    def amplitude(self, resolution=True):
        """
        Calculate reflectivity amplitude at the probe points.

        Always raises NotImplementedError.
        """
        raise NotImplementedError("amplitude not yet available from garefl")

    def reflectivity(self, resolution=True, interpolation=0):
        """
        Calculate predicted reflectivity.

        Returns ``(Q, R)`` for an unpolarized probe, or
        ``(Q, (Rmm, Rmp, Rpm, Rpp))`` for a polarized probe.  The result
        is cached.  *resolution* and *interpolation* are not used.
        """
        key = 'reflectivity'
        if key not in self._cache:
            self._render_slabs()  # Force recalculation
            if self.probe.polarized:
                Q, Rmm = self.model.get_reflectivity(self.index, 0)
                Q, Rmp = self.model.get_reflectivity(self.index, 1)
                Q, Rpm = self.model.get_reflectivity(self.index, 2)
                Q, Rpp = self.model.get_reflectivity(self.index, 3)
                self._cache[key] = Q, (Rmm, Rmp, Rpm, Rpp)
            else:
                Q, R = self.model.get_reflectivity(self.index, 0)
                self._cache[key] = Q, R
        return self._cache[key]

    def output_model(self):
        """Run the output_model function of the garefl library."""
        self.model.output_model()


class GareflModel(object):
    """
    ctypes wrapper around a compiled garefl model library.

    *path* is the path to the shared library; it is made absolute so that
    the library can be reloaded after unpickling.
    """
    def __init__(self, path):
        self._dll_path = os.path.abspath(path)
        self._load_dll()
        self._setup_model()

    @trace
    def _load_dll(self):
        """Load the shared library and declare non-int return types."""
        dll = CDLL(self._dll_path)
        dll.ex_get_data.restype = c_char_p
        dll.ex_set_data.restype = c_int
        dll.setup_models.restype = c_void_p
        dll.ex_par_name.restype = c_char_p
        dll.ex_get_penalty.restype = c_double
        self.dll = dll
        self.num_models = 0

    @trace
    def _setup_model(self):
        """
        Initialize the models in the library and record the model count,
        parameter count and per-parameter scale factors.

        Raises RuntimeError if the models have already been set up.
        """
        if self.num_models:
            raise RuntimeError("Model already loaded")
        MODELS = c_int()
        self.models = c_void_p(self.dll.setup_models(byref(MODELS)))
        self.num_models = MODELS.value
        self.num_pars = self.dll.ex_npars(self.models)
        lo, hi = self._par_bounds()
        # Parameters whose bounds are both below 1e-3 in magnitude are
        # scaled by 1e6 on the python side; all others are left as is.
        small = np.max(np.vstack((abs(lo), abs(hi))), axis=0) < 1e-3
        self.scale = np.where(small, 1e6, 1)
        # TODO: better way to force recalc on load
        self.update_model(self.par_values())

    # Pickle protocol doesn't support ctypes linkage; reload the
    # module on the other side.
    def __getstate__(self):
        return self._dll_path

    def __setstate__(self, state):
        self._dll_path = state
        self._load_dll()
        self._setup_model()

    def clear_model(self):
        """Destroy the models held by the library, if any are loaded."""
        if self.num_models:
            self.dll.ex_fit_destroy(self.models)
            self.num_models = 0

    @trace
    def update_model(self, p, weighted=1, approximate_roughness=0,
                     forced=False):
        """
        Set the parameter vector *p* (in scaled units) in the library,
        update all models, and return the value reported by the library.
        """
        p = p/self.scale
        #assert p.flags.aligned and (p.flags.c_contiguous or p.flags.f_contiguous)
        #assert p.size == self.num_pars
        self.dll.ex_set_pars(self.models, p.ctypes)
        chisq = self.dll.ex_update_models(self.models, self.num_models,
                                          weighted, approximate_roughness,
                                          int(forced))
        return chisq

    @trace
    def get_probe(self, k):
        """
        Return a probe for an individual garefl model.

        If any of cross sections 1-3 has data, a PolarizedNeutronQProbe
        over all four cross sections is returned; otherwise the probe for
        cross section 0 (which may be None if it has no data).
        """
        data = [self._getdata(k, xs) for xs in range(4)]
        if any(d is not None for d in data[1:]):
            probe = PolarizedNeutronQProbe(xs=data)
        else:
            probe = data[0]
        return probe

    def _getdata(self, k, xs):
        """
        Convert a single model into a probe

        Returns None if cross section *xs* of model *k* has no data.
        """
        n = self.dll.ex_ndata(self.models, k, xs)
        if n == 0:
            return None
        data = empty((n, 4), 'd')
        filename = tostr(self.dll.ex_get_data(self.models, k, xs, data.ctypes))
        Q, dQ, R, dR = data.T
        probe = QProbe(Q, dQ, data=(R, dR), name=filename)
        return probe

    @trace
    def set_probe(self, k, probe):
        """
        Replace the data of garefl model *k* with the data in *probe*.

        For a polarized probe each cross section is set separately;
        otherwise the probe is stored as cross section 0.
        """
        if probe.polarized:
            for xs, probe_xs in enumerate(probe.xs):
                self._setdata(k, xs, probe_xs)
        else:
            self._setdata(k, 0, probe)

    def _setdata(self, k, xs, probe):
        """
        Send the Q, dQ, R, dR columns of *probe* to the library as cross
        section *xs* of model *k*; a *probe* of None sends zero points.

        Raises RuntimeError if the library reports a failure.
        """
        if probe is not None:
            # The row count is the number of Q points, not the Q array.
            n = len(probe.Q)
            # NOTE(review): column-major here, whereas _getdata reads a
            # row-major buffer -- confirm the layout ex_set_data expects.
            data = empty((n, 4), dtype='d', order='F')
            data[:, 0] = probe.Q
            data[:, 1] = probe.dQ
            data[:, 2] = probe.R
            data[:, 3] = probe.dR
        else:
            n = 0
            data = empty((n, 4), 'd')
        result = self.dll.ex_set_data(self.models, k, xs, n, data.ctypes)
        if result < 0:
            raise RuntimeError("unable to create data in garefl")

    @trace
    def get_profile(self, k):
        """
        Return ``(w, rho, irho, rhoM, thetaM)`` for model *k*, each array
        reversed relative to the order in which the library fills it.
        """
        n = self.dll.ex_nprofile(self.models, k)
        w, rho, irho, rhoM, thetaM = [zeros(n, 'd') for _ in range(5)]
        self.dll.ex_get_profile(self.models, k, w.ctypes,
                                rho.ctypes, irho.ctypes,
                                rhoM.ctypes, thetaM.ctypes)
        return w[::-1], rho[::-1], irho[::-1], rhoM[::-1], thetaM[::-1]

    @trace
    def get_reflectivity(self, k, xs):
        """Return ``(Q, R)`` for cross section *xs* of model *k*."""
        n = self.dll.ex_ncalc(self.models, k)
        Q, R = empty(n, 'd'), empty(n, 'd')
        self.dll.ex_get_reflectivity(self.models, k, xs, Q.ctypes, R.ctypes)
        return Q, R

    @trace
    def get_penalty(self):
        """Return the penalty value reported by the library."""
        #print "penalty", self.dll.ex_get_penalty(self.models), self.par_values()
        return self.dll.ex_get_penalty(self.models)

    @trace
    def output_model(self):
        """
        Run the output_model function
        """
        return self.dll.ex_output_model(self.models)

    @trace
    def par_names(self):
        """Return the list of parameter names."""
        return [tostr(self.dll.ex_par_name(self.models, i))
                for i in range(self.num_pars)]

    @trace
    def par_bounds(self):
        """
        Return the scaled parameter bounds as a 2 x num_pars array, which
        unpacks as ``low, high``.
        """
        # The (lo, hi) tuple is converted to an array by the multiplication
        # and self.scale is broadcast across both rows.
        return self._par_bounds()*self.scale

    @trace
    def _par_bounds(self):
        """Return the unscaled ``(lo, hi)`` bounds reported by the library."""
        lo, hi = empty(self.num_pars, 'd'), empty(self.num_pars, 'd')
        self.dll.ex_par_bounds(self.models, lo.ctypes, hi.ctypes)
        return lo, hi

    @trace
    def par_values(self):
        """Return the current parameter values in scaled units."""
        p = empty(self.num_pars, 'd')
        self.dll.ex_par_values(self.models, p.ctypes)
        return p*self.scale