Coverage for src/causalspyne/dataset.py: 100%

71 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-05-15 16:30 +0000

1import numpy as np 

2from numpy.random import default_rng 

3import matplotlib.pyplot as plt 

4import matplotlib.colors as mcolors 

5from matplotlib.lines import Line2D 

6 

7from causalspyne.dag_interface import MatDAG 

8from causalspyne.noise_idiosyncratic import Idiosyncratic 

9from causalspyne.data_gen import DataGen 

10 

11 

def simpson(size_sample=200, p=0.2,
            confounder_effect: float = -5,
            treatment_effect: float = 0.1,
            propensity: float = 3,
            std: float = 1.5):
    """Generate a confounded three-variable sample for Simpson's paradox.

    The causal graph is V0 -> V1, V0 -> V2 and V1 -> V2, where V0 is the
    confounder (scenario), V1 the treatment and V2 the effect.

    Args:
        size_sample: forwarded to ``DataGen.gen``; presumably the number of
            rows to draw (confirm against ``DataGen``).
        p: parameter ``p`` of the Bernoulli noise attached to V0.
        confounder_effect: weight of the edge V0 -> V2.
        treatment_effect: weight of the edge V1 -> V2.
        propensity: weight of the edge V0 -> V1.
        std: handed to ``DataGen`` as ``dict_params["std"]``; presumably the
            noise scale of the remaining variables (confirm against
            ``DataGen``).

    Returns:
        Tuple ``(scenario, treatment, effect)``: columns 0, 1 and 2 of the
        generated array.
    """
    # Row i holds the weights of the edges pointing into Vi; column j is the
    # source variable Vj.
    into_v0 = [0, 0, 0]  # V0: confounder, root variable
    into_v1 = [propensity, 0, 0]  # V0 -> V1: propensity of getting treatment
    into_v2 = [confounder_effect, treatment_effect, 0]  # V0 -> V2, V1 -> V2
    weights = np.array([into_v0, into_v1, into_v2])

    graph = MatDAG(weights, name_prefix="V", rng=default_rng())

    # The confounder is binary: it gets Bernoulli(p) noise rather than the
    # generator's default noise.
    bernoulli_root = Idiosyncratic(
        class_name="Bernoulli",
        dict_params={"p": p},
        rng=default_rng(),
    )

    generator = DataGen(
        graph,
        edge_model=None,
        dict_params={"std": std},
        idiosynchratic={0: bernoulli_root},
    )

    samples = generator.gen(size_sample)
    # Column order follows the variable order V0, V1, V2.
    return samples[:, 0], samples[:, 1], samples[:, 2]

45 

46 

def _boxplot_by_treatment(ax, values, treatment_flags, levels, title, xlabel):
    """Draw one box of *values* per treatment level on *ax* and label it."""
    groups = [values[treatment_flags == level] for level in levels]
    ax.boxplot(groups, tick_labels=levels)
    ax.set_title(title)
    ax.set_xlabel(xlabel)
    ax.set_ylabel('performance')


