mirror of
https://github.com/RunLit/Bambu-Run.git
synced 2026-06-22 14:09:04 +01:00
feat: MCP server, Bambu Cloud task sync & display name fix (#7)
* added mcp initial trail files * timestamp use your local django timezone * added bambu cloud task sync with correct endpoint other than py cloud api * back fill and relink print name using cloud if there is * use correct bump-version
This commit is contained in:
728
bambu_run/mcp_tools.py
Normal file
728
bambu_run/mcp_tools.py
Normal file
@@ -0,0 +1,728 @@
|
||||
"""
|
||||
Pure Django ORM query functions for MCP tools.
|
||||
|
||||
Zero dependency on the `mcp` package — returns markdown strings.
|
||||
RAE can reuse these directly.
|
||||
"""
|
||||
|
||||
from datetime import timedelta
|
||||
from decimal import Decimal
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
from django.db.models import Avg, Count, Max, Min, Q, Sum
|
||||
from django.utils import timezone
|
||||
|
||||
from .conf import app_settings
|
||||
|
||||
|
||||
def _local_dt(dt, fmt="%Y-%m-%d %H:%M %Z"):
|
||||
"""Convert a UTC-aware datetime to the configured local timezone for display."""
|
||||
if dt is None:
|
||||
return "—"
|
||||
tz = ZoneInfo(app_settings.TIMEZONE)
|
||||
return dt.astimezone(tz).strftime(fmt)
|
||||
|
||||
|
||||
def _redact(value, label="[redacted]"):
|
||||
"""Redact sensitive values if MCP_HIDE_SENSITIVE is enabled."""
|
||||
if app_settings.MCP_HIDE_SENSITIVE:
|
||||
return label
|
||||
return value
|
||||
|
||||
|
||||
def _job_name(job):
|
||||
"""Return the best available display name for a print job.
|
||||
|
||||
Prefers cloud design_title (e.g., 'Planetary Gears Finger Fidget Spinners')
|
||||
over the MQTT subtask_name (e.g., 'All variants at 0.16mm high quality').
|
||||
Falls back to project_name for local/SD prints with no cloud task.
|
||||
"""
|
||||
if job.cloud_task_id and job.cloud_task and job.cloud_task.design_title:
|
||||
return job.cloud_task.design_title
|
||||
return job.project_name
|
||||
|
||||
|
||||
def _format_duration(minutes):
|
||||
"""Format minutes into human-readable duration."""
|
||||
if minutes is None:
|
||||
return "Unknown"
|
||||
hours, mins = divmod(int(minutes), 60)
|
||||
if hours > 0:
|
||||
return f"{hours}h {mins}m"
|
||||
return f"{mins}m"
|
||||
|
||||
|
||||
def _format_temp(temp):
|
||||
"""Format temperature value."""
|
||||
if temp is None:
|
||||
return "N/A"
|
||||
return f"{temp}°C"
|
||||
|
||||
|
||||
# ─── Tools ───────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def get_printer_status(printer_id=None):
|
||||
"""Current live status of printer(s) including temps, progress, AMS, errors."""
|
||||
from .models import Printer, PrinterMetrics
|
||||
|
||||
printers = Printer.objects.filter(is_active=True)
|
||||
if printer_id:
|
||||
printers = printers.filter(id=printer_id)
|
||||
|
||||
if not printers.exists():
|
||||
return "No printers found."
|
||||
|
||||
parts = []
|
||||
for printer in printers:
|
||||
metric = PrinterMetrics.objects.filter(device=printer).first()
|
||||
if not metric:
|
||||
parts.append(f"## {printer.name}\n**No data available yet.**\n")
|
||||
continue
|
||||
|
||||
state = metric.gcode_state or "Unknown"
|
||||
lines = [f"## Printer Status: {printer.name}"]
|
||||
lines.append(f"**Model**: {printer.model} | **Serial**: {_redact(printer.serial_number)}")
|
||||
lines.append(f"**IP**: {_redact(printer.ip_address)} | **Location**: {printer.location or 'N/A'}")
|
||||
lines.append(f"**State**: {state}")
|
||||
|
||||
if metric.print_percent is not None and state == "RUNNING":
|
||||
layer_info = ""
|
||||
if metric.layer_num is not None and metric.total_layer_num:
|
||||
layer_info = f" (Layer {metric.layer_num}/{metric.total_layer_num})"
|
||||
lines.append(f"**Progress**: {metric.print_percent}%{layer_info}")
|
||||
if metric.subtask_name:
|
||||
lines.append(f"**Project**: {metric.subtask_name}")
|
||||
if metric.remaining_time_min:
|
||||
lines.append(f"**ETA**: {_format_duration(metric.remaining_time_min)} remaining")
|
||||
|
||||
# Temperatures
|
||||
lines.append("")
|
||||
lines.append("### Temperatures")
|
||||
lines.append("| Component | Current | Target |")
|
||||
lines.append("|-----------|---------|--------|")
|
||||
lines.append(f"| Nozzle | {_format_temp(metric.nozzle_temp)} | {_format_temp(metric.nozzle_target_temp)} |")
|
||||
lines.append(f"| Bed | {_format_temp(metric.bed_temp)} | {_format_temp(metric.bed_target_temp)} |")
|
||||
lines.append(f"| Chamber | {_format_temp(metric.chamber_temp)} | - |")
|
||||
|
||||
# AMS filaments from JSON
|
||||
if metric.filaments:
|
||||
lines.append("")
|
||||
lines.append("### AMS Slots")
|
||||
lines.append("| Slot | Material | Color | Remaining |")
|
||||
lines.append("|------|----------|-------|-----------|")
|
||||
for f in metric.filaments:
|
||||
slot = f.get("slot", "?")
|
||||
ftype = f.get("sub_type") or f.get("type", "?")
|
||||
color = f.get("color", "")
|
||||
color_display = f"#{color[:6]}" if color and len(color) >= 6 else "?"
|
||||
remain = f.get("remain_percent", "?")
|
||||
lines.append(f"| {slot} | {ftype} | {color_display} | {remain}% |")
|
||||
|
||||
# Errors
|
||||
if metric.has_errors or metric.hms:
|
||||
lines.append("")
|
||||
lines.append("### Alerts")
|
||||
if metric.print_error:
|
||||
lines.append(f"- Print error code: {metric.print_error}")
|
||||
if metric.hms:
|
||||
for msg in metric.hms[:5]:
|
||||
lines.append(f"- HMS: {msg}")
|
||||
|
||||
lines.append(f"\n*Last updated: {_local_dt(metric.timestamp, '%Y-%m-%d %H:%M:%S %Z')}*")
|
||||
parts.append("\n".join(lines))
|
||||
|
||||
return "\n\n---\n\n".join(parts)
|
||||
|
||||
|
||||
def list_printers():
|
||||
"""List all registered printers."""
|
||||
from .models import Printer
|
||||
|
||||
printers = Printer.objects.all()
|
||||
if not printers.exists():
|
||||
return "No printers registered."
|
||||
|
||||
lines = ["# Printers", ""]
|
||||
lines.append("| ID | Name | Model | Active | Serial | IP | Location |")
|
||||
lines.append("|----|------|-------|--------|--------|----|----------|")
|
||||
for p in printers:
|
||||
lines.append(
|
||||
f"| {p.id} | {p.name} | {p.model} | "
|
||||
f"{'Yes' if p.is_active else 'No'} | "
|
||||
f"{_redact(p.serial_number)} | {_redact(p.ip_address)} | "
|
||||
f"{p.location or '-'} |"
|
||||
)
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def get_print_history(status=None, days=None, project_name=None, limit=20):
|
||||
"""Print job history with optional filters."""
|
||||
from .models import PrintJob
|
||||
|
||||
qs = PrintJob.objects.select_related("device", "cloud_task")
|
||||
|
||||
if status:
|
||||
qs = qs.filter(final_status__iexact=status)
|
||||
if days:
|
||||
cutoff = timezone.now() - timedelta(days=int(days))
|
||||
qs = qs.filter(start_time__gte=cutoff)
|
||||
if project_name:
|
||||
qs = qs.filter(
|
||||
Q(project_name__icontains=project_name)
|
||||
| Q(cloud_task__design_title__icontains=project_name)
|
||||
)
|
||||
|
||||
jobs = qs[:int(limit)]
|
||||
if not jobs:
|
||||
return "No print jobs found matching the criteria."
|
||||
|
||||
lines = ["# Print History", ""]
|
||||
lines.append("| ID | Project | Printer | Status | Progress | Duration | Started |")
|
||||
lines.append("|----|---------|---------|--------|----------|----------|---------|")
|
||||
for j in jobs:
|
||||
lines.append(
|
||||
f"| {j.id} | {_job_name(j)} | {j.device.name} | "
|
||||
f"{j.final_status or 'In Progress'} | {j.completion_percent}% | "
|
||||
f"{_format_duration(j.duration_minutes)} | "
|
||||
f"{_local_dt(j.start_time, '%Y-%m-%d %H:%M')} |"
|
||||
)
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def get_print_job_detail(job_id):
|
||||
"""Single job detail including filament usage."""
|
||||
from .models import FilamentUsage, PrintJob
|
||||
|
||||
try:
|
||||
job = PrintJob.objects.select_related("device", "cloud_task").get(id=job_id)
|
||||
except PrintJob.DoesNotExist:
|
||||
return f"Print job #{job_id} not found."
|
||||
|
||||
lines = [f"# Print Job: {_job_name(job)}", ""]
|
||||
if job.cloud_task and job.cloud_task.design_title and job.cloud_task.design_title != job.project_name:
|
||||
lines.append(f"**Plate**: {job.project_name}")
|
||||
lines.append(f"**Printer**: {job.device.name}")
|
||||
lines.append(f"**Status**: {job.final_status or 'In Progress'}")
|
||||
lines.append(f"**Progress**: {job.completion_percent}%")
|
||||
if job.gcode_file:
|
||||
lines.append(f"**G-code**: {job.gcode_file}")
|
||||
lines.append(f"**Started**: {_local_dt(job.start_time, '%Y-%m-%d %H:%M:%S %Z')}")
|
||||
if job.end_time:
|
||||
lines.append(f"**Ended**: {_local_dt(job.end_time, '%Y-%m-%d %H:%M:%S %Z')}")
|
||||
lines.append(f"**Duration**: {_format_duration(job.duration_minutes)}")
|
||||
if job.total_layers:
|
||||
lines.append(f"**Total Layers**: {job.total_layers}")
|
||||
|
||||
# Filament usage
|
||||
usages = FilamentUsage.objects.select_related("filament").filter(print_job=job)
|
||||
if usages.exists():
|
||||
lines.append("")
|
||||
lines.append("### Filament Usage")
|
||||
lines.append("| Spool | Material | Color | Consumed | Grams |")
|
||||
lines.append("|-------|----------|-------|----------|-------|")
|
||||
for u in usages:
|
||||
f = u.filament
|
||||
lines.append(
|
||||
f"| {f.brand} {f.type} | {f.sub_type or f.type} | "
|
||||
f"{f.color} | {u.consumed_percent or 0}% | "
|
||||
f"{u.consumed_grams or '-'}g |"
|
||||
)
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def list_filaments(type=None, brand=None, color=None, loaded_in_ams=None, low_filament=None):
|
||||
"""Filament inventory with optional filters."""
|
||||
from .models import Filament
|
||||
|
||||
qs = Filament.objects.all()
|
||||
if type:
|
||||
qs = qs.filter(type__iexact=type)
|
||||
if brand:
|
||||
qs = qs.filter(brand__icontains=brand)
|
||||
if color:
|
||||
qs = qs.filter(color__icontains=color)
|
||||
if loaded_in_ams is not None:
|
||||
qs = qs.filter(is_loaded_in_ams=loaded_in_ams)
|
||||
if low_filament:
|
||||
qs = qs.filter(remaining_percent__lte=20)
|
||||
|
||||
filaments = qs[:50]
|
||||
if not filaments:
|
||||
return "No filaments found matching the criteria."
|
||||
|
||||
lines = ["# Filament Inventory", ""]
|
||||
lines.append(f"*{qs.count()} spools total*\n")
|
||||
lines.append("| ID | Brand | Type | Color | Remaining | In AMS | Last Used |")
|
||||
lines.append("|----|-------|------|-------|-----------|--------|-----------|")
|
||||
for f in filaments:
|
||||
color_display = f"{f.color}"
|
||||
if f.color_hex:
|
||||
color_display += f" ({f.color_hex})"
|
||||
last_used = _local_dt(f.last_used, "%Y-%m-%d") if f.last_used else "-"
|
||||
lines.append(
|
||||
f"| {f.id} | {f.brand} | {f.sub_type or f.type} | "
|
||||
f"{color_display} | {f.remaining_percent}% | "
|
||||
f"{'Yes' if f.is_loaded_in_ams else 'No'} | {last_used} |"
|
||||
)
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def get_filament_detail(filament_id):
|
||||
"""Single spool detail with usage history."""
|
||||
from .models import Filament, FilamentUsage
|
||||
|
||||
try:
|
||||
f = Filament.objects.get(id=filament_id)
|
||||
except Filament.DoesNotExist:
|
||||
return f"Filament #{filament_id} not found."
|
||||
|
||||
lines = [f"# Filament: {f.brand} {f.type} - {f.color}", ""]
|
||||
lines.append(f"**Type**: {f.sub_type or f.type}")
|
||||
lines.append(f"**Brand**: {f.brand}")
|
||||
lines.append(f"**Color**: {f.color} ({f.color_hex or 'N/A'})")
|
||||
lines.append(f"**Remaining**: {f.remaining_percent}%")
|
||||
if f.remaining_weight_grams:
|
||||
lines.append(f"**Remaining Weight**: {f.remaining_weight_grams}g / {f.initial_weight_grams or '?'}g")
|
||||
lines.append(f"**In AMS**: {'Yes (slot ' + str(f.current_tray_id) + ')' if f.is_loaded_in_ams else 'No'}")
|
||||
lines.append(f"**Created By**: {f.created_by}")
|
||||
if f.tray_uuid:
|
||||
lines.append(f"**Serial**: {_redact(f.tray_uuid)}")
|
||||
if f.purchase_date:
|
||||
lines.append(f"**Purchased**: {f.purchase_date}")
|
||||
if f.notes:
|
||||
lines.append(f"**Notes**: {f.notes}")
|
||||
|
||||
# Usage history
|
||||
usages = FilamentUsage.objects.select_related("print_job").filter(filament=f).order_by("-print_job__start_time")[:10]
|
||||
if usages.exists():
|
||||
lines.append("")
|
||||
lines.append("### Recent Print Usage")
|
||||
lines.append("| Job | Date | Consumed | Grams |")
|
||||
lines.append("|-----|------|----------|-------|")
|
||||
for u in usages:
|
||||
lines.append(
|
||||
f"| {u.print_job.project_name} | "
|
||||
f"{_local_dt(u.print_job.start_time, '%Y-%m-%d')} | "
|
||||
f"{u.consumed_percent or 0}% | {u.consumed_grams or '-'}g |"
|
||||
)
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def get_temperature_history(printer_id=None, hours=6, metric="all"):
|
||||
"""Temperature trends as summary stats (avg/min/max) over recent hours."""
|
||||
from .models import Printer, PrinterMetrics
|
||||
|
||||
cutoff = timezone.now() - timedelta(hours=int(hours))
|
||||
|
||||
qs = PrinterMetrics.objects.filter(timestamp__gte=cutoff)
|
||||
if printer_id:
|
||||
qs = qs.filter(device_id=printer_id)
|
||||
|
||||
if not qs.exists():
|
||||
return f"No temperature data in the last {hours} hours."
|
||||
|
||||
printers = Printer.objects.filter(
|
||||
id__in=qs.values_list("device_id", flat=True).distinct()
|
||||
)
|
||||
|
||||
parts = [f"# Temperature History (last {hours}h)", ""]
|
||||
for printer in printers:
|
||||
pqs = qs.filter(device=printer)
|
||||
stats = pqs.aggregate(
|
||||
nozzle_avg=Avg("nozzle_temp"),
|
||||
nozzle_min=Min("nozzle_temp"),
|
||||
nozzle_max=Max("nozzle_temp"),
|
||||
bed_avg=Avg("bed_temp"),
|
||||
bed_min=Min("bed_temp"),
|
||||
bed_max=Max("bed_temp"),
|
||||
chamber_avg=Avg("chamber_temp"),
|
||||
chamber_min=Min("chamber_temp"),
|
||||
chamber_max=Max("chamber_temp"),
|
||||
)
|
||||
|
||||
parts.append(f"## {printer.name}")
|
||||
parts.append(f"*{pqs.count()} data points*\n")
|
||||
parts.append("| Sensor | Avg | Min | Max |")
|
||||
parts.append("|--------|-----|-----|-----|")
|
||||
|
||||
if metric in ("all", "nozzle"):
|
||||
parts.append(
|
||||
f"| Nozzle | {_format_temp(stats['nozzle_avg'])} | "
|
||||
f"{_format_temp(stats['nozzle_min'])} | {_format_temp(stats['nozzle_max'])} |"
|
||||
)
|
||||
if metric in ("all", "bed"):
|
||||
parts.append(
|
||||
f"| Bed | {_format_temp(stats['bed_avg'])} | "
|
||||
f"{_format_temp(stats['bed_min'])} | {_format_temp(stats['bed_max'])} |"
|
||||
)
|
||||
if metric in ("all", "chamber"):
|
||||
parts.append(
|
||||
f"| Chamber | {_format_temp(stats['chamber_avg'])} | "
|
||||
f"{_format_temp(stats['chamber_min'])} | {_format_temp(stats['chamber_max'])} |"
|
||||
)
|
||||
parts.append("")
|
||||
|
||||
return "\n".join(parts)
|
||||
|
||||
|
||||
def get_filament_usage_stats(days=30, group_by="type"):
|
||||
"""Aggregate filament consumption statistics."""
|
||||
from .models import FilamentUsage
|
||||
|
||||
cutoff = timezone.now() - timedelta(days=int(days))
|
||||
qs = FilamentUsage.objects.filter(
|
||||
print_job__start_time__gte=cutoff,
|
||||
consumed_grams__isnull=False,
|
||||
).select_related("filament")
|
||||
|
||||
if not qs.exists():
|
||||
return f"No filament usage data in the last {days} days."
|
||||
|
||||
lines = [f"# Filament Usage Stats (last {days} days)", ""]
|
||||
|
||||
if group_by == "type":
|
||||
stats = (
|
||||
qs.values("filament__type")
|
||||
.annotate(
|
||||
total_grams=Sum("consumed_grams"),
|
||||
total_percent=Sum("consumed_percent"),
|
||||
job_count=Count("print_job", distinct=True),
|
||||
)
|
||||
.order_by("-total_grams")
|
||||
)
|
||||
lines.append("| Type | Total Grams | Jobs | Avg Grams/Job |")
|
||||
lines.append("|------|-------------|------|---------------|")
|
||||
for s in stats:
|
||||
avg = s["total_grams"] / s["job_count"] if s["job_count"] else 0
|
||||
lines.append(
|
||||
f"| {s['filament__type']} | {s['total_grams']}g | "
|
||||
f"{s['job_count']} | {avg:.0f}g |"
|
||||
)
|
||||
elif group_by == "color":
|
||||
stats = (
|
||||
qs.values("filament__color", "filament__type")
|
||||
.annotate(total_grams=Sum("consumed_grams"), job_count=Count("print_job", distinct=True))
|
||||
.order_by("-total_grams")
|
||||
)
|
||||
lines.append("| Color | Type | Total Grams | Jobs |")
|
||||
lines.append("|-------|------|-------------|------|")
|
||||
for s in stats:
|
||||
lines.append(
|
||||
f"| {s['filament__color']} | {s['filament__type']} | "
|
||||
f"{s['total_grams']}g | {s['job_count']} |"
|
||||
)
|
||||
elif group_by == "spool":
|
||||
stats = (
|
||||
qs.values("filament__id", "filament__brand", "filament__type", "filament__color")
|
||||
.annotate(total_grams=Sum("consumed_grams"), job_count=Count("print_job", distinct=True))
|
||||
.order_by("-total_grams")[:20]
|
||||
)
|
||||
lines.append("| Spool | Total Grams | Jobs |")
|
||||
lines.append("|-------|-------------|------|")
|
||||
for s in stats:
|
||||
lines.append(
|
||||
f"| {s['filament__brand']} {s['filament__type']} {s['filament__color']} | "
|
||||
f"{s['total_grams']}g | {s['job_count']} |"
|
||||
)
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def get_printer_health(printer_id=None):
|
||||
"""Diagnostics: errors, humidity, wifi, recent failed prints."""
|
||||
from .models import Printer, PrinterMetrics, PrintJob
|
||||
|
||||
printers = Printer.objects.filter(is_active=True)
|
||||
if printer_id:
|
||||
printers = printers.filter(id=printer_id)
|
||||
|
||||
if not printers.exists():
|
||||
return "No printers found."
|
||||
|
||||
parts = ["# Printer Health Report", ""]
|
||||
for printer in printers:
|
||||
latest = PrinterMetrics.objects.filter(device=printer).first()
|
||||
if not latest:
|
||||
parts.append(f"## {printer.name}\n**No data available.**\n")
|
||||
continue
|
||||
|
||||
parts.append(f"## {printer.name}")
|
||||
|
||||
# Connectivity
|
||||
parts.append("### Connectivity")
|
||||
if latest.wifi_signal_dbm is not None:
|
||||
signal = latest.wifi_signal_dbm
|
||||
quality = "Excellent" if signal > -50 else "Good" if signal > -60 else "Fair" if signal > -70 else "Poor"
|
||||
parts.append(f"- WiFi: {signal} dBm ({quality})")
|
||||
parts.append(f"- Last seen: {_local_dt(latest.timestamp, '%Y-%m-%d %H:%M:%S %Z')}")
|
||||
age = (timezone.now() - latest.timestamp).total_seconds()
|
||||
if age > 300:
|
||||
parts.append(f"- **Warning**: No data for {_format_duration(age / 60)}")
|
||||
|
||||
# AMS environment
|
||||
if latest.ams_humidity is not None or latest.ams_temp is not None:
|
||||
parts.append("### AMS Environment")
|
||||
if latest.ams_humidity is not None:
|
||||
hum_status = "OK" if latest.ams_humidity < 5 else "High" if latest.ams_humidity < 8 else "Critical"
|
||||
parts.append(f"- Humidity: {latest.ams_humidity} ({hum_status})")
|
||||
if latest.ams_temp is not None:
|
||||
parts.append(f"- Temperature: {latest.ams_temp}°C")
|
||||
|
||||
# HMS errors
|
||||
if latest.hms:
|
||||
parts.append("### Active HMS Alerts")
|
||||
for msg in latest.hms:
|
||||
parts.append(f"- {msg}")
|
||||
|
||||
# Recent failures
|
||||
week_ago = timezone.now() - timedelta(days=7)
|
||||
failed = PrintJob.objects.filter(
|
||||
device=printer,
|
||||
start_time__gte=week_ago,
|
||||
final_status__in=["FAILED", "CANCELLED"],
|
||||
)
|
||||
if failed.exists():
|
||||
parts.append(f"### Recent Failures (7d): {failed.count()}")
|
||||
for job in failed.select_related("cloud_task")[:5]:
|
||||
parts.append(f"- {_job_name(job)} ({job.final_status}) — {_local_dt(job.start_time, '%m-%d %H:%M')}")
|
||||
|
||||
# Success rate
|
||||
week_jobs = PrintJob.objects.filter(device=printer, start_time__gte=week_ago)
|
||||
total = week_jobs.count()
|
||||
if total > 0:
|
||||
success = week_jobs.filter(final_status="FINISH").count()
|
||||
parts.append(f"\n**7-day success rate**: {success}/{total} ({100 * success // total}%)")
|
||||
|
||||
parts.append("")
|
||||
|
||||
return "\n".join(parts)
|
||||
|
||||
|
||||
def search_print_jobs(query):
|
||||
"""Search print jobs by project name or gcode file."""
|
||||
from .models import PrintJob
|
||||
|
||||
if not query:
|
||||
return "Please provide a search query."
|
||||
|
||||
jobs = PrintJob.objects.select_related("device", "cloud_task").filter(
|
||||
Q(project_name__icontains=query)
|
||||
| Q(gcode_file__icontains=query)
|
||||
| Q(cloud_task__design_title__icontains=query)
|
||||
)[:20]
|
||||
|
||||
if not jobs:
|
||||
return f"No print jobs matching '{query}'."
|
||||
|
||||
lines = [f"# Search Results: '{query}'", ""]
|
||||
lines.append(f"*{len(jobs)} results*\n")
|
||||
lines.append("| ID | Project | Printer | Status | Date |")
|
||||
lines.append("|----|---------|---------|--------|------|")
|
||||
for j in jobs:
|
||||
lines.append(
|
||||
f"| {j.id} | {_job_name(j)} | {j.device.name} | "
|
||||
f"{j.final_status or 'In Progress'} | {_local_dt(j.start_time, '%Y-%m-%d')} |"
|
||||
)
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def get_printing_summary(days=7):
|
||||
"""High-level activity summary."""
|
||||
from .models import FilamentUsage, Printer, PrintJob
|
||||
|
||||
cutoff = timezone.now() - timedelta(days=int(days))
|
||||
jobs = PrintJob.objects.filter(start_time__gte=cutoff)
|
||||
|
||||
total = jobs.count()
|
||||
finished = jobs.filter(final_status="FINISH").count()
|
||||
failed = jobs.filter(final_status="FAILED").count()
|
||||
cancelled = jobs.filter(final_status="CANCELLED").count()
|
||||
in_progress = jobs.filter(final_status__isnull=True).count()
|
||||
|
||||
total_minutes = jobs.filter(duration_minutes__isnull=False).aggregate(
|
||||
total=Sum("duration_minutes")
|
||||
)["total"] or 0
|
||||
|
||||
total_grams = FilamentUsage.objects.filter(
|
||||
print_job__start_time__gte=cutoff,
|
||||
consumed_grams__isnull=False,
|
||||
).aggregate(total=Sum("consumed_grams"))["total"] or 0
|
||||
|
||||
lines = [f"# Printing Summary (last {days} days)", ""]
|
||||
lines.append(f"**Total Jobs**: {total}")
|
||||
lines.append(f"- Completed: {finished}")
|
||||
lines.append(f"- Failed: {failed}")
|
||||
lines.append(f"- Cancelled: {cancelled}")
|
||||
lines.append(f"- In Progress: {in_progress}")
|
||||
if total > 0:
|
||||
lines.append(f"- Success Rate: {100 * finished // total}%")
|
||||
lines.append(f"\n**Total Print Time**: {_format_duration(total_minutes)}")
|
||||
lines.append(f"**Total Filament Used**: {total_grams}g")
|
||||
|
||||
# Most printed projects
|
||||
top_projects = (
|
||||
jobs.values("project_name")
|
||||
.annotate(count=Count("id"))
|
||||
.order_by("-count")[:5]
|
||||
)
|
||||
if top_projects:
|
||||
lines.append("\n### Most Printed")
|
||||
for p in top_projects:
|
||||
lines.append(f"- {p['project_name']} ({p['count']}x)")
|
||||
|
||||
# Active printers
|
||||
active_printers = Printer.objects.filter(
|
||||
print_jobs__start_time__gte=cutoff
|
||||
).distinct()
|
||||
if active_printers.exists():
|
||||
lines.append(f"\n**Active Printers**: {', '.join(p.name for p in active_printers)}")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def find_compatible_filament(type, min_remaining_percent=10, color=None):
|
||||
"""Find spools matching material type criteria."""
|
||||
from .models import Filament
|
||||
|
||||
qs = Filament.objects.filter(
|
||||
type__iexact=type,
|
||||
remaining_percent__gte=int(min_remaining_percent),
|
||||
)
|
||||
if color:
|
||||
qs = qs.filter(color__icontains=color)
|
||||
|
||||
filaments = qs[:20]
|
||||
if not filaments:
|
||||
return f"No {type} filament found with >={min_remaining_percent}% remaining."
|
||||
|
||||
lines = [f"# Compatible Filament: {type}", ""]
|
||||
if color:
|
||||
lines.append(f"*Color filter: {color}*\n")
|
||||
lines.append(f"*{qs.count()} spools found*\n")
|
||||
lines.append("| ID | Brand | Sub-type | Color | Remaining | In AMS |")
|
||||
lines.append("|----|-------|----------|-------|-----------|--------|")
|
||||
for f in filaments:
|
||||
lines.append(
|
||||
f"| {f.id} | {f.brand} | {f.sub_type or f.type} | "
|
||||
f"{f.color} | {f.remaining_percent}% | "
|
||||
f"{'Yes' if f.is_loaded_in_ams else 'No'} |"
|
||||
)
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
# ─── Resources ───────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def resource_printers():
|
||||
"""List all printers (resource)."""
|
||||
return list_printers()
|
||||
|
||||
|
||||
def resource_printer_status(printer_id):
|
||||
"""Latest printer status (resource)."""
|
||||
return get_printer_status(printer_id=printer_id)
|
||||
|
||||
|
||||
def resource_filaments():
|
||||
"""Full filament inventory (resource)."""
|
||||
return list_filaments()
|
||||
|
||||
|
||||
def resource_filament_detail(filament_id):
|
||||
"""Single spool with usage (resource)."""
|
||||
return get_filament_detail(filament_id=filament_id)
|
||||
|
||||
|
||||
def resource_recent_print_jobs():
|
||||
"""Last 20 print jobs (resource)."""
|
||||
return get_print_history(limit=20)
|
||||
|
||||
|
||||
def resource_filament_types():
|
||||
"""Filament type registry (resource)."""
|
||||
from .models import FilamentType
|
||||
|
||||
types = FilamentType.objects.all()
|
||||
if not types.exists():
|
||||
return "No filament types registered."
|
||||
|
||||
lines = ["# Filament Types", ""]
|
||||
lines.append("| ID | Type | Sub-type | Brand |")
|
||||
lines.append("|----|------|----------|-------|")
|
||||
for t in types:
|
||||
lines.append(f"| {t.id} | {t.type} | {t.sub_type or '-'} | {t.brand} |")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def resource_filament_colors():
|
||||
"""Filament color database (resource)."""
|
||||
from .models import FilamentColor
|
||||
|
||||
colors = FilamentColor.objects.all()[:100]
|
||||
if not colors:
|
||||
return "No filament colors in database."
|
||||
|
||||
lines = ["# Filament Colors", ""]
|
||||
lines.append(f"*Showing up to 100 of {FilamentColor.objects.count()}*\n")
|
||||
lines.append("| Color | Hex | Type | Sub-type | Brand |")
|
||||
lines.append("|-------|-----|------|----------|-------|")
|
||||
for c in colors:
|
||||
lines.append(
|
||||
f"| {c.color_name} | #{c.color_code} | {c.filament_type} | "
|
||||
f"{c.filament_sub_type or '-'} | {c.brand} |"
|
||||
)
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
# ─── Prompts ─────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def prompt_printer_check_in(printer_id=None):
|
||||
"""Full status briefing: status + health + recent prints."""
|
||||
parts = [
|
||||
get_printer_status(printer_id=printer_id),
|
||||
get_printer_health(printer_id=printer_id),
|
||||
get_print_history(days=1, limit=5),
|
||||
]
|
||||
return "\n\n---\n\n".join(parts)
|
||||
|
||||
|
||||
def prompt_filament_inventory_report():
|
||||
"""Inventory report with low-stock warnings."""
|
||||
from .models import Filament
|
||||
|
||||
low_stock = Filament.objects.filter(remaining_percent__lte=20)
|
||||
parts = [list_filaments()]
|
||||
if low_stock.exists():
|
||||
lines = ["\n## Low Stock Warnings"]
|
||||
for f in low_stock:
|
||||
lines.append(f"- **{f.brand} {f.type} {f.color}**: {f.remaining_percent}% remaining")
|
||||
parts.append("\n".join(lines))
|
||||
return "\n\n".join(parts)
|
||||
|
||||
|
||||
def prompt_print_job_review(job_id):
|
||||
"""Review a completed job."""
|
||||
return get_print_job_detail(job_id)
|
||||
|
||||
|
||||
def prompt_weekly_digest():
|
||||
"""Weekly activity summary."""
|
||||
parts = [
|
||||
get_printing_summary(days=7),
|
||||
get_filament_usage_stats(days=7, group_by="type"),
|
||||
]
|
||||
return "\n\n---\n\n".join(parts)
|
||||
|
||||
|
||||
def prompt_troubleshoot_printer(printer_id=None):
|
||||
"""Diagnose issues from recent data."""
|
||||
parts = [
|
||||
get_printer_health(printer_id=printer_id),
|
||||
get_printer_status(printer_id=printer_id),
|
||||
get_temperature_history(printer_id=printer_id, hours=2),
|
||||
]
|
||||
return "\n\n---\n\n".join(parts)
|
||||
Reference in New Issue
Block a user