Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 7 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ An [MCP](https://modelcontextprotocol.io/) server implementation of Couchbase th

## Features

- Get a list of all the buckets in the cluster
- Get a list of all the scopes and collections in the specified bucket
- Get a list of all the scopes in the specified bucket
- Get a list of all the collections in a specified scope and bucket
- Get the structure for a collection
- Get a document by ID from a specified scope and collection
- Upsert a document by ID to a specified scope and collection
Expand Down Expand Up @@ -48,8 +51,7 @@ We publish a pre built [PyPI package](https://pypi.org/project/couchbase-mcp-ser
"env": {
"CB_CONNECTION_STRING": "couchbases://connection-string",
"CB_USERNAME": "username",
"CB_PASSWORD": "password",
"CB_BUCKET_NAME": "bucket_name"
"CB_PASSWORD": "password"
}
}
}
Expand Down Expand Up @@ -86,8 +88,7 @@ This is the common configuration for the MCP clients such as Claude Desktop, Cur
"env": {
"CB_CONNECTION_STRING": "couchbases://connection-string",
"CB_USERNAME": "username",
"CB_PASSWORD": "password",
"CB_BUCKET_NAME": "bucket_name"
"CB_PASSWORD": "password"
}
}
}
Expand All @@ -105,9 +106,8 @@ The server can be configured using environment variables or command line argumen
| Environment Variable | CLI Argument | Description | Default |
| ----------------------------- | ------------------------ | ------------------------------------------ | ------------ |
| `CB_CONNECTION_STRING` | `--connection-string` | Connection string to the Couchbase cluster | **Required** |
| `CB_USERNAME` | `--username` | Username with bucket access | **Required** |
| `CB_USERNAME` | `--username` | Username with access to required buckets | **Required** |
| `CB_PASSWORD` | `--password` | Password for authentication | **Required** |
| `CB_BUCKET_NAME` | `--bucket-name` | Name of the bucket to access | **Required** |
| `CB_MCP_READ_ONLY_QUERY_MODE` | `--read-only-query-mode` | Prevent data modification queries | `true` |
| `CB_MCP_TRANSPORT` | `--transport` | Transport mode: `stdio`, `http`, `sse` | `stdio` |
| `CB_MCP_HOST` | `--host` | Host for HTTP/SSE transport modes | `127.0.0.1` |
Expand Down Expand Up @@ -210,7 +210,6 @@ uvx couchbase-mcp-server \
--connection-string='<couchbase_connection_string>' \
--username='<database_username>' \
--password='<database_password>' \
--bucket-name='<couchbase_bucket_to_use>' \
--read-only-query-mode=true \
--transport=http
```
Expand Down Expand Up @@ -244,7 +243,6 @@ uvx couchbase-mcp-server \
--connection-string='<couchbase_connection_string>' \
--username='<database_username>' \
--password='<database_password>' \
--bucket-name='<couchbase_bucket_to_use>' \
--read-only-query-mode=true \
--transport=sse
```
Expand Down Expand Up @@ -321,7 +319,6 @@ docker run --rm -i \
-e CB_CONNECTION_STRING='<couchbase_connection_string>' \
-e CB_USERNAME='<database_user>' \
-e CB_PASSWORD='<database_password>' \
-e CB_BUCKET_NAME='<bucket_name>' \
-e CB_MCP_TRANSPORT='<http|sse|stdio>' \
-e CB_MCP_READ_ONLY_QUERY_MODE='<true|false>' \
-e CB_MCP_PORT=9001 \
Expand Down Expand Up @@ -350,8 +347,6 @@ The Docker image can be used in `stdio` transport mode with the following config
"CB_USERNAME=<database_user>",
"-e",
"CB_PASSWORD=<database_password>",
"-e",
"CB_BUCKET_NAME=<bucket_name>",
"mcp/couchbase"
]
}
Expand All @@ -377,7 +372,7 @@ The Couchbase MCP server can also be used as a managed server in your agentic ap
## Troubleshooting Tips

