/**
 * Render Queue — Server-side job queue for Bradly rendering.
 *
 * Features:
 * - In-memory job queue with concurrent rendering limit
 * - SSE (Server-Sent Events) for real-time progress
 * - Support for video (MP4/WebM) and image (PNG/JPEG) export
 * - Job lifecycle: queued → rendering → done / error
 *
 * Uses Puppeteer + FFmpeg instead of @remotion/renderer.
 */
|
||
import path from 'path';
|
||
import fs from 'fs';
|
||
import crypto from 'crypto';
|
||
import { renderFrames, renderStill } from '../engine/renderer/puppeteerRenderer';
|
||
import { encodeVideo, cleanupFrames, AudioTrackInput } from '../engine/renderer/videoEncoder';
|
||
|
||
// ═══ Types ═══
|
||
|
||
/** Output formats a render job may request. */
export type RenderFormat =
  | 'mp4'
  | 'webm'
  | 'gif'
  | 'png'
  | 'jpeg';
|
||
|
||
/**
 * A single entry in the in-memory render queue.
 *
 * Jobs are created by `createJob`, mutated in place while they render, and
 * broadcast to SSE clients (minus `inputProps`) on every change.
 */
export interface RenderJob {
  /** Unique job id, generated with `crypto.randomUUID()` when the job is created. */
  id: string;
  /** Lifecycle state: queued → rendering → done / error. */
  status: 'queued' | 'rendering' | 'done' | 'error';
  /** Completion percentage, 0-100. */
  progress: number;
  /** Requested output format; also used as the output file extension. */
  format: RenderFormat;
  /** Output width, forwarded to the renderer. */
  width: number;
  /** Output height, forwarded to the renderer. */
  height: number;
  /** Frames per second, forwarded to frame capture and video encoding. */
  fps: number;
  /** Number of frames to capture for video output. */
  durationInFrames: number;
  /** Composition identifier supplied by the caller. */
  compositionId: string;
  /** Optional brand identifier supplied by the caller. */
  brandId?: string;
  /** Props forwarded to the renderer; stripped from SSE payloads because of their size. */
  inputProps: Record<string, any>;
  /** Explicit output file path; when absent the output is written under the renders directory. */
  targetPath?: string;
  /** Path of the written output file, set once the render has finished. */
  outputPath?: string;
  /** Download URL of the form `/api/renders/<id>.<format>`, set once the render has finished. */
  downloadUrl?: string;
  /** Failure message, set when the job ends in the `error` state. */
  error?: string;
  /** Creation time (`Date.now()` milliseconds). */
  createdAt: number;
  /** Time rendering started (`Date.now()` milliseconds). */
  startedAt?: number;
  /** Time the job finished or failed (`Date.now()` milliseconds). */
  completedAt?: number;
  /** Frames captured so far. */
  renderedFrames?: number;
  /** Total frames expected: 1 for still images, `durationInFrames` otherwise. */
  totalFrames?: number;
  /** Quality preset; not read by the queue code in this file. */
  quality?: 'draft' | 'standard' | 'high' | 'ultra';
  /** Estimated output size in megabytes; not written by the queue code in this file. */
  estimatedSizeMB?: number;
  /** Scheduling priority. Higher = process first. */
  priority?: number;
  /** Actual output file size in bytes. */
  fileSizeBytes?: number;
}
|
||
|
||
/**
 * Caller-supplied fields needed to enqueue a render via `createJob`.
 * Every field is copied onto the resulting `RenderJob`.
 */
export interface RenderJobCreateParams {
  /** Requested output format. */
  format: RenderFormat;
  /** Output width. */
  width: number;
  /** Output height. */
  height: number;
  /** Frames per second. */
  fps: number;
  /** Number of frames to render for video output. */
  durationInFrames: number;
  /** Composition identifier supplied by the caller. */
  compositionId: string;
  /** Optional brand identifier supplied by the caller. */
  brandId?: string;
  /** Props forwarded to the renderer. */
  inputProps: Record<string, any>;
  /** Optional explicit output file path. */
  targetPath?: string;
}
|
||
|
||
// ═══ Constants ═══
|
||
|
||
/** Maximum number of jobs rendered at the same time. Rendering is CPU-intensive. */
const MAX_CONCURRENT = 1;

