"""Semantic interpretation utilities for Model_C."""
from __future__ import annotations
import ast
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable
import pandas as pd
# Default mapping from state bitmasks to plant components. Keys are
# powers of two so a single integer state word can encode any combination
# of simultaneously active components (decoded with ``state & bitmask`` in
# SemanticModeInterpreter._decode_sequence_components).
DEFAULT_COMPONENT_MAP = {
    1: "DIVING_PUMP_1",
    2: "DIVING_PUMP_2",
    4: "FEEDBACK_PUMP_1",
    8: "FEEDBACK_PUMP_2",
    16: "FLOCCULANT_PUMP",
    32: "BASIN_PUMP",
}
@dataclass(frozen=True)
class SemanticRule:
    """Define an explicit semantic label for a sequence of components.

    A rule matches when every entry in ``required_components`` is present in
    the decoded component set (extra components are allowed); rules are
    evaluated before the built-in heuristics.

    Attributes:
        required_components: Components that must all be present.
        operating_mode: High-level operating-mode label.
        working_mode: More specific working-mode label.
    """

    required_components: tuple[str, ...]
    operating_mode: str
    working_mode: str
@dataclass(frozen=True)
class SemanticAssignment:
    """Represent the semantic interpretation of one active sequence.

    Instances are produced row-by-row by
    ``SemanticModeInterpreter.interpret_sequences``.

    Attributes:
        sequence_states: Original sequence word.
        components: Sorted tuple of unique components involved.
        operating_mode: High-level operating-mode label.
        working_mode: More specific working-mode label.
        semantic_status: Normal or anomalous semantic status.
        anomaly_score: Optional anomaly score inherited from Model_B.
    """

    sequence_states: tuple[int, ...]
    components: tuple[str, ...]
    operating_mode: str
    working_mode: str
    semantic_status: str
    anomaly_score: float | None
