KAMF : Kafka-A2A-MCP-Flink

|

AI Silo

AI Silo 문제는 Data Silo에서 파생된 개념으로 AI 모델, 데이터, 기술, 인력, 인프라 등이 부서별로 분리·고립되어 서로 연계되지 못하여 AI를 효율적으로 활용하지 못하는 현상이다. 특히 기업에서 사용되는 ai agent들이 각각 독립적으로 운영되어 서로 통신되지 못하여 ai를 도입한 효과를 보지 못하거나 사내 데이터의 중복과 일관성 저하 문제가 발생하고 있다. 위와 같은 문제를 완화하기 위해 KAMF stack이 주목 받고 있다.

KAMF stack

KAMF stack은 Apache Kafka, MCP(Model Context Protocol), A2A(Agent2Agent), Apache Flink로 구성된다. MCP로 각각의 Agent를 만들고 A2A로 Agent들끼리 소통하게 한다. 이때 A2A만으로도 기본적인 통신과 작업은 가능하지만 대규모 multi agent 구축에는 한계가 있기 때문에 소통 단계에서 Kafka를 통해 대용량 데이터 스트림을 성능 저하 없이 처리한다. 그리고 Flink는 Kafka에 쌓엔 이벤트 스트림을 받아 실시간 처리를 지원한다.

MCP(Model Context Protocol)

MCP는 Tool 접근을 위한 프로토콜이다. LLM이 다양한 tool들과 상호작용하는 방식을 표준화하여 정의하여 agent가 자체적으로 tool들을 호출하여 사용할 수 있게 돕는다.

MCP는 server-client-host로 구성된다.

illustrated by author


  • Server
    • 각각 고유의 기능을 가진 tool들 (외부 도구, 데이터 소스, 각종 기능 제공)
  • Client
    • MCP server들과 연결된 interface.
    • 로컬 프로세스의 경우 표준 입출력(stdio)을 통해 이루어지고, 네트워크 서비스의 경우 HTTP와 SSE(Server-Sent Events)를 통해 이루어진다.
    • stdio 예시:
      client = MultiServerMCPClient(
          {
              "gold_price": {
                  "command": "./.venv/bin/python",
                  "args": ["./mcpserver-goldprice.py"],
                  "transport": "stdio",
              },
              "ledger": {
                  "command": "./.venv/bin/python",
                  "args": ["./mcpserver-ledger.py"],
                  "transport": "stdio",
              },
              "profit_clac": {
                  "command": "./.venv/bin/python",
                  "args": ["./mcpserver-profitcalc.py"],
                  "transport": "stdio",
              }
          }
      )
      
  • Host
    • 사용자와 상호작용하는 app (ex. Claude Desktop)
    • client 생성 및 실행

A2A(Agent2Agent)

A2A 서로 다른 프레임워크로 구축되고 별도 서버에서 실행되는 agent들이 효과적으로 소통하고 협업할 수 있도록 한다.

A2A를 알기 위해서는 아래 요소들에 대한 개념을 파악해야 한다.

  • Agent Card
    • 각 agent들은 ‘Agent Card’로 각각이 어떤 기능을 수행하는 agent인지 드러낸다.
    • Agent Card는 기본적으로 /.well-known/agent.json 경로에 있다.
    • Agent Card에는 다음과 같은 정보가 포함되어야 한다.
      • name: 에이전트 이름
      • description: 에이전트 설명
      • version: 에이전트 버전
      • url: A2A 서비스 엔드포인트
      • capabilities: 스트리밍, 푸시 알림 등 지원 기능
      • defaultInputModes/defaultOutputModes: 기본 입력/출력 타입(예: text, json 등)
      • skills: 수행 가능한 구체적 작업 목록(각각 id, name, description, inputModes, outputModes, examples 등 포함)
      • supportsAuthenticatedExtendedCard: 인증 확장 카드 지원 여부(선택)
      • securitySchemes: 인증 방식(예: OAuth2, Bearer 등, 선택)
    • {
        "name": "WebSearchAgent",
        "description": "An agent that performs web searches and extracts information.",
        "version": "1.0.0",
        "url": "http://localhost:8000",
        "capabilities": {
          "streaming": true,
          "pushNotifications": false
        },
        "defaultInputModes": ["text"],
        "defaultOutputModes": ["text"],
        "skills": [
          {
            "id": "browser",
            "name": "browser automation",
            "description": "Performs web searches to retrieve information.",
            "inputModes": ["text"],
            "outputModes": ["text"],
            "examples": [
              "Find the latest news about AI",
              "Search for restaurants in Seoul"
            ]
          }
        ],
        "supportsAuthenticatedExtendedCard": true,
        "securitySchemes": [
          {
            "type": "oauth2",
            "description": "OAuth 2.0 authentication required for extended capabilities."
          }
        ]
      }
      
  • Client Agent
    • 사용자의 요청을 받아 agent들에게 task를 할당하는 총괄 agent
    • client agent는 사용자의 요청을 받은 후 agent card를 확인하여 요청에 맞는 agent를 선정해서 해당 agent들에 task를 보낸다.
  • Server/Remote Agent
    • 특정 기능을 수행하는 agent
    • 특정 작업을 수행하고 결과를 반환한다.
  • Task
    • client가 agent들에게 작업을 요청하는 단위
    • 예시
      {
        "id": "task-12345-abc",
        "type": "schedule_replacement", 
        "status": "submitted",
        "created": "2025-07-08T10:30:00Z",
        "priority": "normal",
        "requester": "client-agent-001",
        "assignee": "it-asset-manager",
        "input": {
          "type": "text",
          "content": "노트북 교체를 예약해줘"
        },
        "context": {
          "user_id": "john.smith",
          "department": "engineering",
          "urgency": "medium"
        }
      }
      
    • task는 life cycle을 가진다. (submitted(제출됨) → working(작업 중) → input-required(입력 필요) → completed(완료)/failed(실패)/cancelled(취소))
    • life cycle 종류
      • Submitted (제출됨): 작업이 원격 에이전트에게 전송됨
      • Acknowledged (확인됨): 원격 에이전트가 작업 수신을 확인
      • Working (작업 중): 에이전트가 실제 작업을 수행 중
      • Input-Required (입력 필요): 추가 정보나 승인이 필요한 상태
      • Streaming (스트리밍): 실시간 진행 상황 업데이트 중
      • Completed (완료): 작업이 성공적으로 완료됨
      • Failed (실패): 작업 수행 중 오류 발생
      • Cancelled (취소): 사용자 또는 시스템에 의해 작업 취소
  • Artifact
    • 작업의 최종 결과물을 담는 객체
  • Status
    • 작업 진행률, 오류 등을 조회하는 endpoint

