Abort cqlsh copy-from in case of no answer after a prolonged period of time. Patch by Stefania Alborghetti; reviewed by Paulo Motta for CASSANDRA-12740.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/72c9eb2d Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/72c9eb2d Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/72c9eb2d Branch: refs/heads/cassandra-3.X Commit: 72c9eb2dc6732a1f20e769ae162f02e5766f397f Parents: 8455286 Author: Stefania Alborghetti <stefania.alborghe...@datastax.com> Authored: Mon Oct 3 08:51:16 2016 +0800 Committer: Stefania Alborghetti <stefania.alborghe...@datastax.com> Committed: Tue Oct 11 11:25:03 2016 +0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + pylib/cqlshlib/copyutil.py | 50 +++++++++++++++++++++++++++++++++-------- 2 files changed, 42 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/72c9eb2d/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a517995..9f7fff8 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.10 + * Abort cqlsh copy-from in case of no answer after prolonged period of time (CASSANDRA-12740) * Avoid sstable corrupt exception due to dropped static column (CASSANDRA-12582) * Make stress use client mode to avoid checking commit log size on startup (CASSANDRA-12478) * Fix exceptions with new vnode allocation (CASSANDRA-12715) http://git-wip-us.apache.org/repos/asf/cassandra/blob/72c9eb2d/pylib/cqlshlib/copyutil.py ---------------------------------------------------------------------- diff --git a/pylib/cqlshlib/copyutil.py b/pylib/cqlshlib/copyutil.py index 23e739d..d2084d7 100644 --- a/pylib/cqlshlib/copyutil.py +++ b/pylib/cqlshlib/copyutil.py @@ -43,6 +43,7 @@ from select import select from uuid import UUID from util import profile_on, profile_off +from cassandra import OperationTimedOut from cassandra.cluster import Cluster, 
DefaultConnection from cassandra.cqltypes import ReversedType, UserType from cassandra.metadata import protect_name, protect_names, protect_value @@ -362,6 +363,11 @@ class CopyTask(object): copy_options['maxinflightmessages'] = int(opts.pop('maxinflightmessages', '512')) copy_options['maxbackoffattempts'] = int(opts.pop('maxbackoffattempts', '12')) copy_options['maxpendingchunks'] = int(opts.pop('maxpendingchunks', '24')) + # set requesttimeout to a value high enough so that maxbatchsize rows will never timeout if the server + # responds: here we set it to 1 sec per 10 rows but no less than 60 seconds + copy_options['requesttimeout'] = int(opts.pop('requesttimeout', max(60, 1 * copy_options['maxbatchsize'] / 10))) + # set childtimeout higher than requesttimeout so that child processes have a chance to report request timeouts + copy_options['childtimeout'] = int(opts.pop('childtimeout', copy_options['requesttimeout'] + 30)) self.check_options(copy_options) return CopyOptions(copy=copy_options, dialect=dialect_options, unrecognized=opts) @@ -1186,9 +1192,21 @@ class ImportTask(CopyTask): if not self.fname: self.send_stdin_rows() + child_timeout = self.options.copy['childtimeout'] + last_recv_num_records = 0 + last_recv_time = time.time() + while self.feeding_result is None or self.receive_meter.total_records < self.feeding_result.sent: self.receive_results() + if self.feeding_result is not None: + if self.receive_meter.total_records != last_recv_num_records: + last_recv_num_records = self.receive_meter.total_records + last_recv_time = time.time() + elif (time.time() - last_recv_time) > child_timeout: + self.shell.printerr("No records inserted in {} seconds, aborting".format(child_timeout)) + break + if self.error_handler.max_exceeded() or not self.all_processes_running(): break @@ -2197,6 +2215,7 @@ class ImportProcess(ChildProcess): self.use_prepared_statements = options.copy['preparedstatements'] self.max_inflight_messages = options.copy['maxinflightmessages'] 
self.max_backoff_attempts = options.copy['maxbackoffattempts'] + self.request_timeout = options.copy['requesttimeout'] self.dialect_options = options.dialect self._session = None @@ -2223,7 +2242,7 @@ class ImportProcess(ChildProcess): connection_class=ConnectionWrapper) self._session = cluster.connect(self.ks) - self._session.default_timeout = None + self._session.default_timeout = self.request_timeout return self._session def run(self): @@ -2306,6 +2325,10 @@ class ImportProcess(ChildProcess): future = session.execute_async(statement) future.add_callbacks(callback=result_callback, callback_args=(batch, chunk), errback=err_callback, errback_args=(batch, chunk, replicas)) + # do not handle else case, if a statement could not be created, the exception is handled + # in self.wrap_make_statement and the error is reported, if a failure is injected that + # causes the statement to be None, then we should not report the error so that we can test + # the parent process handling missing batches from child processes except Exception, exc: self.report_error(exc, chunk, chunk['rows']) @@ -2320,8 +2343,8 @@ class ImportProcess(ChildProcess): return None def make_statement_with_failures(query, conv, chunk, batch, replicas): - failed_batch = self.maybe_inject_failures(batch) - if failed_batch: + failed_batch, apply_failure = self.maybe_inject_failures(batch) + if apply_failure: return failed_batch return make_statement(query, conv, chunk, batch, replicas) @@ -2411,10 +2434,12 @@ class ImportProcess(ChildProcess): def maybe_inject_failures(self, batch): """ - Examine self.test_failures and see if token_range is either a token range - supposed to cause a failure (failing_range) or to terminate the worker process - (exit_range). If not then call prepare_export_query(), which implements the - normal behavior. 
+ Examine self.test_failures and see if the batch is a batch + supposed to cause a failure (failing_batch), or to terminate the worker process + (exit_batch), or not to be sent (unsent_batch). + + @return any statement that will cause a failure or None if the statement should not be sent + plus a boolean indicating if a failure should be applied at all """ if 'failing_batch' in self.test_failures: failing_batch = self.test_failures['failing_batch'] @@ -2422,14 +2447,19 @@ class ImportProcess(ChildProcess): if batch['attempts'] < failing_batch['failures']: statement = SimpleStatement("INSERT INTO badtable (a, b) VALUES (1, 2)", consistency_level=self.consistency_level) - return statement + return statement, True # use this statement, which will cause an error if 'exit_batch' in self.test_failures: exit_batch = self.test_failures['exit_batch'] if exit_batch['id'] == batch['id']: sys.exit(1) - return None # carry on as normal + if 'unsent_batch' in self.test_failures: + unsent_batch = self.test_failures['unsent_batch'] + if unsent_batch['id'] == batch['id']: + return None, True # do not send this batch, which will cause missing acks in the parent process + + return None, False # carry on as normal, do not apply any failures @staticmethod def make_batch(batch_id, rows, attempts=1): @@ -2490,6 +2520,8 @@ class ImportProcess(ChildProcess): self.update_chunk(batch['rows'], chunk) def err_callback(self, response, batch, chunk, replicas): + if isinstance(response, OperationTimedOut) and chunk['imported'] == chunk['num_rows_sent']: + return # occasionally the driver sends false timeouts for rows already processed (PYTHON-652) err_is_final = batch['attempts'] >= self.max_attempts self.report_error(response, chunk, batch['rows'], batch['attempts'], err_is_final) if not err_is_final: