[jira] [Created] (FLINK-6276) InvalidTypesException: Unknown Error. Type is null.
Luke Hutchison created FLINK-6276: - Summary: InvalidTypesException: Unknown Error. Type is null. Key: FLINK-6276 URL: https://issues.apache.org/jira/browse/FLINK-6276 Project: Flink Issue Type: Bug Components: Core, DataSet API Affects Versions: 1.2.0 Reporter: Luke Hutchison Quite frequently when writing Flink code, I get the exception {{InvalidTypesException: Unknown Error. Type is null.}} A small example that triggers it is:
{code}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class TestMain {
    @SafeVarargs
    public static <K, V> DataSet<Tuple2<K, List<V>>> join(V missingValuePlaceholder,
            DataSet<Tuple2<K, V>>... datasets) {
        DataSet<Tuple2<K, List<V>>> join = null;
        for (int i = 0; i < datasets.length; i++) {
            final int datasetIdx = i;
            if (datasetIdx == 0) {
                join = datasets[datasetIdx] //
                        .map(t -> new Tuple2<>(t.f0, Arrays.asList(t.f1))) //
                        .name("start join");
            } else {
                join = join.coGroup(datasets[datasetIdx]) //
                        .where(0).equalTo(0) //
                        .with((Iterable<Tuple2<K, List<V>>> li, Iterable<Tuple2<K, V>> ri,
                                Collector<Tuple2<K, List<V>>> out) -> {
                            K key = null;
                            List<V> vals = new ArrayList<>(datasetIdx + 1);
                            Iterator<Tuple2<K, List<V>>> lIter = li.iterator();
                            if (!lIter.hasNext()) {
                                for (int j = 0; j < datasetIdx; j++) {
                                    vals.add(missingValuePlaceholder);
                                }
                            } else {
                                Tuple2<K, List<V>> lt = lIter.next();
                                key = lt.f0;
                                vals.addAll(lt.f1);
                                if (lIter.hasNext()) {
                                    throw new RuntimeException("Got non-unique key: " + key);
                                }
                            }
                            Iterator<Tuple2<K, V>> rIter = ri.iterator();
                            if (!rIter.hasNext()) {
                                vals.add(missingValuePlaceholder);
                            } else {
                                Tuple2<K, V> rt = rIter.next();
                                key = rt.f0;
                                vals.add(rt.f1);
                                if (rIter.hasNext()) {
                                    throw new RuntimeException("Got non-unique key: " + key);
                                }
                            }
                            out.collect(new Tuple2<K, List<V>>(key, vals));
                        }) //
                        .name("join #" + datasetIdx);
            }
        }
        return join;
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Integer>> x = //
                env.fromElements(new Tuple2<>("a", 3), new Tuple2<>("b", 4), new Tuple2<>("c", 5));
        DataSet<Tuple2<String, Integer>> y = //
                env.fromElements(new Tuple2<>("b", 0), new Tuple2<>("c", 1), new Tuple2<>("d", 2));
        DataSet<Tuple2<String, Integer>> z = //
                env.fromElements(new Tuple2<>("c", 7), new Tuple2<>("d", 8), new Tuple2<>("e", 9));
        System.out.println(join(-1, x, y, z).collect());
    }
}
{code}
The resulting stack trace is:
{noformat}
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'join(TestMain.java:23)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
    at org.apache.flink.api.java.DataSet.getType(DataSet.java:174)
    at org.apache.flink.api.java.operators.CoGroupOperator$CoGroupOperatorSets.where(CoGroupOperator.java:424)
    at com.rentlogic.buildingscores.flink.experimental.TestMain.join(TestMain.java:27)
    at com.rentlogic.buildingscores.flink.experimental.TestMain.main(TestMain.java:74)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Unknown Error. Type is null.
    at
[jira] [Created] (FLINK-6185) Input readers and output writers/formats need to support gzip
Luke Hutchison created FLINK-6185: - Summary: Input readers and output writers/formats need to support gzip Key: FLINK-6185 URL: https://issues.apache.org/jira/browse/FLINK-6185 Project: Flink Issue Type: Bug Components: Core Affects Versions: 1.2.0 Reporter: Luke Hutchison Priority: Minor File sources (such as {{env#readCsvFile()}}) and sinks (such as FileOutputFormat and its subclasses, and methods such as {{DataSet#writeAsText()}}) need the ability to transparently decompress and compress files. Primarily gzip would be useful, but it would be nice if this were pluggable to support bzip2, xz, etc. There could be options for autodetect (based on file extension and/or file content), which could be the default, as well as no compression or a selected compression method. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
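The autodetection proposed here (by file extension and/or file content) can be sketched in plain Java with `java.util.zip.GZIPInputStream`. This is a minimal sketch, not Flink API: `CompressionDetector` and `maybeDecompress` are hypothetical names, and only gzip is handled. A stream is treated as gzip if the file name ends in `.gz`/`.gzip` or if it starts with the gzip magic bytes `0x1f 0x8b`.

```java
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Locale;
import java.util.zip.GZIPInputStream;

/** Hypothetical helper (not a Flink API) that transparently decompresses gzip input. */
final class CompressionDetector {
    // First two bytes of every gzip stream (RFC 1952).
    private static final int GZIP_MAGIC_0 = 0x1f;
    private static final int GZIP_MAGIC_1 = 0x8b;

    private CompressionDetector() {
    }

    /** Wraps {@code raw} in a GZIPInputStream if the file name or the content indicates gzip. */
    static InputStream maybeDecompress(String fileName, InputStream raw) throws IOException {
        // BufferedInputStream supports mark/reset, so the header can be peeked without consuming it.
        BufferedInputStream in = new BufferedInputStream(raw);
        String lower = fileName.toLowerCase(Locale.ROOT);
        if (lower.endsWith(".gz") || lower.endsWith(".gzip") || hasGzipMagic(in)) {
            return new GZIPInputStream(in);
        }
        return in;
    }

    private static boolean hasGzipMagic(BufferedInputStream in) throws IOException {
        in.mark(2);
        int b0 = in.read();
        int b1 = in.read();
        in.reset();
        return b0 == GZIP_MAGIC_0 && b1 == GZIP_MAGIC_1;
    }
}
```

Checking the extension first avoids peeking at the stream in the common case. The content check catches compressed files with misleading names. Other codecs (bzip2, xz) would need extra detection rules and, in plain Java, third-party decompressors.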
[jira] [Created] (FLINK-6146) Incorrect function name given in exception thrown by DataSet.getType()
Luke Hutchison created FLINK-6146: - Summary: Incorrect function name given in exception thrown by DataSet.getType() Key: FLINK-6146 URL: https://issues.apache.org/jira/browse/FLINK-6146 Project: Flink Issue Type: Bug Components: DataSet API Affects Versions: 1.2.0 Reporter: Luke Hutchison In the following code, this exception is thrown at the line marked {{// (1)}}:
{noformat}
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'convertToFractionalRank(MainTest.java:21)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
    at org.apache.flink.api.java.DataSet.getType(DataSet.java:174)
    at org.apache.flink.api.java.DataSet.distinct(DataSet.java:607)
    at com.rentlogic.buildingscores.flink.MainTest.convertToFractionalRank(MainTest.java:28)
    at com.rentlogic.buildingscores.flink.MainTest.main(MainTest.java:69)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Unknown Error. Type is null.
    at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1134)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:409)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:349)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:164)
    at org.apache.flink.api.java.DataSet.map(DataSet.java:215)
    at com.rentlogic.buildingscores.flink.MainTest.convertToFractionalRank(MainTest.java:21)
    ... 1 more
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Unknown Error. Type is null.
    at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1161)
    at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1234)
    at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1131)
    ... 6 more
{noformat}
{code}
import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.util.Collector; public class MainTest { public static DataSet> convertToFractionalRank(DataSet > key_score) { // Sum within each key // Result: ("", key, totScore) DataSet > blank_key_totScore = key_score .groupBy(0).sum(1) // Prepend with "" to prep for join .map(t -> new Tuple3<>("", /* key = */ t.f0, /* sum = */ t.f1)); // Count unique keys. Result: ("", numKeys) DataSet > blank_numKeys = blank_key_totScore .distinct(0) // (1) .map(t -> new Tuple2 ("", 1)) .groupBy(0).sum(1); // Sort scores into order, then return the fractional rank in the range [0, 1] return blank_key_totScore .coGroup(blank_numKeys) .where(0).equalTo(0) .with((Iterable > ai, Iterable > bi, Collector > out) -> { int numKeys = bi.iterator().next().f1; for (Tuple3 a : ai) { out.collect(new Tuple4<>("", /* key = */ a.f1, /* totScore = */ a.f2, numKeys)); } }) // Group by "" (i.e. make into one group, so all the scores can be sorted together) .groupBy(0) // Sort in descending order of score (the highest score gets the lowest rank, and vice versa) .sortGroup(2, Order.DESCENDING) // Convert sorted rank from [0, numKeys-1] -> [0, 1] .reduceGroup( (Iterable > iter, Collector > out) -> { int rank = 0; for (Tuple4 t : iter) { int
[jira] [Created] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
Luke Hutchison created FLINK-6115: - Summary: Need more helpful error message when trying to serialize a tuple with a null field Key: FLINK-6115 URL: https://issues.apache.org/jira/browse/FLINK-6115 Project: Flink Issue Type: Bug Components: Core Affects Versions: 1.2.0 Reporter: Luke Hutchison When Flink tries to serialize a tuple with a null field, you get the following, which has no information about where in the program the problem occurred (all the stack trace lines are in Flink, not in user code).
{noformat}
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalArgumentException: The record must not be null.
    at org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:73)
    at org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:33)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
    at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:77)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:113)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
    at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
    at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
    at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79)
    at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
    at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51)
    at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:108)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
    at java.lang.Thread.run(Thread.java:745)
{noformat}
The only thing I can tell from this is that it happened somewhere in a flatMap (but I have dozens of them in my code). Surely there's a way to pull out the source file name and line number from the program DAG node when errors like this occur?
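Until the error message identifies the operator, one user-side workaround is to check tuple fields just before emitting them and fail with a message that names the operator. The sketch below is plain Java and uses a hypothetical helper, `NullFieldCheck`. In a job, it would be called inside the user function right before `out.collect(...)`, with the same text passed to `.name(...)`.

```java
/** Hypothetical helper: fails fast with the operator name and field index when a tuple field is null. */
final class NullFieldCheck {
    private NullFieldCheck() {
    }

    /**
     * Returns {@code fields} unchanged if none of them is null; otherwise throws an exception naming
     * the operator and the offending field, using Flink's tuple field names (f0, f1, ...).
     */
    static Object[] requireNonNullFields(String operatorName, Object... fields) {
        for (int i = 0; i < fields.length; i++) {
            if (fields[i] == null) {
                throw new IllegalArgumentException(
                        "Operator '" + operatorName + "' emitted a tuple with a null field f" + i);
            }
        }
        return fields;
    }
}
```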
[jira] [Created] (FLINK-6114) Type checking fails with generics, even when concrete type of field is not needed
Luke Hutchison created FLINK-6114: - Summary: Type checking fails with generics, even when concrete type of field is not needed Key: FLINK-6114 URL: https://issues.apache.org/jira/browse/FLINK-6114 Project: Flink Issue Type: Bug Affects Versions: 1.2.0 Reporter: Luke Hutchison The Flink type checker does not allow generic types to be used in any field of a tuple when a join is being executed, even if the generic is not in a field that is involved in the join. I have a type Tuple3, which contains a generic type parameter K. I am joining using .where(0).equalTo(0). The type of field 0 is well-defined as String. However, this gives me the following error: {noformat} Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'K' in 'public static org.apache.flink.api.java.DataSet mypkg.MyClass.method(params)' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s). at org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:989) {noformat} The code compiles fine, however -- the static type system is able to correctly resolve the types in the surrounding code. Really only the fields that are affected by joins (or groupBy, aggregation etc.) should be checked for concrete types in this way.
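The message refers to `K` as a `TypeVariable` because of type erasure. A generic method's signature records only the variable `K`, not the type it was instantiated with at a particular call site. The plain-Java sketch below shows this with reflection. It also shows the anonymous-subclass ("super type token") trick, which does keep a concrete type argument available at runtime; this appears to be the idea behind passing a Flink `TypeHint` to `returns(...)`. `ErasureDemo` and `TypeToken` are illustrative names only.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Collections;
import java.util.List;

final class ErasureDemo {
    private ErasureDemo() {
    }

    /** A generic method: its signature only records the type variable K. */
    static <K> List<K> singleton(K key) {
        return Collections.singletonList(key);
    }

    /** Returns the declared type argument of singleton()'s return type, i.e. the variable K itself. */
    static Type declaredReturnTypeArgument() {
        try {
            // After erasure, the parameter type of singleton(K) is Object.
            Method m = ErasureDemo.class.getDeclaredMethod("singleton", Object.class);
            return ((ParameterizedType) m.getGenericReturnType()).getActualTypeArguments()[0];
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Super type token: an anonymous subclass stores its concrete type argument in its class file. */
    abstract static class TypeToken<T> {
        final Type type;

        protected TypeToken() {
            ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
            this.type = superType.getActualTypeArguments()[0];
        }
    }
}
```

For example, `new ErasureDemo.TypeToken<List<String>>() {}.type` describes `java.util.List<java.lang.String>`. By contrast, reflecting on `singleton` only ever yields the variable `K`.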
[jira] [Created] (FLINK-6110) Flink unnecessarily repeats shared work triggered by different blocking sinks, leading to massive inefficiency
Luke Hutchison created FLINK-6110: - Summary: Flink unnecessarily repeats shared work triggered by different blocking sinks, leading to massive inefficiency Key: FLINK-6110 URL: https://issues.apache.org/jira/browse/FLINK-6110 Project: Flink Issue Type: Bug Components: Core Affects Versions: 1.2.0 Reporter: Luke Hutchison After a blocking sink (collect() or count()) is called, all already-computed intermediate DataSets are thrown away, and any subsequent code that tries to make use of an already-computed DataSet will require the DataSet to be computed from scratch. For example, the following code prints the elements a, b and c twice in succession, even though the DataSet ds should only have to be computed once:
{code}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> ds = env.fromElements("a", "b", "c").map(s -> {
    System.out.println(s);
    return s + s;
});
List<String> c1 = ds.map(s -> s).collect();
List<String> c2 = ds.map(s -> s).collect();
{code}
This is problematic because not every operation is easy to express in Flink using joins and filters. Sometimes, for smaller datasets (such as marginal counts), it's easier to collect the values into a HashMap, and then pass that HashMap into subsequent operations so they can look up the values they need directly. A more complex example is the need to sort a set of values, then use the sorted array for subsequent binary search operations to identify rank. This is a lot easier to do using an array of sorted values, as long as that array fits easily in RAM. However, any collect() or count() operation causes immediate execution of the Flink pipeline, which throws away *all* intermediate values that could be reused for future executions. As a result, code can be extremely inefficient, recomputing the same values over and over again unnecessarily. I believe that intermediate values should not be released or garbage collected until after env.execute() is called.
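The reuse requested here amounts to memoizing an intermediate result, so that later consumers share a single computation. The plain-Java sketch below illustrates only that idea. It is not how Flink schedules jobs, and `Memoized` is a hypothetical name.

```java
import java.util.function.Supplier;

/** Hypothetical helper: runs an expensive computation at most once and caches the result. */
final class Memoized<T> implements Supplier<T> {
    private final Supplier<T> computation;
    private T value;
    private boolean computed;

    Memoized(Supplier<T> computation) {
        this.computation = computation;
    }

    /** Computes the value on first use; every later call returns the cached value. */
    @Override
    public synchronized T get() {
        if (!computed) {
            value = computation.get();
            computed = true;
        }
        return value;
    }
}
```

A separate `computed` flag is used instead of a null check, so a computation that legitimately returns `null` is still run only once.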
[jira] [Created] (FLINK-6108) Add sum(fieldName) to sum based on named POJO fields
Luke Hutchison created FLINK-6108: - Summary: Add sum(fieldName) to sum based on named POJO fields Key: FLINK-6108 URL: https://issues.apache.org/jira/browse/FLINK-6108 Project: Flink Issue Type: Bug Components: Core Affects Versions: 1.2.0 Reporter: Luke Hutchison Currently it is possible to do
{code}
dataSet.groupBy(0).sum(1)
{code}
but not
{code}
dataSet.groupBy("id").sum("count")
{code}
groupBy() accepts field names, but sum() does not; it only works with tuple field positions. It would be great if sum() could take named fields too. Otherwise you have to map to tuples to make use of easy summing. (This would probably only work reliably with POJOs, since a new object instance would need to be constructed to hold the field containing the sum.) Similarly, min(), max(), etc. should also take named fields.
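The requested `groupBy("id").sum("count")` behaviour can be approximated in plain Java by resolving the field names reflectively. `NamedFieldSum.groupBySum` is a hypothetical helper, not a Flink API. It assumes public POJO fields and accumulates a `long` per key instead of constructing new POJO instances.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: groups POJOs by one public field and sums another numeric public field. */
final class NamedFieldSum {
    private NamedFieldSum() {
    }

    static Map<Object, Long> groupBySum(List<?> pojos, String keyField, String sumField) {
        // LinkedHashMap keeps keys in first-seen order, so the output order is deterministic.
        Map<Object, Long> sums = new LinkedHashMap<>();
        for (Object pojo : pojos) {
            try {
                Class<?> cls = pojo.getClass();
                Object key = cls.getField(keyField).get(pojo);
                Number value = (Number) cls.getField(sumField).get(pojo);
                sums.merge(key, value.longValue(), Long::sum);
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException("Cannot read fields of " + pojo.getClass().getName(), e);
            }
        }
        return sums;
    }
}
```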
[jira] [Created] (FLINK-6070) Suggestion: add ComparableTuple types
Luke Hutchison created FLINK-6070: - Summary: Suggestion: add ComparableTuple types Key: FLINK-6070 URL: https://issues.apache.org/jira/browse/FLINK-6070 Project: Flink Issue Type: Improvement Components: Core Affects Versions: 1.2.0 Reporter: Luke Hutchison Priority: Minor Since Java doesn't have built-in tuple types, I find myself using Flink tuples for a lot of tasks in Flink programs. One downside is that these tuples are not inherently comparable, so when you want to sort a collection of tuples, you have to provide a custom comparator. I created a ComparableTuple2 type, as follows. I wanted to get feedback on whether something like this would be considered useful for Flink before I submitted a PR. Also, I don't know how high I should go with the field arity for a ComparableTuple. Presumably not as high as for non-comparable tuples?
{code}
import org.apache.flink.api.java.tuple.Tuple2;

/** A comparable tuple, consisting of comparable fields that act as primary and secondary sort keys. */
public class ComparableTuple2<T0 extends Comparable<T0>, T1 extends Comparable<T1>>
        extends Tuple2<T0, T1> implements Comparable<Tuple2<T0, T1>> {
    private static final long serialVersionUID = 1L;

    private boolean invertSortOrder0;
    private boolean invertSortOrder1;

    public ComparableTuple2() {
    }

    /**
     * Create a comparable 2-tuple of comparable elements. Both fields sort in ascending order.
     *
     * @param f0
     *            The first element, which is also the primary sort key, and sorts in ascending order.
     * @param f1
     *            The second element, which is also the secondary sort key, and sorts in ascending order.
     */
    public ComparableTuple2(T0 f0, T1 f1) {
        super(f0, f1);
    }

    /**
     * Create a comparable 2-tuple out of comparable elements.
     *
     * @param f0
     *            The first element, which is also the primary sort key, and sorts in ascending order if
     *            invertSortOrder0 == false, else sorts in descending order.
     * @param f1
     *            The second element, which is also the secondary sort key, and sorts in ascending order if
     *            invertSortOrder1 == false, else sorts in descending order.
     * @param invertSortOrder0
     *            If true, invert the sort order for the first field (i.e. sort in descending order).
     * @param invertSortOrder1
     *            If true, invert the sort order for the second field (i.e. sort in descending order).
     */
    public ComparableTuple2(final T0 f0, final T1 f1, final boolean invertSortOrder0,
            final boolean invertSortOrder1) {
        super(f0, f1);
        this.invertSortOrder0 = invertSortOrder0;
        this.invertSortOrder1 = invertSortOrder1;
    }

    /**
     * Comparison function that compares first the primary sort key, f0, and then if equal, compares the secondary sort
     * key, f1.
     */
    @Override
    public int compareTo(final Tuple2<T0, T1> o) {
        // Swap the operands to invert the order rather than negating the result,
        // since -Integer.MIN_VALUE overflows.
        int diff = invertSortOrder0 ? o.f0.compareTo(this.f0) : this.f0.compareTo(o.f0);
        if (diff != 0) {
            return diff;
        }
        return invertSortOrder1 ? o.f1.compareTo(this.f1) : this.f1.compareTo(o.f1);
    }
}
{code}
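For comparison, Java 8's `Comparator` combinators can express the same primary/secondary ordering, including per-field inversion, without a new tuple type. The sketch below uses `Map.Entry` as a stand-in for `Tuple2` so that it runs without Flink; with Flink tuples, the comparators would read `f0` and `f1` instead. `TupleComparators.byKeyThenValue` is a hypothetical name.

```java
import java.util.Comparator;
import java.util.Map;

/** Hypothetical helper: lexicographic ordering of pairs with an optional inversion per field. */
final class TupleComparators {
    private TupleComparators() {
    }

    /**
     * Orders entries by key, then by value. Each flag puts the corresponding field in descending
     * order, mirroring invertSortOrder0/invertSortOrder1 in the proposed ComparableTuple2.
     */
    static <A extends Comparable<? super A>, B extends Comparable<? super B>>
            Comparator<Map.Entry<A, B>> byKeyThenValue(boolean invertKey, boolean invertValue) {
        Comparator<A> keyOrder = Comparator.naturalOrder();
        Comparator<B> valueOrder = Comparator.naturalOrder();
        if (invertKey) {
            keyOrder = keyOrder.reversed();
        }
        if (invertValue) {
            valueOrder = valueOrder.reversed();
        }
        Comparator<Map.Entry<A, B>> byKey = Map.Entry.comparingByKey(keyOrder);
        Comparator<Map.Entry<A, B>> byValue = Map.Entry.comparingByValue(valueOrder);
        return byKey.thenComparing(byValue);
    }
}
```

Because the ordering lives in the comparator rather than in the tuple, it does not need extra fields such as the inversion flags, which would otherwise travel with every record.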
[jira] [Created] (FLINK-6057) Better default needed for num network buffers
Luke Hutchison created FLINK-6057: - Summary: Better default needed for num network buffers Key: FLINK-6057 URL: https://issues.apache.org/jira/browse/FLINK-6057 Project: Flink Issue Type: Bug Components: Core Affects Versions: 1.2.0 Reporter: Luke Hutchison Using the default environment,
{code}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
{code}
my code will sometimes fail with an error saying that Flink ran out of network buffers. To fix this, I have to do:
{code}
int numTasks = Runtime.getRuntime().availableProcessors();
config.setInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, numTasks);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numTasks);
config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, numTasks * 2048);
{code}
The default value of 2048 fails when I increase the degree of parallelism for a large Flink pipeline (hence the fix to set the number of buffers to numTasks * 2048). This is particularly problematic because a pipeline can work fine on one machine and then fail when you start it on a machine with more cores. The default execution environment should pick a saner default based on the level of parallelism (or whatever is needed to ensure that the number of network buffers is not going to be exceeded for a given execution environment).
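The workaround above scales the buffer count linearly with the number of cores. The arithmetic is sketched below with hypothetical helper names, together with the memory this reserves. The sketch assumes a network buffer (memory segment) size of 32 KiB, which is believed to be the Flink 1.x default; the result shows that raising the count has a real memory cost on many-core machines.

```java
/** Hypothetical sizing helper based on the numTasks * 2048 heuristic used in the report. */
final class NetworkBufferSizing {
    static final int DEFAULT_NUM_BUFFERS = 2048;
    // Assumed network buffer (memory segment) size: 32 KiB.
    static final int DEFAULT_SEGMENT_SIZE_BYTES = 32 * 1024;

    private NetworkBufferSizing() {
    }

    /** Scales the buffer count with parallelism, never dropping below the default. */
    static int suggestedNumBuffers(int parallelism) {
        return DEFAULT_NUM_BUFFERS * Math.max(1, parallelism);
    }

    /** Memory reserved for network buffers, in bytes; long arithmetic avoids int overflow. */
    static long networkMemoryBytes(int numBuffers, int segmentSizeBytes) {
        return (long) numBuffers * segmentSizeBytes;
    }
}
```

For 8 cores this gives 16,384 buffers. At 32 KiB per buffer, that is 536,870,912 bytes (512 MiB) of network memory.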
[jira] [Created] (FLINK-6035) The method name(String) is undefined for the type UnsortedGrouping
Luke Hutchison created FLINK-6035: - Summary: The method name(String) is undefined for the type UnsortedGrouping Key: FLINK-6035 URL: https://issues.apache.org/jira/browse/FLINK-6035 Project: Flink Issue Type: Bug Components: DataSet API Affects Versions: 1.2.0 Reporter: Luke Hutchison Priority: Trivial If I call
{code}
dataSet.groupBy(0).name("group by key")
{code}
I get an error message like
{noformat}
The method name(String) is undefined for the type UnsortedGrouping>
{noformat}
[jira] [Created] (FLINK-6029) Strange linebreaks in web dashboard graph view make it hard to read text
Luke Hutchison created FLINK-6029: - Summary: Strange linebreaks in web dashboard graph view make it hard to read text Key: FLINK-6029 URL: https://issues.apache.org/jira/browse/FLINK-6029 Project: Flink Issue Type: Bug Components: Webfrontend Affects Versions: 1.2.0 Reporter: Luke Hutchison Priority: Minor Text lines inside the boxes in the Flink web dashboard have linebreaks at very odd places, which can make the content of the boxes hard to read. (See attached screenshot.) For clarity, the content could be tabulated and indented, e.g. with each "->" arrow at the start of a new text line along with the operation type ("-> Filter"), and the description indented on the line below, with long descriptions wrapped to the same start indentation, so that it's easy to visually separate operations from descriptions based on indentation:
{noformat}
Filter
    (filter for problems with date <= 2017-01)
-> FlatMap
    (calculate problem severity scores for date 2017-01)
-> Combine
    (sum all severity scores for (bin, apt) for date 2017-01)
{noformat}
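The proposed layout is straightforward to generate. The plain-Java sketch below uses hypothetical names (`ChainLabelFormatter`, `Operation`). It puts each operator type on its own line and greedily wraps each description at a given width, with every continuation line at the same four-space indentation.

```java
import java.util.List;

/** Hypothetical formatter for chained-operator labels in the layout proposed in the report. */
final class ChainLabelFormatter {
    private static final String INDENT = "    ";

    /** One operator in a chain: its type (e.g. "Filter") and its user-supplied description. */
    static final class Operation {
        final String type;
        final String description;

        Operation(String type, String description) {
            this.type = type;
            this.description = description;
        }
    }

    private ChainLabelFormatter() {
    }

    /** Puts each "-> Type" on its own line, with the wrapped "(description)" indented below it. */
    static String format(List<Operation> chain, int width) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < chain.size(); i++) {
            Operation op = chain.get(i);
            sb.append(i == 0 ? "" : "-> ").append(op.type).append('\n');
            appendWrapped(sb, "(" + op.description + ")", width);
        }
        return sb.toString();
    }

    /** Greedy word wrap; every line of the description starts at the same indentation. */
    private static void appendWrapped(StringBuilder sb, String text, int width) {
        int available = Math.max(1, width - INDENT.length());
        StringBuilder line = new StringBuilder();
        for (String word : text.split(" ")) {
            if (line.length() > 0 && line.length() + 1 + word.length() > available) {
                sb.append(INDENT).append(line).append('\n');
                line.setLength(0);
            }
            if (line.length() > 0) {
                line.append(' ');
            }
            line.append(word);
        }
        if (line.length() > 0) {
            sb.append(INDENT).append(line).append('\n');
        }
    }
}
```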
[jira] [Created] (FLINK-6028) Unnamed operations in main method logged as "main(null:-1)"
Luke Hutchison created FLINK-6028: - Summary: Unnamed operations in main method logged as "main(null:-1)" Key: FLINK-6028 URL: https://issues.apache.org/jira/browse/FLINK-6028 Project: Flink Issue Type: Bug Components: Core Affects Versions: 1.2.0 Reporter: Luke Hutchison Priority: Trivial If in main I have some code like this:
{code}
x.filter(t -> !t.f0.isEmpty())
    .flatMap((t, out) -> out.collect(new Tuple3<>(t.f1, t.f2, t.f3)))
    .writeAsText(filename, WriteMode.OVERWRITE).setParallelism(1);
{code}
then in the log, the origin of these unnamed operations shows up as "main(null:-1)":
{noformat}
CHAIN Filter (Filter at main(null:-1)) -> FlatMap (FlatMap at main(null:-1))(2/2) switched to SCHEDULED
{noformat}
However, operations inside lambdas seem to correctly provide the source file name and line number in the logs, e.g. "Main.java:217".
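`main(null:-1)` looks like a call-site string built from a `StackTraceElement` whose file name is `null` and whose line number is unknown (negative). The sketch below is a hypothetical formatter with a graceful fallback. It is plain Java; `CallSiteFormat` is an illustrative name and not Flink's own code.

```java
/** Hypothetical call-site formatter that avoids printing "null:-1". */
final class CallSiteFormat {
    private CallSiteFormat() {
    }

    static String describe(StackTraceElement e) {
        // Fall back to the class name when no source file name was recorded.
        String source = e.getFileName() != null ? e.getFileName() : e.getClassName();
        // A negative line number means the line is unknown, so it is omitted.
        return e.getLineNumber() >= 0
                ? e.getMethodName() + "(" + source + ":" + e.getLineNumber() + ")"
                : e.getMethodName() + "(" + source + ")";
    }
}
```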
[jira] [Created] (FLINK-6026) Cannot name flatMap operations
Luke Hutchison created FLINK-6026: - Summary: Cannot name flatMap operations Key: FLINK-6026 URL: https://issues.apache.org/jira/browse/FLINK-6026 Project: Flink Issue Type: Bug Affects Versions: 1.2.0 Reporter: Luke Hutchison Priority: Minor I get an error if I try naming a flatMap operation: DataSet> y = x.flatMap((t, out) -> out.collect(t)).name("op"); Type mismatch: cannot convert from FlatMapOperator ,Object> to DataSet > If I try to do it as two steps, I get the error that DataSet does not have a .name(String) method: DataSet > y = x.flatMap((t, out) -> out.collect(t)); y.name("op"); If I use Eclipse type inference on x, it shows me that the output type is not correctly inferred: FlatMapOperator , Object> y = x.flatMap((t, out) -> out.collect(t)); y.name("op"); // This now works, but "Object" is not the output type However, these steps still cannot be chained -- the following still gives an error: FlatMapOperator , Object> y = x.flatMap((t, out) -> out.collect(t)).name("op"); i.e. first you have to assign the result to a field, so that the type is fully specified; then you can name the operation. And the weird thing is that you can give the correct, more specific type for the local variable, without a type narrowing error: FlatMapOperator , Tuple2 > y = x.flatMap((t, out) -> out.collect(t)); y.name("op"); // This works, although chaining these two lines still does not work If the types of the lambda args are specified, then everything works: DataSet > y = x.flatMap((Tuple2 t, Collector > out) -> out.collect(t)).name("op"); So, at least two things are going on here: (1) type inference is not working correctly for the lambda parameters (2) this breaks type inference for intermediate expressions, unless the type can be resolved using a local variable definition Is this a bug in the type signature of flatMap? (Or a compiler bug or limitation, or a fundamental limitation of Java 8 type inference?) 
It seems odd that the type of a local variable definition can make the result of the flatMap operator *more* specific, taking the type from FlatMapOperator , Object> to FlatMapOperator , Tuple2 > i.e. if the output type is provided in the local variable definition, it is properly unified with the type of the parameter t of collect(t); however, that type is not propagated out of that call. Can anything be done about this in Flink? I have hit this problem a few times.
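The behaviour can be reproduced without Flink, which suggests it comes from a Java 8 type inference rule rather than from Flink itself. A generic method call is typed against its assignment target only when the call is the whole expression. Once `.name(...)` is chained onto it, the call becomes a receiver with no target type. An output type that appears only in an implicitly typed lambda parameter then falls back to `Object`. The stand-in types below (`FlatMapper`, `Result`, `flatMap`) are hypothetical and only mimic the shape of Flink's API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-ins that mimic the shape of DataSet.flatMap(...).name(...). */
final class FlatMapInference {
    /** Like a FlatMapFunction: the output type O appears only in the collector parameter. */
    interface FlatMapper<I, O> {
        void flatMap(I value, Consumer<O> out);
    }

    /** Like FlatMapOperator: carries the output type and supports a chained name(...). */
    static final class Result<O> {
        final List<O> values;

        Result(List<O> values) {
            this.values = values;
        }

        Result<O> name(String ignored) {
            return this;
        }
    }

    private FlatMapInference() {
    }

    static <I, O> Result<O> flatMap(List<I> input, FlatMapper<I, O> fn) {
        List<O> out = new ArrayList<>();
        for (I value : input) {
            fn.flatMap(value, out::add);
        }
        return new Result<>(out);
    }

    /** Shows which call shapes compile; the commented-out line reproduces the reported error. */
    static List<String> demo() {
        List<String> xs = Arrays.asList("a", "b");
        // Compiles: O is inferred as String from the assignment target.
        Result<String> a = flatMap(xs, (t, out) -> out.accept(t));
        // Does not compile (Result<Object> is not a Result<String>): as the receiver of
        // name(...), the call has no target type, so O falls back to Object.
        // Result<String> b = flatMap(xs, (t, out) -> out.accept(t)).name("op");
        // Compiles: explicit lambda parameter types fix O = String, so chaining works.
        Result<String> c = flatMap(xs, (String t, Consumer<String> out) -> out.accept(t)).name("op");
        List<String> all = new ArrayList<>(a.values);
        all.addAll(c.values);
        return all;
    }
}
```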
[jira] [Created] (FLINK-6024) Need more fine-grained info for "InvalidProgramException: This type (...) cannot be used as key"
Luke Hutchison created FLINK-6024: - Summary: Need more fine-grained info for "InvalidProgramException: This type (...) cannot be used as key" Key: FLINK-6024 URL: https://issues.apache.org/jira/browse/FLINK-6024 Project: Flink Issue Type: Improvement Components: Core Affects Versions: 1.2.0 Reporter: Luke Hutchison I got this very confusing exception: InvalidProgramException: This type (MyType) cannot be used as key I dug through the code and could not find what was causing this. The Javadoc for type.isKeyType(), in Keys.java:329, right before the exception is thrown, says: "Checks whether this type can be used as a key. As a bare minimum, types have to be hashable and comparable to be keys." However, this didn't solve the problem. I discovered that in my case, the error was occurring because I had added a new constructor to the type and no longer had a default constructor. This is probably quite a common thing to happen with POJOs, so the error message should say when this is the problem. Other possible causes should likewise be reported in the error message, depending on the actual cause of the problem. These include the class not being public, the constructor not being public, the key field not being public, the key field not being a serializable type, or the key not being comparable or hashable.
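A more specific message could be produced by checking each POJO requirement separately and reporting every violation. The sketch below uses plain reflection to check that the class is public, that it is not a non-static inner class, that it has a public no-argument constructor, and that each field is either public or has public `getFoo()`/`setFoo(...)` accessors. `PojoDiagnostics` is a hypothetical name. Boolean `isFoo()` getters and the key-specific checks mentioned above (hashability, comparability) are left out.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical diagnostic that lists the reasons a class would not qualify as a POJO. */
final class PojoDiagnostics {
    private PojoDiagnostics() {
    }

    static List<String> problems(Class<?> cls) {
        List<String> problems = new ArrayList<>();
        if (!Modifier.isPublic(cls.getModifiers())) {
            problems.add("class " + cls.getName() + " is not public");
        }
        if (cls.isMemberClass() && !Modifier.isStatic(cls.getModifiers())) {
            problems.add("class " + cls.getName() + " is a non-static inner class");
        }
        try {
            cls.getConstructor(); // only finds public constructors
        } catch (NoSuchMethodException e) {
            problems.add("class " + cls.getName() + " has no public no-argument constructor");
        }
        for (Field f : cls.getDeclaredFields()) {
            int mod = f.getModifiers();
            // Static, transient, compiler-generated and public fields need no accessors.
            if (Modifier.isStatic(mod) || Modifier.isTransient(mod) || f.isSynthetic() || Modifier.isPublic(mod)) {
                continue;
            }
            String suffix = capitalize(f.getName());
            if (!hasPublicMethod(cls, "get" + suffix) || !hasPublicMethod(cls, "set" + suffix, f.getType())) {
                problems.add("field " + f.getName() + " is not public and lacks a public getter/setter");
            }
        }
        return problems;
    }

    private static boolean hasPublicMethod(Class<?> cls, String name, Class<?>... params) {
        try {
            cls.getMethod(name, params);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    private static String capitalize(String s) {
        return s.isEmpty() ? s : Character.toUpperCase(s.charAt(0)) + s.substring(1);
    }
}
```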
[jira] [Created] (FLINK-6019) Some log4j messages do not have a loglevel field set, so they can't be suppressed
Luke Hutchison created FLINK-6019: - Summary: Some log4j messages do not have a loglevel field set, so they can't be suppressed Key: FLINK-6019 URL: https://issues.apache.org/jira/browse/FLINK-6019 Project: Flink Issue Type: Bug Components: Core Affects Versions: 1.2.0 Environment: Linux Reporter: Luke Hutchison Some of the log messages do not appear to have a log level set, so they can't be suppressed by setting the log4j level to WARN. There's this line at the beginning, which doesn't even have a timestamp:
{noformat}
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1844933939]
{noformat}
And then there are numerous lines like these, missing an "INFO" field:
{noformat}
03/10/2017 00:01:14 Job execution switched to status RUNNING.
03/10/2017 00:01:14 DataSource (at readTable(DBTableReader.java:165) (org.apache.flink.api.java.io.PojoCsvInputFormat))(1/8) switched to SCHEDULED
03/10/2017 00:01:14 DataSink (count())(1/8) switched to SCHEDULED
03/10/2017 00:01:14 DataSink (count())(3/8) switched to DEPLOYING
03/10/2017 00:01:15 DataSink (count())(3/8) switched to RUNNING
03/10/2017 00:01:17 DataSink (count())(6/8) switched to FINISHED
03/10/2017 00:01:17 DataSource (at readTable(DBTableReader.java:165) (org.apache.flink.api.java.io.PojoCsvInputFormat))(6/8) switched to FINISHED
03/10/2017 00:01:17 Job execution switched to status FINISHED.
{noformat}
[jira] [Created] (FLINK-6017) CSV reader does not support quoted double quotes
Luke Hutchison created FLINK-6017: - Summary: CSV reader does not support quoted double quotes Key: FLINK-6017 URL: https://issues.apache.org/jira/browse/FLINK-6017 Project: Flink Issue Type: Bug Components: Batch Connectors and Input/Output Formats Environment: Linux Reporter: Luke Hutchison The RFC for the CSV format specifies that double quotes are valid in quoted strings in CSV, escaped by doubling the quote character: https://tools.ietf.org/html/rfc4180 However, when Flink parses a CSV file containing escaped quotes, such as:
{noformat}
bob,"The name is ""Bob"""
{noformat}
you get this exception:
{noformat}
org.apache.flink.api.common.io.ParseException: Line could not be parsed: 'bob,"The name is ""Bob"""' ParserError UNQUOTED_CHARS_AFTER_QUOTED_STRING Expect field types: class java.lang.String, class java.lang.String
{noformat}
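Under RFC 4180, a quoted field ends at a quote that is not immediately followed by another quote, and each doubled quote inside the field stands for one literal quote character. A minimal sketch of unescaping a single quoted field, using a hypothetical plain-Java helper:

```java
/** Hypothetical helper: unescapes one RFC 4180 quoted field such as "The name is ""Bob""". */
final class QuotedField {
    private QuotedField() {
    }

    static String unquote(String field) {
        if (field.length() < 2 || field.charAt(0) != '"' || field.charAt(field.length() - 1) != '"') {
            throw new IllegalArgumentException("Not a quoted field: " + field);
        }
        String inner = field.substring(1, field.length() - 1);
        StringBuilder sb = new StringBuilder(inner.length());
        for (int i = 0; i < inner.length(); i++) {
            char c = inner.charAt(i);
            if (c != '"') {
                sb.append(c);
            } else if (i + 1 < inner.length() && inner.charAt(i + 1) == '"') {
                sb.append('"'); // "" is an escaped literal quote
                i++; // skip the second quote of the pair
            } else {
                throw new IllegalArgumentException("Unescaped quote inside quoted field: " + field);
            }
        }
        return sb.toString();
    }
}
```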
[jira] [Created] (FLINK-6016) Newlines should be valid in quoted strings in CSV
Luke Hutchison created FLINK-6016: - Summary: Newlines should be valid in quoted strings in CSV Key: FLINK-6016 URL: https://issues.apache.org/jira/browse/FLINK-6016 Project: Flink Issue Type: Bug Components: Batch Connectors and Input/Output Formats Affects Versions: 1.2.0 Reporter: Luke Hutchison The RFC for the CSV format specifies that newlines are valid in quoted strings in CSV: https://tools.ietf.org/html/rfc4180 However, when Flink parses a CSV file containing a quoted newline, such as:
{noformat}
"3
4",5
{noformat}
you get this exception:
{noformat}
Line could not be parsed: '"3' ParserError UNTERMINATED_QUOTED_STRING Expect field types: class java.lang.String, class java.lang.String
{noformat}
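Supporting this requires finding record boundaries with quote awareness, rather than splitting the input on line delimiters before parsing fields. The sketch below tracks whether the scanner is inside quotes; a doubled `""` toggles the state twice and so leaves it unchanged. `CsvRecordSplitter` is a hypothetical name. The returned records still contain their quotes, to be unescaped by a field parser.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits CSV text into records, treating line breaks inside quotes as data. */
final class CsvRecordSplitter {
    private CsvRecordSplitter() {
    }

    static List<String> splitRecords(String text) {
        List<String> records = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            if (c == '"') {
                // An escaped "" toggles twice, so it leaves the quoting state unchanged.
                inQuotes = !inQuotes;
                current.append(c);
            } else if (c == '\n' && !inQuotes) {
                // End of record; drop a preceding '\r' so CRLF line endings also work.
                int len = current.length();
                if (len > 0 && current.charAt(len - 1) == '\r') {
                    current.setLength(len - 1);
                }
                records.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c); // includes newlines inside quoted fields
            }
        }
        if (inQuotes) {
            throw new IllegalArgumentException("Unterminated quoted string at end of input");
        }
        if (current.length() > 0) {
            records.add(current.toString());
        }
        return records;
    }
}
```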
[jira] [Created] (FLINK-6015) "Row too short" when reading CSV line with empty last field (i.e. ending in comma)
Luke Hutchison created FLINK-6015: - Summary: "Row too short" when reading CSV line with empty last field (i.e. ending in comma) Key: FLINK-6015 URL: https://issues.apache.org/jira/browse/FLINK-6015 Project: Flink Issue Type: Bug Components: Batch Connectors and Input/Output Formats Affects Versions: 1.2.0 Environment: Linux Reporter: Luke Hutchison When using env.readCsvFile(filename), a line in the CSV file whose last field is empty (i.e. a line that ends with a comma) triggers an exception in GenericCsvInputFormat.parseRecord():
{code}
// check valid start position
if (startPos >= limit) {
    if (lenient) {
        return false;
    } else {
        throw new ParseException("Row too short: " + new String(bytes, offset, numBytes));
    }
}
{code}
Setting the parser to lenient would cause the last field to be left as null, rather than setting its value to "". The parser should accept empty values for the last field on a row.
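Plain Java has a related pitfall: `String.split(",")` also drops trailing empty strings, so `"bob,"` yields a single field, while a negative limit keeps them. The sketch below splits fields so that an empty last field is kept as `""`, and checks the field count. It uses a hypothetical helper and ignores quoting, which a full CSV parser must also handle.

```java
/** Hypothetical helper: splits an unquoted CSV line, keeping empty trailing fields as "". */
final class TrailingEmptyField {
    private TrailingEmptyField() {
    }

    static String[] parseFields(String line, int expectedFields) {
        // A negative limit keeps trailing empty strings; the default limit (0) would drop them.
        String[] fields = line.split(",", -1);
        if (fields.length != expectedFields) {
            throw new IllegalArgumentException(
                    "Expected " + expectedFields + " fields but found " + fields.length + ": " + line);
        }
        return fields;
    }
}
```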