Files
Bambu-Run/bambu_run/mqtt_client.py

878 lines
31 KiB
Python

"""
BambuLab Cloud API Client
Provides authentication, device management, and real-time MQTT monitoring
for BambuLab 3D printers via the Cloud API.
Requires: pip install bambu-lab-cloud-api
Usage:
from bambu_run.mqtt_client import BambuPrinter, PrinterState
printer = BambuPrinter(token="your_token", device_id="your_device_id")
printer.connect()
state = printer.get_state()
snapshot = printer.get_snapshot()
printer.disconnect()
"""
import io
import logging
import os
import platform
import sys
import select
from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional
from zoneinfo import ZoneInfo
from .conf import app_settings
# Re-export from bambu-lab-cloud-api package
try:
from bambulab import BambuAuthenticator, BambuClient, MQTTClient
except ImportError as e:
raise ImportError(
"bambu-lab-cloud-api package is required. Install with: pip install bambu-lab-cloud-api"
) from e
logger = logging.getLogger(__name__)
@contextmanager
def suppress_stdout():
"""Context manager to suppress stdout (for silencing library print statements)"""
old_stdout = sys.stdout
sys.stdout = io.StringIO()
try:
yield
finally:
sys.stdout = old_stdout
def timed_input(prompt: str, timeout_sec: int = 300) -> str:
"""
Get user input with a timeout.
Args:
prompt: The prompt to display
timeout_sec: Timeout in seconds (default 300 = 5 minutes)
Returns:
User input string
Raises:
TimeoutError: If no input received within timeout
"""
print(prompt, end='', flush=True)
if platform.system() == 'Windows':
import threading
result = {'value': None, 'done': False}
def get_input():
try:
result['value'] = input()
except EOFError:
result['value'] = None
result['done'] = True
thread = threading.Thread(target=get_input, daemon=True)
thread.start()
thread.join(timeout=timeout_sec)
if not result['done']:
print()
raise TimeoutError(f"No input received within {timeout_sec} seconds")
return result['value'] or ""
else:
ready, _, _ = select.select([sys.stdin], [], [], timeout_sec)
if ready:
return sys.stdin.readline().strip()
else:
print()
raise TimeoutError(f"No input received within {timeout_sec} seconds")
@dataclass
class FilamentTray:
"""Represents a single filament tray in an AMS unit"""
tray_id: str = ""
tray_id_name: str = ""
tray_type: str = ""
tray_sub_brands: str = ""
tray_color: str = ""
remain_percent: int = -1
tray_weight: int = 0
tray_diameter: float = 1.75
tray_temp: int = 0
nozzle_temp_min: int = 0
nozzle_temp_max: int = 0
state: int = 0
tag_uid: str = ""
tray_uuid: str = ""
k: float = 0.0
n: float = 0.0
cali_idx: int = -1
total_len: int = 0
tray_info_idx: str = ""
tray_time: int = 0
tray_bed_temp: int = 0
bed_temp_type: int = 0
cols: List[str] = field(default_factory=list)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "FilamentTray":
"""Create FilamentTray from MQTT tray data"""
return cls(
tray_id=str(data.get("id", "")),
tray_id_name=data.get("tray_id_name", ""),
tray_type=data.get("tray_type", ""),
tray_sub_brands=data.get("tray_sub_brands", ""),
tray_color=data.get("tray_color", ""),
remain_percent=data.get("remain", -1),
tray_weight=int(data.get("tray_weight", 0)),
tray_diameter=float(data.get("tray_diameter", 1.75)),
tray_temp=int(data.get("tray_temp", 0)),
nozzle_temp_min=int(data.get("nozzle_temp_min", 0)),
nozzle_temp_max=int(data.get("nozzle_temp_max", 0)),
state=data.get("state", 0),
tag_uid=data.get("tag_uid", ""),
tray_uuid=data.get("tray_uuid", ""),
k=float(data.get("k", 0.0)),
n=float(data.get("n", 0.0)),
cali_idx=int(data.get("cali_idx", -1)),
total_len=int(data.get("total_len", 0)),
tray_info_idx=data.get("tray_info_idx", ""),
tray_time=int(data.get("tray_time", 0)),
tray_bed_temp=int(data.get("bed_temp", 0)),
bed_temp_type=int(data.get("bed_temp_type", 0)),
cols=data.get("cols", []),
)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for database storage"""
return {
"tray_id": self.tray_id,
"tray_id_name": self.tray_id_name,
"tray_type": self.tray_type,
"tray_sub_brands": self.tray_sub_brands,
"tray_color": self.tray_color,
"remain_percent": self.remain_percent,
"tray_weight": self.tray_weight,
"tray_diameter": self.tray_diameter,
"tray_temp": self.tray_temp,
"nozzle_temp_min": self.nozzle_temp_min,
"nozzle_temp_max": self.nozzle_temp_max,
"state": self.state,
"tag_uid": self.tag_uid,
"tray_uuid": self.tray_uuid,
"k": self.k,
"n": self.n,
"cali_idx": self.cali_idx,
"total_len": self.total_len,
"tray_info_idx": self.tray_info_idx,
"tray_time": self.tray_time,
"tray_bed_temp": self.tray_bed_temp,
"bed_temp_type": self.bed_temp_type,
"cols": self.cols,
}
@dataclass
class AMSUnit:
"""Represents a single AMS (Automatic Material System) unit"""
ams_id: str = ""
unit_id: str = ""
humidity: int = -1
humidity_raw: int = -1
temp: float = 0.0
dry_time: int = 0
chip_id: str = ""
info: str = ""
trays: List[FilamentTray] = field(default_factory=list)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "AMSUnit":
"""Create AMSUnit from MQTT ams data"""
trays = [FilamentTray.from_dict(t) for t in data.get("tray", [])]
return cls(
ams_id=data.get("ams_id", ""),
unit_id=str(data.get("id", "")),
humidity=int(data.get("humidity", -1)),
humidity_raw=int(data.get("humidity_raw", -1)),
temp=float(data.get("temp", 0.0)),
dry_time=data.get("dry_time", 0),
chip_id=data.get("chip_id", ""),
info=data.get("info", ""),
trays=trays,
)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for database storage"""
return {
"ams_id": self.ams_id,
"unit_id": self.unit_id,
"humidity": self.humidity,
"humidity_raw": self.humidity_raw,
"temp": self.temp,
"dry_time": self.dry_time,
"chip_id": self.chip_id,
"info": self.info,
"trays": [t.to_dict() for t in self.trays],
}
@dataclass
class AMSState:
"""Complete AMS system state including all units"""
ams_exist_bits: str = ""
tray_exist_bits: str = ""
tray_now: str = ""
tray_pre: str = ""
tray_tar: str = ""
ams_status: int = 0
ams_rfid_status: int = 0
tray_is_bbl_bits: str = ""
tray_read_done_bits: str = ""
version: int = 0
insert_flag: bool = False
power_on_flag: bool = False
units: List[AMSUnit] = field(default_factory=list)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "AMSState":
"""Create AMSState from MQTT ams data"""
units = [AMSUnit.from_dict(u) for u in data.get("ams", [])]
return cls(
ams_exist_bits=data.get("ams_exist_bits", ""),
tray_exist_bits=data.get("tray_exist_bits", ""),
tray_now=data.get("tray_now", ""),
tray_pre=data.get("tray_pre", ""),
tray_tar=data.get("tray_tar", ""),
ams_status=data.get("ams_status", 0),
ams_rfid_status=data.get("ams_rfid_status", 0),
tray_is_bbl_bits=data.get("tray_is_bbl_bits", ""),
tray_read_done_bits=data.get("tray_read_done_bits", ""),
version=int(data.get("version", 0)),
insert_flag=bool(data.get("insert_flag", False)),
power_on_flag=bool(data.get("power_on_flag", False)),
units=units,
)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for database storage"""
return {
"ams_exist_bits": self.ams_exist_bits,
"tray_exist_bits": self.tray_exist_bits,
"tray_now": self.tray_now,
"tray_pre": self.tray_pre,
"tray_tar": self.tray_tar,
"ams_status": self.ams_status,
"ams_rfid_status": self.ams_rfid_status,
"tray_is_bbl_bits": self.tray_is_bbl_bits,
"tray_read_done_bits": self.tray_read_done_bits,
"version": self.version,
"insert_flag": self.insert_flag,
"power_on_flag": self.power_on_flag,
"units": [u.to_dict() for u in self.units],
}
@property
def total_trays(self) -> int:
"""Total number of trays across all units"""
return sum(len(u.trays) for u in self.units)
@property
def loaded_trays(self) -> List[FilamentTray]:
"""Get all trays that have filament loaded"""
loaded = []
for unit in self.units:
for tray in unit.trays:
if tray.tray_type:
loaded.append(tray)
return loaded
@dataclass
class PrinterState:
"""Complete printer state parsed from MQTT data"""
timestamp: str = ""
sequence_id: str = ""
# Temperature info
nozzle_temp: float = 0.0
nozzle_target_temp: float = 0.0
bed_temp: float = 0.0
bed_target_temp: float = 0.0
chamber_temp: float = 0.0
# Print progress
gcode_state: str = ""
print_percent: int = 0
remaining_time_min: int = 0
layer_num: int = 0
total_layer_num: int = 0
print_line_number: int = 0
# Current job info
gcode_file: str = ""
subtask_name: str = ""
subtask_id: str = ""
task_id: str = ""
project_id: str = ""
profile_id: str = ""
print_type: str = ""
# Fan speeds
fan_gear: int = 0
cooling_fan_speed: int = 0
heatbreak_fan_speed: int = 0
# WiFi / Network
wifi_signal: str = ""
wifi_signal_dbm: int = 0
# Nozzle info
nozzle_diameter: float = 0.4
nozzle_type: str = ""
# System status
home_flag: int = 0
hw_switch_state: int = 0
mc_print_stage: str = ""
mc_print_sub_stage: int = 0
print_error: int = 0
stg_cur: int = 0
# AMS state
ams: Optional[AMSState] = None
# Upgrade state
upgrade_state: Dict[str, Any] = field(default_factory=dict)
# Version info
version: Dict[str, Any] = field(default_factory=dict)
# Camera / Timelapse
ipcam: Dict[str, Any] = field(default_factory=dict)
timelapse: Dict[str, Any] = field(default_factory=dict)
# Lights
lights_report: List[Dict[str, Any]] = field(default_factory=list)
# HMS (Health Management System) messages
hms: List[Dict[str, Any]] = field(default_factory=list)
# Speed settings
spd_lvl: int = 0
spd_mag: int = 0
# Auxiliary fans
big_fan1_speed: int = 0
big_fan2_speed: int = 0
# System info
sdcard: bool = False
gcode_file_prepare_percent: str = ""
lifecycle: str = ""
# External spool (virtual tray)
vt_tray: Optional[Dict[str, Any]] = None
# Raw data for any additional fields
_raw_data: Dict[str, Any] = field(default_factory=dict, repr=False)
@staticmethod
def _parse_wifi_signal(signal_str: str) -> int:
"""Parse WiFi signal string (e.g., '-34dBm') to integer dBm"""
if not signal_str:
return 0
try:
return int(signal_str.replace("dBm", ""))
except (ValueError, AttributeError):
return 0
@classmethod
def from_mqtt_data(cls, data: Dict[str, Any], timestamp: Optional[str] = None) -> "PrinterState":
"""Create PrinterState from MQTT push_status data."""
if timestamp is None:
timestamp = datetime.now(ZoneInfo(app_settings.TIMEZONE)).isoformat()
print_data = data.get("print", {})
# Parse AMS data if present
ams = None
if "ams" in print_data:
ams = AMSState.from_dict(print_data["ams"])
wifi_signal = print_data.get("wifi_signal", "")
return cls(
timestamp=timestamp,
sequence_id=str(print_data.get("sequence_id", "")),
nozzle_temp=float(print_data.get("nozzle_temper", 0.0)),
nozzle_target_temp=float(print_data.get("nozzle_target_temper", 0.0)),
bed_temp=float(print_data.get("bed_temper", 0.0)),
bed_target_temp=float(print_data.get("bed_target_temper", 0.0)),
chamber_temp=float(print_data.get("chamber_temper", 0.0)),
gcode_state=print_data.get("gcode_state", ""),
print_percent=int(print_data.get("mc_percent", 0)),
remaining_time_min=int(print_data.get("mc_remaining_time", 0)),
layer_num=int(print_data.get("layer_num", 0)),
total_layer_num=int(print_data.get("total_layer_num", 0)),
print_line_number=int(print_data.get("mc_print_line_number", 0)),
gcode_file=print_data.get("gcode_file", ""),
subtask_name=print_data.get("subtask_name", ""),
subtask_id=print_data.get("subtask_id", ""),
task_id=print_data.get("task_id", ""),
project_id=print_data.get("project_id", ""),
profile_id=print_data.get("profile_id", ""),
print_type=print_data.get("print_type", ""),
fan_gear=int(print_data.get("fan_gear", 0)),
cooling_fan_speed=int(print_data.get("cooling_fan_speed", 0)),
heatbreak_fan_speed=int(print_data.get("heatbreak_fan_speed", 0)),
wifi_signal=wifi_signal,
wifi_signal_dbm=cls._parse_wifi_signal(wifi_signal),
nozzle_diameter=float(print_data.get("nozzle_diameter", 0.4)),
nozzle_type=print_data.get("nozzle_type", ""),
home_flag=int(print_data.get("home_flag", 0)),
hw_switch_state=int(print_data.get("hw_switch_state", 0)),
mc_print_stage=str(print_data.get("mc_print_stage", "")),
mc_print_sub_stage=int(print_data.get("mc_print_sub_stage", 0)),
print_error=int(print_data.get("print_error", 0)),
stg_cur=int(print_data.get("stg_cur", 0)),
ams=ams,
upgrade_state=print_data.get("upgrade_state", {}),
version=print_data.get("version", {}),
ipcam=print_data.get("ipcam", {}),
timelapse=print_data.get("timelapse", {}),
lights_report=print_data.get("lights_report", []),
hms=print_data.get("hms", []),
spd_lvl=int(print_data.get("spd_lvl", 0)),
spd_mag=int(print_data.get("spd_mag", 0)),
big_fan1_speed=int(print_data.get("big_fan1_speed", 0)),
big_fan2_speed=int(print_data.get("big_fan2_speed", 0)),
sdcard=bool(print_data.get("sdcard", False)),
gcode_file_prepare_percent=str(print_data.get("gcode_file_prepare_percent", "")),
lifecycle=print_data.get("lifecycle", ""),
vt_tray=print_data.get("vt_tray"),
_raw_data=data,
)
def get_snapshot(self) -> Dict[str, Any]:
"""Get a simplified snapshot for database logging."""
snapshot = {
"timestamp": self.timestamp,
"nozzle_temp": round(self.nozzle_temp, 2),
"nozzle_target_temp": round(self.nozzle_target_temp, 2),
"bed_temp": round(self.bed_temp, 2),
"bed_target_temp": round(self.bed_target_temp, 2),
"chamber_temp": round(self.chamber_temp, 2),
"nozzle_diameter": self.nozzle_diameter,
"nozzle_type": self.nozzle_type,
"gcode_state": self.gcode_state,
"print_type": self.print_type,
"print_percent": self.print_percent,
"remaining_time_min": self.remaining_time_min,
"layer_num": self.layer_num,
"total_layer_num": self.total_layer_num,
"print_line_number": self.print_line_number,
"subtask_name": self.subtask_name,
"gcode_file": self.gcode_file,
"cooling_fan_speed": self.cooling_fan_speed,
"heatbreak_fan_speed": self.heatbreak_fan_speed,
"big_fan1_speed": self.big_fan1_speed,
"big_fan2_speed": self.big_fan2_speed,
"spd_lvl": self.spd_lvl,
"spd_mag": self.spd_mag,
"wifi_signal_dbm": self.wifi_signal_dbm,
"print_error": self.print_error,
"has_errors": self.print_error != 0,
"hms": self.hms,
"stg_cur": self.stg_cur,
"lights_report": self.lights_report,
"chamber_light": self._get_chamber_light_status(),
"ipcam_record": self.ipcam.get("ipcam_record", ""),
"timelapse": self.ipcam.get("timelapse", ""),
"sdcard": self.sdcard,
"gcode_file_prepare_percent": self.gcode_file_prepare_percent,
"lifecycle": self.lifecycle,
}
if self.ams:
snapshot["ams_unit_count"] = len(self.ams.units)
snapshot["ams_status"] = self.ams.ams_status
snapshot["ams_rfid_status"] = self.ams.ams_rfid_status
snapshot["ams_exist_bits"] = self.ams.ams_exist_bits
snapshot["tray_exist_bits"] = self.ams.tray_exist_bits
snapshot["tray_is_bbl_bits"] = self.ams.tray_is_bbl_bits
snapshot["tray_read_done_bits"] = self.ams.tray_read_done_bits
snapshot["tray_now"] = self.ams.tray_now
snapshot["ams_version"] = self.ams.version
filaments = []
for unit in self.ams.units:
for tray in unit.trays:
if tray.tray_type:
filaments.append({
"tray_id": tray.tray_id,
"slot": tray.tray_id_name,
"type": tray.tray_type,
"sub_type": tray.tray_sub_brands,
"color": tray.tray_color,
"remain_percent": tray.remain_percent,
"tray_weight": tray.tray_weight,
"tray_diameter": tray.tray_diameter,
"nozzle_temp_min": tray.nozzle_temp_min,
"nozzle_temp_max": tray.nozzle_temp_max,
"tag_uid": tray.tag_uid,
"state": tray.state,
"tray_uuid": tray.tray_uuid,
"k": tray.k,
"n": tray.n,
"cali_idx": tray.cali_idx,
"total_len": tray.total_len,
"tray_info_idx": tray.tray_info_idx,
"tray_time": tray.tray_time,
"tray_bed_temp": tray.tray_bed_temp,
"bed_temp_type": tray.bed_temp_type,
"cols": tray.cols,
})
snapshot["filaments"] = filaments
ams_units = []
for unit in self.ams.units:
ams_units.append({
"unit_id": unit.unit_id,
"ams_id": unit.ams_id,
"chip_id": unit.chip_id,
"info": unit.info,
"humidity": unit.humidity,
"humidity_raw": unit.humidity_raw,
"temp": unit.temp,
"dry_time": unit.dry_time,
})
snapshot["ams_units"] = ams_units
if self.ams.units:
snapshot["ams_humidity"] = self.ams.units[0].humidity
snapshot["ams_humidity_raw"] = self.ams.units[0].humidity_raw
snapshot["ams_temp"] = self.ams.units[0].temp
if self.vt_tray:
snapshot["external_spool"] = {
"type": self.vt_tray.get("tray_type", ""),
"color": self.vt_tray.get("tray_color", ""),
"remain": self.vt_tray.get("remain", 0),
}
return snapshot
def _get_chamber_light_status(self) -> str:
"""Extract chamber light status from lights_report"""
for light in self.lights_report:
if light.get("node") == "chamber_light":
return light.get("mode", "unknown")
return "unknown"
@property
def is_printing(self) -> bool:
return self.gcode_state.upper() in ("RUNNING", "PRINTING")
@property
def is_idle(self) -> bool:
return self.gcode_state.upper() in ("IDLE", "FINISH", "")
@property
def is_paused(self) -> bool:
return self.gcode_state.upper() == "PAUSE"
class PrinterStateAccumulator:
"""
Accumulates MQTT updates into a complete printer state.
BambuLab MQTT sends incremental updates - each message may only contain
a subset of fields that have changed. This class maintains the complete
state by merging updates.
"""
def __init__(self):
self._state_data: Dict[str, Any] = {"print": {}}
self._last_update: Optional[str] = None
self._update_count: int = 0
def update(self, data: Dict[str, Any]) -> PrinterState:
"""Merge new MQTT data into accumulated state and return complete PrinterState."""
timestamp = datetime.now(ZoneInfo(app_settings.TIMEZONE)).isoformat()
self._last_update = timestamp
self._update_count += 1
if "print" in data:
self._deep_merge(self._state_data["print"], data["print"])
return PrinterState.from_mqtt_data(self._state_data, timestamp)
def _deep_merge(self, base: Dict, update: Dict) -> None:
"""Recursively merge update into base dict"""
for key, value in update.items():
if key in base and isinstance(base[key], dict) and isinstance(value, dict):
self._deep_merge(base[key], value)
else:
base[key] = value
def get_state(self) -> PrinterState:
"""Get current accumulated state without updating"""
timestamp = self._last_update or datetime.now(ZoneInfo(app_settings.TIMEZONE)).isoformat()
return PrinterState.from_mqtt_data(self._state_data, timestamp)
def reset(self) -> None:
"""Reset accumulated state"""
self._state_data = {"print": {}}
self._last_update = None
self._update_count = 0
@property
def update_count(self) -> int:
return self._update_count
@property
def last_update(self) -> Optional[str]:
return self._last_update
class BambuPrinter:
"""
High-level interface for BambuLab printer monitoring.
Combines authentication, client, and MQTT into a single interface.
"""
def __init__(
self,
username: Optional[str] = None,
password: Optional[str] = None,
token: Optional[str] = None,
device_id: Optional[str] = None,
on_update: Optional[Callable[[PrinterState], None]] = None,
silent: bool = True,
verification_timeout: int = 300,
):
self.username = username or os.getenv("BAMBU_USERNAME")
self.password = password or os.getenv("BAMBU_PASSWORD")
self._token = token or os.getenv("BAMBU_TOKEN")
self._device_id = device_id or os.getenv("BAMBU_DEVICE_ID")
self._uid: Optional[str] = None
self._on_update = on_update
self._silent = silent
self._verification_timeout = verification_timeout
self._client: Optional[BambuClient] = None
self._mqtt: Optional[MQTTClient] = None
self._accumulator = PrinterStateAccumulator()
self._connected = False
self._devices: List[Dict[str, Any]] = []
def _get_fresh_token(self, verification_code_timeout: int = 300) -> str:
"""Get a fresh token using credentials."""
if not self.username or not self.password:
raise ValueError(
"Username and password required for token refresh. Provide as arguments "
"or set BAMBU_USERNAME and BAMBU_PASSWORD environment variables."
)
print("\n" + "=" * 60)
print("BambuLab Authentication")
print("=" * 60)
print(f"Authenticating as: {self.username}")
print()
print(">>> ACTION MAY BE REQUIRED <<<")
print("Bambu Lab will send a 6-digit verification code to your")
print("registered email. Watch this terminal — if a prompt")
print(f"appears below, enter the code and press Enter.")
print(f"(You have {verification_code_timeout} seconds to respond.)")
print("=" * 60)
print()
auth = BambuAuthenticator()
try:
# Always show stdout during auth — suppress_stdout would hide
# interactive prompts from the library (e.g. verification code input).
token = auth.get_or_create_token(
username=self.username,
password=self.password
)
self._token = token
print("Authentication successful!")
print(f"Token: {token[:20]}...{token[-10:]}")
print("=" * 60 + "\n")
logger.info("BambuLab token obtained successfully")
return token
except Exception as e:
error_msg = str(e).lower()
if "verification" in error_msg or "code" in error_msg or "2fa" in error_msg:
print("\n" + "-" * 60)
print("EMAIL VERIFICATION REQUIRED")
print("-" * 60)
print("A verification code has been sent to your email.")
print(f"You have {verification_code_timeout} seconds to enter it.")
print()
try:
code = timed_input(
"Enter verification code: ",
timeout_sec=verification_code_timeout
)
if not code:
raise ValueError("No verification code entered")
print("Verifying code...")
token = auth.login(
self.username,
self.password,
verification_code=code
)
self._token = token
print("\nAuthentication successful!")
print(f"Token: {token[:20]}...{token[-10:]}")
print("=" * 60 + "\n")
print("TIP: Save this token to BAMBU_TOKEN env var to skip login next time")
logger.info("BambuLab token obtained with 2FA verification")
return token
except TimeoutError:
print("\nVerification timed out!")
raise TimeoutError(
f"Verification code not entered within {verification_code_timeout} seconds"
)
else:
print(f"\nAuthentication failed: {e}")
raise
def _ensure_token(self) -> str:
"""Ensure we have a valid token, refreshing if needed"""
if self._token:
logger.debug("Using existing token")
return self._token
print("\n" + "!" * 60)
print("NO TOKEN FOUND")
print("!" * 60)
print("Checked:")
print(" - Constructor 'token' parameter: Not provided")
print(" - Environment variable 'BAMBU_TOKEN': Not set")
print()
print("Will attempt to authenticate with username/password...")
print("!" * 60 + "\n")
return self._get_fresh_token(verification_code_timeout=self._verification_timeout)
def _on_mqtt_message(self, device_id: str, data: Dict[str, Any]) -> None:
"""Internal MQTT message handler"""
if not data:
return
state = self._accumulator.update(data)
if self._on_update:
self._on_update(state)
def connect(self, blocking: bool = False, retry_on_auth_error: bool = True) -> None:
"""Connect to printer via MQTT."""
token = self._ensure_token()
try:
self._client = BambuClient(token=token)
user_info = self._client.get_user_info()
self._uid = str(user_info.get("uid", ""))
if not self._device_id:
self._devices = self._client.get_devices()
if not self._devices:
raise RuntimeError("No devices found on this account")
self._device_id = self._devices[0].get("dev_id")
self._mqtt = MQTTClient(
self._uid,
token,
self._device_id,
on_message=self._on_mqtt_message
)
self._mqtt.connect(blocking=blocking)
self._connected = True
logger.info(f"Connected to BambuLab printer: {self._device_id}")
except Exception as e:
error_msg = str(e).lower()
is_auth_error = any(x in error_msg for x in ["401", "unauthorized", "token", "auth", "expired"])
if is_auth_error and retry_on_auth_error and self.username and self.password:
logger.warning("Auth error detected, refreshing token and retrying...")
self._token = None
self._get_fresh_token()
self.connect(blocking=blocking, retry_on_auth_error=False)
else:
raise
def reconnect(self, blocking: bool = False) -> None:
"""Disconnect and reconnect."""
self.disconnect()
self._accumulator.reset()
self.connect(blocking=blocking)
def disconnect(self) -> None:
"""Disconnect from MQTT"""
if self._mqtt:
try:
self._mqtt.disconnect()
except Exception:
pass
self._connected = False
logger.debug("Disconnected from BambuLab printer")
def get_state(self) -> PrinterState:
"""Get current accumulated printer state"""
return self._accumulator.get_state()
def get_snapshot(self) -> Dict[str, Any]:
"""Get simplified snapshot for database logging"""
return self._accumulator.get_state().get_snapshot()
@property
def device_id(self) -> Optional[str]:
return self._device_id
@property
def devices(self) -> List[Dict[str, Any]]:
return self._devices
@property
def is_connected(self) -> bool:
return self._connected
def __enter__(self):
self.connect(blocking=False)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.disconnect()
__all__ = [
"BambuAuthenticator",
"BambuClient",
"MQTTClient",
"FilamentTray",
"AMSUnit",
"AMSState",
"PrinterState",
"PrinterStateAccumulator",
"BambuPrinter",
]