From 146d5af7aa2089df774955b3a5d80b198eecad98 Mon Sep 17 00:00:00 2001 From: RunLit <41996199+RunLit@users.noreply.github.com> Date: Wed, 24 Jun 2026 23:14:32 +1000 Subject: [PATCH] Feature/multi printer support (#12) * Initial implementation of multi-printer support. * Always show device dropdown and add bambu_diagnose for multi-printer troubleshooting. * Add multi-AMS support: per-unit snapshot/usage tracking, grouped dashboard panels with real type labels, and dual-nozzle card UX fixes. Fixes a real-world AMS info-code parsing bug found by inspecting live H2C data. * Add Vortek hotend rack tracking: per-SN registry with slot mapping confirmed against live MQTT capture, plus a fallback for non-inductive nozzles (e.g. H2C's fixed left nozzle) shown read-only without fabricated identity. New dashboard card hides entirely on printers with no Vortek/nozzle-info data at all. --- bambu_run/admin.py | 17 +- bambu_run/diagnostics.py | 78 ++++ .../management/commands/bambu_collector.py | 432 +++++++++++------- .../management/commands/bambu_diagnose.py | 128 ++++++ .../0005_printermetrics_vortek_raw.py | 22 + ...alter_filamentsnapshot_options_and_more.py | 63 +++ .../migrations/0007_hotend_hotendsnapshot.py | 172 +++++++ .../0008_printermetrics_nozzle_info.py | 22 + bambu_run/models.py | 149 +++++- bambu_run/mqtt_client.py | 82 ++++ bambu_run/static/bambu_run/css/dashboard.css | 90 ++++ .../static/bambu_run/js/printer_charts.js | 21 +- .../bambu_run/printer_dashboard.html | 210 +++++++-- bambu_run/urls.py | 2 + bambu_run/views.py | 92 +++- pyproject.toml | 4 + tests/__init__.py | 0 tests/settings.py | 40 ++ tests/test_ams_type_from_info.py | 25 + tests/test_diagnostics.py | 68 +++ tests/test_filament_context.py | 180 ++++++++ tests/test_hotend_collection.py | 121 +++++ tests/test_hotend_dashboard.py | 128 ++++++ tests/test_hotend_parsing.py | 100 ++++ tests/test_multi_ams_collection.py | 104 +++++ tests/test_multi_device_collection.py | 90 ++++ tests/test_printer_routing.py | 78 ++++ tests/test_resolve_printer_device.py | 78 ++++ tests/test_vortek_groundwork.py | 43 ++ tests/urls.py | 5 + 30 files changed, 2426 insertions(+), 218 deletions(-) create mode 100644 bambu_run/diagnostics.py create mode 100644 bambu_run/management/commands/bambu_diagnose.py create mode 100644 bambu_run/migrations/0005_printermetrics_vortek_raw.py create mode 100644 bambu_run/migrations/0006_alter_filamentsnapshot_options_and_more.py create mode 100644 bambu_run/migrations/0007_hotend_hotendsnapshot.py create mode 100644 bambu_run/migrations/0008_printermetrics_nozzle_info.py create mode 100644 tests/__init__.py create mode 100644 tests/settings.py create mode 100644 tests/test_ams_type_from_info.py create mode 100644 tests/test_diagnostics.py create mode 100644 tests/test_filament_context.py create mode 100644 tests/test_hotend_collection.py create mode 100644 tests/test_hotend_dashboard.py create mode 100644 tests/test_hotend_parsing.py create mode 100644 tests/test_multi_ams_collection.py create mode 100644 tests/test_multi_device_collection.py create mode 100644 tests/test_printer_routing.py create mode 100644 tests/test_resolve_printer_device.py create mode 100644 tests/test_vortek_groundwork.py create mode 100644 tests/urls.py diff --git a/bambu_run/admin.py b/bambu_run/admin.py index 03d7989..4873f13 100644 --- a/bambu_run/admin.py +++ b/bambu_run/admin.py @@ -1,5 +1,5 @@ from django.contrib import admin -from .models import Printer, PrinterMetrics, Filament, FilamentType, FilamentSnapshot, PrintJob, FilamentUsage, BambuCloudTask +from .models import Printer, PrinterMetrics, Filament, FilamentType, FilamentSnapshot, PrintJob, FilamentUsage, BambuCloudTask, Hotend, HotendSnapshot @admin.register(Printer) @@ -107,6 +107,21 @@ class FilamentUsageAdmin(admin.ModelAdmin): readonly_fields = ('consumed_percent', 'consumed_grams') +@admin.register(Hotend) +class HotendAdmin(admin.ModelAdmin): + list_display = ('printer', 'serial_number', 'nozzle_type', 'is_toolhead', 'slot_number', 'used_time_seconds', 'wear_percent', 'last_seen_at') + list_filter = ('printer', 'is_toolhead', 'nozzle_type') + search_fields = ('serial_number',) + readonly_fields = ('last_seen_at', 'created_at') + + +@admin.register(HotendSnapshot) +class HotendSnapshotAdmin(admin.ModelAdmin): + list_display = ('printer_metric', 'hotend', 'raw_id', 'used_time_seconds', 'wear_percent', 'timestamp') + list_filter = ('hotend',) + readonly_fields = ('printer_metric', 'hotend', 'raw_id', 'used_time_seconds', 'wear_percent', 'stat', 'timestamp') + + @admin.register(BambuCloudTask) class BambuCloudTaskAdmin(admin.ModelAdmin): list_display = ('task_id', 'design_title', 'plate_title', 'device_serial', 'cloud_status', 'weight_grams', 'cloud_start_time', 'synced_at') diff --git a/bambu_run/diagnostics.py b/bambu_run/diagnostics.py new file mode 100644 index 0000000..266c2c7 --- /dev/null +++ b/bambu_run/diagnostics.py @@ -0,0 +1,78 @@ +""" +Pure helpers for the `bambu_diagnose` management command. + +Kept separate from the command itself (and free of Django/network imports) +so the report-building and redaction logic can be unit-tested without +talking to the real Bambu Lab cloud or MQTT broker. +""" + +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional + +# Keys whose values are always replaced outright, regardless of nesting depth. +_SECRET_KEY_SUBSTRINGS = ("password", "token", "secret", "access_code", "authorization") + +# Keys that identify a specific physical device/spool/account — not secret, +# but identifying, so they're partially masked by default before anything +# gets pasted into a public GitHub issue. +_IDENTIFIER_KEYS = {"dev_id", "device_id", "serial_number", "tray_uuid", "tag_uid", "uid"} + + +def _mask_identifier(value: Any) -> Any: + if not isinstance(value, str) or len(value) <= 8: + return "***" + return f"{value[:4]}...{value[-4:]}" + + +def redact_diagnostics(data: Any, redact: bool = True) -> Any: + """Recursively redact secrets and mask identifiers in a diagnostics payload. + + `redact=False` returns the data unchanged — only for the reporter's own + local debugging, never for anything posted publicly. + """ + if not redact: + return data + return _redact(data) + + +def _redact(obj: Any) -> Any: + if isinstance(obj, dict): + result = {} + for key, value in obj.items(): + lowered = key.lower() + if any(secret in lowered for secret in _SECRET_KEY_SUBSTRINGS): + result[key] = "***REDACTED***" + elif lowered in _IDENTIFIER_KEYS: + result[key] = _mask_identifier(value) + else: + result[key] = _redact(value) + return result + if isinstance(obj, list): + return [_redact(item) for item in obj] + return obj + + +def build_diagnostics_report( + devices: List[Dict[str, Any]], + raw_payloads: Dict[str, Optional[Dict[str, Any]]], +) -> Dict[str, Any]: + """Assemble the (pre-redaction) diagnostics report from discovered devices + and whatever raw MQTT payload was captured for each during the listen window. + """ + device_entries = [] + for device in devices: + dev_id = device.get("dev_id") + payload = raw_payloads.get(dev_id) + entry = { + "device_info": device, + "raw_mqtt_payload": payload, + } + if payload is None: + entry["note"] = "No MQTT data received within the listen window." + device_entries.append(entry) + + return { + "generated_at": datetime.now(timezone.utc).isoformat(), + "device_count": len(devices), + "devices": device_entries, + } diff --git a/bambu_run/management/commands/bambu_collector.py b/bambu_run/management/commands/bambu_collector.py index 1b7f462..ce1bcdd 100644 --- a/bambu_run/management/commands/bambu_collector.py +++ b/bambu_run/management/commands/bambu_collector.py @@ -13,8 +13,9 @@ import logging import os import ssl import time +from dataclasses import dataclass, field from decimal import Decimal -from typing import Optional +from typing import Any, Dict, Optional from django.core.management.base import BaseCommand, CommandError from django.db import transaction @@ -26,6 +27,56 @@ from bambu_run.models import Printer, PrinterMetrics logger = logging.getLogger("bambu_run.collector") +def resolve_printer_device(device_id: str, device_info: Optional[dict] = None) -> Printer: + """Find-or-create the Printer row for a Bambu cloud device, keyed by serial number. + + `device_info` is one entry from BambuClient.get_devices() (keys: name, + dev_product_name, dev_id, ...). Falls back to generic defaults when unavailable + (e.g. local-only connections that never call get_devices()). + """ + device_info = device_info or {} + name = device_info.get("name") or "Bambu Lab Printer" + model = device_info.get("dev_product_name") or "Bambu Lab" + + printer = Printer.objects.filter(serial_number=device_id).first() + + if printer is None: + # Upgrade path: a pre-multi-printer deployment has exactly one Printer row + # with no serial number yet. Backfill it instead of creating a duplicate. + # If there's more than one such row, we can't tell which one this device + # used to be, so don't guess — create a fresh row instead. + legacy_candidates = list(Printer.objects.filter(serial_number__isnull=True)[:2]) + if len(legacy_candidates) == 1: + printer = legacy_candidates[0] + printer.serial_number = device_id + + if printer is None: + printer = Printer(serial_number=device_id) + + printer.name = name + printer.model = model + printer.manufacturer = "Bambu Lab" + printer.is_active = True + printer.save() + return printer + + +@dataclass +class DeviceSession: + """Per-printer mutable state for one bound device in a multi-printer collector run.""" + + device_id: str + client: Any # BambuPrinter + printer: Printer + current_print_job: Optional[Any] = None + last_gcode_state: Optional[str] = None + last_subtask_name: Optional[str] = None + trays_used: set = field(default_factory=set) + error_count: int = 0 + success_count: int = 0 + mqtt_connect_errors: int = 0 + + class Command(BaseCommand): """ MQTT Poll -> PrinterMetrics -> FilamentSnapshot -> Auto-Match -> Update Filament @@ -51,18 +102,11 @@ class Command(BaseCommand): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.printer_client = None - self.printer_device = None + self.sessions: Dict[str, DeviceSession] = {} + self._token: Optional[str] = None self.verbose = False self.disable_ssl_verify = False - self.error_count = 0 - self.success_count = 0 - self.mqtt_connect_errors = 0 self.start_time = None - self.current_print_job = None - self.last_gcode_state = None - self.last_subtask_name = None - self.trays_used = set() def handle(self, *args, **options): self.verbose = options["verbose"] @@ -100,12 +144,13 @@ class Command(BaseCommand): self._configure_logging() try: - self._initialize_printer() + self._initialize_printers() except Exception as e: raise CommandError(f"Initialization failed: {e}") self.start_time = timezone.now() - logger.info(f"Bambu Run data collector started for printer: {self.printer_device.name}") + printer_names = ", ".join(s.printer.name for s in self.sessions.values()) + logger.info(f"Bambu Run data collector started for {len(self.sessions)} printer(s): {printer_names}") logger.info(f"Collection interval: {interval} seconds") logger.info(f"Mode: {'Single run' if run_once else 'Continuous'}") @@ -113,7 +158,8 @@ class Command(BaseCommand): if run_once: import time as _time _time.sleep(5) - self._collect_printer_data() + for session in self.sessions.values(): + self._collect_printer_data(session) logger.info("Single collection completed successfully") else: self._run_continuous_loop(interval) @@ -124,7 +170,7 @@ class Command(BaseCommand): logger.exception(f"Fatal error in main loop: {e}") raise CommandError(f"Runner failed: {e}") - def _request_full_status_when_ready(self, timeout: float = 20.0) -> None: + def _request_full_status_when_ready(self, client, timeout: float = 20.0) -> None: """Send pushall once the MQTT broker connection is confirmed. BambuPrinter._connected is set True immediately after connect(blocking=False), @@ -134,9 +180,9 @@ class Command(BaseCommand): import time as _time deadline = _time.time() + timeout while _time.time() < deadline: - mqtt_client = getattr(self.printer_client, "_mqtt", None) + mqtt_client = getattr(client, "_mqtt", None) if mqtt_client is not None and getattr(mqtt_client, "connected", False): - self.printer_client._mqtt.request_full_status() + client._mqtt.request_full_status() logger.info("Sent MQTT pushall request") return _time.sleep(0.5) @@ -155,7 +201,9 @@ class Command(BaseCommand): handler.setFormatter(formatter) logger.addHandler(handler) - def _initialize_printer(self): + def _initialize_printers(self): + """Authenticate once, discover every device bound to the account, and open + one BambuPrinter (own MQTT thread) per device — all in this single process.""" from bambu_run.mqtt_client import BambuPrinter bambu_username = os.environ.get("BAMBU_USERNAME") @@ -169,30 +217,12 @@ class Command(BaseCommand): "environment variables must be set" ) - logger.info("Connecting to Bambu Lab printer...") + logger.info("Authenticating with Bambu Lab cloud...") try: - if bambu_token: - logger.info("Using saved BAMBU_TOKEN for authentication") - self.printer_client = BambuPrinter( - token=bambu_token, device_id=bambu_device_id - ) - else: - logger.info("Authenticating with username/password") - self.printer_client = BambuPrinter( - username=bambu_username, - password=bambu_password, - device_id=bambu_device_id, - ) - - logger.info("Initiating MQTT connection...") - self.printer_client.connect(blocking=False) - logger.info("MQTT connection initiated (non-blocking)") - # Request full status so AMS + dual-nozzle data arrive on startup. - try: - self._request_full_status_when_ready() - except Exception as e: - logger.warning("pushall request skipped (non-fatal): %s", e) - + auth = BambuPrinter( + username=bambu_username, password=bambu_password, token=bambu_token, + ) + self._token = auth._ensure_token() except Exception as e: if "CERTIFICATE_VERIFY_FAILED" in str(e) or "SSL" in str(e): error_msg = ( @@ -203,56 +233,62 @@ class Command(BaseCommand): "3. pip install --upgrade certifi\n" ) raise CommandError(error_msg) - raise CommandError(f"Failed to initialize printer client: {e}") + raise CommandError(f"Failed to authenticate: {e}") - self.printer_device = self._ensure_printer_device_exists() - logger.info(f"Initialized for printer device: {self.printer_device}") - - def _ensure_printer_device_exists(self) -> Printer: - try: - snapshot = self.printer_client.get_snapshot() - - if snapshot: - device, created = Printer.objects.update_or_create( - model="Bambu Lab", - defaults={ - "name": "Bambu Lab Printer", - "manufacturer": "Bambu Lab", - "is_active": True, - }, - ) - action = "Created" if created else "Updated" - logger.info(f"{action} printer device record: {device}") - return device - else: - logger.warning("Snapshot returned None - MQTT not connected yet") - device = Printer.objects.filter(is_active=True).first() - if device: - logger.info(f"Using existing device record: {device}") - return device - else: - device = Printer.objects.create( - name="Bambu Lab Printer", - model="Bambu Lab", - manufacturer="Bambu Lab", - is_active=True, - ) - logger.info(f"Created placeholder device: {device}") - return device - - except Exception as e: - logger.error(f"Error during device initialization: {e}") + device_infos = self._discover_devices(bambu_device_id) + for device_id, device_info in device_infos.items(): try: - device = Printer.objects.filter(is_active=True).first() - if device: - logger.warning(f"Using existing device record from DB: {device}") - return device - else: - raise CommandError( - "No printer device found in database and initialization failed." - ) - except Printer.DoesNotExist: - raise CommandError("Failed to create or retrieve printer device.") + self._add_session(device_id, device_info) + except Exception as e: + logger.error(f"Failed to initialize printer {device_id}: {e}") + + if not self.sessions: + raise CommandError("No printer sessions could be initialized") + + def _discover_devices(self, explicit_device_id: Optional[str]) -> Dict[str, dict]: + """Return {device_id: device_info} for every printer to monitor. + + device_info comes from BambuClient.get_devices() (name, dev_product_name, + etc.) — empty dict when explicitly pinned to one device via BAMBU_DEVICE_ID + and the cloud listing can't be reached. + """ + from bambu_run.mqtt_client import BambuClient + + try: + cloud = BambuClient(token=self._token) + devices = cloud.get_devices() + except Exception as e: + if explicit_device_id: + logger.warning(f"Could not list account devices ({e}); using BAMBU_DEVICE_ID only") + return {explicit_device_id: {}} + raise + + device_infos = {d.get("dev_id"): d for d in devices if d.get("dev_id")} + + if explicit_device_id: + return {explicit_device_id: device_infos.get(explicit_device_id, {})} + + if not device_infos: + raise CommandError("No devices found on this account") + + return device_infos + + def _add_session(self, device_id: str, device_info: dict) -> "DeviceSession": + from bambu_run.mqtt_client import BambuPrinter + + logger.info(f"Connecting to printer {device_id} ({device_info.get('name', 'unknown')})...") + client = BambuPrinter(token=self._token, device_id=device_id) + client.connect(blocking=False) + try: + self._request_full_status_when_ready(client) + except Exception as e: + logger.warning("pushall request skipped (non-fatal): %s", e) + + printer = resolve_printer_device(device_id, device_info) + session = DeviceSession(device_id=device_id, client=client, printer=printer) + self.sessions[device_id] = session + logger.info(f"Initialized session for printer: {printer}") + return session def _run_continuous_loop(self, interval: int): iteration = 0 @@ -263,7 +299,8 @@ class Command(BaseCommand): if self.verbose: logger.debug(f"=== Iteration {iteration} ===") - self._collect_printer_data() + for session in list(self.sessions.values()): + self._collect_printer_data(session) elapsed = time.time() - loop_start sleep_time = max(0, interval - elapsed) @@ -273,9 +310,28 @@ class Command(BaseCommand): if iteration % 100 == 0: self._print_statistics() + self._refresh_devices() time.sleep(sleep_time) + def _refresh_devices(self): + """Pick up printers added to the account without restarting the process.""" + if os.environ.get("BAMBU_DEVICE_ID"): + return # pinned to a single explicit device — nothing to discover + try: + device_infos = self._discover_devices(None) + except Exception as e: + logger.warning(f"Device refresh skipped (non-fatal): {e}") + return + + for device_id, device_info in device_infos.items(): + if device_id not in self.sessions: + logger.info(f"New printer detected on account: {device_id}") + try: + self._add_session(device_id, device_info) + except Exception as e: + logger.error(f"Failed to initialize newly-detected printer {device_id}: {e}") + def _convert_mqtt_color(self, mqtt_color): if not mqtt_color: return None @@ -498,6 +554,8 @@ class Command(BaseCommand): printer_metric=printer_metric, filament=filament, tray_id=tray_id, + ams_unit_id=unit_id_int, + ams_type=tray_data.get('ams_type', '') or '', slot_name=tray_data.get('slot'), type=tray_data.get('type'), sub_type=tray_data.get('sub_type'), @@ -513,19 +571,51 @@ class Command(BaseCommand): match_method=match_method ) - def _track_print_job(self, metric, snapshot): - from bambu_run.models import PrintJob, FilamentUsage + def _update_hotends(self, printer, printer_metric, hotends_data): + from bambu_run.models import Hotend, HotendSnapshot + + for h in hotends_data: + if h.get("is_empty"): + continue + + hotend, _ = Hotend.objects.update_or_create( + printer=printer, + serial_number=h.get("serial_number"), + defaults={ + "raw_id": h.get("raw_id", 0), + "nozzle_type": h.get("nozzle_type", ""), + "diameter": self._to_decimal(h.get("diameter")), + "slot_number": h.get("slot_number"), + "is_toolhead": bool(h.get("is_toolhead")), + "last_filament_profile_id": h.get("fila_id", ""), + "last_color": h.get("color") or "", + "used_time_seconds": h.get("used_time_seconds", 0), + "wear_percent": h.get("wear_percent", 0), + }, + ) + + HotendSnapshot.objects.create( + printer_metric=printer_metric, + hotend=hotend, + raw_id=h.get("raw_id", 0), + used_time_seconds=h.get("used_time_seconds", 0), + wear_percent=h.get("wear_percent", 0), + stat=h.get("stat"), + ) + + def _track_print_job(self, session, metric, snapshot): + from bambu_run.models import PrintJob gcode_state = snapshot.get('gcode_state') subtask_name = snapshot.get('subtask_name') - if self._is_print_starting(gcode_state, subtask_name): - if self.current_print_job: - self._finalize_print_job(metric, snapshot) + if self._is_print_starting(session, gcode_state, subtask_name): + if session.current_print_job: + self._finalize_print_job(session, metric, snapshot) raw_task_id = snapshot.get('task_id') - self.current_print_job = PrintJob.objects.create( - device=self.printer_device, + session.current_print_job = PrintJob.objects.create( + device=session.printer, project_name=subtask_name, gcode_file=snapshot.get('gcode_file'), start_time=metric.timestamp, @@ -534,109 +624,119 @@ class Command(BaseCommand): completion_percent=snapshot.get('print_percent', 0), cloud_task_id_raw=int(raw_task_id) if raw_task_id else None, ) - self.trays_used = set() - logger.info(f"Print job started: {subtask_name}") + session.trays_used = set() + logger.info(f"[{session.device_id}] Print job started: {subtask_name}") - if self.current_print_job: + if session.current_print_job: tray_now = snapshot.get('tray_now', '') if tray_now not in (None, '', '255'): try: tray_id = int(tray_now) if 0 <= tray_id <= 15: - self.trays_used.add(tray_id) + session.trays_used.add(tray_id) except (ValueError, TypeError): pass - if self._is_print_ending(gcode_state) and self.current_print_job: - self._finalize_print_job(metric, snapshot) + if self._is_print_ending(session, gcode_state) and session.current_print_job: + self._finalize_print_job(session, metric, snapshot) - self.last_gcode_state = gcode_state - self.last_subtask_name = subtask_name + session.last_gcode_state = gcode_state + session.last_subtask_name = subtask_name - def _is_print_starting(self, gcode_state, subtask_name): + def _is_print_starting(self, session, gcode_state, subtask_name): is_printing = gcode_state not in ['FINISH', 'IDLE', 'FAILED', None, ''] - has_new_job = subtask_name and subtask_name != self.last_subtask_name + has_new_job = subtask_name and subtask_name != session.last_subtask_name return is_printing and has_new_job - def _is_print_ending(self, gcode_state): + def _is_print_ending(self, session, gcode_state): ending_states = ['FINISH', 'FAILED'] - return gcode_state in ending_states and self.last_gcode_state not in ending_states + return gcode_state in ending_states and session.last_gcode_state not in ending_states - def _finalize_print_job(self, metric, snapshot): + def _finalize_print_job(self, session, metric, snapshot): from bambu_run.models import FilamentUsage - self.current_print_job.end_time = metric.timestamp - self.current_print_job.end_metric = metric - self.current_print_job.final_status = snapshot.get('gcode_state') - self.current_print_job.completion_percent = snapshot.get('print_percent', 0) - self.current_print_job.calculate_duration() - self.current_print_job.save() + job = session.current_print_job + job.end_time = metric.timestamp + job.end_metric = metric + job.final_status = snapshot.get('gcode_state') + job.completion_percent = snapshot.get('print_percent', 0) + job.calculate_duration() + job.save() try: from bambu_run.bambu_cloud import fetch_and_upsert_task - fetch_and_upsert_task(self.printer_client._client, self.current_print_job) + fetch_and_upsert_task(session.client._client, job) except Exception as e: logger.warning(f"Cloud task sync skipped (non-fatal): {e}") - start_metric = self.current_print_job.start_metric + start_metric = job.start_metric if not start_metric: - logger.warning(f"No start_metric for job {self.current_print_job.id}, skipping filament usage") - elif not self.trays_used: - logger.warning(f"No trays tracked for job {self.current_print_job.project_name}, skipping filament usage") + logger.warning(f"No start_metric for job {job.id}, skipping filament usage") + elif not session.trays_used: + logger.warning(f"No trays tracked for job {job.project_name}, skipping filament usage") else: - for tray_id in self.trays_used: - start_snap = start_metric.filament_snapshots.filter( + # A bare tray_id (from `tray_now`) doesn't identify which physical AMS + # unit was active when multiple units share the same slot numbering — + # so create one usage row per (unit, tray) that had a tracked filament + # loaded at job start, rather than guessing a single "correct" unit. + created_usages = [] + for tray_id in session.trays_used: + start_snaps = start_metric.filament_snapshots.filter( tray_id=tray_id, filament__isnull=False - ).first() - if not start_snap: - continue - - end_snap = metric.filament_snapshots.filter( - filament=start_snap.filament, tray_id=tray_id - ).first() - - usage = FilamentUsage.objects.create( - print_job=self.current_print_job, - filament=start_snap.filament, - tray_id=tray_id, - starting_percent=start_snap.remain_percent or 100, - ending_percent=end_snap.remain_percent if end_snap else None, - is_primary=(len(self.trays_used) == 1), ) - usage.calculate_consumed() + for start_snap in start_snaps: + end_snap = metric.filament_snapshots.filter( + filament=start_snap.filament, + tray_id=tray_id, + ams_unit_id=start_snap.ams_unit_id, + ).first() + + usage = FilamentUsage.objects.create( + print_job=job, + filament=start_snap.filament, + tray_id=tray_id, + ams_unit_id=start_snap.ams_unit_id, + starting_percent=start_snap.remain_percent or 100, + ending_percent=end_snap.remain_percent if end_snap else None, + ) + usage.calculate_consumed() + created_usages.append(usage) + + for usage in created_usages: + usage.is_primary = len(created_usages) == 1 usage.save() if self.verbose: logger.debug( - f"Filament usage for {start_snap.filament} (tray {tray_id}): " + f"Filament usage for {usage.filament} (unit {usage.ams_unit_id}, tray {usage.tray_id}): " f"{usage.starting_percent}% -> {usage.ending_percent}%, consumed {usage.consumed_percent}%" ) logger.info( - f"Print job finished: {self.current_print_job.project_name} " - f"({self.current_print_job.final_status}) - Duration: {self.current_print_job.duration_minutes} min, " - f"Trays used: {sorted(self.trays_used) if self.trays_used else 'none tracked'}" + f"[{session.device_id}] Print job finished: {job.project_name} " + f"({job.final_status}) - Duration: {job.duration_minutes} min, " + f"Trays used: {sorted(session.trays_used) if session.trays_used else 'none tracked'}" ) - self.current_print_job = None - self.trays_used = set() + session.current_print_job = None + session.trays_used = set() - def _collect_printer_data(self): + def _collect_printer_data(self, session: "DeviceSession"): try: - snapshot = self.printer_client.get_snapshot() + snapshot = session.client.get_snapshot() if snapshot is None: - self.mqtt_connect_errors += 1 - if self.mqtt_connect_errors <= 5 or self.verbose: + session.mqtt_connect_errors += 1 + if session.mqtt_connect_errors <= 5 or self.verbose: logger.warning( - f"MQTT not connected yet or no data available " - f"(attempt {self.mqtt_connect_errors})" + f"[{session.device_id}] MQTT not connected yet or no data available " + f"(attempt {session.mqtt_connect_errors})" ) return with transaction.atomic(): metric = PrinterMetrics.objects.create( - device=self.printer_device, + device=session.printer, timestamp=timezone.now(), nozzle_temp=self._to_decimal(snapshot.get("nozzle_temp")), nozzle_target_temp=self._to_decimal(snapshot.get("nozzle_target_temp")), @@ -688,27 +788,33 @@ class Command(BaseCommand): ams_units=snapshot.get("ams_units", []), external_spool=snapshot.get("external_spool", {}), lights_report=snapshot.get("lights_report", []), + vortek_raw=snapshot.get("vortek_raw", {}), + nozzle_info=snapshot.get("hotends", []), ) filaments_data = snapshot.get('filaments', []) if filaments_data: self._create_filament_snapshots(metric, filaments_data, snapshot) - self._track_print_job(metric, snapshot) + hotends_data = snapshot.get('hotends', []) + if hotends_data: + self._update_hotends(session.printer, metric, hotends_data) - self.success_count += 1 + self._track_print_job(session, metric, snapshot) + + session.success_count += 1 if self.verbose: logger.debug( - f"Printer Metrics: Nozzle={snapshot.get('nozzle_temp')}C, " + f"[{session.device_id}] Printer Metrics: Nozzle={snapshot.get('nozzle_temp')}C, " f"Bed={snapshot.get('bed_temp')}C, " f"Progress={snapshot.get('print_percent')}%, " f"State={snapshot.get('gcode_state')}" ) except Exception as e: - self.error_count += 1 - logger.error(f"Error collecting printer data (total errors: {self.error_count}): {e}") + session.error_count += 1 + logger.error(f"[{session.device_id}] Error collecting printer data (total errors: {session.error_count}): {e}") if self.verbose: logger.exception("Detailed traceback:") @@ -723,16 +829,20 @@ class Command(BaseCommand): def _print_statistics(self): if self.start_time: runtime = timezone.now() - self.start_time - total_collections = self.success_count + self.error_count + success_count = sum(s.success_count for s in self.sessions.values()) + error_count = sum(s.error_count for s in self.sessions.values()) + mqtt_connect_errors = sum(s.mqtt_connect_errors for s in self.sessions.values()) + total_collections = success_count + error_count success_rate = ( - (self.success_count / total_collections * 100) + (success_count / total_collections * 100) if total_collections > 0 else 0 ) logger.info("=== Statistics ===") logger.info(f"Runtime: {runtime}") - logger.info(f"Successful collections: {self.success_count}") - logger.info(f"Failed collections: {self.error_count}") - logger.info(f"MQTT connection warnings: {self.mqtt_connect_errors}") + logger.info(f"Printers tracked: {len(self.sessions)}") + logger.info(f"Successful collections: {success_count}") + logger.info(f"Failed collections: {error_count}") + logger.info(f"MQTT connection warnings: {mqtt_connect_errors}") logger.info(f"Success rate: {success_rate:.1f}%") diff --git a/bambu_run/management/commands/bambu_diagnose.py b/bambu_run/management/commands/bambu_diagnose.py new file mode 100644 index 0000000..fc18c46 --- /dev/null +++ b/bambu_run/management/commands/bambu_diagnose.py @@ -0,0 +1,128 @@ +""" +Diagnose multi-printer cloud data for a Bambu Lab account. + +Run this if `bambu_collector` doesn't pick up all your printers, or the data +collected for a second/third printer looks wrong. It authenticates with your +Bambu Lab account, lists every device the cloud API reports, listens briefly +for raw MQTT data from each one, and writes a redacted JSON report you can +attach to a GitHub issue. + +Usage: + python manage.py bambu_diagnose + python manage.py bambu_diagnose --listen-seconds 15 + python manage.py bambu_diagnose --output my_report.json + python manage.py bambu_diagnose --no-redact # local debugging only — do NOT post this output publicly +""" + +import json +import logging +import time + +from django.core.management.base import BaseCommand, CommandError + +from bambu_run.diagnostics import build_diagnostics_report, redact_diagnostics + +logger = logging.getLogger("bambu_run.diagnose") + + +class Command(BaseCommand): + help = "Authenticate, list every printer on the account, and write a redacted diagnostics report." + + def add_arguments(self, parser): + parser.add_argument( + "--listen-seconds", type=float, default=8.0, + help="How long to listen for MQTT data per device (default: 8)", + ) + parser.add_argument( + "--output", type=str, default=None, + help="Output file path (default: bambu_diagnostics_.json)", + ) + parser.add_argument( + "--no-redact", action="store_true", + help="Keep full serials/identifiers unmasked. For your own debugging only — " + "do not paste this output into a public GitHub issue.", + ) + + def handle(self, *args, **options): + import os + from bambu_run.mqtt_client import BambuPrinter, BambuClient + + listen_seconds = options["listen_seconds"] + redact = not options["no_redact"] + + bambu_username = os.environ.get("BAMBU_USERNAME") + bambu_password = os.environ.get("BAMBU_PASSWORD") + bambu_token = os.environ.get("BAMBU_TOKEN") + + if not bambu_token and not all([bambu_username, bambu_password]): + raise CommandError( + "Either BAMBU_TOKEN or both BAMBU_USERNAME and BAMBU_PASSWORD " + "environment variables must be set" + ) + + self.stdout.write("Authenticating with Bambu Lab cloud...") + auth = BambuPrinter(username=bambu_username, password=bambu_password, token=bambu_token) + token = auth._ensure_token() + + cloud = BambuClient(token=token) + devices = cloud.get_devices() + + self.stdout.write(self.style.SUCCESS(f"Found {len(devices)} device(s) on this account:")) + for device in devices: + self.stdout.write( + f" - {device.get('name', 'unknown')} " + f"({device.get('dev_product_name', 'unknown model')}) " + f"online={device.get('online')}" + ) + if len(devices) < 2: + self.stdout.write(self.style.WARNING( + "Only one device returned by the cloud API — if you own multiple printers, " + "this is likely the root cause. Note this in the GitHub issue." + )) + + raw_payloads = {} + for device in devices: + dev_id = device.get("dev_id") + if not dev_id: + continue + self.stdout.write(f"Listening to {device.get('name', dev_id)} for {listen_seconds:.0f}s...") + client = BambuPrinter(token=token, device_id=dev_id) + try: + client.connect(blocking=False) + self._request_full_status_when_ready(client) + time.sleep(listen_seconds) + state = client.get_state() + raw_payloads[dev_id] = state._raw_data.get("print") if state._raw_data else None + except Exception as e: + self.stdout.write(self.style.WARNING(f" Could not collect data for {dev_id}: {e}")) + raw_payloads[dev_id] = None + finally: + client.disconnect() + + report = build_diagnostics_report(devices, raw_payloads) + report = redact_diagnostics(report, redact=redact) + + output_path = options["output"] or f"bambu_diagnostics_{int(time.time())}.json" + with open(output_path, "w") as f: + json.dump(report, f, indent=2, default=str) + + self.stdout.write(self.style.SUCCESS(f"\nDiagnostics written to: {output_path}")) + if not redact: + self.stdout.write(self.style.WARNING( + "--no-redact was used: this file contains unmasked serials/identifiers. " + "Do not attach it to a public GitHub issue as-is." + )) + else: + self.stdout.write( + "Serials/identifiers are masked. Please skim the file once before posting — " + "then attach it to https://github.com/RunLit/Bambu-Run/issues/10" + ) + + def _request_full_status_when_ready(self, client, timeout: float = 20.0) -> None: + deadline = time.time() + timeout + while time.time() < deadline: + mqtt_client = getattr(client, "_mqtt", None) + if mqtt_client is not None and getattr(mqtt_client, "connected", False): + client._mqtt.request_full_status() + return + time.sleep(0.5) diff --git a/bambu_run/migrations/0005_printermetrics_vortek_raw.py b/bambu_run/migrations/0005_printermetrics_vortek_raw.py new file mode 100644 index 0000000..c0b88fb --- /dev/null +++ b/bambu_run/migrations/0005_printermetrics_vortek_raw.py @@ -0,0 +1,22 @@ +# Generated by Django 5.2.8 on 2026-06-18 12:20 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("bambu_run", "0004_h2c_dual_nozzle_and_ams_fields"), + ] + + operations = [ + migrations.AddField( + model_name="printermetrics", + name="vortek_raw", + field=models.JSONField( + blank=True, + default=dict, + help_text="Raw print.device MQTT payload (Vortek rack groundwork)", + ), + ), + ] diff --git a/bambu_run/migrations/0006_alter_filamentsnapshot_options_and_more.py b/bambu_run/migrations/0006_alter_filamentsnapshot_options_and_more.py new file mode 100644 index 0000000..4565013 --- /dev/null +++ b/bambu_run/migrations/0006_alter_filamentsnapshot_options_and_more.py @@ -0,0 +1,63 @@ +# Generated by Django 5.2.8 on 2026-06-20 12:40 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("bambu_run", "0005_printermetrics_vortek_raw"), + ] + + operations = [ + migrations.AlterModelOptions( + name="filamentsnapshot", + options={ + "ordering": ["printer_metric", "ams_unit_id", "tray_id"], + "verbose_name": "Filament Snapshot", + "verbose_name_plural": "Filament Snapshots", + }, + ), + migrations.AddField( + model_name="filamentsnapshot", + name="ams_type", + field=models.CharField( + blank=True, + choices=[ + ("AMS", "AMS"), + ("AMS 2 Pro", "AMS 2 Pro"), + ("AMS HT", "AMS HT"), + ], + default="", + help_text="Type of the AMS unit this tray belongs to (AMS / AMS 2 Pro / AMS HT)", + max_length=32, + ), + ), + migrations.AddField( + model_name="filamentsnapshot", + name="ams_unit_id", + field=models.PositiveSmallIntegerField( + blank=True, + db_index=True, + help_text="Which physical AMS unit this tray belongs to (matches MQTT ams[i].id; 128 = AMS HT)", + null=True, + ), + ), + migrations.AddField( + model_name="filamentusage", + name="ams_unit_id", + field=models.PositiveSmallIntegerField( + blank=True, + db_index=True, + help_text="Which physical AMS unit the slot belongs to (matches MQTT ams[i].id; 128 = AMS HT)", + null=True, + ), + ), + migrations.AddIndex( + model_name="filamentsnapshot", + index=models.Index( + fields=["printer_metric", "ams_unit_id", "tray_id"], + name="infrastruct_printer_2ad168_idx", + ), + ), + ] diff --git a/bambu_run/migrations/0007_hotend_hotendsnapshot.py b/bambu_run/migrations/0007_hotend_hotendsnapshot.py new file mode 100644 index 0000000..adf7a75 --- /dev/null +++ b/bambu_run/migrations/0007_hotend_hotendsnapshot.py @@ -0,0 +1,172 @@ +# Generated by Django 5.2.8 on 2026-06-20 14:07 + +import django.db.models.deletion +import django.utils.timezone +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("bambu_run", "0006_alter_filamentsnapshot_options_and_more"), + ] + + operations = [ + migrations.CreateModel( + name="Hotend", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("serial_number", models.CharField(db_index=True, max_length=100)), + ( + "nozzle_type", + models.CharField(blank=True, default="", max_length=50), + ), + ( + "diameter", + models.DecimalField( + blank=True, decimal_places=2, max_digits=3, null=True + ), + ), + ( + "raw_id", + models.PositiveSmallIntegerField( + help_text="Last-seen MQTT device.nozzle.info[].id" + ), + ), + ( + "slot_number", + models.PositiveSmallIntegerField( + blank=True, + help_text="Rack bay 1-6, derived from raw_id 16-21. Null if currently unknown (e.g. mounted on toolhead and id reports as the 0 sentinel).", + null=True, + ), + ), + ( + "is_toolhead", + models.BooleanField( + default=False, + help_text="True if currently mounted on the toolhead under normal polling (raw_id == 0).", + ), + ), + ( + "last_filament_profile_id", + models.CharField( + blank=True, + default="", + help_text="Bambu material profile id of the filament last loaded (MQTT fila_id, e.g. 'GFA01')", + max_length=20, + ), + ), + ( + "last_color", + models.CharField( + blank=True, + default="", + help_text="6-char hex of the filament last loaded (MQTT color_m, alpha stripped)", + max_length=6, + ), + ), + ("used_time_seconds", models.PositiveIntegerField(default=0)), + ( + "wear_percent", + models.DecimalField( + decimal_places=2, + default=0, + help_text="MQTT wear (0-128 scale) converted to a 0-100 percent", + max_digits=5, + ), + ), + ("last_seen_at", models.DateTimeField(auto_now=True)), + ("created_at", models.DateTimeField(auto_now_add=True)), + ( + "printer", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="hotends", + to="bambu_run.printer", + ), + ), + ], + options={ + "verbose_name": "Hotend", + "verbose_name_plural": "Hotends", + "db_table": "infrastructure_hotend", + "ordering": ["printer", "-is_toolhead", "slot_number", "serial_number"], + "unique_together": {("printer", "serial_number")}, + }, + ), + migrations.CreateModel( + name="HotendSnapshot", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("raw_id", models.PositiveSmallIntegerField()), + ("used_time_seconds", models.PositiveIntegerField(default=0)), + ( + "wear_percent", + models.DecimalField(decimal_places=2, default=0, max_digits=5), + ), + ( + "stat", + models.IntegerField( + blank=True, + help_text="Raw MQTT status code for this hotend", + null=True, + ), + ), + ( + "timestamp", + models.DateTimeField( + db_index=True, default=django.utils.timezone.now + ), + ), + ( + "hotend", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="snapshots", + to="bambu_run.hotend", + ), + ), + ( + "printer_metric", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="hotend_snapshots", + to="bambu_run.printermetrics", + ), + ), + ], + options={ + "verbose_name": "Hotend Snapshot", + "verbose_name_plural": "Hotend Snapshots", + "db_table": "infrastructure_hotend_snapshot", + "ordering": ["printer_metric", "hotend"], + "indexes": [ + models.Index( + fields=["printer_metric", "hotend"], + name="infrastruct_printer_b528aa_idx", + ), + models.Index( + fields=["hotend", "-timestamp"], + name="infrastruct_hotend__691f7e_idx", + ), + ], + }, + ), + ] diff --git a/bambu_run/migrations/0008_printermetrics_nozzle_info.py b/bambu_run/migrations/0008_printermetrics_nozzle_info.py new file mode 100644 index 0000000..4880437 --- /dev/null +++ b/bambu_run/migrations/0008_printermetrics_nozzle_info.py @@ -0,0 +1,22 @@ +# Generated by Django 5.2.8 on 2026-06-20 14:22 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("bambu_run", "0007_hotend_hotendsnapshot"), + ] + + operations = [ + migrations.AddField( + model_name="printermetrics", + name="nozzle_info", + field=models.JSONField( + blank=True, + default=list, + help_text="Parsed per-poll nozzle/hotend info list", + ), + ), + ] diff --git a/bambu_run/models.py b/bambu_run/models.py index cfa3ee1..85dbff3 100644 --- a/bambu_run/models.py +++ b/bambu_run/models.py @@ -21,12 +21,15 @@ AMS_TYPE_CHOICES = [ def ams_type_from_info(info_code) -> str: """Resolve an AMS unit's `info` model code to a human label. - The HT unit reports its `id` with the 0x80 bit set (e.g. 128) — when the info - code is unknown, that bit is a reasonable secondary hint for HT identification. + Real MQTT `info` codes are 8 characters (e.g. "10001003") with the type encoded + in the last 4 digits — confirmed against a live H2C with AMS 2 Pro / AMS / AMS HT. + Fall back to an exact match for the bare 4-digit form in case other firmware + reports it short. """ - if info_code is None: + if not info_code: return "" - return AMS_INFO_TO_TYPE.get(str(info_code), "") + code = str(info_code) + return AMS_INFO_TO_TYPE.get(code[-4:], "") or AMS_INFO_TO_TYPE.get(code, "") class Printer(models.Model): @@ -231,6 +234,23 @@ class PrinterMetrics(models.Model): default=list, help_text="Light status report [{node, mode}]" ) + # Groundwork for H2C's Vortek nozzle-changer rack (6 swappable hotends + 1 fixed + # left nozzle) — the full MQTT schema for per-slot state isn't confirmed yet, so + # the raw `print.device` payload is captured here unfiltered to avoid losing data + # ahead of proper per-slot modeling. + vortek_raw = models.JSONField( + default=dict, blank=True, help_text="Raw print.device MQTT payload (Vortek rack groundwork)" + ) + + # Parsed device.nozzle.info[] from this poll, one dict per entry (mirrors + # HotendInfo.to_dict()). Includes induction-chip hotends *and* non-inductive + # nozzle positions (e.g. H2C's fixed left nozzle) that have no stable serial + # number to key a Hotend registry row on — kept here so the dashboard can show + # their readable type/diameter without claiming an identity/history we don't have. + nozzle_info = models.JSONField( + default=list, blank=True, help_text="Parsed per-poll nozzle/hotend info list" + ) + class Meta: db_table = "infrastructure_printer_metrics" verbose_name = "Printer Metric" @@ -490,6 +510,15 @@ class FilamentSnapshot(models.Model): max_length=20, null=True, blank=True, help_text="Slot identifier like A00-W1" ) + ams_unit_id = models.PositiveSmallIntegerField( + null=True, blank=True, db_index=True, + help_text="Which physical AMS unit this tray belongs to (matches MQTT ams[i].id; 128 = AMS HT)" + ) + ams_type = models.CharField( + max_length=32, blank=True, default="", + choices=AMS_TYPE_CHOICES, + help_text="Type of the AMS unit this tray belongs to (AMS / AMS 2 Pro / AMS HT)" + ) type = models.CharField(max_length=50, null=True, blank=True) sub_type = models.CharField( @@ -537,9 +566,10 @@ class FilamentSnapshot(models.Model): db_table = "infrastructure_filament_snapshot" verbose_name = "Filament Snapshot" verbose_name_plural = "Filament Snapshots" - ordering = ['printer_metric', 'tray_id'] + ordering = ['printer_metric', 'ams_unit_id', 'tray_id'] indexes = [ models.Index(fields=['printer_metric', 'tray_id']), + models.Index(fields=['printer_metric', 'ams_unit_id', 'tray_id']), models.Index(fields=['filament']), ] @@ -678,6 +708,10 @@ class FilamentUsage(models.Model): ) tray_id = models.IntegerField(help_text="Which AMS slot was used") + ams_unit_id = models.PositiveSmallIntegerField( + null=True, blank=True, db_index=True, + help_text="Which physical AMS unit the slot belongs to (matches MQTT ams[i].id; 128 = AMS HT)" + ) starting_percent = models.IntegerField(help_text="Filament remaining % at job start") ending_percent = models.IntegerField( @@ -715,3 +749,108 @@ class FilamentUsage(models.Model): self.consumed_grams = int( self.filament.initial_weight_grams * (self.consumed_percent / 100.0) ) + + +class Hotend(models.Model): + """Registry of individual Vortek hotends, keyed by serial number. + + A Vortek rack holds up to 6 swappable hotends (bays, MQTT `id` 16-21) plus + 1 mounted on the toolhead at a time (MQTT `id` 0). `raw_id` reflects whichever + address was last seen on the wire for this hotend; `slot_number` is only set + when that address falls in the 16-21 rack-bay range — confirmed by watching + a "Read All" MQTT capture reassign a toolhead-mounted hotend's id from 0 to + its true bay id. + """ + + printer = models.ForeignKey( + 'Printer', on_delete=models.CASCADE, related_name='hotends' + ) + serial_number = models.CharField(max_length=100, db_index=True) + + nozzle_type = models.CharField(max_length=50, blank=True, default="") + diameter = models.DecimalField( + max_digits=3, decimal_places=2, null=True, blank=True + ) + + raw_id = models.PositiveSmallIntegerField( + help_text="Last-seen MQTT device.nozzle.info[].id" + ) + slot_number = models.PositiveSmallIntegerField( + null=True, blank=True, + help_text="Rack bay 1-6, derived from raw_id 16-21. Null if currently unknown (e.g. mounted on toolhead and id reports as the 0 sentinel)." + ) + is_toolhead = models.BooleanField( + default=False, + help_text="True if currently mounted on the toolhead under normal polling (raw_id == 0)." + ) + + last_filament_profile_id = models.CharField( + max_length=20, blank=True, default="", + help_text="Bambu material profile id of the filament last loaded (MQTT fila_id, e.g. 'GFA01')" + ) + last_color = models.CharField( + max_length=6, blank=True, default="", + help_text="6-char hex of the filament last loaded (MQTT color_m, alpha stripped)" + ) + + used_time_seconds = models.PositiveIntegerField(default=0) + wear_percent = models.DecimalField( + max_digits=5, decimal_places=2, default=0, + help_text="MQTT wear (0-128 scale) converted to a 0-100 percent" + ) + + last_seen_at = models.DateTimeField(auto_now=True) + created_at = models.DateTimeField(auto_now_add=True) + + class Meta: + db_table = "infrastructure_hotend" + verbose_name = "Hotend" + verbose_name_plural = "Hotends" + ordering = ['printer', '-is_toolhead', 'slot_number', 'serial_number'] + unique_together = [['printer', 'serial_number']] + + def __str__(self): + location = "Toolhead" if self.is_toolhead else ( + f"Slot {self.slot_number}" if self.slot_number else "Rack" + ) + return f"{self.serial_number} ({location})" + + @property + def used_time_display(self) -> str: + hours, remainder = divmod(self.used_time_seconds, 3600) + minutes = remainder // 60 + return f"{hours}h {minutes}m" if hours else f"{minutes}m" + + +class HotendSnapshot(models.Model): + """Point-in-time reading of a Hotend, one row per collector poll.""" + + printer_metric = models.ForeignKey( + 'PrinterMetrics', on_delete=models.CASCADE, + related_name='hotend_snapshots' + ) + hotend = models.ForeignKey( + 'Hotend', on_delete=models.CASCADE, + related_name='snapshots' + ) + + raw_id = models.PositiveSmallIntegerField() + used_time_seconds = models.PositiveIntegerField(default=0) + wear_percent = models.DecimalField(max_digits=5, decimal_places=2, default=0) + stat = models.IntegerField( + null=True, blank=True, help_text="Raw MQTT status code for this hotend" + ) + timestamp = models.DateTimeField(default=timezone.now, db_index=True) + + class Meta: + db_table = "infrastructure_hotend_snapshot" + verbose_name = "Hotend Snapshot" + verbose_name_plural = "Hotend Snapshots" + ordering = ['printer_metric', 'hotend'] + indexes = [ + models.Index(fields=['printer_metric', 'hotend']), + models.Index(fields=['hotend', '-timestamp']), + ] + + def __str__(self): + return f"{self.hotend.serial_number} @ {self.timestamp.strftime('%Y-%m-%d %H:%M:%S')}" diff --git a/bambu_run/mqtt_client.py b/bambu_run/mqtt_client.py index b451c98..e8474d3 100644 --- a/bambu_run/mqtt_client.py +++ b/bambu_run/mqtt_client.py @@ -296,6 +296,73 @@ class AMSState: return loaded +@dataclass +class HotendInfo: + """A single hotend reported in `device.nozzle.info[]` (Vortek rack). + + `raw_id` semantics (confirmed by watching a live "Read All" MQTT capture): + 0 = currently mounted on the (swappable) toolhead — the sentinel hides the + true bay address until "Read All" resolves it; 1 = the fixed left nozzle on + dual-nozzle printers (no RFID chip, always reports sn="N/A"); 16-21 = rack + bay address, slot_number = raw_id - 15 (1-6). + """ + raw_id: int = 0 + serial_number: str = "" + nozzle_type: str = "" + diameter: float = 0.4 + fila_id: str = "" + color: Optional[str] = None + used_time_seconds: int = 0 + wear_percent: float = 0.0 + stat: int = 0 + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "HotendInfo": + from .utils import strip_color_padding + + return cls( + raw_id=int(data.get("id", 0)), + serial_number=data.get("sn", ""), + nozzle_type=data.get("type", ""), + diameter=float(data.get("diameter", 0.4)), + fila_id=data.get("fila_id", ""), + color=strip_color_padding(data.get("color_m")), + used_time_seconds=int(data.get("p_t", 0)), + wear_percent=round(float(data.get("wear", 0.0)) / 128.0 * 100, 2), + stat=int(data.get("stat", 0)), + ) + + @property + def is_toolhead(self) -> bool: + return self.raw_id == 0 + + @property + def is_empty(self) -> bool: + return self.serial_number in ("", "N/A") + + @property + def slot_number(self) -> Optional[int]: + if 16 <= self.raw_id <= 21: + return self.raw_id - 15 + return None + + def to_dict(self) -> Dict[str, Any]: + return { + "raw_id": self.raw_id, + "serial_number": self.serial_number, + "nozzle_type": self.nozzle_type, + "diameter": self.diameter, + "fila_id": self.fila_id, + "color": self.color, + "used_time_seconds": self.used_time_seconds, + "wear_percent": self.wear_percent, + "stat": self.stat, + "is_toolhead": self.is_toolhead, + "is_empty": self.is_empty, + "slot_number": self.slot_number, + } + + @dataclass class PrinterState: """Complete printer state parsed from MQTT data""" @@ -388,6 +455,9 @@ class PrinterState: # External spool (virtual tray) vt_tray: Optional[Dict[str, Any]] = None + # Vortek hotend rack (device.nozzle.info[]) + hotends: List[HotendInfo] = field(default_factory=list) + # Raw data for any additional fields _raw_data: Dict[str, Any] = field(default_factory=dict, repr=False) @@ -431,6 +501,12 @@ class PrinterState: nozzle_target_temp_left = float((t >> 16) & 0xFFFF) nozzle_temp_left = float(t & 0xFFFF) + # Vortek hotend rack: device.nozzle.info[] — one entry per hotend. + hotends = [ + HotendInfo.from_dict(h) + for h in (device.get("nozzle") or {}).get("info") or [] + ] + return cls( timestamp=timestamp, sequence_id=str(print_data.get("sequence_id", "")), @@ -487,6 +563,7 @@ class PrinterState: gcode_file_prepare_percent=str(print_data.get("gcode_file_prepare_percent", "")), lifecycle=print_data.get("lifecycle", ""), vt_tray=print_data.get("vt_tray"), + hotends=hotends, _raw_data=data, ) @@ -529,6 +606,11 @@ class PrinterState: "wifi_signal_dbm": self.wifi_signal_dbm, "print_error": self.print_error, "has_errors": self.print_error != 0, + # Full `print.device` payload, unfiltered. H2C's Vortek rack (6 swappable + # hotends + 1 fixed left nozzle) isn't fully modeled yet — stash everything + # here so no data is lost once the real Vortek MQTT schema is confirmed. + "vortek_raw": self._raw_data.get("print", {}).get("device", {}), + "hotends": [h.to_dict() for h in self.hotends], "hms": self.hms, "stg_cur": self.stg_cur, "lights_report": self.lights_report, diff --git a/bambu_run/static/bambu_run/css/dashboard.css b/bambu_run/static/bambu_run/css/dashboard.css index bda76e6..1f826e1 100644 --- a/bambu_run/static/bambu_run/css/dashboard.css +++ b/bambu_run/static/bambu_run/css/dashboard.css @@ -64,3 +64,93 @@ opacity: 0.9; color: rgba(255, 255, 255, 0.9); } + +/* AMS unit type colors — CSS variables so RAE/standalone can override per theme */ +:root { + --ams-badge-ams: #6c757d; + --ams-badge-ams-2-pro: #0d6efd; + --ams-badge-ams-ht: #fd7e14; + --ams-group-border-color: rgba(0, 0, 0, 0.15); +} + +[data-coreui-theme="dark"] { + --ams-group-border-color: rgba(255, 255, 255, 0.2); +} + +.ams-badge-ams { + background-color: var(--ams-badge-ams); + color: #fff; +} + +.ams-badge-ams-2-pro { + background-color: var(--ams-badge-ams-2-pro); + color: #fff; +} + +.ams-badge-ams-ht { + background-color: var(--ams-badge-ams-ht); + color: #fff; +} + +.ams-filter-pills { + display: flex; + flex-wrap: wrap; + gap: 0.5rem; +} + +.ams-filter-pill { + border-radius: 50rem; + padding: 0.25rem 0.9rem; + font-size: 0.85rem; + border: 1px solid var(--ams-group-border-color); + background-color: transparent; + opacity: 0.6; +} + +.ams-filter-pill.active { + opacity: 1; + font-weight: 600; + border-color: currentColor; +} + +/* Grouped AMS unit panels — wide (multi-slot) units stack one per row, + compact (single-slot, e.g. AMS HT) units flow side-by-side and wrap. */ +.ams-groups { + display: flex; + flex-wrap: wrap; + gap: 1rem; +} + +.ams-group { + border-radius: 0.5rem; + padding: 0.75rem; + border: 1px solid var(--ams-group-border-color); +} + +.ams-group--wide { + flex: 1 1 100%; +} + +.ams-group--compact { + flex: 0 1 auto; + min-width: 220px; +} + +.ams-badge-bg-ams { + background-color: color-mix(in srgb, var(--ams-badge-ams) 8%, transparent); + border-left: 3px solid var(--ams-badge-ams); +} + +.ams-badge-bg-ams-2-pro { + background-color: color-mix(in srgb, var(--ams-badge-ams-2-pro) 8%, transparent); + border-left: 3px solid var(--ams-badge-ams-2-pro); +} + +.ams-badge-bg-ams-ht { + background-color: color-mix(in srgb, var(--ams-badge-ams-ht) 8%, transparent); + border-left: 3px solid var(--ams-badge-ams-ht); +} + +.ams-badge-bg- { + border-left: 3px solid var(--ams-group-border-color); +} diff --git a/bambu_run/static/bambu_run/js/printer_charts.js b/bambu_run/static/bambu_run/js/printer_charts.js index a40a015..ecce8c4 100644 --- a/bambu_run/static/bambu_run/js/printer_charts.js +++ b/bambu_run/static/bambu_run/js/printer_charts.js @@ -625,8 +625,23 @@ function createFilamentDatasets(filamentTimeline, timestamps) { data: filamentTimeline[key] })); - // Sort by tray_id (numeric first, External last), then by start_idx (chronological) + // Distinct (non-null/undefined) AMS units present in this timeline — used to decide + // whether labels need an AMS unit prefix (avoid noise for the common single-AMS case). + const distinctUnits = new Set( + filamentEntries + .map(e => e.data.ams_unit_id) + .filter(uid => uid !== null && uid !== undefined) + ); + const showUnitPrefix = distinctUnits.size > 1; + + // Sort by AMS unit, then tray_id (numeric first, External last), then by start_idx filamentEntries.sort((a, b) => { + const unitA = a.data.ams_unit_id ?? -1; + const unitB = b.data.ams_unit_id ?? -1; + if (unitA !== unitB) { + return unitA - unitB; + } + const trayA = a.data.tray_id; const trayB = b.data.tray_id; @@ -659,6 +674,10 @@ function createFilamentDatasets(filamentTimeline, timestamps) { displayLabel = `Tray ${filament.tray_id} (${filament.type})`; } + if (showUnitPrefix && filament.ams_type) { + displayLabel = `${filament.ams_type} · ${displayLabel}`; + } + // Add brand if it's different from type (avoid redundancy) if (filament.brand && filament.brand !== filament.type && filament.brand !== 'External') { displayLabel += ` - ${filament.brand}`; diff --git a/bambu_run/templates/bambu_run/printer_dashboard.html b/bambu_run/templates/bambu_run/printer_dashboard.html index 3b0ecc7..a171f74 100644 --- a/bambu_run/templates/bambu_run/printer_dashboard.html +++ b/bambu_run/templates/bambu_run/printer_dashboard.html @@ -14,6 +14,20 @@ Real-time monitoring for {{ device_name }}

+ {% if show_printer_switcher %} +
+ + +
+ {% endif %} {% if error %} @@ -23,32 +37,38 @@
{% if stats.is_dual_nozzle %} - +
-
+
-
Right Nozzle
-
{{ stats.nozzle_temp|floatformat:1 }}°C
-
target {{ stats.nozzle_target_temp|floatformat:0 }}°C - {% if stats.nozzle_type %}· {{ stats.nozzle_type }}{% endif %}
+
+ + Left Nozzle +
+
{{ stats.nozzle_temp_left|floatformat:1 }}°C
+
target {{ stats.nozzle_target_temp_left|floatformat:0 }}°C + {% if stats.nozzle_type_left %}· Nozzle {{ stats.nozzle_type_left }}{% endif %}
- +
-
+
-
Left Nozzle
-
{{ stats.nozzle_temp_left|floatformat:1 }}°C
-
target {{ stats.nozzle_target_temp_left|floatformat:0 }}°C - {% if stats.nozzle_type_left %}· {{ stats.nozzle_type_left }}{% endif %}
+
+ Right Nozzle + +
+
{{ stats.nozzle_temp|floatformat:1 }}°C
+
target {{ stats.nozzle_target_temp|floatformat:0 }}°C + {% if stats.nozzle_type %}· Nozzle {{ stats.nozzle_type }}{% endif %}
@@ -58,7 +78,7 @@ {% else %}
-
+
@@ -74,7 +94,7 @@
-
+
@@ -89,7 +109,7 @@
-
+
@@ -104,7 +124,7 @@
-
+
@@ -184,34 +204,59 @@
{% if stats.filaments %} -
- {% for filament in stats.filaments %} -
-
-
-
-
Tray {{ filament.tray_id }}
- {% if filament.filament_pk %} - - - - {% endif %} -
-

{{ filament.type }} - {{ filament.brand }}

- {% if filament.color_name %}

{{ filament.color_name }}

{% endif %} -
- Remaining - {{ filament.remain_percent }}% -
-
-
+ {% if stats.ams_units|length > 1 %} +
+ + {% for unit in stats.ams_units %} + + {% endfor %} +
+ {% endif %} +
+ {% for group in stats.ams_groups %} +
+
+ {{ group.label }} + {% if group.humidity is not None or group.temp is not None %} + + {% if group.humidity is not None %}{{ group.humidity }}%RH{% endif %} + {% if group.temp is not None %}· {{ group.temp }}°C{% endif %} + + {% endif %} +
+
+ {% for filament in group.filaments %} +
+
+
+
+
Tray {{ filament.tray_id }}
+ {% if filament.filament_pk %} + + + + {% endif %} +
+

{{ filament.type }} - {{ filament.brand }}

+ {% if filament.color_name %}

{{ filament.color_name }}

{% endif %} +
+ Remaining + {{ filament.remain_percent }}% +
+
+
+
+
+ {% endfor %}
{% endfor %} +
- {% if stats.external_spool.type %} + {% if stats.external_spool.type %} +
@@ -227,8 +272,8 @@
- {% endif %}
+ {% endif %} {% else %}

No filament data available

{% endif %} @@ -237,6 +282,64 @@
+ + {% if stats.hotends or stats.nozzle_positions %} +
+
+
+
+
Hotends
+
+
+
+ {% for hotend in stats.hotends %} +
+
+
+
+
+ {% if hotend.is_toolhead %}Toolhead{% elif hotend.slot_number %}Slot {{ hotend.slot_number }}{% else %}Rack{% endif %} +
+ {% if hotend.is_toolhead %}Toolhead{% endif %} +
+

SN {{ hotend.serial_number }}

+

{{ hotend.nozzle_type }}{% if hotend.diameter %} · {{ hotend.diameter }}mm{% endif %}

+ {% if hotend.last_filament_profile_id %}

Last: {{ hotend.last_filament_profile_id }}

{% endif %} +
+ Used time + {{ hotend.used_time_display }} +
+
+ Wear + {{ hotend.wear_percent|floatformat:0 }}% +
+
+
+
+
+
+
+ {% endfor %} + {% for nozzle in stats.nozzle_positions %} +
+
+
+
+
{% if nozzle.is_toolhead %}Toolhead{% else %}Fixed Nozzle{% endif %}
+
+

{{ nozzle.nozzle_type }}{% if nozzle.diameter %} · {{ nozzle.diameter }}mm{% endif %}

+

No induction chip data

+
+
+
+ {% endfor %} +
+
+
+
+
+ {% endif %} + {% if not is_basic_user %}
@@ -423,12 +526,12 @@ -{% if not is_basic_user %} - +{% if not is_basic_user and printer_device %} + {% endif %} + {% endblock %} diff --git a/bambu_run/urls.py b/bambu_run/urls.py index 4955e3a..de412de 100644 --- a/bambu_run/urls.py +++ b/bambu_run/urls.py @@ -5,7 +5,9 @@ app_name = "bambu_run" urlpatterns = [ path("", views.PrinterDashboardView.as_view(), name="printer_dashboard"), + path("printer//", views.PrinterDashboardView.as_view(), name="printer_dashboard"), path("api/printer/", views.PrinterDataAPIView.as_view(), name="printer_api"), + path("api/printer//", views.PrinterDataAPIView.as_view(), name="printer_api"), # Filament Inventory routes path("filaments/", views.FilamentListView.as_view(), name="filament_list"), diff --git a/bambu_run/views.py b/bambu_run/views.py index fb2991b..414d5ba 100644 --- a/bambu_run/views.py +++ b/bambu_run/views.py @@ -1,8 +1,9 @@ from datetime import timedelta, datetime from django.views.generic import TemplateView, View, ListView, CreateView, UpdateView, DetailView, DeleteView from django.contrib.auth.mixins import LoginRequiredMixin +from django.shortcuts import get_object_or_404 from django.utils import timezone -from django.http import JsonResponse +from django.http import Http404, JsonResponse from django.urls import reverse_lazy from django.contrib import messages from django.db.models import Q, Sum @@ -10,7 +11,7 @@ import json import zoneinfo from .conf import app_settings -from .models import Printer, PrinterMetrics, Filament, FilamentColor, FilamentType, FilamentSnapshot, PrintJob, FilamentUsage +from .models import Printer, PrinterMetrics, Filament, FilamentColor, FilamentType, FilamentSnapshot, PrintJob, FilamentUsage, Hotend from .forms import FilamentForm, FilamentColorForm, FilamentTypeForm _METRICS_API_FIELDS = [ @@ -26,6 +27,17 @@ _METRICS_API_FIELDS = [ _MAX_CHART_POINTS = 3000 +def resolve_printer_from_request(pk): + """Resolve which Printer a dashboard/API view should show. + + `pk` given (URL kwarg) -> that exact printer, 404 if missing/inactive. + `pk` omitted -> first active printer (today's single-printer default behavior). + """ + if pk is not None: + return get_object_or_404(Printer, pk=pk, is_active=True) + return Printer.objects.filter(is_active=True).first() + + class PrinterDashboardView(LoginRequiredMixin, TemplateView): template_name = "bambu_run/printer_dashboard.html" @@ -38,13 +50,20 @@ class PrinterDashboardView(LoginRequiredMixin, TemplateView): context = super().get_context_data(**kwargs) context['bambu_run_base_template'] = app_settings.BASE_TEMPLATE + all_printers = Printer.objects.filter(is_active=True) + context["all_printers"] = all_printers + # Shown even with a single printer — hints that multi-printer support exists. + context["show_printer_switcher"] = all_printers.exists() + try: - printer_device = Printer.objects.filter(is_active=True).first() + printer_device = resolve_printer_from_request(self.kwargs.get("pk")) if not printer_device: context["error"] = ( "No 3D printer device found. Please run bambu_collector first." ) return context + except Http404: + raise except Exception as e: context["error"] = f"Error loading printer device: {str(e)}" return context @@ -129,6 +148,8 @@ class PrinterDashboardView(LoginRequiredMixin, TemplateView): 'brand': snapshot.sub_type or 'Unknown', 'color': snapshot.color or 'FFFFFFFF', 'remain_percent': snapshot.remain_percent or 0, + 'ams_unit_id': snapshot.ams_unit_id, + 'ams_type': snapshot.ams_type or '', } if snapshot.filament: filament_dict['color_name'] = snapshot.filament.color @@ -138,6 +159,37 @@ class PrinterDashboardView(LoginRequiredMixin, TemplateView): except Exception: filaments_list = [] + # Distinct AMS units represented in this snapshot, for the unit + # filter/badges in the template. Sort numeric unit ids first + # (AMS / AMS 2 Pro), HT (id 128 / bit 0x80 set) last. + seen_units = {} + for f in filaments_list: + uid = f.get('ams_unit_id') + if uid is not None and uid not in seen_units: + seen_units[uid] = f.get('ams_type') or '' + ams_units_list = [ + {'ams_unit_id': uid, 'ams_type': label} + for uid, label in sorted(seen_units.items()) + ] + + # Group trays by physical AMS unit for the panel-style dashboard layout — + # one tinted panel per unit, full-width for multi-slot units (AMS/AMS 2 Pro), + # compact for single-slot units (AMS HT) so several can flow side-by-side. + units_meta = { + u.get('unit_id'): u for u in (latest_metric.ams_units or []) + } + ams_groups = [] + for uid, label in sorted(seen_units.items()): + unit_meta = units_meta.get(str(uid), {}) + ams_groups.append({ + 'unit_id': uid, + 'ams_type': label, + 'label': f"{label or 'AMS'} (Unit {uid})", + 'humidity': unit_meta.get('humidity'), + 'temp': unit_meta.get('temp'), + 'filaments': [f for f in filaments_list if f.get('ams_unit_id') == uid], + }) + subtask_name = latest_metric.subtask_name or "No active print" # Look up active PrintJob for a better display name (cloud design_title) job_display_name = subtask_name @@ -177,6 +229,21 @@ class PrinterDashboardView(LoginRequiredMixin, TemplateView): "ams_temp": float(latest_metric.ams_temp) if latest_metric.ams_temp else None, "ams_humidity": latest_metric.ams_humidity, "filaments": filaments_list, + "ams_units": ams_units_list, + "ams_groups": ams_groups, + "hotends": list( + Hotend.objects.filter(printer=printer_device) + .order_by('-is_toolhead', 'slot_number', 'serial_number') + ), + # Nozzle positions with no induction chip (no stable serial number to + # key a Hotend registry row on, e.g. H2C's fixed left nozzle) — shown + # read-only from the latest poll, not persisted/historical. Entries with + # no readable type/diameter at all (i.e. genuinely nothing there) are + # dropped rather than shown as an empty placeholder. + "nozzle_positions": [ + h for h in (latest_metric.nozzle_info or []) + if h.get('is_empty') and (h.get('nozzle_type') or h.get('diameter')) + ], "external_spool": latest_metric.external_spool or {}, "timestamp": latest_metric.timestamp.astimezone(tz).strftime("%Y-%m-%d %H:%M:%S"), } @@ -259,15 +326,19 @@ class PrinterDashboardView(LoginRequiredMixin, TemplateView): for snapshot in snapshots: tray_id = snapshot.tray_id + ams_unit_id = snapshot.ams_unit_id + ams_type = snapshot.ams_type or '' fil_type = snapshot.type or 'Unknown' fil_sub_type = snapshot.sub_type or 'Unknown' fil_color = snapshot.color or 'FFFFFFFF' - unique_key = f"{tray_id}_{fil_type}_{fil_sub_type}_{fil_color}" + unique_key = f"{ams_unit_id}_{tray_id}_{fil_type}_{fil_sub_type}_{fil_color}" if unique_key not in filament_data: filament_data[unique_key] = { 'tray_id': tray_id, + 'ams_unit_id': ams_unit_id, + 'ams_type': ams_type, 'type': fil_type, 'brand': fil_sub_type, 'color': fil_color, @@ -304,16 +375,21 @@ class PrinterDashboardView(LoginRequiredMixin, TemplateView): class PrinterDataAPIView(LoginRequiredMixin, View): """API endpoint for dynamic printer chart updates""" - def get(self, request): + def get(self, request, pk=None): start_date = request.GET.get("start_date") end_date = request.GET.get("end_date") start_time = request.GET.get("start_time", "00:00") end_time = request.GET.get("end_time", "23:59") try: - printer_device = Printer.objects.filter(is_active=True).first() - if not printer_device: - return JsonResponse({"error": "No printer device found"}, status=404) + if pk is not None: + printer_device = Printer.objects.filter(pk=pk, is_active=True).first() + if not printer_device: + return JsonResponse({"error": "Printer not found"}, status=404) + else: + printer_device = Printer.objects.filter(is_active=True).first() + if not printer_device: + return JsonResponse({"error": "No printer device found"}, status=404) tz = zoneinfo.ZoneInfo(app_settings.TIMEZONE) diff --git a/pyproject.toml b/pyproject.toml index ccd00c9..a0648d8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -62,3 +62,7 @@ bambu_run = [ "templates/bambu_run/*.html", "static/bambu_run/**/*", ] + +[tool.pytest.ini_options] +DJANGO_SETTINGS_MODULE = "tests.settings" +python_files = "test_*.py" diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/settings.py b/tests/settings.py new file mode 100644 index 0000000..3855817 --- /dev/null +++ b/tests/settings.py @@ -0,0 +1,40 @@ +"""Minimal Django settings for running bambu_run's pytest suite (in-memory SQLite).""" + +SECRET_KEY = "test-secret-key" + +INSTALLED_APPS = [ + "django.contrib.auth", + "django.contrib.contenttypes", + "django.contrib.sessions", + "bambu_run", +] + +MIDDLEWARE = [ + "django.contrib.sessions.middleware.SessionMiddleware", + "django.contrib.auth.middleware.AuthenticationMiddleware", +] + +TEMPLATES = [ + { + "BACKEND": "django.template.backends.django.DjangoTemplates", + "APP_DIRS": True, + "OPTIONS": { + "context_processors": [ + "django.template.context_processors.request", + "django.contrib.auth.context_processors.auth", + ], + }, + }, +] + +DATABASES = { + "default": { + "ENGINE": "django.db.backends.sqlite3", + "NAME": ":memory:", + } +} + +USE_TZ = True +DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField" + +ROOT_URLCONF = "tests.urls" diff --git a/tests/test_ams_type_from_info.py b/tests/test_ams_type_from_info.py new file mode 100644 index 0000000..1b7b8da --- /dev/null +++ b/tests/test_ams_type_from_info.py @@ -0,0 +1,25 @@ +import pytest + +from bambu_run.models import ams_type_from_info + + +@pytest.mark.parametrize( + "info_code,expected", + [ + # Real-world 8-char info codes captured from a live H2C with + # AMS 2 Pro (unit 0), AMS (unit 1), AMS HT (unit 128). + ("10001003", "AMS 2 Pro"), + ("10001001", "AMS"), + ("11002104", "AMS HT"), + # Bare 4-digit codes (original assumption) still resolve. + ("1001", "AMS"), + ("1003", "AMS 2 Pro"), + ("2104", "AMS HT"), + # Unknown/missing codes resolve to empty string, not an error. + ("99999999", ""), + ("", ""), + (None, ""), + ], +) +def test_ams_type_from_info(info_code, expected): + assert ams_type_from_info(info_code) == expected diff --git a/tests/test_diagnostics.py b/tests/test_diagnostics.py new file mode 100644 index 0000000..81bdc9f --- /dev/null +++ b/tests/test_diagnostics.py @@ -0,0 +1,68 @@ +import pytest + +from bambu_run.diagnostics import redact_diagnostics, build_diagnostics_report + + +def test_redacts_password_and_token_like_keys(): + data = {"BAMBU_PASSWORD": "hunter2", "access_token": "abc123", "ok": "fine"} + + redacted = redact_diagnostics(data) + + assert redacted["BAMBU_PASSWORD"] == "***REDACTED***" + assert redacted["access_token"] == "***REDACTED***" + assert redacted["ok"] == "fine" + + +def test_masks_known_identifier_keys_partially(): + data = {"dev_id": "31B8BP592601478", "tray_uuid": "EE37828FA8844DE1AB12"} + + redacted = redact_diagnostics(data) + + assert redacted["dev_id"] == "31B8...1478" + assert redacted["tray_uuid"] == "EE37...AB12" + + +def test_short_identifier_values_fully_masked(): + data = {"dev_id": "short"} + + redacted = redact_diagnostics(data) + + assert redacted["dev_id"] == "***" + + +def test_recurses_into_nested_structures(): + data = {"devices": [{"dev_id": "31B8BP592601478", "name": "RNL-H2C"}]} + + redacted = redact_diagnostics(data) + + assert redacted["devices"][0]["dev_id"] == "31B8...1478" + assert redacted["devices"][0]["name"] == "RNL-H2C" + + +def test_no_redact_passthrough_keeps_original_values(): + data = {"dev_id": "31B8BP592601478", "BAMBU_PASSWORD": "hunter2"} + + result = redact_diagnostics(data, redact=False) + + assert result == data + + +def test_build_diagnostics_report_structure(): + devices = [{"dev_id": "SERIAL-A", "name": "Printer A", "dev_product_name": "H2C"}] + raw_payloads = {"SERIAL-A": {"device": {"extruder": {"info": []}}}} + + report = build_diagnostics_report(devices, raw_payloads) + + assert report["device_count"] == 1 + assert "generated_at" in report + assert report["devices"][0]["device_info"]["dev_id"] == "SERIAL-A" + assert report["devices"][0]["raw_mqtt_payload"] == {"device": {"extruder": {"info": []}}} + + +def test_build_diagnostics_report_handles_missing_payload(): + devices = [{"dev_id": "SERIAL-A", "name": "Printer A"}] + + report = build_diagnostics_report(devices, raw_payloads={}) + + assert report["devices"][0]["raw_mqtt_payload"] is None + assert report["devices"][0]["note"] == "No MQTT data received within the listen window." diff --git a/tests/test_filament_context.py b/tests/test_filament_context.py new file mode 100644 index 0000000..c4f9cd0 --- /dev/null +++ b/tests/test_filament_context.py @@ -0,0 +1,180 @@ +import pytest +from decimal import Decimal +from django.urls import reverse +from django.utils import timezone + +from bambu_run.models import Printer, PrinterMetrics, FilamentSnapshot + + +@pytest.fixture +def logged_in_client(client, django_user_model): + user = django_user_model.objects.create_user(username="tester", password="pw") + client.force_login(user) + return client + + +@pytest.mark.django_db +def test_dashboard_filaments_carry_ams_unit_info(logged_in_client): + printer = Printer.objects.create(name="Printer A", model="H2C", is_active=True) + metric = PrinterMetrics.objects.create(device=printer, timestamp=timezone.now()) + FilamentSnapshot.objects.create( + printer_metric=metric, tray_id=0, ams_unit_id=0, ams_type="AMS", + type="PLA", remain_percent=80, + ) + FilamentSnapshot.objects.create( + printer_metric=metric, tray_id=0, ams_unit_id=128, ams_type="AMS HT", + type="PA-CF", remain_percent=50, + ) + + resp = logged_in_client.get( + reverse("bambu_run:printer_dashboard", kwargs={"pk": printer.pk}) + ) + + filaments = resp.context["stats"]["filaments"] + assert len(filaments) == 2 + units = {(f["ams_unit_id"], f["ams_type"]) for f in filaments} + assert units == {(0, "AMS"), (128, "AMS HT")} + + ams_units = resp.context["stats"]["ams_units"] + assert ams_units == [ + {"ams_unit_id": 0, "ams_type": "AMS"}, + {"ams_unit_id": 128, "ams_type": "AMS HT"}, + ] + + +@pytest.mark.django_db +def test_filament_timeline_keeps_same_tray_id_units_separate(logged_in_client): + from bambu_run.views import PrinterDashboardView + + printer = Printer.objects.create(name="Printer A", model="H2C", is_active=True) + metric = PrinterMetrics.objects.create(device=printer, timestamp=timezone.now()) + FilamentSnapshot.objects.create( + printer_metric=metric, tray_id=0, ams_unit_id=0, ams_type="AMS", + type="PLA", sub_type="PLA Basic", color="FF0000", remain_percent=80, + ) + FilamentSnapshot.objects.create( + printer_metric=metric, tray_id=0, ams_unit_id=128, ams_type="AMS HT", + type="PLA", sub_type="PLA Basic", color="FF0000", remain_percent=50, + ) + + view = PrinterDashboardView() + timeline = view._prepare_filament_timeline(PrinterMetrics.objects.filter(pk=metric.pk)) + + assert len(timeline) == 2 + + +@pytest.mark.django_db +def test_dashboard_renders_unit_pills_and_badges_with_multiple_units(logged_in_client): + printer = Printer.objects.create(name="Printer A", model="H2C", is_active=True) + metric = PrinterMetrics.objects.create(device=printer, timestamp=timezone.now()) + FilamentSnapshot.objects.create( + printer_metric=metric, tray_id=0, ams_unit_id=0, ams_type="AMS", + type="PLA", color="FF0000FF", remain_percent=80, + ) + FilamentSnapshot.objects.create( + printer_metric=metric, tray_id=0, ams_unit_id=128, ams_type="AMS HT", + type="PA-CF", color="00FF00FF", remain_percent=50, + ) + + resp = logged_in_client.get( + reverse("bambu_run:printer_dashboard", kwargs={"pk": printer.pk}) + ) + + assert resp.status_code == 200 + html = resp.content.decode() + assert "ams-filter-pills" in html + assert "ams-badge-ams" in html + assert "ams-badge-ams-ht" in html + assert 'data-ams-unit-id="0"' in html + assert 'data-ams-unit-id="128"' in html + + +@pytest.mark.django_db +def test_dashboard_groups_filaments_by_ams_unit(logged_in_client): + printer = Printer.objects.create(name="Printer A", model="H2C", is_active=True) + metric = PrinterMetrics.objects.create( + device=printer, timestamp=timezone.now(), + ams_units=[ + {"unit_id": "0", "ams_type": "AMS 2 Pro", "humidity": 5, "temp": 22.5}, + {"unit_id": "128", "ams_type": "AMS HT", "humidity": 8, "temp": 60.0}, + ], + ) + FilamentSnapshot.objects.create( + printer_metric=metric, tray_id=0, ams_unit_id=0, ams_type="AMS 2 Pro", + type="ABS", remain_percent=80, + ) + FilamentSnapshot.objects.create( + printer_metric=metric, tray_id=1, ams_unit_id=0, ams_type="AMS 2 Pro", + type="ABS", remain_percent=60, + ) + FilamentSnapshot.objects.create( + printer_metric=metric, tray_id=0, ams_unit_id=128, ams_type="AMS HT", + type="PA-CF", remain_percent=50, + ) + + resp = logged_in_client.get( + reverse("bambu_run:printer_dashboard", kwargs={"pk": printer.pk}) + ) + + groups = resp.context["stats"]["ams_groups"] + assert len(groups) == 2 + + ams2pro_group, ht_group = groups + assert ams2pro_group["unit_id"] == 0 + assert ams2pro_group["label"] == "AMS 2 Pro (Unit 0)" + assert ams2pro_group["humidity"] == 5 + assert ams2pro_group["temp"] == 22.5 + assert len(ams2pro_group["filaments"]) == 2 + + assert ht_group["unit_id"] == 128 + assert ht_group["label"] == "AMS HT (Unit 128)" + assert ht_group["humidity"] == 8 + assert len(ht_group["filaments"]) == 1 + + +@pytest.mark.django_db +def test_dashboard_renders_wide_and_compact_panels(logged_in_client): + printer = Printer.objects.create(name="Printer A", model="H2C", is_active=True) + metric = PrinterMetrics.objects.create( + device=printer, timestamp=timezone.now(), + ams_units=[ + {"unit_id": "0", "ams_type": "AMS 2 Pro", "humidity": 5, "temp": 22.5}, + {"unit_id": "128", "ams_type": "AMS HT", "humidity": 8, "temp": 60.0}, + ], + ) + for tray_id in range(4): + FilamentSnapshot.objects.create( + printer_metric=metric, tray_id=tray_id, ams_unit_id=0, ams_type="AMS 2 Pro", + type="ABS", remain_percent=80, + ) + FilamentSnapshot.objects.create( + printer_metric=metric, tray_id=0, ams_unit_id=128, ams_type="AMS HT", + type="PA-CF", remain_percent=50, + ) + + resp = logged_in_client.get( + reverse("bambu_run:printer_dashboard", kwargs={"pk": printer.pk}) + ) + + html = resp.content.decode() + assert "ams-group--wide" in html + assert "ams-group--compact" in html + assert "AMS 2 Pro (Unit 0)" in html + assert "AMS HT (Unit 128)" in html + + +@pytest.mark.django_db +def test_dashboard_hides_unit_pills_with_single_unit(logged_in_client): + printer = Printer.objects.create(name="Printer A", model="H2C", is_active=True) + metric = PrinterMetrics.objects.create(device=printer, timestamp=timezone.now()) + FilamentSnapshot.objects.create( + printer_metric=metric, tray_id=0, ams_unit_id=0, ams_type="AMS", + type="PLA", color="FF0000FF", remain_percent=80, + ) + + resp = logged_in_client.get( + reverse("bambu_run:printer_dashboard", kwargs={"pk": printer.pk}) + ) + + assert resp.status_code == 200 + assert "ams-filter-pills" not in resp.content.decode() diff --git a/tests/test_hotend_collection.py b/tests/test_hotend_collection.py new file mode 100644 index 0000000..28ffda5 --- /dev/null +++ b/tests/test_hotend_collection.py @@ -0,0 +1,121 @@ +import pytest + +from bambu_run.management.commands.bambu_collector import Command, DeviceSession, resolve_printer_device +from bambu_run.models import Hotend, HotendSnapshot, PrinterMetrics + + +class FakeClient: + """Stub in place of BambuPrinter — returns canned snapshots, no real MQTT.""" + + def __init__(self, snapshots): + self._snapshots = snapshots + self._index = 0 + self._client = None + + def get_snapshot(self): + snap = self._snapshots[min(self._index, len(self._snapshots) - 1)] + self._index += 1 + return snap + + +def make_session(device_id, name, snapshots): + printer = resolve_printer_device(device_id, {"name": name, "dev_product_name": "H2C"}) + return DeviceSession(device_id=device_id, client=FakeClient(snapshots), printer=printer) + + +def hotends_snapshot(used_time=11472, wear=100.0): + return { + "gcode_state": "IDLE", + "hotends": [ + { + "raw_id": 21, "serial_number": "20D06A5B2918952", "nozzle_type": "HS01", + "diameter": 0.4, "fila_id": "GFA01", "color": "FFFFFF", + "used_time_seconds": used_time, "wear_percent": wear, "stat": 0, + "is_toolhead": False, "is_empty": False, "slot_number": 6, + }, + { + "raw_id": 1, "serial_number": "N/A", "nozzle_type": "HS01", + "diameter": 0.4, "fila_id": "", "color": None, + "used_time_seconds": 0, "wear_percent": 0.0, "stat": 0, + "is_toolhead": False, "is_empty": True, "slot_number": None, + }, + { + "raw_id": 0, "serial_number": "20D06A5C0426280", "nozzle_type": "HS01", + "diameter": 0.4, "fila_id": "GFA00", "color": "FEC600", + "used_time_seconds": 93490, "wear_percent": 100.0, "stat": 0, + "is_toolhead": True, "is_empty": False, "slot_number": None, + }, + ], + } + + +@pytest.mark.django_db +def test_first_poll_creates_one_hotend_per_non_empty_entry(): + session = make_session("SERIAL-A", "Printer A", [hotends_snapshot()]) + + cmd = Command() + cmd.verbose = False + cmd._collect_printer_data(session) + + hotends = Hotend.objects.filter(printer=session.printer) + assert hotends.count() == 2 # empty bay (sn="N/A") skipped + + rack = hotends.get(serial_number="20D06A5B2918952") + assert rack.raw_id == 21 + assert rack.slot_number == 6 + assert rack.is_toolhead is False + assert rack.used_time_seconds == 11472 + assert rack.wear_percent == 100.0 + assert rack.nozzle_type == "HS01" + assert rack.last_filament_profile_id == "GFA01" + assert rack.last_color == "FFFFFF" + + toolhead = hotends.get(serial_number="20D06A5C0426280") + assert toolhead.is_toolhead is True + assert toolhead.slot_number is None + + +@pytest.mark.django_db +def test_first_poll_creates_one_snapshot_per_non_empty_hotend(): + session = make_session("SERIAL-A", "Printer A", [hotends_snapshot()]) + + cmd = Command() + cmd.verbose = False + cmd._collect_printer_data(session) + + metric = PrinterMetrics.objects.get(device=session.printer) + assert HotendSnapshot.objects.filter(printer_metric=metric).count() == 2 + + +@pytest.mark.django_db +def test_collector_persists_raw_nozzle_info_including_non_inductive_entries(): + session = make_session("SERIAL-A", "Printer A", [hotends_snapshot()]) + + cmd = Command() + cmd.verbose = False + cmd._collect_printer_data(session) + + metric = PrinterMetrics.objects.get(device=session.printer) + assert len(metric.nozzle_info) == 3 # all entries, including the empty/non-inductive one + serials = {h["serial_number"] for h in metric.nozzle_info} + assert serials == {"20D06A5B2918952", "N/A", "20D06A5C0426280"} + + +@pytest.mark.django_db +def test_second_poll_updates_existing_hotend_instead_of_duplicating(): + session = make_session( + "SERIAL-A", "Printer A", + [hotends_snapshot(used_time=11472, wear=100.0), hotends_snapshot(used_time=11500, wear=100.0)], + ) + + cmd = Command() + cmd.verbose = False + cmd._collect_printer_data(session) + cmd._collect_printer_data(session) + + hotends = Hotend.objects.filter(printer=session.printer, serial_number="20D06A5B2918952") + assert hotends.count() == 1 + assert hotends.first().used_time_seconds == 11500 + + snapshots = HotendSnapshot.objects.filter(hotend=hotends.first()) + assert snapshots.count() == 2 diff --git a/tests/test_hotend_dashboard.py b/tests/test_hotend_dashboard.py new file mode 100644 index 0000000..3043721 --- /dev/null +++ b/tests/test_hotend_dashboard.py @@ -0,0 +1,128 @@ +import pytest +from django.urls import reverse +from django.utils import timezone + +from bambu_run.models import Printer, PrinterMetrics, Hotend + + +@pytest.fixture +def logged_in_client(client, django_user_model): + user = django_user_model.objects.create_user(username="tester", password="pw") + client.force_login(user) + return client + + +@pytest.mark.django_db +def test_dashboard_context_includes_hotends_toolhead_first(logged_in_client): + printer = Printer.objects.create(name="Printer A", model="H2C", is_active=True) + PrinterMetrics.objects.create(device=printer, timestamp=timezone.now()) + + Hotend.objects.create( + printer=printer, serial_number="RACK-SN", raw_id=16, slot_number=1, + is_toolhead=False, nozzle_type="HS01", used_time_seconds=3600, wear_percent=50, + ) + Hotend.objects.create( + printer=printer, serial_number="TOOLHEAD-SN", raw_id=0, slot_number=None, + is_toolhead=True, nozzle_type="HS01", used_time_seconds=7200, wear_percent=80, + ) + + resp = logged_in_client.get( + reverse("bambu_run:printer_dashboard", kwargs={"pk": printer.pk}) + ) + + hotends = resp.context["stats"]["hotends"] + assert len(hotends) == 2 + assert hotends[0].serial_number == "TOOLHEAD-SN" + assert hotends[1].serial_number == "RACK-SN" + + +@pytest.mark.django_db +def test_dashboard_context_includes_non_inductive_nozzle_positions(logged_in_client): + printer = Printer.objects.create(name="Printer A", model="H2C", is_active=True) + PrinterMetrics.objects.create( + device=printer, timestamp=timezone.now(), + nozzle_info=[ + { + "raw_id": 1, "serial_number": "N/A", "nozzle_type": "HS01", "diameter": 0.4, + "fila_id": "", "color": None, "used_time_seconds": 0, "wear_percent": 0.0, + "stat": 0, "is_toolhead": False, "is_empty": True, "slot_number": None, + }, + ], + ) + + resp = logged_in_client.get( + reverse("bambu_run:printer_dashboard", kwargs={"pk": printer.pk}) + ) + + positions = resp.context["stats"]["nozzle_positions"] + assert len(positions) == 1 + assert positions[0]["nozzle_type"] == "HS01" + + +@pytest.mark.django_db +def test_dashboard_omits_nozzle_positions_with_no_readable_data(logged_in_client): + printer = Printer.objects.create(name="Printer A", model="H2C", is_active=True) + PrinterMetrics.objects.create( + device=printer, timestamp=timezone.now(), + nozzle_info=[ + { + "raw_id": 1, "serial_number": "N/A", "nozzle_type": "", "diameter": 0, + "fila_id": "", "color": None, "used_time_seconds": 0, "wear_percent": 0.0, + "stat": 0, "is_toolhead": False, "is_empty": True, "slot_number": None, + }, + ], + ) + + resp = logged_in_client.get( + reverse("bambu_run:printer_dashboard", kwargs={"pk": printer.pk}) + ) + + assert resp.context["stats"]["nozzle_positions"] == [] + assert "
Hotends
" not in resp.content.decode() + + +@pytest.mark.django_db +def test_dashboard_renders_nozzle_position_without_serial_or_wear(logged_in_client): + printer = Printer.objects.create(name="Printer A", model="H2C", is_active=True) + PrinterMetrics.objects.create( + device=printer, timestamp=timezone.now(), + nozzle_info=[ + { + "raw_id": 1, "serial_number": "N/A", "nozzle_type": "HS01", "diameter": 0.4, + "fila_id": "", "color": None, "used_time_seconds": 0, "wear_percent": 0.0, + "stat": 0, "is_toolhead": False, "is_empty": True, "slot_number": None, + }, + ], + ) + + resp = logged_in_client.get( + reverse("bambu_run:printer_dashboard", kwargs={"pk": printer.pk}) + ) + + html = resp.content.decode() + assert "Hotends" in html + assert "HS01" in html + assert "SN: N/A" not in html + assert "SN N/A" not in html + + +@pytest.mark.django_db +def test_dashboard_renders_hotends_card(logged_in_client): + printer = Printer.objects.create(name="Printer A", model="H2C", is_active=True) + PrinterMetrics.objects.create(device=printer, timestamp=timezone.now()) + + Hotend.objects.create( + printer=printer, serial_number="RACK-SN", raw_id=18, slot_number=3, + is_toolhead=False, nozzle_type="HS01", diameter=0.4, + used_time_seconds=3661, wear_percent=50, last_filament_profile_id="GFA01", + last_color="DE4343", + ) + + resp = logged_in_client.get( + reverse("bambu_run:printer_dashboard", kwargs={"pk": printer.pk}) + ) + + html = resp.content.decode() + assert "Hotends" in html + assert "RACK-SN" in html + assert "Slot 3" in html diff --git a/tests/test_hotend_parsing.py b/tests/test_hotend_parsing.py new file mode 100644 index 0000000..e837385 --- /dev/null +++ b/tests/test_hotend_parsing.py @@ -0,0 +1,100 @@ +from bambu_run.mqtt_client import PrinterState + + +def real_nozzle_payload(): + """Real captured device.nozzle payload from a live H2C with a Vortek rack + (1x AMS, 1x AMS 2 Pro, 1x AMS HT physically connected — unrelated here). + SN/used-time cross-checked against the user's Bambu Studio Hotends Info table.""" + return { + "exist": 3997699, + "src_id": 17, + "tar_id": 17, + "state": 0, + "info": [ + {"id": 21, "sn": "20D06A5B2918952", "type": "HS01", "diameter": 0.4, + "fila_id": "GFA01", "color_m": "FFFFFFFF", "p_t": 11472, "wear": 128.0, "stat": 0, "tm": 350}, + {"id": 1, "sn": "N/A", "type": "HS01", "diameter": 0.4, + "fila_id": "", "color_m": "00000000", "p_t": 0, "wear": 0.0, "stat": 0, "tm": 0}, + {"id": 16, "sn": "20D06A5B2919219", "type": "HS01", "diameter": 0.4, + "fila_id": "GFA01", "color_m": "A3D8E1FF", "p_t": 105386, "wear": 128.0, "stat": 0, "tm": 350}, + {"id": 20, "sn": "20D06A590610257", "type": "HS01", "diameter": 0.4, + "fila_id": "GFG01", "color_m": "00000000", "p_t": 81506, "wear": 128.0, "stat": 0, "tm": 350}, + {"id": 18, "sn": "20D06A591506263", "type": "HS01", "diameter": 0.4, + "fila_id": "GFA01", "color_m": "DE4343FF", "p_t": 30962, "wear": 128.0, "stat": 0, "tm": 350}, + {"id": 0, "sn": "20D06A5C0426280", "type": "HS01", "diameter": 0.4, + "fila_id": "GFA00", "color_m": "FEC600FF", "p_t": 93490, "wear": 128.0, "stat": 0, "tm": 350}, + {"id": 19, "sn": "20D06A5C0207881", "type": "HS01", "diameter": 0.4, + "fila_id": "GFA01", "color_m": "DE4343FF", "p_t": 1430, "wear": 128.0, "stat": 0, "tm": 350}, + ], + } + + +def make_data(nozzle_payload): + return {"print": {"gcode_state": "IDLE", "device": {"nozzle": nozzle_payload}}} + + +def test_snapshot_includes_one_hotend_per_nozzle_info_entry(): + state = PrinterState.from_mqtt_data(make_data(real_nozzle_payload())) + snapshot = state.get_snapshot() + + assert len(snapshot["hotends"]) == 7 + + +def test_hotend_fields_extracted_correctly(): + state = PrinterState.from_mqtt_data(make_data(real_nozzle_payload())) + snapshot = state.get_snapshot() + + by_sn = {h["serial_number"]: h for h in snapshot["hotends"]} + h = by_sn["20D06A5B2919219"] + + assert h["raw_id"] == 16 + assert h["nozzle_type"] == "HS01" + assert h["diameter"] == 0.4 + assert h["fila_id"] == "GFA01" + assert h["color"] == "A3D8E1" # alpha stripped + assert h["used_time_seconds"] == 105386 + assert h["wear_percent"] == 100.0 # 128/128*100 + assert h["is_empty"] is False + + +def test_id_zero_is_toolhead_and_resolves_slot_number(): + state = PrinterState.from_mqtt_data(make_data(real_nozzle_payload())) + snapshot = state.get_snapshot() + + by_sn = {h["serial_number"]: h for h in snapshot["hotends"]} + toolhead = by_sn["20D06A5C0426280"] + + assert toolhead["raw_id"] == 0 + assert toolhead["is_toolhead"] is True + assert toolhead["slot_number"] is None # true bay address hidden while id==0 sentinel + + +def test_rack_bay_ids_resolve_to_slot_numbers_one_through_six(): + state = PrinterState.from_mqtt_data(make_data(real_nozzle_payload())) + snapshot = state.get_snapshot() + + by_sn = {h["serial_number"]: h for h in snapshot["hotends"]} + + assert by_sn["20D06A5B2919219"]["slot_number"] == 1 # raw_id 16 + assert by_sn["20D06A591506263"]["slot_number"] == 3 # raw_id 18 + assert by_sn["20D06A5C0207881"]["slot_number"] == 4 # raw_id 19 + assert by_sn["20D06A590610257"]["slot_number"] == 5 # raw_id 20 + assert by_sn["20D06A5B2918952"]["slot_number"] == 6 # raw_id 21 + + +def test_empty_bay_with_na_serial_is_flagged_empty(): + state = PrinterState.from_mqtt_data(make_data(real_nozzle_payload())) + snapshot = state.get_snapshot() + + by_sn = {h["serial_number"]: h for h in snapshot["hotends"]} + empty = by_sn["N/A"] + + assert empty["is_empty"] is True + assert empty["is_toolhead"] is False + + +def test_snapshot_hotends_empty_list_when_no_nozzle_payload(): + state = PrinterState.from_mqtt_data({"print": {"gcode_state": "IDLE"}}) + snapshot = state.get_snapshot() + + assert snapshot["hotends"] == [] diff --git a/tests/test_multi_ams_collection.py b/tests/test_multi_ams_collection.py new file mode 100644 index 0000000..df366ae --- /dev/null +++ b/tests/test_multi_ams_collection.py @@ -0,0 +1,104 @@ +import pytest + +from bambu_run.management.commands.bambu_collector import Command, DeviceSession, resolve_printer_device +from bambu_run.models import Filament, FilamentSnapshot, FilamentUsage, PrinterMetrics + + +class FakeClient: + """Stub in place of BambuPrinter — returns canned snapshots, no real MQTT.""" + + def __init__(self, snapshots): + self._snapshots = snapshots + self._index = 0 + self._client = None + + def get_snapshot(self): + snap = self._snapshots[min(self._index, len(self._snapshots) - 1)] + self._index += 1 + return snap + + +def make_session(device_id, name, snapshots): + printer = resolve_printer_device(device_id, {"name": name, "dev_product_name": "H2C"}) + return DeviceSession(device_id=device_id, client=FakeClient(snapshots), printer=printer) + + +def two_unit_tray0_snapshot(): + """Two AMS units (AMS unit_id=0, AMS HT unit_id=128) both report tray_id=0, + with different filament types loaded — these must not collide.""" + return { + "gcode_state": "IDLE", + "ams_units": [ + {"unit_id": "0", "ams_type": "AMS", "humidity": 30, "temp": 25.0}, + {"unit_id": "128", "ams_type": "AMS HT", "humidity": 20, "temp": 60.0}, + ], + "filaments": [ + { + "tray_id": 0, "type": "PLA", "sub_type": "PLA Basic", "color": "FF0000FF", + "tray_uuid": "UUID-UNIT0-TRAY0", + "remain_percent": 80, "ams_unit_id": 0, "ams_type": "AMS", + }, + { + "tray_id": 0, "type": "PA-CF", "sub_type": "PA6-CF", "color": "00FF00FF", + "tray_uuid": "UUID-UNIT128-TRAY0", + "remain_percent": 50, "ams_unit_id": 128, "ams_type": "AMS HT", + }, + ], + } + + +@pytest.mark.django_db +def test_two_ams_units_with_same_tray_id_create_distinct_snapshots(): + session = make_session("SERIAL-A", "Printer A", [two_unit_tray0_snapshot()]) + + cmd = Command() + cmd.verbose = False + cmd._collect_printer_data(session) + + metric = PrinterMetrics.objects.get(device=session.printer) + snapshots = FilamentSnapshot.objects.filter(printer_metric=metric).order_by("ams_unit_id") + + assert snapshots.count() == 2 + + ams_snap, ht_snap = snapshots + assert ams_snap.tray_id == 0 + assert ams_snap.ams_unit_id == 0 + assert ams_snap.ams_type == "AMS" + assert ams_snap.type == "PLA" + + assert ht_snap.tray_id == 0 + assert ht_snap.ams_unit_id == 128 + assert ht_snap.ams_type == "AMS HT" + assert ht_snap.type == "PA-CF" + + +@pytest.mark.django_db +def test_filament_usage_matches_correct_unit_when_tray_ids_collide(): + start_snapshot = two_unit_tray0_snapshot() + start_snapshot.update({"gcode_state": "RUNNING", "subtask_name": "job_1", "print_percent": 1, "tray_now": "0"}) + + end_snapshot = two_unit_tray0_snapshot() + end_snapshot["filaments"][0]["remain_percent"] = 70 # AMS unit 0 consumed + end_snapshot["filaments"][1]["remain_percent"] = 50 # AMS HT unit 128 untouched + end_snapshot.update({"gcode_state": "FINISH", "subtask_name": "job_1", "print_percent": 100}) + + session = make_session("SERIAL-A", "Printer A", [start_snapshot, end_snapshot]) + + cmd = Command() + cmd.verbose = False + cmd._collect_printer_data(session) + cmd._collect_printer_data(session) + + usages = FilamentUsage.objects.filter(print_job__device=session.printer).order_by("ams_unit_id") + # Both units reported tray_id=0 with a tracked filament loaded throughout the + # job — usage is recorded per physical unit, not collapsed into one ambiguous row. + assert usages.count() == 2 + + ams_usage, ht_usage = usages + assert ams_usage.ams_unit_id == 0 + assert ams_usage.starting_percent == 80 + assert ams_usage.ending_percent == 70 + + assert ht_usage.ams_unit_id == 128 + assert ht_usage.starting_percent == 50 + assert ht_usage.ending_percent == 50 diff --git a/tests/test_multi_device_collection.py b/tests/test_multi_device_collection.py new file mode 100644 index 0000000..9118984 --- /dev/null +++ b/tests/test_multi_device_collection.py @@ -0,0 +1,90 @@ +import pytest + +from bambu_run.management.commands.bambu_collector import ( + Command, + DeviceSession, + resolve_printer_device, +) +from bambu_run.models import PrintJob, PrinterMetrics + + +class FakeClient: + """Stub in place of BambuPrinter — returns canned snapshots, no real MQTT.""" + + def __init__(self, snapshots): + self._snapshots = snapshots + self._index = 0 + self._client = None # cloud BambuClient handle used by cloud task sync + + def get_snapshot(self): + snap = self._snapshots[min(self._index, len(self._snapshots) - 1)] + self._index += 1 + return snap + + +def make_session(device_id, name, snapshots): + printer = resolve_printer_device(device_id, {"name": name, "dev_product_name": "H2C"}) + return DeviceSession(device_id=device_id, client=FakeClient(snapshots), printer=printer) + + +@pytest.mark.django_db +def test_collects_metrics_against_the_correct_printer_per_session(): + session_a = make_session("SERIAL-A", "Printer A", [{"nozzle_temp": 200, "gcode_state": "IDLE"}]) + session_b = make_session("SERIAL-B", "Printer B", [{"nozzle_temp": 210, "gcode_state": "IDLE"}]) + + cmd = Command() + cmd.verbose = False + cmd._collect_printer_data(session_a) + cmd._collect_printer_data(session_b) + + metric_a = PrinterMetrics.objects.get(device=session_a.printer) + metric_b = PrinterMetrics.objects.get(device=session_b.printer) + assert metric_a.nozzle_temp == 200 + assert metric_b.nozzle_temp == 210 + + +@pytest.mark.django_db +def test_print_job_tracking_is_isolated_per_session(): + session_a = make_session( + "SERIAL-A", + "Printer A", + [ + {"gcode_state": "RUNNING", "subtask_name": "job_A", "print_percent": 10}, + {"gcode_state": "FINISH", "subtask_name": "job_A", "print_percent": 100}, + ], + ) + session_b = make_session("SERIAL-B", "Printer B", [{"gcode_state": "IDLE"}]) + + cmd = Command() + cmd.verbose = False + cmd._collect_printer_data(session_a) + cmd._collect_printer_data(session_b) + cmd._collect_printer_data(session_a) + + assert PrintJob.objects.filter(device=session_a.printer).count() == 1 + job = PrintJob.objects.get(device=session_a.printer) + assert job.final_status == "FINISH" + assert session_a.current_print_job is None + + assert PrintJob.objects.filter(device=session_b.printer).count() == 0 + assert session_b.current_print_job is None + + +@pytest.mark.django_db +def test_one_session_error_does_not_affect_another_session(): + session_a = make_session("SERIAL-A", "Printer A", [{"nozzle_temp": 200, "gcode_state": "IDLE"}]) + session_b = make_session("SERIAL-B", "Printer B", [{"nozzle_temp": 210, "gcode_state": "IDLE"}]) + + class ExplodingClient: + def get_snapshot(self): + raise RuntimeError("MQTT connection lost") + + session_a.client = ExplodingClient() + + cmd = Command() + cmd.verbose = False + cmd._collect_printer_data(session_a) + cmd._collect_printer_data(session_b) + + assert session_a.error_count == 1 + assert PrinterMetrics.objects.filter(device=session_b.printer).exists() diff --git a/tests/test_printer_routing.py b/tests/test_printer_routing.py new file mode 100644 index 0000000..b762675 --- /dev/null +++ b/tests/test_printer_routing.py @@ -0,0 +1,78 @@ +import pytest +from django.urls import reverse + +from bambu_run.models import Printer + + +@pytest.fixture +def logged_in_client(client, django_user_model): + user = django_user_model.objects.create_user(username="tester", password="pw") + client.force_login(user) + return client + + +@pytest.mark.django_db +def test_dashboard_with_no_printers_shows_error(logged_in_client): + resp = logged_in_client.get(reverse("bambu_run:printer_dashboard")) + assert resp.status_code == 200 + assert "error" in resp.context + + +@pytest.mark.django_db +def test_dashboard_defaults_to_first_active_printer(logged_in_client): + printer = Printer.objects.create(name="Only Printer", model="H2C", is_active=True) + + resp = logged_in_client.get(reverse("bambu_run:printer_dashboard")) + + assert resp.context["printer_device"].pk == printer.pk + assert list(resp.context["all_printers"]) == [printer] + # Switcher shows even with a single printer, as a hint that multi-printer exists. + assert resp.context["show_printer_switcher"] is True + + +@pytest.mark.django_db +def test_dashboard_hides_switcher_with_zero_printers(logged_in_client): + resp = logged_in_client.get(reverse("bambu_run:printer_dashboard")) + + assert resp.context["show_printer_switcher"] is False + + +@pytest.mark.django_db +def test_dashboard_pk_route_shows_requested_printer(logged_in_client): + Printer.objects.create(name="Printer A", model="H2C", is_active=True) + printer_b = Printer.objects.create(name="Printer B", model="X1C", is_active=True) + + resp = logged_in_client.get( + reverse("bambu_run:printer_dashboard", kwargs={"pk": printer_b.pk}) + ) + + assert resp.context["printer_device"].pk == printer_b.pk + assert resp.context["device_name"] == "Printer B" + + +@pytest.mark.django_db +def test_dashboard_unknown_pk_returns_404(logged_in_client): + resp = logged_in_client.get( + reverse("bambu_run:printer_dashboard", kwargs={"pk": 99999}) + ) + assert resp.status_code == 404 + + +@pytest.mark.django_db +def test_api_pk_route_returns_only_requested_printer_data(logged_in_client): + from bambu_run.models import PrinterMetrics + from django.utils import timezone + from decimal import Decimal + + printer_a = Printer.objects.create(name="Printer A", model="H2C", is_active=True) + printer_b = Printer.objects.create(name="Printer B", model="X1C", is_active=True) + PrinterMetrics.objects.create(device=printer_a, timestamp=timezone.now(), nozzle_temp=Decimal("200")) + PrinterMetrics.objects.create(device=printer_b, timestamp=timezone.now(), nozzle_temp=Decimal("210")) + + resp = logged_in_client.get( + reverse("bambu_run:printer_api", kwargs={"pk": printer_b.pk}) + ) + + assert resp.status_code == 200 + data = resp.json() + assert data["nozzle_temp"] == [210.0] diff --git a/tests/test_resolve_printer_device.py b/tests/test_resolve_printer_device.py new file mode 100644 index 0000000..8fd5651 --- /dev/null +++ b/tests/test_resolve_printer_device.py @@ -0,0 +1,78 @@ +import pytest + +from bambu_run.management.commands.bambu_collector import resolve_printer_device +from bambu_run.models import Printer + + +@pytest.mark.django_db +def test_creates_new_printer_keyed_by_serial(): + printer = resolve_printer_device( + "0309DA123456", {"name": "RNL-H2C", "dev_product_name": "H2C"} + ) + + assert printer.serial_number == "0309DA123456" + assert printer.name == "RNL-H2C" + assert printer.model == "H2C" + assert printer.is_active is True + + +@pytest.mark.django_db +def test_second_call_with_same_serial_does_not_create_duplicate(): + first = resolve_printer_device("SERIAL-A", {"name": "Printer A", "dev_product_name": "H2C"}) + second = resolve_printer_device("SERIAL-A", {"name": "Printer A", "dev_product_name": "H2C"}) + + assert first.pk == second.pk + assert Printer.objects.filter(serial_number="SERIAL-A").count() == 1 + + +@pytest.mark.django_db +def test_two_different_serials_create_two_printers(): + a = resolve_printer_device("SERIAL-A", {"name": "Printer A", "dev_product_name": "H2C"}) + b = resolve_printer_device("SERIAL-B", {"name": "Printer B", "dev_product_name": "X1C"}) + + assert a.pk != b.pk + assert Printer.objects.count() == 2 + + +@pytest.mark.django_db +def test_backfills_single_legacy_printer_with_null_serial(): + legacy = Printer.objects.create( + name="Bambu Lab Printer", model="Bambu Lab", manufacturer="Bambu Lab", is_active=True + ) + + resolved = resolve_printer_device("SERIAL-A", {"name": "RNL-H2C", "dev_product_name": "H2C"}) + + legacy.refresh_from_db() + assert resolved.pk == legacy.pk + assert legacy.serial_number == "SERIAL-A" + assert Printer.objects.count() == 1 + + +@pytest.mark.django_db +def test_does_not_guess_when_multiple_legacy_printers_exist(): + Printer.objects.create(name="Legacy 1", model="Bambu Lab") + Printer.objects.create(name="Legacy 2", model="Bambu Lab") + + resolved = resolve_printer_device("SERIAL-A", {"name": "RNL-H2C", "dev_product_name": "H2C"}) + + assert resolved.serial_number == "SERIAL-A" + assert Printer.objects.count() == 3 + + +@pytest.mark.django_db +def test_falls_back_to_generic_defaults_without_device_info(): + printer = resolve_printer_device("SERIAL-A", None) + + assert printer.serial_number == "SERIAL-A" + assert printer.name == "Bambu Lab Printer" + assert printer.model == "Bambu Lab" + + +@pytest.mark.django_db +def test_updates_name_and_model_on_existing_printer_when_changed(): + resolve_printer_device("SERIAL-A", {"name": "Old Name", "dev_product_name": "H2C"}) + + updated = resolve_printer_device("SERIAL-A", {"name": "New Name", "dev_product_name": "H2C"}) + + assert updated.name == "New Name" + assert Printer.objects.filter(serial_number="SERIAL-A").count() == 1 diff --git a/tests/test_vortek_groundwork.py b/tests/test_vortek_groundwork.py new file mode 100644 index 0000000..3b2b604 --- /dev/null +++ b/tests/test_vortek_groundwork.py @@ -0,0 +1,43 @@ +import pytest + +from bambu_run.mqtt_client import PrinterState +from bambu_run.management.commands.bambu_collector import Command, DeviceSession, resolve_printer_device +from bambu_run.models import PrinterMetrics + + +def test_snapshot_includes_raw_device_payload_for_future_vortek_modeling(): + raw_device = { + "extruder": {"info": [{"id": 0, "temp": 12058720}, {"id": 1, "temp": 11534560}]}, + "nozzle": {"info": [{"id": 0, "diameter": 0.4}]}, + } + data = {"print": {"device": raw_device, "gcode_state": "IDLE"}} + + state = PrinterState.from_mqtt_data(data) + snapshot = state.get_snapshot() + + assert snapshot["vortek_raw"] == raw_device + + +def test_snapshot_vortek_raw_defaults_to_empty_dict_when_no_device_payload(): + state = PrinterState.from_mqtt_data({"print": {"gcode_state": "IDLE"}}) + snapshot = state.get_snapshot() + + assert snapshot["vortek_raw"] == {} + + +@pytest.mark.django_db +def test_collector_persists_vortek_raw_onto_printer_metrics(): + printer = resolve_printer_device("SERIAL-A", {"name": "H2C", "dev_product_name": "H2C"}) + + class FakeClient: + def get_snapshot(self): + return {"gcode_state": "IDLE", "vortek_raw": {"extruder": {"info": []}}} + + session = DeviceSession(device_id="SERIAL-A", client=FakeClient(), printer=printer) + + cmd = Command() + cmd.verbose = False + cmd._collect_printer_data(session) + + metric = PrinterMetrics.objects.get(device=printer) + assert metric.vortek_raw == {"extruder": {"info": []}} diff --git a/tests/urls.py b/tests/urls.py new file mode 100644 index 0000000..82233a5 --- /dev/null +++ b/tests/urls.py @@ -0,0 +1,5 @@ +from django.urls import include, path + +urlpatterns = [ + path("", include("bambu_run.urls")), +]