[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614390#comment-16614390
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217605192
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -89,77 +88,95 @@ public RecordWriter(ResultPartitionWriter writer, ChannelSelector channelSele
 
 	this.numChannels = writer.getNumberOfSubpartitions();
 
-	/*
-	 * The runtime exposes a channel abstraction for the produced results
-	 * (see {@link ChannelSelector}). Every channel has an independent
-	 * serializer.
-	 */
-	this.serializers = new SpanningRecordSerializer[numChannels];
+	this.serializer = new SpanningRecordSerializer();
 	this.bufferBuilders = new Optional[numChannels];
+	this.broadcastChannels = new int[numChannels];
 	for (int i = 0; i < numChannels; i++) {
-		serializers[i] = new SpanningRecordSerializer();
+		broadcastChannels[i] = i;
 		bufferBuilders[i] = Optional.empty();
 	}
 }
 
 public void emit(T record) throws IOException, InterruptedException {
-	for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
-		sendToTarget(record, targetChannel);
-	}
+	emit(record, channelSelector.selectChannels(record, numChannels));
 }
 
 /**
  * This is used to broadcast Streaming Watermarks in-band with records. This ignores
  * the {@link ChannelSelector}.
  */
 public void broadcastEmit(T record) throws IOException, InterruptedException {
-	for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
-		sendToTarget(record, targetChannel);
-	}
+	emit(record, broadcastChannels);
 }
 
 /**
  * This is used to send LatencyMarks to a random target channel.
  */
 public void randomEmit(T record) throws IOException, InterruptedException {
-	sendToTarget(record, rng.nextInt(numChannels));
+	serializer.serializeRecord(record);
+
+	if (copyFromSerializerToTargetChannel(rng.nextInt(numChannels))) {
+		serializer.prune();
+	}
 }
 
-private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
-	RecordSerializer serializer = serializers[targetChannel];
+private void emit(T record, int[] targetChannels) throws IOException, InterruptedException {
+	serializer.serializeRecord(record);
+
+	boolean pruneAfterCopying = false;
+	for (int channel : targetChannels) {
+		if (copyFromSerializerToTargetChannel(channel)) {
+			pruneAfterCopying = true;
+		}
+	}
 
-	SerializationResult result = serializer.addRecord(record);
+	// Make sure we don't hold onto the large intermediate serialization buffer for too long
+	if (pruneAfterCopying) {
+		serializer.prune();
+	}
+}
 
+/**
+ * @param targetChannel
+ * @return true if the intermediate serialization buffer should be pruned
+ */
+private boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException {
+	// We should reset the initial position of the intermediate serialization buffer before
+	// copying, so the serialization results can be copied to multiple target buffers.
+	serializer.reset();
+
+	boolean pruneTriggered = false;
+	BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
+	SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
 	while (result.isFullBuffer()) {
-		if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
-			// If this was a full record, we are done. Not breaking
-			// out of the loop at this point will lead to another
-			// buffer request before breaking out (that would not be
-			// a problem per se, but it can lead to stalls in the
-			// pipeline).
-

[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614389#comment-16614389
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217605083
 
 

 ##
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 ##
 @@ -377,6 +388,86 @@ public void testBroadcastEmitBufferIndependence() throws Exception {
 	assertEquals("Buffer 2 shares the same reader index as buffer 1", 0, buffer2.getReaderIndex());
 }
 
+/**
+ * Tests that records are broadcast via {@link ChannelSelector} and
+ * {@link RecordWriter#emit(IOReadableWritable)}.
+ */
+@Test
+public void testEmitRecordWithBroadcastPartitioner() throws Exception {
+	emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false);
+}
+
+/**
+ * Tests that records are broadcast via {@link RecordWriter#broadcastEmit(IOReadableWritable)}.
+ */
+@Test
+public void testBroadcastEmitRecord() throws Exception {
+	emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true);
+}
+
+/**
+ * The results of emitting records via BroadcastPartitioner or broadcasting records directly are the same,
+ * that is all the target channels can receive the whole outputs.
+ *
+ * @param isBroadcastEmit whether using {@link RecordWriter#broadcastEmit(IOReadableWritable)} or not
+ */
+private void emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean isBroadcastEmit) throws Exception {
+	final int numChannels = 4;
+	final int bufferSize = 32;
+	final int numValues = 8;
+	final int serializationLength = 4;
+
+	@SuppressWarnings("unchecked")
+	final Queue[] queues = new Queue[numChannels];
+	for (int i = 0; i < numChannels; i++) {
+		queues[i] = new ArrayDeque<>();
+	}
+
+	final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
+	final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
+	final RecordWriter writer = new RecordWriter<>(partitionWriter, new Broadcast<>());
 
 Review comment:
   Yes, that makes sense.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output to multiple channels via 
> {{ChannelSelector}} or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, which 
> means the output is serialized as many times as the number of selected 
> channels.
> Data serialization is a costly operation, so serializing each record only 
> once can bring a significant benefit.
> I would suggest the following changes to realize it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> all the different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for the different channels.
> An additional benefit of using a single serializer for all channels is a 
> potentially significant reduction in heap space overhead from fewer 
> intermediate serialization buffers (these buffers were pruned back to 128B 
> only once they grew beyond 5MiB!).
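
To make the intended change concrete, here is a minimal, self-contained sketch of the serialize-once / copy-per-channel idea. The class and buffer type below are illustrative assumptions only, not the actual RecordWriter or BufferBuilder API:

{code:java}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Sketch only: serialize a record once, then copy the bytes to every target channel. */
public class SerializeOnceSketch {

	/** Stand-in for a per-channel BufferBuilder; the real Flink class has a different API. */
	static class ChannelBuffer {
		final List<byte[]> chunks = new ArrayList<>();

		void append(ByteBuffer serialized) {
			byte[] copy = new byte[serialized.remaining()];
			serialized.duplicate().get(copy); // read from a duplicate so the source position is untouched
			chunks.add(copy);
		}
	}

	public static void main(String[] args) {
		// 1. Serialize the record ONCE into an intermediate buffer.
		ByteBuffer intermediate = ByteBuffer.wrap("some record".getBytes(StandardCharsets.UTF_8));

		// 2. Copy the same serialization result to every selected channel.
		ChannelBuffer[] channels = new ChannelBuffer[4];
		for (int i = 0; i < channels.length; i++) {
			channels[i] = new ChannelBuffer();
			channels[i].append(intermediate);
		}
		System.out.println("copied " + intermediate.remaining() + " bytes to " + channels.length + " channels");
	}
}
{code}

The essential point, as in the pull request, is that step 1 happens once per record regardless of how many channels are selected in step 2.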



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-09-13 Thread GitBox
zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217605192
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -89,77 +88,95 @@ public RecordWriter(ResultPartitionWriter writer, ChannelSelector channelSele
 
 	this.numChannels = writer.getNumberOfSubpartitions();
 
-	/*
-	 * The runtime exposes a channel abstraction for the produced results
-	 * (see {@link ChannelSelector}). Every channel has an independent
-	 * serializer.
-	 */
-	this.serializers = new SpanningRecordSerializer[numChannels];
+	this.serializer = new SpanningRecordSerializer();
 	this.bufferBuilders = new Optional[numChannels];
+	this.broadcastChannels = new int[numChannels];
 	for (int i = 0; i < numChannels; i++) {
-		serializers[i] = new SpanningRecordSerializer();
+		broadcastChannels[i] = i;
 		bufferBuilders[i] = Optional.empty();
 	}
 }
 
 public void emit(T record) throws IOException, InterruptedException {
-	for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
-		sendToTarget(record, targetChannel);
-	}
+	emit(record, channelSelector.selectChannels(record, numChannels));
 }
 
 /**
  * This is used to broadcast Streaming Watermarks in-band with records. This ignores
  * the {@link ChannelSelector}.
  */
 public void broadcastEmit(T record) throws IOException, InterruptedException {
-	for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
-		sendToTarget(record, targetChannel);
-	}
+	emit(record, broadcastChannels);
 }
 
 /**
  * This is used to send LatencyMarks to a random target channel.
  */
 public void randomEmit(T record) throws IOException, InterruptedException {
-	sendToTarget(record, rng.nextInt(numChannels));
+	serializer.serializeRecord(record);
+
+	if (copyFromSerializerToTargetChannel(rng.nextInt(numChannels))) {
+		serializer.prune();
+	}
 }
 
-private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
-	RecordSerializer serializer = serializers[targetChannel];
+private void emit(T record, int[] targetChannels) throws IOException, InterruptedException {
+	serializer.serializeRecord(record);
+
+	boolean pruneAfterCopying = false;
+	for (int channel : targetChannels) {
+		if (copyFromSerializerToTargetChannel(channel)) {
+			pruneAfterCopying = true;
+		}
+	}
 
-	SerializationResult result = serializer.addRecord(record);
+	// Make sure we don't hold onto the large intermediate serialization buffer for too long
+	if (pruneAfterCopying) {
+		serializer.prune();
+	}
+}
 
+/**
+ * @param targetChannel
+ * @return true if the intermediate serialization buffer should be pruned
+ */
+private boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException {
+	// We should reset the initial position of the intermediate serialization buffer before
+	// copying, so the serialization results can be copied to multiple target buffers.
+	serializer.reset();
+
+	boolean pruneTriggered = false;
+	BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
+	SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
 	while (result.isFullBuffer()) {
-		if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
-			// If this was a full record, we are done. Not breaking
-			// out of the loop at this point will lead to another
-			// buffer request before breaking out (that would not be
-			// a problem per se, but it can lead to stalls in the
-			// pipeline).
-			if (result.isFullRecord()) {
-				break;
-			}
+		tryFinishCurrentBufferBuilder(targetChannel);
+
+		// 

[GitHub] zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-09-13 Thread GitBox
zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217605083
 
 

 ##
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 ##
 @@ -377,6 +388,86 @@ public void testBroadcastEmitBufferIndependence() throws Exception {
 	assertEquals("Buffer 2 shares the same reader index as buffer 1", 0, buffer2.getReaderIndex());
 }
 
+/**
+ * Tests that records are broadcast via {@link ChannelSelector} and
+ * {@link RecordWriter#emit(IOReadableWritable)}.
+ */
+@Test
+public void testEmitRecordWithBroadcastPartitioner() throws Exception {
+	emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false);
+}
+
+/**
+ * Tests that records are broadcast via {@link RecordWriter#broadcastEmit(IOReadableWritable)}.
+ */
+@Test
+public void testBroadcastEmitRecord() throws Exception {
+	emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true);
+}
+
+/**
+ * The results of emitting records via BroadcastPartitioner or broadcasting records directly are the same,
+ * that is all the target channels can receive the whole outputs.
+ *
+ * @param isBroadcastEmit whether using {@link RecordWriter#broadcastEmit(IOReadableWritable)} or not
+ */
+private void emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean isBroadcastEmit) throws Exception {
+	final int numChannels = 4;
+	final int bufferSize = 32;
+	final int numValues = 8;
+	final int serializationLength = 4;
+
+	@SuppressWarnings("unchecked")
+	final Queue[] queues = new Queue[numChannels];
+	for (int i = 0; i < numChannels; i++) {
+		queues[i] = new ArrayDeque<>();
+	}
+
+	final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
+	final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
+	final RecordWriter writer = new RecordWriter<>(partitionWriter, new Broadcast<>());
 
 Review comment:
   Yes, that makes sense.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10310) Cassandra Sink - Handling failing requests

2018-09-13 Thread Jayant Ameta (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614371#comment-16614371
 ] 

Jayant Ameta commented on FLINK-10310:
--

Hi [~till.rohrmann]

{{CassandraSinkBase}} would have a field similar to 
{{ActionRequestFailureHandler}}.

In the {{checkAsyncErrors}} method, the failureHandler would be called instead 
of throwing the {{IOException}}.

Current code snippet
{code:java}
private void checkAsyncErrors() throws Exception {
Throwable error = exception;
if (error != null) {
// prevent throwing duplicated error
exception = null;
throw new IOException("Error while sending value.", 
error);
}
}
{code}

would change to:
{code:java}
private void checkAsyncErrors() throws Exception {
Throwable error = exception;
if (error != null) {
failureHandler.onFailure(error);
}
}
{code}

Here the {{failureHandler}} can decide what steps to take based on the 
{{Throwable}}.
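
For illustration, a hedged sketch of what such a handler could look like, modeled on the ElasticSearch connector's {{ActionRequestFailureHandler}}. The interface name and the constant are assumptions, and the example implementation assumes the DataStax 3.x driver's {{WriteTimeoutException}} is on the classpath:

{code:java}
import java.io.IOException;
import java.io.Serializable;

import com.datastax.driver.core.exceptions.WriteTimeoutException;

/** Hypothetical handler, named here only for illustration. */
public interface CassandraFailureHandler extends Serializable {

	/** Called for every asynchronous write failure; may swallow, log, or rethrow it. */
	void onFailure(Throwable failure) throws IOException;

	/** Example: ignore write timeouts, fail the job for everything else. */
	CassandraFailureHandler IGNORE_WRITE_TIMEOUTS = failure -> {
		if (!(failure instanceof WriteTimeoutException)) {
			throw new IOException("Error while sending value.", failure);
		}
	};
}
{code}

When wiring this into {{checkAsyncErrors}}, the {{exception = null;}} reset from the current code should probably be kept before calling the handler, so the same error is not reported twice.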


> Cassandra Sink - Handling failing requests
> --
>
> Key: FLINK-10310
> URL: https://issues.apache.org/jira/browse/FLINK-10310
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Reporter: Jayant Ameta
>Priority: Major
>
> The cassandra sink fails for any kind of error. For some exceptions (e.g 
> WriteTimeoutException), ignoring the exception may be acceptable as well.
> Can we discuss having a FailureHandler on the lines of 
> ActionRequestFailureHandler?
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/WriteTimeoutException-in-Cassandra-sink-kill-the-job-tp22706p22707.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10240) Pluggable scheduling strategy for batch job

2018-09-13 Thread JIRA


[ 
https://issues.apache.org/jira/browse/FLINK-10240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614335#comment-16614335
 ] 

陈梓立 commented on FLINK-10240:
-

Introducing a pluggable scheduling strategy is an excellent idea that could 
greatly expand the range of cases Flink is able to handle.

I like this idea and can help. However, the document attached above is 
read-only, so I am leaving my comments in a copy of it linked below. Most of 
them are layout improvements and minor rewording; the body of the document does 
not go beyond the original design.

https://docs.google.com/document/d/15pUYc5_yrY2IwmnADCoNWZwOIYCOcroWmuuZHs-vdlU/edit?usp=sharing

Note that this is an EDITABLE document and everyone interested in it can leave 
comments or edit it directly. As an open source project we trust our 
contributors, and the document can be frozen and left comment-only once the 
discussion reaches a consensus.

> Pluggable scheduling strategy for batch job
> ---
>
> Key: FLINK-10240
> URL: https://issues.apache.org/jira/browse/FLINK-10240
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination
>Reporter: Zhu Zhu
>Priority: Major
>  Labels: scheduling
>
> Currently batch jobs are scheduled with the LAZY_FROM_SOURCES strategy: source 
> tasks are scheduled in the beginning, and other tasks are scheduled once 
> their input data are consumable.
> However, consumable input data does not always mean the task can do useful 
> work at once. 
>  
> One example is the hash join operation, where the operator first consumes one 
> side (we call it the build side) to set up a table, then consumes the other 
> side (we call it the probe side) to do the real join work. If the probe side 
> is started early, it just gets stuck on back pressure, as the join operator 
> will not consume data from it before the build stage is done, causing a waste 
> of resources.
> If the probe side task is started only after the build stage is done, both 
> the build and probe sides can get more computing resources as they are 
> staggered.
>  
> That's why we think a flexible scheduling strategy is needed, allowing job 
> owners to customize the vertex scheduling order and constraints. Better 
> resource utilization usually means better performance.
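
To make the idea tangible, here is a rough sketch of the kind of hook a pluggable strategy might expose. All names below are assumptions for illustration; the linked design document is the authoritative proposal:

{code:java}
import java.util.Set;

/** Illustrative sketch of a pluggable scheduling strategy, not Flink's actual interface. */
public interface SchedulingStrategySketch {

	/** Called once when the job starts; typically schedules the source vertices. */
	void startScheduling();

	/** Called whenever a vertex finishes, so a strategy can react, e.g. start the
	 *  probe side of a hash join only after all build-side vertices finished. */
	void onVertexFinished(String finishedVertexId, Set<String> downstreamVertexIds);
}
{code}

A hash-join-aware strategy would delay scheduling the probe-side vertices in {{onVertexFinished}} until the build side has reported completion, which is exactly the staggering described above.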



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10319) Too many requestPartitionState would crash JM

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614325#comment-16614325
 ] 

ASF GitHub Bot commented on FLINK-10319:


TisonKun commented on issue #6680: [FLINK-10319] [runtime] Too many 
requestPartitionState would crash JM
URL: https://github.com/apache/flink/pull/6680#issuecomment-421222747
 
 
   cc @tillrohrmann 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Too many requestPartitionState would crash JM
> -
>
> Key: FLINK-10319
> URL: https://issues.apache.org/jira/browse/FLINK-10319
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: 陈梓立
>Assignee: 陈梓立
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Do not requestPartitionState from the JM on partition request failure, which 
> may generate too many RPC requests and block the JM.
> We gain little benefit from checking what state the producer is in, while on 
> the other hand such checks can crash the JM with too many RPC requests. A 
> task can always retriggerPartitionRequest from its InputGate; the retry will 
> fail if the producer is gone and succeed if the producer is alive. Either 
> way, there is no need to ask the JM for help.
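
As a purely illustrative sketch of the proposed error handling (all names below are placeholders, not Flink's actual classes or methods), the failed request is simply retried against the producer instead of triggering an RPC to the JM:

{code:java}
/** Sketch only: handle a failed partition request locally, without asking the JobManager. */
class PartitionRequestFailureSketch {

	interface GateSketch {
		/** Re-issue the request; succeeds if the producer is alive, fails if it is gone. */
		void retriggerPartitionRequest(String partitionId) throws Exception;
	}

	void onPartitionRequestFailure(GateSketch inputGate, String partitionId) throws Exception {
		// No requestPartitionState RPC to the JobManager: just retry against the producer.
		// If the producer has gone away, the retry fails and the task fails as before.
		inputGate.retriggerPartitionRequest(partitionId);
	}
}
{code}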



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] TisonKun commented on issue #6680: [FLINK-10319] [runtime] Too many requestPartitionState would crash JM

2018-09-13 Thread GitBox
TisonKun commented on issue #6680: [FLINK-10319] [runtime] Too many 
requestPartitionState would crash JM
URL: https://github.com/apache/flink/pull/6680#issuecomment-421222747
 
 
   cc @tillrohrmann 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10240) Pluggable scheduling strategy for batch job

2018-09-13 Thread Zhu Zhu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614296#comment-16614296
 ] 

Zhu Zhu commented on FLINK-10240:
-

Here's the design for a pluggable scheduling mechanism, which I think can 
achieve customizable and flexible scheduling to fit for different operators and 
topologies.

https://docs.google.com/document/d/1zAseuBnqNXg3pst3vLBTc8yGOUo485J2LVWBAdFCW9I/edit?usp=sharing

> Pluggable scheduling strategy for batch job
> ---
>
> Key: FLINK-10240
> URL: https://issues.apache.org/jira/browse/FLINK-10240
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination
>Reporter: Zhu Zhu
>Priority: Major
>  Labels: scheduling
>
> Currently batch jobs are scheduled with the LAZY_FROM_SOURCES strategy: source 
> tasks are scheduled in the beginning, and other tasks are scheduled once 
> their input data are consumable.
> However, consumable input data does not always mean the task can do useful 
> work at once. 
>  
> One example is the hash join operation, where the operator first consumes one 
> side (we call it the build side) to set up a table, then consumes the other 
> side (we call it the probe side) to do the real join work. If the probe side 
> is started early, it just gets stuck on back pressure, as the join operator 
> will not consume data from it before the build stage is done, causing a waste 
> of resources.
> If the probe side task is started only after the build stage is done, both 
> the build and probe sides can get more computing resources as they are 
> staggered.
>  
> That's why we think a flexible scheduling strategy is needed, allowing job 
> owners to customize the vertex scheduling order and constraints. Better 
> resource utilization usually means better performance.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9991) Add regexp_replace supported in TableAPI and SQL

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614294#comment-16614294
 ] 

ASF GitHub Bot commented on FLINK-9991:
---

yanghua commented on issue #6450: [FLINK-9991] [table] Add regexp_replace 
supported in TableAPI and SQL
URL: https://github.com/apache/flink/pull/6450#issuecomment-421216764
 
 
   @xccui I have refactored this PR. Can you review it again?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add regexp_replace supported in TableAPI and SQL
> 
>
> Key: FLINK-9991
> URL: https://issues.apache.org/jira/browse/FLINK-9991
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> regexp_replace is a very useful function for processing Strings. 
>  For example:
> {code:java}
> regexp_replace("foobar", "oo|ar", "") // returns 'fb'
> {code}
> It is supported as a UDF in Hive; for more details please see [1].
> [1]: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF
>  
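
The semantics match plain Java regex replacement, which is an easy way to sanity-check the expected result; this is only an illustration of the semantics, not the Flink implementation:

{code:java}
public class RegexpReplaceDemo {
	public static void main(String[] args) {
		// Same as the Hive example above: remove every match of "oo" or "ar".
		String result = "foobar".replaceAll("oo|ar", "");
		System.out.println(result); // prints: fb
	}
}
{code}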



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on issue #6450: [FLINK-9991] [table] Add regexp_replace supported in TableAPI and SQL

2018-09-13 Thread GitBox
yanghua commented on issue #6450: [FLINK-9991] [table] Add regexp_replace 
supported in TableAPI and SQL
URL: https://github.com/apache/flink/pull/6450#issuecomment-421216764
 
 
   @xccui I have refactored this PR. Can you review it again?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-10240) Pluggable scheduling strategy for batch job

2018-09-13 Thread Zhu Zhu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-10240:

Summary: Pluggable scheduling strategy for batch job  (was: Flexible 
scheduling strategy is needed for batch job)

> Pluggable scheduling strategy for batch job
> ---
>
> Key: FLINK-10240
> URL: https://issues.apache.org/jira/browse/FLINK-10240
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination
>Reporter: Zhu Zhu
>Priority: Major
>  Labels: scheduling
>
> Currently batch jobs are scheduled with the LAZY_FROM_SOURCES strategy: source 
> tasks are scheduled in the beginning, and other tasks are scheduled once 
> their input data are consumable.
> However, consumable input data does not always mean the task can do useful 
> work at once. 
>  
> One example is the hash join operation, where the operator first consumes one 
> side (we call it the build side) to set up a table, then consumes the other 
> side (we call it the probe side) to do the real join work. If the probe side 
> is started early, it just gets stuck on back pressure, as the join operator 
> will not consume data from it before the build stage is done, causing a waste 
> of resources.
> If the probe side task is started only after the build stage is done, both 
> the build and probe sides can get more computing resources as they are 
> staggered.
>  
> That's why we think a flexible scheduling strategy is needed, allowing job 
> owners to customize the vertex scheduling order and constraints. Better 
> resource utilization usually means better performance.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10332) Move data available notification in PipelinedSubpartition out of the synchronized block

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614131#comment-16614131
 ] 

ASF GitHub Bot commented on FLINK-10332:


NicoK opened a new pull request #6693: [FLINK-10332][network] move data 
notification out of the synchronized block
URL: https://github.com/apache/flink/pull/6693
 
 
   ## What is the purpose of the change
   
   Currently, calls to `PipelinedSubpartition#notifyDataAvailable()` are 
unnecessarily executed inside a `synchronized (buffers)` block, which may lead 
to lock contention; this PR fixes that.
   
   Please note that this builds upon #6692.
   
   ## Brief change log
   
   - move data notification out of the synchronized block
   
   ## Verifying this change
   
   This change is already covered by existing tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): **no**
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
 - The serializers: **no**
 - The runtime per-record code paths (performance sensitive): **yes** 
(depending on output flusher interval, rather per buffer)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
 - The S3 file system connector: **no**
   
   ## Documentation
   
 - Does this pull request introduce a new feature? **no**
 - If yes, how is the feature documented? **not applicable**
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Move data available notification in PipelinedSubpartition out of the 
> synchronized block
> ---
>
> Key: FLINK-10332
> URL: https://issues.apache.org/jira/browse/FLINK-10332
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0, 1.7.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> Currently, calls to {{PipelinedSubpartition#notifyDataAvailable();}} are 
> unnecessarily executed inside a {{synchronized (buffers)}} block which may 
> lead to lock contention.
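
A generic before/after sketch of the pattern (not the actual {{PipelinedSubpartition}} code): decide inside the lock whether a notification is needed, but invoke the possibly slow listener outside of it, so threads touching the buffer queue are not blocked:

{code:java}
import java.util.ArrayDeque;

/** Sketch only: move the availability notification out of the synchronized block. */
class NotifyOutsideLockSketch {

	private final ArrayDeque<Object> buffers = new ArrayDeque<>();
	private final Runnable availabilityListener;

	NotifyOutsideLockSketch(Runnable availabilityListener) {
		this.availabilityListener = availabilityListener;
	}

	void add(Object buffer) {
		boolean notifyListener;
		synchronized (buffers) {
			buffers.add(buffer);
			// Decide inside the lock whether the consumer needs to be woken up ...
			notifyListener = buffers.size() == 1;
		}
		// ... but run the (potentially slow) callback outside of it.
		if (notifyListener) {
			availabilityListener.run();
		}
	}
}
{code}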



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10332) Move data available notification in PipelinedSubpartition out of the synchronized block

2018-09-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10332:
---
Labels: pull-request-available  (was: )

> Move data available notification in PipelinedSubpartition out of the 
> synchronized block
> ---
>
> Key: FLINK-10332
> URL: https://issues.apache.org/jira/browse/FLINK-10332
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0, 1.7.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> Currently, calls to {{PipelinedSubpartition#notifyDataAvailable();}} are 
> unnecessarily executed inside a {{synchronized (buffers)}} block which may 
> lead to lock contention.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] NicoK opened a new pull request #6693: [FLINK-10332][network] move data notification out of the synchronized block

2018-09-13 Thread GitBox
NicoK opened a new pull request #6693: [FLINK-10332][network] move data 
notification out of the synchronized block
URL: https://github.com/apache/flink/pull/6693
 
 
   ## What is the purpose of the change
   
   Currently, calls to `PipelinedSubpartition#notifyDataAvailable()` are 
unnecessarily executed inside a `synchronized (buffers)` block, which may lead 
to lock contention; this PR fixes that.
   
   Please note that this builds upon #6692.
   
   ## Brief change log
   
   - move data notification out of the synchronized block
   
   ## Verifying this change
   
   This change is already covered by existing tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): **no**
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
 - The serializers: **no**
 - The runtime per-record code paths (performance sensitive): **yes** 
(depending on output flusher interval, rather per buffer)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
 - The S3 file system connector: **no**
   
   ## Documentation
   
 - Does this pull request introduce a new feature? **no**
 - If yes, how is the feature documented? **not applicable**
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10222) Table scalar function expression parses error when function name equals the exists keyword suffix

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614130#comment-16614130
 ] 

ASF GitHub Bot commented on FLINK-10222:


fhueske commented on a change in pull request #6622: [FLINK-10222] [table] 
Table scalar function expression parses error when function name equals the 
exists keyword suffix
URL: https://github.com/apache/flink/pull/6622#discussion_r217554963
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
 ##
 @@ -45,6 +45,13 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
  ("""(?i)\Q""" + kw.key + """\E""").r
    }
 
+  // Convert the keyword into a case-insensitive Parser.
+  // It uses exact matching; intended scenario: a keyword used as a suffix with
+  // no required parameters, which must not match when it is merely the prefix of a longer name.
+  def keyword2ParserForSuffixButNotAsPrefix(kw: Keyword): Parser[String] = {
 
 Review comment:
   Yes, I agree with @walterddr. This seems to be a better approach. 
   A keyword should only match if it is not followed by another identifier 
character (see the [Java identifier 
specs](https://docs.oracle.com/javase/specs/jls/se7/html/jls-3.html#jls-3.8)). 
   
   What do you think @yanghua?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Table scalar function expression parses error when function name equals the 
> exists keyword suffix
> -
>
> Key: FLINK-10222
> URL: https://issues.apache.org/jira/browse/FLINK-10222
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Suffix extraction in ExpressionParser.scala does not actually support 
> extraction of keywords with the same prefix. For example: Adding suffix 
> parsing rules for {{a.fun}} and {{a.function}} simultaneously will causes 
> exceptions. 
> some discussion : 
> [https://github.com/apache/flink/pull/6432#issuecomment-416127815]
> https://github.com/apache/flink/pull/6585#discussion_r212797015
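
The suggestion in the review discussion (match a keyword only when it is not followed by another identifier character) can be illustrated with a plain Java regex using a negative lookahead. This is only a demonstration of the matching rule, not the actual Scala {{ExpressionParser}} change:

{code:java}
import java.util.regex.Pattern;

public class KeywordSuffixDemo {
	public static void main(String[] args) {
		// Match "fun" case-insensitively, but only if no identifier character follows,
		// so "a.fun" matches while "a.function" does not match via its "fun" prefix.
		Pattern keyword = Pattern.compile("(?i)\\Qfun\\E(?![\\p{javaJavaIdentifierPart}])");

		System.out.println(keyword.matcher("a.fun").find());      // true
		System.out.println(keyword.matcher("a.function").find()); // false
	}
}
{code}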



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] fhueske commented on a change in pull request #6622: [FLINK-10222] [table] Table scalar function expression parses error when function name equals the exists keyword suffix

2018-09-13 Thread GitBox
fhueske commented on a change in pull request #6622: [FLINK-10222] [table] 
Table scalar function expression parses error when function name equals the 
exists keyword suffix
URL: https://github.com/apache/flink/pull/6622#discussion_r217554963
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
 ##
 @@ -45,6 +45,13 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
  ("""(?i)\Q""" + kw.key + """\E""").r
    }
 
+  // Convert the keyword into a case-insensitive Parser.
+  // It uses exact matching; intended scenario: a keyword used as a suffix with
+  // no required parameters, which must not match when it is merely the prefix of a longer name.
+  def keyword2ParserForSuffixButNotAsPrefix(kw: Keyword): Parser[String] = {
 
 Review comment:
   Yes, I agree with @walterddr. This seems to be a better approach. 
   A keyword should only match if it is not followed by another identifier 
character (see the [Java identifier 
specs](https://docs.oracle.com/javase/specs/jls/se7/html/jls-3.html#jls-3.8)). 
   
   What do you think @yanghua?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10331) Fix unnecessary flush requests to the network stack

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614128#comment-16614128
 ] 

ASF GitHub Bot commented on FLINK-10331:


NicoK opened a new pull request #6692: [FLINK-10331][network] reduce unnecesary 
flushing
URL: https://github.com/apache/flink/pull/6692
 
 
   ## What is the purpose of the change
   
   With the re-design of the record writer interaction with the 
result(sub)partitions, flush requests can currently pile up in these scenarios:
   - a previous flush request has not been completely handled yet and/or is 
still enqueued or
   - the network stack is still polling from this subpartition and doesn't need 
a new notification
   
   These lead to increased notifications in low latency settings (low output 
flusher intervals) which can be avoided.
   
   ## Brief change log
   
   - do not flush (again) in the scenarios mentioned above, relying on 
`flushRequested` and the `buffer` queue size
   - add intensive sanity checks to `SpillingAdaptiveSpanningRecordDeserializer`
   - several smaller improvement hotfixes (please see the individual commits)
   
   ## Verifying this change
   
   This change is already covered by existing tests plus a few new tests in 
`PipelinedSubpartitionTest`.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): **no**
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
 - The serializers: **no**
 - The runtime per-record code paths (performance sensitive): **yes** 
(depending on output flusher interval, rather per buffer)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
 - The S3 file system connector: **no**
   
   ## Documentation
   
 - Does this pull request introduce a new feature? **no**
 - If yes, how is the feature documented? **JavaDocs**
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fix unnecessary flush requests to the network stack
> ---
>
> Key: FLINK-10331
> URL: https://issues.apache.org/jira/browse/FLINK-10331
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0, 1.7.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> With the re-design of the record writer interaction with the 
> result(sub)partitions, flush requests can currently pile up in these 
> scenarios:
> - a previous flush request has not been completely handled yet and/or is 
> still enqueued or
> - the network stack is still polling from this subpartition and doesn't need 
> a new notification
> These lead to increased notifications in low latency settings (low output 
> flusher intervals) which can be avoided.
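
A simplified sketch of the suppression logic described above, keyed off a {{flushRequested}} flag and the buffer queue size; the names and the exact condition are illustrative assumptions, not the actual {{PipelinedSubpartition}} code:

{code:java}
import java.util.ArrayDeque;

/** Sketch only: avoid redundant flush notifications. */
class FlushSuppressionSketch {

	private final ArrayDeque<Object> buffers = new ArrayDeque<>();
	private final Runnable dataAvailableListener;
	private boolean flushRequested;

	FlushSuppressionSketch(Runnable dataAvailableListener) {
		this.dataAvailableListener = dataAvailableListener;
	}

	void flush() {
		boolean notify;
		synchronized (buffers) {
			// Notify only if there is data, no flush is already pending, and the
			// consumer is not still polling (i.e. exactly one buffer is queued).
			notify = !buffers.isEmpty() && !flushRequested && buffers.size() == 1;
			flushRequested = flushRequested || !buffers.isEmpty();
		}
		if (notify) {
			dataAvailableListener.run();
		}
	}
}
{code}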



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10331) Fix unnecessary flush requests to the network stack

2018-09-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10331:
---
Labels: pull-request-available  (was: )

> Fix unnecessary flush requests to the network stack
> ---
>
> Key: FLINK-10331
> URL: https://issues.apache.org/jira/browse/FLINK-10331
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0, 1.7.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> With the re-design of the record writer interaction with the 
> result(sub)partitions, flush requests can currently pile up in these 
> scenarios:
> - a previous flush request has not been completely handled yet and/or is 
> still enqueued or
> - the network stack is still polling from this subpartition and doesn't need 
> a new notification
> These lead to increased notifications in low latency settings (low output 
> flusher intervals) which can be avoided.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] NicoK opened a new pull request #6692: [FLINK-10331][network] reduce unnecesary flushing

2018-09-13 Thread GitBox
NicoK opened a new pull request #6692: [FLINK-10331][network] reduce unnecesary 
flushing
URL: https://github.com/apache/flink/pull/6692
 
 
   ## What is the purpose of the change
   
   With the re-design of the record writer interaction with the 
result(sub)partitions, flush requests can currently pile up in these scenarios:
   - a previous flush request has not been completely handled yet and/or is 
still enqueued or
   - the network stack is still polling from this subpartition and doesn't need 
a new notification
   
   These lead to increased notifications in low latency settings (low output 
flusher intervals) which can be avoided.
   
   ## Brief change log
   
   - do not flush (again) in the scenarios mentioned above, relying on 
`flushRequested` and the `buffer` queue size
   - add intensive sanity checks to `SpillingAdaptiveSpanningRecordDeserializer`
   - several smaller improvement hotfixes (please see the individual commits)
   
   ## Verifying this change
   
   This change is already covered by existing tests plus a few new tests in 
`PipelinedSubpartitionTest`.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): **no**
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
 - The serializers: **no**
 - The runtime per-record code paths (performance sensitive): **yes** 
(depending on output flusher interval, rather per buffer)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
 - The S3 file system connector: **no**
   
   ## Documentation
   
 - Does this pull request introduce a new feature? **no**
 - If yes, how is the feature documented? **JavaDocs**
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10290) Conversion error in StreamScan and BatchScan

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614115#comment-16614115
 ] 

ASF GitHub Bot commented on FLINK-10290:


fhueske commented on a change in pull request #6666: [FLINK-10290] [table] Fix 
conversion error in StreamScan and BatchScan
URL: https://github.com/apache/flink/pull/6666#discussion_r217547642
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala
 ##
 @@ -742,6 +742,42 @@ class TableSourceITCase(
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  @Test
+  def testTableSourceScanWithConversion(): Unit = {
 
 Review comment:
   Rename to `testTableSourceScanWithPermutedFields`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Conversion error in StreamScan and BatchScan
> 
>
> Key: FLINK-10290
> URL: https://issues.apache.org/jira/browse/FLINK-10290
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.5.3, 1.6.0
>Reporter: wangsan
>Assignee: wangsan
>Priority: Major
>  Labels: pull-request-available
>
> `RowTypeInfo#equals()` only compares field types, and field names are not 
> considered. When checking the equality of `inputType` and `internalType`, we 
> should compare both field types and field names.
> Behavior of this bug:
> A table T with schema (a: Long, b:Long, c:Long)
> SELECT b,c,a from T
> expected: b,c,a
> actually: a,b,c
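
The review comments above suggest a {{schemaEquals()}} helper; a hedged sketch of such a check (assuming {{RowTypeInfo}}'s standard {{getFieldNames()}} accessor) could look like this:

{code:java}
import java.util.Arrays;

import org.apache.flink.api.java.typeutils.RowTypeInfo;

/** Sketch only: equality that considers field names as well as field types. */
final class SchemaEquality {

	static boolean schemaEquals(RowTypeInfo left, RowTypeInfo right) {
		// RowTypeInfo#equals() ignores field names, so compare them explicitly.
		return left.equals(right)
			&& Arrays.equals(left.getFieldNames(), right.getFieldNames());
	}
}
{code}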



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10290) Conversion error in StreamScan and BatchScan

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614114#comment-16614114
 ] 

ASF GitHub Bot commented on FLINK-10290:


fhueske commented on a change in pull request #6666: [FLINK-10290] [table] Fix 
conversion error in StreamScan and BatchScan
URL: https://github.com/apache/flink/pull/6666#discussion_r217544935
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
 ##
 @@ -50,11 +51,12 @@ trait StreamScan extends CommonScan[CRow] with 
DataStreamRel {
   f == TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER ||
   f == TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER)
 
-if (input.getType == cRowType && !hasTimeIndicator) {
+if (inputType == cRowType && !hasTimeIndicator) {
 
 Review comment:
   `CRowTypeInfo` doesn't check for field name equality either. Can you add a 
`schemaEquals()` method to `CRowTypeInfo` and call this here?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Conversion error in StreamScan and BatchScan
> 
>
> Key: FLINK-10290
> URL: https://issues.apache.org/jira/browse/FLINK-10290
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.5.3, 1.6.0
>Reporter: wangsan
>Assignee: wangsan
>Priority: Major
>  Labels: pull-request-available
>
> `RowTypeInfo#equals()` only compares field types, and field names are not 
> considered. When checking the equality of `inputType` and `internalType`, we 
> should compare both field types and field names.
> Behavior of this bug:
> A table T with schema (a: Long, b:Long, c:Long)
> SELECT b,c,a from T
> expected: b,c,a
> actually: a,b,c



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10290) Conversion error in StreamScan and BatchScan

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614116#comment-16614116
 ] 

ASF GitHub Bot commented on FLINK-10290:


fhueske commented on a change in pull request #6666: [FLINK-10290] [table] Fix 
conversion error in StreamScan and BatchScan
URL: https://github.com/apache/flink/pull/6666#discussion_r217547586
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
 ##
 @@ -778,4 +778,42 @@ class TableSourceITCase extends AbstractTestBase {
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  @Test
+  def testTableSourceScanWithConversion(): Unit = {
 
 Review comment:
   Rename to `testTableSourceScanWithPermutedFields()`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Conversion error in StreamScan and BatchScan
> 
>
> Key: FLINK-10290
> URL: https://issues.apache.org/jira/browse/FLINK-10290
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.5.3, 1.6.0
>Reporter: wangsan
>Assignee: wangsan
>Priority: Major
>  Labels: pull-request-available
>
> `RowTypeInfo#equals()` only compares field types, and field names are not 
> considered. When checking the equality of `inputType` and `internalType`, we 
> should compare both field types and field names.
> Behavior of this bug:
> A table T with schema (a: Long, b:Long, c:Long)
> SELECT b,c,a from T
> expected: b,c,a
> actually: a,b,c



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10290) Conversion error in StreamScan and BatchScan

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614113#comment-16614113
 ] 

ASF GitHub Bot commented on FLINK-10290:


fhueske commented on a change in pull request #6666: [FLINK-10290] [table] Fix 
conversion error in StreamScan and BatchScan
URL: https://github.com/apache/flink/pull/6666#discussion_r217545093
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
 ##
 @@ -50,11 +51,12 @@ trait StreamScan extends CommonScan[CRow] with 
DataStreamRel {
   f == TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER ||
   f == TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER)
 
-if (input.getType == cRowType && !hasTimeIndicator) {
+if (inputType == cRowType && !hasTimeIndicator) {
 
 Review comment:
   Would also be good to have a test for this.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Conversion error in StreamScan and BatchScan
> 
>
> Key: FLINK-10290
> URL: https://issues.apache.org/jira/browse/FLINK-10290
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.5.3, 1.6.0
>Reporter: wangsan
>Assignee: wangsan
>Priority: Major
>  Labels: pull-request-available
>
> `RowTypeInfo#equals()` only compares field types, and field names are not 
> considered. When checking the equality of `inputType` and `internalType`, we 
> should compare both field types and field names.
> Behavior of this bug:
> A table T with schema (a: Long, b:Long, c:Long)
> SELECT b,c,a from T
> expected: b,c,a
> actually: a,b,c



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] fhueske commented on a change in pull request #6666: [FLINK-10290] [table] Fix conversion error in StreamScan and BatchScan

2018-09-13 Thread GitBox
fhueske commented on a change in pull request #6666: [FLINK-10290] [table] Fix 
conversion error in StreamScan and BatchScan
URL: https://github.com/apache/flink/pull/6666#discussion_r217544935
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
 ##
 @@ -50,11 +51,12 @@ trait StreamScan extends CommonScan[CRow] with 
DataStreamRel {
   f == TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER ||
   f == TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER)
 
-if (input.getType == cRowType && !hasTimeIndicator) {
+if (inputType == cRowType && !hasTimeIndicator) {
 
 Review comment:
   `CRowTypeInfo` doesn't check for field name equality either. Can you add a 
`schemaEquals()` method to `CRowTypeInfo` and call this here?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] fhueske commented on a change in pull request #6666: [FLINK-10290] [table] Fix conversion error in StreamScan and BatchScan

2018-09-13 Thread GitBox
fhueske commented on a change in pull request #6666: [FLINK-10290] [table] Fix 
conversion error in StreamScan and BatchScan
URL: https://github.com/apache/flink/pull/6666#discussion_r217547642
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala
 ##
 @@ -742,6 +742,42 @@ class TableSourceITCase(
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  @Test
+  def testTableSourceScanWithConversion(): Unit = {
 
 Review comment:
   Rename to `testTableSourceScanWithPermutedFields`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] fhueske commented on a change in pull request #6666: [FLINK-10290] [table] Fix conversion error in StreamScan and BatchScan

2018-09-13 Thread GitBox
fhueske commented on a change in pull request #6666: [FLINK-10290] [table] Fix 
conversion error in StreamScan and BatchScan
URL: https://github.com/apache/flink/pull/6666#discussion_r217547586
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
 ##
 @@ -778,4 +778,42 @@ class TableSourceITCase extends AbstractTestBase {
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  @Test
+  def testTableSourceScanWithConversion(): Unit = {
 
 Review comment:
   Rename to `testTableSourceScanWithPermutedFields()`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] fhueske commented on a change in pull request #6666: [FLINK-10290] [table] Fix conversion error in StreamScan and BatchScan

2018-09-13 Thread GitBox
fhueske commented on a change in pull request #6666: [FLINK-10290] [table] Fix 
conversion error in StreamScan and BatchScan
URL: https://github.com/apache/flink/pull/6666#discussion_r217545093
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
 ##
 @@ -50,11 +51,12 @@ trait StreamScan extends CommonScan[CRow] with 
DataStreamRel {
   f == TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER ||
   f == TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER)
 
-if (input.getType == cRowType && !hasTimeIndicator) {
+if (inputType == cRowType && !hasTimeIndicator) {
 
 Review comment:
   Would also be good to have a test for this.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-8688) Enable distinct aggregation for data stream on Table/SQL API

2018-09-13 Thread Fabian Hueske (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-8688.

   Resolution: Implemented
Fix Version/s: 1.7.0

All sub-issues have been implemented

> Enable distinct aggregation for data stream on Table/SQL API
> 
>
> Key: FLINK-8688
> URL: https://issues.apache.org/jira/browse/FLINK-8688
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Distinct aggregation is not currently supported on data stream with Table/SQL 
> API. This is an umbrella task for enabling distinct aggregation in various 
> use cases.
> Discussion doc can be found here: 
> https://docs.google.com/document/d/1zj6OA-K2hi7ah8Fo-xTQB-mVmYfm6LsN2_NHgTCVmJI/edit?usp=sharing
>  
> Distinct aggregation is a very important feature in SQL processing and there 
> are many JIRAs currently open with various use cases. The goal is to create 
> one solution to both unbounded and bounded distinct aggregation on data 
> stream so that it can easily be extended to support these use cases.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9232) Add harness test for AggregationCodeGenerator

2018-09-13 Thread Fabian Hueske (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-9232:
-
Issue Type: Improvement  (was: Sub-task)
Parent: (was: FLINK-8688)

> Add harness test for AggregationCodeGenerator 
> --
>
> Key: FLINK-9232
> URL: https://issues.apache.org/jira/browse/FLINK-9232
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Instead of relying on ITCase to cover the codegen result. We should have 
> direct test against that, for example using Harness test framework.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8739) Optimize runtime support for distinct filter

2018-09-13 Thread Fabian Hueske (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-8739:
-
Issue Type: Improvement  (was: Sub-task)
Parent: (was: FLINK-8688)

> Optimize runtime support for distinct filter
> 
>
> Key: FLINK-8739
> URL: https://issues.apache.org/jira/browse/FLINK-8739
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Rong Rong
>Priority: Major
>
> Possible optimizations:
> 1. Decouple the distinct map and the actual accumulator so that they can be 
> created separately in codegen.
> 2. Reuse the same distinct accumulator for filtering, e.g. `SELECT 
> COUNT(DISTINCT(a)), SUM(DISTINCT(a))` should reuse the same distinct map.
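
A minimal, self-contained sketch of the second idea, using plain Java collections 
rather than Flink's actual accumulator classes (the class name {{SharedDistinctMap}} 
and the input values are made up for illustration):

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: one distinct map shared by several distinct aggregates over the same field.
public class SharedDistinctMapExample {

    // Counts how often each distinct value has been seen; shared by all distinct aggregates on 'a'.
    static class SharedDistinctMap {
        final Map<Long, Integer> counts = new HashMap<>();

        /** Returns true if the value is seen for the first time, i.e. the aggregates should be updated. */
        boolean add(long value) {
            return counts.merge(value, 1, Integer::sum) == 1;
        }
    }

    public static void main(String[] args) {
        SharedDistinctMap distinct = new SharedDistinctMap();
        long distinctCount = 0; // COUNT(DISTINCT(a))
        long distinctSum = 0;   // SUM(DISTINCT(a))

        long[] input = {1, 2, 2, 3, 1};
        for (long a : input) {
            if (distinct.add(a)) { // both aggregates consult the same map
                distinctCount++;
                distinctSum += a;
            }
        }
        System.out.println(distinctCount + " " + distinctSum); // prints: 3 6
    }
}
{code}

The point is simply that {{COUNT(DISTINCT(a))}} and {{SUM(DISTINCT(a))}} consult one 
shared map instead of each keeping its own copy.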



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5315) Support distinct aggregations in table api

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614083#comment-16614083
 ] 

ASF GitHub Bot commented on FLINK-5315:
---

asfgit closed pull request #6521: [FLINK-5315][table] Adding support for 
distinct operation for table API on DataStream
URL: https://github.com/apache/flink/pull/6521
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index f8bcd3da1af..a9b92fad995 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -370,6 +370,44 @@ Table result = orders
Note: All aggregates must be defined over the same window, 
i.e., same partitioning, sorting, and range. Currently, only windows with 
PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges 
with FOLLOWING are not supported yet. ORDER BY must be specified on a single time attribute.
   
 
+
+  
+Distinct Aggregation
+Batch Streaming 
+Result Updating
+  
+  
+Similar to a SQL DISTINCT aggregation clause such as COUNT(DISTINCT 
a). Distinct aggregation declares that an aggregation function (built-in or 
user-defined) is only applied on distinct input values. Distinct can be applied 
to GroupBy Aggregation, GroupBy Window Aggregation and Over 
Window Aggregation.
+{% highlight java %}
+Table orders = tableEnv.scan("Orders");
+// Distinct aggregation on group by
+Table groupByDistinctResult = orders
+.groupBy("a")
+.select("a, b.sum.distinct as d");
+// Distinct aggregation on time window group by
+Table groupByWindowDistinctResult = orders
+.window(Tumble.over("5.minutes").on("rowtime").as("w")).groupBy("a, w")
+.select("a, b.sum.distinct as d");
+// Distinct aggregation on over window
+Table result = orders
+.window(Over
+.partitionBy("a")
+.orderBy("rowtime")
+.preceding("UNBOUNDED_RANGE")
+.as("w"))
+.select("a, b.avg.distinct over w, b.max over w, b.min over w");
+{% endhighlight %}
+User-defined aggregation function can also be used with DISTINCT 
modifiers. To calculate the aggregate results only for distinct values, simply 
add the distinct modifier towards the aggregation function. 
+{% highlight java %}
+Table orders = tEnv.scan("Orders");
+
+// Use distinct aggregation for user-defined aggregate functions
+tEnv.registerFunction("myUdagg", new MyUdagg());
+orders.groupBy("users").select("users, myUdagg.distinct(points) as 
myDistinctResult");
+{% endhighlight %}
+Note: For streaming queries the required state to compute 
the query result might grow infinitely depending on the number of distinct 
fields. Please provide a query configuration with valid retention interval to 
prevent excessive state size. See Streaming 
Concepts for details.
+  
+
 
   
 Distinct
@@ -453,6 +491,44 @@ val result: Table = orders
Note: All aggregates must be defined over the same window, 
i.e., same partitioning, sorting, and range. Currently, only windows with 
PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges 
with FOLLOWING are not supported yet. ORDER BY must be specified on a single time attribute.
   
 
+
+  
+Distinct Aggregation
+Batch Streaming 
+Result Updating
+  
+  
+Similar to a SQL DISTINCT AGGREGATION clause such as COUNT(DISTINCT 
a). Distinct aggregation declares that an aggregation function (built-in or 
user-defined) is only applied on distinct input values. Distinct can be applied 
to GroupBy Aggregation, GroupBy Window Aggregation and Over 
Window Aggregation.
+{% highlight scala %}
+val orders: Table = tableEnv.scan("Orders");
+// Distinct aggregation on group by
+val groupByDistinctResult = orders
+.groupBy('a)
+.select('a, 'b.sum.distinct as 'd)
+// Distinct aggregation on time window group by
+val groupByWindowDistinctResult = orders
+.window(Tumble over 5.minutes on 'rowtime as 'w).groupBy('a, 'w)
+.select('a, 'b.sum.distinct as 'd)
+// Distinct aggregation on over window
+val result = orders
+.window(Over
+partitionBy 'a
+orderBy 'rowtime
+preceding UNBOUNDED_RANGE
+as 'w)
+.select('a, 'b.avg.distinct over 'w, 'b.max over 'w, 'b.min over 'w)
+{% endhighlight %}
+User-defined aggregation function can also be used with DISTINCT 
modifiers. To calculate the aggregate results only for distinct values, simply 
add the distinct modifier towards the aggregation function. 
+{% highlight scala %}
+val orders: Table = tEnv.scan("Orders");
+
+// Use distinct aggregation for user-defined 

[jira] [Closed] (FLINK-5315) Support distinct aggregations in table api

2018-09-13 Thread Fabian Hueske (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-5315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-5315.

   Resolution: Implemented
Fix Version/s: 1.7.0

Implemented for 1.7.0 with 04c7cdf7d3d7b0862f7a4a3e7d821b1fb62ad3f0

> Support distinct aggregations in table api
> --
>
> Key: FLINK-5315
> URL: https://issues.apache.org/jira/browse/FLINK-5315
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: Rong Rong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Support distinct aggregations in Table API in the following format:
> For Expressions:
> {code:scala}
> 'a.count.distinct // Expressions distinct modifier
> {code}
> For User-defined Function:
> {code:scala}
> singleArgUdaggFunc.distinct('a) // FunctionCall distinct modifier
> multiArgUdaggFunc.distinct('a, 'b) // FunctionCall distinct modifier
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] asfgit closed pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream

2018-09-13 Thread GitBox
asfgit closed pull request #6521: [FLINK-5315][table] Adding support for 
distinct operation for table API on DataStream
URL: https://github.com/apache/flink/pull/6521
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index f8bcd3da1af..a9b92fad995 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -370,6 +370,44 @@ Table result = orders
Note: All aggregates must be defined over the same window, 
i.e., same partitioning, sorting, and range. Currently, only windows with 
PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges 
with FOLLOWING are not supported yet. ORDER BY must be specified on a single time attribute.
   
 
+
+  
+Distinct Aggregation
+Batch Streaming 
+Result Updating
+  
+  
+Similar to a SQL DISTINCT aggregation clause such as COUNT(DISTINCT 
a). Distinct aggregation declares that an aggregation function (built-in or 
user-defined) is only applied on distinct input values. Distinct can be applied 
to GroupBy Aggregation, GroupBy Window Aggregation and Over 
Window Aggregation.
+{% highlight java %}
+Table orders = tableEnv.scan("Orders");
+// Distinct aggregation on group by
+Table groupByDistinctResult = orders
+.groupBy("a")
+.select("a, b.sum.distinct as d");
+// Distinct aggregation on time window group by
+Table groupByWindowDistinctResult = orders
+.window(Tumble.over("5.minutes").on("rowtime").as("w")).groupBy("a, w")
+.select("a, b.sum.distinct as d");
+// Distinct aggregation on over window
+Table result = orders
+.window(Over
+.partitionBy("a")
+.orderBy("rowtime")
+.preceding("UNBOUNDED_RANGE")
+.as("w"))
+.select("a, b.avg.distinct over w, b.max over w, b.min over w");
+{% endhighlight %}
+User-defined aggregation function can also be used with DISTINCT 
modifiers. To calculate the aggregate results only for distinct values, simply 
add the distinct modifier towards the aggregation function. 
+{% highlight java %}
+Table orders = tEnv.scan("Orders");
+
+// Use distinct aggregation for user-defined aggregate functions
+tEnv.registerFunction("myUdagg", new MyUdagg());
+orders.groupBy("users").select("users, myUdagg.distinct(points) as 
myDistinctResult");
+{% endhighlight %}
+Note: For streaming queries the required state to compute 
the query result might grow infinitely depending on the number of distinct 
fields. Please provide a query configuration with valid retention interval to 
prevent excessive state size. See Streaming 
Concepts for details.
+  
+
 
   
 Distinct
@@ -453,6 +491,44 @@ val result: Table = orders
Note: All aggregates must be defined over the same window, 
i.e., same partitioning, sorting, and range. Currently, only windows with 
PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges 
with FOLLOWING are not supported yet. ORDER BY must be specified on a single time attribute.
   
 
+
+  
+Distinct Aggregation
+Batch Streaming 
+Result Updating
+  
+  
+Similar to a SQL DISTINCT AGGREGATION clause such as COUNT(DISTINCT 
a). Distinct aggregation declares that an aggregation function (built-in or 
user-defined) is only applied on distinct input values. Distinct can be applied 
to GroupBy Aggregation, GroupBy Window Aggregation and Over 
Window Aggregation.
+{% highlight scala %}
+val orders: Table = tableEnv.scan("Orders");
+// Distinct aggregation on group by
+val groupByDistinctResult = orders
+.groupBy('a)
+.select('a, 'b.sum.distinct as 'd)
+// Distinct aggregation on time window group by
+val groupByWindowDistinctResult = orders
+.window(Tumble over 5.minutes on 'rowtime as 'w).groupBy('a, 'w)
+.select('a, 'b.sum.distinct as 'd)
+// Distinct aggregation on over window
+val result = orders
+.window(Over
+partitionBy 'a
+orderBy 'rowtime
+preceding UNBOUNDED_RANGE
+as 'w)
+.select('a, 'b.avg.distinct over 'w, 'b.max over 'w, 'b.min over 'w)
+{% endhighlight %}
+User-defined aggregation function can also be used with DISTINCT 
modifiers. To calculate the aggregate results only for distinct values, simply 
add the distinct modifier towards the aggregation function. 
+{% highlight scala %}
+val orders: Table = tEnv.scan("Orders");
+
+// Use distinct aggregation for user-defined aggregate functions
+val myUdagg = new MyUdagg();
+orders.groupBy('users).select('users, myUdagg.distinct('points) as 
'myDistinctResult);
+{% endhighlight %}
+Note: For streaming queries the required state to compute 
the query result 

[jira] [Closed] (FLINK-10223) TaskManagers should log their ResourceID during startup

2018-09-13 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao closed FLINK-10223.


> TaskManagers should log their ResourceID during startup
> ---
>
> Key: FLINK-10223
> URL: https://issues.apache.org/jira/browse/FLINK-10223
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.3, 1.6.1, 1.7.0
>Reporter: Konstantin Knauf
>Assignee: Gary Yao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.1, 1.7.0, 1.5.4
>
>
> To debug exceptions like "org.apache.flink.util.FlinkException: The assigned 
> slot  was removed." in the master container it is often helpful to 
> know which slot was provided by which TaskManager. The only way to relate 
> slots to TaskManagers right now seems to be to enable DEBUG logging for 
> `org.apache.flink.runtime.jobmaster.slotpool.SlotPool`.
> This would be solved if each TaskManager logged its `ResourceID` during 
> startup, as the `SlotID` mainly consists of the `ResourceID` of the 
> providing TaskManager. For Mesos and YARN the `ResourceID` has an intrinsic 
> meaning, but for a stand-alone or containerized setup the `ResourceID` is 
> just a random ID.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-10223) TaskManagers should log their ResourceID during startup

2018-09-13 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao resolved FLINK-10223.
--
Resolution: Fixed

Fixed via

1.5: 9cbf99e7a56c50ba4e7bd5ee6444f6a08b4e5796
1.6: f5c1d99a7fc6d9243e12329bce14a300bff767a2
1.7: a7a06b05bb714cd212f3c672a5b886a5ee82a705

> TaskManagers should log their ResourceID during startup
> ---
>
> Key: FLINK-10223
> URL: https://issues.apache.org/jira/browse/FLINK-10223
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.3, 1.6.1, 1.7.0
>Reporter: Konstantin Knauf
>Assignee: Gary Yao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.1, 1.7.0, 1.5.4
>
>
> To debug exceptions like "org.apache.flink.util.FlinkException: The assigned 
> slot  was removed." in the master container it is often helpful to 
> know which slot was provided by which TaskManager. The only way to relate 
> slots to TaskManagers right now seems to be to enable DEBUG logging for 
> `org.apache.flink.runtime.jobmaster.slotpool.SlotPool`.
> This would be solved if each TaskManager logged its `ResourceID` during 
> startup, as the `SlotID` mainly consists of the `ResourceID` of the 
> providing TaskManager. For Mesos and YARN the `ResourceID` has an intrinsic 
> meaning, but for a stand-alone or containerized setup the `ResourceID` is 
> just a random ID.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10223) TaskManagers should log their ResourceID during startup

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613974#comment-16613974
 ] 

ASF GitHub Bot commented on FLINK-10223:


asfgit closed pull request #6679: [FLINK-10223][LOG]Logging with resourceId 
during taskmanager startup
URL: https://github.com/apache/flink/pull/6679
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index eb6df19a4ed..1ce29af00a0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -710,6 +710,7 @@ private RegistrationResponse registerTaskExecutorInternal(
WorkerRegistration registration =
new WorkerRegistration<>(taskExecutorGateway, 
newWorker, dataPort, hardwareDescription);
 
+   log.info("Registering TaskManager {} ({}) at 
ResourceManager", taskExecutorResourceId, taskExecutorAddress);
taskExecutors.put(taskExecutorResourceId, registration);
 

taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new 
HeartbeatTarget() {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index afe22dea1a4..5c1f420dd54 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -349,6 +349,8 @@ public static TaskExecutor startTaskManager(
checkNotNull(rpcService);
checkNotNull(highAvailabilityServices);
 
+   LOG.info("Starting TaskManager with ResourceID: {}", 
resourceID);
+
InetAddress remoteAddress = 
InetAddress.getByName(rpcService.getAddress());
 
TaskManagerServicesConfiguration 
taskManagerServicesConfiguration =
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 0988730689a..b58a67c1c07 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -350,7 +350,7 @@ class JobManager(
   hardwareInformation,
   numberOfSlots) =>
   // we are being informed by the ResourceManager that a new task manager 
is available
-  log.debug(s"RegisterTaskManager: $msg")
+  log.info(s"RegisterTaskManager: $msg")
 
   val taskManager = sender()
 
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index c04084c55f4..2008ad87d56 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1831,7 +1831,7 @@ object TaskManager {
   taskManagerClass: Class[_ <: TaskManager])
 : Unit = {
 
-LOG.info("Starting TaskManager")
+LOG.info(s"Starting TaskManager with ResourceID: $resourceID")
 
 // Bring up the TaskManager actor system first, bind it to the given 
address.
 


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> TaskManagers should log their ResourceID during startup
> ---
>
> Key: FLINK-10223
> URL: https://issues.apache.org/jira/browse/FLINK-10223
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.3, 1.6.1, 1.7.0
>Reporter: Konstantin Knauf
>Assignee: Gary Yao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.1, 1.7.0, 1.5.4
>
>
> To debug exceptions like "org.apache.flink.util.FlinkException: The assigned 
> slot  was removed." in the master 

[GitHub] asfgit closed pull request #6679: [FLINK-10223][LOG]Logging with resourceId during taskmanager startup

2018-09-13 Thread GitBox
asfgit closed pull request #6679: [FLINK-10223][LOG]Logging with resourceId 
during taskmanager startup
URL: https://github.com/apache/flink/pull/6679
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index eb6df19a4ed..1ce29af00a0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -710,6 +710,7 @@ private RegistrationResponse registerTaskExecutorInternal(
WorkerRegistration registration =
new WorkerRegistration<>(taskExecutorGateway, 
newWorker, dataPort, hardwareDescription);
 
+   log.info("Registering TaskManager {} ({}) at 
ResourceManager", taskExecutorResourceId, taskExecutorAddress);
taskExecutors.put(taskExecutorResourceId, registration);
 

taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new 
HeartbeatTarget() {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index afe22dea1a4..5c1f420dd54 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -349,6 +349,8 @@ public static TaskExecutor startTaskManager(
checkNotNull(rpcService);
checkNotNull(highAvailabilityServices);
 
+   LOG.info("Starting TaskManager with ResourceID: {}", 
resourceID);
+
InetAddress remoteAddress = 
InetAddress.getByName(rpcService.getAddress());
 
TaskManagerServicesConfiguration 
taskManagerServicesConfiguration =
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 0988730689a..b58a67c1c07 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -350,7 +350,7 @@ class JobManager(
   hardwareInformation,
   numberOfSlots) =>
   // we are being informed by the ResourceManager that a new task manager 
is available
-  log.debug(s"RegisterTaskManager: $msg")
+  log.info(s"RegisterTaskManager: $msg")
 
   val taskManager = sender()
 
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index c04084c55f4..2008ad87d56 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1831,7 +1831,7 @@ object TaskManager {
   taskManagerClass: Class[_ <: TaskManager])
 : Unit = {
 
-LOG.info("Starting TaskManager")
+LOG.info(s"Starting TaskManager with ResourceID: $resourceID")
 
 // Bring up the TaskManager actor system first, bind it to the given 
address.
 


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10329) Fail with exception if job cannot be removed by ZooKeeperSubmittedJobGraphStore#removeJobGraph

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613739#comment-16613739
 ] 

ASF GitHub Bot commented on FLINK-10329:


tillrohrmann commented on issue #6686: [FLINK-10329] [FLINK-10328] Fail 
ZooKeeperSubmittedJobGraphStore#removeJobGraph if job cannot be removed & 
Release all locks when stopping the ZooKeeperSubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6686#issuecomment-421070901
 
 
   Thanks for the review @azagrebin. Merging this PR after I've merged #6678.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fail with exception if job cannot be removed by 
> ZooKeeperSubmittedJobGraphStore#removeJobGraph
> --
>
> Key: FLINK-10329
> URL: https://issues.apache.org/jira/browse/FLINK-10329
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.1, 1.7.0, 1.5.4
>
>
> Callers of {{ZooKeeperSubmittedJobGraph#removeJobGraph}} expect that we fail 
> with an exception if the {{JobGraph}} cannot be removed. This is not the case 
> since we call internally {{ZooKeeperStateHandleStore#releaseAndTryRemove}}. 
> If this method returns {{false}}, then we need to fail with an exception.
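
A rough, self-contained sketch of the behaviour described above; only 
{{releaseAndTryRemove}} and its boolean contract come from the issue, everything else 
(the stand-in interface, the path layout, the exception type) is illustrative:

{code:java}
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative sketch: removal must either succeed or throw, never fail silently.
public class RemoveOrFailExample {

    /** Stand-in for ZooKeeperStateHandleStore#releaseAndTryRemove, which reports failure via a boolean. */
    interface HandleStore {
        boolean releaseAndTryRemove(String path);
    }

    static class JobGraphStore {
        private final HandleStore store;

        JobGraphStore(HandleStore store) {
            this.store = store;
        }

        void removeJobGraph(String jobId) throws Exception {
            String path = "/jobgraphs/" + jobId;
            // Before the fix the boolean result was ignored; afterwards a failed removal surfaces as an exception.
            if (!store.releaseAndTryRemove(path)) {
                throw new Exception("Could not remove job graph " + jobId + " from ZooKeeper.");
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ConcurrentMap<String, byte[]> nodes = new ConcurrentHashMap<>();
        nodes.put("/jobgraphs/job-1", new byte[0]);

        HandleStore store = path -> nodes.remove(path) != null;
        JobGraphStore graphStore = new JobGraphStore(store);

        graphStore.removeJobGraph("job-1");     // succeeds
        try {
            graphStore.removeJobGraph("job-2"); // not present -> exception instead of a silent no-op
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}
{code}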



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10329) Fail with exception if job cannot be removed by ZooKeeperSubmittedJobGraphStore#removeJobGraph

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613738#comment-16613738
 ] 

ASF GitHub Bot commented on FLINK-10329:


tillrohrmann commented on a change in pull request #6686: [FLINK-10329] 
[FLINK-10328] Fail ZooKeeperSubmittedJobGraphStore#removeJobGraph if job cannot 
be removed & Release all locks when stopping the ZooKeeperSubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6686#discussion_r217453535
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 ##
 @@ -768,27 +788,40 @@ private void jobMasterFailed(JobID jobId, Throwable 
cause) {
 */
@Override
public void grantLeadership(final UUID newLeaderSessionID) {
-   log.info("Dispatcher {} was granted leadership with fencing 
token {}", getAddress(), newLeaderSessionID);
+   runAsyncWithoutFencing(
+   () -> {
+   log.info("Dispatcher {} was granted leadership 
with fencing token {}", getAddress(), newLeaderSessionID);
 
-   final CompletableFuture> 
recoveredJobsFuture = recoverJobs();
+   final CompletableFuture> 
recoveredJobsFuture = recoveryOperation.thenComposeAsync(
+   ignored -> recoverJobs(),
 
 Review comment:
   Good point. Will fix it  


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fail with exception if job cannot be removed by 
> ZooKeeperSubmittedJobGraphStore#removeJobGraph
> --
>
> Key: FLINK-10329
> URL: https://issues.apache.org/jira/browse/FLINK-10329
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.1, 1.7.0, 1.5.4
>
>
> Callers of {{ZooKeeperSubmittedJobGraph#removeJobGraph}} expect that we fail 
> with an exception if the {{JobGraph}} cannot be removed. This is not the case 
> since we call internally {{ZooKeeperStateHandleStore#releaseAndTryRemove}}. 
> If this method returns {{false}}, then we need to fail with an exception.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann commented on issue #6686: [FLINK-10329] [FLINK-10328] Fail ZooKeeperSubmittedJobGraphStore#removeJobGraph if job cannot be removed & Release all locks when stopping the ZooKeeper

2018-09-13 Thread GitBox
tillrohrmann commented on issue #6686: [FLINK-10329] [FLINK-10328] Fail 
ZooKeeperSubmittedJobGraphStore#removeJobGraph if job cannot be removed & 
Release all locks when stopping the ZooKeeperSubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6686#issuecomment-421070901
 
 
   Thanks for the review @azagrebin. Merging this PR after I've merged #6678.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #6686: [FLINK-10329] [FLINK-10328] Fail ZooKeeperSubmittedJobGraphStore#removeJobGraph if job cannot be removed & Release all locks when sto

2018-09-13 Thread GitBox
tillrohrmann commented on a change in pull request #6686: [FLINK-10329] 
[FLINK-10328] Fail ZooKeeperSubmittedJobGraphStore#removeJobGraph if job cannot 
be removed & Release all locks when stopping the ZooKeeperSubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6686#discussion_r217453535
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 ##
 @@ -768,27 +788,40 @@ private void jobMasterFailed(JobID jobId, Throwable 
cause) {
 */
@Override
public void grantLeadership(final UUID newLeaderSessionID) {
-   log.info("Dispatcher {} was granted leadership with fencing 
token {}", getAddress(), newLeaderSessionID);
+   runAsyncWithoutFencing(
+   () -> {
+   log.info("Dispatcher {} was granted leadership 
with fencing token {}", getAddress(), newLeaderSessionID);
 
-   final CompletableFuture> 
recoveredJobsFuture = recoverJobs();
+   final CompletableFuture> 
recoveredJobsFuture = recoveryOperation.thenComposeAsync(
+   ignored -> recoverJobs(),
 
 Review comment:
   Good point. Will fix it  


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10303) Fix critical vulnerabilities Python API

2018-09-13 Thread Konstantin Knauf (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613713#comment-16613713
 ] 

Konstantin Knauf commented on FLINK-10303:
--

[~Zentol] I think this was done with Sonar. I just added the Maven dependency 
check plugin to flink-streaming-python 
(https://jeremylong.github.io/DependencyCheck/dependency-check-maven/index.html 
). It finds CVE-2016-4000 as well, so it could be used to verify. About the 
other one, I am not sure. It is not found by the OWASP dependency check as far 
as I can tell. It looks as if it is a vulnerability in `pip`, doesn't it? 

> Fix critical vulnerabilities Python API
> ---
>
> Key: FLINK-10303
> URL: https://issues.apache.org/jira/browse/FLINK-10303
> Project: Flink
>  Issue Type: Improvement
>  Components: Python API
>Affects Versions: 1.6.0
>Reporter: Konstantin Knauf
>Priority: Major
>
> A user has reported two "critical" vulnerabilities in the Python API, which 
> we should probably fix: 
> * https://nvd.nist.gov/vuln/detail/CVE-2016-4000
> * https://cwe.mitre.org/data/definitions/384.html in 
> flink-streaming-python_2.11-1.6.0.jar <= pip-1.6-py2.py3-none-any.whl <= 
> sessions.py : [2.1.0, 2.6.0)
> For users who don't need the Python API, an easy workaround is to exclude 
> the flink-streaming-python_2.11.jar from the distribution. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10340) Implement Cosh udf

2018-09-13 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-10340:


Assignee: vinoyang

> Implement Cosh udf
> --
>
> Key: FLINK-10340
> URL: https://issues.apache.org/jira/browse/FLINK-10340
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Sergey Tsvetkov
>Assignee: vinoyang
>Priority: Minor
>
> Implement a UDF for cosh, just like in Oracle
> [https://docs.oracle.com/cd/B28359_01/server.111/b28286/functions031.htm#SQLRF00623]
>  
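
As a point of reference, a cosh function can already be expressed as a user-defined 
ScalarFunction; the built-in version requested here would instead be wired into the 
SQL code generation, so the following is only a sketch:

{code:java}
import org.apache.flink.table.functions.ScalarFunction;

// Sketch of cosh as a user-defined scalar function; the built-in variant would be code-generated.
public class CoshFunction extends ScalarFunction {

    public double eval(double x) {
        return Math.cosh(x);
    }

    // Overload for boxed integral input, returning null for null input.
    public Double eval(Long x) {
        return x == null ? null : Math.cosh(x);
    }
}
{code}

Registered for example via {{tableEnv.registerFunction("cosh", new CoshFunction())}}, 
it can then be used from both the Table API and SQL.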



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Deleted] (FLINK-10345) Rethink SubmittedJobGraphListener

2018-09-13 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann deleted FLINK-10345:
--


> Rethink SubmittedJobGraphListener
> -
>
> Key: FLINK-10345
> URL: https://issues.apache.org/jira/browse/FLINK-10345
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Till Rohrmann
>Priority: Major
>
> The {{SubmittedJobGraphListener}} in {{ZooKeeperSubmittedJobGraphStore}} can 
> return false positives. This is obviously problematic, because it causes the 
> subsequent recovery operation to fail. Ideally we would not require the 
> {{SubmittedJobGraphListener}}. One could, for example, periodically check 
> from the main thread whether there are new jobs. That way we would know which 
> jobs are currently running and which are being cleaned up. 
> Alternatively it is necessary to tolerate false positives :-(



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10344) Rethink SubmittedJobGraphListener

2018-09-13 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-10344:
-

 Summary: Rethink SubmittedJobGraphListener
 Key: FLINK-10344
 URL: https://issues.apache.org/jira/browse/FLINK-10344
 Project: Flink
  Issue Type: Sub-task
  Components: Distributed Coordination
Affects Versions: 1.6.0, 1.5.3, 1.7.0
Reporter: Till Rohrmann
 Fix For: 1.7.0


The {{SubmittedJobGraphListener}} in {{ZooKeeperSubmittedJobGraphStore}} can 
return false positives. This is obviously problematic, because it causes the 
subsequent recovery operation to fail. Ideally we would not require the 
{{SubmittedJobGraphListener}}. One could, for example, periodically check from 
the main thread whether there are new jobs. That way we would know which jobs 
are currently running and which are being cleaned up. 

Alternatively it is necessary to tolerate false positives :-(
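
A minimal sketch of what such periodic checking could look like, assuming a supplier 
that lists the job ids currently stored in ZooKeeper; in the real Dispatcher this 
would run in its main thread rather than on the separate executor used here:

{code:java}
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Sketch of the polling alternative: periodically diff the stored job ids against the jobs we already know.
public class JobGraphPoller {

    private final Set<String> knownJobs = new HashSet<>();
    private final Supplier<Set<String>> storedJobIds; // stand-in for listing job graphs in ZooKeeper
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    public JobGraphPoller(Supplier<Set<String>> storedJobIds) {
        this.storedJobIds = storedJobIds;
    }

    public void start(long intervalMillis) {
        executor.scheduleWithFixedDelay(this::checkForNewJobs, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    private void checkForNewJobs() {
        Set<String> current = storedJobIds.get();
        for (String jobId : current) {
            if (knownJobs.add(jobId)) {
                // A job id we have not seen before: trigger recovery for it.
                System.out.println("Recovering newly added job " + jobId);
            }
        }
        // Jobs that disappeared from the store are being cleaned up and can be forgotten locally.
        knownJobs.retainAll(current);
    }

    public void stop() {
        executor.shutdownNow();
    }
}
{code}

With polling, a spurious check simply yields an empty diff instead of triggering a 
recovery that then fails.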



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10345) Rethink SubmittedJobGraphListener

2018-09-13 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-10345:
-

 Summary: Rethink SubmittedJobGraphListener
 Key: FLINK-10345
 URL: https://issues.apache.org/jira/browse/FLINK-10345
 Project: Flink
  Issue Type: Sub-task
  Components: Distributed Coordination
Affects Versions: 1.6.0, 1.5.3, 1.7.0
Reporter: Till Rohrmann
 Fix For: 1.7.0


The {{SubmittedJobGraphListener}} in {{ZooKeeperSubmittedJobGraphStore}} can 
return false positives. This is obviously problematic, because it causes the 
subsequent recovery operation to fail. Ideally we would not require the 
{{SubmittedJobGraphListener}}. One could, for example, periodically check from 
the main thread whether there are new jobs. That way we would know which jobs 
are currently running and which are being cleaned up. 

Alternatively it is necessary to tolerate false positives :-(



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10336) Use ZooKeeperStateStore in ZooKeeperSubmittedJobGraphStore

2018-09-13 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-10336:


Assignee: vinoyang

> Use ZooKeeperStateStore in ZooKeeperSubmittedJobGraphStore
> --
>
> Key: FLINK-10336
> URL: https://issues.apache.org/jira/browse/FLINK-10336
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
> Fix For: 1.7.0
>
>
> Use the {{ZooKeeperStateStore}} in {{ZooKeeperSubmittedJobGraphStore}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10335) Create common ZooKeeperStateStore based on ZooKeeperStateHandleStore and RetrievableStateStorageHelper

2018-09-13 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-10335:


Assignee: vinoyang

> Create common ZooKeeperStateStore based on ZooKeeperStateHandleStore and 
> RetrievableStateStorageHelper
> --
>
> Key: FLINK-10335
> URL: https://issues.apache.org/jira/browse/FLINK-10335
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
> Fix For: 1.7.0
>
>
> Create a common {{ZooKeeperStateStore}} which is based on 
> {{ZooKeeperStateHandleStore}} and the {{RetrievableStateStorageHelper}} which 
> encapsulates the storage logic of large state objects whose handles are 
> persisted to ZooKeeper. This could be used by the 
> {{ZooKeeperSubmittedJobGraphStore}}, {{ZooKeeperCompletedCheckpointStore}} 
> and {{ZooKeeperMesosWorkerStore}}.
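
A sketch of what the shape of such a store could be; the interface below is purely 
hypothetical and only meant to illustrate how the handle store and the storage helper 
would sit behind one abstraction:

{code:java}
import java.io.Serializable;

// Hypothetical shape of the proposed ZooKeeperStateStore: handles live in ZooKeeper, payloads in the storage helper.
public interface ZooKeeperStateStore<T extends Serializable> {

    /** Writes the state via the RetrievableStateStorageHelper and stores the resulting handle under the path. */
    void put(String pathInZooKeeper, T state) throws Exception;

    /** Resolves the handle stored under the path and retrieves the actual state object. */
    T get(String pathInZooKeeper) throws Exception;

    /** Releases the lock on the node and removes it; implementations should fail loudly if removal is impossible. */
    void releaseAndRemove(String pathInZooKeeper) throws Exception;
}
{code}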



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10334) Move RetrievableStateStorageHelper out of ZooKeeperStateHandleStore

2018-09-13 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-10334:


Assignee: vinoyang

> Move RetrievableStateStorageHelper out of ZooKeeperStateHandleStore
> ---
>
> Key: FLINK-10334
> URL: https://issues.apache.org/jira/browse/FLINK-10334
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
> Fix For: 1.7.0
>
>
> Move the {{RetrievableStateStorageHelper}} out of the 
> {{ZooKeeperStateHandleStore}} in order to better separate concerns.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10333) Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, CompletedCheckpoints)

2018-09-13 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-10333:
--
Description: 
While going over the ZooKeeper based stores 
({{ZooKeeperSubmittedJobGraphStore}}, {{ZooKeeperMesosWorkerStore}}, 
{{ZooKeeperCompletedCheckpointStore}}) and the underlying 
{{ZooKeeperStateHandleStore}} I noticed several inconsistencies which were 
introduced with past incremental changes.

* Depending on whether {{ZooKeeperStateHandleStore#getAllSortedByNameAndLock}} or 
{{ZooKeeperStateHandleStore#getAllAndLock}} is called, deserialization problems 
will either lead to removing the Znode or not
* {{ZooKeeperStateHandleStore}} leaves inconsistent state in case of exceptions 
(e.g. {{#getAllAndLock}} won't release the acquired locks in case of a failure)
* {{ZooKeeperStateHandleStore}} has too many responsibilities. It would be 
better to move {{RetrievableStateStorageHelper}} out of it for a better 
separation of concerns
* {{ZooKeeperSubmittedJobGraphStore}} overwrites a stored {{JobGraph}} even if 
it is locked. This should not happen since it could leave another system in an 
inconsistent state (imagine a changed {{JobGraph}} which restores from an old 
checkpoint)
* Redundant but also somewhat inconsistent put logic in the different stores
* Shadowing of ZooKeeper specific exceptions in {{ZooKeeperStateHandleStore}} 
which were expected to be caught in {{ZooKeeperSubmittedJobGraphStore}}
* Getting rid of the {{SubmittedJobGraphListener}} would be helpful

These problems made me question how reliably these components actually work. 
Since these components are very important, I propose to refactor them.

  was:
While going over the ZooKeeper based stores 
({{ZooKeeperSubmittedJobGraphStore}}, {{ZooKeeperMesosWorkerStore}}, 
{{ZooKeeperCompletedCheckpointStore}}) and the underlying 
{{ZooKeeperStateHandleStore}} I noticed several inconsistencies which were 
introduced with past incremental changes.

* Depending on whether {{ZooKeeperStateHandleStore#getAllSortedByNameAndLock}} or 
{{ZooKeeperStateHandleStore#getAllAndLock}} is called, deserialization problems 
will either lead to removing the Znode or not
* {{ZooKeeperStateHandleStore}} leaves inconsistent state in case of exceptions 
(e.g. {{#getAllAndLock}} won't release the acquired locks in case of a failure)
* {{ZooKeeperStateHandleStore}} has too many responsibilities. It would be 
better to move {{RetrievableStateStorageHelper}} out of it for a better 
separation of concerns
* {{ZooKeeperSubmittedJobGraphStore}} overwrites a stored {{JobGraph}} even if 
it is locked. This should not happen since it could leave another system in an 
inconsistent state (imagine a changed {{JobGraph}} which restores from an old 
checkpoint)
* Redundant but also somewhat inconsistent put logic in the different stores
* Shadowing of ZooKeeper specific exceptions in {{ZooKeeperStateHandleStore}} 
which were expected to be caught in {{ZooKeeperSubmittedJobGraphStore}}

These problems made me question how reliably these components actually work. 
Since these components are very important, I propose to refactor them.


> Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, 
> CompletedCheckpoints)
> -
>
> Key: FLINK-10333
> URL: https://issues.apache.org/jira/browse/FLINK-10333
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Priority: Major
> Fix For: 1.7.0
>
>
> While going over the ZooKeeper based stores 
> ({{ZooKeeperSubmittedJobGraphStore}}, {{ZooKeeperMesosWorkerStore}}, 
> {{ZooKeeperCompletedCheckpointStore}}) and the underlying 
> {{ZooKeeperStateHandleStore}} I noticed several inconsistencies which were 
> introduced with past incremental changes.
> * Depending on whether {{ZooKeeperStateHandleStore#getAllSortedByNameAndLock}} 
> or {{ZooKeeperStateHandleStore#getAllAndLock}} is called, deserialization 
> problems will either lead to removing the Znode or not
> * {{ZooKeeperStateHandleStore}} leaves inconsistent state in case of 
> exceptions (e.g. {{#getAllAndLock}} won't release the acquired locks in case 
> of a failure)
> * {{ZooKeeperStateHandleStore}} has too many responsibilities. It would be 
> better to move {{RetrievableStateStorageHelper}} out of it for a better 
> separation of concerns
> * {{ZooKeeperSubmittedJobGraphStore}} overwrites a stored {{JobGraph}} even 
> if it is locked. This should not happen since it could leave another system 
> in an inconsistent state (imagine a changed {{JobGraph}} which restores from 
> an old checkpoint)
> * Redundant but also somewhat inconsistent put logic in the different stores
> * Shadowing of ZooKeeper specific exceptions in 

[jira] [Updated] (FLINK-10343) Expose setCurrentKey method to streamRuntimeContext

2018-09-13 Thread aitozi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

aitozi updated FLINK-10343:
---
Description: 
When we use reducing state / aggregating keyed state and so on, we have to 
read the value from the state backend, update it in the user function and 
then put it back to the state backend. Instead, we could cache certain data 
in a heap map and write it back only once in the snapshot method:

{code:java}
snapshot() {
  for (Map.Entry<K, V> entry : map.entrySet()) {
    setCurrentKey(entry.getKey());
    valueState.update(entry.getValue()); // put the value back to the state backend
  }
}
{code}

For this we just have to expose setCurrentKey to the user function; that 
would enable caching partial keyed state in memory on the user side.

what's your opinion [~stefanrichte...@gmail.com] [~azagrebin] ? 


  was:
When we use reducing state / aggregating keyed state and so on, we have to 
read the value from the state backend, update it in the user function and 
then put it back to the state backend. Instead, we could cache certain data 
in a heap map and write it back only once in the snapshot method:

```
snapshot() {
  for (Map.Entry<K, V> entry : map.entrySet()) {
    setCurrentKey(entry.getKey());
    valueState.update(entry.getValue()); // put the value back to the state backend
  }
}
```
For this we just have to expose setCurrentKey to the user function; that 
would enable caching partial keyed state in memory on the user side.

what's your opinion [~stefanrichte...@gmail.com] [~azagrebin] ? 



> Expose setCurrentKey method to streamRuntimeContext
> ---
>
> Key: FLINK-10343
> URL: https://issues.apache.org/jira/browse/FLINK-10343
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.7.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
> Fix For: 1.7.0
>
>
> When we use reducing state / aggregating keyed state and so on, we have to 
> read the value from the state backend, update it in the user function and 
> then put it back to the state backend. Instead, we could cache certain data 
> in a heap map and write it back only once in the snapshot method:
> {code:java}
> snapshot() {
>   for (Map.Entry<K, V> entry : map.entrySet()) {
>     setCurrentKey(entry.getKey());
>     valueState.update(entry.getValue()); // put the value back to the state backend
>   }
> }
> {code}
> For this we just have to expose setCurrentKey to the user function; that 
> would enable caching partial keyed state in memory on the user side.
> what's your opinion [~stefanrichte...@gmail.com] [~azagrebin] ? 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10343) Expose setCurrentKey method to streamRuntimeContext

2018-09-13 Thread aitozi (JIRA)
aitozi created FLINK-10343:
--

 Summary: Expose setCurrentKey method to streamRuntimeContext
 Key: FLINK-10343
 URL: https://issues.apache.org/jira/browse/FLINK-10343
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.7.0
Reporter: aitozi
Assignee: aitozi
 Fix For: 1.7.0


When we use reducing state / aggregating keyed state and so on, we have to 
read the value from the state backend, update it in the user function and 
then put it back to the state backend. Instead, we could cache certain data 
in a heap map and write it back only once in the snapshot method:

```
snapshot() {
  for (Map.Entry<K, V> entry : map.entrySet()) {
    setCurrentKey(entry.getKey());
    valueState.update(entry.getValue()); // put the value back to the state backend
  }
}
```
For this we just have to expose setCurrentKey to the user function; that 
would enable caching partial keyed state in memory on the user side.

what's your opinion [~stefanrichte...@gmail.com] [~azagrebin] ? 
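
A compact sketch of the proposed usage pattern; {{KeyedStateWriter}} below is a 
hypothetical stand-in for a runtime context that exposes {{setCurrentKey}}, which is 
exactly the method this issue asks to expose:

{code:java}
import java.util.HashMap;
import java.util.Map;

// Sketch of the proposal: a heap cache that is flushed back into keyed state once per snapshot.
public class HeapCachedStateExample {

    /** Hypothetical stand-in for a runtime context exposing setCurrentKey plus a keyed ValueState update. */
    interface KeyedStateWriter<K, V> {
        void setCurrentKey(K key);
        void update(V value) throws Exception;
    }

    static class CachingFunction<K, V> {
        private final Map<K, V> cache = new HashMap<>();

        void process(K key, V value) {
            // Hot path: only touch the heap map, do not go through the state backend.
            cache.put(key, value);
        }

        void snapshotState(KeyedStateWriter<K, V> state) throws Exception {
            // Flush once per checkpoint: set the key explicitly, then write the cached value back.
            for (Map.Entry<K, V> entry : cache.entrySet()) {
                state.setCurrentKey(entry.getKey());
                state.update(entry.getValue());
            }
        }
    }
}
{code}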




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] walterddr commented on issue #6471: [FLINK-10010][DataStream API] Deprecate unused BaseAlignedWindowAssigner related components

2018-09-13 Thread GitBox
walterddr commented on issue #6471: [FLINK-10010][DataStream API] Deprecate 
unused BaseAlignedWindowAssigner related components
URL: https://github.com/apache/flink/pull/6471#issuecomment-421048859
 
 
   Hi @fhueske @aljoscha, could you kindly take a look at this PR? We would 
like to make some changes to the "align" window assigner concept. It would be 
nice to deprecate the current implementation to avoid confusion. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10010) Deprecate unused BaseAlignedWindowAssigner related components

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613652#comment-16613652
 ] 

ASF GitHub Bot commented on FLINK-10010:


walterddr commented on issue #6471: [FLINK-10010][DataStream API] Deprecate 
unused BaseAlignedWindowAssigner related components
URL: https://github.com/apache/flink/pull/6471#issuecomment-421048859
 
 
   Hi @fhueske @aljoscha, could you kindly take a look at this PR? We would 
like to make some changes to the "align" window assigner concept. It would be 
nice to deprecate the current implementation to avoid confusion. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Deprecate unused BaseAlignedWindowAssigner related components
> -
>
> Key: FLINK-10010
> URL: https://issues.apache.org/jira/browse/FLINK-10010
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>  Labels: pull-request-available
>
> {{BaseAlignedWindowAssigner}} should be marked as deprecated and 
> {{SlidingAlignedProcessingTimeWindows}} should be removed from the Flink Repo.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8354) Flink Kafka connector ignores Kafka message headers

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613606#comment-16613606
 ] 

ASF GitHub Bot commented on FLINK-8354:
---

aljoscha commented on issue #6615: [FLINK-8354] [flink-connectors] Add ability 
to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#issuecomment-421038704
 
 
   For this, I would await the outcome of #6577. If we end up only having one 
"modern" Kafka connector we might just pass through the `ConsumerRecord`. 
Otherwise we might have to do some wrapping.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink Kafka connector ignores Kafka message  headers 
> -
>
> Key: FLINK-8354
> URL: https://issues.apache.org/jira/browse/FLINK-8354
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
> Environment: Kafka 0.11.0.0
> Flink 1.4.0
> flink-connector-kafka-0.11_2.11 
>Reporter: Mohammad Abareghi
>Assignee: Aegeaner
>Priority: Major
>  Labels: pull-request-available
>
> Kafka has introduced the notion of a Header for messages in version 0.11.0.0: 
> https://issues.apache.org/jira/browse/KAFKA-4208.
> But flink-connector-kafka-0.11_2.11, which supports Kafka 0.11.0.0, ignores 
> headers when consuming Kafka messages. 
> It would be useful in some scenarios, such as distributed log tracing, to 
> expose message headers in FlinkKafkaConsumer011 and FlinkKafkaProducer011. 
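
For reference, the header information the connector currently drops is available on 
the plain Kafka {{ConsumerRecord}} (Kafka client API, not Flink); a sketch of the kind 
of access a user would want surfaced:

{code:java}
import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

// Sketch of header access on the raw Kafka record, e.g. for propagating a trace id in distributed log tracing.
public class HeaderAccessExample {

    /** Returns the last value of the given header as a UTF-8 string, or null if the header is absent. */
    public static String headerValue(ConsumerRecord<byte[], byte[]> record, String key) {
        Header header = record.headers().lastHeader(key);
        return header == null ? null : new String(header.value(), StandardCharsets.UTF_8);
    }
}
{code}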



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] aljoscha commented on issue #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers

2018-09-13 Thread GitBox
aljoscha commented on issue #6615: [FLINK-8354] [flink-connectors] Add ability 
to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#issuecomment-421038704
 
 
   For this, I would await the outcome of #6577. If we end up only having one 
"modern" Kafka connector we might just pass through the `ConsumerRecord`. 
Otherwise we might have to do some wrapping.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613603#comment-16613603
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217406052
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -87,62 +86,71 @@ public RecordWriter(ResultPartitionWriter writer, 
ChannelSelector channelSele
 
this.numChannels = writer.getNumberOfSubpartitions();
 
-   /*
-* The runtime exposes a channel abstraction for the produced 
results
-* (see {@link ChannelSelector}). Every channel has an 
independent
-* serializer.
-*/
-   this.serializers = new SpanningRecordSerializer[numChannels];
+   this.serializer = new SpanningRecordSerializer();
this.bufferBuilders = new Optional[numChannels];
for (int i = 0; i < numChannels; i++) {
-   serializers[i] = new SpanningRecordSerializer();
bufferBuilders[i] = Optional.empty();
}
}
 
public void emit(T record) throws IOException, InterruptedException {
+   serializer.serializeRecord(record);
+
for (int targetChannel : channelSelector.selectChannels(record, 
numChannels)) {
-   sendToTarget(record, targetChannel);
+   copyToTarget(targetChannel);
}
+
+   // Make sure we don't hold onto the large intermediate 
serialization buffer for too long
+   serializer.prune();
}
 
/**
 * This is used to broadcast Streaming Watermarks in-band with records. 
This ignores
 * the {@link ChannelSelector}.
 */
public void broadcastEmit(T record) throws IOException, 
InterruptedException {
+   serializer.serializeRecord(record);
+
for (int targetChannel = 0; targetChannel < numChannels; 
targetChannel++) {
-   sendToTarget(record, targetChannel);
+   copyToTarget(targetChannel);
}
+
+   serializer.prune();
}
 
/**
 * This is used to send LatencyMarks to a random target channel.
 */
public void randomEmit(T record) throws IOException, 
InterruptedException {
-   sendToTarget(record, rng.nextInt(numChannels));
-   }
+   serializer.serializeRecord(record);
 
-   private void sendToTarget(T record, int targetChannel) throws 
IOException, InterruptedException {
-   RecordSerializer serializer = serializers[targetChannel];
+   copyToTarget(rng.nextInt(numChannels));
 
-   SerializationResult result = serializer.addRecord(record);
+   serializer.prune();
+   }
 
+   private void copyToTarget(int targetChannel) throws IOException, 
InterruptedException {
+   // We should reset the initial position of the intermediate 
serialization buffer before
+   // copying, so the serialization results can be copied to 
multiple target buffers.
+   serializer.reset();
+
+   BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
+   SerializationResult result = 
serializer.copyToBufferBuilder(bufferBuilder);
while (result.isFullBuffer()) {
-   if (tryFinishCurrentBufferBuilder(targetChannel, 
serializer)) {
-   // If this was a full record, we are done. Not 
breaking
-   // out of the loop at this point will lead to 
another
-   // buffer request before breaking out (that 
would not be
-   // a problem per se, but it can lead to stalls 
in the
-   // pipeline).
-   if (result.isFullRecord()) {
-   break;
-   }
+   tryFinishCurrentBufferBuilder(targetChannel);
 
 Review comment:
   I guess, I was worried about the same thing as @pnowojski ... the expanded 
method here will actually look like this:
   ```
boolean pruneTriggered = false;
BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
SerializationResult result = 
serializer.copyToBufferBuilder(bufferBuilder);
while (result.isFullBuffer()) {
if (bufferBuilders[targetChannel].isPresent()) {
bufferBuilder = 

[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613604#comment-16613604
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217397453
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -89,77 +88,95 @@ public RecordWriter(ResultPartitionWriter writer, 
ChannelSelector channelSele
 
this.numChannels = writer.getNumberOfSubpartitions();
 
-   /*
-* The runtime exposes a channel abstraction for the produced 
results
-* (see {@link ChannelSelector}). Every channel has an 
independent
-* serializer.
-*/
-   this.serializers = new SpanningRecordSerializer[numChannels];
+   this.serializer = new SpanningRecordSerializer();
this.bufferBuilders = new Optional[numChannels];
+   this.broadcastChannels = new int[numChannels];
for (int i = 0; i < numChannels; i++) {
-   serializers[i] = new SpanningRecordSerializer();
+   broadcastChannels[i] = i;
bufferBuilders[i] = Optional.empty();
}
}
 
public void emit(T record) throws IOException, InterruptedException {
-   for (int targetChannel : channelSelector.selectChannels(record, 
numChannels)) {
-   sendToTarget(record, targetChannel);
-   }
+   emit(record, channelSelector.selectChannels(record, 
numChannels));
}
 
/**
 * This is used to broadcast Streaming Watermarks in-band with records. 
This ignores
 * the {@link ChannelSelector}.
 */
public void broadcastEmit(T record) throws IOException, 
InterruptedException {
-   for (int targetChannel = 0; targetChannel < numChannels; 
targetChannel++) {
-   sendToTarget(record, targetChannel);
-   }
+   emit(record, broadcastChannels);
}
 
/**
 * This is used to send LatencyMarks to a random target channel.
 */
public void randomEmit(T record) throws IOException, 
InterruptedException {
-   sendToTarget(record, rng.nextInt(numChannels));
+   serializer.serializeRecord(record);
+
+   if 
(copyFromSerializerToTargetChannel(rng.nextInt(numChannels))) {
+   serializer.prune();
+   }
 
 Review comment:
   code duplication: why not use this?
   ```
   emit(record, new int[] { rng.nextInt(numChannels) });
   ```
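   For illustration, a minimal sketch of what that delegation would look like in context, assuming the `emit(T, int[])` overload keeps the signature shown in the diff above (the single-element array allocation per latency marker is the main trade-off):
   ```
   public void randomEmit(T record) throws IOException, InterruptedException {
       // Delegate to emit(T, int[]) so the serialize-once / copy / prune logic
       // stays in one place; this allocates one small int[] per latency marker.
       emit(record, new int[] { rng.nextInt(numChannels) });
   }
   ```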


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multiple channels via a 
> {{ChannelSelector}} or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, which 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high-cost operation, so we can gain a lot 
> by performing the serialization only once.
> I would suggest the following changes to realize this:
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> all the different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for the different channels.
> An additional benefit of using a single serializer for all channels is a 
> potentially significant reduction in heap space overhead from fewer 
> intermediate serialization buffers (these buffers were only pruned back to 
> 128 B once they grew beyond 5 MiB).
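To make the heap-overhead point concrete, a rough back-of-the-envelope sketch (the channel count below is an illustrative assumption, not a measurement from this issue):
```
// Illustration only: each per-channel serializer keeps an intermediate buffer
// that is pruned back to 128 B only after it grows beyond 5 MiB.
int numChannels = 100;                            // assumed parallelism, for illustration
long worstCasePerBuffer = 5L * 1024 * 1024;       // ~5 MiB before pruning kicks in
long before = numChannels * worstCasePerBuffer;   // ~500 MiB with one serializer per channel
long after = worstCasePerBuffer;                  // ~5 MiB with a single shared serializer
```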



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613602#comment-16613602
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217404839
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -89,77 +88,95 @@ public RecordWriter(ResultPartitionWriter writer, 
ChannelSelector channelSele
 
this.numChannels = writer.getNumberOfSubpartitions();
 
-   /*
-* The runtime exposes a channel abstraction for the produced 
results
-* (see {@link ChannelSelector}). Every channel has an 
independent
-* serializer.
-*/
-   this.serializers = new SpanningRecordSerializer[numChannels];
+   this.serializer = new SpanningRecordSerializer();
this.bufferBuilders = new Optional[numChannels];
+   this.broadcastChannels = new int[numChannels];
for (int i = 0; i < numChannels; i++) {
-   serializers[i] = new SpanningRecordSerializer();
+   broadcastChannels[i] = i;
bufferBuilders[i] = Optional.empty();
}
}
 
public void emit(T record) throws IOException, InterruptedException {
-   for (int targetChannel : channelSelector.selectChannels(record, 
numChannels)) {
-   sendToTarget(record, targetChannel);
-   }
+   emit(record, channelSelector.selectChannels(record, 
numChannels));
}
 
/**
 * This is used to broadcast Streaming Watermarks in-band with records. 
This ignores
 * the {@link ChannelSelector}.
 */
public void broadcastEmit(T record) throws IOException, 
InterruptedException {
-   for (int targetChannel = 0; targetChannel < numChannels; 
targetChannel++) {
-   sendToTarget(record, targetChannel);
-   }
+   emit(record, broadcastChannels);
}
 
/**
 * This is used to send LatencyMarks to a random target channel.
 */
public void randomEmit(T record) throws IOException, 
InterruptedException {
-   sendToTarget(record, rng.nextInt(numChannels));
+   serializer.serializeRecord(record);
+
+   if 
(copyFromSerializerToTargetChannel(rng.nextInt(numChannels))) {
+   serializer.prune();
+   }
}
 
-   private void sendToTarget(T record, int targetChannel) throws 
IOException, InterruptedException {
-   RecordSerializer serializer = serializers[targetChannel];
+   private void emit(T record, int[] targetChannels) throws IOException, 
InterruptedException {
+   serializer.serializeRecord(record);
+
+   boolean pruneAfterCopying = false;
+   for (int channel : targetChannels) {
+   if (copyFromSerializerToTargetChannel(channel)) {
+   pruneAfterCopying = true;
+   }
+   }
 
-   SerializationResult result = serializer.addRecord(record);
+   // Make sure we don't hold onto the large intermediate 
serialization buffer for too long
+   if (pruneAfterCopying) {
+   serializer.prune();
+   }
+   }
 
+   /**
+* @param targetChannel
+* @return true if the intermediate serialization buffer 
should be pruned
+*/
+   private boolean copyFromSerializerToTargetChannel(int targetChannel) 
throws IOException, InterruptedException {
+   // We should reset the initial position of the intermediate 
serialization buffer before
+   // copying, so the serialization results can be copied to 
multiple target buffers.
+   serializer.reset();
+
+   boolean pruneTriggered = false;
+   BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
+   SerializationResult result = 
serializer.copyToBufferBuilder(bufferBuilder);
while (result.isFullBuffer()) {
-   if (tryFinishCurrentBufferBuilder(targetChannel, 
serializer)) {
-   // If this was a full record, we are done. Not 
breaking
-   // out of the loop at this point will lead to 
another
-   // buffer request before breaking out (that 
would not be
-   // a problem per se, but it can lead to stalls 
in the
-   // pipeline).
-  

[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613601#comment-16613601
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217415039
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 ##
 @@ -377,6 +388,86 @@ public void testBroadcastEmitBufferIndependence() throws 
Exception {
assertEquals("Buffer 2 shares the same reader index as buffer 
1", 0, buffer2.getReaderIndex());
}
 
+   /**
+* Tests that records are broadcast via {@link ChannelSelector} and
+* {@link RecordWriter#emit(IOReadableWritable)}.
+*/
+   @Test
+   public void testEmitRecordWithBroadcastPartitioner() throws Exception {
+   emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false);
+   }
+
+   /**
+* Tests that records are broadcast via {@link 
RecordWriter#broadcastEmit(IOReadableWritable)}.
+*/
+   @Test
+   public void testBroadcastEmitRecord() throws Exception {
+   emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true);
+   }
+
+   /**
+* The results of emitting records via BroadcastPartitioner or 
broadcasting records directly are the same,
+* that is all the target channels can receive the whole outputs.
+*
+* @param isBroadcastEmit whether using {@link 
RecordWriter#broadcastEmit(IOReadableWritable)} or not
+*/
+   private void 
emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean 
isBroadcastEmit) throws Exception {
+   final int numChannels = 4;
+   final int bufferSize = 32;
+   final int numValues = 8;
+   final int serializationLength = 4;
+
+   @SuppressWarnings("unchecked")
+   final Queue[] queues = new Queue[numChannels];
+   for (int i = 0; i < numChannels; i++) {
+   queues[i] = new ArrayDeque<>();
+   }
+
+   final TestPooledBufferProvider bufferProvider = new 
TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
+   final ResultPartitionWriter partitionWriter = new 
CollectingPartitionWriter(queues, bufferProvider);
+   final RecordWriter writer = new 
RecordWriter<>(partitionWriter, new Broadcast<>());
 
 Review comment:
   I meant using
   ```
   final RecordWriter writer = isBroadcastEmit ?
       new RecordWriter<>(partitionWriter) :
       new RecordWriter<>(partitionWriter, new Broadcast<>());
   ```
   This would also check that `broadcastEmit()` does not rely on a broadcasting 
channel selector.
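   A sketch of how the test could exercise both paths, assuming the single-argument constructor suggested above exists; the generic type parameter was stripped by the mail archive, so it is omitted here as well, and `record` stands for whatever value the test loop emits:
   ```
   final RecordWriter writer = isBroadcastEmit
       ? new RecordWriter<>(partitionWriter)                     // no selector: broadcastEmit() must work on its own
       : new RecordWriter<>(partitionWriter, new Broadcast<>()); // emit() goes through the Broadcast partitioner

   // Both code paths are expected to leave the same records in every channel queue.
   if (isBroadcastEmit) {
       writer.broadcastEmit(record);
   } else {
       writer.emit(record);
   }
   ```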


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multiple channels via a 
> {{ChannelSelector}} or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, which 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high-cost operation, so we can gain a lot 
> by performing the serialization only once.
> I would suggest the following changes to realize this:
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> all the different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for the different channels.
> An additional benefit of using a single serializer for all channels is a 
> potentially significant reduction in heap space overhead from fewer 
> intermediate serialization buffers (these buffers were only pruned back to 
> 128 B once they grew beyond 5 MiB).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-09-13 Thread GitBox
NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217404839
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -89,77 +88,95 @@ public RecordWriter(ResultPartitionWriter writer, 
ChannelSelector channelSele
 
this.numChannels = writer.getNumberOfSubpartitions();
 
-   /*
-* The runtime exposes a channel abstraction for the produced 
results
-* (see {@link ChannelSelector}). Every channel has an 
independent
-* serializer.
-*/
-   this.serializers = new SpanningRecordSerializer[numChannels];
+   this.serializer = new SpanningRecordSerializer();
this.bufferBuilders = new Optional[numChannels];
+   this.broadcastChannels = new int[numChannels];
for (int i = 0; i < numChannels; i++) {
-   serializers[i] = new SpanningRecordSerializer();
+   broadcastChannels[i] = i;
bufferBuilders[i] = Optional.empty();
}
}
 
public void emit(T record) throws IOException, InterruptedException {
-   for (int targetChannel : channelSelector.selectChannels(record, 
numChannels)) {
-   sendToTarget(record, targetChannel);
-   }
+   emit(record, channelSelector.selectChannels(record, 
numChannels));
}
 
/**
 * This is used to broadcast Streaming Watermarks in-band with records. 
This ignores
 * the {@link ChannelSelector}.
 */
public void broadcastEmit(T record) throws IOException, 
InterruptedException {
-   for (int targetChannel = 0; targetChannel < numChannels; 
targetChannel++) {
-   sendToTarget(record, targetChannel);
-   }
+   emit(record, broadcastChannels);
}
 
/**
 * This is used to send LatencyMarks to a random target channel.
 */
public void randomEmit(T record) throws IOException, 
InterruptedException {
-   sendToTarget(record, rng.nextInt(numChannels));
+   serializer.serializeRecord(record);
+
+   if 
(copyFromSerializerToTargetChannel(rng.nextInt(numChannels))) {
+   serializer.prune();
+   }
}
 
-   private void sendToTarget(T record, int targetChannel) throws 
IOException, InterruptedException {
-   RecordSerializer serializer = serializers[targetChannel];
+   private void emit(T record, int[] targetChannels) throws IOException, 
InterruptedException {
+   serializer.serializeRecord(record);
+
+   boolean pruneAfterCopying = false;
+   for (int channel : targetChannels) {
+   if (copyFromSerializerToTargetChannel(channel)) {
+   pruneAfterCopying = true;
+   }
+   }
 
-   SerializationResult result = serializer.addRecord(record);
+   // Make sure we don't hold onto the large intermediate 
serialization buffer for too long
+   if (pruneAfterCopying) {
+   serializer.prune();
+   }
+   }
 
+   /**
+* @param targetChannel
+* @return true if the intermediate serialization buffer 
should be pruned
+*/
+   private boolean copyFromSerializerToTargetChannel(int targetChannel) 
throws IOException, InterruptedException {
+   // We should reset the initial position of the intermediate 
serialization buffer before
+   // copying, so the serialization results can be copied to 
multiple target buffers.
+   serializer.reset();
+
+   boolean pruneTriggered = false;
+   BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
+   SerializationResult result = 
serializer.copyToBufferBuilder(bufferBuilder);
while (result.isFullBuffer()) {
-   if (tryFinishCurrentBufferBuilder(targetChannel, 
serializer)) {
-   // If this was a full record, we are done. Not 
breaking
-   // out of the loop at this point will lead to 
another
-   // buffer request before breaking out (that 
would not be
-   // a problem per se, but it can lead to stalls 
in the
-   // pipeline).
-   if (result.isFullRecord()) {
-   break;
-   }
+   tryFinishCurrentBufferBuilder(targetChannel);
+
+   // If 

[GitHub] NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-09-13 Thread GitBox
NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217406052
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -87,62 +86,71 @@ public RecordWriter(ResultPartitionWriter writer, 
ChannelSelector channelSele
 
this.numChannels = writer.getNumberOfSubpartitions();
 
-   /*
-* The runtime exposes a channel abstraction for the produced 
results
-* (see {@link ChannelSelector}). Every channel has an 
independent
-* serializer.
-*/
-   this.serializers = new SpanningRecordSerializer[numChannels];
+   this.serializer = new SpanningRecordSerializer();
this.bufferBuilders = new Optional[numChannels];
for (int i = 0; i < numChannels; i++) {
-   serializers[i] = new SpanningRecordSerializer();
bufferBuilders[i] = Optional.empty();
}
}
 
public void emit(T record) throws IOException, InterruptedException {
+   serializer.serializeRecord(record);
+
for (int targetChannel : channelSelector.selectChannels(record, 
numChannels)) {
-   sendToTarget(record, targetChannel);
+   copyToTarget(targetChannel);
}
+
+   // Make sure we don't hold onto the large intermediate 
serialization buffer for too long
+   serializer.prune();
}
 
/**
 * This is used to broadcast Streaming Watermarks in-band with records. 
This ignores
 * the {@link ChannelSelector}.
 */
public void broadcastEmit(T record) throws IOException, 
InterruptedException {
+   serializer.serializeRecord(record);
+
for (int targetChannel = 0; targetChannel < numChannels; 
targetChannel++) {
-   sendToTarget(record, targetChannel);
+   copyToTarget(targetChannel);
}
+
+   serializer.prune();
}
 
/**
 * This is used to send LatencyMarks to a random target channel.
 */
public void randomEmit(T record) throws IOException, 
InterruptedException {
-   sendToTarget(record, rng.nextInt(numChannels));
-   }
+   serializer.serializeRecord(record);
 
-   private void sendToTarget(T record, int targetChannel) throws 
IOException, InterruptedException {
-   RecordSerializer serializer = serializers[targetChannel];
+   copyToTarget(rng.nextInt(numChannels));
 
-   SerializationResult result = serializer.addRecord(record);
+   serializer.prune();
+   }
 
+   private void copyToTarget(int targetChannel) throws IOException, 
InterruptedException {
+   // We should reset the initial position of the intermediate 
serialization buffer before
+   // copying, so the serialization results can be copied to 
multiple target buffers.
+   serializer.reset();
+
+   BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
+   SerializationResult result = 
serializer.copyToBufferBuilder(bufferBuilder);
while (result.isFullBuffer()) {
-   if (tryFinishCurrentBufferBuilder(targetChannel, 
serializer)) {
-   // If this was a full record, we are done. Not 
breaking
-   // out of the loop at this point will lead to 
another
-   // buffer request before breaking out (that 
would not be
-   // a problem per se, but it can lead to stalls 
in the
-   // pipeline).
-   if (result.isFullRecord()) {
-   break;
-   }
+   tryFinishCurrentBufferBuilder(targetChannel);
 
 Review comment:
   I guess I was worried about the same thing as @pnowojski ... the expanded 
method here will actually look like this:
   ```
boolean pruneTriggered = false;
BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
SerializationResult result = 
serializer.copyToBufferBuilder(bufferBuilder);
while (result.isFullBuffer()) {
if (bufferBuilders[targetChannel].isPresent()) {
bufferBuilder = 
bufferBuilders[targetChannel].get();
bufferBuilders[targetChannel] = 
Optional.empty();
numBytesOut.inc(bufferBuilder.finish());
numBuffersOut.inc();
  

[GitHub] NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-09-13 Thread GitBox
NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217397453
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -89,77 +88,95 @@ public RecordWriter(ResultPartitionWriter writer, 
ChannelSelector channelSele
 
this.numChannels = writer.getNumberOfSubpartitions();
 
-   /*
-* The runtime exposes a channel abstraction for the produced 
results
-* (see {@link ChannelSelector}). Every channel has an 
independent
-* serializer.
-*/
-   this.serializers = new SpanningRecordSerializer[numChannels];
+   this.serializer = new SpanningRecordSerializer();
this.bufferBuilders = new Optional[numChannels];
+   this.broadcastChannels = new int[numChannels];
for (int i = 0; i < numChannels; i++) {
-   serializers[i] = new SpanningRecordSerializer();
+   broadcastChannels[i] = i;
bufferBuilders[i] = Optional.empty();
}
}
 
public void emit(T record) throws IOException, InterruptedException {
-   for (int targetChannel : channelSelector.selectChannels(record, 
numChannels)) {
-   sendToTarget(record, targetChannel);
-   }
+   emit(record, channelSelector.selectChannels(record, 
numChannels));
}
 
/**
 * This is used to broadcast Streaming Watermarks in-band with records. 
This ignores
 * the {@link ChannelSelector}.
 */
public void broadcastEmit(T record) throws IOException, 
InterruptedException {
-   for (int targetChannel = 0; targetChannel < numChannels; 
targetChannel++) {
-   sendToTarget(record, targetChannel);
-   }
+   emit(record, broadcastChannels);
}
 
/**
 * This is used to send LatencyMarks to a random target channel.
 */
public void randomEmit(T record) throws IOException, 
InterruptedException {
-   sendToTarget(record, rng.nextInt(numChannels));
+   serializer.serializeRecord(record);
+
+   if 
(copyFromSerializerToTargetChannel(rng.nextInt(numChannels))) {
+   serializer.prune();
+   }
 
 Review comment:
   code duplication: why not use this?
   ```
   emit(record, new int[] { rng.nextInt(numChannels) });
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-09-13 Thread GitBox
NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217415039
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 ##
 @@ -377,6 +388,86 @@ public void testBroadcastEmitBufferIndependence() throws 
Exception {
assertEquals("Buffer 2 shares the same reader index as buffer 
1", 0, buffer2.getReaderIndex());
}
 
+   /**
+* Tests that records are broadcast via {@link ChannelSelector} and
+* {@link RecordWriter#emit(IOReadableWritable)}.
+*/
+   @Test
+   public void testEmitRecordWithBroadcastPartitioner() throws Exception {
+   emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false);
+   }
+
+   /**
+* Tests that records are broadcast via {@link 
RecordWriter#broadcastEmit(IOReadableWritable)}.
+*/
+   @Test
+   public void testBroadcastEmitRecord() throws Exception {
+   emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true);
+   }
+
+   /**
+* The results of emitting records via BroadcastPartitioner or 
broadcasting records directly are the same,
+* that is all the target channels can receive the whole outputs.
+*
+* @param isBroadcastEmit whether using {@link 
RecordWriter#broadcastEmit(IOReadableWritable)} or not
+*/
+   private void 
emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean 
isBroadcastEmit) throws Exception {
+   final int numChannels = 4;
+   final int bufferSize = 32;
+   final int numValues = 8;
+   final int serializationLength = 4;
+
+   @SuppressWarnings("unchecked")
+   final Queue[] queues = new Queue[numChannels];
+   for (int i = 0; i < numChannels; i++) {
+   queues[i] = new ArrayDeque<>();
+   }
+
+   final TestPooledBufferProvider bufferProvider = new 
TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
+   final ResultPartitionWriter partitionWriter = new 
CollectingPartitionWriter(queues, bufferProvider);
+   final RecordWriter writer = new 
RecordWriter<>(partitionWriter, new Broadcast<>());
 
 Review comment:
   I meant using
   ```
   final RecordWriter writer = isBroadcastEmit ?
       new RecordWriter<>(partitionWriter) :
       new RecordWriter<>(partitionWriter, new Broadcast<>());
   ```
   This would also check that `broadcastEmit()` does not rely on a broadcasting 
channel selector.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-10342) Kafka duplicate topic consumption when topic name is changed

2018-09-13 Thread Oleksandr Nitavskyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi reassigned FLINK-10342:
---

Assignee: Oleksandr Nitavskyi

> Kafka duplicate topic consumption when topic name is changed
> 
>
> Key: FLINK-10342
> URL: https://issues.apache.org/jira/browse/FLINK-10342
> Project: Flink
>  Issue Type: Bug
>Reporter: Oleksandr Nitavskyi
>Assignee: Oleksandr Nitavskyi
>Priority: Major
>
> In case the topic name is simply renamed for a KafkaConsumer, Flink starts to 
> consume from both the old and the new topic at the same time, which can lead to 
> unexpected behavior.
> Here is a PR that reproduces the issue: https://github.com/apache/flink/pull/6691
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10342) Kafka duplicate topic consumption when topic name is changed

2018-09-13 Thread Oleksandr Nitavskyi (JIRA)
Oleksandr Nitavskyi created FLINK-10342:
---

 Summary: Kafka duplicate topic consumption when topic name is 
changed
 Key: FLINK-10342
 URL: https://issues.apache.org/jira/browse/FLINK-10342
 Project: Flink
  Issue Type: Bug
Reporter: Oleksandr Nitavskyi


In case the topic name is simply renamed for a KafkaConsumer, Flink starts to 
consume from both the old and the new topic at the same time, which can lead to 
unexpected behavior.

Here is a PR that reproduces the issue: https://github.com/apache/flink/pull/6691
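A hedged sketch of the scenario (connector class, topic names and the `properties` object are illustrative; the exact consumer version does not matter for the symptom):
```
// The job initially runs with:
FlinkKafkaConsumer011<String> consumer =
    new FlinkKafkaConsumer011<>("topic-old", new SimpleStringSchema(), properties);

// It is later restarted from a savepoint with only the topic name changed:
FlinkKafkaConsumer011<String> renamed =
    new FlinkKafkaConsumer011<>("topic-new", new SimpleStringSchema(), properties);

// The restored consumer state still references "topic-old", so the job keeps
// reading the old topic in addition to the new one.
```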

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] JTaky opened a new pull request #6691: Reproduce double topic subscription

2018-09-13 Thread GitBox
JTaky opened a new pull request #6691: Reproduce double topic subscription
URL: https://github.com/apache/flink/pull/6691
 
 
   *Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*
   
   *Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*
   
   ## Contribution Checklist
   
 - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
 
 - Name the pull request in the form "[FLINK-] [component] Title of the 
pull request", where *FLINK-* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
 Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`.
   
 - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
 
 - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).
   
 - Each pull request should address only one issue, not mix up code from 
multiple issues.
 
 - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)
   
 - Once all items of the checklist are addressed, remove the above text and 
this checklist, leaving only the filled out template below.
   
   
   **(The sections below can be removed for hotfixes of typos)**
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[jira] [Commented] (FLINK-10304) Remove deprecated AbstractYarnClusterDescriptor field

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613564#comment-16613564
 ] 

ASF GitHub Bot commented on FLINK-10304:


TisonKun commented on issue #6673: [FLINK-10304] [client] Remove deprecated 
AbstractYarnClusterDescriptor field
URL: https://github.com/apache/flink/pull/6673#issuecomment-421026733
 
 
   cc @GJL 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove deprecated AbstractYarnClusterDescriptor field
> -
>
> Key: FLINK-10304
> URL: https://issues.apache.org/jira/browse/FLINK-10304
> Project: Flink
>  Issue Type: Improvement
>  Components: Client, YARN
>Affects Versions: 1.7.0
>Reporter: 陈梓立
>Assignee: 陈梓立
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Based on [~trohrm...@apache.org]'s 
> [commit|https://github.com/apache/flink/commit/6356128865bff7463bf03185d18b129ed3633bc2],
>  {{AbstractYarnClusterDescriptor}} should not care whether it is in DETACHED 
> mode.
> After digging, I found the main usages of it are:
> 1. {{FlinkYarnSessionCli#run}}: this can be resolved by checking locally whether 
> {{allOptions}} has {{DETACHED_OPTION}}.
> 2. When AbstractYarnClusterDescriptor starts an AM, it sets 
> {{appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached));}}. 
> At this point it seems that YarnClusterDescriptor should know whether or not 
> it is in detached mode.
> If usage 2 is irrelevant now, we can get rid of the deprecated method in the 
> FLIP-6 codebase.
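For usage 1, a minimal sketch of the local check; only {{DETACHED_OPTION}} is taken from the description above, while variable names such as `commandLine` are assumptions and may differ from the actual FlinkYarnSessionCli code:
```
// Decide detached mode from the parsed command line inside FlinkYarnSessionCli#run,
// instead of reading the deprecated field on the cluster descriptor.
final boolean detachedMode = commandLine.hasOption(DETACHED_OPTION.getOpt());
```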



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] TisonKun commented on issue #6673: [FLINK-10304] [client] Remove deprecated AbstractYarnClusterDescriptor field

2018-09-13 Thread GitBox
TisonKun commented on issue #6673: [FLINK-10304] [client] Remove deprecated 
AbstractYarnClusterDescriptor field
URL: https://github.com/apache/flink/pull/6673#issuecomment-421026733
 
 
   cc @GJL 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10321) Make the condition of broadcast partitioner simple

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613562#comment-16613562
 ] 

ASF GitHub Bot commented on FLINK-10321:


Aitozi commented on issue #6688: [FLINK-10321][network] Simplify the condition 
of BroadcastPartitioner
URL: https://github.com/apache/flink/pull/6688#issuecomment-421025756
 
 
   LGTM.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Make the condition of broadcast partitioner simple
> --
>
> Key: FLINK-10321
> URL: https://issues.apache.org/jira/browse/FLINK-10321
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.7.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
>
> The current {{BroadcastPartitioner}} uses the variables {{set}} and 
> {{setNumber}} as the condition for returning the cached channel array.
> Instead of using {{set}} and {{setNumber}}, we can simply check whether 
> {{returnChannel.length == numberOfOutputChannels}}, which makes the condition 
> simpler.
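A sketch of the simplified check; the method shape follows the {{ChannelSelector}} usage in the RecordWriter threads above, and the field initialization details are assumptions rather than the actual BroadcastPartitioner source:
```
public int[] selectChannels(T record, int numberOfOutputChannels) {
    // Rebuild the cached array only when the channel count changes, instead of
    // tracking it with the separate `set`/`setNumber` flags.
    if (returnChannel == null || returnChannel.length != numberOfOutputChannels) {
        returnChannel = new int[numberOfOutputChannels];
        for (int i = 0; i < numberOfOutputChannels; i++) {
            returnChannel[i] = i;
        }
    }
    return returnChannel;
}
```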



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] Aitozi commented on issue #6688: [FLINK-10321][network] Simplify the condition of BroadcastPartitioner

2018-09-13 Thread GitBox
Aitozi commented on issue #6688: [FLINK-10321][network] Simplify the condition 
of BroadcastPartitioner
URL: https://github.com/apache/flink/pull/6688#issuecomment-421025756
 
 
   LGTM.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10321) Make the condition of broadcast partitioner simple

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613561#comment-16613561
 ] 

ASF GitHub Bot commented on FLINK-10321:


TisonKun commented on issue #6688: [FLINK-10321][network] Simplify the 
condition of BroadcastPartitioner
URL: https://github.com/apache/flink/pull/6688#issuecomment-421024900
 
 
   FYI, Travis fails due to an unrelated issue.
   
   ```
   Status: Downloaded newer image for java:8-jre-alpine
---> fdc893b19a14
   Step 2/16 : RUN apk add --no-cache bash snappy
---> [Warning] IPv4 forwarding is disabled. Networking will not work.
---> Running in 04ebab036ceb
   fetch http://dl-cdn.alpinelinux.org/alpine/v3.4/main/x86_64/APKINDEX.tar.gz
   WARNING: Ignoring 
http://dl-cdn.alpinelinux.org/alpine/v3.4/main/x86_64/APKINDEX.tar.gz: 
temporary error (try again later)
   fetch 
http://dl-cdn.alpinelinux.org/alpine/v3.4/community/x86_64/APKINDEX.tar.gz
   WARNING: Ignoring 
http://dl-cdn.alpinelinux.org/alpine/v3.4/community/x86_64/APKINDEX.tar.gz: 
temporary error (try again later)
   ERROR: unsatisfiable constraints:
 bash (missing):
   required by: world[bash]
 snappy (missing):
   required by: world[snappy]
   The command '/bin/sh -c apk add --no-cache bash snappy' returned a non-zero 
code: 2
   
   
   No output has been received in the last 10m0s, this potentially indicates a 
stalled build or something wrong with the build itself.
   Check the details on how to adjust your build configuration on: 
https://docs.travis-ci.com/user/common-build-problems/#Build-times-out-because-no-output-was-received
   
   The build has been terminated
   ```
   
   @zentol is something wrong with Travis, or with our script?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Make the condition of broadcast partitioner simple
> --
>
> Key: FLINK-10321
> URL: https://issues.apache.org/jira/browse/FLINK-10321
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.7.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
>
> The current {{BroadcastPartitioner}} uses the variables {{set}} and 
> {{setNumber}} as the condition for returning the cached channel array.
> Instead of using {{set}} and {{setNumber}}, we can simply check whether 
> {{returnChannel.length == numberOfOutputChannels}}, which makes the condition 
> simpler.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] TisonKun commented on issue #6688: [FLINK-10321][network] Simplify the condition of BroadcastPartitioner

2018-09-13 Thread GitBox
TisonKun commented on issue #6688: [FLINK-10321][network] Simplify the 
condition of BroadcastPartitioner
URL: https://github.com/apache/flink/pull/6688#issuecomment-421024900
 
 
   FYI, Travis fails due to an unrelated issue.
   
   ```
   Status: Downloaded newer image for java:8-jre-alpine
---> fdc893b19a14
   Step 2/16 : RUN apk add --no-cache bash snappy
---> [Warning] IPv4 forwarding is disabled. Networking will not work.
---> Running in 04ebab036ceb
   fetch http://dl-cdn.alpinelinux.org/alpine/v3.4/main/x86_64/APKINDEX.tar.gz
   WARNING: Ignoring 
http://dl-cdn.alpinelinux.org/alpine/v3.4/main/x86_64/APKINDEX.tar.gz: 
temporary error (try again later)
   fetch 
http://dl-cdn.alpinelinux.org/alpine/v3.4/community/x86_64/APKINDEX.tar.gz
   WARNING: Ignoring 
http://dl-cdn.alpinelinux.org/alpine/v3.4/community/x86_64/APKINDEX.tar.gz: 
temporary error (try again later)
   ERROR: unsatisfiable constraints:
 bash (missing):
   required by: world[bash]
 snappy (missing):
   required by: world[snappy]
   The command '/bin/sh -c apk add --no-cache bash snappy' returned a non-zero 
code: 2
   
   
   No output has been received in the last 10m0s, this potentially indicates a 
stalled build or something wrong with the build itself.
   Check the details on how to adjust your build configuration on: 
https://docs.travis-ci.com/user/common-build-problems/#Build-times-out-because-no-output-was-received
   
   The build has been terminated
   ```
   
   @zentol is something wrong with Travis, or with our script?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-10275) StreamTask support object reuse

2018-09-13 Thread JIRA


 [ 
https://issues.apache.org/jira/browse/FLINK-10275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

陈梓立 updated FLINK-10275:

Description: 
StreamTask should support efficient object reuse. The purpose behind this is to reduce 
pressure on the garbage collector.

All objects are reused, without backup copies. The operators and UDFs must be 
careful not to keep any objects as state and not to modify the objects.

*FYI, this thread is currently blocked by FLIP-21. The attached pull request has been 
closed pending a rework.*

  was:
StreamTask support efficient object reuse. The purpose behind this is to reduce 
pressure on the garbage collector.

All objects are reused, without backup copies. The operators and UDFs must be 
careful to not keep any objects as state or not to modify the objects.
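For illustration, a short sketch of how object reuse is switched on and what the constraint above means for a user function; the environment setup below is generic Flink boilerplate rather than code from this issue:
```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Instruct the runtime to hand the same object instances to user functions.
env.getConfig().enableObjectReuse();

// With reuse enabled, a user function must neither store its input object as
// state nor modify it; it should create a new object for its output instead.
```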


> StreamTask support object reuse
> ---
>
> Key: FLINK-10275
> URL: https://issues.apache.org/jira/browse/FLINK-10275
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.7.0
>Reporter: 陈梓立
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> StreamTask support efficient object reuse. The purpose behind this is to 
> reduce pressure on the garbage collector.
> All objects are reused, without backup copies. The operators and UDFs must be 
> careful to not keep any objects as state or not to modify the objects.
> *FYI, now this thread is blocked by FLIP-21. The pull request attached is 
> closed for a needed rework.*



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10275) StreamTask support object reuse

2018-09-13 Thread JIRA


 [ 
https://issues.apache.org/jira/browse/FLINK-10275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

陈梓立 reassigned FLINK-10275:
---

Assignee: (was: 陈梓立)

> StreamTask support object reuse
> ---
>
> Key: FLINK-10275
> URL: https://issues.apache.org/jira/browse/FLINK-10275
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.7.0
>Reporter: 陈梓立
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> StreamTask support efficient object reuse. The purpose behind this is to 
> reduce pressure on the garbage collector.
> All objects are reused, without backup copies. The operators and UDFs must be 
> careful to not keep any objects as state or not to modify the objects.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10275) StreamTask support object reuse

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613553#comment-16613553
 ] 

ASF GitHub Bot commented on FLINK-10275:


TisonKun commented on issue #6643: [FLINK-10275] StreamTask support object reuse
URL: https://github.com/apache/flink/pull/6643#issuecomment-421020528
 
 
   Although I don't think keeping a stale pull request around does any harm, since this 
pull request would need to be updated essentially as a rework, I will close it.
   Thanks for the reminder, @kl0u!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> StreamTask support object reuse
> ---
>
> Key: FLINK-10275
> URL: https://issues.apache.org/jira/browse/FLINK-10275
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.7.0
>Reporter: 陈梓立
>Assignee: 陈梓立
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> StreamTask support efficient object reuse. The purpose behind this is to 
> reduce pressure on the garbage collector.
> All objects are reused, without backup copies. The operators and UDFs must be 
> careful to not keep any objects as state or not to modify the objects.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10275) StreamTask support object reuse

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613554#comment-16613554
 ] 

ASF GitHub Bot commented on FLINK-10275:


TisonKun closed pull request #6643: [FLINK-10275] StreamTask support object 
reuse
URL: https://github.com/apache/flink/pull/6643
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/execution_configuration.md 
b/docs/dev/execution_configuration.md
index f0103b0f39f..3991ab59ac5 100644
--- a/docs/dev/execution_configuration.md
+++ b/docs/dev/execution_configuration.md
@@ -59,7 +59,7 @@ With the closure cleaner disabled, it might happen that an 
anonymous user functi
 
 - `enableForceAvro()` / **`disableForceAvro()`**. Avro is not forced by 
default. Forces the Flink AvroTypeInformation to use the Avro serializer 
instead of Kryo for serializing Avro POJOs.
 
-- `enableObjectReuse()` / **`disableObjectReuse()`** By default, objects are 
not reused in Flink. Enabling the object reuse mode will instruct the runtime 
to reuse user objects for better performance. Keep in mind that this can lead 
to bugs when the user-code function of an operation is not aware of this 
behavior.
+- `enableObjectReuse()` / **`disableObjectReuse()`** By default, objects are 
not reused in Flink. Enabling the object reuse mode will instruct the runtime 
to reuse user objects for better performance. Keep in mind that this can lead 
to bugs when the user-defined function of an operation is not aware of this 
behavior.
 
 - **`enableSysoutLogging()`** / `disableSysoutLogging()` JobManager status 
updates are printed to `System.out` by default. This setting allows to disable 
this behavior.
 
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java 
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 59fa803791a..d36fd296562 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -601,8 +601,8 @@ public boolean isForceAvroEnabled() {
 
/**
 * Enables reusing objects that Flink internally uses for 
deserialization and passing
-* data to user-code functions. Keep in mind that this can lead to bugs 
when the
-* user-code function of an operation is not aware of this behaviour.
+* data to user-defined functions. Keep in mind that this can lead to 
bugs when the
+* user-defined function of an operation is not aware of this behaviour.
 */
public ExecutionConfig enableObjectReuse() {
objectReuse = true;
@@ -611,7 +611,7 @@ public ExecutionConfig enableObjectReuse() {
 
/**
 * Disables reusing objects that Flink internally uses for 
deserialization and passing
-* data to user-code functions. @see #enableObjectReuse()
+* data to user-defined functions. @see #enableObjectReuse()
 */
public ExecutionConfig disableObjectReuse() {
objectReuse = false;
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala
index d8ba29ae478..f4185f4c129 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala
@@ -51,7 +51,7 @@ class TableSinkITCase(
 val input = CollectionDataSets.get3TupleDataSet(env)
   .map(x => x).setParallelism(4) // increase DOP to 4
 
-val results = input.toTable(tEnv, 'a, 'b, 'c)
+input.toTable(tEnv, 'a, 'b, 'c)
   .where('a < 5 || 'a > 17)
   .select('c, 'b)
   .writeToSink(new CsvTableSink(path, fieldDelim = "|"))
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
index 70e59f3d24d..95cb1df1b04 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
@@ -21,10 +21,12 @@ package org.apache.flink.table.runtime.stream.table
 import java.io.File
 import java.lang.{Boolean => JBool}
 
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.functions.MapFunction
 import 

[GitHub] TisonKun closed pull request #6643: [FLINK-10275] StreamTask support object reuse

2018-09-13 Thread GitBox
TisonKun closed pull request #6643: [FLINK-10275] StreamTask support object 
reuse
URL: https://github.com/apache/flink/pull/6643
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/execution_configuration.md 
b/docs/dev/execution_configuration.md
index f0103b0f39f..3991ab59ac5 100644
--- a/docs/dev/execution_configuration.md
+++ b/docs/dev/execution_configuration.md
@@ -59,7 +59,7 @@ With the closure cleaner disabled, it might happen that an 
anonymous user functi
 
 - `enableForceAvro()` / **`disableForceAvro()`**. Avro is not forced by 
default. Forces the Flink AvroTypeInformation to use the Avro serializer 
instead of Kryo for serializing Avro POJOs.
 
-- `enableObjectReuse()` / **`disableObjectReuse()`** By default, objects are 
not reused in Flink. Enabling the object reuse mode will instruct the runtime 
to reuse user objects for better performance. Keep in mind that this can lead 
to bugs when the user-code function of an operation is not aware of this 
behavior.
+- `enableObjectReuse()` / **`disableObjectReuse()`** By default, objects are 
not reused in Flink. Enabling the object reuse mode will instruct the runtime 
to reuse user objects for better performance. Keep in mind that this can lead 
to bugs when the user-defined function of an operation is not aware of this 
behavior.
 
 - **`enableSysoutLogging()`** / `disableSysoutLogging()` JobManager status 
updates are printed to `System.out` by default. This setting allows to disable 
this behavior.
 
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java 
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 59fa803791a..d36fd296562 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -601,8 +601,8 @@ public boolean isForceAvroEnabled() {
 
/**
 * Enables reusing objects that Flink internally uses for 
deserialization and passing
-* data to user-code functions. Keep in mind that this can lead to bugs 
when the
-* user-code function of an operation is not aware of this behaviour.
+* data to user-defined functions. Keep in mind that this can lead to 
bugs when the
+* user-defined function of an operation is not aware of this behaviour.
 */
public ExecutionConfig enableObjectReuse() {
objectReuse = true;
@@ -611,7 +611,7 @@ public ExecutionConfig enableObjectReuse() {
 
/**
 * Disables reusing objects that Flink internally uses for 
deserialization and passing
-* data to user-code functions. @see #enableObjectReuse()
+* data to user-defined functions. @see #enableObjectReuse()
 */
public ExecutionConfig disableObjectReuse() {
objectReuse = false;
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala
index d8ba29ae478..f4185f4c129 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala
@@ -51,7 +51,7 @@ class TableSinkITCase(
 val input = CollectionDataSets.get3TupleDataSet(env)
   .map(x => x).setParallelism(4) // increase DOP to 4
 
-val results = input.toTable(tEnv, 'a, 'b, 'c)
+input.toTable(tEnv, 'a, 'b, 'c)
   .where('a < 5 || 'a > 17)
   .select('c, 'b)
   .writeToSink(new CsvTableSink(path, fieldDelim = "|"))
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
index 70e59f3d24d..95cb1df1b04 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
@@ -21,10 +21,12 @@ package org.apache.flink.table.runtime.stream.table
 import java.io.File
 import java.lang.{Boolean => JBool}
 
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import 

[GitHub] TisonKun commented on issue #6643: [FLINK-10275] StreamTask support object reuse

2018-09-13 Thread GitBox
TisonKun commented on issue #6643: [FLINK-10275] StreamTask support object reuse
URL: https://github.com/apache/flink/pull/6643#issuecomment-421020528
 
 
   Although I don't think keeping a stale pull request around does any harm, since this 
pull request would need to be updated essentially as a rework, I will close it.
   Thanks for the reminder, @kl0u!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10275) StreamTask support object reuse

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613544#comment-16613544
 ] 

ASF GitHub Bot commented on FLINK-10275:


kl0u commented on issue #6643: [FLINK-10275] StreamTask support object reuse
URL: https://github.com/apache/flink/pull/6643#issuecomment-421019224
 
 
   Hi @TisonKun,
   
   In the above discussion, there is consensus that until the related FLIP is 
agreed upon, there is not going to be any activity on this PR. Could you close 
this PR so that we keep a clean backlog of PRs?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> StreamTask support object reuse
> ---
>
> Key: FLINK-10275
> URL: https://issues.apache.org/jira/browse/FLINK-10275
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.7.0
>Reporter: 陈梓立
>Assignee: 陈梓立
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> StreamTask should support efficient object reuse. The purpose behind this is to 
> reduce pressure on the garbage collector.
> All objects are reused, without backup copies. The operators and UDFs must be 
> careful not to keep any objects as state and not to modify the objects.
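Object reuse already exists as a per-job switch on the ExecutionConfig. Below is a minimal sketch of enabling it from the DataStream API, with a map function that stays safe under reuse by neither caching nor mutating its input (illustrative only, not the StreamTask change proposed here):

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ObjectReuseSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Opt in: Flink may now hand the same object instance to user code repeatedly.
        env.getConfig().enableObjectReuse();

        env.fromElements("a", "bb", "ccc")
            // Safe under reuse: derives a new value, neither caches nor mutates the input.
            .map(new MapFunction<String, Integer>() {
                @Override
                public Integer map(String value) {
                    return value.length();
                }
            })
            .print();

        env.execute("object-reuse-sketch");
    }
}
```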



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10341) Add option to print flink command when running bin/flink

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613543#comment-16613543
 ] 

ASF GitHub Bot commented on FLINK-10341:


TisonKun commented on issue #6690: [FLINK-10341][shell script] Add option to 
print flink command when running bin/flink
URL: https://github.com/apache/flink/pull/6690#issuecomment-421019008
 
 
   I am not a bash expert, and I find that you are right, @dawidwys. To be accurate, it 
would be `bash -x COMMAND 2>&1 | grep exec` or something similar, otherwise it prints 
a scary amount of text.
   From my side it is a frequent need to see the real command being executed, 
so I would approve this change. We could achieve it even just by hacking this 
script locally. So the question is whether we provide such a feature within the 
project; besides, the print format and the setenv-based approach are not polished, so 
it looks like little more than `bash -x`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add option to print flink command when running bin/flink
> 
>
> Key: FLINK-10341
> URL: https://issues.apache.org/jira/browse/FLINK-10341
> Project: Flink
>  Issue Type: Improvement
>  Components: Startup Shell Scripts
>Reporter: Jeff Zhang
>Assignee: Jeff Zhang
>Priority: Major
>  Labels: pull-request-available
>
> It would be very useful if I could see the final java command which is used to 
> run the flink program, especially when I hit a very weird issue as a flink 
> contributor. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] kl0u commented on issue #6643: [FLINK-10275] StreamTask support object reuse

2018-09-13 Thread GitBox
kl0u commented on issue #6643: [FLINK-10275] StreamTask support object reuse
URL: https://github.com/apache/flink/pull/6643#issuecomment-421019224
 
 
   Hi @TisonKun,
   
   In the above discussion, there is consensus that until the related FLIP is 
agreed upon, there is not going to be any activity on this PR. Could you close 
this PR so that we keep a clean backlog of PRs?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] TisonKun commented on issue #6690: [FLINK-10341][shell script] Add option to print flink command when running bin/flink

2018-09-13 Thread GitBox
TisonKun commented on issue #6690: [FLINK-10341][shell script] Add option to 
print flink command when running bin/flink
URL: https://github.com/apache/flink/pull/6690#issuecomment-421019008
 
 
   I am not a bash expert, and I find that you are right, @dawidwys. To be accurate, it 
would be `bash -x COMMAND 2>&1 | grep exec` or something similar, otherwise it prints 
a scary amount of text.
   From my side it is a frequent need to see the real command being executed, 
so I would approve this change. We could achieve it even just by hacking this 
script locally. So the question is whether we provide such a feature within the 
project; besides, the print format and the setenv-based approach are not polished, so 
it looks like little more than `bash -x`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10327) Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613537#comment-16613537
 ] 

ASF GitHub Bot commented on FLINK-10327:


kl0u commented on issue #6687: [FLINK-10327][streaming] Expose 
processWatermarks notifications to (Co)ProcessFunction
URL: https://github.com/apache/flink/pull/6687#issuecomment-421017411
 
 
   Hi @pnowojski ! 
   
   I can understand that this can be an interesting addition for some use cases, 
but it is a big one, and it should be discussed more thoroughly and, most 
importantly, more publicly. I would be against merging it as just a sub-commit 
of another feature.
   
   The reason is that this allows users to "play" with watermarks from the 
level of a `Function` and not `Operator`, which was, intentionally, the case so 
far.
   
   If you want to "hold back" the watermark, the this should be done by a 
watermark assigner.
   
   If you want to run a "callback" upon watermark, then so far the trick is to 
register a timer for `watermark + 1`. 
   
   I can find use cases which do not fall into any of the above, but for those 
so far we implement custom operators. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction
> ---
>
> Key: FLINK-10327
> URL: https://issues.apache.org/jira/browse/FLINK-10327
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> Currently {{CoProcessFunction}} cannot react to watermark 
> advancement. By passing {{processWatermark}} calls to the function we would give 
> a way to perform some actions on watermark advancement, like state clean-up 
> or emitting some results after accumulating some data.
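As a side note, the `watermark + 1` timer trick mentioned in the discussion can be sketched as follows; this is an illustrative `KeyedProcessFunction`, not part of the proposed change:

```java
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Illustrative only: registers a timer for currentWatermark() + 1 so that
// onTimer() runs as soon as the watermark advances.
public class WatermarkCallbackSketch extends KeyedProcessFunction<String, String, String> {

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        // currentWatermark() is Long.MIN_VALUE before the first watermark arrives;
        // in that case the timer simply fires with the first real watermark.
        long callbackTime = ctx.timerService().currentWatermark() + 1;
        ctx.timerService().registerEventTimeTimer(callbackTime);
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        // The watermark is now >= timestamp: clean up state or emit buffered results here.
        out.collect("watermark advanced past " + timestamp);
    }
}
```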



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] kl0u commented on issue #6687: [FLINK-10327][streaming] Expose processWatermarks notifications to (Co)ProcessFunction

2018-09-13 Thread GitBox
kl0u commented on issue #6687: [FLINK-10327][streaming] Expose 
processWatermarks notifications to (Co)ProcessFunction
URL: https://github.com/apache/flink/pull/6687#issuecomment-421017411
 
 
   Hi @pnowojski ! 
   
   I can understand that this can be an interesting addition for some use cases, 
but it is a big one, and it should be discussed more thoroughly and, most 
importantly, more publicly. I would be against merging it as just a sub-commit 
of another feature.
   
   The reason is that this allows users to "play" with watermarks from the 
level of a `Function` and not `Operator`, which was, intentionally, the case so 
far.
   
   If you want to "hold back" the watermark, the this should be done by a 
watermark assigner.
   
   If you want to run a "callback" upon watermark, then so far the trick is to 
register a timer for `watermark + 1`. 
   
   I can find use cases which do not fall into any of the above, but for those 
so far we implement custom operators. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10329) Fail with exception if job cannot be removed by ZooKeeperSubmittedJobGraphStore#removeJobGraph

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613534#comment-16613534
 ] 

ASF GitHub Bot commented on FLINK-10329:


azagrebin commented on a change in pull request #6686: [FLINK-10329] 
[FLINK-10328] Fail ZooKeeperSubmittedJobGraphStore#removeJobGraph if job cannot 
be removed & Release all locks when stopping the ZooKeeperSubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6686#discussion_r217392484
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 ##
 @@ -768,27 +788,40 @@ private void jobMasterFailed(JobID jobId, Throwable cause) {
 */
@Override
public void grantLeadership(final UUID newLeaderSessionID) {
-   log.info("Dispatcher {} was granted leadership with fencing token {}", getAddress(), newLeaderSessionID);
+   runAsyncWithoutFencing(
+   () -> {
+   log.info("Dispatcher {} was granted leadership with fencing token {}", getAddress(), newLeaderSessionID);
 
-   final CompletableFuture> recoveredJobsFuture = recoverJobs();
+   final CompletableFuture> recoveredJobsFuture = recoveryOperation.thenComposeAsync(
+   ignored -> recoverJobs(),
 
 Review comment:
   `recoverJobs` could now be blocking if we use 
`recoveryOperation.thenApplyAsync`
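For readers less familiar with `CompletableFuture` chaining, a small standalone illustration (generic Java, not the Dispatcher code) of why `thenComposeAsync` is preferred when the next step itself returns a future:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;

public class ComposeVsApplySketch {

    // Stand-in for a recovery step that itself returns a future.
    static CompletableFuture<Collection<String>> recoverJobs() {
        return CompletableFuture.supplyAsync(() -> Arrays.asList("job-1", "job-2"));
    }

    public static void main(String[] args) {
        CompletableFuture<Void> previousOperation = CompletableFuture.completedFuture(null);

        // thenComposeAsync flattens the nested future: the result is a Collection<String>.
        CompletableFuture<Collection<String>> recovered =
            previousOperation.thenComposeAsync(ignored -> recoverJobs());

        // thenApplyAsync would instead yield CompletableFuture<CompletableFuture<Collection<String>>>,
        // which the caller would have to unwrap (e.g. by blocking on join()) before use.
        System.out.println(recovered.join());
    }
}
```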


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fail with exception if job cannot be removed by 
> ZooKeeperSubmittedJobGraphStore#removeJobGraph
> --
>
> Key: FLINK-10329
> URL: https://issues.apache.org/jira/browse/FLINK-10329
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.1, 1.7.0, 1.5.4
>
>
> Callers of {{ZooKeeperSubmittedJobGraph#removeJobGraph}} expect that we fail 
> with an exception if the {{JobGraph}} cannot be removed. This is not the case 
> since we call internally {{ZooKeeperStateHandleStore#releaseAndTryRemove}}. 
> If this method returns {{false}}, then we need to fail with an exception.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] azagrebin commented on a change in pull request #6686: [FLINK-10329] [FLINK-10328] Fail ZooKeeperSubmittedJobGraphStore#removeJobGraph if job cannot be removed & Release all locks when stoppi

2018-09-13 Thread GitBox
azagrebin commented on a change in pull request #6686: [FLINK-10329] 
[FLINK-10328] Fail ZooKeeperSubmittedJobGraphStore#removeJobGraph if job cannot 
be removed & Release all locks when stopping the ZooKeeperSubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6686#discussion_r217392484
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 ##
 @@ -768,27 +788,40 @@ private void jobMasterFailed(JobID jobId, Throwable cause) {
 */
@Override
public void grantLeadership(final UUID newLeaderSessionID) {
-   log.info("Dispatcher {} was granted leadership with fencing token {}", getAddress(), newLeaderSessionID);
+   runAsyncWithoutFencing(
+   () -> {
+   log.info("Dispatcher {} was granted leadership with fencing token {}", getAddress(), newLeaderSessionID);
 
-   final CompletableFuture> recoveredJobsFuture = recoverJobs();
+   final CompletableFuture> recoveredJobsFuture = recoveryOperation.thenComposeAsync(
+   ignored -> recoverJobs(),
 
 Review comment:
   `recoverJobs` could now be blocking if we use 
`recoveryOperation.thenApplyAsync`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-10223) TaskManagers should log their ResourceID during startup

2018-09-13 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao reassigned FLINK-10223:


Assignee: Gary Yao  (was: aitozi)

> TaskManagers should log their ResourceID during startup
> ---
>
> Key: FLINK-10223
> URL: https://issues.apache.org/jira/browse/FLINK-10223
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.3, 1.6.1, 1.7.0
>Reporter: Konstantin Knauf
>Assignee: Gary Yao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.1, 1.7.0, 1.5.4
>
>
> To debug exceptions like "org.apache.flink.util.FlinkException: The assigned 
> slot  was removed." in the master container it is often helpful to 
> know which slot was provided by which TaskManager. The only way to relate 
> slots to TaskManagers right now seems to be to enable DEBUG logging for 
> `org.apache.flink.runtime.jobmaster.slotpool.SlotPool`.
> This would be solved if each TaskManager logged its `ResourceID` 
> during startup, as the `SlotID` mainly consists of the `ResourceID` of the 
> providing TaskManager. For Mesos and YARN the `ResourceID` has an intrinsic 
> meaning, but for a stand-alone or containerized setup the `ResourceID` is 
> just a random ID.
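What is being asked for amounts to a single INFO line at startup; a minimal sketch (class, logger and method names are illustrative, not the actual patch):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaskManagerStartupLogSketch {

    private static final Logger LOG = LoggerFactory.getLogger(TaskManagerStartupLogSketch.class);

    // Emitting the ResourceID at INFO level lets operators map SlotIDs back to this TaskManager.
    public static void logResourceId(String resourceId) {
        LOG.info("TaskManager started with ResourceID {}", resourceId);
    }
}
```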



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8354) Flink Kafka connector ignores Kafka message headers

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613528#comment-16613528
 ] 

ASF GitHub Bot commented on FLINK-8354:
---

alexeyt820 commented on issue #6615: [FLINK-8354] [flink-connectors] Add 
ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#issuecomment-421014612
 
 
   @tzulitai, using a ```Record``` wrapping the Kafka ```ConsumerRecord``` allows us to 
add, for example, the timestamp from PR #6105 without needing to change client code, so it 
looks like the more extensible approach. I am not sure how it makes it hard to reuse 
already existing deserialization formats such as 
```AvroDeserializationSchema``` - at least no harder than now: 
```AvroDeserializationSchema``` will be wrapped via 
```KeyedDeserializationSchemaWrapper``` in exactly the same way as now. Also, 
```KeyedDeserializationSchemaWrapper``` calls only ```Record.value()```, so it 
doesn't tie deserialization of the bytes to access to the other metadata, at least not in 
terms of the execution path (in logical terms they are always tied, because the underlying 
level - the Kafka ```ConsumerRecord``` - contains key, value and metadata).
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink Kafka connector ignores Kafka message  headers 
> -
>
> Key: FLINK-8354
> URL: https://issues.apache.org/jira/browse/FLINK-8354
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
> Environment: Kafka 0.11.0.0
> Flink 1.4.0
> flink-connector-kafka-0.11_2.11 
>Reporter: Mohammad Abareghi
>Assignee: Aegeaner
>Priority: Major
>  Labels: pull-request-available
>
> Kafka has introduced the notion of Headers for messages in version 0.11.0.0  
> (https://issues.apache.org/jira/browse/KAFKA-4208).
> But flink-connector-kafka-0.11_2.11, which supports Kafka 0.11.0.0, ignores 
> headers when consuming Kafka messages. 
> It would be useful in some scenarios, such as distributed log tracing, to 
> expose message headers in FlinkKafkaConsumer011 and FlinkKafkaProducer011. 
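For context, the plain Kafka 0.11+ client already exposes headers on each record; a minimal sketch of reading one (standard Kafka client API only, independent of the Flink connector change under discussion):

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

public class KafkaHeaderSketch {

    // Extracts a header value as a String, or null if the header is absent.
    public static String headerValue(ConsumerRecord<byte[], byte[]> record, String key) {
        Header header = record.headers().lastHeader(key);
        return header == null ? null : new String(header.value(), StandardCharsets.UTF_8);
    }
}
```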



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] alexeyt820 commented on issue #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers

2018-09-13 Thread GitBox
alexeyt820 commented on issue #6615: [FLINK-8354] [flink-connectors] Add 
ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#issuecomment-421014612
 
 
   @tzulitai, using a ```Record``` wrapping the Kafka ```ConsumerRecord``` allows us to 
add, for example, the timestamp from PR #6105 without needing to change client code, so it 
looks like the more extensible approach. I am not sure how it makes it hard to reuse 
already existing deserialization formats such as 
```AvroDeserializationSchema``` - at least no harder than now: 
```AvroDeserializationSchema``` will be wrapped via 
```KeyedDeserializationSchemaWrapper``` in exactly the same way as now. Also, 
```KeyedDeserializationSchemaWrapper``` calls only ```Record.value()```, so it 
doesn't tie deserialization of the bytes to access to the other metadata, at least not in 
terms of the execution path (in logical terms they are always tied, because the underlying 
level - the Kafka ```ConsumerRecord``` - contains key, value and metadata).
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10341) Add option to print flink command when running bin/flink

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613521#comment-16613521
 ] 

ASF GitHub Bot commented on FLINK-10341:


dawidwys commented on issue #6690: [FLINK-10341][shell script] Add option to 
print flink command when running bin/flink
URL: https://github.com/apache/flink/pull/6690#issuecomment-421012461
 
 
   I am sorry, but I don't think this is a valuable addition and I doubt it 
will be merged. For debugging scripts you can always invoke the script with the 
`-x` flag, e.g. like this: `bash -x flink run ...`, and you will get the command 
as well.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add option to print flink command when running bin/flink
> 
>
> Key: FLINK-10341
> URL: https://issues.apache.org/jira/browse/FLINK-10341
> Project: Flink
>  Issue Type: Improvement
>  Components: Startup Shell Scripts
>Reporter: Jeff Zhang
>Assignee: Jeff Zhang
>Priority: Major
>  Labels: pull-request-available
>
> It would be very useful if I could see the final java command which is used to 
> run the flink program, especially when I hit a very weird issue as a flink 
> contributor. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] dawidwys commented on issue #6690: [FLINK-10341][shell script] Add option to print flink command when running bin/flink

2018-09-13 Thread GitBox
dawidwys commented on issue #6690: [FLINK-10341][shell script] Add option to 
print flink command when running bin/flink
URL: https://github.com/apache/flink/pull/6690#issuecomment-421012461
 
 
   I am sorry, but I don't think this is a valuable addition and I doubt it 
will be merged. For debugging scripts you can always invoke the script with the 
`-x` flag, e.g. like this: `bash -x flink run ...`, and you will get the command 
as well.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-10331) Fix unnecessary flush requests to the network stack

2018-09-13 Thread Nico Kruber (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-10331:

Description: 
With the re-design of the record writer interaction with the 
result(sub)partitions, flush requests can currently pile up in these scenarios:
- a previous flush request has not been completely handled yet and/or is still 
enqueued or
- the network stack is still polling from this subpartition and doesn't need a 
new notification

These lead to increased notifications in low latency settings (low output 
flusher intervals) which can be avoided.

  was:
With the re-design of the record writer interaction with the 
result(sub)partitions, flush requests can currently pile up in these scenarios_
- a previous flush request has not been completely handled yet and/or is still 
enqueued or
- the network stack is still polling from this subpartition and doesn't need a 
new notification

These lead to increased notifications in low latency settings (low output 
flusher intervals) which can be avoided.


> Fix unnecessary flush requests to the network stack
> ---
>
> Key: FLINK-10331
> URL: https://issues.apache.org/jira/browse/FLINK-10331
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0, 1.7.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>
> With the re-design of the record writer interaction with the 
> result(sub)partitions, flush requests can currently pile up in these 
> scenarios:
> - a previous flush request has not been completely handled yet and/or is 
> still enqueued or
> - the network stack is still polling from this subpartition and doesn't need 
> a new notification
> These lead to increased notifications in low latency settings (low output 
> flusher intervals) which can be avoided.
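The idea boils down to not re-notifying a consumer that already knows data is pending; a minimal, Flink-agnostic sketch of that pattern using a compare-and-set flag (class and method names are illustrative, this is not the actual subpartition code):

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class FlushNotifier {

    private final AtomicBoolean flushRequested = new AtomicBoolean(false);
    private final Runnable notifyDataAvailable;

    public FlushNotifier(Runnable notifyDataAvailable) {
        this.notifyDataAvailable = notifyDataAvailable;
    }

    /** Called by the output flusher; only the first request until the next poll triggers a notification. */
    public void requestFlush() {
        if (flushRequested.compareAndSet(false, true)) {
            notifyDataAvailable.run();
        }
    }

    /** Called when the consumer polls the data; subsequent flush requests may notify again. */
    public void onPoll() {
        flushRequested.set(false);
    }
}
```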



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] fhueske edited a comment on issue #6508: [Flink-10079] [table] Automatically register sink table from external catalogs

2018-09-13 Thread GitBox
fhueske edited a comment on issue #6508: [Flink-10079] [table] Automatically 
register sink table from external catalogs 
URL: https://github.com/apache/flink/pull/6508#issuecomment-421010897
 
 
   Please also rebase your PR on the current master.
   
   Thank you,
   Fabian


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] fhueske commented on issue #6508: [Flink-10079] [table] Automatically register sink table from external catalogs

2018-09-13 Thread GitBox
fhueske commented on issue #6508: [Flink-10079] [table] Automatically register 
sink table from external catalogs 
URL: https://github.com/apache/flink/pull/6508#issuecomment-421010897
 
 
   Please also rebase your PR on the current master.
   
   Thank you,
   Fabian


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-10184) HA Failover broken due to JobGraphs not being removed from Zookeeper on cancel

2018-09-13 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613513#comment-16613513
 ] 

Till Rohrmann edited comment on FLINK-10184 at 9/13/18 1:40 PM:


[~Jamalarm] yes, you would need to checkout my branch and then build Flink 
yourself. The binaries are then located in 
flink-dist/target/flink1.7-SNAPSHOT-bin/flink-1.7-SNAPSHOT. I don't think that 
you need to rebuild your job because the branch should not contain any API 
changes. I'm about to merge the PR so that there should be a snapshot build 
with the fix very soon (hopefully by tomorrow morning).

Thanks a lot for your help [~wcummings]!


was (Author: till.rohrmann):
[~Jamalarm] yes, you would need to checkout my branch and then build Flink 
yourself. The binaries are then located in 
flink-dist/target/flink1.7-SNAPSHOT-bin/flink-1.7-SNAPSHOT. I don't think that 
you need to rebuild your job because the branch should not contain any API 
changes. I'm about to merge the PR so that there should be a snapshot build 
with the fix very soon (hopefully by tomorrow morning).

> HA Failover broken due to JobGraphs not being removed from Zookeeper on cancel
> --
>
> Key: FLINK-10184
> URL: https://issues.apache.org/jira/browse/FLINK-10184
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.2, 1.6.0
>Reporter: Thomas Wozniakowski
>Priority: Blocker
> Fix For: 1.6.1, 1.7.0, 1.5.4
>
>
> We have encountered a blocking issue when upgrading our cluster to 1.5.2.
> It appears that, when jobs are cancelled manually (in our case with a 
> savepoint), the JobGraphs are NOT removed from the Zookeeper {{jobgraphs}} 
> node.
> This means that, if you start a job, cancel it, restart it, cancel it, etc. 
> You will end up with many job graphs stored in zookeeper, but none of the 
> corresponding blobs in the Flink HA directory.
> When a HA failover occurs, the newly elected leader retrieves all of those 
> old JobGraph objects from Zookeeper, then goes looking for the corresponding 
> blobs in the HA directory. The blobs are not there so the JobManager explodes 
> and the process dies.
> At this point the cluster has to be fully stopped, the zookeeper jobgraphs 
> cleared out by hand, and all the jobmanagers restarted.
> I can see the following line in the JobManager logs:
> {quote}
> 2018-08-20 16:17:20,776 INFO  
> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - 
> Removed job graph 4e9a5a9d70ca99dbd394c35f8dfeda65 from ZooKeeper.
> {quote}
> But looking in Zookeeper the {{4e9a5a9d70ca99dbd394c35f8dfeda65}} job is 
> still very much there.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10184) HA Failover broken due to JobGraphs not being removed from Zookeeper on cancel

2018-09-13 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613513#comment-16613513
 ] 

Till Rohrmann commented on FLINK-10184:
---

[~Jamalarm] yes, you would need to checkout my branch and then build Flink 
yourself. The binaries are then located in 
flink-dist/target/flink1.7-SNAPSHOT-bin/flink-1.7-SNAPSHOT. I don't think that 
you need to rebuild your job because the branch should not contain any API 
changes. I'm about to merge the PR so that there should be a snapshot build 
with the fix very soon (hopefully by tomorrow morning).

> HA Failover broken due to JobGraphs not being removed from Zookeeper on cancel
> --
>
> Key: FLINK-10184
> URL: https://issues.apache.org/jira/browse/FLINK-10184
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.2, 1.6.0
>Reporter: Thomas Wozniakowski
>Priority: Blocker
> Fix For: 1.6.1, 1.7.0, 1.5.4
>
>
> We have encountered a blocking issue when upgrading our cluster to 1.5.2.
> It appears that, when jobs are cancelled manually (in our case with a 
> savepoint), the JobGraphs are NOT removed from the Zookeeper {{jobgraphs}} 
> node.
> This means that, if you start a job, cancel it, restart it, cancel it, etc. 
> You will end up with many job graphs stored in zookeeper, but none of the 
> corresponding blobs in the Flink HA directory.
> When a HA failover occurs, the newly elected leader retrieves all of those 
> old JobGraph objects from Zookeeper, then goes looking for the corresponding 
> blobs in the HA directory. The blobs are not there so the JobManager explodes 
> and the process dies.
> At this point the cluster has to be fully stopped, the zookeeper jobgraphs 
> cleared out by hand, and all the jobmanagers restarted.
> I can see the following line in the JobManager logs:
> {quote}
> 2018-08-20 16:17:20,776 INFO  
> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - 
> Removed job graph 4e9a5a9d70ca99dbd394c35f8dfeda65 from ZooKeeper.
> {quote}
> But looking in Zookeeper the {{4e9a5a9d70ca99dbd394c35f8dfeda65}} job is 
> still very much there.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] fhueske commented on a change in pull request #6508: [Flink-10079] [table] Automatically register sink table from external catalogs

2018-09-13 Thread GitBox
fhueske commented on a change in pull request #6508: [Flink-10079] [table] 
Automatically register sink table from external catalogs 
URL: https://github.com/apache/flink/pull/6508#discussion_r217384955
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ##
 @@ -750,7 +751,28 @@ abstract class TableEnvironment(val config: TableConfig) {
  if (null == sinkTableName) throw TableException("Name of TableSink must not be null.")
  if (sinkTableName.isEmpty) throw TableException("Name of TableSink must not be empty.")
  if (!isRegistered(sinkTableName)) {
-  throw TableException(s"No table was registered under the name $sinkTableName.")
+  // try resolving and registering sink table from registered external catalogs
+  try {
+val paths = sinkTableName.split("\\.")
+if (paths.length > 1) {
+  var externalCatalog = getRegisteredExternalCatalog(paths(0))
+  for (i <- 1 to (paths.length - 2)) {
+externalCatalog = externalCatalog.getSubCatalog(paths(i))
+  }
+  val externalTable = externalCatalog.getTable(paths(paths.length - 1))
+  if (externalTable.isTableSink) {
+registerTableSink(sinkTableName,
 
 Review comment:
   We should not register the sink table again in the root catalog.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10341) Add option to print flink command when running bin/flink

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613504#comment-16613504
 ] 

ASF GitHub Bot commented on FLINK-10341:


TisonKun commented on issue #6690: [FLINK-10341][shell script] Add option to 
print flink command when running bin/flink
URL: https://github.com/apache/flink/pull/6690#issuecomment-421008894
 
 
   +1 for the proposal. It is a net win, and many command-line 
interfaces support this feature. One remaining question is how we display 
such a message; I don't think `FLINK_COMMAND:...` is a beautiful format, and the 
env var `PRINT_FLINK_COMMAND` is not very expressive either.
   
   cc @zentol 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add option to print flink command when running bin/flink
> 
>
> Key: FLINK-10341
> URL: https://issues.apache.org/jira/browse/FLINK-10341
> Project: Flink
>  Issue Type: Improvement
>  Components: Startup Shell Scripts
>Reporter: Jeff Zhang
>Assignee: Jeff Zhang
>Priority: Major
>  Labels: pull-request-available
>
> It would be very useful if I can see the final java command which is used to 
> run flink program, specially when I hit very weird issue as flink 
> contributor. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

