# coding=utf-8
# Author: Paul Kienzle
"""
Experimental probe.
The experimental probe describes the incoming beam for the experiment.
Scattering properties of the sample are dependent on the type and
energy of the radiation.
See :ref:`data-guide` for details.
"""
# TOF stitching introduces a lot of complexity.
# Theta offset:
# Q1 = 4 pi sin(T1 + offset)/L1
# Q2 = 4 pi sin(T2 + offset)/L2
# at offset=0,
# Q1=Q2
# at offset!=0,
# Q1' ~ Q1 + 4pi offset/L1
# Q2' ~ Q2 + 4pi offset/L2
# => Q1' != Q2'
# Thick layers:
# Since a given Q, dQ has multiple T, dT, L, dL, oversampling is going
# to be very complicated.
# Energy dependent SLD:
# Just because two points are at the same Q does not mean they have
# the same theory function when scattering length density is
# energy dependent
#
# Not stitching has its own issues.
# Calculation speed:
# overlapping points are recalculated
# profiles are recalculated
#
# Unstitched seems like the better bet.
import os
import warnings
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, List, Literal, Optional, Sequence, Tuple, Union
from bumps.parameter import Parameter, to_dict
from bumps.plotutil import auto_shift, coordinated_colors
from bumps.util import USE_PYDANTIC, field, field_desc
if USE_PYDANTIC or TYPE_CHECKING:
from bumps.util import NDArray
import numpy as np
import numpy.fft
import numpy.random
from numpy import log, pi, sign, sqrt
from periodictable import nsf, xsf
from ..sample.material import Vacuum
from ..sample.reflectivity import BASE_GUIDE_ANGLE, convolve
from ..utils import asbytes
from . import fresnel
from .resolution import QL2T, TL2Q, dQ_broadening, dTdL2dQ
from .stitch import stitch
PROBE_KW = (
"T",
"dT",
"L",
"dL",
"data",
"name",
"filename",
"intensity",
"background",
"back_absorption",
"sample_broadening",
"theta_offset",
"back_reflectivity",
"data",
)
[docs]
def make_probe(**kw):
"""
Return a reflectometry measurement object of the given resolution.
"""
radiation = kw.pop("radiation")
kw = dict((k, v) for k, v in kw.items() if k in PROBE_KW)
if radiation == "neutron":
return NeutronProbe(**kw)
else:
return XrayProbe(**kw)
[docs]
@dataclass
class OversampledRegion:
Q_start: float
Q_end: float
n: int # number of points
[docs]
class BaseProbe:
intensity: Parameter
background: Parameter
back_absorption: Parameter
Q: "NDArray"
R: "NDArray"
name: Optional[str] = None
dR: Optional[Union[Sequence, "NDArray"]] = None
dQ: Optional[Union[Sequence, "NDArray"]] = None
back_reflectivity: bool = False
resolution: Literal["normal", "uniform"] = "uniform"
_Ro: Optional[Union[Sequence, "NDArray"]] = field(init=False)
view = "log"
plot_shift = 0
residuals_shift = 0
show_resolution = True
@property
def calc_Q(self):
"""Define in derived classes"""
raise NotImplementedError
[docs]
def log10_to_linear(self):
"""
Convert data from log to linear.
Older reflectometry reduction code stored reflectivity in log base 10
format. Call probe.log10_to_linear() after loading this data to
convert it to linear for subsequent display and fitting.
"""
if self.R is not None:
self.R = 10**self.R
if self.dR is not None:
self.dR = self.R * self.dR * log(10)
[docs]
def resynth_data(self):
"""
Generate new data according to the model R' ~ N(R, dR).
The resynthesis step is a precursor to refitting the data, as is
required for certain types of monte carlo error analysis. The
first time it is run it will save the original R into Ro. If you
reset R in the probe you will also need to reset Ro so that it
is used for subsequent resynth analysis.
"""
if not hasattr(self, "_Ro"):
self._Ro = self.R
self.R = self._Ro + numpy.random.randn(*self._Ro.shape) * self.dR
[docs]
def restore_data(self):
"""
Restore the original data after resynth.
"""
self.R = self._Ro
del self._Ro
# CRUFT: Ro doesn't need to be part of the public interface.
@property
def Ro(self):
warnings.warn("Use probe.R instead of probe.Ro.", DeprecationWarning)
return getattr(self, "_Ro", self.R)
[docs]
def simulate_data(self, theory, noise=2.0):
r"""
Set the data for the probe to R + eps with eps ~ normal(dR^2).
*theory* is (Q, R),
If the percent *noise* is provided, set dR to R*noise/100 before
simulating. *noise* defaults to 2% if no dR is present.
Note that measured data estimates uncertainty from the number of
counts. This means that points above the true value will have
larger uncertainty than points below the true value. This bias
is not captured in the simulated data.
"""
# Minimum value for dR after noise is added.
# TODO: does this need to be a parameter?
noise_floor = 1e-11
# Set the theory function.
R = np.array(theory[1], "d") # Force copy
assert R.shape == self.Q.shape
self.R = R
# Make sure scalar noise is positive. This check is here to so that
# old interfaces will fail properly.
if np.isscalar(noise) and noise <= 0.0:
raise ValueError("Noise level must be positive")
# If dR is missing then default noise to 2% so that dR will be set.
if self.dR is None and noise is None:
noise = 2.0
# Set dR if noise was given or otherwise defaulted.
if noise is not None:
self.dR = 0.01 * np.asarray(noise) * self.R
self.dR[self.dR < noise_floor] = noise_floor
# Add noise according to dR.
self.R += numpy.random.randn(*self.R.shape) * self.dR
[docs]
def Q_c(self, substrate=None, surface=None):
Srho, Sirho = (0, 0) if substrate is None else substrate.sld(self)[:2]
Vrho, Virho = (0, 0) if surface is None else surface.sld(self)[:2]
drho = Srho - Vrho if not self.back_reflectivity else Vrho - Srho
Q_c = sign(drho) * sqrt(16 * pi * abs(drho) * 1e-6)
return Q_c
[docs]
def resolution_guard(self):
r"""
Make sure each measured $Q$ point has at least 5 calculated $Q$
points contributing to it in the range $[-3\Delta Q, 3\Delta Q]$.
*Not Implemented*
"""
raise NotImplementedError
# TODO: implement resolution guard.
def _apply_resolution(self, Qin, Rin, interpolation):
"""
Apply the instrument resolution function
"""
Q, dQ = _interpolate_Q(self.Q, self.dQ, interpolation)
if np.iscomplex(Rin).any():
R_real = convolve(Qin, Rin.real, Q, dQ, resolution=self.resolution)
R_imag = convolve(Qin, Rin.imag, Q, dQ, resolution=self.resolution)
R = R_real + 1j * R_imag
else:
R = convolve(Qin, Rin, Q, dQ, resolution=self.resolution)
return Q, R
[docs]
def apply_beam(self, calc_Q, calc_R, resolution=True, interpolation=0):
r"""
Apply factors such as beam intensity, background, backabsorption,
resolution to the data.
*resolution* is True if the resolution function should be applied
to the reflectivity.
*interpolation* is the number of Q points to show between the
nominal Q points of the probe. Use this to draw a smooth theory
line between the data points. The resolution dQ is interpolated
between the resolution of the surrounding Q points.
If an amplitude signal is provided, $r$ will be scaled by
$\surd I + i \surd B / |r|$, which when squared will equal
$I |r|^2 + B$. The resolution function will be applied directly
to the amplitude. Unlike intensity and background, the resulting
$|G \ast r|^2 \ne G \ast |r|^2$ for convolution operator $\ast$,
but it should be close.
"""
# Note: in-place vector operations are not notably faster.
# Handle absorption through the substrate, which occurs when Q<0
# (condition)*C is C when condition is True or 0 when False,
# (condition)*(C-1)+1 is C when condition is True or 1 when False.
back = (calc_Q < 0) * (self.back_absorption.value - 1) + 1
calc_R = calc_R * back
# For back reflectivity, reverse the sign of Q after computing
if self.back_reflectivity:
calc_Q = -calc_Q
if calc_Q[-1] < calc_Q[0]:
calc_Q, calc_R = [v[::-1] for v in (calc_Q, calc_R)]
if resolution:
Q, R = self._apply_resolution(calc_Q, calc_R, interpolation)
else:
# Given that the target Q points should be in the set of
# calculated Q values, interp will give us the
# values of Q at the appropriate R, even if there are
# guard values, or otherwise some mixture of calculated
# Q values. The cost of doing so is going to be n log n
# in the size of Q, which is a bit pricey, but let's see
# if it is a problem before optimizing.
Q, dQ = _interpolate_Q(self.Q, self.dQ, interpolation)
Q, R = self.Q, np.interp(Q, calc_Q, calc_R)
if np.iscomplex(R).any():
# When R is an amplitude you can scale R by sqrt(A) to reproduce
# the effect of scaling the intensity in the reflectivity. To
# reproduce the effect of adding a background you can fiddle the
# phase of r as well using:
# s = (sqrt(A) + i sqrt(B)/|r|) r
# then
# |s|^2 = |sqrt(A) + i sqrt(B)/|r||^2 |r|^2
# = (A + B/|r|^2) |r|^2
# = A |r|^2 + B
# Note that this cannot work for negative background since
# |s|^2 >= 0 always, whereas negative background could push the
# reflectivity below zero.
R = np.sqrt(self.intensity.value) * R
if self.background.value > 0:
R += 1j * np.sqrt(self.background.value) * R / abs(R)
else:
R = self.intensity.value * R + self.background.value
# return calc_Q, calc_R
return Q, R
[docs]
def fresnel(self, substrate=None, surface=None):
"""
Returns a Fresnel reflectivity calculator given the surface and
and substrate. The calculated reflectivity includes The Fresnel
reflectivity for the probe reflecting from a block of material with
the given substrate.
Returns F = R(probe.Q), where R is magnitude squared reflectivity.
"""
# Doesn't use ProbeCache, but this routine is not time critical
Srho, Sirho = (0, 0) if substrate is None else substrate.sld(self)[:2]
Vrho, Virho = (0, 0) if surface is None else surface.sld(self)[:2]
if self.back_reflectivity:
Srho, Vrho = Vrho, Srho
Sirho, Virho = Virho, Sirho
if Srho == Vrho:
Srho = Vrho + 1
# I = np.ones_like(self.Q)
I = 1
calculator = fresnel.Fresnel(rho=Srho * I, irho=Sirho * I, Vrho=Vrho * I, Virho=Virho * I)
return calculator
[docs]
def save(self, filename, theory, substrate=None, surface=None):
"""
Save the data and theory to a file.
"""
fresnel_calculator = self.fresnel(substrate, surface)
Q, FQ = self.apply_beam(self.calc_Q, fresnel_calculator(self.calc_Q))
Q, R = theory
if len(Q) != len(self.Q):
# Saving interpolated data
A = np.array((Q, R, np.interp(Q, self.Q, FQ)))
header = "# %17s %20s %20s\n" % ("Q (1/A)", "theory", "fresnel")
elif getattr(self, "R", None) is not None:
A = np.array((self.Q, self.dQ, self.R, self.dR, R, FQ))
header = "# %17s %20s %20s %20s %20s %20s\n" % ("Q (1/A)", "dQ (1/A)", "R", "dR", "theory", "fresnel")
else:
A = np.array((self.Q, self.dQ, R, FQ))
header = "# %17s %20s %20s %20s\n" % ("Q (1/A)", "dQ (1/A)", "theory", "fresnel")
header = ("# intensity: %.15g\n# background: %.15g\n" % (self.intensity.value, self.background.value)) + header
with open(filename, "wb") as fid:
# print("saving", A)
fid.write(asbytes(header))
np.savetxt(fid, A.T, fmt="%20.15g")
[docs]
def plot(self, view=None, **kwargs):
"""
Plot theory against data.
Need substrate/surface for Fresnel-normalized reflectivity
"""
view = view if view is not None else self.view
if view == "linear":
self.plot_linear(**kwargs)
elif view == "log":
self.plot_log(**kwargs)
elif view == "fresnel":
self.plot_fresnel(**kwargs)
elif view == "logfresnel":
self.plot_logfresnel(**kwargs)
elif view == "q4":
self.plot_Q4(**kwargs)
elif view == "resolution":
self.plot_resolution(**kwargs)
elif view.startswith("resid"):
self.plot_residuals(**kwargs)
elif view == "fft":
self.plot_fft(**kwargs)
elif view == "SA": # SA does not plot since it does not exist
pass
else:
raise TypeError("incorrect reflectivity view '%s'" % view)
[docs]
def plot_resolution(self, suffix="", label=None, **kwargs):
import matplotlib.pyplot as plt
plt.plot(self.Q, self.dQ, label=self.label(prefix=label, suffix=suffix))
plt.xlabel(r"Q ($\AA^{-1}$)")
plt.ylabel(r"Q resolution ($1-\sigma \AA^{-1}$)")
plt.title("Measurement resolution")
[docs]
def plot_linear(self, **kwargs):
"""
Plot the data associated with probe.
"""
import matplotlib.pyplot as plt
self._plot_pair(ylabel="Reflectivity", **kwargs)
plt.yscale("linear")
[docs]
def plot_log(self, **kwargs):
"""
Plot the data associated with probe.
"""
import matplotlib.pyplot as plt
self._plot_pair(ylabel="Reflectivity", **kwargs)
plt.yscale("log")
[docs]
def plot_logfresnel(self, *args, **kw):
"""
Plot the log Fresnel-normalized reflectivity associated with the probe.
"""
import matplotlib.pyplot as plt
self.plot_fresnel(*args, **kw)
plt.yscale("log")
[docs]
def plot_fresnel(self, substrate=None, surface=None, **kwargs):
r"""
Plot the Fresnel-normalized reflectivity associated with the probe.
Note that the Fresnel reflectivity has the intensity and background
applied before normalizing so that hydrogenated samples display more
cleanly. The formula to reproduce the graph is:
.. math::
R' = R / (F(Q) I + B)
\Delta R' = \Delta R / (F(Q) I + B)
where $I$ is the intensity and $B$ is the background.
"""
if substrate is None and surface is None:
raise TypeError("Fresnel-normalized reflectivity needs substrate or surface")
F = self.fresnel(substrate=substrate, surface=surface)
# print("substrate", substrate, "surface", surface)
def scale(Q, dQ, R, dR, interpolation=0):
Q, fresnel = self.apply_beam(self.calc_Q, F(self.calc_Q), interpolation=interpolation)
return Q, dQ, R / fresnel, dR / fresnel
if substrate is None:
name = "air:%s" % surface.name
elif surface is None or isinstance(surface, Vacuum):
name = substrate.name
else:
name = "%s:%s" % (substrate.name, surface.name)
self._plot_pair(scale=scale, ylabel="R/(R(%s)" % name, **kwargs)
[docs]
def plot_Q4(self, **kwargs):
r"""
Plot the Q**4 reflectivity associated with the probe.
Note that Q**4 reflectivity has the intensity and background applied
so that hydrogenated samples display more cleanly. The formula
to reproduce the graph is:
.. math::
R' = R / ( (100*Q)^{-4} I + B)
\Delta R' = \Delta R / ( (100*Q)^{-4} I + B )
where $B$ is the background.
"""
def scale(Q, dQ, R, dR, interpolation=0):
# Q4 = np.maximum(1e-8*Q**-4, self.background.value)
Q4 = 1e-8 * Q**-4 * self.intensity.value + self.background.value
return Q, dQ, R / Q4, dR / Q4
# Q4[Q4==0] = 1
self._plot_pair(scale=scale, ylabel="R (100 Q)^4", **kwargs)
def _plot_pair(
self,
theory=None,
scale=lambda Q, dQ, R, dR, interpolation=0: (Q, dQ, R, dR),
ylabel="",
suffix="",
label=None,
plot_shift=None,
**kwargs,
):
import matplotlib.pyplot as plt
c = coordinated_colors()
plot_shift = plot_shift if plot_shift is not None else Probe.plot_shift
trans = auto_shift(plot_shift)
if hasattr(self, "R") and self.R is not None:
Q, dQ, R, dR = scale(self.Q, self.dQ, self.R, self.dR)
if not self.show_resolution:
dQ = None
plt.errorbar(
Q,
R,
yerr=dR,
xerr=dQ,
capsize=0,
fmt=".",
color=c["light"],
transform=trans,
label=self.label(prefix=label, gloss="data", suffix=suffix),
)
if theory is not None:
# TODO: completely restructure interpolation handling
# Interpolation is used to show the theory curve between the
# data points. The _calc_Q points used to predict theory at
# the measured data are used for the interpolated Q points, with
# the resolution function centered on each interpolated value.
# The result is that when interpolation != 0, there are more
# theory points than data points, and we will need to accomodate
# this when computing normalization curves for Fresnel and Q^4
# reflectivity.
# Issues with current implementation:
# * If the resolution is too tight, there won't be sufficient
# support from _calc_Q to compute theory at Q interpolated.
# * dQ for the interpolated points uses linear interpolation
# of dQ between neighbours. If measurements with tight and
# loose resolution are interleaved the result will look very
# strange.
# * There are too many assumptions about interpolation shared
# between Experiment and Probe objects. In particular, the
# Fresnel object needs to be computed at the same interpolated
# points as the theory function.
# * If there are large gaps in the data the interpolation will
# not fill them in correctly. Perhaps we should set _Q_plot
# and _calc_Q_plot independently from the data?
# * We sometimes show the theory curve without resolution
# applied; this has not been tested with interpolation
interpolation = kwargs.get("interpolation", 0)
Q, R = theory
Q, dQ, R, dR = scale(Q, 0, R, 0, interpolation=interpolation)
plt.plot(
Q,
R,
"-",
color=c["dark"],
transform=trans,
label=self.label(prefix=label, gloss="theory", suffix=suffix),
)
# from numpy.fft import fft
# x, y = Q[1::2], abs(fft(R)[:(len(R)-1)//2])
# y = y * (R.max()/y.max())
# plt.plot(x, y, '-')
plt.xlabel("Q (inv Angstroms)")
plt.ylabel(ylabel)
# plt.legend(bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.)
h = plt.legend(fancybox=True, numpoints=1)
h.get_frame().set_alpha(0.5)
[docs]
def plot_residuals(self, theory=None, suffix="", label=None, plot_shift=None, **kwargs):
import matplotlib.pyplot as plt
plot_shift = plot_shift if plot_shift is not None else Probe.residuals_shift
trans = auto_shift(plot_shift)
if theory is not None and self.R is not None:
c = coordinated_colors()
Q, R = theory
# In case theory curve is evaluated at more/different points...
R = np.interp(self.Q, Q, R)
residual = (R - self.R) / self.dR
plt.plot(
self.Q, residual, ".", color=c["light"], transform=trans, label=self.label(prefix=label, suffix=suffix)
)
plt.axhline(1, color="black", ls="--", lw=1)
plt.axhline(0, color="black", lw=1)
plt.axhline(-1, color="black", ls="--", lw=1)
plt.xlabel("Q (inv A)")
plt.ylabel("(theory-data)/error")
plt.legend(numpoints=1)
[docs]
def plot_fft(self, theory=None, suffix="", label=None, substrate=None, surface=None, **kwargs):
"""
FFT analysis of reflectivity signal.
"""
raise NotImplementedError
import matplotlib.pyplot as plt
c = coordinated_colors()
if substrate is None and surface is None:
raise TypeError("FFT reflectivity needs substrate or surface")
F = self.fresnel(substrate=substrate, surface=surface)
# Qc = sqrt(16*pi*substrate)
Qc = 0
Qmax = max(self.Q)
T = np.linspace(Qc, Qmax, len(self.Q))
z = np.linspace(0, 2 * pi / Qmax, len(self.Q) // 2)
if hasattr(self, "R"):
signal = np.interp(T, self.Q, self.R / F(self.Q))
A = abs(numpy.fft.fft(signal - np.average(signal)))
A = A[: len(A) // 2]
plt.plot(z, A, ".-", color=c["light"], label=self.label(prefix=label, gloss="data", suffix=suffix))
if theory is not None:
Q, R = theory
signal = np.interp(T, Q, R / F(Q))
A = abs(numpy.fft.fft(signal - np.average(signal)))
A = A[: len(A) // 2]
plt.plot(z, A, "-", color=c["dark"], label=self.label(prefix=label, gloss="theory", suffix=suffix))
plt.xlabel("w (A)")
if substrate is None:
name = "air:%s" % surface.name
elif surface is None or isinstance(surface, Vacuum):
name = substrate.name
else:
name = "%s:%s" % (substrate.name, surface.name)
plt.ylabel("|FFT(R/R(%s))|" % name)
[docs]
def label(self, prefix=None, gloss="", suffix=""):
if not prefix:
prefix = self.name
if prefix:
return " ".join((prefix + suffix, gloss)) if gloss else prefix
else:
return suffix + " " + gloss if gloss else None
[docs]
@dataclass(init=False)
class Probe(BaseProbe):
r"""
Defines the incident beam used to study the material.
For calculation purposes, probe needs to return the values $Q_\text{calc}$
at which the model is evaluated. This is normally going to be the measured
points only, but for some systems, such as those with very thick layers,
oversampling is needed to avoid aliasing effects.
A measurement point consists of incident angle, angular resolution,
incident wavelength, FWHM wavelength resolution, reflectivity and
uncertainty in reflectivity.
A probe is a set of points, defined by vectors for point attribute. For
convenience, the attribute can be initialized with a scalar if it is
constant throughout the measurement, but will be set to a vector in
the probe. The attributes are initialized as follows:
*T* : float or [float] | degrees
Incident angle
*dT* : float or [float] | degrees
FWHM angular divergence
*L* : float or [float] | |Ang|
Incident wavelength
*dL* : float or [float] | |Ang|
FWHM wavelength dispersion
*data* : ([float], [float])
R, dR reflectivity measurement and uncertainty
*dQ* : [float] or None | |1/Ang|
1-\$sigma$ Q resolution when it cannot be computed directly
from angular divergence and wavelength dispersion.
*resolution* : 'normal' or 'uniform'
Distribution function for Q resolution.
Measurement properties:
*intensity* : float or Parameter
Beam intensity
*background* : float or Parameter
Constant background
*back_absorption* : float or Parameter
Absorption through the substrate relative to beam intensity.
A value of 1.0 means complete transmission; a value of 0.0
means complete absorption.
*theta_offset* : float or Parameter
Offset of the sample from perfect alignment
*sample_broadening* : float or Parameter
Additional FWHM angular divergence from sample curvature.
Scale 1-$\sigma$ rms by $2 \surd(2 \ln 2) \approx 2.35$ to convert
to FWHM.
*back_reflectivity* : True or False
True if the beam enters through the substrate
Measurement properties are fittable parameters. *theta_offset* in
particular should be set using *probe.theta_offset.dev(dT)*, with *dT*
equal to the FWHM uncertainty in the peak position for the rocking curve,
as measured in radians. Changes to *theta_offset* will then be penalized
in the cost function for the fit as if it were another measurement. Use
:meth:`alignment_uncertainty` to compute dT from the shape of the
rocking curve.
Sample broadening adjusts the existing Q resolution rather than
recalculating it. This allows it the resolution to describe more
complicated effects than a simple gaussian distribution of wavelength
and angle will allow. The calculation uses the mean wavelength, angle
and angular divergence. See :func:`resolution.dQ_broadening` for details.
*intensity* and *back_absorption* are generally not needed --- scaling
the reflected signal by an appropriate intensity measurement will correct
for both of these during reduction. *background* may be needed,
particularly for samples with significant hydrogen content due to its
large isotropic incoherent scattering cross section.
"""
# Fields:
intensity: Parameter
background: Parameter
back_absorption: Parameter
theta_offset: Parameter
sample_broadening: Parameter
name: Optional[str] = None
filename: Optional[str] = None
back_reflectivity: bool = False
R: Optional[Any] = None
dR: Optional[Any] = 0
T: "NDArray" = field_desc("List of theta values (incident angle)")
dT: Optional[Any] = 0
L: "NDArray" = field_desc("List of lambda values (wavelength, in Angstroms)")
dL: Optional[Any] = 0
dQo: Optional[Union[Sequence, "NDArray"]] = None
resolution: Literal["normal", "uniform"] = "uniform"
oversampling: Optional[int] = None
oversampling_seed: int = 1
oversampled_regions: List[OversampledRegion] = field(default_factory=list)
radiation: Literal["neutron", "xray"] = "xray"
polarized = False
Aguide = BASE_GUIDE_ANGLE # default guide field for unpolarized measurements
view = "log"
plot_shift = 0
residuals_shift = 0
show_resolution = True
[docs]
@classmethod
def from_dict(cls, **kw):
R = kw.pop("R", None)
dR = kw.pop("dR", None)
if R is not None and dR is not None:
kw["data"] = (R, dR)
radiation = kw.pop("radiation")
if radiation == "neutron":
return NeutronProbe(**kw)
else:
return XrayProbe(**kw)
def __init__(
self,
T=None,
dT=0,
L=None,
dL=0,
data=None,
intensity=1,
background=0,
back_absorption=1,
theta_offset=0,
sample_broadening=0,
back_reflectivity=False,
name: Optional[str] = None,
filename=None,
dQo=None,
resolution: Literal["normal", "uniform"] = "normal",
oversampling=None,
oversampling_seed=1,
oversampled_regions: Optional[List[OversampledRegion]] = None,
):
if T is None or L is None:
raise TypeError("T and L required")
if sample_broadening is None:
sample_broadening = 0
if theta_offset is None:
theta_offset = 0
if not name and filename:
name = os.path.splitext(os.path.basename(filename))[0]
qualifier = " " + name if name is not None else ""
self.intensity = Parameter.default(intensity, name="intensity" + qualifier)
self.background = Parameter.default(background, name="background" + qualifier)
self.back_absorption = Parameter.default(back_absorption, name="back_absorption" + qualifier, limits=(0.0, 1.0))
self.theta_offset = Parameter.default(theta_offset, name="theta_offset" + qualifier)
self.sample_broadening = Parameter.default(sample_broadening, name="sample_broadening" + qualifier)
self.back_reflectivity = back_reflectivity
if data is not None:
R, dR = data
R = np.array(R)
dR = np.array(dR)
else:
R, dR = None, None
self._set_TLR(T, dT, L, dL, R, dR, dQo)
self.name = name
self.filename = filename
self.resolution = resolution
self.oversampling = oversampling
self.oversampling_seed = oversampling_seed
self.oversampled_regions = oversampled_regions if oversampled_regions is not None else []
self._apply_oversamplings()
def _set_TLR(self, T, dT, L, dL, R, dR, dQ):
# if L is None:
# L = xsf.xray_wavelength(E)
# dL = L * dE/E
# else:
# E = xsf.xray_energy(L)
# dE = E * dL/L
T = np.array(T, np.float64)
L = np.array(L, np.float64)
Q = TL2Q(T=T, L=L)
if dQ is not None:
dQ = np.asarray(dQ)
else:
dQ = dTdL2dQ(T=T, dT=dT, L=L, dL=dL)
# Make sure that we are dealing with vectors
T, dT, L, dL = [np.ones_like(Q) * v for v in (T, dT, L, dL)]
# remove nan
nan_indices = set()
for column in [T, dT, L, dL, R, dR, Q, dQ]:
if column is not None:
indices = np.argwhere(np.isnan(column)).flatten()
nan_indices.update(indices)
nan_indices = list(nan_indices)
T, dT, L, dL, R, dR, Q, dQ = (
np.delete(c, nan_indices) if c is not None else None for c in [T, dT, L, dL, R, dR, Q, dQ]
)
# Probe stores sorted values for convenience of resolution calculator
idx = np.argsort(Q)
self.T, self.dT = T[idx], dT[idx]
self.L, self.dL = L[idx], dL[idx]
self.Qo, self.dQo = Q[idx], dQ[idx]
if R is not None:
R = R[idx]
if dR is not None:
dR = dR[idx]
self.R = R
self.dR = dR
# By default the calculated points are the measured points. Use
# oversample() for a more accurate resolution calculations.
self._set_calc(self.T, self.L)
def _QdQ_key(self):
"""
Return a tuple that changes if the calculated Q or dQ values of this
probe object change (when theta_offset or sample_broadening change).
"""
return (self.theta_offset.value, self.sample_broadening.value)
[docs]
@staticmethod
def alignment_uncertainty(w, I, d=0):
r"""
Compute alignment uncertainty.
**Parameters:**
*w* : float | degrees
Rocking curve full width at half max.
*I* : float | counts
Rocking curve integrated intensity.
*d* = 0: float | degrees
Motor step size
**Returns:**
*dtheta* : float | degrees
uncertainty in alignment angle
"""
return sqrt(w**2 / I + d**2 / 12.0)
[docs]
def write_data(self, filename, columns=("Q", "R", "dR"), header=None):
"""
Save the data to a file.
*header* is a string with trailing \\n containing the file header.
*columns* is a list of column names from Q, dQ, R, dR, L, dL, T, dT.
The default is to write Q, R, dR data.
"""
if header is None:
header = "# %s\n" % " ".join(columns)
with open(filename, "wb") as fid:
fid.write(asbytes(header))
data = np.vstack([getattr(self, c) for c in columns])
np.savetxt(fid, data.T)
def _set_calc(self, T, L):
Q = TL2Q(T=T, L=L)
idx = np.argsort(Q)
self.calc_T = T[idx]
self.calc_L = L[idx]
self._calc_Q = np.unique(Q[idx])
# Only keep the scattering factors that you need
self.unique_L = np.unique(self.calc_L)
self._L_idx = np.searchsorted(self.unique_L, L)
@property
def Q(self):
if self.theta_offset.value != 0:
Q = TL2Q(T=self.T + self.theta_offset.value, L=self.L)
# TODO: this may break the Q order on measurements with varying L
else:
Q = self.Qo
return Q
@Q.setter
def Q(self, Q):
# If we explicity set Q, then forget what we know about T and L.
# This will cause theta offset != 0 to fail.
if hasattr(self, "T"):
del self.T, self.L
self.Qo = Q
@property
def dQ(self):
if self.sample_broadening.value == 0:
dQ = self.dQo
else:
dQ = dQ_broadening(dQ=self.dQo, L=self.L, T=self.T, dT=self.dT, width=self.sample_broadening.value)
return dQ
@dQ.setter
def dQ(self, dQ):
self.dQo = dQ
@property
def calc_Q(self):
# TODO: cache this value if theta_offset is not changing
if self.theta_offset.value != 0:
# NOTE: np.unique sorts the array
Q = np.unique(TL2Q(T=self.calc_T + self.theta_offset.value, L=self.calc_L))
else:
Q = self._calc_Q
return Q if not self.back_reflectivity else -Q
[docs]
def parameters(self):
return {
"intensity": self.intensity,
"background": self.background,
"back_absorption": self.back_absorption,
"theta_offset": self.theta_offset,
"sample_broadening": self.sample_broadening,
}
[docs]
def to_dict(self):
"""Return a dictionary representation of the parameters"""
return to_dict(
{
"type": type(self).__name__,
"name": self.name,
"filename": self.filename,
"intensity": self.intensity,
"background": self.background,
"back_absorption": self.back_absorption,
"theta_offset": self.theta_offset,
"sample_broadening": self.sample_broadening,
}
)
[docs]
def scattering_factors(self, material, density):
"""
Returns the scattering factors associated with the material given
the range of wavelengths/energies used in the probe.
"""
raise NotImplementedError("need radiation type in <%s> to compute sld for %s" % (self.filename, material))
[docs]
def subsample(self, dQ):
"""
Select points at most every dQ.
Use this to speed up computation early in the fitting process.
This changes the data object, and is not reversible.
The current algorithm is not picking the "best" Q value, just the
nearest, so if you have nearby Q points with different quality
statistics (as happens in overlapped regions from spallation
source measurements at different angles), then it may choose
badly. Simple solutions based on the smallest relative error dR/R
will be biased toward peaks, and smallest absolute error dR will
be biased toward valleys.
"""
# Assumes self contains sorted Qo and associated T, L
# Note: _calc_Q is already sorted
Q = np.arange(self.Qo[0], self.Qo[-1], dQ)
idx = np.unique(np.searchsorted(self.Qo, Q))
# print len(idx), len(self.Qo)
self.T, self.dT = self.T[idx], self.dT[idx]
self.L, self.dL = self.L[idx], self.dL[idx]
self.Qo, self.dQo = self.Qo[idx], self.dQo[idx]
if self.R is not None:
self.R = self.R[idx]
if self.dR is not None:
self.dR = self.dR[idx]
self._set_calc(self.T, self.L)
[docs]
def critical_edge(self, substrate=None, surface=None, n=51, delta=0.25, clear_existing=False):
r"""
Oversample points near the critical edge.
To replace an existing critical edge region, set *clear_existing* to True.
The critical edge is defined by the difference in scattering
potential for the *substrate* and *surface* materials, or the
reverse if *back_reflectivity* is true.
*n* is the number of $Q$ points to compute near the critical edge.
*delta* is the relative uncertainty in the material density,
which defines the range of values which are calculated.
The $n$ points $Q_i$ are evenly distributed around the critical
edge in $Q_c \pm \delta Q_c$ by varying angle $\theta$ for a
fixed wavelength $< \lambda >$, the average of all wavelengths
in the probe.
Specifically:
.. math::
Q_c^2 &= 16 \pi (\rho - \rho_\text{incident}) \\
Q_i &= Q_c - \delta_i Q_c (i - (n-1)/2)
\qquad \text{for} \; i \in 0 \ldots n-1 \\
\lambda_i &= < \lambda > \\
\theta_i &= \sin^{-1}(Q_i \lambda_i / 4 \pi)
"""
if not clear_existing and self.oversampled_regions:
raise ValueError(
"Cannot add critical edge region when oversampled regions already exist. "
"Set clear_existing=True to clear existing regions."
)
Q_c = self.Q_c(substrate, surface)
region = OversampledRegion(Q_start=Q_c * (1 - delta), Q_end=Q_c * (1 + delta), n=n)
self.oversampled_regions = [region] # Clear existing regions if clear_existing is True
self._apply_oversamplings()
[docs]
def oversample(self, n=20, seed=1):
"""
Generate an over-sampling of Q to avoid aliasing effects.
Oversampling is needed for thick layers, in which the underlying
reflectivity oscillates so rapidly in Q that a single measurement
has contributions from multiple Kissig fringes.
Sampling will be done using a pseudo-random generator so that
accidental structure in the function does not contribute to the
aliasing. The generator will usually be initialized with a fixed
*seed* so that the point selection will not change from run to run,
but a *seed* of None will choose a different set of points each time
oversample is called.
The value *n* is the number of points that should contribute to
each Q value when computing the resolution. These will be
distributed about the nominal measurement value, but varying in
both angle and energy according to the resolution function. This
will yield more points near the measurement and fewer farther away.
The measurement point itself will not be used to avoid accidental
bias from uniform Q steps. Depending on the problem, a value of
*n* between 20 and 100 should lead to stable values for the convolved
reflectivity.
"""
self.oversampling = n
self.oversampling_seed = seed
self._apply_oversamplings()
def _apply_oversamplings(self):
T_parts, L_parts = [self.T], [self.L]
if self.oversampling is not None:
rng = np.random.RandomState(seed=self.oversampling_seed)
T = _get_normal_oversampling_points(self.T, self.dT, self.oversampling, rng)
L = _get_normal_oversampling_points(self.L, self.dL, self.oversampling, rng)
T_parts.append(T)
L_parts.append(L)
for region in self.oversampled_regions:
Q = np.linspace(region.Q_start, region.Q_end, region.n)
avg_L = np.average(self.L)
T_parts.append(QL2T(Q=Q, L=avg_L))
L_parts.append(np.ones_like(Q) * avg_L)
T = np.hstack(T_parts)
L = np.hstack(L_parts)
self._set_calc(T, L)
[docs]
class XrayProbe(Probe):
"""
X-Ray probe.
By providing a scattering factor calculator for X-ray scattering, model
components can be defined by mass density and chemical composition.
"""
radiation = "xray"
# TODO: remove density from interface since it is always 1.0
# Density is handled as a scale factor applied to the returned sld
# in material.ProbeCache. This allows us to fit the density without
# looking up the sld each time.
[docs]
def scattering_factors(self, material, density):
# doc string is inherited from parent (see below)
# TODO: support wavelength dependent systems
rho, irho = xsf.xray_sld(material, wavelength=self.unique_L[0], density=density)
return rho, irho, 0
scattering_factors.__doc__ = Probe.scattering_factors.__doc__
[docs]
class NeutronProbe(Probe):
"""
Neutron probe.
By providing a scattering factor calculator for X-ray scattering, model
components can be defined by mass density and chemical composition.
"""
radiation = "neutron"
# TODO: remove density from interface since it is always 1.0
# Density is handled as a scale factor applied to the returned sld
# in material.ProbeCache. This allows us to fit the density without
# looking up the sld each time.
[docs]
def scattering_factors(self, material, density):
# doc string is inherited from parent (see below)
# TODO: support wavelength dependent systems
rho, irho, rho_incoh = nsf.neutron_sld(material, wavelength=self.unique_L[0], density=density)
# print(f"{material}@{density} L={self.unique_L[0]} rho={rho}")
return rho, irho, rho_incoh
scattering_factors.__doc__ = Probe.scattering_factors.__doc__
[docs]
@dataclass(init=False)
class ProbeSet:
name: Optional[str]
probes: Sequence[Probe]
polarized = False
def __init__(self, probes, name=None):
self.probes = list(probes)
self.R = np.hstack([p.R for p in self.probes])
self.dR = np.hstack([p.dR for p in self.probes])
self.name = name if name is not None else self.probes[0].name
back_refls = [f.back_reflectivity for f in self.probes]
if all(back_refls) or not any(back_refls):
self.back_reflectivity = back_refls[0]
else:
raise ValueError("Cannot mix front and back reflectivity measurements")
[docs]
def parameters(self):
return [p.parameters() for p in self.probes]
parameters.__doc__ = Probe.parameters.__doc__
[docs]
def to_dict(self):
"""Return a dictionary representation of the parameters"""
return to_dict(
{
"type": type(self).__name__,
"name": self.name,
"probes": self.probes,
}
)
[docs]
def resynth_data(self):
for p in self.probes:
p.resynth_data()
self.R = np.hstack([p.R for p in self.probes])
resynth_data.__doc__ = Probe.resynth_data.__doc__
[docs]
def restore_data(self):
for p in self.probes:
p.restore_data()
self.R = np.hstack([p.R for p in self.probes])
restore_data.__doc__ = Probe.restore_data.__doc__
[docs]
def simulate_data(self, theory, noise=2.0):
"""
Simulate data, allowing for noise to be a dR array for each Q point.
"""
Q, R = theory
dR = np.asarray(noise)
offset = 0
for p in self.probes:
n = len(p.Q)
if len(self.dR.shape) > 0:
noise = dR[offset : offset + n]
p.simulate_data(theory=(Q[offset : offset + n], R[offset : offset + n]), noise=noise)
offset += n
simulate_data.__doc__ = Probe.simulate_data.__doc__
@property
def Q(self):
return np.hstack([p.Q for p in self.probes])
@property
def calc_Q(self):
return np.unique(np.hstack([p.calc_Q for p in self.probes]))
@property
def dQ(self):
return np.hstack([p.dQ for p in self.probes])
@property
def unique_L(self):
return np.unique(np.hstack([p.unique_L for p in self.probes]))
[docs]
def oversample(self, **kw):
for p in self.probes:
p.oversample(**kw)
oversample.__doc__ = Probe.oversample.__doc__
[docs]
def scattering_factors(self, material, density):
# TODO: support wavelength dependent systems
return self.probes[0].scattering_factors(material, density)
# result = [p.scattering_factors(material, density) for p in self.probes]
# return [np.hstack(v) for v in zip(*result)]
scattering_factors.__doc__ = Probe.scattering_factors.__doc__
[docs]
def apply_beam(self, calc_Q, calc_R, interpolation=0, **kw):
result = [p.apply_beam(calc_Q, calc_R, **kw) for p in self.probes]
Q, R = [np.hstack(v) for v in zip(*result)]
return Q, R
[docs]
def fresnel(self, *args, **kw):
return self.probes[0].fresnel(*args, **kw)
fresnel.__doc__ = Probe.fresnel.__doc__
[docs]
def save(self, filename, theory, substrate=None, surface=None):
for i, (p, th) in enumerate(self.parts(theory=theory)):
p.save(filename + str(i + 1), th, substrate=substrate, surface=surface)
save.__doc__ = Probe.save.__doc__
[docs]
def plot(self, theory=None, **kw):
for p, th in self.parts(theory):
p.plot(theory=th, **kw)
plot.__doc__ = Probe.plot.__doc__
[docs]
def plot_resolution(self, **kw):
for p in self.probes:
p.plot_resolution(**kw)
plot_resolution.__doc__ = Probe.plot_resolution.__doc__
[docs]
def plot_linear(self, theory=None, **kw):
for p, th in self.parts(theory):
p.plot_linear(theory=th, **kw)
plot_linear.__doc__ = Probe.plot_linear.__doc__
[docs]
def plot_log(self, theory=None, **kw):
for p, th in self.parts(theory):
p.plot_log(theory=th, **kw)
plot_log.__doc__ = Probe.plot_log.__doc__
[docs]
def plot_fresnel(self, theory=None, **kw):
for p, th in self.parts(theory):
p.plot_fresnel(theory=th, **kw)
plot_fresnel.__doc__ = Probe.plot_fresnel.__doc__
[docs]
def plot_logfresnel(self, theory=None, **kw):
for p, th in self.parts(theory):
p.plot_logfresnel(theory=th, **kw)
plot_logfresnel.__doc__ = Probe.plot_logfresnel.__doc__
[docs]
def plot_Q4(self, theory=None, **kw):
for p, th in self.parts(theory):
p.plot_Q4(theory=th, **kw)
plot_Q4.__doc__ = Probe.plot_Q4.__doc__
[docs]
def plot_residuals(self, theory=None, **kw):
for p, th in self.parts(theory):
p.plot_residuals(theory=th, **kw)
plot_residuals.__doc__ = Probe.plot_residuals.__doc__
[docs]
def parts(self, theory):
if theory is None:
for p in self.probes:
yield p, None
else:
offset = 0
Q, R = theory
for p in self.probes:
n = len(p.Q)
yield p, (Q[offset : offset + n], R[offset : offset + n])
offset += n
[docs]
def shared_beam(self, intensity=1, background=0, back_absorption=1, theta_offset=0, sample_broadening=0):
"""
Share beam parameters across all segments.
New parameters are created for *intensity*, *background*,
*theta_offset*, *sample_broadening* and *back_absorption*
and assigned to the all segments. These can be replaced
with an explicit parameter in an individual segment if that
parameter is independent.
"""
intensity = Parameter.default(intensity, name="intensity")
background = Parameter.default(background, name="background", limits=[0, None])
back_absorption = Parameter.default(back_absorption, name="back_absorption", limits=[0, 1])
theta_offset = Parameter.default(theta_offset, name="theta_offset")
sample_broadening = Parameter.default(sample_broadening, name="sample_broadening", limits=[None, None])
for p in self.probes:
p.intensity = intensity
p.background = background
p.back_absorption = back_absorption
p.theta_offset = theta_offset
p.sample_broadening = sample_broadening
[docs]
def stitch(self, same_Q=0.001, same_dQ=0.001):
r"""
Stitch together multiple datasets into a single dataset.
Points within *tol* of each other and with the same resolution
are combined by interpolating them to a common $Q$ value then averaged
using Gaussian error propagation.
:Returns: probe | Probe
Combined data set.
:Algorithm:
To interpolate a set of points to a common value, first find the
common $Q$ value:
.. math::
\hat Q = \sum{Q_k} / n
Then for each dataset $k$, find the interval $[i, i+1]$ containing the
value $Q$, and use it to compute interpolated value for $R$:
.. math::
w &= (\hat Q - Q_i)/(Q_{i+1} - Q_i) \\
\hat R &= w R_{i+1} + (1-w) R_{i+1} \\
\hat \sigma_{R} &=
\sqrt{ w^2 \sigma^2_{R_i} + (1-w)^2 \sigma^2_{R_{i+1}} } / n
Average the resulting $R$ using Gaussian error propagation:
.. math::
\hat R &= \sum{\hat R_k}/n \\
\hat \sigma_R &= \sqrt{\sum \hat \sigma_{R_k}^2}/n
"""
Q, dQ, R, dR = stitch(self.probes)
Po = self.probes[0]
return QProbe(
Q,
dQ,
R=R,
dR=dR,
intensity=Po.intensity,
background=Po.background,
back_absorption=Po.back_absorption,
back_reflectivity=Po.back_reflectivity,
resolution=Po.resolution,
)
[docs]
@dataclass(init=False)
class QProbe(BaseProbe):
"""
A pure Q, R probe
This probe with no possibility of tricks such as looking up the
scattering length density based on wavelength, or adjusting for
alignment errors.
"""
Q: "NDArray"
dQ: "NDArray"
name: Optional[str]
filename: Optional[str]
intensity: Parameter
back_absorption: Parameter
background: Parameter
back_reflectivity: bool
R: "NDArray"
dR: "NDArray"
resolution: Literal["normal", "uniform"]
oversampling: Optional[int]
oversampling_seed: int
oversampled_regions: List[OversampledRegion]
polarized = False
def __init__(
self,
Q,
dQ,
R=None,
dR=None,
data=None, # deprecated
name=None,
filename=None,
intensity=1,
background=0,
back_absorption=1,
back_reflectivity=False,
resolution: Literal["normal", "uniform"] = "normal",
oversampling: Optional[int] = None,
oversampling_seed: int = 1,
oversampled_regions: Optional[List[OversampledRegion]] = None,
):
if not name and filename:
name = os.path.splitext(os.path.basename(filename))[0]
qualifier = " " + name if name is not None else ""
self.intensity = Parameter.default(intensity, name="intensity" + qualifier)
self.background = Parameter.default(background, name="background" + qualifier, limits=[0, None])
self.back_absorption = Parameter.default(back_absorption, name="back_absorption" + qualifier, limits=[0, 1])
self.back_reflectivity = back_reflectivity
if R is None and dR is None and data is not None:
# deprecated way of loading R and dR
# TODO: remove this
warnings.warn("data argument is deprecated, use R and dR instead")
R, dR = data
self.Q = numpy.asarray(Q)
self.dQ = numpy.asarray(dQ)
self.R = R
self.dR = dR
self.unique_L = None
self._calc_Q = np.unique(self.Q)
self.name = name
self.filename = filename
self.resolution = resolution
self.oversampling = oversampling
self.oversampling_seed = oversampling_seed
self.oversampled_regions = oversampled_regions if oversampled_regions is not None else []
self._apply_oversamplings()
@property
def calc_Q(self):
return self._calc_Q if not self.back_reflectivity else -self._calc_Q
def _QdQ_key(self):
"""
Return a tuple that changes if the calculated Q or dQ values of this
probe object change.
"""
return ()
[docs]
def parameters(self):
return {
"intensity": self.intensity,
"background": self.background,
"back_absorption": self.back_absorption,
}
[docs]
def scattering_factors(self, material, density):
raise NotImplementedError(
"need radiation type and wavelength in <%s> to compute sld for %s" % (self.filename, material)
)
scattering_factors.__doc__ = Probe.scattering_factors.__doc__
def _apply_oversamplings(self):
self._calc_Q = _get_oversampled_values(
self.Q, self.dQ, self.oversampling, self.oversampling_seed, self.oversampled_regions
)
[docs]
def oversample(self, n=20, seed=1):
self.oversampling = n
self.oversampling_seed = seed
self._apply_oversamplings()
oversample.__doc__ = Probe.oversample.__doc__
[docs]
def critical_edge(self, substrate=None, surface=None, n=51, delta=0.25, clear_existing=False):
"""
Create a new OversampledRegion around the critical edge.
If any existing OversampledRegion exists and clear_existing is True,
self.oversampled_regions will be cleared before adding the new region,
else an error will be thrown.
"""
if not clear_existing and self.oversampled_regions:
raise ValueError(
"Cannot add critical edge region when oversampled regions already exist. "
"Set clear_existing=True to clear existing regions."
)
Q_c = self.Q_c(substrate, surface)
region = OversampledRegion(Q_start=Q_c * (1 - delta), Q_end=Q_c * (1 + delta), n=n)
self.oversampled_regions = [region] # Clear existing regions if clear_existing is True
self._apply_oversamplings()
critical_edge.__doc__ = Probe.critical_edge.__doc__
[docs]
def Qmeasurement_union(xs):
"""
Determine the unique Q, dQ across all datasets.
"""
Qset = set()
for x in xs:
if x is not None:
Qset |= set(zip(x.Q, x.dQ))
Q, dQ = [np.array(v) for v in zip(*sorted(Qset))]
return Q, dQ
optional_xs = Union[NeutronProbe, Literal[None]]
[docs]
@dataclass(init=False)
class PolarizedNeutronProbe:
"""
Polarized neutron probe
*xs* (4 x NeutronProbe) is a sequence pp, pm, mp and mm.
*Aguide* (degrees) is the angle of the applied field relative
to the plane of the sample, with angle 270º in the plane of the sample.
*H* (tesla) is the magnitude of the applied field
"""
name: str
mm: optional_xs = None
mp: optional_xs = None
pm: optional_xs = None
pp: optional_xs = None
H: Parameter
Aguide: Parameter
oversampling: Optional[int] = None
oversampling_seed: int = 1
oversampled_regions: List[OversampledRegion] = field(default_factory=list)
view = None # Default to Probe.view when None
show_resolution = None # Default to Probe.show_resolution when None
substrate = surface = None
polarized = True
_xs_names = ["mm", "mp", "pm", "pp"]
def __init__(
self,
xs: Optional[Tuple] = None,
mm: optional_xs = None,
mp: optional_xs = None,
pm: optional_xs = None,
pp: optional_xs = None,
name=None,
Aguide=BASE_GUIDE_ANGLE,
H=0,
oversampling=None,
oversampling_seed=1,
oversampled_regions: Optional[List[OversampledRegion]] = None,
):
if any([mm, mp, pm, pp]):
if xs is not None:
warnings.warn("a cross-section is directly specified - xs argument will be ignored")
self.mm = mm
self.mp = mp
self.pm = pm
self.pp = pp
else:
for index, xs_name in enumerate(self._xs_names):
setattr(self, xs_name, xs[index])
self._union_cache_key = None
if name is None and self.mm is not None:
name = self.mm.name
self.name = name
# self.T, self.dT, self.L, self.dL, self.Q, self.dQ \
# = measurement_union(xs)
# self._set_calc(self.T, self.L)
self._check()
spec = " " + name if name else ""
self.H = Parameter.default(H, name="H" + spec)
self.Aguide = Parameter.default(Aguide, name="Aguide" + spec, limits=[-360, 360])
self.oversampling = oversampling
self.oversampling_seed = oversampling_seed
self.oversampled_regions = oversampled_regions if oversampled_regions is not None else []
self._calculate_union()
@property
def xs(self) -> List[Union[NeutronProbe, None]]:
return [getattr(self, xs_name) for xs_name in self._xs_names]
[docs]
def parameters(self):
xs_pars = [(xsi.parameters() if xsi else None) for xsi in self.xs]
output = dict(zip(self._xs_names, xs_pars))
output["Aguide"] = self.Aguide
output["H"] = self.H
return output
[docs]
def resynth_data(self):
for p in self.xs:
if p is not None:
p.resynth_data()
resynth_data.__doc__ = Probe.resynth_data.__doc__
[docs]
def restore_data(self):
for p in self.xs:
if p is not None:
p.restore_data()
restore_data.__doc__ = Probe.restore_data.__doc__
[docs]
def simulate_data(self, theory, noise=2.0):
if noise is None or np.isscalar(noise):
noise = [noise] * 4
for data_k, theory_k, noise_k in zip(self.xs, theory, noise):
if data_k is not None:
data_k.simulate_data(theory=theory_k, noise=noise_k)
simulate_data.__doc__ = Probe.simulate_data.__doc__
def _check(self):
back_refls = [f.back_reflectivity for f in self.xs if f is not None]
if all(back_refls) or not any(back_refls):
self.back_reflectivity = back_refls[0]
else:
raise ValueError("Cannot mix front and back reflectivity measurements")
[docs]
def shared_beam(self, intensity=1, background=0, back_absorption=1, theta_offset=0, sample_broadening=0):
"""
Share beam parameters across all four cross sections.
New parameters are created for *intensity*, *background*,
*theta_offset*, *sample_broadening* and *back_absorption*
and assigned to the all cross sections. These can be replaced
with an explicit parameter in an individual cross section if that
parameter is independent.
"""
intensity = Parameter.default(intensity, name="intensity")
background = Parameter.default(background, name="background", limits=[0, None])
back_absorption = Parameter.default(back_absorption, name="back_absorption", limits=[0, 1])
theta_offset = Parameter.default(theta_offset, name="theta_offset")
sample_broadening = Parameter.default(sample_broadening, name="sample_broadening", limits=[None, None])
for x in self.xs:
if x is not None:
x.intensity = intensity
x.background = background
x.back_absorption = back_absorption
x.theta_offset = theta_offset
x.sample_broadening = sample_broadening
[docs]
def oversample(self, n=20, seed=1):
self.oversampling = n
self.oversampling_seed = seed
oversample.__doc__ = Probe.oversample.__doc__
def _calculate_union(self):
QdQ_keys = [x._QdQ_key() for x in self.xs if x is not None]
union_cache_key = (QdQ_keys, self.oversampling, self.oversampling_seed, tuple(self.oversampled_regions))
if self._union_cache_key == union_cache_key:
# no change in offsets or oversampling: use cached values of measurement union
return
else:
Q_union, dQ_union = Qmeasurement_union(self.xs)
if self.oversampling is not None or self.oversampled_regions:
calc_Q = _get_oversampled_values(
Q_union, dQ_union, self.oversampling, self.oversampling_seed, self.oversampled_regions
)
else:
calc_Q = Q_union
self.Q = Q_union
self.dQ = dQ_union
# only record the unique values of Q for calculation
# (np.unique will sort the values)
self._calc_Q = np.unique(calc_Q)
self._union_cache_key = union_cache_key
@property
def calc_Q(self):
# print('calculating calc_Q...')
self._calculate_union()
return self._calc_Q if not self.back_reflectivity else -self._calc_Q
[docs]
def apply_beam(self, Q, R, resolution=True, interpolation=0):
"""
Apply factors such as beam intensity, background, backabsorption,
and footprint to the data.
"""
return [(xs.apply_beam(Q, Ri, resolution, interpolation) if xs else None) for xs, Ri in zip(self.xs, R)]
[docs]
def fresnel(self, *args, **kw):
return self.pp.fresnel(*args, **kw)
fresnel.__doc__ = Probe.fresnel.__doc__
[docs]
def scattering_factors(self, material, density):
# doc string is inherited from parent (see below)
# grab a non-null cross-section to get a representative wavelength
first_probe = next(xs for xs in self.xs if xs is not None)
return first_probe.scattering_factors(material, density)
scattering_factors.__doc__ = Probe.scattering_factors.__doc__
[docs]
def select_corresponding(self, theory):
"""
Select theory points corresponding to the measured data.
Since we have evaluated theory at every Q, it is safe to interpolate
measured Q into theory, since it will land on a node,
not in an interval.
"""
Qth, Rth = theory
return [
None if x_data is None else (x_data.Q, np.interp(x_data.Q, Qth, x_th)) for x_data, x_th in zip(self.xs, Rth)
]
[docs]
def save(self, filename, theory, substrate=None, surface=None):
for xsi, xsi_th, suffix in zip(self.xs, theory, ("A", "B", "C", "D")):
if xsi is not None:
xsi.save(filename + suffix, xsi_th, substrate=substrate, surface=surface)
save.__doc__ = Probe.save.__doc__
[docs]
def plot(self, view=None, **kwargs):
"""
Plot theory against data.
Need substrate/surface for Fresnel-normalized reflectivity
"""
view = view if view is not None else self.view
if view is None:
view = Probe.view # Default to Probe.view
if view == "linear":
self.plot_linear(**kwargs)
elif view == "log":
self.plot_log(**kwargs)
elif view == "fresnel":
self.plot_fresnel(**kwargs)
elif view == "logfresnel":
self.plot_logfresnel(**kwargs)
elif view == "q4":
self.plot_Q4(**kwargs)
elif view.startswith("resid"):
self.plot_residuals(**kwargs)
elif view == "SA":
self.plot_SA(**kwargs)
elif view == "resolution":
self.plot_resolution(**kwargs)
else:
raise TypeError("incorrect reflectivity view '%s'" % view)
[docs]
def plot_resolution(self, **kwargs):
self._xs_plot("plot_resolution", **kwargs)
[docs]
def plot_linear(self, **kwargs):
self._xs_plot("plot_linear", **kwargs)
[docs]
def plot_log(self, **kwargs):
self._xs_plot("plot_log", **kwargs)
[docs]
def plot_fresnel(self, **kwargs):
self._xs_plot("plot_fresnel", **kwargs)
[docs]
def plot_logfresnel(self, **kwargs):
self._xs_plot("plot_logfresnel", **kwargs)
[docs]
def plot_Q4(self, **kwargs):
self._xs_plot("plot_Q4", **kwargs)
[docs]
def plot_residuals(self, **kwargs):
self._xs_plot("plot_residuals", **kwargs)
[docs]
def plot_SA(self, theory=None, label=None, plot_shift=None, **kwargs):
import matplotlib.pyplot as plt
if self.pp is None or self.mm is None:
raise TypeError("cannot form spin asymmetry plot without ++ and --")
plot_shift = plot_shift if plot_shift is not None else Probe.plot_shift
trans = auto_shift(plot_shift)
pp, mm = self.pp, self.mm
c = coordinated_colors()
if hasattr(pp, "R") and hasattr(mm, "R") and pp.R is not None and mm.R is not None:
Q, SA, dSA = spin_asymmetry(pp.Q, pp.R, pp.dR, mm.Q, mm.R, mm.dR)
if dSA is not None:
res = self.show_resolution if self.show_resolution is not None else Probe.show_resolution
dQ = pp.dQ if res else None
plt.errorbar(
Q,
SA,
yerr=dSA,
xerr=dQ,
fmt=".",
capsize=0,
label=pp.label(prefix=label, gloss="data"),
transform=trans,
color=c["light"],
)
else:
plt.plot(Q, SA, ".", label=pp.label(prefix=label, gloss="data"), transform=trans, color=c["light"])
# Set limits based on max theoretical SA, which is in (-1.0, 1.0)
# If the error bars are bigger than that, you usually don't care.
ylim_low, ylim_high = plt.ylim()
plt.ylim(max(ylim_low, -2.5), min(ylim_high, 2.5))
if theory is not None:
mm, mp, pm, pp = theory
Q, SA, _ = spin_asymmetry(pp[0], pp[1], None, mm[0], mm[1], None)
plt.plot(Q, SA, label=self.pp.label(prefix=label, gloss="theory"), transform=trans, color=c["dark"])
plt.xlabel(r"Q (\AA^{-1})")
plt.ylabel(r"spin asymmetry $(R^{++} -\, R^{--}) / (R^{++} +\, R^{--})$")
plt.legend(numpoints=1)
def _xs_plot(self, plotter, theory=None, **kwargs):
# Plot available cross sections
if theory is None:
theory = (None, None, None, None)
for x_data, x_th, suffix in zip(self.xs, theory, ("$^{--}$", "$^{-+}$", "$^{+-}$", "$^{++}$")):
if x_data is not None:
fn = getattr(x_data, plotter)
fn(theory=x_th, suffix=suffix, **kwargs)
[docs]
def spin_asymmetry(Qp, Rp, dRp, Qm, Rm, dRm):
r"""
Compute spin asymmetry for R++, R--.
**Parameters:**
*Qp*, *Rp*, *dRp* : vector
Measured ++ cross section and uncertainty.
*Qm*, *Rm*, *dRm* : vector
Measured -- cross section and uncertainty.
If *dRp*, *dRm* are None then the returned uncertainty will also be None.
**Returns:**
*Q*, *SA*, *dSA* : vector
Computed spin asymmetry and uncertainty.
**Algorithm:**
Spin asymmetry, $S_A$, is:
.. math::
S_A = \frac{R_{++} - R_{--}}{R_{++} + R_{--}}
Uncertainty $\Delta S_A$ follows from propagation of error:
.. math::
\Delta S_A^2 = \frac{4(R_{++}^2\Delta R_{--}^2+R_{--}^2\Delta R_{++})}
{(R_{++} + R_{--})^4}
"""
Rm = np.interp(Qp, Qm, Rm)
v = (Rp - Rm) / (Rp + Rm)
if dRp is not None:
dRm = np.interp(Qp, Qm, dRm)
dvsq = 4 * ((Rp * dRm) ** 2 + (Rm * dRp) ** 2) / (Rp + Rm) ** 4
dvsq[dvsq < 0] = 0
return Qp, v, sqrt(dvsq)
else:
return Qp, v, None
def _interpolate_Q(Q, dQ, n):
"""
Helper function to interpolate between data points.
*n* is the number of points to show between existing points.
"""
if n > 0:
# Extend the Q-range by 1/2 interval on either side
Q = np.hstack((0.5 * (3.0 * Q[0] - Q[1]), Q, 0.5 * (3.0 * Q[-1] - Q[-2])))
dQ = np.hstack((0.5 * (3.0 * dQ[0] - dQ[1]), dQ, 0.5 * (3.0 * dQ[-1] - dQ[-2])))
index = np.arange(0, len(Q), dtype="d")
subindex = np.arange(0, (n + 1) * (len(Q) - 1) + 1, dtype="d") / (n + 1.0)
Q = np.interp(subindex, index, Q)
dQ = np.interp(subindex, index, dQ)
return Q, dQ
def _get_oversampled_values(V_in, dV_in, oversampling, oversampling_seed, oversampled_regions: List[OversampledRegion]):
V_parts = [V_in]
if oversampling is not None:
rng = numpy.random.RandomState(seed=oversampling_seed)
V_parts.append(_get_normal_oversampling_points(V_in, dV_in, oversampling, rng))
for region in oversampled_regions:
V_parts.append(np.linspace(region.Q_start, region.Q_end, region.n))
V = np.hstack(V_parts)
return np.unique(V) # remove duplicates and sort
def _get_normal_oversampling_points(V_in, dV_in, n, rng):
# TODO: allow extending the range of V_in to support resolution
# prepend values out to -3*dV_in and +3*dV_in
#
# append_points = np.arange(1.0, 4.0)
# prepend_points = -append_points[::-1]
# V = np.concatenate((prepend_points * dV_in[0] + V_in[0], V_in, append_points * dV_in[-1] + V_in[-1]))
# dV = np.concatenate((np.full(3, dV_in[0]), dV_in, np.full(3, dV_in[-1])))
extra = rng.normal(V_in[:, None], dV_in[:, None], size=(len(V_in), n - 1))
return extra.flatten()
[docs]
@dataclass(init=False)
class PolarizedQProbe(PolarizedNeutronProbe):
polarized = True
def __init__(
self,
xs: Optional[Tuple] = None,
mm: optional_xs = None,
mp: optional_xs = None,
pm: optional_xs = None,
pp: optional_xs = None,
name=None,
Aguide=BASE_GUIDE_ANGLE,
H=0,
oversampling: Optional[int] = None,
oversampling_seed: int = 1,
oversampled_regions: Optional[List[OversampledRegion]] = None,
):
if any([mm, mp, pm, pp]):
if xs is not None:
warnings.warn("a cross-section is directly specified - xs argument will be ignored")
self.mm = mm
self.mp = mp
self.pm = pm
self.pp = pp
else:
for index, xs_name in enumerate(self._xs_names):
setattr(self, xs_name, xs[index])
self._check()
self.name = name if name is not None else self.pp.name
self.unique_L = None
qualifier = " " + self.name if self.name is not None else ""
self.Aguide = Parameter.default(Aguide, name="Aguide" + qualifier, limits=[-360, 360])
self.H = Parameter.default(H, name="H" + qualifier)
self.oversampling = oversampling
self.oversampling_seed = oversampling_seed
self.oversampled_regions = oversampled_regions if oversampled_regions is not None else []
# this sets self.Q, self.dQ, self._calc_Q:
self._union_cache_key = None
self._calculate_union()
@property
def calc_Q(self):
return self._calc_Q if not self.back_reflectivity else -self._calc_Q
# Deprecated old long name
PolarizedNeutronQProbe = PolarizedQProbe