Coverage for src/causalspyne/dataset.py: 100%
71 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-05-15 16:30 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-05-15 16:30 +0000
1import numpy as np
2from numpy.random import default_rng
3import matplotlib.pyplot as plt
4import matplotlib.colors as mcolors
5from matplotlib.lines import Line2D
7from causalspyne.dag_interface import MatDAG
8from causalspyne.noise_idiosyncratic import Idiosyncratic
9from causalspyne.data_gen import DataGen
def simpson(size_sample=200, p=0.2,
            confounder_effect: float = -5,
            treatment_effect: float = 0.1,
            propensity: float = 3,
            std: float = 1.5):
    """Generate samples from a three-node DAG with one confounder.

    The weighted adjacency matrix encodes the edges V0 -> V1 (weight
    ``propensity``), V0 -> V2 (weight ``confounder_effect``) and
    V1 -> V2 (weight ``treatment_effect``).  Node 0 is given an
    ``Idiosyncratic`` noise object built with ``class_name="Bernoulli"``
    and parameter ``p``; ``std`` is forwarded to ``DataGen`` through its
    ``dict_params`` argument.

    Args:
        size_sample: value passed to ``DataGen.gen``.
        p: ``p`` parameter of the Bernoulli noise attached to node 0.
        confounder_effect: weight of the edge V0 -> V2.
        treatment_effect: weight of the edge V1 -> V2.
        propensity: weight of the edge V0 -> V1.
        std: value stored under the key ``"std"`` for ``DataGen``.

    Returns:
        Tuple ``(scenario, treatment, effect)``: columns 0, 1 and 2 of
        the array returned by ``DataGen.gen``.
    """
    # Row i lists the incoming edge weights of node Vi; column j is the
    # source node Vj.
    row_confounder = [0, 0, 0]  # V0: root variable, no parents
    row_treatment = [propensity, 0, 0]  # V1: V0 -> V1
    row_effect = [confounder_effect, treatment_effect, 0]  # V2: V0 -> V2, V1 -> V2
    weights = np.array([row_confounder, row_treatment, row_effect])

    graph = MatDAG(weights, name_prefix="V", rng=default_rng())

    noise_root = Idiosyncratic(
        class_name="Bernoulli",
        dict_params={"p": p},
        rng=default_rng(),
    )

    generator = DataGen(
        graph,
        edge_model=None,
        dict_params={"std": std},
        idiosynchratic={0: noise_root},
    )

    samples = generator.gen(size_sample)
    # Column order follows the node order: confounder, treatment, effect.
    return samples[:, 0], samples[:, 1], samples[:, 2]
def _boxplot_by_treatment(ax, performance, discrete_treatment,
                          ints_treatment, title, na_treatment):
    """Draw one box of *performance* per treatment group on *ax*."""
    grouped = [performance[discrete_treatment == g] for g in ints_treatment]
    ax.boxplot(grouped, tick_labels=ints_treatment)
    ax.set_title(title)
    ax.set_xlabel(f'{na_treatment}')
    ax.set_ylabel('performance')


