Hi guys, would be very glad if you can help me with Bandit Optimization issue.
I have three arms with the following means:
arm_0 0.248503
arm_1 0.333333
arm_2 0.498498
After Thompson Sampling, the arm weights are:
arm_0: 0.0
arm_1: 0.0
arm_2: 1.0
I'm wondering why one arm gets 100% of the weight and the other two get zero, even though the difference between the means is not really significant?
Can anybody give me any way forward on the above issue ?
Thank you very much in advance
Hi @LN5user ! Would you be able to give us a full code sample so we can repro?
Hello! Thank you for the quick response
Metric:
from ax import Metric
from ax.core.data import Data
from ax.utils.stats.statstools import agresti_coull_sem
import pandas as pd
from typing import Any
class SimpleBanditMetric(Metric):
    """
    Metric that reports, per arm, the mean and SEM taken from an externally
    supplied result object.

    The object assigned to ``evaluation_result`` is queried through its
    ``get_mean(variant)`` and ``get_sem(variant)`` methods when the data
    of a trial is fetched.
    """

    def __init__(self, name: str):
        super().__init__(name)
        # No result is known until the caller assigns one.
        self._evaluation_result = None

    @property
    def evaluation_result(self):
        """Result object of the previous round (``None`` until assigned)."""
        return self._evaluation_result

    @evaluation_result.setter
    def evaluation_result(self, result):
        self._evaluation_result = result

    def prevent_div_by_zero(self, value):
        """
        Returns ``value`` unchanged when it is strictly positive,
        otherwise the fixed replacement 0.01.
        """
        if value > 0:
            return value
        return 0.01

    def fetch_trial_data(self, trial: "core.base_trial.BaseTrial") -> Data:
        """
        Builds one record per arm of ``trial`` from the stored evaluation
        result and returns them wrapped in a ``Data`` object.

        The first parameter value of each arm is used as the variant key
        for the ``get_mean`` / ``get_sem`` lookups; both looked-up values
        are passed through ``prevent_div_by_zero``.
        """
        rows = []
        for arm_name, arm in trial.arms_by_name.items():
            variant = list(arm.parameters.values())[0]
            mean = self.prevent_div_by_zero(self.evaluation_result.get_mean(variant))
            sem = self.prevent_div_by_zero(self.evaluation_result.get_sem(variant))
            rows.append(
                {
                    "arm_name": arm_name,
                    "metric_name": self.name,
                    "mean": mean,
                    "sem": sem,
                    "trial_index": trial.index,
                }
            )
        frame = pd.DataFrame.from_records(rows)
        return Data(df=frame)
Optimizer
from typing import Dict
from numpy import log
from ax import (ChoiceParameter, Experiment, Metric, Models, Objective,
ParameterType, SearchSpace, load, save)
from ax.core.optimization_config import OptimizationConfig
from modules.optimizer.abstract_optimizer_service import Optimizer
from modules.optimizer.simple_bandit.model.metric import SimpleBanditMetric
from modules.optimizer.simple_bandit.model.runner import SimpleBanditRunner
class SimpleBanditOptimizer(Optimizer):
    """
    Manages the lifecycle of the FAx Experiment from creation,
    to the initial factorial run (to generate arms) and
    finally to its optimization runs (repeatedly executed after
    each generated batch has been consumed).
    """

    def __init__(self, fax_experiment: Experiment):
        self._fax_experiment = fax_experiment

    @staticmethod
    def create(name: str, metadata: Dict):
        """
        Creates a FAx Experiment whose SearchSpace consists of a single
        integer ChoiceParameter named "variant".

        ``metadata["variants"]`` supplies the choice values, while
        ``metadata["batch_size"]`` and ``metadata["mode"]`` are handed to
        the SimpleBanditRunner attached to the experiment.
        """
        search_space = SearchSpace(
            parameters=[
                ChoiceParameter(
                    name="variant",
                    parameter_type=ParameterType.INT,
                    values=metadata["variants"],
                )
            ]
        )
        return Experiment(
            name=name,
            search_space=search_space,
            runner=SimpleBanditRunner(metadata["batch_size"], metadata["mode"]),
        )

    def run(self, last_round_object):
        """
        Runs a full optimization round and returns the run metadata of
        the new trial.

        When the experiment has no trial yet, the initial factorial trial
        is executed. Otherwise ``last_round_object`` is attached to a
        SimpleBanditMetric as its evaluation result and a Thompson
        sampling round is executed.
        """
        # Check the trial list directly instead of comparing a trial
        # object against an empty dict.
        if not self._get_trials:
            return self._initial_trial()
        metric = SimpleBanditMetric(name="simple_bandit")
        metric.evaluation_result = last_round_object
        self.fax_experiment.optimization_config = OptimizationConfig(
            objective=Objective(metric=metric, minimize=False)
        )
        return self._thompson_sampler()

    def _thompson_sampler(self):
        """
        Generates, runs and completes a new batch trial using Thompson
        Sampling on the data of the latest trial.

        Returns the run metadata of the new trial, enriched with the
        weight changes relative to the previous trial.
        """
        # Resolve the latest trial once; every property access rebuilds
        # the list of trials.
        last_trial = self._last_trial
        data = last_trial.fetch_data()
        last_run_metadata = last_trial.run_metadata
        thompson = Models.THOMPSON(
            experiment=self.fax_experiment,
            data=data,
            min_weight=-1,
            uniform_weights=False,
        )
        thompson_run = thompson.gen(n=-1)
        current_trial = self.fax_experiment.new_batch_trial(generator_run=thompson_run)
        current_trial.run()
        current_trial.mark_completed()
        return self._enrich_run_metadata(current_trial.run_metadata, last_run_metadata)

    def _initial_trial(self):
        """
        Executes a factorial run to initialize the first set of arms and
        returns the (enriched) run metadata of that trial.
        """
        factorial = Models.FACTORIAL(search_space=self.fax_experiment.search_space)
        factorial_run = factorial.gen(n=-1)
        factorial_trial = self.fax_experiment.new_batch_trial(generator_run=factorial_run)
        factorial_trial.run()
        factorial_trial.mark_completed()
        return self._enrich_run_metadata(factorial_trial.run_metadata)

    def _enrich_run_metadata(self, rmd, rmd_last=None):
        """
        Adds the absolute ("delta") and relative ("delta_rel") change of
        each variant's weight compared with the previous run to ``rmd``.

        ``rmd`` is modified in place and returned. When ``rmd_last`` is
        missing or empty both entries stay empty dicts. Variants of the
        previous run that are absent from ``rmd["map"]`` are skipped.
        A variant whose previous weight was zero gets ``None`` as its
        relative change, because the ratio is undefined.
        """
        rmd["delta"] = {}
        rmd["delta_rel"] = {}
        if not rmd_last:
            return rmd
        for old_key, w_old in rmd_last["weights"].items():
            variant = rmd_last["map"][old_key]
            if variant not in rmd["map"]:
                continue
            delta = rmd["weights"][variant] - w_old
            rmd["delta"][variant] = delta
            # Thompson sampling routinely assigns a weight of exactly 0,
            # so the previous weight cannot be used as a divisor blindly.
            rmd["delta_rel"][variant] = delta / w_old if w_old else None
        return rmd

    @property
    def _last_trial(self):
        """
        Returns the latest trial, or an empty dict when the experiment
        has no trials yet.
        """
        trials = self._get_trials
        return trials[-1] if trials else {}

    @property
    def _get_trials(self):
        """
        Returns a list containing every trial of the experiment.
        """
        return list(self.fax_experiment.trials.values())

    @property
    def fax_experiment(self):
        """The wrapped FAx Experiment."""
        return self._fax_experiment

    @fax_experiment.setter
    def fax_experiment(self, fax_experiment):
        self._fax_experiment = fax_experiment
