from datetime import timedelta, datetime from django.views.generic import TemplateView, View, ListView, CreateView, UpdateView, DetailView, DeleteView from django.contrib.auth.mixins import LoginRequiredMixin from django.shortcuts import get_object_or_404 from django.utils import timezone from django.http import Http404, JsonResponse from django.urls import reverse_lazy from django.contrib import messages from django.db.models import Q, Sum import json import zoneinfo from .conf import app_settings from .models import Printer, PrinterMetrics, Filament, FilamentColor, FilamentType, FilamentSnapshot, PrintJob, FilamentUsage from .forms import FilamentForm, FilamentColorForm, FilamentTypeForm _METRICS_API_FIELDS = [ 'id', 'device_id', 'timestamp', 'nozzle_temp', 'nozzle_target_temp', 'bed_temp', 'bed_target_temp', 'print_percent', 'cooling_fan_speed', 'heatbreak_fan_speed', 'wifi_signal_dbm', 'ams_humidity_raw', 'ams_temp', 'layer_num', 'total_layer_num', 'gcode_state', 'print_type', 'subtask_name', 'external_spool', ] _MAX_CHART_POINTS = 3000 def resolve_printer_from_request(pk): """Resolve which Printer a dashboard/API view should show. `pk` given (URL kwarg) -> that exact printer, 404 if missing/inactive. `pk` omitted -> first active printer (today's single-printer default behavior). """ if pk is not None: return get_object_or_404(Printer, pk=pk, is_active=True) return Printer.objects.filter(is_active=True).first() class PrinterDashboardView(LoginRequiredMixin, TemplateView): template_name = "bambu_run/printer_dashboard.html" def _get_date_range(self, request): """Return (start_dt, end_dt) for the dashboard query. Override for custom date logic.""" time_24h_ago = timezone.now() - timedelta(hours=24) return time_24h_ago, None # None means "now" def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) context['bambu_run_base_template'] = app_settings.BASE_TEMPLATE all_printers = Printer.objects.filter(is_active=True) context["all_printers"] = all_printers # Shown even with a single printer — hints that multi-printer support exists. context["show_printer_switcher"] = all_printers.exists() try: printer_device = resolve_printer_from_request(self.kwargs.get("pk")) if not printer_device: context["error"] = ( "No 3D printer device found. Please run bambu_collector first." ) return context except Http404: raise except Exception as e: context["error"] = f"Error loading printer device: {str(e)}" return context tz = zoneinfo.ZoneInfo(app_settings.TIMEZONE) # Get date range (overridable by subclasses) start_dt, end_dt = self._get_date_range(self.request) metrics = PrinterMetrics.objects.filter( device=printer_device, timestamp__gte=start_dt ) if end_dt: metrics = metrics.filter(timestamp__lte=end_dt) metrics = metrics.prefetch_related('filament_snapshots').order_by("timestamp") latest_metric = metrics.last() printer_data_json = { "timestamps": [ m.timestamp.astimezone(tz).strftime("%H:%M") for m in metrics ], "dates": [ m.timestamp.astimezone(tz).strftime("%Y-%m-%d") for m in metrics ], "nozzle_temp": [ float(m.nozzle_temp) if m.nozzle_temp else None for m in metrics ], "nozzle_target_temp": [ float(m.nozzle_target_temp) if m.nozzle_target_temp else None for m in metrics ], "nozzle_temp_left": [ float(m.nozzle_temp_left) if m.nozzle_temp_left is not None else None for m in metrics ], "nozzle_target_temp_left": [ float(m.nozzle_target_temp_left) if m.nozzle_target_temp_left is not None else None for m in metrics ], "bed_temp": [float(m.bed_temp) if m.bed_temp else None for m in metrics], "bed_target_temp": [ float(m.bed_target_temp) if m.bed_target_temp else None for m in metrics ], "print_percent": [ m.print_percent if m.print_percent else 0 for m in metrics ], "print_type": [m.print_type for m in metrics], "gcode_state": [m.gcode_state for m in metrics], "cooling_fan_speed": [ m.cooling_fan_speed if m.cooling_fan_speed else 0 for m in metrics ], "heatbreak_fan_speed": [ m.heatbreak_fan_speed if m.heatbreak_fan_speed else 0 for m in metrics ], "wifi_signal_dbm": [ m.wifi_signal_dbm if m.wifi_signal_dbm else None for m in metrics ], "ams_humidity_raw": [ m.ams_humidity_raw if m.ams_humidity_raw else None for m in metrics ], "ams_temp": [ float(m.ams_temp) if m.ams_temp else None for m in metrics ], "layer_num": [ m.layer_num if m.layer_num else 0 for m in metrics ], "total_layer_num": [ m.total_layer_num if m.total_layer_num else 0 for m in metrics ], "filament_timeline": self._prepare_filament_timeline(metrics), } stats = {} if latest_metric: filaments_list = [] try: filament_snapshots = latest_metric.filament_snapshots.select_related('filament').all() for snapshot in filament_snapshots: filament_dict = { 'tray_id': snapshot.tray_id, 'type': snapshot.type or 'Unknown', 'brand': snapshot.sub_type or 'Unknown', 'color': snapshot.color or 'FFFFFFFF', 'remain_percent': snapshot.remain_percent or 0, 'ams_unit_id': snapshot.ams_unit_id, 'ams_type': snapshot.ams_type or '', } if snapshot.filament: filament_dict['color_name'] = snapshot.filament.color filament_dict['filament_pk'] = snapshot.filament.pk filament_dict['is_transparent'] = snapshot.filament.is_transparent filaments_list.append(filament_dict) except Exception: filaments_list = [] # Distinct AMS units represented in this snapshot, for the unit # filter/badges in the template. Sort numeric unit ids first # (AMS / AMS 2 Pro), HT (id 128 / bit 0x80 set) last. seen_units = {} for f in filaments_list: uid = f.get('ams_unit_id') if uid is not None and uid not in seen_units: seen_units[uid] = f.get('ams_type') or '' ams_units_list = [ {'ams_unit_id': uid, 'ams_type': label} for uid, label in sorted(seen_units.items()) ] # Group trays by physical AMS unit for the panel-style dashboard layout — # one tinted panel per unit, full-width for multi-slot units (AMS/AMS 2 Pro), # compact for single-slot units (AMS HT) so several can flow side-by-side. units_meta = { u.get('unit_id'): u for u in (latest_metric.ams_units or []) } ams_groups = [] for uid, label in sorted(seen_units.items()): unit_meta = units_meta.get(str(uid), {}) ams_groups.append({ 'unit_id': uid, 'ams_type': label, 'label': f"{label or 'AMS'} (Unit {uid})", 'humidity': unit_meta.get('humidity'), 'temp': unit_meta.get('temp'), 'filaments': [f for f in filaments_list if f.get('ams_unit_id') == uid], }) subtask_name = latest_metric.subtask_name or "No active print" # Look up active PrintJob for a better display name (cloud design_title) job_display_name = subtask_name if latest_metric.subtask_name: active_job = ( PrintJob.objects.filter( device=printer_device, project_name=latest_metric.subtask_name, end_time__isnull=True, ).select_related('cloud_task').first() or PrintJob.objects.filter( device=printer_device, project_name=latest_metric.subtask_name, ).select_related('cloud_task').order_by('-start_time').first() ) if active_job: job_display_name = active_job.display_name stats = { "nozzle_temp": float(latest_metric.nozzle_temp) if latest_metric.nozzle_temp else 0, "nozzle_target_temp": float(latest_metric.nozzle_target_temp) if latest_metric.nozzle_target_temp else 0, "nozzle_diameter": float(latest_metric.nozzle_diameter) if latest_metric.nozzle_diameter else None, "nozzle_type": latest_metric.nozzle_type or "", "nozzle_temp_left": float(latest_metric.nozzle_temp_left) if latest_metric.nozzle_temp_left is not None else None, "nozzle_target_temp_left": float(latest_metric.nozzle_target_temp_left) if latest_metric.nozzle_target_temp_left is not None else None, "nozzle_diameter_left": float(latest_metric.nozzle_diameter_left) if latest_metric.nozzle_diameter_left is not None else None, "nozzle_type_left": latest_metric.nozzle_type_left or "", "is_dual_nozzle": latest_metric.nozzle_temp_left is not None, "bed_temp": float(latest_metric.bed_temp) if latest_metric.bed_temp else 0, "chamber_temp": float(latest_metric.chamber_temp) if latest_metric.chamber_temp else 0, "print_percent": latest_metric.print_percent or 0, "gcode_state": latest_metric.gcode_state or "Unknown", "print_type": latest_metric.print_type or "idle", "subtask_name": subtask_name, "job_display_name": job_display_name, "chamber_light": latest_metric.chamber_light or "unknown", "ams_temp": float(latest_metric.ams_temp) if latest_metric.ams_temp else None, "ams_humidity": latest_metric.ams_humidity, "filaments": filaments_list, "ams_units": ams_units_list, "ams_groups": ams_groups, "external_spool": latest_metric.external_spool or {}, "timestamp": latest_metric.timestamp.astimezone(tz).strftime("%Y-%m-%d %H:%M:%S"), } project_markers = self._calculate_project_markers(list(metrics), tz) printer_data_json["project_markers"] = project_markers context["printer_device"] = printer_device context["device_name"] = printer_device.name context["stats"] = stats context["metrics_count"] = metrics.count() context["printer_data_json"] = json.dumps(printer_data_json) return context def _calculate_project_markers(self, metrics, timezone_info): """Calculate where print jobs start and end, using cloud design_title when available.""" if not metrics: return [] # Build a lookup: subtask_name -> display_name from PrintJobs in this time window window_start = metrics[0].timestamp window_end = metrics[-1].timestamp device = metrics[0].device jobs_qs = PrintJob.objects.filter( device=device, start_time__gte=window_start - timedelta(minutes=5), start_time__lte=window_end + timedelta(minutes=5), ).select_related('cloud_task') # Map project_name (= subtask_name) -> best display name subtask_to_display = {} for job in jobs_qs: subtask_to_display[job.project_name] = job.display_name markers = [] current_job = None last_state = None for idx, metric in enumerate(metrics): subtask = metric.subtask_name gcode_state = metric.gcode_state is_printing = gcode_state not in ['FINISH', 'IDLE', None, ''] if subtask and subtask != current_job and is_printing: display = subtask_to_display.get(subtask, subtask) markers.append({ 'type': 'start', 'index': idx, 'timestamp': metric.timestamp.astimezone(timezone_info).isoformat(), 'project_name': display, }) current_job = subtask last_state = gcode_state elif current_job and last_state and last_state not in ['FINISH', 'IDLE'] and gcode_state in ['FINISH', 'IDLE']: display = subtask_to_display.get(current_job, current_job) markers.append({ 'type': 'end', 'index': idx, 'timestamp': metric.timestamp.astimezone(timezone_info).isoformat(), 'project_name': display, }) current_job = None last_state = gcode_state return markers def _prepare_filament_timeline(self, metrics): """Prepare filament data organized by unique filament configurations.""" filament_data = {} total_points = len(metrics) for idx, metric in enumerate(metrics): try: snapshots = metric.filament_snapshots.all() except Exception: snapshots = [] for snapshot in snapshots: tray_id = snapshot.tray_id ams_unit_id = snapshot.ams_unit_id ams_type = snapshot.ams_type or '' fil_type = snapshot.type or 'Unknown' fil_sub_type = snapshot.sub_type or 'Unknown' fil_color = snapshot.color or 'FFFFFFFF' unique_key = f"{ams_unit_id}_{tray_id}_{fil_type}_{fil_sub_type}_{fil_color}" if unique_key not in filament_data: filament_data[unique_key] = { 'tray_id': tray_id, 'ams_unit_id': ams_unit_id, 'ams_type': ams_type, 'type': fil_type, 'brand': fil_sub_type, 'color': fil_color, 'remain_data': [None] * total_points, 'start_idx': idx, } remain_percent = snapshot.remain_percent or 0 filament_data[unique_key]['remain_data'][idx] = remain_percent for idx, metric in enumerate(metrics): external = metric.external_spool or {} if external.get('type'): fil_type = external.get('type', 'Unknown') fil_color = external.get('color', '161616FF') unique_key = f"External_{fil_type}_{fil_color}" if unique_key not in filament_data: filament_data[unique_key] = { 'tray_id': 'External', 'type': fil_type, 'brand': 'External', 'color': fil_color, 'remain_data': [None] * total_points, 'start_idx': idx, } remain_percent = external.get('remain', 0) filament_data[unique_key]['remain_data'][idx] = remain_percent return filament_data class PrinterDataAPIView(LoginRequiredMixin, View): """API endpoint for dynamic printer chart updates""" def get(self, request, pk=None): start_date = request.GET.get("start_date") end_date = request.GET.get("end_date") start_time = request.GET.get("start_time", "00:00") end_time = request.GET.get("end_time", "23:59") try: if pk is not None: printer_device = Printer.objects.filter(pk=pk, is_active=True).first() if not printer_device: return JsonResponse({"error": "Printer not found"}, status=404) else: printer_device = Printer.objects.filter(is_active=True).first() if not printer_device: return JsonResponse({"error": "No printer device found"}, status=404) tz = zoneinfo.ZoneInfo(app_settings.TIMEZONE) # Stage A: only() + step calculation query = ( PrinterMetrics.objects .filter(device=printer_device) .only(*_METRICS_API_FIELDS) ) if start_date and start_time and end_date and end_time: start_dt = datetime.strptime(f"{start_date} {start_time}", "%Y-%m-%d %H:%M").replace(tzinfo=tz) end_dt = datetime.strptime(f"{end_date} {end_time}", "%Y-%m-%d %H:%M").replace(tzinfo=tz) query = query.filter(timestamp__gte=start_dt, timestamp__lte=end_dt) range_seconds = (end_dt - start_dt).total_seconds() expected_count = max(1, int(range_seconds / 30)) elif start_date and start_time: start_dt = datetime.strptime(f"{start_date} {start_time}", "%Y-%m-%d %H:%M").replace(tzinfo=tz) query = query.filter(timestamp__gte=start_dt) expected_count = _MAX_CHART_POINTS elif end_date and end_time: end_dt = datetime.strptime(f"{end_date} {end_time}", "%Y-%m-%d %H:%M").replace(tzinfo=tz) query = query.filter(timestamp__lte=end_dt) expected_count = _MAX_CHART_POINTS else: expected_count = _MAX_CHART_POINTS step = max(1, expected_count // _MAX_CHART_POINTS) # Stage B: single DB round-trip, downsample in Python metrics_list = list(query.order_by("timestamp")) if step > 1: metrics_list = metrics_list[::step] total_points = len(metrics_list) # Stage C: targeted snapshot fetch (only sampled IDs) snapshots_by_metric: dict = {} if metrics_list: sampled_ids = [m.id for m in metrics_list] for snap in FilamentSnapshot.objects.filter(printer_metric_id__in=sampled_ids): snapshots_by_metric.setdefault(snap.printer_metric_id, []).append(snap) # Stage D: single-pass serialization timestamps = [] timestamps_iso = [] dates = [] nozzle_temp = [] nozzle_target_temp = [] nozzle_temp_left = [] nozzle_target_temp_left = [] bed_temp = [] bed_target_temp = [] print_percent = [] cooling_fan_speed = [] heatbreak_fan_speed = [] wifi_signal_dbm = [] ams_humidity_raw = [] ams_temp = [] layer_num = [] total_layer_num = [] gcode_state = [] print_type = [] subtask_name = [] project_markers = [] current_job = None last_state = None filament_data = {} for idx, m in enumerate(metrics_list): ts = m.timestamp.astimezone(tz) timestamps.append(ts.strftime('%H:%M')) timestamps_iso.append(ts.isoformat()) dates.append(ts.strftime('%Y-%m-%d')) nozzle_temp.append(float(m.nozzle_temp) if m.nozzle_temp else None) nozzle_target_temp.append(float(m.nozzle_target_temp) if m.nozzle_target_temp else None) nozzle_temp_left.append(float(m.nozzle_temp_left) if m.nozzle_temp_left is not None else None) nozzle_target_temp_left.append(float(m.nozzle_target_temp_left) if m.nozzle_target_temp_left is not None else None) bed_temp.append(float(m.bed_temp) if m.bed_temp else None) bed_target_temp.append(float(m.bed_target_temp) if m.bed_target_temp else None) print_percent.append(m.print_percent if m.print_percent else 0) cooling_fan_speed.append(m.cooling_fan_speed if m.cooling_fan_speed else 0) heatbreak_fan_speed.append(m.heatbreak_fan_speed if m.heatbreak_fan_speed else 0) wifi_signal_dbm.append(m.wifi_signal_dbm if m.wifi_signal_dbm else None) ams_humidity_raw.append(m.ams_humidity_raw if m.ams_humidity_raw else None) ams_temp.append(float(m.ams_temp) if m.ams_temp else None) layer_num.append(m.layer_num if m.layer_num else 0) total_layer_num.append(m.total_layer_num if m.total_layer_num else 0) gcode_state.append(m.gcode_state) print_type.append(m.print_type) subtask_name.append(m.subtask_name) # Project marker detection (inline) subtask = m.subtask_name gs = m.gcode_state is_printing = gs not in ['FINISH', 'IDLE', None, ''] if subtask and subtask != current_job and is_printing: project_markers.append({ 'type': 'start', 'index': idx, 'timestamp': ts.isoformat(), 'project_name': subtask, }) current_job = subtask last_state = gs elif current_job and last_state and last_state not in ['FINISH', 'IDLE'] and gs in ['FINISH', 'IDLE']: project_markers.append({ 'type': 'end', 'index': idx, 'timestamp': ts.isoformat(), 'project_name': current_job, }) current_job = None last_state = gs # Filament timeline (inline) for snap in snapshots_by_metric.get(m.id, []): tray_id = snap.tray_id fil_type = snap.type or 'Unknown' fil_sub_type = snap.sub_type or 'Unknown' fil_color = snap.color or 'FFFFFFFF' unique_key = f"{tray_id}_{fil_type}_{fil_sub_type}_{fil_color}" if unique_key not in filament_data: filament_data[unique_key] = { 'tray_id': tray_id, 'type': fil_type, 'brand': fil_sub_type, 'color': fil_color, 'remain_data': [None] * total_points, 'start_idx': idx, } filament_data[unique_key]['remain_data'][idx] = snap.remain_percent or 0 external = m.external_spool or {} if external.get('type'): fil_type = external.get('type', 'Unknown') fil_color = external.get('color', '161616FF') unique_key = f"External_{fil_type}_{fil_color}" if unique_key not in filament_data: filament_data[unique_key] = { 'tray_id': 'External', 'type': fil_type, 'brand': 'External', 'color': fil_color, 'remain_data': [None] * total_points, 'start_idx': idx, } filament_data[unique_key]['remain_data'][idx] = external.get('remain', 0) data = { "timestamps": timestamps, "timestamps_iso": timestamps_iso, "dates": dates, "nozzle_temp": nozzle_temp, "nozzle_target_temp": nozzle_target_temp, "nozzle_temp_left": nozzle_temp_left, "nozzle_target_temp_left": nozzle_target_temp_left, "bed_temp": bed_temp, "bed_target_temp": bed_target_temp, "print_percent": print_percent, "cooling_fan_speed": cooling_fan_speed, "heatbreak_fan_speed": heatbreak_fan_speed, "wifi_signal_dbm": wifi_signal_dbm, "ams_humidity_raw": ams_humidity_raw, "ams_temp": ams_temp, "layer_num": layer_num, "total_layer_num": total_layer_num, "gcode_state": gcode_state, "print_type": print_type, "subtask_name": subtask_name, "project_markers": project_markers, "filament_timeline": filament_data, } return JsonResponse(data) except Exception as e: import traceback traceback.print_exc() return JsonResponse({"error": str(e)}, status=500) class FilamentUsageDataAPIView(LoginRequiredMixin, View): """API endpoint for filament usage history with date/time filtering""" def get(self, request, pk): start_date = request.GET.get("start_date") end_date = request.GET.get("end_date") start_time = request.GET.get("start_time", "00:00") end_time = request.GET.get("end_time", "23:59") try: filament = Filament.objects.get(pk=pk) tz = zoneinfo.ZoneInfo(app_settings.TIMEZONE) query = filament.usage_snapshots.select_related('printer_metric') if start_date and start_time: from datetime import datetime start_dt_naive = datetime.strptime(f"{start_date} {start_time}", "%Y-%m-%d %H:%M") start_dt = start_dt_naive.replace(tzinfo=tz) query = query.filter(printer_metric__timestamp__gte=start_dt) if end_date and end_time: from datetime import datetime end_dt_naive = datetime.strptime(f"{end_date} {end_time}", "%Y-%m-%d %H:%M") end_dt = end_dt_naive.replace(tzinfo=tz) query = query.filter(printer_metric__timestamp__lte=end_dt) fallback_used = False if not start_date and not end_date: time_24h_ago = timezone.now() - timedelta(hours=24) default_query = query.filter(printer_metric__timestamp__gte=time_24h_ago) if default_query.exists(): snapshots = default_query.order_by('printer_metric__timestamp') else: # Fallback: show 24h window ending at the most recent available snapshot last_snapshot = query.order_by('-printer_metric__timestamp').first() if last_snapshot: last_ts = last_snapshot.printer_metric.timestamp fallback_start = last_ts - timedelta(hours=24) snapshots = query.filter( printer_metric__timestamp__gte=fallback_start, printer_metric__timestamp__lte=last_ts ).order_by('printer_metric__timestamp') fallback_used = True else: snapshots = query.none() else: snapshots = query.order_by('printer_metric__timestamp') data = { "timestamps": [s.printer_metric.timestamp.astimezone(tz).strftime('%Y-%m-%d %H:%M') for s in snapshots], "remaining": [s.remain_percent for s in snapshots], "fallback_used": fallback_used, } return JsonResponse(data) except Filament.DoesNotExist: return JsonResponse({"error": "Filament not found"}, status=404) except Exception as e: import traceback traceback.print_exc() return JsonResponse({"error": str(e)}, status=500) # ==================== Filament CRUD Views ==================== class FilamentListView(LoginRequiredMixin, ListView): model = Filament template_name = 'bambu_run/filament_list.html' context_object_name = 'filaments' paginate_by = 20 def get_queryset(self): queryset = Filament.objects.all() filament_type = self.request.GET.get('type') if filament_type: queryset = queryset.filter(type=filament_type) loaded = self.request.GET.get('loaded') if loaded == 'yes': queryset = queryset.filter(is_loaded_in_ams=True) elif loaded == 'no': queryset = queryset.filter(is_loaded_in_ams=False) ams_type = self.request.GET.get('ams_type') if ams_type: queryset = queryset.filter(ams_type=ams_type) search = self.request.GET.get('search') if search: queryset = queryset.filter( Q(brand__icontains=search) | Q(color__icontains=search) | Q(type__icontains=search) ) return queryset def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) context['bambu_run_base_template'] = app_settings.BASE_TEMPLATE context['total_spools'] = Filament.objects.count() context['loaded_spools'] = Filament.objects.filter(is_loaded_in_ams=True).count() context['low_filaments'] = Filament.objects.filter(remaining_percent__lt=20).count() context['filament_types'] = sorted( set(Filament.objects.exclude(type__isnull=True).exclude(type='').values_list('type', flat=True)) ) context['ams_type_choices'] = sorted( set( Filament.objects.exclude(ams_type='').values_list('ams_type', flat=True) ) ) return context def _filament_type_map(): """Return a JSON-serialisable dict mapping FilamentType pk → {type, sub_type, brand}.""" return { str(ft.pk): {'type': ft.type, 'sub_type': ft.sub_type or '', 'brand': ft.brand} for ft in FilamentType.objects.all() } class FilamentCreateView(LoginRequiredMixin, CreateView): model = Filament form_class = FilamentForm template_name = 'bambu_run/filament_form.html' success_url = reverse_lazy('bambu_run:filament_list') def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) context['bambu_run_base_template'] = app_settings.BASE_TEMPLATE context['filament_type_map'] = json.dumps(_filament_type_map()) return context def form_valid(self, form): messages.success(self.request, f'Filament spool "{form.instance}" added successfully!') return super().form_valid(form) class FilamentUpdateView(LoginRequiredMixin, UpdateView): model = Filament form_class = FilamentForm template_name = 'bambu_run/filament_form.html' success_url = reverse_lazy('bambu_run:filament_list') def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) context['bambu_run_base_template'] = app_settings.BASE_TEMPLATE context['filament_type_map'] = json.dumps(_filament_type_map()) return context def form_valid(self, form): messages.success(self.request, f'Filament spool "{form.instance}" updated successfully!') return super().form_valid(form) class FilamentDeleteView(LoginRequiredMixin, DeleteView): model = Filament template_name = 'bambu_run/filament_confirm_delete.html' success_url = reverse_lazy('bambu_run:filament_list') def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) context['bambu_run_base_template'] = app_settings.BASE_TEMPLATE return context def delete(self, request, *args, **kwargs): filament = self.get_object() messages.success(self.request, f'Filament spool "{filament}" has been deleted.') return super().delete(request, *args, **kwargs) class FilamentDetailView(LoginRequiredMixin, DetailView): model = Filament template_name = 'bambu_run/filament_detail.html' context_object_name = 'filament' def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) context['bambu_run_base_template'] = app_settings.BASE_TEMPLATE filament = self.object context['print_usages'] = filament.print_usages.select_related('print_job__cloud_task').order_by('-print_job__start_time')[:20] total_consumed = filament.print_usages.aggregate( total=Sum('consumed_percent') )['total'] or 0 context['total_consumed_percent'] = total_consumed return context # ==================== FilamentColor Views ==================== class FilamentColorListView(LoginRequiredMixin, ListView): model = FilamentColor template_name = 'bambu_run/filament_color_list.html' context_object_name = 'colors' paginate_by = 50 def get_queryset(self): return FilamentColor.objects.all().order_by('filament_type', 'filament_sub_type', 'color_name') def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) context['bambu_run_base_template'] = app_settings.BASE_TEMPLATE context['total_colors'] = FilamentColor.objects.count() return context class FilamentColorCreateView(LoginRequiredMixin, CreateView): model = FilamentColor form_class = FilamentColorForm template_name = 'bambu_run/filament_color_form.html' success_url = reverse_lazy('bambu_run:filament_color_list') def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) context['bambu_run_base_template'] = app_settings.BASE_TEMPLATE return context def form_valid(self, form): response = super().form_valid(form) self._update_matching_filaments(self.object) return response def _update_matching_filaments(self, filament_color): from .utils import match_and_update_filament_color updated_count = match_and_update_filament_color(filament_color) if updated_count > 0: messages.success( self.request, f"Color '{filament_color.color_name}' created! " f"Updated {updated_count} matching filament spool(s)." ) class FilamentColorUpdateView(LoginRequiredMixin, UpdateView): model = FilamentColor form_class = FilamentColorForm template_name = 'bambu_run/filament_color_form.html' success_url = reverse_lazy('bambu_run:filament_color_list') def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) context['bambu_run_base_template'] = app_settings.BASE_TEMPLATE return context def form_valid(self, form): response = super().form_valid(form) self._update_matching_filaments(self.object) return response def _update_matching_filaments(self, filament_color): from .utils import match_and_update_filament_color updated_count = match_and_update_filament_color(filament_color) if updated_count > 0: messages.success( self.request, f"Color '{filament_color.color_name}' updated! " f"Updated {updated_count} matching filament spool(s)." ) class FilamentColorDeleteView(LoginRequiredMixin, DeleteView): model = FilamentColor template_name = 'bambu_run/filament_color_confirm_delete.html' success_url = reverse_lazy('bambu_run:filament_color_list') def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) context['bambu_run_base_template'] = app_settings.BASE_TEMPLATE return context def delete(self, request, *args, **kwargs): messages.success(request, f"Color '{self.get_object().color_name}' deleted successfully!") return super().delete(request, *args, **kwargs) # ==================== FilamentType Views ==================== class FilamentTypeListView(LoginRequiredMixin, ListView): model = FilamentType template_name = 'bambu_run/filament_type_list.html' context_object_name = 'types' paginate_by = 50 def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) context['bambu_run_base_template'] = app_settings.BASE_TEMPLATE context['total_types'] = FilamentType.objects.count() return context class FilamentTypeCreateView(LoginRequiredMixin, CreateView): model = FilamentType form_class = FilamentTypeForm template_name = 'bambu_run/filament_type_form.html' success_url = reverse_lazy('bambu_run:filament_type_list') def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) context['bambu_run_base_template'] = app_settings.BASE_TEMPLATE context['existing_types'] = list( FilamentType.objects.values_list('type', flat=True).distinct().order_by('type') ) context['existing_sub_types'] = list( FilamentType.objects.exclude(sub_type__isnull=True).exclude(sub_type='') .values_list('sub_type', flat=True).distinct().order_by('sub_type') ) context['existing_brands'] = list( FilamentType.objects.values_list('brand', flat=True).distinct().order_by('brand') ) context['preset_types'] = FilamentTypeForm.PRESET_TYPES context['preset_sub_types'] = FilamentTypeForm.PRESET_SUB_TYPES context['preset_brands'] = FilamentTypeForm.PRESET_BRANDS return context def form_valid(self, form): messages.success(self.request, f'Filament type "{form.instance}" added successfully!') return super().form_valid(form) class FilamentTypeUpdateView(LoginRequiredMixin, UpdateView): model = FilamentType form_class = FilamentTypeForm template_name = 'bambu_run/filament_type_form.html' success_url = reverse_lazy('bambu_run:filament_type_list') def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) context['bambu_run_base_template'] = app_settings.BASE_TEMPLATE context['existing_types'] = list( FilamentType.objects.values_list('type', flat=True).distinct().order_by('type') ) context['existing_sub_types'] = list( FilamentType.objects.exclude(sub_type__isnull=True).exclude(sub_type='') .values_list('sub_type', flat=True).distinct().order_by('sub_type') ) context['existing_brands'] = list( FilamentType.objects.values_list('brand', flat=True).distinct().order_by('brand') ) context['preset_types'] = FilamentTypeForm.PRESET_TYPES context['preset_sub_types'] = FilamentTypeForm.PRESET_SUB_TYPES context['preset_brands'] = FilamentTypeForm.PRESET_BRANDS return context def form_valid(self, form): messages.success(self.request, f'Filament type "{form.instance}" updated successfully!') return super().form_valid(form) class FilamentTypeDeleteView(LoginRequiredMixin, DeleteView): model = FilamentType template_name = 'bambu_run/filament_type_confirm_delete.html' success_url = reverse_lazy('bambu_run:filament_type_list') def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) context['bambu_run_base_template'] = app_settings.BASE_TEMPLATE return context def delete(self, request, *args, **kwargs): messages.success(request, f"Filament type '{self.get_object()}' deleted successfully!") return super().delete(request, *args, **kwargs)