diff --git a/CHANGES.md b/CHANGES.md
index af980945..0eecc36f 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -2,6 +2,12 @@

 ## Unreleased
 - I/O: Updated to `ingestr>=0.13.61`
+- CFR: Improved log output
+- CFR: Fixed double quoting of the table name. Thanks, @karynzv.
+- CFR: When importing, started using the `replace` strategy instead of `append`
+- CFR: Improved type mapping when importing data, avoiding NumPy
+- CFR: Truncated the target table before importing, returning to the `append`
+  strategy, because `replace` does not emit the right DDL.

 ## 2025/07/01 v0.0.37
 - Settings: Fixed comparison of `0s` vs `0ms`. Thanks, @hlcianfagna.
diff --git a/cratedb_toolkit/cfr/systable.py b/cratedb_toolkit/cfr/systable.py
index c79b0557..a5327536 100644
--- a/cratedb_toolkit/cfr/systable.py
+++ b/cratedb_toolkit/cfr/systable.py
@@ -23,6 +23,11 @@
 import typing as t
 from pathlib import Path

+import orjsonl
+import pandas as pd
+from sqlalchemy_cratedb import insert_bulk
+from tqdm.contrib.logging import logging_redirect_tqdm
+
 if t.TYPE_CHECKING:
     import polars as pl

@@ -154,11 +159,16 @@ def save(self) -> Path:
         timestamp = dt.datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
         path = self.path / self.info.cluster_name / timestamp / "sys"
         logger.info(f"Exporting system tables to: {path}")
-        system_tables = self.inspector.table_names()
         path_schema = path / ExportSettings.SCHEMA_PATH
         path_data = path / ExportSettings.DATA_PATH
         path_schema.mkdir(parents=True, exist_ok=True)
         path_data.mkdir(parents=True, exist_ok=True)
+        with logging_redirect_tqdm():
+            self._save(path_schema, path_data)
+        return path
+
+    def _save(self, path_schema: Path, path_data: Path) -> None:
+        system_tables = self.inspector.table_names()
         table_count = 0
         for tablename in tqdm(system_tables, disable=None):
             logger.debug(f"Exporting table: {tablename}")
@@ -167,13 +177,13 @@ def save(self) -> Path:

             path_table_schema = path_schema / f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}.sql"
             path_table_data = path_data / f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}.{self.data_format}"
-            tablename_out = self.adapter.quote_relation_name(f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}")
+            tablename_out = f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}"

-            # Write schema file.
+            # Write the schema file.
             with open(path_table_schema, "w") as fh_schema:
                 print(self.inspector.ddl(tablename_in=tablename, tablename_out=tablename_out), file=fh_schema)

-            # Write data file.
+            # Write the data file.
             df = self.read_table(tablename=tablename)
             if df.is_empty():
                 continue
@@ -187,7 +197,6 @@ def save(self) -> Path:
                 self.dump_table(frame=df, file=t.cast(t.TextIO, fh_data))

         logger.info(f"Successfully exported {table_count} system tables")
-        return path


 class SystemTableImporter:
@@ -219,9 +228,13 @@ def load(self):

         logger.info(f"Importing system tables from: {self.source}")

+        with logging_redirect_tqdm():
+            self._load(path_schema, path_data)
+
+    def _load(self, path_schema: Path, path_data: Path):
         table_count = 0
         for tablename in tqdm(self.table_names()):
-            tablename_restored = ExportSettings.TABLE_FILENAME_PREFIX + tablename
+            tablename_restored = f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}"

             path_table_schema = path_schema / f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}.sql"
             path_table_data = path_data / f"{ExportSettings.TABLE_FILENAME_PREFIX}{tablename}.{self.data_format}"

@@ -236,22 +249,30 @@ def load(self):
             schema_sql = path_table_schema.read_text()
             self.adapter.run_sql(schema_sql)

+            # Truncate the target table, so repeated imports do not accumulate duplicate records.
+ self.adapter.run_sql(f"DELETE FROM {self.adapter.quote_relation_name(tablename_restored)};") # noqa: S608 + # Load data. try: - df: "pl.DataFrame" = self.load_table(path_table_data) - df.write_database(table_name=tablename_restored, connection=self.dburi, if_table_exists="append") + df: "pd.DataFrame" = pd.DataFrame.from_records(self.load_table(path_table_data)) + df.to_sql( + name=tablename_restored, + con=self.adapter.engine, + index=False, + if_exists="append", + method=insert_bulk, + ) except Exception as ex: error_logger(self.debug)(f"Importing table failed: {tablename}. Reason: {ex}") logger.info(f"Successfully imported {table_count} system tables") - # df.to_pandas().to_sql(name=tablename, con=self.adapter.engine, if_exists="append", index=False) # noqa: ERA001, E501 - def load_table(self, path: Path) -> "pl.DataFrame": + def load_table(self, path: Path) -> t.List: import polars as pl if path.suffix in [".jsonl"]: - return pl.read_ndjson(path) + return orjsonl.load(path) elif path.suffix in [".parquet", ".pq"]: - return pl.read_parquet(path) + return pl.read_parquet(path).to_pandas().to_dict("records") else: raise NotImplementedError(f"Input format not implemented: {path.suffix}") diff --git a/pyproject.toml b/pyproject.toml index 434ada2c..07b4c42f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -110,7 +110,7 @@ dependencies = [ "python-slugify<9", "pyyaml<7", "requests>=2.28,<3", - "sqlalchemy-cratedb>=0.41", + "sqlalchemy-cratedb>=0.41.0", "sqlparse<0.6", "tqdm<5", "typing-extensions<5; python_version<='3.7'", diff --git a/tests/cfr/test_systable.py b/tests/cfr/test_systable.py index 7043c172..ad6144af 100644 --- a/tests/cfr/test_systable.py +++ b/tests/cfr/test_systable.py @@ -35,7 +35,9 @@ def test_cfr_sys_export_success(cratedb, tmp_path, caplog): """ # Invoke command. - runner = CliRunner(env={"CRATEDB_CLUSTER_URL": cratedb.database.dburi, "CFR_TARGET": str(tmp_path)}) + runner = CliRunner( + env={"CRATEDB_CLUSTER_URL": cratedb.database.dburi, "CFR_TARGET": str(tmp_path)}, mix_stderr=False + ) result = runner.invoke( cli, args="--debug sys-export", @@ -47,8 +49,8 @@ def test_cfr_sys_export_success(cratedb, tmp_path, caplog): assert "Exporting system tables to" in caplog.text assert re.search(r"Successfully exported \d+ system tables", caplog.text), "Log message missing" - # Verify outcome. - path = Path(json.loads(result.output)["path"]) + # Verify the outcome. + path = Path(json.loads(result.stdout)["path"]) assert filenames(path) == ["data", "schema"] schema_files = filenames(path / "schema") @@ -66,7 +68,9 @@ def test_cfr_sys_export_to_archive_file(cratedb, tmp_path, caplog): target = os.path.join(tmp_path, "cluster-data.tgz") # Invoke command. - runner = CliRunner(env={"CRATEDB_CLUSTER_URL": cratedb.database.dburi, "CFR_TARGET": str(tmp_path)}) + runner = CliRunner( + env={"CRATEDB_CLUSTER_URL": cratedb.database.dburi, "CFR_TARGET": str(tmp_path)}, mix_stderr=False + ) result = runner.invoke( cli, args=f"--debug sys-export {target}", @@ -78,8 +82,8 @@ def test_cfr_sys_export_to_archive_file(cratedb, tmp_path, caplog): assert "Exporting system tables to" in caplog.text assert re.search(r"Successfully exported \d+ system tables", caplog.text), "Log message missing" - # Verify outcome. - path = Path(json.loads(result.output)["path"]) + # Verify the outcome. + path = Path(json.loads(result.stdout)["path"]) assert "cluster-data.tgz" in path.name data_files = [] @@ -102,7 +106,7 @@ def test_cfr_sys_export_failure(cratedb, tmp_path, caplog): """ # Invoke command. 
-    runner = CliRunner(env={"CRATEDB_CLUSTER_URL": "crate://foo.bar/", "CFR_TARGET": str(tmp_path)})
+    runner = CliRunner(env={"CRATEDB_CLUSTER_URL": "crate://foo.bar/", "CFR_TARGET": str(tmp_path)}, mix_stderr=False)
     result = runner.invoke(
         cli,
         args="--debug sys-export",
@@ -116,7 +120,9 @@


 def test_cfr_sys_export_ensure_table_name_is_quoted(cratedb, tmp_path, caplog):
-    runner = CliRunner(env={"CRATEDB_CLUSTER_URL": cratedb.database.dburi, "CFR_TARGET": str(tmp_path)})
+    runner = CliRunner(
+        env={"CRATEDB_CLUSTER_URL": cratedb.database.dburi, "CFR_TARGET": str(tmp_path)}, mix_stderr=False
+    )
     result = runner.invoke(
         cli,
         args="--debug sys-export",
@@ -124,7 +130,7 @@
     assert result.exit_code == 0

-    path = Path(json.loads(result.output)["path"])
+    path = Path(json.loads(result.stdout)["path"])
     sys_cluster_table_schema = path / "schema" / "sys-cluster.sql"
     with open(sys_cluster_table_schema, "r") as f:
         content = f.read()

@@ -172,7 +178,9 @@ def test_cfr_sys_import_success(cratedb, tmp_path, caplog):
     shutil.copy(sys_operations_data, data_path)

     # Invoke command.
-    runner = CliRunner(env={"CRATEDB_CLUSTER_URL": cratedb.database.dburi, "CFR_SOURCE": str(tmp_path)})
+    runner = CliRunner(
+        env={"CRATEDB_CLUSTER_URL": cratedb.database.dburi, "CFR_SOURCE": str(tmp_path)}, mix_stderr=False
+    )
     result = runner.invoke(
         cli,
         args="--debug sys-import",
@@ -184,7 +192,7 @@
     assert "Importing system tables from" in caplog.text
     assert re.search(r"Successfully imported \d+ system tables", caplog.text), "Log message missing"

-    # Verify outcome.
+    # Verify the outcome.
     results = cratedb.database.run_sql("SHOW TABLES", records=True)
     assert {"table_name": "sys-operations"} in results

@@ -198,7 +206,7 @@ def test_cfr_sys_import_failure(cratedb, tmp_path, caplog):
     """

     # Invoke command.
-    runner = CliRunner(env={"CRATEDB_CLUSTER_URL": "crate://foo.bar/", "CFR_SOURCE": str(tmp_path)})
+    runner = CliRunner(env={"CRATEDB_CLUSTER_URL": "crate://foo.bar/", "CFR_SOURCE": str(tmp_path)}, mix_stderr=False)
    result = runner.invoke(
         cli,
         args="--debug sys-import",
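
Reviewer note: for trying the truncate-then-append import pattern from this patch in isolation, here is a minimal standalone sketch. The connection URL, the table name "sys-operations", and the dump file path are illustrative assumptions, not part of the patch; only `orjsonl.load`, `pd.DataFrame.from_records`, `to_sql(..., method=insert_bulk)`, and the `DELETE FROM` truncation mirror the code above.

# Minimal sketch of the truncate-then-append import pattern, assuming a
# CrateDB instance at crate://localhost:4200 and a hypothetical dump file
# "sys-operations.jsonl"; names are illustrative only.
import orjsonl
import pandas as pd
import sqlalchemy as sa
from sqlalchemy_cratedb import insert_bulk

engine = sa.create_engine("crate://localhost:4200")

# Load NDJSON records as plain Python dicts, sidestepping NumPy dtype inference.
records = orjsonl.load("sys-operations.jsonl")
df = pd.DataFrame.from_records(records)

# Empty the target table first: `if_exists="append"` below keeps the DDL
# created from the exported schema file, whereas `if_exists="replace"` would
# drop and re-create the table with inferred column types.
with engine.begin() as connection:
    connection.execute(sa.text('DELETE FROM "sys-operations";'))

# Bulk-insert via the CrateDB-specific `insert_bulk` method.
df.to_sql(
    name="sys-operations",
    con=engine,
    index=False,
    if_exists="append",
    method=insert_bulk,
)

The test changes pair this with `CliRunner(..., mix_stderr=False)`, so the JSON payload can be parsed from `result.stdout` without tqdm and log output from stderr interleaved.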