Upgrading to 0.8.1 helped, thx!

On 19.02.2015 22:08, Robert Metzger wrote:
Hey,

are you using Flink 0.8.0? I think we've added support for Hadoop input
formats with Scala in 0.8.1 and 0.9 (master).

The following code just printed the list of all page titles of the
Catalan Wikipedia for me ;)
(built against master)

def main(args: Array[String]) {

   val env = ExecutionEnvironment.getExecutionEnvironment
   val job = new JobConf()
   val hadoopInput = new TextInputFormat()
   FileInputFormat.addInputPath(job, new Path("/home/robert/Downloads/cawiki-20140407-all-titles.gz"))
   val lines = env.createHadoopInput(hadoopInput, classOf[LongWritable], classOf[Text], job)

   lines.print

   env.execute("Scala WordCount Example")
}




On Thu, Feb 19, 2015 at 9:56 PM, Sebastian <ssc.o...@googlemail.com> wrote:

    I tried to follow the example on the web page like this:

    ---------------------------------------------------------------------------

    implicit val env = ExecutionEnvironment.getExecutionEnvironment

    val job = Job.getInstance

    val hadoopInput = new HadoopInputFormat[LongWritable, Text](
       new TextInputFormat, classOf[LongWritable], classOf[Text], job)

    FileInputFormat.addInputPath(job, new Path("/home/ssc/pld-index.gz"))

    val lines: DataSet[Tuple2[LongWritable, Text]] =
         env.createInput(hadoopInput)

    val numLines = lines.map { _ => Tuple1(1) }
                           .sum(0)

    numLines.printToErr()

    env.execute()

    ---------------------------------------------------------------------------

    Unfortunately, I get the following exception, which I cannot resolve:

    Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Tuple needs to be parameterized by using generics.
             at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:258)
             at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:201)
             at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:188)
             at io.ssc.trackthetrackers.analysis.statistics.Playing$delayedInit$body.apply(Playing.scala:24)
             at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
             at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
             at scala.App$$anonfun$main$1.apply(App.scala:71)
             at scala.App$$anonfun$main$1.apply(App.scala:71)
             at scala.collection.immutable.List.foreach(List.scala:318)
             at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
             at scala.App$class.main(App.scala:71)
             at io.ssc.trackthetrackers.analysis.statistics.Playing$.main(Playing.scala:15)
             at io.ssc.trackthetrackers.analysis.statistics.Playing.main(Playing.scala)
             at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
             at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
             at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
             at java.lang.reflect.Method.invoke(Method.java:483)
             at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)

    Any tips on how to proceed?

    Best,
    Sebastian







    On 19.02.2015 21:36, Robert Metzger wrote:

        I just had a look at Hadoop's TextInputFormat.
        hadoop-common-2.2.0.jar contains the following compression
        codecs:

        org.apache.hadoop.io.compress.BZip2Codec
        org.apache.hadoop.io.compress.DefaultCodec
        org.apache.hadoop.io.compress.DeflateCodec
        org.apache.hadoop.io.compress.GzipCodec
        org.apache.hadoop.io.compress.Lz4Codec
        org.apache.hadoop.io.compress.SnappyCodec

        (See also CompressionCodecFactory). So you should be good to go.
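
For illustration only: Hadoop's CompressionCodecFactory picks a codec from the file name suffix, which is why a ".gz" input needs no extra configuration. The sketch below imitates that lookup in plain Scala without any Hadoop dependency. The object `CodecByExtension` and the method `codecFor` are hypothetical names, and the table lists only the default extensions of the codecs named above (DeflateCodec shares ".deflate" with DefaultCodec, so it is left out of the table).

```scala
object CodecByExtension {

  // Default file extensions of the codecs shipped in hadoop-common.
  // This is an illustrative table, not Hadoop's own data structure.
  val codecs: Seq[(String, String)] = Seq(
    ".bz2"     -> "org.apache.hadoop.io.compress.BZip2Codec",
    ".deflate" -> "org.apache.hadoop.io.compress.DefaultCodec",
    ".gz"      -> "org.apache.hadoop.io.compress.GzipCodec",
    ".lz4"     -> "org.apache.hadoop.io.compress.Lz4Codec",
    ".snappy"  -> "org.apache.hadoop.io.compress.SnappyCodec"
  )

  // Return the codec class name whose extension matches the end of the path,
  // or None if the file would be read uncompressed.
  def codecFor(path: String): Option[String] =
    codecs.collectFirst { case (ext, codec) if path.endsWith(ext) => codec }

  def main(args: Array[String]): Unit = {
    println(codecFor("/home/ssc/pld-index.gz"))
    println(codecFor("/home/ssc/pld-index.txt"))
  }
}
```

With real Hadoop on the classpath, the equivalent lookup goes through CompressionCodecFactory rather than a hand-written table.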


        On Thu, Feb 19, 2015 at 9:31 PM, Robert Metzger <rmetz...@apache.org> wrote:

             Hi,

             right now Flink itself only supports reading ".deflate"
             files. It's basically the same algorithm as gzip, but gzip
             files seem to have a header that makes the two formats
             incompatible.

             But you can easily use HadoopInputFormats with Flink. I'm
             sure there is a Hadoop IF for reading gzip'ed files.
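
To make the header difference concrete, here is a minimal sketch using only java.util.zip. It assumes that ".deflate" files carry the zlib framing that DeflaterOutputStream writes by default; the object and method names (`GzipVsDeflate`, `isGzip`, `inflate`) are made up for this example. It compresses the same bytes both ways, checks for the gzip magic bytes 0x1f 0x8b, and shows that a plain inflater rejects the gzip stream.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.util.zip.{DeflaterOutputStream, GZIPOutputStream, InflaterInputStream}
import scala.io.Source
import scala.util.Try

object GzipVsDeflate {

  // Compress with gzip framing (header + deflate stream + CRC trailer).
  def gzip(data: Array[Byte]): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val out = new GZIPOutputStream(bos)
    out.write(data)
    out.close()
    bos.toByteArray
  }

  // Compress with the default zlib framing of java.util.zip.
  def deflate(data: Array[Byte]): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val out = new DeflaterOutputStream(bos)
    out.write(data)
    out.close()
    bos.toByteArray
  }

  // gzip streams always start with the magic bytes 0x1f 0x8b.
  def isGzip(bytes: Array[Byte]): Boolean =
    bytes.length >= 2 && (bytes(0) & 0xff) == 0x1f && (bytes(1) & 0xff) == 0x8b

  // Try to read the bytes as a zlib/deflate stream; fails on gzip input.
  def inflate(bytes: Array[Byte]): Try[String] = Try {
    val in = new InflaterInputStream(new ByteArrayInputStream(bytes))
    try Source.fromInputStream(in, "UTF-8").mkString
    finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val text = "same algorithm, different framing".getBytes("UTF-8")
    println(s"gzip output has gzip magic:    ${isGzip(gzip(text))}")
    println(s"deflate output has gzip magic: ${isGzip(deflate(text))}")
    println(s"inflating the deflate bytes:   ${inflate(deflate(text))}")
    println(s"inflating the gzip bytes:      ${inflate(gzip(text))}")
  }
}
```

The last line prints a Failure, which is the incompatibility described above: the compressed payload is the same kind of stream, but the reader stops at the unexpected header.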


             Best,
             Robert


             On Thu, Feb 19, 2015 at 9:25 PM, Sebastian <ssc.o...@googlemail.com> wrote:

                 Hi,

                 does Flink support reading gzipped files? I haven't found
                 any info about this on the website.

                 Best,
                 Sebastian