def visualize_simpson(scenario, treatment, effect,
                      na_treatment="algorithm", na_confounder="scenario",
                      cut_off=0.75, path_jitter="simpson_jitter.pdf"):
    """Plot treatment vs. effect, overall and split by scenario.

    Two figures are created.  The first is a scatter plot of the
    continuous treatment against the min-max scaled effect, coloured by
    the discretised treatment and with one marker shape per scenario; it
    is written to ``path_jitter``.  The second is a 2x2 grid holding a
    boxplot of the scaled effect per discretised treatment for all data
    combined and for each of the two scenarios.

    Args:
        scenario: 1-D array of confounder values.  The code indexes the
            first two unique values and maps markers only for those, so
            exactly two distinct values are expected (fewer raises
            ``IndexError``, more raises ``KeyError``).
        treatment: 1-D array of continuous treatment values.
        effect: 1-D array of outcome values; rescaled to [0, 1] here.
        na_treatment: display name of the treatment used in labels.
        na_confounder: display name of the confounder used in labels.
        cut_off: quantile of ``treatment`` above which a sample is
            assigned discrete treatment 1 (otherwise 0).  Both discrete
            levels must occur, otherwise the colorbar ticks raise
            ``IndexError``.
        path_jitter: where the scatter figure is saved; ``None`` skips
            saving.

    Returns:
        The matplotlib figure containing the 2x2 boxplot grid.
    """
    x = treatment
    y_min = np.min(effect)
    # Min-max scaling; a constant ``effect`` yields NaN (0 / 0).
    y = (effect - y_min) / (np.max(effect) - y_min)

    ints_scenarios = np.unique(scenario)

    # One global threshold for every scenario: samples strictly above the
    # ``cut_off`` quantile get discrete treatment 1, all others 0.
    threshold = np.quantile(x, cut_off)
    discrete_treatment = (x > threshold).astype(int)
    ints_treatment = np.unique(discrete_treatment)

    colors = ['green', 'orange']
    cmap = mcolors.ListedColormap(colors)
    # Boundaries separate the two values: 0 and 1
    bounds = [-0.5, 0.5, 1.5]
    norm = mcolors.BoundaryNorm(bounds, cmap.N)
    marker_map = {ints_scenarios[0]: 'o', ints_scenarios[1]: 's'}

    fig_jitter, ax = plt.subplots()
    for ind_scenario in ints_scenarios:
        idx = np.where(scenario == ind_scenario)
        # Every call shares cmap/norm, so the last returned mappable is
        # sufficient for the colorbar below.
        scatter = ax.scatter(
            x[idx], y[idx], c=discrete_treatment[idx],
            marker=marker_map[ind_scenario],
            edgecolor='k',
            label=f'scenario {ind_scenario}',
            cmap=cmap, norm=norm, s=100)

    ax.set_xlabel(f'jittered {na_treatment} w.r.t. {na_confounder}')
    ax.set_ylabel('performance')
    ax.tick_params(axis='x', labelbottom=False)
    ax.tick_params(axis='y', labelleft=True)
    cbar = fig_jitter.colorbar(
        scatter, boundaries=bounds,
        ticks=[ints_treatment[0], ints_treatment[1]])
    cbar.ax.set_yticklabels([f'{na_treatment} {ints_treatment[0]}',
                             f'{na_treatment} {ints_treatment[1]}'])
    # Colour-free proxy handles so the legend shows marker shape only.
    proxy_o = Line2D([0], [0], marker='o', color='black', linestyle='None',
                     markerfacecolor='none')
    proxy_s = Line2D([0], [0], marker='s', color='black', linestyle='None',
                     markerfacecolor='none')
    ax.legend(title=f'{na_confounder}', handles=[proxy_o, proxy_s],
              labels=[f'{na_confounder} {ints_scenarios[0]}',
                      f'{na_confounder} {ints_scenarios[1]}'])
    ax.set_title("jittered scatter plot")
    if path_jitter is not None:
        fig_jitter.savefig(path_jitter)
    # NOTE: the scatter figure is not closed here, so it stays registered
    # with pyplot until the caller closes it.

    fig, axs = plt.subplots(2, 2)
    # NOTE: axs[0, 0] is not drawn on.
    _boxplot_by_treatment(
        axs[0, 1], y, discrete_treatment, ints_treatment,
        f"{na_confounder}s combined", na_treatment)
    for panel, ind_scenario in zip((axs[1, 0], axs[1, 1]),
                                   ints_scenarios[:2]):
        in_scenario = scenario == ind_scenario
        _boxplot_by_treatment(
            panel, y[in_scenario], discrete_treatment[in_scenario],
            ints_treatment, f"{na_confounder} {ind_scenario}", na_treatment)

    fig.suptitle('simpson treatment effect')
    fig.tight_layout()
    return fig