""" BambuLab Cloud API Client Provides authentication, device management, and real-time MQTT monitoring for BambuLab 3D printers via the Cloud API. Requires: pip install bambu-lab-cloud-api Usage: from bambu_run.mqtt_client import BambuPrinter, PrinterState printer = BambuPrinter(token="your_token", device_id="your_device_id") printer.connect() state = printer.get_state() snapshot = printer.get_snapshot() printer.disconnect() """ import io import logging import os import platform import sys import select from contextlib import contextmanager from dataclasses import dataclass, field from datetime import datetime from typing import Any, Callable, Dict, List, Optional from zoneinfo import ZoneInfo from .conf import app_settings # Re-export from bambu-lab-cloud-api package try: from bambulab import BambuAuthenticator, BambuClient, MQTTClient except ImportError as e: raise ImportError( "bambu-lab-cloud-api package is required. Install with: pip install bambu-lab-cloud-api" ) from e logger = logging.getLogger(__name__) @contextmanager def suppress_stdout(): """Context manager to suppress stdout (for silencing library print statements)""" old_stdout = sys.stdout sys.stdout = io.StringIO() try: yield finally: sys.stdout = old_stdout def timed_input(prompt: str, timeout_sec: int = 300) -> str: """ Get user input with a timeout. Args: prompt: The prompt to display timeout_sec: Timeout in seconds (default 300 = 5 minutes) Returns: User input string Raises: TimeoutError: If no input received within timeout """ print(prompt, end='', flush=True) if platform.system() == 'Windows': import threading result = {'value': None, 'done': False} def get_input(): try: result['value'] = input() except EOFError: result['value'] = None result['done'] = True thread = threading.Thread(target=get_input, daemon=True) thread.start() thread.join(timeout=timeout_sec) if not result['done']: print() raise TimeoutError(f"No input received within {timeout_sec} seconds") return result['value'] or "" else: ready, _, _ = select.select([sys.stdin], [], [], timeout_sec) if ready: return sys.stdin.readline().strip() else: print() raise TimeoutError(f"No input received within {timeout_sec} seconds") @dataclass class FilamentTray: """Represents a single filament tray in an AMS unit""" tray_id: str = "" tray_id_name: str = "" tray_type: str = "" tray_sub_brands: str = "" tray_color: str = "" remain_percent: int = -1 tray_weight: int = 0 tray_diameter: float = 1.75 tray_temp: int = 0 nozzle_temp_min: int = 0 nozzle_temp_max: int = 0 state: int = 0 tag_uid: str = "" tray_uuid: str = "" k: float = 0.0 n: float = 0.0 cali_idx: int = -1 total_len: int = 0 tray_info_idx: str = "" tray_time: int = 0 tray_bed_temp: int = 0 bed_temp_type: int = 0 cols: List[str] = field(default_factory=list) @classmethod def from_dict(cls, data: Dict[str, Any]) -> "FilamentTray": """Create FilamentTray from MQTT tray data""" return cls( tray_id=str(data.get("id", "")), tray_id_name=data.get("tray_id_name", ""), tray_type=data.get("tray_type", ""), tray_sub_brands=data.get("tray_sub_brands", ""), tray_color=data.get("tray_color", ""), remain_percent=data.get("remain", -1), tray_weight=int(data.get("tray_weight", 0)), tray_diameter=float(data.get("tray_diameter", 1.75)), tray_temp=int(data.get("tray_temp", 0)), nozzle_temp_min=int(data.get("nozzle_temp_min", 0)), nozzle_temp_max=int(data.get("nozzle_temp_max", 0)), state=data.get("state", 0), tag_uid=data.get("tag_uid", ""), tray_uuid=data.get("tray_uuid", ""), k=float(data.get("k", 0.0)), n=float(data.get("n", 0.0)), cali_idx=int(data.get("cali_idx", -1)), total_len=int(data.get("total_len", 0)), tray_info_idx=data.get("tray_info_idx", ""), tray_time=int(data.get("tray_time", 0)), tray_bed_temp=int(data.get("bed_temp", 0)), bed_temp_type=int(data.get("bed_temp_type", 0)), cols=data.get("cols", []), ) def to_dict(self) -> Dict[str, Any]: """Convert to dictionary for database storage""" return { "tray_id": self.tray_id, "tray_id_name": self.tray_id_name, "tray_type": self.tray_type, "tray_sub_brands": self.tray_sub_brands, "tray_color": self.tray_color, "remain_percent": self.remain_percent, "tray_weight": self.tray_weight, "tray_diameter": self.tray_diameter, "tray_temp": self.tray_temp, "nozzle_temp_min": self.nozzle_temp_min, "nozzle_temp_max": self.nozzle_temp_max, "state": self.state, "tag_uid": self.tag_uid, "tray_uuid": self.tray_uuid, "k": self.k, "n": self.n, "cali_idx": self.cali_idx, "total_len": self.total_len, "tray_info_idx": self.tray_info_idx, "tray_time": self.tray_time, "tray_bed_temp": self.tray_bed_temp, "bed_temp_type": self.bed_temp_type, "cols": self.cols, } @dataclass class AMSUnit: """Represents a single AMS (Automatic Material System) unit""" ams_id: str = "" unit_id: str = "" humidity: int = -1 humidity_raw: int = -1 temp: float = 0.0 dry_time: int = 0 chip_id: str = "" info: str = "" trays: List[FilamentTray] = field(default_factory=list) @classmethod def from_dict(cls, data: Dict[str, Any]) -> "AMSUnit": """Create AMSUnit from MQTT ams data""" trays = [FilamentTray.from_dict(t) for t in data.get("tray", [])] return cls( ams_id=data.get("ams_id", ""), unit_id=str(data.get("id", "")), humidity=int(data.get("humidity", -1)), humidity_raw=int(data.get("humidity_raw", -1)), temp=float(data.get("temp", 0.0)), dry_time=data.get("dry_time", 0), chip_id=data.get("chip_id", ""), info=data.get("info", ""), trays=trays, ) def to_dict(self) -> Dict[str, Any]: """Convert to dictionary for database storage""" return { "ams_id": self.ams_id, "unit_id": self.unit_id, "humidity": self.humidity, "humidity_raw": self.humidity_raw, "temp": self.temp, "dry_time": self.dry_time, "chip_id": self.chip_id, "info": self.info, "trays": [t.to_dict() for t in self.trays], } @dataclass class AMSState: """Complete AMS system state including all units""" ams_exist_bits: str = "" tray_exist_bits: str = "" tray_now: str = "" tray_pre: str = "" tray_tar: str = "" ams_status: int = 0 ams_rfid_status: int = 0 tray_is_bbl_bits: str = "" tray_read_done_bits: str = "" version: int = 0 insert_flag: bool = False power_on_flag: bool = False units: List[AMSUnit] = field(default_factory=list) @classmethod def from_dict(cls, data: Dict[str, Any]) -> "AMSState": """Create AMSState from MQTT ams data""" units = [AMSUnit.from_dict(u) for u in data.get("ams", [])] return cls( ams_exist_bits=data.get("ams_exist_bits", ""), tray_exist_bits=data.get("tray_exist_bits", ""), tray_now=data.get("tray_now", ""), tray_pre=data.get("tray_pre", ""), tray_tar=data.get("tray_tar", ""), ams_status=data.get("ams_status", 0), ams_rfid_status=data.get("ams_rfid_status", 0), tray_is_bbl_bits=data.get("tray_is_bbl_bits", ""), tray_read_done_bits=data.get("tray_read_done_bits", ""), version=int(data.get("version", 0)), insert_flag=bool(data.get("insert_flag", False)), power_on_flag=bool(data.get("power_on_flag", False)), units=units, ) def to_dict(self) -> Dict[str, Any]: """Convert to dictionary for database storage""" return { "ams_exist_bits": self.ams_exist_bits, "tray_exist_bits": self.tray_exist_bits, "tray_now": self.tray_now, "tray_pre": self.tray_pre, "tray_tar": self.tray_tar, "ams_status": self.ams_status, "ams_rfid_status": self.ams_rfid_status, "tray_is_bbl_bits": self.tray_is_bbl_bits, "tray_read_done_bits": self.tray_read_done_bits, "version": self.version, "insert_flag": self.insert_flag, "power_on_flag": self.power_on_flag, "units": [u.to_dict() for u in self.units], } @property def total_trays(self) -> int: """Total number of trays across all units""" return sum(len(u.trays) for u in self.units) @property def loaded_trays(self) -> List[FilamentTray]: """Get all trays that have filament loaded""" loaded = [] for unit in self.units: for tray in unit.trays: if tray.tray_type: loaded.append(tray) return loaded @dataclass class HotendInfo: """A single hotend reported in `device.nozzle.info[]` (Vortek rack). `raw_id` semantics (confirmed by watching a live "Read All" MQTT capture): 0 = currently mounted on the (swappable) toolhead — the sentinel hides the true bay address until "Read All" resolves it; 1 = the fixed left nozzle on dual-nozzle printers (no RFID chip, always reports sn="N/A"); 16-21 = rack bay address, slot_number = raw_id - 15 (1-6). """ raw_id: int = 0 serial_number: str = "" nozzle_type: str = "" diameter: float = 0.4 fila_id: str = "" color: Optional[str] = None used_time_seconds: int = 0 wear_percent: float = 0.0 stat: int = 0 @classmethod def from_dict(cls, data: Dict[str, Any]) -> "HotendInfo": from .utils import strip_color_padding return cls( raw_id=int(data.get("id", 0)), serial_number=data.get("sn", ""), nozzle_type=data.get("type", ""), diameter=float(data.get("diameter", 0.4)), fila_id=data.get("fila_id", ""), color=strip_color_padding(data.get("color_m")), used_time_seconds=int(data.get("p_t", 0)), wear_percent=round(float(data.get("wear", 0.0)) / 128.0 * 100, 2), stat=int(data.get("stat", 0)), ) @property def is_toolhead(self) -> bool: return self.raw_id == 0 @property def is_empty(self) -> bool: return self.serial_number in ("", "N/A") @property def slot_number(self) -> Optional[int]: if 16 <= self.raw_id <= 21: return self.raw_id - 15 return None def to_dict(self) -> Dict[str, Any]: return { "raw_id": self.raw_id, "serial_number": self.serial_number, "nozzle_type": self.nozzle_type, "diameter": self.diameter, "fila_id": self.fila_id, "color": self.color, "used_time_seconds": self.used_time_seconds, "wear_percent": self.wear_percent, "stat": self.stat, "is_toolhead": self.is_toolhead, "is_empty": self.is_empty, "slot_number": self.slot_number, } @dataclass class PrinterState: """Complete printer state parsed from MQTT data""" timestamp: str = "" sequence_id: str = "" # Temperature info nozzle_temp: float = 0.0 nozzle_target_temp: float = 0.0 bed_temp: float = 0.0 bed_target_temp: float = 0.0 chamber_temp: float = 0.0 # Print progress gcode_state: str = "" print_percent: int = 0 remaining_time_min: int = 0 layer_num: int = 0 total_layer_num: int = 0 print_line_number: int = 0 # Current job info gcode_file: str = "" subtask_name: str = "" subtask_id: str = "" task_id: str = "" project_id: str = "" profile_id: str = "" print_type: str = "" # Fan speeds fan_gear: int = 0 cooling_fan_speed: int = 0 heatbreak_fan_speed: int = 0 # WiFi / Network wifi_signal: str = "" wifi_signal_dbm: int = 0 # Nozzle info — single-nozzle / right-side back-compat fields. nozzle_diameter: float = 0.4 nozzle_type: str = "" # H2C dual-nozzle: left-side fields (None on single-nozzle printers). nozzle_temp_left: Optional[float] = None nozzle_target_temp_left: Optional[float] = None nozzle_diameter_left: Optional[float] = None nozzle_type_left: Optional[str] = None # System status home_flag: int = 0 hw_switch_state: int = 0 mc_print_stage: str = "" mc_print_sub_stage: int = 0 print_error: int = 0 stg_cur: int = 0 # AMS state ams: Optional[AMSState] = None # Upgrade state upgrade_state: Dict[str, Any] = field(default_factory=dict) # Version info version: Dict[str, Any] = field(default_factory=dict) # Camera / Timelapse ipcam: Dict[str, Any] = field(default_factory=dict) timelapse: Dict[str, Any] = field(default_factory=dict) # Lights lights_report: List[Dict[str, Any]] = field(default_factory=list) # HMS (Health Management System) messages hms: List[Dict[str, Any]] = field(default_factory=list) # Speed settings spd_lvl: int = 0 spd_mag: int = 0 # Auxiliary fans big_fan1_speed: int = 0 big_fan2_speed: int = 0 # System info sdcard: bool = False gcode_file_prepare_percent: str = "" lifecycle: str = "" # External spool (virtual tray) vt_tray: Optional[Dict[str, Any]] = None # Vortek hotend rack (device.nozzle.info[]) hotends: List[HotendInfo] = field(default_factory=list) # Raw data for any additional fields _raw_data: Dict[str, Any] = field(default_factory=dict, repr=False) @staticmethod def _parse_wifi_signal(signal_str: str) -> int: """Parse WiFi signal string (e.g., '-34dBm') to integer dBm""" if not signal_str: return 0 try: return int(signal_str.replace("dBm", "")) except (ValueError, AttributeError): return 0 @classmethod def from_mqtt_data(cls, data: Dict[str, Any], timestamp: Optional[str] = None) -> "PrinterState": """Create PrinterState from MQTT push_status data.""" if timestamp is None: timestamp = datetime.now(ZoneInfo(app_settings.TIMEZONE)).isoformat() print_data = data.get("print", {}) # Parse AMS data if present ams = None if "ams" in print_data: ams = AMSState.from_dict(print_data["ams"]) wifi_signal = print_data.get("wifi_signal", "") # H2C dual-nozzle decoding. The H2C reports per-extruder temperatures # under `print.device.extruder.info[]` as a 2-element array (index 0 = # right, index 1 = left). The `temp` field is bit-packed: # `temp_raw = (target << 16) | current`, both °C as ints. nozzle_temp_left = None nozzle_target_temp_left = None device = print_data.get("device") or {} extruders = (device.get("extruder") or {}).get("info") or [] if len(extruders) >= 2: left = extruders[1] t = left.get("temp") if isinstance(t, int): nozzle_target_temp_left = float((t >> 16) & 0xFFFF) nozzle_temp_left = float(t & 0xFFFF) # Vortek hotend rack: device.nozzle.info[] — one entry per hotend. hotends = [ HotendInfo.from_dict(h) for h in (device.get("nozzle") or {}).get("info") or [] ] return cls( timestamp=timestamp, sequence_id=str(print_data.get("sequence_id", "")), nozzle_temp=float(print_data.get("nozzle_temper", 0.0)), nozzle_target_temp=float(print_data.get("nozzle_target_temper", 0.0)), bed_temp=float(print_data.get("bed_temper", 0.0)), bed_target_temp=float(print_data.get("bed_target_temper", 0.0)), chamber_temp=float(print_data.get("chamber_temper", 0.0)), gcode_state=print_data.get("gcode_state", ""), print_percent=int(print_data.get("mc_percent", 0)), remaining_time_min=int(print_data.get("mc_remaining_time", 0)), layer_num=int(print_data.get("layer_num", 0)), total_layer_num=int(print_data.get("total_layer_num", 0)), print_line_number=int(print_data.get("mc_print_line_number", 0)), gcode_file=print_data.get("gcode_file", ""), subtask_name=print_data.get("subtask_name", ""), subtask_id=print_data.get("subtask_id", ""), task_id=print_data.get("task_id", ""), project_id=print_data.get("project_id", ""), profile_id=print_data.get("profile_id", ""), print_type=print_data.get("print_type", ""), fan_gear=int(print_data.get("fan_gear", 0)), cooling_fan_speed=int(print_data.get("cooling_fan_speed", 0)), heatbreak_fan_speed=int(print_data.get("heatbreak_fan_speed", 0)), wifi_signal=wifi_signal, wifi_signal_dbm=cls._parse_wifi_signal(wifi_signal), nozzle_diameter=float(print_data.get("nozzle_diameter", 0.4)), nozzle_type=print_data.get("nozzle_type", ""), nozzle_temp_left=nozzle_temp_left, nozzle_target_temp_left=nozzle_target_temp_left, # Diameter/type per side: H2C currently uses uniform nozzles, so reuse top-level # values. If a future probe shows per-side diameter/type variance, plumb it from # `device.nozzle.info[]` cross-referenced against `device.extruder.info[i].id`. nozzle_diameter_left=float(print_data.get("nozzle_diameter", 0.4)) if nozzle_temp_left is not None else None, nozzle_type_left=print_data.get("nozzle_type", "") if nozzle_temp_left is not None else None, home_flag=int(print_data.get("home_flag", 0)), hw_switch_state=int(print_data.get("hw_switch_state", 0)), mc_print_stage=str(print_data.get("mc_print_stage", "")), mc_print_sub_stage=int(print_data.get("mc_print_sub_stage", 0)), print_error=int(print_data.get("print_error", 0)), stg_cur=int(print_data.get("stg_cur", 0)), ams=ams, upgrade_state=print_data.get("upgrade_state", {}), version=print_data.get("version", {}), ipcam=print_data.get("ipcam", {}), timelapse=print_data.get("timelapse", {}), lights_report=print_data.get("lights_report", []), hms=print_data.get("hms", []), spd_lvl=int(print_data.get("spd_lvl", 0)), spd_mag=int(print_data.get("spd_mag", 0)), big_fan1_speed=int(print_data.get("big_fan1_speed", 0)), big_fan2_speed=int(print_data.get("big_fan2_speed", 0)), sdcard=bool(print_data.get("sdcard", False)), gcode_file_prepare_percent=str(print_data.get("gcode_file_prepare_percent", "")), lifecycle=print_data.get("lifecycle", ""), vt_tray=print_data.get("vt_tray"), hotends=hotends, _raw_data=data, ) def get_snapshot(self) -> Dict[str, Any]: """Get a simplified snapshot for database logging.""" snapshot = { "timestamp": self.timestamp, "nozzle_temp": round(self.nozzle_temp, 2), "nozzle_target_temp": round(self.nozzle_target_temp, 2), "bed_temp": round(self.bed_temp, 2), "bed_target_temp": round(self.bed_target_temp, 2), "chamber_temp": round(self.chamber_temp, 2), "nozzle_diameter": self.nozzle_diameter, "nozzle_type": self.nozzle_type, "nozzle_temp_left": ( round(self.nozzle_temp_left, 2) if self.nozzle_temp_left is not None else None ), "nozzle_target_temp_left": ( round(self.nozzle_target_temp_left, 2) if self.nozzle_target_temp_left is not None else None ), "nozzle_diameter_left": self.nozzle_diameter_left, "nozzle_type_left": self.nozzle_type_left, "gcode_state": self.gcode_state, "print_type": self.print_type, "print_percent": self.print_percent, "remaining_time_min": self.remaining_time_min, "layer_num": self.layer_num, "total_layer_num": self.total_layer_num, "print_line_number": self.print_line_number, "subtask_name": self.subtask_name, "gcode_file": self.gcode_file, "task_id": self.task_id, "project_id": self.project_id, "cooling_fan_speed": self.cooling_fan_speed, "heatbreak_fan_speed": self.heatbreak_fan_speed, "big_fan1_speed": self.big_fan1_speed, "big_fan2_speed": self.big_fan2_speed, "spd_lvl": self.spd_lvl, "spd_mag": self.spd_mag, "wifi_signal_dbm": self.wifi_signal_dbm, "print_error": self.print_error, "has_errors": self.print_error != 0, # Full `print.device` payload, unfiltered. H2C's Vortek rack (6 swappable # hotends + 1 fixed left nozzle) isn't fully modeled yet — stash everything # here so no data is lost once the real Vortek MQTT schema is confirmed. "vortek_raw": self._raw_data.get("print", {}).get("device", {}), "hotends": [h.to_dict() for h in self.hotends], "hms": self.hms, "stg_cur": self.stg_cur, "lights_report": self.lights_report, "chamber_light": self._get_chamber_light_status(), "ipcam_record": self.ipcam.get("ipcam_record", ""), "timelapse": self.ipcam.get("timelapse", ""), "sdcard": self.sdcard, "gcode_file_prepare_percent": self.gcode_file_prepare_percent, "lifecycle": self.lifecycle, } if self.ams: snapshot["ams_unit_count"] = len(self.ams.units) snapshot["ams_status"] = self.ams.ams_status snapshot["ams_rfid_status"] = self.ams.ams_rfid_status snapshot["ams_exist_bits"] = self.ams.ams_exist_bits snapshot["tray_exist_bits"] = self.ams.tray_exist_bits snapshot["tray_is_bbl_bits"] = self.ams.tray_is_bbl_bits snapshot["tray_read_done_bits"] = self.ams.tray_read_done_bits snapshot["tray_now"] = self.ams.tray_now snapshot["ams_version"] = self.ams.version from .models import ams_type_from_info filaments = [] for unit in self.ams.units: # `unit_id` is the AMS unit's own id from the MQTT payload — for the # original AMS / AMS 2 Pro it's a small int (0,1,2,...); for AMS HT # it has the 0x80 bit set (e.g. 128). Don't compute tray_id // 4 — # multi-AMS-type setups are not contiguous. try: unit_id_int = int(unit.unit_id) except (TypeError, ValueError): unit_id_int = None ams_type_label = ams_type_from_info(unit.info) for tray in unit.trays: if tray.tray_type: filaments.append({ "tray_id": tray.tray_id, "slot": tray.tray_id_name, "type": tray.tray_type, "sub_type": tray.tray_sub_brands, "color": tray.tray_color, "remain_percent": tray.remain_percent, "tray_weight": tray.tray_weight, "tray_diameter": tray.tray_diameter, "nozzle_temp_min": tray.nozzle_temp_min, "nozzle_temp_max": tray.nozzle_temp_max, "tag_uid": tray.tag_uid, "state": tray.state, "tray_uuid": tray.tray_uuid, "k": tray.k, "n": tray.n, "cali_idx": tray.cali_idx, "total_len": tray.total_len, "tray_info_idx": tray.tray_info_idx, "tray_time": tray.tray_time, "tray_bed_temp": tray.tray_bed_temp, "bed_temp_type": tray.bed_temp_type, "cols": tray.cols, "ams_unit_id": unit_id_int, "ams_info": unit.info, "ams_type": ams_type_label, }) snapshot["filaments"] = filaments ams_units = [] for unit in self.ams.units: ams_units.append({ "unit_id": unit.unit_id, "ams_id": unit.ams_id, "chip_id": unit.chip_id, "info": unit.info, "ams_type": ams_type_from_info(unit.info), "humidity": unit.humidity, "humidity_raw": unit.humidity_raw, "temp": unit.temp, "dry_time": unit.dry_time, }) snapshot["ams_units"] = ams_units if self.ams.units: snapshot["ams_humidity"] = self.ams.units[0].humidity snapshot["ams_humidity_raw"] = self.ams.units[0].humidity_raw snapshot["ams_temp"] = self.ams.units[0].temp if self.vt_tray: snapshot["external_spool"] = { "type": self.vt_tray.get("tray_type", ""), "color": self.vt_tray.get("tray_color", ""), "remain": self.vt_tray.get("remain", 0), } return snapshot def _get_chamber_light_status(self) -> str: """Extract chamber light status from lights_report""" for light in self.lights_report: if light.get("node") == "chamber_light": return light.get("mode", "unknown") return "unknown" @property def is_printing(self) -> bool: return self.gcode_state.upper() in ("RUNNING", "PRINTING") @property def is_idle(self) -> bool: return self.gcode_state.upper() in ("IDLE", "FINISH", "") @property def is_paused(self) -> bool: return self.gcode_state.upper() == "PAUSE" class PrinterStateAccumulator: """ Accumulates MQTT updates into a complete printer state. BambuLab MQTT sends incremental updates - each message may only contain a subset of fields that have changed. This class maintains the complete state by merging updates. """ def __init__(self): self._state_data: Dict[str, Any] = {"print": {}} self._last_update: Optional[str] = None self._update_count: int = 0 def update(self, data: Dict[str, Any]) -> PrinterState: """Merge new MQTT data into accumulated state and return complete PrinterState.""" timestamp = datetime.now(ZoneInfo(app_settings.TIMEZONE)).isoformat() self._last_update = timestamp self._update_count += 1 if "print" in data: self._deep_merge(self._state_data["print"], data["print"]) return PrinterState.from_mqtt_data(self._state_data, timestamp) def _deep_merge(self, base: Dict, update: Dict) -> None: """Recursively merge update into base dict""" for key, value in update.items(): if key in base and isinstance(base[key], dict) and isinstance(value, dict): self._deep_merge(base[key], value) else: base[key] = value def get_state(self) -> PrinterState: """Get current accumulated state without updating""" timestamp = self._last_update or datetime.now(ZoneInfo(app_settings.TIMEZONE)).isoformat() return PrinterState.from_mqtt_data(self._state_data, timestamp) def reset(self) -> None: """Reset accumulated state""" self._state_data = {"print": {}} self._last_update = None self._update_count = 0 @property def update_count(self) -> int: return self._update_count @property def last_update(self) -> Optional[str]: return self._last_update class BambuPrinter: """ High-level interface for BambuLab printer monitoring. Combines authentication, client, and MQTT into a single interface. """ def __init__( self, username: Optional[str] = None, password: Optional[str] = None, token: Optional[str] = None, device_id: Optional[str] = None, on_update: Optional[Callable[[PrinterState], None]] = None, silent: bool = True, verification_timeout: int = 300, ): self.username = username or os.getenv("BAMBU_USERNAME") self.password = password or os.getenv("BAMBU_PASSWORD") self._token = token or os.getenv("BAMBU_TOKEN") self._device_id = device_id or os.getenv("BAMBU_DEVICE_ID") self._uid: Optional[str] = None self._on_update = on_update self._silent = silent self._verification_timeout = verification_timeout self._client: Optional[BambuClient] = None self._mqtt: Optional[MQTTClient] = None self._accumulator = PrinterStateAccumulator() self._connected = False self._devices: List[Dict[str, Any]] = [] def _get_fresh_token(self, verification_code_timeout: int = 300) -> str: """Get a fresh token using credentials.""" if not self.username or not self.password: raise ValueError( "Username and password required for token refresh. Provide as arguments " "or set BAMBU_USERNAME and BAMBU_PASSWORD environment variables." ) print("\n" + "=" * 60) print("BambuLab Authentication") print("=" * 60) print(f"Authenticating as: {self.username}") print() print(">>> ACTION MAY BE REQUIRED <<<") print("Bambu Lab will send a 6-digit verification code to your") print("registered email. Watch this terminal — if a prompt") print(f"appears below, enter the code and press Enter.") print(f"(You have {verification_code_timeout} seconds to respond.)") print("=" * 60) print() auth = BambuAuthenticator() try: # Always show stdout during auth — suppress_stdout would hide # interactive prompts from the library (e.g. verification code input). token = auth.get_or_create_token( username=self.username, password=self.password ) self._token = token print("Authentication successful!") print(f"Token: {token}") print("=" * 60 + "\n") logger.info("BambuLab token obtained successfully") return token except Exception as e: error_msg = str(e).lower() if "verification" in error_msg or "code" in error_msg or "2fa" in error_msg: print("\n" + "-" * 60) print("EMAIL VERIFICATION REQUIRED") print("-" * 60) print("A verification code has been sent to your email.") print(f"You have {verification_code_timeout} seconds to enter it.") print() try: code = timed_input( "Enter verification code: ", timeout_sec=verification_code_timeout ) if not code: raise ValueError("No verification code entered") print("Verifying code...") token = auth.login( self.username, self.password, verification_code=code ) self._token = token print("\nAuthentication successful!") print(f"Token: {token[:20]}...{token[-10:]}") print("=" * 60 + "\n") print("TIP: Save this token to BAMBU_TOKEN env var to skip login next time") logger.info("BambuLab token obtained with 2FA verification") return token except TimeoutError: print("\nVerification timed out!") raise TimeoutError( f"Verification code not entered within {verification_code_timeout} seconds" ) else: print(f"\nAuthentication failed: {e}") raise def _ensure_token(self) -> str: """Ensure we have a valid token, refreshing if needed""" if self._token: logger.debug("Using existing token") return self._token print("\n" + "!" * 60) print("NO TOKEN FOUND") print("!" * 60) print("Checked:") print(" - Constructor 'token' parameter: Not provided") print(" - Environment variable 'BAMBU_TOKEN': Not set") print() print("Will attempt to authenticate with username/password...") print("!" * 60 + "\n") return self._get_fresh_token(verification_code_timeout=self._verification_timeout) def _on_mqtt_message(self, device_id: str, data: Dict[str, Any]) -> None: """Internal MQTT message handler""" if not data: return state = self._accumulator.update(data) if self._on_update: self._on_update(state) def connect(self, blocking: bool = False, retry_on_auth_error: bool = True) -> None: """Connect to printer via MQTT.""" token = self._ensure_token() try: self._client = BambuClient(token=token) user_info = self._client.get_user_info() self._uid = str(user_info.get("uid", "")) if not self._device_id: self._devices = self._client.get_devices() if not self._devices: raise RuntimeError("No devices found on this account") self._device_id = self._devices[0].get("dev_id") self._mqtt = MQTTClient( self._uid, token, self._device_id, on_message=self._on_mqtt_message ) self._mqtt.connect(blocking=blocking) self._connected = True logger.info(f"Connected to BambuLab printer: {self._device_id}") except Exception as e: error_msg = str(e).lower() is_auth_error = any(x in error_msg for x in ["401", "unauthorized", "token", "auth", "expired"]) if is_auth_error and retry_on_auth_error and self.username and self.password: logger.warning("Auth error detected, refreshing token and retrying...") self._token = None self._get_fresh_token() self.connect(blocking=blocking, retry_on_auth_error=False) else: raise def reconnect(self, blocking: bool = False) -> None: """Disconnect and reconnect.""" self.disconnect() self._accumulator.reset() self.connect(blocking=blocking) def disconnect(self) -> None: """Disconnect from MQTT""" if self._mqtt: try: self._mqtt.disconnect() except Exception: pass self._connected = False logger.debug("Disconnected from BambuLab printer") def get_state(self) -> PrinterState: """Get current accumulated printer state""" return self._accumulator.get_state() def get_snapshot(self) -> Dict[str, Any]: """Get simplified snapshot for database logging""" return self._accumulator.get_state().get_snapshot() @property def device_id(self) -> Optional[str]: return self._device_id @property def devices(self) -> List[Dict[str, Any]]: return self._devices @property def is_connected(self) -> bool: return self._connected def __enter__(self): self.connect(blocking=False) return self def __exit__(self, exc_type, exc_val, exc_tb): self.disconnect() __all__ = [ "BambuAuthenticator", "BambuClient", "MQTTClient", "FilamentTray", "AMSUnit", "AMSState", "PrinterState", "PrinterStateAccumulator", "BambuPrinter", ]