Runner
from ax import Runner
from numpy.random import choice, shuffle
from collections import Counter
from typing import Dict, Tuple, List
def gen_batch(weights: Dict[int,float], batch_size: int) -> List[int]:
    """
    Builds a shuffled list of variation IDs in which every ID appears
    in proportion to its weight, scaled to ``batch_size``.

    Each ID first receives the integer part of its scaled weight. Any
    slots still missing afterwards are drawn at random, with the
    fractional parts (divided by the number of missing slots) used as
    the draw probabilities.
    """
    total_weight = sum(weights.values())
    counts = {}
    fractions = []
    for vid, weight in weights.items():
        # share of the batch this ID is entitled to, as a float
        share = batch_size / total_weight * weight
        counts[vid] = int(share)
        fractions.append(share % 1)
    missing = batch_size - sum(counts.values())
    if missing > 0:
        # fill the remaining slots according to the fractional parts
        probabilities = [fraction / missing for fraction in fractions]
        for vid in choice(a=list(counts.keys()), size=missing, p=probabilities):
            counts[vid] += 1
    # expand the counts into the (still ordered) batch, then randomize it
    batch = []
    for vid, count in counts.items():
        batch.extend([vid] * count)
    shuffle(batch)
    return batch
class SimpleBanditRunner(Runner):
    """
    Customized Runner for the SimpleBandit Optimization Module.
    This specific Runner generates a new batch containing random
    variation IDs in the distribution of the resulting weights of
    the preceding optimization process.
    """

    # Modes understood by run().
    _MODES = ("static", "dynamic")

    def __init__(self, batch_size: int, mode: str):
        self.batch_size = batch_size
        self.mode = mode

    def run(self, trial: "core.base_trial.BaseTrial"):
        """
        Extracts the normalized arm weights from the trial and generates
        a new batch of variation IDs in randomized order.

        In "static" mode the batch is generated with equal weights for
        all variants; in "dynamic" mode it follows the normalized arm
        weights. The reported "weights" entry always holds the
        normalized arm weights.

        Returns a dict with the keys "weights", "batch" and "map"
        (an identity mapping of the variation IDs).

        Raises ValueError when ``self.mode`` is neither "static" nor
        "dynamic".
        """
        # Fail fast, and say which mode was rejected, before any work
        # is done on the trial.
        if self.mode not in self._MODES:
            raise ValueError(
                f"Unknown mode {self.mode!r}; expected one of {self._MODES}."
            )
        # The first parameter value of each arm is its variation ID.
        vid_to_weight = {
            list(arm.parameters.values())[0]: weight
            for arm, weight in trial.normalized_arm_weights().items()
        }
        if self.mode == "static":
            batch_weights = {vid: 1 / len(vid_to_weight) for vid in vid_to_weight}
        else:
            batch_weights = vid_to_weight
        return {
            "weights": vid_to_weight,
            "batch": gen_batch(batch_weights, self.batch_size),
            "map": {vid: vid for vid in vid_to_weight},
        }
Logging from Metric
arm_name mean metric_name sem trial_index
0 0_0 0.249249 simple_bandit 0.023799 0
1 0_1 0.332335 simple_bandit 0.025813 0
2 0_2 0.498498 simple_bandit 0.027400 0
Thanks! Assigning to @2timesjay to try to repro / investigate.
Hi,
Looking at the means and the standard errors of the means, arm 2 (`0_2`) should be statistically significantly better than the other two arms.
Just eyeballing it, the 95% lower bound for arm 2 and 95% Upper bounds for arms 0 and 1 don’t cross, so I would expect arm 2 to get 100% of the weight.
Note that it is not uncommon for TS to put arbitrary weight on arms with the same true mean (such that noise in the measurement of one arm may lead to one of the other arms winning out). However, that is totally OK from a regret minimization perspective.
Ax by default uses an empirical Bayes estimator that considers the measurements of all other arms as a prior. This tends to cause the algorithm to (on average) select the set of best arms with higher probability compared with vanilla TS, and ultimately select the best arm with higher probability. And of course, TS, compared with uniform sampling (when there are >2 arms), will find the best arm with higher probability because it focuses more samples on arms that are likely to be better.
E
1 0_1 0.332335 simple_bandit 0.025813 0
2 0_2 0.498498 simple_bandit 0.027400
e
Sent from my iPhone
On Jul 22, 2020, at 11:01 PM, LN5user notifications@github.com wrote:

