# Source code for iabm.data_processor

"""Data preparation utilities for Model_A industrial-state classifiers."""

from __future__ import annotations

from dataclasses import dataclass
from typing import Optional, Sequence

import numpy as np
import pandas as pd


@dataclass(frozen=True)
class TrainingDataset:
    """Immutable bundle of supervised features and labels for training.

    Keeping the pair in an explicit dataclass avoids passing loosely coupled
    ``(features, labels)`` tuples around the codebase as training workflows
    evolve.
    """

    # Supervised feature matrix handed to the classifier.
    features: pd.DataFrame
    # Target labels, one entry per row of ``features``.
    labels: pd.Series
@dataclass(frozen=True)
class InferenceDataset:
    """Immutable bundle of inference features plus activity bookkeeping.

    The source frame and activity mask allow the CLI to reconstruct outputs
    aligned with the original timestamps, including optional inactive rows.
    """

    # Feature matrix ready for model inference.
    features: pd.DataFrame
    # Boolean mask flagging which source rows were considered active.
    active_mask: pd.Series
    # Imputed analog window the features were derived from.
    source_frame: pd.DataFrame
@dataclass(frozen=True)
class EvaluationDataset:
    """Immutable bundle of features, labels, and alignment data for evaluation."""

    # Active-rows feature matrix used for scoring.
    features: pd.DataFrame
    # Real labels when digital data is available, otherwise ``None``.
    labels: pd.Series | None
    # Boolean mask flagging which source rows were considered active.
    active_mask: pd.Series
    # Imputed analog window the features were derived from.
    source_frame: pd.DataFrame
