# Source code for iabm_semantics.semantics

"""Semantic interpretation utilities for Model_C."""

from __future__ import annotations

import ast
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable

import pandas as pd


# Default mapping from state-word bitmasks to industrial components.
# Each key is a single-bit value (a power of two), so one integer state
# can encode several simultaneously active components; decoding tests
# `state & bitmask` per entry (see
# SemanticModeInterpreter._decode_sequence_components).
DEFAULT_COMPONENT_MAP = {
    1: "DIVING_PUMP_1",
    2: "DIVING_PUMP_2",
    4: "FEEDBACK_PUMP_1",
    8: "FEEDBACK_PUMP_2",
    16: "FLOCCULANT_PUMP",
    32: "BASIN_PUMP",
}


@dataclass(frozen=True)
class SemanticRule:
    """Explicit semantic label attached to a set of active components.

    A rule fires when every component in ``required_components`` is
    present in an interpreted sequence; it then supplies both mode labels.

    Attributes:
        required_components: Components that must all be active for the
            rule to match.
        operating_mode: Coarse operating-mode label assigned on a match.
        working_mode: Finer-grained working-mode label assigned on a match.
    """

    required_components: tuple[str, ...]
    operating_mode: str
    working_mode: str
@dataclass(frozen=True)
class SemanticAssignment:
    """Immutable record of the semantic interpretation of one sequence.

    Attributes:
        sequence_states: The original sequence word as integer states.
        components: Sorted tuple of the unique components involved.
        operating_mode: Coarse operating-mode label.
        working_mode: Finer-grained working-mode label.
        semantic_status: Either a normal or an anomalous semantic status.
        anomaly_score: Anomaly score inherited from Model_B, if available.
    """

    sequence_states: tuple[int, ...]
    components: tuple[str, ...]
    operating_mode: str
    working_mode: str
    semantic_status: str
    anomaly_score: float | None
