Source code for iabm_behavior.main

"""Command-line entry point for Model_B behavioral sequence analysis."""

from __future__ import annotations

import argparse
import sys
from pathlib import Path
from typing import Callable

import pandas as pd
from pandas import DatetimeTZDtype

from .sequences import BehavioralSequenceAnalyzer
from .utils import setup_i18n


[docs] def parse_arguments(translator: Callable[[str], str]) -> argparse.Namespace: """Build the CLI parser with translated help messages. Args: translator: Translation function returned by :func:`setup_i18n`. Returns: Parsed command-line arguments driving the sequence-analysis workflow. """ _ = translator parser = argparse.ArgumentParser( description=_("Behavioral sequence analysis for industrial state timelines") ) parser.add_argument( "--input", required=True, help=_("Path to the state timeline file generated by Model_A or digital labeling."), ) parser.add_argument( "--nominal-input", help=_("Optional nominal reference timeline used for anomaly comparison."), ) parser.add_argument( "--output-dir", required=True, help=_("Directory where Model_B reports will be written."), ) parser.add_argument( "--lang", default="en", choices=["es", "en"], help=_("Interface language."), ) parser.add_argument( "--state-column", default="Predicted_State", help=_("State column to analyze."), ) parser.add_argument( "--smooth-short-runs", action="store_true", help=_("Apply smoothing to transient short runs before sequence extraction."), ) parser.add_argument( "--min-duration-seconds", type=float, default=1.0, help=_("Maximum duration treated as a transient run."), ) parser.add_argument( "--min-samples", type=int, default=1, help=_("Maximum sample count treated as a transient run."), ) parser.add_argument( "--anomaly-threshold", type=float, default=1.0, help=_("Threshold applied to the anomaly score when nominal comparison is enabled."), ) return parser.parse_args()
[docs] def main() -> None: """Run the Model_B sequence-analysis workflow from the command line. The workflow can operate in two levels. At minimum, it extracts runs, active sequences, and repeated sequence words from a state timeline. When a nominal timeline is also provided, it derives a nominal reference set and produces an anomaly-oriented comparison report. """ lang = _detect_language(sys.argv) translator = setup_i18n(lang) args = parse_arguments(translator) analyzer = BehavioralSequenceAnalyzer(state_column=args.state_column) timeline = analyzer.load_state_timeline(args.input) if args.smooth_short_runs: # Smoothing is applied before both run extraction and nominal matching # so downstream words are less sensitive to one-sample transients. timeline = analyzer.smooth_short_runs( timeline, min_duration_seconds=args.min_duration_seconds, min_samples=args.min_samples, ) runs = analyzer.extract_runs(timeline) sequences = analyzer.extract_active_sequences(timeline) words = analyzer.summarize_sequence_words(sequences) comparisons = pd.DataFrame() if args.nominal_input: nominal_timeline = analyzer.load_state_timeline(args.nominal_input) if args.smooth_short_runs: nominal_timeline = analyzer.smooth_short_runs( nominal_timeline, min_duration_seconds=args.min_duration_seconds, min_samples=args.min_samples, ) nominal_sequences = analyzer.extract_active_sequences(nominal_timeline) nominal_reference = analyzer.build_nominal_reference(nominal_sequences) # The comparison report provides the bridge from behavioral words to # quantitative anomaly assessment. comparisons = analyzer.compare_to_nominal( sequences, nominal_reference, anomaly_threshold=args.anomaly_threshold, ) output_dir = Path(args.output_dir) output_dir.mkdir(parents=True, exist_ok=True) runs_frame = pd.DataFrame([run.__dict__ for run in runs]) sequences_frame = pd.DataFrame( [ { "start_time": sequence.start_time, "end_time": sequence.end_time, "states": str(sequence.states), "total_duration_seconds": sequence.total_duration_seconds, "run_count": sequence.run_count, } for sequence in sequences ] ) runs_path = output_dir / "state_runs.xlsx" sequences_path = output_dir / "active_sequences.xlsx" words_path = output_dir / "sequence_words.xlsx" comparison_path = output_dir / "sequence_comparison.xlsx" _prepare_excel_frame(runs_frame).to_excel(runs_path, index=False) _prepare_excel_frame(sequences_frame).to_excel(sequences_path, index=False) words.to_excel(words_path, index=False) if not comparisons.empty: comparisons.to_excel(comparison_path, index=False) print(translator("State-run report saved to: {}").format(runs_path)) print(translator("Active-sequence report saved to: {}").format(sequences_path)) print(translator("Sequence-word summary saved to: {}").format(words_path)) if not comparisons.empty: print(translator("Sequence-comparison report saved to: {}").format(comparison_path))
def _detect_language(argv: list[str]) -> str: """Extract the requested language before parsing the translated CLI. Args: argv: Raw command-line token list. Returns: The requested language code, or ``"en"`` when no valid language token is available. """ if "--lang" in argv: try: return argv[argv.index("--lang") + 1] except (IndexError, ValueError): return "en" return "en" def _prepare_excel_frame(frame: pd.DataFrame) -> pd.DataFrame: """Convert timezone-aware datetime columns into Excel-safe strings. Args: frame: DataFrame about to be exported to Excel. Returns: A copy whose timezone-aware datetime columns have been converted into stable string representations compatible with spreadsheet writers. """ prepared = frame.copy() for column in prepared.columns: if isinstance(prepared[column].dtype, DatetimeTZDtype): prepared[column] = prepared[column].dt.strftime("%Y-%m-%d %H:%M:%S%z") return prepared if __name__ == "__main__": main()