Re: streaming using DeserializationSchema

2016-02-12 Thread Martin Neumann
It's not only about testing; I will also need to run things against
different datasets. I want to reuse as much of the code as possible to load
the same data from a file instead of Kafka.

Is there a simple way of loading the data from a file using the same
conversion classes that I would use to transform it when I read it from
Kafka, or do I have to write a new Avro deserializer (InputFormat)?

On Fri, Feb 12, 2016 at 2:06 AM, Gyula Fóra  wrote:

> Hey,
>
> A very simple thing you could do is to set up a simple Kafka producer in a
> Java program that will feed the data into a topic. This also has the
> additional benefit that you are actually testing against Kafka.
>
> Cheers,
> Gyula
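
For illustration, a minimal standalone producer along these lines could look as
follows (a sketch only; the broker address, topic name, and the source of the
Avro-encoded records are assumptions, not details from this thread):

```
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestDataProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // broker address is an assumption for local testing
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            // loadAvroRecords() is a placeholder for reading the Avro-encoded
            // payloads from the dump file
            for (byte[] record : loadAvroRecords()) {
                producer.send(new ProducerRecord<byte[], byte[]>("test-topic", record));
            }
        }
    }

    private static Iterable<byte[]> loadAvroRecords() {
        // placeholder: read the dump (e.g. from HDFS or a local file) and
        // return the raw Avro-encoded record bytes
        return java.util.Collections.emptyList();
    }
}
```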
>
> Martin Neumann  wrote this (on Friday, 12 Feb 2016, at
> 0:20):
>
>> Hej,
>>
>> I have a stream program reading data from Kafka where the data is in
>> Avro. I have my own DeserializationSchema to deal with it.
>>
>> For testing reasons I want to read a dump from HDFS instead. Is there a
>> way to use the same DeserializationSchema to read from an Avro file stored
>> on HDFS?
>>
>> cheers Martin
>>
>


Re: consume kafka stream with flink

2016-02-12 Thread Robert Metzger
Hi Tanguy,

I would recommend referring to the documentation of the specific Flink
version you are using.

This is the documentation for 1.0-SNAPSHOT:
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/kafka.html
and this is the doc for 0.10.x:
https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html#apache-kafka

I would recommend using Flink 0.10.x and the FlinkKafkaConsumer082. With
Flink 1.0, the consumer will be renamed to FlinkKafkaConsumer08.
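
For reference, a minimal sketch of wiring up the 0.10.x consumer (the topic
name, connection properties, and group id below are placeholders, not values
from this thread):

```
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ReadFromKafka082 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "localhost:2181");  // placeholder
        props.setProperty("bootstrap.servers", "localhost:9092");  // placeholder
        props.setProperty("group.id", "my-consumer-group");        // placeholder

        // Consume the topic as plain strings; swap in your own DeserializationSchema as needed.
        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer082<>("my-topic", new SimpleStringSchema(), props));

        stream.print();
        env.execute("Read from Kafka with FlinkKafkaConsumer082");
    }
}
```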

Let me know if you have further questions.


Regards,
Robert


On Fri, Feb 12, 2016 at 12:07 PM, Tanguy Racinet 
wrote:

> Hello,
>
> I am currently trying to develop an algorithm for mining frequent item sets
> over a data stream.
> I am using Kafka to generate the stream; however, I cannot manage to link
> Flink to Kafka.
>
> The code presented here works, but only with Flink version 0.9.1:
>
> https://github.com/dataArtisans/kafka-example/blob/master/src/main/java/com/dataartisans/ReadFromKafka.java
> It doesn't work for me when using Flink 1.0-SNAPSHOT or the latest release.
>
> I tried several other things and was hoping someone could help me
> understand the differences between KafkaSource, FlinkKafkaConsumer,
> FlinkKafkaConsumer081 and FlinkKafkaConsumer082.
>
> Which one is the latest API? Is there another way to consume a Kafka stream
> with Flink that is even more up to date? Or should I use
> FlinkKafkaConsumer082 and stay with Flink version 0.9?
>
> Thank you for your help.
> Regards,
> Tanguy R.
>


Re: Compile issues with Flink 1.0-SNAPSHOT and Scala 2.11

2016-02-12 Thread Stephan Ewen
Hi Cory!

We found the problem. There is a development fork of Flink for Stream SQL,
whose CI infrastructure accidentally also deployed snapshots and overwrote
some of the proper master branch snapshots.

That's why the snapshots became inconsistent. We fixed that, and newer
snapshots should be online.
I hope this is resolved now.

Sorry for the inconvenience,
Stephan


On Fri, Feb 12, 2016 at 12:51 AM, Stephan Ewen  wrote:

> Hi!
>
> The CI system has just finished uploading a new snapshot. In that one,
> the scalatest dependency is now correctly at 2.11 again.
>
>
> https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-test-utils_2.11/1.0-SNAPSHOT/flink-test-utils_2.11-1.0-20160211.232156-288.pom
>
> I am very puzzled; we did not touch any parts that seem to affect this. I
> am wondering if it is possible that Maven had a hiccup...
>
> Can you retry (forcing a dependency update) and see if the dependencies are
> correct again?
>
>
> Greetings,
> Stephan
>
>
> On Fri, Feb 12, 2016 at 12:23 AM, Stephan Ewen  wrote:
>
>> Hi!
>>
>> I examined the Apache Snapshot Repository, and I could see that in the
>> latest snapshot a "scalatest_2.10" version was introduced. I could not yet
>> figure out how. I also could not find a "flink-core_2.10" or
>> "flink-annotations_2.10" dependency.
>>
>>
>> Previous snapshot:
>> https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-test-utils_2.11/1.0-SNAPSHOT/flink-test-utils_2.11-1.0-20160211.162913-286.pom
>>
>> Latest Snapshot:
>> https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-test-utils_2.11/1.0-SNAPSHOT/flink-test-utils_2.11-1.0-20160211.201205-287.pom
>>
>>
>> We'll try and fix this ASAP. Sorry for that, this is quite a mystery
>> right now...
>>
>> Best,
>> Stephan
>>
>> On Thu, Feb 11, 2016 at 11:56 PM, Cory Monty > > wrote:
>>
>>> Ufuk,
>>>
>>> Thanks for the link. I've double-checked everything in our dependencies
>>> list and it's all correct.
>>>
>>> Stephan,
>>>
>>> We don't explicitly depend on "flink-java", so there should be no
>>> suffix. It's curious to me that scalatest is showing up in the stack trace.
>>> I also tried clearing ~/.sbt/staging and it did not help. Our build server
>>> (CircleCI) is also experiencing the same issue, so I don't think it's local
>>> to my machine.
>>>
>>> On Thu, Feb 11, 2016 at 4:09 PM, Stephan Ewen  wrote:
>>>
 Hi Cory!

 Hmmm, curious... I just double-checked the code; there are no more
 references to a Scala-versioned "flink-core" or "flink-annotations"
 project in the code base.

 The projects you use with a Scala version suffix look good, actually.
 Just to be safe, can you check that the "flink-java" dependency is without
 a suffix?

 One other thing I can imagine is a mixed-up dependency cache. Can you
 try to refresh all snapshot dependencies (maybe clear "~/.sbt/staging/")?


 It is high time for a 1.0 release, so you will not need to work on the SNAPSHOT
 versions any more. That should really solve this version-conflict pain.
 If we are fast tomorrow, there may be a nice surprise coming up in the
 next few days...

 Greetings,
 Stephan


 On Thu, Feb 11, 2016 at 10:24 PM, Cory Monty <
 cory.mo...@getbraintree.com> wrote:

> Hmm. We don't explicitly include "flink-annotations" and we do not
> append the Scala suffix for "flink-core":
>
> `"org.apache.flink" % "flink-core" % "1.0-SNAPSHOT"`
>
> Here are the packages we currently include with a Scala suffix:
>
> flink-scala
> flink-clients
> flink-streaming-scala
> flink-connector-kafka-0.8
> flink-test-utils
> flink-streaming-contrib
>
> If there is any documentation you can point to regarding when to
> include the Scala suffix on Flink packages, let me know.
>
>
>
> On Thu, Feb 11, 2016 at 2:55 PM, Stephan Ewen 
> wrote:
>
>> Hi Cory!
>>
>> "flink-core" and "flink-annotations" should not have Scala suffixes,
>> because they do not depend on Scala.
>>
>> So far, we mark the Scala-independent projects without suffixes. Is
>> that very confusing, or does that interfere with build tools?
>>
>> Greetings,
>> Stephan
>>
>>
>> On Thu, Feb 11, 2016 at 9:50 PM, Cory Monty <
>> cory.mo...@getbraintree.com> wrote:
>>
>>> As of this afternoon, SBT is running into issues compiling with the
>>> following error:
>>>
>>> [error] Modules were resolved with conflicting cross-version
>>> suffixes in
>>> [error]org.scalatest:scalatest _2.10, _2.11
>>> [error]org.apache.flink:flink-core _2.11, 
>>> [error]org.apache.flink:flink-annotations _2.11, 
>>> java.lang.RuntimeException: Conflicting cross-version suffixes in:

consume kafka stream with flink

2016-02-12 Thread Tanguy Racinet
Hello,

I am currently trying to develop an algorithm for mining frequent item sets over a 
data stream.
I am using Kafka to generate the stream; however, I cannot manage to link Flink 
to Kafka.

The code presented here works, but only with Flink version 0.9.1:
https://github.com/dataArtisans/kafka-example/blob/master/src/main/java/com/dataartisans/ReadFromKafka.java
It doesn't work for me when using Flink 1.0-SNAPSHOT or the latest release.

I tried several other things and was hoping someone could help me understand 
the differences between KafkaSource, FlinkKafkaConsumer, 
FlinkKafkaConsumer081 and FlinkKafkaConsumer082.

Which one is the latest API? Is there another way to consume a Kafka stream with 
Flink that is even more up to date? Or should I use FlinkKafkaConsumer082 and 
stay with Flink version 0.9?

Thank you for your help.
Regards,
Tanguy R.

Re: How to convert List to flink DataSet

2016-02-12 Thread subash basnet
Hello Fabian,

Thank you for the response, but I have been stuck on how to iterate over
the DataSet, perform operations, and return a new, modified DataSet, similar
to the list operations shown below.
E.g., currently I am doing the following:

for (Centroid centroid : centroids.collect()) {
    for (Tuple2<Integer, Point> element : clusteredPoints.collect()) {
        // perform necessary operations
    }
    // add elements
}
// return elements list

It would be really nice if I could just get started.

I have been trying to add an element to a DataSet using *join*, but when I
print the DataSet it contains only the one initial element; it prints the same
value as the initial set value.

for (...) {
    newElement = new Tuple3<>();
    dataSetElement.join(env.fromElements(newElement));
    dataSetElement.print();
}

I am unsure whether I am using the right function or whether I am using join in
the wrong manner.
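
For reference, the nested-loop logic above can usually be expressed directly in
the DataSet API as a join of the clustered points with the centroids on the
centroid id. The following is only a sketch that assumes the types of the Flink
KMeans example (Tuple2<Integer, Point> points keyed by the assigned centroid id,
and a Centroid POJO with a public id field); the attached distance value is
illustrative:

```
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

// Sketch: join clustered points with their centroid and attach the distance,
// keeping everything as a DataSet instead of collect()-ing to the client.
DataSet<Tuple3<Centroid, Tuple2<Integer, Point>, Double>> elementsWithDistance =
    clusteredPoints
        .join(centroids)
        .where(0)          // Tuple2 field 0: the assigned centroid id
        .equalTo("id")     // Centroid POJO field "id"
        .with(new JoinFunction<Tuple2<Integer, Point>, Centroid,
                               Tuple3<Centroid, Tuple2<Integer, Point>, Double>>() {
            @Override
            public Tuple3<Centroid, Tuple2<Integer, Point>, Double> join(
                    Tuple2<Integer, Point> e, Centroid c) {
                return new Tuple3<>(c, e, e.f1.euclideanDistance(c));
            }
        });
```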

Best Regards,
Subash Basnet

On Wed, Feb 10, 2016 at 6:33 PM, Fabian Hueske  wrote:

> I would try to do the outlier computation with the DataSet API instead of
> fetching the results to the client with collect().
> If you do that, you can directly use writeAsCsv because the result is
> still a DataSet.
>
> What you have to do is translate your findOutliers method into DataSet
> API code.
>
> Best, Fabian
>
> 2016-02-10 18:29 GMT+01:00 subash basnet :
>
>> Hello Fabian,
>>
>> As written before, the code is:
>>
>> DataSet fElements =
>>     env.fromCollection(findOutliers(clusteredPoints, finalCentroids));
>> fElements.writeAsCsv(outputPath, "\n", " ");
>> env.execute("KMeans Example");
>>
>> I am very new to Flink, so I am not so clear about what you suggested. By
>> option (1), did you mean that I should write my own FileWriter here rather
>> than using the *writeAsCsv()* method? And for option (2), I couldn't
>> understand where to compute the outliers. I would want to use the
>> *writeAsCsv()* method, but currently it doesn't perform the write operation
>> and I am unable to understand why.
>>
>> An interesting thing I found is that when I run the *outlierDetection* class
>> from Eclipse, a single file *result* gets written within the kmeans
>> folder, whereas in the case of the default *KMeans* class it writes a result
>> folder within the kmeans folder, and the files with points are written
>> inside the result folder.
>> I give the necessary paths in the arguments while running.
>> E.g.: file:///home/softwares/flink-0.10.0/kmeans/points
>> file:///home/softwares/flink-0.10.0/kmeans/centers
>> file:///home/softwares/flink-0.10.0/kmeans/result 10
>>
>> Now, after I create the runnable jar files for the KMeans and outlierDetection
>> classes, when I upload them to the *Flink web submission client* it works fine
>> for *KMeans.jar*: the folder and files get created. But in the case of
>> *outlierDetection.jar* no file or folder gets written inside kmeans.
>>
>> How is it that the outlier class is able to write the file via Eclipse, but
>> the outlier jar is not able to write via the Flink web submission client?
>>
>>
>> Best Regards,
>> Subash Basnet
>>
>> On Wed, Feb 10, 2016 at 1:58 PM, Fabian Hueske  wrote:
>>
>>> Hi Subash,
>>>
>>> I would not fetch the data to the client, do the computation there, and
>>> send it back, just for the purpose of writing it to a file.
>>>
>>> Either 1) pull the results to the client and write the file from there
>>> or 2) compute the outliers in the cluster.
>>> I did not study your code completely, but the two nested loops and the
>>> condition are a join for example.
>>>
>>> I would go for option 2, if possible.
>>>
>>> Best, Fabian
>>>
>>>
>>> 2016-02-10 13:07 GMT+01:00 subash basnet :
>>>
 Hello Fabian,

 I use the collect() method to get the elements locally, perform
 operations on them, and return the result as a collection. The collection
 result is converted to a DataSet in the calling method.
 Below is the code of the *findOutliers* method:

 public static List findOutliers(DataSet<Tuple2<Integer, Point>> clusteredPoints,
         DataSet<Centroid> centroids) throws Exception {
     List finalElements = new ArrayList();
     List<Tuple2<Integer, Point>> elements = clusteredPoints.collect();
     List<Centroid> centroidList = centroids.collect();
     List<Tuple3<Centroid, Tuple2<Integer, Point>, Double>> elementsWithDistance =
             new ArrayList<Tuple3<Centroid, Tuple2<Integer, Point>, Double>>();
     for (Centroid centroid : centroidList) {
         elementsWithDistance = new ArrayList<Tuple3<Centroid, Tuple2<Integer, Point>, Double>>();
         double totalDistance = 0;
         int elementsCount = 0;
         for (Tuple2<Integer, Point> e : elements) {
             // compute distance
             if (e.f0 == centroid.id) {
                 Tuple3<Centroid, Tuple2<Integer, Point>, Double> newElement =
                         new Tuple3<Centroid, Tuple2<Integer, Point>, Double>();
                 double distance = e.f1.euclideanDistance(centroid);
                 totalDistance += distance;
                 newElement.setFields(centroid, e, distance);

Re: Merge or minus Dataset API missing

2016-02-12 Thread Flavio Pompermaier
Ah ok, I didn't know about it! Thanks Till and Fabian!

On Fri, Feb 12, 2016 at 5:11 PM, Fabian Hueske  wrote:

> Hi Flavio,
>
> If I got it right, you can use a full outer join.
> It will give you both elements on a match, and otherwise the left or right
> element together with null.
>
> Best, Fabian
>
> 2016-02-12 16:48 GMT+01:00 Flavio Pompermaier :
>
>> Hi to all,
>>
>> I have a use case where I have to merge two datasets, but I can't find a
>> direct DataSet API to do that.
>> I want to execute some function when there's a match, and otherwise pass on
>> the not-null element.
>> At the moment I can do this in a fairly complicated way (I want to avoid
>> broadcasting because the datasets could be big): using two leftOuterJoins plus
>> a union. Is there a simpler way?
>>
>>
>> Best,
>> Flavio
>>
>
>


Re: Merge or minus Dataset API missing

2016-02-12 Thread Fabian Hueske
Hi Flavio,

If I got it right, you can use a full outer join.
It will give you both elements on a match, and otherwise the left or right
element together with null.

Best, Fabian
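
For illustration, a minimal sketch of such a full outer join on two
Tuple2<String, Integer> datasets keyed on field 0 (the element type, key
position, and merge logic are assumptions, not from this thread):

```
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;

public static DataSet<Tuple2<String, Integer>> merge(
        DataSet<Tuple2<String, Integer>> left,
        DataSet<Tuple2<String, Integer>> right) {
    return left
        .fullOuterJoin(right)
        .where(0)
        .equalTo(0)
        .with(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>,
                               Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> join(Tuple2<String, Integer> l,
                                                Tuple2<String, Integer> r) {
                if (l != null && r != null) {
                    // both sides matched: combine them (illustrative merge)
                    return new Tuple2<>(l.f0, l.f1 + r.f1);
                }
                // no match: forward whichever side is present
                return l != null ? l : r;
            }
        });
}
```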

2016-02-12 16:48 GMT+01:00 Flavio Pompermaier :

> Hi to all,
>
> I have a use case where I have to merge two datasets, but I can't find a
> direct DataSet API to do that.
> I want to execute some function when there's a match, and otherwise pass on
> the not-null element.
> At the moment I can do this in a fairly complicated way (I want to avoid
> broadcasting because the datasets could be big): using two leftOuterJoins plus
> a union. Is there a simpler way?
>
>
> Best,
> Flavio
>


Re: Master Thesis [Apache-flink paper references]

2016-02-12 Thread Matthias J. Sax
You might want to check out the Stratosphere project web site:
http://stratosphere.eu/project/publications/

-Matthias

On 02/12/2016 05:52 PM, subash basnet wrote:
> Hello all,
> 
> I am currently doing my master's thesis on Apache Flink. It would be really
> helpful to know about the reference papers followed for the
> development/background of Flink.
> It would help me build solid background knowledge to analyze Flink.
> 
> Currently I am reading all the related materials found on the internet and
> the Flink/data Artisans materials provided.
> Could you please suggest some.
> 
> 
> 
> Best Regards,
> Subash Basnet





Re: Master Thesis [Apache-flink paper references]

2016-02-12 Thread subash basnet
Hello Matthias,

Thank you very much :)

Best Regards,
Subash Basnet

On Fri, Feb 12, 2016 at 8:22 PM, Matthias J. Sax  wrote:

> You might want to check out the Stratosphere project web site:
> http://stratosphere.eu/project/publications/
>
> -Matthias
>
> On 02/12/2016 05:52 PM, subash basnet wrote:
> > Hello all,
> >
> > I am currently doing my master's thesis on Apache Flink. It would be really
> > helpful to know about the reference papers followed for the
> > development/background of Flink.
> > It would help me build solid background knowledge to analyze Flink.
> >
> > Currently I am reading all the related materials found on the internet and
> > the Flink/data Artisans materials provided.
> > Could you please suggest some.
> >
> >
> >
> > Best Regards,
> > Subash Basnet
>
>


writeAsCSV with partitionBy

2016-02-12 Thread Srikanth
Hello,



Is there a Hive (or Spark DataFrame) partitionBy equivalent in Flink?

I'm looking to save output as CSV files partitioned by two columns (date and
hour).

The partitionBy DataSet API is more for partitioning the data based on a column
for further processing.

I'm thinking there is no direct API to do this, but what would be the best
way of achieving this?



Srikanth


Re: streaming using DeserializationSchema

2016-02-12 Thread Nick Dimiduk
My input file contains newline-delimited JSON records, one per text line.
The records on the Kafka topic are JSON blobs encoded as UTF-8 and written
as bytes.
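
To make the file-based "job" variant described in the quoted message below
concrete, here is a rough sketch (the schema class MyDeserializationSchema, its
produced type MyEvent, and the input path are placeholder assumptions, not the
actual code from this thread):

```
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobFromFileDump {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // MyDeserializationSchema and MyEvent stand in for the existing
        // DeserializationSchema used by the Kafka consumer and its produced type.
        final MyDeserializationSchema schema = new MyDeserializationSchema();

        DataStream<MyEvent> events = env
                .readTextFile("hdfs:///path/to/dump")   // one record per text line
                .map(new MapFunction<String, MyEvent>() {
                    @Override
                    public MyEvent map(String line) throws Exception {
                        // re-use the Kafka deserializer on the line's raw bytes
                        return schema.deserialize(line.getBytes(StandardCharsets.UTF_8));
                    }
                });

        // hand 'events' to the shared flow-building factory method here
        env.execute("job");
    }
}
```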

On Fri, Feb 12, 2016 at 1:41 PM, Martin Neumann  wrote:

> I'm trying the same thing now.
>
> I guess you need to read the file as byte arrays somehow to make it work.
> What read function did you use? The mapper is not hard to write but the
> byte array stuff gives me a headache.
>
> cheers Martin
>
>
>
>
> On Fri, Feb 12, 2016 at 9:12 PM, Nick Dimiduk  wrote:
>
>> Hi Martin,
>>
>> I have the same use case. I wanted to be able to load from dumps of data
>> in the same format as is on the Kafka queue. I created a new application
>> main; call it the "job" instead of the "flow". I refactored my code a bit
>> for building the flow so all of that can be reused via a factory method. I then
>> implemented a MapFunction that simply calls my existing deserializer.
>> Create a new DataStream from the flat file and tack on the MapFunction step.
>> The resulting DataStream is then type-compatible with the Kafka consumer
>> that starts the "flow" application, so I pass it into the factory method.
>> Tweak the ParameterTools options for the "job" application, et voilà!
>>
>> Sorry I don't have example code for you; this would be a good example to
>> contribute back to the community's example library though.
>>
>> Good luck!
>> -n
>>
>> On Fri, Feb 12, 2016 at 2:25 AM, Martin Neumann  wrote:
>>
>>> It's not only about testing; I will also need to run things against
>>> different datasets. I want to reuse as much of the code as possible to load
>>> the same data from a file instead of Kafka.
>>>
>>> Is there a simple way of loading the data from a file using the same
>>> conversion classes that I would use to transform it when I read it from
>>> Kafka, or do I have to write a new Avro deserializer (InputFormat)?
>>>
>>> On Fri, Feb 12, 2016 at 2:06 AM, Gyula Fóra 
>>> wrote:
>>>
 Hey,

 A very simple thing you could do is to set up a simple Kafka producer
 in a Java program that will feed the data into a topic. This also has the
 additional benefit that you are actually testing against Kafka.

 Cheers,
 Gyula

 Martin Neumann  wrote this (on Friday, 12 Feb 2016,
 at 0:20):

> Hej,
>
> I have a stream program reading data from Kafka where the data is in
> Avro. I have my own DeserializationSchema to deal with it.
>
> For testing reasons I want to read a dump from HDFS instead. Is there
> a way to use the same DeserializationSchema to read from an Avro file
> stored on HDFS?
>
> cheers Martin
>

>>>
>>
>


Flink packaging makes life hard for SBT fat jar's

2016-02-12 Thread shikhar
Repro at https://github.com/shikhar/flink-sbt-fatjar-troubles, run `sbt
assembly`

A fat jar seems like the best way to provide jobs for Flink to execute.

I am declaring deps like:
{noformat}
"org.apache.flink" %% "flink-clients" % "1.0-SNAPSHOT" % "provided"
"org.apache.flink" %% "flink-streaming-scala" % "1.0-SNAPSHOT" % "provided"
"org.apache.flink" %% "flink-connector-kafka-0.8" % "1.0-SNAPSHOT"
{noformat}

Connectors aren't included in the distribution, so I can't mark the Kafka
connector as 'provided'.

Using the sbt-assembly plugin and running the 'assembly' task, I get lots of
failures like:

```
[error] deduplicate: different file contents found in the following:
[error]
/Users/shikhar/.ivy2/cache/org.apache.flink/flink-connector-kafka-0.8_2.11/jars/flink-connector-kafka-0.8_2.11-1.0-SNAPSHOT.jar:org/apache/flink/shaded/com/google/common/util/concurrent/package-info.class
[error]
/Users/shikhar/.ivy2/cache/org.apache.flink/flink-connector-kafka-base_2.11/jars/flink-connector-kafka-base_2.11-1.0-SNAPSHOT.jar:org/apache/flink/shaded/com/google/common/util/concurrent/package-info.class
[error]
/Users/shikhar/.ivy2/cache/org.apache.flink/flink-streaming-java_2.11/jars/flink-streaming-java_2.11-1.0-SNAPSHOT.jar:org/apache/flink/shaded/com/google/common/util/concurrent/package-info.class
[error]
/Users/shikhar/.ivy2/cache/org.apache.flink/flink-core/jars/flink-core-1.0-SNAPSHOT.jar:org/apache/flink/shaded/com/google/common/util/concurrent/package-info.class
[error]
/Users/shikhar/.ivy2/cache/org.apache.flink/flink-shaded-hadoop2/jars/flink-shaded-hadoop2-1.0-SNAPSHOT.jar:org/apache/flink/shaded/com/google/common/util/concurrent/package-info.class
[error]
/Users/shikhar/.ivy2/cache/org.apache.flink/flink-runtime_2.11/jars/flink-runtime_2.11-1.0-SNAPSHOT.jar:org/apache/flink/shaded/com/google/common/util/concurrent/package-info.class
[error]
/Users/shikhar/.ivy2/cache/org.apache.flink/flink-java/jars/flink-java-1.0-SNAPSHOT.jar:org/apache/flink/shaded/com/google/common/util/concurrent/package-info.class
[error]
/Users/shikhar/.ivy2/cache/org.apache.flink/flink-clients_2.11/jars/flink-clients_2.11-1.0-SNAPSHOT.jar:org/apache/flink/shaded/com/google/common/util/concurrent/package-info.class
[error]
/Users/shikhar/.ivy2/cache/org.apache.flink/flink-optimizer_2.11/jars/flink-optimizer_2.11-1.0-SNAPSHOT.jar:org/apache/flink/shaded/com/google/common/util/concurrent/package-info.class
```

I tried declaring a MergeStrategy as per
https://github.com/shikhar/flink-sbt-fatjar-troubles/blob/master/build.sbt#L13-L18,
which helps with the shading conflicts, but then I get lots of errors from
conflicts in `commons-collections` vs `commons-beanutils` vs
`commons-beanutils-core`, which are deps pulled in via Flink:

```
[error] deduplicate: different file contents found in the following:
[error]
/Users/shikhar/.ivy2/cache/commons-collections/commons-collections/jars/commons-collections-3.2.2.jar:org/apache/commons/collections/FastHashMap.class
[error]
/Users/shikhar/.ivy2/cache/commons-beanutils/commons-beanutils/jars/commons-beanutils-1.7.0.jar:org/apache/commons/collections/FastHashMap.class
[error]
/Users/shikhar/.ivy2/cache/commons-beanutils/commons-beanutils-core/jars/commons-beanutils-core-1.8.0.jar:org/apache/commons/collections/FastHashMap.class
```

The best way I have found to work around this for now is to also mark the
flink-kafka connector as a 'provided' dependency and customize flink-dist to
include it :( I'd really rather not create a custom distribution.





Re: streaming using DeserializationSchema

2016-02-12 Thread Martin Neumann
I'm trying the same thing now.

I guess you need to read the file as byte arrays somehow to make it work.
What read function did you use? The mapper is not hard to write but the
byte array stuff gives me a headache.

cheers Martin




On Fri, Feb 12, 2016 at 9:12 PM, Nick Dimiduk  wrote:

> Hi Martin,
>
> I have the same use case. I wanted to be able to load from dumps of data in
> the same format as is on the Kafka queue. I created a new application main;
> call it the "job" instead of the "flow". I refactored my code a bit for
> building the flow so all of that can be reused via a factory method. I then
> implemented a MapFunction that simply calls my existing deserializer.
> Create a new DataStream from the flat file and tack on the MapFunction step.
> The resulting DataStream is then type-compatible with the Kafka consumer
> that starts the "flow" application, so I pass it into the factory method.
> Tweak the ParameterTools options for the "job" application, et voilà!
>
> Sorry I don't have example code for you; this would be a good example to
> contribute back to the community's example library though.
>
> Good luck!
> -n
>
> On Fri, Feb 12, 2016 at 2:25 AM, Martin Neumann  wrote:
>
>> It's not only about testing; I will also need to run things against
>> different datasets. I want to reuse as much of the code as possible to load
>> the same data from a file instead of Kafka.
>>
>> Is there a simple way of loading the data from a file using the same
>> conversion classes that I would use to transform it when I read it from
>> Kafka, or do I have to write a new Avro deserializer (InputFormat)?
>>
>> On Fri, Feb 12, 2016 at 2:06 AM, Gyula Fóra  wrote:
>>
>>> Hey,
>>>
>>> A very simple thing you could do is to set up a simple Kafka producer in
>>> a Java program that will feed the data into a topic. This also has the
>>> additional benefit that you are actually testing against Kafka.
>>>
>>> Cheers,
>>> Gyula
>>>
>>> Martin Neumann  wrote this (on Friday, 12 Feb 2016,
>>> at 0:20):
>>>
 Hej,

 I have a stream program reading data from Kafka where the data is in
 Avro. I have my own DeserializationSchema to deal with it.

 For testing reasons I want to read a dump from HDFS instead. Is there a
 way to use the same DeserializationSchema to read from an Avro file stored
 on HDFS?

 cheers Martin

>>>
>>
>