4 Commits

30 changed files with 2426 additions and 218 deletions

View File

@@ -1,5 +1,5 @@
from django.contrib import admin from django.contrib import admin
from .models import Printer, PrinterMetrics, Filament, FilamentType, FilamentSnapshot, PrintJob, FilamentUsage, BambuCloudTask from .models import Printer, PrinterMetrics, Filament, FilamentType, FilamentSnapshot, PrintJob, FilamentUsage, BambuCloudTask, Hotend, HotendSnapshot
@admin.register(Printer) @admin.register(Printer)
@@ -107,6 +107,21 @@ class FilamentUsageAdmin(admin.ModelAdmin):
readonly_fields = ('consumed_percent', 'consumed_grams') readonly_fields = ('consumed_percent', 'consumed_grams')
@admin.register(Hotend)
class HotendAdmin(admin.ModelAdmin):
list_display = ('printer', 'serial_number', 'nozzle_type', 'is_toolhead', 'slot_number', 'used_time_seconds', 'wear_percent', 'last_seen_at')
list_filter = ('printer', 'is_toolhead', 'nozzle_type')
search_fields = ('serial_number',)
readonly_fields = ('last_seen_at', 'created_at')
@admin.register(HotendSnapshot)
class HotendSnapshotAdmin(admin.ModelAdmin):
list_display = ('printer_metric', 'hotend', 'raw_id', 'used_time_seconds', 'wear_percent', 'timestamp')
list_filter = ('hotend',)
readonly_fields = ('printer_metric', 'hotend', 'raw_id', 'used_time_seconds', 'wear_percent', 'stat', 'timestamp')
@admin.register(BambuCloudTask) @admin.register(BambuCloudTask)
class BambuCloudTaskAdmin(admin.ModelAdmin): class BambuCloudTaskAdmin(admin.ModelAdmin):
list_display = ('task_id', 'design_title', 'plate_title', 'device_serial', 'cloud_status', 'weight_grams', 'cloud_start_time', 'synced_at') list_display = ('task_id', 'design_title', 'plate_title', 'device_serial', 'cloud_status', 'weight_grams', 'cloud_start_time', 'synced_at')

78
bambu_run/diagnostics.py Normal file
View File

@@ -0,0 +1,78 @@
"""
Pure helpers for the `bambu_diagnose` management command.
Kept separate from the command itself (and free of Django/network imports)
so the report-building and redaction logic can be unit-tested without
talking to the real Bambu Lab cloud or MQTT broker.
"""
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
# Keys whose values are always replaced outright, regardless of nesting depth.
_SECRET_KEY_SUBSTRINGS = ("password", "token", "secret", "access_code", "authorization")
# Keys that identify a specific physical device/spool/account — not secret,
# but identifying, so they're partially masked by default before anything
# gets pasted into a public GitHub issue.
_IDENTIFIER_KEYS = {"dev_id", "device_id", "serial_number", "tray_uuid", "tag_uid", "uid"}
def _mask_identifier(value: Any) -> Any:
if not isinstance(value, str) or len(value) <= 8:
return "***"
return f"{value[:4]}...{value[-4:]}"
def redact_diagnostics(data: Any, redact: bool = True) -> Any:
"""Recursively redact secrets and mask identifiers in a diagnostics payload.
`redact=False` returns the data unchanged — only for the reporter's own
local debugging, never for anything posted publicly.
"""
if not redact:
return data
return _redact(data)
def _redact(obj: Any) -> Any:
if isinstance(obj, dict):
result = {}
for key, value in obj.items():
lowered = key.lower()
if any(secret in lowered for secret in _SECRET_KEY_SUBSTRINGS):
result[key] = "***REDACTED***"
elif lowered in _IDENTIFIER_KEYS:
result[key] = _mask_identifier(value)
else:
result[key] = _redact(value)
return result
if isinstance(obj, list):
return [_redact(item) for item in obj]
return obj
def build_diagnostics_report(
devices: List[Dict[str, Any]],
raw_payloads: Dict[str, Optional[Dict[str, Any]]],
) -> Dict[str, Any]:
"""Assemble the (pre-redaction) diagnostics report from discovered devices
and whatever raw MQTT payload was captured for each during the listen window.
"""
device_entries = []
for device in devices:
dev_id = device.get("dev_id")
payload = raw_payloads.get(dev_id)
entry = {
"device_info": device,
"raw_mqtt_payload": payload,
}
if payload is None:
entry["note"] = "No MQTT data received within the listen window."
device_entries.append(entry)
return {
"generated_at": datetime.now(timezone.utc).isoformat(),
"device_count": len(devices),
"devices": device_entries,
}

View File

