Skip to content

Temporal Graph

This class is used to get the temporal graph of a word.

Methods:

Name Description
add_graph

(self, target_word: str, level: int, k: int, c: int, dataset: List[str], word2vec_model: Word2VecInference, mlm_model: Union[RobertaInference, BertInference]) This method is used to add a snapshot to the temporal graph.

get_aligned_graph

(self, current_graph: dict, previous_graph: dict) -> (dict, dict) This method is used to align the nodes of the current snapshot with the nodes of the previous snapshot.

label_previous_graph

(self, current_graph: dict, previous_graph: dict, label_feature_idx: int = 1) -> (np.ndarray, np.ndarray) This method is used to label the edges of the previous snapshot with the edge feature values in the current snapshot.

Source code in semantics/graphs/temporal_graph.py
class TemporalGraph:
    """
    This class is used to get the temporal graph of a word.

    The graph is stored as parallel lists (one entry per snapshot). Every time
    a new snapshot is added, it is aligned with the previous one so that both
    share the same node index, and the previous snapshot is labeled with the
    edge feature values observed in the new one.

    methods:
        __init__(self)
            The constructor of the TemporalGraph class.
        __getitem__(self, idx)
            Retrieves the snapshot at the specified index.
        add_graph(self, target_word: str, level: int, k: int, c: int, dataset: List[str], word2vec_model: Word2VecInference, mlm_model: Union[RobertaInference, BertInference])
            This method is used to add a snapshot to the temporal graph.
        construct_graph(self, current_index, current_node_feature_matrix, current_embeddings, current_edge_index, current_edge_feature_matrix)
            This method is used to construct the temporal graph.
        get_aligned_graph(self, current_graph: dict, previous_graph: dict) -> (dict, dict)
            This method is used to align the nodes of the current snapshot with the nodes of the previous snapshot.
        label_previous_graph(self, current_graph: dict, previous_graph: dict, label_feature_idx: int = 1) -> (np.ndarray, np.ndarray)
            This method is used to label the edges of the previous snapshot with the edge feature values in the current snapshot.
    """
    def __init__(
            self
            ):

        """
        Attributes:
            snapshots (List[dict]): the snapshots of the temporal graph. Each snapshot is a dictionary containing the index of the nodes of the snapshot.
            xs (List[np.ndarray]): the features of the nodes of the temporal graph.
            edge_indices (List[np.ndarray]): the edge index of the temporal graph.
            edge_features (List[np.ndarray]): the edge features of the temporal graph.
            ys (List[np.ndarray]): the labels of the edges of the temporal graph.
            y_indices (List[np.ndarray]): the indices of the labels of the edges of the temporal graph.

        """

        self.snapshots = []
        self.xs = []
        self.edge_indices = []
        self.edge_features = []
        self.ys = []
        self.y_indices = []

    def __getitem__(self, idx):
        """
        Retrieves the snapshot at the specified index.

        Parameters:
            idx (int): Index of the item to retrieve.

        Returns:
            snapshot (dict): the graph data at the specified index.
            node_features (np.ndarray): the features of the nodes of the graph at the specified index.
            edge_index (np.ndarray): the edge index of the graph at the specified index.
            edge_feature (np.ndarray): the edge features of the graph at the specified index.
            labels (np.ndarray): the labels of the edges of the graph at the specified index.
            labels_mask (np.ndarray): the indices of the labels of the edges of the graph at the specified index.
        """
        # Collect every per-snapshot component stored at the requested position
        snapshot = self.snapshots[idx]
        node_features = self.xs[idx]
        edge_index = self.edge_indices[idx]
        edge_feature = self.edge_features[idx]
        labels = self.ys[idx]
        labels_mask = self.y_indices[idx]

        return snapshot, node_features, edge_index, edge_feature, labels, labels_mask


    def add_graph(
            self,
            target_word: str, 
            level: int, 
            k: int, 
            c: int,
            dataset: List[str], 
            word2vec_model: Word2VecInference, 
            mlm_model: Union[RobertaInference, BertInference]
            ) -> None:
        """
        This method is used to add a snapshot to the temporal graph.

        The nodes and their features are obtained from ``Nodes``, the edges and
        their features from ``Edges``; the result is handed to
        ``construct_graph`` which aligns and stores the snapshot.

        Args:
            target_word (str): the word to get the nodes for
            level (int): the level of the graph to get
            k (int): the number of similar nodes to get for each occurrence of the target word
            c (int): the number of context nodes to get for the target word
            dataset (List[str]): the sentences to get the nodes from
            word2vec_model (Word2VecInference): the word2vec model's Inference class
            mlm_model (RobertaInference, BertInference): the MLM model's Inference class

        Examples:
            >>> word2vec = Word2VecInference('word2vec.model')
            >>> mlm = RobertaInference('MLM_roberta')
            >>> tg = TemporalGraph()
            >>> tg.add_graph(target_word='sentence', level=3, k=2, c=2, dataset=['this is a sentence', 'this is another sentence'], word2vec_model = word2vec, mlm_model = mlm)
            >>> snapshot, node_features, edge_index, edge_feature, _, _= tg[0]
            >>> print(snapshot)
            {'index_to_key': {0: 'sentence', 1: 'this', 2: 'is', 3: 'a', 4: 'another'}, 'key_to_index': {'sentence': 0, 'this': 1, 'is': 2, 'a': 3, 'another': 4}}
            >>> print(node_features)
            [[0, 0, 2, ...], [1, 1, 2, ...], [1, 1, 2, ...], [1, 1, 2, ...], [2, 1, 2, ...]]
            >>> print(edge_index)
            [[0, 0, 0, 1, 1, 2, 2, 3, 3, 4], [1, 2, 4, 1, 3, 1, 4, 1, 4, 4]]
            >>> print(edge_feature)
            [[0, 0.9999999403953552, 0.0], [0, 0.9999999403953552, 0.0], [0, 0.9999999403953552, 0.0], [0, 0.9999999403953552, 0.0], [0, 0.9999999403953552, 0.0], [1, 0.9999999403953552, 0.0], [1, 0.9999999403953552, 0.0], [1, 0.9999999403953552, 0.0], [1, 0.9999999403953552, 0.0], [1, 0.9999999403953552, 0.0]]

            >>> tg.add_graph(target_word='sentence', level=3, k=2, c=2, dataset=['this is a sentence', 'this is another sentence', 'this is a third sentence'], word2vec_model = word2vec, mlm_model = mlm)
            >>> _, _, _, _, labels, label_mask= tg[0]
            >>> print(labels)
            [[0.9999, 1., 0.0001]]
            >>> print(label_mask)
            [[0, 1, 0], [1, 2, 4]]
        """

        print(f'Adding the nodes of the word graph for the word "{target_word}"...')
        nodes = Nodes(
            target_word= target_word,
            dataset=dataset,
            level= level,
            k= k,
            c= c,
            word2vec_model = word2vec_model,
            mlm_model = mlm_model
            )

        nds = nodes.get_nodes()
        print('Getting their features...', '\n')
        index, node_feature_matrix, embeddings = nodes.get_node_features(nds)
        print(f'Adding the edges of the word graph for the word "{target_word}"...')
        edges = Edges(
            index_to_key=index['index_to_key'],
            node_features=node_feature_matrix,
            node_embeddings=embeddings
        )
        edge_index, edge_feature_matrix = edges.get_edge_features(dataset)

        print('Constructing the temporal graph...', '\n')
        self.construct_graph(
            current_index=index,
            current_node_feature_matrix=node_feature_matrix,
            current_embeddings=embeddings,
            current_edge_index=edge_index,
            current_edge_feature_matrix=edge_feature_matrix
        )


    def construct_graph(
            self, 
            current_index, 
            current_node_feature_matrix, 
            current_embeddings, 
            current_edge_index, 
            current_edge_feature_matrix
            ):

        """
        This method is used to construct the temporal graph.

        The first snapshot is stored as-is. Every later snapshot is aligned with
        the last stored one (which is overwritten by its aligned version), and
        the last stored snapshot receives its labels from the new snapshot.

        Args:
            current_index (dict): the index of the nodes of the current snapshot.
            current_node_feature_matrix (np.ndarray): the features of the nodes of the current snapshot.
            current_embeddings (np.ndarray): the embeddings of the nodes of the current snapshot.
            current_edge_index (np.ndarray): the edge index of the current snapshot.
            current_edge_feature_matrix (np.ndarray): the edge features of the current snapshot.
        """

        # Node features stored in the graph are the raw features followed by the embeddings
        current_node_features = np.concatenate((current_node_feature_matrix, current_embeddings), axis=1)

        if len(self.snapshots) == 0:
            print('Adding the first snapshot to the temporal graph...', '\n')
            self.snapshots.append(current_index)
            self.xs.append(current_node_features)
            self.edge_indices.append(current_edge_index)
            self.edge_features.append(current_edge_feature_matrix)
            self.ys.append([])
            self.y_indices.append([])
            return

        print(f'Adding the {len(self.snapshots)} snapshot to the temporal graph...', '\n')
        previous_index, previous_node_features, previous_edge_index, previous_edge_feature, _, _= self[-1]

        previous_graph = {
            'index': previous_index,
            'node_features': previous_node_features,
            'edge_index': previous_edge_index,
            'edge_features': previous_edge_feature
        }

        current_graph = {
            'index': current_index,
            'node_features': current_node_features,
            'edge_index': current_edge_index,
            'edge_features': current_edge_feature_matrix
        }

        print('Aligning the nodes of the current snapshot with the nodes of the previous snapshot...', '\n')
        aligned_previous_graph, aligned_current_graph = self.get_aligned_graph(current_graph, previous_graph)
        print('Labeling the edges of the previous snapshot with the edge feature values in the current snapshot...', '\n')
        # Label on the ALIGNED graphs: only there do both edge indices refer to
        # the same node numbering, and only then does the returned mask match
        # the node index that is stored for the previous snapshot below.
        previous_labels, previous_label_mask = self.label_previous_graph(aligned_current_graph, aligned_previous_graph)

        self.snapshots[-1] = aligned_previous_graph['index']
        self.xs[-1] = aligned_previous_graph['node_features']
        self.edge_indices[-1] = aligned_previous_graph['edge_index']
        self.edge_features[-1] = aligned_previous_graph['edge_features']
        self.ys[-1] = previous_labels
        self.y_indices[-1] = previous_label_mask

        self.snapshots.append(aligned_current_graph['index'])
        self.xs.append(aligned_current_graph['node_features'])
        self.edge_indices.append(aligned_current_graph['edge_index'])
        self.edge_features.append(aligned_current_graph['edge_features'])
        self.ys.append([])
        self.y_indices.append([])


    @staticmethod
    def _remap_edge_index(edge_index, index_mapping):
        """
        Rewrite the node ids of ``edge_index`` through ``index_mapping``.

        Args:
            edge_index (np.ndarray): array whose two rows hold the endpoints of each edge.
            index_mapping (dict): maps an old node id to its new node id.

        Returns:
            np.ndarray: a remapped copy of ``edge_index``; columns with an endpoint missing from ``index_mapping`` are dropped.
        """
        remapped = np.array(edge_index)
        for column in range(remapped.shape[1]):
            remapped[0, column] = index_mapping.get(edge_index[0, column], -1)
            remapped[1, column] = index_mapping.get(edge_index[1, column], -1)
        # Remove edges where one of the nodes does not exist anymore (indicated by -1)
        return remapped[:, ~(remapped == -1).any(axis=0)]


    def get_aligned_graph(
            self, 
            current_graph: dict, 
            previous_graph: dict
            ) -> (dict, dict):

        """
        This method is used to align the nodes of the current snapshot with the nodes of the previous snapshot.

        Three cases are handled:
            * identical indices: both graphs are returned untouched;
            * same words, different numbering: the current graph is renumbered to the previous index;
            * different words: both graphs are renumbered to a unified index in which the
              previous words come first (in their existing order) followed by the new words.

        Args:
            current_graph (dict): the current snapshot of the temporal graph to align with the previous snapshot.
            previous_graph (dict): the previous snapshot of the temporal graph to align with the current snapshot.

        Returns:
            aligned_previous_graph (dict): the aligned previous snapshot of the temporal graph.
            aligned_current_graph (dict): the aligned current snapshot of the temporal graph.
        """

        current_index = current_graph['index']
        previous_index = previous_graph['index']

        if current_index == previous_index:
            # Already aligned; still return a pair so callers can always unpack two graphs
            return previous_graph, current_graph

        current_keys = current_index['key_to_index']
        previous_keys = previous_index['key_to_index']

        if set(current_keys) == set(previous_keys):
            # Same vocabulary: move every current node to the slot it has in the previous index
            index_mapping = {current_keys[key]: previous_keys[key] for key in current_keys}

            reordered_node_feature_matrix = np.zeros_like(current_graph['node_features'])
            for current_idx, previous_idx in index_mapping.items():
                reordered_node_feature_matrix[previous_idx] = current_graph['node_features'][current_idx]

            aligned_current_graph = {
                'index': previous_graph['index'],
                'node_features': reordered_node_feature_matrix,
                'edge_index': self._remap_edge_index(current_graph['edge_index'], index_mapping),
                'edge_features': current_graph['edge_features']
            }
            return previous_graph, aligned_current_graph

        # Different vocabularies: build one index covering both snapshots. The
        # order is derived from the dictionaries (not from a set) so that the
        # resulting node ids do not depend on string hashing.
        all_words = list(previous_keys) + [word for word in current_keys if word not in previous_keys]
        unified_dict = {word: idx for idx, word in enumerate(all_words)}
        unified_dict_reverse = {idx: word for idx, word in enumerate(all_words)}
        reordered_index = {'index_to_key': unified_dict_reverse, 'key_to_index': unified_dict}

        # Words absent from a snapshot keep an all-zero feature row in that snapshot
        reordered_previous_node_feature_matrix = np.zeros((len(unified_dict), previous_graph['node_features'].shape[1]))
        for word, index in previous_keys.items():
            reordered_previous_node_feature_matrix[unified_dict[word]] = previous_graph['node_features'][index]

        reordered_current_node_feature_matrix = np.zeros((len(unified_dict), current_graph['node_features'].shape[1]))
        for word, index in current_keys.items():
            reordered_current_node_feature_matrix[unified_dict[word]] = current_graph['node_features'][index]

        # Mapping old indices to new indices for each snapshot
        previous_index_mapping = {old_index: unified_dict[word] for word, old_index in previous_keys.items()}
        current_index_mapping = {old_index: unified_dict[word] for word, old_index in current_keys.items()}

        aligned_previous_graph = {
            'index': reordered_index,
            'node_features': reordered_previous_node_feature_matrix,
            'edge_index': self._remap_edge_index(previous_graph['edge_index'], previous_index_mapping),
            'edge_features': previous_graph['edge_features']
        }

        aligned_current_graph = {
            'index': reordered_index,
            'node_features': reordered_current_node_feature_matrix,
            'edge_index': self._remap_edge_index(current_graph['edge_index'], current_index_mapping),
            'edge_features': current_graph['edge_features']
        }

        return aligned_previous_graph, aligned_current_graph



    def label_previous_graph(
            self,
            current_graph: dict,
            previous_graph: dict,
            label_feature_idx: int = 1
            ) -> (np.ndarray, np.ndarray):
        """
        This method is used to label the edges of the previous snapshot with the edge feature values in the current snapshot.

        Edges are matched by their (source, target) node ids, so both graphs
        must use the same node numbering for the result to be meaningful.

        Args:
            current_graph (dict): the current snapshot of the temporal graph to use for labeling the previous snapshot.
            previous_graph (dict): the previous snapshot of the temporal graph to label.
            label_feature_idx (int): the index of the feature to use as labels. Default: 1.

        Returns:
            labels (np.ndarray): one label per edge of the previous snapshot that also exists in the current snapshot.
            labels_mask (np.ndarray): two rows holding the endpoints of the labeled edges.
        """

        current_edge_features = current_graph['edge_features']

        # First position of every current edge; gives constant-time lookups and
        # keeps the "first occurrence wins" behavior of list.index
        first_position = {}
        for position, edge in enumerate(current_graph['edge_index'].T):
            first_position.setdefault(tuple(edge), position)

        labels = []

        label_mask_1 = []
        label_mask_2 = []

        for edge in previous_graph['edge_index'].T:
            previous_edge = tuple(edge)
            position = first_position.get(previous_edge)
            if position is None:
                continue

            label_mask_1.append(previous_edge[0])
            label_mask_2.append(previous_edge[1])
            labels.append(current_edge_features[position][label_feature_idx])

        label_mask = np.stack([label_mask_1, label_mask_2])
        labels = np.array(labels)

        return labels, label_mask

__getitem__(idx)

Retrieves the snapshot at the specified index.

Parameters:

Name Type Description Default
idx int

Index of the item to retrieve.

required

Returns:

Name Type Description
snapshot dict

the graph data at the specified index.

node_features ndarray

the features of the nodes of the graph at the specified index.

edge_index ndarray

the edge index of the graph at the specified index.

edge_feature ndarray

the edge features of the graph at the specified index.

labels ndarray

the labels of the edges of the graph at the specified index.

labels_mask ndarray

the indices of the labels of the edges of the graph at the specified index.

Source code in semantics/graphs/temporal_graph.py
def __getitem__(self, idx):
    """
    Return every component stored for the snapshot at position ``idx``.

    Parameters:
        idx (int): position of the snapshot to fetch.

    Returns:
        snapshot (dict): the graph data at the specified index.
        node_features (np.ndarray): the features of the nodes of that graph.
        edge_index (np.ndarray): the edge index of that graph.
        edge_feature (np.ndarray): the edge features of that graph.
        labels (np.ndarray): the labels of the edges of that graph.
        labels_mask (np.ndarray): the indices of the labels of the edges of that graph.
    """
    # The six parallel lists are read in the same order the tuple is documented
    return (
        self.snapshots[idx],
        self.xs[idx],
        self.edge_indices[idx],
        self.edge_features[idx],
        self.ys[idx],
        self.y_indices[idx],
    )

__init__()

Attributes:

Name Type Description
snapshots List[dict]

the snapshots of the temporal graph. Each snapshot is a dictionary containing the index of the nodes of the snapshot.

xs List[ndarray]

the features of the nodes of the temporal graph.

edge_indices List[ndarray]

the edge index of the temporal graph.

edge_features List[ndarray]

the edge features of the temporal graph.

ys List[ndarray]

the labels of the edges of the temporal graph.

y_indices List[ndarray]

the indices of the labels of the edges of the temporal graph.

Source code in semantics/graphs/temporal_graph.py
def __init__(
        self
        ):

    """
    Create an empty temporal graph.

    Attributes:
        snapshots (List[dict]): the snapshots of the temporal graph. Each snapshot is a dictionary containing the index of the nodes of the snapshot.
        xs (List[np.ndarray]): the features of the nodes of the temporal graph.
        edge_indices (List[np.ndarray]): the edge index of the temporal graph.
        edge_features (List[np.ndarray]): the edge features of the temporal graph.
        ys (List[np.ndarray]): the labels of the edges of the temporal graph.
        y_indices (List[np.ndarray]): the indices of the labels of the edges of the temporal graph.

    """

    # Node-level storage: one entry per snapshot
    self.snapshots, self.xs = [], []
    # Edge-level storage: structure and features
    self.edge_indices, self.edge_features = [], []
    # Supervision: edge labels and the edges they belong to
    self.ys, self.y_indices = [], []

add_graph(target_word, level, k, c, dataset, word2vec_model, mlm_model)

This method is used to add a snapshot to the temporal graph.

Parameters:

Name Type Description Default
target_word str

the word to get the nodes for

required
level int

the level of the graph to get

required
k int

the number of similar nodes to get for each occurrence of the target word

required
c int

the number of context nodes to get for the target word

required
dataset List[str]

the sentences to get the nodes from

required
word2vec_model Word2VecInference

the word2vec model's Inference class

required
mlm_model (RobertaInference, BertInference)

the MLM model's Inference class

required

Examples:

>>> word2vec = Word2VecInference('word2vec.model')
>>> mlm = RobertaInference('MLM_roberta')
>>> tg = TemporalGraph()
>>> tg.add_graph(target_word='sentence', level=3, k=2, c=2, dataset=['this is a sentence', 'this is another sentence'], word2vec_model = word2vec, mlm_model = mlm)
>>> snapshot, node_features, edge_index, edge_feature, _, _= tg[0]
>>> print(snapshot)
{'index_to_key': {0: 'sentence', 1: 'this', 2: 'is', 3: 'a', 4: 'another'}, 'key_to_index': {'sentence': 0, 'this': 1, 'is': 2, 'a': 3, 'another': 4}}
>>> print(node_features)
[[0, 0, 2, ...], [1, 1, 2, ...], [1, 1, 2, ...], [1, 1, 2, ...], [2, 1, 2, ...]]
>>> print(edge_index)
[[0, 0, 0, 1, 1, 2, 2, 3, 3, 4], [1, 2, 4, 1, 3, 1, 4, 1, 4, 4]]
>>> print(edge_feature)
[[0, 0.9999999403953552, 0.0], [0, 0.9999999403953552, 0.0], [0, 0.9999999403953552, 0.0], [0, 0.9999999403953552, 0.0], [0, 0.9999999403953552, 0.0], [1, 0.9999999403953552, 0.0], [1, 0.9999999403953552, 0.0], [1, 0.9999999403953552, 0.0], [1, 0.9999999403953552, 0.0], [1, 0.9999999403953552, 0.0]]
>>> tg.add_graph(target_word='sentence', level=3, k=2, c=2, dataset=['this is a sentence', 'this is another sentence', 'this is a third sentence'], word2vec_model = word2vec, mlm_model = mlm)
>>> _, _, _, _, labels, label_mask= tg[0]
>>> print(labels)
[[0.9999, 1., 0.0001]]
>>> print(label_mask)
[[0, 1, 0], [1, 2, 4]]
Source code in semantics/graphs/temporal_graph.py
def add_graph(
        self,
        target_word: str, 
        level: int, 
        k: int, 
        c: int,
        dataset: List[str], 
        word2vec_model: Word2VecInference, 
        mlm_model: Union[RobertaInference, BertInference]
        ) -> None:
    """
    Build one snapshot for ``target_word`` and append it to the temporal graph.

    Nodes and their features come from ``Nodes``, edges and their features
    from ``Edges``; the pieces are then passed to ``construct_graph``.

    Args:
        target_word (str): the word to get the nodes for
        level (int): the level of the graph to get
        k (int): the number of similar nodes to get for each occurrence of the target word
        c (int): the number of context nodes to get for the target word
        dataset (List[str]): the sentences to get the nodes from
        word2vec_model (Word2VecInference): the word2vec model's Inference class
        mlm_model (RobertaInference, BertInference): the MLM model's Inference class

    Examples:
        >>> word2vec = Word2VecInference('word2vec.model')
        >>> mlm = RobertaInference('MLM_roberta')
        >>> tg = TemporalGraph()
        >>> tg.add_graph(target_word='sentence', level=3, k=2, c=2, dataset=['this is a sentence', 'this is another sentence'], word2vec_model = word2vec, mlm_model = mlm)
        >>> snapshot, node_features, edge_index, edge_feature, _, _= tg[0]
        >>> print(snapshot)
        {'index_to_key': {0: 'sentence', 1: 'this', 2: 'is', 3: 'a', 4: 'another'}, 'key_to_index': {'sentence': 0, 'this': 1, 'is': 2, 'a': 3, 'another': 4}}
        >>> print(node_features)
        [[0, 0, 2, ...], [1, 1, 2, ...], [1, 1, 2, ...], [1, 1, 2, ...], [2, 1, 2, ...]]
        >>> print(edge_index)
        [[0, 0, 0, 1, 1, 2, 2, 3, 3, 4], [1, 2, 4, 1, 3, 1, 4, 1, 4, 4]]
        >>> print(edge_feature)
        [[0, 0.9999999403953552, 0.0], [0, 0.9999999403953552, 0.0], [0, 0.9999999403953552, 0.0], [0, 0.9999999403953552, 0.0], [0, 0.9999999403953552, 0.0], [1, 0.9999999403953552, 0.0], [1, 0.9999999403953552, 0.0], [1, 0.9999999403953552, 0.0], [1, 0.9999999403953552, 0.0], [1, 0.9999999403953552, 0.0]]

        >>> tg.add_graph(target_word='sentence', level=3, k=2, c=2, dataset=['this is a sentence', 'this is another sentence', 'this is a third sentence'], word2vec_model = word2vec, mlm_model = mlm)
        >>> _, _, _, _, labels, label_mask= tg[0]
        >>> print(labels)
        [[0.9999, 1., 0.0001]]
        >>> print(label_mask)
        [[0, 1, 0], [1, 2, 4]]
    """

    # Step 1: nodes
    print(f'Adding the nodes of the word graph for the word "{target_word}"...')
    node_builder = Nodes(
        target_word=target_word,
        dataset=dataset,
        level=level,
        k=k,
        c=c,
        word2vec_model=word2vec_model,
        mlm_model=mlm_model,
    )
    node_collection = node_builder.get_nodes()

    # Step 2: node features
    print('Getting their features...', '\n')
    snapshot_index, feature_matrix, node_embeddings = node_builder.get_node_features(node_collection)

    # Step 3: edges and their features
    print(f'Adding the edges of the word graph for the word "{target_word}"...')
    edge_builder = Edges(
        index_to_key=snapshot_index['index_to_key'],
        node_features=feature_matrix,
        node_embeddings=node_embeddings,
    )
    snapshot_edge_index, snapshot_edge_features = edge_builder.get_edge_features(dataset)

    # Step 4: store the snapshot
    print('Constructing the temporal graph...', '\n')
    self.construct_graph(
        current_index=snapshot_index,
        current_node_feature_matrix=feature_matrix,
        current_embeddings=node_embeddings,
        current_edge_index=snapshot_edge_index,
        current_edge_feature_matrix=snapshot_edge_features,
    )

construct_graph(current_index, current_node_feature_matrix, current_embeddings, current_edge_index, current_edge_feature_matrix)

This method is used to construct the temporal graph.

Parameters:

Name Type Description Default
current_index dict

the index of the nodes of the current snapshot.

required
current_node_feature_matrix ndarray

the features of the nodes of the current snapshot.

required
current_embeddings ndarray

the embeddings of the nodes of the current snapshot.

required
current_edge_index ndarray

the edge index of the current snapshot.

required
current_edge_feature_matrix ndarray

the edge features of the current snapshot.

required
Source code in semantics/graphs/temporal_graph.py
def construct_graph(
        self, 
        current_index, 
        current_node_feature_matrix, 
        current_embeddings, 
        current_edge_index, 
        current_edge_feature_matrix
        ):

    """
    This method is used to construct the temporal graph.

    The first snapshot is stored as-is. Every later snapshot is aligned with
    the last stored one (which is overwritten by its aligned version), and
    the last stored snapshot receives its labels from the new snapshot.

    Args:
        current_index (dict): the index of the nodes of the current snapshot.
        current_node_feature_matrix (np.ndarray): the features of the nodes of the current snapshot.
        current_embeddings (np.ndarray): the embeddings of the nodes of the current snapshot.
        current_edge_index (np.ndarray): the edge index of the current snapshot.
        current_edge_feature_matrix (np.ndarray): the edge features of the current snapshot.
    """

    # Node features stored in the graph are the raw features followed by the embeddings
    current_node_features = np.concatenate((current_node_feature_matrix, current_embeddings), axis=1)

    if len(self.snapshots) == 0:
        print('Adding the first snapshot to the temporal graph...', '\n')
        self.snapshots.append(current_index)
        self.xs.append(current_node_features)
        self.edge_indices.append(current_edge_index)
        self.edge_features.append(current_edge_feature_matrix)
        self.ys.append([])
        self.y_indices.append([])
        return

    print(f'Adding the {len(self.snapshots)} snapshot to the temporal graph...', '\n')
    previous_index, previous_node_features, previous_edge_index, previous_edge_feature, _, _= self[-1]

    previous_graph = {
        'index': previous_index,
        'node_features': previous_node_features,
        'edge_index': previous_edge_index,
        'edge_features': previous_edge_feature
    }

    current_graph = {
        'index': current_index,
        'node_features': current_node_features,
        'edge_index': current_edge_index,
        'edge_features': current_edge_feature_matrix
    }

    print('Aligning the nodes of the current snapshot with the nodes of the previous snapshot...', '\n')
    aligned_previous_graph, aligned_current_graph = self.get_aligned_graph(current_graph, previous_graph)
    print('Labeling the edges of the previous snapshot with the edge feature values in the current snapshot...', '\n')
    # Label on the ALIGNED graphs: only there do both edge indices refer to
    # the same node numbering, and only then does the returned mask match
    # the node index that is stored for the previous snapshot below.
    previous_labels, previous_label_mask = self.label_previous_graph(aligned_current_graph, aligned_previous_graph)

    self.snapshots[-1] = aligned_previous_graph['index']
    self.xs[-1] = aligned_previous_graph['node_features']
    self.edge_indices[-1] = aligned_previous_graph['edge_index']
    self.edge_features[-1] = aligned_previous_graph['edge_features']
    self.ys[-1] = previous_labels
    self.y_indices[-1] = previous_label_mask

    self.snapshots.append(aligned_current_graph['index'])
    self.xs.append(aligned_current_graph['node_features'])
    self.edge_indices.append(aligned_current_graph['edge_index'])
    self.edge_features.append(aligned_current_graph['edge_features'])
    self.ys.append([])
    self.y_indices.append([])

get_aligned_graph(current_graph, previous_graph)

This method is used to align the nodes of the current snapshot with the nodes of the previous snapshot.

Parameters:

Name Type Description Default
current_graph dict

the current snapshot of the temporal graph to align with the previous snapshot.

required
previous_graph dict

the previous snapshot of the temporal graph to align with the current snapshot.

required

Returns:

Name Type Description
aligned_previous_graph dict

the aligned previous snapshot of the temporal graph.

aligned_current_graph dict

the aligned current snapshot of the temporal graph.

Source code in semantics/graphs/temporal_graph.py
def get_aligned_graph(
        self,
        current_graph: dict,
        previous_graph: dict
        ) -> (dict, dict):

    """
    This method is used to align the nodes of the current snapshot with the nodes of the previous snapshot.

    Three situations are handled:
        * Both snapshots share the exact same index: both graphs are returned untouched.
        * Both snapshots hold the same words under a different numbering: the current graph is
          re-indexed onto the previous snapshot's index and the previous graph is returned untouched.
        * The snapshots hold different words: both graphs are re-indexed onto a unified index that
          lists the previous words first (in their existing index order) followed by the words that
          only appear in the current snapshot. Words missing from a snapshot get an all-zero row in
          that snapshot's node feature matrix.

    Each graph is a dict with the keys 'index' (itself a dict with 'key_to_index' and
    'index_to_key'), 'node_features', 'edge_index' and 'edge_features'.

    Args:
        current_graph (dict): the current snapshot of the temporal graph to align with the previous snapshot.
        previous_graph (dict): the previous snapshot of the temporal graph to align with the current snapshot.

    Returns:
        aligned_previous_graph (dict): the aligned previous snapshot of the temporal graph.
        aligned_current_graph (dict): the aligned current snapshot of the temporal graph.
    """

    def _remap_edges(edge_index, edge_features, mapping):
        # Translate every endpoint of `edge_index` through `mapping` (old node id -> new node id).
        # Edges with an endpoint that has no entry in `mapping` are dropped, and the matching
        # rows of `edge_features` are dropped with them so both stay aligned edge-for-edge.
        source = np.asarray(edge_index)
        remapped = np.array(source)  # copy: never mutate the caller's array
        for column in range(source.shape[1]):
            remapped[0, column] = mapping.get(source[0, column], -1)
            remapped[1, column] = mapping.get(source[1, column], -1)

        keep = ~(remapped == -1).any(axis=0)
        if keep.all():
            # Nothing was dropped: hand the features back unchanged.
            return remapped, edge_features
        # NOTE(review): assumes edge_features has one row per edge — confirm against the caller.
        return remapped[:, keep], np.asarray(edge_features)[keep]

    current_index = current_graph['index']
    previous_index = previous_graph['index']

    if current_index == previous_index:
        # Already aligned. Return a pair (previous first) like every other branch,
        # because callers unpack the result into two graphs.
        return previous_graph, current_graph

    current_key_to_index = current_index['key_to_index']
    previous_key_to_index = previous_index['key_to_index']

    if set(current_key_to_index) == set(previous_key_to_index):
        # Same words, different numbering: move the current graph onto the previous index.
        index_mapping = {
            current_idx: previous_key_to_index[word]
            for word, current_idx in current_key_to_index.items()
        }

        reordered_node_feature_matrix = np.zeros_like(current_graph['node_features'])
        for current_idx, previous_idx in index_mapping.items():
            reordered_node_feature_matrix[previous_idx] = current_graph['node_features'][current_idx]

        updated_edge_index, updated_edge_features = _remap_edges(
            current_graph['edge_index'], current_graph['edge_features'], index_mapping
        )

        aligned_current_graph = {
            'index': previous_graph['index'],
            'node_features': reordered_node_feature_matrix,
            'edge_index': updated_edge_index,
            'edge_features': updated_edge_features
        }
        return previous_graph, aligned_current_graph

    # Different words: build one index shared by both snapshots. The order is derived from the
    # existing indices (not from set iteration) so the result is identical from run to run.
    previous_words = sorted(previous_key_to_index, key=previous_key_to_index.get)
    new_words = [
        word
        for word in sorted(current_key_to_index, key=current_key_to_index.get)
        if word not in previous_key_to_index
    ]
    all_words = previous_words + new_words

    unified_dict = {word: idx for idx, word in enumerate(all_words)}
    unified_dict_reverse = {idx: word for idx, word in enumerate(all_words)}
    reordered_index = {'index_to_key': unified_dict_reverse, 'key_to_index': unified_dict}

    # Rows of words that are absent from a snapshot stay zero.
    reordered_previous_node_feature_matrix = np.zeros((len(unified_dict), previous_graph['node_features'].shape[1]))
    for word, index in previous_key_to_index.items():
        reordered_previous_node_feature_matrix[unified_dict[word]] = previous_graph['node_features'][index]

    reordered_current_node_feature_matrix = np.zeros((len(unified_dict), current_graph['node_features'].shape[1]))
    for word, index in current_key_to_index.items():
        reordered_current_node_feature_matrix[unified_dict[word]] = current_graph['node_features'][index]

    # Mapping old indices to new indices for the previous dictionary
    previous_index_mapping = {old_index: unified_dict[word] for word, old_index in previous_key_to_index.items()}
    updated_previous_edge_index, updated_previous_edge_features = _remap_edges(
        previous_graph['edge_index'], previous_graph['edge_features'], previous_index_mapping
    )

    # Mapping old indices to new indices for the current dictionary
    current_index_mapping = {old_index: unified_dict[word] for word, old_index in current_key_to_index.items()}
    updated_current_edge_index, updated_current_edge_features = _remap_edges(
        current_graph['edge_index'], current_graph['edge_features'], current_index_mapping
    )

    aligned_previous_graph = {
        'index': reordered_index,
        'node_features': reordered_previous_node_feature_matrix,
        'edge_index': updated_previous_edge_index,
        'edge_features': updated_previous_edge_features
    }

    aligned_current_graph = {
        'index': reordered_index,
        'node_features': reordered_current_node_feature_matrix,
        'edge_index': updated_current_edge_index,
        'edge_features': updated_current_edge_features
    }

    return aligned_previous_graph, aligned_current_graph

label_previous_graph(current_graph, previous_graph, label_feature_idx=1)

This method is used to label the edges of the previous snapshot with the edge feature values in the current snapshot.

Parameters:

Name Type Description Default
current_graph dict

the current snapshot of the temporal graph to use for labeling the previous snapshot.

required
previous_graph dict

the previous snapshot of the temporal graph to label.

required
label_feature_idx int

the index of the feature to use as labels. Default: 1.

1

Returns:

Name Type Description
labels ndarray

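the edge feature values (column label_feature_idx) taken from the current snapshot for every edge of the previous snapshot that also exists in the current snapshot.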

labels_mask ndarray

a 2 x N array holding the two endpoint node indices of each previous-snapshot edge that received a label, in the same order as labels.

Source code in semantics/graphs/temporal_graph.py
def label_previous_graph(
        self,
        current_graph: dict,
        previous_graph: dict,
        label_feature_idx: int = 1
        ) -> (np.ndarray, np.ndarray):
    """
    This method is used to label the edges of the previous snapshot with the edge feature values in the current snapshot.

    An edge of the previous snapshot receives a label only when an edge with the same pair of
    endpoint indices exists in the current snapshot. The label is the value found at
    ``label_feature_idx`` in the feature row of that current edge. When the current snapshot
    lists the same edge more than once, the first occurrence is used.

    Args:
        current_graph (dict): the current snapshot of the temporal graph to use for labeling the previous snapshot.
            Its 'edge_index' and 'edge_features' entries are read.
        previous_graph (dict): the previous snapshot of the temporal graph to label.
            Its 'edge_index' entry is read.
        label_feature_idx (int): the index of the feature to use as labels. Default: 1.

    Returns:
        labels (np.ndarray): one label per previous edge that also exists in the current snapshot.
        labels_mask (np.ndarray): a 2 x N array with the two endpoint indices of each labeled edge,
            in the same order as ``labels``. It is 2 x 0 when no edge could be labeled.
    """

    current_edge_index = current_graph['edge_index']
    current_edge_features = current_graph['edge_features']

    previous_edge_index = previous_graph['edge_index']

    # Map every current edge to the position of its first occurrence so each previous edge
    # is resolved with one dict lookup instead of a linear scan over all current edges.
    current_edge_positions = {}
    for position, edge in enumerate(current_edge_index.T):
        current_edge_positions.setdefault(tuple(edge), position)

    labels = []

    label_mask_1 = []
    label_mask_2 = []

    for edge in previous_edge_index.T:
        previous_edge = tuple(edge)
        position = current_edge_positions.get(previous_edge)
        if position is None:
            # The edge disappeared in the current snapshot: it gets no label.
            continue

        label_mask_1.append(previous_edge[0])
        label_mask_2.append(previous_edge[1])
        labels.append(current_edge_features[position][label_feature_idx])

    # Keep the edge-index dtype so the mask still holds node indices (not floats)
    # even when no edge was labeled.
    label_mask = np.array([label_mask_1, label_mask_2], dtype=previous_edge_index.dtype)
    labels = np.array(labels)

    return labels, label_mask