/** Directory where render outputs and temporary frame folders are written. */
const RENDERS_DIR = process.env.BRADLY_RENDERS_DIR || path.join(process.cwd(), 'renders');

/** Directory that `/api/media/...` URLs are mapped onto when resolving audio sources. */
const UPLOADS_DIR = process.env.BRADLY_UPLOADS_DIR || path.join(process.cwd(), 'uploads');

/** Default serve URL for the running app (dev server or built app). */
const DEFAULT_SERVE_URL = process.env.BRADLY_SERVE_URL || 'http://localhost:5173';

// Ensure the renders directory exists. With `recursive: true` the call is a
// no-op when the directory is already there, so no separate (and racy)
// existence check is needed. A non-directory at this path now fails at startup
// rather than on the first render.
fs.mkdirSync(RENDERS_DIR, { recursive: true });
|
||
|
||
// ═══ State ═══
|
||
|
||
/** Every known job keyed by job id; iteration follows insertion order. */
const jobs: Map<string, RenderJob> = new Map();

/** Registered SSE send callbacks, grouped by client id. */
const sseClients: Map<string, Set<(data: string) => void>> = new Map();

/** Number of renders currently in flight; compared against MAX_CONCURRENT. */
let activeRenders = 0;
|
||
|
||
// ═══ SSE Helpers ═══
|
||
|
||
/**
 * Sends a `job-update` event for `job` to every registered SSE client.
 *
 * The payload is serialized once and omits `inputProps`. A failure in one
 * client's send callback (for example a connection that has already closed) is
 * logged and skipped so it can neither starve the remaining clients nor bubble
 * up into the render pipeline that triggered the update.
 *
 * @param job Job whose current state should be published.
 */
function broadcastJobUpdate(job: RenderJob) {
  const data = JSON.stringify({
    type: 'job-update',
    job: sanitizeJob(job),
  });

  // Broadcast to all connected SSE clients
  for (const [clientId, clients] of sseClients) {
    for (const send of clients) {
      try {
        send(data);
      } catch (err) {
        console.warn(`SSE send failed for client [${clientId}]:`, err);
      }
    }
  }
}
|
||
|
||
/**
 * Produces the wire representation of a job: a shallow copy of every field
 * except `inputProps`, which is too large to push over SSE.
 *
 * @param job Job to copy; it is not modified.
 * @returns A new object without the `inputProps` key.
 */
function sanitizeJob(job: RenderJob): Omit<RenderJob, 'inputProps'> & { inputProps?: undefined } {
  const { inputProps: _omitted, ...wireFields } = job;
  return wireFields;
}
|
||
|
||
/**
 * Registers an SSE send callback under `clientId`.
 *
 * Several callbacks may share one client id; each registration is removed
 * independently by the function this returns.
 *
 * @param clientId Identifier the callback is grouped under.
 * @param send Callback invoked with each serialized event payload.
 * @returns A cleanup function that unregisters `send` and drops the client's
 *   entry once it has no callbacks left.
 */
export function addSSEClient(clientId: string, send: (data: string) => void): () => void {
  let listeners = sseClients.get(clientId);
  if (listeners === undefined) {
    listeners = new Set<(data: string) => void>();
    sseClients.set(clientId, listeners);
  }
  listeners.add(send);

  return function removeClient(): void {
    // Look the set up again: it may have been removed and recreated since registration.
    const current = sseClients.get(clientId);
    if (!current) return;

    current.delete(send);
    if (current.size === 0) {
      sseClients.delete(clientId);
    }
  };
}
|
||
|
||
// ═══ Job Management ═══
|
||
|
||
/**
 * Creates a render job, stores it in the queue, announces it to SSE clients
 * and kicks the queue so it starts immediately when a render slot is free.
 *
 * @param params Render settings supplied by the caller.
 * @returns The newly queued job (the same object that is stored in the queue).
 */
