Joal has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/282338

Change subject: Correct CqlRecordWriter in cassandra module
......................................................................

Correct CqlRecordWriter in cassandra module

CqlRecordWriter was looping endlessly (causing MapReduce task timeouts) when
only a small amount of data was to be inserted into Cassandra. With small
data, the multithreaded approach led to an interrupt being sent to the
Cassandra driver during connection initialisation, which made reconnection
fail. The approach taken here is to regularly check for the end-of-job
condition instead of blocking until interrupted. Although not perfectly
clean, this approach at least prevents the observed failure.

Change-Id: If851c3e97f5c5ff083fb9775c8665ff35355f2d1
---
M 
refinery-cassandra/src/main/java/org/wikimedia/analytics/refinery/cassandra/CqlRecordWriter.java
1 file changed, 66 insertions(+), 30 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source 
refs/changes/38/282338/1

diff --git 
a/refinery-cassandra/src/main/java/org/wikimedia/analytics/refinery/cassandra/CqlRecordWriter.java
 
b/refinery-cassandra/src/main/java/org/wikimedia/analytics/refinery/cassandra/CqlRecordWriter.java
index b53a417..c06e704 100644
--- 
a/refinery-cassandra/src/main/java/org/wikimedia/analytics/refinery/cassandra/CqlRecordWriter.java
+++ 
b/refinery-cassandra/src/main/java/org/wikimedia/analytics/refinery/cassandra/CqlRecordWriter.java
@@ -90,6 +90,7 @@
 
         CqlRecordWriter(Configuration conf)
         {
+            logger.debug("Constructing new MultiThreadCqlRecordWriter");
             this.conf = conf;
             this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 
32 * FBUtilities.getAvailableProcessors());
             batchThreshold = 
conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
@@ -152,6 +153,7 @@
         public void close() throws IOException
         {
             // close all the clients before throwing anything
+            logger.debug("Closing MultiThreadCqlRecordWriter");
             IOException clientException = null;
             for (RangeClient client : clients.values())
             {
@@ -173,6 +175,7 @@
         @Override
         public void write(Map<String, ByteBuffer> keyColumns, List<ByteBuffer> 
values) throws IOException
         {
+            logger.debug("Writing with MultiThreadCqlRecordWriter");
             TokenRange range = ringCache.getRange(getPartitionKey(keyColumns));
 
             // get the client for the given range, or create a new one
@@ -207,6 +210,7 @@
          */
         public class RangeClient extends Thread
         {
+            protected final String name;
             // The list of endpoints for this range
             protected final List<InetAddress> endpoints;
             protected Session client;
@@ -226,6 +230,8 @@
             public RangeClient(List<InetAddress> endpoints)
             {
                 super("client-" + endpoints);
+                this.name = "RangeClient[client-" + endpoints + "]";
+                logger.debug("Constructing new RangeClient[" + name + "]");
                 this.endpoints = endpoints;
             }
 
@@ -234,6 +240,7 @@
              */
             public void put(List<ByteBuffer> value) throws IOException
             {
+                logger.debug("Putting new value in async queue");
                 while (true)
                 {
                     if (lastException != null)
@@ -258,10 +265,14 @@
                 outer:
                 while (run || !queue.isEmpty())
                 {
+                    logger.debug("Async Run - Looping while (run || 
!queue.isEmpty) (execution loop)");
                     List<ByteBuffer> bindVariables;
                     try
                     {
-                        bindVariables = queue.take();
+                        logger.debug("Async Run - Getting first batch value to 
insert into cassandra");
+                        bindVariables = queue.poll(1, TimeUnit.SECONDS);
+                        // re-check loop condition if no data available
+                        if (bindVariables == null) continue ;
                     }
                     catch (InterruptedException e)
                     {
@@ -270,15 +281,20 @@
                     }
 
                     ListIterator<InetAddress> iter = endpoints.listIterator();
+                    // Initialise client if not already done
+                    if ((client == null) && (!attempt_connect(iter))) break 
outer;
+
                     while (true)
                     {
-                        // send the mutation to the last-used endpoint.  first 
time through, this will NPE harmlessly.
+                        logger.debug("Async Run - Looping inserting batches 
into cassandra available client");
+                        // send the mutation to the last-used endpoint.
                         try
                         {
                             int i = 0;
                             PreparedStatement statement = 
preparedStatement(client);
                             while (bindVariables != null)
                             {
+                                logger.debug("Async Run - Looping inserting 
value " + Integer.toString(i) + " into cassandra with selected client");
                                 BoundStatement boundStatement = new 
BoundStatement(statement);
                                 for (int columnPosition = 0; columnPosition < 
bindVariables.size(); columnPosition++)
                                 {
@@ -291,57 +307,73 @@
                                 i++;
 
                                 if (i >= batchThreshold)
+                                {
+                                    logger.debug("Async Run - Batch full - 
Breaking values insertion loop");
                                     break;
+                                }
                                 bindVariables = queue.poll();
                             }
+                            logger.debug("Async Run - Batch full or no more 
values to insert - Breaking batches insertion loop");
                             break;
                         }
                         catch (Exception e)
                         {
+                            logger.debug("Async Run - Error while inserting 
batch with selected client", e);
                             closeInternal();
                             if (!iter.hasNext())
                             {
+                                logger.debug("Async Run - No other client to 
try to reach, breaking execution loop");
                                 lastException = new IOException(e);
                                 break outer;
                             }
                         }
 
                         // attempt to connect to a different endpoint
-                        try
-                        {
-                            InetAddress address = iter.next();
-                            String host = address.getHostName();
-                            client = CqlConfigHelper.getOutputCluster(host, 
conf).connect();
-                        }
-                        catch (Exception e)
-                        {
-                            //If connection died due to Interrupt, just try 
connecting to the endpoint again.
-                            //There are too many ways for the 
Thread.interrupted() state to be cleared, so
-                            //we can't rely on that here. Until the java 
driver gives us a better way of knowing
-                            //that this exception came from an 
InterruptedException, this is the best solution.
-                            if (canRetryDriverConnection(e))
-                            {
-                                iter.previous();
-                            }
-                            closeInternal();
-
-                            // Most exceptions mean something unexpected went 
wrong to that endpoint, so
-                            // we should try again to another.  Other 
exceptions (auth or invalid request) are fatal.
-                            if ((e instanceof AuthenticationException || e 
instanceof InvalidQueryException) || !iter.hasNext())
-                            {
-                                lastException = new IOException(e);
-                                break outer;
-                            }
-                        }
+                        if (!attempt_connect(iter)) break outer;
                     }
                 }
                 // close all our connections once we are done.
                 closeInternal();
             }
 
+            private boolean attempt_connect(ListIterator<InetAddress> iter) {
+                try
+                {
+                    logger.debug("Async Run - Try a new client from the list");
+                    InetAddress address = iter.next();
+                    String host = address.getHostName();
+                    client = CqlConfigHelper.getOutputCluster(host, 
conf).connect();
+                }
+                catch (Exception e)
+                {
+                    //If connection died due to Interrupt, just try connecting 
to the endpoint again.
+                    //There are too many ways for the Thread.interrupted() 
state to be cleared, so
+                    //we can't rely on that here. Until the java driver gives 
us a better way of knowing
+                    //that this exception came from an InterruptedException, 
this is the best solution.
+                    if (canRetryDriverConnection(e))
+                    {
+                        logger.debug("Async Run - Error is not critical, 
trying with same endpoint");
+                        iter.previous();
+                    }
+                    closeInternal();
+
+                    // Most exceptions mean something unexpected went wrong to 
that endpoint, so
+                    // we should try again to another.  Other exceptions (auth 
or invalid request) are fatal.
+                    if ((e instanceof AuthenticationException || e instanceof 
InvalidQueryException) || !iter.hasNext())
+                    {
+                        logger.debug("Async Run - Error is critical, breaking 
execution loop", e);
+                        lastException = new IOException(e);
+                        return false;
+                    }
+                }
+                return true;
+            }
+
+
             /** get prepared statement id from cache, otherwise prepare it 
from Cassandra server*/
             private PreparedStatement preparedStatement(Session client)
             {
+                logger.debug("Getting prepared statement");
                 PreparedStatement statement = preparedStatements.get(client);
                 if (statement == null)
                 {
@@ -363,9 +395,11 @@
 
             public void close() throws IOException
             {
-                // stop the run loop.  this will result in closeInternal being 
called by the time join() finishes.
+                logger.debug("Closing external");
+                // stop the run loop (Sending Interrupt signal is causing 
driver failure, so we rely on
+                // regular condition checking instead).
+                // this will result in closeInternal being called by the time 
join() finishes.
                 run = false;
-                interrupt();
                 try
                 {
                     this.join();
@@ -381,6 +415,7 @@
 
             protected void closeInternal()
             {
+                logger.debug("Closing internal");
                 if (client != null)
                 {
                     client.close();
@@ -389,6 +424,7 @@
 
             private boolean canRetryDriverConnection(Exception e)
             {
+                logger.debug("Checking retry connection");
                 if (e instanceof DriverException && 
e.getMessage().contains("Connection thread interrupted"))
                     return true;
                 if (e instanceof NoHostAvailableException)

-- 
To view, visit https://gerrit.wikimedia.org/r/282338
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: If851c3e97f5c5ff083fb9775c8665ff35355f2d1
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: master
Gerrit-Owner: Joal <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to