[jira] [Commented] (FLINK-6019) Some log4j messages do not have a loglevel field set, so they can't be suppressed
[ https://issues.apache.org/jira/browse/FLINK-6019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16501463#comment-16501463 ]

Luke Hutchison commented on FLINK-6019:
----------------------------------------

PS: this is especially confusing because the current Flink documentation on logging first talks about configuring log4j, then about configuring logback, but it never mentions that Flink internally logs through the slf4j API. It also doesn't point out that you don't need to configure both log4j and logback -- you can pick just one of them as the logging backend. You then need to set up a binding from slf4j to whichever logging backend you chose. If you pick logback as the backend, though, you'll also need to bridge log4j to slf4j (which is then bound to logback), so that the logs from any log4j-using libraries on the classpath are forwarded through to the single logback sink.

> Some log4j messages do not have a loglevel field set, so they can't be
> suppressed
> ----------------------------------------------------------------------
>
>                 Key: FLINK-6019
>                 URL: https://issues.apache.org/jira/browse/FLINK-6019
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.2.0
>         Environment: Linux
>            Reporter: Luke Hutchison
>            Priority: Major
>
> Some of the log messages do not appear to have a loglevel value set, so they
> can't be suppressed by setting the log4j level to WARN. There's this line at
> the beginning which doesn't even have a timestamp:
> {noformat}
> Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1844933939]
> {noformat}
> And then there are numerous lines like this, missing an "INFO" field:
> {noformat}
> 03/10/2017 00:01:14 Job execution switched to status RUNNING.
> 03/10/2017 00:01:14 DataSource (at readTable(DBTableReader.java:165)
> (org.apache.flink.api.java.io.PojoCsvInputFormat))(1/8) switched to SCHEDULED
> 03/10/2017 00:01:14 DataSink (count())(1/8) switched to SCHEDULED
> 03/10/2017 00:01:14 DataSink (count())(3/8) switched to DEPLOYING
> 03/10/2017 00:01:15 DataSink (count())(3/8) switched to RUNNING
> 03/10/2017 00:01:17 DataSink (count())(6/8) switched to FINISHED
> 03/10/2017 00:01:17 DataSource (at readTable(DBTableReader.java:165)
> (org.apache.flink.api.java.io.PojoCsvInputFormat))(6/8) switched to FINISHED
> 03/10/2017 00:01:17 Job execution switched to status FINISHED.
> {noformat}
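To make the slf4j indirection described in the comment above concrete, here is a minimal, hypothetical sketch (the class name is made up; this is not from the Flink docs or source). The call site only ever sees the slf4j facade; which backend actually receives the message is decided entirely by the binding/bridge jars on the classpath:

{code}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Slf4jFacadeExample {
    // slf4j is only a facade: this logger is backed by log4j if slf4j-log4j12
    // is on the classpath, or by logback if logback-classic is, and so on.
    private static final Logger LOG = LoggerFactory.getLogger(Slf4jFacadeExample.class);

    public static void main(String[] args) {
        LOG.info("Delivered to whichever backend is bound at runtime");
    }
}
{code}

Swapping the backend is then purely a dependency change; the logging code itself does not change.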
[jira] [Commented] (FLINK-6019) Some log4j messages do not have a loglevel field set, so they can't be suppressed
[ https://issues.apache.org/jira/browse/FLINK-6019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16501456#comment-16501456 ]

Luke Hutchison commented on FLINK-6019:
----------------------------------------

Unfortunately I am so unknowledgeable about the complexities of Java logging that I would not be the right person to create the doc changes; I'd probably get something wrong. I'll tell you what I figured out, though:
* Yes, you cannot have a log4j-to-slf4j bridge and an slf4j-to-log4j bridge on the classpath at the same time without creating an infinite loop.
* As far as I can tell, though, there is no bridge installed by default (at least not for the Flink libraries I was using).
* slf4j is the way forward, since it unifies all the logging frameworks -- it can work as middleware between any logger frontend and any logger backend. (I didn't know this before...) It also has its own logging API, so it can serve as its own frontend.

Maybe it's enough to simply note in the logging pages that there can be multiple loggers in the system, used by different libraries on the classpath or module path, and that if you want all log output to be sent to one place, with the same formatting, you will need to install a bridge that sends all the output to one of the loggers. Naming a few bridges might help direct people to the right places to start looking. (I had no idea what was wrong or how to start looking for info to fix this...)

The Maven dependencies I ended up with were the slf4j logging API (for logging in my own code), and then the slf4j-log4j12 bridge for sending slf4j logging to log4j:

{code}
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.25</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
</dependency>
{code}

The log4j.properties that I ended up with was:

{code}
# Root logger (default)
log4j.rootLogger=INFO, stdout

# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %C{1}:%L\t%m%n

# Change to INFO for verbose info about the Flink pipeline as it runs
log4j.logger.org.apache.flink=WARN

# Specifically limit other chatty loggers here (e.g. AWS logs every byte transferred in INFO mode)
log4j.logger.org.apache.http=WARN
log4j.logger.com.amazonaws=WARN
{code}
[jira] [Commented] (FLINK-6019) Some log4j messages do not have a loglevel field set, so they can't be suppressed
[ https://issues.apache.org/jira/browse/FLINK-6019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16501158#comment-16501158 ]

Luke Hutchison commented on FLINK-6019:
----------------------------------------

[~StephanEwen] It looks like I don't have rights to change this bug report -- please go ahead and close it. Thanks.

PS: please add to the logging section of the Flink documentation that the primary logging API in Flink is slf4j, but that some of the deps use JCL / log4j, so a bridge needs to be installed, as described in my previous comment.
[jira] [Commented] (FLINK-6019) Some log4j messages do not have a loglevel field set, so they can't be suppressed
[ https://issues.apache.org/jira/browse/FLINK-6019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16501151#comment-16501151 ]

Luke Hutchison commented on FLINK-6019:
----------------------------------------

Yes, that was the problem, thank you. I had misdiagnosed it: the problem wasn't that there were two log4j instances running, but that log4j and slf4j were both running, and were unbridged. To solve the problem, either {{jcl-over-slf4j}} (to bridge JCL to slf4j) or {{slf4j-jcl}} (to bridge slf4j to JCL) needs to be on the classpath.
[jira] [Commented] (FLINK-6019) Some log4j messages do not have a loglevel field set, so they can't be suppressed
[ https://issues.apache.org/jira/browse/FLINK-6019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16497586#comment-16497586 ]

Luke Hutchison commented on FLINK-6019:
----------------------------------------

[~StephanEwen] I updated to Flink 1.5 and log4j2, and I am seeing this problem again. Now {{env.getConfig().disableSysoutLogging()}} no longer suppresses output from Flink.

I have log4j2 configured using {{src/main/resources/log4j2.xml}}, and logging works correctly for my own log output in my program. However, Flink does not reuse the same log4j logger. I have even tried adding a {{log4j.properties}} file in the same directory, in case Flink needs configuration in the older format, but that does not help.

Log output from my own logging commands appears on stdout, and log output from Flink appears on stderr, with a different log line format. So it is clear that Flink is starting up its own instance of log4j. How do I get Flink to simply use the log4j instance that I have included on the classpath?
[jira] [Commented] (FLINK-6016) Newlines should be valid in quoted strings in CSV
[ https://issues.apache.org/jira/browse/FLINK-6016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16487197#comment-16487197 ]

Luke Hutchison commented on FLINK-6016:
----------------------------------------

Well, surely CSV is not the only file format that cannot be accurately parsed if it is split into chunks...

> Newlines should be valid in quoted strings in CSV
> --------------------------------------------------
>
>                 Key: FLINK-6016
>                 URL: https://issues.apache.org/jira/browse/FLINK-6016
>             Project: Flink
>          Issue Type: Bug
>          Components: Batch Connectors and Input/Output Formats
>    Affects Versions: 1.2.0
>            Reporter: Luke Hutchison
>            Priority: Major
>
> The RFC for the CSV format specifies that newlines are valid in quoted
> strings in CSV:
> https://tools.ietf.org/html/rfc4180
> However, when parsing a CSV file with Flink containing a newline, such as:
> {noformat}
> "3
> 4",5
> {noformat}
> you get this exception:
> {noformat}
> Line could not be parsed: '"3'
> ParserError UNTERMINATED_QUOTED_STRING
> Expect field types: class java.lang.String, class java.lang.String
> {noformat}
[jira] [Commented] (FLINK-6016) Newlines should be valid in quoted strings in CSV
[ https://issues.apache.org/jira/browse/FLINK-6016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16487151#comment-16487151 ]

Luke Hutchison commented on FLINK-6016:
----------------------------------------

[~fhueske] Reading a file in parallel is not faster for most filesystems and most storage devices on most operating systems. In fact, for a high-seek-latency device such as an HDD, reading from several threads in parallel will actually increase the total read time, potentially dramatically. The only way reading a file from multiple threads can be truly fast is if the entire file is already cached in RAM. I suggest simply reading the file serially and emitting lines to a collection that can then be read in parallel by multiple mappers (see the sketch below).
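A minimal, hypothetical sketch of that suggestion in plain Java (the input path is made up, and Flink's actual InputFormat machinery is not used here): read serially on one thread, then let parallel workers consume the in-memory collection:

{code}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

public class SerialReadParallelMap {
    public static void main(String[] args) throws IOException {
        // Single-threaded sequential read: friendly to high-seek-latency devices.
        List<String> lines = Files.readAllLines(Paths.get("/tmp/input.csv"));

        // The in-memory collection can then be processed by parallel "mappers".
        long nonEmpty = lines.parallelStream()
                .filter(line -> !line.isEmpty())
                .count();
        System.out.println("Non-empty lines: " + nonEmpty);
    }
}
{code}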
[jira] [Commented] (FLINK-6016) Newlines should be valid in quoted strings in CSV
[ https://issues.apache.org/jira/browse/FLINK-6016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486510#comment-16486510 ]

Luke Hutchison commented on FLINK-6016:
----------------------------------------

[~pshvetso]: It looks like you were able to replicate the issue, albeit with a different error message. The underlying code has probably changed since I reported this, but I see no real difference between the error message being UNTERMINATED_QUOTED_STRING or "Row too short".
[jira] [Commented] (FLINK-6019) Some log4j messages do not have a loglevel field set, so they can't be suppressed
[ https://issues.apache.org/jira/browse/FLINK-6019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16207174#comment-16207174 ]

Luke Hutchison commented on FLINK-6019:
----------------------------------------

[~StephanEwen] Yes, please see the last few lines of my previous comment. Thanks for the followup.
[jira] [Commented] (FLINK-6016) Newlines should be valid in quoted strings in CSV
[ https://issues.apache.org/jira/browse/FLINK-6016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16150860#comment-16150860 ]

Luke Hutchison commented on FLINK-6016:
----------------------------------------

Yes, that's what I'm suggesting. The data doesn't have to be read twice; it can be emitted in the first pass, but the efficiency of doing so depends on the bandwidth between the single reading thread and the worker threads for each shard.

A more scalable approach, though more complex, would be to build a state machine for each shard, recording the parser state at each input character, and then "run off the end" of each shard boundary until the state of the parser from the previous shard matches the state of the parser for the next shard at the same character position. The "overrun" parser state overwrites the next shard's parser state until the states match. Then the state marker for an unquoted newline is used to determine the true line breaks. (A sketch of this idea follows below.)
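A minimal, hypothetical sketch of the speculative idea described above, assuming a simplified CSV dialect in which the only parser state that matters at a character boundary is quoted vs. unquoted (an RFC 4180 escaped quote "" toggles the state twice, so a plain toggle still tracks it correctly). Rather than literally overwriting states character by character, this variant scans each shard once under every possible start state and keeps whichever scan the previous shard's end state selects -- a simplification of the reconciliation step, not the exact algorithm proposed:

{code}
import java.util.ArrayList;
import java.util.List;

public class SpeculativeCsvSplitter {

    /** Parser state at a character boundary: inside or outside a quoted field. */
    enum State { UNQUOTED, QUOTED }

    /** Result of scanning one shard from an assumed start state. */
    static class ShardScan {
        final State endState;
        final List<Integer> recordBreaks; // offsets of unquoted '\n'
        ShardScan(State endState, List<Integer> recordBreaks) {
            this.endState = endState;
            this.recordBreaks = recordBreaks;
        }
    }

    /** Scan a shard once, recording unquoted newlines and the final state. */
    static ShardScan scan(char[] shard, State start) {
        State s = start;
        List<Integer> breaks = new ArrayList<>();
        for (int i = 0; i < shard.length; i++) {
            char c = shard[i];
            if (c == '"') {
                s = (s == State.QUOTED) ? State.UNQUOTED : State.QUOTED;
            } else if (c == '\n' && s == State.UNQUOTED) {
                breaks.add(i);
            }
        }
        return new ShardScan(s, breaks);
    }

    public static void main(String[] args) {
        // Two shards whose boundary splits a quoted field containing a newline.
        char[] shard0 = "a,\"3\n".toCharArray();
        char[] shard1 = "4\",5\nb,6\n".toCharArray();

        // Shard 0 starts in a known state; shard 1's start state is unknown,
        // so it is scanned under both hypotheses (in parallel, in principle).
        ShardScan s0 = scan(shard0, State.UNQUOTED);
        ShardScan s1IfUnquoted = scan(shard1, State.UNQUOTED);
        ShardScan s1IfQuoted = scan(shard1, State.QUOTED);

        // Reconciliation: shard 0 ended QUOTED, so the QUOTED hypothesis wins,
        // and the newline inside the quoted field is correctly not a break.
        ShardScan s1 = (s0.endState == State.QUOTED) ? s1IfQuoted : s1IfUnquoted;
        System.out.println("Record breaks in shard 1: " + s1.recordBreaks); // [4, 8]
    }
}
{code}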
[jira] [Commented] (FLINK-6016) Newlines should be valid in quoted strings in CSV
[ https://issues.apache.org/jira/browse/FLINK-6016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16149440#comment-16149440 ]

Luke Hutchison commented on FLINK-6016:
----------------------------------------

Mikhail: correct, this can't be fixed easily once the lines are already split. It needs to be fixed in the input reader state machine that does the splitting in the first place (this is single-threaded code, of necessity).
[jira] [Commented] (FLINK-6019) Some log4j messages do not have a loglevel field set, so they can't be suppressed
[ https://issues.apache.org/jira/browse/FLINK-6019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15991926#comment-15991926 ]

Luke Hutchison commented on FLINK-6019:
----------------------------------------

[~StephanEwen] No, the lines I am talking about do not appear in the log; that is the whole problem. See the example below -- the first four lines are in the log (which is also dumped to stderr); the remaining lines are sent to stdout, and are not sent to the log:

{noformat}
2017-05-01 17:46:45 INFO  Main:181 - RentLogic building score calculator
2017-05-01 17:46:45 INFO  Main:127 - Removing temp dir: /tmp/RentLogic-80019139385321
2017-05-01 17:46:45 INFO  Main:147 - Freed /tmp/RentLogic-80019139385321 (0 bytes)
2017-05-01 17:46:47 INFO  Main:80 - Reading problem severity score spreadsheet
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1367532263]
05/01/2017 17:46:49   Job execution switched to status RUNNING.
05/01/2017 17:46:49   DataSource (read table vowner_building)(1/16) switched to SCHEDULED
05/01/2017 17:46:49   DataSource (read table vowner_building)(1/16) switched to DEPLOYING
05/01/2017 17:46:49   DataSource (read table vowner_building)(2/16) switched to SCHEDULED
05/01/2017 17:46:49   DataSource (read table vowner_building)(2/16) switched to DEPLOYING
05/01/2017 17:46:49   DataSource (read table vowner_building)(3/16) switched to SCHEDULED
05/01/2017 17:46:49   DataSource (read table vowner_building)(3/16) switched to DEPLOYING
05/01/2017 17:46:49   DataSource (read table vowner_building)(4/16) switched to SCHEDULED
[...]
{noformat}

Calling {{executionConfig.disableSysoutLogging();}} does in fact disable the additional "Connected to" and "switched to" lines (the Akka log lines) -- thanks for the tip. However, these Akka log lines do not appear in the regular log, and what I want to do is merge these two log types together.

The "switched to" Akka log lines are generated here (I'm not sure where the "Connected to" line comes from):
https://github.com/apache/flink/blob/master/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala#L72
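For reference, a minimal sketch of the suppression call mentioned above, assuming the Flink 1.2-era batch API used throughout this thread:

{code}
import org.apache.flink.api.java.ExecutionEnvironment;

public class SuppressSysoutExample {
    public static void main(String[] args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Disables the "Connected to JobManager ..." / "... switched to ..."
        // status lines, which are printed directly rather than routed via slf4j.
        env.getConfig().disableSysoutLogging();
    }
}
{code}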
[jira] [Commented] (FLINK-3328) Incorrectly shaded dependencies in flink-runtime
[ https://issues.apache.org/jira/browse/FLINK-3328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977155#comment-15977155 ]

Luke Hutchison commented on FLINK-3328:
----------------------------------------

Right, but shading requires a complete copy of the deps to be shipped inside the jar. I understand the desire to reduce namespace clashes, and to not allow users to depend upon Flink's deps (or to not expose users to Flink's deps). But I think it's actually worse to ship a complete copy of all deps (shaded) with a library, due to the increased overall size, if a consumer of the library also uses the dep.

(Also, in the general case, bundling is almost always a bad idea for security reasons -- if an urgent patch is needed to a shaded jar, users have to wait for the producer of the library to re-build with the fix, rather than just depending upon an ABI-compatible fixed version themselves -- although this is mitigated here by the fact that Flink is open source, so users can build their own Flink libraries if they need to. Bundling is generally frowned upon for this reason; consider the situation outside the JVM world, where a large number of programs statically link in OpenSSL and/or zlib rather than using the systemwide version.)

Apologies if I'm still misunderstanding the issues here.

> Incorrectly shaded dependencies in flink-runtime
> -------------------------------------------------
>
>                 Key: FLINK-3328
>                 URL: https://issues.apache.org/jira/browse/FLINK-3328
>             Project: Flink
>          Issue Type: Bug
>          Components: Build System
>    Affects Versions: 1.0.0
>            Reporter: Stephan Ewen
>            Assignee: Robert Metzger
>            Priority: Blocker
>             Fix For: 1.0.0
>
> There are apparently some dependencies shaded into the {{flink-runtime}} fat jar
> that are not relocated. (The flink-runtime jar is now 70 MB.)
> From the output of the shading in flink-dist, it looks as if this concerns at
> least
> - Zookeeper
> - slf4j
> - jline
> - netty (3.x)
> Possibly more.
> {code}
> [WARNING] zookeeper-3.4.6.jar, flink-runtime_2.10-1.0-SNAPSHOT.jar define 440 overlapping classes:
> [WARNING]   - org.apache.zookeeper.server.NettyServerCnxnFactory
> [WARNING]   - org.apache.jute.compiler.JFile
> [WARNING]   - org.apache.zookeeper.server.SessionTracker$Session
> [WARNING]   - org.apache.zookeeper.server.quorum.AuthFastLeaderElection$1
> [WARNING]   - org.apache.jute.compiler.JLong
> [WARNING]   - org.apache.zookeeper.client.ZooKeeperSaslClient$SaslState
> [WARNING]   - org.apache.zookeeper.server.auth.KerberosName$Rule
> [WARNING]   - org.apache.jute.CsvOutputArchive
> [WARNING]   - org.apache.zookeeper.server.quorum.QuorumPeer
> [WARNING]   - org.apache.zookeeper.ZooKeeper$DataWatchRegistration
> [WARNING]   - 430 more...
> [WARNING] slf4j-api-1.7.7.jar, flink-runtime_2.10-1.0-SNAPSHOT.jar define 24 overlapping classes:
> [WARNING]   - org.slf4j.spi.MarkerFactoryBinder
> [WARNING]   - org.slf4j.helpers.SubstituteLogger
> [WARNING]   - org.slf4j.helpers.BasicMarker
> [WARNING]   - org.slf4j.helpers.Util
> [WARNING]   - org.slf4j.LoggerFactory
> [WARNING]   - org.slf4j.Marker
> [WARNING]   - org.slf4j.helpers.NamedLoggerBase
> [WARNING]   - org.slf4j.Logger
> [WARNING]   - org.slf4j.spi.LocationAwareLogger
> [WARNING]   - org.slf4j.ILoggerFactory
> [WARNING]   - 14 more...
> [WARNING] jansi-1.4.jar, jline-2.10.4.jar define 23 overlapping classes:
> [WARNING]   - org.fusesource.jansi.Ansi$Erase
> [WARNING]   - org.fusesource.jansi.Ansi
> [WARNING]   - org.fusesource.jansi.AnsiOutputStream
> [WARNING]   - org.fusesource.jansi.internal.CLibrary
> [WARNING]   - org.fusesource.jansi.Ansi$2
> [WARNING]   - org.fusesource.jansi.WindowsAnsiOutputStream
> [WARNING]   - org.fusesource.jansi.AnsiRenderer$Code
> [WARNING]   - org.fusesource.jansi.AnsiConsole
> [WARNING]   - org.fusesource.jansi.Ansi$Attribute
> [WARNING]   - org.fusesource.jansi.internal.Kernel32
> [WARNING]   - 13 more...
> [WARNING] commons-beanutils-core-1.8.0.jar, commons-collections-3.2.2.jar, commons-beanutils-1.7.0.jar define 10 overlapping classes:
> [WARNING]   - org.apache.commons.collections.FastHashMap$EntrySet
> [WARNING]   - org.apache.commons.collections.ArrayStack
> [WARNING]   - org.apache.commons.collections.FastHashMap$1
> [WARNING]   - org.apache.commons.collections.FastHashMap$KeySet
> [WARNING]   - org.apache.commons.collections.FastHashMap$CollectionView
> [WARNING]   - org.apache.commons.collections.BufferUnderflowException
> [WARNING]   - org.apache.commons.collections.Buffer
> [WARNING]   - org.apache.commons.collections.FastHashMap$CollectionView$CollectionViewIterator
> [WARNING]   - org.apache.commons.collections.FastHashMap$Values
> [WARNING]   - org.apache.commons.collections.FastHashMap
> [WARNING] flink-streaming-scala_2.10-1.0-SNAPSHOT.jar, flink-core-1.0-SNAPSHOT.jar,
> {code}
[jira] [Commented] (FLINK-3328) Incorrectly shaded dependencies in flink-runtime
[ https://issues.apache.org/jira/browse/FLINK-3328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976649#comment-15976649 ]

Luke Hutchison commented on FLINK-3328:
----------------------------------------

What's the advantage to Flink of including all these shaded deps, rather than just relying on something like Maven or Gradle to pull in deps? There are numerous reasons why it is not usually a good idea to incorporate copies of external dependencies.
[jira] [Comment Edited] (FLINK-3328) Incorrectly shaded dependencies in flink-runtime
[ https://issues.apache.org/jira/browse/FLINK-3328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975268#comment-15975268 ]

Luke Hutchison edited comment on FLINK-3328 at 4/19/17 6:43 PM:
----------------------------------------------------------------

[~StephanEwen] It's only not a problem if you know that, in every case, the version of every single shared library will be the same between all Flink jars. And even if you know that will be the case, it's hardly ideal space-wise, because you end up pulling in the same dependencies multiple times for every Flink project, until a fat jar is built.

In general, there is a good reason for that Maven warning: if there's any way that multiple different versions of a dep can occur, you can trigger bugs that are maddeningly hard to trace.

If every Flink jar (like {{flink-runtime}}) depends on one of the core Flink jars (e.g. {{flink-core}}), couldn't all the shared dependencies just be put in that one jar?
[jira] [Commented] (FLINK-3328) Incorrectly shaded dependencies in flink-runtime
[ https://issues.apache.org/jira/browse/FLINK-3328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15969820#comment-15969820 ]

Luke Hutchison commented on FLINK-3328:
----------------------------------------

[~rmetzger] I'm still seeing this in Flink 1.2.0:

{noformat}
[WARNING] flink-core-1.2.0.jar, flink-java-1.2.0.jar, flink-runtime_2.10-1.2.0.jar define 166 overlapping classes:
[WARNING]   - org.apache.flink.shaded.org.objectweb.asm.xml.ASMContentHandler$LocalVarRule
[WARNING]   - org.apache.flink.shaded.org.objectweb.asm.tree.InvokeDynamicInsnNode
[WARNING]   - org.apache.flink.shaded.org.objectweb.asm.xml.ASMContentHandler$ExceptionRule
[WARNING]   - org.apache.flink.shaded.org.objectweb.asm.Attribute
[WARNING]   - org.apache.flink.shaded.org.objectweb.asm.xml.ASMContentHandler$LookupSwitchLabelRule
[WARNING]   - org.apache.flink.shaded.org.objectweb.asm.tree.LdcInsnNode
[WARNING]   - org.apache.flink.shaded.org.objectweb.asm.commons.JSRInlinerAdapter$Instantiation
[WARNING]   - org.apache.flink.shaded.org.objectweb.asm.xml.Processor$OutputSlicingHandler
[WARNING]   - org.apache.flink.shaded.org.objectweb.asm.tree.LocalVariableAnnotationNode
[WARNING]   - org.apache.flink.shaded.org.objectweb.asm.commons.StaticInitMerger
[WARNING]   - 156 more...
[WARNING] flink-runtime-web_2.10-1.2.0.jar, flink-runtime_2.10-1.2.0.jar, flink-optimizer_2.10-1.2.0.jar define 1690 overlapping classes:
[WARNING]   - org.apache.flink.shaded.com.google.common.collect.Synchronized$SynchronizedSortedSetMultimap
[WARNING]   - org.apache.flink.shaded.com.google.common.util.concurrent.FutureCallback
[WARNING]   - org.apache.flink.shaded.com.google.common.util.concurrent.Monitor$Guard
[WARNING]   - org.apache.flink.shaded.com.google.common.util.concurrent.Striped$2
[WARNING]   - org.apache.flink.shaded.com.google.common.collect.ImmutableEnumSet$1
[WARNING]   - org.apache.flink.shaded.com.google.common.primitives.Bytes$ByteArrayAsList
[WARNING]   - org.apache.flink.shaded.com.google.common.math.IntMath$1
[WARNING]   - org.apache.flink.shaded.com.google.common.hash.Hashing$ConcatenatedHashFunction
[WARNING]   - org.apache.flink.shaded.com.google.common.hash.Funnels$UnencodedCharsFunnel
[WARNING]   - org.apache.flink.shaded.com.google.common.collect.DenseImmutableTable$ImmutableArrayMap$1$1
[WARNING]   - 1680 more...
{noformat}
[jira] [Commented] (FLINK-6276) InvalidTypesException: Unknown Error. Type is null.
[ https://issues.apache.org/jira/browse/FLINK-6276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15961980#comment-15961980 ]

Luke Hutchison commented on FLINK-6276:
----------------------------------------

[~Zentol] Thanks for explaining. I assume that this would all be fixed for JVM languages with fully reified types, e.g. if a Ceylon binding were created for Flink? I hope that at least the error messages I listed in points 1-3 in a previous comment can be fixed.

> InvalidTypesException: Unknown Error. Type is null.
> ---------------------------------------------------
>
>                 Key: FLINK-6276
>                 URL: https://issues.apache.org/jira/browse/FLINK-6276
>             Project: Flink
>          Issue Type: Bug
>          Components: Core, DataSet API
>    Affects Versions: 1.2.0
>            Reporter: Luke Hutchison
>
> Quite frequently when writing Flink code, I get the exception
> {{InvalidTypesException: Unknown Error. Type is null.}}
> A small example that triggers it is:
> {code}
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.Iterator;
> import java.util.List;
>
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.util.Collector;
>
> public class TestMain {
>     @SafeVarargs
>     public static <K, V> DataSet<Tuple2<K, List<V>>> join(V missingValuePlaceholder,
>             DataSet<Tuple2<K, V>>... datasets) {
>         DataSet<Tuple2<K, List<V>>> join = null;
>         for (int i = 0; i < datasets.length; i++) {
>             final int datasetIdx = i;
>             if (datasetIdx == 0) {
>                 join = datasets[datasetIdx]
>                         .map(t -> {
>                             List<V> initialList = new ArrayList<>();
>                             initialList.add(t.f1);
>                             return new Tuple2<>(t.f0, initialList);
>                         })
>                         .name("start join");
>             } else {
>                 join = join.coGroup(datasets[datasetIdx]) //
>                         .where(0).equalTo(0) //
>                         .with((Iterable<Tuple2<K, List<V>>> li, Iterable<Tuple2<K, V>> ri,
>                                 Collector<Tuple2<K, List<V>>> out) -> {
>                             K key = null;
>                             List<V> vals = new ArrayList<>(datasetIdx + 1);
>                             Iterator<Tuple2<K, List<V>>> lIter = li.iterator();
>                             if (!lIter.hasNext()) {
>                                 for (int j = 0; j < datasetIdx; j++) {
>                                     vals.add(missingValuePlaceholder);
>                                 }
>                             } else {
>                                 Tuple2<K, List<V>> lt = lIter.next();
>                                 key = lt.f0;
>                                 vals.addAll(lt.f1);
>                                 if (lIter.hasNext()) {
>                                     throw new RuntimeException("Got non-unique key: " + key);
>                                 }
>                             }
>                             Iterator<Tuple2<K, V>> rIter = ri.iterator();
>                             if (!rIter.hasNext()) {
>                                 vals.add(missingValuePlaceholder);
>                             } else {
>                                 Tuple2<K, V> rt = rIter.next();
>                                 key = rt.f0;
>                                 vals.add(rt.f1);
>                                 if (rIter.hasNext()) {
>                                     throw new RuntimeException("Got non-unique key: " + key);
>                                 }
>                             }
>                             out.collect(new Tuple2<>(key, vals));
>                         }) //
>                         .name("join #" + datasetIdx);
>             }
>         }
>         return join;
>     }
>
>     public static void main(String[] args) throws Exception {
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         DataSet<Tuple2<String, Integer>> x =
>                 env.fromElements(new Tuple2<>("a", 3), new Tuple2<>("b", 4), new Tuple2<>("c", 5));
>         DataSet<Tuple2<String, Integer>> y =
>                 env.fromElements(new Tuple2<>("b", 0), new Tuple2<>("c", 1), new Tuple2<>("d", 2));
>         DataSet<Tuple2<String, Integer>> z =
>                 env.fromElements(new Tuple2<>("c", 7), new Tuple2<>("d", 8), new Tuple2<>("e", 9));
>         System.out.println(join(-1, x, y, z).collect());
>     }
> }
> {code}
> The stacktrace that is triggered is:
> {noformat}
> Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException:
> The return type of function 'join(TestMain.java:23)' could not be determined
> automatically, due to type erasure. You can give type information hints by using
> the returns(...) method on the result of the transformation call, or by letting
> your function implement the 'ResultTypeQueryable' interface.
> {noformat}
[jira] [Commented] (FLINK-6276) InvalidTypesException: Unknown Error. Type is null.
[ https://issues.apache.org/jira/browse/FLINK-6276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15961936#comment-15961936 ]

Luke Hutchison commented on FLINK-6276:
----------------------------------------

[~sunjincheng121] [~Zentol] I understand that type parameters are not currently accepted. But why can't the serializer just serialize whatever concrete type it receives, and deserialize it to the same concrete type on the other end? If the program typechecks, I don't see why this would be a problem.

Is the issue that the serializer does not currently store the concrete type in the serialized representation? If so, in the case of generics, the concrete type should in fact be serialized with the object, so that generics may be supported. Not supporting generics is a huge shortcoming.
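A minimal, hypothetical sketch of the "record the concrete type with the payload" idea from the comment above, using plain Java serialization (which does exactly this by writing the class name into the stream). This is not how Flink's TypeSerializers work; the per-record overhead of shipping type names is one reason a framework might want types known up front instead:

{code}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;

public class ConcreteTypeRoundTrip {
    public static void main(String[] args) throws Exception {
        List<Integer> original = new ArrayList<>();
        original.add(42);

        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buf)) {
            out.writeObject(original); // the class name travels with the bytes
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(buf.toByteArray()))) {
            Object copy = in.readObject();
            // Deserializes to the same concrete type: class java.util.ArrayList -> [42]
            System.out.println(copy.getClass() + " -> " + copy);
        }
    }
}
{code}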
[jira] [Commented] (FLINK-6276) InvalidTypesException: Unknown Error. Type is null.
[ https://issues.apache.org/jira/browse/FLINK-6276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15960407#comment-15960407 ]

Luke Hutchison commented on FLINK-6276:
----------------------------------------

[~Zentol] How does my code throw away more information than Java already throws away because of type erasure? And why does the type hint not work?

To be clear, there are several issues here:
# The {{Unknown Error. Type is null.}} message is not at all helpful.
# The {{The return type of function 'join(TestMain.java:23)' could not be determined automatically}} message gives the wrong reason for the problem (it is not the return type of {{join()}} that is the problem).
# The suggested solution, {{You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface}}, doesn't work for generics (see the sketch below).
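For contrast, a minimal sketch (hypothetical class name; the same Flink 1.2-era batch API used in this thread) of the {{returns(...)}} hint succeeding when the types are concrete. The same hint cannot be written inside the generic {{join()}} method, because with unbound {{K}} and {{V}} there is no concrete type to put in the {{TypeHint}}:

{code}
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class TypeHintExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Integer>> in = env.fromElements(new Tuple2<>("a", 3));
        DataSet<Tuple2<String, List<Integer>>> out = in
                .map(t -> new Tuple2<>(t.f0, Collections.singletonList(t.f1)))
                // Works here only because String and Integer are concrete; with
                // unbound K and V there is nothing concrete to write in the hint.
                .returns(TypeInformation.of(new TypeHint<Tuple2<String, List<Integer>>>() {}));
        System.out.println(out.collect());
    }
}
{code}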
[jira] [Commented] (FLINK-6276) InvalidTypesException: Unknown Error. Type is null.
[ https://issues.apache.org/jira/browse/FLINK-6276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15960367#comment-15960367 ] Luke Hutchison commented on FLINK-6276: --- I just tested, and the code as currently shown in the initial bug report does in fact work with {{String}} substituted for {{K}} and {{Integer}} substituted for {{V}} . > InvalidTypesException: Unknown Error. Type is null. > --- > > Key: FLINK-6276 > URL: https://issues.apache.org/jira/browse/FLINK-6276 > Project: Flink > Issue Type: Bug > Components: Core, DataSet API >Affects Versions: 1.2.0 >Reporter: Luke Hutchison > > Quite frequently when writing Flink code, I get the exception > {{InvalidTypesException: Unknown Error. Type is null.}} > A small example that triggers it is: > {code} > import java.util.ArrayList; > import java.util.Arrays; > import java.util.Iterator; > import java.util.List; > import org.apache.flink.api.java.DataSet; > import org.apache.flink.api.java.ExecutionEnvironment; > import org.apache.flink.api.java.tuple.Tuple2; > import org.apache.flink.util.Collector; > public class TestMain { > @SafeVarargs > public staticDataSet > join(V > missingValuePlaceholder, > DataSet >... datasets) { > DataSet > join = null; > for (int i = 0; i < datasets.length; i++) { > final int datasetIdx = i; > if (datasetIdx == 0) { > join = datasets[datasetIdx] >.map(t -> { > List initialList = new ArrayList<>(); > initialList.add(t.f1); > return new Tuple2<>(t.f0, initialList); > }) > .name("start join"); > } else { > join = join.coGroup(datasets[datasetIdx]) // > .where(0).equalTo(0) // > .with((Iterable > li, > Iterable > ri, > Collector > out) -> { > K key = null; > List vals = new ArrayList<>(datasetIdx + 1); > Iterator > lIter = > li.iterator(); > if (!lIter.hasNext()) { > for (int j = 0; j < datasetIdx; j++) { > vals.add(missingValuePlaceholder); > } > } else { > Tuple2 lt = lIter.next(); > key = lt.f0; > vals.addAll(lt.f1); > if (lIter.hasNext()) { > throw new RuntimeException("Got > non-unique key: " + key); > } > } > Iterator > rIter = ri.iterator(); > if (!rIter.hasNext()) { > vals.add(missingValuePlaceholder); > } else { > Tuple2 rt = rIter.next(); > key = rt.f0; > vals.add(rt.f1); > if (rIter.hasNext()) { > throw new RuntimeException("Got > non-unique key: " + key); > } > } > out.collect(new Tuple2 (key, vals)); > }) // > .name("join #" + datasetIdx); > } > } > return join; > } > public static void main(String[] args) throws Exception { > ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); > DataSet > x = // > env.fromElements(new Tuple2<>("a", 3), new Tuple2<>("b", 4), > new Tuple2<>("c", 5)); > DataSet > y = // > env.fromElements(new Tuple2<>("b", 0), new Tuple2<>("c", 1), > new Tuple2<>("d", 2)); > DataSet > z = // > env.fromElements(new Tuple2<>("c", 7), new Tuple2<>("d", 8), > new Tuple2<>("e", 9)); > System.out.println(join(-1, x, y, z).collect()); > } > } > {code} > The stacktrace that is triggered is: > {noformat} > Exception in thread "main" > org.apache.flink.api.common.functions.InvalidTypesException: The return type > of function 'join(TestMain.java:23)' could not be determined automatically, > due to type erasure. You can give type information hints by
[jira] [Comment Edited] (FLINK-6276) InvalidTypesException: Unknown Error. Type is null.
[ https://issues.apache.org/jira/browse/FLINK-6276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15960349#comment-15960349 ] Luke Hutchison edited comment on FLINK-6276 at 4/7/17 6:43 AM:
---
Looking at the flink code, the issue here is the type of {{join}} not being available when {{datasetIdx == 1}}. Adding a type hint as follows:
{code}
if (datasetIdx == 0) {
    join = datasets[datasetIdx]
            .map(t -> {
                List<V> initialList = new ArrayList<>();
                initialList.add(t.f1);
                return new Tuple2<>(t.f0, initialList);
            })
            .name("start join")
            .returns(TypeInformation.of(new TypeHint<Tuple2<K, List<V>>>() {}));
} else {
    /* ... */
}
{code}
causes the following exception to be thrown:
{noformat}
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'K' in 'public static org.apache.flink.api.java.DataSet com.rentlogic.buildingscores.flink.experimental.TestMain.join(java.lang.Object,org.apache.flink.api.java.DataSet[])' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).
    at org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:989)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:679)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:629)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:595)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:588)
    at org.apache.flink.api.common.typeinfo.TypeHint.<init>(TypeHint.java:47)
    at com.rentlogic.buildingscores.flink.experimental.TestMain$1.<init>(TestMain.java:27)
    at com.rentlogic.buildingscores.flink.experimental.TestMain.join(TestMain.java:27)
    at com.rentlogic.buildingscores.flink.experimental.TestMain.main(TestMain.java:77)
{noformat}
(Every time I have hit this problem in the past (at least five or six separate times), I have tried numerous alternatives, and hit a brick wall, so had to go back and redesign the pipeline so that it didn't use generics. I have no idea what the right solution to this is.)

was (Author: lukehutch):
Looking at the flink code, the issue here is the type of {{join}} not being available when {{datasetIdx == 1}}. Adding a type hint as follows:
{code}
if (datasetIdx == 0) {
    join = datasets[datasetIdx] //
            .map(t -> new Tuple2<>(t.f0, Arrays.asList(t.f1))) //
            .name("start join")
            .returns(TypeInformation.of(new TypeHint<Tuple2<K, List<V>>>() {}));
} else {
    /* ... */
}
{code}
causes the following exception to be thrown:
{noformat}
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'K' in 'public static org.apache.flink.api.java.DataSet com.rentlogic.buildingscores.flink.experimental.TestMain.join(java.lang.Object,org.apache.flink.api.java.DataSet[])' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).
    at org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:989)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:679)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:629)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:595)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:588)
    at org.apache.flink.api.common.typeinfo.TypeHint.<init>(TypeHint.java:47)
    at com.rentlogic.buildingscores.flink.experimental.TestMain$1.<init>(TestMain.java:27)
    at com.rentlogic.buildingscores.flink.experimental.TestMain.join(TestMain.java:27)
    at com.rentlogic.buildingscores.flink.experimental.TestMain.main(TestMain.java:77)
{noformat}
(Every time I have hit this problem in the past (at least five or six separate times), I have tried numerous alternatives, and hit a brick wall, so had to go back and redesign the pipeline so that it didn't use generics. I have no idea what the right solution to this is.)

> InvalidTypesException: Unknown Error. Type is
[jira] [Updated] (FLINK-6276) InvalidTypesException: Unknown Error. Type is null.
[ https://issues.apache.org/jira/browse/FLINK-6276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Hutchison updated FLINK-6276:
--
Description:
Quite frequently when writing Flink code, I get the exception {{InvalidTypesException: Unknown Error. Type is null.}}

A small example that triggers it is:
{code}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class TestMain {
    @SafeVarargs
    public static <K, V> DataSet<Tuple2<K, List<V>>> join(V missingValuePlaceholder,
            DataSet<Tuple2<K, V>>... datasets) {
        DataSet<Tuple2<K, List<V>>> join = null;
        for (int i = 0; i < datasets.length; i++) {
            final int datasetIdx = i;
            if (datasetIdx == 0) {
                join = datasets[datasetIdx]
                        .map(t -> {
                            List<V> initialList = new ArrayList<>();
                            initialList.add(t.f1);
                            return new Tuple2<>(t.f0, initialList);
                        })
                        .name("start join");
            } else {
                join = join.coGroup(datasets[datasetIdx]) //
                        .where(0).equalTo(0) //
                        .with((Iterable<Tuple2<K, List<V>>> li, Iterable<Tuple2<K, V>> ri,
                                Collector<Tuple2<K, List<V>>> out) -> {
                            K key = null;
                            List<V> vals = new ArrayList<>(datasetIdx + 1);
                            Iterator<Tuple2<K, List<V>>> lIter = li.iterator();
                            if (!lIter.hasNext()) {
                                for (int j = 0; j < datasetIdx; j++) {
                                    vals.add(missingValuePlaceholder);
                                }
                            } else {
                                Tuple2<K, List<V>> lt = lIter.next();
                                key = lt.f0;
                                vals.addAll(lt.f1);
                                if (lIter.hasNext()) {
                                    throw new RuntimeException("Got non-unique key: " + key);
                                }
                            }
                            Iterator<Tuple2<K, V>> rIter = ri.iterator();
                            if (!rIter.hasNext()) {
                                vals.add(missingValuePlaceholder);
                            } else {
                                Tuple2<K, V> rt = rIter.next();
                                key = rt.f0;
                                vals.add(rt.f1);
                                if (rIter.hasNext()) {
                                    throw new RuntimeException("Got non-unique key: " + key);
                                }
                            }
                            out.collect(new Tuple2<>(key, vals));
                        }) //
                        .name("join #" + datasetIdx);
            }
        }
        return join;
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Integer>> x = //
                env.fromElements(new Tuple2<>("a", 3), new Tuple2<>("b", 4), new Tuple2<>("c", 5));
        DataSet<Tuple2<String, Integer>> y = //
                env.fromElements(new Tuple2<>("b", 0), new Tuple2<>("c", 1), new Tuple2<>("d", 2));
        DataSet<Tuple2<String, Integer>> z = //
                env.fromElements(new Tuple2<>("c", 7), new Tuple2<>("d", 8), new Tuple2<>("e", 9));
        System.out.println(join(-1, x, y, z).collect());
    }
}
{code}

The stacktrace that is triggered is:
{noformat}
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'join(TestMain.java:23)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
    at org.apache.flink.api.java.DataSet.getType(DataSet.java:174)
    at org.apache.flink.api.java.operators.CoGroupOperator$CoGroupOperatorSets.where(CoGroupOperator.java:424)
    at com.rentlogic.buildingscores.flink.experimental.TestMain.join(TestMain.java:27)
    at com.rentlogic.buildingscores.flink.experimental.TestMain.main(TestMain.java:74)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Unknown Error. Type is null.
    at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1134)
[jira] [Commented] (FLINK-6276) InvalidTypesException: Unknown Error. Type is null.
[ https://issues.apache.org/jira/browse/FLINK-6276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15960349#comment-15960349 ] Luke Hutchison commented on FLINK-6276:
---
Looking at the flink code, the issue here is the type of {{join}} not being available when {{datasetIdx == 1}}. Adding a type hint as follows:
{code}
if (datasetIdx == 0) {
    join = datasets[datasetIdx] //
            .map(t -> new Tuple2<>(t.f0, Arrays.asList(t.f1))) //
            .name("start join")
            .returns(TypeInformation.of(new TypeHint<Tuple2<K, List<V>>>() {}));
} else {
    /* ... */
}
{code}
causes the following exception to be thrown:
{noformat}
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'K' in 'public static org.apache.flink.api.java.DataSet com.rentlogic.buildingscores.flink.experimental.TestMain.join(java.lang.Object,org.apache.flink.api.java.DataSet[])' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).
    at org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:989)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:679)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:629)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:595)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:588)
    at org.apache.flink.api.common.typeinfo.TypeHint.<init>(TypeHint.java:47)
    at com.rentlogic.buildingscores.flink.experimental.TestMain$1.<init>(TestMain.java:27)
    at com.rentlogic.buildingscores.flink.experimental.TestMain.join(TestMain.java:27)
    at com.rentlogic.buildingscores.flink.experimental.TestMain.main(TestMain.java:77)
{noformat}
(Every time I have hit this problem in the past (at least five or six separate times), I have tried numerous alternatives, and hit a brick wall, so had to go back and redesign the pipeline so that it didn't use generics. I have no idea what the right solution to this is.)
> InvalidTypesException: Unknown Error. Type is null. > --- > > Key: FLINK-6276 > URL: https://issues.apache.org/jira/browse/FLINK-6276 > Project: Flink > Issue Type: Bug > Components: Core, DataSet API >Affects Versions: 1.2.0 >Reporter: Luke Hutchison > > Quite frequently when writing Flink code, I get the exception > {{InvalidTypesException: Unknown Error. Type is null.}} > A small example that triggers it is: > {code} > import java.util.ArrayList; > import java.util.Arrays; > import java.util.Iterator; > import java.util.List; > import org.apache.flink.api.java.DataSet; > import org.apache.flink.api.java.ExecutionEnvironment; > import org.apache.flink.api.java.tuple.Tuple2; > import org.apache.flink.util.Collector; > public class TestMain { > @SafeVarargs > public static DataSet > join(V > missingValuePlaceholder, > DataSet >... 
datasets) { > DataSet > join = null; > for (int i = 0; i < datasets.length; i++) { > final int datasetIdx = i; > if (datasetIdx == 0) { > join = datasets[datasetIdx] // > .map(t -> new Tuple2<>(t.f0, Arrays.asList(t.f1))) // > .name("start join"); > } else { > join = join.coGroup(datasets[datasetIdx]) // > .where(0).equalTo(0) // > .with((Iterable > li, > Iterable > ri, > Collector > out) -> { > K key = null; > List vals = new ArrayList<>(datasetIdx + 1); > Iterator > lIter = > li.iterator(); > if (!lIter.hasNext()) { > for (int j = 0; j < datasetIdx; j++) { > vals.add(missingValuePlaceholder); > } > } else { > Tuple2 lt = lIter.next(); > key = lt.f0; > vals.addAll(lt.f1); > if (lIter.hasNext()) { > throw new RuntimeException("Got > non-unique key: " + key); >
[jira] [Updated] (FLINK-6276) InvalidTypesException: Unknown Error. Type is null.
[ https://issues.apache.org/jira/browse/FLINK-6276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Hutchison updated FLINK-6276:
--
Description:
Quite frequently when writing Flink code, I get the exception {{InvalidTypesException: Unknown Error. Type is null.}}

A small example that triggers it is:
{code}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class TestMain {
    @SafeVarargs
    public static <K, V> DataSet<Tuple2<K, List<V>>> join(V missingValuePlaceholder,
            DataSet<Tuple2<K, V>>... datasets) {
        DataSet<Tuple2<K, List<V>>> join = null;
        for (int i = 0; i < datasets.length; i++) {
            final int datasetIdx = i;
            if (datasetIdx == 0) {
                join = datasets[datasetIdx] //
                        .map(t -> new Tuple2<>(t.f0, Arrays.asList(t.f1))) //
                        .name("start join");
            } else {
                join = join.coGroup(datasets[datasetIdx]) //
                        .where(0).equalTo(0) //
                        .with((Iterable<Tuple2<K, List<V>>> li, Iterable<Tuple2<K, V>> ri,
                                Collector<Tuple2<K, List<V>>> out) -> {
                            K key = null;
                            List<V> vals = new ArrayList<>(datasetIdx + 1);
                            Iterator<Tuple2<K, List<V>>> lIter = li.iterator();
                            if (!lIter.hasNext()) {
                                for (int j = 0; j < datasetIdx; j++) {
                                    vals.add(missingValuePlaceholder);
                                }
                            } else {
                                Tuple2<K, List<V>> lt = lIter.next();
                                key = lt.f0;
                                vals.addAll(lt.f1);
                                if (lIter.hasNext()) {
                                    throw new RuntimeException("Got non-unique key: " + key);
                                }
                            }
                            Iterator<Tuple2<K, V>> rIter = ri.iterator();
                            if (!rIter.hasNext()) {
                                vals.add(missingValuePlaceholder);
                            } else {
                                Tuple2<K, V> rt = rIter.next();
                                key = rt.f0;
                                vals.add(rt.f1);
                                if (rIter.hasNext()) {
                                    throw new RuntimeException("Got non-unique key: " + key);
                                }
                            }
                            out.collect(new Tuple2<>(key, vals));
                        }) //
                        .name("join #" + datasetIdx);
            }
        }
        return join;
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Integer>> x = //
                env.fromElements(new Tuple2<>("a", 3), new Tuple2<>("b", 4), new Tuple2<>("c", 5));
        DataSet<Tuple2<String, Integer>> y = //
                env.fromElements(new Tuple2<>("b", 0), new Tuple2<>("c", 1), new Tuple2<>("d", 2));
        DataSet<Tuple2<String, Integer>> z = //
                env.fromElements(new Tuple2<>("c", 7), new Tuple2<>("d", 8), new Tuple2<>("e", 9));
        System.out.println(join(-1, x, y, z).collect());
    }
}
{code}

The stacktrace that is triggered is:
{noformat}
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'join(TestMain.java:23)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
    at org.apache.flink.api.java.DataSet.getType(DataSet.java:174)
    at org.apache.flink.api.java.operators.CoGroupOperator$CoGroupOperatorSets.where(CoGroupOperator.java:424)
    at com.rentlogic.buildingscores.flink.experimental.TestMain.join(TestMain.java:27)
    at com.rentlogic.buildingscores.flink.experimental.TestMain.main(TestMain.java:74)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Unknown Error. Type is null.
    at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1134)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:409)
    at
[jira] [Created] (FLINK-6276) InvalidTypesException: Unknown Error. Type is null.
Luke Hutchison created FLINK-6276:
-
Summary: InvalidTypesException: Unknown Error. Type is null.
Key: FLINK-6276
URL: https://issues.apache.org/jira/browse/FLINK-6276
Project: Flink
Issue Type: Bug
Components: Core, DataSet API
Affects Versions: 1.2.0
Reporter: Luke Hutchison

Quite frequently when writing Flink code, I get the exception {{InvalidTypesException: Unknown Error. Type is null.}}

A small example that triggers it is:
{code}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class TestMain {
    @SafeVarargs
    public static <K, V> DataSet<Tuple2<K, List<V>>> join(V missingValuePlaceholder,
            DataSet<Tuple2<K, V>>... datasets) {
        DataSet<Tuple2<K, List<V>>> join = null;
        for (int i = 0; i < datasets.length; i++) {
            final int datasetIdx = i;
            if (datasetIdx == 0) {
                join = datasets[datasetIdx] //
                        .map(t -> new Tuple2<>(t.f0, Arrays.asList(t.f1))) //
                        .name("start join");
            } else {
                join = join.coGroup(datasets[datasetIdx]) //
                        .where(0).equalTo(0) //
                        .with((Iterable<Tuple2<K, List<V>>> li, Iterable<Tuple2<K, V>> ri,
                                Collector<Tuple2<K, List<V>>> out) -> {
                            K key = null;
                            List<V> vals = new ArrayList<>(datasetIdx + 1);
                            Iterator<Tuple2<K, List<V>>> lIter = li.iterator();
                            if (!lIter.hasNext()) {
                                for (int j = 0; j < datasetIdx; j++) {
                                    vals.add(missingValuePlaceholder);
                                }
                            } else {
                                Tuple2<K, List<V>> lt = lIter.next();
                                key = lt.f0;
                                vals.addAll(lt.f1);
                                if (lIter.hasNext()) {
                                    throw new RuntimeException("Got non-unique key: " + key);
                                }
                            }
                            Iterator<Tuple2<K, V>> rIter = ri.iterator();
                            if (!rIter.hasNext()) {
                                vals.add(missingValuePlaceholder);
                            } else {
                                Tuple2<K, V> rt = rIter.next();
                                key = rt.f0;
                                vals.add(rt.f1);
                                if (rIter.hasNext()) {
                                    throw new RuntimeException("Got non-unique key: " + key);
                                }
                            }
                            out.collect(new Tuple2<>(key, vals));
                        }) //
                        .name("join #" + datasetIdx);
            }
        }
        return join;
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Integer>> x = //
                env.fromElements(new Tuple2<>("a", 3), new Tuple2<>("b", 4), new Tuple2<>("c", 5));
        DataSet<Tuple2<String, Integer>> y = //
                env.fromElements(new Tuple2<>("b", 0), new Tuple2<>("c", 1), new Tuple2<>("d", 2));
        DataSet<Tuple2<String, Integer>> z = //
                env.fromElements(new Tuple2<>("c", 7), new Tuple2<>("d", 8), new Tuple2<>("e", 9));
        System.out.println(join(-1, x, y, z).collect());
    }
}
{code}

The stacktrace that is triggered is:
{noformat}
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'join(TestMain.java:23)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
    at org.apache.flink.api.java.DataSet.getType(DataSet.java:174)
    at org.apache.flink.api.java.operators.CoGroupOperator$CoGroupOperatorSets.where(CoGroupOperator.java:424)
    at com.rentlogic.buildingscores.flink.experimental.TestMain.join(TestMain.java:27)
    at com.rentlogic.buildingscores.flink.experimental.TestMain.main(TestMain.java:74)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Unknown Error. Type is null.
    at
[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
[ https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15950505#comment-15950505 ] Luke Hutchison commented on FLINK-6115:
---
[~Zentol] In point (1), I wasn't referring to the overhead of serialization at all -- I was referring to the memory and time overhead of wrapping types in {{Optional}}, as a workaround for the lack of {{null}} value support in tuples. Specifically, using {{Optional}} types requires an additional heap allocation, which incurs a 16-byte overhead on a 64-bit VM, and comes at a significant speed cost (memory allocations and garbage collection are the principal reason for inefficiency in JITted Java vs. native code). ({{Optional}} also adds a layer of indirection, which adds a marginal additional performance cost.) In other words, the point I was making in (1) is that it is circular reasoning to say "we decided not to support null values in tuples for efficiency reasons -- so use {{Optional}} instead if you need null values".

The other case to consider is the impact on code that never needs {{null}} values. That is what my point (2) addresses: even for code that doesn't use {{null}}, the overhead of adding a bitfield to the serialized format will have an extremely minimal performance impact, relative to the amount of time actually doing anything with the serialized format -- because inevitably when you serialize an object, you're going to store it or transmit it, which takes several orders of magnitude more time to do than serializing or deserializing. And the memory overhead is one bit per field, rounded up to the nearest byte.

So -- to summarize those two points more succinctly:
# Not supporting serialization of {{null}} incurs a major overhead for code that needs to use {{null}} values, by requiring the use of {{Optional}} wrappers, which can incur a large performance and memory penalty.
# Supporting serialization of {{null}} would incur only a very minimal relative overhead on code that doesn't need to use {{null}} values.

In other words, so far the efficiency argument has been the strongest argument made in favor of the status quo -- however, the efficiency argument really doesn't hold water as a reason not to support {{null}} values in tuples. Premature optimization is often a source of problems, which is why I asked if this has been benchmarked before. I have to imagine that somebody has benchmark numbers somewhere comparing different approaches.

> Need more helpful error message when trying to serialize a tuple with a null
> field
> --
>
> Key: FLINK-6115
> URL: https://issues.apache.org/jira/browse/FLINK-6115
> Project: Flink
> Issue Type: Bug
> Components: Core
>Affects Versions: 1.2.0
>Reporter: Luke Hutchison
>
> When Flink tries to serialize a tuple with a null field, you get the
> following, which has no information about where in the program the problem
> occurred (all the stack trace lines are in Flink, not in user code).
> {noformat}
> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.IllegalArgumentException: The record must not be null. > at > org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:73) > at > org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:33) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124) > at >
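To make point (2) above concrete, here is a minimal, self-contained sketch (plain {{java.io}}; emphatically not Flink's actual wire format) of the proposed one-bit-per-field null mask, for a three-field record -- the per-record cost is a single extra byte for up to eight fields:
{code}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

public class NullMaskSketch {
    static void write(DataOutput out, String f0, Integer f1, Double f2) throws IOException {
        int mask = (f0 != null ? 1 : 0) | (f1 != null ? 2 : 0) | (f2 != null ? 4 : 0);
        out.writeByte(mask);              // one byte of overhead per record
        if (f0 != null) out.writeUTF(f0); // null fields are simply skipped
        if (f1 != null) out.writeInt(f1);
        if (f2 != null) out.writeDouble(f2);
    }

    static Object[] read(DataInput in) throws IOException {
        int mask = in.readByte();
        return new Object[] {
                (mask & 1) != 0 ? in.readUTF() : null,
                (mask & 2) != 0 ? (Integer) in.readInt() : null,
                (mask & 4) != 0 ? (Double) in.readDouble() : null };
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        write(new DataOutputStream(buf), "key", null, 3.14);
        System.out.println(Arrays.toString(
                read(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())))));
        // prints: [key, null, 3.14]
    }
}
{code}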
[jira] [Comment Edited] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
[ https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15950319#comment-15950319 ] Luke Hutchison edited comment on FLINK-6115 at 3/31/17 4:49 AM:
---
[~greghogan] You're assuming data with {{null}} values in it is "bad data", or that getting a {{null}} value is always going to be unexpected, or that it will trigger a {{NullPointerException}} downstream. Far from it, there is a huge range of valid and intentional uses for {{null}} values. Many programmers won't be able to use {{Optional}} for years (due to being stuck on Java 7 for reasons beyond their control). But even with {{Optional}} available, many Java 8 programmers still use {{null}} for a wide range of uses, and will until the language dies a very slow death. (It will always be impossible until the end of time, for example, to tell without an extra call if {{Map#get(key)}} returned {{null}} because there was no value corresponding to that key, or because {{null}} was mapped to the key -- but programmers know and expect this, for better or for worse.)

The most ironic thing about this whole conversation though is the claim that omitting {{null}} was done for performance reasons, followed by the recommendation to use {{Optional}}: (1) both the performance overhead _and_ the memory overhead of wrapping types in {{Optional}} are _significantly higher_ than using a bitfield in the wire format of a serialized tuple to mark null fields; (2) the overhead of actually _using_ a serialized tuple for any of the things you ever need to serialize one for (i.e. writing a serialized tuple to persistent storage, and/or sending it over the wire) takes orders of magnitude more time than the serialization and deserialization process, which makes the impact of adding a bitfield vanishingly small. Did you actually benchmark the impact of serializing {{null}} before concluding it would be too inefficient, and are those numbers available? Having hard numbers, particularly in a head-to-head comparison (both using tuples without {{null}} values, using {{Record}} types with {{null}} values, and using tuples with {{Optional}} types) would be an important factor in the community decision process that you linked.

was (Author: lukehutch):
[~greghogan] You're assuming data with {{null}} values in it is "bad data", or that getting a {{null}} value is always going to be unexpected, or that it will trigger {{NullPointerException}}s downstream. Far from it, there is a huge range of valid and intentional uses for {{null}} values. Many programmers won't be able to use {{Optional}} for years (due to being stuck on Java 7 for reasons beyond their control). But even with {{Optional}} available, many Java 8 programmers still use {{null}} for a wide range of uses, and will until the language dies a very slow death. (It will always be impossible until the end of time, for example, to tell without an extra call if {{Map#get(key)}} returned {{null}} because there was no value corresponding to that key, or because {{null}} was mapped to the key -- but programmers know and expect this, for better or for worse.)

The most ironic thing about this whole conversation though is the claim that omitting {{null}} was done for performance reasons, followed by the recommendation to use {{Optional}}: (1) both the performance overhead _and_ the memory overhead of wrapping types in {{Optional}} are _significantly higher_ than using a bitfield in the wire format of a serialized tuple to mark null fields; (2) the overhead of actually _using_ a serialized tuple for any of the things you ever need to serialize one for (i.e. writing a serialized tuple to persistent storage, and/or sending it over the wire) takes orders of magnitude more time than the serialization and deserialization process, which makes the impact of adding a bitfield vanishingly small. Did you actually benchmark the impact of serializing {{null}} before concluding it would be too inefficient, and are those numbers available? Having hard numbers, particularly in a head-to-head comparison (both using tuples without {{null}}s, using {{Record}}s with nulls, and using tuples with {{Optional}}) would be an important factor in the community decision process that you linked.

> Need more helpful error message when trying to serialize a tuple with a null
> field
> --
>
> Key: FLINK-6115
> URL: https://issues.apache.org/jira/browse/FLINK-6115
> Project: Flink
> Issue Type: Bug
> Components: Core
>Affects Versions: 1.2.0
>Reporter: Luke Hutchison
>
> When Flink tries to serialize a tuple with a null field, you get the
> following, which has no information about where in the program the problem
> occurred
[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
[ https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15950319#comment-15950319 ] Luke Hutchison commented on FLINK-6115:
---
[~greghogan] You're assuming data with {{null}} values in it is "bad data", or that getting a {{null}} value is always going to be unexpected, or that it will trigger {{NullPointerException}}s downstream. Far from it, there is a huge range of valid and intentional uses for {{null}} values. Many programmers won't be able to use {{Optional}} for years (due to being stuck on Java 7 for reasons beyond their control). But even with {{Optional}} available, many Java 8 programmers still use {{null}} for a wide range of uses, and will until the language dies a very slow death. (It will always be impossible until the end of time, for example, to tell without an extra call if {{Map#get(key)}} returned {{null}} because there was no value corresponding to that key, or because {{null}} was mapped to the key -- but programmers know and expect this, for better or for worse.)

The most ironic thing about this whole conversation though is the claim that omitting {{null}} was done for performance reasons, followed by the recommendation to use {{Optional}}: (1) both the performance overhead _and_ the memory overhead of wrapping types in {{Optional}} are _significantly higher_ than using a bitfield in the wire format of a serialized tuple to mark null fields; (2) the overhead of actually _using_ a serialized tuple for any of the things you ever need to serialize one for (i.e. writing a serialized tuple to persistent storage, and/or sending it over the wire) takes orders of magnitude more time than the serialization and deserialization process, which makes the impact of adding a bitfield vanishingly small. Did you actually benchmark the impact of serializing {{null}} before concluding it would be too inefficient, and are those numbers available? Having hard numbers, particularly in a head-to-head comparison (both using tuples without {{null}}s, using {{Record}}s with nulls, and using tuples with {{Optional}}) would be an important factor in the community decision process that you linked.

> Need more helpful error message when trying to serialize a tuple with a null
> field
> --
>
> Key: FLINK-6115
> URL: https://issues.apache.org/jira/browse/FLINK-6115
> Project: Flink
> Issue Type: Bug
> Components: Core
>Affects Versions: 1.2.0
>Reporter: Luke Hutchison
>
> When Flink tries to serialize a tuple with a null field, you get the
> following, which has no information about where in the program the problem
> occurred (all the stack trace lines are in Flink, not in user code).
> {noformat}
> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.IllegalArgumentException: The record must not be null. > at > org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:73) > at > org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:33) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30) > at > org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56) > at > org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:77) > at
[jira] [Commented] (FLINK-6019) Some log4j messages do not have a loglevel field set, so they can't be suppressed
[ https://issues.apache.org/jira/browse/FLINK-6019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15948836#comment-15948836 ] Luke Hutchison commented on FLINK-6019: --- It is definitely the Akka log printing all these messages. However, I cannot get Flink's Akka log messages to be sent to the regular log. I have tried using a {{log4j.properties}} file, a {{logback.xml}} file, an Akka {{application.conf}} file, and I have even tried programmatically configuring log4j, but Flink's internal Akka implementation seems to be using an entirely separate Log4j instance. Why is Flink set up to have two separate log appenders, one of which is not even configurable? > Some log4j messages do not have a loglevel field set, so they can't be > suppressed > - > > Key: FLINK-6019 > URL: https://issues.apache.org/jira/browse/FLINK-6019 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 > Environment: Linux >Reporter: Luke Hutchison > > Some of the log messages do not appear to have a loglevel value set, so they > can't be suppressed by setting the log4j level to WARN. There's this line at > the beginning which doesn't even have a timestamp: > {noformat} > Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1844933939] > {noformat} > And then there are numerous lines like this, missing an "INFO" field: > {noformat} > 03/10/2017 00:01:14 Job execution switched to status RUNNING. > 03/10/2017 00:01:14 DataSource (at readTable(DBTableReader.java:165) > (org.apache.flink.api.java.io.PojoCsvInputFormat))(1/8) switched to SCHEDULED > 03/10/2017 00:01:14 DataSink (count())(1/8) switched to SCHEDULED > 03/10/2017 00:01:14 DataSink (count())(3/8) switched to DEPLOYING > 03/10/2017 00:01:15 DataSink (count())(3/8) switched to RUNNING > 03/10/2017 00:01:17 DataSink (count())(6/8) switched to FINISHED > 03/10/2017 00:01:17 DataSource (at readTable(DBTableReader.java:165) > (org.apache.flink.api.java.io.PojoCsvInputFormat))(6/8) switched to FINISHED > 03/10/2017 00:01:17 Job execution switched to status FINISHED. > {noformat} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
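For reference, the setting that normally routes Akka's own logging through slf4j (assuming the {{akka-slf4j}} module is on the classpath) is the following {{application.conf}} fragment -- though, as described above, Flink's embedded Akka did not appear to pick up a user-supplied config:
{code}
akka {
  # Route Akka's internal logging through the slf4j API
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "WARNING"
}
{code}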
[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
[ https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15948412#comment-15948412 ] Luke Hutchison commented on FLINK-6115:
---
[~greghogan]: fang yong's comment exactly illustrates the point: whether or not null values are efficient, and whether or not they were a good idea to add to Java in the first place, they are a fundamental part of the language _that is used, today, by millions of programmers, for reasons that go well beyond cases where something went wrong_ -- and not supporting this usage will only ever be a major point of surprise and pain for users of Flink. This is made worse by the fact that code that doesn't crash on small test datasets will start to crash on larger datasets, or in production, once things start to get serialized, as fang yong seems to have experienced.

It's already an error in Flink to use non-serializable types in parts of the computation graph where objects can be serialized. This is statically checked, for exactly the reason that Flink should not simply find out about the need to serialize a non-serializable object at runtime. Similarly, Flink should not be discovering non-serializable values (nulls) within a serializable object only at runtime. If you strongly believe that tuples should never support nulls, then you should not allow tuple types to be used anywhere a serializable type is required, and enforce this in the computation graph builder/analyzer before graph execution even begins. Of course this will never happen, because tuples are too fundamental and useful -- but so is {{null}}. Ergo, tuples must support nulls. Sorry to belabor the point.

> Need more helpful error message when trying to serialize a tuple with a null
> field
> --
>
> Key: FLINK-6115
> URL: https://issues.apache.org/jira/browse/FLINK-6115
> Project: Flink
> Issue Type: Bug
> Components: Core
>Affects Versions: 1.2.0
>Reporter: Luke Hutchison
>
> When Flink tries to serialize a tuple with a null field, you get the
> following, which has no information about where in the program the problem
> occurred (all the stack trace lines are in Flink, not in user code).
> {noformat}
> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
> at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalArgumentException: The record must not be null.
> at > org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:73) > at > org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:33) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30) > at > org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56) > at > org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:77) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:113) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88) > at > org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65) > at > org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) > at >
[jira] [Comment Edited] (FLINK-6185) Output writers and OutputFormats need to support compression
[ https://issues.apache.org/jira/browse/FLINK-6185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941797#comment-15941797 ] Luke Hutchison edited comment on FLINK-6185 at 3/25/17 8:23 PM:
---
Here's my simple gzip OutputFormat though, in case anyone else is looking for a quick solution:
{code}
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.zip.GZIPOutputStream;

import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem.WriteMode;

public class GZippableTextOutputFormat<T> implements OutputFormat<T> {
    private static final long serialVersionUID = 1L;

    private File file;
    private PrintWriter writer;
    private boolean gzip;

    public GZippableTextOutputFormat(File file, WriteMode writeMode, boolean gzip) {
        if (writeMode != WriteMode.OVERWRITE) {
            // Make this explicit, since we're about to overwrite the file
            throw new IllegalArgumentException("writeMode must be WriteMode.OVERWRITE");
        }
        this.file = file.getPath().endsWith(".gz") == gzip ? file : new File(file.getPath() + ".gz");
        this.gzip = gzip;
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        if (taskNumber < 0) {
            throw new IllegalArgumentException("Invalid task number");
        }
        if (numTasks == 0 || numTasks > 1) {
            throw new IllegalArgumentException(
                    "must call setParallelism(1) to use " + GZippableTextOutputFormat.class.getName());
        }
        writer = gzip ? new PrintWriter(new GZIPOutputStream(new FileOutputStream(file)))
                : new PrintWriter(new FileOutputStream(file));
    }

    public File getFile() {
        return file;
    }

    @Override
    public void configure(Configuration parameters) {
    }

    @Override
    public void writeRecord(T record) throws IOException {
        writer.println(record.toString());
    }

    @Override
    public void close() throws IOException {
        if (writer != null) {
            writer.close();
        }
    }

    @Override
    public String toString() {
        return this.getClass().getSimpleName() + "(" + file + ")";
    }
}
{code}

was (Author: lukehutch):
Here's my simple gzip OutputFormat though, in case anyone else is looking for a quick solution:
{code}
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.zip.GZIPOutputStream;

import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem.WriteMode;

public class GZippableTextOutputFormat<T> implements OutputFormat<T> {
    private static final long serialVersionUID = 1L;

    private File file;
    private PrintWriter writer;
    private boolean gzip;

    public GZippableTextOutputFormat(File file, WriteMode writeMode, boolean gzip) {
        if (writeMode != WriteMode.OVERWRITE) {
            // Make this explicit, since we're about to overwrite the file
            throw new IllegalArgumentException("writeMode must be WriteMode.OVERWRITE");
        }
        this.file = file.getPath().endsWith(".gz") == gzip ? file : new File(file.getPath() + ".gz");
        this.gzip = gzip;
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        if (taskNumber < 0) {
            throw new IllegalArgumentException("Invalid task number");
        }
        if (numTasks == 0 || numTasks > 1) {
            throw new IllegalArgumentException(
                    "must call setParallelism(1) to use " + GZippableTextOutputFormat.class.getName());
        }
        try {
            writer = gzip ? new PrintWriter(new GZIPOutputStream(new FileOutputStream(file)))
                    : new PrintWriter(new FileOutputStream(file));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public File getFile() {
        return file;
    }

    @Override
    public void configure(Configuration parameters) {
    }

    @Override
    public void writeRecord(T record) throws IOException {
        writer.println(record.toString());
    }

    @Override
    public void close() throws IOException {
        if (writer != null) {
            writer.close();
        }
    }

    @Override
    public String toString() {
        return this.getClass().getSimpleName() + "(" + file + ")";
    }
}
{code}

> Output writers and OutputFormats need to support compression
>
>
> Key: FLINK-6185
> URL: https://issues.apache.org/jira/browse/FLINK-6185
> Project: Flink
>
[jira] [Comment Edited] (FLINK-6185) Output writers and OutputFormats need to support compression
[ https://issues.apache.org/jira/browse/FLINK-6185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941797#comment-15941797 ] Luke Hutchison edited comment on FLINK-6185 at 3/25/17 8:22 PM:
---
Here's my simple gzip OutputFormat though, in case anyone else is looking for a quick solution:
{code}
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.zip.GZIPOutputStream;

import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem.WriteMode;

public class GZippableTextOutputFormat<T> implements OutputFormat<T> {
    private static final long serialVersionUID = 1L;

    private File file;
    private PrintWriter writer;
    private boolean gzip;

    public GZippableTextOutputFormat(File file, WriteMode writeMode, boolean gzip) {
        if (writeMode != WriteMode.OVERWRITE) {
            // Make this explicit, since we're about to overwrite the file
            throw new IllegalArgumentException("writeMode must be WriteMode.OVERWRITE");
        }
        this.file = file.getPath().endsWith(".gz") == gzip ? file : new File(file.getPath() + ".gz");
        this.gzip = gzip;
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        if (taskNumber < 0) {
            throw new IllegalArgumentException("Invalid task number");
        }
        if (numTasks == 0 || numTasks > 1) {
            throw new IllegalArgumentException(
                    "must call setParallelism(1) to use " + GZippableTextOutputFormat.class.getName());
        }
        try {
            writer = gzip ? new PrintWriter(new GZIPOutputStream(new FileOutputStream(file)))
                    : new PrintWriter(new FileOutputStream(file));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public File getFile() {
        return file;
    }

    @Override
    public void configure(Configuration parameters) {
    }

    @Override
    public void writeRecord(T record) throws IOException {
        writer.println(record.toString());
    }

    @Override
    public void close() throws IOException {
        if (writer != null) {
            writer.close();
        }
    }

    @Override
    public String toString() {
        return this.getClass().getSimpleName() + "(" + file + ")";
    }
}
{code}

was (Author: lukehutch):
Here's my simple gzip OutputFormat though, in case anyone else is looking for a quick solution:
{code}
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.zip.GZIPOutputStream;

import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem.WriteMode;

public class GZippableTextOutputFormat<T> implements OutputFormat<T> {
    private static final long serialVersionUID = 1L;

    private File file;
    private PrintWriter writer;
    private boolean gzip;

    public GZippableTextOutputFormat(File file, WriteMode writeMode, boolean gzip) {
        if (writeMode != WriteMode.OVERWRITE) {
            // Make this explicit, since we're about to overwrite the file
            throw new IllegalArgumentException("writeMode must be WriteMode.OVERWRITE");
        }
        this.file = file.getPath().endsWith(".gz") == gzip ? file : new File(file.getPath() + ".gz");
        this.gzip = gzip;
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        if (taskNumber < 0) {
            throw new IllegalArgumentException("Invalid task number");
        }
        if (numTasks == 0 || numTasks > 1) {
            throw new IllegalArgumentException(
                    "must call setParallelism(1) to use " + GZippableTextOutputFormat.class.getName());
        }
        try {
            writer = gzip ? new PrintWriter(new GZIPOutputStream(new FileOutputStream(file)))
                    : new PrintWriter(new FileOutputStream(file));
        } catch (Exception e) {
            close();
            throw new RuntimeException(e);
        }
    }

    public File getFile() {
        return file;
    }

    @Override
    public void configure(Configuration parameters) {
    }

    @Override
    public void writeRecord(T record) throws IOException {
        writer.println(record.toString());
    }

    @Override
    public void close() throws IOException {
        writer.close();
    }

    @Override
    public String toString() {
        return this.getClass().getSimpleName() + "(" + file + ")";
    }
}
{code}

> Output writers and OutputFormats need to support compression
>
>
> Key: FLINK-6185
> URL:
[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
[ https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941882#comment-15941882 ] Luke Hutchison commented on FLINK-6115: --- [~greghogan] versioning of durable binary formats is not only standard practice, it is important. Yes, allowing null values in Java was a big mistake, but option types didn't exist back when nullable references were added to Java. And an {{Option}} or {{Maybe}} type was the first thing I went looking for in Flink to address the non-nullable tuple field issue. Flink definitely needs an {{Option}} type until Java supports its own (just as having its own {{Tuple}} types is critical to the usefulness of Flink). (For that matter, {{flatMap}} should work over {{Option}} types, treating them as collected lists of length 0 or 1, I went looking for that too...) However, the fact that null pointers (and general lack of strong nullability analysis) have caused no manner of pain to users of Java doesn't mean that they are not crucial to the way that the language works today, or to how programmers tend to use it. Even Flink uses null values the way they're generally used in Java in place of {{Option}} types -- e.g. giving you null values on an outer join when there is no corresponding key in one dataset. Yes, throwing a NPE in tuple constructors misses the manual setting of field values, I mentioned that in an earlier comment. However, it actually highly surprised me when I noticed that the tuple fields were non-final, based on one of the first things I read in the Flink documentation: "Flink has the special classes {{DataSet}} and {{DataStream}} to represent data in a program. You can think of them as immutable collections of data that can contain duplicates." If the collections and streams that contain tuples are supposed to be thought of as immutable, why should the individual elements of those collections and streams be mutable? Perhaps tuple field values should be made final (which would of course be a breaking change for some users, and would probably especially require changes internally in the aggregation operator code). Setting aside the issue of null fields in tuples, this will surely not be the last time that the serialization format will need to change! What if, for example, Flink needs to add support for some future new Java type, or needs to support another JVM language that requires some extra metadata of some form to be stored along with its serialized objects? I strongly recommend versioning the checkpoint files. > Need more helpful error message when trying to serialize a tuple with a null > field > -- > > Key: FLINK-6115 > URL: https://issues.apache.org/jira/browse/FLINK-6115 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Luke Hutchison > > When Flink tries to serialize a tuple with a null field, you get the > following, which has no information about where in the program the problem > occurred (all the stack trace lines are in Flink, not in user code). > {noformat} > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.IllegalArgumentException: The record must not be null. > at > org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:73) > at > org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:33) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124) >
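A minimal sketch of the kind of version header being recommended here (illustrative only -- the magic number and version constants are made up, and this is not Flink's actual checkpoint format):
{code}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

public class VersionedFormat {
    static final int MAGIC = 0xF17E57A7;  // hypothetical magic number
    static final int CURRENT_VERSION = 2; // bump whenever the wire format changes

    static void writeHeader(DataOutput out) throws IOException {
        out.writeInt(MAGIC);
        out.writeInt(CURRENT_VERSION);
    }

    static int readHeader(DataInput in) throws IOException {
        if (in.readInt() != MAGIC) {
            throw new IOException("Not a recognized checkpoint file");
        }
        int version = in.readInt();
        if (version > CURRENT_VERSION) {
            throw new IOException("File was written by a newer version: " + version);
        }
        return version; // the caller dispatches to the deserializer for this version
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        writeHeader(new DataOutputStream(buf));
        System.out.println("version = " + readHeader(
                new DataInputStream(new ByteArrayInputStream(buf.toByteArray()))));
    }
}
{code}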
[jira] [Commented] (FLINK-6185) Output writers and OutputFormats need to support compression
[ https://issues.apache.org/jira/browse/FLINK-6185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941797#comment-15941797 ] Luke Hutchison commented on FLINK-6185:
---
Here's my simple gzip OutputFormat though, in case anyone else is looking for a quick solution:
{code}
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.zip.GZIPOutputStream;

import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem.WriteMode;

public class GZippableTextOutputFormat<T> implements OutputFormat<T> {
    private static final long serialVersionUID = 1L;

    private File file;
    private PrintWriter writer;
    private boolean gzip;

    public GZippableTextOutputFormat(File file, WriteMode writeMode, boolean gzip) {
        if (writeMode != WriteMode.OVERWRITE) {
            // Make this explicit, since we're about to overwrite the file
            throw new IllegalArgumentException("writeMode must be WriteMode.OVERWRITE");
        }
        this.file = file.getPath().endsWith(".gz") == gzip ? file : new File(file.getPath() + ".gz");
        this.gzip = gzip;
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        if (taskNumber < 0) {
            throw new IllegalArgumentException("Invalid task number");
        }
        if (numTasks == 0 || numTasks > 1) {
            throw new IllegalArgumentException(
                    "must call setParallelism(1) to use " + GZippableTextOutputFormat.class.getName());
        }
        try {
            writer = gzip ? new PrintWriter(new GZIPOutputStream(new FileOutputStream(file)))
                    : new PrintWriter(new FileOutputStream(file));
        } catch (Exception e) {
            close();
            throw new RuntimeException(e);
        }
    }

    public File getFile() {
        return file;
    }

    @Override
    public void configure(Configuration parameters) {
    }

    @Override
    public void writeRecord(T record) throws IOException {
        writer.println(record.toString());
    }

    @Override
    public void close() throws IOException {
        writer.close();
    }

    @Override
    public String toString() {
        return this.getClass().getSimpleName() + "(" + file + ")";
    }
}
{code}

> Output writers and OutputFormats need to support compression
>
>
> Key: FLINK-6185
> URL: https://issues.apache.org/jira/browse/FLINK-6185
> Project: Flink
> Issue Type: Bug
> Components: Core
>Affects Versions: 1.2.0
>Reporter: Luke Hutchison
>Priority: Minor
>
> File sources (such as {{ExecutionEnvironment#readCsvFile()}}) and sinks (such
> as {{FileOutputFormat}} and its subclasses, and methods such as
> {{DataSet#writeAsText()}}) need the ability to transparently decompress and
> compress files. Primarily gzip would be useful, but it would be nice if this
> were pluggable to support bzip2, xz, etc.
> There could be options for autodetect (based on file extension and/or file
> content), which could be the default, as well as no compression or a selected
> compression method.

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
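Hypothetical usage of the class above (the file path and job name are made up): the sink must run single-threaded, which is what the {{numTasks}} check in {{open()}} enforces, and with {{gzip = true}} the constructor appends {{.gz}} to the file name:
{code}
import java.io.File;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem.WriteMode;

public class GzipOutputExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> lines = env.fromElements("alpha", "beta", "gamma");
        lines.output(new GZippableTextOutputFormat<>(
                new File("/tmp/results.txt"), WriteMode.OVERWRITE, true)) // writes /tmp/results.txt.gz
                .setParallelism(1); // the format refuses to run with parallelism > 1
        env.execute("write gzipped output");
    }
}
{code}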
[jira] [Updated] (FLINK-6185) Output writers and OutputFormats need to support compression
[ https://issues.apache.org/jira/browse/FLINK-6185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Hutchison updated FLINK-6185: -- Summary: Output writers and OutputFormats need to support compression (was: Output writers and OutputFormats need to support gzip) > Output writers and OutputFormats need to support compression > > > Key: FLINK-6185 > URL: https://issues.apache.org/jira/browse/FLINK-6185 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Luke Hutchison >Priority: Minor > > File sources (such as {{ExecutionEnvironment#readCsvFile()}}) and sinks (such > as {{FileOutputFormat}} and its subclasses, and methods such as > {{DataSet#writeAsText()}}) need the ability to transparently decompress and > compress files. Primarily gzip would be useful, but it would be nice if this > were pluggable to support bzip2, xz, etc. > There could be options for autodetect (based on file extension and/or file > content), which could be the default, as well as no compression or a selected > compression method. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6185) Output writers and OutputFormats need to support gzip
[ https://issues.apache.org/jira/browse/FLINK-6185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Hutchison updated FLINK-6185: -- Summary: Output writers and OutputFormats need to support gzip (was: Input readers and output writers/formats need to support gzip) > Output writers and OutputFormats need to support gzip > - > > Key: FLINK-6185 > URL: https://issues.apache.org/jira/browse/FLINK-6185 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Luke Hutchison >Priority: Minor > > File sources (such as {{ExecutionEnvironment#readCsvFile()}}) and sinks (such > as {{FileOutputFormat}} and its subclasses, and methods such as > {{DataSet#writeAsText()}}) need the ability to transparently decompress and > compress files. Primarily gzip would be useful, but it would be nice if this > were pluggable to support bzip2, xz, etc. > There could be options for autodetect (based on file extension and/or file > content), which could be the default, as well as no compression or a selected > compression method. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6185) Input readers and output writers/formats need to support gzip
[ https://issues.apache.org/jira/browse/FLINK-6185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941795#comment-15941795 ] Luke Hutchison commented on FLINK-6185: --- Ah, sorry that I missed that. I may contribute some code to Flink in the future, but unfortunately right now I'm maxed out, so I hope that all the bug reports I'm sending in constitute some sort of contribution. > Input readers and output writers/formats need to support gzip > - > > Key: FLINK-6185 > URL: https://issues.apache.org/jira/browse/FLINK-6185 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Luke Hutchison >Priority: Minor > > File sources (such as {{ExecutionEnvironment#readCsvFile()}}) and sinks (such > as {{FileOutputFormat}} and its subclasses, and methods such as > {{DataSet#writeAsText()}}) need the ability to transparently decompress and > compress files. Primarily gzip would be useful, but it would be nice if this > were pluggable to support bzip2, xz, etc. > There could be options for autodetect (based on file extension and/or file > content), which could be the default, as well as no compression or a selected > compression method. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6185) Input readers and output writers/formats need to support gzip
[ https://issues.apache.org/jira/browse/FLINK-6185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Hutchison updated FLINK-6185: -- Description: File sources (such as {{ExecutionEnvironment#readCsvFile()}}) and sinks (such as {{FileOutputFormat}} and its subclasses, and methods such as {{DataSet#writeAsText()}}) need the ability to transparently decompress and compress files. Primarily gzip would be useful, but it would be nice if this were pluggable to support bzip2, xz, etc. There could be options for autodetect (based on file extension and/or file content), which could be the default, as well as no compression or a selected compression method. was: File sources (such as {{env#readCsvFile()}}) and sinks (such as {{FileOutputFormat}} and its subclasses, and methods such as {{DataSet#writeAsText()}}) need the ability to transparently decompress and compress files. Primarily gzip would be useful, but it would be nice if this were pluggable to support bzip2, xz, etc. There could be options for autodetect (based on file extension and/or file content), which could be the default, as well as no compression or a selected compression method. > Input readers and output writers/formats need to support gzip > - > > Key: FLINK-6185 > URL: https://issues.apache.org/jira/browse/FLINK-6185 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Luke Hutchison >Priority: Minor > > File sources (such as {{ExecutionEnvironment#readCsvFile()}}) and sinks (such > as {{FileOutputFormat}} and its subclasses, and methods such as > {{DataSet#writeAsText()}}) need the ability to transparently decompress and > compress files. Primarily gzip would be useful, but it would be nice if this > were pluggable to support bzip2, xz, etc. > There could be options for autodetect (based on file extension and/or file > content), which could be the default, as well as no compression or a selected > compression method. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6185) Input readers and output writers/formats need to support gzip
Luke Hutchison created FLINK-6185: - Summary: Input readers and output writers/formats need to support gzip Key: FLINK-6185 URL: https://issues.apache.org/jira/browse/FLINK-6185 Project: Flink Issue Type: Bug Components: Core Affects Versions: 1.2.0 Reporter: Luke Hutchison Priority: Minor File sources (such as {{env#readCsvFile()}}) and sinks (such as FileOutputFormat and its subclasses, and methods such as {{DataSet#writeAsText()}}) need the ability to transparently decompress and compress files. Primarily gzip would be useful, but it would be nice if this were pluggable to support bzip2, xz, etc. There could be options for autodetect (based on file extension and/or file content), which could be the default, as well as no compression or a selected compression method. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
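[Editorial note: a sketch of the extension-based autodetect suggested in the description above. The helper name {{wrapIfCompressed}} is hypothetical, and a real implementation would plug into Flink's input format machinery rather than live in a standalone class.]
{code}
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;

public class CompressionUtil {
    // Pick a decompressor by file extension; fall through to the raw
    // stream when the extension is not recognized.
    public static InputStream wrapIfCompressed(String path, InputStream raw) throws IOException {
        if (path.endsWith(".gz")) {
            return new GZIPInputStream(raw);
        }
        // bzip2, xz, etc. could hook in here, e.g. via commons-compress
        return raw;
    }
}
{code}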
[jira] [Updated] (FLINK-6185) Input readers and output writers/formats need to support gzip
[ https://issues.apache.org/jira/browse/FLINK-6185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Hutchison updated FLINK-6185: -- Description: File sources (such as {{env#readCsvFile()}}) and sinks (such as {{FileOutputFormat}} and its subclasses, and methods such as {{DataSet#writeAsText()}}) need the ability to transparently decompress and compress files. Primarily gzip would be useful, but it would be nice if this were pluggable to support bzip2, xz, etc. There could be options for autodetect (based on file extension and/or file content), which could be the default, as well as no compression or a selected compression method. was: File sources (such as {{env#readCsvFile()}}) and sinks (such as FileOutputFormat and its subclasses, and methods such as {{DataSet#writeAsText()}}) need the ability to transparently decompress and compress files. Primarily gzip would be useful, but it would be nice if this were pluggable to support bzip2, xz, etc. There could be options for autodetect (based on file extension and/or file content), which could be the default, as well as no compression or a selected compression method. > Input readers and output writers/formats need to support gzip > - > > Key: FLINK-6185 > URL: https://issues.apache.org/jira/browse/FLINK-6185 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Luke Hutchison >Priority: Minor > > File sources (such as {{env#readCsvFile()}}) and sinks (such as > {{FileOutputFormat}} and its subclasses, and methods such as > {{DataSet#writeAsText()}}) need the ability to transparently decompress and > compress files. Primarily gzip would be useful, but it would be nice if this > were pluggable to support bzip2, xz, etc. > There could be options for autodetect (based on file extension and/or file > content), which could be the default, as well as no compression or a selected > compression method. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6019) Some log4j messages do not have a loglevel field set, so they can't be suppressed
[ https://issues.apache.org/jira/browse/FLINK-6019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940106#comment-15940106 ] Luke Hutchison commented on FLINK-6019: --- As far as I can tell, Akka uses a different log from Flink, and it looks like the above status switching log messages are sent to the Akka log, not to the Flink log. How do I get Akka to use the same log4j log that I have configured for Flink? There is an enormous amount of this log output dumped to stdout currently, and I cannot figure out how to get these log messages to go to the right place. > Some log4j messages do not have a loglevel field set, so they can't be > suppressed > - > > Key: FLINK-6019 > URL: https://issues.apache.org/jira/browse/FLINK-6019 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 > Environment: Linux >Reporter: Luke Hutchison > > Some of the log messages do not appear to have a loglevel value set, so they > can't be suppressed by setting the log4j level to WARN. There's this line at > the beginning which doesn't even have a timestamp: > {noformat} > Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1844933939] > {noformat} > And then there are numerous lines like this, missing an "INFO" field: > {noformat} > 03/10/2017 00:01:14 Job execution switched to status RUNNING. > 03/10/2017 00:01:14 DataSource (at readTable(DBTableReader.java:165) > (org.apache.flink.api.java.io.PojoCsvInputFormat))(1/8) switched to SCHEDULED > 03/10/2017 00:01:14 DataSink (count())(1/8) switched to SCHEDULED > 03/10/2017 00:01:14 DataSink (count())(3/8) switched to DEPLOYING > 03/10/2017 00:01:15 DataSink (count())(3/8) switched to RUNNING > 03/10/2017 00:01:17 DataSink (count())(6/8) switched to FINISHED > 03/10/2017 00:01:17 DataSource (at readTable(DBTableReader.java:165) > (org.apache.flink.api.java.io.PojoCsvInputFormat))(6/8) switched to FINISHED > 03/10/2017 00:01:17 Job execution switched to status FINISHED. > {noformat} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
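[Editorial note, hedged: in the 1.x DataSet API these client-side status lines appear to be printed to stdout by the job client itself rather than routed through a logger, which would explain why log4j levels don't touch them. If that reading is right, they can be switched off on the {{ExecutionConfig}}, as in this sketch:]
{code}
import org.apache.flink.api.java.ExecutionEnvironment;

public class QuietClient {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Stop the client from printing "... switched to status ..." updates to stdout;
        // whatever does go through log4j can then be filtered by level as usual
        env.getConfig().disableSysoutLogging();
        // ... build and execute the job as normal ...
    }
}
{code}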
[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
[ https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940101#comment-15940101 ] Luke Hutchison commented on FLINK-6115: --- Hi [~fhueske]. So the wire format of individual serialized objects cannot be touched, but is there no version code or magic number at the beginning of savepoint files? If there is currently no magic number or version code, there should be one, for exactly this reason! You could add a magic number and serialization version number to the beginning of future savepoint files, with only a vanishingly small chance of spurious collision with legitimate values if these numbers are looked for in the first couple of words of existing savepoint files, assuming you only need backwards compatibility, not also forwards compatibility. Then you would be free to use different serializers depending on the wire format version. If this is not done, then the Tuple constructors should probably throw NullPointerExceptions for now, until this is fixed. This is clearly not desirable, as not all tuples will be serialized -- but my point was that the user can't even predict or control when tuples are serialized, so this should probably be enforced at the point where the problem is introduced, and not later, when the Flink runtime decides to serialize a tuple and the context that caused the problem is no longer available. > Need more helpful error message when trying to serialize a tuple with a null > field > -- > > Key: FLINK-6115 > URL: https://issues.apache.org/jira/browse/FLINK-6115 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Luke Hutchison > > When Flink tries to serialize a tuple with a null field, you get the > following, which has no information about where in the program the problem > occurred (all the stack trace lines are in Flink, not in user code). > {noformat} > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.IllegalArgumentException: The record must not be null.
> at > org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:73) > at > org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:33) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30) > at > org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56) > at > org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:77) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:113) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88) > at > org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65) > at > org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) > at > org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79) > at > org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) > at > org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51) > at >
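[Editorial sketch of the header check proposed in the comment above. The magic and version values are made up and this is not Flink's actual savepoint format; the input stream must support mark/reset, e.g. by wrapping it in a BufferedInputStream.]
{code}
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class SavepointHeaderSketch {
    static final int MAGIC = 0x464C534E;  // hypothetical magic number
    static final int VERSION = 2;         // hypothetical format version

    static void writeHeader(DataOutputStream out) throws IOException {
        out.writeInt(MAGIC);
        out.writeInt(VERSION);
    }

    // Returns the detected version, or 1 for legacy files written before the
    // header existed -- a legacy file's first word is vanishingly unlikely
    // to collide with MAGIC, which is the point made in the comment above.
    static int readVersion(DataInputStream in) throws IOException {
        in.mark(8);
        if (in.readInt() == MAGIC) {
            return in.readInt();
        }
        in.reset();
        return 1;
    }
}
{code}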
[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
[ https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15937185#comment-15937185 ] Luke Hutchison commented on FLINK-6115: --- bq. Why do you think there is no penalty to encoding null? I didn't know that POJO serialization was slower than tuple serialization; I based my assumption on a tuple being a POJO. However, it still seems like the time penalty of having a bit vector at the beginning of a tuple's serialized representation, indicating which fields were null, would be minimal. This would cost a few cycles per field to set or test a bit for a specific field, which would add maybe 50% to the amount of time needed to serialize or deserialize the bytes of an Integer, but would be negligible relative to the time needed to instantiate an object, or to deserialize most strings. What am I missing? > Need more helpful error message when trying to serialize a tuple with a null > field > -- > > Key: FLINK-6115 > URL: https://issues.apache.org/jira/browse/FLINK-6115 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Luke Hutchison > > When Flink tries to serialize a tuple with a null field, you get the > following, which has no information about where in the program the problem > occurred (all the stack trace lines are in Flink, not in user code). > {noformat} > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.IllegalArgumentException: The record must not be null.
> at > org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:73) > at > org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:33) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30) > at > org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56) > at > org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:77) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:113) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88) > at > org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65) > at > org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) > at > org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79) > at > org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) > at > org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51) > at > org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:108) > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490) > at > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) > at java.lang.Thread.run(Thread.java:745) > {noformat} > The only thing I can tell from this is that it happened somewhere in a > flatMap (but I have dozens of them in my code). Surely there's a way to pull > out the source
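[Editorial sketch of the null bit vector proposed in the comment above, framework-free and with made-up field types; a real version would live inside Flink's tuple serializer. One leading mask byte covers tuples up to arity 8.]
{code}
import java.io.DataOutputStream;
import java.io.IOException;

public class NullMaskSketch {
    // Write one mask byte with a bit per field, then only the non-null fields
    static void writeRecord(DataOutputStream out, String f0, Integer f1, Float f2) throws IOException {
        int mask = (f0 == null ? 1 : 0) | (f1 == null ? 2 : 0) | (f2 == null ? 4 : 0);
        out.writeByte(mask);
        if (f0 != null) out.writeUTF(f0);
        if (f1 != null) out.writeInt(f1);
        if (f2 != null) out.writeFloat(f2);
    }
}
{code}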
[jira] [Comment Edited] (FLINK-6114) Type checking fails with generics, even when concrete type of field is not needed
[ https://issues.apache.org/jira/browse/FLINK-6114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15935589#comment-15935589 ] Luke Hutchison edited comment on FLINK-6114 at 3/22/17 1:05 AM: I figured maybe I was using distinct() wrong, so I tried replacing the code around {{// (1)}} with the following, to no avail:
{code}
DataSet<Tuple2<String, Integer>> blank_numKeys = blank_key_totScore
        .groupBy(1) // (1)
        .reduceGroup((it, out) -> {
            for (@SuppressWarnings("unused") Tuple3<String, K, Float> t : it) {
                out.collect(new Tuple2<String, Integer>("", 1));
            }
        }).groupBy(0).sum(1);
{code}
was (Author: lukehutch): I figured maybe I was using distinct() wrong, so I tried replacing the code around {{// (1)}} with the following, to no avail:
{code}
DataSet<Tuple2<String, Integer>> blank_numKeys = blank_key_totScore
        .groupBy(1) // (1)
        .reduceGroup((it, out) -> {
            for (@SuppressWarnings("unused") Tuple3<String, K, Float> t : it) {
                out.collect(new Tuple2<String, Integer>("", 1));
            }
        });
{code}
> Type checking fails with generics, even when concrete type of field is not > needed > - > > Key: FLINK-6114 > URL: https://issues.apache.org/jira/browse/FLINK-6114 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Luke Hutchison > > The Flink type checker does not allow generic types to be used in any field > of a tuple when a join is being executed, even if the generic is not in a > field that is involved in the join. > I have a type Tuple3 , which contains a generic type > parameter K. I am joining using .where(0).equalTo(0). The type of field 0 is > well-defined as String. However, this gives me the following error: > {noformat} > Exception in thread "main" > org.apache.flink.api.common.functions.InvalidTypesException: Type of > TypeVariable 'K' in 'public static org.apache.flink.api.java.DataSet > mypkg.MyClass.method(params)' could not be determined. This is most likely a > type erasure problem. The type extraction currently supports types with > generic variables only in cases where all variables in the return type can be > deduced from the input type(s). > at > org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:989) > {noformat} > The code compiles fine, however -- the static type system is able to > correctly resolve the types in the surrounding code. > Really only the fields that are affected by joins (or groupBy, aggregation > etc.) should be checked for concrete types in this way. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
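[Editorial note: the exception text in this thread suggests {{returns(...)}} as the standard workaround. With a concrete key type that hint is straightforward, as in the hedged sketch below with String standing in for K; the unresolved-K case discussed here is exactly what the hint cannot express.]
{code}
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

public class ReturnsHintExample {
    static DataSet<Tuple3<String, String, Float>> withHint(DataSet<Tuple2<String, Float>> in) {
        return in.groupBy(0).sum(1)
                .map(t -> new Tuple3<>("", t.f0, t.f1))
                // The explicit hint supplies what type extraction cannot recover from the lambda
                .returns(TypeInformation.of(new TypeHint<Tuple3<String, String, Float>>() {}));
    }
}
{code}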
[jira] [Comment Edited] (FLINK-6114) Type checking fails with generics, even when concrete type of field is not needed
[ https://issues.apache.org/jira/browse/FLINK-6114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15935576#comment-15935576 ] Luke Hutchison edited comment on FLINK-6114 at 3/22/17 12:45 AM: - [~greghogan] I partially reconstructed what I was doing at the point where the exception I originally reported was triggered. I was wrong about not joining on the generic type; I was actually using it as the join key. Smaller tests that I try do work, but I cannot get the more extensive example shown below to work. Given the following code:
{code}
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.util.Collector;

public class MainTest {
    public static <K> DataSet<Tuple2<K, Float>> convertToFractionalRank(DataSet<Tuple2<K, Float>> key_score) {
        // Sum within each key
        // Result: ("", key, totScore)
        DataSet<Tuple3<String, K, Float>> blank_key_totScore = key_score
                .groupBy(0).sum(1)
                // Prepend with "" to prep for join
                .map(t -> new Tuple3<>("", /* key = */ t.f0, /* sum = */ t.f1))
                // .returns(TypeInformation.of(new TypeHint<Tuple3<String, K, Float>>(){})) // (2)
        ;
        // Count unique keys. Result: ("", numKeys)
        DataSet<Tuple2<String, Integer>> blank_numKeys = blank_key_totScore
                .distinct(1) // (1)
                .map(t -> new Tuple2<String, Integer>("", 1))
                .groupBy(0).sum(1);
        // Sort scores into order, then return the fractional rank in the range [0, 1]
        return blank_key_totScore
                .coGroup(blank_numKeys)
                .where(0).equalTo(0)
                .with((Iterable<Tuple3<String, K, Float>> ai, Iterable<Tuple2<String, Integer>> bi,
                        Collector<Tuple4<String, K, Float, Integer>> out) -> {
                    int numKeys = bi.iterator().next().f1;
                    for (Tuple3<String, K, Float> a : ai) {
                        out.collect(new Tuple4<>("", /* key = */ a.f1, /* totScore = */ a.f2, numKeys));
                    }
                })
                // Group by "" (i.e. make into one group, so all the scores can be sorted together)
                .groupBy(0)
                // Sort in descending order of score (the highest score gets the lowest rank, and vice versa)
                .sortGroup(2, Order.DESCENDING)
                // Convert sorted rank from [0, numKeys-1] -> [0, 1]
                .reduceGroup(
                        (Iterable<Tuple4<String, K, Float, Integer>> iter, Collector<Tuple2<K, Float>> out) -> {
                            int rank = 0;
                            for (Tuple4<String, K, Float, Integer> t : iter) {
                                int numKeys = t.f3; // Same for all tuples
                                float fracRank = rank / (float) (numKeys - 1);
                                out.collect(new Tuple2<>(/* key = */ t.f1, fracRank));
                                rank++;
                            }
                        })
                .name("convert problem severity scores into building scores");
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<Tuple2<Tuple2<String, Integer>, Float>> ds = env.fromElements(
                new Tuple2<>(new Tuple2<>("x", 1), 1.0f),
                new Tuple2<>(new Tuple2<>("x", 2), 1.0f),
                new Tuple2<>(new Tuple2<>("x", 3), 1.0f),
                new Tuple2<>(new Tuple2<>("x", 3), 1.0f),
                new Tuple2<>(new Tuple2<>("y", 1), 1.0f),
                new Tuple2<>(new Tuple2<>("y", 1), 1.0f),
                new Tuple2<>(new Tuple2<>("y", 2), 1.0f),
                new Tuple2<>(new Tuple2<>("y", 3), 1.0f));
        DataSet<Tuple2<Tuple2<String, Integer>, Float>> ds2 = convertToFractionalRank(ds);
        System.out.println(ds2.collect());
    }
}
{code}
This exception is thrown at the line marked {{// (1)}}: {noformat} Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'convertToFractionalRank(MainTest.java:21)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the
[jira] [Commented] (FLINK-6114) Type checking fails with generics, even when concrete type of field is not needed
[ https://issues.apache.org/jira/browse/FLINK-6114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15935589#comment-15935589 ] Luke Hutchison commented on FLINK-6114: --- I figured maybe I was using distinct() wrong, so I tried replacing the code around {{// (1)}} with the following, to no avail:
{code}
DataSet<Tuple2<String, Integer>> blank_numKeys = blank_key_totScore
        .groupBy(1) // (1)
        .reduceGroup((it, out) -> {
            for (@SuppressWarnings("unused") Tuple3<String, K, Float> t : it) {
                out.collect(new Tuple2<String, Integer>("", 1));
            }
        });
{code}
> Type checking fails with generics, even when concrete type of field is not > needed > - > > Key: FLINK-6114 > URL: https://issues.apache.org/jira/browse/FLINK-6114 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Luke Hutchison > > The Flink type checker does not allow generic types to be used in any field > of a tuple when a join is being executed, even if the generic is not in a > field that is involved in the join. > I have a type Tuple3 , which contains a generic type > parameter K. I am joining using .where(0).equalTo(0). The type of field 0 is > well-defined as String. However, this gives me the following error: > {noformat} > Exception in thread "main" > org.apache.flink.api.common.functions.InvalidTypesException: Type of > TypeVariable 'K' in 'public static org.apache.flink.api.java.DataSet > mypkg.MyClass.method(params)' could not be determined. This is most likely a > type erasure problem. The type extraction currently supports types with > generic variables only in cases where all variables in the return type can be > deduced from the input type(s). > at > org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:989) > {noformat} > The code compiles fine, however -- the static type system is able to > correctly resolve the types in the surrounding code. > Really only the fields that are affected by joins (or groupBy, aggregation > etc.) should be checked for concrete types in this way. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6146) Incorrect function name given in exception thrown by DataSet.getType()
[ https://issues.apache.org/jira/browse/FLINK-6146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Hutchison updated FLINK-6146: -- Description: In the following code, this exception is thrown at the line marked {{// (1)}}:
{noformat}
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'convertToFractionalRank(MainTest.java:21)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
    at org.apache.flink.api.java.DataSet.getType(DataSet.java:174)
    at org.apache.flink.api.java.DataSet.distinct(DataSet.java:607)
    at com.rentlogic.buildingscores.flink.MainTest.convertToFractionalRank(MainTest.java:28)
    at com.rentlogic.buildingscores.flink.MainTest.main(MainTest.java:69)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Unknown Error. Type is null.
    at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1134)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:409)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:349)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:164)
    at org.apache.flink.api.java.DataSet.map(DataSet.java:215)
    at com.rentlogic.buildingscores.flink.MainTest.convertToFractionalRank(MainTest.java:21)
    ... 1 more
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Unknown Error. Type is null.
    at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1161)
    at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1234)
    at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1131)
    ... 6 more
{noformat}
The code:
{code}
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.util.Collector;

public class MainTest {
    public static <K> DataSet<Tuple2<K, Float>> convertToFractionalRank(DataSet<Tuple2<K, Float>> key_score) {
        // Sum within each key
        // Result: ("", key, totScore)
        DataSet<Tuple3<String, K, Float>> blank_key_totScore = key_score
                .groupBy(0).sum(1)
                // Prepend with "" to prep for join
                .map(t -> new Tuple3<>("", /* key = */ t.f0, /* sum = */ t.f1));
        // Count unique keys. Result: ("", numKeys)
        DataSet<Tuple2<String, Integer>> blank_numKeys = blank_key_totScore
                .distinct(1) // (1)
                .map(t -> new Tuple2<String, Integer>("", 1))
                .groupBy(0).sum(1);
        // Sort scores into order, then return the fractional rank in the range [0, 1]
        return blank_key_totScore
                .coGroup(blank_numKeys)
                .where(0).equalTo(0)
                .with((Iterable<Tuple3<String, K, Float>> ai, Iterable<Tuple2<String, Integer>> bi,
                        Collector<Tuple4<String, K, Float, Integer>> out) -> {
                    int numKeys = bi.iterator().next().f1;
                    for (Tuple3<String, K, Float> a : ai) {
                        out.collect(new Tuple4<>("", /* key = */ a.f1, /* totScore = */ a.f2, numKeys));
                    }
                })
                // Group by "" (i.e. make into one group, so all the scores can be sorted together)
                .groupBy(0)
                // Sort in descending order of score (the highest score gets the lowest rank, and vice versa)
                .sortGroup(2, Order.DESCENDING)
                // Convert sorted rank from [0, numKeys-1] -> [0, 1]
                .reduceGroup(
                        (Iterable<Tuple4<String, K, Float, Integer>> iter, Collector<Tuple2<K, Float>> out) -> {
                            int rank = 0;
                            for (Tuple4<String, K, Float, Integer> t : iter) {
                                int numKeys = t.f3; // Same for all tuples
                                float fracRank = rank / (float) (numKeys - 1);
                                out.collect(new Tuple2<>(/* key = */ t.f1, fracRank));
[jira] [Updated] (FLINK-6146) Incorrect function name given in exception thrown by DataSet.getType()
[ https://issues.apache.org/jira/browse/FLINK-6146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Hutchison updated FLINK-6146: -- Description: In the following code, this exception is thrown at the line marked {{// (1)}}: {noformat} Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'convertToFractionalRank(MainTest.java:21)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface. at org.apache.flink.api.java.DataSet.getType(DataSet.java:174) at org.apache.flink.api.java.DataSet.distinct(DataSet.java:607) at com.rentlogic.buildingscores.flink.MainTest.convertToFractionalRank(MainTest.java:28) at com.rentlogic.buildingscores.flink.MainTest.main(MainTest.java:69) Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Unknown Error. Type is null. at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1134) at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:409) at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:349) at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:164) at org.apache.flink.api.java.DataSet.map(DataSet.java:215) at com.rentlogic.buildingscores.flink.MainTest.convertToFractionalRank(MainTest.java:21) ... 1 more Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Unknown Error. Type is null. at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1161) at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1234) at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1131) ... 6 more {noformat} The code: {code} import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.util.Collector; public class MainTest { public static DataSet> convertToFractionalRank(DataSet > key_score) { // Sum within each key // Result: ("", key, totScore) DataSet > blank_key_totScore = key_score .groupBy(0).sum(1) // Prepend with "" to prep for for join .map(t -> new Tuple3<>("", /* key = */ t.f0, /* sum = */ t.f1)); // Count unique keys. Result: ("", numKeys) DataSet > blank_numKeys = blank_key_totScore .distinct(0) // (1) .map(t -> new Tuple2 ("", 1)) .groupBy(0).sum(1); // Sort scores into order, then return the fractional rank in the range [0, 1] return blank_key_totScore .coGroup(blank_numKeys) .where(0).equalTo(0) .with((Iterable > ai, Iterable > bi, Collector > out) -> { int numKeys = bi.iterator().next().f1; for (Tuple3 a : ai) { out.collect(new Tuple4<>("", /* key = */ a.f1, /* totScore = */ a.f2, numKeys)); } }) // Group by "" (i.e. 
make into one group, so all the scores can be sorted together) .groupBy(0) // Sort in descending order of score (the highest score gets the lowest rank, and vice versa) .sortGroup(2, Order.DESCENDING) // Convert sorted rank from [0, numKeys-1] -> [0, 1] .reduceGroup( (Iterable > iter, Collector > out) -> { int rank = 0; for (Tuple4 t : iter) { int numKeys = t.f3; // Same for all tuples float fracRank = rank / (float) (numKeys - 1); out.collect(new Tuple2<>(/* key = */ t.f1, fracRank));
[jira] [Comment Edited] (FLINK-6114) Type checking fails with generics, even when concrete type of field is not needed
[ https://issues.apache.org/jira/browse/FLINK-6114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15935576#comment-15935576 ] Luke Hutchison edited comment on FLINK-6114 at 3/22/17 12:25 AM: - [~greghogan] I partially reconstructed exactly what I was doing before the exception was triggered that I reported originally. I was wrong about not joining using the generic type, I was actually using it as the join key. This works for smaller tests that I try, but for my more extensive example shown below, I cannot get it to work. Given the following code: {code} import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.util.Collector; public class MainTest { public static DataSet> convertToFractionalRank(DataSet > key_score) { // Sum within each key // Result: ("", key, totScore) DataSet > blank_key_totScore = key_score .groupBy(0).sum(1) // Prepend with "" to prep for for join .map(t -> new Tuple3<>("", /* key = */ t.f0, /* sum = */ t.f1)) // .returns(TypeInformation.of(new TypeHint >(){})) // (2) ; // Count unique keys. Result: ("", numKeys) DataSet > blank_numKeys = blank_key_totScore .distinct(0) // (1) .map(t -> new Tuple2 ("", 1)) .groupBy(0).sum(1); // Sort scores into order, then return the fractional rank in the range [0, 1] return blank_key_totScore .coGroup(blank_numKeys) .where(0).equalTo(0) .with((Iterable > ai, Iterable > bi, Collector > out) -> { int numKeys = bi.iterator().next().f1; for (Tuple3 a : ai) { out.collect(new Tuple4<>("", /* key = */ a.f1, /* totScore = */ a.f2, numKeys)); } }) // Group by "" (i.e. make into one group, so all the scores can be sorted together) .groupBy(0) // Sort in descending order of score (the highest score gets the lowest rank, and vice versa) .sortGroup(2, Order.DESCENDING) // Convert sorted rank from [0, numKeys-1] -> [0, 1] .reduceGroup( (Iterable > iter, Collector > out) -> { int rank = 0; for (Tuple4 t : iter) { int numKeys = t.f3; // Same for all tuples float fracRank = rank / (float) (numKeys - 1); out.collect(new Tuple2<>(/* key = */ t.f1, fracRank)); rank++; } }) .name("convert problem severity scores into building scores"); } public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSource , Float>> ds = env.fromElements( new Tuple2<>(new Tuple2<>("x", 1), 1.0f), new Tuple2<>(new Tuple2<>("x", 2), 1.0f), new Tuple2<>(new Tuple2<>("x", 3), 1.0f), new Tuple2<>(new Tuple2<>("x", 3), 1.0f), new Tuple2<>(new Tuple2<>("y", 1), 1.0f), new Tuple2<>(new Tuple2<>("y", 1), 1.0f), new Tuple2<>(new Tuple2<>("y", 2), 1.0f), new Tuple2<>(new Tuple2<>("y", 3), 1.0f)); DataSet , Float>> ds2 = convertToFractionalRank(ds); System.out.println(ds2.collect()); } } {code} This exception is thrown at the line marked {{// (1)}}: {noformat} Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'convertToFractionalRank(MainTest.java:21)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the
[jira] [Commented] (FLINK-6114) Type checking fails with generics, even when concrete type of field is not needed
[ https://issues.apache.org/jira/browse/FLINK-6114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15935576#comment-15935576 ] Luke Hutchison commented on FLINK-6114: --- [~greghogan] I partially reconstructed exactly what I was doing before the exception was triggered that I reported originally. I was wrong about not joining using the generic type, I was actually using it as the join key. This works for smaller tests that I try, but for my more extensive example shown below, I cannot get it to work. Given the following code: {{code}} import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.util.Collector; public class MainTest { public static DataSet> convertToFractionalRank(DataSet > key_score) { // Sum within each key // Result: ("", key, totScore) DataSet > blank_key_totScore = key_score .groupBy(0).sum(1) // Prepend with "" to prep for for join .map(t -> new Tuple3<>("", /* key = */ t.f0, /* sum = */ t.f1)) // .returns(TypeInformation.of(new TypeHint >(){})) // (2) ; // Count unique keys. Result: ("", numKeys) DataSet > blank_numKeys = blank_key_totScore .distinct(0) // (1) .map(t -> new Tuple2 ("", 1)) .groupBy(0).sum(1); // Sort scores into order, then return the fractional rank in the range [0, 1] return blank_key_totScore .coGroup(blank_numKeys) .where(0).equalTo(0) .with((Iterable > ai, Iterable > bi, Collector > out) -> { int numKeys = bi.iterator().next().f1; for (Tuple3 a : ai) { out.collect(new Tuple4<>("", /* key = */ a.f1, /* totScore = */ a.f2, numKeys)); } }) // Group by "" (i.e. make into one group, so all the scores can be sorted together) .groupBy(0) // Sort in descending order of score (the highest score gets the lowest rank, and vice versa) .sortGroup(2, Order.DESCENDING) // Convert sorted rank from [0, numKeys-1] -> [0, 1] .reduceGroup( (Iterable > iter, Collector > out) -> { int rank = 0; for (Tuple4 t : iter) { int numKeys = t.f3; // Same for all tuples float fracRank = rank / (float) (numKeys - 1); out.collect(new Tuple2<>(/* key = */ t.f1, fracRank)); rank++; } }) .name("convert problem severity scores into building scores"); } public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSource , Float>> ds = env.fromElements( new Tuple2<>(new Tuple2<>("x", 1), 1.0f), new Tuple2<>(new Tuple2<>("x", 2), 1.0f), new Tuple2<>(new Tuple2<>("x", 3), 1.0f), new Tuple2<>(new Tuple2<>("x", 3), 1.0f), new Tuple2<>(new Tuple2<>("y", 1), 1.0f), new Tuple2<>(new Tuple2<>("y", 1), 1.0f), new Tuple2<>(new Tuple2<>("y", 2), 1.0f), new Tuple2<>(new Tuple2<>("y", 3), 1.0f)); DataSet , Float>> ds2 = convertToFractionalRank(ds); System.out.println(ds2.collect()); } } {{code}} This exception is thrown at the line marked {{// (1)}}: {{noformat}} Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'convertToFractionalRank(MainTest.java:21)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface. at
[jira] [Created] (FLINK-6146) Incorrect function name given in exception thrown by DataSet.getType()
Luke Hutchison created FLINK-6146: - Summary: Incorrect function name given in exception thrown by DataSet.getType() Key: FLINK-6146 URL: https://issues.apache.org/jira/browse/FLINK-6146 Project: Flink Issue Type: Bug Components: DataSet API Affects Versions: 1.2.0 Reporter: Luke Hutchison In the following code, this exception is thrown at the line marked {{// (1)}}: {{noformat}} Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'convertToFractionalRank(MainTest.java:21)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface. at org.apache.flink.api.java.DataSet.getType(DataSet.java:174) at org.apache.flink.api.java.DataSet.distinct(DataSet.java:607) at com.rentlogic.buildingscores.flink.MainTest.convertToFractionalRank(MainTest.java:28) at com.rentlogic.buildingscores.flink.MainTest.main(MainTest.java:69) Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Unknown Error. Type is null. at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1134) at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:409) at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:349) at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:164) at org.apache.flink.api.java.DataSet.map(DataSet.java:215) at com.rentlogic.buildingscores.flink.MainTest.convertToFractionalRank(MainTest.java:21) ... 1 more Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Unknown Error. Type is null. at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1161) at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1234) at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1131) ... 6 more {{noformat}} {{code}} import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.util.Collector; public class MainTest { public static DataSet> convertToFractionalRank(DataSet > key_score) { // Sum within each key // Result: ("", key, totScore) DataSet > blank_key_totScore = key_score .groupBy(0).sum(1) // Prepend with "" to prep for for join .map(t -> new Tuple3<>("", /* key = */ t.f0, /* sum = */ t.f1)); // Count unique keys. Result: ("", numKeys) DataSet > blank_numKeys = blank_key_totScore .distinct(0) // (1) .map(t -> new Tuple2 ("", 1)) .groupBy(0).sum(1); // Sort scores into order, then return the fractional rank in the range [0, 1] return blank_key_totScore .coGroup(blank_numKeys) .where(0).equalTo(0) .with((Iterable > ai, Iterable > bi, Collector > out) -> { int numKeys = bi.iterator().next().f1; for (Tuple3 a : ai) { out.collect(new Tuple4<>("", /* key = */ a.f1, /* totScore = */ a.f2, numKeys)); } }) // Group by "" (i.e. 
make into one group, so all the scores can be sorted together) .groupBy(0) // Sort in descending order of score (the highest score gets the lowest rank, and vice versa) .sortGroup(2, Order.DESCENDING) // Convert sorted rank from [0, numKeys-1] -> [0, 1] .reduceGroup( (Iterable > iter, Collector > out) -> { int rank = 0; for (Tuple4 t : iter) { int
[jira] [Comment Edited] (FLINK-6114) Type checking fails with generics, even when concrete type of field is not needed
[ https://issues.apache.org/jira/browse/FLINK-6114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15934109#comment-15934109 ] Luke Hutchison edited comment on FLINK-6114 at 3/21/17 5:18 AM: Hi Greg, the code that caused the exception was doing the following, if memory serves correctly:
{code}
public static <K> DataSet<Tuple3<String, K, Float>> sumReduceLastValue(
        DataSet<Tuple3<String, K, Float>> input) {
    return input.groupBy(0).sum(2);
}
{code}
although I realize now that this leaves field 1 in an indeterminate state: if not all fields of a tuple are covered in the {{groupBy}} and {{sum}} parameters, and if different input tuples have different values for the unspecified fields, then aggregation has to either pick one of the field 1 values, or pick none of them (e.g. set field 1 to null). I assume that this is what caused the exception, because the typechecker was not expecting any fields to be dropped like this -- but if so, the correct exception message would be something along the lines of {{"sum aggregation on field 2 after grouping by field 0 leaves field 1 undefined"}} (or similar).
was (Author: lukehutch): Hi Greg, the code that caused the exception was doing the following, if memory serves correctly:
{code}
public static <K> DataSet<Tuple3<String, K, Float>> doSomething(
        DataSet<Tuple3<String, K, Float>> input) {
    return input.groupBy(0).sum(2);
}
{code}
although I realize now that this leaves field 1 in an indeterminate state: if not all fields of a tuple are covered in the {{groupBy}} and {{sum}} parameters, and if different input tuples have different values for the unspecified fields, then aggregation has to either pick one of the field 1 values, or pick none of them (e.g. set field 1 to null). I assume that this is what caused the exception, because the typechecker was not expecting any fields to be dropped like this -- but if so, the correct exception message would be something along the lines of {{"sum aggregation on field 2 after grouping by field 0 leaves field 1 undefined"}} (or similar).
> Type checking fails with generics, even when concrete type of field is not > needed > - > > Key: FLINK-6114 > URL: https://issues.apache.org/jira/browse/FLINK-6114 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Luke Hutchison > > The Flink type checker does not allow generic types to be used in any field > of a tuple when a join is being executed, even if the generic is not in a > field that is involved in the join. > I have a type Tuple3 , which contains a generic type > parameter K. I am joining using .where(0).equalTo(0). The type of field 0 is > well-defined as String. However, this gives me the following error: > {noformat} > Exception in thread "main" > org.apache.flink.api.common.functions.InvalidTypesException: Type of > TypeVariable 'K' in 'public static org.apache.flink.api.java.DataSet > mypkg.MyClass.method(params)' could not be determined. This is most likely a > type erasure problem. The type extraction currently supports types with > generic variables only in cases where all variables in the return type can be > deduced from the input type(s). > at > org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:989) > {noformat} > The code compiles fine, however -- the static type system is able to > correctly resolve the types in the surrounding code. > Really only the fields that are affected by joins (or groupBy, aggregation > etc.) should be checked for concrete types in this way. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (FLINK-6114) Type checking fails with generics, even when concrete type of field is not needed
[ https://issues.apache.org/jira/browse/FLINK-6114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15934109#comment-15934109 ] Luke Hutchison edited comment on FLINK-6114 at 3/21/17 5:17 AM: Hi Greg, the code that caused the exception was doing the following, if memory serves correctly:
{code}
public static <K> DataSet<Tuple3<String, K, Float>> doSomething(
        DataSet<Tuple3<String, K, Float>> input) {
    return input.groupBy(0).sum(2);
}
{code}
although I realize now that this leaves field 1 in an indeterminate state: if not all fields of a tuple are covered in the {{groupBy}} and {{sum}} parameters, and if different input tuples have different values for the unspecified fields, then aggregation has to either pick one of the field 1 values, or pick none of them (e.g. set field 1 to null). I assume that this is what caused the exception, because the typechecker was not expecting any fields to be dropped like this -- but if so, the correct exception message would be something along the lines of {{"sum aggregation on field 2 after grouping by field 0 leaves field 1 undefined"}} (or similar).
was (Author: lukehutch): Hi Greg, the code that caused the exception was doing the following, if memory serves correctly: {{code}} public static <K> DataSet<Tuple3<String, K, Float>> doSomething( DataSet<Tuple3<String, K, Float>> input) { return input.groupBy(0).sum(2); } {{code}} although I realize now that this leaves field 1 in an indeterminate state: if not all fields of a tuple are covered in the {{groupBy}} and {{sum}} parameters, and if different input tuples have different values for the unspecified fields, then aggregation has to either pick one of the field 1 values, or pick none of them (e.g. set field 1 to null). I assume that this is what caused the exception, because the typechecker was not expecting any fields to be dropped like this -- but if so, the correct exception message would be something along the lines of "sum aggregation on field 2 after grouping by field 0 leaves field 1 undefined" (or similar).
> Type checking fails with generics, even when concrete type of field is not > needed > - > > Key: FLINK-6114 > URL: https://issues.apache.org/jira/browse/FLINK-6114 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Luke Hutchison > > The Flink type checker does not allow generic types to be used in any field > of a tuple when a join is being executed, even if the generic is not in a > field that is involved in the join. > I have a type Tuple3 , which contains a generic type > parameter K. I am joining using .where(0).equalTo(0). The type of field 0 is > well-defined as String. However, this gives me the following error: > {noformat} > Exception in thread "main" > org.apache.flink.api.common.functions.InvalidTypesException: Type of > TypeVariable 'K' in 'public static org.apache.flink.api.java.DataSet > mypkg.MyClass.method(params)' could not be determined. This is most likely a > type erasure problem. The type extraction currently supports types with > generic variables only in cases where all variables in the return type can be > deduced from the input type(s). > at > org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:989) > {noformat} > The code compiles fine, however -- the static type system is able to > correctly resolve the types in the surrounding code. > Really only the fields that are affected by joins (or groupBy, aggregation > etc.) should be checked for concrete types in this way. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6114) Type checking fails with generics, even when concrete type of field is not needed
[ https://issues.apache.org/jira/browse/FLINK-6114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15934109#comment-15934109 ] Luke Hutchison commented on FLINK-6114: --- Hi Greg, the code that caused the exception was doing the following, if memory serves correctly: {{code}} public static <K> DataSet<Tuple3<String, K, Float>> doSomething( DataSet<Tuple3<String, K, Float>> input) { return input.groupBy(0).sum(2); } {{code}} although I realize now that this leaves field 1 in an indeterminate state: if not all fields of a tuple are covered in the {{groupBy}} and {{sum}} parameters, and if different input tuples have different values for the unspecified fields, then aggregation has to either pick one of the field 1 values, or pick none of them (e.g. set field 1 to null). I assume that this is what caused the exception, because the typechecker was not expecting any fields to be dropped like this -- but if so, the correct exception message would be something along the lines of "sum aggregation on field 2 after grouping by field 0 leaves field 1 undefined" (or similar). > Type checking fails with generics, even when concrete type of field is not > needed > - > > Key: FLINK-6114 > URL: https://issues.apache.org/jira/browse/FLINK-6114 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Luke Hutchison > > The Flink type checker does not allow generic types to be used in any field > of a tuple when a join is being executed, even if the generic is not in a > field that is involved in the join. > I have a type Tuple3 , which contains a generic type > parameter K. I am joining using .where(0).equalTo(0). The type of field 0 is > well-defined as String. However, this gives me the following error: > {noformat} > Exception in thread "main" > org.apache.flink.api.common.functions.InvalidTypesException: Type of > TypeVariable 'K' in 'public static org.apache.flink.api.java.DataSet > mypkg.MyClass.method(params)' could not be determined. This is most likely a > type erasure problem. The type extraction currently supports types with > generic variables only in cases where all variables in the return type can be > deduced from the input type(s). > at > org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:989) > {noformat} > The code compiles fine, however -- the static type system is able to > correctly resolve the types in the surrounding code. > Really only the fields that are affected by joins (or groupBy, aggregation > etc.) should be checked for concrete types in this way. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
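[Editorial sketch of the point above: a hedged way to make the fate of the unspecified field explicit is to replace {{groupBy(0).sum(2)}} with a {{reduceGroup}} that spells it out, here keeping the first field-1 value seen in each group. Types follow the Tuple3 shape discussed in this thread, with String standing in for K.]
{code}
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;

public class ExplicitAggregation {
    static DataSet<Tuple3<String, String, Float>> sumKeepFirst(DataSet<Tuple3<String, String, Float>> in) {
        return in.groupBy(0).reduceGroup(
                new GroupReduceFunction<Tuple3<String, String, Float>, Tuple3<String, String, Float>>() {
                    @Override
                    public void reduce(Iterable<Tuple3<String, String, Float>> group,
                            Collector<Tuple3<String, String, Float>> out) {
                        Tuple3<String, String, Float> first = null;
                        float sum = 0;
                        for (Tuple3<String, String, Float> t : group) {
                            if (first == null) {
                                first = t;
                            }
                            sum += t.f2;
                        }
                        // Field 1 is now explicitly the first value seen, not indeterminate
                        out.collect(new Tuple3<>(first.f0, first.f1, sum));
                    }
                });
    }
}
{code}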
[jira] [Comment Edited] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
[ https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15934071#comment-15934071 ] Luke Hutchison edited comment on FLINK-6115 at 3/21/17 4:15 AM: It's quite easy to end up with null values in tuples though -- it's currently entirely valid to store null values in tuples, they just can't be serialized -- and you can't always predict when tuples will be serialized -- e.g. (I suspect) code may work fine for a while in RAM on a single machine, but then when you scale your code up to run on a cluster, or even when code decides to spill to disk, suddenly it breaks. This is very poor behavior. Even worse though is that it's exceedingly hard to tell where the problem originated, as shown in the stack trace I posted. Not only is the location where the null value is set separated from the location where the problem is triggered on serialization, but the serialization trace doesn't tell you anything about where in the program the serializer was running, other than what operation type it was contained within. Another common scenario in which null values get set in tuples is doing an outer join. Basically, if the Flink policy is "we won't support nulls in tuples as valid, ever", then you should not be able to produce a tuple as a result of an outer join. More generally, you should simply throw an exception when the constructor of a tuple is called with a null parameter, so that the user is notified immediately of the invalid behavior, with the exception tied directly to where the null value was set. This would not be a perfect fix though, since the fields of a tuple are not final, so it is possible to simply set the field values to null directly. I don't see any of these as good solutions to this issue. Really the best thing to do is find a way to efficiently serialize null values in tuples. Why exactly is it slower to support serializing null values in tuples than it is for a POJO or a {{Row}} object? In theory, if you simply ran tuples through the POJO serializer, it should be able to serialize them fine, with the same efficiency that it can serialize regular POJOs (which are allowed to contain null values) -- so I don't see how or why this would incur a performance penalty. > Need more helpful error message when trying to serialize a tuple with a null > field > -- > > Key: FLINK-6115 > URL: https://issues.apache.org/jira/browse/FLINK-6115 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Luke Hutchison > > When Flink tries to serialize a tuple with a null field, you get the > following, which has no information about where in the program the problem > occurred (all the stack trace lines are in Flink, not in user code).
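The fail-fast constructor check suggested in the comment above would look something like this (a hypothetical sketch, not an existing Flink class; as the comment notes, direct writes to the public fields would still bypass it):
{code}
import org.apache.flink.api.java.tuple.Tuple2;

// Hypothetical fail-fast tuple: rejects nulls at construction time.
// Direct writes to the public f0/f1 fields still bypass this check.
public class NonNullTuple2<T0, T1> extends Tuple2<T0, T1> {
    public NonNullTuple2() {
    }

    public NonNullTuple2(T0 f0, T1 f1) {
        super(f0, f1);
        if (f0 == null || f1 == null) {
            throw new NullPointerException(
                    "Tuple field is null at construction: (" + f0 + ", " + f1 + ")");
        }
    }
}
{code}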
[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
[ https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15934071#comment-15934071 ] Luke Hutchison commented on FLINK-6115: --- It's quite easy to end up with null values in tuples though -- it's currently entirely valid to store null values in tuples, they just can't be serialized -- and you can't always predict when tuples will be serialized -- e.g. (I suspect) code may work fine for a while in RAM on a single machine, but then when you scale your code up to run on a cluster, or even when code decides to spill to disk, suddenly it breaks. This is very poor behavior. Even worse though is that it's exceedingly hard to tell where the problem originated, as shown in the stack trace I posted. Not only is the location where the null value is set separated from the location where the problem is triggered on serialization, but the serialization trace doesn't tell you anything about where in the program the serializer was running, other than what operation type it was contained within. Another common scenario in which null values get set in tuples is doing an outer join. Basically, if the Flink policy is "we won't support nulls in tuples as valid, ever", then you should not be able to produce a tuple as a result of an outer join. More generally, you should simply throw an exception when the constructor of a tuple is called with a null parameter, so that the user is notified immediately of the invalid behavior, with the exception tied directly to where the null value was set. This would not be a perfect fix though, since the fields of a tuple are not final, so it is possible to simply set the field values to null directly. I don't see any of these as good solutions to this issue. Really the best thing to do is find a way to efficiently serialize null values in tuples. Why exactly is it slower to support serializing null values in tuples than it is for a POJO or a {{Row}} object? In theory, if you simply ran tuples through the POJO serializer, it should be able to serialize them fine, with the same efficiency that it can serialize regular POJOs (which are allowed to contain null values) -- so I don't see how or why this would incur a performance penalty. > Need more helpful error message when trying to serialize a tuple with a null > field > -- > > Key: FLINK-6115 > URL: https://issues.apache.org/jira/browse/FLINK-6115 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Luke Hutchison > > When Flink tries to serialize a tuple with a null field, you get the > following, which has no information about where in the program the problem > occurred (all the stack trace lines are in Flink, not in user code). > {noformat} > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.IllegalArgumentException: The record must not be null. > at > org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:73) > at > org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:33) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30) > at > org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56) > at >
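For comparison, the POJO-style null handling the comment asks about amounts to writing a null mask ahead of the field values. A rough sketch of that idea for a two-field tuple (illustrative only; this is not Flink's actual TupleSerializer, and the class name is made up):
{code}
import java.io.IOException;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.StringValue;

public class NullMaskSketch {
    // Write a one-byte null mask, then only the non-null fields.
    static void serializeWithNullMask(Tuple2<String, String> t, DataOutputView out)
            throws IOException {
        int mask = (t.f0 == null ? 1 : 0) | (t.f1 == null ? 2 : 0);
        out.writeByte(mask);
        if (t.f0 != null) {
            StringValue.writeString(t.f0, out);
        }
        if (t.f1 != null) {
            StringValue.writeString(t.f1, out);
        }
    }
}
{code}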
[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
[ https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15932558#comment-15932558 ] Luke Hutchison commented on FLINK-6115: --- Hi Fabian, is there a fundamental reason why the Tuple serializers do not support null? If it is because of type inference, the compiler will give a warning or error if the type cannot be inferred from a null value. The Tuple class has a {{getFieldNotNull(int pos)}} method, with the description "Gets the field at the specified position, throws NullFieldException if the field is null" -- so it sounds like Tuples were actually designed to support storing nulls, just not serializing them? (But serializing them can happen implicitly as Flink passes things around...) > Need more helpful error message when trying to serialize a tuple with a null > field > -- > > Key: FLINK-6115 > URL: https://issues.apache.org/jira/browse/FLINK-6115 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Luke Hutchison > > When Flink tries to serialize a tuple with a null field, you get the > following, which has no information about where in the program the problem > occurred (all the stack trace lines are in Flink, not in user code). > {noformat} > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.IllegalArgumentException: The record must not be null. 
> at > org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:73) > at > org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:33) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30) > at > org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56) > at > org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:77) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:113) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88) > at > org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65) > at > org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) > at > org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79) > at > org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) > at > org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51) > at > org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:108) > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490) > at > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) > at java.lang.Thread.run(Thread.java:745) > {noformat} > The only thing I can tell from this is that it happened somewhere in a > flatMap (but I have dozens of them in my code). Surely there's a way to pull > out the source file name and line number from the program DAG node when > errors like this occur? -- This message was sent by
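The distinction between the two Tuple accessors mentioned in the comment is easy to demonstrate (a tiny sketch; the element types are arbitrary):
{code}
Tuple2<String, Integer> t = new Tuple2<>("a", null);
Integer lenient = t.getField(1);        // returns null without complaint
Integer strict = t.getFieldNotNull(1);  // throws NullFieldException
{code}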
[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
[ https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15932059#comment-15932059 ] Luke Hutchison commented on FLINK-6115: --- N.B. the tuple field in question was of type String[], so maybe it's only array types that cannot be null? I'm pretty sure I have successfully used null values for String and Integer tuple fields before. > Need more helpful error message when trying to serialize a tuple with a null > field > -- > > Key: FLINK-6115 > URL: https://issues.apache.org/jira/browse/FLINK-6115 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Luke Hutchison > > When Flink tries to serialize a tuple with a null field, you get the > following, which has no information about where in the program the problem > occurred (all the stack trace lines are in Flink, not in user code). > {noformat} > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.IllegalArgumentException: The record must not be null. 
> at > org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:73) > at > org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:33) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30) > at > org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56) > at > org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:77) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:113) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88) > at > org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65) > at > org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) > at > org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79) > at > org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) > at > org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51) > at > org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:108) > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490) > at > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) > at java.lang.Thread.run(Thread.java:745) > {noformat} > The only thing I can tell from this is that it happened somewhere in a > flatMap (but I have dozens of them in my code). Surely there's a way to pull > out the source file name and line number from the program DAG node when > errors like this occur? -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
[ https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15931600#comment-15931600 ] Luke Hutchison commented on FLINK-6115: --- More generally, why is it not possible to serialize tuples with null fields? It's possible to serialize POJOs with null fields, right? > Need more helpful error message when trying to serialize a tuple with a null > field > -- > > Key: FLINK-6115 > URL: https://issues.apache.org/jira/browse/FLINK-6115 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Luke Hutchison > > When Flink tries to serialize a tuple with a null field, you get the > following, which has no information about where in the program the problem > occurred (all the stack trace lines are in Flink, not in user code). > {noformat} > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.IllegalArgumentException: The record must not be null. 
> at > org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:73) > at > org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:33) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30) > at > org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56) > at > org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:77) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:113) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88) > at > org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65) > at > org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) > at > org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79) > at > org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) > at > org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51) > at > org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:108) > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490) > at > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) > at java.lang.Thread.run(Thread.java:745) > {noformat} > The only thing I can tell from this is that it happened somewhere in a > flatMap (but I have dozens of them in my code). Surely there's a way to pull > out the source file name and line number from the program DAG node when > errors like this occur? -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
Luke Hutchison created FLINK-6115: - Summary: Need more helpful error message when trying to serialize a tuple with a null field Key: FLINK-6115 URL: https://issues.apache.org/jira/browse/FLINK-6115 Project: Flink Issue Type: Bug Components: Core Affects Versions: 1.2.0 Reporter: Luke Hutchison When Flink tries to serialize a tuple with a null field, you get the following, which has no information about where in the program the problem occurred (all the stack trace lines are in Flink, not in user code). {noformat} Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.IllegalArgumentException: The record must not be null. at org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:73) at org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:33) at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124) at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30) at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56) at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:77) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:113) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88) at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65) at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79) at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51) at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:108) at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490) at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) at java.lang.Thread.run(Thread.java:745) {noformat} The only thing I can tell from this is that it happened somewhere in a flatMap (but I have dozens of them in my code). 
Surely there's a way to pull out the source file name and line number from the program DAG node when errors like this occur? -- This message was sent by Atlassian JIRA (v6.3.15#6346)
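Until the runtime can point back at user code, one partial mitigation is to label each operator with {{name()}}, which at least makes the failing operator identifiable in the web UI and task names (a sketch; {{lines}}, {{TokenizeFn}}, and the label text are placeholders):
{code}
// Named operators make "failed in some flatMap" diagnosable by eye:
DataSet<Tuple2<String, String[]>> tokens = lines
        .flatMap(new TokenizeFn())
        .name("tokenize (MyJob.java:42)");
{code}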
[jira] [Created] (FLINK-6114) Type checking fails with generics, even when concrete type of field is not needed
Luke Hutchison created FLINK-6114: - Summary: Type checking fails with generics, even when concrete type of field is not needed Key: FLINK-6114 URL: https://issues.apache.org/jira/browse/FLINK-6114 Project: Flink Issue Type: Bug Affects Versions: 1.2.0 Reporter: Luke Hutchison The Flink type checker does not allow generic types to be used in any field of a tuple when a join is being executed, even if the generic is not in a field that is involved in the join. I have a type Tuple3, which contains a generic type parameter K. I am joining using .where(0).equalTo(0). The type of field 0 is well-defined as String. However, this gives me the following error: {noformat} Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'K' in 'public static org.apache.flink.api.java.DataSet mypkg.MyClass.method(params)' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s). at org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:989) {noformat} The code compiles fine, however -- the static type system is able to correctly resolve the types in the surrounding code. Really only the fields that are affected by joins (or groupBy, aggregation etc.) should be checked for concrete types in this way. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
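One workaround that often unblocks this is to give the operator an explicit result-type hint at a call site where the type is concrete, so the extractor does not have to deduce the erased type variable (a sketch; the tuple instantiations are assumptions):
{code}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<String, Long>> left = env.fromElements(Tuple2.of("a", 1L));
DataSet<Tuple2<String, Integer>> right = env.fromElements(Tuple2.of("a", 7));
// The explicit hint spares the extractor from deducing the lambda's result type:
DataSet<Tuple2<String, Integer>> joined = left.join(right)
        .where(0).equalTo(0)
        .with((l, r) -> Tuple2.of(l.f0, r.f1))
        .returns(new TypeHint<Tuple2<String, Integer>>() {});
{code}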
[jira] [Commented] (FLINK-6110) Flink unnecessarily repeats shared work triggered by different blocking sinks, leading to massive inefficiency
[ https://issues.apache.org/jira/browse/FLINK-6110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15931214#comment-15931214 ] Luke Hutchison commented on FLINK-6110: --- Chesnay: Sorry, I couldn't find that dup, and yes, thanks, having a single mapPartition is more or less what I have ended up doing in these two cases. A more complex situation arises when a join is much harder to write than a simple Map#get() lookup, but the dataset won't fit in RAM as a Map. I would like to be able to look up values in one DataSet from within another DataSet's mapper function. Once FLINK-2250 is fixed/implemented, it should be possible to do something like
{code}
dataSet1.flatMap((xIter, out) -> {
    for (T x : xIter) {
        for (T y : dataSet2.filter(y2 -> y2.id == x.id).iterator()) {
            out.collect(new Tuple2<>(x.val, y.val));
        }
    }
});
{code}
except that performing a Map#get() using a filter is horribly inefficient. Flink's where() syntax exists only on joins. I assume that groupBy() builds an index, so maybe groupBy() could get a query-like where() syntax too, for doing fast indexed lookups within group keys? > Flink unnecessarily repeats shared work triggered by different blocking > sinks, leading to massive inefficiency > -- > > Key: FLINK-6110 > URL: https://issues.apache.org/jira/browse/FLINK-6110 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Luke Hutchison > > After a blocking sink (collect() or count()) is called, all already-computed > intermediate DataSets are thrown away, and any subsequent code that tries to > make use of an already-computed DataSet will require the DataSet to be > computed from scratch. For example, the following code prints the elements a, > b and c twice in succession, even though the DataSet ds should only have to > be computed once: > {code} > ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); > DataSet<String> ds = env.fromElements("a", "b", "c").map(s -> { > System.out.println(s); return s + s;}); > List<String> c1 = ds.map(s -> s).collect(); > List<String> c2 = ds.map(s -> s).collect(); > env.execute(); > {code} > This is problematic because not every operation is easy to express in Flink > using joins and filters -- sometimes for smaller datasets (such as marginal > counts) it's easier to collect the values into a HashMap, and then pass that > HashMap into subsequent operations so they can look up the values they need > directly. A more complex example is the need to sort a set of values, then > use the sorted array for subsequent binary search operations to identify rank > -- this is only really possible using an array of sorted values, as long as > that array fits easily in RAM (there's no way to do binary search as a join > type) -- so you have to drop out of the Flink pipeline using collect() to > produce the sorted binary search lookup array. > However, any collect() or count() operation causes immediate execution of the > Flink pipeline, which throws away *all* intermediate values that could be > reused for future executions. As a result, code can be extremely inefficient, > recomputing the same values over and over again unnecessarily. > I believe that intermediate values should not be released or garbage > collected until after env.execute() is called. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
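For the lookup-inside-a-mapper pattern, the mechanism that exists today is a broadcast set, read into a Map in {{open()}}; it only suits datasets small enough to replicate to every task. A sketch using the {{dataSet1}}/{{dataSet2}} names from the comment, with an assumed POJO {{Item}} (fields {{id}}, {{val}}) and an arbitrary broadcast name:
{code}
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// dataSet2 is replicated to each task and indexed once in open().
DataSet<Tuple2<String, String>> joined = dataSet1
        .flatMap(new RichFlatMapFunction<Item, Tuple2<String, String>>() {
            private Map<Integer, Item> byId;

            @Override
            public void open(Configuration parameters) {
                byId = new HashMap<>();
                for (Item y : getRuntimeContext().<Item>getBroadcastVariable("lookup")) {
                    byId.put(y.id, y);
                }
            }

            @Override
            public void flatMap(Item x, Collector<Tuple2<String, String>> out) {
                Item y = byId.get(x.id);
                if (y != null) {
                    out.collect(new Tuple2<>(x.val, y.val));
                }
            }
        })
        .withBroadcastSet(dataSet2, "lookup");
{code}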
[jira] [Updated] (FLINK-6110) Flink unnecessarily repeats shared work triggered by different blocking sinks, leading to massive inefficiency
[ https://issues.apache.org/jira/browse/FLINK-6110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Hutchison updated FLINK-6110: -- Description: After a blocking sink (collect() or count()) is called, all already-computed intermediate DataSets are thrown away, and any subsequent code that tries to make use of an already-computed DataSet will require the DataSet to be computed from scratch. For example, the following code prints the elements a, b and c twice in succession, even though the DataSet ds should only have to be computed once:
{code}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> ds = env.fromElements("a", "b", "c").map(s -> { System.out.println(s); return s + s; });
List<String> c1 = ds.map(s -> s).collect();
List<String> c2 = ds.map(s -> s).collect();
env.execute();
{code}
This is problematic because not every operation is easy to express in Flink using joins and filters -- sometimes for smaller datasets (such as marginal counts) it's easier to collect the values into a HashMap, and then pass that HashMap into subsequent operations so they can look up the values they need directly. A more complex example is the need to sort a set of values, then use the sorted array for subsequent binary search operations to identify rank -- this is only really possible using an array of sorted values, as long as that array fits easily in RAM (there's no way to do binary search as a join type) -- so you have to drop out of the Flink pipeline using collect() to produce the sorted binary search lookup array. However, any collect() or count() operation causes immediate execution of the Flink pipeline, which throws away *all* intermediate values that could be reused for future executions. As a result, code can be extremely inefficient, recomputing the same values over and over again unnecessarily. I believe that intermediate values should not be released or garbage collected until after env.execute() is called. > Flink unnecessarily repeats shared work triggered by different blocking > sinks, leading to massive inefficiency > -- > > Key: FLINK-6110 > URL: https://issues.apache.org/jira/browse/FLINK-6110 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Luke Hutchison > > After a blocking sink (collect() or count()) is called, all already-computed > intermediate DataSets are thrown away, and any subsequent code that tries to > make use of an already-computed DataSet will require the DataSet to be > computed from scratch. For example, the following code prints the elements a, > b and c twice in succession, even though the DataSet ds should only have to > be computed once: > {code} > ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); > DataSet<String> ds = env.fromElements("a",
[jira] [Updated] (FLINK-6110) Flink unnecessarily repeats shared work triggered by different blocking sinks, leading to massive inefficiency
[ https://issues.apache.org/jira/browse/FLINK-6110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Hutchison updated FLINK-6110: -- Description: After a blocking sink (collect() or count()) is called, all already-computed intermediate DataSets are thrown away, and any subsequent code that tries to make use of an already-computed DataSet will require the DataSet to be computed from scratch. For example, the following code prints the elements a, b and c twice in succession, even though the DataSet ds should only have to be computed once:
{code}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> ds = env.fromElements("a", "b", "c").map(s -> { System.out.println(s); return s + s; });
List<String> c1 = ds.map(s -> s).collect();
List<String> c2 = ds.map(s -> s).collect();
{code}
This is problematic because not every operation is easy to express in Flink using joins and filters -- sometimes for smaller datasets (such as marginal counts) it's easier to collect the values into a HashMap, and then pass that HashMap into subsequent operations so they can look up the values they need directly. A more complex example is the need to sort a set of values, then use the sorted array for subsequent binary search operations to identify rank -- this is only really possible using an array of sorted values, as long as that array fits easily in RAM (there's no way to do binary search as a join type) -- so you have to drop out of the Flink pipeline using collect() to produce the sorted binary search lookup array. However, any collect() or count() operation causes immediate execution of the Flink pipeline, which throws away *all* intermediate values that could be reused for future executions. As a result, code can be extremely inefficient, recomputing the same values over and over again unnecessarily. I believe that intermediate values should not be released or garbage collected until after env.execute() is called. > Flink unnecessarily repeats shared work triggered by different blocking > sinks, leading to massive inefficiency > -- > > Key: FLINK-6110 > URL: https://issues.apache.org/jira/browse/FLINK-6110 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Luke Hutchison > > After a blocking sink (collect() or count()) is called, all already-computed > intermediate DataSets are thrown away, and any subsequent code that tries to > make use of an already-computed DataSet will require the DataSet to be > computed from scratch. For example, the following code prints the elements a, > b and c twice in succession, even though the DataSet ds should only have to > be computed once: > {code} > ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); > DataSet<String> ds = env.fromElements("a", "b", "c").map(s -> { > System.out.println(s); return s + s;}); > List<String> c1 = ds.map(s -> s).collect(); > List<String> c2 = ds.map(s -> s).collect(); > {code} > This is problematic because
[jira] [Created] (FLINK-6110) Flink unnecessarily repeats shared work triggered by different blocking sinks, leading to massive inefficiency
Luke Hutchison created FLINK-6110: - Summary: Flink unnecessarily repeats shared work triggered by different blocking sinks, leading to massive inefficiency Key: FLINK-6110 URL: https://issues.apache.org/jira/browse/FLINK-6110 Project: Flink Issue Type: Bug Components: Core Affects Versions: 1.2.0 Reporter: Luke Hutchison After a blocking sink (collect() or count()) is called, all already-computed intermediate DataSets are thrown away, and any subsequent code that tries to make use of an already-computed DataSet will require the DataSet to be computed from scratch. For example, the following code prints the elements a, b and c twice in succession, even though the DataSet ds should only have to be computed once:
{code}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> ds = env.fromElements("a", "b", "c").map(s -> { System.out.println(s); return s + s; });
List<String> c1 = ds.map(s -> s).collect();
List<String> c2 = ds.map(s -> s).collect();
{code}
This is problematic because not every operation is easy to express in Flink using joins and filters -- sometimes for smaller datasets (such as marginal counts) it's easier to collect the values into a HashMap, and then pass that HashMap into subsequent operations so they can look up the values they need directly. A more complex example is the need to sort a set of values, then use the sorted array for subsequent binary search operations to identify rank -- this is a lot easier to do using an array of sorted values, as long as that array fits easily in RAM. However, any collect() or count() operation causes immediate execution of the Flink pipeline, which throws away *all* intermediate values that could be reused for future executions. As a result, code can be extremely inefficient, recomputing the same values over and over again unnecessarily. I believe that intermediate values should not be released or garbage collected until after env.execute() is called. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
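Worth noting for readers hitting this: with non-blocking sinks plus a single {{execute()}}, both sinks end up in one plan, so the shared upstream work runs once (a sketch; the output paths are placeholders):
{code}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> ds = env.fromElements("a", "b", "c")
        .map(s -> { System.out.println(s); return s + s; });
ds.writeAsText("/tmp/out1");
ds.writeAsText("/tmp/out2");
env.execute();  // one shared plan: "a", "b", "c" each printed once
{code}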
[jira] [Created] (FLINK-6108) Add sum(fieldName) to sum based on named POJO fields
Luke Hutchison created FLINK-6108: - Summary: Add sum(fieldName) to sum based on named POJO fields Key: FLINK-6108 URL: https://issues.apache.org/jira/browse/FLINK-6108 Project: Flink Issue Type: Bug Components: Core Affects Versions: 1.2.0 Reporter: Luke Hutchison Currently it is possible to do {code} dataSet.groupBy(0).sum(1) {code} but not {code} dataSet.groupBy("id").sum("count") {code} -- groupBy() takes named fields, but sum() does not; it only works with tuple field indices. It would be great if sum() could take named fields too -- otherwise you have to map to tuples to make use of easy summing. (This would probably only work reliably with POJOs, since a new object instance would need to be constructed to take the field containing the sum.) Similarly, min(), max(), etc. should also take named fields. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6108) Add sum(fieldName) to sum based on named POJO fields
[ https://issues.apache.org/jira/browse/FLINK-6108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Hutchison updated FLINK-6108: -- Description: Currently it is possible to do {code} dataSet.groupBy(0).sum(1) {code} but not {code} dataSet.groupBy("id").sum("count") {code} groupBy() takes named fields, but sum() does not; it only works with tuple field indices. It would be great if sum() could take named fields too -- otherwise you have to map to tuples to make use of easy summing. (This would probably only work reliably with POJOs, since a new object instance would need to be constructed to take the field containing the sum.) Similarly, min(), max(), etc. should also take named fields. > Add sum(fieldName) to sum based on named POJO fields > > > Key: FLINK-6108 > URL: https://issues.apache.org/jira/browse/FLINK-6108 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Luke Hutchison > > Currently it is possible to do > {code} > dataSet.groupBy(0).sum(1) > {code} > but not > {code} > dataSet.groupBy("id").sum("count") > {code} > groupBy() takes named fields, but sum() does not; it only works with tuple field indices. > It would be great if sum() could take named fields too -- otherwise you have > to map to tuples to make use of easy summing. > (This would probably only work reliably with POJOs, since a new object > instance would need to be constructed to take the field containing the sum.) > Similarly, min(), max(), etc. should also take named fields. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
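Until an overload like that exists, the usual named-field workaround is a {{reduce()}} on the grouped POJO (a sketch; the {{WordCount}} POJO and its {{id}}/{{count}} fields are assumptions):
{code}
// groupBy on a named field works today; the summing is done in a reduce.
DataSet<WordCount> summed = wordCounts
        .groupBy("id")
        .reduce((a, b) -> new WordCount(a.id, a.count + b.count));
{code}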
[jira] [Updated] (FLINK-6070) Suggestion: add ComparableTuple types
[ https://issues.apache.org/jira/browse/FLINK-6070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Hutchison updated FLINK-6070: -- Description: Since Java doesn't have built-in tuple types, I find myself using Flink tuples for a lot of tasks in Flink programs. One downside is that these tuples are not inherently comparable, so when you want to sort a collection of tuples, you have to provide a custom comparator. I created a tuple sorting class, as follows. (Only the methods for Tuple2 are defined at the bottom; similar methods could be added for other tuple types.) I wanted to get feedback on whether something like this would be considered useful for Flink before I submitted a PR.
{code}
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;

/** A class for sorting collections of tuples. */
public class TupleSorter {
    /** Produce a Tuple comparator for the given number of fields, with the requested field priority and sort order. */
    private static Comparator<Tuple> newComparator(final int tupleLen, final int[] fieldPriority,
            final int[] sortDescendingIndices) {
        if (fieldPriority == null || fieldPriority.length != tupleLen) {
            throw new IllegalArgumentException("Invalid sort order");
        }
        boolean[] idxUsed = new boolean[tupleLen];
        for (int i = 0; i < fieldPriority.length; i++) {
            int idx = fieldPriority[i];
            if (idx < 0 || idx >= tupleLen) {
                throw new IllegalArgumentException("fieldPriority entry out of range: " + idx);
            }
            if (idxUsed[idx]) {
                throw new IllegalArgumentException("fieldPriority entry duplicated: " + idx);
            }
            idxUsed[idx] = true;
        }
        boolean[] sortDescending = new boolean[tupleLen];
        for (int i = 0; i < sortDescendingIndices.length; i++) {
            int idx = sortDescendingIndices[i];
            if (idx < 0 || idx >= tupleLen) {
                throw new IllegalArgumentException("sortDescendingIndices entry out of range: " + idx);
            }
            sortDescending[idx] = true;
        }
        return (tuple0, tuple1) -> {
            for (int i = 0; i < tupleLen; i++) {
                int idx = fieldPriority[i];
                @SuppressWarnings({ "rawtypes", "unchecked" })
                int diff = ((Comparable) tuple0.getField(idx)).compareTo((Comparable) tuple1.getField(idx));
                // The descending flag is indexed by field index, per the javadoc.
                if (sortDescending[idx]) {
                    diff = -diff;
                }
                if (diff != 0) {
                    return diff;
                }
            }
            return 0;
        };
    }

    /**
     * Sort a list of tuples in place.
     *
     * @param list
     *            The list of tuples.
     * @param fieldPriority
     *            The sort priority for the fields (primary and secondary sort key): a permutation of the field
     *            indices 0 and 1. The default sort order within a field is ascending.
     * @param sortDescendingIndices
     *            If provided, inverts the sort order for a given field index from ascending to descending order.
     */
    public static <T0 extends Comparable<T0>, T1 extends Comparable<T1>> void sort(
            final List<Tuple2<T0, T1>> list, final int[] fieldPriority, final int... sortDescendingIndices) {
        list.sort(newComparator(/* tupleLen = */ 2, fieldPriority, sortDescendingIndices));
    }

    /**
     * Produce a sorted copy of a collection of tuples.
     *
     * @param collection
     *            The collection of tuples.
     * @param fieldPriority
     *            The sort priority for the fields (primary and secondary sort key): a permutation of the field
     *            indices 0 and 1. The default sort order within a field is ascending.
     * @param sortDescendingIndices
     *            If provided, inverts the sort order for a given field index from ascending to descending order.
     */
    public static <T0 extends Comparable<T0>, T1 extends Comparable<T1>> ArrayList<Tuple2<T0, T1>> sortCopy(
            final Collection<Tuple2<T0, T1>> collection, final int[] fieldPriority,
            final int... sortDescendingIndices) {
        ArrayList<Tuple2<T0, T1>> list = new ArrayList<>(collection);
        Collections.sort(list, newComparator(/* tupleLen = */ 2, fieldPriority, sortDescendingIndices));
        return list;
    }
}
{code}
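For what it's worth, usage of the proposed class would look like this (an illustration, not part of the proposal): sort by field 1 descending, breaking ties by field 0 ascending:
{code}
List<Tuple2<String, Integer>> counts = new ArrayList<>(Arrays.asList(
        Tuple2.of("a", 2), Tuple2.of("b", 5), Tuple2.of("c", 2)));
TupleSorter.sort(counts, new int[] { 1, 0 }, 1);
// counts is now [(b,5), (a,2), (c,2)]
{code}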
[jira] [Created] (FLINK-6070) Suggestion: add ComparableTuple types
Luke Hutchison created FLINK-6070: - Summary: Suggestion: add ComparableTuple types Key: FLINK-6070 URL: https://issues.apache.org/jira/browse/FLINK-6070 Project: Flink Issue Type: Improvement Components: Core Affects Versions: 1.2.0 Reporter: Luke Hutchison Priority: Minor Since Java doesn't have built-in tuple types, I find myself using Flink tuples for a lot of tasks in Flink programs. One downside is that these tuples are not inherently comparable, so when you want to sort a collection of tuples, you have to provide a custom comparator. I created a ComparableTuple2 type, as follows. I wanted to get feedback on whether something like this would be considered useful for Flink before I submitted a PR. Also, I don't know how high I should go with the field arity for a ComparableTuple -- presumably not as high as for non-comparable tuples?
{code}
import org.apache.flink.api.java.tuple.Tuple2;

/** A comparable tuple, consisting of comparable fields that act as primary and secondary sort keys. */
public class ComparableTuple2<T0 extends Comparable<T0>, T1 extends Comparable<T1>> extends Tuple2<T0, T1>
        implements Comparable<Tuple2<T0, T1>> {
    private static final long serialVersionUID = 1L;

    private boolean invertSortOrder0;
    private boolean invertSortOrder1;

    public ComparableTuple2() {
    }

    /**
     * Create a 2-tuple of comparable elements.
     *
     * @param f0
     *            The first element, which is also the primary sort key, and sorts in ascending order.
     * @param f1
     *            The second element, which is also the secondary sort key, and sorts in ascending order.
     */
    public ComparableTuple2(T0 f0, T1 f1) {
        super(f0, f1);
    }

    /**
     * Create a comparable 2-tuple out of comparable elements.
     *
     * @param f0
     *            The first element, which is also the primary sort key, and sorts in ascending order if
     *            invertSortOrder0 == false, else sorts in descending order.
     * @param f1
     *            The second element, which is also the secondary sort key, and sorts in ascending order if
     *            invertSortOrder1 == false, else sorts in descending order.
     * @param invertSortOrder0
     *            If true, invert the sort order for the first field (i.e. sort in descending order).
     * @param invertSortOrder1
     *            If true, invert the sort order for the second field (i.e. sort in descending order).
     */
    public ComparableTuple2(final T0 f0, final T1 f1, final boolean invertSortOrder0,
            final boolean invertSortOrder1) {
        super(f0, f1);
        this.invertSortOrder0 = invertSortOrder0;
        this.invertSortOrder1 = invertSortOrder1;
    }

    /**
     * Comparison function that compares first the primary sort key, f0, and then, if equal, compares the
     * secondary sort key, f1.
     */
    @Override
    public int compareTo(final Tuple2<T0, T1> o) {
        int diff = this.f0.compareTo(o.f0);
        if (invertSortOrder0) {
            diff = -diff;
        }
        if (diff != 0) {
            return diff;
        }
        diff = this.f1.compareTo(o.f1);
        if (invertSortOrder1) {
            diff = -diff;
        }
        return diff;
    }
}
{code}
-- This message was sent by Atlassian JIRA (v6.3.15#6346)
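And usage of the proposed ComparableTuple2 (again an illustration only): f0 ascending as primary key, f1 descending as secondary key:
{code}
List<ComparableTuple2<String, Integer>> list = new ArrayList<>(Arrays.asList(
        new ComparableTuple2<>("b", 1, false, true),
        new ComparableTuple2<>("a", 2, false, true),
        new ComparableTuple2<>("a", 7, false, true)));
Collections.sort(list);
// list is now [(a,7), (a,2), (b,1)]
{code}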
[jira] [Closed] (FLINK-6026) Return type of flatMap with lambda function not correctly resolved
[ https://issues.apache.org/jira/browse/FLINK-6026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Hutchison closed FLINK-6026. - Resolution: Not A Bug > Return type of flatMap with lambda function not correctly resolved > -- > > Key: FLINK-6026 > URL: https://issues.apache.org/jira/browse/FLINK-6026 > Project: Flink > Issue Type: Bug > Components: Core, DataSet API, DataStream API >Affects Versions: 1.2.0 >Reporter: Luke Hutchison >Priority: Minor > > I get an error if I try naming a flatMap operation: > {code} > DataSet> y = x.flatMap((t, out) -> > out.collect(t)).name("op"); > {code} > Type mismatch: cannot convert from > FlatMapOperator ,Object> to > DataSet > > If I try to do it as two steps, I get the error that DataSet does not have a > .name(String) method: > {code} > DataSet > y = x.flatMap((t, out) -> out.collect(t)); > y.name("op"); > {code} > If I use Eclipse type inference on x, it shows me that the output type is not > correctly inferred: > {code} > FlatMapOperator , Object> y = x.flatMap((t, out) -> > out.collect(t)); > y.name("op"); // This now works, but "Object" is not the output type > {code} > However, these steps still cannot be chained -- the following still gives an > error: > {code} > FlatMapOperator , Object> y = x.flatMap((t, out) -> > out.collect(t)).name("op"); > {code} > i.e. first you have to assign the result to a field, so that the type is > fully specified; then you can name the operation. > And the weird thing is that you can give the correct, more specific type for > the local variable, without a type narrowing error: > {code} > FlatMapOperator , Tuple2 > y = > x.flatMap((t, out) -> out.collect(t)); > y.name("op"); // This works, although chaining these two lines still does > not work > {code} > If the types of the lambda args are specified, then everything works: > {code} > DataSet > y = x.flatMap((Tuple2 t, > Collector > out) -> out.collect(t)).name("op"); > {code} > So, at least two things are going on here: > (1) type inference is not working correctly for the lambda parameters > (2) this breaks type inference for intermediate expressions, unless the type > can be resolved using a local variable definition > Is this a bug in the type signature of flatMap? (Or a compiler bug or > limitation, or a fundamental limitation of Java 8 type inference?) > It seems odd that the type of a local variable definition can make the result > of the flatMap operator *more* specific, taking the type from > {code} > FlatMapOperator , Object> > {code} > to > {code} > FlatMapOperator , Tuple2 > > {code} > i.e. if the output type is provided in the local variable definition, it is > properly unified with the type of the parameter t of collect(t), however that > type is not propagated out of that call. > Can anything be done about this in Flink? I have hit this problem a few times. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6026) Return type of flatMap with lambda function not correctly resolved
[ https://issues.apache.org/jira/browse/FLINK-6026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927065#comment-15927065 ] Luke Hutchison commented on FLINK-6026: --- Makes sense. I was wondering if there was some sort of type signature tweak that could be performed to make this work. I guess not -- thanks anyway! > Return type of flatMap with lambda function not correctly resolved > -- > > Key: FLINK-6026 > URL: https://issues.apache.org/jira/browse/FLINK-6026 > Project: Flink > Issue Type: Bug > Components: Core, DataSet API, DataStream API >Affects Versions: 1.2.0 >Reporter: Luke Hutchison >Priority: Minor > > I get an error if I try naming a flatMap operation: > {code} > DataSet> y = x.flatMap((t, out) -> > out.collect(t)).name("op"); > {code} > Type mismatch: cannot convert from > FlatMapOperator ,Object> to > DataSet > > If I try to do it as two steps, I get the error that DataSet does not have a > .name(String) method: > {code} > DataSet > y = x.flatMap((t, out) -> out.collect(t)); > y.name("op"); > {code} > If I use Eclipse type inference on x, it shows me that the output type is not > correctly inferred: > {code} > FlatMapOperator , Object> y = x.flatMap((t, out) -> > out.collect(t)); > y.name("op"); // This now works, but "Object" is not the output type > {code} > However, these steps still cannot be chained -- the following still gives an > error: > {code} > FlatMapOperator , Object> y = x.flatMap((t, out) -> > out.collect(t)).name("op"); > {code} > i.e. first you have to assign the result to a field, so that the type is > fully specified; then you can name the operation. > And the weird thing is that you can give the correct, more specific type for > the local variable, without a type narrowing error: > {code} > FlatMapOperator , Tuple2 > y = > x.flatMap((t, out) -> out.collect(t)); > y.name("op"); // This works, although chaining these two lines still does > not work > {code} > If the types of the lambda args are specified, then everything works: > {code} > DataSet > y = x.flatMap((Tuple2 t, > Collector > out) -> out.collect(t)).name("op"); > {code} > So, at least two things are going on here: > (1) type inference is not working correctly for the lambda parameters > (2) this breaks type inference for intermediate expressions, unless the type > can be resolved using a local variable definition > Is this a bug in the type signature of flatMap? (Or a compiler bug or > limitation, or a fundamental limitation of Java 8 type inference?) > It seems odd that the type of a local variable definition can make the result > of the flatMap operator *more* specific, taking the type from > {code} > FlatMapOperator , Object> > {code} > to > {code} > FlatMapOperator , Tuple2 > > {code} > i.e. if the output type is provided in the local variable definition, it is > properly unified with the type of the parameter t of collect(t), however that > type is not propagated out of that call. > Can anything be done about this in Flink? I have hit this problem a few times. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Closed] (FLINK-6057) Better default needed for num network buffers
[ https://issues.apache.org/jira/browse/FLINK-6057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Hutchison closed FLINK-6057. - Resolution: Duplicate > Better default needed for num network buffers > - > > Key: FLINK-6057 > URL: https://issues.apache.org/jira/browse/FLINK-6057 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Luke Hutchison > > Using the default environment, > {code} > ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); > {code} > my code will sometimes fail with an error that Flink ran out of network > buffers. To fix this, I have to do: > {code} > int numTasks = Runtime.getRuntime().availableProcessors(); > config.setInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, numTasks); > config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numTasks); > config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, > numTasks * 2048); > {code} > The default value of 2048 fails when I increase the degree of parallelism for > a large Flink pipeline (hence the fix to set the number of buffers to > numTasks * 2048). > This is particularly problematic because a pipeline can work fine on one > machine, and when you start the pipeline on a machine with more cores, it can > fail. > The default execution environment should pick a saner default based on the > level of parallelism (or whatever is needed to ensure that the number of > network buffers is not going to be exceeded for a given execution > environment). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6057) Better default needed for num network buffers
[ https://issues.apache.org/jira/browse/FLINK-6057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926090#comment-15926090 ] Luke Hutchison commented on FLINK-6057: --- Yes, thanks, sorry for the dup. > Better default needed for num network buffers > - > > Key: FLINK-6057 > URL: https://issues.apache.org/jira/browse/FLINK-6057 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Luke Hutchison > > Using the default environment, > {code} > ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); > {code} > my code will sometimes fail with an error that Flink ran out of network > buffers. To fix this, I have to do: > {code} > int numTasks = Runtime.getRuntime().availableProcessors(); > config.setInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, numTasks); > config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numTasks); > config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, > numTasks * 2048); > {code} > The default value of 2048 fails when I increase the degree of parallelism for > a large Flink pipeline (hence the fix to set the number of buffers to > numTasks * 2048). > This is particularly problematic because a pipeline can work fine on one > machine, and when you start the pipeline on a machine with more cores, it can > fail. > The default execution environment should pick a saner default based on the > level of parallelism (or whatever is needed to ensure that the number of > network buffers is not going to be exceeded for a given execution > environment). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6057) Better default needed for num network buffers
Luke Hutchison created FLINK-6057: - Summary: Better default needed for num network buffers Key: FLINK-6057 URL: https://issues.apache.org/jira/browse/FLINK-6057 Project: Flink Issue Type: Bug Components: Core Affects Versions: 1.2.0 Reporter: Luke Hutchison Using the default environment, {code} ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); {code} my code will sometimes fail with an error that Flink ran out of network buffers. To fix this, I have to do: {code} int numTasks = Runtime.getRuntime().availableProcessors(); config.setInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, numTasks); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numTasks); config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, numTasks * 2048); {code} The default value of 2048 fails when I increase the degree of parallelism for a large Flink pipeline (hence the fix to set the number of buffers to numTasks * 2048). This is particularly problematic because a pipeline can work fine on one machine, and when you start the pipeline on a machine with more cores, it can fail. The default execution environment should pick a saner default based on the level of parallelism (or whatever is needed to ensure that the number of network buffers is not going to be exceeded for a given execution environment). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
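For illustration, a sketch (not from this issue) that packages the workaround above into a helper for building a local environment; the numTasks * 2048 multiplier is the reporter's empirical value, not official guidance:

{code}
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;

public class LocalEnvFactory {
    /** Build a local environment whose network buffer pool scales with the core count. */
    public static ExecutionEnvironment create() {
        int numTasks = Runtime.getRuntime().availableProcessors();
        Configuration config = new Configuration();
        config.setInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, numTasks);
        config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numTasks);
        // More slots means more channels, so scale the buffer count with the slot count
        config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, numTasks * 2048);
        return ExecutionEnvironment.createLocalEnvironment(config);
    }
}
{code}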
[jira] [Comment Edited] (FLINK-6035) Sinks need optional name parameter: .collect(name), .count(name)
[ https://issues.apache.org/jira/browse/FLINK-6035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923849#comment-15923849 ] Luke Hutchison edited comment on FLINK-6035 at 3/14/17 9:19 AM: The name parameter shouldn't be required, just as calling .name() on an operator is not required. Therefore it makes perfect sense to add a new method that sets the name and then calls the regular nameless sink method (i.e. this doesn't need to break the API). I'll change the name of this bug -- thanks! was (Author: lukehutch): The name parameter shouldn't be required, just as calling .name() on an operator is not required. Therefore it makes perfect sense to add a new method that sets the name and then calls the regular nameless sink method (i.e. this doesn't need to wait for Flink 2.0.0). I'll change the name of this bug -- thanks! > Sinks need optional name parameter: .collect(name), .count(name) > > > Key: FLINK-6035 > URL: https://issues.apache.org/jira/browse/FLINK-6035 > Project: Flink > Issue Type: Bug > Components: DataSet API >Affects Versions: 1.2.0 >Reporter: Luke Hutchison >Priority: Trivial > > If I call > {code} > dataSet.groupBy(0).name("group by key") > {code} > I get an error message like > {noformat} > The method name(String) is undefined for the type > UnsortedGrouping> > {noformat} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
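For illustration, a sketch (not from this issue) of what a user-land named collect could look like today, adapted from what DataSet.collect() does internally in the Flink 1.2 sources; Utils.CollectHelper and SerializedListAccumulator are Flink-internal classes, so treat this as a proof of concept rather than a supported API:

{code}
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.Utils;
import org.apache.flink.util.AbstractID;

public class NamedCollect {
    /** Like dataSet.collect(), but labels the data sink with the given name instead of "collect()". */
    public static <T> List<T> collect(DataSet<T> dataSet, String name) throws Exception {
        final String id = new AbstractID().toString();
        final TypeSerializer<T> serializer =
                dataSet.getType().createSerializer(dataSet.getExecutionEnvironment().getConfig());
        // Same accumulator-backed sink that collect() sets up internally, with a caller-supplied name
        dataSet.output(new Utils.CollectHelper<>(id, serializer)).name(name);
        JobExecutionResult res = dataSet.getExecutionEnvironment().execute();
        ArrayList<byte[]> accResult = res.getAccumulatorResult(id);
        return SerializedListAccumulator.deserializeList(accResult, serializer);
    }
}
{code}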
[jira] [Updated] (FLINK-6035) Sinks need optional name parameter: .collect(name), .count(name)
[ https://issues.apache.org/jira/browse/FLINK-6035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Hutchison updated FLINK-6035: -- Summary: Sinks need optional name parameter: .collect(name), .count(name) (was: The method name(String) is undefined for the type UnsortedGrouping) > Sinks need optional name parameter: .collect(name), .count(name) > > > Key: FLINK-6035 > URL: https://issues.apache.org/jira/browse/FLINK-6035 > Project: Flink > Issue Type: Bug > Components: DataSet API >Affects Versions: 1.2.0 >Reporter: Luke Hutchison >Priority: Trivial > > If I call > {code} > dataSet.groupBy(0).name("group by key") > {code} > I get an error message like > {noformat} > The method name(String) is undefined for the type > UnsortedGrouping> > {noformat} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6035) The method name(String) is undefined for the type UnsortedGrouping
[ https://issues.apache.org/jira/browse/FLINK-6035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923849#comment-15923849 ] Luke Hutchison commented on FLINK-6035: --- The name parameter shouldn't be required, just as calling .name() on an operator is not required. Therefore it makes perfect sense to add a new method that sets the name and then calls the regular nameless sink method (i.e. this doesn't need to wait for Flink 2.0.0). I'll change the name of this bug -- thanks! > The method name(String) is undefined for the type UnsortedGrouping > -- > > Key: FLINK-6035 > URL: https://issues.apache.org/jira/browse/FLINK-6035 > Project: Flink > Issue Type: Bug > Components: DataSet API >Affects Versions: 1.2.0 >Reporter: Luke Hutchison >Priority: Trivial > > If I call > {code} > dataSet.groupBy(0).name("group by key") > {code} > I get an error message like > {noformat} > The method name(String) is undefined for the type > UnsortedGrouping> > {noformat} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6035) The method name(String) is undefined for the type UnsortedGrouping
[ https://issues.apache.org/jira/browse/FLINK-6035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923789#comment-15923789 ] Luke Hutchison commented on FLINK-6035: --- I see, makes sense. I was going through my program and naming everything, and noticed that groupBy seemed to be one of the only things that couldn't be named. I didn't realize those were fused in the graph display of the pipeline. Actually, one other thing I noticed was that collect() can't be named too, due to the return type. It's probably not a time-consuming operation, since it creates a complete instantiation of a DataSet in RAM, but maybe support a {code}.collect(String name){code} method? (I could open a separate bug for that, or rename this one, if you agree -- otherwise, feel free to close this bug.) > The method name(String) is undefined for the type UnsortedGrouping > -- > > Key: FLINK-6035 > URL: https://issues.apache.org/jira/browse/FLINK-6035 > Project: Flink > Issue Type: Bug > Components: DataSet API >Affects Versions: 1.2.0 >Reporter: Luke Hutchison >Priority: Trivial > > If I call > {code} > dataSet.groupBy(0).name("group by key") > {code} > I get an error message like > {noformat} > The method name(String) is undefined for the type > UnsortedGrouping> > {noformat} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6035) The method name(String) is undefined for the type UnsortedGrouping
Luke Hutchison created FLINK-6035: - Summary: The method name(String) is undefined for the type UnsortedGrouping Key: FLINK-6035 URL: https://issues.apache.org/jira/browse/FLINK-6035 Project: Flink Issue Type: Bug Components: DataSet API Affects Versions: 1.2.0 Reporter: Luke Hutchison Priority: Trivial If I call {code} dataSet.groupBy(0).name("group by key") {code} I get an error message like {noformat} The method name(String) is undefined for the type UnsortedGrouping> {noformat} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6026) Return type of flatMap with lambda function not correctly resolved
[ https://issues.apache.org/jira/browse/FLINK-6026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15922732#comment-15922732 ] Luke Hutchison commented on FLINK-6026: --- Makes sense, I wondered if that may be the issue here. I guess the real question is why the type system is not able to pass information from the local variable definition across two chained calls, when it can unify the types between the single call and its nested lambda. The type info is still present in the final local variable type declaration, even in the chained case. I guess the way to fix this is to file a feature request for The Java and/or Eclipse compiler to increase their maximum type propagation depth? > Return type of flatMap with lambda function not correctly resolved > -- > > Key: FLINK-6026 > URL: https://issues.apache.org/jira/browse/FLINK-6026 > Project: Flink > Issue Type: Bug > Components: Core, DataSet API, DataStream API >Affects Versions: 1.2.0 >Reporter: Luke Hutchison >Priority: Minor > > I get an error if I try naming a flatMap operation: > {code} > DataSet> y = x.flatMap((t, out) -> > out.collect(t)).name("op"); > {code} > Type mismatch: cannot convert from > FlatMapOperator ,Object> to > DataSet > > If I try to do it as two steps, I get the error that DataSet does not have a > .name(String) method: > {code} > DataSet > y = x.flatMap((t, out) -> out.collect(t)); > y.name("op"); > {code} > If I use Eclipse type inference on x, it shows me that the output type is not > correctly inferred: > {code} > FlatMapOperator , Object> y = x.flatMap((t, out) -> > out.collect(t)); > y.name("op"); // This now works, but "Object" is not the output type > {code} > However, these steps still cannot be chained -- the following still gives an > error: > {code} > FlatMapOperator , Object> y = x.flatMap((t, out) -> > out.collect(t)).name("op"); > {code} > i.e. first you have to assign the result to a field, so that the type is > fully specified; then you can name the operation. > And the weird thing is that you can give the correct, more specific type for > the local variable, without a type narrowing error: > {code} > FlatMapOperator , Tuple2 > y = > x.flatMap((t, out) -> out.collect(t)); > y.name("op"); // This works, although chaining these two lines still does > not work > {code} > If the types of the lambda args are specified, then everything works: > {code} > DataSet > y = x.flatMap((Tuple2 t, > Collector > out) -> out.collect(t)).name("op"); > {code} > So, at least two things are going on here: > (1) type inference is not working correctly for the lambda parameters > (2) this breaks type inference for intermediate expressions, unless the type > can be resolved using a local variable definition > Is this a bug in the type signature of flatMap? (Or a compiler bug or > limitation, or a fundamental limitation of Java 8 type inference?) > It seems odd that the type of a local variable definition can make the result > of the flatMap operator *more* specific, taking the type from > {code} > FlatMapOperator , Object> > {code} > to > {code} > FlatMapOperator , Tuple2 > > {code} > i.e. if the output type is provided in the local variable definition, it is > properly unified with the type of the parameter t of collect(t), however that > type is not propagated out of that call. > Can anything be done about this in Flink? I have hit this problem a few times. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6024) Need more fine-grained info for "InvalidProgramException: This type (...) cannot be used as key"
[ https://issues.apache.org/jira/browse/FLINK-6024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15922699#comment-15922699 ] Luke Hutchison commented on FLINK-6024: --- This was for the batch API (DataSet). More generally, why should the requirements be different here for batch and streaming APIs? I have come across other differences too between the APIs that do not seem to be related to limitations of one of the processing modes. Shouldn't more be shared between the two APIs? > Need more fine-grained info for "InvalidProgramException: This type (...) > cannot be used as key" > > > Key: FLINK-6024 > URL: https://issues.apache.org/jira/browse/FLINK-6024 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.2.0 >Reporter: Luke Hutchison > > I got this very confusing exception: > {noformat} > InvalidProgramException: This type (MyType) cannot be used as key > {noformat} > I dug through the code, and could not find what was causing this. The help > text for type.isKeyType(), in Keys.java:329, right before the exception is > thrown, says: "Checks whether this type can be used as a key. As a bare > minimum, types have to be hashable and comparable to be keys." However, this > didn't solve the problem. > I discovered that in my case, the error was occurring because I added a new > constructor to the type, and I didn't have a default constructor. This is > probably quite a common thing to happen for POJOs, so the error message > should give some detail saying that this is the problem. > Other things that can cause this to fail, including that the class is not > public, or the constructor is not public, or the key field is not public, or > that the key field is not a serializable type, or the key is not Comparable, > or the key is not hashable, should be given in the error message instead, > depending on the actual cause of the problem. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
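For illustration, a minimal sketch (not from this issue) of a POJO that avoids the failure mode described above: the class is public, the key field is a public, Comparable, hashable type, and -- the pitfall reported here -- a public no-arg constructor is kept alongside the convenience constructor:

{code}
public class MyType {
    // Key field: public, and String is Comparable and hashable
    public String key;
    public int value;

    // Required by Flink's POJO analysis; omitting this (e.g. after adding the
    // constructor below) is what triggers "This type (MyType) cannot be used as key"
    public MyType() {
    }

    public MyType(String key, int value) {
        this.key = key;
        this.value = value;
    }
}
{code}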
[jira] [Updated] (FLINK-6029) Strange linebreaks in web dashboard graph view make it hard to read text
[ https://issues.apache.org/jira/browse/FLINK-6029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Hutchison updated FLINK-6029: -- Attachment: Screenshot_2017-03-13_01-40-52.png Screenshot > Strange linebreaks in web dashboard graph view make it hard to read text > > > Key: FLINK-6029 > URL: https://issues.apache.org/jira/browse/FLINK-6029 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Affects Versions: 1.2.0 >Reporter: Luke Hutchison >Priority: Minor > Attachments: Screenshot_2017-03-13_01-40-52.png > > > Text lines inside the boxes in the Flink web dashboard have linebreaks at > very odd places, and it can make the content of the boxes hard to read. (See > attached screenshot.) > For clarity the content could be tabulated and indented, e.g with each "->" > arrow at the start of a new text line along with the operation type ("-> > Filter"), and the description indented on the line below, with long > descriptions wrapped to the same start indentation, so that it's easy to > visually separate out operations from descriptions based on indentation: > {noformat} > Filter > (filter for problems with date <= 2017-01) > -> FlatMap > (calculate problem severity scores for > date 2017-01) > -> Combine > (sum all severity scores for (bin, apt) for > date 2017-01) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6029) Strange linebreaks in web dashboard graph view make it hard to read text
Luke Hutchison created FLINK-6029: - Summary: Strange linebreaks in web dashboard graph view make it hard to read text Key: FLINK-6029 URL: https://issues.apache.org/jira/browse/FLINK-6029 Project: Flink Issue Type: Bug Components: Webfrontend Affects Versions: 1.2.0 Reporter: Luke Hutchison Priority: Minor Text lines inside the boxes in the Flink web dashboard have linebreaks at very odd places, and it can make the content of the boxes hard to read. (See attached screenshot.) For clarity the content could be tabulated and indented, e.g with each "->" arrow at the start of a new text line along with the operation type ("-> Filter"), and the description indented on the line below, with long descriptions wrapped to the same start indentation, so that it's easy to visually separate out operations from descriptions based on indentation: {noformat} Filter (filter for problems with date <= 2017-01) -> FlatMap (calculate problem severity scores for date 2017-01) -> Combine (sum all severity scores for (bin, apt) for date 2017-01) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6029) Strange linebreaks in web dashboard graph view make it hard to read text
[ https://issues.apache.org/jira/browse/FLINK-6029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Hutchison updated FLINK-6029: -- Description: Text lines inside the boxes in the Flink web dashboard have linebreaks at very odd places, and it can make the content of the boxes hard to read. (See attached screenshot.) For clarity the content could be tabulated and indented, e.g with each "->" arrow at the start of a new text line along with the operation type ("-> Filter"), and the description indented on the line below, with long descriptions wrapped to the same start indentation, so that it's easy to visually separate out operations from descriptions based on indentation: {noformat} Filter (filter for problems with date <= 2017-01) -> FlatMap (calculate problem severity scores for date 2017-01) -> Combine (sum all severity scores for (bin, apt) for date 2017-01) {noformat} was: Text lines inside the boxes in the Flink web dashboard have linebreaks at very odd places, and it can make the content of the boxes hard to read. (See attached screenshot.) For clarity the content could be tabulated and indented, e.g with each "->" arrow at the start of a new text line along with the operation type ("-> Filter"), and the description indented on the line below, with long descriptions wrapped to the same start indentation, so that it's easy to visually separate out operations from descriptions based on indentation: {noformat} Filter (filter for problems with date <= 2017-01) -> FlatMap (calculate problem severity scores for date 2017-01) -> Combine (sum all severity scores for (bin, apt) for date 2017-01) > Strange linebreaks in web dashboard graph view make it hard to read text > > > Key: FLINK-6029 > URL: https://issues.apache.org/jira/browse/FLINK-6029 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Affects Versions: 1.2.0 >Reporter: Luke Hutchison >Priority: Minor > > Text lines inside the boxes in the Flink web dashboard have linebreaks at > very odd places, and it can make the content of the boxes hard to read. (See > attached screenshot.) > For clarity the content could be tabulated and indented, e.g with each "->" > arrow at the start of a new text line along with the operation type ("-> > Filter"), and the description indented on the line below, with long > descriptions wrapped to the same start indentation, so that it's easy to > visually separate out operations from descriptions based on indentation: > {noformat} > Filter > (filter for problems with date <= 2017-01) > -> FlatMap > (calculate problem severity scores for > date 2017-01) > -> Combine > (sum all severity scores for (bin, apt) for > date 2017-01) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6028) Unnamed operations in main method logged as "main(null:-1)"
Luke Hutchison created FLINK-6028: - Summary: Unnamed operations in main method logged as "main(null:-1)" Key: FLINK-6028 URL: https://issues.apache.org/jira/browse/FLINK-6028 Project: Flink Issue Type: Bug Components: Core Affects Versions: 1.2.0 Reporter: Luke Hutchison Priority: Trivial If in main I have some code like this: {code} x.filter(t -> !t.f0.isEmpty()) .flatMap((t, out) -> out.collect(new Tuple3<>(t.f1, t.f2, t.f3))) .writeAsText(filename, WriteMode.OVERWRITE).setParallelism(1); {code} In the log, the origin of these unnamed operations shows up as "main(null:-1)": {noformat} CHAIN Filter (Filter at main(null:-1)) -> FlatMap (FlatMap at main(null:-1))(2/2) switched to SCHEDULED {noformat} However, operations inside lambdas seem to correctly provide the class name and line number in the logs, e.g. "Main.java:217". -- This message was sent by Atlassian JIRA (v6.3.15#6346)
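For comparison, a sketch (not from this issue) of the same pipeline with explicit .name(...) calls, which gives readable labels in the log regardless of the origin lookup; the Tuple4 element type of x is an invented assumption, and the flatMap lambda's parameter types are spelled out so that .name(...) compiles (see FLINK-6026):

{code}
x.filter(t -> !t.f0.isEmpty()).name("drop records with empty f0")
 .flatMap((Tuple4<String, String, String, String> t, Collector<Tuple3<String, String, String>> out) ->
         out.collect(new Tuple3<>(t.f1, t.f2, t.f3))).name("project to (f1, f2, f3)")
 .writeAsText(filename, WriteMode.OVERWRITE).setParallelism(1).name("write text output");
{code}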
[jira] [Commented] (FLINK-6026) [Type Inference] Cannot name flatMap operations
[ https://issues.apache.org/jira/browse/FLINK-6026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15907003#comment-15907003 ] Luke Hutchison commented on FLINK-6026: --- This does seem to be a problem limited to flatMap, as far as I have seen so far. The same problem (of needing to specify lambda types) occurs when chaining .collect() after a flatMap too. > [Type Inference] Cannot name flatMap operations > --- > > Key: FLINK-6026 > URL: https://issues.apache.org/jira/browse/FLINK-6026 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Luke Hutchison >Priority: Minor > > I get an error if I try naming a flatMap operation: > {code} > DataSet> y = x.flatMap((t, out) -> > out.collect(t)).name("op"); > {code} > Type mismatch: cannot convert from > FlatMapOperator ,Object> to > DataSet > > If I try to do it as two steps, I get the error that DataSet does not have a > .name(String) method: > {code} > DataSet > y = x.flatMap((t, out) -> out.collect(t)); > y.name("op"); > {code} > If I use Eclipse type inference on x, it shows me that the output type is not > correctly inferred: > {code} > FlatMapOperator , Object> y = x.flatMap((t, out) -> > out.collect(t)); > y.name("op"); // This now works, but "Object" is not the output type > {code} > However, these steps still cannot be chained -- the following still gives an > error: > {code} > FlatMapOperator , Object> y = x.flatMap((t, out) -> > out.collect(t)).name("op"); > {code} > i.e. first you have to assign the result to a field, so that the type is > fully specified; then you can name the operation. > And the weird thing is that you can give the correct, more specific type for > the local variable, without a type narrowing error: > {code} > FlatMapOperator , Tuple2 > y = > x.flatMap((t, out) -> out.collect(t)); > y.name("op"); // This works, although chaining these two lines still does > not work > {code} > If the types of the lambda args are specified, then everything works: > {code} > DataSet > y = x.flatMap((Tuple2 t, > Collector > out) -> out.collect(t)).name("op"); > {code} > So, at least two things are going on here: > (1) type inference is not working correctly for the lambda parameters > (2) this breaks type inference for intermediate expressions, unless the type > can be resolved using a local variable definition > Is this a bug in the type signature of flatMap? (Or a compiler bug or > limitation, or a fundamental limitation of Java 8 type inference?) > It seems odd that the type of a local variable definition can make the result > of the flatMap operator *more* specific, taking the type from > {code} > FlatMapOperator , Object> > {code} > to > {code} > FlatMapOperator , Tuple2 > > {code} > i.e. if the output type is provided in the local variable definition, it is > properly unified with the type of the parameter t of collect(t), however that > type is not propagated out of that call. > Can anything be done about this in Flink? I have hit this problem a few times. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6019) Some log4j messages do not have a loglevel field set, so they can't be suppressed
[ https://issues.apache.org/jira/browse/FLINK-6019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Hutchison updated FLINK-6019: -- Description: Some of the log messages do not appear to have a loglevel value set, so they can't be suppressed by setting the log4j level to WARN. There's this line at the beginning which doesn't even have a timestamp: {noformat} Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1844933939] {noformat} And then there are numerous lines like this, missing an "INFO" field: {noformat} 03/10/2017 00:01:14 Job execution switched to status RUNNING. 03/10/2017 00:01:14 DataSource (at readTable(DBTableReader.java:165) (org.apache.flink.api.java.io.PojoCsvInputFormat))(1/8) switched to SCHEDULED 03/10/2017 00:01:14 DataSink (count())(1/8) switched to SCHEDULED 03/10/2017 00:01:14 DataSink (count())(3/8) switched to DEPLOYING 03/10/2017 00:01:15 DataSink (count())(3/8) switched to RUNNING 03/10/2017 00:01:17 DataSink (count())(6/8) switched to FINISHED 03/10/2017 00:01:17 DataSource (at readTable(DBTableReader.java:165) (org.apache.flink.api.java.io.PojoCsvInputFormat))(6/8) switched to FINISHED 03/10/2017 00:01:17 Job execution switched to status FINISHED. {noformat} was: Some of the log messages do not appear to have a loglevel value set, so they can't be suppressed by setting the log4j level to WARN. There's this line at the beginning which doesn't even have a timestamp: Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1844933939] And then there are numerous lines like this, missing an "INFO" field: 03/10/2017 00:01:14 Job execution switched to status RUNNING. 03/10/2017 00:01:14 DataSource (at readTable(DBTableReader.java:165) (org.apache.flink.api.java.io.PojoCsvInputFormat))(1/8) switched to SCHEDULED 03/10/2017 00:01:14 DataSink (count())(1/8) switched to SCHEDULED 03/10/2017 00:01:14 DataSink (count())(3/8) switched to DEPLOYING 03/10/2017 00:01:15 DataSink (count())(3/8) switched to RUNNING 03/10/2017 00:01:17 DataSink (count())(6/8) switched to FINISHED 03/10/2017 00:01:17 DataSource (at readTable(DBTableReader.java:165) (org.apache.flink.api.java.io.PojoCsvInputFormat))(6/8) switched to FINISHED 03/10/2017 00:01:17 Job execution switched to status FINISHED. > Some log4j messages do not have a loglevel field set, so they can't be > suppressed > - > > Key: FLINK-6019 > URL: https://issues.apache.org/jira/browse/FLINK-6019 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 > Environment: Linux >Reporter: Luke Hutchison > > Some of the log messages do not appear to have a loglevel value set, so they > can't be suppressed by setting the log4j level to WARN. There's this line at > the beginning which doesn't even have a timestamp: > {noformat} > Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1844933939] > {noformat} > And then there are numerous lines like this, missing an "INFO" field: > {noformat} > 03/10/2017 00:01:14 Job execution switched to status RUNNING. 
> 03/10/2017 00:01:14 DataSource (at readTable(DBTableReader.java:165) > (org.apache.flink.api.java.io.PojoCsvInputFormat))(1/8) switched to SCHEDULED > 03/10/2017 00:01:14 DataSink (count())(1/8) switched to SCHEDULED > 03/10/2017 00:01:14 DataSink (count())(3/8) switched to DEPLOYING > 03/10/2017 00:01:15 DataSink (count())(3/8) switched to RUNNING > 03/10/2017 00:01:17 DataSink (count())(6/8) switched to FINISHED > 03/10/2017 00:01:17 DataSource (at readTable(DBTableReader.java:165) > (org.apache.flink.api.java.io.PojoCsvInputFormat))(6/8) switched to FINISHED > 03/10/2017 00:01:17 Job execution switched to status FINISHED. > {noformat} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6024) Need more fine-grained info for "InvalidProgramException: This type (...) cannot be used as key"
[ https://issues.apache.org/jira/browse/FLINK-6024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Hutchison updated FLINK-6024: -- Description: I got this very confusing exception: {noformat} InvalidProgramException: This type (MyType) cannot be used as key {noformat} I dug through the code, and could not find what was causing this. The help text for type.isKeyType(), in Keys.java:329, right before the exception is thrown, says: "Checks whether this type can be used as a key. As a bare minimum, types have to be hashable and comparable to be keys." However, this didn't solve the problem. I discovered that in my case, the error was occurring because I added a new constructor to the type, and I didn't have a default constructor. This is probably quite a common thing to happen for POJOs, so the error message should give some detail saying that this is the problem. Other things that can cause this to fail, including that the class is not public, or the constructor is not public, or the key field is not public, or that the key field is not a serializable type, or the key is not Comparable, or the key is not hashable, should be given in the error message instead, depending on the actual cause of the problem. was: I got this very confusing exception: InvalidProgramException: This type (MyType) cannot be used as key I dug through the code, and could not find what was causing this. The help text for type.isKeyType(), in Keys.java:329, right before the exception is thrown, says: "Checks whether this type can be used as a key. As a bare minimum, types have to be hashable and comparable to be keys." However, this didn't solve the problem. I discovered that in my case, the error was occurring because I added a new constructor to the type, and I didn't have a default constructor. This is probably quite a common thing to happen for POJOs, so the error message should give some detail saying that this is the problem. Other things that can cause this to fail, including that the class is not public, or the constructor is not public, or the key field is not public, or that the key field is not a serializable type, or the key is not Comparable, or the key is not hashable, should be given in the error message instead, depending on the actual cause of the problem. > Need more fine-grained info for "InvalidProgramException: This type (...) > cannot be used as key" > > > Key: FLINK-6024 > URL: https://issues.apache.org/jira/browse/FLINK-6024 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.2.0 >Reporter: Luke Hutchison > > I got this very confusing exception: > {noformat} > InvalidProgramException: This type (MyType) cannot be used as key > {noformat} > I dug through the code, and could not find what was causing this. The help > text for type.isKeyType(), in Keys.java:329, right before the exception is > thrown, says: "Checks whether this type can be used as a key. As a bare > minimum, types have to be hashable and comparable to be keys." However, this > didn't solve the problem. > I discovered that in my case, the error was occurring because I added a new > constructor to the type, and I didn't have a default constructor. This is > probably quite a common thing to happen for POJOs, so the error message > should give some detail saying that this is the problem. 
> Other things that can cause this to fail, including that the class is not > public, or the constructor is not public, or the key field is not public, or > that the key field is not a serializable type, or the key is not Comparable, > or the key is not hashable, should be given in the error message instead, > depending on the actual cause of the problem. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6016) Newlines should be valid in quoted strings in CSV
[ https://issues.apache.org/jira/browse/FLINK-6016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Hutchison updated FLINK-6016: -- Description: The RFC for the CSV format specifies that newlines are valid in quoted strings in CSV: https://tools.ietf.org/html/rfc4180 However, when parsing a CSV file with Flink containing a newline, such as: {noformat} "3 4",5 {noformat} you get this exception: {noformat} Line could not be parsed: '"3' ParserError UNTERMINATED_QUOTED_STRING Expect field types: class java.lang.String, class java.lang.String {noformat} was: The RFC for the CSV format specifies that newlines are valid in quoted strings in CSV: https://tools.ietf.org/html/rfc4180 However, when parsing a CSV file with Flink containing a newline, such as: "3 4",5 you get this exception: Line could not be parsed: '"3' ParserError UNTERMINATED_QUOTED_STRING Expect field types: class java.lang.String, class java.lang.String > Newlines should be valid in quoted strings in CSV > - > > Key: FLINK-6016 > URL: https://issues.apache.org/jira/browse/FLINK-6016 > Project: Flink > Issue Type: Bug > Components: Batch Connectors and Input/Output Formats >Affects Versions: 1.2.0 >Reporter: Luke Hutchison > > The RFC for the CSV format specifies that newlines are valid in quoted > strings in CSV: > https://tools.ietf.org/html/rfc4180 > However, when parsing a CSV file with Flink containing a newline, such as: > {noformat} > "3 > 4",5 > {noformat} > you get this exception: > {noformat} > Line could not be parsed: '"3' > ParserError UNTERMINATED_QUOTED_STRING > Expect field types: class java.lang.String, class java.lang.String > {noformat} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6026) [Type Inference] Cannot name flatMap operations
[ https://issues.apache.org/jira/browse/FLINK-6026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Hutchison updated FLINK-6026: -- Summary: [Type Inference] Cannot name flatMap operations (was: Cannot name flatMap operations) > [Type Inference] Cannot name flatMap operations > --- > > Key: FLINK-6026 > URL: https://issues.apache.org/jira/browse/FLINK-6026 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Luke Hutchison >Priority: Minor > > I get an error if I try naming a flatMap operation: > {code} > DataSet> y = x.flatMap((t, out) -> > out.collect(t)).name("op"); > {code} > Type mismatch: cannot convert from > FlatMapOperator ,Object> to > DataSet > > If I try to do it as two steps, I get the error that DataSet does not have a > .name(String) method: > {code} > DataSet > y = x.flatMap((t, out) -> out.collect(t)); > y.name("op"); > {code} > If I use Eclipse type inference on x, it shows me that the output type is not > correctly inferred: > {code} > FlatMapOperator , Object> y = x.flatMap((t, out) -> > out.collect(t)); > y.name("op"); // This now works, but "Object" is not the output type > {code} > However, these steps still cannot be chained -- the following still gives an > error: > {code} > FlatMapOperator , Object> y = x.flatMap((t, out) -> > out.collect(t)).name("op"); > {code} > i.e. first you have to assign the result to a field, so that the type is > fully specified; then you can name the operation. > And the weird thing is that you can give the correct, more specific type for > the local variable, without a type narrowing error: > {code} > FlatMapOperator , Tuple2 > y = > x.flatMap((t, out) -> out.collect(t)); > y.name("op"); // This works, although chaining these two lines still does > not work > {code} > If the types of the lambda args are specified, then everything works: > {code} > DataSet > y = x.flatMap((Tuple2 t, > Collector > out) -> out.collect(t)).name("op"); > {code} > So, at least two things are going on here: > (1) type inference is not working correctly for the lambda parameters > (2) this breaks type inference for intermediate expressions, unless the type > can be resolved using a local variable definition > Is this a bug in the type signature of flatMap? (Or a compiler bug or > limitation, or a fundamental limitation of Java 8 type inference?) > It seems odd that the type of a local variable definition can make the result > of the flatMap operator *more* specific, taking the type from > {code} > FlatMapOperator , Object> > {code} > to > {code} > FlatMapOperator , Tuple2 > > {code} > i.e. if the output type is provided in the local variable definition, it is > properly unified with the type of the parameter t of collect(t), however that > type is not propagated out of that call. > Can anything be done about this in Flink? I have hit this problem a few times. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6026) Cannot name flatMap operations
[ https://issues.apache.org/jira/browse/FLINK-6026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Hutchison updated FLINK-6026: -- Component/s: Core > Cannot name flatMap operations > -- > > Key: FLINK-6026 > URL: https://issues.apache.org/jira/browse/FLINK-6026 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Luke Hutchison >Priority: Minor > > I get an error if I try naming a flatMap operation: > {code} > DataSet> y = x.flatMap((t, out) -> > out.collect(t)).name("op"); > {code} > Type mismatch: cannot convert from > FlatMapOperator ,Object> to > DataSet > > If I try to do it as two steps, I get the error that DataSet does not have a > .name(String) method: > {code} > DataSet > y = x.flatMap((t, out) -> out.collect(t)); > y.name("op"); > {code} > If I use Eclipse type inference on x, it shows me that the output type is not > correctly inferred: > {code} > FlatMapOperator , Object> y = x.flatMap((t, out) -> > out.collect(t)); > y.name("op"); // This now works, but "Object" is not the output type > {code} > However, these steps still cannot be chained -- the following still gives an > error: > {code} > FlatMapOperator , Object> y = x.flatMap((t, out) -> > out.collect(t)).name("op"); > {code} > i.e. first you have to assign the result to a field, so that the type is > fully specified; then you can name the operation. > And the weird thing is that you can give the correct, more specific type for > the local variable, without a type narrowing error: > {code} > FlatMapOperator , Tuple2 > y = > x.flatMap((t, out) -> out.collect(t)); > y.name("op"); // This works, although chaining these two lines still does > not work > {code} > If the types of the lambda args are specified, then everything works: > {code} > DataSet > y = x.flatMap((Tuple2 t, > Collector > out) -> out.collect(t)).name("op"); > {code} > So, at least two things are going on here: > (1) type inference is not working correctly for the lambda parameters > (2) this breaks type inference for intermediate expressions, unless the type > can be resolved using a local variable definition > Is this a bug in the type signature of flatMap? (Or a compiler bug or > limitation, or a fundamental limitation of Java 8 type inference?) > It seems odd that the type of a local variable definition can make the result > of the flatMap operator *more* specific, taking the type from > {code} > FlatMapOperator , Object> > {code} > to > {code} > FlatMapOperator , Tuple2 > > {code} > i.e. if the output type is provided in the local variable definition, it is > properly unified with the type of the parameter t of collect(t), however that > type is not propagated out of that call. > Can anything be done about this in Flink? I have hit this problem a few times. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6026) Cannot name flatMap operations
[ https://issues.apache.org/jira/browse/FLINK-6026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Hutchison updated FLINK-6026: -- Description: I get an error if I try naming a flatMap operation: {code} DataSet> y = x.flatMap((t, out) -> out.collect(t)).name("op"); {code} Type mismatch: cannot convert from FlatMapOperator ,Object> to DataSet > If I try to do it as two steps, I get the error that DataSet does not have a .name(String) method: {code} DataSet > y = x.flatMap((t, out) -> out.collect(t)); y.name("op"); {code} If I use Eclipse type inference on x, it shows me that the output type is not correctly inferred: {code} FlatMapOperator , Object> y = x.flatMap((t, out) -> out.collect(t)); y.name("op"); // This now works, but "Object" is not the output type {code} However, these steps still cannot be chained -- the following still gives an error: {code} FlatMapOperator , Object> y = x.flatMap((t, out) -> out.collect(t)).name("op"); {code} i.e. first you have to assign the result to a field, so that the type is fully specified; then you can name the operation. And the weird thing is that you can give the correct, more specific type for the local variable, without a type narrowing error: {code} FlatMapOperator , Tuple2 > y = x.flatMap((t, out) -> out.collect(t)); y.name("op"); // This works, although chaining these two lines still does not work {code} If the types of the lambda args are specified, then everything works: {code} DataSet > y = x.flatMap((Tuple2 t, Collector > out) -> out.collect(t)).name("op"); {code} So, at least two things are going on here: (1) type inference is not working correctly for the lambda parameters (2) this breaks type inference for intermediate expressions, unless the type can be resolved using a local variable definition Is this a bug in the type signature of flatMap? (Or a compiler bug or limitation, or a fundamental limitation of Java 8 type inference?) It seems odd that the type of a local variable definition can make the result of the flatMap operator *more* specific, taking the type from {code} FlatMapOperator , Object> {code} to {code} FlatMapOperator , Tuple2 > {code} i.e. if the output type is provided in the local variable definition, it is properly unified with the type of the parameter t of collect(t), however that type is not propagated out of that call. Can anything be done about this in Flink? I have hit this problem a few times. was: I get an error if I try naming a flatMap operation: ``` DataSet > y = x.flatMap((t, out) -> out.collect(t)).name("op"); ``` Type mismatch: cannot convert from FlatMapOperator ,Object> to DataSet > If I try to do it as two steps, I get the error that DataSet does not have a .name(String) method: DataSet > y = x.flatMap((t, out) -> out.collect(t)); y.name("op"); If I use Eclipse type inference on x, it shows me that the output type is not correctly inferred: FlatMapOperator , Object> y = x.flatMap((t, out) -> out.collect(t)); y.name("op"); // This now works, but "Object" is not the output type However, these steps still cannot be chained -- the following still gives an error: FlatMapOperator , Object> y = x.flatMap((t, out) -> out.collect(t)).name("op"); i.e. first you have to assign the result to a field, so that the type is fully specified; then you can name the operation. 
And the weird thing is that you can give the correct, more specific type for the local variable, without a type narrowing error: FlatMapOperator , Tuple2 > y = x.flatMap((t, out) -> out.collect(t)); y.name("op"); // This works, although chaining these two lines still does not work If the types of the lambda args are specified, then everything works: DataSet > y = x.flatMap((Tuple2 t, Collector > out) -> out.collect(t)).name("op"); So, at least two things are going on here: (1) type inference is not working correctly for the lambda parameters (2) this breaks type inference for intermediate expressions, unless the type can be resolved using a local variable definition Is this a bug in the type signature of flatMap? (Or a compiler bug or limitation, or a fundamental limitation of Java 8 type inference?) It seems odd that the type of a local variable definition can make the result of the flatMap operator *more* specific, taking the type from FlatMapOperator
[jira] [Created] (FLINK-6026) Cannot name flatMap operations
Luke Hutchison created FLINK-6026: - Summary: Cannot name flatMap operations Key: FLINK-6026 URL: https://issues.apache.org/jira/browse/FLINK-6026 Project: Flink Issue Type: Bug Affects Versions: 1.2.0 Reporter: Luke Hutchison Priority: Minor I get an error if I try naming a flatMap operation: DataSet> y = x.flatMap((t, out) -> out.collect(t)).name("op"); Type mismatch: cannot convert from FlatMapOperator ,Object> to DataSet > If I try to do it as two steps, I get the error that DataSet does not have a .name(String) method: DataSet > y = x.flatMap((t, out) -> out.collect(t)); y.name("op"); If I use Eclipse type inference on x, it shows me that the output type is not correctly inferred: FlatMapOperator , Object> y = x.flatMap((t, out) -> out.collect(t)); y.name("op"); // This now works, but "Object" is not the output type However, these steps still cannot be chained -- the following still gives an error: FlatMapOperator , Object> y = x.flatMap((t, out) -> out.collect(t)).name("op"); i.e. first you have to assign the result to a field, so that the type is fully specified; then you can name the operation. And the weird thing is that you can give the correct, more specific type for the local variable, without a type narrowing error: FlatMapOperator , Tuple2 > y = x.flatMap((t, out) -> out.collect(t)); y.name("op"); // This works, although chaining these two lines still does not work If the types of the lambda args are specified, then everything works: DataSet > y = x.flatMap((Tuple2 t, Collector > out) -> out.collect(t)).name("op"); So, at least two things are going on here: (1) type inference is not working correctly for the lambda parameters (2) this breaks type inference for intermediate expressions, unless the type can be resolved using a local variable definition Is this a bug in the type signature of flatMap? (Or a compiler bug or limitation, or a fundamental limitation of Java 8 type inference?) It seems odd that the type of a local variable definition can make the result of the flatMap operator *more* specific, taking the type from FlatMapOperator , Object> to FlatMapOperator , Tuple2 > i.e. if the output type is provided in the local variable definition, it is properly unified with the type of the parameter t of collect(t), however that type is not propagated out of that call. Can anything be done about this in Flink? I have hit this problem a few times. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6026) Cannot name flatMap operations
[ https://issues.apache.org/jira/browse/FLINK-6026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Hutchison updated FLINK-6026: -- Description: I get an error if I try naming a flatMap operation: ``` DataSet> y = x.flatMap((t, out) -> out.collect(t)).name("op"); ``` Type mismatch: cannot convert from FlatMapOperator ,Object> to DataSet > If I try to do it as two steps, I get the error that DataSet does not have a .name(String) method: DataSet > y = x.flatMap((t, out) -> out.collect(t)); y.name("op"); If I use Eclipse type inference on x, it shows me that the output type is not correctly inferred: FlatMapOperator , Object> y = x.flatMap((t, out) -> out.collect(t)); y.name("op"); // This now works, but "Object" is not the output type However, these steps still cannot be chained -- the following still gives an error: FlatMapOperator , Object> y = x.flatMap((t, out) -> out.collect(t)).name("op"); i.e. first you have to assign the result to a field, so that the type is fully specified; then you can name the operation. And the weird thing is that you can give the correct, more specific type for the local variable, without a type narrowing error: FlatMapOperator , Tuple2 > y = x.flatMap((t, out) -> out.collect(t)); y.name("op"); // This works, although chaining these two lines still does not work If the types of the lambda args are specified, then everything works: DataSet > y = x.flatMap((Tuple2 t, Collector > out) -> out.collect(t)).name("op"); So, at least two things are going on here: (1) type inference is not working correctly for the lambda parameters (2) this breaks type inference for intermediate expressions, unless the type can be resolved using a local variable definition Is this a bug in the type signature of flatMap? (Or a compiler bug or limitation, or a fundamental limitation of Java 8 type inference?) It seems odd that the type of a local variable definition can make the result of the flatMap operator *more* specific, taking the type from FlatMapOperator , Object> to FlatMapOperator , Tuple2 > i.e. if the output type is provided in the local variable definition, it is properly unified with the type of the parameter t of collect(t), however that type is not propagated out of that call. Can anything be done about this in Flink? I have hit this problem a few times. was: I get an error if I try naming a flatMap operation: DataSet > y = x.flatMap((t, out) -> out.collect(t)).name("op"); Type mismatch: cannot convert from FlatMapOperator ,Object> to DataSet > If I try to do it as two steps, I get the error that DataSet does not have a .name(String) method: DataSet > y = x.flatMap((t, out) -> out.collect(t)); y.name("op"); If I use Eclipse type inference on x, it shows me that the output type is not correctly inferred: FlatMapOperator , Object> y = x.flatMap((t, out) -> out.collect(t)); y.name("op"); // This now works, but "Object" is not the output type However, these steps still cannot be chained -- the following still gives an error: FlatMapOperator , Object> y = x.flatMap((t, out) -> out.collect(t)).name("op"); i.e. first you have to assign the result to a field, so that the type is fully specified; then you can name the operation. 
And the weird thing is that you can give the correct, more specific type for the local variable, without a type narrowing error: FlatMapOperator , Tuple2 > y = x.flatMap((t, out) -> out.collect(t)); y.name("op"); // This works, although chaining these two lines still does not work If the types of the lambda args are specified, then everything works: DataSet > y = x.flatMap((Tuple2 t, Collector > out) -> out.collect(t)).name("op"); So, at least two things are going on here: (1) type inference is not working correctly for the lambda parameters (2) this breaks type inference for intermediate expressions, unless the type can be resolved using a local variable definition Is this a bug in the type signature of flatMap? (Or a compiler bug or limitation, or a fundamental limitation of Java 8 type inference?) It seems odd that the type of a local variable definition can make the result of the flatMap operator *more* specific, taking the type from FlatMapOperator , Object> to FlatMapOperator , Tuple2 > i.e. if the output type is
[jira] [Created] (FLINK-6024) Need more fine-grained info for "InvalidProgramException: This type (...) cannot be used as key"
Luke Hutchison created FLINK-6024: - Summary: Need more fine-grained info for "InvalidProgramException: This type (...) cannot be used as key" Key: FLINK-6024 URL: https://issues.apache.org/jira/browse/FLINK-6024 Project: Flink Issue Type: Improvement Components: Core Affects Versions: 1.2.0 Reporter: Luke Hutchison I got this very confusing exception: InvalidProgramException: This type (MyType) cannot be used as key I dug through the code, and could not find what was causing this. The help text for type.isKeyType(), in Keys.java:329, right before the exception is thrown, says: "Checks whether this type can be used as a key. As a bare minimum, types have to be hashable and comparable to be keys." However, this didn't solve the problem. I discovered that in my case, the error was occurring because I added a new constructor to the type, and I didn't have a default constructor. This is probably quite a common thing to happen for POJOs, so the error message should give some detail saying that this is the problem. Other things that can cause this to fail, including that the class is not public, or the constructor is not public, or the key field is not public, or that the key field is not a serializable type, or the key is not Comparable, or the key is not hashable, should be given in the error message instead, depending on the actual cause of the problem. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (FLINK-4785) Flink string parser doesn't handle string fields containing two consecutive double quotes
[ https://issues.apache.org/jira/browse/FLINK-4785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905779#comment-15905779 ] Luke Hutchison edited comment on FLINK-4785 at 3/10/17 11:48 PM:
-
I'm pretty sure I have seen backslash escaping in CSV before, but the old-school way of quoting quote characters (double double quotes) is the one that made it into the RFC, presumably for backwards compatibility with spreadsheets.

From my dup bug report, https://issues.apache.org/jira/browse/FLINK-6107 :

The RFC for the CSV format specifies that double quotes are valid in quoted strings in CSV, by doubling the quote character: https://tools.ietf.org/html/rfc4180

However, when parsing a CSV file with Flink containing quoted quotes, such as:

{noformat}
bob,"The name is ""Bob"""
{noformat}

you get this exception:

{noformat}
org.apache.flink.api.common.io.ParseException: Line could not be parsed: 'bob,"The name is ""Bob"""'
ParserError UNQUOTED_CHARS_AFTER_QUOTED_STRING
Expect field types: class java.lang.String, class java.lang.String
{noformat}

See also https://issues.apache.org/jira/browse/FLINK-6016 (quoted strings in CSV should be able to contain newlines).

was (Author: lukehutch): I'm pretty sure I have seen backslash escaping in CSV before, but the old-school way of quoting quote characters (double double quotes) is the one that made it into the RFC, presumably for backwards compatibility with spreadsheets. Fabian -- you copied the text from the wrong bug report, https://issues.apache.org/jira/browse/FLINK-6016 , rather than https://issues.apache.org/jira/browse/FLINK-6107 , which is: -- The RFC for the CSV format specifies that double quotes are valid in quoted strings in CSV, by doubling the quote character: https://tools.ietf.org/html/rfc4180 However, when parsing a CSV file with Flink containing quoted quotes, such as: bob,"The name is ""Bob""" you get this exception: org.apache.flink.api.common.io.ParseException: Line could not be parsed: 'bob,"The name is ""Bob"""' ParserError UNQUOTED_CHARS_AFTER_QUOTED_STRING Expect field types: class java.lang.String, class java.lang.String

> Flink string parser doesn't handle string fields containing two consecutive
> double quotes
> -
>
> Key: FLINK-4785
> URL: https://issues.apache.org/jira/browse/FLINK-4785
> Project: Flink
> Issue Type: Improvement
> Components: Core
>Affects Versions: 1.1.2
>Reporter: Flavio Pompermaier
> Labels: csv
>
> To reproduce the error run
> https://github.com/okkam-it/flink-examples/blob/master/src/main/java/it/okkam/datalinks/batch/flink/datasourcemanager/importers/Csv2RowExample.java

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
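For context, this is roughly how such a file gets read with the DataSet API's CSV reader (a sketch; the path and field types are assumptions), and it is the doubled quote inside a quoted field that trips the parser:

{code}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// For input lines like:  bob,"The name is ""Bob"""
// parseQuotedStrings('"') enables quoted-string parsing, but a doubled ""
// inside the quotes currently raises
// ParserError UNQUOTED_CHARS_AFTER_QUOTED_STRING instead of unescaping it.
DataSet<Tuple2<String, String>> people = env
        .readCsvFile("/path/to/people.csv")   // hypothetical path
        .parseQuotedStrings('"')
        .types(String.class, String.class);
{code}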
[jira] [Commented] (FLINK-4785) Flink string parser doesn't handle string fields containing two consecutive double quotes
[ https://issues.apache.org/jira/browse/FLINK-4785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905779#comment-15905779 ] Luke Hutchison commented on FLINK-4785:
---
I'm pretty sure I have seen backslash escaping in CSV before, but the old-school way of quoting quote characters (double double quotes) is the one that made it into the RFC, presumably for backwards compatibility with spreadsheets.

Fabian -- you copied the text from the wrong bug report, https://issues.apache.org/jira/browse/FLINK-6016 , rather than https://issues.apache.org/jira/browse/FLINK-6107 , which is:

The RFC for the CSV format specifies that double quotes are valid in quoted strings in CSV, by doubling the quote character: https://tools.ietf.org/html/rfc4180

However, when parsing a CSV file with Flink containing quoted quotes, such as:

{noformat}
bob,"The name is ""Bob"""
{noformat}

you get this exception:

{noformat}
org.apache.flink.api.common.io.ParseException: Line could not be parsed: 'bob,"The name is ""Bob"""'
ParserError UNQUOTED_CHARS_AFTER_QUOTED_STRING
Expect field types: class java.lang.String, class java.lang.String
{noformat}

> Flink string parser doesn't handle string fields containing two consecutive
> double quotes
> -
>
> Key: FLINK-4785
> URL: https://issues.apache.org/jira/browse/FLINK-4785
> Project: Flink
> Issue Type: Improvement
> Components: Core
>Affects Versions: 1.1.2
>Reporter: Flavio Pompermaier
> Labels: csv
>
> To reproduce the error run
> https://github.com/okkam-it/flink-examples/blob/master/src/main/java/it/okkam/datalinks/batch/flink/datasourcemanager/importers/Csv2RowExample.java

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6017) CSV reader does not support quoted double quotes
[ https://issues.apache.org/jira/browse/FLINK-6017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905775#comment-15905775 ] Luke Hutchison commented on FLINK-6017:
---
Thanks, sorry for the dup!

> CSV reader does not support quoted double quotes
> -
>
> Key: FLINK-6017
> URL: https://issues.apache.org/jira/browse/FLINK-6017
> Project: Flink
> Issue Type: Bug
> Components: Batch Connectors and Input/Output Formats
> Environment: Linux
>Reporter: Luke Hutchison
>
> The RFC for the CSV format specifies that double quotes are valid in quoted
> strings in CSV, by doubling the quote character:
> https://tools.ietf.org/html/rfc4180
> However, when parsing a CSV file with Flink containing quoted quotes, such as:
> bob,"The name is ""Bob"""
> you get this exception:
> org.apache.flink.api.common.io.ParseException: Line could not be parsed:
> 'bob,"The name is ""Bob"""'
> ParserError UNQUOTED_CHARS_AFTER_QUOTED_STRING
> Expect field types: class java.lang.String, class java.lang.String

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
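The behavior the RFC calls for is small to state in code; as a point of reference, a minimal sketch (not Flink's actual parser code) of un-escaping a quoted field per RFC 4180:

{code}
// Given the raw text between a quoted field's outer quotes, RFC 4180
// says each doubled quote "" stands for one literal quote character ":
static String unescapeRfc4180Field(String raw) {
    return raw.replace("\"\"", "\"");
}

// Example: unescapeRfc4180Field("The name is \"\"Bob\"\"")
// returns:  The name is "Bob"
{code}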
[jira] [Commented] (FLINK-6019) Some log4j messages do not have a loglevel field set, so they can't be suppressed
[ https://issues.apache.org/jira/browse/FLINK-6019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15904680#comment-15904680 ] Luke Hutchison commented on FLINK-6019:
---
Also, these messages do not respond to setting the logger to send to stderr rather than stdout, e.g. using

{noformat}
log4j.appender.stdout.Target=System.err
{noformat}

(Other log messages do respond to this change to use stderr, and they display the correct loglevel, e.g. "INFO"/"WARN".) I.e. it seems that there are two logging systems running: the one that writes through log4j, respecting its settings, and whatever is writing the log entries above.

> Some log4j messages do not have a loglevel field set, so they can't be
> suppressed
> -
>
> Key: FLINK-6019
> URL: https://issues.apache.org/jira/browse/FLINK-6019
> Project: Flink
> Issue Type: Bug
> Components: Core
>Affects Versions: 1.2.0
> Environment: Linux
>Reporter: Luke Hutchison
>
> Some of the log messages do not appear to have a loglevel value set, so they
> can't be suppressed by setting the log4j level to WARN. There's this line at
> the beginning which doesn't even have a timestamp:
> Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1844933939]
> And then there are numerous lines like this, missing an "INFO" field:
> 03/10/2017 00:01:14 Job execution switched to status RUNNING.
> 03/10/2017 00:01:14 DataSource (at readTable(DBTableReader.java:165)
> (org.apache.flink.api.java.io.PojoCsvInputFormat))(1/8) switched to SCHEDULED
> 03/10/2017 00:01:14 DataSink (count())(1/8) switched to SCHEDULED
> 03/10/2017 00:01:14 DataSink (count())(3/8) switched to DEPLOYING
> 03/10/2017 00:01:15 DataSink (count())(3/8) switched to RUNNING
> 03/10/2017 00:01:17 DataSink (count())(6/8) switched to FINISHED
> 03/10/2017 00:01:17 DataSource (at readTable(DBTableReader.java:165)
> (org.apache.flink.api.java.io.PojoCsvInputFormat))(6/8) switched to FINISHED
> 03/10/2017 00:01:17 Job execution switched to status FINISHED.

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
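If the two-logging-systems diagnosis is right, the level-less lines come from the client printing job status straight to stdout, bypassing log4j entirely; a sketch of switching that off in the 1.x DataSet API (assuming ExecutionConfig#disableSysoutLogging, which that API exposes, is the relevant switch):

{code}
import org.apache.flink.api.java.ExecutionEnvironment;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Suppresses the level-less "... switched to RUNNING/FINISHED" status lines,
// which the client writes directly to stdout and which therefore ignore
// log4j levels and appender targets:
env.getConfig().disableSysoutLogging();
{code}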