Mirror of https://github.com/aljazceru/Tutorial-Codebase-Knowledge.git (synced 2025-12-18). Commit: add google a2a.
README additions (tutorial index):

- [Flask](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/Flask) - Craft web apps with minimal code that scales from prototype to production!
- [Google A2A](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/Google%20A2A) - The universal language that lets AI agents collaborate across borders!
- [LangGraph](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/LangGraph) - Design AI agents as flowcharts where each step remembers what happened before!
- [LevelDB](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/LevelDB) - Store data at warp speed with Google's engine that powers blockchains!
`docs/Google A2A/01_agent_card.md` (new file, 169 lines):
---
layout: default
title: "Agent Card"
parent: "Google A2A"
nav_order: 1
---

# Chapter 1: Agent Card - The AI's Business Card

Welcome to the Google Agent-to-Agent (A2A) Protocol tutorial! Imagine a world full of helpful AI assistants, or "agents." Maybe one agent is great at translating languages, another excels at summarizing long documents, and a third can book appointments. How do these agents, potentially built by different companies using different technologies, find each other and figure out how to work together?

That's where the **Agent Card** comes in. It solves the problem of **discovery** – how one agent or application can learn about another agent's existence, capabilities, and how to communicate with it.

Think of it like this:

* **You want to hire a plumber.** How do you find one? You might look them up online, find their website, or get their business card. This tells you their name, what services they offer (fixing leaks, installing pipes), and how to contact them (phone number, address).
* **An application (or another agent) wants to use an AI agent.** How does it find one? It looks for the agent's **Agent Card**.

## What is an Agent Card?

An **Agent Card** is a small, standardized file, usually named `agent.json`, that acts like a public profile or digital business card for an AI agent. It's typically hosted by the agent itself at a predictable web address.

This card contains essential information:

1. **Who is the agent?** (Name, description, version, who made it)
2. **What can it do?** (List of skills, like "translate_text" or "summarize_document")
3. **How do I talk to it?** (The agent's web address/URL, and what kinds of input it understands - text, files, structured data?)
4. **Does it have special features?** (Like supporting real-time updates via streaming?)

By reading this card, other agents or applications can quickly understand whether this agent is the right one for a job and exactly how to start a conversation (or, in technical terms, initiate a [Task](02_task.md)).

## Finding and Reading the Card (Discovery)

Just like many websites have a standard `robots.txt` file to tell search engines which pages they may crawl, A2A agents typically make their Agent Card available at a standard path: `/.well-known/agent.json`.

So, if an agent lives at `http://my-translator-agent.com`, its Agent Card would likely be found at `http://my-translator-agent.com/.well-known/agent.json`.

Let's see how a client application might fetch this card using Python.
```python
# File: demo/ui/utils/agent_card.py (simplified)
import requests  # A library to make web requests
from common.types import AgentCard  # A helper to understand the card's structure

def get_agent_card(remote_agent_address: str) -> AgentCard:
    """Gets the agent card from the agent's address."""
    agent_card_url = f"{remote_agent_address}/.well-known/agent.json"
    print(f"Fetching card from: {agent_card_url}")
    # Make a web request to get the file
    response = requests.get(agent_card_url)
    response.raise_for_status()  # Raise an error if the request failed
    # Parse the JSON file content into an AgentCard object
    return AgentCard(**response.json())

# Example Usage:
agent_address = "http://example-agent.com"  # Assume our agent is here
try:
    card = get_agent_card(agent_address)
    print(f"Got card for agent: {card.name}")
except requests.exceptions.RequestException as e:
    print(f"Could not fetch card: {e}")
```

**Explanation:**

1. We define the `agent_address` where the agent lives.
2. The function builds the full URL to the standard `agent.json` path.
3. It uses the `requests` library to make an HTTP GET request, just like your web browser does when you visit a page.
4. If the request is successful (HTTP status 200 OK), it takes the JSON text returned by the server and parses it into a structured `AgentCard` object that the program can easily use.

### Example `agent.json`

Here's a simplified example of what the `agent.json` file might look like:

```json
// File: /.well-known/agent.json (Example)
{
  "name": "Text Summarizer Bot",
  "description": "Summarizes long text documents.",
  "version": "1.0.0",
  "url": "http://example-agent.com/a2a", // Where to send tasks
  "capabilities": {
    "streaming": false // Doesn't support real-time updates
  },
  "defaultInputModes": ["text"], // Primarily accepts text
  "defaultOutputModes": ["text"], // Primarily outputs text
  "skills": [
    {
      "id": "summarize",
      "name": "Summarize Text",
      "description": "Provide text, get a short summary."
    }
  ],
  "provider": {
    "organization": "AI Helpers Inc."
  }
}
```

**Explanation:**

* `name`, `description`, `version`, `provider`: Basic identification info.
* `url`: The specific endpoint *within* the agent's server where A2A communication happens (we'll use this later when sending a [Task](02_task.md)).
* `capabilities`: Tells us if it supports advanced features like `streaming`. This one doesn't.
* `defaultInputModes`/`defaultOutputModes`: What kind of data it generally works with (here, just plain `text`).
* `skills`: A list of specific things this agent can do. This one has a "summarize" skill.

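A client will typically read these fields to decide whether the agent fits the job. Here is a minimal sketch using the example card above as a plain parsed dict (the `supports_skill` helper is hypothetical, not from the samples):

```python
# Hypothetical helper (not from the A2A samples): check whether a fetched
# agent card advertises a given skill before sending it work.
def supports_skill(card: dict, skill_id: str) -> bool:
    """True if the card's skills list contains an entry with this id."""
    return any(skill.get("id") == skill_id for skill in card.get("skills", []))

card = {  # the example card above (abridged), as a parsed dict
    "name": "Text Summarizer Bot",
    "url": "http://example-agent.com/a2a",
    "capabilities": {"streaming": False},
    "skills": [{"id": "summarize", "name": "Summarize Text"}],
}

if supports_skill(card, "summarize"):
    print(f"{card['name']} can summarize; send tasks to {card['url']}")
```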
## Under the Hood: The Discovery Flow

How does fetching the Agent Card actually work between the client and the agent (server)? It's a simple web request:

```mermaid
sequenceDiagram
    participant C as Client App
    participant A as Agent Server
    C->>A: GET /.well-known/agent.json
    Note right of A: Agent looks for its agent.json file
    A-->>C: 200 OK (Returns content of agent.json)
    Note left of C: Client parses the JSON data
```

**Steps:**

1. **Client Request:** The client application (e.g., our Python script) sends an HTTP GET request to the agent's base URL + `/.well-known/agent.json`.
2. **Server Response:** The agent's server receives the request, finds its `agent.json` file, and sends its content back to the client with a success status (like `200 OK`).
3. **Client Processing:** The client receives the JSON data and processes it to understand the agent's capabilities.

The provided sample code includes helper classes to make this easier:

* **Python:** The `A2ACardResolver` class (`samples/python/common/client/card_resolver.py`) handles fetching and parsing the card.
* **JavaScript:** The `cli.ts` sample (`samples/js/src/cli.ts`) uses the standard `fetch` API to get the card directly.

```typescript
// File: samples/js/src/cli.ts (Relevant Snippet)
async function fetchAndDisplayAgentCard() {
  const wellKnownUrl = new URL("/.well-known/agent.json", serverUrl).toString();
  console.log(`Attempting to fetch agent card from: ${wellKnownUrl}`);
  try {
    // Use the standard fetch API to get the card
    const response = await fetch(wellKnownUrl);
    if (response.ok) {
      const card: AgentCard = await response.json(); // Parse JSON
      agentName = card.name || "Agent";
      console.log(`✓ Agent Card Found: ${agentName}`);
      // ... display other card info ...
    } else {
      console.log(`⚠️ Could not fetch agent card (Status: ${response.status})`);
    }
  } catch (error: any) {
    console.log(`⚠️ Error fetching agent card: ${error.message}`);
  }
}
```

This JavaScript code does essentially the same thing as the Python example: builds the URL, fetches the content, and parses the JSON if successful.

## Conclusion

The Agent Card is the cornerstone of discovery in the A2A protocol. It's the agent's public announcement, telling the world who it is, what it can do, and how to interact with it. By fetching and reading this simple `agent.json` file, clients can dynamically discover and prepare to communicate with diverse AI agents.

Now that we understand how to *find* an agent and learn its basic properties using the Agent Card, we need to learn how to actually *give it work* to do. This brings us to the concept of a **Task**.

Ready to learn how to ask an agent to perform an action? Let's move on to the next chapter!

**Next:** [Chapter 2: Task](02_task.md)

---

Generated by [AI Codebase Knowledge Builder](https://github.com/The-Pocket/Tutorial-Codebase-Knowledge)
`docs/Google A2A/02_task.md` (new file, 215 lines):
---
layout: default
title: "Task"
parent: "Google A2A"
nav_order: 2
---

# Chapter 2: Task - The AI's Work Order

In the [previous chapter](01_agent_card.md), we learned how to find an AI agent and read its "business card" – the **Agent Card** – to understand what it can do and how to contact it. Think of it like finding a translator's contact information.

But just knowing the translator exists isn't enough. You need to actually *give them something to translate*! How do you formally request work from an A2A agent?

That's where the **Task** comes in. It solves the problem of **requesting and tracking work**.

## What is a Task?

Imagine you run a busy workshop. When a customer comes in wanting something built or fixed, you don't just rely on a verbal request. You create a **work order** or a **job ticket**. This ticket contains:

1. **What needs to be done?** (The customer's request - e.g., "Build a small bookshelf")
2. **Who requested it?** (Customer details)
3. **A unique ID** to track this specific job.
4. **The current status** (e.g., "Not Started", "In Progress", "Awaiting Materials", "Completed").
5. **The final result** (e.g., the finished bookshelf, or notes about why it couldn't be done).

In the A2A world, a **Task** is exactly like that work order. It's the main way agents exchange work:

1. **Instructions:** It starts with the initial request message from the client (e.g., "Translate 'hello world' to French").
2. **Tracking ID:** Each task gets a unique ID so both the client and the agent know which job they're talking about.
3. **Status:** It has a state that changes as the agent works on it (e.g., `submitted`, `working`, `completed`, `failed`).
4. **Results:** When finished, it holds the output, called **Artifacts** (e.g., the translated text "Bonjour le monde").

So, if our "Translator Agent" receives a Task asking for a translation, that Task object will contain the text to translate, track whether the agent is currently translating it, and eventually hold the French translation once it's done.

## Creating and Sending a Task

How does a client (like your application, or another agent) actually create and send a Task to an agent server? It uses a specific command defined by the A2A protocol, usually called `tasks/send`.

Let's say our client found the "Translator Agent" from Chapter 1 and knows its `url` is `http://translator-agent.com/a2a`. The client wants to translate "hello".

Here's a simplified Python example of how the client might send this request:
```python
# File: samples/python/hosts/cli/cli_host.py (Conceptual Snippet)
import uuid  # To generate unique IDs

import requests

from common.types import TaskSendParams, Message, TextPart, Task

# Agent's communication endpoint (from the Agent Card)
agent_a2a_url = "http://translator-agent.com/a2a"

# 1. Prepare the Task request details
task_id = str(uuid.uuid4())  # Generate a unique ID for this job
user_message = Message(
    role="user",
    parts=[TextPart(text="Translate 'hello' to French")]
)
task_params = TaskSendParams(id=task_id, message=user_message)

# 2. Create the JSON-RPC request structure
request_payload = {
    "jsonrpc": "2.0",
    "method": "tasks/send",  # The command to send a task
    "params": task_params.model_dump(exclude_none=True),  # Our task details
    "id": "req-1"  # An ID for *this specific web request*
}

# 3. Send the request to the agent's URL
print(f"Sending task {task_id} to {agent_a2a_url}")
response = requests.post(agent_a2a_url, json=request_payload)
response.raise_for_status()  # Check for HTTP errors

# 4. Process the response
response_data = response.json()
if response_data.get("result"):
    # Agent accepted the task! It returns the initial Task object.
    initial_task = Task(**response_data["result"])
    print(f"Task created! ID: {initial_task.id}, State: {initial_task.status.state}")
elif response_data.get("error"):
    print(f"Error creating task: {response_data['error']}")
```

**Explanation:**

1. **Prepare Details:** We generate a unique `task_id` and create the `Message` containing the text we want translated. These become the `params` for our request.
2. **Build Request:** We wrap our `params` in a standard structure specifying the `method` (`tasks/send`) we want the agent to execute. (This structure is part of JSON-RPC, which is used by A2A - more on this in the [next chapter](03_a2a_protocol___core_types.md)).
3. **Send:** We use the `requests` library to send this structure as JSON data via an HTTP POST request to the agent's A2A `url`.
4. **Process Response:** The agent sends back a response. If successful, the `result` contains the newly created `Task` object, likely in the `submitted` state. We print its ID and initial state. If something went wrong, the `error` field will contain details.

**Example Output:**

```
Sending task a1b2c3d4-e5f6-7890-abcd-ef1234567890 to http://translator-agent.com/a2a
Task created! ID: a1b2c3d4-e5f6-7890-abcd-ef1234567890, State: submitted
```

Now the client knows the task was received and has its unique ID (`a1b2c3d4-...`). It can use this ID later to check the status or get the final result.

## Task Lifecycle: States

A task doesn't just get created and instantly completed. It goes through different stages, represented by its `state` field. Here are the main states:

* `submitted`: The agent has received the task request but hasn't started working on it yet.
* `working`: The agent is actively processing the request (e.g., performing the translation).
* `input-required`: (Optional) The agent needs more information from the client to continue. The client would then send another message using the same Task ID.
* `completed`: The agent finished successfully. The results are available in the Task's `artifacts`.
* `failed`: The agent encountered an error and could not complete the task.
* `canceled`: The client (or agent) explicitly canceled the task before completion.
* `unknown`: The state couldn't be determined.

These states allow the client to understand the progress of their request. For long-running tasks, the agent might even send updates as the state changes (we'll cover this in [Chapter 7: Streaming Communication (SSE)](07_streaming_communication__sse_.md)).

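One practical consequence for clients: only some of these states are final. A minimal sketch of that distinction (the helper is an assumption, not part of the samples; the state strings are the ones listed above):

```python
# States after which the task will not change any further.
TERMINAL_STATES = {"completed", "failed", "canceled"}

def is_finished(state: str) -> bool:
    """True once a task has reached a final state and polling can stop."""
    return state in TERMINAL_STATES

print(is_finished("working"))    # False - still in progress
print(is_finished("completed"))  # True - the results (or errors) are final
```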
## Under the Hood: How a Task is Handled

Let's trace what happens when the client sends that `tasks/send` request:

```mermaid
sequenceDiagram
    participant C as Client App
    participant A as Agent Server (A2A Endpoint)
    participant TS as Task Store (e.g., Memory, DB)
    participant TL as Task Logic (e.g., Translator)

    C->>A: POST /a2a (JSON-RPC: method="tasks/send", params={id="T1", msg="Translate..."})
    Note right of A: Receives HTTP request, parses JSON-RPC

    A->>TS: Create/Find Task Record (ID: "T1")
    Note right of TS: Creates a new Task object in 'submitted' state
    TS-->>A: New Task Object (ID: "T1", state: "submitted")

    A-->>C: 200 OK (JSON-RPC: result={Task Object with state 'submitted'})
    Note left of C: Client receives confirmation Task is created

    Note over A,TL: Agent asynchronously starts processing...
    A->>TL: Start processing Task "T1" (Input: "Translate...")
    A->>TS: Update Task "T1" status to 'working'
    Note right of TS: Updates Task record state

    TL->>A: Processing finished (Output: "Bonjour")
    Note over A,TS: Agent updates Task with result and 'completed' state
    A->>TS: Update Task "T1" (state: 'completed', artifacts: ["Bonjour"])
```

**Steps:**

1. **Client Sends Request:** The client sends the `tasks/send` JSON-RPC request via HTTP POST to the agent's A2A URL.
2. **Server Receives:** The agent server receives the request and understands it wants to start a task.
3. **Server Stores Task:** The server creates a new `Task` record (using something like the `InMemoryTaskStore` or `FileStore` shown in `samples/js/src/server/store.ts`, or conceptually managed by `samples/python/common/server/task_manager.py`). It assigns the initial `submitted` state and stores the user's message.
4. **Server Responds:** The server immediately sends a response back to the client confirming the task was created, including the initial `Task` object.
5. **Server Processes (Async):** The server (likely in the background) triggers the actual work (e.g., calls its internal translation logic). It updates the task's state in the store to `working`.
6. **Server Completes:** Once the translation is done, the server updates the task's state to `completed` and adds the result ("Bonjour") as an `Artifact` in the task record within the store.

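To make the "store" side of these steps concrete, here is a toy in-memory task store (a Python sketch only; the real samples implement this as `InMemoryTaskStore`/`FileStore` in TypeScript and via the Python task manager, with different APIs):

```python
import datetime

class ToyTaskStore:
    """Toy in-memory task store: create, update, and look up task records."""

    def __init__(self):
        self.tasks = {}  # task_id -> task dict

    def create(self, task_id: str, message: dict) -> dict:
        """Record a new task in the 'submitted' state."""
        task = {
            "id": task_id,
            "status": {
                "state": "submitted",
                "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
            },
            "message": message,
            "artifacts": None,
        }
        self.tasks[task_id] = task
        return task

    def update(self, task_id: str, state: str, artifacts=None) -> dict:
        """Move a task to a new state, attaching results when available."""
        task = self.tasks[task_id]
        task["status"]["state"] = state
        if artifacts is not None:
            task["artifacts"] = artifacts
        return task

# The lifecycle from the diagram: submitted -> working -> completed.
store = ToyTaskStore()
store.create("T1", {"role": "user", "parts": [{"type": "text", "text": "Translate 'hello'"}]})
store.update("T1", "working")
done = store.update("T1", "completed",
                    artifacts=[{"parts": [{"type": "text", "text": "Bonjour"}]}])
print(done["status"]["state"])  # completed
```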
The client can later use the Task ID (`T1`) to fetch the updated Task object (using a different command like `tasks/get`) and retrieve the final translation from the `artifacts`.

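That follow-up can be sketched as a small polling loop (an illustration, not sample code: it assumes the agent implements `tasks/get` with `params={"id": ...}` returning the Task in `result`, and the poll interval is arbitrary):

```python
import time

import requests

def wait_for_task(agent_url: str, task_id: str, interval: float = 1.0) -> dict:
    """Poll the agent with tasks/get until the task reaches a final state."""
    while True:
        payload = {
            "jsonrpc": "2.0",
            "method": "tasks/get",      # ask for the current Task object
            "params": {"id": task_id},
            "id": f"poll-{task_id}",    # id for this specific web request
        }
        task = requests.post(agent_url, json=payload).json()["result"]
        if task["status"]["state"] in ("completed", "failed", "canceled"):
            return task                 # final state: artifacts (if any) are ready
        time.sleep(interval)            # still submitted/working - try again

# Example (against a running agent):
# task = wait_for_task("http://translator-agent.com/a2a", "task-xyz-789")
# print(task["artifacts"])
```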
### Key Data Structures

The definitions of these structures can be found in the protocol specification and helper libraries:

* **Task:** (`samples/python/common/types.py:Task`, `samples/js/src/schema.ts:Task`) Holds the ID, status, artifacts, history, etc.
* **Message:** (`samples/python/common/types.py:Message`, `samples/js/src/schema.ts:Message`) Represents a communication turn (user or agent) containing Parts.
* **Part:** (`samples/python/common/types.py:Part`, `samples/js/src/schema.ts:Part`) The actual content (text, file, or structured data).
* **Artifact:** (`samples/python/common/types.py:Artifact`, `samples/js/src/schema.ts:Artifact`) Output generated by the agent, also composed of Parts.
* **TaskStatus:** (`samples/python/common/types.py:TaskStatus`, `samples/js/src/schema.ts:TaskStatus`) Contains the `TaskState` and timestamp.

```typescript
// File: samples/js/src/schema.ts (Simplified Task Structure)

export interface Task {
  // Unique identifier for the task.
  id: string;
  // The current status of the task.
  status: TaskStatus;
  // Optional list of artifacts (outputs).
  artifacts?: Artifact[] | null;
  // (Optional) History of messages for this task
  // history?: Message[] | null;
  // ... other fields like sessionId, metadata
}

export interface TaskStatus {
  // The current state (e.g., "submitted", "working", "completed").
  state: TaskState;
  // Optional message associated with this status.
  message?: Message | null;
  // Timestamp of this status update.
  timestamp?: string;
}

// Example Artifact containing translated text
// artifact = { parts: [ { type: "text", text: "Bonjour le monde" } ] }
```

This structure acts as the digital "work order" that travels between the client and the agent, carrying the request, tracking progress, and holding the final result.

## Conclusion

The **Task** is the fundamental unit of work in the A2A protocol. It's how one agent asks another to do something. Think of it as a formal job request or work order that:

* Contains the initial instructions (as a `Message`).
* Has a unique ID for tracking.
* Goes through different states (`submitted`, `working`, `completed`, etc.) to show progress.
* Holds the final results (`Artifacts`).

By sending a `tasks/send` request, a client initiates a Task, and by checking the Task's status and artifacts later, the client gets the results.

Now that we understand the basic concepts of finding an agent ([Agent Card](01_agent_card.md)) and giving it work ([Task](02_task.md)), let's look closer at the communication rules and the specific data types used in the A2A protocol.

**Next:** [Chapter 3: A2A Protocol & Core Types](03_a2a_protocol___core_types.md)

---

Generated by [AI Codebase Knowledge Builder](https://github.com/The-Pocket/Tutorial-Codebase-Knowledge)
`docs/Google A2A/03_a2a_protocol___core_types.md` (new file, 244 lines):
---
layout: default
title: "A2A Protocol & Core Types"
parent: "Google A2A"
nav_order: 3
---

# Chapter 3: A2A Protocol & Core Types

In the previous chapters, we learned how to find an agent using its [Agent Card](01_agent_card.md) and how to give it work using a [Task](02_task.md). Think of it like finding a specific workshop (Agent Card) and submitting a work order (Task).

But how do the client (who submits the order) and the agent (the workshop) actually *talk* to each other? What language do they use? If the client writes the order in English, but the workshop only understands Spanish, nothing will get done!

This chapter tackles that problem: **How do different AI agents, possibly built by different teams using different technologies, communicate reliably?**

The answer lies in the **A2A Protocol** and its **Core Types**.

## What is a Protocol? The Rules of the Road

Imagine trying to drive in a country where you don't know the traffic rules. Do you drive on the left or right? What do the signs mean? It would be chaos! Traffic rules are a **protocol** – a shared set of rules everyone agrees on so things run smoothly.

Similarly, the **A2A Protocol** is the set of rules for how AI agents communicate. It defines:

1. **The Transport:** *How* messages physically travel (usually over the internet using standard HTTP requests, like your web browser uses).
2. **The Format:** *What* the messages look like (the structure and language used).
3. **The Actions:** *What* commands one agent can send to another (like "start a task" or "cancel a task").

Think of it as the **shared language** for AI agents. Just like humans use languages like English or Spanish, which have grammar (rules) and vocabulary (words), the A2A protocol provides the grammar and vocabulary for agents.

## The Grammar: JSON-RPC 2.0

For the A2A protocol, the chosen "grammar" is a standard called **JSON-RPC 2.0**. Don't let the name scare you! It's just a simple way to structure messages using JSON (JavaScript Object Notation - a very common text format for data).

Here's the basic idea:

* **Client sends a Request:** The client wanting the agent to do something sends a `Request` message.
* **Agent sends a Response:** The agent replies with a `Response` message.

A typical JSON-RPC Request looks like this:
```json
{
  "jsonrpc": "2.0",        // Specifies the protocol version
  "method": "some_action", // What the client wants the agent to DO
  "params": { ... },       // The details needed for the action
  "id": "request-123"      // A unique ID to match request and response
}
```

**Explanation:**

* `jsonrpc`: Always "2.0".
* `method`: The name of the command or function the client wants the agent to run (like `tasks/send` from Chapter 2).
* `params`: The input data needed for that command (like the text to translate). This can be an object `{}` or a list `[]`.
* `id`: A unique identifier the client makes up.

The agent then processes this request and sends back a Response matching that `id`:

**Success Response:**

```json
{
  "jsonrpc": "2.0",
  "result": { ... },  // The output/result of the action
  "id": "request-123" // The SAME ID as the request
}
```

**Error Response:**

```json
{
  "jsonrpc": "2.0",
  "error": {          // Details about what went wrong
    "code": -32601,
    "message": "Method not found"
  },
  "id": "request-123" // The SAME ID as the request (or null if the error was severe)
}
```

**Explanation:**

* If the action worked, the response includes a `result` field containing the output.
* If something went wrong, it includes an `error` field with a numeric `code` and a descriptive `message`.
* Crucially, the `id` matches the request, so the client knows which request this response belongs to.

```mermaid
sequenceDiagram
    participant C as Client App
    participant A as Agent Server

    C->>A: JSON-RPC Request (id: "req-abc", method: "tasks/send", params: {...})
    Note right of A: Agent parses JSON, finds method 'tasks/send'

    alt Action Successful
        A-->>C: JSON-RPC Response (id: "req-abc", result: {Task Object})
    else Action Failed
        A-->>C: JSON-RPC Response (id: "req-abc", error: {code:..., message:...})
    end
    Note left of C: Client matches response 'id' to original request
```

This simple request/response structure using JSON-RPC is the foundation of how A2A agents talk.

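That pairing of requests and responses by `id` can be captured in a small helper. The sketch below is generic JSON-RPC 2.0 plumbing, not code from the A2A samples; the `client-req-` id prefix and the helper names are made up:

```python
import itertools

_request_ids = itertools.count(1)

class JsonRpcError(Exception):
    """Raised when a JSON-RPC response carries an 'error' member."""

def make_request(method: str, params: dict) -> dict:
    """Build a JSON-RPC 2.0 request envelope with a fresh id."""
    return {
        "jsonrpc": "2.0",
        "method": method,
        "params": params,
        "id": f"client-req-{next(_request_ids)}",
    }

def read_response(request: dict, response: dict) -> dict:
    """Return the result after checking the ids match; raise on errors."""
    if response.get("id") != request["id"]:
        raise JsonRpcError("response id does not match request id")
    if "error" in response:
        err = response["error"]
        raise JsonRpcError(f"{err['code']}: {err['message']}")
    return response["result"]

# Example: a successful round trip.
req = make_request("tasks/send", {"id": "task-1", "message": {"role": "user", "parts": []}})
resp = {"jsonrpc": "2.0", "result": {"id": "task-1", "status": {"state": "submitted"}}, "id": req["id"]}
print(read_response(req, resp)["status"]["state"])  # submitted
```

The transport is deliberately left out: the same two helpers work whether the envelope travels via `requests`, `urllib`, or anything else that can POST JSON.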
## The Vocabulary: Core Data Types

If JSON-RPC is the grammar, then the **Core Types** are the standard vocabulary – the specific kinds of "words" or data structures used within the `params` and `result` fields. We've already seen some of these!

Let's recap the most important ones:

* **`AgentCard`**: ([Chapter 1](01_agent_card.md)) The agent's profile. Describes its name, skills, and communication endpoint (`url`). Found at `/.well-known/agent.json`.
  * Defined in: `samples/js/src/schema.ts:AgentCard`, `samples/python/common/types.py:AgentCard`
* **`Task`**: ([Chapter 2](02_task.md)) The work order. Contains the unique `id`, current `status`, final `artifacts` (results), etc.
  * Defined in: `samples/python/common/types.py:Task`, `samples/js/src/schema.ts:Task`
* **`Message`**: Represents one turn in the conversation (either from the `user` or the `agent`). Contains one or more `Parts`.
  * Defined in: `samples/python/common/types.py:Message`, `samples/js/src/schema.ts:Message`
* **`Part`**: The actual content within a `Message` or `Artifact`. This is how we send different kinds of data:
  * `TextPart`: For plain text.
  * `FilePart`: For files (either included directly as encoded text (`bytes`) or as a link (`uri`)).
  * `DataPart`: For structured JSON data (like filling out a form).
  * Defined in: `samples/python/common/types.py:Part`, `samples/js/src/schema.ts:Part`
* **`Artifact`**: Represents an output generated by the agent during a `Task`. It also contains `Parts`. For example, if a Task was "create a presentation about cats", an Artifact might be a `FilePart` containing the presentation file.
  * Defined in: `samples/python/common/types.py:Artifact`, `samples/js/src/schema.ts:Artifact`
* **`TaskStatus`**: Holds the current progress state of a `Task`. Includes the `state` itself and a `timestamp`.
  * Defined in: `samples/python/common/types.py:TaskStatus`, `samples/js/src/schema.ts:TaskStatus`
* **`TaskState`**: The specific state within `TaskStatus`. Common values are: `submitted`, `working`, `completed`, `failed`, `canceled`.
  * Defined in: `samples/python/common/types.py:TaskState`, `samples/js/src/schema.ts:TaskState`

**Example: Building a `Message`**

Let's say the user wants to send the text "Translate 'hello' to French". This would be structured as a `Message` containing a `TextPart`:

```json
// This structure would go inside the "params" of a tasks/send request
{
  "role": "user", // Who is sending this message
  "parts": [      // List of content parts (here, just one)
    {
      "type": "text", // Specifies this is a TextPart
      "text": "Translate 'hello' to French"
    }
  ]
}
```

If the user also wanted to attach a document for translation, the `parts` list would have two items: a `TextPart` with instructions and a `FilePart` with the document.

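That two-part message can be sketched in Python as plain dicts. The `file.name`/`mimeType`/`bytes` field names follow the sample schema's `FilePart` with inline base64-encoded content, but treat the exact shape as illustrative; the memo content is made up:

```python
import base64

# Pretend file content; a real client would read this from disk.
document_bytes = b"Please translate this memo into French."

message = {
    "role": "user",
    "parts": [
        # Part 1: the instructions, as a TextPart.
        {"type": "text", "text": "Translate the attached memo to French"},
        # Part 2: the document itself, as a FilePart with inline bytes.
        {
            "type": "file",
            "file": {
                "name": "memo.txt",
                "mimeType": "text/plain",
                "bytes": base64.b64encode(document_bytes).decode("ascii"),
            },
        },
    ],
}

print(len(message["parts"]))  # 2
```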
## Putting It Together: The `tasks/send` Example

Remember the `tasks/send` request from Chapter 2? Let's look at the full JSON-RPC structure that the client sends over HTTP:

```json
// Client Sends This (HTTP POST body to Agent's URL)
{
  "jsonrpc": "2.0",
  "method": "tasks/send", // The action: start/continue a task
  "params": { // The details (TaskSendParams structure)
    "id": "task-xyz-789", // Unique Task ID
    "message": { // The user's message
      "role": "user",
      "parts": [
        {
          "type": "text",
          "text": "Translate 'hello' to French"
        }
      ]
    }
    // Other optional params like sessionId could go here
  },
  "id": "client-req-001" // Unique ID for *this specific request*
}
```

If the agent accepts the task, it sends back a success response containing the initial `Task` object:

```json
// Agent Sends This Back (HTTP Response body)
{
  "jsonrpc": "2.0",
  "result": { // The result: a Task object
    "id": "task-xyz-789", // The same Task ID
    "status": { // The initial status
      "state": "submitted",
      "timestamp": "2023-10-27T10:00:00Z"
    },
    "artifacts": null, // No results yet
    "history": null // History might be omitted initially
    // Other Task fields
  },
  "id": "client-req-001" // Matches the request ID
}
```

This exchange uses the JSON-RPC grammar (`method`, `params`, `result`, `id`) and the A2A vocabulary (`Task`, `Message`, `Part`, `TaskStatus`, `TaskState`) to communicate clearly.
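The request side of this exchange is easy to build programmatically. Here is a minimal Python sketch that wraps the `TaskSendParams` structure in the JSON-RPC envelope; the helper `make_send_task_request` is hypothetical, not part of the samples.

```python
import uuid

def make_send_task_request(task_id: str, user_text: str) -> dict:
    """Build a tasks/send request: A2A params inside a JSON-RPC 2.0 envelope."""
    return {
        "jsonrpc": "2.0",
        "method": "tasks/send",
        "params": {
            "id": task_id,  # identifies the Task
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": user_text}],
            },
        },
        "id": f"client-req-{uuid.uuid4().hex[:8]}",  # identifies this request
    }

request = make_send_task_request("task-xyz-789", "Translate 'hello' to French")
```

Note the two different `id` fields: the inner one names the long-lived Task, while the outer one only correlates this single request with its response.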
## Handling Mistakes: Errors in the Protocol

What if the client sends a request for a method the agent doesn't understand, like `tasks/make_coffee`? The agent would respond with a JSON-RPC error:

```json
{
  "jsonrpc": "2.0",
  "error": {
    "code": -32601, // Standard JSON-RPC code for "Method not found"
    "message": "Method not found: tasks/make_coffee"
  },
  "id": "client-req-002"
}
```

The A2A protocol also defines some specific error codes for common agent issues:

* `-32001`: `Task Not Found` (e.g., the client asks for the status of a task ID that doesn't exist)
* `-32002`: `Task Not Cancelable` (e.g., trying to cancel an already completed task)
* `-32004`: `Unsupported Operation`

These standard errors help clients understand what went wrong in a predictable way. You can find the definitions in the schema files:

* `samples/js/src/schema.ts` (search for `ErrorCode`)
* `samples/python/common/types.py` (search for error classes like `MethodNotFoundError`, `TaskNotFoundError`)
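To make this concrete, here is a small Python sketch of how a server might map these codes into JSON-RPC error responses. The `A2A_ERROR_MESSAGES` table and `error_response` helper are hypothetical illustrations, not the samples' actual error classes.

```python
# Hypothetical mapping of the error codes listed above
A2A_ERROR_MESSAGES = {
    -32601: "Method not found",        # standard JSON-RPC
    -32001: "Task not found",          # A2A-specific
    -32002: "Task cannot be canceled", # A2A-specific
    -32004: "Unsupported operation",   # A2A-specific
}

def error_response(request_id, code: int, detail: str = "") -> dict:
    """Build a JSON-RPC error response that echoes the failed request's id."""
    message = A2A_ERROR_MESSAGES.get(code, "Unknown error")
    if detail:
        message = f"{message}: {detail}"
    return {
        "jsonrpc": "2.0",
        "error": {"code": code, "message": message},
        "id": request_id,
    }
```

Echoing the request `id` back in the error is what lets a client match the failure to the call it made.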
## Conclusion

The A2A Protocol acts as the universal translator for AI agents. By defining:

1. A common **grammar** (JSON-RPC 2.0) for structuring requests and responses.
2. A standard **vocabulary** (Core Types like `Task`, `Message`, `Part`, `Artifact`) for the data being exchanged.

...it allows agents built by anyone, using any framework, to communicate and collaborate effectively. It ensures that when one agent asks another to do something, the request is understood, progress can be tracked, and results can be returned in a predictable format.

Now that we understand the language agents speak, let's see how to build an agent that can actually listen and respond using this protocol.

**Next:** [Chapter 4: A2A Server Implementation](04_a2a_server_implementation.md)

---

Generated by [AI Codebase Knowledge Builder](https://github.com/The-Pocket/Tutorial-Codebase-Knowledge)
248 docs/Google A2A/04_a2a_server_implementation.md Normal file
@@ -0,0 +1,248 @@
|
||||
---
|
||||
layout: default
|
||||
title: "A2A Server Implementation"
|
||||
parent: "Google A2A"
|
||||
nav_order: 4
|
||||
---
|
||||
|
||||
# Chapter 4: A2A Server Implementation
|
||||
|
||||
In the [previous chapter](03_a2a_protocol___core_types.md), we learned the "language" and "grammar" that AI agents use to talk to each other – the **A2A Protocol** based on JSON-RPC and its **Core Types** like `Task` and `Message`. Think of it like learning the rules of diplomacy and the standard format for official documents.
|
||||
|
||||
But just knowing the rules isn't enough. If one country (an AI agent) wants to send a diplomatic message (a [Task](02_task.md)) to another, it needs an official reception point – an embassy. How does an AI agent set up its "embassy" to receive and handle these official A2A communications?
|
||||
|
||||
That's the role of the **A2A Server Implementation**. It solves the problem of **hosting an agent** and making it **accessible** according to the A2A protocol rules.
|
||||
|
||||
## What is an A2A Server? The Agent's Embassy
|
||||
|
||||
Imagine our AI agent is like a skilled expert (a translator, a coder, an image generator) working inside a building. How do people from the outside world reach this expert and give them work? They can't just barge into the building!
|
||||
|
||||
They need to go through the official **reception desk** or **front office**. This office:
|
||||
|
||||
1. Listens for visitors (incoming requests).
|
||||
2. Understands the standard procedures for submitting work (the A2A protocol).
|
||||
3. Takes the request (the `Task`), logs it, and passes it to the right expert inside.
|
||||
4. Keeps track of the work's progress.
|
||||
5. Delivers the results back to the visitor when ready.
|
||||
6. Provides basic information about the building and its services (the [Agent Card](01_agent_card.md)).
|
||||
|
||||
An **A2A Server** is exactly like that front office or embassy for your AI agent. It's the software component that runs on a server, listens for incoming network requests, and acts as the official gateway for all A2A communication.
|
||||
|
||||
## Why Do We Need It?
|
||||
|
||||
Without a server, our AI agent is isolated. It might be brilliant at its job, but no other agent or application can interact with it using the standard A2A protocol. The A2A Server provides the necessary "infrastructure" to:
|
||||
|
||||
* **Listen:** Be constantly available on the network (at a specific URL) for incoming requests.
|
||||
* **Understand:** Decode the JSON-RPC messages and figure out what the client wants (e.g., `tasks/send`, `tasks/get`).
|
||||
* **Delegate:** Pass the work request (the `Task` details) to the actual AI logic (which might be implemented using tools like LangGraph, CrewAI, Genkit, or custom code).
|
||||
* **Manage:** Keep track of ongoing `Tasks`, their current `status` (e.g., `submitted`, `working`, `completed`), and store their results (`Artifacts`).
|
||||
* **Respond:** Send back properly formatted JSON-RPC responses (confirming task creation, providing results, or reporting errors).
|
||||
* **Advertise:** Serve the agent's `agent.json` ([Agent Card](01_agent_card.md)) so others can discover it.
|
||||
|
||||
Think of it as the bridge connecting your agent's internal world to the external world of A2A communication.
|
||||
|
||||
## Setting Up a Basic Server
|
||||
|
||||
Luckily, the `Google A2A` project provides helper libraries to make setting up a server much easier! You don't need to build the entire "embassy" from scratch. You mainly need to provide:
|
||||
|
||||
1. Your agent's specific logic (the "expert" who does the actual work).
|
||||
2. The agent's [Agent Card](01_agent_card.md) details.
|
||||
|
||||
Let's look at simplified examples in JavaScript (Node.js) and Python.
|
||||
|
||||
### JavaScript Example (using `A2AServer` from the library)

Imagine we have a very simple "Echo Agent" that just sends back whatever text it receives.

```typescript
// File: simple-agent/index.ts (Conceptual Example)
import { A2AServer, TaskContext, TaskYieldUpdate } from "google-a2a/server"; // Simplified import
import * as schema from "google-a2a/schema";

// 1. Define the Agent's Logic (The "Expert")
// This function handles a single task.
async function* echoAgentLogic(
  context: TaskContext
): AsyncGenerator<TaskYieldUpdate, schema.Task | void> {
  const inputText = context.userMessage.parts[0].text ?? "No text found";

  // Yield a status update: "working"
  yield { state: "working", message: { role: "agent", parts: [{ type: "text", text: "Echoing..." }] } };

  // Yield the final result: "completed"
  yield {
    state: "completed",
    message: { role: "agent", parts: [{ type: "text", text: `You said: ${inputText}` }] }
  };
  // (Artifacts could also be yielded here if needed)
}

// 2. Define the Agent Card
const echoAgentCard: schema.AgentCard = {
  name: "Echo Agent",
  description: "Replies with the text it receives.",
  url: "http://localhost:4000", // Where this server will run
  version: "1.0",
  capabilities: { streaming: true }, // It yields updates
  skills: [{ id: "echo", name: "Echo Text" }],
  // ... other card details
};

// 3. Create and Start the Server
const server = new A2AServer(echoAgentLogic, { card: echoAgentCard });
server.start(4000); // Start listening on port 4000

console.log("Echo Agent server running on http://localhost:4000");
```

**Explanation:**

1. **Agent Logic (`echoAgentLogic`):** This is the core function defining *what* the agent does. It receives the `TaskContext` (containing the user's message) and uses `yield` to send back status updates (`working`) and the final result (`completed`). We'll dive deeper into this logic in [Chapter 6: Task Handling Logic (Server-side)](06_task_handling_logic__server_side_.md). For now, just think of it as the agent's brain.
2. **Agent Card (`echoAgentCard`):** We define the agent's public profile, including its name, description, and, importantly, the `url` where the server will be listening.
3. **Server Setup:** We create an instance of `A2AServer`, passing our agent's logic function and its card. Then we call `server.start()` to make it listen for requests on the specified port (4000).

That's it! With this code, we have a running A2A server ready to accept `tasks/send` requests for our Echo Agent.
### Python Example (using `A2AServer` from the library)

Let's do the same in Python.

```python
# File: simple_agent/main.py (Conceptual Example)
from common.server import A2AServer, TaskManager # Simplified import
from common.types import (
    AgentCard, AgentCapabilities, AgentSkill,
    Task, TaskSendParams, TaskStatus, TaskState, Message, TextPart, SendTaskResponse
)
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# 1. Define the Agent's Logic Handler (Task Manager)
# This class bridges the server and the agent's actual logic.
class EchoTaskManager(TaskManager): # Inherit from the base TaskManager
    async def on_send_task(self, params: TaskSendParams) -> SendTaskResponse:
        # Simulate processing the task
        input_text = params.message.parts[0].text if params.message.parts else "No text"
        logger.info(f"Echo Agent received: {input_text}")

        # Create the final Task object (simplified for non-streaming)
        final_task = Task(
            id=params.id,
            status=TaskStatus(
                state=TaskState.COMPLETED,
                message=Message(role="agent", parts=[TextPart(text=f"You said: {input_text}")])
            ),
            # ... other Task fields ...
        )
        # In a real scenario, you'd store/update the task state
        # self.tasks[params.id] = final_task # Example storage
        return SendTaskResponse(id=params.id, result=final_task)

    # Implement the other abstract methods from TaskManager (get, cancel, etc.)
    # (Skipped for brevity in this example)
    async def on_get_task(self, request): raise NotImplementedError()
    async def on_cancel_task(self, request): raise NotImplementedError()
    # ... and so on for streaming, push notifications, etc.

# 2. Define the Agent Card
echo_agent_card = AgentCard(
    name="Echo Agent",
    description="Replies with the text it receives.",
    url="http://localhost:5000/", # Where this server will run
    version="1.0",
    capabilities=AgentCapabilities(streaming=False), # Simplified non-streaming Python example
    skills=[AgentSkill(id="echo", name="Echo Text")],
    # ... other card details
)

# 3. Create and Start the Server
server = A2AServer(
    agent_card=echo_agent_card,
    task_manager=EchoTaskManager(), # Pass our task handler
    host="localhost",
    port=5000,
)

logger.info("Starting Echo Agent server on http://localhost:5000")
server.start()
```

**Explanation:**

1. **Agent Logic Handler (`EchoTaskManager`):** In the Python library structure, we often create a class that inherits from `TaskManager`. This class implements methods like `on_send_task` to handle specific A2A commands. Here, `on_send_task` simulates processing and returns the final `Task` object wrapped in a `SendTaskResponse`. [Chapter 6](06_task_handling_logic__server_side_.md) will cover this in detail.
2. **Agent Card (`echo_agent_card`):** As in the JS example, we define the agent's profile.
3. **Server Setup:** We create an `A2AServer` instance, providing the card and our custom `EchoTaskManager`, then call `server.start()`.

Both examples achieve the same goal: they use the library's `A2AServer` class to quickly stand up a web server that listens for A2A requests, delegates the work to the provided agent logic, and handles the communication details.
## Under the Hood: How a Request is Processed

What happens when a client sends a `tasks/send` request to our running A2A server?

```mermaid
sequenceDiagram
    participant C as Client App
    participant S as A2A Server (e.g., Express/Starlette)
    participant TM as Task Manager/Handler (Your Logic Bridge)
    participant AL as Agent Logic (e.g., echoAgentLogic, CrewAI)
    participant TS as Task Store (Memory/DB)

    C->>S: POST / (JSON-RPC: method="tasks/send", params={...})
    Note right of S: Receives HTTP POST, parses JSON-RPC

    S->>TM: Call on_send_task / Invoke Handler(params)
    Note right of TM: Validates parameters

    TM->>TS: Load/Create Task Record (ID: task-123)
    Note right of TS: Creates Task in 'submitted' state

    TM->>AL: Execute Agent Logic (Input: user message)
    Note right of AL: Performs the core work (e.g., echo)

    AL-->>TM: Returns result/Yields updates (e.g., "working", "completed")

    loop For each update/result
        TM->>TS: Update Task Record (ID: task-123, state: working/completed, artifacts: [...])
        Note right of TS: Saves the latest task state
        alt Streaming Response (SSE)
            S-->>C: SSE Event (data: {TaskStatusUpdateEvent/Artifact})
        end
    end

    alt Non-Streaming Response
        TM-->>S: Final Task object
        S-->>C: 200 OK (JSON-RPC: result={Final Task Object})
    else Streaming Response (SSE)
        Note over S,C: Stream ends after final event
    end
```
**Steps:**

1. **Receive Request:** The client sends an HTTP POST request containing the JSON-RPC payload to the server's URL (e.g., `http://localhost:4000`). The web-server part of the `A2AServer` (like Express in JS or Starlette in Python) receives this.
2. **Parse & Route:** The `A2AServer` parses the JSON body, validates that it's a well-formed JSON-RPC request, and looks at the `method` field (e.g., `tasks/send`). Based on the method, it calls the appropriate handler function (like `handleTaskSend` in the JS server, or it delegates to the `on_send_task` method of the `TaskManager` in Python).
3. **Task Management:** The task handler (your `echoAgentLogic` or `EchoTaskManager`) takes over. It typically interacts with a `TaskStore` (like `InMemoryTaskStore`) to create or retrieve the [Task](02_task.md) record associated with the request's task ID, and updates the task's status to `submitted` or `working`.
4. **Execute Agent Logic:** The handler calls the actual underlying AI agent code, passing the necessary input (like the user's message).
5. **Process Results/Updates:** As the agent logic runs, it may produce results or status updates, which the handler receives.
6. **Update Store & Respond:** The handler updates the `Task` record in the `TaskStore` with the new status or results (`Artifacts`).
    * For a simple request/response like `tasks/send` (non-streaming), it waits for the final result and sends back a single JSON-RPC response containing the completed `Task`.
    * For a streaming request like `tasks/sendSubscribe`, it sends back Server-Sent Events (SSE) for each update as it happens. ([Chapter 7: Streaming Communication (SSE)](07_streaming_communication__sse_.md) covers this.)
7. **Serve Agent Card:** Separately, if a client sends a GET request to `/.well-known/agent.json`, the `A2AServer` simply responds with the content of the `AgentCard` you provided during setup.

The `A2AServer` libraries (`samples/js/src/server/server.ts`, `samples/python/common/server/server.py`) handle the complexities of HTTP, JSON-RPC parsing, routing, and response formatting, letting you focus on implementing your agent's specific capabilities within the task handler ([Chapter 6](06_task_handling_logic__server_side_.md)).
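Step 2 (parse and route) is the heart of the server. Stripped of all HTTP details, the routing logic can be sketched like this; this is a simplified illustration, not the library's actual code.

```python
def handle_request(request: dict, handlers: dict) -> dict:
    """Route a parsed JSON-RPC request to the handler registered for its method."""
    method = request.get("method")
    handler = handlers.get(method)
    if handler is None:
        # Unknown method: respond with the standard JSON-RPC error
        return {
            "jsonrpc": "2.0",
            "error": {"code": -32601, "message": f"Method not found: {method}"},
            "id": request.get("id"),
        }
    result = handler(request.get("params", {}))
    return {"jsonrpc": "2.0", "result": result, "id": request.get("id")}

# Register one handler per A2A method the agent supports
handlers = {
    "tasks/send": lambda params: {"id": params["id"], "status": {"state": "submitted"}},
}
```

A real server would wire handlers for `tasks/get`, `tasks/cancel`, and so on into the same table, and run them asynchronously.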
## Conclusion

The **A2A Server Implementation** is the crucial component that brings your AI agent to life on the network, acting as its official "embassy" for A2A communication. It listens for requests, understands the A2A protocol, manages tasks, interacts with your agent's core logic, and sends back responses.

By using the provided `A2AServer` libraries, you can quickly set up a compliant server without worrying about the low-level details of web servers and JSON-RPC, allowing you to concentrate on building your agent's unique skills.

Now that we know how to build the *server* side (the agent's embassy), how does another application or agent *talk* to it? We need to build an **A2A Client**.

**Next:** [Chapter 5: A2A Client Implementation](05_a2a_client_implementation.md)

---

Generated by [AI Codebase Knowledge Builder](https://github.com/The-Pocket/Tutorial-Codebase-Knowledge)
265 docs/Google A2A/05_a2a_client_implementation.md Normal file
@@ -0,0 +1,265 @@
|
||||
---
|
||||
layout: default
|
||||
title: "A2A Client Implementation"
|
||||
parent: "Google A2A"
|
||||
nav_order: 5
|
||||
---
|
||||
|
||||
# Chapter 5: A2A Client Implementation
|
||||
|
||||
In the [previous chapter](04_a2a_server_implementation.md), we learned how to build the "embassy" for our AI agent – the **A2A Server**. This server listens for incoming requests, acting as the official entry point for our agent according to the A2A protocol rules.
|
||||
|
||||
But how does someone actually *visit* this embassy and make a request? If you build a fantastic translation agent server, how does your chat application, or another AI agent, actually *use* it to translate text?
|
||||
|
||||
This chapter tackles that problem: **How do we build the component that *initiates* communication with an A2A agent server?**
|
||||
|
||||
This is the job of the **A2A Client Implementation**.
|
||||
|
||||
## What is an A2A Client? The Agent's Customer
|
||||
|
||||
Think about how you use the web:
|
||||
|
||||
* You want to visit a website (like `google.com`).
|
||||
* You open your **web browser** (like Chrome or Firefox).
|
||||
* You type the website's address into the browser.
|
||||
* The browser sends a request to the website's server.
|
||||
* The server sends back the webpage content.
|
||||
* Your browser receives the content and displays it to you.
|
||||
|
||||
In this scenario, your **web browser** is the **client**. It *starts* the conversation, knows how to format the request (using HTTP), sends it to the right address, and understands the server's response.
|
||||
|
||||
Similarly, an **A2A Client** is the software component that acts like that web browser, but specifically for talking to A2A agents:
|
||||
|
||||
1. **Knows the Agent's Address:** It needs the URL of the agent's A2A server (which it might get from the agent's [Agent Card](01_agent_card.md)).
|
||||
2. **Speaks the Language:** It knows how to format requests according to the [A2A Protocol & Core Types](03_a2a_protocol___core_types.md), using JSON-RPC for commands like `tasks/send`.
|
||||
3. **Initiates the Call:** It sends these requests over the network (usually via HTTP POST) to the agent's server.
|
||||
4. **Understands the Reply:** It receives the server's JSON-RPC response, checks for success or errors, and parses the results (like the initial `Task` object or streaming updates).
|
||||
|
||||
Essentially, the A2A Client is the part of your application (or another agent) that *consumes* the services offered by an A2A agent server.
|
||||
|
||||
## Why Do We Need It?
|
||||
|
||||
Your application's core logic (e.g., the chat interface, the document summarizer UI) shouldn't need to worry about the messy details of JSON-RPC formatting, HTTP headers, or handling network connections.
|
||||
|
||||
The A2A Client acts as an **intermediary** or **adapter**. It provides a cleaner, simpler way for your application code to interact with a remote A2A agent. Your application can just say, "Client, please send this message to the agent," and the client handles all the protocol details.
|
||||
|
||||
## Using an A2A Client Library
|
||||
|
||||
Just like we used `A2AServer` libraries to simplify building the server in [Chapter 4](04_a2a_server_implementation.md), the `Google A2A` project provides `A2AClient` libraries to make building the client side easier.
|
||||
|
||||
Let's see how we might use these libraries in JavaScript and Python to talk to the "Echo Agent" server we discussed previously.
|
||||
|
||||
### JavaScript Example (using `A2AClient` from the library)

Imagine we're building a simple command-line tool (`cli.ts`) that lets a user chat with our Echo Agent running at `http://localhost:4000`.

```typescript
// File: samples/js/src/cli.ts (Simplified Snippet)
import { A2AClient } from "./client/client.js"; // The client library
import { TaskSendParams } from "./schema.js"; // Types for request parameters
import crypto from "node:crypto"; // To generate IDs

// Agent's address (replace with your agent's URL)
const serverUrl = "http://localhost:4000";

// 1. Create a client instance pointing to the agent's server
const client = new A2AClient(serverUrl);

// User input from the command line
const userInput = "Hello Echo Agent!";

// 2. Prepare the parameters for the 'tasks/sendSubscribe' request
const taskId = crypto.randomUUID(); // Generate a unique ID for this task
const params: TaskSendParams = {
  id: taskId,
  message: {
    role: "user",
    parts: [{ type: "text", text: userInput }], // The user's message
  },
};

// 3. Send the request and handle the streaming response
async function sendMessage() {
  console.log(`Sending task ${taskId} to ${serverUrl}...`);
  try {
    // Use sendTaskSubscribe for agents that support streaming
    const stream = client.sendTaskSubscribe(params);

    // Loop through the events received from the server
    for await (const event of stream) {
      console.log("Received Agent Event:", event);
      // (In a real app, you'd parse 'event', which could be a
      // TaskStatusUpdateEvent or a TaskArtifactUpdateEvent)
    }
    console.log("Agent stream finished.");

  } catch (error: any) {
    console.error("Error talking to agent:", error.message || error);
  }
}

sendMessage();
```

**Explanation:**

1. **Create Client:** We import `A2AClient` and create an instance, telling it the URL of the agent server we want to talk to.
2. **Prepare Request:** We gather the necessary information for our request: a unique `taskId` and the `message` containing the user's input, formatted according to the A2A `TaskSendParams` structure ([Chapter 3](03_a2a_protocol___core_types.md)).
3. **Send & Handle Stream:** We call `client.sendTaskSubscribe(params)`. This method handles formatting the JSON-RPC request, sending the HTTP POST, and processing the Server-Sent Events (SSE) stream from the server ([Chapter 7: Streaming Communication (SSE)](07_streaming_communication__sse_.md)). We use a `for await...of` loop to process each event as it arrives from the agent.

**Example Output (Conceptual):**

```
Sending task abc-123 to http://localhost:4000...
Received Agent Event: { status: { state: 'working', message: { role: 'agent', parts: [ { text: 'Echoing...' } ] } } }
Received Agent Event: { status: { state: 'completed', message: { role: 'agent', parts: [ { text: 'You said: Hello Echo Agent!' } ] } } }
Agent stream finished.
```

The client library takes care of the underlying network communication and event parsing.
### Python Example (using `A2AClient` from the library)

Let's create a similar command-line tool in Python (`cli/__main__.py`) that talks to an agent at `http://localhost:5000`.

```python
# File: samples/python/hosts/cli/__main__.py (Simplified Snippet)
import asyncio
from uuid import uuid4
from common.client import A2AClient # The client library
# Assume 'card' is the AgentCard fetched previously (see Chapter 1)
# card = A2ACardResolver("http://localhost:5000").get_agent_card()

# 1. Create a client instance using the agent's card or URL
# client = A2AClient(agent_card=card)
client = A2AClient(url="http://localhost:5000") # Or use the URL directly

# User input
user_input = "Hi Python Agent!"

# 2. Prepare the payload (parameters) for the request
task_id = uuid4().hex # Generate a unique Task ID
payload = {
    "id": task_id,
    "message": {
        "role": "user",
        "parts": [{"type": "text", "text": user_input}],
    },
}

# 3. Send the request and handle the response
async def send_message():
    print(f"Sending task {task_id} to {client.url}...")
    try:
        # Use send_task_streaming if the agent supports it
        # (check card.capabilities.streaming). Assuming streaming here:
        response_stream = client.send_task_streaming(payload)
        async for result in response_stream:
            # result is an already-parsed SendTaskStreamingResponse object
            print(f"Received Agent Event: {result.model_dump_json(exclude_none=True)}")
        print("Agent stream finished.")

        # If NOT streaming, you'd use send_task instead:
        # task_result = await client.send_task(payload)
        # print(f"Received Agent Response: {task_result.model_dump_json(exclude_none=True)}")

    except Exception as e:
        print(f"Error talking to agent: {e}")

asyncio.run(send_message())
```

**Explanation:**

1. **Create Client:** We import `A2AClient` and create an instance, providing the agent's `url`.
2. **Prepare Payload:** We create a Python dictionary `payload` containing the `id` and `message` parameters for the `tasks/send` or `tasks/sendSubscribe` method.
3. **Send & Handle Stream:** We call `client.send_task_streaming(payload)`. As in the JS version, this handles the JSON-RPC formatting and the HTTP POST, and it returns an asynchronous iterator. We loop through it with `async for` to get a parsed response object (a `SendTaskStreamingResponse`) for each event; the library hides the complexity of parsing the SSE stream. If the agent didn't support streaming, we would call `client.send_task(payload)` instead, which returns the final `Task` object directly once the agent finishes.

**Example Output (Conceptual, streaming):**

```
Sending task def-456 to http://localhost:5000...
Received Agent Event: {"jsonrpc": "2.0", "result": {"status": {"state": "working", "message": {"role": "agent", "parts": [{"type": "text", "text": "Echoing..."}]}}}}
Received Agent Event: {"jsonrpc": "2.0", "result": {"status": {"state": "completed", "message": {"role": "agent", "parts": [{"type": "text", "text": "You said: Hi Python Agent!"}]}}}}
Agent stream finished.
```

In both examples, the `A2AClient` library provides a high-level interface (`sendTaskSubscribe`, `send_task_streaming`, `sendTask`, `send_task`) that simplifies the process of communicating with an A2A server.
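Since not every agent supports streaming, a client typically checks the Agent Card before deciding which call to make. Here is a small Python sketch of that decision; `choose_send_method` is a hypothetical helper, and the card is modeled as a plain dictionary for illustration.

```python
def choose_send_method(agent_card: dict) -> str:
    """Pick the client call based on the capabilities advertised in the Agent Card."""
    streaming = agent_card.get("capabilities", {}).get("streaming", False)
    return "send_task_streaming" if streaming else "send_task"

# A card like the Echo Agent's, reduced to the fields this check needs
echo_card = {"name": "Echo Agent", "capabilities": {"streaming": True}}
```

Falling back to the non-streaming call when `streaming` is absent or false keeps the client compatible with the simplest agents.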
## Under the Hood: How the Client Works

What's happening inside the `A2AClient` library when you call a method like `sendTaskSubscribe`?

```mermaid
sequenceDiagram
    participant App as Your Application (e.g., CLI)
    participant Lib as A2AClient Library
    participant Net as Network (HTTP)
    participant Srv as A2A Agent Server

    App->>Lib: Call client.sendTaskSubscribe(params)
    Note right of Lib: Generates JSON-RPC ID, Method='tasks/sendSubscribe'
    Lib->>Lib: Format JSON-RPC Request Body (using params)
    Note right of Lib: {jsonrpc:"2.0", id:"req-1", method:"...", params:{...}}

    Lib->>Net: Send HTTP POST Request to Agent URL
    Note over Net,Srv: Request travels over the internet

    Net->>Srv: Delivers HTTP POST Request
    Note right of Srv: Server receives request, parses JSON-RPC

    Srv->>Srv: Processes Task (Starts internal logic)
    Note right of Srv: Switches to streaming mode (SSE)

    Srv-->>Net: Send HTTP Response (Status 200 OK, Content-Type: text/event-stream)
    Srv-->>Net: Send SSE Event 1 (e.g., 'working' status)
    Srv-->>Net: Send SSE Event 2 (e.g., 'completed' status)
    Note right of Srv: Stream ends

    Net-->>Lib: Delivers HTTP Response & SSE Events
    Note right of Lib: Receives streaming response

    Lib->>Lib: Parse SSE Events (Extract JSON data from 'data:' lines)
    Lib-->>App: Yield Parsed Event 1 (as object)
    Lib-->>App: Yield Parsed Event 2 (as object)
    Note left of App: Application processes each event in the loop

    App->>App: Loop finishes when stream ends
```
**Steps:**

1. **Application Call:** Your code calls a method on the `A2AClient` instance (e.g., `sendTaskSubscribe`).
2. **Format Request:** The library takes your parameters (`params`), generates a unique request ID, and constructs the full JSON-RPC request payload (a JSON object).
3. **Send HTTP Request:** The library uses an underlying HTTP client (like `fetch` in browsers/Node.js or `httpx` in Python) to send an HTTP POST request to the agent server's URL. It sets the correct headers (`Content-Type: application/json`, `Accept: text/event-stream` for streaming).
4. **Server Processing:** The A2A server receives the request, processes it (as described in [Chapter 4](04_a2a_server_implementation.md)), and starts sending back a response. For streaming, this is an HTTP response with a `text/event-stream` content type, followed by individual Server-Sent Events (SSE).
5. **Receive Response:** The client library's HTTP client receives the response.
6. **Parse Response/Stream:**
   * **Non-streaming (`sendTask`):** It waits for the full response, parses the JSON body, checks for JSON-RPC level errors, and extracts the `result` field (e.g., the final `Task` object).
   * **Streaming (`sendTaskSubscribe`):** It processes the incoming SSE stream, parsing the `data:` lines from each event, converting the JSON text into objects, and yielding these objects back to your application code via the async iterator.
7. **Return/Yield Result:** The library returns the parsed result (for non-streaming) or yields the parsed events (for streaming) to your application code.
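The request-formatting and stream-parsing steps above can be sketched in a few lines of plain Python. The helper names below are made up for illustration (the real client libraries do this internally):

```python
import json
import uuid
from typing import Optional

def build_jsonrpc_request(method: str, params: dict) -> dict:
    """Step 2: wrap params in a JSON-RPC 2.0 envelope with a fresh request ID."""
    return {"jsonrpc": "2.0", "id": str(uuid.uuid4()), "method": method, "params": params}

def parse_sse_data_line(line: str) -> Optional[dict]:
    """Step 6 (streaming): pull the JSON payload out of one 'data: ...' SSE line."""
    prefix = "data: "
    if not line.startswith(prefix):
        return None  # not a data line (e.g., a comment or keep-alive)
    return json.loads(line[len(prefix):])

req = build_jsonrpc_request("tasks/sendSubscribe", {"id": "task-123"})
event = parse_sse_data_line('data: {"jsonrpc": "2.0", "result": {"state": "working"}}')
print(event["result"]["state"])  # -> working
```

This is only the skeleton; the real libraries also handle HTTP transport, error objects, and buffering of partial events.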
The client libraries (like `samples/js/src/client/client.ts` and `samples/python/common/client/client.py`) contain internal helper functions (e.g., `_makeHttpRequest`, `_handleJsonResponse`, `_handleStreamingResponse` in the JS client; `_send_request` in the Python client) to manage these steps.

## Conclusion

The **A2A Client** is the component that *initiates* conversations with A2A agent servers. It acts on behalf of your application or another agent, translating simple method calls (like "send this message") into correctly formatted A2A protocol requests (JSON-RPC over HTTP).

It handles the complexities of:

* Knowing the agent's address (`url`).
* Formatting requests (`tasks/send`, `tasks/sendSubscribe`).
* Sending them over the network.
* Parsing responses (JSON results or streaming SSE events).
* Handling errors.

By using the provided `A2AClient` libraries, you can easily integrate A2A communication into your applications without needing deep knowledge of the underlying protocol mechanics. You create a client, prepare your data, and call the appropriate method.

Now that we've seen both the server ([Chapter 4](04_a2a_server_implementation.md)) and the client side of the A2A interaction, let's dive deeper into how the *server* actually processes the tasks it receives from the client.

**Next:** [Chapter 6: Task Handling Logic (Server-side)](06_task_handling_logic__server_side_.md)

---

Generated by [AI Codebase Knowledge Builder](https://github.com/The-Pocket/Tutorial-Codebase-Knowledge)
294
docs/Google A2A/06_task_handling_logic__server_side_.md
Normal file
@@ -0,0 +1,294 @@
---
layout: default
title: "Task Handling Logic (Server-side)"
parent: "Google A2A"
nav_order: 6
---

# Chapter 6: Task Handling Logic (Server-side)

Welcome back! In [Chapter 5: A2A Client Implementation](05_a2a_client_implementation.md), we learned how to build the "customer" side – the **A2A Client** – that sends requests to an agent's server. We saw how it formats messages and talks to the agent's "embassy" ([A2A Server Implementation](04_a2a_server_implementation.md)).

But what happens *inside* the embassy once a request arrives? Who actually reads the request, does the work, and prepares the response?

This chapter focuses on the **Task Handling Logic**. It solves the problem: **What is the core "brain" inside the A2A Server that performs the requested work?**

## The Agent's "Brain" - The Chef in the Kitchen

Imagine our A2A Server ([Chapter 4](04_a2a_server_implementation.md)) is like a restaurant's front desk. It takes orders ([Tasks](02_task.md)) from customers ([A2A Clients](05_a2a_client_implementation.md)) using the standard A2A language ([A2A Protocol & Core Types](03_a2a_protocol___core_types.md)).

But the front desk doesn't cook the food! It passes the order to the **kitchen**, where the **chef** takes over. The chef:

1. **Reads the Order:** Understands what the customer wants (e.g., "Translate 'hello' to French").
2. **Prepares the Dish:** Uses ingredients (data), tools (APIs, databases), and expertise (AI models like Gemini) to fulfill the request.
3. **Updates the Waiter:** Might send updates back like "Order is being prepared" (`working` state).
4. **Finishes the Dish:** Creates the final product (the translated text "Bonjour").
5. **Plates the Dish:** Packages the result (`Artifacts`) and signals completion (`completed` state).

The **Task Handling Logic** is the "chef" inside your A2A Server. It's the core piece of code that contains the agent's specific skills and business logic.
## What Does the Task Handler Do?

When the A2A Server receives a request like `tasks/send`, it hands off the details to the Task Handling Logic. This logic is responsible for:

* **Understanding the Request:** Receiving the user's `Message` and any other context associated with the `Task`.
* **Executing the Work:**
  * Calling AI models (like Gemini via libraries like Genkit) for generation, analysis, etc.
  * Using tools (like calling a weather API, searching a database, or using specific libraries like CrewAI or LangGraph).
  * Performing custom calculations or data manipulation.
* **Managing State:** Signaling progress by updating the `Task`'s status (e.g., changing from `submitted` to `working`).
* **Generating Output:** Creating the final results (`Artifacts`) or intermediate updates.
* **Handling Errors:** Reporting back if something goes wrong (`failed` state).

## Implementing the "Brain"

The `Google A2A` libraries provide structures to help you implement this logic. Let's look at simplified examples.

### JavaScript Example (Async Generator Handler)

In JavaScript, the task handler is often an `async function*` (an asynchronous generator). It receives `TaskContext` and uses `yield` to send back updates.

Imagine a simple agent that pretends to call an AI to greet the user.
```typescript
// File: samples/js/src/server/handler.ts (Conceptual Example of a Handler)
import * as schema from "../schema.js"; // For types like Task, Message, etc.
import { TaskContext, TaskYieldUpdate } from "./handler.js"; // Handler types

// The Task Handling Logic for our 'Greeter Agent'
async function* greeterAgentHandler(
  context: TaskContext
): AsyncGenerator<TaskYieldUpdate> { // It yields updates

  // 1. Get the user's name from the input message
  const userMessageText = context.userMessage.parts[0].text ?? "there";
  const userName = userMessageText.split(" ").pop(); // Simple extraction

  // 2. Signal that work is starting
  console.log(`[GreeterAgent] Task ${context.task.id}: Starting`);
  yield {
    state: "working", // Update status to 'working'
    message: { role: "agent", parts: [{ text: "Thinking..." }] }
  };

  // 3. Simulate calling an AI (the "chef" uses an "ingredient")
  await new Promise(resolve => setTimeout(resolve, 500)); // Pretend work
  const greeting = `Hello, ${userName}! Welcome.`;

  // 4. Signal completion and provide the final message
  console.log(`[GreeterAgent] Task ${context.task.id}: Completing`);
  yield {
    state: "completed", // Update status to 'completed'
    message: { role: "agent", parts: [{ text: greeting }] }
  };
  // For more complex results, we could yield Artifacts here too.
}

// This handler function (`greeterAgentHandler`) would be passed
// to the A2AServer constructor, like in Chapter 4.
// const server = new A2AServer(greeterAgentHandler, { card: greeterAgentCard });
```
**Explanation:**

1. **Input:** The function receives `context` which contains the current `task` and the `userMessage`. We extract the user's name.
2. **Signal Working:** It `yield`s an update object setting the `state` to `working` and providing an optional status message. The A2A Server receives this yield.
3. **Do Work:** It simulates calling an AI to generate a greeting. In real agents (like `samples/js/src/agents/coder/index.ts` or `samples/js/src/agents/movie-agent/index.ts`), this is where you'd interact with Genkit, external APIs, or other tools.
4. **Signal Completion:** It `yield`s the final update, setting the `state` to `completed` and including the greeting in the agent's `message`.

### Python Example (TaskManager with Streaming)

In Python, you typically subclass `TaskManager` and implement methods like `on_send_task` or `on_send_task_subscribe`. For streaming responses, `on_send_task_subscribe` can also be an async generator.

Let's create a similar Greeter Agent.
```python
# File: my_agent/task_manager.py (Conceptual Example)
import asyncio
from typing import Union, AsyncIterable
from common.server.task_manager import InMemoryTaskManager # Base class
from common.types import (
    Task, TaskSendParams, TaskStatus, TaskState, Message, TextPart,
    SendTaskStreamingRequest, SendTaskStreamingResponse, TaskStatusUpdateEvent,
    JSONRPCResponse
)
import logging

logger = logging.getLogger(__name__)

class GreeterTaskManager(InMemoryTaskManager): # Inherit from base

    # Handle non-streaming requests (optional)
    async def on_send_task(self, request):
        # ... implementation for non-streaming ...
        raise NotImplementedError()

    # Handle STREAMING requests using an async generator
    async def on_send_task_subscribe(
        self, request: SendTaskStreamingRequest
    ) -> Union[AsyncIterable[SendTaskStreamingResponse], JSONRPCResponse]:

        task_params: TaskSendParams = request.params
        task_id = task_params.id
        logger.info(f"[GreeterAgent] Task {task_id}: Received")

        # 0. Set up internal queue for SSE events
        #    (Handled by library/base class, conceptually)

        # 1. Update store & get initial Task object
        await self.upsert_task(task_params) # Store the task initially

        # --- Start the async generator part ---
        async def _process_task() -> AsyncIterable[SendTaskStreamingResponse]:
            try:
                # 2. Get user name from input
                user_message_text = task_params.message.parts[0].text if task_params.message.parts else "there"
                user_name = user_message_text.split(" ").pop()

                # 3. Signal working (Yield a status update event)
                working_status = TaskStatus(state=TaskState.WORKING, message=Message(role="agent", parts=[TextPart(text="Thinking...")]))
                working_event = TaskStatusUpdateEvent(id=task_id, status=working_status, final=False)
                yield SendTaskStreamingResponse(id=request.id, result=working_event)
                # Update internal store (optional, depending on base class)
                await self.update_store(task_id, working_status, artifacts=None)

                # 4. Simulate AI call
                await asyncio.sleep(0.5)
                greeting = f"Hello, {user_name}! Welcome from Python."

                # 5. Signal completion (Yield final status update event)
                completed_status = TaskStatus(state=TaskState.COMPLETED, message=Message(role="agent", parts=[TextPart(text=greeting)]))
                completed_event = TaskStatusUpdateEvent(id=task_id, status=completed_status, final=True) # final=True
                yield SendTaskStreamingResponse(id=request.id, result=completed_event)
                # Update internal store
                await self.update_store(task_id, completed_status, artifacts=None)

                logger.info(f"[GreeterAgent] Task {task_id}: Completed")

            except Exception as e:
                logger.error(f"[GreeterAgent] Task {task_id}: Error - {e}")
                # Signal failure
                failed_status = TaskStatus(state=TaskState.FAILED, message=Message(role="agent", parts=[TextPart(text=f"Error: {e}")]))
                failed_event = TaskStatusUpdateEvent(id=task_id, status=failed_status, final=True)
                yield SendTaskStreamingResponse(id=request.id, result=failed_event)
                await self.update_store(task_id, failed_status, artifacts=None)

        # Return the async generator
        return _process_task()

# This GreeterTaskManager class would be passed to the A2AServer
# server = A2AServer(task_manager=GreeterTaskManager(), ...)
```
**Explanation:**

1. **Inheritance:** We create `GreeterTaskManager` inheriting from `InMemoryTaskManager` (which provides basic task storage).
2. **`on_send_task_subscribe`:** This method handles streaming requests. It first stores the initial task details.
3. **Async Generator (`_process_task`):** The core logic is inside an inner `async def` that returns an `AsyncIterable`. This allows us to `yield` updates over time, similar to the JavaScript generator.
4. **Yielding Events:** Instead of yielding raw status updates, we yield `SendTaskStreamingResponse` objects containing `TaskStatusUpdateEvent`. The `final=True` flag marks the last event. ([Chapter 7: Streaming Communication (SSE)](07_streaming_communication__sse_.md) covers SSE in detail).
5. **Updating Store:** We explicitly call `self.update_store` after yielding events to keep the task's state consistent in our `InMemoryTaskManager`.
6. **Error Handling:** A `try...except` block handles potential errors and yields a `failed` state event.

Real-world Python agents might use frameworks like CrewAI (`samples/python/agents/crewai/agent.py`) or LangGraph (`samples/python/agents/langgraph/agent.py`) within these handler methods to orchestrate more complex logic.
## Key Inputs to the Handler

The handler needs information to do its job. The context typically includes:

* **Task Details:** The current `Task` object, including its unique `id`, current `status`, and any `metadata`.
* **User Message:** The specific `Message` from the user that triggered this work (containing `Parts` like text or files).
* **History (Optional):** Previous `Messages` exchanged within this `Task` for conversational context.
* **Cancellation Check:** A way to see if the client has requested to cancel the task.

These inputs are bundled in `TaskContext` (JS) or passed as parameters to the `TaskManager` methods (Python).
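For intuition only, here is a tiny Python sketch of what such a bundle of inputs could look like. The class and field names (`HandlerContext`, `is_cancelled`, etc.) are illustrative, not the actual library API:

```python
from dataclasses import dataclass, field
from typing import Callable, List

@dataclass
class Message:
    role: str          # "user" or "agent"
    parts: List[dict]  # e.g. [{"text": "Translate 'hello' to French"}]

@dataclass
class HandlerContext:
    task_id: str                                          # unique Task identifier
    status: str                                           # current state, e.g. "submitted"
    user_message: Message                                 # the message that triggered the work
    history: List[Message] = field(default_factory=list)  # prior conversation turns
    is_cancelled: Callable[[], bool] = lambda: False      # hook for the cancellation check

ctx = HandlerContext(
    task_id="task-123",
    status="submitted",
    user_message=Message(role="user", parts=[{"text": "Say hello to Ada"}]),
)
print(ctx.is_cancelled())  # -> False
```

The real `TaskContext` (JS) and `TaskManager` parameters (Python) carry the same kinds of information, just with the libraries' own types.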
## Signaling Progress and Delivering Results

* **Status Updates:** Yielding status changes (`working`, `input-required`, `completed`, `failed`) keeps the client informed, especially for long-running tasks. This often includes a `Message` from the agent (e.g., "Looking up information...", "Please provide the city name.").
* **Artifacts (Results):** For tasks that produce distinct outputs (like files, structured data, or images), the handler yields `Artifact` objects. These artifacts are collected and associated with the `Task`.
  * JS: Yield `schema.Artifact` objects directly. (`samples/js/src/agents/coder/index.ts`)
  * Python (Streaming): Yield `SendTaskStreamingResponse` containing `TaskArtifactUpdateEvent`. (`demo/ui/service/server/adk_host_manager.py` shows `process_artifact_event`)

## Connecting to the Server

As shown in [Chapter 4](04_a2a_server_implementation.md), you connect your Task Handling Logic to the `A2AServer` during its setup:

* **JS:** Pass the async generator function (`greeterAgentHandler`) to the `A2AServer` constructor.
* **Python:** Pass an instance of your `TaskManager` subclass (`GreeterTaskManager()`) to the `A2AServer` constructor.

The server then knows exactly which "chef" to call when an order comes in.

## Under the Hood: Server Invoking the Handler

Let's visualize how the server uses the handler when a streaming `tasks/sendSubscribe` request arrives:
```mermaid
sequenceDiagram
    participant C as A2A Client
    participant S as A2A Server
    participant TH as Task Handler (e.g., greeterAgentHandler)
    participant AI as AI Model/Tool (Optional)
    participant TS as Task Store

    C->>S: POST / (JSON-RPC: method="tasks/sendSubscribe", params={...})
    Note right of S: Receives request, parses JSON-RPC

    S->>TS: Create/Get Task Record (ID: task-123)
    TS-->>S: Task Object (state: submitted)

    S->>TH: Invoke handler(context) / Call on_send_task_subscribe()
    Note right of TH: Handler starts executing

    TH->>TS: Update Task (state: working)
    TH-->>S: yield {state: "working", ...} / yield TaskStatusUpdateEvent(working)
    Note right of S: Receives yielded update

    S-->>C: Send SSE Event (data: TaskStatusUpdateEvent - working)
    Note left of C: Client receives 'working' status

    alt Handler needs AI/Tool
        TH->>AI: Request generation("greet user")
        AI-->>TH: Response ("Hello there!")
    end

    TH->>TS: Update Task (state: completed, message: "Hello...")
    TH-->>S: yield {state: "completed", ...} / yield TaskStatusUpdateEvent(completed, final=True)
    Note right of S: Receives final yielded update

    S-->>C: Send SSE Event (data: TaskStatusUpdateEvent - completed, final=True)
    Note left of C: Client receives 'completed' status, stream ends
```
**Steps:**

1. **Request In:** The `A2A Server` receives the `tasks/sendSubscribe` request.
2. **Task Prep:** It looks up or creates the `Task` in the `Task Store`.
3. **Invoke Handler:** It calls your registered Task Handling Logic (e.g., `greeterAgentHandler` or `GreeterTaskManager.on_send_task_subscribe`), providing the necessary context.
4. **Handler Executes & Yields:** Your handler runs. When it `yield`s a status update (like `working`):
   * It might update the `Task Store`.
   * It returns the update to the `A2AServer`.
5. **Server Sends Update:** The `A2AServer` formats the update as a Server-Sent Event (SSE) and sends it to the `A2A Client`.
6. **(Optional) External Calls:** The handler might call external services (AI, tools).
7. **Handler Yields Final Result:** When the handler is done, it `yield`s the final `completed` (or `failed`) status update (often marked as `final=True` in streaming).
8. **Server Sends Final Update:** The `A2AServer` sends the final SSE event to the client, closing the stream.
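This invoke-and-forward loop can be mimicked with a toy handler and a driver function, independent of any A2A library. All names below are illustrative:

```python
import asyncio

async def greeter_handler(user_text: str):
    """A toy task handler: an async generator that yields status updates,
    in the same spirit as the Greeter examples above."""
    yield {"state": "working"}
    await asyncio.sleep(0)  # stand-in for a real AI/tool call
    name = user_text.split(" ")[-1]
    yield {"state": "completed", "message": f"Hello, {name}!"}

async def serve_task(user_text: str) -> list:
    """The 'server' side: drive the handler and forward each yielded update.
    (A real A2A server would format each update as an SSE event here.)"""
    sent = []
    async for update in greeter_handler(user_text):
        sent.append(update)  # <- where the server would send an SSE event
        if update["state"] in ("completed", "failed"):
            break            # a final update closes the stream
    return sent

events = asyncio.run(serve_task("greet Ada"))
print(events)  # -> [{'state': 'working'}, {'state': 'completed', 'message': 'Hello, Ada!'}]
```

The key idea is the division of labor: the handler only yields domain updates, while the driver decides how each one reaches the client.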
Key files involved:

* **JS Handler Definition:** `samples/js/src/server/handler.ts` (defines `TaskContext`, `TaskYieldUpdate`, `TaskHandler`)
* **JS Agent Examples:** `samples/js/src/agents/coder/index.ts`, `samples/js/src/agents/movie-agent/index.ts`
* **Python Base Manager:** `samples/python/common/server/task_manager.py` (defines `TaskManager`, `InMemoryTaskManager`)
* **Python Agent Examples:** `samples/python/agents/crewai/agent.py`, `samples/python/agents/langgraph/agent.py`, `demo/ui/service/server/adk_host_manager.py` (more complex, uses ADK)

## Conclusion

The **Task Handling Logic** is the heart of your A2A agent – the "chef" that actually does the work. It receives requests via the `A2AServer`, interacts with AI models or tools, manages the task's state transitions, and generates the final response or intermediate updates.

By implementing this logic (often as an async generator in JS or a `TaskManager` subclass in Python) and connecting it to your server, you define your agent's unique capabilities and how it fulfills the tasks requested by clients.

We saw how handlers can `yield` updates. But how do these updates actually get sent back to the client in real-time? Let's dive into the mechanism used for that: Streaming Communication using Server-Sent Events (SSE).

**Next:** [Chapter 7: Streaming Communication (SSE)](07_streaming_communication__sse_.md)

---

Generated by [AI Codebase Knowledge Builder](https://github.com/The-Pocket/Tutorial-Codebase-Knowledge)
379
docs/Google A2A/07_streaming_communication__sse_.md
Normal file
@@ -0,0 +1,379 @@
---
layout: default
title: "Streaming Communication (SSE)"
parent: "Google A2A"
nav_order: 7
---

# Chapter 7: Streaming Communication (SSE)

In the [previous chapter](06_task_handling_logic__server_side_.md), we built the "brain" of our agent – the **Task Handling Logic**. We saw how this logic can `yield` status updates or partial results as it works on a task. That's great, but how do those updates actually get back to the client in real-time? If the agent is writing a long story, how does the user see it paragraph by paragraph instead of waiting minutes for the whole thing?

This chapter dives into **Streaming Communication** using **Server-Sent Events (SSE)**. It solves the problem: **How can the server send real-time updates to the client for tasks that take time?**

## The Problem: Waiting is Boring!

Imagine you ask your AI agent assistant to plan a detailed weekend trip to a new city. This involves looking up flights, hotels, attractions, restaurants, checking opening times, maybe even booking things. This could take a minute or two!

If the communication were just a simple request and response, your application would send the request "Plan my trip" and then... wait. And wait. And wait. Finally, after two minutes, it would get the complete plan back. That's not a very engaging experience! You'd wonder if it was even working.

Wouldn't it be better if the agent could send updates like:

* "Okay, planning your trip to Paris..."
* "Found potential flights..."
* "Checking hotel availability near the Eiffel Tower..."
* "Here's a draft itinerary..."
* "Okay, the final plan is ready!"

This way, the user sees progress and knows the agent is actively working.

## The Solution: Streaming with Server-Sent Events (SSE)

This real-time update mechanism is called **streaming**. Instead of one big response at the end, the server *streams* multiple small messages back to the client over a single connection.

The Google A2A protocol uses a standard web technology called **Server-Sent Events (SSE)** to achieve this.

**Analogy: Package Tracking**

Think about ordering a package online:

* **Regular Request/Response:** You place the order, and the *only* update you get is when the package finally arrives at your door.
* **Streaming (SSE):** You place the order, and you get *live updates*: "Order confirmed," "Package shipped," "Out for delivery," "Delivered."

SSE works like that live tracking. The client makes one request, and the server keeps that connection open, pushing updates (events) whenever something new happens.

**Key points about SSE:**

* **Server Pushes:** The server sends data to the client whenever it wants (after the initial connection).
* **One-Way:** Data primarily flows from Server -> Client.
* **Standard Web Tech:** It's built on top of regular HTTP.
## How Streaming Works in A2A

1. **Client Initiates:** The [A2A Client](05_a2a_client_implementation.md) uses a specific JSON-RPC method: `tasks/sendSubscribe` (instead of the regular `tasks/send`). This tells the server, "I want to start this task, AND I want to receive live updates."
2. **Server Acknowledges:** The [A2A Server](04_a2a_server_implementation.md) receives the `tasks/sendSubscribe` request. It prepares to handle a streaming response.
3. **Special Response Header:** The server sends back an initial HTTP response with a special header: `Content-Type: text/event-stream`. This tells the client, "Get ready for a stream of events!" The connection stays open.
4. **Handler Yields:** Inside the server, the [Task Handling Logic](06_task_handling_logic__server_side_.md) (the async generator) starts working. When it `yield`s a status update (like `state: 'working'`) or an artifact:
   * The `A2AServer` library catches this yielded value.
5. **Server Sends Event:** The `A2AServer` formats the yielded data into an SSE message (more on the format later) and sends it down the open connection to the client.
6. **Repeat:** Steps 4 and 5 repeat every time the handler yields something new.
7. **Stream Ends:** When the handler finishes (or yields a final state like `completed` or `failed`), the server sends a final event (often marked with `final: true`) and then closes the connection.
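Concretely, each update the server pushes is just text in the SSE wire format: a `data:` line followed by a blank line. A minimal, library-independent sketch of that framing:

```python
import json

def format_sse_event(payload: dict) -> str:
    """Frame one JSON payload as a Server-Sent Event:
    a 'data:' line terminated by a blank line."""
    return f"data: {json.dumps(payload)}\n\n"

# Two consecutive updates as they would appear on the wire:
stream = (format_sse_event({"status": {"state": "working"}, "final": False})
          + format_sse_event({"status": {"state": "completed"}, "final": True}))
print(stream.count("\n\n"))  # -> 2 (one blank-line terminator per event)
```

A client splitting the incoming text on `"\n\n"` recovers exactly one event per chunk, which is what the client libraries shown later in this chapter do.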
## Server-Side: Sending the Stream

Let's peek at how the `A2AServer` library handles yielded values from your task handler ([Chapter 6](06_task_handling_logic__server_side_.md)) to send SSE events.

### JavaScript Example (Conceptual)

The `A2AServer` in `samples/js/src/server/server.ts` uses the underlying Express.js response object (`res`) to write SSE messages.
```typescript
// File: samples/js/src/server/server.ts (Simplified Snippet inside handleTaskSendSubscribe)

// --- Setup SSE ---
res.writeHead(200, {
  "Content-Type": "text/event-stream", // Tell client it's SSE
  "Cache-Control": "no-cache",
  "Connection": "keep-alive",
});

// Function to send a single SSE event
const sendEvent = (eventData: schema.JSONRPCResponse) => {
  // Format: "data: <json string>\n\n"
  res.write(`data: ${JSON.stringify(eventData)}\n\n`);
};

// --- Process generator yields ---
for await (const yieldValue of generator) {
  // ... (Apply update, save to store etc. - see Chapter 6) ...

  // Create the JSON payload (TaskStatusUpdateEvent or TaskArtifactUpdateEvent)
  const eventPayload = createEventFromYield(taskId, yieldValue, isFinal);

  // Wrap payload in a JSON-RPC Response structure
  const rpcResponse = createSuccessResponse(req.id, eventPayload);

  // Send the formatted event down the stream
  sendEvent(rpcResponse);

  if (isFinal) break; // Stop if handler yielded a final state
}

// --- End Stream ---
if (!res.writableEnded) {
  res.end(); // Close the connection
}
```
**Explanation:**

1. **Headers:** The server first sends HTTP headers to establish the SSE connection (`Content-Type: text/event-stream`).
2. **`sendEvent` Helper:** A function is defined to format the JSON data correctly (`data: ...\n\n`) and write it to the response stream (`res.write`).
3. **Looping:** The code loops through the values yielded by your `TaskHandler` generator.
4. **Formatting:** Each yielded value is turned into a standard A2A event payload (`TaskStatusUpdateEvent` or `TaskArtifactUpdateEvent`) wrapped in a JSON-RPC response structure.
5. **Sending:** `sendEvent` is called to push the formatted message to the client.
6. **Closing:** Once the loop finishes (or a final event is sent), `res.end()` closes the connection.

### Python Example (Conceptual)

The Python `A2AServer` in `samples/python/common/server/server.py` uses the `sse-starlette` library and `EventSourceResponse` to handle the streaming.
```python
# File: samples/python/common/server/server.py (Simplified Snippet _create_response)
from sse_starlette.sse import EventSourceResponse
from typing import AsyncIterable

# ... inside _process_request ...
result = await self.task_manager.on_send_task_subscribe(json_rpc_request)
return self._create_response(result) # Pass the generator to _create_response

# ... inside A2AServer ...
def _create_response(self, result: Any) -> JSONResponse | EventSourceResponse:
    if isinstance(result, AsyncIterable):
        # If the handler returned an async generator...

        async def event_generator(generator_result) -> AsyncIterable[dict[str, str]]:
            # Wrap the generator to format SSE messages
            async for item in generator_result:
                # item is expected to be a JSONRPCResponse containing the event payload
                yield {"data": item.model_dump_json(exclude_none=True)}

        # Use EventSourceResponse to handle the streaming
        return EventSourceResponse(event_generator(result))
    # ... (handle non-streaming JSONResponse) ...
```
**Explanation:**

1. **Generator:** The `on_send_task_subscribe` method in your `TaskManager` ([Chapter 6](06_task_handling_logic__server_side_.md)) returns an `AsyncIterable` (an async generator).
2. **`EventSourceResponse`:** The `A2AServer` detects this generator and wraps it in `EventSourceResponse`.
3. **Formatting:** The inner `event_generator` function iterates through the items yielded by your handler (which are already formatted as `SendTaskStreamingResponse` objects containing the event payload). It takes each item, converts it to a JSON string, and yields it in the `{"data": ...}` format expected by `EventSourceResponse`.
4. **Automatic Streaming:** `EventSourceResponse` automatically handles sending the correct SSE headers and writing each yielded `data` chunk to the client over the open connection.

In both cases, the library handles the details of SSE formatting, letting your `TaskHandler` focus just on yielding the updates.

## Client-Side: Receiving the Stream

How does the `A2AClient` handle these incoming events?

### JavaScript Example (Conceptual)

The `A2AClient` in `samples/js/src/client/client.ts` uses the browser's `fetch` API and `ReadableStream` to process the SSE events.
```typescript
// File: samples/js/src/client/client.ts (Simplified Snippet inside _handleStreamingResponse)

async function* _handleStreamingResponse(response: Response): AsyncIterable<any> {
  if (!response.ok || !response.body) {
    // Handle HTTP errors before trying to stream
    throw new Error(`HTTP error ${response.status}`);
  }

  // Get a reader for the response body stream (decoded as text)
  const reader = response.body
    .pipeThrough(new TextDecoderStream())
    .getReader();
  let buffer = ""; // To handle partial messages

  try {
    while (true) {
      const { done, value } = await reader.read(); // Read next chunk

      if (done) break; // Stream finished

      buffer += value; // Add chunk to buffer
      const lines = buffer.split("\n\n"); // Split into potential messages
      buffer = lines.pop() || ""; // Keep any trailing partial message

      for (const message of lines) {
        if (message.startsWith("data: ")) { // Check for SSE data line
          const dataLine = message.substring("data: ".length);
          try {
            // Parse the JSON data from the line
            const parsedData = JSON.parse(dataLine);
            // parsedData is expected to be a JSONRPCResponse
            if (parsedData.result) {
              // Yield the actual event payload (TaskStatusUpdateEvent, etc.)
              yield parsedData.result;
            } else if (parsedData.error) {
              // Handle errors received in the stream
              throw new RpcError(parsedData.error.code, parsedData.error.message);
            }
          } catch (e) {
            if (e instanceof RpcError) throw e; // Don't swallow protocol errors
            console.error("Failed to parse SSE data:", dataLine, e);
          }
        }
      }
    }
  } finally {
    reader.releaseLock(); // Clean up the reader
  }
}

// Usage (from Chapter 5):
// const stream = client.sendTaskSubscribe(params);
// for await (const event of stream) {
//   console.log("Received Agent Event:", event);
// }
```
**Explanation:**

1. **Reader:** It gets a `ReadableStreamDefaultReader` to read the response body chunk by chunk.
2. **Buffering:** It uses a `buffer` to accumulate incoming text, because SSE messages (`data: ...\n\n`) might arrive split across multiple network packets.
3. **Splitting Messages:** It splits the buffer by the SSE message separator (`\n\n`).
4. **Parsing `data:`:** It looks for lines starting with `data: `, extracts the JSON string after it, and parses it.
5. **Yielding Payload:** It extracts the `result` field from the parsed JSON-RPC response (this `result` contains the `TaskStatusUpdateEvent` or `TaskArtifactUpdateEvent`) and `yield`s it to the application code (the `for await...of` loop).
6. **Error Handling:** It includes checks for HTTP errors and JSON parsing errors.
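The buffering logic described in steps 2-4 can be reproduced in a few lines of plain Python, independent of any browser API. This sketch (the function name `sse_payloads` is ours) shows why the buffer is needed: a single event may arrive split across several network chunks.

```python
import json

def sse_payloads(chunks):
    """Reassemble SSE 'data:' frames from arbitrarily fragmented text chunks."""
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        messages = buffer.split("\n\n")  # complete frames end with a blank line
        buffer = messages.pop()          # keep any trailing partial frame
        for message in messages:
            if message.startswith("data: "):
                yield json.loads(message[len("data: "):])

# One event split across two chunks, followed by a complete second event:
chunks = [
    'data: {"result": {"state": "wor',
    'king"}}\n\ndata: {"result": {"state": "completed"}}\n\n',
]
events = list(sse_payloads(chunks))
```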
### Python Example (Conceptual)

The Python `A2AClient` in `samples/python/common/client/client.py` uses the `httpx-sse` library.
```python
# File: samples/python/common/client/client.py (Simplified Snippet send_task_streaming)
import json
from typing import AsyncIterable

import httpx
from httpx_sse import aconnect_sse  # async SSE client helper

async def send_task_streaming(self, payload: dict) -> AsyncIterable[SendTaskStreamingResponse]:
    request = SendTaskStreamingRequest(params=payload)
    request_json = request.model_dump(exclude_none=True)

    # Use an async httpx client and the aconnect_sse context manager
    async with httpx.AsyncClient(timeout=None) as client:
        try:
            async with aconnect_sse(client, "POST", self.url, json=request_json) as event_source:
                # Iterate through Server-Sent Events provided by the library
                async for sse in event_source.aiter_sse():
                    if sse.event == "message":  # Default event type
                        try:
                            # Parse the JSON data from the event
                            response_data = json.loads(sse.data)
                            # Validate and yield the parsed response object
                            yield SendTaskStreamingResponse(**response_data)
                        except json.JSONDecodeError:
                            print(f"Warning: Could not decode SSE data: {sse.data}")
                        except Exception as e:  # Catch validation errors too
                            print(f"Warning: Error processing SSE data: {e} - Data: {sse.data}")
        except httpx.RequestError as e:
            raise A2AClientHTTPError(400, str(e)) from e
        # Handle other potential errors like connection issues
```

**Explanation:**

1. **`httpx-sse`:** It uses the `aconnect_sse` function from `httpx-sse` (the async counterpart of `connect_sse`). This function handles the underlying HTTP connection and SSE parsing.
2. **Iteration:** `event_source.aiter_sse()` provides an async iterator that yields individual SSE events as they arrive.
3. **Parsing:** Inside the loop, `sse.data` contains the JSON string from the `data:` line. We parse it using `json.loads()`.
4. **Validation & Yield:** We validate the parsed data against the `SendTaskStreamingResponse` model (which expects the `result` to be an event payload) and `yield` the resulting object to the application code (`async for result in response_stream:`).
5. **Error Handling:** Includes `try...except` blocks for JSON decoding errors and HTTP request errors.

Again, the client libraries hide most of the complexity, providing a simple async iterator for your application to consume.
## Under the Hood: The SSE Sequence

Here's how the pieces fit together when a client requests streaming:
```mermaid
sequenceDiagram
    participant App as Client Application
    participant ClientLib as A2AClient Library
    participant Network as HTTP/SSE
    participant ServerLib as A2AServer Library
    participant Handler as Task Handler (Agent Logic)

    App->>ClientLib: Call client.sendTaskSubscribe(params)
    ClientLib->>Network: POST /a2a (JSON-RPC: method="tasks/sendSubscribe", Accept: text/event-stream)
    Network->>ServerLib: Deliver POST request

    ServerLib->>ServerLib: Receive request, See 'sendSubscribe'
    ServerLib->>Network: Respond HTTP 200 OK (Content-Type: text/event-stream)
    ServerLib->>Handler: Invoke handler(context)

    Network->>ClientLib: Deliver HTTP 200 OK (stream headers)
    Note right of ClientLib: Connection open, ready for events

    Handler->>Handler: Start processing...
    Handler-->>ServerLib: yield {state: "working"}

    ServerLib->>ServerLib: Format update as JSONRPCResponse(result=TaskStatusUpdateEvent)
    ServerLib->>Network: Send SSE event (data: {"jsonrpc":"2.0", "id":"req-1", "result":{...working...}}\n\n)

    Network->>ClientLib: Deliver SSE event
    ClientLib->>ClientLib: Parse 'data:' line, extract 'result' payload
    ClientLib-->>App: yield TaskStatusUpdateEvent (working)

    Handler->>Handler: Generate partial result...
    Handler-->>ServerLib: yield Artifact(...)

    ServerLib->>ServerLib: Format update as JSONRPCResponse(result=TaskArtifactUpdateEvent)
    ServerLib->>Network: Send SSE event (data: {"jsonrpc":"2.0", "id":"req-1", "result":{...artifact...}}\n\n)

    Network->>ClientLib: Deliver SSE event
    ClientLib->>ClientLib: Parse 'data:' line, extract 'result' payload
    ClientLib-->>App: yield TaskArtifactUpdateEvent (artifact)

    Handler->>Handler: Finish processing...
    Handler-->>ServerLib: yield {state: "completed"}

    ServerLib->>ServerLib: Format update (final=true) as JSONRPCResponse(result=TaskStatusUpdateEvent)
    ServerLib->>Network: Send SSE event (data: {"jsonrpc":"2.0", "id":"req-1", "result":{...completed, final:true}}\n\n)
    ServerLib->>Network: Close connection

    Network->>ClientLib: Deliver SSE event
    ClientLib->>ClientLib: Parse 'data:' line, extract 'result' payload
    ClientLib-->>App: yield TaskStatusUpdateEvent (completed)
    ClientLib->>ClientLib: Detect stream end
    App->>App: Async loop finishes
```
## SSE Event Format in A2A

The basic format of an SSE message is:

```
data: <payload_as_json_string>

```

(Note the two newlines at the end!)

In the A2A protocol, the `<payload_as_json_string>` is typically a standard JSON-RPC `Response` object. The `result` field of this response object contains the actual A2A event payload:

* **`TaskStatusUpdateEvent`:** Sent when the task's status changes (e.g., `submitted` -> `working`). Includes the new `TaskStatus`.
* **`TaskArtifactUpdateEvent`:** Sent when the task generates an output `Artifact` (like a chunk of text, a file reference, or structured data).

**Example Status Update Event (as sent over SSE):**

```
data: {"jsonrpc": "2.0", "id": "req-client-123", "result": {"id": "task-abc", "status": {"state": "working", "message": {"role": "agent", "parts": [{"text": "Analyzing data..."}]}, "timestamp": "..." }, "final": false}}
```

**Example Artifact Update Event (as sent over SSE):**

```
data: {"jsonrpc": "2.0", "id": "req-client-123", "result": {"id": "task-abc", "artifact": {"parts": [{"text": "Here is the first paragraph..."}]}, "final": false}}
```

The `final: true` flag is added to the *last* event sent for a task (usually a final `TaskStatusUpdateEvent` with state `completed` or `failed`) to signal the end of the stream.
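Reading such frames back on the client side is plain JSON handling. The loop below is our own sketch (with the frame strings copied from the examples above); it classifies each event and stops when it sees the `final` flag:

```python
import json

frames = [
    'data: {"jsonrpc": "2.0", "id": "req-client-123", "result": {"id": "task-abc", "status": {"state": "working"}, "final": false}}',
    'data: {"jsonrpc": "2.0", "id": "req-client-123", "result": {"id": "task-abc", "artifact": {"parts": [{"text": "Here is the first paragraph..."}]}, "final": false}}',
    'data: {"jsonrpc": "2.0", "id": "req-client-123", "result": {"id": "task-abc", "status": {"state": "completed"}, "final": true}}',
]

kinds = []
for frame in frames:
    result = json.loads(frame[len("data: "):])["result"]
    # An artifact update carries an "artifact" key; otherwise it is a status update.
    kinds.append("artifact" if "artifact" in result else "status")
    if result.get("final"):
        break  # last event for this task: stop reading the stream
```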
## Conclusion

Streaming Communication using Server-Sent Events (SSE) is a powerful feature of the A2A protocol that allows agents to provide real-time feedback for long-running tasks.

* It improves user experience by showing progress instead of making users wait.
* It uses the standard SSE web technology (`Content-Type: text/event-stream`).
* Clients initiate streaming using `tasks/sendSubscribe`.
* Servers use libraries (like `sse-starlette` or custom Express logic) to send `data:` events containing JSON-RPC responses with `TaskStatusUpdateEvent` or `TaskArtifactUpdateEvent` payloads.
* Clients use libraries (like `httpx-sse` or `fetch` streams) to easily consume these events.

Now that we understand how individual agents can communicate, even for long tasks, how can we coordinate *multiple* agents to work together on a larger goal?

**Next:** [Chapter 8: Multi-Agent Orchestration (Host Agent)](08_multi_agent_orchestration__host_agent_.md)

---

Generated by [AI Codebase Knowledge Builder](https://github.com/The-Pocket/Tutorial-Codebase-Knowledge)
251
docs/Google A2A/08_multi_agent_orchestration__host_agent_.md
Normal file
@@ -0,0 +1,251 @@
---
layout: default
title: "Multi-Agent Orchestration (Host Agent)"
parent: "Google A2A"
nav_order: 8
---
# Chapter 8: Multi-Agent Orchestration (Host Agent)

In the [previous chapter](07_streaming_communication__sse_.md), we saw how an agent server can stream updates back to a client using Server-Sent Events (SSE). This is great for keeping users informed during long tasks.

But what if a task is *so* complex that no single AI agent can handle it alone? Imagine asking an assistant: "Plan a weekend trip to London for me, including flights from New York, a hotel near the British Museum, and suggest two vegetarian restaurants."

One agent might be amazing at finding flights, another specialized in hotel bookings, and a third brilliant at restaurant recommendations. How can we get these specialist agents to work together to fulfill your complex request?

This chapter introduces the concept of **Multi-Agent Orchestration** using a **Host Agent**. It solves the problem: **How can we coordinate multiple, specialized AI agents to achieve a larger goal?**
## What is a Host Agent? The Project Manager AI

Think of a big project, like building a house. You don't just talk to one person. You have a **project manager** (or general contractor). They:

1. Receive the high-level goal (build a house).
2. Understand the different skills needed (plumbing, electrical, framing, etc.).
3. Find and hire specialists (plumbers, electricians, carpenters).
4. Assign specific tasks to each specialist.
5. Coordinate their work and deadlines.
6. Combine their contributions into the final house.

A **Host Agent** in the A2A world acts exactly like that project manager. It's an AI agent whose main job is *not* to perform tasks itself, but to **coordinate other agents**. Specifically, it acts as an **[A2A Client](05_a2a_client_implementation.md)** to *other* downstream A2A agents.

Here's the flow:

1. **Receives Request:** The Host Agent gets a request from a user or application (e.g., "Plan my London trip").
2. **Finds Specialists:** It looks at its list of known downstream agents and their [Agent Cards](01_agent_card.md) to see who has the needed skills (e.g., "Flight Booker Agent", "Hotel Finder Agent").
3. **Delegates Tasks:** It breaks down the request and sends specific [Tasks](02_task.md) to the chosen downstream agents using the standard [A2A Protocol & Core Types](03_a2a_protocol___core_types.md). For example:
   * Sends a task "Find NYC-London flights for next weekend" to the Flight Booker Agent.
   * Sends a task "Find hotels near British Museum" to the Hotel Finder Agent.
4. **Gathers Results:** It receives the results (potentially via [Streaming Communication (SSE)](07_streaming_communication__sse_.md)) from the downstream agents.
5. **Combines & Responds:** It might combine the flight info and hotel options into a single, coherent response for the original user.

The Host Agent is the central coordinator, making multiple agents appear as one unified, more capable agent.
## How a Host Agent Works (Conceptual)

Let's imagine we're building a simple Host Agent. It knows about two other agents:

* `Joke Teller Agent` (at `http://joke-agent.com`) - Skill: `tell_joke`
* `Summarizer Agent` (at `http://summary-agent.com`) - Skill: `summarize_text`

Our Host Agent receives the request: "Tell me a joke and summarize this article: [long article text]"

Here's how the Host Agent's internal logic might work:

1. **Analyze Request:** The Host Agent realizes the request has two parts: telling a joke and summarizing text.
2. **Match Skills:**
   * It checks its known agents' [Agent Cards](01_agent_card.md).
   * It sees `Joke Teller Agent` has the `tell_joke` skill.
   * It sees `Summarizer Agent` has the `summarize_text` skill.
3. **Delegate Task 1 (Joke):**
   * It acts as an [A2A Client](05_a2a_client_implementation.md).
   * It sends a `tasks/send` request to `http://joke-agent.com/a2a` with the message "Tell me a joke".
4. **Delegate Task 2 (Summary):**
   * It acts as an [A2A Client](05_a2a_client_implementation.md) again.
   * It sends a `tasks/send` request to `http://summary-agent.com/a2a` with the message containing the article text.
5. **Await Responses:** It waits for both downstream tasks to complete (using their Task IDs to track them). Let's say it gets:
   * From Joke Agent: "Why don't scientists trust atoms? Because they make up everything!"
   * From Summarizer Agent: "[Short summary of the article]"
6. **Combine & Reply:** It combines these results into a single response for the original user: "Okay, here's a joke: Why don't scientists trust atoms? Because they make up everything! \n\nAnd here's the summary: [Short summary of the article]"
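Step 2 ("Match Skills") is essentially a lookup over the Agent Cards the host knows. A toy version, using plain dicts in place of real `AgentCard` objects (the field names and helper below are ours, for illustration only):

```python
# Toy registry: each entry mimics the relevant parts of an Agent Card.
known_agents = [
    {"name": "Joke Teller Agent", "url": "http://joke-agent.com", "skills": ["tell_joke"]},
    {"name": "Summarizer Agent", "url": "http://summary-agent.com", "skills": ["summarize_text"]},
]

def find_agent_for_skill(skill_id: str):
    """Return the first known agent whose card advertises the skill, else None."""
    for card in known_agents:
        if skill_id in card["skills"]:
            return card
    return None

joke_agent = find_agent_for_skill("tell_joke")
summary_agent = find_agent_for_skill("summarize_text")
```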
## Example Implementation Snippets (Conceptual Python)

Building a full Host Agent often involves frameworks like Google's Agent Development Kit (ADK), as seen in `samples/python/hosts/multiagent/host_agent.py`. However, let's look at the core A2A concepts conceptually.

The Host Agent needs a way to manage connections to downstream agents. We might have a helper class like `RemoteAgentConnection` (inspired by `samples/python/hosts/multiagent/remote_agent_connection.py`) which internally uses an [A2A Client](05_a2a_client_implementation.md).
```python
# Conceptual Helper Class (Manages client for one downstream agent)
from common.client import A2AClient
from common.types import AgentCard, TaskSendParams, Task

class RemoteAgentConnection:
    def __init__(self, agent_card: AgentCard):
        # Store the downstream agent's card
        self.card = agent_card
        # Create an A2A client specifically for this agent
        self.client = A2AClient(agent_card=agent_card)
        print(f"Connection ready for agent: {self.card.name}")

    async def send_task_to_remote(self, params: TaskSendParams) -> Task:
        print(f"Host sending task {params.id} to {self.card.name}...")
        # Use the internal A2A client to send the task
        # (Simplified: assumes non-streaming for clarity)
        response = await self.client.send_task(params.model_dump())
        print(f"Host received response for task {params.id} from {self.card.name}")
        return response.result  # Return the final Task object
```
**Explanation:**

* This class holds the [Agent Card](01_agent_card.md) of a downstream agent.
* It creates and holds an [A2A Client](05_a2a_client_implementation.md) instance configured to talk to that specific agent's A2A server URL.
* The `send_task_to_remote` method takes the task details (`TaskSendParams`) and uses the internal client to actually send the [Task](02_task.md) over A2A.

Now, the Host Agent's main logic might look something like this:
```python
# Conceptual Host Agent Logic
import asyncio
import uuid

from common.types import AgentCard, Message, TextPart, TaskSendParams

class HostAgentLogic:
    def __init__(self):
        # Assume agent cards are loaded somehow
        joke_agent_card = AgentCard(name="Joke Agent", url="http://joke-agent.com/a2a", ...)
        summary_agent_card = AgentCard(name="Summarizer Agent", url="http://summary-agent.com/a2a", ...)

        # Create connections to downstream agents
        self.remote_connections = {
            "Joke Agent": RemoteAgentConnection(joke_agent_card),
            "Summarizer Agent": RemoteAgentConnection(summary_agent_card),
        }
        print("Host Agent initialized with remote connections.")

    async def handle_user_request(self, user_request_text: str):
        print(f"Host received user request: {user_request_text}")
        # Super simplified logic: If "joke" in request, call Joke Agent.
        # If "summarize" in request, call Summarizer Agent.

        tasks_to_run = []
        if "joke" in user_request_text.lower():
            joke_conn = self.remote_connections["Joke Agent"]
            joke_params = TaskSendParams(
                id=str(uuid.uuid4()),
                message=Message(role="user", parts=[TextPart(text="Tell joke")])
            )
            # Add the task-sending coroutine to the list
            tasks_to_run.append(joke_conn.send_task_to_remote(joke_params))

        if "summarize" in user_request_text.lower():
            # (Assume article_text is extracted from user_request_text)
            article_text = "This is the article to summarize..."
            summary_conn = self.remote_connections["Summarizer Agent"]
            summary_params = TaskSendParams(
                id=str(uuid.uuid4()),
                message=Message(role="user", parts=[TextPart(text=article_text)])
            )
            tasks_to_run.append(summary_conn.send_task_to_remote(summary_params))

        # Run the downstream tasks concurrently
        print(f"Host dispatching {len(tasks_to_run)} tasks...")
        results = await asyncio.gather(*tasks_to_run)
        print("Host gathered results from downstream agents.")

        # Combine results (simplified)
        final_response = ""
        for task_result in results:
            if task_result.status.message and task_result.status.message.parts:
                final_response += task_result.status.message.parts[0].text + "\n"

        print(f"Host final response: {final_response}")
        return final_response

# --- Example Usage ---
# async def main():
#     host = HostAgentLogic()
#     await host.handle_user_request("Tell me a joke and summarize stuff.")
# asyncio.run(main())
```
**Explanation:**

1. **Initialization:** The `HostAgentLogic` creates `RemoteAgentConnection` instances for each downstream agent it knows.
2. **Request Handling:** When `handle_user_request` is called, it figures out which downstream agents are needed based on the request text (very basic keyword matching here).
3. **Prepare Tasks:** It prepares the `TaskSendParams` for each required downstream task.
4. **Concurrent Delegation:** It uses `asyncio.gather` to run the `send_task_to_remote` calls for all needed agents *concurrently*. This means it doesn't wait for the joke agent to finish before asking the summarizer agent to start.
5. **Combine Results:** After `asyncio.gather` finishes (meaning all downstream tasks have completed), it extracts the results from the returned `Task` objects and combines them into a final response.

This example shows the core idea: the Host Agent uses its knowledge of other agents' capabilities and acts as an A2A client to delegate work, potentially in parallel. Real host agents would have much more sophisticated logic for planning, delegation, and result synthesis, possibly using large language models themselves for coordination.
## Under the Hood: Orchestration Flow

Let's trace the communication for our "Joke & Summarize" example:
```mermaid
sequenceDiagram
    participant User
    participant Host as Host Agent (Server)
    participant HAClient as Host Agent (Internal A2A Client)
    participant Joke as Joke Agent (Server)
    participant Summary as Summarizer Agent (Server)

    User->>Host: Send Task T0: "Tell joke & summarize..."
    Note over Host: Analyzes request, needs Joke & Summarizer

    Host->>HAClient: Initiate A2A Task T1 to Joke Agent ("Tell joke")
    HAClient->>Joke: POST /a2a (tasks/send, id=T1, msg="Tell joke")
    Note right of Joke: Joke Agent starts processing T1

    Host->>HAClient: Initiate A2A Task T2 to Summarizer Agent ("Summarize text...")
    HAClient->>Summary: POST /a2a (tasks/send, id=T2, msg="...")
    Note right of Summary: Summarizer Agent starts processing T2

    Joke-->>HAClient: 200 OK (JSON-RPC result: Task T1 object, state=completed, result="Why..?")
    HAClient-->>Host: Received result for T1

    Summary-->>HAClient: 200 OK (JSON-RPC result: Task T2 object, state=completed, result="[Summary...]")
    HAClient-->>Host: Received result for T2

    Note over Host: Combines results from T1 and T2
    Host-->>User: Respond Task T0 (state=completed, result="Joke: ... Summary: ...")
```
**Steps:**

1. User sends the initial request (Task T0) to the Host Agent.
2. The Host Agent's logic determines it needs both the Joke Agent and Summarizer Agent.
3. The Host Agent uses its internal A2A client capabilities (represented by `HAClient`) to send Task T1 to the Joke Agent's A2A server endpoint.
4. Concurrently (or sequentially), the Host Agent uses its client capabilities to send Task T2 to the Summarizer Agent's A2A server endpoint.
5. The downstream agents (Joke, Summary) process their respective tasks and send back A2A responses (containing the final Task object with results) to the Host Agent's client component.
6. The Host Agent logic receives the results for T1 and T2.
7. The Host Agent combines the results and sends the final response for the original Task T0 back to the user.

The key is that the Host Agent speaks A2A *both* as a server (to the original user) and as a client (to the downstream agents).
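The fan-out in steps 3-4 and the fan-in in steps 6-7 are just concurrent awaits. This runnable sketch replaces the real A2A round-trips with stub coroutines (`call_joke_agent` and `call_summary_agent` are ours, not part of the samples) to show the shape of the coordination:

```python
import asyncio

async def call_joke_agent() -> str:
    await asyncio.sleep(0.01)  # stands in for the A2A round-trip for task T1
    return "Why don't scientists trust atoms? Because they make up everything!"

async def call_summary_agent() -> str:
    await asyncio.sleep(0.01)  # stands in for the A2A round-trip for task T2
    return "[Short summary of the article]"

async def host_agent() -> str:
    # Fan out: both downstream tasks are in flight at the same time.
    joke, summary = await asyncio.gather(call_joke_agent(), call_summary_agent())
    # Fan in: combine the results into the response for the original task T0.
    return f"Joke: {joke}\nSummary: {summary}"

combined = asyncio.run(host_agent())
```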
**Relevant Files:**

* `samples/python/hosts/multiagent/host_agent.py`: Implements the host agent logic, deciding which tools (remote agents) to call.
* `samples/python/hosts/multiagent/remote_agent_connection.py`: Wraps the `A2AClient` for easier use by the `HostAgent`. It handles sending the task via A2A (streaming or non-streaming).
* `demo/ui/service/server/adk_host_manager.py`: Manages the lifecycle and state of the host agent within the demo application framework (using Google ADK). It shows how task callbacks from `RemoteAgentConnection` update the overall state.
## Conclusion

Multi-Agent Orchestration allows us to combine the strengths of specialized AI agents to tackle complex problems that a single agent might struggle with.

The **Host Agent** acts as the "project manager" in this system. It:

* Understands the overall goal.
* Knows the capabilities of other available agents (via their [Agent Cards](01_agent_card.md)).
* Delegates sub-tasks to appropriate downstream agents by acting as an [A2A Client](05_a2a_client_implementation.md).
* Coordinates the process and potentially combines the results.

This pattern enables building sophisticated applications by composing modular, specialized agents that communicate using the standard A2A protocol.

Now that we've explored the core concepts and components of the A2A protocol, let's see how they all come together in a practical demonstration.

**Next:** [Chapter 9: Demo UI Application & Service](09_demo_ui_application___service.md)

---

Generated by [AI Codebase Knowledge Builder](https://github.com/The-Pocket/Tutorial-Codebase-Knowledge)
257
docs/Google A2A/09_demo_ui_application___service.md
Normal file
@@ -0,0 +1,257 @@
---
layout: default
title: "Demo UI Application & Service"
parent: "Google A2A"
nav_order: 9
---
# Chapter 9: Demo UI Application & Service

In the [previous chapter](08_multi_agent_orchestration__host_agent_.md), we explored how a **Host Agent** can act like a project manager, coordinating multiple specialized agents using the A2A protocol to achieve complex goals. We've learned about Agent Cards, Tasks, the protocol itself, servers, clients, task logic, streaming, and orchestration. That's a lot of building blocks!

But how do we see all these pieces working together in a real, interactive way? Just reading about protocols and servers is like reading the blueprints for a car. Wouldn't it be more helpful to actually *see* the car drive?

That's where the **Demo UI Application & Service** comes in. It solves the problem: **How can we visualize and interact with the A2A protocol and multi-agent systems in action?**

## What is the Demo UI Application & Service? The Control Room

Imagine a space mission control room. You have:

* **Big Screens (UI):** Showing the rocket's status, communication logs, astronaut locations, etc.
* **Flight Controllers (Backend Service):** People at consoles managing specific parts of the mission, talking to different teams, and updating the screens.
* **Astronauts & Ground Crew (A2A Agents):** The actual experts doing the work (flying, repairing, analyzing), communicating back via radio (A2A protocol).

The **Demo UI Application & Service** is like that control room for our A2A agents:

1. **Demo UI Application:** This is the web-based frontend, built using a Python framework called [Mesop](https://github.com/mesop-dev/mesop). It provides the "big screens" – a chat interface where you can talk to agents, see their responses (including special content like forms or images), view lists of available agents, and inspect the communication flow.
2. **Backend Service (`ConversationServer`):** This is the "flight controller" software running behind the scenes. It's a backend web service (built using FastAPI in Python) that the UI application talks to. It's *not* the main [Host Agent](08_multi_agent_orchestration__host_agent_.md) itself, but rather an **intermediary**. It manages the user's conversations, receives events from the UI (like sending a message), communicates with the actual agent logic (like the Host Agent), and sends state updates back to the UI so the screens stay current.

Think of it as a user-friendly window into the world of A2A, letting you watch and participate as agents collaborate.
## Key Components

Let's break down the two main parts:

### 1. Frontend (Mesop UI Application)

This is what you see and interact with in your web browser. Mesop allows building UIs purely in Python. Key features include:

* **Chat Interface:** Displays the conversation history between you and the agent system. (`demo/ui/components/conversation.py`)
* **Input Box:** Where you type your messages to the agent. (`demo/ui/components/conversation.py`)
* **Agent Management:** Allows adding new agents by providing their [Agent Card](01_agent_card.md) URL. (`demo/ui/pages/agent_list.py`)
* **Rich Content Rendering:** Can display not just text, but also interactive forms sent by agents (`demo/ui/components/form_render.py`), images, etc.
* **Task/Event Views:** Provides ways to inspect the underlying [Tasks](02_task.md) and communication events happening via A2A. (`demo/ui/pages/task_list.py`, `demo/ui/pages/event_list.py`)
```python
# File: demo/ui/components/conversation.py (Simplified Snippet)
# ... imports ...

@me.component
def conversation():
    """Conversation component"""
    page_state = me.state(PageState)  # Local page state
    app_state = me.state(AppState)    # Global application state

    # ... loop to display existing messages using chat_bubble component ...
    for message in app_state.messages:
        if is_form(message):
            render_form(message, app_state)  # Special handling for forms
        # ... other message types ...
        else:
            chat_bubble(message, message.message_id)  # Display regular chat message

    # --- Input area ---
    with me.box(style=me.Style(display="flex", flex_direction="row", ...)):
        me.input(
            label="How can I help you?",
            on_enter=send_message_enter,  # Function to call when user presses Enter
            # ... other attributes ...
        )
        with me.content_button(on_click=send_message_button):  # Button handler
            me.icon(icon="send")

async def send_message_enter(e: me.InputEnterEvent):
    # ... (get state) ...
    message_content = e.value
    message_id = str(uuid.uuid4())
    # Store something to indicate a background task is running
    app_state = me.state(AppState)
    app_state.background_tasks[message_id] = "Processing..."
    yield  # Update UI to show indicator
    # Call the backend service to actually send the message
    await send_message(message_content, message_id)
    yield  # Allow UI to potentially update again
```
**Explanation:**

* This Mesop component defines the chat interface.
* It uses `app_state` (defined in `demo/ui/state/state.py`) to access the current list of messages and display them.
* It renders an `me.input` field. When the user presses Enter (`on_enter`), the `send_message_enter` function is called.
* `send_message_enter` gets the user's text, updates the state to show a "Processing..." indicator, and then calls `send_message` (defined in `demo/ui/state/host_agent_service.py`), which actually communicates with the backend `ConversationServer`.
### 2. Backend (`ConversationServer`)

This FastAPI server acts as the bridge between the simple HTTP/JSON communication from the UI and the potentially more complex agent interactions (which might involve A2A or frameworks like Google ADK).

* **API Endpoints:** Exposes simple HTTP endpoints (e.g., `/message/send`, `/conversation/list`) that the UI's client can call. (`demo/ui/service/server/server.py`)
* **Conversation Management:** Keeps track of different chat sessions.
* **State Management:** Holds the application state (messages, tasks, agents) that the UI needs to display.
* **Agent Interaction Logic:** Contains the logic to forward requests from the UI to the actual agent system (e.g., the ADK [Host Agent](08_multi_agent_orchestration__host_agent_.md)). (`demo/ui/service/server/adk_host_manager.py`)
* **Callback Handling:** Receives updates (like task status changes or new artifacts) from the agent system and updates its internal state.
```python
# File: demo/ui/service/server/server.py (Simplified Snippet)
import asyncio
import threading

from fastapi import APIRouter, Request
from common.types import Message
from .adk_host_manager import ADKHostManager  # Implements agent interaction logic
# ... other imports ...

class ConversationServer:
    def __init__(self, router: APIRouter):
        # Choose the manager (e.g., ADKHostManager uses the Host Agent)
        self.manager = ADKHostManager()

        # Define API route for sending messages
        router.add_api_route(
            "/message/send",
            self._send_message,  # Maps URL to the _send_message method
            methods=["POST"])
        # ... other routes (/conversation/list, /task/list, etc.) ...

    async def _send_message(self, request: Request):
        message_data = await request.json()
        # Parse the message data sent by the UI client
        message = Message(**message_data['params'])
        # Add necessary metadata (IDs, etc.)
        message = self.manager.sanitize_message(message)
        # --- Crucial Part: Pass message to the agent logic ---
        # Run the actual agent processing in a background thread
        # so the API call returns quickly to the UI.
        thread = threading.Thread(
            target=lambda: asyncio.run(self.manager.process_message(message))
        )
        thread.start()
        # Return an immediate confirmation to the UI
        return SendMessageResponse(result=MessageInfo(
            message_id=message.metadata['message_id'],
            # ... other info ...
        ))
```

**Explanation:**

*   The `ConversationServer` sets up API routes using FastAPI.
*   The `_send_message` method handles requests to the `/message/send` endpoint.
*   It parses the `Message` sent from the UI client.
*   It calls `self.manager.process_message(message)`. The `manager` (here, `ADKHostManager`) is responsible for actually interacting with the underlying agent system ([Host Agent](08_multi_agent_orchestration__host_agent_.md)).
*   Crucially, `process_message` is run in a separate thread so the API can respond quickly, acknowledging receipt, while the potentially long-running agent work happens in the background.

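The fire-and-forget pattern behind `_send_message` — start the coroutine on a worker thread via `asyncio.run()` (which gives that thread its own event loop) and return an acknowledgement immediately — can be sketched in isolation. The names below (`process_message`, `send_message_endpoint`, `results`) are hypothetical stand-ins for the real manager and endpoint:

```python
import asyncio
import threading

results = []

async def process_message(text):
    # Stands in for the real, long-running agent work.
    await asyncio.sleep(0.01)
    results.append(f"processed: {text}")

def send_message_endpoint(text):
    # Fire-and-forget: asyncio.run() creates a fresh event loop on the
    # worker thread, so the agent work never blocks the server's loop.
    t = threading.Thread(target=lambda: asyncio.run(process_message(text)))
    t.start()
    return {"ack": True, "thread": t}

resp = send_message_endpoint("Hello")
resp["thread"].join()  # only for this demo; the real UI polls instead
print(resp["ack"], results)
```

In the real server nothing joins the thread — the caller gets the acknowledgement and the UI later discovers the result by polling, which is why the endpoint can stay fast regardless of how long the agent takes.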
## How It Works: The Flow of a Message

Let's trace what happens when you type "Hello" and press Enter in the Demo UI:

1.  **UI (Mesop):** The `on_enter` event triggers `send_message_enter` in `conversation.py`.
2.  **UI State:** `send_message_enter` updates the `AppState` to show a "Processing" indicator.
3.  **UI Client (`host_agent_service.py`):** `send_message_enter` calls `SendMessage(message)`. This function uses the `ConversationClient` to make an HTTP POST request to the `ConversationServer`'s `/message/send` endpoint, sending the user's message as JSON.

    ```python
    # File: demo/ui/state/host_agent_service.py (Simplified Snippet)
    async def SendMessage(message: Message) -> str | None:
        client = ConversationClient(server_url)  # Backend server URL
        try:
            # Make HTTP POST request to backend API
            response = await client.send_message(SendMessageRequest(params=message))
            return response.result  # Contains confirmation IDs
        except Exception as e:
            print("Failed to send message: ", e)
    ```

4.  **Backend Service (`server.py`):** The `_send_message` method on the `ConversationServer` receives the POST request.
5.  **Backend Service Logic (`adk_host_manager.py`):** `_send_message` calls `self.manager.process_message(message)` (running in a background thread).

    ```python
    # File: demo/ui/service/server/adk_host_manager.py (Simplified Snippet)
    async def process_message(self, message: Message):
        # ... (Store message, add event) ...
        # Get conversation context
        conversation_id = message.metadata.get('conversation_id')
        # --- Interact with the actual agent (e.g., Google ADK Runner) ---
        async for event in self._host_runner.run_async(
            user_id=self.user_id,
            session_id=conversation_id,
            new_message=self.adk_content_from_message(message)  # Convert to agent format
        ):
            # Process events coming *back* from the agent
            self.add_event(...)  # Store for UI event log
            # ... potentially update task status via task_callback ...
        # ... (Store final response message) ...
        # Remove pending indicator
        self._pending_message_ids.remove(get_message_id(message))
    ```

6.  **Agent Processing:** `process_message` uses the ADK `Runner` (`self._host_runner`) to send the message to the configured agent (our [Host Agent](08_multi_agent_orchestration__host_agent_.md)). The Host Agent might then use its own [A2A Client](05_a2a_client_implementation.md) logic to talk to downstream agents via A2A.
7.  **Agent Response/Updates:** As the agent system works, it sends back events (potentially via [Streaming Communication (SSE)](07_streaming_communication__sse_.md) if using A2A, or via ADK callbacks). The `ADKHostManager`'s `task_callback` or the `run_async` loop processes these updates, storing new messages, updating task statuses, and storing artifacts.
8.  **UI Polling (`page_scaffold.py`):** Meanwhile, the Mesop UI periodically polls the `ConversationServer` for state updates using an `async_poller` component. This poller triggers `UpdateAppState` in `host_agent_service.py`.

    ```python
    # File: demo/ui/components/page_scaffold.py (Simplified Snippet)
    async def refresh_app_state(e: mel.WebEvent):  # Triggered by poller
        yield
        app_state = me.state(AppState)
        # Call backend service to get the latest state
        await UpdateAppState(app_state, app_state.current_conversation_id)
        yield

    # ... in page_scaffold component setup ...
    async_poller(action=..., trigger_event=refresh_app_state)
    ```

9.  **Backend State Request (`host_agent_service.py`):** `UpdateAppState` calls various `ConversationServer` endpoints (like `/conversation/list`, `/message/list`, `/task/list`) to get the latest messages, tasks, etc.
10. **Backend Response:** The `ConversationServer` returns the current state data from its `manager`.
11. **UI Update:** `UpdateAppState` updates the global `AppState` in Mesop with the fresh data. Because Mesop automatically re-renders when state changes, the UI updates to show the agent's response, remove the "Processing" indicator, and update task lists.

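The polling side of the flow (steps 8–11) boils down to a poll-and-merge loop: fetch the latest state, check whether the agent has replied, and hand the fresh state to the UI for re-rendering. A minimal sketch, where `server` is just a dict standing in for the `ConversationServer`'s list endpoints and all names are hypothetical:

```python
import time

def poll_until_agent_reply(server, interval=0.01, max_polls=50):
    for _ in range(max_polls):
        state = dict(server)  # stands in for /message/list, /task/list, etc.
        if state["messages"] and state["messages"][-1][0] == "agent":
            return state      # fresh state would be merged into AppState
        time.sleep(interval)
    raise TimeoutError("agent did not respond in time")

server = {"messages": [("user", "Hello")]}
server["messages"].append(("agent", "Hi there!"))  # backend finishes its work
final = poll_until_agent_reply(server)
print(final["messages"][-1])  # ('agent', 'Hi there!')
```

The real UI never "waits" like this in one call — each poller tick does a single fetch-and-merge and lets Mesop re-render — but the termination condition (new agent message present in the fetched state) is the same.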
## Under the Hood: Sequence Diagram

This diagram shows the high-level flow for sending a message and getting a response, involving the UI, the Backend Service, and the Agent Logic (like the Host Agent).

```mermaid
sequenceDiagram
    participant User
    participant UI as Mesop Frontend
    participant BClient as Backend Client (host_agent_service)
    participant BServer as Backend Service (ConversationServer)
    participant Manager as Backend Manager (ADKHostManager)
    participant Agent as Agent Logic (Host Agent / ADK)

    User->>UI: Type message, press Enter
    UI->>BClient: Call SendMessage(msg)
    BClient->>BServer: POST /message/send (JSON: msg)
    BServer->>Manager: Call process_message(msg) [async]
    BServer-->>BClient: 200 OK (Ack)
    BClient-->>UI: Return (UI shows processing)

    Note over Manager, Agent: Agent processing happens...
    Manager->>Agent: Run agent with message
    Agent-->>Manager: Agent produces results/updates
    Manager->>Manager: Store results/state updates

    loop UI Polling for Updates
        UI->>BClient: Call UpdateAppState()
        BClient->>BServer: POST /message/list, /task/list, etc.
        BServer->>Manager: Get current state data
        Manager-->>BServer: Return state data
        BServer-->>BClient: 200 OK (JSON: state)
        BClient->>UI: Update Mesop AppState
        Note over UI: Mesop re-renders with new data (agent response)
    end
```

## Conclusion

The Demo UI Application and its associated `ConversationServer` backend provide a crucial, practical tool for the Google A2A project. They act as an interactive "control room" allowing you to:

*   **Visualize** conversations involving one or more A2A agents.
*   **Interact** with the system by sending messages.
*   **Observe** how components like the [Host Agent](08_multi_agent_orchestration__host_agent_.md) delegate tasks using the A2A protocol.
*   **Inspect** the state of [Tasks](02_task.md) and communication events.
*   **Experiment** by adding new agents via their [Agent Cards](01_agent_card.md).

It brings together all the concepts we've discussed – servers, clients, tasks, streaming, orchestration – into a tangible demonstration, making the abstract protocol concrete and easier to understand. This completes our journey through the core concepts of the Google A2A protocol and its demonstration application!

---

Generated by [AI Codebase Knowledge Builder](https://github.com/The-Pocket/Tutorial-Codebase-Knowledge)
38
docs/Google A2A/index.md
Normal file
@@ -0,0 +1,38 @@
---
layout: default
title: "Google A2A"
nav_order: 12
has_children: true
---

# Tutorial: Google A2A

> This tutorial is AI-generated! To learn more, check out [AI Codebase Knowledge Builder](https://github.com/The-Pocket/Tutorial-Codebase-Knowledge)

The **Google A2A (Agent-to-Agent)**<sup>[View Repo](https://github.com/google/A2A)</sup> project defines an *open protocol* enabling different AI agents, possibly built with different technologies, to communicate and work together.
Think of it as a common language (*A2A Protocol*) agents use to discover each other (*Agent Card*), assign work (*Task*), and exchange results, even providing real-time updates (*Streaming*).
The project includes sample *client* and *server* implementations, example agents using frameworks like LangGraph or CrewAI, and a *demo UI* showcasing multi-agent interactions.

```mermaid
flowchart TD
    A0["A2A Protocol & Core Types"]
    A1["Task"]
    A2["Agent Card"]
    A3["A2A Server Implementation"]
    A4["A2A Client Implementation"]
    A5["Task Handling Logic (Server-side)"]
    A6["Streaming Communication (SSE)"]
    A7["Demo UI Application & Service"]
    A8["Multi-Agent Orchestration (Host Agent)"]
    A0 -- "Defines Structure For" --> A1
    A0 -- "Defines Structure For" --> A2
    A4 -- "Sends Task Requests To" --> A3
    A3 -- "Delegates Task To" --> A5
    A5 -- "Executes" --> A1
    A8 -- "Uses for Discovery" --> A2
    A3 -- "Sends Updates Via" --> A6
    A4 -- "Receives Updates Via" --> A6
    A8 -- "Acts As" --> A4
    A7 -- "Presents/Manages" --> A8
    A7 -- "Communicates With" --> A5
```
@@ -1,7 +1,7 @@
---
layout: default
title: "LangGraph"
nav_order: 12
nav_order: 13
has_children: true
---

@@ -1,7 +1,7 @@
---
layout: default
title: "LevelDB"
nav_order: 13
nav_order: 14
has_children: true
---

@@ -1,7 +1,7 @@
---
layout: default
title: "MCP Python SDK"
nav_order: 14
nav_order: 15
has_children: true
---

@@ -1,7 +1,7 @@
---
layout: default
title: "NumPy Core"
nav_order: 15
nav_order: 16
has_children: true
---

@@ -1,7 +1,7 @@
---
layout: default
title: "OpenManus"
nav_order: 16
nav_order: 17
has_children: true
---

@@ -1,7 +1,7 @@
---
layout: default
title: "Pydantic Core"
nav_order: 17
nav_order: 18
has_children: true
---

@@ -1,7 +1,7 @@
---
layout: default
title: "Requests"
nav_order: 18
nav_order: 19
has_children: true
---

@@ -1,7 +1,7 @@
---
layout: default
title: "SmolaAgents"
nav_order: 19
nav_order: 20
has_children: true
---

@@ -29,6 +29,7 @@ This is a tutorial project of [Pocket Flow](https://github.com/The-Pocket/Pocket
- [DSPy](./DSPy/index.md) - Build LLM apps like Lego blocks that optimize themselves!
- [FastAPI](./FastAPI/index.md) - Create APIs at lightning speed with automatic docs that clients will love!
- [Flask](./Flask/index.md) - Craft web apps with minimal code that scales from prototype to production!
- [Google A2A](./Google A2A/index.md) - The universal language that lets AI agents collaborate across borders!
- [LangGraph](./LangGraph/index.md) - Design AI agents as flowcharts where each step remembers what happened before!
- [LevelDB](./LevelDB/index.md) - Store data at warp speed with Google's engine that powers blockchains!
- [MCP Python SDK](./MCP Python SDK/index.md) - Build powerful apps that communicate through an elegant protocol without sweating the details!