[jira] [Commented] (FLINK-2394) HadoopOutFormat OutputCommitter is default to FileOutputCommiter

2015-08-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14713022#comment-14713022
 ] 

ASF GitHub Bot commented on FLINK-2394:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1056#issuecomment-134982359
  
LGTM. +1 for merging.


 HadoopOutFormat OutputCommitter is default to FileOutputCommiter
 

 Key: FLINK-2394
 URL: https://issues.apache.org/jira/browse/FLINK-2394
 Project: Flink
  Issue Type: Bug
  Components: Hadoop Compatibility
Affects Versions: 0.9.0
Reporter: Stefano Bortoli
Assignee: Fabian Hueske
 Fix For: 0.10, 0.9.1


 MongoOutputFormat does not write back in collection because the 
 HadoopOutputFormat wrapper does not allow to set the MongoOutputCommiter and 
 is set as default to FileOutputCommitter. Therefore, on close and 
 globalFinalize execution the commit does not happen and mongo collection 
 stays untouched. 
 A simple solution would be to:
 1 - create a constructor of HadoopOutputFormatBase and HadoopOutputFormat 
 that gets the OutputCommitter as a parameter
 2 - change the outputCommitter field of HadoopOutputFormatBase to be a 
 generic OutputCommitter
 3 - remove the default assignment in the open() and finalizeGlobal to the 
 outputCommitter to FileOutputCommitter(), or keep it as a default in case of 
 no specific assignment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2394) HadoopOutFormat OutputCommitter is default to FileOutputCommiter

2015-08-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14712929#comment-14712929
 ] 

ASF GitHub Bot commented on FLINK-2394:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1056#discussion_r37968466
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala
 ---
@@ -18,11 +18,17 @@
 package org.apache.flink.api.scala.hadoop.mapred
 
 import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase
-import org.apache.hadoop.mapred.{JobConf, OutputFormat}
+import org.apache.hadoop.mapred.{OutputCommitter, JobConf, OutputFormat}
 
 class HadoopOutputFormat[K, V](mapredOutputFormat: OutputFormat[K, V], 
job: JobConf)
   extends HadoopOutputFormatBase[K, V, (K, V)](mapredOutputFormat, job) {
 
+  def this(mapredOutputFormat: OutputFormat[K, V], outputCommitterClass: 
Class[OutputCommitter],
+   job: JobConf) {
--- End diff --

Can we use multi line parameter lists as in other Scala files?


 HadoopOutFormat OutputCommitter is default to FileOutputCommiter
 

 Key: FLINK-2394
 URL: https://issues.apache.org/jira/browse/FLINK-2394
 Project: Flink
  Issue Type: Bug
  Components: Hadoop Compatibility
Affects Versions: 0.9.0
Reporter: Stefano Bortoli
Assignee: Fabian Hueske
 Fix For: 0.10, 0.9.1


 MongoOutputFormat does not write back in collection because the 
 HadoopOutputFormat wrapper does not allow to set the MongoOutputCommiter and 
 is set as default to FileOutputCommitter. Therefore, on close and 
 globalFinalize execution the commit does not happen and mongo collection 
 stays untouched. 
 A simple solution would be to:
 1 - create a constructor of HadoopOutputFormatBase and HadoopOutputFormat 
 that gets the OutputCommitter as a parameter
 2 - change the outputCommitter field of HadoopOutputFormatBase to be a 
 generic OutputCommitter
 3 - remove the default assignment in the open() and finalizeGlobal to the 
 outputCommitter to FileOutputCommitter(), or keep it as a default in case of 
 no specific assignment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2394) HadoopOutFormat OutputCommitter is default to FileOutputCommiter

2015-08-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14712936#comment-14712936
 ] 

ASF GitHub Bot commented on FLINK-2394:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1056#discussion_r37969330
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala
 ---
@@ -18,11 +18,17 @@
 package org.apache.flink.api.scala.hadoop.mapred
 
 import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase
-import org.apache.hadoop.mapred.{JobConf, OutputFormat}
+import org.apache.hadoop.mapred.{OutputCommitter, JobConf, OutputFormat}
 
 class HadoopOutputFormat[K, V](mapredOutputFormat: OutputFormat[K, V], 
job: JobConf)
   extends HadoopOutputFormatBase[K, V, (K, V)](mapredOutputFormat, job) {
 
+  def this(mapredOutputFormat: OutputFormat[K, V], outputCommitterClass: 
Class[OutputCommitter],
+   job: JobConf) {
--- End diff --

Fixed. 
I'd propose to add this to the Scala checkstyle, if we want to enforce it.


 HadoopOutFormat OutputCommitter is default to FileOutputCommiter
 

 Key: FLINK-2394
 URL: https://issues.apache.org/jira/browse/FLINK-2394
 Project: Flink
  Issue Type: Bug
  Components: Hadoop Compatibility
Affects Versions: 0.9.0
Reporter: Stefano Bortoli
Assignee: Fabian Hueske
 Fix For: 0.10, 0.9.1


 MongoOutputFormat does not write back in collection because the 
 HadoopOutputFormat wrapper does not allow to set the MongoOutputCommiter and 
 is set as default to FileOutputCommitter. Therefore, on close and 
 globalFinalize execution the commit does not happen and mongo collection 
 stays untouched. 
 A simple solution would be to:
 1 - create a constructor of HadoopOutputFormatBase and HadoopOutputFormat 
 that gets the OutputCommitter as a parameter
 2 - change the outputCommitter field of HadoopOutputFormatBase to be a 
 generic OutputCommitter
 3 - remove the default assignment in the open() and finalizeGlobal to the 
 outputCommitter to FileOutputCommitter(), or keep it as a default in case of 
 no specific assignment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2394) HadoopOutFormat OutputCommitter is default to FileOutputCommiter

2015-08-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14713023#comment-14713023
 ] 

ASF GitHub Bot commented on FLINK-2394:
---

Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1056#issuecomment-134982439
  
This looks good. I'm merging this to the release branch. We have an issue 
to add tests for the output formats, so it's fine that this does not include a 
test yet.


 HadoopOutFormat OutputCommitter is default to FileOutputCommiter
 

 Key: FLINK-2394
 URL: https://issues.apache.org/jira/browse/FLINK-2394
 Project: Flink
  Issue Type: Bug
  Components: Hadoop Compatibility
Affects Versions: 0.9.0
Reporter: Stefano Bortoli
Assignee: Fabian Hueske
 Fix For: 0.10, 0.9.1


 MongoOutputFormat does not write back in collection because the 
 HadoopOutputFormat wrapper does not allow to set the MongoOutputCommiter and 
 is set as default to FileOutputCommitter. Therefore, on close and 
 globalFinalize execution the commit does not happen and mongo collection 
 stays untouched. 
 A simple solution would be to:
 1 - create a constructor of HadoopOutputFormatBase and HadoopOutputFormat 
 that gets the OutputCommitter as a parameter
 2 - change the outputCommitter field of HadoopOutputFormatBase to be a 
 generic OutputCommitter
 3 - remove the default assignment in the open() and finalizeGlobal to the 
 outputCommitter to FileOutputCommitter(), or keep it as a default in case of 
 no specific assignment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2394) HadoopOutFormat OutputCommitter is default to FileOutputCommiter

2015-08-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14713026#comment-14713026
 ] 

ASF GitHub Bot commented on FLINK-2394:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1056


 HadoopOutFormat OutputCommitter is default to FileOutputCommiter
 

 Key: FLINK-2394
 URL: https://issues.apache.org/jira/browse/FLINK-2394
 Project: Flink
  Issue Type: Bug
  Components: Hadoop Compatibility
Affects Versions: 0.9.0
Reporter: Stefano Bortoli
Assignee: Fabian Hueske
 Fix For: 0.10, 0.9.1


 MongoOutputFormat does not write back in collection because the 
 HadoopOutputFormat wrapper does not allow to set the MongoOutputCommiter and 
 is set as default to FileOutputCommitter. Therefore, on close and 
 globalFinalize execution the commit does not happen and mongo collection 
 stays untouched. 
 A simple solution would be to:
 1 - create a constructor of HadoopOutputFormatBase and HadoopOutputFormat 
 that gets the OutputCommitter as a parameter
 2 - change the outputCommitter field of HadoopOutputFormatBase to be a 
 generic OutputCommitter
 3 - remove the default assignment in the open() and finalizeGlobal to the 
 outputCommitter to FileOutputCommitter(), or keep it as a default in case of 
 no specific assignment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2394) HadoopOutFormat OutputCommitter is default to FileOutputCommiter