@@ -13,8 +13,9 @@ import logging
import os import os
import ssl import ssl
import time import time
from dataclasses import dataclass, field
from decimal import Decimal from decimal import Decimal
from typing import Optional from typing import Any, Dict, Optional
from django.core.management.base import BaseCommand, CommandError from django.core.management.base import BaseCommand, CommandError
from django.db import transaction from django.db import transaction
@@ -26,6 +27,56 @@ from bambu_run.models import Printer, PrinterMetrics
logger = logging.getLogger("bambu_run.collector") logger = logging.getLogger("bambu_run.collector")
def resolve_printer_device(device_id: str, device_info: Optional[dict] = None) -> Printer:
"""Find-or-create the Printer row for a Bambu cloud device, keyed by serial number.
`device_info` is one entry from BambuClient.get_devices() (keys: name,
dev_product_name, dev_id, ...). Falls back to generic defaults when unavailable
(e.g. local-only connections that never call get_devices()).
"""
device_info = device_info or {}
name = device_info.get("name") or "Bambu Lab Printer"
model = device_info.get("dev_product_name") or "Bambu Lab"
printer = Printer.objects.filter(serial_number=device_id).first()
if printer is None:
# Upgrade path: a pre-multi-printer deployment has exactly one Printer row
# with no serial number yet. Backfill it instead of creating a duplicate.
# If there's more than one such row, we can't tell which one this device
# used to be, so don't guess — create a fresh row instead.
legacy_candidates = list(Printer.objects.filter(serial_number__isnull=True)[:2])
if len(legacy_candidates) == 1:
printer = legacy_candidates[0]
printer.serial_number = device_id
if printer is None:
printer = Printer(serial_number=device_id)
printer.name = name
printer.model = model
printer.manufacturer = "Bambu Lab"
printer.is_active = True
printer.save()
return printer
@dataclass
class DeviceSession:
"""Per-printer mutable state for one bound device in a multi-printer collector run."""
device_id: str
client: Any # BambuPrinter
printer: Printer
current_print_job: Optional[Any] = None
last_gcode_state: Optional[str] = None
last_subtask_name: Optional[str] = None
trays_used: set = field(default_factory=set)
error_count: int = 0
success_count: int = 0
mqtt_connect_errors: int = 0
class Command(BaseCommand): class Command(BaseCommand):
""" """
MQTT Poll -> PrinterMetrics -> FilamentSnapshot -> Auto-Match -> Update Filament MQTT Poll -> PrinterMetrics -> FilamentSnapshot -> Auto-Match -> Update Filament
@@ -51,18 +102,11 @@ class Command(BaseCommand):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.printer_client = None self.sessions: Dict[str, DeviceSession] = {}
self.printer_device = None self._token: Optional[str] = None
self.verbose = False self.verbose = False
self.disable_ssl_verify = False self.disable_ssl_verify = False
self.error_count = 0
self.success_count = 0
self.mqtt_connect_errors = 0
self.start_time = None self.start_time = None
self.current_print_job = None
self.last_gcode_state = None
self.last_subtask_name = None
self.trays_used = set()
def handle(self, *args, **options): def handle(self, *args, **options):
self.verbose = options["verbose"] self.verbose = options["verbose"]
@@ -100,12 +144,13 @@ class Command(BaseCommand):
self._configure_logging() self._configure_logging()
try: try:
self._initialize_printer() self._initialize_printers()
except Exception as e: except Exception as e:
raise CommandError(f"Initialization failed: {e}") raise CommandError(f"Initialization failed: {e}")
self.start_time = timezone.now() self.start_time = timezone.now()
logger.info(f"Bambu Run data collector started for printer: {self.printer_device.name}") printer_names = ", ".join(s.printer.name for s in self.sessions.values())
logger.info(f"Bambu Run data collector started for {len(self.sessions)} printer(s): {printer_names}")
logger.info(f"Collection interval: {interval} seconds") logger.info(f"Collection interval: {interval} seconds")
logger.info(f"Mode: {'Single run' if run_once else 'Continuous'}") logger.info(f"Mode: {'Single run' if run_once else 'Continuous'}")
@@ -113,7 +158,8 @@ class Command(BaseCommand):
if run_once: if run_once:
import time as _time import time as _time
_time.sleep(5) _time.sleep(5)
self._collect_printer_data() for session in self.sessions.values():
self._collect_printer_data(session)
logger.info("Single collection completed successfully") logger.info("Single collection completed successfully")
else: else:
self._run_continuous_loop(interval) self._run_continuous_loop(interval)
@@ -124,7 +170,7 @@ class Command(BaseCommand):
logger.exception(f"Fatal error in main loop: {e}") logger.exception(f"Fatal error in main loop: {e}")
raise CommandError(f"Runner failed: {e}") raise CommandError(f"Runner failed: {e}")
def _request_full_status_when_ready(self, timeout: float = 20.0) -> None: def _request_full_status_when_ready(self, client, timeout: float = 20.0) -> None:
"""Send pushall once the MQTT broker connection is confirmed. """Send pushall once the MQTT broker connection is confirmed.
BambuPrinter._connected is set True immediately after connect(blocking=False), BambuPrinter._connected is set True immediately after connect(blocking=False),
@@ -134,9 +180,9 @@ class Command(BaseCommand):
import time as _time import time as _time
deadline = _time.time() + timeout deadline = _time.time() + timeout
while _time.time() < deadline: while _time.time() < deadline:
mqtt_client = getattr(self.printer_client, "_mqtt", None) mqtt_client = getattr(client, "_mqtt", None)
if mqtt_client is not None and getattr(mqtt_client, "connected", False): if mqtt_client is not None and getattr(mqtt_client, "connected", False):
self.printer_client._mqtt.request_full_status() client._mqtt.request_full_status()
logger.info("Sent MQTT pushall request") logger.info("Sent MQTT pushall request")
return return
_time.sleep(0.5) _time.sleep(0.5)
@@ -155,7 +201,9 @@ class Command(BaseCommand):
handler.setFormatter(formatter) handler.setFormatter(formatter)
logger.addHandler(handler) logger.addHandler(handler)
def _initialize_printer(self): def _initialize_printers(self):
"""Authenticate once, discover every device bound to the account, and open
one BambuPrinter (own MQTT thread) per device — all in this single process."""
from bambu_run.mqtt_client import BambuPrinter from bambu_run.mqtt_client import BambuPrinter
bambu_username = os.environ.get("BAMBU_USERNAME") bambu_username = os.environ.get("BAMBU_USERNAME")
@@ -169,30 +217,12 @@ class Command(BaseCommand):
"environment variables must be set" "environment variables must be set"
) )
logger.info("Connecting to Bambu Lab printer...") logger.info("Authenticating with Bambu Lab cloud...")
try: try:
if bambu_token: auth = BambuPrinter(
logger.info("Using saved BAMBU_TOKEN for authentication") username=bambu_username, password=bambu_password, token=bambu_token,
self.printer_client = BambuPrinter(
token=bambu_token, device_id=bambu_device_id
) )
else: self._token = auth._ensure_token()
logger.info("Authenticating with username/password")
self.printer_client = BambuPrinter(
username=bambu_username,
password=bambu_password,
device_id=bambu_device_id,
)
logger.info("Initiating MQTT connection...")
self.printer_client.connect(blocking=False)
logger.info("MQTT connection initiated (non-blocking)")
# Request full status so AMS + dual-nozzle data arrive on startup.
try:
self._request_full_status_when_ready()
except Exception as e:
logger.warning("pushall request skipped (non-fatal): %s", e)
except Exception as e: except Exception as e:
if "CERTIFICATE_VERIFY_FAILED" in str(e) or "SSL" in str(e): if "CERTIFICATE_VERIFY_FAILED" in str(e) or "SSL" in str(e):
error_msg = ( error_msg = (
@@ -203,56 +233,62 @@ class Command(BaseCommand):
"3. pip install --upgrade certifi\n" "3. pip install --upgrade certifi\n"
) )
raise CommandError(error_msg) raise CommandError(error_msg)
raise CommandError(f"Failed to initialize printer client: {e}") raise CommandError(f"Failed to authenticate: {e}")
self.printer_device = self._ensure_printer_device_exists() device_infos = self._discover_devices(bambu_device_id)
logger.info(f"Initialized for printer device: {self.printer_device}") for device_id, device_info in device_infos.items():
def _ensure_printer_device_exists(self) -> Printer:
try: try:
snapshot = self.printer_client.get_snapshot() self._add_session(device_id, device_info)
if snapshot:
device, created = Printer.objects.update_or_create(
model="Bambu Lab",
defaults={
"name": "Bambu Lab Printer",
"manufacturer": "Bambu Lab",
"is_active": True,
},
)
action = "Created" if created else "Updated"
logger.info(f"{action} printer device record: {device}")
return device
else:
logger.warning("Snapshot returned None - MQTT not connected yet")
device = Printer.objects.filter(is_active=True).first()
if device:
logger.info(f"Using existing device record: {device}")
return device
else:
device = Printer.objects.create(
name="Bambu Lab Printer",
model="Bambu Lab",
manufacturer="Bambu Lab",
is_active=True,
)
logger.info(f"Created placeholder device: {device}")
return device
except Exception as e: except Exception as e:
logger.error(f"Error during device initialization: {e}") logger.error(f"Failed to initialize printer {device_id}: {e}")
if not self.sessions:
raise CommandError("No printer sessions could be initialized")
def _discover_devices(self, explicit_device_id: Optional[str]) -> Dict[str, dict]:
"""Return {device_id: device_info} for every printer to monitor.
device_info comes from BambuClient.get_devices() (name, dev_product_name,
etc.) — empty dict when explicitly pinned to one device via BAMBU_DEVICE_ID
and the cloud listing can't be reached.
"""
from bambu_run.mqtt_client import BambuClient
try: try:
device = Printer.objects.filter(is_active=True).first() cloud = BambuClient(token=self._token)
if device: devices = cloud.get_devices()
logger.warning(f"Using existing device record from DB: {device}") except Exception as e:
return device if explicit_device_id:
else: logger.warning(f"Could not list account devices ({e}); using BAMBU_DEVICE_ID only")
raise CommandError( return {explicit_device_id: {}}
"No printer device found in database and initialization failed." raise
)
except Printer.DoesNotExist: device_infos = {d.get("dev_id"): d for d in devices if d.get("dev_id")}
raise CommandError("Failed to create or retrieve printer device.")
if explicit_device_id:
return {explicit_device_id: device_infos.get(explicit_device_id, {})}
if not device_infos:
raise CommandError("No devices found on this account")
return device_infos
def _add_session(self, device_id: str, device_info: dict) -> "DeviceSession":
from bambu_run.mqtt_client import BambuPrinter
logger.info(f"Connecting to printer {device_id} ({device_info.get('name', 'unknown')})...")
client = BambuPrinter(token=self._token, device_id=device_id)
client.connect(blocking=False)
try:
self._request_full_status_when_ready(client)
except Exception as e:
logger.warning("pushall request skipped (non-fatal): %s", e)
printer = resolve_printer_device(device_id, device_info)
session = DeviceSession(device_id=device_id, client=client, printer=printer)
self.sessions[device_id] = session
logger.info(f"Initialized session for printer: {printer}")
return session
def _run_continuous_loop(self, interval: int): def _run_continuous_loop(self, interval: int):
iteration = 0 iteration = 0
@@ -263,7 +299,8 @@ class Command(BaseCommand):
if self.verbose: if self.verbose:
logger.debug(f"=== Iteration {iteration} ===") logger.debug(f"=== Iteration {iteration} ===")
self._collect_printer_data() for session in list(self.sessions.values()):
self._collect_printer_data(session)
elapsed = time.time() - loop_start elapsed = time.time() - loop_start
sleep_time = max(0, interval - elapsed) sleep_time = max(0, interval - elapsed)
@@ -273,9 +310,28 @@ class Command(BaseCommand):
if iteration % 100 == 0: if iteration % 100 == 0:
self._print_statistics() self._print_statistics()
self._refresh_devices()
time.sleep(sleep_time) time.sleep(sleep_time)
def _refresh_devices(self):
"""Pick up printers added to the account without restarting the process."""
if os.environ.get("BAMBU_DEVICE_ID"):
return # pinned to a single explicit device — nothing to discover
try:
device_infos = self._discover_devices(None)
except Exception as e:
logger.warning(f"Device refresh skipped (non-fatal): {e}")
return
for device_id, device_info in device_infos.items():
if device_id not in self.sessions:
logger.info(f"New printer detected on account: {device_id}")
try:
self._add_session(device_id, device_info)
except Exception as e:
logger.error(f"Failed to initialize newly-detected printer {device_id}: {e}")
def _convert_mqtt_color(self, mqtt_color): def _convert_mqtt_color(self, mqtt_color):
if not mqtt_color: if not mqtt_color:
return None return None
@@ -498,6 +554,8 @@ class Command(BaseCommand):
printer_metric=printer_metric, printer_metric=printer_metric,
filament=filament, filament=filament,
tray_id=tray_id, tray_id=tray_id,
ams_unit_id=unit_id_int,
ams_type=tray_data.get('ams_type', '') or '',
slot_name=tray_data.get('slot'), slot_name=tray_data.get('slot'),
type=tray_data.get('type'), type=tray_data.get('type'),
sub_type=tray_data.get('sub_type'), sub_type=tray_data.get('sub_type'),
@@ -513,19 +571,51 @@ class Command(BaseCommand):
match_method=match_method match_method=match_method
) )
def _track_print_job(self, metric, snapshot): def _update_hotends(self, printer, printer_metric, hotends_data):
from bambu_run.models import PrintJob, FilamentUsage from bambu_run.models import Hotend, HotendSnapshot
for h in hotends_data:
if h.get("is_empty"):
continue
hotend, _ = Hotend.objects.update_or_create(
printer=printer,
serial_number=h.get("serial_number"),
defaults={
"raw_id": h.get("raw_id", 0),
"nozzle_type": h.get("nozzle_type", ""),
"diameter": self._to_decimal(h.get("diameter")),
"slot_number": h.get("slot_number"),
"is_toolhead": bool(h.get("is_toolhead")),
"last_filament_profile_id": h.get("fila_id", ""),
"last_color": h.get("color") or "",
"used_time_seconds": h.get("used_time_seconds", 0),
"wear_percent": h.get("wear_percent", 0),
},
)
HotendSnapshot.objects.create(
printer_metric=printer_metric,
hotend=hotend,
raw_id=h.get("raw_id", 0),
used_time_seconds=h.get("used_time_seconds", 0),
wear_percent=h.get("wear_percent", 0),
stat=h.get("stat"),
)
def _track_print_job(self, session, metric, snapshot):
from bambu_run.models import PrintJob
gcode_state = snapshot.get('gcode_state') gcode_state = snapshot.get('gcode_state')
subtask_name = snapshot.get('subtask_name') subtask_name = snapshot.get('subtask_name')
if self._is_print_starting(gcode_state, subtask_name): if self._is_print_starting(session, gcode_state, subtask_name):
if self.current_print_job: if session.current_print_job:
self._finalize_print_job(metric, snapshot) self._finalize_print_job(session, metric, snapshot)
raw_task_id = snapshot.get('task_id') raw_task_id = snapshot.get('task_id')
self.current_print_job = PrintJob.objects.create( session.current_print_job = PrintJob.objects.create(
device=self.printer_device, device=session.printer,
project_name=subtask_name, project_name=subtask_name,
gcode_file=snapshot.get('gcode_file'), gcode_file=snapshot.get('gcode_file'),
start_time=metric.timestamp, start_time=metric.timestamp,
@@ -534,109 +624,119 @@ class Command(BaseCommand):
completion_percent=snapshot.get('print_percent', 0), completion_percent=snapshot.get('print_percent', 0),
cloud_task_id_raw=int(raw_task_id) if raw_task_id else None, cloud_task_id_raw=int(raw_task_id) if raw_task_id else None,
) )
self.trays_used = set() session.trays_used = set()
logger.info(f"Print job started: {subtask_name}") logger.info(f"[{session.device_id}] Print job started: {subtask_name}")
if self.current_print_job: if session.current_print_job:
tray_now = snapshot.get('tray_now', '') tray_now = snapshot.get('tray_now', '')
if tray_now not in (None, '', '255'): if tray_now not in (None, '', '255'):
try: try:
tray_id = int(tray_now) tray_id = int(tray_now)
if 0 <= tray_id <= 15: if 0 <= tray_id <= 15:
self.trays_used.add(tray_id) session.trays_used.add(tray_id)
except (ValueError, TypeError): except (ValueError, TypeError):
pass pass
if self._is_print_ending(gcode_state) and self.current_print_job: if self._is_print_ending(session, gcode_state) and session.current_print_job:
self._finalize_print_job(metric, snapshot) self._finalize_print_job(session, metric, snapshot)
self.last_gcode_state = gcode_state session.last_gcode_state = gcode_state
self.last_subtask_name = subtask_name session.last_subtask_name = subtask_name
def _is_print_starting(self, gcode_state, subtask_name): def _is_print_starting(self, session, gcode_state, subtask_name):
is_printing = gcode_state not in ['FINISH', 'IDLE', 'FAILED', None, ''] is_printing = gcode_state not in ['FINISH', 'IDLE', 'FAILED', None, '']
has_new_job = subtask_name and subtask_name != self.last_subtask_name has_new_job = subtask_name and subtask_name != session.last_subtask_name
return is_printing and has_new_job return is_printing and has_new_job
def _is_print_ending(self, gcode_state): def _is_print_ending(self, session, gcode_state):
ending_states = ['FINISH', 'FAILED'] ending_states = ['FINISH', 'FAILED']
return gcode_state in ending_states and self.last_gcode_state not in ending_states return gcode_state in ending_states and session.last_gcode_state not in ending_states
def _finalize_print_job(self, metric, snapshot): def _finalize_print_job(self, session, metric, snapshot):
from bambu_run.models import FilamentUsage from bambu_run.models import FilamentUsage
self.current_print_job.end_time = metric.timestamp job = session.current_print_job
self.current_print_job.end_metric = metric job.end_time = metric.timestamp
self.current_print_job.final_status = snapshot.get('gcode_state') job.end_metric = metric
self.current_print_job.completion_percent = snapshot.get('print_percent', 0) job.final_status = snapshot.get('gcode_state')
self.current_print_job.calculate_duration() job.completion_percent = snapshot.get('print_percent', 0)
self.current_print_job.save() job.calculate_duration()
job.save()
try: try:
from bambu_run.bambu_cloud import fetch_and_upsert_task from bambu_run.bambu_cloud import fetch_and_upsert_task
fetch_and_upsert_task(self.printer_client._client, self.current_print_job) fetch_and_upsert_task(session.client._client, job)
except Exception as e: except Exception as e:
logger.warning(f"Cloud task sync skipped (non-fatal): {e}") logger.warning(f"Cloud task sync skipped (non-fatal): {e}")
start_metric = self.current_print_job.start_metric start_metric = job.start_metric
if not start_metric: if not start_metric:
logger.warning(f"No start_metric for job {self.current_print_job.id}, skipping filament usage") logger.warning(f"No start_metric for job {job.id}, skipping filament usage")
elif not self.trays_used: elif not session.trays_used:
logger.warning(f"No trays tracked for job {self.current_print_job.project_name}, skipping filament usage") logger.warning(f"No trays tracked for job {job.project_name}, skipping filament usage")
else: else:
for tray_id in self.trays_used: # A bare tray_id (from `tray_now`) doesn't identify which physical AMS
start_snap = start_metric.filament_snapshots.filter( # unit was active when multiple units share the same slot numbering —
# so create one usage row per (unit, tray) that had a tracked filament
# loaded at job start, rather than guessing a single "correct" unit.
created_usages = []
for tray_id in session.trays_used:
start_snaps = start_metric.filament_snapshots.filter(
tray_id=tray_id, filament__isnull=False tray_id=tray_id, filament__isnull=False
).first() )
if not start_snap: for start_snap in start_snaps:
continue
end_snap = metric.filament_snapshots.filter( end_snap = metric.filament_snapshots.filter(
filament=start_snap.filament, tray_id=tray_id filament=start_snap.filament,
tray_id=tray_id,
ams_unit_id=start_snap.ams_unit_id,
).first() ).first()
usage = FilamentUsage.objects.create( usage = FilamentUsage.objects.create(
print_job=self.current_print_job, print_job=job,
filament=start_snap.filament, filament=start_snap.filament,
tray_id=tray_id, tray_id=tray_id,
ams_unit_id=start_snap.ams_unit_id,
starting_percent=start_snap.remain_percent or 100, starting_percent=start_snap.remain_percent or 100,
ending_percent=end_snap.remain_percent if end_snap else None, ending_percent=end_snap.remain_percent if end_snap else None,
is_primary=(len(self.trays_used) == 1),
) )
usage.calculate_consumed() usage.calculate_consumed()
created_usages.append(usage)
for usage in created_usages:
usage.is_primary = len(created_usages) == 1
usage.save() usage.save()
if self.verbose: if self.verbose:
logger.debug( logger.debug(
f"Filament usage for {start_snap.filament} (tray {tray_id}): " f"Filament usage for {usage.filament} (unit {usage.ams_unit_id}, tray {usage.tray_id}): "
f"{usage.starting_percent}% -> {usage.ending_percent}%, consumed {usage.consumed_percent}%" f"{usage.starting_percent}% -> {usage.ending_percent}%, consumed {usage.consumed_percent}%"
) )
logger.info( logger.info(
f"Print job finished: {self.current_print_job.project_name} " f"[{session.device_id}] Print job finished: {job.project_name} "
f"({self.current_print_job.final_status}) - Duration: {self.current_print_job.duration_minutes} min, " f"({job.final_status}) - Duration: {job.duration_minutes} min, "
f"Trays used: {sorted(self.trays_used) if self.trays_used else 'none tracked'}" f"Trays used: {sorted(session.trays_used) if session.trays_used else 'none tracked'}"
) )
self.current_print_job = None session.current_print_job = None
self.trays_used = set() session.trays_used = set()
def _collect_printer_data(self): def _collect_printer_data(self, session: "DeviceSession"):
try: try:
snapshot = self.printer_client.get_snapshot() snapshot = session.client.get_snapshot()
if snapshot is None: if snapshot is None:
self.mqtt_connect_errors += 1 session.mqtt_connect_errors += 1
if self.mqtt_connect_errors <= 5 or self.verbose: if session.mqtt_connect_errors <= 5 or self.verbose:
logger.warning( logger.warning(
f"MQTT not connected yet or no data available " f"[{session.device_id}] MQTT not connected yet or no data available "
f"(attempt {self.mqtt_connect_errors})" f"(attempt {session.mqtt_connect_errors})"
) )
return return
with transaction.atomic(): with transaction.atomic():
metric = PrinterMetrics.objects.create( metric = PrinterMetrics.objects.create(
device=self.printer_device, device=session.printer,
timestamp=timezone.now(), timestamp=timezone.now(),
nozzle_temp=self._to_decimal(snapshot.get("nozzle_temp")), nozzle_temp=self._to_decimal(snapshot.get("nozzle_temp")),
nozzle_target_temp=self._to_decimal(snapshot.get("nozzle_target_temp")), nozzle_target_temp=self._to_decimal(snapshot.get("nozzle_target_temp")),
@@ -688,27 +788,33 @@ class Command(BaseCommand):
ams_units=snapshot.get("ams_units", []), ams_units=snapshot.get("ams_units", []),
external_spool=snapshot.get("external_spool", {}), external_spool=snapshot.get("external_spool", {}),
lights_report=snapshot.get("lights_report", []), lights_report=snapshot.get("lights_report", []),
vortek_raw=snapshot.get("vortek_raw", {}),
nozzle_info=snapshot.get("hotends", []),
) )
filaments_data = snapshot.get('filaments', []) filaments_data = snapshot.get('filaments', [])
if filaments_data: if filaments_data:
self._create_filament_snapshots(metric, filaments_data, snapshot) self._create_filament_snapshots(metric, filaments_data, snapshot)
self._track_print_job(metric, snapshot) hotends_data = snapshot.get('hotends', [])
if hotends_data:
self._update_hotends(session.printer, metric, hotends_data)
self.success_count += 1 self._track_print_job(session, metric, snapshot)
session.success_count += 1
if self.verbose: if self.verbose:
logger.debug( logger.debug(
f"Printer Metrics: Nozzle={snapshot.get('nozzle_temp')}C, " f"[{session.device_id}] Printer Metrics: Nozzle={snapshot.get('nozzle_temp')}C, "
f"Bed={snapshot.get('bed_temp')}C, " f"Bed={snapshot.get('bed_temp')}C, "
f"Progress={snapshot.get('print_percent')}%, " f"Progress={snapshot.get('print_percent')}%, "
f"State={snapshot.get('gcode_state')}" f"State={snapshot.get('gcode_state')}"
) )
except Exception as e: except Exception as e:
self.error_count += 1 session.error_count += 1
logger.error(f"Error collecting printer data (total errors: {self.error_count}): {e}") logger.error(f"[{session.device_id}] Error collecting printer data (total errors: {session.error_count}): {e}")
if self.verbose: if self.verbose:
logger.exception("Detailed traceback:") logger.exception("Detailed traceback:")
@@ -723,16 +829,20 @@ class Command(BaseCommand):
def _print_statistics(self): def _print_statistics(self):
if self.start_time: if self.start_time:
runtime = timezone.now() - self.start_time runtime = timezone.now() - self.start_time
total_collections = self.success_count + self.error_count success_count = sum(s.success_count for s in self.sessions.values())
error_count = sum(s.error_count for s in self.sessions.values())
mqtt_connect_errors = sum(s.mqtt_connect_errors for s in self.sessions.values())
total_collections = success_count + error_count
success_rate = ( success_rate = (
(self.success_count / total_collections * 100) (success_count / total_collections * 100)
if total_collections > 0 if total_collections > 0
else 0 else 0
) )
logger.info("=== Statistics ===") logger.info("=== Statistics ===")
logger.info(f"Runtime: {runtime}") logger.info(f"Runtime: {runtime}")
logger.info(f"Successful collections: {self.success_count}") logger.info(f"Printers tracked: {len(self.sessions)}")
logger.info(f"Failed collections: {self.error_count}") logger.info(f"Successful collections: {success_count}")
logger.info(f"MQTT connection warnings: {self.mqtt_connect_errors}") logger.info(f"Failed collections: {error_count}")
logger.info(f"MQTT connection warnings: {mqtt_connect_errors}")
logger.info(f"Success rate: {success_rate:.1f}%") logger.info(f"Success rate: {success_rate:.1f}%")

View File

