From 7a17bc6800dfa36cd3b3a7bce837e212c85aba15 Mon Sep 17 00:00:00 2001 From: lsabi1234 Date: Fri, 1 Jul 2022 22:12:05 +0200 Subject: [PATCH 01/29] :construction: WIP Start porting dump command --- rethinkdb/cli/_dump.py | 229 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 229 insertions(+) diff --git a/rethinkdb/cli/_dump.py b/rethinkdb/cli/_dump.py index 0563b9e..557b3cb 100644 --- a/rethinkdb/cli/_dump.py +++ b/rethinkdb/cli/_dump.py @@ -22,6 +22,148 @@ """ import click +import datetime +import os +import platform +import shutil +import sys +import tarfile +import tempfile +import time +import traceback + +from rethinkdb import _export, utils_common +from rethinkdb.logger import default_logger + + +usage = ( + "rethinkdb dump [-c HOST:PORT] [-p] [--password-file FILENAME] [--tls-cert FILENAME] [-f FILE] " + "[--clients NUM] [-e (DB | DB.TABLE)]..." +) +help_epilog = """ +EXAMPLES: +rethinkdb dump -c mnemosyne:39500 + Archive all data from a cluster running on host 'mnemosyne' with a client port at 39500. + +rethinkdb dump -e test -f rdb_dump.tar.gz + Archive only the 'test' database from a local cluster into a named file. + +rethinkdb dump -c hades -e test.subscribers -p + Archive a specific table from a cluster running on host 'hades' which requires a password.""" + + +def parse_options(argv, prog=None): + parser = utils_common.CommonOptionsParser( + usage=usage, epilog=help_epilog, prog=prog + ) + + parser.add_option( + "-f", + "--file", + dest="out_file", + metavar="FILE", + default=None, + help="file to write archive to (defaults to rethinkdb_dump_DATE_TIME.tar.gz);\nif FILE is -, use standard " + "output (note that intermediate files will still be written to the --temp-dir directory)", + ) + parser.add_option( + "-e", + "--export", + dest="db_tables", + metavar="DB|DB.TABLE", + default=[], + type="db_table", + help="limit dump to the given database or table (may be specified multiple times)", + action="append", + ) + + parser.add_option( + "--temp-dir", + dest="temp_dir", + metavar="directory", + default=None, + help="the directory to use for intermediary results", + ) + parser.add_option( + "--overwrite-file", + dest="overwrite", + default=False, + help="overwrite -f/--file if it exists", + action="store_true", + ) + parser.add_option( + "--clients", + dest="clients", + metavar="NUM", + default=3, + help="number of tables to export simultaneously (default: 3)", + type="pos_int", + ) + parser.add_option( + "--read-outdated", + dest="outdated", + default=False, + help="use outdated read mode", + action="store_true", + ) + + options, args = parser.parse_args(argv) + + # Check validity of arguments + if len(args) != 0: + raise parser.error( + f"No positional arguments supported. 
Unrecognized option(s): {args}" + ) + + # Add dump name + if platform.system() == "Windows" or platform.system().lower().startswith("cygwin"): + options.dump_name = "rethinkdb_dump_%s" % datetime.datetime.today().strftime( + "%Y-%m-%dT%H-%M-%S" + ) # no colons in name + else: + options.dump_name = f'rethinkdb_dump_{datetime.datetime.today().strftime("%Y-%m-%dT%H:%M:%S")}' + + # Verify valid output file + if options.out_file == "-": + options.out_file = sys.stdout + options.quiet = True + elif options.out_file is None: + options.out_file = os.path.realpath(f"{options.dump_name}.tar.gz") + else: + options.out_file = os.path.realpath(options.out_file) + + if options.out_file is not sys.stdout: + if os.path.exists(options.out_file) and not options.overwrite: + parser.error(f"Output file already exists: {options.out_file}") + if os.path.exists(options.out_file) and not os.path.isfile(options.out_file): + parser.error( + f"There is a non-file at the -f/--file location: {options.out_file}" + ) + + # Verify valid client count + if options.clients < 1: + raise RuntimeError( + f"Error: invalid number of clients ({options.clients}), must be greater than zero" + ) + + # Make sure the temporary directory exists and is accessible + if options.temp_dir is not None: + if not os.path.exists(options.temp_dir): + try: + os.makedirs(options.temp_dir) + except OSError: + parser.error( + "Could not create temporary directory: %s" % options.temp_dir + ) + if not os.path.isdir(options.temp_dir): + parser.error( + f"Temporary directory doesn't exist or is not a directory: {options.temp_dir}" + ) + if not os.access(options.temp_dir, os.W_OK): + parser.error(f"Temporary directory inaccessible: {options.temp_dir}") + + return options + @click.command def cmd_dump(): @@ -29,3 +171,90 @@ def cmd_dump(): Dump creates an archive of data from a RethinkDB cluster. """ click.echo("dump command") + argv = None + prog = None + + options = parse_options(argv or sys.argv[1:], prog=prog) + try: + if not options.quiet: + # Print a warning about the capabilities of dump, so no one is confused (hopefully) + print( + """\ + NOTE: 'rethinkdb-dump' saves data, secondary indexes, and write hooks, but does *not* save + cluster metadata. 
You will need to recreate your cluster setup yourself after + you run 'rethinkdb-restore'.""" + ) + + try: + start_time = time.time() + archive = None + + # -- _export options - need to be kep in-sync with _export + + options.directory = os.path.realpath(tempfile.mkdtemp(dir=options.temp_dir)) + options.fields = None + options.delimiter = None + options.format = "json" + + # -- export to a directory + + if not options.quiet: + print(" Exporting to temporary directory...") + + try: + _export.run(options) + except Exception as exc: + default_logger.exception(exc) + + if options.debug: + sys.stderr.write(f"\n{traceback.format_exc()}\n") + + raise Exception(f"Error: export failed, {exc}") + + # -- zip directory + + if not options.quiet: + print(" Zipping export directory...") + + try: + if hasattr(options.out_file, "read"): + archive = tarfile.open(fileobj=options.out_file, mode="w:gz") + else: + archive = tarfile.open(name=options.out_file, mode="w:gz") + for curr, _, files in os.walk(os.path.realpath(options.directory)): + for data_file in files: + full_path = os.path.join(options.directory, curr, data_file) + archive_path = os.path.join( + options.dump_name, + os.path.relpath(full_path, options.directory), + ) + archive.add(full_path, arcname=archive_path) + os.unlink(full_path) + finally: + if archive: + archive.close() + + # -- + + if not options.quiet: + print( + "Done (%.2f seconds): %s" + % ( + time.time() - start_time, + options.out_file.name + if hasattr(options.out_file, "name") + else options.out_file, + ) + ) + except KeyboardInterrupt: + time.sleep(0.2) + raise RuntimeError("Interrupted") + finally: + if os.path.exists(options.directory): + shutil.rmtree(options.directory) + + except Exception as ex: + if options.debug: + traceback.print_exc() + print(ex, file=sys.stderr) + return 1 From 5fd2acb460b1f64b06065534e357820defe36ecd Mon Sep 17 00:00:00 2001 From: lsabi1234 Date: Wed, 13 Jul 2022 22:58:53 +0200 Subject: [PATCH 02/29] :construction: Copying the cli export from the old driver --- rethinkdb/cli/_export.py | 638 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 638 insertions(+) diff --git a/rethinkdb/cli/_export.py b/rethinkdb/cli/_export.py index 3002489..7b0d890 100644 --- a/rethinkdb/cli/_export.py +++ b/rethinkdb/cli/_export.py @@ -21,7 +21,630 @@ Export exports data from a RethinkDB cluster into a directory. """ import click +import csv +import ctypes +import datetime +import json +import multiprocessing +import numbers +import optparse +import os +import platform +import signal +import sys +import tempfile +import time +import traceback +from multiprocessing.queues import SimpleQueue + +from rethinkdb import errors, query, utils_common +from rethinkdb.logger import default_logger + +# STATIC INFORMATION ABOUT THE EXOIRT FEATURE +usage = """rethinkdb export [-c HOST:PORT] [-p] [--password-file FILENAME] [--tls-cert filename] [-d DIR] + [-e (DB | DB.TABLE)]... + [--format (csv | json | ndjson)] [--fields FIELD,FIELD...] [--delimiter CHARACTER] + [--clients NUM]""" +help_description = ( + "`rethinkdb export` exports data from a RethinkDB cluster into a directory" +) +help_epilog = """ +EXAMPLES: +rethinkdb export -c mnemosyne:39500 + Export all data from a cluster running on host 'mnemosyne' with a client port at 39500. + +rethinkdb export -e test -d rdb_export + Export only the 'test' database on a local cluster into a named directory. 
+ +rethinkdb export -c hades -e test.subscribers -p + Export a specific table from a cluster running on host 'hades' which requires a password. + +rethinkdb export --format csv -e test.history --fields time,message --delimiter ';' + Export a specific table from a local cluster in CSV format with the fields 'time' and 'message', + using a semicolon as field delimiter (rather than a comma). + +rethinkdb export --fields id,value -e test.data + Export a specific table from a local cluster in JSON format with only the fields 'id' and 'value'. +""" + + +def parse_options(argv, prog=None): + if platform.system() == "Windows" or platform.system().lower().startswith("cygwin"): + # no colons in name + default_dir = f'''rethinkdb_export_{datetime.datetime.today().strftime( + "%Y-%m-%dT%H-%M-%S")} + ''' + else: + default_dir = f'''rethinkdb_export_{datetime.datetime.today().strftime( + "%Y-%m-%dT%H:%M:%S")} + ''' + + parser = utils_common.CommonOptionsParser( + usage=usage, description=help_description, epilog=help_epilog, prog=prog + ) + + parser.add_option( + "-d", + "--directory", + dest="directory", + metavar="DIRECTORY", + default=default_dir, + help="directory to output to (default: rethinkdb_export_DATE_TIME)", + type="new_file", + ) + parser.add_option( + "-e", + "--export", + dest="db_tables", + metavar="DB|DB.TABLE", + default=[], + help="limit dump to the given database or table (may be specified multiple times)", + action="append", + type="db_table", + ) + parser.add_option( + "--fields", + dest="fields", + metavar=",...", + default=None, + help="export only specified fields (required for CSV format)", + ) + parser.add_option( + "--format", + dest="format", + metavar="json|csv|ndjson", + default="json", + help="format to write (defaults to json. ndjson is newline delimited json.)", + type="choice", + choices=["json", "csv", "ndjson"], + ) + parser.add_option( + "--clients", + dest="clients", + metavar="NUM", + default=3, + help="number of tables to export simultaneously (default: 3)", + type="pos_int", + ) + parser.add_option( + "--read-outdated", + dest="outdated", + default=False, + help="use outdated read mode", + action="store_true", + ) + + csvGroup = optparse.OptionGroup(parser, "CSV options") + csvGroup.add_option( + "--delimiter", + dest="delimiter", + metavar="CHARACTER", + default=None, + help="character to be used as field delimiter, or '\\t' for tab (default: ',')", + ) + parser.add_option_group(csvGroup) + + options, args = parser.parse_args(argv) + + # -- Check validity of arguments + + if len(args) != 0: + parser.error( + f"No positional arguments supported. 
Unrecognized option(s): {args}" + ) + + if options.fields: + if len(options.db_tables) != 1 or options.db_tables[0].table is None: + parser.error( + "The --fields option can only be used when exporting a single table" + ) + options.fields = options.fields.split(",") + + # - format specific validation + + if options.format == "csv": + if options.fields is None: + parser.error("CSV files require the '--fields' option to be specified.") + + if options.delimiter is None: + options.delimiter = "," + elif options.delimiter == "\\t": + options.delimiter = "\t" + elif len(options.delimiter) != 1: + parser.error( + f"Specify exactly one character for the --delimiter option: {options.delimiter}" + ) + else: + if options.delimiter: + parser.error("--delimiter option is only valid for CSV file formats") + + return options + + +def json_writer(filename, fields, task_queue, error_queue, format): + try: + with open(filename, "w") as out: + first = True + if format != "ndjson": + out.write("[") + item = task_queue.get() + while not isinstance(item, StopIteration): + row = item[0] + if fields is not None: + for item in list(row.keys()): + if item not in fields: + del row[item] + if first: + if format == "ndjson": + out.write(json.dumps(row)) + else: + out.write("\n" + json.dumps(row)) + first = False + elif format == "ndjson": + out.write("\n" + json.dumps(row)) + else: + out.write(",\n" + json.dumps(row)) + + item = task_queue.get() + if format != "ndjson": + out.write("\n]\n") + except BaseException: + ex_type, ex_class, tb = sys.exc_info() + error_queue.put((ex_type, ex_class, traceback.extract_tb(tb))) + + # Read until the exit task so the readers do not hang on pushing onto the queue + while not isinstance(task_queue.get(), StopIteration): + pass + + +def csv_writer(filename, fields, delimiter, task_queue, error_queue): + try: + with open(filename, "w") as out: + out_writer = csv.writer(out, delimiter=delimiter) + out_writer.writerow(fields) + + item = task_queue.get() + while not isinstance(item, StopIteration): + row = item[0] + info = [] + # If the data is a simple type, just write it directly, otherwise, write it as json + for field in fields: + if field not in row: + info.append(None) + elif isinstance(row[field], numbers.Number): + info.append(str(row[field])) + elif isinstance(row[field], str): + info.append(row[field]) + elif isinstance(row[field], unicode): + info.append(row[field].encode("utf-8")) + else: + if str == unicode: + info.append(json.dumps(row[field])) + else: + info.append(json.dumps(row[field]).encode("utf-8")) + out_writer.writerow(info) + item = task_queue.get() + except BaseException: + ex_type, ex_class, tb = sys.exc_info() + error_queue.put((ex_type, ex_class, traceback.extract_tb(tb))) + + # Read until the exit task so the readers do not hang on pushing onto the queue + while not isinstance(task_queue.get(), StopIteration): + pass + + +def export_table( + db, + table, + directory, + options, + error_queue, + progress_info, + sindex_counter, + hook_counter, + exit_event, +): + signal.signal( + signal.SIGINT, signal.SIG_DFL + ) # prevent signal handlers from being set in child processes + + writer = None + + has_write_hooks = utils_common.check_minimum_version(options, "2.3.7", False) + + try: + # -- get table info + + table_info = options.retryQuery( + "table info: %s.%s" % (db, table), query.db(db).table(table).info() + ) + + # Rather than just the index names, store all index information + table_info["indexes"] = options.retryQuery( + f"table index data {db}.{table}", + 
query.db(db).table(table).index_status(), + run_options={"binary_format": "raw"}, + ) + + sindex_counter.value += len(table_info["indexes"]) + + if has_write_hooks: + table_info["write_hook"] = options.retryQuery( + f"table write hook data {db}.{table}", + query.db(db).table(table).get_write_hook(), + run_options={"binary_format": "raw"}, + ) + + if table_info["write_hook"] is not None: + hook_counter.value += 1 + + with open(os.path.join(directory, db, table + ".info"), "w") as info_file: + info_file.write(json.dumps(table_info) + "\n") + with sindex_counter.get_lock(): + sindex_counter.value += len(table_info["indexes"]) + # -- start the writer + if six.PY3: + ctx = multiprocessing.get_context(multiprocessing.get_start_method()) + task_queue = SimpleQueue(ctx=ctx) + else: + task_queue = SimpleQueue() + + writer = None + if options.format == "json": + filename = directory + f"/{db}/{table}.json" % (db, table) + writer = multiprocessing.Process( + target=json_writer, + args=( + filename, + options.fields, + task_queue, + error_queue, + options.format, + ), + ) + elif options.format == "csv": + filename = directory + f"/{db}/{table}.csv" % (db, table) + writer = multiprocessing.Process( + target=csv_writer, + args=( + filename, + options.fields, + options.delimiter, + task_queue, + error_queue, + ), + ) + elif options.format == "ndjson": + filename = directory + f"/{db}/{table}.ndjson" % (db, table) + writer = multiprocessing.Process( + target=json_writer, + args=( + filename, + options.fields, + task_queue, + error_queue, + options.format, + ), + ) + else: + raise RuntimeError(f"unknown format type: {options.format}") + writer.start() + + # -- read in the data source + + # - + + lastPrimaryKey = None + read_rows = 0 + run_options = {"time_format": "raw", "binary_format": "raw"} + if options.outdated: + run_options["read_mode"] = "outdated" + cursor = options.retryQuery( + f"inital cursor for {db}.{table}", + query.db(db).table(table).order_by(index=table_info["primary_key"]), + run_options=run_options, + ) + while not exit_event.is_set(): + try: + for row in cursor: + # bail on exit + if exit_event.is_set(): + break + + # add to the output queue + task_queue.put([row]) + lastPrimaryKey = row[table_info["primary_key"]] + read_rows += 1 + + # Update the progress every 20 rows + if read_rows % 20 == 0: + progress_info[0].value = read_rows + + else: + # Export is done - since we used estimates earlier, update the actual table size + progress_info[0].value = read_rows + progress_info[1].value = read_rows + break + + except (errors.ReqlTimeoutError, errors.ReqlDriverError): + # connection problem, re-setup the cursor + try: + cursor.close() + except errors.ReqlError as exc: + default_logger.exception(exc) + + cursor = options.retryQuery( + f"backup cursor for {db}.{table}", + query.db(db) + .table(table) + .between(lastPrimaryKey, query.maxval, left_bound="open") + .order_by(index=table_info["primary_key"]), + run_options=run_options, + ) + + except (errors.ReqlError, errors.ReqlDriverError) as ex: + error_queue.put( + ( + RuntimeError, + RuntimeError(ex.message), + traceback.extract_tb(sys.exc_info()[2]), + ) + ) + except BaseException: + ex_type, ex_class, tb = sys.exc_info() + error_queue.put((ex_type, ex_class, traceback.extract_tb(tb))) + finally: + if writer and writer.is_alive(): + task_queue.put(StopIteration()) + writer.join() + + +def abort_export(signum, frame, exit_event, interrupt_event): + interrupt_event.set() + exit_event.set() + + +# We sum up the row count from all tables for 
total percentage completion +# This is because table exports can be staggered when there are not enough clients +# to export all of them at once. As a result, the progress bar will not necessarily +# move at the same rate for different tables. + + +def update_progress(progress_info, options): + rows_done = 0 + total_rows = 1 + for current, max_count in progress_info: + curr_val = current.value + max_val = max_count.value + if curr_val < 0: + # There is a table that hasn't finished counting yet, we can't report progress + rows_done = 0 + break + else: + rows_done += curr_val + total_rows += max_val + + if not options.quiet: + utils_common.print_progress(float(rows_done) / total_rows, indent=4) + + +def run_clients(options, workingDir, db_table_set): + # Spawn one client for each db.table, up to options.clients at a time + exit_event = multiprocessing.Event() + processes = [] + if six.PY3: + ctx = multiprocessing.get_context(multiprocessing.get_start_method()) + error_queue = SimpleQueue(ctx=ctx) + else: + error_queue = SimpleQueue() + interrupt_event = multiprocessing.Event() + sindex_counter = multiprocessing.Value(ctypes.c_longlong, 0) + hook_counter = multiprocessing.Value(ctypes.c_longlong, 0) + + signal.signal( + signal.SIGINT, lambda a, b: abort_export(a, b, exit_event, interrupt_event) + ) + errors = [] + + try: + progress_info = [] + arg_lists = [] + for db, table in db_table_set: + + tableSize = int( + options.retryQuery( + "count", + query.db(db).table(table).info()["doc_count_estimates"].sum(), + ) + ) + + progress_info.append( + ( + multiprocessing.Value(ctypes.c_longlong, 0), + multiprocessing.Value(ctypes.c_longlong, tableSize), + ) + ) + arg_lists.append( + ( + db, + table, + workingDir, + options, + error_queue, + progress_info[-1], + sindex_counter, + hook_counter, + exit_event, + ) + ) + + # Wait for all tables to finish + while processes or arg_lists: + time.sleep(0.1) + + while not error_queue.empty(): + exit_event.set() # Stop immediately if an error occurs + errors.append(error_queue.get()) + + processes = [process for process in processes if process.is_alive()] + + if len(processes) < options.clients and len(arg_lists) > 0: + new_process = multiprocessing.Process( + target=export_table, args=arg_lists.pop(0) + ) + new_process.start() + processes.append(new_process) + + update_progress(progress_info, options) + + # If we were successful, make sure 100% progress is reported + # (rows could have been deleted which would result in being done at less than 100%) + if len(errors) == 0 and not interrupt_event.is_set() and not options.quiet: + utils_common.print_progress(1.0, indent=4) + + # Continue past the progress output line and print total rows processed + def plural(num, text, plural_text): + return "%d %s" % (num, text if num == 1 else plural_text) + + if not options.quiet: + print( + f'''\n {plural( + sum([max(0, info[0].value) for info in progress_info]), + "row", + "rows", + )} exported from { + plural(len(db_table_set), "table", "tables") + }, with { + plural( + sindex_counter.value, "secondary index", "secondary indexes" + ) + }, and { + plural(hook_counter.value, "hook function", "hook functions") + } + ''' + ) + finally: + signal.signal(signal.SIGINT, signal.SIG_DFL) + + if interrupt_event.is_set(): + raise RuntimeError("Interrupted") + + if len(errors) != 0: + # multiprocessing queues don't handle tracebacks, so they've already been stringified in the queue + for error in errors: + print(f"{error[1]}", file=sys.stderr) + if options.debug: + print( + 
f"{error[0].__name__} traceback: {error[2]}", file=sys.stderr + ) + raise RuntimeError("Errors occurred during export") + + +def run(options): + # Make sure this isn't a pre-`reql_admin` cluster - which could result in data loss + # if the user has a database named 'rethinkdb' + utils_common.check_minimum_version(options, "1.6") + + # get the complete list of tables + db_table_set = set() + all_tables = [ + utils_common.DbTable(x["db"], x["name"]) + for x in options.retryQuery( + "list tables", + query.db("rethinkdb").table("table_config").pluck(["db", "name"]), + ) + ] + if not options.db_tables: + db_table_set = all_tables # default to all tables + else: + all_databases = options.retryQuery( + "list dbs", query.db_list().filter(query.row.ne("rethinkdb")) + ) + for db_table in options.db_tables: + db, table = db_table + + if db == "rethinkdb": + raise AssertionError("Can not export tables from the system database") + + if db not in all_databases: + raise RuntimeError(f"Error: Database '{db}' not found") + + if ( + table is None + ): # This is just a db name, implicitly selecting all tables in that db + db_table_set.update(set([x for x in all_tables if x.db == db])) + else: + if utils_common.DbTable(db, table) not in all_tables: + raise RuntimeError(f"Error: Table not found: '{db}.{table}'" % (db, table)) + db_table_set.add(db_table) + + # Determine the actual number of client processes we'll have + options.clients = min(options.clients, len(db_table_set)) + + # create the working directory and its structure + parent_dir = os.path.dirname(options.directory) + if not os.path.exists(parent_dir): + if os.path.isdir(parent_dir): + raise RuntimeError( + f"Output parent directory is not a directory: {parent_dir}" + ) + try: + os.makedirs(parent_dir) + except OSError as e: + raise optparse.OptionValueError( + f"Unable to create parent directory for {parent_dir}: {e.strerror}" + ) + working_dir = tempfile.mkdtemp( + prefix=os.path.basename(options.directory) + "_partial_", + dir=os.path.dirname(options.directory), + ) + try: + for db in set([database for database, _ in db_table_set]): + os.makedirs(os.path.join(working_dir, str(db))) + except OSError as e: + raise RuntimeError( + f"Failed to create temporary directory ({e.filename}): {e.strerror}" + ) + + # Run the export + run_clients(options, working_dir, db_table_set) + + # Move the temporary directory structure over to the original output directory + try: + if os.path.isdir(options.directory): + os.rmdir( + options.directory + ) # an empty directory is created here when using _dump + elif os.path.exists(options.directory): + raise Exception( + f"There was a file at the output location: {options.directory}" + ) + os.rename(working_dir, options.directory) + except OSError as e: + raise RuntimeError( + f"Failed to move temporary directory to output directory ({options.directory}): {e.strerror}" + ) @click.command def cmd_export(): @@ -29,3 +652,18 @@ def cmd_export(): Export data from a RethinkDB cluster into a directory. 
""" click.echo("export command") + argv = [] + prog = [] + options = parse_options(argv or sys.argv[1:], prog=prog) + + start_time = time.time() + try: + run(options) + except Exception as ex: + if options.debug: + traceback.print_exc() + print(ex, file=sys.stderr) + return 1 + if not options.quiet: + print(f" Done ({time.time() - start_time}:.2f seconds)") + return 0 From c3367dafa461e6a3f2db61c993a26652322c8f53 Mon Sep 17 00:00:00 2001 From: lsabi1234 Date: Thu, 14 Jul 2022 22:19:55 +0200 Subject: [PATCH 03/29] :construction: Fix some import errors --- rethinkdb/cli/_dump.py | 8 ++++---- rethinkdb/cli/_export.py | 5 +++-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/rethinkdb/cli/_dump.py b/rethinkdb/cli/_dump.py index 557b3cb..fb20399 100644 --- a/rethinkdb/cli/_dump.py +++ b/rethinkdb/cli/_dump.py @@ -32,8 +32,8 @@ import time import traceback -from rethinkdb import _export, utils_common -from rethinkdb.logger import default_logger +from rethinkdb import cmd_export, utils_common +# from rethinkdb.logger import default_logger usage = ( @@ -202,9 +202,9 @@ def cmd_dump(): print(" Exporting to temporary directory...") try: - _export.run(options) + cmd_export.run(options) except Exception as exc: - default_logger.exception(exc) + # default_logger.exception(exc) if options.debug: sys.stderr.write(f"\n{traceback.format_exc()}\n") diff --git a/rethinkdb/cli/_export.py b/rethinkdb/cli/_export.py index 7b0d890..359b496 100644 --- a/rethinkdb/cli/_export.py +++ b/rethinkdb/cli/_export.py @@ -39,7 +39,7 @@ from multiprocessing.queues import SimpleQueue from rethinkdb import errors, query, utils_common -from rethinkdb.logger import default_logger +# from rethinkdb.logger import default_logger # STATIC INFORMATION ABOUT THE EXOIRT FEATURE usage = """rethinkdb export [-c HOST:PORT] [-p] [--password-file FILENAME] [--tls-cert filename] [-d DIR] @@ -392,7 +392,8 @@ def export_table( try: cursor.close() except errors.ReqlError as exc: - default_logger.exception(exc) + # default_logger.exception(exc) + pass cursor = options.retryQuery( f"backup cursor for {db}.{table}", From 6aea50b20818833b788b3cdf63d598ca8b078c11 Mon Sep 17 00:00:00 2001 From: lsabi1234 Date: Thu, 14 Jul 2022 23:11:11 +0200 Subject: [PATCH 04/29] :construction: Copy import and fix f-strings --- rethinkdb/cli/_export.py | 2 +- rethinkdb/cli/_import.py | 1711 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 1712 insertions(+), 1 deletion(-) diff --git a/rethinkdb/cli/_export.py b/rethinkdb/cli/_export.py index 359b496..481946b 100644 --- a/rethinkdb/cli/_export.py +++ b/rethinkdb/cli/_export.py @@ -597,7 +597,7 @@ def run(options): db_table_set.update(set([x for x in all_tables if x.db == db])) else: if utils_common.DbTable(db, table) not in all_tables: - raise RuntimeError(f"Error: Table not found: '{db}.{table}'" % (db, table)) + raise RuntimeError(f"Error: Table not found: '{db}.{table}'") db_table_set.add(db_table) # Determine the actual number of client processes we'll have diff --git a/rethinkdb/cli/_import.py b/rethinkdb/cli/_import.py index 582e3fc..2624e15 100644 --- a/rethinkdb/cli/_import.py +++ b/rethinkdb/cli/_import.py @@ -22,6 +22,1698 @@ """ import click +import codecs +import collections +import csv +import ctypes +import json +import multiprocessing +import optparse +import os +import signal +import sys +import time +import traceback +from multiprocessing.queues import Queue, SimpleQueue + +import six + +from rethinkdb import ast, errors, query, utils_common +# from rethinkdb.logger import 
default_logger + +try: + unicode +except NameError: + unicode = str + +try: + from Queue import Empty, Full +except ImportError: + from queue import Empty, Full + +# json parameters +JSON_READ_CHUNK_SIZE = 128 * 1024 +JSON_MAX_BUFFER_SIZE = 128 * 1024 * 1024 +MAX_NESTING_DEPTH = 100 + +Error = collections.namedtuple("Error", ["message", "traceback", "file"]) + +NEW_LINE = "\n" + +class SourceFile(object): + format = None # set by subclasses + + name = None + + db = None + table = None + primary_key = None + indexes = None + write_hook = None + source_options = None + + start_time = None + end_time = None + + query_runner = None + + _source = None # open filehandle for the source + + # - internal synchronization variables + + _bytes_size = None + _bytes_read = None # -1 until started + + _total_rows = None # -1 until known + _rows_read = None + _rows_written = None + + def __init__( + self, + source, + db, + table, + query_runner, + primary_key=None, + indexes=None, + write_hook=None, + source_options=None, + ): + + if self.format is None: + raise AssertionError( + "{class_name} must have a format".format( + class_name=self.__class__.__name__ + ) + ) + + if self.db == "rethinkdb": + raise AssertionError("Can not import tables into the system database") + + # query_runner + if not isinstance(query_runner, utils_common.RetryQuery): + raise AssertionError("Query runner is not instance of RetryQuery") + + self.query_runner = query_runner + + # reporting information + self._bytes_size = multiprocessing.Value(ctypes.c_longlong, -1) + self._bytes_read = multiprocessing.Value(ctypes.c_longlong, -1) + + self._total_rows = multiprocessing.Value(ctypes.c_longlong, -1) + self._rows_read = multiprocessing.Value(ctypes.c_longlong, 0) + self._rows_written = multiprocessing.Value(ctypes.c_longlong, 0) + + # source + if hasattr(source, "read"): + if unicode != str or "b" in source.mode: + # Python2.x or binary file, assume utf-8 encoding + self._source = codecs.getreader("utf-8")(source) + else: + # assume that it has the right encoding on it + self._source = source + else: + try: + self._source = codecs.open(source, mode="r", encoding="utf-8") + except IOError as exc: + default_logger.exception(exc) + raise ValueError( + f'Unable to open source file "{str(source)}": ' \ + f'{str(exc)}' + ) + + if ( + hasattr(self._source, "name") + and self._source.name + and os.path.isfile(self._source.name) + ): + self._bytes_size.value = os.path.getsize(source) + if self._bytes_size.value == 0: + raise ValueError(f"Source is zero-length: {source}") + + # table info + self.db = db + self.table = table + self.primary_key = primary_key + self.indexes = indexes or [] + self.write_hook = write_hook or [] + + # options + self.source_options = source_options or { + "create_args": {"primary_key": self.primary_key} + } + + # name + if hasattr(self._source, "name") and self._source.name: + self.name = os.path.basename(self._source.name) + else: + self.name = f"{self.db}.{self.table}" + + def __hash__(self): + return hash((self.db, self.table)) + + def get_line(self): + """Returns a single line from the file""" + raise NotImplementedError( + f"This needs to be implemented on the {self.format} subclass" + ) + + # - bytes + @property + def bytes_size(self): + return self._bytes_size.value + + @bytes_size.setter + def bytes_size(self, value): + self._bytes_size.value = value + + @property + def bytes_read(self): + return self._bytes_read.value + + @bytes_read.setter + def bytes_read(self, value): + self._bytes_read.value = value + + 
# - rows + @property + def total_rows(self): + return self._total_rows.value + + @total_rows.setter + def total_rows(self, value): + self._total_rows.value = value + + @property + def rows_read(self): + return self._rows_read.value + + @rows_read.setter + def rows_read(self, value): + self._rows_read.value = value + + @property + def rows_written(self): + return self._rows_written.value + + def add_rows_written(self, increment): # we have multiple writers to coordinate + with self._rows_written.get_lock(): + self._rows_written.value += increment + + # - percent done + @property + def percent_done(self): + """return a float between 0 and 1 for a reasonable guess of percentage complete""" + # assume that reading takes 50% of the time and writing the other 50% + completed = 0.0 # of 2.0 + + # - add read percentage + if ( + self._bytes_size.value <= 0 + or self._bytes_size.value <= self._bytes_read.value + ): + completed += 1.0 + elif self._bytes_read.value < 0 and self._total_rows.value >= 0: + # done by rows read + if self._rows_read > 0: + completed += float(self._rows_read) / float(self._total_rows.value) + else: + # done by bytes read + if self._bytes_read.value > 0: + completed += float(self._bytes_read.value) / float( + self._bytes_size.value + ) + + # - add written percentage + if self._rows_read.value or self._rows_written.value: + total_rows = float(self._total_rows.value) + if total_rows == 0: + completed += 1.0 + elif total_rows < 0: + # a guesstimate + per_row_size = float(self._bytes_read.value) / float( + self._rows_read.value + ) + total_rows = float(self._rows_read.value) + ( + float(self._bytes_size.value - self._bytes_read.value) + / per_row_size + ) + completed += float(self._rows_written.value) / total_rows + else: + # accurate count + completed += float(self._rows_written.value) / total_rows + + # - return the value + return completed * 0.5 + + def setup_table(self): + """Ensure that the db, table, and indexes exist and are correct""" + + # - ensure the table exists and is ready + self.query_runner( + f"create table: {self.db}.{self.table}", + ast.expr([self.table]) + .set_difference(query.db(self.db).table_list()) + .for_each( + query.db(self.db).table_create( + query.row, + **self.source_options["create_args"] + if "create_args" in self.source_options + else {} + ) + ), + ) + + self.query_runner( + f"wait for {self.db}.{self.table}", + query.db(self.db).table(self.table).wait(timeout=30), + ) + + # - ensure that the primary key on the table is correct + primary_key = self.query_runner( + f"primary key {self.db}.{self.table}", + query.db(self.db).table(self.table).info()["primary_key"], + ) + if self.primary_key is None: + self.primary_key = primary_key + elif primary_key != self.primary_key: + raise RuntimeError( + f"Error: table {self.db}.{self.table} primary key was " \ + f"`{primary_key}` rather than the expected: {self.primary_key}" + ) + + def restore_indexes(self, warning_queue): + # recreate secondary indexes - dropping existing on the assumption they are wrong + if self.indexes: + existing_indexes = self.query_runner( + f"indexes from: {self.db}.{self.table}", + query.db(self.db).table(self.table).index_list(), + ) + try: + created_indexes = [] + for index in self.indexes: + if index["index"] in existing_indexes: # drop existing versions + self.query_runner( + f"drop index: {self.db}.{self.table}:{index['index']}" + query.db(self.db) + .table(self.table) + .index_drop(index["index"]), + ) + self.query_runner( + f"create index: 
{self.db}.{self.table}:{index['index']}" + query.db(self.db) + .table(self.table) + .index_create(index["index"], index["function"]), + ) + created_indexes.append(index["index"]) + + # wait for all of the created indexes to build + self.query_runner( + f"waiting for indexes on {self.db}.{self.table}", + query.db(self.db) + .table(self.table) + .index_wait(query.args(created_indexes)), + ) + except RuntimeError: + exception_type, exception_class, trcback = sys.exc_info() + warning_queue.put( + ( + exception_type, + exception_class, + traceback.extract_tb(trcback), + self._source.name, + ) + ) + + if self.write_hook: + self.query_runner( + f"Write hook from: {self.db}.{self.table}", + query.db(self.db).table(self.table).get_write_hook(), + ) + try: + self.query_runner( + f"drop hook: {self.db}.{self.table}", + query.db(self.db).table(self.table).set_write_hook(None), + ) + self.query_runner( + f"create hook: {self.db}.{self.table}:{self.write_hook}", + query.db(self.db) + .table(self.table) + .set_write_hook(self.write_hook["function"]), + ) + except RuntimeError: + exception_type, exception_class, trcback = sys.exc_info() + warning_queue.put( + ( + exception_type, + exception_class, + traceback.extract_tb(trcback), + self._source.name, + ) + ) + + def batches(self, batch_size=None, warning_queue=None): + + # setup table + self.setup_table() + + # default batch_size + if batch_size is None: + batch_size = utils_common.default_batch_size + else: + batch_size = int(batch_size) + + if batch_size <= 0: + raise AssertionError("Batch size can not be less than one") + + # setup + self.setup_file(warning_queue=warning_queue) + + # - yield batches + + batch = [] + try: + need_more_data = False + while True: + if need_more_data: + self.fill_buffer() + need_more_data = False + + while len(batch) < batch_size: + try: + row = self.get_line() + # ToDo: validate the line + batch.append(row) + except NeedMoreData: + need_more_data = True + break + else: + yield batch + batch = [] + + except StopIteration as e: + # yield any final batch + if batch: + yield batch + + # - check the end of the file + + self.teardown() + + # - rebuild indexes + if self.indexes: + self.restore_indexes(warning_queue) + + def setup_file(self, warning_queue=None): + raise NotImplementedError("Subclasses need to implement this") + + def teardown(self): + pass + + def read_to_queue( + self, + work_queue, + exit_event, + error_queue, + warning_queue, + timing_queue, + fields=None, + ignore_signals=True, + batch_size=None, + ): + if ( + ignore_signals + ): # ToDo: work out when we are in a worker process automatically + signal.signal(signal.SIGINT, signal.SIG_IGN) # workers should ignore these + + if batch_size is None: + batch_size = utils_common.default_batch_size + + self.start_time = time.time() + try: + timePoint = time.time() + for batch in self.batches(warning_queue=warning_queue): + timing_queue.put(("reader_work", time.time() - timePoint)) + timePoint = time.time() + + # apply the fields filter + if fields: + for row in batch: + for key in [x for x in row.keys() if x not in fields]: + del row[key] + + while not exit_event.is_set(): + try: + work_queue.put((self.db, self.table, batch), timeout=0.1) + self._rows_read.value += len(batch) + break + except Full: + pass + else: + break + timing_queue.put(("reader_wait", time.time() - timePoint)) + timePoint = time.time() + + # - report relevant errors + except Exception as exc: + default_logger.exception(exc) + error_queue.put(Error(str(exc), traceback.format_exc(), self.name)) + 
exit_event.set() + raise + finally: + self.end_time = time.time() + + +class NeedMoreData(Exception): + pass + + +class JsonSourceFile(SourceFile): + format = "json" + + decoder = json.JSONDecoder() + json_array = None + found_first = False + + _buffer_size = JSON_READ_CHUNK_SIZE + _buffer_str = None + _buffer_pos = None + _buffer_end = None + + def fill_buffer(self): + if self._buffer_str is None: + self._buffer_str = "" + self._buffer_pos = 0 + self._buffer_end = 0 + elif self._buffer_pos == 0: + # double the buffer under the assumption that the documents are too large to fit + if self._buffer_size == JSON_MAX_BUFFER_SIZE: + raise Exception( + f"Error: JSON max buffer size exceeded on file " \ + f"{self.name} (from position {self.bytes_processed}). " \ + f"Use '--max-document-size' to extend your buffer." + ) + self._buffer_size = min(self._buffer_size * 2, JSON_MAX_BUFFER_SIZE) + + # add more data + read_target = self._buffer_size - self._buffer_end + self._buffer_pos + + if read_target < 1: + raise AssertionError("Can not set the read target and full the buffer") + + new_chunk = self._source.read(read_target) + + if len(new_chunk) == 0: + raise StopIteration() # file ended + + self._buffer_str = self._buffer_str[self._buffer_pos :] + new_chunk + self._bytes_read.value += len(new_chunk) + + # reset markers + self._buffer_pos = 0 + self._buffer_end = len(self._buffer_str) - 1 + + def get_line(self): + """Return a line from the current _buffer_str, or raise NeedMoreData trying""" + + # advance over any whitespace + self._buffer_pos = json.decoder.WHITESPACE.match( + self._buffer_str, self._buffer_pos + ).end() + if self._buffer_pos >= self._buffer_end: + raise NeedMoreData() + + # read over a comma if we are not the first item in a json_array + if ( + self.json_array + and self.found_first + and self._buffer_str[self._buffer_pos] == "," + ): + self._buffer_pos += 1 + if self._buffer_pos >= self._buffer_end: + raise NeedMoreData() + + # advance over any post-comma whitespace + self._buffer_pos = json.decoder.WHITESPACE.match( + self._buffer_str, self._buffer_pos + ).end() + if self._buffer_pos >= self._buffer_end: + raise NeedMoreData() + + # parse and return an object + try: + row, self._buffer_pos = self.decoder.raw_decode( + self._buffer_str, idx=self._buffer_pos + ) + self.found_first = True + return row + except (ValueError, IndexError): + raise NeedMoreData() + + def setup_file(self, warning_queue=None): + # - move to the first record + + # advance through any leading whitespace + while True: + self.fill_buffer() + self._buffer_pos = json.decoder.WHITESPACE.match(self._buffer_str, 0).end() + if self._buffer_pos == 0: + break + + # check the first character + try: + if self._buffer_str[0] == "[": + self.json_array = True + self._buffer_pos = 1 + elif self._buffer_str[0] == "{": + self.json_array = False + else: + raise ValueError( + "Error: JSON format not recognized - file does not begin with an object or array" + ) + except IndexError: + raise ValueError("Error: JSON file was empty of content") + + def teardown(self): + + # - check the end of the file + # note: fill_buffer should have guaranteed that we have only the data in the end + + # advance through any leading whitespace + self._buffer_pos = json.decoder.WHITESPACE.match( + self._buffer_str, self._buffer_pos + ).end() + + # check the end of the array if we have it + if self.json_array: + if self._buffer_str[self._buffer_pos] != "]": + snippit = self._buffer_str[self._buffer_pos :] + extra = ( + "" + if len(snippit) <= 100 + 
else f" and {len(snippit) - 100} more characters" + ) + raise ValueError( + f"Error: JSON array did not end cleanly, " \ + f"rather with: <<{snippit[:100]}>>{extra}" + ) + self._buffer_pos += 1 + + # advance through any trailing whitespace + self._buffer_pos = json.decoder.WHITESPACE.match( + self._buffer_str, self._buffer_pos + ).end() + snippit = self._buffer_str[self._buffer_pos :] + if len(snippit) > 0: + extra = ( + "" + if len(snippit) <= 100 + else f" and {len(snippit) - 100} more characters" + ) + raise ValueError( + f"Error: extra data after JSON data: <<{snippit[:100]}>>" \ + f"{extra}" + ) + + +class CsvSourceFile(SourceFile): + format = "csv" + + no_header_row = False + custom_header = None + + _reader = None # instance of csv.reader + _columns = None # name of the columns + + def __init__(self, *args, **kwargs): + if "source_options" in kwargs and isinstance(kwargs["source_options"], dict): + if "no_header_row" in kwargs["source_options"]: + self.no_header_row = kwargs["source_options"]["no_header_row"] + if "custom_header" in kwargs["source_options"]: + self.custom_header = kwargs["source_options"]["custom_header"] + + super(CsvSourceFile, self).__init__(*args, **kwargs) + + def byte_counter(self): + """Generator for getting a byte count on a file being used""" + + for line in self._source: + self._bytes_read.value += len(line) + if unicode != str: + yield line.encode( + "utf-8" + ) # Python2.x csv module does not really handle unicode + else: + yield line + + def setup_file(self, warning_queue=None): + # - setup csv.reader with a byte counter wrapper + + self._reader = csv.reader(self.byte_counter()) + + # - get the header information for column names + + if not self.no_header_row: + self._columns = next(self._reader) + + # field names may override fields from the header + if self.custom_header is not None: + if not self.no_header_row: + warning_queue.put( + f"Ignoring header row on {self.name}: " \ + f"{str(self._columns)}" + ) + self._columns = self.custom_header + elif self.no_header_row: + raise ValueError("Error: No field name information available") + + def get_line(self): + raw_row = next(self._reader) + if len(self._columns) != len(raw_row): + raise Exception( + f"Error: '{self.name}' line {self._reader.line_num} " \ + f"has an inconsistent number of columns: {str(raw_row)}" + ) + + row = {} + for key, value in zip( + self._columns, raw_row + ): # note: we import all csv fields as strings + # treat empty fields as no entry rather than empty string + if value == "": + continue + row[key] = value if str == unicode else unicode(value, encoding="utf-8") + + return row + + +# == + + +usage = """rethinkdb import -d DIR [-c HOST:PORT] [--tls-cert FILENAME] [-p] [--password-file FILENAME] + [--force] [-i (DB | DB.TABLE)] [--clients NUM] + [--shards NUM_SHARDS] [--replicas NUM_REPLICAS] + rethinkdb import -f FILE --table DB.TABLE [-c HOST:PORT] [--tls-cert FILENAME] [-p] [--password-file FILENAME] + [--force] [--clients NUM] [--format (csv | json)] [--pkey PRIMARY_KEY] + [--shards NUM_SHARDS] [--replicas NUM_REPLICAS] + [--delimiter CHARACTER] [--custom-header FIELD,FIELD... [--no-header]]""" + +help_epilog = """ +EXAMPLES: + +rethinkdb import -d rdb_export -c mnemosyne:39500 --clients 128 + Import data into a cluster running on host 'mnemosyne' with a client port at 39500, + using 128 client connections and the named export directory. 
+ +rethinkdb import -f site_history.csv --format csv --table test.history --pkey count + Import data into a local cluster and the table 'history' in the 'test' database, + using the named CSV file, and using the 'count' field as the primary key. + +rethinkdb import -d rdb_export -c hades -p -i test + Import data into a cluster running on host 'hades' which requires a password, + using only the database 'test' from the named export directory. + +rethinkdb import -f subscriber_info.json --fields id,name,hashtag --force + Import data into a local cluster using the named JSON file, and only the fields + 'id', 'name', and 'hashtag', overwriting any existing rows with the same primary key. + +rethinkdb import -f user_data.csv --delimiter ';' --no-header --custom-header id,name,number + Import data into a local cluster using the named CSV file with no header and instead + use the fields 'id', 'name', and 'number', the delimiter is a semicolon (rather than + a comma). +""" + + +def parse_options(argv, prog=None): + parser = utils_common.CommonOptionsParser( + usage=usage, epilog=help_epilog, prog=prog + ) + + parser.add_option( + "--clients", + dest="clients", + metavar="CLIENTS", + default=8, + help="client connections to use (default: 8)", + type="pos_int", + ) + parser.add_option( + "--hard-durability", + dest="durability", + action="store_const", + default="soft", + help="use hard durability writes (slower, uses less memory)", + const="hard", + ) + parser.add_option( + "--force", + dest="force", + action="store_true", + default=False, + help="import even if a table already exists, overwriting duplicate primary keys", + ) + + parser.add_option( + "--batch-size", + dest="batch_size", + default=utils_common.default_batch_size, + help=optparse.SUPPRESS_HELP, + type="pos_int", + ) + + # Replication settings + replication_options_group = optparse.OptionGroup(parser, "Replication Options") + replication_options_group.add_option( + "--shards", + dest="create_args", + metavar="SHARDS", + help="shards to setup on created tables (default: 1)", + type="pos_int", + action="add_key", + ) + replication_options_group.add_option( + "--replicas", + dest="create_args", + metavar="REPLICAS", + help="replicas to setup on created tables (default: 1)", + type="pos_int", + action="add_key", + ) + parser.add_option_group(replication_options_group) + + # Directory import options + dir_import_group = optparse.OptionGroup(parser, "Directory Import Options") + dir_import_group.add_option( + "-d", + "--directory", + dest="directory", + metavar="DIRECTORY", + default=None, + help="directory to import data from", + ) + dir_import_group.add_option( + "-i", + "--import", + dest="db_tables", + metavar="DB|DB.TABLE", + default=[], + help="restore only the given database or table (may be specified multiple times)", + action="append", + type="db_table", + ) + dir_import_group.add_option( + "--no-secondary-indexes", + dest="indexes", + action="store_false", + default=None, + help="do not create secondary indexes", + ) + parser.add_option_group(dir_import_group) + + # File import options + file_import_group = optparse.OptionGroup(parser, "File Import Options") + file_import_group.add_option( + "-f", + "--file", + dest="file", + metavar="FILE", + default=None, + help="file to import data from", + type="file", + ) + file_import_group.add_option( + "--table", + dest="import_table", + metavar="DB.TABLE", + default=None, + help="table to import the data into", + ) + file_import_group.add_option( + "--fields", + dest="fields", + 
metavar="FIELD,...", + default=None, + help="limit which fields to use when importing one table", + ) + file_import_group.add_option( + "--format", + dest="format", + metavar="json|csv", + default=None, + help="format of the file (default: json, accepts newline delimited json)", + type="choice", + choices=["json", "csv"], + ) + file_import_group.add_option( + "--pkey", + dest="create_args", + metavar="PRIMARY_KEY", + default=None, + help="field to use as the primary key in the table", + action="add_key", + ) + parser.add_option_group(file_import_group) + + # CSV import options + csv_import_group = optparse.OptionGroup(parser, "CSV Options") + csv_import_group.add_option( + "--delimiter", + dest="delimiter", + metavar="CHARACTER", + default=None, + help="character separating fields, or '\\t' for tab", + ) + csv_import_group.add_option( + "--no-header", + dest="no_header", + action="store_true", + default=None, + help="do not read in a header of field names", + ) + csv_import_group.add_option( + "--custom-header", + dest="custom_header", + metavar="FIELD,...", + default=None, + help="header to use (overriding file header), must be specified if --no-header", + ) + parser.add_option_group(csv_import_group) + + # JSON import options + json_options_group = optparse.OptionGroup(parser, "JSON Options") + json_options_group.add_option( + "--max-document-size", + dest="max_document_size", + metavar="MAX_SIZE", + default=0, + help="maximum allowed size (bytes) for a single JSON document (default: 128MiB)", + type="pos_int", + ) + json_options_group.add_option( + "--max-nesting-depth", + dest="max_nesting_depth", + metavar="MAX_DEPTH", + default=0, + help="maximum depth of the JSON documents (default: 100)", + type="pos_int", + ) + parser.add_option_group(json_options_group) + + options, args = parser.parse_args(argv) + + # Check validity of arguments + + if len(args) != 0: + raise parser.error( + f"No positional arguments supported. 
" \ + f"Unrecognized option(s): {args}" + ) + + # - create_args + if options.create_args is None: + options.create_args = {} + + # - options based on file/directory import + + if options.directory and options.file: + parser.error("-f/--file and -d/--directory can not be used together") + + elif options.directory: + if not os.path.exists(options.directory): + parser.error( + f"-d/--directory does not exist: {options.directory}" + ) + if not os.path.isdir(options.directory): + parser.error( + f"-d/--directory is not a directory: {options.directory}" + ) + options.directory = os.path.realpath(options.directory) + + # disallow invalid options + if options.import_table: + parser.error("--table option is not valid when importing a directory") + if options.fields: + parser.error("--fields option is not valid when importing a directory") + if options.format: + parser.error("--format option is not valid when importing a directory") + if options.create_args: + parser.error("--pkey option is not valid when importing a directory") + + if options.delimiter: + parser.error("--delimiter option is not valid when importing a directory") + if options.no_header: + parser.error("--no-header option is not valid when importing a directory") + if options.custom_header: + parser.error( + f"table create options are not valid when importing " \ + f"a directory: " \ + f"{', '.join( + [x.lower().replace('_', ' ') for x in options.custom_header.keys()] + )} + ) + + # check valid options + if not os.path.isdir(options.directory): + parser.error( + f"Directory to import does not exist: {options.directory}" + ) + + if options.fields and ( + len(options.db_tables) > 1 or options.db_tables[0].table is None + ): + parser.error( + "--fields option can only be used when importing a single table" + ) + + elif options.file: + if not os.path.exists(options.file): + parser.error("-f/--file does not exist: %s" % options.file) + if not os.path.isfile(options.file): + parser.error("-f/--file is not a file: %s" % options.file) + options.file = os.path.realpath(options.file) + + # format + if options.format is None: + options.format = os.path.splitext(options.file)[1].lstrip(".") + + # import_table + if options.import_table: + res = utils_common._tableNameRegex.match(options.import_table) + if res and res.group("table"): + options.import_table = utils_common.DbTable( + res.group("db"), res.group("table") + ) + else: + parser.error( + f"Invalid --table option: {options.import_table}" + ) + else: + parser.error("A value is required for --table when importing from a file") + + # fields + options.fields = options.fields.split(",") if options.fields else None + + # disallow invalid options + if options.db_tables: + parser.error("-i/--import can only be used when importing a directory") + if options.indexes: + parser.error( + "--no-secondary-indexes can only be used when importing a directory" + ) + + if options.format == "csv": + # disallow invalid options + if options.max_document_size: + parser.error( + "--max_document_size only affects importing JSON documents" + ) + + # delimiter + if options.delimiter is None: + options.delimiter = "," + elif options.delimiter == "\\t": + options.delimiter = "\t" + elif len(options.delimiter) != 1: + parser.error( + f"Specify exactly one character for the --delimiter " \ + f"option: {options.delimiter}" + ) + + # no_header + if options.no_header is None: + options.no_header = False + elif options.custom_header is None: + parser.error("--custom-header is required if --no-header is specified") + + # 
custom_header + if options.custom_header: + options.custom_header = options.custom_header.split(",") + + elif options.format == "json": + # disallow invalid options + if options.delimiter is not None: + parser.error("--delimiter option is not valid for json files") + if options.no_header: + parser.error("--no-header option is not valid for json files") + if options.custom_header is not None: + parser.error("--custom-header option is not valid for json files") + + # default options + options.format = "json" + + if options.max_document_size > 0: + global JSON_MAX_BUFFER_SIZE + JSON_MAX_BUFFER_SIZE = options.max_document_size + + options.file = os.path.abspath(options.file) + + else: + parser.error(f"Unrecognized file format: {options.format}") + + else: + parser.error("Either -f/--file or -d/--directory is required") + + # -- + + # max_nesting_depth + if options.max_nesting_depth > 0: + global MAX_NESTING_DEPTH + MAX_NESTING_DEPTH = options.max_nesting_depth + + # -- + + return options + + +# This is run for each client requested, and accepts tasks from the reader processes + + +def table_writer( + tables, options, work_queue, error_queue, warning_queue, exit_event, timing_queue +): + signal.signal(signal.SIGINT, signal.SIG_IGN) # workers should ignore these + db = table = batch = None + + try: + conflict_action = "replace" if options.force else "error" + timePoint = time.time() + while not exit_event.is_set(): + # get a batch + try: + db, table, batch = work_queue.get(timeout=0.1) + except Empty: + continue + timing_queue.put(("writer_wait", time.time() - timePoint)) + timePoint = time.time() + + # shut down when appropriate + if isinstance(batch, StopIteration): + return + + # find the table we are working on + table_info = tables[(db, table)] + tbl = query.db(db).table(table) + + # write the batch to the database + try: + res = options.retryQuery( + f"write batch to {db}.{table}", + tbl.insert( + ast.expr(batch, nesting_depth=MAX_NESTING_DEPTH), + durability=options.durability, + conflict=conflict_action, + ), + ) + + if res["errors"] > 0: + raise RuntimeError( + f"Error when importing into table " \ + f"'{db}.{table}': {res['first_error']}" + ) + modified = res["inserted"] + res["replaced"] + res["unchanged"] + if modified != len(batch): + raise RuntimeError( + f"The inserted/replaced/unchanged number did not " \ + f"match when importing into table " \ + f"'{db}.{table}': {res['first_error']}" + ) + + table_info.add_rows_written(modified) + + except errors.ReqlError: + # the error might have been caused by a comm or temporary error causing a partial batch write + + for row in batch: + if table_info.primary_key not in row: + raise RuntimeError( + f"Connection error while importing. 
" \ + f"Current row does not have the specified " \ + f"primary key ({table_info.primary_key}), " \ + f"so cannot guarantee absence of duplicates" + ) + res = None + if conflict_action == "replace": + res = options.retryQuery( + f"write row to {db}.{table}", + tbl.insert( + ast.expr(row, nesting_depth=MAX_NESTING_DEPTH), + durability=options.durability, + conflict=conflict_action, + ignore_write_hook=True, + ), + ) + else: + existingRow = options.retryQuery( + f"read row from {db}.{table}", + tbl.get(row[table_info.primary_key]), + ) + if not existingRow: + res = options.retryQuery( + f"write row to {db}.{table}", + tbl.insert( + ast.expr(row, nesting_depth=MAX_NESTING_DEPTH), + durability=options.durability, + conflict=conflict_action, + ignore_write_hook=True, + ), + ) + elif existingRow != row: + raise RuntimeError( + f"Duplicate primary key `{table_info.primary_key}`:" \ + f"{NEW_LINE}{str(row)}{NEW_LINE}{str(existingRow)}" + ) + + if res["errors"] > 0: + raise RuntimeError( + f"Error when importing into table " \ + f"'{db}.{table}': {res['first_error']}" + ) + if res["inserted"] + res["replaced"] + res["unchanged"] != 1: + raise RuntimeError( + f"The inserted/replaced/unchanged number was " \ + f"not 1 when inserting on " \ + f"'{db}.{table}': {res}" + ) + table_info.add_rows_written(1) + timing_queue.put(("writer_work", time.time() - timePoint)) + timePoint = time.time() + + except Exception as e: + error_queue.put( + Error(str(e), traceback.format_exc(), f"{db}.{table}") + ) + exit_event.set() + + +def update_progress(tables, debug, exit_event, sleep=0.2): + signal.signal(signal.SIGINT, signal.SIG_IGN) # workers should not get these + + # give weights to each of the tables based on file size + totalSize = sum([x.bytes_size for x in tables]) + for table in tables: + table.weight = float(table.bytes_size) / totalSize + + lastComplete = None + startTime = time.time() + readWrites = collections.deque(maxlen=5) # (time, read, write) + readWrites.append((startTime, 0, 0)) + readRate = None + writeRate = None + while True: + try: + if exit_event.is_set(): + break + complete = read = write = 0 + currentTime = time.time() + for table in tables: + complete += table.percent_done * table.weight + if debug: + read += table.rows_read + write += table.rows_written + readWrites.append((currentTime, read, write)) + if complete != lastComplete: + timeDelta = readWrites[-1][0] - readWrites[0][0] + if debug and len(readWrites) > 1 and timeDelta > 0: + readRate = max( + (readWrites[-1][1] - readWrites[0][1]) / timeDelta, 0 + ) + writeRate = max( + (readWrites[-1][2] - readWrites[0][2]) / timeDelta, 0 + ) + utils_common.print_progress( + complete, indent=2, read=readRate, write=writeRate + ) + lastComplete = complete + time.sleep(sleep) + except KeyboardInterrupt: + break + except Exception as e: + if debug: + print(e) + traceback.print_exc() + + +def import_tables(options, sources, files_ignored=None): + # Make sure this isn't a pre-`reql_admin` cluster - which could result in data loss + # if the user has a database named 'rethinkdb' + utils_common.check_minimum_version(options, "1.6") + + start_time = time.time() + + tables = dict(((x.db, x.table), x) for x in sources) # (db, table) => table + + if six.PY3: + ctx = multiprocessing.get_context(multiprocessing.get_start_method()) + error_queue = SimpleQueue(ctx=ctx) + warning_queue = SimpleQueue(ctx=ctx) + timing_queue = SimpleQueue(ctx=ctx) + else: + error_queue = SimpleQueue() + warning_queue = SimpleQueue() + timing_queue = SimpleQueue() + + 
max_queue_size = options.clients * 3 + work_queue = multiprocessing.Manager().Queue(max_queue_size) + + exit_event = multiprocessing.Event() + interrupt_event = multiprocessing.Event() + + errors = [] + warnings = [] + timing_sums = {} + + pools = [] + progress_bar = None + progress_bar_sleep = 0.2 + + # - setup KeyboardInterupt handler + signal.signal(signal.SIGINT, lambda a, b: utils_common.abort(pools, exit_event)) + + # - queue draining + def drain_queues(): + # error_queue + while not error_queue.empty(): + errors.append(error_queue.get()) + + # warning_queue + while not warning_queue.empty(): + warnings.append(warning_queue.get()) + + # timing_queue + while not timing_queue.empty(): + key, value = timing_queue.get() + if key not in timing_sums: + timing_sums[key] = value + else: + timing_sums[key] += value + + # - setup dbs and tables + + # create missing dbs + needed_dbs = set([x.db for x in sources]) + if "rethinkdb" in needed_dbs: + raise RuntimeError( + "Error: Cannot import tables into the system database: 'rethinkdb'" + ) + options.retryQuery( + f"ensure dbs: {', '.join(needed_dbs)}", + ast.expr(needed_dbs) + .set_difference(query.db_list()) + .for_each(query.db_create(query.row)), + ) + + # check for existing tables, or if --force is enabled ones with mis-matched primary keys + existing_tables = dict( + [ + ((x["db"], x["name"]), x["primary_key"]) + for x in options.retryQuery( + "list tables", + query.db("rethinkdb") + .table("table_config") + .pluck(["db", "name", "primary_key"]), + ) + ] + ) + already_exist = [] + for source in sources: + if (source.db, source.table) in existing_tables: + if not options.force: + already_exist.append(f"{source.db}.{source.table}") + elif source.primary_key is None: + source.primary_key = existing_tables[(source.db, source.table)] + elif source.primary_key != existing_tables[(source.db, source.table)]: + raise RuntimeError( + f"Error: Table '{source.db}.{source.table}' " \ + f"already exists with a different primary key: " \ + f"{existing_tables[(source.db, source.table)]} "\ + f"(expected: {source.primary_key})" + ) + + if len(already_exist) == 1: + raise RuntimeError( + f"Error: Table '{already_exist[0]}' already exists, run with" \ + f" --force to import into the existing table" + ) + elif len(already_exist) > 1: + already_exist.sort() + raise RuntimeError( + f"Error: The following tables already exist, " \ + f"run with --force to import into the existing tables:" \ + f"{NEW_LINE} " \ + f"{(NEW_LINE + ' ').join(already_exist)}" + ) + + # - start the import + + try: + # - start the progress bar + if not options.quiet: + progress_bar = multiprocessing.Process( + target=update_progress, + name="progress bar", + args=(sources, options.debug, exit_event, progress_bar_sleep), + ) + progress_bar.start() + pools.append([progress_bar]) + + # - start the writers + writers = [] + pools.append(writers) + for i in range(options.clients): + writer = multiprocessing.Process( + target=table_writer, + name=f"table writer {i}", + kwargs={ + "tables": tables, + "options": options, + "work_queue": work_queue, + "error_queue": error_queue, + "warning_queue": warning_queue, + "timing_queue": timing_queue, + "exit_event": exit_event, + }, + ) + writers.append(writer) + writer.start() + + # - read the tables options.clients at a time + readers = [] + pools.append(readers) + file_iter = iter(sources) + try: + while not exit_event.is_set(): + # add a workers to fill up the readers pool + while len(readers) < options.clients: + table = next(file_iter) + reader = 
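# A stdlib-only sketch of the bounded reader pool that import_tables builds around this
# point: at most `clients` reader processes run at once, finished ones are reaped, and
# the next source starts in a freed slot until the source iterator raises StopIteration.
# The per-table work is faked with sleep(); the source names are illustrative.
import multiprocessing
import time


def fake_reader(name):
    time.sleep(0.1)  # stand-in for table.read_to_queue(...)


if __name__ == "__main__":
    sources = ["test.a", "test.b", "test.c", "test.d"]
    clients = 2
    readers = []
    source_iter = iter(sources)
    try:
        while True:
            while len(readers) < clients:
                name = next(source_iter)  # StopIteration ends the outer loop
                proc = multiprocessing.Process(target=fake_reader, args=(name,))
                readers.append(proc)
                proc.start()
            readers = [p for p in readers if p.is_alive()]  # reap completed readers
            time.sleep(0.05)
    except StopIteration:
        pass  # ran out of sources
    for proc in readers:
        proc.join()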
multiprocessing.Process( + target=table.read_to_queue, + name=f"table reader {table.db}.{table.table}", + kwargs={ + "fields": options.fields, + "batch_size": options.batch_size, + "work_queue": work_queue, + "error_queue": error_queue, + "warning_queue": warning_queue, + "timing_queue": timing_queue, + "exit_event": exit_event, + }, + ) + readers.append(reader) + reader.start() + + # drain the queues + drain_queues() + + # reap completed tasks + for reader in readers[:]: + if not reader.is_alive(): + readers.remove(reader) + if len(readers) == options.clients: + time.sleep(0.05) + except StopIteration: + pass # ran out of new tables + + # - wait for the last batch of readers to complete + while readers: + # drain the queues + drain_queues() + + # drain the work queue to prevent readers from stalling on exit + if exit_event.is_set(): + try: + while True: + work_queue.get(timeout=0.1) + except Empty: + pass + + # watch the readers + for reader in readers[:]: + try: + reader.join(0.1) + except Exception as exc: + default_logger.exception(exc) + if not reader.is_alive(): + readers.remove(reader) + + # - append enough StopIterations to signal all writers + for _ in writers: + while True: + if exit_event.is_set(): + break + try: + work_queue.put((None, None, StopIteration()), timeout=0.1) + break + except Full: + pass + + # - wait for all of the writers + for writer in writers[:]: + while writer.is_alive(): + writer.join(0.1) + writers.remove(writer) + + # - stop the progress bar + if progress_bar: + progress_bar.join(progress_bar_sleep * 2) + if not interrupt_event.is_set(): + utils_common.print_progress(1, indent=2) + if progress_bar.is_alive(): + progress_bar.terminate() + + # - drain queues + drain_queues() + + # - final reporting + if not options.quiet: + # if successful, make sure 100% progress is reported + if len(errors) == 0 and not interrupt_event.is_set(): + utils_common.print_progress(1.0, indent=2) + + # advance past the progress bar + print("") + + # report statistics + def plural(num, text): + return f"{num} {text}{'' if num == 1 else 's'}" + + print( + f" {plural(sum(x.rows_written for x in sources), 'row')}" \ + f" imported to {plural(len(sources), 'table')} in " \ + f"{time.time() - start_time}:.2f secs" + ) + + # report debug statistics + if options.debug: + print("Debug timing:") + for key, value in sorted(timing_sums.items(), key=lambda x: x[0]): + print(f" {key}: {value}:.2f") + finally: + signal.signal(signal.SIGINT, signal.SIG_DFL) + + drain_queues() + + for error in errors: + print(f"{error.message}", file=sys.stderr) + if options.debug and error.traceback: + print(f" Traceback:\n{error.traceback}", file=sys.stderr) + if len(error.file) == 4: + print(f" In file: {error.file}", file=sys.stderr) + + for warning in warnings: + print(f"{warning[1]}", file=sys.stderr) + if options.debug: + print( + f"{warning[0].__name__, warning[2]} traceback: {sys.stderr}") + if len(warning) == 4: + print(f"In file: {warning[3], sys.stderr}") + + if interrupt_event.is_set(): + raise RuntimeError("Interrupted") + if errors: + raise RuntimeError("Errors occurred during import") + if warnings: + raise RuntimeError("Warnings occurred during import") + + +def parse_sources(options, files_ignored=None): + def parse_info_file(path): + primary_key = None + indexes = [] + write_hook = None + with open(path, "r") as info_file: + metadata = json.load(info_file) + if "primary_key" in metadata: + primary_key = metadata["primary_key"] + if "indexes" in metadata and options.indexes is not False: + 
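# A hedged example of the per-table ".info" metadata file that parse_info_file above
# reads. The three keys (primary_key, indexes, write_hook) are the ones the code looks
# for; the concrete values are invented, and the file is written to a temporary path so
# the snippet stays self-contained.
import json
import os
import tempfile

info = {
    "primary_key": "id",
    "indexes": [],      # entries would carry each index's name and function
    "write_hook": None,
}

with tempfile.NamedTemporaryFile("w", suffix=".info", delete=False) as handle:
    json.dump(info, handle)
    path = handle.name

with open(path, "r") as info_file:
    metadata = json.load(info_file)
print(metadata.get("primary_key"), metadata.get("indexes"), metadata.get("write_hook"))
os.unlink(path)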
indexes = metadata["indexes"] + if "write_hook" in metadata: + write_hook = metadata["write_hook"] + return primary_key, indexes, write_hook + + has_write_hooks = utils_common.check_minimum_version(options, "2.3.7", False) + + sources = set() + if files_ignored is None: + files_ignored = [] + if options.directory and options.file: + raise RuntimeError( + "Error: Both --directory and --file cannot be specified together" + ) + elif options.file: + db, table = options.import_table + path, ext = os.path.splitext(options.file) + table_type_options = None + if ext == ".json": + table_type = JsonSourceFile + elif ext == ".csv": + table_type = CsvSourceFile + table_type_options = { + "no_header_row": options.no_header, + "custom_header": options.custom_header, + } + else: + raise Exception("The table type is not recognised: %s" % ext) + + # - parse the info file if it exists + primary_key = ( + options.create_args.get("primary_key", None) + if options.create_args + else None + ) + indexes = [] + write_hook = None + info_path = path + ".info" + if (primary_key is None or options.indexes is not False) and os.path.isfile( + info_path + ): + info_primary_key, info_indexes, info_write_hook = parse_info_file(info_path) + if primary_key is None: + primary_key = info_primary_key + if options.indexes is not False: + indexes = info_indexes + if write_hook is None: + write_hook = info_write_hook + if write_hook and not has_write_hooks: + raise Exception("this RDB version doesn't support write-hooks") + + sources.add( + table_type( + source=options.file, + db=db, + table=table, + query_runner=options.retryQuery, + primary_key=primary_key, + indexes=indexes, + write_hook=write_hook, + source_options=table_type_options, + ) + ) + elif options.directory: + # Scan for all files, make sure no duplicated tables with different formats + dbs = False + files_ignored = [] + for root, dirs, files in os.walk(options.directory): + if not dbs: + files_ignored.extend([os.path.join(root, f) for f in files]) + # The first iteration through should be the top-level directory, which contains the db folders + dbs = True + + # don't recurse into folders not matching our filter + db_filter = set([db_table[0] for db_table in options.db_tables or []]) + if db_filter: + for dir_name in dirs[:]: # iterate on a copy + if dir_name not in db_filter: + dirs.remove(dir_name) + else: + if dirs: + files_ignored.extend([os.path.join(root, d) for d in dirs]) + del dirs[:] + + db = os.path.basename(root) + for filename in files: + path = os.path.join(root, filename) + table, ext = os.path.splitext(filename) + table = os.path.basename(table) + + if ext not in [".json", ".csv", ".info"]: + files_ignored.append(os.path.join(root, filename)) + elif ext == ".info": + pass # Info files are included based on the data files + elif not os.path.exists(os.path.join(root, table + ".info")): + files_ignored.append(os.path.join(root, filename)) + else: + # apply db/table filters + if options.db_tables: + for filter_db, filter_table in options.db_tables: + if db == filter_db and filter_table in (None, table): + break # either all tables in this db, or specific pair + else: + files_ignored.append(os.path.join(root, filename)) + continue # not a chosen db/table + + # collect the info + primary_key = None + indexes = [] + write_hook = None + info_path = os.path.join(root, table + ".info") + if not os.path.isfile(info_path): + files_ignored.append(os.path.join(root, filename)) + else: + primary_key, indexes, write_hook = parse_info_file( + info_path + ) + if 
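# A compact illustration of the for/else filter applied during the directory scan
# above: a data file is kept only if some (db, table) filter matches, and the else
# branch runs when nothing matched. Filter values below are examples only.
filters = [("test", None), ("production", "users")]  # table=None selects the whole db


def is_selected(db, table):
    if not filters:
        return True
    for filter_db, filter_table in filters:
        if db == filter_db and filter_table in (None, table):
            break  # whole db selected, or an exact db.table match
    else:
        return False  # no filter matched
    return True


print(is_selected("test", "anything"))      # True: whole 'test' db selected
print(is_selected("production", "users"))   # True: explicit db.table match
print(is_selected("production", "orders"))  # False: 'production' only selected for 'users'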
write_hook and not has_write_hooks: + raise Exception( + "RDB versions below doesn't support write-hooks" + ) + + table_type = None + if ext == ".json": + table_type = JsonSourceFile + elif ext == ".csv": + table_type = CsvSourceFile + else: + raise Exception( + f"The table type is not recognised: {ext}" + ) + source = table_type( + source=path, + query_runner=options.retryQuery, + db=db, + table=table, + primary_key=primary_key, + indexes=indexes, + write_hook=write_hook, + ) + + # ensure we don't have a duplicate + if table in sources: + raise RuntimeError( + f"Error: Duplicate db.table found in directory tree: {source.db}.{source.table}" + ) + + sources.add(source) + + # Warn the user about the files that were ignored + if len(files_ignored) > 0: + print( + "Unexpected files found in the specified directory. Importing a directory expects", + file=sys.stderr, + ) + print( + " a directory from `rethinkdb export`. If you want to import individual tables", + file=sys.stderr, + ) + print( + " import them as single files. The following files were ignored:", + file=sys.stderr, + ) + for ignored_file in files_ignored: + print(f"{str(ignored_file), file=sys.stderr}") + else: + raise RuntimeError("Error: Neither --directory or --file specified") + + return sources @click.command @@ -30,3 +1722,22 @@ def cmd_import(): Import loads data into a RethinkDB cluster. """ click.echo("import command") + argv = [] + prog = [] + start_time = time.time() + + if argv is None: + argv = sys.argv[1:] + options = parse_options(argv, prog=prog) + + try: + sources = parse_sources(options) + import_tables(options, sources) + except RuntimeError as ex: + print(ex, file=sys.stderr) + if str(ex) == "Warnings occurred during import": + return 2 + return 1 + if not options.quiet: + print(f" Done ({time.time() - start_time} seconds)") + return 0 From e6254640d56c5584338967ab09e056f7d7337f0d Mon Sep 17 00:00:00 2001 From: lsabi1234 Date: Sat, 16 Jul 2022 15:07:03 +0200 Subject: [PATCH 05/29] :bug: Fix Syntax errors in cli's import --- rethinkdb/cli/_import.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/rethinkdb/cli/_import.py b/rethinkdb/cli/_import.py index 2624e15..3a18928 100644 --- a/rethinkdb/cli/_import.py +++ b/rethinkdb/cli/_import.py @@ -314,13 +314,13 @@ def restore_indexes(self, warning_queue): for index in self.indexes: if index["index"] in existing_indexes: # drop existing versions self.query_runner( - f"drop index: {self.db}.{self.table}:{index['index']}" + f"drop index: {self.db}.{self.table}:{index['index']}", query.db(self.db) .table(self.table) .index_drop(index["index"]), ) self.query_runner( - f"create index: {self.db}.{self.table}:{index['index']}" + f"create index: {self.db}.{self.table}:{index['index']}", query.db(self.db) .table(self.table) .index_create(index["index"], index["function"]), @@ -967,9 +967,9 @@ def parse_options(argv, prog=None): parser.error( f"table create options are not valid when importing " \ f"a directory: " \ - f"{', '.join( + f"""{', '.join( [x.lower().replace('_', ' ') for x in options.custom_header.keys()] - )} + )}""" ) # check valid options @@ -1709,7 +1709,7 @@ def parse_info_file(path): file=sys.stderr, ) for ignored_file in files_ignored: - print(f"{str(ignored_file), file=sys.stderr}") + print(f"{str(ignored_file)} {sys.stderr}") else: raise RuntimeError("Error: Neither --directory or --file specified") From 44fa648b0767cc3dd77a5b6a73e8be082ad5fada Mon Sep 17 00:00:00 2001 From: lsabi1234 Date: Sat, 16 Jul 2022 15:07:42 +0200 Subject: 
[PATCH 06/29] :construction: Copy and add f-strings to index_rebuild --- rethinkdb/cli/_index_rebuild.py | 246 ++++++++++++++++++++++++++++++++ 1 file changed, 246 insertions(+) diff --git a/rethinkdb/cli/_index_rebuild.py b/rethinkdb/cli/_index_rebuild.py index 8c3bc1e..d90e6a8 100644 --- a/rethinkdb/cli/_index_rebuild.py +++ b/rethinkdb/cli/_index_rebuild.py @@ -25,6 +25,236 @@ """ import click +import sys +import time +import traceback + +from rethinkdb import query, utils_common + +usage = ( + "rethinkdb index-rebuild [-c HOST:PORT] [-n NUM] " + "[-r (DB | DB.TABLE)] [--tls-cert FILENAME] [-p] " + "[--password-file FILENAME]..." +) +help_epilog = """ +FILE: the archive file to restore data from + +EXAMPLES: +rethinkdb index-rebuild -c mnemosyne:39500 + rebuild all outdated secondary indexes from the cluster through the host 'mnemosyne', + one at a time + +rethinkdb index-rebuild -r test -r production.users -n 5 + rebuild all outdated secondary indexes from a local cluster on all tables in the + 'test' database as well as the 'production.users' table, five at a time +""" + +# Prefix used for indexes that are being rebuilt +TMP_INDEX_PREFIX = "$reql_temp_index$_" + + +def parse_options(argv, prog=None): + parser = utils_common.CommonOptionsParser( + usage=usage, epilog=help_epilog, prog=prog + ) + + parser.add_option( + "-r", + "--rebuild", + dest="db_table", + metavar="DB|DB.TABLE", + default=[], + help="databases or tables to rebuild indexes on (default: all, may be specified multiple times)", + action="append", + type="db_table", + ) + parser.add_option( + "-n", + dest="concurrent", + metavar="NUM", + default=1, + help="concurrent indexes to rebuild (default: 1)", + type="pos_int", + ) + parser.add_option( + "--force", + dest="force", + action="store_true", + default=False, + help="rebuild non-outdated indexes", + ) + + options, args = parser.parse_args(argv) + + # Check validity of arguments + if len(args) != 0: + parser.error( + f"Error: No positional arguments supported. 
" + f"Unrecognized option '{args[0]}'" + ) + + return options + + +def rebuild_indexes(options): + + # flesh out options.db_table + if not options.db_table: + options.db_table = [ + utils_common.DbTable(x["db"], x["name"]) + for x in options.retryQuery( + "all tables", + query.db("rethinkdb").table("table_config").pluck(["db", "name"]), + ) + ] + else: + for db_table in options.db_table[:]: # work from a copy + if not db_table[1]: + options.db_table += [ + utils_common.DbTable(db_table[0], x) + for x in options.retryQuery( + f"table list of {db_table[0]}", + query.db(db_table[0]).table_list(), + ) + ] + del options.db_table[db_table] + + # wipe out any indexes with the TMP_INDEX_PREFIX + for db, table in options.db_table: + for index in options.retryQuery( + f"list indexes on {db}.{table}" % (db, table), + query.db(db).table(table).index_list(), + ): + if index.startswith(TMP_INDEX_PREFIX): + options.retryQuery( + f"drop index: {db}.{table}:{index}", + query.db(index["db"]) + .table(index["table"]) + .index_drop(index["name"]), + ) + + # get the list of indexes to rebuild + indexes_to_build = [] + for db, table in options.db_table: + indexes = None + if not options.force: + indexes = options.retryQuery( + f"get outdated indexes from {db}.{table}" % (db, table), + query.db(db) + .table(table) + .index_status() + .filter({"outdated": True}) + .get_field("index"), + ) + else: + indexes = options.retryQuery( + f"get all indexes from {db}.{table}", + query.db(db).table(table).index_status().get_field("index"), + ) + for index in indexes: + indexes_to_build.append({"db": db, "table": table, "name": index}) + + # rebuild selected indexes + + total_indexes = len(indexes_to_build) + indexes_completed = 0 + progress_ratio = 0.0 + highest_progress = 0.0 + indexes_in_progress = [] + + if not options.quiet: + print( + f"Rebuilding {total_indexes} index" + f"{'es' if total_indexes > 1 else ''}: " + f", ".join( + [ + f"`%({i.db})s.%({i.table})s:%({i.name})s`" + for i in indexes_to_build] + ) + ) + + while len(indexes_to_build) > 0 or len(indexes_in_progress) > 0: + # Make sure we're running the right number of concurrent index rebuilds + while ( + len(indexes_to_build) > 0 and len(indexes_in_progress) < options.concurrent + ): + index = indexes_to_build.pop() + indexes_in_progress.append(index) + index["temp_name"] = TMP_INDEX_PREFIX + index["name"] + index["progress"] = 0 + index["ready"] = False + + existing_indexes = dict( + (x["index"], x["function"]) + for x in options.retryQuery( + "existing indexes", + query.db(index["db"]) + .table(index["table"]) + .index_status() + .pluck("index", "function"), + ) + ) + + if index["name"] not in existing_indexes: + raise AssertionError( + f"{index['name']} is not part of existing indexes " + f"{', '.join(existing_indexes)}" + ) + + if index["temp_name"] not in existing_indexes: + options.retryQuery( + "create temp index: %(db)s.%(table)s:%(name)s" % index, + query.db(index["db"]) + .table(index["table"]) + .index_create(index["temp_name"], existing_indexes[index["name"]]), + ) + + # Report progress + highest_progress = max(highest_progress, progress_ratio) + if not options.quiet: + utils_common.print_progress(highest_progress) + + # Check the status of indexes in progress + progress_ratio = 0.0 + for index in indexes_in_progress: + status = options.retryQuery( + f"progress `{index.db}s.{index.table}s` " + f"index `{index.name}s`", + query.db(index["db"]) + .table(index["table"]) + .index_status(index["temp_name"]) + .nth(0), + ) + if status["ready"]: + 
index["ready"] = True + options.retryQuery( + f"rename `{index.db}s.{index.table}s` " + f"index `{index.name}s`", + query.db(index["db"]) + .table(index["table"]) + .index_rename(index["temp_name"], index["name"], overwrite=True), + ) + else: + progress_ratio += status.get("progress", 0) / total_indexes + + indexes_in_progress = [ + index for index in indexes_in_progress if not index["ready"] + ] + indexes_completed = ( + total_indexes - len(indexes_to_build) - len(indexes_in_progress) + ) + progress_ratio += float(indexes_completed) / total_indexes + + if len(indexes_in_progress) == options.concurrent or ( + len(indexes_in_progress) > 0 and len(indexes_to_build) == 0 + ): + # Short sleep to keep from killing the CPU + time.sleep(0.1) + + # Make sure the progress bar says we're done and get past the progress bar line + if not options.quiet: + utils_common.print_progress(1.0) + print("") @click.command @@ -33,3 +263,19 @@ def cmd_index_rebuild(): Rebuild outdated secondary indexes. """ click.echo("index rebuild command") + argv = [] + prog = [] + start_time = time.time() + options = parse_options(argv or sys.argv[1:], prog=prog) + + try: + rebuild_indexes(options) + except Exception as ex: + if options.debug: + traceback.print_exc() + if not options.quiet: + print(ex, file=sys.stderr) + return 1 + if not options.quiet: + print(f"Done ({time.time() - start_time} seconds)") + return 0 From 3b479a31fae75f144539a886870ab9a872aef458 Mon Sep 17 00:00:00 2001 From: lsabi1234 Date: Sat, 16 Jul 2022 15:11:02 +0200 Subject: [PATCH 07/29] :bug: Fix indentation --- rethinkdb/cli/_import.py | 64 ++++++++++++++++++++-------------------- 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/rethinkdb/cli/_import.py b/rethinkdb/cli/_import.py index 3a18928..d1d5373 100644 --- a/rethinkdb/cli/_import.py +++ b/rethinkdb/cli/_import.py @@ -925,8 +925,8 @@ def parse_options(argv, prog=None): if len(args) != 0: raise parser.error( - f"No positional arguments supported. " \ - f"Unrecognized option(s): {args}" + f"No positional arguments supported. 
" + f"Unrecognized option(s): {args}" ) # - create_args @@ -941,12 +941,12 @@ def parse_options(argv, prog=None): elif options.directory: if not os.path.exists(options.directory): parser.error( - f"-d/--directory does not exist: {options.directory}" - ) + f"-d/--directory does not exist: {options.directory}" + ) if not os.path.isdir(options.directory): parser.error( - f"-d/--directory is not a directory: {options.directory}" - ) + f"-d/--directory is not a directory: {options.directory}" + ) options.directory = os.path.realpath(options.directory) # disallow invalid options @@ -965,9 +965,9 @@ def parse_options(argv, prog=None): parser.error("--no-header option is not valid when importing a directory") if options.custom_header: parser.error( - f"table create options are not valid when importing " \ - f"a directory: " \ - f"""{', '.join( + f"table create options are not valid when importing " + f"a directory: " + f"""{', '.join( [x.lower().replace('_', ' ') for x in options.custom_header.keys()] )}""" ) @@ -975,8 +975,8 @@ def parse_options(argv, prog=None): # check valid options if not os.path.isdir(options.directory): parser.error( - f"Directory to import does not exist: {options.directory}" - ) + f"Directory to import does not exist: {options.directory}" + ) if options.fields and ( len(options.db_tables) > 1 or options.db_tables[0].table is None @@ -1005,8 +1005,8 @@ def parse_options(argv, prog=None): ) else: parser.error( - f"Invalid --table option: {options.import_table}" - ) + f"Invalid --table option: {options.import_table}" + ) else: parser.error("A value is required for --table when importing from a file") @@ -1035,8 +1035,8 @@ def parse_options(argv, prog=None): options.delimiter = "\t" elif len(options.delimiter) != 1: parser.error( - f"Specify exactly one character for the --delimiter " \ - f"option: {options.delimiter}" + f"Specify exactly one character for the --delimiter " + f"option: {options.delimiter}" ) # no_header @@ -1127,15 +1127,15 @@ def table_writer( if res["errors"] > 0: raise RuntimeError( - f"Error when importing into table " \ - f"'{db}.{table}': {res['first_error']}" + f"Error when importing into table " + f"'{db}.{table}': {res['first_error']}" ) modified = res["inserted"] + res["replaced"] + res["unchanged"] if modified != len(batch): raise RuntimeError( - f"The inserted/replaced/unchanged number did not " \ - f"match when importing into table " \ - f"'{db}.{table}': {res['first_error']}" + f"The inserted/replaced/unchanged number did not " + f"match when importing into table " + f"'{db}.{table}': {res['first_error']}" ) table_info.add_rows_written(modified) @@ -1146,9 +1146,9 @@ def table_writer( for row in batch: if table_info.primary_key not in row: raise RuntimeError( - f"Connection error while importing. " \ - f"Current row does not have the specified " \ - f"primary key ({table_info.primary_key}), " \ + f"Connection error while importing. 
" + f"Current row does not have the specified " + f"primary key ({table_info.primary_key}), " f"so cannot guarantee absence of duplicates" ) res = None @@ -1179,20 +1179,20 @@ def table_writer( ) elif existingRow != row: raise RuntimeError( - f"Duplicate primary key `{table_info.primary_key}`:" \ - f"{NEW_LINE}{str(row)}{NEW_LINE}{str(existingRow)}" + f"Duplicate primary key `{table_info.primary_key}`:" + f"{NEW_LINE}{str(row)}{NEW_LINE}{str(existingRow)}" ) if res["errors"] > 0: raise RuntimeError( - f"Error when importing into table " \ - f"'{db}.{table}': {res['first_error']}" + f"Error when importing into table " + f"'{db}.{table}': {res['first_error']}" ) if res["inserted"] + res["replaced"] + res["unchanged"] != 1: raise RuntimeError( - f"The inserted/replaced/unchanged number was " \ - f"not 1 when inserting on " \ - f"'{db}.{table}': {res}" + f"The inserted/replaced/unchanged number was " + f"not 1 when inserting on " + f"'{db}.{table}': {res}" ) table_info.add_rows_written(1) timing_queue.put(("writer_work", time.time() - timePoint)) @@ -1200,8 +1200,8 @@ def table_writer( except Exception as e: error_queue.put( - Error(str(e), traceback.format_exc(), f"{db}.{table}") - ) + Error(str(e), traceback.format_exc(), f"{db}.{table}") + ) exit_event.set() From ffc66eb5b0126dc70bf504611ab1dad20ca5ab43 Mon Sep 17 00:00:00 2001 From: lsabi1234 Date: Sat, 16 Jul 2022 15:21:39 +0200 Subject: [PATCH 08/29] :bug: Fix syntax errors --- rethinkdb/cli/_import.py | 37 +++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/rethinkdb/cli/_import.py b/rethinkdb/cli/_import.py index d1d5373..36d8f03 100644 --- a/rethinkdb/cli/_import.py +++ b/rethinkdb/cli/_import.py @@ -60,6 +60,7 @@ NEW_LINE = "\n" + class SourceFile(object): format = None # set by subclasses @@ -136,10 +137,10 @@ def __init__( try: self._source = codecs.open(source, mode="r", encoding="utf-8") except IOError as exc: - default_logger.exception(exc) + # default_logger.exception(exc) raise ValueError( - f'Unable to open source file "{str(source)}": ' \ - f'{str(exc)}' + f'Unable to open source file "{str(source)}": ' + f'{str(exc)}' ) if ( @@ -298,8 +299,8 @@ def setup_table(self): self.primary_key = primary_key elif primary_key != self.primary_key: raise RuntimeError( - f"Error: table {self.db}.{self.table} primary key was " \ - f"`{primary_key}` rather than the expected: {self.primary_key}" + f"Error: table {self.db}.{self.table} primary key was " + f"`{primary_key}` rather than the expected: {self.primary_key}" ) def restore_indexes(self, warning_queue): @@ -411,7 +412,7 @@ def batches(self, batch_size=None, warning_queue=None): yield batch batch = [] - except StopIteration as e: + except StopIteration: # yield any final batch if batch: yield batch @@ -476,7 +477,7 @@ def read_to_queue( # - report relevant errors except Exception as exc: - default_logger.exception(exc) + # default_logger.exception(exc) error_queue.put(Error(str(exc), traceback.format_exc(), self.name)) exit_event.set() raise @@ -509,9 +510,9 @@ def fill_buffer(self): # double the buffer under the assumption that the documents are too large to fit if self._buffer_size == JSON_MAX_BUFFER_SIZE: raise Exception( - f"Error: JSON max buffer size exceeded on file " \ - f"{self.name} (from position {self.bytes_processed}). " \ - f"Use '--max-document-size' to extend your buffer." + f"Error: JSON max buffer size exceeded on file " + f"{self.name} (from position {self.bytes_processed}). 
" + f"Use '--max-document-size' to extend your buffer." ) self._buffer_size = min(self._buffer_size * 2, JSON_MAX_BUFFER_SIZE) @@ -614,8 +615,8 @@ def teardown(self): else f" and {len(snippit) - 100} more characters" ) raise ValueError( - f"Error: JSON array did not end cleanly, " \ - f"rather with: <<{snippit[:100]}>>{extra}" + f"Error: JSON array did not end cleanly, " + f"rather with: <<{snippit[:100]}>>{extra}" ) self._buffer_pos += 1 @@ -631,8 +632,8 @@ def teardown(self): else f" and {len(snippit) - 100} more characters" ) raise ValueError( - f"Error: extra data after JSON data: <<{snippit[:100]}>>" \ - f"{extra}" + f"Error: extra data after JSON data: <<{snippit[:100]}>>" + f"{extra}" ) @@ -680,8 +681,8 @@ def setup_file(self, warning_queue=None): if self.custom_header is not None: if not self.no_header_row: warning_queue.put( - f"Ignoring header row on {self.name}: " \ - f"{str(self._columns)}" + f"Ignoring header row on {self.name}: " + f"{str(self._columns)}" ) self._columns = self.custom_header elif self.no_header_row: @@ -691,8 +692,8 @@ def get_line(self): raw_row = next(self._reader) if len(self._columns) != len(raw_row): raise Exception( - f"Error: '{self.name}' line {self._reader.line_num} " \ - f"has an inconsistent number of columns: {str(raw_row)}" + f"Error: '{self.name}' line {self._reader.line_num} " + f"has an inconsistent number of columns: {str(raw_row)}" ) row = {} From ad827693de5fe33029e69a219a1832ac06762a82 Mon Sep 17 00:00:00 2001 From: lsabi1234 Date: Sat, 16 Jul 2022 15:24:21 +0200 Subject: [PATCH 09/29] :bug: Fix syntax errors --- rethinkdb/cli/_import.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/rethinkdb/cli/_import.py b/rethinkdb/cli/_import.py index 36d8f03..500e2e0 100644 --- a/rethinkdb/cli/_import.py +++ b/rethinkdb/cli/_import.py @@ -1344,23 +1344,23 @@ def drain_queues(): source.primary_key = existing_tables[(source.db, source.table)] elif source.primary_key != existing_tables[(source.db, source.table)]: raise RuntimeError( - f"Error: Table '{source.db}.{source.table}' " \ - f"already exists with a different primary key: " \ - f"{existing_tables[(source.db, source.table)]} "\ - f"(expected: {source.primary_key})" + f"Error: Table '{source.db}.{source.table}' " + f"already exists with a different primary key: " + f"{existing_tables[(source.db, source.table)]} " + f"(expected: {source.primary_key})" ) if len(already_exist) == 1: raise RuntimeError( - f"Error: Table '{already_exist[0]}' already exists, run with" \ + f"Error: Table '{already_exist[0]}' already exists, run with" f" --force to import into the existing table" ) elif len(already_exist) > 1: already_exist.sort() raise RuntimeError( - f"Error: The following tables already exist, " \ - f"run with --force to import into the existing tables:" \ - f"{NEW_LINE} " \ + f"Error: The following tables already exist, " + f"run with --force to import into the existing tables:" + f"{NEW_LINE} " f"{(NEW_LINE + ' ').join(already_exist)}" ) @@ -1452,7 +1452,8 @@ def drain_queues(): try: reader.join(0.1) except Exception as exc: - default_logger.exception(exc) + # default_logger.exception(exc) + pass if not reader.is_alive(): readers.remove(reader) @@ -1498,8 +1499,8 @@ def plural(num, text): return f"{num} {text}{'' if num == 1 else 's'}" print( - f" {plural(sum(x.rows_written for x in sources), 'row')}" \ - f" imported to {plural(len(sources), 'table')} in " \ + f" {plural(sum(x.rows_written for x in sources), 'row')}" + f" imported to {plural(len(sources), 
'table')} in " f"{time.time() - start_time}:.2f secs" ) From 54d00eb7ac8e36e0e72c179702729d0791420072 Mon Sep 17 00:00:00 2001 From: lsabi1234 Date: Tue, 9 Aug 2022 22:25:47 +0200 Subject: [PATCH 10/29] :bug: Fix bytes_processed exception when reading file with small buffer size #280 --- rethinkdb/cli/_import.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rethinkdb/cli/_import.py b/rethinkdb/cli/_import.py index 500e2e0..37f0cd2 100644 --- a/rethinkdb/cli/_import.py +++ b/rethinkdb/cli/_import.py @@ -511,7 +511,7 @@ def fill_buffer(self): if self._buffer_size == JSON_MAX_BUFFER_SIZE: raise Exception( f"Error: JSON max buffer size exceeded on file " - f"{self.name} (from position {self.bytes_processed}). " + f"{self.name} (from position {self._bytes_read}). " f"Use '--max-document-size' to extend your buffer." ) self._buffer_size = min(self._buffer_size * 2, JSON_MAX_BUFFER_SIZE) From 523c8b14ddd4029bd3d50c544de50d08f817672b Mon Sep 17 00:00:00 2001 From: lsabi1234 Date: Sat, 1 Oct 2022 22:40:52 +0200 Subject: [PATCH 11/29] :sparkles: Ported the restore module --- rethinkdb/cli/_restore.py | 316 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 316 insertions(+) diff --git a/rethinkdb/cli/_restore.py b/rethinkdb/cli/_restore.py index c1b4da2..fe15f60 100644 --- a/rethinkdb/cli/_restore.py +++ b/rethinkdb/cli/_restore.py @@ -22,6 +22,307 @@ """ import click +import copy +import multiprocessing +import optparse +import os +import shutil +import sys +import tarfile +import tempfile +import time +import traceback + +from rethinkdb import _import, utils_common + +usage = ( + "rethinkdb restore FILE [-c HOST:PORT] [--tls-cert FILENAME] [-p] [--password-file FILENAME] [--clients NUM] " + "[--shards NUM_SHARDS] [--replicas NUM_REPLICAS] [--force] [-i (DB | DB.TABLE)]..." +) +help_epilog = """ +FILE: + the archive file to restore data from; + if FILE is -, use standard input (note that + intermediate files will still be written to + the --temp-dir directory) + +EXAMPLES: + +rethinkdb restore rdb_dump.tar.gz -c mnemosyne:39500 + Import data into a cluster running on host 'mnemosyne' with a client port at 39500 using + the named archive file. + +rethinkdb restore rdb_dump.tar.gz -i test + Import data into a local cluster from only the 'test' database in the named archive file. + +rethinkdb restore rdb_dump.tar.gz -i test.subscribers -c hades -p + Import data into a cluster running on host 'hades' which requires a password from only + a specific table from the named archive file. + +rethinkdb restore rdb_dump.tar.gz --clients 4 --force + Import data to a local cluster from the named archive file using only 4 client connections + and overwriting any existing rows with the same primary key. 
+""" + + +def parse_options(argv, prog=None): + parser = utils_common.CommonOptionsParser( + usage=usage, epilog=help_epilog, prog=prog + ) + + parser.add_option( + "-i", + "--import", + dest="db_tables", + metavar="DB|DB.TABLE", + default=[], + help="limit restore to the given database or table (may be specified multiple times)", + action="append", + type="db_table", + ) + + parser.add_option( + "--temp-dir", + dest="temp_dir", + metavar="DIR", + default=None, + help="directory to use for intermediary results", + ) + parser.add_option( + "--clients", + dest="clients", + metavar="CLIENTS", + default=8, + help="client connections to use (default: 8)", + type="pos_int", + ) + parser.add_option( + "--hard-durability", + dest="durability", + action="store_const", + default="soft", + help="use hard durability writes (slower, uses less memory)", + const="hard", + ) + parser.add_option( + "--force", + dest="force", + action="store_true", + default=False, + help="import data even if a table already exists", + ) + parser.add_option( + "--no-secondary-indexes", + dest="indexes", + action="store_false", + default=None, + help="do not create secondary indexes for the restored tables", + ) + + parser.add_option( + "--writers-per-table", + dest="writers", + default=multiprocessing.cpu_count(), + help=optparse.SUPPRESS_HELP, + type="pos_int", + ) + parser.add_option( + "--batch-size", + dest="batch_size", + default=utils_common.default_batch_size, + help=optparse.SUPPRESS_HELP, + type="pos_int", + ) + + # Replication settings + replication_options_group = optparse.OptionGroup(parser, "Replication Options") + replication_options_group.add_option( + "--shards", + dest="create_args", + metavar="SHARDS", + help="shards to setup on created tables (default: 1)", + type="pos_int", + action="add_key", + ) + replication_options_group.add_option( + "--replicas", + dest="create_args", + metavar="REPLICAS", + help="replicas to setup on created tables (default: 1)", + type="pos_int", + action="add_key", + ) + parser.add_option_group(replication_options_group) + + options, args = parser.parse_args(argv) + + # -- Check validity of arguments + + # - archive + if len(args) == 0: + parser.error( + "Archive to import not specified. Provide an archive file created by rethinkdb-dump." 
+ ) + elif len(args) != 1: + parser.error("Only one positional argument supported") + options.in_file = args[0] + if options.in_file == "-": + options.in_file = sys.stdin + else: + if not os.path.isfile(options.in_file): + parser.error(f"Archive file does not exist: {options.in_file}") + options.in_file = os.path.realpath(options.in_file) + + # - temp_dir + if options.temp_dir: + if not os.path.isdir(options.temp_dir): + parser.error( + f"Temporary directory doesn't exist or is not a directory: {options.temp_dir}" + ) + if not os.access(options.temp_dir, os.W_OK): + parser.error(f"Temporary directory inaccessible: {options.temp_dir}") + + # - create_args + if options.create_args is None: + options.create_args = {} + + # -- + + return options + + +def do_unzip(temp_dir, options): + """extract the tarfile to the filesystem""" + + tables_to_export = set(options.db_tables) + top_level = None + files_ignored = [] + files_found = False + archive = None + tarfile_options = { + "mode": "r|*", + "fileobj" if hasattr(options.in_file, "read") else "name": options.in_file, + } + try: + archive = tarfile.open(**tarfile_options) + for tarinfo in archive: + # skip without comment anything but files + if not tarinfo.isfile(): + continue # skip everything but files + + # normalize the path + relpath = os.path.relpath( + os.path.realpath(tarinfo.name.strip().lstrip(os.sep)) + ) + + # skip things that try to jump out of the folder + if relpath.startswith(os.path.pardir): + files_ignored.append(tarinfo.name) + continue + + # skip files types other than what we use + if not os.path.splitext(relpath)[1] in (".json", ".csv", ".info"): + files_ignored.append(tarinfo.name) + continue + + # ensure this looks like our structure + try: + top, db, file_name = relpath.split(os.sep) + except ValueError: + raise RuntimeError( + f"Error: Archive file has an unexpected directory structure: {tarinfo.name}" + ) + + if not top_level: + top_level = top + elif top != top_level: + raise RuntimeError( + f"Error: Archive file has an unexpected directory structure ({top} vs {top_level})" + ) + + # filter out tables we are not looking for + table = os.path.splitext(file_name) + if tables_to_export and not ( + (db, table) in tables_to_export or (db, None) in tables_to_export + ): + continue # skip without comment + + # write the file out + files_found = True + dest_path = os.path.join(temp_dir, db, file_name) + + if not os.path.exists(os.path.dirname(dest_path)): + os.makedirs(os.path.dirname(dest_path)) + + with open(dest_path, "wb") as dest: + source = archive.extractfile(tarinfo) + chunk = True + while chunk: + chunk = source.read(1024 * 128) + dest.write(chunk) + source.close() + + if not os.path.isfile(dest_path): + raise AssertionError( + f"Was not able to write {dest_path}" + ) + + finally: + if archive: + archive.close() + + if not files_found: + raise RuntimeError("Error: Archive file had no files") + + # - send the location and ignored list back to our caller + return files_ignored + + +def do_restore(options): + # Create a temporary directory to store the extracted data + temp_dir = tempfile.mkdtemp(dir=options.temp_dir) + + try: + # - extract the archive + if not options.quiet: + print("Extracting archive file...") + start_time = time.time() + + do_unzip(temp_dir, options) + + if not options.quiet: + print(f" Done ({(time.time() - start_time):2f} seconds)") + + # - default _import options + + options = copy.copy(options) + options.fields = None + options.directory = temp_dir + options.file = None + + sources = 
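# A stdlib-only sketch of the defensive checks do_unzip above applies to every archive
# member before extracting it: normalize the path, refuse anything that would escape
# the extraction root, and accept only the file types rethinkdb-export produces. The
# member names are examples; the check is pure path logic, so nothing is opened.
import os

ALLOWED_EXTENSIONS = (".json", ".csv", ".info")


def member_is_safe(member_name):
    relpath = os.path.relpath(os.path.realpath(member_name.strip().lstrip(os.sep)))
    if relpath.startswith(os.path.pardir):
        return False  # path tries to jump out of the extraction directory
    if os.path.splitext(relpath)[1] not in ALLOWED_EXTENSIONS:
        return False  # not a data or info file we understand
    return True


print(member_is_safe("rethinkdb_dump/test/users.json"))  # True
print(member_is_safe("../../../etc/shadow"))             # False
print(member_is_safe("rethinkdb_dump/test/notes.txt"))   # False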
_import.parse_sources(options) + + # - run the import + if not options.quiet: + print("Importing from directory...") + + try: + _import.import_tables(options, sources) + except RuntimeError as ex: + if options.debug: + traceback.print_exc() + if str(ex) == "Warnings occurred during import": + raise RuntimeError( + "Warning: import did not create some secondary indexes." + ) + else: + error_string = str(ex) + if error_string.startswith("Error: "): + error_string = error_string[len("Error: ") :] + raise RuntimeError(f"Error: import failed: {error_string}") + # 'Done' message will be printed by the import script + finally: + shutil.rmtree(temp_dir) @click.command @@ -30,3 +331,18 @@ def cmd_restore(): Restore loads data into a RethinkDB cluster from an archive. """ click.echo("restore command") + argv = [] + prog = [] + start_time = time.time() + + if argv is None: + argv = sys.argv[1:] + options = parse_options(argv, prog=prog) + + try: + do_restore(options) + except RuntimeError as ex: + print(ex, file=sys.stderr) + return 1 + print(f"Restore done in {time.time() - start_time}") + return 0 From b7ae9d07c462419f9796db961db294d0ef91f65b Mon Sep 17 00:00:00 2001 From: lsabi1234 Date: Sun, 27 Nov 2022 22:36:14 +0100 Subject: [PATCH 12/29] :construction: Add click options to dump --- rethinkdb/cli/_dump.py | 86 +++++++++++++++++++++++++++++------------- 1 file changed, 59 insertions(+), 27 deletions(-) diff --git a/rethinkdb/cli/_dump.py b/rethinkdb/cli/_dump.py index fb20399..0d935ea 100644 --- a/rethinkdb/cli/_dump.py +++ b/rethinkdb/cli/_dump.py @@ -166,17 +166,24 @@ def parse_options(argv, prog=None): @click.command -def cmd_dump(): +@click.option("--directory", default="", help="The target directory of the dump") +@click.option("--fields", default=None, help="The fields to be dumped") +@click.option("--delimiter", default=None, help="The delimiter of the exported data") +@click.option("--format", default="json", help="The format of the export data") +@click.option("--quiet", default=True, help="Verbose dump") +@click.opton("--debug", default=False, help="Whether to debug or not") +@click.option("--out_file", help="The output file name") +def cmd_dump(directory, fields, delimiter, format, quiet, debug, out_file): """ Dump creates an archive of data from a RethinkDB cluster. 
""" click.echo("dump command") - argv = None - prog = None + # argv = None + # prog = None - options = parse_options(argv or sys.argv[1:], prog=prog) + # options = parse_options(argv or sys.argv[1:], prog=prog) try: - if not options.quiet: + if not quiet: # Print a warning about the capabilities of dump, so no one is confused (hopefully) print( """\ @@ -191,42 +198,60 @@ def cmd_dump(): # -- _export options - need to be kep in-sync with _export - options.directory = os.path.realpath(tempfile.mkdtemp(dir=options.temp_dir)) - options.fields = None - options.delimiter = None - options.format = "json" + # options.directory = os.path.realpath(tempfile.mkdtemp(dir=options.temp_dir)) + # options.fields = None + # options.delimiter = None + # options.format = "json" # -- export to a directory - if not options.quiet: + # if not options.quiet: + # print(" Exporting to temporary directory...") + if not quiet: print(" Exporting to temporary directory...") try: - cmd_export.run(options) + # cmd_export.run(options) + cmd_export.run({ + "directory": directory, + "quiet": quiet, + "fields": fields, + "delimter": delimiter, + "format": format + }) except Exception as exc: # default_logger.exception(exc) - if options.debug: + # if options.debug: + if debug: sys.stderr.write(f"\n{traceback.format_exc()}\n") raise Exception(f"Error: export failed, {exc}") # -- zip directory - if not options.quiet: + # if not options.quiet: + if not quiet: print(" Zipping export directory...") try: - if hasattr(options.out_file, "read"): - archive = tarfile.open(fileobj=options.out_file, mode="w:gz") + # if hasattr(options.out_file, "read"): + if hasattr(out_file, "read") + # archive = tarfile.open(fileobj=options.out_file, mode="w:gz") + archive = tarfile.open(fileobj=out_file, mode="w:gz") else: - archive = tarfile.open(name=options.out_file, mode="w:gz") - for curr, _, files in os.walk(os.path.realpath(options.directory)): + # archive = tarfile.open(name=options.out_file, mode="w:gz") + archive = tarfile.open(name=out_file, mode="w:gz") + # for curr, _, files in os.walk(os.path.realpath(options.directory)): + for curr, _, files in os.walk(os.path.realpath(directory)): for data_file in files: - full_path = os.path.join(options.directory, curr, data_file) + # full_path = os.path.join(options.directory, curr, data_file) + full_path = os.path.join(directory, curr, data_file) archive_path = os.path.join( - options.dump_name, - os.path.relpath(full_path, options.directory), + # options.dump_name, + dump_name + # os.path.relpath(full_path, options.directory), + os.path.relpath(full_path, directory), ) archive.add(full_path, arcname=archive_path) os.unlink(full_path) @@ -236,25 +261,32 @@ def cmd_dump(): # -- - if not options.quiet: + # if not options.quiet: + if not quiet: print( "Done (%.2f seconds): %s" % ( time.time() - start_time, - options.out_file.name - if hasattr(options.out_file, "name") - else options.out_file, + # options.out_file.name + out_file.name + # if hasattr(options.out_file, "name") + if hasattr(out_file, "name") + # else options.out_file, + else out_file, ) ) except KeyboardInterrupt: time.sleep(0.2) raise RuntimeError("Interrupted") finally: - if os.path.exists(options.directory): - shutil.rmtree(options.directory) + # if os.path.exists(options.directory): + # shutil.rmtree(options.directory) + if os.path.exists(directory): + shutil.rmtree(directory) except Exception as ex: - if options.debug: + # if options.debug: + if debug: traceback.print_exc() print(ex, file=sys.stderr) return 1 From 
753bb1f412ffd9130d28e0c2c7ccf680079e55de Mon Sep 17 00:00:00 2001 From: lsabi <13497689+lsabi@users.noreply.github.com> Date: Sun, 17 Sep 2023 18:32:54 +0200 Subject: [PATCH 13/29] :construction: Remove the click library reference --- rethinkdb/cli/_export.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/rethinkdb/cli/_export.py b/rethinkdb/cli/_export.py index 481946b..d8c54ad 100644 --- a/rethinkdb/cli/_export.py +++ b/rethinkdb/cli/_export.py @@ -20,7 +20,6 @@ """ Export exports data from a RethinkDB cluster into a directory. """ -import click import csv import ctypes import datetime @@ -647,14 +646,7 @@ def run(options): f"Failed to move temporary directory to output directory ({options.directory}): {e.strerror}" ) -@click.command -def cmd_export(): - """ - Export data from a RethinkDB cluster into a directory. - """ - click.echo("export command") - argv = [] - prog = [] +def main(argv=None, prog=None): options = parse_options(argv or sys.argv[1:], prog=prog) start_time = time.time() @@ -666,5 +658,9 @@ def cmd_export(): print(ex, file=sys.stderr) return 1 if not options.quiet: - print(f" Done ({time.time() - start_time}:.2f seconds)") + print(" Done (%.2f seconds)" % (time.time() - start_time)) return 0 + + +if __name__ == "__main__": + sys.exit(main()) From a50f8221fe431d78663f9d4e94aaa7156973be6c Mon Sep 17 00:00:00 2001 From: lsabi <13497689+lsabi@users.noreply.github.com> Date: Sun, 17 Sep 2023 18:34:25 +0200 Subject: [PATCH 14/29] :construction: Remove the click command library references --- rethinkdb/cli/_dump.py | 103 +++++++++++++---------------------------- 1 file changed, 33 insertions(+), 70 deletions(-) diff --git a/rethinkdb/cli/_dump.py b/rethinkdb/cli/_dump.py index 0d935ea..b2af56b 100644 --- a/rethinkdb/cli/_dump.py +++ b/rethinkdb/cli/_dump.py @@ -20,8 +20,6 @@ """ Dump creates an archive of data from a RethinkDB cluster. """ -import click - import datetime import os import platform @@ -165,25 +163,10 @@ def parse_options(argv, prog=None): return options -@click.command -@click.option("--directory", default="", help="The target directory of the dump") -@click.option("--fields", default=None, help="The fields to be dumped") -@click.option("--delimiter", default=None, help="The delimiter of the exported data") -@click.option("--format", default="json", help="The format of the export data") -@click.option("--quiet", default=True, help="Verbose dump") -@click.opton("--debug", default=False, help="Whether to debug or not") -@click.option("--out_file", help="The output file name") -def cmd_dump(directory, fields, delimiter, format, quiet, debug, out_file): - """ - Dump creates an archive of data from a RethinkDB cluster. 
- """ - click.echo("dump command") - # argv = None - # prog = None - - # options = parse_options(argv or sys.argv[1:], prog=prog) +def main(argv=None, prog=None): + options = parse_options(argv or sys.argv[1:], prog=prog) try: - if not quiet: + if not options.quiet: # Print a warning about the capabilities of dump, so no one is confused (hopefully) print( """\ @@ -198,60 +181,42 @@ def cmd_dump(directory, fields, delimiter, format, quiet, debug, out_file): # -- _export options - need to be kep in-sync with _export - # options.directory = os.path.realpath(tempfile.mkdtemp(dir=options.temp_dir)) - # options.fields = None - # options.delimiter = None - # options.format = "json" + options.directory = os.path.realpath(tempfile.mkdtemp(dir=options.temp_dir)) + options.fields = None + options.delimiter = None + options.format = "json" # -- export to a directory - # if not options.quiet: - # print(" Exporting to temporary directory...") - if not quiet: + if not options.quiet: print(" Exporting to temporary directory...") try: - # cmd_export.run(options) - cmd_export.run({ - "directory": directory, - "quiet": quiet, - "fields": fields, - "delimter": delimiter, - "format": format - }) + _export.run(options) except Exception as exc: - # default_logger.exception(exc) + default_logger.exception(exc) - # if options.debug: - if debug: - sys.stderr.write(f"\n{traceback.format_exc()}\n") + if options.debug: + sys.stderr.write("\n%s\n" % traceback.format_exc()) - raise Exception(f"Error: export failed, {exc}") + raise Exception("Error: export failed, %s" % exc) # -- zip directory - # if not options.quiet: - if not quiet: + if not options.quiet: print(" Zipping export directory...") try: - # if hasattr(options.out_file, "read"): - if hasattr(out_file, "read") - # archive = tarfile.open(fileobj=options.out_file, mode="w:gz") - archive = tarfile.open(fileobj=out_file, mode="w:gz") + if hasattr(options.out_file, "read"): + archive = tarfile.open(fileobj=options.out_file, mode="w:gz") else: - # archive = tarfile.open(name=options.out_file, mode="w:gz") - archive = tarfile.open(name=out_file, mode="w:gz") - # for curr, _, files in os.walk(os.path.realpath(options.directory)): - for curr, _, files in os.walk(os.path.realpath(directory)): + archive = tarfile.open(name=options.out_file, mode="w:gz") + for curr, _, files in os.walk(os.path.realpath(options.directory)): for data_file in files: - # full_path = os.path.join(options.directory, curr, data_file) - full_path = os.path.join(directory, curr, data_file) + full_path = os.path.join(options.directory, curr, data_file) archive_path = os.path.join( - # options.dump_name, - dump_name - # os.path.relpath(full_path, options.directory), - os.path.relpath(full_path, directory), + options.dump_name, + os.path.relpath(full_path, options.directory), ) archive.add(full_path, arcname=archive_path) os.unlink(full_path) @@ -261,32 +226,30 @@ def cmd_dump(directory, fields, delimiter, format, quiet, debug, out_file): # -- - # if not options.quiet: - if not quiet: + if not options.quiet: print( "Done (%.2f seconds): %s" % ( time.time() - start_time, - # options.out_file.name - out_file.name - # if hasattr(options.out_file, "name") - if hasattr(out_file, "name") - # else options.out_file, - else out_file, + options.out_file.name + if hasattr(options.out_file, "name") + else options.out_file, ) ) except KeyboardInterrupt: time.sleep(0.2) raise RuntimeError("Interrupted") finally: - # if os.path.exists(options.directory): - # shutil.rmtree(options.directory) - if 
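# The dump entry point above always removes its temporary export directory, even when
# the export or archiving step fails. A minimal sketch of that mkdtemp / try-finally
# pattern, with the actual work elided:
import os
import shutil
import tempfile

temp_dir = os.path.realpath(tempfile.mkdtemp())
try:
    pass  # ... export tables into temp_dir, then pack them into the archive ...
finally:
    if os.path.exists(temp_dir):
        shutil.rmtree(temp_dir)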
os.path.exists(directory): - shutil.rmtree(directory) + if os.path.exists(options.directory): + shutil.rmtree(options.directory) except Exception as ex: - # if options.debug: - if debug: + if options.debug: traceback.print_exc() print(ex, file=sys.stderr) return 1 + return 0 + + +if __name__ == "__main__": + sys.exit(main()) From 96ccdd442980ae7e574775bfd955e3e1efe78f45 Mon Sep 17 00:00:00 2001 From: lsabi <13497689+lsabi@users.noreply.github.com> Date: Sun, 17 Sep 2023 18:35:39 +0200 Subject: [PATCH 15/29] :construction: Remove the click library --- rethinkdb/cli/_import.py | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/rethinkdb/cli/_import.py b/rethinkdb/cli/_import.py index 37f0cd2..75c9c2b 100644 --- a/rethinkdb/cli/_import.py +++ b/rethinkdb/cli/_import.py @@ -20,8 +20,6 @@ """ Import loads data into a RethinkDB cluster. """ - -import click import codecs import collections import csv @@ -1718,14 +1716,7 @@ def parse_info_file(path): return sources -@click.command -def cmd_import(): - """ - Import loads data into a RethinkDB cluster. - """ - click.echo("import command") - argv = [] - prog = [] +def main(argv=None, prog=None): start_time = time.time() if argv is None: @@ -1741,5 +1732,9 @@ def cmd_import(): return 2 return 1 if not options.quiet: - print(f" Done ({time.time() - start_time} seconds)") + print(" Done (%d seconds)" % (time.time() - start_time)) return 0 + + +if __name__ == "__main__": + sys.exit(main()) From babc139e1c46c9c150854766025cfbbd9540a57d Mon Sep 17 00:00:00 2001 From: lsabi <13497689+lsabi@users.noreply.github.com> Date: Sun, 17 Sep 2023 18:36:31 +0200 Subject: [PATCH 16/29] :construction: Remove click library --- rethinkdb/cli/_index_rebuild.py | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/rethinkdb/cli/_index_rebuild.py b/rethinkdb/cli/_index_rebuild.py index d90e6a8..412d3f8 100644 --- a/rethinkdb/cli/_index_rebuild.py +++ b/rethinkdb/cli/_index_rebuild.py @@ -23,8 +23,6 @@ This should be used after upgrading to a newer version of rethinkdb. There will be a notification in the web UI if any secondary indexes are out-of-date. """ - -import click import sys import time import traceback @@ -257,17 +255,9 @@ def rebuild_indexes(options): print("") -@click.command -def cmd_index_rebuild(): - """ - Rebuild outdated secondary indexes. - """ - click.echo("index rebuild command") - argv = [] - prog = [] - start_time = time.time() +def main(argv=None, prog=None): options = parse_options(argv or sys.argv[1:], prog=prog) - + start_time = time.time() try: rebuild_indexes(options) except Exception as ex: @@ -277,5 +267,9 @@ def cmd_index_rebuild(): print(ex, file=sys.stderr) return 1 if not options.quiet: - print(f"Done ({time.time() - start_time} seconds)") + print("Done (%d seconds)" % (time.time() - start_time)) return 0 + + +if __name__ == "__main__": + sys.exit(main()) From 0ae82d1e368c6fdf13cc25ab05e8e6fd988b8026 Mon Sep 17 00:00:00 2001 From: lsabi <13497689+lsabi@users.noreply.github.com> Date: Sun, 17 Sep 2023 18:37:16 +0200 Subject: [PATCH 17/29] :construction: Remove click command --- rethinkdb/cli/_restore.py | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/rethinkdb/cli/_restore.py b/rethinkdb/cli/_restore.py index fe15f60..0cd99e7 100644 --- a/rethinkdb/cli/_restore.py +++ b/rethinkdb/cli/_restore.py @@ -20,8 +20,6 @@ """ Restore loads data into a RethinkDB cluster from an archive. 
""" - -import click import copy import multiprocessing import optparse @@ -325,16 +323,7 @@ def do_restore(options): shutil.rmtree(temp_dir) -@click.command -def cmd_restore(): - """ - Restore loads data into a RethinkDB cluster from an archive. - """ - click.echo("restore command") - argv = [] - prog = [] - start_time = time.time() - +def main(argv=None, prog=None): if argv is None: argv = sys.argv[1:] options = parse_options(argv, prog=prog) @@ -344,5 +333,8 @@ def cmd_restore(): except RuntimeError as ex: print(ex, file=sys.stderr) return 1 - print(f"Restore done in {time.time() - start_time}") return 0 + + +if __name__ == "__main__": + exit(main()) From b29254bbc66c44c63e748784ff471c29bb963b81 Mon Sep 17 00:00:00 2001 From: lsabi <13497689+lsabi@users.noreply.github.com> Date: Sun, 17 Sep 2023 18:40:44 +0200 Subject: [PATCH 18/29] :bug: Fix tabs to spaces --- rethinkdb/cli/_export.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rethinkdb/cli/_export.py b/rethinkdb/cli/_export.py index d8c54ad..c211521 100644 --- a/rethinkdb/cli/_export.py +++ b/rethinkdb/cli/_export.py @@ -392,7 +392,7 @@ def export_table( cursor.close() except errors.ReqlError as exc: # default_logger.exception(exc) - pass + pass cursor = options.retryQuery( f"backup cursor for {db}.{table}", From 5e9b422e646394a898d0edc3131d0763b6e73fc3 Mon Sep 17 00:00:00 2001 From: lsabi <13497689+lsabi@users.noreply.github.com> Date: Sun, 17 Sep 2023 18:41:48 +0200 Subject: [PATCH 19/29] :bug: Change tabs into spaces From cab859ebb0c0e40e3a113eb9424e4bf576b939ba Mon Sep 17 00:00:00 2001 From: lsabi <13497689+lsabi@users.noreply.github.com> Date: Sun, 17 Sep 2023 18:43:59 +0200 Subject: [PATCH 20/29] :bug: Fix indentation From b37603b920c47361e60cf2ea2ad226a38a755b5d Mon Sep 17 00:00:00 2001 From: lsabi <13497689+lsabi@users.noreply.github.com> Date: Sun, 17 Sep 2023 18:45:37 +0200 Subject: [PATCH 21/29] :bug: Remove click library --- rethinkdb/cli/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rethinkdb/cli/__init__.py b/rethinkdb/cli/__init__.py index 7227375..b45510a 100644 --- a/rethinkdb/cli/__init__.py +++ b/rethinkdb/cli/__init__.py @@ -19,10 +19,11 @@ Although these commands can be used to prepare data for backup, depending on your needs, you may want to evaluate more mature backup backup solutions. """ - +""" from rethinkdb.cli._dump import cmd_dump from rethinkdb.cli._export import cmd_export from rethinkdb.cli._import import cmd_import from rethinkdb.cli._index_rebuild import cmd_index_rebuild from rethinkdb.cli._repl import cmd_repl from rethinkdb.cli._restore import cmd_restore +""" From 2a3f9c22b930ea43a82493f5f78ff76fec18d1a0 Mon Sep 17 00:00:00 2001 From: lsabi <13497689+lsabi@users.noreply.github.com> Date: Sun, 17 Sep 2023 18:49:50 +0200 Subject: [PATCH 22/29] :bug: Remove click library --- rethinkdb/cli/main.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/rethinkdb/cli/main.py b/rethinkdb/cli/main.py index 5ea90d5..77e8491 100644 --- a/rethinkdb/cli/main.py +++ b/rethinkdb/cli/main.py @@ -29,7 +29,7 @@ - repl - restore """ - +""" import click from rethinkdb.cli import ( @@ -44,9 +44,9 @@ @click.group def cmd_main(): - """ - Group of commands for the RethinkDB database. - """ + +Group of commands for the RethinkDB database. 
+ cmd_main.add_command(cmd_dump, "dump") @@ -55,3 +55,4 @@ def cmd_main(): cmd_main.add_command(cmd_index_rebuild, "index_rebuild") cmd_main.add_command(cmd_repl, "repl") cmd_main.add_command(cmd_restore, "restore") +""" From a5e7ef9155f68838444729aa9b45fb31da80fb2a Mon Sep 17 00:00:00 2001 From: lsabi <13497689+lsabi@users.noreply.github.com> Date: Sun, 17 Sep 2023 18:53:50 +0200 Subject: [PATCH 23/29] :bug: Remove click library --- rethinkdb/cli/_dump.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rethinkdb/cli/_dump.py b/rethinkdb/cli/_dump.py index b2af56b..6e95430 100644 --- a/rethinkdb/cli/_dump.py +++ b/rethinkdb/cli/_dump.py @@ -30,7 +30,7 @@ import time import traceback -from rethinkdb import cmd_export, utils_common +from rethinkdb import export, utils_common # from rethinkdb.logger import default_logger From 9705f56787bad6988c6dc858705c3580fc90dcd8 Mon Sep 17 00:00:00 2001 From: lsabi <13497689+lsabi@users.noreply.github.com> Date: Sun, 17 Sep 2023 18:55:59 +0200 Subject: [PATCH 24/29] :bug: Update old references to _export.py that caused crash --- rethinkdb/cli/_dump.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rethinkdb/cli/_dump.py b/rethinkdb/cli/_dump.py index 6e95430..30bc6d3 100644 --- a/rethinkdb/cli/_dump.py +++ b/rethinkdb/cli/_dump.py @@ -30,7 +30,8 @@ import time import traceback -from rethinkdb import export, utils_common +from rethinkdb import utils_common +from rethinkdb.cli import _export # from rethinkdb.logger import default_logger From cc3b6549cdcccc10f3b0df481603a7fa14607a83 Mon Sep 17 00:00:00 2001 From: lsabi <13497689+lsabi@users.noreply.github.com> Date: Sun, 17 Sep 2023 18:56:46 +0200 Subject: [PATCH 25/29] :bug: Remove click library --- rethinkdb/cli/_repl.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/rethinkdb/cli/_repl.py b/rethinkdb/cli/_repl.py index 076a253..922ba0e 100644 --- a/rethinkdb/cli/_repl.py +++ b/rethinkdb/cli/_repl.py @@ -29,13 +29,14 @@ - repl - restore """ - +""" import click @click.command def cmd_repl(): """ - Rebuild outdated secondary indexes. + #Rebuild outdated secondary indexes. """ click.echo("repl command") +""" From aff0afad32a158c605209b948e51b6b9eb9a2579 Mon Sep 17 00:00:00 2001 From: lsabi <13497689+lsabi@users.noreply.github.com> Date: Sun, 17 Sep 2023 18:57:33 +0200 Subject: [PATCH 26/29] :bug: Remove click library --- rethinkdb/cli/_restore.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rethinkdb/cli/_restore.py b/rethinkdb/cli/_restore.py index 0cd99e7..8e4de42 100644 --- a/rethinkdb/cli/_restore.py +++ b/rethinkdb/cli/_restore.py @@ -31,7 +31,8 @@ import time import traceback -from rethinkdb import _import, utils_common +from rethinkdb import utils_common +from rethinkdb.cli import _import usage = ( "rethinkdb restore FILE [-c HOST:PORT] [--tls-cert FILENAME] [-p] [--password-file FILENAME] [--clients NUM] " From 2fba96787d71e4f9915c32e640cedf60a7f0e83c Mon Sep 17 00:00:00 2001 From: lsabi <13497689+lsabi@users.noreply.github.com> Date: Sun, 17 Sep 2023 19:00:51 +0200 Subject: [PATCH 27/29] :bug: Fix indentation --- rethinkdb/cli/_repl.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rethinkdb/cli/_repl.py b/rethinkdb/cli/_repl.py index 922ba0e..a92950a 100644 --- a/rethinkdb/cli/_repl.py +++ b/rethinkdb/cli/_repl.py @@ -35,8 +35,8 @@ @click.command def cmd_repl(): - """ + #Rebuild outdated secondary indexes. 
- """ + click.echo("repl command") """ From 847ebde40975bb98b970a2db1770630574e2bf39 Mon Sep 17 00:00:00 2001 From: lsabi1234 Date: Sun, 17 Sep 2023 22:58:48 +0200 Subject: [PATCH 28/29] Port utils_common.py from old driver version --- rethinkdb/cli/_dump.py | 3 +- rethinkdb/cli/_export.py | 3 +- rethinkdb/cli/_import.py | 6 +- rethinkdb/cli/_index_rebuild.py | 3 +- rethinkdb/cli/_restore.py | 3 +- rethinkdb/cli/utils_common.py | 474 ++++++++++++++++++++++++++++++++ 6 files changed, 484 insertions(+), 8 deletions(-) create mode 100644 rethinkdb/cli/utils_common.py diff --git a/rethinkdb/cli/_dump.py b/rethinkdb/cli/_dump.py index 30bc6d3..763d774 100644 --- a/rethinkdb/cli/_dump.py +++ b/rethinkdb/cli/_dump.py @@ -30,8 +30,7 @@ import time import traceback -from rethinkdb import utils_common -from rethinkdb.cli import _export +from rethinkdb.cli import utils_common, _export # from rethinkdb.logger import default_logger diff --git a/rethinkdb/cli/_export.py b/rethinkdb/cli/_export.py index c211521..c7665f9 100644 --- a/rethinkdb/cli/_export.py +++ b/rethinkdb/cli/_export.py @@ -37,7 +37,8 @@ from multiprocessing.queues import SimpleQueue -from rethinkdb import errors, query, utils_common +from rethinkdb import errors, query +from rethinkdb.cli import utils_common # from rethinkdb.logger import default_logger # STATIC INFORMATION ABOUT THE EXOIRT FEATURE diff --git a/rethinkdb/cli/_import.py b/rethinkdb/cli/_import.py index 75c9c2b..01e70a1 100644 --- a/rethinkdb/cli/_import.py +++ b/rethinkdb/cli/_import.py @@ -32,11 +32,13 @@ import sys import time import traceback -from multiprocessing.queues import Queue, SimpleQueue +# Unused. Delete if all tests pass even when commented +# from multiprocessing.queues import Queue, SimpleQueue import six -from rethinkdb import ast, errors, query, utils_common +from rethinkdb import ast, errors, query +from rethinkdb.cli import utils_common # from rethinkdb.logger import default_logger try: diff --git a/rethinkdb/cli/_index_rebuild.py b/rethinkdb/cli/_index_rebuild.py index 412d3f8..fc15657 100644 --- a/rethinkdb/cli/_index_rebuild.py +++ b/rethinkdb/cli/_index_rebuild.py @@ -27,7 +27,8 @@ import time import traceback -from rethinkdb import query, utils_common +from rethinkdb import query +from rethinkdb.cli import utils_common usage = ( "rethinkdb index-rebuild [-c HOST:PORT] [-n NUM] " diff --git a/rethinkdb/cli/_restore.py b/rethinkdb/cli/_restore.py index 8e4de42..41cbaf2 100644 --- a/rethinkdb/cli/_restore.py +++ b/rethinkdb/cli/_restore.py @@ -31,8 +31,7 @@ import time import traceback -from rethinkdb import utils_common -from rethinkdb.cli import _import +from rethinkdb import utils_common, _import usage = ( "rethinkdb restore FILE [-c HOST:PORT] [--tls-cert FILENAME] [-p] [--password-file FILENAME] [--clients NUM] " diff --git a/rethinkdb/cli/utils_common.py b/rethinkdb/cli/utils_common.py new file mode 100644 index 0000000..291c5de --- /dev/null +++ b/rethinkdb/cli/utils_common.py @@ -0,0 +1,474 @@ +# Copyright 2023 RethinkDB +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# +# This file incorporates work covered by the following copyright: +# Copyright 2010-2016 RethinkDB, all rights reserved. + +from __future__ import print_function + +import collections +import copy +import distutils.version +import getpass +import inspect +import optparse +import os +import re +import sys +import threading + +from rethinkdb import ast, errors, net, query, version + +default_batch_size = 200 + + +class RetryQuery(object): + + __connectOptions = None + __local = None + + def __init__(self, connect_options): + if "host" not in connect_options: + raise AssertionError("Hostname is a required connection parameter") + + if "port" not in connect_options: + raise AssertionError("Port number is a required connection parameter") + + connect_options["port"] = int(connect_options["port"]) + + if connect_options["port"] <= 0: + raise AssertionError("Port number can not be less than one") + + self.__connectOptions = copy.deepcopy(connect_options) + + self.__local = threading.local() + + def conn(self, test_connection=True): + if not hasattr(self.__local, "connCache"): + self.__local.connCache = {} + + # check if existing connection is still good + if os.getpid() in self.__local.connCache and test_connection: + try: + ast.expr(0).run(self.__local.connCache[os.getpid()]) + except errors.ReqlError: + del self.__local.connCache[os.getpid()] + + # cache a new connection + if not os.getpid() in self.__local.connCache: + self.__local.connCache[os.getpid()] = net.make_connection( + net.DefaultConnection, **self.__connectOptions + ) + + # return the connection + return self.__local.connCache[os.getpid()] + + def __call__( + self, name, query_str, times=5, run_options=None, test_connection=True + ): + # Try a query multiple times to guard against bad connections + if name is None: + raise AssertionError("Name can not be none") + + name = str(name) + + if not isinstance(query_str, ast.RqlQuery): + raise AssertionError( + f"Query must be a ReQL query instead of {format(value=query_str)}" + ) + + if not isinstance(times, int) or times < 1: + raise ValueError( + f"Times must be a positive integer instead of {format(value=times)}" + ) + + if run_options is None: + run_options = {} + + if not isinstance(run_options, dict): + raise ValueError( + f"Run option must be a dict instead of {format(value=run_options)}" + ) + + last_error = None + test_connection = False + + for _ in range(times): + try: + conn = self.conn( + test_connection=test_connection + ) # we are already guarding for this + except errors.ReqlError as e: + last_error = RuntimeError( + f"Error connecting for during '{name}': {str(e)}" + ) + test_connection = True + try: + return query_str.run(conn, **run_options) + except (errors.ReqlTimeoutError, errors.ReqlDriverError) as e: + last_error = RuntimeError( + f"Connection error during '{name}': {str(e)}" + ) + # other errors immediately bubble up + + if last_error is not None: + raise last_error + + +def print_progress(ratio, indent=0, read=None, write=None): + total_width = 40 + done_width = min(int(ratio * total_width), total_width) + sys.stdout.write( + f"\r{' ' * indent}[{'=' * done_width}" + f"{' ' * (total_width - done_width)}] " + f"{int(100 * ratio):.3f}{(f' r: {read}') if read is not None else ''}" + f"{(f' w: {write}') if write is not None else ''}\x1b[K" + ) + sys.stdout.flush() + + +def check_minimum_version(options, minimum_version="1.6", raise_exception=True): + 
minimum_version = distutils.version.LooseVersion(minimum_version)
+    version_string = options.retryQuery(
+        "get server version",
+        query.db("rethinkdb").table("server_status")[0]["process"]["version"],
+    )
+
+    matches = re.match(
+        r"(rethinkdb|rebirthdb) (?P<version>(\d+)\.(\d+)\.(\d+)).*", version_string
+    )
+
+    if not matches:
+        raise RuntimeError(f"invalid version string format: {version_string}")
+
+    if distutils.version.LooseVersion(matches.group("version")) < minimum_version:
+        if raise_exception:
+            raise RuntimeError(
+                f"Incompatible version, expected >= "
+                f"{minimum_version} got: {version_string}"
+            )
+        return False
+    return True
+
+
+DbTable = collections.namedtuple("DbTable", ["db", "table"])
+_tableNameRegex = re.compile(r"^(?P<db>[\w-]+)(\.(?P<table>[\w-]+))?$")
+
+
+class CommonOptionsParser(optparse.OptionParser, object):
+
+    __retryQuery = None
+    __connectRegex = re.compile(r"^\s*(?P<hostname>[\w.-]+)(:(?P<port>\d+))?\s*$")
+
+    def format_epilog(self, formatter):
+        return self.epilog or ""
+
+    def __init__(self, *args, **kwargs):
+        # -- Type Checkers
+
+        def check_tls_option(_, opt_str, value):
+            value = str(value)
+
+            if os.path.isfile(value):
+                return {"ca_certs": os.path.realpath(value)}
+            else:
+                raise optparse.OptionValueError(
+                    f"Option {opt_str} value is not a file: {value}"
+                )
+
+        def check_db_table_option(_, _opt_str, value):
+            res = _tableNameRegex.match(value)
+
+            if not res:
+                raise optparse.OptionValueError(
+                    f"Invalid db or db.table name: {value}"
+                )
+            if res.group("db") == "rethinkdb":
+                raise optparse.OptionValueError(
+                    "The `rethinkdb` database is special and cannot be used here"
+                )
+
+            return DbTable(res.group("db"), res.group("table"))
+
+        def check_positive_int(_, opt_str, value):
+            try:
+                value = int(value)
+                if value < 1:
+                    raise ValueError
+            except ValueError:
+                raise optparse.OptionValueError(
+                    f"{opt_str} value must be an integer greater than 1: {value}"
+                )
+
+            return value
+
+        def check_existing_file(_, opt_str, value):
+            if not os.path.isfile(value):
+                raise optparse.OptionValueError(
+                    f"{opt_str} value was not an existing file: {value}"
+                )
+
+            return os.path.realpath(value)
+
+        def check_new_file_location(_, opt_str, value):
+            try:
+                real_value = os.path.realpath(value)
+            except Exception:
+                raise optparse.OptionValueError(
+                    f"Incorrect value for {opt_str}: {value}"
+                )
+
+            if os.path.exists(real_value):
+                raise optparse.OptionValueError(
+                    f"{opt_str} value already exists: {value}"
+                )
+
+            return real_value
+
+        def file_contents(_, opt_str, value):
+            if not os.path.isfile(value):
+                raise optparse.OptionValueError(
+                    f"{opt_str} value is not an existing file: {value}"
+                )
+
+            try:
+                with open(value, "r") as passwordFile:
+                    return passwordFile.read().rstrip("\n")
+            except IOError:
+                raise optparse.OptionValueError(
+                    f"bad value for {opt_str}: {value}"
+                )
+
+        # -- Callbacks
+
+        def combined_connect_action(obj, opt, value, parser, *args, **kwargs):
+            """optparse.takeaction() calls the callback (which this is set as)
+            with the following args: self, opt, value, parser *args, **kwargs
+            """
+            res = self.__connectRegex.match(value)
+            if not res:
+                raise optparse.OptionValueError(
+                    f"Invalid 'host:port' format: {value}"
+                )
+            if res.group("hostname"):
+                parser.values.hostname = res.group("hostname")
+            if res.group("port"):
+                parser.values.driver_port = int(res.group("port"))
+
+        # -- setup custom Options object
+
+        class CommonOptionChecker(optparse.Option, object):
+            TYPES = optparse.Option.TYPES + (
+                "tls_cert",
+                "db_table",
+                "pos_int",
+                "file",
+                "new_file",
+
"file_contents", + ) + + TYPE_CHECKER = copy.copy(optparse.Option.TYPE_CHECKER) + TYPE_CHECKER["tls_cert"] = check_tls_option + TYPE_CHECKER["db_table"] = check_db_table_option + TYPE_CHECKER["pos_int"] = check_positive_int + TYPE_CHECKER["file"] = check_existing_file + TYPE_CHECKER["new_file"] = check_new_file_location + TYPE_CHECKER["file_contents"] = file_contents + + ACTIONS = optparse.Option.ACTIONS + ("add_key", "get_password") + STORE_ACTIONS = optparse.Option.STORE_ACTIONS + ("add_key", "get_password") + TYPED_ACTIONS = optparse.Option.TYPED_ACTIONS + ("add_key",) + ALWAYS_TYPED_ACTIONS = optparse.Option.ALWAYS_TYPED_ACTIONS + ("add_key",) + + def take_action(self, action, dest, opt, value, values, parser): + if dest is None: + raise AssertionError("Destination can not be none") + + if action == "add_key": + if self.metavar is None: + raise AssertionError("Metavar can not be none") + + values.ensure_value(dest, {})[self.metavar.lower()] = value + elif action == "get_password": + values.ensure_value( + "password", + getpass.getpass("Password for `admin`: ") + ) + else: + super(CommonOptionChecker, self).take_action( + action, dest, opt, value, values, parser + ) + + kwargs["option_class"] = CommonOptionChecker + + # - default description to the module's __doc__ + if "description" not in kwargs: + # get calling module + caller = inspect.getmodule(inspect.stack()[1][0]) + if caller.__doc__: + kwargs["description"] = caller.__doc__ + + # -- add version + + if "version" not in kwargs: + kwargs["version"] = "%%prog %s" % version.VERSION + + # -- call super + + super(CommonOptionsParser, self).__init__(*args, **kwargs) + + # -- add common options + + self.add_option( + "-q", + "--quiet", + dest="quiet", + default=False, + action="store_true", + help="suppress non-error messages", + ) + self.add_option( + "--debug", + dest="debug", + default=False, + action="store_true", + help=optparse.SUPPRESS_HELP, + ) + + connection_group = optparse.OptionGroup(self, "Connection options") + connection_group.add_option( + "-c", + "--connect", + dest="driver_port", + metavar="HOST:PORT", + help=( + f"host and client port of a rethinkdb node to connect " + f"(default: localhost:{net.DEFAULT_PORT})" + ), + action="callback", + callback=combined_connect_action, + type="str", + ) + connection_group.add_option( + "--driver-port", + dest="driver_port", + metavar="PORT", + help="driver port of a rethinkdb server", + type="int", + default=os.environ.get("RETHINKDB_DRIVER_PORT", net.DEFAULT_PORT), + ) + connection_group.add_option( + "--host-name", + dest="hostname", + metavar="HOST", + help="host and driver port of a rethinkdb server", + default=os.environ.get("RETHINKDB_HOSTNAME", "localhost"), + ) + connection_group.add_option( + "-u", + "--user", + dest="user", + metavar="USERNAME", + help="user name to connect as", + default=os.environ.get("RETHINKDB_USER", "admin"), + ) + connection_group.add_option( + "-p", + "--password", + dest="password", + help="interactively prompt for a password for the connection", + action="get_password", + ) + connection_group.add_option( + "--password-file", + dest="password", + metavar="PSWD_FILE", + help="read the connection password from a file", + type="file_contents", + ) + connection_group.add_option( + "--tls-cert", + dest="ssl", + metavar="TLS_CERT", + help="certificate file to use for TLS encryption.", + type="tls_cert", + ) + self.add_option_group(connection_group) + + def parse_args(self, *args, **kwargs): + # - validate options + + connect = True + if "connect" in 
kwargs: + connect = kwargs["connect"] + del kwargs["connect"] + + # - validate ENV variables + + if "RETHINKDB_DRIVER_PORT" in os.environ: + driver_port = os.environ["RETHINKDB_DRIVER_PORT"] + + if not isinstance(driver_port, int) or driver_port < 1: + self.error( + "ENV variable RETHINKDB_DRIVER_PORT is not a useable " + f"integer: {os.environ['RETHINKDB_DRIVER_PORT']}" + ) + + # - parse options + + options, args = super(CommonOptionsParser, self).parse_args(*args, **kwargs) + + # - setup a RetryQuery instance + + self.__retryQuery = RetryQuery( + connect_options={ + "host": options.hostname, + "port": options.driver_port, + "user": options.user, + "password": options.password, + "ssl": options.ssl, + } + ) + options.retryQuery = self.__retryQuery + + # - test connection + + if connect: + try: + options.retryQuery.conn() + except errors.ReqlError as e: + self.error(f"Unable to connect to server: {str(e)}") + + # - + + return options, args + + +_interrupt_seen = False + + +def abort(pools, exit_event): + global _interrupt_seen + + if _interrupt_seen: + # second time + print("\nSecond terminate signal seen, aborting ungracefully") + for pool in pools: + for worker in pool: + worker.terminate() + worker.join(0.1) + else: + print("\nTerminate signal seen, aborting") + _interrupt_seen = True + exit_event.set() From 6b23a75d55c8591a4c9e37279400a58a58ed7068 Mon Sep 17 00:00:00 2001 From: lsabi1234 Date: Sun, 17 Sep 2023 23:07:52 +0200 Subject: [PATCH 29/29] Fix missing references and packages from porting the old driver --- rethinkdb/cli/_import.py | 13 ++++++++----- rethinkdb/cli/_restore.py | 2 +- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/rethinkdb/cli/_import.py b/rethinkdb/cli/_import.py index 01e70a1..143596d 100644 --- a/rethinkdb/cli/_import.py +++ b/rethinkdb/cli/_import.py @@ -32,14 +32,12 @@ import sys import time import traceback -# Unused. Delete if all tests pass even when commented -# from multiprocessing.queues import Queue, SimpleQueue -import six +from multiprocessing.queues import SimpleQueue + from rethinkdb import ast, errors, query from rethinkdb.cli import utils_common -# from rethinkdb.logger import default_logger try: unicode @@ -1263,6 +1261,11 @@ def import_tables(options, sources, files_ignored=None): tables = dict(((x.db, x.table), x) for x in sources) # (db, table) => table + ctx = multiprocessing.get_context(multiprocessing.get_start_method()) + error_queue = SimpleQueue(ctx=ctx) + warning_queue = SimpleQueue(ctx=ctx) + timing_queue = SimpleQueue(ctx=ctx) + """ if six.PY3: ctx = multiprocessing.get_context(multiprocessing.get_start_method()) error_queue = SimpleQueue(ctx=ctx) @@ -1272,7 +1275,7 @@ def import_tables(options, sources, files_ignored=None): error_queue = SimpleQueue() warning_queue = SimpleQueue() timing_queue = SimpleQueue() - + """ max_queue_size = options.clients * 3 work_queue = multiprocessing.Manager().Queue(max_queue_size) diff --git a/rethinkdb/cli/_restore.py b/rethinkdb/cli/_restore.py index 41cbaf2..fcd8855 100644 --- a/rethinkdb/cli/_restore.py +++ b/rethinkdb/cli/_restore.py @@ -31,7 +31,7 @@ import time import traceback -from rethinkdb import utils_common, _import +from rethinkdb.cli import utils_common, _import usage = ( "rethinkdb restore FILE [-c HOST:PORT] [--tls-cert FILENAME] [-p] [--password-file FILENAME] [--clients NUM] "
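
Note on the SimpleQueue construction in the _import.py hunk above: the SimpleQueue class imported from multiprocessing.queues takes a keyword-only context argument on Python 3, which is why the six/PY2 branch is left behind as a commented-out block and the queues are built from an explicit context. Below is a minimal, self-contained sketch of that wiring only; it is illustrative and not part of any patch in this series, and the helper name make_worker_queues exists only in this sketch.

    # Build inter-process queues the way the ported _import.py does:
    # derive a context from the current start method and pass it to each
    # SimpleQueue explicitly (required when using multiprocessing.queues
    # directly on Python 3).
    import multiprocessing
    from multiprocessing.queues import SimpleQueue

    def make_worker_queues():
        ctx = multiprocessing.get_context(multiprocessing.get_start_method())
        error_queue = SimpleQueue(ctx=ctx)
        warning_queue = SimpleQueue(ctx=ctx)
        timing_queue = SimpleQueue(ctx=ctx)
        return error_queue, warning_queue, timing_queue

    if __name__ == "__main__":
        error_q, warning_q, timing_q = make_worker_queues()
        warning_q.put("example warning")  # queues can be shared with worker processes
        print(warning_q.get())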