Coverage for causalspyne/dag_manipulator.py: 95%
40 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-19 14:58 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-19 14:58 +0000
1"""
2insert edges to have more confounders
3"""
4import numpy as np
7class DAGManipulator:
8 """
9 insert edges to have more confounders
10 """
11 def __init__(self, dag, obj_gen_weight, rng):
12 self.dag = dag
13 self._obj_gen_weight = obj_gen_weight
14 self.rng = rng
16 def mk_confound(self, ind_arbitrary_confound_input=None,
17 continue_top_up=True,
18 skip_sink=False):
19 """
20 ind_arbitrary_confound_input is the arbitrary index for a node, if
21 does not set as argument, a random index will be generated, the
22 function make the current vertex confounder
23 continue_top_up: if not successful, move up toplogically to try again
24 skip_sink: if a node is sink node (can be a high toplogical order),
25 then does not add edge to other nodes with lower toplogical order
26 """
27 ind_arbitrary_confound = ind_arbitrary_confound_input
28 if ind_arbitrary_confound is None:
29 ind_arbitrary_confound = \
30 self.rng.choice(self.dag.list_ind_nodes_sorted)
31 # list_ind_nodes_sorted looks like [8, 3, 10, 9, 100],
32 # each entry is an arbitrary index/name for the node
33 # rembmer (i,j) entry means there is arrow from j to i
34 # count how many children this current node has
35 nnzero = np.count_nonzero(
36 self.dag.mat_adjacency[:, ind_arbitrary_confound])
37 flag_success = False
38 if nnzero == 0: # 0 means sink node
39 # sink node can also be quite high in toplogical rank
40 if not skip_sink:
41 flag_success_1 = self.add_new_edge(ind_arbitrary_confound)
42 flag_success_2 = self.add_new_edge(ind_arbitrary_confound)
43 flag_success = flag_success_1 & flag_success_2
44 elif nnzero == 1: # not a sink node but only parent a single child
45 # add another child to this current node
46 flag_success = self.add_new_edge(ind_arbitrary_confound)
47 # not successul in making this node to have two children
48 else: # already a confounder
49 flag_success = False # only when new confounder
50 if flag_success:
51 return True
52 # now flag_success must be False
53 if continue_top_up:
54 # increase the toplogical order and try until success
55 pos = self.dag.global_arbitrary_ind2topind(ind_arbitrary_confound)
56 if pos - 1 < 0:
57 return False
58 ind_arbitrary = self.dag.top_ind2global_arbitrary(pos - 1)
59 return self.mk_confound(
60 ind_arbitrary_confound_input=ind_arbitrary)
61 return False
63 def add_new_edge(self, ind_arbitrary_confound):
64 """
65 insert a new edge pointing to lower toplogical order
66 """
67 # get toplogical order of this node
68 pos = self.dag.global_arbitrary_ind2topind(ind_arbitrary_confound)
69 # randomly choose one node which ranked later than the
70 # current node
71 if pos + 1 == self.dag.num_nodes:
72 return False
73 ind_toplogical_arrow_head = self.rng.integers(
74 pos + 1, self.dag.num_nodes)
75 ind_arbitrary_arrow_head = self.dag.top_ind2global_arbitrary(
76 ind_toplogical_arrow_head)
78 if (
79 self.dag.mat_adjacency[
80 ind_arbitrary_arrow_head, ind_arbitrary_confound]
81 == 0
82 ):
83 self.dag.mat_adjacency[
84 ind_arbitrary_arrow_head, ind_arbitrary_confound
85 ] = self._obj_gen_weight.gen(1)
86 self.dag.check() # check if still a DAG
87 return True
88 # in very rare cases, the randomly chosen node is already a
89 # child of this current node
90 return False