Repository: flink Updated Branches: refs/heads/master e7dfa2894 -> a973d84b2
[FLINK-4097] Fix NullPointerException on CassandraSinkBase and CassandraTupleWriteAheadSink's close() This closes #2144 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a973d84b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a973d84b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a973d84b Branch: refs/heads/master Commit: a973d84b251ddf87bc47d9806d37d353febcab41 Parents: e7dfa28 Author: Andrea Sella <andrea.se...@radicalbit.io> Authored: Tue Jun 21 18:12:31 2016 +0200 Committer: zentol <ches...@apache.org> Committed: Wed Jun 22 12:33:43 2016 +0200 ---------------------------------------------------------------------- .../streaming/connectors/cassandra/CassandraSinkBase.java | 8 ++++++-- .../connectors/cassandra/CassandraTupleWriteAheadSink.java | 8 ++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a973d84b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java index c823f5b..49b1efa 100644 --- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java +++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java @@ -81,12 +81,16 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> { @Override public void close() { try { - session.close(); + if (session != null) { + session.close(); + } } catch (Exception e) { LOG.error("Error while closing session.", e); } try { - cluster.close(); + if (cluster != null) { + cluster.close(); + } } catch (Exception e) { LOG.error("Error while closing cluster.", e); } http://git-wip-us.apache.org/repos/asf/flink/blob/a973d84b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java index f784647..8bce9d6 100644 --- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java +++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java @@ -94,12 +94,16 @@ public class CassandraTupleWriteAheadSink<IN extends Tuple> extends GenericWrite public void close() throws Exception { super.close(); try { - session.close(); + if (session != null) { + session.close(); + } } catch (Exception e) { LOG.error("Error while closing session.", e); } try { - cluster.close(); + if (cluster != null) { + cluster.close(); + } } catch (Exception e) { LOG.error("Error while closing cluster.", e); }