Source code for domainlab.utils.hyperparameter_sampling

"""
Samples the hyperparameters according to a benchmark configuration file.

Structure of this file:
- class Hyperparameter
- inherited hyperparameter classes
- functions to sample hyper-parameters and log them into a csv file
"""
import copy
import json
import os
from ast import literal_eval  # literal_eval can safely evaluate a python expression
from pydoc import locate
from typing import List

import numpy as np
import pandas as pd

from domainlab.utils.get_git_tag import get_git_tag
from domainlab.utils.logger import Logger

G_MODEL_NA = "model"
G_METHOD_NA = "method"


class Hyperparameter:
    """
    Represents a hyperparameter.

    The datatype of .val is int if step and p1 are integer valued, else float.

    p1: min or mean
    p2: max or scale
    reference: None or name of the referenced hyperparameter
    """

    def __init__(self, name: str):
        self.name = name
        self.val = 0

    def _ensure_step(self):
        """Make sure that the hyperparameter sticks to the discrete grid"""
        raise NotImplementedError

    def sample(self):
        """Sample this parameter, respecting properties"""
        raise NotImplementedError

    def get_val(self):
        """Returns the current value of the hyperparameter"""
        return self.val

    def datatype(self):
        """
        Returns the datatype of this parameter.
        This does not apply for references.
        """
        raise NotImplementedError


class SampledHyperparameter(Hyperparameter):
    """
    A numeric hyperparameter that shall be sampled
    """

    def __init__(self, name: str, config: dict):
        super().__init__(name)
        self.step = config.get("step", 0)
        try:
            self.distribution = config["distribution"]
            if self.distribution in {"uniform", "loguniform"}:
                self.p_1 = config["min"]
                self.p_2 = config["max"]
            elif self.distribution in {"normal", "lognormal"}:
                self.p_1 = config["mean"]
                self.p_2 = config["std"]
            else:
                raise RuntimeError(
                    f"Unsupported distribution type: {self.distribution}."
                )
        except KeyError as ex:
            raise RuntimeError(f"Missing required key for parameter {name}.") from ex

        self.p_1 = float(self.p_1)
        self.p_2 = float(self.p_2)

    def _ensure_step(self):
        """Make sure that the hyperparameter sticks to the discrete grid"""
        if self.step == 0:
            return  # continuous parameter

        # round to the next discrete value;
        # p_1 is the lower bound of the hyper-parameter range, p_2 the upper bound
        off = (self.val - self.p_1) % self.step
        # off is always smaller than step; depending on whether off falls into the
        # left or the right half of [0, step], move the hyper-parameter to the
        # nearest grid point so that (updated hyper-parameter - p_1) % step == 0
        if off < self.step / 2:
            self.val -= off
        else:
            self.val += self.step - off

        # ensure correct datatype
        if self.datatype() == int:
            self.val = self.datatype()(np.round(self.val))

    def sample(self):
        """Sample this parameter, respecting properties"""
        if self.distribution == "uniform":
            self.val = np.random.uniform(self.p_1, self.p_2)
        elif self.distribution == "loguniform":
            self.val = 10 ** np.random.uniform(np.log10(self.p_1), np.log10(self.p_2))
        elif self.distribution == "normal":
            self.val = np.random.normal(self.p_1, self.p_2)
        elif self.distribution == "lognormal":
            self.val = 10 ** np.random.normal(self.p_1, self.p_2)
        else:
            raise RuntimeError(f"Unsupported distribution type: {self.distribution}.")
        self._ensure_step()

    def datatype(self):
        return int if self.step % 1 == 0 and self.p_1 % 1 == 0 else float
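

# Illustrative sketch, not part of the original module: how a numeric
# hyper-parameter with a discrete step behaves. The parameter name "nlayers"
# and the helper below are assumptions; the config keys follow the benchmark
# yaml format expected by SampledHyperparameter.
def _demo_sampled_hyperparameter():
    """Sample a step-constrained uniform hyper-parameter (demo only)."""
    cfg = {"distribution": "uniform", "min": 1, "max": 9, "step": 2}
    par = SampledHyperparameter("nlayers", cfg)
    par.sample()  # draws uniformly from [1, 9] and snaps to the grid 1, 3, 5, 7, 9
    return par.get_val()  # an int, since both step and min are integer valued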


class ReferenceHyperparameter(Hyperparameter):
    """
    Hyperparameter that merely references a different hyperparameter.
    Thus, this parameter is not sampled but set after sampling.
    """

    def __init__(self, name: str, config: dict):
        super().__init__(name)
        self.reference = config.get("reference", None)

    def _ensure_step(self):
        """Make sure that the hyperparameter sticks to the discrete grid"""
        # nothing to do for references
        return

    def sample(self):
        """Sample this parameter, respecting properties"""
        # nothing to do for references
        return

    def datatype(self):
        raise RuntimeError("Datatype unknown for ReferenceHyperparameter")


class CategoricalHyperparameter(Hyperparameter):
    """
    A sampled hyperparameter which is constrained to fixed,
    user-given values and a fixed datatype
    """

    def __init__(self, name: str, config: dict):
        super().__init__(name)
        self.allowed_values = config["values"]
        if "datatype" not in config:
            raise RuntimeError(
                "Please specify the datatype for all categorical "
                "hyper-parameters, e.g. datatype=str"
            )
        self.type = locate(config["datatype"])
        self.allowed_values = [self.type(v) for v in self.allowed_values]

    def _ensure_step(self):
        """Make sure that the hyperparameter sticks to the discrete grid"""
        # nothing to do for categorical ones
        return

    def sample(self):
        """Sample this parameter, respecting properties"""
        # draw one of the allowed values uniformly at random
        idx = np.random.randint(0, len(self.allowed_values))
        self.val = self.allowed_values[idx]

    def datatype(self):
        return self.type
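

# Illustrative sketch, not part of the original module: a categorical
# hyper-parameter restricted to user-given values. The parameter name
# "batchsize" and the helper below are assumptions; "datatype" is resolved via
# pydoc.locate, so it must name a locatable type such as "int" or "str".
def _demo_categorical_hyperparameter():
    """Sample a categorical hyper-parameter (demo only)."""
    cfg = {"values": [32, 64, 128], "datatype": "int"}
    par = CategoricalHyperparameter("batchsize", cfg)
    par.sample()  # picks one of the allowed values uniformly at random
    return par.get_val()  # one of 32, 64, 128, cast to int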


def get_hyperparameter(name: str, config: dict) -> Hyperparameter:
    """Factory function. Instantiates the correct Hyperparameter"""
    if "reference" in config.keys():
        return ReferenceHyperparameter(name, config)
    dist = config.get("distribution", None)
    if dist == "categorical":
        return CategoricalHyperparameter(name, config)
    return SampledHyperparameter(name, config)
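

# Illustrative sketch, not part of the original module: how the factory
# dispatches on the config keys. The parameter names "lr" and "gamma_reg"
# and the helper below are assumptions.
def _demo_get_hyperparameter():
    """Instantiate one sampled and one reference hyper-parameter (demo only)."""
    lr_conf = {"distribution": "loguniform", "min": 1e-5, "max": 1e-2}
    par_lr = get_hyperparameter("lr", lr_conf)  # -> SampledHyperparameter
    # a "reference" entry is a python expression over other parameter names;
    # it is resolved later in check_constraints
    par_gamma = get_hyperparameter("gamma_reg", {"reference": "2 * lr"})
    return par_lr, par_gamma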


def check_constraints(params: List[Hyperparameter], constraints) -> bool:
    """Check if the constraints are fulfilled."""
    # set each param as a local variable
    for par in params:
        locals().update({par.name: par.val})
    # set references
    for par in params:
        if isinstance(par, ReferenceHyperparameter):
            try:
                setattr(par, "val", eval(par.reference))
                # NOTE: literal_eval will cause ValueError: malformed node or string
            except Exception as ex:
                logger = Logger.get_logger()
                logger.info(f"error in evaluating expression: {par.reference}")
                raise ex
            locals().update({par.name: par.val})

    if constraints is None:
        return True  # shortcut

    # check all constraints
    for constr in constraints:
        try:
            const_res = eval(constr)
            # NOTE: literal_eval will cause ValueError: malformed node or string
        except SyntaxError as ex:
            raise SyntaxError(f"Invalid syntax in yaml config: {constr}") from ex
        if not const_res:
            return False
    return True
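

# Illustrative sketch, not part of the original module: constraints are python
# expressions over the hyper-parameter names; check_constraints also resolves
# reference parameters before evaluating them. The names "lr" and "gamma_reg"
# and the helper below are assumptions.
def _demo_check_constraints():
    """Evaluate constraints on a sampled and a reference parameter (demo only)."""
    lr_conf = {"distribution": "loguniform", "min": 1e-5, "max": 1e-2}
    par_lr = get_hyperparameter("lr", lr_conf)
    par_gamma = get_hyperparameter("gamma_reg", {"reference": "10 * lr"})
    par_lr.sample()
    # gamma_reg is set to 10 * lr inside check_constraints, then both
    # constraint expressions are evaluated; here both should hold
    return check_constraints([par_lr, par_gamma], ["lr <= 1e-2", "gamma_reg >= lr"])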


def sample_parameters(
    init_params: List[Hyperparameter],
    constraints,
    shared_config=None,
    shared_samples=None,
) -> dict:
    """
    Tries to sample from the hyperparameter list.

    Errors if in 10,000 attempts no sample complying with the constraints is found.
    """
    for _ in range(10_000):
        params = copy.deepcopy(init_params)
        for par in params:
            par.sample()
        # add a random hyperparameter sample from the shared hyperparameter dataframe
        if shared_samples is not None:
            # sample one line from the pandas dataframe
            shared_samp = shared_samples.sample(1).iloc[0]["params"]
            for key in shared_samp.keys():
                par = Hyperparameter(key)
                par.val = shared_samp[key]
                par.name = key
                params.append(par)
        # check constraints
        if check_constraints(params, constraints):
            samples = {}
            for par in params:
                samples[par.name] = par.val
            return samples

    # if no sample fulfilling the constraints was found above, this may be due to
    # the shared hyperparameters; if so, new samples are generated for the
    # shared hyperparameters
    logger = Logger.get_logger()
    logger.warning(
        "The constraints couldn't be met with the shared hyperparameters, "
        "the shared dataframe pool will be ignored for now."
    )
    for _ in range(10_000):
        params = copy.deepcopy(init_params)
        # add the shared hyperparameters as sampled hyperparameters
        if shared_samples is not None:
            shared_samp = shared_samples.sample(1).iloc[0]["params"]
            for key in shared_samp.keys():
                par = SampledHyperparameter(key, shared_config[key])
                par.sample()
                par.name = key
                params.append(par)
        for par in params:
            par.sample()
        # check constraints
        if check_constraints(params, constraints):
            samples = {}
            for par in params:
                samples[par.name] = par.val
            return samples

    raise RuntimeError(
        "Could not find an acceptable sample in 10,000 runs. "
        "Are the bounds and constraints reasonable?"
    )


def create_samples_from_shared_samples(
    shared_samples: pd.DataFrame, config: dict, task_name: str
):
    """
    Add information such as the task name, G_MODEL_NA and constraints
    to the shared samples.

    Parameters:
        shared_samples: pd.DataFrame with columns [G_METHOD_NA, G_MODEL_NA, 'params']
        config: dict with the yaml configuration of the current task
        task_name: name of the current task
    """
    shared_samp = shared_samples.copy()
    shared_samp[G_MODEL_NA] = config["model"]
    shared_samp[G_METHOD_NA] = task_name
    # respect the constraints if specified in the task
    if "constraints" in config.keys():
        for idx in range(shared_samp.shape[0] - 1, -1, -1):
            name = list(shared_samp["params"].iloc[idx].keys())[0]
            value = shared_samp["params"].iloc[idx][name]
            par = Hyperparameter(name)
            par.val = value
            if not check_constraints([par], config["constraints"]):
                shared_samp = shared_samp.drop(idx)
    return shared_samp


def sample_task_only_shared(
    num_samples, task_name, sample_df, config, shared_conf_samp
):
    """
    Sample one task and add it to the dataframe, for task descriptions
    which only contain shared hyperparameters
    """
    shared_config, shared_samples = shared_conf_samp
    # copy the shared samples dataframe and add the correct model and task names
    shared_samp = create_samples_from_shared_samples(shared_samples, config, task_name)

    # if more hyperparameter samples are expected for the model than are provided
    # in the shared samples, use the shared config to sample new hyperparameters
    # to ensure that we have distinct hyperparameters
    if num_samples - shared_samp.shape[0] > 0:
        s_config = shared_config.copy()
        s_dict = {}
        for keys in s_config.keys():
            if keys != "num_shared_param_samples":
                s_dict[keys] = s_config[keys]
        if "constraints" in config.keys():
            s_dict["constraints"] = config["constraints"]
        s_config["model"] = config["model"]
        s_config["hyperparameters"] = s_dict
        # sample new shared hyperparameters
        sample_df = sample_task(
            num_samples - shared_samp.shape[0],
            task_name,
            (s_config, sample_df),
            (None, None),
        )
        # add the previously sampled shared hyperparameters
        sample_df = sample_df.append(shared_samp, ignore_index=True)
    # if the number of shared samples is >= the expected number of sampled
    # hyperparameters, randomly choose rows from the shared samples dataframe
    else:
        shared_samp = shared_samp.sample(num_samples)
        sample_df = sample_df.append(shared_samp, ignore_index=True)
    return sample_df


def sample_task(
    num_samples: int, task_name: str, conf_samp: tuple, shared_conf_samp: tuple
):
    """Sample one task and add it to the dataframe"""
    config, sample_df = conf_samp
    shared_config, shared_samples = shared_conf_samp
    if "hyperparameters" in config.keys():
        # in the benchmark configuration file, the sub-section "hyperparameters"
        # holds the hyper-parameters that vary between samples
        params = []
        for key, val in config["hyperparameters"].items():
            if key in ("constraints", "num_shared_param_samples"):
                continue
            params += [get_hyperparameter(key, val)]

        constraints = config["hyperparameters"].get("constraints", None)
        for _ in range(num_samples):
            sample = sample_parameters(
                params, constraints, shared_config, shared_samples
            )
            sample_df.loc[len(sample_df.index)] = [task_name, config["model"], sample]
    elif "shared" in config.keys():
        sample_df = sample_task_only_shared(
            num_samples, task_name, sample_df, config, (shared_config, shared_samples)
        )
    else:
        # add a single line if no varying hyperparameters are specified
        sample_df.loc[len(sample_df.index)] = [task_name, config["model"], {}]
    return sample_df


def is_dict_with_key(input_dict, key) -> bool:
    """Determines whether the input argument is a dictionary and has the given key"""
    return isinstance(input_dict, dict) and key in input_dict.keys()


def get_shared_samples(
    shared_samples_full: pd.DataFrame, shared_config_full: dict, task_config: dict
):
    """
    - creates a dataframe with columns [task, G_MODEL_NA, params]; task and
      G_MODEL_NA are "all" for all rows, while params is filled with those shared
      parameters of shared_samples_full which are requested by task_config
    - creates a shared config containing only information about the shared
      hyperparameters requested by task_config
    """
    shared_samples = shared_samples_full.copy(deep=True)
    shared_config = shared_config_full.copy()

    if "shared" in task_config.keys():
        shared = task_config["shared"]
    else:
        shared = []

    # keep only the requested shared hyperparameters in each sampled params dict
    for line_num in range(shared_samples.shape[0]):
        hyper_p_dict = shared_samples.iloc[line_num]["params"].copy()
        key_list = copy.deepcopy(list(hyper_p_dict.keys()))
        for key_ in key_list:
            if key_ not in shared:
                del hyper_p_dict[key_]
        shared_samples.iloc[line_num]["params"] = hyper_p_dict

    # drop the configuration of all shared hyperparameters that were not requested
    for key_ in key_list:
        if not key_ == "num_shared_param_samples":
            if key_ not in shared:
                del shared_config[key_]

    # remove all duplicates
    shared_samples = shared_samples.drop_duplicates(subset="params")
    return shared_samples, shared_config


def sample_hyperparameters(
    config: dict, dest: str = None, sampling_seed: int = None
) -> pd.DataFrame:
    """
    Samples the hyperparameters according to the given config, which should
    be the dictionary of the full benchmark config yaml.
    The result is saved to 'output_dir/hyperparameters.csv' of the config
    if dest is not specified explicitly.

    Note: Parts of the yaml content are executed. Thus use this only with
    trusted config files.
    """
    if dest is None:
        dest = config["output_dir"] + os.sep + "hyperparameters.csv"

    if sampling_seed is not None:
        np.random.seed(sampling_seed)

    num_samples = config["num_param_samples"]
    samples = pd.DataFrame(columns=[G_METHOD_NA, G_MODEL_NA, "params"])
    if "Shared params" in config.keys():
        shared_config_full = config["Shared params"]
        shared_samples_full = pd.DataFrame(columns=[G_METHOD_NA, G_MODEL_NA, "params"])
        shared_val = {"model": "all", "hyperparameters": config["Shared params"]}
        # fill up the shared samples dataframe
        shared_samples_full = sample_task(
            shared_config_full["num_shared_param_samples"],
            "all",
            (shared_val, shared_samples_full),
            (None, None),
        )
    else:
        shared_samples_full = None

    for key, val in config.items():
        if is_dict_with_key(val, "model"):
            if shared_samples_full is not None:
                shared_samples, shared_config = get_shared_samples(
                    shared_samples_full, shared_config_full, val
                )
            else:
                shared_config = None
                shared_samples = None
            samples = sample_task(
                num_samples, key, (val, samples), (shared_config, shared_samples)
            )

    os.makedirs(os.path.dirname(dest), exist_ok=True)
    # create a txt file with the commit information
    with open(
        config["output_dir"] + os.sep + "commit.txt", "w", encoding="utf8"
    ) as file:
        file.writelines("use git log |grep \n")
        file.writelines("consider remove leading b in the line below \n")
        file.write(get_git_tag())
    with open(
        config["output_dir"] + os.sep + "config.txt", "w", encoding="utf8"
    ) as file:
        json.dump(config, file)
    samples.to_csv(dest)
    return samples
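

# Illustrative sketch, not part of the original module: a minimal in-memory
# benchmark config and a call to the top-level sampling routine. The task name
# "task_erm", the model name "erm", the output path and the helper below are
# assumptions; writing commit.txt assumes the working directory is a git
# checkout, since get_git_tag() is called.
def _demo_sample_hyperparameters():
    """Sample hyper-parameters for a single task from a minimal config (demo only)."""
    config = {
        "output_dir": "zoutput/benchmarks/demo",
        "num_param_samples": 3,
        "task_erm": {
            "model": "erm",
            "hyperparameters": {
                "lr": {"distribution": "loguniform", "min": 1e-5, "max": 1e-2},
                "constraints": ["lr <= 1e-3"],
            },
        },
    }
    # returns a dataframe with columns [G_METHOD_NA, G_MODEL_NA, "params"] and
    # also writes it to zoutput/benchmarks/demo/hyperparameters.csv
    return sample_hyperparameters(config, sampling_seed=0)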