export function createJob(params: RenderJobCreateParams): RenderJob {
  const isStill = params.format === 'png' || params.format === 'jpeg';

  const job: RenderJob = {
    // Spread first: `params` may come from a request body and carry extra keys
    // at runtime, and those must not override the server-controlled fields below.
    ...params,
    id: crypto.randomUUID(),
    status: 'queued',
    progress: 0,
    createdAt: Date.now(),
    totalFrames: isStill ? 1 : params.durationInFrames,
    renderedFrames: 0,
  };

  jobs.set(job.id, job);
  broadcastJobUpdate(job);

  // Try to start rendering immediately. The queue runs in the background, so
  // surface unexpected rejections instead of leaving a floating promise.
  void processQueue().catch((err: unknown) => {
    console.error('Render queue processing failed:', err);
  });

  return job;
}
|
||
|
||
/**
 * Looks up a job by id.
 *
 * @param id Job id.
 * @returns The live job object, or `undefined` when no job has that id.
 */
export function getJob(id: string): RenderJob | undefined {
  const match = jobs.get(id);
  return match;
}
|
||
|
||
/**
 * Lists every job, newest first.
 *
 * @returns A new array ordered by `createdAt` descending; the elements are the
 *   live job objects.
 */
export function getAllJobs(): RenderJob[] {
  const newestFirst = [...jobs.values()];
  newestFirst.sort((left, right) => right.createdAt - left.createdAt);
  return newestFirst;
}
|
||
|
||
/**
 * Removes a job from the queue and deletes its output file, if any.
 *
 * Clients are notified with a final update whose status is `error` and whose
 * error text is `Deleted`.
 *
 * NOTE(review): a job deleted while it is rendering is not cancelled here; the
 * in-flight render keeps running and keeps broadcasting updates.
 *
 * @param id Job id.
 * @returns `true` when a job was removed, `false` when the id is unknown.
 */
export function deleteJob(id: string): boolean {
  const job = jobs.get(id);
  if (!job) return false;

  // Clean up the output file. Unlink directly rather than checking existence
  // first: a file that is already gone is fine, anything else is worth reporting.
  if (job.outputPath) {
    try {
      fs.unlinkSync(job.outputPath);
    } catch (err) {
      const code = err instanceof Error ? (err as Error & { code?: string }).code : undefined;
      if (code !== 'ENOENT') {
        console.warn(`Could not remove render output [${id}]:`, err);
      }
    }
  }

  jobs.delete(id);
  broadcastJobUpdate({ ...job, status: 'error', error: 'Deleted' });
  return true;
}
|
||
|
||
// ═══ Queue Processing ═══
|
||
|
||
/**
 * Starts the next queued job when a render slot is free.
 *
 * The job with the highest `priority` (missing priority counts as 0) is picked;
 * ties go to the job that was queued first. When the job finishes or fails the
 * slot is released and the queue is processed again.
 */
async function processQueue(): Promise<void> {
  if (activeRenders >= MAX_CONCURRENT) return;

  // Single pass instead of filter + sort. The strict `>` keeps the earliest
  // inserted job among equal priorities, matching a stable descending sort.
  let nextJob: RenderJob | undefined;
  for (const candidate of jobs.values()) {
    if (candidate.status !== 'queued') continue;
    if (!nextJob || (candidate.priority ?? 0) > (nextJob.priority ?? 0)) {
      nextJob = candidate;
    }
  }
  if (!nextJob) return;

  activeRenders++;
  try {
    // Everything after taking the slot lives inside the try block so that any
    // failure, including a failing broadcast, still releases the slot.
    nextJob.status = 'rendering';
    nextJob.startedAt = Date.now();
    broadcastJobUpdate(nextJob);

    await renderJob(nextJob);
  } catch (err: unknown) {
    // Thrown values are not guaranteed to be Error instances (or even objects).
    const rawMessage =
      typeof err === 'string' ? err : (err as { message?: unknown } | null | undefined)?.message;

    nextJob.status = 'error';
    nextJob.error = typeof rawMessage === 'string' && rawMessage ? rawMessage : 'Unknown render error';
    nextJob.completedAt = Date.now();
    broadcastJobUpdate(nextJob);
    console.error(`❌ Render failed [${nextJob.id}]:`, err);
  } finally {
    activeRenders--;
    // Process next in queue without awaiting it, but do not drop its errors.
    void processQueue().catch((queueErr: unknown) => {
      console.error('Render queue processing failed:', queueErr);
    });
  }
}
|
||
|
||
/**
 * Maps an audio/video URL from the composition props to a source FFmpeg can read.
 *
 * - `/api/media/<file>` becomes a path inside `uploadsDir`; anything that would
 *   resolve outside that directory (for example via `..`) is rejected.
 * - URLs starting with `http` are passed through unchanged.
 *
 * @param url Candidate URL; non-string and empty values are rejected.
 * @param uploadsDir Directory backing `/api/media/` URLs.
 * @returns The resolved source, or `null` when the URL is not usable.
 */
