Source code for refl1d.lib_numba.convolve
import numba
from math import erf, sqrt, exp
PI4 = 12.56637061435917295385
PI_180 = 0.01745329251994329576
LN256 = 5.54517744447956247533
SQRT2 = 1.41421356237309504880
SQRT2PI = 2.50662827463100050241
LOG_RESLIMIT = -6.90775527898213703123
root_12_over_2 = sqrt(3)
[docs]
@numba.njit('(f8[:], f8[:], f8[:], f8[:], f8[:])', cache=True, parallel=False)
def convolve_uniform(xi, yi, x, dx, y):
left_index = 0
N_xi = len(xi)
N_x = len(x)
for k in numba.prange(N_x):
x_k = x[k]
# Convert 1-sigma width to 1/2 width of the region
limit = dx[k] * root_12_over_2
# print(f"point {x_k} +/- {limit}")
# Find integration limits, bound by the range of the data
left, right = max(x_k - limit, xi[0]), min(x_k + limit, xi[-1])
if right < left:
# Convolution does not overlap data range.
y[k] = 0.
continue
# Find the starting point for the convolution by first scanning
# forward until we reach the next point greater than the limit
# (we might already be there if the next output point has wider
# resolution than the current point), then scanning backwards to
# get to the last point before the limit. Make sure we have at
# least one interval so that we don't have to check edge cases
# later.
while left_index < N_xi-2 and xi[left_index] < left:
left_index += 1
while left_index > 0 and xi[left_index] > left:
left_index -= 1
# Set the first interval.
total = 0.
right_index = left_index + 1
x1, y1 = xi[left_index], yi[left_index]
x2, y2 = xi[right_index], yi[right_index]
# Subtract the excess from left interval before the left edge.
# print(f" left {left} in {(x1, y1)}, {(x2, y2)}")
if x1 < left:
# Subtract the area of the rectangle from (x1, 0) to (left, y1)
# plus 1/2 the rectangle from (x1, y1) to (left, y'),
# where y' is y value where the line (x1, y1) to (x2, y2)
# intersects x=left. This can be computed as follows:
# offset = left - x1
# slope = (y2 - y1)/(x2 - x1)
# yleft = y1 + slope*offset
# area = offset * y1 + offset * (yleft-y1)/2
# It can be simplified to the following:
# area = offset * (y1 + slope*offset/2)
offset = left - x1
slope = (y2 - y1)/(x2 - x1)
area = offset * (y1 + 0.5*slope*offset)
total -= area
# print(f" left correction {area}")
# Do trapezoidal integration up to and including the end interval
while right_index < N_xi-1 and x2 < right:
# Add the current interval if it isn't empty
if x1 != x2:
area = 0.5*(y1 + y2)*(x2 - x1)
total += area
# print(f" adding {(x1,y1)}, {(x2, y2)} as {area}")
# Move to the next interval
right_index += 1
x1, y1, x2, y2 = x2, y2, xi[right_index], yi[right_index]
if x1 != x2:
area = 0.5*(y1 + y2)*(x2 - x1)
total += area
# print(f" adding final {(x1,y1)}, {(x2, y2)} as {area}")
# Subtract the excess from the right interval after the right edge.
# print(f" right {right} in {(x1, y1)}, {(x2, y2)}")
if x2 > right:
# Expression for area to subtract using rectangles is as follows:
# offset = x2 - right
# slope = (y2 - y1)/(x2 - x1)
# yright = y2 - slope*offset
# area = -(offset * yright + offset * (y2-yright)/2)
# It can be simplified to the following:
# area = -offset * (y2 - slope*offset/2)
offset = x2 - right
slope = (y2 - y1)/(x2 - x1)
area = offset * (y2 - 0.5*slope*offset)
total -= area
# print(f" right correction {area}")
# Normalize by interval length
if left < right:
# print(f" normalize by length {right} - {left}")
y[k] = total / (right - left)
elif x1 < x2:
# If dx = 0 using the value interpolated at x (with left=right=x).
# print(f" dirac delta at {left} = {right} in {(x1, y1)}, {(x2, y2)}")
offset = left - x1
slope = (y2 - y1)/(x2 - x1)
y[k] = y1 + slope*offset
else:
# At an empty interval in the theory function. Average the y.
# print(f" empty interval with {left} = {right} in {(x1, y1)}, {(x2, y2)}")
y[k] = 0.5*(y1 + y2)
@numba.njit('f8(f8[:], f8[:], i8, i8, f8, f8, f8)', cache=True, parallel=False, locals={
"z": numba.float64,
"Glo": numba.float64,
"erflo": numba.float64,
"erfmin": numba.float64,
"y": numba.float64,
"zhi": numba.float64,
"Ghi": numba.float64,
"erfhi": numba.float64,
"m": numba.float64,
"b": numba.float64,
})
def convolve_gaussian_point(xin, yin, k, n,
xo, limit, sigma):
two_sigma_sq = 2. * sigma * sigma
# double z, Glo, erflo, erfmin, y
z = xo - xin[k]
Glo = exp(-z*z/two_sigma_sq)
erfmin = erflo = erf(-z/(SQRT2*sigma))
y = 0.
# /* printf("%5.3f: (%5.3f,%11.5g)",xo,xin[k],yin[k]); */
while (k < n-1):
k += 1
if (xin[k] != xin[k-1]):
# /* No additional contribution from duplicate points. */
# /* Compute the next endpoint */
zhi = xo - xin[k]
Ghi = exp(-zhi*zhi/two_sigma_sq)
erfhi = erf(-zhi/(SQRT2*sigma))
m = (yin[k]-yin[k-1])/(xin[k]-xin[k-1])
b = yin[k] - m * xin[k]
# /* Add the integrals. */
y += 0.5*(m*xo+b)*(erfhi-erflo) - sigma/SQRT2PI*m*(Ghi-Glo)
# /* Debug computation failures. */
# if isnan(y) {
# print("NaN from %d: zhi=%g, Ghi=%g, erfhi=%g, m=%g, b=%g\n",
# % (k,zhi,Ghi,erfhi,m,b))
# }
# /* Save the endpoint for next trapezoid. */
Glo = Ghi
erflo = erfhi
# /* Check if we've calculated far enough */
if (xin[k] >= xo+limit):
break
# /* printf(" (%5.3f,%11.5g)",xin[k<n?k:n-1],yin[k<n?k:n-1]); */
# /* Normalize by the area of the truncated gaussian */
# /* At this point erflo = erfmax */
# /* printf ("---> %11.5g\n",2*y/(erflo-erfmin)); */
return 2 * y / (erflo - erfmin)
# has same performance when using guvectorize instead of njit:
# @numba.guvectorize("(i8, f8[:], f8[:], i8, f8[:], f8[:], f8[:])", '(),(m),(m),(),(n),(n)->(n)')
[docs]
@numba.njit("(f8[:], f8[:], f8[:], f8[:], f8[:])", cache=True, parallel=False, locals={
"sigma": numba.float64,
"xo": numba.float64,
"limit": numba.float64,
"k_in": numba.int64,
"k_out": numba.int64,
})
def convolve_gaussian(xin, yin, x, dx, y):
# size_t in,out;
Nin = len(xin)
Nout = len(x)
# /* FIXME fails if xin are not sorted; slow if x not sorted */
# assert(Nin>1)
# /* Scan through all x values to be calculated */
# /* Re: omp, each thread is going through the entire input array,
# * independently, computing the resolution from the neighbourhood
# * around its individual output points. The firstprivate(in)
# * clause sets each thread to keep its own copy of in, initialized
# * at in's initial value of zero. The "schedule(static,1)" clause
# * puts neighbouring points in separate threads, which is a benefit
# * since there will be less backtracking if resolution width increases
# * from point to point. Because the schedule is static, this does not
# * significantly increase the parallelization overhead. Because the
# * threads are operating on interleaved points, there should be fewer cache
# * misses than if each thread were given different stretches of x to
# * convolve.
# */
k_in = 0
for k_out in range(Nout):
# /* width of resolution window for x is w = 2 dx^2. */
sigma = dx[k_out]
xo = x[k_out]
limit = sqrt(-2.*sigma*sigma * LOG_RESLIMIT)
# // if (out%20==0)
# /* Line up the left edge of the convolution window */
# /* It is probably forward from the current position, */
# /* but if the next dx is a lot higher than the current */
# /* dx or if the x are not sorted, then it may be before */
# /* the current position. */
# /* FIXME verify that the convolution window is just right */
while (k_in < Nin-1 and xin[k_in] < xo-limit):
k_in += 1
while (k_in > 0 and xin[k_in] > xo-limit):
k_in -= 1
# /* Special handling to avoid 0/0 for w=0. */
if (sigma > 0.):
y[k_out] = convolve_gaussian_point(
xin, yin, k_in, Nin, xo, limit, sigma)
elif (k_in < Nin-1):
# /* Linear interpolation */
m = (yin[k_in+1]-yin[k_in])/(xin[k_in+1]-xin[k_in])
b = yin[k_in] - m*xin[k_in]
y[k_out] = m*xo + b
elif (k_in > 0):
# /* Linear extrapolation */
m = (yin[k_in]-yin[k_in-1])/(xin[k_in]-xin[k_in-1])
b = yin[k_in] - m*xin[k_in]
y[k_out] = m*xo + b
else:
# /* Can't happen because there is more than one point in xin. */
# assert(Nin>1)
pass