Abort cqlsh copy-from in case of no answer after prolonged period of time

Patch by Stefania Alborghetti; reviewed by Paulo Motta for CASSANDRA-12740


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/72c9eb2d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/72c9eb2d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/72c9eb2d

Branch: refs/heads/trunk
Commit: 72c9eb2dc6732a1f20e769ae162f02e5766f397f
Parents: 8455286
Author: Stefania Alborghetti <stefania.alborghe...@datastax.com>
Authored: Mon Oct 3 08:51:16 2016 +0800
Committer: Stefania Alborghetti <stefania.alborghe...@datastax.com>
Committed: Tue Oct 11 11:25:03 2016 +0800

----------------------------------------------------------------------
 CHANGES.txt                |  1 +
 pylib/cqlshlib/copyutil.py | 50 +++++++++++++++++++++++++++++++++--------
 2 files changed, 42 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/72c9eb2d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a517995..9f7fff8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.10
+ * Abort cqlsh copy-from in case of no answer after prolonged period of time 
(CASSANDRA-12740)
  * Avoid sstable corrupt exception due to dropped static column 
(CASSANDRA-12582)
  * Make stress use client mode to avoid checking commit log size on startup 
(CASSANDRA-12478)
  * Fix exceptions with new vnode allocation (CASSANDRA-12715)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72c9eb2d/pylib/cqlshlib/copyutil.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/copyutil.py b/pylib/cqlshlib/copyutil.py
index 23e739d..d2084d7 100644
--- a/pylib/cqlshlib/copyutil.py
+++ b/pylib/cqlshlib/copyutil.py
@@ -43,6 +43,7 @@ from select import select
 from uuid import UUID
 from util import profile_on, profile_off
 
+from cassandra import OperationTimedOut
 from cassandra.cluster import Cluster, DefaultConnection
 from cassandra.cqltypes import ReversedType, UserType
 from cassandra.metadata import protect_name, protect_names, protect_value
@@ -362,6 +363,11 @@ class CopyTask(object):
         copy_options['maxinflightmessages'] = 
int(opts.pop('maxinflightmessages', '512'))
         copy_options['maxbackoffattempts'] = 
int(opts.pop('maxbackoffattempts', '12'))
         copy_options['maxpendingchunks'] = int(opts.pop('maxpendingchunks', 
'24'))
+        # set requesttimeout to a value high enough so that maxbatchsize rows 
will never timeout if the server
+        # responds: here we set it to 1 sec per 10 rows but no less than 60 
seconds
+        copy_options['requesttimeout'] = int(opts.pop('requesttimeout', 
max(60, 1 * copy_options['maxbatchsize'] / 10)))
+        # set childtimeout higher than requesttimeout so that child processes 
have a chance to report request timeouts
+        copy_options['childtimeout'] = int(opts.pop('childtimeout', 
copy_options['requesttimeout'] + 30))
 
         self.check_options(copy_options)
         return CopyOptions(copy=copy_options, dialect=dialect_options, 
unrecognized=opts)
@@ -1186,9 +1192,21 @@ class ImportTask(CopyTask):
         if not self.fname:
             self.send_stdin_rows()
 
+        child_timeout = self.options.copy['childtimeout']
+        last_recv_num_records = 0
+        last_recv_time = time.time()
+
         while self.feeding_result is None or self.receive_meter.total_records 
< self.feeding_result.sent:
             self.receive_results()
 
+            if self.feeding_result is not None:
+                if self.receive_meter.total_records != last_recv_num_records:
+                    last_recv_num_records = self.receive_meter.total_records
+                    last_recv_time = time.time()
+                elif (time.time() - last_recv_time) > child_timeout:
+                    self.shell.printerr("No records inserted in {} seconds, 
aborting".format(child_timeout))
+                    break
+
             if self.error_handler.max_exceeded() or not 
self.all_processes_running():
                 break
 
@@ -2197,6 +2215,7 @@ class ImportProcess(ChildProcess):
         self.use_prepared_statements = options.copy['preparedstatements']
         self.max_inflight_messages = options.copy['maxinflightmessages']
         self.max_backoff_attempts = options.copy['maxbackoffattempts']
+        self.request_timeout = options.copy['requesttimeout']
 
         self.dialect_options = options.dialect
         self._session = None
@@ -2223,7 +2242,7 @@ class ImportProcess(ChildProcess):
                 connection_class=ConnectionWrapper)
 
             self._session = cluster.connect(self.ks)