class IndustrialDataProcessor:
    """Prepare industrial analog and digital signals for Model_A workflows.

    The processor encapsulates the study-specific preprocessing rules so the
    rest of the package can work with clean training and inference datasets
    through a stable, object-oriented interface.
    """

    # Feature columns exposed to the classifier when none are configured.
    DEFAULT_FEATURE_COLUMNS = [
        "Vrms1", "Vrms2", "Vrms3",
        "Irms1", "Irms2", "Irms3",
        "PF1", "PF2", "PF3",
    ]
    # Active-power channels compared against ``threshold`` to detect activity.
    POWER_COLUMNS = ["RP1", "RP2", "RP3"]
    # Groups of mutually redundant three-phase channels used for imputation.
    THREE_PHASE_BLOCKS = [
        ["Vrms1", "Vrms2", "Vrms3"],
        ["RP1", "RP2", "RP3"],
        ["Irms1", "Irms2", "Irms3"],
        ["PF1", "PF2", "PF3"],
    ]
    # Optional fourth-channel variables that lack multi-phase redundancy.
    SINGLE_PHASE_BLOCKS = [["Vrms4"], ["RP4"], ["Irms4"], ["PF4"]]

    def __init__(
        self,
        analog_path: str,
        digital_path: Optional[str] = None,
        *,
        threshold: float = 50.0,
        feature_columns: Optional[Sequence[str]] = None,
    ) -> None:
        """Initialize the processor and eagerly load the configured datasets.

        Args:
            analog_path: Path to the analog Parquet dataset.
            digital_path: Optional path to the digital Parquet dataset used
                for labels.
            threshold: Minimum active power threshold used to discard
                inactive rows.
            feature_columns: Feature columns to expose to the classifier.
        """
        self.analog_df = pd.read_parquet(analog_path).sort_index()
        self.digital_df = (
            pd.read_parquet(digital_path).sort_index() if digital_path else None
        )
        self.threshold = threshold
        self.feature_columns = list(feature_columns or self.DEFAULT_FEATURE_COLUMNS)

    def prepare_training_data(self, start: str, end: str) -> TrainingDataset:
        """Return supervised features and labels for the requested time range.

        Args:
            start: Inclusive lower timestamp bound.
            end: Inclusive upper timestamp bound.

        Returns:
            A :class:`TrainingDataset` containing active rows only, with
            labels synchronized from the digital signal stream.
        """
        analog_window = self._get_analog_window(start, end)
        labeled_window = self._attach_labels(analog_window)
        # Rows that never received a non-zero state carry no training signal.
        active_window = labeled_window[labeled_window["estado"] != 0].copy()
        return TrainingDataset(
            features=active_window[self.feature_columns],
            labels=active_window["estado"].astype(np.int32),
        )

    def prepare_inference_data(
        self,
        start: str,
        end: str,
        *,
        drop_inactive: bool = True,
    ) -> InferenceDataset:
        """Return inference-ready analog features without requiring digital labels.

        Args:
            start: Inclusive lower timestamp bound.
            end: Inclusive upper timestamp bound.
            drop_inactive: Whether to keep only rows above the activity
                threshold.

        Returns:
            An :class:`InferenceDataset` with the feature matrix, a boolean
            mask identifying active rows, and the imputed source analog
            window.
        """
        analog_window = self._get_analog_window(start, end)
        active_mask = self._build_activity_mask(analog_window)
        # Materialize only the slice that is actually returned; the previous
        # version always built the active-rows copy and then discarded it
        # whenever ``drop_inactive`` was False.
        if drop_inactive:
            features = analog_window.loc[active_mask, self.feature_columns].copy()
        else:
            features = analog_window[self.feature_columns].copy()
        return InferenceDataset(
            features=features,
            active_mask=active_mask,
            source_frame=analog_window,
        )

    def prepare_evaluation_data(self, start: str, end: str) -> EvaluationDataset:
        """Return aligned features and optional labels for model evaluation.

        Args:
            start: Inclusive lower timestamp bound.
            end: Inclusive upper timestamp bound.

        Returns:
            An :class:`EvaluationDataset` containing the active feature
            matrix, optional real labels aligned to the full analog window,
            the activity mask, and the imputed source analog frame.
        """
        inference_dataset = self.prepare_inference_data(start, end)
        labels: pd.Series | None = None
        if self.digital_df is not None:
            labeled_frame = self._attach_labels(inference_dataset.source_frame)
            labels = labeled_frame["estado"].astype(np.int32)
        return EvaluationDataset(
            features=inference_dataset.features,
            labels=labels,
            active_mask=inference_dataset.active_mask,
            source_frame=inference_dataset.source_frame,
        )

    def _get_analog_window(self, start: str, end: str) -> pd.DataFrame:
        """Filter analog signals by period and apply the configured imputations.

        Args:
            start: Inclusive lower timestamp bound.
            end: Inclusive upper timestamp bound.

        Returns:
            An imputed analog window ready for activity filtering or labeling.
        """
        analog_window = self._slice_by_period(self.analog_df, start, end)
        return self._impute_nans(analog_window)

    @staticmethod
    def _slice_by_period(df: pd.DataFrame, start: str, end: str) -> pd.DataFrame:
        """Return a copy of the rows included in the requested time interval.

        Args:
            df: Time-indexed DataFrame to slice.
            start: Inclusive lower timestamp bound.
            end: Inclusive upper timestamp bound.

        Returns:
            A copy of the requested time window.
        """
        mask = (df.index >= start) & (df.index <= end)
        return df.loc[mask].copy()

    def _build_activity_mask(self, df: pd.DataFrame) -> pd.Series:
        """Mark rows whose maximum active power meets or exceeds the threshold.

        Args:
            df: Analog signal window containing the configured power columns.

        Returns:
            A boolean Series where ``True`` identifies active rows.
        """
        return df[self.POWER_COLUMNS].max(axis=1) >= self.threshold

    def _attach_labels(self, analog_df: pd.DataFrame) -> pd.DataFrame:
        """Synchronize analog rows with the nearest previous active digital label.

        The implementation mirrors the original Random Forest labeling logic
        by using the nearest earlier non-zero digital state for active analog
        rows.

        Args:
            analog_df: Imputed analog window to label.

        Returns:
            A copy of ``analog_df`` with an ``estado`` int32 column attached.

        Raises:
            ValueError: If no digital dataset was configured, or if it
                contains no active (non-zero) state labels.
        """
        if self.digital_df is None:
            raise ValueError("Digital data is required to prepare supervised labels.")
        labeled_df = analog_df.copy()
        labeled_df["estado"] = 0
        active_mask = self._build_activity_mask(labeled_df)
        if not active_mask.any():
            return labeled_df
        relevant_digitals = self.digital_df[self.digital_df["estado"] != 0]
        if relevant_digitals.empty:
            raise ValueError("Digital dataset does not contain active state labels.")
        # Label transfer uses the nearest previous non-zero digital event so
        # the analog timeline inherits the operational state known at that
        # instant.
        indexer = relevant_digitals.index.get_indexer(
            labeled_df.index[active_mask], method="pad"
        )
        # ``pad`` returns -1 for timestamps earlier than the first digital
        # event; those rows keep the default inactive state 0.
        valid_positions = indexer >= 0
        if valid_positions.any():
            active_index = labeled_df.index[active_mask]
            labeled_df.loc[active_index[valid_positions], "estado"] = (
                relevant_digitals["estado"].iloc[indexer[valid_positions]].to_numpy()
            )
        labeled_df["estado"] = labeled_df["estado"].astype(np.int32)
        return labeled_df

    def _impute_nans(self, df: pd.DataFrame) -> pd.DataFrame:
        """Impute missing analog values with the legacy study-compatible strategy.

        Three-phase variables follow the original heuristic of using the
        minimum available phase value and then propagating neighbouring
        observations. The optional fourth-channel variables are interpolated
        when present.

        Args:
            df: Analog window to impute; not modified in place.

        Returns:
            The imputed window with any rows still missing feature or power
            values dropped.
        """
        imputed_df = df.copy()
        self._impute_three_phase_blocks(imputed_df)
        self._impute_single_phase_blocks(imputed_df)
        return imputed_df.dropna(subset=self.feature_columns + self.POWER_COLUMNS)

    def _impute_three_phase_blocks(self, df: pd.DataFrame) -> None:
        """Impute three-phase channels via phase minimum, next, and previous values.

        Args:
            df: DataFrame updated in place.
        """
        for block in self.THREE_PHASE_BLOCKS:
            available_block = [column for column in block if column in df.columns]
            if not available_block:
                continue
            block_frame = df[available_block]
            for column in available_block:
                nan_mask = df[column].isna()
                if nan_mask.any():
                    # The legacy heuristic first borrows the minimum valid
                    # value from the remaining phases before applying
                    # temporal fills.
                    df.loc[nan_mask, column] = block_frame.loc[nan_mask].min(axis=1)
                df[column] = df[column].bfill().ffill()

    def _impute_single_phase_blocks(self, df: pd.DataFrame) -> None:
        """Impute single-channel variables with linear interpolation when present.

        Args:
            df: DataFrame updated in place.
        """
        for block in self.SINGLE_PHASE_BLOCKS:
            column = block[0]
            if column not in df.columns:
                continue
            # Single auxiliary channels do not have multi-phase redundancy,
            # so interpolation is the least invasive default imputation rule
            # here.
            df[column] = df[column].interpolate(
                method="linear", limit_direction="both"
            )