[jira] [Created] (FLINK-6276) InvalidTypesException: Unknown Error. Type is null.

2017-04-07 Thread Luke Hutchison (JIRA)
Luke Hutchison created FLINK-6276:
-

 Summary: InvalidTypesException: Unknown Error. Type is null.
 Key: FLINK-6276
 URL: https://issues.apache.org/jira/browse/FLINK-6276
 Project: Flink
  Issue Type: Bug
  Components: Core, DataSet API
Affects Versions: 1.2.0
Reporter: Luke Hutchison


Quite frequently when writing Flink code, I get the exception 
{{InvalidTypesException: Unknown Error. Type is null.}} 

A small example that triggers it is:

{code}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class TestMain {

    @SafeVarargs
    public static <K, V> DataSet<Tuple2<K, List<V>>> join(V missingValuePlaceholder,
            DataSet<Tuple2<K, V>>... datasets) {
        DataSet<Tuple2<K, List<V>>> join = null;
        for (int i = 0; i < datasets.length; i++) {
            final int datasetIdx = i;
            if (datasetIdx == 0) {
                join = datasets[datasetIdx] //
                        .map(t -> new Tuple2<>(t.f0, Arrays.asList(t.f1))) //
                        .name("start join");
            } else {
                join = join.coGroup(datasets[datasetIdx]) //
                        .where(0).equalTo(0) //
                        .with((Iterable<Tuple2<K, List<V>>> li, Iterable<Tuple2<K, V>> ri,
                                Collector<Tuple2<K, List<V>>> out) -> {
                            K key = null;
                            List<V> vals = new ArrayList<>(datasetIdx + 1);
                            Iterator<Tuple2<K, List<V>>> lIter = li.iterator();
                            if (!lIter.hasNext()) {
                                for (int j = 0; j < datasetIdx; j++) {
                                    vals.add(missingValuePlaceholder);
                                }
                            } else {
                                Tuple2<K, List<V>> lt = lIter.next();
                                key = lt.f0;
                                vals.addAll(lt.f1);
                                if (lIter.hasNext()) {
                                    throw new RuntimeException("Got non-unique key: " + key);
                                }
                            }
                            Iterator<Tuple2<K, V>> rIter = ri.iterator();
                            if (!rIter.hasNext()) {
                                vals.add(missingValuePlaceholder);
                            } else {
                                Tuple2<K, V> rt = rIter.next();
                                key = rt.f0;
                                vals.add(rt.f1);
                                if (rIter.hasNext()) {
                                    throw new RuntimeException("Got non-unique key: " + key);
                                }
                            }
                            out.collect(new Tuple2<>(key, vals));
                        }) //
                        .name("join #" + datasetIdx);
            }
        }
        return join;
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Integer>> x = //
                env.fromElements(new Tuple2<>("a", 3), new Tuple2<>("b", 4), new Tuple2<>("c", 5));
        DataSet<Tuple2<String, Integer>> y = //
                env.fromElements(new Tuple2<>("b", 0), new Tuple2<>("c", 1), new Tuple2<>("d", 2));
        DataSet<Tuple2<String, Integer>> z = //
                env.fromElements(new Tuple2<>("c", 7), new Tuple2<>("d", 8), new Tuple2<>("e", 9));

        System.out.println(join(-1, x, y, z).collect());
    }
}
{code}

The stacktrace that is triggered is:

{noformat}
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException:
The return type of function 'join(TestMain.java:23)' could not be determined
automatically, due to type erasure. You can give type information hints by using
the returns(...) method on the result of the transformation call, or by letting
your function implement the 'ResultTypeQueryable' interface.
	at org.apache.flink.api.java.DataSet.getType(DataSet.java:174)
	at org.apache.flink.api.java.operators.CoGroupOperator$CoGroupOperatorSets.where(CoGroupOperator.java:424)
	at com.rentlogic.buildingscores.flink.experimental.TestMain.join(TestMain.java:27)
	at com.rentlogic.buildingscores.flink.experimental.TestMain.main(TestMain.java:74)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Unknown Error. Type is null.
	at ...
{noformat}
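
As the message suggests, a {{returns(...)}} hint works when the element types are concrete; a minimal sketch (for the generic {{join()}} helper above, a {{TypeInformation}} would instead have to be passed in by the caller, since K and V cannot be captured in a {{TypeHint}} inside the method):

{code}
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class ReturnsHintExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Integer>> x =
                env.fromElements(new Tuple2<>("a", 3), new Tuple2<>("b", 4));
        // The lambda's return type is erased, so give the extractor an explicit hint:
        DataSet<Tuple2<String, List<Integer>>> mapped = x
                .map(t -> new Tuple2<>(t.f0, Arrays.asList(t.f1)))
                .returns(new TypeHint<Tuple2<String, List<Integer>>>() {});
        System.out.println(mapped.collect());
    }
}
{code}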

[jira] [Created] (FLINK-6185) Input readers and output writers/formats need to support gzip

2017-03-24 Thread Luke Hutchison (JIRA)
Luke Hutchison created FLINK-6185:
-

 Summary: Input readers and output writers/formats need to support 
gzip
 Key: FLINK-6185
 URL: https://issues.apache.org/jira/browse/FLINK-6185
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.2.0
Reporter: Luke Hutchison
Priority: Minor


File sources (such as {{env#readCsvFile()}}) and sinks (such as 
FileOutputFormat and its subclasses, and methods such as 
{{DataSet#writeAsText()}}) need the ability to transparently decompress and 
compress files. Primarily gzip would be useful, but it would be nice if this 
were pluggable to support bzip2, xz, etc.

There could be options for autodetect (based on file extension and/or file 
content), which could be the default, as well as no compression or a selected 
compression method.
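
In the meantime, a client-side workaround sketch (the path {{input.csv.gz}} is a placeholder; this decompresses on the client with plain {{GZIPInputStream}}, so it gives up parallel reading and only suits modest file sizes):

{code}
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class GzipReadWorkaround {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Decompress on the client and hand the lines to Flink:
        List<String> lines = new ArrayList<>();
        try (BufferedReader in = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(new FileInputStream("input.csv.gz")),
                StandardCharsets.UTF_8))) {
            for (String line; (line = in.readLine()) != null; ) {
                lines.add(line);
            }
        }
        DataSet<String> ds = env.fromCollection(lines);
        System.out.println(ds.count());
    }
}
{code}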



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6146) Incorrect function name given in exception thrown by DataSet.getType()

2017-03-21 Thread Luke Hutchison (JIRA)
Luke Hutchison created FLINK-6146:
-

 Summary: Incorrect function name given in exception thrown by 
DataSet.getType()
 Key: FLINK-6146
 URL: https://issues.apache.org/jira/browse/FLINK-6146
 Project: Flink
  Issue Type: Bug
  Components: DataSet API
Affects Versions: 1.2.0
Reporter: Luke Hutchison


In the following code, this exception is thrown at the line marked {{// (1)}}:

{noformat}
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException:
The return type of function 'convertToFractionalRank(MainTest.java:21)' could not
be determined automatically, due to type erasure. You can give type information
hints by using the returns(...) method on the result of the transformation call,
or by letting your function implement the 'ResultTypeQueryable' interface.
	at org.apache.flink.api.java.DataSet.getType(DataSet.java:174)
	at org.apache.flink.api.java.DataSet.distinct(DataSet.java:607)
	at com.rentlogic.buildingscores.flink.MainTest.convertToFractionalRank(MainTest.java:28)
	at com.rentlogic.buildingscores.flink.MainTest.main(MainTest.java:69)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Unknown Error. Type is null.
	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1134)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:409)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:349)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:164)
	at org.apache.flink.api.java.DataSet.map(DataSet.java:215)
	at com.rentlogic.buildingscores.flink.MainTest.convertToFractionalRank(MainTest.java:21)
	... 1 more
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Unknown Error. Type is null.
	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1161)
	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1234)
	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1131)
	... 6 more
{noformat}


{code}
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.util.Collector;

public class MainTest {

    public static <K> DataSet<Tuple2<K, Double>> convertToFractionalRank(
            DataSet<Tuple2<K, Integer>> key_score) {
        // Sum within each key. Result: ("", key, totScore)
        DataSet<Tuple3<String, K, Integer>> blank_key_totScore = key_score
                .groupBy(0).sum(1)
                // Prepend with "" to prep for join
                .map(t -> new Tuple3<>("", /* key = */ t.f0, /* sum = */ t.f1));

        // Count unique keys. Result: ("", numKeys)
        DataSet<Tuple2<String, Integer>> blank_numKeys = blank_key_totScore
                .distinct(0) // (1)
                .map(t -> new Tuple2<>("", 1))
                .groupBy(0).sum(1);

        // Sort scores into order, then return the fractional rank in the range [0, 1]
        return blank_key_totScore
                .coGroup(blank_numKeys)
                .where(0).equalTo(0)
                .with((Iterable<Tuple3<String, K, Integer>> ai,
                        Iterable<Tuple2<String, Integer>> bi,
                        Collector<Tuple4<String, K, Integer, Integer>> out) -> {
                    int numKeys = bi.iterator().next().f1;
                    for (Tuple3<String, K, Integer> a : ai) {
                        out.collect(new Tuple4<>("", /* key = */ a.f1, /* totScore = */ a.f2, numKeys));
                    }
                })
                // Group by "" (i.e. make into one group, so all the scores can be sorted together)
                .groupBy(0)
                // Sort in descending order of score (the highest score gets the lowest rank, and vice versa)
                .sortGroup(2, Order.DESCENDING)
                // Convert sorted rank from [0, numKeys-1] -> [0, 1]
                .reduceGroup(
                        (Iterable<Tuple4<String, K, Integer, Integer>> iter,
                                Collector<Tuple2<K, Double>> out) -> {
                            int rank = 0;
                            for (Tuple4<String, K, Integer, Integer> t : iter) {
                                int // ... (remainder truncated in the mail archive)
{code}

[jira] [Created] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field

2017-03-19 Thread Luke Hutchison (JIRA)
Luke Hutchison created FLINK-6115:
-

 Summary: Need more helpful error message when trying to serialize 
a tuple with a null field
 Key: FLINK-6115
 URL: https://issues.apache.org/jira/browse/FLINK-6115
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.2.0
Reporter: Luke Hutchison


When Flink tries to serialize a tuple with a null field, you get the following, 
which has no information about where in the program the problem occurred (all 
the stack trace lines are in Flink, not in user code).

{noformat}
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalArgumentException: The record must not be null.
	at org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:73)
	at org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:33)
	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
	at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
	at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:77)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:113)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
	at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
	at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79)
	at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
	at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51)
	at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:108)
	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
	at java.lang.Thread.run(Thread.java:745)
{noformat}

The only thing I can tell from this is that it happened somewhere in a flatMap 
(but I have dozens of them in my code). Surely there's a way to pull out the 
source file name and line number from the program DAG node when errors like 
this occur?
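
Until the error message improves, one way to localize the failure is to fail fast in user code right where the null is produced; a hedged sketch (names and types are illustrative, not from the original job):

{code}
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class NullFieldCheck {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Illustrative pipeline that produces a tuple with a null field:
        DataSet<Tuple2<String, String[]>> ds = env.fromElements("a", "b")
                .map(s -> new Tuple2<>(s, s.equals("b") ? null : new String[] { s }))
                .returns(new TypeHint<Tuple2<String, String[]>>() {});
        // Fail fast with a descriptive message at a known point in user code,
        // instead of deep inside StringArraySerializer:
        ds.map(t -> {
            if (t.f1 == null) {
                throw new IllegalStateException("null array field for key " + t.f0);
            }
            return t;
        }).returns(new TypeHint<Tuple2<String, String[]>>() {})
                .name("null check")
                .print();
    }
}
{code}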



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6114) Type checking fails with generics, even when concrete type of field is not needed

2017-03-19 Thread Luke Hutchison (JIRA)
Luke Hutchison created FLINK-6114:
-

 Summary: Type checking fails with generics, even when concrete 
type of field is not needed
 Key: FLINK-6114
 URL: https://issues.apache.org/jira/browse/FLINK-6114
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.2.0
Reporter: Luke Hutchison


The Flink type checker does not allow generic types to be used in any field of 
a tuple when a join is being executed, even if the generic is not in a field 
that is involved in the join.

I have a Tuple3 type with a generic type parameter K as one of its field types, 
and I am joining using {{.where(0).equalTo(0)}}. The type of field 0 is well-defined 
as String. However, this gives me the following error:

{noformat}
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException:
Type of TypeVariable 'K' in 'public static org.apache.flink.api.java.DataSet
mypkg.MyClass.method(params)' could not be determined. This is most likely a type
erasure problem. The type extraction currently supports types with generic
variables only in cases where all variables in the return type can be deduced
from the input type(s).
	at org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:989)
{noformat}

The code compiles fine, however -- the static type system is able to correctly 
resolve the types in the surrounding code.

Really only the fields that are affected by joins (or groupBy, aggregation 
etc.) should be checked for concrete types in this way.
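
As a workaround sketch, the caller can supply the type information that resolves K, so nothing inside the generic method depends on extracting K from a lambda ({{tagRows}} and its signature are hypothetical, for illustration only):

{code}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple3;

public class GenericTypeHintWorkaround {
    // Hypothetical helper: the caller passes TypeInformation that pins down K.
    public static <K> DataSet<Tuple3<String, K, Integer>> tagRows(
            DataSet<Tuple3<String, K, Integer>> rows,
            TypeInformation<Tuple3<String, K, Integer>> rowType) {
        return rows
                .map(t -> new Tuple3<>("", t.f1, t.f2))
                .returns(rowType); // explicit hint sidesteps the TypeVariable 'K' failure
    }
    // At a concrete call site, e.g. with K = Long:
    //   tagRows(rows, TypeInformation.of(new TypeHint<Tuple3<String, Long, Integer>>() {}));
}
{code}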



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6110) Flink unnecessarily repeats shared work triggered by different blocking sinks, leading to massive inefficiency

2017-03-18 Thread Luke Hutchison (JIRA)
Luke Hutchison created FLINK-6110:
-

 Summary: Flink unnecessarily repeats shared work triggered by 
different blocking sinks, leading to massive inefficiency
 Key: FLINK-6110
 URL: https://issues.apache.org/jira/browse/FLINK-6110
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.2.0
Reporter: Luke Hutchison


After a blocking sink (collect() or count()) is called, all already-computed 
intermediate DataSets are thrown away, and any subsequent code that tries to 
make use of an already-computed DataSet will require the DataSet to be computed 
from scratch. For example, the following code prints the elements a, b and c 
twice in succession, even though the DataSet ds should only have to be computed 
once:

{code}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> ds = env.fromElements("a", "b", "c")
        .map(s -> { System.out.println(s); return s + s; });
List<String> c1 = ds.map(s -> s).collect();
List<String> c2 = ds.map(s -> s).collect();
{code}

This is problematic because not every operation is easy to express in Flink 
using joins and filters -- sometimes for smaller datasets (such as marginal 
counts) it's easier to collect the values into a HashMap, and then pass that 
HashMap into subsequent operations so they can look up the values they need 
directly. A more complex example is the need to sort a set of values, then use 
the sorted array for subsequent binary search operations to identify rank -- 
this is a lot easier to do using an array of sorted values, as long as that 
array fits easily in RAM.

However, any collect() or count() operation causes immediate execution of the 
Flink pipeline, which throws away *all* intermediate values that could be 
reused for future executions. As a result, code can be extremely inefficient, 
recomputing the same values over and over again unnecessarily.

I believe that intermediate values should not be released or garbage collected 
until after env.execute() is called.
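
Until intermediate results survive across executions, the practical workaround is to trigger the shared part once and reuse the materialized result on the client; a sketch based on the example above:

{code}
import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class CollectOnce {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> ds = env.fromElements("a", "b", "c")
                .map(s -> { System.out.println(s); return s + s; });
        // One blocking sink instead of two: a, b and c are printed only once,
        // and the materialized list is reused on the client side.
        List<String> once = ds.collect();
        List<String> c1 = once;
        List<String> c2 = once;
        System.out.println(c1.equals(c2));
    }
}
{code}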



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6108) Add sum(fieldName) to sum based on named POJO fields

2017-03-18 Thread Luke Hutchison (JIRA)
Luke Hutchison created FLINK-6108:
-

 Summary: Add sum(fieldName) to sum based on named POJO fields
 Key: FLINK-6108
 URL: https://issues.apache.org/jira/browse/FLINK-6108
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.2.0
Reporter: Luke Hutchison


Currently it is possible to do

{code}
dataSet.groupBy(0).sum(1)
{code}

but not

{code}
dataSet.groupBy("id").sum("count")
{code}

-- groupBy() takes named fields, but sum() does not; it only works with tuple 
field indices. It would be great if sum() could take named fields too -- otherwise 
you have to map to tuples to make use of easy summing.

(This would probably only work reliably with POJOs, since a new object instance 
would need to be constructed to take the field containing the sum.)

Similarly, min, max etc. should also take named fields.
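
As a workaround sketch for POJOs in the meantime ({{Row}} is a hypothetical POJO), a {{reduce}} on the named grouping does the summation while keeping the POJO type:

{code}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class NamedFieldSum {
    public static class Row {
        public String id;
        public int count;
        public Row() {} // required POJO default constructor
        public Row(String id, int count) { this.id = id; this.count = count; }
        @Override public String toString() { return id + "=" + count; }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Row> rows = env.fromElements(
                new Row("a", 1), new Row("a", 2), new Row("b", 5));
        // groupBy already accepts the field name; reduce stands in for sum("count"):
        rows.groupBy("id")
                .reduce((a, b) -> new Row(a.id, a.count + b.count))
                .print(); // a=3, b=5
    }
}
{code}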



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6070) Suggestion: add ComparableTuple types

2017-03-16 Thread Luke Hutchison (JIRA)
Luke Hutchison created FLINK-6070:
-

 Summary: Suggestion: add ComparableTuple types
 Key: FLINK-6070
 URL: https://issues.apache.org/jira/browse/FLINK-6070
 Project: Flink
  Issue Type: Improvement
  Components: Core
Affects Versions: 1.2.0
Reporter: Luke Hutchison
Priority: Minor


Since Java doesn't have built-in tuple types, I find myself using Flink tuples 
for a lot of tasks in Flink programs. One downside is that these tuples are not 
inherently comparable, so when you want to sort a collection of tuples, you 
have to provide a custom comparator.

I created a ComparableTuple2 type, as follows. I wanted to get feedback on 
whether something like this would be considered useful for Flink before I 
submitted a PR. Also, I don't know how high I should go with the field arity 
for a ComparableTuple -- presumably not as high as for non-comparable tuples?

{code}
import org.apache.flink.api.java.tuple.Tuple2;

/** A comparable tuple, consisting of comparable fields that act as primary and secondary sort keys. */
public class ComparableTuple2<T0 extends Comparable<T0>, T1 extends Comparable<T1>> extends Tuple2<T0, T1>
        implements Comparable<Tuple2<T0, T1>> {
    private static final long serialVersionUID = 1L;

    private boolean invertSortOrder0;
    private boolean invertSortOrder1;

    public ComparableTuple2() {
    }

    /**
     * Create a 2-tuple of comparable elements.
     *
     * @param f0
     *            The first element, which is also the primary sort key, and sorts in ascending order.
     * @param f1
     *            The second element, which is also the secondary sort key, and sorts in ascending order.
     */
    public ComparableTuple2(T0 f0, T1 f1) {
        super(f0, f1);
    }

    /**
     * Create a comparable 2-tuple out of comparable elements.
     *
     * @param f0
     *            The first element, which is also the primary sort key, and sorts in ascending order if
     *            invertSortOrder0 == false, else sorts in descending order.
     * @param f1
     *            The second element, which is also the secondary sort key, and sorts in ascending order if
     *            invertSortOrder1 == false, else sorts in descending order.
     * @param invertSortOrder0
     *            If true, invert the sort order for the first field (i.e. sort in descending order).
     * @param invertSortOrder1
     *            If true, invert the sort order for the second field (i.e. sort in descending order).
     */
    public ComparableTuple2(final T0 f0, final T1 f1, final boolean invertSortOrder0,
            final boolean invertSortOrder1) {
        super(f0, f1);
        this.invertSortOrder0 = invertSortOrder0;
        this.invertSortOrder1 = invertSortOrder1;
    }

    /**
     * Comparison function that compares first the primary sort key, f0, and then, if equal, compares the
     * secondary sort key, f1.
     */
    @Override
    public int compareTo(final Tuple2<T0, T1> o) {
        int diff = this.f0.compareTo(o.f0);
        if (invertSortOrder0) {
            diff = -diff;
        }
        if (diff != 0) {
            return diff;
        }
        diff = this.f1.compareTo(o.f1);
        if (invertSortOrder1) {
            diff = -diff;
        }
        return diff;
    }
}
{code}
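
For context, a small usage sketch (assuming the class above is on the classpath; with the two-argument constructor both fields sort ascending):

{code}
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ComparableTuple2Demo {
    public static void main(String[] args) {
        List<ComparableTuple2<String, Integer>> list = new ArrayList<>();
        list.add(new ComparableTuple2<>("b", 2));
        list.add(new ComparableTuple2<>("a", 3));
        list.add(new ComparableTuple2<>("a", 1));
        Collections.sort(list); // no custom Comparator needed
        System.out.println(list); // [(a,1), (a,3), (b,2)]
    }
}
{code}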



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6057) Better default needed for num network buffers

2017-03-15 Thread Luke Hutchison (JIRA)
Luke Hutchison created FLINK-6057:
-

 Summary: Better default needed for num network buffers
 Key: FLINK-6057
 URL: https://issues.apache.org/jira/browse/FLINK-6057
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.2.0
Reporter: Luke Hutchison


Using the default environment,

{code}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
{code}

my code will sometimes fail with an error that Flink ran out of network 
buffers. To fix this, I have to do:

{code}
Configuration config = new Configuration();
int numTasks = Runtime.getRuntime().availableProcessors();
config.setInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, numTasks);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numTasks);
config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, numTasks * 2048);
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(config);
{code}

The default value of 2048 fails when I increase the degree of parallelism for a 
large Flink pipeline (hence the fix to set the number of buffers to numTasks * 
2048).

This is particularly problematic because a pipeline can work fine on one 
machine, and when you start the pipeline on a machine with more cores, it can 
fail.

The default execution environment should pick a saner default based on the 
level of parallelism (or whatever is needed to ensure that the number of 
network buffers is not going to be exceeded for a given execution environment).
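
For what it's worth, a saner default could be derived on the client from the sizing heuristic along the lines of the Flink configuration docs (roughly slots-per-TaskManager squared, times the number of TaskManagers, times 4); a sketch, with a single local TaskManager assumed:

{code}
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;

public class NetworkBufferDefaults {
    public static void main(String[] args) {
        int numSlots = Runtime.getRuntime().availableProcessors();
        int numTaskManagers = 1; // local environment: a single TaskManager assumed
        // Heuristic: buffers = slots-per-TM^2 * #TMs * 4
        int numBuffers = numSlots * numSlots * numTaskManagers * 4;

        Configuration config = new Configuration();
        config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
        config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, numBuffers);
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(config);
        env.setParallelism(numSlots);
    }
}
{code}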




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6035) The method name(String) is undefined for the type UnsortedGrouping

2017-03-14 Thread Luke Hutchison (JIRA)
Luke Hutchison created FLINK-6035:
-

 Summary: The method name(String) is undefined for the type 
UnsortedGrouping
 Key: FLINK-6035
 URL: https://issues.apache.org/jira/browse/FLINK-6035
 Project: Flink
  Issue Type: Bug
  Components: DataSet API
Affects Versions: 1.2.0
Reporter: Luke Hutchison
Priority: Trivial


If I call

{code}
dataSet.groupBy(0).name("group by key")
{code}

I get an error message like

{noformat}
The method name(String) is undefined for the type UnsortedGrouping<...>
{noformat}
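
A workaround is to name the operator that the grouping feeds, since the aggregation result does support {{name()}} (a sketch; {{sum()}} here stands in for whatever follows the grouping):

{code}
// UnsortedGrouping is not itself an operator, so it cannot carry a name;
// the AggregateOperator produced by sum() can:
dataSet.groupBy(0).sum(1).name("group by key, then sum");
{code}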




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6029) Strange linebreaks in web dashboard graph view make it hard to read text

2017-03-13 Thread Luke Hutchison (JIRA)
Luke Hutchison created FLINK-6029:
-

 Summary: Strange linebreaks in web dashboard graph view make it 
hard to read text
 Key: FLINK-6029
 URL: https://issues.apache.org/jira/browse/FLINK-6029
 Project: Flink
  Issue Type: Bug
  Components: Webfrontend
Affects Versions: 1.2.0
Reporter: Luke Hutchison
Priority: Minor


Text lines inside the boxes in the Flink web dashboard have linebreaks at very 
odd places, and it can make the content of the boxes hard to read. (See 
attached screenshot.)

For clarity the content could be tabulated and indented, e.g. with each "->" 
arrow at the start of a new text line along with the operation type ("-> 
Filter"), and the description indented on the line below, with long 
descriptions wrapped to the same start indentation, so that it's easy to 
visually separate out operations from descriptions based on indentation:

{noformat}
Filter
    (filter for problems with date <= 2017-01)
-> FlatMap
    (calculate problem severity scores for
     date 2017-01)
-> Combine
    (sum all severity scores for (bin, apt) for
     date 2017-01)
{noformat}




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6028) Unnamed operations in main method logged as "main(null:-1)"

2017-03-13 Thread Luke Hutchison (JIRA)
Luke Hutchison created FLINK-6028:
-

 Summary: Unnamed operations in main method logged as 
"main(null:-1)"
 Key: FLINK-6028
 URL: https://issues.apache.org/jira/browse/FLINK-6028
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.2.0
Reporter: Luke Hutchison
Priority: Trivial


If in main I have some code like this:

{code}
x.filter(t -> !t.f0.isEmpty())
.flatMap((t, out) -> out.collect(new Tuple3<>(t.f1, t.f2, t.f3)))
.writeAsText(filename, WriteMode.OVERWRITE).setParallelism(1);
{code}

In the log, the origin of these unnamed operations shows up as "main(null:-1)":

{noformat}
CHAIN Filter (Filter at main(null:-1)) -> FlatMap (FlatMap at 
main(null:-1))(2/2) switched to SCHEDULED 
{noformat}

However, operations inside lambdas seem to correctly provide the class name and 
line number in the logs, e.g. "Main.java:217".



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6026) Cannot name flatMap operations

2017-03-13 Thread Luke Hutchison (JIRA)
Luke Hutchison created FLINK-6026:
-

 Summary: Cannot name flatMap operations
 Key: FLINK-6026
 URL: https://issues.apache.org/jira/browse/FLINK-6026
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.2.0
Reporter: Luke Hutchison
Priority: Minor


I get an error if I try naming a flatMap operation:

{code}
DataSet<Tuple2<String, Integer>> y = x.flatMap((t, out) -> out.collect(t)).name("op");
{code}

{noformat}
Type mismatch: cannot convert from FlatMapOperator<Tuple2<String,Integer>,Object>
to DataSet<Tuple2<String,Integer>>
{noformat}

If I try to do it as two steps, I get the error that DataSet does not have a 
.name(String) method:

{code}
DataSet<Tuple2<String, Integer>> y = x.flatMap((t, out) -> out.collect(t));
y.name("op");
{code}

If I use Eclipse type inference on x, it shows me that the output type is not 
correctly inferred:

{code}
FlatMapOperator<Tuple2<String, Integer>, Object> y = x.flatMap((t, out) -> out.collect(t));
y.name("op");   // This now works, but "Object" is not the output type
{code}

However, these steps still cannot be chained -- the following still gives an 
error:

{code}
FlatMapOperator<Tuple2<String, Integer>, Object> y = x.flatMap((t, out) -> out.collect(t)).name("op");
{code}

i.e. first you have to assign the result to a local variable, so that the type 
is fully specified; then you can name the operation.

And the weird thing is that you can give the correct, more specific type for 
the local variable, without a type narrowing error:

{code}
FlatMapOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> y =
        x.flatMap((t, out) -> out.collect(t));
y.name("op");   // This works, although chaining these two lines still does not work
{code}

If the types of the lambda args are specified, then everything works:

{code}
DataSet<Tuple2<String, Integer>> y = x.flatMap((Tuple2<String, Integer> t,
        Collector<Tuple2<String, Integer>> out) -> out.collect(t)).name("op");
{code}

So, at least two things are going on here:

(1) type inference is not working correctly for the lambda parameters

(2) this breaks type inference for intermediate expressions, unless the type 
can be resolved using a local variable definition

Is this a bug in the type signature of flatMap? (Or a compiler bug or 
limitation, or a fundamental limitation of Java 8 type inference?)

It seems odd that the type of a local variable definition can make the result 
of the flatMap operator *more* specific, taking the type from 

{noformat}
FlatMapOperator<Tuple2<String, Integer>, Object>
{noformat}

to 

{noformat}
FlatMapOperator<Tuple2<String, Integer>, Tuple2<String, Integer>>
{noformat}

i.e. if the output type is provided in the local variable definition, it is 
properly unified with the type of the parameter t of collect(t), however that 
type is not propagated out of that call.

Can anything be done about this in Flink? I have hit this problem a few times.
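
For completeness, one more chainable form seems to work (a sketch, assuming {{x}} is a {{DataSet<Tuple2<String, Integer>>}}): an explicit type witness on {{flatMap}} pins the output type for the compiler, and a {{returns(...)}} hint covers runtime type extraction:

{code}
DataSet<Tuple2<String, Integer>> y = x
        .<Tuple2<String, Integer>>flatMap((t, out) -> out.collect(t))
        .returns(new TypeHint<Tuple2<String, Integer>>() {})
        .name("op");
{code}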




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6024) Need more fine-grained info for "InvalidProgramException: This type (...) cannot be used as key"

2017-03-10 Thread Luke Hutchison (JIRA)
Luke Hutchison created FLINK-6024:
-

 Summary: Need more fine-grained info for "InvalidProgramException: 
This type (...) cannot be used as key"
 Key: FLINK-6024
 URL: https://issues.apache.org/jira/browse/FLINK-6024
 Project: Flink
  Issue Type: Improvement
  Components: Core
Affects Versions: 1.2.0
Reporter: Luke Hutchison


I got this very confusing exception:

{noformat}
InvalidProgramException: This type (MyType) cannot be used as key
{noformat}

I dug through the code, and could not find what was causing this. The help text 
for type.isKeyType(), in Keys.java:329, right before the exception is thrown, 
says: "Checks whether this type can be used as a key. As a bare minimum, types 
have to be hashable and comparable to be keys." However, this didn't solve the 
problem.

I discovered that in my case, the error was occurring because I added a new 
constructor to the type, and I didn't have a default constructor. This is 
probably quite a common thing to happen for POJOs, so the error message should 
give some detail saying that this is the problem.

The other conditions that can cause this check to fail -- the class is not 
public, the constructor is not public, the key field is not public, the key 
field is not a serializable type, the key is not Comparable, or the key is not 
hashable -- should likewise be reported in the error message, depending on the 
actual cause of the problem.
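
For reference, a sketch of a POJO that passes the key checks under the rules listed above ({{MyKeyType}} is illustrative: public class, public no-arg constructor, public field, hashable and comparable):

{code}
public class MyKeyType implements Comparable<MyKeyType> {
    public String id;

    public MyKeyType() {} // the missing default constructor was the cause here

    public MyKeyType(String id) { this.id = id; }

    @Override public int compareTo(MyKeyType o) { return id.compareTo(o.id); }
    @Override public int hashCode() { return id.hashCode(); }
    @Override public boolean equals(Object o) {
        return o instanceof MyKeyType && id.equals(((MyKeyType) o).id);
    }
}
{code}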



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6019) Some log4j messages do not have a loglevel field set, so they can't be suppressed

2017-03-10 Thread Luke Hutchison (JIRA)
Luke Hutchison created FLINK-6019:
-

 Summary: Some log4j messages do not have a loglevel field set, so 
they can't be suppressed
 Key: FLINK-6019
 URL: https://issues.apache.org/jira/browse/FLINK-6019
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.2.0
 Environment: Linux
Reporter: Luke Hutchison


Some of the log messages do not appear to have a loglevel value set, so they 
can't be suppressed by setting the log4j level to WARN. There's this line at 
the beginning which doesn't even have a timestamp:

{noformat}
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1844933939]
{noformat}

And then there are numerous lines like this, missing an "INFO" field:

{noformat}
03/10/2017 00:01:14 Job execution switched to status RUNNING.
03/10/2017 00:01:14 DataSource (at readTable(DBTableReader.java:165) (org.apache.flink.api.java.io.PojoCsvInputFormat))(1/8) switched to SCHEDULED 
03/10/2017 00:01:14 DataSink (count())(1/8) switched to SCHEDULED 
03/10/2017 00:01:14 DataSink (count())(3/8) switched to DEPLOYING 
03/10/2017 00:01:15 DataSink (count())(3/8) switched to RUNNING 
03/10/2017 00:01:17 DataSink (count())(6/8) switched to FINISHED 
03/10/2017 00:01:17 DataSource (at readTable(DBTableReader.java:165) (org.apache.flink.api.java.io.PojoCsvInputFormat))(6/8) switched to FINISHED 
03/10/2017 00:01:17 Job execution switched to status FINISHED.
{noformat}




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6017) CSV reader does not support quoted double quotes

2017-03-09 Thread Luke Hutchison (JIRA)
Luke Hutchison created FLINK-6017:
-

 Summary: CSV reader does not support quoted double quotes
 Key: FLINK-6017
 URL: https://issues.apache.org/jira/browse/FLINK-6017
 Project: Flink
  Issue Type: Bug
  Components: Batch Connectors and Input/Output Formats
 Environment: Linux
Reporter: Luke Hutchison


The RFC for the CSV format specifies that double quotes are valid in quoted 
strings in CSV, by doubling the quote character:

https://tools.ietf.org/html/rfc4180

However, when parsing a CSV file with Flink containing quoted quotes, such as:

{noformat}
bob,"The name is ""Bob"""
{noformat}

you get this exception:

{noformat}
org.apache.flink.api.common.io.ParseException: Line could not be parsed: 'bob,"The name is ""Bob"""'
ParserError UNQUOTED_CHARS_AFTER_QUOTED_STRING 
Expect field types: class java.lang.String, class java.lang.String 
{noformat}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6016) Newlines should be valid in quoted strings in CSV

2017-03-09 Thread Luke Hutchison (JIRA)
Luke Hutchison created FLINK-6016:
-

 Summary: Newlines should be valid in quoted strings in CSV
 Key: FLINK-6016
 URL: https://issues.apache.org/jira/browse/FLINK-6016
 Project: Flink
  Issue Type: Bug
  Components: Batch Connectors and Input/Output Formats
Affects Versions: 1.2.0
Reporter: Luke Hutchison


The RFC for the CSV format specifies that newlines are valid in quoted strings 
in CSV:

https://tools.ietf.org/html/rfc4180

However, when parsing a CSV file with Flink containing a newline, such as:

{noformat}
"3
4",5
{noformat}

you get this exception:

{noformat}
Line could not be parsed: '"3'
ParserError UNTERMINATED_QUOTED_STRING 
Expect field types: class java.lang.String, class java.lang.String 
{noformat}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6015) "Row too short" when reading CSV line with empty last field (i.e. ending in comma)

2017-03-09 Thread Luke Hutchison (JIRA)
Luke Hutchison created FLINK-6015:
-

 Summary: "Row too short" when reading CSV line with empty last 
field (i.e. ending in comma)
 Key: FLINK-6015
 URL: https://issues.apache.org/jira/browse/FLINK-6015
 Project: Flink
  Issue Type: Bug
  Components: Batch Connectors and Input/Output Formats
Affects Versions: 1.2.0
 Environment: Linux
Reporter: Luke Hutchison


When using env.readCsvFile(filename), if a line in the CSV file has an empty 
last field, the line ends with a comma. This triggers an exception in 
GenericCsvInput.parseRecord():

{code}
// check valid start position
if (startPos >= limit) {
    if (lenient) {
        return false;
    } else {
        throw new ParseException("Row too short: " + new String(bytes, offset, numBytes));
    }
}
{code}

Setting the parser to lenient would cause the last field to be left as null, 
rather than setting its value to "".

The parser should accept empty values for the last field on a row.
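
Until then, a workaround sketch that preserves empty trailing fields ({{input.csv}} is a placeholder path; {{split}} with limit -1 keeps trailing empty strings, though it does not handle quoted fields):

{code}
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class TrailingCommaWorkaround {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, String>> rows = env.readTextFile("input.csv")
                .map(line -> {
                    String[] f = line.split(",", -1); // "a," -> ["a", ""]
                    return new Tuple2<>(f[0], f[1]);
                })
                .returns(new TypeHint<Tuple2<String, String>>() {});
        rows.print();
    }
}
{code}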



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)