Skip to content

Temporal Graph

This class is used to get the temporal graph of a word.


Name Description

str, level: int, k: int, c: int, dataset: List[str], word2vec_model: Word2VecInference, mlm_model: Union[RobertaInference, BertInference]) This method is used to add a snapshot to the temporal graph.


dict, previous_graph: dict) -> (dict, dict) This method is used to align the nodes of the current snapshot with the nodes of the previous snapshot.


dict, previous_graph: dict, label_feature_idx: int = 1) -> (np.ndarray, np.ndarray) This method is used to label the edges of the previous snapshot with the edge feature values in the current snapshot.

Source code in semantics/graphs/
class TemporalGraph:
    This class is used to get the temporal graph of a word.

            The constructor of the TemporalGraph class.
        __getitem__(self, idx)
            Retrieves the snapshot at the specified index.
        add_graph(self, target_word: str, level: int, k: int, c: int, dataset: List[str], word2vec_model: Word2VecInference, mlm_model: Union[RobertaInference, BertInference])
            This method is used to add a snapshot to the temporal graph.
        construct_graph(self, current_index, current_node_feature_matrix, current_embeddings, current_edge_index, current_edge_feature_matrix)
            This method is used to construct the temporal graph.
        get_aligned_graph(self, current_graph: dict, previous_graph: dict) -> (dict, dict)
            This method is used to align the nodes of the current snapshot with the nodes of the previous snapshot.
        label_previous_graph(self, current_graph: dict, previous_graph: dict, label_feature_idx: int = 1) -> (np.ndarray, np.ndarray)
            This method is used to label the edges of the previous snapshot with the edge feature values in the current snapshot.
    def __init__(

            snapshots (List[dict]): the snapshots of the temporal graph. Each snapshot is a dictionary containing the index of the nodes of the snapshot.
            xs (List[np.ndarray]): the features of the nodes of the temporal graph.
            edge_indices (List[np.ndarray]): the edge index of the temporal graph.
            edge_features (List[np.ndarray]): the edge features of the temporal graph.
            ys (List[np.ndarray]): the labels of the edges of the temporal graph.
            y_indices (List[np.ndarray]): the indices of the labels of the edges of the temporal graph.


        self.snapshots = []
        self.xs = []
        self.edge_indices = []
        self.edge_features = []
        self.ys = []
        self.y_indices = []

    def __getitem__(self, idx):
        Retrieves the snapshot at the specified index.

            idx (int): Index of the item to retrieve.

            snapshot (dict): the graph data at the specified index.
            node_features (np.ndarray): the features of the nodes of the graph at the specified index.
            edge_index (np.ndarray): the edge index of the graph at the specified index.
            edge_feature (np.ndarray): the edge features of the graph at the specified index.
            labels (np.ndarray): the labels of the edges of the graph at the specified index.
            labels_mask (np.ndarray): the indices of the labels of the edges of the graph at the specified index.
        # Get the tokenized inputs at the specified index
        snapshot = self.snapshots[idx]
        node_features = self.xs[idx]
        edge_index = self.edge_indices[idx]
        edge_feature = self.edge_features[idx]
        labels = self.ys[idx]
        labels_mask = self.y_indices[idx]

        return snapshot, node_features, edge_index, edge_feature, labels, labels_mask

    def add_graph(
            target_word: str, 
            level: int, 
            k: int, 
            c: int,
            dataset: List[str], 
            word2vec_model: Word2VecInference, 
            mlm_model: Union[RobertaInference, BertInference]
            ) -> None:
        This method is used to add a snapshot to the temporal graph.

            target_word (str): the word to get the nodes for
            level (int): the level of the graph to get
            k (int): the number of similar nodes to get for each occurrence of the target word
            c (int): the number of context nodes to get for the target word
            dataset (List[str]): the sentences to get the nodes from
            word2vec_model (Word2VecInference): the word2vec model's Inference class
            mlm_model (RobertaInference, BertInference): the MLM model's Inference class

            >>> word2vec = Word2VecInference('word2vec.model')
            >>> mlm = RobertaInference('MLM_roberta')
            >>> tg = TemporalGraph()
            >>> tg.add_graph(target_word='sentence', level=3, k=2, c=2, dataset=['this is a sentence', 'this is another sentence'], word2vec_model = word2vec, mlm_model = mlm)
            >>> snapshot, node_features, edge_index, edge_feature, _, _= tg[0]
            >>> print(snapshot)
            {'index_to_key': {0: 'sentence', 1: 'this', 2: 'is', 3: 'a', 4: 'another'}, 'key_to_index': {'sentence': 0, 'this': 1, 'is': 2, 'a': 3, 'another': 4}
            >>> print(node_features)
            [[0, 0, 2, ...], [1, 1, 2, ...], [1, 1, 2, ...], [1, 1, 2, ...], [2, 1, 2, ...]]
            >>> print(edge_index)
            [[0, 0, 0, 1, 1, 2, 2, 3, 3, 4], [1, 2, 4, 1, 3, 1, 4, 1, 4, 4]]
            >>> print(edge_feature)
            [[0, 0.9999999403953552, 0.0], [0, 0.9999999403953552, 0.0], [0, 0.9999999403953552, 0.0], [0, 0.9999999403953552, 0.0], [0, 0.9999999403953552, 0.0], [1, 0.9999999403953552, 0.0], [1, 0.9999999403953552, 0.0], [1, 0.9999999403953552, 0.0], [1, 0.9999999403953552, 0.0], [1, 0.9999999403953552, 0.0]]

            >>> tg.add_graph(target_word='sentence', level=3, k=2, c=2, dataset=['this is a sentence', 'this is another sentence', 'this is a third sentence'], word2vec_model = word2vec, mlm_model = mlm)
            >>> _, _, _, _, labels, label_mask= tg[0]
            >>> print(labels)
            [[0.9999, 1., 0.0001]]
            >>> print(label_mask)
            [[0, 1, 0], [1, 2, 4]]

        print(f'Adding the nodes of the word graph for the word "{target_word}"...')
        nodes = Nodes(
            target_word= target_word,
            level= level,
            k= k,
            c= c,
            word2vec_model = word2vec_model,
            mlm_model = mlm_model

        nds = nodes.get_nodes()
        print('Getting their features...', '\n')
        index, node_feature_matrix, embeddings = nodes.get_node_features(nds)
        print(f'Adding the edges of the word graph for the word "{target_word}"...')
        edges = Edges(
        edge_index, edge_feature_matrix = edges.get_edge_features(dataset)

        print('Constructing the temporal graph...', '\n')

    def construct_graph(

        This method is used to construct the temporal graph.

            current_index (dict): the index of the nodes of the current snapshot.
            current_node_feature_matrix (np.ndarray): the features of the nodes of the current snapshot.
            current_embeddings (np.ndarray): the embeddings of the nodes of the current snapshot.
            current_edge_index (np.ndarray): the edge index of the current snapshot.
            current_edge_feature_matrix (np.ndarray): the edge features of the current snapshot.

        if len(self.snapshots) == 0:
            print('Adding the first snapshot to the temporal graph...', '\n')
            self.xs.append(np.concatenate((current_node_feature_matrix, current_embeddings), axis=1))

            print(f'Adding the {len(self.snapshots)} snapshot to the temporal graph...', '\n')
            previous_index, previous_node_features, previous_edge_index, previous_edge_feature, _, _= self[-1]
            current_node_features = np.concatenate((current_node_feature_matrix, current_embeddings), axis=1)

            previous_graph = {
                'index': previous_index,
                'node_features': previous_node_features,
                'edge_index': previous_edge_index,
                'edge_features': previous_edge_feature

            current_graph = {
                'index': current_index,
                'node_features': current_node_features,
                'edge_index': current_edge_index,
                'edge_features': current_edge_feature_matrix

            print('Aligning the nodes of the current snapshot with the nodes of the previous snapshot...', '\n')
            aligned_previous_graph, aligned_current_graph = self.get_aligned_graph(current_graph, previous_graph)
            print('Labeling the edges of the previous snapshot with the edge feature values in the current snapshot...', '\n')
            previous_labels, previous_label_mask = self.label_previous_graph(current_graph, previous_graph)

            self.snapshots[-1] = aligned_previous_graph['index']
            self.xs[-1] = aligned_previous_graph['node_features']
            self.edge_indices[-1] = aligned_previous_graph['edge_index']
            self.edge_features[-1] = aligned_previous_graph['edge_features']
            self.ys[-1] = previous_labels
            self.y_indices[-1] = previous_label_mask


    def get_aligned_graph(
            current_graph: dict, 
            previous_graph: dict
            ) -> (dict, dict):

        This method is used to align the nodes of the current snapshot with the nodes of the previous snapshot.

            current_graph (dict): the current snapshot of the temporal graph to align with the previous snapshot.
            previous_graph (dict): the previous snapshot of the temporal graph to align with the current snapshot.

            aligned_previous_graph (dict): the aligned previous snapshot of the temporal graph.
            aligned_current_graph (dict): the aligned current snapshot of the temporal graph.

        current_index = current_graph['index']
        previous_index = previous_graph['index']

        if current_index == previous_index:
            return current_graph

        current_words = set(current_index['key_to_index'].keys())
        previous_words = set(previous_index['key_to_index'].keys())

        dynamic_graph = current_words != previous_words

        if not dynamic_graph:
            index_mapping = {current_index['key_to_index'][key]: previous_index['key_to_index'][key] for key in current_index['key_to_index']}

            reordered_node_feature_matrix = np.zeros_like(current_graph['node_features'])
            for current_idx, previous_idx in index_mapping.items():
                reordered_node_feature_matrix[previous_idx] = current_graph['node_features'][current_idx]

            updated_edge_index = np.zeros_like(current_graph['edge_index'])
            for i in range(current_graph['edge_index'].shape[1]):
                updated_edge_index[0, i] = index_mapping.get(current_graph['edge_index'][0, i], -1)
                updated_edge_index[1, i] = index_mapping.get(current_graph['edge_index'][1, i], -1)
            # Remove edges where one of the nodes does not exist anymore (indicated by -1)
            updated_edge_index = updated_edge_index[:, ~(updated_edge_index == -1).any(axis=0)]

            aligned_current_graph = {
                'index': previous_graph['index'],
                'node_features': reordered_node_feature_matrix,
                'edge_index': updated_edge_index,
                'edge_features': current_graph['edge_features']
            return previous_graph, aligned_current_graph

            all_words = current_words | previous_words
            unified_dict = {word: idx for idx, word in enumerate(all_words)}
            unified_dict_reverse = {idx: word for idx, word in enumerate(all_words)}
            reordered_index = {'index_to_key': unified_dict_reverse, 'key_to_index': unified_dict}

            reordered_previous_node_feature_matrix = np.zeros((len(unified_dict), previous_graph['node_features'].shape[1]))
            for word, index in previous_index['key_to_index'].items():
                if word in unified_dict:
                    reordered_previous_node_feature_matrix[unified_dict[word]] = previous_graph['node_features'][index]

            reordered_current_node_feature_matrix = np.zeros((len(unified_dict), current_graph['node_features'].shape[1]))
            for word, index in current_index['key_to_index'].items():
                if word in unified_dict:
                    reordered_current_node_feature_matrix[unified_dict[word]] = current_graph['node_features'][index]

            # Mapping old indices to new indices for the previous dictionary
            previous_index_mapping = {old_index: unified_dict[word] for word, old_index in previous_index['key_to_index'].items()}
            updated_previous_edge_index = np.array(previous_graph['edge_index'])
            for i in range(previous_graph['edge_index'].shape[1]):
                updated_previous_edge_index[0, i] = previous_index_mapping.get(previous_graph['edge_index'][0, i], -1)
                updated_previous_edge_index[1, i] = previous_index_mapping.get(previous_graph['edge_index'][1, i], -1)
            # Remove edges where one of the nodes does not exist anymore (indicated by -1)
            updated_previous_edge_index = updated_previous_edge_index[:, ~(updated_previous_edge_index == -1).any(axis=0)]

            # Mapping old indices to new indices for the current dictionary
            current_index_mapping = {old_index: unified_dict[word] for word, old_index in current_index['key_to_index'].items()}
            updated_current_edge_index = np.array(current_graph['edge_index'])
            for i in range(current_graph['edge_index'].shape[1]):
                updated_current_edge_index[0, i] = current_index_mapping.get(current_graph['edge_index'][0, i], -1)
                updated_current_edge_index[1, i] = current_index_mapping.get(current_graph['edge_index'][1, i], -1)
            # Remove edges where one of the nodes does not exist anymore (indicated by -1)
            updated_current_edge_index = updated_current_edge_index[:, ~(updated_current_edge_index == -1).any(axis=0)]

            aligned_previous_graph = {
                'index': reordered_index,
                'node_features': reordered_previous_node_feature_matrix,
                'edge_index': updated_previous_edge_index,
                'edge_features': previous_graph['edge_features']

            aligned_current_graph = {
                'index': reordered_index,
                'node_features': reordered_current_node_feature_matrix,
                'edge_index': updated_current_edge_index,
                'edge_features': current_graph['edge_features']

            return aligned_previous_graph, aligned_current_graph

    def label_previous_graph(
            current_graph: dict,
            previous_graph: dict,
            label_feature_idx: int = 1
            ) -> (np.ndarray, np.ndarray):
        This method is used to label the edges of the previous snapshot with the edge feature values in the current snapshot.

            current_graph (dict): the current snapshot of the temporal graph to use for labeling the previous snapshot.
            previous_graph (dict): the previous snapshot of the temporal graph to label.
            label_feature_idx (int): the index of the feature to use as labels. Default: 1.

            labels (np.ndarray): the labels of the edges of the graph at the specified index.
            labels_mask (np.ndarray): the indices of the labels of the edges of the graph at the specified index.

        current_edge_index = current_graph['edge_index']
        current_edge_features = current_graph['edge_features']

        previous_edge_index = previous_graph['edge_index']

        previous_edges = [tuple(edge) for edge in previous_edge_index.T]
        current_edges  = [tuple(edge) for edge in current_edge_index.T]

        labels = []

        label_mask_1 = []
        label_mask_2 = []

        for i, previous_edge in enumerate(previous_edges):
            if previous_edge in current_edges:

                current_index = current_edges.index(previous_edge)

        label_mask = np.stack([label_mask_1, label_mask_2])
        labels = np.array(labels)

        return labels, label_mask


Retrieves the snapshot at the specified index.


Name Type Description Default
idx int

Index of the item to retrieve.



Name Type Description
snapshot dict

the graph data at the specified index.

node_features ndarray

the features of the nodes of the graph at the specified index.

edge_index ndarray

the edge index of the graph at the specified index.

edge_feature ndarray

the edge features of the graph at the specified index.

labels ndarray

the labels of the edges of the graph at the specified index.

labels_mask ndarray

the indices of the labels of the edges of the graph at the specified index.

Source code in semantics/graphs/
def __getitem__(self, idx):
    Retrieves the snapshot at the specified index.

        idx (int): Index of the item to retrieve.

        snapshot (dict): the graph data at the specified index.
        node_features (np.ndarray): the features of the nodes of the graph at the specified index.
        edge_index (np.ndarray): the edge index of the graph at the specified index.
        edge_feature (np.ndarray): the edge features of the graph at the specified index.
        labels (np.ndarray): the labels of the edges of the graph at the specified index.
        labels_mask (np.ndarray): the indices of the labels of the edges of the graph at the specified index.
    # Get the tokenized inputs at the specified index
    snapshot = self.snapshots[idx]
    node_features = self.xs[idx]
    edge_index = self.edge_indices[idx]
    edge_feature = self.edge_features[idx]
    labels = self.ys[idx]
    labels_mask = self.y_indices[idx]

    return snapshot, node_features, edge_index, edge_feature, labels, labels_mask



Name Type Description
snapshots List[dict]

the snapshots of the temporal graph. Each snapshot is a dictionary containing the index of the nodes of the snapshot.

xs List[ndarray]

the features of the nodes of the temporal graph.

edge_indices List[ndarray]

the edge index of the temporal graph.

edge_features List[ndarray]

the edge features of the temporal graph.

ys List[ndarray]

the labels of the edges of the temporal graph.

y_indices List[ndarray]

the indices of the labels of the edges of the temporal graph.

Source code in semantics/graphs/
def __init__(

        snapshots (List[dict]): the snapshots of the temporal graph. Each snapshot is a dictionary containing the index of the nodes of the snapshot.
        xs (List[np.ndarray]): the features of the nodes of the temporal graph.
        edge_indices (List[np.ndarray]): the edge index of the temporal graph.
        edge_features (List[np.ndarray]): the edge features of the temporal graph.
        ys (List[np.ndarray]): the labels of the edges of the temporal graph.
        y_indices (List[np.ndarray]): the indices of the labels of the edges of the temporal graph.


    self.snapshots = []
    self.xs = []
    self.edge_indices = []
    self.edge_features = []
    self.ys = []
    self.y_indices = []

add_graph(target_word, level, k, c, dataset, word2vec_model, mlm_model)

This method is used to add a snapshot to the temporal graph.


Name Type Description Default
target_word str

the word to get the nodes for

level int

the level of the graph to get

k int

the number of similar nodes to get for each occurrence of the target word

c int

the number of context nodes to get for the target word

dataset List[str]

the sentences to get the nodes from

word2vec_model Word2VecInference

the word2vec model's Inference class

mlm_model (RobertaInference, BertInference)

the MLM model's Inference class



>>> word2vec = Word2VecInference('word2vec.model')
>>> mlm = RobertaInference('MLM_roberta')
>>> tg = TemporalGraph()
>>> tg.add_graph(target_word='sentence', level=3, k=2, c=2, dataset=['this is a sentence', 'this is another sentence'], word2vec_model = word2vec, mlm_model = mlm)
>>> snapshot, node_features, edge_index, edge_feature, _, _= tg[0]
>>> print(snapshot)
{'index_to_key': {0: 'sentence', 1: 'this', 2: 'is', 3: 'a', 4: 'another'}, 'key_to_index': {'sentence': 0, 'this': 1, 'is': 2, 'a': 3, 'another': 4}
>>> print(node_features)
[[0, 0, 2, ...], [1, 1, 2, ...], [1, 1, 2, ...], [1, 1, 2, ...], [2, 1, 2, ...]]
>>> print(edge_index)
[[0, 0, 0, 1, 1, 2, 2, 3, 3, 4], [1, 2, 4, 1, 3, 1, 4, 1, 4, 4]]
>>> print(edge_feature)
[[0, 0.9999999403953552, 0.0], [0, 0.9999999403953552, 0.0], [0, 0.9999999403953552, 0.0], [0, 0.9999999403953552, 0.0], [0, 0.9999999403953552, 0.0], [1, 0.9999999403953552, 0.0], [1, 0.9999999403953552, 0.0], [1, 0.9999999403953552, 0.0], [1, 0.9999999403953552, 0.0], [1, 0.9999999403953552, 0.0]]
>>> tg.add_graph(target_word='sentence', level=3, k=2, c=2, dataset=['this is a sentence', 'this is another sentence', 'this is a third sentence'], word2vec_model = word2vec, mlm_model = mlm)
>>> _, _, _, _, labels, label_mask= tg[0]
>>> print(labels)
[[0.9999, 1., 0.0001]]
>>> print(label_mask)
[[0, 1, 0], [1, 2, 4]]
Source code in semantics/graphs/
def add_graph(
        target_word: str, 
        level: int, 
        k: int, 
        c: int,
        dataset: List[str], 
        word2vec_model: Word2VecInference, 
        mlm_model: Union[RobertaInference, BertInference]
        ) -> None:
    This method is used to add a snapshot to the temporal graph.

        target_word (str): the word to get the nodes for
        level (int): the level of the graph to get
        k (int): the number of similar nodes to get for each occurrence of the target word
        c (int): the number of context nodes to get for the target word
        dataset (List[str]): the sentences to get the nodes from
        word2vec_model (Word2VecInference): the word2vec model's Inference class
        mlm_model (RobertaInference, BertInference): the MLM model's Inference class

        >>> word2vec = Word2VecInference('word2vec.model')
        >>> mlm = RobertaInference('MLM_roberta')
        >>> tg = TemporalGraph()
        >>> tg.add_graph(target_word='sentence', level=3, k=2, c=2, dataset=['this is a sentence', 'this is another sentence'], word2vec_model = word2vec, mlm_model = mlm)
        >>> snapshot, node_features, edge_index, edge_feature, _, _= tg[0]
        >>> print(snapshot)
        {'index_to_key': {0: 'sentence', 1: 'this', 2: 'is', 3: 'a', 4: 'another'}, 'key_to_index': {'sentence': 0, 'this': 1, 'is': 2, 'a': 3, 'another': 4}
        >>> print(node_features)
        [[0, 0, 2, ...], [1, 1, 2, ...], [1, 1, 2, ...], [1, 1, 2, ...], [2, 1, 2, ...]]
        >>> print(edge_index)
        [[0, 0, 0, 1, 1, 2, 2, 3, 3, 4], [1, 2, 4, 1, 3, 1, 4, 1, 4, 4]]
        >>> print(edge_feature)
        [[0, 0.9999999403953552, 0.0], [0, 0.9999999403953552, 0.0], [0, 0.9999999403953552, 0.0], [0, 0.9999999403953552, 0.0], [0, 0.9999999403953552, 0.0], [1, 0.9999999403953552, 0.0], [1, 0.9999999403953552, 0.0], [1, 0.9999999403953552, 0.0], [1, 0.9999999403953552, 0.0], [1, 0.9999999403953552, 0.0]]

        >>> tg.add_graph(target_word='sentence', level=3, k=2, c=2, dataset=['this is a sentence', 'this is another sentence', 'this is a third sentence'], word2vec_model = word2vec, mlm_model = mlm)
        >>> _, _, _, _, labels, label_mask= tg[0]
        >>> print(labels)
        [[0.9999, 1., 0.0001]]
        >>> print(label_mask)
        [[0, 1, 0], [1, 2, 4]]

    print(f'Adding the nodes of the word graph for the word "{target_word}"...')
    nodes = Nodes(
        target_word= target_word,
        level= level,
        k= k,
        c= c,
        word2vec_model = word2vec_model,
        mlm_model = mlm_model

    nds = nodes.get_nodes()
    print('Getting their features...', '\n')
    index, node_feature_matrix, embeddings = nodes.get_node_features(nds)
    print(f'Adding the edges of the word graph for the word "{target_word}"...')
    edges = Edges(
    edge_index, edge_feature_matrix = edges.get_edge_features(dataset)

    print('Constructing the temporal graph...', '\n')

construct_graph(current_index, current_node_feature_matrix, current_embeddings, current_edge_index, current_edge_feature_matrix)

This method is used to construct the temporal graph.


Name Type Description Default
current_index dict

the index of the nodes of the current snapshot.

current_node_feature_matrix ndarray

the features of the nodes of the current snapshot.

current_embeddings ndarray

the embeddings of the nodes of the current snapshot.

current_edge_index ndarray

the edge index of the current snapshot.

current_edge_feature_matrix ndarray

the edge features of the current snapshot.

Source code in semantics/graphs/
def construct_graph(

    This method is used to construct the temporal graph.

        current_index (dict): the index of the nodes of the current snapshot.
        current_node_feature_matrix (np.ndarray): the features of the nodes of the current snapshot.
        current_embeddings (np.ndarray): the embeddings of the nodes of the current snapshot.
        current_edge_index (np.ndarray): the edge index of the current snapshot.
        current_edge_feature_matrix (np.ndarray): the edge features of the current snapshot.

    if len(self.snapshots) == 0:
        print('Adding the first snapshot to the temporal graph...', '\n')
        self.xs.append(np.concatenate((current_node_feature_matrix, current_embeddings), axis=1))

        print(f'Adding the {len(self.snapshots)} snapshot to the temporal graph...', '\n')
        previous_index, previous_node_features, previous_edge_index, previous_edge_feature, _, _= self[-1]
        current_node_features = np.concatenate((current_node_feature_matrix, current_embeddings), axis=1)

        previous_graph = {
            'index': previous_index,
            'node_features': previous_node_features,
            'edge_index': previous_edge_index,
            'edge_features': previous_edge_feature

        current_graph = {
            'index': current_index,
            'node_features': current_node_features,
            'edge_index': current_edge_index,
            'edge_features': current_edge_feature_matrix

        print('Aligning the nodes of the current snapshot with the nodes of the previous snapshot...', '\n')
        aligned_previous_graph, aligned_current_graph = self.get_aligned_graph(current_graph, previous_graph)
        print('Labeling the edges of the previous snapshot with the edge feature values in the current snapshot...', '\n')
        previous_labels, previous_label_mask = self.label_previous_graph(current_graph, previous_graph)

        self.snapshots[-1] = aligned_previous_graph['index']
        self.xs[-1] = aligned_previous_graph['node_features']
        self.edge_indices[-1] = aligned_previous_graph['edge_index']
        self.edge_features[-1] = aligned_previous_graph['edge_features']
        self.ys[-1] = previous_labels
        self.y_indices[-1] = previous_label_mask


get_aligned_graph(current_graph, previous_graph)

This method is used to align the nodes of the current snapshot with the nodes of the previous snapshot.


Name Type Description Default
current_graph dict

the current snapshot of the temporal graph to align with the previous snapshot.

previous_graph dict

the previous snapshot of the temporal graph to align with the current snapshot.



Name Type Description
aligned_previous_graph dict

the aligned previous snapshot of the temporal graph.

aligned_current_graph dict

the aligned current snapshot of the temporal graph.

Source code in semantics/graphs/
def get_aligned_graph(
        current_graph: dict, 
        previous_graph: dict
        ) -> (dict, dict):

    This method is used to align the nodes of the current snapshot with the nodes of the previous snapshot.

        current_graph (dict): the current snapshot of the temporal graph to align with the previous snapshot.
        previous_graph (dict): the previous snapshot of the temporal graph to align with the current snapshot.

        aligned_previous_graph (dict): the aligned previous snapshot of the temporal graph.
        aligned_current_graph (dict): the aligned current snapshot of the temporal graph.

    current_index = current_graph['index']
    previous_index = previous_graph['index']

    if current_index == previous_index:
        return current_graph

    current_words = set(current_index['key_to_index'].keys())
    previous_words = set(previous_index['key_to_index'].keys())

    dynamic_graph = current_words != previous_words

    if not dynamic_graph:
        index_mapping = {current_index['key_to_index'][key]: previous_index['key_to_index'][key] for key in current_index['key_to_index']}

        reordered_node_feature_matrix = np.zeros_like(current_graph['node_features'])
        for current_idx, previous_idx in index_mapping.items():
            reordered_node_feature_matrix[previous_idx] = current_graph['node_features'][current_idx]

        updated_edge_index = np.zeros_like(current_graph['edge_index'])
        for i in range(current_graph['edge_index'].shape[1]):
            updated_edge_index[0, i] = index_mapping.get(current_graph['edge_index'][0, i], -1)
            updated_edge_index[1, i] = index_mapping.get(current_graph['edge_index'][1, i], -1)
        # Remove edges where one of the nodes does not exist anymore (indicated by -1)
        updated_edge_index = updated_edge_index[:, ~(updated_edge_index == -1).any(axis=0)]

        aligned_current_graph = {
            'index': previous_graph['index'],
            'node_features': reordered_node_feature_matrix,
            'edge_index': updated_edge_index,
            'edge_features': current_graph['edge_features']
        return previous_graph, aligned_current_graph

        all_words = current_words | previous_words
        unified_dict = {word: idx for idx, word in enumerate(all_words)}
        unified_dict_reverse = {idx: word for idx, word in enumerate(all_words)}
        reordered_index = {'index_to_key': unified_dict_reverse, 'key_to_index': unified_dict}

        reordered_previous_node_feature_matrix = np.zeros((len(unified_dict), previous_graph['node_features'].shape[1]))
        for word, index in previous_index['key_to_index'].items():
            if word in unified_dict:
                reordered_previous_node_feature_matrix[unified_dict[word]] = previous_graph['node_features'][index]

        reordered_current_node_feature_matrix = np.zeros((len(unified_dict), current_graph['node_features'].shape[1]))
        for word, index in current_index['key_to_index'].items():
            if word in unified_dict:
                reordered_current_node_feature_matrix[unified_dict[word]] = current_graph['node_features'][index]

        # Mapping old indices to new indices for the previous dictionary
        previous_index_mapping = {old_index: unified_dict[word] for word, old_index in previous_index['key_to_index'].items()}
        updated_previous_edge_index = np.array(previous_graph['edge_index'])
        for i in range(previous_graph['edge_index'].shape[1]):
            updated_previous_edge_index[0, i] = previous_index_mapping.get(previous_graph['edge_index'][0, i], -1)
            updated_previous_edge_index[1, i] = previous_index_mapping.get(previous_graph['edge_index'][1, i], -1)
        # Remove edges where one of the nodes does not exist anymore (indicated by -1)
        updated_previous_edge_index = updated_previous_edge_index[:, ~(updated_previous_edge_index == -1).any(axis=0)]

        # Mapping old indices to new indices for the current dictionary
        current_index_mapping = {old_index: unified_dict[word] for word, old_index in current_index['key_to_index'].items()}
        updated_current_edge_index = np.array(current_graph['edge_index'])
        for i in range(current_graph['edge_index'].shape[1]):
            updated_current_edge_index[0, i] = current_index_mapping.get(current_graph['edge_index'][0, i], -1)
            updated_current_edge_index[1, i] = current_index_mapping.get(current_graph['edge_index'][1, i], -1)
        # Remove edges where one of the nodes does not exist anymore (indicated by -1)
        updated_current_edge_index = updated_current_edge_index[:, ~(updated_current_edge_index == -1).any(axis=0)]

        aligned_previous_graph = {
            'index': reordered_index,
            'node_features': reordered_previous_node_feature_matrix,
            'edge_index': updated_previous_edge_index,
            'edge_features': previous_graph['edge_features']

        aligned_current_graph = {
            'index': reordered_index,
            'node_features': reordered_current_node_feature_matrix,
            'edge_index': updated_current_edge_index,
            'edge_features': current_graph['edge_features']

        return aligned_previous_graph, aligned_current_graph

label_previous_graph(current_graph, previous_graph, label_feature_idx=1)

This method is used to label the edges of the previous snapshot with the edge feature values in the current snapshot.


Name Type Description Default
current_graph dict

the current snapshot of the temporal graph to use for labeling the previous snapshot.

previous_graph dict

the previous snapshot of the temporal graph to label.

label_feature_idx int

the index of the feature to use as labels. Default: 1.



Name Type Description
labels ndarray

the labels of the edges of the graph at the specified index.

labels_mask ndarray

the indices of the labels of the edges of the graph at the specified index.

Source code in semantics/graphs/
def label_previous_graph(
        current_graph: dict,
        previous_graph: dict,
        label_feature_idx: int = 1
        ) -> (np.ndarray, np.ndarray):
    This method is used to label the edges of the previous snapshot with the edge feature values in the current snapshot.

        current_graph (dict): the current snapshot of the temporal graph to use for labeling the previous snapshot.
        previous_graph (dict): the previous snapshot of the temporal graph to label.
        label_feature_idx (int): the index of the feature to use as labels. Default: 1.

        labels (np.ndarray): the labels of the edges of the graph at the specified index.
        labels_mask (np.ndarray): the indices of the labels of the edges of the graph at the specified index.

    current_edge_index = current_graph['edge_index']
    current_edge_features = current_graph['edge_features']

    previous_edge_index = previous_graph['edge_index']

    previous_edges = [tuple(edge) for edge in previous_edge_index.T]
    current_edges  = [tuple(edge) for edge in current_edge_index.T]

    labels = []

    label_mask_1 = []
    label_mask_2 = []

    for i, previous_edge in enumerate(previous_edges):
        if previous_edge in current_edges:

            current_index = current_edges.index(previous_edge)

    label_mask = np.stack([label_mask_1, label_mask_2])
    labels = np.array(labels)

    return labels, label_mask