class SemanticModeInterpreter:
    """Interpret Model_B active sequences as operating and working modes.

    The interpreter decodes state words into component activity, assigns
    semantic labels through a combination of explicit rules and heuristics,
    and can enrich the result with anomaly information generated by Model_B.
    """

    def __init__(
        self,
        component_map: dict[int, str] | None = None,
        rules: Iterable[SemanticRule] | None = None,
    ) -> None:
        """Initialize the interpreter with component and semantic mappings.

        Args:
            component_map: Optional override for the default state-bitmask
                to component mapping.
            rules: Optional explicit semantic rules evaluated before
                heuristics.
        """
        # Copy both inputs so later caller-side mutation cannot change the
        # interpreter's behavior.
        self.component_map = dict(component_map or DEFAULT_COMPONENT_MAP)
        self.rules = list(rules or [])

    def load_active_sequences(self, file_path: str | Path) -> pd.DataFrame:
        """Load Model_B active sequences from Excel, CSV, or Parquet.

        Args:
            file_path: Path to the active-sequence report exported by
                Model_B.

        Returns:
            A DataFrame ready for semantic interpretation.

        Raises:
            ValueError: If the file extension is not one of the supported
                formats.
        """
        path = Path(file_path)
        # Hoist the normalized suffix; it is compared against every format.
        suffix = path.suffix.lower()
        if suffix == ".parquet":
            return pd.read_parquet(path)
        if suffix in {".xlsx", ".xls"}:
            return pd.read_excel(path)
        if suffix == ".csv":
            return pd.read_csv(path)
        raise ValueError(f"Unsupported file extension: {path.suffix}")

    def load_comparison_report(self, file_path: str | Path) -> pd.DataFrame:
        """Load a Model_B sequence-comparison report.

        Args:
            file_path: Path to the comparison report exported by Model_B.

        Returns:
            A DataFrame aligned with the active-sequence report.
        """
        # Comparison reports are serialized in the same formats as the
        # active-sequence reports, so the same loader applies.
        return self.load_active_sequences(file_path)

    def load_rules(self, file_path: str | Path) -> list[SemanticRule]:
        """Load semantic rules from a JSON file.

        Args:
            file_path: Path to a JSON file containing semantic-rule records.

        Returns:
            The parsed list of semantic rules stored in the interpreter.
        """
        payload = json.loads(Path(file_path).read_text(encoding="utf-8"))
        # Replace (not extend) the current rule set with the parsed records.
        self.rules = [
            SemanticRule(
                required_components=tuple(item["required_components"]),
                operating_mode=item["operating_mode"],
                working_mode=item["working_mode"],
            )
            for item in payload
        ]
        return self.rules

    def interpret_sequences(
        self,
        sequences: pd.DataFrame,
        *,
        comparison: pd.DataFrame | None = None,
    ) -> pd.DataFrame:
        """Assign semantic operating and working modes to sequence rows.

        Args:
            sequences: Active sequence report produced by Model_B.
            comparison: Optional anomaly-comparison report produced by
                Model_B.

        Returns:
            A DataFrame containing semantic assignments for each sequence.
        """
        # Reset both frames to positional indices so rows are aligned by
        # position regardless of the indices carried by the reports.
        comparison = comparison.reset_index(drop=True) if comparison is not None else None
        assignments = []
        for index, row in sequences.reset_index(drop=True).iterrows():
            # Sequence words are serialized in Model_B reports, so they are
            # decoded first and then mapped to industrial components.
            states = self._parse_states(row["states"])
            components = self._decode_sequence_components(states)
            operating_mode, working_mode = self._assign_modes(components)
            anomaly_score = None
            semantic_status = "NORMAL"
            # Only rows with a positional counterpart in the comparison
            # report receive anomaly enrichment; extra rows stay NORMAL.
            if comparison is not None and index < len(comparison):
                anomaly_score = float(comparison.loc[index, "anomaly_score"])
                semantic_status = (
                    "ANOMALOUS"
                    if bool(comparison.loc[index, "is_anomalous"])
                    else "NORMAL"
                )
            assignments.append(
                SemanticAssignment(
                    sequence_states=states,
                    components=components,
                    operating_mode=operating_mode,
                    working_mode=working_mode,
                    semantic_status=semantic_status,
                    anomaly_score=anomaly_score,
                ).__dict__
            )
        return pd.DataFrame(assignments)

    def summarize_modes(self, assignments: pd.DataFrame) -> pd.DataFrame:
        """Summarize interpreted modes across all assignments.

        Args:
            assignments: Per-sequence semantic assignments.

        Returns:
            An aggregated count table grouped by operating mode, working
            mode, and semantic status, sorted by descending count.
        """
        if assignments.empty:
            # Preserve the output schema even when there is nothing to count.
            return pd.DataFrame(
                columns=["operating_mode", "working_mode", "semantic_status", "count"]
            )
        summary = (
            assignments.groupby(
                ["operating_mode", "working_mode", "semantic_status"],
                dropna=False,
            )
            .size()
            .reset_index(name="count")
            .sort_values("count", ascending=False)
        )
        return summary

    def _decode_sequence_components(self, states: tuple[int, ...]) -> tuple[str, ...]:
        """Decode a state word into a sorted set of active components.

        Args:
            states: Sequence word represented as integer state identifiers.

        Returns:
            A sorted tuple of unique components activated across the
            sequence.
        """
        components: set[str] = set()
        for state in states:
            # Each mapped bitmask contributes its component whenever the
            # corresponding bit is set in the state word.
            for bitmask, component in self.component_map.items():
                if state & bitmask:
                    components.add(component)
        return tuple(sorted(components))

    def _assign_modes(self, components: tuple[str, ...]) -> tuple[str, str]:
        """Assign operating and working modes using rules and heuristics.

        Explicit rules are checked first, in insertion order; the first rule
        whose required components are all present wins.

        Args:
            components: Unique components active in the interpreted sequence.

        Returns:
            A tuple containing the operating-mode and working-mode labels.
        """
        component_set = set(components)
        for rule in self.rules:
            if set(rule.required_components).issubset(component_set):
                return rule.operating_mode, rule.working_mode
        # The default heuristic mapping provides a first operational
        # vocabulary that can later be replaced or refined with explicit
        # semantic rules. Order matters: more specific patterns first.
        if not components:
            return "IDLE", "NO_ACTIVE_COMPONENTS"
        if "BASIN_PUMP" in component_set:
            return "TRANSFER_MODE", "BASIN_TRANSFER"
        if "FLOCCULANT_PUMP" in component_set and component_set & {
            "DIVING_PUMP_1",
            "DIVING_PUMP_2",
        }:
            return "TREATMENT_MODE", "FLOCCULANT_ASSISTED_CYCLE"
        if component_set & {"FEEDBACK_PUMP_1", "FEEDBACK_PUMP_2"} and component_set & {
            "DIVING_PUMP_1",
            "DIVING_PUMP_2",
        }:
            return "RECIRCULATION_MODE", "DIVING_FEEDBACK_CYCLE"
        if component_set <= {"DIVING_PUMP_1", "DIVING_PUMP_2"}:
            return "PUMPING_MODE", "DIVING_ONLY"
        if component_set <= {"FEEDBACK_PUMP_1", "FEEDBACK_PUMP_2"}:
            return "RECIRCULATION_MODE", "FEEDBACK_ONLY"
        # Fallback: an unrecognized combination is labeled by its parts.
        return "COMPOSITE_MODE", "+".join(components)

    @staticmethod
    def _parse_states(value: str | tuple[int, ...] | list[int]) -> tuple[int, ...]:
        """Parse serialized state tuples from Model_B reports.

        Args:
            value: Serialized or already structured state tuple.

        Returns:
            A normalized tuple of integer state identifiers.
        """
        # Already-structured sequences (tuple or list) only need coercion to
        # int; both branches were previously duplicated.
        if isinstance(value, (tuple, list)):
            return tuple(int(item) for item in value)
        # Serialized sequences are parsed with literal_eval, which accepts
        # only Python literals and is safe on report-provided strings.
        parsed = ast.literal_eval(value)
        return tuple(int(item) for item in parsed)