Thompson sampling example

Hi, I was looking for an example of Thompson sampling in PYMC but couldn’t find one. So I put a simple example together. I am very new Thompson sampling and was hoping I could get another pair of eyeballs on my example. I think it would be nice to have an example of this in PYMC.

Here is my implementation of a simple two-arm bandits using conjugate priors beta-binomial. Please share any suggestions to improve the example.

``````import arviz as az
import numpy as np
import pymc as pm
import pytensor.tensor as pt
print(f"Running on PyMC v{pm.__version__}")

class ThompsonAgent:
def __init__(self, true_beta0: float, true_beta1: float, beta0_posterior: float = 0.0, beta1_posterior: float = 0.0):
self.trials = dict(
arm1 = dict(
success = [],
failure = [],
trials = []
),
arm2 = dict(
success = [],
failure = [],
trials = []
)
)
self.true_beta0 = true_beta0
self.true_beta1 = true_beta1
self.beta0_posterior = beta0_posterior
self.beta1_posterior = beta1_posterior

def pull_arms(self, num_pulls: int, beta0: float, beta1: float) -> None:
# arm pull
if beta0 > beta1:
arm1 = pm.Binomial.dist(n = num_pulls, p = beta0).eval()
arm2 = num_pulls - arm1
elif beta0 < beta1:
arm2 = pm.Binomial.dist(n = num_pulls, p = beta1).eval()
arm1 = num_pulls - arm2
else:
arm1 = np.array([int(num_pulls / 2)]).reshape(-1, 1)
arm2 = num_pulls - arm1

# observed rewards
Y_0 = pm.Binomial.dist(n = arm1, p = self.true_beta0).eval()
Y_1 = pm.Binomial.dist(n = arm2, p = self.true_beta1).eval()
self.trials['arm1']['success'].append(Y_0)
self.trials['arm1']['failure'].append(arm1 - Y_0)
self.trials['arm1']['trials'].append(arm1)

self.trials['arm2']['success'].append(Y_1)
self.trials['arm2']['failure'].append(arm2 - Y_1)
self.trials['arm2']['trials'].append(arm2)

def init_model(self, num_pulls: int) -> None:
self.pull_arms(num_pulls = num_pulls, beta0 = self.beta0_posterior, beta1 = self.beta1_posterior)
with pm.Model() as basic_model:
# Data Containers
beta0_success = pm.MutableData(name = 'beta0_success', value=np.sum(self.trials['arm1']['success']))
beta1_success = pm.MutableData(name = 'beta1_success', value=np.sum(self.trials['arm2']['success']))

beta0_failure = pm.MutableData(name = 'beta0_failure', value=np.sum(self.trials['arm1']['failure']))
beta1_failure = pm.MutableData(name = 'beta1_failure', value=np.sum(self.trials['arm2']['failure']))

Y0_trials = pm.MutableData(name = 'Y0_trials', value=np.sum(self.trials['arm1']['trials']))
Y1_trials = pm.MutableData(name = 'Y1_trials', value=np.sum(self.trials['arm2']['trials']))

# Priors for unknown model parameters
beta0 = pm.Beta("beta0", alpha = 1 + beta0_success, beta = 1 + beta0_failure)
beta1 = pm.Beta("beta1", alpha = 1 + beta1_success, beta = 1 + beta1_failure)

# Likelihood
Y0_obs = pm.Binomial("Y0_obs", n = Y0_trials, p = beta0, observed=beta0_success)
Y1_obs = pm.Binomial("Y1_obs", n = Y1_trials, p = beta1, observed=beta1_success)

# draw a posterior sample from each arm
trace = pm.sample(draws=1, chains=1, progressbar = False)
# Get arm samples
self.beta0_posterior = trace['posterior']['beta0'].values
self.beta1_posterior = trace['posterior']['beta1'].values

def update_priors(self, num_trials: int, num_pulls: int) -> None:
for _ in range(num_trials):
self.pull_arms(num_pulls = num_pulls, beta0 = beta0_posterior, beta1 = beta1_posterior)
with basic_model:
# Update data with posterior
beta0_success = pm.set_data({'beta0_success': np.sum(trials['arm1']['success'])})
beta1_success = pm.set_data({'beta1_success': np.sum(trials['arm2']['success'])})