@@ -0,0 +1,128 @@
"""
Diagnose multi-printer cloud data for a Bambu Lab account.
Run this if `bambu_collector` doesn't pick up all your printers, or the data
collected for a second/third printer looks wrong. It authenticates with your
Bambu Lab account, lists every device the cloud API reports, listens briefly
for raw MQTT data from each one, and writes a redacted JSON report you can
attach to a GitHub issue.
Usage:
python manage.py bambu_diagnose
python manage.py bambu_diagnose --listen-seconds 15
python manage.py bambu_diagnose --output my_report.json
python manage.py bambu_diagnose --no-redact # local debugging only — do NOT post this output publicly
"""
import json
import logging
import time
from django.core.management.base import BaseCommand, CommandError
from bambu_run.diagnostics import build_diagnostics_report, redact_diagnostics
logger = logging.getLogger("bambu_run.diagnose")
class Command(BaseCommand):
help = "Authenticate, list every printer on the account, and write a redacted diagnostics report."
def add_arguments(self, parser):
parser.add_argument(
"--listen-seconds", type=float, default=8.0,
help="How long to listen for MQTT data per device (default: 8)",
)
parser.add_argument(
"--output", type=str, default=None,
help="Output file path (default: bambu_diagnostics_<timestamp>.json)",
)
parser.add_argument(
"--no-redact", action="store_true",
help="Keep full serials/identifiers unmasked. For your own debugging only — "
"do not paste this output into a public GitHub issue.",
)
def handle(self, *args, **options):
import os
from bambu_run.mqtt_client import BambuPrinter, BambuClient
listen_seconds = options["listen_seconds"]
redact = not options["no_redact"]
bambu_username = os.environ.get("BAMBU_USERNAME")
bambu_password = os.environ.get("BAMBU_PASSWORD")
bambu_token = os.environ.get("BAMBU_TOKEN")
if not bambu_token and not all([bambu_username, bambu_password]):
raise CommandError(
"Either BAMBU_TOKEN or both BAMBU_USERNAME and BAMBU_PASSWORD "
"environment variables must be set"
)
self.stdout.write("Authenticating with Bambu Lab cloud...")
auth = BambuPrinter(username=bambu_username, password=bambu_password, token=bambu_token)
token = auth._ensure_token()
cloud = BambuClient(token=token)
devices = cloud.get_devices()
self.stdout.write(self.style.SUCCESS(f"Found {len(devices)} device(s) on this account:"))
for device in devices:
self.stdout.write(
f" - {device.get('name', 'unknown')} "
f"({device.get('dev_product_name', 'unknown model')}) "
f"online={device.get('online')}"
)
if len(devices) < 2:
self.stdout.write(self.style.WARNING(
"Only one device returned by the cloud API — if you own multiple printers, "
"this is likely the root cause. Note this in the GitHub issue."
))
raw_payloads = {}
for device in devices:
dev_id = device.get("dev_id")
if not dev_id:
continue
self.stdout.write(f"Listening to {device.get('name', dev_id)} for {listen_seconds:.0f}s...")
client = BambuPrinter(token=token, device_id=dev_id)
try:
client.connect(blocking=False)
self._request_full_status_when_ready(client)
time.sleep(listen_seconds)
state = client.get_state()
raw_payloads[dev_id] = state._raw_data.get("print") if state._raw_data else None
except Exception as e:
self.stdout.write(self.style.WARNING(f" Could not collect data for {dev_id}: {e}"))
raw_payloads[dev_id] = None
finally:
client.disconnect()
report = build_diagnostics_report(devices, raw_payloads)
report = redact_diagnostics(report, redact=redact)
output_path = options["output"] or f"bambu_diagnostics_{int(time.time())}.json"
with open(output_path, "w") as f:
json.dump(report, f, indent=2, default=str)
self.stdout.write(self.style.SUCCESS(f"\nDiagnostics written to: {output_path}"))
if not redact:
self.stdout.write(self.style.WARNING(
"--no-redact was used: this file contains unmasked serials/identifiers. "
"Do not attach it to a public GitHub issue as-is."
))
else:
self.stdout.write(
"Serials/identifiers are masked. Please skim the file once before posting — "
"then attach it to https://github.com/RunLit/Bambu-Run/issues/10"
)
def _request_full_status_when_ready(self, client, timeout: float = 20.0) -> None:
deadline = time.time() + timeout
while time.time() < deadline:
mqtt_client = getattr(client, "_mqtt", None)
if mqtt_client is not None and getattr(mqtt_client, "connected", False):
client._mqtt.request_full_status()
return
time.sleep(0.5)

View File

@@ -0,0 +1,22 @@
# Generated by Django 5.2.8 on 2026-06-18 12:20
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("bambu_run", "0004_h2c_dual_nozzle_and_ams_fields"),
]
operations = [
migrations.AddField(
model_name="printermetrics",
name="vortek_raw",
field=models.JSONField(
blank=True,
default=dict,
help_text="Raw print.device MQTT payload (Vortek rack groundwork)",
),
),
]

View File

@@ -0,0 +1,63 @@
# Generated by Django 5.2.8 on 2026-06-20 12:40
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("bambu_run", "0005_printermetrics_vortek_raw"),
]
operations = [
migrations.AlterModelOptions(
name="filamentsnapshot",
options={
"ordering": ["printer_metric", "ams_unit_id", "tray_id"],
"verbose_name": "Filament Snapshot",
"verbose_name_plural": "Filament Snapshots",
},
),
migrations.AddField(
model_name="filamentsnapshot",
name="ams_type",
field=models.CharField(
blank=True,
choices=[
("AMS", "AMS"),
("AMS 2 Pro", "AMS 2 Pro"),
("AMS HT", "AMS HT"),
],
default="",
help_text="Type of the AMS unit this tray belongs to (AMS / AMS 2 Pro / AMS HT)",
max_length=32,
),
),
migrations.AddField(
model_name="filamentsnapshot",
name="ams_unit_id",
field=models.PositiveSmallIntegerField(
blank=True,
db_index=True,
help_text="Which physical AMS unit this tray belongs to (matches MQTT ams[i].id; 128 = AMS HT)",
null=True,
),
),
migrations.AddField(
model_name="filamentusage",
name="ams_unit_id",
field=models.PositiveSmallIntegerField(
blank=True,
db_index=True,
help_text="Which physical AMS unit the slot belongs to (matches MQTT ams[i].id; 128 = AMS HT)",
null=True,
),
),
migrations.AddIndex(
model_name="filamentsnapshot",
index=models.Index(
fields=["printer_metric", "ams_unit_id", "tray_id"],
name="infrastruct_printer_2ad168_idx",
),
),
]

View File

@@ -0,0 +1,172 @@
# Generated by Django 5.2.8 on 2026-06-20 14:07
import django.db.models.deletion
import django.utils.timezone
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("bambu_run", "0006_alter_filamentsnapshot_options_and_more"),
]
operations = [
migrations.CreateModel(
name="Hotend",
fields=[
(
"id",
models.BigAutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
("serial_number", models.CharField(db_index=True, max_length=100)),
(
"nozzle_type",
models.CharField(blank=True, default="", max_length=50),
),
(
"diameter",
models.DecimalField(
blank=True, decimal_places=2, max_digits=3, null=True
),
),
(
"raw_id",
models.PositiveSmallIntegerField(
help_text="Last-seen MQTT device.nozzle.info[].id"
),
),
(
"slot_number",
models.PositiveSmallIntegerField(
blank=True,
help_text="Rack bay 1-6, derived from raw_id 16-21. Null if currently unknown (e.g. mounted on toolhead and id reports as the 0 sentinel).",
null=True,
),
),
(
"is_toolhead",
models.BooleanField(
default=False,
help_text="True if currently mounted on the toolhead under normal polling (raw_id == 0).",
),
),
(
"last_filament_profile_id",
models.CharField(
blank=True,
default="",
help_text="Bambu material profile id of the filament last loaded (MQTT fila_id, e.g. 'GFA01')",
max_length=20,
),
),
(
"last_color",
models.CharField(
blank=True,
default="",
help_text="6-char hex of the filament last loaded (MQTT color_m, alpha stripped)",
max_length=6,
),
),
("used_time_seconds", models.PositiveIntegerField(default=0)),
(
"wear_percent",
models.DecimalField(
decimal_places=2,
default=0,
help_text="MQTT wear (0-128 scale) converted to a 0-100 percent",
max_digits=5,
),
),
("last_seen_at", models.DateTimeField(auto_now=True)),
("created_at", models.DateTimeField(auto_now_add=True)),
(
"printer",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
related_name="hotends",
to="bambu_run.printer",
),
),
],
options={
"verbose_name": "Hotend",
"verbose_name_plural": "Hotends",
"db_table": "infrastructure_hotend",
"ordering": ["printer", "-is_toolhead", "slot_number", "serial_number"],
"unique_together": {("printer", "serial_number")},
},
),
migrations.CreateModel(
name="HotendSnapshot",
fields=[
(
"id",
models.BigAutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
("raw_id", models.PositiveSmallIntegerField()),
("used_time_seconds", models.PositiveIntegerField(default=0)),
(
"wear_percent",
models.DecimalField(decimal_places=2, default=0, max_digits=5),
),
(
"stat",
models.IntegerField(
blank=True,
help_text="Raw MQTT status code for this hotend",
null=True,
),
),
(
"timestamp",
models.DateTimeField(
db_index=True, default=django.utils.timezone.now
),
),
(
"hotend",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
related_name="snapshots",
to="bambu_run.hotend",
),
),
(
"printer_metric",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
related_name="hotend_snapshots",
to="bambu_run.printermetrics",
),
),
],
options={
"verbose_name": "Hotend Snapshot",
"verbose_name_plural": "Hotend Snapshots",
"db_table": "infrastructure_hotend_snapshot",
"ordering": ["printer_metric", "hotend"],
"indexes": [
models.Index(
fields=["printer_metric", "hotend"],
name="infrastruct_printer_b528aa_idx",
),
models.Index(
fields=["hotend", "-timestamp"],
name="infrastruct_hotend__691f7e_idx",
),
],
},
),
]

View File

@@ -0,0 +1,22 @@
# Generated by Django 5.2.8 on 2026-06-20 14:22
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("bambu_run", "0007_hotend_hotendsnapshot"),
]
operations = [
migrations.AddField(
model_name="printermetrics",
name="nozzle_info",
field=models.JSONField(
blank=True,
default=list,
help_text="Parsed per-poll nozzle/hotend info list",
),
),
]

View File