def visualize_simpson(scenario, treatment, effect,
                      na_treatment="algorithm", na_confounder="scenario",
                      cut_off=0.75,
                      fname_jitter="simpson_jitter.pdf"):
    """Plot Simpson's paradox as a scatter plot and grouped box plots.

    The continuous ``treatment`` is binarised: samples strictly above its
    ``cut_off`` quantile get level 1, all others level 0.  ``effect`` is
    min-max scaled to [0, 1] before plotting.

    Two figures are created:

    * a scatter plot of treatment against scaled effect, coloured by the
      binarised treatment and with one marker shape per scenario; it is
      saved to ``fname_jitter`` but not returned (it stays open in pyplot);
    * a 2x2 grid of box plots of the scaled effect per treatment level:
      top right for all samples, bottom left and bottom right for the two
      scenarios (the top-left panel is left empty).

    Args:
        scenario: 1-D array-like with exactly two distinct values.
        treatment: 1-D array-like of continuous treatment values.
        effect: 1-D array-like of outcome values.
        na_treatment: display name of the treatment in labels.
        na_confounder: display name of the confounder in labels.
        cut_off: quantile of ``treatment`` used as binarisation threshold.
        fname_jitter: file the scatter plot is saved to; ``None`` skips
            saving.

    Returns:
        The matplotlib figure holding the box plots.

    Raises:
        IndexError: if ``scenario`` has fewer than two distinct values.
        KeyError: if ``scenario`` has more than two distinct values.
    """
    scenario = np.asarray(scenario)
    x = np.asarray(treatment)
    effect = np.asarray(effect)

    # Min-max scale the effect.  A constant effect has zero range; map it to
    # zeros instead of dividing 0 by 0 (which would fill the plots with NaN).
    y_min = np.min(effect)
    y_range = np.max(effect) - y_min
    if y_range == 0:
        y = np.zeros_like(effect, dtype=float)
    else:
        y = (effect - y_min) / y_range

    ints_scenarios = np.unique(scenario)
    # Exactly two scenarios are supported: one marker shape each.
    marker_map = {ints_scenarios[0]: 'o', ints_scenarios[1]: 's'}

    # One global threshold is applied to both scenarios, so a single mask is
    # enough to binarise the treatment.
    threshold = np.quantile(x, cut_off)
    discrete_treatment = np.zeros_like(x, dtype=int)
    discrete_treatment[x > threshold] = 1

    # The levels are fixed rather than read from the data so that labels and
    # colour-bar ticks still exist when no sample exceeds the threshold
    # (e.g. cut_off=1.0).
    treatment_levels = np.array([0, 1])

    cmap = mcolors.ListedColormap(['green', 'orange'])
    # Boundaries separate the two values: 0 and 1
    bounds = [-0.5, 0.5, 1.5]
    norm = mcolors.BoundaryNorm(bounds, cmap.N)

    fig_scatter, ax = plt.subplots()
    for ind_scenario in ints_scenarios:
        in_scenario = scenario == ind_scenario
        scatter = ax.scatter(
            x[in_scenario], y[in_scenario],
            c=discrete_treatment[in_scenario],
            marker=marker_map[ind_scenario],
            edgecolor='k',
            label=f'scenario {ind_scenario}',
            cmap=cmap, norm=norm, s=100)

    ax.set_xlabel(f'jittered {na_treatment} w.r.t. {na_confounder}')
    ax.set_ylabel('performance')
    ax.tick_params(axis='x', labelbottom=False)
    ax.tick_params(axis='y', labelleft=True)

    # All scatter calls share cmap and norm, so the last one can drive the
    # colour bar for the whole axes.
    cbar = fig_scatter.colorbar(scatter, boundaries=bounds,
                                ticks=[treatment_levels[0],
                                       treatment_levels[1]])
    cbar.ax.set_yticklabels([f'{na_treatment} {treatment_levels[0]}',
                             f'{na_treatment} {treatment_levels[1]}'])

    # Colour-free proxy handles: the legend explains marker shape only,
    # colour is explained by the colour bar.
    proxy_o = Line2D([0], [0], marker='o', color='black', linestyle='None',
                     markerfacecolor='none')
    proxy_s = Line2D([0], [0], marker='s', color='black', linestyle='None',
                     markerfacecolor='none')
    ax.legend(title=f'{na_confounder}', handles=[proxy_o, proxy_s],
              labels=[f'{na_confounder} {ints_scenarios[0]}',
                      f'{na_confounder} {ints_scenarios[1]}'])
    ax.set_title("jittered scatter plot")
    if fname_jitter is not None:
        fig_scatter.savefig(fname_jitter)

    fig, axs = plt.subplots(2, 2)
    # Top right: both scenarios pooled.
    _boxplot_by_treatment(
        axs[0, 1], y, discrete_treatment, treatment_levels,
        title=f"{na_confounder}s combined", xlabel=f'{na_treatment}')
    # Bottom row: one panel per scenario.
    for ax_scenario, ind_scenario in zip(axs[1], ints_scenarios):
        in_scenario = scenario == ind_scenario
        _boxplot_by_treatment(
            ax_scenario, y[in_scenario], discrete_treatment[in_scenario],
            treatment_levels,
            title=f"{na_confounder} {ind_scenario}",
            xlabel=f'{na_treatment}')

    fig.suptitle('simpson treatment effect')
    fig.tight_layout()
    return fig