beta0_failure = pm.set_data({'beta0_failure': np.sum(trials['arm1']['failure'])})
beta1_failure = pm.set_data({'beta1_failure': np.sum(trials['arm2']['failure'])})

Y0_trials = pm.set_data({'Y0_trials': np.sum(trials['arm1']['trials'])})
Y1_trials = pm.set_data({'Y1_trials': np.sum(trials['arm2']['trials'])})
trace = pm.sample(draws=1, chains=1, progressbar = False)
self.beta0_posterior = trace['posterior']['beta0'].values
self.beta1_posterior = trace['posterior']['beta1'].values
``````
``````agent = ThompsonAgent(true_beta0 = 0.4, true_beta1 = 0.7)
agent.init_model(num_pulls = 100)
``````
``````agent.update_priors(num_trials = 10, num_pulls = 100)
``````
``````agent.beta0_posterior
``````
``````agent.beta1_posterior
``````

This is a really cool problem! I love RL, and it would be great to see some work on agents in PyMC.

I think one place to start would be to divide your agent and your environment. The best practice is to create a `gym` environment, so I started there. I found this repo that had a k-armed bandit environment a la Sutton and Barto chapter 2, which I adapted to the win/lose setup:

``````import pymc as pm
import numpy as np
import gym

import pytensor
import pytensor.tensor as pt

from scipy.special import expit

from typing import Optional
import matplotlib.pyplot as plt

class MultiarmedBanditsEnv(gym.Env):
"""Environment for multiarmed bandits"""

def __init__(self, render_mode: Optional[str]=None, nr_arms=10):
self.action_space = gym.spaces.Discrete(nr_arms)
self.observation_space = gym.spaces.Discrete(1)
self.state = 0

def step(self, action : int):
assert self.action_space.contains(
action
), f"{action!r} ({type(action)}) invalid"

reward = (self.np_random.uniform(0, 1) < self.p[action]) * 1.0

return self.state, reward.item(), False, False, {'optimal' : self.optimal}

def reset(self, *, seed: Optional[int] = None, options: Optional[dict] = None):
super().reset(seed=seed)

self.p = expit(self.np_random.standard_normal(self.action_space.n) - 1)
self.optimal = np.argmax(self.p)

return self.state, {'optimal' : self.optimal}

def render(self, mode='human', close=False):
return "You are playing a %d-armed bandit" % self.action_space.n
``````

With that in hand, let’s look at the agent. `pm.sample` is actually used for MCMC sampling, which you’re not doing here. There is a different function, `pm.draw`, which can be used to get sample from an arbitrary PyMC distribution. You will need that to choose actions, but you don’t need it for updating parameters in this case, since you’re using a conjugate model. Instead, we just need to update the parameters according to the posterior formula.

For tracking the parameter values, I’m going to use `pytensor.shared` variables instead of dictionaries. These are special pytensor variables that, unlike symbolic tensors, can be be updated in real time. So the workflow will be:

1. Sample action probabilities from beta distributions using the prior parameters
2. Choose the best action by taking the argmax over those probabilities
3. Perform the action, receive a reward signal from the environment
4. Update the parameters using the conjugate formula based on the action and reward

Here is my agent class:

``````class ConjugateAgent:

def __init__(self, n_actions, initial_alphas=None, initial_betas=None):
self.n_actions = n_actions

if initial_alphas is None:
initial_alphas = np.ones(n_actions)
elif isinstance(initial_alphas, (int, float)):
initial_alphas = np.ones(n_actions) * initial_alphas
if initial_betas is None:
initial_betas = np.ones(n_actions)
elif isinstance(initial_betas, (int, float)):
initial_betas = np.ones(n_actions) * initial_betas

self.alphas = pytensor.shared(initial_alphas, name='alphas')
self.betas = pytensor.shared(initial_betas, name='betas')
self.n_pulls = pytensor.shared(np.zeros(n_actions), name='n_pulls')

# This machinery is to make the agent sample actions faster than using pm.draw
rng = pytensor.shared(np.random.default_rng())
n_draws = pt.iscalar('n_draws')

