confluentinc/mcp-confluent

Confluent integration to interact with Confluent Kafka and Confluent Cloud REST APIs.

Add to your Claude Desktop config.json:

{
  "mcpServers": {
    "confluentinc-mcp-confluent": {
      "command": "node",
      "args": [
        "~/.mcp/mcp-confluent/index.js"
      ]
    }
  }
}
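As an alternative to pointing at a local index.js, the same entry can start the published package through npx, reusing the `npx -y @confluentinc/mcp-confluent --config ...` invocation that appears elsewhere in this README. The sketch below is illustrative only: the helper name `buildClientEntry` and the config path are assumptions, and your client's documentation is the authority on its file format.

```typescript
// Hypothetical helper (not part of mcp-confluent): builds an "mcpServers" entry
// that starts the server through npx. The "-y" and "--config" flags are taken
// from the commands shown in this README; the server key is arbitrary.
interface McpServerEntry {
  command: string;
  args: string[];
}

function buildClientEntry(configPath: string): Record<string, McpServerEntry> {
  return {
    "confluentinc-mcp-confluent": {
      command: "npx",
      args: ["-y", "@confluentinc/mcp-confluent", "--config", configPath],
    },
  };
}

// Assumption: an absolute path is used because a client that spawns the
// command without a shell may not expand "~".
const clientConfig = { mcpServers: buildClientEntry("/absolute/path/to/config.yaml") };
console.log(JSON.stringify(clientConfig, null, 2));
```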

Confluent MCP Server

License: MIT

An open-source MCP server that enables AI assistants to interact with Confluent Cloud, Confluent Platform, and standalone Apache Kafka deployments through natural language. It provides 50+ tools across Kafka, Flink SQL, Schema Registry, Connectors, Tableflow, and more — usable from any MCP-compatible client including Claude Desktop, Claude Code, Cursor, VS Code, Goose, and Gemini CLI.

Quick Start

Prerequisites: Node.js 22+. If you want to interact with Confluent Cloud, you need to create an account first.

  1. Generate a quick config.yaml file in your project root:
npx @confluentinc/mcp-confluent --init-config
  2. Edit the config.yaml file with your connection details, then:
npx @confluentinc/mcp-confluent --config ./config.yaml

See Getting Started for full setup instructions and Configuring MCP Clients for integration with your preferred AI tool.

Available Tools

Tools are auto-enabled based on which service blocks are present in your resolved configuration; see CONFIGURATION.md for the full block-to-tool mapping.

You can list all available tools via the CLI:

npx -y @confluentinc/mcp-confluent --list-tools

Always-Available Tools

These tools need no service blocks or authentication — they’re enabled even on a bare config, regardless of which deployment the rest of your config targets.

| Category | Tools | Description |
| --- | --- | --- |
| Documentation | search-product-docs, get-product-doc-page | Search Confluent product docs and fetch full page content |
| Diagnostics | explain-disabled-tools | Explain why specific tools are absent from tools/list |

Available Tools for Confluent Cloud

These tools require endpoints and authentication against specific Confluent Cloud components. Refer to config.example.yaml for the full set of configuration variables. Categories marked with ¹ also work with OAuth authentication — sign in via your browser instead of provisioning API keys.

| Category | Tools | Description |
| --- | --- | --- |
| Kafka ¹ | list-topics, create-topics, delete-topics, produce-message, consume-messages, list-consumer-groups, describe-consumer-group, get-consumer-group-lag, alter-topic-config, get-topic-config | Manage topics, produce/consume messages, inspect consumer groups, configure topic settings |
| Flink SQL | create-flink-statement, list-flink-statements, read-flink-statement, delete-flink-statements, get-flink-statement-exceptions | Create and manage Flink SQL statements |
| Flink Catalog | list-flink-catalogs, list-flink-databases, list-flink-tables, describe-flink-table, get-flink-table-info | Explore Flink catalogs, databases, and table schemas |
| Flink Diagnostics | check-flink-statement-health, detect-flink-statement-issues, get-flink-statement-profile | Health checks, issue detection, and query profiling |
| Connectors | list-connectors, get-connector-config, get-connector-offsets, get-connector-status, get-connector-tasks, get-connector-error-summary, get-connector-error-recommendations, get-connector-logs, create-connector, delete-connector, pause-connector, resume-connector, restart-connector, update-connector-config | Inspect and manage Kafka Connect connectors |
| Schema Registry ¹ | list-schemas, delete-schema | List, inspect, and delete data schemas |
| Catalog & Tags | search-topics-by-tag, search-topics-by-name, create-topic-tags, delete-tag, remove-tag-from-entity, add-tags-to-topic, list-tags | Organize and search topics using tags |
| Organizations, Environments & Clusters ¹ | list-organizations, list-environments, read-environment, list-clusters | Discover Confluent Cloud resources |
| Tableflow | create-tableflow-topic, list-tableflow-topics, read-tableflow-topic, update-tableflow-topic, delete-tableflow-topic, list-tableflow-regions | Manage Tableflow-enabled topics |
| Tableflow Catalog | create-tableflow-catalog-integration, list-tableflow-catalog-integrations, read-tableflow-catalog-integration, update-tableflow-catalog-integration, delete-tableflow-catalog-integration | Manage Tableflow catalog integrations (e.g., AWS Glue) |
| Metrics | list-available-metrics, query-metrics | Discover and query Confluent Cloud operational metrics |
| Billing ¹ | list-billing-costs | Query billing and cost data |

¹ Also available under OAuth — see OAuth Authentication for Confluent Cloud for setup and caveats. Categories not marked currently require a direct connection with static API keys; OAuth migration is in progress.

Available Tools for local deployments

These tools require only Kafka or Schema Registry endpoints; no Confluent Cloud API key or secret is needed. They are ideal for local development with self-managed clusters, including Confluent Platform.

# minimal config.yaml for local development
connections:
  local:
    type: direct
    kafka:
      bootstrap_servers: "localhost:9092"
    schema_registry:
      endpoint: "http://localhost:8081"

Ready-to-use variants live in sample_configs/.

| Category | Tools | Description |
| --- | --- | --- |
| Kafka | list-topics, create-topics, delete-topics, produce-message, consume-messages, list-consumer-groups, describe-consumer-group, get-consumer-group-lag | Manage topics, produce/consume messages, inspect consumer groups |
| Schema Registry | list-schemas, delete-schema | List, inspect, and delete data schemas |
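To make the "service block enables a tool group" idea concrete, here is a minimal sketch for the local case. It assumes that the presence of a kafka or schema_registry block turns on the whole corresponding group listed for local deployments. The type and function names are hypothetical, and the authoritative block-to-tool mapping is the one in CONFIGURATION.md.

```typescript
// Shape of the minimal local connection after YAML parsing (only the fields used here).
interface DirectConnection {
  type: "direct";
  kafka?: { bootstrap_servers: string };
  schema_registry?: { endpoint: string };
}

// Tool names copied from the local-deployment tool list in this README.
const LOCAL_TOOLS = {
  kafka: [
    "list-topics", "create-topics", "delete-topics", "produce-message",
    "consume-messages", "list-consumer-groups", "describe-consumer-group",
    "get-consumer-group-lag",
  ],
  schema_registry: ["list-schemas", "delete-schema"],
} as const;

// Assumption: each block that is present enables its entire tool group.
function toolsForLocalConnection(conn: DirectConnection): string[] {
  const tools: string[] = [];
  if (conn.kafka) tools.push(...LOCAL_TOOLS.kafka);
  if (conn.schema_registry) tools.push(...LOCAL_TOOLS.schema_registry);
  return tools;
}

const kafkaOnly: DirectConnection = {
  type: "direct",
  kafka: { bootstrap_servers: "localhost:9092" },
};
console.log(toolsForLocalConnection(kafkaOnly));
```

For the server's real answer, `--list-tools` prints what is actually enabled for a given configuration.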

Using with Confluent Platform

mcp-confluent runs against a self-managed Confluent Platform (CP) cluster the same way it runs against any local Kafka + Schema Registry deployment: point a direct connection at your brokers and Schema Registry. A CP connection exposes the same tools as any other local deployment — see Available Tools for local deployments. The Confluent Cloud tools (Flink, Tableflow, Billing, Metrics, and the rest) require a Confluent Cloud account and stay disabled on CP. The only differences from a localhost:9092 setup are authentication and TLS.

Sample YAML config

sample_configs/confluent-platform.yaml is a copy-pasteable starter. It assumes PLAIN over SASL_SSL for Kafka and HTTP Basic Auth for Schema Registry. Customize the broker and Schema Registry URLs, and inject credentials via the ${KAFKA_API_KEY} / ${KAFKA_API_SECRET} / ${SCHEMA_REGISTRY_API_KEY} / ${SCHEMA_REGISTRY_API_SECRET} environment variables. If your cluster uses SCRAM or another SASL mechanism, override security.protocol and sasl.mechanisms through the kafka.extra_properties map in that file.
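The `${VAR}` placeholders are resolved from the environment when the config is loaded. The sketch below shows the general idea of such interpolation. It is not the server's implementation: the function name is hypothetical, and details such as default values, escaping, or how a missing variable is reported are assumptions here and are documented in CONFIGURATION.md.

```typescript
// Replaces ${NAME} placeholders with values from an environment map.
// Assumption: a missing variable is treated as an error rather than silently
// producing an empty credential.
function interpolateEnv(
  template: string,
  env: Record<string, string | undefined>,
): string {
  return template.replace(/\$\{([A-Za-z_][A-Za-z0-9_]*)\}/g, (_match, varName: string) => {
    const value = env[varName];
    if (value === undefined) {
      throw new Error(`Environment variable ${varName} is not set`);
    }
    return value;
  });
}

// Example with an explicit map instead of the real process environment.
console.log(interpolateEnv("key=${KAFKA_API_KEY}", { KAFKA_API_KEY: "example-key" }));
```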

TLS trust (internal CAs)

CP clusters frequently sit behind an internal CA. If you see TLS handshake failures against the broker or Schema Registry, point Node at your CA bundle when starting the server:

NODE_EXTRA_CA_CERTS=/path/to/internal-ca.pem npm run start -- --config path/to/config.yaml

End-to-end smoke test

A docker-compose stack (docker-compose.cp-test.yml) brings up a local CP Kafka (KRaft, SASL_PLAINTEXT/PLAIN) plus an unauthenticated Schema Registry. The matching integration tests are tagged @cp and live next to their handlers as *.cp.integration.test.ts:

docker compose -f docker-compose.cp-test.yml up -d
# Wait ~30s for Kafka + SR to become ready, then:
CP_KAFKA_USERNAME=mcp CP_KAFKA_PASSWORD=mcp-secret \
  npm run test:integration -- --tags-filter=@cp
docker compose -f docker-compose.cp-test.yml down -v

The tests skip cleanly when those env vars are unset, so npm run test:unit and a default npm run test:integration against your real Confluent Cloud account are unaffected if you don’t have the docker stack running.
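The skip behavior can be pictured as a guard on the two credentials used above. This is a sketch of the pattern, not the project's test code: the function name is hypothetical, and how the repository's test runner consumes such a guard is an assumption.

```typescript
// Returns true only when both credentials expected by the docker-compose
// stack are present and non-empty.
function cpTestsEnabled(env: Record<string, string | undefined>): boolean {
  return Boolean(env["CP_KAFKA_USERNAME"]) && Boolean(env["CP_KAFKA_PASSWORD"]);
}

// Assumed usage in a test file (the runner API is hypothetical):
//   const describeCp = cpTestsEnabled(process.env) ? describe : describe.skip;
console.log(cpTestsEnabled({ CP_KAFKA_USERNAME: "mcp", CP_KAFKA_PASSWORD: "mcp-secret" }));
```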

Getting Started

Prerequisites

  • Node.js 22 or later — we recommend using NVM to manage versions:
    nvm install 22
    nvm use 22
  • A local environment with Kafka or Schema Registry running, or a Confluent Cloud account with appropriate API keys or login credentials if using OAuth to authenticate.
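If you launch the server from a script, a quick version guard can surface the Node.js requirement early. This is a minimal sketch with hypothetical function names; it accepts version strings in the form printed by `node -v`.

```typescript
// Extracts the major version from a string such as "v22.3.0" or "22.3.0".
function nodeMajor(version: string): number {
  const match = /^v?(\d+)\./.exec(version);
  if (!match) {
    throw new Error(`Unrecognized Node.js version: ${version}`);
  }
  return Number(match[1]);
}

// The default minimum of 22 mirrors the prerequisite stated above.
function meetsMinimumNode(version: string, minimumMajor = 22): boolean {
  return nodeMajor(version) >= minimumMajor;
}

// In a real script you would pass process.version here.
console.log(meetsMinimumNode("v22.3.0"));
```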

General Setup Steps

This MCP server is designed to be used with various MCP clients, such as Claude Desktop, Copilot, or Goose CLI/Desktop. The specific configuration and interaction will depend on the client you are using.

The MCP server can authenticate to Confluent Cloud via OAuth (PKCE) in addition to static API keys defined in the YAML config. See OAuth Authentication for Confluent Cloud for more details.

The general steps to configure (if not using OAuth) and run this MCP are:

  1. Create a configuration file: Copy the provided config.yaml example file to the root of your project. You can use the CLI to bootstrap one in your current directory — no git checkout required:
npx @confluentinc/mcp-confluent --init-config
  2. Populate the file: Fill in the necessary values for your Confluent Cloud environment. See CONFIGURATION.md for the full reference; only fill in the service blocks you need (each one enables a group of tools).

  3. Start the Server: You can run the MCP server in one of two ways:

    • From source: Follow the instructions in the Contributing Guide to build and run the server from source. This typically involves:

      • Installing dependencies (npm install)
      • Building the project (npm run build or npm run dev)
    • With npx: You can start the server directly using npx, no build required:

      npx @confluentinc/mcp-confluent --config /path/to/myconfig.yaml
  4. Configure your MCP Client: Each client (e.g., Claude, Goose) will have its own way of specifying the MCP server’s address and any required credentials. You’ll need to configure your client to connect to the address where this server is running (likely localhost with a specific port). The port the server runs on is set via server.http.port in config.yaml.

  5. Start your MCP Client: Once your client is configured to connect to the MCP server, you can start your MCP client and on startup it will stand up an instance of this MCP server locally. This instance will be responsible for managing data schemas and interacting with resources on your behalf.

  6. Interact with your resources through the Client: Once the client is connected and configured, you can use the client’s interface to interact with Confluent Cloud or local resources. The client will send requests to this MCP server, which will then interact with the available connections on your behalf.

Configuration

The full configuration reference — YAML schema, every service block, env-var interpolation, OAuth and HTTP/SSE auth setup, the (deprecated) legacy env-var table, and tool-to-block mapping — lives in CONFIGURATION.md.

Compatibility note. This release ships full parity between YAML (-c config.yaml) and the legacy env-var path (-e config.env) for a single connection. The env-var-only path will emit a startup warning in a near-future release and be removed a release or two later. Multi-connection support (next release) will be YAML-only. See CONFIGURATION.md → Two paths, one configuration.

Prerequisites & setup for Tableflow commands

Tableflow tools interact with cloud storage (e.g. AWS S3) and a metadata catalog (e.g. AWS Glue) on your behalf via the Flink runtime in Confluent Cloud. The Flink runtime needs IAM permissions on your cloud account, and those have to be granted and linked into Confluent Cloud before any Tableflow tool will succeed.

Follow the Tableflow quick start with custom storage & Glue to set up the roles, policies, and provider integrations. Skipping this step leads to authorization errors when mcp-confluent tries to provision or manage Tableflow-enabled tables.

OAuth Authentication for Confluent Cloud

The MCP server can authenticate to Confluent Cloud via OAuth (PKCE) instead of static API keys. On the first tool call that needs Confluent access, the server opens your browser to the Confluent Cloud sign-in page; subsequent tool calls reuse the resulting session. No API keys to provision.
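For readers unfamiliar with PKCE, the sketch below shows the generic S256 method from RFC 7636: the client creates a random code verifier, sends its SHA-256 hash (the code challenge) with the authorization request, and later proves possession by presenting the verifier. This illustrates the standard only; the server's own OAuth code is not shown in this README, and the function names are hypothetical.

```typescript
import { createHash, randomBytes } from "node:crypto";

// RFC 7636: the verifier is a high-entropy string of 43 to 128 URL-safe characters.
function createCodeVerifier(): string {
  return randomBytes(32).toString("base64url"); // 32 random bytes encode to 43 characters
}

// RFC 7636 "S256": challenge = BASE64URL(SHA-256(ASCII(verifier))), without padding.
function createCodeChallenge(verifier: string): string {
  return createHash("sha256").update(verifier, "ascii").digest("base64url");
}

const pkceVerifier = createCodeVerifier();
console.log({ verifier: pkceVerifier, challenge: createCodeChallenge(pkceVerifier) });
```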

Setup

npx @confluentinc/mcp-confluent --init-oauth-config
# edit ./config.yaml if needed, then:
npx @confluentinc/mcp-confluent --config ./config.yaml

--init-oauth-config copies the starter config.oauth.example.yaml to ./config.yaml. The whole file is essentially:

connections:
  ccloud-oauth:
    type: oauth

See CONFIGURATION.md → Authentication modes for the full schema and ergonomics.

The ¹-marked categories in Available Tools for Confluent Cloud work under OAuth today; everything else still needs a direct connection with static API keys.

CLI Usage

The MCP server provides a flexible command line interface (CLI) for advanced control. The CLI lets you pick the config file, transports, and fine-tune which tools are enabled or blocked.

Basic Usage

You can view all CLI options and help with:

npx @confluentinc/mcp-confluent --help
Output:
Usage: mcp-confluent [options]

Confluent MCP Server - Model Context Protocol implementation for Confluent Cloud

Options:
  -V, --version                    output the version number
  -e, --env-file <path>            Load environment variables from file
  -k, --kafka-config-file <file>   Path to a properties file for configuring kafka clients
  -t, --transport <types>          Transport types (comma-separated list) (choices: "http", "sse", "stdio", default: "stdio")
  --allow-tools <tools>            Comma-separated list of tool names to allow. If provided, takes precedence over --allow-tools-file. Allow-list is applied before block-list.
  --block-tools <tools>            Comma-separated list of tool names to block. If provided, takes precedence over --block-tools-file. Block-list is applied after allow-list.
  --allow-tools-file <file>        File with tool names to allow (one per line). Used only if --allow-tools is not provided. Allow-list is applied before block-list.
  --block-tools-file <file>        File with tool names to block (one per line). Used only if --block-tools is not provided. Block-list is applied after allow-list.
  --list-tools                     Print the final set of enabled tool names (with descriptions) after allow/block filtering and exit. Does not start the server.
  --disable-auth                   Disable authentication for HTTP/SSE transports. WARNING: Only use in development environments.
  --allowed-hosts <hosts>          Comma-separated list of allowed Host header values for DNS rebinding protection.
  --generate-key                   Generate a secure API key for MCP_API_KEY and print it to stdout, then exit.
  -h, --help                       display help for command

Example: Deploy using all transports

npx @confluentinc/mcp-confluent -c config.yaml --transport http,sse,stdio
Output:
...
{"level":"info","time":"2025-05-14T17:03:02.883Z","pid":47959,"hostname":"G9PW1FJH64","name":"mcp-confluent","msg":"Starting transports: http, sse, stdio"}
{"level":"info","time":"2025-05-14T17:03:02.971Z","pid":47959,"hostname":"G9PW1FJH64","name":"mcp-confluent","msg":"HTTP transport routes registered"}
{"level":"info","time":"2025-05-14T17:03:02.972Z","pid":47959,"hostname":"G9PW1FJH64","name":"mcp-confluent","msg":"SSE transport routes registered"}
{"level":"info","time":"2025-05-14T17:03:02.972Z","pid":47959,"hostname":"G9PW1FJH64","name":"mcp-confluent","msg":"STDIO transport connected"}
{"level":"info","time":"2025-05-14T17:03:03.012Z","pid":47959,"hostname":"G9PW1FJH64","name":"mcp-confluent","msg":"Server listening at http://[::1]:3000"}
{"level":"info","time":"2025-05-14T17:03:03.013Z","pid":47959,"hostname":"G9PW1FJH64","name":"mcp-confluent","msg":"Server listening at http://127.0.0.1:3000"}
{"level":"info","time":"2025-05-14T17:03:03.013Z","pid":47959,"hostname":"G9PW1FJH64","name":"mcp-confluent","msg":"All transports started successfully"}
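The startup log is newline-delimited JSON, so a wrapper script can read it to find where the server is listening. The sketch below assumes the `msg` field and the "Server listening at" wording stay as in the sample output above, which is not guaranteed across releases; the function name is hypothetical.

```typescript
interface StartupLogLine {
  level: string;
  time: string;
  msg: string;
}

// Returns the URLs from "Server listening at <url>" messages in JSON-lines output.
function listeningUrls(output: string): string[] {
  const prefix = "Server listening at ";
  const urls: string[] = [];
  for (const rawLine of output.split("\n")) {
    const trimmed = rawLine.trim();
    if (!trimmed.startsWith("{")) continue; // skip non-JSON lines
    let entry: Partial<StartupLogLine>;
    try {
      entry = JSON.parse(trimmed) as Partial<StartupLogLine>;
    } catch {
      continue; // ignore lines that are not valid JSON
    }
    if (typeof entry.msg === "string" && entry.msg.startsWith(prefix)) {
      urls.push(entry.msg.slice(prefix.length));
    }
  }
  return urls;
}

const sampleStartupLog = [
  "...",
  '{"level":"info","time":"2025-05-14T17:03:03.012Z","msg":"Server listening at http://[::1]:3000"}',
  '{"level":"info","time":"2025-05-14T17:03:03.013Z","msg":"Server listening at http://127.0.0.1:3000"}',
].join("\n");
console.log(listeningUrls(sampleStartupLog));
```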

Example: Allow Only Specific Tools

npx @confluentinc/mcp-confluent -c config.yaml --allow-tools produce-message,consume-messages

Only the specified tools will be enabled; all others will be disabled.

Example: Block Certain Tools

npx @confluentinc/mcp-confluent -c config.yaml --block-tools produce-message,consume-messages

All tools except the specified ones will be enabled.

Example: Use Tool Lists from Files

You can also maintain allow/block lists in files (one tool name per line):

npx -y @confluentinc/mcp-confluent -c config.yaml --allow-tools-file allow.txt --block-tools-file block.txt
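A list file is plain text with one tool name per line. The sketch below shows one way such a file could be read into a list. Trimming whitespace and skipping blank lines are assumptions made for the example, not documented behavior of the CLI, and the function name is hypothetical.

```typescript
// Splits file content into tool names, one per line.
// Assumption: surrounding whitespace and empty lines are ignored.
function parseToolList(fileContent: string): string[] {
  return fileContent
    .split(/\r?\n/)
    .map((line) => line.trim())
    .filter((line) => line.length > 0);
}

// Example content of a hypothetical allow.txt:
console.log(parseToolList("list-topics\nproduce-message\n\nconsume-messages\n"));
```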

Example: List All Available Tools

npx -y @confluentinc/mcp-confluent --list-tools
Output:
billing:
  list-billing-costs: Retrieve billing cost data for a Confluent Cloud organization within a specified date range with pagination support

catalog:
  add-tags-to-topic: Assign existing tags to Kafka topics in Confluent Cloud.
  create-topic-tags: Create new tag definitions in Confluent Cloud.
  delete-tag: Delete a tag definition from Confluent Cloud.
  list-tags: Retrieve all tags with definitions from Confluent Cloud Schema Registry.
  remove-tag-from-entity: Remove tag from an entity in Confluent Cloud.
  search-topics-by-name: List all topics in the Kafka cluster matching the specified name.
  search-topics-by-tag: List all topics in the Kafka cluster with the specified tag.

confluent-cloud:
  list-clusters: Get all clusters in the Confluent Cloud environment
  list-environments: Get all environments in Confluent Cloud with pagination support
  list-organizations: List Confluent Cloud organizations the current credentials can see. Paginated; if the response includes a nextPageTok...
  read-environment: Get details of a specific environment by ID

connect:
  create-connector: Create a new connector. Returns the new connector information if successful.
  delete-connector: Delete an existing connector. Returns success message if deletion was successful.
  get-connector-config: Retrieve the full configuration map for a connector. Returns the flat config object the connector was created/updated...
  get-connector-error-recommendations: Get suggested remediation steps for a connector that has failed or is in an error state. Returns a one-liner when no recommendations are available.
  get-connector-error-summary: Summarize a connector's current errors. Projects Confluent Cloud's /status diagnostics into a compact, agent-friendly form. Returns a one-liner when the connector is healthy.
  get-connector-logs: Retrieve recent log entries for a Confluent Cloud connector from the Cloud logging API. Defaults to the last hour of ERROR-level entries. Paginated via nextPageToken.
  get-connector-offsets: Retrieve current offsets for a connector's tasks. Useful for detecting lag, stalled tasks, or assisting recovery.
  get-connector-status: Get the current state of a connector and its tasks (RUNNING, FAILED, PAUSED, UNASSIGNED) including failure traces if ...
  get-connector-tasks: List the tasks of a connector along with their configurations.
  list-connectors: Retrieve a list of "names" of the active connectors. You can then make a read request for a specific connector by name.
  pause-connector: Pause a running connector and its tasks. Idempotent.
  restart-connector: Restart a connector and its tasks. Asynchronous; the connector will not transition state synchronously.
  resume-connector: Resume a paused connector and its tasks. Idempotent.
  update-connector-config: Update the configuration of an existing connector. Full-replace: omitted keys are removed and the connector is reconf...

docs:
  get-product-doc-page: Fetch the full markdown content of a Confluent product documentation page. Accepts URLs under https://docs.confluent....
  search-product-docs: Search Confluent product documentation (docs.confluent.io, developer.confluent.io, support.confluent.io) by keyword.

flink:
  check-flink-statement-health: Perform an aggregate health check for a Flink SQL statement. Returns status (healthy/warning/critical), current phase...
  create-flink-statement: Make a request to create a statement.
  delete-flink-statements: Make a request to delete a statement.
  describe-flink-table: Get full schema details for a Flink table via INFORMATION_SCHEMA.COLUMNS. Returns column names, data types (including...
  detect-flink-statement-issues: Detect issues for a Flink SQL statement by analyzing status, exceptions, and performance metrics. Identifies problems...
  get-flink-statement-exceptions: Retrieve the 10 most recent exceptions for a Flink SQL statement. Useful for diagnosing failed or failing statements.
  get-flink-statement-profile: Get Query Profiler data for a Flink SQL statement. Returns the task graph with human-readable task/operator names, pe...
  get-flink-table-info: Get table metadata via INFORMATION_SCHEMA.TABLES. Returns watermark configuration, distribution info, and table type.
  list-flink-catalogs: List all catalogs available in the Flink environment via INFORMATION_SCHEMA.CATALOGS.
  list-flink-databases: List all databases (schemas) in a Flink catalog via INFORMATION_SCHEMA.SCHEMATA. Returns catalog and database names.
  list-flink-statements: Retrieve a sorted, filtered, paginated list of all statements.
  list-flink-tables: List all tables in a Flink database via INFORMATION_SCHEMA.TABLES. Returns table names and types.
  read-flink-statement: Make a request to read a statement and its results

kafka:
  alter-topic-config: Alter topic configuration in Confluent Cloud.
  consume-messages: Consume messages from Kafka topics. Optionally restrict to a partition, start from an offset, timestamp, earliest, la...
  create-topics: Create one or more Kafka topics with an optional partition count.
  delete-topics: Delete the topic with the given names.
  describe-consumer-group: Describe a single consumer group on a Kafka cluster. Returns the group's state, type, protocol, partition assignor, c...
  get-consumer-group-lag: Compute live offset lag for a single Kafka consumer group. Returns per-(topic, partition) {committedOffset, highWater...
  get-partition-offsets: Return per-partition low/high watermarks and message counts for a Kafka topic. Use this to size a backfill, measure l...
  get-topic-config: Retrieve configuration details for a specific Kafka topic.
  list-consumer-groups: List consumer groups on a Kafka cluster — wraps the broker's listGroups admin call. Optional filters narrow the resul...
  list-topics: List all topics in the Kafka cluster.
  produce-message: Produce records to a Kafka topic. Supports Confluent Schema Registry serialization (AVRO, JSON, PROTOBUF) for both ke...

mcp-server-diagnostics:
  explain-disabled-tools: Call when the user asks why a tool is missing or unavailable (e.g., "why can't I list Kafka topics?", "where are the ...

metrics:
  list-available-metrics: List available Confluent Cloud metrics and their filter fields from the Telemetry API. Use this tool BEFORE query-met...
  query-metrics: Query Confluent Cloud metrics from the Telemetry API. IMPORTANT: Use the list-available-metrics tool first to discove...

schema-registry:
  delete-schema: Delete a schema subject or a specific version from the Schema Registry. If version is omitted, all versions of the su...
  list-schemas: List all schemas in the Schema Registry.

tableflow:
  create-tableflow-catalog-integration: Make a request to create a catalog integration.
  create-tableflow-topic: Make a request to create a tableflow topic.
  delete-tableflow-catalog-integration: Make a request to delete a tableflow catalog integration.
  delete-tableflow-topic: Make a request to delete a tableflow topic.
  list-tableflow-catalog-integrations: Retrieve a sorted, filtered, paginated list of all catalog integrations.
  list-tableflow-regions: Retrieve a sorted, filtered, paginated list of all tableflow regions.
  list-tableflow-topics: Retrieve a sorted, filtered, paginated list of all tableflow topics.
  read-tableflow-catalog-integration: Make a request to read a catalog integration.
  read-tableflow-topic: Make a request to read a tableflow topic.
  update-tableflow-catalog-integration: Make a request to update a catalog integration.
  update-tableflow-topic: Make a request to update a tableflow topic.

Tip: The allow-list is applied before the block-list. If neither is provided, all tools are enabled by default.
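The ordering rule can be expressed as a small filter. This is a sketch of the documented semantics (inline list takes precedence over the file, allow-list first, then block-list, and everything enabled when neither is given), not the server's code. The function names are hypothetical, and silently ignoring unknown tool names is an assumption.

```typescript
// An inline --allow-tools / --block-tools value takes precedence over the matching file.
function resolveToolList(inline?: string[], fromFile?: string[]): string[] | undefined {
  return inline ?? fromFile;
}

// Applies the allow-list first, then removes anything on the block-list.
function filterTools(available: string[], allow?: string[], block?: string[]): string[] {
  const allowed = allow === undefined
    ? available
    : available.filter((tool) => allow.includes(tool));
  const blocked = new Set(block ?? []);
  return allowed.filter((tool) => !blocked.has(tool));
}

const availableTools = ["list-topics", "produce-message", "consume-messages"];
console.log(filterTools(availableTools, ["produce-message", "consume-messages"], ["consume-messages"]));
```

A tool that appears on both lists ends up disabled, because the block-list runs last.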

Configuring MCP Clients

Please refer to the following guides for step-by-step instructions on setting up and using this MCP server with your preferred client:

Telemetry

This MCP server collects usage data to help make improvements. You can opt out by setting DO_NOT_TRACK=true in your environment. See telemetry.md for full details on what is collected.

Troubleshooting

“Node.js version not supported” — This project requires Node.js 22 or later. Check your version with node -v and upgrade if needed.

Tools not appearing — Each tool requires specific service blocks in your config.yaml. Run --list-tools to see which tools are active, or invoke the explain-disabled-tools MCP tool from your client for a per-tool reason. The block-to-tool mapping lives in CONFIGURATION.md.

Authentication errors on HTTP/SSE — Generate an API key with npx @confluentinc/mcp-confluent --generate-key and add it to your config.yaml under server.auth.api_key. See CONFIGURATION.md → HTTP/SSE transport security.

Connection refused / port conflicts — The default HTTP port is 8080. Set server.http.port in your config.yaml to change it.

Tableflow authorization errors — Tableflow tools require specific IAM permissions in your cloud environment. See Prerequisites & setup for Tableflow commands.
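On the HTTP/SSE authentication item above: `--generate-key` is the supported way to create a key. If you need to mint one in your own tooling, a cryptographically random value is the usual approach. The sketch below is a generic example; the key length and hex encoding are assumptions and may not match what the CLI prints.

```typescript
import { randomBytes } from "node:crypto";

// Generates a random key from a CSPRNG, hex-encoded.
// Assumption: 32 bytes (64 hex characters); the CLI's own format may differ.
function generateApiKey(byteLength = 32): string {
  return randomBytes(byteLength).toString("hex");
}

console.log(generateApiKey());
```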

Contributing

Bug reports and feedback are appreciated in the form of GitHub Issues. For guidelines on contributing, please see CONTRIBUTING.md.

Pre-release testing

To run the MCP server against a pre-release version for beta testing or early feedback, download the release tarball to a local directory. Then, when running any of the npx commands above, replace @confluentinc/mcp-confluent with the path to that tarball, e.g. npx ~/path/to/my/tarball --list-tools
