diff --git a/README.md b/README.md index c52e53e..93bdef1 100644 --- a/README.md +++ b/README.md @@ -87,7 +87,6 @@ This is the common configuration for the MCP clients such as Claude Desktop, Cur "CB_CONNECTION_STRING": "couchbases://connection-string", "CB_USERNAME": "username", "CB_PASSWORD": "password", - "CB_BUCKET_NAME": "bucket_name" } } } diff --git a/src/mcp_server.py b/src/mcp_server.py index dcb2f74..6f27bf4 100644 --- a/src/mcp_server.py +++ b/src/mcp_server.py @@ -77,11 +77,7 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: envvar="CB_PASSWORD", help="Couchbase database password (required for operations)", ) -@click.option( - "--bucket-name", - envvar="CB_BUCKET_NAME", - help="Couchbase bucket name (required for operations)", -) + @click.option( "--read-only-query-mode", envvar=[ @@ -121,7 +117,6 @@ def main( connection_string, username, password, - bucket_name, read_only_query_mode, transport, host, @@ -133,7 +128,6 @@ def main( "connection_string": connection_string, "username": username, "password": password, - "bucket_name": bucket_name, "read_only_query_mode": read_only_query_mode, "transport": transport, "host": host, diff --git a/src/tools/__init__.py b/src/tools/__init__.py index 77b2c25..b93170d 100644 --- a/src/tools/__init__.py +++ b/src/tools/__init__.py @@ -15,6 +15,7 @@ from .query import ( get_schema_for_collection, run_sql_plus_plus_query, + advise_index_for_sql_plus_plus_query, ) # Server tools @@ -34,6 +35,7 @@ delete_document_by_id, get_schema_for_collection, run_sql_plus_plus_query, + advise_index_for_sql_plus_plus_query, ] __all__ = [ @@ -46,6 +48,7 @@ "delete_document_by_id", "get_schema_for_collection", "run_sql_plus_plus_query", + "advise_index_for_sql_plus_plus_query", # Convenience "ALL_TOOLS", ] diff --git a/src/tools/kv.py b/src/tools/kv.py index 7ffbd80..a146059 100644 --- a/src/tools/kv.py +++ b/src/tools/kv.py @@ -10,17 +10,20 @@ from mcp.server.fastmcp import Context from utils.constants import MCP_SERVER_NAME -from utils.context import ensure_bucket_connection +from utils.context import ensure_cluster_connection, ensure_bucket_connection logger = logging.getLogger(f"{MCP_SERVER_NAME}.tools.kv") def get_document_by_id( - ctx: Context, scope_name: str, collection_name: str, document_id: str + ctx: Context, bucket_name: str, scope_name: str, collection_name: str, document_id: str ) -> dict[str, Any]: - """Get a document by its ID from the specified scope and collection. - If the document is not found, it will raise an exception.""" - bucket = ensure_bucket_connection(ctx) + """Get a document by its ID from the specified bucket, scope and collection.""" + try: + bucket = ensure_bucket_connection(ctx, bucket_name) + except Exception as e: + logger.error(f"Error accessing bucket: {e}") + raise ValueError("Tool does not have access to bucket, or bucket does not exist.") from e try: collection = bucket.scope(scope_name).collection(collection_name) result = collection.get(document_id) @@ -32,14 +35,19 @@ def get_document_by_id( def upsert_document_by_id( ctx: Context, + bucket_name: str, scope_name: str, collection_name: str, document_id: str, document_content: dict[str, Any], ) -> bool: - """Insert or update a document by its ID. + """Insert or update a document in a bucket, scope and collection by its ID. Returns True on success, False on failure.""" - bucket = ensure_bucket_connection(ctx) + try: + bucket = ensure_bucket_connection(ctx, bucket_name) + except Exception as e: + logger.error(f"Error accessing bucket: {e}") + raise ValueError("Tool does not have access to bucket, or bucket does not exist.") from e try: collection = bucket.scope(scope_name).collection(collection_name) collection.upsert(document_id, document_content) @@ -51,11 +59,15 @@ def upsert_document_by_id( def delete_document_by_id( - ctx: Context, scope_name: str, collection_name: str, document_id: str + ctx: Context, bucket_name: str, scope_name: str, collection_name: str, document_id: str ) -> bool: - """Delete a document by its ID. + """Delete a document in a bucket, scope and collection by its ID. Returns True on success, False on failure.""" - bucket = ensure_bucket_connection(ctx) + try: + bucket = ensure_bucket_connection(ctx, bucket_name) + except Exception as e: + logger.error(f"Error accessing bucket: {e}") + raise ValueError("Tool does not have access to bucket, or bucket does not exist.") from e try: collection = bucket.scope(scope_name).collection(collection_name) collection.remove(document_id) diff --git a/src/tools/query.py b/src/tools/query.py index 2b8e44c..0d605e9 100644 --- a/src/tools/query.py +++ b/src/tools/query.py @@ -11,35 +11,74 @@ from mcp.server.fastmcp import Context from utils.constants import MCP_SERVER_NAME -from utils.context import ensure_bucket_connection +from utils.context import ensure_cluster_connection, ensure_bucket_connection logger = logging.getLogger(f"{MCP_SERVER_NAME}.tools.query") def get_schema_for_collection( +<<<<<<< HEAD + ctx: Context, bucket_name: str, scope_name: str, collection_name: str +) -> list[dict[str, Any]]: +======= ctx: Context, scope_name: str, collection_name: str ) -> dict[str, Any]: +>>>>>>> upstream/main """Get the schema for a collection in the specified scope. Returns a dictionary with the collection name and the schema returned by running INFER query on the Couchbase collection. """ schema = {"collection_name": collection_name, "schema": []} try: query = f"INFER {collection_name}" +<<<<<<< HEAD + result = run_sql_plus_plus_query(ctx, bucket_name, scope_name, query) + return result +======= result = run_sql_plus_plus_query(ctx, scope_name, query) # Result is a list of list of schemas. We convert it to a list of schemas. if result: schema["schema"] = result[0] +>>>>>>> upstream/main except Exception as e: logger.error(f"Error getting schema: {e}") raise return schema +def advise_index_for_sql_plus_plus_query( + ctx: Context, bucket_name: str, scope_name: str, query: str +) -> dict[str, Any]: + """Get an index recommendation from the SQL++ index advisor for a specified query on a specified bucket and scope. + Returns a dictionary with the query advised on, as well as: + 1. an array of the current indexes used and their status (or a string indicating no existing indexes available) + 2. an array of recommended indexes and/or covering indexes with reasoning (or a string indicating no possible index improvements) + """ + response = {} + + try: + advise_query = f"ADVISE {query}" + result = run_sql_plus_plus_query(ctx, bucket_name, scope_name, advise_query) + + if result and (advice := result[0].get("advice")): + if (advice is not None): + advise_info = advice.get("adviseinfo") + if ( advise_info is not None): + response["current_indexes"] = advise_info.get("current_indexes", "No current indexes") + response["recommended_indexes"] = advise_info.get("recommended_indexes","No index recommendations available") + response["query"]=result[0].get("query","Query statement unavailable") + return response + except Exception as e: + logger.error(f"Error running Advise on query: {e}") + raise ValueError(f"Unable to run ADVISE on: {query} for keyspace {bucket_name}.{scope_name}") from e def run_sql_plus_plus_query( - ctx: Context, scope_name: str, query: str + ctx: Context, bucket_name: str, scope_name: str, query: str ) -> list[dict[str, Any]]: """Run a SQL++ query on a scope and return the results as a list of JSON objects.""" - bucket = ensure_bucket_connection(ctx) + try: + bucket = ensure_bucket_connection(ctx, bucket_name) + except Exception as e: + logger.error(f"Error accessing bucket: {e}") + raise ValueError("Tool does not have access to bucket, or bucket does not exist.") from e app_context = ctx.request_context.lifespan_context read_only_query_mode = app_context.read_only_query_mode logger.info(f"Running SQL++ queries in read-only mode: {read_only_query_mode}") diff --git a/src/tools/server.py b/src/tools/server.py index 2089667..fcd690d 100644 --- a/src/tools/server.py +++ b/src/tools/server.py @@ -11,7 +11,7 @@ from utils.config import get_settings from utils.constants import MCP_SERVER_NAME -from utils.context import ensure_bucket_connection +from utils.context import ensure_cluster_connection, ensure_bucket_connection logger = logging.getLogger(f"{MCP_SERVER_NAME}.tools.server") @@ -45,39 +45,102 @@ def get_server_configuration_status(ctx: Context) -> dict[str, Any]: } +<<<<<<< HEAD +def test_connection(ctx: Context, bucket_name: str = None) -> dict[str, Any]: + """Test the connection to Couchbase cluster and optionally a specified bucket. +======= def test_cluster_connection(ctx: Context) -> dict[str, Any]: """Test the connection to Couchbase cluster and bucket. This tool verifies the connection to the Couchbase cluster and bucket by establishing the connection if it is not already established. +>>>>>>> upstream/main Returns connection status and basic cluster information. """ + cluster_connected = False + bucket_connected = False try: - bucket = ensure_bucket_connection(ctx) - - # Test basic connectivity by getting bucket name - bucket_name = bucket.name - - return { - "status": "success", - "cluster_connected": True, - "bucket_connected": True, - "bucket_name": bucket_name, - "message": "Successfully connected to Couchbase cluster and bucket", - } + cluster = ensure_cluster_connection(ctx) + cluster_connected = True + if bucket_name is not None: + try: + bucket = ensure_bucket_connection(ctx, bucket_name) + bucket_connected = True + return { + "status": "success", + "cluster_connected": cluster_connected, + "bucket_connected": bucket_connected, + "message": f"Successfully connected to Couchbase cluster and bucket `{bucket_name}`", + } + except Exception as e: + return { + "status": "error", + "cluster_connected": cluster_connected, + "bucket_connected": bucket_connected, + "error": str(e), + "message": f"Failed to connect to bucket named `{bucket_name}`", + } + else: + return { + "status": "success", + "cluster_connected": cluster_connected, + "message": "Successfully connected to Couchbase cluster", + } except Exception as e: return { "status": "error", - "cluster_connected": False, - "bucket_connected": False, + "cluster_connected": cluster_connected, "error": str(e), "message": "Failed to connect to Couchbase", } -def get_scopes_and_collections_in_bucket(ctx: Context) -> dict[str, list[str]]: - """Get the names of all scopes and collections in the bucket. +def get_list_of_buckets_with_settings( + ctx: Context +) -> list[dict[str, Any]]: + """Get the list of buckets from the Couchbase cluster, including their bucket settings and additional statistics. + Returns a list of comprehensive bucket information objects including settings. + """ + + result = [] + + try: + cluster = ensure_cluster_connection(ctx) + bucket_manager = cluster.buckets() + buckets = bucket_manager.get_all_buckets() + + for bucket_settings in buckets: + # Convert BucketSettings object to dictionary using available attributes + bucket_dict = {"bucket_name": bucket_settings.name} + + # Add basic bucket settings with safe access + for attr in ["bucket_type", "ram_quota", "num_replicas", "replica_indexes", + "flush_enabled", "max_expiry", "compression_mode", + "minimum_durability_level", "storage_backend", "eviction_policy", + "conflict_resolution", "history_retention_collection_default", + "history_retention_bytes", "history_retention_duration"]: + if hasattr(bucket_settings, attr): + value = getattr(bucket_settings, attr) + # If the value has a .value attribute (enum), use that + if hasattr(value, 'value'): + bucket_dict[attr] = value.value + else: + bucket_dict[attr] = value + + result.append(bucket_dict) + + return result + except Exception as e: + logger.error(f"Error getting bucket information: {e}") + raise + +def get_scopes_and_collections_in_bucket(ctx: Context, bucket_name: str) -> dict[str, list[str]]: + """Get the names of all scopes and collections for a specified bucket. Returns a dictionary with scope names as keys and lists of collection names as values. """ - bucket = ensure_bucket_connection(ctx) + try: + bucket = ensure_bucket_connection(ctx, bucket_name) + except Exception as e: + logger.error(f"Error accessing bucket: {e}") + raise ValueError("Tool does not have access to bucket, or bucket does not exist.") from e try: scopes_collections = {} collection_manager = bucket.collections() diff --git a/src/utils/__init__.py b/src/utils/__init__.py index 62c7127..341d6cf 100644 --- a/src/utils/__init__.py +++ b/src/utils/__init__.py @@ -13,7 +13,6 @@ # Connection utilities from .connection import ( - connect_to_bucket, connect_to_couchbase_cluster, ) @@ -33,6 +32,7 @@ # Context utilities from .context import ( AppContext, + ensure_cluster_connection, ensure_bucket_connection, ) @@ -46,9 +46,9 @@ "validate_connection_config", # Connection "connect_to_couchbase_cluster", - "connect_to_bucket", # Context "AppContext", + "ensure_cluster_connection", "ensure_bucket_connection", # Constants "MCP_SERVER_NAME", diff --git a/src/utils/config.py b/src/utils/config.py index d2f4408..08abcc2 100644 --- a/src/utils/config.py +++ b/src/utils/config.py @@ -25,7 +25,7 @@ def get_settings() -> dict: def validate_connection_config() -> None: """Validate that all required parameters for the MCP server are available when needed.""" settings = get_settings() - required_params = ["connection_string", "username", "password", "bucket_name"] + required_params = ["connection_string", "username", "password"] missing_params = [] for param in required_params: diff --git a/src/utils/connection.py b/src/utils/connection.py index 9b07589..0c6d5e9 100644 --- a/src/utils/connection.py +++ b/src/utils/connection.py @@ -2,7 +2,7 @@ from datetime import timedelta from couchbase.auth import PasswordAuthenticator -from couchbase.cluster import Bucket, Cluster +from couchbase.cluster import Cluster from couchbase.options import ClusterOptions from .constants import MCP_SERVER_NAME @@ -31,16 +31,3 @@ def connect_to_couchbase_cluster( except Exception as e: logger.error(f"Failed to connect to Couchbase: {e}") raise - - -def connect_to_bucket(cluster: Cluster, bucket_name: str) -> Bucket: - """Connect to a bucket and return the bucket object if successful. - If the operation fails, it will raise an exception. - """ - try: - logger.info(f"Connecting to bucket: {bucket_name}") - bucket = cluster.bucket(bucket_name) - return bucket - except Exception as e: - logger.error(f"Failed to connect to bucket: {e}") - raise diff --git a/src/utils/context.py b/src/utils/context.py index a549160..3c63ad6 100644 --- a/src/utils/context.py +++ b/src/utils/context.py @@ -5,7 +5,7 @@ from mcp.server.fastmcp import Context from utils.config import get_settings, validate_connection_config -from utils.connection import connect_to_bucket, connect_to_couchbase_cluster +from utils.connection import connect_to_couchbase_cluster from utils.constants import MCP_SERVER_NAME logger = logging.getLogger(f"{MCP_SERVER_NAME}.utils.context") @@ -43,38 +43,27 @@ def _set_cluster_in_lifespan_context(ctx: Context) -> None: raise -def _set_bucket_in_lifespan_context(ctx: Context) -> None: - """Set the bucket in the lifespan context. - If the bucket is not set, it will try to connect to the bucket using the cluster object in the lifespan context. - If the cluster is not set, it will try to connect to the cluster using the connection string, username, and password. - If the connection fails, it will raise an exception. - """ - settings = get_settings() - bucket_name = settings.get("bucket_name") - - # If the bucket is not set, try to connect to the bucket using the cluster object in the lifespan context - app_context = ctx.request_context.lifespan_context - +def ensure_bucket_connection(ctx: Context, bucket_name: str) -> Bucket: + """Ensure bucket connection is established and return the bucket object.""" try: - # If the cluster is not set, try to connect to the cluster - if not app_context.cluster: - _set_cluster_in_lifespan_context(ctx) - cluster = app_context.cluster - - # Try to connect to the bucket using the cluster object - bucket = connect_to_bucket(cluster, bucket_name) # type: ignore - app_context.bucket = bucket + cluster = ensure_cluster_connection(ctx) except Exception as e: - logger.error( - f"Failed to connect to bucket: {e} \n Please check your bucket name and credentials." - ) + logger.error(f"Unable to connect to Couchbase cluster {e}") raise + try: + bucket = cluster.bucket(bucket_name) + except Exception as e: + logger.error(f"Error accessing bucket: {e}") + raise + return bucket - -def ensure_bucket_connection(ctx: Context) -> Bucket: - """Ensure bucket connection is established and return the bucket object.""" +def ensure_cluster_connection(ctx: Context) -> Cluster: + """Ensure cluster connection is established and return the cluster object.""" validate_connection_config() app_context = ctx.request_context.lifespan_context - if not app_context.bucket: - _set_bucket_in_lifespan_context(ctx) - return app_context.bucket + if not app_context.cluster: + try: + _set_cluster_in_lifespan_context(ctx) + except Exception as e: + raise + return app_context.cluster