new_rng, action_distribution = pm.Beta.dist(alpha=self.alphas,
beta=self.betas,
shape=(n_draws, n_actions),
rng=rng).owner.outputs

def choose_action(self, n_pulls=1):
action_probs = self.f_action(n_pulls)
return np.argmax(action_probs, axis=-1).squeeze()

def learn(self, action, reward):
current_alpha = self.alphas.get_value()
current_beta = self.betas.get_value()
current_n_pulls = self.n_pulls.get_value()

current_n_pulls[action] += 1
current_alpha[action] += reward
current_beta[action] += 1 - reward

self.alphas.set_value(current_alpha)
self.betas.set_value(current_beta)
self.n_pulls.set_value(current_n_pulls)
``````

Sutton and Barto use games of 1000 timesteps to evaluate their agents, so we can do the same:

``````env = MultiarmedBanditsEnv()
agent = ConjugateAgent(env.action_space.n, initial_alphas=1, initial_betas=1)

state, info = env.reset()
game_length = 1000
reward_history = np.zeros(game_length)
for t in range(game_length):
action = agent.choose_action()
next_state, reward, done, terminal, info = env.step(action)
agent.learn(action, reward)

reward_history[t] = reward
``````

Learning plot:

And the posterior policy:

You can see the agent learned to (pretty much) always play the optimal action (action 3), though it also played quite a bit of action 5, which is nearly as good.

A cool extension of this would be to dump the conjugate stuff and actually do MCMC inside the `learn` method. You could follow this (slightly out of date) example to see how to use a sampled posterior as a prior.

Also, here’s the posterior probability of reward for each action:

``````import xarray as xr
post = xr.DataArray(agent.f_action(100_000),
coords={'draw':np.arange(100_000), 'action':np.arange(10)},
dims=['draw', 'action'])
az.plot_posterior(post, grid=(2, 5), ref_val=env.p.tolist());
``````

2 Likes

WOW!!! This really is incredible. I really appreciate the detailed explanations as it helps me learn a lot more. It would be really nice if you submit your work to the PYMC examples. I am sure others will find this very helpful.

Would you mind explaining what the pm.Beta.dist().owner.outputs is doing?

Thank you for taking the time to respond to my post.

It’s accessing hidden variables needed for `updates` so that the Beta draws change everytime the function is evaluated. This isn’t needed if you use `pymc.pytensorf.compile_pymc` instead of `pytensor.function`, as the pymc helper takes care of that automatically.

1 Like

I also could have just directly used a `RandomStream`, as in here, yes?

Looking back, I regret putting that bit in the code I presented, because it’s a bit too low-level. Here’s a more obvious implementation, although it’s significantly slower:

``````class ConjugateAgent:

def __init__(self, n_actions, initial_alphas=None, initial_betas=None):
self.n_actions = n_actions

if initial_alphas is None:
initial_alphas = np.ones(n_actions)
elif isinstance(initial_alphas, (int, float)):
initial_alphas = np.ones(n_actions) * initial_alphas
if initial_betas is None:
initial_betas = np.ones(n_actions)
elif isinstance(initial_betas, (int, float)):
initial_betas = np.ones(n_actions) * initial_betas

self.alphas = pytensor.shared(initial_alphas, name='alphas')
self.betas = pytensor.shared(initial_betas, name='betas')
self.n_pulls = pytensor.shared(np.zeros(n_actions), name='n_pulls')

self.action_distribution = pm.Beta.dist(alpha=self.alpha, beta=self.betas)

def choose_action(self, n_pulls=1):
action_probs = pm.draw(self.action_distribution, n_pulls)
return np.argmax(action_probs, axis=-1).squeeze()
``````

The `learn` method is unchanged.

That compiles a function everytime an action is taken (pm.draw). For repeated draws it’s better to do it like you first did.

You can use RandomStreams yes, but I prefer `compile_pymc`. Although a better name wouldn’t hurt.

Thank you for the clarification! I have never seen that before. I will read through the pytensor docs to learn more.

1 Like

Thank you for your help. I am glad that you did it the other way first as it gives me the opportunity to learn a more efficient way of coding it.

2 Likes