Skip to content
Open
14 changes: 6 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ An [MCP](https://modelcontextprotocol.io/) server implementation of Couchbase th

- Get a list of all the scopes and collections in the specified bucket
- Get the structure for a collection
- Get a document by ID from a specified scope and collection
- Upsert a document by ID to a specified scope and collection
- Delete a document by ID from a specified scope and collection
- Run a [SQL++ query](https://www.couchbase.com/sqlplusplus/) on a specified scope
- Get a document by ID from a specified bucket, scope and collection
- Upsert a document by ID to a specified bucket, scope and collection
- Delete a document by ID from a specified bucket, scope and collection
- Run a [SQL++ query](https://www.couchbase.com/sqlplusplus/) on a specified bucket and scope
- There is an option in the MCP server, `READ_ONLY_QUERY_MODE` that is set to true by default to disable running SQL++ queries that change the data or the underlying collection structure. Note that the documents can still be updated by ID.
- Retreive Index Advisor advice for a query on a specified bucket and scope.

## Prerequisites

Expand Down Expand Up @@ -46,7 +47,6 @@ This is the common configuration for the MCP clients such as Claude Desktop, Cur
"CB_CONNECTION_STRING": "couchbases://connection-string",
"CB_USERNAME": "username",
"CB_PASSWORD": "password",
"CB_BUCKET_NAME": "bucket_name"
}
}
}
Expand All @@ -58,7 +58,6 @@ The server can be configured using environment variables. The following variable
- `CB_CONNECTION_STRING`: The connection string to the Couchbase cluster
- `CB_USERNAME`: The username with access to the bucket to use to connect
- `CB_PASSWORD`: The password for the username to connect
- `CB_BUCKET_NAME`: The name of the bucket that the server will access
- `READ_ONLY_QUERY_MODE`: Setting to configure whether SQL++ queries that allow data to be modified are allowed. It is set to True by default.
- `path/to/cloned/repo/mcp-server-couchbase/` should be the path to the cloned repository on your local machine. Don't forget the trailing slash at the end!

Expand Down Expand Up @@ -138,7 +137,7 @@ There is an option to run the MCP server in [Server-Sent Events (SSE)](https://m

By default, the MCP server will run on port 8080 but this can be configured using the `FASTMCP_PORT` environment variable.

> uv run src/mcp_server.py --connection-string='<couchbase_connection_string>' --username='<database_username>' --password='<database_password>' --bucket-name='<couchbase_bucket_to_use>' --read-only-query-mode=true --transport=sse
> uv run src/mcp_server.py --connection-string='<couchbase_connection_string>' --username='<database_username>' --password='<database_password>' --read-only-query-mode=true --transport=sse

The server will be available on http://localhost:8080/sse. This can be used in MCP clients supporting SSE transport mode.

Expand All @@ -159,7 +158,6 @@ docker run -i \
-e CB_CONNECTION_STRING='<couchbase_connection_string>' \
-e CB_USERNAME='<database_user>' \
-e CB_PASSWORD='<database_password>' \
-e CB_BUCKET_NAME='<bucket_name>' \
-e MCP_TRANSPORT='stdio/sse' \
-e READ_ONLY_QUERY_MODE="true/false" \
mcp/couchbase
Expand Down
105 changes: 73 additions & 32 deletions src/mcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ class AppContext:
"""Context for the MCP server."""

cluster: Cluster | None = None
bucket: Any | None = None
read_only_query_mode: bool = True


Expand Down Expand Up @@ -64,12 +63,7 @@ def get_settings() -> dict:
help="Couchbase database password",
callback=validate_required_param,
)
@click.option(
"--bucket-name",
envvar="CB_BUCKET_NAME",
help="Couchbase bucket name",
callback=validate_required_param,
)

