mirror of
https://github.com/RunLit/Bambu-Run.git
synced 2026-06-22 14:09:04 +01:00
* bypass bambu cloud api opencb requirement * project root add to managepy * update instruction to do migration; mqtt login more verbose * migrations up to date model * use migrations from django migrate * print full token to copy paste * allow local network hosts * added side bar toggle * removed standalone css from dashboard css * added icon and fixed text trunction issue * fixed chart missing whitenoise and not rendering * aded favicon and fixed ui issues
878 lines
31 KiB
Python
878 lines
31 KiB
Python
"""
|
|
BambuLab Cloud API Client
|
|
Provides authentication, device management, and real-time MQTT monitoring
|
|
for BambuLab 3D printers via the Cloud API.
|
|
|
|
Requires: pip install bambu-lab-cloud-api
|
|
|
|
Usage:
|
|
from bambu_run.mqtt_client import BambuPrinter, PrinterState
|
|
|
|
printer = BambuPrinter(token="your_token", device_id="your_device_id")
|
|
printer.connect()
|
|
state = printer.get_state()
|
|
snapshot = printer.get_snapshot()
|
|
printer.disconnect()
|
|
"""
|
|
|
|
import io
|
|
import logging
|
|
import os
|
|
import platform
|
|
import sys
|
|
import select
|
|
from contextlib import contextmanager
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from typing import Any, Callable, Dict, List, Optional
|
|
from zoneinfo import ZoneInfo
|
|
|
|
from .conf import app_settings
|
|
|
|
# Re-export from bambu-lab-cloud-api package
|
|
try:
|
|
from bambulab import BambuAuthenticator, BambuClient, MQTTClient
|
|
except ImportError as e:
|
|
raise ImportError(
|
|
"bambu-lab-cloud-api package is required. Install with: pip install bambu-lab-cloud-api"
|
|
) from e
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@contextmanager
|
|
def suppress_stdout():
|
|
"""Context manager to suppress stdout (for silencing library print statements)"""
|
|
old_stdout = sys.stdout
|
|
sys.stdout = io.StringIO()
|
|
try:
|
|
yield
|
|
finally:
|
|
sys.stdout = old_stdout
|
|
|
|
|
|
def timed_input(prompt: str, timeout_sec: int = 300) -> str:
|
|
"""
|
|
Get user input with a timeout.
|
|
|
|
Args:
|
|
prompt: The prompt to display
|
|
timeout_sec: Timeout in seconds (default 300 = 5 minutes)
|
|
|
|
Returns:
|
|
User input string
|
|
|
|
Raises:
|
|
TimeoutError: If no input received within timeout
|
|
"""
|
|
print(prompt, end='', flush=True)
|
|
|
|
if platform.system() == 'Windows':
|
|
import threading
|
|
result = {'value': None, 'done': False}
|
|
|
|
def get_input():
|
|
try:
|
|
result['value'] = input()
|
|
except EOFError:
|
|
result['value'] = None
|
|
result['done'] = True
|
|
|
|
thread = threading.Thread(target=get_input, daemon=True)
|
|
thread.start()
|
|
thread.join(timeout=timeout_sec)
|
|
|
|
if not result['done']:
|
|
print()
|
|
raise TimeoutError(f"No input received within {timeout_sec} seconds")
|
|
return result['value'] or ""
|
|
else:
|
|
ready, _, _ = select.select([sys.stdin], [], [], timeout_sec)
|
|
if ready:
|
|
return sys.stdin.readline().strip()
|
|
else:
|
|
print()
|
|
raise TimeoutError(f"No input received within {timeout_sec} seconds")
|
|
|
|
|
|
@dataclass
|
|
class FilamentTray:
|
|
"""Represents a single filament tray in an AMS unit"""
|
|
tray_id: str = ""
|
|
tray_id_name: str = ""
|
|
tray_type: str = ""
|
|
tray_sub_brands: str = ""
|
|
tray_color: str = ""
|
|
remain_percent: int = -1
|
|
tray_weight: int = 0
|
|
tray_diameter: float = 1.75
|
|
tray_temp: int = 0
|
|
nozzle_temp_min: int = 0
|
|
nozzle_temp_max: int = 0
|
|
state: int = 0
|
|
tag_uid: str = ""
|
|
tray_uuid: str = ""
|
|
k: float = 0.0
|
|
n: float = 0.0
|
|
cali_idx: int = -1
|
|
total_len: int = 0
|
|
tray_info_idx: str = ""
|
|
tray_time: int = 0
|
|
tray_bed_temp: int = 0
|
|
bed_temp_type: int = 0
|
|
cols: List[str] = field(default_factory=list)
|
|
|
|
@classmethod
|
|
def from_dict(cls, data: Dict[str, Any]) -> "FilamentTray":
|
|
"""Create FilamentTray from MQTT tray data"""
|
|
return cls(
|
|
tray_id=str(data.get("id", "")),
|
|
tray_id_name=data.get("tray_id_name", ""),
|
|
tray_type=data.get("tray_type", ""),
|
|
tray_sub_brands=data.get("tray_sub_brands", ""),
|
|
tray_color=data.get("tray_color", ""),
|
|
remain_percent=data.get("remain", -1),
|
|
tray_weight=int(data.get("tray_weight", 0)),
|
|
tray_diameter=float(data.get("tray_diameter", 1.75)),
|
|
tray_temp=int(data.get("tray_temp", 0)),
|
|
nozzle_temp_min=int(data.get("nozzle_temp_min", 0)),
|
|
nozzle_temp_max=int(data.get("nozzle_temp_max", 0)),
|
|
state=data.get("state", 0),
|
|
tag_uid=data.get("tag_uid", ""),
|
|
tray_uuid=data.get("tray_uuid", ""),
|
|
k=float(data.get("k", 0.0)),
|
|
n=float(data.get("n", 0.0)),
|
|
cali_idx=int(data.get("cali_idx", -1)),
|
|
total_len=int(data.get("total_len", 0)),
|
|
tray_info_idx=data.get("tray_info_idx", ""),
|
|
tray_time=int(data.get("tray_time", 0)),
|
|
tray_bed_temp=int(data.get("bed_temp", 0)),
|
|
bed_temp_type=int(data.get("bed_temp_type", 0)),
|
|
cols=data.get("cols", []),
|
|
)
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
"""Convert to dictionary for database storage"""
|
|
return {
|
|
"tray_id": self.tray_id,
|
|
"tray_id_name": self.tray_id_name,
|
|
"tray_type": self.tray_type,
|
|
"tray_sub_brands": self.tray_sub_brands,
|
|
"tray_color": self.tray_color,
|
|
"remain_percent": self.remain_percent,
|
|
"tray_weight": self.tray_weight,
|
|
"tray_diameter": self.tray_diameter,
|
|
"tray_temp": self.tray_temp,
|
|
"nozzle_temp_min": self.nozzle_temp_min,
|
|
"nozzle_temp_max": self.nozzle_temp_max,
|
|
"state": self.state,
|
|
"tag_uid": self.tag_uid,
|
|
"tray_uuid": self.tray_uuid,
|
|
"k": self.k,
|
|
"n": self.n,
|
|
"cali_idx": self.cali_idx,
|
|
"total_len": self.total_len,
|
|
"tray_info_idx": self.tray_info_idx,
|
|
"tray_time": self.tray_time,
|
|
"tray_bed_temp": self.tray_bed_temp,
|
|
"bed_temp_type": self.bed_temp_type,
|
|
"cols": self.cols,
|
|
}
|
|
|
|
|
|
@dataclass
|
|
class AMSUnit:
|
|
"""Represents a single AMS (Automatic Material System) unit"""
|
|
ams_id: str = ""
|
|
unit_id: str = ""
|
|
humidity: int = -1
|
|
humidity_raw: int = -1
|
|
temp: float = 0.0
|
|
dry_time: int = 0
|
|
chip_id: str = ""
|
|
info: str = ""
|
|
trays: List[FilamentTray] = field(default_factory=list)
|
|
|
|
@classmethod
|
|
def from_dict(cls, data: Dict[str, Any]) -> "AMSUnit":
|
|
"""Create AMSUnit from MQTT ams data"""
|
|
trays = [FilamentTray.from_dict(t) for t in data.get("tray", [])]
|
|
return cls(
|
|
ams_id=data.get("ams_id", ""),
|
|
unit_id=str(data.get("id", "")),
|
|
humidity=int(data.get("humidity", -1)),
|
|
humidity_raw=int(data.get("humidity_raw", -1)),
|
|
temp=float(data.get("temp", 0.0)),
|
|
dry_time=data.get("dry_time", 0),
|
|
chip_id=data.get("chip_id", ""),
|
|
info=data.get("info", ""),
|
|
trays=trays,
|
|
)
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
"""Convert to dictionary for database storage"""
|
|
return {
|
|
"ams_id": self.ams_id,
|
|
"unit_id": self.unit_id,
|
|
"humidity": self.humidity,
|
|
"humidity_raw": self.humidity_raw,
|
|
"temp": self.temp,
|
|
"dry_time": self.dry_time,
|
|
"chip_id": self.chip_id,
|
|
"info": self.info,
|
|
"trays": [t.to_dict() for t in self.trays],
|
|
}
|
|
|
|
|
|
@dataclass
|
|
class AMSState:
|
|
"""Complete AMS system state including all units"""
|
|
ams_exist_bits: str = ""
|
|
tray_exist_bits: str = ""
|
|
tray_now: str = ""
|
|
tray_pre: str = ""
|
|
tray_tar: str = ""
|
|
ams_status: int = 0
|
|
ams_rfid_status: int = 0
|
|
tray_is_bbl_bits: str = ""
|
|
tray_read_done_bits: str = ""
|
|
version: int = 0
|
|
insert_flag: bool = False
|
|
power_on_flag: bool = False
|
|
units: List[AMSUnit] = field(default_factory=list)
|
|
|
|
@classmethod
|
|
def from_dict(cls, data: Dict[str, Any]) -> "AMSState":
|
|
"""Create AMSState from MQTT ams data"""
|
|
units = [AMSUnit.from_dict(u) for u in data.get("ams", [])]
|
|
return cls(
|
|
ams_exist_bits=data.get("ams_exist_bits", ""),
|
|
tray_exist_bits=data.get("tray_exist_bits", ""),
|
|
tray_now=data.get("tray_now", ""),
|
|
tray_pre=data.get("tray_pre", ""),
|
|
tray_tar=data.get("tray_tar", ""),
|
|
ams_status=data.get("ams_status", 0),
|
|
ams_rfid_status=data.get("ams_rfid_status", 0),
|
|
tray_is_bbl_bits=data.get("tray_is_bbl_bits", ""),
|
|
tray_read_done_bits=data.get("tray_read_done_bits", ""),
|
|
version=int(data.get("version", 0)),
|
|
insert_flag=bool(data.get("insert_flag", False)),
|
|
power_on_flag=bool(data.get("power_on_flag", False)),
|
|
units=units,
|
|
)
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
"""Convert to dictionary for database storage"""
|
|
return {
|
|
"ams_exist_bits": self.ams_exist_bits,
|
|
"tray_exist_bits": self.tray_exist_bits,
|
|
"tray_now": self.tray_now,
|
|
"tray_pre": self.tray_pre,
|
|
"tray_tar": self.tray_tar,
|
|
"ams_status": self.ams_status,
|
|
"ams_rfid_status": self.ams_rfid_status,
|
|
"tray_is_bbl_bits": self.tray_is_bbl_bits,
|
|
"tray_read_done_bits": self.tray_read_done_bits,
|
|
"version": self.version,
|
|
"insert_flag": self.insert_flag,
|
|
"power_on_flag": self.power_on_flag,
|
|
"units": [u.to_dict() for u in self.units],
|
|
}
|
|
|
|
@property
|
|
def total_trays(self) -> int:
|
|
"""Total number of trays across all units"""
|
|
return sum(len(u.trays) for u in self.units)
|
|
|
|
@property
|
|
def loaded_trays(self) -> List[FilamentTray]:
|
|
"""Get all trays that have filament loaded"""
|
|
loaded = []
|
|
for unit in self.units:
|
|
for tray in unit.trays:
|
|
if tray.tray_type:
|
|
loaded.append(tray)
|
|
return loaded
|
|
|
|
|
|
@dataclass
|
|
class PrinterState:
|
|
"""Complete printer state parsed from MQTT data"""
|
|
timestamp: str = ""
|
|
sequence_id: str = ""
|
|
|
|
# Temperature info
|
|
nozzle_temp: float = 0.0
|
|
nozzle_target_temp: float = 0.0
|
|
bed_temp: float = 0.0
|
|
bed_target_temp: float = 0.0
|
|
chamber_temp: float = 0.0
|
|
|
|
# Print progress
|
|
gcode_state: str = ""
|
|
print_percent: int = 0
|
|
remaining_time_min: int = 0
|
|
layer_num: int = 0
|
|
total_layer_num: int = 0
|
|
print_line_number: int = 0
|
|
|
|
# Current job info
|
|
gcode_file: str = ""
|
|
subtask_name: str = ""
|
|
subtask_id: str = ""
|
|
task_id: str = ""
|
|
project_id: str = ""
|
|
profile_id: str = ""
|
|
print_type: str = ""
|
|
|
|
# Fan speeds
|
|
fan_gear: int = 0
|
|
cooling_fan_speed: int = 0
|
|
heatbreak_fan_speed: int = 0
|
|
|
|
# WiFi / Network
|
|
wifi_signal: str = ""
|
|
wifi_signal_dbm: int = 0
|
|
|
|
# Nozzle info
|
|
nozzle_diameter: float = 0.4
|
|
nozzle_type: str = ""
|
|
|
|
# System status
|
|
home_flag: int = 0
|
|
hw_switch_state: int = 0
|
|
mc_print_stage: str = ""
|
|
mc_print_sub_stage: int = 0
|
|
print_error: int = 0
|
|
stg_cur: int = 0
|
|
|
|
# AMS state
|
|
ams: Optional[AMSState] = None
|
|
|
|
# Upgrade state
|
|
upgrade_state: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
# Version info
|
|
version: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
# Camera / Timelapse
|
|
ipcam: Dict[str, Any] = field(default_factory=dict)
|
|
timelapse: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
# Lights
|
|
lights_report: List[Dict[str, Any]] = field(default_factory=list)
|
|
|
|
# HMS (Health Management System) messages
|
|
hms: List[Dict[str, Any]] = field(default_factory=list)
|
|
|
|
# Speed settings
|
|
spd_lvl: int = 0
|
|
spd_mag: int = 0
|
|
|
|
# Auxiliary fans
|
|
big_fan1_speed: int = 0
|
|
big_fan2_speed: int = 0
|
|
|
|
# System info
|
|
sdcard: bool = False
|
|
gcode_file_prepare_percent: str = ""
|
|
lifecycle: str = ""
|
|
|
|
# External spool (virtual tray)
|
|
vt_tray: Optional[Dict[str, Any]] = None
|
|
|
|
# Raw data for any additional fields
|
|
_raw_data: Dict[str, Any] = field(default_factory=dict, repr=False)
|
|
|
|
@staticmethod
|
|
def _parse_wifi_signal(signal_str: str) -> int:
|
|
"""Parse WiFi signal string (e.g., '-34dBm') to integer dBm"""
|
|
if not signal_str:
|
|
return 0
|
|
try:
|
|
return int(signal_str.replace("dBm", ""))
|
|
except (ValueError, AttributeError):
|
|
return 0
|
|
|
|
@classmethod
|
|
def from_mqtt_data(cls, data: Dict[str, Any], timestamp: Optional[str] = None) -> "PrinterState":
|
|
"""Create PrinterState from MQTT push_status data."""
|
|
if timestamp is None:
|
|
timestamp = datetime.now(ZoneInfo(app_settings.TIMEZONE)).isoformat()
|
|
|
|
print_data = data.get("print", {})
|
|
|
|
# Parse AMS data if present
|
|
ams = None
|
|
if "ams" in print_data:
|
|
ams = AMSState.from_dict(print_data["ams"])
|
|
|
|
wifi_signal = print_data.get("wifi_signal", "")
|
|
|
|
return cls(
|
|
timestamp=timestamp,
|
|
sequence_id=str(print_data.get("sequence_id", "")),
|
|
nozzle_temp=float(print_data.get("nozzle_temper", 0.0)),
|
|
nozzle_target_temp=float(print_data.get("nozzle_target_temper", 0.0)),
|
|
bed_temp=float(print_data.get("bed_temper", 0.0)),
|
|
bed_target_temp=float(print_data.get("bed_target_temper", 0.0)),
|
|
chamber_temp=float(print_data.get("chamber_temper", 0.0)),
|
|
gcode_state=print_data.get("gcode_state", ""),
|
|
print_percent=int(print_data.get("mc_percent", 0)),
|
|
remaining_time_min=int(print_data.get("mc_remaining_time", 0)),
|
|
layer_num=int(print_data.get("layer_num", 0)),
|
|
total_layer_num=int(print_data.get("total_layer_num", 0)),
|
|
print_line_number=int(print_data.get("mc_print_line_number", 0)),
|
|
gcode_file=print_data.get("gcode_file", ""),
|
|
subtask_name=print_data.get("subtask_name", ""),
|
|
subtask_id=print_data.get("subtask_id", ""),
|
|
task_id=print_data.get("task_id", ""),
|
|
project_id=print_data.get("project_id", ""),
|
|
profile_id=print_data.get("profile_id", ""),
|
|
print_type=print_data.get("print_type", ""),
|
|
fan_gear=int(print_data.get("fan_gear", 0)),
|
|
cooling_fan_speed=int(print_data.get("cooling_fan_speed", 0)),
|
|
heatbreak_fan_speed=int(print_data.get("heatbreak_fan_speed", 0)),
|
|
wifi_signal=wifi_signal,
|
|
wifi_signal_dbm=cls._parse_wifi_signal(wifi_signal),
|
|
nozzle_diameter=float(print_data.get("nozzle_diameter", 0.4)),
|
|
nozzle_type=print_data.get("nozzle_type", ""),
|
|
home_flag=int(print_data.get("home_flag", 0)),
|
|
hw_switch_state=int(print_data.get("hw_switch_state", 0)),
|
|
mc_print_stage=str(print_data.get("mc_print_stage", "")),
|
|
mc_print_sub_stage=int(print_data.get("mc_print_sub_stage", 0)),
|
|
print_error=int(print_data.get("print_error", 0)),
|
|
stg_cur=int(print_data.get("stg_cur", 0)),
|
|
ams=ams,
|
|
upgrade_state=print_data.get("upgrade_state", {}),
|
|
version=print_data.get("version", {}),
|
|
ipcam=print_data.get("ipcam", {}),
|
|
timelapse=print_data.get("timelapse", {}),
|
|
lights_report=print_data.get("lights_report", []),
|
|
hms=print_data.get("hms", []),
|
|
spd_lvl=int(print_data.get("spd_lvl", 0)),
|
|
spd_mag=int(print_data.get("spd_mag", 0)),
|
|
big_fan1_speed=int(print_data.get("big_fan1_speed", 0)),
|
|
big_fan2_speed=int(print_data.get("big_fan2_speed", 0)),
|
|
sdcard=bool(print_data.get("sdcard", False)),
|
|
gcode_file_prepare_percent=str(print_data.get("gcode_file_prepare_percent", "")),
|
|
lifecycle=print_data.get("lifecycle", ""),
|
|
vt_tray=print_data.get("vt_tray"),
|
|
_raw_data=data,
|
|
)
|
|
|
|
def get_snapshot(self) -> Dict[str, Any]:
|
|
"""Get a simplified snapshot for database logging."""
|
|
snapshot = {
|
|
"timestamp": self.timestamp,
|
|
"nozzle_temp": round(self.nozzle_temp, 2),
|
|
"nozzle_target_temp": round(self.nozzle_target_temp, 2),
|
|
"bed_temp": round(self.bed_temp, 2),
|
|
"bed_target_temp": round(self.bed_target_temp, 2),
|
|
"chamber_temp": round(self.chamber_temp, 2),
|
|
"nozzle_diameter": self.nozzle_diameter,
|
|
"nozzle_type": self.nozzle_type,
|
|
"gcode_state": self.gcode_state,
|
|
"print_type": self.print_type,
|
|
"print_percent": self.print_percent,
|
|
"remaining_time_min": self.remaining_time_min,
|
|
"layer_num": self.layer_num,
|
|
"total_layer_num": self.total_layer_num,
|
|
"print_line_number": self.print_line_number,
|
|
"subtask_name": self.subtask_name,
|
|
"gcode_file": self.gcode_file,
|
|
"cooling_fan_speed": self.cooling_fan_speed,
|
|
"heatbreak_fan_speed": self.heatbreak_fan_speed,
|
|
"big_fan1_speed": self.big_fan1_speed,
|
|
"big_fan2_speed": self.big_fan2_speed,
|
|
"spd_lvl": self.spd_lvl,
|
|
"spd_mag": self.spd_mag,
|
|
"wifi_signal_dbm": self.wifi_signal_dbm,
|
|
"print_error": self.print_error,
|
|
"has_errors": self.print_error != 0,
|
|
"hms": self.hms,
|
|
"stg_cur": self.stg_cur,
|
|
"lights_report": self.lights_report,
|
|
"chamber_light": self._get_chamber_light_status(),
|
|
"ipcam_record": self.ipcam.get("ipcam_record", ""),
|
|
"timelapse": self.ipcam.get("timelapse", ""),
|
|
"sdcard": self.sdcard,
|
|
"gcode_file_prepare_percent": self.gcode_file_prepare_percent,
|
|
"lifecycle": self.lifecycle,
|
|
}
|
|
|
|
if self.ams:
|
|
snapshot["ams_unit_count"] = len(self.ams.units)
|
|
snapshot["ams_status"] = self.ams.ams_status
|
|
snapshot["ams_rfid_status"] = self.ams.ams_rfid_status
|
|
snapshot["ams_exist_bits"] = self.ams.ams_exist_bits
|
|
snapshot["tray_exist_bits"] = self.ams.tray_exist_bits
|
|
snapshot["tray_is_bbl_bits"] = self.ams.tray_is_bbl_bits
|
|
snapshot["tray_read_done_bits"] = self.ams.tray_read_done_bits
|
|
snapshot["tray_now"] = self.ams.tray_now
|
|
snapshot["ams_version"] = self.ams.version
|
|
|
|
filaments = []
|
|
for unit in self.ams.units:
|
|
for tray in unit.trays:
|
|
if tray.tray_type:
|
|
filaments.append({
|
|
"tray_id": tray.tray_id,
|
|
"slot": tray.tray_id_name,
|
|
"type": tray.tray_type,
|
|
"sub_type": tray.tray_sub_brands,
|
|
"color": tray.tray_color,
|
|
"remain_percent": tray.remain_percent,
|
|
"tray_weight": tray.tray_weight,
|
|
"tray_diameter": tray.tray_diameter,
|
|
"nozzle_temp_min": tray.nozzle_temp_min,
|
|
"nozzle_temp_max": tray.nozzle_temp_max,
|
|
"tag_uid": tray.tag_uid,
|
|
"state": tray.state,
|
|
"tray_uuid": tray.tray_uuid,
|
|
"k": tray.k,
|
|
"n": tray.n,
|
|
"cali_idx": tray.cali_idx,
|
|
"total_len": tray.total_len,
|
|
"tray_info_idx": tray.tray_info_idx,
|
|
"tray_time": tray.tray_time,
|
|
"tray_bed_temp": tray.tray_bed_temp,
|
|
"bed_temp_type": tray.bed_temp_type,
|
|
"cols": tray.cols,
|
|
})
|
|
snapshot["filaments"] = filaments
|
|
|
|
ams_units = []
|
|
for unit in self.ams.units:
|
|
ams_units.append({
|
|
"unit_id": unit.unit_id,
|
|
"ams_id": unit.ams_id,
|
|
"chip_id": unit.chip_id,
|
|
"info": unit.info,
|
|
"humidity": unit.humidity,
|
|
"humidity_raw": unit.humidity_raw,
|
|
"temp": unit.temp,
|
|
"dry_time": unit.dry_time,
|
|
})
|
|
snapshot["ams_units"] = ams_units
|
|
|
|
if self.ams.units:
|
|
snapshot["ams_humidity"] = self.ams.units[0].humidity
|
|
snapshot["ams_humidity_raw"] = self.ams.units[0].humidity_raw
|
|
snapshot["ams_temp"] = self.ams.units[0].temp
|
|
|
|
if self.vt_tray:
|
|
snapshot["external_spool"] = {
|
|
"type": self.vt_tray.get("tray_type", ""),
|
|
"color": self.vt_tray.get("tray_color", ""),
|
|
"remain": self.vt_tray.get("remain", 0),
|
|
}
|
|
|
|
return snapshot
|
|
|
|
def _get_chamber_light_status(self) -> str:
|
|
"""Extract chamber light status from lights_report"""
|
|
for light in self.lights_report:
|
|
if light.get("node") == "chamber_light":
|
|
return light.get("mode", "unknown")
|
|
return "unknown"
|
|
|
|
@property
|
|
def is_printing(self) -> bool:
|
|
return self.gcode_state.upper() in ("RUNNING", "PRINTING")
|
|
|
|
@property
|
|
def is_idle(self) -> bool:
|
|
return self.gcode_state.upper() in ("IDLE", "FINISH", "")
|
|
|
|
@property
|
|
def is_paused(self) -> bool:
|
|
return self.gcode_state.upper() == "PAUSE"
|
|
|
|
|
|
class PrinterStateAccumulator:
|
|
"""
|
|
Accumulates MQTT updates into a complete printer state.
|
|
|
|
BambuLab MQTT sends incremental updates - each message may only contain
|
|
a subset of fields that have changed. This class maintains the complete
|
|
state by merging updates.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self._state_data: Dict[str, Any] = {"print": {}}
|
|
self._last_update: Optional[str] = None
|
|
self._update_count: int = 0
|
|
|
|
def update(self, data: Dict[str, Any]) -> PrinterState:
|
|
"""Merge new MQTT data into accumulated state and return complete PrinterState."""
|
|
timestamp = datetime.now(ZoneInfo(app_settings.TIMEZONE)).isoformat()
|
|
self._last_update = timestamp
|
|
self._update_count += 1
|
|
|
|
if "print" in data:
|
|
self._deep_merge(self._state_data["print"], data["print"])
|
|
|
|
return PrinterState.from_mqtt_data(self._state_data, timestamp)
|
|
|
|
def _deep_merge(self, base: Dict, update: Dict) -> None:
|
|
"""Recursively merge update into base dict"""
|
|
for key, value in update.items():
|
|
if key in base and isinstance(base[key], dict) and isinstance(value, dict):
|
|
self._deep_merge(base[key], value)
|
|
else:
|
|
base[key] = value
|
|
|
|
def get_state(self) -> PrinterState:
|
|
"""Get current accumulated state without updating"""
|
|
timestamp = self._last_update or datetime.now(ZoneInfo(app_settings.TIMEZONE)).isoformat()
|
|
return PrinterState.from_mqtt_data(self._state_data, timestamp)
|
|
|
|
def reset(self) -> None:
|
|
"""Reset accumulated state"""
|
|
self._state_data = {"print": {}}
|
|
self._last_update = None
|
|
self._update_count = 0
|
|
|
|
@property
|
|
def update_count(self) -> int:
|
|
return self._update_count
|
|
|
|
@property
|
|
def last_update(self) -> Optional[str]:
|
|
return self._last_update
|
|
|
|
|
|
class BambuPrinter:
|
|
"""
|
|
High-level interface for BambuLab printer monitoring.
|
|
Combines authentication, client, and MQTT into a single interface.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
username: Optional[str] = None,
|
|
password: Optional[str] = None,
|
|
token: Optional[str] = None,
|
|
device_id: Optional[str] = None,
|
|
on_update: Optional[Callable[[PrinterState], None]] = None,
|
|
silent: bool = True,
|
|
verification_timeout: int = 300,
|
|
):
|
|
self.username = username or os.getenv("BAMBU_USERNAME")
|
|
self.password = password or os.getenv("BAMBU_PASSWORD")
|
|
self._token = token or os.getenv("BAMBU_TOKEN")
|
|
self._device_id = device_id or os.getenv("BAMBU_DEVICE_ID")
|
|
self._uid: Optional[str] = None
|
|
self._on_update = on_update
|
|
self._silent = silent
|
|
self._verification_timeout = verification_timeout
|
|
|
|
self._client: Optional[BambuClient] = None
|
|
self._mqtt: Optional[MQTTClient] = None
|
|
self._accumulator = PrinterStateAccumulator()
|
|
self._connected = False
|
|
self._devices: List[Dict[str, Any]] = []
|
|
|
|
def _get_fresh_token(self, verification_code_timeout: int = 300) -> str:
|
|
"""Get a fresh token using credentials."""
|
|
if not self.username or not self.password:
|
|
raise ValueError(
|
|
"Username and password required for token refresh. Provide as arguments "
|
|
"or set BAMBU_USERNAME and BAMBU_PASSWORD environment variables."
|
|
)
|
|
|
|
print("\n" + "=" * 60)
|
|
print("BambuLab Authentication")
|
|
print("=" * 60)
|
|
print(f"Authenticating as: {self.username}")
|
|
print()
|
|
print(">>> ACTION MAY BE REQUIRED <<<")
|
|
print("Bambu Lab will send a 6-digit verification code to your")
|
|
print("registered email. Watch this terminal — if a prompt")
|
|
print(f"appears below, enter the code and press Enter.")
|
|
print(f"(You have {verification_code_timeout} seconds to respond.)")
|
|
print("=" * 60)
|
|
print()
|
|
|
|
auth = BambuAuthenticator()
|
|
|
|
try:
|
|
# Always show stdout during auth — suppress_stdout would hide
|
|
# interactive prompts from the library (e.g. verification code input).
|
|
token = auth.get_or_create_token(
|
|
username=self.username,
|
|
password=self.password
|
|
)
|
|
|
|
self._token = token
|
|
print("Authentication successful!")
|
|
print(f"Token: {token}")
|
|
print("=" * 60 + "\n")
|
|
logger.info("BambuLab token obtained successfully")
|
|
return token
|
|
|
|
except Exception as e:
|
|
error_msg = str(e).lower()
|
|
|
|
if "verification" in error_msg or "code" in error_msg or "2fa" in error_msg:
|
|
print("\n" + "-" * 60)
|
|
print("EMAIL VERIFICATION REQUIRED")
|
|
print("-" * 60)
|
|
print("A verification code has been sent to your email.")
|
|
print(f"You have {verification_code_timeout} seconds to enter it.")
|
|
print()
|
|
|
|
try:
|
|
code = timed_input(
|
|
"Enter verification code: ",
|
|
timeout_sec=verification_code_timeout
|
|
)
|
|
|
|
if not code:
|
|
raise ValueError("No verification code entered")
|
|
|
|
print("Verifying code...")
|
|
token = auth.login(
|
|
self.username,
|
|
self.password,
|
|
verification_code=code
|
|
)
|
|
|
|
self._token = token
|
|
print("\nAuthentication successful!")
|
|
print(f"Token: {token[:20]}...{token[-10:]}")
|
|
print("=" * 60 + "\n")
|
|
print("TIP: Save this token to BAMBU_TOKEN env var to skip login next time")
|
|
logger.info("BambuLab token obtained with 2FA verification")
|
|
return token
|
|
|
|
except TimeoutError:
|
|
print("\nVerification timed out!")
|
|
raise TimeoutError(
|
|
f"Verification code not entered within {verification_code_timeout} seconds"
|
|
)
|
|
else:
|
|
print(f"\nAuthentication failed: {e}")
|
|
raise
|
|
|
|
def _ensure_token(self) -> str:
|
|
"""Ensure we have a valid token, refreshing if needed"""
|
|
if self._token:
|
|
logger.debug("Using existing token")
|
|
return self._token
|
|
|
|
print("\n" + "!" * 60)
|
|
print("NO TOKEN FOUND")
|
|
print("!" * 60)
|
|
print("Checked:")
|
|
print(" - Constructor 'token' parameter: Not provided")
|
|
print(" - Environment variable 'BAMBU_TOKEN': Not set")
|
|
print()
|
|
print("Will attempt to authenticate with username/password...")
|
|
print("!" * 60 + "\n")
|
|
|
|
return self._get_fresh_token(verification_code_timeout=self._verification_timeout)
|
|
|
|
def _on_mqtt_message(self, device_id: str, data: Dict[str, Any]) -> None:
|
|
"""Internal MQTT message handler"""
|
|
if not data:
|
|
return
|
|
state = self._accumulator.update(data)
|
|
if self._on_update:
|
|
self._on_update(state)
|
|
|
|
def connect(self, blocking: bool = False, retry_on_auth_error: bool = True) -> None:
|
|
"""Connect to printer via MQTT."""
|
|
token = self._ensure_token()
|
|
|
|
try:
|
|
self._client = BambuClient(token=token)
|
|
user_info = self._client.get_user_info()
|
|
self._uid = str(user_info.get("uid", ""))
|
|
|
|
if not self._device_id:
|
|
self._devices = self._client.get_devices()
|
|
if not self._devices:
|
|
raise RuntimeError("No devices found on this account")
|
|
self._device_id = self._devices[0].get("dev_id")
|
|
|
|
self._mqtt = MQTTClient(
|
|
self._uid,
|
|
token,
|
|
self._device_id,
|
|
on_message=self._on_mqtt_message
|
|
)
|
|
self._mqtt.connect(blocking=blocking)
|
|
self._connected = True
|
|
logger.info(f"Connected to BambuLab printer: {self._device_id}")
|
|
|
|
except Exception as e:
|
|
error_msg = str(e).lower()
|
|
is_auth_error = any(x in error_msg for x in ["401", "unauthorized", "token", "auth", "expired"])
|
|
|
|
if is_auth_error and retry_on_auth_error and self.username and self.password:
|
|
logger.warning("Auth error detected, refreshing token and retrying...")
|
|
self._token = None
|
|
self._get_fresh_token()
|
|
self.connect(blocking=blocking, retry_on_auth_error=False)
|
|
else:
|
|
raise
|
|
|
|
def reconnect(self, blocking: bool = False) -> None:
|
|
"""Disconnect and reconnect."""
|
|
self.disconnect()
|
|
self._accumulator.reset()
|
|
self.connect(blocking=blocking)
|
|
|
|
def disconnect(self) -> None:
|
|
"""Disconnect from MQTT"""
|
|
if self._mqtt:
|
|
try:
|
|
self._mqtt.disconnect()
|
|
except Exception:
|
|
pass
|
|
self._connected = False
|
|
logger.debug("Disconnected from BambuLab printer")
|
|
|
|
def get_state(self) -> PrinterState:
|
|
"""Get current accumulated printer state"""
|
|
return self._accumulator.get_state()
|
|
|
|
def get_snapshot(self) -> Dict[str, Any]:
|
|
"""Get simplified snapshot for database logging"""
|
|
return self._accumulator.get_state().get_snapshot()
|
|
|
|
@property
|
|
def device_id(self) -> Optional[str]:
|
|
return self._device_id
|
|
|
|
@property
|
|
def devices(self) -> List[Dict[str, Any]]:
|
|
return self._devices
|
|
|
|
@property
|
|
def is_connected(self) -> bool:
|
|
return self._connected
|
|
|
|
def __enter__(self):
|
|
self.connect(blocking=False)
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
self.disconnect()
|
|
|
|
|
|
__all__ = [
|
|
"BambuAuthenticator",
|
|
"BambuClient",
|
|
"MQTTClient",
|
|
"FilamentTray",
|
|
"AMSUnit",
|
|
"AMSState",
|
|
"PrinterState",
|
|
"PrinterStateAccumulator",
|
|
"BambuPrinter",
|
|
]
|