Source code for pydgc.models.gae_ssc

# -*- coding: utf-8 -*-
import os
import torch

from . import GAE
from torch import Tensor
from ..metrics import DGCMetric
from ..modules import SSCLayer
from .dgc_model import DGCModel
from typing import Tuple, Any
from torch_geometric.data import Data
from yacs.config import CfgNode as CN
from ..utils import validate_and_create_path


[docs]class GAESSC(DGCModel): """Graph-autoencoder with self-supervised clustering used in DEC. Args: logger (Logger): Logger object. cfg (CN): Configuration object. """ def __init__(self, logger, cfg): super(GAESSC, self).__init__(logger, cfg) self.device = torch.device(cfg.device) self.gae = GAE(logger, cfg).to(self.device) self.ssc = SSCLayer(in_channels=self.cfg.model.dims[-1], out_channels=self.cfg.dataset.n_clusters, method='kl_div').to(self.device) self.loss_curve = [] self.nmi_curve = [] self.pretrain_loss_curve = [] self.best_embedding = None self.best_predicted_labels = None self.best_results = {'ACC': -1} self.reset_parameters()
[docs] def reset_parameters(self): self.gae.reset_parameters() self.ssc.reset_parameters()
[docs] def forward(self, data) -> Any: hat_adj, embedding = self.gae(data) q = self.ssc(embedding) return hat_adj, q
[docs] def loss(self, edge_index, hat_adj: Tensor, q: Tensor) -> Tensor: reconstruct_loss = self.gae.loss(edge_index, hat_adj) ssc_loss = self.ssc.loss(q, method='kl_div') loss_total = reconstruct_loss + float(self.cfg.train.alpha) * ssc_loss return loss_total
[docs] def pretrain(self, data: Data, cfg: CN = None, flag: str = "PRETRAIN GAE"): flag += f'-{self.gae.cfg.model.gnn_type.upper()}' if cfg is None: cfg = self.cfg.train.pretrain self.pretrain_loss_curve = self.gae.train_model(data, cfg, flag) validate_and_create_path(cfg.dir) pretrain_file_name = os.path.join(cfg.dir, f'gae_{self.cfg.model.gnn_type}.pth') torch.save(self.gae.state_dict(), pretrain_file_name)
[docs] def train_model(self, data: Data, cfg: CN = None, flag: str = "TRAIN GAE-SSC"): flag += f'-{self.cfg.model.gnn_type.upper()}' if cfg is None: cfg = self.cfg.train # load pretrained gae model pretrain_file_name = os.path.join(cfg.pretrain.dir, f'gae_{self.cfg.model.gnn_type}.pth') if not os.path.exists(pretrain_file_name): self.pretrain(data, cfg.pretrain, flag='PRETRAIN GAE') self.gae.load_state_dict(torch.load(pretrain_file_name, map_location=self.device)) optimizer = torch.optim.Adam(self.parameters(), lr=float(cfg.lr)) # initialize ssc layer _, _, cluster_centers = self.gae.clustering(data) self.ssc.cluster_centers.data = cluster_centers.to(self.device) self.gae.evaluate(data) self.logger.flag(flag) # train for epoch in range(1, cfg.max_epoch+1): self.train() optimizer.zero_grad() hat_adj, q = self.forward(data) loss = self.loss(data.edge_index, hat_adj, q) loss.backward() optimizer.step() self.loss_curve.append(loss.item()) self.logger.loss(epoch, loss) if epoch % 1 == 0: if self.cfg.evaluate.each: embedding, predicted_labels, results = self.evaluate(data) self.nmi_curve.append(results['NMI']) if results['ACC'] > self.best_results['ACC']: self.best_embedding = embedding self.best_predicted_labels = predicted_labels self.best_results = results if not self.cfg.evaluate.each: embedding, predicted_labels, results = self.evaluate(data) return self.loss_curve, self.nmi_curve, embedding, predicted_labels, results return self.loss_curve, self.nmi_curve, self.best_embedding, self.best_predicted_labels, self.best_results
[docs] def get_embedding(self, data) -> Tensor: with torch.no_grad(): self.eval() return self.gae.get_embedding(data)
[docs] def clustering(self, data) -> Tuple[Tensor, Tensor, Tensor]: embedding = self.get_embedding(data) labels_ = torch.from_numpy(self.ssc.get_q(embedding).detach().cpu().numpy().argmax(axis=1)) clustering_centers = self.ssc.cluster_centers.data return embedding, labels_, clustering_centers
[docs] def evaluate(self, data: Data): embedding, predicted_labels, clustering_centers = self.clustering(data) ground_truth = data.y.numpy() metric = DGCMetric(ground_truth, predicted_labels.numpy(), embedding, data.edge_index) results = metric.evaluate_one_epoch(self.logger, self.cfg.evaluate) return embedding, predicted_labels, results