[jira] [Commented] (FLINK-2394) HadoopOutFormat OutputCommitter is default to FileOutputCommiter
[ https://issues.apache.org/jira/browse/FLINK-2394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14713022#comment-14713022 ]

ASF GitHub Bot commented on FLINK-2394:
---------------------------------------

Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1056#issuecomment-134982359

    LGTM. +1 for merging.

HadoopOutFormat OutputCommitter is default to FileOutputCommiter

Key: FLINK-2394
URL: https://issues.apache.org/jira/browse/FLINK-2394
Project: Flink
Issue Type: Bug
Components: Hadoop Compatibility
Affects Versions: 0.9.0
Reporter: Stefano Bortoli
Assignee: Fabian Hueske
Fix For: 0.10, 0.9.1

MongoOutputFormat does not write back to the collection because the HadoopOutputFormat wrapper does not allow setting the MongoOutputCommitter and always defaults to FileOutputCommitter. Therefore, when close() and finalizeGlobal() execute, the commit does not happen and the Mongo collection stays untouched.

A simple solution would be to:
1 - create a constructor of HadoopOutputFormatBase and HadoopOutputFormat that takes the OutputCommitter as a parameter
2 - change the outputCommitter field of HadoopOutputFormatBase to be a generic OutputCommitter
3 - remove the default assignment of the outputCommitter to FileOutputCommitter() in open() and finalizeGlobal(), or keep it as a default in case of no specific assignment.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2394) HadoopOutFormat OutputCommitter is default to FileOutputCommiter
[ https://issues.apache.org/jira/browse/FLINK-2394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14712929#comment-14712929 ]

ASF GitHub Bot commented on FLINK-2394:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1056#discussion_r37968466

    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala ---
    @@ -18,11 +18,17 @@
     package org.apache.flink.api.scala.hadoop.mapred

     import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase
    -import org.apache.hadoop.mapred.{JobConf, OutputFormat}
    +import org.apache.hadoop.mapred.{OutputCommitter, JobConf, OutputFormat}

     class HadoopOutputFormat[K, V](mapredOutputFormat: OutputFormat[K, V], job: JobConf)
       extends HadoopOutputFormatBase[K, V, (K, V)](mapredOutputFormat, job) {

    +  def this(mapredOutputFormat: OutputFormat[K, V], outputCommitterClass: Class[OutputCommitter],
    +    job: JobConf) {
    --- End diff --

    Can we use multi line parameter lists as in other Scala files?
[jira] [Commented] (FLINK-2394) HadoopOutFormat OutputCommitter is default to FileOutputCommiter
[ https://issues.apache.org/jira/browse/FLINK-2394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14712936#comment-14712936 ]

ASF GitHub Bot commented on FLINK-2394:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1056#discussion_r37969330

    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala ---
    @@ -18,11 +18,17 @@
     package org.apache.flink.api.scala.hadoop.mapred

     import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase
    -import org.apache.hadoop.mapred.{JobConf, OutputFormat}
    +import org.apache.hadoop.mapred.{OutputCommitter, JobConf, OutputFormat}

     class HadoopOutputFormat[K, V](mapredOutputFormat: OutputFormat[K, V], job: JobConf)
       extends HadoopOutputFormatBase[K, V, (K, V)](mapredOutputFormat, job) {

    +  def this(mapredOutputFormat: OutputFormat[K, V], outputCommitterClass: Class[OutputCommitter],
    +    job: JobConf) {
    --- End diff --

    Fixed. I'd propose to add this to the Scala checkstyle, if we want to enforce it.
[jira] [Commented] (FLINK-2394) HadoopOutFormat OutputCommitter is default to FileOutputCommiter
[ https://issues.apache.org/jira/browse/FLINK-2394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14713023#comment-14713023 ]

ASF GitHub Bot commented on FLINK-2394:
---------------------------------------

Github user uce commented on the pull request:

    https://github.com/apache/flink/pull/1056#issuecomment-134982439

    This looks good. I'm merging this to the release branch. We have an issue to add tests for the output formats, so it's fine that this does not include a test yet.
[jira] [Commented] (FLINK-2394) HadoopOutFormat OutputCommitter is default to FileOutputCommiter
[ https://issues.apache.org/jira/browse/FLINK-2394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14713026#comment-14713026 ]

ASF GitHub Bot commented on FLINK-2394:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1056
[jira] [Commented] (FLINK-2394) HadoopOutFormat OutputCommitter is default to FileOutputCommiter
[ https://issues.apache.org/jira/browse/FLINK-2394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14711474#comment-14711474 ]

ASF GitHub Bot commented on FLINK-2394:
---------------------------------------

GitHub user fhueske opened a pull request:

    https://github.com/apache/flink/pull/1056

    [FLINK-2394] [fix] HadoopOutputFormats use correct OutputCommitters.

    Right now, Flink's wrappers for Hadoop OutputFormats always use a `FileOutputCommitter`.

    - In the `mapreduce` API, Hadoop OutputFormats have a method `getOutputCommitter()` which can be overridden and returns a `FileOutputCommitter` by default.
    - In the `mapred` API, the `OutputCommitter` should be obtained from the `JobConf`. If nothing custom is set, a `FileOutputCommitter` is returned.

    This PR uses the respective methods to obtain the correct `OutputCommitter`. Since `FileOutputCommitter` is the default in both cases, the original semantics are preserved if no custom committer is implemented or set by the user.

    I also added convenience constructors to the `mapred` wrappers that set the `OutputCommitter` in the `JobConf`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/fhueske/flink hadoopOutCommitter

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1056.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1056

commit a632203a948f2e7973339a0eab88750f7ce70cc5
Author: Fabian Hueske fhue...@apache.org
Date: 2015-07-30T19:47:01Z

    [FLINK-2394] [fix] HadoopOutputFormats use correct OutputCommitters.
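The convenience constructor mentioned in the pull request can be pictured roughly as follows. This is only a sketch of the pattern, not the code from the PR: every type below (`SketchCommitter`, `SketchJobConf`, `SketchOutputWrapper` and so on) is a hypothetical stand-in written for illustration, so that the example runs without Hadoop or Flink on the classpath. The assumption is that the job configuration stores a committer class, falls back to the file-based committer when nothing is set, and that the extra constructor simply writes the requested class into that configuration.

```java
// Stand-ins only: none of these are the real Hadoop or Flink classes.
abstract class SketchCommitter {
    abstract String describe();
}

class SketchFileCommitter extends SketchCommitter {
    String describe() { return "file"; }
}

class SketchMongoCommitter extends SketchCommitter {
    String describe() { return "mongo"; }
}

// Models a job configuration that stores the committer class
// and defaults to the file-based committer.
class SketchJobConf {
    private Class<? extends SketchCommitter> committerClass = SketchFileCommitter.class;

    void setOutputCommitter(Class<? extends SketchCommitter> cls) {
        committerClass = cls;
    }

    SketchCommitter getOutputCommitter() {
        try {
            return committerClass.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate committer", e);
        }
    }
}

// Models the output format wrapper with its two constructors.
class SketchOutputWrapper {
    private final SketchJobConf job;

    // Original constructor: whatever the configuration says is used.
    SketchOutputWrapper(SketchJobConf job) {
        this.job = job;
    }

    // Convenience constructor: records the committer class in the configuration.
    SketchOutputWrapper(Class<? extends SketchCommitter> committerClass, SketchJobConf job) {
        this(job);
        job.setOutputCommitter(committerClass);
    }

    // Returns a description of the committer that open() would use.
    String open() {
        return job.getOutputCommitter().describe();
    }
}

class SketchConstructorDemo {
    public static void main(String[] args) {
        System.out.println(new SketchOutputWrapper(new SketchJobConf()).open());
        System.out.println(
            new SketchOutputWrapper(SketchMongoCommitter.class, new SketchJobConf()).open());
    }
}
```

Because the default lives in the configuration rather than in the wrapper, callers that never mention a committer keep the file-based behaviour, which matches the PR's statement that the original semantics are preserved.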
[jira] [Commented] (FLINK-2394) HadoopOutFormat OutputCommitter is default to FileOutputCommiter
[ https://issues.apache.org/jira/browse/FLINK-2394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14640007#comment-14640007 ]

Stefano Bortoli commented on FLINK-2394:
----------------------------------------

I see. I was in fact a little disappointed with mongo-hadoop for having a method getOutputCommitter() which was not standard in the OutputFormat. However, my first implementation was using the mapred Hadoop API, so no big surprises.

I would say that a good idea could be to simply have either 2 HadoopOutputFormatBase classes (1 per version of Hadoop), or handle the Hadoop version to get the OutputCommitter accordingly.

Meanwhile, I have implemented my own MongoHadoopOutputFormat extending the HadoopOutputFormat and overriding the open and close methods, replacing the FileOutputCommitter with the MongoOutputCommitter.
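The subclassing workaround described in this comment might look something like the sketch below. It does not reproduce the actual MongoHadoopOutputFormat; all `Demo*` types are hypothetical stand-ins so the example is runnable on its own, and it assumes a base wrapper whose `open()` hard-codes the file-based committer, as the issue reports. In this simplified model only `open()` needs overriding because `close()` reads the committer from a field, whereas the comment says both methods were overridden in the real workaround.

```java
// Stand-ins only: none of these are the real Hadoop, Flink, or mongo-hadoop classes.
interface DemoCommitter {
    String commit();
}

class DemoFileCommitter implements DemoCommitter {
    public String commit() { return "moved temporary files to the output directory"; }
}

class DemoMongoCommitter implements DemoCommitter {
    public String commit() { return "flushed documents to the collection"; }
}

// Models a wrapper that always creates a file-based committer in open().
class DemoHadoopOutputFormat {
    protected DemoCommitter committer;

    void open() {
        committer = new DemoFileCommitter();
    }

    String close() {
        return committer.commit();
    }
}

// Models the workaround: a subclass swaps in the committer it needs.
class DemoMongoHadoopOutputFormat extends DemoHadoopOutputFormat {
    @Override
    void open() {
        committer = new DemoMongoCommitter();
    }
}

class DemoWorkaround {
    public static void main(String[] args) {
        DemoHadoopOutputFormat format = new DemoMongoHadoopOutputFormat();
        format.open();
        System.out.println(format.close());
    }
}
```

The drawback of this approach is that every output format with a non-file committer needs its own subclass, which is what the proposals in this thread try to avoid.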
[jira] [Commented] (FLINK-2394) HadoopOutFormat OutputCommitter is default to FileOutputCommiter
[ https://issues.apache.org/jira/browse/FLINK-2394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14640999#comment-14640999 ]

Fabian Hueske commented on FLINK-2394:
--------------------------------------

Hi [~stefano.bortoli],

we do already have two HadoopOutputFormatBase classes, one for each Hadoop API. So treating both APIs differently is not a problem.

The issue is that one API supports different OutputCommitters out-of-the-box (mapreduce) and the other one requires that the OutputCommitter is explicitly set (mapred), unless I overlooked something.
[jira] [Commented] (FLINK-2394) HadoopOutFormat OutputCommitter is default to FileOutputCommiter
[ https://issues.apache.org/jira/browse/FLINK-2394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639545#comment-14639545 ]

Fabian Hueske commented on FLINK-2394:
--------------------------------------

Hadoop handles {{OutputCommitters}} differently in its two APIs ({{mapred}} and {{mapreduce}}).

- For the newer {{mapreduce}} API, things are quite simple. {{org.apache.hadoop.mapreduce.OutputFormat}} has an abstract method {{getOutputCommitter(TaskAttemptContext context)}} which provides the output committer that must be used. So updating the Flink {{mapreduce.HadoopOutputFormatBase}} class such that it gets the {{OutputCommitter}} directly from the {{OutputFormat}} instead of creating a {{FileOutputCommitter}} should be easy.

- For the older {{mapred}} API, the OutputFormat interface does not define a {{getOutputCommitter()}} method. Instead, the {{JobConf}} has a {{getOutputCommitter()}} method which returns a {{FileOutputCommitter}} if nothing else is defined. So we could update the {{mapred.HadoopOutputFormatBase}} to get the {{OutputCommitter}} from the {{JobConf}}. However, this would require that users manually set a different {{OutputCommitter}} in the {{JobConf}}, which is not really intuitive. We could offer an additional constructor which sets an {{OutputCommitter}} in the provided {{JobConf}} argument, like [~stefano.bortoli] suggested.
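The two lookup paths described in the comment above can be contrasted in a small model. This is a sketch under stated assumptions rather than Hadoop or Flink code: `Committer`, `NewApiFormat`, `JobSettings` and the other names are hypothetical stand-ins introduced here so that the example runs without any dependency. `NewApiFormat` plays the role of an output format in the newer API that names its own committer, and `JobSettings` plays the role of a job configuration in the older API that falls back to the file-based committer when the user has not set one.

```java
// Stand-in types: these are NOT the Hadoop classes, only minimal models of them.
interface Committer {
    String name();
}

final class FileCommitter implements Committer {
    public String name() { return "FileCommitter"; }
}

final class MongoCommitter implements Committer {
    public String name() { return "MongoCommitter"; }
}

// Models the newer API: the output format itself provides its committer.
interface NewApiFormat {
    Committer getOutputCommitter();
}

// Models the older API: the job configuration holds the committer
// and falls back to the file-based one when nothing was set.
final class JobSettings {
    private Committer committer; // null means "not set by the user"

    void setOutputCommitter(Committer c) {
        this.committer = c;
    }

    Committer getOutputCommitter() {
        return committer != null ? committer : new FileCommitter();
    }
}

final class CommitterResolution {
    // Newer API: ask the format.
    static Committer forNewApi(NewApiFormat format) {
        return format.getOutputCommitter();
    }

    // Older API: ask the configuration.
    static Committer forOldApi(JobSettings conf) {
        return conf.getOutputCommitter();
    }

    public static void main(String[] args) {
        NewApiFormat mongoFormat = () -> new MongoCommitter();
        System.out.println(forNewApi(mongoFormat).name()); // prints "MongoCommitter"

        JobSettings conf = new JobSettings();
        System.out.println(forOldApi(conf).name());        // prints "FileCommitter"

        conf.setOutputCommitter(new MongoCommitter());
        System.out.println(forOldApi(conf).name());        // prints "MongoCommitter"
    }
}
```

The model makes the usability gap visible: in the newer API the format carries the right committer with it, while in the older API the user must remember to set it on the configuration, which is the motivation given for an extra constructor.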