-            self._session.default_timeout = None
+            self._session.default_timeout = self.request_timeout
         return self._session
 
     def run(self):
@@ -2306,6 +2325,10 @@ class ImportProcess(ChildProcess):
                         future = session.execute_async(statement)
                         future.add_callbacks(callback=result_callback, 
callback_args=(batch, chunk),
                                              errback=err_callback, 
errback_args=(batch, chunk, replicas))
+                    # do not handle else case, if a statement could not be 
created, the exception is handled
+                    # in self.wrap_make_statement and the error is reported, 
if a failure is injected that
+                    # causes the statement to be None, then we should not 
report the error so that we can test
+                    # the parent process handling missing batches from child 
processes
 
             except Exception, exc:
                 self.report_error(exc, chunk, chunk['rows'])
@@ -2320,8 +2343,8 @@ class ImportProcess(ChildProcess):
                 return None
 
         def make_statement_with_failures(query, conv, chunk, batch, replicas):
-            failed_batch = self.maybe_inject_failures(batch)
-            if failed_batch:
+            failed_batch, apply_failure = self.maybe_inject_failures(batch)
+            if apply_failure:
                 return failed_batch
             return make_statement(query, conv, chunk, batch, replicas)
 
@@ -2411,10 +2434,12 @@ class ImportProcess(ChildProcess):
 
     def maybe_inject_failures(self, batch):
         """
-        Examine self.test_failures and see if token_range is either a token 
range
-        supposed to cause a failure (failing_range) or to terminate the worker 
process
-        (exit_range). If not then call prepare_export_query(), which 
implements the
-        normal behavior.
+        Examine self.test_failures and see if the batch is a batch
+        supposed to cause a failure (failing_batch), or to terminate the 
worker process
+        (exit_batch), or not to be sent (unsent_batch).
+
+        @return any statement that will cause a failure or None if the 
statement should not be sent
+        plus a boolean indicating if a failure should be applied at all
         """
         if 'failing_batch' in self.test_failures:
             failing_batch = self.test_failures['failing_batch']
@@ -2422,14 +2447,19 @@ class ImportProcess(ChildProcess):
                 if batch['attempts'] < failing_batch['failures']:
                     statement = SimpleStatement("INSERT INTO badtable (a, b) 
VALUES (1, 2)",
                                                 
consistency_level=self.consistency_level)
-                    return statement
+                    return statement, True  # use this statement, which will 
cause an error
 
         if 'exit_batch' in self.test_failures:
             exit_batch = self.test_failures['exit_batch']
             if exit_batch['id'] == batch['id']:
                 sys.exit(1)
 
-        return None  # carry on as normal
+        if 'unsent_batch' in self.test_failures:
+            unsent_batch = self.test_failures['unsent_batch']
+            if unsent_batch['id'] == batch['id']:
+                return None, True  # do not send this batch, which will cause 
missing acks in the parent process
+
+        return None, False  # carry on as normal, do not apply any failures
 
     @staticmethod
     def make_batch(batch_id, rows, attempts=1):
@@ -2490,6 +2520,8 @@ class ImportProcess(ChildProcess):
         self.update_chunk(batch['rows'], chunk)
 
     def err_callback(self, response, batch, chunk, replicas):
+        if isinstance(response, OperationTimedOut) and chunk['imported'] == 
chunk['num_rows_sent']:
+            return  # occasionally the driver sends false timeouts for rows 
already processed (PYTHON-652)
         err_is_final = batch['attempts'] >= self.max_attempts
         self.report_error(response, chunk, batch['rows'], batch['attempts'], 
err_is_final)
         if not err_is_final:

Reply via email to