Re: [DISCUSS] Improve broadcast serialization

2018-07-19 Thread Zhijiang(wangzhijiang999)
Ok, that is fine. :)

I will create JIRA today and submit the PR next week.

Zhijiang
--
From: Piotr Nowojski 
Sent: Thursday, July 19, 2018 17:52
To: Zhijiang(wangzhijiang999) 
Cc: Nico Kruber ; dev 
Subject: Re: [DISCUSS] Improve broadcast serialization

Hi,

I only noticed your second response after sending my email :) 

Ok, now I think we are on the same page :) I think you can work on 2.1 and 
later on 2.2, if you find that 2.1 is not enough. Once you create the Jira 
issues/PRs, please CC me.

Piotrek

On 19 Jul 2018, at 04:51, Zhijiang(wangzhijiang999) wrote:
Hi Piotr

1. I agree that we should discuss the higher level first and focus on 
implementation in the jira/PR. As long as RecordSerializer does not maintain 
the BufferBuilder, it can become stateless, and then it can get a BufferBuilder 
from the RecordWriter at any time. I think this is the precondition for 
serializing only once for multiple channels; otherwise we have to select the 
serializer based on the target channel index.
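As a rough illustration of the stateless idea (a sketch only; the interface 
below is a simplified stand-in, not the real Flink RecordSerializer):

// Hypothetical sketch: the serializer only turns a record into bytes and
// holds no reference to any BufferBuilder, so one instance can serve all
// target channels, and the result can be appended to any BufferBuilder later.
interface StatelessRecordSerializer<T> {
    // Serialize into a reusable intermediate buffer and return a view over
    // the serialized bytes; no per-channel state is kept between calls.
    ByteBuffer serialize(T record) throws IOException;
}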

2. I already corrected this thought in my last reply; maybe you had not seen it 
before you replied. :)
We can break the broadcast improvement into two steps:
2.1 Serialize the record into a temporary byte buffer only once for multiple 
selected channels. (Currently we serialize many times.)
2.2 Copy the temporary byte buffer into a BufferBuilder only once and create 
different BufferConsumers based on the same BufferBuilder for each channel. 
(Currently we copy many times.)
Regarding 2.1, it is the same as your proposal [c]; I think it is worth doing 
now and brings good benefits.
Regarding 2.2, considering mixed broadcast/non-broadcast writes, we would have 
to flush/finish the last broadcast BufferBuilder before a non-broadcast write, 
and vice versa. I agree with your proposal [2] for this issue, and we can 
consider it further in the future; maybe there are better ways to avoid that 
flush.
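To make 2.2 concrete, a minimal sketch (requestBufferBuilder, addBufferConsumer 
and numberOfChannels are hypothetical stand-ins for the RecordWriter internals, 
and creating several BufferConsumers per BufferBuilder is exactly the change 
this step proposes):

// Sketch of step 2.2: the serialized bytes are copied into one shared
// BufferBuilder exactly once, and every channel gets its own BufferConsumer
// reading from the same underlying memory segment.
void broadcast(ByteBuffer serializedRecord) throws IOException {
    BufferBuilder shared = requestBufferBuilder();  // one builder for all channels
    shared.append(serializedRecord);                // single copy of the bytes
    for (int channel = 0; channel < numberOfChannels; channel++) {
        addBufferConsumer(shared.createBufferConsumer(), channel);
    }
}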

4. My previous thought was to realize both 2.1 and 2.2 above. 2.1 is your 
proposal [c], which has no problem in mixed write mode, so no additional flush 
is needed. 2.2 is your proposal [2], which involves the additional flush. Maybe 
my last reply made you misunderstand.

I can submit a jira for 2.1 first if there are no other concerns. Thanks for 
the helpful advice. :)

Best,

Zhijiang
--
From: Piotr Nowojski 
Sent: Wednesday, July 18, 2018 20:04
To: Zhijiang(wangzhijiang999) ; Nico Kruber 
Cc: dev 
Subject: Re: [DISCUSS] Improve broadcast serialization


Hi 

1. I want to define a new AbstractRecordWriter as a base class that defines 
some abstract methods and utility code. The current RecordWriter, used for the 
other partitioners, and the new BroadcastRecordWriter, used only for 
BroadcastPartitioner, will both extend AbstractRecordWriter. The fields in 
BroadcastRecordWriter are exactly as you showed below, but the current 
RecordWriter also only needs one RecordSerializer if we make the 
RecordSerializer stateless.

Let's first discuss what we would like to have/implement on a higher level and 
later focus on implementation details. Regarding making RecordSerializer 
stateless: there were some discussions about it previously and it was on our 
TODO list, but I don't remember what was holding us back. Maybe Nico will 
remember?


2. You pointed out the key problem of how to handle `randomEmit` in 
BroadcastRecordWriter; I think this process may reuse the `emit` logic in the 
current RecordWriter. Then the `emit` and `broadcastEmit` logic in 
BroadcastRecordWriter will serialize data only once and copy it to a 
BufferBuilder only once. So this improvement is deterministic for 
BroadcastPartitioner.


What logic to reuse do you have in mind? 
4. As for the `broadcastEmit` improvement in RecordWriter for non-broadcast 
partitioners, we can also do as you suggested in option [2], but it has to 
finish/flush the previous BufferBuilder generated by a common `emit` operation. 
That may hurt buffer utilization, which the event-driven flush feature improved 
considerably. So I am not sure whether the `broadcastEmit` improvement in 
RecordWriter is worth doing.


The whole point of my proposal [c] was to avoid the need to flush. The code 
would need a little bit more refactoring, but it should look something like this:

void broadcastEmit(record):
    serializedRecord = serializer.serialize(record)
    for bufferBuilder in bufferBuilders:
        bufferBuilder.append(serializedRecord)
        // if we overfilled bufferBuilder, finish it, request new one and continue writing

void emit(record, channel):
    serializedRecord = serializer.serialize(record)
    bufferBuilders[channel].append(serializedRecord)
    // if we overfilled bufferBuilder, finish it, request new one and continue writing

I do not see a need for additional flushes here, and it should be a strict 
improvement over the current code base.
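Spelled out in Java (a sketch under the stateless-serializer assumption; 
append/isFull/finish/requestNewBufferBuilder are stand-ins for the real 
BufferBuilder/RecordWriter internals), the overflow handling from the comments 
could look like:

// Sketch of proposal [c]: serialize once, then copy the bytes into each
// channel's current BufferBuilder, rolling over to a fresh builder whenever
// one fills up; no flushing of other channels is needed.
void broadcastEmit(T record) throws IOException {
    ByteBuffer serializedRecord = serializer.serialize(record);
    for (int channel = 0; channel < numberOfChannels; channel++) {
        copyToChannel(serializedRecord.duplicate(), channel);
    }
}

void copyToChannel(ByteBuffer serializedRecord, int channel) throws IOException {
    while (serializedRecord.hasRemaining()) {
        BufferBuilder bufferBuilder = bufferBuilders[channel];
        bufferBuilder.append(serializedRecord);  // copies as much as fits
        if (bufferBuilder.isFull()) {
            bufferBuilder.finish();              // seal the filled buffer
            bufferBuilders[channel] = requestNewBufferBuilder(channel);
        }
    }
}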


I have already implemented a demo covering 1, 2 and 5 above. I can create jiras 
after we reach a final agreement.

[jira] [Created] (FLINK-9901) Refactor InputStreamReader to Channels.newReader

2018-07-19 Thread zhangminglei (JIRA)
zhangminglei created FLINK-9901:
---

 Summary: Refactor InputStreamReader to Channels.newReader
 Key: FLINK-9901
 URL: https://issues.apache.org/jira/browse/FLINK-9901
 Project: Flink
  Issue Type: Sub-task
Reporter: zhangminglei








[jira] [Created] (FLINK-9900) Failed to testRestoreBehaviourWithFaultyStateHandles (org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase)

2018-07-19 Thread zhangminglei (JIRA)
zhangminglei created FLINK-9900:
---

 Summary: Failed to testRestoreBehaviourWithFaultyStateHandles (org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase)
 Key: FLINK-9900
 URL: https://issues.apache.org/jira/browse/FLINK-9900
 Project: Flink
  Issue Type: Bug
Reporter: zhangminglei


Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 124.598 sec <<< FAILURE! - in org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase
testRestoreBehaviourWithFaultyStateHandles(org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase)  Time elapsed: 120.036 sec  <<< ERROR!
org.junit.runners.model.TestTimedOutException: test timed out after 120000 milliseconds
	at sun.misc.Unsafe.park(Native Method)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
	at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
	at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
	at org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles(ZooKeeperHighAvailabilityITCase.java:244)


Results :

Tests in error: 
  ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles:244 » TestTimedOut

Tests run: 1453, Failures: 0, Errors: 1, Skipped: 29






[jira] [Created] (FLINK-9899) Add more metrics to the Kinesis source connector

2018-07-19 Thread Lakshmi Rao (JIRA)
Lakshmi Rao created FLINK-9899:
--

 Summary: Add more metrics to the Kinesis source connector
 Key: FLINK-9899
 URL: https://issues.apache.org/jira/browse/FLINK-9899
 Project: Flink
  Issue Type: Improvement
  Components: Kinesis Connector
Reporter: Lakshmi Rao


Currently only sparse metrics are available for the Kinesis connector. We should use the [ShardMetricsReporter|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java] to add more stats. For example:

- sleepTimeMillis 
- maxNumberOfRecordsPerFetch
- numberOfAggregatedRecordsPerFetch
- numberOfDeaggregatedRecordsPerFetch
- bytesPerFetch
- averageRecordSizeBytes
- runLoopTimeNanos
- loopFrequencyHz





[jira] [Created] (FLINK-9898) Prometheus metrics reporter doesn't respect `metrics.scope`

2018-07-19 Thread Prithvi Raj (JIRA)
Prithvi Raj created FLINK-9898:
--

 Summary: Prometheus metrics reporter doesn't respect `metrics.scope`
 Key: FLINK-9898
 URL: https://issues.apache.org/jira/browse/FLINK-9898
 Project: Flink
  Issue Type: Bug
  Components: Metrics
Affects Versions: 1.5.1, 1.4.2, 1.4.1, 1.5.0, 1.4.0
Reporter: Prithvi Raj


The Apache Flink [documentation|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/metrics.html#system-scope] explains that users may change the default scope of emitted metrics by using a scope format. 

Changing the scope format allows end users to store metrics with lower 
cardinality, at the cost of being unable to differentiate between metrics from 
different tasks/operators/etc. that share the same name. 
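For context, the scope format is configured in flink-conf.yaml, e.g. (the first 
line is the documented default task scope; the commented alternative is a 
hypothetical trimmed format that lowers cardinality):

metrics.scope.task: <host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
# trimmed, lower-cardinality alternative:
# metrics.scope.task: <job_name>.<task_name>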

With the Prometheus reporter, regardless of the scope format used, every 
variable is always emitted. 

Would it be reasonable for the Prometheus reporter to respect the scope format 
and only emit dimensions that are in scope?





[jira] [Created] (FLINK-9897) Further enhance adaptiveReads in Kinesis Connector to read more records in the case of long running loops

2018-07-19 Thread Lakshmi Rao (JIRA)
Lakshmi Rao created FLINK-9897:
--

 Summary: Further enhance adaptiveReads in Kinesis Connector to read more records in the case of long running loops
 Key: FLINK-9897
 URL: https://issues.apache.org/jira/browse/FLINK-9897
 Project: Flink
  Issue Type: Improvement
  Components: Kinesis Connector
Reporter: Lakshmi Rao


In FLINK-9692, we introduced the ability for the shardConsumer to adaptively 
read more records based on the current average record size, to make the most of 
the 2 MB/sec per-shard limit. The feature maximizes maxNumberOfRecordsPerFetch 
assuming 5 reads/sec (as prescribed by Kinesis limits). When an application 
takes more time to process records in the run loop, it is no longer able to 
read at a frequency of 5 reads/sec (even though its fetchIntervalMillis may be 
set to 200 ms). In such a scenario, maxNumberOfRecordsPerFetch should be 
calculated based on the time the run loop actually takes, as opposed to 
fetchIntervalMillis. 
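For illustration, the adjusted calculation could use the measured loop time 
roughly as follows (a sketch; the method and variable names are made up, and 
only the 2 MB/sec shard limit and the 10,000-record GetRecords cap are real 
Kinesis limits):

// Sketch: derive the per-fetch record cap from the measured run-loop time,
// so that slow loops request proportionally more records per fetch.
private long adaptMaxRecordsPerFetch(long runLoopTimeNanos, long averageRecordSizeBytes) {
    double loopSeconds = runLoopTimeNanos / 1_000_000_000.0;
    // spread the 2 MB/sec per-shard read limit over the actual loop duration
    long maxRecords = (long) (2 * 1024 * 1024 * loopSeconds / averageRecordSizeBytes);
    return Math.min(maxRecords, 10_000);  // GetRecords returns at most 10,000 records
}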





[jira] [Created] (FLINK-9896) Fix flink documentation error

2018-07-19 Thread Hequn Cheng (JIRA)
Hequn Cheng created FLINK-9896:
--

 Summary: Fix flink documentation error
 Key: FLINK-9896
 URL: https://issues.apache.org/jira/browse/FLINK-9896
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Reporter: Hequn Cheng
 Attachments: image-2018-07-19-23-19-32-259.png

The Flink version on master has been upgraded to 1.7-SNAPSHOT, but the 
documentation still points to 1.6.
 !image-2018-07-19-23-19-32-259.png! 





[jira] [Created] (FLINK-9895) Ensure correct logging settings for NettyLeakDetectionResource

2018-07-19 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-9895:
---

 Summary: Ensure correct logging settings for NettyLeakDetectionResource
 Key: FLINK-9895
 URL: https://issues.apache.org/jira/browse/FLINK-9895
 Project: Flink
  Issue Type: Improvement
  Components: Tests
Affects Versions: 1.6.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.6.0


The {{NettyLeakDetectionResource}} only works properly if ERROR logging is 
enabled for Netty's {{ResourceLeakDetector}}. We should add an assertion to the 
resource constructor to ensure this is actually set.
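A minimal sketch of such an assertion, assuming Netty's InternalLoggerFactory 
is the right place to check:

// Sketch: fail fast in the resource constructor if ERROR logging is disabled
// for Netty's ResourceLeakDetector, since leak reports would otherwise be lost.
if (!InternalLoggerFactory.getInstance(ResourceLeakDetector.class).isErrorEnabled()) {
    throw new IllegalStateException(
        "NettyLeakDetectionResource requires ERROR logging for ResourceLeakDetector");
}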





[jira] [Created] (FLINK-9894) Potential Data Race

2018-07-19 Thread JIRA
陈梓立 created FLINK-9894:
--

 Summary: Potential Data Race
 Key: FLINK-9894
 URL: https://issues.apache.org/jira/browse/FLINK-9894
 Project: Flink
  Issue Type: Improvement
  Components: Local Runtime
Affects Versions: 1.5.1
Reporter: 陈梓立
Assignee: 陈梓立


CoLocationGroup#ensureConstraints





[jira] [Created] (FLINK-9893) Cannot run Flink job in IDE when we have more than 1 taskslot

2018-07-19 Thread Aleksandr Filichkin (JIRA)
Aleksandr Filichkin created FLINK-9893:
--

 Summary: Cannot run Flink job in IDE when we have more than 1 taskslot
 Key: FLINK-9893
 URL: https://issues.apache.org/jira/browse/FLINK-9893
 Project: Flink
  Issue Type: Wish
  Components: Streaming
Affects Versions: 1.5.1
Reporter: Aleksandr Filichkin


The problem is that I cannot run the job in the IDE when it uses more than one 
task slot.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties kafkaProperties = new Properties();
        kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProperties.setProperty("group.id", "test");
        env.setParallelism(1);

        // source and sink are placed in different slot sharing groups,
        // so the job needs (at least) two task slots
        DataStream<String> kafkaSource = env
                .addSource(new FlinkKafkaConsumer010<>("flink-source", new SimpleStringSchema(), kafkaProperties))
                .name("Kafka-Source")
                .slotSharingGroup("Kafka-Source");

        kafkaSource.print().slotSharingGroup("Print");

        env.execute("Flink Streaming Java API Skeleton");
    }
}

I know that this job needs 2 slots and that I could have two taskmanagers in a 
Flink cluster, but how can I run it locally in the IDE?

Currently I have to specify the same slotSharingGroup name for all operators 
locally so that everything fits in one slot, but that is not flexible.

How do you handle it?
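For reference, one possible workaround (a sketch only, assuming the Flink 1.5 
local-environment API) is to give the in-IDE mini cluster more than one slot 
explicitly:

// Sketch: configure the local environment so its single TaskManager
// offers two slots, one per slot sharing group used by the job.
Configuration conf = new Configuration();
conf.setInteger("taskmanager.numberOfTaskSlots", 2);
final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironment(1, conf);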





[jira] [Created] (FLINK-9892) Disable local recovery in Jepsen tests

2018-07-19 Thread Gary Yao (JIRA)
Gary Yao created FLINK-9892:
---

 Summary: Disable local recovery in Jepsen tests
 Key: FLINK-9892
 URL: https://issues.apache.org/jira/browse/FLINK-9892
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.6.0
Reporter: Gary Yao
 Fix For: 1.6.0


Due to FLINK-9635, local recovery should be disabled.


