Repository: flink
Updated Branches:
  refs/heads/master e7dfa2894 -> a973d84b2


[FLINK-4097] Fix NullPointerException on CassandraSinkBase and 
CassandraTupleWriteAheadSink's close()

This closes #2144


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a973d84b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a973d84b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a973d84b

Branch: refs/heads/master
Commit: a973d84b251ddf87bc47d9806d37d353febcab41
Parents: e7dfa28
Author: Andrea Sella <andrea.se...@radicalbit.io>
Authored: Tue Jun 21 18:12:31 2016 +0200
Committer: zentol <ches...@apache.org>
Committed: Wed Jun 22 12:33:43 2016 +0200

----------------------------------------------------------------------
 .../streaming/connectors/cassandra/CassandraSinkBase.java    | 8 ++++++--
 .../connectors/cassandra/CassandraTupleWriteAheadSink.java   | 8 ++++++--
 2 files changed, 12 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a973d84b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 
b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index c823f5b..49b1efa 100644
--- 
a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ 
b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -81,12 +81,16 @@ public abstract class CassandraSinkBase<IN, V> extends 
RichSinkFunction<IN> {
        @Override
        public void close() {
                try {
-                       session.close();
+                       if (session != null) {
+                               session.close();
+                       }
                } catch (Exception e) {
                        LOG.error("Error while closing session.", e);
                }
                try {
-                       cluster.close();
+                       if (cluster != null) {
+                               cluster.close();
+                       }
                } catch (Exception e) {
                        LOG.error("Error while closing cluster.", e);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/a973d84b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
 
b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
index f784647..8bce9d6 100644
--- 
a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
+++ 
b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
@@ -94,12 +94,16 @@ public class CassandraTupleWriteAheadSink<IN extends Tuple> 
extends GenericWrite
        public void close() throws Exception {
                super.close();
                try {
-                       session.close();
+                       if (session != null) {
+                               session.close();
+                       }
                } catch (Exception e) {
                        LOG.error("Error while closing session.", e);
                }
                try {
-                       cluster.close();
+                       if (cluster != null) {
+                               cluster.close();
+                       }
                } catch (Exception e) {
                        LOG.error("Error while closing cluster.", e);
                }

Reply via email to