Alright, folks, Leo Grant here, back from a caffeine-fueled weekend exploring what’s cooking in the agent world. You know me, I’m not one for theoretical hand-waving. I like getting my hands dirty, breaking things, and then building something useful. And lately, my hands have been stained with a particular shade of “agent orchestration” – specifically, the headache and triumph of managing long-running, stateful agents.
We’ve all been there. You build a cool little agent, maybe it scrapes a few pages, sends an email, updates a database. It works. You pat yourself on the back. Then someone asks, “Can it do that for a week? Or a month? And what if it crashes? Or needs to wait for external input? Or has to coordinate with three other agents?” Suddenly, your elegant little script starts looking like a house of cards in a hurricane. That’s where we hit the wall with simple task execution and enter the murky, often frustrating, but ultimately rewarding territory of agent state management and durable execution.
Today, I want to talk about something that’s been a lifesaver for me in this exact scenario: building truly durable agents using a combination of message queues and persistent state stores. Forget the buzzwords, forget the hype. This is about practical engineering for agents that don’t just *run*, but *endure*.
The Problem: Agents Are Fickle Creatures
My first big lesson in agent durability came from a project last year. We were building an agent that would monitor a specific type of social media post, extract some data, enrich it with external APIs, and then trigger a series of actions – think CRM updates, Slack notifications, and even drafting follow-up emails. Sounds straightforward, right?
The initial version was a Python script using a simple HTTP client and some local processing. It worked great for a few hours. Then the social media API rate-limited us. The external enrichment API choked on a malformed request. Our Slack token expired. Each failure meant the entire process stopped dead. We lost state, had to manually restart, and often had to re-process things, leading to duplicates and missed events.
It was a nightmare of manual intervention. My colleague, Sarah, bless her heart, spent more time babysitting that agent than she did on actual development. This is the classic problem: agents, by their nature, often interact with unreliable external systems, require waiting, and perform complex, multi-step operations. Treating them as simple, atomic functions is a recipe for disaster.
The Solution: Embracing Durability from the Ground Up
The shift came when we stopped thinking about our agent as a single, monolithic script and started viewing it as a series of interconnected, resilient steps. This led us down the path of message queues and persistent state. Here’s how we broke it down:
Step 1: Decoupling with Message Queues
The first, most crucial step was introducing a message queue. We used RabbitMQ, but Kafka, SQS, or even Redis Streams would work just as well. The idea is simple: instead of one part of the agent directly calling another, they communicate by sending messages to a queue. This achieves several things:
- Asynchronous Processing: If one step takes a long time, it doesn’t block the next.
- Buffering: Spikes in incoming data don’t overwhelm downstream services.
- Retries: If a consumer fails, the message can be re-queued and retried later.
- Scalability: You can spin up more consumers for a queue as needed.
In our social media agent, the “monitor” part no longer directly called the “enricher.” Instead, it pushed new post data onto an incoming_posts queue. A separate “enrichment” worker consumed from this queue, processed the data, and then pushed the enriched data onto an enriched_posts queue. This chain continued.
Here’s a simplified Python example using pika for RabbitMQ. This is just for demonstration; in a real app, you’d wrap this in proper error handling and connection management.
```python
# Producer (e.g., the "monitor" part of your agent)
import pika
import json
import time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='incoming_posts', durable=True)

def publish_post(post_data):
    message = json.dumps(post_data)
    channel.basic_publish(
        exchange='',
        routing_key='incoming_posts',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
        )
    )
    print(f" [x] Sent '{message}'")

# Example usage
for i in range(5):
    post = {"id": f"post_{i}", "content": f"New interesting post {i}", "timestamp": time.time()}
    publish_post(post)
    time.sleep(1)

connection.close()
```
```python
# Consumer (e.g., the "enrichment" worker)
import pika
import json
import time

# Setup for publishing to the next queue (simplified; usually a separate connection)
connection_out = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel_out = connection_out.channel()
channel_out.queue_declare(queue='enriched_posts', durable=True)

def publish_enriched_post(enriched_data):
    message = json.dumps(enriched_data)
    channel_out.basic_publish(
        exchange='',
        routing_key='enriched_posts',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
        )
    )
    print(f" [x] Sent enriched post '{enriched_data['id']}'")

def callback(ch, method, properties, body):
    data = json.loads(body)
    print(f" [x] Received {data['id']}")
    # Simulate some work that might fail or take time
    try:
        # Imagine calling an external API here
        enriched_data = data.copy()
        enriched_data['enriched_status'] = 'processed'
        enriched_data['processed_at'] = time.time()
        # Push to the next queue first...
        publish_enriched_post(enriched_data)
        # ...then acknowledge, ONLY AFTER the output is safely queued
        ch.basic_ack(delivery_tag=method.delivery_tag)
        print(f" [x] Done with {data['id']}")
    except Exception as e:
        print(f" [!] Error processing {data['id']}: {e}")
        # Re-queue the message for later retry
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

connection_in = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel_in = connection_in.channel()
channel_in.queue_declare(queue='incoming_posts', durable=True)
channel_in.basic_consume(queue='incoming_posts', on_message_callback=callback)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel_in.start_consuming()
connection_in.close()
connection_out.close()
```
The key here is durable=True for the queue and PERSISTENT_DELIVERY_MODE for messages. This means even if RabbitMQ crashes, the messages are still there when it comes back up. And basic_ack / basic_nack are critical: only acknowledge a message when you’re truly done with it. If your consumer dies before acknowledging, the message will be redelivered.
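One thing blind re-queuing can’t handle is a poison message that fails every single time – it will loop forever. A common fix is to cap the retry count and shunt exhausted messages to a dead-letter queue for later inspection. Here’s a minimal stand-in using stdlib queues instead of RabbitMQ, just to show the control flow; the `MAX_RETRIES` value and `handle` function are illustrative assumptions, not anything prescribed by pika.

```python
import queue

MAX_RETRIES = 3  # assumption: give up after three attempts

def consume_with_dead_letter(incoming, dead_letter, handle):
    """Drain `incoming`, retrying each message up to MAX_RETRIES times.

    Messages are (payload, attempt_count) tuples; failures are re-queued
    with an incremented count, and exhausted messages go to `dead_letter`.
    """
    while True:
        try:
            payload, attempts = incoming.get_nowait()
        except queue.Empty:
            return
        try:
            handle(payload)  # success: simply don't re-queue (the "ack")
        except Exception:
            if attempts + 1 >= MAX_RETRIES:
                dead_letter.put(payload)  # give up; park for inspection
            else:
                incoming.put((payload, attempts + 1))  # retry later

# Example: one message fails permanently, the others succeed
incoming, dead = queue.Queue(), queue.Queue()
for pid in ["post_0", "poison", "post_1"]:
    incoming.put((pid, 0))

processed = []
def handle(payload):
    if payload == "poison":
        raise ValueError("malformed post")
    processed.append(payload)

consume_with_dead_letter(incoming, dead, handle)
print(processed)          # ['post_0', 'post_1']
print(list(dead.queue))   # ['poison']
```

In real RabbitMQ deployments the same idea is usually expressed by rejecting with `basic_nack(..., requeue=False)` on a queue configured with a dead-letter exchange, but the bounded-retry logic above is the part that matters.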
Step 2: Persistent State for Long-Running Processes
Queues solve the “what if a step fails?” problem by allowing retries and decoupling. But what about the “what’s the current status of this overall task?” problem? What if an agent needs to wait for human input, or for an external system to complete a long-running process? That’s where persistent state comes in.
My initial thought was to just pass all state in the message payload. Big mistake. Message payloads can get huge, and replicating all that data across many queues is inefficient. More importantly, you need a single source of truth for the entire workflow’s state.
We introduced a PostgreSQL database as our central state store. For each social media post that entered our system, we created a record in a workflow_states table. This record held the unique ID of the post and its current status (e.g., RECEIVED, ENRICHING, ENRICHED, ACTION_TRIGGERED, WAITING_FOR_REVIEW, COMPLETED). It also stored metadata, like timestamps for each state change and any relevant intermediate data.
Each worker in our agent pipeline would:
- Consume a message from its input queue.
- Look up the corresponding workflow state in the database.
- Perform its work.
- Update the workflow state in the database (e.g., from ENRICHING to ENRICHED).
- Produce a new message to the next queue, containing only the minimal information needed to identify the workflow (e.g., the post ID).
This approach means:
- Transparency: At any point, we can query the database to see the status of every single task.
- Resilience: If a worker crashes, its last updated state is in the DB. When it restarts (or another worker picks up the re-queued message), it can resume from where it left off, or at least understand what happened.
- Human Intervention: If an agent needs to pause and wait for a human to review something, the state can be set to WAITING_FOR_REVIEW. A separate UI or manual process can then update the state to APPROVED, which can trigger the next agent worker to pick it up.
- Auditing: The state changes provide a clear audit trail.
Here’s a conceptual SQL snippet for our workflow_states table:
```sql
CREATE TABLE workflow_states (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    entity_id VARCHAR(255) NOT NULL UNIQUE, -- e.g., 'post_123'
    current_status VARCHAR(50) NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW(),
    metadata JSONB -- Store any other relevant data as JSON
);

-- Example of updating state
UPDATE workflow_states
SET current_status = 'ENRICHED',
    updated_at = NOW(),
    metadata = jsonb_set(metadata, '{enrichment_details}', '{"source": "API_X", "score": 0.8}'::jsonb)
WHERE entity_id = 'post_123';
```
In our Python workers, before processing a message, we’d fetch the state. After successful processing, we’d update it. If the state wasn’t what we expected (e.g., trying to enrich an already enriched post), we could log an error and skip or handle it gracefully.
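That fetch-check-update cycle is really a compare-and-set: the UPDATE should only succeed if the row is still in the status the worker expected, so two workers racing on the same re-delivered message can’t both win. Here’s a runnable sketch using stdlib sqlite3 as a stand-in for Postgres (the real table would use the UUID/JSONB schema above); the `transition` helper name is my own.

```python
import sqlite3
import json
import time

# In-memory stand-in for the Postgres workflow_states table
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE workflow_states (
        entity_id TEXT PRIMARY KEY,
        current_status TEXT NOT NULL,
        updated_at REAL,
        metadata TEXT
    )
""")
conn.execute(
    "INSERT INTO workflow_states VALUES (?, ?, ?, ?)",
    ("post_123", "RECEIVED", time.time(), json.dumps({})),
)

def transition(conn, entity_id, expected, new, **meta):
    """Move entity to `new` only if it is still in `expected` (compare-and-set).

    Returns False when the row was not in the expected status, i.e. some
    other worker already handled this message.
    """
    cur = conn.execute(
        "UPDATE workflow_states "
        "SET current_status=?, updated_at=?, metadata=? "
        "WHERE entity_id=? AND current_status=?",
        (new, time.time(), json.dumps(meta), entity_id, expected),
    )
    return cur.rowcount == 1

print(transition(conn, "post_123", "RECEIVED", "ENRICHING"))  # True
print(transition(conn, "post_123", "RECEIVED", "ENRICHING"))  # False: already moved
```

In Postgres the same trick works verbatim: put both `entity_id` and the expected `current_status` in the WHERE clause and check the affected row count.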
Step 3: Orchestrating with Event-Driven Triggers
With queues and persistent state, we essentially built an event-driven workflow. Each state change in the database could, in turn, trigger new actions. For simpler cases, workers just pull from their designated queues. But for more complex, conditional flows, we sometimes used database triggers or a dedicated “orchestrator” agent that monitored state changes.
For example, if a workflow reached the WAITING_FOR_REVIEW state, our orchestrator agent might notice this, send a Slack notification to a human team, and then, once the state was manually updated to APPROVED, it would queue a message for the next action-triggering agent.
This isn’t strictly necessary for every durable agent, but for truly long-running, human-in-the-loop processes, it becomes incredibly powerful. It means your agents aren’t just reacting to simple messages, but to the broader context of an evolving workflow.
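The simplest possible orchestrator is just a polling loop over the state store that dispatches a handler per interesting status, taking care to fire each (workflow, status) pair only once. This toy version runs over an in-memory dict; the handler names and the polling approach are my own illustration (database triggers or Postgres LISTEN/NOTIFY are the event-driven alternatives mentioned above).

```python
# Hypothetical orchestrator: poll for state changes and dispatch actions.
# `workflows` stands in for the database; the handlers are illustrative.

def notify_reviewers(wf_id):
    print(f"Slack: {wf_id} needs review")        # stand-in for a real notification

def queue_next_action(wf_id):
    print(f"queued action message for {wf_id}")  # stand-in for basic_publish

HANDLERS = {
    "WAITING_FOR_REVIEW": notify_reviewers,
    "APPROVED": queue_next_action,
}

def orchestrate_once(workflows, already_seen):
    """One polling pass: dispatch each (id, status) pair at most once."""
    for wf_id, status in workflows.items():
        key = (wf_id, status)
        if status in HANDLERS and key not in already_seen:
            HANDLERS[status](wf_id)
            already_seen.add(key)

workflows = {"post_1": "WAITING_FOR_REVIEW", "post_2": "ENRICHING"}
seen = set()
orchestrate_once(workflows, seen)   # notifies reviewers for post_1
workflows["post_1"] = "APPROVED"    # a human approves via the UI
orchestrate_once(workflows, seen)   # queues the next action message
```

The `already_seen` set is doing the same job as the compare-and-set update in the state store: making repeated polling passes idempotent.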
The Payoff: Less Headaches, More Sleep
The transition wasn’t instantaneous. It required a mental shift and more infrastructure (managing RabbitMQ and Postgres). But the payoff was immense.
- Reliability: Our agents became significantly more solid. We could restart workers, even entire services, without losing data or progress.
- Visibility: We always knew what was happening. Sarah could now query the database to see exactly which posts were stuck, why, and at what stage. No more frantic log diving.
- Scalability: We could easily scale up individual worker types (e.g., more enrichment workers during peak hours) without affecting the entire system.
- Maintainability: Each worker became smaller, focused on a single responsibility. This made debugging and feature development much simpler.
I remember one evening when the social media API had an unscheduled outage for three hours. In the old system, it would have been a catastrophic failure, requiring a full manual restart and data reconciliation. With the durable setup, our “monitor” agent kept pushing messages to the incoming_posts queue, which just grew. The “enrichment” workers simply idled until the API came back online, then slowly worked through the backlog. No data lost, no manual intervention needed. It was beautiful.
Actionable Takeaways
If you’re building agents that need to live beyond a single, atomic execution, here’s what you should be thinking about:
- Decompose Your Agent: Break down complex agent tasks into smaller, independent steps. Each step should do one thing well.
- Embrace Asynchronous Communication: Use a message queue (RabbitMQ, Kafka, SQS, Redis Streams) to decouple these steps. Make sure your queues and messages are durable.
- Acknowledge Messages Carefully: Only acknowledge a message from the queue once its processing is fully complete and its output (if any) is safely stored or queued.
- Persist Workflow State: Use a database (SQL or NoSQL) to store the current status and relevant metadata for each long-running task. This is your single source of truth.
- Minimal Message Payloads: Messages in your queue should ideally only contain an ID that points to the full state in your persistent store, rather than carrying the entire state.
- Design for Retries and Idempotency: Your workers should be able to process the same message multiple times without causing issues (idempotency), as messages might be redelivered.
- Consider an Orchestrator (for complex workflows): For very long-running or human-in-the-loop processes, a dedicated orchestrator that monitors state changes and triggers subsequent steps can be invaluable.
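The idempotency point deserves a concrete shape. The cheapest version is to key every side effect on the message’s ID and skip work you’ve already done; a sketch below, with the caveat that a production system would keep the processed-ID set in the persistent state store, not in memory.

```python
# Minimal idempotent consumer sketch: a processed-ID set makes redelivery safe.
# In production this set would live in the state store, not in process memory.

processed_ids = set()
side_effects = []

def handle_once(message):
    """Apply the message's side effect at most once, keyed by its ID."""
    if message["id"] in processed_ids:
        return False                      # duplicate delivery: do nothing
    side_effects.append(message["id"])    # the "real" work goes here
    processed_ids.add(message["id"])
    return True

msg = {"id": "post_42", "content": "hello"}
print(handle_once(msg))  # True: first delivery does the work
print(handle_once(msg))  # False: redelivery is a harmless no-op
print(side_effects)      # ['post_42']
```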
Building durable agents isn’t about finding a magic library that does everything for you. It’s an architectural mindset shift. It’s about accepting that things will fail, and designing your system not just to recover, but to continue making progress despite those failures. It’s more work upfront, yes, but it saves you countless hours of firefighting and gives you the peace of mind to actually innovate, rather than just maintain.
Now, go forth and build agents that truly endure!
Related Articles
- My Agent Dev: Making AI Agents Do Real Things
- AI Dev Tools: How AI Testing Saves Developer Hours
- AI agent integration testing
🕒 Originally published: March 19, 2026