2015-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14711474#comment-14711474
 ] 

ASF GitHub Bot commented on FLINK-2394:
---

GitHub user fhueske opened a pull request:

https://github.com/apache/flink/pull/1056

[FLINK-2394] [fix] HadoopOutputFormats use correct OutputCommitters.

Right now, Flink's wrappers for Hadoop OutputFormats always use a 
`FileOutputCommitter`.

- In the `mapreduce` API, Hadoop OutputFormats have a method 
`getOutputCommitter()` which can be overwritten and returns the 
`FileOutputFormat` by default.
- In the `mapred`API, the `OutputCommitter` should be obtained from the 
`JobConf`. If nothing custom is set, a `FileOutputCommitter` is returned.

This PR uses the respective methods to obtain the correct 
`OutputCommitter`. Since, `FileOutputCommitter` is the default in both cases, 
the original semantics are preserved if no custom committer is implemented or 
set by the user.
I also added convenience methods to the constructors of the `mapred` 
wrappers to set the `OutputCommitter` in the `JobConf`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhueske/flink hadoopOutCommitter

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1056.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1056


commit a632203a948f2e7973339a0eab88750f7ce70cc5
Author: Fabian Hueske fhue...@apache.org
Date:   2015-07-30T19:47:01Z

[FLINK-2394] [fix] HadoopOutputFormats use correct OutputCommitters.




 HadoopOutFormat OutputCommitter is default to FileOutputCommiter
 

 Key: FLINK-2394
 URL: https://issues.apache.org/jira/browse/FLINK-2394
 Project: Flink
  Issue Type: Bug
  Components: Hadoop Compatibility
Affects Versions: 0.9.0
Reporter: Stefano Bortoli
Assignee: Fabian Hueske
 Fix For: 0.10, 0.9.1


 MongoOutputFormat does not write back in collection because the 
 HadoopOutputFormat wrapper does not allow to set the MongoOutputCommiter and 
 is set as default to FileOutputCommitter. Therefore, on close and 
 globalFinalize execution the commit does not happen and mongo collection 
 stays untouched. 
 A simple solution would be to:
 1 - create a constructor of HadoopOutputFormatBase and HadoopOutputFormat 
 that gets the OutputCommitter as a parameter
 2 - change the outputCommitter field of HadoopOutputFormatBase to be a 
 generic OutputCommitter
 3 - remove the default assignment in the open() and finalizeGlobal to the 
 outputCommitter to FileOutputCommitter(), or keep it as a default in case of 
 no specific assignment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2394) HadoopOutFormat OutputCommitter is default to FileOutputCommiter

2015-07-24 Thread Stefano Bortoli (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14640007#comment-14640007
 ] 

Stefano Bortoli commented on FLINK-2394:


I see. I was in fact a little disappointed with mongo-hadoop for having a 
method getOutputCommitter() which was not standard in the OutputFormat. 
However, my first implementation was using the mapred Hadoop, so no big 
surprises.

I would say that a good idea could be to simply have either 2 
HadoopOutputFormatBase classes (1 per version of Hadoop), or handle the hadoop 
version to get the OutputCommitter accordingly.

Meanwhile, I have implemented my own MongoHadoopOutputFormat extending the 
HadoopOutputFormat and overriding the open and close methods replacing the 
FileOutputCommitter with the MongoOutputCommiter. 

 HadoopOutFormat OutputCommitter is default to FileOutputCommiter
 

 Key: FLINK-2394
 URL: https://issues.apache.org/jira/browse/FLINK-2394
 Project: Flink
  Issue Type: Bug
  Components: Hadoop Compatibility
Affects Versions: 0.9.0
Reporter: Stefano Bortoli

 MongoOutputFormat does not write back in collection because the 
 HadoopOutputFormat wrapper does not allow to set the MongoOutputCommiter and 
 is set as default to FileOutputCommitter. Therefore, on close and 
 globalFinalize execution the commit does not happen and mongo collection 
 stays untouched. 
 A simple solution would be to:
 1 - create a constructor of HadoopOutputFormatBase and HadoopOutputFormat 
 that gets the OutputCommitter as a parameter
 2 - change the outputCommitter field of HadoopOutputFormatBase to be a 
 generic OutputCommitter
 3 - remove the default assignment in the open() and finalizeGlobal to the 
 outputCommitter to FileOutputCommitter(), or keep it as a default in case of 
 no specific assignment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2394) HadoopOutFormat OutputCommitter is default to FileOutputCommiter

