| """Training utilities for transformer-based multi-label classification. |
| |
| This module contains a small training harness around HuggingFace |
| `AutoModelForSequenceClassification` specialized for the project's |
| multi-label code-comment classification task. It provides: |
| |
| - `TransformerConfig` dataclass for configurable training runs. |
| - `CommentDataset` to wrap tokenization of pandas DataFrames. |
| - `TransformerTrainer` which runs the training loop, evaluation and |
| model export (with MLflow logging hooks). |
| |
| The helpers are intended for experimental, small-scale training and |
| instrumentation rather than production-grade distributed training. |
| """ |

from dataclasses import asdict, dataclass
import logging
import os
from typing import Dict, List, Tuple

import mlflow
import numpy as np
import pandas as pd
from sklearn.metrics import (
    accuracy_score,
    classification_report,
    f1_score,
    precision_score,
    recall_score,
)
import torch
from torch.utils.data import DataLoader, Dataset
from tqdm.auto import tqdm
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    get_linear_schedule_with_warmup,
)

from .preprocessing import load_or_prepare_data

logger = logging.getLogger(__name__)

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
| print(f"Using device: {DEVICE}") |

# Label name tuples per language. The order must match the columns of each
# row's `labels_array`; spellings and casing are kept exactly as they appear
# in the dataset.
LABELS: Dict[str, Tuple[str, ...]] = {
    "java": (
        "summary",
        "Ownership",
        "Expand",
        "usage",
        "Pointer",
        "deprecation",
        "rational",
    ),
    "python": (
        "Usage",
        "Parameters",
        "DevelopmentNotes",
        "Expand",
        "Summary",
    ),
    "pharo": (
        "Keyimplementationpoints",
        "Example",
        "Responsibilities",
        "Intent",
        "Keymessages",
        "Collaborators",
    ),
}
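
# For example, a Python comment labeled as both "Usage" and "Summary" would
# carry the multi-hot vector below (illustrative values, aligned with
# LABELS["python"]):
#
#     labels_array = [1.0, 0.0, 0.0, 0.0, 1.0]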


@dataclass
class TransformerConfig:
    """Configuration for transformer training runs.

    Attributes are intentionally simple dataclass fields and map directly to
    CLI/YAML configuration keys used by the training harness.
    """

    lang: str
    raw_data_dir: str
    processed_data_dir: str
    model_output_path: str
    pretrained_model_name: str = "microsoft/codebert-base"
    max_length: int = 128
    batch_size: int = 16
    lr: float = 2e-5
    num_epochs: int = 5
    warmup_ratio: float = 0.1
    pos_weight_cap: float = 30.0
    threshold: float = 0.5
    preprocessing: bool = False
    preprocessing_factor: float = 1.0

    def __post_init__(self) -> None:
        """Coerce fields to the correct types even if YAML provides strings."""
        self.max_length = int(self.max_length)
        self.batch_size = int(self.batch_size)
        self.lr = float(self.lr)
        self.num_epochs = int(self.num_epochs)
        self.warmup_ratio = float(self.warmup_ratio)
        self.pos_weight_cap = float(self.pos_weight_cap)
        self.threshold = float(self.threshold)
        self.preprocessing_factor = float(self.preprocessing_factor)

        # YAML loaders may hand booleans through as strings; accept "true"/"True".
        if isinstance(self.preprocessing, str):
            self.preprocessing = self.preprocessing.lower() == "true"
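

# Example of the coercion above, with hypothetical values as a YAML loader
# might deliver them:
#
#     cfg = TransformerConfig(
#         lang="python",
#         raw_data_dir="data/raw",
#         processed_data_dir="data/processed",
#         model_output_path="models/out",
#         batch_size="32",
#         preprocessing="True",
#     )
#     assert cfg.batch_size == 32 and cfg.preprocessing is True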


class CommentDataset(Dataset):
    """Dataset wrapper around a pandas DataFrame with 'combo' and 'labels_array' columns."""

    def __init__(self, df: pd.DataFrame, tokenizer: AutoTokenizer, max_length: int):
        """Create a dataset that tokenizes rows on demand.

        Parameters
        ----------
        df : pandas.DataFrame
            Input frame containing at least `combo` and `labels_array` columns.
        tokenizer : transformers.AutoTokenizer
            Tokenizer used to encode text into model inputs.
        max_length : int
            Maximum tokenization length (used for padding/truncation).

        """
        self.df = df.reset_index(drop=True)
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self) -> int:
        """Return the number of examples in the dataset."""
        return len(self.df)

    def __getitem__(self, idx: int):
        """Return a single tokenized example and its labels as tensors.

        The returned dict contains tokenized inputs (PyTorch tensors) and a
        `labels` tensor suitable for `BCEWithLogitsLoss` in multi-label tasks.
        """
        row = self.df.iloc[idx]
        text = str(row["combo"])
        labels = np.asarray(row["labels_array"], dtype=np.float32)

        enc = self.tokenizer(
            text,
            truncation=True,
            max_length=self.max_length,
            padding="max_length",
            return_tensors="pt",
        )

        # Drop the batch dimension added by return_tensors="pt".
        item = {k: v.squeeze(0) for k, v in enc.items()}
        item["labels"] = torch.from_numpy(labels)
        return item
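

# Each item is a dict of tensors. For a RoBERTa-style checkpoint such as
# "microsoft/codebert-base" the keys are typically (tokenizer-dependent):
#
#     {"input_ids": LongTensor[max_length],
#      "attention_mask": LongTensor[max_length],
#      "labels": FloatTensor[num_labels]}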


class TransformerTrainer:
    """End-to-end transformer trainer for the code-comment multi-label task."""

    def __init__(self, cfg: TransformerConfig) -> None:
        """Initialize training state, data loaders, model and optimizer.

        Parameters
        ----------
        cfg : TransformerConfig
            Training configuration containing data paths and hyperparameters.

        """
        self.cfg = cfg
        if cfg.lang not in LABELS:
            raise ValueError(f"No LABELS defined for language '{cfg.lang}'.")

        self.label_names = LABELS[cfg.lang]
        self.num_labels = len(self.label_names)

        logger.info("Initializing TransformerTrainer for language '%s'.", cfg.lang)
        logger.info("Raw data directory: %s", cfg.raw_data_dir)
        logger.info("Processed data directory: %s", cfg.processed_data_dir)
        logger.info("Model output path: %s", cfg.model_output_path)

        # Load (or build) the train/eval splits.
        self.train_df, self.eval_df, self.preprocessing_used = load_or_prepare_data(
            lang=cfg.lang,
            raw_data_dir=cfg.raw_data_dir,
            processed_data_dir=cfg.processed_data_dir,
            preprocessing_enabled=cfg.preprocessing,
            preprocessing_factor=cfg.preprocessing_factor,
            random_state=42,
        )

        logger.info("Preprocessing used for this run: %s", self.preprocessing_used)
        logger.info("Using device: %s", DEVICE)
        logger.info(
            "Train size: %d rows, Eval size: %d rows",
            len(self.train_df),
            len(self.eval_df),
        )

        # Log the run configuration to MLflow (best-effort; failures only warn).
        try:
            cfg_dict = asdict(self.cfg)
            mlflow.log_params({f"cfg_{k}": v for k, v in cfg_dict.items()})
            mlflow.log_param("num_labels", self.num_labels)
            mlflow.log_param("label_names", ",".join(self.label_names))
            mlflow.log_param("train_samples", len(self.train_df))
            mlflow.log_param("eval_samples", len(self.eval_df))
            mlflow.log_param("preprocessing_used", self.preprocessing_used)
        except Exception as e:
            logger.warning("Could not log transformer config to MLflow: %s", e)

        logger.info("Loading tokenizer '%s'.", cfg.pretrained_model_name)
        self.tokenizer = AutoTokenizer.from_pretrained(cfg.pretrained_model_name)

        # Class-balance weights computed from the training label matrix.
        y_train = np.stack(self.train_df["labels_array"].to_numpy())
        self.pos_weight = self._compute_pos_weight(y_train)

        train_dataset = CommentDataset(self.train_df, self.tokenizer, cfg.max_length)
        eval_dataset = CommentDataset(self.eval_df, self.tokenizer, cfg.max_length)

        self.train_loader = DataLoader(
            train_dataset,
            batch_size=cfg.batch_size,
            shuffle=True,
        )
        self.eval_loader = DataLoader(
            eval_dataset,
            batch_size=cfg.batch_size,
            shuffle=False,
        )

        logger.info(
            "Hyperparameters - lr=%s (type=%s), batch_size=%s, num_epochs=%s",
            self.cfg.lr,
            type(self.cfg.lr),
            self.cfg.batch_size,
            self.cfg.num_epochs,
        )

        logger.info("Loading base model '%s'.", cfg.pretrained_model_name)
        self.model = AutoModelForSequenceClassification.from_pretrained(
            cfg.pretrained_model_name,
            num_labels=self.num_labels,
            problem_type="multi_label_classification",
        ).to(DEVICE)

        self.loss_fn = torch.nn.BCEWithLogitsLoss(pos_weight=self.pos_weight.to(DEVICE))
        self.optimizer = torch.optim.AdamW(self.model.parameters(), lr=self.cfg.lr)

        num_training_steps = cfg.num_epochs * len(self.train_loader)
        num_warmup_steps = int(cfg.warmup_ratio * num_training_steps)
        logger.info(
            "Total training steps: %d, warmup steps: %d.",
            num_training_steps,
            num_warmup_steps,
        )

        self.scheduler = get_linear_schedule_with_warmup(
            self.optimizer,
            num_warmup_steps=num_warmup_steps,
            num_training_steps=num_training_steps,
        )
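
        # For example: num_epochs=5 with 100 train batches gives 500 total
        # steps; warmup_ratio=0.1 then yields 50 linear warmup steps before
        # the linear decay to zero.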

        # Best-checkpoint tracking (selected on validation macro-F1).
        self.best_state_dict = None
        self.best_val_macro_f1 = 0.0

    def _compute_pos_weight(self, y: np.ndarray) -> torch.Tensor:
        """Compute per-label BCE pos_weight = negatives / positives, clipped."""
        if y.ndim == 1:
            y = y[:, None]
        freq = y.sum(axis=0).astype(np.float64)
        num_samples = y.shape[0]

        pos_weight = (num_samples - freq) / np.clip(freq, 1.0, None)
        pos_weight = np.clip(pos_weight, 1.0, self.cfg.pos_weight_cap)

        logger.info("Positive class weights (clipped): %s", pos_weight.tolist())
        return torch.tensor(pos_weight, dtype=torch.float32)
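
    # Worked example for the weighting above: with 100 training rows, a label
    # positive in 10 of them gets pos_weight = (100 - 10) / 10 = 9.0, while a
    # label positive in only 2 rows gets 49.0, clipped down to the configured
    # pos_weight_cap (30.0 by default).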

    def _step_batch(self, batch, train: bool):
        """Run one forward pass, plus the backward/optimizer step when training."""
        batch = {k: v.to(DEVICE) for k, v in batch.items()}
        # Pop the labels so the model does not compute its own (unweighted)
        # loss; the pos-weighted BCE loss below is used instead.
        labels = batch.pop("labels")

        outputs = self.model(**batch)
        logits = outputs.logits
        loss = self.loss_fn(logits, labels)

        if train:
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
            self.optimizer.step()
            self.scheduler.step()
            self.optimizer.zero_grad()

        return loss, logits, labels

    def train_one_epoch(self, epoch: int) -> float:
        """Run a single training epoch over `self.train_loader`.

        Returns
        -------
        float
            The average training loss per sample over the epoch.

        """
        self.model.train()
        total_loss = 0.0
        n_samples = 0

        num_batches = len(self.train_loader)
        logger.info("Starting epoch %d training. Number of batches: %d", epoch, num_batches)

        progress_bar = tqdm(
            self.train_loader,
            desc=f"Epoch {epoch} [train]",
            total=num_batches,
            leave=False,
        )

        for batch in progress_bar:
            loss, _, _ = self._step_batch(batch, train=True)
            batch_size = batch["input_ids"].size(0)
            total_loss += loss.item() * batch_size
            n_samples += batch_size

            avg_loss_so_far = total_loss / max(n_samples, 1)
            progress_bar.set_postfix({"loss": f"{avg_loss_so_far:.4f}"})

        avg_loss = total_loss / max(n_samples, 1)
        logger.info("Epoch %d training completed. Average loss: %.4f.", epoch, avg_loss)

        mlflow.log_metric("train_loss", avg_loss, step=epoch)

        return avg_loss
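
    # Note on the averaging above: `total_loss` accumulates loss.item() scaled
    # by the batch size, so the returned value is a per-sample mean that stays
    # correct when the final batch is smaller (e.g. batch sizes 16/16/8
    # average over 40 samples rather than over 3 batches).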

    def evaluate(
        self,
        epoch: int,
        split_name: str = "eval",
    ) -> Tuple[float, float, float, np.ndarray, np.ndarray]:
        """Evaluate the model on `self.eval_loader` and compute metrics.

        Parameters
        ----------
        epoch : int
            Current epoch number (used for logging).
        split_name : str
            Name of the evaluation split used for MLflow metric keys.

        Returns
        -------
        tuple
            (avg_loss, micro_f1, macro_f1, y_true, y_pred)

        """
        self.model.eval()
        total_loss = 0.0
        n_samples = 0
        all_preds: List[np.ndarray] = []
        all_labels: List[np.ndarray] = []

        logger.info("Starting evaluation for epoch %d on split '%s'.", epoch, split_name)

        num_batches = len(self.eval_loader)
        progress_bar = tqdm(
            self.eval_loader,
            desc=f"Epoch {epoch} [{split_name}]",
            total=num_batches,
            leave=False,
        )

        with torch.no_grad():
            for batch in progress_bar:
                loss, logits, labels = self._step_batch(batch, train=False)
                batch_size = logits.size(0)
                total_loss += loss.item() * batch_size
                n_samples += batch_size

                # Independent per-label decisions via sigmoid + threshold.
                probs = torch.sigmoid(logits)
                preds = (probs > self.cfg.threshold).long()

                all_preds.append(preds.cpu().numpy())
                all_labels.append(labels.cpu().numpy())

                avg_loss_so_far = total_loss / max(n_samples, 1)
                progress_bar.set_postfix({"loss": f"{avg_loss_so_far:.4f}"})

        avg_loss = total_loss / max(n_samples, 1)
        y_pred = np.concatenate(all_preds, axis=0)
        y_true = np.concatenate(all_labels, axis=0)

        micro_f1 = f1_score(y_true, y_pred, average="micro", zero_division=0)
        macro_f1 = f1_score(y_true, y_pred, average="macro", zero_division=0)

        micro_precision = precision_score(y_true, y_pred, average="micro", zero_division=0)
        macro_precision = precision_score(y_true, y_pred, average="macro", zero_division=0)

        micro_recall = recall_score(y_true, y_pred, average="micro", zero_division=0)
        macro_recall = recall_score(y_true, y_pred, average="macro", zero_division=0)

        # Subset accuracy: a row counts only if every label matches exactly.
        subset_accuracy = accuracy_score(y_true, y_pred)
        # Micro accuracy: per-cell accuracy over the flattened label matrix.
        micro_accuracy = accuracy_score(y_true.flatten(), y_pred.flatten())

        logger.info(
            "Eval results [%s] - loss: %.4f | "
            "micro-F1: %.4f, macro-F1: %.4f | "
            "micro-P: %.4f, macro-P: %.4f | "
            "micro-R: %.4f, macro-R: %.4f | "
            "subset-acc: %.4f, micro-acc: %.4f",
            split_name,
            avg_loss,
            micro_f1,
            macro_f1,
            micro_precision,
            macro_precision,
            micro_recall,
            macro_recall,
            subset_accuracy,
            micro_accuracy,
        )

        mlflow.log_metric(f"{split_name}_loss", avg_loss, step=epoch)
        mlflow.log_metric(f"{split_name}_micro_f1", micro_f1, step=epoch)
        mlflow.log_metric(f"{split_name}_macro_f1", macro_f1, step=epoch)
        mlflow.log_metric(f"{split_name}_micro_precision", micro_precision, step=epoch)
        mlflow.log_metric(f"{split_name}_macro_precision", macro_precision, step=epoch)
        mlflow.log_metric(f"{split_name}_micro_recall", micro_recall, step=epoch)
        mlflow.log_metric(f"{split_name}_macro_recall", macro_recall, step=epoch)
        mlflow.log_metric(f"{split_name}_subset_accuracy", subset_accuracy, step=epoch)
        mlflow.log_metric(f"{split_name}_micro_accuracy", micro_accuracy, step=epoch)

        return avg_loss, micro_f1, macro_f1, y_true, y_pred
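
    # For instance, with y_true = [1, 0, 1] and y_pred = [1, 0, 0] on a single
    # row, subset accuracy scores that row 0 (one label differs) while micro
    # accuracy scores 2/3 of its cells correct.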

    def run(self) -> Dict[str, float]:
        """Execute the full training loop and save the best model.

        Returns
        -------
        dict
            Summary metrics from the final evaluation (micro/macro F1).

        """
        logger.info("Starting training loop for %d epochs.", self.cfg.num_epochs)
        for epoch in range(1, self.cfg.num_epochs + 1):
            train_loss = self.train_one_epoch(epoch)
            val_loss, val_micro_f1, val_macro_f1, _, _ = self.evaluate(epoch, split_name="eval")

            logger.info(
                "[%s] epoch=%d train_loss=%.4f val_loss=%.4f val_micro_f1=%.4f val_macro_f1=%.4f",
                self.cfg.lang,
                epoch,
                train_loss,
                val_loss,
                val_micro_f1,
                val_macro_f1,
            )

            if val_macro_f1 > self.best_val_macro_f1:
                logger.info(
                    "New best macro-F1: %.4f (previous: %.4f). Saving current model state.",
                    val_macro_f1,
                    self.best_val_macro_f1,
                )
                self.best_val_macro_f1 = val_macro_f1
                # Keep a CPU copy so the checkpoint does not pin GPU memory.
                self.best_state_dict = {k: v.cpu() for k, v in self.model.state_dict().items()}

        if self.best_state_dict is not None:
            logger.info("Loading best model weights (macro-F1 = %.4f).", self.best_val_macro_f1)
            self.model.load_state_dict(self.best_state_dict)

        # Final evaluation with the best weights restored.
        _, micro_f1, macro_f1, y_true, y_pred = self.evaluate(
            epoch=self.cfg.num_epochs,
            split_name="eval",
        )

        logger.info(
            "[%s] FINAL micro-F1 = %.4f, macro-F1 = %.4f.",
            self.cfg.lang,
            micro_f1,
            macro_f1,
        )
        logger.info(
            "Per-label classification report:\n%s",
            classification_report(y_true, y_pred, target_names=self.label_names, zero_division=0),
        )

        # Export the model and tokenizer to disk.
        os.makedirs(self.cfg.model_output_path, exist_ok=True)
        logger.info("Saving model and tokenizer to '%s'.", self.cfg.model_output_path)
        self.model.save_pretrained(self.cfg.model_output_path)
        self.tokenizer.save_pretrained(self.cfg.model_output_path)

        # Log the raw checkpoint files as MLflow artifacts.
        logger.info("Logging final model artifacts to MLflow.")
        mlflow.log_artifacts(
            self.cfg.model_output_path,
            artifact_path=f"{self.cfg.lang}_transformer_model",
        )

        logger.info("Logging HF transformers model to MLflow via mlflow.transformers.log_model.")
        model_info = mlflow.transformers.log_model(
            transformers_model=self.cfg.model_output_path,
            artifact_path=f"{self.cfg.lang}_transformer_model",
            task="text-classification",
        )

        logger.info(
            "Logged transformers model to MLflow with URI: %s",
            model_info.model_uri,
        )

        return {
            "micro_f1": float(micro_f1),
            "macro_f1": float(macro_f1),
        }
|