Hello! Thank you for the quick responseMetric:
from ax import Metric
from ax.core.data import Data
from ax.utils.stats.statstools import agresti_coull_sem
import pandas as pd
from typing import Anyclass SimpleBanditMetric(Metric):
def __init__(self, name: str): super().__init__(name) self._evaluation_result = None @property def evaluation_result(self): return self._evaluation_result @evaluation_result.setter def evaluation_result(self, result): self._evaluation_result = result def prevent_div_by_zero(self, value): """ Changes a Value to a fixed parameter if its zero. """ fixture = 0.01 return fixture if not value > 0 else value def fetch_trial_data(self, trial: "core.base_trial.BaseTrial") -> Data: """ Fetches Data of the previous round for each arm of the current Trial. It returns the values in an adjusted pandas dataframe. """ records = [] for arm_name, arm in trial.arms_by_name.items(): variant = list(arm.parameters.values())[0] record = { "arm_name": arm_name, "metric_name": self.name, "mean": self.prevent_div_by_zero(self.evaluation_result.get_mean(variant)), "sem": self.prevent_div_by_zero(self.evaluation_result.get_sem(variant)), "trial_index": trial.index, } records.append(record) return Data(df=pd.DataFrame.from_records(records))Optimizer
from typing import Dict
from numpy import log
from ax import (ChoiceParameter, Experiment, Metric, Models, Objective,
ParameterType, SearchSpace, load, save)
from ax.core.optimization_config import OptimizationConfigfrom modules.optimizer.abstract_optimizer_service import Optimizer
from modules.optimizer.simple_bandit.model.metric import SimpleBanditMetric
from modules.optimizer.simple_bandit.model.runner import SimpleBanditRunnerclass SimpleBanditOptimizer(Optimizer):
"""
Manages the Lifecycle of the FAx Experiment from creation,
to the initial factorial run (to generate arms) and
finally to its optimition runs (repeatedly executed after
each generated batch has been consumed).
"""def __init__(self, fax_experiment: Experiment): self._fax_experiment = fax_experiment @staticmethod def create(name: str, metadata: Dict): """ Creates a FAx Experiment using a list of unique integer values to define the SearchSpace. The Batch size is necessary to define the length of the Output of randomized integer values. """ search_space = SearchSpace( parameters=[ ChoiceParameter( name="variant", parameter_type=ParameterType.INT, values=metadata["variants"], ) ] ) fax_experiment = Experiment( name=name, search_space=search_space, runner=SimpleBanditRunner(metadata["batch_size"], metadata["mode"]) ) return fax_experiment def run(self, last_round_object): """ Runs a full optimization round and returns either a batch, generated by using the optimized weight values, or if its the first run, using equally distributed weights. """ if self._last_trial == {}: return self._initial_trial() metric = SimpleBanditMetric(name="simple_bandit") metric.evaluation_result = last_round_object optimization_config = OptimizationConfig( objective=Objective( metric= metric, minimize=False, ) ) self.fax_experiment.optimization_config = optimization_config return self._thompson_sampler() def _thompson_sampler(self): """ # THOMPSON Sampler We use Thompson Sampling to suggest a set of arms (combinations of factors and levels) on which to collect more data We run TS, which assigns a weight to each arm that is proportional to the probability of that arm being the best. 
""" data = self._last_trial.fetch_data() last_run_metadata = self._last_trial.run_metadata thompson = Models.THOMPSON(experiment=self._fax_experiment, data=data, min_weight=-1, uniform_weights=False) thompson_run = thompson.gen(n=-1) current_trial = self.fax_experiment.new_batch_trial(generator_run=thompson_run) current_trial.run() current_trial.mark_completed() return self._enrich_run_metadata(current_trial.run_metadata,last_run_metadata) def _initial_trial(self): """ Executes a factorial Run to initialize the first set of Arms. Returns a randomized batch of uniformly distributed variation IDs. """ factorial = Models.FACTORIAL(search_space=self.fax_experiment.search_space) factorial_run = factorial.gen(n=-1) factorial_trial = self.fax_experiment.new_batch_trial(generator_run=factorial_run) factorial_trial.run() factorial_trial.mark_completed() return self._enrich_run_metadata(factorial_trial.run_metadata) def _enrich_run_metadata(self, rmd, rmd_last = {}): """ Calculates the difference and relative change between the current weight of a variant and its previous weight. """ rmd["delta"] = {} rmd["delta_rel"] = {} if rmd_last != {}: for variant, w_old in rmd_last["weights"].items(): variant = rmd_last["map"][variant] if variant in rmd["map"].keys(): w_new = rmd["weights"][variant] rmd["delta"][variant] = w_new - w_old rmd["delta_rel"][variant] = (w_new - w_old)/w_old return rmd @property def _last_trial(self): """ Return the latest Trial """ try: return self._get_trials[-1] except: return {} @property def _get_trials(self): """ Returns a list containing every trial of the experiment. """ return list(self.fax_experiment.trials.values()) @property def fax_experiment(self): return self._fax_experiment @fax_experiment.setter def fax_experiment(self, fax_experiment): self._fax_experiment = fax_experimentRunner
from ax import Runner
from numpy.random import choice, shuffle
from collections import Counter
from typing import Dict, Tuple, Listdef gen_batch(weights: Dict[int,float], batch_size: int) -> List[int]:
"""
Generates a list in randomized order out of Variation IDs in the
ratio of its weight to the batch_size.
"""#calculates the quantity of each parameter in respect to the batch_size and its weight weights_t = {key:batch_size/sum(weights.values())*value for key, value in weights.items()} #separates the floating numbers into its integer and its decimal ranges weights_t = {key:[int(value), value % 1] for key, value in weights_t.items()} #return a list of parameter to ensure the right order keys = list(weights_t.keys()) values = [weights_t[key] for key in keys] size = batch_size -sum([v[0] for v in values]) #generates a list of parameter in respect to its weights decimal range to fill the remaining spaces if size > 0: for key in choice(a=keys, size=size, p=[v[1]/size for v in values]): weights_t[key][0] += 1 #generates the still ordered batch using the ordained counts of each parameter batch = [key for key, value in weights_t.items() for _ in range(value[0])] shuffle(batch) return batchclass SimpleBanditRunner(Runner):
"""
Customized Runner for the SimpleBandit Optimization Module.
This specific Runner generates a new Batch containing random
variation IDs in the distribution of the resulting weights of
the preceding preformed optimation process.
"""def __init__(self, batch_size: int, mode: str): self.batch_size = batch_size self.mode = mode def run(self, trial: "core.base_trial.BaseTrial"): """ Extracts the normalized arm weights from the Trial after its optimization phase, and generates a new population in respect of the normalized arm weights in randomized order. The return Value will be stored in the parameter run_metadata """ params_with_weights = trial.normalized_arm_weights() vid_to_weight = { ([vid for vid in arm.parameters.values()][0]): weight for arm, weight in params_with_weights.items() } if self.mode == "static": vid_to_weight_vers = {vid: 1/len(vid_to_weight) for vid in vid_to_weight.keys()} elif self.mode == "dynamic": vid_to_weight_vers = vid_to_weight else: raise ValueError return { "weights":vid_to_weight, "batch":gen_batch(vid_to_weight_vers, self.batch_size), "map":{vid:vid for vid in vid_to_weight.keys()} }Logging from Metric
arm_name mean metric_name sem trial_index
0 0_0 0.249249 simple_bandit 0.023799 0 0
—
You are receiving this because you are subscribed to this thread.
Reply to this email directly, view it on GitHub, or unsubscribe.
@eytan gave a pretty complete answer; this is the expected behavior. Closing.