- Ensure the path to your MCP server repository is correct in the configuration if running from source.
- Verify that your Couchbase connection string, database username, password and bucket name are correct.
- Verify that your Couchbase connection string, database username, password are correct.
- If using Couchbase Capella, ensure that the cluster is [accessible](https://docs.couchbase.com/cloud/clusters/allow-ip-address.html) from the machine where the MCP server is running.
- Check that the database user has proper permissions to access the specified bucket.
- Confirm that the `uv` package manager is properly installed and accessible. You may need to provide absolute path to `uv`/`uvx` in the `command` field in the configuration.
Expand Down
7 changes: 0 additions & 7 deletions src/mcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,6 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
envvar="CB_PASSWORD",
help="Couchbase database password (required for operations)",
)
@click.option(
"--bucket-name",
envvar="CB_BUCKET_NAME",
help="Couchbase bucket name (required for operations)",
)
@click.option(
"--read-only-query-mode",
envvar=[
Expand Down Expand Up @@ -121,7 +116,6 @@ def main(
connection_string,
username,
password,
bucket_name,
read_only_query_mode,
transport,
host,
Expand All @@ -133,7 +127,6 @@ def main(
"connection_string": connection_string,
"username": username,
"password": password,
"bucket_name": bucket_name,
"read_only_query_mode": read_only_query_mode,
"transport": transport,
"host": host,
Expand Down
9 changes: 9 additions & 0 deletions src/tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,22 @@

# Server tools
from .server import (
get_buckets_in_cluster,
get_collections_in_scope,
get_scopes_and_collections_in_bucket,
get_scopes_in_bucket,
get_server_configuration_status,
test_cluster_connection,
)

# List of all tools for easy registration
ALL_TOOLS = [
get_buckets_in_cluster,
get_server_configuration_status,
test_cluster_connection,
get_scopes_and_collections_in_bucket,
get_collections_in_scope,
get_scopes_in_bucket,
get_document_by_id,
upsert_document_by_id,
delete_document_by_id,
Expand All @@ -41,6 +47,9 @@
"get_server_configuration_status",
"test_cluster_connection",
"get_scopes_and_collections_in_bucket",
"get_collections_in_scope",
"get_scopes_in_bucket",
"get_buckets_in_cluster",
"get_document_by_id",
"upsert_document_by_id",
"delete_document_by_id",
Expand Down
26 changes: 20 additions & 6 deletions src/tools/kv.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,25 @@

from mcp.server.fastmcp import Context

from utils.connection import connect_to_bucket
from utils.constants import MCP_SERVER_NAME
from utils.context import ensure_bucket_connection
from utils.context import get_cluster_connection

logger = logging.getLogger(f"{MCP_SERVER_NAME}.tools.kv")


def get_document_by_id(
ctx: Context, scope_name: str, collection_name: str, document_id: str
ctx: Context,
bucket_name: str,
scope_name: str,
collection_name: str,
document_id: str,
) -> dict[str, Any]:
"""Get a document by its ID from the specified scope and collection.
If the document is not found, it will raise an exception."""
bucket = ensure_bucket_connection(ctx)

cluster = get_cluster_connection(ctx)
bucket = connect_to_bucket(cluster, bucket_name)
try:
collection = bucket.scope(scope_name).collection(collection_name)
result = collection.get(document_id)
Expand All @@ -32,14 +39,16 @@ def get_document_by_id(

def upsert_document_by_id(
ctx: Context,
bucket_name: str,
scope_name: str,
collection_name: str,
document_id: str,
document_content: dict[str, Any],
) -> bool:
"""Insert or update a document by its ID.
Returns True on success, False on failure."""
bucket = ensure_bucket_connection(ctx)
cluster = get_cluster_connection(ctx)
bucket = connect_to_bucket(cluster, bucket_name)
try:
collection = bucket.scope(scope_name).collection(collection_name)
collection.upsert(document_id, document_content)
Expand All @@ -51,11 +60,16 @@ def upsert_document_by_id(


def delete_document_by_id(
ctx: Context, scope_name: str, collection_name: str, document_id: str
ctx: Context,
bucket_name: str,
scope_name: str,
collection_name: str,
document_id: str,
) -> bool:
"""Delete a document by its ID.
Returns True on success, False on failure."""
bucket = ensure_bucket_connection(ctx)
cluster = get_cluster_connection(ctx)
bucket = connect_to_bucket(cluster, bucket_name)
try:
collection = bucket.scope(scope_name).collection(collection_name)
collection.remove(document_id)
Expand Down
13 changes: 8 additions & 5 deletions src/tools/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,23 @@
from lark_sqlpp import modifies_data, modifies_structure, parse_sqlpp
from mcp.server.fastmcp import Context

from utils.connection import connect_to_bucket
from utils.constants import MCP_SERVER_NAME
from utils.context import ensure_bucket_connection
from utils.context import get_cluster_connection

logger = logging.getLogger(f"{MCP_SERVER_NAME}.tools.query")


def get_schema_for_collection(
ctx: Context, scope_name: str, collection_name: str
ctx: Context, bucket_name: str, scope_name: str, collection_name: str
) -> dict[str, Any]:
"""Get the schema for a collection in the specified scope.
Returns a dictionary with the collection name and the schema returned by running INFER query on the Couchbase collection.
"""
schema = {"collection_name": collection_name, "schema": []}
try:
query = f"INFER {collection_name}"
result = run_sql_plus_plus_query(ctx, scope_name, query)
result = run_sql_plus_plus_query(ctx, bucket_name, scope_name, query)
# Result is a list of list of schemas. We convert it to a list of schemas.
if result:
schema["schema"] = result[0]
Expand All @@ -36,10 +37,12 @@ def get_schema_for_collection(


def run_sql_plus_plus_query(
ctx: Context, scope_name: str, query: str
ctx: Context, bucket_name: str, scope_name: str, query: str
) -> list[dict[str, Any]]:
"""Run a SQL++ query on a scope and return the results as a list of JSON objects."""
bucket = ensure_bucket_connection(ctx)
cluster = get_cluster_connection(ctx)
bucket = connect_to_bucket(cluster, bucket_name)

app_context = ctx.request_context.lifespan_context
read_only_query_mode = app_context.read_only_query_mode
logger.info(f"Running SQL++ queries in read-only mode: {read_only_query_mode}")
Expand Down
71 changes: 55 additions & 16 deletions src/tools/server.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""
Tools for server operations.
This module contains tools for getting the server status, testing the connection, and getting the scopes and collections in the bucket.
This module contains tools for getting the server status, testing the connection, and getting the buckets in the cluster, the scopes and collections in the bucket.
"""

import logging
Expand All @@ -10,8 +10,9 @@
from mcp.server.fastmcp import Context

from utils.config import get_settings
from utils.connection import connect_to_bucket
from utils.constants import MCP_SERVER_NAME
from utils.context import ensure_bucket_connection
from utils.context import get_cluster_connection

logger = logging.getLogger(f"{MCP_SERVER_NAME}.tools.server")

Expand All @@ -26,15 +27,13 @@ def get_server_configuration_status(ctx: Context) -> dict[str, Any]:
configuration = {
"connection_string": settings.get("connection_string", "Not set"),
"username": settings.get("username", "Not set"),
"bucket_name": settings.get("bucket_name", "Not set"),
"read_only_query_mode": settings.get("read_only_query_mode", True),
"password_configured": bool(settings.get("password")),
}

app_context = ctx.request_context.lifespan_context
connection_status = {
"cluster_connected": app_context.cluster is not None,
"bucket_connected": app_context.bucket is not None,
}

return {
Expand All @@ -45,39 +44,43 @@ def get_server_configuration_status(ctx: Context) -> dict[str, Any]:
}


def test_cluster_connection(ctx: Context) -> dict[str, Any]:
"""Test the connection to Couchbase cluster and bucket.
def test_cluster_connection(
ctx: Context, bucket_name: str | None = None
) -> dict[str, Any]:
"""Test the connection to Couchbase cluster and optionally to a bucket.
This tool verifies the connection to the Couchbase cluster and bucket by establishing the connection if it is not already established.
Returns connection status and basic cluster information.
"""
try:
bucket = ensure_bucket_connection(ctx)

# Test basic connectivity by getting bucket name
bucket_name = bucket.name
cluster = get_cluster_connection(ctx)
bucket = connect_to_bucket(cluster, bucket_name) if bucket_name else None
Copy link
Preview

Copilot AI Aug 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function attempts to connect to a bucket when bucket_name is provided, but there's no validation that bucket_name is not an empty string. An empty string would pass the truthiness check but would likely cause connection issues.

Suggested change
bucket = connect_to_bucket(cluster, bucket_name) if bucket_name else None
bucket = connect_to_bucket(cluster, bucket_name) if bucket_name and bucket_name.strip() else None

Copilot uses AI. Check for mistakes.


return {
"status": "success",
"cluster_connected": True,
"bucket_connected": True,
"bucket_name": bucket_name,
"message": "Successfully connected to Couchbase cluster and bucket",
"cluster_connected": cluster.connected,
"bucket_connected": bucket is not None,
"bucket_name": bucket.name if bucket else None,
"message": "Successfully connected to Couchbase cluster",
}
except Exception as e:
return {
"status": "error",
"cluster_connected": False,
"bucket_connected": False,
"bucket_name": None,
"error": str(e),
"message": "Failed to connect to Couchbase",
}


def get_scopes_and_collections_in_bucket(ctx: Context) -> dict[str, list[str]]:
def get_scopes_and_collections_in_bucket(
ctx: Context, bucket_name: str
) -> dict[str, list[str]]:
"""Get the names of all scopes and collections in the bucket.
Returns a dictionary with scope names as keys and lists of collection names as values.
"""
bucket = ensure_bucket_connection(ctx)
cluster = get_cluster_connection(ctx)
bucket = connect_to_bucket(cluster, bucket_name)
try:
scopes_collections = {}
collection_manager = bucket.collections()
Expand All @@ -89,3 +92,39 @@ def get_scopes_and_collections_in_bucket(ctx: Context) -> dict[str, list[str]]:
except Exception as e:
logger.error(f"Error getting scopes and collections: {e}")
raise


def get_buckets_in_cluster(ctx: Context) -> list[str]:
"""Get the names of all the accessible buckets in the cluster."""
cluster = get_cluster_connection(ctx)
bucket_manager = cluster.buckets()
buckets_with_settings = bucket_manager.get_all_buckets()

buckets = []
for bucket in buckets_with_settings:
buckets.append(bucket.name)

return buckets


def get_scopes_in_bucket(ctx: Context, bucket_name: str) -> list[str]:
"""Get the names of all scopes in the given bucket."""
cluster = get_cluster_connection(ctx)
bucket = connect_to_bucket(cluster, bucket_name)
try:
scopes = bucket.collections().get_all_scopes()
return [scope.name for scope in scopes]
except Exception as e:
logger.error(f"Error getting scopes and collections: {e}")
raise


def get_collections_in_scope(
ctx: Context, bucket_name: str, scope_name: str
) -> list[str]:
"""Get the names of all collections in the given scope and bucket."""
scopes_and_collections = get_scopes_and_collections_in_bucket(ctx, bucket_name)
if scope_name not in scopes_and_collections:
logger.error(f"Scope {scope_name} not found in bucket {bucket_name}")
raise ValueError(f"Scope {scope_name} not found in bucket {bucket_name}")
return scopes_and_collections[scope_name]
Copy link
Preview

Copilot AI Aug 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function calls get_scopes_and_collections_in_bucket which retrieves all scopes and collections, but only returns collections for a specific scope. This is inefficient as it fetches unnecessary data.

Suggested change
return scopes_and_collections[scope_name]
cluster = get_cluster_connection(ctx)
bucket = connect_to_bucket(cluster, bucket_name)
try:
scopes = bucket.collections().get_all_scopes()
for scope in scopes:
if scope.name == scope_name:
return [c.name for c in scope.collections]
logger.error(f"Scope {scope_name} not found in bucket {bucket_name}")
raise ValueError(f"Scope {scope_name} not found in bucket {bucket_name}")
except Exception as e:
logger.error(f"Error getting collections in scope: {e}")
raise

Copilot uses AI. Check for mistakes.

Loading