diff --git a/README.md b/README.md index c52e53e..ac4008e 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,10 @@ An [MCP](https://modelcontextprotocol.io/) server implementation of Couchbase th ## Features +- Get a list of all the buckets in the cluster - Get a list of all the scopes and collections in the specified bucket +- Get a list of all the scopes in the specified bucket +- Get a list of all the collections in a specified scope and bucket. Note that this tool requires the cluster to have Query services. - Get the structure for a collection - Get a document by ID from a specified scope and collection - Upsert a document by ID to a specified scope and collection @@ -102,16 +105,16 @@ This is the common configuration for the MCP clients such as Claude Desktop, Cur The server can be configured using environment variables or command line arguments: -| Environment Variable | CLI Argument | Description | Default | -| ----------------------------- | ------------------------ | ------------------------------------------ | ------------ | -| `CB_CONNECTION_STRING` | `--connection-string` | Connection string to the Couchbase cluster | **Required** | -| `CB_USERNAME` | `--username` | Username with bucket access | **Required** | -| `CB_PASSWORD` | `--password` | Password for authentication | **Required** | -| `CB_BUCKET_NAME` | `--bucket-name` | Name of the bucket to access | **Required** | -| `CB_MCP_READ_ONLY_QUERY_MODE` | `--read-only-query-mode` | Prevent data modification queries | `true` | -| `CB_MCP_TRANSPORT` | `--transport` | Transport mode: `stdio`, `http`, `sse` | `stdio` | -| `CB_MCP_HOST` | `--host` | Host for HTTP/SSE transport modes | `127.0.0.1` | -| `CB_MCP_PORT` | `--port` | Port for HTTP/SSE transport modes | `8000` | +| Environment Variable | CLI Argument | Description | Default | +| ----------------------------- | ------------------------ | ------------------------------------------------------ | ------------ | +| `CB_CONNECTION_STRING` | `--connection-string` | Connection string to the Couchbase cluster | **Required** | +| `CB_USERNAME` | `--username` | Username with access to required buckets | **Required** | +| `CB_PASSWORD` | `--password` | Password for authentication | **Required** | +| `CB_BUCKET_NAME` | `--bucket-name` | Default bucket name when not it is not passed to tools | Not set | +| `CB_MCP_READ_ONLY_QUERY_MODE` | `--read-only-query-mode` | Prevent data modification queries | `true` | +| `CB_MCP_TRANSPORT` | `--transport` | Transport mode: `stdio`, `http`, `sse` | `stdio` | +| `CB_MCP_HOST` | `--host` | Host for HTTP/SSE transport modes | `127.0.0.1` | +| `CB_MCP_PORT` | `--port` | Port for HTTP/SSE transport modes | `8000` | You can also check the version of the server using: @@ -210,7 +213,7 @@ uvx couchbase-mcp-server \ --connection-string='' \ --username='' \ --password='' \ - --bucket-name='' \ + --bucket-name='' \ --read-only-query-mode=true \ --transport=http ``` @@ -244,7 +247,7 @@ uvx couchbase-mcp-server \ --connection-string='' \ --username='' \ --password='' \ - --bucket-name='' \ + --bucket-name='' \ --read-only-query-mode=true \ --transport=sse ``` @@ -377,7 +380,7 @@ The Couchbase MCP server can also be used as a managed server in your agentic ap ## Troubleshooting Tips - Ensure the path to your MCP server repository is correct in the configuration if running from source. -- Verify that your Couchbase connection string, database username, password and bucket name are correct. +- Verify that your Couchbase connection string, database username, password are correct. - If using Couchbase Capella, ensure that the cluster is [accessible](https://docs.couchbase.com/cloud/clusters/allow-ip-address.html) from the machine where the MCP server is running. - Check that the database user has proper permissions to access the specified bucket. - Confirm that the `uv` package manager is properly installed and accessible. You may need to provide absolute path to `uv`/`uvx` in the `command` field in the configuration. diff --git a/src/mcp_server.py b/src/mcp_server.py index dcb2f74..a2b1883 100644 --- a/src/mcp_server.py +++ b/src/mcp_server.py @@ -80,7 +80,7 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: @click.option( "--bucket-name", envvar="CB_BUCKET_NAME", - help="Couchbase bucket name (required for operations)", + help="Couchbase bucket name. If not provided, the list of buckets can be discovered using tools.", ) @click.option( "--read-only-query-mode", diff --git a/src/tools/__init__.py b/src/tools/__init__.py index 77b2c25..8ac42a5 100644 --- a/src/tools/__init__.py +++ b/src/tools/__init__.py @@ -19,16 +19,22 @@ # Server tools from .server import ( + get_buckets_in_cluster, + get_collections_in_scope, get_scopes_and_collections_in_bucket, + get_scopes_in_bucket, get_server_configuration_status, test_cluster_connection, ) # List of all tools for easy registration ALL_TOOLS = [ + get_buckets_in_cluster, get_server_configuration_status, test_cluster_connection, get_scopes_and_collections_in_bucket, + get_collections_in_scope, + get_scopes_in_bucket, get_document_by_id, upsert_document_by_id, delete_document_by_id, @@ -41,6 +47,9 @@ "get_server_configuration_status", "test_cluster_connection", "get_scopes_and_collections_in_bucket", + "get_collections_in_scope", + "get_scopes_in_bucket", + "get_buckets_in_cluster", "get_document_by_id", "upsert_document_by_id", "delete_document_by_id", diff --git a/src/tools/kv.py b/src/tools/kv.py index 7ffbd80..b625b8d 100644 --- a/src/tools/kv.py +++ b/src/tools/kv.py @@ -9,18 +9,27 @@ from mcp.server.fastmcp import Context +from utils.config import resolve_bucket_name +from utils.connection import connect_to_bucket from utils.constants import MCP_SERVER_NAME -from utils.context import ensure_bucket_connection +from utils.context import get_cluster_connection logger = logging.getLogger(f"{MCP_SERVER_NAME}.tools.kv") def get_document_by_id( - ctx: Context, scope_name: str, collection_name: str, document_id: str + ctx: Context, + scope_name: str, + collection_name: str, + document_id: str, + bucket_name: str | None = None, ) -> dict[str, Any]: """Get a document by its ID from the specified scope and collection. If the document is not found, it will raise an exception.""" - bucket = ensure_bucket_connection(ctx) + + cluster = get_cluster_connection(ctx) + bucket_name = resolve_bucket_name(bucket_name) + bucket = connect_to_bucket(cluster, bucket_name) try: collection = bucket.scope(scope_name).collection(collection_name) result = collection.get(document_id) @@ -36,10 +45,13 @@ def upsert_document_by_id( collection_name: str, document_id: str, document_content: dict[str, Any], + bucket_name: str | None = None, ) -> bool: """Insert or update a document by its ID. Returns True on success, False on failure.""" - bucket = ensure_bucket_connection(ctx) + cluster = get_cluster_connection(ctx) + bucket_name = resolve_bucket_name(bucket_name) + bucket = connect_to_bucket(cluster, bucket_name) try: collection = bucket.scope(scope_name).collection(collection_name) collection.upsert(document_id, document_content) @@ -51,11 +63,17 @@ def upsert_document_by_id( def delete_document_by_id( - ctx: Context, scope_name: str, collection_name: str, document_id: str + ctx: Context, + scope_name: str, + collection_name: str, + document_id: str, + bucket_name: str | None = None, ) -> bool: """Delete a document by its ID. Returns True on success, False on failure.""" - bucket = ensure_bucket_connection(ctx) + cluster = get_cluster_connection(ctx) + bucket_name = resolve_bucket_name(bucket_name) + bucket = connect_to_bucket(cluster, bucket_name) try: collection = bucket.scope(scope_name).collection(collection_name) collection.remove(document_id) diff --git a/src/tools/query.py b/src/tools/query.py index 2b8e44c..d78c42d 100644 --- a/src/tools/query.py +++ b/src/tools/query.py @@ -10,14 +10,16 @@ from lark_sqlpp import modifies_data, modifies_structure, parse_sqlpp from mcp.server.fastmcp import Context +from utils.config import resolve_bucket_name +from utils.connection import connect_to_bucket from utils.constants import MCP_SERVER_NAME -from utils.context import ensure_bucket_connection +from utils.context import get_cluster_connection logger = logging.getLogger(f"{MCP_SERVER_NAME}.tools.query") def get_schema_for_collection( - ctx: Context, scope_name: str, collection_name: str + ctx: Context, scope_name: str, collection_name: str, bucket_name: str | None = None ) -> dict[str, Any]: """Get the schema for a collection in the specified scope. Returns a dictionary with the collection name and the schema returned by running INFER query on the Couchbase collection. @@ -25,7 +27,7 @@ def get_schema_for_collection( schema = {"collection_name": collection_name, "schema": []} try: query = f"INFER {collection_name}" - result = run_sql_plus_plus_query(ctx, scope_name, query) + result = run_sql_plus_plus_query(ctx, scope_name, query, bucket_name) # Result is a list of list of schemas. We convert it to a list of schemas. if result: schema["schema"] = result[0] @@ -36,10 +38,14 @@ def get_schema_for_collection( def run_sql_plus_plus_query( - ctx: Context, scope_name: str, query: str + ctx: Context, scope_name: str, query: str, bucket_name: str | None = None ) -> list[dict[str, Any]]: """Run a SQL++ query on a scope and return the results as a list of JSON objects.""" - bucket = ensure_bucket_connection(ctx) + cluster = get_cluster_connection(ctx) + bucket_name = resolve_bucket_name(bucket_name) + + bucket = connect_to_bucket(cluster, bucket_name) + app_context = ctx.request_context.lifespan_context read_only_query_mode = app_context.read_only_query_mode logger.info(f"Running SQL++ queries in read-only mode: {read_only_query_mode}") @@ -75,3 +81,20 @@ def run_sql_plus_plus_query( except Exception as e: logger.error(f"Error running query: {e!s}", exc_info=True) raise + + +# Don't expose this function to the MCP server until we have a use case +def run_cluster_query(ctx: Context, query: str, **kwargs: Any) -> list[dict[str, Any]]: + """Run a query on the cluster object and return the results as a list of JSON objects.""" + + cluster = get_cluster_connection(ctx) + results = [] + + try: + result = cluster.query(query, **kwargs) + for row in result: + results.append(row) + return results + except Exception as e: + logger.error(f"Error running query: {e}") + raise diff --git a/src/tools/server.py b/src/tools/server.py index 2089667..b1b0d6f 100644 --- a/src/tools/server.py +++ b/src/tools/server.py @@ -1,7 +1,7 @@ """ Tools for server operations. -This module contains tools for getting the server status, testing the connection, and getting the scopes and collections in the bucket. +This module contains tools for getting the server status, testing the connection, and getting the buckets in the cluster, the scopes and collections in the bucket. """ import logging @@ -9,9 +9,11 @@ from mcp.server.fastmcp import Context -from utils.config import get_settings +from tools.query import run_cluster_query +from utils.config import get_settings, resolve_bucket_name +from utils.connection import connect_to_bucket from utils.constants import MCP_SERVER_NAME -from utils.context import ensure_bucket_connection +from utils.context import get_cluster_connection logger = logging.getLogger(f"{MCP_SERVER_NAME}.tools.server") @@ -34,7 +36,6 @@ def get_server_configuration_status(ctx: Context) -> dict[str, Any]: app_context = ctx.request_context.lifespan_context connection_status = { "cluster_connected": app_context.cluster is not None, - "bucket_connected": app_context.bucket is not None, } return { @@ -45,39 +46,45 @@ def get_server_configuration_status(ctx: Context) -> dict[str, Any]: } -def test_cluster_connection(ctx: Context) -> dict[str, Any]: - """Test the connection to Couchbase cluster and bucket. +def test_cluster_connection( + ctx: Context, bucket_name: str | None = None +) -> dict[str, Any]: + """Test the connection to Couchbase cluster and optionally to a bucket. This tool verifies the connection to the Couchbase cluster and bucket by establishing the connection if it is not already established. + If bucket name is not provided, it will not try to connect to the bucket specified in the MCP server settings. Returns connection status and basic cluster information. """ try: - bucket = ensure_bucket_connection(ctx) - - # Test basic connectivity by getting bucket name - bucket_name = bucket.name + cluster = get_cluster_connection(ctx) + bucket_name = resolve_bucket_name(bucket_name) + bucket = connect_to_bucket(cluster, bucket_name) return { "status": "success", - "cluster_connected": True, - "bucket_connected": True, + "cluster_connected": cluster.connected, + "bucket_connected": bucket is not None, "bucket_name": bucket_name, - "message": "Successfully connected to Couchbase cluster and bucket", + "message": "Successfully connected to Couchbase cluster", } except Exception as e: return { "status": "error", "cluster_connected": False, "bucket_connected": False, + "bucket_name": bucket_name, "error": str(e), "message": "Failed to connect to Couchbase", } -def get_scopes_and_collections_in_bucket(ctx: Context) -> dict[str, list[str]]: +def get_scopes_and_collections_in_bucket( + ctx: Context, bucket_name: str +) -> dict[str, list[str]]: """Get the names of all scopes and collections in the bucket. Returns a dictionary with scope names as keys and lists of collection names as values. """ - bucket = ensure_bucket_connection(ctx) + cluster = get_cluster_connection(ctx) + bucket = connect_to_bucket(cluster, bucket_name) try: scopes_collections = {} collection_manager = bucket.collections() @@ -89,3 +96,44 @@ def get_scopes_and_collections_in_bucket(ctx: Context) -> dict[str, list[str]]: except Exception as e: logger.error(f"Error getting scopes and collections: {e}") raise + + +def get_buckets_in_cluster(ctx: Context) -> list[str]: + """Get the names of all the accessible buckets in the cluster.""" + cluster = get_cluster_connection(ctx) + bucket_manager = cluster.buckets() + buckets_with_settings = bucket_manager.get_all_buckets() + + buckets = [] + for bucket in buckets_with_settings: + buckets.append(bucket.name) + + return buckets + + +def get_scopes_in_bucket(ctx: Context, bucket_name: str | None = None) -> list[str]: + """Get the names of all scopes in the given bucket.""" + cluster = get_cluster_connection(ctx) + bucket_name = resolve_bucket_name(bucket_name) + bucket = connect_to_bucket(cluster, bucket_name) + try: + scopes = bucket.collections().get_all_scopes() + return [scope.name for scope in scopes] + except Exception as e: + logger.error(f"Error getting scopes and collections: {e}") + raise + + +def get_collections_in_scope( + ctx: Context, scope_name: str, bucket_name: str | None = None +) -> list[str]: + """Get the names of all collections in the given scope and bucket.""" + bucket_name = resolve_bucket_name(bucket_name) + + if not scope_name: + raise ValueError("Scope name is required") + + # Get the collections in the scope using system:all_keyspaces collection + query = "SELECT DISTINCT(name) as collection_name FROM system:all_keyspaces where `bucket`=$bucket and `scope`=$scope" + results = run_cluster_query(ctx, query, bucket=bucket_name, scope=scope_name) + return [result["collection_name"] for result in results] diff --git a/src/utils/__init__.py b/src/utils/__init__.py index 62c7127..3094e93 100644 --- a/src/utils/__init__.py +++ b/src/utils/__init__.py @@ -7,8 +7,7 @@ # Configuration utilities from .config import ( get_settings, - validate_connection_config, - validate_required_param, + resolve_bucket_name, ) # Connection utilities @@ -33,7 +32,7 @@ # Context utilities from .context import ( AppContext, - ensure_bucket_connection, + get_cluster_connection, ) # Note: Individual modules create their own hierarchical loggers using: @@ -42,14 +41,13 @@ __all__ = [ # Config "get_settings", - "validate_required_param", - "validate_connection_config", + "resolve_bucket_name", # Connection "connect_to_couchbase_cluster", "connect_to_bucket", # Context "AppContext", - "ensure_bucket_connection", + "get_cluster_connection", # Constants "MCP_SERVER_NAME", "DEFAULT_READ_ONLY_MODE", diff --git a/src/utils/config.py b/src/utils/config.py index d2f4408..0545fd2 100644 --- a/src/utils/config.py +++ b/src/utils/config.py @@ -7,32 +7,27 @@ logger = logging.getLogger(f"{MCP_SERVER_NAME}.utils.config") -def validate_required_param( - ctx: click.Context, param: click.Parameter, value: str | None -) -> str: - """Validate that a required parameter is not empty.""" - if not value or value.strip() == "": - raise click.BadParameter(f"{param.name} cannot be empty") - return value - - def get_settings() -> dict: """Get settings from Click context.""" ctx = click.get_current_context() return ctx.obj or {} -def validate_connection_config() -> None: - """Validate that all required parameters for the MCP server are available when needed.""" - settings = get_settings() - required_params = ["connection_string", "username", "password", "bucket_name"] - missing_params = [] +def get_bucket_name_from_settings() -> str | None: + """Get bucket name from Click context.""" + ctx = click.get_current_context() + return ctx.obj.get("bucket_name") + - for param in required_params: - if not settings.get(param): - missing_params.append(param) +def resolve_bucket_name(passed_bucket_name: str | None = None) -> str: + """Resolve the bucket name from passed argument or passed settings and validate it. - if missing_params: - error_msg = f"Missing required parameters for the MCP server: {', '.join(missing_params)}" - logger.error(error_msg) - raise ValueError(error_msg) + Returns the resolved bucket name, or raises a ValueError if missing. + """ + # If passed bucket name is provided, use it. + # If not, check if bucket name is provided in the settings. + # If not, raise an error. + bucket_name = passed_bucket_name or get_bucket_name_from_settings() + if not bucket_name: + raise ValueError("Bucket name is required") + return bucket_name diff --git a/src/utils/connection.py b/src/utils/connection.py index 9b07589..09978fd 100644 --- a/src/utils/connection.py +++ b/src/utils/connection.py @@ -38,7 +38,6 @@ def connect_to_bucket(cluster: Cluster, bucket_name: str) -> Bucket: If the operation fails, it will raise an exception. """ try: - logger.info(f"Connecting to bucket: {bucket_name}") bucket = cluster.bucket(bucket_name) return bucket except Exception as e: diff --git a/src/utils/context.py b/src/utils/context.py index a549160..eb46d3a 100644 --- a/src/utils/context.py +++ b/src/utils/context.py @@ -1,11 +1,11 @@ import logging from dataclasses import dataclass -from couchbase.cluster import Bucket, Cluster +from couchbase.cluster import Cluster from mcp.server.fastmcp import Context -from utils.config import get_settings, validate_connection_config -from utils.connection import connect_to_bucket, connect_to_couchbase_cluster +from utils.config import get_settings +from utils.connection import connect_to_couchbase_cluster from utils.constants import MCP_SERVER_NAME logger = logging.getLogger(f"{MCP_SERVER_NAME}.utils.context") @@ -16,7 +16,7 @@ class AppContext: """Context for the MCP server.""" cluster: Cluster | None = None - bucket: Bucket | None = None + bucket_name: str | None = None read_only_query_mode: bool = True @@ -43,38 +43,11 @@ def _set_cluster_in_lifespan_context(ctx: Context) -> None: raise -def _set_bucket_in_lifespan_context(ctx: Context) -> None: - """Set the bucket in the lifespan context. - If the bucket is not set, it will try to connect to the bucket using the cluster object in the lifespan context. +def get_cluster_connection(ctx: Context) -> Cluster: + """Return the cluster connection from the lifespan context. If the cluster is not set, it will try to connect to the cluster using the connection string, username, and password. - If the connection fails, it will raise an exception. """ - settings = get_settings() - bucket_name = settings.get("bucket_name") - - # If the bucket is not set, try to connect to the bucket using the cluster object in the lifespan context - app_context = ctx.request_context.lifespan_context - - try: - # If the cluster is not set, try to connect to the cluster - if not app_context.cluster: - _set_cluster_in_lifespan_context(ctx) - cluster = app_context.cluster - - # Try to connect to the bucket using the cluster object - bucket = connect_to_bucket(cluster, bucket_name) # type: ignore - app_context.bucket = bucket - except Exception as e: - logger.error( - f"Failed to connect to bucket: {e} \n Please check your bucket name and credentials." - ) - raise - - -def ensure_bucket_connection(ctx: Context) -> Bucket: - """Ensure bucket connection is established and return the bucket object.""" - validate_connection_config() app_context = ctx.request_context.lifespan_context - if not app_context.bucket: - _set_bucket_in_lifespan_context(ctx) - return app_context.bucket + if not app_context.cluster: + _set_cluster_in_lifespan_context(ctx) + return app_context.cluster