Initial implementation of multi-printer support.

This commit is contained in:
RNL
2026-06-18 22:50:39 +10:00
parent 34293ce81a
commit e7bc3291b6
15 changed files with 617 additions and 154 deletions

View File

@@ -13,8 +13,9 @@ import logging
import os
import ssl
import time
from dataclasses import dataclass, field
from decimal import Decimal
from typing import Optional
from typing import Any, Dict, Optional
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction
@@ -26,6 +27,56 @@ from bambu_run.models import Printer, PrinterMetrics
logger = logging.getLogger("bambu_run.collector")
def resolve_printer_device(device_id: str, device_info: Optional[dict] = None) -> Printer:
"""Find-or-create the Printer row for a Bambu cloud device, keyed by serial number.
`device_info` is one entry from BambuClient.get_devices() (keys: name,
dev_product_name, dev_id, ...). Falls back to generic defaults when unavailable
(e.g. local-only connections that never call get_devices()).
"""
device_info = device_info or {}
name = device_info.get("name") or "Bambu Lab Printer"
model = device_info.get("dev_product_name") or "Bambu Lab"
printer = Printer.objects.filter(serial_number=device_id).first()
if printer is None:
# Upgrade path: a pre-multi-printer deployment has exactly one Printer row
# with no serial number yet. Backfill it instead of creating a duplicate.
# If there's more than one such row, we can't tell which one this device
# used to be, so don't guess — create a fresh row instead.
legacy_candidates = list(Printer.objects.filter(serial_number__isnull=True)[:2])
if len(legacy_candidates) == 1:
printer = legacy_candidates[0]
printer.serial_number = device_id
if printer is None:
printer = Printer(serial_number=device_id)
printer.name = name
printer.model = model
printer.manufacturer = "Bambu Lab"
printer.is_active = True
printer.save()
return printer
@dataclass
class DeviceSession:
"""Per-printer mutable state for one bound device in a multi-printer collector run."""
device_id: str
client: Any # BambuPrinter
printer: Printer
current_print_job: Optional[Any] = None
last_gcode_state: Optional[str] = None
last_subtask_name: Optional[str] = None
trays_used: set = field(default_factory=set)
error_count: int = 0
success_count: int = 0
mqtt_connect_errors: int = 0
class Command(BaseCommand):
"""
MQTT Poll -> PrinterMetrics -> FilamentSnapshot -> Auto-Match -> Update Filament
@@ -51,18 +102,11 @@ class Command(BaseCommand):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.printer_client = None
self.printer_device = None
self.sessions: Dict[str, DeviceSession] = {}
self._token: Optional[str] = None
self.verbose = False
self.disable_ssl_verify = False
self.error_count = 0
self.success_count = 0
self.mqtt_connect_errors = 0
self.start_time = None
self.current_print_job = None
self.last_gcode_state = None
self.last_subtask_name = None
self.trays_used = set()
def handle(self, *args, **options):
self.verbose = options["verbose"]
@@ -100,12 +144,13 @@ class Command(BaseCommand):
self._configure_logging()
try:
self._initialize_printer()
self._initialize_printers()
except Exception as e:
raise CommandError(f"Initialization failed: {e}")
self.start_time = timezone.now()
logger.info(f"Bambu Run data collector started for printer: {self.printer_device.name}")
printer_names = ", ".join(s.printer.name for s in self.sessions.values())
logger.info(f"Bambu Run data collector started for {len(self.sessions)} printer(s): {printer_names}")
logger.info(f"Collection interval: {interval} seconds")
logger.info(f"Mode: {'Single run' if run_once else 'Continuous'}")
@@ -113,7 +158,8 @@ class Command(BaseCommand):
if run_once:
import time as _time
_time.sleep(5)
self._collect_printer_data()
for session in self.sessions.values():
self._collect_printer_data(session)
logger.info("Single collection completed successfully")
else:
self._run_continuous_loop(interval)
@@ -124,7 +170,7 @@ class Command(BaseCommand):
logger.exception(f"Fatal error in main loop: {e}")
raise CommandError(f"Runner failed: {e}")
def _request_full_status_when_ready(self, timeout: float = 20.0) -> None:
def _request_full_status_when_ready(self, client, timeout: float = 20.0) -> None:
"""Send pushall once the MQTT broker connection is confirmed.
BambuPrinter._connected is set True immediately after connect(blocking=False),
@@ -134,9 +180,9 @@ class Command(BaseCommand):
import time as _time
deadline = _time.time() + timeout
while _time.time() < deadline:
mqtt_client = getattr(self.printer_client, "_mqtt", None)
mqtt_client = getattr(client, "_mqtt", None)
if mqtt_client is not None and getattr(mqtt_client, "connected", False):
self.printer_client._mqtt.request_full_status()
client._mqtt.request_full_status()
logger.info("Sent MQTT pushall request")
return
_time.sleep(0.5)
@@ -155,7 +201,9 @@ class Command(BaseCommand):
handler.setFormatter(formatter)
logger.addHandler(handler)
def _initialize_printer(self):
def _initialize_printers(self):
"""Authenticate once, discover every device bound to the account, and open
one BambuPrinter (own MQTT thread) per device — all in this single process."""
from bambu_run.mqtt_client import BambuPrinter
bambu_username = os.environ.get("BAMBU_USERNAME")
@@ -169,30 +217,12 @@ class Command(BaseCommand):
"environment variables must be set"
)
logger.info("Connecting to Bambu Lab printer...")
logger.info("Authenticating with Bambu Lab cloud...")
try:
if bambu_token:
logger.info("Using saved BAMBU_TOKEN for authentication")
self.printer_client = BambuPrinter(
token=bambu_token, device_id=bambu_device_id
)
else:
logger.info("Authenticating with username/password")
self.printer_client = BambuPrinter(
username=bambu_username,
password=bambu_password,
device_id=bambu_device_id,
)
logger.info("Initiating MQTT connection...")
self.printer_client.connect(blocking=False)
logger.info("MQTT connection initiated (non-blocking)")
# Request full status so AMS + dual-nozzle data arrive on startup.
try:
self._request_full_status_when_ready()
except Exception as e:
logger.warning("pushall request skipped (non-fatal): %s", e)
auth = BambuPrinter(
username=bambu_username, password=bambu_password, token=bambu_token,
)
self._token = auth._ensure_token()
except Exception as e:
if "CERTIFICATE_VERIFY_FAILED" in str(e) or "SSL" in str(e):
error_msg = (
@@ -203,56 +233,62 @@ class Command(BaseCommand):
"3. pip install --upgrade certifi\n"
)
raise CommandError(error_msg)
raise CommandError(f"Failed to initialize printer client: {e}")
raise CommandError(f"Failed to authenticate: {e}")
self.printer_device = self._ensure_printer_device_exists()
logger.info(f"Initialized for printer device: {self.printer_device}")
def _ensure_printer_device_exists(self) -> Printer:
try:
snapshot = self.printer_client.get_snapshot()
if snapshot:
device, created = Printer.objects.update_or_create(
model="Bambu Lab",
defaults={
"name": "Bambu Lab Printer",
"manufacturer": "Bambu Lab",
"is_active": True,
},
)
action = "Created" if created else "Updated"
logger.info(f"{action} printer device record: {device}")
return device
else:
logger.warning("Snapshot returned None - MQTT not connected yet")
device = Printer.objects.filter(is_active=True).first()
if device:
logger.info(f"Using existing device record: {device}")
return device
else:
device = Printer.objects.create(
name="Bambu Lab Printer",
model="Bambu Lab",
manufacturer="Bambu Lab",
is_active=True,
)
logger.info(f"Created placeholder device: {device}")
return device
except Exception as e:
logger.error(f"Error during device initialization: {e}")
device_infos = self._discover_devices(bambu_device_id)
for device_id, device_info in device_infos.items():
try:
device = Printer.objects.filter(is_active=True).first()
if device:
logger.warning(f"Using existing device record from DB: {device}")
return device
else:
raise CommandError(
"No printer device found in database and initialization failed."
)
except Printer.DoesNotExist:
raise CommandError("Failed to create or retrieve printer device.")
self._add_session(device_id, device_info)
except Exception as e:
logger.error(f"Failed to initialize printer {device_id}: {e}")
if not self.sessions:
raise CommandError("No printer sessions could be initialized")
def _discover_devices(self, explicit_device_id: Optional[str]) -> Dict[str, dict]:
"""Return {device_id: device_info} for every printer to monitor.
device_info comes from BambuClient.get_devices() (name, dev_product_name,
etc.) — empty dict when explicitly pinned to one device via BAMBU_DEVICE_ID
and the cloud listing can't be reached.
"""
from bambu_run.mqtt_client import BambuClient
try:
cloud = BambuClient(token=self._token)
devices = cloud.get_devices()
except Exception as e:
if explicit_device_id:
logger.warning(f"Could not list account devices ({e}); using BAMBU_DEVICE_ID only")
return {explicit_device_id: {}}
raise
device_infos = {d.get("dev_id"): d for d in devices if d.get("dev_id")}
if explicit_device_id:
return {explicit_device_id: device_infos.get(explicit_device_id, {})}
if not device_infos:
raise CommandError("No devices found on this account")
return device_infos
def _add_session(self, device_id: str, device_info: dict) -> "DeviceSession":
from bambu_run.mqtt_client import BambuPrinter
logger.info(f"Connecting to printer {device_id} ({device_info.get('name', 'unknown')})...")
client = BambuPrinter(token=self._token, device_id=device_id)
client.connect(blocking=False)
try:
self._request_full_status_when_ready(client)
except Exception as e:
logger.warning("pushall request skipped (non-fatal): %s", e)
printer = resolve_printer_device(device_id, device_info)
session = DeviceSession(device_id=device_id, client=client, printer=printer)
self.sessions[device_id] = session
logger.info(f"Initialized session for printer: {printer}")
return session
def _run_continuous_loop(self, interval: int):
iteration = 0
@@ -263,7 +299,8 @@ class Command(BaseCommand):
if self.verbose:
logger.debug(f"=== Iteration {iteration} ===")
self._collect_printer_data()
for session in list(self.sessions.values()):
self._collect_printer_data(session)
elapsed = time.time() - loop_start
sleep_time = max(0, interval - elapsed)
@@ -273,9 +310,28 @@ class Command(BaseCommand):
if iteration % 100 == 0:
self._print_statistics()
self._refresh_devices()
time.sleep(sleep_time)
def _refresh_devices(self):
"""Pick up printers added to the account without restarting the process."""
if os.environ.get("BAMBU_DEVICE_ID"):
return # pinned to a single explicit device — nothing to discover
try:
device_infos = self._discover_devices(None)
except Exception as e:
logger.warning(f"Device refresh skipped (non-fatal): {e}")
return
for device_id, device_info in device_infos.items():
if device_id not in self.sessions:
logger.info(f"New printer detected on account: {device_id}")
try:
self._add_session(device_id, device_info)
except Exception as e:
logger.error(f"Failed to initialize newly-detected printer {device_id}: {e}")
def _convert_mqtt_color(self, mqtt_color):
if not mqtt_color:
return None
@@ -513,19 +569,19 @@ class Command(BaseCommand):
match_method=match_method
)
def _track_print_job(self, metric, snapshot):
from bambu_run.models import PrintJob, FilamentUsage
def _track_print_job(self, session, metric, snapshot):
from bambu_run.models import PrintJob
gcode_state = snapshot.get('gcode_state')
subtask_name = snapshot.get('subtask_name')
if self._is_print_starting(gcode_state, subtask_name):
if self.current_print_job:
self._finalize_print_job(metric, snapshot)
if self._is_print_starting(session, gcode_state, subtask_name):
if session.current_print_job:
self._finalize_print_job(session, metric, snapshot)
raw_task_id = snapshot.get('task_id')
self.current_print_job = PrintJob.objects.create(
device=self.printer_device,
session.current_print_job = PrintJob.objects.create(
device=session.printer,
project_name=subtask_name,
gcode_file=snapshot.get('gcode_file'),
start_time=metric.timestamp,
@@ -534,57 +590,58 @@ class Command(BaseCommand):
completion_percent=snapshot.get('print_percent', 0),
cloud_task_id_raw=int(raw_task_id) if raw_task_id else None,
)
self.trays_used = set()
logger.info(f"Print job started: {subtask_name}")
session.trays_used = set()
logger.info(f"[{session.device_id}] Print job started: {subtask_name}")
if self.current_print_job:
if session.current_print_job:
tray_now = snapshot.get('tray_now', '')
if tray_now not in (None, '', '255'):
try:
tray_id = int(tray_now)
if 0 <= tray_id <= 15:
self.trays_used.add(tray_id)
session.trays_used.add(tray_id)
except (ValueError, TypeError):
pass
if self._is_print_ending(gcode_state) and self.current_print_job:
self._finalize_print_job(metric, snapshot)
if self._is_print_ending(session, gcode_state) and session.current_print_job:
self._finalize_print_job(session, metric, snapshot)
self.last_gcode_state = gcode_state
self.last_subtask_name = subtask_name
session.last_gcode_state = gcode_state
session.last_subtask_name = subtask_name
def _is_print_starting(self, gcode_state, subtask_name):
def _is_print_starting(self, session, gcode_state, subtask_name):
is_printing = gcode_state not in ['FINISH', 'IDLE', 'FAILED', None, '']
has_new_job = subtask_name and subtask_name != self.last_subtask_name
has_new_job = subtask_name and subtask_name != session.last_subtask_name
return is_printing and has_new_job
def _is_print_ending(self, gcode_state):
def _is_print_ending(self, session, gcode_state):
ending_states = ['FINISH', 'FAILED']
return gcode_state in ending_states and self.last_gcode_state not in ending_states
return gcode_state in ending_states and session.last_gcode_state not in ending_states
def _finalize_print_job(self, metric, snapshot):
def _finalize_print_job(self, session, metric, snapshot):
from bambu_run.models import FilamentUsage
self.current_print_job.end_time = metric.timestamp
self.current_print_job.end_metric = metric
self.current_print_job.final_status = snapshot.get('gcode_state')
self.current_print_job.completion_percent = snapshot.get('print_percent', 0)
self.current_print_job.calculate_duration()
self.current_print_job.save()
job = session.current_print_job
job.end_time = metric.timestamp
job.end_metric = metric
job.final_status = snapshot.get('gcode_state')
job.completion_percent = snapshot.get('print_percent', 0)
job.calculate_duration()
job.save()
try:
from bambu_run.bambu_cloud import fetch_and_upsert_task
fetch_and_upsert_task(self.printer_client._client, self.current_print_job)
fetch_and_upsert_task(session.client._client, job)
except Exception as e:
logger.warning(f"Cloud task sync skipped (non-fatal): {e}")
start_metric = self.current_print_job.start_metric
start_metric = job.start_metric
if not start_metric:
logger.warning(f"No start_metric for job {self.current_print_job.id}, skipping filament usage")
elif not self.trays_used:
logger.warning(f"No trays tracked for job {self.current_print_job.project_name}, skipping filament usage")
logger.warning(f"No start_metric for job {job.id}, skipping filament usage")
elif not session.trays_used:
logger.warning(f"No trays tracked for job {job.project_name}, skipping filament usage")
else:
for tray_id in self.trays_used:
for tray_id in session.trays_used:
start_snap = start_metric.filament_snapshots.filter(
tray_id=tray_id, filament__isnull=False
).first()
@@ -596,12 +653,12 @@ class Command(BaseCommand):
).first()
usage = FilamentUsage.objects.create(
print_job=self.current_print_job,
print_job=job,
filament=start_snap.filament,
tray_id=tray_id,
starting_percent=start_snap.remain_percent or 100,
ending_percent=end_snap.remain_percent if end_snap else None,
is_primary=(len(self.trays_used) == 1),
is_primary=(len(session.trays_used) == 1),
)
usage.calculate_consumed()
usage.save()
@@ -613,30 +670,30 @@ class Command(BaseCommand):
)
logger.info(
f"Print job finished: {self.current_print_job.project_name} "
f"({self.current_print_job.final_status}) - Duration: {self.current_print_job.duration_minutes} min, "
f"Trays used: {sorted(self.trays_used) if self.trays_used else 'none tracked'}"
f"[{session.device_id}] Print job finished: {job.project_name} "
f"({job.final_status}) - Duration: {job.duration_minutes} min, "
f"Trays used: {sorted(session.trays_used) if session.trays_used else 'none tracked'}"
)
self.current_print_job = None
self.trays_used = set()
session.current_print_job = None
session.trays_used = set()
def _collect_printer_data(self):
def _collect_printer_data(self, session: "DeviceSession"):
try:
snapshot = self.printer_client.get_snapshot()
snapshot = session.client.get_snapshot()
if snapshot is None:
self.mqtt_connect_errors += 1
if self.mqtt_connect_errors <= 5 or self.verbose:
session.mqtt_connect_errors += 1
if session.mqtt_connect_errors <= 5 or self.verbose:
logger.warning(
f"MQTT not connected yet or no data available "
f"(attempt {self.mqtt_connect_errors})"
f"[{session.device_id}] MQTT not connected yet or no data available "
f"(attempt {session.mqtt_connect_errors})"
)
return
with transaction.atomic():
metric = PrinterMetrics.objects.create(
device=self.printer_device,
device=session.printer,
timestamp=timezone.now(),
nozzle_temp=self._to_decimal(snapshot.get("nozzle_temp")),
nozzle_target_temp=self._to_decimal(snapshot.get("nozzle_target_temp")),
@@ -688,27 +745,28 @@ class Command(BaseCommand):
ams_units=snapshot.get("ams_units", []),
external_spool=snapshot.get("external_spool", {}),
lights_report=snapshot.get("lights_report", []),
vortek_raw=snapshot.get("vortek_raw", {}),
)
filaments_data = snapshot.get('filaments', [])
if filaments_data:
self._create_filament_snapshots(metric, filaments_data, snapshot)
self._track_print_job(metric, snapshot)
self._track_print_job(session, metric, snapshot)
self.success_count += 1
session.success_count += 1
if self.verbose:
logger.debug(
f"Printer Metrics: Nozzle={snapshot.get('nozzle_temp')}C, "
f"[{session.device_id}] Printer Metrics: Nozzle={snapshot.get('nozzle_temp')}C, "
f"Bed={snapshot.get('bed_temp')}C, "
f"Progress={snapshot.get('print_percent')}%, "
f"State={snapshot.get('gcode_state')}"
)
except Exception as e:
self.error_count += 1
logger.error(f"Error collecting printer data (total errors: {self.error_count}): {e}")
session.error_count += 1
logger.error(f"[{session.device_id}] Error collecting printer data (total errors: {session.error_count}): {e}")
if self.verbose:
logger.exception("Detailed traceback:")
@@ -723,16 +781,20 @@ class Command(BaseCommand):
def _print_statistics(self):
if self.start_time:
runtime = timezone.now() - self.start_time
total_collections = self.success_count + self.error_count
success_count = sum(s.success_count for s in self.sessions.values())
error_count = sum(s.error_count for s in self.sessions.values())
mqtt_connect_errors = sum(s.mqtt_connect_errors for s in self.sessions.values())
total_collections = success_count + error_count
success_rate = (
(self.success_count / total_collections * 100)
(success_count / total_collections * 100)
if total_collections > 0
else 0
)
logger.info("=== Statistics ===")
logger.info(f"Runtime: {runtime}")
logger.info(f"Successful collections: {self.success_count}")
logger.info(f"Failed collections: {self.error_count}")
logger.info(f"MQTT connection warnings: {self.mqtt_connect_errors}")
logger.info(f"Printers tracked: {len(self.sessions)}")
logger.info(f"Successful collections: {success_count}")
logger.info(f"Failed collections: {error_count}")
logger.info(f"MQTT connection warnings: {mqtt_connect_errors}")
logger.info(f"Success rate: {success_rate:.1f}%")

