diff --git a/bambu_run/admin.py b/bambu_run/admin.py
index 03d7989..4873f13 100644
--- a/bambu_run/admin.py
+++ b/bambu_run/admin.py
@@ -1,5 +1,5 @@
from django.contrib import admin
-from .models import Printer, PrinterMetrics, Filament, FilamentType, FilamentSnapshot, PrintJob, FilamentUsage, BambuCloudTask
+from .models import Printer, PrinterMetrics, Filament, FilamentType, FilamentSnapshot, PrintJob, FilamentUsage, BambuCloudTask, Hotend, HotendSnapshot
@admin.register(Printer)
@@ -107,6 +107,21 @@ class FilamentUsageAdmin(admin.ModelAdmin):
readonly_fields = ('consumed_percent', 'consumed_grams')
+@admin.register(Hotend)
+class HotendAdmin(admin.ModelAdmin):
+ list_display = ('printer', 'serial_number', 'nozzle_type', 'is_toolhead', 'slot_number', 'used_time_seconds', 'wear_percent', 'last_seen_at')
+ list_filter = ('printer', 'is_toolhead', 'nozzle_type')
+ search_fields = ('serial_number',)
+ readonly_fields = ('last_seen_at', 'created_at')
+
+
+@admin.register(HotendSnapshot)
+class HotendSnapshotAdmin(admin.ModelAdmin):
+ list_display = ('printer_metric', 'hotend', 'raw_id', 'used_time_seconds', 'wear_percent', 'timestamp')
+ list_filter = ('hotend',)
+ readonly_fields = ('printer_metric', 'hotend', 'raw_id', 'used_time_seconds', 'wear_percent', 'stat', 'timestamp')
+
+
@admin.register(BambuCloudTask)
class BambuCloudTaskAdmin(admin.ModelAdmin):
list_display = ('task_id', 'design_title', 'plate_title', 'device_serial', 'cloud_status', 'weight_grams', 'cloud_start_time', 'synced_at')
diff --git a/bambu_run/diagnostics.py b/bambu_run/diagnostics.py
new file mode 100644
index 0000000..266c2c7
--- /dev/null
+++ b/bambu_run/diagnostics.py
@@ -0,0 +1,78 @@
+"""
+Pure helpers for the `bambu_diagnose` management command.
+
+Kept separate from the command itself (and free of Django/network imports)
+so the report-building and redaction logic can be unit-tested without
+talking to the real Bambu Lab cloud or MQTT broker.
+"""
+
+from datetime import datetime, timezone
+from typing import Any, Dict, List, Optional
+
+# Keys whose values are always replaced outright, regardless of nesting depth.
+_SECRET_KEY_SUBSTRINGS = ("password", "token", "secret", "access_code", "authorization")
+
+# Keys that identify a specific physical device/spool/account — not secret,
+# but identifying, so they're partially masked by default before anything
+# gets pasted into a public GitHub issue.
+_IDENTIFIER_KEYS = {"dev_id", "device_id", "serial_number", "tray_uuid", "tag_uid", "uid"}
+
+
+def _mask_identifier(value: Any) -> Any:
+ if not isinstance(value, str) or len(value) <= 8:
+ return "***"
+ return f"{value[:4]}...{value[-4:]}"
+
+
+def redact_diagnostics(data: Any, redact: bool = True) -> Any:
+ """Recursively redact secrets and mask identifiers in a diagnostics payload.
+
+ `redact=False` returns the data unchanged — only for the reporter's own
+ local debugging, never for anything posted publicly.
+ """
+ if not redact:
+ return data
+ return _redact(data)
+
+
+def _redact(obj: Any) -> Any:
+ if isinstance(obj, dict):
+ result = {}
+ for key, value in obj.items():
+ lowered = key.lower()
+ if any(secret in lowered for secret in _SECRET_KEY_SUBSTRINGS):
+ result[key] = "***REDACTED***"
+ elif lowered in _IDENTIFIER_KEYS:
+ result[key] = _mask_identifier(value)
+ else:
+ result[key] = _redact(value)
+ return result
+ if isinstance(obj, list):
+ return [_redact(item) for item in obj]
+ return obj
+
+
+def build_diagnostics_report(
+ devices: List[Dict[str, Any]],
+ raw_payloads: Dict[str, Optional[Dict[str, Any]]],
+) -> Dict[str, Any]:
+ """Assemble the (pre-redaction) diagnostics report from discovered devices
+ and whatever raw MQTT payload was captured for each during the listen window.
+ """
+ device_entries = []
+ for device in devices:
+ dev_id = device.get("dev_id")
+ payload = raw_payloads.get(dev_id)
+ entry = {
+ "device_info": device,
+ "raw_mqtt_payload": payload,
+ }
+ if payload is None:
+ entry["note"] = "No MQTT data received within the listen window."
+ device_entries.append(entry)
+
+ return {
+ "generated_at": datetime.now(timezone.utc).isoformat(),
+ "device_count": len(devices),
+ "devices": device_entries,
+ }
diff --git a/bambu_run/management/commands/bambu_collector.py b/bambu_run/management/commands/bambu_collector.py
index 1b7f462..ce1bcdd 100644
--- a/bambu_run/management/commands/bambu_collector.py
+++ b/bambu_run/management/commands/bambu_collector.py
@@ -13,8 +13,9 @@ import logging
import os
import ssl
import time
+from dataclasses import dataclass, field
from decimal import Decimal
-from typing import Optional
+from typing import Any, Dict, Optional
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction
@@ -26,6 +27,56 @@ from bambu_run.models import Printer, PrinterMetrics
logger = logging.getLogger("bambu_run.collector")
+def resolve_printer_device(device_id: str, device_info: Optional[dict] = None) -> Printer:
+ """Find-or-create the Printer row for a Bambu cloud device, keyed by serial number.
+
+ `device_info` is one entry from BambuClient.get_devices() (keys: name,
+ dev_product_name, dev_id, ...). Falls back to generic defaults when unavailable
+ (e.g. local-only connections that never call get_devices()).
+ """
+ device_info = device_info or {}
+ name = device_info.get("name") or "Bambu Lab Printer"
+ model = device_info.get("dev_product_name") or "Bambu Lab"
+
+ printer = Printer.objects.filter(serial_number=device_id).first()
+
+ if printer is None:
+ # Upgrade path: a pre-multi-printer deployment has exactly one Printer row
+ # with no serial number yet. Backfill it instead of creating a duplicate.
+ # If there's more than one such row, we can't tell which one this device
+ # used to be, so don't guess — create a fresh row instead.
+ legacy_candidates = list(Printer.objects.filter(serial_number__isnull=True)[:2])
+ if len(legacy_candidates) == 1:
+ printer = legacy_candidates[0]
+ printer.serial_number = device_id
+
+ if printer is None:
+ printer = Printer(serial_number=device_id)
+
+ printer.name = name
+ printer.model = model
+ printer.manufacturer = "Bambu Lab"
+ printer.is_active = True
+ printer.save()
+ return printer
+
+
+@dataclass
+class DeviceSession:
+ """Per-printer mutable state for one bound device in a multi-printer collector run."""
+
+ device_id: str
+ client: Any # BambuPrinter
+ printer: Printer
+ current_print_job: Optional[Any] = None
+ last_gcode_state: Optional[str] = None
+ last_subtask_name: Optional[str] = None
+ trays_used: set = field(default_factory=set)
+ error_count: int = 0
+ success_count: int = 0
+ mqtt_connect_errors: int = 0
+
+
class Command(BaseCommand):
"""
MQTT Poll -> PrinterMetrics -> FilamentSnapshot -> Auto-Match -> Update Filament
@@ -51,18 +102,11 @@ class Command(BaseCommand):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
- self.printer_client = None
- self.printer_device = None
+ self.sessions: Dict[str, DeviceSession] = {}
+ self._token: Optional[str] = None
self.verbose = False
self.disable_ssl_verify = False
- self.error_count = 0
- self.success_count = 0
- self.mqtt_connect_errors = 0
self.start_time = None
- self.current_print_job = None
- self.last_gcode_state = None
- self.last_subtask_name = None
- self.trays_used = set()
def handle(self, *args, **options):
self.verbose = options["verbose"]
@@ -100,12 +144,13 @@ class Command(BaseCommand):
self._configure_logging()
try:
- self._initialize_printer()
+ self._initialize_printers()
except Exception as e:
raise CommandError(f"Initialization failed: {e}")
self.start_time = timezone.now()
- logger.info(f"Bambu Run data collector started for printer: {self.printer_device.name}")
+ printer_names = ", ".join(s.printer.name for s in self.sessions.values())
+ logger.info(f"Bambu Run data collector started for {len(self.sessions)} printer(s): {printer_names}")
logger.info(f"Collection interval: {interval} seconds")
logger.info(f"Mode: {'Single run' if run_once else 'Continuous'}")
@@ -113,7 +158,8 @@ class Command(BaseCommand):
if run_once:
import time as _time
_time.sleep(5)
- self._collect_printer_data()
+ for session in self.sessions.values():
+ self._collect_printer_data(session)
logger.info("Single collection completed successfully")
else:
self._run_continuous_loop(interval)
@@ -124,7 +170,7 @@ class Command(BaseCommand):
logger.exception(f"Fatal error in main loop: {e}")
raise CommandError(f"Runner failed: {e}")
- def _request_full_status_when_ready(self, timeout: float = 20.0) -> None:
+ def _request_full_status_when_ready(self, client, timeout: float = 20.0) -> None:
"""Send pushall once the MQTT broker connection is confirmed.
BambuPrinter._connected is set True immediately after connect(blocking=False),
@@ -134,9 +180,9 @@ class Command(BaseCommand):
import time as _time
deadline = _time.time() + timeout
while _time.time() < deadline:
- mqtt_client = getattr(self.printer_client, "_mqtt", None)
+ mqtt_client = getattr(client, "_mqtt", None)
if mqtt_client is not None and getattr(mqtt_client, "connected", False):
- self.printer_client._mqtt.request_full_status()
+ client._mqtt.request_full_status()
logger.info("Sent MQTT pushall request")
return
_time.sleep(0.5)
@@ -155,7 +201,9 @@ class Command(BaseCommand):
handler.setFormatter(formatter)
logger.addHandler(handler)
- def _initialize_printer(self):
+ def _initialize_printers(self):
+ """Authenticate once, discover every device bound to the account, and open
+ one BambuPrinter (own MQTT thread) per device — all in this single process."""
from bambu_run.mqtt_client import BambuPrinter
bambu_username = os.environ.get("BAMBU_USERNAME")
@@ -169,30 +217,12 @@ class Command(BaseCommand):
"environment variables must be set"
)
- logger.info("Connecting to Bambu Lab printer...")
+ logger.info("Authenticating with Bambu Lab cloud...")
try:
- if bambu_token:
- logger.info("Using saved BAMBU_TOKEN for authentication")
- self.printer_client = BambuPrinter(
- token=bambu_token, device_id=bambu_device_id
- )
- else:
- logger.info("Authenticating with username/password")
- self.printer_client = BambuPrinter(
- username=bambu_username,
- password=bambu_password,
- device_id=bambu_device_id,
- )
-
- logger.info("Initiating MQTT connection...")
- self.printer_client.connect(blocking=False)
- logger.info("MQTT connection initiated (non-blocking)")
- # Request full status so AMS + dual-nozzle data arrive on startup.
- try:
- self._request_full_status_when_ready()
- except Exception as e:
- logger.warning("pushall request skipped (non-fatal): %s", e)
-
+ auth = BambuPrinter(
+ username=bambu_username, password=bambu_password, token=bambu_token,
+ )
+ self._token = auth._ensure_token()
except Exception as e:
if "CERTIFICATE_VERIFY_FAILED" in str(e) or "SSL" in str(e):
error_msg = (
@@ -203,56 +233,62 @@ class Command(BaseCommand):
"3. pip install --upgrade certifi\n"
)
raise CommandError(error_msg)
- raise CommandError(f"Failed to initialize printer client: {e}")
+ raise CommandError(f"Failed to authenticate: {e}")
- self.printer_device = self._ensure_printer_device_exists()
- logger.info(f"Initialized for printer device: {self.printer_device}")
-
- def _ensure_printer_device_exists(self) -> Printer:
- try:
- snapshot = self.printer_client.get_snapshot()
-
- if snapshot:
- device, created = Printer.objects.update_or_create(
- model="Bambu Lab",
- defaults={
- "name": "Bambu Lab Printer",
- "manufacturer": "Bambu Lab",
- "is_active": True,
- },
- )
- action = "Created" if created else "Updated"
- logger.info(f"{action} printer device record: {device}")
- return device
- else:
- logger.warning("Snapshot returned None - MQTT not connected yet")
- device = Printer.objects.filter(is_active=True).first()
- if device:
- logger.info(f"Using existing device record: {device}")
- return device
- else:
- device = Printer.objects.create(
- name="Bambu Lab Printer",
- model="Bambu Lab",
- manufacturer="Bambu Lab",
- is_active=True,
- )
- logger.info(f"Created placeholder device: {device}")
- return device
-
- except Exception as e:
- logger.error(f"Error during device initialization: {e}")
+ device_infos = self._discover_devices(bambu_device_id)
+ for device_id, device_info in device_infos.items():
try:
- device = Printer.objects.filter(is_active=True).first()
- if device:
- logger.warning(f"Using existing device record from DB: {device}")
- return device
- else:
- raise CommandError(
- "No printer device found in database and initialization failed."
- )
- except Printer.DoesNotExist:
- raise CommandError("Failed to create or retrieve printer device.")
+ self._add_session(device_id, device_info)
+ except Exception as e:
+ logger.error(f"Failed to initialize printer {device_id}: {e}")
+
+ if not self.sessions:
+ raise CommandError("No printer sessions could be initialized")
+
+ def _discover_devices(self, explicit_device_id: Optional[str]) -> Dict[str, dict]:
+ """Return {device_id: device_info} for every printer to monitor.
+
+ device_info comes from BambuClient.get_devices() (name, dev_product_name,
+ etc.) — empty dict when explicitly pinned to one device via BAMBU_DEVICE_ID
+ and the cloud listing can't be reached.
+ """
+ from bambu_run.mqtt_client import BambuClient
+
+ try:
+ cloud = BambuClient(token=self._token)
+ devices = cloud.get_devices()
+ except Exception as e:
+ if explicit_device_id:
+ logger.warning(f"Could not list account devices ({e}); using BAMBU_DEVICE_ID only")
+ return {explicit_device_id: {}}
+ raise
+
+ device_infos = {d.get("dev_id"): d for d in devices if d.get("dev_id")}
+
+ if explicit_device_id:
+ return {explicit_device_id: device_infos.get(explicit_device_id, {})}
+
+ if not device_infos:
+ raise CommandError("No devices found on this account")
+
+ return device_infos
+
+ def _add_session(self, device_id: str, device_info: dict) -> "DeviceSession":
+ from bambu_run.mqtt_client import BambuPrinter
+
+ logger.info(f"Connecting to printer {device_id} ({device_info.get('name', 'unknown')})...")
+ client = BambuPrinter(token=self._token, device_id=device_id)
+ client.connect(blocking=False)
+ try:
+ self._request_full_status_when_ready(client)
+ except Exception as e:
+ logger.warning("pushall request skipped (non-fatal): %s", e)
+
+ printer = resolve_printer_device(device_id, device_info)
+ session = DeviceSession(device_id=device_id, client=client, printer=printer)
+ self.sessions[device_id] = session
+ logger.info(f"Initialized session for printer: {printer}")
+ return session
def _run_continuous_loop(self, interval: int):
iteration = 0
@@ -263,7 +299,8 @@ class Command(BaseCommand):
if self.verbose:
logger.debug(f"=== Iteration {iteration} ===")
- self._collect_printer_data()
+ for session in list(self.sessions.values()):
+ self._collect_printer_data(session)
elapsed = time.time() - loop_start
sleep_time = max(0, interval - elapsed)
@@ -273,9 +310,28 @@ class Command(BaseCommand):
if iteration % 100 == 0:
self._print_statistics()
+ self._refresh_devices()
time.sleep(sleep_time)
+ def _refresh_devices(self):
+ """Pick up printers added to the account without restarting the process."""
+ if os.environ.get("BAMBU_DEVICE_ID"):
+ return # pinned to a single explicit device — nothing to discover
+ try:
+ device_infos = self._discover_devices(None)
+ except Exception as e:
+ logger.warning(f"Device refresh skipped (non-fatal): {e}")
+ return
+
+ for device_id, device_info in device_infos.items():
+ if device_id not in self.sessions:
+ logger.info(f"New printer detected on account: {device_id}")
+ try:
+ self._add_session(device_id, device_info)
+ except Exception as e:
+ logger.error(f"Failed to initialize newly-detected printer {device_id}: {e}")
+
def _convert_mqtt_color(self, mqtt_color):
if not mqtt_color:
return None
@@ -498,6 +554,8 @@ class Command(BaseCommand):
printer_metric=printer_metric,
filament=filament,
tray_id=tray_id,
+ ams_unit_id=unit_id_int,
+ ams_type=tray_data.get('ams_type', '') or '',
slot_name=tray_data.get('slot'),
type=tray_data.get('type'),
sub_type=tray_data.get('sub_type'),
@@ -513,19 +571,51 @@ class Command(BaseCommand):
match_method=match_method
)
- def _track_print_job(self, metric, snapshot):
- from bambu_run.models import PrintJob, FilamentUsage
+ def _update_hotends(self, printer, printer_metric, hotends_data):
+ from bambu_run.models import Hotend, HotendSnapshot
+
+ for h in hotends_data:
+ if h.get("is_empty"):
+ continue
+
+ hotend, _ = Hotend.objects.update_or_create(
+ printer=printer,
+ serial_number=h.get("serial_number"),
+ defaults={
+ "raw_id": h.get("raw_id", 0),
+ "nozzle_type": h.get("nozzle_type", ""),
+ "diameter": self._to_decimal(h.get("diameter")),
+ "slot_number": h.get("slot_number"),
+ "is_toolhead": bool(h.get("is_toolhead")),
+ "last_filament_profile_id": h.get("fila_id", ""),
+ "last_color": h.get("color") or "",
+ "used_time_seconds": h.get("used_time_seconds", 0),
+ "wear_percent": h.get("wear_percent", 0),
+ },
+ )
+
+ HotendSnapshot.objects.create(
+ printer_metric=printer_metric,
+ hotend=hotend,
+ raw_id=h.get("raw_id", 0),
+ used_time_seconds=h.get("used_time_seconds", 0),
+ wear_percent=h.get("wear_percent", 0),
+ stat=h.get("stat"),
+ )
+
+ def _track_print_job(self, session, metric, snapshot):
+ from bambu_run.models import PrintJob
gcode_state = snapshot.get('gcode_state')
subtask_name = snapshot.get('subtask_name')
- if self._is_print_starting(gcode_state, subtask_name):
- if self.current_print_job:
- self._finalize_print_job(metric, snapshot)
+ if self._is_print_starting(session, gcode_state, subtask_name):
+ if session.current_print_job:
+ self._finalize_print_job(session, metric, snapshot)
raw_task_id = snapshot.get('task_id')
- self.current_print_job = PrintJob.objects.create(
- device=self.printer_device,
+ session.current_print_job = PrintJob.objects.create(
+ device=session.printer,
project_name=subtask_name,
gcode_file=snapshot.get('gcode_file'),
start_time=metric.timestamp,
@@ -534,109 +624,119 @@ class Command(BaseCommand):
completion_percent=snapshot.get('print_percent', 0),
cloud_task_id_raw=int(raw_task_id) if raw_task_id else None,
)
- self.trays_used = set()
- logger.info(f"Print job started: {subtask_name}")
+ session.trays_used = set()
+ logger.info(f"[{session.device_id}] Print job started: {subtask_name}")
- if self.current_print_job:
+ if session.current_print_job:
tray_now = snapshot.get('tray_now', '')
if tray_now not in (None, '', '255'):
try:
tray_id = int(tray_now)
if 0 <= tray_id <= 15:
- self.trays_used.add(tray_id)
+ session.trays_used.add(tray_id)
except (ValueError, TypeError):
pass
- if self._is_print_ending(gcode_state) and self.current_print_job:
- self._finalize_print_job(metric, snapshot)
+ if self._is_print_ending(session, gcode_state) and session.current_print_job:
+ self._finalize_print_job(session, metric, snapshot)
- self.last_gcode_state = gcode_state
- self.last_subtask_name = subtask_name
+ session.last_gcode_state = gcode_state
+ session.last_subtask_name = subtask_name
- def _is_print_starting(self, gcode_state, subtask_name):
+ def _is_print_starting(self, session, gcode_state, subtask_name):
is_printing = gcode_state not in ['FINISH', 'IDLE', 'FAILED', None, '']
- has_new_job = subtask_name and subtask_name != self.last_subtask_name
+ has_new_job = subtask_name and subtask_name != session.last_subtask_name
return is_printing and has_new_job
- def _is_print_ending(self, gcode_state):
+ def _is_print_ending(self, session, gcode_state):
ending_states = ['FINISH', 'FAILED']
- return gcode_state in ending_states and self.last_gcode_state not in ending_states
+ return gcode_state in ending_states and session.last_gcode_state not in ending_states
- def _finalize_print_job(self, metric, snapshot):
+ def _finalize_print_job(self, session, metric, snapshot):
from bambu_run.models import FilamentUsage
- self.current_print_job.end_time = metric.timestamp
- self.current_print_job.end_metric = metric
- self.current_print_job.final_status = snapshot.get('gcode_state')
- self.current_print_job.completion_percent = snapshot.get('print_percent', 0)
- self.current_print_job.calculate_duration()
- self.current_print_job.save()
+ job = session.current_print_job
+ job.end_time = metric.timestamp
+ job.end_metric = metric
+ job.final_status = snapshot.get('gcode_state')
+ job.completion_percent = snapshot.get('print_percent', 0)
+ job.calculate_duration()
+ job.save()
try:
from bambu_run.bambu_cloud import fetch_and_upsert_task
- fetch_and_upsert_task(self.printer_client._client, self.current_print_job)
+ fetch_and_upsert_task(session.client._client, job)
except Exception as e:
logger.warning(f"Cloud task sync skipped (non-fatal): {e}")
- start_metric = self.current_print_job.start_metric
+ start_metric = job.start_metric
if not start_metric:
- logger.warning(f"No start_metric for job {self.current_print_job.id}, skipping filament usage")
- elif not self.trays_used:
- logger.warning(f"No trays tracked for job {self.current_print_job.project_name}, skipping filament usage")
+ logger.warning(f"No start_metric for job {job.id}, skipping filament usage")
+ elif not session.trays_used:
+ logger.warning(f"No trays tracked for job {job.project_name}, skipping filament usage")
else:
- for tray_id in self.trays_used:
- start_snap = start_metric.filament_snapshots.filter(
+ # A bare tray_id (from `tray_now`) doesn't identify which physical AMS
+ # unit was active when multiple units share the same slot numbering —
+ # so create one usage row per (unit, tray) that had a tracked filament
+ # loaded at job start, rather than guessing a single "correct" unit.
+ created_usages = []
+ for tray_id in session.trays_used:
+ start_snaps = start_metric.filament_snapshots.filter(
tray_id=tray_id, filament__isnull=False
- ).first()
- if not start_snap:
- continue
-
- end_snap = metric.filament_snapshots.filter(
- filament=start_snap.filament, tray_id=tray_id
- ).first()
-
- usage = FilamentUsage.objects.create(
- print_job=self.current_print_job,
- filament=start_snap.filament,
- tray_id=tray_id,
- starting_percent=start_snap.remain_percent or 100,
- ending_percent=end_snap.remain_percent if end_snap else None,
- is_primary=(len(self.trays_used) == 1),
)
- usage.calculate_consumed()
+ for start_snap in start_snaps:
+ end_snap = metric.filament_snapshots.filter(
+ filament=start_snap.filament,
+ tray_id=tray_id,
+ ams_unit_id=start_snap.ams_unit_id,
+ ).first()
+
+ usage = FilamentUsage.objects.create(
+ print_job=job,
+ filament=start_snap.filament,
+ tray_id=tray_id,
+ ams_unit_id=start_snap.ams_unit_id,
+ starting_percent=start_snap.remain_percent or 100,
+ ending_percent=end_snap.remain_percent if end_snap else None,
+ )
+ usage.calculate_consumed()
+ created_usages.append(usage)
+
+ for usage in created_usages:
+ usage.is_primary = len(created_usages) == 1
usage.save()
if self.verbose:
logger.debug(
- f"Filament usage for {start_snap.filament} (tray {tray_id}): "
+ f"Filament usage for {usage.filament} (unit {usage.ams_unit_id}, tray {usage.tray_id}): "
f"{usage.starting_percent}% -> {usage.ending_percent}%, consumed {usage.consumed_percent}%"
)
logger.info(
- f"Print job finished: {self.current_print_job.project_name} "
- f"({self.current_print_job.final_status}) - Duration: {self.current_print_job.duration_minutes} min, "
- f"Trays used: {sorted(self.trays_used) if self.trays_used else 'none tracked'}"
+ f"[{session.device_id}] Print job finished: {job.project_name} "
+ f"({job.final_status}) - Duration: {job.duration_minutes} min, "
+ f"Trays used: {sorted(session.trays_used) if session.trays_used else 'none tracked'}"
)
- self.current_print_job = None
- self.trays_used = set()
+ session.current_print_job = None
+ session.trays_used = set()
- def _collect_printer_data(self):
+ def _collect_printer_data(self, session: "DeviceSession"):
try:
- snapshot = self.printer_client.get_snapshot()
+ snapshot = session.client.get_snapshot()
if snapshot is None:
- self.mqtt_connect_errors += 1
- if self.mqtt_connect_errors <= 5 or self.verbose:
+ session.mqtt_connect_errors += 1
+ if session.mqtt_connect_errors <= 5 or self.verbose:
logger.warning(
- f"MQTT not connected yet or no data available "
- f"(attempt {self.mqtt_connect_errors})"
+ f"[{session.device_id}] MQTT not connected yet or no data available "
+ f"(attempt {session.mqtt_connect_errors})"
)
return
with transaction.atomic():
metric = PrinterMetrics.objects.create(
- device=self.printer_device,
+ device=session.printer,
timestamp=timezone.now(),
nozzle_temp=self._to_decimal(snapshot.get("nozzle_temp")),
nozzle_target_temp=self._to_decimal(snapshot.get("nozzle_target_temp")),
@@ -688,27 +788,33 @@ class Command(BaseCommand):
ams_units=snapshot.get("ams_units", []),
external_spool=snapshot.get("external_spool", {}),
lights_report=snapshot.get("lights_report", []),
+ vortek_raw=snapshot.get("vortek_raw", {}),
+ nozzle_info=snapshot.get("hotends", []),
)
filaments_data = snapshot.get('filaments', [])
if filaments_data:
self._create_filament_snapshots(metric, filaments_data, snapshot)
- self._track_print_job(metric, snapshot)
+ hotends_data = snapshot.get('hotends', [])
+ if hotends_data:
+ self._update_hotends(session.printer, metric, hotends_data)
- self.success_count += 1
+ self._track_print_job(session, metric, snapshot)
+
+ session.success_count += 1
if self.verbose:
logger.debug(
- f"Printer Metrics: Nozzle={snapshot.get('nozzle_temp')}C, "
+ f"[{session.device_id}] Printer Metrics: Nozzle={snapshot.get('nozzle_temp')}C, "
f"Bed={snapshot.get('bed_temp')}C, "
f"Progress={snapshot.get('print_percent')}%, "
f"State={snapshot.get('gcode_state')}"
)
except Exception as e:
- self.error_count += 1
- logger.error(f"Error collecting printer data (total errors: {self.error_count}): {e}")
+ session.error_count += 1
+ logger.error(f"[{session.device_id}] Error collecting printer data (total errors: {session.error_count}): {e}")
if self.verbose:
logger.exception("Detailed traceback:")
@@ -723,16 +829,20 @@ class Command(BaseCommand):
def _print_statistics(self):
if self.start_time:
runtime = timezone.now() - self.start_time
- total_collections = self.success_count + self.error_count
+ success_count = sum(s.success_count for s in self.sessions.values())
+ error_count = sum(s.error_count for s in self.sessions.values())
+ mqtt_connect_errors = sum(s.mqtt_connect_errors for s in self.sessions.values())
+ total_collections = success_count + error_count
success_rate = (
- (self.success_count / total_collections * 100)
+ (success_count / total_collections * 100)
if total_collections > 0
else 0
)
logger.info("=== Statistics ===")
logger.info(f"Runtime: {runtime}")
- logger.info(f"Successful collections: {self.success_count}")
- logger.info(f"Failed collections: {self.error_count}")
- logger.info(f"MQTT connection warnings: {self.mqtt_connect_errors}")
+ logger.info(f"Printers tracked: {len(self.sessions)}")
+ logger.info(f"Successful collections: {success_count}")
+ logger.info(f"Failed collections: {error_count}")
+ logger.info(f"MQTT connection warnings: {mqtt_connect_errors}")
logger.info(f"Success rate: {success_rate:.1f}%")
diff --git a/bambu_run/management/commands/bambu_diagnose.py b/bambu_run/management/commands/bambu_diagnose.py
new file mode 100644
index 0000000..fc18c46
--- /dev/null
+++ b/bambu_run/management/commands/bambu_diagnose.py
@@ -0,0 +1,128 @@
+"""
+Diagnose multi-printer cloud data for a Bambu Lab account.
+
+Run this if `bambu_collector` doesn't pick up all your printers, or the data
+collected for a second/third printer looks wrong. It authenticates with your
+Bambu Lab account, lists every device the cloud API reports, listens briefly
+for raw MQTT data from each one, and writes a redacted JSON report you can
+attach to a GitHub issue.
+
+Usage:
+ python manage.py bambu_diagnose
+ python manage.py bambu_diagnose --listen-seconds 15
+ python manage.py bambu_diagnose --output my_report.json
+ python manage.py bambu_diagnose --no-redact # local debugging only — do NOT post this output publicly
+"""
+
+import json
+import logging
+import time
+
+from django.core.management.base import BaseCommand, CommandError
+
+from bambu_run.diagnostics import build_diagnostics_report, redact_diagnostics
+
+logger = logging.getLogger("bambu_run.diagnose")
+
+
+class Command(BaseCommand):
+ help = "Authenticate, list every printer on the account, and write a redacted diagnostics report."
+
+ def add_arguments(self, parser):
+ parser.add_argument(
+ "--listen-seconds", type=float, default=8.0,
+ help="How long to listen for MQTT data per device (default: 8)",
+ )
+ parser.add_argument(
+ "--output", type=str, default=None,
+ help="Output file path (default: bambu_diagnostics_.json)",
+ )
+ parser.add_argument(
+ "--no-redact", action="store_true",
+ help="Keep full serials/identifiers unmasked. For your own debugging only — "
+ "do not paste this output into a public GitHub issue.",
+ )
+
+ def handle(self, *args, **options):
+ import os
+ from bambu_run.mqtt_client import BambuPrinter, BambuClient
+
+ listen_seconds = options["listen_seconds"]
+ redact = not options["no_redact"]
+
+ bambu_username = os.environ.get("BAMBU_USERNAME")
+ bambu_password = os.environ.get("BAMBU_PASSWORD")
+ bambu_token = os.environ.get("BAMBU_TOKEN")
+
+ if not bambu_token and not all([bambu_username, bambu_password]):
+ raise CommandError(
+ "Either BAMBU_TOKEN or both BAMBU_USERNAME and BAMBU_PASSWORD "
+ "environment variables must be set"
+ )
+
+ self.stdout.write("Authenticating with Bambu Lab cloud...")
+ auth = BambuPrinter(username=bambu_username, password=bambu_password, token=bambu_token)
+ token = auth._ensure_token()
+
+ cloud = BambuClient(token=token)
+ devices = cloud.get_devices()
+
+ self.stdout.write(self.style.SUCCESS(f"Found {len(devices)} device(s) on this account:"))
+ for device in devices:
+ self.stdout.write(
+ f" - {device.get('name', 'unknown')} "
+ f"({device.get('dev_product_name', 'unknown model')}) "
+ f"online={device.get('online')}"
+ )
+ if len(devices) < 2:
+ self.stdout.write(self.style.WARNING(
+ "Only one device returned by the cloud API — if you own multiple printers, "
+ "this is likely the root cause. Note this in the GitHub issue."
+ ))
+
+ raw_payloads = {}
+ for device in devices:
+ dev_id = device.get("dev_id")
+ if not dev_id:
+ continue
+ self.stdout.write(f"Listening to {device.get('name', dev_id)} for {listen_seconds:.0f}s...")
+ client = BambuPrinter(token=token, device_id=dev_id)
+ try:
+ client.connect(blocking=False)
+ self._request_full_status_when_ready(client)
+ time.sleep(listen_seconds)
+ state = client.get_state()
+ raw_payloads[dev_id] = state._raw_data.get("print") if state._raw_data else None
+ except Exception as e:
+ self.stdout.write(self.style.WARNING(f" Could not collect data for {dev_id}: {e}"))
+ raw_payloads[dev_id] = None
+ finally:
+ client.disconnect()
+
+ report = build_diagnostics_report(devices, raw_payloads)
+ report = redact_diagnostics(report, redact=redact)
+
+ output_path = options["output"] or f"bambu_diagnostics_{int(time.time())}.json"
+ with open(output_path, "w") as f:
+ json.dump(report, f, indent=2, default=str)
+
+ self.stdout.write(self.style.SUCCESS(f"\nDiagnostics written to: {output_path}"))
+ if not redact:
+ self.stdout.write(self.style.WARNING(
+ "--no-redact was used: this file contains unmasked serials/identifiers. "
+ "Do not attach it to a public GitHub issue as-is."
+ ))
+ else:
+ self.stdout.write(
+ "Serials/identifiers are masked. Please skim the file once before posting — "
+ "then attach it to https://github.com/RunLit/Bambu-Run/issues/10"
+ )
+
+ def _request_full_status_when_ready(self, client, timeout: float = 20.0) -> None:
+ deadline = time.time() + timeout
+ while time.time() < deadline:
+ mqtt_client = getattr(client, "_mqtt", None)
+ if mqtt_client is not None and getattr(mqtt_client, "connected", False):
+ client._mqtt.request_full_status()
+ return
+ time.sleep(0.5)
diff --git a/bambu_run/migrations/0005_printermetrics_vortek_raw.py b/bambu_run/migrations/0005_printermetrics_vortek_raw.py
new file mode 100644
index 0000000..c0b88fb
--- /dev/null
+++ b/bambu_run/migrations/0005_printermetrics_vortek_raw.py
@@ -0,0 +1,22 @@
+# Generated by Django 5.2.8 on 2026-06-18 12:20
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ("bambu_run", "0004_h2c_dual_nozzle_and_ams_fields"),
+ ]
+
+ operations = [
+ migrations.AddField(
+ model_name="printermetrics",
+ name="vortek_raw",
+ field=models.JSONField(
+ blank=True,
+ default=dict,
+ help_text="Raw print.device MQTT payload (Vortek rack groundwork)",
+ ),
+ ),
+ ]
diff --git a/bambu_run/migrations/0006_alter_filamentsnapshot_options_and_more.py b/bambu_run/migrations/0006_alter_filamentsnapshot_options_and_more.py
new file mode 100644
index 0000000..4565013
--- /dev/null
+++ b/bambu_run/migrations/0006_alter_filamentsnapshot_options_and_more.py
@@ -0,0 +1,63 @@
+# Generated by Django 5.2.8 on 2026-06-20 12:40
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ("bambu_run", "0005_printermetrics_vortek_raw"),
+ ]
+
+ operations = [
+ migrations.AlterModelOptions(
+ name="filamentsnapshot",
+ options={
+ "ordering": ["printer_metric", "ams_unit_id", "tray_id"],
+ "verbose_name": "Filament Snapshot",
+ "verbose_name_plural": "Filament Snapshots",
+ },
+ ),
+ migrations.AddField(
+ model_name="filamentsnapshot",
+ name="ams_type",
+ field=models.CharField(
+ blank=True,
+ choices=[
+ ("AMS", "AMS"),
+ ("AMS 2 Pro", "AMS 2 Pro"),
+ ("AMS HT", "AMS HT"),
+ ],
+ default="",
+ help_text="Type of the AMS unit this tray belongs to (AMS / AMS 2 Pro / AMS HT)",
+ max_length=32,
+ ),
+ ),
+ migrations.AddField(
+ model_name="filamentsnapshot",
+ name="ams_unit_id",
+ field=models.PositiveSmallIntegerField(
+ blank=True,
+ db_index=True,
+ help_text="Which physical AMS unit this tray belongs to (matches MQTT ams[i].id; 128 = AMS HT)",
+ null=True,
+ ),
+ ),
+ migrations.AddField(
+ model_name="filamentusage",
+ name="ams_unit_id",
+ field=models.PositiveSmallIntegerField(
+ blank=True,
+ db_index=True,
+ help_text="Which physical AMS unit the slot belongs to (matches MQTT ams[i].id; 128 = AMS HT)",
+ null=True,
+ ),
+ ),
+ migrations.AddIndex(
+ model_name="filamentsnapshot",
+ index=models.Index(
+ fields=["printer_metric", "ams_unit_id", "tray_id"],
+ name="infrastruct_printer_2ad168_idx",
+ ),
+ ),
+ ]
diff --git a/bambu_run/migrations/0007_hotend_hotendsnapshot.py b/bambu_run/migrations/0007_hotend_hotendsnapshot.py
new file mode 100644
index 0000000..adf7a75
--- /dev/null
+++ b/bambu_run/migrations/0007_hotend_hotendsnapshot.py
@@ -0,0 +1,172 @@
+# Generated by Django 5.2.8 on 2026-06-20 14:07
+
+import django.db.models.deletion
+import django.utils.timezone
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ("bambu_run", "0006_alter_filamentsnapshot_options_and_more"),
+ ]
+
+ operations = [
+ migrations.CreateModel(
+ name="Hotend",
+ fields=[
+ (
+ "id",
+ models.BigAutoField(
+ auto_created=True,
+ primary_key=True,
+ serialize=False,
+ verbose_name="ID",
+ ),
+ ),
+ ("serial_number", models.CharField(db_index=True, max_length=100)),
+ (
+ "nozzle_type",
+ models.CharField(blank=True, default="", max_length=50),
+ ),
+ (
+ "diameter",
+ models.DecimalField(
+ blank=True, decimal_places=2, max_digits=3, null=True
+ ),
+ ),
+ (
+ "raw_id",
+ models.PositiveSmallIntegerField(
+ help_text="Last-seen MQTT device.nozzle.info[].id"
+ ),
+ ),
+ (
+ "slot_number",
+ models.PositiveSmallIntegerField(
+ blank=True,
+ help_text="Rack bay 1-6, derived from raw_id 16-21. Null if currently unknown (e.g. mounted on toolhead and id reports as the 0 sentinel).",
+ null=True,
+ ),
+ ),
+ (
+ "is_toolhead",
+ models.BooleanField(
+ default=False,
+ help_text="True if currently mounted on the toolhead under normal polling (raw_id == 0).",
+ ),
+ ),
+ (
+ "last_filament_profile_id",
+ models.CharField(
+ blank=True,
+ default="",
+ help_text="Bambu material profile id of the filament last loaded (MQTT fila_id, e.g. 'GFA01')",
+ max_length=20,
+ ),
+ ),
+ (
+ "last_color",
+ models.CharField(
+ blank=True,
+ default="",
+ help_text="6-char hex of the filament last loaded (MQTT color_m, alpha stripped)",
+ max_length=6,
+ ),
+ ),
+ ("used_time_seconds", models.PositiveIntegerField(default=0)),
+ (
+ "wear_percent",
+ models.DecimalField(
+ decimal_places=2,
+ default=0,
+ help_text="MQTT wear (0-128 scale) converted to a 0-100 percent",
+ max_digits=5,
+ ),
+ ),
+ ("last_seen_at", models.DateTimeField(auto_now=True)),
+ ("created_at", models.DateTimeField(auto_now_add=True)),
+ (
+ "printer",
+ models.ForeignKey(
+ on_delete=django.db.models.deletion.CASCADE,
+ related_name="hotends",
+ to="bambu_run.printer",
+ ),
+ ),
+ ],
+ options={
+ "verbose_name": "Hotend",
+ "verbose_name_plural": "Hotends",
+ "db_table": "infrastructure_hotend",
+ "ordering": ["printer", "-is_toolhead", "slot_number", "serial_number"],
+ "unique_together": {("printer", "serial_number")},
+ },
+ ),
+ migrations.CreateModel(
+ name="HotendSnapshot",
+ fields=[
+ (
+ "id",
+ models.BigAutoField(
+ auto_created=True,
+ primary_key=True,
+ serialize=False,
+ verbose_name="ID",
+ ),
+ ),
+ ("raw_id", models.PositiveSmallIntegerField()),
+ ("used_time_seconds", models.PositiveIntegerField(default=0)),
+ (
+ "wear_percent",
+ models.DecimalField(decimal_places=2, default=0, max_digits=5),
+ ),
+ (
+ "stat",
+ models.IntegerField(
+ blank=True,
+ help_text="Raw MQTT status code for this hotend",
+ null=True,
+ ),
+ ),
+ (
+ "timestamp",
+ models.DateTimeField(
+ db_index=True, default=django.utils.timezone.now
+ ),
+ ),
+ (
+ "hotend",
+ models.ForeignKey(
+ on_delete=django.db.models.deletion.CASCADE,
+ related_name="snapshots",
+ to="bambu_run.hotend",
+ ),
+ ),
+ (
+ "printer_metric",
+ models.ForeignKey(
+ on_delete=django.db.models.deletion.CASCADE,
+ related_name="hotend_snapshots",
+ to="bambu_run.printermetrics",
+ ),
+ ),
+ ],
+ options={
+ "verbose_name": "Hotend Snapshot",
+ "verbose_name_plural": "Hotend Snapshots",
+ "db_table": "infrastructure_hotend_snapshot",
+ "ordering": ["printer_metric", "hotend"],
+ "indexes": [
+ models.Index(
+ fields=["printer_metric", "hotend"],
+ name="infrastruct_printer_b528aa_idx",
+ ),
+ models.Index(
+ fields=["hotend", "-timestamp"],
+ name="infrastruct_hotend__691f7e_idx",
+ ),
+ ],
+ },
+ ),
+ ]
diff --git a/bambu_run/migrations/0008_printermetrics_nozzle_info.py b/bambu_run/migrations/0008_printermetrics_nozzle_info.py
new file mode 100644
index 0000000..4880437
--- /dev/null
+++ b/bambu_run/migrations/0008_printermetrics_nozzle_info.py
@@ -0,0 +1,22 @@
+# Generated by Django 5.2.8 on 2026-06-20 14:22
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ("bambu_run", "0007_hotend_hotendsnapshot"),
+ ]
+
+ operations = [
+ migrations.AddField(
+ model_name="printermetrics",
+ name="nozzle_info",
+ field=models.JSONField(
+ blank=True,
+ default=list,
+ help_text="Parsed per-poll nozzle/hotend info list",
+ ),
+ ),
+ ]
diff --git a/bambu_run/models.py b/bambu_run/models.py
index cfa3ee1..85dbff3 100644
--- a/bambu_run/models.py
+++ b/bambu_run/models.py
@@ -21,12 +21,15 @@ AMS_TYPE_CHOICES = [
def ams_type_from_info(info_code) -> str:
"""Resolve an AMS unit's `info` model code to a human label.
- The HT unit reports its `id` with the 0x80 bit set (e.g. 128) — when the info
- code is unknown, that bit is a reasonable secondary hint for HT identification.
+ Real MQTT `info` codes are 8 characters (e.g. "10001003") with the type encoded
+ in the last 4 digits — confirmed against a live H2C with AMS 2 Pro / AMS / AMS HT.
+ Fall back to an exact match for the bare 4-digit form in case other firmware
+ reports it short.
"""
- if info_code is None:
+ if not info_code:
return ""
- return AMS_INFO_TO_TYPE.get(str(info_code), "")
+ code = str(info_code)
+ return AMS_INFO_TO_TYPE.get(code[-4:], "") or AMS_INFO_TO_TYPE.get(code, "")
class Printer(models.Model):
@@ -231,6 +234,23 @@ class PrinterMetrics(models.Model):
default=list, help_text="Light status report [{node, mode}]"
)
+ # Groundwork for H2C's Vortek nozzle-changer rack (6 swappable hotends + 1 fixed
+ # left nozzle) — the full MQTT schema for per-slot state isn't confirmed yet, so
+ # the raw `print.device` payload is captured here unfiltered to avoid losing data
+ # ahead of proper per-slot modeling.
+ vortek_raw = models.JSONField(
+ default=dict, blank=True, help_text="Raw print.device MQTT payload (Vortek rack groundwork)"
+ )
+
+ # Parsed device.nozzle.info[] from this poll, one dict per entry (mirrors
+ # HotendInfo.to_dict()). Includes induction-chip hotends *and* non-inductive
+ # nozzle positions (e.g. H2C's fixed left nozzle) that have no stable serial
+ # number to key a Hotend registry row on — kept here so the dashboard can show
+ # their readable type/diameter without claiming an identity/history we don't have.
+ nozzle_info = models.JSONField(
+ default=list, blank=True, help_text="Parsed per-poll nozzle/hotend info list"
+ )
+
class Meta:
db_table = "infrastructure_printer_metrics"
verbose_name = "Printer Metric"
@@ -490,6 +510,15 @@ class FilamentSnapshot(models.Model):
max_length=20, null=True, blank=True,
help_text="Slot identifier like A00-W1"
)
+ ams_unit_id = models.PositiveSmallIntegerField(
+ null=True, blank=True, db_index=True,
+ help_text="Which physical AMS unit this tray belongs to (matches MQTT ams[i].id; 128 = AMS HT)"
+ )
+ ams_type = models.CharField(
+ max_length=32, blank=True, default="",
+ choices=AMS_TYPE_CHOICES,
+ help_text="Type of the AMS unit this tray belongs to (AMS / AMS 2 Pro / AMS HT)"
+ )
type = models.CharField(max_length=50, null=True, blank=True)
sub_type = models.CharField(
@@ -537,9 +566,10 @@ class FilamentSnapshot(models.Model):
db_table = "infrastructure_filament_snapshot"
verbose_name = "Filament Snapshot"
verbose_name_plural = "Filament Snapshots"
- ordering = ['printer_metric', 'tray_id']
+ ordering = ['printer_metric', 'ams_unit_id', 'tray_id']
indexes = [
models.Index(fields=['printer_metric', 'tray_id']),
+ models.Index(fields=['printer_metric', 'ams_unit_id', 'tray_id']),
models.Index(fields=['filament']),
]
@@ -678,6 +708,10 @@ class FilamentUsage(models.Model):
)
tray_id = models.IntegerField(help_text="Which AMS slot was used")
+ ams_unit_id = models.PositiveSmallIntegerField(
+ null=True, blank=True, db_index=True,
+ help_text="Which physical AMS unit the slot belongs to (matches MQTT ams[i].id; 128 = AMS HT)"
+ )
starting_percent = models.IntegerField(help_text="Filament remaining % at job start")
ending_percent = models.IntegerField(
@@ -715,3 +749,108 @@ class FilamentUsage(models.Model):
self.consumed_grams = int(
self.filament.initial_weight_grams * (self.consumed_percent / 100.0)
)
+
+
+class Hotend(models.Model):
+ """Registry of individual Vortek hotends, keyed by serial number.
+
+ A Vortek rack holds up to 6 swappable hotends (bays, MQTT `id` 16-21) plus
+ 1 mounted on the toolhead at a time (MQTT `id` 0). `raw_id` reflects whichever
+ address was last seen on the wire for this hotend; `slot_number` is only set
+ when that address falls in the 16-21 rack-bay range — confirmed by watching
+ a "Read All" MQTT capture reassign a toolhead-mounted hotend's id from 0 to
+ its true bay id.
+ """
+
+ printer = models.ForeignKey(
+ 'Printer', on_delete=models.CASCADE, related_name='hotends'
+ )
+ serial_number = models.CharField(max_length=100, db_index=True)
+
+ nozzle_type = models.CharField(max_length=50, blank=True, default="")
+ diameter = models.DecimalField(
+ max_digits=3, decimal_places=2, null=True, blank=True
+ )
+
+ raw_id = models.PositiveSmallIntegerField(
+ help_text="Last-seen MQTT device.nozzle.info[].id"
+ )
+ slot_number = models.PositiveSmallIntegerField(
+ null=True, blank=True,
+ help_text="Rack bay 1-6, derived from raw_id 16-21. Null if currently unknown (e.g. mounted on toolhead and id reports as the 0 sentinel)."
+ )
+ is_toolhead = models.BooleanField(
+ default=False,
+ help_text="True if currently mounted on the toolhead under normal polling (raw_id == 0)."
+ )
+
+ last_filament_profile_id = models.CharField(
+ max_length=20, blank=True, default="",
+ help_text="Bambu material profile id of the filament last loaded (MQTT fila_id, e.g. 'GFA01')"
+ )
+ last_color = models.CharField(
+ max_length=6, blank=True, default="",
+ help_text="6-char hex of the filament last loaded (MQTT color_m, alpha stripped)"
+ )
+
+ used_time_seconds = models.PositiveIntegerField(default=0)
+ wear_percent = models.DecimalField(
+ max_digits=5, decimal_places=2, default=0,
+ help_text="MQTT wear (0-128 scale) converted to a 0-100 percent"
+ )
+
+ last_seen_at = models.DateTimeField(auto_now=True)
+ created_at = models.DateTimeField(auto_now_add=True)
+
+ class Meta:
+ db_table = "infrastructure_hotend"
+ verbose_name = "Hotend"
+ verbose_name_plural = "Hotends"
+ ordering = ['printer', '-is_toolhead', 'slot_number', 'serial_number']
+ unique_together = [['printer', 'serial_number']]
+
+ def __str__(self):
+ location = "Toolhead" if self.is_toolhead else (
+ f"Slot {self.slot_number}" if self.slot_number else "Rack"
+ )
+ return f"{self.serial_number} ({location})"
+
+ @property
+ def used_time_display(self) -> str:
+ hours, remainder = divmod(self.used_time_seconds, 3600)
+ minutes = remainder // 60
+ return f"{hours}h {minutes}m" if hours else f"{minutes}m"
+
+
+class HotendSnapshot(models.Model):
+ """Point-in-time reading of a Hotend, one row per collector poll."""
+
+ printer_metric = models.ForeignKey(
+ 'PrinterMetrics', on_delete=models.CASCADE,
+ related_name='hotend_snapshots'
+ )
+ hotend = models.ForeignKey(
+ 'Hotend', on_delete=models.CASCADE,
+ related_name='snapshots'
+ )
+
+ raw_id = models.PositiveSmallIntegerField()
+ used_time_seconds = models.PositiveIntegerField(default=0)
+ wear_percent = models.DecimalField(max_digits=5, decimal_places=2, default=0)
+ stat = models.IntegerField(
+ null=True, blank=True, help_text="Raw MQTT status code for this hotend"
+ )
+ timestamp = models.DateTimeField(default=timezone.now, db_index=True)
+
+ class Meta:
+ db_table = "infrastructure_hotend_snapshot"
+ verbose_name = "Hotend Snapshot"
+ verbose_name_plural = "Hotend Snapshots"
+ ordering = ['printer_metric', 'hotend']
+ indexes = [
+ models.Index(fields=['printer_metric', 'hotend']),
+ models.Index(fields=['hotend', '-timestamp']),
+ ]
+
+ def __str__(self):
+ return f"{self.hotend.serial_number} @ {self.timestamp.strftime('%Y-%m-%d %H:%M:%S')}"
diff --git a/bambu_run/mqtt_client.py b/bambu_run/mqtt_client.py
index b451c98..e8474d3 100644
--- a/bambu_run/mqtt_client.py
+++ b/bambu_run/mqtt_client.py
@@ -296,6 +296,73 @@ class AMSState:
return loaded
+@dataclass
+class HotendInfo:
+ """A single hotend reported in `device.nozzle.info[]` (Vortek rack).
+
+ `raw_id` semantics (confirmed by watching a live "Read All" MQTT capture):
+ 0 = currently mounted on the (swappable) toolhead — the sentinel hides the
+ true bay address until "Read All" resolves it; 1 = the fixed left nozzle on
+ dual-nozzle printers (no RFID chip, always reports sn="N/A"); 16-21 = rack
+ bay address, slot_number = raw_id - 15 (1-6).
+ """
+ raw_id: int = 0
+ serial_number: str = ""
+ nozzle_type: str = ""
+ diameter: float = 0.4
+ fila_id: str = ""
+ color: Optional[str] = None
+ used_time_seconds: int = 0
+ wear_percent: float = 0.0
+ stat: int = 0
+
+ @classmethod
+ def from_dict(cls, data: Dict[str, Any]) -> "HotendInfo":
+ from .utils import strip_color_padding
+
+ return cls(
+ raw_id=int(data.get("id", 0)),
+ serial_number=data.get("sn", ""),
+ nozzle_type=data.get("type", ""),
+ diameter=float(data.get("diameter", 0.4)),
+ fila_id=data.get("fila_id", ""),
+ color=strip_color_padding(data.get("color_m")),
+ used_time_seconds=int(data.get("p_t", 0)),
+ wear_percent=round(float(data.get("wear", 0.0)) / 128.0 * 100, 2),
+ stat=int(data.get("stat", 0)),
+ )
+
+ @property
+ def is_toolhead(self) -> bool:
+ return self.raw_id == 0
+
+ @property
+ def is_empty(self) -> bool:
+ return self.serial_number in ("", "N/A")
+
+ @property
+ def slot_number(self) -> Optional[int]:
+ if 16 <= self.raw_id <= 21:
+ return self.raw_id - 15
+ return None
+
+ def to_dict(self) -> Dict[str, Any]:
+ return {
+ "raw_id": self.raw_id,
+ "serial_number": self.serial_number,
+ "nozzle_type": self.nozzle_type,
+ "diameter": self.diameter,
+ "fila_id": self.fila_id,
+ "color": self.color,
+ "used_time_seconds": self.used_time_seconds,
+ "wear_percent": self.wear_percent,
+ "stat": self.stat,
+ "is_toolhead": self.is_toolhead,
+ "is_empty": self.is_empty,
+ "slot_number": self.slot_number,
+ }
+
+
@dataclass
class PrinterState:
"""Complete printer state parsed from MQTT data"""
@@ -388,6 +455,9 @@ class PrinterState:
# External spool (virtual tray)
vt_tray: Optional[Dict[str, Any]] = None
+ # Vortek hotend rack (device.nozzle.info[])
+ hotends: List[HotendInfo] = field(default_factory=list)
+
# Raw data for any additional fields
_raw_data: Dict[str, Any] = field(default_factory=dict, repr=False)
@@ -431,6 +501,12 @@ class PrinterState:
nozzle_target_temp_left = float((t >> 16) & 0xFFFF)
nozzle_temp_left = float(t & 0xFFFF)
+ # Vortek hotend rack: device.nozzle.info[] — one entry per hotend.
+ hotends = [
+ HotendInfo.from_dict(h)
+ for h in (device.get("nozzle") or {}).get("info") or []
+ ]
+
return cls(
timestamp=timestamp,
sequence_id=str(print_data.get("sequence_id", "")),
@@ -487,6 +563,7 @@ class PrinterState:
gcode_file_prepare_percent=str(print_data.get("gcode_file_prepare_percent", "")),
lifecycle=print_data.get("lifecycle", ""),
vt_tray=print_data.get("vt_tray"),
+ hotends=hotends,
_raw_data=data,
)
@@ -529,6 +606,11 @@ class PrinterState:
"wifi_signal_dbm": self.wifi_signal_dbm,
"print_error": self.print_error,
"has_errors": self.print_error != 0,
+ # Full `print.device` payload, unfiltered. H2C's Vortek rack (6 swappable
+ # hotends + 1 fixed left nozzle) isn't fully modeled yet — stash everything
+ # here so no data is lost once the real Vortek MQTT schema is confirmed.
+ "vortek_raw": self._raw_data.get("print", {}).get("device", {}),
+ "hotends": [h.to_dict() for h in self.hotends],
"hms": self.hms,
"stg_cur": self.stg_cur,
"lights_report": self.lights_report,
diff --git a/bambu_run/static/bambu_run/css/dashboard.css b/bambu_run/static/bambu_run/css/dashboard.css
index bda76e6..1f826e1 100644
--- a/bambu_run/static/bambu_run/css/dashboard.css
+++ b/bambu_run/static/bambu_run/css/dashboard.css
@@ -64,3 +64,93 @@
opacity: 0.9;
color: rgba(255, 255, 255, 0.9);
}
+
+/* AMS unit type colors — CSS variables so RAE/standalone can override per theme */
+:root {
+ --ams-badge-ams: #6c757d;
+ --ams-badge-ams-2-pro: #0d6efd;
+ --ams-badge-ams-ht: #fd7e14;
+ --ams-group-border-color: rgba(0, 0, 0, 0.15);
+}
+
+[data-coreui-theme="dark"] {
+ --ams-group-border-color: rgba(255, 255, 255, 0.2);
+}
+
+.ams-badge-ams {
+ background-color: var(--ams-badge-ams);
+ color: #fff;
+}
+
+.ams-badge-ams-2-pro {
+ background-color: var(--ams-badge-ams-2-pro);
+ color: #fff;
+}
+
+.ams-badge-ams-ht {
+ background-color: var(--ams-badge-ams-ht);
+ color: #fff;
+}
+
+.ams-filter-pills {
+ display: flex;
+ flex-wrap: wrap;
+ gap: 0.5rem;
+}
+
+.ams-filter-pill {
+ border-radius: 50rem;
+ padding: 0.25rem 0.9rem;
+ font-size: 0.85rem;
+ border: 1px solid var(--ams-group-border-color);
+ background-color: transparent;
+ opacity: 0.6;
+}
+
+.ams-filter-pill.active {
+ opacity: 1;
+ font-weight: 600;
+ border-color: currentColor;
+}
+
+/* Grouped AMS unit panels — wide (multi-slot) units stack one per row,
+ compact (single-slot, e.g. AMS HT) units flow side-by-side and wrap. */
+.ams-groups {
+ display: flex;
+ flex-wrap: wrap;
+ gap: 1rem;
+}
+
+.ams-group {
+ border-radius: 0.5rem;
+ padding: 0.75rem;
+ border: 1px solid var(--ams-group-border-color);
+}
+
+.ams-group--wide {
+ flex: 1 1 100%;
+}
+
+.ams-group--compact {
+ flex: 0 1 auto;
+ min-width: 220px;
+}
+
+.ams-badge-bg-ams {
+ background-color: color-mix(in srgb, var(--ams-badge-ams) 8%, transparent);
+ border-left: 3px solid var(--ams-badge-ams);
+}
+
+.ams-badge-bg-ams-2-pro {
+ background-color: color-mix(in srgb, var(--ams-badge-ams-2-pro) 8%, transparent);
+ border-left: 3px solid var(--ams-badge-ams-2-pro);
+}
+
+.ams-badge-bg-ams-ht {
+ background-color: color-mix(in srgb, var(--ams-badge-ams-ht) 8%, transparent);
+ border-left: 3px solid var(--ams-badge-ams-ht);
+}
+
+.ams-badge-bg- {
+ border-left: 3px solid var(--ams-group-border-color);
+}
diff --git a/bambu_run/static/bambu_run/js/printer_charts.js b/bambu_run/static/bambu_run/js/printer_charts.js
index a40a015..ecce8c4 100644
--- a/bambu_run/static/bambu_run/js/printer_charts.js
+++ b/bambu_run/static/bambu_run/js/printer_charts.js
@@ -625,8 +625,23 @@ function createFilamentDatasets(filamentTimeline, timestamps) {
data: filamentTimeline[key]
}));
- // Sort by tray_id (numeric first, External last), then by start_idx (chronological)
+ // Distinct (non-null/undefined) AMS units present in this timeline — used to decide
+ // whether labels need an AMS unit prefix (avoid noise for the common single-AMS case).
+ const distinctUnits = new Set(
+ filamentEntries
+ .map(e => e.data.ams_unit_id)
+ .filter(uid => uid !== null && uid !== undefined)
+ );
+ const showUnitPrefix = distinctUnits.size > 1;
+
+ // Sort by AMS unit, then tray_id (numeric first, External last), then by start_idx
filamentEntries.sort((a, b) => {
+ const unitA = a.data.ams_unit_id ?? -1;
+ const unitB = b.data.ams_unit_id ?? -1;
+ if (unitA !== unitB) {
+ return unitA - unitB;
+ }
+
const trayA = a.data.tray_id;
const trayB = b.data.tray_id;
@@ -659,6 +674,10 @@ function createFilamentDatasets(filamentTimeline, timestamps) {
displayLabel = `Tray ${filament.tray_id} (${filament.type})`;
}
+ if (showUnitPrefix && filament.ams_type) {
+ displayLabel = `${filament.ams_type} · ${displayLabel}`;
+ }
+
// Add brand if it's different from type (avoid redundancy)
if (filament.brand && filament.brand !== filament.type && filament.brand !== 'External') {
displayLabel += ` - ${filament.brand}`;
diff --git a/bambu_run/templates/bambu_run/printer_dashboard.html b/bambu_run/templates/bambu_run/printer_dashboard.html
index 3b0ecc7..a171f74 100644
--- a/bambu_run/templates/bambu_run/printer_dashboard.html
+++ b/bambu_run/templates/bambu_run/printer_dashboard.html
@@ -14,6 +14,20 @@
Real-time monitoring for {{ device_name }}
+ {% if show_printer_switcher %}
+
+ Device:
+
+ {% for p in all_printers %}
+
+ {{ p.name }}
+
+ {% endfor %}
+
+
+ {% endif %}
{% if error %}
@@ -23,32 +37,38 @@
{% if stats.is_dual_nozzle %}
-
+
-
+
-
Right Nozzle
-
{{ stats.nozzle_temp|floatformat:1 }}°C
-
target {{ stats.nozzle_target_temp|floatformat:0 }}°C
- {% if stats.nozzle_type %}· {{ stats.nozzle_type }}{% endif %}
+
+
+ Left Nozzle
+
+
{{ stats.nozzle_temp_left|floatformat:1 }}°C
+
target {{ stats.nozzle_target_temp_left|floatformat:0 }}°C
+ {% if stats.nozzle_type_left %}· Nozzle {{ stats.nozzle_type_left }}{% endif %}
-
+
-
+
-
Left Nozzle
-
{{ stats.nozzle_temp_left|floatformat:1 }}°C
-
target {{ stats.nozzle_target_temp_left|floatformat:0 }}°C
- {% if stats.nozzle_type_left %}· {{ stats.nozzle_type_left }}{% endif %}
+
+ Right Nozzle
+
+
+
{{ stats.nozzle_temp|floatformat:1 }}°C
+
target {{ stats.nozzle_target_temp|floatformat:0 }}°C
+ {% if stats.nozzle_type %}· Nozzle {{ stats.nozzle_type }}{% endif %}
@@ -58,7 +78,7 @@
{% else %}
-
+
@@ -74,7 +94,7 @@
-
+
@@ -89,7 +109,7 @@
-
+
@@ -104,7 +124,7 @@
-
+
@@ -184,34 +204,59 @@
{% if stats.filaments %}
-
- {% for filament in stats.filaments %}
-
-
-
-
-
Tray {{ filament.tray_id }}
- {% if filament.filament_pk %}
-
-
-
- {% endif %}
-
-
{{ filament.type }} - {{ filament.brand }}
- {% if filament.color_name %}
{{ filament.color_name }}
{% endif %}
-
- Remaining
- {{ filament.remain_percent }}%
-
-
-
+ {% if stats.ams_units|length > 1 %}
+
+ All
+ {% for unit in stats.ams_units %}
+ {{ unit.ams_type|default:"AMS" }}
+ {% endfor %}
+
+ {% endif %}
+
+ {% for group in stats.ams_groups %}
+
+
+
+ {% for filament in group.filaments %}
+
+
+
+
+
Tray {{ filament.tray_id }}
+ {% if filament.filament_pk %}
+
+
+
+ {% endif %}
+
+
{{ filament.type }} - {{ filament.brand }}
+ {% if filament.color_name %}
{{ filament.color_name }}
{% endif %}
+
+ Remaining
+ {{ filament.remain_percent }}%
+
+
+
+ {% endfor %}
{% endfor %}
+
- {% if stats.external_spool.type %}
+ {% if stats.external_spool.type %}
+
+ {% endif %}
{% else %}
No filament data available
{% endif %}
@@ -237,6 +282,64 @@
+
+ {% if stats.hotends or stats.nozzle_positions %}
+
+
+
+
+
+
+ {% for hotend in stats.hotends %}
+
+
+
+
+
+ {% if hotend.is_toolhead %}Toolhead{% elif hotend.slot_number %}Slot {{ hotend.slot_number }}{% else %}Rack{% endif %}
+
+ {% if hotend.is_toolhead %}Toolhead {% endif %}
+
+
SN {{ hotend.serial_number }}
+
{{ hotend.nozzle_type }} {% if hotend.diameter %} · {{ hotend.diameter }}mm{% endif %}
+ {% if hotend.last_filament_profile_id %}
Last: {{ hotend.last_filament_profile_id }}
{% endif %}
+
+ Used time
+ {{ hotend.used_time_display }}
+
+
+ Wear
+ {{ hotend.wear_percent|floatformat:0 }}%
+
+
+
+
+
+ {% endfor %}
+ {% for nozzle in stats.nozzle_positions %}
+
+
+
+
+
{% if nozzle.is_toolhead %}Toolhead{% else %}Fixed Nozzle{% endif %}
+
+
{{ nozzle.nozzle_type }} {% if nozzle.diameter %} · {{ nozzle.diameter }}mm{% endif %}
+
No induction chip data
+
+
+
+ {% endfor %}
+
+
+
+
+
+ {% endif %}
+
{% if not is_basic_user %}
@@ -423,12 +526,12 @@
-{% if not is_basic_user %}
-
+{% if not is_basic_user and printer_device %}
+
{% endif %}
+
{% endblock %}
diff --git a/bambu_run/urls.py b/bambu_run/urls.py
index 4955e3a..de412de 100644
--- a/bambu_run/urls.py
+++ b/bambu_run/urls.py
@@ -5,7 +5,9 @@ app_name = "bambu_run"
urlpatterns = [
path("", views.PrinterDashboardView.as_view(), name="printer_dashboard"),
+ path("printer/
/", views.PrinterDashboardView.as_view(), name="printer_dashboard"),
path("api/printer/", views.PrinterDataAPIView.as_view(), name="printer_api"),
+ path("api/printer//", views.PrinterDataAPIView.as_view(), name="printer_api"),
# Filament Inventory routes
path("filaments/", views.FilamentListView.as_view(), name="filament_list"),
diff --git a/bambu_run/views.py b/bambu_run/views.py
index fb2991b..414d5ba 100644
--- a/bambu_run/views.py
+++ b/bambu_run/views.py
@@ -1,8 +1,9 @@
from datetime import timedelta, datetime
from django.views.generic import TemplateView, View, ListView, CreateView, UpdateView, DetailView, DeleteView
from django.contrib.auth.mixins import LoginRequiredMixin
+from django.shortcuts import get_object_or_404
from django.utils import timezone
-from django.http import JsonResponse
+from django.http import Http404, JsonResponse
from django.urls import reverse_lazy
from django.contrib import messages
from django.db.models import Q, Sum
@@ -10,7 +11,7 @@ import json
import zoneinfo
from .conf import app_settings
-from .models import Printer, PrinterMetrics, Filament, FilamentColor, FilamentType, FilamentSnapshot, PrintJob, FilamentUsage
+from .models import Printer, PrinterMetrics, Filament, FilamentColor, FilamentType, FilamentSnapshot, PrintJob, FilamentUsage, Hotend
from .forms import FilamentForm, FilamentColorForm, FilamentTypeForm
_METRICS_API_FIELDS = [
@@ -26,6 +27,17 @@ _METRICS_API_FIELDS = [
_MAX_CHART_POINTS = 3000
+def resolve_printer_from_request(pk):
+ """Resolve which Printer a dashboard/API view should show.
+
+ `pk` given (URL kwarg) -> that exact printer, 404 if missing/inactive.
+ `pk` omitted -> first active printer (today's single-printer default behavior).
+ """
+ if pk is not None:
+ return get_object_or_404(Printer, pk=pk, is_active=True)
+ return Printer.objects.filter(is_active=True).first()
+
+
class PrinterDashboardView(LoginRequiredMixin, TemplateView):
template_name = "bambu_run/printer_dashboard.html"
@@ -38,13 +50,20 @@ class PrinterDashboardView(LoginRequiredMixin, TemplateView):
context = super().get_context_data(**kwargs)
context['bambu_run_base_template'] = app_settings.BASE_TEMPLATE
+ all_printers = Printer.objects.filter(is_active=True)
+ context["all_printers"] = all_printers
+ # Shown even with a single printer — hints that multi-printer support exists.
+ context["show_printer_switcher"] = all_printers.exists()
+
try:
- printer_device = Printer.objects.filter(is_active=True).first()
+ printer_device = resolve_printer_from_request(self.kwargs.get("pk"))
if not printer_device:
context["error"] = (
"No 3D printer device found. Please run bambu_collector first."
)
return context
+ except Http404:
+ raise
except Exception as e:
context["error"] = f"Error loading printer device: {str(e)}"
return context
@@ -129,6 +148,8 @@ class PrinterDashboardView(LoginRequiredMixin, TemplateView):
'brand': snapshot.sub_type or 'Unknown',
'color': snapshot.color or 'FFFFFFFF',
'remain_percent': snapshot.remain_percent or 0,
+ 'ams_unit_id': snapshot.ams_unit_id,
+ 'ams_type': snapshot.ams_type or '',
}
if snapshot.filament:
filament_dict['color_name'] = snapshot.filament.color
@@ -138,6 +159,37 @@ class PrinterDashboardView(LoginRequiredMixin, TemplateView):
except Exception:
filaments_list = []
+ # Distinct AMS units represented in this snapshot, for the unit
+ # filter/badges in the template. Sort numeric unit ids first
+ # (AMS / AMS 2 Pro), HT (id 128 / bit 0x80 set) last.
+ seen_units = {}
+ for f in filaments_list:
+ uid = f.get('ams_unit_id')
+ if uid is not None and uid not in seen_units:
+ seen_units[uid] = f.get('ams_type') or ''
+ ams_units_list = [
+ {'ams_unit_id': uid, 'ams_type': label}
+ for uid, label in sorted(seen_units.items())
+ ]
+
+ # Group trays by physical AMS unit for the panel-style dashboard layout —
+ # one tinted panel per unit, full-width for multi-slot units (AMS/AMS 2 Pro),
+ # compact for single-slot units (AMS HT) so several can flow side-by-side.
+ units_meta = {
+ u.get('unit_id'): u for u in (latest_metric.ams_units or [])
+ }
+ ams_groups = []
+ for uid, label in sorted(seen_units.items()):
+ unit_meta = units_meta.get(str(uid), {})
+ ams_groups.append({
+ 'unit_id': uid,
+ 'ams_type': label,
+ 'label': f"{label or 'AMS'} (Unit {uid})",
+ 'humidity': unit_meta.get('humidity'),
+ 'temp': unit_meta.get('temp'),
+ 'filaments': [f for f in filaments_list if f.get('ams_unit_id') == uid],
+ })
+
subtask_name = latest_metric.subtask_name or "No active print"
# Look up active PrintJob for a better display name (cloud design_title)
job_display_name = subtask_name
@@ -177,6 +229,21 @@ class PrinterDashboardView(LoginRequiredMixin, TemplateView):
"ams_temp": float(latest_metric.ams_temp) if latest_metric.ams_temp else None,
"ams_humidity": latest_metric.ams_humidity,
"filaments": filaments_list,
+ "ams_units": ams_units_list,
+ "ams_groups": ams_groups,
+ "hotends": list(
+ Hotend.objects.filter(printer=printer_device)
+ .order_by('-is_toolhead', 'slot_number', 'serial_number')
+ ),
+ # Nozzle positions with no induction chip (no stable serial number to
+ # key a Hotend registry row on, e.g. H2C's fixed left nozzle) — shown
+ # read-only from the latest poll, not persisted/historical. Entries with
+ # no readable type/diameter at all (i.e. genuinely nothing there) are
+ # dropped rather than shown as an empty placeholder.
+ "nozzle_positions": [
+ h for h in (latest_metric.nozzle_info or [])
+ if h.get('is_empty') and (h.get('nozzle_type') or h.get('diameter'))
+ ],
"external_spool": latest_metric.external_spool or {},
"timestamp": latest_metric.timestamp.astimezone(tz).strftime("%Y-%m-%d %H:%M:%S"),
}
@@ -259,15 +326,19 @@ class PrinterDashboardView(LoginRequiredMixin, TemplateView):
for snapshot in snapshots:
tray_id = snapshot.tray_id
+ ams_unit_id = snapshot.ams_unit_id
+ ams_type = snapshot.ams_type or ''
fil_type = snapshot.type or 'Unknown'
fil_sub_type = snapshot.sub_type or 'Unknown'
fil_color = snapshot.color or 'FFFFFFFF'
- unique_key = f"{tray_id}_{fil_type}_{fil_sub_type}_{fil_color}"
+ unique_key = f"{ams_unit_id}_{tray_id}_{fil_type}_{fil_sub_type}_{fil_color}"
if unique_key not in filament_data:
filament_data[unique_key] = {
'tray_id': tray_id,
+ 'ams_unit_id': ams_unit_id,
+ 'ams_type': ams_type,
'type': fil_type,
'brand': fil_sub_type,
'color': fil_color,
@@ -304,16 +375,21 @@ class PrinterDashboardView(LoginRequiredMixin, TemplateView):
class PrinterDataAPIView(LoginRequiredMixin, View):
"""API endpoint for dynamic printer chart updates"""
- def get(self, request):
+ def get(self, request, pk=None):
start_date = request.GET.get("start_date")
end_date = request.GET.get("end_date")
start_time = request.GET.get("start_time", "00:00")
end_time = request.GET.get("end_time", "23:59")
try:
- printer_device = Printer.objects.filter(is_active=True).first()
- if not printer_device:
- return JsonResponse({"error": "No printer device found"}, status=404)
+ if pk is not None:
+ printer_device = Printer.objects.filter(pk=pk, is_active=True).first()
+ if not printer_device:
+ return JsonResponse({"error": "Printer not found"}, status=404)
+ else:
+ printer_device = Printer.objects.filter(is_active=True).first()
+ if not printer_device:
+ return JsonResponse({"error": "No printer device found"}, status=404)
tz = zoneinfo.ZoneInfo(app_settings.TIMEZONE)
diff --git a/pyproject.toml b/pyproject.toml
index ccd00c9..a0648d8 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -62,3 +62,7 @@ bambu_run = [
"templates/bambu_run/*.html",
"static/bambu_run/**/*",
]
+
+[tool.pytest.ini_options]
+DJANGO_SETTINGS_MODULE = "tests.settings"
+python_files = "test_*.py"
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/settings.py b/tests/settings.py
new file mode 100644
index 0000000..3855817
--- /dev/null
+++ b/tests/settings.py
@@ -0,0 +1,40 @@
+"""Minimal Django settings for running bambu_run's pytest suite (in-memory SQLite)."""
+
+SECRET_KEY = "test-secret-key"
+
+INSTALLED_APPS = [
+ "django.contrib.auth",
+ "django.contrib.contenttypes",
+ "django.contrib.sessions",
+ "bambu_run",
+]
+
+MIDDLEWARE = [
+ "django.contrib.sessions.middleware.SessionMiddleware",
+ "django.contrib.auth.middleware.AuthenticationMiddleware",
+]
+
+TEMPLATES = [
+ {
+ "BACKEND": "django.template.backends.django.DjangoTemplates",
+ "APP_DIRS": True,
+ "OPTIONS": {
+ "context_processors": [
+ "django.template.context_processors.request",
+ "django.contrib.auth.context_processors.auth",
+ ],
+ },
+ },
+]
+
+DATABASES = {
+ "default": {
+ "ENGINE": "django.db.backends.sqlite3",
+ "NAME": ":memory:",
+ }
+}
+
+USE_TZ = True
+DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField"
+
+ROOT_URLCONF = "tests.urls"
diff --git a/tests/test_ams_type_from_info.py b/tests/test_ams_type_from_info.py
new file mode 100644
index 0000000..1b7b8da
--- /dev/null
+++ b/tests/test_ams_type_from_info.py
@@ -0,0 +1,25 @@
+import pytest
+
+from bambu_run.models import ams_type_from_info
+
+
+@pytest.mark.parametrize(
+ "info_code,expected",
+ [
+ # Real-world 8-char info codes captured from a live H2C with
+ # AMS 2 Pro (unit 0), AMS (unit 1), AMS HT (unit 128).
+ ("10001003", "AMS 2 Pro"),
+ ("10001001", "AMS"),
+ ("11002104", "AMS HT"),
+ # Bare 4-digit codes (original assumption) still resolve.
+ ("1001", "AMS"),
+ ("1003", "AMS 2 Pro"),
+ ("2104", "AMS HT"),
+ # Unknown/missing codes resolve to empty string, not an error.
+ ("99999999", ""),
+ ("", ""),
+ (None, ""),
+ ],
+)
+def test_ams_type_from_info(info_code, expected):
+ assert ams_type_from_info(info_code) == expected
diff --git a/tests/test_diagnostics.py b/tests/test_diagnostics.py
new file mode 100644
index 0000000..81bdc9f
--- /dev/null
+++ b/tests/test_diagnostics.py
@@ -0,0 +1,68 @@
+import pytest
+
+from bambu_run.diagnostics import redact_diagnostics, build_diagnostics_report
+
+
+def test_redacts_password_and_token_like_keys():
+ data = {"BAMBU_PASSWORD": "hunter2", "access_token": "abc123", "ok": "fine"}
+
+ redacted = redact_diagnostics(data)
+
+ assert redacted["BAMBU_PASSWORD"] == "***REDACTED***"
+ assert redacted["access_token"] == "***REDACTED***"
+ assert redacted["ok"] == "fine"
+
+
+def test_masks_known_identifier_keys_partially():
+ data = {"dev_id": "31B8BP592601478", "tray_uuid": "EE37828FA8844DE1AB12"}
+
+ redacted = redact_diagnostics(data)
+
+ assert redacted["dev_id"] == "31B8...1478"
+ assert redacted["tray_uuid"] == "EE37...AB12"
+
+
+def test_short_identifier_values_fully_masked():
+ data = {"dev_id": "short"}
+
+ redacted = redact_diagnostics(data)
+
+ assert redacted["dev_id"] == "***"
+
+
+def test_recurses_into_nested_structures():
+ data = {"devices": [{"dev_id": "31B8BP592601478", "name": "RNL-H2C"}]}
+
+ redacted = redact_diagnostics(data)
+
+ assert redacted["devices"][0]["dev_id"] == "31B8...1478"
+ assert redacted["devices"][0]["name"] == "RNL-H2C"
+
+
+def test_no_redact_passthrough_keeps_original_values():
+ data = {"dev_id": "31B8BP592601478", "BAMBU_PASSWORD": "hunter2"}
+
+ result = redact_diagnostics(data, redact=False)
+
+ assert result == data
+
+
+def test_build_diagnostics_report_structure():
+ devices = [{"dev_id": "SERIAL-A", "name": "Printer A", "dev_product_name": "H2C"}]
+ raw_payloads = {"SERIAL-A": {"device": {"extruder": {"info": []}}}}
+
+ report = build_diagnostics_report(devices, raw_payloads)
+
+ assert report["device_count"] == 1
+ assert "generated_at" in report
+ assert report["devices"][0]["device_info"]["dev_id"] == "SERIAL-A"
+ assert report["devices"][0]["raw_mqtt_payload"] == {"device": {"extruder": {"info": []}}}
+
+
+def test_build_diagnostics_report_handles_missing_payload():
+ devices = [{"dev_id": "SERIAL-A", "name": "Printer A"}]
+
+ report = build_diagnostics_report(devices, raw_payloads={})
+
+ assert report["devices"][0]["raw_mqtt_payload"] is None
+ assert report["devices"][0]["note"] == "No MQTT data received within the listen window."
diff --git a/tests/test_filament_context.py b/tests/test_filament_context.py
new file mode 100644
index 0000000..c4f9cd0
--- /dev/null
+++ b/tests/test_filament_context.py
@@ -0,0 +1,180 @@
+import pytest
+from decimal import Decimal
+from django.urls import reverse
+from django.utils import timezone
+
+from bambu_run.models import Printer, PrinterMetrics, FilamentSnapshot
+
+
+@pytest.fixture
+def logged_in_client(client, django_user_model):
+ user = django_user_model.objects.create_user(username="tester", password="pw")
+ client.force_login(user)
+ return client
+
+
+@pytest.mark.django_db
+def test_dashboard_filaments_carry_ams_unit_info(logged_in_client):
+ printer = Printer.objects.create(name="Printer A", model="H2C", is_active=True)
+ metric = PrinterMetrics.objects.create(device=printer, timestamp=timezone.now())
+ FilamentSnapshot.objects.create(
+ printer_metric=metric, tray_id=0, ams_unit_id=0, ams_type="AMS",
+ type="PLA", remain_percent=80,
+ )
+ FilamentSnapshot.objects.create(
+ printer_metric=metric, tray_id=0, ams_unit_id=128, ams_type="AMS HT",
+ type="PA-CF", remain_percent=50,
+ )
+
+ resp = logged_in_client.get(
+ reverse("bambu_run:printer_dashboard", kwargs={"pk": printer.pk})
+ )
+
+ filaments = resp.context["stats"]["filaments"]
+ assert len(filaments) == 2
+ units = {(f["ams_unit_id"], f["ams_type"]) for f in filaments}
+ assert units == {(0, "AMS"), (128, "AMS HT")}
+
+ ams_units = resp.context["stats"]["ams_units"]
+ assert ams_units == [
+ {"ams_unit_id": 0, "ams_type": "AMS"},
+ {"ams_unit_id": 128, "ams_type": "AMS HT"},
+ ]
+
+
+@pytest.mark.django_db
+def test_filament_timeline_keeps_same_tray_id_units_separate(logged_in_client):
+ from bambu_run.views import PrinterDashboardView
+
+ printer = Printer.objects.create(name="Printer A", model="H2C", is_active=True)
+ metric = PrinterMetrics.objects.create(device=printer, timestamp=timezone.now())
+ FilamentSnapshot.objects.create(
+ printer_metric=metric, tray_id=0, ams_unit_id=0, ams_type="AMS",
+ type="PLA", sub_type="PLA Basic", color="FF0000", remain_percent=80,
+ )
+ FilamentSnapshot.objects.create(
+ printer_metric=metric, tray_id=0, ams_unit_id=128, ams_type="AMS HT",
+ type="PLA", sub_type="PLA Basic", color="FF0000", remain_percent=50,
+ )
+
+ view = PrinterDashboardView()
+ timeline = view._prepare_filament_timeline(PrinterMetrics.objects.filter(pk=metric.pk))
+
+ assert len(timeline) == 2
+
+
+@pytest.mark.django_db
+def test_dashboard_renders_unit_pills_and_badges_with_multiple_units(logged_in_client):
+ printer = Printer.objects.create(name="Printer A", model="H2C", is_active=True)
+ metric = PrinterMetrics.objects.create(device=printer, timestamp=timezone.now())
+ FilamentSnapshot.objects.create(
+ printer_metric=metric, tray_id=0, ams_unit_id=0, ams_type="AMS",
+ type="PLA", color="FF0000FF", remain_percent=80,
+ )
+ FilamentSnapshot.objects.create(
+ printer_metric=metric, tray_id=0, ams_unit_id=128, ams_type="AMS HT",
+ type="PA-CF", color="00FF00FF", remain_percent=50,
+ )
+
+ resp = logged_in_client.get(
+ reverse("bambu_run:printer_dashboard", kwargs={"pk": printer.pk})
+ )
+
+ assert resp.status_code == 200
+ html = resp.content.decode()
+ assert "ams-filter-pills" in html
+ assert "ams-badge-ams" in html
+ assert "ams-badge-ams-ht" in html
+ assert 'data-ams-unit-id="0"' in html
+ assert 'data-ams-unit-id="128"' in html
+
+
+@pytest.mark.django_db
+def test_dashboard_groups_filaments_by_ams_unit(logged_in_client):
+ printer = Printer.objects.create(name="Printer A", model="H2C", is_active=True)
+ metric = PrinterMetrics.objects.create(
+ device=printer, timestamp=timezone.now(),
+ ams_units=[
+ {"unit_id": "0", "ams_type": "AMS 2 Pro", "humidity": 5, "temp": 22.5},
+ {"unit_id": "128", "ams_type": "AMS HT", "humidity": 8, "temp": 60.0},
+ ],
+ )
+ FilamentSnapshot.objects.create(
+ printer_metric=metric, tray_id=0, ams_unit_id=0, ams_type="AMS 2 Pro",
+ type="ABS", remain_percent=80,
+ )
+ FilamentSnapshot.objects.create(
+ printer_metric=metric, tray_id=1, ams_unit_id=0, ams_type="AMS 2 Pro",
+ type="ABS", remain_percent=60,
+ )
+ FilamentSnapshot.objects.create(
+ printer_metric=metric, tray_id=0, ams_unit_id=128, ams_type="AMS HT",
+ type="PA-CF", remain_percent=50,
+ )
+
+ resp = logged_in_client.get(
+ reverse("bambu_run:printer_dashboard", kwargs={"pk": printer.pk})
+ )
+
+ groups = resp.context["stats"]["ams_groups"]
+ assert len(groups) == 2
+
+ ams2pro_group, ht_group = groups
+ assert ams2pro_group["unit_id"] == 0
+ assert ams2pro_group["label"] == "AMS 2 Pro (Unit 0)"
+ assert ams2pro_group["humidity"] == 5
+ assert ams2pro_group["temp"] == 22.5
+ assert len(ams2pro_group["filaments"]) == 2
+
+ assert ht_group["unit_id"] == 128
+ assert ht_group["label"] == "AMS HT (Unit 128)"
+ assert ht_group["humidity"] == 8
+ assert len(ht_group["filaments"]) == 1
+
+
+@pytest.mark.django_db
+def test_dashboard_renders_wide_and_compact_panels(logged_in_client):
+ printer = Printer.objects.create(name="Printer A", model="H2C", is_active=True)
+ metric = PrinterMetrics.objects.create(
+ device=printer, timestamp=timezone.now(),
+ ams_units=[
+ {"unit_id": "0", "ams_type": "AMS 2 Pro", "humidity": 5, "temp": 22.5},
+ {"unit_id": "128", "ams_type": "AMS HT", "humidity": 8, "temp": 60.0},
+ ],
+ )
+ for tray_id in range(4):
+ FilamentSnapshot.objects.create(
+ printer_metric=metric, tray_id=tray_id, ams_unit_id=0, ams_type="AMS 2 Pro",
+ type="ABS", remain_percent=80,
+ )
+ FilamentSnapshot.objects.create(
+ printer_metric=metric, tray_id=0, ams_unit_id=128, ams_type="AMS HT",
+ type="PA-CF", remain_percent=50,
+ )
+
+ resp = logged_in_client.get(
+ reverse("bambu_run:printer_dashboard", kwargs={"pk": printer.pk})
+ )
+
+ html = resp.content.decode()
+ assert "ams-group--wide" in html
+ assert "ams-group--compact" in html
+ assert "AMS 2 Pro (Unit 0)" in html
+ assert "AMS HT (Unit 128)" in html
+
+
+@pytest.mark.django_db
+def test_dashboard_hides_unit_pills_with_single_unit(logged_in_client):
+ printer = Printer.objects.create(name="Printer A", model="H2C", is_active=True)
+ metric = PrinterMetrics.objects.create(device=printer, timestamp=timezone.now())
+ FilamentSnapshot.objects.create(
+ printer_metric=metric, tray_id=0, ams_unit_id=0, ams_type="AMS",
+ type="PLA", color="FF0000FF", remain_percent=80,
+ )
+
+ resp = logged_in_client.get(
+ reverse("bambu_run:printer_dashboard", kwargs={"pk": printer.pk})
+ )
+
+ assert resp.status_code == 200
+ assert "ams-filter-pills" not in resp.content.decode()
diff --git a/tests/test_hotend_collection.py b/tests/test_hotend_collection.py
new file mode 100644
index 0000000..28ffda5
--- /dev/null
+++ b/tests/test_hotend_collection.py
@@ -0,0 +1,121 @@
+import pytest
+
+from bambu_run.management.commands.bambu_collector import Command, DeviceSession, resolve_printer_device
+from bambu_run.models import Hotend, HotendSnapshot, PrinterMetrics
+
+
+class FakeClient:
+ """Stub in place of BambuPrinter — returns canned snapshots, no real MQTT."""
+
+ def __init__(self, snapshots):
+ self._snapshots = snapshots
+ self._index = 0
+ self._client = None
+
+ def get_snapshot(self):
+ snap = self._snapshots[min(self._index, len(self._snapshots) - 1)]
+ self._index += 1
+ return snap
+
+
+def make_session(device_id, name, snapshots):
+ printer = resolve_printer_device(device_id, {"name": name, "dev_product_name": "H2C"})
+ return DeviceSession(device_id=device_id, client=FakeClient(snapshots), printer=printer)
+
+
+def hotends_snapshot(used_time=11472, wear=100.0):
+ return {
+ "gcode_state": "IDLE",
+ "hotends": [
+ {
+ "raw_id": 21, "serial_number": "20D06A5B2918952", "nozzle_type": "HS01",
+ "diameter": 0.4, "fila_id": "GFA01", "color": "FFFFFF",
+ "used_time_seconds": used_time, "wear_percent": wear, "stat": 0,
+ "is_toolhead": False, "is_empty": False, "slot_number": 6,
+ },
+ {
+ "raw_id": 1, "serial_number": "N/A", "nozzle_type": "HS01",
+ "diameter": 0.4, "fila_id": "", "color": None,
+ "used_time_seconds": 0, "wear_percent": 0.0, "stat": 0,
+ "is_toolhead": False, "is_empty": True, "slot_number": None,
+ },
+ {
+ "raw_id": 0, "serial_number": "20D06A5C0426280", "nozzle_type": "HS01",
+ "diameter": 0.4, "fila_id": "GFA00", "color": "FEC600",
+ "used_time_seconds": 93490, "wear_percent": 100.0, "stat": 0,
+ "is_toolhead": True, "is_empty": False, "slot_number": None,
+ },
+ ],
+ }
+
+
+@pytest.mark.django_db
+def test_first_poll_creates_one_hotend_per_non_empty_entry():
+ session = make_session("SERIAL-A", "Printer A", [hotends_snapshot()])
+
+ cmd = Command()
+ cmd.verbose = False
+ cmd._collect_printer_data(session)
+
+ hotends = Hotend.objects.filter(printer=session.printer)
+ assert hotends.count() == 2 # empty bay (sn="N/A") skipped
+
+ rack = hotends.get(serial_number="20D06A5B2918952")
+ assert rack.raw_id == 21
+ assert rack.slot_number == 6
+ assert rack.is_toolhead is False
+ assert rack.used_time_seconds == 11472
+ assert rack.wear_percent == 100.0
+ assert rack.nozzle_type == "HS01"
+ assert rack.last_filament_profile_id == "GFA01"
+ assert rack.last_color == "FFFFFF"
+
+ toolhead = hotends.get(serial_number="20D06A5C0426280")
+ assert toolhead.is_toolhead is True
+ assert toolhead.slot_number is None
+
+
+@pytest.mark.django_db
+def test_first_poll_creates_one_snapshot_per_non_empty_hotend():
+ session = make_session("SERIAL-A", "Printer A", [hotends_snapshot()])
+
+ cmd = Command()
+ cmd.verbose = False
+ cmd._collect_printer_data(session)
+
+ metric = PrinterMetrics.objects.get(device=session.printer)
+ assert HotendSnapshot.objects.filter(printer_metric=metric).count() == 2
+
+
+@pytest.mark.django_db
+def test_collector_persists_raw_nozzle_info_including_non_inductive_entries():
+ session = make_session("SERIAL-A", "Printer A", [hotends_snapshot()])
+
+ cmd = Command()
+ cmd.verbose = False
+ cmd._collect_printer_data(session)
+
+ metric = PrinterMetrics.objects.get(device=session.printer)
+ assert len(metric.nozzle_info) == 3 # all entries, including the empty/non-inductive one
+ serials = {h["serial_number"] for h in metric.nozzle_info}
+ assert serials == {"20D06A5B2918952", "N/A", "20D06A5C0426280"}
+
+
+@pytest.mark.django_db
+def test_second_poll_updates_existing_hotend_instead_of_duplicating():
+ session = make_session(
+ "SERIAL-A", "Printer A",
+ [hotends_snapshot(used_time=11472, wear=100.0), hotends_snapshot(used_time=11500, wear=100.0)],
+ )
+
+ cmd = Command()
+ cmd.verbose = False
+ cmd._collect_printer_data(session)
+ cmd._collect_printer_data(session)
+
+ hotends = Hotend.objects.filter(printer=session.printer, serial_number="20D06A5B2918952")
+ assert hotends.count() == 1
+ assert hotends.first().used_time_seconds == 11500
+
+ snapshots = HotendSnapshot.objects.filter(hotend=hotends.first())
+ assert snapshots.count() == 2
diff --git a/tests/test_hotend_dashboard.py b/tests/test_hotend_dashboard.py
new file mode 100644
index 0000000..3043721
--- /dev/null
+++ b/tests/test_hotend_dashboard.py
@@ -0,0 +1,128 @@
+import pytest
+from django.urls import reverse
+from django.utils import timezone
+
+from bambu_run.models import Printer, PrinterMetrics, Hotend
+
+
+@pytest.fixture
+def logged_in_client(client, django_user_model):
+ user = django_user_model.objects.create_user(username="tester", password="pw")
+ client.force_login(user)
+ return client
+
+
+@pytest.mark.django_db
+def test_dashboard_context_includes_hotends_toolhead_first(logged_in_client):
+ printer = Printer.objects.create(name="Printer A", model="H2C", is_active=True)
+ PrinterMetrics.objects.create(device=printer, timestamp=timezone.now())
+
+ Hotend.objects.create(
+ printer=printer, serial_number="RACK-SN", raw_id=16, slot_number=1,
+ is_toolhead=False, nozzle_type="HS01", used_time_seconds=3600, wear_percent=50,
+ )
+ Hotend.objects.create(
+ printer=printer, serial_number="TOOLHEAD-SN", raw_id=0, slot_number=None,
+ is_toolhead=True, nozzle_type="HS01", used_time_seconds=7200, wear_percent=80,
+ )
+
+ resp = logged_in_client.get(
+ reverse("bambu_run:printer_dashboard", kwargs={"pk": printer.pk})
+ )
+
+ hotends = resp.context["stats"]["hotends"]
+ assert len(hotends) == 2
+ assert hotends[0].serial_number == "TOOLHEAD-SN"
+ assert hotends[1].serial_number == "RACK-SN"
+
+
+@pytest.mark.django_db
+def test_dashboard_context_includes_non_inductive_nozzle_positions(logged_in_client):
+ printer = Printer.objects.create(name="Printer A", model="H2C", is_active=True)
+ PrinterMetrics.objects.create(
+ device=printer, timestamp=timezone.now(),
+ nozzle_info=[
+ {
+ "raw_id": 1, "serial_number": "N/A", "nozzle_type": "HS01", "diameter": 0.4,
+ "fila_id": "", "color": None, "used_time_seconds": 0, "wear_percent": 0.0,
+ "stat": 0, "is_toolhead": False, "is_empty": True, "slot_number": None,
+ },
+ ],
+ )
+
+ resp = logged_in_client.get(
+ reverse("bambu_run:printer_dashboard", kwargs={"pk": printer.pk})
+ )
+
+ positions = resp.context["stats"]["nozzle_positions"]
+ assert len(positions) == 1
+ assert positions[0]["nozzle_type"] == "HS01"
+
+
+@pytest.mark.django_db
+def test_dashboard_omits_nozzle_positions_with_no_readable_data(logged_in_client):
+ printer = Printer.objects.create(name="Printer A", model="H2C", is_active=True)
+ PrinterMetrics.objects.create(
+ device=printer, timestamp=timezone.now(),
+ nozzle_info=[
+ {
+ "raw_id": 1, "serial_number": "N/A", "nozzle_type": "", "diameter": 0,
+ "fila_id": "", "color": None, "used_time_seconds": 0, "wear_percent": 0.0,
+ "stat": 0, "is_toolhead": False, "is_empty": True, "slot_number": None,
+ },
+ ],
+ )
+
+ resp = logged_in_client.get(
+ reverse("bambu_run:printer_dashboard", kwargs={"pk": printer.pk})
+ )
+
+ assert resp.context["stats"]["nozzle_positions"] == []
+ assert "Hotends " not in resp.content.decode()
+
+
+@pytest.mark.django_db
+def test_dashboard_renders_nozzle_position_without_serial_or_wear(logged_in_client):
+ printer = Printer.objects.create(name="Printer A", model="H2C", is_active=True)
+ PrinterMetrics.objects.create(
+ device=printer, timestamp=timezone.now(),
+ nozzle_info=[
+ {
+ "raw_id": 1, "serial_number": "N/A", "nozzle_type": "HS01", "diameter": 0.4,
+ "fila_id": "", "color": None, "used_time_seconds": 0, "wear_percent": 0.0,
+ "stat": 0, "is_toolhead": False, "is_empty": True, "slot_number": None,
+ },
+ ],
+ )
+
+ resp = logged_in_client.get(
+ reverse("bambu_run:printer_dashboard", kwargs={"pk": printer.pk})
+ )
+
+ html = resp.content.decode()
+ assert "Hotends" in html
+ assert "HS01" in html
+ assert "SN: N/A" not in html
+ assert "SN N/A" not in html
+
+
+@pytest.mark.django_db
+def test_dashboard_renders_hotends_card(logged_in_client):
+ printer = Printer.objects.create(name="Printer A", model="H2C", is_active=True)
+ PrinterMetrics.objects.create(device=printer, timestamp=timezone.now())
+
+ Hotend.objects.create(
+ printer=printer, serial_number="RACK-SN", raw_id=18, slot_number=3,
+ is_toolhead=False, nozzle_type="HS01", diameter=0.4,
+ used_time_seconds=3661, wear_percent=50, last_filament_profile_id="GFA01",
+ last_color="DE4343",
+ )
+
+ resp = logged_in_client.get(
+ reverse("bambu_run:printer_dashboard", kwargs={"pk": printer.pk})
+ )
+
+ html = resp.content.decode()
+ assert "Hotends" in html
+ assert "RACK-SN" in html
+ assert "Slot 3" in html
diff --git a/tests/test_hotend_parsing.py b/tests/test_hotend_parsing.py
new file mode 100644
index 0000000..e837385
--- /dev/null
+++ b/tests/test_hotend_parsing.py
@@ -0,0 +1,100 @@
+from bambu_run.mqtt_client import PrinterState
+
+
+def real_nozzle_payload():
+ """Real captured device.nozzle payload from a live H2C with a Vortek rack
+ (1x AMS, 1x AMS 2 Pro, 1x AMS HT physically connected — unrelated here).
+ SN/used-time cross-checked against the user's Bambu Studio Hotends Info table."""
+ return {
+ "exist": 3997699,
+ "src_id": 17,
+ "tar_id": 17,
+ "state": 0,
+ "info": [
+ {"id": 21, "sn": "20D06A5B2918952", "type": "HS01", "diameter": 0.4,
+ "fila_id": "GFA01", "color_m": "FFFFFFFF", "p_t": 11472, "wear": 128.0, "stat": 0, "tm": 350},
+ {"id": 1, "sn": "N/A", "type": "HS01", "diameter": 0.4,
+ "fila_id": "", "color_m": "00000000", "p_t": 0, "wear": 0.0, "stat": 0, "tm": 0},
+ {"id": 16, "sn": "20D06A5B2919219", "type": "HS01", "diameter": 0.4,
+ "fila_id": "GFA01", "color_m": "A3D8E1FF", "p_t": 105386, "wear": 128.0, "stat": 0, "tm": 350},
+ {"id": 20, "sn": "20D06A590610257", "type": "HS01", "diameter": 0.4,
+ "fila_id": "GFG01", "color_m": "00000000", "p_t": 81506, "wear": 128.0, "stat": 0, "tm": 350},
+ {"id": 18, "sn": "20D06A591506263", "type": "HS01", "diameter": 0.4,
+ "fila_id": "GFA01", "color_m": "DE4343FF", "p_t": 30962, "wear": 128.0, "stat": 0, "tm": 350},
+ {"id": 0, "sn": "20D06A5C0426280", "type": "HS01", "diameter": 0.4,
+ "fila_id": "GFA00", "color_m": "FEC600FF", "p_t": 93490, "wear": 128.0, "stat": 0, "tm": 350},
+ {"id": 19, "sn": "20D06A5C0207881", "type": "HS01", "diameter": 0.4,
+ "fila_id": "GFA01", "color_m": "DE4343FF", "p_t": 1430, "wear": 128.0, "stat": 0, "tm": 350},
+ ],
+ }
+
+
+def make_data(nozzle_payload):
+ return {"print": {"gcode_state": "IDLE", "device": {"nozzle": nozzle_payload}}}
+
+
+def test_snapshot_includes_one_hotend_per_nozzle_info_entry():
+ state = PrinterState.from_mqtt_data(make_data(real_nozzle_payload()))
+ snapshot = state.get_snapshot()
+
+ assert len(snapshot["hotends"]) == 7
+
+
+def test_hotend_fields_extracted_correctly():
+ state = PrinterState.from_mqtt_data(make_data(real_nozzle_payload()))
+ snapshot = state.get_snapshot()
+
+ by_sn = {h["serial_number"]: h for h in snapshot["hotends"]}
+ h = by_sn["20D06A5B2919219"]
+
+ assert h["raw_id"] == 16
+ assert h["nozzle_type"] == "HS01"
+ assert h["diameter"] == 0.4
+ assert h["fila_id"] == "GFA01"
+ assert h["color"] == "A3D8E1" # alpha stripped
+ assert h["used_time_seconds"] == 105386
+ assert h["wear_percent"] == 100.0 # 128/128*100
+ assert h["is_empty"] is False
+
+
+def test_id_zero_is_toolhead_and_resolves_slot_number():
+ state = PrinterState.from_mqtt_data(make_data(real_nozzle_payload()))
+ snapshot = state.get_snapshot()
+
+ by_sn = {h["serial_number"]: h for h in snapshot["hotends"]}
+ toolhead = by_sn["20D06A5C0426280"]
+
+ assert toolhead["raw_id"] == 0
+ assert toolhead["is_toolhead"] is True
+ assert toolhead["slot_number"] is None # true bay address hidden while id==0 sentinel
+
+
+def test_rack_bay_ids_resolve_to_slot_numbers_one_through_six():
+ state = PrinterState.from_mqtt_data(make_data(real_nozzle_payload()))
+ snapshot = state.get_snapshot()
+
+ by_sn = {h["serial_number"]: h for h in snapshot["hotends"]}
+
+ assert by_sn["20D06A5B2919219"]["slot_number"] == 1 # raw_id 16
+ assert by_sn["20D06A591506263"]["slot_number"] == 3 # raw_id 18
+ assert by_sn["20D06A5C0207881"]["slot_number"] == 4 # raw_id 19
+ assert by_sn["20D06A590610257"]["slot_number"] == 5 # raw_id 20
+ assert by_sn["20D06A5B2918952"]["slot_number"] == 6 # raw_id 21
+
+
+def test_empty_bay_with_na_serial_is_flagged_empty():
+ state = PrinterState.from_mqtt_data(make_data(real_nozzle_payload()))
+ snapshot = state.get_snapshot()
+
+ by_sn = {h["serial_number"]: h for h in snapshot["hotends"]}
+ empty = by_sn["N/A"]
+
+ assert empty["is_empty"] is True
+ assert empty["is_toolhead"] is False
+
+
+def test_snapshot_hotends_empty_list_when_no_nozzle_payload():
+ state = PrinterState.from_mqtt_data({"print": {"gcode_state": "IDLE"}})
+ snapshot = state.get_snapshot()
+
+ assert snapshot["hotends"] == []
diff --git a/tests/test_multi_ams_collection.py b/tests/test_multi_ams_collection.py
new file mode 100644
index 0000000..df366ae
--- /dev/null
+++ b/tests/test_multi_ams_collection.py
@@ -0,0 +1,104 @@
+import pytest
+
+from bambu_run.management.commands.bambu_collector import Command, DeviceSession, resolve_printer_device
+from bambu_run.models import Filament, FilamentSnapshot, FilamentUsage, PrinterMetrics
+
+
+class FakeClient:
+ """Stub in place of BambuPrinter — returns canned snapshots, no real MQTT."""
+
+ def __init__(self, snapshots):
+ self._snapshots = snapshots
+ self._index = 0
+ self._client = None
+
+ def get_snapshot(self):
+ snap = self._snapshots[min(self._index, len(self._snapshots) - 1)]
+ self._index += 1
+ return snap
+
+
+def make_session(device_id, name, snapshots):
+ printer = resolve_printer_device(device_id, {"name": name, "dev_product_name": "H2C"})
+ return DeviceSession(device_id=device_id, client=FakeClient(snapshots), printer=printer)
+
+
+def two_unit_tray0_snapshot():
+ """Two AMS units (AMS unit_id=0, AMS HT unit_id=128) both report tray_id=0,
+ with different filament types loaded — these must not collide."""
+ return {
+ "gcode_state": "IDLE",
+ "ams_units": [
+ {"unit_id": "0", "ams_type": "AMS", "humidity": 30, "temp": 25.0},
+ {"unit_id": "128", "ams_type": "AMS HT", "humidity": 20, "temp": 60.0},
+ ],
+ "filaments": [
+ {
+ "tray_id": 0, "type": "PLA", "sub_type": "PLA Basic", "color": "FF0000FF",
+ "tray_uuid": "UUID-UNIT0-TRAY0",
+ "remain_percent": 80, "ams_unit_id": 0, "ams_type": "AMS",
+ },
+ {
+ "tray_id": 0, "type": "PA-CF", "sub_type": "PA6-CF", "color": "00FF00FF",
+ "tray_uuid": "UUID-UNIT128-TRAY0",
+ "remain_percent": 50, "ams_unit_id": 128, "ams_type": "AMS HT",
+ },
+ ],
+ }
+
+
+@pytest.mark.django_db
+def test_two_ams_units_with_same_tray_id_create_distinct_snapshots():
+ session = make_session("SERIAL-A", "Printer A", [two_unit_tray0_snapshot()])
+
+ cmd = Command()
+ cmd.verbose = False
+ cmd._collect_printer_data(session)
+
+ metric = PrinterMetrics.objects.get(device=session.printer)
+ snapshots = FilamentSnapshot.objects.filter(printer_metric=metric).order_by("ams_unit_id")
+
+ assert snapshots.count() == 2
+
+ ams_snap, ht_snap = snapshots
+ assert ams_snap.tray_id == 0
+ assert ams_snap.ams_unit_id == 0
+ assert ams_snap.ams_type == "AMS"
+ assert ams_snap.type == "PLA"
+
+ assert ht_snap.tray_id == 0
+ assert ht_snap.ams_unit_id == 128
+ assert ht_snap.ams_type == "AMS HT"
+ assert ht_snap.type == "PA-CF"
+
+
+@pytest.mark.django_db
+def test_filament_usage_matches_correct_unit_when_tray_ids_collide():
+ start_snapshot = two_unit_tray0_snapshot()
+ start_snapshot.update({"gcode_state": "RUNNING", "subtask_name": "job_1", "print_percent": 1, "tray_now": "0"})
+
+ end_snapshot = two_unit_tray0_snapshot()
+ end_snapshot["filaments"][0]["remain_percent"] = 70 # AMS unit 0 consumed
+ end_snapshot["filaments"][1]["remain_percent"] = 50 # AMS HT unit 128 untouched
+ end_snapshot.update({"gcode_state": "FINISH", "subtask_name": "job_1", "print_percent": 100})
+
+ session = make_session("SERIAL-A", "Printer A", [start_snapshot, end_snapshot])
+
+ cmd = Command()
+ cmd.verbose = False
+ cmd._collect_printer_data(session)
+ cmd._collect_printer_data(session)
+
+ usages = FilamentUsage.objects.filter(print_job__device=session.printer).order_by("ams_unit_id")
+ # Both units reported tray_id=0 with a tracked filament loaded throughout the
+ # job — usage is recorded per physical unit, not collapsed into one ambiguous row.
+ assert usages.count() == 2
+
+ ams_usage, ht_usage = usages
+ assert ams_usage.ams_unit_id == 0
+ assert ams_usage.starting_percent == 80
+ assert ams_usage.ending_percent == 70
+
+ assert ht_usage.ams_unit_id == 128
+ assert ht_usage.starting_percent == 50
+ assert ht_usage.ending_percent == 50
diff --git a/tests/test_multi_device_collection.py b/tests/test_multi_device_collection.py
new file mode 100644
index 0000000..9118984
--- /dev/null
+++ b/tests/test_multi_device_collection.py
@@ -0,0 +1,90 @@
+import pytest
+
+from bambu_run.management.commands.bambu_collector import (
+ Command,
+ DeviceSession,
+ resolve_printer_device,
+)
+from bambu_run.models import PrintJob, PrinterMetrics
+
+
+class FakeClient:
+ """Stub in place of BambuPrinter — returns canned snapshots, no real MQTT."""
+
+ def __init__(self, snapshots):
+ self._snapshots = snapshots
+ self._index = 0
+ self._client = None # cloud BambuClient handle used by cloud task sync
+
+ def get_snapshot(self):
+ snap = self._snapshots[min(self._index, len(self._snapshots) - 1)]
+ self._index += 1
+ return snap
+
+
+def make_session(device_id, name, snapshots):
+ printer = resolve_printer_device(device_id, {"name": name, "dev_product_name": "H2C"})
+ return DeviceSession(device_id=device_id, client=FakeClient(snapshots), printer=printer)
+
+
+@pytest.mark.django_db
+def test_collects_metrics_against_the_correct_printer_per_session():
+ session_a = make_session("SERIAL-A", "Printer A", [{"nozzle_temp": 200, "gcode_state": "IDLE"}])
+ session_b = make_session("SERIAL-B", "Printer B", [{"nozzle_temp": 210, "gcode_state": "IDLE"}])
+
+ cmd = Command()
+ cmd.verbose = False
+ cmd._collect_printer_data(session_a)
+ cmd._collect_printer_data(session_b)
+
+ metric_a = PrinterMetrics.objects.get(device=session_a.printer)
+ metric_b = PrinterMetrics.objects.get(device=session_b.printer)
+ assert metric_a.nozzle_temp == 200
+ assert metric_b.nozzle_temp == 210
+
+
+@pytest.mark.django_db
+def test_print_job_tracking_is_isolated_per_session():
+ session_a = make_session(
+ "SERIAL-A",
+ "Printer A",
+ [
+ {"gcode_state": "RUNNING", "subtask_name": "job_A", "print_percent": 10},
+ {"gcode_state": "FINISH", "subtask_name": "job_A", "print_percent": 100},
+ ],
+ )
+ session_b = make_session("SERIAL-B", "Printer B", [{"gcode_state": "IDLE"}])
+
+ cmd = Command()
+ cmd.verbose = False
+ cmd._collect_printer_data(session_a)
+ cmd._collect_printer_data(session_b)
+ cmd._collect_printer_data(session_a)
+
+ assert PrintJob.objects.filter(device=session_a.printer).count() == 1
+ job = PrintJob.objects.get(device=session_a.printer)
+ assert job.final_status == "FINISH"
+ assert session_a.current_print_job is None
+
+ assert PrintJob.objects.filter(device=session_b.printer).count() == 0
+ assert session_b.current_print_job is None
+
+
+@pytest.mark.django_db
+def test_one_session_error_does_not_affect_another_session():
+ session_a = make_session("SERIAL-A", "Printer A", [{"nozzle_temp": 200, "gcode_state": "IDLE"}])
+ session_b = make_session("SERIAL-B", "Printer B", [{"nozzle_temp": 210, "gcode_state": "IDLE"}])
+
+ class ExplodingClient:
+ def get_snapshot(self):
+ raise RuntimeError("MQTT connection lost")
+
+ session_a.client = ExplodingClient()
+
+ cmd = Command()
+ cmd.verbose = False
+ cmd._collect_printer_data(session_a)
+ cmd._collect_printer_data(session_b)
+
+ assert session_a.error_count == 1
+ assert PrinterMetrics.objects.filter(device=session_b.printer).exists()
diff --git a/tests/test_printer_routing.py b/tests/test_printer_routing.py
new file mode 100644
index 0000000..b762675
--- /dev/null
+++ b/tests/test_printer_routing.py
@@ -0,0 +1,78 @@
+import pytest
+from django.urls import reverse
+
+from bambu_run.models import Printer
+
+
+@pytest.fixture
+def logged_in_client(client, django_user_model):
+ user = django_user_model.objects.create_user(username="tester", password="pw")
+ client.force_login(user)
+ return client
+
+
+@pytest.mark.django_db
+def test_dashboard_with_no_printers_shows_error(logged_in_client):
+ resp = logged_in_client.get(reverse("bambu_run:printer_dashboard"))
+ assert resp.status_code == 200
+ assert "error" in resp.context
+
+
+@pytest.mark.django_db
+def test_dashboard_defaults_to_first_active_printer(logged_in_client):
+ printer = Printer.objects.create(name="Only Printer", model="H2C", is_active=True)
+
+ resp = logged_in_client.get(reverse("bambu_run:printer_dashboard"))
+
+ assert resp.context["printer_device"].pk == printer.pk
+ assert list(resp.context["all_printers"]) == [printer]
+ # Switcher shows even with a single printer, as a hint that multi-printer exists.
+ assert resp.context["show_printer_switcher"] is True
+
+
+@pytest.mark.django_db
+def test_dashboard_hides_switcher_with_zero_printers(logged_in_client):
+ resp = logged_in_client.get(reverse("bambu_run:printer_dashboard"))
+
+ assert resp.context["show_printer_switcher"] is False
+
+
+@pytest.mark.django_db
+def test_dashboard_pk_route_shows_requested_printer(logged_in_client):
+ Printer.objects.create(name="Printer A", model="H2C", is_active=True)
+ printer_b = Printer.objects.create(name="Printer B", model="X1C", is_active=True)
+
+ resp = logged_in_client.get(
+ reverse("bambu_run:printer_dashboard", kwargs={"pk": printer_b.pk})
+ )
+
+ assert resp.context["printer_device"].pk == printer_b.pk
+ assert resp.context["device_name"] == "Printer B"
+
+
+@pytest.mark.django_db
+def test_dashboard_unknown_pk_returns_404(logged_in_client):
+ resp = logged_in_client.get(
+ reverse("bambu_run:printer_dashboard", kwargs={"pk": 99999})
+ )
+ assert resp.status_code == 404
+
+
+@pytest.mark.django_db
+def test_api_pk_route_returns_only_requested_printer_data(logged_in_client):
+ from bambu_run.models import PrinterMetrics
+ from django.utils import timezone
+ from decimal import Decimal
+
+ printer_a = Printer.objects.create(name="Printer A", model="H2C", is_active=True)
+ printer_b = Printer.objects.create(name="Printer B", model="X1C", is_active=True)
+ PrinterMetrics.objects.create(device=printer_a, timestamp=timezone.now(), nozzle_temp=Decimal("200"))
+ PrinterMetrics.objects.create(device=printer_b, timestamp=timezone.now(), nozzle_temp=Decimal("210"))
+
+ resp = logged_in_client.get(
+ reverse("bambu_run:printer_api", kwargs={"pk": printer_b.pk})
+ )
+
+ assert resp.status_code == 200
+ data = resp.json()
+ assert data["nozzle_temp"] == [210.0]
diff --git a/tests/test_resolve_printer_device.py b/tests/test_resolve_printer_device.py
new file mode 100644
index 0000000..8fd5651
--- /dev/null
+++ b/tests/test_resolve_printer_device.py
@@ -0,0 +1,78 @@
+import pytest
+
+from bambu_run.management.commands.bambu_collector import resolve_printer_device
+from bambu_run.models import Printer
+
+
+@pytest.mark.django_db
+def test_creates_new_printer_keyed_by_serial():
+ printer = resolve_printer_device(
+ "0309DA123456", {"name": "RNL-H2C", "dev_product_name": "H2C"}
+ )
+
+ assert printer.serial_number == "0309DA123456"
+ assert printer.name == "RNL-H2C"
+ assert printer.model == "H2C"
+ assert printer.is_active is True
+
+
+@pytest.mark.django_db
+def test_second_call_with_same_serial_does_not_create_duplicate():
+ first = resolve_printer_device("SERIAL-A", {"name": "Printer A", "dev_product_name": "H2C"})
+ second = resolve_printer_device("SERIAL-A", {"name": "Printer A", "dev_product_name": "H2C"})
+
+ assert first.pk == second.pk
+ assert Printer.objects.filter(serial_number="SERIAL-A").count() == 1
+
+
+@pytest.mark.django_db
+def test_two_different_serials_create_two_printers():
+ a = resolve_printer_device("SERIAL-A", {"name": "Printer A", "dev_product_name": "H2C"})
+ b = resolve_printer_device("SERIAL-B", {"name": "Printer B", "dev_product_name": "X1C"})
+
+ assert a.pk != b.pk
+ assert Printer.objects.count() == 2
+
+
+@pytest.mark.django_db
+def test_backfills_single_legacy_printer_with_null_serial():
+ legacy = Printer.objects.create(
+ name="Bambu Lab Printer", model="Bambu Lab", manufacturer="Bambu Lab", is_active=True
+ )
+
+ resolved = resolve_printer_device("SERIAL-A", {"name": "RNL-H2C", "dev_product_name": "H2C"})
+
+ legacy.refresh_from_db()
+ assert resolved.pk == legacy.pk
+ assert legacy.serial_number == "SERIAL-A"
+ assert Printer.objects.count() == 1
+
+
+@pytest.mark.django_db
+def test_does_not_guess_when_multiple_legacy_printers_exist():
+ Printer.objects.create(name="Legacy 1", model="Bambu Lab")
+ Printer.objects.create(name="Legacy 2", model="Bambu Lab")
+
+ resolved = resolve_printer_device("SERIAL-A", {"name": "RNL-H2C", "dev_product_name": "H2C"})
+
+ assert resolved.serial_number == "SERIAL-A"
+ assert Printer.objects.count() == 3
+
+
+@pytest.mark.django_db
+def test_falls_back_to_generic_defaults_without_device_info():
+ printer = resolve_printer_device("SERIAL-A", None)
+
+ assert printer.serial_number == "SERIAL-A"
+ assert printer.name == "Bambu Lab Printer"
+ assert printer.model == "Bambu Lab"
+
+
+@pytest.mark.django_db
+def test_updates_name_and_model_on_existing_printer_when_changed():
+ resolve_printer_device("SERIAL-A", {"name": "Old Name", "dev_product_name": "H2C"})
+
+ updated = resolve_printer_device("SERIAL-A", {"name": "New Name", "dev_product_name": "H2C"})
+
+ assert updated.name == "New Name"
+ assert Printer.objects.filter(serial_number="SERIAL-A").count() == 1
diff --git a/tests/test_vortek_groundwork.py b/tests/test_vortek_groundwork.py
new file mode 100644
index 0000000..3b2b604
--- /dev/null
+++ b/tests/test_vortek_groundwork.py
@@ -0,0 +1,43 @@
+import pytest
+
+from bambu_run.mqtt_client import PrinterState
+from bambu_run.management.commands.bambu_collector import Command, DeviceSession, resolve_printer_device
+from bambu_run.models import PrinterMetrics
+
+
+def test_snapshot_includes_raw_device_payload_for_future_vortek_modeling():
+ raw_device = {
+ "extruder": {"info": [{"id": 0, "temp": 12058720}, {"id": 1, "temp": 11534560}]},
+ "nozzle": {"info": [{"id": 0, "diameter": 0.4}]},
+ }
+ data = {"print": {"device": raw_device, "gcode_state": "IDLE"}}
+
+ state = PrinterState.from_mqtt_data(data)
+ snapshot = state.get_snapshot()
+
+ assert snapshot["vortek_raw"] == raw_device
+
+
+def test_snapshot_vortek_raw_defaults_to_empty_dict_when_no_device_payload():
+ state = PrinterState.from_mqtt_data({"print": {"gcode_state": "IDLE"}})
+ snapshot = state.get_snapshot()
+
+ assert snapshot["vortek_raw"] == {}
+
+
+@pytest.mark.django_db
+def test_collector_persists_vortek_raw_onto_printer_metrics():
+ printer = resolve_printer_device("SERIAL-A", {"name": "H2C", "dev_product_name": "H2C"})
+
+ class FakeClient:
+ def get_snapshot(self):
+ return {"gcode_state": "IDLE", "vortek_raw": {"extruder": {"info": []}}}
+
+ session = DeviceSession(device_id="SERIAL-A", client=FakeClient(), printer=printer)
+
+ cmd = Command()
+ cmd.verbose = False
+ cmd._collect_printer_data(session)
+
+ metric = PrinterMetrics.objects.get(device=printer)
+ assert metric.vortek_raw == {"extruder": {"info": []}}
diff --git a/tests/urls.py b/tests/urls.py
new file mode 100644
index 0000000..82233a5
--- /dev/null
+++ b/tests/urls.py
@@ -0,0 +1,5 @@
+from django.urls import include, path
+
+urlpatterns = [
+ path("", include("bambu_run.urls")),
+]