@@ -21,12 +21,15 @@ AMS_TYPE_CHOICES = [
def ams_type_from_info(info_code) -> str: def ams_type_from_info(info_code) -> str:
"""Resolve an AMS unit's `info` model code to a human label. """Resolve an AMS unit's `info` model code to a human label.
The HT unit reports its `id` with the 0x80 bit set (e.g. 128) — when the info Real MQTT `info` codes are 8 characters (e.g. "10001003") with the type encoded
code is unknown, that bit is a reasonable secondary hint for HT identification. in the last 4 digits — confirmed against a live H2C with AMS 2 Pro / AMS / AMS HT.
Fall back to an exact match for the bare 4-digit form in case other firmware
reports it short.
""" """
if info_code is None: if not info_code:
return "" return ""
return AMS_INFO_TO_TYPE.get(str(info_code), "") code = str(info_code)
return AMS_INFO_TO_TYPE.get(code[-4:], "") or AMS_INFO_TO_TYPE.get(code, "")
class Printer(models.Model): class Printer(models.Model):
@@ -231,6 +234,23 @@ class PrinterMetrics(models.Model):
default=list, help_text="Light status report [{node, mode}]" default=list, help_text="Light status report [{node, mode}]"
) )
# Groundwork for H2C's Vortek nozzle-changer rack (6 swappable hotends + 1 fixed
# left nozzle) — the full MQTT schema for per-slot state isn't confirmed yet, so
# the raw `print.device` payload is captured here unfiltered to avoid losing data
# ahead of proper per-slot modeling.
vortek_raw = models.JSONField(
default=dict, blank=True, help_text="Raw print.device MQTT payload (Vortek rack groundwork)"
)
# Parsed device.nozzle.info[] from this poll, one dict per entry (mirrors
# HotendInfo.to_dict()). Includes induction-chip hotends *and* non-inductive
# nozzle positions (e.g. H2C's fixed left nozzle) that have no stable serial
# number to key a Hotend registry row on — kept here so the dashboard can show
# their readable type/diameter without claiming an identity/history we don't have.
nozzle_info = models.JSONField(
default=list, blank=True, help_text="Parsed per-poll nozzle/hotend info list"
)
class Meta: class Meta:
db_table = "infrastructure_printer_metrics" db_table = "infrastructure_printer_metrics"
verbose_name = "Printer Metric" verbose_name = "Printer Metric"
@@ -490,6 +510,15 @@ class FilamentSnapshot(models.Model):
max_length=20, null=True, blank=True, max_length=20, null=True, blank=True,
help_text="Slot identifier like A00-W1" help_text="Slot identifier like A00-W1"
) )
ams_unit_id = models.PositiveSmallIntegerField(
null=True, blank=True, db_index=True,
help_text="Which physical AMS unit this tray belongs to (matches MQTT ams[i].id; 128 = AMS HT)"
)
ams_type = models.CharField(
max_length=32, blank=True, default="",
choices=AMS_TYPE_CHOICES,
help_text="Type of the AMS unit this tray belongs to (AMS / AMS 2 Pro / AMS HT)"
)
type = models.CharField(max_length=50, null=True, blank=True) type = models.CharField(max_length=50, null=True, blank=True)
sub_type = models.CharField( sub_type = models.CharField(
@@ -537,9 +566,10 @@ class FilamentSnapshot(models.Model):
db_table = "infrastructure_filament_snapshot" db_table = "infrastructure_filament_snapshot"
verbose_name = "Filament Snapshot" verbose_name = "Filament Snapshot"
verbose_name_plural = "Filament Snapshots" verbose_name_plural = "Filament Snapshots"
ordering = ['printer_metric', 'tray_id'] ordering = ['printer_metric', 'ams_unit_id', 'tray_id']
indexes = [ indexes = [
models.Index(fields=['printer_metric', 'tray_id']), models.Index(fields=['printer_metric', 'tray_id']),
models.Index(fields=['printer_metric', 'ams_unit_id', 'tray_id']),
models.Index(fields=['filament']), models.Index(fields=['filament']),
] ]
@@ -678,6 +708,10 @@ class FilamentUsage(models.Model):
) )
tray_id = models.IntegerField(help_text="Which AMS slot was used") tray_id = models.IntegerField(help_text="Which AMS slot was used")
ams_unit_id = models.PositiveSmallIntegerField(
null=True, blank=True, db_index=True,
help_text="Which physical AMS unit the slot belongs to (matches MQTT ams[i].id; 128 = AMS HT)"
)
starting_percent = models.IntegerField(help_text="Filament remaining % at job start") starting_percent = models.IntegerField(help_text="Filament remaining % at job start")
ending_percent = models.IntegerField( ending_percent = models.IntegerField(
@@ -715,3 +749,108 @@ class FilamentUsage(models.Model):
self.consumed_grams = int( self.consumed_grams = int(
self.filament.initial_weight_grams * (self.consumed_percent / 100.0) self.filament.initial_weight_grams * (self.consumed_percent / 100.0)
) )
class Hotend(models.Model):
"""Registry of individual Vortek hotends, keyed by serial number.
A Vortek rack holds up to 6 swappable hotends (bays, MQTT `id` 16-21) plus
1 mounted on the toolhead at a time (MQTT `id` 0). `raw_id` reflects whichever
address was last seen on the wire for this hotend; `slot_number` is only set
when that address falls in the 16-21 rack-bay range — confirmed by watching
a "Read All" MQTT capture reassign a toolhead-mounted hotend's id from 0 to
its true bay id.
"""
printer = models.ForeignKey(
'Printer', on_delete=models.CASCADE, related_name='hotends'
)
serial_number = models.CharField(max_length=100, db_index=True)
nozzle_type = models.CharField(max_length=50, blank=True, default="")
diameter = models.DecimalField(
max_digits=3, decimal_places=2, null=True, blank=True
)
raw_id = models.PositiveSmallIntegerField(
help_text="Last-seen MQTT device.nozzle.info[].id"
)
slot_number = models.PositiveSmallIntegerField(
null=True, blank=True,
help_text="Rack bay 1-6, derived from raw_id 16-21. Null if currently unknown (e.g. mounted on toolhead and id reports as the 0 sentinel)."
)
is_toolhead = models.BooleanField(
default=False,
help_text="True if currently mounted on the toolhead under normal polling (raw_id == 0)."
)
last_filament_profile_id = models.CharField(
max_length=20, blank=True, default="",
help_text="Bambu material profile id of the filament last loaded (MQTT fila_id, e.g. 'GFA01')"
)
last_color = models.CharField(
max_length=6, blank=True, default="",
help_text="6-char hex of the filament last loaded (MQTT color_m, alpha stripped)"
)
used_time_seconds = models.PositiveIntegerField(default=0)
wear_percent = models.DecimalField(
max_digits=5, decimal_places=2, default=0,
help_text="MQTT wear (0-128 scale) converted to a 0-100 percent"
)
last_seen_at = models.DateTimeField(auto_now=True)
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
db_table = "infrastructure_hotend"
verbose_name = "Hotend"
verbose_name_plural = "Hotends"
ordering = ['printer', '-is_toolhead', 'slot_number', 'serial_number']
unique_together = [['printer', 'serial_number']]
def __str__(self):
location = "Toolhead" if self.is_toolhead else (
f"Slot {self.slot_number}" if self.slot_number else "Rack"
)
return f"{self.serial_number} ({location})"
@property
def used_time_display(self) -> str:
hours, remainder = divmod(self.used_time_seconds, 3600)
minutes = remainder // 60
return f"{hours}h {minutes}m" if hours else f"{minutes}m"
class HotendSnapshot(models.Model):
"""Point-in-time reading of a Hotend, one row per collector poll."""
printer_metric = models.ForeignKey(
'PrinterMetrics', on_delete=models.CASCADE,
related_name='hotend_snapshots'
)
hotend = models.ForeignKey(
'Hotend', on_delete=models.CASCADE,
related_name='snapshots'
)
raw_id = models.PositiveSmallIntegerField()
used_time_seconds = models.PositiveIntegerField(default=0)
wear_percent = models.DecimalField(max_digits=5, decimal_places=2, default=0)
stat = models.IntegerField(
null=True, blank=True, help_text="Raw MQTT status code for this hotend"
)
timestamp = models.DateTimeField(default=timezone.now, db_index=True)
class Meta:
db_table = "infrastructure_hotend_snapshot"
verbose_name = "Hotend Snapshot"
verbose_name_plural = "Hotend Snapshots"
ordering = ['printer_metric', 'hotend']
indexes = [
models.Index(fields=['printer_metric', 'hotend']),
models.Index(fields=['hotend', '-timestamp']),
]
def __str__(self):
return f"{self.hotend.serial_number} @ {self.timestamp.strftime('%Y-%m-%d %H:%M:%S')}"

View File

@@ -296,6 +296,73 @@ class AMSState:
return loaded return loaded
@dataclass
class HotendInfo:
"""A single hotend reported in `device.nozzle.info[]` (Vortek rack).
`raw_id` semantics (confirmed by watching a live "Read All" MQTT capture):
0 = currently mounted on the (swappable) toolhead — the sentinel hides the
true bay address until "Read All" resolves it; 1 = the fixed left nozzle on
dual-nozzle printers (no RFID chip, always reports sn="N/A"); 16-21 = rack
bay address, slot_number = raw_id - 15 (1-6).
"""
raw_id: int = 0
serial_number: str = ""
nozzle_type: str = ""
diameter: float = 0.4
fila_id: str = ""
color: Optional[str] = None
used_time_seconds: int = 0
wear_percent: float = 0.0
stat: int = 0
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "HotendInfo":
from .utils import strip_color_padding
return cls(
raw_id=int(data.get("id", 0)),
serial_number=data.get("sn", ""),
nozzle_type=data.get("type", ""),
diameter=float(data.get("diameter", 0.4)),
fila_id=data.get("fila_id", ""),
color=strip_color_padding(data.get("color_m")),
used_time_seconds=int(data.get("p_t", 0)),
wear_percent=round(float(data.get("wear", 0.0)) / 128.0 * 100, 2),
stat=int(data.get("stat", 0)),
)
@property
def is_toolhead(self) -> bool:
return self.raw_id == 0
@property
def is_empty(self) -> bool:
return self.serial_number in ("", "N/A")
@property
def slot_number(self) -> Optional[int]:
if 16 <= self.raw_id <= 21:
return self.raw_id - 15
return None
def to_dict(self) -> Dict[str, Any]:
return {
"raw_id": self.raw_id,
"serial_number": self.serial_number,
"nozzle_type": self.nozzle_type,
"diameter": self.diameter,
"fila_id": self.fila_id,
"color": self.color,
"used_time_seconds": self.used_time_seconds,
"wear_percent": self.wear_percent,
"stat": self.stat,
"is_toolhead": self.is_toolhead,
"is_empty": self.is_empty,
"slot_number": self.slot_number,
}
@dataclass @dataclass
class PrinterState: class PrinterState:
"""Complete printer state parsed from MQTT data""" """Complete printer state parsed from MQTT data"""
@@ -388,6 +455,9 @@ class PrinterState:
# External spool (virtual tray) # External spool (virtual tray)
vt_tray: Optional[Dict[str, Any]] = None vt_tray: Optional[Dict[str, Any]] = None
# Vortek hotend rack (device.nozzle.info[])
hotends: List[HotendInfo] = field(default_factory=list)
# Raw data for any additional fields # Raw data for any additional fields
_raw_data: Dict[str, Any] = field(default_factory=dict, repr=False) _raw_data: Dict[str, Any] = field(default_factory=dict, repr=False)
@@ -431,6 +501,12 @@ class PrinterState:
nozzle_target_temp_left = float((t >> 16) & 0xFFFF) nozzle_target_temp_left = float((t >> 16) & 0xFFFF)
nozzle_temp_left = float(t & 0xFFFF) nozzle_temp_left = float(t & 0xFFFF)
# Vortek hotend rack: device.nozzle.info[] — one entry per hotend.
hotends = [
HotendInfo.from_dict(h)
for h in (device.get("nozzle") or {}).get("info") or []
]
return cls( return cls(
timestamp=timestamp, timestamp=timestamp,
sequence_id=str(print_data.get("sequence_id", "")), sequence_id=str(print_data.get("sequence_id", "")),
@@ -487,6 +563,7 @@ class PrinterState:
gcode_file_prepare_percent=str(print_data.get("gcode_file_prepare_percent", "")), gcode_file_prepare_percent=str(print_data.get("gcode_file_prepare_percent", "")),
lifecycle=print_data.get("lifecycle", ""), lifecycle=print_data.get("lifecycle", ""),
vt_tray=print_data.get("vt_tray"), vt_tray=print_data.get("vt_tray"),
hotends=hotends,
_raw_data=data, _raw_data=data,
) )
@@ -529,6 +606,11 @@ class PrinterState:
"wifi_signal_dbm": self.wifi_signal_dbm, "wifi_signal_dbm": self.wifi_signal_dbm,
"print_error": self.print_error, "print_error": self.print_error,
"has_errors": self.print_error != 0, "has_errors": self.print_error != 0,
# Full `print.device` payload, unfiltered. H2C's Vortek rack (6 swappable
# hotends + 1 fixed left nozzle) isn't fully modeled yet — stash everything
# here so no data is lost once the real Vortek MQTT schema is confirmed.
"vortek_raw": self._raw_data.get("print", {}).get("device", {}),
"hotends": [h.to_dict() for h in self.hotends],
"hms": self.hms, "hms": self.hms,
"stg_cur": self.stg_cur, "stg_cur": self.stg_cur,
"lights_report": self.lights_report, "lights_report": self.lights_report,

View File

@@ -64,3 +64,93 @@
opacity: 0.9; opacity: 0.9;
color: rgba(255, 255, 255, 0.9); color: rgba(255, 255, 255, 0.9);
} }
/* AMS unit type colors — CSS variables so RAE/standalone can override per theme */
:root {
--ams-badge-ams: #6c757d;
--ams-badge-ams-2-pro: #0d6efd;
--ams-badge-ams-ht: #fd7e14;
--ams-group-border-color: rgba(0, 0, 0, 0.15);
}
[data-coreui-theme="dark"] {
--ams-group-border-color: rgba(255, 255, 255, 0.2);
}
.ams-badge-ams {
background-color: var(--ams-badge-ams);
color: #fff;
}
.ams-badge-ams-2-pro {
background-color: var(--ams-badge-ams-2-pro);
color: #fff;
}
.ams-badge-ams-ht {
background-color: var(--ams-badge-ams-ht);
color: #fff;
}
.ams-filter-pills {
display: flex;
flex-wrap: wrap;
gap: 0.5rem;
}
.ams-filter-pill {
border-radius: 50rem;
padding: 0.25rem 0.9rem;
font-size: 0.85rem;
border: 1px solid var(--ams-group-border-color);
background-color: transparent;
opacity: 0.6;
}
.ams-filter-pill.active {
opacity: 1;
font-weight: 600;
border-color: currentColor;
}
/* Grouped AMS unit panels — wide (multi-slot) units stack one per row,
compact (single-slot, e.g. AMS HT) units flow side-by-side and wrap. */
.ams-groups {
display: flex;
flex-wrap: wrap;
gap: 1rem;
}
.ams-group {
border-radius: 0.5rem;
padding: 0.75rem;
border: 1px solid var(--ams-group-border-color);
}
.ams-group--wide {
flex: 1 1 100%;
}
.ams-group--compact {
flex: 0 1 auto;
min-width: 220px;
}
.ams-badge-bg-ams {
background-color: color-mix(in srgb, var(--ams-badge-ams) 8%, transparent);
border-left: 3px solid var(--ams-badge-ams);
}
.ams-badge-bg-ams-2-pro {
background-color: color-mix(in srgb, var(--ams-badge-ams-2-pro) 8%, transparent);
border-left: 3px solid var(--ams-badge-ams-2-pro);
}
.ams-badge-bg-ams-ht {
background-color: color-mix(in srgb, var(--ams-badge-ams-ht) 8%, transparent);
border-left: 3px solid var(--ams-badge-ams-ht);
}
.ams-badge-bg- {
border-left: 3px solid var(--ams-group-border-color);
}

View File

