13 Commits

Author SHA1 Message Date
RNL
58ebdf518e Always show device dropdown and add bambu_diagnose for multi-printer troubleshooting. 2026-06-20 14:48:45 +10:00
RNL
e7bc3291b6 Initial implementation of multi-printer support. 2026-06-18 22:50:39 +10:00
github-actions[bot]
34293ce81a chore: bump version to 0.1.7 [skip ci] 2026-06-16 00:54:32 +00:00
Ben Roberts
af845e3490 Add CSRF_TRUSTED_ORIGINS env var support (#9) 2026-06-16 10:54:17 +10:00
github-actions[bot]
6aab42b03e chore: bump version to 0.1.6 [skip ci] 2026-06-13 13:26:37 +00:00
RNLgit
61d1f630d3 Merge branch 'main' of github.com:RunLit/Bambu-Run 2026-06-13 23:25:33 +10:00
RNLgit
c032745eb5 python 3.9 shall also work for old os 2026-06-13 23:25:11 +10:00
github-actions[bot]
2af3509010 chore: bump version to 0.1.5 [skip ci] 2026-05-07 05:05:19 +00:00
RNL
dd57a963ac Add H2C dual-nozzle and multi-AMS-type support
Schema (migration 0004):
- PrinterMetrics: nozzle_temp_left, nozzle_target_temp_left,
  nozzle_diameter_left, nozzle_type_left (all nullable)
- Filament: ams_unit_id (nullable int), ams_type (AMS/AMS 2 Pro/AMS HT)
- AMS_INFO_TO_TYPE map and AMS_TYPE_CHOICES on models

Parser (mqtt_client.py):
- Decode bit-packed temps from device.extruder.info[] for left/right nozzle
- Emit per-nozzle fields in get_snapshot(); legacy keys mirror right side
- AMS unit type from info code per unit dict

Collector (bambu_collector.py):
- Write left-nozzle fields to PrinterMetrics
- Set ams_unit_id + ams_type on Filament records
- Fix: poll MQTTClient.connected before pushall (not BambuPrinter._connected)
- Add 5s post-pushall wait in --once mode so response arrives before collect

Views: API and dashboard include left-nozzle series; is_dual_nozzle flag
Templates: dual-nozzle cards + chart; AMS-type badge + filter on filament list
Charts: left nozzle temp chart with conditional render
Forms: fix tray_id max=3 → max=15; add ams_unit_id, ams_type fields
2026-05-07 14:51:31 +10:00
github-actions[bot]
6fadccb527 chore: bump version to 0.1.4 [skip ci] 2026-03-29 12:16:07 +00:00
RunLit
fa90ef11b6 feat: MCP server, Bambu Cloud task sync & display name fix (#7)
* added mcp initial trail files

* timestamp use your local django timezone

* added bambu cloud task sync with correct endpoint other than py cloud api

* back fill and relink print name using cloud if there is

* use correct bump-version
2026-03-29 23:15:59 +11:00
github-actions[bot]
9a91b14593 chore: bump version to 0.1.3 [skip ci] 2026-03-29 05:10:50 +00:00
RNL
0b07221827 bump version in workflows 2026-03-29 16:09:49 +11:00
26 changed files with 1384 additions and 173 deletions

View File

@@ -20,3 +20,4 @@ BAMBU_PASSWORD=your_password
# DEBUG=True
# DJANGO_SECRET_KEY=change-me-to-a-random-string
# ALLOWED_HOSTS=localhost,127.0.0.1
# CSRF_TRUSTED_ORIGINS="https://bambu-run.example.com,http://bambu-run.example.com"

56
.github/workflows/bump-version.yml vendored Normal file
View File

@@ -0,0 +1,56 @@
name: Bump Patch Version on Merge to Main
on:
push:
branches:
- main
jobs:
bump-version:
runs-on: ubuntu-latest
# Skip if this push was itself the version bump commit (prevents infinite loop)
if: "!contains(github.event.head_commit.message, '[skip ci]')"
permissions:
contents: write
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
# Need full git history and ability to push
fetch-depth: 0
token: ${{ secrets.GITHUB_TOKEN }}
- name: Bump patch version in pyproject.toml
id: bump
run: |
# Read current version
CURRENT=$(grep '^version = ' pyproject.toml | sed 's/version = "\(.*\)"/\1/')
echo "Current version: $CURRENT"
# Split into parts and increment patch
MAJOR=$(echo $CURRENT | cut -d. -f1)
MINOR=$(echo $CURRENT | cut -d. -f2)
PATCH=$(echo $CURRENT | cut -d. -f3)
NEW_PATCH=$((PATCH + 1))
NEW_VERSION="$MAJOR.$MINOR.$NEW_PATCH"
echo "New version: $NEW_VERSION"
# Write back to pyproject.toml
sed -i "s/^version = \"$CURRENT\"/version = \"$NEW_VERSION\"/" pyproject.toml
# Export for later steps
echo "new_version=$NEW_VERSION" >> $GITHUB_OUTPUT
- name: Commit and push bumped version
run: |
git config user.name "github-actions[bot]"
git config user.email "github-actions[bot]@users.noreply.github.com"
git add pyproject.toml
git commit -m "chore: bump version to ${{ steps.bump.outputs.new_version }} [skip ci]"
git push
- name: Tag the release
run: |
git tag "v${{ steps.bump.outputs.new_version }}"
git push origin "v${{ steps.bump.outputs.new_version }}"

78
bambu_run/diagnostics.py Normal file
View File

@@ -0,0 +1,78 @@
"""
Pure helpers for the `bambu_diagnose` management command.
Kept separate from the command itself (and free of Django/network imports)
so the report-building and redaction logic can be unit-tested without
talking to the real Bambu Lab cloud or MQTT broker.
"""
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
# Keys whose values are always replaced outright, regardless of nesting depth.
_SECRET_KEY_SUBSTRINGS = ("password", "token", "secret", "access_code", "authorization")
# Keys that identify a specific physical device/spool/account — not secret,
# but identifying, so they're partially masked by default before anything
# gets pasted into a public GitHub issue.
_IDENTIFIER_KEYS = {"dev_id", "device_id", "serial_number", "tray_uuid", "tag_uid", "uid"}
def _mask_identifier(value: Any) -> Any:
if not isinstance(value, str) or len(value) <= 8:
return "***"
return f"{value[:4]}...{value[-4:]}"
def redact_diagnostics(data: Any, redact: bool = True) -> Any:
"""Recursively redact secrets and mask identifiers in a diagnostics payload.
`redact=False` returns the data unchanged — only for the reporter's own
local debugging, never for anything posted publicly.
"""
if not redact:
return data
return _redact(data)
def _redact(obj: Any) -> Any:
if isinstance(obj, dict):
result = {}
for key, value in obj.items():
lowered = key.lower()
if any(secret in lowered for secret in _SECRET_KEY_SUBSTRINGS):
result[key] = "***REDACTED***"
elif lowered in _IDENTIFIER_KEYS:
result[key] = _mask_identifier(value)
else:
result[key] = _redact(value)
return result
if isinstance(obj, list):
return [_redact(item) for item in obj]
return obj
def build_diagnostics_report(
devices: List[Dict[str, Any]],
raw_payloads: Dict[str, Optional[Dict[str, Any]]],
) -> Dict[str, Any]:
"""Assemble the (pre-redaction) diagnostics report from discovered devices
and whatever raw MQTT payload was captured for each during the listen window.
"""
device_entries = []
for device in devices:
dev_id = device.get("dev_id")
payload = raw_payloads.get(dev_id)
entry = {
"device_info": device,
"raw_mqtt_payload": payload,
}
if payload is None:
entry["note"] = "No MQTT data received within the listen window."
device_entries.append(entry)
return {
"generated_at": datetime.now(timezone.utc).isoformat(),
"device_count": len(devices),
"devices": device_entries,
}

View File

@@ -55,7 +55,7 @@ class FilamentForm(forms.ModelForm):
'filament_type', 'type', 'sub_type', 'brand', 'color', 'color_hex', 'is_transparent',
'diameter', 'initial_weight_grams',
'remaining_percent', 'remaining_weight_grams',
'is_loaded_in_ams', 'current_tray_id',
'is_loaded_in_ams', 'current_tray_id', 'ams_unit_id', 'ams_type',
'purchase_date', 'purchase_price', 'supplier', 'notes'
]
widgets = {
@@ -87,7 +87,15 @@ class FilamentForm(forms.ModelForm):
'remaining_weight_grams': forms.NumberInput(attrs={'class': 'form-control', 'readonly': 'readonly'}),
'is_transparent': forms.CheckboxInput(attrs={'class': 'form-check-input', 'id': 'id_is_transparent'}),
'is_loaded_in_ams': forms.CheckboxInput(attrs={'class': 'form-check-input'}),
'current_tray_id': forms.NumberInput(attrs={'class': 'form-control', 'min': '0', 'max': '3'}),
'current_tray_id': forms.NumberInput(attrs={
'class': 'form-control', 'min': '0', 'max': '15',
'placeholder': '03 for AMS / AMS 2 Pro, 0 for AMS HT',
}),
'ams_unit_id': forms.NumberInput(attrs={
'class': 'form-control', 'min': '0', 'max': '255',
'placeholder': 'AMS unit id (0,1,… or 128 for AMS HT)',
}),
'ams_type': forms.Select(attrs={'class': 'form-select'}),
'purchase_date': forms.DateInput(attrs={'class': 'form-control', 'type': 'date'}),
'purchase_price': forms.NumberInput(attrs={'class': 'form-control', 'step': '0.01'}),
'supplier': forms.TextInput(attrs={'class': 'form-control'}),
@@ -106,6 +114,8 @@ class FilamentForm(forms.ModelForm):
self.fields['type'].required = False
self.fields['sub_type'].required = False
self.fields['brand'].required = False
self.fields['ams_unit_id'].required = False
self.fields['ams_type'].required = False
self._populate_color_choices()

View File

@@ -13,8 +13,9 @@ import logging
import os
import ssl
import time
from dataclasses import dataclass, field
from decimal import Decimal
from typing import Optional
from typing import Any, Dict, Optional
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction
@@ -26,6 +27,56 @@ from bambu_run.models import Printer, PrinterMetrics
logger = logging.getLogger("bambu_run.collector")
def resolve_printer_device(device_id: str, device_info: Optional[dict] = None) -> Printer:
"""Find-or-create the Printer row for a Bambu cloud device, keyed by serial number.
`device_info` is one entry from BambuClient.get_devices() (keys: name,
dev_product_name, dev_id, ...). Falls back to generic defaults when unavailable
(e.g. local-only connections that never call get_devices()).
"""
device_info = device_info or {}
name = device_info.get("name") or "Bambu Lab Printer"
model = device_info.get("dev_product_name") or "Bambu Lab"
printer = Printer.objects.filter(serial_number=device_id).first()
if printer is None:
# Upgrade path: a pre-multi-printer deployment has exactly one Printer row
# with no serial number yet. Backfill it instead of creating a duplicate.
# If there's more than one such row, we can't tell which one this device
# used to be, so don't guess — create a fresh row instead.
legacy_candidates = list(Printer.objects.filter(serial_number__isnull=True)[:2])
if len(legacy_candidates) == 1:
printer = legacy_candidates[0]
printer.serial_number = device_id
if printer is None:
printer = Printer(serial_number=device_id)
printer.name = name
printer.model = model
printer.manufacturer = "Bambu Lab"
printer.is_active = True
printer.save()
return printer
@dataclass
class DeviceSession:
"""Per-printer mutable state for one bound device in a multi-printer collector run."""
device_id: str
client: Any # BambuPrinter
printer: Printer
current_print_job: Optional[Any] = None
last_gcode_state: Optional[str] = None
last_subtask_name: Optional[str] = None
trays_used: set = field(default_factory=set)
error_count: int = 0
success_count: int = 0
mqtt_connect_errors: int = 0
class Command(BaseCommand):
"""
MQTT Poll -> PrinterMetrics -> FilamentSnapshot -> Auto-Match -> Update Filament
@@ -51,18 +102,11 @@ class Command(BaseCommand):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.printer_client = None
self.printer_device = None
self.sessions: Dict[str, DeviceSession] = {}
self._token: Optional[str] = None
self.verbose = False
self.disable_ssl_verify = False
self.error_count = 0
self.success_count = 0
self.mqtt_connect_errors = 0
self.start_time = None
self.current_print_job = None
self.last_gcode_state = None
self.last_subtask_name = None
self.trays_used = set()
def handle(self, *args, **options):
self.verbose = options["verbose"]
@@ -100,18 +144,22 @@ class Command(BaseCommand):
self._configure_logging()
try:
self._initialize_printer()
self._initialize_printers()
except Exception as e:
raise CommandError(f"Initialization failed: {e}")
self.start_time = timezone.now()
logger.info(f"Bambu Run data collector started for printer: {self.printer_device.name}")
printer_names = ", ".join(s.printer.name for s in self.sessions.values())
logger.info(f"Bambu Run data collector started for {len(self.sessions)} printer(s): {printer_names}")
logger.info(f"Collection interval: {interval} seconds")
logger.info(f"Mode: {'Single run' if run_once else 'Continuous'}")
try:
if run_once:
self._collect_printer_data()
import time as _time
_time.sleep(5)
for session in self.sessions.values():
self._collect_printer_data(session)
logger.info("Single collection completed successfully")
else:
self._run_continuous_loop(interval)
@@ -122,6 +170,24 @@ class Command(BaseCommand):
logger.exception(f"Fatal error in main loop: {e}")
raise CommandError(f"Runner failed: {e}")
def _request_full_status_when_ready(self, client, timeout: float = 20.0) -> None:
"""Send pushall once the MQTT broker connection is confirmed.
BambuPrinter._connected is set True immediately after connect(blocking=False),
before the broker handshake. Poll MQTTClient.connected (set in _on_connect)
instead, so publish() won't raise "Not connected to broker".
"""
import time as _time
deadline = _time.time() + timeout
while _time.time() < deadline:
mqtt_client = getattr(client, "_mqtt", None)
if mqtt_client is not None and getattr(mqtt_client, "connected", False):
client._mqtt.request_full_status()
logger.info("Sent MQTT pushall request")
return
_time.sleep(0.5)
logger.warning("MQTT broker connection not confirmed within %.1fs; skipping pushall", timeout)
def _configure_logging(self):
log_level = logging.DEBUG if self.verbose else logging.INFO
logger.setLevel(log_level)
@@ -135,7 +201,9 @@ class Command(BaseCommand):
handler.setFormatter(formatter)
logger.addHandler(handler)
def _initialize_printer(self):
def _initialize_printers(self):
"""Authenticate once, discover every device bound to the account, and open
one BambuPrinter (own MQTT thread) per device — all in this single process."""
from bambu_run.mqtt_client import BambuPrinter
bambu_username = os.environ.get("BAMBU_USERNAME")
@@ -149,25 +217,12 @@ class Command(BaseCommand):
"environment variables must be set"
)
logger.info("Connecting to Bambu Lab printer...")
logger.info("Authenticating with Bambu Lab cloud...")
try:
if bambu_token:
logger.info("Using saved BAMBU_TOKEN for authentication")
self.printer_client = BambuPrinter(
token=bambu_token, device_id=bambu_device_id
)
else:
logger.info("Authenticating with username/password")
self.printer_client = BambuPrinter(
username=bambu_username,
password=bambu_password,
device_id=bambu_device_id,
)
logger.info("Initiating MQTT connection...")
self.printer_client.connect(blocking=False)
logger.info("MQTT connection initiated (non-blocking)")
auth = BambuPrinter(
username=bambu_username, password=bambu_password, token=bambu_token,
)
self._token = auth._ensure_token()
except Exception as e:
if "CERTIFICATE_VERIFY_FAILED" in str(e) or "SSL" in str(e):
error_msg = (
@@ -178,56 +233,62 @@ class Command(BaseCommand):
"3. pip install --upgrade certifi\n"
)
raise CommandError(error_msg)
raise CommandError(f"Failed to initialize printer client: {e}")
raise CommandError(f"Failed to authenticate: {e}")
self.printer_device = self._ensure_printer_device_exists()
logger.info(f"Initialized for printer device: {self.printer_device}")
def _ensure_printer_device_exists(self) -> Printer:
try:
snapshot = self.printer_client.get_snapshot()
if snapshot:
device, created = Printer.objects.update_or_create(
model="Bambu Lab",
defaults={
"name": "Bambu Lab Printer",
"manufacturer": "Bambu Lab",
"is_active": True,
},
)
action = "Created" if created else "Updated"
logger.info(f"{action} printer device record: {device}")
return device
else:
logger.warning("Snapshot returned None - MQTT not connected yet")
device = Printer.objects.filter(is_active=True).first()
if device:
logger.info(f"Using existing device record: {device}")
return device
else:
device = Printer.objects.create(
name="Bambu Lab Printer",
model="Bambu Lab",
manufacturer="Bambu Lab",
is_active=True,
)
logger.info(f"Created placeholder device: {device}")
return device
except Exception as e:
logger.error(f"Error during device initialization: {e}")
device_infos = self._discover_devices(bambu_device_id)
for device_id, device_info in device_infos.items():
try:
device = Printer.objects.filter(is_active=True).first()
if device:
logger.warning(f"Using existing device record from DB: {device}")
return device
else:
raise CommandError(
"No printer device found in database and initialization failed."
)
except Printer.DoesNotExist:
raise CommandError("Failed to create or retrieve printer device.")
self._add_session(device_id, device_info)
except Exception as e:
logger.error(f"Failed to initialize printer {device_id}: {e}")
if not self.sessions:
raise CommandError("No printer sessions could be initialized")
def _discover_devices(self, explicit_device_id: Optional[str]) -> Dict[str, dict]:
"""Return {device_id: device_info} for every printer to monitor.
device_info comes from BambuClient.get_devices() (name, dev_product_name,
etc.) — empty dict when explicitly pinned to one device via BAMBU_DEVICE_ID
and the cloud listing can't be reached.
"""
from bambu_run.mqtt_client import BambuClient
try:
cloud = BambuClient(token=self._token)
devices = cloud.get_devices()
except Exception as e:
if explicit_device_id:
logger.warning(f"Could not list account devices ({e}); using BAMBU_DEVICE_ID only")
return {explicit_device_id: {}}
raise
device_infos = {d.get("dev_id"): d for d in devices if d.get("dev_id")}
if explicit_device_id:
return {explicit_device_id: device_infos.get(explicit_device_id, {})}
if not device_infos:
raise CommandError("No devices found on this account")
return device_infos
def _add_session(self, device_id: str, device_info: dict) -> "DeviceSession":
from bambu_run.mqtt_client import BambuPrinter
logger.info(f"Connecting to printer {device_id} ({device_info.get('name', 'unknown')})...")
client = BambuPrinter(token=self._token, device_id=device_id)
client.connect(blocking=False)
try:
self._request_full_status_when_ready(client)
except Exception as e:
logger.warning("pushall request skipped (non-fatal): %s", e)
printer = resolve_printer_device(device_id, device_info)
session = DeviceSession(device_id=device_id, client=client, printer=printer)
self.sessions[device_id] = session
logger.info(f"Initialized session for printer: {printer}")
return session
def _run_continuous_loop(self, interval: int):
iteration = 0
@@ -238,7 +299,8 @@ class Command(BaseCommand):
if self.verbose:
logger.debug(f"=== Iteration {iteration} ===")
self._collect_printer_data()
for session in list(self.sessions.values()):
self._collect_printer_data(session)
elapsed = time.time() - loop_start
sleep_time = max(0, interval - elapsed)
@@ -248,9 +310,28 @@ class Command(BaseCommand):
if iteration % 100 == 0:
self._print_statistics()
self._refresh_devices()
time.sleep(sleep_time)
def _refresh_devices(self):
"""Pick up printers added to the account without restarting the process."""
if os.environ.get("BAMBU_DEVICE_ID"):
return # pinned to a single explicit device — nothing to discover
try:
device_infos = self._discover_devices(None)
except Exception as e:
logger.warning(f"Device refresh skipped (non-fatal): {e}")
return
for device_id, device_info in device_infos.items():
if device_id not in self.sessions:
logger.info(f"New printer detected on account: {device_id}")
try:
self._add_session(device_id, device_info)
except Exception as e:
logger.error(f"Failed to initialize newly-detected printer {device_id}: {e}")
def _convert_mqtt_color(self, mqtt_color):
if not mqtt_color:
return None
@@ -377,6 +458,8 @@ class Command(BaseCommand):
created_by='Auto Detection',
is_loaded_in_ams=True,
current_tray_id=tray_data.get('tray_id'),
ams_unit_id=tray_data.get('ams_unit_id'),
ams_type=tray_data.get('ams_type', '') or '',
last_loaded_date=timezone.now(),
)
@@ -390,9 +473,13 @@ class Command(BaseCommand):
return filament
def _update_filament_status(self, filament, tray_id, remain_percent):
def _update_filament_status(self, filament, tray_id, remain_percent, tray_data=None):
from bambu_run.models import Filament
tray_data = tray_data or {}
ams_unit_id = tray_data.get('ams_unit_id')
ams_type_label = tray_data.get('ams_type', '') or ''
if filament.remaining_percent != remain_percent:
filament.remaining_percent = remain_percent
filament.update_remaining_weight()
@@ -400,10 +487,19 @@ class Command(BaseCommand):
if self.verbose:
logger.debug(f"Updated filament {filament}: {remain_percent}%")
if not filament.is_loaded_in_ams or filament.current_tray_id != tray_id:
previous_filament = Filament.objects.filter(
location_changed = (
not filament.is_loaded_in_ams
or filament.current_tray_id != tray_id
or (ams_unit_id is not None and filament.ams_unit_id != ams_unit_id)
)
if location_changed:
# Unload anything previously occupying THIS exact (unit, tray) slot.
unload_qs = Filament.objects.filter(
is_loaded_in_ams=True, current_tray_id=tray_id
).exclude(id=filament.id).first()
).exclude(id=filament.id)
if ams_unit_id is not None:
unload_qs = unload_qs.filter(ams_unit_id=ams_unit_id)
previous_filament = unload_qs.first()
if previous_filament:
previous_filament.is_loaded_in_ams = False
@@ -411,14 +507,21 @@ class Command(BaseCommand):
previous_filament.save()
logger.info(
f"Auto-unloaded {previous_filament} from Tray {tray_id} "
f"(replaced by {filament.brand} {filament.type} - {filament.color})"
f"(unit {ams_unit_id}; replaced by {filament.brand} {filament.type} - {filament.color})"
)
filament.is_loaded_in_ams = True
filament.current_tray_id = tray_id
if ams_unit_id is not None:
filament.ams_unit_id = ams_unit_id
if ams_type_label:
filament.ams_type = ams_type_label
filament.last_loaded_date = timezone.now()
if self.verbose:
logger.debug(f"Updated filament location: Tray {tray_id}")
logger.debug(f"Updated filament location: unit={ams_unit_id} tray={tray_id}")
elif ams_type_label and filament.ams_type != ams_type_label:
# Same slot but ams_type was previously unknown — fill it in.
filament.ams_type = ams_type_label
filament.save()
@@ -439,10 +542,13 @@ class Command(BaseCommand):
if filament:
remain_percent = tray_data.get('remain_percent')
if remain_percent is not None:
self._update_filament_status(filament, tray_id, remain_percent)
self._update_filament_status(filament, tray_id, remain_percent, tray_data)
unit_id = str(int(tray_id) // 4) if tray_id.isdigit() else None
unit_data = ams_units.get(unit_id, {})
# Locate the AMS unit this tray belongs to. Use the unit_id supplied
# by the snapshot directly (matches MQTT ams[i].id, including 128 for AMS HT)
# — the legacy `tray_id // 4` math breaks for AMS HT.
unit_id_int = tray_data.get('ams_unit_id')
unit_data = ams_units.get(str(unit_id_int)) if unit_id_int is not None else {}
FilamentSnapshot.objects.create(
printer_metric=printer_metric,
@@ -463,19 +569,19 @@ class Command(BaseCommand):
match_method=match_method
)
def _track_print_job(self, metric, snapshot):
from bambu_run.models import PrintJob, FilamentUsage
def _track_print_job(self, session, metric, snapshot):
from bambu_run.models import PrintJob
gcode_state = snapshot.get('gcode_state')
subtask_name = snapshot.get('subtask_name')
if self._is_print_starting(gcode_state, subtask_name):
if self.current_print_job:
self._finalize_print_job(metric, snapshot)
if self._is_print_starting(session, gcode_state, subtask_name):
if session.current_print_job:
self._finalize_print_job(session, metric, snapshot)
raw_task_id = snapshot.get('task_id')
self.current_print_job = PrintJob.objects.create(
device=self.printer_device,
session.current_print_job = PrintJob.objects.create(
device=session.printer,
project_name=subtask_name,
gcode_file=snapshot.get('gcode_file'),
start_time=metric.timestamp,
@@ -484,57 +590,58 @@ class Command(BaseCommand):
completion_percent=snapshot.get('print_percent', 0),
cloud_task_id_raw=int(raw_task_id) if raw_task_id else None,
)
self.trays_used = set()
logger.info(f"Print job started: {subtask_name}")
session.trays_used = set()
logger.info(f"[{session.device_id}] Print job started: {subtask_name}")
if self.current_print_job:
if session.current_print_job:
tray_now = snapshot.get('tray_now', '')
if tray_now not in (None, '', '255'):
try:
tray_id = int(tray_now)
if 0 <= tray_id <= 15:
self.trays_used.add(tray_id)
session.trays_used.add(tray_id)
except (ValueError, TypeError):
pass
if self._is_print_ending(gcode_state) and self.current_print_job:
self._finalize_print_job(metric, snapshot)
if self._is_print_ending(session, gcode_state) and session.current_print_job:
self._finalize_print_job(session, metric, snapshot)
self.last_gcode_state = gcode_state
self.last_subtask_name = subtask_name
session.last_gcode_state = gcode_state
session.last_subtask_name = subtask_name
def _is_print_starting(self, gcode_state, subtask_name):
def _is_print_starting(self, session, gcode_state, subtask_name):
is_printing = gcode_state not in ['FINISH', 'IDLE', 'FAILED', None, '']
has_new_job = subtask_name and subtask_name != self.last_subtask_name
has_new_job = subtask_name and subtask_name != session.last_subtask_name
return is_printing and has_new_job
def _is_print_ending(self, gcode_state):
def _is_print_ending(self, session, gcode_state):
ending_states = ['FINISH', 'FAILED']
return gcode_state in ending_states and self.last_gcode_state not in ending_states
return gcode_state in ending_states and session.last_gcode_state not in ending_states
def _finalize_print_job(self, metric, snapshot):
def _finalize_print_job(self, session, metric, snapshot):
from bambu_run.models import FilamentUsage
self.current_print_job.end_time = metric.timestamp
self.current_print_job.end_metric = metric
self.current_print_job.final_status = snapshot.get('gcode_state')
self.current_print_job.completion_percent = snapshot.get('print_percent', 0)
self.current_print_job.calculate_duration()
self.current_print_job.save()
job = session.current_print_job
job.end_time = metric.timestamp
job.end_metric = metric
job.final_status = snapshot.get('gcode_state')
job.completion_percent = snapshot.get('print_percent', 0)
job.calculate_duration()
job.save()
try:
from bambu_run.bambu_cloud import fetch_and_upsert_task
fetch_and_upsert_task(self.printer_client._client, self.current_print_job)
fetch_and_upsert_task(session.client._client, job)
except Exception as e:
logger.warning(f"Cloud task sync skipped (non-fatal): {e}")
start_metric = self.current_print_job.start_metric
start_metric = job.start_metric
if not start_metric:
logger.warning(f"No start_metric for job {self.current_print_job.id}, skipping filament usage")
elif not self.trays_used:
logger.warning(f"No trays tracked for job {self.current_print_job.project_name}, skipping filament usage")
logger.warning(f"No start_metric for job {job.id}, skipping filament usage")
elif not session.trays_used:
logger.warning(f"No trays tracked for job {job.project_name}, skipping filament usage")
else:
for tray_id in self.trays_used:
for tray_id in session.trays_used:
start_snap = start_metric.filament_snapshots.filter(
tray_id=tray_id, filament__isnull=False
).first()
@@ -546,12 +653,12 @@ class Command(BaseCommand):
).first()
usage = FilamentUsage.objects.create(
print_job=self.current_print_job,
print_job=job,
filament=start_snap.filament,
tray_id=tray_id,
starting_percent=start_snap.remain_percent or 100,
ending_percent=end_snap.remain_percent if end_snap else None,
is_primary=(len(self.trays_used) == 1),
is_primary=(len(session.trays_used) == 1),
)
usage.calculate_consumed()
usage.save()
@@ -563,30 +670,30 @@ class Command(BaseCommand):
)
logger.info(
f"Print job finished: {self.current_print_job.project_name} "
f"({self.current_print_job.final_status}) - Duration: {self.current_print_job.duration_minutes} min, "
f"Trays used: {sorted(self.trays_used) if self.trays_used else 'none tracked'}"
f"[{session.device_id}] Print job finished: {job.project_name} "
f"({job.final_status}) - Duration: {job.duration_minutes} min, "
f"Trays used: {sorted(session.trays_used) if session.trays_used else 'none tracked'}"
)
self.current_print_job = None
self.trays_used = set()
session.current_print_job = None
session.trays_used = set()
def _collect_printer_data(self):
def _collect_printer_data(self, session: "DeviceSession"):
try:
snapshot = self.printer_client.get_snapshot()
snapshot = session.client.get_snapshot()
if snapshot is None:
self.mqtt_connect_errors += 1
if self.mqtt_connect_errors <= 5 or self.verbose:
session.mqtt_connect_errors += 1
if session.mqtt_connect_errors <= 5 or self.verbose:
logger.warning(
f"MQTT not connected yet or no data available "
f"(attempt {self.mqtt_connect_errors})"
f"[{session.device_id}] MQTT not connected yet or no data available "
f"(attempt {session.mqtt_connect_errors})"
)
return
with transaction.atomic():
metric = PrinterMetrics.objects.create(
device=self.printer_device,
device=session.printer,
timestamp=timezone.now(),
nozzle_temp=self._to_decimal(snapshot.get("nozzle_temp")),
nozzle_target_temp=self._to_decimal(snapshot.get("nozzle_target_temp")),
@@ -595,6 +702,10 @@ class Command(BaseCommand):
chamber_temp=self._to_decimal(snapshot.get("chamber_temp")),
nozzle_diameter=self._to_decimal(snapshot.get("nozzle_diameter")),
nozzle_type=snapshot.get("nozzle_type"),
nozzle_temp_left=self._to_decimal(snapshot.get("nozzle_temp_left")),
nozzle_target_temp_left=self._to_decimal(snapshot.get("nozzle_target_temp_left")),
nozzle_diameter_left=self._to_decimal(snapshot.get("nozzle_diameter_left")),
nozzle_type_left=snapshot.get("nozzle_type_left"),
gcode_state=snapshot.get("gcode_state"),
print_type=snapshot.get("print_type"),
print_percent=snapshot.get("print_percent"),
@@ -634,27 +745,28 @@ class Command(BaseCommand):
ams_units=snapshot.get("ams_units", []),
external_spool=snapshot.get("external_spool", {}),
lights_report=snapshot.get("lights_report", []),
vortek_raw=snapshot.get("vortek_raw", {}),
)
filaments_data = snapshot.get('filaments', [])
if filaments_data:
self._create_filament_snapshots(metric, filaments_data, snapshot)
self._track_print_job(metric, snapshot)
self._track_print_job(session, metric, snapshot)
self.success_count += 1
session.success_count += 1
if self.verbose:
logger.debug(
f"Printer Metrics: Nozzle={snapshot.get('nozzle_temp')}C, "
f"[{session.device_id}] Printer Metrics: Nozzle={snapshot.get('nozzle_temp')}C, "
f"Bed={snapshot.get('bed_temp')}C, "
f"Progress={snapshot.get('print_percent')}%, "
f"State={snapshot.get('gcode_state')}"
)
except Exception as e:
self.error_count += 1
logger.error(f"Error collecting printer data (total errors: {self.error_count}): {e}")
session.error_count += 1
logger.error(f"[{session.device_id}] Error collecting printer data (total errors: {session.error_count}): {e}")
if self.verbose:
logger.exception("Detailed traceback:")
@@ -669,16 +781,20 @@ class Command(BaseCommand):
def _print_statistics(self):
if self.start_time:
runtime = timezone.now() - self.start_time
total_collections = self.success_count + self.error_count
success_count = sum(s.success_count for s in self.sessions.values())
error_count = sum(s.error_count for s in self.sessions.values())
mqtt_connect_errors = sum(s.mqtt_connect_errors for s in self.sessions.values())
total_collections = success_count + error_count
success_rate = (
(self.success_count / total_collections * 100)
(success_count / total_collections * 100)
if total_collections > 0
else 0
)
logger.info("=== Statistics ===")
logger.info(f"Runtime: {runtime}")
logger.info(f"Successful collections: {self.success_count}")
logger.info(f"Failed collections: {self.error_count}")
logger.info(f"MQTT connection warnings: {self.mqtt_connect_errors}")
logger.info(f"Printers tracked: {len(self.sessions)}")
logger.info(f"Successful collections: {success_count}")
logger.info(f"Failed collections: {error_count}")
logger.info(f"MQTT connection warnings: {mqtt_connect_errors}")
logger.info(f"Success rate: {success_rate:.1f}%")

View File

@@ -0,0 +1,128 @@
"""
Diagnose multi-printer cloud data for a Bambu Lab account.
Run this if `bambu_collector` doesn't pick up all your printers, or the data
collected for a second/third printer looks wrong. It authenticates with your
Bambu Lab account, lists every device the cloud API reports, listens briefly
for raw MQTT data from each one, and writes a redacted JSON report you can
attach to a GitHub issue.
Usage:
python manage.py bambu_diagnose
python manage.py bambu_diagnose --listen-seconds 15
python manage.py bambu_diagnose --output my_report.json
python manage.py bambu_diagnose --no-redact # local debugging only — do NOT post this output publicly
"""
import json
import logging
import time
from django.core.management.base import BaseCommand, CommandError
from bambu_run.diagnostics import build_diagnostics_report, redact_diagnostics
logger = logging.getLogger("bambu_run.diagnose")
class Command(BaseCommand):
help = "Authenticate, list every printer on the account, and write a redacted diagnostics report."
def add_arguments(self, parser):
parser.add_argument(
"--listen-seconds", type=float, default=8.0,
help="How long to listen for MQTT data per device (default: 8)",
)
parser.add_argument(
"--output", type=str, default=None,
help="Output file path (default: bambu_diagnostics_<timestamp>.json)",
)
parser.add_argument(
"--no-redact", action="store_true",
help="Keep full serials/identifiers unmasked. For your own debugging only — "
"do not paste this output into a public GitHub issue.",
)
def handle(self, *args, **options):
import os
from bambu_run.mqtt_client import BambuPrinter, BambuClient
listen_seconds = options["listen_seconds"]
redact = not options["no_redact"]
bambu_username = os.environ.get("BAMBU_USERNAME")
bambu_password = os.environ.get("BAMBU_PASSWORD")
bambu_token = os.environ.get("BAMBU_TOKEN")
if not bambu_token and not all([bambu_username, bambu_password]):
raise CommandError(
"Either BAMBU_TOKEN or both BAMBU_USERNAME and BAMBU_PASSWORD "
"environment variables must be set"
)
self.stdout.write("Authenticating with Bambu Lab cloud...")
auth = BambuPrinter(username=bambu_username, password=bambu_password, token=bambu_token)
token = auth._ensure_token()
cloud = BambuClient(token=token)
devices = cloud.get_devices()
self.stdout.write(self.style.SUCCESS(f"Found {len(devices)} device(s) on this account:"))
for device in devices:
self.stdout.write(
f" - {device.get('name', 'unknown')} "
f"({device.get('dev_product_name', 'unknown model')}) "
f"online={device.get('online')}"
)
if len(devices) < 2:
self.stdout.write(self.style.WARNING(
"Only one device returned by the cloud API — if you own multiple printers, "
"this is likely the root cause. Note this in the GitHub issue."
))
raw_payloads = {}
for device in devices:
dev_id = device.get("dev_id")
if not dev_id:
continue
self.stdout.write(f"Listening to {device.get('name', dev_id)} for {listen_seconds:.0f}s...")
client = BambuPrinter(token=token, device_id=dev_id)
try:
client.connect(blocking=False)
self._request_full_status_when_ready(client)
time.sleep(listen_seconds)
state = client.get_state()
raw_payloads[dev_id] = state._raw_data.get("print") if state._raw_data else None
except Exception as e:
self.stdout.write(self.style.WARNING(f" Could not collect data for {dev_id}: {e}"))
raw_payloads[dev_id] = None
finally:
client.disconnect()
report = build_diagnostics_report(devices, raw_payloads)
report = redact_diagnostics(report, redact=redact)
output_path = options["output"] or f"bambu_diagnostics_{int(time.time())}.json"
with open(output_path, "w") as f:
json.dump(report, f, indent=2, default=str)
self.stdout.write(self.style.SUCCESS(f"\nDiagnostics written to: {output_path}"))
if not redact:
self.stdout.write(self.style.WARNING(
"--no-redact was used: this file contains unmasked serials/identifiers. "
"Do not attach it to a public GitHub issue as-is."
))
else:
self.stdout.write(
"Serials/identifiers are masked. Please skim the file once before posting — "
"then attach it to https://github.com/RunLit/Bambu-Run/issues/10"
)
def _request_full_status_when_ready(self, client, timeout: float = 20.0) -> None:
deadline = time.time() + timeout
while time.time() < deadline:
mqtt_client = getattr(client, "_mqtt", None)
if mqtt_client is not None and getattr(mqtt_client, "connected", False):
client._mqtt.request_full_status()
return
time.sleep(0.5)

View File

@@ -0,0 +1,90 @@
# Generated by Django 5.2.8 on 2026-05-07 04:16
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("bambu_run", "0003_cloud_task"),
]
operations = [
migrations.AddField(
model_name="filament",
name="ams_type",
field=models.CharField(
blank=True,
choices=[
("AMS", "AMS"),
("AMS 2 Pro", "AMS 2 Pro"),
("AMS HT", "AMS HT"),
],
default="",
help_text="Type of the AMS unit this spool is loaded in (AMS / AMS 2 Pro / AMS HT)",
max_length=32,
),
),
migrations.AddField(
model_name="filament",
name="ams_unit_id",
field=models.PositiveSmallIntegerField(
blank=True,
db_index=True,
help_text="Which physical AMS unit this spool is loaded in (matches MQTT ams[i].id; 128 = AMS HT)",
null=True,
),
),
migrations.AddField(
model_name="printermetrics",
name="nozzle_diameter_left",
field=models.DecimalField(
blank=True,
decimal_places=2,
help_text="Left nozzle diameter (mm). H2C only.",
max_digits=3,
null=True,
),
),
migrations.AddField(
model_name="printermetrics",
name="nozzle_target_temp_left",
field=models.DecimalField(
blank=True,
decimal_places=2,
help_text="Left extruder target temperature (°C). H2C only.",
max_digits=5,
null=True,
),
),
migrations.AddField(
model_name="printermetrics",
name="nozzle_temp_left",
field=models.DecimalField(
blank=True,
decimal_places=2,
help_text="Left extruder current temperature (°C). H2C only.",
max_digits=5,
null=True,
),
),
migrations.AddField(
model_name="printermetrics",
name="nozzle_type_left",
field=models.CharField(
blank=True,
help_text="Left nozzle type (e.g. HS01-0.4). H2C only.",
max_length=50,
null=True,
),
),
migrations.AlterField(
model_name="filament",
name="current_tray_id",
field=models.IntegerField(
blank=True,
help_text="Tray slot index within its AMS unit (0-3 for AMS/AMS 2 Pro, 0 for AMS HT)",
null=True,
),
),
]

View File

@@ -0,0 +1,22 @@
# Generated by Django 5.2.8 on 2026-06-18 12:20
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("bambu_run", "0004_h2c_dual_nozzle_and_ams_fields"),
]
operations = [
migrations.AddField(
model_name="printermetrics",
name="vortek_raw",
field=models.JSONField(
blank=True,
default=dict,
help_text="Raw print.device MQTT payload (Vortek rack groundwork)",
),
),
]

View File

@@ -2,6 +2,33 @@ from django.db import models
from django.utils import timezone
# Bambu AMS model-code → human-readable type label.
# Source: live H2C MQTT probe — `print.ams.ams[i].info` field.
# Add new codes as they are observed (e.g. AMS Lite, future variants).
AMS_INFO_TO_TYPE = {
"1001": "AMS",
"1003": "AMS 2 Pro",
"2104": "AMS HT",
}
AMS_TYPE_CHOICES = [
("AMS", "AMS"),
("AMS 2 Pro", "AMS 2 Pro"),
("AMS HT", "AMS HT"),
]
def ams_type_from_info(info_code) -> str:
"""Resolve an AMS unit's `info` model code to a human label.
The HT unit reports its `id` with the 0x80 bit set (e.g. 128) — when the info
code is unknown, that bit is a reasonable secondary hint for HT identification.
"""
if info_code is None:
return ""
return AMS_INFO_TO_TYPE.get(str(info_code), "")
class Printer(models.Model):
"""Represents a Bambu Lab 3D printer device"""
@@ -58,12 +85,32 @@ class PrinterMetrics(models.Model):
max_digits=5, decimal_places=2, null=True, blank=True
)
# Nozzle info
# Nozzle info — single-nozzle / right-side back-compat fields. On dual-nozzle
# printers (H2C) these mirror the right extruder; the left extruder uses the
# `_left` columns below.
nozzle_diameter = models.DecimalField(
max_digits=3, decimal_places=2, null=True, blank=True
)
nozzle_type = models.CharField(max_length=50, null=True, blank=True)
# H2C dual-nozzle: left-side fields (NULL on single-nozzle printers).
nozzle_temp_left = models.DecimalField(
max_digits=5, decimal_places=2, null=True, blank=True,
help_text="Left extruder current temperature (°C). H2C only."
)
nozzle_target_temp_left = models.DecimalField(
max_digits=5, decimal_places=2, null=True, blank=True,
help_text="Left extruder target temperature (°C). H2C only."
)
nozzle_diameter_left = models.DecimalField(
max_digits=3, decimal_places=2, null=True, blank=True,
help_text="Left nozzle diameter (mm). H2C only."
)
nozzle_type_left = models.CharField(
max_length=50, null=True, blank=True,
help_text="Left nozzle type (e.g. HS01-0.4). H2C only."
)
# Print job status
gcode_state = models.CharField(
max_length=50, null=True, blank=True, help_text="FINISH, RUNNING, IDLE, etc."
@@ -184,6 +231,14 @@ class PrinterMetrics(models.Model):
default=list, help_text="Light status report [{node, mode}]"
)
# Groundwork for H2C's Vortek nozzle-changer rack (6 swappable hotends + 1 fixed
# left nozzle) — the full MQTT schema for per-slot state isn't confirmed yet, so
# the raw `print.device` payload is captured here unfiltered to avoid losing data
# ahead of proper per-slot modeling.
vortek_raw = models.JSONField(
default=dict, blank=True, help_text="Raw print.device MQTT payload (Vortek rack groundwork)"
)
class Meta:
db_table = "infrastructure_printer_metrics"
verbose_name = "Printer Metric"
@@ -365,7 +420,16 @@ class Filament(models.Model):
)
current_tray_id = models.IntegerField(
null=True, blank=True,
help_text="Which AMS slot (0-3) if loaded"
help_text="Tray slot index within its AMS unit (0-3 for AMS/AMS 2 Pro, 0 for AMS HT)"
)
ams_unit_id = models.PositiveSmallIntegerField(
null=True, blank=True, db_index=True,
help_text="Which physical AMS unit this spool is loaded in (matches MQTT ams[i].id; 128 = AMS HT)"
)
ams_type = models.CharField(
max_length=32, blank=True, default="",
choices=AMS_TYPE_CHOICES,
help_text="Type of the AMS unit this spool is loaded in (AMS / AMS 2 Pro / AMS HT)"
)
last_loaded_date = models.DateTimeField(
null=True, blank=True,

View File

@@ -335,10 +335,16 @@ class PrinterState:
wifi_signal: str = ""
wifi_signal_dbm: int = 0
# Nozzle info
# Nozzle info — single-nozzle / right-side back-compat fields.
nozzle_diameter: float = 0.4
nozzle_type: str = ""
# H2C dual-nozzle: left-side fields (None on single-nozzle printers).
nozzle_temp_left: Optional[float] = None
nozzle_target_temp_left: Optional[float] = None
nozzle_diameter_left: Optional[float] = None
nozzle_type_left: Optional[str] = None
# System status
home_flag: int = 0
hw_switch_state: int = 0
@@ -410,6 +416,21 @@ class PrinterState:
wifi_signal = print_data.get("wifi_signal", "")
# H2C dual-nozzle decoding. The H2C reports per-extruder temperatures
# under `print.device.extruder.info[]` as a 2-element array (index 0 =
# right, index 1 = left). The `temp` field is bit-packed:
# `temp_raw = (target << 16) | current`, both °C as ints.
nozzle_temp_left = None
nozzle_target_temp_left = None
device = print_data.get("device") or {}
extruders = (device.get("extruder") or {}).get("info") or []
if len(extruders) >= 2:
left = extruders[1]
t = left.get("temp")
if isinstance(t, int):
nozzle_target_temp_left = float((t >> 16) & 0xFFFF)
nozzle_temp_left = float(t & 0xFFFF)
return cls(
timestamp=timestamp,
sequence_id=str(print_data.get("sequence_id", "")),
@@ -438,6 +459,13 @@ class PrinterState:
wifi_signal_dbm=cls._parse_wifi_signal(wifi_signal),
nozzle_diameter=float(print_data.get("nozzle_diameter", 0.4)),
nozzle_type=print_data.get("nozzle_type", ""),
nozzle_temp_left=nozzle_temp_left,
nozzle_target_temp_left=nozzle_target_temp_left,
# Diameter/type per side: H2C currently uses uniform nozzles, so reuse top-level
# values. If a future probe shows per-side diameter/type variance, plumb it from
# `device.nozzle.info[]` cross-referenced against `device.extruder.info[i].id`.
nozzle_diameter_left=float(print_data.get("nozzle_diameter", 0.4)) if nozzle_temp_left is not None else None,
nozzle_type_left=print_data.get("nozzle_type", "") if nozzle_temp_left is not None else None,
home_flag=int(print_data.get("home_flag", 0)),
hw_switch_state=int(print_data.get("hw_switch_state", 0)),
mc_print_stage=str(print_data.get("mc_print_stage", "")),
@@ -473,6 +501,14 @@ class PrinterState:
"chamber_temp": round(self.chamber_temp, 2),
"nozzle_diameter": self.nozzle_diameter,
"nozzle_type": self.nozzle_type,
"nozzle_temp_left": (
round(self.nozzle_temp_left, 2) if self.nozzle_temp_left is not None else None
),
"nozzle_target_temp_left": (
round(self.nozzle_target_temp_left, 2) if self.nozzle_target_temp_left is not None else None
),
"nozzle_diameter_left": self.nozzle_diameter_left,
"nozzle_type_left": self.nozzle_type_left,
"gcode_state": self.gcode_state,
"print_type": self.print_type,
"print_percent": self.print_percent,
@@ -493,6 +529,10 @@ class PrinterState:
"wifi_signal_dbm": self.wifi_signal_dbm,
"print_error": self.print_error,
"has_errors": self.print_error != 0,
# Full `print.device` payload, unfiltered. H2C's Vortek rack (6 swappable
# hotends + 1 fixed left nozzle) isn't fully modeled yet — stash everything
# here so no data is lost once the real Vortek MQTT schema is confirmed.
"vortek_raw": self._raw_data.get("print", {}).get("device", {}),
"hms": self.hms,
"stg_cur": self.stg_cur,
"lights_report": self.lights_report,
@@ -515,8 +555,19 @@ class PrinterState:
snapshot["tray_now"] = self.ams.tray_now
snapshot["ams_version"] = self.ams.version
from .models import ams_type_from_info
filaments = []
for unit in self.ams.units:
# `unit_id` is the AMS unit's own id from the MQTT payload — for the
# original AMS / AMS 2 Pro it's a small int (0,1,2,...); for AMS HT
# it has the 0x80 bit set (e.g. 128). Don't compute tray_id // 4 —
# multi-AMS-type setups are not contiguous.
try:
unit_id_int = int(unit.unit_id)
except (TypeError, ValueError):
unit_id_int = None
ams_type_label = ams_type_from_info(unit.info)
for tray in unit.trays:
if tray.tray_type:
filaments.append({
@@ -542,6 +593,9 @@ class PrinterState:
"tray_bed_temp": tray.tray_bed_temp,
"bed_temp_type": tray.bed_temp_type,
"cols": tray.cols,
"ams_unit_id": unit_id_int,
"ams_info": unit.info,
"ams_type": ams_type_label,
})
snapshot["filaments"] = filaments
@@ -552,6 +606,7 @@ class PrinterState:
"ams_id": unit.ams_id,
"chip_id": unit.chip_id,
"info": unit.info,
"ams_type": ams_type_from_info(unit.info),
"humidity": unit.humidity,
"humidity_raw": unit.humidity_raw,
"temp": unit.temp,

View File

@@ -1,7 +1,7 @@
// 3D Printer Charts Initialization and Management
// Chart.js implementation for printer metrics visualization
let nozzleTempChart, bedTempChart, printProgressChart, fanSpeedsChart;
let nozzleTempChart, nozzleTempLeftChart, bedTempChart, printProgressChart, fanSpeedsChart;
let wifiSignalChart, amsConditionsChart, layerProgressChart, filamentTimelineChart;
function showNoDataMessage(canvasId) {
@@ -75,6 +75,50 @@ function initPrinterCharts(printerData, apiUrl) {
options: getTemperatureChartOptions(tickColor, gridColor, '°C')
});
// Initialize Left Nozzle Temperature Chart (H2C-class dual-nozzle).
// Mounted only when the canvas exists AND the API returned non-null
// left-side samples — single-nozzle printers leave the column NULL.
const nozzleLeftCanvas = document.getElementById('nozzleTempLeftChart');
const hasLeftData = Array.isArray(printerData.nozzle_temp_left)
&& printerData.nozzle_temp_left.some(v => v !== null && v !== undefined);
if (nozzleLeftCanvas && hasLeftData) {
const nozzleLeftCtx = nozzleLeftCanvas.getContext('2d');
nozzleTempLeftChart = new Chart(nozzleLeftCtx, {
type: 'line',
data: {
labels: printerData.timestamps,
datasets: [
{
label: 'Actual Temp (Left)',
data: printerData.nozzle_temp_left,
borderColor: 'rgb(54, 162, 235)',
backgroundColor: 'rgba(54, 162, 235, 0.1)',
tension: 0.3,
borderWidth: 2,
pointRadius: 0,
pointHoverRadius: 3,
spanGaps: true
},
{
label: 'Target Temp (Left)',
data: printerData.nozzle_target_temp_left,
borderColor: 'rgb(153, 102, 255)',
backgroundColor: 'rgba(153, 102, 255, 0.05)',
borderDash: [5, 5],
tension: 0.3,
borderWidth: 2,
pointRadius: 0,
pointHoverRadius: 3,
spanGaps: true
}
]
},
options: getTemperatureChartOptions(tickColor, gridColor, '°C')
});
} else if (nozzleLeftCanvas) {
showNoDataMessage('nozzleTempLeftChart');
}
// Initialize Bed Temperature Chart
const bedCtx = document.getElementById('bedTempChart').getContext('2d');
bedTempChart = new Chart(bedCtx, {
@@ -702,7 +746,7 @@ function updateChartTheme() {
// Update all charts
const charts = [
nozzleTempChart, bedTempChart, printProgressChart, fanSpeedsChart,
nozzleTempChart, nozzleTempLeftChart, bedTempChart, printProgressChart, fanSpeedsChart,
wifiSignalChart, amsConditionsChart, layerProgressChart, filamentTimelineChart
];
@@ -804,7 +848,7 @@ function applyDateSeparatorsToAllPrinterCharts(timestamps, dates) {
const sepAnnotations = buildDateSeparatorAnnotations(timestamps, dates);
const charts = [
nozzleTempChart, bedTempChart, printProgressChart, fanSpeedsChart,
nozzleTempChart, nozzleTempLeftChart, bedTempChart, printProgressChart, fanSpeedsChart,
wifiSignalChart, amsConditionsChart, layerProgressChart, filamentTimelineChart
];

View File

@@ -200,6 +200,13 @@ function updateAllPrinterCharts(data) {
{ data: data.nozzle_target_temp, datasetIndex: 1 }
]);
if (typeof nozzleTempLeftChart !== 'undefined' && nozzleTempLeftChart) {
updateChartData(nozzleTempLeftChart, data.timestamps, [
{ data: data.nozzle_temp_left || [], datasetIndex: 0 },
{ data: data.nozzle_target_temp_left || [], datasetIndex: 1 }
]);
}
updateChartData(bedTempChart, data.timestamps, [
{ data: data.bed_temp, datasetIndex: 0 },
{ data: data.bed_target_temp, datasetIndex: 1 }
@@ -269,7 +276,7 @@ function addProjectMarkersToCharts(markers, timestamps) {
console.log('Adding project markers:', markers);
const charts = [
nozzleTempChart, bedTempChart, printProgressChart, fanSpeedsChart,
nozzleTempChart, nozzleTempLeftChart, bedTempChart, printProgressChart, fanSpeedsChart,
wifiSignalChart, amsConditionsChart, layerProgressChart, filamentTimelineChart
];
@@ -400,7 +407,7 @@ function resetPrinterControls() {
// Clear annotations and reload with original data
const charts = [
nozzleTempChart, bedTempChart, printProgressChart, fanSpeedsChart,
nozzleTempChart, nozzleTempLeftChart, bedTempChart, printProgressChart, fanSpeedsChart,
wifiSignalChart, amsConditionsChart, layerProgressChart, filamentTimelineChart
];

View File

@@ -70,14 +70,22 @@
{% endfor %}
</select>
</div>
<div class="col-md-3">
<div class="col-md-2">
<select name="loaded" class="form-select">
<option value="">All Spools</option>
<option value="yes" {% if request.GET.loaded == 'yes' %}selected{% endif %}>Loaded in AMS</option>
<option value="no" {% if request.GET.loaded == 'no' %}selected{% endif %}>Not Loaded</option>
</select>
</div>
<div class="col-md-3">
<div class="col-md-2">
<select name="ams_type" class="form-select">
<option value="">All AMS Types</option>
{% for at in ams_type_choices %}
<option value="{{ at }}" {% if request.GET.ams_type == at %}selected{% endif %}>{{ at }}</option>
{% endfor %}
</select>
</div>
<div class="col-md-2">
<button type="submit" class="btn btn-secondary">Filter</button>
<a href="{% url 'bambu_run:filament_list' %}" class="btn btn-outline-secondary">Reset</a>
</div>
@@ -149,7 +157,11 @@
</td>
<td class="align-middle">
{% if filament.is_loaded_in_ams %}
<span class="badge bg-success">AMS Tray {{ filament.current_tray_id }}</span>
<span class="badge bg-success">
{% if filament.ams_type %}{{ filament.ams_type }}{% else %}AMS{% endif %}
{% if filament.ams_unit_id is not None %}#{{ filament.ams_unit_id }}{% endif %}
· Tray {{ filament.current_tray_id }}
</span>
{% else %}
<span class="badge bg-secondary">Storage</span>
{% endif %}

View File

@@ -14,6 +14,20 @@
Real-time monitoring for {{ device_name }}
</p>
</div>
{% if show_printer_switcher %}
<div class="col-auto d-flex align-items-center gap-2">
<label for="printerSwitcher" class="form-label mb-0 text-nowrap">Device:</label>
<select id="printerSwitcher" class="form-select" aria-label="Select printer"
onchange="if (this.value) { window.location.href = this.value; }">
{% for p in all_printers %}
<option value="{% url 'bambu_run:printer_dashboard' pk=p.pk %}"
{% if printer_device.pk == p.pk %}selected{% endif %}>
{{ p.name }}
</option>
{% endfor %}
</select>
</div>
{% endif %}
</div>
{% if error %}
@@ -22,7 +36,41 @@
<!-- Summary Cards Row -->
<div class="row g-3 mb-4">
<!-- Nozzle Temperature Card -->
{% if stats.is_dual_nozzle %}
<!-- Right Nozzle (dual-nozzle printers, e.g. H2C) -->
<div class="col-12 col-md-6 col-lg-3">
<div class="card infra-card-warning">
<div class="card-body">
<div class="d-flex justify-content-between align-items-start">
<div>
<div class="stat-label">Right Nozzle</div>
<div class="stat-value">{{ stats.nozzle_temp|floatformat:1 }}&deg;C</div>
<div class="text-muted small">target {{ stats.nozzle_target_temp|floatformat:0 }}&deg;C
{% if stats.nozzle_type %}· {{ stats.nozzle_type }}{% endif %}</div>
</div>
<i class="bi bi-thermometer-high" style="font-size: 2rem; opacity: 0.3;"></i>
</div>
</div>
</div>
</div>
<!-- Left Nozzle -->
<div class="col-12 col-md-6 col-lg-3">
<div class="card infra-card-warning">
<div class="card-body">
<div class="d-flex justify-content-between align-items-start">
<div>
<div class="stat-label">Left Nozzle</div>
<div class="stat-value">{{ stats.nozzle_temp_left|floatformat:1 }}&deg;C</div>
<div class="text-muted small">target {{ stats.nozzle_target_temp_left|floatformat:0 }}&deg;C
{% if stats.nozzle_type_left %}· {{ stats.nozzle_type_left }}{% endif %}</div>
</div>
<i class="bi bi-thermometer-high" style="font-size: 2rem; opacity: 0.3;"></i>
</div>
</div>
</div>
</div>
{% else %}
<!-- Nozzle Temperature Card (single-nozzle printers) -->
<div class="col-12 col-md-6 col-lg-3">
<div class="card infra-card-warning">
<div class="card-body">
@@ -36,6 +84,7 @@
</div>
</div>
</div>
{% endif %}
<!-- Bed Temperature Card -->
<div class="col-12 col-md-6 col-lg-3">
@@ -266,10 +315,10 @@
<!-- Charts Section -->
<div class="row g-3 mb-4">
<!-- Nozzle Temperature Chart -->
<!-- Nozzle Temperature Chart (right side / single nozzle) -->
<div class="col-12 col-lg-6">
<div class="card">
<div class="card-header">Nozzle Temperature</div>
<div class="card-header">{% if stats.is_dual_nozzle %}Right Nozzle Temperature{% else %}Nozzle Temperature{% endif %}</div>
<div class="card-body">
<div class="chart-container">
<canvas id="nozzleTempChart"></canvas>
@@ -278,6 +327,20 @@
</div>
</div>
{% if stats.is_dual_nozzle %}
<!-- Left Nozzle Temperature Chart (H2C-class dual-nozzle) -->
<div class="col-12 col-lg-6">
<div class="card">
<div class="card-header">Left Nozzle Temperature</div>
<div class="card-body">
<div class="chart-container">
<canvas id="nozzleTempLeftChart"></canvas>
</div>
</div>
</div>
</div>
{% endif %}
<!-- Bed Temperature Chart -->
<div class="col-12 col-lg-6">
<div class="card">
@@ -374,12 +437,12 @@
<script src="https://cdn.jsdelivr.net/npm/chartjs-plugin-annotation@3.0.1"></script>
<script src="{% static 'bambu_run/js/printer_charts.js' %}"></script>
<script src="{% static 'bambu_run/js/printer_charts_control.js' %}"></script>
{% if not is_basic_user %}
<div id="printerApiUrl" data-url="{% url 'bambu_run:printer_api' %}" style="display: none;"></div>
{% if not is_basic_user and printer_device %}
<div id="printerApiUrl" data-url="{% url 'bambu_run:printer_api' pk=printer_device.pk %}" style="display: none;"></div>
<script>
document.addEventListener('DOMContentLoaded', function() {
const printerData = {{ printer_data_json|safe }};
const apiUrl = '{% url "bambu_run:printer_api" %}';
const apiUrl = '{% url "bambu_run:printer_api" pk=printer_device.pk %}';
initPrinterCharts(printerData, apiUrl);
// Add project markers if they exist

View File

@@ -5,7 +5,9 @@ app_name = "bambu_run"
urlpatterns = [
path("", views.PrinterDashboardView.as_view(), name="printer_dashboard"),
path("printer/<int:pk>/", views.PrinterDashboardView.as_view(), name="printer_dashboard"),
path("api/printer/", views.PrinterDataAPIView.as_view(), name="printer_api"),
path("api/printer/<int:pk>/", views.PrinterDataAPIView.as_view(), name="printer_api"),
# Filament Inventory routes
path("filaments/", views.FilamentListView.as_view(), name="filament_list"),

View File

@@ -1,8 +1,9 @@
from datetime import timedelta, datetime
from django.views.generic import TemplateView, View, ListView, CreateView, UpdateView, DetailView, DeleteView
from django.contrib.auth.mixins import LoginRequiredMixin
from django.shortcuts import get_object_or_404
from django.utils import timezone
from django.http import JsonResponse
from django.http import Http404, JsonResponse
from django.urls import reverse_lazy
from django.contrib import messages
from django.db.models import Q, Sum
@@ -26,6 +27,17 @@ _METRICS_API_FIELDS = [
_MAX_CHART_POINTS = 3000
def resolve_printer_from_request(pk):
"""Resolve which Printer a dashboard/API view should show.
`pk` given (URL kwarg) -> that exact printer, 404 if missing/inactive.
`pk` omitted -> first active printer (today's single-printer default behavior).
"""
if pk is not None:
return get_object_or_404(Printer, pk=pk, is_active=True)
return Printer.objects.filter(is_active=True).first()
class PrinterDashboardView(LoginRequiredMixin, TemplateView):
template_name = "bambu_run/printer_dashboard.html"
@@ -38,13 +50,20 @@ class PrinterDashboardView(LoginRequiredMixin, TemplateView):
context = super().get_context_data(**kwargs)
context['bambu_run_base_template'] = app_settings.BASE_TEMPLATE
all_printers = Printer.objects.filter(is_active=True)
context["all_printers"] = all_printers
# Shown even with a single printer — hints that multi-printer support exists.
context["show_printer_switcher"] = all_printers.exists()
try:
printer_device = Printer.objects.filter(is_active=True).first()
printer_device = resolve_printer_from_request(self.kwargs.get("pk"))
if not printer_device:
context["error"] = (
"No 3D printer device found. Please run bambu_collector first."
)
return context
except Http404:
raise
except Exception as e:
context["error"] = f"Error loading printer device: {str(e)}"
return context
@@ -76,6 +95,14 @@ class PrinterDashboardView(LoginRequiredMixin, TemplateView):
float(m.nozzle_target_temp) if m.nozzle_target_temp else None
for m in metrics
],
"nozzle_temp_left": [
float(m.nozzle_temp_left) if m.nozzle_temp_left is not None else None
for m in metrics
],
"nozzle_target_temp_left": [
float(m.nozzle_target_temp_left) if m.nozzle_target_temp_left is not None else None
for m in metrics
],
"bed_temp": [float(m.bed_temp) if m.bed_temp else None for m in metrics],
"bed_target_temp": [
float(m.bed_target_temp) if m.bed_target_temp else None for m in metrics
@@ -150,6 +177,14 @@ class PrinterDashboardView(LoginRequiredMixin, TemplateView):
stats = {
"nozzle_temp": float(latest_metric.nozzle_temp) if latest_metric.nozzle_temp else 0,
"nozzle_target_temp": float(latest_metric.nozzle_target_temp) if latest_metric.nozzle_target_temp else 0,
"nozzle_diameter": float(latest_metric.nozzle_diameter) if latest_metric.nozzle_diameter else None,
"nozzle_type": latest_metric.nozzle_type or "",
"nozzle_temp_left": float(latest_metric.nozzle_temp_left) if latest_metric.nozzle_temp_left is not None else None,
"nozzle_target_temp_left": float(latest_metric.nozzle_target_temp_left) if latest_metric.nozzle_target_temp_left is not None else None,
"nozzle_diameter_left": float(latest_metric.nozzle_diameter_left) if latest_metric.nozzle_diameter_left is not None else None,
"nozzle_type_left": latest_metric.nozzle_type_left or "",
"is_dual_nozzle": latest_metric.nozzle_temp_left is not None,
"bed_temp": float(latest_metric.bed_temp) if latest_metric.bed_temp else 0,
"chamber_temp": float(latest_metric.chamber_temp) if latest_metric.chamber_temp else 0,
"print_percent": latest_metric.print_percent or 0,
@@ -288,16 +323,21 @@ class PrinterDashboardView(LoginRequiredMixin, TemplateView):
class PrinterDataAPIView(LoginRequiredMixin, View):
"""API endpoint for dynamic printer chart updates"""
def get(self, request):
def get(self, request, pk=None):
start_date = request.GET.get("start_date")
end_date = request.GET.get("end_date")
start_time = request.GET.get("start_time", "00:00")
end_time = request.GET.get("end_time", "23:59")
try:
printer_device = Printer.objects.filter(is_active=True).first()
if not printer_device:
return JsonResponse({"error": "No printer device found"}, status=404)
if pk is not None:
printer_device = Printer.objects.filter(pk=pk, is_active=True).first()
if not printer_device:
return JsonResponse({"error": "Printer not found"}, status=404)
else:
printer_device = Printer.objects.filter(is_active=True).first()
if not printer_device:
return JsonResponse({"error": "No printer device found"}, status=404)
tz = zoneinfo.ZoneInfo(app_settings.TIMEZONE)
@@ -347,6 +387,8 @@ class PrinterDataAPIView(LoginRequiredMixin, View):
dates = []
nozzle_temp = []
nozzle_target_temp = []
nozzle_temp_left = []
nozzle_target_temp_left = []
bed_temp = []
bed_target_temp = []
print_percent = []
@@ -374,6 +416,8 @@ class PrinterDataAPIView(LoginRequiredMixin, View):
dates.append(ts.strftime('%Y-%m-%d'))
nozzle_temp.append(float(m.nozzle_temp) if m.nozzle_temp else None)
nozzle_target_temp.append(float(m.nozzle_target_temp) if m.nozzle_target_temp else None)
nozzle_temp_left.append(float(m.nozzle_temp_left) if m.nozzle_temp_left is not None else None)
nozzle_target_temp_left.append(float(m.nozzle_target_temp_left) if m.nozzle_target_temp_left is not None else None)
bed_temp.append(float(m.bed_temp) if m.bed_temp else None)
bed_target_temp.append(float(m.bed_target_temp) if m.bed_target_temp else None)
print_percent.append(m.print_percent if m.print_percent else 0)
@@ -451,6 +495,8 @@ class PrinterDataAPIView(LoginRequiredMixin, View):
"dates": dates,
"nozzle_temp": nozzle_temp,
"nozzle_target_temp": nozzle_target_temp,
"nozzle_temp_left": nozzle_temp_left,
"nozzle_target_temp_left": nozzle_target_temp_left,
"bed_temp": bed_temp,
"bed_target_temp": bed_target_temp,
"print_percent": print_percent,
@@ -561,6 +607,10 @@ class FilamentListView(LoginRequiredMixin, ListView):
elif loaded == 'no':
queryset = queryset.filter(is_loaded_in_ams=False)
ams_type = self.request.GET.get('ams_type')
if ams_type:
queryset = queryset.filter(ams_type=ams_type)
search = self.request.GET.get('search')
if search:
queryset = queryset.filter(
@@ -580,6 +630,11 @@ class FilamentListView(LoginRequiredMixin, ListView):
context['filament_types'] = sorted(
set(Filament.objects.exclude(type__isnull=True).exclude(type='').values_list('type', flat=True))
)
context['ams_type_choices'] = sorted(
set(
Filament.objects.exclude(ams_type='').values_list('ams_type', flat=True)
)
)
return context

View File

@@ -4,11 +4,11 @@ build-backend = "setuptools.build_meta"
[project]
name = "bambu-run"
version = "0.1.3"
version = "0.1.7"
description = "Django reusable app for Bambu Lab 3D printer monitoring and filament inventory management"
readme = "README.md"
license = {text = "MIT"}
requires-python = ">=3.10"
requires-python = ">=3.9"
authors = [
{name = "Runnan Li"},
]
@@ -62,3 +62,7 @@ bambu_run = [
"templates/bambu_run/*.html",
"static/bambu_run/**/*",
]
[tool.pytest.ini_options]
DJANGO_SETTINGS_MODULE = "tests.settings"
python_files = "test_*.py"

View File

@@ -111,6 +111,8 @@ PRINTER_IP = os.environ.get("PRINTER_IP", "")
ACCESS_TOKEN = os.environ.get("ACCESS_TOKEN", "")
PRINTER_SERIAL = os.environ.get("PRINTER_SERIAL", "")
CSRF_TRUSTED_ORIGINS = os.environ.get("CSRF_TRUSTED_ORIGINS", "").split(",")
# Logging
LOGGING = {
"version": 1,

0
tests/__init__.py Normal file
View File

40
tests/settings.py Normal file
View File

@@ -0,0 +1,40 @@
"""Minimal Django settings for running bambu_run's pytest suite (in-memory SQLite)."""
SECRET_KEY = "test-secret-key"
INSTALLED_APPS = [
"django.contrib.auth",
"django.contrib.contenttypes",
"django.contrib.sessions",
"bambu_run",
]
MIDDLEWARE = [
"django.contrib.sessions.middleware.SessionMiddleware",
"django.contrib.auth.middleware.AuthenticationMiddleware",
]
TEMPLATES = [
{
"BACKEND": "django.template.backends.django.DjangoTemplates",
"APP_DIRS": True,
"OPTIONS": {
"context_processors": [
"django.template.context_processors.request",
"django.contrib.auth.context_processors.auth",
],
},
},
]
DATABASES = {
"default": {
"ENGINE": "django.db.backends.sqlite3",
"NAME": ":memory:",
}
}
USE_TZ = True
DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField"
ROOT_URLCONF = "tests.urls"

68
tests/test_diagnostics.py Normal file
View File

@@ -0,0 +1,68 @@
import pytest
from bambu_run.diagnostics import redact_diagnostics, build_diagnostics_report
def test_redacts_password_and_token_like_keys():
data = {"BAMBU_PASSWORD": "hunter2", "access_token": "abc123", "ok": "fine"}
redacted = redact_diagnostics(data)
assert redacted["BAMBU_PASSWORD"] == "***REDACTED***"
assert redacted["access_token"] == "***REDACTED***"
assert redacted["ok"] == "fine"
def test_masks_known_identifier_keys_partially():
data = {"dev_id": "31B8BP592601478", "tray_uuid": "EE37828FA8844DE1AB12"}
redacted = redact_diagnostics(data)
assert redacted["dev_id"] == "31B8...1478"
assert redacted["tray_uuid"] == "EE37...AB12"
def test_short_identifier_values_fully_masked():
data = {"dev_id": "short"}
redacted = redact_diagnostics(data)
assert redacted["dev_id"] == "***"
def test_recurses_into_nested_structures():
data = {"devices": [{"dev_id": "31B8BP592601478", "name": "RNL-H2C"}]}
redacted = redact_diagnostics(data)
assert redacted["devices"][0]["dev_id"] == "31B8...1478"
assert redacted["devices"][0]["name"] == "RNL-H2C"
def test_no_redact_passthrough_keeps_original_values():
data = {"dev_id": "31B8BP592601478", "BAMBU_PASSWORD": "hunter2"}
result = redact_diagnostics(data, redact=False)
assert result == data
def test_build_diagnostics_report_structure():
devices = [{"dev_id": "SERIAL-A", "name": "Printer A", "dev_product_name": "H2C"}]
raw_payloads = {"SERIAL-A": {"device": {"extruder": {"info": []}}}}
report = build_diagnostics_report(devices, raw_payloads)
assert report["device_count"] == 1
assert "generated_at" in report
assert report["devices"][0]["device_info"]["dev_id"] == "SERIAL-A"
assert report["devices"][0]["raw_mqtt_payload"] == {"device": {"extruder": {"info": []}}}
def test_build_diagnostics_report_handles_missing_payload():
devices = [{"dev_id": "SERIAL-A", "name": "Printer A"}]
report = build_diagnostics_report(devices, raw_payloads={})
assert report["devices"][0]["raw_mqtt_payload"] is None
assert report["devices"][0]["note"] == "No MQTT data received within the listen window."

View File

@@ -0,0 +1,90 @@
import pytest
from bambu_run.management.commands.bambu_collector import (
Command,
DeviceSession,
resolve_printer_device,
)
from bambu_run.models import PrintJob, PrinterMetrics
class FakeClient:
"""Stub in place of BambuPrinter — returns canned snapshots, no real MQTT."""
def __init__(self, snapshots):
self._snapshots = snapshots
self._index = 0
self._client = None # cloud BambuClient handle used by cloud task sync
def get_snapshot(self):
snap = self._snapshots[min(self._index, len(self._snapshots) - 1)]
self._index += 1
return snap
def make_session(device_id, name, snapshots):
printer = resolve_printer_device(device_id, {"name": name, "dev_product_name": "H2C"})
return DeviceSession(device_id=device_id, client=FakeClient(snapshots), printer=printer)
@pytest.mark.django_db
def test_collects_metrics_against_the_correct_printer_per_session():
session_a = make_session("SERIAL-A", "Printer A", [{"nozzle_temp": 200, "gcode_state": "IDLE"}])
session_b = make_session("SERIAL-B", "Printer B", [{"nozzle_temp": 210, "gcode_state": "IDLE"}])
cmd = Command()
cmd.verbose = False
cmd._collect_printer_data(session_a)
cmd._collect_printer_data(session_b)
metric_a = PrinterMetrics.objects.get(device=session_a.printer)
metric_b = PrinterMetrics.objects.get(device=session_b.printer)
assert metric_a.nozzle_temp == 200
assert metric_b.nozzle_temp == 210
@pytest.mark.django_db
def test_print_job_tracking_is_isolated_per_session():
session_a = make_session(
"SERIAL-A",
"Printer A",
[
{"gcode_state": "RUNNING", "subtask_name": "job_A", "print_percent": 10},
{"gcode_state": "FINISH", "subtask_name": "job_A", "print_percent": 100},
],
)
session_b = make_session("SERIAL-B", "Printer B", [{"gcode_state": "IDLE"}])
cmd = Command()
cmd.verbose = False
cmd._collect_printer_data(session_a)
cmd._collect_printer_data(session_b)
cmd._collect_printer_data(session_a)
assert PrintJob.objects.filter(device=session_a.printer).count() == 1
job = PrintJob.objects.get(device=session_a.printer)
assert job.final_status == "FINISH"
assert session_a.current_print_job is None
assert PrintJob.objects.filter(device=session_b.printer).count() == 0
assert session_b.current_print_job is None
@pytest.mark.django_db
def test_one_session_error_does_not_affect_another_session():
session_a = make_session("SERIAL-A", "Printer A", [{"nozzle_temp": 200, "gcode_state": "IDLE"}])
session_b = make_session("SERIAL-B", "Printer B", [{"nozzle_temp": 210, "gcode_state": "IDLE"}])
class ExplodingClient:
def get_snapshot(self):
raise RuntimeError("MQTT connection lost")
session_a.client = ExplodingClient()
cmd = Command()
cmd.verbose = False
cmd._collect_printer_data(session_a)
cmd._collect_printer_data(session_b)
assert session_a.error_count == 1
assert PrinterMetrics.objects.filter(device=session_b.printer).exists()

View File

@@ -0,0 +1,78 @@
import pytest
from django.urls import reverse
from bambu_run.models import Printer
@pytest.fixture
def logged_in_client(client, django_user_model):
user = django_user_model.objects.create_user(username="tester", password="pw")
client.force_login(user)
return client
@pytest.mark.django_db
def test_dashboard_with_no_printers_shows_error(logged_in_client):
resp = logged_in_client.get(reverse("bambu_run:printer_dashboard"))
assert resp.status_code == 200
assert "error" in resp.context
@pytest.mark.django_db
def test_dashboard_defaults_to_first_active_printer(logged_in_client):
printer = Printer.objects.create(name="Only Printer", model="H2C", is_active=True)
resp = logged_in_client.get(reverse("bambu_run:printer_dashboard"))
assert resp.context["printer_device"].pk == printer.pk
assert list(resp.context["all_printers"]) == [printer]
# Switcher shows even with a single printer, as a hint that multi-printer exists.
assert resp.context["show_printer_switcher"] is True
@pytest.mark.django_db
def test_dashboard_hides_switcher_with_zero_printers(logged_in_client):
resp = logged_in_client.get(reverse("bambu_run:printer_dashboard"))
assert resp.context["show_printer_switcher"] is False
@pytest.mark.django_db
def test_dashboard_pk_route_shows_requested_printer(logged_in_client):
Printer.objects.create(name="Printer A", model="H2C", is_active=True)
printer_b = Printer.objects.create(name="Printer B", model="X1C", is_active=True)
resp = logged_in_client.get(
reverse("bambu_run:printer_dashboard", kwargs={"pk": printer_b.pk})
)
assert resp.context["printer_device"].pk == printer_b.pk
assert resp.context["device_name"] == "Printer B"
@pytest.mark.django_db
def test_dashboard_unknown_pk_returns_404(logged_in_client):
resp = logged_in_client.get(
reverse("bambu_run:printer_dashboard", kwargs={"pk": 99999})
)
assert resp.status_code == 404
@pytest.mark.django_db
def test_api_pk_route_returns_only_requested_printer_data(logged_in_client):
from bambu_run.models import PrinterMetrics
from django.utils import timezone
from decimal import Decimal
printer_a = Printer.objects.create(name="Printer A", model="H2C", is_active=True)
printer_b = Printer.objects.create(name="Printer B", model="X1C", is_active=True)
PrinterMetrics.objects.create(device=printer_a, timestamp=timezone.now(), nozzle_temp=Decimal("200"))
PrinterMetrics.objects.create(device=printer_b, timestamp=timezone.now(), nozzle_temp=Decimal("210"))
resp = logged_in_client.get(
reverse("bambu_run:printer_api", kwargs={"pk": printer_b.pk})
)
assert resp.status_code == 200
data = resp.json()
assert data["nozzle_temp"] == [210.0]

View File

@@ -0,0 +1,78 @@
import pytest
from bambu_run.management.commands.bambu_collector import resolve_printer_device
from bambu_run.models import Printer
@pytest.mark.django_db
def test_creates_new_printer_keyed_by_serial():
printer = resolve_printer_device(
"0309DA123456", {"name": "RNL-H2C", "dev_product_name": "H2C"}
)
assert printer.serial_number == "0309DA123456"
assert printer.name == "RNL-H2C"
assert printer.model == "H2C"
assert printer.is_active is True
@pytest.mark.django_db
def test_second_call_with_same_serial_does_not_create_duplicate():
first = resolve_printer_device("SERIAL-A", {"name": "Printer A", "dev_product_name": "H2C"})
second = resolve_printer_device("SERIAL-A", {"name": "Printer A", "dev_product_name": "H2C"})
assert first.pk == second.pk
assert Printer.objects.filter(serial_number="SERIAL-A").count() == 1
@pytest.mark.django_db
def test_two_different_serials_create_two_printers():
a = resolve_printer_device("SERIAL-A", {"name": "Printer A", "dev_product_name": "H2C"})
b = resolve_printer_device("SERIAL-B", {"name": "Printer B", "dev_product_name": "X1C"})
assert a.pk != b.pk
assert Printer.objects.count() == 2
@pytest.mark.django_db
def test_backfills_single_legacy_printer_with_null_serial():
legacy = Printer.objects.create(
name="Bambu Lab Printer", model="Bambu Lab", manufacturer="Bambu Lab", is_active=True
)
resolved = resolve_printer_device("SERIAL-A", {"name": "RNL-H2C", "dev_product_name": "H2C"})
legacy.refresh_from_db()
assert resolved.pk == legacy.pk
assert legacy.serial_number == "SERIAL-A"
assert Printer.objects.count() == 1
@pytest.mark.django_db
def test_does_not_guess_when_multiple_legacy_printers_exist():
Printer.objects.create(name="Legacy 1", model="Bambu Lab")
Printer.objects.create(name="Legacy 2", model="Bambu Lab")
resolved = resolve_printer_device("SERIAL-A", {"name": "RNL-H2C", "dev_product_name": "H2C"})
assert resolved.serial_number == "SERIAL-A"
assert Printer.objects.count() == 3
@pytest.mark.django_db
def test_falls_back_to_generic_defaults_without_device_info():
printer = resolve_printer_device("SERIAL-A", None)
assert printer.serial_number == "SERIAL-A"
assert printer.name == "Bambu Lab Printer"
assert printer.model == "Bambu Lab"
@pytest.mark.django_db
def test_updates_name_and_model_on_existing_printer_when_changed():
resolve_printer_device("SERIAL-A", {"name": "Old Name", "dev_product_name": "H2C"})
updated = resolve_printer_device("SERIAL-A", {"name": "New Name", "dev_product_name": "H2C"})
assert updated.name == "New Name"
assert Printer.objects.filter(serial_number="SERIAL-A").count() == 1

View File

@@ -0,0 +1,43 @@
import pytest
from bambu_run.mqtt_client import PrinterState
from bambu_run.management.commands.bambu_collector import Command, DeviceSession, resolve_printer_device
from bambu_run.models import PrinterMetrics
def test_snapshot_includes_raw_device_payload_for_future_vortek_modeling():
raw_device = {
"extruder": {"info": [{"id": 0, "temp": 12058720}, {"id": 1, "temp": 11534560}]},
"nozzle": {"info": [{"id": 0, "diameter": 0.4}]},
}
data = {"print": {"device": raw_device, "gcode_state": "IDLE"}}
state = PrinterState.from_mqtt_data(data)
snapshot = state.get_snapshot()
assert snapshot["vortek_raw"] == raw_device
def test_snapshot_vortek_raw_defaults_to_empty_dict_when_no_device_payload():
state = PrinterState.from_mqtt_data({"print": {"gcode_state": "IDLE"}})
snapshot = state.get_snapshot()
assert snapshot["vortek_raw"] == {}
@pytest.mark.django_db
def test_collector_persists_vortek_raw_onto_printer_metrics():
printer = resolve_printer_device("SERIAL-A", {"name": "H2C", "dev_product_name": "H2C"})
class FakeClient:
def get_snapshot(self):
return {"gcode_state": "IDLE", "vortek_raw": {"extruder": {"info": []}}}
session = DeviceSession(device_id="SERIAL-A", client=FakeClient(), printer=printer)
cmd = Command()
cmd.verbose = False
cmd._collect_printer_data(session)
metric = PrinterMetrics.objects.get(device=printer)
assert metric.vortek_raw == {"extruder": {"info": []}}

5
tests/urls.py Normal file
View File

@@ -0,0 +1,5 @@
from django.urls import include, path
urlpatterns = [
path("", include("bambu_run.urls")),
]