[GitHub] [flink] JingsongLi commented on a change in pull request #7958: [FLINK-11881][table-planner-blink] Introduce code generated typed sort to blink table

2019-03-11 Thread GitBox
JingsongLi commented on a change in pull request #7958: 
[FLINK-11881][table-planner-blink] Introduce code generated typed sort to blink 
table
URL: https://github.com/apache/flink/pull/7958#discussion_r264530610
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/CompileUtils.java
 ##
 @@ -77,6 +77,7 @@
try {
compiler.cook(code);
} catch (Throwable t) {
+   System.out.println(addLineNumber(code));
 
 Review comment:
   The reason for not using LOG is that System.out.println is cleaner (no prefix).


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #7958: [FLINK-11881][table-planner-blink] Introduce code generated typed sort to blink table

2019-03-11 Thread GitBox
JingsongLi commented on a change in pull request #7958: 
[FLINK-11881][table-planner-blink] Introduce code generated typed sort to blink 
table
URL: https://github.com/apache/flink/pull/7958#discussion_r264530025
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/AbstractBinaryWriter.java
 ##
 @@ -141,6 +141,16 @@ public void writeRow(int pos, BaseRow input, 
BaseRowSerializer serializer) {
}
}
 
+   @Override
+   public void writeBinary(int pos, byte[] bytes) {
+   int len = bytes.length;
+   if (len <= 7) {
 
 Review comment:
   OK


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #7958: [FLINK-11881][table-planner-blink] Introduce code generated typed sort to blink table

2019-03-11 Thread GitBox
JingsongLi commented on a change in pull request #7958: 
[FLINK-11881][table-planner-blink] Introduce code generated typed sort to blink 
table
URL: https://github.com/apache/flink/pull/7958#discussion_r264529901
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryGeneric.java
 ##
 @@ -79,4 +79,16 @@ public void materialize() {
int offset = (int) (offsetAndSize >> 32);
return new BinaryGeneric<>(segments, offset + baseOffset, size, 
null);
}
+
+   public static <T> T getJavaObjectFromBinaryGeneric(BinaryGeneric<T> value, TypeSerializer<T> ser) {
+   if (value.getJavaObject() == null) {
+   try {
+   value.setJavaObject(InstantiationUtil.deserializeFromByteArray(ser,
+   SegmentsUtil.copyToBytes(value.getSegments(), value.getOffset(), value.getSizeInBytes())));
+   } catch (IOException e) {
+   throw new RuntimeException(e);
 
 Review comment:
   To avoid forcing external code paths to catch IOException.
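   
   For illustration, a minimal sketch of that pattern (reusing the names from the 
diff above; the surrounding method is assumed):
   
   try {
   return InstantiationUtil.deserializeFromByteArray(ser,
   SegmentsUtil.copyToBytes(value.getSegments(), value.getOffset(), value.getSizeInBytes()));
   } catch (IOException e) {
   // wrap the checked IOException so call sites (often generated code)
   // need neither declare nor catch it
   throw new RuntimeException(e);
   }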


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #7958: [FLINK-11881][table-planner-blink] Introduce code generated typed sort to blink table

2019-03-11 Thread GitBox
JingsongLi commented on a change in pull request #7958: 
[FLINK-11881][table-planner-blink] Introduce code generated typed sort to blink 
table
URL: https://github.com/apache/flink/pull/7958#discussion_r264529475
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java
 ##
 @@ -256,6 +256,14 @@ public Decimal getDecimal(int pos, int precision, int 
scale) {
return 
BinaryGeneric.readBinaryGenericFieldFromSegments(segments, offset, 
getLong(pos));
}
 
+   @Override
+   public byte[] getBinary(int pos) {
+   assertIndexIsValid(pos);
+   int fieldOffset = getFieldOffset(pos);
+   final long offsetAndLen = segments[0].getLong(fieldOffset);
 
 Review comment:
   It will not be empty unless there is a bug.
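   
   For context, a hedged sketch of how such a packed long is typically decoded in 
this format (the shift pattern is taken from the materialize() excerpt quoted 
above; variable names are illustrative, not the exact PR code):
   
   final long offsetAndLen = segments[0].getLong(fieldOffset);
   final int subOffset = (int) (offsetAndLen >> 32); // upper 32 bits: offset
   final int len = (int) offsetAndLen;               // lower 32 bits: length
   byte[] bytes = SegmentsUtil.copyToBytes(segments, offset + subOffset, len);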


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #7958: [FLINK-11881][table-planner-blink] Introduce code generated typed sort to blink table

2019-03-11 Thread GitBox
JingsongLi commented on a change in pull request #7958: 
[FLINK-11881][table-planner-blink] Introduce code generated typed sort to blink 
table
URL: https://github.com/apache/flink/pull/7958#discussion_r264528994
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedClass.java
 ##
 @@ -49,7 +49,9 @@ protected GeneratedClass(String className, String code, 
Object[] references) {
@SuppressWarnings("unchecked")
public T newInstance(ClassLoader classLoader) {
try {
-   return (T) compile(classLoader).getConstructor(Object[].class).newInstance(references);
+   return (T) compile(classLoader).getConstructor(Object[].class)
+   // newInstance(Object ... initargs)
 
 Review comment:
   Because Constructor.newInstance(Object... initargs) takes varargs, we need to wrap the 
references into a new Object[]; otherwise it cannot be compiled.
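   
   A short sketch of the varargs behavior being described (variable names are 
illustrative only):
   
   Constructor<?> ctor = compiledClass.getConstructor(Object[].class);
   // passing references directly would spread the array as individual
   // varargs arguments:
   // ctor.newInstance(references);
   // wrapping it makes the whole array the single constructor argument:
   Object instance = ctor.newInstance(new Object[] {references});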


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #7958: [FLINK-11881][table-planner-blink] Introduce code generated typed sort to blink table

2019-03-11 Thread GitBox
JingsongLi commented on a change in pull request #7958: 
[FLINK-11881][table-planner-blink] Introduce code generated typed sort to blink 
table
URL: https://github.com/apache/flink/pull/7958#discussion_r264528357
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/CompileUtils.java
 ##
 @@ -87,4 +88,13 @@
throw new RuntimeException("Can not load class " + 
name, e);
}
}
+
+   private static String addLineNumber(String code) {
 
 Review comment:
   To output more information when an error occurs.
   Generally, when cook() fails, the error message shows which line is wrong. That 
line number starts at 1.
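   
   A minimal sketch of such a helper (only the loop shape comes from this PR; the 
prefix format is an assumption), producing 1-based numbers that line up with the 
line numbers Janino reports when cook() fails:
   
   private static String addLineNumber(String code) {
   String[] lines = code.split("\n");
   StringBuilder builder = new StringBuilder();
   for (int i = 0; i < lines.length; i++) {
   // 1-based, matching compiler error messages
   builder.append("/* ").append(i + 1).append(" */ ").append(lines[i]).append("\n");
   }
   return builder.toString();
   }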


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #7958: [FLINK-11881][table-planner-blink] Introduce code generated typed sort to blink table

2019-03-11 Thread GitBox
klion26 commented on a change in pull request #7958: 
[FLINK-11881][table-planner-blink] Introduce code generated typed sort to blink 
table
URL: https://github.com/apache/flink/pull/7958#discussion_r264520796
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/CompileUtils.java
 ##
 @@ -87,4 +88,13 @@
throw new RuntimeException("Can not load class " + 
name, e);
}
}
+
+   private static String addLineNumber(String code) {
+   String[] lines = code.split("\n");
+   StringBuilder builder = new StringBuilder();
+   for (int i = 0; i < lines.length; i++) {
 
 Review comment:
   do we need to start from 1 instead of 0?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #7958: [FLINK-11881][table-planner-blink] Introduce code generated typed sort to blink table

2019-03-11 Thread GitBox
klion26 commented on a change in pull request #7958: 
[FLINK-11881][table-planner-blink] Introduce code generated typed sort to blink 
table
URL: https://github.com/apache/flink/pull/7958#discussion_r264519742
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryGeneric.java
 ##
 @@ -79,4 +79,16 @@ public void materialize() {
int offset = (int) (offsetAndSize >> 32);
return new BinaryGeneric<>(segments, offset + baseOffset, size, 
null);
}
+
+   public static <T> T getJavaObjectFromBinaryGeneric(BinaryGeneric<T> value, TypeSerializer<T> ser) {
+   if (value.getJavaObject() == null) {
+   try {
+   value.setJavaObject(InstantiationUtil.deserializeFromByteArray(ser,
+   SegmentsUtil.copyToBytes(value.getSegments(), value.getOffset(), value.getSizeInBytes())));
+   } catch (IOException e) {
+   throw new RuntimeException(e);
 
 Review comment:
   Why not throw IOException directly?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11848) Delete outdated kafka topics caused UNKNOWN_TOPIC_EXCEPTION

2019-03-11 Thread Shengnan YU (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16790236#comment-16790236
 ] 

Shengnan YU commented on FLINK-11848:
-

It seems to be a Kafka client problem. The Kafka consumer doesn't refresh 
metadata when the topic is deleted.
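
As an illustration (not a confirmed fix): the plain Kafka client refreshes 
metadata at most every metadata.max.age.ms (default 5 minutes), so one hedged 
workaround is to lower that interval in the client properties:

{code:java}
// illustrative value; shortens how long deleted topics linger in cached metadata
props.put("metadata.max.age.ms", "30000");
{code}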

> Delete outdated kafka topics caused UNKNOWN_TOPIC_EXCEPTION
> 
>
> Key: FLINK-11848
> URL: https://issues.apache.org/jira/browse/FLINK-11848
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.6.4
>Reporter: Shengnan YU
>Assignee: frank wang
>Priority: Major
>
> Recently we have been doing some streaming jobs with Apache Flink. There are 
> multiple KAFKA topics with a format like xx_yy-mm-dd. We used a topic regex 
> pattern to let a consumer consume those topics. However, if we delete some 
> older topics, it seems that the metadata in the consumer does not update 
> properly, so it still remembers those outdated topics in its topic list, which 
> leads to *UNKNOWN_TOPIC_EXCEPTION*. We must restart the consumer job to 
> recover. It seems to occur in the producer as well. Any idea to solve this 
> problem? Thank you very much!
>  
> Example to reproduce the problem:
> There are multiple kafka topics, for instance 
> "test20190310", "test20190311" and "test20190312". I run the job and 
> everything is ok. Then if I delete topic "test20190310", the consumer does 
> not perceive that the topic is deleted and keeps fetching metadata for it. 
> Unknown-topic errors appear in the taskmanager's log. 
> {code:java}
> public static void main(String []args) throws Exception {
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> Properties props = new Properties();
> props.put("bootstrap.servers", "localhost:9092\n");
> props.put("group.id", "test10");
> props.put("enable.auto.commit", "true");
> props.put("auto.commit.interval.ms", "1000");
> props.put("auto.offset.rest", "earliest");
> props.put("key.deserializer", 
> "org.apache.kafka.common.serialization.StringDeserializer");
> props.put("value.deserializer", 
> "org.apache.kafka.common.serialization.StringDeserializer");
> 
> props.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
>"120");
> Pattern topics = Pattern.compile("^test.*$");
> FlinkKafkaConsumer011<String> consumer = new 
> FlinkKafkaConsumer011<>(topics, new SimpleStringSchema(), props);
> DataStream<String> stream = env.addSource(consumer);
> stream.writeToSocket("localhost", 4, new SimpleStringSchema());
> env.execute("test");
> }
> }
> {code}
>   
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] klion26 commented on a change in pull request #7958: [FLINK-11881][table-planner-blink] Introduce code generated typed sort to blink table

2019-03-11 Thread GitBox
klion26 commented on a change in pull request #7958: 
[FLINK-11881][table-planner-blink] Introduce code generated typed sort to blink 
table
URL: https://github.com/apache/flink/pull/7958#discussion_r264522628
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/CompileUtils.java
 ##
 @@ -87,4 +88,13 @@
throw new RuntimeException("Can not load class " + 
name, e);
}
}
+
+   private static String addLineNumber(String code) {
 
 Review comment:
   this function is only called at line 80; is this a function for debugging?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #7958: [FLINK-11881][table-planner-blink] Introduce code generated typed sort to blink table

2019-03-11 Thread GitBox
klion26 commented on a change in pull request #7958: 
[FLINK-11881][table-planner-blink] Introduce code generated typed sort to blink 
table
URL: https://github.com/apache/flink/pull/7958#discussion_r264520929
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedClass.java
 ##
 @@ -49,7 +49,9 @@ protected GeneratedClass(String className, String code, 
Object[] references) {
@SuppressWarnings("unchecked")
public T newInstance(ClassLoader classLoader) {
try {
-   return (T) compile(classLoader).getConstructor(Object[].class).newInstance(references);
+   return (T) compile(classLoader).getConstructor(Object[].class)
+   // newInstance(Object ... initargs)
 
 Review comment:
   do we need this comment here?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #7958: [FLINK-11881][table-planner-blink] Introduce code generated typed sort to blink table

2019-03-11 Thread GitBox
klion26 commented on a change in pull request #7958: 
[FLINK-11881][table-planner-blink] Introduce code generated typed sort to blink 
table
URL: https://github.com/apache/flink/pull/7958#discussion_r264520383
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java
 ##
 @@ -256,6 +256,14 @@ public Decimal getDecimal(int pos, int precision, int 
scale) {
return 
BinaryGeneric.readBinaryGenericFieldFromSegments(segments, offset, 
getLong(pos));
}
 
+   @Override
+   public byte[] getBinary(int pos) {
+   assertIndexIsValid(pos);
+   int fieldOffset = getFieldOffset(pos);
+   final long offsetAndLen = segments[0].getLong(fieldOffset);
 
 Review comment:
   Do we need to check `segments` is empty or not?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #7958: [FLINK-11881][table-planner-blink] Introduce code generated typed sort to blink table

2019-03-11 Thread GitBox
klion26 commented on a change in pull request #7958: 
[FLINK-11881][table-planner-blink] Introduce code generated typed sort to blink 
table
URL: https://github.com/apache/flink/pull/7958#discussion_r264522136
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/AbstractBinaryWriter.java
 ##
 @@ -141,6 +141,16 @@ public void writeRow(int pos, BaseRow input, 
BaseRowSerializer serializer) {
}
}
 
+   @Override
+   public void writeBinary(int pos, byte[] bytes) {
+   int len = bytes.length;
+   if (len <= 7) {
 
 Review comment:
   how about replacing the magic number with a variable?
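   
   A hedged sketch of the suggestion (the constant name is an assumption; the 
threshold comes from the diff above, presumably because a field slot in the 
binary format is 8 bytes, leaving 7 bytes for inline data):
   
   private static final int MAX_FIX_PART_DATA_SIZE = 7;
   
   @Override
   public void writeBinary(int pos, byte[] bytes) {
   int len = bytes.length;
   if (len <= MAX_FIX_PART_DATA_SIZE) {
   // small payload: store the bytes inline in the fixed-length slot
   } else {
   // large payload: write to the variable-length part, store offset + length
   }
   }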


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #7958: [FLINK-11881][table-planner-blink] Introduce code generated typed sort to blink table

2019-03-11 Thread GitBox
klion26 commented on a change in pull request #7958: 
[FLINK-11881][table-planner-blink] Introduce code generated typed sort to blink 
table
URL: https://github.com/apache/flink/pull/7958#discussion_r264520596
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/CompileUtils.java
 ##
 @@ -77,6 +77,7 @@
try {
compiler.cook(code);
} catch (Throwable t) {
+   System.out.println(addLineNumber(code));
 
 Review comment:
   Maybe this line can be deleted.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tweise commented on a change in pull request #7896: [FLINK-9007] [kinesis] [e2e] Add Kinesis end-to-end test

2019-03-11 Thread GitBox
tweise commented on a change in pull request #7896: [FLINK-9007] [kinesis] 
[e2e] Add Kinesis end-to-end test
URL: https://github.com/apache/flink/pull/7896#discussion_r264519222
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExample.java
 ##
 @@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.kinesis.test;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.kafka.test.base.CustomWatermarkExtractor;
+import org.apache.flink.streaming.kafka.test.base.KafkaEvent;
+import org.apache.flink.streaming.kafka.test.base.KafkaEventSchema;
+import org.apache.flink.streaming.kafka.test.base.KafkaExampleUtil;
+import org.apache.flink.streaming.kafka.test.base.RollingAdditionMapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A simple example that shows how to read from and write to Kinesis. This 
will read String messages
+ * from the input topic, parse them into a POJO type {@link KafkaEvent}, group 
by some key, and finally
+ * perform a rolling addition on each key for which the results are written 
back to another topic.
+ *
+ * This example also demonstrates using a watermark assigner to generate 
per-partition
+ * watermarks directly in the Flink Kinesis consumer. For demonstration 
purposes, it is assumed that
+ * the String messages are formatted as a (word,frequency,timestamp) tuple.
+ *
+ * Example usage:
+ * --input-topic test-input --output-topic test-output --bootstrap.servers 
localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer
+ */
+public class KinesisExample {
+   private static final Logger LOG = 
LoggerFactory.getLogger(KinesisExample.class);
+
+   /**
+* Interface to the pubsub system for this test.
+*/
+   interface PubsubClient {
+   void createTopic(String topic, int partitions, Properties 
props) throws Exception;
+
+   void sendMessage(String topic, String msg);
+
+   List<String> readAllMessages(String streamName) throws 
Exception;
+   }
+
+   public static void main(String[] args) throws Exception {
+   LOG.info("System properties: {}", System.getProperties());
+   // parse input arguments
+   final ParameterTool parameterTool = 
ParameterTool.fromArgs(args);
+   StreamExecutionEnvironment env = 
KafkaExampleUtil.prepareExecutionEnv(parameterTool);
+
+   String inputStream = parameterTool.getRequired("input-stream");
+   String outputStream = 
parameterTool.getRequired("output-stream");
+
+   PubsubClient pubsub = new 
KinesisPubsubClient(parameterTool.getProperties());
+   pubsub.createTopic(inputStream, 2, 
parameterTool.getProperties());
+   pubsub.createTopic(outputStream, 2, 
parameterTool.getProperties());
+
+   FlinkKinesisConsumer<KafkaEvent> consumer = new 
FlinkKinesisConsumer<>(
+   inputStream,
+   new KafkaEventSchema(),
+   parameterTool.getProperties());
+   consumer.setPeriodicWatermarkAssigner(new 
CustomWatermarkExtractor());
+
+   DataStream<KafkaEvent> input = env
+   .addSource(consumer)
+   .keyBy("word")
+   .map(new RollingAdditionMapper());
+
+   Properties producerProperties = new 
Properties(parameterTool.getProperties());
+   // needs region even when URL is specified
+   

[jira] [Commented] (FLINK-11874) Split CheckpointStorage interface to distinguish JM and TM side

2019-03-11 Thread Yun Tang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16790221#comment-16790221
 ] 

Yun Tang commented on FLINK-11874:
--

[~yanghua] Thanks for your suggestion.

I thought I had given the why-what-how in the JIRA's description:
*why*: The current interface {{CheckpointStorage}} mixes the JM and TM sides, which 
makes it confusing for users and developers to distinguish them, especially 
when developers implement some function which should not be called on the TM 
side, such as initializing directories.
*what*: {{CheckpointStorage}} would exist not only on the TM side but also on the JM 
side, where it acts in different roles.
*how*: Split the JM-side methods out of the current {{CheckpointStorage}} interface 
into a new interface such as {{CheckpointCoordinatorStorage}}. By using this, we 
could cast {{CheckpointStorage}} to {{CheckpointCoordinatorStorage}} within the 
checkpoint coordinator so that those methods could be called only on the JM side.
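
A minimal sketch of that cast (variable names assumed for illustration; the 
method comes from the proposed interface listed in the issue description below):

{code:java}
// inside the checkpoint coordinator, JM-only methods become reachable via the cast
CheckpointCoordinatorStorage coordinatorStorage =
(CheckpointCoordinatorStorage) checkpointStorage;
CheckpointStorageLocation location =
coordinatorStorage.initializeLocationForCheckpoint(checkpointId);
{code}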

This JIRA was first inspired when we reviewed 
[FLINK-11696|https://github.com/apache/flink/pull/7942#discussion_r264335302]'s 
PR; it would be really helpful to point out anything that should be done. 

If you find anything that deserves to be discussed in more detail in a design 
proposal, you could leave your suggestions and concerns here :)

> Split CheckpointStorage interface to distinguish JM and TM side
> ---
>
> Key: FLINK-11874
> URL: https://issues.apache.org/jira/browse/FLINK-11874
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
> Fix For: 1.9.0
>
>
> Currently, interface {{CheckpointStorage}} mixes the JM and TM sides, which 
> makes it confusing for users and developers to distinguish them. 
> Take [FLINK-11696|https://issues.apache.org/jira/browse/FLINK-11696] as an 
> example: the directories should only be created once, from the JM side. However, 
> since we mixed the JM and TM sides, TMs would also create the directories again.
> We could let interface {{CheckpointStorage}} have only two methods:
> {code:java}
> CheckpointStreamFactory resolveCheckpointStorageLocation(
>   long checkpointId,
>   CheckpointStorageLocationReference reference)
> CheckpointStateOutputStream createTaskOwnedStateStream()
> {code}
> And a new interface {{CheckpointCoordinatorStorage}} could be introduced 
> extending {{CheckpointStorage}}, with the methods below:
> {code:java}
> boolean supportsHighlyAvailableStorage()
> boolean hasDefaultSavepointLocation()
> CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer)
> CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId)
> CheckpointStorageLocation initializeLocationForSavepoint(
>   long checkpointId,
>   @Nullable String externalLocationPointer)
> {code}
> With this refactor, the JM would cast to {{CheckpointCoordinatorStorage}} 
> so that we could shield TMs from accidentally calling unexpected methods.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11874) Split CheckpointStorage interface to distinguish JM and TM side

2019-03-11 Thread Yun Tang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-11874:
-
Summary: Split CheckpointStorage interface to distinguish JM and TM side  
(was: [checkpoint] Split CheckpointStorage interface to distinguish JM and TM 
side)

> Split CheckpointStorage interface to distinguish JM and TM side
> ---
>
> Key: FLINK-11874
> URL: https://issues.apache.org/jira/browse/FLINK-11874
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
> Fix For: 1.9.0
>
>
> Currently, interface {{CheckpointStorage}} mixes the JM and TM sides, which 
> makes it confusing for users and developers to distinguish them. 
> Take [FLINK-11696|https://issues.apache.org/jira/browse/FLINK-11696] as an 
> example: the directories should only be created once, from the JM side. However, 
> since we mixed the JM and TM sides, TMs would also create the directories again.
> We could let interface {{CheckpointStorage}} have only two methods:
> {code:java}
> CheckpointStreamFactory resolveCheckpointStorageLocation(
>   long checkpointId,
>   CheckpointStorageLocationReference reference)
> CheckpointStateOutputStream createTaskOwnedStateStream()
> {code}
> And a new interface {{CheckpointCoordinatorStorage}} could be introduced 
> extending {{CheckpointStorage}}, with the methods below:
> {code:java}
> boolean supportsHighlyAvailableStorage()
> boolean hasDefaultSavepointLocation()
> CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer)
> CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId)
> CheckpointStorageLocation initializeLocationForSavepoint(
>   long checkpointId,
>   @Nullable String externalLocationPointer)
> {code}
> With this refactor, the JM would cast to {{CheckpointCoordinatorStorage}} 
> so that we could shield TMs from accidentally calling unexpected methods.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #7959: [FLINK-11876] Introduce new TwoInputSelectable, BoundedOneInput and BoundedTwoInput interfaces

2019-03-11 Thread GitBox
flinkbot commented on issue #7959: [FLINK-11876] Introduce new 
TwoInputSelectable, BoundedOneInput and BoundedTwoInput interfaces
URL: https://github.com/apache/flink/pull/7959#issuecomment-471851158
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunhaibotb opened a new pull request #7959: [FLINK-11876] Introduce new TwoInputSelectable, BoundedOneInput and BoundedTwoInput interfaces

2019-03-11 Thread GitBox
sunhaibotb opened a new pull request #7959: [FLINK-11876] Introduce new 
TwoInputSelectable, BoundedOneInput and BoundedTwoInput interfaces
URL: https://github.com/apache/flink/pull/7959
 
 
   ## What is the purpose of the change
   
   Adds new TwoInputSelectable, BoundedOneInput and BoundedTwoInput interfaces 
for stream operators to support selective reading and EndOfInput event 
processing.
   
   ## Brief change log
   
 - Add new **InputIdentifier** class to represent the identifier for the 
input(s) of the operator.
 - Add new **TwoInputSelectable** class for two-input operators that can 
select the input to get the next StreamRecord.
 - Add new **BoundedOneInput** class for one-input operators that can 
process EndOfInput event.
 - Add new **BoundedTwoInput** class for two-input operators that can 
process the EndOfInput event (a hedged sketch of these interfaces follows below).
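   
   A hedged sketch of what these interfaces might look like (signatures are 
assumptions for illustration; only the interface names and InputIdentifier come 
from this PR):
   
   public interface BoundedOneInput {
   // invoked once the operator's single input has been fully consumed
   void endInput() throws Exception;
   }
   
   public interface BoundedTwoInput {
   // invoked once the given input has been fully consumed
   void endInput(InputIdentifier inputId) throws Exception;
   }
   
   public interface TwoInputSelectable {
   // choose which input to read the next StreamRecord from
   InputIdentifier nextSelection();
   }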
   
   ## Verifying this change
   
   This change adds some new interfaces which will be covered by the tests 
of the implementation subtasks 
[FLINK-11877](https://issues.apache.org/jira/browse/FLINK-11877) and 
[FLINK-11878](https://issues.apache.org/jira/browse/FLINK-11878) .
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency):  **no**
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
 - The serializers:  **no**
 - The runtime per-record code paths (performance sensitive):  **no**
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
 - The S3 file system connector:  **no**
   
   ## Documentation
   
 - Does this pull request introduce a new feature? **yes**
 - If yes, how is the feature documented? **JavaDocs**
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11876) Introduce the new interfaces TwoInputSelectable, BoundedOneInput and BoundedTwoInput

2019-03-11 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11876:
---
Labels: pull-request-available  (was: )

> Introduce the new interfaces TwoInputSelectable, BoundedOneInput and 
> BoundedTwoInput
> 
>
> Key: FLINK-11876
> URL: https://issues.apache.org/jira/browse/FLINK-11876
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Haibo Sun
>Assignee: Haibo Sun
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] JingsongLi commented on issue #7949: [FLINK-11856][table-runtime-blink] Introduce BinaryHashTable to batch table runtime

2019-03-11 Thread GitBox
JingsongLi commented on issue #7949: [FLINK-11856][table-runtime-blink] 
Introduce BinaryHashTable to batch table runtime
URL: https://github.com/apache/flink/pull/7949#issuecomment-471850280
 
 
   +1 LGTM


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #7713: [FLINK-10995][network] Copy intermediate serialization results only once for broadcast mode

2019-03-11 Thread GitBox
zhijiangW commented on a change in pull request #7713: [FLINK-10995][network] 
Copy intermediate serialization results only once for broadcast mode
URL: https://github.com/apache/flink/pull/7713#discussion_r264514955
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
 ##
 @@ -44,4 +52,67 @@ public BroadcastRecordWriter(
public void emit(T record) throws IOException, InterruptedException {
broadcastEmit(record);
}
+
+   @Override
+   public void broadcastEmit(T record) throws IOException, 
InterruptedException {
 
 Review comment:
   The first version just tried its best to reuse the common code. But it seems 
hard to understand and distinguish these two writers in essence, 
including `BufferBuilder` and `ChannelSelector`. Furthermore, in theory the 
single `BufferBuilder` seems more efficient for `BroadcastRecordWriter`. From 
this point, I am willing to keep the current version if there are no better 
solutions currently. 
   
   I would refactor the commit and related tests based on the current idea if you 
have no other concerns. :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #7958: [FLINK-11881][table-planner-blink] Introduce code generated typed sort to blink table

2019-03-11 Thread GitBox
flinkbot commented on issue #7958: [FLINK-11881][table-planner-blink] Introduce 
code generated typed sort to blink table
URL: https://github.com/apache/flink/pull/7958#issuecomment-471844573
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on issue #6631: [FLINK-10229] [sql-client] Support listing of views

2019-03-11 Thread GitBox
yanghua commented on issue #6631: [FLINK-10229] [sql-client] Support listing of 
views
URL: https://github.com/apache/flink/pull/6631#issuecomment-471844427
 
 
   I have fixed some conflicts and rebased this PR. It has been open for more than 
half a year; hoping for a review. cc @twalthr 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11881) Introduce code generated typed sort to blink table

2019-03-11 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11881:
---
Labels: pull-request-available  (was: )

> Introduce code generated typed sort to blink table
> --
>
> Key: FLINK-11881
> URL: https://issues.apache.org/jira/browse/FLINK-11881
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Operators, SQL / Planner
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
>
> Introduce SortCodeGenerator (CodeGen efficient computation and comparison of  
> NormalizedKey):
> support sort by primitive type, string, decimal...
> support sort by ArrayType
> support sort by RowType(Nested Struct)
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] JingsongLi opened a new pull request #7958: [FLINK-11881][table-planner-blink] Introduce code generated typed sort to blink table

2019-03-11 Thread GitBox
JingsongLi opened a new pull request #7958: [FLINK-11881][table-planner-blink] 
Introduce code generated typed sort to blink table
URL: https://github.com/apache/flink/pull/7958
 
 
   ## What is the purpose of the change
   
   Introduce SortCodeGenerator (CodeGen efficient computation and comparison of 
 NormalizedKey):
   support sort by primitive type, string, decimal...
   support sort by ArrayType
   support sort by RowType(Nested Struct)
   
   ## Verifying this change
   
   unit tests with random data
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (JavaDocs)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on issue #7911: [FLINK-11082][network] Fix the logic of getting backlog in sub partition

2019-03-11 Thread GitBox
zhijiangW commented on issue #7911: [FLINK-11082][network] Fix the logic of 
getting backlog in sub partition
URL: https://github.com/apache/flink/pull/7911#issuecomment-471841116
 
 
   @pnowojski , thanks for your reviews and suggestions!
   
   I would remove the second synchronization on buffers during `getBuffersInBacklog`, 
so it would not bring any regression in theory. I agree that we should verify 
whether this change has an effect in certain scenarios. I would run the 
current benchmark to confirm it. If there is no obvious improvement, I could 
adjust the scenarios to fit this change.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #7911: [FLINK-11082][network] Fix the logic of getting backlog in sub partition

2019-03-11 Thread GitBox
zhijiangW commented on a change in pull request #7911: [FLINK-11082][network] 
Fix the logic of getting backlog in sub partition
URL: https://github.com/apache/flink/pull/7911#discussion_r264509124
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
 ##
 @@ -156,7 +156,8 @@ public BufferAndBacklog getNextBuffer() throws 
IOException, InterruptedException
checkState(nextBuffer.isFinished(),
"We can only read from 
SpillableSubpartition after it was finished");
 
-   newBacklog = 
parent.decreaseBuffersInBacklogUnsafe(nextBuffer.isBuffer());
+   
parent.decreaseBuffersInBacklog(nextBuffer.isBuffer());
+   newBacklog = parent.getBuffersInBacklog();
 
 Review comment:
   1. The current backlog is based on the `flushTriggered` or `isFinished` status 
after the change. If we want to return the backlog during `decreaseBuffersInBacklog`, 
we might need to pass this info into `decreaseBuffersInBacklog(boolean 
flushTriggered | isFinished)`. In order not to dirty this method, I split it 
into a separate method. Maybe it does not seem so bad to pass a parameter into 
`decreaseBuffersInBacklog`, so we could still merge these two into one method 
if you prefer.
   
   2. Yes, you are right. Here we could get the backlog in an unsafe way, with no 
need to add the synchronization overhead.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #7911: [FLINK-11082][network] Fix the logic of getting backlog in sub partition

2019-03-11 Thread GitBox
zhijiangW commented on a change in pull request #7911: [FLINK-11082][network] 
Fix the logic of getting backlog in sub partition
URL: https://github.com/apache/flink/pull/7911#discussion_r264508959
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 ##
 @@ -116,52 +115,58 @@ protected Throwable getFailureCause() {
 
public abstract boolean isReleased();
 
-   /**
-* Gets the number of non-event buffers in this subpartition.
-*
-* Beware: This method should only be used in tests 
in non-concurrent access
-* scenarios since it does not make any concurrency guarantees.
-*/
-   @VisibleForTesting
-   public int getBuffersInBacklog() {
-   return buffersInBacklog;
-   }
-
/**
 * Makes a best effort to get the current size of the queue.
 * This method must not acquire locks or interfere with the task and 
network threads in
 * any way.
 */
public abstract int unsynchronizedGetNumberOfQueuedBuffers();
 
+   /**
+* Gets the number of non-event buffers in this subpartition.
+*/
+   public abstract int getBuffersInBacklog();
+
+   /**
+* @param lastBufferAvailable whether the last buffer in this 
subpartition is available for consumption
+* @return the number of non-event buffers in this subpartition
+*/
+   protected int getBuffersInBacklog(boolean lastBufferAvailable) {
+   assert Thread.holdsLock(buffers);
+
+   if (lastBufferAvailable) {
+   return buffersInBacklog;
+   } else {
+   return Math.max(buffersInBacklog - 1, 0);
+   }
+   }
+
/**
 * Decreases the number of non-event buffers by one after fetching a 
non-event
 * buffer from this subpartition (for access by the subpartition views).
-*
-* @return backlog after the operation
 */
-   public int decreaseBuffersInBacklog(Buffer buffer) {
+   public void decreaseBuffersInBacklogUnsafe(boolean isBuffer) {
synchronized (buffers) {
-   return decreaseBuffersInBacklogUnsafe(buffer != null && 
buffer.isBuffer());
+   decreaseBuffersInBacklog(isBuffer);
}
}
 
-   protected int decreaseBuffersInBacklogUnsafe(boolean isBuffer) {
+   protected void decreaseBuffersInBacklog(boolean isBuffer) {
assert Thread.holdsLock(buffers);
+
if (isBuffer) {
buffersInBacklog--;
}
-   return buffersInBacklog;
}
 
/**
 * Increases the number of non-event buffers by one after adding a 
non-event
 * buffer into this subpartition.
 */
-   protected void increaseBuffersInBacklog(BufferConsumer buffer) {
+   protected void increaseBuffersInBacklog(boolean isBuffer) {
 
 Review comment:
   Yes, I tried to keep it consistent with the other method. I should put it in a 
separate commit.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #7911: [FLINK-11082][network] Fix the logic of getting backlog in sub partition

2019-03-11 Thread GitBox
zhijiangW commented on a change in pull request #7911: [FLINK-11082][network] 
Fix the logic of getting backlog in sub partition
URL: https://github.com/apache/flink/pull/7911#discussion_r264508867
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 ##
 @@ -116,52 +115,58 @@ protected Throwable getFailureCause() {
 
public abstract boolean isReleased();
 
-   /**
-* Gets the number of non-event buffers in this subpartition.
-*
-* Beware: This method should only be used in tests 
in non-concurrent access
-* scenarios since it does not make any concurrency guarantees.
-*/
-   @VisibleForTesting
-   public int getBuffersInBacklog() {
-   return buffersInBacklog;
-   }
-
/**
 * Makes a best effort to get the current size of the queue.
 * This method must not acquire locks or interfere with the task and 
network threads in
 * any way.
 */
public abstract int unsynchronizedGetNumberOfQueuedBuffers();
 
+   /**
+* Gets the number of non-event buffers in this subpartition.
+*/
+   public abstract int getBuffersInBacklog();
+
+   /**
+* @param lastBufferAvailable whether the last buffer in this 
subpartition is available for consumption
+* @return the number of non-event buffers in this subpartition
+*/
+   protected int getBuffersInBacklog(boolean lastBufferAvailable) {
+   assert Thread.holdsLock(buffers);
+
+   if (lastBufferAvailable) {
+   return buffersInBacklog;
+   } else {
+   return Math.max(buffersInBacklog - 1, 0);
+   }
+   }
+
/**
 * Decreases the number of non-event buffers by one after fetching a 
non-event
 * buffer from this subpartition (for access by the subpartition views).
-*
-* @return backlog after the operation
 */
-   public int decreaseBuffersInBacklog(Buffer buffer) {
+   public void decreaseBuffersInBacklogUnsafe(boolean isBuffer) {
 
 Review comment:
   I misunderstood the semantics of `Unsafe` before. I defined `Unsafe` based 
on the external calling, not the internal implementation, of 
`decreaseBuffersInBacklog`. In other words, if an external method calls 
`decreaseBuffersInBacklog`, it should confirm the buffers are synchronized in 
the external method for the call to be safe; otherwise the external method 
should call `decreaseBuffersInBacklogUnsafe`.
   
   Now I understand your point about the definition, and it seems more reasonable 
to base it on the internal confirmation in `decreaseBuffersInBacklog`. I would 
revert this change to how it was before.
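   
   For reference, a minimal sketch of that convention (method names from this PR, 
bodies illustrative): the plain variant synchronizes internally, while the 
`Unsafe` variant assumes the caller already holds the lock:
   
   public void decreaseBuffersInBacklog(boolean isBuffer) {
   synchronized (buffers) {
   decreaseBuffersInBacklogUnsafe(isBuffer);
   }
   }
   
   protected void decreaseBuffersInBacklogUnsafe(boolean isBuffer) {
   assert Thread.holdsLock(buffers);
   if (isBuffer) {
   buffersInBacklog--;
   }
   }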


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-11881) Introduce code generated typed sort to blink table

2019-03-11 Thread Jingsong Lee (JIRA)
Jingsong Lee created FLINK-11881:


 Summary: Introduce code generated typed sort to blink table
 Key: FLINK-11881
 URL: https://issues.apache.org/jira/browse/FLINK-11881
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / Operators, SQL / Planner
Reporter: Jingsong Lee
Assignee: Jingsong Lee


Introduce SortCodeGenerator (CodeGen efficient computation and comparison of  
NormalizedKey):

support sort by primitive type, string, decimal...

support sort by ArrayType

support sort by RowType(Nested Struct)

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zhijiangW commented on a change in pull request #7911: [FLINK-11082][network] Fix the logic of getting backlog in sub partition

2019-03-11 Thread GitBox
zhijiangW commented on a change in pull request #7911: [FLINK-11082][network] 
Fix the logic of getting backlog in sub partition
URL: https://github.com/apache/flink/pull/7911#discussion_r264508671
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 ##
 @@ -116,52 +115,58 @@ protected Throwable getFailureCause() {
 
public abstract boolean isReleased();
 
-   /**
-* Gets the number of non-event buffers in this subpartition.
-*
-* Beware: This method should only be used in tests 
in non-concurrent access
-* scenarios since it does not make any concurrency guarantees.
-*/
-   @VisibleForTesting
-   public int getBuffersInBacklog() {
-   return buffersInBacklog;
-   }
-
/**
 * Makes a best effort to get the current size of the queue.
 * This method must not acquire locks or interfere with the task and 
network threads in
 * any way.
 */
public abstract int unsynchronizedGetNumberOfQueuedBuffers();
 
+   /**
+* Gets the number of non-event buffers in this subpartition.
+*/
+   public abstract int getBuffersInBacklog();
+
+   /**
+* @param lastBufferAvailable whether the last buffer in this 
subpartition is available for consumption
+* @return the number of non-event buffers in this subpartition
+*/
+   protected int getBuffersInBacklog(boolean lastBufferAvailable) {
 
 Review comment:
   That is a very good question. We define the variable `buffersInBacklog` for 
counting the non-event buffers in the queue. `getNumberOfFinishedBuffers` and 
`isAvailableUnsafe` count both event and non-event buffers, so they cannot 
be unified directly. I even tried another way of counting only event 
buffers instead of the current `buffersInBacklog`, but it still does not simplify 
the logic. Especially for `SpillableSubpartition` we could not get the total 
number of buffers directly from the memory queue, so we still need non-event 
buffer counting even if we count the event buffers.
   
   I also find that the current conditions seem a bit complicated to maintain 
in different processes, such as `shouldNotifyDataAvailable`, `isAvailable`, 
`getBuffersInBacklog`, etc. It would be better if we could unify the 
related conditions. After we have a good idea for 
this issue, I would like to make changes. :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #7911: [FLINK-11082][network] Fix the logic of getting backlog in sub partition

2019-03-11 Thread GitBox
zhijiangW commented on a change in pull request #7911: [FLINK-11082][network] 
Fix the logic of getting backlog in sub partition
URL: https://github.com/apache/flink/pull/7911#discussion_r264508324
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 ##
 @@ -116,52 +115,58 @@ protected Throwable getFailureCause() {
 
public abstract boolean isReleased();
 
-   /**
-* Gets the number of non-event buffers in this subpartition.
-*
-* Beware: This method should only be used in tests 
in non-concurrent access
-* scenarios since it does not make any concurrency guarantees.
-*/
-   @VisibleForTesting
-   public int getBuffersInBacklog() {
-   return buffersInBacklog;
-   }
-
/**
 * Makes a best effort to get the current size of the queue.
 * This method must not acquire locks or interfere with the task and 
network threads in
 * any way.
 */
public abstract int unsynchronizedGetNumberOfQueuedBuffers();
 
+   /**
+* Gets the number of non-event buffers in this subpartition.
+*/
+   public abstract int getBuffersInBacklog();
+
+   /**
+* @param lastBufferAvailable whether the last buffer in this 
subpartition is available for consumption
+* @return the number of non-event buffers in this subpartition
+*/
+   protected int getBuffersInBacklog(boolean lastBufferAvailable) {
 
 Review comment:
   That is a very good question. We define the variable `buffersInBacklog` for 
counting the non-event buffers in the queue. `getNumberOfFinishedBuffers` and 
`isAvailableUnsafe` count both event and non-event buffers, so they cannot 
be unified directly. I even tried another way of counting only event 
buffers instead of the current `buffersInBacklog`, but it still does not simplify 
the logic. Especially for `SpillableSubpartition` we could not get the total 
number of buffers directly from the memory queue, so we still need non-event 
buffer counting even if we count the event buffers.
   
   I also find that the current conditions seem a bit complicated to maintain 
in different processes, such as `shouldNotifyDataAvailable`, `isAvailable`, 
`getBuffersInBacklog`, etc. It would be better if we could unify the 
related conditions. After we have a good idea for 
this issue, I would like to make changes. :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-11490) Add an initial Blink SQL batch runtime

2019-03-11 Thread Jingsong Lee (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11490?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned FLINK-11490:


Assignee: Jingsong Lee

> Add an initial Blink SQL batch runtime
> --
>
> Key: FLINK-11490
> URL: https://issues.apache.org/jira/browse/FLINK-11490
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Table SQL
>Reporter: Timo Walther
>Assignee: Jingsong Lee
>Priority: Major
>
> A more detailed description can be found in 
> [FLIP-32|https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions].
> This issue is an umbrella issue for tasks related to the merging of Blink 
> batch runtime features. The goal is to provide minimum viable product (MVP) 
> to batch users.
> An exact list of batch features, their properties, and dependencies needs to 
> be defined.
> The type system might not have been reworked at this stage. Operations might 
> not be executed with the full performance until changes in other Flink core 
> components have taken place.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-11880) Add JobGraph validators for the uses of TwoInputSelectable, BoundedOneInput and BoundedTwoInput

2019-03-11 Thread Haibo Sun (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Haibo Sun closed FLINK-11880.
-
Resolution: Duplicate

It duplicates FLINK-11879.

> Add JobGraph validators for the uses of TwoInputSelectable, BoundedOneInput 
> and BoundedTwoInput
> ---
>
> Key: FLINK-11880
> URL: https://issues.apache.org/jira/browse/FLINK-11880
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Haibo Sun
>Assignee: Haibo Sun
>Priority: Major
>
> - Rejects jobs containing operators that implement `TwoInputSelectable` when 
> checkpointing is enabled.
>  - Rejects jobs containing operators that implement `BoundedOneInput` or 
> `BoundedTwoInput` when checkpointing is enabled.
>  - Rejects jobs containing operators that implement `TwoInputSelectable` when 
> credit-based flow control is disabled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zhijiangW commented on a change in pull request #7911: [FLINK-11082][network] Fix the logic of getting backlog in sub partition

2019-03-11 Thread GitBox
zhijiangW commented on a change in pull request #7911: [FLINK-11082][network] 
Fix the logic of getting backlog in sub partition
URL: https://github.com/apache/flink/pull/7911#discussion_r264503784
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
 ##
 @@ -156,7 +156,8 @@ public BufferAndBacklog getNextBuffer() throws 
IOException, InterruptedException
checkState(nextBuffer.isFinished(),
"We can only read from 
SpillableSubpartition after it was finished");
 
-   newBacklog = 
parent.decreaseBuffersInBacklogUnsafe(nextBuffer.isBuffer());
+   
parent.decreaseBuffersInBacklog(nextBuffer.isBuffer());
+   newBacklog = parent.getBuffersInBacklog();
 
 Review comment:
   1. The current backlog is based on the `flushTriggered` or `isFinished` status 
after the change, so if we want to return the backlog from 
`decreaseBuffersInBacklog`, we would need to pass this info in as 
`decreaseBuffersInBacklog(boolean flushTriggered | isFinished)`. In order not to 
clutter that method, I split it into a separate method. Passing the parameter 
into `decreaseBuffersInBacklog` may not be so bad, though, so we could still 
merge the two into one method if you prefer.
   
   2. Yes, you are right. Here we can get the backlog in the unsafe way; there is 
no need to add the synchronization overhead.
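   
   As a minimal sketch of the merge mentioned in (1), assuming the `buffers` lock 
and the `buffersInBacklog` field as in `ResultSubpartition` (not the actual PR 
code):
   
   ```java
   abstract class ResultSubpartitionSketch {
       protected final Object buffers = new Object(); // stands in for the buffer queue lock
       protected int buffersInBacklog;

       // Hypothetical merged variant of the two methods discussed in (1):
       // decrement for a fetched data buffer and compute the returned backlog
       // in a single call; lastBufferAvailable mirrors getBuffersInBacklog(boolean).
       protected int decreaseBuffersInBacklog(boolean isBuffer, boolean lastBufferAvailable) {
           assert Thread.holdsLock(buffers);
           if (isBuffer) {
               buffersInBacklog--;
           }
           return lastBufferAvailable ? buffersInBacklog : Math.max(buffersInBacklog - 1, 0);
       }
   }
   ```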


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-11879) Add JobGraph validators for the uses of TwoInputSelectable, BoundedOneInput and BoundedTwoInput

2019-03-11 Thread Haibo Sun (JIRA)
Haibo Sun created FLINK-11879:
-

 Summary: Add JobGraph validators for the uses of 
TwoInputSelectable, BoundedOneInput and BoundedTwoInput
 Key: FLINK-11879
 URL: https://issues.apache.org/jira/browse/FLINK-11879
 Project: Flink
  Issue Type: Sub-task
Reporter: Haibo Sun
Assignee: Haibo Sun


- Rejects jobs containing operators that implement `TwoInputSelectable` when 
checkpointing is enabled.
 - Rejects jobs containing operators that implement `BoundedOneInput` or 
`BoundedTwoInput` when checkpointing is enabled.
 - Rejects jobs containing operators that implement `TwoInputSelectable` when 
credit-based flow control is disabled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11880) Add JobGraph validators for the uses of TwoInputSelectable, BoundedOneInput and BoundedTwoInput

2019-03-11 Thread Haibo Sun (JIRA)
Haibo Sun created FLINK-11880:
-

 Summary: Add JobGraph validators for the uses of 
TwoInputSelectable, BoundedOneInput and BoundedTwoInput
 Key: FLINK-11880
 URL: https://issues.apache.org/jira/browse/FLINK-11880
 Project: Flink
  Issue Type: Sub-task
Reporter: Haibo Sun
Assignee: Haibo Sun


- Rejects jobs containing operators that implement `TwoInputSelectable` when 
checkpointing is enabled.
 - Rejects jobs containing operators that implement `BoundedOneInput` or 
`BoundedTwoInput` when checkpointing is enabled.
 - Rejects jobs containing operators that implement `TwoInputSelectable` when 
credit-based flow control is disabled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11877) Implement the runtime handling of TwoInputSelectable

2019-03-11 Thread Haibo Sun (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Haibo Sun updated FLINK-11877:
--
Description: 
- Introduces a new class `Input` to represent the logical input of operators.
 - Introduces a new class `StreamTwoInputSelectableProcessor` to implement 
selective reading.
 - Adds benchmarks for `StreamTwoInputProcessor` and 
`StreamTwoInputSelectableProcessor` to ensure that 
`StreamTwoInputSelectableProcessor`'s throughput is the same, or that any 
regression is acceptable, in the constant-`ALL` case.

  was:
- Introduces a new class `Input` to represent the logical input of operators.
 - Introduces a new class `StreamTwoInputSelectableProcessor` to implement 
selectively reading.
 - Adds benchmarks for `StreamTwoInputProcessor` and 
`StreamTwoInputSelectableProcessor` to ensure good performance.


> Implement the runtime handling of TwoInputSelectable
> 
>
> Key: FLINK-11877
> URL: https://issues.apache.org/jira/browse/FLINK-11877
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Haibo Sun
>Assignee: Haibo Sun
>Priority: Major
>
> - Introduces a new class `Input` to represent the logical input of operators.
>  - Introduces a new class `StreamTwoInputSelectableProcessor` to implement 
> selective reading.
>  - Adds benchmarks for `StreamTwoInputProcessor` and 
> `StreamTwoInputSelectableProcessor` to ensure that 
> `StreamTwoInputSelectableProcessor`'s throughput is the same, or that any 
> regression is acceptable, in the constant-`ALL` case.
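
As a rough illustration of what selective reading means here, the sketch below 
shows the intended shape (hypothetical names, not the actual interfaces proposed 
in this ticket):

{code:java}
/** Hypothetical sketch of selective two-input reading; all names are illustrative. */
public class SelectiveReadingSketch {

    enum InputSelection { FIRST, SECOND, ALL }

    interface TwoInputSelectable {
        /** Asked before each read which input(s) to consume next. */
        InputSelection nextSelection();
    }

    static void readOnce(TwoInputSelectable operator, Runnable readInput1, Runnable readInput2) {
        InputSelection selection = operator.nextSelection();
        if (selection == InputSelection.FIRST || selection == InputSelection.ALL) {
            readInput1.run();
        }
        if (selection == InputSelection.SECOND || selection == InputSelection.ALL) {
            readInput2.run();
        }
    }
}
{code}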



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11878) Implement the runtime handling of BoundedOneInput and BoundedTwoInput

2019-03-11 Thread Haibo Sun (JIRA)
Haibo Sun created FLINK-11878:
-

 Summary: Implement the runtime handling of BoundedOneInput and 
BoundedTwoInput
 Key: FLINK-11878
 URL: https://issues.apache.org/jira/browse/FLINK-11878
 Project: Flink
  Issue Type: Sub-task
Reporter: Haibo Sun
Assignee: Haibo Sun






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11877) Implement the runtime handling of TwoInputSelectable

2019-03-11 Thread Haibo Sun (JIRA)
Haibo Sun created FLINK-11877:
-

 Summary: Implement the runtime handling of TwoInputSelectable
 Key: FLINK-11877
 URL: https://issues.apache.org/jira/browse/FLINK-11877
 Project: Flink
  Issue Type: Sub-task
Reporter: Haibo Sun
Assignee: Haibo Sun


- Introduces a new class `Input` to represent the logical input of operators.
 - Introduces a new class `StreamTwoInputSelectableProcessor` to implement 
selective reading.
 - Adds benchmarks for `StreamTwoInputProcessor` and 
`StreamTwoInputSelectableProcessor` to ensure good performance.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11876) Introduce the new interfaces TwoInputSelectable, BoundedOneInput and BoundedTwoInput

2019-03-11 Thread Haibo Sun (JIRA)
Haibo Sun created FLINK-11876:
-

 Summary: Introduce the new interfaces TwoInputSelectable, 
BoundedOneInput and BoundedTwoInput
 Key: FLINK-11876
 URL: https://issues.apache.org/jira/browse/FLINK-11876
 Project: Flink
  Issue Type: Sub-task
Reporter: Haibo Sun
Assignee: Haibo Sun






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zhijiangW commented on a change in pull request #7911: [FLINK-11082][network] Fix the logic of getting backlog in sub partition

2019-03-11 Thread GitBox
zhijiangW commented on a change in pull request #7911: [FLINK-11082][network] 
Fix the logic of getting backlog in sub partition
URL: https://github.com/apache/flink/pull/7911#discussion_r264501442
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 ##
 @@ -116,52 +115,58 @@ protected Throwable getFailureCause() {
 
public abstract boolean isReleased();
 
-   /**
-* Gets the number of non-event buffers in this subpartition.
-*
-* Beware: This method should only be used in tests 
in non-concurrent access
-* scenarios since it does not make any concurrency guarantees.
-*/
-   @VisibleForTesting
-   public int getBuffersInBacklog() {
-   return buffersInBacklog;
-   }
-
/**
 * Makes a best effort to get the current size of the queue.
 * This method must not acquire locks or interfere with the task and 
network threads in
 * any way.
 */
public abstract int unsynchronizedGetNumberOfQueuedBuffers();
 
+   /**
+* Gets the number of non-event buffers in this subpartition.
+*/
+   public abstract int getBuffersInBacklog();
+
+   /**
+* @param lastBufferAvailable whether the last buffer in this 
subpartition is available for consumption
+* @return the number of non-event buffers in this subpartition
+*/
+   protected int getBuffersInBacklog(boolean lastBufferAvailable) {
+   assert Thread.holdsLock(buffers);
+
+   if (lastBufferAvailable) {
+   return buffersInBacklog;
+   } else {
+   return Math.max(buffersInBacklog - 1, 0);
+   }
+   }
+
/**
 * Decreases the number of non-event buffers by one after fetching a 
non-event
 * buffer from this subpartition (for access by the subpartition views).
-*
-* @return backlog after the operation
 */
-   public int decreaseBuffersInBacklog(Buffer buffer) {
+   public void decreaseBuffersInBacklogUnsafe(boolean isBuffer) {
synchronized (buffers) {
-   return decreaseBuffersInBacklogUnsafe(buffer != null && 
buffer.isBuffer());
+   decreaseBuffersInBacklog(isBuffer);
}
}
 
-   protected int decreaseBuffersInBacklogUnsafe(boolean isBuffer) {
+   protected void decreaseBuffersInBacklog(boolean isBuffer) {
assert Thread.holdsLock(buffers);
+
if (isBuffer) {
buffersInBacklog--;
}
-   return buffersInBacklog;
}
 
/**
 * Increases the number of non-event buffers by one after adding a 
non-event
 * buffer into this subpartition.
 */
-   protected void increaseBuffersInBacklog(BufferConsumer buffer) {
+   protected void increaseBuffersInBacklog(boolean isBuffer) {
 
 Review comment:
   Yes, I try to keep it consistent with another method. I should make it in a 
separate commit.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11875) Enhance stream operator API to support selective reading and EndOfInput event

2019-03-11 Thread Haibo Sun (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Haibo Sun updated FLINK-11875:
--
Description: 
Towards the goal of unifying Streaming and Batch, this JIRA proposes an enhanced 
stream operator API to support selective reading and an EndOfInput event for the 
current Batch requirements.

Design Doc: 
[https://docs.google.com/document/d/10k5pQm3SkMiK5Zn1iFDqhQnzjQTLF0Vtcbc8poB4_c8/edit?usp=sharing|http://example.com/]

Discussion Mail: 
[http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Enhance-Operator-API-to-Support-Dynamically-Selective-Reading-and-EndOfInput-Event-td26753.html|http://example.com/]

  was:
Towards the goal that unify Streaming and Batch, this jira proposes enhanced 
stream operator api to support selective reading and EndOfInput event.

Design Doc:

[https://docs.google.com/document/d/10k5pQm3SkMiK5Zn1iFDqhQnzjQTLF0Vtcbc8poB4_c8/edit?usp=sharing|http://example.com/]

Discussion Mail:

[http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Enhance-Operator-API-to-Support-Dynamically-Selective-Reading-and-EndOfInput-Event-td26753.html|http://example.com/]


> Enhance stream operator API to support selective reading and EndOfInput event
> -
>
> Key: FLINK-11875
> URL: https://issues.apache.org/jira/browse/FLINK-11875
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Operators
>Reporter: Haibo Sun
>Assignee: Haibo Sun
>Priority: Major
>
> Towards the goal of unifying Streaming and Batch, this JIRA proposes an 
> enhanced stream operator API to support selective reading and an EndOfInput 
> event for the current Batch requirements.
> Design Doc: 
> [https://docs.google.com/document/d/10k5pQm3SkMiK5Zn1iFDqhQnzjQTLF0Vtcbc8poB4_c8/edit?usp=sharing|http://example.com/]
> Discussion Mail: 
> [http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Enhance-Operator-API-to-Support-Dynamically-Selective-Reading-and-EndOfInput-Event-td26753.html|http://example.com/]
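
As a rough sketch of the EndOfInput side of the proposal (hypothetical shape; 
the design doc linked above is authoritative):

{code:java}
/** Hypothetical sketch: a bounded-input operator is told when its input ends. */
interface BoundedOneInputSketch {
    /** Called once after the last record of the (bounded) input has been processed. */
    void endInput() throws Exception;
}
{code}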



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zhijiangW commented on a change in pull request #7911: [FLINK-11082][network] Fix the logic of getting backlog in sub partition

2019-03-11 Thread GitBox
zhijiangW commented on a change in pull request #7911: [FLINK-11082][network] 
Fix the logic of getting backlog in sub partition
URL: https://github.com/apache/flink/pull/7911#discussion_r264501226
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 ##
 @@ -116,52 +115,58 @@ protected Throwable getFailureCause() {
 
public abstract boolean isReleased();
 
-   /**
-* Gets the number of non-event buffers in this subpartition.
-*
-* Beware: This method should only be used in tests 
in non-concurrent access
-* scenarios since it does not make any concurrency guarantees.
-*/
-   @VisibleForTesting
-   public int getBuffersInBacklog() {
-   return buffersInBacklog;
-   }
-
/**
 * Makes a best effort to get the current size of the queue.
 * This method must not acquire locks or interfere with the task and 
network threads in
 * any way.
 */
public abstract int unsynchronizedGetNumberOfQueuedBuffers();
 
+   /**
+* Gets the number of non-event buffers in this subpartition.
+*/
+   public abstract int getBuffersInBacklog();
+
+   /**
+* @param lastBufferAvailable whether the last buffer in this 
subpartition is available for consumption
+* @return the number of non-event buffers in this subpartition
+*/
+   protected int getBuffersInBacklog(boolean lastBufferAvailable) {
+   assert Thread.holdsLock(buffers);
+
+   if (lastBufferAvailable) {
+   return buffersInBacklog;
+   } else {
+   return Math.max(buffersInBacklog - 1, 0);
+   }
+   }
+
/**
 * Decreases the number of non-event buffers by one after fetching a 
non-event
 * buffer from this subpartition (for access by the subpartition views).
-*
-* @return backlog after the operation
 */
-   public int decreaseBuffersInBacklog(Buffer buffer) {
+   public void decreaseBuffersInBacklogUnsafe(boolean isBuffer) {
 
 Review comment:
   I misunderstood the semantics of `Unsafe` before. I defined `Unsafe` based on 
the external call site rather than the internal implementation of 
`decreaseBuffersInBacklog`. In other words, if an external method calls 
`decreaseBuffersInBacklog`, it should itself hold the lock on the buffers for 
the call to be safe; otherwise it should call `decreaseBuffersInBacklogUnsafe`.
   
   Now I understand your definition, and it seems more reasonable to base it on 
the internal guarantee of `decreaseBuffersInBacklog`. I will revert this change 
to the previous form.
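   
   For the record, a minimal sketch of the naming convention settled on here, 
where the `Unsafe` suffix marks the variant that assumes the caller already 
holds the lock (illustrative, not the exact Flink code):
   
   ```java
   class BacklogCounterSketch {
       private final Object buffers = new Object(); // stands in for the buffer queue lock
       private int buffersInBacklog;

       // Safe variant: takes the lock itself before delegating.
       public void decreaseBuffersInBacklog(boolean isBuffer) {
           synchronized (buffers) {
               decreaseBuffersInBacklogUnsafe(isBuffer);
           }
       }

       // Unsafe variant: assumes the caller already holds the lock.
       private void decreaseBuffersInBacklogUnsafe(boolean isBuffer) {
           assert Thread.holdsLock(buffers);
           if (isBuffer) {
               buffersInBacklog--;
           }
       }
   }
   ```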


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-11875) Enhance stream operator API to support selective reading and EndOfInput event

2019-03-11 Thread Haibo Sun (JIRA)
Haibo Sun created FLINK-11875:
-

 Summary: Enhance stream operator API to support selective reading 
and EndOfInput event
 Key: FLINK-11875
 URL: https://issues.apache.org/jira/browse/FLINK-11875
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Operators
Reporter: Haibo Sun
Assignee: Haibo Sun


Towards the goal of unifying Streaming and Batch, this JIRA proposes an enhanced 
stream operator API to support selective reading and an EndOfInput event.

Design Doc:

[https://docs.google.com/document/d/10k5pQm3SkMiK5Zn1iFDqhQnzjQTLF0Vtcbc8poB4_c8/edit?usp=sharing|http://example.com/]

Discussion Mail:

[http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Enhance-Operator-API-to-Support-Dynamically-Selective-Reading-and-EndOfInput-Event-td26753.html|http://example.com/]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11835) ZooKeeperLeaderElectionITCase#testJobExecutionOnClusterWithLeaderChange failed

2019-03-11 Thread chunpinghe (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16790135#comment-16790135
 ] 

chunpinghe commented on FLINK-11835:


I can't reproduce this bug.

> ZooKeeperLeaderElectionITCase#testJobExecutionOnClusterWithLeaderChange failed
> --
>
> Key: FLINK-11835
> URL: https://issues.apache.org/jira/browse/FLINK-11835
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.8.0
>Reporter: Gary Yao
>Priority: Critical
>  Labels: test-stability
>
> {noformat}
> 20:44:07.264 [ERROR] 
> testJobExecutionOnClusterWithLeaderChange(org.apache.flink.test.runtime.leaderelection.ZooKeeperLeaderElectionITCase)
>   Time elapsed: 4.625 s  <<< ERROR!
> java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find 
> Flink job (2e957dc4f49feaed042eb8b4a7932610)
>   at 
> org.apache.flink.test.runtime.leaderelection.ZooKeeperLeaderElectionITCase.testJobExecutionOnClusterWithLeaderChange(ZooKeeperLeaderElectionITCase.java:152)
> Caused by: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could 
> not find Flink job (2e957dc4f49feaed042eb8b4a7932610)
>   at 
> org.apache.flink.test.runtime.leaderelection.ZooKeeperLeaderElectionITCase.testJobExecutionOnClusterWithLeaderChange(ZooKeeperLeaderElectionITCase.java:149)
> {noformat}
> https://api.travis-ci.org/v3/job/502210892/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Issue Comment Deleted] (FLINK-11835) ZooKeeperLeaderElectionITCase#testJobExecutionOnClusterWithLeaderChange failed

2019-03-11 Thread chunpinghe (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chunpinghe updated FLINK-11835:
---
Comment: was deleted

(was: is it possible that the recoveryOperation hasn't finished, which causes 
the requestJobResult method to throw FlinkJobNotFoundException.

should requestJobResult wait for recoveryOperation to complete?)

> ZooKeeperLeaderElectionITCase#testJobExecutionOnClusterWithLeaderChange failed
> --
>
> Key: FLINK-11835
> URL: https://issues.apache.org/jira/browse/FLINK-11835
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.8.0
>Reporter: Gary Yao
>Priority: Critical
>  Labels: test-stability
>
> {noformat}
> 20:44:07.264 [ERROR] 
> testJobExecutionOnClusterWithLeaderChange(org.apache.flink.test.runtime.leaderelection.ZooKeeperLeaderElectionITCase)
>   Time elapsed: 4.625 s  <<< ERROR!
> java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find 
> Flink job (2e957dc4f49feaed042eb8b4a7932610)
>   at 
> org.apache.flink.test.runtime.leaderelection.ZooKeeperLeaderElectionITCase.testJobExecutionOnClusterWithLeaderChange(ZooKeeperLeaderElectionITCase.java:152)
> Caused by: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could 
> not find Flink job (2e957dc4f49feaed042eb8b4a7932610)
>   at 
> org.apache.flink.test.runtime.leaderelection.ZooKeeperLeaderElectionITCase.testJobExecutionOnClusterWithLeaderChange(ZooKeeperLeaderElectionITCase.java:149)
> {noformat}
> https://api.travis-ci.org/v3/job/502210892/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-11870) Code refactor for some details

2019-03-11 Thread shiwuliang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shiwuliang closed FLINK-11870.
--
Resolution: Incomplete

> Code refactor for some details
> --
>
> Key: FLINK-11870
> URL: https://issues.apache.org/jira/browse/FLINK-11870
> Project: Flink
>  Issue Type: Improvement
>Reporter: shiwuliang
>Priority: Trivial
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Recently, while reading the source code, I found some points that can be 
> optimized and that still do not seem to have been completely addressed, for 
> example:
>  * Use computeIfAbsent
>  * Remove extra calls
> I think I can try to optimize them. This is just a very small improvement.
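
For reference, the {{computeIfAbsent}} idiom mentioned above replaces the usual 
get-check-put sequence (a generic example, not tied to a specific Flink call 
site):

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ComputeIfAbsentExample {
    static void add(Map<String, List<String>> index, String key, String value) {
        // replaces the usual get() + null check + put() sequence
        index.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    public static void main(String[] args) {
        Map<String, List<String>> index = new HashMap<>();
        add(index, "a", "1");
        System.out.println(index); // {a=[1]}
    }
}
{code}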



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] carryxyh closed pull request #7948: [FLINK-11870] Code refactor for some details.

2019-03-11 Thread GitBox
carryxyh closed pull request #7948: [FLINK-11870] Code refactor for some 
details.
URL: https://github.com/apache/flink/pull/7948
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11870) Code refactor for some details

2019-03-11 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11870:
---
Labels: pull-request-available  (was: )

> Code refactor for some details
> --
>
> Key: FLINK-11870
> URL: https://issues.apache.org/jira/browse/FLINK-11870
> Project: Flink
>  Issue Type: Improvement
>Reporter: shiwuliang
>Priority: Trivial
>  Labels: pull-request-available
>
> Recently, while reading the source code, I found some points that can be 
> optimized and that still do not seem to have been completely addressed, for 
> example:
>  * Use computeIfAbsent
>  * Remove extra calls
> I think I can try to optimize them. This is just a very small improvement.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] carryxyh commented on issue #7948: [FLINK-11870] Code refactor for some details.

2019-03-11 Thread GitBox
carryxyh commented on issue #7948: [FLINK-11870] Code refactor for some details.
URL: https://github.com/apache/flink/pull/7948#issuecomment-471820012
 
 
   Got it.
   Thanks for reminding me about this. I'd like to close this first.
   If I find more places to optimize while reading the source code, I will 
resubmit a PR. Thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-8871) Checkpoint cancellation is not propagated to stop checkpointing threads on the task manager

2019-03-11 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16790118#comment-16790118
 ] 

vinoyang commented on FLINK-8871:
-

[~yunta] Your solution sounds good, but it would have to wait for some other 
work to be done first; for more details, see the discussion under FLINK-10966.

> Checkpoint cancellation is not propagated to stop checkpointing threads on 
> the task manager
> ---
>
> Key: FLINK-8871
> URL: https://issues.apache.org/jira/browse/FLINK-8871
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.3.2, 1.4.1, 1.5.0, 1.6.0, 1.7.0
>Reporter: Stefan Richter
>Priority: Critical
>
> Flink currently lacks any form of feedback mechanism from the job manager / 
> checkpoint coordinator to the tasks when it comes to failing a checkpoint. 
> This means that running snapshots on the tasks are also not stopped even if 
> their owning checkpoint is already cancelled. Two examples for cases where 
> this applies are checkpoint timeouts and local checkpoint failures on a task 
> together with a configuration that does not fail tasks on checkpoint failure. 
> Notice that those running snapshots do no longer account for the maximum 
> number of parallel checkpoints, because their owning checkpoint is considered 
> as cancelled.
> Not stopping the task's snapshot thread can lead to a problematic situation 
> where the next checkpoints already started, while the abandoned checkpoint 
> thread from a previous checkpoint is still lingering around running. This 
> scenario can potentially cascade: many parallel checkpoints will slow down 
> checkpointing and make timeouts even more likely.
>  
> A possible solution is introducing a {{cancelCheckpoint}} method  as 
> counterpart to the {{triggerCheckpoint}} method in the task manager gateway, 
> which is invoked by the checkpoint coordinator as part of cancelling the 
> checkpoint.
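
A minimal sketch of that proposal (hypothetical shape; the real gateway methods 
carry more parameters):

{code:java}
/** Hypothetical feedback channel from the checkpoint coordinator to the tasks. */
interface CheckpointingGatewaySketch {
    void triggerCheckpoint(long checkpointId, long timestamp);

    // Proposed counterpart: invoked when a checkpoint times out or is
    // cancelled, so tasks can stop their still-running snapshot threads.
    void cancelCheckpoint(long checkpointId);
}
{code}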



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11874) [checkpoint] Split CheckpointStorage interface to distinguish JM and TM side

2019-03-11 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16790121#comment-16790121
 ] 

vinoyang commented on FLINK-11874:
--

I suggest you remove the "checkpoint" tag from the issue title. As for your 
idea, a written design proposal would make it easier to discuss.

> [checkpoint] Split CheckpointStorage interface to distinguish JM and TM side
> 
>
> Key: FLINK-11874
> URL: https://issues.apache.org/jira/browse/FLINK-11874
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
> Fix For: 1.9.0
>
>
> Currently, the {{CheckpointStorage}} interface mixes the JM and TM sides, which 
> makes it confusing for users and developers to distinguish them.
> Take [FLINK-11696|https://issues.apache.org/jira/browse/FLINK-11696] as an 
> example, the directories should only be created once from JM side. However, 
> since we mixed the JM and TM side, TMs would also create directories again.
> We could let interface {{CheckpointStorage}} have only two methods:
> {code:java}
> CheckpointStreamFactory resolveCheckpointStorageLocation(
>   long checkpointId,
>   CheckpointStorageLocationReference reference)
> CheckpointStateOutputStream createTaskOwnedStateStream()
> {code}
> And a new interface {{CheckpointCoordinatorStorage}} could be introduced, 
> extending {{CheckpointStorage}}, with the methods below:
> {code:java}
> boolean supportsHighlyAvailableStorage()
> boolean hasDefaultSavepointLocation()
> CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer)
> CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId)
> CheckpointStorageLocation initializeLocationForSavepoint(
>   long checkpointId,
>   @Nullable String externalLocationPointer)
> {code}
> With this refactor, the JM would be the only side to cast to 
> {{CheckpointCoordinatorStorage}}, so that we could shield TMs from accidentally 
> calling unexpected methods.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] wujinhu commented on issue #7798: [FLINK-11388][fs] Add Aliyun OSS recoverable writer

2019-03-11 Thread GitBox
wujinhu commented on issue #7798: [FLINK-11388][fs] Add Aliyun OSS recoverable 
writer
URL: https://github.com/apache/flink/pull/7798#issuecomment-471811588
 
 
   @StefanRRichter would you please help review this PR? Thanks :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-11835) ZooKeeperLeaderElectionITCase#testJobExecutionOnClusterWithLeaderChange failed

2019-03-11 Thread chunpinghe (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16789411#comment-16789411
 ] 

chunpinghe edited comment on FLINK-11835 at 3/12/19 1:14 AM:
-

Is it possible that the recoveryOperation hasn't finished, which causes the 
requestJobResult method to throw FlinkJobNotFoundException?

Should requestJobResult wait for recoveryOperation to complete?

 


was (Author: moxian):
is it possible that the recoveryOperation wasn't  finished  which causes 
requestJobResult method to  throw FlinkJobNotFoundException.

requestJobResult should wait recoveryOperation complete ?

 

> ZooKeeperLeaderElectionITCase#testJobExecutionOnClusterWithLeaderChange failed
> --
>
> Key: FLINK-11835
> URL: https://issues.apache.org/jira/browse/FLINK-11835
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.8.0
>Reporter: Gary Yao
>Priority: Critical
>  Labels: test-stability
>
> {noformat}
> 20:44:07.264 [ERROR] 
> testJobExecutionOnClusterWithLeaderChange(org.apache.flink.test.runtime.leaderelection.ZooKeeperLeaderElectionITCase)
>   Time elapsed: 4.625 s  <<< ERROR!
> java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find 
> Flink job (2e957dc4f49feaed042eb8b4a7932610)
>   at 
> org.apache.flink.test.runtime.leaderelection.ZooKeeperLeaderElectionITCase.testJobExecutionOnClusterWithLeaderChange(ZooKeeperLeaderElectionITCase.java:152)
> Caused by: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could 
> not find Flink job (2e957dc4f49feaed042eb8b4a7932610)
>   at 
> org.apache.flink.test.runtime.leaderelection.ZooKeeperLeaderElectionITCase.testJobExecutionOnClusterWithLeaderChange(ZooKeeperLeaderElectionITCase.java:149)
> {noformat}
> https://api.travis-ci.org/v3/job/502210892/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] igalshilman commented on issue #7957: [FLINK-11865]

2019-03-11 Thread GitBox
igalshilman commented on issue #7957: [FLINK-11865]
URL: https://github.com/apache/flink/pull/7957#issuecomment-471774248
 
 
   @flinkbot attention @aljoscha 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11420) Serialization of case classes containing a Map[String, Any] sometimes throws ArrayIndexOutOfBoundsException

2019-03-11 Thread JIRA


[ 
https://issues.apache.org/jira/browse/FLINK-11420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16790033#comment-16790033
 ] 

Jürgen Kreileder commented on FLINK-11420:
--

[~dawidwys], looks good so far on 1.7.3. Thanks!

 

> Serialization of case classes containing a Map[String, Any] sometimes throws 
> ArrayIndexOutOfBoundsException
> ---
>
> Key: FLINK-11420
> URL: https://issues.apache.org/jira/browse/FLINK-11420
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.7.1
>Reporter: Jürgen Kreileder
>Assignee: Dawid Wysakowicz
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.3, 1.8.0
>
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> We frequently run into random ArrayIndexOutOfBoundsExceptions when Flink 
> tries to serialize Scala case classes containing a Map[String, Any] (Any 
> being String, Long, Int, or Boolean) with the FsStateBackend. (This probably 
> happens with any case class containing a type requiring Kryo, see this thread 
> for instance: 
> [http://mail-archives.apache.org/mod_mbox/flink-user/201710.mbox/%3cCANNGFpjX4gjV=df6tlfeojsb_rhwxs_ruoylqcqv2gvwqtt...@mail.gmail.com%3e])
> Disabling asynchronous snapshots seems to work around the problem, so maybe 
> something is not thread-safe in CaseClassSerializer.
> Our objects look like this:
> {code}
> case class Event(timestamp: Long, [...], content: Map[String, Any]
> case class EnrichedEvent(event: Event, additionalInfo: Map[String, Any])
> {code}
> I've looked at a few of the exceptions in a debugger. It always happens when 
> serializing the right-hand side of a tuple from EnrichedEvent -> Event -> 
> content, e.g. 13 from ("foo", 13) or false from ("bar", false).
> Stacktrace:
> {code:java}
> java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 0
>  at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>  at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>  at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
>  at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217)
>  at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>  at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>  at 
> org.apache.flink.api.scala.typeutils.TraversableSerializer.$anonfun$copy$1(TraversableSerializer.scala:69)
>  at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:234)
>  at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:465)
>  at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:465)
>  at 
> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
>  at 
> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
>  at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>  at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>  at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>  at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>  at 
> org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:99)
>  at 
> org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:42)
>  at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
>  at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
>  at 
> org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:95)
>  at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:391)
>  at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>  at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>  at java.base/java.lang.Thread.run(Thread.java:834){code}
>  
>  
>  
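
If the root cause is indeed a stateful serializer shared across threads during 
asynchronous snapshots, the usual remedy is to give each thread its own 
duplicate. A minimal sketch using Flink's {{TypeSerializer#duplicate()}} 
(illustrative only, not a confirmed fix for this ticket):

{code:java}
import org.apache.flink.api.common.typeutils.TypeSerializer;

class PerThreadSerializerSketch {
    // Each concurrent thread works on its own duplicate of the (possibly
    // stateful) serializer instead of sharing a single instance.
    static <T> T copySafely(TypeSerializer<T> shared, T record) {
        TypeSerializer<T> own = shared.duplicate(); // returns `this` for stateless serializers
        return own.copy(record);
    }
}
{code}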



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #7957: [FLINK-11865]

2019-03-11 Thread GitBox
flinkbot commented on issue #7957: [FLINK-11865]
URL: https://github.com/apache/flink/pull/7957#issuecomment-471773038
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
   
   ## Bot commands
   The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] igalshilman opened a new pull request #7957: [FLINK-11865]

2019-03-11 Thread GitBox
igalshilman opened a new pull request #7957: [FLINK-11865]
URL: https://github.com/apache/flink/pull/7957
 
 
   
   
   ## What is the purpose of the change
   
   The `TraversableSerializer` generates, at runtime, code that evaluates 
to an instance of a `CanBuildFrom`. This operation makes job 
submission/translation prohibitively slow, so the result must be cached.
   This PR adds such a cache to the `TraversableSerializer`.
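   
   A minimal sketch of the caching idea (illustrative, not the PR's actual code; 
it assumes the generated instance can be keyed by its source string and that a 
compile function is supplied):
   
   ```java
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.function.Function;

   class GeneratedCodeCacheSketch {
       private static final ConcurrentHashMap<String, Object> CACHE = new ConcurrentHashMap<>();

       // Compile on first use only; subsequent job translations reuse the instance.
       static Object instanceFor(String code, Function<String, Object> compile) {
           return CACHE.computeIfAbsent(code, compile);
       }
   }
   ```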
   
   ## Brief change log
   
 - da624a47ae Minor refactoring
 - 03f18df5ed Add and use the cache
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
`TraversableSerializerSnapshotMigrationTest`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (**yes** / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11865) Code generation in TraversableSerializer is prohibitively slow

2019-03-11 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11865:
---
Labels: pull-request-available  (was: )

> Code generation in TraversableSerializer is prohibitively slow
> --
>
> Key: FLINK-11865
> URL: https://issues.apache.org/jira/browse/FLINK-11865
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Reporter: Aljoscha Krettek
>Assignee: Igal Shilman
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> As discussed in FLINK-11539, the new code generation makes job 
> submissions/translation prohibitively slow.
> The solution should be to introduce a cache for the generated code.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-11865) Code generation in TraversableSerializer is prohibitively slow

2019-03-11 Thread Igal Shilman (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igal Shilman reassigned FLINK-11865:


Assignee: Igal Shilman

> Code generation in TraversableSerializer is prohibitively slow
> --
>
> Key: FLINK-11865
> URL: https://issues.apache.org/jira/browse/FLINK-11865
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Reporter: Aljoscha Krettek
>Assignee: Igal Shilman
>Priority: Critical
> Fix For: 1.8.0
>
>
> As discussed in FLINK-11539, the new code generation makes job 
> submissions/translation prohibitively slow.
> The solution should be to introduce a cache for the generated code.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11680) Make query plan easier to read

2019-03-11 Thread Haisheng Yuan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16789911#comment-16789911
 ] 

Haisheng Yuan commented on FLINK-11680:
---

Thanks, this is a good one. We can move this issue to Calcite, so that all 
Calcite-based systems can benefit from this feature.

> Make query plan easier to read
> --
>
> Key: FLINK-11680
> URL: https://issues.apache.org/jira/browse/FLINK-11680
> Project: Flink
>  Issue Type: New Feature
>  Components: SQL / Planner
>Reporter: godfrey he
>Assignee: godfrey he
>Priority: Major
> Attachments: image-2019-02-20-20-05-06-506.png, screenshot-1.png
>
>
>  The query plan generated by RelOptUtil#toString is hard to read, especially 
> the query is very complex(multiple joins or unions). 
> There is a query plan of tpcds q25.sql generated by RelOptUtil#toString:
>  !image-2019-02-20-20-05-06-506.png! 
> We can improve the utility method to make the query plan more readable, like:
>  !screenshot-1.png! 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7235) Backport CALCITE-1884 to the Flink repository before Calcite 1.14

2019-03-11 Thread Xiening Dai (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16789878#comment-16789878
 ] 

Xiening Dai commented on FLINK-7235:


I believe this one has been done by commit d5770fe8dd1. We should just close 
this.

> Backport CALCITE-1884 to the Flink repository before Calcite 1.14
> -
>
> Key: FLINK-7235
> URL: https://issues.apache.org/jira/browse/FLINK-7235
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Table SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>Priority: Major
>
> We need to backport CALCITE-1884 in order to unblock upgrading Calcite to 
> 1.13.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] tweise commented on a change in pull request #7896: [FLINK-9007] [kinesis] [e2e] Add Kinesis end-to-end test

2019-03-11 Thread GitBox
tweise commented on a change in pull request #7896: [FLINK-9007] [kinesis] 
[e2e] Add Kinesis end-to-end test
URL: https://github.com/apache/flink/pull/7896#discussion_r264372875
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExampleTest.java
 ##
 @@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.kinesis.test;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.java.utils.ParameterTool;
+
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Test driver for {@link KinesisExample#main}.
+ */
+public class KinesisExampleTest {
+   private static final Logger LOG = 
LoggerFactory.getLogger(KinesisExampleTest.class);
+
+   /**
+* Interface to the pubsub system for this test.
+*/
+   interface PubsubClient {
+   void createTopic(String topic, int partitions, Properties 
props) throws Exception;
+
+   void sendMessage(String topic, String msg);
+
+   List<String> readAllMessages(String streamName) throws 
Exception;
+   }
+
+   public static void main(String[] args) throws Exception {
+   LOG.info("System properties: {}", System.getProperties());
+   final ParameterTool parameterTool = 
ParameterTool.fromArgs(args);
+
+   String inputStream = parameterTool.getRequired("input-stream");
+   String outputStream = 
parameterTool.getRequired("output-stream");
+
+   PubsubClient pubsub = new 
KinesisPubsubClient(parameterTool.getProperties());
+   pubsub.createTopic(inputStream, 2, 
parameterTool.getProperties());
+   pubsub.createTopic(outputStream, 2, 
parameterTool.getProperties());
+
+   // job and test driver need to run in parallel
+   final AtomicReference<Exception> executeException = new 
AtomicReference<>();
+   Thread executeThread =
+   new Thread(
+   () -> {
+   try {
+   KinesisExample.main(args);
+   LOG.info("executed program");
+   } catch (Exception e) {
+   executeException.set(e);
+   }
+   });
+   executeThread.start();
 
 Review comment:
   The thread that runs the job does not terminate (I don't have a reference to 
the job and cannot cancel it). The job needs to start after the streams are 
created and run in parallel to the validation logic. Once the expected results 
are in, the driver exits and the job/cluster is terminated from the script, 
prior to terminating Kinesalite.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Myasuka commented on a change in pull request #7942: [FLINK-11696][checkpoint] Avoid to send mkdir requests to DFS from task side

2019-03-11 Thread GitBox
Myasuka commented on a change in pull request #7942: [FLINK-11696][checkpoint] 
Avoid to send mkdir requests to DFS from task side
URL: https://github.com/apache/flink/pull/7942#discussion_r264365071
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
 ##
 @@ -107,6 +105,13 @@ public CheckpointStorageLocation 
initializeLocationForCheckpoint(long checkpoint
// prepare all the paths needed for the checkpoints
final Path checkpointDir = 
createCheckpointDirectory(checkpointsDirectory, checkpointId);
 
+   if (!areDirectoriesCreated) {
+   fileSystem.mkdirs(checkpointsDirectory);
+   fileSystem.mkdirs(sharedStateDirectory);
 
 Review comment:
   Sounds good and thanks for your review and suggestions, already created the 
related JIRA [FLINK-11874](https://issues.apache.org/jira/browse/FLINK-11874)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-11874) [checkpoint] Split CheckpointStorage interface to distinguish JM and TM side

2019-03-11 Thread Yun Tang (JIRA)
Yun Tang created FLINK-11874:


 Summary: [checkpoint] Split CheckpointStorage interface to 
distinguish JM and TM side
 Key: FLINK-11874
 URL: https://issues.apache.org/jira/browse/FLINK-11874
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Reporter: Yun Tang
Assignee: Yun Tang
 Fix For: 1.9.0


Currently, the {{CheckpointStorage}} interface mixes the JM and TM sides, which 
makes it confusing for users and developers to distinguish them.

Take [FLINK-11696|https://issues.apache.org/jira/browse/FLINK-11696] as an 
example, the directories should only be created once from JM side. However, 
since we mixed the JM and TM side, TMs would also create directories again.

We could let interface {{CheckpointStorage}} have only two methods:

{code:java}
CheckpointStreamFactory resolveCheckpointStorageLocation(
long checkpointId,
CheckpointStorageLocationReference reference)

CheckpointStateOutputStream createTaskOwnedStateStream()
{code}

And a new interface {{CheckpointCoordinatorStorage}} could be introduced, 
extending {{CheckpointStorage}}, with the methods below:

{code:java}
boolean supportsHighlyAvailableStorage()

boolean hasDefaultSavepointLocation()

CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer)

CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId)

CheckpointStorageLocation initializeLocationForSavepoint(
long checkpointId,
@Nullable String externalLocationPointer)
{code}

With this refactor, the JM would be the only side to cast to 
{{CheckpointCoordinatorStorage}}, so that we could shield TMs from accidentally 
calling unexpected methods.
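
A sketch of the intended JM-side usage after the split (assuming the corrected 
spelling {{CheckpointCoordinatorStorage}} and the existing 
{{StateBackend#createCheckpointStorage(JobID)}}; illustrative only):

{code:java}
// JM side only: cast once to the coordinator interface; TMs keep working
// against the narrow CheckpointStorage and cannot reach these methods.
CheckpointStorage storage = stateBackend.createCheckpointStorage(jobId);
CheckpointCoordinatorStorage coordinatorStorage = (CheckpointCoordinatorStorage) storage;
CheckpointStorageLocation location =
    coordinatorStorage.initializeLocationForCheckpoint(checkpointId);
{code}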



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] StefanRRichter commented on a change in pull request #7942: [FLINK-11696][checkpoint] Avoid to send mkdir requests to DFS from task side

2019-03-11 Thread GitBox
StefanRRichter commented on a change in pull request #7942: 
[FLINK-11696][checkpoint] Avoid to send mkdir requests to DFS from task side
URL: https://github.com/apache/flink/pull/7942#discussion_r264356853
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
 ##
 @@ -107,6 +105,13 @@ public CheckpointStorageLocation 
initializeLocationForCheckpoint(long checkpoint
// prepare all the paths needed for the checkpoints
final Path checkpointDir = 
createCheckpointDirectory(checkpointsDirectory, checkpointId);
 
+   if (!areDirectoriesCreated) {
+   fileSystem.mkdirs(checkpointsDirectory);
+   fileSystem.mkdirs(sharedStateDirectory);
 
 Review comment:
   Yes, that sounds like a good split and would exactly help with that problem.
   The best way to handle this PR-wise would be:
   - create a JIRA for splitting the interface into something like a JM side 
and a TM side.
   - open PR for that change.
   - have this PR rebased, based on the interface split-PR.
   
   We should keep the cost/benefit ratio in mind. If you think rebasing would 
take you a lot of time (I think it should not) then just do it in this PR. 
Otherwise, I suggest the above strategy.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Myasuka commented on a change in pull request #7942: [FLINK-11696][checkpoint] Avoid to send mkdir requests to DFS from task side

2019-03-11 Thread GitBox
Myasuka commented on a change in pull request #7942: [FLINK-11696][checkpoint] 
Avoid to send mkdir requests to DFS from task side
URL: https://github.com/apache/flink/pull/7942#discussion_r264354794
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
 ##
 @@ -107,6 +105,13 @@ public CheckpointStorageLocation 
initializeLocationForCheckpoint(long checkpoint
// prepare all the paths needed for the checkpoints
final Path checkpointDir = 
createCheckpointDirectory(checkpointsDirectory, checkpointId);
 
+   if (!areDirectoriesCreated) {
+   fileSystem.mkdirs(checkpointsDirectory);
+   fileSystem.mkdirs(sharedStateDirectory);
 
 Review comment:
   I think a new interface `CheckpointCoordinatorStorage` extending 
`CheckpointStorage` could be introduced, with the 
`#supportsHighlyAvailableStorage`, `#hasDefaultSavepointLocation`, 
`#resolveCheckpoint`, `#initializeLocationForCheckpoint` and 
`#initializeLocationForSavepoint` methods, while the previous 
`CheckpointStorage` would only keep two methods: 
`#resolveCheckpointStorageLocation` and `#createTaskOwnedStateStream`.
   
   On the JM side, we could cast the created storage to 
`CheckpointCoordinatorStorage`, so that we have those methods available there, 
while the TM side cannot call them accidentally.
   
   I think this would make it clearer for users to distinguish checkpoint 
storage on the different sides. However, this is more of an interface refactor, 
which contains more than just "avoid sending mkdir requests from the task side". 
Shall we also include this refactor in this PR?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StefanRRichter commented on a change in pull request #7896: [FLINK-9007] [kinesis] [e2e] Add Kinesis end-to-end test

2019-03-11 Thread GitBox
StefanRRichter commented on a change in pull request #7896: [FLINK-9007] 
[kinesis] [e2e] Add Kinesis end-to-end test
URL: https://github.com/apache/flink/pull/7896#discussion_r264353858
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisPubsubClient.java
 ##
 @@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.kinesis.test;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.kinesis.shaded.com.amazonaws.AmazonClientException;
+import 
org.apache.flink.kinesis.shaded.com.amazonaws.auth.AWSCredentialsProvider;
+import 
org.apache.flink.kinesis.shaded.com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
+import 
org.apache.flink.kinesis.shaded.com.amazonaws.client.builder.AwsClientBuilder;
+import 
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesis;
+import 
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
+import 
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.GetRecordsResult;
+import 
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.PutRecordRequest;
+import 
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.PutRecordResult;
+import 
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.Record;
+import 
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.ResourceNotFoundException;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+class KinesisPubsubClient implements KinesisExampleTest.PubsubClient {
+   private static final Logger LOG = 
LoggerFactory.getLogger(KinesisPubsubClient.class);
+
+   private final AmazonKinesis kinesisClient;
+   private final Properties properties;
+
+   KinesisPubsubClient(Properties properties) {
+   this.kinesisClient = createClientWithCredentials(properties);
+   this.properties = properties;
+   }
+
+   @Override
+   public void createTopic(String stream, int shards, Properties props) 
throws Exception {
+   try {
+   kinesisClient.describeStream(stream);
+   kinesisClient.deleteStream(stream);
+   } catch (ResourceNotFoundException rnfe) {
+   // swallow
 
 Review comment:
   Wouldn't it be more helpful to somebody running the test if problems appeared 
in the log, as a starting point for debugging?
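   
   As a sketch, reusing the fields from the diff above (this is just one way to 
keep the information without failing the test):
   
```java
// Sketch: record the expected miss instead of swallowing it silently,
// so a test run leaves a trace to start debugging from.
try {
    kinesisClient.describeStream(stream);
    kinesisClient.deleteStream(stream);
} catch (ResourceNotFoundException rnfe) {
    // Expected on a fresh environment; still log it for visibility.
    LOG.info("Stream {} did not exist yet, nothing to delete", stream, rnfe);
}
```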


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StefanRRichter commented on a change in pull request #7896: [FLINK-9007] [kinesis] [e2e] Add Kinesis end-to-end test

2019-03-11 Thread GitBox
StefanRRichter commented on a change in pull request #7896: [FLINK-9007] 
[kinesis] [e2e] Add Kinesis end-to-end test
URL: https://github.com/apache/flink/pull/7896#discussion_r264353408
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExampleTest.java
 ##
 @@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.kinesis.test;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.java.utils.ParameterTool;
+
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Test driver for {@link KinesisExample#main}.
+ */
+public class KinesisExampleTest {
+   private static final Logger LOG = 
LoggerFactory.getLogger(KinesisExampleTest.class);
+
+   /**
+* Interface to the pubsub system for this test.
+*/
+   interface PubsubClient {
+   void createTopic(String topic, int partitions, Properties 
props) throws Exception;
+
+   void sendMessage(String topic, String msg);
+
+   List readAllMessages(String streamName) throws 
Exception;
+   }
+
+   public static void main(String[] args) throws Exception {
+   LOG.info("System properties: {}", System.getProperties());
+   final ParameterTool parameterTool = 
ParameterTool.fromArgs(args);
+
+   String inputStream = parameterTool.getRequired("input-stream");
+   String outputStream = 
parameterTool.getRequired("output-stream");
+
+   PubsubClient pubsub = new 
KinesisPubsubClient(parameterTool.getProperties());
+   pubsub.createTopic(inputStream, 2, 
parameterTool.getProperties());
+   pubsub.createTopic(outputStream, 2, 
parameterTool.getProperties());
+
+   // job and test driver need to run in parallel
+   final AtomicReference executeException = new 
AtomicReference<>();
+   Thread executeThread =
+   new Thread(
+   () -> {
+   try {
+   KinesisExample.main(args);
+   LOG.info("executed program");
+   } catch (Exception e) {
+   executeException.set(e);
+   }
+   });
+   executeThread.start();
 
 Review comment:
   If it is possible before the loop, we can also check the exception status 
just once right there.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StefanRRichter commented on a change in pull request #7896: [FLINK-9007] [kinesis] [e2e] Add Kinesis end-to-end test

2019-03-11 Thread GitBox
StefanRRichter commented on a change in pull request #7896: [FLINK-9007] 
[kinesis] [e2e] Add Kinesis end-to-end test
URL: https://github.com/apache/flink/pull/7896#discussion_r264353073
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExample.java
 ##
 @@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.kinesis.test;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.kafka.test.base.CustomWatermarkExtractor;
+import org.apache.flink.streaming.kafka.test.base.KafkaEvent;
+import org.apache.flink.streaming.kafka.test.base.KafkaEventSchema;
+import org.apache.flink.streaming.kafka.test.base.KafkaExampleUtil;
+import org.apache.flink.streaming.kafka.test.base.RollingAdditionMapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A simple example that shows how to read from and write to Kinesis. This 
will read String messages
+ * from the input topic, parse them into a POJO type {@link KafkaEvent}, group 
by some key, and finally
+ * perform a rolling addition on each key for which the results are written 
back to another topic.
+ *
+ * This example also demonstrates using a watermark assigner to generate 
per-partition
+ * watermarks directly in the Flink Kinesis consumer. For demonstration 
purposes, it is assumed that
+ * the String messages formatted as a (word,frequency,timestamp) tuple.
+ *
+ * Example usage:
+ * --input-topic test-input --output-topic test-output --bootstrap.servers 
localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer
+ */
+public class KinesisExample {
+   private static final Logger LOG = 
LoggerFactory.getLogger(KinesisExample.class);
+
+   /**
+* Interface to the pubsub system for this test.
+*/
+   interface PubsubClient {
+   void createTopic(String topic, int partitions, Properties 
props) throws Exception;
+
+   void sendMessage(String topic, String msg);
+
+   List readAllMessages(String streamName) throws 
Exception;
+   }
+
+   public static void main(String[] args) throws Exception {
+   LOG.info("System properties: {}", System.getProperties());
+   // parse input arguments
+   final ParameterTool parameterTool = 
ParameterTool.fromArgs(args);
+   StreamExecutionEnvironment env = 
KafkaExampleUtil.prepareExecutionEnv(parameterTool);
+
+   String inputStream = parameterTool.getRequired("input-stream");
+   String outputStream = 
parameterTool.getRequired("output-stream");
+
+   PubsubClient pubsub = new 
KinesisPubsubClient(parameterTool.getProperties());
+   pubsub.createTopic(inputStream, 2, 
parameterTool.getProperties());
+   pubsub.createTopic(outputStream, 2, 
parameterTool.getProperties());
+
+   FlinkKinesisConsumer consumer = new 
FlinkKinesisConsumer<>(
+   inputStream,
+   new KafkaEventSchema(),
+   parameterTool.getProperties());
+   consumer.setPeriodicWatermarkAssigner(new 
CustomWatermarkExtractor());
+
+   DataStream input = env
+   .addSource(consumer)
+   .keyBy("word")
+   .map(new RollingAdditionMapper());
+
+   Properties producerProperties = new 
Properties(parameterTool.getProperties());
+   // needs region event when URL is specified
+   

[GitHub] [flink] StefanRRichter commented on a change in pull request #7896: [FLINK-9007] [kinesis] [e2e] Add Kinesis end-to-end test

2019-03-11 Thread GitBox
StefanRRichter commented on a change in pull request #7896: [FLINK-9007] 
[kinesis] [e2e] Add Kinesis end-to-end test
URL: https://github.com/apache/flink/pull/7896#discussion_r264350717
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExampleTest.java
 ##
 @@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.kinesis.test;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.java.utils.ParameterTool;
+
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Test driver for {@link KinesisExample#main}.
+ */
+public class KinesisExampleTest {
+   private static final Logger LOG = 
LoggerFactory.getLogger(KinesisExampleTest.class);
+
+   /**
+* Interface to the pubsub system for this test.
+*/
+   interface PubsubClient {
+   void createTopic(String topic, int partitions, Properties 
props) throws Exception;
+
+   void sendMessage(String topic, String msg);
+
+   List readAllMessages(String streamName) throws 
Exception;
+   }
+
+   public static void main(String[] args) throws Exception {
+   LOG.info("System properties: {}", System.getProperties());
+   final ParameterTool parameterTool = 
ParameterTool.fromArgs(args);
+
+   String inputStream = parameterTool.getRequired("input-stream");
+   String outputStream = 
parameterTool.getRequired("output-stream");
+
+   PubsubClient pubsub = new 
KinesisPubsubClient(parameterTool.getProperties());
+   pubsub.createTopic(inputStream, 2, 
parameterTool.getProperties());
+   pubsub.createTopic(outputStream, 2, 
parameterTool.getProperties());
+
+   // job and test driver need to run in parallel
+   final AtomicReference executeException = new 
AtomicReference<>();
+   Thread executeThread =
+   new Thread(
+   () -> {
+   try {
+   KinesisExample.main(args);
+   LOG.info("executed program");
+   } catch (Exception e) {
+   executeException.set(e);
+   }
+   });
+   executeThread.start();
 
 Review comment:
   I would suggest to also join on this thread somewhere later, maybe before we 
enter the readMessages part (assuming the generator will not block, if it does 
join can still happen somewhere later).
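   
   A sketch of what that could look like (the five-minute bound is an arbitrary 
assumption for illustration, and `executeException` is assumed to be an 
`AtomicReference<Exception>`):
   
```java
// Sketch: bound the wait for the job thread and surface its failure.
executeThread.join(Duration.ofMinutes(5).toMillis());

Exception failure = executeException.get();
if (failure != null) {
    throw new AssertionError("Flink job failed", failure);
}
```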


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tweise commented on issue #7896: [FLINK-9007] [kinesis] [e2e] Add Kinesis end-to-end test

2019-03-11 Thread GitBox
tweise commented on issue #7896: [FLINK-9007] [kinesis] [e2e] Add Kinesis 
end-to-end test
URL: https://github.com/apache/flink/pull/7896#issuecomment-471639057
 
 
   @StefanRRichter @tillrohrmann PTAL


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-8871) Checkpoint cancellation is not propagated to stop checkpointing threads on the task manager

2019-03-11 Thread Yun Tang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16789755#comment-16789755
 ] 

Yun Tang commented on FLINK-8871:
-

We have already introduced {{notifyCheckpointAbort}}, which is analogous to the 
current {{notifyCheckpointComplete}}, in our internal fork of Flink. What's more, 
we also introduced a mechanism to cancel checkpoints in {{StreamTask}}. I think 
with the two mechanisms above, we could at least fix this issue.

I plan to add a document with more details soon.
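
A hypothetical shape for the task-side notification (sketch only; the internal 
implementation mentioned above is not public, and the name simply mirrors the 
existing {{CheckpointListener#notifyCheckpointComplete}}):

{code:java}
// Illustrative counterpart to CheckpointListener#notifyCheckpointComplete.
public interface CheckpointAbortListener {

    /**
     * Invoked on the task side once the coordinator declares checkpoint
     * {@code checkpointId} cancelled, so that running snapshot threads can
     * stop early instead of lingering until completion.
     */
    void notifyCheckpointAbort(long checkpointId) throws Exception;
}
{code}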

> Checkpoint cancellation is not propagated to stop checkpointing threads on 
> the task manager
> ---
>
> Key: FLINK-8871
> URL: https://issues.apache.org/jira/browse/FLINK-8871
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.3.2, 1.4.1, 1.5.0, 1.6.0, 1.7.0
>Reporter: Stefan Richter
>Priority: Critical
>
> Flink currently lacks any form of feedback mechanism from the job manager / 
> checkpoint coordinator to the tasks when it comes to failing a checkpoint. 
> This means that running snapshots on the tasks are also not stopped even if 
> their owning checkpoint is already cancelled. Two examples for cases where 
> this applies are checkpoint timeouts and local checkpoint failures on a task 
> together with a configuration that does not fail tasks on checkpoint failure. 
> Notice that those running snapshots do no longer account for the maximum 
> number of parallel checkpoints, because their owning checkpoint is considered 
> as cancelled.
> Not stopping the task's snapshot thread can lead to a problematic situation 
> where the next checkpoints already started, while the abandoned checkpoint 
> thread from a previous checkpoint is still lingering around running. This 
> scenario can potentially cascade: many parallel checkpoints will slow down 
> checkpointing and make timeouts even more likely.
>  
> A possible solution is introducing a {{cancelCheckpoint}} method  as 
> counterpart to the {{triggerCheckpoint}} method in the task manager gateway, 
> which is invoked by the checkpoint coordinator as part of cancelling the 
> checkpoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] StefanRRichter commented on a change in pull request #7942: [FLINK-11696][checkpoint] Avoid to send mkdir requests to DFS from task side

2019-03-11 Thread GitBox
StefanRRichter commented on a change in pull request #7942: 
[FLINK-11696][checkpoint] Avoid to send mkdir requests to DFS from task side
URL: https://github.com/apache/flink/pull/7942#discussion_r264335302
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
 ##
 @@ -107,6 +105,13 @@ public CheckpointStorageLocation 
initializeLocationForCheckpoint(long checkpoint
// prepare all the paths needed for the checkpoints
final Path checkpointDir = 
createCheckpointDirectory(checkpointsDirectory, checkpointId);
 
+   if (!areDirectoriesCreated) {
+   fileSystem.mkdirs(checkpointsDirectory);
+   fileSystem.mkdirs(sharedStateDirectory);
 
 Review comment:
   Furthermore, I think we could also consider splitting the interface into one 
that is JM-side, owned by the checkpoint coordinator and has the init lifecycle 
methods, and one that only has the methods relevant for the TM. So the JM-side 
interface extends the TM-side interface, and this could shield TMs from 
accidentally calling init methods. Would that make sense?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StefanRRichter commented on a change in pull request #7942: [FLINK-11696][checkpoint] Avoid to send mkdir requests to DFS from task side

2019-03-11 Thread GitBox
StefanRRichter commented on a change in pull request #7942: 
[FLINK-11696][checkpoint] Avoid to send mkdir requests to DFS from task side
URL: https://github.com/apache/flink/pull/7942#discussion_r264335302
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
 ##
 @@ -107,6 +105,13 @@ public CheckpointStorageLocation 
initializeLocationForCheckpoint(long checkpoint
// prepare all the paths needed for the checkpoints
final Path checkpointDir = 
createCheckpointDirectory(checkpointsDirectory, checkpointId);
 
+   if (!areDirectoriesCreated) {
+   fileSystem.mkdirs(checkpointsDirectory);
+   fileSystem.mkdirs(sharedStateDirectory);
 
 Review comment:
   Furthermore, I think we could also consider splitting the interface into one 
that is JM-side, owned by the checkpoint coordinator and has the init lifecycle 
methods, and one that only has the methods relevant for the TM. Implementations 
inherit both, but this could shield TMs from accidentally calling init methods. 
Would that make sense?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StefanRRichter commented on a change in pull request #7942: [FLINK-11696][checkpoint] Avoid to send mkdir requests to DFS from task side

2019-03-11 Thread GitBox
StefanRRichter commented on a change in pull request #7942: 
[FLINK-11696][checkpoint] Avoid to send mkdir requests to DFS from task side
URL: https://github.com/apache/flink/pull/7942#discussion_r264333733
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
 ##
 @@ -107,6 +105,13 @@ public CheckpointStorageLocation 
initializeLocationForCheckpoint(long checkpoint
// prepare all the paths needed for the checkpoints
final Path checkpointDir = 
createCheckpointDirectory(checkpointsDirectory, checkpointId);
 
+   if (!areDirectoriesCreated) {
+   fileSystem.mkdirs(checkpointsDirectory);
+   fileSystem.mkdirs(sharedStateDirectory);
 
 Review comment:
   I think it would be reasonable to add a new method. As for the names, maybe 
something like `initializeLocation` and `initializeLocationForCheckpoint` -- 
meaning (as we can explain in the docs): one method that initializes the 
location _in general_, and one method that initializes things for a specific 
checkpoint x, like a subfolder. So shared directories etc. can go to the 
general init, and the checkpoint-specific folder goes to the existing method. 
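   
   A sketch of that split, with the tentative names from above (the types are 
simplified stand-ins for illustration):
   
```java
import java.io.IOException;

// Sketch only: tentative split between one-time and per-checkpoint init.
interface CheckpointStorageLocation {}

interface CheckpointStorageSketch {

    /** One-time init: shared / task-owned directories and the like. */
    void initializeLocation() throws IOException;

    /** Per-checkpoint init: creates e.g. the chk-<id> subfolder. */
    CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId)
        throws IOException;
}
```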


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StefanRRichter commented on a change in pull request #7942: [FLINK-11696][checkpoint] Avoid to send mkdir requests to DFS from task side

2019-03-11 Thread GitBox
StefanRRichter commented on a change in pull request #7942: 
[FLINK-11696][checkpoint] Avoid to send mkdir requests to DFS from task side
URL: https://github.com/apache/flink/pull/7942#discussion_r264333733
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
 ##
 @@ -107,6 +105,13 @@ public CheckpointStorageLocation 
initializeLocationForCheckpoint(long checkpoint
// prepare all the paths needed for the checkpoints
final Path checkpointDir = 
createCheckpointDirectory(checkpointsDirectory, checkpointId);
 
+   if (!areDirectoriesCreated) {
+   fileSystem.mkdirs(checkpointsDirectory);
+   fileSystem.mkdirs(sharedStateDirectory);
 
 Review comment:
   I think it would be reasonable to add a new method. As for the names, maybe 
something like `initializeLocation` and `initializeLocationForCheckpoint` -- 
meaning (as we can explain in the docs): one method that initializes the 
location _in general_, and one method that initializes things for a specific 
checkpoint x. So shared directories etc. can go to the general init, and the 
checkpoint-specific folder goes to the existing method. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11873) Flink Document has an error

2019-03-11 Thread Dawid Wysakowicz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16789757#comment-16789757
 ] 

Dawid Wysakowicz commented on FLINK-11873:
--

This is because the release branch for 1.8 has already been cut. Therefore the 
version on master is already 1.9-SNAPSHOT, while at the same time 1.8 has not 
yet been released. That's why the latest stable version is 1.7, whereas the 
current snapshot is already 1.9.

I think this does make sense during the transition period, therefore I will 
close this issue, but feel free to reopen it if you disagree.

> Flink Document has an error
> ---
>
> Key: FLINK-11873
> URL: https://issues.apache.org/jira/browse/FLINK-11873
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: AT-Fieldless
>Priority: Minor
>  Labels: documentation
>
> The Documentation has a drop-down box with two options. When I click the 
> 1.8 (Snapshot) option, the new page shows its version as 1.9-SNAPSHOT.
> I think it might be an error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-11873) Flink Document has an error

2019-03-11 Thread Dawid Wysakowicz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz closed FLINK-11873.

Resolution: Won't Fix

> Flink Document has an error
> ---
>
> Key: FLINK-11873
> URL: https://issues.apache.org/jira/browse/FLINK-11873
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: AT-Fieldless
>Priority: Minor
>  Labels: documentation
>
> The Documentation has a drop-down box with two options. When I click the 
> 1.8 (Snapshot) option, the new page shows its version as 1.9-SNAPSHOT.
> I think it might be an error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] StefanRRichter merged pull request #7943: [FLINK-11861][tests] Fix JobMasterTriggerSavepointIT not executed

2019-03-11 Thread GitBox
StefanRRichter merged pull request #7943: [FLINK-11861][tests] Fix 
JobMasterTriggerSavepointIT not executed
URL: https://github.com/apache/flink/pull/7943
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StefanRRichter commented on issue #7943: [FLINK-11861][tests] Fix JobMasterTriggerSavepointIT not executed

2019-03-11 Thread GitBox
StefanRRichter commented on issue #7943: [FLINK-11861][tests] Fix 
JobMasterTriggerSavepointIT not executed
URL: https://github.com/apache/flink/pull/7943#issuecomment-471625300
 
 
   Thanks for fixing this. LGTM. Merging.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Myasuka commented on a change in pull request #7942: [FLINK-11696][checkpoint] Avoid to send mkdir requests to DFS from task side

2019-03-11 Thread GitBox
Myasuka commented on a change in pull request #7942: [FLINK-11696][checkpoint] 
Avoid to send mkdir requests to DFS from task side
URL: https://github.com/apache/flink/pull/7942#discussion_r264328294
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
 ##
 @@ -107,6 +105,13 @@ public CheckpointStorageLocation 
initializeLocationForCheckpoint(long checkpoint
// prepare all the paths needed for the checkpoints
final Path checkpointDir = 
createCheckpointDirectory(checkpointsDirectory, checkpointId);
 
+   if (!areDirectoriesCreated) {
+   fileSystem.mkdirs(checkpointsDirectory);
+   fileSystem.mkdirs(sharedStateDirectory);
 
 Review comment:
   At first, I planned to add a new initialization method to create those 
directories. However, I found that the current `initializeLocationForCheckpoint` 
method already has some initialization semantics. Since all checkpoints first 
start from the checkpoint coordinator, and there is no clear annotation on 
`CheckpointStorage`, I'm not sure whether adding a new method would break 
compatibility.
   
   For the open-closed principle, I think it would be better to just add an 
initialization method. And to be honest, I do not see any benefit in lazy 
initialization.
   
   If adding a new method, do you have any suggestion for a method name that 
distinguishes it from the current `initializeLocationForCheckpoint`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dawidwys merged pull request #7954: [FLINK-11420][datastream][bp-1.7] Fix duplicate and createInstance methods of CoGroupedStreams.UnionSerializer

2019-03-11 Thread GitBox
dawidwys merged pull request #7954:  [FLINK-11420][datastream][bp-1.7] Fix 
duplicate and createInstance methods of CoGroupedStreams.UnionSerializer
URL: https://github.com/apache/flink/pull/7954
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dawidwys merged pull request #7953: [FLINK-11420][datastream][bp-1.8] Fix duplicate and createInstance methods of CoGroupedStreams.UnionSerializer

2019-03-11 Thread GitBox
dawidwys merged pull request #7953:  [FLINK-11420][datastream][bp-1.8] Fix 
duplicate and createInstance methods of CoGroupedStreams.UnionSerializer
URL: https://github.com/apache/flink/pull/7953
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dawidwys merged pull request #7951: [FLINK-11420][datastream] Fix duplicate and createInstance methods of CoGroupedStreams.UnionSerializer

2019-03-11 Thread GitBox
dawidwys merged pull request #7951:  [FLINK-11420][datastream] Fix duplicate 
and createInstance methods of CoGroupedStreams.UnionSerializer
URL: https://github.com/apache/flink/pull/7951
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dawidwys commented on issue #7951: [FLINK-11420][datastream] Fix duplicate and createInstance methods of CoGroupedStreams.UnionSerializer

2019-03-11 Thread GitBox
dawidwys commented on issue #7951:  [FLINK-11420][datastream] Fix duplicate and 
createInstance methods of CoGroupedStreams.UnionSerializer
URL: https://github.com/apache/flink/pull/7951#issuecomment-471621638
 
 
   Thx @StefanRRichter for the review. Merging.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #7713: [FLINK-10995][network] Copy intermediate serialization results only once for broadcast mode

2019-03-11 Thread GitBox
pnowojski commented on a change in pull request #7713: [FLINK-10995][network] 
Copy intermediate serialization results only once for broadcast mode
URL: https://github.com/apache/flink/pull/7713#discussion_r264314846
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
 ##
 @@ -44,4 +52,67 @@ public BroadcastRecordWriter(
public void emit(T record) throws IOException, InterruptedException {
broadcastEmit(record);
}
+
+   @Override
+   public void broadcastEmit(T record) throws IOException, 
InterruptedException {
 
 Review comment:
   Ok, I see the problem now with `randomTriggered`. I don't like it, but I also 
don't have anything better to suggest :( So let it be.
   
   > After abstracting the RecordWriter to separate two implementations, the 
changes seem more complex
   
   True. The alternative solution would be to just dump the base class, but I 
don't think that would be better. 
   
   > I am not sure you could accept the way of magic 0
   
   I noticed it before, and again I'm not a big fan of it, but I cannot find a 
better solution.
   
   > think my first version seems more easy to follow
   
   I'm not sure. It had less code / fewer changes. The issue is that in the 
previous version, `BroadcastRecordWriter` was still using an array of 
`BufferBuilder`s, and this is also confusing and probably less performant. If 
you could run the network benchmarks showing that both versions are equally 
performant, I would be ok-ish with either one.
   
   I was expecting this to be a simple couple-of-lines change, but clearly I 
misjudged it :( 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StefanRRichter commented on a change in pull request #7942: [FLINK-11696][checkpoint] Avoid to send mkdir requests to DFS from task side

2019-03-11 Thread GitBox
StefanRRichter commented on a change in pull request #7942: 
[FLINK-11696][checkpoint] Avoid to send mkdir requests to DFS from task side
URL: https://github.com/apache/flink/pull/7942#discussion_r264283851
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
 ##
 @@ -107,6 +105,13 @@ public CheckpointStorageLocation 
initializeLocationForCheckpoint(long checkpoint
// prepare all the paths needed for the checkpoints
final Path checkpointDir = 
createCheckpointDirectory(checkpointsDirectory, checkpointId);
 
+   if (!areDirectoriesCreated) {
+   fileSystem.mkdirs(checkpointsDirectory);
 
 Review comment:
   If you want to minimize calls, this one should not even be needed because it 
would simply happen as part of the `fileSystem.mkdirs(checkpointDir);` below.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StefanRRichter commented on a change in pull request #7942: [FLINK-11696][checkpoint] Avoid to send mkdir requests to DFS from task side

2019-03-11 Thread GitBox
StefanRRichter commented on a change in pull request #7942: 
[FLINK-11696][checkpoint] Avoid to send mkdir requests to DFS from task side
URL: https://github.com/apache/flink/pull/7942#discussion_r264286029
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
 ##
 @@ -78,11 +81,6 @@ public FsCheckpointStorage(
this.sharedStateDirectory = new Path(checkpointsDirectory, 
CHECKPOINT_SHARED_STATE_DIR);
this.taskOwnedStateDirectory = new Path(checkpointsDirectory, 
CHECKPOINT_TASK_OWNED_STATE_DIR);
this.fileSizeThreshold = fileSizeThreshold;
 
 Review comment:
   I noticed that there are more methods with unnecessary `throws IOException` 
in this class. If you want, you could remove them in a hotfix commit and make 
that part of the PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StefanRRichter commented on a change in pull request #7942: [FLINK-11696][checkpoint] Avoid to send mkdir requests to DFS from task side

2019-03-11 Thread GitBox
StefanRRichter commented on a change in pull request #7942: 
[FLINK-11696][checkpoint] Avoid to send mkdir requests to DFS from task side
URL: https://github.com/apache/flink/pull/7942#discussion_r264287365
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorage.java
 ##
 @@ -117,6 +122,11 @@ public CheckpointStorageLocation 
initializeLocationForCheckpoint(long checkpoint
// prepare all the paths needed for the checkpoints
checkState(fileSystem != null);
 
+   if (!isDirectoryCreated) {
+   fileSystem.mkdirs(checkpointsDirectory);
 
 Review comment:
   My comments/questions about lazy initialization also apply to this class.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StefanRRichter commented on a change in pull request #7942: [FLINK-11696][checkpoint] Avoid to send mkdir requests to DFS from task side

2019-03-11 Thread GitBox
StefanRRichter commented on a change in pull request #7942: 
[FLINK-11696][checkpoint] Avoid to send mkdir requests to DFS from task side
URL: https://github.com/apache/flink/pull/7942#discussion_r264280589
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
 ##
 @@ -78,11 +81,6 @@ public FsCheckpointStorage(
this.sharedStateDirectory = new Path(checkpointsDirectory, 
CHECKPOINT_SHARED_STATE_DIR);
this.taskOwnedStateDirectory = new Path(checkpointsDirectory, 
CHECKPOINT_TASK_OWNED_STATE_DIR);
this.fileSizeThreshold = fileSizeThreshold;
 
 Review comment:
   You could remove the `throws IOException` from this constructor. It is no 
longer needed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StefanRRichter commented on a change in pull request #7942: [FLINK-11696][checkpoint] Avoid to send mkdir requests to DFS from task side

2019-03-11 Thread GitBox
StefanRRichter commented on a change in pull request #7942: 
[FLINK-11696][checkpoint] Avoid to send mkdir requests to DFS from task side
URL: https://github.com/apache/flink/pull/7942#discussion_r264285576
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorage.java
 ##
 @@ -99,6 +100,10 @@ public int getMaxStateSize() {
return maxStateSize;
}
 
+   public Path getCheckpointsDirectory() {
 
 Review comment:
   This can be package-private and could have the `@VisibleForTesting` 
annotation.
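   
   For instance (sketch):
   
```java
// Sketch: package-private getter, explicitly marked as test-only.
@VisibleForTesting
Path getCheckpointsDirectory() {
    return checkpointsDirectory;
}
```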


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StefanRRichter commented on a change in pull request #7942: [FLINK-11696][checkpoint] Avoid to send mkdir requests to DFS from task side

2019-03-11 Thread GitBox
StefanRRichter commented on a change in pull request #7942: 
[FLINK-11696][checkpoint] Avoid to send mkdir requests to DFS from task side
URL: https://github.com/apache/flink/pull/7942#discussion_r264288010
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorage.java
 ##
 @@ -117,6 +122,11 @@ public CheckpointStorageLocation 
initializeLocationForCheckpoint(long checkpoint
// prepare all the paths needed for the checkpoints
checkState(fileSystem != null);
 
+   if (!isDirectoryCreated) {
+   fileSystem.mkdirs(checkpointsDirectory);
 
 Review comment:
   The comments about unnecessary IOExceptions apply to some methods in this 
class as well.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StefanRRichter commented on a change in pull request #7942: [FLINK-11696][checkpoint] Avoid to send mkdir requests to DFS from task side

2019-03-11 Thread GitBox
StefanRRichter commented on a change in pull request #7942: 
[FLINK-11696][checkpoint] Avoid to send mkdir requests to DFS from task side
URL: https://github.com/apache/flink/pull/7942#discussion_r264284886
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
 ##
 @@ -107,6 +105,13 @@ public CheckpointStorageLocation 
initializeLocationForCheckpoint(long checkpoint
// prepare all the paths needed for the checkpoints
final Path checkpointDir = 
createCheckpointDirectory(checkpointsDirectory, checkpointId);
 
+   if (!areDirectoriesCreated) {
+   fileSystem.mkdirs(checkpointsDirectory);
+   fileSystem.mkdirs(sharedStateDirectory);
 
 Review comment:
   Now that only shared and task-owned would be left, I wonder if it would not 
be cleaner to just create them in an init method that is only called on the 
JM/checkpoint-coordinator side. Do you think there is still a benefit in true 
laziness, i.e. only creating the dirs when the first checkpoint is actually 
triggered? Or is it sufficient that not all TMs make those calls as well? For 
code simplicity, it would be better if we can avoid any lazy-initialization 
flags.
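   
   For illustration, the eager variant could look roughly like this on the 
storage side (`initializeBaseLocations` is a tentative name, and the method 
would only be called from the coordinator):
   
```java
// Sketch: eager, coordinator-only initialization instead of a lazy flag.
public void initializeBaseLocations() throws IOException {
    // Called once by the checkpoint coordinator before the first checkpoint;
    // TMs never call this, so no areDirectoriesCreated flag is needed.
    fileSystem.mkdirs(sharedStateDirectory);
    fileSystem.mkdirs(taskOwnedStateDirectory);
}
```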


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-11873) Flink Document has an error

2019-03-11 Thread AT-Fieldless (JIRA)
AT-Fieldless created FLINK-11873:


 Summary: Flink Document has an error
 Key: FLINK-11873
 URL: https://issues.apache.org/jira/browse/FLINK-11873
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: AT-Fieldless


The Documentation has a drop-down box with two options. When I click the 
1.8 (Snapshot) option, the new page shows its version as 1.9-SNAPSHOT.

I think it might be an error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

