
feat: kafka message dispatcher #1641


Merged — 174 commits, Apr 23, 2025

Conversation

Contributor

@nas-tabchiche commented Mar 7, 2025

  • Draft kafka consumer specs
  • Consumer initial commit

Summary by CodeRabbit

  • New Features

    • Introduced a dispatcher component that automates actions by consuming Kafka messages and interacting with the CISO Assistant REST API.
    • Added command-line interface (CLI) for authentication, configuration, and message consumption.
    • Enabled support for updating applied controls and requirement assessments, and for uploading attachments, via Kafka-driven messages.
    • Enhanced backend filtering capabilities for applied controls and requirement assessments.
    • Added flexible configuration through YAML files and environment variables, including S3 storage and Kafka authentication options.
    • Provided Docker support, including Dockerfile, Compose templates, and sample Kafka environments.
    • Added utility modules for API requests and Kafka configuration.
    • Added new Kafka consumer design documentation and message schema definitions.
    • Introduced scripts for Kafka message production and OSCF compliance data parsing.
    • Added a robust message processing system with error handling, retries, and dead-letter queue support.
    • Added automated entrypoint script for streamlined dispatcher startup.
  • Documentation

    • Added comprehensive README for the dispatcher, usage instructions, and configuration details.
    • Introduced architecture documentation for event streaming and message schemas.
    • Included sample messages and integration scripts for testing and demonstration.
  • Bug Fixes

    • Improved Docker build context by updating .dockerignore files to exclude unnecessary files and directories.
  • Tests

    • Added integration tests for dispatcher CLI, backend API, and Kafka connectivity.
  • Chores

    • Added project metadata and dependency management files.

Contributor

coderabbitai bot commented Mar 7, 2025

Warning

Rate limit exceeded

@ab-smith has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 9 minutes and 36 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.


📥 Commits

Reviewing files that changed from the base of the PR and between d4cb71c and d0c8ba5.

📒 Files selected for processing (3)
  • dispatcher/dispatcher.py (1 hunks)
  • dispatcher/tests/integration/test_messages.py (1 hunks)
  • dispatcher/utils/kafka.py (1 hunks)

Walkthrough

This update introduces a new Kafka-based dispatcher component to the system, integrating event-driven automation with the CISO Assistant REST API. The dispatcher is implemented as a Python CLI tool, with commands for configuration, authentication, and consuming Kafka messages. It supports message-driven updates to API resources and file attachments, using a modular message registry and schema validation. The dispatcher can be run as a standalone service or containerized, with Docker and Docker Compose configurations provided. Supporting documentation, configuration scripts, sample Kafka and message files, and integration tests are included. Backend API filtering capabilities are also enhanced for more granular resource selection.

Changes

| File(s) / Path(s) | Change Summary |
| --- | --- |
| backend/core/views.py | Added new filter fields to AppliedControlFilterSet and RequirementAssessmentViewSet for enhanced API filtering capabilities. |
| backend/.dockerignore | Added .venv to Docker ignore list. |
| config/make_config.py | Extended get_config() with Kafka dispatcher configuration prompts; updated Docker Compose generation to support dispatcher; minor formatting changes. |
| config/templates/docker-compose-postgresql-caddy.yml.j2, config/templates/docker-compose-postgresql-traefik.yml.j2, config/templates/docker-compose-sqlite-caddy.yml.j2, config/templates/docker-compose-sqlite-traefik.yml.j2 | Added optional dispatcher service to Docker Compose templates, with environment variables for API, Kafka, authentication, and S3, conditional on dispatcher enablement. |
| dispatcher/.dockerignore | Added ignore rules for system, environment, editor, cache, and build files to reduce Docker build context. |
| dispatcher/.gitignore | Added ignore rules for Python artifacts, virtual environments, editor/project files, test/coverage reports, and various tool-specific files. |
| dispatcher/Dockerfile | Added Dockerfile for dispatcher service using Python 3.12-slim, installing dependencies with uv, and setting up entrypoint. |
| dispatcher/README.md | Added comprehensive documentation for dispatcher usage, configuration, supported message types, and deployment. |
| dispatcher/dispatcher.py | Implemented CLI tool with commands for authentication, consuming Kafka messages, and configuration initialization. Handles message dispatching and error reporting. |
| dispatcher/entrypoint.sh | Added shell script to automate dispatcher setup: config initialization, authentication, and message consumption. |
| dispatcher/filtering.py | Added process_selector function for filtering API objects based on selectors, supporting pagination and key remapping. |
| dispatcher/messages.py | Implemented message registry, update and upload handlers for API resources, file attachment logic, and message registration. |
| dispatcher/pyproject.toml | Added project metadata and dependency definitions for dispatcher, including runtime and development dependencies. |
| dispatcher/settings.py | Added configuration management supporting YAML files and environment variables, with CLI for interactive/non-interactive setup and token retrieval. |
| dispatcher/utils/api.py | Added API utility functions for authenticated requests, session management, and endpoint construction. |
| dispatcher/utils/kafka.py | Added Kafka configuration builder supporting authentication and connection settings. |
| dispatcher/data/schemas/commands/applied_control/update_applied_control_v1.json | Added JSON schema for "update_applied_control" command messages. |
| dispatcher/data/schemas/commands/requirement_assessment/update_requirement_assessment_v1.json | Added JSON schema for "update_requirement_assessment" command messages. |
| dispatcher/data/schemas/commands/evidence/upload_attachment_v1.json | Added JSON schema for "upload_attachment" command messages, supporting file content and S3 references. |
| dispatcher/samples/kafka/redpanda-basic.yml | Added Redpanda-based Kafka+console Docker Compose sample for quickstart. |
| dispatcher/samples/kafka/zk-single-kafka-single.yml | Added Zookeeper+Kafka single-node Docker Compose sample for local testing. |
| dispatcher/samples/messages/update_requirement_assessment.json | Added sample JSON message for updating a requirement assessment. |
| dispatcher/tests/integration/test_messages.py | Added integration tests for dispatcher CLI, backend API, and Kafka using testcontainers. |
| integration/basic.py | Added script to send a Kafka message for updating compliance assessment requirements. |
| integration/oscf_parser.py | Added script to parse OSCF JSON, extract compliance data, and send Kafka messages for automation. |
| documentation/architecture/event-streaming.md | Added documentation describing event streaming architecture, Kafka consumer design, message flow, error handling, and configuration. |

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant User
    participant DispatcherCLI
    participant Kafka
    participant MessageRegistry
    participant API
    participant S3

    User->>DispatcherCLI: Run "dispatcher.py consume"
    DispatcherCLI->>Kafka: Subscribe to "observation" topic
    Kafka-->>DispatcherCLI: Delivers message
    DispatcherCLI->>MessageRegistry: Dispatch message by type
    alt Update API resource
        MessageRegistry->>API: GET (filter by selector)
        API-->>MessageRegistry: Return matching object(s)
        MessageRegistry->>API: PATCH (update object(s))
        API-->>MessageRegistry: Return updated object(s)
    else Upload attachment (file)
        alt File in base64
            MessageRegistry->>API: POST evidence object
            MessageRegistry->>API: PUT file content
        else File in S3
            MessageRegistry->>S3: Download file
            MessageRegistry->>API: POST evidence object
            MessageRegistry->>API: PUT file content
        end
        MessageRegistry->>API: PATCH applied controls with evidence
    end
    Note over DispatcherCLI: On error, send to Kafka error topic
```
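
For orientation, here is a minimal Python sketch of the consume-and-dispatch loop the diagram describes. It assumes a kafka-python `KafkaConsumer`/`KafkaProducer` and a hypothetical `REGISTRY` dict mapping `message_type` to handler functions; the actual names in `dispatcher/dispatcher.py` and `dispatcher/messages.py` may differ.

```python
import json

from kafka import KafkaConsumer, KafkaProducer  # kafka-python

# Hypothetical registry: message_type -> handler(message_dict); the real registry
# in dispatcher/messages.py may be structured differently.
REGISTRY = {}


def consume(bootstrap_servers, topic="observation", error_topic="errors"):
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        group_id="dispatcher",
        value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    )
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    for record in consumer:
        message = record.value
        handler = REGISTRY.get(message.get("message_type"))
        try:
            if handler is None:
                raise ValueError(f"Unknown message_type: {message.get('message_type')}")
            handler(message)  # e.g. PATCH an applied control through the REST API
        except Exception as exc:
            # Failed messages are forwarded to an error (dead-letter) topic
            producer.send(
                error_topic,
                json.dumps({"error": str(exc), "message": message}).encode("utf-8"),
            )
```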

Suggested reviewers

  • eric-intuitem

Poem

🐇
A hop, a skip, a message sent,
Through Kafka streams the bytes are bent.
The dispatcher listens, keen and bright,
Updating controls in the dead of night.
Attachments fly, selectors sift,
Automation’s now a rabbit’s gift!
With Docker, config, and tests in tow—
Off to compliance, we all go!



@ab-smith changed the title from "feat: event streaming" to "feat: kafka events consumer" on Mar 16, 2025
@nas-tabchiche
Contributor Author

@coderabbitai full review

Contributor

coderabbitai bot commented Apr 17, 2025

✅ Actions performed

Full review triggered.

Contributor

coderabbitai bot left a comment

Actionable comments posted: 10

♻️ Duplicate comments (22)
dispatcher/settings.py (1)

63-64: Trace‑level log leaks secrets

dispatcher/pyproject.toml (1)

1-7: ⚠️ Potential issue

Add PEP 517/518 build-system table
A build-system section is required for PEP 517-compliant installers (e.g., poetry-core). Without it, tools like pip cannot build this project:

+[build-system]
+requires = ["poetry-core>=1.5.0"]
+build-backend = "poetry.core.masonry.api"
dispatcher/Dockerfile (2)

12-13: Add checksum verification for the UV installer
Verify the SHA256 checksum of /uv-installer.sh to mitigate supply-chain risks:

 ADD https://astral.sh/uv/0.6.9/install.sh /uv-installer.sh
-RUN sh /uv-installer.sh && rm /uv-installer.sh
+RUN echo "a4dcbca1c075e9a91a61c32856d215d835f3d24fdbba54b3c8e0035b5d0839a8  /uv-installer.sh" | sha256sum -c - \
+ && sh /uv-installer.sh \
+ && rm /uv-installer.sh

18-18: Exclude local files with .dockerignore
Optimize build context and avoid leaking sensitive files by adding a .dockerignore (e.g., exclude .env, __pycache__, .git).

dispatcher/utils/kafka.py (1)

13-31: 🛠️ Refactor suggestion

Authentication mechanism support could be expanded

The current implementation only supports SASL_PLAIN which has security limitations. A previous review suggested adding support for SCRAM-SHA-512 and logging, which hasn't been implemented yet.

Enhance authentication options and add logging:

     if use_auth:
+        import logging
+        logger = logging.getLogger(__name__)
+        
         if settings.KAFKA_SASL_MECHANISM.upper() == "PLAIN":
+            logger.info("Configuring Kafka with SASL_PLAIN authentication")
             cfg.update(
                 {
                     "security_protocol": "SASL_SSL",
                     "sasl_mechanism": "PLAIN",
                     "sasl_plain_username": settings.KAFKA_USERNAME,
                     "sasl_plain_password": settings.KAFKA_PASSWORD,
                     # optional SSL cert files:
                     # "ssl_cafile": settings.KAFKA_SSL_CAFILE,
                     # "ssl_certfile": settings.KAFKA_SSL_CERTFILE,
                     # "ssl_keyfile": settings.KAFKA_SSL_KEYFILE,
                 }
             )
+        elif settings.KAFKA_SASL_MECHANISM.upper() == "SCRAM-SHA-512":
+            logger.info("Configuring Kafka with SCRAM-SHA-512 authentication")
+            cfg.update(
+                {
+                    "security_protocol": "SASL_SSL",
+                    "sasl_mechanism": "SCRAM-SHA-512",
+                    "sasl_plain_username": settings.KAFKA_USERNAME,
+                    "sasl_plain_password": settings.KAFKA_PASSWORD,
+                }
+            )
         else:
+            logger.error(f"Unsupported SASL mechanism: {settings.KAFKA_SASL_MECHANISM}")
             raise ValueError(
                 f"Unsupported SASL mechanism: {settings.KAFKA_SASL_MECHANISM}"
             )
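
As a usage illustration, the returned dict plugs straight into a kafka-python consumer, since the keys above match that client's keyword arguments. The builder's name and signature are assumed here and may differ in `dispatcher/utils/kafka.py`:

```python
from kafka import KafkaConsumer  # kafka-python accepts these config keys as keyword arguments

from dispatcher.utils.kafka import make_kafka_config  # hypothetical builder name

cfg = make_kafka_config()  # e.g. security_protocol / sasl_* entries when auth is enabled
consumer = KafkaConsumer(
    "observation",
    bootstrap_servers="localhost:9092",  # illustrative address
    **cfg,
)
```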
integration/basic.py (1)

31-50: ⚠️ Potential issue

Add error handling for Kafka connection failures

The function lacks try-except blocks to handle potential connection failures or message production errors.

Consider adding proper error handling:

 def send_kafka_message():
+    import sys
+    try:
         # Create producer configuration
         conf = {"bootstrap.servers": BOOTSTRAP_SERVERS}

         # Create Producer instance
         producer = Producer(conf)

         print(f"Connected to Kafka at {BOOTSTRAP_SERVERS}")
         print(f"Sending message to topic {TOPIC}:")
         print(json.dumps(MESSAGE, indent=2))

         # Produce the message
         producer.produce(
             topic=TOPIC, value=json.dumps(MESSAGE).encode("utf-8"), callback=delivery_report
         )

         # Wait for any outstanding messages to be delivered
         producer.flush()
         print("Message sent successfully")
+    except Exception as e:
+        print(f"Failed to send message: {e}")
+        sys.exit(1)
dispatcher/samples/kafka/zk-single-kafka-single.yml (1)

24-26: Document the multi-listener Kafka configuration.

The Kafka listener configuration is complex with three different listeners (INTERNAL, EXTERNAL, DOCKER). Without documentation, users might struggle to understand the purpose of each listener and how to properly connect to them.

The multi-listener setup needs documentation explaining:

  1. Purpose of each listener (INTERNAL, EXTERNAL, DOCKER)
  2. Platform compatibility considerations (especially for host.docker.internal)
  3. Which listener the dispatcher should use in different deployment scenarios
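
As an illustration of point 3, and assuming the port layout commonly used by the conduktor sample this file is based on (EXTERNAL on localhost:9092, INTERNAL on kafka1:19092, DOCKER on host.docker.internal:29092 — check the actual YAML before relying on these), the choice of bootstrap address depends on where the dispatcher runs:

```python
from kafka import KafkaConsumer  # kafka-python; addresses below are assumptions from the sample file

# Running on the host machine -> EXTERNAL listener
host_consumer = KafkaConsumer("observation", bootstrap_servers="localhost:9092")

# Running as another container in the same Compose network -> INTERNAL listener
container_consumer = KafkaConsumer("observation", bootstrap_servers="kafka1:19092")

# Running in a container outside the Compose network (e.g. Docker Desktop) -> DOCKER listener
desktop_consumer = KafkaConsumer("observation", bootstrap_servers="host.docker.internal:29092")
```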
dispatcher/data/schemas/commands/evidence/upload_attachment_v1.json (1)

26-29: Add max length constraint to file_content field

The file_content field doesn't have a maximum size limit, which could lead to very large messages that may impact performance.

Add a maxLength constraint to prevent oversized messages:

     "file_content": {
       "type": "string",
-      "description": "Base64 encoded content of the file"
+      "description": "Base64 encoded content of the file",
+      "maxLength": 10485760
     },
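
For reference, a quick sketch of how such a constraint behaves with the `jsonschema` library; the schema excerpt below only mirrors the suggested `maxLength` and is not the full command schema:

```python
from jsonschema import ValidationError, validate

# Minimal excerpt of the suggested constraint, not the full upload_attachment schema
schema = {
    "type": "object",
    "properties": {
        "file_content": {"type": "string", "maxLength": 10485760},
    },
}

oversized = {"file_content": "A" * 11_000_000}  # larger than the 10 MiB limit

try:
    validate(instance=oversized, schema=schema)
except ValidationError:
    print("Message rejected: file_content exceeds maxLength")
```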
config/templates/docker-compose-postgresql-traefik.yml.j2 (3)

158-160: Clarify build comment section

The commented build section suggests local building is possible but isn't currently used, which could confuse users.

Either remove the commented section or add a comment explaining when it should be used:

-    # build:
-    #   context: ../dispatcher
+    # Uncomment to build from source instead of using pre-built image
+    # build:
+    #   context: ../dispatcher

172-178: Consider using Docker secrets for sensitive credentials

Storing authentication credentials directly in environment variables makes them visible in configuration files and process lists, creating a security risk.

Consider using Docker secrets for sensitive values like credentials and tokens:

      {% if kafka_dispatcher.authentication == 'credentials' %}
-      - USER_EMAIL={{ kafka_dispatcher.credentials.user_email }}
-      - USER_PASSWORD={{ kafka_dispatcher.credentials.user_password }}
+      - USER_EMAIL_FILE=/run/secrets/dispatcher_user_email
+      - USER_PASSWORD_FILE=/run/secrets/dispatcher_user_password
      - AUTO_RENEW_SESSION={{ kafka_dispatcher.auto_renew_session }}
      {% elif kafka_dispatcher.authentication == 'token' %}
-      - TOKEN={{ kafka_dispatcher.token }}
+      - TOKEN_FILE=/run/secrets/dispatcher_token
      {% endif %}

You would also need to add a secrets section to the Docker Compose file.
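
On the dispatcher side, supporting the `*_FILE` variables would only need a small settings helper along these lines (a sketch only; the helper name is illustrative and `dispatcher/settings.py` does not currently implement it). The same pattern applies to the other Compose templates' dispatcher sections:

```python
import os
from pathlib import Path


def env_or_secret(name: str, default: str | None = None) -> str | None:
    """Return NAME from the environment, or read NAME_FILE (a Docker secret path) if set."""
    file_path = os.getenv(f"{name}_FILE")
    if file_path:
        return Path(file_path).read_text().strip()
    return os.getenv(name, default)


USER_EMAIL = env_or_secret("USER_EMAIL")
USER_PASSWORD = env_or_secret("USER_PASSWORD")
TOKEN = env_or_secret("TOKEN")
```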


157-158: 🛠️ Refactor suggestion

Avoid using 'latest' tag in production environments

Using the latest tag can lead to unpredictable deployments as new versions are released without warning. This may cause unexpected behavior or compatibility issues.

Consider using a specific version tag instead:

-    image: ghcr.io/intuitem/ciso-assistant-community/dispatcher:latest
+    image: ghcr.io/intuitem/ciso-assistant-community/dispatcher:1.0.0  # Replace with appropriate version
config/templates/docker-compose-postgresql-caddy.yml.j2 (3)

134-136: Clarify build comment section

The commented build section suggests local building is possible but isn't currently used, which could confuse users.

Either remove the commented section or add a comment explaining when it should be used:

-    # build:
-    #   context: ../dispatcher
+    # Uncomment to build from source instead of using pre-built image
+    # build:
+    #   context: ../dispatcher

148-154: Consider using Docker secrets for sensitive credentials

Storing authentication credentials directly in environment variables makes them visible in configuration files and process lists, creating a security risk.

Consider using Docker secrets for sensitive values like credentials and tokens:

      {% if kafka_dispatcher.authentication == 'credentials' %}
-      - USER_EMAIL={{ kafka_dispatcher.credentials.user_email }}
-      - USER_PASSWORD={{ kafka_dispatcher.credentials.user_password }}
+      - USER_EMAIL_FILE=/run/secrets/dispatcher_user_email
+      - USER_PASSWORD_FILE=/run/secrets/dispatcher_user_password
      - AUTO_RENEW_SESSION={{ kafka_dispatcher.auto_renew_session }}
      {% elif kafka_dispatcher.authentication == 'token' %}
-      - TOKEN={{ kafka_dispatcher.token }}
+      - TOKEN_FILE=/run/secrets/dispatcher_token
      {% endif %}

You would also need to add a secrets section to the Docker Compose file.


133-134: 🛠️ Refactor suggestion

Avoid using 'latest' tag in production environments

Using the latest tag can lead to unpredictable deployments as new versions are released without warning. This may cause unexpected behavior or compatibility issues.

Consider using a specific version tag instead:

-    image: ghcr.io/intuitem/ciso-assistant-community/dispatcher:latest
+    image: ghcr.io/intuitem/ciso-assistant-community/dispatcher:1.0.0  # Replace with appropriate version
dispatcher/utils/api.py (1)

2-2: ⚠️ Potential issue

Use absolute import path for settings.

The current relative import might cause issues when the module is imported from a different directory.

 import requests
-from settings import API_URL, get_access_token
+from dispatcher.settings import API_URL, get_access_token
documentation/architecture/event-streaming.md (1)

85-85: ⚠️ Potential issue

Complete the Schema Evolution and Backwards Compatibility section.

The "TBD" placeholder needs to be replaced with concrete guidance on how schema changes should be handled.

This section should include:

  • Use a central Schema Registry with versioning
  • Define compatibility policies (backward, forward, full/transitive)
  • Guidelines for schema evolution (adding fields with defaults, using optional fields)
  • Best practices for handling schema changes
  • Testing and verification procedures
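
One lightweight way to make the consumer tolerant of the evolution described above is to key handlers on both message type and schema version. A sketch only, assuming a decorator-based registry; this is not the current registry in `dispatcher/messages.py`:

```python
from typing import Callable

# (message_type, version) -> handler
HANDLERS: dict[tuple[str, int], Callable[[dict], None]] = {}


def register(message_type: str, version: int = 1):
    def decorator(func: Callable[[dict], None]):
        HANDLERS[(message_type, version)] = func
        return func
    return decorator


@register("update_applied_control", version=1)
def update_applied_control_v1(message: dict) -> None:
    ...  # call the REST API


def dispatch(message: dict) -> None:
    # Default to version 1 so older producers keep working (backward compatibility)
    key = (message["message_type"], int(message.get("version", 1)))
    handler = HANDLERS.get(key)
    if handler is None:
        raise ValueError(f"No handler registered for {key}")
    handler(message)
```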
integration/oscf_parser.py (1)

126-140: 🛠️ Refactor suggestion

De‑duplication logic duplicated – extract helper function

The exact “build unique_messages dict” block appears twice (lines 126‑140 and 220‑235).
We already suggested extracting it in an earlier review; the duplication remains.

Consider:

def prepare_messages(compliance_data):
    unique = {}
    for item in compliance_data:
        result = map_status_to_result(item["status_code"])
        key = f"{item['ref_id']}:{result}"
        unique[key] = {"ref_id": item["ref_id"], "result": result}
    return unique

and use unique_messages = prepare_messages(compliance_data) in both places.

Also applies to: 220-235

dispatcher/dispatcher.py (1)

145-151: ⚠️ Potential issue

Avoid logging full backend responses – may leak sensitive data

We previously flagged that logging e.response.text can expose credentials or PII.
The exact pattern is still present.

-logger.error(
-    f"Request failed with status code {e.response.status_code} and message: {e.response.text}"
-)
+logger.error(
+    "Request failed with status code {}", e.response.status_code
+)
+logger.debug("Response body suppressed to avoid leaking sensitive data")
dispatcher/messages.py (4)

7-13: ⚠️ Potential issue

Duplicate & ambiguous import of settings

The module already imports symbols from .settings (line 7).
import settings (line 11) shadows those names with a potentially different module (the project‑root settings.py) and keeps the redundancy flagged in the previous review.

-from .settings import API_URL, S3_URL, VERIFY_CERTIFICATE, get_access_token
-
-from loguru import logger
-
-import settings
+from .settings import (
+    API_URL,
+    S3_URL,
+    VERIFY_CERTIFICATE,
+    S3_ACCESS_KEY,
+    S3_SECRET_KEY,
+    get_access_token,
+)
+
+from loguru import logger

208-213: ⚠️ Potential issue

S3 credentials still pulled from wrong module

Even after removing the redundant import settings, the code will look for S3_ACCESS_KEY and S3_SECRET_KEY on the now‑missing top‑level module, breaking S3 authentication.

-            endpoint_url=S3_URL,
-            key=settings.S3_ACCESS_KEY,
-            secret=settings.S3_SECRET_KEY,
+            endpoint_url=S3_URL,
+            key=S3_ACCESS_KEY,
+            secret=S3_SECRET_KEY,

330-346: 🛠️ Refactor suggestion

Missing standard headers & 204‑response handling

  1. The GET/PATCH calls omit Accept/Content‑Type headers and reuse low‑level requests instead of utils.api, diverging from other helper‑calls.
  2. update_response.json() will raise on a 204 No‑Content response.
-            get_response = api.get(
-                control_endpoint,
-                headers={"Authorization": f"Token {get_access_token()}"},
-                verify=VERIFY_CERTIFICATE,
-            )
+            get_response = api.get(
+                control_endpoint,
+                headers=get_api_headers(),
+                verify=VERIFY_CERTIFICATE,
+            )
 ...
-            update_response = api.patch(
-                control_endpoint,
-                json={"evidences": [e.get("id") for e in evidences] + [evidence_id]},
-                headers={"Authorization": f"Token {get_access_token()}"},
-                verify=VERIFY_CERTIFICATE,
-            )
+            update_response = api.patch(
+                control_endpoint,
+                json={"evidences": [e.get("id") for e in evidences] + [evidence_id]},
+                headers=get_api_headers(),
+                verify=VERIFY_CERTIFICATE,
+            )
             if not update_response.ok:
 ...
-            logger.success(
-                "Updated applied control",
-                control=control,
-                response=update_response.json(),
-            )
+            logger.success(
+                "Updated applied control",
+                control=control,
+                status=update_response.status_code,
+            )

251-257: 🛠️ Refactor suggestion

Prefer json= instead of data= when creating objects

api.post() currently receives data= which sends a raw string and omits the Content‑Type: application/json header unless the helper adds it implicitly. Use json= for correctness and consistency with other calls.

-response = api.post(
-    objects_endpoint,
-    data={"name": values.get("name", name)},
+response = api.post(
+    objects_endpoint,
+    json={"name": values.get("name", name)},
🧹 Nitpick comments (28)
dispatcher/.gitignore (1)

1-6: Re‑evaluate ignoring the dispatcher YAML configs

.dispatcher_config.yaml (line 5) is ignored.
This prevents committing project‑wide, non‑secret defaults that teammates/container images might rely on.
If the file is meant to contain secrets, keep it ignored; otherwise add a clear “sample” (e.g. dispatcher_config.sample.yaml) to version control so users know the expected schema.

config/make_config.py (1)

146-149: Default use_s3 = True may be surprising

The confirmation prompt defaults to True. Most users won’t have S3 handy on first run; consider defaulting to False to minimise friction.

dispatcher/entrypoint.sh (1)

6-6: Use exec for final command to forward signals
Replace the final invocation with exec uv run dispatcher.py consume so the Python process replaces the shell and receives signals directly:

-uv run dispatcher.py consume
+exec uv run dispatcher.py consume
dispatcher/pyproject.toml (1)

17-22: Consider using project.optional-dependencies for dev deps
PEP 621 specifies [project.optional-dependencies] instead of [dependency-groups]. If you intend to adhere to PEP 621, update accordingly or confirm your build tool supports dependency-groups.

dispatcher/data/schemas/commands/applied_control/update_applied_control_v1.json (1)

1-25: Enforce strict schema by disallowing extra fields
Add "additionalProperties": false at the root to prevent unexpected keys in command messages:

 {
+  "additionalProperties": false,
   "$schema": "http://json-schema.org/draft-07/schema#",
dispatcher/Dockerfile (1)

8-11: Merge apt-get commands to reduce image layers
Combine apt-get update, installs, and cleanup into a single RUN to optimize layer count:

-RUN apt-get update && apt-get install -y \
-  libyaml-cpp-dev
-RUN apt-get install -y --no-install-recommends curl ca-certificates
+RUN apt-get update && apt-get install -y --no-install-recommends \
+    libyaml-cpp-dev \
+    curl \
+    ca-certificates \
+ && rm -rf /var/lib/apt/lists/*
integration/basic.py (2)

1-19: Message structure looks good but could be more configurable

The configuration and message structure are well-defined, but consider making the message content configurable through command line arguments or environment variables for greater flexibility.

 # Configuration
 BOOTSTRAP_SERVERS = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:19092")
 TOPIC = "observation"
+MESSAGE_TYPE = os.environ.get("KAFKA_MESSAGE_TYPE", "update_requirement_assessment")
+COMPLIANCE_REF = os.environ.get("COMPLIANCE_REF", "ISO_001")
+REQUIREMENT_REF = os.environ.get("REQUIREMENT_REF", "A.5.1")
+RESULT_VALUE = os.environ.get("RESULT_VALUE", "compliant")

 # The message to send
 MESSAGE = {
-    "message_type": "update_requirement_assessment",
+    "message_type": MESSAGE_TYPE,
     "selector": {
-        "compliance_assessment__ref_id": "ISO_001",
-        "requirement__ref_id": "A.5.1",
+        "compliance_assessment__ref_id": COMPLIANCE_REF,
+        "requirement__ref_id": REQUIREMENT_REF,
     },
-    "values": {"result": "compliant"},
+    "values": {"result": RESULT_VALUE},
 }

21-29: Delivery report callback is well-implemented

The callback function for message delivery reports is clear and provides useful feedback. Note that this function appears to be duplicated from integration/oscf_parser.py.

Consider refactoring the duplicate delivery_report function into a shared utility module to avoid code duplication.
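
A shared helper could look like this, placed for example in a hypothetical integration/kafka_utils.py and imported by both scripts:

```python
# integration/kafka_utils.py (hypothetical location)


def delivery_report(err, msg) -> None:
    """confluent-kafka delivery callback shared by basic.py and oscf_parser.py."""
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
```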

dispatcher/data/schemas/commands/requirement_assessment/update_requirement_assessment_v1.json (1)

1-25: Schema structure is well-defined but could benefit from examples

The JSON schema correctly defines the structure for the "update_requirement_assessment" message type, with proper type constraints and required fields. The previous review suggestions have been incorporated, including the enum for message_type and description field.

Consider adding examples to help users understand the expected format:

 {
   "$schema": "http://json-schema.org/draft-07/schema#",
   "title": "update_requirement_assessment",
   "type": "object",
   "properties": {
     "message_type": {
       "type": "string",
       "enum": ["update_requirement_assessment"],
       "description": "Identifies the message type for routing"
     },
     "selector": {
       "type": "object",
+      "description": "Properties used to uniquely identify the requirement assessment",
       "additionalProperties": {
         "type": "string"
       }
     },
     "values": {
       "type": "object",
+      "description": "Properties and values to update on the requirement assessment",
       "additionalProperties": {
         "type": ["string", "number", "boolean", "null"]
       }
     }
   },
-  "required": ["message_type", "selector", "values"]
+  "required": ["message_type", "selector", "values"],
+  "examples": [
+    {
+      "message_type": "update_requirement_assessment",
+      "selector": {
+        "compliance_assessment__ref_id": "ISO_001",
+        "requirement__ref_id": "A.5.1"
+      },
+      "values": {"result": "compliant", "comment": "Implemented as required", "score": 1}
+    }
+  ]
 }

The schema correctly validates against Draft-07 and includes appropriate constraints for each field.

dispatcher/samples/kafka/zk-single-kafka-single.yml (1)

30-32: Add comment about replication factor limitations.

The configuration uses a replication factor of 1, which is appropriate for a single-node setup but would be a single point of failure in production environments.

      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
+     # Replication factor of 1 is only appropriate for development/testing
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
config/templates/docker-compose-sqlite-caddy.yml.j2 (1)

114-119: Consider using Docker secrets for sensitive credentials.

Storing passwords and tokens as environment variables exposes them in container inspection and logs, posing a security risk.

For production environments, consider using Docker secrets or external secret management solutions for sensitive values like:

  • USER_PASSWORD
  • TOKEN
  • S3_ACCESS_KEY
  • S3_SECRET_KEY

Example using Docker secrets:

secrets:
  dispatcher_password:
    external: true

services:
  dispatcher:
    # ... other configuration
    secrets:
      - dispatcher_password
    environment:
      - USER_PASSWORD_FILE=/run/secrets/dispatcher_password
config/templates/docker-compose-sqlite-traefik.yml.j2 (2)

120-122: Clarify build comment section

The commented build section suggests local building is possible but isn't currently used, which could confuse users.

Either remove the commented section or add a comment explaining when it should be used:

-    # build:
-    #   context: ../dispatcher
+    # Uncomment to build from source instead of using pre-built image
+    # build:
+    #   context: ../dispatcher

135-140: Consider using Docker secrets for sensitive credentials

Storing authentication credentials directly in environment variables makes them visible in configuration files and process lists, creating a security risk.

Consider using Docker secrets for sensitive values like credentials and tokens:

      {% if kafka_dispatcher.authentication == 'credentials' %}
-      - USER_EMAIL={{ kafka_dispatcher.credentials.user_email }}
-      - USER_PASSWORD={{ kafka_dispatcher.credentials.user_password }}
+      - USER_EMAIL_FILE=/run/secrets/dispatcher_user_email
+      - USER_PASSWORD_FILE=/run/secrets/dispatcher_user_password
      - AUTO_RENEW_SESSION={{ kafka_dispatcher.auto_renew_session }}
      {% elif kafka_dispatcher.authentication == 'token' %}
-      - TOKEN={{ kafka_dispatcher.token }}
+      - TOKEN_FILE=/run/secrets/dispatcher_token
      {% endif %}

You would also need to add a secrets section to the Docker Compose file.

dispatcher/data/schemas/commands/evidence/upload_attachment_v1.json (1)

11-16: Add descriptive documentation for selector object

The selector object lacks descriptive documentation about its purpose and usage.

Enhance the schema with more descriptive documentation:

     "selector": {
       "type": "object",
+      "description": "Criteria to select the target resource(s) for the attachment upload",
       "additionalProperties": {
         "type": "string"
       }
dispatcher/samples/kafka/redpanda-basic.yml (2)

31-31: Pin Redpanda image to a specific digest for enhanced reproducibility.

While you've pinned to a specific version (v25.1.1), for production-critical infrastructure, consider using image digests for even more deterministic builds.

-    image: docker.redpanda.com/redpandadata/redpanda:v25.1.1
+    image: docker.redpanda.com/redpandadata/redpanda:v25.1.1@sha256:<digest-value>

27-30: Document potential production adjustments needed.

While this configuration is appropriate for development, production environments would need adjustments to resource allocation, security settings, and persistence configuration.

Add a comment explaining:

      # Mode dev-container uses well-known configuration properties for development in containers.
      - --mode dev-container
      # Tells Seastar (the framework Redpanda uses under the hood) to use 1 core on the system.
      - --smp 1
      - --default-log-level=info
+     # NOTE: For production deployment, consider:
+     # - Increasing SMP/CPU allocation
+     # - Configuring appropriate authentication/authorization
+     # - Adjusting log levels
+     # - Using replicated volumes for data persistence
dispatcher/utils/api.py (1)

14-19: Consider refactoring HTTP methods to reduce code duplication.

All five HTTP methods follow the same pattern with timeout handling. Consider refactoring to reduce duplication.

+def _request(method, url, **kwargs):
+    """Base request method with timeout handling."""
+    if "timeout" not in kwargs:
+        kwargs["timeout"] = 30  # Default timeout of 30 seconds
+    response = getattr(session, method)(url, **kwargs)
+    response.raise_for_status()
+    return response
+
 def get(url, **kwargs):
-    if "timeout" not in kwargs:
-        kwargs["timeout"] = 30  # Default timeout of 30 seconds
-    response = session.get(url, **kwargs)
-    response.raise_for_status()
-    return response
+    return _request("get", url, **kwargs)

 def post(url, **kwargs):
-    if "timeout" not in kwargs:
-        kwargs["timeout"] = 30  # Default timeout of 30 seconds
-    response = session.post(url, **kwargs)
-    response.raise_for_status()
-    return response
+    return _request("post", url, **kwargs)

# Same pattern for patch, put, delete methods

Also applies to: 22-27, 30-35, 38-43, 46-51

documentation/architecture/event-streaming.md (1)

66-70: Fix formatting in common fields list.

There's loose punctuation in the common fields list that should be fixed for consistency.

-The following fields are common to all commands:
-
-- `message_type: string`: Use case to trigger (e.g. `update_applied_control_status`, `update_requirement_assessment_status`, `attach_evidence_to_applied_control`...)
-- `version: int`: Schema version
-- `selector: object`: Key-value pairs to pass to the API as filters (e.g. `{"ref_id": "ID.AM-01", "folder": "global/mydomain", "target": "single"}`)
+The following fields are common to all commands:
+
+- `message_type: string`: Use case to trigger (e.g., `update_applied_control_status`, `update_requirement_assessment_status`, `attach_evidence_to_applied_control`...)
+- `version: int`: Schema version
+- `selector: object`: Key-value pairs to pass to the API as filters (e.g., `{"ref_id": "ID.AM-01", "folder": "global/mydomain", "target": "single"}`)
🧰 Tools
🪛 LanguageTool

[uncategorized] ~68-~68: Loose punctuation mark.
Context: ... all commands: - message_type: string: Use case to trigger (e.g. `update_appli...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~69-~69: Loose punctuation mark.
Context: ...to_applied_control...) - version: int: Schema version - selector: object`: Ke...

(UNLIKELY_OPENING_PUNCTUATION)

dispatcher/filtering.py (1)

61-70: Improve error logging for failed API requests.

While the error handling includes status code information, it could be enhanced with more context.

        except requests.exceptions.RequestException as e:
            logger.error(
-                "Search failed",
+                f"Search failed: {str(e)}",
+                endpoint=next_url,
+                selector=query_params if next_url == endpoint else {},
                response=e.response,
                status_code=e.response.status_code
                if hasattr(e, "response") and e.response
                else None,
            )
            raise
dispatcher/tests/integration/test_messages.py (2)

1-1: Remove unused uuid import

uuid is never referenced in the module and should be removed to keep the import list clean and avoid Ruff F401 violations.

-import uuid
🧰 Tools
🪛 Ruff (0.8.2)

1-1: uuid imported but unused

Remove unused import: uuid

(F401)


28-44: Collapse nested context‑managers into a single with

Testcontainers’ DockerImage already returns an object usable in a with block.
Nesting two consecutive with statements makes the code harder to read and triggers Ruff SIM117. You can combine them like this:

-    with DockerImage(path=BACKEND_DIR) as image:
-        with (
-            DockerContainer(image=str(image))
-            ...
-            .with_exposed_ports(8000) as container
-        ):
+    with DockerImage(path=BACKEND_DIR) as image, (
+        DockerContainer(image=str(image))
+        .with_env("ALLOWED_HOSTS", "backend,localhost")
+        .with_env("DJANGO_DEBUG", "True")
+        .with_env("DJANGO_SUPERUSER_EMAIL", DJANGO_SUPERUSER_EMAIL)
+        .with_env("DJANGO_SUPERUSER_PASSWORD", DJANGO_SUPERUSER_PASSWORD)
+        .with_volume_mapping(str(BACKEND_DIR / "db"), "/code/db", "rw")
+        .with_exposed_ports(8000)
+    ) as container:
         wait_for_logs(container, "Booting worker with pid")
         yield container
🧰 Tools
🪛 Ruff (0.8.2)

30-43: Use a single with statement with multiple contexts instead of nested with statements

Combine with statements

(SIM117)

integration/oscf_parser.py (2)

4-5: Drop unused sys import

sys is never used; remove it to silence Ruff F401.

-import sys
🧰 Tools
🪛 Ruff (0.8.2)

4-4: sys imported but unused

Remove unused import: sys

(F401)


142-160: Loop variable key is unused

Inside for key, data in unique_messages.items(): the variable key is never referenced.
Use an underscore to signal intentional discard:

-for key, data in unique_messages.items():
+for _, data in unique_messages.items():
🧰 Tools
🪛 Ruff (0.8.2)

142-142: Loop control variable key not used within loop body

(B007)

dispatcher/dispatcher.py (2)

1-1: Remove unused os import

os is not referenced – delete to satisfy Ruff F401.

-import os
🧰 Tools
🪛 Ruff (0.8.2)

1-1: os imported but unused

Remove unused import: os

(F401)


93-95: Hard‑coded group_id – make it configurable

Using "my-group" ties every consumer instance to the same consumer‑group and prevents flexible deployments. Consider sourcing the group‑id from settings.py or a CLI flag.

-group_id="my-group",
+group_id=settings.CONSUMER_GROUP_ID,
dispatcher/README.md (2)

21-31: Missing “Installation” section referenced by anchor

Line 23 links to #installation, but no “Installation” header exists in the document. Clicking the link will scroll to the top instead of the intended instructions.
Add an explicit “Installation” section or update the anchor to the existing “Running the dispatcher as a CLI tool” header.

-This project uses `uv` for dependency management. If you haven't installed uv yet, follow the instructions on its [documentation](https://docs.astral.sh/uv/).
+## Installation
+
+This project uses `uv` for dependency management. If you haven't installed uv yet, follow the instructions on its [documentation](https://docs.astral.sh/uv/).
+
+```bash
+pip install uv             # Optional fallback for environments without uv pre‑installed
+```

84-104: Bullet list renders incorrectly (“Loose punctuation mark”)

The “Fields:” blocks are rendered as a single paragraph because there is no blank line before the nested list items.
Insert a blank line after “Fields:” (and before the first -) for each of the three message‑type sections so Markdown parsers render proper bullet lists.

dispatcher/messages.py (1)

286-289: Avoid loading the entire file into memory

file_obj.read() reads the whole attachment into RAM before posting. For large evidence files this can exhaust memory; requests can stream a file object directly.

-    response = api.post(
-        endpoint,
-        headers=get_api_headers(extra_headers=extra_headers),
-        data=file_obj.read(),
+    # Let requests stream the file; no explicit read() needed
+    response = api.post(
+        endpoint,
+        headers=get_api_headers(extra_headers=extra_headers),
+        data=file_obj,
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b2cfa07 and 8fb4548.

⛔ Files ignored due to path filters (1)
  • dispatcher/uv.lock is excluded by !**/*.lock
📒 Files selected for processing (29)
  • backend/.dockerignore (1 hunks)
  • backend/core/views.py (2 hunks)
  • config/make_config.py (3 hunks)
  • config/templates/docker-compose-postgresql-caddy.yml.j2 (1 hunks)
  • config/templates/docker-compose-postgresql-traefik.yml.j2 (1 hunks)
  • config/templates/docker-compose-sqlite-caddy.yml.j2 (1 hunks)
  • config/templates/docker-compose-sqlite-traefik.yml.j2 (1 hunks)
  • dispatcher/.dockerignore (1 hunks)
  • dispatcher/.gitignore (1 hunks)
  • dispatcher/Dockerfile (1 hunks)
  • dispatcher/README.md (1 hunks)
  • dispatcher/data/schemas/commands/applied_control/update_applied_control_v1.json (1 hunks)
  • dispatcher/data/schemas/commands/evidence/upload_attachment_v1.json (1 hunks)
  • dispatcher/data/schemas/commands/requirement_assessment/update_requirement_assessment_v1.json (1 hunks)
  • dispatcher/dispatcher.py (1 hunks)
  • dispatcher/entrypoint.sh (1 hunks)
  • dispatcher/filtering.py (1 hunks)
  • dispatcher/messages.py (1 hunks)
  • dispatcher/pyproject.toml (1 hunks)
  • dispatcher/samples/kafka/redpanda-basic.yml (1 hunks)
  • dispatcher/samples/kafka/zk-single-kafka-single.yml (1 hunks)
  • dispatcher/samples/messages/update_requirement_assessment.json (1 hunks)
  • dispatcher/settings.py (1 hunks)
  • dispatcher/tests/integration/test_messages.py (1 hunks)
  • dispatcher/utils/api.py (1 hunks)
  • dispatcher/utils/kafka.py (1 hunks)
  • documentation/architecture/event-streaming.md (1 hunks)
  • integration/basic.py (1 hunks)
  • integration/oscf_parser.py (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (4)
integration/basic.py (1)
integration/oscf_parser.py (1)
  • delivery_report (14-21)
dispatcher/entrypoint.sh (1)
dispatcher/dispatcher.py (1)
  • consume (82-193)
dispatcher/utils/api.py (1)
dispatcher/settings.py (1)
  • get_access_token (269-287)
integration/oscf_parser.py (3)
integration/basic.py (1)
  • delivery_report (21-28)
backend/ebios_rm/models.py (1)
  • ref_id (668-669)
backend/core/views.py (1)
  • result (4650-4651)
🪛 LanguageTool
documentation/architecture/event-streaming.md

[uncategorized] ~68-~68: Loose punctuation mark.
Context: ... all commands: - message_type: string: Use case to trigger (e.g. `update_appli...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~69-~69: Loose punctuation mark.
Context: ...to_applied_control...) - version: int: Schema version - selector: object`: Ke...

(UNLIKELY_OPENING_PUNCTUATION)

dispatcher/README.md

[uncategorized] ~191-~191: Loose punctuation mark.
Context: ...applied controls. Fields: - selector: Identifies the target applied control t...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~192-~192: Loose punctuation mark.
Context: ...et applied control to update. - values: The updated values of the target applie...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~212-~212: Loose punctuation mark.
Context: ...ment assessments. Fields: - selector: Identifies the target requirement asses...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~213-~213: Loose punctuation mark.
Context: ...irement assessment to update. - values: The updated values of the target requir...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~234-~234: Loose punctuation mark.
Context: ...from a S3 bucket. Fields: - selector: Identifies the target evidence to uploa...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~236-~236: Loose punctuation mark.
Context: ... name. - values - applied_controls: The applied control to which the eviden...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~237-~237: Loose punctuation mark.
Context: ...to the existing ones. - file_content: The base64 encoded content of the file ...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~238-~238: Loose punctuation mark.
Context: ...ucket` is not specified. - `file_name`: The name of the file to upload. This fi...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~239-~239: Loose punctuation mark.
Context: ... field is required. - file_s3_bucket: The S3 bucket where the file is stored....

(UNLIKELY_OPENING_PUNCTUATION)

🪛 Ruff (0.8.2)
dispatcher/tests/integration/test_messages.py

1-1: uuid imported but unused

Remove unused import: uuid

(F401)


30-43: Use a single with statement with multiple contexts instead of nested with statements

Combine with statements

(SIM117)

dispatcher/dispatcher.py

1-1: os imported but unused

Remove unused import: os

(F401)

integration/oscf_parser.py

4-4: sys imported but unused

Remove unused import: sys

(F401)


68-69: Use a single if statement instead of nested if statements

(SIM102)


142-142: Loop control variable key not used within loop body

(B007)

🪛 markdownlint-cli2 (0.17.2)
dispatcher/README.md

10-10: Link fragments should be valid
null

(MD051, link-fragments)

⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: enterprise-startup-functional-test (3.12)
  • GitHub Check: startup-functional-test (3.12)
  • GitHub Check: functional-tests (3.12, chromium)
  • GitHub Check: startup-docker-compose-test
  • GitHub Check: enterprise-functional-tests (3.12, chromium)
  • GitHub Check: test (3.12)
  • GitHub Check: build (3.12)
  • GitHub Check: enterprise-startup-docker-compose-test
🔇 Additional comments (24)
backend/.dockerignore (1)

7-7: 👍 .venv exclusion is a good addition

Excluding the virtual‑environment directory keeps the Docker build context slim and prevents leaking host‑specific binaries into the image.
No further action required.

config/make_config.py (1)

200-202: Stripping all blank lines can break YAML folding

filtered = [line for line in lines if line.strip() != ""] removes intentional blank lines and can mangle literal blocks (| / >) or anchors in YAML.
Safer alternative: collapse sequences of > 2 blank lines instead of deleting every empty line.

dispatcher/samples/messages/update_requirement_assessment.json (1)

1-11: Approve sample message format
The JSON sample aligns with the registered message_type, correctly demonstrates the selector filters, and provides a valid values payload.

dispatcher/pyproject.toml (1)

8-15: Dependencies look good
Runtime dependencies are well-constrained and align with the dispatcher’s requirements.

dispatcher/Dockerfile (1)

23-23: ENTRYPOINT in exec form is appropriate
The exec-form ENTRYPOINT ["uv", "run", "bash", "entrypoint.sh"] correctly replaces PID 1 and forwards signals.

dispatcher/utils/kafka.py (3)

1-3: Import looks good!

You have correctly used the absolute import style as recommended, addressing a previous suggestion.


4-12: Well-structured function signature and documentation

The function has a clear signature with proper type hints and a helpful docstring explaining its purpose.


32-33: Return statement is clean and concise

The function properly returns the constructed configuration dictionary.

integration/basic.py (1)

52-54: Entry point is clean and straightforward

The script's entry point correctly calls the main function when run directly.

dispatcher/.dockerignore (1)

1-21: Well-structured .dockerignore file

The .dockerignore file follows Docker best practices by excluding unnecessary files and directories from the build context. This helps optimize Docker build performance and security by preventing sensitive files, development artifacts, and large directories from being included in the image.

The file covers all common patterns that should be excluded:

  • System and temporary files (*.DS_Store, ~$, *.bak)
  • Environment and virtual environment directories (.env, .venv, venv)
  • Editor-specific folders (.vscode)
  • Database files and secrets (*.sqlite3, django_secret_key)
  • Build artifacts and dependencies (node_modules, dist)
  • Application-specific directories (backend/profiles, etc.)
dispatcher/samples/kafka/zk-single-kafka-single.yml (1)

1-2: Source attribution is good practice.

Clear attribution to the original source with license information is a good practice. This helps users understand the origin and licensing terms of the configuration.

config/templates/docker-compose-sqlite-caddy.yml.j2 (1)

95-102: Good practice: Conditional service inclusion.

Using Jinja2 conditionals to include the dispatcher service only when enabled follows good configuration management practices, making the deployment flexible.

backend/core/views.py (2)

1317-1317: Add filtering by ref_id in AppliedControlFilterSet
Including "ref_id" in the Meta.fields of AppliedControlFilterSet enables direct filtering on control reference IDs, aligning with the rest of the API’s filter patterns.


4562-4569: Enable nested field filtering for RequirementAssessmentViewSet
The filterset_fields list now supports "folder__name", "requirement__ref_id", "compliance_assessment__ref_id", and "compliance_assessment__assets__ref_id", allowing clients to filter assessments by these related fields. Please ensure you add or update tests to cover these new filter options and validate their behavior.

dispatcher/samples/kafka/redpanda-basic.yml (1)

1-65: Well-structured Redpanda Docker Compose setup for development.

This Docker Compose configuration provides a good starting point for development with a single-node Redpanda Kafka environment. The configuration includes detailed comments explaining the purpose of each setting, which is excellent for developer understanding.

dispatcher/utils/api.py (3)

7-11: Thorough token validation in update_session_token.

Good implementation of token validation that raises a clear error message when authentication fails.


54-65: Good implementation of the header construction function.

The function properly handles default parameters, validates the token, and combines headers appropriately.


68-72: Well-designed API endpoint construction.

The make_api_endpoint function handles slash formatting properly and ensures the trailing slash needed by the API.

documentation/architecture/event-streaming.md (1)

1-136: Well-structured architecture document for event streaming.

This document provides a comprehensive overview of the Kafka-based event streaming architecture, covering components, message flow, error handling, and configuration. The detailed sections on message schemas, error handling, and configuration parameters will be valuable for developers.

🧰 Tools
🪛 LanguageTool

[uncategorized] ~68-~68: Loose punctuation mark.
Context: ... all commands: - message_type: string: Use case to trigger (e.g. `update_appli...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~69-~69: Loose punctuation mark.
Context: ...to_applied_control...) - version: int: Schema version - selector: object`: Ke...

(UNLIKELY_OPENING_PUNCTUATION)

dispatcher/filtering.py (4)

32-33: Properly handling mutable default argument.

Good implementation that avoids the common Python pitfall of mutable default arguments by using None and initializing within the function body.


54-60: Good API call implementation with timeout.

The API call now includes a timeout parameter and uses the api utility module for consistent request handling.


97-107: Thorough ID field validation.

Good implementation that checks for missing ID fields in both single and multiple result scenarios, with appropriate error logging.


84-96: Clear error messages for result count validation.

The error messages clearly explain the issue when the number of results doesn't match the expected target, helping developers understand and fix selector issues.

dispatcher/tests/integration/test_messages.py (1)

11-12: Verify that dispatcher.cli is importable from the package

The tests import the package (import dispatcher as ds) and assume it exposes cli.
Unless dispatcher/__init__.py re‑exports cli (e.g. from .dispatcher import cli),
ds.cli will raise AttributeError, causing these tests to fail.

Please make sure dispatcher/__init__.py contains:

from .dispatcher import cli  # re‑export CLI for tests
__all__ = ["cli"]

or change the tests to from dispatcher.dispatcher import cli.

Also applies to: 66-72

Comment on lines 246 to 251
creds = config.get("credentials", {})
if not (creds.get("token") or (creds.get("email") and creds.get("password"))):
logger.error(
"Missing credentials in configuration. Please set USER_EMAIL and USER_PASSWORD via environment variables or in the config file.",
)


⚠️ Potential issue

Log‑and‑continue may mask fatal mis‑configuration

After logging “Missing credentials”, the code continues, producing empty strings that will fail later at runtime.
Prefer to abort early:

-    logger.error(
-        "Missing credentials in configuration. Please set USER_EMAIL and USER_PASSWORD via environment variables or in the config file.",
-    )
+    logger.error(
+        "Missing credentials or token. Set USER_TOKEN or USER_EMAIL + USER_PASSWORD.",
+    )
+    sys.exit(1)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
creds = config.get("credentials", {})
if not (creds.get("token") or (creds.get("email") and creds.get("password"))):
logger.error(
"Missing credentials in configuration. Please set USER_EMAIL and USER_PASSWORD via environment variables or in the config file.",
)
creds = config.get("credentials", {})
if not (creds.get("token") or (creds.get("email") and creds.get("password"))):
logger.error(
"Missing credentials or token. Set USER_TOKEN or USER_EMAIL + USER_PASSWORD.",
)
sys.exit(1)

Comment on lines +40 to +57
"debug": os.getenv("DEBUG", "").lower() == "true" or None,
"rest": {
"url": os.getenv("API_URL"),
"verify_certificate": os.getenv("VERIFY_CERTIFICATE") == "True" or None,
},
"credentials": {
"token": os.getenv("USER_TOKEN"),
"email": os.getenv("USER_EMAIL"),
"password": os.getenv("USER_PASSWORD"),
},
"kafka": {
"use_auth": os.getenv("KAFKA_USE_AUTH") == "True" or None,
"sasl_mechanism": os.getenv("KAFKA_SASL_MECHANISM"),
"sasl_plain_username": os.getenv("KAFKA_USERNAME"),
"sasl_plain_password": os.getenv("KAFKA_PASSWORD"),
},
"auto_renew_session": os.getenv("AUTO_RENEW_SESSION") == "True" or None,
"bootstrap_servers": os.getenv("BOOTSTRAP_SERVERS"),

⚠️ Potential issue

Boolean env‑var parsing loses false/disabled states

Pattern os.getenv("VERIFY_CERTIFICATE") == "True" or None collapses "False" and "missing" into None, so the user cannot turn features off via env‑vars.

-        "verify_certificate": os.getenv("VERIFY_CERTIFICATE") == "True" or None,
+        "verify_certificate": (
+            None
+            if (v := os.getenv("VERIFY_CERTIFICATE")) is None
+            else v.lower() == "true"
+        ),

Replicate for KAFKA_USE_AUTH, AUTO_RENEW_SESSION, etc.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
"debug": os.getenv("DEBUG", "").lower() == "true" or None,
"rest": {
"url": os.getenv("API_URL"),
"verify_certificate": os.getenv("VERIFY_CERTIFICATE") == "True" or None,
},
"credentials": {
"token": os.getenv("USER_TOKEN"),
"email": os.getenv("USER_EMAIL"),
"password": os.getenv("USER_PASSWORD"),
},
"kafka": {
"use_auth": os.getenv("KAFKA_USE_AUTH") == "True" or None,
"sasl_mechanism": os.getenv("KAFKA_SASL_MECHANISM"),
"sasl_plain_username": os.getenv("KAFKA_USERNAME"),
"sasl_plain_password": os.getenv("KAFKA_PASSWORD"),
},
"auto_renew_session": os.getenv("AUTO_RENEW_SESSION") == "True" or None,
"bootstrap_servers": os.getenv("BOOTSTRAP_SERVERS"),
"rest": {
    "url": os.getenv("API_URL"),
    "verify_certificate": (
        None
        if (v := os.getenv("VERIFY_CERTIFICATE")) is None
        else v.lower() == "true"
    ),
},
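
One way to apply that tri-state behaviour uniformly (None when unset, a real boolean otherwise) is a small helper; this is a sketch of the idea, not code from the PR:

```python
import os


def parse_bool_env(name: str) -> bool | None:
    """Return None when the variable is unset, otherwise a real boolean."""
    value = os.getenv(name)
    if value is None:
        return None
    return value.strip().lower() in ("true", "1", "yes")


# Example usage inside the config dict:
#   "verify_certificate": parse_bool_env("VERIFY_CERTIFICATE"),
#   "use_auth": parse_bool_env("KAFKA_USE_AUTH"),
#   "auto_renew_session": parse_bool_env("AUTO_RENEW_SESSION"),
```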

Comment on lines +35 to +36
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"

💡 Verification agent

❓ Verification inconclusive

Evaluate security implications of permissive ACL settings.

The configuration enables ACL authorization but allows everyone if no ACL is found, which creates a permissive security model. This might be suitable for development but could pose security risks in production.

Consider adding documentation about:

  1. Security implications of this setting
  2. Recommended changes for production deployments
  3. How to configure proper ACLs for the dispatcher service

🏁 Script executed:

#!/bin/bash
# Check if there's any security documentation for the Kafka setup
grep -r --include="*.md" "KAFKA_AUTHORIZER_CLASS_NAME" dispatcher/
grep -r --include="*.md" "KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND" dispatcher/
grep -r --include="*.md" "security" --include="*.md" "kafka" dispatcher/

Length of output: 252


🏁 Script executed:

#!/bin/bash
# Search entire repo for documentation referencing Kafka authorizer and ACL settings
grep -R --include="*.md" --include="*.rst" "KAFKA_AUTHORIZER_CLASS_NAME" .
grep -R --include="*.md" --include="*.rst" "KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND" .
grep -R -i --include="*.md" --include="*.rst" "security.*kafka" .

Length of output: 226


Add Security Documentation for Kafka ACL Settings

The current sample config in dispatcher/samples/kafka/zk-single-kafka-single.yml (lines 35–36) enables ACL authorization but falls back to “allow everyone” when no ACL is found. This is fine for development, but poses a security risk in production.

Please add or update documentation to cover:

  • Security implications of KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
  • Recommended production settings (e.g. setting it to "false" and defining strict ACLs)
  • A step‑by‑step guide for configuring ACLs for the dispatcher service

You can create a new markdown (or rst) under docs/dispatcher/ (for example, kafka-security.md) or append to an existing Kafka setup guide.

Comment on lines +98 to +99
image: ghcr.io/intuitem/ciso-assistant-community/dispatcher:latest
# build:

🛠️ Refactor suggestion

Avoid using 'latest' tag in production deployments.

Using the 'latest' tag can lead to unexpected changes and inconsistent deployments, making version tracking and rollbacks difficult.

    container_name: dispatcher
-   image: ghcr.io/intuitem/ciso-assistant-community/dispatcher:latest
+   image: ghcr.io/intuitem/ciso-assistant-community/dispatcher:${DISPATCHER_VERSION:-latest}
    # build:
    #   context: ../dispatcher

Consider either:

  1. Pinning to a specific version tag
  2. Using an environment variable with a default value as shown in the diff
  3. Using the same version tagging strategy used for other services in this file
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
image: ghcr.io/intuitem/ciso-assistant-community/dispatcher:latest
# build:
container_name: dispatcher
image: ghcr.io/intuitem/ciso-assistant-community/dispatcher:${DISPATCHER_VERSION:-latest}
# build:
# context: ../dispatcher

Comment on lines +119 to +120
image: ghcr.io/intuitem/ciso-assistant-community/dispatcher:latest
# build:

🛠️ Refactor suggestion

Avoid using 'latest' tag in production environments

Using the latest tag can lead to unpredictable deployments as new versions are released without warning. This may cause unexpected behavior or compatibility issues.

Consider using a specific version tag instead:

-    image: ghcr.io/intuitem/ciso-assistant-community/dispatcher:latest
+    image: ghcr.io/intuitem/ciso-assistant-community/dispatcher:1.0.0  # Replace with appropriate version
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
image: ghcr.io/intuitem/ciso-assistant-community/dispatcher:latest
# build:
image: ghcr.io/intuitem/ciso-assistant-community/dispatcher:1.0.0 # Replace with appropriate version
# build:

Comment on lines +5 to +50
"properties": {
"message_type": {
"type": "string",
"enum": ["upload_attachment"],
"description": "Identifies the message type for routing"
},
"selector": {
"type": "object",
"additionalProperties": {
"type": "string"
}
},
"values": {
"type": "object",
"properties": {
"file_name": {
"type": "string",
"minLength": 1,
"maxLength": 255,
"description": "Name of the file to be uploaded"
},
"file_content": {
"type": "string",
"description": "Base64 encoded content of the file"
},
"file_s3_bucket": {
"type": "string",
"description": "S3 bucket name where the file is stored"
},
"applied_controls": {
"type": "object",
"additionalProperties": {
"type": "string"
}
}
},
"oneOf": [
{
"required": ["file_content", "file_name"]
},
{
"required": ["file_s3_bucket", "file_name"]
}
]
}
}

🛠️ Refactor suggestion

Make message_type, selector, and values required fields

The schema doesn't explicitly specify which root-level properties are required, which could lead to validation issues.

Add a required array to enforce necessary fields:

   "type": "object",
+  "required": ["message_type", "selector", "values"],
   "properties": {
     "message_type": {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
"properties": {
"message_type": {
"type": "string",
"enum": ["upload_attachment"],
"description": "Identifies the message type for routing"
},
"selector": {
"type": "object",
"additionalProperties": {
"type": "string"
}
},
"values": {
"type": "object",
"properties": {
"file_name": {
"type": "string",
"minLength": 1,
"maxLength": 255,
"description": "Name of the file to be uploaded"
},
"file_content": {
"type": "string",
"description": "Base64 encoded content of the file"
},
"file_s3_bucket": {
"type": "string",
"description": "S3 bucket name where the file is stored"
},
"applied_controls": {
"type": "object",
"additionalProperties": {
"type": "string"
}
}
},
"oneOf": [
{
"required": ["file_content", "file_name"]
},
{
"required": ["file_s3_bucket", "file_name"]
}
]
}
}
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"required": ["message_type", "selector", "values"],
"properties": {
"message_type": {
"type": "string",
"enum": ["upload_attachment"],
"description": "Identifies the message type for routing"
},
"selector": {
"type": "object",
"additionalProperties": {
"type": "string"
}
},
"values": {
"type": "object",
"properties": {
"file_name": {
"type": "string",
"minLength": 1,
"maxLength": 255,
"description": "Name of the file to be uploaded"
},
"file_content": {
"type": "string",
"description": "Base64 encoded content of the file"
},
"file_s3_bucket": {
"type": "string",
"description": "S3 bucket name where the file is stored"
},
"applied_controls": {
"type": "object",
"additionalProperties": {
"type": "string"
}
}
},
"oneOf": [
{
"required": ["file_content", "file_name"]
},
{
"required": ["file_s3_bucket", "file_name"]
}
]
}
}
}
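
To illustrate why the explicit required array matters, a quick check with the jsonschema library (the message below is a made-up payload, and the schemas are simplified) shows a message without selector or values slipping through the current schema and being rejected once required is added:

```python
# Demonstration only: simplified schemas and a fabricated message.
from jsonschema import Draft7Validator

base_schema = {
    "type": "object",
    "properties": {
        "message_type": {"type": "string"},
        "selector": {"type": "object"},
        "values": {"type": "object"},
    },
}
strict_schema = {**base_schema, "required": ["message_type", "selector", "values"]}

message = {"message_type": "upload_attachment"}  # no selector, no values

print(Draft7Validator(base_schema).is_valid(message))    # True  - accepted silently
print(Draft7Validator(strict_schema).is_valid(message))  # False - rejected as intended
```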

Comment on lines +65 to +66
status_code = node.get("status_code")


⚠️ Potential issue

Guard against None status codes

map_status_to_result calls status_code.lower(), which raises AttributeError if status_code is None. A missing OSCF status_code will currently crash the script.

-def map_status_to_result(status_code):
+def map_status_to_result(status_code: str | None):
     ...
-    return status_mapping.get(status_code.lower(), "not_applicable")
+    if not status_code:
+        return "not_applicable"
+    return status_mapping.get(status_code.lower(), "not_applicable")

Also applies to: 99-110

@nas-tabchiche nas-tabchiche marked this pull request as ready for review April 18, 2025 13:37
@nas-tabchiche nas-tabchiche requested a review from ab-smith April 18, 2025 13:37
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 6

♻️ Duplicate comments (4)
dispatcher/settings.py (4)

43-43: Boolean env-var parsing loses false/disabled states

The pattern os.getenv("VERIFY_CERTIFICATE") == "True" or None collapses "False" and "missing" into None, so users cannot turn features off via env vars.

-            "verify_certificate": os.getenv("VERIFY_CERTIFICATE") == "True" or None,
+            "verify_certificate": (
+                None
+                if (v := os.getenv("VERIFY_CERTIFICATE")) is None
+                else v.lower() in ("true", "1", "yes")
+            ),

51-56: Boolean env-var parsing is inconsistent across config

The same pattern that makes boolean env-var parsing lose false states applies to multiple boolean settings.

-            "use_auth": os.getenv("KAFKA_USE_AUTH") == "True" or None,
+            "use_auth": (
+                None
+                if (v := os.getenv("KAFKA_USE_AUTH")) is None
+                else v.lower() in ("true", "1", "yes")
+            ),

Apply similar changes to AUTO_RENEW_SESSION as well.


63-64: Trace-level log leaks secrets

logger.trace("Loaded environment configuration", config=config) prints the entire config dict, including passwords and tokens, whenever TRACE is enabled. Mask sensitive keys or drop the dump.

-logger.trace("Loaded environment configuration", config=config)
+logger.trace("Loaded environment configuration (secrets masked)")

246-250: Log-and-continue masks fatal mis-configuration

After logging "Missing credentials", the code continues, producing empty strings that will fail later at runtime.
Prefer to abort early:

creds = config.get("credentials", {})
if not (creds.get("token") or (creds.get("email") and creds.get("password"))):
-    logger.warning(
-        "Missing credentials in configuration. Please set USER_EMAIL and USER_PASSWORD via environment variables or in the config file.",
-    )
+    logger.error(
+        "Missing credentials or token. Set USER_TOKEN or USER_EMAIL + USER_PASSWORD.",
+    )
+    sys.exit(1)
🧹 Nitpick comments (2)
dispatcher/settings.py (2)

169-181: Kafka SASL mechanism fixed to PLAIN with limited user choice

The code hardcodes kafka_sasl_mechanism to "PLAIN" and then uses a match statement that only handles this one case, which makes the match statement unnecessary.

-        kafka_sasl_mechanism = "PLAIN"  # OAUTHBEARER will be available in the future
+        kafka_sasl_mechanism = click.prompt(
+            "Enter SASL mechanism (PLAIN supported, OAUTHBEARER coming soon)",
+            type=click.Choice(["PLAIN"], case_sensitive=False),
+            default="PLAIN"
+        )
         kafka_username = None
         kafka_password = None
-        match kafka_sasl_mechanism:
-            case "PLAIN":
-                kafka_username = click.prompt(
-                    "Enter your Kafka username",
-                )
-                kafka_password = click.prompt(
-                    "Enter your Kafka password",
-                    hide_input=True,
-                )
+        if kafka_sasl_mechanism.upper() == "PLAIN":
+            kafka_username = click.prompt(
+                "Enter your Kafka username",
+            )
+            kafka_password = click.prompt(
+                "Enter your Kafka password",
+                hide_input=True,
+            )

269-287: Inconsistent error reporting in get_access_token

The function uses logger.error for file errors but click.echo for missing authentication data. This inconsistency complicates log monitoring.

    else:
-        click.echo("Authentication data not found.", err=True)
+        logger.error("Authentication data not found.")
        return None
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4e232d8 and 8847fe0.

📒 Files selected for processing (4)
  • dispatcher/dispatcher.py (1 hunks)
  • dispatcher/messages.py (1 hunks)
  • dispatcher/settings.py (1 hunks)
  • dispatcher/utils/kafka.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • dispatcher/utils/kafka.py
  • dispatcher/dispatcher.py
  • dispatcher/messages.py
⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Analyze (python)
  • GitHub Check: Analyze (javascript-typescript)
  • GitHub Check: startup-functional-test (3.12)
  • GitHub Check: enterprise-startup-functional-test (3.12)
  • GitHub Check: enterprise-startup-docker-compose-test
  • GitHub Check: functional-tests (3.12, chromium)
  • GitHub Check: enterprise-functional-tests (3.12, chromium)
  • GitHub Check: startup-docker-compose-test

Comment on lines +84 to +85
logger.trace("Merged configuration", config=result)
return result

⚠️ Potential issue

Trace-level log leaks secrets in merged configuration

Similar to line 63, this log statement prints the entire merged configuration which includes sensitive credentials.

-    logger.trace("Merged configuration", config=result)
+    # Create a copy with sensitive info masked for logging
+    log_safe_config = result.copy()
+    if "credentials" in log_safe_config:
+        log_safe_config["credentials"] = {k: "***" if k in ("password", "token") else v 
+                                        for k, v in log_safe_config["credentials"].items()}
+    if "kafka" in log_safe_config and "sasl_plain_password" in log_safe_config["kafka"]:
+        log_safe_config["kafka"]["sasl_plain_password"] = "***"
+    if "s3_secret_key" in log_safe_config:
+        log_safe_config["s3_secret_key"] = "***"
+    logger.trace("Merged configuration (secrets masked)", config=log_safe_config)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
logger.trace("Merged configuration", config=result)
return result
# Create a copy with sensitive info masked for logging
log_safe_config = result.copy()
if "credentials" in log_safe_config:
log_safe_config["credentials"] = {
k: "***" if k in ("password", "token") else v
for k, v in log_safe_config["credentials"].items()
}
if "kafka" in log_safe_config and "sasl_plain_password" in log_safe_config["kafka"]:
log_safe_config["kafka"]["sasl_plain_password"] = "***"
if "s3_secret_key" in log_safe_config:
log_safe_config["s3_secret_key"] = "***"
logger.trace("Merged configuration (secrets masked)", config=log_safe_config)
return result

Comment on lines +88 to +98
def save_yaml_config(config, config_path=DEFAULT_CONFIG_PATH):
"""Save the configuration to a YAML file."""
try:
with open(config_path, "w") as file:
yaml.safe_dump(config, file, default_flow_style=False, sort_keys=False)
logger.info(
f"Configuration saved to {config_path}. Please update it with your credentials."
)
except Exception as e:
logger.error(f"Error saving config file: {e}")


🛠️ Refactor suggestion

Error handling in save_yaml_config is incomplete

The function doesn't return any status or raise an exception after logging the error, so callers can't determine if the save operation succeeded.

def save_yaml_config(config, config_path=DEFAULT_CONFIG_PATH):
    """Save the configuration to a YAML file."""
    try:
        with open(config_path, "w") as file:
            yaml.safe_dump(config, file, default_flow_style=False, sort_keys=False)
        logger.info(
            f"Configuration saved to {config_path}. Please update it with your credentials."
        )
+        return True
    except Exception as e:
-        logger.error(f"Error saving config file: {e}")
+        logger.error(f"Error saving config file: {e}")
+        return False

Also, for consistency with load_yaml_config, consider using Path:

-    try:
-        with open(config_path, "w") as file:
+    try:
+        path = Path(config_path)
+        with path.open("w") as file:
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def save_yaml_config(config, config_path=DEFAULT_CONFIG_PATH):
"""Save the configuration to a YAML file."""
try:
with open(config_path, "w") as file:
yaml.safe_dump(config, file, default_flow_style=False, sort_keys=False)
logger.info(
f"Configuration saved to {config_path}. Please update it with your credentials."
)
except Exception as e:
logger.error(f"Error saving config file: {e}")
def save_yaml_config(config, config_path=DEFAULT_CONFIG_PATH):
"""Save the configuration to a YAML file."""
try:
path = Path(config_path)
with path.open("w") as file:
yaml.safe_dump(config, file, default_flow_style=False, sort_keys=False)
logger.info(
f"Configuration saved to {config_path}. Please update it with your credentials."
)
return True
except Exception as e:
logger.error(f"Error saving config file: {e}")
return False

"Missing credentials in configuration. Please set USER_EMAIL and USER_PASSWORD via environment variables or in the config file.",
)

DEBUG = config.get("debug", False)

⚠️ Potential issue

DEBUG boolean interpretation is inconsistent with its definition

Line 40 defines debug as os.getenv("DEBUG", "").lower() == "true" or None, but here we're using a different default.

-DEBUG = config.get("debug", False)
+# Ensure consistent behavior with how debug was loaded from env
+DEBUG = bool(config.get("debug"))
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
DEBUG = config.get("debug", False)
# Ensure consistent behavior with how debug was loaded from env
DEBUG = bool(config.get("debug"))

"""Load configuration values from environment variables."""
# Note: For booleans, we compare to the string "True"
config = {
"debug": os.getenv("DEBUG", "").lower() == "true" or None,

⚠️ Potential issue

Debug logic is inconsistent with other boolean parsings

The debug setting uses lower() == "true", but other boolean settings in the same function use == "True" with case sensitivity. This inconsistency could lead to unexpected behavior.

-        "debug": os.getenv("DEBUG", "").lower() == "true" or None,
+        "debug": os.getenv("DEBUG", "True") == "True" or None,

Better yet, apply consistent parsing for all boolean environment variables:

-        "debug": os.getenv("DEBUG", "").lower() == "true" or None,
+        "debug": os.getenv("DEBUG", "").lower() in ("true", "1", "yes") or None,
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
"debug": os.getenv("DEBUG", "").lower() == "true" or None,
"debug": os.getenv("DEBUG", "True") == "True" or None,

Comment on lines 186 to 197
"Enter the S3 storage URL (e.g., http://localhost:9000)",
default=os.getenv("S3_URL", "http://localhost:9000"),
)
s3_access_key = click.prompt(
"Enter your S3 access key (leave blank if you are using pre-signed URLs to authenticate requests to your S3 storage)",
)
s3_secret_key = None
if s3_access_key:
s3_secret_key = click.prompt(
"Enter your S3 secret key",
hide_input=True,
)

⚠️ Potential issue

S3 credentials lack defaults from environment variables

Unlike other settings, S3 access and secret keys don't use environment variables as defaults in interactive mode.

        s3_access_key = click.prompt(
            "Enter your S3 access key (leave blank if you are using pre-signed URLs to authenticate requests to your S3 storage)",
+            default=os.getenv("S3_ACCESS_KEY", ""),
        )
        s3_secret_key = None
        if s3_access_key:
            s3_secret_key = click.prompt(
                "Enter your S3 secret key",
                hide_input=True,
+                default=os.getenv("S3_SECRET_KEY", ""),
            )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
"Enter the S3 storage URL (e.g., http://localhost:9000)",
default=os.getenv("S3_URL", "http://localhost:9000"),
)
s3_access_key = click.prompt(
"Enter your S3 access key (leave blank if you are using pre-signed URLs to authenticate requests to your S3 storage)",
)
s3_secret_key = None
if s3_access_key:
s3_secret_key = click.prompt(
"Enter your S3 secret key",
hide_input=True,
)
"Enter the S3 storage URL (e.g., http://localhost:9000)",
default=os.getenv("S3_URL", "http://localhost:9000"),
)
s3_access_key = click.prompt(
"Enter your S3 access key (leave blank if you are using pre-signed URLs to authenticate requests to your S3 storage)",
default=os.getenv("S3_ACCESS_KEY", ""),
)
s3_secret_key = None
if s3_access_key:
s3_secret_key = click.prompt(
"Enter your S3 secret key",
hide_input=True,
default=os.getenv("S3_SECRET_KEY", ""),
)

Comment on lines +10 to +12
log_message_format = (
"<green>{time}</green> | <level>{level}</level> | <level>{message}</level>"
)

💡 Verification agent

🧩 Analysis chain

Defined log format is unused

log_message_format is defined but never applied to configure the logger.


🏁 Script executed:

#!/bin/bash
# Check if log_message_format is used elsewhere
rg -l "log_message_format" --type py

Length of output: 84


🏁 Script executed:

#!/bin/bash
# Show context where log_message_format is referenced in dispatcher.py
rg -n -C3 "log_message_format" dispatcher/dispatcher.py

Length of output: 465


Centralize log_message_format Usage
The format defined in dispatcher/settings.py (lines 10–12) is never used—dispatcher/dispatcher.py defines its own copy on lines 21–23. To avoid duplication and keep your logging configuration in one place:

• In dispatcher/dispatcher.py (lines 21–23), remove the local definition and import the one from settings.py
• Use the imported format when calling logger.add(...)
• Move any DEBUG‐only extension (+= " | <magenta>{extra}</magenta>") into settings.py if you want it applied globally

Suggested change in dispatcher/dispatcher.py:

--- a/dispatcher/dispatcher.py
+++ b/dispatcher/dispatcher.py
@@ -21,7 +21,7 @@
- log_message_format = (
-     "<green>{time}</green> | <level>{level}</level> | <level>{message}</level>"
- )
+ from dispatcher.settings import log_message_format

  if settings.DEBUG:
      log_message_format += " | <magenta>{extra}</magenta>"

This will ensure you’re using the single source of truth in settings.py and remove the unused definition there.

Committable suggestion skipped: line range outside the PR's diff.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (5)
dispatcher/README.md (5)

12-43: Unify CLI invocation method

Within this section you alternate between using uv run dispatcher.py (lines 35–36, 41–42) and plain python dispatcher.py later in the README. This inconsistency may trip up users who aren’t familiar with the uv tool. Consider choosing one invocation pattern and applying it consistently throughout the document, or explicitly note that both are valid entry points (with guidance on when to use each).


44-75: Enhance Docker run example with configuration mounting

The docker run example (lines 68–75) shows environment variables but does not demonstrate how to supply a pre‑configured file (e.g. .dispatcher_config). If a user prefers file‑based configuration, they’ll need to mount their config into the container. Consider adding a -v flag example:

 docker run -d \
   --name ciso-assistant-dispatcher \
+  -v $(pwd)/.dispatcher_config:/app/.dispatcher_config \
   -e API_URL=https://localhost:8443 \
   ... 

Also, you could include -e ERRORS_TOPIC=errors to mirror the full set of env vars listed later.


77-104: Specify default config file name and code‑fence language

In “Configuration” (line 79) you mention a .dispatcher_config file but never state its default filename or path. Please clarify:

  • What is the config filename and where is it created (e.g. ~/.dispatcher_config.yaml or project root)?
  • Should it have a specific extension?

Additionally, the environment variables block (lines 86–104) is shown in a fenced code block without a language hint. For better syntax highlighting, change the fence to bash.


106-180: Consolidate “init-config” and authentication instructions

You show uv run dispatcher.py init-config -i (lines 35–36) then later python dispatcher.py init-config (lines 111–112), and similarly mix uv run with python dispatcher.py auth. This pattern could be confusing. I recommend grouping the “CLI usage” examples into a single “Usage” section with one consistent invocation style, e.g.:

# Interactive config
dispatcher init-config --interactive

# Authenticate
dispatcher auth --email foo@org --password bar

If you’re packaging dispatcher.py as a console script, you might also suggest installing via pip install -e . so users can run a dispatcher command directly.


182-308: Improve schema and JSON example formatting; add TOC

  • In the JSON examples, you already use ```json fences (good).
  • In “Selectors” (lines 281–284), the placeholder <CISO_ASSISTANT_API_URL> could be formatted as a code span: `<CISO_ASSISTANT_API_URL>` to avoid Markdown parsing issues.
  • The main README is long—consider adding a “Table of Contents” at the top to help readers jump to sections like Configuration, Usage, Messages reference, etc.

These are nice‑to‑have improvements to enhance overall readability.

🧰 Tools
🪛 LanguageTool

[uncategorized] ~191-~191: Loose punctuation mark.
Context: ...applied controls. Fields: - selector: Identifies the target applied control t...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~192-~192: Loose punctuation mark.
Context: ...et applied control to update. - values: The updated values of the target applie...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~212-~212: Loose punctuation mark.
Context: ...ment assessments. Fields: - selector: Identifies the target requirement asses...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~213-~213: Loose punctuation mark.
Context: ...irement assessment to update. - values: The updated values of the target requir...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~234-~234: Loose punctuation mark.
Context: ...from a S3 bucket. Fields: - selector: Identifies the target evidence to uploa...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~236-~236: Loose punctuation mark.
Context: ... name. - values - applied_controls: The applied control to which the eviden...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~237-~237: Loose punctuation mark.
Context: ...to the existing ones. - file_content: The base64 encoded content of the file ...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~238-~238: Loose punctuation mark.
Context: ...ucketis not specified. -file_name`: The name of the file to upload. This fi...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~239-~239: Loose punctuation mark.
Context: ... field is required. - file_s3_bucket: The S3 bucket where the file is stored....

(UNLIKELY_OPENING_PUNCTUATION)

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0d09bb0 and 51226dc.

📒 Files selected for processing (3)
  • dispatcher/README.md (1 hunks)
  • dispatcher/settings.py (1 hunks)
  • dispatcher/utils/kafka.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • dispatcher/utils/kafka.py
  • dispatcher/settings.py
🧰 Additional context used
🪛 LanguageTool
dispatcher/README.md

[uncategorized] ~191-~191: Loose punctuation mark.
Context: ...applied controls. Fields: - selector: Identifies the target applied control t...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~192-~192: Loose punctuation mark.
Context: ...et applied control to update. - values: The updated values of the target applie...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~212-~212: Loose punctuation mark.
Context: ...ment assessments. Fields: - selector: Identifies the target requirement asses...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~213-~213: Loose punctuation mark.
Context: ...irement assessment to update. - values: The updated values of the target requir...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~234-~234: Loose punctuation mark.
Context: ...from a S3 bucket. Fields: - selector: Identifies the target evidence to uploa...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~236-~236: Loose punctuation mark.
Context: ... name. - values - applied_controls: The applied control to which the eviden...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~237-~237: Loose punctuation mark.
Context: ...to the existing ones. - file_content: The base64 encoded content of the file ...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~238-~238: Loose punctuation mark.
Context: ...ucketis not specified. -file_name`: The name of the file to upload. This fi...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~239-~239: Loose punctuation mark.
Context: ... field is required. - file_s3_bucket: The S3 bucket where the file is stored....

(UNLIKELY_OPENING_PUNCTUATION)

🪛 markdownlint-cli2 (0.17.2)
dispatcher/README.md

10-10: Link fragments should be valid
null

(MD051, link-fragments)

⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: startup-functional-test (3.12)
  • GitHub Check: startup-docker-compose-test
  • GitHub Check: functional-tests (3.12, chromium)
  • GitHub Check: enterprise-startup-functional-test (3.12)
  • GitHub Check: Analyze (python)
  • GitHub Check: test (3.12)
  • GitHub Check: enterprise-functional-tests (3.12, chromium)
  • GitHub Check: enterprise-startup-docker-compose-test
  • GitHub Check: build (3.12)

Comment on lines +1 to +11
# CISO Assistant Dispatcher

The **CISO Assistant Dispatcher** is a command-line tool that bridges event-driven messaging with the CISO Assistant API to orchestrate actions based on incoming Kafka messages.

## Prerequisites

- **Python 3.8+**
- A running Kafka cluster (can be any Kafka-compatible cluster, e.g. [Redpanda](https://redpanda.com/))
- Access to the CISO Assistant REST API
- Required Python packages (see [Installation](#installation) below)


⚠️ Potential issue

Fix broken Installation link and/or add an Installation section

The “Prerequisites” list refers to an “Installation” section (see [Installation](#installation)), but no such heading exists in this README. This will confuse readers and break the anchor link.

Please either:

  1. Rename “## Running the dispatcher as a CLI tool” to “## Installation” (or insert a new “## Installation” section above it), or
  2. Update the link to point to the correct section (e.g. “Running the dispatcher as a CLI tool”).

This change is needed to ensure that all cross‑references resolve correctly.

🧰 Tools
🪛 markdownlint-cli2 (0.17.2)

10-10: Link fragments should be valid
null

(MD051, link-fragments)

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
dispatcher/README.md (1)

10-10: Broken link: missing “Installation” section
The link [Installation](#installation) in the Prerequisites refers to a non-existent heading. Please either rename the “## Running the dispatcher as a CLI tool” section to “## Installation” or update the link to point to the correct section (e.g., #running-the-dispatcher-as-a-cli-tool).

🧰 Tools
🪛 markdownlint-cli2 (0.17.2)

10-10: Link fragments should be valid
null

(MD051, link-fragments)

🧹 Nitpick comments (2)
dispatcher/README.md (2)

21-31: Recommend using a virtual environment
Consider adding a step to set up a Python virtual environment (e.g., python -m venv .venv and source .venv/bin/activate) before running uv sync to isolate dependencies and avoid polluting the global environment.


44-44: Terminology consistency in section title
Consider renaming “## Running the dispatcher as a dockerized service” to “## Running the dispatcher as a Docker container” to align with common terminology and improve reader comprehension.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 51226dc and 07532b9.

📒 Files selected for processing (2)
  • dispatcher/README.md (1 hunks)
  • dispatcher/settings.py (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • dispatcher/settings.py
🧰 Additional context used
🪛 LanguageTool
dispatcher/README.md

[uncategorized] ~194-~194: Loose punctuation mark.
Context: ...applied controls. Fields: - selector: Identifies the target applied control t...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~195-~195: Loose punctuation mark.
Context: ...et applied control to update. - values: The updated values of the target applie...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~215-~215: Loose punctuation mark.
Context: ...ment assessments. Fields: - selector: Identifies the target requirement asses...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~216-~216: Loose punctuation mark.
Context: ...irement assessment to update. - values: The updated values of the target requir...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~237-~237: Loose punctuation mark.
Context: ...from a S3 bucket. Fields: - selector: Identifies the target evidence to uploa...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~239-~239: Loose punctuation mark.
Context: ... name. - values - applied_controls: The applied control to which the eviden...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~240-~240: Loose punctuation mark.
Context: ...to the existing ones. - file_content: The base64 encoded content of the file ...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~241-~241: Loose punctuation mark.
Context: ...ucketis not specified. -file_name`: The name of the file to upload. This fi...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~242-~242: Loose punctuation mark.
Context: ... field is required. - file_s3_bucket: The S3 bucket where the file is stored....

(UNLIKELY_OPENING_PUNCTUATION)

🪛 markdownlint-cli2 (0.17.2)
dispatcher/README.md

10-10: Link fragments should be valid
null

(MD051, link-fragments)

⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: startup-functional-test (3.12)
  • GitHub Check: Analyze (python)
  • GitHub Check: functional-tests (3.12, chromium)
  • GitHub Check: startup-docker-compose-test
  • GitHub Check: enterprise-startup-functional-test (3.12)
  • GitHub Check: enterprise-functional-tests (3.12, chromium)
  • GitHub Check: build (3.12)
  • GitHub Check: enterprise-startup-docker-compose-test
  • GitHub Check: test (3.12)

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
dispatcher/README.md (1)

5-11: ⚠️ Potential issue

Fix broken Installation link
The prerequisites refer to an “Installation” section (see [Installation](#installation)), but no such heading exists. Please update the link or add an ## Installation section.

Here's a quick fix to point to the CLI setup section:

- - Required Python packages (see [Installation](#installation) below)
+ - Required Python packages (see [Running the dispatcher as a CLI tool](#running-the-dispatcher-as-a-cli-tool) below)
🧰 Tools
🪛 markdownlint-cli2 (0.17.2)

10-10: Link fragments should be valid
null

(MD051, link-fragments)

🧹 Nitpick comments (11)
dispatcher/README.md (11)

1-4: Clarify the intro for better readability
The opening description is solid, but consider splitting the single paragraph into two or adding a short bullet‑list overview of the dispatcher's key responsibilities (e.g., message consumption, schema validation, API orchestration) to improve scannability.


12-43: Separate installation from usage
The “Running the dispatcher as a CLI tool” section currently mixes installation (clone + dependency install) with execution steps (init‑config, consume). I recommend splitting this into two clear subsections—## Installation (clone + uv sync) and ## Usage – CLI mode (init‑config, consume)—to align with common README patterns.


44-76: Enhance Docker section with Compose example
You provide a Dockerfile build/run flow but mention Docker Compose templates elsewhere. Consider adding a snippet showing how to launch the dispatcher via docker-compose (e.g., docker-compose -f config/docker-compose.yml up -d dispatcher) to streamline container setup.


77-104: Document default values for environment variables
In the Environment variables reference, it would be helpful to note default values (if any) and indicate which variables are mandatory vs. optional. This will clarify what users must set vs. what the dispatcher can infer.


105-115: Clarify config file path and behavior
Mention exactly where the .dispatcher_config file is created (e.g., current working directory) and whether invoking init-config without -i will overwrite an existing file. You could also link to the schema in dispatcher/settings.py for advanced users.


129-161: Streamline authentication instructions
The two auth modes are clear, but labeling token support as “soon to be released” may confuse users about current functionality. Consider removing that note or marking it as alpha. Also, collapsing shared details (env vars vs. flags) into a table could make this section more concise.


164-172: Add S3 region/endpoint context
When configuring S3, users often need to specify region or custom endpoints. Either note that the URL may include the region or consider adding optional variables such as S3_REGION or S3_ENDPOINT if supported in dispatcher/utils/storage.py.


185-298: Link to JSON schema definitions
The message reference section is comprehensive. To improve discoverability, add hyperlinks to the actual JSON schema files under dispatcher/data/schemas/commands/ (e.g., link “update_applied_control” to its schema JSON). This ensures users can validate payloads against live schemas.

🧰 Tools
🪛 LanguageTool

[uncategorized] ~194-~194: Loose punctuation mark.
Context: ...applied controls. Fields: - selector: Identifies the target applied control t...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~195-~195: Loose punctuation mark.
Context: ...et applied control to update. - values: The updated values of the target applie...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~215-~215: Loose punctuation mark.
Context: ...ment assessments. Fields: - selector: Identifies the target requirement asses...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~216-~216: Loose punctuation mark.
Context: ...irement assessment to update. - values: The updated values of the target requir...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~237-~237: Loose punctuation mark.
Context: ...from a S3 bucket. Fields: - selector: Identifies the target evidence to uploa...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~239-~239: Loose punctuation mark.
Context: ... name. - values - applied_controls: The applied control to which the eviden...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~240-~240: Loose punctuation mark.
Context: ...to the existing ones. - file_content: The base64 encoded content of the file ...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~241-~241: Loose punctuation mark.
Context: ...ucketis not specified. -file_name`: The name of the file to upload. This fi...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~242-~242: Loose punctuation mark.
Context: ... field is required. - file_s3_bucket: The S3 bucket where the file is stored....

(UNLIKELY_OPENING_PUNCTUATION)


282-303: Clarify selector defaults
You mention “Set to single if not specified.” It would be clearer to show the defaulting behavior in code or link directly to the logic in dispatcher/filtering.py. A brief code snippet would help users understand how the default is applied.
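
A hedged sketch of what such defaulting might look like (the key names are assumptions; the authoritative logic lives in dispatcher/filtering.py):

```python
# Illustration only, not the actual filtering.py implementation.
def resolve_target(selector: dict) -> str:
    """Return the matching mode, defaulting to "single" when not specified."""
    return selector.get("target", "single")


resolve_target({"ref_id": "ACME-42"})                        # -> "single"
resolve_target({"ref_id": "ACME-42", "target": "multiple"})  # -> "multiple"
```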


304-311: Document Docker Compose generation
Under “Deployment,” you reference make_config.py but don’t show how to actually run the generated Compose file. Including a short example—e.g.,

python config/make_config.py > docker-compose.yml
docker-compose up -d

—would accelerate onboarding.


312-316: Expand contributing guidelines
The note about appending /api when using runserver is helpful. You might also link to a top‑level CONTRIBUTING.md or include a brief checklist (code style, tests, issues) to encourage consistent community contributions.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 07532b9 and bbf7223.

📒 Files selected for processing (1)
  • dispatcher/README.md (1 hunks)
🧰 Additional context used
🪛 LanguageTool
dispatcher/README.md

[uncategorized] ~194-~194: Loose punctuation mark.
Context: ...applied controls. Fields: - selector: Identifies the target applied control t...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~195-~195: Loose punctuation mark.
Context: ...et applied control to update. - values: The updated values of the target applie...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~215-~215: Loose punctuation mark.
Context: ...ment assessments. Fields: - selector: Identifies the target requirement asses...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~216-~216: Loose punctuation mark.
Context: ...irement assessment to update. - values: The updated values of the target requir...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~237-~237: Loose punctuation mark.
Context: ...from a S3 bucket. Fields: - selector: Identifies the target evidence to uploa...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~239-~239: Loose punctuation mark.
Context: ... name. - values - applied_controls: The applied control to which the eviden...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~240-~240: Loose punctuation mark.
Context: ...to the existing ones. - file_content: The base64 encoded content of the file ...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~241-~241: Loose punctuation mark.
Context: ...ucketis not specified. -file_name`: The name of the file to upload. This fi...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~242-~242: Loose punctuation mark.
Context: ... field is required. - file_s3_bucket: The S3 bucket where the file is stored....

(UNLIKELY_OPENING_PUNCTUATION)

🪛 markdownlint-cli2 (0.17.2)
dispatcher/README.md

10-10: Link fragments should be valid
null

(MD051, link-fragments)

⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: enterprise-startup-functional-test (3.12)
  • GitHub Check: enterprise-startup-docker-compose-test
  • GitHub Check: functional-tests (3.12, chromium)
  • GitHub Check: startup-functional-test (3.12)
  • GitHub Check: startup-docker-compose-test
  • GitHub Check: enterprise-functional-tests (3.12, chromium)
  • GitHub Check: build (3.12)
  • GitHub Check: test (3.12)
🔇 Additional comments (1)
dispatcher/README.md (1)

174-184: Usage snippet looks good
The example for python dispatcher.py consume is clear and concise—excellent!

@ab-smith

@coderabbitai full review


coderabbitai bot commented Apr 23, 2025

✅ Actions performed

Full review triggered.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 12

♻️ Duplicate comments (28)
dispatcher/pyproject.toml (1)

1-7: 🛠️ Refactor suggestion

Add PEP 517 build-system and CLI entrypoint
To support modern build tools and expose a console script, include these sections:

+[build-system]
+requires = ["poetry-core>=1.5.0"]
+build-backend = "poetry.core.masonry.api"
+
+[project.scripts]
+ciso-assistant-dispatcher = "dispatcher.dispatcher:main"
dispatcher/Dockerfile (1)

12-13: 🛠️ Refactor suggestion

Add checksum verification for the uv installer script
Verifying the installer’s SHA256 ensures you’re running untampered code. Apply this diff (replace <actual-sha256sum> with the real checksum):

 ADD https://astral.sh/uv/0.6.9/install.sh /uv-installer.sh
-RUN sh /uv-installer.sh && rm /uv-installer.sh
+RUN echo "<actual-sha256sum>  /uv-installer.sh" | sha256sum -c - && \
+    sh /uv-installer.sh && rm /uv-installer.sh
integration/basic.py (1)

31-50: Add error handling for Kafka connection failures.

The function lacks try-except blocks to handle potential connection failures or message production errors.

Consider adding proper error handling:

 def send_kafka_message():
-    # Create producer configuration
-    conf = {"bootstrap.servers": BOOTSTRAP_SERVERS}
-
-    # Create Producer instance
-    producer = Producer(conf)
-
-    print(f"Connected to Kafka at {BOOTSTRAP_SERVERS}")
-    print(f"Sending message to topic {TOPIC}:")
-    print(json.dumps(MESSAGE, indent=2))
-
-    # Produce the message
-    producer.produce(
-        topic=TOPIC, value=json.dumps(MESSAGE).encode("utf-8"), callback=delivery_report
-    )
-
-    # Wait for any outstanding messages to be delivered
-    producer.flush()
-    print("Message sent successfully")
+    try:
+        # Create producer configuration
+        conf = {"bootstrap.servers": BOOTSTRAP_SERVERS}
+
+        # Create Producer instance
+        producer = Producer(conf)
+
+        print(f"Connected to Kafka at {BOOTSTRAP_SERVERS}")
+        print(f"Sending message to topic {TOPIC}:")
+        print(json.dumps(MESSAGE, indent=2))
+
+        # Produce the message
+        producer.produce(
+            topic=TOPIC, value=json.dumps(MESSAGE).encode("utf-8"), callback=delivery_report
+        )
+
+        # Wait for any outstanding messages to be delivered
+        producer.flush()
+        print("Message sent successfully")
+    except Exception as e:
+        print(f"Failed to send message: {e}")
+        sys.exit(1)

Don't forget to add import sys at the top of the file.

dispatcher/utils/kafka.py (1)

1-1: Use absolute or explicit relative imports.

The current import style may lead to module resolution issues when the package structure changes.

Replace with an explicit import path:

-import settings
+from dispatcher import settings

Or, if within the same package:

-import settings
+from . import settings
dispatcher/samples/kafka/zk-single-kafka-single.yml (2)

24-26: Double-check Kafka listener configuration compatibility.

The multiple listener configuration (INTERNAL, EXTERNAL, DOCKER) is complex and might cause connectivity issues if not properly understood by users.

It would be beneficial to verify that these listener configurations work correctly with the dispatcher component. The DOCKER listener specifically might need documentation on how it's used.
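
One lightweight way to verify connectivity from the host is a metadata request; the sketch below assumes the EXTERNAL listener is published on localhost:9092, which should be checked against the sample file:

```python
# Assumption: the EXTERNAL listener is reachable on localhost:9092.
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})
metadata = producer.list_topics(timeout=5)  # raises KafkaException if the listener is unreachable
print(sorted(metadata.topics))
```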


35-36: Add Security Documentation for Kafka ACL Settings

The current sample config enables ACL authorization but falls back to "allow everyone" when no ACL is found. This is fine for development, but poses a security risk in production.

config/templates/docker-compose-sqlite-caddy.yml.j2 (2)

98-99: Avoid using 'latest' tag in production deployments.

Using the 'latest' tag can lead to unexpected changes and inconsistent deployments, making version tracking and rollbacks difficult.


95-128: Add health check for dispatcher service.

Unlike the other services, the dispatcher lacks a health check, which can impact reliability and proper orchestration.

config/templates/docker-compose-sqlite-traefik.yml.j2 (1)

119-120: Avoid using 'latest' tag in production environments

Using the latest tag can lead to unpredictable deployments as new versions are released without warning. This may cause unexpected behavior or compatibility issues.

dispatcher/data/schemas/commands/evidence/upload_attachment_v1.json (2)

5-50: Make message_type, selector, and values required fields

The schema doesn't explicitly specify which root-level properties are required, which could lead to validation issues.


26-29: 🛠️ Refactor suggestion

Add format validation for file_content field

The file_content field stores Base64-encoded file data but lacks format specification and size constraints.

        "file_content": {
          "type": "string",
-          "description": "Base64 encoded content of the file"
+          "description": "Base64 encoded content of the file",
+          "contentEncoding": "base64",
+          "maxLength": 10485760
        },

This change specifies that the content is Base64-encoded and limits the maximum size to approximately 10MB.

config/make_config.py (1)

88-168: ⚠️ Potential issue

Well-structured Kafka dispatcher configuration with minor S3 configuration issue.

The Kafka dispatcher configuration section provides comprehensive user prompts for all necessary settings, with reasonable defaults and clear explanations.

However, there's a potential issue with the S3 configuration when use_s3 is False: the else branch only sets {"enabled": False}, so the S3 keys are missing entirely and downstream templates that reference them can raise KeyError.

Apply this fix so the templates always find the keys (set to None):

 else:
     config["kafka_dispatcher"] = {"enabled": False}
+    # Prevent KeyError in templates
+    config["kafka_dispatcher"].update(
+        {
+            "s3_url": None,
+            "s3_access_key": None,
+            "s3_secret_key": None,
+        }
+    )
dispatcher/utils/api.py (1)

1-3: ⚠️ Potential issue

Fix the relative import path.

The import statement uses a relative import for settings, which will cause issues when the module is imported from a different directory.

 import requests
-from settings import API_URL, get_access_token
+from dispatcher.settings import API_URL, get_access_token
documentation/architecture/event-streaming.md (1)

85-85: ⚠️ Potential issue

Complete the Avro schema evolution section with best practices.

The "TBD" placeholder for schema evolution should be completed with concrete guidance on handling schema changes while maintaining compatibility.

Based on best practices for Avro schema evolution, include the following (a small compatibility sketch follows the list):

  • Use a Schema Registry with versioning
  • Define compatibility policies (backward, forward, or full)
  • Add new fields with default values
  • Make fields optional using union types
  • Use aliases for renamed fields
  • Document schema changes and version history
  • Test schema resolution scenarios
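
As a concrete illustration of the "default values" point, a minimal fastavro sketch (not the PR's code; the record and field names are illustrative) shows an old message remaining readable under an evolved schema:

```python
import io
import fastavro

# v1: the schema messages were originally written with
writer_schema = {
    "type": "record",
    "name": "Observation",
    "fields": [{"name": "ref_id", "type": "string"}],
}
# v2: adds an optional field with a default -> backward compatible
reader_schema = {
    "type": "record",
    "name": "Observation",
    "fields": [
        {"name": "ref_id", "type": "string"},
        {"name": "result", "type": ["null", "string"], "default": None},
    ],
}

buf = io.BytesIO()
fastavro.schemaless_writer(buf, writer_schema, {"ref_id": "A.5.1"})
buf.seek(0)

# Old message decoded with the new schema; the missing field gets its default
record = fastavro.schemaless_reader(buf, writer_schema, reader_schema)
print(record)  # {'ref_id': 'A.5.1', 'result': None}
```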
integration/oscf_parser.py (2)

89-110: ⚠️ Potential issue

Fix potential AttributeError when status_code is None.

The map_status_to_result function will raise an AttributeError if status_code is None because it unconditionally calls status_code.lower().

-def map_status_to_result(status_code):
+def map_status_to_result(status_code: str | None):
     """
     Map the status code to a result value.
 
     Args:
         status_code: The status code from the OSCF file
 
     Returns:
         A result string (compliant, non_compliant, etc.)
     """
     # This mapping can be adjusted based on your specific requirements
     status_mapping = {
         "pass": "compliant",
         "fail": "non_compliant",
         "warning": "partially_compliant",
         "error": "not_applicable",
         # Add more mappings as needed
     }
 
     # Return the mapped value or a default
-    return status_mapping.get(status_code.lower(), "not_applicable")
+    if not status_code:
+        return "not_applicable"
+    return status_mapping.get(status_code.lower(), "not_applicable")

217-261: 🛠️ Refactor suggestion

Eliminate code duplication between dry-run and production modes.

There's significant duplication between the dry-run mode and production mode. The message preparation logic appears in both sections.

Extract the duplicate code into a separate function:

+def prepare_messages(compliance_data):
+    """
+    Prepare unique messages from compliance data.
+    
+    Args:
+        compliance_data: List of dictionaries with ref_id and status_code
+        
+    Returns:
+        Dictionary of unique messages
+    """
+    # De-duplicate messages based on combination of ref_id and result
+    unique_messages = {}
+    for item in compliance_data:
+        result = map_status_to_result(item["status_code"])
+
+        # Create a unique key based on both ref_id and result
+        key = f"{item['ref_id']}:{result}"
+
+        # Store the unique combination
+        unique_messages[key] = {"ref_id": item["ref_id"], "result": result}
+    
+    return unique_messages

 def send_kafka_messages(compliance_data, assessment_ref_id):
     # ...
-    # De-duplicate messages based on combination of ref_id and result
-    unique_messages = {}
-    for item in compliance_data:
-        result = map_status_to_result(item["status_code"])
-
-        # Create a unique key based on both ref_id and result
-        key = f"{item['ref_id']}:{result}"
-
-        # Store the unique combination
-        unique_messages[key] = {"ref_id": item["ref_id"], "result": result}
+    unique_messages = prepare_messages(compliance_data)
     
     # ...

 def main():
     # ...
     if args.dry_run:
         print("=== DRY RUN MODE (not sending to Kafka) ===")

-        # De-duplicate messages based on combination of ref_id and result
-        unique_messages = {}
-        for item in compliance_data:
-            result = map_status_to_result(item["status_code"])
-
-            # Create a unique key based on both ref_id and result
-            key = f"{item['ref_id']}:{result}"
-
-            # Store the unique combination
-            unique_messages[key] = {"ref_id": item["ref_id"], "result": result}
+        unique_messages = prepare_messages(compliance_data)
dispatcher/dispatcher.py (3)

21-34: Duplicate log‑format definition – please consolidate with settings.py.

The same log_message_format and logger.add(...) logic already exists in dispatcher/settings.py.
Keeping two sources of truth is error‑prone and has been highlighted in a prior review. Consider importing the format (and possibly the logger initialisation) from settings.py to avoid divergence.


145-150: Avoid logging full response bodies – potential PII / credential leak

Dumping e.response.text at error level can expose sensitive data (tokens, personal details) in shared logs.
Mask the body or switch to debug level as previously recommended.
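
One possible shape for this, as a sketch only (the helper name and log fields are placeholders, not existing code):

```python
import requests
from loguru import logger

def log_request_failure(exc: requests.exceptions.RequestException, max_body: int = 200) -> None:
    """Log a failed request without dumping the full response body at error level."""
    resp = exc.response
    status = resp.status_code if resp is not None else None
    logger.error("Request to backend failed", status_code=status)
    if resp is not None:
        # Truncated body, and only at debug level, to limit token/PII exposure in shared logs
        logger.debug("Response body (truncated)", body=resp.text[:max_body])
```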


140-169: Retry loop still unbounded – add max‑retries and back‑off

The while True construct keeps retrying indefinitely on every transient RequestException, which can hammer the backend.
Please cap retries (e.g. 3 attempts with exponential back‑off) and send the message to the error topic after the budget is exhausted.
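
A minimal sketch of what a bounded retry could look like, assuming the handler raises requests.exceptions.RequestException on transient failures; `handler` and `dead_letter` stand in for the existing dispatch and error-topic publishing code:

```python
import time
import requests

def handle_with_retries(handler, message, dead_letter, max_retries: int = 3) -> None:
    """Call the message handler with exponential back-off; dead-letter after the budget."""
    for attempt in range(1, max_retries + 1):
        try:
            handler(message)
            return
        except requests.exceptions.RequestException as exc:
            if attempt == max_retries:
                # Budget exhausted: hand the message to the errors topic instead of looping
                dead_letter(message, str(exc))
                return
            time.sleep(2 ** attempt)  # 2s, 4s, 8s between attempts
```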

dispatcher/README.md (1)

10-10: Broken “Installation” anchor

The text links to [Installation](#installation) but no such section exists. Rename an existing heading or update the link to avoid a 404 anchor.

🧰 Tools
🪛 markdownlint-cli2 (0.17.2)

10-10: Link fragments should be valid
null

(MD051, link-fragments)

dispatcher/settings.py (3)

63-64: TRACE dump leaks secrets

Logging the entire env‑config dictionary at TRACE still exposes passwords/tokens. Mask sensitive keys as previously advised.


84-85: Merged‑config log leaks secrets

Same concern as above for the merged configuration.
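
For both of the above, a small helper along these lines could be applied before logging (a sketch; the key list is an assumption and should match the real config structure):

```python
SENSITIVE_KEYS = ("password", "token", "secret", "access_key")

def mask_secrets(cfg: dict) -> dict:
    """Return a copy of the config with sensitive values replaced for logging."""
    masked = {}
    for key, value in cfg.items():
        if isinstance(value, dict):
            masked[key] = mask_secrets(value)
        elif any(marker in key.lower() for marker in SENSITIVE_KEYS) and value:
            masked[key] = "***"
        else:
            masked[key] = value
    return masked

# e.g. logger.trace("Env config: {}", mask_secrets(env_config))
```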


88-98: save_yaml_config should return status / raise on failure

Prior review suggested returning True/False (or raising) so callers can react; still missing.
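
A possible shape, assuming the function receives the merged config dict and a target path (the actual signature may differ):

```python
import yaml
from loguru import logger

def save_yaml_config(config: dict, path: str = "dispatcher_config.yaml") -> bool:
    """Write the configuration file; return True on success, raise on I/O errors."""
    try:
        with open(path, "w") as f:
            yaml.safe_dump(config, f, default_flow_style=False)
    except OSError as exc:
        logger.error("Could not write configuration file", path=path, error=str(exc))
        raise
    return True
```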

dispatcher/messages.py (5)

4-11: Fix package-relative imports and redundant settings import.

There are issues with the import statements that could cause problems:

  1. The imports from filtering and settings should use package-relative imports
  2. The settings module is imported twice (line 7 and line 11), which is redundant and could lead to confusion

Apply these changes to fix the import issues:

-from filtering import process_selector
+from .filtering import process_selector

-from settings import API_URL, S3_URL, VERIFY_CERTIFICATE, get_access_token
+from .settings import API_URL, S3_URL, VERIFY_CERTIFICATE, get_access_token

-import settings
+# Re-use the already imported symbols

308-315: Possible selector type mismatch in applied_controls_selector.

The code treats applied_controls_selector as a dictionary; if a message supplies it in another shape (for example a list of selectors), or if the API expects a different structure, the handler could fail.

#!/bin/bash
# Check the type used in existing code for applied_controls selectors

# Search for any usages of applied_controls in selectors
rg -A 5 "applied_controls.*selector" --type py

147-148: Good fix for mutable default argument.

You've correctly handled the mutable default argument issue by using None as the default and initializing an empty dictionary inside the function body.


208-210: ⚠️ Potential issue

S3 credentials are read from the wrong module.

After replacing the settings import with a relative import, the settings.S3_ACCESS_KEY and settings.S3_SECRET_KEY references will be undefined.

-            key=settings.S3_ACCESS_KEY,
-            secret=settings.S3_SECRET_KEY,
+            key=S3_ACCESS_KEY,
+            secret=S3_SECRET_KEY,

328-346: 🛠️ Refactor suggestion

Use get_api_headers for consistency in API calls.

The function manually constructs authentication headers in both the GET and PATCH requests when it should use the imported get_api_headers utility.

-            get_response = api.get(
-                control_endpoint,
-                headers={"Authorization": f"Token {get_access_token()}"},
-                verify=VERIFY_CERTIFICATE,
-            )
+            get_response = api.get(
+                control_endpoint,
+                headers=get_api_headers(),
+                verify=VERIFY_CERTIFICATE,
+            )
-            update_response = api.patch(
-                control_endpoint,
-                json={"evidences": [e.get("id") for e in evidences] + [evidence_id]},
-                headers={"Authorization": f"Token {get_access_token()}"},
-                verify=VERIFY_CERTIFICATE,
-            )
+            update_response = api.patch(
+                control_endpoint,
+                json={"evidences": [e.get("id") for e in evidences] + [evidence_id]},
+                headers=get_api_headers(content_type="application/json"),
+                verify=VERIFY_CERTIFICATE,
+            )
🧹 Nitpick comments (33)
backend/.dockerignore (1)

7-7: Exclude Python virtual environment
The addition of .venv keeps the local Python environment out of the Docker build context. Consider also excluding other common venv directories (env/, venv/) if they may appear here, or rely on a shared ignore pattern across components for consistency.

backend/core/views.py (1)

1317-1319: Enable filtering applied controls by ref_id
Adding "ref_id": ["exact"] enhances the AppliedControlFilterSet. To ensure performant queries, verify that ref_id is indexed or add an index if necessary. Also, consider aligning filter capabilities in similar ViewSets (e.g., PolicyViewSet) for consistency.

dispatcher/samples/kafka/redpanda-basic.yml (1)

1-65: Improve service readiness and configurability
The Redpanda and console services are set up correctly for development. As an enhancement:

  • Add healthcheck definitions (e.g., probe Kafka port or HTTP API) to ensure readiness before dependent services start.
  • Parameterize the Redpanda image tag and ports via environment variables or a .env file to simplify upgrades.
  • Consider mapping the console’s CONFIG_FILEPATH to a bind mount so local edits don’t require rebuilding.
dispatcher/.gitignore (1)

1-179: Consolidate ignore rules and avoid duplication
This .gitignore is thorough, but many entries overlap with the root or other components. To reduce maintenance:

  • Reference or import the root ignore patterns where possible (e.g., via a gitinclude or shared template).
  • Prune entries that aren’t applicable to the dispatcher directory (e.g., Django-specific files if the dispatcher isn’t a Django project).
  • Document any optional lock files (e.g., poetry.lock) clearly so contributors know which to commit.
dispatcher/entrypoint.sh (1)

4-6: Use exec to forward signals to the consumer process
Without exec, the shell remains PID 1 and may not propagate SIGTERM/SIGINT to the consume subprocess. Replace the last line with:

- uv run dispatcher.py consume
+ exec uv run dispatcher.py consume
dispatcher/Dockerfile (1)

8-10: Consolidate apt-get commands to reduce layers
Combine update, install, and cleanup into one RUN to slim the image:

-RUN apt-get update && apt-get install -y \
-  libyaml-cpp-dev
-
-RUN apt-get install -y --no-install-recommends curl ca-certificates
+RUN apt-get update && apt-get install -y --no-install-recommends \
+    libyaml-cpp-dev curl ca-certificates && \
+    rm -rf /var/lib/apt/lists/*
integration/basic.py (3)

21-29: Consider using a shared delivery_report implementation.

This function is identical to the one in integration/oscf_parser.py. Consider extracting it to a shared utility module to avoid code duplication.

 # Import the shared version
+from common.kafka_utils import delivery_report
 
-def delivery_report(err, msg):
-    """Callback function for message delivery reports."""
-    if err is not None:
-        print(f"Message delivery failed: {err}")
-    else:
-        print(
-            f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}"
-        )

11-18: Consider parameterizing the message content for better reusability.

The current implementation uses a hardcoded message. Making the message content configurable would improve the script's reusability for testing different scenarios.

-# The message to send
-MESSAGE = {
-    "message_type": "update_requirement_assessment",
-    "selector": {
-        "compliance_assessment__ref_id": "ISO_001",
-        "requirement__ref_id": "A.5.1",
-    },
-    "values": {"result": "compliant"},
-}
+# Default message to send
+DEFAULT_MESSAGE = {
+    "message_type": "update_requirement_assessment",
+    "selector": {
+        "compliance_assessment__ref_id": "ISO_001",
+        "requirement__ref_id": "A.5.1",
+    },
+    "values": {"result": "compliant"},
+}
+
+def get_message():
+    """Returns the message to send, allowing for customization via environment variables."""
+    import os
+    import json
+    
+    # Check if a custom message is provided via environment variable
+    custom_message = os.environ.get("KAFKA_TEST_MESSAGE")
+    if custom_message:
+        try:
+            return json.loads(custom_message)
+        except json.JSONDecodeError:
+            print(f"Warning: Invalid JSON in KAFKA_TEST_MESSAGE, using default message")
+    
+    return DEFAULT_MESSAGE
+
+MESSAGE = get_message()

1-5: Add docstring and argparse for command-line options.

This script would benefit from a docstring explaining its purpose and from parsing command-line arguments to make it more flexible.

 #!/usr/bin/env python3
+"""
+Kafka message sender utility for testing the observation message dispatcher.
+
+This script connects to a Kafka broker and sends a test message to the 'observation' topic.
+It can be configured via environment variables or command-line arguments.
+"""
 import json
 import os
+import sys
+import argparse
 from confluent_kafka import Producer
+
+def parse_args():
+    """Parse command-line arguments."""
+    parser = argparse.ArgumentParser(description="Send a test message to Kafka")
+    parser.add_argument("--bootstrap-servers", default=os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:19092"),
+                        help="Kafka bootstrap servers (default: KAFKA_BOOTSTRAP_SERVERS env var or localhost:19092)")
+    parser.add_argument("--topic", default=os.environ.get("KAFKA_TOPIC", "observation"),
+                        help="Kafka topic to send message to (default: KAFKA_TOPIC env var or observation)")
+    return parser.parse_args()
+
+# Parse arguments
+args = parse_args()
 
 # Configuration
-BOOTSTRAP_SERVERS = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:19092")
-TOPIC = "observation"
+BOOTSTRAP_SERVERS = args.bootstrap_servers
+TOPIC = args.topic
dispatcher/utils/kafka.py (1)

25-29: Consider implementing SSL certificate configuration.

The SSL certificate configuration is currently commented out. Consider implementing proper support for SSL certificates.

                     "sasl_mechanism": settings.KAFKA_SASL_MECHANISM.upper(),
                     "sasl_plain_username": settings.KAFKA_USERNAME,
                     "sasl_plain_password": settings.KAFKA_PASSWORD,
-                    # optional SSL cert files:
-                    # "ssl_cafile": settings.KAFKA_SSL_CAFILE,
-                    # "ssl_certfile": settings.KAFKA_SSL_CERTFILE,
-                    # "ssl_keyfile": settings.KAFKA_SSL_KEYFILE,
+                    **({"ssl_cafile": settings.KAFKA_SSL_CAFILE} if hasattr(settings, "KAFKA_SSL_CAFILE") and settings.KAFKA_SSL_CAFILE else {}),
+                    **({"ssl_certfile": settings.KAFKA_SSL_CERTFILE} if hasattr(settings, "KAFKA_SSL_CERTFILE") and settings.KAFKA_SSL_CERTFILE else {}),
+                    **({"ssl_keyfile": settings.KAFKA_SSL_KEYFILE} if hasattr(settings, "KAFKA_SSL_KEYFILE") and settings.KAFKA_SSL_KEYFILE else {}),
dispatcher/data/schemas/commands/requirement_assessment/update_requirement_assessment_v1.json (2)

1-25: Schema is well-structured but could benefit from additional descriptions.

The schema correctly defines the structure for requirement assessment update messages with appropriate type constraints. Consider adding descriptions for the selector and values properties to improve documentation.

   "selector": {
     "type": "object",
+    "description": "Properties used to uniquely identify the requirement assessment",
     "additionalProperties": {
       "type": "string"
     }
   },
   "values": {
     "type": "object",
+    "description": "Properties and values to update on the requirement assessment",
     "additionalProperties": {
       "type": ["string", "number", "boolean", "null"]
     }
   }

2-4: Add schema description and examples.

Adding a description field at the schema level and providing examples would improve the schema's usability and documentation.

 {
   "$schema": "http://json-schema.org/draft-07/schema#",
   "title": "update_requirement_assessment",
+  "description": "Schema for messages that update properties of a requirement assessment",
   "type": "object",
+  "examples": [
+    {
+      "message_type": "update_requirement_assessment",
+      "selector": {
+        "compliance_assessment__ref_id": "ISO_001",
+        "requirement__ref_id": "A.5.1"
+      },
+      "values": {
+        "result": "compliant",
+        "comments": "Implementation verified by audit"
+      }
+    }
+  ],
dispatcher/samples/kafka/zk-single-kafka-single.yml (1)

1-39: Add volume persistence for Kafka and Zookeeper data

The current configuration doesn't include volume mappings for Kafka and Zookeeper data, which means all data will be lost when containers are restarted or removed.

  zoo1:
    image: confluentinc/cp-zookeeper:7.8.0
    hostname: zoo1
    container_name: zoo1
    ports:
      - "2181:2181"
+   volumes:
+     - zookeeper-data:/var/lib/zookeeper/data
+     - zookeeper-log:/var/lib/zookeeper/log
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zoo1:2888:3888

  kafka1:
    image: confluentinc/cp-kafka:7.8.0
    hostname: kafka1
    container_name: kafka1
    ports:
      - "9092:9092"
      - "29092:29092"
      - "9999:9999"
+   volumes:
+     - kafka-data:/var/lib/kafka/data
    environment:
      ...
+
+volumes:
+  zookeeper-data:
+  zookeeper-log:
+  kafka-data:
config/templates/docker-compose-sqlite-traefik.yml.j2 (1)

117-149: Add resource limits to prevent resource exhaustion

The dispatcher service doesn't have CPU or memory limits defined, which could lead to resource exhaustion issues if the service consumes excessive resources.

  dispatcher:
    container_name: dispatcher
    image: ghcr.io/intuitem/ciso-assistant-community/dispatcher:latest
    # build:
    #   context: ../dispatcher
    restart: always
+   deploy:
+     resources:
+       limits:
+         cpus: '0.50'
+         memory: 256M
+       reservations:
+         cpus: '0.25'
+         memory: 128M
    environment:
      - API_URL=http://backend:8000/api
dispatcher/data/schemas/commands/evidence/upload_attachment_v1.json (2)

34-39: Add schema documentation for applied_controls structure

The applied_controls property lacks detailed documentation about its structure and purpose.

        "applied_controls": {
          "type": "object",
          "additionalProperties": {
            "type": "string"
-          }
+          },
+          "description": "Mapping of applied control IDs to their updated statuses",
+          "example": {"applied-control-id-1": "effective", "applied-control-id-2": "implemented"}
        }

2-4: Add schema version property

JSON schemas typically include a version property to support schema evolution and backward compatibility.

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "upload_attachment",
+  "version": "1.0",
  "type": "object",
documentation/architecture/event-streaming.md (2)

68-71: Fix list formatting for improved readability.

The list items have inconsistent formatting with loose punctuation marks after the field names.

-The following fields are common to all commands:
-
-- `message_type: string`: Use case to trigger (e.g. `update_applied_control_status`, `update_requirement_assessment_status`, `attach_evidence_to_applied_control`...)
-- `version: int`: Schema version
-- `selector: object`: Key-value pairs to pass to the API as filters (e.g. `{"ref_id": "ID.AM-01", "folder": "global/mydomain", "target": "single"}`)
+The following fields are common to all commands:
+
+- `message_type`: Use case to trigger (e.g. `update_applied_control_status`, `update_requirement_assessment_status`, `attach_evidence_to_applied_control`...)
+- `version`: Schema version
+- `selector`: Key-value pairs to pass to the API as filters (e.g. `{"ref_id": "ID.AM-01", "folder": "global/mydomain", "target": "single"}`)
🧰 Tools
🪛 LanguageTool

[uncategorized] ~68-~68: Loose punctuation mark.
Context: ... all commands: - message_type: string: Use case to trigger (e.g. `update_appli...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~69-~69: Loose punctuation mark.
Context: ...to_applied_control...) - version: int: Schema version - selector: object`: Ke...

(UNLIKELY_OPENING_PUNCTUATION)


124-124: Complete the Schema Registry configuration section.

The "TBD" placeholder for Schema Registry configuration should be defined to ensure proper implementation.

This section should include details such as the following (a client-side connection sketch follows the list):

  • Schema Registry URL/endpoint
  • Authentication mechanism
  • Schema naming conventions
  • Compatibility settings
  • How schemas are registered and versioned
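
On the client side, connecting to a registry with the confluent-kafka Python client could look roughly like this (endpoint, credentials, and subject name are placeholders):

```python
from confluent_kafka.schema_registry import SchemaRegistryClient

# URL, credentials, and subject name are placeholders
registry = SchemaRegistryClient(
    {
        "url": "https://schema-registry.example.com",
        "basic.auth.user.info": "registry-user:registry-password",
    }
)

print(registry.get_subjects())                     # e.g. ['observation-value']
latest = registry.get_latest_version("observation-value")
print(latest.version, latest.schema_id)            # current version and id for the subject
```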

Would you like me to help you draft this section with appropriate configuration details?

dispatcher/filtering.py (3)

3-4: Import usage note: both imports are needed.

The code imports both the requests and utils.api modules, and only the api module is used to issue requests.

 import requests
 from loguru import logger
 import utils.api as api

The requests import is still needed for the exception handling (requests.exceptions.RequestException), so it should be kept.


17-19: Correct the selector description in the docstring.

The docstring describes the selector parameter as a list of dictionaries, but the parameter type is dict[str, str].

-        selector (list): List of dictionaries with key-value pairs. Expected keys include 'domain', 'ref_id',
+        selector (dict[str, str]): Dictionary with key-value pairs. Expected keys include 'domain', 'ref_id',

62-70: Enhance error logging with detailed information.

The error logging for request failures could be further improved with more contextual information.

         except requests.exceptions.RequestException as e:
             logger.error(
-                "Search failed",
+                "API search request failed",
+                url=next_url,
+                params=query_params if next_url == endpoint else {},
                 response=e.response,
                 status_code=e.response.status_code
                 if hasattr(e, "response") and e.response
                 else None,
             )
             raise
dispatcher/tests/integration/test_messages.py (3)

1-1: Remove unused import.

The uuid module is imported but never used in the file.

-import uuid
 from testcontainers.core.container import DockerContainer
🧰 Tools
🪛 Ruff (0.8.2)

1-1: uuid imported but unused

Remove unused import: uuid

(F401)


30-43: Combine nested 'with' statements for cleaner code.

The nested with statements can be combined for better readability and maintainability.

-    with DockerImage(path=BACKEND_DIR) as image:
-        with (
-            DockerContainer(image=str(image))
-            .with_env("ALLOWED_HOSTS", "backend,localhost")
-            .with_env("DJANGO_DEBUG", "True")
-            .with_env("DJANGO_SUPERUSER_EMAIL", DJANGO_SUPERUSER_EMAIL)
-            .with_env("DJANGO_SUPERUSER_PASSWORD", DJANGO_SUPERUSER_PASSWORD)
-            .with_volume_mapping(
-                str(BACKEND_DIR / "db"),
-                "/code/db",
-                "rw",
-            )
-            .with_exposed_ports(8000) as container
-        ):
+    with DockerImage(path=BACKEND_DIR) as image, (
+        DockerContainer(image=str(image))
+        .with_env("ALLOWED_HOSTS", "backend,localhost")
+        .with_env("DJANGO_DEBUG", "True")
+        .with_env("DJANGO_SUPERUSER_EMAIL", DJANGO_SUPERUSER_EMAIL)
+        .with_env("DJANGO_SUPERUSER_PASSWORD", DJANGO_SUPERUSER_PASSWORD)
+        .with_volume_mapping(
+            str(BACKEND_DIR / "db"),
+            "/code/db",
+            "rw",
+        )
+        .with_exposed_ports(8000) as container
+    ):
🧰 Tools
🪛 Ruff (0.8.2)

30-43: Use a single with statement with multiple contexts instead of nested with statements

Combine with statements

(SIM117)


88-88: Remove debug print statement.

The print(vars(result)) statement appears to be leftover debugging code that should be removed before merging.

     assert result.exit_code == 0
-    print(vars(result))
     assert "Successfully authenticated" in result.output
integration/oscf_parser.py (3)

4-4: Remove unused import.

The sys module is imported but never used in the file.

 import json
 import os
-import sys
 import argparse
🧰 Tools
🪛 Ruff (0.8.2)

4-4: sys imported but unused

Remove unused import: sys

(F401)


68-70: Simplify nested conditionals.

The nested if statements can be combined for better readability.

-            # Check if node contains unmapped compliance data for the requested framework
-            if "unmapped" in node and "compliance" in node["unmapped"]:
-                if framework in node["unmapped"]["compliance"]:
+            # Check if node contains unmapped compliance data for the requested framework
+            if ("unmapped" in node and "compliance" in node["unmapped"] 
+                and framework in node["unmapped"]["compliance"]):
🧰 Tools
🪛 Ruff (0.8.2)

68-69: Use a single if statement instead of nested if statements

(SIM102)


142-142: Unused loop variable.

The loop control variable key is not used within the loop body. Consider changing the loop structure.

-    for key, data in unique_messages.items():
+    for data in unique_messages.values():
🧰 Tools
🪛 Ruff (0.8.2)

142-142: Loop control variable key not used within loop body

(B007)

dispatcher/dispatcher.py (2)

36-36: Unused global auth_data variable

auth_data = dict() is declared but never referenced elsewhere in the module. Please remove it or populate it with meaningful state to avoid dead code and reader confusion.

- auth_data = dict()

125-129: Decode errors are swallowed – consider routing to the error topic

When JSON decoding fails you continue, effectively discarding the original payload.
Forwarding the raw message to ERRORS_TOPIC would preserve it for later inspection.

-            except Exception as e:
-                logger.error(f"Error decoding message: {e}")
-                continue
+            except Exception as e:
+                logger.error("Error decoding message", error=str(e))
+                error_producer.send(
+                    settings.ERRORS_TOPIC,
+                    value=json.dumps(
+                        {"raw_value": msg.value.decode("utf-8", errors="replace"),
+                         "error": str(e)}
+                    ).encode(),
+                )
+                continue
dispatcher/settings.py (1)

265-270: DEBUG flag may remain string

DEBUG = config.get("debug", False) can still be the original string "True", which is truthy even when falsey values are intended. Cast explicitly:

-DEBUG = config.get("debug", False)
+DEBUG = bool(config.get("debug"))
dispatcher/messages.py (3)

16-23: MessageRegistry lacks a method to retrieve handlers.

The registry implements an add method but doesn't provide a way to look up or retrieve handlers for dispatching.

Consider adding a retrieval method to the registry:

 class MessageRegistry:
     REGISTRY = {}
 
     def add(self, message):
         self.REGISTRY[message.__name__] = message
 
+    def get(self, message_type: str):
+        """Get the message handler by message type."""
+        handler = self.REGISTRY.get(message_type)
+        if not handler:
+            raise Exception(f"No handler registered for message type: {message_type}")
+        return handler

175-181: Add return type annotations for consistency.

Resource-specific update functions should include return type annotations for consistency with other functions in the module.

-def update_applied_control(message: dict):
+def update_applied_control(message: dict) -> list:
     return update_objects(message, "applied-controls")


-def update_requirement_assessment(message: dict):
+def update_requirement_assessment(message: dict) -> list:
     return update_objects(message, "requirement-assessments")

369-387: Add return type annotation and consider returning evidence details.

The upload_attachment function doesn't have a return type annotation, and it could be useful to return information about the created evidence.

-def upload_attachment(message: dict):
+def upload_attachment(message: dict) -> dict:
     """
     Main function to process attachment upload.
     Determines file source (base64 or S3), creates or finds the related evidence,
     uploads the file, and then updates applied controls if provided.
+    
+    Returns:
+        dict: Information about the uploaded evidence.
     """
     selector = message.get("selector", {})
     values = message.get("values", {})

     # Load file from provided source (base64 or S3)
     file_name, file_obj = get_file_from_message(values)

     # Get or create the evidence and upload the file
     evidence_id = get_or_create("evidences", selector, values, file_name)
     upload_file_to_evidence(evidence_id, file_name, file_obj)

     # Update applied controls if any
     update_applied_controls_with_evidence(values, evidence_id, file_name)
+    
+    # Return information about the evidence
+    return {
+        "evidence_id": evidence_id,
+        "file_name": file_name,
+        "status": "uploaded"
+    }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f44c161 and bbf7223.

⛔ Files ignored due to path filters (1)
  • dispatcher/uv.lock is excluded by !**/*.lock
📒 Files selected for processing (29)
  • backend/.dockerignore (1 hunks)
  • backend/core/views.py (2 hunks)
  • config/make_config.py (3 hunks)
  • config/templates/docker-compose-postgresql-caddy.yml.j2 (1 hunks)
  • config/templates/docker-compose-postgresql-traefik.yml.j2 (1 hunks)
  • config/templates/docker-compose-sqlite-caddy.yml.j2 (1 hunks)
  • config/templates/docker-compose-sqlite-traefik.yml.j2 (1 hunks)
  • dispatcher/.dockerignore (1 hunks)
  • dispatcher/.gitignore (1 hunks)
  • dispatcher/Dockerfile (1 hunks)
  • dispatcher/README.md (1 hunks)
  • dispatcher/data/schemas/commands/applied_control/update_applied_control_v1.json (1 hunks)
  • dispatcher/data/schemas/commands/evidence/upload_attachment_v1.json (1 hunks)
  • dispatcher/data/schemas/commands/requirement_assessment/update_requirement_assessment_v1.json (1 hunks)
  • dispatcher/dispatcher.py (1 hunks)
  • dispatcher/entrypoint.sh (1 hunks)
  • dispatcher/filtering.py (1 hunks)
  • dispatcher/messages.py (1 hunks)
  • dispatcher/pyproject.toml (1 hunks)
  • dispatcher/samples/kafka/redpanda-basic.yml (1 hunks)
  • dispatcher/samples/kafka/zk-single-kafka-single.yml (1 hunks)
  • dispatcher/samples/messages/update_requirement_assessment.json (1 hunks)
  • dispatcher/settings.py (1 hunks)
  • dispatcher/tests/integration/test_messages.py (1 hunks)
  • dispatcher/utils/api.py (1 hunks)
  • dispatcher/utils/kafka.py (1 hunks)
  • documentation/architecture/event-streaming.md (1 hunks)
  • integration/basic.py (1 hunks)
  • integration/oscf_parser.py (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (5)
dispatcher/entrypoint.sh (1)
dispatcher/dispatcher.py (1)
  • consume (81-192)
integration/basic.py (1)
integration/oscf_parser.py (1)
  • delivery_report (14-21)
dispatcher/utils/api.py (1)
dispatcher/settings.py (1)
  • get_access_token (282-300)
integration/oscf_parser.py (3)
integration/basic.py (1)
  • delivery_report (21-28)
backend/ebios_rm/models.py (1)
  • ref_id (668-669)
backend/core/views.py (1)
  • result (4636-4637)
dispatcher/messages.py (3)
dispatcher/filtering.py (1)
  • process_selector (6-109)
dispatcher/settings.py (1)
  • get_access_token (282-300)
dispatcher/utils/api.py (4)
  • get_api_headers (54-65)
  • get (14-19)
  • patch (30-35)
  • post (22-27)
🪛 Ruff (0.8.2)
dispatcher/tests/integration/test_messages.py

1-1: uuid imported but unused

Remove unused import: uuid

(F401)


30-43: Use a single with statement with multiple contexts instead of nested with statements

Combine with statements

(SIM117)

integration/oscf_parser.py

4-4: sys imported but unused

Remove unused import: sys

(F401)


68-69: Use a single if statement instead of nested if statements

(SIM102)


142-142: Loop control variable key not used within loop body

(B007)

🪛 LanguageTool
documentation/architecture/event-streaming.md

[uncategorized] ~68-~68: Loose punctuation mark.
Context: ... all commands: - message_type: string: Use case to trigger (e.g. `update_appli...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~69-~69: Loose punctuation mark.
Context: ...to_applied_control...) - version: int: Schema version - selector: object`: Ke...

(UNLIKELY_OPENING_PUNCTUATION)

dispatcher/README.md

[uncategorized] ~194-~194: Loose punctuation mark.
Context: ...applied controls. Fields: - selector: Identifies the target applied control t...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~195-~195: Loose punctuation mark.
Context: ...et applied control to update. - values: The updated values of the target applie...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~215-~215: Loose punctuation mark.
Context: ...ment assessments. Fields: - selector: Identifies the target requirement asses...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~216-~216: Loose punctuation mark.
Context: ...irement assessment to update. - values: The updated values of the target requir...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~237-~237: Loose punctuation mark.
Context: ...from a S3 bucket. Fields: - selector: Identifies the target evidence to uploa...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~239-~239: Loose punctuation mark.
Context: ... name. - values - applied_controls: The applied control to which the eviden...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~240-~240: Loose punctuation mark.
Context: ...to the existing ones. - file_content: The base64 encoded content of the file ...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~241-~241: Loose punctuation mark.
Context: ...ucketis not specified. -file_name`: The name of the file to upload. This fi...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~242-~242: Loose punctuation mark.
Context: ... field is required. - file_s3_bucket: The S3 bucket where the file is stored....

(UNLIKELY_OPENING_PUNCTUATION)

🪛 markdownlint-cli2 (0.17.2)
dispatcher/README.md

10-10: Link fragments should be valid
null

(MD051, link-fragments)

🔇 Additional comments (21)
backend/core/views.py (1)

4547-4555: Add nested filters to RequirementAssessmentViewSet
Introducing "folder__name", "requirement__ref_id", "compliance_assessment__ref_id", and "compliance_assessment__assets__ref_id" gives API clients more granular lookups. Please:

  1. Confirm these joins do not introduce N+1 issues—add select_related/prefetch_related if needed (see the queryset sketch after this list).
  2. Update API documentation (OpenAPI schema) to list these new parameters.
  3. Monitor query performance and consider database indexes on the referenced fields.
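
For point 1, the queryset could be tightened roughly as follows (a sketch; the import path mirrors the backend layout and may differ):

```python
from core.models import RequirementAssessment  # import path assumed

# Avoid per-row queries when the new filter fields are used in list views
queryset = (
    RequirementAssessment.objects
    .select_related("folder", "requirement", "compliance_assessment")
    .prefetch_related("compliance_assessment__assets")
)
```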
dispatcher/entrypoint.sh (1)

1-2: Strict error handling enabled
The script correctly uses set -euo pipefail to catch errors, undefined variables, and pipeline failures.

dispatcher/samples/messages/update_requirement_assessment.json (1)

1-10: Sample message format is correct
The JSON accurately reflects the expected structure for an update_requirement_assessment message, matching the dispatcher’s schema and registry.

dispatcher/.dockerignore (1)

1-21: Docker ignore file looks good.

The Dockerignore file correctly excludes common files and directories that shouldn't be included in the Docker image, such as temporary files, environment directories, development artifacts, and sensitive information.

config/templates/docker-compose-postgresql-traefik.yml.j2 (1)

153-187: Well-structured Kafka dispatcher configuration.

The new dispatcher service is well-integrated with the existing Docker Compose structure. The conditional rendering based on kafka_dispatcher.enabled allows for optional inclusion, which is a good practice for modular deployments.

I appreciate the comprehensive environment variable configuration covering:

  • API connectivity
  • Kafka broker settings
  • Authentication options (both credentials and token-based)
  • S3 storage integration

The service properly depends on the backend being healthy before starting, ensuring proper initialization order.

config/templates/docker-compose-postgresql-caddy.yml.j2 (1)

129-163: Consistent dispatcher configuration across templates.

The dispatcher service configuration is properly implemented and maintains consistency with the Traefik template, which is crucial for ensuring a uniform deployment experience regardless of the chosen proxy solution.

The implementation correctly handles all required environment variables and dependencies, following the same patterns used throughout the rest of the compose file.

config/make_config.py (1)

203-208: Good improvement to compose file generation.

Removing empty lines from the generated Docker Compose file is a nice enhancement that results in cleaner, more readable output files. This change won't affect functionality but improves maintainability.

dispatcher/utils/api.py (4)

4-12: Good implementation of session management with error handling.

The session object and token update functionality follow best practices:

  1. Using a shared session improves performance by reusing connections
  2. Proper error handling when the token is missing
  3. Clear error message that indicates the specific issue

This approach ensures that authentication failures are detected early with actionable error messages.


14-52: Well-implemented HTTP request methods with timeout defaults.

The HTTP methods (get, post, patch, put, delete) are implemented consistently with:

  1. Default timeouts to prevent hanging connections
  2. Proper error handling with raise_for_status()
  3. Consistent return of the response object

This implementation helps prevent common issues with API requests like infinite waits and undetected errors.


54-66: Robust API header construction with validation.

The get_api_headers function properly:

  1. Validates the token before constructing headers
  2. Uses appropriate defaults for optional parameters
  3. Handles extra headers correctly

This implementation ensures that API requests will have properly formatted authentication headers.


68-73: Clean implementation of API endpoint construction.

The make_api_endpoint function:

  1. Properly handles path segments with or without slashes
  2. Ensures consistent trailing slash required by many REST APIs
  3. Provides good documentation of its purpose

This utility will help maintain consistent endpoint formatting throughout the application.

dispatcher/filtering.py (1)

6-45: LGTM! Good defensive programming practices implemented.

The function has comprehensive validation, proper error handling, and makes effective use of pagination. You've already implemented key improvements from previous reviews, such as:

  • Using None as default for mutable arguments
  • Adding timeout to API requests
  • Handling missing 'id' fields properly
dispatcher/tests/integration/test_messages.py (4)

55-57: LGTM! Test correctly verifies API container status.

The test properly checks if the API is running by decoding logs and verifying expected output.


59-63: LGTM! Kafka connectivity check is well-implemented.

The test effectively verifies that Kafka is running and accessible on the expected port.


66-71: LGTM! Dispatcher CLI test is well-structured.

The test correctly invokes the dispatcher CLI with the help command and verifies both the exit code and expected output.


74-89: LGTM! Authentication test is comprehensive.

The test successfully verifies that the dispatcher can authenticate using credentials, properly checking both exit code and output message.

integration/oscf_parser.py (2)

144-151: LGTM! Well-structured message format.

The Kafka message structure is clear and follows a consistent pattern with message_type, selector, and values fields.


171-196: LGTM! Command-line arguments well-defined.

The command-line argument parser is well-structured with appropriate help text and default values.

dispatcher/messages.py (3)

199-200: Good improvement using standard base64 decoding.

You've correctly used base64.b64decode() instead of the deprecated .decode("base64") method, which is more robust and compatible with Python 3.


56-84: The get_object_ids function properly handles None results.

Good implementation of error handling when the selector query returns no results, with clear error messaging and appropriate logging.


389-391: The message registry setup looks good.

The message handlers are properly registered with the registry for dispatching based on message types.

Comment on lines +95 to +128
{% if kafka_dispatcher.enabled %}
  dispatcher:
    container_name: dispatcher
    image: ghcr.io/intuitem/ciso-assistant-community/dispatcher:latest
    # build:
    #   context: ../dispatcher
    restart: always
    environment:
      - API_URL=http://backend:8000/api
      - BOOTSTRAP_SERVERS={{ kafka_dispatcher.broker_url }}
      - KAFKA_USE_AUTH={{ kafka_dispatcher.kafka_use_auth }}
      {% if kafka_dispatcher.kafka_use_auth %}
      - KAFKA_SASL_MECHANISM={{ kafka_dispatcher.kafka_sasl_mechanism }}
      - KAFKA_USERNAME={{ kafka_dispatcher.kafka_username }}
      - KAFKA_PASSWORD={{ kafka_dispatcher.kafka_password }}
      {% endif %}
      - OBSERVATION_TOPIC={{ kafka_dispatcher.observation_topic }}
      - ERRORS_TOPIC={{ kafka_dispatcher.errors_topic }}
      {% if kafka_dispatcher.authentication == 'credentials' %}
      - USER_EMAIL={{ kafka_dispatcher.credentials.user_email }}
      - USER_PASSWORD={{ kafka_dispatcher.credentials.user_password }}
      - AUTO_RENEW_SESSION={{ kafka_dispatcher.auto_renew_session }}
      {% elif kafka_dispatcher.authentication == 'token' %}
      - TOKEN={{ kafka_dispatcher.token }}
      {% endif %}
      {% if kafka_dispatcher.s3_url %}
      - S3_URL={{ kafka_dispatcher.s3_url }}
      - S3_ACCESS_KEY={{ kafka_dispatcher.s3_access_key }}
      - S3_SECRET_KEY={{ kafka_dispatcher.s3_secret_key }}
      {% endif %}
    depends_on:
      backend:
        condition: service_healthy
{% endif %}

💡 Verification agent

❓ Verification inconclusive

Document when to use commented build section

The commented-out build section suggests local building is supported, but it's not clear when users should uncomment it.


🏁 Script executed:

#!/bin/bash
# Check for documentation on when to use the build section instead of the image
grep -r "build.*context.*dispatcher" --include="*.md" dispatcher/
grep -r "locally build" --include="*.md" dispatcher/

Length of output: 122


Document usage of the commented‑out build block

The # build: section in config/templates/docker-compose-sqlite-caddy.yml.j2 implies local‐build support, but there’s no guidance on when to uncomment it.

• In config/templates/docker-compose-sqlite-caddy.yml.j2 (lines 95–128): add an inline comment or link explaining that uncommenting the build block will build the dispatcher locally instead of pulling the image.
• In your main README.md (or a new docs/development.md): include a “Local development” section that describes how and when to enable the build context for the dispatcher.
• Optionally, reference any environment‑specific caveats (e.g., needing Docker Engine installed) when opting into the local build.

Comment on lines +40 to +62
"debug": os.getenv("DEBUG", "").lower() == "true" or None,
"rest": {
"url": os.getenv("API_URL"),
"verify_certificate": os.getenv("VERIFY_CERTIFICATE") == "True" or None,
},
"credentials": {
"token": os.getenv("USER_TOKEN"),
"email": os.getenv("USER_EMAIL"),
"password": os.getenv("USER_PASSWORD"),
},
"kafka": {
"use_auth": os.getenv("KAFKA_USE_AUTH") == "True" or None,
"sasl_mechanism": os.getenv("KAFKA_SASL_MECHANISM"),
"sasl_plain_username": os.getenv("KAFKA_USERNAME"),
"sasl_plain_password": os.getenv("KAFKA_PASSWORD"),
},
"auto_renew_session": os.getenv("AUTO_RENEW_SESSION") == "True" or None,
"bootstrap_servers": os.getenv("BOOTSTRAP_SERVERS"),
"errors_topic": os.getenv("ERRORS_TOPIC"),
"s3_url": os.getenv("S3_URL"),
"s3_access_key": os.getenv("S3_ACCESS_KEY"),
"s3_secret_key": os.getenv("S3_SECRET_KEY"),
}

🛠️ Refactor suggestion

Boolean env‑var parsing inconsistent & fragile

Using == "True" (title‑case) for some flags and .lower() == "true" for others makes behaviour unpredictable (“true”, “1”, “yes” will be ignored).
Consider a helper:

def _truthy(v):  # "" → None
    return None if v is None else v.lower() in ("true", "1", "yes")

and apply uniformly to all boolean env‑vars.
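
Applied uniformly, the boolean entries above would then read (sketch; only the flags are shown):

```python
import os

def _truthy(v):  # unset -> None, otherwise case-insensitive truthiness
    return None if v is None else v.lower() in ("true", "1", "yes")

env_flags = {
    "debug": _truthy(os.getenv("DEBUG")),
    "verify_certificate": _truthy(os.getenv("VERIFY_CERTIFICATE")),
    "kafka_use_auth": _truthy(os.getenv("KAFKA_USE_AUTH")),
    "auto_renew_session": _truthy(os.getenv("AUTO_RENEW_SESSION")),
}
```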

Comment on lines +86 to +103
def update_single_object(resource_endpoint: str, obj_id: str, values: dict) -> dict:
    """
    Updates a single object using a PATCH request and returns the updated object.
    """
    patch_url = f"{API_URL}/{resource_endpoint}/{obj_id}/"

    logger.debug(f"Updating {resource_endpoint} {obj_id}", values=values)

    res = api.patch(
        patch_url,
        json=values,
        headers={
            "Accept": "application/json",
            "Content-Type": "application/json",
            "Authorization": f"Token {get_access_token()}",
        },
        verify=VERIFY_CERTIFICATE,
    )

🛠️ Refactor suggestion

Use get_api_headers utility function for consistent header construction.

The function manually constructs API headers when the imported get_api_headers utility should be used for consistency.

-    res = api.patch(
-        patch_url,
-        json=values,
-        headers={
-            "Accept": "application/json",
-            "Content-Type": "application/json",
-            "Authorization": f"Token {get_access_token()}",
-        },
-        verify=VERIFY_CERTIFICATE,
-    )
+    res = api.patch(
+        patch_url,
+        json=values,
+        headers=get_api_headers(content_type="application/json"),
+        verify=VERIFY_CERTIFICATE,
+    )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
 def update_single_object(resource_endpoint: str, obj_id: str, values: dict) -> dict:
     """
     Updates a single object using a PATCH request and returns the updated object.
     """
     patch_url = f"{API_URL}/{resource_endpoint}/{obj_id}/"
     logger.debug(f"Updating {resource_endpoint} {obj_id}", values=values)
     res = api.patch(
         patch_url,
         json=values,
-        headers={
-            "Accept": "application/json",
-            "Content-Type": "application/json",
-            "Authorization": f"Token {get_access_token()}",
-        },
+        headers=get_api_headers(content_type="application/json"),
         verify=VERIFY_CERTIFICATE,
     )
     return res.json()

Comment on lines +250 to +256
logger.info("Creating new object with name: {}", name, values=values)
response = api.post(
objects_endpoint,
data={"name": values.get("name", name)},
headers={"Authorization": f"Token {get_access_token()}"},
verify=VERIFY_CERTIFICATE,
)

🛠️ Refactor suggestion

Use json parameter instead of data for API requests and get_api_headers for consistency.

When creating objects, you should:

  1. Use json instead of data for structured JSON data
  2. Use get_api_headers() for consistent header formatting
  3. Set proper content type
-        response = api.post(
-            objects_endpoint,
-            data={"name": values.get("name", name)},
-            headers={"Authorization": f"Token {get_access_token()}"},
-            verify=VERIFY_CERTIFICATE,
-        )
+        response = api.post(
+            objects_endpoint,
+            json={"name": values.get("name", name)},
+            headers=get_api_headers(content_type="application/json"),
+            verify=VERIFY_CERTIFICATE,
+        )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
logger.info("Creating new object with name: {}", name, values=values)
response = api.post(
objects_endpoint,
data={"name": values.get("name", name)},
headers={"Authorization": f"Token {get_access_token()}"},
verify=VERIFY_CERTIFICATE,
)
logger.info("Creating new object with name: {}", name, values=values)
response = api.post(
objects_endpoint,
json={"name": values.get("name", name)},
headers=get_api_headers(content_type="application/json"),
verify=VERIFY_CERTIFICATE,
)

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
dispatcher/README.md (1)

10-10: 🛠️ Refactor suggestion

Fix broken Installation link or add an Installation section.

The doc at line 10 points to [Installation](#installation), but no such ## Installation heading exists. This will result in a broken anchor. Please either:

  • Insert a ## Installation section before "Running the dispatcher as a CLI tool", or
  • Update the link to reference the correct heading (e.g., [Running the dispatcher as a CLI tool](#running-the-dispatcher-as-a-cli-tool)).
🧰 Tools
🪛 markdownlint-cli2 (0.17.2)

10-10: Link fragments should be valid
null

(MD051, link-fragments)

🧹 Nitpick comments (3)
dispatcher/README.md (3)

21-42: Optional: Unify command invocation style.

The guide switches between uv run dispatcher.py … for CLI commands and python dispatcher.py … for config initialization. For consistency and to reduce confusion, consider standardizing on one invocation method (e.g., always using uv run dispatcher.py … once dependencies are installed).

Also applies to: 108-115


46-49: Use GitHub‑compatible admonition syntax.

The [!NOTE] block (lines 46–49) follows a syntax not natively supported by GitHub-flavored Markdown. You might switch to a standard blockquote or add a brief subheading. For example:

> **Note:** It is recommended you use environment variables…

This ensures the note renders correctly on GitHub.


286-287: Clarify placeholder for API URL.

The text at lines 286–287 shows <CISO_ASSISTANT_API_URL>/schema/swagger/. Angle brackets may be interpreted as HTML and not show up. Consider wrapping this in backticks or providing a proper link, e.g.:

[`$API_URL/schema/swagger/`](<YOUR_API_URL>/schema/swagger/)

This will render correctly and guide users to replace the placeholder.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between bbf7223 and a20636d.

📒 Files selected for processing (2)
  • dispatcher/README.md (1 hunks)
  • dispatcher/utils/kafka.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • dispatcher/utils/kafka.py
🧰 Additional context used
🪛 LanguageTool
dispatcher/README.md

[uncategorized] ~194-~194: Loose punctuation mark.
Context: ...applied controls. Fields: - selector: Identifies the target applied control t...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~195-~195: Loose punctuation mark.
Context: ...et applied control to update. - values: The updated values of the target applie...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~215-~215: Loose punctuation mark.
Context: ...ment assessments. Fields: - selector: Identifies the target requirement asses...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~216-~216: Loose punctuation mark.
Context: ...irement assessment to update. - values: The updated values of the target requir...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~237-~237: Loose punctuation mark.
Context: ...from a S3 bucket. Fields: - selector: Identifies the target evidence to uploa...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~239-~239: Loose punctuation mark.
Context: ... name. - values - applied_controls: The applied control to which the eviden...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~240-~240: Loose punctuation mark.
Context: ...to the existing ones. - file_content: The base64 encoded content of the file ...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~241-~241: Loose punctuation mark.
Context: ...ucketis not specified. -file_name`: The name of the file to upload. This fi...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~242-~242: Loose punctuation mark.
Context: ... field is required. - file_s3_bucket: The S3 bucket where the file is stored....

(UNLIKELY_OPENING_PUNCTUATION)

🪛 markdownlint-cli2 (0.17.2)
dispatcher/README.md

10-10: Link fragments should be valid
null

(MD051, link-fragments)

⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: startup-functional-test (3.12)
  • GitHub Check: enterprise-startup-functional-test (3.12)
  • GitHub Check: Analyze (python)
  • GitHub Check: enterprise-startup-docker-compose-test
  • GitHub Check: functional-tests (3.12, chromium)
  • GitHub Check: Analyze (javascript-typescript)
  • GitHub Check: startup-docker-compose-test
  • GitHub Check: enterprise-functional-tests (3.12, chromium)
  • GitHub Check: build (3.12)
  • GitHub Check: test (3.12)

3. **Run the interactive configuration script:**

```bash
uv run dispatcher.py init-config -i
```

⚠️ Potential issue

Ensure CLI command names match implementation.

The docs use init-config (with a hyphen) in both the uv and python invocations (uv run dispatcher.py init-config -i at 35 and python dispatcher.py init-config at 111), but the code’s entrypoint defines the command as init_config (with an underscore). This mismatch will cause users to get a "no such command" error.
Please verify the actual command name in dispatcher.py and align the docs accordingly.

Also applies to: 111-111
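
Assuming the CLI is built with Click (an assumption, not confirmed by the diff), the function can keep its Python identifier while exposing the hyphenated name the README documents:

```python
import click

@click.group()
def cli():
    """Dispatcher CLI."""

# The Python identifier stays init_config; the exposed command matches the README
@cli.command(name="init-config")
@click.option("-i", "--interactive", is_flag=True, help="Prompt for each setting.")
def init_config(interactive):
    """Generate the dispatcher configuration file."""
    ...
```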

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
dispatcher/utils/kafka.py (1)

1-1: Use absolute or explicit relative imports

The current import style may lead to module resolution issues when the package structure changes.

Replace with an explicit import path:

-import settings
+from dispatcher import settings

Or, with a relative import (kafka.py lives in the utils subpackage, so it has to step up one level to reach dispatcher/settings.py):

-import settings
+from .. import settings
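
A quick, hedged way to check what each spelling would resolve to from dispatcher/utils/kafka.py (pure string resolution, nothing is imported; assumes both directories are regular packages with __init__.py files):

```python
import importlib.util

# Resolve relative module names as seen from the dispatcher.utils package.
print(importlib.util.resolve_name("..settings", package="dispatcher.utils"))  # dispatcher.settings
print(importlib.util.resolve_name(".settings", package="dispatcher.utils"))   # dispatcher.utils.settings (does not exist here)
```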
🧹 Nitpick comments (7)
dispatcher/utils/kafka.py (7)

6-10: Log message format duplicates level formatting

The log message format applies the level formatting twice - once for the level itself and once for the message content.

 log_message_format = (
-    "<green>{time}</green> | <level>{level}</level> | <level>{message}</level>"
+    "<green>{time}</green> | <level>{level}</level> | <cyan>{message}</cyan>"
 )
 if settings.DEBUG:
     log_message_format += " | <magenta>{extra}</magenta>"

12-19: Add logging level configuration based on DEBUG setting

The logging setup removes the default handler but doesn't adjust the verbosity level based on the DEBUG setting.

 logger.remove(0)
 logger.add(
     sys.stderr,
     format=log_message_format,
     colorize=True,
     backtrace=True,
     diagnose=True,
+    level="DEBUG" if settings.DEBUG else "INFO",
 )

22-26: Improve function type hints and docstring

The current type hints and docstring could be more specific about return types and exceptions.

-def build_kafka_config(use_auth: bool = settings.KAFKA_USE_AUTH) -> dict:
+from typing import Dict, Any
+
+def build_kafka_config(use_auth: bool = settings.KAFKA_USE_AUTH) -> Dict[str, Any]:
     """
     Returns the dict of kwargs to pass to KafkaConsumer/Producer.
     If use_auth=True, it will include the SASL/SSL or OAuthBearer settings.
+    
+    Raises:
+        ValueError: If BOOTSTRAP_SERVERS is not configured or if authentication
+                   credentials are missing when authentication is enabled.
     """

27-31: Add validation for the BOOTSTRAP_SERVERS format

The current implementation validates that BOOTSTRAP_SERVERS exists but doesn't check its format.

     if not settings.BOOTSTRAP_SERVERS:
         raise ValueError("BOOTSTRAP_SERVERS not configured")
+        
+    # Validate format of bootstrap servers
+    servers = settings.BOOTSTRAP_SERVERS.split(',')
+    if not all(':' in server for server in servers):
+        logger.warning("BOOTSTRAP_SERVERS may have invalid format. Expected format: 'host1:port1,host2:port2'")
+        
     cfg = {
         "bootstrap_servers": settings.BOOTSTRAP_SERVERS,
     }

51-62: Document SSL certificate configuration and consider enabling it

The SSL certificate configuration is commented out, suggesting it's not fully implemented yet.

             cfg.update(
                 {
                     "security_protocol": "SASL_SSL",
                     "sasl_mechanism": settings.KAFKA_SASL_MECHANISM.upper(),
                     "sasl_plain_username": settings.KAFKA_USERNAME,
                     "sasl_plain_password": settings.KAFKA_PASSWORD,
-                    # optional SSL cert files:
-                    # "ssl_cafile": settings.KAFKA_SSL_CAFILE,
-                    # "ssl_certfile": settings.KAFKA_SSL_CERTFILE,
-                    # "ssl_keyfile": settings.KAFKA_SSL_KEYFILE,
                 }
             )
+            
+            # Add SSL certificate configuration if provided
+            if hasattr(settings, 'KAFKA_SSL_CAFILE') and settings.KAFKA_SSL_CAFILE:
+                cfg['ssl_cafile'] = settings.KAFKA_SSL_CAFILE
+            if hasattr(settings, 'KAFKA_SSL_CERTFILE') and settings.KAFKA_SSL_CERTFILE:
+                cfg['ssl_certfile'] = settings.KAFKA_SSL_CERTFILE
+            if hasattr(settings, 'KAFKA_SSL_KEYFILE') and settings.KAFKA_SSL_KEYFILE:
+                cfg['ssl_keyfile'] = settings.KAFKA_SSL_KEYFILE

22-69: Add support for common Kafka configuration parameters

The current function only configures authentication and bootstrap servers, but there are other common parameters that may be useful.

Consider adding support for common Kafka parameters like group_id, auto_offset_reset, etc. This would make the function more reusable:

-def build_kafka_config(use_auth: bool = settings.KAFKA_USE_AUTH) -> dict:
+def build_kafka_config(
+    use_auth: bool = settings.KAFKA_USE_AUTH,
+    extra_config: Dict[str, Any] = None
+) -> Dict[str, Any]:
     """
     Returns the dict of kwargs to pass to KafkaConsumer/Producer.
     If use_auth=True, it will include the SASL/SSL or OAuthBearer settings.
+    
+    Args:
+        use_auth: Whether to include authentication settings
+        extra_config: Additional configuration parameters to include
     
+    Returns:
+        Dict with Kafka configuration parameters
+    
+    Raises:
+        ValueError: If required configuration is missing
     """
     if not settings.BOOTSTRAP_SERVERS:
         raise ValueError("BOOTSTRAP_SERVERS not configured")
     cfg = {
         "bootstrap_servers": settings.BOOTSTRAP_SERVERS,
     }
     
+    # Add common parameters if they exist in settings
+    for param in ['GROUP_ID', 'AUTO_OFFSET_RESET', 'ENABLE_AUTO_COMMIT']:
+        settings_key = f"KAFKA_{param}"
+        if hasattr(settings, settings_key) and getattr(settings, settings_key) is not None:
+            cfg[param.lower()] = getattr(settings, settings_key)

     # Authentication configuration
     # ...
     
+    # Add any extra config parameters
+    if extra_config:
+        cfg.update(extra_config)
     
     return cfg
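
For context, a hedged usage sketch of how the resulting kwargs could feed a consumer, assuming the kafka-python client (the docstring mentions KafkaConsumer/Producer); the topic name, consumer group, and import path below are placeholders, not taken from the PR:

```python
from kafka import KafkaConsumer

from dispatcher.utils.kafka import build_kafka_config  # illustrative import path

config = build_kafka_config()  # bootstrap servers plus optional SASL settings
consumer = KafkaConsumer(
    "dispatcher-messages",        # placeholder topic name
    group_id="dispatcher",        # placeholder consumer group
    auto_offset_reset="earliest",
    **config,
)
for message in consumer:
    print(message.topic, message.offset, message.value)
```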

12-19: Add error handling for logger.remove(0)

The code currently removes logger handler 0 without checking if it exists.

-logger.remove(0)
+try:
+    logger.remove(0)
+except ValueError:
+    # Handler might not exist, which is fine
+    pass
 logger.add(
     sys.stderr,
     format=log_message_format,
     colorize=True,
     backtrace=True,
     diagnose=True,
 )
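
An alternative sketch, assuming loguru: calling remove() without an argument drops every handler that exists (including the default one) and never raises, so the try/except can be avoided entirely. The format string below is shortened for the example:

```python
import sys

from loguru import logger

log_message_format = "<green>{time}</green> | <level>{level}</level> | <level>{message}</level>"

# remove() with no id clears all existing handlers and is a no-op
# if none are left -- it never raises.
logger.remove()
logger.add(
    sys.stderr,
    format=log_message_format,
    colorize=True,
    backtrace=True,
    diagnose=True,
)
```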
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a20636d and d4cb71c.

📒 Files selected for processing (2)
  • dispatcher/settings.py (1 hunks)
  • dispatcher/utils/kafka.py (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • dispatcher/settings.py
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Analyze (python)
  • GitHub Check: startup-functional-test (3.12)
  • GitHub Check: Analyze (javascript-typescript)
  • GitHub Check: enterprise-startup-functional-test (3.12)
  • GitHub Check: functional-tests (3.12, chromium)
  • GitHub Check: startup-docker-compose-test
  • GitHub Check: test (3.12)
  • GitHub Check: enterprise-functional-tests (3.12, chromium)
  • GitHub Check: build (3.12)
  • GitHub Check: enterprise-startup-docker-compose-test

@ab-smith ab-smith merged commit d234adc into main Apr 23, 2025
19 checks passed
@github-actions github-actions bot locked and limited conversation to collaborators Apr 23, 2025