"""
gridsearch for the hyperparameter space
def add_next_param_from_list is an recursive function to make cartesian product along all the scalar hyper-parameters, this resursive function is used
in def grid_task
"""
import copy
import json
import os
import warnings
import numpy as np
import pandas as pd
import domainlab.utils.hyperparameter_sampling as sampling
from domainlab import g_name_num_shared_param_samples_rand_search
from domainlab.utils.get_git_tag import get_git_tag
from domainlab.utils.logger import Logger
G_MODEL_NA = "model"
G_METHOD_NA = "method"
def round_to_discreate_grid_normal(grid, param_config):
"""
round the values of the grid to the grid spacing specified in the config
for normal and lognormal grids
"""
if float(param_config["step"]) == 0:
return grid
    # for normal and lognormal distributions no min and max are provided;
    # in this case the grid is constructed around the mean
neg_steps = np.ceil(
(float(param_config["mean"]) - np.min(grid)) / float(param_config["step"])
)
pos_steps = np.ceil(
(np.max(grid) - float(param_config["mean"])) / float(param_config["step"])
)
mini = float(param_config["mean"]) - float(param_config["step"]) * neg_steps
maxi = float(param_config["mean"]) + float(param_config["step"]) * pos_steps
    discrete_grid = np.arange(mini, maxi, step=float(param_config["step"]))
    for num, elem in enumerate(list(grid)):
        grid[num] = discrete_grid[(np.abs(discrete_grid - elem)).argmin()]
return np.unique(grid)
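
# Illustrative example (hypothetical values, not part of the module): with
# param_config = {"mean": 0.0, "std": 1.0, "step": 0.5} the continuous grid values
# produced by normal_grid are snapped to the nearest multiples of 0.5 around the
# mean, and np.unique removes values that collapse onto the same grid point.
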
def normal_grid(param_config, lognormal=False):
"""
get a normal distributed grid given the specifications in the param_config
param_config: config which needs to contain 'num', 'mean', 'std'
"""
if int(param_config["num"]) == 1:
return np.array([float(param_config["mean"])])
    # radial part of the Box-Muller transform: map an equally spaced grid on (0, 1)
    # to an (approximately) normally distributed grid
num = int(np.floor(int(param_config["num"]) / 2))
step = 2 / (int(param_config["num"]) + 1)
    # for an even number of samples
if int(param_config["num"]) % 2 == 0:
param_grid = np.arange(step, 1, step=step)[:num]
stnormal_grid = np.sqrt(-2 * np.log(param_grid))
stnormal_grid = np.append(stnormal_grid, -stnormal_grid)
stnormal_grid = stnormal_grid / np.std(stnormal_grid)
stnormal_grid = float(param_config["std"]) * stnormal_grid + float(
param_config["mean"]
)
    # for an odd number of samples
else:
param_grid = np.arange(step, 1, step=step)[:num]
stnormal_grid = np.sqrt(-2 * np.log(param_grid))
stnormal_grid = np.append(stnormal_grid, -stnormal_grid)
stnormal_grid = np.append(stnormal_grid, 0)
stnormal_grid = stnormal_grid / np.std(stnormal_grid)
stnormal_grid = float(param_config["std"]) * stnormal_grid + float(
param_config["mean"]
)
if "step" in param_config.keys() and lognormal is False:
return round_to_discreate_grid_normal(stnormal_grid, param_config)
return stnormal_grid
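
# Illustrative usage (hypothetical values): normal_grid({"num": 3, "mean": 0.0, "std": 1.0})
# yields a three-point grid with one value at the mean and one value on either side,
# symmetric around 0.0 and rescaled so the grid has the requested standard deviation.
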
def lognormal_grid(param_config):
"""
get a normal distributed grid given the specifications in the param_config
param_config: config which needs to contain 'num', 'mean', 'std'
"""
grid = 10 ** normal_grid(param_config, lognormal=True)
if "step" in param_config.keys():
return round_to_discreate_grid_normal(grid, param_config)
return grid
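
# Illustrative usage (hypothetical values): lognormal_grid({"num": 3, "mean": 0.0, "std": 1.0})
# returns 10 ** g for every point g of the corresponding normal grid, so the middle
# grid point is 10 ** 0.0 == 1.0.
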
def add_next_param_from_list(param_grid: dict, grid: dict, grid_df: pd.DataFrame):
"""
can be used in a recoursive fassion to add all combinations of the parameters in
param_grid to grid_df
param_grid: dictionary with all possible values for each parameter
{'p1': [1, 2, 3], 'p2': [0, 5], ...}
grid: a grid which will build itself in the recursion, start with grid = {}
after one step grid = {p1: 1}
grid_df: dataframe which will save the finished grids
task_name: task name
also: G_MODEL_NA name
"""
if len(param_grid.keys()) != 0:
# specify the parameter to be used
param_name = list(param_grid.keys())[0]
        # loop over all values of this parameter
for param in param_grid[param_name]:
# add the parameter to the grid
grid_new = dict(grid)
grid_new.update({param_name: param})
# remove the parameter from param_grid
param_grid_new = dict(param_grid)
param_grid_new.pop(param_name)
# resume with the next parameter
add_next_param_from_list(param_grid_new, grid_new, grid_df)
else:
# add sample to grid_df
grid_df.loc[len(grid_df.index)] = [grid]
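
# Illustrative usage (hypothetical parameter names): the recursion above expands
# {"p1": [1, 2], "p2": [0, 5]} into the four combinations
# {"p1": 1, "p2": 0}, {"p1": 1, "p2": 5}, {"p1": 2, "p2": 0}, {"p1": 2, "p2": 5},
# appending each combination as one row of the single-column dataframe:
#   grid_df = pd.DataFrame(columns=["params"])
#   add_next_param_from_list({"p1": [1, 2], "p2": [0, 5]}, {}, grid_df)
#   # grid_df now contains four rows, one dict per grid point
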
def add_references_and_check_constraints(
grid_df_prior, grid_df, referenced_params, config, task_name
):
"""
in the last step all parameters which are referenced need to be add to the
grid. All gridpoints not satisfying the constraints are removed afterwards.
"""
for dictio in grid_df_prior["params"]:
for key, val in dictio.items():
exec(f"{key} = val")
# add referenced params
for rev_param, val in referenced_params.items():
val = eval(val)
dictio.update({rev_param: val})
exec(f"{rev_param} = val")
# check constraints
if "hyperparameters" in config.keys():
constraints = config["hyperparameters"].get("constraints", None)
else:
constraints = config.get("constraints", None)
if constraints is not None:
accepted = True
for constr in constraints:
if not eval(constr):
accepted = False
if accepted:
grid_df.loc[len(grid_df.index)] = [task_name, config["model"], dictio]
else:
grid_df.loc[len(grid_df.index)] = [task_name, config["model"], dictio]
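
# Illustrative example (hypothetical values): with referenced_params = {"p2": "2 * p1"}
# and a constraint list ["p2 <= 6"] in the config, a prior grid point {"p1": 4} is first
# extended to {"p1": 4, "p2": 8} by evaluating the reference expression and is then
# dropped because the constraint evaluates to False.
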
def sample_grid(param_config):
"""
given the parameter config, this function samples all parameters which are distributed
according the the categorical, uniform, loguniform, normal or lognormal distribution.
"""
    # sample categorical parameter
if param_config["distribution"] == "categorical":
param_grid = sampling.CategoricalHyperparameter("", param_config).allowed_values
# sample uniform parameter
elif param_config["distribution"] == "uniform":
param_grid = uniform_grid(param_config)
# sample loguniform parameter
elif param_config["distribution"] == "loguniform":
param_grid = loguniform_grid(param_config)
# sample normal parameter
elif param_config["distribution"] == "normal":
param_grid = normal_grid(param_config)
# sample lognormal parameter
elif param_config["distribution"] == "lognormal":
param_grid = lognormal_grid(param_config)
else:
raise RuntimeError(
f'distribution "{param_config["distribution"]}" not '
f"implemented use a distribution from "
f"[categorical, uniform, loguniform, normal, lognormal]"
)
    # ensure that the grid has the correct datatype
    # (only check for int, otherwise float is used)
if "datatype" in param_config.keys():
if param_config["datatype"] == "int":
param_grid = np.array(param_grid)
param_grid = param_grid.astype(int)
            # NOTE: converting int to float will cause errors for VAE, so avoid
            # doing it here
return param_grid
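
# Illustrative parameter config (hypothetical values):
#   sample_grid({"distribution": "lognormal", "num": 3, "mean": 0.0, "std": 1.0, "datatype": "int"})
# builds the three-point log-normal grid from above and casts it to integers.
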
def build_param_grid_of_shared_params(shared_df):
"""
go back from the data frame format of the shared hyperparamters to a list format
"""
if shared_df is None:
return None
shared_grid = {}
for key in shared_df["params"].iloc[0].keys():
grid_points = []
for i in shared_df["params"].keys():
grid_points.append(shared_df["params"][i][key])
shared_grid[key] = np.array(grid_points)
return shared_grid
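
# Illustrative example (hypothetical values): a shared dataframe whose "params" column
# holds [{"gamma": 1.0}, {"gamma": 10.0}] is turned back into {"gamma": np.array([1.0, 10.0])}.
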
def rais_error_if_num_not_specified(param_name: str, param_config: dict):
"""
for each parameter a number of grid points needs to be specified
This function raises an error if this is not the case
param_name: parameter name under consideration
param_config: config of this parameter
"""
if param_name == g_name_num_shared_param_samples_rand_search:
raise RuntimeError(f"{g_name_num_shared_param_samples_rand_search} only for random search!")
    if param_name != "constraints":
        if (
            "num" not in param_config.keys()
            and "reference" not in param_config.keys()
            and param_config["distribution"] != "categorical"
        ):
raise RuntimeError(
f"the number of parameters in the grid direction "
f"of {param_name} needs to be specified"
)
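
# Illustrative example (hypothetical values):
#   rais_error_if_num_not_specified("lr", {"distribution": "uniform"})
# raises, because neither "num" nor a "reference" is given and the distribution is
# not categorical.
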
def add_shared_params_to_param_grids(shared_df, dict_param_grids, config):
"""
use the parameters in the dataframe of shared parameters and add them
to the dictionary of parameters for the current task
only the shared parameters specified in the config are respected
shared_df: Dataframe of shared hyperparameters
dict_param_grids: dictionary of the parameter grids
config: config for the current task
"""
dict_shared_grid = build_param_grid_of_shared_params(shared_df)
if "shared" in config.keys():
list_names = config["shared"]
dict_shared_grid = {key: dict_shared_grid[key] for key in config["shared"]}
if dict_shared_grid is not None:
for key in dict_shared_grid.keys():
dict_param_grids[key] = dict_shared_grid[key]
return dict_param_grids
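
# Illustrative example (hypothetical names): with a shared grid {"gamma": np.array([1., 10.])}
# and a task config containing "shared": ["gamma"], the key "gamma" is copied into
# dict_param_grids and later expanded together with the task-specific parameters by
# add_next_param_from_list.
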
def grid_task(
grid_df: pd.DataFrame, task_name: str, config: dict, shared_df: pd.DataFrame
):
"""create grid for one sampling task for a method and add it to the dataframe"""
if "hyperparameters" in config.keys():
dict_param_grids = {}
referenced_params = {}
for param_name in config["hyperparameters"].keys():
param_config = config["hyperparameters"][param_name]
rais_error_if_num_not_specified(param_name, param_config)
# constraints are not parameters
if not param_name == "constraints":
                # remember all parameters which are referenced
if "datatype" not in param_config.keys():
warnings.warn(
f"datatype not specified in {param_config} \
for {param_name}, take float as default"
)
param_config["datatype"] = "float"
if "reference" in param_config.keys():
referenced_params.update({param_name: param_config["reference"]})
                # build the grid for all remaining (non-referenced) parameters
                else:
dict_param_grids.update({param_name: sample_grid(param_config)})
# create the grid from the individual parameter grids
# constraints are not respected in this step
grid_df_prior = pd.DataFrame(columns=["params"])
# add shared parameters to dict_param_grids
dict_param_grids = add_shared_params_to_param_grids(
shared_df, dict_param_grids, config
)
add_next_param_from_list(dict_param_grids, {}, grid_df_prior)
# add referenced params and check constraints
add_references_and_check_constraints(
grid_df_prior, grid_df, referenced_params, config, task_name
)
if grid_df[grid_df[G_MODEL_NA] == config["model"]].shape[0] == 0:
raise RuntimeError(
"No valid value found for this grid spacing, refine grid"
)
return grid_df
elif "shared" in config.keys():
shared_grid = shared_df.copy()
shared_grid[G_MODEL_NA] = config["model"]
shared_grid[G_METHOD_NA] = task_name
if "constraints" in config.keys():
config["hyperparameters"] = {"constraints": config["constraints"]}
add_references_and_check_constraints(
shared_grid, grid_df, {}, config, task_name
)
return grid_df
else:
# add single line if no varying hyperparameters are specified.
grid_df.loc[len(grid_df.index)] = [task_name, config["model"], {}]
return grid_df
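
# Illustrative task config (hypothetical names and values) as it would appear for one
# method in the benchmark yaml after parsing:
#   config = {
#       "model": "erm",
#       "hyperparameters": {
#           "lr": {"distribution": "lognormal", "mean": -3, "std": 1, "num": 3, "datatype": "float"},
#           "constraints": ["lr < 1"],
#       },
#   }
#   grid_df = grid_task(
#       pd.DataFrame(columns=[G_METHOD_NA, G_MODEL_NA, "params"]), "task_erm", config, None
#   )
#   # grid_df then holds three rows, one per sampled learning rate
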
def sample_gridsearch(config: dict, dest: str = None) -> pd.DataFrame:
"""
create the hyperparameters grid according to the given
config, which should be the dictionary of the full
benchmark config yaml.
Result is saved to 'output_dir/hyperparameters.csv' of the
config if not specified explicitly.
Note: Parts of the yaml content are executed. Thus use this
only with trusted config files.
"""
if dest is None:
dest = config["output_dir"] + os.sep + "hyperparameters.csv"
logger = Logger.get_logger()
samples = pd.DataFrame(columns=[G_METHOD_NA, G_MODEL_NA, "params"])
shared_samples_full = pd.DataFrame(columns=[G_METHOD_NA, G_MODEL_NA, "params"])
if "Shared params" in config.keys():
shared_val = {"model": "all", "hyperparameters": config["Shared params"]}
        # fill up the dataframe of shared samples
shared_samples_full = grid_task(shared_samples_full, "all", shared_val, None)
else:
shared_samples_full = None
for key, val in config.items():
if sampling.is_dict_with_key(val, "model"):
if shared_samples_full is not None:
shared_samples = shared_samples_full.copy(deep=True)
if "shared" in val.keys():
shared = val["shared"]
else:
shared = []
for line_num in range(shared_samples.shape[0]):
hyper_p_dict = shared_samples.iloc[line_num]["params"].copy()
key_list = copy.deepcopy(list(hyper_p_dict.keys()))
if not all(x in key_list for x in shared):
raise RuntimeError(
f"shared keys: {shared} not included in global shared keys {key_list}"
)
for key_ in key_list:
if key_ not in shared:
del hyper_p_dict[key_]
shared_samples.iloc[line_num]["params"] = hyper_p_dict
# remove all duplicates
shared_samples = shared_samples.drop_duplicates(subset="params")
else:
shared_samples = None
samples = grid_task(samples, key, val, shared_samples)
logger.info(
f"number of gridpoints for {key} : "
f'{samples[samples[G_MODEL_NA] == val["model"]].shape[0]}'
)
os.makedirs(os.path.dirname(dest), exist_ok=True)
logger.info(f"number of total sampled gridpoints: {samples.shape[0]}")
samples.to_csv(dest)
# create a txt file with the commit information
with open(
config["output_dir"] + os.sep + "commit.txt", "w", encoding="utf8"
) as file:
file.writelines("use git log |grep \n")
file.writelines("consider remove leading b in the line below \n")
file.write(get_git_tag())
with open(
config["output_dir"] + os.sep + "config.txt", "w", encoding="utf8"
) as file:
json.dump(config, file)
return samples
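
# Illustrative end-to-end sketch (hypothetical config; in the benchmark this dictionary
# comes from parsing the full benchmark yaml):
#   config = {
#       "output_dir": "zoutput/benchmarks/demo",
#       "task_erm": {
#           "model": "erm",
#           "hyperparameters": {
#               "lr": {"distribution": "lognormal", "mean": -3, "std": 1, "num": 3, "datatype": "float"},
#           },
#       },
#   }
#   df_grid = sample_gridsearch(config)
#   # writes hyperparameters.csv, commit.txt and config.txt into output_dir and
#   # returns the dataframe with one row per (method, model, params) grid point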