A2A flow

illustrated by author


  1. <Client Agent> 사용자 input 수신
  2. <Client Agent> Agent Card 불러옴
  3. <Client Agent> Agent Card를 기반으로 사용자 요청에 어떤 agent가 적합한지 점수를 매겨 호출할 agent를 선정함 복잡한 요청의 경우 여러 agent를 호출한다.
      "노트북 교체 예약" 요청 →
      ├── IT Asset Manager: 하드웨어 재고 확인
      ├── Calendar Agent: 가능한 시간대 확인  
      ├── User Directory Agent: 사용자 정보 조회
      └── Notification Agent: 확인 메시지 발송
    
  4. <Client Agent> Task 작성해서 remote agent에 전송
  5. <Server/Remote Agent> 요청을 받아 작업 수행 시작
  6. <All Agents>Task의 life cycle동안 agent들은 sse 혹은 websocket을 사용하여 Status, Artifact, Error 등을 교환 (이 과정에서 실시간 피드백과 사용자 알림이 가능하다.)
     // 클라이언트 에이전트가 받는 실시간 업데이트
     {
       "taskId": "task-12345-abc",
       "status": "working", 
       "progress": 25,
       "message": "IT 자산 데이터베이스에서 가용 노트북 확인 중...",
       "timestamp": "2025-07-08T10:32:15Z"
     }
        
     {
       "taskId": "task-12345-abc", 
       "status": "input-required",
       "message": "선호하는 노트북 모델을 선택해주세요",
       "options": ["MacBook Pro 16\"", "ThinkPad X1", "Dell XPS 15"],
       "timestamp": "2025-07-08T10:33:42Z"
     }
    
  7. <Client Agent>가 상태가 completed이 되고 final:true 가 찍힌 이벤트를 받으면 모든 Artifact를 합쳐 사용자에게 반환하거나 다음 Agent에 전달
  8. Agent Card와 Task 메타데이터를 캐시해 다음 호출 속도를 높임

Kafka는 메시지 버스 역할을 하는데, 요청이 급격히 폭주했을 때 Kafka가 흡수한 후 flink가 적절히 메세지를 읽어 와 agent에 효율적으로 메시지를 분배한다.

illustrated by author


Reference

https://github.com/never2average/a2a-mcp-server
https://github.com/GongRzhe/A2A-MCP-Server
https://becomingahacker.org/comparing-mcp-a2a-and-agntcy-in-the-ai-agent-ecosystem-f3234b85c475
https://seanfalconer.medium.com/why-googles-agent2agent-protocol-needs-apache-kafka-507b1ec456a6
https://seanfalconer.medium.com/the-ai-silo-problem-how-data-streaming-can-unify-enterprise-ai-agents-0a138cf6398c
https://seanfalconer.medium.com/kafka-a2a-mcp-and-flink-the-new-stack-for-ai-agents-4b6cb8b85b72
https://pub.towardsai.net/a2a-mcp-langchain-powerful-agent-communication-8bb692ed51d3
https://towardsdatascience.com/inside-googles-agent2agent-a2a-protocol-teaching-ai-agents-to-talk-to-each-other/
https://huggingface.co/blog/lynn-mikami/agent2agent
https://a2aprotocol.ai/blog/python-a2a-tutorial-with-source-code