View File

@@ -0,0 +1,22 @@
# Generated by Django 5.2.8 on 2026-06-18 12:20
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("bambu_run", "0004_h2c_dual_nozzle_and_ams_fields"),
]
operations = [
migrations.AddField(
model_name="printermetrics",
name="vortek_raw",
field=models.JSONField(
blank=True,
default=dict,
help_text="Raw print.device MQTT payload (Vortek rack groundwork)",
),
),
]

View File

@@ -231,6 +231,14 @@ class PrinterMetrics(models.Model):
default=list, help_text="Light status report [{node, mode}]"
)
# Groundwork for H2C's Vortek nozzle-changer rack (6 swappable hotends + 1 fixed
# left nozzle) — the full MQTT schema for per-slot state isn't confirmed yet, so
# the raw `print.device` payload is captured here unfiltered to avoid losing data
# ahead of proper per-slot modeling.
vortek_raw = models.JSONField(
default=dict, blank=True, help_text="Raw print.device MQTT payload (Vortek rack groundwork)"
)
class Meta:
db_table = "infrastructure_printer_metrics"
verbose_name = "Printer Metric"

View File

@@ -529,6 +529,10 @@ class PrinterState:
"wifi_signal_dbm": self.wifi_signal_dbm,
"print_error": self.print_error,
"has_errors": self.print_error != 0,
# Full `print.device` payload, unfiltered. H2C's Vortek rack (6 swappable
# hotends + 1 fixed left nozzle) isn't fully modeled yet — stash everything
# here so no data is lost once the real Vortek MQTT schema is confirmed.
"vortek_raw": self._raw_data.get("print", {}).get("device", {}),
"hms": self.hms,
"stg_cur": self.stg_cur,
"lights_report": self.lights_report,

