[jira] [Assigned] (FLINK-2826) transformed is modified in BroadcastVariableMaterialization#decrementReferenceInternal without proper locking

2015-11-10 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2826?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu reassigned FLINK-2826:
-

Assignee: Ted Yu

> transformed is modified in 
> BroadcastVariableMaterialization#decrementReferenceInternal without proper 
> locking
> -
>
> Key: FLINK-2826
> URL: https://issues.apache.org/jira/browse/FLINK-2826
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Ted Yu
>
> Here is related code:
> {code}
> if (references.isEmpty()) {
> disposed = true;
> data = null;
> transformed = null;
> {code}
> Elsewhere, transformed is modified with lock on 
> BroadcastVariableMaterialization.this
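
A minimal sketch of the kind of change this implies (based only on the snippet above, not on an actual patch): perform the reset under the same monitor that the other writers of {{transformed}} use.

{code}
// Hypothetical sketch -- only the locking idea matters here; the surrounding
// method and signatures are not reproduced from the Flink sources.
synchronized (BroadcastVariableMaterialization.this) {
	if (references.isEmpty()) {
		disposed = true;
		data = null;
		transformed = null;
	}
}
{code}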



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2963) Dependence on SerializationUtils#deserialize() should be avoided

2015-11-10 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998719#comment-14998719
 ] 

Ted Yu commented on FLINK-2963:
---

Not completely:
{code}
return SerializationUtils.deserialize(message);
./flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
T copied = SerializationUtils.deserialize(SerializationUtils
./flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
{code}
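
One classloader-aware alternative (a sketch, not taken from this ticket; Flink's own InstantiationUtil offers comparable classloader-aware helpers) is to deserialize through an ObjectInputStream that resolves classes against an explicit ClassLoader instead of relying on SerializationUtils.deserialize():

{code}
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;

// Hypothetical helper; name and location are illustrative. It avoids the
// LANG-1049 problem by resolving classes against the given ClassLoader.
public static <T> T deserialize(byte[] bytes, final ClassLoader classLoader)
		throws IOException, ClassNotFoundException {
	try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
		@Override
		protected Class<?> resolveClass(ObjectStreamClass desc)
				throws IOException, ClassNotFoundException {
			return Class.forName(desc.getName(), false, classLoader);
		}
	}) {
		@SuppressWarnings("unchecked")
		T result = (T) in.readObject();
		return result;
	}
}
{code}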

> Dependence on SerializationUtils#deserialize() should be avoided
> 
>
> Key: FLINK-2963
> URL: https://issues.apache.org/jira/browse/FLINK-2963
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> There is a problem with `SerializationUtils` from Apache Commons
> Lang. Here is an open issue where the class will throw a
> `ClassNotFoundException` even if the class is in the classpath in a
> multiple-classloader environment:
> https://issues.apache.org/jira/browse/LANG-1049
> {code}
>   state = (HashMap) 
> SerializationUtils.deserialize(bais);
> ./flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
>   state = (HashMap) 
> SerializationUtils.deserialize(bais);
> ./flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
>   return SerializationUtils.deserialize(message);
> ./flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
>   T copied = SerializationUtils.deserialize(SerializationUtils
> ./flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
> {code}
> We should move away from SerializationUtils.deserialize()



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2993) Set default elayBetweenExecutionRetries to 0

2015-11-10 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998850#comment-14998850
 ] 

Stephan Ewen commented on FLINK-2993:
-

Makes sense

> Set default elayBetweenExecutionRetries to 0
> 
>
> Key: FLINK-2993
> URL: https://issues.apache.org/jira/browse/FLINK-2993
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 0.10
>Reporter: Stephan Ewen
> Fix For: 1.0
>
>
> The default value is too high and gives a strange user experience.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-2993) Set default elayBetweenExecutionRetries to 0

2015-11-10 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen reassigned FLINK-2993:
---

Assignee: Stephan Ewen

> Set default elayBetweenExecutionRetries to 0
> 
>
> Key: FLINK-2993
> URL: https://issues.apache.org/jira/browse/FLINK-2993
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.0
>
>
> The default value is too high and gives a strange user experience.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2995) Set default number of retries to larger than 0

2015-11-10 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-2995:
---

 Summary: Set default number of retries to larger than 0
 Key: FLINK-2995
 URL: https://issues.apache.org/jira/browse/FLINK-2995
 Project: Flink
  Issue Type: Wish
  Components: Core
Affects Versions: 0.10
Reporter: Stephan Ewen
Priority: Minor
 Fix For: 1.0


Right now, the default number of retries is 0, meaning fault tolerance must be 
activated explicitly.

In both streaming and batch, we can provide a smoother experience when fault 
tolerance is activated by default.
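
For context, this is roughly what a user has to do today to get restarts at all (a sketch against the 0.10-era API; treat the exact setter names as illustrative):

{code}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// default is 0, i.e. no restarts unless the user opts in explicitly
env.setNumberOfExecutionRetries(3);
// see FLINK-2993: delay between restart attempts, in milliseconds
env.getConfig().setExecutionRetryDelay(100);
{code}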





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2966) Improve the way job duration is reported on web frontend.

2015-11-10 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998707#comment-14998707
 ] 

Maximilian Michels commented on FLINK-2966:
---

release-0.10: 715d33b

> Improve the way job duration is reported on web frontend.
> -
>
> Key: FLINK-2966
> URL: https://issues.apache.org/jira/browse/FLINK-2966
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: Sachin Goel
>Assignee: Sachin Goel
>Priority: Minor
> Fix For: 1.0, 0.10.1
>
>
> Right now, job duration is always reported in milliseconds. For long running 
> jobs, this is not the best way.
> We should incorporate some kind of granularity in this. I propose this:
> 0-60 s : as x ms
> 60-3600 s: as x min y s
> 3600-86400 s: as x h y min [report as x h y min z s on hover]
> > 86400 s: as x d y h [report as x d y h z min t s on hover]
> I will start working on this, and we can change the granularity if someone 
> has a better idea.
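
A rough sketch of the proposed granularity rules (written in Java for illustration; the actual change belongs in the web frontend code):

{code}
// Sketch only: thresholds follow the proposal above.
static String formatDuration(long millis) {
	long s = millis / 1000;
	if (s < 60) {
		return millis + " ms";
	} else if (s < 3600) {
		return (s / 60) + " min " + (s % 60) + " s";
	} else if (s < 86400) {
		return (s / 3600) + " h " + ((s % 3600) / 60) + " min";
	} else {
		return (s / 86400) + " d " + ((s % 86400) / 3600) + " h";
	}
}
{code}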



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2994) Client sysout logging does not report exceptions

2015-11-10 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-2994:
---

 Summary: Client sysout logging does not report exceptions
 Key: FLINK-2994
 URL: https://issues.apache.org/jira/browse/FLINK-2994
 Project: Flink
  Issue Type: Improvement
  Components: Command-line client
Affects Versions: 0.10
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.0


For failures on the JobManager, the client simply prints that the job switched 
to RESTARTING and does not print the exception.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2993) Set default DelayBetweenExecutionRetries to 0

2015-11-10 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen updated FLINK-2993:

Summary: Set default DelayBetweenExecutionRetries to 0  (was: Set default 
elayBetweenExecutionRetries to 0)

> Set default DelayBetweenExecutionRetries to 0
> -
>
> Key: FLINK-2993
> URL: https://issues.apache.org/jira/browse/FLINK-2993
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.0
>
>
> The default value is too high and gives a strange user experience.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998926#comment-14998926
 ] 

ASF GitHub Bot commented on FLINK-2692:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1266#discussion_r44434469
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -18,32 +18,97 @@
 
 package org.apache.flink.api.java.io;
 
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.flink.api.common.io.GenericCsvInputFormat;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.types.parser.FieldParser;
 
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.tuple.Tuple;
+import java.io.IOException;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.StringUtils;
 
-public class CsvInputFormat extends CommonCsvInputFormat {
+public abstract class CsvInputFormat extends 
GenericCsvInputFormat {
 
private static final long serialVersionUID = 1L;
+
+   public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+   public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+   protected transient Object[] parsedValues;

-   public CsvInputFormat(Path filePath, CompositeType 
typeInformation) {
-   super(filePath, typeInformation);
+   protected CsvInputFormat(Path filePath) {
+   super(filePath);
}
-   
-   public CsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, CompositeType typeInformation) {
-   super(filePath, lineDelimiter, fieldDelimiter, typeInformation);
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+   super.open(split);
+
+   @SuppressWarnings("unchecked")
+   FieldParser[] fieldParsers = (FieldParser[]) 
getFieldParsers();
+
+   //throw exception if no field parsers are available
+   if (fieldParsers.length == 0) {
+   throw new 
IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to 
parse input");
+   }
+
+   // create the value holders
+   this.parsedValues = new Object[fieldParsers.length];
+   for (int i = 0; i < fieldParsers.length; i++) {
+   this.parsedValues[i] = fieldParsers[i].createValue();
+   }
+
+   // left to right evaluation makes access [0] okay
+   // this marker is used to fasten up readRecord, so that it 
doesn't have to check each call if the line ending is set to default
+   if (this.getDelimiter().length == 1 && this.getDelimiter()[0] 
== '\n' ) {
+   this.lineDelimiterIsLinebreak = true;
+   }
+
+   this.commentCount = 0;
+   this.invalidLineCount = 0;
}
 
@Override
-   protected OUT createTuple(OUT reuse) {
-   Tuple result = (Tuple) reuse;
-   for (int i = 0; i < parsedValues.length; i++) {
-   result.setField(parsedValues[i], i);
+   public OUT nextRecord(OUT record) throws IOException {
+   OUT returnRecord = null;
+   do {
+   returnRecord = super.nextRecord(record);
+   } while (returnRecord == null && !reachedEnd());
+
+   return returnRecord;
+   }
+
+   public Class[] getFieldTypes() {
+   return super.getGenericFieldTypes();
+   }
+
+   protected static boolean[] createDefaultMask(int size) {
+   boolean[] includedMask = new boolean[size];
+   for (int x=0; x
> Untangle CsvInputFormat into PojoTypeCsvInputFormat and 
> TupleTypeCsvInputFormat 
> 
>
> Key: FLINK-2692
> URL: https://issues.apache.org/jira/browse/FLINK-2692
> Project: Flink
>  Issue Type: Improvement
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Minor
>
> The 

[GitHub] flink pull request: [FLINK-2692] Untangle CsvInputFormat

2015-11-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1266#discussion_r44434469
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -18,32 +18,97 @@
 
 package org.apache.flink.api.java.io;
 
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.flink.api.common.io.GenericCsvInputFormat;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.types.parser.FieldParser;
 
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.tuple.Tuple;
+import java.io.IOException;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.StringUtils;
 
-public class CsvInputFormat extends CommonCsvInputFormat {
+public abstract class CsvInputFormat extends 
GenericCsvInputFormat {
 
private static final long serialVersionUID = 1L;
+
+   public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+   public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+   protected transient Object[] parsedValues;

-   public CsvInputFormat(Path filePath, CompositeType 
typeInformation) {
-   super(filePath, typeInformation);
+   protected CsvInputFormat(Path filePath) {
+   super(filePath);
}
-   
-   public CsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, CompositeType typeInformation) {
-   super(filePath, lineDelimiter, fieldDelimiter, typeInformation);
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+   super.open(split);
+
+   @SuppressWarnings("unchecked")
+   FieldParser[] fieldParsers = (FieldParser[]) 
getFieldParsers();
+
+   //throw exception if no field parsers are available
+   if (fieldParsers.length == 0) {
+   throw new 
IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to 
parse input");
+   }
+
+   // create the value holders
+   this.parsedValues = new Object[fieldParsers.length];
+   for (int i = 0; i < fieldParsers.length; i++) {
+   this.parsedValues[i] = fieldParsers[i].createValue();
+   }
+
+   // left to right evaluation makes access [0] okay
+   // this marker is used to fasten up readRecord, so that it 
doesn't have to check each call if the line ending is set to default
+   if (this.getDelimiter().length == 1 && this.getDelimiter()[0] 
== '\n' ) {
+   this.lineDelimiterIsLinebreak = true;
+   }
+
+   this.commentCount = 0;
+   this.invalidLineCount = 0;
}
 
@Override
-   protected OUT createTuple(OUT reuse) {
-   Tuple result = (Tuple) reuse;
-   for (int i = 0; i < parsedValues.length; i++) {
-   result.setField(parsedValues[i], i);
+   public OUT nextRecord(OUT record) throws IOException {
+   OUT returnRecord = null;
+   do {
+   returnRecord = super.nextRecord(record);
+   } while (returnRecord == null && !reachedEnd());
+
+   return returnRecord;
+   }
+
+   public Class[] getFieldTypes() {
+   return super.getGenericFieldTypes();
+   }
+
+   protected static boolean[] createDefaultMask(int size) {
+   boolean[] includedMask = new boolean[size];
+   for (int x=0; x

[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998999#comment-14998999
 ] 

ASF GitHub Bot commented on FLINK-2692:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1266#discussion_r44439588
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -18,32 +18,97 @@
 
 package org.apache.flink.api.java.io;
 
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.flink.api.common.io.GenericCsvInputFormat;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.types.parser.FieldParser;
 
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.tuple.Tuple;
+import java.io.IOException;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.StringUtils;
 
-public class CsvInputFormat extends CommonCsvInputFormat {
+public abstract class CsvInputFormat extends 
GenericCsvInputFormat {
 
private static final long serialVersionUID = 1L;
+
+   public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+   public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+   protected transient Object[] parsedValues;

-   public CsvInputFormat(Path filePath, CompositeType 
typeInformation) {
-   super(filePath, typeInformation);
+   protected CsvInputFormat(Path filePath) {
+   super(filePath);
}
-   
-   public CsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, CompositeType typeInformation) {
-   super(filePath, lineDelimiter, fieldDelimiter, typeInformation);
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+   super.open(split);
+
+   @SuppressWarnings("unchecked")
+   FieldParser[] fieldParsers = (FieldParser[]) 
getFieldParsers();
+
+   //throw exception if no field parsers are available
+   if (fieldParsers.length == 0) {
+   throw new 
IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to 
parse input");
+   }
+
+   // create the value holders
+   this.parsedValues = new Object[fieldParsers.length];
+   for (int i = 0; i < fieldParsers.length; i++) {
+   this.parsedValues[i] = fieldParsers[i].createValue();
+   }
+
+   // left to right evaluation makes access [0] okay
+   // this marker is used to fasten up readRecord, so that it 
doesn't have to check each call if the line ending is set to default
+   if (this.getDelimiter().length == 1 && this.getDelimiter()[0] 
== '\n' ) {
+   this.lineDelimiterIsLinebreak = true;
+   }
+
+   this.commentCount = 0;
+   this.invalidLineCount = 0;
}
 
@Override
-   protected OUT createTuple(OUT reuse) {
-   Tuple result = (Tuple) reuse;
-   for (int i = 0; i < parsedValues.length; i++) {
-   result.setField(parsedValues[i], i);
+   public OUT nextRecord(OUT record) throws IOException {
+   OUT returnRecord = null;
+   do {
+   returnRecord = super.nextRecord(record);
+   } while (returnRecord == null && !reachedEnd());
+
+   return returnRecord;
+   }
+
+   public Class[] getFieldTypes() {
+   return super.getGenericFieldTypes();
+   }
+
+   protected static boolean[] createDefaultMask(int size) {
+   boolean[] includedMask = new boolean[size];
+   for (int x=0; x
> Untangle CsvInputFormat into PojoTypeCsvInputFormat and 
> TupleTypeCsvInputFormat 
> 
>
> Key: FLINK-2692
> 

[GitHub] flink pull request: [FLINK-2692] Untangle CsvInputFormat

2015-11-10 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1266#discussion_r44439588
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -18,32 +18,97 @@
 
 package org.apache.flink.api.java.io;
 
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.flink.api.common.io.GenericCsvInputFormat;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.types.parser.FieldParser;
 
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.tuple.Tuple;
+import java.io.IOException;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.StringUtils;
 
-public class CsvInputFormat extends CommonCsvInputFormat {
+public abstract class CsvInputFormat extends 
GenericCsvInputFormat {
 
private static final long serialVersionUID = 1L;
+
+   public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+   public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+   protected transient Object[] parsedValues;

-   public CsvInputFormat(Path filePath, CompositeType 
typeInformation) {
-   super(filePath, typeInformation);
+   protected CsvInputFormat(Path filePath) {
+   super(filePath);
}
-   
-   public CsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, CompositeType typeInformation) {
-   super(filePath, lineDelimiter, fieldDelimiter, typeInformation);
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+   super.open(split);
+
+   @SuppressWarnings("unchecked")
+   FieldParser[] fieldParsers = (FieldParser[]) 
getFieldParsers();
+
+   //throw exception if no field parsers are available
+   if (fieldParsers.length == 0) {
+   throw new 
IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to 
parse input");
+   }
+
+   // create the value holders
+   this.parsedValues = new Object[fieldParsers.length];
+   for (int i = 0; i < fieldParsers.length; i++) {
+   this.parsedValues[i] = fieldParsers[i].createValue();
+   }
+
+   // left to right evaluation makes access [0] okay
+   // this marker is used to fasten up readRecord, so that it 
doesn't have to check each call if the line ending is set to default
+   if (this.getDelimiter().length == 1 && this.getDelimiter()[0] 
== '\n' ) {
+   this.lineDelimiterIsLinebreak = true;
+   }
+
+   this.commentCount = 0;
+   this.invalidLineCount = 0;
}
 
@Override
-   protected OUT createTuple(OUT reuse) {
-   Tuple result = (Tuple) reuse;
-   for (int i = 0; i < parsedValues.length; i++) {
-   result.setField(parsedValues[i], i);
+   public OUT nextRecord(OUT record) throws IOException {
+   OUT returnRecord = null;
+   do {
+   returnRecord = super.nextRecord(record);
+   } while (returnRecord == null && !reachedEnd());
+
+   return returnRecord;
+   }
+
+   public Class[] getFieldTypes() {
+   return super.getGenericFieldTypes();
+   }
+
+   protected static boolean[] createDefaultMask(int size) {
+   boolean[] includedMask = new boolean[size];
+   for (int x=0; x

[jira] [Commented] (FLINK-2828) Add interfaces for Table API input formats

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998963#comment-14998963
 ] 

ASF GitHub Bot commented on FLINK-2828:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1237#issuecomment-155500029
  
Yes, sorry for the delay.


> Add interfaces for Table API input formats
> --
>
> Key: FLINK-2828
> URL: https://issues.apache.org/jira/browse/FLINK-2828
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> In order to support input formats for the Table API, interfaces are 
> necessary. I propose two types of TableSources:
> - AdaptiveTableSources can adapt their output to the requirements of the 
> plan. Although the output schema stays the same, the TableSource can react on 
> field resolution and/or predicates internally and can return adapted 
> DataSet/DataStream versions in the "translate" step.
> - StaticTableSources are an easy way to provide the Table API with additional 
> input formats without much implementation effort (e.g. for fromCsvFile())
> TableSources need to be deeply integrated into the Table API.
> The TableEnvironment requires a newly introduced AbstractExecutionEnvironment 
> (common super class of all ExecutionEnvironments for DataSets and 
> DataStreams).
> Here's what a TableSource can see from more complicated queries:
> {code}
> getTableJava(tableSource1)
>   .filter("a===5 || a===6")
>   .select("a as a4, b as b4, c as c4")
>   .filter("b4===7")
>   .join(getTableJava(tableSource2))
>   .where("a===a4 && c==='Test' && c4==='Test2'")
> // Result predicates for tableSource1:
> //  List("a===5 || a===6", "b===7", "c==='Test2'")
> // Result predicates for tableSource2:
> //  List("c==='Test'")
> // Result resolved fields for tableSource1 (true = filtering, 
> false=selection):
> //  Set(("a", true), ("a", false), ("b", true), ("b", false), ("c", false), 
> ("c", true))
> // Result resolved fields for tableSource2 (true = filtering, 
> false=selection):
> //  Set(("a", true), ("c", true))
> {code}
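
A purely hypothetical sketch of what such an adaptive source contract could look like (none of these names are taken from the proposal or the pull request):

{code}
import java.util.List;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

// Illustrative only: an adaptive source is told during planning which
// predicates and fields the query uses, and produces an adapted DataSet
// in the translate step.
public interface AdaptiveTableSource<T> {

	// predicates (as expression strings) the source may use to pre-filter
	void notifyPredicates(List<String> predicates);

	// a field that was resolved; usedInFilter distinguishes filtering from selection
	void notifyResolvedField(String fieldName, boolean usedInFilter);

	// called in the translate step
	DataSet<T> createAdaptedDataSet(ExecutionEnvironment env);
}
{code}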



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2828] [table] Add interfaces for Table ...

2015-11-10 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1237#issuecomment-155500029
  
Yes, sorry for the delay.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2828) Add interfaces for Table API input formats

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998953#comment-14998953
 ] 

ASF GitHub Bot commented on FLINK-2828:
---

Github user twalthr commented on the pull request:

https://github.com/apache/flink/pull/1237#issuecomment-155498508
  
@aljoscha Do you think you'll find time this week for the PR? All Table API 
input formats are waiting for it...


> Add interfaces for Table API input formats
> --
>
> Key: FLINK-2828
> URL: https://issues.apache.org/jira/browse/FLINK-2828
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> In order to support input formats for the Table API, interfaces are 
> necessary. I propose two types of TableSources:
> - AdaptiveTableSources can adapt their output to the requirements of the 
> plan. Although the output schema stays the same, the TableSource can react on 
> field resolution and/or predicates internally and can return adapted 
> DataSet/DataStream versions in the "translate" step.
> - StaticTableSources are an easy way to provide the Table API with additional 
> input formats without much implementation effort (e.g. for fromCsvFile())
> TableSources need to be deeply integrated into the Table API.
> The TableEnvironment requires a newly introduced AbstractExecutionEnvironment 
> (common super class of all ExecutionEnvironments for DataSets and 
> DataStreams).
> Here's what a TableSource can see from more complicated queries:
> {code}
> getTableJava(tableSource1)
>   .filter("a===5 || a===6")
>   .select("a as a4, b as b4, c as c4")
>   .filter("b4===7")
>   .join(getTableJava(tableSource2))
>   .where("a===a4 && c==='Test' && c4==='Test2'")
> // Result predicates for tableSource1:
> //  List("a===5 || a===6", "b===7", "c==='Test2'")
> // Result predicates for tableSource2:
> //  List("c==='Test'")
> // Result resolved fields for tableSource1 (true = filtering, 
> false=selection):
> //  Set(("a", true), ("a", false), ("b", true), ("b", false), ("c", false), 
> ("c", true))
> // Result resolved fields for tableSource2 (true = filtering, 
> false=selection):
> //  Set(("a", true), ("c", true))
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998985#comment-14998985
 ] 

ASF GitHub Bot commented on FLINK-2692:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1266#discussion_r44438028
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java 
---
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public class PojoCsvInputFormat extends CsvInputFormat {
+
+   private static final long serialVersionUID = 1L;
+
+   private Class pojoTypeClass;
+
+   private String[] pojoFieldNames;
+
+   private transient PojoTypeInfo pojoTypeInfo;
+   private transient Field[] pojoFields;
+
+   public PojoCsvInputFormat(Path filePath, PojoTypeInfo 
pojoTypeInfo) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
pojoTypeInfo);
+   }
+
+   public PojoCsvInputFormat(Path filePath, PojoTypeInfo 
pojoTypeInfo, String[] fieldNames) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
pojoTypeInfo, fieldNames, createDefaultMask(pojoTypeInfo.getArity()));
+   }
+
+   public PojoCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, PojoTypeInfo pojoTypeInfo) {
+   this(filePath, lineDelimiter, fieldDelimiter, pojoTypeInfo, 
pojoTypeInfo.getFieldNames(), createDefaultMask(pojoTypeInfo.getArity()));
+   }
+
+   public PojoCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, PojoTypeInfo pojoTypeInfo, String[] fieldNames) {
+   this(filePath, lineDelimiter, fieldDelimiter, pojoTypeInfo, 
fieldNames, createDefaultMask(fieldNames.length));
+   }
+
+   public PojoCsvInputFormat(Path filePath, PojoTypeInfo 
pojoTypeInfo, int[] includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
pojoTypeInfo, pojoTypeInfo.getFieldNames(), toBooleanMask(includedFieldsMask));
+   }
+
+   public PojoCsvInputFormat(Path filePath, PojoTypeInfo 
pojoTypeInfo, String[] fieldNames, int[] includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
pojoTypeInfo, fieldNames, includedFieldsMask);
+   }
+
+   public PojoCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, PojoTypeInfo pojoTypeInfo, int[] includedFieldsMask) {
+   this(filePath, lineDelimiter, fieldDelimiter, pojoTypeInfo, 
pojoTypeInfo.getFieldNames(), includedFieldsMask);
+   }
+
+   public PojoCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, PojoTypeInfo pojoTypeInfo, String[] fieldNames, int[] 
includedFieldsMask) {
+   super(filePath);
+   boolean[] mask = (includedFieldsMask == null)
+   ? createDefaultMask(fieldNames.length)
+   : toBooleanMask(includedFieldsMask);
+   configure(lineDelimiter, fieldDelimiter, pojoTypeInfo, 
fieldNames, mask);
+   }
+
+   public PojoCsvInputFormat(Path filePath, PojoTypeInfo 
pojoTypeInfo, boolean[] includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
pojoTypeInfo, pojoTypeInfo.getFieldNames(), includedFieldsMask);
+   }
+
+   public PojoCsvInputFormat(Path filePath, PojoTypeInfo 
pojoTypeInfo, String[] fieldNames, boolean[] includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, 

[GitHub] flink pull request: [FLINK-2692] Untangle CsvInputFormat

2015-11-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1266#discussion_r44438028
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java 
---
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public class PojoCsvInputFormat extends CsvInputFormat {
+
+   private static final long serialVersionUID = 1L;
+
+   private Class pojoTypeClass;
+
+   private String[] pojoFieldNames;
+
+   private transient PojoTypeInfo pojoTypeInfo;
+   private transient Field[] pojoFields;
+
+   public PojoCsvInputFormat(Path filePath, PojoTypeInfo 
pojoTypeInfo) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
pojoTypeInfo);
+   }
+
+   public PojoCsvInputFormat(Path filePath, PojoTypeInfo 
pojoTypeInfo, String[] fieldNames) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
pojoTypeInfo, fieldNames, createDefaultMask(pojoTypeInfo.getArity()));
+   }
+
+   public PojoCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, PojoTypeInfo pojoTypeInfo) {
+   this(filePath, lineDelimiter, fieldDelimiter, pojoTypeInfo, 
pojoTypeInfo.getFieldNames(), createDefaultMask(pojoTypeInfo.getArity()));
+   }
+
+   public PojoCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, PojoTypeInfo pojoTypeInfo, String[] fieldNames) {
+   this(filePath, lineDelimiter, fieldDelimiter, pojoTypeInfo, 
fieldNames, createDefaultMask(fieldNames.length));
+   }
+
+   public PojoCsvInputFormat(Path filePath, PojoTypeInfo 
pojoTypeInfo, int[] includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
pojoTypeInfo, pojoTypeInfo.getFieldNames(), toBooleanMask(includedFieldsMask));
+   }
+
+   public PojoCsvInputFormat(Path filePath, PojoTypeInfo 
pojoTypeInfo, String[] fieldNames, int[] includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
pojoTypeInfo, fieldNames, includedFieldsMask);
+   }
+
+   public PojoCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, PojoTypeInfo pojoTypeInfo, int[] includedFieldsMask) {
+   this(filePath, lineDelimiter, fieldDelimiter, pojoTypeInfo, 
pojoTypeInfo.getFieldNames(), includedFieldsMask);
+   }
+
+   public PojoCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, PojoTypeInfo pojoTypeInfo, String[] fieldNames, int[] 
includedFieldsMask) {
+   super(filePath);
+   boolean[] mask = (includedFieldsMask == null)
+   ? createDefaultMask(fieldNames.length)
+   : toBooleanMask(includedFieldsMask);
+   configure(lineDelimiter, fieldDelimiter, pojoTypeInfo, 
fieldNames, mask);
+   }
+
+   public PojoCsvInputFormat(Path filePath, PojoTypeInfo 
pojoTypeInfo, boolean[] includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
pojoTypeInfo, pojoTypeInfo.getFieldNames(), includedFieldsMask);
+   }
+
+   public PojoCsvInputFormat(Path filePath, PojoTypeInfo 
pojoTypeInfo, String[] fieldNames, boolean[] includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
pojoTypeInfo, fieldNames, includedFieldsMask);
+   }
+
+   public PojoCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, PojoTypeInfo pojoTypeInfo, boolean[] includedFieldsMask) {

[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14999015#comment-14999015
 ] 

ASF GitHub Bot commented on FLINK-2692:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1266#issuecomment-155515305
  
I agree with @aljoscha and you on the `readRecord()` code. It would be nice 
to have the common parts of `readRecord()` in the `CsvInputFormat` and specific 
`fillRecord` in the tuple and POJO formats.

Otherwise, the PR looks really good.
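
A sketch of the split being suggested (names are illustrative, not from the PR): the base class keeps the shared parse step and delegates only the materialization of the record to the concrete formats.

{code}
import java.io.IOException;
import org.apache.flink.api.common.io.GenericCsvInputFormat;

// Illustrative sketch; signatures simplified.
public abstract class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {

	protected transient Object[] parsedValues;

	// common part: parse the fields, then let the subclass build OUT
	public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws IOException {
		if (parseRecord(parsedValues, bytes, offset, numBytes)) {
			return fillRecord(reuse, parsedValues);
		}
		return null;
	}

	// specific part, implemented by TupleCsvInputFormat / PojoCsvInputFormat
	protected abstract OUT fillRecord(OUT reuse, Object[] parsedValues);
}
{code}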


> Untangle CsvInputFormat into PojoTypeCsvInputFormat and 
> TupleTypeCsvInputFormat 
> 
>
> Key: FLINK-2692
> URL: https://issues.apache.org/jira/browse/FLINK-2692
> Project: Flink
>  Issue Type: Improvement
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Minor
>
> The {{CsvInputFormat}} currently allows to return values as a {{Tuple}} or a 
> {{Pojo}} type. As a consequence, the processing logic, which has to work for 
> both types, is overly complex. For example, the {{CsvInputFormat}} contains 
> fields which are only used when a Pojo is returned. Moreover, the pojo field 
> information are constructed by calling setter methods which have to be called 
> in a very specific order, otherwise they fail. E.g. one first has to call 
> {{setFieldTypes}} before calling {{setOrderOfPOJOFields}}, otherwise the 
> number of fields might be different. Furthermore, some of the methods can 
> only be called if the return type is a {{Pojo}} type, because they expect 
> that a {{PojoTypeInfo}} is present.
> I think the {{CsvInputFormat}} should be refactored to make the code more 
> easily maintainable. I propose to split it up into a 
> {{PojoTypeCsvInputFormat}} and a {{TupleTypeCsvInputFormat}} which take all 
> the required information via their constructors instead of using the 
> {{setFields}} and {{setOrderOfPOJOFields}} approach.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2692] Untangle CsvInputFormat

2015-11-10 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1266#issuecomment-155515305
  
I agree with @aljoscha and you on the `readRecord()` code. It would be nice 
to have the common parts of `readRecord()` in the `CsvInputFormat` and specific 
`fillRecord` in the tuple and POJO formats.

Otherwise, the PR looks really good.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2978) Integrate web submission interface into the new dashboard

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14999044#comment-14999044
 ] 

ASF GitHub Bot commented on FLINK-2978:
---

Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/1338#issuecomment-155518122
  
@rmetzger Good to test. :)


> Integrate web submission interface into the new dashboard
> -
>
> Key: FLINK-2978
> URL: https://issues.apache.org/jira/browse/FLINK-2978
> Project: Flink
>  Issue Type: Bug
>  Components: Web Client, Webfrontend
>Reporter: Sachin Goel
>Assignee: Sachin Goel
>
> As discussed in 
> http://mail-archives.apache.org/mod_mbox/flink-dev/201511.mbox/%3CCAL3J2zQg6UBKNDnm=8tshpz6r4p2jvx7nrlom7caajrb9s6...@mail.gmail.com%3E,
>  we should integrate job submission from the web into the dashboard.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2978][web-dashboard][webclient] Integra...

2015-11-10 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/1338#issuecomment-155518122
  
@rmetzger Good to test. :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2828] [table] Add interfaces for Table ...

2015-11-10 Thread twalthr
Github user twalthr commented on the pull request:

https://github.com/apache/flink/pull/1237#issuecomment-155498508
  
@aljoscha Do you think you'll find time this week for the PR? All Table API 
input formats are waiting for it...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14999027#comment-14999027
 ] 

ASF GitHub Bot commented on FLINK-2692:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1266#discussion_r1239
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -18,32 +18,97 @@
 
 package org.apache.flink.api.java.io;
 
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.flink.api.common.io.GenericCsvInputFormat;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.types.parser.FieldParser;
 
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.tuple.Tuple;
+import java.io.IOException;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.StringUtils;
 
-public class CsvInputFormat extends CommonCsvInputFormat {
+public abstract class CsvInputFormat extends 
GenericCsvInputFormat {
 
private static final long serialVersionUID = 1L;
+
+   public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+   public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+   protected transient Object[] parsedValues;

-   public CsvInputFormat(Path filePath, CompositeType 
typeInformation) {
-   super(filePath, typeInformation);
+   protected CsvInputFormat(Path filePath) {
+   super(filePath);
}
-   
-   public CsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, CompositeType typeInformation) {
-   super(filePath, lineDelimiter, fieldDelimiter, typeInformation);
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+   super.open(split);
+
+   @SuppressWarnings("unchecked")
+   FieldParser[] fieldParsers = (FieldParser[]) 
getFieldParsers();
+
+   //throw exception if no field parsers are available
+   if (fieldParsers.length == 0) {
+   throw new 
IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to 
parse input");
+   }
+
+   // create the value holders
+   this.parsedValues = new Object[fieldParsers.length];
+   for (int i = 0; i < fieldParsers.length; i++) {
+   this.parsedValues[i] = fieldParsers[i].createValue();
+   }
+
+   // left to right evaluation makes access [0] okay
+   // this marker is used to fasten up readRecord, so that it 
doesn't have to check each call if the line ending is set to default
+   if (this.getDelimiter().length == 1 && this.getDelimiter()[0] 
== '\n' ) {
+   this.lineDelimiterIsLinebreak = true;
+   }
+
+   this.commentCount = 0;
+   this.invalidLineCount = 0;
}
 
@Override
-   protected OUT createTuple(OUT reuse) {
-   Tuple result = (Tuple) reuse;
-   for (int i = 0; i < parsedValues.length; i++) {
-   result.setField(parsedValues[i], i);
+   public OUT nextRecord(OUT record) throws IOException {
+   OUT returnRecord = null;
+   do {
+   returnRecord = super.nextRecord(record);
+   } while (returnRecord == null && !reachedEnd());
+
+   return returnRecord;
+   }
+
+   public Class[] getFieldTypes() {
+   return super.getGenericFieldTypes();
+   }
+
+   protected static boolean[] createDefaultMask(int size) {
+   boolean[] includedMask = new boolean[size];
+   for (int x=0; x
> Untangle CsvInputFormat into PojoTypeCsvInputFormat and 
> TupleTypeCsvInputFormat 
> 
>
> Key: FLINK-2692
> URL: https://issues.apache.org/jira/browse/FLINK-2692
> Project: Flink
>  Issue Type: Improvement
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Minor
>
> The {{CsvInputFormat}} currently allows to return values as a {{Tuple}} or a 
> {{Pojo}} type. As a consequence, the processing logic, 

[GitHub] flink pull request: [FLINK-2692] Untangle CsvInputFormat

2015-11-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1266#discussion_r1239
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -18,32 +18,97 @@
 
 package org.apache.flink.api.java.io;
 
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.flink.api.common.io.GenericCsvInputFormat;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.types.parser.FieldParser;
 
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.tuple.Tuple;
+import java.io.IOException;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.StringUtils;
 
-public class CsvInputFormat extends CommonCsvInputFormat {
+public abstract class CsvInputFormat extends 
GenericCsvInputFormat {
 
private static final long serialVersionUID = 1L;
+
+   public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+   public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+   protected transient Object[] parsedValues;

-   public CsvInputFormat(Path filePath, CompositeType 
typeInformation) {
-   super(filePath, typeInformation);
+   protected CsvInputFormat(Path filePath) {
+   super(filePath);
}
-   
-   public CsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, CompositeType typeInformation) {
-   super(filePath, lineDelimiter, fieldDelimiter, typeInformation);
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+   super.open(split);
+
+   @SuppressWarnings("unchecked")
+   FieldParser[] fieldParsers = (FieldParser[]) 
getFieldParsers();
+
+   //throw exception if no field parsers are available
+   if (fieldParsers.length == 0) {
+   throw new 
IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to 
parse input");
+   }
+
+   // create the value holders
+   this.parsedValues = new Object[fieldParsers.length];
+   for (int i = 0; i < fieldParsers.length; i++) {
+   this.parsedValues[i] = fieldParsers[i].createValue();
+   }
+
+   // left to right evaluation makes access [0] okay
+   // this marker is used to fasten up readRecord, so that it 
doesn't have to check each call if the line ending is set to default
+   if (this.getDelimiter().length == 1 && this.getDelimiter()[0] 
== '\n' ) {
+   this.lineDelimiterIsLinebreak = true;
+   }
+
+   this.commentCount = 0;
+   this.invalidLineCount = 0;
}
 
@Override
-   protected OUT createTuple(OUT reuse) {
-   Tuple result = (Tuple) reuse;
-   for (int i = 0; i < parsedValues.length; i++) {
-   result.setField(parsedValues[i], i);
+   public OUT nextRecord(OUT record) throws IOException {
+   OUT returnRecord = null;
+   do {
+   returnRecord = super.nextRecord(record);
+   } while (returnRecord == null && !reachedEnd());
+
+   return returnRecord;
+   }
+
+   public Class[] getFieldTypes() {
+   return super.getGenericFieldTypes();
+   }
+
+   protected static boolean[] createDefaultMask(int size) {
+   boolean[] includedMask = new boolean[size];
+   for (int x=0; x

[GitHub] flink pull request: [FLINK-2692] Untangle CsvInputFormat

2015-11-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1266#discussion_r44434150
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -18,32 +18,97 @@
 
 package org.apache.flink.api.java.io;
 
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.flink.api.common.io.GenericCsvInputFormat;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.types.parser.FieldParser;
 
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.tuple.Tuple;
+import java.io.IOException;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.StringUtils;
 
-public class CsvInputFormat extends CommonCsvInputFormat {
+public abstract class CsvInputFormat extends 
GenericCsvInputFormat {
 
private static final long serialVersionUID = 1L;
+
+   public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+   public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+   protected transient Object[] parsedValues;

-   public CsvInputFormat(Path filePath, CompositeType 
typeInformation) {
-   super(filePath, typeInformation);
+   protected CsvInputFormat(Path filePath) {
+   super(filePath);
}
-   
-   public CsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, CompositeType typeInformation) {
-   super(filePath, lineDelimiter, fieldDelimiter, typeInformation);
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+   super.open(split);
+
+   @SuppressWarnings("unchecked")
+   FieldParser[] fieldParsers = (FieldParser[]) 
getFieldParsers();
+
+   //throw exception if no field parsers are available
+   if (fieldParsers.length == 0) {
+   throw new 
IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to 
parse input");
+   }
+
+   // create the value holders
+   this.parsedValues = new Object[fieldParsers.length];
+   for (int i = 0; i < fieldParsers.length; i++) {
+   this.parsedValues[i] = fieldParsers[i].createValue();
+   }
+
+   // left to right evaluation makes access [0] okay
+   // this marker is used to fasten up readRecord, so that it 
doesn't have to check each call if the line ending is set to default
+   if (this.getDelimiter().length == 1 && this.getDelimiter()[0] 
== '\n' ) {
+   this.lineDelimiterIsLinebreak = true;
+   }
+
+   this.commentCount = 0;
+   this.invalidLineCount = 0;
}
 
@Override
-   protected OUT createTuple(OUT reuse) {
-   Tuple result = (Tuple) reuse;
-   for (int i = 0; i < parsedValues.length; i++) {
-   result.setField(parsedValues[i], i);
+   public OUT nextRecord(OUT record) throws IOException {
+   OUT returnRecord = null;
+   do {
+   returnRecord = super.nextRecord(record);
+   } while (returnRecord == null && !reachedEnd());
+
+   return returnRecord;
+   }
+
+   public Class[] getFieldTypes() {
+   return super.getGenericFieldTypes();
+   }
+
+   protected static boolean[] createDefaultMask(int size) {
--- End diff --

Isn't the default that fields are read one after the other from the start 
of a line?
Why do we need this method then?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998914#comment-14998914
 ] 

ASF GitHub Bot commented on FLINK-2692:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1266#discussion_r44434150
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -18,32 +18,97 @@
 
 package org.apache.flink.api.java.io;
 
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.flink.api.common.io.GenericCsvInputFormat;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.types.parser.FieldParser;
 
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.tuple.Tuple;
+import java.io.IOException;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.StringUtils;
 
-public class CsvInputFormat extends CommonCsvInputFormat {
+public abstract class CsvInputFormat extends 
GenericCsvInputFormat {
 
private static final long serialVersionUID = 1L;
+
+   public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+   public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+   protected transient Object[] parsedValues;

-   public CsvInputFormat(Path filePath, CompositeType 
typeInformation) {
-   super(filePath, typeInformation);
+   protected CsvInputFormat(Path filePath) {
+   super(filePath);
}
-   
-   public CsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, CompositeType typeInformation) {
-   super(filePath, lineDelimiter, fieldDelimiter, typeInformation);
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+   super.open(split);
+
+   @SuppressWarnings("unchecked")
+   FieldParser[] fieldParsers = (FieldParser[]) 
getFieldParsers();
+
+   //throw exception if no field parsers are available
+   if (fieldParsers.length == 0) {
+   throw new 
IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to 
parse input");
+   }
+
+   // create the value holders
+   this.parsedValues = new Object[fieldParsers.length];
+   for (int i = 0; i < fieldParsers.length; i++) {
+   this.parsedValues[i] = fieldParsers[i].createValue();
+   }
+
+   // left to right evaluation makes access [0] okay
+   // this marker is used to fasten up readRecord, so that it 
doesn't have to check each call if the line ending is set to default
+   if (this.getDelimiter().length == 1 && this.getDelimiter()[0] 
== '\n' ) {
+   this.lineDelimiterIsLinebreak = true;
+   }
+
+   this.commentCount = 0;
+   this.invalidLineCount = 0;
}
 
@Override
-   protected OUT createTuple(OUT reuse) {
-   Tuple result = (Tuple) reuse;
-   for (int i = 0; i < parsedValues.length; i++) {
-   result.setField(parsedValues[i], i);
+   public OUT nextRecord(OUT record) throws IOException {
+   OUT returnRecord = null;
+   do {
+   returnRecord = super.nextRecord(record);
+   } while (returnRecord == null && !reachedEnd());
+
+   return returnRecord;
+   }
+
+   public Class[] getFieldTypes() {
+   return super.getGenericFieldTypes();
+   }
+
+   protected static boolean[] createDefaultMask(int size) {
--- End diff --

Isn't the default that fields are read one after the other from the start 
of a line?
Why do we need this method then?


> Untangle CsvInputFormat into PojoTypeCsvInputFormat and 
> TupleTypeCsvInputFormat 
> 
>
> Key: FLINK-2692
> URL: https://issues.apache.org/jira/browse/FLINK-2692
> Project: Flink
>  Issue Type: Improvement
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Minor
>
> The {{CsvInputFormat}} currently allows to return values as a {{Tuple}} or a 
> {{Pojo}} type. As a consequence, the processing logic, which has to work for 
> both types, is overly complex. For example, the {{CsvInputFormat}} contains 
> fields which are only used when a Pojo is returned. Moreover, the pojo field 
> information are constructed by calling setter methods which have to be called 
> in a very specific order, otherwise they fail. E.g. one 

[GitHub] flink pull request: [FLINK-2692] Untangle CsvInputFormat

2015-11-10 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1266#discussion_r44435787
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -18,32 +18,97 @@
 
 package org.apache.flink.api.java.io;
 
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.flink.api.common.io.GenericCsvInputFormat;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.types.parser.FieldParser;
 
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.tuple.Tuple;
+import java.io.IOException;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.StringUtils;
 
-public class CsvInputFormat<OUT> extends CommonCsvInputFormat<OUT> {
+public abstract class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
 
private static final long serialVersionUID = 1L;
+
+   public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+   public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+   protected transient Object[] parsedValues;

-   public CsvInputFormat(Path filePath, CompositeType 
typeInformation) {
-   super(filePath, typeInformation);
+   protected CsvInputFormat(Path filePath) {
+   super(filePath);
}
-   
-   public CsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, CompositeType typeInformation) {
-   super(filePath, lineDelimiter, fieldDelimiter, typeInformation);
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+   super.open(split);
+
+   @SuppressWarnings("unchecked")
+   FieldParser[] fieldParsers = (FieldParser[]) 
getFieldParsers();
+
+   //throw exception if no field parsers are available
+   if (fieldParsers.length == 0) {
+   throw new 
IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to 
parse input");
+   }
+
+   // create the value holders
+   this.parsedValues = new Object[fieldParsers.length];
+   for (int i = 0; i < fieldParsers.length; i++) {
+   this.parsedValues[i] = fieldParsers[i].createValue();
+   }
+
+   // left to right evaluation makes access [0] okay
+   // this marker is used to fasten up readRecord, so that it 
doesn't have to check each call if the line ending is set to default
+   if (this.getDelimiter().length == 1 && this.getDelimiter()[0] 
== '\n' ) {
+   this.lineDelimiterIsLinebreak = true;
+   }
+
+   this.commentCount = 0;
+   this.invalidLineCount = 0;
}
 
@Override
-   protected OUT createTuple(OUT reuse) {
-   Tuple result = (Tuple) reuse;
-   for (int i = 0; i < parsedValues.length; i++) {
-   result.setField(parsedValues[i], i);
+   public OUT nextRecord(OUT record) throws IOException {
+   OUT returnRecord = null;
+   do {
+   returnRecord = super.nextRecord(record);
+   } while (returnRecord == null && !reachedEnd());
+
+   return returnRecord;
+   }
+
+   public Class[] getFieldTypes() {
+   return super.getGenericFieldTypes();
+   }
+
+   protected static boolean[] createDefaultMask(int size) {
--- End diff --

I wanted to cover that case directly in the InputFormat instead of 
*somewhere* else. This method is used to create a mask for exactly that case, 
when we can infer the mask from the number of field types.
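
For readers following the thread without the full file: a default include mask in this sense would presumably just mark every field as read, one entry per field type. A minimal sketch, assuming that is all the helper does (the actual body is not shown in the diff excerpt above):

{code}
// Sketch only: builds an include mask that selects every field, used when the
// mask can be inferred from the number of field types alone.
protected static boolean[] createDefaultMask(int size) {
    boolean[] includedMask = new boolean[size];
    for (int i = 0; i < size; i++) {
        includedMask[i] = true;
    }
    return includedMask;
}
{code}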


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998947#comment-14998947
 ] 

ASF GitHub Bot commented on FLINK-2692:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1266#discussion_r44435810
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -18,32 +18,97 @@
 
 package org.apache.flink.api.java.io;
 
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.flink.api.common.io.GenericCsvInputFormat;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.types.parser.FieldParser;
 
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.tuple.Tuple;
+import java.io.IOException;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.StringUtils;
 
-public class CsvInputFormat<OUT> extends CommonCsvInputFormat<OUT> {
+public abstract class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
 
private static final long serialVersionUID = 1L;
+
+   public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+   public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+   protected transient Object[] parsedValues;

-   public CsvInputFormat(Path filePath, CompositeType 
typeInformation) {
-   super(filePath, typeInformation);
+   protected CsvInputFormat(Path filePath) {
+   super(filePath);
}
-   
-   public CsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, CompositeType typeInformation) {
-   super(filePath, lineDelimiter, fieldDelimiter, typeInformation);
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+   super.open(split);
+
+   @SuppressWarnings("unchecked")
+   FieldParser[] fieldParsers = (FieldParser[]) 
getFieldParsers();
+
+   //throw exception if no field parsers are available
+   if (fieldParsers.length == 0) {
+   throw new 
IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to 
parse input");
+   }
+
+   // create the value holders
+   this.parsedValues = new Object[fieldParsers.length];
+   for (int i = 0; i < fieldParsers.length; i++) {
+   this.parsedValues[i] = fieldParsers[i].createValue();
+   }
+
+   // left to right evaluation makes access [0] okay
+   // this marker is used to fasten up readRecord, so that it 
doesn't have to check each call if the line ending is set to default
+   if (this.getDelimiter().length == 1 && this.getDelimiter()[0] 
== '\n' ) {
+   this.lineDelimiterIsLinebreak = true;
+   }
+
+   this.commentCount = 0;
+   this.invalidLineCount = 0;
}
 
@Override
-   protected OUT createTuple(OUT reuse) {
-   Tuple result = (Tuple) reuse;
-   for (int i = 0; i < parsedValues.length; i++) {
-   result.setField(parsedValues[i], i);
+   public OUT nextRecord(OUT record) throws IOException {
+   OUT returnRecord = null;
+   do {
+   returnRecord = super.nextRecord(record);
+   } while (returnRecord == null && !reachedEnd());
+
+   return returnRecord;
+   }
+
+   public Class[] getFieldTypes() {
+   return super.getGenericFieldTypes();
+   }
+
+   protected static boolean[] createDefaultMask(int size) {
--- End diff --

*cover it in an obvious manner


> Untangle CsvInputFormat into PojoTypeCsvInputFormat and 
> TupleTypeCsvInputFormat 
> 
>
> Key: FLINK-2692
> URL: https://issues.apache.org/jira/browse/FLINK-2692
> Project: Flink
>  Issue Type: Improvement
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Minor
>
> The {{CsvInputFormat}} currently allows to return values as a {{Tuple}} or a 
> {{Pojo}} type. As a consequence, the processing logic, which has to work for 
> both types, is overly complex. For example, the {{CsvInputFormat}} contains 
> fields which are only used when a Pojo is returned. Moreover, the pojo field 
> information are constructed by calling setter methods which have to be called 
> in a very specific order, otherwise they fail. E.g. one first has to call 
> {{setFieldTypes}} before calling {{setOrderOfPOJOFields}}, otherwise 

[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998946#comment-14998946
 ] 

ASF GitHub Bot commented on FLINK-2692:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1266#discussion_r44435787
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -18,32 +18,97 @@
 
 package org.apache.flink.api.java.io;
 
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.flink.api.common.io.GenericCsvInputFormat;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.types.parser.FieldParser;
 
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.tuple.Tuple;
+import java.io.IOException;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.StringUtils;
 
-public class CsvInputFormat<OUT> extends CommonCsvInputFormat<OUT> {
+public abstract class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
 
private static final long serialVersionUID = 1L;
+
+   public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+   public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+   protected transient Object[] parsedValues;

-   public CsvInputFormat(Path filePath, CompositeType 
typeInformation) {
-   super(filePath, typeInformation);
+   protected CsvInputFormat(Path filePath) {
+   super(filePath);
}
-   
-   public CsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, CompositeType typeInformation) {
-   super(filePath, lineDelimiter, fieldDelimiter, typeInformation);
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+   super.open(split);
+
+   @SuppressWarnings("unchecked")
+   FieldParser[] fieldParsers = (FieldParser[]) 
getFieldParsers();
+
+   //throw exception if no field parsers are available
+   if (fieldParsers.length == 0) {
+   throw new 
IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to 
parse input");
+   }
+
+   // create the value holders
+   this.parsedValues = new Object[fieldParsers.length];
+   for (int i = 0; i < fieldParsers.length; i++) {
+   this.parsedValues[i] = fieldParsers[i].createValue();
+   }
+
+   // left to right evaluation makes access [0] okay
+   // this marker is used to fasten up readRecord, so that it 
doesn't have to check each call if the line ending is set to default
+   if (this.getDelimiter().length == 1 && this.getDelimiter()[0] 
== '\n' ) {
+   this.lineDelimiterIsLinebreak = true;
+   }
+
+   this.commentCount = 0;
+   this.invalidLineCount = 0;
}
 
@Override
-   protected OUT createTuple(OUT reuse) {
-   Tuple result = (Tuple) reuse;
-   for (int i = 0; i < parsedValues.length; i++) {
-   result.setField(parsedValues[i], i);
+   public OUT nextRecord(OUT record) throws IOException {
+   OUT returnRecord = null;
+   do {
+   returnRecord = super.nextRecord(record);
+   } while (returnRecord == null && !reachedEnd());
+
+   return returnRecord;
+   }
+
+   public Class[] getFieldTypes() {
+   return super.getGenericFieldTypes();
+   }
+
+   protected static boolean[] createDefaultMask(int size) {
--- End diff --

I wanted to cover that case directly in the InputFormat instead of 
*somewhere* else. This method is used to create a mask for exactly that case, 
when we can infer the mask from the number of field types.


> Untangle CsvInputFormat into PojoTypeCsvInputFormat and 
> TupleTypeCsvInputFormat 
> 
>
> Key: FLINK-2692
> URL: https://issues.apache.org/jira/browse/FLINK-2692
> Project: Flink
>  Issue Type: Improvement
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Minor
>
> The {{CsvInputFormat}} currently allows to return values as a {{Tuple}} or a 
> {{Pojo}} type. As a consequence, the processing logic, which has to work for 
> both types, is overly complex. For example, the {{CsvInputFormat}} contains 
> fields which are only used when a Pojo is returned. Moreover, the pojo field 
> information are constructed by calling setter methods 

[GitHub] flink pull request: [FLINK-2692] Untangle CsvInputFormat

2015-11-10 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1266#discussion_r44435810
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -18,32 +18,97 @@
 
 package org.apache.flink.api.java.io;
 
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.flink.api.common.io.GenericCsvInputFormat;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.types.parser.FieldParser;
 
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.tuple.Tuple;
+import java.io.IOException;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.StringUtils;
 
-public class CsvInputFormat<OUT> extends CommonCsvInputFormat<OUT> {
+public abstract class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
 
private static final long serialVersionUID = 1L;
+
+   public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+   public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+   protected transient Object[] parsedValues;

-   public CsvInputFormat(Path filePath, CompositeType 
typeInformation) {
-   super(filePath, typeInformation);
+   protected CsvInputFormat(Path filePath) {
+   super(filePath);
}
-   
-   public CsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, CompositeType typeInformation) {
-   super(filePath, lineDelimiter, fieldDelimiter, typeInformation);
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+   super.open(split);
+
+   @SuppressWarnings("unchecked")
+   FieldParser[] fieldParsers = (FieldParser[]) 
getFieldParsers();
+
+   //throw exception if no field parsers are available
+   if (fieldParsers.length == 0) {
+   throw new 
IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to 
parse input");
+   }
+
+   // create the value holders
+   this.parsedValues = new Object[fieldParsers.length];
+   for (int i = 0; i < fieldParsers.length; i++) {
+   this.parsedValues[i] = fieldParsers[i].createValue();
+   }
+
+   // left to right evaluation makes access [0] okay
+   // this marker is used to fasten up readRecord, so that it 
doesn't have to check each call if the line ending is set to default
+   if (this.getDelimiter().length == 1 && this.getDelimiter()[0] 
== '\n' ) {
+   this.lineDelimiterIsLinebreak = true;
+   }
+
+   this.commentCount = 0;
+   this.invalidLineCount = 0;
}
 
@Override
-   protected OUT createTuple(OUT reuse) {
-   Tuple result = (Tuple) reuse;
-   for (int i = 0; i < parsedValues.length; i++) {
-   result.setField(parsedValues[i], i);
+   public OUT nextRecord(OUT record) throws IOException {
+   OUT returnRecord = null;
+   do {
+   returnRecord = super.nextRecord(record);
+   } while (returnRecord == null && !reachedEnd());
+
+   return returnRecord;
+   }
+
+   public Class[] getFieldTypes() {
+   return super.getGenericFieldTypes();
+   }
+
+   protected static boolean[] createDefaultMask(int size) {
--- End diff --

*cover it in an obvious manner


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-2996) Add config entry to define BlobServer port

2015-11-10 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-2996:
---

 Summary: Add config entry to define BlobServer port
 Key: FLINK-2996
 URL: https://issues.apache.org/jira/browse/FLINK-2996
 Project: Flink
  Issue Type: New Feature
  Components: JobManager
Affects Versions: 0.10
Reporter: Stephan Ewen
 Fix For: 1.0, 0.10.1


The blob server currently allocates a random port. To work better with firewalled 
clusters, it must be possible to optionally pre-configure the port to a specific value.
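
A minimal sketch of how such an entry might be used, assuming a key name of 
{{blob.server.port}} and keeping 0 as "pick a random free port" (both the key name 
and the convention are assumptions here, not existing options):

{code}
// Sketch only: the key name and the 0-means-random convention are assumptions.
Configuration config = new Configuration();
config.setInteger("blob.server.port", 6124);

// at BlobServer start-up
int blobPort = config.getInteger("blob.server.port", 0); // 0 = random free port
{code}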



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2901) Several flink-test ITCases depend on Record API features

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998310#comment-14998310
 ] 

ASF GitHub Bot commented on FLINK-2901:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1306#discussion_r44386043
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
 ---
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.io.TextInputFormat;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-
-@SuppressWarnings("deprecation")
-public class IterationTerminationWithTwoTails extends RecordAPITestBase {
--- End diff --

I think we should keep this test and port it.


> Several flink-test ITCases depend on Record API features
> 
>
> Key: FLINK-2901
> URL: https://issues.apache.org/jira/browse/FLINK-2901
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 0.10
>Reporter: Fabian Hueske
>Assignee: Chesnay Schepler
>
> There are several ITCases and utility classes in {{flink-tests}} that depend 
> on the Record API including:
> - ITCases for Record API operators in 
> {{flink-tests/src/test/java/org/apache/flink/test/operators}}
> - ITCases for Record API programs in 
> {{flink-tests/src/test/java/org/apache/flink/test/recordJobTests}}
> - Record API programs in 
> {{flink-tests/src/test/java/org/apache/flink/test/recordJobs}}
> - Several ITCases for iterations in 
> {{flink-tests/src/test/java/org/apache/flink/test/iterative}}
> - Tests for job canceling in 
> {{flink-tests/src/test/java/org/apache/flink/test/cancelling}}
> - Test for failing jobs in 
> {{flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase}}
> - Optimizer tests in 
> {{flink-tests/src/test/java/org/apache/flink/test/optimizer}}
> - Accumulator test in 
> {{flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase}}
> - Broadcast test in 
> {{flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastBranchingITCase}}
> - distributed cache test in 
> {{flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest}}
> and probably a few more.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2901) Several flink-test ITCases depend on Record API features

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998313#comment-14998313
 ] 

ASF GitHub Bot commented on FLINK-2901:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1306#discussion_r44386181
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
 ---
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.io.TextInputFormat;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-
-@SuppressWarnings("deprecation")
-public class IterationTerminationWithTwoTails extends RecordAPITestBase {
-
-   private static final String INPUT = "1\n" + "2\n" + "3\n" + "4\n" + 
"5\n";
-   private static final String EXPECTED = "22\n";
-
-   protected String dataPath;
-   protected String resultPath;
-
-   public IterationTerminationWithTwoTails(){
-   setTaskManagerNumSlots(parallelism);
-   }
-
-   @Override
-   protected void preSubmit() throws Exception {
-   dataPath = createTempFile("datapoints.txt", INPUT);
-   resultPath = getTempFilePath("result");
-   }
-   
-   @Override
-   protected void postSubmit() throws Exception {
-   compareResultsByLinesInMemory(EXPECTED, resultPath);
-   }
-
-   @Override
-   protected Plan getTestJob() {
-   return getTestPlanPlan(parallelism, dataPath, resultPath);
-   }
-   
-   private static Plan getTestPlanPlan(int numSubTasks, String input, 
String output) {
-
-   FileDataSource initialInput = new 
FileDataSource(TextInputFormat.class, input, "input");
-   
-   BulkIteration iteration = new BulkIteration("Loop");
-   iteration.setInput(initialInput);
-   iteration.setMaximumNumberOfIterations(5);
-   Assert.assertTrue(iteration.getMaximumNumberOfIterations() > 1);
-
-   ReduceOperator sumReduce = ReduceOperator.builder(new 
SumReducer())
-   .input(iteration.getPartialSolution())
-   .name("Compute sum (Reduce)")
-   .build();
-   
-   iteration.setNextPartialSolution(sumReduce);
-   
-   MapOperator terminationMapper = MapOperator.builder(new 
TerminationMapper())
-   .input(iteration.getPartialSolution())
--- End diff --

The difference to `IterationTerminationWithTerminationTail` is that the 
input of the `terminationMapper` is the partial solution and not the result of 
the `SumReducer`, right?
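
Sketching the wiring as quoted above (the second line is presumed, since the diff 
excerpt is cut off before the termination criterion is set):

{code}
// tail 1: next partial solution is fed from the partial solution via the reducer
iteration.setNextPartialSolution(sumReduce);
// tail 2: termination criterion presumably fed directly from the partial solution
iteration.setTerminationCriterion(terminationMapper);
{code}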


> Several flink-test ITCases depend on Record API features
> 
>
> Key: FLINK-2901
> URL: https://issues.apache.org/jira/browse/FLINK-2901
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 0.10
>Reporter: 

[GitHub] flink pull request: [FLINK-2901] Remove Record API dependencies fr...

2015-11-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1306#discussion_r44386181
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
 ---
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.io.TextInputFormat;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-
-@SuppressWarnings("deprecation")
-public class IterationTerminationWithTwoTails extends RecordAPITestBase {
-
-   private static final String INPUT = "1\n" + "2\n" + "3\n" + "4\n" + 
"5\n";
-   private static final String EXPECTED = "22\n";
-
-   protected String dataPath;
-   protected String resultPath;
-
-   public IterationTerminationWithTwoTails(){
-   setTaskManagerNumSlots(parallelism);
-   }
-
-   @Override
-   protected void preSubmit() throws Exception {
-   dataPath = createTempFile("datapoints.txt", INPUT);
-   resultPath = getTempFilePath("result");
-   }
-   
-   @Override
-   protected void postSubmit() throws Exception {
-   compareResultsByLinesInMemory(EXPECTED, resultPath);
-   }
-
-   @Override
-   protected Plan getTestJob() {
-   return getTestPlanPlan(parallelism, dataPath, resultPath);
-   }
-   
-   private static Plan getTestPlanPlan(int numSubTasks, String input, 
String output) {
-
-   FileDataSource initialInput = new 
FileDataSource(TextInputFormat.class, input, "input");
-   
-   BulkIteration iteration = new BulkIteration("Loop");
-   iteration.setInput(initialInput);
-   iteration.setMaximumNumberOfIterations(5);
-   Assert.assertTrue(iteration.getMaximumNumberOfIterations() > 1);
-
-   ReduceOperator sumReduce = ReduceOperator.builder(new 
SumReducer())
-   .input(iteration.getPartialSolution())
-   .name("Compute sum (Reduce)")
-   .build();
-   
-   iteration.setNextPartialSolution(sumReduce);
-   
-   MapOperator terminationMapper = MapOperator.builder(new 
TerminationMapper())
-   .input(iteration.getPartialSolution())
--- End diff --

The difference to `IterationTerminationWithTerminationTail` is that the 
input of the `terminationMapper` is the partial solution and not the result of 
the `SumReducer`, right?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2213) Configure number of vcores

2015-11-10 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998321#comment-14998321
 ] 

Maximilian Michels commented on FLINK-2213:
---

It seems more likely that the container was killed due to memory usage.

Thanks for clarifying that every non-chained operator has its own thread 
[~fhueske]. Still, setting the vcores to the number of task slots seems like a 
much better estimate to me than defaulting to 1.
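
To make that concrete, a minimal sketch of what the container request could look like 
with that heuristic (the variable names are placeholders, not the actual YARN client code):

{code}
// Sketch only: request as many vcores as the TaskManager has task slots.
// taskManagerMemoryMB and numberOfTaskSlots are placeholder variables.
Resource capability = Resource.newInstance(taskManagerMemoryMB, numberOfTaskSlots);
// today this effectively amounts to Resource.newInstance(taskManagerMemoryMB, 1)
{code}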

> Configure number of vcores
> --
>
> Key: FLINK-2213
> URL: https://issues.apache.org/jira/browse/FLINK-2213
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN Client
>Affects Versions: 0.10
>Reporter: Ufuk Celebi
> Fix For: 1.0
>
>
> Currently, the number of vcores per YARN container is set to 1.
> It is desirable to allow configuring this value. As a simple heuristic it 
> makes sense to at least set it to the number of slots per container.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2977) Cannot access HBase in a Kerberos secured Yarn cluster

2015-11-10 Thread Niels Basjes (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998346#comment-14998346
 ] 

Niels Basjes commented on FLINK-2977:
-

I ran this version overnight, but the VPN from my system stopped before the ticket 
could expire.
I am quite confident that this patch is right though.
I'll start a new test on my end. With this pull request you guys can verify my 
patch and give me feedback on any improvements you see.

> Cannot access HBase in a Kerberos secured Yarn cluster
> --
>
> Key: FLINK-2977
> URL: https://issues.apache.org/jira/browse/FLINK-2977
> Project: Flink
>  Issue Type: Bug
>  Components: YARN Client
>Reporter: Niels Basjes
>Assignee: Niels Basjes
> Attachments: FLINK-2977-20151005-untested.patch, 
> FLINK-2977-20151009.patch
>
>
> I have created a very simple Flink topology consisting of a streaming Source 
> (that outputs the timestamp a few times per second) and a Sink (that puts that 
> timestamp into a single record in HBase).
> Running this on a non-secure Yarn cluster works fine.
> To run it on a secured Yarn cluster my main routine now looks like this:
> {code}
> public static void main(String[] args) throws Exception {
> System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
> UserGroupInformation.loginUserFromKeytab("nbas...@xx.net", 
> "/home/nbasjes/.krb/nbasjes.keytab");
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> DataStream stream = env.addSource(new TimerTicksSource());
> stream.addSink(new SetHBaseRowSink());
> env.execute("Long running Flink application");
> }
> {code}
> When I run this 
>  flink run -m yarn-cluster -yn 1 -yjm 1024 -ytm 4096 
> ./kerberos-1.0-SNAPSHOT.jar
> I see after the startup messages:
> {quote}
> 17:13:24,466 INFO  org.apache.hadoop.security.UserGroupInformation
>- Login successful for user nbas...@xx.net using keytab file 
> /home/nbasjes/.krb/nbasjes.keytab
> 11/03/2015 17:13:25   Job execution switched to status RUNNING.
> 11/03/2015 17:13:25   Custom Source -> Stream Sink(1/1) switched to SCHEDULED 
> 11/03/2015 17:13:25   Custom Source -> Stream Sink(1/1) switched to DEPLOYING 
> 11/03/2015 17:13:25   Custom Source -> Stream Sink(1/1) switched to RUNNING 
> {quote}
> Which looks good.
> However ... no data goes into HBase.
> After some digging I found this error in the task managers log:
> {quote}
> 17:13:42,677 WARN  org.apache.hadoop.hbase.ipc.RpcClient  
>- Exception encountered while connecting to the server : 
> javax.security.sasl.SaslException: GSS initiate failed [Caused by 
> GSSException: No valid credentials provided (Mechanism level: Failed to find 
> any Kerberos tgt)]
> 17:13:42,677 FATAL org.apache.hadoop.hbase.ipc.RpcClient  
>- SASL authentication failed. The most likely cause is missing or invalid 
> credentials. Consider 'kinit'.
> javax.security.sasl.SaslException: GSS initiate failed [Caused by 
> GSSException: No valid credentials provided (Mechanism level: Failed to find 
> any Kerberos tgt)]
>   at 
> com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
>   at 
> org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:177)
>   at 
> org.apache.hadoop.hbase.ipc.RpcClient$Connection.setupSaslConnection(RpcClient.java:815)
>   at 
> org.apache.hadoop.hbase.ipc.RpcClient$Connection.access$800(RpcClient.java:349)
> {quote}
> First starting a yarn-session and then loading my job gives the same error.
> My best guess at this point is that Flink needs the same fix as described 
> here:
> https://issues.apache.org/jira/browse/SPARK-6918   ( 
> https://github.com/apache/spark/pull/5586 )



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2977) Cannot access HBase in a Kerberos secured Yarn cluster

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998373#comment-14998373
 ] 

ASF GitHub Bot commented on FLINK-2977:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1342#issuecomment-155380888
  
Thanks for the pull request. Looks good to me except for one comment. I 
would like to postpone merging until you have verified the changes.


> Cannot access HBase in a Kerberos secured Yarn cluster
> --
>
> Key: FLINK-2977
> URL: https://issues.apache.org/jira/browse/FLINK-2977
> Project: Flink
>  Issue Type: Bug
>  Components: YARN Client
>Reporter: Niels Basjes
>Assignee: Niels Basjes
> Attachments: FLINK-2977-20151005-untested.patch, 
> FLINK-2977-20151009.patch
>
>
> I have created a very simple Flink topology consisting of a streaming Source 
> (that outputs the timestamp a few times per second) and a Sink (that puts that 
> timestamp into a single record in HBase).
> Running this on a non-secure Yarn cluster works fine.
> To run it on a secured Yarn cluster my main routine now looks like this:
> {code}
> public static void main(String[] args) throws Exception {
> System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
> UserGroupInformation.loginUserFromKeytab("nbas...@xx.net", 
> "/home/nbasjes/.krb/nbasjes.keytab");
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> DataStream stream = env.addSource(new TimerTicksSource());
> stream.addSink(new SetHBaseRowSink());
> env.execute("Long running Flink application");
> }
> {code}
> When I run this 
>  flink run -m yarn-cluster -yn 1 -yjm 1024 -ytm 4096 
> ./kerberos-1.0-SNAPSHOT.jar
> I see after the startup messages:
> {quote}
> 17:13:24,466 INFO  org.apache.hadoop.security.UserGroupInformation
>- Login successful for user nbas...@xx.net using keytab file 
> /home/nbasjes/.krb/nbasjes.keytab
> 11/03/2015 17:13:25   Job execution switched to status RUNNING.
> 11/03/2015 17:13:25   Custom Source -> Stream Sink(1/1) switched to SCHEDULED 
> 11/03/2015 17:13:25   Custom Source -> Stream Sink(1/1) switched to DEPLOYING 
> 11/03/2015 17:13:25   Custom Source -> Stream Sink(1/1) switched to RUNNING 
> {quote}
> Which looks good.
> However ... no data goes into HBase.
> After some digging I found this error in the task managers log:
> {quote}
> 17:13:42,677 WARN  org.apache.hadoop.hbase.ipc.RpcClient  
>- Exception encountered while connecting to the server : 
> javax.security.sasl.SaslException: GSS initiate failed [Caused by 
> GSSException: No valid credentials provided (Mechanism level: Failed to find 
> any Kerberos tgt)]
> 17:13:42,677 FATAL org.apache.hadoop.hbase.ipc.RpcClient  
>- SASL authentication failed. The most likely cause is missing or invalid 
> credentials. Consider 'kinit'.
> javax.security.sasl.SaslException: GSS initiate failed [Caused by 
> GSSException: No valid credentials provided (Mechanism level: Failed to find 
> any Kerberos tgt)]
>   at 
> com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
>   at 
> org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:177)
>   at 
> org.apache.hadoop.hbase.ipc.RpcClient$Connection.setupSaslConnection(RpcClient.java:815)
>   at 
> org.apache.hadoop.hbase.ipc.RpcClient$Connection.access$800(RpcClient.java:349)
> {quote}
> First starting a yarn-session and then loading my job gives the same error.
> My best guess at this point is that Flink needs the same fix as described 
> here:
> https://issues.apache.org/jira/browse/SPARK-6918   ( 
> https://github.com/apache/spark/pull/5586 )



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2977] Added support for accessing a Ker...

2015-11-10 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1342#issuecomment-155380888
  
Thanks for the pull request. Looks good to me except for one comment. I 
would like to postpone merging until you have verified the changes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2977] Added support for accessing a Ker...

2015-11-10 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1342#discussion_r44390519
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---
@@ -135,7 +142,40 @@ public static void setTokensFor(ContainerLaunchContext 
amContainer, Path[] paths
ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, 
dob.getLength());
amContainer.setTokens(securityTokens);
}
-   
+
+   /**
+* Obtain Kerberos security token for HBase.
+*/
+   private static void obtainTokenForHBase(Credentials credentials, 
Configuration conf) {
+   if (UserGroupInformation.isSecurityEnabled()) {
+   LOG.info("Attempting to obtain Kerberos security token 
for HBase");
+   try {
+   HBaseConfiguration.addHbaseResources(conf);
+   if 
(!"kerberos".equals(conf.get("hbase.security.authentication"))) {
+   LOG.info("HBase has not been configured 
to use Kerberos.");
+   return;
+   }
+
+   LOG.info("Connecting to HBase");
+   Connection connection = 
ConnectionFactory.createConnection(conf);
+
+   LOG.info("Obtaining Kerberos security token for 
HBase");
+   Token token = 
TokenUtil.obtainToken(connection);
+
+   if (token == null) {
+   LOG.error("No Kerberos security token 
for HBase available");
+   return;
+   }
+
+   credentials.addToken(token.getService(), token);
+   LOG.info("Added HBase Kerberos security token 
to credentials.");
+   } catch (IOException e) {
+   LOG.error("Caught exception while trying to 
obtain HBase Kerberos security token.");
+   e.printStackTrace();
--- End diff --

You might want to re`throw` the Exception here or not catch it at all. This 
ensures that it is properly forwarded to the client.
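
One possible shape of that change, as a sketch (wrapping in a RuntimeException is just 
one option; changing the method signature to declare the exception would be the other):

{code}
} catch (IOException e) {
    // Sketch: rethrow instead of only logging, so the failure surfaces
    // at the client instead of being silently swallowed here.
    throw new RuntimeException("Could not obtain Kerberos security token for HBase.", e);
}
{code}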


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2797) CLI: Missing option to submit jobs in detached mode

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998392#comment-14998392
 ] 

ASF GitHub Bot commented on FLINK-2797:
---

Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/1214#issuecomment-155385085
  
>>> You're probably right. It may just seem odd to the user that code gets 
executed even though the job won't be submitted. We might fix that in the 
future if users don't like it.

I think most people will be aware that Flink executes lazily, and just because the 
program runs right to the end, printing whatever it does, that doesn't guarantee a true 
submission. But there is no point debating this. We can adapt to whatever people prefer. 
This will likely go through a major change before 1.0 to support a truly detached mode anyway.

>>> Looks like your recent changes broke the YARN tests?

Yes. It's also kind of hard to debug them. I can't seem to run them on my VM, and the 
Travis build takes a full one and a half hours to produce anything useful; even then, I 
can't access the complete logs. It probably requires AWS keys or something. I'm not sure.
Do you know of any change I can make to the Travis config so that all modules are 
built but only the YARN tests get executed?

I have one more concern:
There are four types of Yarn runs we can perform:
1. Yarn session in detached mode: Equivalent to `bin/start-cluster.sh`
2. Yarn session: Doesn't have a counterpart in standalone mode
3. Per job yarn in detached mode: No counterpart in standalone mode, even 
after we add this PR.
4. Per job yarn: No counterpart in standalone mode.

Now, when `bin/flink -d` is run, users will expect a single semantics: a cluster will 
be brought up, the job will be submitted, and the cluster will be terminated. However, 
there doesn't seem to be any way of achieving this in standalone mode. Will this create 
any confusion?


> CLI: Missing option to submit jobs in detached mode
> ---
>
> Key: FLINK-2797
> URL: https://issues.apache.org/jira/browse/FLINK-2797
> Project: Flink
>  Issue Type: Bug
>  Components: Command-line client
>Affects Versions: 0.9, 0.10
>Reporter: Maximilian Michels
>Assignee: Sachin Goel
> Fix For: 0.10
>
>
> Jobs can only be submitted in detached mode using YARN but not on a 
> standalone installation. This has been requested by users who want to submit 
> a job, get the job id, and later query its status.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2797][cli] Add support for running jobs...

2015-11-10 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/1214#issuecomment-155385085
  
>>> You're probably right. It may just seem odd to the user that code gets 
executed even though the job won't be submitted. We might fix that in the 
future if users don't like it.

I think most people will be aware that Flink executes lazily, and just because the 
program runs right to the end, printing whatever it does, that doesn't guarantee a true 
submission. But there is no point debating this. We can adapt to whatever people prefer. 
This will likely go through a major change before 1.0 to support a truly detached mode anyway.

>>> Looks like your recent changes broke the YARN tests?

Yes. It's also kind of hard to debug them. I can't seem to run them on my VM, and the 
Travis build takes a full one and a half hours to produce anything useful; even then, I 
can't access the complete logs. It probably requires AWS keys or something. I'm not sure.
Do you know of any change I can make to the Travis config so that all modules are 
built but only the YARN tests get executed?

I have one more concern:
There are four types of Yarn runs we can perform:
1. Yarn session in detached mode: Equivalent to `bin/start-cluster.sh`
2. Yarn session: Doesn't have a counterpart in standalone mode
3. Per job yarn in detached mode: No counterpart in standalone mode, even 
after we add this PR.
4. Per job yarn: No counterpart in standalone mode.

Now, when `bin/flink -d` is run, users will expect a single semantics: a cluster will 
be brought up, the job will be submitted, and the cluster will be terminated. However, 
there doesn't seem to be any way of achieving this in standalone mode. Will this create 
any confusion?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2797][cli] Add support for running jobs...

2015-11-10 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1214#issuecomment-155373495
  
>This is a full-fledged execution of the user code, and should show any user-generated 
out and err messages, in my opinion.

You're probably right. It may just seem odd to the user that code gets 
executed even though the job won't be submitted. We might fix that in the 
future if users don't like it.

>Of course, anything Flink will print, say, log messages, will already be 
disabled as there is no actor to receive the messages.

There are still jobmanager/taskmanager log files.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2901) Several flink-test ITCases depend on Record API features

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998340#comment-14998340
 ] 

ASF GitHub Bot commented on FLINK-2901:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1306#discussion_r44388274
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/iterative/IterativeKMeansITCase.java
 ---
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast;
-import org.apache.flink.test.testdata.KMeansData;
-import org.apache.flink.test.util.RecordAPITestBase;
-
-
-public class IterativeKMeansITCase extends RecordAPITestBase {
--- End diff --

We should check if there is another test in the test suite that uses a 
broadcast variable in a bulk iteration. 
If not, we should implement such a test (not necessarily reimplementing the 
KMeans program).
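
In case such a test is missing, a minimal sketch of what it could look like with the 
DataSet API (class name, values, and the use of print() are illustrative only, not the 
actual test that would be added):

{code}
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.configuration.Configuration;

public class BroadcastBulkIterationSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Integer> bcData = env.fromElements(1, 2, 3);
        IterativeDataSet<Integer> iteration = env.fromElements(0).iterate(5);

        DataSet<Integer> step = iteration
            .map(new RichMapFunction<Integer, Integer>() {
                private int sum;

                @Override
                public void open(Configuration parameters) {
                    // read the broadcast variable inside the bulk iteration
                    for (int i : getRuntimeContext().<Integer>getBroadcastVariable("bc")) {
                        sum += i;
                    }
                }

                @Override
                public Integer map(Integer value) {
                    return value + sum;
                }
            })
            .withBroadcastSet(bcData, "bc");

        // after 5 iterations the result should be 5 * (1 + 2 + 3) = 30
        iteration.closeWith(step).print();
    }
}
{code}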


> Several flink-test ITCases depend on Record API features
> 
>
> Key: FLINK-2901
> URL: https://issues.apache.org/jira/browse/FLINK-2901
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 0.10
>Reporter: Fabian Hueske
>Assignee: Chesnay Schepler
>
> There are several ITCases and utility classes in {{flink-tests}} that depend 
> on the Record API including:
> - ITCases for Record API operators in 
> {{flink-tests/src/test/java/org/apache/flink/test/operators}}
> - ITCases for Record API programs in 
> {{flink-tests/src/test/java/org/apache/flink/test/recordJobTests}}
> - Record API programs in 
> {{flink-tests/src/test/java/org/apache/flink/test/recordJobs}}
> - Several ITCases for iterations in 
> {{flink-tests/src/test/java/org/apache/flink/test/iterative}}
> - Tests for job canceling in 
> {{flink-tests/src/test/java/org/apache/flink/test/cancelling}}
> - Test for failing jobs in 
> {{flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase}}
> - Optimizer tests in 
> {{flink-tests/src/test/java/org/apache/flink/test/optimizer}}
> - Accumulator test in 
> {{flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase}}
> - Broadcast test in 
> {{flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastBranchingITCase}}
> - distributed cache test in 
> {{flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest}}
> and probably a few more.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2901] Remove Record API dependencies fr...

2015-11-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1306#discussion_r44388274
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/iterative/IterativeKMeansITCase.java
 ---
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast;
-import org.apache.flink.test.testdata.KMeansData;
-import org.apache.flink.test.util.RecordAPITestBase;
-
-
-public class IterativeKMeansITCase extends RecordAPITestBase {
--- End diff --

We should check if there is another test in the test suite that uses a 
broadcast variable in a bulk iteration. 
If not, we should implement such a test (not necessarily reimplementing the 
KMeans program).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2977] Added support for accessing a Ker...

2015-11-10 Thread nielsbasjes
Github user nielsbasjes commented on the pull request:

https://github.com/apache/flink/pull/1342#issuecomment-155376887
  
I ran this version overnight, but the VPN from my system to the cluster stopped 
before the Kerberos ticket could expire.
I am quite confident that this patch is right though.
I'll start a new test on my end. With this pull request you guys can verify 
my patch and give me feedback on any improvements you see.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2977) Cannot access HBase in a Kerberos secured Yarn cluster

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998350#comment-14998350
 ] 

ASF GitHub Bot commented on FLINK-2977:
---

Github user nielsbasjes commented on the pull request:

https://github.com/apache/flink/pull/1342#issuecomment-155376887
  
I ran this version overnight, but the VPN from my system to the cluster stopped 
before the Kerberos ticket could expire.
I am quite confident that this patch is right though.
I'll start a new test on my end. With this pull request you guys can verify 
my patch and give me feedback on any improvements you see.


> Cannot access HBase in a Kerberos secured Yarn cluster
> --
>
> Key: FLINK-2977
> URL: https://issues.apache.org/jira/browse/FLINK-2977
> Project: Flink
>  Issue Type: Bug
>  Components: YARN Client
>Reporter: Niels Basjes
>Assignee: Niels Basjes
> Attachments: FLINK-2977-20151005-untested.patch, 
> FLINK-2977-20151009.patch
>
>
> I have created a very simple Flink topology consisting of a streaming Source 
> (that outputs the timestamp a few times per second) and a Sink (that puts that 
> timestamp into a single record in HBase).
> Running this on a non-secure Yarn cluster works fine.
> To run it on a secured Yarn cluster my main routine now looks like this:
> {code}
> public static void main(String[] args) throws Exception {
> System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
> UserGroupInformation.loginUserFromKeytab("nbas...@xx.net", 
> "/home/nbasjes/.krb/nbasjes.keytab");
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> DataStream stream = env.addSource(new TimerTicksSource());
> stream.addSink(new SetHBaseRowSink());
> env.execute("Long running Flink application");
> }
> {code}
> When I run this 
>  flink run -m yarn-cluster -yn 1 -yjm 1024 -ytm 4096 
> ./kerberos-1.0-SNAPSHOT.jar
> I see after the startup messages:
> {quote}
> 17:13:24,466 INFO  org.apache.hadoop.security.UserGroupInformation
>- Login successful for user nbas...@xx.net using keytab file 
> /home/nbasjes/.krb/nbasjes.keytab
> 11/03/2015 17:13:25   Job execution switched to status RUNNING.
> 11/03/2015 17:13:25   Custom Source -> Stream Sink(1/1) switched to SCHEDULED 
> 11/03/2015 17:13:25   Custom Source -> Stream Sink(1/1) switched to DEPLOYING 
> 11/03/2015 17:13:25   Custom Source -> Stream Sink(1/1) switched to RUNNING 
> {quote}
> Which looks good.
> However ... no data goes into HBase.
> After some digging I found this error in the task managers log:
> {quote}
> 17:13:42,677 WARN  org.apache.hadoop.hbase.ipc.RpcClient  
>- Exception encountered while connecting to the server : 
> javax.security.sasl.SaslException: GSS initiate failed [Caused by 
> GSSException: No valid credentials provided (Mechanism level: Failed to find 
> any Kerberos tgt)]
> 17:13:42,677 FATAL org.apache.hadoop.hbase.ipc.RpcClient  
>- SASL authentication failed. The most likely cause is missing or invalid 
> credentials. Consider 'kinit'.
> javax.security.sasl.SaslException: GSS initiate failed [Caused by 
> GSSException: No valid credentials provided (Mechanism level: Failed to find 
> any Kerberos tgt)]
>   at 
> com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
>   at 
> org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:177)
>   at 
> org.apache.hadoop.hbase.ipc.RpcClient$Connection.setupSaslConnection(RpcClient.java:815)
>   at 
> org.apache.hadoop.hbase.ipc.RpcClient$Connection.access$800(RpcClient.java:349)
> {quote}
> First starting a yarn-session and then loading my job gives the same error.
> My best guess at this point is that Flink needs the same fix as described 
> here:
> https://issues.apache.org/jira/browse/SPARK-6918   ( 
> https://github.com/apache/spark/pull/5586 )



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2797) CLI: Missing option to submit jobs in detached mode

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998376#comment-14998376
 ] 

ASF GitHub Bot commented on FLINK-2797:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1214#issuecomment-155381011
  
Looks like your recent changes broke the YARN tests?


> CLI: Missing option to submit jobs in detached mode
> ---
>
> Key: FLINK-2797
> URL: https://issues.apache.org/jira/browse/FLINK-2797
> Project: Flink
>  Issue Type: Bug
>  Components: Command-line client
>Affects Versions: 0.9, 0.10
>Reporter: Maximilian Michels
>Assignee: Sachin Goel
> Fix For: 0.10
>
>
> Jobs can only be submitted in detached mode using YARN but not on a 
> standalone installation. This has been requested by users who want to submit 
> a job, get the job id, and later query its status.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2797][cli] Add support for running jobs...

2015-11-10 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1214#issuecomment-155381011
  
Looks like your recent changes broke the YARN tests?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2901) Several flink-test ITCases depend on Record API features

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998320#comment-14998320
 ] 

ASF GitHub Bot commented on FLINK-2901:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1306#discussion_r44386994
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
 ---
@@ -18,91 +18,34 @@
 
 package org.apache.flink.test.iterative;
 
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.io.TextInputFormat;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-
-@SuppressWarnings("deprecation")
-public class IterationWithAllReducerITCase extends RecordAPITestBase {
-
-   private static final String INPUT = "1\n" + "1\n" + "1\n" + "1\n" + 
"1\n" + "1\n" + "1\n" + "1\n";
+import java.util.List;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class IterationWithAllReducerITCase extends JavaProgramTestBase {
private static final String EXPECTED = "1\n";
 
-   protected String dataPath;
-   protected String resultPath;
-
-   public IterationWithAllReducerITCase(){
-   setTaskManagerNumSlots(4);
-   }
-
@Override
-   protected void preSubmit() throws Exception {
-   dataPath = createTempFile("datapoints.txt", INPUT);
-   resultPath = getTempFilePath("result");
-   }
-   
-   @Override
-   protected void postSubmit() throws Exception {
-   compareResultsByLinesInMemory(EXPECTED, resultPath);
-   }
-
-   @Override
-   protected Plan getTestJob() {
-   Plan plan = getTestPlanPlan(parallelism, dataPath, resultPath);
-   return plan;
-   }
+   protected void testProgram() throws Exception {
+   ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(4);
 
-   
-   private static Plan getTestPlanPlan(int numSubTasks, String input, 
String output) {
+   DataSet<String> initialInput = env.fromElements("1", "1", "1", 
"1", "1", "1", "1", "1");
 
-   FileDataSource initialInput = new 
FileDataSource(TextInputFormat.class, input, "input");
-   
-   BulkIteration iteration = new BulkIteration("Loop");
-   iteration.setInput(initialInput);
-   iteration.setMaximumNumberOfIterations(5);
-   
-   Assert.assertTrue(iteration.getMaximumNumberOfIterations() > 1);
+   IterativeDataSet<String> iteration = 
initialInput.iterate(5).name("Loop");
 
-   ReduceOperator sumReduce = ReduceOperator.builder(new 
PickOneReducer())
-   .input(iteration.getPartialSolution())
-   .name("Compute sum (Reduce)")
-   .build();
-   
-   iteration.setNextPartialSolution(sumReduce);
+   DataSet<String> sumReduce = iteration.reduce(new 
ReduceFunction<String>(){
--- End diff --

A `GroupReduceFunction` would be closer to the original test.
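
For illustration, a minimal sketch of what such a variant might look like (hypothetical class name, not the code from this pull request; `PickOneReducer` refers to the reducer in the original Record API test):

{code}
// Hypothetical sketch only: the step function expressed with a GroupReduceFunction,
// which sees the whole partial solution at once, like the original PickOneReducer did.
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.util.Collector;

public class IterationWithAllGroupReduceSketch {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(4);

		DataSet<String> initialInput = env.fromElements("1", "1", "1", "1", "1", "1", "1", "1");
		IterativeDataSet<String> iteration = initialInput.iterate(5).name("Loop");

		DataSet<String> picked = iteration.reduceGroup(new GroupReduceFunction<String, String>() {
			@Override
			public void reduce(Iterable<String> values, Collector<String> out) {
				// emit only the first element, mimicking the original pick-one semantics
				out.collect(values.iterator().next());
			}
		}).name("Compute sum (Reduce)");

		iteration.closeWith(picked).print();
	}
}
{code}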


> Several flink-test ITCases depend on Record API features
> 
>
> Key: FLINK-2901
> URL: https://issues.apache.org/jira/browse/FLINK-2901
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 0.10
>Reporter: Fabian Hueske
>Assignee: Chesnay Schepler
>
> There are several ITCases and utility classes in {{flink-tests}} that depend 
> on the Record API including:
> - ITCases for Record API operators in 
> {{flink-tests/src/test/java/org/apache/flink/test/operators}}
> - ITCases for Record API programs in 
> {{flink-tests/src/test/java/org/apache/flink/test/recordJobTests}}
> - Record API programs 

[jira] [Commented] (FLINK-2797) CLI: Missing option to submit jobs in detached mode

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998327#comment-14998327
 ] 

ASF GitHub Bot commented on FLINK-2797:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1214#issuecomment-155373495
  
>This is a full-fledged execution of the user code, and should show any 
user-generated out and err messages in my opinion.

You're probably right. It may just seem odd to the user that code gets 
executed even though the job won't be submitted. We might fix that in the 
future if users don't like it.

>Of course, anything Flink will print, say, log messages, will already be 
disabled as there is no actor to receive the messages.

There are still jobmanager/taskmanager log files.


> CLI: Missing option to submit jobs in detached mode
> ---
>
> Key: FLINK-2797
> URL: https://issues.apache.org/jira/browse/FLINK-2797
> Project: Flink
>  Issue Type: Bug
>  Components: Command-line client
>Affects Versions: 0.9, 0.10
>Reporter: Maximilian Michels
>Assignee: Sachin Goel
> Fix For: 0.10
>
>
> Jobs can only be submitted in detached mode using YARN but not on a 
> standalone installation. This has been requested by users who want to submit 
> a job, get the job id, and later query its status.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2977] Added support for accessing a Ker...

2015-11-10 Thread nielsbasjes
GitHub user nielsbasjes opened a pull request:

https://github.com/apache/flink/pull/1342

[FLINK-2977] Added support for accessing a Kerberos secured HBase 
installation.

See https://issues.apache.org/jira/browse/FLINK-2977

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/nielsbasjes/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1342.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1342


commit 81c0180f74dd8711d6cf8a61ecfb4fb5c0d187d1
Author: Niels Basjes 
Date:   2015-11-10T10:04:00Z

[FLINK-2977] Added support for accessing a Kerberos secured HBase 
installation.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2977) Cannot access HBase in a Kerberos secured Yarn cluster

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998341#comment-14998341
 ] 

ASF GitHub Bot commented on FLINK-2977:
---

GitHub user nielsbasjes opened a pull request:

https://github.com/apache/flink/pull/1342

[FLINK-2977] Added support for accessing a Kerberos secured HBase 
installation.

See https://issues.apache.org/jira/browse/FLINK-2977

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/nielsbasjes/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1342.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1342


commit 81c0180f74dd8711d6cf8a61ecfb4fb5c0d187d1
Author: Niels Basjes 
Date:   2015-11-10T10:04:00Z

[FLINK-2977] Added support for accessing a Kerberos secured HBase 
installation.




> Cannot access HBase in a Kerberos secured Yarn cluster
> --
>
> Key: FLINK-2977
> URL: https://issues.apache.org/jira/browse/FLINK-2977
> Project: Flink
>  Issue Type: Bug
>  Components: YARN Client
>Reporter: Niels Basjes
>Assignee: Niels Basjes
> Attachments: FLINK-2977-20151005-untested.patch, 
> FLINK-2977-20151009.patch
>
>
> I have created a very simple Flink topology consisting of a streaming Source 
> (that outputs the timestamp a few times per second) and a Sink (that puts that 
> timestamp into a single record in HBase).
> Running this on a non-secure Yarn cluster works fine.
> To run it on a secured Yarn cluster my main routine now looks like this:
> {code}
> public static void main(String[] args) throws Exception {
> System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
> UserGroupInformation.loginUserFromKeytab("nbas...@xx.net", 
> "/home/nbasjes/.krb/nbasjes.keytab");
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> DataStream stream = env.addSource(new TimerTicksSource());
> stream.addSink(new SetHBaseRowSink());
> env.execute("Long running Flink application");
> }
> {code}
> When I run this 
>  flink run -m yarn-cluster -yn 1 -yjm 1024 -ytm 4096 
> ./kerberos-1.0-SNAPSHOT.jar
> I see after the startup messages:
> {quote}
> 17:13:24,466 INFO  org.apache.hadoop.security.UserGroupInformation
>- Login successful for user nbas...@xx.net using keytab file 
> /home/nbasjes/.krb/nbasjes.keytab
> 11/03/2015 17:13:25   Job execution switched to status RUNNING.
> 11/03/2015 17:13:25   Custom Source -> Stream Sink(1/1) switched to SCHEDULED 
> 11/03/2015 17:13:25   Custom Source -> Stream Sink(1/1) switched to DEPLOYING 
> 11/03/2015 17:13:25   Custom Source -> Stream Sink(1/1) switched to RUNNING 
> {quote}
> Which looks good.
> However ... no data goes into HBase.
> After some digging I found this error in the task managers log:
> {quote}
> 17:13:42,677 WARN  org.apache.hadoop.hbase.ipc.RpcClient  
>- Exception encountered while connecting to the server : 
> javax.security.sasl.SaslException: GSS initiate failed [Caused by 
> GSSException: No valid credentials provided (Mechanism level: Failed to find 
> any Kerberos tgt)]
> 17:13:42,677 FATAL org.apache.hadoop.hbase.ipc.RpcClient  
>- SASL authentication failed. The most likely cause is missing or invalid 
> credentials. Consider 'kinit'.
> javax.security.sasl.SaslException: GSS initiate failed [Caused by 
> GSSException: No valid credentials provided (Mechanism level: Failed to find 
> any Kerberos tgt)]
>   at 
> com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
>   at 
> org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:177)
>   at 
> org.apache.hadoop.hbase.ipc.RpcClient$Connection.setupSaslConnection(RpcClient.java:815)
>   at 
> org.apache.hadoop.hbase.ipc.RpcClient$Connection.access$800(RpcClient.java:349)
> {quote}
> First starting a yarn-session and then loading my job gives the same error.
> My best guess at this point is that Flink needs the same fix as described 
> here:
> https://issues.apache.org/jira/browse/SPARK-6918   ( 
> https://github.com/apache/spark/pull/5586 )



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2977) Cannot access HBase in a Kerberos secured Yarn cluster

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998371#comment-14998371
 ] 

ASF GitHub Bot commented on FLINK-2977:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1342#discussion_r44390519
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---
@@ -135,7 +142,40 @@ public static void setTokensFor(ContainerLaunchContext 
amContainer, Path[] paths
ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, 
dob.getLength());
amContainer.setTokens(securityTokens);
}
-   
+
+   /**
+* Obtain Kerberos security token for HBase.
+*/
+   private static void obtainTokenForHBase(Credentials credentials, 
Configuration conf) {
+   if (UserGroupInformation.isSecurityEnabled()) {
+   LOG.info("Attempting to obtain Kerberos security token 
for HBase");
+   try {
+   HBaseConfiguration.addHbaseResources(conf);
+   if 
(!"kerberos".equals(conf.get("hbase.security.authentication"))) {
+   LOG.info("HBase has not been configured 
to use Kerberos.");
+   return;
+   }
+
+   LOG.info("Connecting to HBase");
+   Connection connection = 
ConnectionFactory.createConnection(conf);
+
+   LOG.info("Obtaining Kerberos security token for 
HBase");
+   Token token = 
TokenUtil.obtainToken(connection);
+
+   if (token == null) {
+   LOG.error("No Kerberos security token 
for HBase available");
+   return;
+   }
+
+   credentials.addToken(token.getService(), token);
+   LOG.info("Added HBase Kerberos security token 
to credentials.");
+   } catch (IOException e) {
+   LOG.error("Caught exception while trying to 
obtain HBase Kerberos security token.");
+   e.printStackTrace();
--- End diff --

You might want to re`throw` the Exception here or not catch it at all. This 
ensures that it is properly forwarded to the client.


> Cannot access HBase in a Kerberos secured Yarn cluster
> --
>
> Key: FLINK-2977
> URL: https://issues.apache.org/jira/browse/FLINK-2977
> Project: Flink
>  Issue Type: Bug
>  Components: YARN Client
>Reporter: Niels Basjes
>Assignee: Niels Basjes
> Attachments: FLINK-2977-20151005-untested.patch, 
> FLINK-2977-20151009.patch
>
>
> I have created a very simple Flink topology consisting of a streaming Source 
> (that outputs the timestamp a few times per second) and a Sink (that puts that 
> timestamp into a single record in HBase).
> Running this on a non-secure Yarn cluster works fine.
> To run it on a secured Yarn cluster my main routine now looks like this:
> {code}
> public static void main(String[] args) throws Exception {
> System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
> UserGroupInformation.loginUserFromKeytab("nbas...@xx.net", 
> "/home/nbasjes/.krb/nbasjes.keytab");
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> DataStream stream = env.addSource(new TimerTicksSource());
> stream.addSink(new SetHBaseRowSink());
> env.execute("Long running Flink application");
> }
> {code}
> When I run this 
>  flink run -m yarn-cluster -yn 1 -yjm 1024 -ytm 4096 
> ./kerberos-1.0-SNAPSHOT.jar
> I see after the startup messages:
> {quote}
> 17:13:24,466 INFO  org.apache.hadoop.security.UserGroupInformation
>- Login successful for user nbas...@xx.net using keytab file 
> /home/nbasjes/.krb/nbasjes.keytab
> 11/03/2015 17:13:25   Job execution switched to status RUNNING.
> 11/03/2015 17:13:25   Custom Source -> Stream Sink(1/1) switched to SCHEDULED 
> 11/03/2015 17:13:25   Custom Source -> Stream Sink(1/1) switched to DEPLOYING 
> 11/03/2015 17:13:25   Custom Source -> Stream Sink(1/1) switched to RUNNING 
> {quote}
> Which looks good.
> However ... no data goes into HBase.
> After some digging I found this error in the task managers log:
> {quote}
> 17:13:42,677 WARN  org.apache.hadoop.hbase.ipc.RpcClient  
>- Exception encountered while connecting to the server : 
> javax.security.sasl.SaslException: GSS initiate failed [Caused by 
> 

[jira] [Created] (FLINK-2991) Extend Window Operators to Allow Efficient Fold Operation

2015-11-10 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-2991:
---

 Summary: Extend Window Operators to Allow Efficient Fold Operation
 Key: FLINK-2991
 URL: https://issues.apache.org/jira/browse/FLINK-2991
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Reporter: Aljoscha Krettek


Right now, a window fold is implemented as a WindowFunction that gets all the 
elements as input. No pre-aggregation is performed. The window operator should 
be extended to also allow the fold to be pre-aggregated.

This requires changing the signature of the {{WindowBuffer}} so that it can 
emit a type other than the input type. 
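
As a rough, plain-Java illustration (hypothetical names, not the actual WindowBuffer interface), pre-aggregating a fold means updating an accumulator per element instead of buffering all elements, which is exactly why the buffer has to be able to emit a type other than its input type:

{code}
// Illustration only; this is not Flink code. IN is the element type, ACC the
// accumulator type the window emits.
import java.util.function.BiFunction;

public class FoldingBufferSketch<IN, ACC> {

	private final BiFunction<ACC, IN, ACC> foldFunction;
	private ACC accumulator;

	public FoldingBufferSketch(ACC initialValue, BiFunction<ACC, IN, ACC> foldFunction) {
		this.accumulator = initialValue;
		this.foldFunction = foldFunction;
	}

	// Pre-aggregation: every element is folded into the accumulator immediately,
	// so the buffer never has to keep the individual elements around.
	public void storeElement(IN element) {
		accumulator = foldFunction.apply(accumulator, element);
	}

	// The emitted type is ACC rather than IN, which is the signature change described above.
	public ACC emitWindow() {
		return accumulator;
	}

	public static void main(String[] args) {
		FoldingBufferSketch<Integer, String> buffer =
			new FoldingBufferSketch<Integer, String>("", (acc, value) -> acc + value);
		buffer.storeElement(1);
		buffer.storeElement(2);
		buffer.storeElement(3);
		System.out.println(buffer.emitWindow()); // prints 123
	}
}
{code}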



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2901] Remove Record API dependencies fr...

2015-11-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1306#discussion_r44386043
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
 ---
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.io.TextInputFormat;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-
-@SuppressWarnings("deprecation")
-public class IterationTerminationWithTwoTails extends RecordAPITestBase {
--- End diff --

I think we should keep this test and port it.
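
For orientation, a rough sketch of how such a two-tailed bulk iteration could look when ported to the DataSet API (hypothetical names and values, not the actual port):

{code}
// Rough sketch only: a bulk iteration with two tails, the next partial solution
// and a separate termination criterion, via closeWith(result, terminationCriterion).
import java.util.List;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;

public class IterationTerminationSketch {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Integer> input = env.fromElements(1, 2, 3, 4, 5);
		IterativeDataSet<Integer> iteration = input.iterate(5).name("Loop");

		// First tail: the next partial solution (sum of the current elements).
		DataSet<Integer> summed = iteration.reduce(new ReduceFunction<Integer>() {
			@Override
			public Integer reduce(Integer a, Integer b) {
				return a + b;
			}
		}).name("Compute sum (Reduce)");

		// Second tail: the termination criterion. The iteration stops early once
		// this data set becomes empty; with these example values it simply runs
		// for the full five iterations.
		DataSet<Integer> termination = summed.filter(new FilterFunction<Integer>() {
			@Override
			public boolean filter(Integer value) {
				return value < 22;
			}
		});

		List<Integer> result = iteration.closeWith(summed, termination).collect();
		System.out.println(result);
	}
}
{code}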


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2901] Remove Record API dependencies fr...

2015-11-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1306#discussion_r44386994
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
 ---
@@ -18,91 +18,34 @@
 
 package org.apache.flink.test.iterative;
 
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.io.TextInputFormat;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-
-@SuppressWarnings("deprecation")
-public class IterationWithAllReducerITCase extends RecordAPITestBase {
-
-   private static final String INPUT = "1\n" + "1\n" + "1\n" + "1\n" + 
"1\n" + "1\n" + "1\n" + "1\n";
+import java.util.List;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class IterationWithAllReducerITCase extends JavaProgramTestBase {
private static final String EXPECTED = "1\n";
 
-   protected String dataPath;
-   protected String resultPath;
-
-   public IterationWithAllReducerITCase(){
-   setTaskManagerNumSlots(4);
-   }
-
@Override
-   protected void preSubmit() throws Exception {
-   dataPath = createTempFile("datapoints.txt", INPUT);
-   resultPath = getTempFilePath("result");
-   }
-   
-   @Override
-   protected void postSubmit() throws Exception {
-   compareResultsByLinesInMemory(EXPECTED, resultPath);
-   }
-
-   @Override
-   protected Plan getTestJob() {
-   Plan plan = getTestPlanPlan(parallelism, dataPath, resultPath);
-   return plan;
-   }
+   protected void testProgram() throws Exception {
+   ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(4);
 
-   
-   private static Plan getTestPlanPlan(int numSubTasks, String input, 
String output) {
+   DataSet<String> initialInput = env.fromElements("1", "1", "1", 
"1", "1", "1", "1", "1");
 
-   FileDataSource initialInput = new 
FileDataSource(TextInputFormat.class, input, "input");
-   
-   BulkIteration iteration = new BulkIteration("Loop");
-   iteration.setInput(initialInput);
-   iteration.setMaximumNumberOfIterations(5);
-   
-   Assert.assertTrue(iteration.getMaximumNumberOfIterations() > 1);
+   IterativeDataSet<String> iteration = 
initialInput.iterate(5).name("Loop");
 
-   ReduceOperator sumReduce = ReduceOperator.builder(new 
PickOneReducer())
-   .input(iteration.getPartialSolution())
-   .name("Compute sum (Reduce)")
-   .build();
-   
-   iteration.setNextPartialSolution(sumReduce);
+   DataSet<String> sumReduce = iteration.reduce(new 
ReduceFunction<String>(){
--- End diff --

A `GroupReduceFunction` would be closer to the original test.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2977) Cannot access HBase in a Kerberos secured Yarn cluster

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998382#comment-14998382
 ] 

ASF GitHub Bot commented on FLINK-2977:
---

Github user nielsbasjes commented on a diff in the pull request:

https://github.com/apache/flink/pull/1342#discussion_r44391146
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---
@@ -135,7 +142,40 @@ public static void setTokensFor(ContainerLaunchContext 
amContainer, Path[] paths
ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, 
dob.getLength());
amContainer.setTokens(securityTokens);
}
-   
+
+   /**
+* Obtain Kerberos security token for HBase.
+*/
+   private static void obtainTokenForHBase(Credentials credentials, 
Configuration conf) {
+   if (UserGroupInformation.isSecurityEnabled()) {
+   LOG.info("Attempting to obtain Kerberos security token 
for HBase");
+   try {
+   HBaseConfiguration.addHbaseResources(conf);
+   if 
(!"kerberos".equals(conf.get("hbase.security.authentication"))) {
+   LOG.info("HBase has not been configured 
to use Kerberos.");
+   return;
+   }
+
+   LOG.info("Connecting to HBase");
+   Connection connection = 
ConnectionFactory.createConnection(conf);
+
+   LOG.info("Obtaining Kerberos security token for 
HBase");
+   Token token = 
TokenUtil.obtainToken(connection);
+
+   if (token == null) {
+   LOG.error("No Kerberos security token 
for HBase available");
+   return;
+   }
+
+   credentials.addToken(token.getService(), token);
+   LOG.info("Added HBase Kerberos security token 
to credentials.");
+   } catch (IOException e) {
+   LOG.error("Caught exception while trying to 
obtain HBase Kerberos security token.");
+   e.printStackTrace();
--- End diff --

Yes I considered that. I figured that continuing would be 'better'. But 
simply rethrowing would make more sense.
I'll update the patch.
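
For what it's worth, a minimal sketch of the log-and-rethrow shape being discussed (hypothetical helper names, not the actual Utils.java change):

{code}
// Hypothetical sketch of the rethrow variant, not the final patch. fetchToken()
// is a made-up stand-in for the HBase calls so the example compiles on its own.
import java.io.IOException;

public class HBaseTokenFetchSketch {

	public static void obtainTokenForHBase() {
		try {
			fetchToken();
		} catch (IOException e) {
			// Rethrow instead of swallowing, so the failure reaches the client
			// instead of the job silently running without HBase credentials.
			throw new RuntimeException("Failed to obtain HBase Kerberos security token", e);
		}
	}

	private static void fetchToken() throws IOException {
		throw new IOException("simulated failure while talking to HBase");
	}

	public static void main(String[] args) {
		obtainTokenForHBase(); // fails fast with the wrapped cause
	}
}
{code}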


> Cannot access HBase in a Kerberos secured Yarn cluster
> --
>
> Key: FLINK-2977
> URL: https://issues.apache.org/jira/browse/FLINK-2977
> Project: Flink
>  Issue Type: Bug
>  Components: YARN Client
>Reporter: Niels Basjes
>Assignee: Niels Basjes
> Attachments: FLINK-2977-20151005-untested.patch, 
> FLINK-2977-20151009.patch
>
>
> I have created a very simple Flink topology consisting of a streaming Source 
> (that outputs the timestamp a few times per second) and a Sink (that puts that 
> timestamp into a single record in HBase).
> Running this on a non-secure Yarn cluster works fine.
> To run it on a secured Yarn cluster my main routine now looks like this:
> {code}
> public static void main(String[] args) throws Exception {
> System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
> UserGroupInformation.loginUserFromKeytab("nbas...@xx.net", 
> "/home/nbasjes/.krb/nbasjes.keytab");
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> DataStream stream = env.addSource(new TimerTicksSource());
> stream.addSink(new SetHBaseRowSink());
> env.execute("Long running Flink application");
> }
> {code}
> When I run this 
>  flink run -m yarn-cluster -yn 1 -yjm 1024 -ytm 4096 
> ./kerberos-1.0-SNAPSHOT.jar
> I see after the startup messages:
> {quote}
> 17:13:24,466 INFO  org.apache.hadoop.security.UserGroupInformation
>- Login successful for user nbas...@xx.net using keytab file 
> /home/nbasjes/.krb/nbasjes.keytab
> 11/03/2015 17:13:25   Job execution switched to status RUNNING.
> 11/03/2015 17:13:25   Custom Source -> Stream Sink(1/1) switched to SCHEDULED 
> 11/03/2015 17:13:25   Custom Source -> Stream Sink(1/1) switched to DEPLOYING 
> 11/03/2015 17:13:25   Custom Source -> Stream Sink(1/1) switched to RUNNING 
> {quote}
> Which looks good.
> However ... no data goes into HBase.
> After some digging I found this error in the task managers log:
> {quote}
> 17:13:42,677 WARN  org.apache.hadoop.hbase.ipc.RpcClient  
>- Exception encountered while connecting to the server : 
> javax.security.sasl.SaslException: GSS initiate failed [Caused 

[GitHub] flink pull request: [FLINK-2977] Added support for accessing a Ker...

2015-11-10 Thread nielsbasjes
Github user nielsbasjes commented on a diff in the pull request:

https://github.com/apache/flink/pull/1342#discussion_r44391146
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---
@@ -135,7 +142,40 @@ public static void setTokensFor(ContainerLaunchContext 
amContainer, Path[] paths
ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, 
dob.getLength());
amContainer.setTokens(securityTokens);
}
-   
+
+   /**
+* Obtain Kerberos security token for HBase.
+*/
+   private static void obtainTokenForHBase(Credentials credentials, 
Configuration conf) {
+   if (UserGroupInformation.isSecurityEnabled()) {
+   LOG.info("Attempting to obtain Kerberos security token 
for HBase");
+   try {
+   HBaseConfiguration.addHbaseResources(conf);
+   if 
(!"kerberos".equals(conf.get("hbase.security.authentication"))) {
+   LOG.info("HBase has not been configured 
to use Kerberos.");
+   return;
+   }
+
+   LOG.info("Connecting to HBase");
+   Connection connection = 
ConnectionFactory.createConnection(conf);
+
+   LOG.info("Obtaining Kerberos security token for 
HBase");
+   Token token = 
TokenUtil.obtainToken(connection);
+
+   if (token == null) {
+   LOG.error("No Kerberos security token 
for HBase available");
+   return;
+   }
+
+   credentials.addToken(token.getService(), token);
+   LOG.info("Added HBase Kerberos security token 
to credentials.");
+   } catch (IOException e) {
+   LOG.error("Caught exception while trying to 
obtain HBase Kerberos security token.");
+   e.printStackTrace();
--- End diff --

Yes I considered that. I figured that continuing would be 'better'. But 
simply rethrowing would make more sense.
I'll update the patch.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2942) Dangling operators in web UI's program visualization (non-deterministic)

2015-11-10 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998495#comment-14998495
 ] 

Fabian Hueske commented on FLINK-2942:
--

If I remember correctly, mirror nodes were used to connect nodes within an 
iteration with nodes that are outside of an iteration. In the old dashboard, 
an iteration was drawn as a regular node with another graph inside. The mirror 
nodes were used to illustrate the connection between the inside and the outside 
graph.
I searched the source code to verify this, but couldn't find the mirror nodes. 
Can you point me to the source file?

The program I posted does not include an iteration, so it should not be 
affected by the mirror nodes.

> Dangling operators in web UI's program visualization (non-deterministic)
> 
>
> Key: FLINK-2942
> URL: https://issues.apache.org/jira/browse/FLINK-2942
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 0.10
> Environment: OSX, Firefox and Chrome
>Reporter: Fabian Hueske
>Priority: Critical
> Fix For: 0.10, 1.0
>
> Attachments: Screen Shot 2015-10-29 at 17.11.19.png, Screen Shot 
> 2015-10-29 at 20.51.46.png, Screen Shot 2015-10-29 at 20.52.13.png, Screen 
> Shot 2015-11-09 at 14.48.03.png
>
>
> When visualizing a program with three {{MapPartition}} operators that branch 
> off from an {{OuterJoin}} operator, two of the three {{MapPartition}} 
> operators are not connected to the {{OuterJoin}} operator and appear to have 
> no input.
> The problem is present in Firefox as well as in Chrome. I'll attach a 
> screenshot.
> The problem can be reproduced by executing the "Cascading for the impatient" 
> [TFIDF example 
> program|https://github.com/Cascading/Impatient/tree/master/part5] using the 
> [Cascading Flink Connector|https://github.com/dataArtisans/cascading-flink].
> Update: It appears that the problem is non-deterministic. I ran the same job 
> again (same setup) and the previously missing connections were visualized. 
> However, the UI showed only one input for a binary operator (OuterJoin). 
> Running the job a third time resulted in a graph layout which was again 
> different from both runs before. However, two of the {{MapPartition}} 
> operators had no inputs, just as in the first run.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2955] Add operators description in Tabl...

2015-11-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1318#discussion_r44402293
  
--- Diff: docs/libs/table.md ---
@@ -123,13 +123,205 @@ DataSet<WC> result = tableEnv.toDataSet(filtered, 
WC.class);
 When using Java, the embedded DSL for specifying expressions cannot be 
used. Only String expressions
 are supported. They support exactly the same feature set as the expression 
DSL.
 
-## Expression Syntax
+## Table API Operators
+Table API provides a domain-specific language to execute language-integrated 
queries on structured data in Scala and Java.
+This section gives a brief overview of all available operators. You can 
find more details of operators in the 
[Javadoc](http://flink.apache.org/docs/latest/api/java/org/apache/flink/api/table/Table.html).
--- End diff --

If you use "{{ site.baseurl 
}}/api/java/org/apache/flink/api/table/Table.html", the correct version will be 
injected when the markdown file is compiled to html.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2955) Add operations introduction in Table API page.

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998506#comment-14998506
 ] 

ASF GitHub Bot commented on FLINK-2955:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1318#discussion_r44402293
  
--- Diff: docs/libs/table.md ---
@@ -123,13 +123,205 @@ DataSet<WC> result = tableEnv.toDataSet(filtered, 
WC.class);
 When using Java, the embedded DSL for specifying expressions cannot be 
used. Only String expressions
 are supported. They support exactly the same feature set as the expression 
DSL.
 
-## Expression Syntax
+## Table API Operators
+Table API provides a domain-specific language to execute language-integrated 
queries on structured data in Scala and Java.
+This section gives a brief overview of all available operators. You can 
find more details of operators in the 
[Javadoc](http://flink.apache.org/docs/latest/api/java/org/apache/flink/api/table/Table.html).
--- End diff --

If you use "{{ site.baseurl 
}}/api/java/org/apache/flink/api/table/Table.html", the correct version will be 
injected when the markdown file is compiled to html.


> Add operations introduction in Table API page.
> --
>
> Key: FLINK-2955
> URL: https://issues.apache.org/jira/browse/FLINK-2955
> Project: Flink
>  Issue Type: New Feature
>  Components: Documentation
>Reporter: Chengxiang Li
>Assignee: Chengxiang Li
>Priority: Minor
>
> On the Table API page, there is no formal introduction of current supported 
> operations, it should be nice to have it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2993) Set default delayBetweenExecutionRetries to 0

2015-11-10 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-2993:
---

 Summary: Set default delayBetweenExecutionRetries to 0
 Key: FLINK-2993
 URL: https://issues.apache.org/jira/browse/FLINK-2993
 Project: Flink
  Issue Type: Improvement
  Components: Core
Affects Versions: 0.10
Reporter: Stephan Ewen
 Fix For: 1.0


The default value is too high and gives a strange user experience.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2797) CLI: Missing option to submit jobs in detached mode

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998413#comment-14998413
 ] 

ASF GitHub Bot commented on FLINK-2797:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1214#issuecomment-155391848
  
>Do you know of any change I can make to travis conf so all modules are 
built but only Yarn tests get executed? 

Yes change the build instructions in .travis.yml to run `mvn clean install 
-DskipTests` followed by a `cd flink-yarn-tests && mvn verify`. Locally, you 
can find the logs in the target directory of flink-yarn-tests. There is also a 
way to upload these log files with an AWS account.

I think the different semantics for YARN and standalone are fine. The `-d` 
flag always starts the job detached. That's independent of YARN or standalone 
mode.


> CLI: Missing option to submit jobs in detached mode
> ---
>
> Key: FLINK-2797
> URL: https://issues.apache.org/jira/browse/FLINK-2797
> Project: Flink
>  Issue Type: Bug
>  Components: Command-line client
>Affects Versions: 0.9, 0.10
>Reporter: Maximilian Michels
>Assignee: Sachin Goel
> Fix For: 0.10
>
>
> Jobs can only be submitted in detached mode using YARN but not on a 
> standalone installation. This has been requested by users who want to submit 
> a job, get the job id, and later query its status.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2992) New Windowing code is using SerializationUtils with wrong classloader

2015-11-10 Thread Robert Metzger (JIRA)
Robert Metzger created FLINK-2992:
-

 Summary: New Windowing code is using SerializationUtils with wrong 
classloader
 Key: FLINK-2992
 URL: https://issues.apache.org/jira/browse/FLINK-2992
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.10
Reporter: Robert Metzger
Assignee: Robert Metzger
Priority: Critical


During release testing, I found the following issue

{code}
robert@hn0-flink0:~/flink010-26-211/flink-0.10.0$ ./bin/flink run 
../../scratch/target/flink0.10-scala2.11-1.0-SNAPSHOT.jar --input 
hdfs:///user/robert/file.txt --out hdfs:///user/robert/result
Found YARN properties file /tmp/.yarn-properties-robert
Using JobManager address from YARN properties 10.0.0.5/10.0.0.5:59812
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:512)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
at 
org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:675)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:977)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1027)
Caused by: org.apache.commons.lang.SerializationException: 
java.lang.ClassNotFoundException: com.dataartisans.Job$$anon$3$$anon$2
at 
org.apache.commons.lang.SerializationUtils.deserialize(SerializationUtils.java:166)
at 
org.apache.commons.lang.SerializationUtils.deserialize(SerializationUtils.java:193)
at 
org.apache.commons.lang.SerializationUtils.clone(SerializationUtils.java:81)
at 
org.apache.flink.streaming.api.datastream.WindowedStream.reduce(WindowedStream.java:172)
at 
org.apache.flink.streaming.api.scala.WindowedStream.aggregate(WindowedStream.scala:352)
at 
org.apache.flink.streaming.api.scala.WindowedStream.aggregate(WindowedStream.scala:332)
at 
org.apache.flink.streaming.api.scala.WindowedStream.sum(WindowedStream.scala:300)
at com.dataartisans.Job$.main(Job.scala:59)
at com.dataartisans.Job.main(Job.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
... 6 more
Caused by: java.lang.ClassNotFoundException: 
com.dataartisans.Job$$anon$3$$anon$2
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:278)
at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:625)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at 
org.apache.commons.lang.SerializationUtils.deserialize(SerializationUtils.java:163)
... 19 more

The exception above occurred while trying to run your command.

{code}

The problem is that we are using the 
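
For reference, a minimal plain-Java sketch of the general workaround, cloning via a classloader-aware ObjectInputStream (hypothetical class names, not the actual Flink change):

{code}
// Sketch of the general technique only: clone a serializable object while resolving
// classes against an explicitly supplied classloader (e.g. the user-code classloader),
// which commons-lang SerializationUtils does not allow and which leads to the
// ClassNotFoundException above when user classes are only visible to the user-code
// classloader.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public final class ClassLoaderAwareCloneSketch {

	public static <T extends Serializable> T cloneWithClassLoader(T object, ClassLoader classLoader)
			throws IOException, ClassNotFoundException {
		ByteArrayOutputStream buffer = new ByteArrayOutputStream();
		try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
			out.writeObject(object);
		}
		try (ObjectInputStream in = new ClassLoaderObjectInputStream(
				new ByteArrayInputStream(buffer.toByteArray()), classLoader)) {
			@SuppressWarnings("unchecked")
			T copy = (T) in.readObject();
			return copy;
		}
	}

	// ObjectInputStream that resolves classes through the given classloader first.
	private static final class ClassLoaderObjectInputStream extends ObjectInputStream {

		private final ClassLoader classLoader;

		ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
			super(in);
			this.classLoader = classLoader;
		}

		@Override
		protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
			try {
				return Class.forName(desc.getName(), false, classLoader);
			} catch (ClassNotFoundException e) {
				return super.resolveClass(desc);
			}
		}
	}

	public static void main(String[] args) throws Exception {
		java.util.ArrayList<String> original = new java.util.ArrayList<>(java.util.Arrays.asList("a", "b"));
		System.out.println(cloneWithClassLoader(original, Thread.currentThread().getContextClassLoader()));
	}
}
{code}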

[GitHub] flink pull request: [FLINK-2797][cli] Add support for running jobs...

2015-11-10 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1214#issuecomment-155391848
  
>Do you know of any change I can make to travis conf so all modules are 
built but only Yarn tests get executed? 

Yes change the build instructions in .travis.yml to run `mvn clean install 
-DskipTests` followed by a `cd flink-yarn-tests && mvn verify`. Locally, you 
can find the logs in the target directory of flink-yarn-tests. There is also a 
way to upload these log files with an AWS account.

I think the different semantics for YARN and standalone are fine. The `-d` 
flag always starts the job detached. That's independent of YARN or standalone 
mode.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2901] Remove Record API dependencies fr...

2015-11-10 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1306#issuecomment-155393090
  
Hi @zentol, thanks again for the huge effort to port all these tests!

Looks mostly good, except for some minor things and three tests which 
should be ported to preserve test coverage, IMO.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2901) Several flink-test ITCases depend on Record API features

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998422#comment-14998422
 ] 

ASF GitHub Bot commented on FLINK-2901:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1306#issuecomment-155393090
  
Hi @zentol, thanks again for the huge effort to port all these tests!

Looks mostly good, except for some minor things and three tests which 
should be ported to preserve test coverage, IMO.


> Several flink-test ITCases depend on Record API features
> 
>
> Key: FLINK-2901
> URL: https://issues.apache.org/jira/browse/FLINK-2901
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 0.10
>Reporter: Fabian Hueske
>Assignee: Chesnay Schepler
>
> There are several ITCases and utility classes in {{flink-tests}} that depend 
> on the Record API including:
> - ITCases for Record API operators in 
> {{flink-tests/src/test/java/org/apache/flink/test/operators}}
> - ITCases for Record API programs in 
> {{flink-tests/src/test/java/org/apache/flink/test/recordJobTests}}
> - Record API programs in 
> {{flink-tests/src/test/java/org/apache/flink/test/recordJobs}}
> - Several ITCases for iterations in 
> {{flink-tests/src/test/java/org/apache/flink/test/iterative}}
> - Tests for job canceling in 
> {{flink-tests/src/test/java/org/apache/flink/test/cancelling}}
> - Test for failing jobs in 
> {{flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase}}
> - Optimizer tests in 
> {{flink-tests/src/test/java/org/apache/flink/test/optimizer}}
> - Accumulator test in 
> {{flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase}}
> - Broadcast test in 
> {{flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastBranchingITCase}}
> - distributed cache test in 
> {{flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest}}
> and probably a few more.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2901) Several flink-test ITCases depend on Record API features

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998400#comment-14998400
 ] 

ASF GitHub Bot commented on FLINK-2901:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1306#discussion_r44392775
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
 ---
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.io.TextInputFormat;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-
-@SuppressWarnings("deprecation")
-public class IterationTerminationWithTwoTails extends RecordAPITestBase {
-
-   private static final String INPUT = "1\n" + "2\n" + "3\n" + "4\n" + 
"5\n";
-   private static final String EXPECTED = "22\n";
-
-   protected String dataPath;
-   protected String resultPath;
-
-   public IterationTerminationWithTwoTails(){
-   setTaskManagerNumSlots(parallelism);
-   }
-
-   @Override
-   protected void preSubmit() throws Exception {
-   dataPath = createTempFile("datapoints.txt", INPUT);
-   resultPath = getTempFilePath("result");
-   }
-   
-   @Override
-   protected void postSubmit() throws Exception {
-   compareResultsByLinesInMemory(EXPECTED, resultPath);
-   }
-
-   @Override
-   protected Plan getTestJob() {
-   return getTestPlanPlan(parallelism, dataPath, resultPath);
-   }
-   
-   private static Plan getTestPlanPlan(int numSubTasks, String input, 
String output) {
-
-   FileDataSource initialInput = new 
FileDataSource(TextInputFormat.class, input, "input");
-   
-   BulkIteration iteration = new BulkIteration("Loop");
-   iteration.setInput(initialInput);
-   iteration.setMaximumNumberOfIterations(5);
-   Assert.assertTrue(iteration.getMaximumNumberOfIterations() > 1);
-
-   ReduceOperator sumReduce = ReduceOperator.builder(new 
SumReducer())
-   .input(iteration.getPartialSolution())
-   .name("Compute sum (Reduce)")
-   .build();
-   
-   iteration.setNextPartialSolution(sumReduce);
-   
-   MapOperator terminationMapper = MapOperator.builder(new 
TerminationMapper())
-   .input(iteration.getPartialSolution())
--- End diff --

so THAT was the difference, will port it!


> Several flink-test ITCases depend on Record API features
> 
>
> Key: FLINK-2901
> URL: https://issues.apache.org/jira/browse/FLINK-2901
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 0.10
>Reporter: Fabian Hueske
>Assignee: Chesnay Schepler
>
> There are several ITCases and utility classes in {{flink-tests}} that depend 
> 

[GitHub] flink pull request: [FLINK-2901] Remove Record API dependencies fr...

2015-11-10 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1306#discussion_r44392775
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
 ---
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.io.TextInputFormat;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-
-@SuppressWarnings("deprecation")
-public class IterationTerminationWithTwoTails extends RecordAPITestBase {
-
-   private static final String INPUT = "1\n" + "2\n" + "3\n" + "4\n" + 
"5\n";
-   private static final String EXPECTED = "22\n";
-
-   protected String dataPath;
-   protected String resultPath;
-
-   public IterationTerminationWithTwoTails(){
-   setTaskManagerNumSlots(parallelism);
-   }
-
-   @Override
-   protected void preSubmit() throws Exception {
-   dataPath = createTempFile("datapoints.txt", INPUT);
-   resultPath = getTempFilePath("result");
-   }
-   
-   @Override
-   protected void postSubmit() throws Exception {
-   compareResultsByLinesInMemory(EXPECTED, resultPath);
-   }
-
-   @Override
-   protected Plan getTestJob() {
-   return getTestPlanPlan(parallelism, dataPath, resultPath);
-   }
-   
-   private static Plan getTestPlanPlan(int numSubTasks, String input, 
String output) {
-
-   FileDataSource initialInput = new 
FileDataSource(TextInputFormat.class, input, "input");
-   
-   BulkIteration iteration = new BulkIteration("Loop");
-   iteration.setInput(initialInput);
-   iteration.setMaximumNumberOfIterations(5);
-   Assert.assertTrue(iteration.getMaximumNumberOfIterations() > 1);
-
-   ReduceOperator sumReduce = ReduceOperator.builder(new 
SumReducer())
-   .input(iteration.getPartialSolution())
-   .name("Compute sum (Reduce)")
-   .build();
-   
-   iteration.setNextPartialSolution(sumReduce);
-   
-   MapOperator terminationMapper = MapOperator.builder(new 
TerminationMapper())
-   .input(iteration.getPartialSolution())
--- End diff --

so THAT was the difference, will port it!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2901) Several flink-test ITCases depend on Record API features

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998401#comment-14998401
 ] 

ASF GitHub Bot commented on FLINK-2901:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1306#discussion_r44393132
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/iterative/IterativeKMeansITCase.java
 ---
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast;
-import org.apache.flink.test.testdata.KMeansData;
-import org.apache.flink.test.util.RecordAPITestBase;
-
-
-public class IterativeKMeansITCase extends RecordAPITestBase {
--- End diff --

does the KMeansForTest qualify for this?


> Several flink-test ITCases depend on Record API features
> 
>
> Key: FLINK-2901
> URL: https://issues.apache.org/jira/browse/FLINK-2901
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 0.10
>Reporter: Fabian Hueske
>Assignee: Chesnay Schepler
>
> There are several ITCases and utility classes in {{flink-tests}} that depend 
> on the Record API including:
> - ITCases for Record API operators in 
> {{flink-tests/src/test/java/org/apache/flink/test/operators}}
> - ITCases for Record API programs in 
> {{flink-tests/src/test/java/org/apache/flink/test/recordJobTests}}
> - Record API programs in 
> {{flink-tests/src/test/java/org/apache/flink/test/recordJobs}}
> - Several ITCases for iterations in 
> {{flink-tests/src/test/java/org/apache/flink/test/iterative}}
> - Tests for job canceling in 
> {{flink-tests/src/test/java/org/apache/flink/test/cancelling}}
> - Test for failing jobs in 
> {{flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase}}
> - Optimizer tests in 
> {{flink-tests/src/test/java/org/apache/flink/test/optimizer}}
> - Accumulator test in 
> {{flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase}}
> - Broadcast test in 
> {{flink-tests/src/test/java/org/apache/flink/test/broadcastvasr/BroadcastBranchingITCase}}
> - distributed cache test in 
> {{flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest}}
> and probably a few more.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2901] Remove Record API dependencies fr...

2015-11-10 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1306#discussion_r44393132
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/iterative/IterativeKMeansITCase.java
 ---
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast;
-import org.apache.flink.test.testdata.KMeansData;
-import org.apache.flink.test.util.RecordAPITestBase;
-
-
-public class IterativeKMeansITCase extends RecordAPITestBase {
--- End diff --

does the KMeansForTest qualify for this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Remove and forbid use of SerializationUtils. F...

2015-11-10 Thread mbalassi
Github user mbalassi commented on a diff in the pull request:

https://github.com/apache/flink/pull/1343#discussion_r44397599
  
--- Diff: tools/maven/checkstyle.xml ---
@@ -54,11 +54,18 @@ under the License.



+   





+   
+   
+   
+   
+   
+   

--- End diff --

Big :+1: 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2992) New Windowing code is using SerializationUtils with wrong classloader

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998429#comment-14998429
 ] 

ASF GitHub Bot commented on FLINK-2992:
---

GitHub user rmetzger opened a pull request:

https://github.com/apache/flink/pull/1343

Remove and forbid use of SerializationUtils. Fix FLINK-2992

The SerializationUtils are usually not using the right classloader, and 
they have some security issues.
I'm using our checkstyle rules to forbid the use of them.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rmetzger/flink cancel_hotfix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1343.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1343


commit b6f00f96d4b38ee69e081ced189f5817c3f57fd4
Author: Robert Metzger 
Date:   2015-11-10T10:32:20Z

[hotfix] Check for null in StreamSource.cancel()

commit 7b5650bccd8959dbf1547742f610793aed7aeebe
Author: Robert Metzger 
Date:   2015-11-10T11:29:17Z

[FLINK-2992] Remove use of SerializationUtils




> New Windowing code is using SerializationUtils with wrong classloader
> -
>
> Key: FLINK-2992
> URL: https://issues.apache.org/jira/browse/FLINK-2992
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>Priority: Critical
>
> During release testing, I found the following issue
> {code}
> robert@hn0-flink0:~/flink010-26-211/flink-0.10.0$ ./bin/flink run 
> ../../scratch/target/flink0.10-scala2.11-1.0-SNAPSHOT.jar --input 
> hdfs:///user/robert/file.txt --out hdfs:///user/robert/result
> Found YARN properties file /tmp/.yarn-properties-robert
> Using JobManager address from YARN properties 10.0.0.5/10.0.0.5:59812
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error.
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:512)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
>   at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
>   at 
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:675)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:977)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1027)
> Caused by: org.apache.commons.lang.SerializationException: 
> java.lang.ClassNotFoundException: com.dataartisans.Job$$anon$3$$anon$2
>   at 
> org.apache.commons.lang.SerializationUtils.deserialize(SerializationUtils.java:166)
>   at 
> org.apache.commons.lang.SerializationUtils.deserialize(SerializationUtils.java:193)
>   at 
> org.apache.commons.lang.SerializationUtils.clone(SerializationUtils.java:81)
>   at 
> org.apache.flink.streaming.api.datastream.WindowedStream.reduce(WindowedStream.java:172)
>   at 
> org.apache.flink.streaming.api.scala.WindowedStream.aggregate(WindowedStream.scala:352)
>   at 
> org.apache.flink.streaming.api.scala.WindowedStream.aggregate(WindowedStream.scala:332)
>   at 
> org.apache.flink.streaming.api.scala.WindowedStream.sum(WindowedStream.scala:300)
>   at com.dataartisans.Job$.main(Job.scala:59)
>   at com.dataartisans.Job.main(Job.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
>   ... 6 more
> Caused by: java.lang.ClassNotFoundException: 
> com.dataartisans.Job$$anon$3$$anon$2
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:278)
>   at 

[GitHub] flink pull request: Remove and forbid use of SerializationUtils. F...

2015-11-10 Thread rmetzger
GitHub user rmetzger opened a pull request:

https://github.com/apache/flink/pull/1343

Remove and forbid use of SerializationUtils. Fix FLINK-2992

The SerializationUtils are usually not using the right classloader, and 
they have some security issues.
I'm using our checkstyle rules to forbid the use of them.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rmetzger/flink cancel_hotfix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1343.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1343


commit b6f00f96d4b38ee69e081ced189f5817c3f57fd4
Author: Robert Metzger 
Date:   2015-11-10T10:32:20Z

[hotfix] Check for null in StreamSource.cancel()

commit 7b5650bccd8959dbf1547742f610793aed7aeebe
Author: Robert Metzger 
Date:   2015-11-10T11:29:17Z

[FLINK-2992] Remove use of SerializationUtils




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2992) New Windowing code is using SerializationUtils with wrong classloader

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998456#comment-14998456
 ] 

ASF GitHub Bot commented on FLINK-2992:
---

Github user mbalassi commented on a diff in the pull request:

https://github.com/apache/flink/pull/1343#discussion_r44397599
  
--- Diff: tools/maven/checkstyle.xml ---
@@ -54,11 +54,18 @@ under the License.



+   





+   
+   
+   
+   
+   
+   

--- End diff --

Big :+1: 


> New Windowing code is using SerializationUtils with wrong classloader
> -
>
> Key: FLINK-2992
> URL: https://issues.apache.org/jira/browse/FLINK-2992
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>Priority: Critical
>
> During release testing, I found the following issue
> {code}
> robert@hn0-flink0:~/flink010-26-211/flink-0.10.0$ ./bin/flink run 
> ../../scratch/target/flink0.10-scala2.11-1.0-SNAPSHOT.jar --input 
> hdfs:///user/robert/file.txt --out hdfs:///user/robert/result
> Found YARN properties file /tmp/.yarn-properties-robert
> Using JobManager address from YARN properties 10.0.0.5/10.0.0.5:59812
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error.
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:512)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
>   at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
>   at 
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:675)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:977)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1027)
> Caused by: org.apache.commons.lang.SerializationException: 
> java.lang.ClassNotFoundException: com.dataartisans.Job$$anon$3$$anon$2
>   at 
> org.apache.commons.lang.SerializationUtils.deserialize(SerializationUtils.java:166)
>   at 
> org.apache.commons.lang.SerializationUtils.deserialize(SerializationUtils.java:193)
>   at 
> org.apache.commons.lang.SerializationUtils.clone(SerializationUtils.java:81)
>   at 
> org.apache.flink.streaming.api.datastream.WindowedStream.reduce(WindowedStream.java:172)
>   at 
> org.apache.flink.streaming.api.scala.WindowedStream.aggregate(WindowedStream.scala:352)
>   at 
> org.apache.flink.streaming.api.scala.WindowedStream.aggregate(WindowedStream.scala:332)
>   at 
> org.apache.flink.streaming.api.scala.WindowedStream.sum(WindowedStream.scala:300)
>   at com.dataartisans.Job$.main(Job.scala:59)
>   at com.dataartisans.Job.main(Job.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
>   ... 6 more
> Caused by: java.lang.ClassNotFoundException: 
> com.dataartisans.Job$$anon$3$$anon$2
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:278)
>   at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:625)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>   at 

[jira] [Created] (FLINK-2998) Support range partition comparison for multi input nodes.

2015-11-10 Thread Chengxiang Li (JIRA)
Chengxiang Li created FLINK-2998:


 Summary: Support range partition comparison for multi input nodes.
 Key: FLINK-2998
 URL: https://issues.apache.org/jira/browse/FLINK-2998
 Project: Flink
  Issue Type: New Feature
  Components: Optimizer
Reporter: Chengxiang Li
Priority: Minor


The optimizer may have potential opportunity to optimize the DAG while it found 
two input range partition are equivalent, we does not support the comparison 
yet.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: fix pos assignment of instance

2015-11-10 Thread fangmu
GitHub user fangmu opened a pull request:

https://github.com/apache/flink/pull/1345

fix pos assignment of instance



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fangmu/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1345.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1345


commit d1c56cdc2eb447be53498976eba9f86062987fc5
Author: qingmeng.wyh 
Date:   2015-11-11T07:40:19Z

fix pos assignment of instance




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-2997) Support range partition with user customized data distribution.

2015-11-10 Thread Chengxiang Li (JIRA)
Chengxiang Li created FLINK-2997:


 Summary: Support range partition with user customized data 
distribution.
 Key: FLINK-2997
 URL: https://issues.apache.org/jira/browse/FLINK-2997
 Project: Flink
  Issue Type: New Feature
Reporter: Chengxiang Li


This is a followup work of FLINK-7, sometime user have better knowledge of the 
source data, and they can build customized data distribution to do range 
partition more efficiently.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2963) Dependence on SerializationUtils#deserialize() should be avoided

2015-11-10 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998662#comment-14998662
 ] 

Chesnay Schepler commented on FLINK-2963:
-

resolved with FLINK-2992?

> Dependence on SerializationUtils#deserialize() should be avoided
> 
>
> Key: FLINK-2963
> URL: https://issues.apache.org/jira/browse/FLINK-2963
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> There is a problem with `SerializationUtils` from Apache Commons
> Lang. Here is an open issue where the class will throw a
> `ClassNotFoundException` even if the class is in the classpath in a
> multiple-classloader environment:
> https://issues.apache.org/jira/browse/LANG-1049
> {code}
>   state = (HashMap) 
> SerializationUtils.deserialize(bais);
> ./flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
>   state = (HashMap) 
> SerializationUtils.deserialize(bais);
> ./flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
>   return SerializationUtils.deserialize(message);
> ./flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
>   T copied = SerializationUtils.deserialize(SerializationUtils
> ./flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
> {code}
> We should move away from SerializationUtils.deserialize()



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2017) Add predefined required parameters to ParameterTool

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998676#comment-14998676
 ] 

ASF GitHub Bot commented on FLINK-2017:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1097#issuecomment-155437914
  
Hi @rmetzger, do you want to give it another look and check if it does what 
you intended with FLINK-2017?


> Add predefined required parameters to ParameterTool
> ---
>
> Key: FLINK-2017
> URL: https://issues.apache.org/jira/browse/FLINK-2017
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 0.9
>Reporter: Robert Metzger
>  Labels: starter
>
> In FLINK-1525 we've added the {{ParameterTool}}.
> During the PR review, there was a request for required parameters.
> This issue is about implementing a facility to define required parameters. 
> The tool should also be able to print a help menu with a list of all 
> parameters.
> This test case shows my initial ideas how to design the API
> {code}
>   @Test
>   public void requiredParameters() {
>   RequiredParameters required = new RequiredParameters();
>   Option input = required.add("input").alt("i").help("Path to 
> input file or directory"); // parameter with long and short variant
>   required.add("output"); // parameter only with long variant
>   Option parallelism = 
> required.add("parallelism").alt("p").type(Integer.class); // parameter with 
> type
>   Option spOption = 
> required.add("sourceParallelism").alt("sp").defaultValue(12).help("Number 
> specifying the number of parallel data source instances"); // parameter with 
> default value, specifying the type.
>   Option executionType = 
> required.add("executionType").alt("et").defaultValue("pipelined").choices("pipelined",
>  "batch");
>   ParameterUtil parameter = ParameterUtil.fromArgs(new 
> String[]{"-i", "someinput", "--output", "someout", "-p", "15"});
>   required.check(parameter);
>   required.printHelp();
>   required.checkAndPopulate(parameter);
>   String inputString = input.get();
>   int par = parallelism.getInteger();
>   String output = parameter.get("output");
>   int sourcePar = parameter.getInteger(spOption.getName());
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2017] Add predefined required parameter...

2015-11-10 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1097#issuecomment-155437914
  
Hi @rmetzger, do you want to give it another look and check if it does what 
you intended with FLINK-2017?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2662) CompilerException: "Bug: Plan generation for Unions picked a ship strategy between binary plan operators."

2015-11-10 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998682#comment-14998682
 ] 

Fabian Hueske commented on FLINK-2662:
--

Hi guys, did you make progress on this issue? 
Is it still reproducable?

> CompilerException: "Bug: Plan generation for Unions picked a ship strategy 
> between binary plan operators."
> --
>
> Key: FLINK-2662
> URL: https://issues.apache.org/jira/browse/FLINK-2662
> Project: Flink
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 0.9.1, 0.10
>Reporter: Gabor Gevay
> Fix For: 0.10
>
>
> I have a Flink program which throws the exception in the jira title. Full 
> text:
> Exception in thread "main" org.apache.flink.optimizer.CompilerException: Bug: 
> Plan generation for Unions picked a ship strategy between binary plan 
> operators.
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.collect(BinaryUnionReplacer.java:113)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:72)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:41)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>   at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>   at 
> org.apache.flink.optimizer.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:194)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:49)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:41)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:162)
>   at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at 
> org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:520)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402)
>   at 
> org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:202)
>   at 
> org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:63)
>   at malom.Solver.main(Solver.java:66)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
> The execution plan:
> http://compalg.inf.elte.hu/~ggevay/flink/plan_3_4_0_0_without_verif.txt
> (I obtained this by commenting out the line that throws the exception)
> The code is here:
> https://github.com/ggevay/flink/tree/plan-generation-bug
> The class to run is "Solver". It needs a command line argument, which is a 
> directory where it would write output. (On first run, it generates some 
> lookuptables for a few minutes, which are then placed to /tmp/movegen)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-7] [Runtime] Enable Range Partitioner.

2015-11-10 Thread ChengXiangLi
Github user ChengXiangLi commented on the pull request:

https://github.com/apache/flink/pull/1255#issuecomment-155403470
  
Thanks, @fhueske , I've updated the PR and verified in test environment 
that the deadlock issue has fixed on latest code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7) [GitHub] Enable Range Partitioner

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998465#comment-14998465
 ] 

ASF GitHub Bot commented on FLINK-7:


Github user ChengXiangLi commented on the pull request:

https://github.com/apache/flink/pull/1255#issuecomment-155403470
  
Thanks, @fhueske , I've updated the PR and verified in test environment 
that the deadlock issue has fixed on latest code.


> [GitHub] Enable Range Partitioner
> -
>
> Key: FLINK-7
> URL: https://issues.apache.org/jira/browse/FLINK-7
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Runtime
>Reporter: GitHub Import
>Assignee: Chengxiang Li
> Fix For: pre-apache
>
>
> The range partitioner is currently disabled. We need to implement the 
> following aspects:
> 1) Distribution information, if available, must be propagated back together 
> with the ordering property.
> 2) A generic bucket lookup structure (currently specific to PactRecord).
> Tests to re-enable after fixing this issue:
>  - TeraSortITCase
>  - GlobalSortingITCase
>  - GlobalSortingMixedOrderITCase
>  Imported from GitHub 
> Url: https://github.com/stratosphere/stratosphere/issues/7
> Created by: [StephanEwen|https://github.com/StephanEwen]
> Labels: core, enhancement, optimizer, 
> Milestone: Release 0.4
> Assignee: [fhueske|https://github.com/fhueske]
> Created at: Fri Apr 26 13:48:24 CEST 2013
> State: open



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2989) Job Cancel button doesn't work on Yarn

2015-11-10 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998474#comment-14998474
 ] 

Maximilian Michels commented on FLINK-2989:
---

Should we allow both, GET and DELETE? Via DELETE we would support Rest-style 
API calls if desired. For the cancel button we would use the GET interface to 
support YARN properly.

> Job Cancel button doesn't work on Yarn
> --
>
> Key: FLINK-2989
> URL: https://issues.apache.org/jira/browse/FLINK-2989
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Reporter: Sachin Goel
>
> The newly added Cancel button doesn't work on Yarn, when accessing via Yarn 
> Web Application proxy. It works fine when we're directly on the web monitor.
> The reason is Yarn doesn't allow DELETE requests to the AM yet. It should be 
> enabled in 2.8.0. [YARN-2031, YARN-2084.]
> A workaround for now can be to use a {{GET}} method instead of {{DELETE}}, 
> but that breaks the conventions of REST.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2989) Job Cancel button doesn't work on Yarn

2015-11-10 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998477#comment-14998477
 ] 

Fabian Hueske commented on FLINK-2989:
--

Sounds like a good idea to me

> Job Cancel button doesn't work on Yarn
> --
>
> Key: FLINK-2989
> URL: https://issues.apache.org/jira/browse/FLINK-2989
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Reporter: Sachin Goel
>
> The newly added Cancel button doesn't work on Yarn, when accessing via Yarn 
> Web Application proxy. It works fine when we're directly on the web monitor.
> The reason is Yarn doesn't allow DELETE requests to the AM yet. It should be 
> enabled in 2.8.0. [YARN-2031, YARN-2084.]
> A workaround for now can be to use a {{GET}} method instead of {{DELETE}}, 
> but that breaks the conventions of REST.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2989) Job Cancel button doesn't work on Yarn

2015-11-10 Thread Sachin Goel (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998485#comment-14998485
 ] 

Sachin Goel commented on FLINK-2989:


Good enough. Do we need this in the release?

> Job Cancel button doesn't work on Yarn
> --
>
> Key: FLINK-2989
> URL: https://issues.apache.org/jira/browse/FLINK-2989
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Reporter: Sachin Goel
>
> The newly added Cancel button doesn't work on Yarn, when accessing via Yarn 
> Web Application proxy. It works fine when we're directly on the web monitor.
> The reason is Yarn doesn't allow DELETE requests to the AM yet. It should be 
> enabled in 2.8.0. [YARN-2031, YARN-2084.]
> A workaround for now can be to use a {{GET}} method instead of {{DELETE}}, 
> but that breaks the conventions of REST.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2942) Dangling operators in web UI's program visualization (non-deterministic)

2015-11-10 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998497#comment-14998497
 ] 

Stephan Ewen commented on FLINK-2942:
-

Exactly, the "inside iteration" parts were in a nested DIV and no edges could 
be drawn across the DIV boundary, hence the mirror nodes.
Now, we have the flattened representation that needs no mirror nodes any more...

> Dangling operators in web UI's program visualization (non-deterministic)
> 
>
> Key: FLINK-2942
> URL: https://issues.apache.org/jira/browse/FLINK-2942
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 0.10
> Environment: OSX, Firefox and Chrome
>Reporter: Fabian Hueske
>Priority: Critical
> Fix For: 0.10, 1.0
>
> Attachments: Screen Shot 2015-10-29 at 17.11.19.png, Screen Shot 
> 2015-10-29 at 20.51.46.png, Screen Shot 2015-10-29 at 20.52.13.png, Screen 
> Shot 2015-11-09 at 14.48.03.png
>
>
> When visualizing a program with three {{MapPartition}} operators that branch 
> off from an {{OuterJoin}} operator, two of the three {{MapPartition}} 
> operators are not connected to the {{OuterJoin}} operator and appear to have 
> no input.
> The problem is present in FireFox as well as in Chrome. I'll attach a 
> screenshot.
> The problem and be reproduced by executing the "Cascading for the impatient" 
> [TFIDF example 
> program|https://github.com/Cascading/Impatient/tree/master/part5] using the 
> [Cascading Flink Connector|https://github.com/dataArtisans/cascading-flink].
> Update: It appears that the problem is non-deterministic. I ran the same job 
> again (same setup) and the previously missing connections were visualized. 
> However, the UI showed only one input for a binary operator (OuterJoin). 
> Running the job a third time resulted in a graph layout which was again 
> different from both runs before. However, two of the {{MapPartition}} 
> operators had not inputs just as in the first run.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2992) New Windowing code is using SerializationUtils with wrong classloader

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998508#comment-14998508
 ] 

ASF GitHub Bot commented on FLINK-2992:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1343#issuecomment-155412487
  
I tested the change on a cluster and it's working.


> New Windowing code is using SerializationUtils with wrong classloader
> -
>
> Key: FLINK-2992
> URL: https://issues.apache.org/jira/browse/FLINK-2992
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>Priority: Critical
>
> During release testing, I found the following issue
> {code}
> robert@hn0-flink0:~/flink010-26-211/flink-0.10.0$ ./bin/flink run 
> ../../scratch/target/flink0.10-scala2.11-1.0-SNAPSHOT.jar --input 
> hdfs:///user/robert/file.txt --out hdfs:///user/robert/result
> Found YARN properties file /tmp/.yarn-properties-robert
> Using JobManager address from YARN properties 10.0.0.5/10.0.0.5:59812
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error.
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:512)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
>   at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
>   at 
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:675)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:977)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1027)
> Caused by: org.apache.commons.lang.SerializationException: 
> java.lang.ClassNotFoundException: com.dataartisans.Job$$anon$3$$anon$2
>   at 
> org.apache.commons.lang.SerializationUtils.deserialize(SerializationUtils.java:166)
>   at 
> org.apache.commons.lang.SerializationUtils.deserialize(SerializationUtils.java:193)
>   at 
> org.apache.commons.lang.SerializationUtils.clone(SerializationUtils.java:81)
>   at 
> org.apache.flink.streaming.api.datastream.WindowedStream.reduce(WindowedStream.java:172)
>   at 
> org.apache.flink.streaming.api.scala.WindowedStream.aggregate(WindowedStream.scala:352)
>   at 
> org.apache.flink.streaming.api.scala.WindowedStream.aggregate(WindowedStream.scala:332)
>   at 
> org.apache.flink.streaming.api.scala.WindowedStream.sum(WindowedStream.scala:300)
>   at com.dataartisans.Job$.main(Job.scala:59)
>   at com.dataartisans.Job.main(Job.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
>   ... 6 more
> Caused by: java.lang.ClassNotFoundException: 
> com.dataartisans.Job$$anon$3$$anon$2
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:278)
>   at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:625)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>   at 
> 

[GitHub] flink pull request: Remove and forbid use of SerializationUtils. F...

2015-11-10 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1343#issuecomment-155412487
  
I tested the change on a cluster and it's working.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2989) Job Cancel button doesn't work on Yarn

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998616#comment-14998616
 ] 

ASF GitHub Bot commented on FLINK-2989:
---

GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/1344

[FLINK-2989] job cancel button doesn't work on YARN

In addition to the REST-compliant "DELETE /jobs/", allows
cancellation also via a special GET request of the form
"GET /jobs//yarn-cancel".

That enables us to cancel jobs from the web frontend on YARN while
keeping a REST-compliant DELETE alternative.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink yarn-cancel

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1344.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1344


commit 313aee9be2bbc18ce82469de5e75606083761016
Author: Maximilian Michels 
Date:   2015-11-10T12:35:58Z

[FLINK-2989] job cancel button doesn't work on YARN

In addition to the REST-compliant "DELETE /jobs/", allows
cancellation also via a special GET request of the form
"GET /jobs//yarn-cancel".

That enables us to cancel jobs from the web frontend on YARN while
keeping a REST-compliant DELETE alternative.




> Job Cancel button doesn't work on Yarn
> --
>
> Key: FLINK-2989
> URL: https://issues.apache.org/jira/browse/FLINK-2989
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Reporter: Sachin Goel
>
> The newly added Cancel button doesn't work on Yarn, when accessing via Yarn 
> Web Application proxy. It works fine when we're directly on the web monitor.
> The reason is Yarn doesn't allow DELETE requests to the AM yet. It should be 
> enabled in 2.8.0. [YARN-2031, YARN-2084.]
> A workaround for now can be to use a {{GET}} method instead of {{DELETE}}, 
> but that breaks the conventions of REST.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2989] job cancel button doesn't work on...

2015-11-10 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/1344

[FLINK-2989] job cancel button doesn't work on YARN

In addition to the REST-compliant "DELETE /jobs/", allows
cancellation also via a special GET request of the form
"GET /jobs//yarn-cancel".

That enables us to cancel jobs from the web frontend on YARN while
keeping a REST-compliant DELETE alternative.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink yarn-cancel

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1344.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1344


commit 313aee9be2bbc18ce82469de5e75606083761016
Author: Maximilian Michels 
Date:   2015-11-10T12:35:58Z

[FLINK-2989] job cancel button doesn't work on YARN

In addition to the REST-compliant "DELETE /jobs/", allows
cancellation also via a special GET request of the form
"GET /jobs//yarn-cancel".

That enables us to cancel jobs from the web frontend on YARN while
keeping a REST-compliant DELETE alternative.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-7] [Runtime] Enable Range Partitioner.

2015-11-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1255#discussion_r44408781
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
 ---
@@ -231,4 +268,44 @@ private int murmurHash(int k) {
throw new RuntimeException("Error while calling custom 
partitioner.", t);
}
}
+
+   private final int compareRecordAndBoundary(T record, Object[] boundary) 
{
+   TypeComparator[] flatComparators = 
this.comparator.getFlatComparators();
--- End diff --

can we cache the `flatComparators` and `keys` to avoid many object 
instantiations?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7) [GitHub] Enable Range Partitioner

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998618#comment-14998618
 ] 

ASF GitHub Bot commented on FLINK-7:


Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1255#discussion_r44408781
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
 ---
@@ -231,4 +268,44 @@ private int murmurHash(int k) {
throw new RuntimeException("Error while calling custom 
partitioner.", t);
}
}
+
+   private final int compareRecordAndBoundary(T record, Object[] boundary) 
{
+   TypeComparator[] flatComparators = 
this.comparator.getFlatComparators();
--- End diff --

can we cache the `flatComparators` and `keys` to avoid many object 
instantiations?


> [GitHub] Enable Range Partitioner
> -
>
> Key: FLINK-7
> URL: https://issues.apache.org/jira/browse/FLINK-7
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Runtime
>Reporter: GitHub Import
>Assignee: Chengxiang Li
> Fix For: pre-apache
>
>
> The range partitioner is currently disabled. We need to implement the 
> following aspects:
> 1) Distribution information, if available, must be propagated back together 
> with the ordering property.
> 2) A generic bucket lookup structure (currently specific to PactRecord).
> Tests to re-enable after fixing this issue:
>  - TeraSortITCase
>  - GlobalSortingITCase
>  - GlobalSortingMixedOrderITCase
>  Imported from GitHub 
> Url: https://github.com/stratosphere/stratosphere/issues/7
> Created by: [StephanEwen|https://github.com/StephanEwen]
> Labels: core, enhancement, optimizer, 
> Milestone: Release 0.4
> Assignee: [fhueske|https://github.com/fhueske]
> Created at: Fri Apr 26 13:48:24 CEST 2013
> State: open



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-7] [Runtime] Enable Range Partitioner.

2015-11-10 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1255#issuecomment-155431001
  
Thanks for the fast update @ChengXiangLi! I added a few comments, mostly 
regarding the shipping strategies, data exchange modes, and object 
instantiations. I think, we're almost there. After the next update, I'll play 
around with range partitioning on a cluster and see how it performs. We might 
want to add a few more tests, esp. to ensure the plan rewriting performs well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2966][web-dashboard] Improve the way jo...

2015-11-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1327


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7) [GitHub] Enable Range Partitioner

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998630#comment-14998630
 ] 

ASF GitHub Bot commented on FLINK-7:


Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1255#issuecomment-155431001
  
Thanks for the fast update @ChengXiangLi! I added a few comments, mostly 
regarding the shipping strategies, data exchange modes, and object 
instantiations. I think, we're almost there. After the next update, I'll play 
around with range partitioning on a cluster and see how it performs. We might 
want to add a few more tests, esp. to ensure the plan rewriting performs well.


> [GitHub] Enable Range Partitioner
> -
>
> Key: FLINK-7
> URL: https://issues.apache.org/jira/browse/FLINK-7
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Runtime
>Reporter: GitHub Import
>Assignee: Chengxiang Li
> Fix For: pre-apache
>
>
> The range partitioner is currently disabled. We need to implement the 
> following aspects:
> 1) Distribution information, if available, must be propagated back together 
> with the ordering property.
> 2) A generic bucket lookup structure (currently specific to PactRecord).
> Tests to re-enable after fixing this issue:
>  - TeraSortITCase
>  - GlobalSortingITCase
>  - GlobalSortingMixedOrderITCase
>  Imported from GitHub 
> Url: https://github.com/stratosphere/stratosphere/issues/7
> Created by: [StephanEwen|https://github.com/StephanEwen]
> Labels: core, enhancement, optimizer, 
> Milestone: Release 0.4
> Assignee: [fhueske|https://github.com/fhueske]
> Created at: Fri Apr 26 13:48:24 CEST 2013
> State: open



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-2966) Improve the way job duration is reported on web frontend.

2015-11-10 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels resolved FLINK-2966.
---
   Resolution: Fixed
Fix Version/s: 0.10.1
   1.0

On master: d7da7d4

> Improve the way job duration is reported on web frontend.
> -
>
> Key: FLINK-2966
> URL: https://issues.apache.org/jira/browse/FLINK-2966
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: Sachin Goel
>Assignee: Sachin Goel
>Priority: Minor
> Fix For: 1.0, 0.10.1
>
>
> Right now, job duration is always reported in milliseconds. For long running 
> jobs, this is not the best way.
> We should incorporate some kind of granularity in this. I propose this:
> 0-60 s : as x ms
> 60-3600 s: as x min y s
> 3600-86400 s: as x h y min [report as x h y min z s on hover]
> > 86400 s: as x d y h [report as x d y h z min t s on hover]
> I will start working on this, and we can change the granularity if someone 
> has a better idea.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2966) Improve the way job duration is reported on web frontend.

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998636#comment-14998636
 ] 

ASF GitHub Bot commented on FLINK-2966:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1327


> Improve the way job duration is reported on web frontend.
> -
>
> Key: FLINK-2966
> URL: https://issues.apache.org/jira/browse/FLINK-2966
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: Sachin Goel
>Assignee: Sachin Goel
>Priority: Minor
>
> Right now, job duration is always reported in milliseconds. For long running 
> jobs, this is not the best way.
> We should incorporate some kind of granularity in this. I propose this:
> 0-60 s : as x ms
> 60-3600 s: as x min y s
> 3600-86400 s: as x h y min [report as x h y min z s on hover]
> > 86400 s: as x d y h [report as x d y h z min t s on hover]
> I will start working on this, and we can change the granularity if someone 
> has a better idea.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2851] Move PythonAPI to flink-libraries

2015-11-10 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1257#issuecomment-155432214
  
Hi @zentol, this is another step towards the Maven module refactoring that 
we decided to do.
I think you can merge this PR to the master branch. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


  1   2   >