Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 36 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ An [MCP](https://modelcontextprotocol.io/) server implementation of Couchbase th

- Get a list of all the scopes and collections in the specified bucket
- Get the structure for a collection
- Get a document by ID from a specified scope and collection
- Upsert a document by ID to a specified scope and collection
- Delete a document by ID from a specified scope and collection
- Run a [SQL++ query](https://www.couchbase.com/sqlplusplus/) on a specified scope
- Get a document by ID from a specified bucket, scope and collection
- Upsert a document by ID to a specified bucket, scope and collection
- Delete a document by ID from a specified bucket, scope and collection
- Run a [SQL++ query](https://www.couchbase.com/sqlplusplus/) on a specified bucket and scope
- There is an option in the MCP server, `READ_ONLY_QUERY_MODE` that is set to true by default to disable running SQL++ queries that change the data or the underlying collection structure. Note that the documents can still be updated by ID.
- Retreive Index Advisor advice for a query on a specified bucket and scope.


## Prerequisites

Expand All @@ -30,7 +32,7 @@ git clone https://github.com/Couchbase-Ecosystem/mcp-server-couchbase.git
### Server Configuration for MCP Clients

This is the common configuration for the MCP clients such as Claude Desktop, Cursor, Windsurf Editor.

Using Basic Auth:
```json
{
"mcpServers": {
Expand All @@ -46,7 +48,30 @@ This is the common configuration for the MCP clients such as Claude Desktop, Cur
"CB_CONNECTION_STRING": "couchbases://connection-string",
"CB_USERNAME": "username",
"CB_PASSWORD": "password",
"CB_BUCKET_NAME": "bucket_name"
"CA_CERT_PATH" : "path/to/ca.crt"
}
}
}
}
```

Using mTLS:

```json
{
"mcpServers": {
"couchbase": {
"command": "uv",
"args": [
"--directory",
"path/to/cloned/repo/mcp-server-couchbase/",
"run",
"src/mcp_server.py"
],
"env": {
"CB_CONNECTION_STRING": "couchbases://connection-string",
"CLIENT_CERT_PATH": "path/to/client/cert_and_key/",
"CA_CERT_PATH" : "path/to/ca.crt"
}
}
}
Expand All @@ -56,9 +81,10 @@ This is the common configuration for the MCP clients such as Claude Desktop, Cur
The server can be configured using environment variables. The following variables are supported:

- `CB_CONNECTION_STRING`: The connection string to the Couchbase cluster
- `CB_USERNAME`: The username with access to the bucket to use to connect
- `CB_PASSWORD`: The password for the username to connect
- `CB_BUCKET_NAME`: The name of the bucket that the server will access
- `CB_USERNAME`: The username with access to the bucket to use to connect. Must be set if using Basic Auth and unset if using mTLS.
- `CB_PASSWORD`: The password for the username to connect. Must be set if using Basic Auth and unset if using mTLS with client certificate.
- `CLIENT_CERT_PATH`: The path to client certificate (named client.pem) and key (named client.key) for mTLS authentication. Must be set if using mTLS and unset if using Basic Auth with username and password.
- `CA_CERT_PATH`: The path to the CA certificate for trusted TLS connection to the server. If not provided, locally trusted certificate store will be used.
- `READ_ONLY_QUERY_MODE`: Setting to configure whether SQL++ queries that allow data to be modified are allowed. It is set to True by default.
- `path/to/cloned/repo/mcp-server-couchbase/` should be the path to the cloned repository on your local machine. Don't forget the trailing slash at the end!

Expand Down Expand Up @@ -138,7 +164,7 @@ There is an option to run the MCP server in [Server-Sent Events (SSE)](https://m

By default, the MCP server will run on port 8080 but this can be configured using the `FASTMCP_PORT` environment variable.

> uv run src/mcp_server.py --connection-string='<couchbase_connection_string>' --username='<database_username>' --password='<database_password>' --bucket-name='<couchbase_bucket_to_use>' --read-only-query-mode=true --transport=sse
> uv run src/mcp_server.py --connection-string='<couchbase_connection_string>' --username='<database_username>' --password='<database_password>' --read-only-query-mode=true --transport=sse

The server will be available on http://localhost:8080/sse. This can be used in MCP clients supporting SSE transport mode.

Expand All @@ -159,7 +185,6 @@ docker run -i \
-e CB_CONNECTION_STRING='<couchbase_connection_string>' \
-e CB_USERNAME='<database_user>' \
-e CB_PASSWORD='<database_password>' \
-e CB_BUCKET_NAME='<bucket_name>' \
-e MCP_TRANSPORT='stdio/sse' \
-e READ_ONLY_QUERY_MODE="true/false" \
mcp/couchbase
Expand Down
33 changes: 23 additions & 10 deletions src/mcp_server.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
"""
Couchbase MCP Server
"""

import os
import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

import click
from mcp.server.fastmcp import FastMCP
from utils.config import validate_authentication_method

# Import tools
from tools import ALL_TOOLS
Expand Down Expand Up @@ -73,10 +70,19 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
help="Couchbase database password (required for operations)",
)
@click.option(
"--bucket-name",
envvar="CB_BUCKET_NAME",
help="Couchbase bucket name (required for operations)",
'--ca-cert-path',
envvar="CA_CERT_PATH",
type=click.Path(exists=True),
default=None,
help='Path to Server TLS certificate, required for secure connections.')

@click.option(
"--client-cert-path",
envvar="CLIENT_CERT_PATH",
default=None,
help="Path to client.key and client.pem files for mtls client authentication",
)

@click.option(
"--read-only-query-mode",
envvar="READ_ONLY_QUERY_MODE",
Expand All @@ -97,20 +103,27 @@ def main(
connection_string,
username,
password,
bucket_name,
read_only_query_mode,
transport,
ca_cert_path,
client_cert_path
):
"""Couchbase MCP Server"""
# Store configuration in context
ctx.obj = {
"connection_string": connection_string,
"username": username,
"password": password,
"bucket_name": bucket_name,
"read_only_query_mode": read_only_query_mode,
"ca_cert_path" : ca_cert_path,
"client_cert_path" : client_cert_path
}

try:
validate_authentication_method(ctx.obj)
except Exception as e:
logger.error(f"Failed to validate auth method params: {e}")
raise
# Create MCP server inside main()
mcp = FastMCP(MCP_SERVER_NAME, lifespan=app_lifespan)

Expand Down
6 changes: 6 additions & 0 deletions src/tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,43 @@
from .query import (
get_schema_for_collection,
run_sql_plus_plus_query,
advise_index_for_sql_plus_plus_query,
)

# Server tools
from .server import (
get_scopes_and_collections_in_bucket,
get_server_configuration_status,
test_connection,
get_list_of_buckets_with_settings,
)

# List of all tools for easy registration
ALL_TOOLS = [
get_server_configuration_status,
test_connection,
get_list_of_buckets_with_settings,
get_scopes_and_collections_in_bucket,
get_document_by_id,
upsert_document_by_id,
delete_document_by_id,
get_schema_for_collection,
run_sql_plus_plus_query,
advise_index_for_sql_plus_plus_query,
]

__all__ = [
# Individual tools
"get_server_configuration_status",
"test_connection",
"get_list_of_buckets_with_settings",
"get_scopes_and_collections_in_bucket",
"get_document_by_id",
"upsert_document_by_id",
"delete_document_by_id",
"get_schema_for_collection",
"run_sql_plus_plus_query",
"advise_index_for_sql_plus_plus_query",
# Convenience
"ALL_TOOLS",
]
32 changes: 22 additions & 10 deletions src/tools/kv.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,20 @@
from mcp.server.fastmcp import Context

from utils.constants import MCP_SERVER_NAME
from utils.context import ensure_bucket_connection
from utils.context import ensure_cluster_connection, ensure_bucket_connection

logger = logging.getLogger(f"{MCP_SERVER_NAME}.tools.kv")


def get_document_by_id(
ctx: Context, scope_name: str, collection_name: str, document_id: str
ctx: Context, bucket_name: str, scope_name: str, collection_name: str, document_id: str
) -> dict[str, Any]:
"""Get a document by its ID from the specified scope and collection.
If the document is not found, it will raise an exception."""
bucket = ensure_bucket_connection(ctx)
"""Get a document by its ID from the specified bucket, scope and collection."""
try:
bucket = ensure_bucket_connection(ctx, bucket_name)
except Exception as e:
logger.error(f"Error accessing bucket: {e}")
raise ValueError("Tool does not have access to bucket, or bucket does not exist.") from e
try:
collection = bucket.scope(scope_name).collection(collection_name)
result = collection.get(document_id)
Expand All @@ -32,14 +35,19 @@ def get_document_by_id(

def upsert_document_by_id(
ctx: Context,
bucket_name: str,
scope_name: str,
collection_name: str,
document_id: str,
document_content: dict[str, Any],
) -> bool:
"""Insert or update a document by its ID.
"""Insert or update a document in a bucket, scope and collection by its ID.
Returns True on success, False on failure."""
bucket = ensure_bucket_connection(ctx)
try:
bucket = ensure_bucket_connection(ctx, bucket_name)
except Exception as e:
logger.error(f"Error accessing bucket: {e}")
raise ValueError("Tool does not have access to bucket, or bucket does not exist.") from e
try:
collection = bucket.scope(scope_name).collection(collection_name)
collection.upsert(document_id, document_content)
Expand All @@ -51,11 +59,15 @@ def upsert_document_by_id(


def delete_document_by_id(
ctx: Context, scope_name: str, collection_name: str, document_id: str
ctx: Context, bucket_name: str, scope_name: str, collection_name: str, document_id: str
) -> bool:
"""Delete a document by its ID.
"""Delete a document in a bucket, scope and collection by its ID.
Returns True on success, False on failure."""
bucket = ensure_bucket_connection(ctx)
try:
bucket = ensure_bucket_connection(ctx, bucket_name)
except Exception as e:
logger.error(f"Error accessing bucket: {e}")
raise ValueError("Tool does not have access to bucket, or bucket does not exist.") from e
try:
collection = bucket.scope(scope_name).collection(collection_name)
collection.remove(document_id)
Expand Down
39 changes: 34 additions & 5 deletions src/tools/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,31 +11,60 @@
from mcp.server.fastmcp import Context

from utils.constants import MCP_SERVER_NAME
from utils.context import ensure_bucket_connection
from utils.context import ensure_cluster_connection, ensure_bucket_connection

logger = logging.getLogger(f"{MCP_SERVER_NAME}.tools.query")


def get_schema_for_collection(
ctx: Context, scope_name: str, collection_name: str
ctx: Context, bucket_name: str, scope_name: str, collection_name: str
) -> list[dict[str, Any]]:
"""Get the schema for a collection in the specified scope.
Returns a dictionary with the schema returned by running INFER on the Couchbase collection.
"""
try:
query = f"INFER {collection_name}"
result = run_sql_plus_plus_query(ctx, scope_name, query)
result = run_sql_plus_plus_query(ctx, bucket_name, scope_name, query)
return result
except Exception as e:
logger.error(f"Error getting schema: {e}")
raise

def advise_index_for_sql_plus_plus_query(
ctx: Context, bucket_name: str, scope_name: str, query: str
) -> dict[str, Any]:
"""Get an index recommendation from the SQL++ index advisor for a specified query on a specified bucket and scope.
Returns a dictionary with the query advised on, as well as:
1. an array of the current indexes used and their status (or a string indicating no existing indexes available)
2. an array of recommended indexes and/or covering indexes with reasoning (or a string indicating no possible index improvements)
"""
response = {}

try:
advise_query = f"ADVISE {query}"
result = run_sql_plus_plus_query(ctx, bucket_name, scope_name, advise_query)

if result and (advice := result[0].get("advice")):
if (advice is not None):
advise_info = advice.get("adviseinfo")
if ( advise_info is not None):
response["current_indexes"] = advise_info.get("current_indexes", "No current indexes")
response["recommended_indexes"] = advise_info.get("recommended_indexes","No index recommendations available")
response["query"]=result[0].get("query","Query statement unavailable")
return response
except Exception as e:
logger.error(f"Error running Advise on query: {e}")
raise ValueError(f"Unable to run ADVISE on: {query} for keyspace {bucket_name}.{scope_name}") from e

def run_sql_plus_plus_query(
ctx: Context, scope_name: str, query: str
ctx: Context, bucket_name: str, scope_name: str, query: str
) -> list[dict[str, Any]]:
"""Run a SQL++ query on a scope and return the results as a list of JSON objects."""
bucket = ensure_bucket_connection(ctx)
try:
bucket = ensure_bucket_connection(ctx, bucket_name)
except Exception as e:
logger.error(f"Error accessing bucket: {e}")
raise ValueError("Tool does not have access to bucket, or bucket does not exist.") from e
app_context = ctx.request_context.lifespan_context
read_only_query_mode = app_context.read_only_query_mode
logger.info(f"Running SQL++ queries in read-only mode: {read_only_query_mode}")
Expand Down
Loading