Alright, Leute, Leo Grant hier, zurück von einem von Koffein getriebenen Wochenende, um zu erkunden, was in der Agentenwelt vor sich geht. Ihr kennt mich, ich bin nicht der Typ, der viel Theorie quatscht. Ich mag es, mir die Hände schmutzig zu machen, Dinge kaputt zu machen und dann etwas Nützliches aufzubauen. Und in letzter Zeit sind meine Hände mit einem bestimmten Farbton von „Agentenorchestrierung“ befleckt – speziell mit dem Kopfzerbrechen und dem Triumph, lang laufende, zustandsbehaftete Agenten zu managen.
Wir waren alle schon mal in dieser Situation. Du baust einen coolen kleinen Agenten, vielleicht scrapt er ein paar Seiten, sendet eine E-Mail, aktualisiert eine Datenbank. Es funktioniert. Du klopfst dir auf die Schulter. Dann fragt jemand: „Kann er das eine Woche lang machen? Oder einen Monat? Und was, wenn er abstürzt? Oder auf externe Eingaben warten muss? Oder mit drei anderen Agenten koordinieren muss?“ Plötzlich sieht dein elegantes kleines Skript aus wie ein Kartenhaus im Hurrikan. Hier stoßen wir an die Grenzen der einfachen Aufgabenbearbeitung und betreten das unklare, oft frustrierende, aber letztendlich lohnende Gebiet des Agentenzustandsmanagements und der dauerhaften Ausführung.
Heute möchte ich über etwas sprechen, das mir in genau diesem Szenario das Leben gerettet hat: den Aufbau wahrhaft langlebiger Agenten durch eine Kombination aus Nachrichtenwarteschlangen und persistenten Zustandspeichern. Vergesst die Modewörter, vergesst den Hype. Es geht um praktische Ingenieurskunst für Agenten, die nicht nur *laufen*, sondern *überstehen*.
Das Problem: Agenten sind launische Geschöpfe
Meine erste große Lektion in Sachen Agentenhaltbarkeit kam aus einem Projekt im letzten Jahr. Wir bauten einen Agenten, der eine bestimmte Art von Social-Media-Post überwachen, Daten extrahieren, mit externen APIs anreichern und dann eine Reihe von Aktionen auslösen sollte – denkt an CRM-Updates, Slack-Benachrichtigungen und sogar das Entwerfen von Follow-up-E-Mails. Klingt einfach, oder?
Die erste Version war ein Python-Skript, das einen einfachen HTTP-Client und etwas lokale Verarbeitung verwendete. Es funktionierte großartig für ein paar Stunden. Dann wurde unsere Social Media API in der Rate begrenzt. Die externe Anreicherungs-API bekam Probleme mit einer fehlerhaften Anfrage. Unser Slack-Token lief ab. Jeder Fehler führte dazu, dass der gesamte Prozess abrupt stoppte. Wir verloren den Zustand, mussten manuell neu starten und oft Dinge erneut verarbeiten, was zu Duplikaten und verpassten Ereignissen führte.
Es war ein Albtraum manueller Interventionen. Meine Kollegin, Sarah, Gott hab sie selig, verbrachte mehr Zeit damit, diesen Agenten zu betreuen, als mit tatsächlicher Entwicklung. Das ist das klassische Problem: Agenten, durch ihre Natur, interagieren oft mit unzuverlässigen externen Systemen, erfordern Wartezeiten und führen komplexe, mehrstufige Operationen durch. Sie als einfache, atomare Funktionen zu behandeln, ist ein Rezept für das Desaster.
Die Lösung: Haltbarkeit von Grund auf annehmen
Der Wandel kam, als wir aufhörten, unseren Agenten als einzelnes, monumentales Skript zu betrachten, und begannen, ihn als eine Reihe von miteinander verbundenen, widerstandsfähigen Schritten zu sehen. Das führte uns auf den Weg der Nachrichtenwarteschlangen und persistenten Zustände. So haben wir es aufgeschlüsselt:
Schritt 1: Entkopplung mit Nachrichtenwarteschlangen
Der erste, entscheidende Schritt war die Einführung einer Nachrichtenwarteschlange. Wir verwendeten RabbitMQ, aber Kafka, SQS oder sogar Redis Streams würden genauso gut funktionieren. Die Idee ist einfach: Statt dass ein Teil des Agenten direkt einen anderen aufruft, kommunizieren sie, indem sie Nachrichten an eine Warteschlange senden. Dies erreicht mehrere Dinge:
- Asynchrone Verarbeitung: Wenn ein Schritt lange dauert, blockiert er nicht den nächsten.
- Zwischenspeicherung: Datenanfragen machen die nachgelagerten Dienste nicht überfordert.
- Wiederholungen: Wenn ein Verbraucher fehlschlägt, kann die Nachricht erneut in die Warteschlange eingereiht und später wiederholt werden.
- Skalierbarkeit: Du kannst nach Bedarf mehr Verbraucher für eine Warteschlange aufsetzen.
In unserem Social-Media-Agenten rief der „Überwacher“ nicht mehr direkt den „Anreicherer“ auf. Stattdessen schob er neue Post-Daten in eine incoming_posts-Warteschlange. Ein separater „Anreicherungs“-Arbeiter konsumierte aus dieser Warteschlange, verarbeitete die Daten und schob dann die angereicherten Daten in eine enriched_posts-Warteschlange. Diese Kette setzte sich fort.
Hier ist ein vereinfachtes Python-Beispiel, das pika für RabbitMQ verwendet. Dies ist nur zur Demonstration; in einer echten App würdest du dies in eine ordentliche Fehlerbehandlung und Verbindungsmanagement einpacken.
# Producer (z.B. der "Überwachungs"-Teil deines Agenten)
import pika
import json
import time
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='incoming_posts', durable=True)
def publish_post(post_data):
message = json.dumps(post_data)
channel.basic_publish(
exchange='',
routing_key='incoming_posts',
body=message,
properties=pika.BasicProperties(
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
)
)
print(f" [x] Gesendet '{message}'")
# Beispielhafte Nutzung
for i in range(5):
post = {"id": f"post_{i}", "content": f"Neuer interessanter Post {i}", "timestamp": time.time()}
publish_post(post)
time.sleep(1)
connection.close()
# Consumer (z.B. der "Anreicherungs"-Arbeiter)
import pika
import json
import time
def callback(ch, method, properties, body):
data = json.loads(body)
print(f" [x] Empfangene {data['id']}")
# Simuliere einige Arbeit, die fehlschlagen oder Zeit benötigen könnte
try:
# Stelle dir vor, hier wird eine externe API aufgerufen
enriched_data = data.copy()
enriched_data['enriched_status'] = 'verarbeitet'
enriched_data['processed_at'] = time.time()
# Bestätige die Nachricht NUR NACH erfolgreicher Verarbeitung
ch.basic_ack(delivery_tag=method.delivery_tag)
# Jetzt, push zu der nächsten Warteschlange
publish_enriched_post(enriched_data)
except Exception as e:
print(f" [!] Fehler bei der Verarbeitung von {data['id']}: {e}")
# Stelle die Nachricht für eine spätere Wiederholung erneut in die Warteschlange
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
print(f" [x] Fertig mit {data['id']}")
# Setup für das Publizieren zur nächsten Warteschlange (vereinfacht, normalerweise eine separate Verbindung)
connection_out = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel_out = connection_out.channel()
channel_out.queue_declare(queue='enriched_posts', durable=True)
def publish_enriched_post(enriched_data):
message = json.dumps(enriched_data)
channel_out.basic_publish(
exchange='',
routing_key='enriched_posts',
body=message,
properties=pika.BasicProperties(
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
)
)
print(f" [x] Gesendeter angereicherter Post '{enriched_data['id']}'")
connection_in = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel_in = connection_in.channel()
channel_in.queue_declare(queue='incoming_posts', durable=True)
channel_in.basic_consume(queue='incoming_posts', on_message_callback=callback)
print(' [*] Warte auf Nachrichten. Zum Beenden drücke CTRL+C')
channel_in.start_consuming()
connection_in.close()
connection_out.close()
Der Schlüssel hier ist durable=True für die Warteschlange und PERSISTENT_DELIVERY_MODE für die Nachrichten. Das bedeutet, selbst wenn RabbitMQ abstürzt, sind die Nachrichten da, wenn es wieder hochgefahren wird. Und basic_ack / basic_nack sind entscheidend: Bestätige eine Nachricht nur, wenn du wirklich fertig bist. Wenn dein Verbraucher stirbt, bevor er bestätigt, wird die Nachricht erneut zugestellt.
Schritt 2: Persistenter Zustand für langlaufende Prozesse
Warteschlangen lösen das Problem „Was passiert, wenn ein Schritt fehlschlägt?“ durch Erlauben von Wiederholungen und Entkopplung. Aber was ist mit dem Problem „Was ist der aktuelle Status dieser gesamten Aufgabe?“ Was, wenn ein Agent auf menschliches Feedback warten muss oder darauf warten muss, dass ein externes System einen langlaufenden Prozess abschließt? Hier kommt der persistente Zustand ins Spiel.
Mein ursprünglicher Gedanke war, einfach den gesamten Zustand im Nachrichtenpayload zu übergeben. Großer Fehler. Nachrichtenpayloads können riesig werden, und das Replizieren all dieser Daten über viele Warteschlangen hinweg ist ineffizient. Viel wichtiger ist, dass du eine einzige Quelle der Wahrheit für den gesamten Zustand des Workflows benötigst.
Wir führten eine PostgreSQL-Datenbank als unser zentrales Zustandslager ein. Für jeden Social-Media-Post, der in unser System eintrat, erstellten wir einen Datensatz in einer workflow_states-Tabelle. Dieser Datensatz hielt die eindeutige ID des Posts und seinen aktuellen Status (z.B. RECEIVED, ENRICHING, ENRICHED, ACTION_TRIGGERED, WAITING_FOR_REVIEW, COMPLETED). Er speicherte auch Metadaten, wie Zeitstempel für jede Zustandsänderung und alle relevanten Zwischendaten.
Jeder Arbeiter in unserer Agenten-Pipeline würde:
- Eine Nachricht aus seiner Eingangswarteschlange konsumieren.
- Den entsprechenden Workflow-Zustand in der Datenbank abfragen.
- Seine Arbeit verrichten.
- Den Workflow-Zustand in der Datenbank aktualisieren (z.B. von
ENRICHINGzuENRICHED). - Eine neue Nachricht an die nächste Warteschlange erzeugen, die nur die minimalen Informationen enthält, um den Workflow zu identifizieren (z.B. die Post-ID).
Dieser Ansatz bedeutet:
- Transparenz: Zu jedem Zeitpunkt können wir die Datenbank abfragen, um den Status jeder einzelnen Aufgabe zu sehen.
- Widerstandsfähigkeit: Wenn ein Arbeiter abstürzt, ist sein zuletzt aktualisierter Zustand in der DB. Wenn er neu gestartet wird (oder ein anderer Arbeiter die erneut in die Warteschlange eingestellte Nachricht aufnimmt), kann er von dort weitermachen, wo er aufgehört hat, oder zumindest verstehen, was passiert ist.
- Menschenintervention: Wenn ein Agent pausieren und auf die Überprüfung durch einen Menschen warten muss, kann der Zustand auf
WAITING_FOR_REVIEWgesetzt werden. Ein separates UI oder manueller Prozess kann dann den Zustand aufAPPROVEDaktualisieren, was den nächsten Agentenarbeiter dazu bringen kann, es aufzunehmen. - Überprüfung: Die Zustandsänderungen bieten eine klare Prüfspur.
Hier ist ein konzeptioneller SQL-Schnipsel für unsere workflow_states-Tabelle:
ERSTELLE TABELLE workflow_states (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
entity_id VARCHAR(255) NOT NULL UNIQUE, -- z.B. 'post_123'
current_status VARCHAR(50) NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
metadata JSONB -- Speichere alle anderen relevanten Daten als JSON
);
-- Beispiel für das Aktualisieren des Status
UPDATE workflow_states
SET current_status = 'ENRICHED',
updated_at = NOW(),
metadata = jsonb_set(metadata, '{enrichment_details}', '{"source": "API_X", "score": 0.8}'::jsonb)
WHERE entity_id = 'post_123';
In unseren Python-Workern würden wir vor der Verarbeitung einer Nachricht den Status abrufen. Nach erfolgreicher Verarbeitung würden wir ihn aktualisieren. Wenn der Status nicht dem entsprach, was wir erwartet hatten (z.B. wenn versucht wird, einen bereits angereicherten Beitrag zu bereichern), könnten wir einen Fehler protokollieren und ihn überspringen oder elegant behandeln.
Schritt 3: Orchestrierung mit ereignisgesteuerten Triggern
Mit Warteschlangen und persistentem Status haben wir im Grunde einen ereignisgesteuerten Workflow aufgebaut. Jede Statusänderung in der Datenbank könnte ihrerseits neue Aktionen auslösen. In einfacheren Fällen ziehen Worker einfach aus ihren zugewiesenen Warteschlangen. Bei komplexeren, bedingten Abläufen verwendeten wir manchmal Datenbank-Trigger oder einen speziellen „Orchestrator“-Agenten, der Statusänderungen überwachte.
Beispielsweise, wenn ein Workflow den WAITING_FOR_REVIEW-Status erreicht, könnte unser Orchestrator-Agent dies bemerken, eine Slack-Benachrichtigung an ein menschliches Team senden und dann, sobald der Status manuell auf APPROVED aktualisiert wurde, eine Nachricht für den nächsten aktionsauslösenden Agenten in die Warteschlange stellen.
Dies ist nicht für jeden langlebigen Agenten unbedingt erforderlich, aber für wirklich langfristige, menschlich gesteuerte Prozesse wird es äußerst leistungsstark. Es bedeutet, dass deine Agenten nicht nur auf einfache Nachrichten reagieren, sondern auf den breiteren Kontext eines sich entwickelnden Workflows.
Der Gewinn: Weniger Kopfschmerzen, mehr Schlaf
Der Übergang war nicht sofort. Es erforderte einen mentalen Wandel und mehr Infrastruktur (Verwaltung von RabbitMQ und Postgres). Aber die Auszahlung war immens.
- Zuverlässigkeit: Unsere Agenten wurden erheblich stabiler. Wir konnten Worker, sogar ganze Dienste, neu starten, ohne Daten oder Fortschritt zu verlieren.
- Sichtbarkeit: Wir wussten immer, was passierte. Sarah konnte jetzt die Datenbank abfragen, um genau zu sehen, welche Beiträge hängen geblieben sind, warum und in welchem Stadium. Kein hektisches Log-Diving mehr.
- Skalierbarkeit: Wir konnten individuelle Workertypen (z.B. mehr Anreicherungs-Worker während der Hauptzeiten) problemlos skalieren, ohne das gesamte System zu beeinträchtigen.
- Wartbarkeit: Jeder Worker wurde kleiner und konzentrierte sich auf eine einzige Verantwortung. Das erleichterte das Debugging und die Entwicklung von Funktionen erheblich.
Ich erinnere mich an einen Abend, als die Social-Media-API einen ungeplanten Ausfall von drei Stunden hatte. Im alten System wäre es ein katastrophaler Fehler gewesen, der einen vollständigen manuellen Neustart und eine Datenabgleich erforderte. Mit dem langlebigen Setup hielt unser „Überwachungs“-Agent weiterhin Nachrichten in die incoming_posts-Warteschlange zu schieben, die einfach wuchs. Die „Anreicherungs“-Worker warteten einfach, bis die API wieder online war, und arbeiteten dann langsam den Rückstand ab. Keine Daten verloren, keine manuelle Intervention erforderlich. Es war schön.
Handlungsfähige Erkenntnisse
Wenn du Agenten baust, die über eine einzige, atomare Ausführung hinaus bestehen müssen, solltest du an Folgendes denken:
- Zerlege deinen Agenten: Teile komplexe Agentenaufgaben in kleinere, unabhängige Schritte auf. Jeder Schritt sollte eine Sache gut machen.
- Umfange asynchrone Kommunikation: Verwende eine Nachrichtenwarteschlange (RabbitMQ, Kafka, SQS, Redis Streams), um diese Schritte zu entkoppeln. Stelle sicher, dass deine Warteschlangen und Nachrichten beständig sind.
- Nachrichten sorgfältig bestätigen: Bestätige eine Nachricht aus der Warteschlange erst, wenn deren Verarbeitung vollständig abgeschlossen ist und ihre Ausgabe (falls vorhanden) sicher gespeichert oder in die Warteschlange gestellt ist.
- Workflow-Status persistent speichern: Verwende eine Datenbank (SQL oder NoSQL), um den aktuellen Status und relevante Metadaten für jede lang laufende Aufgabe zu speichern. Dies ist deine einzige Quelle der Wahrheit.
- Minimale Nachrichteninhalte: Nachrichten in deiner Warteschlange sollten idealerweise nur eine ID enthalten, die auf den vollständigen Status in deinem persistenten Speicher verweist, anstatt den gesamten Status zu transportieren.
- Für Wiederholungen und Idempotenz designen: Deine Worker sollten in der Lage sein, dieselbe Nachricht mehrfach zu verarbeiten, ohne Probleme zu verursachen (Idempotenz), da Nachrichten möglicherweise erneut zugestellt werden.
- Überlege einen Orchestrator (für komplexe Workflows): Für sehr lang laufende oder menschlich gesteuerte Prozesse kann ein spezieller Orchestrator, der Statusänderungen überwacht und nachfolgende Schritte auslöst, von unschätzbarem Wert sein.
Den langlebigen Agenten geht es nicht darum, eine magische Bibliothek zu finden, die alles für dich erledigt. Es ist eine architektonische Denkweise. Es geht darum zu akzeptieren, dass Dinge fehlschlagen werden, und dein System so zu entwerfen, dass es nicht nur wiederhergestellt wird, sondern trotz dieser Fehler weiterhin Fortschritte macht. Es ist zwar mehr Arbeit im Voraus, ja, aber es spart dir unzählige Stunden der Brandbekämpfung und gibt dir die Ruhe, tatsächlich zu innovieren, anstatt nur zu warten.
Jetzt geh und baue Agenten, die wirklich Bestand haben!
Verwandte Artikel
- Mein Agent Dev: KI-Agenten reale Dinge tun lassen
- AI Dev Tools: Wie KI-Tests Entwicklerstunden sparen
- KI-Agenten-Integrationstests
🕒 Published: