Re: Using Hadoop Input/Output formats

2015-12-04 Thread Nick Dimiduk
Thanks for the comments, everyone. For my part, I'm most interested in using
Hadoop's OutputFormats for writing out data at the end of a streaming job.
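
As a sketch of what I mean, something like the following ought to work, reusing
Fabian's wrapper approach on the output side. This is untested: the output path
and types are placeholders, the use of writeUsingOutputFormat is my assumption,
and note that an unbounded job never reaches the format's finalize/commit step.

import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat
import org.apache.flink.streaming.api.scala._
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat}

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Some (key, value) stream to write out; the source here is a placeholder.
val results: DataStream[(Text, IntWritable)] = env
  .fromElements(("a", 1), ("b", 2))
  .map { kv => (new Text(kv._1), new IntWritable(kv._2)) }

// Configure the Hadoop OutputFormat through a JobConf, as in plain MapReduce.
val jobConf = new JobConf()
FileOutputFormat.setOutputPath(jobConf, new Path("hdfs:///tmp/out")) // placeholder

// Wrap the mapred TextOutputFormat and write each record through it.
results.writeUsingOutputFormat(
  new HadoopOutputFormat[Text, IntWritable](
    new TextOutputFormat[Text, IntWritable],
    jobConf))

env.execute("hadoop-output-sketch")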

I also agree that while these "convenience methods" make for good example
code in slide decks, they're often not helpful for "real" applications. The
additional maintenance burden of a bloated API tends to be
counter-productive.

-n

On Wed, Nov 25, 2015 at 8:41 AM, Robert Metzger  wrote:

> I agree with Stephan.
>
> Reading static files is quite uncommon with the DataStream API. Before we
> add such a method, we should add a convenience method for Kafka ;)
> But in general, I'm not a big fan of adding too many of these methods
> because they pull in so many external classes, which leads to breaking API
> changes, dependency issues, etc.
>
> I think such issues can be addressed easily with good documentation
> (maybe in the "Best practices" guide), good answers on Stack Overflow, and
> so on.
>
> On Wed, Nov 25, 2015 at 12:12 PM, Stephan Ewen  wrote:
>
>> For streaming, I am a bit torn about whether reading a file should have
>> such prominent functions. Most streaming programs work on message
>> queues, or on monitored directories.
>>
>> Not saying no, but not sure DataSet/DataStream parity is the main goal -
>> they are for different use cases after all...
>>
>> On Wed, Nov 25, 2015 at 8:22 AM, Chiwan Park 
>> wrote:
>>
>>> Thanks for the correction, @Fabian. :)
>>>
>>> > On Nov 25, 2015, at 4:40 AM, Suneel Marthi  wrote:
>>> >
>>> > I guess it makes sense to add readHadoopXXX() methods to
>>> StreamExecutionEnvironment (for feature parity with what presently exists
>>> in ExecutionEnvironment).
>>> >
>>> > Also, FLINK-2949 addresses the need to add relevant syntactic sugar
>>> wrappers in the DataSet API for the code snippet in Fabian's previous email.
>>> It's not cool having to instantiate a JobConf in client code and having to
>>> pass that around.
>>> >
>>> >
>>> >
>>> > On Tue, Nov 24, 2015 at 2:26 PM, Fabian Hueske 
>>> wrote:
>>> > Hi Nick,
>>> >
>>> > you can also use Flink's HadoopInputFormat wrappers for the DataStream
>>> API. However, DataStream does not offer as much "sugar" as DataSet because
>>> StreamExecutionEnvironment does not offer dedicated createHadoopInput or
>>> readHadoopFile methods.
>>> >
>>> > In the Scala DataStream API, you can read from a Hadoop InputFormat
>>> (TextInputFormat in this case) as follows:
>>> >
>>> > val textData: DataStream[(LongWritable, Text)] = env.createInput(
>>> >   new HadoopInputFormat[LongWritable, Text](
>>> >     new TextInputFormat,
>>> >     classOf[LongWritable],
>>> >     classOf[Text],
>>> >     new JobConf()
>>> >   ))
>>> >
>>> > The Java version is very similar.
>>> >
>>> > Note: Flink has wrappers for both MR APIs: mapred and mapreduce.
>>> >
>>> > Cheers,
>>> > Fabian
>>> >
>>> > 2015-11-24 19:36 GMT+01:00 Chiwan Park :
>>> > I'm not a streaming expert. AFAIK, the layer can only be used with
>>> DataSet. There are some streaming-specific features in Flink, such as
>>> distributed snapshots, which need support from sources and sinks. So you
>>> would have to implement the I/O yourself.
>>> >
>>> > > On Nov 25, 2015, at 3:22 AM, Nick Dimiduk 
>>> wrote:
>>> > >
>>> > > I completely missed this, thanks Chiwan. Can these be used with
>>> DataStreams as well as DataSets?
>>> > >
>>> > > On Tue, Nov 24, 2015 at 10:06 AM, Chiwan Park 
>>> wrote:
>>> > > Hi Nick,
>>> > >
>>> > > You can use Hadoop Input/Output Formats without modification! Please
>>> check the documentation [1] on the Flink homepage.
>>> > >
>>> > > [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/hadoop_compatibility.html
>>> > >
>>> > > > On Nov 25, 2015, at 3:04 AM, Nick Dimiduk 
>>> wrote:
>>> > > >
>>> > > > Hello,
>>> > > >
>>> > > > Is it possible to use existing Hadoop Input and OutputFormats with
>>> Flink? There's a lot of existing code that conforms to these interfaces;
>>> it seems a shame to have to re-implement it all. Perhaps some adapter shim?
>>> > > >
>>> > > > Thanks,
>>> > > > Nick
>>> > >
>>> > > Regards,
>>> > > Chiwan Park
>>> > >
>>> > >
>>> >
>>> > Regards,
>>> > Chiwan Park
>>> >
>>>
>>> Regards,
>>> Chiwan Park
>>>
>>>
>>>
>>>
>>
>


Re: Using Hadoop Input/Output formats

2015-11-25 Thread Stephan Ewen
For streaming, I am a bit torn about whether reading a file should have
such prominent functions. Most streaming programs work on message
queues, or on monitored directories.

Not saying no, but not sure DataSet/DataStream parity is the main goal -
they are for different use cases after all...

On Wed, Nov 25, 2015 at 8:22 AM, Chiwan Park  wrote:

> Thanks for the correction, @Fabian. :)
>
> > On Nov 25, 2015, at 4:40 AM, Suneel Marthi  wrote:
> >
> > I guess it makes sense to add readHadoopXXX() methods to
> StreamExecutionEnvironment (for feature parity with what presently exists
> in ExecutionEnvironment).
> >
> > Also, FLINK-2949 addresses the need to add relevant syntactic sugar
> wrappers in the DataSet API for the code snippet in Fabian's previous email.
> It's not cool having to instantiate a JobConf in client code and having to
> pass that around.
> >
> >
> >
> > On Tue, Nov 24, 2015 at 2:26 PM, Fabian Hueske 
> wrote:
> > Hi Nick,
> >
> > you can also use Flink's HadoopInputFormat wrappers for the DataStream
> API. However, DataStream does not offer as much "sugar" as DataSet because
> StreamExecutionEnvironment does not offer dedicated createHadoopInput or
> readHadoopFile methods.
> >
> > In the Scala DataStream API, you can read from a Hadoop InputFormat
> (TextInputFormat in this case) as follows:
> >
> > val textData: DataStream[(LongWritable, Text)] = env.createInput(
> >   new HadoopInputFormat[LongWritable, Text](
> >     new TextInputFormat,
> >     classOf[LongWritable],
> >     classOf[Text],
> >     new JobConf()
> >   ))
> >
> > The Java version is very similar.
> >
> > Note: Flink has wrappers for both MR APIs: mapred and mapreduce.
> >
> > Cheers,
> > Fabian
> >
> > 2015-11-24 19:36 GMT+01:00 Chiwan Park :
> > I'm not a streaming expert. AFAIK, the layer can only be used with
> DataSet. There are some streaming-specific features in Flink, such as
> distributed snapshots, which need support from sources and sinks. So you
> would have to implement the I/O yourself.
> >
> > > On Nov 25, 2015, at 3:22 AM, Nick Dimiduk  wrote:
> > >
> > > I completely missed this, thanks Chiwan. Can these be used with
> DataStreams as well as DataSets?
> > >
> > > On Tue, Nov 24, 2015 at 10:06 AM, Chiwan Park 
> wrote:
> > > Hi Nick,
> > >
> > > You can use Hadoop Input/Output Formats without modification! Please
> check the documentation [1] on the Flink homepage.
> > >
> > > [1]
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/hadoop_compatibility.html
> > >
> > > > On Nov 25, 2015, at 3:04 AM, Nick Dimiduk 
> wrote:
> > > >
> > > > Hello,
> > > >
> > > > Is it possible to use existing Hadoop Input and OutputFormats with
> Flink? There's a lot of existing code that conforms to these interfaces;
> it seems a shame to have to re-implement it all. Perhaps some adapter shim?
> > > >
> > > > Thanks,
> > > > Nick
> > >
> > > Regards,
> > > Chiwan Park
> > >
> > >
> >
> > Regards,
> > Chiwan Park
> >
>
> Regards,
> Chiwan Park
>
>
>
>


Re: Using Hadoop Input/Output formats

2015-11-24 Thread Chiwan Park
Thanks for the correction, @Fabian. :)

> On Nov 25, 2015, at 4:40 AM, Suneel Marthi  wrote:
> 
> I guess it makes sense to add readHadoopXXX() methods to
> StreamExecutionEnvironment (for feature parity with what presently exists
> in ExecutionEnvironment).
> 
> Also, FLINK-2949 addresses the need to add relevant syntactic sugar wrappers
> in the DataSet API for the code snippet in Fabian's previous email. It's not
> cool having to instantiate a JobConf in client code and having to pass that
> around.
> 
> 
> 
> On Tue, Nov 24, 2015 at 2:26 PM, Fabian Hueske  wrote:
> Hi Nick,
> 
> you can also use Flink's HadoopInputFormat wrappers for the DataStream API.
> However, DataStream does not offer as much "sugar" as DataSet because
> StreamExecutionEnvironment does not offer dedicated createHadoopInput or
> readHadoopFile methods.
> 
> In the Scala DataStream API, you can read from a Hadoop InputFormat
> (TextInputFormat in this case) as follows:
> 
> val textData: DataStream[(LongWritable, Text)] = env.createInput(
>   new HadoopInputFormat[LongWritable, Text](
>     new TextInputFormat,
>     classOf[LongWritable],
>     classOf[Text],
>     new JobConf()
>   ))
> 
> The Java version is very similar.
> 
> Note: Flink has wrappers for both MR APIs: mapred and mapreduce.
> 
> Cheers,
> Fabian
> 
> 2015-11-24 19:36 GMT+01:00 Chiwan Park :
> I'm not a streaming expert. AFAIK, the layer can only be used with DataSet.
> There are some streaming-specific features in Flink, such as distributed
> snapshots, which need support from sources and sinks. So you would have to
> implement the I/O yourself.
> 
> > On Nov 25, 2015, at 3:22 AM, Nick Dimiduk  wrote:
> >
> > I completely missed this, thanks Chiwan. Can these be used with DataStreams 
> > as well as DataSets?
> >
> > On Tue, Nov 24, 2015 at 10:06 AM, Chiwan Park  wrote:
> > Hi Nick,
> >
> > You can use Hadoop Input/Output Formats without modification! Please check
> > the documentation [1] on the Flink homepage.
> >
> > [1] 
> > https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/hadoop_compatibility.html
> >
> > > On Nov 25, 2015, at 3:04 AM, Nick Dimiduk  wrote:
> > >
> > > Hello,
> > >
> > > Is it possible to use existing Hadoop Input and OutputFormats with Flink?
> > > There's a lot of existing code that conforms to these interfaces; it seems
> > > a shame to have to re-implement it all. Perhaps some adapter shim?
> > >
> > > Thanks,
> > > Nick
> >
> > Regards,
> > Chiwan Park
> >
> >
> 
> Regards,
> Chiwan Park
> 

Regards,
Chiwan Park





Re: Using Hadoop Input/Output formats

2015-11-24 Thread Nick Dimiduk
I completely missed this, thanks Chiwan. Can these be used with DataStreams
as well as DataSets?

On Tue, Nov 24, 2015 at 10:06 AM, Chiwan Park  wrote:

> Hi Nick,
>
> You can use Hadoop Input/Output Formats without modification! Please check
> the documentation [1] on the Flink homepage.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/hadoop_compatibility.html
>
> > On Nov 25, 2015, at 3:04 AM, Nick Dimiduk  wrote:
> >
> > Hello,
> >
> > Is it possible to use existing Hadoop Input and OutputFormats with
> Flink? There's a lot of existing code that conforms to these interfaces;
> it seems a shame to have to re-implement it all. Perhaps some adapter shim?
> >
> > Thanks,
> > Nick
>
> Regards,
> Chiwan Park
>
>
>
>


Re: Using Hadoop Input/Output formats

2015-11-24 Thread Chiwan Park
I'm not a streaming expert. AFAIK, the layer can only be used with DataSet.
There are some streaming-specific features in Flink, such as distributed
snapshots, which need support from sources and sinks. So you would have to
implement the I/O yourself.

> On Nov 25, 2015, at 3:22 AM, Nick Dimiduk  wrote:
> 
> I completely missed this, thanks Chiwan. Can these be used with DataStreams 
> as well as DataSets?
> 
> On Tue, Nov 24, 2015 at 10:06 AM, Chiwan Park  wrote:
> Hi Nick,
> 
> You can use Hadoop Input/Output Formats without modification! Please check
> the documentation [1] on the Flink homepage.
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/hadoop_compatibility.html
> 
> > On Nov 25, 2015, at 3:04 AM, Nick Dimiduk  wrote:
> >
> > Hello,
> >
> > Is it possible to use existing Hadoop Input and OutputFormats with Flink?
> > There's a lot of existing code that conforms to these interfaces; it seems
> > a shame to have to re-implement it all. Perhaps some adapter shim?
> >
> > Thanks,
> > Nick
> 
> Regards,
> Chiwan Park
> 
> 

Regards,
Chiwan Park





Re: Using Hadoop Input/Output formats

2015-11-24 Thread Chiwan Park
Hi Nick,

You can use Hadoop Input/Output Formats without modification! Please check the
documentation [1] on the Flink homepage.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/hadoop_compatibility.html
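
For the DataSet API, the usage from that page boils down to something like this
(a quick sketch from memory, with a placeholder input path):

import org.apache.flink.api.scala._
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

val env = ExecutionEnvironment.getExecutionEnvironment

// readHadoopFile wraps a mapred FileInputFormat; the path is a placeholder.
val input: DataSet[(LongWritable, Text)] = env.readHadoopFile(
  new TextInputFormat,
  classOf[LongWritable],
  classOf[Text],
  "hdfs:///tmp/input")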

> On Nov 25, 2015, at 3:04 AM, Nick Dimiduk  wrote:
> 
> Hello,
> 
> Is it possible to use existing Hadoop Input and OutputFormats with Flink?
> There's a lot of existing code that conforms to these interfaces; it seems a
> shame to have to re-implement it all. Perhaps some adapter shim?
> 
> Thanks,
> Nick

Regards,
Chiwan Park





Re: Using Hadoop Input/Output formats

2015-11-24 Thread Fabian Hueske
Hi Nick,

you can also use Flink's HadoopInputFormat wrappers for the DataStream API.
However, DataStream does not offer as much "sugar" as DataSet because
StreamExecutionEnvironment does not offer dedicated createHadoopInput or
readHadoopFile methods.

In the Scala DataStream API, you can read from a Hadoop InputFormat
(TextInputFormat in this case) as follows:

// Imports added for completeness (this is the mapred-API wrapper):
import org.apache.flink.api.scala.hadoop.mapred.HadoopInputFormat
import org.apache.flink.streaming.api.scala._
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{JobConf, TextInputFormat}

val textData: DataStream[(LongWritable, Text)] = env.createInput(
  new HadoopInputFormat[LongWritable, Text](
    new TextInputFormat,
    classOf[LongWritable],
    classOf[Text],
    new JobConf()
  ))

The Java version is very similar.

Note: Flink has wrappers for both MR APIs: mapred and mapreduce.
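
For completeness, the mapreduce-API variant looks roughly like this (a sketch;
note that the wrapper comes from the hadoop.mapreduce package and takes a Job
instead of a JobConf, and the input path is a placeholder):

import org.apache.flink.api.scala.hadoop.mapreduce.HadoopInputFormat
import org.apache.flink.streaming.api.scala._
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}

// The Job carries the Hadoop configuration, including the input path.
val job = Job.getInstance()
FileInputFormat.addInputPath(job, new Path("hdfs:///tmp/input")) // placeholder

// env as above
val textData: DataStream[(LongWritable, Text)] = env.createInput(
  new HadoopInputFormat[LongWritable, Text](
    new TextInputFormat,
    classOf[LongWritable],
    classOf[Text],
    job))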

Cheers,
Fabian

2015-11-24 19:36 GMT+01:00 Chiwan Park :

> I'm not a streaming expert. AFAIK, the layer can only be used with DataSet.
> There are some streaming-specific features in Flink, such as distributed
> snapshots, which need support from sources and sinks. So you would have to
> implement the I/O yourself.
>
> > On Nov 25, 2015, at 3:22 AM, Nick Dimiduk  wrote:
> >
> > I completely missed this, thanks Chiwan. Can these be used with
> DataStreams as well as DataSets?
> >
> > On Tue, Nov 24, 2015 at 10:06 AM, Chiwan Park 
> wrote:
> > Hi Nick,
> >
> > You can use Hadoop Input/Output Formats without modification! Please
> check the documentation [1] on the Flink homepage.
> >
> > [1]
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/hadoop_compatibility.html
> >
> > > On Nov 25, 2015, at 3:04 AM, Nick Dimiduk  wrote:
> > >
> > > Hello,
> > >
> > > Is it possible to use existing Hadoop Input and OutputFormats with
> Flink? There's a lot of existing code that conforms to these interfaces;
> it seems a shame to have to re-implement it all. Perhaps some adapter shim?
> > >
> > > Thanks,
> > > Nick
> >
> > Regards,
> > Chiwan Park
> >
> >
>
> Regards,
> Chiwan Park
>
>
>
>


Re: Using Hadoop Input/Output formats

2015-11-24 Thread Suneel Marthi
I guess it makes sense to add readHadoopXXX() methods to
StreamExecutionEnvironment (for feature parity with what presently exists
in ExecutionEnvironment).

Also, FLINK-2949 addresses the need to add relevant syntactic sugar wrappers
in the DataSet API for the code snippet in Fabian's previous email. It's not
cool having to instantiate a JobConf in client code and having to pass
that around.
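
To make that concrete, the contrast is between today's client-side construction
and some hypothetical sugar (the readHadoopFile call below is made up to
illustrate the idea, not an actual or proposed signature):

// Today: wrapper and JobConf assembled by hand in client code.
val textData = env.createInput(
  new HadoopInputFormat[LongWritable, Text](
    new TextInputFormat, classOf[LongWritable], classOf[Text], new JobConf()))

// Hypothetically, StreamExecutionEnvironment sugar could collapse this to:
// val textData = env.readHadoopFile(
//   new TextInputFormat, classOf[LongWritable], classOf[Text], inputPath)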



On Tue, Nov 24, 2015 at 2:26 PM, Fabian Hueske  wrote:

> Hi Nick,
>
> you can also use Flink's HadoopInputFormat wrappers for the DataStream
> API. However, DataStream does not offer as much "sugar" as DataSet because
> StreamExecutionEnvironment does not offer dedicated createHadoopInput or
> readHadoopFile methods.
>
> In the Scala DataStream API, you can read from a Hadoop InputFormat
> (TextInputFormat in this case) as follows:
>
> val textData: DataStream[(LongWritable, Text)] = env.createInput(
>   new HadoopInputFormat[LongWritable, Text](
>     new TextInputFormat,
>     classOf[LongWritable],
>     classOf[Text],
>     new JobConf()
>   ))
>
> The Java version is very similar.
>
> Note: Flink has wrappers for both MR APIs: mapred and mapreduce.
>
> Cheers,
> Fabian
>
> 2015-11-24 19:36 GMT+01:00 Chiwan Park :
>
>> I'm not a streaming expert. AFAIK, the layer can only be used with DataSet.
>> There are some streaming-specific features in Flink, such as distributed
>> snapshots, which need support from sources and sinks. So you would have to
>> implement the I/O yourself.
>>
>> > On Nov 25, 2015, at 3:22 AM, Nick Dimiduk  wrote:
>> >
>> > I completely missed this, thanks Chiwan. Can these be used with
>> DataStreams as well as DataSets?
>> >
>> > On Tue, Nov 24, 2015 at 10:06 AM, Chiwan Park 
>> wrote:
>> > Hi Nick,
>> >
>> > You can use Hadoop Input/Output Formats without modification! Please
>> check the documentation [1] on the Flink homepage.
>> >
>> > [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/hadoop_compatibility.html
>> >
>> > > On Nov 25, 2015, at 3:04 AM, Nick Dimiduk 
>> wrote:
>> > >
>> > > Hello,
>> > >
>> > > Is it possible to use existing Hadoop Input and OutputFormats with
>> Flink? There's a lot of existing code that conforms to these interfaces;
>> it seems a shame to have to re-implement it all. Perhaps some adapter shim?
>> > >
>> > > Thanks,
>> > > Nick
>> >
>> > Regards,
>> > Chiwan Park
>> >
>> >
>>
>> Regards,
>> Chiwan Park
>>
>>
>>
>>
>