2015-07-24 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14640999#comment-14640999
 ] 

Fabian Hueske commented on FLINK-2394:
--

Hi [~stefano.bortoli], we do already have two HadoopOutputFormatBase classes, 
one for each Hadoop API. So treating both APIs differently is not a problem. 
The issue is that one API supports different OutputCommitters out-of-the-box 
(mapreduce) and the other one requires that the OutputCommitter is explicitly 
set (mapred), unless I overlooked something.

 HadoopOutFormat OutputCommitter is default to FileOutputCommiter
 

 Key: FLINK-2394
 URL: https://issues.apache.org/jira/browse/FLINK-2394
 Project: Flink
  Issue Type: Bug
  Components: Hadoop Compatibility
Affects Versions: 0.9.0
Reporter: Stefano Bortoli

 MongoOutputFormat does not write back in collection because the 
 HadoopOutputFormat wrapper does not allow to set the MongoOutputCommiter and 
 is set as default to FileOutputCommitter. Therefore, on close and 
 globalFinalize execution the commit does not happen and mongo collection 
 stays untouched. 
 A simple solution would be to:
 1 - create a constructor of HadoopOutputFormatBase and HadoopOutputFormat 
 that gets the OutputCommitter as a parameter
 2 - change the outputCommitter field of HadoopOutputFormatBase to be a 
 generic OutputCommitter
 3 - remove the default assignment in the open() and finalizeGlobal to the 
 outputCommitter to FileOutputCommitter(), or keep it as a default in case of 
 no specific assignment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2394) HadoopOutFormat OutputCommitter is default to FileOutputCommiter

2015-07-23 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14639545#comment-14639545
 ] 

Fabian Hueske commented on FLINK-2394:
--

Hadoop handles {{OutputCommitters}} differently for its both APIs ({{mapred}} 
and {{mapreduce}}).

- For the newer {{mapreduce}} API, things are quite simple.
{{org.apache.hadoop.mapreduce.OutputFormat}} has an abstract method 
{{getOutputCommitter(TaskAttemptContext context)}} which provides the output 
committer that must be used. So updating the Flink 
{{mapreduce.HadoopOutputFormatBase}} class such that it gets the 
{{OutputCommitter}} directly from the {{OutputFormat}} instead of creating a 
{{FileOutputCommitter}} should be easy.

- For the older {{mapred}} API, the OutputFormat interface does not define a 
{{getOutputCommitter()}} method. Instead the {{JobConf}} has a 
{{getOutputCommitter()}} method which returns a {{FileOutputCommitter}} if 
nothing else is defined. So we could update the 
{{mapred.HadoopOutputFormatBase}} to get the {{OutputCommitter}} from the 
{{JobConf}}. However, this would require that users manually set a different 
{{OutputCommitter}} in the {{JobConf}} which is not really intuitive. We could 
offer an additional constructor which sets a {{OutputCommitter}} in the 
provided {{JobConf}} argument, like [~stefano.bortoli] suggested.

 HadoopOutFormat OutputCommitter is default to FileOutputCommiter
 

 Key: FLINK-2394
 URL: https://issues.apache.org/jira/browse/FLINK-2394
 Project: Flink
  Issue Type: Bug
  Components: Hadoop Compatibility
Affects Versions: 0.9.0
Reporter: Stefano Bortoli

 MongoOutputFormat does not write back in collection because the 
 HadoopOutputFormat wrapper does not allow to set the MongoOutputCommiter and 
 is set as default to FileOutputCommitter. Therefore, on close and 
 globalFinalize execution the commit does not happen and mongo collection 
 stays untouched. 
 A simple solution would be to:
 1 - create a constructor of HadoopOutputFormatBase and HadoopOutputFormat 
 that gets the OutputCommitter as a parameter
 2 - change the outputCommitter field of HadoopOutputFormatBase to be a 
 generic OutputCommitter
 3 - remove the default assignment in the open() and finalizeGlobal to the 
 outputCommitter to FileOutputCommitter(), or keep it as a default in case of 
 no specific assignment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)