SuXingLee commented on a change in pull request #50: [BAHIR-202] Improve
KuduSink throughput by using async FlushMode
URL: https://github.com/apache/bahir-flink/pull/50#discussion_r268228468
##########
File path:
flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java
##########
@@ -79,9 +86,12 @@ public KuduSink(String kuduMasters, KuduTableInfo tableInfo, KuduSerialization<O
@Override
public void open(Configuration parameters) throws IOException {
- if (connector != null) return;
- connector = new KuduConnector(kuduMasters, tableInfo, consistency, writeMode);
- serializer.withSchema(tableInfo.getSchema());
+ if (this.connector != null) return;
+ FlushMode flushMode = ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled() ?
Review comment:
In some scenarios the user does not care about write order, only about
throughput, so we give the user different options. The logic is now:
- If Flink checkpointing is disabled, data is written to Kudu synchronously.
- If Flink checkpointing is enabled, data is written to Kudu asynchronously
by default.
- The user can also switch to sync mode by explicitly calling
```KuduSink.withSyncFlushMode()``` when initializing KuduSink.
The out-of-order caveat is documented in the comments on
```KuduSink.withSyncFlushMode()```. Anyone who wants ordered writes to Kudu
while checkpointing is enabled can explicitly call
```KuduSink.withSyncFlushMode()```.
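The three rules above can be sketched as a small decision function. This is only an illustration under stated assumptions, not the code from the pull request: the class `FlushModeChooser`, the method `choose`, and the `syncRequested` flag (standing in for a prior call to ```KuduSink.withSyncFlushMode()```) are hypothetical, and the local enum merely mirrors the names of two of Kudu's `SessionConfiguration.FlushMode` values.

```java
// Hypothetical sketch of the flush-mode decision described in the review comment.
// It is a stand-in for illustration, not the implementation in KuduSink.
final class FlushModeChooser {

    // Local enum; the names mirror two values of Kudu's SessionConfiguration.FlushMode.
    enum FlushMode { AUTO_FLUSH_SYNC, AUTO_FLUSH_BACKGROUND }

    /**
     * @param checkpointingEnabled whether Flink checkpointing is enabled for the job
     * @param syncRequested        assumed flag: true if the user asked for sync mode explicitly
     */
    static FlushMode choose(boolean checkpointingEnabled, boolean syncRequested) {
        // Rule 1: without checkpointing, always write synchronously.
        if (!checkpointingEnabled) {
            return FlushMode.AUTO_FLUSH_SYNC;
        }
        // Rules 2 and 3: with checkpointing, async is the default unless sync was requested.
        return syncRequested ? FlushMode.AUTO_FLUSH_SYNC : FlushMode.AUTO_FLUSH_BACKGROUND;
    }

    public static void main(String[] args) {
        System.out.println(choose(false, false)); // checkpointing disabled
        System.out.println(choose(true, false));  // checkpointing enabled, default
        System.out.println(choose(true, true));   // checkpointing enabled, sync requested
    }
}
```

Keeping the decision in one pure function makes the default easy to unit-test without a running Kudu cluster or Flink job.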