function resolveAudioSource(url: unknown, uploadsDir: string): string | null {
  if (typeof url !== 'string' || url.length === 0) return null;

  const mediaPrefix = '/api/media/';
  if (url.startsWith(mediaPrefix)) {
    const candidate = path.join(uploadsDir, url.slice(mediaPrefix.length));
    const relative = path.relative(path.resolve(uploadsDir), path.resolve(candidate));
    const escapesUploads =
      relative === '' ||
      relative === '..' ||
      relative.startsWith('..' + path.sep) ||
      path.isAbsolute(relative);
    return escapesUploads ? null : candidate;
  }

  if (url.startsWith('http')) {
    return url;
  }

  return null;
}

/**
 * Collects the audio tracks to mix into a video: the optional brand audio from
 * `designMD`, followed by every audio/video timeline element with resolvable content.
 *
 * @param inputProps Composition props of the job.
 * @param uploadsDir Directory backing `/api/media/` URLs.
 * @returns Tracks in the order described above; empty when there is no audio.
 */
function collectAudioTracks(inputProps: Record<string, any>, uploadsDir: string): AudioTrackInput[] {
  const tracks: AudioTrackInput[] = [];

  // 1. Brand audio
  const brandAudioUrl = resolveAudioSource(inputProps.designMD?.brandAudioUrl, uploadsDir);
  if (brandAudioUrl) {
    tracks.push({
      url: brandAudioUrl,
      startFrame: 0,
      volume: inputProps.designMD?.brandAudioVolume ?? 1,
    });
  }

  // 2. Timeline elements (props are caller-supplied, so tolerate a non-array value)
  const elements: any[] = Array.isArray(inputProps.timelineElements) ? inputProps.timelineElements : [];
  for (const el of elements) {
    if (!el || (el.type !== 'audio' && el.type !== 'video') || !el.content) continue;

    const audioUrl = resolveAudioSource(el.content, uploadsDir);
    if (audioUrl) {
      tracks.push({
        url: audioUrl,
        startFrame: el.startFrame || 0,
        volume: el.volume ?? 1,
      });
    }
  }

  return tracks;
}

/**
 * Renders one job to disk and marks it `done`.
 *
 * Still formats (png/jpeg) are rendered with a single `renderStill` call. All
 * other formats capture every frame into a temporary directory and encode them
 * with `encodeVideo`; the temporary frames are removed whether or not that
 * succeeds. Progress is broadcast throughout: frame capture covers 0-69%,
 * encoding 70-99%, and 100% is reported on completion.
 *
 * NOTE(review): 'gif' is part of RenderFormat but falls through to the h264
 * codec below — confirm `encodeVideo` supports a `.gif` output path.
 * NOTE(review): `downloadUrl` always points at `/api/renders/<id>.<ext>`, even
 * when `targetPath` places the file elsewhere — confirm that route can serve it.
 *
 * @param job Job to render; mutated in place with progress and results.
 * @throws Whatever the renderer or encoder rejects with; the caller marks the job as failed.
 */
