[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614390#comment-16614390 ] ASF GitHub Bot commented on FLINK-9913: --- zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r217605192 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ## @@ -89,77 +88,95 @@ public RecordWriter(ResultPartitionWriter writer, ChannelSelector channelSele this.numChannels = writer.getNumberOfSubpartitions(); - /* -* The runtime exposes a channel abstraction for the produced results -* (see {@link ChannelSelector}). Every channel has an independent -* serializer. -*/ - this.serializers = new SpanningRecordSerializer[numChannels]; + this.serializer = new SpanningRecordSerializer(); this.bufferBuilders = new Optional[numChannels]; + this.broadcastChannels = new int[numChannels]; for (int i = 0; i < numChannels; i++) { - serializers[i] = new SpanningRecordSerializer(); + broadcastChannels[i] = i; bufferBuilders[i] = Optional.empty(); } } public void emit(T record) throws IOException, InterruptedException { - for (int targetChannel : channelSelector.selectChannels(record, numChannels)) { - sendToTarget(record, targetChannel); - } + emit(record, channelSelector.selectChannels(record, numChannels)); } /** * This is used to broadcast Streaming Watermarks in-band with records. This ignores * the {@link ChannelSelector}. */ public void broadcastEmit(T record) throws IOException, InterruptedException { - for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) { - sendToTarget(record, targetChannel); - } + emit(record, broadcastChannels); } /** * This is used to send LatencyMarks to a random target channel. 
*/ public void randomEmit(T record) throws IOException, InterruptedException { - sendToTarget(record, rng.nextInt(numChannels)); + serializer.serializeRecord(record); + + if (copyFromSerializerToTargetChannel(rng.nextInt(numChannels))) { + serializer.prune(); + } } - private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException { - RecordSerializer serializer = serializers[targetChannel]; + private void emit(T record, int[] targetChannels) throws IOException, InterruptedException { + serializer.serializeRecord(record); + + boolean pruneAfterCopying = false; + for (int channel : targetChannels) { + if (copyFromSerializerToTargetChannel(channel)) { + pruneAfterCopying = true; + } + } - SerializationResult result = serializer.addRecord(record); + // Make sure we don't hold onto the large intermediate serialization buffer for too long + if (pruneAfterCopying) { + serializer.prune(); + } + } + /** +* @param targetChannel +* @return true if the intermediate serialization buffer should be pruned +*/ + private boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException { + // We should reset the initial position of the intermediate serialization buffer before + // copying, so the serialization results can be copied to multiple target buffers. + serializer.reset(); + + boolean pruneTriggered = false; + BufferBuilder bufferBuilder = getBufferBuilder(targetChannel); + SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder); while (result.isFullBuffer()) { - if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) { - // If this was a full record, we are done. Not breaking - // out of the loop at this point will lead to another - // buffer request before breaking out (that would not be - // a problem per se, but it can lead to stalls in the - // pipeline). -
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614389#comment-16614389 ] ASF GitHub Bot commented on FLINK-9913: --- zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r217605083 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java ## @@ -377,6 +388,86 @@ public void testBroadcastEmitBufferIndependence() throws Exception { assertEquals("Buffer 2 shares the same reader index as buffer 1", 0, buffer2.getReaderIndex()); } + /** +* Tests that records are broadcast via {@link ChannelSelector} and +* {@link RecordWriter#emit(IOReadableWritable)}. +*/ + @Test + public void testEmitRecordWithBroadcastPartitioner() throws Exception { + emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false); + } + + /** +* Tests that records are broadcast via {@link RecordWriter#broadcastEmit(IOReadableWritable)}. +*/ + @Test + public void testBroadcastEmitRecord() throws Exception { + emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true); + } + + /** +* The results of emitting records via BroadcastPartitioner or broadcasting records directly are the same, +* that is all the target channels can receive the whole outputs. 
+* +* @param isBroadcastEmit whether using {@link RecordWriter#broadcastEmit(IOReadableWritable)} or not +*/ + private void emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean isBroadcastEmit) throws Exception { + final int numChannels = 4; + final int bufferSize = 32; + final int numValues = 8; + final int serializationLength = 4; + + @SuppressWarnings("unchecked") + final Queue[] queues = new Queue[numChannels]; + for (int i = 0; i < numChannels; i++) { + queues[i] = new ArrayDeque<>(); + } + + final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize); + final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider); + final RecordWriter writer = new RecordWriter<>(partitionWriter, new Broadcast<>()); Review comment: Yes, that makes sense. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Improve output serialization only once in RecordWriter > -- > > Key: FLINK-9913 > URL: https://issues.apache.org/jira/browse/FLINK-9913 > Project: Flink > Issue Type: Improvement > Components: Network >Affects Versions: 1.6.0 >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Currently the {{RecordWriter}} emits output into multi channels via > {{ChannelSelector}} or broadcasts output to all channels directly. Each > channel has a separate {{RecordSerializer}} for serializing outputs, that > means the output will be serialized as many times as the number of selected > channels. > As we know, data serialization is a high cost operation, so we can get good > benefits by improving the serialization only once. > I would suggest the following changes for realizing it. 
> # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the > channels. > # The output is serialized into the intermediate data buffer only once for > different channels. > # The intermediate serialization results are copied into different > {{BufferBuilder}}s for different channels. > An additional benefit by using a single serializer for all channels is that > we get a potentially significant reduction on heap space overhead from fewer > intermediate serialization buffers (only once we got over 5MiB, these buffers > were pruned back to 128B!). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r217605192 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ## @@ -89,77 +88,95 @@ public RecordWriter(ResultPartitionWriter writer, ChannelSelector channelSele this.numChannels = writer.getNumberOfSubpartitions(); - /* -* The runtime exposes a channel abstraction for the produced results -* (see {@link ChannelSelector}). Every channel has an independent -* serializer. -*/ - this.serializers = new SpanningRecordSerializer[numChannels]; + this.serializer = new SpanningRecordSerializer(); this.bufferBuilders = new Optional[numChannels]; + this.broadcastChannels = new int[numChannels]; for (int i = 0; i < numChannels; i++) { - serializers[i] = new SpanningRecordSerializer(); + broadcastChannels[i] = i; bufferBuilders[i] = Optional.empty(); } } public void emit(T record) throws IOException, InterruptedException { - for (int targetChannel : channelSelector.selectChannels(record, numChannels)) { - sendToTarget(record, targetChannel); - } + emit(record, channelSelector.selectChannels(record, numChannels)); } /** * This is used to broadcast Streaming Watermarks in-band with records. This ignores * the {@link ChannelSelector}. */ public void broadcastEmit(T record) throws IOException, InterruptedException { - for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) { - sendToTarget(record, targetChannel); - } + emit(record, broadcastChannels); } /** * This is used to send LatencyMarks to a random target channel. 
*/ public void randomEmit(T record) throws IOException, InterruptedException { - sendToTarget(record, rng.nextInt(numChannels)); + serializer.serializeRecord(record); + + if (copyFromSerializerToTargetChannel(rng.nextInt(numChannels))) { + serializer.prune(); + } } - private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException { - RecordSerializer serializer = serializers[targetChannel]; + private void emit(T record, int[] targetChannels) throws IOException, InterruptedException { + serializer.serializeRecord(record); + + boolean pruneAfterCopying = false; + for (int channel : targetChannels) { + if (copyFromSerializerToTargetChannel(channel)) { + pruneAfterCopying = true; + } + } - SerializationResult result = serializer.addRecord(record); + // Make sure we don't hold onto the large intermediate serialization buffer for too long + if (pruneAfterCopying) { + serializer.prune(); + } + } + /** +* @param targetChannel +* @return true if the intermediate serialization buffer should be pruned +*/ + private boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException { + // We should reset the initial position of the intermediate serialization buffer before + // copying, so the serialization results can be copied to multiple target buffers. + serializer.reset(); + + boolean pruneTriggered = false; + BufferBuilder bufferBuilder = getBufferBuilder(targetChannel); + SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder); while (result.isFullBuffer()) { - if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) { - // If this was a full record, we are done. Not breaking - // out of the loop at this point will lead to another - // buffer request before breaking out (that would not be - // a problem per se, but it can lead to stalls in the - // pipeline). - if (result.isFullRecord()) { - break; - } + tryFinishCurrentBufferBuilder(targetChannel); + + //
[GitHub] zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r217605083 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java ## @@ -377,6 +388,86 @@ public void testBroadcastEmitBufferIndependence() throws Exception { assertEquals("Buffer 2 shares the same reader index as buffer 1", 0, buffer2.getReaderIndex()); } + /** +* Tests that records are broadcast via {@link ChannelSelector} and +* {@link RecordWriter#emit(IOReadableWritable)}. +*/ + @Test + public void testEmitRecordWithBroadcastPartitioner() throws Exception { + emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false); + } + + /** +* Tests that records are broadcast via {@link RecordWriter#broadcastEmit(IOReadableWritable)}. +*/ + @Test + public void testBroadcastEmitRecord() throws Exception { + emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true); + } + + /** +* The results of emitting records via BroadcastPartitioner or broadcasting records directly are the same, +* that is all the target channels can receive the whole outputs. 
+* +* @param isBroadcastEmit whether using {@link RecordWriter#broadcastEmit(IOReadableWritable)} or not +*/ + private void emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean isBroadcastEmit) throws Exception { + final int numChannels = 4; + final int bufferSize = 32; + final int numValues = 8; + final int serializationLength = 4; + + @SuppressWarnings("unchecked") + final Queue[] queues = new Queue[numChannels]; + for (int i = 0; i < numChannels; i++) { + queues[i] = new ArrayDeque<>(); + } + + final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize); + final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider); + final RecordWriter writer = new RecordWriter<>(partitionWriter, new Broadcast<>()); Review comment: Yes, that makes sense. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10310) Cassandra Sink - Handling failing requests
[ https://issues.apache.org/jira/browse/FLINK-10310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614371#comment-16614371 ] Jayant Ameta commented on FLINK-10310: -- Hi [~till.rohrmann] {{CassandraSinkBase}} would have a field similar to {{ActionRequestFailureHandler}}. In the {{checkAsyncErrors}} method, the failureHandler would be called instead of throwing the {{IOException}} Current code snippet {code:java} private void checkAsyncErrors() throws Exception { Throwable error = exception; if (error != null) { // prevent throwing duplicated error exception = null; throw new IOException("Error while sending value.", error); } } {code} would change to: {code:java} private void checkAsyncErrors() throws Exception { Throwable error = exception; if (error != null) { failureHandler.onFailure(error); } } {code} Here the {{failureHandler}} can decide what steps to take based on the {{Throwable}}. > Cassandra Sink - Handling failing requests > -- > > Key: FLINK-10310 > URL: https://issues.apache.org/jira/browse/FLINK-10310 > Project: Flink > Issue Type: Improvement > Components: Cassandra Connector >Reporter: Jayant Ameta >Priority: Major > > The cassandra sink fails for any kind of error. For some exceptions (e.g > WriteTimeoutException), ignoring the exception may be acceptable as well. > Can we discuss having a FailureHandler on the lines of > ActionRequestFailureHandler? > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/WriteTimeoutException-in-Cassandra-sink-kill-the-job-tp22706p22707.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10240) Pluggable scheduling strategy for batch job
[ https://issues.apache.org/jira/browse/FLINK-10240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614335#comment-16614335 ] 陈梓立 commented on FLINK-10240: - Introducing a pluggable scheduling strategy is an excellent idea that could greatly expand the cases Flink is able to handle. I like this idea and can help. However, the document attached above is read-only. So I leave my comments as a link to a copy of it below. Most of them are layout improvements and minor rewording; the body of the document is no more than the original design. https://docs.google.com/document/d/15pUYc5_yrY2IwmnADCoNWZwOIYCOcroWmuuZHs-vdlU/edit?usp=sharing Note that this is an EDITABLE document and everyone interested in it can leave comments or edit it directly. As open source software, we trust our contributors, and the document can be frozen and left comment-only if the discussion reaches a consensus. > Pluggable scheduling strategy for batch job > --- > > Key: FLINK-10240 > URL: https://issues.apache.org/jira/browse/FLINK-10240 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination >Reporter: Zhu Zhu >Priority: Major > Labels: scheduling > > Currently batch jobs are scheduled with LAZY_FROM_SOURCES strategy: source > tasks are scheduled in the beginning, and other tasks are scheduled once > their input data are consumable. > However, input data being consumable does not always mean the task can work at > once. > > One example is the hash join operation, where the operator first consumes one > side (we call it the build side) to set up a table, then consumes the other side (we > call it the probe side) to do the real join work. If the probe side is started > early, it just gets stuck on back pressure as the join operator will not > consume data from it before the building stage is done, causing a waste of > resources. 
> If we have the probe side task started after the build stage is done, both > the build and probe side can have more computing resources as they are > staggered. > > That's why we think a flexible scheduling strategy is needed, allowing job > owners to customize the vertex schedule order and constraints. Better > resource utilization usually means better performance. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10319) Too many requestPartitionState would crash JM
[ https://issues.apache.org/jira/browse/FLINK-10319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614325#comment-16614325 ] ASF GitHub Bot commented on FLINK-10319: TisonKun commented on issue #6680: [FLINK-10319] [runtime] Too many requestPartitionState would crash JM URL: https://github.com/apache/flink/pull/6680#issuecomment-421222747 cc @tillrohrmann This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Too many requestPartitionState would crash JM > - > > Key: FLINK-10319 > URL: https://issues.apache.org/jira/browse/FLINK-10319 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: 陈梓立 >Assignee: 陈梓立 >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Do not requestPartitionState from JM on partition request failure, which may > generate too many RPC requests and block JM. > We gain little benefit from checking what state the producer is in, which on the other > hand crashes JM by too many RPC requests. A task could always > retriggerPartitionRequest from its InputGate; it would fail if the > producer has gone and succeed if the producer is alive. Anyway, there is no need to ask > JM for help. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] TisonKun commented on issue #6680: [FLINK-10319] [runtime] Too many requestPartitionState would crash JM
TisonKun commented on issue #6680: [FLINK-10319] [runtime] Too many requestPartitionState would crash JM URL: https://github.com/apache/flink/pull/6680#issuecomment-421222747 cc @tillrohrmann This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10240) Pluggable scheduling strategy for batch job
[ https://issues.apache.org/jira/browse/FLINK-10240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614296#comment-16614296 ] Zhu Zhu commented on FLINK-10240: - Here's the design for a pluggable scheduling mechanism, which I think can achieve customizable and flexible scheduling to fit for different operators and topologies. https://docs.google.com/document/d/1zAseuBnqNXg3pst3vLBTc8yGOUo485J2LVWBAdFCW9I/edit?usp=sharing > Pluggable scheduling strategy for batch job > --- > > Key: FLINK-10240 > URL: https://issues.apache.org/jira/browse/FLINK-10240 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination >Reporter: Zhu Zhu >Priority: Major > Labels: scheduling > > Currently batch jobs are scheduled with LAZY_FROM_SOURCES strategy: source > tasks are scheduled in the beginning, and other tasks are scheduled once > there input data are consumable. > However, input data consumable does not always mean the task can work at > once. > > One example is the hash join operation, where the operator first consumes one > side(we call it build side) to setup a table, then consumes the other side(we > call it probe side) to do the real join work. If the probe side is started > early, it just get stuck on back pressure as the join operator will not > consume data from it before the building stage is done, causing a waste of > resources. > If we have the probe side task started after the build stage is done, both > the build and probe side can have more computing resources as they are > staggered. > > That's why we think a flexible scheduling strategy is needed, allowing job > owners to customize the vertex schedule order and constraints. Better > resource utilization usually means better performance. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9991) Add regexp_replace supported in TableAPI and SQL
[ https://issues.apache.org/jira/browse/FLINK-9991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614294#comment-16614294 ] ASF GitHub Bot commented on FLINK-9991: --- yanghua commented on issue #6450: [FLINK-9991] [table] Add regexp_replace supported in TableAPI and SQL URL: https://github.com/apache/flink/pull/6450#issuecomment-421216764 @xccui I have refactored this PR. Can you review it again? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add regexp_replace supported in TableAPI and SQL > > > Key: FLINK-9991 > URL: https://issues.apache.org/jira/browse/FLINK-9991 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > regexp_replace is a very useful function to process String. > For example : > {code:java} > regexp_replace("foobar", "oo|ar", "") //returns 'fb.' > {code} > It is supported as a UDF in Hive; for more details please see [1]. > [1]: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] yanghua commented on issue #6450: [FLINK-9991] [table] Add regexp_replace supported in TableAPI and SQL
yanghua commented on issue #6450: [FLINK-9991] [table] Add regexp_replace supported in TableAPI and SQL URL: https://github.com/apache/flink/pull/6450#issuecomment-421216764 @xccui I have refactored this PR. Can you review it again? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-10240) Pluggable scheduling strategy for batch job
[ https://issues.apache.org/jira/browse/FLINK-10240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu updated FLINK-10240: Summary: Pluggable scheduling strategy for batch job (was: Flexible scheduling strategy is needed for batch job) > Pluggable scheduling strategy for batch job > --- > > Key: FLINK-10240 > URL: https://issues.apache.org/jira/browse/FLINK-10240 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination >Reporter: Zhu Zhu >Priority: Major > Labels: scheduling > > Currently batch jobs are scheduled with LAZY_FROM_SOURCES strategy: source > tasks are scheduled in the beginning, and other tasks are scheduled once > there input data are consumable. > However, input data consumable does not always mean the task can work at > once. > > One example is the hash join operation, where the operator first consumes one > side(we call it build side) to setup a table, then consumes the other side(we > call it probe side) to do the real join work. If the probe side is started > early, it just get stuck on back pressure as the join operator will not > consume data from it before the building stage is done, causing a waste of > resources. > If we have the probe side task started after the build stage is done, both > the build and probe side can have more computing resources as they are > staggered. > > That's why we think a flexible scheduling strategy is needed, allowing job > owners to customize the vertex schedule order and constraints. Better > resource utilization usually means better performance. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10332) Move data available notification in PipelinedSubpartition out of the synchronized block
[ https://issues.apache.org/jira/browse/FLINK-10332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614131#comment-16614131 ] ASF GitHub Bot commented on FLINK-10332: NicoK opened a new pull request #6693: [FLINK-10332][network] move data notification out of the synchronized block URL: https://github.com/apache/flink/pull/6693 ## What is the purpose of the change Currently, calls to `PipelinedSubpartition#notifyDataAvailable()` are unnecessarily executed inside a synchronized (buffers) block which may lead to lock contention which this PR fixes. Please note, that we build upon #6692. ## Brief change log - move data notification out of the synchronized block ## Verifying this change This change is already covered by existing tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **yes** (depending on output flusher interval, rather per buffer) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **no** - If yes, how is the feature documented? **not applicable** This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Move data available notification in PipelinedSubpartition out of the > synchronized block > --- > > Key: FLINK-10332 > URL: https://issues.apache.org/jira/browse/FLINK-10332 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0, 1.7.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > > Currently, calls to {{PipelinedSubpartition#notifyDataAvailable();}} are > unnecessarily executed inside a {{synchronized (buffers)}} block which may > lead to lock contention. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10332) Move data available notification in PipelinedSubpartition out of the synchronized block
[ https://issues.apache.org/jira/browse/FLINK-10332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10332: --- Labels: pull-request-available (was: ) > Move data available notification in PipelinedSubpartition out of the > synchronized block > --- > > Key: FLINK-10332 > URL: https://issues.apache.org/jira/browse/FLINK-10332 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0, 1.7.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > > Currently, calls to {{PipelinedSubpartition#notifyDataAvailable();}} are > unnecessarily executed inside a {{synchronized (buffers)}} block which may > lead to lock contention. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] NicoK opened a new pull request #6693: [FLINK-10332][network] move data notification out of the synchronized block
NicoK opened a new pull request #6693: [FLINK-10332][network] move data notification out of the synchronized block URL: https://github.com/apache/flink/pull/6693 ## What is the purpose of the change Currently, calls to `PipelinedSubpartition#notifyDataAvailable()` are unnecessarily executed inside a synchronized (buffers) block which may lead to lock contention which this PR fixes. Please note, that we build upon #6692. ## Brief change log - move data notification out of the synchronized block ## Verifying this change This change is already covered by existing tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **yes** (depending on output flusher interval, rather per buffer) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **no** - If yes, how is the feature documented? **not applicable** This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10222) Table scalar function expression parses error when function name equals the exists keyword suffix
[ https://issues.apache.org/jira/browse/FLINK-10222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614130#comment-16614130 ] ASF GitHub Bot commented on FLINK-10222: fhueske commented on a change in pull request #6622: [FLINK-10222] [table] Table scalar function expression parses error when function name equals the exists keyword suffix URL: https://github.com/apache/flink/pull/6622#discussion_r217554963 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala ## @@ -45,6 +45,13 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { ("""(?i)\Q""" + kw.key + """\E""").r } + // Convert the keyword into an case insensitive Parser + // It uses an exact matching mode. scenes to be used: + // a keyword as a suffix no required parameter and not as a prefix + def keyword2ParserForSuffixButNotAsPrefix(kw: Keyword): Parser[String] = { Review comment: Yes, I agree with @walterddr. This seems to be a better approach. A keyword should only match if it is not followed by another identifier (see [Java identifier specs](https://docs.oracle.com/javase/specs/jls/se7/html/jls-3.html#jls-3.8)) character. What do you think @yanghua? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Table scalar function expression parses error when function name equals the > exists keyword suffix > - > > Key: FLINK-10222 > URL: https://issues.apache.org/jira/browse/FLINK-10222 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Reporter: vinoyang >Priority: Major > Labels: pull-request-available > > Suffix extraction in ExpressionParser.scala does not actually support > extraction of keywords with the same prefix. 
For example: Adding suffix > parsing rules for {{a.fun}} and {{a.function}} simultaneously will causes > exceptions. > some discussion : > [https://github.com/apache/flink/pull/6432#issuecomment-416127815] > https://github.com/apache/flink/pull/6585#discussion_r212797015 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] fhueske commented on a change in pull request #6622: [FLINK-10222] [table] Table scalar function expression parses error when function name equals the exists keyword suffix
fhueske commented on a change in pull request #6622: [FLINK-10222] [table] Table scalar function expression parses error when function name equals the exists keyword suffix URL: https://github.com/apache/flink/pull/6622#discussion_r217554963 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala ## @@ -45,6 +45,13 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { ("""(?i)\Q""" + kw.key + """\E""").r } + // Convert the keyword into an case insensitive Parser + // It uses an exact matching mode. scenes to be used: + // a keyword as a suffix no required parameter and not as a prefix + def keyword2ParserForSuffixButNotAsPrefix(kw: Keyword): Parser[String] = { Review comment: Yes, I agree with @walterddr. This seems to be a better approach. A keyword should only match if it is not followed by another identifier (see [Java identifier specs](https://docs.oracle.com/javase/specs/jls/se7/html/jls-3.html#jls-3.8)) character. What do you think @yanghua? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10331) Fix unnecessary flush requests to the network stack
[ https://issues.apache.org/jira/browse/FLINK-10331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614128#comment-16614128 ] ASF GitHub Bot commented on FLINK-10331: NicoK opened a new pull request #6692: [FLINK-10331][network] reduce unnecesary flushing URL: https://github.com/apache/flink/pull/6692 ## What is the purpose of the change With the re-design of the record writer interaction with the result(sub)partitions, flush requests can currently pile up in these scenarios: - a previous flush request has not been completely handled yet and/or is still enqueued or - the network stack is still polling from this subpartition and doesn't need a new notification These lead to increased notifications in low latency settings (low output flusher intervals) which can be avoided. ## Brief change log - do not flush (again) in the scenarios mentioned above, relying on `flushRequested` and the `buffer` queue size - add intensive sanity checks to `SpillingAdaptiveSpanningRecordDeserializer` - several smaller improvement hotfixes (please see the individual commits) ## Verifying this change This change is already covered by existing tests plus a few new tests in `PipelinedSubpartitionTest`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **yes** (depending on output flusher interval, rather per buffer) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **no** - If yes, how is the feature documented? **JavaDocs** This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Fix unnecessary flush requests to the network stack > --- > > Key: FLINK-10331 > URL: https://issues.apache.org/jira/browse/FLINK-10331 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0, 1.7.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > > With the re-design of the record writer interaction with the > result(sub)partitions, flush requests can currently pile up in these > scenarios: > - a previous flush request has not been completely handled yet and/or is > still enqueued or > - the network stack is still polling from this subpartition and doesn't need > a new notification > These lead to increased notifications in low latency settings (low output > flusher intervals) which can be avoided. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10331) Fix unnecessary flush requests to the network stack
[ https://issues.apache.org/jira/browse/FLINK-10331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10331: --- Labels: pull-request-available (was: ) > Fix unnecessary flush requests to the network stack > --- > > Key: FLINK-10331 > URL: https://issues.apache.org/jira/browse/FLINK-10331 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0, 1.7.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > > With the re-design of the record writer interaction with the > result(sub)partitions, flush requests can currently pile up in these > scenarios: > - a previous flush request has not been completely handled yet and/or is > still enqueued or > - the network stack is still polling from this subpartition and doesn't need > a new notification > These lead to increased notifications in low latency settings (low output > flusher intervals) which can be avoided. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] NicoK opened a new pull request #6692: [FLINK-10331][network] reduce unnecesary flushing
NicoK opened a new pull request #6692: [FLINK-10331][network] reduce unnecesary flushing URL: https://github.com/apache/flink/pull/6692 ## What is the purpose of the change With the re-design of the record writer interaction with the result(sub)partitions, flush requests can currently pile up in these scenarios: - a previous flush request has not been completely handled yet and/or is still enqueued or - the network stack is still polling from this subpartition and doesn't need a new notification These lead to increased notifications in low latency settings (low output flusher intervals) which can be avoided. ## Brief change log - do not flush (again) in the scenarios mentioned above, relying on `flushRequested` and the `buffer` queue size - add intensive sanity checks to `SpillingAdaptiveSpanningRecordDeserializer` - several smaller improvement hotfixes (please see the individual commits) ## Verifying this change This change is already covered by existing tests plus a few new tests in `PipelinedSubpartitionTest`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **yes** (depending on output flusher interval, rather per buffer) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **no** - If yes, how is the feature documented? **JavaDocs** This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10290) Conversion error in StreamScan and BatchScan
[ https://issues.apache.org/jira/browse/FLINK-10290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614115#comment-16614115 ] ASF GitHub Bot commented on FLINK-10290: fhueske commented on a change in pull request #: [FLINK-10290] [table] Fix conversion error in StreamScan and BatchScan URL: https://github.com/apache/flink/pull/#discussion_r217547642 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala ## @@ -742,6 +742,42 @@ class TableSourceITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } + @Test + def testTableSourceScanWithConversion(): Unit = { Review comment: Rename to `testTableSourceScanWithPermutedFields`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Conversion error in StreamScan and BatchScan > > > Key: FLINK-10290 > URL: https://issues.apache.org/jira/browse/FLINK-10290 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.5.3, 1.6.0 >Reporter: wangsan >Assignee: wangsan >Priority: Major > Labels: pull-request-available > > `RowTypeInfo#equals()` only compares field types, and fields names are not > considered. When checking the equality of `inputType` and `internalType`, we > should compare both filed types and field names. > Behavior of this bug: > A table T with schema (a: Long, b:Long, c:Long) > SELECT b,c,a from T > expected: b,c,a > actually: a,b,c -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10290) Conversion error in StreamScan and BatchScan
[ https://issues.apache.org/jira/browse/FLINK-10290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614114#comment-16614114 ] ASF GitHub Bot commented on FLINK-10290: fhueske commented on a change in pull request #: [FLINK-10290] [table] Fix conversion error in StreamScan and BatchScan URL: https://github.com/apache/flink/pull/#discussion_r217544935 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala ## @@ -50,11 +51,12 @@ trait StreamScan extends CommonScan[CRow] with DataStreamRel { f == TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER || f == TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER) -if (input.getType == cRowType && !hasTimeIndicator) { +if (inputType == cRowType && !hasTimeIndicator) { Review comment: `CRowTypeInfo` doesn't check for field name equality either. Can you add a `schemaEquals()` method to `CRowTypeInfo` and call this here? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Conversion error in StreamScan and BatchScan > > > Key: FLINK-10290 > URL: https://issues.apache.org/jira/browse/FLINK-10290 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.5.3, 1.6.0 >Reporter: wangsan >Assignee: wangsan >Priority: Major > Labels: pull-request-available > > `RowTypeInfo#equals()` only compares field types, and fields names are not > considered. When checking the equality of `inputType` and `internalType`, we > should compare both filed types and field names. > Behavior of this bug: > A table T with schema (a: Long, b:Long, c:Long) > SELECT b,c,a from T > expected: b,c,a > actually: a,b,c -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10290) Conversion error in StreamScan and BatchScan
[ https://issues.apache.org/jira/browse/FLINK-10290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614116#comment-16614116 ] ASF GitHub Bot commented on FLINK-10290: fhueske commented on a change in pull request #: [FLINK-10290] [table] Fix conversion error in StreamScan and BatchScan URL: https://github.com/apache/flink/pull/#discussion_r217547586 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala ## @@ -778,4 +778,42 @@ class TableSourceITCase extends AbstractTestBase { assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + @Test + def testTableSourceScanWithConversion(): Unit = { Review comment: Rename to `testTableSourceScanWithPermutedFields()`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Conversion error in StreamScan and BatchScan > > > Key: FLINK-10290 > URL: https://issues.apache.org/jira/browse/FLINK-10290 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.5.3, 1.6.0 >Reporter: wangsan >Assignee: wangsan >Priority: Major > Labels: pull-request-available > > `RowTypeInfo#equals()` only compares field types, and fields names are not > considered. When checking the equality of `inputType` and `internalType`, we > should compare both filed types and field names. > Behavior of this bug: > A table T with schema (a: Long, b:Long, c:Long) > SELECT b,c,a from T > expected: b,c,a > actually: a,b,c -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10290) Conversion error in StreamScan and BatchScan
[ https://issues.apache.org/jira/browse/FLINK-10290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614113#comment-16614113 ] ASF GitHub Bot commented on FLINK-10290: fhueske commented on a change in pull request #: [FLINK-10290] [table] Fix conversion error in StreamScan and BatchScan URL: https://github.com/apache/flink/pull/#discussion_r217545093 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala ## @@ -50,11 +51,12 @@ trait StreamScan extends CommonScan[CRow] with DataStreamRel { f == TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER || f == TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER) -if (input.getType == cRowType && !hasTimeIndicator) { +if (inputType == cRowType && !hasTimeIndicator) { Review comment: Would also be good to have a test for this. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Conversion error in StreamScan and BatchScan > > > Key: FLINK-10290 > URL: https://issues.apache.org/jira/browse/FLINK-10290 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.5.3, 1.6.0 >Reporter: wangsan >Assignee: wangsan >Priority: Major > Labels: pull-request-available > > `RowTypeInfo#equals()` only compares field types, and fields names are not > considered. When checking the equality of `inputType` and `internalType`, we > should compare both filed types and field names. > Behavior of this bug: > A table T with schema (a: Long, b:Long, c:Long) > SELECT b,c,a from T > expected: b,c,a > actually: a,b,c -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] fhueske commented on a change in pull request #6666: [FLINK-10290] [table] Fix conversion error in StreamScan and BatchScan
fhueske commented on a change in pull request #6666: [FLINK-10290] [table] Fix conversion error in StreamScan and BatchScan URL: https://github.com/apache/flink/pull/6666#discussion_r217544935 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala ## @@ -50,11 +51,12 @@ trait StreamScan extends CommonScan[CRow] with DataStreamRel { f == TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER || f == TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER) -if (input.getType == cRowType && !hasTimeIndicator) { +if (inputType == cRowType && !hasTimeIndicator) { Review comment: `CRowTypeInfo` doesn't check for field name equality either. Can you add a `schemaEquals()` method to `CRowTypeInfo` and call this here? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] fhueske commented on a change in pull request #6666: [FLINK-10290] [table] Fix conversion error in StreamScan and BatchScan
fhueske commented on a change in pull request #6666: [FLINK-10290] [table] Fix conversion error in StreamScan and BatchScan URL: https://github.com/apache/flink/pull/6666#discussion_r217547642 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala ## @@ -742,6 +742,42 @@ class TableSourceITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } + @Test + def testTableSourceScanWithConversion(): Unit = { Review comment: Rename to `testTableSourceScanWithPermutedFields`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] fhueske commented on a change in pull request #6666: [FLINK-10290] [table] Fix conversion error in StreamScan and BatchScan
fhueske commented on a change in pull request #6666: [FLINK-10290] [table] Fix conversion error in StreamScan and BatchScan URL: https://github.com/apache/flink/pull/6666#discussion_r217547586 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala ## @@ -778,4 +778,42 @@ class TableSourceITCase extends AbstractTestBase { assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + @Test + def testTableSourceScanWithConversion(): Unit = { Review comment: Rename to `testTableSourceScanWithPermutedFields()`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] fhueske commented on a change in pull request #6666: [FLINK-10290] [table] Fix conversion error in StreamScan and BatchScan
fhueske commented on a change in pull request #6666: [FLINK-10290] [table] Fix conversion error in StreamScan and BatchScan URL: https://github.com/apache/flink/pull/6666#discussion_r217545093 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala ## @@ -50,11 +51,12 @@ trait StreamScan extends CommonScan[CRow] with DataStreamRel { f == TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER || f == TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER) -if (input.getType == cRowType && !hasTimeIndicator) { +if (inputType == cRowType && !hasTimeIndicator) { Review comment: Would also be good to have a test for this. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-8688) Enable distinct aggregation for data stream on Table/SQL API
[ https://issues.apache.org/jira/browse/FLINK-8688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-8688. Resolution: Implemented Fix Version/s: 1.7.0 All sub-issues have been implemented > Enable distinct aggregation for data stream on Table/SQL API > > > Key: FLINK-8688 > URL: https://issues.apache.org/jira/browse/FLINK-8688 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Distinct aggregation is not currently supported on data stream with Table/SQL > API. This is an umbrella task for enabling distinct aggregation in various > use cases. > Discussion doc can be found here: > https://docs.google.com/document/d/1zj6OA-K2hi7ah8Fo-xTQB-mVmYfm6LsN2_NHgTCVmJI/edit?usp=sharing > > Distinct aggregation is a very important feature in SQL processing and there > are many JIRAs currently open with various use cases. The goal is to create > one solution to both unbounded and bounded distinct aggregation on data > stream so that it can easily be extended to support these use cases. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9232) Add harness test for AggregationCodeGenerator
[ https://issues.apache.org/jira/browse/FLINK-9232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-9232: - Issue Type: Improvement (was: Sub-task) Parent: (was: FLINK-8688) > Add harness test for AggregationCodeGenerator > -- > > Key: FLINK-9232 > URL: https://issues.apache.org/jira/browse/FLINK-9232 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > Instead of relying on ITCase to cover the codegen result. We should have > direct test against that, for example using Harness test framework. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8739) Optimize runtime support for distinct filter
[ https://issues.apache.org/jira/browse/FLINK-8739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-8739: - Issue Type: Improvement (was: Sub-task) Parent: (was: FLINK-8688) > Optimize runtime support for distinct filter > > > Key: FLINK-8739 > URL: https://issues.apache.org/jira/browse/FLINK-8739 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: Rong Rong >Priority: Major > > Possible optimizaitons: > 1. Decouple distinct map and actual accumulator so that they can separately > be created in codegen. > 2. Reuse same distinct accumulator for filtering, e.g. `SELECT > COUNT(DISTINCT(a)), SUM(DISTINCT(a))` should reuse the same distinct map. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-5315) Support distinct aggregations in table api
[ https://issues.apache.org/jira/browse/FLINK-5315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614083#comment-16614083 ] ASF GitHub Bot commented on FLINK-5315: --- asfgit closed pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream URL: https://github.com/apache/flink/pull/6521 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md index f8bcd3da1af..a9b92fad995 100644 --- a/docs/dev/table/tableApi.md +++ b/docs/dev/table/tableApi.md @@ -370,6 +370,44 @@ Table result = orders Note: All aggregates must be defined over the same window, i.e., same partitioning, sorting, and range. Currently, only windows with PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges with FOLLOWING are not supported yet. ORDER BY must be specified on a single time attribute. + + +Distinct Aggregation +Batch Streaming +Result Updating + + +Similar to a SQL DISTINCT aggregation clause such as COUNT(DISTINCT a). Distinct aggregation declares that an aggregation function (built-in or user-defined) is only applied on distinct input values. Distinct can be applied to GroupBy Aggregation, GroupBy Window Aggregation and Over Window Aggregation. 
+{% highlight java %} +Table orders = tableEnv.scan("Orders"); +// Distinct aggregation on group by +Table groupByDistinctResult = orders +.groupBy("a") +.select("a, b.sum.distinct as d"); +// Distinct aggregation on time window group by +Table groupByWindowDistinctResult = orders +.window(Tumble.over("5.minutes").on("rowtime").as("w")).groupBy("a, w") +.select("a, b.sum.distinct as d"); +// Distinct aggregation on over window +Table result = orders +.window(Over +.partitionBy("a") +.orderBy("rowtime") +.preceding("UNBOUNDED_RANGE") +.as("w")) +.select("a, b.avg.distinct over w, b.max over w, b.min over w"); +{% endhighlight %} +User-defined aggregation function can also be used with DISTINCT modifiers. To calculate the aggregate results only for distinct values, simply add the distinct modifier towards the aggregation function. +{% highlight java %} +Table orders = tEnv.scan("Orders"); + +// Use distinct aggregation for user-defined aggregate functions +tEnv.registerFunction("myUdagg", new MyUdagg()); +orders.groupBy("users").select("users, myUdagg.distinct(points) as myDistinctResult"); +{% endhighlight %} +Note: For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with valid retention interval to prevent excessive state size. See Streaming Concepts for details. + + Distinct @@ -453,6 +491,44 @@ val result: Table = orders Note: All aggregates must be defined over the same window, i.e., same partitioning, sorting, and range. Currently, only windows with PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges with FOLLOWING are not supported yet. ORDER BY must be specified on a single time attribute. + + +Distinct Aggregation +Batch Streaming +Result Updating + + +Similar to a SQL DISTINCT AGGREGATION clause such as COUNT(DISTINCT a). 
Distinct aggregation declares that an aggregation function (built-in or user-defined) is only applied on distinct input values. Distinct can be applied to GroupBy Aggregation, GroupBy Window Aggregation and Over Window Aggregation. +{% highlight scala %} +val orders: Table = tableEnv.scan("Orders"); +// Distinct aggregation on group by +val groupByDistinctResult = orders +.groupBy('a) +.select('a, 'b.sum.distinct as 'd) +// Distinct aggregation on time window group by +val groupByWindowDistinctResult = orders +.window(Tumble over 5.minutes on 'rowtime as 'w).groupBy('a, 'w) +.select('a, 'b.sum.distinct as 'd) +// Distinct aggregation on over window +val result = orders +.window(Over +partitionBy 'a +orderBy 'rowtime +preceding UNBOUNDED_RANGE +as 'w) +.select('a, 'b.avg.distinct over 'w, 'b.max over 'w, 'b.min over 'w) +{% endhighlight %} +User-defined aggregation function can also be used with DISTINCT modifiers. To calculate the aggregate results only for distinct values, simply add the distinct modifier towards the aggregation function. +{% highlight scala %} +val orders: Table = tEnv.scan("Orders"); + +// Use distinct aggregation for user-defined
[jira] [Closed] (FLINK-5315) Support distinct aggregations in table api
[ https://issues.apache.org/jira/browse/FLINK-5315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-5315. Resolution: Implemented Fix Version/s: 1.7.0 Implemented for 1.7.0 with 04c7cdf7d3d7b0862f7a4a3e7d821b1fb62ad3f0 > Support distinct aggregations in table api > -- > > Key: FLINK-5315 > URL: https://issues.apache.org/jira/browse/FLINK-5315 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Kurt Young >Assignee: Rong Rong >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Support distinct aggregations in Table API in the following format: > For Expressions: > {code:scala} > 'a.count.distinct // Expressions distinct modifier > {code} > For User-defined Function: > {code:scala} > singleArgUdaggFunc.distinct('a) // FunctionCall distinct modifier > multiArgUdaggFunc.distinct('a, 'b) // FunctionCall distinct modifier > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] asfgit closed pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream
asfgit closed pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream URL: https://github.com/apache/flink/pull/6521 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md index f8bcd3da1af..a9b92fad995 100644 --- a/docs/dev/table/tableApi.md +++ b/docs/dev/table/tableApi.md @@ -370,6 +370,44 @@ Table result = orders Note: All aggregates must be defined over the same window, i.e., same partitioning, sorting, and range. Currently, only windows with PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges with FOLLOWING are not supported yet. ORDER BY must be specified on a single time attribute. + + +Distinct Aggregation +Batch Streaming +Result Updating + + +Similar to a SQL DISTINCT aggregation clause such as COUNT(DISTINCT a). Distinct aggregation declares that an aggregation function (built-in or user-defined) is only applied on distinct input values. Distinct can be applied to GroupBy Aggregation, GroupBy Window Aggregation and Over Window Aggregation. 
+{% highlight java %} +Table orders = tableEnv.scan("Orders"); +// Distinct aggregation on group by +Table groupByDistinctResult = orders +.groupBy("a") +.select("a, b.sum.distinct as d"); +// Distinct aggregation on time window group by +Table groupByWindowDistinctResult = orders +.window(Tumble.over("5.minutes").on("rowtime").as("w")).groupBy("a, w") +.select("a, b.sum.distinct as d"); +// Distinct aggregation on over window +Table result = orders +.window(Over +.partitionBy("a") +.orderBy("rowtime") +.preceding("UNBOUNDED_RANGE") +.as("w")) +.select("a, b.avg.distinct over w, b.max over w, b.min over w"); +{% endhighlight %} +User-defined aggregation function can also be used with DISTINCT modifiers. To calculate the aggregate results only for distinct values, simply add the distinct modifier towards the aggregation function. +{% highlight java %} +Table orders = tEnv.scan("Orders"); + +// Use distinct aggregation for user-defined aggregate functions +tEnv.registerFunction("myUdagg", new MyUdagg()); +orders.groupBy("users").select("users, myUdagg.distinct(points) as myDistinctResult"); +{% endhighlight %} +Note: For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with valid retention interval to prevent excessive state size. See Streaming Concepts for details. + + Distinct @@ -453,6 +491,44 @@ val result: Table = orders Note: All aggregates must be defined over the same window, i.e., same partitioning, sorting, and range. Currently, only windows with PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges with FOLLOWING are not supported yet. ORDER BY must be specified on a single time attribute. + + +Distinct Aggregation +Batch Streaming +Result Updating + + +Similar to a SQL DISTINCT AGGREGATION clause such as COUNT(DISTINCT a). 
Distinct aggregation declares that an aggregation function (built-in or user-defined) is only applied on distinct input values. Distinct can be applied to GroupBy Aggregation, GroupBy Window Aggregation and Over Window Aggregation. +{% highlight scala %} +val orders: Table = tableEnv.scan("Orders"); +// Distinct aggregation on group by +val groupByDistinctResult = orders +.groupBy('a) +.select('a, 'b.sum.distinct as 'd) +// Distinct aggregation on time window group by +val groupByWindowDistinctResult = orders +.window(Tumble over 5.minutes on 'rowtime as 'w).groupBy('a, 'w) +.select('a, 'b.sum.distinct as 'd) +// Distinct aggregation on over window +val result = orders +.window(Over +partitionBy 'a +orderBy 'rowtime +preceding UNBOUNDED_RANGE +as 'w) +.select('a, 'b.avg.distinct over 'w, 'b.max over 'w, 'b.min over 'w) +{% endhighlight %} +User-defined aggregation function can also be used with DISTINCT modifiers. To calculate the aggregate results only for distinct values, simply add the distinct modifier towards the aggregation function. +{% highlight scala %} +val orders: Table = tEnv.scan("Orders"); + +// Use distinct aggregation for user-defined aggregate functions +val myUdagg = new MyUdagg(); +orders.groupBy('users).select('users, myUdagg.distinct('points) as 'myDistinctResult); +{% endhighlight %} +Note: For streaming queries the required state to compute the query result
[jira] [Closed] (FLINK-10223) TaskManagers should log their ResourceID during startup
[ https://issues.apache.org/jira/browse/FLINK-10223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao closed FLINK-10223. > TaskManagers should log their ResourceID during startup > --- > > Key: FLINK-10223 > URL: https://issues.apache.org/jira/browse/FLINK-10223 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.3, 1.6.1, 1.7.0 >Reporter: Konstantin Knauf >Assignee: Gary Yao >Priority: Major > Labels: pull-request-available > Fix For: 1.6.1, 1.7.0, 1.5.4 > > > To debug exceptions like "org.apache.flink.util.FlinkException: The assigned > slot was removed." in the master container it is often helpful to > know, which slot was provided by which Taskmanager. The only way to relate > slots to TaskManagers right now, seems to be to enable DEBUG logging for > `org.apache.flink.runtime.jobmaster.slotpool.SlotPool`. > This would be solved, if each Taskmanager would log out their `ResouceID` > during startup as the `SlotID` mainly consists of the `ResourceID` of the > providing Taskmanager. For Mesos and YARN the `ResourceID` has an intrinsic > meaning, but for a stand-alone or containerized setup the `ResourceID` is > just the a random ID. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-10223) TaskManagers should log their ResourceID during startup
[ https://issues.apache.org/jira/browse/FLINK-10223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao resolved FLINK-10223. -- Resolution: Fixed Fixed via 1.5: 9cbf99e7a56c50ba4e7bd5ee6444f6a08b4e5796 1.6: f5c1d99a7fc6d9243e12329bce14a300bff767a2 1.7: a7a06b05bb714cd212f3c672a5b886a5ee82a705 > TaskManagers should log their ResourceID during startup > --- > > Key: FLINK-10223 > URL: https://issues.apache.org/jira/browse/FLINK-10223 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.3, 1.6.1, 1.7.0 >Reporter: Konstantin Knauf >Assignee: Gary Yao >Priority: Major > Labels: pull-request-available > Fix For: 1.6.1, 1.7.0, 1.5.4 > > > To debug exceptions like "org.apache.flink.util.FlinkException: The assigned > slot was removed." in the master container it is often helpful to > know, which slot was provided by which Taskmanager. The only way to relate > slots to TaskManagers right now, seems to be to enable DEBUG logging for > `org.apache.flink.runtime.jobmaster.slotpool.SlotPool`. > This would be solved, if each Taskmanager would log out their `ResouceID` > during startup as the `SlotID` mainly consists of the `ResourceID` of the > providing Taskmanager. For Mesos and YARN the `ResourceID` has an intrinsic > meaning, but for a stand-alone or containerized setup the `ResourceID` is > just the a random ID. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10223) TaskManagers should log their ResourceID during startup
[ https://issues.apache.org/jira/browse/FLINK-10223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613974#comment-16613974 ] ASF GitHub Bot commented on FLINK-10223: asfgit closed pull request #6679: [FLINK-10223][LOG]Logging with resourceId during taskmanager startup URL: https://github.com/apache/flink/pull/6679 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index eb6df19a4ed..1ce29af00a0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -710,6 +710,7 @@ private RegistrationResponse registerTaskExecutorInternal( WorkerRegistration registration = new WorkerRegistration<>(taskExecutorGateway, newWorker, dataPort, hardwareDescription); + log.info("Registering TaskManager {} ({}) at ResourceManager", taskExecutorResourceId, taskExecutorAddress); taskExecutors.put(taskExecutorResourceId, registration); taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index afe22dea1a4..5c1f420dd54 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -349,6 +349,8 @@ public static TaskExecutor startTaskManager( 
checkNotNull(rpcService); checkNotNull(highAvailabilityServices); + LOG.info("Starting TaskManager with ResourceID: {}", resourceID); + InetAddress remoteAddress = InetAddress.getByName(rpcService.getAddress()); TaskManagerServicesConfiguration taskManagerServicesConfiguration = diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 0988730689a..b58a67c1c07 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -350,7 +350,7 @@ class JobManager( hardwareInformation, numberOfSlots) => // we are being informed by the ResourceManager that a new task manager is available - log.debug(s"RegisterTaskManager: $msg") + log.info(s"RegisterTaskManager: $msg") val taskManager = sender() diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index c04084c55f4..2008ad87d56 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -1831,7 +1831,7 @@ object TaskManager { taskManagerClass: Class[_ <: TaskManager]) : Unit = { -LOG.info("Starting TaskManager") +LOG.info(s"Starting TaskManager with ResourceID: $resourceID") // Bring up the TaskManager actor system first, bind it to the given address. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > TaskManagers should log their ResourceID during startup > --- > > Key: FLINK-10223 > URL: https://issues.apache.org/jira/browse/FLINK-10223 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.3, 1.6.1, 1.7.0 >Reporter: Konstantin Knauf >Assignee: Gary Yao >Priority: Major > Labels: pull-request-available > Fix For: 1.6.1, 1.7.0, 1.5.4 > > > To debug exceptions like "org.apache.flink.util.FlinkException: The assigned > slot was removed." in the master
[GitHub] asfgit closed pull request #6679: [FLINK-10223][LOG]Logging with resourceId during taskmanager startup
asfgit closed pull request #6679: [FLINK-10223][LOG]Logging with resourceId during taskmanager startup URL: https://github.com/apache/flink/pull/6679 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index eb6df19a4ed..1ce29af00a0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -710,6 +710,7 @@ private RegistrationResponse registerTaskExecutorInternal( WorkerRegistration registration = new WorkerRegistration<>(taskExecutorGateway, newWorker, dataPort, hardwareDescription); + log.info("Registering TaskManager {} ({}) at ResourceManager", taskExecutorResourceId, taskExecutorAddress); taskExecutors.put(taskExecutorResourceId, registration); taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index afe22dea1a4..5c1f420dd54 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -349,6 +349,8 @@ public static TaskExecutor startTaskManager( checkNotNull(rpcService); checkNotNull(highAvailabilityServices); + LOG.info("Starting TaskManager with ResourceID: {}", resourceID); + InetAddress remoteAddress = InetAddress.getByName(rpcService.getAddress()); 
TaskManagerServicesConfiguration taskManagerServicesConfiguration = diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 0988730689a..b58a67c1c07 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -350,7 +350,7 @@ class JobManager( hardwareInformation, numberOfSlots) => // we are being informed by the ResourceManager that a new task manager is available - log.debug(s"RegisterTaskManager: $msg") + log.info(s"RegisterTaskManager: $msg") val taskManager = sender() diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index c04084c55f4..2008ad87d56 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -1831,7 +1831,7 @@ object TaskManager { taskManagerClass: Class[_ <: TaskManager]) : Unit = { -LOG.info("Starting TaskManager") +LOG.info(s"Starting TaskManager with ResourceID: $resourceID") // Bring up the TaskManager actor system first, bind it to the given address. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10329) Fail with exception if job cannot be removed by ZooKeeperSubmittedJobGraphStore#removeJobGraph
[ https://issues.apache.org/jira/browse/FLINK-10329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613739#comment-16613739 ] ASF GitHub Bot commented on FLINK-10329: tillrohrmann commented on issue #6686: [FLINK-10329] [FLINK-10328] Fail ZooKeeperSubmittedJobGraphStore#removeJobGraph if job cannot be removed & Release all locks when stopping the ZooKeeperSubmittedJobGraphStore URL: https://github.com/apache/flink/pull/6686#issuecomment-421070901 Thanks for the review @azagrebin. Merging this PR after I've merged #6678. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Fail with exception if job cannot be removed by > ZooKeeperSubmittedJobGraphStore#removeJobGraph > -- > > Key: FLINK-10329 > URL: https://issues.apache.org/jira/browse/FLINK-10329 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.6.1, 1.7.0, 1.5.4 > > > Callers of {{ZooKeeperSubmittedJobGraph#removeJobGraph}} expect that we fail > with an exception if the {{JobGraph}} cannot be removed. This is not the case > since we call internally {{ZooKeeperStateHandleStore#releaseAndTryRemove}}. > If this method returns {{false}}, then we need to fail with an exception. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10329) Fail with exception if job cannot be removed by ZooKeeperSubmittedJobGraphStore#removeJobGraph
[ https://issues.apache.org/jira/browse/FLINK-10329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613738#comment-16613738 ] ASF GitHub Bot commented on FLINK-10329: tillrohrmann commented on a change in pull request #6686: [FLINK-10329] [FLINK-10328] Fail ZooKeeperSubmittedJobGraphStore#removeJobGraph if job cannot be removed & Release all locks when stopping the ZooKeeperSubmittedJobGraphStore URL: https://github.com/apache/flink/pull/6686#discussion_r217453535 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ## @@ -768,27 +788,40 @@ private void jobMasterFailed(JobID jobId, Throwable cause) { */ @Override public void grantLeadership(final UUID newLeaderSessionID) { - log.info("Dispatcher {} was granted leadership with fencing token {}", getAddress(), newLeaderSessionID); + runAsyncWithoutFencing( + () -> { + log.info("Dispatcher {} was granted leadership with fencing token {}", getAddress(), newLeaderSessionID); - final CompletableFuture> recoveredJobsFuture = recoverJobs(); + final CompletableFuture> recoveredJobsFuture = recoveryOperation.thenComposeAsync( + ignored -> recoverJobs(), Review comment: Good point. Will fix it This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Fail with exception if job cannot be removed by > ZooKeeperSubmittedJobGraphStore#removeJobGraph > -- > > Key: FLINK-10329 > URL: https://issues.apache.org/jira/browse/FLINK-10329 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.6.1, 1.7.0, 1.5.4 > > > Callers of {{ZooKeeperSubmittedJobGraph#removeJobGraph}} expect that we fail > with an exception if the {{JobGraph}} cannot be removed. This is not the case > since we call internally {{ZooKeeperStateHandleStore#releaseAndTryRemove}}. > If this method returns {{false}}, then we need to fail with an exception. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] tillrohrmann commented on issue #6686: [FLINK-10329] [FLINK-10328] Fail ZooKeeperSubmittedJobGraphStore#removeJobGraph if job cannot be removed & Release all locks when stopping the ZooKeeper
tillrohrmann commented on issue #6686: [FLINK-10329] [FLINK-10328] Fail ZooKeeperSubmittedJobGraphStore#removeJobGraph if job cannot be removed & Release all locks when stopping the ZooKeeperSubmittedJobGraphStore URL: https://github.com/apache/flink/pull/6686#issuecomment-421070901 Thanks for the review @azagrebin. Merging this PR after I've merged #6678. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tillrohrmann commented on a change in pull request #6686: [FLINK-10329] [FLINK-10328] Fail ZooKeeperSubmittedJobGraphStore#removeJobGraph if job cannot be removed & Release all locks when sto
tillrohrmann commented on a change in pull request #6686: [FLINK-10329] [FLINK-10328] Fail ZooKeeperSubmittedJobGraphStore#removeJobGraph if job cannot be removed & Release all locks when stopping the ZooKeeperSubmittedJobGraphStore URL: https://github.com/apache/flink/pull/6686#discussion_r217453535 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ## @@ -768,27 +788,40 @@ private void jobMasterFailed(JobID jobId, Throwable cause) { */ @Override public void grantLeadership(final UUID newLeaderSessionID) { - log.info("Dispatcher {} was granted leadership with fencing token {}", getAddress(), newLeaderSessionID); + runAsyncWithoutFencing( + () -> { + log.info("Dispatcher {} was granted leadership with fencing token {}", getAddress(), newLeaderSessionID); - final CompletableFuture> recoveredJobsFuture = recoverJobs(); + final CompletableFuture> recoveredJobsFuture = recoveryOperation.thenComposeAsync( + ignored -> recoverJobs(), Review comment: Good point. Will fix it This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10303) Fix critical vulnerabilities Python API
[ https://issues.apache.org/jira/browse/FLINK-10303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613713#comment-16613713 ] Konstantin Knauf commented on FLINK-10303: -- [~Zentol] I think, this was done with Sonar. I just added the maven dependency check plugin to flink-streaming-python (https://jeremylong.github.io/DependencyCheck/dependency-check-maven/index.html ). It finds CVE-2016-4000 as well, so could be used to verify. About the other one, I am not sure. It is not found by the OWASP dependency check as far as I can tell. It looks as if it is a vulnerability in `pip`, doesn't it? > Fix critical vulnerabilities Python API > --- > > Key: FLINK-10303 > URL: https://issues.apache.org/jira/browse/FLINK-10303 > Project: Flink > Issue Type: Improvement > Components: Python API >Affects Versions: 1.6.0 >Reporter: Konstantin Knauf >Priority: Major > > A user has reported two "critical" vulnerabilities in the Python API, which > we should probably fix: > * https://nvd.nist.gov/vuln/detail/CVE-2016-4000 > * https://cwe.mitre.org/data/definitions/384.html in > flink-streaming-python_2.11-1.6.0.jar <= pip-1.6-py2.py3-none-any.whl <= > sessions.py : [2.1.0, 2.6.0) > For users, who don't need the Python API, an easy work-around is exclude the > flink-streaming-python_2.11.jar from the distribution. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10340) Implement Cosh udf
[ https://issues.apache.org/jira/browse/FLINK-10340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-10340: Assignee: vinoyang > Implement Cosh udf > -- > > Key: FLINK-10340 > URL: https://issues.apache.org/jira/browse/FLINK-10340 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.6.0 >Reporter: Sergey Tsvetkov >Assignee: vinoyang >Priority: Minor > > Implement udf of cosh, just like in oracle > [https://docs.oracle.com/cd/B28359_01/server.111/b28286/functions031.htm#SQLRF00623] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Deleted] (FLINK-10345) Rethink SubmittedJobGraphListener
[ https://issues.apache.org/jira/browse/FLINK-10345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann deleted FLINK-10345: -- > Rethink SubmittedJobGraphListener > - > > Key: FLINK-10345 > URL: https://issues.apache.org/jira/browse/FLINK-10345 > Project: Flink > Issue Type: Sub-task >Reporter: Till Rohrmann >Priority: Major > > The {{SubmittedJobGraphListener}} in {{ZooKeeperSubmittedJobGraphStore}} can > return false positives. This is obviously problematic, because it causes the > subsequent recovery operation to fail. Ideally we would not require the > {{SubmittedJobGraphListener}}. One could, for example, periodically check > from the main thread whether there are new jobs. That way we would know which > jobs are currently running and which are being cleaned up. > Alternatively it is necessary to tolerate false positives :-( -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10344) Rethink SubmittedJobGraphListener
Till Rohrmann created FLINK-10344: - Summary: Rethink SubmittedJobGraphListener Key: FLINK-10344 URL: https://issues.apache.org/jira/browse/FLINK-10344 Project: Flink Issue Type: Sub-task Components: Distributed Coordination Affects Versions: 1.6.0, 1.5.3, 1.7.0 Reporter: Till Rohrmann Fix For: 1.7.0 The {{SubmittedJobGraphListener}} in {{ZooKeeperSubmittedJobGraphStore}} can return false positives. This is obviously problematic, because it causes the subsequent recovery operation to fail. Ideally we would not require the {{SubmittedJobGraphListener}}. One could, for example, periodically check from the main thread whether there are new jobs. That way we would know which jobs are currently running and which are being cleaned up. Alternatively it is necessary to tolerate false positives :-( -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10345) Rethink SubmittedJobGraphListener
Till Rohrmann created FLINK-10345: - Summary: Rethink SubmittedJobGraphListener Key: FLINK-10345 URL: https://issues.apache.org/jira/browse/FLINK-10345 Project: Flink Issue Type: Sub-task Components: Distributed Coordination Affects Versions: 1.6.0, 1.5.3, 1.7.0 Reporter: Till Rohrmann Fix For: 1.7.0 The {{SubmittedJobGraphListener}} in {{ZooKeeperSubmittedJobGraphStore}} can return false positives. This is obviously problematic, because it causes the subsequent recovery operation to fail. Ideally we would not require the {{SubmittedJobGraphListener}}. One could, for example, periodically check from the main thread whether there are new jobs. That way we would know which jobs are currently running and which are being cleaned up. Alternatively it is necessary to tolerate false positives :-( -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10336) Use ZooKeeperStateStore in ZooKeeperSubmittedJobGraphStore
[ https://issues.apache.org/jira/browse/FLINK-10336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-10336: Assignee: vinoyang > Use ZooKeeperStateStore in ZooKeeperSubmittedJobGraphStore > -- > > Key: FLINK-10336 > URL: https://issues.apache.org/jira/browse/FLINK-10336 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: vinoyang >Priority: Major > Fix For: 1.7.0 > > > Use the {{ZooKeeperStateStore}} in {{ZooKeeperSubmittedJobGraphStore}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10335) Create common ZooKeeperStateStore based on ZooKeeperStateHandleStore and RetrievableStateStorageHelper
[ https://issues.apache.org/jira/browse/FLINK-10335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-10335: Assignee: vinoyang > Create common ZooKeeperStateStore based on ZooKeeperStateHandleStore and > RetrievableStateStorageHelper > -- > > Key: FLINK-10335 > URL: https://issues.apache.org/jira/browse/FLINK-10335 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: vinoyang >Priority: Major > Fix For: 1.7.0 > > > Create a common {{ZooKeeperStateStore}} which is based on > {{ZooKeeperStateHandleStore}} and the {{RetrievableStateStorageHelper}} which > encapsulates the storage logic of large state objects whose handles are > persisted to ZooKeeper. This could be used by the > {{ZooKeeperSubmittedJobGraphStore}}, {{ZooKeeperCompletedCheckpointStore}} > and {{ZooKeeperMesosWorkerStore}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10334) Move RetrievableStateStorageHelper out of ZooKeeperStateHandleStore
[ https://issues.apache.org/jira/browse/FLINK-10334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-10334: Assignee: vinoyang > Move RetrievableStateStorageHelper out of ZooKeeperStateHandleStore > --- > > Key: FLINK-10334 > URL: https://issues.apache.org/jira/browse/FLINK-10334 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: vinoyang >Priority: Major > Fix For: 1.7.0 > > > Move the {{RetrievableStateStorageHelper}} out of the > {{ZooKeeperStateHandleStore}} in order to better separate concerns. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10333) Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, CompletedCheckpoints)
[ https://issues.apache.org/jira/browse/FLINK-10333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-10333: -- Description: While going over the ZooKeeper based stores ({{ZooKeeperSubmittedJobGraphStore}}, {{ZooKeeperMesosWorkerStore}}, {{ZooKeeperCompletedCheckpointStore}}) and the underlying {{ZooKeeperStateHandleStore}} I noticed several inconsistencies which were introduced with past incremental changes. * Depending whether {{ZooKeeperStateHandleStore#getAllSortedByNameAndLock}} or {{ZooKeeperStateHandleStore#getAllAndLock}} is called, deserialization problems will either lead to removing the Znode or not * {{ZooKeeperStateHandleStore}} leaves inconsistent state in case of exceptions (e.g. {{#getAllAndLock}} won't release the acquired locks in case of a failure) * {{ZooKeeperStateHandleStore}} has too many responsibilities. It would be better to move {{RetrievableStateStorageHelper}} out of it for a better separation of concerns * {{ZooKeeperSubmittedJobGraphStore}} overwrites a stored {{JobGraph}} even if it is locked. This should not happen since it could leave another system in an inconsistent state (imagine a changed {{JobGraph}} which restores from an old checkpoint) * Redundant but also somewhat inconsistent put logic in the different stores * Shadowing of ZooKeeper specific exceptions in {{ZooKeeperStateHandleStore}} which were expected to be caught in {{ZooKeeperSubmittedJobGraphStore}} * Getting rid of the {{SubmittedJobGraphListener}} would be helpful These problems made me think how reliable these components actually work. Since these components are very important, I propose to refactor them. was: While going over the ZooKeeper based stores ({{ZooKeeperSubmittedJobGraphStore}}, {{ZooKeeperMesosWorkerStore}}, {{ZooKeeperCompletedCheckpointStore}}) and the underlying {{ZooKeeperStateHandleStore}} I noticed several inconsistencies which were introduced with past incremental changes. 
* Depending whether {{ZooKeeperStateHandleStore#getAllSortedByNameAndLock}} or {{ZooKeeperStateHandleStore#getAllAndLock}} is called, deserialization problems will either lead to removing the Znode or not * {{ZooKeeperStateHandleStore}} leaves inconsistent state in case of exceptions (e.g. {{#getAllAndLock}} won't release the acquired locks in case of a failure) * {{ZooKeeperStateHandleStore}} has too many responsibilities. It would be better to move {{RetrievableStateStorageHelper}} out of it for a better separation of concerns * {{ZooKeeperSubmittedJobGraphStore}} overwrites a stored {{JobGraph}} even if it is locked. This should not happen since it could leave another system in an inconsistent state (imagine a changed {{JobGraph}} which restores from an old checkpoint) * Redundant but also somewhat inconsistent put logic in the different stores * Shadowing of ZooKeeper specific exceptions in {{ZooKeeperStateHandleStore}} which were expected to be caught in {{ZooKeeperSubmittedJobGraphStore}} These problems made me think how reliable these components actually work. Since these components are very important, I propose to refactor them. > Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, > CompletedCheckpoints) > - > > Key: FLINK-10333 > URL: https://issues.apache.org/jira/browse/FLINK-10333 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Priority: Major > Fix For: 1.7.0 > > > While going over the ZooKeeper based stores > ({{ZooKeeperSubmittedJobGraphStore}}, {{ZooKeeperMesosWorkerStore}}, > {{ZooKeeperCompletedCheckpointStore}}) and the underlying > {{ZooKeeperStateHandleStore}} I noticed several inconsistencies which were > introduced with past incremental changes. 
> * Depending whether {{ZooKeeperStateHandleStore#getAllSortedByNameAndLock}} > or {{ZooKeeperStateHandleStore#getAllAndLock}} is called, deserialization > problems will either lead to removing the Znode or not > * {{ZooKeeperStateHandleStore}} leaves inconsistent state in case of > exceptions (e.g. {{#getAllAndLock}} won't release the acquired locks in case > of a failure) > * {{ZooKeeperStateHandleStore}} has too many responsibilities. It would be > better to move {{RetrievableStateStorageHelper}} out of it for a better > separation of concerns > * {{ZooKeeperSubmittedJobGraphStore}} overwrites a stored {{JobGraph}} even > if it is locked. This should not happen since it could leave another system > in an inconsistent state (imagine a changed {{JobGraph}} which restores from > an old checkpoint) > * Redundant but also somewhat inconsistent put logic in the different stores > * Shadowing of ZooKeeper specific exceptions in
[jira] [Updated] (FLINK-10343) Expose setCurrentKey method to streamRuntimeContext
[ https://issues.apache.org/jira/browse/FLINK-10343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] aitozi updated FLINK-10343: --- Description: when we use reducing state / aggregating keyed state and so on , we have to read value from state backend and update the value with userFunction and then put back to state backend. If we can just cache certain data in heap with a map, and update once in snapshot method with {code:java} snapshot() { for(Map.Entry entry : map.entrySet()){ setCurrentKey(entry.getKey()); valueState.update(entry.getValue()); // put value back to state backend }} {code} we just have to expose the setCurrentKey to userFunction and the will enable the ability to cache partitial keyedState in memory by userself. what's your opinion [~stefanrichte...@gmail.com] [~azagrebin] ? was: when we use reducing state / aggregating keyed state and so on , we have to read value from state backend and update the value with userFunction and then put back to state backend. If we can just cache certain data in heap with a map, and update once in snapshot method with ``` snapshot() { for(Map.Entry entry : map.entrySet()){ setCurrentKey(entry.getKey()); valueState.update(entry.getValue()); // put value back to state backend }} ``` we just have to expose the setCurrentKey to userFunction and the will enable the ability to cache partitial keyedState in memory by userself. what's your opinion [~stefanrichte...@gmail.com] [~azagrebin] ? > Expose setCurrentKey method to streamRuntimeContext > --- > > Key: FLINK-10343 > URL: https://issues.apache.org/jira/browse/FLINK-10343 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.7.0 >Reporter: aitozi >Assignee: aitozi >Priority: Major > Fix For: 1.7.0 > > > when we use reducing state / aggregating keyed state and so on , we have to > read value from state backend and update the value with userFunction and then > put back to state backend. 
If we can just cache certain data in heap with a > map, and update once in snapshot method with > {code:java} > snapshot() { > for(Map.Entry entry : map.entrySet()){ > setCurrentKey(entry.getKey()); > valueState.update(entry.getValue()); // put value back to state backend > }} > {code} > we just have to expose the setCurrentKey to userFunction and the will enable > the ability to cache partitial keyedState in memory by userself. > what's your opinion [~stefanrichte...@gmail.com] [~azagrebin] ? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10343) Expose setCurrentKey method to streamRuntimeContext
aitozi created FLINK-10343: -- Summary: Expose setCurrentKey method to streamRuntimeContext Key: FLINK-10343 URL: https://issues.apache.org/jira/browse/FLINK-10343 Project: Flink Issue Type: Improvement Components: Streaming Affects Versions: 1.7.0 Reporter: aitozi Assignee: aitozi Fix For: 1.7.0 when we use reducing state / aggregating keyed state and so on , we have to read value from state backend and update the value with userFunction and then put back to state backend. If we can just cache certain data in heap with a map, and update once in snapshot method with ``` snapshot() { for(Map.Entry entry : map.entrySet()){ setCurrentKey(entry.getKey()); valueState.update(entry.getValue()); // put value back to state backend }} ``` we just have to expose the setCurrentKey to userFunction and the will enable the ability to cache partitial keyedState in memory by userself. what's your opinion [~stefanrichte...@gmail.com] [~azagrebin] ? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] walterddr commented on issue #6471: [FLINK-10010][DataStream API] Deprecate unused BaseAlignedWindowAssigner related components
walterddr commented on issue #6471: [FLINK-10010][DataStream API] Deprecate unused BaseAlignedWindowAssigner related components URL: https://github.com/apache/flink/pull/6471#issuecomment-421048859 Hi @fhueske @aljoscha, could you kindly take a look at this PR? We would like to make some changes to the "align" window assigner concept. It would be nice to deprecate the current implementation to avoid confusion. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10010) Deprecate unused BaseAlignedWindowAssigner related components
[ https://issues.apache.org/jira/browse/FLINK-10010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613652#comment-16613652 ] ASF GitHub Bot commented on FLINK-10010: walterddr commented on issue #6471: [FLINK-10010][DataStream API] Deprecate unused BaseAlignedWindowAssigner related components URL: https://github.com/apache/flink/pull/6471#issuecomment-421048859 Hi @fhueske @aljoscha, could you kindly take a look at this PR? We would like to make some changes to the "align" window assigner concept. It would be nice to depreciate current implementation to avoid confusions. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Deprecate unused BaseAlignedWindowAssigner related components > - > > Key: FLINK-10010 > URL: https://issues.apache.org/jira/browse/FLINK-10010 > Project: Flink > Issue Type: Bug > Components: DataStream API >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > Labels: pull-request-available > > {{BaseAlignedWindowAssigner}} should be marked as deprecated and > {{SlidingAlignedProcessingTimeWindows}} should be removed from the Flink Repo. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8354) Flink Kafka connector ignores Kafka message headers
[ https://issues.apache.org/jira/browse/FLINK-8354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613606#comment-16613606 ] ASF GitHub Bot commented on FLINK-8354: --- aljoscha commented on issue #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers URL: https://github.com/apache/flink/pull/6615#issuecomment-421038704 For this, I would await the outcome of #6577. If we end up only having one "modern" Kafka connector we might just pass through the `ConsumerRecord`. Otherwise we might have to do some wrapping. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Flink Kafka connector ignores Kafka message headers > - > > Key: FLINK-8354 > URL: https://issues.apache.org/jira/browse/FLINK-8354 > Project: Flink > Issue Type: Bug > Components: Kafka Connector > Environment: Kafka 0.11.0.0 > Flink 1.4.0 > flink-connector-kafka-0.11_2.11 >Reporter: Mohammad Abareghi >Assignee: Aegeaner >Priority: Major > Labels: pull-request-available > > Kafka has introduced notion of Header for messages in version 0.11.0.0 > https://issues.apache.org/jira/browse/KAFKA-4208. > But flink-connector-kafka-0.11_2.11 which supports kafka 0.11.0.0 ignores > headers when consuming kafka messages. > It would be useful in some scenarios, such as distributed log tracing, to > support message headers to FlinkKafkaConsumer011 and FlinkKafkaProducer011. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] aljoscha commented on issue #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers
aljoscha commented on issue #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers URL: https://github.com/apache/flink/pull/6615#issuecomment-421038704 For this, I would await the outcome of #6577. If we end up only having one "modern" Kafka connector we might just pass through the `ConsumerRecord`. Otherwise we might have to do some wrapping. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613603#comment-16613603 ] ASF GitHub Bot commented on FLINK-9913: --- NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r217406052 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ## @@ -87,62 +86,71 @@ public RecordWriter(ResultPartitionWriter writer, ChannelSelector channelSele this.numChannels = writer.getNumberOfSubpartitions(); - /* -* The runtime exposes a channel abstraction for the produced results -* (see {@link ChannelSelector}). Every channel has an independent -* serializer. -*/ - this.serializers = new SpanningRecordSerializer[numChannels]; + this.serializer = new SpanningRecordSerializer(); this.bufferBuilders = new Optional[numChannels]; for (int i = 0; i < numChannels; i++) { - serializers[i] = new SpanningRecordSerializer(); bufferBuilders[i] = Optional.empty(); } } public void emit(T record) throws IOException, InterruptedException { + serializer.serializeRecord(record); + for (int targetChannel : channelSelector.selectChannels(record, numChannels)) { - sendToTarget(record, targetChannel); + copyToTarget(targetChannel); } + + // Make sure we don't hold onto the large intermediate serialization buffer for too long + serializer.prune(); } /** * This is used to broadcast Streaming Watermarks in-band with records. This ignores * the {@link ChannelSelector}. */ public void broadcastEmit(T record) throws IOException, InterruptedException { + serializer.serializeRecord(record); + for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) { - sendToTarget(record, targetChannel); + copyToTarget(targetChannel); } + + serializer.prune(); } /** * This is used to send LatencyMarks to a random target channel. 
*/ public void randomEmit(T record) throws IOException, InterruptedException { - sendToTarget(record, rng.nextInt(numChannels)); - } + serializer.serializeRecord(record); - private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException { - RecordSerializer serializer = serializers[targetChannel]; + copyToTarget(rng.nextInt(numChannels)); - SerializationResult result = serializer.addRecord(record); + serializer.prune(); + } + private void copyToTarget(int targetChannel) throws IOException, InterruptedException { + // We should reset the initial position of the intermediate serialization buffer before + // copying, so the serialization results can be copied to multiple target buffers. + serializer.reset(); + + BufferBuilder bufferBuilder = getBufferBuilder(targetChannel); + SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder); while (result.isFullBuffer()) { - if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) { - // If this was a full record, we are done. Not breaking - // out of the loop at this point will lead to another - // buffer request before breaking out (that would not be - // a problem per se, but it can lead to stalls in the - // pipeline). - if (result.isFullRecord()) { - break; - } + tryFinishCurrentBufferBuilder(targetChannel); Review comment: I guess, I was worried about the same thing as @pnowojski ... the expanded method here will actually look like this: ``` boolean pruneTriggered = false; BufferBuilder bufferBuilder = getBufferBuilder(targetChannel); SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder); while (result.isFullBuffer()) { if (bufferBuilders[targetChannel].isPresent()) { bufferBuilder =
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613604#comment-16613604 ] ASF GitHub Bot commented on FLINK-9913: --- NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r217397453 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ## @@ -89,77 +88,95 @@ public RecordWriter(ResultPartitionWriter writer, ChannelSelector channelSele this.numChannels = writer.getNumberOfSubpartitions(); - /* -* The runtime exposes a channel abstraction for the produced results -* (see {@link ChannelSelector}). Every channel has an independent -* serializer. -*/ - this.serializers = new SpanningRecordSerializer[numChannels]; + this.serializer = new SpanningRecordSerializer(); this.bufferBuilders = new Optional[numChannels]; + this.broadcastChannels = new int[numChannels]; for (int i = 0; i < numChannels; i++) { - serializers[i] = new SpanningRecordSerializer(); + broadcastChannels[i] = i; bufferBuilders[i] = Optional.empty(); } } public void emit(T record) throws IOException, InterruptedException { - for (int targetChannel : channelSelector.selectChannels(record, numChannels)) { - sendToTarget(record, targetChannel); - } + emit(record, channelSelector.selectChannels(record, numChannels)); } /** * This is used to broadcast Streaming Watermarks in-band with records. This ignores * the {@link ChannelSelector}. */ public void broadcastEmit(T record) throws IOException, InterruptedException { - for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) { - sendToTarget(record, targetChannel); - } + emit(record, broadcastChannels); } /** * This is used to send LatencyMarks to a random target channel. 
*/ public void randomEmit(T record) throws IOException, InterruptedException { - sendToTarget(record, rng.nextInt(numChannels)); + serializer.serializeRecord(record); + + if (copyFromSerializerToTargetChannel(rng.nextInt(numChannels))) { + serializer.prune(); + } Review comment: code duplication: why not use this? ``` emit(record, new int[] { rng.nextInt(numChannels) }); ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Improve output serialization only once in RecordWriter > -- > > Key: FLINK-9913 > URL: https://issues.apache.org/jira/browse/FLINK-9913 > Project: Flink > Issue Type: Improvement > Components: Network >Affects Versions: 1.6.0 >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Currently the {{RecordWriter}} emits output into multi channels via > {{ChannelSelector}} or broadcasts output to all channels directly. Each > channel has a separate {{RecordSerializer}} for serializing outputs, that > means the output will be serialized as many times as the number of selected > channels. > As we know, data serialization is a high cost operation, so we can get good > benefits by improving the serialization only once. > I would suggest the following changes for realizing it. > # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the > channels. > # The output is serialized into the intermediate data buffer only once for > different channels. > # The intermediate serialization results are copied into different > {{BufferBuilder}}s for different channels. 
> An additional benefit by using a single serializer for all channels is that > we get a potentially significant reduction on heap space overhead from fewer > intermediate serialization buffers (only once we got over 5MiB, these buffers > were pruned back to 128B!). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613602#comment-16613602 ] ASF GitHub Bot commented on FLINK-9913: --- NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r217404839 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ## @@ -89,77 +88,95 @@ public RecordWriter(ResultPartitionWriter writer, ChannelSelector channelSele this.numChannels = writer.getNumberOfSubpartitions(); - /* -* The runtime exposes a channel abstraction for the produced results -* (see {@link ChannelSelector}). Every channel has an independent -* serializer. -*/ - this.serializers = new SpanningRecordSerializer[numChannels]; + this.serializer = new SpanningRecordSerializer(); this.bufferBuilders = new Optional[numChannels]; + this.broadcastChannels = new int[numChannels]; for (int i = 0; i < numChannels; i++) { - serializers[i] = new SpanningRecordSerializer(); + broadcastChannels[i] = i; bufferBuilders[i] = Optional.empty(); } } public void emit(T record) throws IOException, InterruptedException { - for (int targetChannel : channelSelector.selectChannels(record, numChannels)) { - sendToTarget(record, targetChannel); - } + emit(record, channelSelector.selectChannels(record, numChannels)); } /** * This is used to broadcast Streaming Watermarks in-band with records. This ignores * the {@link ChannelSelector}. */ public void broadcastEmit(T record) throws IOException, InterruptedException { - for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) { - sendToTarget(record, targetChannel); - } + emit(record, broadcastChannels); } /** * This is used to send LatencyMarks to a random target channel. 
*/ public void randomEmit(T record) throws IOException, InterruptedException { - sendToTarget(record, rng.nextInt(numChannels)); + serializer.serializeRecord(record); + + if (copyFromSerializerToTargetChannel(rng.nextInt(numChannels))) { + serializer.prune(); + } } - private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException { - RecordSerializer serializer = serializers[targetChannel]; + private void emit(T record, int[] targetChannels) throws IOException, InterruptedException { + serializer.serializeRecord(record); + + boolean pruneAfterCopying = false; + for (int channel : targetChannels) { + if (copyFromSerializerToTargetChannel(channel)) { + pruneAfterCopying = true; + } + } - SerializationResult result = serializer.addRecord(record); + // Make sure we don't hold onto the large intermediate serialization buffer for too long + if (pruneAfterCopying) { + serializer.prune(); + } + } + /** +* @param targetChannel +* @return true if the intermediate serialization buffer should be pruned +*/ + private boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException { + // We should reset the initial position of the intermediate serialization buffer before + // copying, so the serialization results can be copied to multiple target buffers. + serializer.reset(); + + boolean pruneTriggered = false; + BufferBuilder bufferBuilder = getBufferBuilder(targetChannel); + SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder); while (result.isFullBuffer()) { - if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) { - // If this was a full record, we are done. Not breaking - // out of the loop at this point will lead to another - // buffer request before breaking out (that would not be - // a problem per se, but it can lead to stalls in the - // pipeline). -
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613601#comment-16613601 ] ASF GitHub Bot commented on FLINK-9913: --- NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r217415039 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java ## @@ -377,6 +388,86 @@ public void testBroadcastEmitBufferIndependence() throws Exception { assertEquals("Buffer 2 shares the same reader index as buffer 1", 0, buffer2.getReaderIndex()); } + /** +* Tests that records are broadcast via {@link ChannelSelector} and +* {@link RecordWriter#emit(IOReadableWritable)}. +*/ + @Test + public void testEmitRecordWithBroadcastPartitioner() throws Exception { + emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false); + } + + /** +* Tests that records are broadcast via {@link RecordWriter#broadcastEmit(IOReadableWritable)}. +*/ + @Test + public void testBroadcastEmitRecord() throws Exception { + emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true); + } + + /** +* The results of emitting records via BroadcastPartitioner or broadcasting records directly are the same, +* that is all the target channels can receive the whole outputs. 
+* +* @param isBroadcastEmit whether using {@link RecordWriter#broadcastEmit(IOReadableWritable)} or not +*/ + private void emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean isBroadcastEmit) throws Exception { + final int numChannels = 4; + final int bufferSize = 32; + final int numValues = 8; + final int serializationLength = 4; + + @SuppressWarnings("unchecked") + final Queue[] queues = new Queue[numChannels]; + for (int i = 0; i < numChannels; i++) { + queues[i] = new ArrayDeque<>(); + } + + final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize); + final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider); + final RecordWriter writer = new RecordWriter<>(partitionWriter, new Broadcast<>()); Review comment: I meant using ``` final RecordWriter writer = isBroadcastEmit ? new RecordWriter<>(partitionWriter) : new RecordWriter<>(partitionWriter, new Broadcast<>()); ``` This would also check that `broadcastEmit()` does not rely on a broadcasting channel selector. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Improve output serialization only once in RecordWriter > -- > > Key: FLINK-9913 > URL: https://issues.apache.org/jira/browse/FLINK-9913 > Project: Flink > Issue Type: Improvement > Components: Network >Affects Versions: 1.6.0 >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Currently the {{RecordWriter}} emits output into multi channels via > {{ChannelSelector}} or broadcasts output to all channels directly. Each > channel has a separate {{RecordSerializer}} for serializing outputs, that > means the output will be serialized as many times as the number of selected > channels. 
> As we know, data serialization is a high cost operation, so we can get good > benefits by improving the serialization only once. > I would suggest the following changes for realizing it. > # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the > channels. > # The output is serialized into the intermediate data buffer only once for > different channels. > # The intermediate serialization results are copied into different > {{BufferBuilder}}s for different channels. > An additional benefit by using a single serializer for all channels is that > we get a potentially significant reduction on heap space overhead from fewer > intermediate serialization buffers (only once we got over 5MiB, these buffers > were pruned back to 128B!). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r217404839 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ## @@ -89,77 +88,95 @@ public RecordWriter(ResultPartitionWriter writer, ChannelSelector channelSele this.numChannels = writer.getNumberOfSubpartitions(); - /* -* The runtime exposes a channel abstraction for the produced results -* (see {@link ChannelSelector}). Every channel has an independent -* serializer. -*/ - this.serializers = new SpanningRecordSerializer[numChannels]; + this.serializer = new SpanningRecordSerializer(); this.bufferBuilders = new Optional[numChannels]; + this.broadcastChannels = new int[numChannels]; for (int i = 0; i < numChannels; i++) { - serializers[i] = new SpanningRecordSerializer(); + broadcastChannels[i] = i; bufferBuilders[i] = Optional.empty(); } } public void emit(T record) throws IOException, InterruptedException { - for (int targetChannel : channelSelector.selectChannels(record, numChannels)) { - sendToTarget(record, targetChannel); - } + emit(record, channelSelector.selectChannels(record, numChannels)); } /** * This is used to broadcast Streaming Watermarks in-band with records. This ignores * the {@link ChannelSelector}. */ public void broadcastEmit(T record) throws IOException, InterruptedException { - for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) { - sendToTarget(record, targetChannel); - } + emit(record, broadcastChannels); } /** * This is used to send LatencyMarks to a random target channel. 
*/ public void randomEmit(T record) throws IOException, InterruptedException { - sendToTarget(record, rng.nextInt(numChannels)); + serializer.serializeRecord(record); + + if (copyFromSerializerToTargetChannel(rng.nextInt(numChannels))) { + serializer.prune(); + } } - private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException { - RecordSerializer serializer = serializers[targetChannel]; + private void emit(T record, int[] targetChannels) throws IOException, InterruptedException { + serializer.serializeRecord(record); + + boolean pruneAfterCopying = false; + for (int channel : targetChannels) { + if (copyFromSerializerToTargetChannel(channel)) { + pruneAfterCopying = true; + } + } - SerializationResult result = serializer.addRecord(record); + // Make sure we don't hold onto the large intermediate serialization buffer for too long + if (pruneAfterCopying) { + serializer.prune(); + } + } + /** +* @param targetChannel +* @return true if the intermediate serialization buffer should be pruned +*/ + private boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException { + // We should reset the initial position of the intermediate serialization buffer before + // copying, so the serialization results can be copied to multiple target buffers. + serializer.reset(); + + boolean pruneTriggered = false; + BufferBuilder bufferBuilder = getBufferBuilder(targetChannel); + SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder); while (result.isFullBuffer()) { - if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) { - // If this was a full record, we are done. Not breaking - // out of the loop at this point will lead to another - // buffer request before breaking out (that would not be - // a problem per se, but it can lead to stalls in the - // pipeline). - if (result.isFullRecord()) { - break; - } + tryFinishCurrentBufferBuilder(targetChannel); + + // If
[GitHub] NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r217406052 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ## @@ -87,62 +86,71 @@ public RecordWriter(ResultPartitionWriter writer, ChannelSelector channelSele this.numChannels = writer.getNumberOfSubpartitions(); - /* -* The runtime exposes a channel abstraction for the produced results -* (see {@link ChannelSelector}). Every channel has an independent -* serializer. -*/ - this.serializers = new SpanningRecordSerializer[numChannels]; + this.serializer = new SpanningRecordSerializer(); this.bufferBuilders = new Optional[numChannels]; for (int i = 0; i < numChannels; i++) { - serializers[i] = new SpanningRecordSerializer(); bufferBuilders[i] = Optional.empty(); } } public void emit(T record) throws IOException, InterruptedException { + serializer.serializeRecord(record); + for (int targetChannel : channelSelector.selectChannels(record, numChannels)) { - sendToTarget(record, targetChannel); + copyToTarget(targetChannel); } + + // Make sure we don't hold onto the large intermediate serialization buffer for too long + serializer.prune(); } /** * This is used to broadcast Streaming Watermarks in-band with records. This ignores * the {@link ChannelSelector}. */ public void broadcastEmit(T record) throws IOException, InterruptedException { + serializer.serializeRecord(record); + for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) { - sendToTarget(record, targetChannel); + copyToTarget(targetChannel); } + + serializer.prune(); } /** * This is used to send LatencyMarks to a random target channel. 
*/ public void randomEmit(T record) throws IOException, InterruptedException { - sendToTarget(record, rng.nextInt(numChannels)); - } + serializer.serializeRecord(record); - private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException { - RecordSerializer serializer = serializers[targetChannel]; + copyToTarget(rng.nextInt(numChannels)); - SerializationResult result = serializer.addRecord(record); + serializer.prune(); + } + private void copyToTarget(int targetChannel) throws IOException, InterruptedException { + // We should reset the initial position of the intermediate serialization buffer before + // copying, so the serialization results can be copied to multiple target buffers. + serializer.reset(); + + BufferBuilder bufferBuilder = getBufferBuilder(targetChannel); + SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder); while (result.isFullBuffer()) { - if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) { - // If this was a full record, we are done. Not breaking - // out of the loop at this point will lead to another - // buffer request before breaking out (that would not be - // a problem per se, but it can lead to stalls in the - // pipeline). - if (result.isFullRecord()) { - break; - } + tryFinishCurrentBufferBuilder(targetChannel); Review comment: I guess, I was worried about the same thing as @pnowojski ... the expanded method here will actually look like this: ``` boolean pruneTriggered = false; BufferBuilder bufferBuilder = getBufferBuilder(targetChannel); SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder); while (result.isFullBuffer()) { if (bufferBuilders[targetChannel].isPresent()) { bufferBuilder = bufferBuilders[targetChannel].get(); bufferBuilders[targetChannel] = Optional.empty(); numBytesOut.inc(bufferBuilder.finish()); numBuffersOut.inc();
[GitHub] NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r217397453 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ## @@ -89,77 +88,95 @@ public RecordWriter(ResultPartitionWriter writer, ChannelSelector channelSele this.numChannels = writer.getNumberOfSubpartitions(); - /* -* The runtime exposes a channel abstraction for the produced results -* (see {@link ChannelSelector}). Every channel has an independent -* serializer. -*/ - this.serializers = new SpanningRecordSerializer[numChannels]; + this.serializer = new SpanningRecordSerializer(); this.bufferBuilders = new Optional[numChannels]; + this.broadcastChannels = new int[numChannels]; for (int i = 0; i < numChannels; i++) { - serializers[i] = new SpanningRecordSerializer(); + broadcastChannels[i] = i; bufferBuilders[i] = Optional.empty(); } } public void emit(T record) throws IOException, InterruptedException { - for (int targetChannel : channelSelector.selectChannels(record, numChannels)) { - sendToTarget(record, targetChannel); - } + emit(record, channelSelector.selectChannels(record, numChannels)); } /** * This is used to broadcast Streaming Watermarks in-band with records. This ignores * the {@link ChannelSelector}. */ public void broadcastEmit(T record) throws IOException, InterruptedException { - for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) { - sendToTarget(record, targetChannel); - } + emit(record, broadcastChannels); } /** * This is used to send LatencyMarks to a random target channel. 
*/ public void randomEmit(T record) throws IOException, InterruptedException { - sendToTarget(record, rng.nextInt(numChannels)); + serializer.serializeRecord(record); + + if (copyFromSerializerToTargetChannel(rng.nextInt(numChannels))) { + serializer.prune(); + } Review comment: code duplication: why not use this? ``` emit(record, new int[] { rng.nextInt(numChannels) }); ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r217415039 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java ## @@ -377,6 +388,86 @@ public void testBroadcastEmitBufferIndependence() throws Exception { assertEquals("Buffer 2 shares the same reader index as buffer 1", 0, buffer2.getReaderIndex()); } + /** +* Tests that records are broadcast via {@link ChannelSelector} and +* {@link RecordWriter#emit(IOReadableWritable)}. +*/ + @Test + public void testEmitRecordWithBroadcastPartitioner() throws Exception { + emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false); + } + + /** +* Tests that records are broadcast via {@link RecordWriter#broadcastEmit(IOReadableWritable)}. +*/ + @Test + public void testBroadcastEmitRecord() throws Exception { + emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true); + } + + /** +* The results of emitting records via BroadcastPartitioner or broadcasting records directly are the same, +* that is all the target channels can receive the whole outputs. 
+* +* @param isBroadcastEmit whether using {@link RecordWriter#broadcastEmit(IOReadableWritable)} or not +*/ + private void emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean isBroadcastEmit) throws Exception { + final int numChannels = 4; + final int bufferSize = 32; + final int numValues = 8; + final int serializationLength = 4; + + @SuppressWarnings("unchecked") + final Queue[] queues = new Queue[numChannels]; + for (int i = 0; i < numChannels; i++) { + queues[i] = new ArrayDeque<>(); + } + + final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize); + final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider); + final RecordWriter writer = new RecordWriter<>(partitionWriter, new Broadcast<>()); Review comment: I meant using ``` final RecordWriter writer = isBroadcastEmit ? new RecordWriter<>(partitionWriter) : new RecordWriter<>(partitionWriter, new Broadcast<>()); ``` This would also check that `broadcastEmit()` does not rely on a broadcasting channel selector. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Assigned] (FLINK-10342) Kafka duplicate topic consumption when topic name is changed
[ https://issues.apache.org/jira/browse/FLINK-10342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleksandr Nitavskyi reassigned FLINK-10342: --- Assignee: Oleksandr Nitavskyi > Kafka duplicate topic consumption when topic name is changed > > > Key: FLINK-10342 > URL: https://issues.apache.org/jira/browse/FLINK-10342 > Project: Flink > Issue Type: Bug >Reporter: Oleksandr Nitavskyi >Assignee: Oleksandr Nitavskyi >Priority: Major > > In case of topic name is simply renamed for a KafkaConsumer Flink starts to > consume from old and a new topic in the same time which can lead to > unexpected behavior. > Here is the PR with reproduce: https://github.com/apache/flink/pull/6691 > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10342) Kafka duplicate topic consumption when topic name is changed
Oleksandr Nitavskyi created FLINK-10342: --- Summary: Kafka duplicate topic consumption when topic name is changed Key: FLINK-10342 URL: https://issues.apache.org/jira/browse/FLINK-10342 Project: Flink Issue Type: Bug Reporter: Oleksandr Nitavskyi If a topic name is simply renamed for a KafkaConsumer, Flink starts to consume from both the old and the new topic at the same time, which can lead to unexpected behavior. Here is the PR that reproduces the issue: https://github.com/apache/flink/pull/6691 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] JTaky opened a new pull request #6691: Reproduce double topic subscription
JTaky opened a new pull request #6691: Reproduce double topic subscription URL: https://github.com/apache/flink/pull/6691 *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.* *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.* ## Contribution Checklist - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue. - Name the pull request in the form "[FLINK-] [component] Title of the pull request", where *FLINK-* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component. Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`. - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review. - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Travis CI to do that following [this guide](http://flink.apache.org/contribute-code.html#best-practices). 
- Each pull request should address only one issue, not mix up code from multiple issues. - Each commit in the pull request has a meaningful commit message (including the JIRA id) - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below. **(The sections below can be removed for hotfixes of typos)** ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. 
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluser with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-10304) Remove deprecated AbstractYarnClusterDescriptor field
[ https://issues.apache.org/jira/browse/FLINK-10304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613564#comment-16613564 ] ASF GitHub Bot commented on FLINK-10304: TisonKun commented on issue #6673: [FLINK-10304] [client] Remove deprecated AbstractYarnClusterDescriptor field URL: https://github.com/apache/flink/pull/6673#issuecomment-421026733 cc @GJL This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Remove deprecated AbstractYarnClusterDescriptor field > - > > Key: FLINK-10304 > URL: https://issues.apache.org/jira/browse/FLINK-10304 > Project: Flink > Issue Type: Improvement > Components: Client, YARN >Affects Versions: 1.7.0 >Reporter: 陈梓立 >Assignee: 陈梓立 >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Depend on [~trohrm...@apache.org]'s > [commit|https://github.com/apache/flink/commit/6356128865bff7463bf03185d18b129ed3633bc2], > {{AbstractYarnClusterDescriptor}} should not care whether it is in DETACHED > mode. > After digging I found the main usages of it are > 1. {{FlinkYarnSessionCli#run}}, this can be resolved by checking whether > {{allOptions}} has {{DETACHED_OPTION}} locally. > 2. when AbstractYarnClusterDescriptor start a AM, it sets > {{appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached));}}. > At this point it seems that YarnClusterDescriptor should know whether or not > it is in detached mode. > If usage 2 is irrelevant now, we can get rid of deprecated method in FLIP-6 > codebase. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] TisonKun commented on issue #6673: [FLINK-10304] [client] Remove deprecated AbstractYarnClusterDescriptor field
TisonKun commented on issue #6673: [FLINK-10304] [client] Remove deprecated AbstractYarnClusterDescriptor field URL: https://github.com/apache/flink/pull/6673#issuecomment-421026733 cc @GJL This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10321) Make the condition of broadcast partitioner simple
[ https://issues.apache.org/jira/browse/FLINK-10321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613562#comment-16613562 ] ASF GitHub Bot commented on FLINK-10321: Aitozi commented on issue #6688: [FLINK-10321][network] Simplify the condition of BroadcastPartitioner URL: https://github.com/apache/flink/pull/6688#issuecomment-421025756 LGTM. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make the condition of broadcast partitioner simple > -- > > Key: FLINK-10321 > URL: https://issues.apache.org/jira/browse/FLINK-10321 > Project: Flink > Issue Type: Improvement > Components: Network >Affects Versions: 1.7.0 >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > Labels: pull-request-available > > The current {{BroadcastPartitioner}} uses the vars of {{set}} and > {{setNumber}} as the condition for returning channel arrays. > Instead of using {{set}} and {{setNumber}}, we can just check whether > {{returnChannel.length == numberOfOutputChannels}} as the condition to make > it simple. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] Aitozi commented on issue #6688: [FLINK-10321][network] Simplify the condition of BroadcastPartitioner
Aitozi commented on issue #6688: [FLINK-10321][network] Simplify the condition of BroadcastPartitioner URL: https://github.com/apache/flink/pull/6688#issuecomment-421025756 LGTM. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10321) Make the condition of broadcast partitioner simple
[ https://issues.apache.org/jira/browse/FLINK-10321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613561#comment-16613561 ] ASF GitHub Bot commented on FLINK-10321: TisonKun commented on issue #6688: [FLINK-10321][network] Simplify the condition of BroadcastPartitioner URL: https://github.com/apache/flink/pull/6688#issuecomment-421024900 FYI, travis fails by irrelevant issue. ``` Status: Downloaded newer image for java:8-jre-alpine ---> fdc893b19a14 Step 2/16 : RUN apk add --no-cache bash snappy ---> [Warning] IPv4 forwarding is disabled. Networking will not work. ---> Running in 04ebab036ceb fetch http://dl-cdn.alpinelinux.org/alpine/v3.4/main/x86_64/APKINDEX.tar.gz WARNING: Ignoring http://dl-cdn.alpinelinux.org/alpine/v3.4/main/x86_64/APKINDEX.tar.gz: temporary error (try again later) fetch http://dl-cdn.alpinelinux.org/alpine/v3.4/community/x86_64/APKINDEX.tar.gz WARNING: Ignoring http://dl-cdn.alpinelinux.org/alpine/v3.4/community/x86_64/APKINDEX.tar.gz: temporary error (try again later) ERROR: unsatisfiable constraints: bash (missing): required by: world[bash] snappy (missing): required by: world[snappy] The command '/bin/sh -c apk add --no-cache bash snappy' returned a non-zero code: 2 No output has been received in the last 10m0s, this potentially indicates a stalled build or something wrong with the build itself. Check the details on how to adjust your build configuration on: https://docs.travis-ci.com/user/common-build-problems/#Build-times-out-because-no-output-was-received The build has been terminated ``` @zentol is it something wrong on travis or our script? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make the condition of broadcast partitioner simple > -- > > Key: FLINK-10321 > URL: https://issues.apache.org/jira/browse/FLINK-10321 > Project: Flink > Issue Type: Improvement > Components: Network >Affects Versions: 1.7.0 >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > Labels: pull-request-available > > The current {{BroadcastPartitioner}} uses the vars of {{set}} and > {{setNumber}} as the condition for returning channel arrays. > Instead of using {{set}} and {{setNumber}}, we can just check whether > {{returnChannel.length == numberOfOutputChannels}} as the condition to make > it simple. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] TisonKun commented on issue #6688: [FLINK-10321][network] Simplify the condition of BroadcastPartitioner
TisonKun commented on issue #6688: [FLINK-10321][network] Simplify the condition of BroadcastPartitioner URL: https://github.com/apache/flink/pull/6688#issuecomment-421024900 FYI, travis fails by irrelevant issue. ``` Status: Downloaded newer image for java:8-jre-alpine ---> fdc893b19a14 Step 2/16 : RUN apk add --no-cache bash snappy ---> [Warning] IPv4 forwarding is disabled. Networking will not work. ---> Running in 04ebab036ceb fetch http://dl-cdn.alpinelinux.org/alpine/v3.4/main/x86_64/APKINDEX.tar.gz WARNING: Ignoring http://dl-cdn.alpinelinux.org/alpine/v3.4/main/x86_64/APKINDEX.tar.gz: temporary error (try again later) fetch http://dl-cdn.alpinelinux.org/alpine/v3.4/community/x86_64/APKINDEX.tar.gz WARNING: Ignoring http://dl-cdn.alpinelinux.org/alpine/v3.4/community/x86_64/APKINDEX.tar.gz: temporary error (try again later) ERROR: unsatisfiable constraints: bash (missing): required by: world[bash] snappy (missing): required by: world[snappy] The command '/bin/sh -c apk add --no-cache bash snappy' returned a non-zero code: 2 No output has been received in the last 10m0s, this potentially indicates a stalled build or something wrong with the build itself. Check the details on how to adjust your build configuration on: https://docs.travis-ci.com/user/common-build-problems/#Build-times-out-because-no-output-was-received The build has been terminated ``` @zentol is it something wrong on travis or our script? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-10275) StreamTask support object reuse
[ https://issues.apache.org/jira/browse/FLINK-10275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] 陈梓立 updated FLINK-10275: Description: StreamTask support efficient object reuse. The purpose behind this is to reduce pressure on the garbage collector. All objects are reused, without backup copies. The operators and UDFs must be careful to not keep any objects as state or not to modify the objects. *FYI, now this thread is blocked by FLIP-21. The pull request attached is closed for a needed rework.* was: StreamTask support efficient object reuse. The purpose behind this is to reduce pressure on the garbage collector. All objects are reused, without backup copies. The operators and UDFs must be careful to not keep any objects as state or not to modify the objects. > StreamTask support object reuse > --- > > Key: FLINK-10275 > URL: https://issues.apache.org/jira/browse/FLINK-10275 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.7.0 >Reporter: 陈梓立 >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > StreamTask support efficient object reuse. The purpose behind this is to > reduce pressure on the garbage collector. > All objects are reused, without backup copies. The operators and UDFs must be > careful to not keep any objects as state or not to modify the objects. > *FYI, now this thread is blocked by FLIP-21. The pull request attached is > closed for a needed rework.* -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10275) StreamTask support object reuse
[ https://issues.apache.org/jira/browse/FLINK-10275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] 陈梓立 reassigned FLINK-10275: --- Assignee: (was: 陈梓立) > StreamTask support object reuse > --- > > Key: FLINK-10275 > URL: https://issues.apache.org/jira/browse/FLINK-10275 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.7.0 >Reporter: 陈梓立 >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > StreamTask support efficient object reuse. The purpose behind this is to > reduce pressure on the garbage collector. > All objects are reused, without backup copies. The operators and UDFs must be > careful to not keep any objects as state or not to modify the objects. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10275) StreamTask support object reuse
[ https://issues.apache.org/jira/browse/FLINK-10275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613553#comment-16613553 ] ASF GitHub Bot commented on FLINK-10275: TisonKun commented on issue #6643: [FLINK-10275] StreamTask support object reuse URL: https://github.com/apache/flink/pull/6643#issuecomment-421020528 Although I don't think remain a stall pull request does harm, since this pull request should be updated nearly as a rework, I would close it. Thanks for your reminding @kl0u! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > StreamTask support object reuse > --- > > Key: FLINK-10275 > URL: https://issues.apache.org/jira/browse/FLINK-10275 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.7.0 >Reporter: 陈梓立 >Assignee: 陈梓立 >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > StreamTask support efficient object reuse. The purpose behind this is to > reduce pressure on the garbage collector. > All objects are reused, without backup copies. The operators and UDFs must be > careful to not keep any objects as state or not to modify the objects. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10275) StreamTask support object reuse
[ https://issues.apache.org/jira/browse/FLINK-10275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613554#comment-16613554 ] ASF GitHub Bot commented on FLINK-10275: TisonKun closed pull request #6643: [FLINK-10275] StreamTask support object reuse URL: https://github.com/apache/flink/pull/6643 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/dev/execution_configuration.md b/docs/dev/execution_configuration.md index f0103b0f39f..3991ab59ac5 100644 --- a/docs/dev/execution_configuration.md +++ b/docs/dev/execution_configuration.md @@ -59,7 +59,7 @@ With the closure cleaner disabled, it might happen that an anonymous user functi - `enableForceAvro()` / **`disableForceAvro()`**. Avro is not forced by default. Forces the Flink AvroTypeInformation to use the Avro serializer instead of Kryo for serializing Avro POJOs. -- `enableObjectReuse()` / **`disableObjectReuse()`** By default, objects are not reused in Flink. Enabling the object reuse mode will instruct the runtime to reuse user objects for better performance. Keep in mind that this can lead to bugs when the user-code function of an operation is not aware of this behavior. +- `enableObjectReuse()` / **`disableObjectReuse()`** By default, objects are not reused in Flink. Enabling the object reuse mode will instruct the runtime to reuse user objects for better performance. Keep in mind that this can lead to bugs when the user-defined function of an operation is not aware of this behavior. - **`enableSysoutLogging()`** / `disableSysoutLogging()` JobManager status updates are printed to `System.out` by default. This setting allows to disable this behavior. 
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index 59fa803791a..d36fd296562 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -601,8 +601,8 @@ public boolean isForceAvroEnabled() { /** * Enables reusing objects that Flink internally uses for deserialization and passing -* data to user-code functions. Keep in mind that this can lead to bugs when the -* user-code function of an operation is not aware of this behaviour. +* data to user-defined functions. Keep in mind that this can lead to bugs when the +* user-defined function of an operation is not aware of this behaviour. */ public ExecutionConfig enableObjectReuse() { objectReuse = true; @@ -611,7 +611,7 @@ public ExecutionConfig enableObjectReuse() { /** * Disables reusing objects that Flink internally uses for deserialization and passing -* data to user-code functions. @see #enableObjectReuse() +* data to user-defined functions. 
@see #enableObjectReuse() */ public ExecutionConfig disableObjectReuse() { objectReuse = false; diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala index d8ba29ae478..f4185f4c129 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala @@ -51,7 +51,7 @@ class TableSinkITCase( val input = CollectionDataSets.get3TupleDataSet(env) .map(x => x).setParallelism(4) // increase DOP to 4 -val results = input.toTable(tEnv, 'a, 'b, 'c) +input.toTable(tEnv, 'a, 'b, 'c) .where('a < 5 || 'a > 17) .select('c, 'b) .writeToSink(new CsvTableSink(path, fieldDelim = "|")) diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala index 70e59f3d24d..95cb1df1b04 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala @@ -21,10 +21,12 @@ package org.apache.flink.table.runtime.stream.table import java.io.File import java.lang.{Boolean => JBool} +import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.functions.MapFunction import
[GitHub] TisonKun closed pull request #6643: [FLINK-10275] StreamTask support object reuse
TisonKun closed pull request #6643: [FLINK-10275] StreamTask support object reuse URL: https://github.com/apache/flink/pull/6643 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/dev/execution_configuration.md b/docs/dev/execution_configuration.md index f0103b0f39f..3991ab59ac5 100644 --- a/docs/dev/execution_configuration.md +++ b/docs/dev/execution_configuration.md @@ -59,7 +59,7 @@ With the closure cleaner disabled, it might happen that an anonymous user functi - `enableForceAvro()` / **`disableForceAvro()`**. Avro is not forced by default. Forces the Flink AvroTypeInformation to use the Avro serializer instead of Kryo for serializing Avro POJOs. -- `enableObjectReuse()` / **`disableObjectReuse()`** By default, objects are not reused in Flink. Enabling the object reuse mode will instruct the runtime to reuse user objects for better performance. Keep in mind that this can lead to bugs when the user-code function of an operation is not aware of this behavior. +- `enableObjectReuse()` / **`disableObjectReuse()`** By default, objects are not reused in Flink. Enabling the object reuse mode will instruct the runtime to reuse user objects for better performance. Keep in mind that this can lead to bugs when the user-defined function of an operation is not aware of this behavior. - **`enableSysoutLogging()`** / `disableSysoutLogging()` JobManager status updates are printed to `System.out` by default. This setting allows to disable this behavior. 
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index 59fa803791a..d36fd296562 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -601,8 +601,8 @@ public boolean isForceAvroEnabled() { /** * Enables reusing objects that Flink internally uses for deserialization and passing -* data to user-code functions. Keep in mind that this can lead to bugs when the -* user-code function of an operation is not aware of this behaviour. +* data to user-defined functions. Keep in mind that this can lead to bugs when the +* user-defined function of an operation is not aware of this behaviour. */ public ExecutionConfig enableObjectReuse() { objectReuse = true; @@ -611,7 +611,7 @@ public ExecutionConfig enableObjectReuse() { /** * Disables reusing objects that Flink internally uses for deserialization and passing -* data to user-code functions. @see #enableObjectReuse() +* data to user-defined functions. 
@see #enableObjectReuse() */ public ExecutionConfig disableObjectReuse() { objectReuse = false; diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala index d8ba29ae478..f4185f4c129 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala @@ -51,7 +51,7 @@ class TableSinkITCase( val input = CollectionDataSets.get3TupleDataSet(env) .map(x => x).setParallelism(4) // increase DOP to 4 -val results = input.toTable(tEnv, 'a, 'b, 'c) +input.toTable(tEnv, 'a, 'b, 'c) .where('a < 5 || 'a > 17) .select('c, 'b) .writeToSink(new CsvTableSink(path, fieldDelim = "|")) diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala index 70e59f3d24d..95cb1df1b04 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala @@ -21,10 +21,12 @@ package org.apache.flink.table.runtime.stream.table import java.io.File import java.lang.{Boolean => JBool} +import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer import
[GitHub] TisonKun commented on issue #6643: [FLINK-10275] StreamTask support object reuse
TisonKun commented on issue #6643: [FLINK-10275] StreamTask support object reuse URL: https://github.com/apache/flink/pull/6643#issuecomment-421020528 Although I don't think retaining a stale pull request does harm, since this pull request should be updated nearly as a rework, I would close it. Thanks for your reminder, @kl0u! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10275) StreamTask support object reuse
[ https://issues.apache.org/jira/browse/FLINK-10275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613544#comment-16613544 ] ASF GitHub Bot commented on FLINK-10275: kl0u commented on issue #6643: [FLINK-10275] StreamTask support object reuse URL: https://github.com/apache/flink/pull/6643#issuecomment-421019224 Hi @TisonKun, In the above discussion, there is consensus that until the related FLIP is agreed upon, there is not going to be any activity on this PR. Could you close this PR so that we keep a clean backlog of PR's? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > StreamTask support object reuse > --- > > Key: FLINK-10275 > URL: https://issues.apache.org/jira/browse/FLINK-10275 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.7.0 >Reporter: 陈梓立 >Assignee: 陈梓立 >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > StreamTask support efficient object reuse. The purpose behind this is to > reduce pressure on the garbage collector. > All objects are reused, without backup copies. The operators and UDFs must be > careful to not keep any objects as state or not to modify the objects. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10341) Add option to print flink command when running bin/flink
[ https://issues.apache.org/jira/browse/FLINK-10341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613543#comment-16613543 ] ASF GitHub Bot commented on FLINK-10341: TisonKun commented on issue #6690: [FLINK-10341][shell script] Add option to print flink command when running bin/flink URL: https://github.com/apache/flink/pull/6690#issuecomment-421019008 I am not a bash expert and find @dawidwys, you are right. To be accurate, it would be `bash -x COMMAND 2>&1 | grep exec` or something, otherwise it prints scare text. From my side it is a frequent demand to see what the real command executed so I would approve this change. We could achieve it even just by hacking this script locally. So the problem is whether we provide such feature within the project and, well, the print format and the approach by setenv not optimized so it looks like no more than just `bash -x` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add option to print flink command when running bin/flink > > > Key: FLINK-10341 > URL: https://issues.apache.org/jira/browse/FLINK-10341 > Project: Flink > Issue Type: Improvement > Components: Startup Shell Scripts >Reporter: Jeff Zhang >Assignee: Jeff Zhang >Priority: Major > Labels: pull-request-available > > It would be very useful if I can see the final java command which is used to > run flink program, specially when I hit very weird issue as flink > contributor. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] kl0u commented on issue #6643: [FLINK-10275] StreamTask support object reuse
kl0u commented on issue #6643: [FLINK-10275] StreamTask support object reuse URL: https://github.com/apache/flink/pull/6643#issuecomment-421019224 Hi @TisonKun, In the above discussion, there is consensus that until the related FLIP is agreed upon, there is not going to be any activity on this PR. Could you close this PR so that we keep a clean backlog of PR's? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] TisonKun commented on issue #6690: [FLINK-10341][shell script] Add option to print flink command when running bin/flink
TisonKun commented on issue #6690: [FLINK-10341][shell script] Add option to print flink command when running bin/flink URL: https://github.com/apache/flink/pull/6690#issuecomment-421019008 I am not a bash expert and find @dawidwys, you are right. To be accurate, it would be `bash -x COMMAND 2>&1 | grep exec` or something, otherwise it prints scary text. From my side it is a frequent demand to see what the real command executed is, so I would approve this change. We could achieve it even just by hacking this script locally. So the problem is whether we provide such a feature within the project and, well, the print format and the approach by setenv are not optimized, so it looks like no more than just `bash -x` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10327) Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction
[ https://issues.apache.org/jira/browse/FLINK-10327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613537#comment-16613537 ] ASF GitHub Bot commented on FLINK-10327: kl0u commented on issue #6687: [FLINK-10327][streaming] Expose processWatermarks notifications to (Co)ProcessFunction URL: https://github.com/apache/flink/pull/6687#issuecomment-421017411 Hi @pnowojski ! I can understand that this can be an interesting addition for some usecases, but it is a big one, and it should be discussed more thoroughly and, most importantly, more publicly. I would be against merging it as just a sub-commit of another feature. The reason is that this allows users to "play" with watermarks from the level of a `Function` and not `Operator`, which was, intentionally, the case so far. If you want to "hold back" the watermark, the this should be done by a watermark assigner. If you want to run a "callback" upon watermark, then so far the trick is to register a timer for `watermark + 1`. I can find usecases which do not fall into any of the above, but for those so far we implement custom operators. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction > --- > > Key: FLINK-10327 > URL: https://issues.apache.org/jira/browse/FLINK-10327 > Project: Flink > Issue Type: New Feature > Components: Streaming >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Labels: pull-request-available > > Currently {{CoProcessFunction}} can not react to changes watermark > advancement. By passing {{processWatermark}} calls to function we would give > a way to perform some actions on watermark advancement, like state clean up > or emitting some results after accumulating some data. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] kl0u commented on issue #6687: [FLINK-10327][streaming] Expose processWatermarks notifications to (Co)ProcessFunction
kl0u commented on issue #6687: [FLINK-10327][streaming] Expose processWatermarks notifications to (Co)ProcessFunction URL: https://github.com/apache/flink/pull/6687#issuecomment-421017411 Hi @pnowojski ! I can understand that this can be an interesting addition for some usecases, but it is a big one, and it should be discussed more thoroughly and, most importantly, more publicly. I would be against merging it as just a sub-commit of another feature. The reason is that this allows users to "play" with watermarks from the level of a `Function` and not `Operator`, which was, intentionally, the case so far. If you want to "hold back" the watermark, then this should be done by a watermark assigner. If you want to run a "callback" upon watermark, then so far the trick is to register a timer for `watermark + 1`. I can find usecases which do not fall into any of the above, but for those so far we implement custom operators. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10329) Fail with exception if job cannot be removed by ZooKeeperSubmittedJobGraphStore#removeJobGraph
[ https://issues.apache.org/jira/browse/FLINK-10329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613534#comment-16613534 ] ASF GitHub Bot commented on FLINK-10329: azagrebin commented on a change in pull request #6686: [FLINK-10329] [FLINK-10328] Fail ZooKeeperSubmittedJobGraphStore#removeJobGraph if job cannot be removed & Release all locks when stopping the ZooKeeperSubmittedJobGraphStore URL: https://github.com/apache/flink/pull/6686#discussion_r217392484 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ## @@ -768,27 +788,40 @@ private void jobMasterFailed(JobID jobId, Throwable cause) { */ @Override public void grantLeadership(final UUID newLeaderSessionID) { - log.info("Dispatcher {} was granted leadership with fencing token {}", getAddress(), newLeaderSessionID); + runAsyncWithoutFencing( + () -> { + log.info("Dispatcher {} was granted leadership with fencing token {}", getAddress(), newLeaderSessionID); - final CompletableFuture> recoveredJobsFuture = recoverJobs(); + final CompletableFuture> recoveredJobsFuture = recoveryOperation.thenComposeAsync( + ignored -> recoverJobs(), Review comment: `recoverJobs` could be now blocking if we use `recoveryOperation.thenApplyAsync` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Fail with exception if job cannot be removed by > ZooKeeperSubmittedJobGraphStore#removeJobGraph > -- > > Key: FLINK-10329 > URL: https://issues.apache.org/jira/browse/FLINK-10329 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.6.1, 1.7.0, 1.5.4 > > > Callers of {{ZooKeeperSubmittedJobGraph#removeJobGraph}} expect that we fail > with an exception if the {{JobGraph}} cannot be removed. This is not the case > since we call internally {{ZooKeeperStateHandleStore#releaseAndTryRemove}}. > If this method returns {{false}}, then we need to fail with an exception. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] azagrebin commented on a change in pull request #6686: [FLINK-10329] [FLINK-10328] Fail ZooKeeperSubmittedJobGraphStore#removeJobGraph if job cannot be removed & Release all locks when stoppi
azagrebin commented on a change in pull request #6686: [FLINK-10329] [FLINK-10328] Fail ZooKeeperSubmittedJobGraphStore#removeJobGraph if job cannot be removed & Release all locks when stopping the ZooKeeperSubmittedJobGraphStore URL: https://github.com/apache/flink/pull/6686#discussion_r217392484 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ## @@ -768,27 +788,40 @@ private void jobMasterFailed(JobID jobId, Throwable cause) { */ @Override public void grantLeadership(final UUID newLeaderSessionID) { - log.info("Dispatcher {} was granted leadership with fencing token {}", getAddress(), newLeaderSessionID); + runAsyncWithoutFencing( + () -> { + log.info("Dispatcher {} was granted leadership with fencing token {}", getAddress(), newLeaderSessionID); - final CompletableFuture> recoveredJobsFuture = recoverJobs(); + final CompletableFuture> recoveredJobsFuture = recoveryOperation.thenComposeAsync( + ignored -> recoverJobs(), Review comment: `recoverJobs` could be now blocking if we use `recoveryOperation.thenApplyAsync` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Assigned] (FLINK-10223) TaskManagers should log their ResourceID during startup
[ https://issues.apache.org/jira/browse/FLINK-10223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao reassigned FLINK-10223: Assignee: Gary Yao (was: aitozi) > TaskManagers should log their ResourceID during startup > --- > > Key: FLINK-10223 > URL: https://issues.apache.org/jira/browse/FLINK-10223 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.3, 1.6.1, 1.7.0 >Reporter: Konstantin Knauf >Assignee: Gary Yao >Priority: Major > Labels: pull-request-available > Fix For: 1.6.1, 1.7.0, 1.5.4 > > > To debug exceptions like "org.apache.flink.util.FlinkException: The assigned > slot was removed." in the master container it is often helpful to > know, which slot was provided by which Taskmanager. The only way to relate > slots to TaskManagers right now, seems to be to enable DEBUG logging for > `org.apache.flink.runtime.jobmaster.slotpool.SlotPool`. > This would be solved, if each Taskmanager would log out their `ResouceID` > during startup as the `SlotID` mainly consists of the `ResourceID` of the > providing Taskmanager. For Mesos and YARN the `ResourceID` has an intrinsic > meaning, but for a stand-alone or containerized setup the `ResourceID` is > just the a random ID. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8354) Flink Kafka connector ignores Kafka message headers
[ https://issues.apache.org/jira/browse/FLINK-8354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613528#comment-16613528 ] ASF GitHub Bot commented on FLINK-8354: --- alexeyt820 commented on issue #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers URL: https://github.com/apache/flink/pull/6615#issuecomment-421014612 @tzulitai, using ```Record``` wrapping Kafka ```ConsumerRecord``` allows to add for example timestamp from PR #6105 w/o need to change client code, so it looks like more extensible approach. Not sure how it makes hard to reuse some already existing deserialization formats such as ```AvroDeserializationSchema```, at least not harder then now - ```AvroDeserializationSchema``` will be wrapped via ```KeyedDeserializationSchemaWrapper``` in exactly same way as know. Also ```KeyedDeserializationSchemaWrapper``` calls only ```Record.value()```, so it doesn't ties deserialization of byte with access to other metadata, not in terms of execution path (in logical terms it is always tied because underlying level - Kafka ```ConsumerRecord``` contains key, value and metadata) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Flink Kafka connector ignores Kafka message headers > - > > Key: FLINK-8354 > URL: https://issues.apache.org/jira/browse/FLINK-8354 > Project: Flink > Issue Type: Bug > Components: Kafka Connector > Environment: Kafka 0.11.0.0 > Flink 1.4.0 > flink-connector-kafka-0.11_2.11 >Reporter: Mohammad Abareghi >Assignee: Aegeaner >Priority: Major > Labels: pull-request-available > > Kafka has introduced notion of Header for messages in version 0.11.0.0 > https://issues.apache.org/jira/browse/KAFKA-4208. 
> But flink-connector-kafka-0.11_2.11 which supports kafka 0.11.0.0 ignores > headers when consuming kafka messages. > It would be useful in some scenarios, such as distributed log tracing, to > support message headers to FlinkKafkaConsumer011 and FlinkKafkaProducer011. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] alexeyt820 commented on issue #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers
alexeyt820 commented on issue #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers URL: https://github.com/apache/flink/pull/6615#issuecomment-421014612 @tzulitai, using ```Record``` wrapping Kafka ```ConsumerRecord``` allows to add for example timestamp from PR #6105 w/o need to change client code, so it looks like a more extensible approach. Not sure how it makes it hard to reuse some already existing deserialization formats such as ```AvroDeserializationSchema```, at least not harder than now - ```AvroDeserializationSchema``` will be wrapped via ```KeyedDeserializationSchemaWrapper``` in exactly the same way as now. Also ```KeyedDeserializationSchemaWrapper``` calls only ```Record.value()```, so it doesn't tie deserialization of bytes with access to other metadata, not in terms of execution path (in logical terms it is always tied because the underlying level - Kafka ```ConsumerRecord``` - contains key, value and metadata) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10341) Add option to print flink command when running bin/flink
[ https://issues.apache.org/jira/browse/FLINK-10341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613521#comment-16613521 ] ASF GitHub Bot commented on FLINK-10341: dawidwys commented on issue #6690: [FLINK-10341][shell script] Add option to print flink command when running bin/flink URL: https://github.com/apache/flink/pull/6690#issuecomment-421012461 I am sorry, but I don't think this is a valuable addition and I doubt it will be merged. For debugging scripts you can always invoke the script with `-x` flag. E.g. like this: `bash -x flink run ...` and you will get the command as well. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add option to print flink command when running bin/flink > > > Key: FLINK-10341 > URL: https://issues.apache.org/jira/browse/FLINK-10341 > Project: Flink > Issue Type: Improvement > Components: Startup Shell Scripts >Reporter: Jeff Zhang >Assignee: Jeff Zhang >Priority: Major > Labels: pull-request-available > > It would be very useful if I can see the final java command which is used to > run flink program, specially when I hit very weird issue as flink > contributor. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] dawidwys commented on issue #6690: [FLINK-10341][shell script] Add option to print flink command when running bin/flink
dawidwys commented on issue #6690: [FLINK-10341][shell script] Add option to print flink command when running bin/flink URL: https://github.com/apache/flink/pull/6690#issuecomment-421012461 I am sorry, but I don't think this is a valuable addition and I doubt it will be merged. For debugging scripts you can always invoke the script with `-x` flag. E.g. like this: `bash -x flink run ...` and you will get the command as well. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-10331) Fix unnecessary flush requests to the network stack
[ https://issues.apache.org/jira/browse/FLINK-10331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-10331: Description: With the re-design of the record writer interaction with the result(sub)partitions, flush requests can currently pile up in these scenarios: - a previous flush request has not been completely handled yet and/or is still enqueued or - the network stack is still polling from this subpartition and doesn't need a new notification These lead to increased notifications in low latency settings (low output flusher intervals) which can be avoided. was: With the re-design of the record writer interaction with the result(sub)partitions, flush requests can currently pile up in these scenarios_ - a previous flush request has not been completely handled yet and/or is still enqueued or - the network stack is still polling from this subpartition and doesn't need a new notification These lead to increased notifications in low latency settings (low output flusher intervals) which can be avoided. > Fix unnecessary flush requests to the network stack > --- > > Key: FLINK-10331 > URL: https://issues.apache.org/jira/browse/FLINK-10331 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0, 1.7.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > > With the re-design of the record writer interaction with the > result(sub)partitions, flush requests can currently pile up in these > scenarios: > - a previous flush request has not been completely handled yet and/or is > still enqueued or > - the network stack is still polling from this subpartition and doesn't need > a new notification > These lead to increased notifications in low latency settings (low output > flusher intervals) which can be avoided. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] fhueske edited a comment on issue #6508: [Flink-10079] [table] Automatically register sink table from external catalogs
fhueske edited a comment on issue #6508: [Flink-10079] [table] Automatically register sink table from external catalogs URL: https://github.com/apache/flink/pull/6508#issuecomment-421010897 Please also rebase your PR on the current master. Thank you, Fabian This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] fhueske commented on issue #6508: [Flink-10079] [table] Automatically register sink table from external catalogs
fhueske commented on issue #6508: [Flink-10079] [table] Automatically register sink table from external catalogs URL: https://github.com/apache/flink/pull/6508#issuecomment-421010897 Please also rebase your PR on the current master. Thank you, Fabian This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Comment Edited] (FLINK-10184) HA Failover broken due to JobGraphs not being removed from Zookeeper on cancel
[ https://issues.apache.org/jira/browse/FLINK-10184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613513#comment-16613513 ] Till Rohrmann edited comment on FLINK-10184 at 9/13/18 1:40 PM: [~Jamalarm] yes, you would need to checkout my branch and then build Flink yourself. The binaries are then located in flink-dist/target/flink1.7-SNAPSHOT-bin/flink-1.7-SNAPSHOT. I don't think that you need to rebuild your job because the branch should not contain any API changes. I'm about to merge the PR so that there should be a snapshot build with the fix very soon (hopefully by tomorrow morning). Thanks a lot for your help [~wcummings]! was (Author: till.rohrmann): [~Jamalarm] yes, you would need to checkout my branch and then build Flink yourself. The binaries are then located in flink-dist/target/flink1.7-SNAPSHOT-bin/flink-1.7-SNAPSHOT. I don't think that you need to rebuild your job because the branch should not contain any API changes. I'm about to merge the PR so that there should be a snapshot build with the fix very soon (hopefully by tomorrow morning). > HA Failover broken due to JobGraphs not being removed from Zookeeper on cancel > -- > > Key: FLINK-10184 > URL: https://issues.apache.org/jira/browse/FLINK-10184 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.5.2, 1.6.0 >Reporter: Thomas Wozniakowski >Priority: Blocker > Fix For: 1.6.1, 1.7.0, 1.5.4 > > > We have encountered a blocking issue when upgrading our cluster to 1.5.2. > It appears that, when jobs are cancelled manually (in our case with a > savepoint), the JobGraphs are NOT removed from the Zookeeper {{jobgraphs}} > node. > This means that, if you start a job, cancel it, restart it, cancel it, etc. > You will end up with many job graphs stored in zookeeper, but none of the > corresponding blobs in the Flink HA directory. 
> When a HA failover occurs, the newly elected leader retrieves all of those > old JobGraph objects from Zookeeper, then goes looking for the corresponding > blobs in the HA directory. The blobs are not there so the JobManager explodes > and the process dies. > At this point the cluster has to be fully stopped, the zookeeper jobgraphs > cleared out by hand, and all the jobmanagers restarted. > I can see the following line in the JobManager logs: > {quote} > 2018-08-20 16:17:20,776 INFO > org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore - > Removed job graph 4e9a5a9d70ca99dbd394c35f8dfeda65 from ZooKeeper. > {quote} > But looking in Zookeeper the {{4e9a5a9d70ca99dbd394c35f8dfeda65}} job is > still very much there. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10184) HA Failover broken due to JobGraphs not being removed from Zookeeper on cancel
[ https://issues.apache.org/jira/browse/FLINK-10184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613513#comment-16613513 ] Till Rohrmann commented on FLINK-10184: --- [~Jamalarm] yes, you would need to checkout my branch and then build Flink yourself. The binaries are then located in flink-dist/target/flink1.7-SNAPSHOT-bin/flink-1.7-SNAPSHOT. I don't think that you need to rebuild your job because the branch should not contain any API changes. I'm about to merge the PR so that there should be a snapshot build with the fix very soon (hopefully by tomorrow morning). > HA Failover broken due to JobGraphs not being removed from Zookeeper on cancel > -- > > Key: FLINK-10184 > URL: https://issues.apache.org/jira/browse/FLINK-10184 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.5.2, 1.6.0 >Reporter: Thomas Wozniakowski >Priority: Blocker > Fix For: 1.6.1, 1.7.0, 1.5.4 > > > We have encountered a blocking issue when upgrading our cluster to 1.5.2. > It appears that, when jobs are cancelled manually (in our case with a > savepoint), the JobGraphs are NOT removed from the Zookeeper {{jobgraphs}} > node. > This means that, if you start a job, cancel it, restart it, cancel it, etc. > You will end up with many job graphs stored in zookeeper, but none of the > corresponding blobs in the Flink HA directory. > When a HA failover occurs, the newly elected leader retrieves all of those > old JobGraph objects from Zookeeper, then goes looking for the corresponding > blobs in the HA directory. The blobs are not there so the JobManager explodes > and the process dies. > At this point the cluster has to be fully stopped, the zookeeper jobgraphs > cleared out by hand, and all the jobmanagers restarted. 
> I can see the following line in the JobManager logs: > {quote} > 2018-08-20 16:17:20,776 INFO > org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore - > Removed job graph 4e9a5a9d70ca99dbd394c35f8dfeda65 from ZooKeeper. > {quote} > But looking in Zookeeper the {{4e9a5a9d70ca99dbd394c35f8dfeda65}} job is > still very much there. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] fhueske commented on a change in pull request #6508: [Flink-10079] [table] Automatically register sink table from external catalogs
fhueske commented on a change in pull request #6508: [Flink-10079] [table] Automatically register sink table from external catalogs URL: https://github.com/apache/flink/pull/6508#discussion_r217384955 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ## @@ -750,7 +751,28 @@ abstract class TableEnvironment(val config: TableConfig) { if (null == sinkTableName) throw TableException("Name of TableSink must not be null.") if (sinkTableName.isEmpty) throw TableException("Name of TableSink must not be empty.") if (!isRegistered(sinkTableName)) { - throw TableException(s"No table was registered under the name $sinkTableName.") + // try resolving and registering sink table from registered external catalogs + try { +val paths = sinkTableName.split("\\.") +if (paths.length > 1) { + var externalCatalog = getRegisteredExternalCatalog(paths(0)) + for (i <- 1 to (paths.length - 2)) { +externalCatalog = externalCatalog.getSubCatalog(paths(i)) + } + val externalTable = externalCatalog.getTable(paths(paths.length - 1)) + if (externalTable.isTableSink) { +registerTableSink(sinkTableName, Review comment: We should not register the sink table again in the root catalog. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10341) Add option to print flink command when running bin/flink
[ https://issues.apache.org/jira/browse/FLINK-10341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613504#comment-16613504 ] ASF GitHub Bot commented on FLINK-10341: TisonKun commented on issue #6690: [FLINK-10341][shell script] Add option to print flink command when running bin/flink URL: https://github.com/apache/flink/pull/6690#issuecomment-421008894 +1 for the proposal. It is a net win and there are many command-line interface supporting this feature. One thing remains would be how we display such message, I don't think `FLINK_COMMAND:...` is a beautiful format, also the env var `PRINT_FLINK_COMMAND` not very expressive. cc @zentol This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add option to print flink command when running bin/flink > > > Key: FLINK-10341 > URL: https://issues.apache.org/jira/browse/FLINK-10341 > Project: Flink > Issue Type: Improvement > Components: Startup Shell Scripts >Reporter: Jeff Zhang >Assignee: Jeff Zhang >Priority: Major > Labels: pull-request-available > > It would be very useful if I can see the final java command which is used to > run flink program, specially when I hit very weird issue as flink > contributor. -- This message was sent by Atlassian JIRA (v7.6.3#76005)