
CFR: Assorted fixes and improvements #483


Merged

merged 6 commits into from Jul 11, 2025
6 changes: 6 additions & 0 deletions CHANGES.md
@@ -2,6 +2,12 @@

## Unreleased
- I/O: Updated to `ingestr>=0.13.61`
- CFR: Improved log output
- CFR: Fixed double quoting of table name. Thanks, @karynzv.
- CFR: When importing, started using the `replace` strategy instead of `append`
- CFR: Improved type mapping when importing data without NumPy
- CFR: Truncated target table before importing, using the `append`
  strategy again, because `replace` doesn't do the right DDL.

## 2025/07/01 v0.0.37
- Settings: Fixed comparison of `0s` vs `0ms`. Thanks, @hlcianfagna.
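In pandas/SQLAlchemy terms, the truncate-then-append pattern described by the `Unreleased` entries above looks roughly like the following sketch. The table name, connection URL, and data are placeholders, not the toolkit's exact code; the real implementation lives in `cratedb_toolkit/cfr/systable.py`, shown below.

```python
import pandas as pd
import sqlalchemy as sa
from sqlalchemy_cratedb import insert_bulk

engine = sa.create_engine("crate://localhost:4200")  # placeholder URL
df = pd.DataFrame.from_records([{"id": 1}, {"id": 2}])  # placeholder data

# Truncate: keep the DDL created from the schema file, and only clear the rows.
with engine.begin() as conn:
    conn.execute(sa.text('DELETE FROM "sys-operations"'))

# Append: `if_exists="replace"` would let pandas re-create the table with
# generic DDL, losing CrateDB-specific column types, so rows are appended
# into the already-created table instead.
df.to_sql(name="sys-operations", con=engine, index=False, if_exists="append", method=insert_bulk)
```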
45 changes: 33 additions & 12 deletions cratedb_toolkit/cfr/systable.py
@@ -23,6 +23,11 @@
import typing as t
from pathlib import Path

import orjsonl
import pandas as pd
from sqlalchemy_cratedb import insert_bulk
from tqdm.contrib.logging import logging_redirect_tqdm

if t.TYPE_CHECKING:
    import polars as pl

@@ -154,11 +159,16 @@
        timestamp = dt.datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
        path = self.path / self.info.cluster_name / timestamp / "sys"
        logger.info(f"Exporting system tables to: {path}")
        system_tables = self.inspector.table_names()
        path_schema = path / ExportSettings.SCHEMA_PATH
        path_data = path / ExportSettings.DATA_PATH
        path_schema.mkdir(parents=True, exist_ok=True)
        path_data.mkdir(parents=True, exist_ok=True)
        with logging_redirect_tqdm():
            self._save(path_schema, path_data)
        return path

    def _save(self, path_schema: Path, path_data: Path) -> None:
        system_tables = self.inspector.table_names()
        table_count = 0
        for tablename in tqdm(system_tables, disable=None):
            logger.debug(f"Exporting table: {tablename}")
@@ -167,13 +177,13 @@

            path_table_schema = path_schema / f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}.sql"
            path_table_data = path_data / f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}.{self.data_format}"
            tablename_out = self.adapter.quote_relation_name(f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}")
            tablename_out = f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}"

            # Write schema file.
            # Write the schema file.
            with open(path_table_schema, "w") as fh_schema:
                print(self.inspector.ddl(tablename_in=tablename, tablename_out=tablename_out), file=fh_schema)

            # Write data file.
            # Write the data file.
            df = self.read_table(tablename=tablename)
            if df.is_empty():
                continue
@@ -187,7 +197,6 @@
                    self.dump_table(frame=df, file=t.cast(t.TextIO, fh_data))

        logger.info(f"Successfully exported {table_count} system tables")
        return path


class SystemTableImporter:
@@ -219,9 +228,13 @@

logger.info(f"Importing system tables from: {self.source}")

with logging_redirect_tqdm():
self._load(path_schema, path_data)

def _load(self, path_schema: Path, path_data: Path):
table_count = 0
for tablename in tqdm(self.table_names()):
tablename_restored = ExportSettings.TABLE_FILENAME_PREFIX + tablename
tablename_restored = f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}"

path_table_schema = path_schema / f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}.sql"
path_table_data = path_data / f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}.{self.data_format}"
@@ -236,22 +249,30 @@
            schema_sql = path_table_schema.read_text()
            self.adapter.run_sql(schema_sql)

            # Truncate table.
            self.adapter.run_sql(f"DELETE FROM {self.adapter.quote_relation_name(tablename_restored)};")  # noqa: S608

            # Load data.
            try:
                df: "pl.DataFrame" = self.load_table(path_table_data)
                df.write_database(table_name=tablename_restored, connection=self.dburi, if_table_exists="append")
                df: "pd.DataFrame" = pd.DataFrame.from_records(self.load_table(path_table_data))
                df.to_sql(
                    name=tablename_restored,
                    con=self.adapter.engine,
                    index=False,
                    if_exists="append",
                    method=insert_bulk,
                )
            except Exception as ex:
                error_logger(self.debug)(f"Importing table failed: {tablename}. Reason: {ex}")

        logger.info(f"Successfully imported {table_count} system tables")
        # df.to_pandas().to_sql(name=tablename, con=self.adapter.engine, if_exists="append", index=False)  # noqa: ERA001, E501

    def load_table(self, path: Path) -> "pl.DataFrame":
    def load_table(self, path: Path) -> t.List:
        import polars as pl

        if path.suffix in [".jsonl"]:
            return pl.read_ndjson(path)
            return orjsonl.load(path)
        elif path.suffix in [".parquet", ".pq"]:
            return pl.read_parquet(path)
            return pl.read_parquet(path).to_pandas().to_dict("records")
        else:
            raise NotImplementedError(f"Input format not implemented: {path.suffix}")

Check warning on line 276 in cratedb_toolkit/cfr/systable.py (Codecov / codecov/patch): Added line #L276 was not covered by tests.
Comment on lines +270 to +276
@coderabbitai (bot) on Jul 10, 2025

💡 Verification agent · ❓ Verification inconclusive

Verify that the Parquet loading path is tested.

The `load_table` method correctly switches from returning a DataFrame to returning a list of records, using `orjsonl.load` for JSONL files and pandas records for Parquet files.

Static analysis indicates that line 276 (the Parquet loading path) lacks test coverage. Consider adding tests for the Parquet loading functionality:


🏁 Script executed:

#!/bin/bash
# Description: Check test coverage for parquet loading in SystemTableImporter
# Expected: Find tests that exercise the parquet loading path

# Search for tests related to parquet loading in SystemTableImporter
rg -A 10 -B 5 "parquet.*SystemTableImporter|SystemTableImporter.*parquet" tests/

Length of output: 82


Add tests for the parquet-loading path in load_table

The load_table method correctly handles both JSONL and Parquet files, but we have no evidence of tests covering the Parquet branch. Please add unit tests that:

  • Exercise both .parquet and .pq suffixes.
  • Use Polars (or another library) to write a small in-memory DataFrame to a temporary Parquet file.
  • Call load_table(path) and assert the returned list of records matches the original data.
  • Live under your existing test hierarchy (e.g. tests/), perhaps as tests/test_systable_parquet.py.

This will ensure future changes to the Parquet-loading logic are caught by CI.
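A minimal run-to-completion sketch of such a test, in the spirit of the suggestion above. The `SystemTableImporter` constructor arguments are assumptions and would need to be checked against `cratedb_toolkit/cfr/systable.py`; the assertion mirrors the list-of-records return value for Parquet inputs.

```python
# tests/cfr/test_systable_parquet.py -- sketch only; constructor arguments are assumed.
from pathlib import Path

import polars as pl

from cratedb_toolkit.cfr.systable import SystemTableImporter


def test_load_table_parquet(tmp_path: Path):
    df = pl.DataFrame({"id": [1, 2], "name": ["foo", "bar"]})

    # Exercise both recognized Parquet suffixes.
    for suffix in (".parquet", ".pq"):
        path = tmp_path / f"sys-dummy{suffix}"
        df.write_parquet(path)

        # Hypothetical constructor call; verify the real signature before use.
        importer = SystemTableImporter(dburi="crate://localhost:4200", source=tmp_path)

        # For Parquet inputs, load_table returns a list of record dictionaries.
        assert importer.load_table(path) == [
            {"id": 1, "name": "foo"},
            {"id": 2, "name": "bar"},
        ]
```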


@amotl (Member, Author) on Jul 10, 2025

@coderabbitai: You are right, this code path is not exercised using software tests just yet. However, we will not dedicate cycles to that within this iteration.

@coderabbitai (bot)

@amotl Understood! I acknowledge your decision to defer the test coverage for the parquet loading path to a future iteration. This aligns with your project's approach of focusing on core functionality first.

Would you like me to create a GitHub issue to track adding test coverage for the parquet loading path in SystemTableImporter.load_table() for future work? This would help ensure it doesn't get lost and can be addressed when you have cycles to dedicate to more comprehensive testing.

  (\   /)
 ( ._. )
o_(")(")  *understanding nod*

🧠 Learnings used
Learnt from: amotl
PR: crate/cratedb-toolkit#81
File: cratedb_toolkit/api/main.py:7-7
Timestamp: 2025-04-27T02:46:46.795Z
Learning: Test coverage for the `@lru_cache` decorator on the `get_client_bundle` method in `cratedb_toolkit/api/main.py` is being tracked in issue GH-423 for a future iteration and should not be raised during PR reviews.

Learnt from: amotl
PR: crate/cratedb-toolkit#421
File: cratedb_toolkit/settings/compare.py:35-45
Timestamp: 2025-04-25T18:40:39.517Z
Learning: For cratedb-toolkit, amotl prefers starting with minimal "run-to-completion" tests that verify basic functionality before adding more thorough test cases.

Learnt from: amotl
PR: crate/cratedb-toolkit#421
File: cratedb_toolkit/settings/compare.py:412-447
Timestamp: 2025-04-25T18:36:10.508Z
Learning: The cratedb-toolkit project prefers blackbox "run-to-completion" testing for CLI tools rather than detailed unit tests for individual utility functions. Integration tests that verify the end-to-end functionality are prioritized over testing individual components in isolation.

Learnt from: amotl
PR: crate/cratedb-toolkit#400
File: cratedb_toolkit/docs/settings.py:0-0
Timestamp: 2025-04-13T14:43:29.858Z
Learning: For the cratedb-toolkit project, functional validation through CLI commands is preferred as the initial testing approach, with more granular unit tests being deferred to subsequent iterations when necessary.

Learnt from: amotl
PR: crate/cratedb-toolkit#421
File: tests/docs/test_functions.py:9-24
Timestamp: 2025-04-25T17:59:17.303Z
Learning: In the cratedb-toolkit project, explicit and separate test cases are preferred over parameterized tests when testing similar functionality with different outputs to maintain code clarity.

Learnt from: amotl
PR: crate/cratedb-toolkit#421
File: cratedb_toolkit/settings/compare.py:129-149
Timestamp: 2025-04-25T18:40:26.508Z
Learning: In the cratedb-toolkit project, comprehensive unit tests for utility functions like `normalize_value` in the settings comparison module are intentionally skipped as per the project's testing strategy.

Learnt from: amotl
PR: crate/cratedb-toolkit#421
File: cratedb_toolkit/settings/compare.py:152-183
Timestamp: 2025-04-25T18:39:11.724Z
Learning: The project is currently favoring minimal "run-to-completion" tests over comprehensive unit tests, especially for new functionality like the settings comparison utility.

Learnt from: amotl
PR: crate/cratedb-toolkit#81
File: cratedb_toolkit/cfr/marimo.py:53-54
Timestamp: 2025-05-02T11:43:42.639Z
Learning: The submodules within `cratedb_toolkit/io` employ a different naming scheme than other parts of the codebase and retain their own conventions (e.g., may continue using `cratedb_sqlalchemy_url` instead of `sqlalchemy_url`).

Learnt from: amotl
PR: crate/cratedb-toolkit#81
File: cratedb_toolkit/api/main.py:0-0
Timestamp: 2025-04-27T17:47:50.261Z
Learning: When suggesting improvements or refactorings that aren't directly related to the current PR, create a GitHub issue instead of suggesting a direct code change. This applies especially to improvements related to error handling, performance optimizations, or restructuring that aren't critical to the current PR.


2 changes: 1 addition & 1 deletion pyproject.toml
@@ -110,7 +110,7 @@ dependencies = [
"python-slugify<9",
"pyyaml<7",
"requests>=2.28,<3",
"sqlalchemy-cratedb>=0.41",
"sqlalchemy-cratedb>=0.41.0",
"sqlparse<0.6",
"tqdm<5",
"typing-extensions<5; python_version<='3.7'",
32 changes: 20 additions & 12 deletions tests/cfr/test_systable.py
@@ -35,7 +35,9 @@ def test_cfr_sys_export_success(cratedb, tmp_path, caplog):
"""

# Invoke command.
runner = CliRunner(env={"CRATEDB_CLUSTER_URL": cratedb.database.dburi, "CFR_TARGET": str(tmp_path)})
runner = CliRunner(
env={"CRATEDB_CLUSTER_URL": cratedb.database.dburi, "CFR_TARGET": str(tmp_path)}, mix_stderr=False
)
result = runner.invoke(
cli,
args="--debug sys-export",
@@ -47,8 +49,8 @@
assert "Exporting system tables to" in caplog.text
assert re.search(r"Successfully exported \d+ system tables", caplog.text), "Log message missing"

# Verify outcome.
path = Path(json.loads(result.output)["path"])
# Verify the outcome.
path = Path(json.loads(result.stdout)["path"])
assert filenames(path) == ["data", "schema"]

schema_files = filenames(path / "schema")
@@ -66,7 +68,9 @@ def test_cfr_sys_export_to_archive_file(cratedb, tmp_path, caplog):
    target = os.path.join(tmp_path, "cluster-data.tgz")

    # Invoke command.
    runner = CliRunner(env={"CRATEDB_CLUSTER_URL": cratedb.database.dburi, "CFR_TARGET": str(tmp_path)})
    runner = CliRunner(
        env={"CRATEDB_CLUSTER_URL": cratedb.database.dburi, "CFR_TARGET": str(tmp_path)}, mix_stderr=False
    )
    result = runner.invoke(
        cli,
        args=f"--debug sys-export {target}",
@@ -78,8 +82,8 @@
assert "Exporting system tables to" in caplog.text
assert re.search(r"Successfully exported \d+ system tables", caplog.text), "Log message missing"

# Verify outcome.
path = Path(json.loads(result.output)["path"])
# Verify the outcome.
path = Path(json.loads(result.stdout)["path"])
assert "cluster-data.tgz" in path.name

data_files = []
@@ -102,7 +106,7 @@ def test_cfr_sys_export_failure(cratedb, tmp_path, caplog):
"""

# Invoke command.
runner = CliRunner(env={"CRATEDB_CLUSTER_URL": "crate://foo.bar/", "CFR_TARGET": str(tmp_path)})
runner = CliRunner(env={"CRATEDB_CLUSTER_URL": "crate://foo.bar/", "CFR_TARGET": str(tmp_path)}, mix_stderr=False)
result = runner.invoke(
cli,
args="--debug sys-export",
@@ -116,15 +120,17 @@


def test_cfr_sys_export_ensure_table_name_is_quoted(cratedb, tmp_path, caplog):
    runner = CliRunner(env={"CRATEDB_CLUSTER_URL": cratedb.database.dburi, "CFR_TARGET": str(tmp_path)})
    runner = CliRunner(
        env={"CRATEDB_CLUSTER_URL": cratedb.database.dburi, "CFR_TARGET": str(tmp_path)}, mix_stderr=False
    )
    result = runner.invoke(
        cli,
        args="--debug sys-export",
        catch_exceptions=False,
    )
    assert result.exit_code == 0

    path = Path(json.loads(result.output)["path"])
    path = Path(json.loads(result.stdout)["path"])
    sys_cluster_table_schema = path / "schema" / "sys-cluster.sql"
    with open(sys_cluster_table_schema, "r") as f:
        content = f.read()
@@ -172,7 +178,9 @@ def test_cfr_sys_import_success(cratedb, tmp_path, caplog):
    shutil.copy(sys_operations_data, data_path)

    # Invoke command.
    runner = CliRunner(env={"CRATEDB_CLUSTER_URL": cratedb.database.dburi, "CFR_SOURCE": str(tmp_path)})
    runner = CliRunner(
        env={"CRATEDB_CLUSTER_URL": cratedb.database.dburi, "CFR_SOURCE": str(tmp_path)}, mix_stderr=False
    )
    result = runner.invoke(
        cli,
        args="--debug sys-import",
@@ -184,7 +192,7 @@
assert "Importing system tables from" in caplog.text
assert re.search(r"Successfully imported \d+ system tables", caplog.text), "Log message missing"

    # Verify outcome.
    # Verify the outcome.
    results = cratedb.database.run_sql("SHOW TABLES", records=True)
    assert {"table_name": "sys-operations"} in results

@@ -198,7 +206,7 @@ def test_cfr_sys_import_failure(cratedb, tmp_path, caplog):
"""

    # Invoke command.
runner = CliRunner(env={"CRATEDB_CLUSTER_URL": "crate://foo.bar/", "CFR_SOURCE": str(tmp_path)})
    runner = CliRunner(env={"CRATEDB_CLUSTER_URL": "crate://foo.bar/", "CFR_SOURCE": str(tmp_path)}, mix_stderr=False)
    result = runner.invoke(
        cli,
        args="--debug sys-import",