The code below (apologies for complexity) incorporates a random distribution on matrices defined using DensityDist. The matrices represent ways of transforming a set of n ordered objects into n singletons via a sequence of n-1 refinements. Hence the entries to matrices are positive integers and the sums of the rows are always n as the rows represent ordered partitions. The random method for the DensityDist construction is given by the function random_mx(). In the code below, this function is first tested using the stateless DensityDist.dist distribution, which gives correct results. However, when I run it from within a model context using the run_true_MCMC() function, the matrices generated are clearly incorrect (they have non-integer and negative entries, leaving aside the test values). Is this because the function random_mx() is not written in then?
import numpy as np
from pymc3 import *
from pymc3.distributions.multivariate import Dirichlet, Multinomial
from pymc3.distributions.dist_math import bound, logpow, factln
from pymc3.distributions.special import gammaln
import theano.tensor as tt
from theano import config
import warnings
import re
from scipy.special import binom
def random_mx():
"""Function to generate random matrix values."""
def replace_zero(s, j):
"""Replace jth 0 in string s with a 1. First character is given by j=0 etc."""
count = 0
for i in range(len(s)):
if s[i] == '0':
count += 1
if count == j + 1:
return (s[:i] + '1' + s[i + 1:])
def make_row(s):
"""Convert string of zeros (+) and ones (,) to matrix row, i.e. counting
partitions by size."""
c = re.split('1', s)
return [len(x) + 1 for x in c]
s = '0' * (n - 1)
result = list()
for i in range(n - 1):
num0 = n - 1 - i
j = np.random.choice(num0)
s = replace_zero(s, j)
row = make_row(s)
row = np.pad(row, (0, n - len(row)), mode='constant', constant_values=0)
result.append(row)
mx = np.array(result, dtype=np.int32)
return mx
def mx_logpdf():
return 0.0
def get_testval(n):
"""Get matrix testval."""
m1 = np.ones((n-1,n-1), dtype=np.int32)
m2 = np.tril(m1)
m3 = np.arange(1, n)
m4 = m3.reshape(-1,1)
m5 = np.flipud(m4)
return np.concatenate((m5, m2), axis=1)
def run_true_MCMC(sfs, prior, mut_rate, draws):
"""Define and run MCMC model for coalescent tree branch lengths."""
n = len(sfs) + 1
j_n = np.diag(1 / np.arange(2, n + 1))
sfs = np.array(sfs)
seg_sites = sum(sfs)
testval = get_testval(n)
with Model() as model:
def mx_logpdf(value=testval):
return 0.0
tmx = DensityDist('tmx', mx_logpdf, random=random_mx, shape=(n-1,n), testval=testval)
probs = Dirichlet('probs', prior, testval=prior)
tmx_print = tt.printing.Print('tmx')(tmx)
result = list()
for rowix in range(n - 1):
orp = tmx_print[rowix]
orp2 = tt.cast(orp, 'int32')
row = tt.bincount(orp2, minlength=n+1)
row = row[1:-1]
result.append(row)
mx = tt.stack(result, axis=0)
mx = tt.reshape(mx, (n-1,n-1))
mx = tt.transpose(mx)
jmx = tt.dot(mx, j_n)
conditional_probs = tt.dot(jmx, probs.T)
sfs_obs = Multinomial('sfs_obs', n=seg_sites, p=conditional_probs, observed=sfs)
with model:
step = Metropolis([probs, tmx])
trace = sample(5, step=step, tune=0, chains=1, progressbar=False)
print(summary(trace, varnames=['probs']))
return trace
sfs = [36, 4, 32, 0, 13, 0, 0]
n = len(sfs) + 1
#Generate a random matrix using DensityDist.dist. This gives a matrix of the desired form.
tmx = DensityDist.dist(mx_logpdf, random=random_mx)
print('Matrix generated by DensityDist.dist\n', tmx.random(), '\n')
#Now run the MCMC algorithm using DensityDist. The matrices now generated (excluding testval)
#do not have the correct form.
draws = 50000
mutation_rate = 5e-9
length = 1000
seq_mutation_rate = mutation_rate * length
prior = np.ones(n - 1) / (n - 1)
trace = run_true_MCMC(sfs, prior, seq_mutation_rate, draws)