@@ -625,8 +625,23 @@ function createFilamentDatasets(filamentTimeline, timestamps) {
data: filamentTimeline[key] data: filamentTimeline[key]
})); }));
// Sort by tray_id (numeric first, External last), then by start_idx (chronological) // Distinct (non-null/undefined) AMS units present in this timeline — used to decide
// whether labels need an AMS unit prefix (avoid noise for the common single-AMS case).
const distinctUnits = new Set(
filamentEntries
.map(e => e.data.ams_unit_id)
.filter(uid => uid !== null && uid !== undefined)
);
const showUnitPrefix = distinctUnits.size > 1;
// Sort by AMS unit, then tray_id (numeric first, External last), then by start_idx
filamentEntries.sort((a, b) => { filamentEntries.sort((a, b) => {
const unitA = a.data.ams_unit_id ?? -1;
const unitB = b.data.ams_unit_id ?? -1;
if (unitA !== unitB) {
return unitA - unitB;
}
const trayA = a.data.tray_id; const trayA = a.data.tray_id;
const trayB = b.data.tray_id; const trayB = b.data.tray_id;
@@ -659,6 +674,10 @@ function createFilamentDatasets(filamentTimeline, timestamps) {
displayLabel = `Tray ${filament.tray_id} (${filament.type})`; displayLabel = `Tray ${filament.tray_id} (${filament.type})`;
} }
if (showUnitPrefix && filament.ams_type) {
displayLabel = `${filament.ams_type} · ${displayLabel}`;
}
// Add brand if it's different from type (avoid redundancy) // Add brand if it's different from type (avoid redundancy)
if (filament.brand && filament.brand !== filament.type && filament.brand !== 'External') { if (filament.brand && filament.brand !== filament.type && filament.brand !== 'External') {
displayLabel += ` - ${filament.brand}`; displayLabel += ` - ${filament.brand}`;

View File

@@ -14,6 +14,20 @@
Real-time monitoring for {{ device_name }} Real-time monitoring for {{ device_name }}
</p> </p>
</div> </div>
{% if show_printer_switcher %}
<div class="col-auto d-flex align-items-center gap-2">
<label for="printerSwitcher" class="form-label mb-0 text-nowrap">Device:</label>
<select id="printerSwitcher" class="form-select" aria-label="Select printer"
onchange="if (this.value) { window.location.href = this.value; }">
{% for p in all_printers %}
<option value="{% url 'bambu_run:printer_dashboard' pk=p.pk %}"
{% if printer_device.pk == p.pk %}selected{% endif %}>
{{ p.name }}
</option>
{% endfor %}
</select>
</div>
{% endif %}
</div> </div>
{% if error %} {% if error %}
@@ -23,32 +37,38 @@
<!-- Summary Cards Row --> <!-- Summary Cards Row -->
<div class="row g-3 mb-4"> <div class="row g-3 mb-4">
{% if stats.is_dual_nozzle %} {% if stats.is_dual_nozzle %}
<!-- Right Nozzle (dual-nozzle printers, e.g. H2C) --> <!-- Left Nozzle (dual-nozzle printers, e.g. H2C) -->
<div class="col-12 col-md-6 col-lg-3"> <div class="col-12 col-md-6 col-lg-3">
<div class="card infra-card-warning"> <div class="card infra-card-warning h-100">
<div class="card-body"> <div class="card-body">
<div class="d-flex justify-content-between align-items-start"> <div class="d-flex justify-content-between align-items-start">
<div> <div>
<div class="stat-label">Right Nozzle</div> <div class="stat-label d-flex align-items-center gap-1">
<div class="stat-value">{{ stats.nozzle_temp|floatformat:1 }}&deg;C</div> <svg class="icon" style="width: 1.25rem; height: 1.25rem;"><use href="{% static 'bambu_run/vendors/coreui-icons-free.svg' %}#cil-arrow-thick-left"></use></svg>
<div class="text-muted small">target {{ stats.nozzle_target_temp|floatformat:0 }}&deg;C Left Nozzle
{% if stats.nozzle_type %}· {{ stats.nozzle_type }}{% endif %}</div> </div>
<div class="stat-value">{{ stats.nozzle_temp_left|floatformat:1 }}&deg;C</div>
<div class="text-muted small">target {{ stats.nozzle_target_temp_left|floatformat:0 }}&deg;C
{% if stats.nozzle_type_left %}· Nozzle {{ stats.nozzle_type_left }}{% endif %}</div>
</div> </div>
<i class="bi bi-thermometer-high" style="font-size: 2rem; opacity: 0.3;"></i> <i class="bi bi-thermometer-high" style="font-size: 2rem; opacity: 0.3;"></i>
</div> </div>
</div> </div>
</div> </div>
</div> </div>
<!-- Left Nozzle --> <!-- Right Nozzle -->
<div class="col-12 col-md-6 col-lg-3"> <div class="col-12 col-md-6 col-lg-3">
<div class="card infra-card-warning"> <div class="card infra-card-warning h-100">
<div class="card-body"> <div class="card-body">
<div class="d-flex justify-content-between align-items-start"> <div class="d-flex justify-content-between align-items-start">
<div> <div>
<div class="stat-label">Left Nozzle</div> <div class="stat-label d-flex align-items-center gap-1">
<div class="stat-value">{{ stats.nozzle_temp_left|floatformat:1 }}&deg;C</div> Right Nozzle
<div class="text-muted small">target {{ stats.nozzle_target_temp_left|floatformat:0 }}&deg;C <svg class="icon" style="width: 1.25rem; height: 1.25rem;"><use href="{% static 'bambu_run/vendors/coreui-icons-free.svg' %}#cil-arrow-thick-right"></use></svg>
{% if stats.nozzle_type_left %}· {{ stats.nozzle_type_left }}{% endif %}</div> </div>
<div class="stat-value">{{ stats.nozzle_temp|floatformat:1 }}&deg;C</div>
<div class="text-muted small">target {{ stats.nozzle_target_temp|floatformat:0 }}&deg;C
{% if stats.nozzle_type %}· Nozzle {{ stats.nozzle_type }}{% endif %}</div>
</div> </div>
<i class="bi bi-thermometer-high" style="font-size: 2rem; opacity: 0.3;"></i> <i class="bi bi-thermometer-high" style="font-size: 2rem; opacity: 0.3;"></i>
</div> </div>
@@ -58,7 +78,7 @@
{% else %} {% else %}
<!-- Nozzle Temperature Card (single-nozzle printers) --> <!-- Nozzle Temperature Card (single-nozzle printers) -->
<div class="col-12 col-md-6 col-lg-3"> <div class="col-12 col-md-6 col-lg-3">
<div class="card infra-card-warning"> <div class="card infra-card-warning h-100">
<div class="card-body"> <div class="card-body">
<div class="d-flex justify-content-between align-items-start"> <div class="d-flex justify-content-between align-items-start">
<div> <div>
@@ -74,7 +94,7 @@
<!-- Bed Temperature Card --> <!-- Bed Temperature Card -->
<div class="col-12 col-md-6 col-lg-3"> <div class="col-12 col-md-6 col-lg-3">
<div class="card infra-card-danger"> <div class="card infra-card-danger h-100">
<div class="card-body"> <div class="card-body">
<div class="d-flex justify-content-between align-items-start"> <div class="d-flex justify-content-between align-items-start">
<div> <div>
@@ -89,7 +109,7 @@
<!-- Print Progress Card --> <!-- Print Progress Card -->
<div class="col-12 col-md-6 col-lg-3"> <div class="col-12 col-md-6 col-lg-3">
<div class="card infra-card-info"> <div class="card infra-card-info h-100">
<div class="card-body"> <div class="card-body">
<div class="d-flex justify-content-between align-items-start"> <div class="d-flex justify-content-between align-items-start">
<div> <div>
@@ -104,7 +124,7 @@
<!-- Chamber Light Card --> <!-- Chamber Light Card -->
<div class="col-12 col-md-6 col-lg-3"> <div class="col-12 col-md-6 col-lg-3">
<div class="card {% if stats.chamber_light == 'on' %}infra-card-success{% else %}infra-card-secondary{% endif %}"> <div class="card h-100 {% if stats.chamber_light == 'on' %}infra-card-success{% else %}infra-card-secondary{% endif %}">
<div class="card-body"> <div class="card-body">
<div class="d-flex justify-content-between align-items-start"> <div class="d-flex justify-content-between align-items-start">
<div> <div>
@@ -184,9 +204,29 @@
</div> </div>
<div class="card-body"> <div class="card-body">
{% if stats.filaments %} {% if stats.filaments %}
{% if stats.ams_units|length > 1 %}
<div class="ams-filter-pills mb-3" id="amsFilterPills">
<button type="button" class="btn ams-filter-pill active" data-ams-filter="all">All</button>
{% for unit in stats.ams_units %}
<button type="button" class="btn ams-filter-pill ams-badge-{{ unit.ams_type|slugify }}" data-ams-filter="{{ unit.ams_unit_id }}">{{ unit.ams_type|default:"AMS" }}</button>
{% endfor %}
</div>
{% endif %}
<div class="ams-groups">
{% for group in stats.ams_groups %}
<div class="ams-group ams-badge-bg-{{ group.ams_type|slugify }} {% if group.filaments|length > 1 %}ams-group--wide{% else %}ams-group--compact{% endif %}" data-ams-unit-id="{{ group.unit_id }}">
<div class="ams-group-header d-flex justify-content-between align-items-center mb-2">
<strong class="small">{{ group.label }}</strong>
{% if group.humidity is not None or group.temp is not None %}
<span class="small text-body-secondary">
{% if group.humidity is not None %}{{ group.humidity }}%RH{% endif %}
{% if group.temp is not None %}&middot; {{ group.temp }}&deg;C{% endif %}
</span>
{% endif %}
</div>
<div class="row g-3"> <div class="row g-3">
{% for filament in stats.filaments %} {% for filament in group.filaments %}
<div class="col-12 col-md-6 col-lg-3"> <div class="col-12 {% if group.filaments|length > 1 %}col-md-6 col-lg-3{% endif %}">
<div class="card filament-card" data-filament-color="{{ filament.color|slice:':6' }}"{% if filament.is_transparent %} data-filament-transparent="true"{% endif %}> <div class="card filament-card" data-filament-color="{{ filament.color|slice:':6' }}"{% if filament.is_transparent %} data-filament-transparent="true"{% endif %}>
<div class="card-body"> <div class="card-body">
<div class="d-flex justify-content-between align-items-center mb-2"> <div class="d-flex justify-content-between align-items-center mb-2">
@@ -210,8 +250,13 @@
</div> </div>
</div> </div>
{% endfor %} {% endfor %}
</div>
</div>
{% endfor %}
</div>
{% if stats.external_spool.type %} {% if stats.external_spool.type %}
<div class="row g-3 mt-1">
<div class="col-12 col-md-6 col-lg-3"> <div class="col-12 col-md-6 col-lg-3">
<div class="card filament-card" data-filament-color="{{ stats.external_spool.color|slice:':6' }}"> <div class="card filament-card" data-filament-color="{{ stats.external_spool.color|slice:':6' }}">
<div class="card-body"> <div class="card-body">
@@ -227,8 +272,8 @@
</div> </div>
</div> </div>
</div> </div>
{% endif %}
</div> </div>
{% endif %}
{% else %} {% else %}
<p class="text-body-secondary">No filament data available</p> <p class="text-body-secondary">No filament data available</p>
{% endif %} {% endif %}
@@ -237,6 +282,64 @@
</div> </div>
</div> </div>
<!-- Hotends Section (Vortek rack + any plain/non-inductive nozzles) -->
{% if stats.hotends or stats.nozzle_positions %}
<div class="row mb-4">
<div class="col-12">
<div class="card">
<div class="card-header">
<h5>Hotends</h5>
</div>
<div class="card-body">
<div class="row g-3">
{% for hotend in stats.hotends %}
<div class="col-12 col-md-6 col-lg-3">
<div class="card filament-card" data-filament-color="{{ hotend.last_color|default:'888888' }}">
<div class="card-body">
<div class="d-flex justify-content-between align-items-center mb-2">
<h6 class="mb-0">
{% if hotend.is_toolhead %}Toolhead{% elif hotend.slot_number %}Slot {{ hotend.slot_number }}{% else %}Rack{% endif %}
</h6>
{% if hotend.is_toolhead %}<span class="badge filament-badge">Toolhead</span>{% endif %}
</div>
<p class="mb-1 small text-body-secondary">SN {{ hotend.serial_number }}</p>
<p class="mb-1 small"><strong>{{ hotend.nozzle_type }}</strong>{% if hotend.diameter %} &middot; {{ hotend.diameter }}mm{% endif %}</p>
{% if hotend.last_filament_profile_id %}<p class="mb-1 small text-body-secondary">Last: {{ hotend.last_filament_profile_id }}</p>{% endif %}
<div class="d-flex justify-content-between align-items-center mb-1">
<span class="small">Used time</span>
<span class="small">{{ hotend.used_time_display }}</span>
</div>
<div class="d-flex justify-content-between align-items-center mb-2">
<span class="small">Wear</span>
<span class="badge filament-badge">{{ hotend.wear_percent|floatformat:0 }}%</span>
</div>
<div class="progress" style="height: 10px; background-color: rgba(0,0,0,0.1);">
<div class="progress-bar filament-progress" role="progressbar" style="width: {{ hotend.wear_percent }}%;" aria-valuenow="{{ hotend.wear_percent }}" aria-valuemin="0" aria-valuemax="100"></div>
</div>
</div>
</div>
</div>
{% endfor %}
{% for nozzle in stats.nozzle_positions %}
<div class="col-12 col-md-6 col-lg-3">
<div class="card">
<div class="card-body">
<div class="d-flex justify-content-between align-items-center mb-2">
<h6 class="mb-0">{% if nozzle.is_toolhead %}Toolhead{% else %}Fixed Nozzle{% endif %}</h6>
</div>
<p class="mb-1 small"><strong>{{ nozzle.nozzle_type }}</strong>{% if nozzle.diameter %} &middot; {{ nozzle.diameter }}mm{% endif %}</p>
<p class="mb-0 small text-body-secondary">No induction chip data</p>
</div>
</div>
</div>
{% endfor %}
</div>
</div>
</div>
</div>
</div>
{% endif %}
<!-- Date/Time Filter Controls --> <!-- Date/Time Filter Controls -->
{% if not is_basic_user %} {% if not is_basic_user %}
<div class="row mb-4"> <div class="row mb-4">
@@ -423,12 +526,12 @@
<script src="https://cdn.jsdelivr.net/npm/chartjs-plugin-annotation@3.0.1"></script> <script src="https://cdn.jsdelivr.net/npm/chartjs-plugin-annotation@3.0.1"></script>
<script src="{% static 'bambu_run/js/printer_charts.js' %}"></script> <script src="{% static 'bambu_run/js/printer_charts.js' %}"></script>
<script src="{% static 'bambu_run/js/printer_charts_control.js' %}"></script> <script src="{% static 'bambu_run/js/printer_charts_control.js' %}"></script>
{% if not is_basic_user %} {% if not is_basic_user and printer_device %}
<div id="printerApiUrl" data-url="{% url 'bambu_run:printer_api' %}" style="display: none;"></div> <div id="printerApiUrl" data-url="{% url 'bambu_run:printer_api' pk=printer_device.pk %}" style="display: none;"></div>
<script> <script>
document.addEventListener('DOMContentLoaded', function() { document.addEventListener('DOMContentLoaded', function() {
const printerData = {{ printer_data_json|safe }}; const printerData = {{ printer_data_json|safe }};
const apiUrl = '{% url "bambu_run:printer_api" %}'; const apiUrl = '{% url "bambu_run:printer_api" pk=printer_device.pk %}';
initPrinterCharts(printerData, apiUrl); initPrinterCharts(printerData, apiUrl);
// Add project markers if they exist // Add project markers if they exist
@@ -453,4 +556,27 @@
}); });
</script> </script>
{% endif %} {% endif %}
<script>
document.addEventListener('DOMContentLoaded', function() {
const pillsContainer = document.getElementById('amsFilterPills');
if (!pillsContainer) return;
const items = document.querySelectorAll('.ams-groups .ams-group');
pillsContainer.addEventListener('click', function(e) {
const pill = e.target.closest('.ams-filter-pill');
if (!pill) return;
pillsContainer.querySelectorAll('.ams-filter-pill').forEach(function(p) {
p.classList.remove('active');
});
pill.classList.add('active');
const filter = pill.dataset.amsFilter;
items.forEach(function(item) {
const show = filter === 'all' || item.dataset.amsUnitId === filter;
item.classList.toggle('d-none', !show);
});
});
});
</script>
{% endblock %} {% endblock %}

View File

@@ -5,7 +5,9 @@ app_name = "bambu_run"
urlpatterns = [ urlpatterns = [
path("", views.PrinterDashboardView.as_view(), name="printer_dashboard"), path("", views.PrinterDashboardView.as_view(), name="printer_dashboard"),
path("printer/<int:pk>/", views.PrinterDashboardView.as_view(), name="printer_dashboard"),
path("api/printer/", views.PrinterDataAPIView.as_view(), name="printer_api"), path("api/printer/", views.PrinterDataAPIView.as_view(), name="printer_api"),
path("api/printer/<int:pk>/", views.PrinterDataAPIView.as_view(), name="printer_api"),
# Filament Inventory routes # Filament Inventory routes
path("filaments/", views.FilamentListView.as_view(), name="filament_list"), path("filaments/", views.FilamentListView.as_view(), name="filament_list"),

View File

@@ -1,8 +1,9 @@
from datetime import timedelta, datetime from datetime import timedelta, datetime
from django.views.generic import TemplateView, View, ListView, CreateView, UpdateView, DetailView, DeleteView from django.views.generic import TemplateView, View, ListView, CreateView, UpdateView, DetailView, DeleteView
from django.contrib.auth.mixins import LoginRequiredMixin from django.contrib.auth.mixins import LoginRequiredMixin
from django.shortcuts import get_object_or_404
from django.utils import timezone from django.utils import timezone
from django.http import JsonResponse from django.http import Http404, JsonResponse
from django.urls import reverse_lazy from django.urls import reverse_lazy
from django.contrib import messages from django.contrib import messages
from django.db.models import Q, Sum from django.db.models import Q, Sum
@@ -10,7 +11,7 @@ import json
import zoneinfo import zoneinfo
from .conf import app_settings from .conf import app_settings
from .models import Printer, PrinterMetrics, Filament, FilamentColor, FilamentType, FilamentSnapshot, PrintJob, FilamentUsage from .models import Printer, PrinterMetrics, Filament, FilamentColor, FilamentType, FilamentSnapshot, PrintJob, FilamentUsage, Hotend
from .forms import FilamentForm, FilamentColorForm, FilamentTypeForm from .forms import FilamentForm, FilamentColorForm, FilamentTypeForm
_METRICS_API_FIELDS = [ _METRICS_API_FIELDS = [
@@ -26,6 +27,17 @@ _METRICS_API_FIELDS = [
_MAX_CHART_POINTS = 3000 _MAX_CHART_POINTS = 3000
def resolve_printer_from_request(pk):
"""Resolve which Printer a dashboard/API view should show.
`pk` given (URL kwarg) -> that exact printer, 404 if missing/inactive.
`pk` omitted -> first active printer (today's single-printer default behavior).
"""
if pk is not None:
return get_object_or_404(Printer, pk=pk, is_active=True)
return Printer.objects.filter(is_active=True).first()
class PrinterDashboardView(LoginRequiredMixin, TemplateView): class PrinterDashboardView(LoginRequiredMixin, TemplateView):
template_name = "bambu_run/printer_dashboard.html" template_name = "bambu_run/printer_dashboard.html"
@@ -38,13 +50,20 @@ class PrinterDashboardView(LoginRequiredMixin, TemplateView):
context = super().get_context_data(**kwargs) context = super().get_context_data(**kwargs)
context['bambu_run_base_template'] = app_settings.BASE_TEMPLATE context['bambu_run_base_template'] = app_settings.BASE_TEMPLATE
all_printers = Printer.objects.filter(is_active=True)
context["all_printers"] = all_printers
# Shown even with a single printer — hints that multi-printer support exists.
context["show_printer_switcher"] = all_printers.exists()
try: try:
printer_device = Printer.objects.filter(is_active=True).first() printer_device = resolve_printer_from_request(self.kwargs.get("pk"))
if not printer_device: if not printer_device:
context["error"] = ( context["error"] = (
"No 3D printer device found. Please run bambu_collector first." "No 3D printer device found. Please run bambu_collector first."
) )
return context return context
except Http404:
raise
except Exception as e: except Exception as e:
context["error"] = f"Error loading printer device: {str(e)}" context["error"] = f"Error loading printer device: {str(e)}"
return context return context
@@ -129,6 +148,8 @@ class PrinterDashboardView(LoginRequiredMixin, TemplateView):
'brand': snapshot.sub_type or 'Unknown', 'brand': snapshot.sub_type or 'Unknown',
'color': snapshot.color or 'FFFFFFFF', 'color': snapshot.color or 'FFFFFFFF',
'remain_percent': snapshot.remain_percent or 0, 'remain_percent': snapshot.remain_percent or 0,
'ams_unit_id': snapshot.ams_unit_id,
'ams_type': snapshot.ams_type or '',
} }
if snapshot.filament: if snapshot.filament:
filament_dict['color_name'] = snapshot.filament.color filament_dict['color_name'] = snapshot.filament.color
@@ -138,6 +159,37 @@ class PrinterDashboardView(LoginRequiredMixin, TemplateView):
except Exception: except Exception:
filaments_list = [] filaments_list = []
# Distinct AMS units represented in this snapshot, for the unit
# filter/badges in the template. Sort numeric unit ids first
# (AMS / AMS 2 Pro), HT (id 128 / bit 0x80 set) last.
seen_units = {}
for f in filaments_list:
uid = f.get('ams_unit_id')
if uid is not None and uid not in seen_units:
seen_units[uid] = f.get('ams_type') or ''
ams_units_list = [
{'ams_unit_id': uid, 'ams_type': label}
for uid, label in sorted(seen_units.items())
]
# Group trays by physical AMS unit for the panel-style dashboard layout —
# one tinted panel per unit, full-width for multi-slot units (AMS/AMS 2 Pro),
# compact for single-slot units (AMS HT) so several can flow side-by-side.
units_meta = {
u.get('unit_id'): u for u in (latest_metric.ams_units or [])
}
ams_groups = []
for uid, label in sorted(seen_units.items()):
unit_meta = units_meta.get(str(uid), {})
ams_groups.append({
'unit_id': uid,
'ams_type': label,
'label': f"{label or 'AMS'} (Unit {uid})",
'humidity': unit_meta.get('humidity'),
'temp': unit_meta.get('temp'),
'filaments': [f for f in filaments_list if f.get('ams_unit_id') == uid],
})
subtask_name = latest_metric.subtask_name or "No active print" subtask_name = latest_metric.subtask_name or "No active print"
# Look up active PrintJob for a better display name (cloud design_title) # Look up active PrintJob for a better display name (cloud design_title)
job_display_name = subtask_name job_display_name = subtask_name
@@ -177,6 +229,21 @@ class PrinterDashboardView(LoginRequiredMixin, TemplateView):
"ams_temp": float(latest_metric.ams_temp) if latest_metric.ams_temp else None, "ams_temp": float(latest_metric.ams_temp) if latest_metric.ams_temp else None,
"ams_humidity": latest_metric.ams_humidity, "ams_humidity": latest_metric.ams_humidity,
"filaments": filaments_list, "filaments": filaments_list,
"ams_units": ams_units_list,
"ams_groups": ams_groups,
"hotends": list(
Hotend.objects.filter(printer=printer_device)
.order_by('-is_toolhead', 'slot_number', 'serial_number')
),
# Nozzle positions with no induction chip (no stable serial number to
# key a Hotend registry row on, e.g. H2C's fixed left nozzle) — shown
# read-only from the latest poll, not persisted/historical. Entries with
# no readable type/diameter at all (i.e. genuinely nothing there) are
# dropped rather than shown as an empty placeholder.
"nozzle_positions": [
h for h in (latest_metric.nozzle_info or [])
if h.get('is_empty') and (h.get('nozzle_type') or h.get('diameter'))
],
"external_spool": latest_metric.external_spool or {}, "external_spool": latest_metric.external_spool or {},
"timestamp": latest_metric.timestamp.astimezone(tz).strftime("%Y-%m-%d %H:%M:%S"), "timestamp": latest_metric.timestamp.astimezone(tz).strftime("%Y-%m-%d %H:%M:%S"),
} }
@@ -259,15 +326,19 @@ class PrinterDashboardView(LoginRequiredMixin, TemplateView):
for snapshot in snapshots: for snapshot in snapshots:
tray_id = snapshot.tray_id tray_id = snapshot.tray_id
ams_unit_id = snapshot.ams_unit_id
ams_type = snapshot.ams_type or ''
fil_type = snapshot.type or 'Unknown' fil_type = snapshot.type or 'Unknown'
fil_sub_type = snapshot.sub_type or 'Unknown' fil_sub_type = snapshot.sub_type or 'Unknown'
fil_color = snapshot.color or 'FFFFFFFF' fil_color = snapshot.color or 'FFFFFFFF'
unique_key = f"{tray_id}_{fil_type}_{fil_sub_type}_{fil_color}" unique_key = f"{ams_unit_id}_{tray_id}_{fil_type}_{fil_sub_type}_{fil_color}"
if unique_key not in filament_data: if unique_key not in filament_data:
filament_data[unique_key] = { filament_data[unique_key] = {
'tray_id': tray_id, 'tray_id': tray_id,
'ams_unit_id': ams_unit_id,
'ams_type': ams_type,
'type': fil_type, 'type': fil_type,
'brand': fil_sub_type, 'brand': fil_sub_type,
'color': fil_color, 'color': fil_color,
@@ -304,13 +375,18 @@ class PrinterDashboardView(LoginRequiredMixin, TemplateView):
class PrinterDataAPIView(LoginRequiredMixin, View): class PrinterDataAPIView(LoginRequiredMixin, View):
"""API endpoint for dynamic printer chart updates""" """API endpoint for dynamic printer chart updates"""
def get(self, request): def get(self, request, pk=None):
start_date = request.GET.get("start_date") start_date = request.GET.get("start_date")
end_date = request.GET.get("end_date") end_date = request.GET.get("end_date")
start_time = request.GET.get("start_time", "00:00") start_time = request.GET.get("start_time", "00:00")
end_time = request.GET.get("end_time", "23:59") end_time = request.GET.get("end_time", "23:59")
try: try:
if pk is not None:
printer_device = Printer.objects.filter(pk=pk, is_active=True).first()
if not printer_device:
return JsonResponse({"error": "Printer not found"}, status=404)
else:
printer_device = Printer.objects.filter(is_active=True).first() printer_device = Printer.objects.filter(is_active=True).first()
if not printer_device: if not printer_device:
return JsonResponse({"error": "No printer device found"}, status=404) return JsonResponse({"error": "No printer device found"}, status=404)

View File

@@ -62,3 +62,7 @@ bambu_run = [
"templates/bambu_run/*.html", "templates/bambu_run/*.html",
"static/bambu_run/**/*", "static/bambu_run/**/*",
] ]
[tool.pytest.ini_options]
DJANGO_SETTINGS_MODULE = "tests.settings"
python_files = "test_*.py"

0
tests/__init__.py Normal file
View File

40
tests/settings.py Normal file
View File

@@ -0,0 +1,40 @@
"""Minimal Django settings for running bambu_run's pytest suite (in-memory SQLite)."""
SECRET_KEY = "test-secret-key"
INSTALLED_APPS = [
"django.contrib.auth",
"django.contrib.contenttypes",
"django.contrib.sessions",
"bambu_run",
]
MIDDLEWARE = [
"django.contrib.sessions.middleware.SessionMiddleware",
"django.contrib.auth.middleware.AuthenticationMiddleware",
]
TEMPLATES = [
{
"BACKEND": "django.template.backends.django.DjangoTemplates",
"APP_DIRS": True,
"OPTIONS": {
"context_processors": [
"django.template.context_processors.request",
"django.contrib.auth.context_processors.auth",
],
},
},
]
DATABASES = {
"default": {
"ENGINE": "django.db.backends.sqlite3",
"NAME": ":memory:",
}
}
USE_TZ = True
DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField"
ROOT_URLCONF = "tests.urls"

View File

@@ -0,0 +1,25 @@
import pytest
from bambu_run.models import ams_type_from_info
@pytest.mark.parametrize(
"info_code,expected",
[
# Real-world 8-char info codes captured from a live H2C with
# AMS 2 Pro (unit 0), AMS (unit 1), AMS HT (unit 128).
("10001003", "AMS 2 Pro"),
("10001001", "AMS"),
("11002104", "AMS HT"),
# Bare 4-digit codes (original assumption) still resolve.
("1001", "AMS"),
("1003", "AMS 2 Pro"),
("2104", "AMS HT"),
# Unknown/missing codes resolve to empty string, not an error.
("99999999", ""),
("", ""),
(None, ""),
],
)
def test_ams_type_from_info(info_code, expected):
assert ams_type_from_info(info_code) == expected

68
tests/test_diagnostics.py Normal file
View File

@@ -0,0 +1,68 @@
import pytest
from bambu_run.diagnostics import redact_diagnostics, build_diagnostics_report
def test_redacts_password_and_token_like_keys():
data = {"BAMBU_PASSWORD": "hunter2", "access_token": "abc123", "ok": "fine"}
redacted = redact_diagnostics(data)
assert redacted["BAMBU_PASSWORD"] == "***REDACTED***"
assert redacted["access_token"] == "***REDACTED***"
assert redacted["ok"] == "fine"
def test_masks_known_identifier_keys_partially():
data = {"dev_id": "31B8BP592601478", "tray_uuid": "EE37828FA8844DE1AB12"}
redacted = redact_diagnostics(data)
assert redacted["dev_id"] == "31B8...1478"
assert redacted["tray_uuid"] == "EE37...AB12"
def test_short_identifier_values_fully_masked():
data = {"dev_id": "short"}
redacted = redact_diagnostics(data)
assert redacted["dev_id"] == "***"
def test_recurses_into_nested_structures():
data = {"devices": [{"dev_id": "31B8BP592601478", "name": "RNL-H2C"}]}
redacted = redact_diagnostics(data)
assert redacted["devices"][0]["dev_id"] == "31B8...1478"
assert redacted["devices"][0]["name"] == "RNL-H2C"
def test_no_redact_passthrough_keeps_original_values():
data = {"dev_id": "31B8BP592601478", "BAMBU_PASSWORD": "hunter2"}
result = redact_diagnostics(data, redact=False)
assert result == data
def test_build_diagnostics_report_structure():
devices = [{"dev_id": "SERIAL-A", "name": "Printer A", "dev_product_name": "H2C"}]
raw_payloads = {"SERIAL-A": {"device": {"extruder": {"info": []}}}}
report = build_diagnostics_report(devices, raw_payloads)
assert report["device_count"] == 1
assert "generated_at" in report
assert report["devices"][0]["device_info"]["dev_id"] == "SERIAL-A"
assert report["devices"][0]["raw_mqtt_payload"] == {"device": {"extruder": {"info": []}}}
def test_build_diagnostics_report_handles_missing_payload():
devices = [{"dev_id": "SERIAL-A", "name": "Printer A"}]
report = build_diagnostics_report(devices, raw_payloads={})
assert report["devices"][0]["raw_mqtt_payload"] is None
assert report["devices"][0]["note"] == "No MQTT data received within the listen window."

View File

@@ -0,0 +1,180 @@
import pytest
from decimal import Decimal
from django.urls import reverse
from django.utils import timezone
from bambu_run.models import Printer, PrinterMetrics, FilamentSnapshot
@pytest.fixture
def logged_in_client(client, django_user_model):
user = django_user_model.objects.create_user(username="tester", password="pw")
client.force_login(user)
return client
@pytest.mark.django_db
def test_dashboard_filaments_carry_ams_unit_info(logged_in_client):
printer = Printer.objects.create(name="Printer A", model="H2C", is_active=True)
metric = PrinterMetrics.objects.create(device=printer, timestamp=timezone.now())
FilamentSnapshot.objects.create(
printer_metric=metric, tray_id=0, ams_unit_id=0, ams_type="AMS",
type="PLA", remain_percent=80,
)
FilamentSnapshot.objects.create(
printer_metric=metric, tray_id=0, ams_unit_id=128, ams_type="AMS HT",
type="PA-CF", remain_percent=50,
)
resp = logged_in_client.get(
reverse("bambu_run:printer_dashboard", kwargs={"pk": printer.pk})
)
filaments = resp.context["stats"]["filaments"]
assert len(filaments) == 2
units = {(f["ams_unit_id"], f["ams_type"]) for f in filaments}
assert units == {(0, "AMS"), (128, "AMS HT")}
ams_units = resp.context["stats"]["ams_units"]
assert ams_units == [
{"ams_unit_id": 0, "ams_type": "AMS"},
{"ams_unit_id": 128, "ams_type": "AMS HT"},
]
@pytest.mark.django_db
def test_filament_timeline_keeps_same_tray_id_units_separate(logged_in_client):
from bambu_run.views import PrinterDashboardView
printer = Printer.objects.create(name="Printer A", model="H2C", is_active=True)
metric = PrinterMetrics.objects.create(device=printer, timestamp=timezone.now())
FilamentSnapshot.objects.create(
printer_metric=metric, tray_id=0, ams_unit_id=0, ams_type="AMS",
type="PLA", sub_type="PLA Basic", color="FF0000", remain_percent=80,
)
FilamentSnapshot.objects.create(
printer_metric=metric, tray_id=0, ams_unit_id=128, ams_type="AMS HT",
type="PLA", sub_type="PLA Basic", color="FF0000", remain_percent=50,
)
view = PrinterDashboardView()
timeline = view._prepare_filament_timeline(PrinterMetrics.objects.filter(pk=metric.pk))
assert len(timeline) == 2
@pytest.mark.django_db
def test_dashboard_renders_unit_pills_and_badges_with_multiple_units(logged_in_client):
printer = Printer.objects.create(name="Printer A", model="H2C", is_active=True)
metric = PrinterMetrics.objects.create(device=printer, timestamp=timezone.now())
FilamentSnapshot.objects.create(
printer_metric=metric, tray_id=0, ams_unit_id=0, ams_type="AMS",
type="PLA", color="FF0000FF", remain_percent=80,
)
FilamentSnapshot.objects.create(
printer_metric=metric, tray_id=0, ams_unit_id=128, ams_type="AMS HT",
type="PA-CF", color="00FF00FF", remain_percent=50,
)
resp = logged_in_client.get(
reverse("bambu_run:printer_dashboard", kwargs={"pk": printer.pk})
)
assert resp.status_code == 200
html = resp.content.decode()
assert "ams-filter-pills" in html
assert "ams-badge-ams" in html
assert "ams-badge-ams-ht" in html
assert 'data-ams-unit-id="0"' in html
assert 'data-ams-unit-id="128"' in html
@pytest.mark.django_db
def test_dashboard_groups_filaments_by_ams_unit(logged_in_client):
printer = Printer.objects.create(name="Printer A", model="H2C", is_active=True)
metric = PrinterMetrics.objects.create(
device=printer, timestamp=timezone.now(),
ams_units=[
{"unit_id": "0", "ams_type": "AMS 2 Pro", "humidity": 5, "temp": 22.5},
{"unit_id": "128", "ams_type": "AMS HT", "humidity": 8, "temp": 60.0},
],
)
FilamentSnapshot.objects.create(
printer_metric=metric, tray_id=0, ams_unit_id=0, ams_type="AMS 2 Pro",
type="ABS", remain_percent=80,
)
FilamentSnapshot.objects.create(
printer_metric=metric, tray_id=1, ams_unit_id=0, ams_type="AMS 2 Pro",
type="ABS", remain_percent=60,
)
FilamentSnapshot.objects.create(
printer_metric=metric, tray_id=0, ams_unit_id=128, ams_type="AMS HT",
type="PA-CF", remain_percent=50,
)
resp = logged_in_client.get(
reverse("bambu_run:printer_dashboard", kwargs={"pk": printer.pk})
)
groups = resp.context["stats"]["ams_groups"]
assert len(groups) == 2
ams2pro_group, ht_group = groups
assert ams2pro_group["unit_id"] == 0
assert ams2pro_group["label"] == "AMS 2 Pro (Unit 0)"
assert ams2pro_group["humidity"] == 5
assert ams2pro_group["temp"] == 22.5
assert len(ams2pro_group["filaments"]) == 2
assert ht_group["unit_id"] == 128
assert ht_group["label"] == "AMS HT (Unit 128)"
assert ht_group["humidity"] == 8
assert len(ht_group["filaments"]) == 1
@pytest.mark.django_db
def test_dashboard_renders_wide_and_compact_panels(logged_in_client):
printer = Printer.objects.create(name="Printer A", model="H2C", is_active=True)
metric = PrinterMetrics.objects.create(
device=printer, timestamp=timezone.now(),
ams_units=[
{"unit_id": "0", "ams_type": "AMS 2 Pro", "humidity": 5, "temp": 22.5},
{"unit_id": "128", "ams_type": "AMS HT", "humidity": 8, "temp": 60.0},
],
)
for tray_id in range(4):
FilamentSnapshot.objects.create(
printer_metric=metric, tray_id=tray_id, ams_unit_id=0, ams_type="AMS 2 Pro",
type="ABS", remain_percent=80,
)
FilamentSnapshot.objects.create(
printer_metric=metric, tray_id=0, ams_unit_id=128, ams_type="AMS HT",
type="PA-CF", remain_percent=50,
)
resp = logged_in_client.get(
reverse("bambu_run:printer_dashboard", kwargs={"pk": printer.pk})
)
html = resp.content.decode()
assert "ams-group--wide" in html
assert "ams-group--compact" in html
assert "AMS 2 Pro (Unit 0)" in html
assert "AMS HT (Unit 128)" in html
@pytest.mark.django_db
def test_dashboard_hides_unit_pills_with_single_unit(logged_in_client):
printer = Printer.objects.create(name="Printer A", model="H2C", is_active=True)
metric = PrinterMetrics.objects.create(device=printer, timestamp=timezone.now())
FilamentSnapshot.objects.create(
printer_metric=metric, tray_id=0, ams_unit_id=0, ams_type="AMS",
type="PLA", color="FF0000FF", remain_percent=80,
)
resp = logged_in_client.get(
reverse("bambu_run:printer_dashboard", kwargs={"pk": printer.pk})
)
assert resp.status_code == 200
assert "ams-filter-pills" not in resp.content.decode()

View File

@@ -0,0 +1,121 @@
import pytest
from bambu_run.management.commands.bambu_collector import Command, DeviceSession, resolve_printer_device
from bambu_run.models import Hotend, HotendSnapshot, PrinterMetrics
class FakeClient:
"""Stub in place of BambuPrinter — returns canned snapshots, no real MQTT."""
def __init__(self, snapshots):
self._snapshots = snapshots
self._index = 0
self._client = None
def get_snapshot(self):
snap = self._snapshots[min(self._index, len(self._snapshots) - 1)]
self._index += 1
return snap
def make_session(device_id, name, snapshots):
printer = resolve_printer_device(device_id, {"name": name, "dev_product_name": "H2C"})
return DeviceSession(device_id=device_id, client=FakeClient(snapshots), printer=printer)
def hotends_snapshot(used_time=11472, wear=100.0):
return {
"gcode_state": "IDLE",
"hotends": [
{
"raw_id": 21, "serial_number": "20D06A5B2918952", "nozzle_type": "HS01",
"diameter": 0.4, "fila_id": "GFA01", "color": "FFFFFF",
"used_time_seconds": used_time, "wear_percent": wear, "stat": 0,
"is_toolhead": False, "is_empty": False, "slot_number": 6,
},
{
"raw_id": 1, "serial_number": "N/A", "nozzle_type": "HS01",
"diameter": 0.4, "fila_id": "", "color": None,
"used_time_seconds": 0, "wear_percent": 0.0, "stat": 0,
"is_toolhead": False, "is_empty": True, "slot_number": None,
},
{
"raw_id": 0, "serial_number": "20D06A5C0426280", "nozzle_type": "HS01",
"diameter": 0.4, "fila_id": "GFA00", "color": "FEC600",
"used_time_seconds": 93490, "wear_percent": 100.0, "stat": 0,
"is_toolhead": True, "is_empty": False, "slot_number": None,
},
],
}
@pytest.mark.django_db
def test_first_poll_creates_one_hotend_per_non_empty_entry():
session = make_session("SERIAL-A", "Printer A", [hotends_snapshot()])
cmd = Command()
cmd.verbose = False
cmd._collect_printer_data(session)
hotends = Hotend.objects.filter(printer=session.printer)
assert hotends.count() == 2 # empty bay (sn="N/A") skipped
rack = hotends.get(serial_number="20D06A5B2918952")
assert rack.raw_id == 21
assert rack.slot_number == 6
assert rack.is_toolhead is False
assert rack.used_time_seconds == 11472
assert rack.wear_percent == 100.0
assert rack.nozzle_type == "HS01"
assert rack.last_filament_profile_id == "GFA01"
assert rack.last_color == "FFFFFF"
toolhead = hotends.get(serial_number="20D06A5C0426280")
assert toolhead.is_toolhead is True
assert toolhead.slot_number is None
@pytest.mark.django_db
def test_first_poll_creates_one_snapshot_per_non_empty_hotend():
session = make_session("SERIAL-A", "Printer A", [hotends_snapshot()])
cmd = Command()
cmd.verbose = False
cmd._collect_printer_data(session)
metric = PrinterMetrics.objects.get(device=session.printer)
assert HotendSnapshot.objects.filter(printer_metric=metric).count() == 2
@pytest.mark.django_db
def test_collector_persists_raw_nozzle_info_including_non_inductive_entries():
session = make_session("SERIAL-A", "Printer A", [hotends_snapshot()])
cmd = Command()
cmd.verbose = False
cmd._collect_printer_data(session)
metric = PrinterMetrics.objects.get(device=session.printer)
assert len(metric.nozzle_info) == 3 # all entries, including the empty/non-inductive one
serials = {h["serial_number"] for h in metric.nozzle_info}
assert serials == {"20D06A5B2918952", "N/A", "20D06A5C0426280"}
@pytest.mark.django_db
def test_second_poll_updates_existing_hotend_instead_of_duplicating():
session = make_session(
"SERIAL-A", "Printer A",
[hotends_snapshot(used_time=11472, wear=100.0), hotends_snapshot(used_time=11500, wear=100.0)],
)
cmd = Command()
cmd.verbose = False
cmd._collect_printer_data(session)
cmd._collect_printer_data(session)
hotends = Hotend.objects.filter(printer=session.printer, serial_number="20D06A5B2918952")
assert hotends.count() == 1
assert hotends.first().used_time_seconds == 11500
snapshots = HotendSnapshot.objects.filter(hotend=hotends.first())
assert snapshots.count() == 2

View File

@@ -0,0 +1,128 @@
import pytest
from django.urls import reverse
from django.utils import timezone
from bambu_run.models import Printer, PrinterMetrics, Hotend
@pytest.fixture
def logged_in_client(client, django_user_model):
user = django_user_model.objects.create_user(username="tester", password="pw")
client.force_login(user)
return client
@pytest.mark.django_db
def test_dashboard_context_includes_hotends_toolhead_first(logged_in_client):
printer = Printer.objects.create(name="Printer A", model="H2C", is_active=True)
PrinterMetrics.objects.create(device=printer, timestamp=timezone.now())
Hotend.objects.create(
printer=printer, serial_number="RACK-SN", raw_id=16, slot_number=1,
is_toolhead=False, nozzle_type="HS01", used_time_seconds=3600, wear_percent=50,
)
Hotend.objects.create(
printer=printer, serial_number="TOOLHEAD-SN", raw_id=0, slot_number=None,
is_toolhead=True, nozzle_type="HS01", used_time_seconds=7200, wear_percent=80,
)
resp = logged_in_client.get(
reverse("bambu_run:printer_dashboard", kwargs={"pk": printer.pk})
)
hotends = resp.context["stats"]["hotends"]
assert len(hotends) == 2
assert hotends[0].serial_number == "TOOLHEAD-SN"
assert hotends[1].serial_number == "RACK-SN"
@pytest.mark.django_db
def test_dashboard_context_includes_non_inductive_nozzle_positions(logged_in_client):
printer = Printer.objects.create(name="Printer A", model="H2C", is_active=True)
PrinterMetrics.objects.create(
device=printer, timestamp=timezone.now(),
nozzle_info=[
{
"raw_id": 1, "serial_number": "N/A", "nozzle_type": "HS01", "diameter": 0.4,
"fila_id": "", "color": None, "used_time_seconds": 0, "wear_percent": 0.0,
"stat": 0, "is_toolhead": False, "is_empty": True, "slot_number": None,
},
],
)
resp = logged_in_client.get(
reverse("bambu_run:printer_dashboard", kwargs={"pk": printer.pk})
)
positions = resp.context["stats"]["nozzle_positions"]
assert len(positions) == 1
assert positions[0]["nozzle_type"] == "HS01"
@pytest.mark.django_db
def test_dashboard_omits_nozzle_positions_with_no_readable_data(logged_in_client):
printer = Printer.objects.create(name="Printer A", model="H2C", is_active=True)
PrinterMetrics.objects.create(
device=printer, timestamp=timezone.now(),
nozzle_info=[
{
"raw_id": 1, "serial_number": "N/A", "nozzle_type": "", "diameter": 0,
"fila_id": "", "color": None, "used_time_seconds": 0, "wear_percent": 0.0,
"stat": 0, "is_toolhead": False, "is_empty": True, "slot_number": None,
},
],
)
resp = logged_in_client.get(
reverse("bambu_run:printer_dashboard", kwargs={"pk": printer.pk})
)
assert resp.context["stats"]["nozzle_positions"] == []
assert "<h5>Hotends</h5>" not in resp.content.decode()
@pytest.mark.django_db
def test_dashboard_renders_nozzle_position_without_serial_or_wear(logged_in_client):
printer = Printer.objects.create(name="Printer A", model="H2C", is_active=True)
PrinterMetrics.objects.create(
device=printer, timestamp=timezone.now(),
nozzle_info=[
{
"raw_id": 1, "serial_number": "N/A", "nozzle_type": "HS01", "diameter": 0.4,
"fila_id": "", "color": None, "used_time_seconds": 0, "wear_percent": 0.0,
"stat": 0, "is_toolhead": False, "is_empty": True, "slot_number": None,
},
],
)
resp = logged_in_client.get(
reverse("bambu_run:printer_dashboard", kwargs={"pk": printer.pk})
)
html = resp.content.decode()
assert "Hotends" in html
assert "HS01" in html
assert "SN: N/A" not in html
assert "SN N/A" not in html
@pytest.mark.django_db
def test_dashboard_renders_hotends_card(logged_in_client):
printer = Printer.objects.create(name="Printer A", model="H2C", is_active=True)
PrinterMetrics.objects.create(device=printer, timestamp=timezone.now())
Hotend.objects.create(
printer=printer, serial_number="RACK-SN", raw_id=18, slot_number=3,
is_toolhead=False, nozzle_type="HS01", diameter=0.4,
used_time_seconds=3661, wear_percent=50, last_filament_profile_id="GFA01",
last_color="DE4343",
)
resp = logged_in_client.get(
reverse("bambu_run:printer_dashboard", kwargs={"pk": printer.pk})
)
html = resp.content.decode()
assert "Hotends" in html
assert "RACK-SN" in html
assert "Slot 3" in html

View File

@@ -0,0 +1,100 @@
from bambu_run.mqtt_client import PrinterState
def real_nozzle_payload():
"""Real captured device.nozzle payload from a live H2C with a Vortek rack
(1x AMS, 1x AMS 2 Pro, 1x AMS HT physically connected — unrelated here).
SN/used-time cross-checked against the user's Bambu Studio Hotends Info table."""
return {
"exist": 3997699,
"src_id": 17,
"tar_id": 17,
"state": 0,
"info": [
{"id": 21, "sn": "20D06A5B2918952", "type": "HS01", "diameter": 0.4,
"fila_id": "GFA01", "color_m": "FFFFFFFF", "p_t": 11472, "wear": 128.0, "stat": 0, "tm": 350},
{"id": 1, "sn": "N/A", "type": "HS01", "diameter": 0.4,
"fila_id": "", "color_m": "00000000", "p_t": 0, "wear": 0.0, "stat": 0, "tm": 0},
{"id": 16, "sn": "20D06A5B2919219", "type": "HS01", "diameter": 0.4,
"fila_id": "GFA01", "color_m": "A3D8E1FF", "p_t": 105386, "wear": 128.0, "stat": 0, "tm": 350},
{"id": 20, "sn": "20D06A590610257", "type": "HS01", "diameter": 0.4,
"fila_id": "GFG01", "color_m": "00000000", "p_t": 81506, "wear": 128.0, "stat": 0, "tm": 350},
{"id": 18, "sn": "20D06A591506263", "type": "HS01", "diameter": 0.4,
"fila_id": "GFA01", "color_m": "DE4343FF", "p_t": 30962, "wear": 128.0, "stat": 0, "tm": 350},
{"id": 0, "sn": "20D06A5C0426280", "type": "HS01", "diameter": 0.4,
"fila_id": "GFA00", "color_m": "FEC600FF", "p_t": 93490, "wear": 128.0, "stat": 0, "tm": 350},
{"id": 19, "sn": "20D06A5C0207881", "type": "HS01", "diameter": 0.4,
"fila_id": "GFA01", "color_m": "DE4343FF", "p_t": 1430, "wear": 128.0, "stat": 0, "tm": 350},
],
}
def make_data(nozzle_payload):
return {"print": {"gcode_state": "IDLE", "device": {"nozzle": nozzle_payload}}}
def test_snapshot_includes_one_hotend_per_nozzle_info_entry():
state = PrinterState.from_mqtt_data(make_data(real_nozzle_payload()))
snapshot = state.get_snapshot()
assert len(snapshot["hotends"]) == 7
def test_hotend_fields_extracted_correctly():
state = PrinterState.from_mqtt_data(make_data(real_nozzle_payload()))
snapshot = state.get_snapshot()
by_sn = {h["serial_number"]: h for h in snapshot["hotends"]}
h = by_sn["20D06A5B2919219"]
assert h["raw_id"] == 16
assert h["nozzle_type"] == "HS01"
assert h["diameter"] == 0.4
assert h["fila_id"] == "GFA01"
assert h["color"] == "A3D8E1" # alpha stripped
assert h["used_time_seconds"] == 105386
assert h["wear_percent"] == 100.0 # 128/128*100
assert h["is_empty"] is False
def test_id_zero_is_toolhead_and_resolves_slot_number():
state = PrinterState.from_mqtt_data(make_data(real_nozzle_payload()))
snapshot = state.get_snapshot()
by_sn = {h["serial_number"]: h for h in snapshot["hotends"]}
toolhead = by_sn["20D06A5C0426280"]
assert toolhead["raw_id"] == 0
assert toolhead["is_toolhead"] is True
assert toolhead["slot_number"] is None # true bay address hidden while id==0 sentinel
def test_rack_bay_ids_resolve_to_slot_numbers_one_through_six():
state = PrinterState.from_mqtt_data(make_data(real_nozzle_payload()))
snapshot = state.get_snapshot()
by_sn = {h["serial_number"]: h for h in snapshot["hotends"]}
assert by_sn["20D06A5B2919219"]["slot_number"] == 1 # raw_id 16
assert by_sn["20D06A591506263"]["slot_number"] == 3 # raw_id 18
assert by_sn["20D06A5C0207881"]["slot_number"] == 4 # raw_id 19
assert by_sn["20D06A590610257"]["slot_number"] == 5 # raw_id 20
assert by_sn["20D06A5B2918952"]["slot_number"] == 6 # raw_id 21
def test_empty_bay_with_na_serial_is_flagged_empty():
state = PrinterState.from_mqtt_data(make_data(real_nozzle_payload()))
snapshot = state.get_snapshot()
by_sn = {h["serial_number"]: h for h in snapshot["hotends"]}
empty = by_sn["N/A"]
assert empty["is_empty"] is True
assert empty["is_toolhead"] is False
def test_snapshot_hotends_empty_list_when_no_nozzle_payload():
state = PrinterState.from_mqtt_data({"print": {"gcode_state": "IDLE"}})
snapshot = state.get_snapshot()
assert snapshot["hotends"] == []

View File

@@ -0,0 +1,104 @@
import pytest
from bambu_run.management.commands.bambu_collector import Command, DeviceSession, resolve_printer_device
from bambu_run.models import Filament, FilamentSnapshot, FilamentUsage, PrinterMetrics
class FakeClient:
"""Stub in place of BambuPrinter — returns canned snapshots, no real MQTT."""
def __init__(self, snapshots):
self._snapshots = snapshots
self._index = 0
self._client = None
def get_snapshot(self):
snap = self._snapshots[min(self._index, len(self._snapshots) - 1)]
self._index += 1
return snap
def make_session(device_id, name, snapshots):
printer = resolve_printer_device(device_id, {"name": name, "dev_product_name": "H2C"})
return DeviceSession(device_id=device_id, client=FakeClient(snapshots), printer=printer)
def two_unit_tray0_snapshot():
"""Two AMS units (AMS unit_id=0, AMS HT unit_id=128) both report tray_id=0,
with different filament types loaded — these must not collide."""
return {
"gcode_state": "IDLE",
"ams_units": [
{"unit_id": "0", "ams_type": "AMS", "humidity": 30, "temp": 25.0},
{"unit_id": "128", "ams_type": "AMS HT", "humidity": 20, "temp": 60.0},
],
"filaments": [
{
"tray_id": 0, "type": "PLA", "sub_type": "PLA Basic", "color": "FF0000FF",
"tray_uuid": "UUID-UNIT0-TRAY0",
"remain_percent": 80, "ams_unit_id": 0, "ams_type": "AMS",
},
{
"tray_id": 0, "type": "PA-CF", "sub_type": "PA6-CF", "color": "00FF00FF",
"tray_uuid": "UUID-UNIT128-TRAY0",
"remain_percent": 50, "ams_unit_id": 128, "ams_type": "AMS HT",
},
],
}
@pytest.mark.django_db
def test_two_ams_units_with_same_tray_id_create_distinct_snapshots():
session = make_session("SERIAL-A", "Printer A", [two_unit_tray0_snapshot()])
cmd = Command()
cmd.verbose = False
cmd._collect_printer_data(session)
metric = PrinterMetrics.objects.get(device=session.printer)
snapshots = FilamentSnapshot.objects.filter(printer_metric=metric).order_by("ams_unit_id")
assert snapshots.count() == 2
ams_snap, ht_snap = snapshots
assert ams_snap.tray_id == 0
assert ams_snap.ams_unit_id == 0
assert ams_snap.ams_type == "AMS"
assert ams_snap.type == "PLA"
assert ht_snap.tray_id == 0
assert ht_snap.ams_unit_id == 128
assert ht_snap.ams_type == "AMS HT"
assert ht_snap.type == "PA-CF"
@pytest.mark.django_db
def test_filament_usage_matches_correct_unit_when_tray_ids_collide():
start_snapshot = two_unit_tray0_snapshot()
start_snapshot.update({"gcode_state": "RUNNING", "subtask_name": "job_1", "print_percent": 1, "tray_now": "0"})
end_snapshot = two_unit_tray0_snapshot()
end_snapshot["filaments"][0]["remain_percent"] = 70 # AMS unit 0 consumed
end_snapshot["filaments"][1]["remain_percent"] = 50 # AMS HT unit 128 untouched
end_snapshot.update({"gcode_state": "FINISH", "subtask_name": "job_1", "print_percent": 100})
session = make_session("SERIAL-A", "Printer A", [start_snapshot, end_snapshot])
cmd = Command()
cmd.verbose = False
cmd._collect_printer_data(session)
cmd._collect_printer_data(session)
usages = FilamentUsage.objects.filter(print_job__device=session.printer).order_by("ams_unit_id")
# Both units reported tray_id=0 with a tracked filament loaded throughout the
# job — usage is recorded per physical unit, not collapsed into one ambiguous row.
assert usages.count() == 2
ams_usage, ht_usage = usages
assert ams_usage.ams_unit_id == 0
assert ams_usage.starting_percent == 80
assert ams_usage.ending_percent == 70
assert ht_usage.ams_unit_id == 128
assert ht_usage.starting_percent == 50
assert ht_usage.ending_percent == 50

View File

@@ -0,0 +1,90 @@
import pytest
from bambu_run.management.commands.bambu_collector import (
Command,
DeviceSession,
resolve_printer_device,
)
from bambu_run.models import PrintJob, PrinterMetrics
class FakeClient:
"""Stub in place of BambuPrinter — returns canned snapshots, no real MQTT."""
def __init__(self, snapshots):
self._snapshots = snapshots
self._index = 0
self._client = None # cloud BambuClient handle used by cloud task sync
def get_snapshot(self):
snap = self._snapshots[min(self._index, len(self._snapshots) - 1)]
self._index += 1
return snap
def make_session(device_id, name, snapshots):
printer = resolve_printer_device(device_id, {"name": name, "dev_product_name": "H2C"})
return DeviceSession(device_id=device_id, client=FakeClient(snapshots), printer=printer)
@pytest.mark.django_db
def test_collects_metrics_against_the_correct_printer_per_session():
session_a = make_session("SERIAL-A", "Printer A", [{"nozzle_temp": 200, "gcode_state": "IDLE"}])
session_b = make_session("SERIAL-B", "Printer B", [{"nozzle_temp": 210, "gcode_state": "IDLE"}])
cmd = Command()
cmd.verbose = False
cmd._collect_printer_data(session_a)
cmd._collect_printer_data(session_b)
metric_a = PrinterMetrics.objects.get(device=session_a.printer)
metric_b = PrinterMetrics.objects.get(device=session_b.printer)
assert metric_a.nozzle_temp == 200
assert metric_b.nozzle_temp == 210
@pytest.mark.django_db
def test_print_job_tracking_is_isolated_per_session():
session_a = make_session(
"SERIAL-A",
"Printer A",
[
{"gcode_state": "RUNNING", "subtask_name": "job_A", "print_percent": 10},
{"gcode_state": "FINISH", "subtask_name": "job_A", "print_percent": 100},
],
)
session_b = make_session("SERIAL-B", "Printer B", [{"gcode_state": "IDLE"}])
cmd = Command()
cmd.verbose = False
cmd._collect_printer_data(session_a)
cmd._collect_printer_data(session_b)
cmd._collect_printer_data(session_a)
assert PrintJob.objects.filter(device=session_a.printer).count() == 1
job = PrintJob.objects.get(device=session_a.printer)
assert job.final_status == "FINISH"
assert session_a.current_print_job is None
assert PrintJob.objects.filter(device=session_b.printer).count() == 0
assert session_b.current_print_job is None
@pytest.mark.django_db
def test_one_session_error_does_not_affect_another_session():
session_a = make_session("SERIAL-A", "Printer A", [{"nozzle_temp": 200, "gcode_state": "IDLE"}])
session_b = make_session("SERIAL-B", "Printer B", [{"nozzle_temp": 210, "gcode_state": "IDLE"}])
class ExplodingClient:
def get_snapshot(self):
raise RuntimeError("MQTT connection lost")
session_a.client = ExplodingClient()
cmd = Command()
cmd.verbose = False
cmd._collect_printer_data(session_a)
cmd._collect_printer_data(session_b)
assert session_a.error_count == 1
assert PrinterMetrics.objects.filter(device=session_b.printer).exists()

View File

@@ -0,0 +1,78 @@
import pytest
from django.urls import reverse
from bambu_run.models import Printer
@pytest.fixture
def logged_in_client(client, django_user_model):
user = django_user_model.objects.create_user(username="tester", password="pw")
client.force_login(user)
return client
@pytest.mark.django_db
def test_dashboard_with_no_printers_shows_error(logged_in_client):
resp = logged_in_client.get(reverse("bambu_run:printer_dashboard"))
assert resp.status_code == 200
assert "error" in resp.context
@pytest.mark.django_db
def test_dashboard_defaults_to_first_active_printer(logged_in_client):
printer = Printer.objects.create(name="Only Printer", model="H2C", is_active=True)
resp = logged_in_client.get(reverse("bambu_run:printer_dashboard"))
assert resp.context["printer_device"].pk == printer.pk
assert list(resp.context["all_printers"]) == [printer]
# Switcher shows even with a single printer, as a hint that multi-printer exists.
assert resp.context["show_printer_switcher"] is True
@pytest.mark.django_db
def test_dashboard_hides_switcher_with_zero_printers(logged_in_client):
resp = logged_in_client.get(reverse("bambu_run:printer_dashboard"))
assert resp.context["show_printer_switcher"] is False
@pytest.mark.django_db
def test_dashboard_pk_route_shows_requested_printer(logged_in_client):
Printer.objects.create(name="Printer A", model="H2C", is_active=True)
printer_b = Printer.objects.create(name="Printer B", model="X1C", is_active=True)
resp = logged_in_client.get(
reverse("bambu_run:printer_dashboard", kwargs={"pk": printer_b.pk})
)
assert resp.context["printer_device"].pk == printer_b.pk
assert resp.context["device_name"] == "Printer B"
@pytest.mark.django_db
def test_dashboard_unknown_pk_returns_404(logged_in_client):
resp = logged_in_client.get(
reverse("bambu_run:printer_dashboard", kwargs={"pk": 99999})
)
assert resp.status_code == 404
@pytest.mark.django_db
def test_api_pk_route_returns_only_requested_printer_data(logged_in_client):
from bambu_run.models import PrinterMetrics
from django.utils import timezone
from decimal import Decimal
printer_a = Printer.objects.create(name="Printer A", model="H2C", is_active=True)
printer_b = Printer.objects.create(name="Printer B", model="X1C", is_active=True)
PrinterMetrics.objects.create(device=printer_a, timestamp=timezone.now(), nozzle_temp=Decimal("200"))
PrinterMetrics.objects.create(device=printer_b, timestamp=timezone.now(), nozzle_temp=Decimal("210"))
resp = logged_in_client.get(
reverse("bambu_run:printer_api", kwargs={"pk": printer_b.pk})
)
assert resp.status_code == 200
data = resp.json()
assert data["nozzle_temp"] == [210.0]

View File

@@ -0,0 +1,78 @@
import pytest
from bambu_run.management.commands.bambu_collector import resolve_printer_device
from bambu_run.models import Printer
@pytest.mark.django_db
def test_creates_new_printer_keyed_by_serial():
printer = resolve_printer_device(
"0309DA123456", {"name": "RNL-H2C", "dev_product_name": "H2C"}
)
assert printer.serial_number == "0309DA123456"
assert printer.name == "RNL-H2C"
assert printer.model == "H2C"
assert printer.is_active is True
@pytest.mark.django_db
def test_second_call_with_same_serial_does_not_create_duplicate():
first = resolve_printer_device("SERIAL-A", {"name": "Printer A", "dev_product_name": "H2C"})
second = resolve_printer_device("SERIAL-A", {"name": "Printer A", "dev_product_name": "H2C"})
assert first.pk == second.pk
assert Printer.objects.filter(serial_number="SERIAL-A").count() == 1
@pytest.mark.django_db
def test_two_different_serials_create_two_printers():
a = resolve_printer_device("SERIAL-A", {"name": "Printer A", "dev_product_name": "H2C"})
b = resolve_printer_device("SERIAL-B", {"name": "Printer B", "dev_product_name": "X1C"})
assert a.pk != b.pk
assert Printer.objects.count() == 2
@pytest.mark.django_db
def test_backfills_single_legacy_printer_with_null_serial():
legacy = Printer.objects.create(
name="Bambu Lab Printer", model="Bambu Lab", manufacturer="Bambu Lab", is_active=True
)
resolved = resolve_printer_device("SERIAL-A", {"name": "RNL-H2C", "dev_product_name": "H2C"})
legacy.refresh_from_db()
assert resolved.pk == legacy.pk
assert legacy.serial_number == "SERIAL-A"
assert Printer.objects.count() == 1
@pytest.mark.django_db
def test_does_not_guess_when_multiple_legacy_printers_exist():
Printer.objects.create(name="Legacy 1", model="Bambu Lab")
Printer.objects.create(name="Legacy 2", model="Bambu Lab")
resolved = resolve_printer_device("SERIAL-A", {"name": "RNL-H2C", "dev_product_name": "H2C"})
assert resolved.serial_number == "SERIAL-A"
assert Printer.objects.count() == 3
@pytest.mark.django_db
def test_falls_back_to_generic_defaults_without_device_info():
printer = resolve_printer_device("SERIAL-A", None)
assert printer.serial_number == "SERIAL-A"
assert printer.name == "Bambu Lab Printer"
assert printer.model == "Bambu Lab"
@pytest.mark.django_db
def test_updates_name_and_model_on_existing_printer_when_changed():
resolve_printer_device("SERIAL-A", {"name": "Old Name", "dev_product_name": "H2C"})
updated = resolve_printer_device("SERIAL-A", {"name": "New Name", "dev_product_name": "H2C"})
assert updated.name == "New Name"
assert Printer.objects.filter(serial_number="SERIAL-A").count() == 1

View File

@@ -0,0 +1,43 @@
import pytest
from bambu_run.mqtt_client import PrinterState
from bambu_run.management.commands.bambu_collector import Command, DeviceSession, resolve_printer_device
from bambu_run.models import PrinterMetrics
def test_snapshot_includes_raw_device_payload_for_future_vortek_modeling():
raw_device = {
"extruder": {"info": [{"id": 0, "temp": 12058720}, {"id": 1, "temp": 11534560}]},
"nozzle": {"info": [{"id": 0, "diameter": 0.4}]},
}
data = {"print": {"device": raw_device, "gcode_state": "IDLE"}}
state = PrinterState.from_mqtt_data(data)
snapshot = state.get_snapshot()
assert snapshot["vortek_raw"] == raw_device
def test_snapshot_vortek_raw_defaults_to_empty_dict_when_no_device_payload():
state = PrinterState.from_mqtt_data({"print": {"gcode_state": "IDLE"}})
snapshot = state.get_snapshot()
assert snapshot["vortek_raw"] == {}
@pytest.mark.django_db
def test_collector_persists_vortek_raw_onto_printer_metrics():
printer = resolve_printer_device("SERIAL-A", {"name": "H2C", "dev_product_name": "H2C"})
class FakeClient:
def get_snapshot(self):
return {"gcode_state": "IDLE", "vortek_raw": {"extruder": {"info": []}}}
session = DeviceSession(device_id="SERIAL-A", client=FakeClient(), printer=printer)
cmd = Command()
cmd.verbose = False
cmd._collect_printer_data(session)
metric = PrinterMetrics.objects.get(device=printer)
assert metric.vortek_raw == {"extruder": {"info": []}}

5
tests/urls.py Normal file
View File

@@ -0,0 +1,5 @@
from django.urls import include, path
urlpatterns = [
path("", include("bambu_run.urls")),
]