async function renderJob(job: RenderJob): Promise<void> {
  const serveUrl = process.env.BRADLY_SERVE_URL || DEFAULT_SERVE_URL;
  const isStill = job.format === 'png' || job.format === 'jpeg';
  const ext = job.format;
  const outputPath = job.targetPath || path.join(RENDERS_DIR, `${job.id}.${ext}`);

  // Ensure the directory for the target path exists (no-op when it already does).
  fs.mkdirSync(path.dirname(outputPath), { recursive: true });

  console.log(`🎬 Rendering [${job.id}] → ${job.format} (${job.width}×${job.height})`);

  if (isStill) {
    // ── Still image render ──
    await renderStill({
      serveUrl,
      inputProps: job.inputProps,
      width: job.width,
      height: job.height,
      outputPath,
      imageFormat: job.format as 'png' | 'jpeg',
    });

    job.progress = 100;
    job.renderedFrames = 1;
    broadcastJobUpdate(job);
  } else {
    // ── Video render ──
    const framesDir = path.join(RENDERS_DIR, `${job.id}-frames`);

    try {
      // Step 1: Capture all frames as images
      await renderFrames({
        serveUrl,
        inputProps: job.inputProps,
        width: job.width,
        height: job.height,
        fps: job.fps,
        durationInFrames: job.durationInFrames,
        framesDir,
        imageFormat: 'png',
        onProgress: (rendered, total) => {
          // Frame capture is ~70% of the work. Guard against a zero total so
          // progress never becomes NaN, and cap at 69 until encoding starts.
          const ratio = total > 0 ? rendered / total : 0;
          job.progress = Math.min(Math.round(ratio * 70), 69);
          job.renderedFrames = rendered;
          broadcastJobUpdate(job);
        },
      });

      // Step 2: Encode frames to video with FFmpeg
      job.progress = 70;
      broadcastJobUpdate(job);

      const codec = job.format === 'webm' ? 'vp8' as const : 'h264' as const;
      const audioTracks = collectAudioTracks(job.inputProps, UPLOADS_DIR);

      await encodeVideo({
        framesDir,
        framePattern: 'frame-%06d.png',
        outputPath,
        fps: job.fps,
        codec,
        audioTracks,
        durationInFrames: job.durationInFrames,
        onProgress: (percent) => {
          // Encoding is ~30% of the work
          job.progress = 70 + Math.round(percent * 0.29);
          broadcastJobUpdate(job);
        },
      });
    } finally {
      // Step 3: Cleanup frame images, also when capture or encoding failed, so
      // aborted renders do not leave frame folders behind. Cleanup is best
      // effort: a failure here must not hide the original error or fail an
      // otherwise successful render.
      try {
        cleanupFrames(framesDir);
      } catch (cleanupErr) {
        console.warn(`Could not clean up frames for [${job.id}]:`, cleanupErr);
      }
    }
  }

  const completedAt = Date.now();
  job.status = 'done';
  job.progress = 100;
  job.completedAt = completedAt;
  job.outputPath = outputPath;
  job.downloadUrl = `/api/renders/${job.id}.${ext}`;

  // Capture file size; it is informational only, so a failed stat is ignored.
  try {
    job.fileSizeBytes = fs.statSync(outputPath).size;
  } catch {}

  broadcastJobUpdate(job);

  const elapsed = ((completedAt - (job.startedAt ?? job.createdAt)) / 1000).toFixed(1);
  const sizeMB = job.fileSizeBytes ? (job.fileSizeBytes / (1024 * 1024)).toFixed(1) + 'MB' : 'unknown';
  console.log(`✅ Render complete [${job.id}] in ${elapsed}s (${sizeMB}) → ${job.downloadUrl}`);
}
|
||
|
||
// ═══ Cleanup (auto-delete old renders after 1 hour) ═══
|
||
|
||
/** How long a finished job (and its output file) is kept before it is purged. */
const RENDER_RETENTION_MS = 60 * 60 * 1000;

/** How often the purge below runs. */
const CLEANUP_SWEEP_INTERVAL_MS = 10 * 60 * 1000;

// Periodically purge jobs that finished more than an hour ago. Failed jobs are
// purged too: they never produce an output file, but they still hold their
// `inputProps` in memory and would otherwise accumulate for the lifetime of the
// process. Deleting entries while iterating a Map is safe.
setInterval(() => {
  const cutoff = Date.now() - RENDER_RETENTION_MS;
  for (const [id, job] of jobs) {
    const isFinished = job.status === 'done' || job.status === 'error';
    if (isFinished && job.completedAt && job.completedAt < cutoff) {
      deleteJob(id);
    }
  }
}, CLEANUP_SWEEP_INTERVAL_MS); // Check every 10 minutes
|