class SemanticModeInterpreter:
    """Interpret Model_B active sequences as operating and working modes.

    The interpreter decodes state words into component activity, assigns
    semantic labels through a combination of explicit rules and heuristics,
    and can enrich the result with anomaly information generated by Model_B.
    """

    def __init__(
        self,
        component_map: dict[int, str] | None = None,
        rules: Iterable[SemanticRule] | None = None,
    ) -> None:
        """Initialize the interpreter with component and semantic mappings.

        Args:
            component_map: Optional override for the default state-bitmask to
                component mapping.
            rules: Optional explicit semantic rules evaluated before
                heuristics.
        """
        # Copy both inputs so later mutation of the caller's objects cannot
        # silently change decoding behavior.
        self.component_map = dict(component_map or DEFAULT_COMPONENT_MAP)
        self.rules = list(rules or [])

    def load_active_sequences(self, file_path: str | Path) -> pd.DataFrame:
        """Load Model_B active sequences from Excel, CSV, or Parquet.

        Args:
            file_path: Path to the active-sequence report exported by Model_B.

        Returns:
            A DataFrame ready for semantic interpretation.

        Raises:
            ValueError: If the file extension is not a supported format.
        """
        path = Path(file_path)
        # Normalize the suffix once instead of lowering it in every branch.
        suffix = path.suffix.lower()
        if suffix == ".parquet":
            return pd.read_parquet(path)
        if suffix in {".xlsx", ".xls"}:
            return pd.read_excel(path)
        if suffix == ".csv":
            return pd.read_csv(path)
        raise ValueError(f"Unsupported file extension: {path.suffix}")

    def load_comparison_report(self, file_path: str | Path) -> pd.DataFrame:
        """Load a Model_B sequence-comparison report.

        Args:
            file_path: Path to the comparison report exported by Model_B.

        Returns:
            A DataFrame aligned with the active-sequence report.
        """
        # Comparison reports use the same serialization formats as the
        # active-sequence reports, so the same loader applies.
        return self.load_active_sequences(file_path)

    def load_rules(self, file_path: str | Path) -> list[SemanticRule]:
        """Load semantic rules from a JSON file.

        Args:
            file_path: Path to a JSON file containing semantic-rule records.

        Returns:
            The parsed list of semantic rules stored in the interpreter.
        """
        payload = json.loads(Path(file_path).read_text(encoding="utf-8"))
        # Loading replaces (not extends) any previously configured rules.
        self.rules = [
            SemanticRule(
                required_components=tuple(item["required_components"]),
                operating_mode=item["operating_mode"],
                working_mode=item["working_mode"],
            )
            for item in payload
        ]
        return self.rules

    def interpret_sequences(
        self,
        sequences: pd.DataFrame,
        *,
        comparison: pd.DataFrame | None = None,
    ) -> pd.DataFrame:
        """Assign semantic operating and working modes to sequence rows.

        Args:
            sequences: Active sequence report produced by Model_B.
            comparison: Optional anomaly-comparison report produced by
                Model_B, aligned with ``sequences`` by row position.

        Returns:
            A DataFrame containing semantic assignments for each sequence.
        """
        # Both frames are paired positionally, so reset the indices before
        # looking rows up by integer position.
        comparison = (
            comparison.reset_index(drop=True) if comparison is not None else None
        )
        assignments = []
        for index, row in sequences.reset_index(drop=True).iterrows():
            # Sequence words are serialized in Model_B reports, so they are
            # decoded first and then mapped to industrial components.
            states = self._parse_states(row["states"])
            components = self._decode_sequence_components(states)
            operating_mode, working_mode = self._assign_modes(components)
            anomaly_score = None
            semantic_status = "NORMAL"
            # Rows beyond the end of the comparison report keep the NORMAL
            # default rather than raising.
            if comparison is not None and index < len(comparison):
                anomaly_score = float(comparison.loc[index, "anomaly_score"])
                semantic_status = (
                    "ANOMALOUS"
                    if bool(comparison.loc[index, "is_anomalous"])
                    else "NORMAL"
                )
            assignments.append(
                SemanticAssignment(
                    sequence_states=states,
                    components=components,
                    operating_mode=operating_mode,
                    working_mode=working_mode,
                    semantic_status=semantic_status,
                    anomaly_score=anomaly_score,
                ).__dict__
            )
        return pd.DataFrame(assignments)

    def summarize_modes(self, assignments: pd.DataFrame) -> pd.DataFrame:
        """Summarize interpreted modes across all assignments.

        Args:
            assignments: Per-sequence semantic assignments.

        Returns:
            An aggregated count table grouped by operating mode, working mode,
            and semantic status, sorted by descending count.
        """
        if assignments.empty:
            # Preserve the output schema even when there is nothing to count.
            return pd.DataFrame(
                columns=["operating_mode", "working_mode", "semantic_status", "count"]
            )
        summary = (
            assignments.groupby(
                ["operating_mode", "working_mode", "semantic_status"],
                dropna=False,
            )
            .size()
            .reset_index(name="count")
            .sort_values("count", ascending=False)
        )
        return summary

    def _decode_sequence_components(self, states: tuple[int, ...]) -> tuple[str, ...]:
        """Decode a state word into a sorted set of active components.

        Args:
            states: Sequence word represented as integer state identifiers.

        Returns:
            A sorted tuple of unique components activated across the sequence.
        """
        components: set[str] = set()
        for state in states:
            # Each state is a bitmask: a set bit marks an active component.
            for bitmask, component in self.component_map.items():
                if state & bitmask:
                    components.add(component)
        return tuple(sorted(components))

    def _assign_modes(self, components: tuple[str, ...]) -> tuple[str, str]:
        """Assign operating and working modes using rules and heuristics.

        Args:
            components: Unique components active in the interpreted sequence.

        Returns:
            A tuple containing the operating-mode and working-mode labels.
        """
        component_set = set(components)
        # Explicit rules take precedence; the first matching rule wins.
        for rule in self.rules:
            if set(rule.required_components).issubset(component_set):
                return rule.operating_mode, rule.working_mode
        # The default heuristic mapping provides a first operational vocabulary
        # that can later be replaced or refined with explicit semantic rules.
        if not components:
            return "IDLE", "NO_ACTIVE_COMPONENTS"
        if "BASIN_PUMP" in component_set:
            return "TRANSFER_MODE", "BASIN_TRANSFER"
        if "FLOCCULANT_PUMP" in component_set and component_set & {"DIVING_PUMP_1", "DIVING_PUMP_2"}:
            return "TREATMENT_MODE", "FLOCCULANT_ASSISTED_CYCLE"
        if component_set & {"FEEDBACK_PUMP_1", "FEEDBACK_PUMP_2"} and component_set & {"DIVING_PUMP_1", "DIVING_PUMP_2"}:
            return "RECIRCULATION_MODE", "DIVING_FEEDBACK_CYCLE"
        if component_set <= {"DIVING_PUMP_1", "DIVING_PUMP_2"}:
            return "PUMPING_MODE", "DIVING_ONLY"
        if component_set <= {"FEEDBACK_PUMP_1", "FEEDBACK_PUMP_2"}:
            return "RECIRCULATION_MODE", "FEEDBACK_ONLY"
        return "COMPOSITE_MODE", "+".join(components)

    @staticmethod
    def _parse_states(value: str | tuple[int, ...] | list[int]) -> tuple[int, ...]:
        """Parse serialized state tuples from Model_B reports.

        Args:
            value: Serialized or already structured state tuple.

        Returns:
            A normalized tuple of integer state identifiers.
        """
        # Tuples and lists share the same normalization; no need for two
        # identical branches.
        if isinstance(value, (tuple, list)):
            return tuple(int(item) for item in value)
        # literal_eval safely parses serialized literals such as "(1, 2)"
        # without evaluating arbitrary code.
        parsed = ast.literal_eval(value)
        return tuple(int(item) for item in parsed)