Re: [DISCUSS] Improve broadcast serialization
Ok, that is fine. :) I will create the JIRA today and submit the PR next week.

Zhijiang

------------------------------------------------------------------
From: Piotr Nowojski
Sent: Thursday, July 19, 2018 17:52
To: Zhijiang(wangzhijiang999)
Cc: Nico Kruber ; dev
Subject: Re: [DISCUSS] Improve broadcast serialization

Hi,

I only noticed your second response after sending my email :) Ok, now I think we are on the same page :) I think you can work on 2.1 and later on 2.2 if you decide that 2.1 is not enough. Once you create the Jira issues/PRs please CC me.

Piotrek

On 19 Jul 2018, at 04:51, Zhijiang(wangzhijiang999) wrote:

Hi Piotr,

1. I agree that we should discuss the higher level first and focus on the implementation in the jira/pr. As long as RecordSerializer does not maintain the BufferBuilder, it can become stateless and can then get the BufferBuilder from the RecordWriter at any time. I think this is the precondition for serializing only once for multiple channels; otherwise we have to select the serializer based on the target channel index.

2. I already corrected this thought in my last reply; maybe you had not seen it before you replied. :) We can break the broadcast improvement into two steps:

2.1 Serialize the record into a temporary byte buffer only once for the selected channels. (currently it is serialized many times)
2.2 Copy the temporary byte buffer into a BufferBuilder only once and create different BufferConsumers based on the same BufferBuilder for each channel. (currently it is copied many times)

Regarding 2.1, which is the same as your proposal [c], I think it is worth doing now and can bring good benefits.

Regarding 2.2, considering mixed broadcast/non-broadcast writes, we would have to flush/finish the last broadcast BufferBuilder before the current non-broadcast write, and vice versa. I agree with your proposal [2] for this issue, and we can consider it further in the future; maybe there are better ways of avoiding it.

4. My previous thought was to realize both 2.1 and 2.2 above.
2.1 is your proposal [c], which has no problem with the mixed write mode, so no additional flush is needed. 2.2 is your proposal [2], which involves the additional flush. Maybe my last reply caused the misunderstanding. I can submit the jira for 2.1 first if there are no other concerns. Thanks for the helpful advice. :)

Best,
Zhijiang

------------------------------------------------------------------
From: Piotr Nowojski
Sent: Wednesday, July 18, 2018 20:04
To: Zhijiang(wangzhijiang999) ; Nico Kruber
Cc: dev
Subject: Re: [DISCUSS] Improve broadcast serialization

Hi,

1. I want to define a new AbstractRecordWriter as a base class which defines some abstract methods and utility code. The current RecordWriter, used for the other partitioners, and a new BroadcastRecordWriter, used only for BroadcastPartitioner, will both extend AbstractRecordWriter. The fields in BroadcastRecordWriter are exactly as you showed below, but the current RecordWriter also only needs one RecordSerializer if we make the RecordSerializer stateless.

Let's first discuss what we would like to have/implement on a higher level and later focus on the implementation details. Regarding making RecordSerializer stateless, there were some discussions about it previously and it was on our TODO list, but I don't remember what was holding us back. Maybe Nico will remember?

2. You pointed out the key problem of how to handle `randomEmit` in BroadcastRecordWriter, and I think this process may reuse the `emit` logic in the current RecordWriter. Then the `emit` and `broadcastEmit` logic in BroadcastRecordWriter will serialize data only once and copy it to the BufferBuilder only once. So this improvement is deterministic for BroadcastPartitioner.

What logic to reuse do you have in mind?

4. As for the `broadcastEmit` improvement in RecordWriter for non-broadcast partitioners, we can also do as you suggested in option [2], but it has to finish/flush the previous BufferBuilder generated by a common `emit` operation. So it may hurt the buffer utilization that was improved by the event-driven flush feature.
So I am not sure whether the `broadcastEmit` improvement in RecordWriter is worth doing.

The whole point of my proposal [c] was to avoid the need to flush. The code would need a little bit more refactoring, but it should look something like this:

    void broadcastEmit(record):
        serializedRecord = serializer.serialize(record)
        for bufferBuilder in bufferBuilders:
            bufferBuilder.append(serializedRecord)
            // if we overfilled bufferBuilder, finish it, request a new one and continue writing

    void emit(record, channel):
        serializedRecord = serializer.serialize(record)
        bufferBuilders[channel].append(serializedRecord)
        // if we overfilled bufferBuilder, finish it, request a new one and continue writing

I do not see a need for additional flushes here, and it should be a strict improvement over the current code base.

I already realized the demo covering above 1,2,5 before. I can create jiras after we reach a final
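The serialize-once idea in the pseudocode above can be sketched in plain Java. This is a minimal, self-contained illustration: SimpleBufferBuilder is a hypothetical stand-in for Flink's BufferBuilder, and the counters only exist to make the behavior observable; it is not the actual RecordWriter implementation.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Sketch of proposal [c]: serialize the record ONCE, then append the same
// serialized bytes to every channel's builder, finishing full builders and
// requesting fresh ones along the way. Names are illustrative, not Flink's.
class BroadcastSketch {

    // Stand-in for Flink's BufferBuilder: a fixed-capacity byte buffer.
    static class SimpleBufferBuilder {
        final ByteBuffer buffer;
        SimpleBufferBuilder(int capacity) { buffer = ByteBuffer.allocate(capacity); }
        boolean hasRoomFor(int length) { return buffer.remaining() >= length; }
        void append(byte[] bytes) { buffer.put(bytes); }
    }

    final int capacity;
    final List<SimpleBufferBuilder> builders = new ArrayList<>();
    int finishedBuffers = 0;   // builders handed downstream because they filled up
    int serializations = 0;    // how often we actually serialized a record

    BroadcastSketch(int numChannels, int capacity) {
        this.capacity = capacity;
        for (int i = 0; i < numChannels; i++) {
            builders.add(new SimpleBufferBuilder(capacity));
        }
    }

    // Serialize once, append to each channel's builder; on overflow, finish
    // the builder and take a fresh one, then continue writing. (For brevity
    // this sketch assumes a record always fits into an empty builder.)
    void broadcastEmit(String record) {
        byte[] serialized = record.getBytes(StandardCharsets.UTF_8); // serialize ONCE
        serializations++;
        for (int channel = 0; channel < builders.size(); channel++) {
            if (!builders.get(channel).hasRoomFor(serialized.length)) {
                finishedBuffers++;
                builders.set(channel, new SimpleBufferBuilder(capacity));
            }
            builders.get(channel).append(serialized);
        }
    }
}
```

The point of the sketch is that no extra flush is needed: a builder is only finished when it genuinely fills up, exactly as in the pseudocode.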
[jira] [Created] (FLINK-9901) Refactor InputStreamReader to Channels.newReader
zhangminglei created FLINK-9901:
---

Summary: Refactor InputStreamReader to Channels.newReader
Key: FLINK-9901
URL: https://issues.apache.org/jira/browse/FLINK-9901
Project: Flink
Issue Type: Sub-task
Reporter: zhangminglei

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9900) Failed to testRestoreBehaviourWithFaultyStateHandles (org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase)
zhangminglei created FLINK-9900:
---

Summary: Failed to testRestoreBehaviourWithFaultyStateHandles (org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase)
Key: FLINK-9900
URL: https://issues.apache.org/jira/browse/FLINK-9900
Project: Flink
Issue Type: Bug
Reporter: zhangminglei

Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 124.598 sec <<< FAILURE! - in org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase
testRestoreBehaviourWithFaultyStateHandles(org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase) Time elapsed: 120.036 sec <<< ERROR!
org.junit.runners.model.TestTimedOutException: test timed out after 12 milliseconds
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
    at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
    at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles(ZooKeeperHighAvailabilityITCase.java:244)

Results :

Tests in error:
  ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles:244 » TestTimedOut

Tests run: 1453, Failures: 0, Errors: 1, Skipped: 29
[jira] [Created] (FLINK-9899) Add more metrics to the Kinesis source connector
Lakshmi Rao created FLINK-9899:
--

Summary: Add more metrics to the Kinesis source connector
Key: FLINK-9899
URL: https://issues.apache.org/jira/browse/FLINK-9899
Project: Flink
Issue Type: Improvement
Components: Kinesis Connector
Reporter: Lakshmi Rao

Currently there are sparse metrics available for the Kinesis Connector. Using the [ShardMetricsReporter|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java], add more stats. For example:
- sleepTimeMillis
- maxNumberOfRecordsPerFetch
- numberOfAggregatedRecordsPerFetch
- numberOfDeaggregatedRecordsPerFetch
- bytesPerFetch
- averageRecordSizeBytes
- runLoopTimeNanos
- loopFrequencyHz
[jira] [Created] (FLINK-9898) Prometheus metrics reporter doesn't respect `metrics.scope`
Prithvi Raj created FLINK-9898:
--

Summary: Prometheus metrics reporter doesn't respect `metrics.scope`
Key: FLINK-9898
URL: https://issues.apache.org/jira/browse/FLINK-9898
Project: Flink
Issue Type: Bug
Components: Metrics
Affects Versions: 1.5.1, 1.4.2, 1.4.1, 1.5.0, 1.4.0
Reporter: Prithvi Raj

The Apache Flink [documentation|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/metrics.html#system-scope] explains that users may change the default scope of emitted metrics by using a scope format. Changing the scope format allows end users to store metrics with lower cardinality, at the cost of being unable to differentiate between metrics from different tasks/operators/etc. that share the same name.

With the Prometheus reporter, regardless of the scope format used, every variable is always emitted. Would it be reasonable for the Prometheus reporter to respect the scope format and only emit dimensions that are in scope?
[jira] [Created] (FLINK-9897) Further enhance adaptiveReads in Kinesis Connector to read more records in the case of long running loops
Lakshmi Rao created FLINK-9897:
--

Summary: Further enhance adaptiveReads in Kinesis Connector to read more records in the case of long running loops
Key: FLINK-9897
URL: https://issues.apache.org/jira/browse/FLINK-9897
Project: Flink
Issue Type: Improvement
Components: Kinesis Connector
Reporter: Lakshmi Rao

In FLINK-9692, we introduced the ability for the shardConsumer to adaptively read more records based on the current average record size, to make the most of the 2 MB/sec shard limit. The feature maximizes maxNumberOfRecordsPerFetch assuming 5 reads/sec (as prescribed by Kinesis limits). In the case where applications take more time to process records in the run loop, they are no longer able to read at a frequency of 5 reads/sec (even though their fetchIntervalMillis may be set to 200 ms). In such a scenario, maxNumberOfRecordsPerFetch should be calculated based on the time the run loop actually takes, as opposed to fetchIntervalMillis.
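The proposed calculation can be sketched as below. The 2 MB/sec and 10,000-record constants reflect the Kinesis per-shard limits mentioned in the issue; the class and method names are hypothetical illustrations, not the connector's actual code.

```java
// Sketch of adaptive fetch sizing based on measured run-loop time instead of
// the configured fetchIntervalMillis. Hypothetical names; not Flink code.
class AdaptiveFetchSketch {

    static final long SHARD_READ_BYTES_PER_SEC = 2 * 1024 * 1024; // Kinesis 2 MB/sec shard limit
    static final int MAX_RECORDS_PER_GET = 10_000;                // Kinesis GetRecords record cap

    // If the run loop takes runLoopTimeMillis between fetches, the shard's
    // read budget for that window is 2 MB/sec * runLoopTime; dividing by the
    // average record size gives how many records the next fetch can afford.
    static int maxRecordsPerFetch(long runLoopTimeMillis, double averageRecordSizeBytes) {
        double budgetBytes = SHARD_READ_BYTES_PER_SEC * (runLoopTimeMillis / 1000.0);
        int records = (int) (budgetBytes / averageRecordSizeBytes);
        return Math.min(Math.max(records, 1), MAX_RECORDS_PER_GET);
    }
}
```

With fetchIntervalMillis-based sizing, a slow loop keeps requesting the small batch sized for a 200 ms interval; sizing from the actual loop time lets each fetch use the throughput the loop left unused.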
[jira] [Created] (FLINK-9896) Fix flink documentation error
Hequn Cheng created FLINK-9896:
--

Summary: Fix flink documentation error
Key: FLINK-9896
URL: https://issues.apache.org/jira/browse/FLINK-9896
Project: Flink
Issue Type: Bug
Components: Documentation
Reporter: Hequn Cheng
Attachments: image-2018-07-19-23-19-32-259.png

The Flink version on master has been upgraded to the 1.7 snapshot, but the documentation still points to 1.6.

!image-2018-07-19-23-19-32-259.png!
[jira] [Created] (FLINK-9895) Ensure correct logging settings for NettyLeakDetectionResource
Chesnay Schepler created FLINK-9895:
---

Summary: Ensure correct logging settings for NettyLeakDetectionResource
Key: FLINK-9895
URL: https://issues.apache.org/jira/browse/FLINK-9895
Project: Flink
Issue Type: Improvement
Components: Tests
Affects Versions: 1.6.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
Fix For: 1.6.0

The {{NettyLeakDetectionResource}} only works properly if ERROR logging is enabled for Netty's {{ResourceLeakDetector}}. We should add an assertion to the resource constructor to ensure this is actually set.
[jira] [Created] (FLINK-9894) Potential Data Race
陈梓立 created FLINK-9894:
--

Summary: Potential Data Race
Key: FLINK-9894
URL: https://issues.apache.org/jira/browse/FLINK-9894
Project: Flink
Issue Type: Improvement
Components: Local Runtime
Affects Versions: 1.5.1
Reporter: 陈梓立
Assignee: 陈梓立

CoLocationGroup#ensureConstraints
[jira] [Created] (FLINK-9893) Cannot run Flink job in IDE when we have more than 1 taskslot
Aleksandr Filichkin created FLINK-9893:
--

Summary: Cannot run Flink job in IDE when we have more than 1 taskslot
Key: FLINK-9893
URL: https://issues.apache.org/jira/browse/FLINK-9893
Project: Flink
Issue Type: Wish
Components: Streaming
Affects Versions: 1.5.1
Reporter: Aleksandr Filichkin

The problem is that I cannot run the job in the IDE when it needs more than 1 task slot.

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties kafkaProperties = new Properties();
        kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProperties.setProperty("group.id", "test");
        env.setParallelism(1);

        DataStream<String> kafkaSource = env
            .addSource(new FlinkKafkaConsumer010<>("flink-source", new SimpleStringSchema(), kafkaProperties))
            .name("Kafka-Source")
            .slotSharingGroup("Kafka-Source");

        kafkaSource.print().slotSharingGroup("Print");

        env.execute("Flink Streaming Java API Skeleton");
    }
}

I know this job needs 2 slots, and I can have two taskmanagers in a Flink cluster, but how can I run it locally in the IDE? Currently I have to specify the same slotSharingGroup name for all operators locally to end up with one slot, but that is not flexible. How do you handle it?
[jira] [Created] (FLINK-9892) Disable local recovery in Jepsen tests
Gary Yao created FLINK-9892:
---

Summary: Disable local recovery in Jepsen tests
Key: FLINK-9892
URL: https://issues.apache.org/jira/browse/FLINK-9892
Project: Flink
Issue Type: Bug
Components: Tests
Affects Versions: 1.6.0
Reporter: Gary Yao
Fix For: 1.6.0

Due to FLINK-9635, local recovery should be disabled.