Appearance
API Endpoints
◄ Anterior: Base de Datos | Índice | Siguiente: Handlers ►
Tabla de Contenidos
- POST /api/jobs/{type}
- GET /api/jobs/{id}
- GET /api/jobs/{id}/stream (Fase 2)
- GET /api/notifications
- PATCH /api/notifications/:id/read
- Flujos de Ejecución
POST /api/jobs/
Descripción: Despachar nuevo job para ejecución asíncrona
Path Params:
type(string): Tipo de job (debe tener handler registrado)
Headers:
Authorization: Bearer {jwt}(requerido)X-Schema: {schema}(requerido)
Request Body:
json
{
"payload": {
"campo1": "valor",
"campo2": 123
}
}Response (202 Accepted):
json
{
"status": "accepted",
"job_id": 123,
"message": "Job creado, se ejecutará en segundo plano"
}Códigos de respuesta:
- 202 Accepted: Job creado exitosamente
- 400 Bad Request: Payload inválido
- 422 Unprocessable Entity: Tipo de job no existe
- 429 Too Many Requests: Usuario excede límite de jobs pendientes
Ejemplo con curl:
bash
curl -X POST http://localhost/api/jobs/batch_invoicing \
-H "Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGc..." \
-H "X-Schema: suc0001" \
-H "Content-Type: application/json" \
-d '{
"payload": {
"cliente_ids": [1, 2, 3],
"fecha": "2026-02-05"
}
}'GET /api/jobs/
Descripción: Consultar estado de job
Path Params:
id(int): ID del job
Headers:
Authorization: Bearer {jwt}(requerido)
Response (200 OK):
json
{
"status": "success",
"data": {
"id": 123,
"type": "batch_invoicing",
"status": "completed",
"payload": {
"cliente_ids": [1, 2, 3],
"fecha": "2026-02-05"
},
"result": {
"facturas_creadas": 3,
"monto_total": 3000.00,
"factura_ids": [101, 102, 103],
"errores": []
},
"error": null,
"created_at": "2026-02-05T10:00:00Z",
"started_at": "2026-02-05T10:00:05Z",
"completed_at": "2026-02-05T10:05:30Z",
"execution_time_seconds": 325
}
}Estados posibles:
pending: Job en cola, aún no iniciórunning: Job ejecutándose actualmentecompleted: Job terminó exitosamente (verresult)failed: Job falló (vererror)
Códigos de respuesta:
- 200 OK: Job encontrado
- 404 Not Found: Job no existe o no pertenece al usuario
Ejemplo polling en JavaScript:
javascript
async function waitForJob(jobId) {
while (true) {
const response = await fetch(`/api/jobs/${jobId}`);
const data = await response.json();
if (data.data.status === 'completed') {
return data.data.result;
}
if (data.data.status === 'failed') {
throw new Error(data.data.error);
}
await new Promise(resolve => setTimeout(resolve, 2000)); // 2s
}
}GET /api/jobs/{id}/stream (Fase 2)
Descripción: SSE endpoint para recibir actualizaciones en tiempo real
Path Params:
id(int): ID del job
Query Params:
token(string): JWT token (workaround para auth, EventSource no soporta headers)
Headers:
- Ninguno (autenticación via query param)
Response: Server-Sent Events (SSE)
Event types:
job_status: Actualización de estado
event: job_status data: {"id": 123, "status": "running"}job_completed: Job terminado exitosamente
event: job_completed data: { "id": 123, "status": "completed", "result": { "facturas_creadas": 3, "monto_total": 3000.00 } }job_failed: Job falló
event: job_failed data: { "id": 123, "status": "failed", "error": "Cliente ID 999 no encontrado" }
Ejemplo con EventSource:
javascript
function streamJob(jobId, token) {
const eventSource = new EventSource(
`/api/jobs/${jobId}/stream?token=${token}`
);
eventSource.addEventListener('job_completed', (event) => {
const data = JSON.parse(event.data);
console.log('Completado:', data.result);
eventSource.close();
});
eventSource.addEventListener('job_failed', (event) => {
const data = JSON.parse(event.data);
console.error('Falló:', data.error);
eventSource.close();
});
eventSource.onerror = () => {
console.error('Error en conexión SSE');
eventSource.close();
};
}GET /api/notifications
Descripción: Listar notificaciones del usuario
Query Params:
unread(bool): Solo no leídas (opcional)
Response:
json
{
"status": "success",
"data": [
{
"id": 456,
"type": "success",
"title": "Facturación completada",
"message": "Se crearon 50 facturas exitosamente",
"metadata": {
"job_id": 123,
"result": { "facturas_creadas": 50 }
},
"is_read": false,
"created_at": "2026-02-05T10:05:30Z"
}
],
"meta": {
"total": 10,
"total_unread": 1
}
}PATCH /api/notifications/:id/read
Descripción: Marcar notificación como leída
Path Params:
id(int): ID de la notificación
Response: 204 No Content
Efecto:
is_read= TRUEread_at= NOW()
Flujos de Ejecución
Despacho Asíncrono (POST /jobs/{type})
Actores:
- Usuario (Frontend)
- JobController
- JobDispatcher
- JobRepository
- OS (exec)
Flujo:
Request HTTP:
httpPOST /api/jobs/batch_invoicing X-Schema: suc0001 Authorization: Bearer {jwt} { "payload": { "cliente_ids": [1, 2, 3], "fecha": "2026-02-05" } }JobController::dispatch():
- Extraer user_id del JWT
- Extraer schema del header X-Schema
- Validar payload estructural
- Delegar a JobDispatcher
JobDispatcher::dispatch():
- Verificar límite de jobs pendientes:php
if ($this->repo->countPendingByUser($userId) >= 10) { throw new TooManyJobsException(); } - Crear BackgroundJob:php
$job = new BackgroundJob( type: 'batch_invoicing', status: 'pending', payload: $payload, user_id: $userId, schema: $schema ); - Persistir:
$jobId = $this->repo->create($job); - Lanzar worker:php
$command = sprintf( "php %s/cli/background-worker.php %d > /dev/null 2>&1 &", PROJECT_ROOT, $jobId ); exec($command);
- Verificar límite de jobs pendientes:
Response HTTP 202 Accepted:
json{ "status": "accepted", "job_id": 123, "message": "Job creado, se ejecutará en segundo plano" }
Tiempo total: ~50-200ms (NO espera al worker)
Ejecución en Background (CLI Worker)
Actores:
- background-worker.php (proceso independiente)
- JobExecutor
- Handler (ej: BatchInvoicingJobHandler)
- Service (ej: FacturaService)
- NotificationService
Flujo:
Inicio del worker:
bashphp cli/background-worker.php 123 > /dev/null 2>&1 &Cargar job:
php$job = $this->repo->findById($jobId); if (!$job) { throw new JobNotFoundException(); }Actualizar a running:
php$job->status = 'running'; $job->started_at = date('Y-m-d H:i:s'); $this->repo->update($job);Configurar schema (CRÍTICO):
php$this->connectionManager->setSearchPath($job->schema); // Ahora todas las queries ejecutarán en el schema correctoObtener handler:
php$handler = $this->handlers[$job->type] ?? null; if (!$handler) { throw new NoHandlerException(); }Ejecutar handler:
phptry { $result = $handler->handle($job->payload); $job->status = 'completed'; $job->result = $result; $job->completed_at = date('Y-m-d H:i:s'); } catch (Exception $e) { $job->status = 'failed'; $job->error = $e->getMessage(); $job->completed_at = date('Y-m-d H:i:s'); }Actualizar job final:
php$this->repo->update($job);Crear notificación:
php$this->notificationService->createFromJobResult($job);Exit:
phpexit($job->isFailed() ? 1 : 0);
Tiempo total: Variable (segundos a minutos, dependiendo del handler)
Consulta de Estado (Polling HTTP - Fase 1)
Frontend:
javascript
async function pollJobStatus(jobId) {
const interval = 2000; // 2 segundos
const poll = setInterval(async () => {
const response = await fetch(`/api/jobs/${jobId}`);
const data = await response.json();
if (data.data.status === 'completed') {
console.log('Job completado:', data.data.result);
clearInterval(poll);
showNotification('success', 'Operación completada');
}
if (data.data.status === 'failed') {
console.error('Job falló:', data.data.error);
clearInterval(poll);
showNotification('error', 'Operación falló');
}
}, interval);
}Ventajas:
- ✅ Simple de implementar
- ✅ Compatible con todos los navegadores
- ✅ No requiere conexión persistente
Desventajas:
- ❌ Latencia: usuario espera hasta próximo poll
- ❌ Overhead: requests aunque job no haya cambiado
- ❌ No escala bien con muchos jobs concurrentes
SSE con PostgreSQL NOTIFY (Fase 2)
Frontend:
javascript
function streamJobStatus(jobId) {
const eventSource = new EventSource(`/api/jobs/${jobId}/stream`);
eventSource.addEventListener('job_completed', (event) => {
const data = JSON.parse(event.data);
console.log('Job completado:', data.result);
eventSource.close();
showNotification('success', 'Operación completada');
});
eventSource.addEventListener('job_failed', (event) => {
const data = JSON.parse(event.data);
console.error('Job falló:', data.error);
eventSource.close();
showNotification('error', 'Operación falló');
});
}Backend (JobStreamController):
php
public function stream(ServerRequestInterface $request, ResponseInterface $response, array $args): ResponseInterface
{
$jobId = (int) $args['id'];
// Configurar SSE headers
$response = $response
->withHeader('Content-Type', 'text/event-stream')
->withHeader('Cache-Control', 'no-cache')
->withHeader('X-Accel-Buffering', 'no');
// Escuchar canal PostgreSQL
$pdo = $this->connectionManager->get('oficial')->getConnection();
$pdo->exec("LISTEN job_updates_{$jobId}");
// Stream events
while (true) {
$notification = pg_get_notify($pdo);
if ($notification) {
$payload = json_decode($notification['payload'], true);
$response->getBody()->write("data: " . json_encode($payload) . "\n\n");
ob_flush(); flush();
if (in_array($payload['status'], ['completed', 'failed'])) {
break; // Cerrar stream
}
}
usleep(100000); // 100ms
}
return $response;
}Ventajas:
- ✅ Latencia mínima (notificación instantánea)
- ✅ CERO overhead cuando job no cambia
- ✅ Escala mejor (conexión persistente reutilizable)
Desventajas:
- ❌ Más complejo de implementar
- ❌ Requiere soporte de servidor para long-polling
- ❌ EventSource no soporta custom headers (workaround: query param para auth)