View File

@@ -14,6 +14,19 @@
Real-time monitoring for {{ device_name }}
</p>
</div>
{% if show_printer_switcher %}
<div class="col-auto">
<select class="form-select" aria-label="Select printer"
onchange="if (this.value) { window.location.href = this.value; }">
{% for p in all_printers %}
<option value="{% url 'bambu_run:printer_dashboard' pk=p.pk %}"
{% if printer_device.pk == p.pk %}selected{% endif %}>
{{ p.name }}
</option>
{% endfor %}
</select>
</div>
{% endif %}
</div>
{% if error %}
@@ -423,12 +436,12 @@
<script src="https://cdn.jsdelivr.net/npm/chartjs-plugin-annotation@3.0.1"></script>
<script src="{% static 'bambu_run/js/printer_charts.js' %}"></script>
<script src="{% static 'bambu_run/js/printer_charts_control.js' %}"></script>
{% if not is_basic_user %}
<div id="printerApiUrl" data-url="{% url 'bambu_run:printer_api' %}" style="display: none;"></div>
{% if not is_basic_user and printer_device %}
<div id="printerApiUrl" data-url="{% url 'bambu_run:printer_api' pk=printer_device.pk %}" style="display: none;"></div>
<script>
document.addEventListener('DOMContentLoaded', function() {
const printerData = {{ printer_data_json|safe }};
const apiUrl = '{% url "bambu_run:printer_api" %}';
const apiUrl = '{% url "bambu_run:printer_api" pk=printer_device.pk %}';
initPrinterCharts(printerData, apiUrl);
// Add project markers if they exist

View File

@@ -5,7 +5,9 @@ app_name = "bambu_run"
urlpatterns = [
path("", views.PrinterDashboardView.as_view(), name="printer_dashboard"),
path("printer/<int:pk>/", views.PrinterDashboardView.as_view(), name="printer_dashboard"),
path("api/printer/", views.PrinterDataAPIView.as_view(), name="printer_api"),
path("api/printer/<int:pk>/", views.PrinterDataAPIView.as_view(), name="printer_api"),
# Filament Inventory routes
path("filaments/", views.FilamentListView.as_view(), name="filament_list"),

View File

@@ -1,8 +1,9 @@
from datetime import timedelta, datetime
from django.views.generic import TemplateView, View, ListView, CreateView, UpdateView, DetailView, DeleteView
from django.contrib.auth.mixins import LoginRequiredMixin
from django.shortcuts import get_object_or_404
from django.utils import timezone
from django.http import JsonResponse
from django.http import Http404, JsonResponse
from django.urls import reverse_lazy
from django.contrib import messages
from django.db.models import Q, Sum
@@ -26,6 +27,17 @@ _METRICS_API_FIELDS = [
_MAX_CHART_POINTS = 3000
def resolve_printer_from_request(pk):
"""Resolve which Printer a dashboard/API view should show.
`pk` given (URL kwarg) -> that exact printer, 404 if missing/inactive.
`pk` omitted -> first active printer (today's single-printer default behavior).
"""
if pk is not None:
return get_object_or_404(Printer, pk=pk, is_active=True)
return Printer.objects.filter(is_active=True).first()
class PrinterDashboardView(LoginRequiredMixin, TemplateView):
template_name = "bambu_run/printer_dashboard.html"
@@ -38,13 +50,19 @@ class PrinterDashboardView(LoginRequiredMixin, TemplateView):
context = super().get_context_data(**kwargs)
context['bambu_run_base_template'] = app_settings.BASE_TEMPLATE
all_printers = Printer.objects.filter(is_active=True)
context["all_printers"] = all_printers
context["show_printer_switcher"] = all_printers.count() > 1
try:
printer_device = Printer.objects.filter(is_active=True).first()
printer_device = resolve_printer_from_request(self.kwargs.get("pk"))
if not printer_device:
context["error"] = (
"No 3D printer device found. Please run bambu_collector first."
)
return context
except Http404:
raise
except Exception as e:
context["error"] = f"Error loading printer device: {str(e)}"
return context
@@ -304,16 +322,21 @@ class PrinterDashboardView(LoginRequiredMixin, TemplateView):
class PrinterDataAPIView(LoginRequiredMixin, View):
"""API endpoint for dynamic printer chart updates"""
def get(self, request):
def get(self, request, pk=None):
start_date = request.GET.get("start_date")
end_date = request.GET.get("end_date")
start_time = request.GET.get("start_time", "00:00")
end_time = request.GET.get("end_time", "23:59")
try:
printer_device = Printer.objects.filter(is_active=True).first()
if not printer_device:
return JsonResponse({"error": "No printer device found"}, status=404)
if pk is not None:
printer_device = Printer.objects.filter(pk=pk, is_active=True).first()
if not printer_device:
return JsonResponse({"error": "Printer not found"}, status=404)
else:
printer_device = Printer.objects.filter(is_active=True).first()
if not printer_device:
return JsonResponse({"error": "No printer device found"}, status=404)
tz = zoneinfo.ZoneInfo(app_settings.TIMEZONE)

View File

@@ -62,3 +62,7 @@ bambu_run = [
"templates/bambu_run/*.html",
"static/bambu_run/**/*",
]
[tool.pytest.ini_options]
DJANGO_SETTINGS_MODULE = "tests.settings"
python_files = "test_*.py"

0
tests/__init__.py Normal file
View File

40
tests/settings.py Normal file
View File

@@ -0,0 +1,40 @@
"""Minimal Django settings for running bambu_run's pytest suite (in-memory SQLite)."""
SECRET_KEY = "test-secret-key"
INSTALLED_APPS = [
"django.contrib.auth",
"django.contrib.contenttypes",
"django.contrib.sessions",
"bambu_run",
]
MIDDLEWARE = [
"django.contrib.sessions.middleware.SessionMiddleware",
"django.contrib.auth.middleware.AuthenticationMiddleware",
]
TEMPLATES = [
{
"BACKEND": "django.template.backends.django.DjangoTemplates",
"APP_DIRS": True,
"OPTIONS": {
"context_processors": [
"django.template.context_processors.request",
"django.contrib.auth.context_processors.auth",
],
},
},
]
DATABASES = {
"default": {
"ENGINE": "django.db.backends.sqlite3",
"NAME": ":memory:",
}
}
USE_TZ = True
DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField"
ROOT_URLCONF = "tests.urls"

View File

@@ -0,0 +1,90 @@
import pytest
from bambu_run.management.commands.bambu_collector import (
Command,
DeviceSession,
resolve_printer_device,
)
from bambu_run.models import PrintJob, PrinterMetrics
class FakeClient:
"""Stub in place of BambuPrinter — returns canned snapshots, no real MQTT."""
def __init__(self, snapshots):
self._snapshots = snapshots
self._index = 0
self._client = None # cloud BambuClient handle used by cloud task sync
def get_snapshot(self):
snap = self._snapshots[min(self._index, len(self._snapshots) - 1)]
self._index += 1
return snap
def make_session(device_id, name, snapshots):
printer = resolve_printer_device(device_id, {"name": name, "dev_product_name": "H2C"})
return DeviceSession(device_id=device_id, client=FakeClient(snapshots), printer=printer)
@pytest.mark.django_db
def test_collects_metrics_against_the_correct_printer_per_session():
session_a = make_session("SERIAL-A", "Printer A", [{"nozzle_temp": 200, "gcode_state": "IDLE"}])
session_b = make_session("SERIAL-B", "Printer B", [{"nozzle_temp": 210, "gcode_state": "IDLE"}])
cmd = Command()
cmd.verbose = False
cmd._collect_printer_data(session_a)
cmd._collect_printer_data(session_b)
metric_a = PrinterMetrics.objects.get(device=session_a.printer)
metric_b = PrinterMetrics.objects.get(device=session_b.printer)
assert metric_a.nozzle_temp == 200
assert metric_b.nozzle_temp == 210
@pytest.mark.django_db
def test_print_job_tracking_is_isolated_per_session():
session_a = make_session(
"SERIAL-A",
"Printer A",
[
{"gcode_state": "RUNNING", "subtask_name": "job_A", "print_percent": 10},
{"gcode_state": "FINISH", "subtask_name": "job_A", "print_percent": 100},
],
)
session_b = make_session("SERIAL-B", "Printer B", [{"gcode_state": "IDLE"}])
cmd = Command()
cmd.verbose = False
cmd._collect_printer_data(session_a)
cmd._collect_printer_data(session_b)
cmd._collect_printer_data(session_a)
assert PrintJob.objects.filter(device=session_a.printer).count() == 1
job = PrintJob.objects.get(device=session_a.printer)
assert job.final_status == "FINISH"
assert session_a.current_print_job is None
assert PrintJob.objects.filter(device=session_b.printer).count() == 0
assert session_b.current_print_job is None
@pytest.mark.django_db
def test_one_session_error_does_not_affect_another_session():
session_a = make_session("SERIAL-A", "Printer A", [{"nozzle_temp": 200, "gcode_state": "IDLE"}])
session_b = make_session("SERIAL-B", "Printer B", [{"nozzle_temp": 210, "gcode_state": "IDLE"}])
class ExplodingClient:
def get_snapshot(self):
raise RuntimeError("MQTT connection lost")
session_a.client = ExplodingClient()
cmd = Command()
cmd.verbose = False
cmd._collect_printer_data(session_a)
cmd._collect_printer_data(session_b)
assert session_a.error_count == 1
assert PrinterMetrics.objects.filter(device=session_b.printer).exists()

View File

@@ -0,0 +1,69 @@
import pytest
from django.urls import reverse
from bambu_run.models import Printer
@pytest.fixture
def logged_in_client(client, django_user_model):
user = django_user_model.objects.create_user(username="tester", password="pw")
client.force_login(user)
return client
@pytest.mark.django_db
def test_dashboard_with_no_printers_shows_error(logged_in_client):
resp = logged_in_client.get(reverse("bambu_run:printer_dashboard"))
assert resp.status_code == 200
assert "error" in resp.context
@pytest.mark.django_db
def test_dashboard_defaults_to_first_active_printer(logged_in_client):
printer = Printer.objects.create(name="Only Printer", model="H2C", is_active=True)
resp = logged_in_client.get(reverse("bambu_run:printer_dashboard"))
assert resp.context["printer_device"].pk == printer.pk
assert list(resp.context["all_printers"]) == [printer]
@pytest.mark.django_db
def test_dashboard_pk_route_shows_requested_printer(logged_in_client):
Printer.objects.create(name="Printer A", model="H2C", is_active=True)
printer_b = Printer.objects.create(name="Printer B", model="X1C", is_active=True)
resp = logged_in_client.get(
reverse("bambu_run:printer_dashboard", kwargs={"pk": printer_b.pk})
)
assert resp.context["printer_device"].pk == printer_b.pk
assert resp.context["device_name"] == "Printer B"
@pytest.mark.django_db
def test_dashboard_unknown_pk_returns_404(logged_in_client):
resp = logged_in_client.get(
reverse("bambu_run:printer_dashboard", kwargs={"pk": 99999})
)
assert resp.status_code == 404
@pytest.mark.django_db
def test_api_pk_route_returns_only_requested_printer_data(logged_in_client):
from bambu_run.models import PrinterMetrics
from django.utils import timezone
from decimal import Decimal
printer_a = Printer.objects.create(name="Printer A", model="H2C", is_active=True)
printer_b = Printer.objects.create(name="Printer B", model="X1C", is_active=True)
PrinterMetrics.objects.create(device=printer_a, timestamp=timezone.now(), nozzle_temp=Decimal("200"))
PrinterMetrics.objects.create(device=printer_b, timestamp=timezone.now(), nozzle_temp=Decimal("210"))
resp = logged_in_client.get(
reverse("bambu_run:printer_api", kwargs={"pk": printer_b.pk})
)
assert resp.status_code == 200
data = resp.json()
assert data["nozzle_temp"] == [210.0]

View File

@@ -0,0 +1,78 @@
import pytest
from bambu_run.management.commands.bambu_collector import resolve_printer_device
from bambu_run.models import Printer
@pytest.mark.django_db
def test_creates_new_printer_keyed_by_serial():
printer = resolve_printer_device(
"0309DA123456", {"name": "RNL-H2C", "dev_product_name": "H2C"}
)
assert printer.serial_number == "0309DA123456"
assert printer.name == "RNL-H2C"
assert printer.model == "H2C"
assert printer.is_active is True
@pytest.mark.django_db
def test_second_call_with_same_serial_does_not_create_duplicate():
first = resolve_printer_device("SERIAL-A", {"name": "Printer A", "dev_product_name": "H2C"})
second = resolve_printer_device("SERIAL-A", {"name": "Printer A", "dev_product_name": "H2C"})
assert first.pk == second.pk
assert Printer.objects.filter(serial_number="SERIAL-A").count() == 1
@pytest.mark.django_db
def test_two_different_serials_create_two_printers():
a = resolve_printer_device("SERIAL-A", {"name": "Printer A", "dev_product_name": "H2C"})
b = resolve_printer_device("SERIAL-B", {"name": "Printer B", "dev_product_name": "X1C"})
assert a.pk != b.pk
assert Printer.objects.count() == 2
@pytest.mark.django_db
def test_backfills_single_legacy_printer_with_null_serial():
legacy = Printer.objects.create(
name="Bambu Lab Printer", model="Bambu Lab", manufacturer="Bambu Lab", is_active=True
)
resolved = resolve_printer_device("SERIAL-A", {"name": "RNL-H2C", "dev_product_name": "H2C"})
legacy.refresh_from_db()
assert resolved.pk == legacy.pk
assert legacy.serial_number == "SERIAL-A"
assert Printer.objects.count() == 1
@pytest.mark.django_db
def test_does_not_guess_when_multiple_legacy_printers_exist():
Printer.objects.create(name="Legacy 1", model="Bambu Lab")
Printer.objects.create(name="Legacy 2", model="Bambu Lab")
resolved = resolve_printer_device("SERIAL-A", {"name": "RNL-H2C", "dev_product_name": "H2C"})
assert resolved.serial_number == "SERIAL-A"
assert Printer.objects.count() == 3
@pytest.mark.django_db
def test_falls_back_to_generic_defaults_without_device_info():
printer = resolve_printer_device("SERIAL-A", None)
assert printer.serial_number == "SERIAL-A"
assert printer.name == "Bambu Lab Printer"
assert printer.model == "Bambu Lab"
@pytest.mark.django_db
def test_updates_name_and_model_on_existing_printer_when_changed():
resolve_printer_device("SERIAL-A", {"name": "Old Name", "dev_product_name": "H2C"})
updated = resolve_printer_device("SERIAL-A", {"name": "New Name", "dev_product_name": "H2C"})
assert updated.name == "New Name"
assert Printer.objects.filter(serial_number="SERIAL-A").count() == 1

View File

@@ -0,0 +1,43 @@
import pytest
from bambu_run.mqtt_client import PrinterState
from bambu_run.management.commands.bambu_collector import Command, DeviceSession, resolve_printer_device
from bambu_run.models import PrinterMetrics
def test_snapshot_includes_raw_device_payload_for_future_vortek_modeling():
raw_device = {
"extruder": {"info": [{"id": 0, "temp": 12058720}, {"id": 1, "temp": 11534560}]},
"nozzle": {"info": [{"id": 0, "diameter": 0.4}]},
}
data = {"print": {"device": raw_device, "gcode_state": "IDLE"}}
state = PrinterState.from_mqtt_data(data)
snapshot = state.get_snapshot()
assert snapshot["vortek_raw"] == raw_device
def test_snapshot_vortek_raw_defaults_to_empty_dict_when_no_device_payload():
state = PrinterState.from_mqtt_data({"print": {"gcode_state": "IDLE"}})
snapshot = state.get_snapshot()
assert snapshot["vortek_raw"] == {}
@pytest.mark.django_db
def test_collector_persists_vortek_raw_onto_printer_metrics():
printer = resolve_printer_device("SERIAL-A", {"name": "H2C", "dev_product_name": "H2C"})
class FakeClient:
def get_snapshot(self):
return {"gcode_state": "IDLE", "vortek_raw": {"extruder": {"info": []}}}
session = DeviceSession(device_id="SERIAL-A", client=FakeClient(), printer=printer)
cmd = Command()
cmd.verbose = False
cmd._collect_printer_data(session)
metric = PrinterMetrics.objects.get(device=printer)
assert metric.vortek_raw == {"extruder": {"info": []}}

5
tests/urls.py Normal file
View File

@@ -0,0 +1,5 @@
from django.urls import include, path
urlpatterns = [
path("", include("bambu_run.urls")),
]