# Python A2A Tutorial

By [czmilo](https://paragraph.com/@zhangcheng) · 2025-05-02

---

[https://a2aprotocol.ai/blog/python-a2a-tutorial](https://a2aprotocol.ai/blog/python-a2a-tutorial)

A2A Python Tutorial - Comprehensive Guide
=========================================

Table of Contents
-----------------

*   Introduction
    
*   Set up Your Environment
    
*   Creating A Project
    
*   Agent Skills
    
*   Agent Card
    
*   A2A Server
    
*   Interacting With Your A2A Server
    
*   Adding Agent Capabilities
    
*   Using a Local Ollama Model
    
*   Next Steps
    

Introduction
------------

In this tutorial, you will build a simple echo A2A server using Python. This barebones implementation walks through the core features A2A has to offer. After completing this tutorial, you will be able to add agent functionality using Ollama or Google's Agent Development Kit.

**What you'll learn:**

*   The basic concepts behind A2A
    
*   How to create an A2A server in Python
    
*   How to interact with an A2A server
    
*   How to add a trained model to act as the agent
    

Set up Your Environment
-----------------------

### What You'll Need

*   A code editor such as Visual Studio Code (VS Code)
    
*   A command prompt such as Terminal (Linux), iTerm (Mac) or just the Terminal in VS Code
    

### Python Environment

We'll be using [uv](https://docs.astral.sh/uv/getting-started/installation/) as our package manager and to set up our project.

The A2A libraries we'll be using require Python 3.12 or later (`python >= 3.12`), which [uv can install](https://docs.astral.sh/uv/guides/install-python/) if you don't already have a matching version. We'll be using Python 3.12.

### Check

Run the following command to make sure you're ready for the next step:

    echo 'import sys; print(sys.version)' | uv run -
    

If you see something similar to the following, you are ready to proceed!

    3.12.3 (main, Feb 4 2025, 14:48:35) [GCC 13.3.0]
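
The same check can also be made from inside Python. The following is a minimal sketch, not part of the A2A libraries: the helper name `meets_minimum` is hypothetical, and the `(3, 12)` floor is taken from the requirement stated above.

```python
import sys

MINIMUM = (3, 12)  # floor taken from the requirement stated in this tutorial


def meets_minimum(version_info=None, minimum=MINIMUM) -> bool:
    """Return True when the (major, minor) version is at least `minimum`."""
    if version_info is None:
        version_info = sys.version_info
    return tuple(version_info[:2]) >= tuple(minimum)


if __name__ == "__main__":
    status = "ok" if meets_minimum() else "too old"
    print(f"Python {sys.version.split()[0]}: {status}")
```

Saving this to a file and running it with `uv run` should report `ok` when the interpreter is recent enough.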
    

Creating A Project
------------------

Let's first create a project using `uv`. We'll add the `--package` flag in case you want to add tests, or publish your project later:

    uv init --package my-project
    cd my-project
    

### Using a Virtual Env

We'll create a venv for this project. This only needs to be done once:

    uv venv .venv
    

For this and any future terminal windows you open, you'll need to source this venv:

    source .venv/bin/activate
    

If you're using a code editor such as VS Code, you'll want to set the Python interpreter so that code completion works. In VS Code, press `Ctrl-Shift-P` and select `Python: Select Interpreter`. Then select your project `my-project`, followed by the correct Python interpreter: `Python 3.12.3 ('.venv':venv) ./.venv/bin/python`.

The source code should now look similar to this:

    tree .
    .
    ├── pyproject.toml
    ├── README.md
    ├── src
    │   └── my_project
    │       ├── __init__.py
    

### Adding the Google-A2A Python Libraries

Next we'll add the sample A2A python libraries from Google:

    uv add git+https://github.com/google/A2A#subdirectory=samples/python
    

### Setting up the project structure

Let's now create some files we'll later be using:

    touch src/my_project/agent.py
    touch src/my_project/task_manager.py
    

### Test Run

If everything is set up correctly, you should now be able to run your application:

    uv run my-project
    

The output should look something like this:

    Hello from my-project!
    

Agent Skills
------------

An agent skill is a set of capabilities the agent can perform. Here's an example of what it would look like for our echo agent:

    {
      id: "my-project-echo-skill",
      name: "Echo Tool",
      description: "Echos the input given",
      tags: ["echo", "repeater"],
      examples: ["I will see this echoed back to me"],
      inputModes: ["text"],
      outputModes: ["text"]
    }
    

This conforms to the skills section of the Agent Card:

    {
      id: string; // unique identifier for the agent's skill
      name: string; //human readable name of the skill
      // description of the skill - will be used by the client or a human
      // as a hint to understand what the skill does.
      description: string;
      // Set of tag words describing classes of capabilities for this specific
      // skill (e.g. "cooking", "customer support", "billing")
      tags: string[];
      // The set of example scenarios that the skill can perform.
      // Will be used by the client as a hint to understand how the skill can be
      // used. (e.g. "I need a recipe for bread")
      examples?: string[]; // example prompts for tasks
      // The set of interaction modes that the skill supports
      // (if different than the default)
      inputModes?: string[]; // supported mime types for input
      outputModes?: string[]; // supported mime types for output
    }
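
To make the required and optional fields concrete, here is a minimal stdlib-only sketch that mirrors the schema above with a dataclass. `SkillSketch` and its `to_json` method are hypothetical names for illustration. This is not the library's `AgentSkill` class, which is introduced in the next section.

```python
import json
from dataclasses import asdict, dataclass
from typing import List, Optional


@dataclass
class SkillSketch:
    """Stand-in mirroring the skill fields above; not the library's AgentSkill."""

    id: str
    name: str
    description: str
    tags: List[str]
    # The fields marked with `?` in the schema are optional here too.
    examples: Optional[List[str]] = None
    inputModes: Optional[List[str]] = None
    outputModes: Optional[List[str]] = None

    def to_json(self) -> str:
        # Drop optional fields that were left unset so the JSON stays minimal.
        data = {key: value for key, value in asdict(self).items() if value is not None}
        return json.dumps(data)


skill = SkillSketch(
    id="my-project-echo-skill",
    name="Echo Tool",
    description="Echos the input given",
    tags=["echo", "repeater"],
    inputModes=["text"],
    outputModes=["text"],
)
print(skill.to_json())
```

Because `examples` is left unset in this sketch, it is omitted from the serialized output, matching its optional status in the schema.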
    

### Implementation

Let's create this Agent Skill in code. Open up `src/my_project/__init__.py` and replace the contents with the following code:

    import google_a2a
    from google_a2a.common.types import AgentSkill
    
    def main():
      skill = AgentSkill(
        id="my-project-echo-skill",
        name="Echo Tool",
        description="Echos the input given",
        tags=["echo", "repeater"],
        examples=["I will see this echoed back to me"],
        inputModes=["text"],
        outputModes=["text"],
      )
      print(skill)
    
    if __name__ == "__main__":
      main()
    

### Test Run

Let's give this a run:

    uv run my-project
    

The output should look something like this:

    id='my-project-echo-skill' name='Echo Tool' description='Echos the input given' tags=['echo', 'repeater'] examples=['I will see this echoed back to me'] inputModes=['text'] outputModes=['text']
    

Agent Card
----------

Now that we have defined our skills, we can create an Agent Card.

Remote Agents are required to publish an Agent Card in JSON format describing the agent's capabilities and skills in addition to authentication mechanisms. In other words, this lets the world know about your agent and how to interact with it.
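
As a rough picture of what such a JSON document might contain, the sketch below assembles a card as a plain dictionary and serializes it. The field names are borrowed from the card the library prints later in this tutorial. The helper `build_agent_card` is hypothetical, and the exact JSON the library publishes may differ.

```python
import json


def build_agent_card(host: str, port: int) -> dict:
    """Assemble a plain-dict agent card for illustration only."""
    skill = {
        "id": "my-project-echo-skill",
        "name": "Echo Tool",
        "description": "Echos the input given",
        "tags": ["echo", "repeater"],
        "examples": ["I will see this echoed back to me"],
        "inputModes": ["text"],
        "outputModes": ["text"],
    }
    return {
        "name": "Echo Agent",
        "description": "This agent echos the input given",
        "url": f"http://{host}:{port}/",
        "version": "0.1.0",
        "capabilities": {
            "streaming": False,
            "pushNotifications": False,
            "stateTransitionHistory": False,
        },
        "defaultInputModes": ["text"],
        "defaultOutputModes": ["text"],
        "skills": [skill],
    }


card = build_agent_card("localhost", 10002)
print(json.dumps(card, indent=2))
```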

### Implementation

First, let's add some helpers for parsing command line arguments. This will be helpful later for starting our server:

    uv add click
    

And update our code:

    import logging
    
    import click
    import google_a2a
    from google_a2a.common.types import AgentSkill, AgentCapabilities, AgentCard
    
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    @click.command()
    @click.option("--host", default="localhost")
    @click.option("--port", default=10002)
    def main(host, port):
      skill = AgentSkill(
        id="my-project-echo-skill",
        name="Echo Tool",
        description="Echos the input given",
        tags=["echo", "repeater"],
        examples=["I will see this echoed back to me"],
        inputModes=["text"],
        outputModes=["text"],
      )
      logging.info(skill)
    
    if __name__ == "__main__":
      main()
    

Next we'll add our Agent Card:

    # ...
    def main(host, port):
      # ...
      capabilities = AgentCapabilities()
      agent_card = AgentCard(
        name="Echo Agent",
        description="This agent echos the input given",
        url=f"http://{host}:{port}/",
        version="0.1.0",
        defaultInputModes=["text"],
        defaultOutputModes=["text"],
        capabilities=capabilities,
        skills=[skill]
      )
      logging.info(agent_card)
    
    if __name__ == "__main__":
      main()
    

### Test Run

Let's give this a run:

    uv run my-project
    

The output should look something like this:

    INFO:root:name='Echo Agent' description='This agent echos the input given' url='http://localhost:10002/' provider=None version='0.1.0' documentationUrl=None capabilities=AgentCapabilities(streaming=False, pushNotifications=False, stateTransitionHistory=False) authentication=None defaultInputModes=['text'] defaultOutputModes=['text'] skills=[AgentSkill(id='my-project-echo-skill', name='Echo Tool', description='Echos the input given', tags=['echo', 'repeater'], examples=['I will see this echoed back to me'], inputModes=['text'], outputModes=['text'])]
    

A2A Server
----------

We're almost ready to start our server! We'll be using the `A2AServer` class from `Google-A2A` which under the hood starts a [uvicorn](https://www.uvicorn.org/) server.

### Task Manager

Before we create our server, we need a task manager to handle incoming requests.

We'll subclass `InMemoryTaskManager`, which requires us to implement two methods:

    async def on_send_task(
      self,
      request: SendTaskRequest
    ) -> SendTaskResponse:
      """
      This method queries or creates a task for the agent.
      The caller will receive exactly one response.
      """
      pass
    
    async def on_send_task_subscribe(
      self,
      request: SendTaskStreamingRequest
    ) -> AsyncIterable[SendTaskStreamingResponse] | JSONRPCResponse:
      """
      This method subscribes the caller to future updates regarding a task.
      The caller will receive a response and additionally receive subscription
      updates over a session established between the client and the server
      """
      pass
    

Open up `src/my_project/task_manager.py` and add the following code. We will simply return a direct echo response and immediately mark the task complete without any sessions or subscriptions:

    from typing import AsyncIterable
    
    import google_a2a
    from google_a2a.common.server.task_manager import InMemoryTaskManager
    from google_a2a.common.types import (
      Artifact,
      JSONRPCResponse,
      Message,
      SendTaskRequest,
      SendTaskResponse,
      SendTaskStreamingRequest,
      SendTaskStreamingResponse,
      Task,
      TaskState,
      TaskStatus,
      TaskStatusUpdateEvent,
    )
    
    class MyAgentTaskManager(InMemoryTaskManager):
      def __init__(self):
        super().__init__()
    
      async def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse:
        # Upsert a task stored by InMemoryTaskManager
        await self.upsert_task(request.params)
    
        task_id = request.params.id
        # Our custom logic that simply marks the task as complete
        # and returns the echo text
        received_text = request.params.message.parts[0].text
        task = await self._update_task(
          task_id=task_id,
          task_state=TaskState.COMPLETED,
          response_text=f"on_send_task received: {received_text}"
        )
    
        # Send the response
        return SendTaskResponse(id=request.id, result=task)
    
      async def on_send_task_subscribe(
        self,
        request: SendTaskStreamingRequest
      ) -> AsyncIterable[SendTaskStreamingResponse] | JSONRPCResponse:
        pass
    
      async def _update_task(
        self,
        task_id: str,
        task_state: TaskState,
        response_text: str,
      ) -> Task:
        task = self.tasks[task_id]
        agent_response_parts = [
          {
            "type": "text",
            "text": response_text,
          }
        ]
        task.status = TaskStatus(
          state=task_state,
          message=Message(
            role="agent",
            parts=agent_response_parts,
          )
        )
        task.artifacts = [
          Artifact(
            parts=agent_response_parts,
          )
        ]
        return task
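
The control flow of `on_send_task` (upsert the task, update its state, return it) can be tried without the library. The sketch below is a stdlib-only stand-in: `TaskSketch` and `EchoStoreSketch` are hypothetical names, and plain strings stand in for `TaskState`. It does not show how `InMemoryTaskManager` is implemented.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class TaskSketch:
    id: str
    state: str = "submitted"
    history: List[str] = field(default_factory=list)
    reply: Optional[str] = None


class EchoStoreSketch:
    """Hypothetical stand-in for the upsert/update pattern used above."""

    def __init__(self) -> None:
        self.tasks: Dict[str, TaskSketch] = {}

    async def upsert_task(self, task_id: str, text: str) -> TaskSketch:
        # Create the task on first sight, otherwise reuse the stored one.
        task = self.tasks.setdefault(task_id, TaskSketch(id=task_id))
        task.history.append(text)
        return task

    async def on_send_task(self, task_id: str, text: str) -> TaskSketch:
        task = await self.upsert_task(task_id, text)
        # Mark the task complete immediately and attach the echo reply.
        task.state = "completed"
        task.reply = f"on_send_task received: {text}"
        return task


async def demo() -> TaskSketch:
    store = EchoStoreSketch()
    await store.on_send_task("task-1", "Hello!")
    return await store.on_send_task("task-1", "Again")


result = asyncio.run(demo())
print(result)
```

Sending twice with the same id reuses the stored task, which is the behaviour the word "upsert" implies.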
    

### A2A Server

With a task manager complete, we can now create our server.

Open up `src/my_project/__init__.py` and add the following code:

    # ...
    from google_a2a.common.server import A2AServer
    from my_project.task_manager import MyAgentTaskManager
    # ...
    def main(host, port):
      # ...
    
      task_manager = MyAgentTaskManager()
      server = A2AServer(
        agent_card=agent_card,
        task_manager=task_manager,
        host=host,
        port=port,
      )
      server.start()
    

### Test Run

Let's give this a run:

    uv run my-project
    

The output should look something like this:

    INFO:     Started server process [20506]
    INFO:     Waiting for application startup.
    INFO:     Application startup complete.
    INFO:     Uvicorn running on http://localhost:10002 (Press CTRL+C to quit)
    

Congratulations! Your A2A server is now running!

Interacting With Your A2A Server
--------------------------------

First we'll use Google-A2A's command-line tool to send requests to our A2A server. After trying it out, we'll write our own basic client to see how this works under the hood.

### Using Google-A2A's command-line tool

With your A2A server already running from the previous run:

    # This should already be running in your terminal
    $ uv run my-project
    INFO:     Started server process [20538]
    INFO:     Waiting for application startup.
    INFO:     Application startup complete.
    INFO:     Uvicorn running on http://localhost:10002 (Press CTRL+C to quit)
    

Open up a new terminal in the same directory:

    source .venv/bin/activate
    uv run google-a2a-cli --agent http://localhost:10002
    

Note: This will only work if you've installed google-a2a from this [pull request](https://github.com/google/A2A/pull/169), as the CLI was not exposed previously.

Otherwise, you'll have to check out the [Google/A2A repository](https://github.com/google/A2A/) directly, navigate to the `samples/python` directory, and run the CLI from there.

You can then send messages to your server by typing and pressing Enter:

    =========  starting a new task ========
    
    What do you want to send to the agent? (:q or quit to exit): Hello!
    

If everything is working correctly you'll see this in the response:

    "message":{"role":"agent","parts":[{"type":"text","text":"on_send_task received: Hello!"}]}
    

To exit type `:q` and press Enter.
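
To see roughly what a basic client sends under the hood, the sketch below builds a JSON-RPC request body by hand. Treat the details as assumptions: the method name `tasks/send` and the `sessionId` key reflect the A2A specification as understood at the time of writing and should be checked against the version you have installed. The helper `build_send_task_request` is hypothetical. No network call is made here.

```python
import json
import uuid
from typing import Optional


def build_send_task_request(
    text: str,
    task_id: Optional[str] = None,
    session_id: Optional[str] = None,
) -> dict:
    """Build a JSON-RPC body for sending one text message (assumed layout)."""
    return {
        "jsonrpc": "2.0",
        "id": uuid.uuid4().hex,  # JSON-RPC request id
        "method": "tasks/send",  # assumed method name
        "params": {
            "id": task_id or uuid.uuid4().hex,  # task id
            "sessionId": session_id or uuid.uuid4().hex,
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": text}],
            },
        },
    }


payload = build_send_task_request("Hello!", task_id="task-1", session_id="session-1")
print(json.dumps(payload, indent=2))
```

The `params.id` and `params.message.parts[0].text` fields correspond to the `request.params.id` and `request.params.message.parts[0].text` values the task manager reads on the server side.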

Adding Agent Capabilities
-------------------------

Now that we have a basic A2A server running, let's add some more functionality. We'll explore how A2A can work asynchronously and stream responses.

### Streaming

This allows clients to subscribe to the server and receive multiple updates instead of a single response. This can be useful for long running agent tasks, or where multiple Artifacts may be streamed back to the client.

First we'll declare our agent as ready for streaming. Open up `src/my_project/__init__.py` and update AgentCapabilities:

    # ...
    def main(host, port):
      # ...
      capabilities = AgentCapabilities(
        streaming=True
      )
      # ...
    

Now in `src/my_project/task_manager.py` we'll have to implement `on_send_task_subscribe`:

    import asyncio
    # ...
    class MyAgentTaskManager(InMemoryTaskManager):
      # ...
      async def _stream_3_messages(self, request: SendTaskStreamingRequest):
        task_id = request.params.id
        received_text = request.params.message.parts[0].text
    
        text_messages = ["one", "two", "three"]
        for text in text_messages:
          parts = [
            {
              "type": "text",
              "text": f"{received_text}: {text}",
            }
          ]
          message = Message(role="agent", parts=parts)
          is_last = text == text_messages[-1]
          task_state = TaskState.COMPLETED if is_last else TaskState.WORKING
          task_status = TaskStatus(
            state=task_state,
            message=message
          )
          task_update_event = TaskStatusUpdateEvent(
            id=request.params.id,
            status=task_status,
            final=is_last,
          )
          await self.enqueue_events_for_sse(
            request.params.id,
            task_update_event
          )
    
      async def on_send_task_subscribe(
        self,
        request: SendTaskStreamingRequest
      ) -> AsyncIterable[SendTaskStreamingResponse] | JSONRPCResponse:
        # Upsert a task stored by InMemoryTaskManager
        await self.upsert_task(request.params)
    
        task_id = request.params.id
        # Create a queue of work to be done for this task
        sse_event_queue = await self.setup_sse_consumer(task_id=task_id)
    
        # Start the asynchronous work for this task
        asyncio.create_task(self._stream_3_messages(request))
    
        # Tell the client to expect future streaming responses
        return self.dequeue_events_for_sse(
          request_id=request.id,
          task_id=task_id,
          sse_event_queue=sse_event_queue,
        )
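
The pattern here is a producer and a consumer sharing a queue: a background task enqueues events while the returned async iterator dequeues them until a final event arrives. The sketch below reproduces that pattern with only `asyncio`. The names `produce` and `consume` are hypothetical, and the library's `enqueue_events_for_sse` and `dequeue_events_for_sse` may work differently internally.

```python
import asyncio
from typing import AsyncIterator, Dict, List


async def produce(queue: asyncio.Queue, prompt: str) -> None:
    """Enqueue three status events; only the last one is marked final."""
    words = ["one", "two", "three"]
    for index, word in enumerate(words):
        is_last = index == len(words) - 1
        await queue.put(
            {
                "state": "completed" if is_last else "working",
                "text": f"{prompt}: {word}",
                "final": is_last,
            }
        )


async def consume(queue: asyncio.Queue) -> AsyncIterator[Dict]:
    """Yield events as they arrive and stop after the final one."""
    while True:
        event = await queue.get()
        yield event
        if event["final"]:
            break


async def demo() -> List[Dict]:
    queue: asyncio.Queue = asyncio.Queue()
    # Start the work in the background, then drain the queue.
    producer = asyncio.create_task(produce(queue, "Streaming?"))
    events = [event async for event in consume(queue)]
    await producer
    return events


events = asyncio.run(demo())
for event in events:
    print(event)
```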
    

Restart your A2A server to pick up the new changes and then rerun the CLI:

    $ uv run google-a2a-cli --agent http://localhost:10002
    =========  starting a new task ========
    
    What do you want to send to the agent? (:q or quit to exit): Streaming?
    
    "status":{"state":"working","message":{"role":"agent","parts":[{"type":"text","text":"Streaming?: one"}]}
    "status":{"state":"working","message":{"role":"agent","parts":[{"type":"text","text":"Streaming?: two"}]}
    "status":{"state":"completed","message":{"role":"agent","parts":[{"type":"text","text":"Streaming?: three"}]}
    

Sometimes the agent might need additional input. For example, the agent might ask the client whether to keep repeating the 3 messages. In this case, the agent responds with `TaskState.INPUT_REQUIRED`, and the client then resends `send_task_streaming` with the same `task_id` and `session_id` but with an updated message providing the input the agent asked for. On the server side, we'll update `on_send_task_subscribe` to handle this case:

    # ...
    
    class MyAgentTaskManager(InMemoryTaskManager):
      # ...
      async def _stream_3_messages(self, request: SendTaskStreamingRequest):
        # ...
        for text in text_messages:
          # ...
          # is_last = text == text_messages[-1] # Delete this line
          task_state = TaskState.WORKING
          # ...
          task_update_event = TaskStatusUpdateEvent(
            id=request.params.id,
            status=task_status,
            final=False,
          )
          # ...
    
        ask_message = Message(
          role="agent",
          parts=[
            {
              "type": "text",
              "text": "Would you like more messages? (Y/N)"
            }
          ]
        )
        task_update_event = TaskStatusUpdateEvent(
          id=request.params.id,
          status=TaskStatus(
            state=TaskState.INPUT_REQUIRED,
            message=ask_message
          ),
          final=True,
        )
        await self.enqueue_events_for_sse(
          request.params.id,
          task_update_event
        )
      # ...
      async def on_send_task_subscribe(
        self,
        request: SendTaskStreamingRequest
      ) -> AsyncIterable[SendTaskStreamingResponse] | JSONRPCResponse:
        task_id = request.params.id
        # A task is new only if we have not stored it yet
        is_new_task = task_id not in self.tasks
        # Upsert a task stored by InMemoryTaskManager
        await self.upsert_task(request.params)
    
        received_text = request.params.message.parts[0].text
        sse_event_queue = await self.setup_sse_consumer(task_id=task_id)
        if not is_new_task and received_text == "N":
          task_update_event = TaskStatusUpdateEvent(
            id=request.params.id,
            status=TaskStatus(
              state=TaskState.COMPLETED,
              message=Message(
                role="agent",
                parts=[
                  {
                    "type": "text",
                    "text": "All done!"
                  }
                ]
              )
            ),
            final=True,
          )
          await self.enqueue_events_for_sse(
            request.params.id,
            task_update_event,
          )
        else:
          asyncio.create_task(self._stream_3_messages(request))
    
        return self.dequeue_events_for_sse(
          request_id=request.id,
          task_id=task_id,
          sse_event_queue=sse_event_queue,
        )
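
The branch in `on_send_task_subscribe` boils down to one decision: finish when an already-known task receives `N`, otherwise stream another round. Here is that decision as a pure function, which makes it easy to test in isolation. The function name `next_action` and its string return values are hypothetical.

```python
from typing import Set


def next_action(task_id: str, known_tasks: Set[str], received_text: str) -> str:
    """Return "finish" when an existing task receives "N", otherwise "stream"."""
    is_new_task = task_id not in known_tasks
    if not is_new_task and received_text == "N":
        return "finish"
    return "stream"


known: Set[str] = set()
print(next_action("task-1", known, "Streaming?"))  # first contact with this task
known.add("task-1")
print(next_action("task-1", known, "Y"))
print(next_action("task-1", known, "N"))
```

Note that a brand-new task whose first message happens to be `N` still streams, since there is no pending question for it to answer.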
    

Now, after restarting the server and rerunning the CLI, we can see that the task keeps running until we tell the agent `N`:

    $ uv run google-a2a-cli --agent http://localhost:10002
    =========  starting a new task ========
    
    What do you want to send to the agent? (:q or quit to exit): Streaming?
    
    "status":{"state":"working","message":{"role":"agent","parts":[{"type":"text","text":"Streaming?: one"}]}
    "status":{"state":"working","message":{"role":"agent","parts":[{"type":"text","text":"Streaming?: two"}]}
    "status":{"state":"working","message":{"role":"agent","parts":[{"type":"text","text":"Streaming?: three"}]}
    "status":{"state":"input-required","message":{"role":"agent","parts":[{"type":"text","text":"Would you like more messages? (Y/N)"}]}
    
    What do you want to send to the agent? (:q or quit to exit): N
    
    "status":{"state":"completed","message":{"role":"agent","parts":[{"type":"text","text":"All done!"}]}
    

Congratulations! You now have an agent that is able to asynchronously perform work and ask users for input when needed.

Using a Local Ollama Model
--------------------------

Now we get to the exciting part. We're going to add AI to our A2A server.

In this tutorial, we'll be setting up a local Ollama model and integrating it with our A2A server.

### Requirements

We'll install `ollama` and `langchain`, and download an ollama model that supports MCP tools (for a future tutorial).

1.  Download [ollama](https://ollama.com/download)
    
2.  Run an ollama server:
    

    # Note: if ollama is already running, you may get an error such as
    # Error: listen tcp 127.0.0.1:11434: bind: address already in use
    # On linux you can run systemctl stop ollama to stop ollama
    ollama serve
    

3.  Download a model from [this list](https://ollama.com/search), for example with `ollama pull qwq`. We'll be using `qwq` as it supports `tools` (as shown by its tags) and runs on a 24GB graphics card.
    

4.  Install `langchain`:
    

    uv add langchain langchain-ollama langgraph
    

Now with ollama set up, we can start integrating it into our A2A server.

### Integrating Ollama into our A2A server

First open up `src/my_project/__init__.py`:

    # ...
    
    @click.command()
    @click.option("--host", default="localhost")
    @click.option("--port", default=10002)
    @click.option("--ollama-host", default="http://127.0.0.1:11434")
    @click.option("--ollama-model", default=None)
    def main(host, port, ollama_host, ollama_model):
      # ...
      capabilities = AgentCapabilities(
        streaming=False # We'll leave streaming capabilities as an exercise for the reader
      )
      # ...
      task_manager = MyAgentTaskManager(
        ollama_host=ollama_host,
        ollama_model=ollama_model,
      )
      # ...
    

Now let's add AI functionality in `src/my_project/agent.py`:

    from langchain_ollama import ChatOllama
    from langgraph.prebuilt import create_react_agent
    from langgraph.graph.graph import CompiledGraph
    
    def create_ollama_agent(ollama_base_url: str, ollama_model: str):
      ollama_chat_llm = ChatOllama(
        base_url=ollama_base_url,
        model=ollama_model,
        temperature=0.2
      )
      agent = create_react_agent(ollama_chat_llm, tools=[])
      return agent
    
    async def run_ollama(ollama_agent: CompiledGraph, prompt: str):
      agent_response = await ollama_agent.ainvoke(
        {"messages": prompt }
      )
      message = agent_response["messages"][-1].content
      return str(message)
    

Finally let's call our ollama agent from `src/my_project/task_manager.py`:

    import typing
    # ...
    from my_project.agent import create_ollama_agent, run_ollama
    
    class MyAgentTaskManager(InMemoryTaskManager):
      def __init__(
        self,
        ollama_host: str,
        ollama_model: typing.Union[None, str]
      ):
        super().__init__()
        if ollama_model is not None:
          self.ollama_agent = create_ollama_agent(
            ollama_base_url=ollama_host,
            ollama_model=ollama_model
          )
        else:
          self.ollama_agent = None
    
      async def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse:
        # ...
        received_text = request.params.message.parts[0].text
        response_text = f"on_send_task received: {received_text}"
        if self.ollama_agent is not None:
          response_text = await run_ollama(ollama_agent=self.ollama_agent, prompt=received_text)
    
        task = await self._update_task(
          task_id=task_id,
          task_state=TaskState.COMPLETED,
          response_text=response_text
        )
    
        # Send the response
        return SendTaskResponse(id=request.id, result=task)
    
      # ...
    

Let's test it out!

First rerun our A2A server replacing `qwq` with the ollama model you downloaded:

    uv run my-project --ollama-host http://127.0.0.1:11434 --ollama-model qwq
    

And then rerun the cli:

    uv run google-a2a-cli --agent http://localhost:10002
    

Note: if you're using a large model, it may take a while to load, and the CLI may time out. If that happens, rerun the CLI once the ollama server has finished loading the model.

You should see something like the following:

    =========  starting a new task ========
    
    What do you want to send to the agent? (:q or quit to exit): hey
    
    "message":{"role":"agent","parts":[{"type":"text","text":"<think>\nOkay, the user said \"hey\". That's pretty casual. I should respond in a friendly way. Maybe ask how I can help them today. Keep it open-ended so they feel comfortable sharing what they need. Let me make sure my tone is positive and approachable. Alright, something like, \"Hey there! How can I assist you today?\" Yeah, that sounds good.\n</think>\n\nHey there! How can I assist you today? 😊"}]}
    

Congratulations! You now have an A2A server generating responses using an AI model!
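
The sample response above includes the model's reasoning wrapped in `<think>` tags. If you would rather return only the final answer, one option is to strip that block before building the task response. This is a minimal sketch that assumes the tags appear exactly as in the output above. The helper `strip_reasoning` is hypothetical, and other models may format their output differently.

```python
import re

# Matches a <think>...</think> block, including newlines, plus trailing whitespace.
THINK_BLOCK = re.compile(r"<think>.*?</think>\s*", re.DOTALL)


def strip_reasoning(text: str) -> str:
    """Remove any <think> block and surrounding whitespace from model output."""
    return THINK_BLOCK.sub("", text).strip()


raw = "<think>\nThe user said hey.\n</think>\n\nHey there! How can I assist you today?"
print(strip_reasoning(raw))
```

A natural place to apply it would be on the string returned by `run_ollama`, before it is passed to `_update_task`.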

Next Steps
----------

Congratulations! You have now mastered the basics of running an A2A server with an AI model as the agent. Here are some ideas for where to go next:

*   Connect our AI model with MCP tools
    
    *   Hint: first [create an MCP server](https://modelcontextprotocol.io/quickstart/server)
        
    *   Then: [Integrate MCP Tools](https://github.com/langchain-ai/langchain-mcp-adapters?tab=readme-ov-file#client) into our existing call to `create_react_agent(ollama_chat_llm, tools=[])`
        
*   Develop your own agent using Google's Agent Development Kit or another framework. Check out the [samples](https://github.com/google/A2A/tree/main/samples/python/agents)
    
*   📚 Read the A2A [technical documentation](https://google.github.io/A2A/#/documentation) to understand the capabilities
    
*   📝 Review the A2A JSON specification of the protocol structures
    
*   📑 Review key topics to understand protocol details
    
    *   [A2A and MCP](https://google.github.io/A2A/#/topics/a2a_and_mcp.md)
        
    *   [Agent Discovery](https://google.github.io/A2A/#/topics/agent_discovery.md)
        
    *   [Enterprise Ready](https://google.github.io/A2A/#/topics/enterprise_ready.md)
        
    *   [Push Notifications](https://google.github.io/A2A/#/topics/push_notifications.md)

---

*Originally published on [czmilo](https://paragraph.com/@zhangcheng/python-a2a-tutorial)*
