[jira] [Closed] (FLINK-3758) Add possibility to register accumulators in custom triggers

2016-06-15 Thread Konstantin Knauf (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf closed FLINK-3758.
---
Resolution: Resolved

> Add possibility to register accumulators in custom triggers
> ---
>
> Key: FLINK-3758
> URL: https://issues.apache.org/jira/browse/FLINK-3758
> Project: Flink
>  Issue Type: Improvement
>Reporter: Konstantin Knauf
>Assignee: Konstantin Knauf
>Priority: Minor
>
> For monitoring purposes it would be nice to be able to use accumulators in 
> custom trigger functions. 
> Basically, the trigger context could just expose {{getAccumulator}} of 
> {{RuntimeContext}}, or does this create problems I am not aware of?
> Adding accumulators in a trigger function is more difficult, I think, but 
> that's not really necessary as the accumulator could just be added in some 
> other upstream operator.
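
A minimal sketch of how such an accumulator-aware trigger context could be used. This is purely hypothetical: neither the {{AccumulatorAwareTriggerContext}} interface nor a {{getIntCounter}} accessor exist on Flink's {{Trigger.TriggerContext}} as of this thread; they only illustrate delegating to {{RuntimeContext}} accumulators for monitoring.

{code}
import org.apache.flink.api.common.accumulators.IntCounter;

// Hypothetical extension of the trigger context; a real implementation
// would delegate to RuntimeContext#getIntCounter(name).
interface AccumulatorAwareTriggerContext {
    IntCounter getIntCounter(String name);
}

class FiringCounterSketch {
    // Would be called from a custom trigger's onElement(...) / onEventTime(...)
    void recordFiring(AccumulatorAwareTriggerContext ctx) {
        IntCounter firings = ctx.getIntCounter("trigger-firings");
        firings.add(1); // shows up in the job's accumulator results
    }
}
{code}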



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3758) Add possibility to register accumulators in custom triggers

2016-06-15 Thread Konstantin Knauf (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15333144#comment-15333144
 ] 

Konstantin Knauf commented on FLINK-3758:
-

Sure. I see you already opened a PR for it. Great.

> Add possibility to register accumulators in custom triggers
> ---
>
> Key: FLINK-3758
> URL: https://issues.apache.org/jira/browse/FLINK-3758
> Project: Flink
>  Issue Type: Improvement
>Reporter: Konstantin Knauf
>Assignee: Konstantin Knauf
>Priority: Minor
>
> For monitoring purposes it would be nice to be able to use accumulators in 
> custom trigger functions. 
> Basically, the trigger context could just expose {{getAccumulator}} of 
> {{RuntimeContext}}, or does this create problems I am not aware of?
> Adding accumulators in a trigger function is more difficult, I think, but 
> that's not really necessary as the accumulator could just be added in some 
> other upstream operator.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3734) Unclosed DataInputView in AbstractAlignedProcessingTimeWindowOperator#restoreState()

2016-06-15 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-3734:
--
Description: 
{code}
DataInputView in = inputState.getState(getUserCodeClassloader());

final long nextEvaluationTime = in.readLong();
final long nextSlideTime = in.readLong();

AbstractKeyedTimePanes panes = createPanes(keySelector, function);
panes.readFromInput(in, keySerializer, stateTypeSerializer);

restoredState = new RestoredState<>(panes, nextEvaluationTime, nextSlideTime);
  }
{code}
DataInputView in is not closed upon return.
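
The general shape of the fix is an explicit close in a finally block or a try-with-resources. Below is a self-contained illustration of the pattern, with java.io.DataInputStream standing in for the DataInputView and a hypothetical RestoredState holder; it is not Flink's actual restore code.

{code}
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

class RestoreSketch {

    // Hypothetical stand-in for the RestoredState used in the snippet above.
    static class RestoredState {
        final long nextEvaluationTime;
        final long nextSlideTime;

        RestoredState(long nextEvaluationTime, long nextSlideTime) {
            this.nextEvaluationTime = nextEvaluationTime;
            this.nextSlideTime = nextSlideTime;
        }
    }

    static RestoredState readState(InputStream stateStream) throws IOException {
        // try-with-resources closes the stream both on the normal return
        // path and on any exception thrown while reading
        try (DataInputStream in = new DataInputStream(stateStream)) {
            final long nextEvaluationTime = in.readLong();
            final long nextSlideTime = in.readLong();
            return new RestoredState(nextEvaluationTime, nextSlideTime);
        }
    }
}
{code}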

  was:
{code}
DataInputView in = inputState.getState(getUserCodeClassloader());

final long nextEvaluationTime = in.readLong();
final long nextSlideTime = in.readLong();

AbstractKeyedTimePanes panes = createPanes(keySelector, function);
panes.readFromInput(in, keySerializer, stateTypeSerializer);

restoredState = new RestoredState<>(panes, nextEvaluationTime, nextSlideTime);
  }
{code}

DataInputView in is not closed upon return.


> Unclosed DataInputView in 
> AbstractAlignedProcessingTimeWindowOperator#restoreState()
> 
>
> Key: FLINK-3734
> URL: https://issues.apache.org/jira/browse/FLINK-3734
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> DataInputView in = inputState.getState(getUserCodeClassloader());
> final long nextEvaluationTime = in.readLong();
> final long nextSlideTime = in.readLong();
> AbstractKeyedTimePanes panes = createPanes(keySelector, function);
> panes.readFromInput(in, keySerializer, stateTypeSerializer);
> restoredState = new RestoredState<>(panes, nextEvaluationTime, nextSlideTime);
>   }
> {code}
> DataInputView in is not closed upon return.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3753) KillerWatchDog should not use kill on toKill thread

2016-06-15 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-3753:
--
Description: 
{code}
// this is harsh, but this watchdog is a last resort
if (toKill.isAlive()) {
  toKill.stop();
}
{code}

stop() is deprecated.

See:
https://www.securecoding.cert.org/confluence/display/java/THI05-J.+Do+not+use+Thread.stop()+to+terminate+threads
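
A sketch of the usual alternative, interrupt plus a bounded join. This assumes the code running in the target thread cooperates with interruption, which arbitrary user code may not; the method and timeout below are illustrative, not the actual KillerWatchDog code.

{code}
/** Sketch only: interrupt-and-join instead of Thread.stop(). */
static void requestShutdown(Thread toKill, long timeoutMillis) {
    if (toKill.isAlive()) {
        toKill.interrupt();
        try {
            toKill.join(timeoutMillis); // bounded wait for the thread to exit
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt flag
        }
    }
}
{code}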

  was:
{code}
// this is harsh, but this watchdog is a last resort
if (toKill.isAlive()) {
  toKill.stop();
}
{code}
stop() is deprecated.

See:
https://www.securecoding.cert.org/confluence/display/java/THI05-J.+Do+not+use+Thread.stop()+to+terminate+threads


> KillerWatchDog should not use kill on toKill thread
> ---
>
> Key: FLINK-3753
> URL: https://issues.apache.org/jira/browse/FLINK-3753
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> // this is harsh, but this watchdog is a last resort
> if (toKill.isAlive()) {
>   toKill.stop();
> }
> {code}
> stop() is deprecated.
> See:
> https://www.securecoding.cert.org/confluence/display/java/THI05-J.+Do+not+use+Thread.stop()+to+terminate+threads



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4053) Return value from Connection should be checked against null

2016-06-15 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-4053:
--
Description: 
In RMQSource.java and RMQSink.java, there is code in the following pattern:
{code}
  connection = factory.newConnection();
  channel = connection.createChannel();
{code}
According to 
https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Connection.html#createChannel()
 :

{code}
Returns:
a new channel descriptor, or null if none is available
{code}
The return value should be checked against null.
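
A minimal sketch of the suggested check, keeping the variable names from the snippet above (the exception type and message are illustrative):

{code}
connection = factory.newConnection();
channel = connection.createChannel();
if (channel == null) {
    // Connection#createChannel() is documented to return null
    // when no channel is currently available
    throw new IOException("Could not create a RabbitMQ channel: none available.");
}
{code}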

  was:
In RMQSource.java and RMQSink.java, there is code in the following pattern:
{code}
  connection = factory.newConnection();
  channel = connection.createChannel();
{code}
According to 
https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Connection.html#createChannel()
 :
{code}
Returns:
a new channel descriptor, or null if none is available
{code}
The return value should be checked against null.


> Return value from Connection should be checked against null
> ---
>
> Key: FLINK-4053
> URL: https://issues.apache.org/jira/browse/FLINK-4053
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> In RMQSource.java and RMQSink.java, there is code in the following pattern:
> {code}
>   connection = factory.newConnection();
>   channel = connection.createChannel();
> {code}
> According to 
> https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Connection.html#createChannel()
>  :
> {code}
> Returns:
> a new channel descriptor, or null if none is available
> {code}
> The return value should be checked against null.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2102: [FLINK-4068] [tableAPI] Move constant computations...

2016-06-15 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2102#discussion_r67276795
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SelectITCase.scala
 ---
@@ -146,4 +148,21 @@ class SelectITCase(
 tEnv.sql(sqlQuery)
   }
 
+  @Test
+  def testConstantReduce(): Unit = {
--- End diff --

👍 It's a good idea, I will try it later. The CI throws a `cannot 
translate call AS...` error; I will figure it out today. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

2016-06-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1989#discussion_r67257863
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
 ---
@@ -0,0 +1,1075 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.StringParser;
+import org.junit.Test;
+
+public class RowCsvInputFormatTest {
+
+   private static final Path PATH = new Path("an/ignored/file/");
+
+   //Static variables for testing the removal of \r\n to \n
+   private static final String FIRST_PART = "That is the first part";
+
+   private static final String SECOND_PART = "That is the second part";
+
+   @Test
+   public void ignoreInvalidLines() {
+   try {
+   String fileContent =
+   "header1|header2|header3|\n"+
+   "this is|1|2.0|\n"+
+   "//a comment\n" +
+   "a test|3|4.0|\n" +
+   "#next|5|6.0|\n";
+
+   FileInputSplit split = createTempFile(fileContent);
+
+   RowTypeInfo typeInfo = new RowTypeInfo(new 
TypeInformation[] {
+   BasicTypeInfo.STRING_TYPE_INFO,
+   BasicTypeInfo.INT_TYPE_INFO,
+   BasicTypeInfo.DOUBLE_TYPE_INFO
+   });
+   CsvInputFormat format = new 
RowCsvInputFormat(PATH, "\n", "|", typeInfo);
+   format.setLenient(false);
+
+   Configuration parameters = new Configuration();
+   format.configure(parameters);
+   format.open(split);
+
+   Row result = new Row(3);
+
+   try {
+   result = format.nextRecord(result);
+   fail("Parse Exception was not thrown! (Invalid 
int value)");
+   } catch (ParseException ex) {
+   }
+
+   // if format has lenient == false this can be asserted 
only after FLINK-3908
+// result = format.nextRecord(result);
+// assertNotNull(result);
+// assertEquals("this is", result.productElement(0));
+// assertEquals(new Integer(1), result.productElement(1));
+// assertEquals(new Double(2.0), result.productElement(2));
+//
+// result = format.nextRecord(result);
+// assertNotNull(result);
+// assertEquals("a test", 

[jira] [Commented] (FLINK-3901) Create a RowCsvInputFormat to use as default CSV IF in Table API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15332755#comment-15332755
 ] 

ASF GitHub Bot commented on FLINK-3901:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1989#discussion_r67257812
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
 ---
@@ -0,0 +1,1075 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.StringParser;
+import org.junit.Test;
+
+public class RowCsvInputFormatTest {
+
+   private static final Path PATH = new Path("an/ignored/file/");
+
+   //Static variables for testing the removal of \r\n to \n
+   private static final String FIRST_PART = "That is the first part";
+
+   private static final String SECOND_PART = "That is the second part";
+
+   @Test
+   public void ignoreInvalidLines() {
+   try {
+   String fileContent =
+   "header1|header2|header3|\n"+
+   "this is|1|2.0|\n"+
+   "//a comment\n" +
+   "a test|3|4.0|\n" +
+   "#next|5|6.0|\n";
+
+   FileInputSplit split = createTempFile(fileContent);
+
+   RowTypeInfo typeInfo = new RowTypeInfo(new 
TypeInformation[] {
+   BasicTypeInfo.STRING_TYPE_INFO,
+   BasicTypeInfo.INT_TYPE_INFO,
+   BasicTypeInfo.DOUBLE_TYPE_INFO
+   });
+   CsvInputFormat format = new 
RowCsvInputFormat(PATH, "\n", "|", typeInfo);
+   format.setLenient(false);
+
+   Configuration parameters = new Configuration();
+   format.configure(parameters);
+   format.open(split);
+
+   Row result = new Row(3);
+
+   try {
+   result = format.nextRecord(result);
+   fail("Parse Exception was not thrown! (Invalid 
int value)");
+   } catch (ParseException ex) {
+   }
+
+   // if format has lenient == false this can be asserted 
only after FLINK-3908
+// result = format.nextRecord(result);
--- End diff --

Enable once FLINK-3908 is fixed.


> Create a RowCsvInputFormat to use as default CSV IF in Table API
> 
>
> Key: 

[jira] [Commented] (FLINK-3901) Create a RowCsvInputFormat to use as default CSV IF in Table API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15332766#comment-15332766
 ] 

ASF GitHub Bot commented on FLINK-3901:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/1989
  
Hi @fpompermaier, thanks for the PR. The `flink-table` module is almost 
completely implemented in Scala. We do not have good experience with mixed 
Scala / Java modules and I would like to ask if you could port the Java classes 
to Scala. 
I haven't had a detailed look at the test yet.


> Create a RowCsvInputFormat to use as default CSV IF in Table API
> 
>
> Key: FLINK-3901
> URL: https://issues.apache.org/jira/browse/FLINK-3901
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.0.2
>Reporter: Flavio Pompermaier
>Assignee: Flavio Pompermaier
>Priority: Minor
>  Labels: csv, null-values, row, tuple
>
> At the moment the Table API reads CSVs using the TupleCsvInputFormat, which 
> has the big limitations of at most 25 fields and no null handling.
> A new IF producing Row objects is indeed necessary to avoid those limitations.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #1989: FLINK-3901 - Added CsvRowInputFormat

2016-06-15 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/1989
  
Hi @fpompermaier, thanks for the PR. The `flink-table` module is almost 
completely implemented in Scala. We do not have good experience with mixed 
Scala / Java modules and I would like to ask if you could port the Java classes 
to Scala. 
I haven't had a detailed look at the test yet.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3901) Create a RowCsvInputFormat to use as default CSV IF in Table API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15332758#comment-15332758
 ] 

ASF GitHub Bot commented on FLINK-3901:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1989#discussion_r67257863
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
 ---
@@ -0,0 +1,1075 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.StringParser;
+import org.junit.Test;
+
+public class RowCsvInputFormatTest {
+
+   private static final Path PATH = new Path("an/ignored/file/");
+
+   //Static variables for testing the removal of \r\n to \n
+   private static final String FIRST_PART = "That is the first part";
+
+   private static final String SECOND_PART = "That is the second part";
+
+   @Test
+   public void ignoreInvalidLines() {
+   try {
+   String fileContent =
+   "header1|header2|header3|\n"+
+   "this is|1|2.0|\n"+
+   "//a comment\n" +
+   "a test|3|4.0|\n" +
+   "#next|5|6.0|\n";
+
+   FileInputSplit split = createTempFile(fileContent);
+
+   RowTypeInfo typeInfo = new RowTypeInfo(new 
TypeInformation[] {
+   BasicTypeInfo.STRING_TYPE_INFO,
+   BasicTypeInfo.INT_TYPE_INFO,
+   BasicTypeInfo.DOUBLE_TYPE_INFO
+   });
+   CsvInputFormat format = new 
RowCsvInputFormat(PATH, "\n", "|", typeInfo);
+   format.setLenient(false);
+
+   Configuration parameters = new Configuration();
+   format.configure(parameters);
+   format.open(split);
+
+   Row result = new Row(3);
+
+   try {
+   result = format.nextRecord(result);
+   fail("Parse Exception was not thrown! (Invalid 
int value)");
+   } catch (ParseException ex) {
+   }
+
+   // if format has lenient == false this can be asserted 
only after FLINK-3908
+// result = format.nextRecord(result);
+// assertNotNull(result);
+// assertEquals("this is", result.productElement(0));
+// assertEquals(new Integer(1), result.productElement(1));
+// 

[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

2016-06-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1989#discussion_r67257728
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala
 ---
@@ -25,6 +25,8 @@ import 
org.apache.flink.api.java.typeutils.{TupleTypeInfoBase, TupleTypeInfo}
 import org.apache.flink.api.java.{ExecutionEnvironment, DataSet}
 import org.apache.flink.api.table.Row
 import org.apache.flink.core.fs.Path
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.java.io.RowCsvInputFormat
 
 /**
   * A [[TableSource]] for simple CSV files with up to 25 fields.
--- End diff --

Remove 25 field limitation


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

2016-06-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1989#discussion_r67257812
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
 ---
@@ -0,0 +1,1075 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.StringParser;
+import org.junit.Test;
+
+public class RowCsvInputFormatTest {
+
+   private static final Path PATH = new Path("an/ignored/file/");
+
+   //Static variables for testing the removal of \r\n to \n
+   private static final String FIRST_PART = "That is the first part";
+
+   private static final String SECOND_PART = "That is the second part";
+
+   @Test
+   public void ignoreInvalidLines() {
+   try {
+   String fileContent =
+   "header1|header2|header3|\n"+
+   "this is|1|2.0|\n"+
+   "//a comment\n" +
+   "a test|3|4.0|\n" +
+   "#next|5|6.0|\n";
+
+   FileInputSplit split = createTempFile(fileContent);
+
+   RowTypeInfo typeInfo = new RowTypeInfo(new 
TypeInformation[] {
+   BasicTypeInfo.STRING_TYPE_INFO,
+   BasicTypeInfo.INT_TYPE_INFO,
+   BasicTypeInfo.DOUBLE_TYPE_INFO
+   });
+   CsvInputFormat format = new 
RowCsvInputFormat(PATH, "\n", "|", typeInfo);
+   format.setLenient(false);
+
+   Configuration parameters = new Configuration();
+   format.configure(parameters);
+   format.open(split);
+
+   Row result = new Row(3);
+
+   try {
+   result = format.nextRecord(result);
+   fail("Parse Exception was not thrown! (Invalid 
int value)");
+   } catch (ParseException ex) {
+   }
+
+   // if format has lenient == false this can be asserted 
only after FLINK-3908
+// result = format.nextRecord(result);
--- End diff --

Enable once FLINK-3908 is fixed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3901) Create a RowCsvInputFormat to use as default CSV IF in Table API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15332752#comment-15332752
 ] 

ASF GitHub Bot commented on FLINK-3901:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1989#discussion_r67257728
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala
 ---
@@ -25,6 +25,8 @@ import 
org.apache.flink.api.java.typeutils.{TupleTypeInfoBase, TupleTypeInfo}
 import org.apache.flink.api.java.{ExecutionEnvironment, DataSet}
 import org.apache.flink.api.table.Row
 import org.apache.flink.core.fs.Path
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.java.io.RowCsvInputFormat
 
 /**
   * A [[TableSource]] for simple CSV files with up to 25 fields.
--- End diff --

Remove 25 field limitation


> Create a RowCsvInputFormat to use as default CSV IF in Table API
> 
>
> Key: FLINK-3901
> URL: https://issues.apache.org/jira/browse/FLINK-3901
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.0.2
>Reporter: Flavio Pompermaier
>Assignee: Flavio Pompermaier
>Priority: Minor
>  Labels: csv, null-values, row, tuple
>
> At the moment the Table API reads CSVs using the TupleCsvInputFormat, which 
> has the big limitations of at most 25 fields and no null handling.
> A new IF producing Row objects is indeed necessary to avoid those limitations.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3901) Create a RowCsvInputFormat to use as default CSV IF in Table API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15332750#comment-15332750
 ] 

ASF GitHub Bot commented on FLINK-3901:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1989#discussion_r67257606
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowSerializer;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.FieldParser.ParseErrorState;
+
+@Internal
+public class RowCsvInputFormat extends CsvInputFormat {
+
+   private static final long serialVersionUID = 1L;
+
+   private RowSerializer rowSerializer;
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo) {
+   this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, 
createDefaultMask(rowTypeInfo.getArity()));
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] 
includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   int[] includedFieldsMask) {
+   super(filePath);
+   boolean[] mask = (includedFieldsMask == null) ? 
createDefaultMask(rowTypeInfo.getArity())
+   : toBooleanMask(includedFieldsMask);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, mask);
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, 
boolean[] includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   boolean[] includedFieldsMask) {
+   super(filePath);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, 
includedFieldsMask);
+   }
+
+   private void configure(String lineDelimiter, String fieldDelimiter,
+   RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
+
+   if (rowTypeInfo.getArity() == 0) {
+   throw new IllegalArgumentException("Row arity must be 
greater than 0.");
+   }
+
+   if (includedFieldsMask == null) {
+   includedFieldsMask = 
createDefaultMask(rowTypeInfo.getArity());
+   }
+
+   rowSerializer = (RowSerializer) 
rowTypeInfo.createSerializer(new ExecutionConfig());
+
+   setDelimiter(lineDelimiter);
+   setFieldDelimiter(fieldDelimiter);
+
+   Class[] classes = new Class[rowTypeInfo.getArity()];
+
+   for (int i = 0; i < rowTypeInfo.getArity(); i++) {
+   classes[i] = rowTypeInfo.getTypeAt(i).getTypeClass();
+   }
+
+   setFieldsGeneric(includedFieldsMask, classes);
+   }
+
+   @Override
+   public Row fillRecord(Row reuse, Object[] parsedValues) {
+   if (reuse 

[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

2016-06-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1989#discussion_r67257606
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowSerializer;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.FieldParser.ParseErrorState;
+
+@Internal
+public class RowCsvInputFormat extends CsvInputFormat {
+
+   private static final long serialVersionUID = 1L;
+
+   private RowSerializer rowSerializer;
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo) {
+   this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, 
createDefaultMask(rowTypeInfo.getArity()));
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] 
includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   int[] includedFieldsMask) {
+   super(filePath);
+   boolean[] mask = (includedFieldsMask == null) ? 
createDefaultMask(rowTypeInfo.getArity())
+   : toBooleanMask(includedFieldsMask);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, mask);
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, 
boolean[] includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   boolean[] includedFieldsMask) {
+   super(filePath);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, 
includedFieldsMask);
+   }
+
+   private void configure(String lineDelimiter, String fieldDelimiter,
+   RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
+
+   if (rowTypeInfo.getArity() == 0) {
+   throw new IllegalArgumentException("Row arity must be 
greater than 0.");
+   }
+
+   if (includedFieldsMask == null) {
+   includedFieldsMask = 
createDefaultMask(rowTypeInfo.getArity());
+   }
+
+   rowSerializer = (RowSerializer) 
rowTypeInfo.createSerializer(new ExecutionConfig());
+
+   setDelimiter(lineDelimiter);
+   setFieldDelimiter(fieldDelimiter);
+
+   Class[] classes = new Class[rowTypeInfo.getArity()];
+
+   for (int i = 0; i < rowTypeInfo.getArity(); i++) {
+   classes[i] = rowTypeInfo.getTypeAt(i).getTypeClass();
+   }
+
+   setFieldsGeneric(includedFieldsMask, classes);
+   }
+
+   @Override
+   public Row fillRecord(Row reuse, Object[] parsedValues) {
+   if (reuse == null) {
+   reuse = new Row(rowSerializer.getLength());
+   }
+   for (int i = 0; i < parsedValues.length; i++) {
+   reuse.setField(i, parsedValues[i]);
+   }
 

[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

2016-06-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1989#discussion_r67257499
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowSerializer;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.FieldParser.ParseErrorState;
+
+@Internal
+public class RowCsvInputFormat extends CsvInputFormat {
+
+   private static final long serialVersionUID = 1L;
+
+   private RowSerializer rowSerializer;
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo) {
+   this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, 
createDefaultMask(rowTypeInfo.getArity()));
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] 
includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   int[] includedFieldsMask) {
+   super(filePath);
+   boolean[] mask = (includedFieldsMask == null) ? 
createDefaultMask(rowTypeInfo.getArity())
+   : toBooleanMask(includedFieldsMask);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, mask);
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, 
boolean[] includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   boolean[] includedFieldsMask) {
+   super(filePath);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, 
includedFieldsMask);
+   }
+
+   private void configure(String lineDelimiter, String fieldDelimiter,
+   RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
+
+   if (rowTypeInfo.getArity() == 0) {
+   throw new IllegalArgumentException("Row arity must be 
greater than 0.");
+   }
+
+   if (includedFieldsMask == null) {
+   includedFieldsMask = 
createDefaultMask(rowTypeInfo.getArity());
+   }
+
+   rowSerializer = (RowSerializer) 
rowTypeInfo.createSerializer(new ExecutionConfig());
+
+   setDelimiter(lineDelimiter);
+   setFieldDelimiter(fieldDelimiter);
+
+   Class[] classes = new Class[rowTypeInfo.getArity()];
+
+   for (int i = 0; i < rowTypeInfo.getArity(); i++) {
+   classes[i] = rowTypeInfo.getTypeAt(i).getTypeClass();
+   }
+
+   setFieldsGeneric(includedFieldsMask, classes);
+   }
+
+   @Override
+   public Row fillRecord(Row reuse, Object[] parsedValues) {
+   if (reuse == null) {
+   reuse = new Row(rowSerializer.getLength());
+   }
+   for (int i = 0; i < parsedValues.length; i++) {
+   reuse.setField(i, parsedValues[i]);
+   }
 

[jira] [Commented] (FLINK-3901) Create a RowCsvInputFormat to use as default CSV IF in Table API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15332743#comment-15332743
 ] 

ASF GitHub Bot commented on FLINK-3901:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1989#discussion_r67257499
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowSerializer;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.FieldParser.ParseErrorState;
+
+@Internal
+public class RowCsvInputFormat extends CsvInputFormat {
+
+   private static final long serialVersionUID = 1L;
+
+   private RowSerializer rowSerializer;
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo) {
+   this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, 
createDefaultMask(rowTypeInfo.getArity()));
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] 
includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   int[] includedFieldsMask) {
+   super(filePath);
+   boolean[] mask = (includedFieldsMask == null) ? 
createDefaultMask(rowTypeInfo.getArity())
+   : toBooleanMask(includedFieldsMask);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, mask);
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, 
boolean[] includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   boolean[] includedFieldsMask) {
+   super(filePath);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, 
includedFieldsMask);
+   }
+
+   private void configure(String lineDelimiter, String fieldDelimiter,
+   RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
+
+   if (rowTypeInfo.getArity() == 0) {
+   throw new IllegalArgumentException("Row arity must be 
greater than 0.");
+   }
+
+   if (includedFieldsMask == null) {
+   includedFieldsMask = 
createDefaultMask(rowTypeInfo.getArity());
+   }
+
+   rowSerializer = (RowSerializer) 
rowTypeInfo.createSerializer(new ExecutionConfig());
+
+   setDelimiter(lineDelimiter);
+   setFieldDelimiter(fieldDelimiter);
+
+   Class[] classes = new Class[rowTypeInfo.getArity()];
+
+   for (int i = 0; i < rowTypeInfo.getArity(); i++) {
+   classes[i] = rowTypeInfo.getTypeAt(i).getTypeClass();
+   }
+
+   setFieldsGeneric(includedFieldsMask, classes);
+   }
+
+   @Override
+   public Row fillRecord(Row reuse, Object[] parsedValues) {
+   if (reuse 

[jira] [Commented] (FLINK-3901) Create a RowCsvInputFormat to use as default CSV IF in Table API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15332724#comment-15332724
 ] 

ASF GitHub Bot commented on FLINK-3901:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1989#discussion_r67256445
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowSerializer;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.FieldParser.ParseErrorState;
+
+@Internal
+public class RowCsvInputFormat extends CsvInputFormat {
+
+   private static final long serialVersionUID = 1L;
+
+   private RowSerializer rowSerializer;
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo) {
+   this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, 
createDefaultMask(rowTypeInfo.getArity()));
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] 
includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   int[] includedFieldsMask) {
+   super(filePath);
+   boolean[] mask = (includedFieldsMask == null) ? 
createDefaultMask(rowTypeInfo.getArity())
+   : toBooleanMask(includedFieldsMask);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, mask);
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, 
boolean[] includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   boolean[] includedFieldsMask) {
+   super(filePath);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, 
includedFieldsMask);
+   }
+
+   private void configure(String lineDelimiter, String fieldDelimiter,
+   RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
+
+   if (rowTypeInfo.getArity() == 0) {
+   throw new IllegalArgumentException("Row arity must be 
greater than 0.");
+   }
+
+   if (includedFieldsMask == null) {
+   includedFieldsMask = 
createDefaultMask(rowTypeInfo.getArity());
+   }
+
+   rowSerializer = (RowSerializer) 
rowTypeInfo.createSerializer(new ExecutionConfig());
+
+   setDelimiter(lineDelimiter);
+   setFieldDelimiter(fieldDelimiter);
+
+   Class[] classes = new Class[rowTypeInfo.getArity()];
+
+   for (int i = 0; i < rowTypeInfo.getArity(); i++) {
+   classes[i] = rowTypeInfo.getTypeAt(i).getTypeClass();
+   }
+
+   setFieldsGeneric(includedFieldsMask, classes);
+   }
+
+   @Override
+   public Row fillRecord(Row reuse, Object[] parsedValues) {
+   if (reuse 

[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

2016-06-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1989#discussion_r67256445
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowSerializer;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.FieldParser.ParseErrorState;
+
+@Internal
+public class RowCsvInputFormat extends CsvInputFormat {
+
+   private static final long serialVersionUID = 1L;
+
+   private RowSerializer rowSerializer;
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo) {
+   this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, 
createDefaultMask(rowTypeInfo.getArity()));
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] 
includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   int[] includedFieldsMask) {
+   super(filePath);
+   boolean[] mask = (includedFieldsMask == null) ? 
createDefaultMask(rowTypeInfo.getArity())
+   : toBooleanMask(includedFieldsMask);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, mask);
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, 
boolean[] includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   boolean[] includedFieldsMask) {
+   super(filePath);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, 
includedFieldsMask);
+   }
+
+   private void configure(String lineDelimiter, String fieldDelimiter,
+   RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
+
+   if (rowTypeInfo.getArity() == 0) {
+   throw new IllegalArgumentException("Row arity must be 
greater than 0.");
+   }
+
+   if (includedFieldsMask == null) {
+   includedFieldsMask = 
createDefaultMask(rowTypeInfo.getArity());
+   }
+
+   rowSerializer = (RowSerializer) 
rowTypeInfo.createSerializer(new ExecutionConfig());
+
+   setDelimiter(lineDelimiter);
+   setFieldDelimiter(fieldDelimiter);
+
+   Class[] classes = new Class[rowTypeInfo.getArity()];
+
+   for (int i = 0; i < rowTypeInfo.getArity(); i++) {
+   classes[i] = rowTypeInfo.getTypeAt(i).getTypeClass();
+   }
+
+   setFieldsGeneric(includedFieldsMask, classes);
+   }
+
+   @Override
+   public Row fillRecord(Row reuse, Object[] parsedValues) {
+   if (reuse == null) {
+   reuse = new Row(rowSerializer.getLength());
+   }
+   for (int i = 0; i < parsedValues.length; i++) {
+   reuse.setField(i, parsedValues[i]);
+   }
 

[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

2016-06-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1989#discussion_r67254028
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowSerializer;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.FieldParser.ParseErrorState;
+
+@Internal
+public class RowCsvInputFormat extends CsvInputFormat<Row> {
+
+   private static final long serialVersionUID = 1L;
+
+   private RowSerializer rowSerializer;
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo) {
+   this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, 
createDefaultMask(rowTypeInfo.getArity()));
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] 
includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   int[] includedFieldsMask) {
+   super(filePath);
+   boolean[] mask = (includedFieldsMask == null) ? 
createDefaultMask(rowTypeInfo.getArity())
+   : toBooleanMask(includedFieldsMask);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, mask);
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, 
boolean[] includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   boolean[] includedFieldsMask) {
+   super(filePath);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, 
includedFieldsMask);
+   }
+
+   private void configure(String lineDelimiter, String fieldDelimiter,
+   RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
+
+   if (rowTypeInfo.getArity() == 0) {
+   throw new IllegalArgumentException("Row arity must be 
greater than 0.");
+   }
+
+   if (includedFieldsMask == null) {
+   includedFieldsMask = 
createDefaultMask(rowTypeInfo.getArity());
+   }
+
+   rowSerializer = (RowSerializer) 
rowTypeInfo.createSerializer(new ExecutionConfig());
--- End diff --

We do not need the `RowSerializer`. It is sufficient to remember the number 
of `Row` fields (`rowTypeInfo.getArity()`)
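
A minimal sketch of that suggestion, assuming the class keeps a plain int field instead of the serializer (illustrative, not the merged code):

{code}
private int arity;

private void configure(String lineDelimiter, String fieldDelimiter,
		RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
	// remember only the number of Row fields; no RowSerializer is needed
	this.arity = rowTypeInfo.getArity();
	setDelimiter(lineDelimiter);
	setFieldDelimiter(fieldDelimiter);
	// remaining mask/field setup as in the diff above
}

@Override
public Row fillRecord(Row reuse, Object[] parsedValues) {
	if (reuse == null) {
		reuse = new Row(arity);   // arity replaces rowSerializer.getLength()
	}
	for (int i = 0; i < parsedValues.length; i++) {
		reuse.setField(i, parsedValues[i]);
	}
	return reuse;
}
{code}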


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3901) Create a RowCsvInputFormat to use as default CSV IF in Table API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15332680#comment-15332680
 ] 

ASF GitHub Bot commented on FLINK-3901:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1989#discussion_r67254028
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowSerializer;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.FieldParser.ParseErrorState;
+
+@Internal
+public class RowCsvInputFormat extends CsvInputFormat<Row> {
+
+   private static final long serialVersionUID = 1L;
+
+   private RowSerializer rowSerializer;
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo) {
+   this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, 
createDefaultMask(rowTypeInfo.getArity()));
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] 
includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   int[] includedFieldsMask) {
+   super(filePath);
+   boolean[] mask = (includedFieldsMask == null) ? 
createDefaultMask(rowTypeInfo.getArity())
+   : toBooleanMask(includedFieldsMask);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, mask);
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, 
boolean[] includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   boolean[] includedFieldsMask) {
+   super(filePath);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, 
includedFieldsMask);
+   }
+
+   private void configure(String lineDelimiter, String fieldDelimiter,
+   RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
+
+   if (rowTypeInfo.getArity() == 0) {
+   throw new IllegalArgumentException("Row arity must be 
greater than 0.");
+   }
+
+   if (includedFieldsMask == null) {
+   includedFieldsMask = 
createDefaultMask(rowTypeInfo.getArity());
+   }
+
+   rowSerializer = (RowSerializer) 
rowTypeInfo.createSerializer(new ExecutionConfig());
--- End diff --

We do not need the `RowSerializer`. It is sufficient to remember the number 
of `Row` fields (`rowTypeInfo.getArity()`)


> Create a RowCsvInputFormat to use as default CSV IF in Table API
> 
>
> Key: FLINK-3901
> URL: https://issues.apache.org/jira/browse/FLINK-3901
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.0.2
>Reporter: Flavio Pompermaier
>

[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

2016-06-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1989#discussion_r67252744
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowSerializer;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.FieldParser.ParseErrorState;
+
+@Internal
+public class RowCsvInputFormat extends CsvInputFormat<Row> {
+
+   private static final long serialVersionUID = 1L;
+
+   private RowSerializer rowSerializer;
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo) {
+   this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, 
createDefaultMask(rowTypeInfo.getArity()));
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] 
includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   int[] includedFieldsMask) {
+   super(filePath);
+   boolean[] mask = (includedFieldsMask == null) ? 
createDefaultMask(rowTypeInfo.getArity())
+   : toBooleanMask(includedFieldsMask);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, mask);
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, 
boolean[] includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   boolean[] includedFieldsMask) {
+   super(filePath);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, 
includedFieldsMask);
--- End diff --

let all constructors (transitively) call this constructor and move the 
functionality of `configure` into this constructor.
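
A hedged sketch of that restructuring, with all convenience constructors delegating to the boolean[]-mask constructor, which absorbs the configure() logic (illustrative only, not the final code):

{code}
public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
	this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo,
		createDefaultMask(rowTypeInfo.getArity()));
}

public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter,
		RowTypeInfo rowTypeInfo, int[] includedFieldsMask) {
	// normalize the int[] mask and delegate to the boolean[] variant
	this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo,
		includedFieldsMask == null
			? createDefaultMask(rowTypeInfo.getArity())
			: toBooleanMask(includedFieldsMask));
}

public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter,
		RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
	super(filePath);
	// former configure() body, now inlined in the single "real" constructor
	if (rowTypeInfo.getArity() == 0) {
		throw new IllegalArgumentException("Row arity must be greater than 0.");
	}
	if (includedFieldsMask == null) {
		includedFieldsMask = createDefaultMask(rowTypeInfo.getArity());
	}
	setDelimiter(lineDelimiter);
	setFieldDelimiter(fieldDelimiter);
	Class<?>[] classes = new Class<?>[rowTypeInfo.getArity()];
	for (int i = 0; i < rowTypeInfo.getArity(); i++) {
		classes[i] = rowTypeInfo.getTypeAt(i).getTypeClass();
	}
	setFieldsGeneric(includedFieldsMask, classes);
}
{code}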


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3901) Create a RowCsvInputFormat to use as default CSV IF in Table API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15332671#comment-15332671
 ] 

ASF GitHub Bot commented on FLINK-3901:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1989#discussion_r67252744
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowSerializer;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.FieldParser.ParseErrorState;
+
+@Internal
+public class RowCsvInputFormat extends CsvInputFormat<Row> {
+
+   private static final long serialVersionUID = 1L;
+
+   private RowSerializer rowSerializer;
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo) {
+   this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, 
createDefaultMask(rowTypeInfo.getArity()));
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] 
includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   int[] includedFieldsMask) {
+   super(filePath);
+   boolean[] mask = (includedFieldsMask == null) ? 
createDefaultMask(rowTypeInfo.getArity())
+   : toBooleanMask(includedFieldsMask);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, mask);
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, 
boolean[] includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   boolean[] includedFieldsMask) {
+   super(filePath);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, 
includedFieldsMask);
--- End diff --

let all constructors (transitively) call this constructor and move the 
functionality of `configure` into this constructor.


> Create a RowCsvInputFormat to use as default CSV IF in Table API
> 
>
> Key: FLINK-3901
> URL: https://issues.apache.org/jira/browse/FLINK-3901
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.0.2
>Reporter: Flavio Pompermaier
>Assignee: Flavio Pompermaier
>Priority: Minor
>  Labels: csv, null-values, row, tuple
>
> At the moment the Table API reads CSVs using the TupleCsvInputFormat, which 
> has the big limitations of at most 25 fields and no null handling.
> A new IF producing Row objects is indeed necessary to avoid those limitations



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

2016-06-15 Thread eliaslevy
Github user eliaslevy commented on a diff in the pull request:

https://github.com/apache/flink/pull/2108#discussion_r67248839
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
 ---
@@ -276,6 +314,41 @@ public void close() throws Exception {
checkErroneous();
}
 
+   // --- Logic for handling checkpoint flushing 
-- //
+
+   private void acknowledgeMessage() {
+   if(!flushOnCheckpoint) {
+   // the logic is disabled
+   return;
+   }
+   pendingRecords--;
+   }
+
+   @Override
+   public Serializable snapshotState(long checkpointId, long 
checkpointTimestamp) {
+   if(flushOnCheckpoint) {
+   // flushing is activated: We need to wait until 
pendingRecords is 0
+   while(pendingRecords > 0) {
+   try {
+   Thread.sleep(10);
--- End diff --

Any reason to sleep instead of calling producer.flush() to wait for the 
acks?
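
For reference, a hedged sketch of what a flush-based variant could look like, assuming the producer field and the checkErroneous() helper visible in the diff; KafkaProducer#flush() only exists from Kafka 0.9 on, which is presumably why a shared base class for the 0.8 connector would still need a poll loop:

{code}
@Override
public Serializable snapshotState(long checkpointId, long checkpointTimestamp) {
	if (flushOnCheckpoint) {
		// flush() blocks until all buffered records are sent and acknowledged,
		// so no sleep/poll loop is needed (Kafka 0.9+ only)
		producer.flush();
		// surface any asynchronous send failures before confirming the checkpoint
		checkErroneous();
	}
	// this sketch keeps no state of its own
	return null;
}
{code}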


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4027) FlinkKafkaProducer09 sink can lose messages

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15332616#comment-15332616
 ] 

ASF GitHub Bot commented on FLINK-4027:
---

Github user eliaslevy commented on a diff in the pull request:

https://github.com/apache/flink/pull/2108#discussion_r67248839
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
 ---
@@ -276,6 +314,41 @@ public void close() throws Exception {
checkErroneous();
}
 
+   // --- Logic for handling checkpoint flushing 
-- //
+
+   private void acknowledgeMessage() {
+   if(!flushOnCheckpoint) {
+   // the logic is disabled
+   return;
+   }
+   pendingRecords--;
+   }
+
+   @Override
+   public Serializable snapshotState(long checkpointId, long 
checkpointTimestamp) {
+   if(flushOnCheckpoint) {
+   // flushing is activated: We need to wait until 
pendingRecords is 0
+   while(pendingRecords > 0) {
+   try {
+   Thread.sleep(10);
--- End diff --

Any reason to sleep instead of calling producer.flush() to wait for the 
acks?


> FlinkKafkaProducer09 sink can lose messages
> ---
>
> Key: FLINK-4027
> URL: https://issues.apache.org/jira/browse/FLINK-4027
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.0.3
>Reporter: Elias Levy
>Assignee: Robert Metzger
>Priority: Critical
>
> The FlinkKafkaProducer09 sink appears to not offer at-least-once guarantees.
> The producer is publishing messages asynchronously.  A callback can record 
> publishing errors, which will be raised when detected.  But as far as I can 
> tell, there is no barrier to wait for async errors from the sink when 
> checkpointing or to track the event time of acked messages to inform the 
> checkpointing process.
> If a checkpoint occurs while there are pending publish requests, and the 
> requests return a failure after the checkpoint occurred, those messages will 
> be lost as the checkpoint will consider them processed by the sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4027) FlinkKafkaProducer09 sink can lose messages

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15332598#comment-15332598
 ] 

ASF GitHub Bot commented on FLINK-4027:
---

Github user eliaslevy commented on a diff in the pull request:

https://github.com/apache/flink/pull/2108#discussion_r67247830
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
 ---
@@ -51,10 +54,11 @@
  * Flink Sink to produce data into a Kafka topic.
  *
  * Please note that this producer does not have any reliability guarantees.
+ * The producer implements the checkpointed interface for allowing 
synchronization on checkpoints.
--- End diff --

May want to change:

> note that this producer does not have any reliability guarantees.

to

> note that this producer provides at-least-once reliability guarantees 
when checkpoints are enabled.



> FlinkKafkaProducer09 sink can lose messages
> ---
>
> Key: FLINK-4027
> URL: https://issues.apache.org/jira/browse/FLINK-4027
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.0.3
>Reporter: Elias Levy
>Assignee: Robert Metzger
>Priority: Critical
>
> The FlinkKafkaProducer09 sink appears to not offer at-least-once guarantees.
> The producer is publishing messages asynchronously.  A callback can record 
> publishing errors, which will be raised when detected.  But as far as I can 
> tell, there is no barrier to wait for async errors from the sink when 
> checkpointing or to track the event time of acked messages to inform the 
> checkpointing process.
> If a checkpoint occurs while there are pending publish requests, and the 
> requests return a failure after the checkpoint occurred, those messages will 
> be lost as the checkpoint will consider them processed by the sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

2016-06-15 Thread eliaslevy
Github user eliaslevy commented on a diff in the pull request:

https://github.com/apache/flink/pull/2108#discussion_r67247830
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
 ---
@@ -51,10 +54,11 @@
  * Flink Sink to produce data into a Kafka topic.
  *
  * Please note that this producer does not have any reliability guarantees.
+ * The producer implements the checkpointed interface for allowing 
synchronization on checkpoints.
--- End diff --

May want to change:

> note that this producer does not have any reliability guarantees.

to

> note that this producer provides at-least-once reliability guarantees 
when checkpoints are enabled.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4024) FileSourceFunction not adjusted to new IF lifecycle

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15332547#comment-15332547
 ] 

ASF GitHub Bot commented on FLINK-4024:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2100
  
Thanks @fhueske !


> FileSourceFunction not adjusted to new IF lifecycle
> ---
>
> Key: FLINK-4024
> URL: https://issues.apache.org/jira/browse/FLINK-4024
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Kostas Kloudas
>Priority: Critical
> Fix For: 1.1.0
>
>
> The InputFormat lifecycle was extended in 
> ac2137cfa5e63bd4f53a4b7669dc591ab210093f, adding additional 
> open-/closeInputFormat() methods.
> The streaming FileSourceFunction was not adjusted for this change, and thus 
> will fail for every InputFormat that leverages these new methods.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2100: [FLINK-4024] FileSourceFunction not adjusted to new IF li...

2016-06-15 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2100
  
Thanks @fhueske !


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4024) FileSourceFunction not adjusted to new IF lifecycle

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15332544#comment-15332544
 ] 

ASF GitHub Bot commented on FLINK-4024:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2100
  
Merging


> FileSourceFunction not adjusted to new IF lifecycle
> ---
>
> Key: FLINK-4024
> URL: https://issues.apache.org/jira/browse/FLINK-4024
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Kostas Kloudas
>Priority: Critical
> Fix For: 1.1.0
>
>
> The InputFormat lifecycle was extended in 
> ac2137cfa5e63bd4f53a4b7669dc591ab210093f, adding additional 
> open-/closeInputFormat() methods.
> The streaming FileSourceFunction was not adjusted for this change, and thus 
> will fail for every InputFormat that leverages these new methods.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3908) FieldParsers error state is not reset correctly to NONE

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15332545#comment-15332545
 ] 

ASF GitHub Bot commented on FLINK-3908:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2007
  
Merging


> FieldParsers error state is not reset correctly to NONE
> ---
>
> Key: FLINK-3908
> URL: https://issues.apache.org/jira/browse/FLINK-3908
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.0.2
>Reporter: Flavio Pompermaier
>Assignee: Flavio Pompermaier
>  Labels: parser
>
> If during the parse of a csv there's a parse error (for example when in a 
> integer column there are non-int values) the errorState is not reset 
> correctly in the next parseField call. A simple fix would be to add as a 
> first statement of the {{parseField()}} function a call to 
> {{setErrorState(ParseErrorState.NONE)}} but it is something that should be 
> handled better (by default) for every subclass of {{FieldParser}}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2007: [FLINK-3908] Fixed Parser's error state reset

2016-06-15 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2007
  
Merging


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2100: [FLINK-4024] FileSourceFunction not adjusted to new IF li...

2016-06-15 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2100
  
Merging


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15332538#comment-15332538
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/1856
  
I think the easiest way is to fork off a branch from the current master and 
cherry-pick all your commits one after the other onto that branch (except for 
the merge commit of course). Then you squash all commits and force push into 
the PR branch to update the PR.


> Add maxBy/minBy to Scala DataSet API
> 
>
> Key: FLINK-3650
> URL: https://issues.apache.org/jira/browse/FLINK-3650
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API, Scala API
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: ramkrishna.s.vasudevan
>
> The stable Java DataSet API contains the API calls {{maxBy}} and {{minBy}}. 
> These methods are not supported by the Scala DataSet API. These methods 
> should be added in order to have a consistent API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #1856: FLINK-3650 Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/1856
  
I think the easiest way is to fork off a branch from the current master and 
cherry-pick all your commits one after the other onto that branch (except for 
the merge commit of course). Then you squash all commits and force push into 
the PR branch to update the PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-4027) FlinkKafkaProducer09 sink can lose messages

2016-06-15 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger reassigned FLINK-4027:
-

Assignee: Robert Metzger

> FlinkKafkaProducer09 sink can lose messages
> ---
>
> Key: FLINK-4027
> URL: https://issues.apache.org/jira/browse/FLINK-4027
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.0.3
>Reporter: Elias Levy
>Assignee: Robert Metzger
>Priority: Critical
>
> The FlinkKafkaProducer09 sink appears to not offer at-least-once guarantees.
> The producer is publishing messages asynchronously.  A callback can record 
> publishing errors, which will be raised when detected.  But as far as I can 
> tell, there is no barrier to wait for async errors from the sink when 
> checkpointing or to track the event time of acked messages to inform the 
> checkpointing process.
> If a checkpoint occurs while there are pending publish requests, and the 
> requests return a failure after the checkpoint occurred, those messages will 
> be lost as the checkpoint will consider them processed by the sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4027) FlinkKafkaProducer09 sink can lose messages

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15332450#comment-15332450
 ] 

ASF GitHub Bot commented on FLINK-4027:
---

GitHub user rmetzger opened a pull request:

https://github.com/apache/flink/pull/2108

[FLINK-4027] Flush FlinkKafkaProducer on checkpoints

A user on the mailing list raised the point that our Kafka producer can be 
made at-least-once quite easily.
The current producer code doesn't have any guarantees 

We are using the producer's callbacks to account for unacknowledged 
records. When a checkpoint barrier reaches the sink, it will confirm the 
checkpoint once all pending records have been acked.
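
A rough sketch of that bookkeeping, with illustrative field names (it assumes the Kafka client Callback/RecordMetadata types and the acknowledgeMessage() helper from the diff; the PR itself is authoritative):

{code}
// records sent to the KafkaProducer but not yet acknowledged
private long pendingRecords = 0;
// set by the callback on failure, re-thrown later via checkErroneous()
private volatile Exception asyncException;

private final Callback ackCallback = new Callback() {
	@Override
	public void onCompletion(RecordMetadata metadata, Exception exception) {
		if (exception != null) {
			asyncException = exception;
		}
		acknowledgeMessage();   // pendingRecords--
	}
};

// invoke() increments pendingRecords and calls producer.send(record, ackCallback);
// snapshotState() only returns once pendingRecords has dropped back to zero, so an
// acknowledged checkpoint implies every earlier send was acked by Kafka.
{code}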

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rmetzger/flink flink4027

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2108.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2108


commit d657ca8be1420a3e73c48bbbf65788fbd0b75c2c
Author: Robert Metzger 
Date:   2016-06-15T15:50:38Z

[FLINK-4027] Flush FlinkKafkaProducer on checkpoints




> FlinkKafkaProducer09 sink can lose messages
> ---
>
> Key: FLINK-4027
> URL: https://issues.apache.org/jira/browse/FLINK-4027
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.0.3
>Reporter: Elias Levy
>Priority: Critical
>
> The FlinkKafkaProducer09 sink appears to not offer at-least-once guarantees.
> The producer is publishing messages asynchronously.  A callback can record 
> publishing errors, which will be raised when detected.  But as far as I can 
> tell, there is no barrier to wait for async errors from the sink when 
> checkpointing or to track the event time of acked messages to inform the 
> checkpointing process.
> If a checkpoint occurs while there are pending publish requests, and the 
> requests return a failure after the checkpoint occurred, those messages will 
> be lost as the checkpoint will consider them processed by the sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

2016-06-15 Thread rmetzger
GitHub user rmetzger opened a pull request:

https://github.com/apache/flink/pull/2108

[FLINK-4027] Flush FlinkKafkaProducer on checkpoints

A user on the mailing list raised the point that our Kafka producer can be 
made at-least-once quite easily.
The current producer code doesn't have any guarantees 

We are using the producer's callbacks to account for unacknowledged 
records. When a checkpoint barrier reaches the sink, it will confirm the 
checkpoint once all pending records have been acked.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rmetzger/flink flink4027

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2108.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2108


commit d657ca8be1420a3e73c48bbbf65788fbd0b75c2c
Author: Robert Metzger 
Date:   2016-06-15T15:50:38Z

[FLINK-4027] Flush FlinkKafkaProducer on checkpoints




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2102: [FLINK-4068] [tableAPI] Move constant computations out of...

2016-06-15 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2102
  
Thanks @wuchong, your approach looks good. I also found that the 
`ReduceExpressionRules` had no effect due to the missing `RexExecutor`. 

However, it seems that several tests of `ExpressionITCase` are failing with 
this change. You can verify that the PR does not break the build by locally 
running `mvn clean install`.

In addition, the added test should be changed as sketched in the comment. 
Please let me know if you have questions.

Thanks, Fabian


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2102: [FLINK-4068] [tableAPI] Move constant computations...

2016-06-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2102#discussion_r67233943
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SelectITCase.scala
 ---
@@ -146,4 +148,21 @@ class SelectITCase(
 tEnv.sql(sqlQuery)
   }
 
+  @Test
+  def testConstantReduce(): Unit = {
--- End diff --

I think the assertion of this test should not check the name of the generated Flink operator.
Instead I propose the following:
- split the `translate()` method into an `optimize()` method that generates 
the optimized `RelNode` tree and a `translate()` method that translates into a 
`DataSet` / `DataStream` program.
- make the `optimize()` method `private[flink]` and therefore accessible 
from a unit test
- add a `BatchTableEnvironmentTest` and a `StreamTableEnvironmentTest` 
which check that the optimized `RelNode` tree contains reduced expressions.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4068) Move constant computations out of code-generated `flatMap` functions.

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15332427#comment-15332427
 ] 

ASF GitHub Bot commented on FLINK-4068:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2102#discussion_r67233943
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SelectITCase.scala
 ---
@@ -146,4 +148,21 @@ class SelectITCase(
 tEnv.sql(sqlQuery)
   }
 
+  @Test
+  def testConstantReduce(): Unit = {
--- End diff --

I think the assertion of this test should not check the name of the generated Flink operator.
Instead I propose the following:
- split the `translate()` method into an `optimize()` method that generates 
the optimized `RelNode` tree and a `translate()` method that translates into a 
`DataSet` / `DataStream` program.
- make the `optimize()` method `private[flink]` and therefore accessible 
from a unit test
- add a `BatchTableEnvironmentTest` and a `StreamTableEnvironmentTest` 
which check that the optimized `RelNode` tree contains reduced expressions.


> Move constant computations out of code-generated `flatMap` functions.
> -
>
> Key: FLINK-4068
> URL: https://issues.apache.org/jira/browse/FLINK-4068
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>
> The generated functions for expressions of the Table API or SQL include 
> constant computations.
> For instance the code generated for a predicate like:
> {code}
> myInt < (10 + 20)
> {code}
> looks roughly like:
> {code}
> public void flatMap(Row in, Collector<Row> out) {
>   Integer in1 = in.productElement(1);
>   int temp = 10 + 20;  
>   if (in1 < temp) {
> out.collect(in)
>   }
> }
> {code}
> In this example the computation of {{temp}} is constant and could be moved 
> out of the {{flatMap()}} method.
> The same might apply to generated functions other than {{FlatMap}} as well.
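
For illustration, a hand-written sketch of what the function should roughly look like once the optimizer reduces the constant expression (not actual generated code):

{code}
public void flatMap(Row in, Collector<Row> out) {
  Integer in1 = (Integer) in.productElement(1);
  // 10 + 20 has been folded to 30 at planning time, so nothing is recomputed per record
  if (in1 < 30) {
    out.collect(in);
  }
}
{code}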



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3916) Allow generic types passing the Table API

2016-06-15 Thread Vasia Kalavri (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15332354#comment-15332354
 ] 

Vasia Kalavri commented on FLINK-3916:
--

FLINK-3615 is about extending or refactoring {{sqlTypeToTypeInfo}}. I do not 
recall what kind of trouble it had caused at that time, but it looks like we 
were thinking of refactoring the code to determine return types per operator.

> Allow generic types passing the Table API
> -
>
> Key: FLINK-3916
> URL: https://issues.apache.org/jira/browse/FLINK-3916
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> The Table API currently only supports BasicTypes that can pass the Table API. 
> Other types should also be supported but treated as black boxes.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #1856: FLINK-3650 Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/1856
  
How do I remove the merge commit? If I try to remove it I lose all my 
commits. @fhueske - I have updated the PR. Thanks for the very sharp eyes, 
catching the spaces and new lines that were missed. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15332336#comment-15332336
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/1856
  
How do I remove the merge commit? If I try to remove it I lose all my 
commits. @fhueske - I have updated the PR. Thanks for the very sharp eyes, 
catching the spaces and new lines that were missed. 


> Add maxBy/minBy to Scala DataSet API
> 
>
> Key: FLINK-3650
> URL: https://issues.apache.org/jira/browse/FLINK-3650
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API, Scala API
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: ramkrishna.s.vasudevan
>
> The stable Java DataSet API contains the API calls {{maxBy}} and {{minBy}}. 
> These methods are not supported by the Scala DataSet API. These methods 
> should be added in order to have a consistent API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15332251#comment-15332251
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67216793
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---
@@ -699,6 +700,55 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   }
 
   /**
+* Selects an element with minimum value.
+*
+* The minimum is computed over the specified fields in lexicographical 
order.
+*
+* Example 1: Given a data set with elements [0, 1], [1, 0], the
+* results will be:
+*
+* minBy(0)[0, 1]
+* minBy(1)[1, 0]
+* Example 2: Given a data set with elements [0, 0], [0, 1], the
+* results will be:
+* minBy(0, 1)[0, 0]
+* If multiple values with minimum value at the specified fields exist, 
a random one will be
+* picked.
+* Internally, this operation is implemented as a {@link 
ReduceFunction}.
--- End diff --

Ok


> Add maxBy/minBy to Scala DataSet API
> 
>
> Key: FLINK-3650
> URL: https://issues.apache.org/jira/browse/FLINK-3650
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API, Scala API
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: ramkrishna.s.vasudevan
>
> The stable Java DataSet API contains the API calls {{maxBy}} and {{minBy}}. 
> These methods are not supported by the Scala DataSet API. These methods 
> should be added in order to have a consistent API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15332250#comment-15332250
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67216772
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---
@@ -699,6 +700,55 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   }
 
   /**
+* Selects an element with minimum value.
+*
+* The minimum is computed over the specified fields in lexicographical 
order.
+*
+* Example 1: Given a data set with elements [0, 1], [1, 0], the
+* results will be:
+*
+* minBy(0)[0, 1]
+* minBy(1)[1, 0]
+* Example 2: Given a data set with elements [0, 0], [0, 1], the
+* results will be:
+* minBy(0, 1)[0, 0]
+* If multiple values with minimum value at the specified fields exist, 
a random one will be
+* picked.
+* Internally, this operation is implemented as a {@link 
ReduceFunction}.
+*/
+  def minBy(fields: Int*) : DataSet[T]  = {
+if (!getType.isTupleType) {
+  throw new InvalidProgramException("DataSet#minBy(int...) only works 
on Tuple types.")
+}
+
+reduce(new 
SelectByMinFunction[T](getType.asInstanceOf[TupleTypeInfoBase[T]], 
fields.toArray))
+  }
+
+  /**
+* Selects an element with maximum value.
+*
+* The maximum is computed over the specified fields in lexicographical 
order.
+*
+* Example 1: Given a data set with elements [0, 1], [1, 0], the
+* results will be:
+*
+* maxBy(0)[1, 0]
+* maxBy(1)[0, 1]
+* Example 2: Given a data set with elements [0, 0], [0, 1], the
+* results will be:
+* maxBy(0, 1)[0, 1]
+* If multiple values with maximum value at the specified fields exist, 
a random one will be
+* picked
+* Internally, this operation is implemented as a {@link 
ReduceFunction}.
+*
+*/
+  def maxBy(fields: Int*) : DataSet[T] = {
+if (!getType.isTupleType) {
+  throw new InvalidProgramException("DataSet#maxBy(int...) only works 
on Tuple types.")
+}
+reduce(new 
SelectByMaxFunction[T](getType.asInstanceOf[TupleTypeInfoBase[T]], 
fields.toArray))
+  }
--- End diff --

This is very sharp eyes :)


> Add maxBy/minBy to Scala DataSet API
> 
>
> Key: FLINK-3650
> URL: https://issues.apache.org/jira/browse/FLINK-3650
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API, Scala API
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: ramkrishna.s.vasudevan
>
> The stable Java DataSet API contains the API calls {{maxBy}} and {{minBy}}. 
> These methods are not supported by the Scala DataSet API. These methods 
> should be added in order to have a consistent API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #1856: FLINK-3650 Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67216793
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---
@@ -699,6 +700,55 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   }
 
   /**
+* Selects an element with minimum value.
+*
+* The minimum is computed over the specified fields in lexicographical 
order.
+*
+* Example 1: Given a data set with elements [0, 1], [1, 0], the
+* results will be:
+*
+* minBy(0)[0, 1]
+* minBy(1)[1, 0]
+* Example 2: Given a data set with elements [0, 0], [0, 1], the
+* results will be:
+* minBy(0, 1)[0, 0]
+* If multiple values with minimum value at the specified fields exist, 
a random one will be
+* picked.
+* Internally, this operation is implemented as a {@link 
ReduceFunction}.
--- End diff --

Ok


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #1856: FLINK-3650 Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67216772
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---
@@ -699,6 +700,55 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   }
 
   /**
+* Selects an element with minimum value.
+*
+* The minimum is computed over the specified fields in lexicographical 
order.
+*
+* Example 1: Given a data set with elements [0, 1], [1, 0], the
+* results will be:
+*
+* minBy(0)[0, 1]
+* minBy(1)[1, 0]
+* Example 2: Given a data set with elements [0, 0], [0, 1], the
+* results will be:
+* minBy(0, 1)[0, 0]
+* If multiple values with minimum value at the specified fields exist, 
a random one will be
+* picked.
+* Internally, this operation is implemented as a {@link 
ReduceFunction}.
+*/
+  def minBy(fields: Int*) : DataSet[T]  = {
+if (!getType.isTupleType) {
+  throw new InvalidProgramException("DataSet#minBy(int...) only works 
on Tuple types.")
+}
+
+reduce(new 
SelectByMinFunction[T](getType.asInstanceOf[TupleTypeInfoBase[T]], 
fields.toArray))
+  }
+
+  /**
+* Selects an element with maximum value.
+*
+* The maximum is computed over the specified fields in lexicographical 
order.
+*
+* Example 1: Given a data set with elements [0, 1], [1, 0], the
+* results will be:
+*
+* maxBy(0)[1, 0]
+* maxBy(1)[0, 1]
+* Example 2: Given a data set with elements [0, 0], [0, 1], the
+* results will be:
+* maxBy(0, 1)[0, 1]
+* If multiple values with maximum value at the specified fields exist, 
a random one will be
+* picked
+* Internally, this operation is implemented as a {@link 
ReduceFunction}.
+*
+*/
+  def maxBy(fields: Int*) : DataSet[T] = {
+if (!getType.isTupleType) {
+  throw new InvalidProgramException("DataSet#maxBy(int...) only works 
on Tuple types.")
+}
+reduce(new 
SelectByMaxFunction[T](getType.asInstanceOf[TupleTypeInfoBase[T]], 
fields.toArray))
+  }
--- End diff --

This is very sharp eyes :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1533#comment-1533
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67214413
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java 
---
@@ -521,7 +521,7 @@ public long count() throws Exception {
}
 
return new ReduceOperator<>(this, new SelectByMinFunction(
-   (TupleTypeInfo) getType(), fields), 
Utils.getCallLocationName());
--- End diff --

I have already reverted the style change. The only thing is that it was 
reverted on top of the previous commit. How do I completely keep this change 
from appearing in my commit history? I am not sure how to do it.


> Add maxBy/minBy to Scala DataSet API
> 
>
> Key: FLINK-3650
> URL: https://issues.apache.org/jira/browse/FLINK-3650
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API, Scala API
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: ramkrishna.s.vasudevan
>
> The stable Java DataSet API contains the API calls {{maxBy}} and {{minBy}}. 
> These methods are not supported by the Scala DataSet API. These methods 
> should be added in order to have a consistent API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #1856: FLINK-3650 Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67214413
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java 
---
@@ -521,7 +521,7 @@ public long count() throws Exception {
}
 
return new ReduceOperator<>(this, new SelectByMinFunction(
-   (TupleTypeInfo) getType(), fields), 
Utils.getCallLocationName());
--- End diff --

I have already reverted the style change. The only thing is that it was 
reverted on top of the previous commit. How do I completely keep this change 
from appearing in my commit history? I am not sure how to do it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-4080) Kinesis consumer not exactly-once if stopped in the middle of processing aggregated records

2016-06-15 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-4080:
---
Description: 
I've occasionally experienced unsuccessful ManualExactlyOnceTest after several 
tries.

Kinesis records of the same aggregated batch will have the same sequence 
number, and different sub-sequence numbers 
(http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
 The current code of the consumer is committing state every time it finishes 
processing a record, even de-aggregated ones. This is a bug since this will 
incorrectly mark all remaining records of the de-aggregated batch as processed 
in the state.

Proposed fix:
1. Use the extended `UserRecord` class in KCL to represent all records (either 
non- or de-aggregated) instead of the basic `Record` class. This gives access 
to whether or not the record was originally aggregated.
2. The sequence number state we are checkpointing needs to be able to indicate 
that the last seen sequence number of a shard may be a de-aggregated shard, 
i.e., {"shard0" -> "5:8", "shard1" -> "2"} meaning the 8th sub-record of the 
5th record was last seen for shard 0. On restore, we start again from record 5 
for shard 0 and skip the first 7 sub-records; however, for shard 1 we start 
from record 3 since record 2 is non-aggregated and already fully processed.

  was:
I've occasionally experienced unsuccessful ManualExactlyOnceTest after several 
tries.

Kinesis records of the same aggregated batch will have the same sequence 
number, and different sub-sequence numbers 
(http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
 The current code of the consumer is committing state every time it finishes 
processing a record, even de-aggregated ones. This is a bug since this will 
incorrectly mark all remaining records of the de-aggregated batch as processed 
in the state.

Proposed fix: Use the extended `UserRecord` class in KCL to represent all 
records (either non- or de-aggregated) instead of the basic `Record` class. 
This gives access to whether or not the record was originally aggregated. If we 
encounter a de-aggregated record, don't update state until we finished 
processing the last record of the batch.


> Kinesis consumer not exactly-once if stopped in the middle of processing 
> aggregated records
> ---
>
> Key: FLINK-4080
> URL: https://issues.apache.org/jira/browse/FLINK-4080
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.1.0
>
>
> I've occasionally experienced unsuccessful ManualExactlyOnceTest after 
> several tries.
> Kinesis records of the same aggregated batch will have the same sequence 
> number, and different sub-sequence numbers 
> (http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
>  The current code of the consumer is committing state every time it finishes 
> processing a record, even de-aggregated ones. This is a bug since this will 
> incorrectly mark all remaining records of the de-aggregated batch as 
> processed in the state.
> Proposed fix:
> 1. Use the extended `UserRecord` class in KCL to represent all records 
> (either non- or de-aggregated) instead of the basic `Record` class. This 
> gives access to whether or not the record was originally aggregated.
> 2. The sequence number state we are checkpointing needs to be able to 
> indicate that the last seen sequence number of a shard may be a de-aggregated 
> shard, i.e., {"shard0" -> "5:8", "shard1" -> "2"} meaning the 8th sub-record 
> of the 5th record was last seen for shard 0. On restore, we start again from 
> record 5 for shard 0 and skip the first 7 sub-records; however, for shard 1 
> we start from record 3 since record 2 is non-aggregated and already fully 
> processed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4078) Use ClosureCleaner for CoGroup where

2016-06-15 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4078?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi updated FLINK-4078:
---
Assignee: Stefan Richter

> Use ClosureCleaner for CoGroup where
> 
>
> Key: FLINK-4078
> URL: https://issues.apache.org/jira/browse/FLINK-4078
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.0.3
>Reporter: Ufuk Celebi
>Assignee: Stefan Richter
> Fix For: 1.1.0
>
>
> When specifying a key selector in the where clause of a CoGroup, the closure 
> cleaner is not used.
> {code}
> .coGroup(filteredIds)
> .where(new KeySelector() {
> @Override
> public String getKey(T t) throws Exception {
> String s = (String) t.get(fieldName);
> return s != null ? s : UUID.randomUUID().toString();
> }
> })
> {code}
> The problem is that the KeySelector is an anonymous inner class and as such 
> has a reference to the outer object. Normally, this would be rectified by the 
> closure cleaner, but the cleaner is not used in CoGroup.where().



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4079) YARN properties file used for per-job cluster

2016-06-15 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4079?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-4079:
--
Fix Version/s: 1.1.0

> YARN properties file used for per-job cluster
> -
>
> Key: FLINK-4079
> URL: https://issues.apache.org/jira/browse/FLINK-4079
> Project: Flink
>  Issue Type: Bug
>  Components: Command-line client
>Affects Versions: 1.0.3
>Reporter: Ufuk Celebi
>Assignee: Maximilian Michels
> Fix For: 1.1.0, 1.0.4
>
>
> YARN per job clusters (flink run -m yarn-cluster) rely on the hidden YARN 
> properties file, which defines the container configuration. This can lead to 
> unexpected behaviour, because the per-job-cluster configuration is merged  
> with the YARN properties file (or used as only configuration source).
> A user ran into this as follows:
> - Create a long-lived YARN session with HA (creates a hidden YARN properties 
> file)
> - Submits standalone batch jobs with a per job cluster (flink run -m 
> yarn-cluster). The batch jobs get submitted to the long lived HA cluster, 
> because of the properties file.
> [~mxm] Am I correct in assuming that this is only relevant for the 1.0 branch 
> and will be fixed with the client refactoring you are working on?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4079) YARN properties file used for per-job cluster

2016-06-15 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4079?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-4079:
--
Component/s: Command-line client

> YARN properties file used for per-job cluster
> -
>
> Key: FLINK-4079
> URL: https://issues.apache.org/jira/browse/FLINK-4079
> Project: Flink
>  Issue Type: Bug
>  Components: Command-line client
>Affects Versions: 1.0.3
>Reporter: Ufuk Celebi
>Assignee: Maximilian Michels
> Fix For: 1.1.0, 1.0.4
>
>
> YARN per job clusters (flink run -m yarn-cluster) rely on the hidden YARN 
> properties file, which defines the container configuration. This can lead to 
> unexpected behaviour, because the per-job-cluster configuration is merged  
> with the YARN properties file (or used as only configuration source).
> A user ran into this as follows:
> - Create a long-lived YARN session with HA (creates a hidden YARN properties 
> file)
> - Submits standalone batch jobs with a per job cluster (flink run -m 
> yarn-cluster). The batch jobs get submitted to the long lived HA cluster, 
> because of the properties file.
> [~mxm] Am I correct in assuming that this is only relevant for the 1.0 branch 
> and will be fixed with the client refactoring you are working on?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Reopened] (FLINK-4079) YARN properties file used for per-job cluster

2016-06-15 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4079?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels reopened FLINK-4079:
---
  Assignee: Maximilian Michels

I see what you mean now :) 

The issue is that the Yarn properties file is loaded regardless of whether "-m 
yarn-cluster" is specified on the command line. This loads the dynamic 
properties from the Yarn properties file and applies all configuration of the 
running (session) cluster to the to-be-created cluster. This is not expected 
behavior.

> YARN properties file used for per-job cluster
> -
>
> Key: FLINK-4079
> URL: https://issues.apache.org/jira/browse/FLINK-4079
> Project: Flink
>  Issue Type: Bug
>  Components: Command-line client
>Affects Versions: 1.0.3
>Reporter: Ufuk Celebi
>Assignee: Maximilian Michels
> Fix For: 1.1.0, 1.0.4
>
>
> YARN per job clusters (flink run -m yarn-cluster) rely on the hidden YARN 
> properties file, which defines the container configuration. This can lead to 
> unexpected behaviour, because the per-job-cluster configuration is merged  
> with the YARN properties file (or used as only configuration source).
> A user ran into this as follows:
> - Create a long-lived YARN session with HA (creates a hidden YARN properties 
> file)
> - Submits standalone batch jobs with a per job cluster (flink run -m 
> yarn-cluster). The batch jobs get submitted to the long lived HA cluster, 
> because of the properties file.
> [~mxm] Am I correct in assuming that this is only relevant for the 1.0 branch 
> and will be fixed with the client refactoring you are working on?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4080) Kinesis consumer not exactly-once if stopped in the middle of processing aggregated records

2016-06-15 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-4080:
---
Description: 
I've occasionally experienced unsuccessful ManualExactlyOnceTest after several 
tries.

Kinesis records of the same aggregated batch will have the same sequence 
number, and different sub-sequence numbers 
(http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
 The current code of the consumer is committing state every time it finishes 
processing a record, even de-aggregated ones. This is a bug since this will 
incorrectly mark all remaining records of the de-aggregated batch as processed 
in the state.

Proposed fix: Use the extended `UserRecord` class in KCL to represent all 
records (either non- or de-aggregated) instead of the basic `Record` class. 
This gives access to whether or not the record was originally aggregated. If we 
encounter a de-aggregated record, don't update state until we finished 
processing the last record of the batch.

  was:
I've occasionally experienced unsuccessful ManualExactlyOnceTest after several 
tries.

Kinesis records of the same aggregated batch will have the same sequence 
number, and different sub-sequence numbers 
(http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
 The current state of the consumer is committing state every time it finishes 
processing a record, even de-aggregated ones. This is a bug since this will 
incorrectly mark all remaining records of the de-aggregated batch as processed 
in the state.

Proposed fix: Use the extended `UserRecord` class in KCL to represent all 
records (either non- or de-aggregated) instead of the basic `Record` class. 
This gives access to whether or not the record was originally aggregated. If we 
encounter a de-aggregated record, don't update state until we finished 
processing the last record of the batch.


> Kinesis consumer not exactly-once if stopped in the middle of processing 
> aggregated records
> ---
>
> Key: FLINK-4080
> URL: https://issues.apache.org/jira/browse/FLINK-4080
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.1.0
>
>
> I've occasionally experienced unsuccessful ManualExactlyOnceTest after 
> several tries.
> Kinesis records of the same aggregated batch will have the same sequence 
> number, and different sub-sequence numbers 
> (http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
>  The current code of the consumer is committing state every time it finishes 
> processing a record, even de-aggregated ones. This is a bug since this will 
> incorrectly mark all remaining records of the de-aggregated batch as 
> processed in the state.
> Proposed fix: Use the extended `UserRecord` class in KCL to represent all 
> records (either non- or de-aggregated) instead of the basic `Record` class. 
> This gives access to whether or not the record was originally aggregated. If 
> we encounter a de-aggregated record, don't update state until we finished 
> processing the last record of the batch.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4080) Kinesis consumer not exactly-once if stopped in the middle of processing aggregated records

2016-06-15 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-4080:
--

 Summary: Kinesis consumer not exactly-once if stopped in the 
middle of processing aggregated records
 Key: FLINK-4080
 URL: https://issues.apache.org/jira/browse/FLINK-4080
 Project: Flink
  Issue Type: Sub-task
  Components: Kinesis Connector, Streaming Connectors
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai
Priority: Critical
 Fix For: 1.1.0


I've occasionally experienced unsuccessful ManualExactlyOnceTest after several 
tries.

Kinesis records of the same aggregated batch will have the same sequence 
number, and different sub-sequence numbers 
(http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
 The current state of the consumer is committing state every time it finishes 
processing a record, even de-aggregated ones. This is a bug since this will 
incorrectly mark all remaining records of the de-aggregated batch as processed 
in the state.

Proposed fix: Use the extended `UserRecord` class in KCL to represent all 
records (either non- or de-aggregated) instead of the basic `Record` class. 
This gives access to whether or not the record was originally aggregated. If we 
encounter a de-aggregated record, don't update state until we finished 
processing the last record of the batch.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4079) YARN properties file used for per-job cluster

2016-06-15 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15332032#comment-15332032
 ] 

Maximilian Michels commented on FLINK-4079:
---

Exactly, the yarn properties file is a means to easily submit jobs against a 
long-running Flink cluster (a so-called yarn session). Actually, it is the only 
proper way at the moment besides figuring out the JobManager location manually 
and essentially treating the Yarn cluster as a Standalone cluster.

I would also prefer to get rid of the properties file and only use the yarn 
application id instead. We could probably do that for one of the next releases 
but it would be a breaking change for the 1.1 release.

> YARN properties file used for per-job cluster
> -
>
> Key: FLINK-4079
> URL: https://issues.apache.org/jira/browse/FLINK-4079
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.0.3
>Reporter: Ufuk Celebi
> Fix For: 1.0.4
>
>
> YARN per job clusters (flink run -m yarn-cluster) rely on the hidden YARN 
> properties file, which defines the container configuration. This can lead to 
> unexpected behaviour, because the per-job-cluster configuration is merged  
> with the YARN properties file (or used as only configuration source).
> A user ran into this as follows:
> - Create a long-lived YARN session with HA (creates a hidden YARN properties 
> file)
> - Submits standalone batch jobs with a per job cluster (flink run -m 
> yarn-cluster). The batch jobs get submitted to the long lived HA cluster, 
> because of the properties file.
> [~mxm] Am I correct in assuming that this is only relevant for the 1.0 branch 
> and will be fixed with the client refactoring you are working on?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4077) Register Pojo DataSet/DataStream as Table requires alias expression.

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15332025#comment-15332025
 ] 

ASF GitHub Bot commented on FLINK-4077:
---

GitHub user fhueske opened a pull request:

https://github.com/apache/flink/pull/2107

[FLINK-4077] Register Pojo DataSet/DataStream as Table with field references

- [X] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [X] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhueske/flink tablePojoInput

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2107.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2107


commit 7e394b4097af95bc2ddfd7c33a69ba3188b5e17b
Author: Fabian Hueske 
Date:   2016-06-15T15:12:10Z

[FLINK-4077] Register Pojo DataSet/DataStream as Table with field 
references.




> Register Pojo DataSet/DataStream as Table requires alias expression.
> 
>
> Key: FLINK-4077
> URL: https://issues.apache.org/jira/browse/FLINK-4077
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
> Fix For: 1.1.0
>
>
> Registering a Pojo DataSet / DataStream as Table requires alias expressions 
> and does not work with simple field references. However, alias expressions 
> would only be necessary if the fields of the Pojo should be renamed. 
> {code}
> DataStream persons = ...
> // DOES NOT WORK
> tEnv.registerDataStream(
>   "Persons", 
>   persons, 
>   "name, age, address");
> // DOES WORK
> tEnv.registerDataStream(
>   "Persons", 
>   persons, 
>   "name AS name, age AS age, address AS address");
> {code}
> We should also allow simple field name references in addition to alias 
> expressions to rename fields.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2107: [FLINK-4077] Register Pojo DataSet/DataStream as T...

2016-06-15 Thread fhueske
GitHub user fhueske opened a pull request:

https://github.com/apache/flink/pull/2107

[FLINK-4077] Register Pojo DataSet/DataStream as Table with field references

- [X] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [X] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhueske/flink tablePojoInput

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2107.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2107


commit 7e394b4097af95bc2ddfd7c33a69ba3188b5e17b
Author: Fabian Hueske 
Date:   2016-06-15T15:12:10Z

[FLINK-4077] Register Pojo DataSet/DataStream as Table with field 
references.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4079) YARN properties file used for per-job cluster

2016-06-15 Thread Ufuk Celebi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15332002#comment-15332002
 ] 

Ufuk Celebi commented on FLINK-4079:


I understand where you are coming from, but it seems very weird to me that the 
behaviour I describe can happen (where the first started long running Flink 
session is actually still running, so the warning you mention would not be 
shown).

I always had the mental model that "bin/flink run -m yarn-cluster" starts a 
cluster for this job only, which is not the case if we re-use the properties 
file. I think that the properties file is mainly a concern for the long-lived 
YARN session, where cluster start and job submission are not tied together.

> YARN properties file used for per-job cluster
> -
>
> Key: FLINK-4079
> URL: https://issues.apache.org/jira/browse/FLINK-4079
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.0.3
>Reporter: Ufuk Celebi
> Fix For: 1.0.4
>
>
> YARN per job clusters (flink run -m yarn-cluster) rely on the hidden YARN 
> properties file, which defines the container configuration. This can lead to 
> unexpected behaviour, because the per-job-cluster configuration is merged  
> with the YARN properties file (or used as only configuration source).
> A user ran into this as follows:
> - Create a long-lived YARN session with HA (creates a hidden YARN properties 
> file)
> - Submits standalone batch jobs with a per job cluster (flink run -m 
> yarn-cluster). The batch jobs get submitted to the long lived HA cluster, 
> because of the properties file.
> [~mxm] Am I correct in assuming that this is only relevant for the 1.0 branch 
> and will be fixed with the client refactoring you are working on?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-4079) YARN properties file used for per-job cluster

2016-06-15 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4079?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels resolved FLINK-4079.
---
Resolution: Not A Problem

This is a feature :) The Yarn properties file is supposed to be picked up. We 
can't break this behavior because users rely on it.

My changes will give an appropriate error message if the properties file 
configuration doesn't correspond to a running application in the Yarn cluster. 

> YARN properties file used for per-job cluster
> -
>
> Key: FLINK-4079
> URL: https://issues.apache.org/jira/browse/FLINK-4079
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.0.3
>Reporter: Ufuk Celebi
> Fix For: 1.0.4
>
>
> YARN per job clusters (flink run -m yarn-cluster) rely on the hidden YARN 
> properties file, which defines the container configuration. This can lead to 
> unexpected behaviour, because the per-job-cluster configuration is merged  
> with the YARN properties file (or used as only configuration source).
> A user ran into this as follows:
> - Create a long-lived YARN session with HA (creates a hidden YARN properties 
> file)
> - Submits standalone batch jobs with a per job cluster (flink run -m 
> yarn-cluster). The batch jobs get submitted to the long lived HA cluster, 
> because of the properties file.
> [~mxm] Am I correct in assuming that this is only relevant for the 1.0 branch 
> and will be fixed with the client refactoring you are working on?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-4037) Introduce ArchivedExecutionGraph without any user classes

2016-06-15 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331982#comment-15331982
 ] 

Stefan Richter edited comment on FLINK-4037 at 6/15/16 4:14 PM:


I think introducing an {{ArchivedExecutionGraph}} to maintain relevant 
information from the {{ExecutionGraph}} of finished jobs is a little more 
involved. E.g. the web interface has to deal with objects from 
{{ArchivedExecutionGraph}} as well as {{ExecutionGraph}} to display finished 
and in-flight jobs.

To achieve this, we could for example:
* Make {{ExecutionGraph}} a subclass of {{ArchivedExecutionGraph}}. However, 
this is probably not the best idea as it violates the substitution principle.
* Introduce a common interface for both, {{ExecutionGraph}} and 
{{ArchivedExecutionGraph}}. This interface provides means for the web interface 
to extract information for display.

The method {{prepareForArchiving()}} could convert ExecutionGraphs into 
ArchivedExecutionGraphs. We should ensure that asynchronous requests by the web 
interface are routed to a valid ExecutionGraph until the conversion is complete 
and the object that acts as model for the web interface is substituted. Maybe 
there is some cleaner way of introducing a real life cycle for ExecutionGraphs?

Furthermore, we need to identify all references in ExecutionGraph that could 
hold objects from user-provided classes (e.g. accumulators, metrics, ...), 
stringify their information (see {{JobConfigHandler.handleRequest()}}), and 
release the references so that the user classloader can be garbage-collected. 
Corresponding parts of the mentioned stringification have to be pushed into the 
ExecutionGraph implementations. The web interface needs to be changed to 
extract the information through the interface.
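
A minimal Java sketch of the common-interface option (all names below are 
assumptions for illustration, not Flink's actual API):

{code}
// Sketch of the "common interface" option; every name below is an assumption.
// The web interface would only ever talk to AccessExecutionGraph, which exposes
// stringified, user-class-free information.
public interface AccessExecutionGraph {
    String getJobName();
    String getJsonPlan();
    // further read-only, string/primitive-only accessors for the web interface
}

// The live graph implements the interface directly.
class ExecutionGraphSketch implements AccessExecutionGraph {
    public String getJobName() { return "live job"; }
    public String getJsonPlan() { return "{ \"nodes\": [] }"; }

    // prepareForArchiving() would produce a stringified copy and drop all
    // references to user classes so the user classloader can be collected.
    ArchivedExecutionGraphSketch archive() {
        return new ArchivedExecutionGraphSketch(getJobName(), getJsonPlan());
    }
}

// The archived graph keeps only the stringified information of a finished job.
class ArchivedExecutionGraphSketch implements AccessExecutionGraph {
    private final String jobName;
    private final String jsonPlan;

    ArchivedExecutionGraphSketch(String jobName, String jsonPlan) {
        this.jobName = jobName;
        this.jsonPlan = jsonPlan;
    }

    public String getJobName() { return jobName; }
    public String getJsonPlan() { return jsonPlan; }
}
{code}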


was (Author: srichter):
I think introducing an {{ArchivedExecutionGraph}} to maintain relevant 
information from the {{ExecutionGraph}} of finished jobs is a little more 
involved. E.g. the web interface has to deal with objects from 
{{ArchivedExecutionGraph}} as well as {{ExecutionGraph}} to display finished 
and in-flight jobs.

To achieve this, we could for example:
* Make {{ExecutionGraph}} a subclass of {{ArchivedExecutionGraph}}. However 
this probably not the best idea as it violates the substitution principle.
* Introduce a common interface for both, {{ExecutionGraph}} and 
{{ArchivedExecutionGraph}}. This interface provides means for the web interface 
to extract information for display.

The method {{prepareForArchiving()}} could convert ExecutionGraphs into 
ArchivedExecutionGraphs. We should ensure that asynchronous request by the web 
interface are routed to a valid ExecutionGraph until the conversion is complete 
and the object that acts as model for the web interface is substituted.

Furthermore, we need to identify all references in ExecutionGraph that could 
hold objects from user-provided classes (e.g. accumulators, metrics, ...), 
stringify their information (see {{JobConfigHandler.handleRequest()}}), and 
release the references so that the user classloader can be garbage-collected. 
Corresponding parts of the mentioned stringification have to be pushed into the 
ExecutionGraph implementations. The web interface needs to be changed to 
extract the information through the interface.

> Introduce ArchivedExecutionGraph without any user classes
> -
>
> Key: FLINK-4037
> URL: https://issues.apache.org/jira/browse/FLINK-4037
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: Robert Metzger
>Assignee: Stefan Richter
>
> As a follow up to FLINK-4011: In order to allow the JobManager to unload all 
> classes from a finished job, we need to convert the ExecutionGraph (and some 
> attached objects like the ExecutionConfig) into a stringified version, not 
> containing any user classes.
> The web frontend can show strings only anyways.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-4037) Introduce ArchivedExecutionGraph without any user classes

2016-06-15 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331982#comment-15331982
 ] 

Stefan Richter edited comment on FLINK-4037 at 6/15/16 4:13 PM:


I think introducing an {{ArchivedExecutionGraph}} to maintain relevant 
information from the {{ExecutionGraph}} of finished jobs is a little more 
involved. E.g. the web interface has to deal with objects from 
{{ArchivedExecutionGraph}} as well as {{ExecutionGraph}} to display finished 
and in-flight jobs.

To achieve this, we could for example:
* Make {{ExecutionGraph}} a subclass of {{ArchivedExecutionGraph}}. However 
this probably not the best idea as it violates the substitution principle.
* Introduce a common interface for both, {{ExecutionGraph}} and 
{{ArchivedExecutionGraph}}. This interface provides means for the web interface 
to extract information for display.

The method {{prepareForArchiving()}} could convert ExecutionGraphs into 
ArchivedExecutionGraphs. We should ensure that asynchronous request by the web 
interface are routed to a valid ExecutionGraph until the conversion is complete 
and the object that acts as model for the web interface is substituted.

Furthermore, we need to identify all references in ExecutionGraph that could 
hold objects from user-provided classes (e.g. accumulators, metrics, ...), 
stringify their information (see {{JobConfigHandler.handleRequest()}}), and 
release the references so that the user classloader can be garbage-collected. 
Corresponding parts of the mentioned stringification have to be pushed into the 
ExecutionGraph implementations. The web interface needs to be changed to 
extract the information through the interface.


was (Author: srichter):
I think introducing an {{ArchivedExecutionGraph}} to maintain relevant 
information from the {{ExecutionGraph}} of finished jobs is a little more 
involved. E.g. the web interface has to deal with objects from 
{{ArchivedExecutionGraph}} as well as {{ExecutionGraph}} to display finished 
and in-flight jobs.

To achieve this, we could for example:
* Make {{ExecutionGraph}} a subclass of {{ArchivedExecutionGraph}}. However 
this probably not the best idea as it violates the substitution principle.
* Introduce a common interface for both, {{ExecutionGraph}} and 
{{ArchivedExecutionGraph}}. This interface provides means for the web interface 
to extract information for display.

The method {{prepareForArchiving()}} could convert ExecutionGraphs into 
ArchivedExecutionGraphs. We should ensure that asynchronous request by the web 
interface are routed to a valid ExecutionGraph until the conversion is complete 
and the object that acts as model for the web interface is substituted.

Furthermore, we need to identify all references in ExecutionGraph that could 
hold objects from user-provided classes (e.g. accumulators, metrics, ...), 
stringify their information (see {{JobConfigHandler.handleRequest()}}), and 
release the references so that the user classloader can be garbage-collected. 
Corresponding parts of the mentioned stringification have to be pushed the 
ExecutionGraphs. The web interface needs to be changed to extract the 
information through the interface.

> Introduce ArchivedExecutionGraph without any user classes
> -
>
> Key: FLINK-4037
> URL: https://issues.apache.org/jira/browse/FLINK-4037
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: Robert Metzger
>Assignee: Stefan Richter
>
> As a follow up to FLINK-4011: In order to allow the JobManager to unload all 
> classes from a finished job, we need to convert the ExecutionGraph (and some 
> attached objects like the ExecutionConfig) into a stringified version, not 
> containing any user classes.
> The web frontend can show strings only anyways.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-4037) Introduce ArchivedExecutionGraph without any user classes

2016-06-15 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331982#comment-15331982
 ] 

Stefan Richter edited comment on FLINK-4037 at 6/15/16 4:12 PM:


I think introducing an {{ArchivedExecutionGraph}} to maintain relevant 
information from the {{ExecutionGraph}} of finished jobs is a little more 
involved. E.g. the web interface has to deal with objects from 
{{ArchivedExecutionGraph}} as well as {{ExecutionGraph}} to display finished 
and in-flight jobs.

To achieve this, we could for example:
* Make {{ExecutionGraph}} a subclass of {{ArchivedExecutionGraph}}. However 
this probably not the best idea as it violates the substitution principle.
* Introduce a common interface for both, {{ExecutionGraph}} and 
{{ArchivedExecutionGraph}}. This interface provides means for the web interface 
to extract information for display.

The method {{prepareForArchiving()}} could convert ExecutionGraphs into 
ArchivedExecutionGraphs. We should ensure that asynchronous request by the web 
interface are routed to a valid ExecutionGraph until the conversion is complete 
and the object that acts as model for the web interface is substituted.

Furthermore, we need to identify all references in ExecutionGraph that could 
hold objects from user-provided classes (e.g. accumulators, metrics, ...), 
stringify their information (see {{JobConfigHandler.handleRequest()}}), and 
release the references so that the user classloader can be garbage-collected. 
Corresponding parts of the mentioned stringification have to be pushed the 
ExecutionGraphs. The web interface needs to be changed to extract the 
information through the interface.


was (Author: srichter):
I think introducing an {{ArchivedExecutionGraph}} to maintain relevant 
information from the {{ExecutionGraph}} of finished jobs is a little more 
involved. E.g. the web interface has to deal with objects from 
{{ArchivedExecutionGraph}} as well as {{ExecutionGraph}} to display finished 
and in-flight jobs.

To achieve this, we could for example:
* Make {{ExecutionGraph}} a subclass of {{ArchivedExecutionGraph}}. However 
this probably not the best idea as it violates the substitution principle.
* Introduce a common interface for both, {{ExecutionGraph}} and 
{{ArchivedExecutionGraph}}. This interface provides means for the web interface 
to extract information for display.

The method {{prepareForArchiving()}} could convert ExecutionGraphs into 
ArchivedExecutionGraphs. We should ensure that asynchronous request by the web 
interface are routed to a valid ExecutionGraph until the conversion is complete 
and the object that acts as model for the web interface is substituted.

Furthermore, we need to identify all references in ExecutionGraph that could 
hold objects from user-provided classes (e.g. accumulators, metrics, ...), 
stringify their information (see {{JobConfigHandler.handleRequest()}}, and 
release the references so that the user classloader can be garbage-collected. 
Corresponding parts of the mentioned stringification have to be pushed the 
ExecutionGraphs. The web interface needs to be changed to extract the 
information through the interface.

> Introduce ArchivedExecutionGraph without any user classes
> -
>
> Key: FLINK-4037
> URL: https://issues.apache.org/jira/browse/FLINK-4037
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: Robert Metzger
>Assignee: Stefan Richter
>
> As a follow up to FLINK-4011: In order to allow the JobManager to unload all 
> classes from a finished job, we need to convert the ExecutionGraph (and some 
> attached objects like the ExecutionConfig) into a stringified version, not 
> containing any user classes.
> The web frontend can show strings only anyways.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-4037) Introduce ArchivedExecutionGraph without any user classes

2016-06-15 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331982#comment-15331982
 ] 

Stefan Richter edited comment on FLINK-4037 at 6/15/16 4:11 PM:


I think introducing an {{ArchivedExecutionGraph}} to maintain relevant 
information from the {{ExecutionGraph}} of finished jobs is a little more 
involved. E.g. the web interface has to deal with objects from 
{{ArchivedExecutionGraph}} as well as {{ExecutionGraph}} to display finished 
and in-flight jobs.

To achieve this, we could for example:
* Make {{ExecutionGraph}} a subclass of {{ArchivedExecutionGraph}}. However 
this probably not the best idea as it violates the substitution principle.
* Introduce a common interface for both, {{ExecutionGraph}} and 
{{ArchivedExecutionGraph}}. This interface provides means for the web interface 
to extract information for display.

The method {{prepareForArchiving()}} could convert ExecutionGraphs into 
ArchivedExecutionGraphs. We should ensure that asynchronous request by the web 
interface are routed to a valid ExecutionGraph until the conversion is complete 
and the object that acts as model for the web interface is substituted.

Furthermore, we need to identify all references in ExecutionGraph that could 
hold objects from user-provided classes (e.g. accumulators, metrics, ...), 
stringify their information (see {{JobConfigHandler.handleRequest()}}, and 
release the references so that the user classloader can be garbage-collected. 
Corresponding parts of the mentioned stringification have to be pushed the 
ExecutionGraphs. The web interface needs to be changed to extract the 
information through the interface.


was (Author: srichter):
I think introducing an {{ArchivedExecutionGraph}} to maintain relevant 
information from the {{ExecutionGraph}} of finished jobs is a little more 
involved. E.g. the web interface has to deal with objects from 
{{ArchivedExecutionGraph}} as well as {{ExecutionGraph}} to display finished 
and in-flight jobs.

To achieve this, we could for example:
* Make {{ExecutionGraph}} a subclass of {{ArchivedExecutionGraph}}. However 
this probably not the best idea as it violates the substitution principle.
* Introduce a common interface for both, {ExecutionGraph}} and 
{{ArchivedExecutionGraph}}. This interface provides means for the web interface 
to extract information for display.

The method {{prepareForArchiving()}} could convert ExecutionGraphs into 
ArchivedExecutionGraphs. We should ensure that asynchronous request by the web 
interface are routed to a valid ExecutionGraph until the conversion is complete 
and the object that acts as model for the web interface is substituted.

Furthermore, we need to identify all references in ExecutionGraph that could 
hold objects from user-provided classes (e.g. accumulators, metrics, ...), 
stringify their information (see {{JobConfigHandler.handleRequest()}}, and 
release the references so that the user classloader can be garbage-collected. 
Corresponding parts of the mentioned stringification have to be pushed the 
ExecutionGraphs. The web interface needs to be changed to extract the 
information through the interface.

> Introduce ArchivedExecutionGraph without any user classes
> -
>
> Key: FLINK-4037
> URL: https://issues.apache.org/jira/browse/FLINK-4037
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: Robert Metzger
>Assignee: Stefan Richter
>
> As a follow up to FLINK-4011: In order to allow the JobManager to unload all 
> classes from a finished job, we need to convert the ExecutionGraph (and some 
> attached objects like the ExecutionConfig) into a stringified version, not 
> containing any user classes.
> The web frontend can show strings only anyways.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4037) Introduce ArchivedExecutionGraph without any user classes

2016-06-15 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331982#comment-15331982
 ] 

Stefan Richter commented on FLINK-4037:
---

I think introducing an {{ArchivedExecutionGraph}} to maintain relevant 
information from the {{ExecutionGraph}} of finished jobs is a little more 
involved. E.g. the web interface has to deal with objects from 
{{ArchivedExecutionGraph}} as well as {{ExecutionGraph}} to display finished 
and in-flight jobs.

To achieve this, we could for example:
* Make {{ExecutionGraph}} a subclass of {{ArchivedExecutionGraph}}. However 
this probably not the best idea as it violates the substitution principle.
* Introduce a common interface for both, {ExecutionGraph}} and 
{{ArchivedExecutionGraph}}. This interface provides means for the web interface 
to extract information for display.

The method {{prepareForArchiving()}} could convert ExecutionGraphs into 
ArchivedExecutionGraphs. We should ensure that asynchronous request by the web 
interface are routed to a valid ExecutionGraph until the conversion is complete 
and the object that acts as model for the web interface is substituted.

Furthermore, we need to identify all references in ExecutionGraph that could 
hold objects from user-provided classes (e.g. accumulators, metrics, ...), 
stringify their information (see {{JobConfigHandler.handleRequest()}}, and 
release the references so that the user classloader can be garbage-collected. 
Corresponding parts of the mentioned stringification have to be pushed the 
ExecutionGraphs. The web interface needs to be changed to extract the 
information through the interface.

> Introduce ArchivedExecutionGraph without any user classes
> -
>
> Key: FLINK-4037
> URL: https://issues.apache.org/jira/browse/FLINK-4037
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: Robert Metzger
>Assignee: Stefan Richter
>
> As a follow up to FLINK-4011: In order to allow the JobManager to unload all 
> classes from a finished job, we need to convert the ExecutionGraph (and some 
> attached objects like the ExecutionConfig) into a stringified version, not 
> containing any user classes.
> The web frontend can show strings only anyways.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4079) YARN properties file used for per-job cluster

2016-06-15 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-4079:
--

 Summary: YARN properties file used for per-job cluster
 Key: FLINK-4079
 URL: https://issues.apache.org/jira/browse/FLINK-4079
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.0.3
Reporter: Ufuk Celebi
 Fix For: 1.0.4


YARN per job clusters (flink run -m yarn-cluster) rely on the hidden YARN 
properties file, which defines the container configuration. This can lead to 
unexpected behaviour, because the per-job-cluster configuration is merged  with 
the YARN properties file (or used as only configuration source).

A user ran into this as follows:
- Create a long-lived YARN session with HA (creates a hidden YARN properties 
file)
- Submits standalone batch jobs with a per job cluster (flink run -m 
yarn-cluster). The batch jobs get submitted to the long lived HA cluster, 
because of the properties file.

[~mxm] Am I correct in assuming that this is only relevant for the 1.0 branch 
and will be fixed with the client refactoring you are working on?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-3105) Submission in per job YARN cluster mode reuses properties file of long-lived session

2016-06-15 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels resolved FLINK-3105.
---
Resolution: Not A Problem

This is actually the expected behavior.

> Submission in per job YARN cluster mode reuses properties file of long-lived 
> session
> 
>
> Key: FLINK-3105
> URL: https://issues.apache.org/jira/browse/FLINK-3105
> Project: Flink
>  Issue Type: Bug
>  Components: YARN Client
>Affects Versions: 0.10.1
>Reporter: Ufuk Celebi
>
> Starting a YARN session with `bin/yarn-session.sh` creates a properties file, 
> which is used to parse job manager information when submitting jobs.
> This properties file is also used when submitting a job with {{bin/flink run 
> -m yarn-cluster}}. The {{yarn-cluster}} mode should actually start a new YARN 
> session.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4078) Use ClosureCleaner for CoGroup where

2016-06-15 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-4078:
--

 Summary: Use ClosureCleaner for CoGroup where
 Key: FLINK-4078
 URL: https://issues.apache.org/jira/browse/FLINK-4078
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.0.3
Reporter: Ufuk Celebi
 Fix For: 1.1.0


When specifying a key selector in the where clause of a CoGroup, the closure 
cleaner is not used.

{code}
.coGroup(filteredIds)
.where(new KeySelector() {
@Override
public String getKey(T t) throws Exception {
String s = (String) t.get(fieldName);
return s != null ? s : UUID.randomUUID().toString();
}
})
{code}

The problem is that the KeySelector is an anonymous inner class and as such has 
a reference to the outer object. Normally, this would be rectified by the 
closure cleaner, but the cleaner is not used in CoGroup.where().
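
Until the cleaner is applied in CoGroup.where(), one workaround is to avoid the 
implicit outer reference altogether, e.g. with a separately defined (or static 
nested) KeySelector. A sketch, using an assumed Map-based record type in place 
of the generic T above:

{code}
// Workaround sketch: a top-level (or static nested) class only captures what it
// is given explicitly, so no reference to the enclosing object ends up in the
// closure. The Map-based record type is an assumption for illustration.
import org.apache.flink.api.java.functions.KeySelector;

import java.util.Map;
import java.util.UUID;

public class FieldKeySelector implements KeySelector<Map<String, Object>, String> {

    private final String fieldName;

    public FieldKeySelector(String fieldName) {
        this.fieldName = fieldName;
    }

    @Override
    public String getKey(Map<String, Object> record) throws Exception {
        Object value = record.get(fieldName);
        return value != null ? value.toString() : UUID.randomUUID().toString();
    }
}
{code}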



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331894#comment-15331894
 ] 

ASF GitHub Bot commented on FLINK-3937:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2085
  
Rebased to the latest master and refined the tests.


> Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
> --
>
> Key: FLINK-3937
> URL: https://issues.apache.org/jira/browse/FLINK-3937
> Project: Flink
>  Issue Type: Improvement
>Reporter: Sebastian Klemke
>Assignee: Maximilian Michels
>Priority: Trivial
> Attachments: improve_flink_cli_yarn_integration.patch
>
>
> Currently, flink cli can't figure out JobManager RPC location for 
> Flink-on-YARN clusters. Therefore, list, savepoint, cancel and stop 
> subcommands are hard to invoke if you only know the YARN application ID. As 
> an improvement, I suggest adding a -yid  option to the 
> mentioned subcommands that can be used together with -m yarn-cluster. Flink 
> cli would then retrieve JobManager RPC location from YARN ResourceManager.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-15 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2085
  
Rebased to the latest master and refined the tests.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-4077) Register Pojo DataSet/DataStream as Table requires alias expression.

2016-06-15 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-4077:


 Summary: Register Pojo DataSet/DataStream as Table requires alias 
expression.
 Key: FLINK-4077
 URL: https://issues.apache.org/jira/browse/FLINK-4077
 Project: Flink
  Issue Type: Bug
  Components: Table API
Affects Versions: 1.1.0
Reporter: Fabian Hueske
Assignee: Fabian Hueske
 Fix For: 1.1.0


Registering a Pojo DataSet / DataStream as Table requires alias expressions and 
does not work with simple field references. However, alias expressions would 
only be necessary if the fields of the Pojo should be renamed. 

{code}
DataStream persons = ...

// DOES NOT WORK
tEnv.registerDataStream(
  "Persons", 
  persons, 
  "name, age, address");

// DOES WORK
tEnv.registerDataStream(
  "Persons", 
  persons, 
  "name AS name, age AS age, address AS address");
{code}

We should also allow simple field name references in addition to alias 
expressions to rename fields.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4063) Add Metrics Support for Triggers

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331882#comment-15331882
 ] 

ASF GitHub Bot commented on FLINK-4063:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2106
  
+1



> Add Metrics Support for Triggers
> 
>
> Key: FLINK-4063
> URL: https://issues.apache.org/jira/browse/FLINK-4063
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> Now that we have proper support for metrics we should also add a hook that 
> allows triggers to report metrics.
> This supersedes FLINK-3758 which was about using accumulators for metrics in 
> triggers.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2106: [FLINK-4063] Add Metrics Support for Triggers

2016-06-15 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2106
  
+1



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4063) Add Metrics Support for Triggers

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331880#comment-15331880
 ] 

ASF GitHub Bot commented on FLINK-4063:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2106
  
@zentol updated, this is super minimal now.


> Add Metrics Support for Triggers
> 
>
> Key: FLINK-4063
> URL: https://issues.apache.org/jira/browse/FLINK-4063
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> Now that we have proper support for metrics we should also add a hook that 
> allows triggers to report metrics.
> This supersedes FLINK-3758 which was about using accumulators for metrics in 
> triggers.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2106: [FLINK-4063] Add Metrics Support for Triggers

2016-06-15 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2106
  
@zentol updated, this is super minimal now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4076) BoltWrapper#dispose() should call AbstractStreamOperator#dispose()

2016-06-15 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331841#comment-15331841
 ] 

Aljoscha Krettek commented on FLINK-4076:
-

+1 this is correct

> BoltWrapper#dispose() should call AbstractStreamOperator#dispose()
> --
>
> Key: FLINK-4076
> URL: https://issues.apache.org/jira/browse/FLINK-4076
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   @Override
>   public void dispose() {
> this.bolt.cleanup();
>   }
> {code}
> AbstractStreamOperator#dispose() should be called first.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4063) Add Metrics Support for Triggers

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331804#comment-15331804
 ] 

ASF GitHub Bot commented on FLINK-4063:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2106
  
Ok, good to know. I'll change it.  


> Add Metrics Support for Triggers
> 
>
> Key: FLINK-4063
> URL: https://issues.apache.org/jira/browse/FLINK-4063
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> Now that we have proper support for metrics we should also add a hook that 
> allows triggers to report metrics.
> This supersedes FLINK-3758 which was about using accumulators for metrics in 
> triggers.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2106: [FLINK-4063] Add Metrics Support for Triggers

2016-06-15 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2106
  
Ok, good to know. I'll change it. 😃 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4063) Add Metrics Support for Triggers

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331788#comment-15331788
 ] 

ASF GitHub Bot commented on FLINK-4063:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2106
  
well it would work, but i think we can improve it a little. :)

The AbstractStreamOperator contains a protected MetricGroup field which you 
can use in the WindowOperator instead of going through the RuntimeContext. You 
can access it directly or via getMetricGroup().

Also, you create 2 groups with a constant name. This means that all metrics 
that are registered in a trigger now contain "WindowOperator.Trigger" in their 
final metric name.

Now the Trigger group is probably _fine_, but is now also mandatory. I 
would prefer removing it; if users want to group their metrics by Trigger they 
can easily do so themselves.

The WindowOperator group should be removed. It is inconsistent with other 
operators and provides no really specific information. It will usually be 
redundant since the operator name often contains the String "window" somewhere.
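
A rough sketch of what this suggestion amounts to inside the window operator 
(the context class and its getMetricGroup() hook are assumptions of this 
sketch, not the actual API):

{code}
// Sketch only: hand the operator's own metric group to triggers without
// creating extra "WindowOperator"/"Trigger" groups. All names are assumptions.
import org.apache.flink.metrics.MetricGroup;

public class TriggerContextSketch {

    private final MetricGroup operatorMetricGroup;

    public TriggerContextSketch(MetricGroup operatorMetricGroup) {
        // re-use AbstractStreamOperator's protected metric group (getMetricGroup())
        // instead of going through the RuntimeContext
        this.operatorMetricGroup = operatorMetricGroup;
    }

    /** Exposed to user triggers; metrics register directly under the operator's group. */
    public MetricGroup getMetricGroup() {
        return operatorMetricGroup;
    }
}
{code}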



> Add Metrics Support for Triggers
> 
>
> Key: FLINK-4063
> URL: https://issues.apache.org/jira/browse/FLINK-4063
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> Now that we have proper support for metrics we should also add a hook that 
> allows triggers to report metrics.
> This supersedes FLINK-3758 which was about using accumulators for metrics in 
> triggers.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2106: [FLINK-4063] Add Metrics Support for Triggers

2016-06-15 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2106
  
well it would work, but i think we can improve it a little. :)

The AbstractStreamOperator contains a protected MetricGroup field which you 
can use in the WindowOperator instead of going through the RuntimeContext. You 
can access it directly or via getMetricGroup().

Also, you create 2 groups with a constant name. This means that all metrics 
that are registered in a trigger now contain "WindowOperator.Trigger" in their 
final metric name.

Now the Trigger group is probably _fine_, but is now also mandatory. I 
would prefer removing it; if users want to group their metrics by Trigger they 
can easily do so themselves.

The WindowOperator group should be removed. It is inconsistent with other 
operators and provides no really specific information. It will usually be 
redundant since the operator name often contains the String "window" somewhere.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3859) Add BigDecimal/BigInteger support to Table API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331785#comment-15331785
 ] 

ASF GitHub Bot commented on FLINK-3859:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2088
  
Thanks for the update @twalthr. The changes look good. Should be good to 
merge after the aggregators commit is added.


> Add BigDecimal/BigInteger support to Table API
> --
>
> Key: FLINK-3859
> URL: https://issues.apache.org/jira/browse/FLINK-3859
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Critical
>
> Since FLINK-3786 has been solved, we can now start integrating 
> BigDecimal/BigInteger into the Table API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2088: [FLINK-3859] [table] Add BigDecimal/BigInteger support to...

2016-06-15 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2088
  
Thanks for the update @twalthr. The changes look good. Should be good to 
merge after the aggregators commit is added.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-4076) BoltWrapper#dispose() should call AbstractStreamOperator#dispose()

2016-06-15 Thread Ted Yu (JIRA)
Ted Yu created FLINK-4076:
-

 Summary: BoltWrapper#dispose() should call 
AbstractStreamOperator#dispose()
 Key: FLINK-4076
 URL: https://issues.apache.org/jira/browse/FLINK-4076
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu
Priority: Minor


{code}
  @Override
  public void dispose() {
this.bolt.cleanup();
  }
{code}
AbstractStreamOperator#dispose() should be called first.
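
A hedged sketch of the proposed change, assuming dispose() keeps the signature 
shown above:

{code}
@Override
public void dispose() {
    // dispose the operator state first, as suggested, then clean up the wrapped bolt
    super.dispose();
    this.bolt.cleanup();
}
{code}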



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3859) Add BigDecimal/BigInteger support to Table API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331751#comment-15331751
 ] 

ASF GitHub Bot commented on FLINK-3859:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2088#discussion_r67161013
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
 ---
@@ -143,15 +143,30 @@ object ScalarFunctions {
 new MultiTypeMethodCallGen(BuiltInMethods.ABS))
 
   addSqlFunction(
+ABS,
+Seq(BIG_DEC_TYPE_INFO),
+new MultiTypeMethodCallGen(BuiltInMethods.ABS_DEC))
+
+  addSqlFunction(
--- End diff --

I see, thanks
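
As a side note, a tiny illustration of the semantics the new `ABS_DEC` entry 
presumably delegates to, using plain `java.math.BigDecimal` (independent of the 
generated Table API code):
{code}
import java.math.BigDecimal;

public class BigDecimalAbsExample {
    public static void main(String[] args) {
        BigDecimal value = new BigDecimal("-3.14");
        // BigDecimal#abs() returns the absolute value, preserving the scale
        System.out.println(value.abs()); // prints 3.14
    }
}
{code}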


> Add BigDecimal/BigInteger support to Table API
> --
>
> Key: FLINK-3859
> URL: https://issues.apache.org/jira/browse/FLINK-3859
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Critical
>
> Since FLINK-3786 has been solved, we can now start integrating 
> BigDecimal/BigInteger into the Table API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2088: [FLINK-3859] [table] Add BigDecimal/BigInteger sup...

2016-06-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2088#discussion_r67161013
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
 ---
@@ -143,15 +143,30 @@ object ScalarFunctions {
 new MultiTypeMethodCallGen(BuiltInMethods.ABS))
 
   addSqlFunction(
+ABS,
+Seq(BIG_DEC_TYPE_INFO),
+new MultiTypeMethodCallGen(BuiltInMethods.ABS_DEC))
+
+  addSqlFunction(
--- End diff --

I see, thanks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4024) FileSourceFunction not adjusted to new IF lifecycle

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331750#comment-15331750
 ] 

ASF GitHub Bot commented on FLINK-4024:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2100
  
+1 to merge


> FileSourceFunction not adjusted to new IF lifecycle
> ---
>
> Key: FLINK-4024
> URL: https://issues.apache.org/jira/browse/FLINK-4024
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Kostas Kloudas
>Priority: Critical
> Fix For: 1.1.0
>
>
> The InputFormat lifecycle was extended in 
> ac2137cfa5e63bd4f53a4b7669dc591ab210093f, adding additional 
> open-/closeInputFormat() methods.
> The streaming FileSourceFunction was not adjusted for this change, and thus 
> will fail for every InputFormat that leverages these new methods.
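
For reference, a minimal sketch of the call ordering the extended lifecycle 
implies for any code that drives a `RichInputFormat` (a standalone illustration 
of the commit referenced above, not the actual FileSourceFunction / 
ContinuousFileReaderOperator fix):
{code}
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.core.io.InputSplit;

public final class InputFormatLifecycleSketch {

    // Reads all records from the given splits, respecting the extended lifecycle.
    static <T> List<T> readAll(RichInputFormat<T, InputSplit> format, InputSplit[] splits, T reuse) throws Exception {
        List<T> out = new ArrayList<>();
        format.openInputFormat();              // new: once per task, before any split
        try {
            for (InputSplit split : splits) {
                format.open(split);            // existing: per split
                try {
                    while (!format.reachedEnd()) {
                        T record = format.nextRecord(reuse);
                        if (record != null) {
                            out.add(record);
                        }
                    }
                } finally {
                    format.close();            // existing: per split
                }
            }
        } finally {
            format.closeInputFormat();         // new: once per task, after all splits
        }
        return out;
    }
}
{code}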



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2100: [FLINK-4024] FileSourceFunction not adjusted to new IF li...

2016-06-15 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2100
  
+1 to merge


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2093: [FLINK-3714] Add Support for "Allowed Lateness"

2016-06-15 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2093
  
For the tests, I think you need to compare the expected results to the 
actual results the same way the existing tests do it, via exact comparison. As 
written, the tests would also pass if all actual results were the same value, 
because they don't verify that every expected value is present, only that the 
size of the result list matches.
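
A minimal sketch of the kind of exact comparison meant here (assuming results 
are collected as strings and their order is not significant; the existing tests 
define the authoritative pattern):
{code}
import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ExactResultComparisonSketch {

    // Sort copies so the check is independent of arrival order, then compare
    // element by element instead of only checking that the sizes match.
    static void assertSameElements(List<String> expected, List<String> actual) {
        List<String> e = new ArrayList<>(expected);
        List<String> a = new ArrayList<>(actual);
        Collections.sort(e);
        Collections.sort(a);
        assertEquals(e, a);
    }
}
{code}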


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3714) Add Support for "Allowed Lateness"

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331745#comment-15331745
 ] 

ASF GitHub Bot commented on FLINK-3714:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2093
  
For the tests, I think you need to compare the expected results to the 
actual results the same way the existing tests do it, via exact comparison. As 
written, the tests would also pass if all actual results were the same value, 
because they don't verify that every expected value is present, only that the 
size of the result list matches.


> Add Support for "Allowed Lateness"
> --
>
> Key: FLINK-3714
> URL: https://issues.apache.org/jira/browse/FLINK-3714
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
>
> As mentioned in 
> https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit#
>  we should add support for an allowed lateness setting.
> This includes several things:
>  - API for setting allowed lateness
>  - Dropping of late elements 
>  - Garbage collection of windows state/timers
> Depending on whether the {{WindowAssigner}} assigns windows based on event 
> time or processing time we have to adjust the GC behavior. For event-time 
> windows "allowed lateness" makes sense and we should garbage collect after 
> this expires. For processing-time windows "allowed lateness" does not make 
> sense and we should always GC window state/timers at the end timestamp of a 
> processing-time window. I think that we need a method for this on 
> {{WindowAssigner}} that allows to differentiate between event-time windows 
> and processing-time windows: {{boolean WindowAssigner.isEventTime()}}.
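
A minimal sketch of how the proposed {{isEventTime()}} flag could determine the 
cleanup timestamp (hypothetical helper with {{allowedLateness}} in milliseconds; 
the real logic belongs to the WindowOperator changes in the PR):
{code}
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public final class CleanupTimeSketch {

    // Event-time windows are kept for 'allowedLateness' after their end;
    // processing-time windows are cleaned up exactly at their end timestamp.
    static long cleanupTime(WindowAssigner<?, TimeWindow> assigner, TimeWindow window, long allowedLateness) {
        if (assigner.isEventTime()) {
            return window.maxTimestamp() + allowedLateness;
        } else {
            return window.maxTimestamp();
        }
    }
}
{code}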



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4024) FileSourceFunction not adjusted to new IF lifecycle

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331741#comment-15331741
 ] 

ASF GitHub Bot commented on FLINK-4024:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2100
  
Done! Thanks @fhueske .
Kostas


> FileSourceFunction not adjusted to new IF lifecycle
> ---
>
> Key: FLINK-4024
> URL: https://issues.apache.org/jira/browse/FLINK-4024
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Kostas Kloudas
>Priority: Critical
> Fix For: 1.1.0
>
>
> The InputFormat lifecycle was extended in 
> ac2137cfa5e63bd4f53a4b7669dc591ab210093f, adding additional 
> open-/closeInputFormat() methods.
> The streaming FileSourceFunction was not adjusted for this change, and thus 
> will fail for every InputFormat that leverages these new methods.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2100: [FLINK-4024] FileSourceFunction not adjusted to new IF li...

2016-06-15 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2100
  
Done! Thanks @fhueske .
Kostas


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4024) FileSourceFunction not adjusted to new IF lifecycle

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331721#comment-15331721
 ] 

ASF GitHub Bot commented on FLINK-4024:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2100
  
Good to merge once the `isFormatOpen` flag is removed.
Thanks, Fabian


> FileSourceFunction not adjusted to new IF lifecycle
> ---
>
> Key: FLINK-4024
> URL: https://issues.apache.org/jira/browse/FLINK-4024
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Kostas Kloudas
>Priority: Critical
> Fix For: 1.1.0
>
>
> The InputFormat lifecycle was extended in 
> ac2137cfa5e63bd4f53a4b7669dc591ab210093f, adding additional 
> open-/closeInputFormat() methods.
> The streaming FileSourceFunction was not adjusted for this change, and thus 
> will fail for every InputFormat that leverages these new methods.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2100: [FLINK-4024] FileSourceFunction not adjusted to new IF li...

2016-06-15 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2100
  
Good to merge once the `isFormatOpen` flag is removed.
Thanks, Fabian


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2100: [FLINK-4024] FileSourceFunction not adjusted to ne...

2016-06-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2100#discussion_r67158378
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 ---
@@ -173,6 +175,7 @@ public void close() throws Exception {
private class SplitReader extends Thread {
 
private volatile boolean isRunning;
+   private volatile boolean isFormatOpen = false;
--- End diff --

I think this flag can be removed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4024) FileSourceFunction not adjusted to new IF lifecycle

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331718#comment-15331718
 ] 

ASF GitHub Bot commented on FLINK-4024:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2100#discussion_r67158378
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 ---
@@ -173,6 +175,7 @@ public void close() throws Exception {
private class SplitReader extends Thread {
 
private volatile boolean isRunning;
+   private volatile boolean isFormatOpen = false;
--- End diff --

I think this flag can be removed.


> FileSourceFunction not adjusted to new IF lifecycle
> ---
>
> Key: FLINK-4024
> URL: https://issues.apache.org/jira/browse/FLINK-4024
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Kostas Kloudas
>Priority: Critical
> Fix For: 1.1.0
>
>
> The InputFormat lifecycle was extended in 
> ac2137cfa5e63bd4f53a4b7669dc591ab210093f, adding additional 
> open-/closeInputFormat() methods.
> The streaming FileSourceFunction was not adjusted for this change, and thus 
> will fail for every InputFormat that leverages these new methods.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3714) Add Support for "Allowed Lateness"

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331711#comment-15331711
 ] 

ASF GitHub Bot commented on FLINK-3714:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2093
  
Can you please also add a short javadoc to `EventTimeTriggerAccum`, explaining 
what it does and why we need a special trigger for the tests. 


> Add Support for "Allowed Lateness"
> --
>
> Key: FLINK-3714
> URL: https://issues.apache.org/jira/browse/FLINK-3714
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
>
> As mentioned in 
> https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit#
>  we should add support for an allowed lateness setting.
> This includes several things:
>  - API for setting allowed lateness
>  - Dropping of late elements 
>  - Garbage collection of windows state/timers
> Depending on whether the {{WindowAssigner}} assigns windows based on event 
> time or processing time we have to adjust the GC behavior. For event-time 
> windows "allowed lateness" makes sense and we should garbage collect after 
> this expires. For processing-time windows "allowed lateness" does not make 
> sense and we should always GC window state/timers at the end timestamp of a 
> processing-time window. I think that we need a method for this on 
> {{WindowAssigner}} that allows to differentiate between event-time windows 
> and processing-time windows: {{boolean WindowAssigner.isEventTime()}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2093: [FLINK-3714] Add Support for "Allowed Lateness"

2016-06-15 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2093
  
Can you please also add a short javadoc to `EventTimeTriggerAccum`, explaining 
what it does and why we need a special trigger for the tests. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

