From b3abd77bbd05702f8c99d32135c828a06047dc11 Mon Sep 17 00:00:00 2001 From: Eyal Nussbaum Date: Tue, 15 Jul 2025 00:58:06 +0300 Subject: [PATCH 01/15] Adding support for multiple buckets. Adding support for index advisor --- src/mcp_server.py | 123 ++++++++++++++++++++++++++++++++++------------ 1 file changed, 91 insertions(+), 32 deletions(-) diff --git a/src/mcp_server.py b/src/mcp_server.py index 52660f3..170034b 100644 --- a/src/mcp_server.py +++ b/src/mcp_server.py @@ -26,7 +26,7 @@ class AppContext: """Context for the MCP server.""" cluster: Cluster | None = None - bucket: Any | None = None + #bucket: Any | None = None read_only_query_mode: bool = True @@ -64,12 +64,12 @@ def get_settings() -> dict: help="Couchbase database password", callback=validate_required_param, ) -@click.option( - "--bucket-name", - envvar="CB_BUCKET_NAME", - help="Couchbase bucket name", - callback=validate_required_param, -) +#@click.option( +# "--bucket-name", +# envvar="CB_BUCKET_NAME", +# help="Couchbase bucket name", +# callback=validate_required_param, +#) @click.option( "--read-only-query-mode", envvar="READ_ONLY_QUERY_MODE", @@ -90,7 +90,7 @@ def main( connection_string, username, password, - bucket_name, + #bucket_name, read_only_query_mode, transport, ): @@ -99,7 +99,7 @@ def main( "connection_string": connection_string, "username": username, "password": password, - "bucket_name": bucket_name, + #"bucket_name": bucket_name, "read_only_query_mode": read_only_query_mode, } mcp.run(transport=transport) @@ -114,7 +114,7 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: connection_string = settings.get("connection_string") username = settings.get("username") password = settings.get("password") - bucket_name = settings.get("bucket_name") + #bucket_name = settings.get("bucket_name") read_only_query_mode = settings.get("read_only_query_mode") # Validate configuration @@ -128,9 +128,9 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: if 
not password: logger.error("Couchbase database password is not set") missing_vars.append("password") - if not bucket_name: - logger.error("Couchbase bucket name is not set") - missing_vars.append("bucket_name") + #if not bucket_name: + # logger.error("Couchbase bucket name is not set") + # missing_vars.append("bucket_name") if missing_vars: error_msg = f"Missing required configuration: {', '.join(missing_vars)}" @@ -148,9 +148,11 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: cluster.wait_until_ready(timedelta(seconds=5)) logger.info("Successfully connected to Couchbase cluster") - bucket = cluster.bucket(bucket_name) + #bucket = cluster.bucket(bucket_name) yield AppContext( - cluster=cluster, bucket=bucket, read_only_query_mode=read_only_query_mode + cluster=cluster, + #bucket=bucket, + read_only_query_mode=read_only_query_mode ) except Exception as e: @@ -164,11 +166,18 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: # Tools @mcp.tool() -def get_scopes_and_collections_in_bucket(ctx: Context) -> dict[str, list[str]]: - """Get the names of all scopes and collections in the bucket. +def get_scopes_and_collections_in_bucket(ctx: Context, bucket_name: str) -> dict[str, list[str]]: + """Get the names of all scopes and collections for a specified bucket. Returns a dictionary with scope names as keys and lists of collection names as values. 
""" - bucket = ctx.request_context.lifespan_context.bucket + #bucket = ctx.request_context.lifespan_context.bucket + cluster = ctx.request_context.lifespan_context.cluster + + try: + bucket = cluster.bucket(bucket_name) + except Exception as e: + logger.error(f"Error accessing bucket: {e}") + raise ValueError("Tool does not have access to bucket, or bucket does not exist.") try: scopes_collections = {} collection_manager = bucket.collections() @@ -184,14 +193,14 @@ def get_scopes_and_collections_in_bucket(ctx: Context) -> dict[str, list[str]]: @mcp.tool() def get_schema_for_collection( - ctx: Context, scope_name: str, collection_name: str + ctx: Context, bucket_name: str, scope_name: str, collection_name: str ) -> dict[str, Any]: - """Get the schema for a collection in the specified scope. + """Get the schema for a collection in the specified scope of a specified bucket. Returns a dictionary with the schema returned by running INFER on the Couchbase collection. """ try: query = f"INFER {collection_name}" - result = run_sql_plus_plus_query(ctx, scope_name, query) + result = run_sql_plus_plus_query(ctx, bucket_name, scope_name, query) return result except Exception as e: logger.error(f"Error getting schema: {e}") @@ -200,10 +209,16 @@ def get_schema_for_collection( @mcp.tool() def get_document_by_id( - ctx: Context, scope_name: str, collection_name: str, document_id: str + ctx: Context, bucket_name: str, scope_name: str, collection_name: str, document_id: str ) -> dict[str, Any]: - """Get a document by its ID from the specified scope and collection.""" - bucket = ctx.request_context.lifespan_context.bucket + """Get a document by its ID from the specified bucket, scope and collection.""" + #bucket = ctx.request_context.lifespan_context.bucket + cluster = ctx.request_context.lifespan_context.cluster + try: + bucket = cluster.bucket(bucket_name) + except Exception as e: + logger.error(f"Error accessing bucket: {e}") + raise ValueError("Tool does not have access to 
bucket, or bucket does not exist.") try: collection = bucket.scope(scope_name).collection(collection_name) result = collection.get(document_id) @@ -216,14 +231,21 @@ def get_document_by_id( @mcp.tool() def upsert_document_by_id( ctx: Context, + bucket_name: str, scope_name: str, collection_name: str, document_id: str, document_content: dict[str, Any], ) -> bool: - """Insert or update a document by its ID. + """Insert or update a document in a bucket, scope and collection by its ID. Returns True on success, False on failure.""" - bucket = ctx.request_context.lifespan_context.bucket + #bucket = ctx.request_context.lifespan_context.bucket + cluster = ctx.request_context.lifespan_context.cluster + try: + bucket = cluster.bucket(bucket_name) + except Exception as e: + logger.error(f"Error accessing bucket: {e}") + raise ValueError("Tool does not have access to bucket, or bucket does not exist.") try: collection = bucket.scope(scope_name).collection(collection_name) collection.upsert(document_id, document_content) @@ -236,11 +258,17 @@ def upsert_document_by_id( @mcp.tool() def delete_document_by_id( - ctx: Context, scope_name: str, collection_name: str, document_id: str + ctx: Context, bucket_name: str, scope_name: str, collection_name: str, document_id: str ) -> bool: - """Delete a document by its ID. + """Delete a document in a bucket, scope and collection by its ID. 
Returns True on success, False on failure.""" - bucket = ctx.request_context.lifespan_context.bucket + #bucket = ctx.request_context.lifespan_context.bucket + cluster = ctx.request_context.lifespan_context.cluster + try: + bucket = cluster.bucket(bucket_name) + except Exception as e: + logger.error(f"Error accessing bucket: {e}") + raise ValueError("Tool does not have access to bucket, or bucket does not exist.") try: collection = bucket.scope(scope_name).collection(collection_name) collection.remove(document_id) @@ -250,13 +278,44 @@ def delete_document_by_id( logger.error(f"Error deleting document {document_id}: {e}") return False +@mcp.tool() +def advise_index_for_sql_plus_plus_query( + ctx: Context, bucket_name: str, scope_name: str, query: str +) -> dict[str, Any]: + """Get an index recommendation from the SQL++ index advisor for a specified query on a specified bucket and scope. + Returns a dictionary with the query advised on, as well as: + 1. an array of the current indexes used and their status (or a string indicating no existing indexes available) + 2. 
an array of recommended indexes and/or covering indexes with reasoning (or a string indicating no possible index improvements) + """ + response = {} + + try: + query = f"ADVISE {query}" + result = run_sql_plus_plus_query(ctx, bucket_name, scope_name, query) + advice = result[0].get("advice") + if (advice is not None): + advise_info = advice.get("adviseinfo") + if ( advise_info is not None): + response["current_indexes"] = advise_info.get("current_indexes", "No current indexes") + response["recommended_indexes"] = advise_info.get("recommended_indexes","No index recommendations available") + response["query"]=result[0].get("query","Query statement unavailable") + return response + except Exception as e: + logger.error(f"Error running Advise on query: {e}") + raise ValueError(f"Unable to run ADVISE on: {query} for keyspace {bucket_name}.{scope_name}") @mcp.tool() def run_sql_plus_plus_query( - ctx: Context, scope_name: str, query: str + ctx: Context, bucket_name: str, scope_name: str, query: str ) -> list[dict[str, Any]]: - """Run a SQL++ query on a scope and return the results as a list of JSON objects.""" - bucket = ctx.request_context.lifespan_context.bucket + """Run a SQL++ query on a scope in a specified bucket and return the results as a list of JSON objects.""" + #bucket = ctx.request_context.lifespan_context.bucket + cluster = ctx.request_context.lifespan_context.cluster + try: + bucket = cluster.bucket(bucket_name) + except Exception as e: + logger.error(f"Error accessing bucket: {e}") + raise ValueError("Tool does not have access to bucket, or bucket does not exist.") read_only_query_mode = ctx.request_context.lifespan_context.read_only_query_mode logger.info(f"Running SQL++ queries in read-only mode: {read_only_query_mode}") From 77ce027ac07eb18a75876549b4c61c96a41e54db Mon Sep 17 00:00:00 2001 From: Eyal Nussbaum Date: Tue, 15 Jul 2025 12:41:42 +0300 Subject: [PATCH 02/15] Updated documentation after implementing index advisor and multiple bucket access
--- README.md | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index b1ad21c..08b1eb8 100644 --- a/README.md +++ b/README.md @@ -6,11 +6,12 @@ An [MCP](https://modelcontextprotocol.io/) server implementation of Couchbase th - Get a list of all the scopes and collections in the specified bucket - Get the structure for a collection -- Get a document by ID from a specified scope and collection -- Upsert a document by ID to a specified scope and collection -- Delete a document by ID from a specified scope and collection -- Run a [SQL++ query](https://www.couchbase.com/sqlplusplus/) on a specified scope +- Get a document by ID from a specified bucket, scope and collection +- Upsert a document by ID to a specified bucket, scope and collection +- Delete a document by ID from a specified bucket, scope and collection +- Run a [SQL++ query](https://www.couchbase.com/sqlplusplus/) on a specified bucket and scope - There is an option in the MCP server, `READ_ONLY_QUERY_MODE` that is set to true by default to disable running SQL++ queries that change the data or the underlying collection structure. Note that the documents can still be updated by ID. +- Retreive Index Advisor advice for a query on a specified bucket and scope. ## Prerequisites @@ -46,7 +47,6 @@ This is the common configuration for the MCP clients such as Claude Desktop, Cur "CB_CONNECTION_STRING": "couchbases://connection-string", "CB_USERNAME": "username", "CB_PASSWORD": "password", - "CB_BUCKET_NAME": "bucket_name" } } } @@ -58,7 +58,6 @@ The server can be configured using environment variables. 
The following variable - `CB_CONNECTION_STRING`: The connection string to the Couchbase cluster - `CB_USERNAME`: The username with access to the bucket to use to connect - `CB_PASSWORD`: The password for the username to connect -- `CB_BUCKET_NAME`: The name of the bucket that the server will access - `READ_ONLY_QUERY_MODE`: Setting to configure whether SQL++ queries that allow data to be modified are allowed. It is set to True by default. - `path/to/cloned/repo/mcp-server-couchbase/` should be the path to the cloned repository on your local machine. Don't forget the trailing slash at the end! @@ -138,7 +137,7 @@ There is an option to run the MCP server in [Server-Sent Events (SSE)](https://m By default, the MCP server will run on port 8080 but this can be configured using the `FASTMCP_PORT` environment variable. -> uv run src/mcp_server.py --connection-string='' --username='' --password='' --bucket-name='' --read-only-query-mode=true --transport=sse +> uv run src/mcp_server.py --connection-string='' --username='' --password='' --read-only-query-mode=true --transport=sse The server will be available on http://localhost:8080/sse. This can be used in MCP clients supporting SSE transport mode. 
@@ -159,7 +158,6 @@ docker run -i \ -e CB_CONNECTION_STRING='' \ -e CB_USERNAME='' \ -e CB_PASSWORD='' \ - -e CB_BUCKET_NAME='' \ -e MCP_TRANSPORT='stdio/sse' \ -e READ_ONLY_QUERY_MODE="true/false" \ mcp/couchbase From eda8103cc0424ddb0bca8fe254b9b93b638d76a4 Mon Sep 17 00:00:00 2001 From: Eyal Nussbaum Date: Tue, 15 Jul 2025 12:44:40 +0300 Subject: [PATCH 03/15] Code comment cleanup --- src/mcp_server.py | 22 ++-------------------- 1 file changed, 2 insertions(+), 20 deletions(-) diff --git a/src/mcp_server.py b/src/mcp_server.py index 170034b..185e088 100644 --- a/src/mcp_server.py +++ b/src/mcp_server.py @@ -26,7 +26,6 @@ class AppContext: """Context for the MCP server.""" cluster: Cluster | None = None - #bucket: Any | None = None read_only_query_mode: bool = True @@ -64,12 +63,7 @@ def get_settings() -> dict: help="Couchbase database password", callback=validate_required_param, ) -#@click.option( -# "--bucket-name", -# envvar="CB_BUCKET_NAME", -# help="Couchbase bucket name", -# callback=validate_required_param, -#) + @click.option( "--read-only-query-mode", envvar="READ_ONLY_QUERY_MODE", @@ -90,7 +84,6 @@ def main( connection_string, username, password, - #bucket_name, read_only_query_mode, transport, ): @@ -99,7 +92,6 @@ def main( "connection_string": connection_string, "username": username, "password": password, - #"bucket_name": bucket_name, "read_only_query_mode": read_only_query_mode, } mcp.run(transport=transport) @@ -114,7 +106,6 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: connection_string = settings.get("connection_string") username = settings.get("username") password = settings.get("password") - #bucket_name = settings.get("bucket_name") read_only_query_mode = settings.get("read_only_query_mode") # Validate configuration @@ -128,9 +119,7 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: if not password: logger.error("Couchbase database password is not set") missing_vars.append("password") - #if not 
bucket_name: - # logger.error("Couchbase bucket name is not set") - # missing_vars.append("bucket_name") + if missing_vars: error_msg = f"Missing required configuration: {', '.join(missing_vars)}" @@ -148,10 +137,8 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: cluster.wait_until_ready(timedelta(seconds=5)) logger.info("Successfully connected to Couchbase cluster") - #bucket = cluster.bucket(bucket_name) yield AppContext( cluster=cluster, - #bucket=bucket, read_only_query_mode=read_only_query_mode ) @@ -170,7 +157,6 @@ def get_scopes_and_collections_in_bucket(ctx: Context, bucket_name: str) -> dict """Get the names of all scopes and collections for a specified bucket. Returns a dictionary with scope names as keys and lists of collection names as values. """ - #bucket = ctx.request_context.lifespan_context.bucket cluster = ctx.request_context.lifespan_context.cluster try: @@ -212,7 +198,6 @@ def get_document_by_id( ctx: Context, bucket_name: str, scope_name: str, collection_name: str, document_id: str ) -> dict[str, Any]: """Get a document by its ID from the specified bucket, scope and collection.""" - #bucket = ctx.request_context.lifespan_context.bucket cluster = ctx.request_context.lifespan_context.cluster try: bucket = cluster.bucket(bucket_name) @@ -239,7 +224,6 @@ def upsert_document_by_id( ) -> bool: """Insert or update a document in a bucket, scope and collection by its ID. Returns True on success, False on failure.""" - #bucket = ctx.request_context.lifespan_context.bucket cluster = ctx.request_context.lifespan_context.cluster try: bucket = cluster.bucket(bucket_name) @@ -262,7 +246,6 @@ def delete_document_by_id( ) -> bool: """Delete a document in a bucket, scope and collection by its ID. 
Returns True on success, False on failure.""" - #bucket = ctx.request_context.lifespan_context.bucket cluster = ctx.request_context.lifespan_context.cluster try: bucket = cluster.bucket(bucket_name) @@ -309,7 +292,6 @@ def run_sql_plus_plus_query( ctx: Context, bucket_name: str, scope_name: str, query: str ) -> list[dict[str, Any]]: """Run a SQL++ query on a scope in a specified bucket and return the results as a list of JSON objects.""" - #bucket = ctx.request_context.lifespan_context.bucket cluster = ctx.request_context.lifespan_context.cluster try: bucket = cluster.bucket(bucket_name) From 03d0d3c241ef88c785789bd379b1820465fe5f8c Mon Sep 17 00:00:00 2001 From: Eyal Nussbaum Date: Tue, 15 Jul 2025 12:57:29 +0300 Subject: [PATCH 04/15] Added check in advise_index_for_sql_plus_plus_query for empty response --- src/mcp_server.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/mcp_server.py b/src/mcp_server.py index 185e088..75e124e 100644 --- a/src/mcp_server.py +++ b/src/mcp_server.py @@ -275,13 +275,14 @@ def advise_index_for_sql_plus_plus_query( try: query = f"ADVISE {query}" result = run_sql_plus_plus_query(ctx, bucket_name, scope_name, query) - advice = result[0].get("advice") - if (advice is not None): - advise_info = advice.get("adviseinfo") - if ( advise_info is not None): - response["current_indexes"] = advise_info.get("current_indexes", "No current indexes") - response["recommended_indexes"] = advise_info.get("recommended_indexes","No index recommendations available") - response["query"]=result[0].get("query","Query statement unavailable") + + if result and (advice := result[0].get("advice")): + if (advice is not None): + advise_info = advice.get("adviseinfo") + if ( advise_info is not None): + response["current_indexes"] = advise_info.get("current_indexes", "No current indexes") + response["recommended_indexes"] = advise_info.get("recommended_indexes","No index recommendations available") + 
response["query"]=result[0].get("query","Query statement unavailable") return response except Exception as e: logger.error(f"Error running Advise on query: {e}") From 00a391ca58621c135a01cf61f0de2fac5947b1be Mon Sep 17 00:00:00 2001 From: Eyal Nussbaum Date: Wed, 16 Jul 2025 16:55:02 +0300 Subject: [PATCH 05/15] added function for MCP tool to get the list of buckets from the Couchbase cluster, including their bucket settings. --- src/mcp_server.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/src/mcp_server.py b/src/mcp_server.py index 75e124e..5da4a42 100644 --- a/src/mcp_server.py +++ b/src/mcp_server.py @@ -151,7 +151,29 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: mcp = FastMCP(MCP_SERVER_NAME, lifespan=app_lifespan) + # Tools + +@mcp.tool() +def get_list_of_buckets_with_settings( + ctx: Context +) -> list[str]: + """Get the list of buckets from the Couchbase cluster, including their bucket settings. + Returns a list of bucket setting objects. + """ + cluster = ctx.request_context.lifespan_context.cluster + result=[] + try: + bucket_manager = cluster.buckets() + buckets = bucket_manager.get_all_buckets() + for b in buckets: + result.append(b) + return result + except Exception as e: + logger.error(f"Error getting bucket names: {e}") + raise e + + @mcp.tool() def get_scopes_and_collections_in_bucket(ctx: Context, bucket_name: str) -> dict[str, list[str]]: """Get the names of all scopes and collections for a specified bucket. From d596e4bd36e6b2b873615af07b3441fad552dc25 Mon Sep 17 00:00:00 2001 From: Eyal Nussbaum Date: Sun, 20 Jul 2025 13:31:43 +0300 Subject: [PATCH 06/15] Added mtls support for cluster connection. Client certificate and key must be named client.pem and client.key and reside in provided client_cert_path directory. Either client cert or user/pass to be used. 
--- src/mcp_server.py | 105 +++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 100 insertions(+), 5 deletions(-) diff --git a/src/mcp_server.py b/src/mcp_server.py index 5da4a42..9552cef 100644 --- a/src/mcp_server.py +++ b/src/mcp_server.py @@ -2,7 +2,7 @@ from typing import Any from mcp.server.fastmcp import FastMCP, Context from couchbase.cluster import Cluster -from couchbase.auth import PasswordAuthenticator +from couchbase.auth import PasswordAuthenticator, CertificateAuthenticator from couchbase.options import ClusterOptions import logging from dataclasses import dataclass @@ -10,6 +10,7 @@ from typing import AsyncIterator from lark_sqlpp import modifies_data, modifies_structure, parse_sqlpp import click +import os MCP_SERVER_NAME = "couchbase" @@ -27,6 +28,12 @@ class AppContext: cluster: Cluster | None = None read_only_query_mode: bool = True + username: str = None + password: str = None + connection_string: str = None + ca_cert_path : str = None + use_tls : bool = True + client_cert_path : str = None def validate_required_param( @@ -37,6 +44,47 @@ def validate_required_param( raise click.BadParameter(f"{param.name} cannot be empty") return value +def validate_authentication_method(params : dict ) -> bool: + """Util function to verify either user/password combination OR client certificates have been included""" + username = params.get("username") + password = params.get("password") + client_cert_path = params.get("client_cert_path") + ca_cert_path = params.get("ca_cert_path") + + # Strip values to check for empty strings + if username is not None: + username = username.strip() + if password is not None: + password = password.strip() + + if client_cert_path: + client_cert = os.path.join(client_cert_path, "client.pem") + client_key = os.path.join(client_cert_path, "client.key") + + if not os.path.isfile(client_cert) or not os.path.isfile(client_key): + raise click.BadParameter( + f"Client certificate files not found in {client_cert_path}. 
Required: client.pem and client.key." + ) + + if username or password: + raise click.BadParameter( + "You must use either a client certificate or username/password, not both." + ) + + elif username or password: + if not username or not password: + raise click.BadParameter( + "Both username and password must be provided and non-empty if using basic authentication." + ) + else: + raise click.BadParameter( + "You must provide either a client certificate path or username/password combination, neither received." + ) + + if not ca_cert_path: + logger.warning(f"A trusted CA certificate has not been provided, using local trust store for TLS connections") + + def get_settings() -> dict: """Get settings from Click context.""" @@ -55,13 +103,28 @@ def get_settings() -> dict: "--username", envvar="CB_USERNAME", help="Couchbase database user", - callback=validate_required_param, + #callback=validate_required_param, ) @click.option( "--password", envvar="CB_PASSWORD", help="Couchbase database password", - callback=validate_required_param, + #callback=validate_required_param, +) + +@click.option( + '--ca-cert-path', + envvar="CA_CERT_PATH", + type=click.Path(exists=True), + default=None, + help='Path to Server TLS certificate, required for API calls.') + +@click.option( + "--client-cert-path", + envvar="CLIENT_CERT_PATH", + default=None, + help="Path to client.key and client.pem files for mtls client authentication", + #callback=validate_required_param, ) @click.option( @@ -85,6 +148,8 @@ def main( username, password, read_only_query_mode, + ca_cert_path, + client_cert_path, transport, ): """Couchbase MCP Server""" @@ -93,7 +158,14 @@ def main( "username": username, "password": password, "read_only_query_mode": read_only_query_mode, + "ca_cert_path" : ca_cert_path, + "client_cert_path" : client_cert_path } + try: + validate_authentication_method(ctx.obj) + except Exception as e: + logger.error(f"Failed to validate auth method params: {e}") + raise e mcp.run(transport=transport) 
@@ -107,6 +179,12 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: username = settings.get("username") password = settings.get("password") read_only_query_mode = settings.get("read_only_query_mode") + ca_cert_path = settings.get("ca_cert_path") + client_cert_path = settings.get("client_cert_path") + use_tls = True + if "://" in connection_string: + protocol = connection_string.split("://", 1)[0] + use_tls = (protocol[-1] == 's') # Validate configuration missing_vars = [] @@ -128,7 +206,18 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: try: logger.info("Creating Couchbase cluster connection...") - auth = PasswordAuthenticator(username, password) + #use client cert if provided, else user/password + if client_cert_path: + tls_conf = { + "cert_path" : f'{client_cert_path}/client.pem', + "key_path" : f'{client_cert_path}/client.key', + } + #set ca cert as trust store if provided + if ca_cert_path: + tls_conf["trust_store"] = ca_cert_path + auth = CertificateAuthenticator(**tls_conf) + else: + auth = PasswordAuthenticator(username, password) options = ClusterOptions(auth) options.apply_profile("wan_development") @@ -139,7 +228,13 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: yield AppContext( cluster=cluster, - read_only_query_mode=read_only_query_mode + ca_cert_path = ca_cert_path, + connection_string = connection_string, + username=username, + password=password, + read_only_query_mode=read_only_query_mode, + use_tls=use_tls, + client_cert_path=client_cert_path ) except Exception as e: From b68330ae5db9ef90ade5975333972c3b0434d21e Mon Sep 17 00:00:00 2001 From: Eyal Nussbaum Date: Sun, 20 Jul 2025 20:52:39 +0300 Subject: [PATCH 07/15] Input validation fixes and Readme update for mTLS support --- README.md | 33 ++++++++++++++++++++++++++++++--- src/mcp_server.py | 20 ++------------------ 2 files changed, 32 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 08b1eb8..b344a03 
100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ An [MCP](https://modelcontextprotocol.io/) server implementation of Couchbase th - There is an option in the MCP server, `READ_ONLY_QUERY_MODE` that is set to true by default to disable running SQL++ queries that change the data or the underlying collection structure. Note that the documents can still be updated by ID. - Retreive Index Advisor advice for a query on a specified bucket and scope. + ## Prerequisites - Python 3.10 or higher. @@ -31,7 +32,7 @@ git clone https://github.com/Couchbase-Ecosystem/mcp-server-couchbase.git ### Server Configuration for MCP Clients This is the common configuration for the MCP clients such as Claude Desktop, Cursor, Windsurf Editor. - +Using Basic Auth: ```json { "mcpServers": { @@ -47,6 +48,30 @@ This is the common configuration for the MCP clients such as Claude Desktop, Cur "CB_CONNECTION_STRING": "couchbases://connection-string", "CB_USERNAME": "username", "CB_PASSWORD": "password", + "CA_CERT_PATH" : "path/to/ca.crt" + } + } + } +} +``` + +Using mTLS: + +```json +{ + "mcpServers": { + "couchbase": { + "command": "uv", + "args": [ + "--directory", + "path/to/cloned/repo/mcp-server-couchbase/", + "run", + "src/mcp_server.py" + ], + "env": { + "CB_CONNECTION_STRING": "couchbases://connection-string", + "CLIENT_CERT_PATH": "path/to/client/cert_and_key/", + "CA_CERT_PATH" : "path/to/ca.crt" } } } @@ -56,8 +81,10 @@ This is the common configuration for the MCP clients such as Claude Desktop, Cur The server can be configured using environment variables. The following variables are supported: - `CB_CONNECTION_STRING`: The connection string to the Couchbase cluster -- `CB_USERNAME`: The username with access to the bucket to use to connect -- `CB_PASSWORD`: The password for the username to connect +- `CB_USERNAME`: The username with access to the bucket to use to connect. Must be set if using Baisc Auth and unset if using mTLS. 
+- `CB_PASSWORD`: The password for the username to connect. Must be set if using Baisc Auth and unset if using mTLS with client certificate. +- `CLIENT_CERT_PATH`: The path to client certificate (named client.pem) and key (named client.key) for mTLS authentication. Must be set if using mTLS and unset if using Basic Auth with username and password. +- `CA_CERT_PATH`: The path to the CA certificate for trusted TLS connection to the server. If not provided, locally trusted certificate store will be used. - `READ_ONLY_QUERY_MODE`: Setting to configure whether SQL++ queries that allow data to be modified are allowed. It is set to True by default. - `path/to/cloned/repo/mcp-server-couchbase/` should be the path to the cloned repository on your local machine. Don't forget the trailing slash at the end! diff --git a/src/mcp_server.py b/src/mcp_server.py index 9552cef..63e873e 100644 --- a/src/mcp_server.py +++ b/src/mcp_server.py @@ -186,35 +186,19 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: protocol = connection_string.split("://", 1)[0] use_tls = (protocol[-1] == 's') - # Validate configuration - missing_vars = [] - if not connection_string: - logger.error("Couchbase connection string is not set") - missing_vars.append("connection_string") - if not username: - logger.error("Couchbase database user is not set") - missing_vars.append("username") - if not password: - logger.error("Couchbase database password is not set") - missing_vars.append("password") - - - if missing_vars: - error_msg = f"Missing required configuration: {', '.join(missing_vars)}" - logger.error(error_msg) - raise ValueError(error_msg) try: logger.info("Creating Couchbase cluster connection...") #use client cert if provided, else user/password if client_cert_path: + client_cert_path = client_cert_path[:-1] if client_cert_path.endswith('/') else client_cert_path tls_conf = { "cert_path" : f'{client_cert_path}/client.pem', "key_path" : f'{client_cert_path}/client.key', } #set 
ca cert as trust store if provided if ca_cert_path: - tls_conf["trust_store"] = ca_cert_path + tls_conf["trust_store_path"] = ca_cert_path auth = CertificateAuthenticator(**tls_conf) else: auth = PasswordAuthenticator(username, password) From e86955806272aa55338d5da139c2bd98eaf70884 Mon Sep 17 00:00:00 2001 From: Eyal Nussbaum Date: Sun, 20 Jul 2025 21:40:34 +0300 Subject: [PATCH 08/15] Fixes after AI code review for merge. Used os.path.join for path concatenation and changed raise exceptions to contain full trace. --- src/mcp_server.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/mcp_server.py b/src/mcp_server.py index 63e873e..b4f4bff 100644 --- a/src/mcp_server.py +++ b/src/mcp_server.py @@ -66,7 +66,7 @@ def validate_authentication_method(params : dict ) -> bool: f"Client certificate files not found in {client_cert_path}. Required: client.pem and client.key." ) - if username or password: + if username or password or username == "" or password =="": raise click.BadParameter( "You must use either a client certificate or username/password, not both."
) @@ -165,7 +165,7 @@ def main( validate_authentication_method(ctx.obj) except Exception as e: logger.error(f"Failed to validate auth method params: {e}") - raise e + raise mcp.run(transport=transport) @@ -191,10 +191,10 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: logger.info("Creating Couchbase cluster connection...") #use client cert if provided, else user/password if client_cert_path: - client_cert_path = client_cert_path[:-1] if client_cert_path.endswith('/') else client_cert_path + tls_conf = { - "cert_path" : f'{client_cert_path}/client.pem', - "key_path" : f'{client_cert_path}/client.key', + "cert_path" : os.path.join(client_cert_path, "client.pem"), + "key_path" : os.path.join(client_cert_path, "client.key"), } #set ca cert as trust store if provided if ca_cert_path: @@ -250,7 +250,7 @@ def get_list_of_buckets_with_settings( return result except Exception as e: logger.error(f"Error getting bucket names: {e}") - raise e + raise @mcp.tool() From 45cd4b2cfaac3de09503cde9ca6c843efdf37439 Mon Sep 17 00:00:00 2001 From: Eyal Nussbaum Date: Sun, 20 Jul 2025 21:44:06 +0300 Subject: [PATCH 09/15] Additional raise exception fixes --- README.md | 4 ++-- src/mcp_server.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index b344a03..8c4a5e5 100644 --- a/README.md +++ b/README.md @@ -81,8 +81,8 @@ Using mTLS: The server can be configured using environment variables. The following variables are supported: - `CB_CONNECTION_STRING`: The connection string to the Couchbase cluster -- `CB_USERNAME`: The username with access to the bucket to use to connect. Must be set if using Baisc Auth and unset if using mTLS. -- `CB_PASSWORD`: The password for the username to connect. Must be set if using Baisc Auth and unset if using mTLS with client certificate. +- `CB_USERNAME`: The username with access to the bucket to use to connect. Must be set if using Basic Auth and unset if using mTLS. 
+- `CB_PASSWORD`: The password for the username to connect. Must be set if using Basic Auth and unset if using mTLS with client certificate. - `CLIENT_CERT_PATH`: The path to client certificate (named client.pem) and key (named client.key) for mTLS authentication. Must be set if using mTLS and unset if using Basic Auth with username and password. - `CA_CERT_PATH`: The path to the CA certificate for trusted TLS connection to the server. If not provided, locally trusted certificate store will be used. - `READ_ONLY_QUERY_MODE`: Setting to configure whether SQL++ queries that allow data to be modified are allowed. It is set to True by default. diff --git a/src/mcp_server.py b/src/mcp_server.py index b4f4bff..714a48f 100644 --- a/src/mcp_server.py +++ b/src/mcp_server.py @@ -387,7 +387,7 @@ def advise_index_for_sql_plus_plus_query( return response except Exception as e: logger.error(f"Error running Advise on query: {e}") - raise ValueError(f"Unable to run ADVISE on: {query} for keyspace {bucket_name}.{scope_name}") + raise ValueError(f"Unable to run ADVISE on: {query} for keyspace {bucket_name}.{scope_name}") from e @mcp.tool() def run_sql_plus_plus_query( From 08a3334db80dd21d78ac4ef6ddd7dc25125f9c36 Mon Sep 17 00:00:00 2001 From: Eyal Nussbaum Date: Sun, 20 Jul 2025 21:46:41 +0300 Subject: [PATCH 10/15] Fixed ambiguous reference to query param in advise tool function --- src/mcp_server.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/mcp_server.py b/src/mcp_server.py index 5da4a42..a668db2 100644 --- a/src/mcp_server.py +++ b/src/mcp_server.py @@ -295,8 +295,8 @@ def advise_index_for_sql_plus_plus_query( response = {} try: - query = f"ADVISE {query}" - result = run_sql_plus_plus_query(ctx, bucket_name, scope_name, query) + advise_query = f"ADVISE {query}" + result = run_sql_plus_plus_query(ctx, bucket_name, scope_name, advise_query) if result and (advice := result[0].get("advice")): if (advice is not None): @@ -308,7 +308,7 @@ def 
advise_index_for_sql_plus_plus_query( return response except Exception as e: logger.error(f"Error running Advise on query: {e}") - raise ValueError(f"Unable to run ADVISE on: {query} for keyspace {bucket_name}.{scope_name}") + raise ValueError(f"Unable to run ADVISE on: {query} for keyspace {bucket_name}.{scope_name}") from e @mcp.tool() def run_sql_plus_plus_query( From 51616efe7775d3dd6cd5513bb7c350509ede389c Mon Sep 17 00:00:00 2001 From: Eyal Nussbaum Date: Mon, 21 Jul 2025 13:00:01 +0300 Subject: [PATCH 11/15] added cert_path to basic auth config --- src/mcp_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mcp_server.py b/src/mcp_server.py index 418ccd1..df7e840 100644 --- a/src/mcp_server.py +++ b/src/mcp_server.py @@ -201,7 +201,7 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: tls_conf["trust_store_path"] = ca_cert_path auth = CertificateAuthenticator(**tls_conf) else: - auth = PasswordAuthenticator(username, password) + auth = PasswordAuthenticator(username, password, cert_path = ca_cert_path) options = ClusterOptions(auth) options.apply_profile("wan_development") From 6ce9a88f06bae072537e0f2ebb497b908e81ed6e Mon Sep 17 00:00:00 2001 From: Eyal Nussbaum Date: Sun, 27 Jul 2025 14:22:00 +0300 Subject: [PATCH 12/15] Fixed response message for connection test --- src/tools/server.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/tools/server.py b/src/tools/server.py index 6358add..117127d 100644 --- a/src/tools/server.py +++ b/src/tools/server.py @@ -57,13 +57,20 @@ def test_connection(ctx: Context, bucket_name: str = None) -> dict[str, Any]: if bucket_name is not None: try: bucket = ensure_bucket_connection(ctx, bucket_name) + bucket_connected = True + return { + "status": "success", + "cluster_connected": cluster_connected, + "bucket_connected": bucket_connected, + "message": f"Successfully connected to Couchbase cluster and bucket `{bucket_name}`", + } except Exception as 
e: return { "status": "error", "cluster_connected": cluster_connected, "bucket_connected": bucket_connected, "error": str(e), - "message": f"Failed to connect to bucket named {bucket_name}", + "message": f"Failed to connect to bucket named `{bucket_name}`", } else: return { From 5e9f9d07d429b674e1b224580e558a2ac1de3cbf Mon Sep 17 00:00:00 2001 From: Eyal Nussbaum Date: Tue, 29 Jul 2025 00:04:22 +0300 Subject: [PATCH 13/15] fixed missing dependencies and accidental deletions from merge --- src/mcp_server.py | 4 +--- src/utils/config.py | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/mcp_server.py b/src/mcp_server.py index c333852..0e01a0d 100644 --- a/src/mcp_server.py +++ b/src/mcp_server.py @@ -97,6 +97,7 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: default=DEFAULT_TRANSPORT, help="Transport mode for the server (stdio or sse)", ) +@click.pass_context def main( ctx, connection_string, @@ -134,8 +135,5 @@ def main( mcp.run(transport=transport) - - - if __name__ == "__main__": main() diff --git a/src/utils/config.py b/src/utils/config.py index fa132be..34b7702 100644 --- a/src/utils/config.py +++ b/src/utils/config.py @@ -1,7 +1,7 @@ import logging import click - +import os from .constants import MCP_SERVER_NAME logger = logging.getLogger(f"{MCP_SERVER_NAME}.utils.config") From 921bf4f427edb32a02a9dd74d10256cc26a39d02 Mon Sep 17 00:00:00 2001 From: Eyal Nussbaum Date: Thu, 28 Aug 2025 23:07:04 +0300 Subject: [PATCH 14/15] committing unsaved changes from last commit --- src/mcp_server.py | 21 ++++++--------------- src/tools/query.py | 10 ---------- src/tools/server.py | 5 ----- 3 files changed, 6 insertions(+), 30 deletions(-) diff --git a/src/mcp_server.py b/src/mcp_server.py index 8a91883..caa865d 100644 --- a/src/mcp_server.py +++ b/src/mcp_server.py @@ -129,13 +129,10 @@ def main( password, read_only_query_mode, transport, -<<<<<<< HEAD ca_cert_path, - client_cert_path -======= + client_cert_path, host, port, 
->>>>>>> main ): """Couchbase MCP Server""" # Store configuration in context @@ -144,23 +141,18 @@ def main( "username": username, "password": password, "read_only_query_mode": read_only_query_mode, -<<<<<<< HEAD "ca_cert_path" : ca_cert_path, - "client_cert_path" : client_cert_path - } + "client_cert_path" : client_cert_path, + "transport": transport, + "host": host, + "port": port, + } try: validate_authentication_method(ctx.obj) except Exception as e: logger.error(f"Failed to validate auth method params: {e}") raise - # Create MCP server inside main() - mcp = FastMCP(MCP_SERVER_NAME, lifespan=app_lifespan) -======= - "transport": transport, - "host": host, - "port": port, - } # Map user-friendly transport names to SDK transport names sdk_transport = NETWORK_TRANSPORTS_SDK_MAPPING.get(transport, transport) @@ -177,7 +169,6 @@ def main( ) mcp = FastMCP(MCP_SERVER_NAME, lifespan=app_lifespan, **config) ->>>>>>> main # Register all tools for tool in ALL_TOOLS: diff --git a/src/tools/query.py b/src/tools/query.py index 0d605e9..93c5073 100644 --- a/src/tools/query.py +++ b/src/tools/query.py @@ -17,28 +17,18 @@ def get_schema_for_collection( -<<<<<<< HEAD - ctx: Context, bucket_name: str, scope_name: str, collection_name: str -) -> list[dict[str, Any]]: -======= ctx: Context, scope_name: str, collection_name: str ) -> dict[str, Any]: ->>>>>>> upstream/main """Get the schema for a collection in the specified scope. Returns a dictionary with the collection name and the schema returned by running INFER query on the Couchbase collection. """ schema = {"collection_name": collection_name, "schema": []} try: query = f"INFER {collection_name}" -<<<<<<< HEAD - result = run_sql_plus_plus_query(ctx, bucket_name, scope_name, query) - return result -======= result = run_sql_plus_plus_query(ctx, scope_name, query) # Result is a list of list of schemas. We convert it to a list of schemas. 
if result: schema["schema"] = result[0] ->>>>>>> upstream/main except Exception as e: logger.error(f"Error getting schema: {e}") raise diff --git a/src/tools/server.py b/src/tools/server.py index fcd690d..4cf916f 100644 --- a/src/tools/server.py +++ b/src/tools/server.py @@ -45,14 +45,9 @@ def get_server_configuration_status(ctx: Context) -> dict[str, Any]: } -<<<<<<< HEAD -def test_connection(ctx: Context, bucket_name: str = None) -> dict[str, Any]: - """Test the connection to Couchbase cluster and optionally a specified bucket. -======= def test_cluster_connection(ctx: Context) -> dict[str, Any]: """Test the connection to Couchbase cluster and bucket. This tool verifies the connection to the Couchbase cluster and bucket by establishing the connection if it is not already established. ->>>>>>> upstream/main Returns connection status and basic cluster information. """ cluster_connected = False From e696b5b0d445aa16b41bbcc9f451c734f504b785 Mon Sep 17 00:00:00 2001 From: Eyal Nussbaum Date: Thu, 28 Aug 2025 23:48:25 +0300 Subject: [PATCH 15/15] resetting base fork to origin --- README.md | 1 + src/mcp_server.py | 8 +++- src/tools/__init__.py | 3 -- src/tools/kv.py | 32 +++++-------- src/tools/query.py | 45 ++----------------- src/tools/server.py | 99 ++++++++--------------------------------- src/utils/__init__.py | 4 +- src/utils/config.py | 2 +- src/utils/connection.py | 15 ++++++- src/utils/context.py | 49 ++++++++++++-------- 10 files changed, 86 insertions(+), 172 deletions(-) diff --git a/README.md b/README.md index 93bdef1..c52e53e 100644 --- a/README.md +++ b/README.md @@ -87,6 +87,7 @@ This is the common configuration for the MCP clients such as Claude Desktop, Cur "CB_CONNECTION_STRING": "couchbases://connection-string", "CB_USERNAME": "username", "CB_PASSWORD": "password", + "CB_BUCKET_NAME": "bucket_name" } } } diff --git a/src/mcp_server.py b/src/mcp_server.py index 6f27bf4..dcb2f74 100644 --- a/src/mcp_server.py +++ b/src/mcp_server.py @@ -77,7 +77,11 @@ 
async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: envvar="CB_PASSWORD", help="Couchbase database password (required for operations)", ) - +@click.option( + "--bucket-name", + envvar="CB_BUCKET_NAME", + help="Couchbase bucket name (required for operations)", +) @click.option( "--read-only-query-mode", envvar=[ @@ -117,6 +121,7 @@ def main( connection_string, username, password, + bucket_name, read_only_query_mode, transport, host, @@ -128,6 +133,7 @@ def main( "connection_string": connection_string, "username": username, "password": password, + "bucket_name": bucket_name, "read_only_query_mode": read_only_query_mode, "transport": transport, "host": host, diff --git a/src/tools/__init__.py b/src/tools/__init__.py index b93170d..77b2c25 100644 --- a/src/tools/__init__.py +++ b/src/tools/__init__.py @@ -15,7 +15,6 @@ from .query import ( get_schema_for_collection, run_sql_plus_plus_query, - advise_index_for_sql_plus_plus_query, ) # Server tools @@ -35,7 +34,6 @@ delete_document_by_id, get_schema_for_collection, run_sql_plus_plus_query, - advise_index_for_sql_plus_plus_query, ] __all__ = [ @@ -48,7 +46,6 @@ "delete_document_by_id", "get_schema_for_collection", "run_sql_plus_plus_query", - "advise_index_for_sql_plus_plus_query", # Convenience "ALL_TOOLS", ] diff --git a/src/tools/kv.py b/src/tools/kv.py index a146059..7ffbd80 100644 --- a/src/tools/kv.py +++ b/src/tools/kv.py @@ -10,20 +10,17 @@ from mcp.server.fastmcp import Context from utils.constants import MCP_SERVER_NAME -from utils.context import ensure_cluster_connection, ensure_bucket_connection +from utils.context import ensure_bucket_connection logger = logging.getLogger(f"{MCP_SERVER_NAME}.tools.kv") def get_document_by_id( - ctx: Context, bucket_name: str, scope_name: str, collection_name: str, document_id: str + ctx: Context, scope_name: str, collection_name: str, document_id: str ) -> dict[str, Any]: - """Get a document by its ID from the specified bucket, scope and collection.""" - 
try: - bucket = ensure_bucket_connection(ctx, bucket_name) - except Exception as e: - logger.error(f"Error accessing bucket: {e}") - raise ValueError("Tool does not have access to bucket, or bucket does not exist.") from e + """Get a document by its ID from the specified scope and collection. + If the document is not found, it will raise an exception.""" + bucket = ensure_bucket_connection(ctx) try: collection = bucket.scope(scope_name).collection(collection_name) result = collection.get(document_id) @@ -35,19 +32,14 @@ def get_document_by_id( def upsert_document_by_id( ctx: Context, - bucket_name: str, scope_name: str, collection_name: str, document_id: str, document_content: dict[str, Any], ) -> bool: - """Insert or update a document in a bucket, scope and collection by its ID. + """Insert or update a document by its ID. Returns True on success, False on failure.""" - try: - bucket = ensure_bucket_connection(ctx, bucket_name) - except Exception as e: - logger.error(f"Error accessing bucket: {e}") - raise ValueError("Tool does not have access to bucket, or bucket does not exist.") from e + bucket = ensure_bucket_connection(ctx) try: collection = bucket.scope(scope_name).collection(collection_name) collection.upsert(document_id, document_content) @@ -59,15 +51,11 @@ def upsert_document_by_id( def delete_document_by_id( - ctx: Context, bucket_name: str, scope_name: str, collection_name: str, document_id: str + ctx: Context, scope_name: str, collection_name: str, document_id: str ) -> bool: - """Delete a document in a bucket, scope and collection by its ID. + """Delete a document by its ID. 
Returns True on success, False on failure.""" - try: - bucket = ensure_bucket_connection(ctx, bucket_name) - except Exception as e: - logger.error(f"Error accessing bucket: {e}") - raise ValueError("Tool does not have access to bucket, or bucket does not exist.") from e + bucket = ensure_bucket_connection(ctx) try: collection = bucket.scope(scope_name).collection(collection_name) collection.remove(document_id) diff --git a/src/tools/query.py b/src/tools/query.py index 0d605e9..2b8e44c 100644 --- a/src/tools/query.py +++ b/src/tools/query.py @@ -11,74 +11,35 @@ from mcp.server.fastmcp import Context from utils.constants import MCP_SERVER_NAME -from utils.context import ensure_cluster_connection, ensure_bucket_connection +from utils.context import ensure_bucket_connection logger = logging.getLogger(f"{MCP_SERVER_NAME}.tools.query") def get_schema_for_collection( -<<<<<<< HEAD - ctx: Context, bucket_name: str, scope_name: str, collection_name: str -) -> list[dict[str, Any]]: -======= ctx: Context, scope_name: str, collection_name: str ) -> dict[str, Any]: ->>>>>>> upstream/main """Get the schema for a collection in the specified scope. Returns a dictionary with the collection name and the schema returned by running INFER query on the Couchbase collection. """ schema = {"collection_name": collection_name, "schema": []} try: query = f"INFER {collection_name}" -<<<<<<< HEAD - result = run_sql_plus_plus_query(ctx, bucket_name, scope_name, query) - return result -======= result = run_sql_plus_plus_query(ctx, scope_name, query) # Result is a list of list of schemas. We convert it to a list of schemas. 
if result: schema["schema"] = result[0] ->>>>>>> upstream/main except Exception as e: logger.error(f"Error getting schema: {e}") raise return schema -def advise_index_for_sql_plus_plus_query( - ctx: Context, bucket_name: str, scope_name: str, query: str -) -> dict[str, Any]: - """Get an index recommendation from the SQL++ index advisor for a specified query on a specified bucket and scope. - Returns a dictionary with the query advised on, as well as: - 1. an array of the current indexes used and their status (or a string indicating no existing indexes available) - 2. an array of recommended indexes and/or covering indexes with reasoning (or a string indicating no possible index improvements) - """ - response = {} - - try: - advise_query = f"ADVISE {query}" - result = run_sql_plus_plus_query(ctx, bucket_name, scope_name, advise_query) - - if result and (advice := result[0].get("advice")): - if (advice is not None): - advise_info = advice.get("adviseinfo") - if ( advise_info is not None): - response["current_indexes"] = advise_info.get("current_indexes", "No current indexes") - response["recommended_indexes"] = advise_info.get("recommended_indexes","No index recommendations available") - response["query"]=result[0].get("query","Query statement unavailable") - return response - except Exception as e: - logger.error(f"Error running Advise on query: {e}") - raise ValueError(f"Unable to run ADVISE on: {query} for keyspace {bucket_name}.{scope_name}") from e def run_sql_plus_plus_query( - ctx: Context, bucket_name: str, scope_name: str, query: str + ctx: Context, scope_name: str, query: str ) -> list[dict[str, Any]]: """Run a SQL++ query on a scope and return the results as a list of JSON objects.""" - try: - bucket = ensure_bucket_connection(ctx, bucket_name) - except Exception as e: - logger.error(f"Error accessing bucket: {e}") - raise ValueError("Tool does not have access to bucket, or bucket does not exist.") from e + bucket = ensure_bucket_connection(ctx) 
app_context = ctx.request_context.lifespan_context read_only_query_mode = app_context.read_only_query_mode logger.info(f"Running SQL++ queries in read-only mode: {read_only_query_mode}") diff --git a/src/tools/server.py b/src/tools/server.py index fcd690d..2089667 100644 --- a/src/tools/server.py +++ b/src/tools/server.py @@ -11,7 +11,7 @@ from utils.config import get_settings from utils.constants import MCP_SERVER_NAME -from utils.context import ensure_cluster_connection, ensure_bucket_connection +from utils.context import ensure_bucket_connection logger = logging.getLogger(f"{MCP_SERVER_NAME}.tools.server") @@ -45,102 +45,39 @@ def get_server_configuration_status(ctx: Context) -> dict[str, Any]: } -<<<<<<< HEAD -def test_connection(ctx: Context, bucket_name: str = None) -> dict[str, Any]: - """Test the connection to Couchbase cluster and optionally a specified bucket. -======= def test_cluster_connection(ctx: Context) -> dict[str, Any]: """Test the connection to Couchbase cluster and bucket. This tool verifies the connection to the Couchbase cluster and bucket by establishing the connection if it is not already established. ->>>>>>> upstream/main Returns connection status and basic cluster information. 
""" - cluster_connected = False - bucket_connected = False try: - cluster = ensure_cluster_connection(ctx) - cluster_connected = True - if bucket_name is not None: - try: - bucket = ensure_bucket_connection(ctx, bucket_name) - bucket_connected = True - return { - "status": "success", - "cluster_connected": cluster_connected, - "bucket_connected": bucket_connected, - "message": f"Successfully connected to Couchbase cluster and bucket `{bucket_name}`", - } - except Exception as e: - return { - "status": "error", - "cluster_connected": cluster_connected, - "bucket_connected": bucket_connected, - "error": str(e), - "message": f"Failed to connect to bucket named `{bucket_name}`", - } - else: - return { - "status": "success", - "cluster_connected": cluster_connected, - "message": "Successfully connected to Couchbase cluster", - } + bucket = ensure_bucket_connection(ctx) + + # Test basic connectivity by getting bucket name + bucket_name = bucket.name + + return { + "status": "success", + "cluster_connected": True, + "bucket_connected": True, + "bucket_name": bucket_name, + "message": "Successfully connected to Couchbase cluster and bucket", + } except Exception as e: return { "status": "error", - "cluster_connected": cluster_connected, + "cluster_connected": False, + "bucket_connected": False, "error": str(e), "message": "Failed to connect to Couchbase", } -def get_list_of_buckets_with_settings( - ctx: Context -) -> list[dict[str, Any]]: - """Get the list of buckets from the Couchbase cluster, including their bucket settings and additional statistics. - Returns a list of comprehensive bucket information objects including settings. 
- """ - - result = [] - - try: - cluster = ensure_cluster_connection(ctx) - bucket_manager = cluster.buckets() - buckets = bucket_manager.get_all_buckets() - - for bucket_settings in buckets: - # Convert BucketSettings object to dictionary using available attributes - bucket_dict = {"bucket_name": bucket_settings.name} - - # Add basic bucket settings with safe access - for attr in ["bucket_type", "ram_quota", "num_replicas", "replica_indexes", - "flush_enabled", "max_expiry", "compression_mode", - "minimum_durability_level", "storage_backend", "eviction_policy", - "conflict_resolution", "history_retention_collection_default", - "history_retention_bytes", "history_retention_duration"]: - if hasattr(bucket_settings, attr): - value = getattr(bucket_settings, attr) - # If the value has a .value attribute (enum), use that - if hasattr(value, 'value'): - bucket_dict[attr] = value.value - else: - bucket_dict[attr] = value - - result.append(bucket_dict) - - return result - except Exception as e: - logger.error(f"Error getting bucket information: {e}") - raise - -def get_scopes_and_collections_in_bucket(ctx: Context, bucket_name: str) -> dict[str, list[str]]: - """Get the names of all scopes and collections for a specified bucket. +def get_scopes_and_collections_in_bucket(ctx: Context) -> dict[str, list[str]]: + """Get the names of all scopes and collections in the bucket. Returns a dictionary with scope names as keys and lists of collection names as values. 
""" - try: - bucket = ensure_bucket_connection(ctx, bucket_name) - except Exception as e: - logger.error(f"Error accessing bucket: {e}") - raise ValueError("Tool does not have access to bucket, or bucket does not exist.") from e + bucket = ensure_bucket_connection(ctx) try: scopes_collections = {} collection_manager = bucket.collections() diff --git a/src/utils/__init__.py b/src/utils/__init__.py index 341d6cf..62c7127 100644 --- a/src/utils/__init__.py +++ b/src/utils/__init__.py @@ -13,6 +13,7 @@ # Connection utilities from .connection import ( + connect_to_bucket, connect_to_couchbase_cluster, ) @@ -32,7 +33,6 @@ # Context utilities from .context import ( AppContext, - ensure_cluster_connection, ensure_bucket_connection, ) @@ -46,9 +46,9 @@ "validate_connection_config", # Connection "connect_to_couchbase_cluster", + "connect_to_bucket", # Context "AppContext", - "ensure_cluster_connection", "ensure_bucket_connection", # Constants "MCP_SERVER_NAME", diff --git a/src/utils/config.py b/src/utils/config.py index 08abcc2..d2f4408 100644 --- a/src/utils/config.py +++ b/src/utils/config.py @@ -25,7 +25,7 @@ def get_settings() -> dict: def validate_connection_config() -> None: """Validate that all required parameters for the MCP server are available when needed.""" settings = get_settings() - required_params = ["connection_string", "username", "password"] + required_params = ["connection_string", "username", "password", "bucket_name"] missing_params = [] for param in required_params: diff --git a/src/utils/connection.py b/src/utils/connection.py index 0c6d5e9..9b07589 100644 --- a/src/utils/connection.py +++ b/src/utils/connection.py @@ -2,7 +2,7 @@ from datetime import timedelta from couchbase.auth import PasswordAuthenticator -from couchbase.cluster import Cluster +from couchbase.cluster import Bucket, Cluster from couchbase.options import ClusterOptions from .constants import MCP_SERVER_NAME @@ -31,3 +31,16 @@ def connect_to_couchbase_cluster( except Exception as e: 
logger.error(f"Failed to connect to Couchbase: {e}") raise + + +def connect_to_bucket(cluster: Cluster, bucket_name: str) -> Bucket: + """Connect to a bucket and return the bucket object if successful. + If the operation fails, it will raise an exception. + """ + try: + logger.info(f"Connecting to bucket: {bucket_name}") + bucket = cluster.bucket(bucket_name) + return bucket + except Exception as e: + logger.error(f"Failed to connect to bucket: {e}") + raise diff --git a/src/utils/context.py b/src/utils/context.py index 3c63ad6..a549160 100644 --- a/src/utils/context.py +++ b/src/utils/context.py @@ -5,7 +5,7 @@ from mcp.server.fastmcp import Context from utils.config import get_settings, validate_connection_config -from utils.connection import connect_to_couchbase_cluster +from utils.connection import connect_to_bucket, connect_to_couchbase_cluster from utils.constants import MCP_SERVER_NAME logger = logging.getLogger(f"{MCP_SERVER_NAME}.utils.context") @@ -43,27 +43,38 @@ def _set_cluster_in_lifespan_context(ctx: Context) -> None: raise -def ensure_bucket_connection(ctx: Context, bucket_name: str) -> Bucket: - """Ensure bucket connection is established and return the bucket object.""" - try: - cluster = ensure_cluster_connection(ctx) - except Exception as e: - logger.error(f"Unable to connect to Couchbase cluster {e}") - raise +def _set_bucket_in_lifespan_context(ctx: Context) -> None: + """Set the bucket in the lifespan context. + If the bucket is not set, it will try to connect to the bucket using the cluster object in the lifespan context. + If the cluster is not set, it will try to connect to the cluster using the connection string, username, and password. + If the connection fails, it will raise an exception. 
+ """ + settings = get_settings() + bucket_name = settings.get("bucket_name") + + # If the bucket is not set, try to connect to the bucket using the cluster object in the lifespan context + app_context = ctx.request_context.lifespan_context + try: - bucket = cluster.bucket(bucket_name) + # If the cluster is not set, try to connect to the cluster + if not app_context.cluster: + _set_cluster_in_lifespan_context(ctx) + cluster = app_context.cluster + + # Try to connect to the bucket using the cluster object + bucket = connect_to_bucket(cluster, bucket_name) # type: ignore + app_context.bucket = bucket except Exception as e: - logger.error(f"Error accessing bucket: {e}") + logger.error( + f"Failed to connect to bucket: {e} \n Please check your bucket name and credentials." + ) raise - return bucket -def ensure_cluster_connection(ctx: Context) -> Cluster: - """Ensure cluster connection is established and return the cluster object.""" + +def ensure_bucket_connection(ctx: Context) -> Bucket: + """Ensure bucket connection is established and return the bucket object.""" validate_connection_config() app_context = ctx.request_context.lifespan_context - if not app_context.cluster: - try: - _set_cluster_in_lifespan_context(ctx) - except Exception as e: - raise - return app_context.cluster + if not app_context.bucket: + _set_bucket_in_lifespan_context(ctx) + return app_context.bucket