Thompson sampling example

Hi, I was looking for an example of Thompson sampling in PyMC but couldn’t find one, so I put a simple example together. I am very new to Thompson sampling and was hoping I could get another pair of eyeballs on my example. I think it would be nice to have an example of this in PyMC.

Here is my implementation of a simple two-armed bandit using a conjugate Beta-Binomial model. Please share any suggestions to improve the example.

import arviz as az
import numpy as np
import pymc as pm
import pytensor.tensor as pt
print(f"Running on PyMC v{pm.__version__}")


class ThompsonAgent:
    def __init__(self, true_beta0: float, true_beta1: float, beta0_posterior: float = 0.0, beta1_posterior: float = 0.0):
        self.trials = dict(
                            arm1 = dict(
                                success = [],
                                failure = [],
                                trials = []
                            ),
                            arm2 = dict(
                                success = [],
                                failure = [],
                                trials = []
                            )
                        )
        self.true_beta0 = true_beta0
        self.true_beta1 = true_beta1
        self.beta0_posterior = beta0_posterior
        self.beta1_posterior = beta1_posterior
    
    def pull_arms(self, num_pulls: int, beta0: float, beta1: float) -> None:
        # arm pull
        if beta0 > beta1:
            arm1 = pm.Binomial.dist(n = num_pulls, p = beta0).eval()
            arm2 = num_pulls - arm1
        elif beta0 < beta1:
            arm2 = pm.Binomial.dist(n = num_pulls, p = beta1).eval()
            arm1 = num_pulls - arm2
        else:
            arm1 = np.array([int(num_pulls / 2)]).reshape(-1, 1)
            arm2 = num_pulls - arm1

        # observed rewards
        Y_0 = pm.Binomial.dist(n = arm1, p = self.true_beta0).eval()
        Y_1 = pm.Binomial.dist(n = arm2, p = self.true_beta1).eval()
        self.trials['arm1']['success'].append(Y_0)
        self.trials['arm1']['failure'].append(arm1 - Y_0)
        self.trials['arm1']['trials'].append(arm1)

        self.trials['arm2']['success'].append(Y_1)
        self.trials['arm2']['failure'].append(arm2 - Y_1)
        self.trials['arm2']['trials'].append(arm2)

    def init_model(self, num_pulls: int) -> None:
        self.pull_arms(num_pulls = num_pulls, beta0 = self.beta0_posterior, beta1 = self.beta1_posterior)
        with pm.Model() as basic_model:
            # Data Containers
            beta0_success = pm.MutableData(name = 'beta0_success', value=np.sum(self.trials['arm1']['success']))
            beta1_success = pm.MutableData(name = 'beta1_success', value=np.sum(self.trials['arm2']['success']))

            beta0_failure = pm.MutableData(name = 'beta0_failure', value=np.sum(self.trials['arm1']['failure']))
            beta1_failure = pm.MutableData(name = 'beta1_failure', value=np.sum(self.trials['arm2']['failure']))

            Y0_trials = pm.MutableData(name = 'Y0_trials', value=np.sum(self.trials['arm1']['trials']))
            Y1_trials = pm.MutableData(name = 'Y1_trials', value=np.sum(self.trials['arm2']['trials']))

            # Priors for unknown model parameters
            beta0 = pm.Beta("beta0", alpha = 1 + beta0_success, beta = 1 + beta0_failure)
            beta1 = pm.Beta("beta1", alpha = 1 + beta1_success, beta = 1 + beta1_failure)

            # Likelihood 
            Y0_obs = pm.Binomial("Y0_obs", n = Y0_trials, p = beta0, observed=beta0_success)
            Y1_obs = pm.Binomial("Y1_obs", n = Y1_trials, p = beta1, observed=beta1_success)

            # draw a posterior sample from each arm
            trace = pm.sample(draws=1, chains=1, progressbar = False)
        # Keep a reference to the model so update_priors can reuse it
        self.basic_model = basic_model
        # Get arm samples
        self.beta0_posterior = trace['posterior']['beta0'].values
        self.beta1_posterior = trace['posterior']['beta1'].values

    def update_priors(self, num_trials: int, num_pulls: int) -> None:
        for _ in range(num_trials):
            self.pull_arms(num_pulls = num_pulls, beta0 = self.beta0_posterior, beta1 = self.beta1_posterior)
            with self.basic_model:
                # Update the data containers with the accumulated trial results
                pm.set_data({
                    'beta0_success': np.sum(self.trials['arm1']['success']),
                    'beta1_success': np.sum(self.trials['arm2']['success']),
                    'beta0_failure': np.sum(self.trials['arm1']['failure']),
                    'beta1_failure': np.sum(self.trials['arm2']['failure']),
                    'Y0_trials': np.sum(self.trials['arm1']['trials']),
                    'Y1_trials': np.sum(self.trials['arm2']['trials']),
                })
                trace = pm.sample(draws=1, chains=1, progressbar = False)
            self.beta0_posterior = trace['posterior']['beta0'].values
            self.beta1_posterior = trace['posterior']['beta1'].values
agent = ThompsonAgent(true_beta0 = 0.4, true_beta1 = 0.7)
agent.init_model(num_pulls = 100)
agent.update_priors(num_trials = 10, num_pulls = 100)
agent.beta0_posterior
agent.beta1_posterior

This is a really cool problem! I love RL, and it would be great to see some work on agents in PyMC.

I think one place to start would be to divide your agent and your environment. The best practice is to create a gym environment, so I started there. I found this repo that had a k-armed bandit environment à la Sutton and Barto chapter 2, which I adapted to the win/lose setup:

import pymc as pm
import numpy as np
import gym

import pytensor
import pytensor.tensor as pt

from scipy.special import expit

from typing import Optional
import matplotlib.pyplot as plt

class MultiarmedBanditsEnv(gym.Env):
    """Environment for multiarmed bandits"""
    metadata = {"render_modes": ["ansi"]}

    def __init__(self, render_mode: Optional[str]=None, nr_arms=10):
        self.action_space = gym.spaces.Discrete(nr_arms)
        self.observation_space = gym.spaces.Discrete(1)
        self.state = 0

    def step(self, action : int):
        assert self.action_space.contains(
            action
        ), f"{action!r} ({type(action)}) invalid"

        # Bernoulli reward: win with probability p[action]
        reward = (self.np_random.uniform(0, 1) < self.p[action]) * 1.0

        # (observation, reward, terminated, truncated, info)
        return self.state, reward.item(), False, False, {'optimal' : self.optimal}

    def reset(self, *, seed: Optional[int] = None, options: Optional[dict] = None):
        super().reset(seed=seed)

        # Draw each arm's win probability; squashing shifted standard normals
        # through a sigmoid keeps them in (0, 1)
        self.p = expit(self.np_random.standard_normal(self.action_space.n) - 1)
        self.optimal = np.argmax(self.p)

        return self.state, {'optimal' : self.optimal}

    def render(self, mode='human', close=False):
        return "You are playing a %d-armed bandit" % self.action_space.n

With that in hand, let’s look at the agent. pm.sample is actually used for MCMC sampling, which you’re not doing here. There is a different function, pm.draw, which can be used to draw samples from an arbitrary PyMC distribution. You will need that to choose actions, but you don’t need it for updating parameters in this case, since you’re using a conjugate model. Instead, we just need to update the parameters according to the conjugate posterior formula.
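
For one arm, the whole mechanism boils down to a single pm.draw call plus the closed-form Beta update. Here is a minimal sketch with made-up numbers, separate from the agent class below:

import pymc as pm

alpha, beta = 1.0, 1.0                                  # Beta(1, 1) prior on the arm's win probability
theta = pm.draw(pm.Beta.dist(alpha=alpha, beta=beta))   # one Thompson draw for this arm

reward = 1.0          # pretend the pulled arm paid out
alpha += reward       # conjugate update: a win adds to alpha
beta += 1 - reward    # a loss adds to beta

In the real agent the draw is done for every arm at once and the arm with the largest draw gets pulled.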

For tracking the parameter values, I’m going to use pytensor.shared variables instead of dictionaries. These are special pytensor variables that, unlike symbolic tensors, can be updated in real time (see the small shared-variable sketch after the list). So the workflow will be:

  1. Sample action probabilities from beta distributions using the prior parameters
  2. Choose the best action by taking the argmax over those probabilities
  3. Perform the action, receive a reward signal from the environment
  4. Update the parameters using the conjugate formula based on the action and reward
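
And here is the shared-variable sketch mentioned above: a pytensor.shared variable holds a plain numpy array that you can read and overwrite between calls to a compiled function (the arm index and reward are made up):

import numpy as np
import pytensor

alphas = pytensor.shared(np.ones(10), name='alphas')

current = alphas.get_value()   # read the current numpy value
current[3] += 1.0              # e.g. a reward of 1 observed for arm 3
alphas.set_value(current)      # write it back; compiled graphs see the new value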

Here is my agent class:

class ConjugateAgent:

    def __init__(self, n_actions, initial_alphas=None, initial_betas=None):
        self.n_actions = n_actions

        if initial_alphas is None:
            initial_alphas = np.ones(n_actions)
        elif isinstance(initial_alphas, (int, float)):
            initial_alphas = np.ones(n_actions) * initial_alphas
        if initial_betas is None:
            initial_betas = np.ones(n_actions)
        elif isinstance(initial_betas, (int, float)):
            initial_betas = np.ones(n_actions) * initial_betas
        
        self.alphas = pytensor.shared(initial_alphas, name='alphas')
        self.betas = pytensor.shared(initial_betas, name='betas')
        self.n_pulls = pytensor.shared(np.zeros(n_actions), name='n_pulls')
        
        # This machinery is to make the agent sample actions faster than using pm.draw
        rng = pytensor.shared(np.random.default_rng())
        n_draws = pt.iscalar('n_draws')

        new_rng, action_distribution = pm.Beta.dist(alpha=self.alphas, 
                                                    beta=self.betas,
                                                    shape=(n_draws, n_actions),
                                                    rng=rng).owner.outputs
        self.f_action = pytensor.function([n_draws], action_distribution, updates={rng:new_rng})

    def choose_action(self, n_pulls=1):
        # Thompson sampling: draw one success probability per arm, play the argmax
        action_probs = self.f_action(n_pulls)
        return np.argmax(action_probs, axis=-1).squeeze()

    def learn(self, action, reward):
        # Conjugate Beta-Bernoulli update: a win adds to alpha, a loss to beta
        current_alpha = self.alphas.get_value()
        current_beta = self.betas.get_value()
        current_n_pulls = self.n_pulls.get_value()

        current_n_pulls[action] += 1
        current_alpha[action] += reward
        current_beta[action] += 1 - reward
        
        self.alphas.set_value(current_alpha)
        self.betas.set_value(current_beta)
        self.n_pulls.set_value(current_n_pulls)

Sutton and Barto use games of 1000 timesteps to evaluate their agents, so we can do the same:

env = MultiarmedBanditsEnv()
agent = ConjugateAgent(env.action_space.n, initial_alphas=1, initial_betas=1)

state, info = env.reset()
game_length = 1000
reward_history = np.zeros(game_length)
for t in range(game_length):
    action = agent.choose_action()
    next_state, reward, done, terminal, info = env.step(action)
    agent.learn(action, reward)
    
    reward_history[t] = reward

Learning plot:

[learning plot]

And the posterior policy:

[posterior policy plot]

You can see the agent learned to (pretty much) always play the optimal action (action 3), though it also played quite a bit of action 5, which is nearly as good.

A cool extension of this would be to dump the conjugate stuff and actually do MCMC inside the learn method. You could follow this (slightly out of date) example to see how to use a sampled posterior as a prior.
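
To sketch the shape of that (this is my own simplification, not the linked example: it just moment-matches a Beta to the MCMC posterior so it can be fed back in as the next prior):

import pymc as pm

def mcmc_learn(rewards_for_arm, prior_alpha, prior_beta):
    # rewards_for_arm: array of 0/1 outcomes observed so far for one arm
    with pm.Model():
        p = pm.Beta("p", alpha=prior_alpha, beta=prior_beta)
        pm.Bernoulli("obs", p=p, observed=rewards_for_arm)
        idata = pm.sample(progressbar=False)

    samples = idata.posterior["p"].values.ravel()
    # Moment-match a Beta distribution to the posterior draws
    m, v = samples.mean(), samples.var()
    nu = m * (1 - m) / v - 1
    return m * nu, (1 - m) * nu   # new (alpha, beta) to use as the next prior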

Also, here’s the posterior probability of reward for each action:

import arviz as az
import xarray as xr

post = xr.DataArray(agent.f_action(100_000),
                    coords={'draw': np.arange(100_000), 'action': np.arange(10)},
                    dims=['draw', 'action'])
az.plot_posterior(post, grid=(2, 5), ref_val=env.p.tolist());


WOW!!! This really is incredible. I really appreciate the detailed explanations, as they help me learn a lot more. It would be really nice if you submitted your work to the PyMC examples; I am sure others will find this very helpful.

Would you mind explaining what pm.Beta.dist().owner.outputs is doing?

Thank you for taking the time to respond to my post.

It’s accessing the hidden variables needed for the RNG updates, so that the Beta draws change every time the function is evaluated. This isn’t needed if you use pymc.pytensorf.compile_pymc instead of pytensor.function, as the PyMC helper takes care of that automatically.
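
For example, the sampling machinery in the first agent could be written roughly like this with compile_pymc (a sketch with placeholder shared variables standing in for the agent’s attributes):

import numpy as np
import pymc as pm
import pytensor
import pytensor.tensor as pt
from pymc.pytensorf import compile_pymc

n_actions = 10
alphas = pytensor.shared(np.ones(n_actions), name='alphas')
betas = pytensor.shared(np.ones(n_actions), name='betas')

n_draws = pt.iscalar('n_draws')
action_distribution = pm.Beta.dist(alpha=alphas, beta=betas, shape=(n_draws, n_actions))

# compile_pymc adds the RNG update itself, so each call returns fresh draws
f_action = compile_pymc([n_draws], action_distribution)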


I also could have just used a RandomStream directly, as in here, yes?

Looking back, I regret putting that bit in the code I presented, because it’s a bit too low-level. Here’s a more obvious implementation, although it’s significantly slower:

class ConjugateAgent:

    def __init__(self, n_actions, initial_alphas=None, initial_betas=None):
        self.n_actions = n_actions

        if initial_alphas is None:
            initial_alphas = np.ones(n_actions)
        elif isinstance(initial_alphas, (int, float)):
            initial_alphas = np.ones(n_actions) * initial_alphas
        if initial_betas is None:
            initial_betas = np.ones(n_actions)
        elif isinstance(initial_betas, (int, float)):
            initial_betas = np.ones(n_actions) * initial_betas
        
        self.alphas = pytensor.shared(initial_alphas, name='alphas')
        self.betas = pytensor.shared(initial_betas, name='betas')
        self.n_pulls = pytensor.shared(np.zeros(n_actions), name='n_pulls')
        
        self.action_distribution = pm.Beta.dist(alpha=self.alphas, beta=self.betas)

    def choose_action(self, n_pulls=1):
        action_probs = pm.draw(self.action_distribution, n_pulls)
        return np.argmax(action_probs, axis=-1).squeeze()

The learn method is unchanged.

That compiles a function every time an action is taken (inside pm.draw). For repeated draws it’s better to do it like you did first.

You can use RandomStreams, yes, but I prefer compile_pymc. Although a better name wouldn’t hurt.
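
For reference, a RandomStream version of the same sampler would look something like this (a sketch, again with placeholder shared variables):

import numpy as np
import pytensor
import pytensor.tensor as pt
from pytensor.tensor.random.utils import RandomStream

n_actions = 10
alphas = pytensor.shared(np.ones(n_actions), name='alphas')
betas = pytensor.shared(np.ones(n_actions), name='betas')

srng = RandomStream(seed=123)
n_draws = pt.iscalar('n_draws')
action_distribution = srng.beta(alphas, betas, size=(n_draws, n_actions))

# The stream registers its own RNG updates, so pytensor.function picks them up
f_action = pytensor.function([n_draws], action_distribution)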

Thank you for the clarification! I have never seen that before. I will read through the pytensor docs to learn more.


Thank you for your help. I am glad you did it the other way first, as it gave me the opportunity to learn a more efficient way of coding it.
