From b3abd77bbd05702f8c99d32135c828a06047dc11 Mon Sep 17 00:00:00 2001 From: Eyal Nussbaum Date: Tue, 15 Jul 2025 00:58:06 +0300 Subject: [PATCH 1/7] Adding support for multiple buckets. Adding support for index advisor --- src/mcp_server.py | 123 ++++++++++++++++++++++++++++++++++------------ 1 file changed, 91 insertions(+), 32 deletions(-) diff --git a/src/mcp_server.py b/src/mcp_server.py index 52660f3..170034b 100644 --- a/src/mcp_server.py +++ b/src/mcp_server.py @@ -26,7 +26,7 @@ class AppContext: """Context for the MCP server.""" cluster: Cluster | None = None - bucket: Any | None = None + #bucket: Any | None = None read_only_query_mode: bool = True @@ -64,12 +64,12 @@ def get_settings() -> dict: help="Couchbase database password", callback=validate_required_param, ) -@click.option( - "--bucket-name", - envvar="CB_BUCKET_NAME", - help="Couchbase bucket name", - callback=validate_required_param, -) +#@click.option( +# "--bucket-name", +# envvar="CB_BUCKET_NAME", +# help="Couchbase bucket name", +# callback=validate_required_param, +#) @click.option( "--read-only-query-mode", envvar="READ_ONLY_QUERY_MODE", @@ -90,7 +90,7 @@ def main( connection_string, username, password, - bucket_name, + #bucket_name, read_only_query_mode, transport, ): @@ -99,7 +99,7 @@ def main( "connection_string": connection_string, "username": username, "password": password, - "bucket_name": bucket_name, + #"bucket_name": bucket_name, "read_only_query_mode": read_only_query_mode, } mcp.run(transport=transport) @@ -114,7 +114,7 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: connection_string = settings.get("connection_string") username = settings.get("username") password = settings.get("password") - bucket_name = settings.get("bucket_name") + #bucket_name = settings.get("bucket_name") read_only_query_mode = settings.get("read_only_query_mode") # Validate configuration @@ -128,9 +128,9 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: if 
not password: logger.error("Couchbase database password is not set") missing_vars.append("password") - if not bucket_name: - logger.error("Couchbase bucket name is not set") - missing_vars.append("bucket_name") + #if not bucket_name: + # logger.error("Couchbase bucket name is not set") + # missing_vars.append("bucket_name") if missing_vars: error_msg = f"Missing required configuration: {', '.join(missing_vars)}" @@ -148,9 +148,11 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: cluster.wait_until_ready(timedelta(seconds=5)) logger.info("Successfully connected to Couchbase cluster") - bucket = cluster.bucket(bucket_name) + #bucket = cluster.bucket(bucket_name) yield AppContext( - cluster=cluster, bucket=bucket, read_only_query_mode=read_only_query_mode + cluster=cluster, + #bucket=bucket, + read_only_query_mode=read_only_query_mode ) except Exception as e: @@ -164,11 +166,18 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: # Tools @mcp.tool() -def get_scopes_and_collections_in_bucket(ctx: Context) -> dict[str, list[str]]: - """Get the names of all scopes and collections in the bucket. +def get_scopes_and_collections_in_bucket(ctx: Context, bucket_name: str) -> dict[str, list[str]]: + """Get the names of all scopes and collections for a specified bucket. Returns a dictionary with scope names as keys and lists of collection names as values. 
""" - bucket = ctx.request_context.lifespan_context.bucket + #bucket = ctx.request_context.lifespan_context.bucket + cluster = ctx.request_context.lifespan_context.cluster + + try: + bucket = cluster.bucket(bucket_name) + except Exception as e: + logger.error(f"Error accessing bucket: {e}") + raise ValueError("Tool does not have access to bucket, or bucket does not exist.") try: scopes_collections = {} collection_manager = bucket.collections() @@ -184,14 +193,14 @@ def get_scopes_and_collections_in_bucket(ctx: Context) -> dict[str, list[str]]: @mcp.tool() def get_schema_for_collection( - ctx: Context, scope_name: str, collection_name: str + ctx: Context, bucket_name: str, scope_name: str, collection_name: str ) -> dict[str, Any]: - """Get the schema for a collection in the specified scope. + """Get the schema for a collection in the specified scope of a specified bucket. Returns a dictionary with the schema returned by running INFER on the Couchbase collection. """ try: query = f"INFER {collection_name}" - result = run_sql_plus_plus_query(ctx, scope_name, query) + result = run_sql_plus_plus_query(ctx, bucket_name, scope_name, query) return result except Exception as e: logger.error(f"Error getting schema: {e}") @@ -200,10 +209,16 @@ def get_schema_for_collection( @mcp.tool() def get_document_by_id( - ctx: Context, scope_name: str, collection_name: str, document_id: str + ctx: Context, bucket_name: str, scope_name: str, collection_name: str, document_id: str ) -> dict[str, Any]: - """Get a document by its ID from the specified scope and collection.""" - bucket = ctx.request_context.lifespan_context.bucket + """Get a document by its ID from the specified bucket, scope and collection.""" + #bucket = ctx.request_context.lifespan_context.bucket + cluster = ctx.request_context.lifespan_context.cluster + try: + bucket = cluster.bucket(bucket_name) + except Exception as e: + logger.error(f"Error accessing bucket: {e}") + raise ValueError("Tool does not have access to 
bucket, or bucket does not exist.") try: collection = bucket.scope(scope_name).collection(collection_name) result = collection.get(document_id) @@ -216,14 +231,21 @@ def get_document_by_id( @mcp.tool() def upsert_document_by_id( ctx: Context, + bucket_name: str, scope_name: str, collection_name: str, document_id: str, document_content: dict[str, Any], ) -> bool: - """Insert or update a document by its ID. + """Insert or update a document in a bucket, scope and collection by its ID. Returns True on success, False on failure.""" - bucket = ctx.request_context.lifespan_context.bucket + #bucket = ctx.request_context.lifespan_context.bucket + cluster = ctx.request_context.lifespan_context.cluster + try: + bucket = cluster.bucket(bucket_name) + except Exception as e: + logger.error(f"Error accessing bucket: {e}") + raise ValueError("Tool does not have access to bucket, or bucket does not exist.") try: collection = bucket.scope(scope_name).collection(collection_name) collection.upsert(document_id, document_content) @@ -236,11 +258,17 @@ def upsert_document_by_id( @mcp.tool() def delete_document_by_id( - ctx: Context, scope_name: str, collection_name: str, document_id: str + ctx: Context, bucket_name: str, scope_name: str, collection_name: str, document_id: str ) -> bool: - """Delete a document by its ID. + """Delete a document in a bucket, scope and collection by its ID. 
Returns True on success, False on failure.""" - bucket = ctx.request_context.lifespan_context.bucket + #bucket = ctx.request_context.lifespan_context.bucket + cluster = ctx.request_context.lifespan_context.cluster + try: + bucket = cluster.bucket(bucket_name) + except Exception as e: + logger.error(f"Error accessing bucket: {e}") + raise ValueError("Tool does not have access to bucket, or bucket does not exist.") try: collection = bucket.scope(scope_name).collection(collection_name) collection.remove(document_id) @@ -250,13 +278,44 @@ def delete_document_by_id( logger.error(f"Error deleting document {document_id}: {e}") return False +@mcp.tool() +def advise_index_for_sql_plus_plus_query( + ctx: Context, bucket_name: str, scope_name: str, query: str +) -> dict[str, Any]: + """Get an index recommendation from the SQL++ index advisor for a specified query on a specified bucket and scope. + Returns a dictionary with the query advised on, as well as: + 1. an array of the current indexes used and their status (or a string indicating no existing indexes available) + 2. 
an array of recommended indexes and/or covering indexes with reasoning (or a string indicating no possible index improvements) + """ + response = {} + + try: + query = f"ADVISE {query}" + result = run_sql_plus_plus_query(ctx, bucket_name, scope_name, query) + advice = result[0].get("advice") + if (advice is not None): + advise_info = advice.get("adviseinfo") + if ( advise_info is not None): + response["current_indexes"] = advise_info.get("current_indexes", "No current indexes") + response["recommended_indexes"] = advise_info.get("recommended_indexes","No index recommendations available") + response["query"]=result[0].get("query","Query statement unavailable") + return response + except Exception as e: + logger.error(f"Error running Advise on query: {e}") + raise ValueError(f"Unable to run ADVISE on: {query} for keyspace {bucket_name}.{scope_name}") @mcp.tool() def run_sql_plus_plus_query( - ctx: Context, scope_name: str, query: str + ctx: Context, bucket_name: str, scope_name: str, query: str ) -> list[dict[str, Any]]: - """Run a SQL++ query on a scope and return the results as a list of JSON objects.""" - bucket = ctx.request_context.lifespan_context.bucket + """Run a SQL++ query on a scope in a specified bucket and return the results as a list of JSON objects.""" + #bucket = ctx.request_context.lifespan_context.bucket + cluster = ctx.request_context.lifespan_context.cluster + try: + bucket = cluster.bucket(bucket_name) + except Exception as e: + logger.error(f"Error accessing bucket: {e}") + raise ValueError("Tool does not have access to bucket, or bucket does not exist.") read_only_query_mode = ctx.request_context.lifespan_context.read_only_query_mode logger.info(f"Running SQL++ queries in read-only mode: {read_only_query_mode}") From 77ce027ac07eb18a75876549b4c61c96a41e54db Mon Sep 17 00:00:00 2001 From: Eyal Nussbaum Date: Tue, 15 Jul 2025 12:41:42 +0300 Subject: [PATCH 2/7] Updated documentation after implementing index advisor and multiple bucket access --- 
README.md | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index b1ad21c..08b1eb8 100644 --- a/README.md +++ b/README.md @@ -6,11 +6,12 @@ An [MCP](https://modelcontextprotocol.io/) server implementation of Couchbase th - Get a list of all the scopes and collections in the specified bucket - Get the structure for a collection -- Get a document by ID from a specified scope and collection -- Upsert a document by ID to a specified scope and collection -- Delete a document by ID from a specified scope and collection -- Run a [SQL++ query](https://www.couchbase.com/sqlplusplus/) on a specified scope +- Get a document by ID from a specified bucket, scope and collection +- Upsert a document by ID to a specified bucket, scope and collection +- Delete a document by ID from a specified bucket, scope and collection +- Run a [SQL++ query](https://www.couchbase.com/sqlplusplus/) on a specified bucket and scope - There is an option in the MCP server, `READ_ONLY_QUERY_MODE` that is set to true by default to disable running SQL++ queries that change the data or the underlying collection structure. Note that the documents can still be updated by ID. +- Retrieve Index Advisor advice for a query on a specified bucket and scope. ## Prerequisites @@ -46,7 +47,6 @@ This is the common configuration for the MCP clients such as Claude Desktop, Cur "CB_CONNECTION_STRING": "couchbases://connection-string", "CB_USERNAME": "username", "CB_PASSWORD": "password", - "CB_BUCKET_NAME": "bucket_name" } } } @@ -58,7 +58,6 @@ The server can be configured using environment variables. 
The following variable - `CB_CONNECTION_STRING`: The connection string to the Couchbase cluster - `CB_USERNAME`: The username with access to the bucket to use to connect - `CB_PASSWORD`: The password for the username to connect -- `CB_BUCKET_NAME`: The name of the bucket that the server will access - `READ_ONLY_QUERY_MODE`: Setting to configure whether SQL++ queries that allow data to be modified are allowed. It is set to True by default. - `path/to/cloned/repo/mcp-server-couchbase/` should be the path to the cloned repository on your local machine. Don't forget the trailing slash at the end! @@ -138,7 +137,7 @@ There is an option to run the MCP server in [Server-Sent Events (SSE)](https://m By default, the MCP server will run on port 8080 but this can be configured using the `FASTMCP_PORT` environment variable. -> uv run src/mcp_server.py --connection-string='' --username='' --password='' --bucket-name='' --read-only-query-mode=true --transport=sse +> uv run src/mcp_server.py --connection-string='' --username='' --password='' --read-only-query-mode=true --transport=sse The server will be available on http://localhost:8080/sse. This can be used in MCP clients supporting SSE transport mode. 
@@ -159,7 +158,6 @@ docker run -i \ -e CB_CONNECTION_STRING='' \ -e CB_USERNAME='' \ -e CB_PASSWORD='' \ - -e CB_BUCKET_NAME='' \ -e MCP_TRANSPORT='stdio/sse' \ -e READ_ONLY_QUERY_MODE="true/false" \ mcp/couchbase From eda8103cc0424ddb0bca8fe254b9b93b638d76a4 Mon Sep 17 00:00:00 2001 From: Eyal Nussbaum Date: Tue, 15 Jul 2025 12:44:40 +0300 Subject: [PATCH 3/7] Code comment cleanup --- src/mcp_server.py | 22 ++-------------------- 1 file changed, 2 insertions(+), 20 deletions(-) diff --git a/src/mcp_server.py b/src/mcp_server.py index 170034b..185e088 100644 --- a/src/mcp_server.py +++ b/src/mcp_server.py @@ -26,7 +26,6 @@ class AppContext: """Context for the MCP server.""" cluster: Cluster | None = None - #bucket: Any | None = None read_only_query_mode: bool = True @@ -64,12 +63,7 @@ def get_settings() -> dict: help="Couchbase database password", callback=validate_required_param, ) -#@click.option( -# "--bucket-name", -# envvar="CB_BUCKET_NAME", -# help="Couchbase bucket name", -# callback=validate_required_param, -#) + @click.option( "--read-only-query-mode", envvar="READ_ONLY_QUERY_MODE", @@ -90,7 +84,6 @@ def main( connection_string, username, password, - #bucket_name, read_only_query_mode, transport, ): @@ -99,7 +92,6 @@ def main( "connection_string": connection_string, "username": username, "password": password, - #"bucket_name": bucket_name, "read_only_query_mode": read_only_query_mode, } mcp.run(transport=transport) @@ -114,7 +106,6 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: connection_string = settings.get("connection_string") username = settings.get("username") password = settings.get("password") - #bucket_name = settings.get("bucket_name") read_only_query_mode = settings.get("read_only_query_mode") # Validate configuration @@ -128,9 +119,7 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: if not password: logger.error("Couchbase database password is not set") missing_vars.append("password") - #if not 
bucket_name: - # logger.error("Couchbase bucket name is not set") - # missing_vars.append("bucket_name") + if missing_vars: error_msg = f"Missing required configuration: {', '.join(missing_vars)}" @@ -148,10 +137,8 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: cluster.wait_until_ready(timedelta(seconds=5)) logger.info("Successfully connected to Couchbase cluster") - #bucket = cluster.bucket(bucket_name) yield AppContext( cluster=cluster, - #bucket=bucket, read_only_query_mode=read_only_query_mode ) @@ -170,7 +157,6 @@ def get_scopes_and_collections_in_bucket(ctx: Context, bucket_name: str) -> dict """Get the names of all scopes and collections for a specified bucket. Returns a dictionary with scope names as keys and lists of collection names as values. """ - #bucket = ctx.request_context.lifespan_context.bucket cluster = ctx.request_context.lifespan_context.cluster try: @@ -212,7 +198,6 @@ def get_document_by_id( ctx: Context, bucket_name: str, scope_name: str, collection_name: str, document_id: str ) -> dict[str, Any]: """Get a document by its ID from the specified bucket, scope and collection.""" - #bucket = ctx.request_context.lifespan_context.bucket cluster = ctx.request_context.lifespan_context.cluster try: bucket = cluster.bucket(bucket_name) @@ -239,7 +224,6 @@ def upsert_document_by_id( ) -> bool: """Insert or update a document in a bucket, scope and collection by its ID. Returns True on success, False on failure.""" - #bucket = ctx.request_context.lifespan_context.bucket cluster = ctx.request_context.lifespan_context.cluster try: bucket = cluster.bucket(bucket_name) @@ -262,7 +246,6 @@ def delete_document_by_id( ) -> bool: """Delete a document in a bucket, scope and collection by its ID. 
Returns True on success, False on failure.""" - #bucket = ctx.request_context.lifespan_context.bucket cluster = ctx.request_context.lifespan_context.cluster try: bucket = cluster.bucket(bucket_name) @@ -309,7 +292,6 @@ def run_sql_plus_plus_query( ctx: Context, bucket_name: str, scope_name: str, query: str ) -> list[dict[str, Any]]: """Run a SQL++ query on a scope in a specified bucket and return the results as a list of JSON objects.""" - #bucket = ctx.request_context.lifespan_context.bucket cluster = ctx.request_context.lifespan_context.cluster try: bucket = cluster.bucket(bucket_name) From 03d0d3c241ef88c785789bd379b1820465fe5f8c Mon Sep 17 00:00:00 2001 From: Eyal Nussbaum Date: Tue, 15 Jul 2025 12:57:29 +0300 Subject: [PATCH 4/7] Added check in advise_index_for_sql_plus_plus_query for empty response --- src/mcp_server.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/mcp_server.py b/src/mcp_server.py index 185e088..75e124e 100644 --- a/src/mcp_server.py +++ b/src/mcp_server.py @@ -275,13 +275,14 @@ def advise_index_for_sql_plus_plus_query( try: query = f"ADVISE {query}" result = run_sql_plus_plus_query(ctx, bucket_name, scope_name, query) - advice = result[0].get("advice") - if (advice is not None): - advise_info = advice.get("adviseinfo") - if ( advise_info is not None): - response["current_indexes"] = advise_info.get("current_indexes", "No current indexes") - response["recommended_indexes"] = advise_info.get("recommended_indexes","No index recommendations available") - response["query"]=result[0].get("query","Query statement unavailable") + + if result and (advice := result[0].get("advice")): + if (advice is not None): + advise_info = advice.get("adviseinfo") + if ( advise_info is not None): + response["current_indexes"] = advise_info.get("current_indexes", "No current indexes") + response["recommended_indexes"] = advise_info.get("recommended_indexes","No index recommendations available") + 
response["query"]=result[0].get("query","Query statement unavailable") return response except Exception as e: logger.error(f"Error running Advise on query: {e}") From 00a391ca58621c135a01cf61f0de2fac5947b1be Mon Sep 17 00:00:00 2001 From: Eyal Nussbaum Date: Wed, 16 Jul 2025 16:55:02 +0300 Subject: [PATCH 5/7] added function for MCP tool to get the list of buckets from the Couchbase cluster, including their bucket settings. --- src/mcp_server.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/src/mcp_server.py b/src/mcp_server.py index 75e124e..5da4a42 100644 --- a/src/mcp_server.py +++ b/src/mcp_server.py @@ -151,7 +151,29 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: mcp = FastMCP(MCP_SERVER_NAME, lifespan=app_lifespan) + # Tools + +@mcp.tool() +def get_list_of_buckets_with_settings( + ctx: Context +) -> list[str]: + """Get the list of buckets from the Couchbase cluster, including their bucket settings. + Returns a list of bucket setting objects. + """ + cluster = ctx.request_context.lifespan_context.cluster + result=[] + try: + bucket_manager = cluster.buckets() + buckets = bucket_manager.get_all_buckets() + for b in buckets: + result.append(b) + return result + except Exception as e: + logger.error(f"Error getting bucket names: {e}") + raise e + + @mcp.tool() def get_scopes_and_collections_in_bucket(ctx: Context, bucket_name: str) -> dict[str, list[str]]: """Get the names of all scopes and collections for a specified bucket. 
From 08a3334db80dd21d78ac4ef6ddd7dc25125f9c36 Mon Sep 17 00:00:00 2001 From: Eyal Nussbaum Date: Sun, 20 Jul 2025 21:46:41 +0300 Subject: [PATCH 6/7] Fixed ambiguous reference to query param in advise tool function --- src/mcp_server.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/mcp_server.py b/src/mcp_server.py index 5da4a42..a668db2 100644 --- a/src/mcp_server.py +++ b/src/mcp_server.py @@ -295,8 +295,8 @@ def advise_index_for_sql_plus_plus_query( response = {} try: - query = f"ADVISE {query}" - result = run_sql_plus_plus_query(ctx, bucket_name, scope_name, query) + advise_query = f"ADVISE {query}" + result = run_sql_plus_plus_query(ctx, bucket_name, scope_name, advise_query) if result and (advice := result[0].get("advice")): if (advice is not None): @@ -308,7 +308,7 @@ def advise_index_for_sql_plus_plus_query( return response except Exception as e: logger.error(f"Error running Advise on query: {e}") - raise ValueError(f"Unable to run ADVISE on: {query} for keyspace {bucket_name}.{scope_name}") + raise ValueError(f"Unable to run ADVISE on: {query} for keyspace {bucket_name}.{scope_name}") from e @mcp.tool() def run_sql_plus_plus_query( From 6ce9a88f06bae072537e0f2ebb497b908e81ed6e Mon Sep 17 00:00:00 2001 From: Eyal Nussbaum Date: Sun, 27 Jul 2025 14:22:00 +0300 Subject: [PATCH 7/7] Fixed response message for connection test --- src/tools/server.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/tools/server.py b/src/tools/server.py index 6358add..117127d 100644 --- a/src/tools/server.py +++ b/src/tools/server.py @@ -57,13 +57,20 @@ def test_connection(ctx: Context, bucket_name: str = None) -> dict[str, Any]: if bucket_name is not None: try: bucket = ensure_bucket_connection(ctx, bucket_name) + bucket_connected = True + return { + "status": "success", + "cluster_connected": cluster_connected, + "bucket_connected": bucket_connected, + "message": f"Successfully connected to Couchbase cluster and bucket 
`{bucket_name}`", + } except Exception as e: return { "status": "error", "cluster_connected": cluster_connected, "bucket_connected": bucket_connected, "error": str(e), - "message": f"Failed to connect to bucket named {bucket_name}", + "message": f"Failed to connect to bucket named `{bucket_name}`", } else: return {