Está bien, amigos, Leo Grant aquí, de vuelta de un fin de semana lleno de cafeína en el que me sumergí en lo que está sucediendo en el mundo de los agentes. Ya me conocen, no soy de aquellos que solo hacen teoría. Me gusta ensuciarme las manos, romper cosas y luego construir algo útil. Y últimamente, mis manos han estado manchadas con un tono particular de “orquestación de agentes” – específicamente, el dolor de cabeza y el triunfo de gestionar agentes duraderos y con estado de larga duración.
Todos hemos estado allí. Creas un agente genial, tal vez scrapea algunas páginas, envía un correo electrónico, actualiza una base de datos. Funciona. Te das una palmadita en la espalda. Luego alguien pregunta: “¿Puede hacer eso durante una semana? ¿O un mes? ¿Y si se cae? ¿O necesita esperar la entrada externa? ¿O tiene que coordinarse con tres otros agentes?” De repente, tu elegante script comienza a parecerse a una casa de cartas en un huracán. Ahí es donde chocamos con la pared de la ejecución de tareas simples y entramos en el turbio, a menudo frustrante, pero finalmente gratificante territorio de la gestión del estado de agentes y la ejecución durable.
Hoy, quiero hablar sobre algo que ha sido un salvavidas para mí en este escenario específico: construir agentes verdaderamente duraderos utilizando una combinación de colas de mensajes y almacenes de estado persistente. Olvida los términos de moda, olvida el bombo publicitario. Esto se trata de ingeniería práctica para agentes que no solo *corren*, sino que *perduran*.
El Problema: Los Agentes Son Criaturas Caprichosas
Mi primera gran lección en durabilidad de agentes vino de un proyecto el año pasado. Estábamos construyendo un agente que monitorizaba un tipo específico de publicación en redes sociales, extraía algunos datos, los enriquecía con APIs externas y luego desencadenaba una serie de acciones – piensa en actualizaciones de CRM, notificaciones de Slack e incluso redactar correos electrónicos de seguimiento. Suena sencillo, ¿verdad?
La versión inicial era un script de Python usando un cliente HTTP simple y algo de procesamiento local. Funcionaba genial durante unas horas. Luego la API de redes sociales nos limitó la tasa. La API de enriquecimiento externo se ahogó con una solicitud malformada. Nuestro token de Slack expiró. Cada fallo significaba que todo el proceso se detenía por completo. Perdimos estado, tuvimos que reiniciar manualmente y a menudo teníamos que reprocesar cosas, lo que llevó a duplicados y eventos perdidos.
Fue una pesadilla de intervención manual. Mi colega, Sarah, bendita sea, pasó más tiempo cuidando ese agente que en el desarrollo real. Este es el problema clásico: los agentes, por su naturaleza, a menudo interactúan con sistemas externos poco fiables, requieren esperas y realizan operaciones complejas y en múltiples pasos. Tratarles como simples funciones atómicas es una receta para el desastre.
La Solución: Abrazar la Durabilidad Desde el Principio
El cambio llegó cuando dejamos de pensar en nuestro agente como un único script monolítico y comenzamos a verlo como una serie de pasos interconectados y resilientes. Esto nos llevó por el camino de las colas de mensajes y el estado persistente. Así es como lo desglosamos:
Paso 1: Desacoplamiento con Colas de Mensajes
El primer y más crucial paso fue introducir una cola de mensajes. Usamos RabbitMQ, pero Kafka, SQS o incluso Redis Streams funcionarían igual de bien. La idea es simple: en lugar de que una parte del agente llame directamente a otra, se comunican enviando mensajes a una cola. Esto logra varias cosas:
- Procesamiento Asíncrono: Si un paso tarda mucho, no bloquea al siguiente.
- Almacenamiento en Búfer: Picos en los datos entrantes no abruman a los servicios posteriores.
- Reintentos: Si un consumidor falla, el mensaje se puede reencolar y volver a intentar más tarde.
- Escalabilidad: Puedes activar más consumidores para una cola según sea necesario.
En nuestro agente de redes sociales, la parte de “monitorización” ya no llamaba directamente al “enriquecedor.” En su lugar, empujó los nuevos datos de publicación a una cola incoming_posts. Un “trabajador” de “enriquecimiento” separado consumía de esta cola, procesaba los datos y luego empujaba los datos enriquecidos a una cola enriched_posts. Esta cadena continuó.
Aquí tienes un ejemplo simplificado de Python usando pika para RabbitMQ. Esto es solo para demostración; en una aplicación real, envolverías esto en un manejo adecuado de errores y gestión de conexiones.
# Productor (por ejemplo, la parte "monitor" de tu agente)
import pika
import json
import time
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='incoming_posts', durable=True)
def publish_post(post_data):
message = json.dumps(post_data)
channel.basic_publish(
exchange='',
routing_key='incoming_posts',
body=message,
properties=pika.BasicProperties(
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
)
)
print(f" [x] Enviado '{message}'")
# Ejemplo de uso
for i in range(5):
post = {"id": f"post_{i}", "content": f"Nueva publicación interesante {i}", "timestamp": time.time()}
publish_post(post)
time.sleep(1)
connection.close()
# Consumidor (por ejemplo, el "trabajador de enriquecimiento")
import pika
import json
import time
def callback(ch, method, properties, body):
data = json.loads(body)
print(f" [x] Recibido {data['id']}")
# Simular algo de trabajo que podría fallar o llevar tiempo
try:
# Imaginar llamar a una API externa aquí
enriched_data = data.copy()
enriched_data['enriched_status'] = 'procesado'
enriched_data['processed_at'] = time.time()
# Reconocer el mensaje SOLO DESPUÉS de un procesamiento exitoso
ch.basic_ack(delivery_tag=method.delivery_tag)
# Ahora, empujar a la siguiente cola
publish_enriched_post(enriched_data)
except Exception as e:
print(f" [!] Error procesando {data['id']}: {e}")
# Reencolar el mensaje para un nuevo intento
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
print(f" [x] Hecho con {data['id']}")
# Configuración para publicar en la siguiente cola (simplificada, normalmente una conexión separada)
connection_out = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel_out = connection_out.channel()
channel_out.queue_declare(queue='enriched_posts', durable=True)
def publish_enriched_post(enriched_data):
message = json.dumps(enriched_data)
channel_out.basic_publish(
exchange='',
routing_key='enriched_posts',
body=message,
properties=pika.BasicProperties(
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
)
)
print(f" [x] Enviado publicación enriquecida '{enriched_data['id']}'")
connection_in = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel_in = connection_in.channel()
channel_in.queue_declare(queue='incoming_posts', durable=True)
channel_in.basic_consume(queue='incoming_posts', on_message_callback=callback)
print(' [*] Esperando mensajes. Para salir presiona CTRL+C')
channel_in.start_consuming()
connection_in.close()
connection_out.close()
La clave aquí es durable=True para la cola y PERSISTENT_DELIVERY_MODE para los mensajes. Esto significa que incluso si RabbitMQ falla, los mensajes aún estarán allí cuando vuelva a estar operativo. Y basic_ack / basic_nack son críticos: solo reconoce un mensaje cuando realmente has terminado con él. Si tu consumidor falla antes de reconocerlo, el mensaje será entregado nuevamente.
Paso 2: Estado Persistente para Procesos de Larga Duración
Las colas resuelven el problema de “¿qué pasa si un paso falla?” permitiendo reintentos y desacoplamiento. Pero ¿qué hay del problema de “¿cuál es el estado actual de esta tarea en general?”? ¿Qué pasa si un agente necesita esperar la entrada de un humano, o por un sistema externo para completar un proceso de larga duración? Ahí es donde entra el estado persistente.
Mi pensamiento inicial fue simplemente pasar todo el estado en la carga útil del mensaje. Gran error. Las cargas útiles de mensaje pueden hacerse enormes, y replicar todos esos datos en muchas colas es ineficiente. Más importante aún, necesitas una sola fuente de verdad para el estado de todo el flujo de trabajo.
Introdujimos una base de datos PostgreSQL como nuestro almacén central de estado. Para cada publicación de redes sociales que ingresaba en nuestro sistema, creamos un registro en una tabla workflow_states. Este registro contenía el ID único de la publicación y su estado actual (por ejemplo, RECEIVED, ENRICHING, ENRICHED, ACTION_TRIGGERED, WAITING_FOR_REVIEW, COMPLETED). También almacenaba metadatos, como marcas de tiempo para cada cambio de estado y cualquier dato intermedio relevante.
Cada trabajador en nuestra canalización de agentes:
- Consumiría un mensaje de su cola de entrada.
- Buscaría el estado de flujo de trabajo correspondiente en la base de datos.
- Realizaría su trabajo.
- Actualizaría el estado de flujo de trabajo en la base de datos (por ejemplo, de
ENRICHINGaENRICHED). - Produciría un nuevo mensaje para la siguiente cola, conteniendo solo la información mínima necesaria para identificar el flujo de trabajo (por ejemplo, el ID de la publicación).
Este enfoque significa:
- Transparencia: En cualquier momento, podemos consultar la base de datos para ver el estado de cada tarea individual.
- Resiliencia: Si un trabajador falla, su último estado actualizado está en la base de datos. Cuando se reinicia (o otro trabajador recoge el mensaje reencolado), puede continuar desde donde lo dejó, o al menos entender lo que sucedió.
- Intervención Humana: Si un agente necesita pausar y esperar a que un humano revise algo, el estado puede establecerse en
WAITING_FOR_REVIEW. Una UI separada o proceso manual puede luego actualizar el estado aAPPROVED, lo que puede desencadenar que el siguiente trabajador del agente lo recoja. - Auditoría: Los cambios de estado proporcionan un claro rastro de auditoría.
Aquí tienes un fragmento conceptual de SQL para nuestra tabla workflow_states:
CREATE TABLE workflow_states (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
entity_id VARCHAR(255) NOT NULL UNIQUE, -- e.g., 'post_123'
current_status VARCHAR(50) NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
metadata JSONB -- Almacena cualquier otro dato relevante como JSON
);
-- Ejemplo de actualización de estado
UPDATE workflow_states
SET current_status = 'ENRICHED',
updated_at = NOW(),
metadata = jsonb_set(metadata, '{enrichment_details}', '{"source": "API_X", "score": 0.8}'::jsonb)
WHERE entity_id = 'post_123';
En nuestros trabajadores de Python, antes de procesar un mensaje, obtendríamos el estado. Después de un procesamiento exitoso, lo actualizaríamos. Si el estado no era el que esperábamos (por ejemplo, intentar enriquecer un post que ya estaba enriquecido), podríamos registrar un error y omitirlo o manejarlo de manera apropiada.
Paso 3: Orquestando con Disparadores Basados en Eventos
Con colas y estado persistente, esencialmente construimos un flujo de trabajo impulsado por eventos. Cada cambio de estado en la base de datos podría, a su vez, activar nuevas acciones. Para casos más simples, los trabajadores simplemente extraen de sus colas designadas. Pero para flujos más complejos y condicionales, a veces utilizamos disparadores de base de datos o un agente de “orquestador” dedicado que monitoreaba los cambios de estado.
Por ejemplo, si un flujo de trabajo alcanzaba el estado WAITING_FOR_REVIEW, nuestro agente orquestador podría notar esto, enviar una notificación de Slack a un equipo humano, y luego, una vez que el estado se actualizaba manualmente a APPROVED, encolaría un mensaje para el siguiente agente que activa una acción.
No es estrictamente necesario para cada agente duradero, pero para procesos realmente prolongados con humanos en el circuito, se vuelve increíblemente poderoso. Significa que tus agentes no solo están reaccionando a mensajes simples, sino al contexto más amplio de un flujo de trabajo en evolución.
La Recompensa: Menos Dolores de Cabeza, Más Sueño
La transición no fue instantánea. Requirió un cambio mental y más infraestructura (gestionando RabbitMQ y Postgres). Pero la recompensa fue inmensa.
- Fiabilidad: Nuestros agentes se volvieron significativamente más fiables. Podíamos reiniciar trabajadores, incluso servicios completos, sin perder datos ni progreso.
- Visibilidad: Siempre sabíamos lo que estaba sucediendo. Sarah ahora podía consultar la base de datos para ver exactamente qué posts estaban atascados, por qué y en qué etapa. No más búsqueda frenética en los registros.
- Escalabilidad: Podíamos escalar fácilmente tipos individuales de trabajadores (por ejemplo, más trabajadores de enriquecimiento durante horas pico) sin afectar a todo el sistema.
- Mantenibilidad: Cada trabajador se volvió más pequeño, enfocado en una sola responsabilidad. Esto hizo que la depuración y el desarrollo de características fueran mucho más simples.
Recuerdo una tarde en la que la API de redes sociales tuvo una caída inesperada durante tres horas. En el viejo sistema, habría sido un fracaso catastrófico, requiriendo un reinicio manual completo y reconciliación de datos. Con la configuración duradera, nuestro agente de “monitor” seguía enviando mensajes a la cola incoming_posts, que simplemente crecía. Los trabajadores de “enriquecimiento” simplemente se quedaron inactivos hasta que la API volvió a estar en línea, luego comenzaron a trabajar lentamente a través del retraso. No se perdió ningún dato, no se necesitó intervención manual. Fue hermoso.
Lecciones Prácticas
Si estás construyendo agentes que necesitan vivir más allá de una única ejecución atómica, aquí tienes en qué deberías pensar:
- Descompón Tu Agente: Divide las tareas complejas de los agentes en pasos más pequeños e independientes. Cada paso debe hacer una cosa bien.
- Acepta la Comunicación Asincrónica: Utiliza una cola de mensajes (RabbitMQ, Kafka, SQS, Redis Streams) para desacoplar estos pasos. Asegúrate de que tus colas y mensajes sean duraderos.
- Acknowledge Messages Carefully: Solo reconoce un mensaje de la cola una vez que su procesamiento esté completamente terminado y su salida (si la hay) esté segura y almacenada o encolada.
- Persistir el Estado del Flujo de Trabajo: Utiliza una base de datos (SQL o NoSQL) para almacenar el estado actual y los metadatos relevantes para cada tarea de larga duración. Esta es tu única fuente de verdad.
- Minimal Message Payloads: Los mensajes en tu cola deberían idealmente solo contener un ID que apunte al estado completo en tu almacén persistente, en lugar de llevar todo el estado.
- Diseña para Reintentos e Idempotencia: Tus trabajadores deberían poder procesar el mismo mensaje múltiples veces sin causar problemas (idempotencia), ya que los mensajes podrían ser reentregados.
- Considera un Orquestador (para flujos de trabajo complejos): Para procesos muy prolongados o con humanos en el circuito, un orquestador dedicado que monitoree los cambios de estado y dispare los pasos subsecuentes puede ser invaluable.
Construir agentes duraderos no se trata de encontrar una biblioteca mágica que haga todo por ti. Es un cambio de mentalidad arquitectónica. Se trata de aceptar que las cosas fallarán, y diseñar tu sistema no solo para recuperarse, sino para seguir avanzando a pesar de esos fallos. Sí, es más trabajo al principio, pero te ahorra horas interminables de apagar incendios y te da la tranquilidad para realmente innovar, en lugar de solo mantener.
¡Ahora, adelante y construye agentes que realmente perduren!
Artículos Relacionados
- Mi Agente Dev: Haciendo que los Agentes de IA Hagan Cosas Reales
- Herramientas de Desarrollo de IA: Cómo las Pruebas de IA Ahorran Horas a los Desarrolladores
- Pruebas de integración de agentes de IA
🕒 Published: