Source code for cogdl.tasks.attributed_graph_clustering

import argparse
from typing import Dict
import numpy as np
import networkx as nx

from sklearn.cluster import KMeans, SpectralClustering
from sklearn.metrics.cluster import normalized_mutual_info_score
from sklearn.metrics import f1_score
from scipy.optimize import linear_sum_assignment

import torch
import torch.nn.functional as F

from cogdl.datasets import build_dataset
from cogdl.models import build_model
from . import BaseTask, register_task


[docs]@register_task("attributed_graph_clustering") class AttributedGraphClustering(BaseTask): """Attributed graph clustring task."""
[docs] @staticmethod def add_args(parser: argparse.ArgumentParser): """Add task-specific arguments to the parser.""" # fmt: off # parser.add_argument("--num-features", type=int) parser.add_argument("--num-clusters", type=int, default=7) parser.add_argument("--cluster-method", type=str, default="kmeans") parser.add_argument("--hidden-size", type=int, default=128) parser.add_argument("--model-type", type=str, default="content") parser.add_argument("--evaluate", type=str, default="full") parser.add_argument('--enhance', type=str, default=None, help='use prone or prone++ to enhance embedding')
# fmt: on def __init__( self, args, dataset=None, _=None, ): super(AttributedGraphClustering, self).__init__(args) self.args = args self.model_name = args.model self.device = "cpu" if not torch.cuda.is_available() or args.cpu else args.device_id[0] if dataset is None: dataset = build_dataset(args) self.dataset = dataset self.data = dataset[0] self.num_nodes = self.data.y.shape[0] args.num_clusters = torch.max(self.data.y) + 1 if args.model == "prone": self.hidden_size = args.hidden_size = args.num_features = 13 else: self.hidden_size = args.hidden_size = args.hidden_size args.num_features = dataset.num_features self.model = build_model(args) self.num_clusters = args.num_clusters if args.cluster_method not in ["kmeans", "spectral"]: raise Exception("cluster method must be kmeans or spectral") if args.model_type not in ["content", "spectral", "both"]: raise Exception("model type must be content, spectral or both") self.cluster_method = args.cluster_method if args.evaluate not in ["full", "NMI"]: raise Exception("evaluation must be full or NMI") self.model_type = args.model_type self.evaluate = args.evaluate self.is_weighted = self.data.edge_attr is not None self.enhance = args.enhance
[docs] def train(self) -> Dict[str, float]: if self.model_type == "content": features_matrix = self.data.x elif self.model_type == "spectral": G = nx.Graph() edge_index = torch.stack(self.data.edge_index).t().tolist() if self.is_weighted: edges, weight = ( edge_index, self.data.edge_attr.tolist(), ) G.add_weighted_edges_from([(edges[i][0], edges[i][1], weight[i][0]) for i in range(len(edges))]) else: G.add_edges_from(edge_index) embeddings = self.model.train(G) if self.enhance is not None: embeddings = self.enhance_emb(G, embeddings) # Map node2id features_matrix = np.zeros((self.num_nodes, self.hidden_size)) for vid, node in enumerate(G.nodes()): features_matrix[node] = embeddings[vid] features_matrix = torch.tensor(features_matrix) features_matrix = F.normalize(features_matrix, p=2, dim=1) else: trainer = self.model.get_trainer(self.args)(self.args) self.model = trainer.fit(self.model, self.data) features_matrix = self.model.get_features(self.data) features_matrix = features_matrix.cpu().numpy() print("Clustering...") if self.cluster_method == "kmeans": kmeans = KMeans(n_clusters=self.num_clusters, random_state=0).fit(features_matrix) clusters = kmeans.labels_ else: clustering = SpectralClustering( n_clusters=self.num_clusters, assign_labels="discretize", random_state=0 ).fit(features_matrix) clusters = clustering.labels_ if self.evaluate == "full": return self.__evaluate(clusters, True) else: return self.__evaluate(clusters, False)
def __evaluate(self, clusters, full=True) -> Dict[str, float]: print("Evaluating...") truth = self.data.y.cpu().numpy() if full: mat = np.zeros([self.num_clusters, self.num_clusters]) for i in range(self.num_nodes): mat[clusters[i]][truth[i]] -= 1 _, row_idx = linear_sum_assignment(mat) acc = -mat[_, row_idx].sum() / self.num_nodes for i in range(self.num_nodes): clusters[i] = row_idx[clusters[i]] macro_f1 = f1_score(truth, clusters, average="macro") return dict(Accuracy=acc, NMI=normalized_mutual_info_score(clusters, truth), Macro_F1=macro_f1) else: return dict(NMI=normalized_mutual_info_score(clusters, truth))