Here’s what I came up with:
import numpy as np
import pymc as pm
import pytensor.tensor as pt

def gp_1d2d(name, x, approx_args=None, reparameterize=True):
    """
    Construct a GP with a 1D input and a 2D (two-output) latent function.
    Arguments:
        name (str): Base name for variables.
        x (array): 1D input array of shape (n_x,).
        approx_args (dict, optional): Approximation arguments for HSGP; if None, use a Latent GP.
        reparameterize (bool, optional): If True, use the non-centered (whitened) parameterization of the GP. Defaults to True.
    Returns:
        tensor: The latent function evaluated at x, shifted and scaled, with shape (n_x, 2).
    """
    n_x = x.shape[0]
    # Stack the inputs twice, tagging each copy with an output index in the second column
    Xall = np.vstack([
        np.column_stack([x, np.zeros_like(x)]),  # output 0
        np.column_stack([x, np.ones_like(x)]),   # output 1
    ])
    lengthscale = pm.Weibull(f"{name}_lengthscale", alpha=2, beta=1)
    K_input = pm.gp.cov.ExpQuad(input_dim=2, ls=lengthscale)
    # NOTE: for my particular problem I know the correlation is positive;
    # for less constrained scenarios you'll likely want lower=-1
    r = pm.Uniform(f"{name}_r", lower=0, upper=1)
    coreg = pm.gp.cov.Coregion(
        input_dim=2,
        B=pt.stacklists([[1.0, r], [r, 1.0]]),
        active_dims=[1],
    )
    multi_kernel = K_input * coreg
    if approx_args is None:
        gp_obj = pm.gp.Latent(cov_func=multi_kernel)
    else:
        gp_obj = pm.gp.HSGP(
            m=[approx_args['M']],  # basis functions in the 1D input
            c=approx_args['c'],    # boundary multiplier
            cov_func=multi_kernel,
            drop_first=approx_args.get('drop_first', False),
        )
    f_all = gp_obj.prior(
        f"{name}_f",
        X=Xall,
        reparameterize=reparameterize,
    )
    # The flat prior is ordered (output 0 block, then output 1 block), so this
    # reshape/transpose yields an (n_x, 2) tensor with one column per output
    f_2d = f_all.reshape((2, n_x)).T
    return shift_and_scale_z_2d(name=name + "_f", z_2d=f_2d)
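As a quick sanity check on the stacking/reshape convention (plain numpy, outside the model; the values here are arbitrary):

import numpy as np

x_demo = np.array([0.1, 0.5, 0.9])  # three arbitrary input points
Xall_demo = np.vstack([
    np.column_stack([x_demo, np.zeros_like(x_demo)]),  # output 0 block
    np.column_stack([x_demo, np.ones_like(x_demo)]),   # output 1 block
])
print(Xall_demo[:, 1])  # [0. 0. 0. 1. 1. 1.] -- the output index is the second column

# A flat vector evaluated in that row order...
f_flat = np.array([10, 11, 12, 20, 21, 22])
# ...reshapes so column 0 holds output 0 and column 1 holds output 1:
print(f_flat.reshape((2, len(x_demo))).T)
# [[10 20]
#  [11 21]
#  [12 22]]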
The function called at the end is below. It reflects an apparently idiosyncratic preference on my part to do GPs on the unit scale without intercepts and shift/scale them afterwards, so you may want to adapt or drop that step:
def shift_and_scale_z_2d(name, z_2d):
    """
    Shift and scale a two-dimensional latent variable via amplitude and intercept transformations.
    Arguments:
        name (str): Base name for variables.
        z_2d (tensor): Input 2D latent variable of shape (n_x, 2).
    Returns:
        tensor: The shifted and scaled two-dimensional latent function.
    """
    # Get the number of observations from the first dimension of z_2d
    n_x = z_2d.shape[0]
    # Priors for the amplitude: a shared mean and a half-normal difference to induce
    # mild information sharing across the two output dimensions
    amp_mean = pm.Normal(name=name + "_amp_mean", mu=0, sigma=1.0)
    amp_diff = pm.HalfNormal(name=name + "_amp_diff", sigma=1.0)
    # One-dimensional amplitudes, kept positive via the exponential transformation
    amp_1d = pm.math.sqrt(pm.math.exp(pm.math.stack([amp_mean - amp_diff, amp_mean + amp_diff], axis=0)))
    # Reshape the amplitude to a (1, 2) tensor for broadcasting across observations
    amp_2d = amp_1d.reshape((1, 2))
    # Repeat the amplitude for each observation
    amp_2d_repeated = pt.repeat(amp_2d, n_x, axis=0)
    # Priors for the intercept, using the same mean plus half-normal-difference parameterization
    intercept_mean = pm.Normal(name=name + "_intercept_mean", mu=0, sigma=1.0)
    intercept_diff = pm.HalfNormal(name=name + "_intercept_diff", sigma=1.0)
    # One-dimensional intercept values for both output dimensions
    intercept_1d = pm.math.stack([intercept_mean - intercept_diff, intercept_mean + intercept_diff], axis=0)
    # Reshape and repeat the intercept for all observations
    intercept_2d = intercept_1d.reshape((1, 2))
    intercept_2d_repeated = pt.repeat(intercept_2d, n_x, axis=0)
    # Apply the amplitude scaling and intercept shifting to the latent variable
    f_scaled_and_shifted = (
        z_2d
        * amp_2d_repeated
        + intercept_2d_repeated
    )
    # Return the shifted and scaled latent function
    return f_scaled_and_shifted
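And in case it helps, a minimal sketch of how I'd drop these into a model; the data (x_obs, y_obs), the noise prior, and the sampler settings are all placeholders you'd replace with your own:

import numpy as np
import pymc as pm

# Hypothetical data: n_x observations of a 2-output process at 1D inputs
x_obs = np.linspace(0, 1, 50)              # placeholder inputs, shape (n_x,)
y_obs = np.random.normal(size=(50, 2))     # placeholder outputs, shape (n_x, 2)

with pm.Model():
    f = gp_1d2d(name="gp", x=x_obs, approx_args=None)   # approx_args=None -> Latent GP
    noise = pm.HalfNormal("noise", sigma=1.0)            # placeholder observation-noise prior
    pm.Normal("obs", mu=f, sigma=noise, observed=y_obs)
    idata = pm.sample()                                   # adjust draws/tune/chains as needed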