Coverage for causalspyne/dag_manipulator.py: 95%

40 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-02-19 14:58 +0000

1""" 

2insert edges to have more confounders 

3""" 

4import numpy as np 

5 

6 

7class DAGManipulator: 

8 """ 

9 insert edges to have more confounders 

10 """ 

11 def __init__(self, dag, obj_gen_weight, rng): 

12 self.dag = dag 

13 self._obj_gen_weight = obj_gen_weight 

14 self.rng = rng 

15 

16 def mk_confound(self, ind_arbitrary_confound_input=None, 

17 continue_top_up=True, 

18 skip_sink=False): 

19 """ 

20 ind_arbitrary_confound_input is the arbitrary index for a node, if 

21 does not set as argument, a random index will be generated, the 

22 function make the current vertex confounder 

23 continue_top_up: if not successful, move up toplogically to try again 

24 skip_sink: if a node is sink node (can be a high toplogical order), 

25 then does not add edge to other nodes with lower toplogical order 

26 """ 

27 ind_arbitrary_confound = ind_arbitrary_confound_input 

28 if ind_arbitrary_confound is None: 

29 ind_arbitrary_confound = \ 

30 self.rng.choice(self.dag.list_ind_nodes_sorted) 

31 # list_ind_nodes_sorted looks like [8, 3, 10, 9, 100], 

32 # each entry is an arbitrary index/name for the node 

33 # rembmer (i,j) entry means there is arrow from j to i 

34 # count how many children this current node has 

35 nnzero = np.count_nonzero( 

36 self.dag.mat_adjacency[:, ind_arbitrary_confound]) 

37 flag_success = False 

38 if nnzero == 0: # 0 means sink node 

39 # sink node can also be quite high in toplogical rank 

40 if not skip_sink: 

41 flag_success_1 = self.add_new_edge(ind_arbitrary_confound) 

42 flag_success_2 = self.add_new_edge(ind_arbitrary_confound) 

43 flag_success = flag_success_1 & flag_success_2 

44 elif nnzero == 1: # not a sink node but only parent a single child 

45 # add another child to this current node 

46 flag_success = self.add_new_edge(ind_arbitrary_confound) 

47 # not successul in making this node to have two children 

48 else: # already a confounder 

49 flag_success = False # only when new confounder 

50 if flag_success: 

51 return True 

52 # now flag_success must be False 

53 if continue_top_up: 

54 # increase the toplogical order and try until success 

55 pos = self.dag.global_arbitrary_ind2topind(ind_arbitrary_confound) 

56 if pos - 1 < 0: 

57 return False 

58 ind_arbitrary = self.dag.top_ind2global_arbitrary(pos - 1) 

59 return self.mk_confound( 

60 ind_arbitrary_confound_input=ind_arbitrary) 

61 return False 

62 

63 def add_new_edge(self, ind_arbitrary_confound): 

64 """ 

65 insert a new edge pointing to lower toplogical order 

66 """ 

67 # get toplogical order of this node 

68 pos = self.dag.global_arbitrary_ind2topind(ind_arbitrary_confound) 

69 # randomly choose one node which ranked later than the 

70 # current node 

71 if pos + 1 == self.dag.num_nodes: 

72 return False 

73 ind_toplogical_arrow_head = self.rng.integers( 

74 pos + 1, self.dag.num_nodes) 

75 ind_arbitrary_arrow_head = self.dag.top_ind2global_arbitrary( 

76 ind_toplogical_arrow_head) 

77 

78 if ( 

79 self.dag.mat_adjacency[ 

80 ind_arbitrary_arrow_head, ind_arbitrary_confound] 

81 == 0 

82 ): 

83 self.dag.mat_adjacency[ 

84 ind_arbitrary_arrow_head, ind_arbitrary_confound 

85 ] = self._obj_gen_weight.gen(1) 

86 self.dag.check() # check if still a DAG 

87 return True 

88 # in very rare cases, the randomly chosen node is already a 

89 # child of this current node 

90 return False