@click.option(
"--read-only-query-mode",
envvar="READ_ONLY_QUERY_MODE",
Expand All @@ -90,7 +84,6 @@ def main(
connection_string,
username,
password,
bucket_name,
read_only_query_mode,
transport,
):
Expand All @@ -99,7 +92,6 @@ def main(
"connection_string": connection_string,
"username": username,
"password": password,
"bucket_name": bucket_name,
"read_only_query_mode": read_only_query_mode,
}
mcp.run(transport=transport)
Expand All @@ -114,7 +106,6 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
connection_string = settings.get("connection_string")
username = settings.get("username")
password = settings.get("password")
bucket_name = settings.get("bucket_name")
read_only_query_mode = settings.get("read_only_query_mode")

# Validate configuration
Expand All @@ -128,9 +119,7 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
if not password:
logger.error("Couchbase database password is not set")
missing_vars.append("password")
if not bucket_name:
logger.error("Couchbase bucket name is not set")
missing_vars.append("bucket_name")


if missing_vars:
error_msg = f"Missing required configuration: {', '.join(missing_vars)}"
Expand All @@ -148,9 +137,9 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
cluster.wait_until_ready(timedelta(seconds=5))
logger.info("Successfully connected to Couchbase cluster")

bucket = cluster.bucket(bucket_name)
yield AppContext(
cluster=cluster, bucket=bucket, read_only_query_mode=read_only_query_mode
cluster=cluster,
read_only_query_mode=read_only_query_mode
)

except Exception as e:
Expand All @@ -164,11 +153,17 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:

# Tools
@mcp.tool()
def get_scopes_and_collections_in_bucket(ctx: Context) -> dict[str, list[str]]:
"""Get the names of all scopes and collections in the bucket.
def get_scopes_and_collections_in_bucket(ctx: Context, bucket_name: str) -> dict[str, list[str]]:
"""Get the names of all scopes and collections for a specified bucket.
Returns a dictionary with scope names as keys and lists of collection names as values.
"""
bucket = ctx.request_context.lifespan_context.bucket
cluster = ctx.request_context.lifespan_context.cluster

try:
bucket = cluster.bucket(bucket_name)
except Exception as e:
logger.error(f"Error accessing bucket: {e}")
raise ValueError("Tool does not have access to bucket, or bucket does not exist.")
try:
scopes_collections = {}
collection_manager = bucket.collections()
Expand All @@ -184,14 +179,14 @@ def get_scopes_and_collections_in_bucket(ctx: Context) -> dict[str, list[str]]:

@mcp.tool()
def get_schema_for_collection(
ctx: Context, scope_name: str, collection_name: str
ctx: Context, bucket_name: str, scope_name: str, collection_name: str
) -> dict[str, Any]:
"""Get the schema for a collection in the specified scope.
"""Get the schema for a collection in the specified scope of a specified bucket.
Returns a dictionary with the schema returned by running INFER on the Couchbase collection.
"""
try:
query = f"INFER {collection_name}"
result = run_sql_plus_plus_query(ctx, scope_name, query)
result = run_sql_plus_plus_query(ctx, bucket_name, scope_name, query)
return result
except Exception as e:
logger.error(f"Error getting schema: {e}")
Expand All @@ -200,10 +195,15 @@ def get_schema_for_collection(

@mcp.tool()
def get_document_by_id(
ctx: Context, scope_name: str, collection_name: str, document_id: str
ctx: Context, bucket_name: str, scope_name: str, collection_name: str, document_id: str
) -> dict[str, Any]:
"""Get a document by its ID from the specified scope and collection."""
bucket = ctx.request_context.lifespan_context.bucket
"""Get a document by its ID from the specified bucket, scope and collection."""
cluster = ctx.request_context.lifespan_context.cluster
try:
bucket = cluster.bucket(bucket_name)
except Exception as e:
logger.error(f"Error accessing bucket: {e}")
raise ValueError("Tool does not have access to bucket, or bucket does not exist.")
try:
collection = bucket.scope(scope_name).collection(collection_name)
result = collection.get(document_id)
Expand All @@ -216,14 +216,20 @@ def get_document_by_id(
@mcp.tool()
def upsert_document_by_id(
ctx: Context,
bucket_name: str,
scope_name: str,
collection_name: str,
document_id: str,
document_content: dict[str, Any],
) -> bool:
"""Insert or update a document by its ID.
"""Insert or update a document in a bucket, scope and collection by its ID.
Returns True on success, False on failure."""
bucket = ctx.request_context.lifespan_context.bucket
cluster = ctx.request_context.lifespan_context.cluster
try:
bucket = cluster.bucket(bucket_name)
except Exception as e:
logger.error(f"Error accessing bucket: {e}")
raise ValueError("Tool does not have access to bucket, or bucket does not exist.")
try:
collection = bucket.scope(scope_name).collection(collection_name)
collection.upsert(document_id, document_content)
Expand All @@ -236,11 +242,16 @@ def upsert_document_by_id(

@mcp.tool()
def delete_document_by_id(
ctx: Context, scope_name: str, collection_name: str, document_id: str
ctx: Context, bucket_name: str, scope_name: str, collection_name: str, document_id: str
) -> bool:
"""Delete a document by its ID.
"""Delete a document in a bucket, scope and collection by its ID.
Returns True on success, False on failure."""
bucket = ctx.request_context.lifespan_context.bucket
cluster = ctx.request_context.lifespan_context.cluster
try:
bucket = cluster.bucket(bucket_name)
except Exception as e:
logger.error(f"Error accessing bucket: {e}")
raise ValueError("Tool does not have access to bucket, or bucket does not exist.")
try:
collection = bucket.scope(scope_name).collection(collection_name)
collection.remove(document_id)
Expand All @@ -250,13 +261,43 @@ def delete_document_by_id(
logger.error(f"Error deleting document {document_id}: {e}")
return False

@mcp.tool()
def advise_index_for_sql_plus_plus_query(
ctx: Context, bucket_name: str, scope_name: str, query: str
) -> dict[str, Any]:
"""Get an index recommendation from the SQL++ index advisor for a specified query on a specified bucket and scope.
Returns a dictionary with the query advised on, as well as:
1. an array of the current indexes used and their status (or a string indicating no existing indexes available)
2. an array of recommended indexes and/or covering indexes with reasoning (or a string indicating no possible index improvements)
"""
response = {}

try:
query = f"ADVISE {query}"
result = run_sql_plus_plus_query(ctx, bucket_name, scope_name, query)
advice = result[0].get("advice")
if (advice is not None):
advise_info = advice.get("adviseinfo")
if ( advise_info is not None):
response["current_indexes"] = advise_info.get("current_indexes", "No current indexes")
response["recommended_indexes"] = advise_info.get("recommended_indexes","No index recommendations available")
response["query"]=result[0].get("query","Query statement unavailable")
return response
except Exception as e:
logger.error(f"Error running Advise on query: {e}")
raise ValueError(f"Unable to run ADVISE on: {query} for keyspace {bucket_name}.{scope_name}")

@mcp.tool()
def run_sql_plus_plus_query(
ctx: Context, scope_name: str, query: str
ctx: Context, bucket_name: str, scope_name: str, query: str
) -> list[dict[str, Any]]:
"""Run a SQL++ query on a scope and return the results as a list of JSON objects."""
bucket = ctx.request_context.lifespan_context.bucket
"""Run a SQL++ query on a scope in a specified bucket and return the results as a list of JSON objects."""
cluster = ctx.request_context.lifespan_context.cluster
try:
bucket = cluster.bucket(bucket_name)
except Exception as e:
logger.error(f"Error accessing bucket: {e}")
raise ValueError("Tool does not have access to bucket, or bucket does not exist.")
read_only_query_mode = ctx.request_context.lifespan_context.read_only_query_mode
logger.info(f"Running SQL++ queries in read-only mode: {read_only_query_mode}")

Expand Down