Re: streaming using DeserializationSchema
It's not only about testing; I will also need to run things against different datasets. I want to reuse as much of the code as possible to load the same data from a file instead of Kafka. Is there a simple way of loading the data from a file using the same conversion classes that I would use to transform them when I read them from Kafka, or do I have to write a new Avro deserializer (InputFormat)? On Fri, Feb 12, 2016 at 2:06 AM, Gyula Fóra wrote: > Hey, > > A very simple thing you could do is to set up a simple kafka producer in a > java program that will feed the data into a topic. This also has the > additional benefit that you are actually testing against kafka. > > Cheers, > Gyula > > Martin Neumann wrote (at 2016 Feb 12, Fri, 0:20): > >> Hej, >> >> I have a stream program reading data from Kafka where the data is in >> avro. I have my own DeserializationSchema to deal with it. >> >> For testing reasons I want to read a dump from hdfs instead, is there a >> way to use the same DeserializationSchema to read from an avro file stored >> on hdfs? >> >> cheers Martin >> >
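The pattern Martin is after can be sketched without any Flink dependency: one byte[]-to-record conversion that the Kafka DeserializationSchema delegates to, and that a file-based replay reuses via a plain map. Everything below is hypothetical stand-in code (ByteSchema and UpperSchema are not Flink classes); in an actual job, env.readTextFile(...).map(...) or an AvroInputFormat would replace the plain List.

```scala
// Stand-in for Flink's DeserializationSchema: one byte[] -> T conversion.
trait ByteSchema[T] { def deserialize(bytes: Array[Byte]): T }

// Hypothetical schema; in Martin's case this would wrap the Avro decoder.
class UpperSchema extends ByteSchema[String] {
  def deserialize(bytes: Array[Byte]): String =
    new String(bytes, "UTF-8").toUpperCase
}

val schema = new UpperSchema

// File-based replay: fileLines stands in for env.readTextFile(path),
// and .map(...) is where a Flink MapFunction would call the schema.
val fileLines = List("alpha", "beta")
val records = fileLines.map(l => schema.deserialize(l.getBytes("UTF-8")))
// records now has the same element type the Kafka source would produce
```

The point of the indirection is that both the Kafka source and the file job depend only on the one deserialize function, so swapping the input never touches the conversion code.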
Re: consume kafka stream with flink
Hi Tanguy, I would recommend referring to the documentation of the specific Flink version you are using. This is the documentation for 1.0-SNAPSHOT: https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/kafka.html and this is the doc for 0.10.x: https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html#apache-kafka I would recommend using Flink 0.10.x and the FlinkKafkaConsumer082. With Flink 1.0 the consumer will be renamed to FlinkKafkaConsumer08. Let me know if you have further questions. Regards, Robert On Fri, Feb 12, 2016 at 12:07 PM, Tanguy Racinet wrote: > Hello, > > I am currently trying to develop an algorithm mining frequent item sets > over a data stream. > I am using Kafka to generate the stream, however I cannot manage to link > Flink to Kafka. > > The code presented here is working but only using Flink version 0.9.1 > > https://github.com/dataArtisans/kafka-example/blob/master/src/main/java/com/dataartisans/ReadFromKafka.java > It doesn’t work for me when using Flink 1.0-SNAPSHOT or the latest release. > > I tried several other things and was hoping someone could help me > understand the differences between KafkaSource, FlinkKafkaConsumer, > FlinkKafkaConsumer081 and FlinkKafkaConsumer082. > > Which one is the latest API? Is there another way to consume a Kafka stream > with Flink that is even more up to date? Or should I use > FlinkKafkaConsumer082 and stay with Flink version 0.9? > > Thank you for your help. > Regards, > Tanguy R. >
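For reference, wiring up the 0.10.x consumer Robert recommends looks roughly like this — a sketch only, assuming Flink 0.10.x and flink-connector-kafka on the classpath; topic name, group id, and connection strings are placeholders:

```scala
import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Kafka 0.8-style consumer config; this consumer still tracks offsets via ZooKeeper
val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092")
props.setProperty("zookeeper.connect", "localhost:2181")
props.setProperty("group.id", "frequent-itemsets")

// topic name and schema are placeholders; swap in your own DeserializationSchema
val stream = env.addSource(
  new FlinkKafkaConsumer082[String]("my-topic", new SimpleStringSchema, props))

stream.print()
env.execute("read from kafka")
```

With Flink 1.0 the class name changes to FlinkKafkaConsumer08, but the constructor shape (topic, schema, properties) stays the same.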
Re: Compile issues with Flink 1.0-SNAPSHOT and Scala 2.11
Hi Cory! We found the problem. There is a development fork of Flink for Stream SQL, whose CI infrastructure accidentally also deployed snapshots and overwrote some of the proper master branch snapshots. That's why the snapshots got inconsistent. We fixed that, and newer snapshots should be online. Hope that this is resolved now. Sorry for the inconvenience, Stephan On Fri, Feb 12, 2016 at 12:51 AM, Stephan Ewen wrote: > Hi! > > The CI system has just finished uploading a new snapshot. In that one, > the scalatest dependency is now correctly at 2.11 again. > > > https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-test-utils_2.11/1.0-SNAPSHOT/flink-test-utils_2.11-1.0-20160211.232156-288.pom > > I am very puzzled, we did not touch any parts that seem to affect this. I > am wondering if it is possible that Maven had a hiccup... > > Can you retry (force a dependency update) and see if the dependencies are > correct again? > > > Greetings, > Stephan > > > On Fri, Feb 12, 2016 at 12:23 AM, Stephan Ewen wrote: > >> Hi! >> >> I examined the Apache Snapshot Repository, and I could see that in the >> latest snapshot a "scalatest_2.10" version was introduced. I could not >> figure out how yet, and I could not find a "flink-core_2.10" or >> "flink-annotations_2.10" dependency either. >> >> >> Previous snapshot: >> https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-test-utils_2.11/1.0-SNAPSHOT/flink-test-utils_2.11-1.0-20160211.162913-286.pom >> >> Latest snapshot: >> https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-test-utils_2.11/1.0-SNAPSHOT/flink-test-utils_2.11-1.0-20160211.201205-287.pom >> >> >> We'll try and fix this ASAP. Sorry for that, this is quite a mystery >> right now... >> >> Best, >> Stephan >> >> On Thu, Feb 11, 2016 at 11:56 PM, Cory Monty > > wrote: >> >>> Ufuk, >>> >>> Thanks for the link. 
I've double-checked everything in our dependencies >>> list and it's all correct. >>> >>> Stephan, >>> >>> We don't explicitly depend on "flink-java", so there should be no >>> suffix. It's curious, to me, that scalatest is showing in the stack trace. >>> I also tried clearing ~/.sbt/staging and it did not help. Our build server >>> (CircleCI) is also experiencing the same issue, so I don't think it's local >>> to my machine. >>> >>> On Thu, Feb 11, 2016 at 4:09 PM, Stephan Ewen wrote: >>> Hi Cory! Hmmm, curious... I just double-checked the code; there are no more references to a Scala-versioned "flink-core" and "flink-annotations" project in the code base. The projects you use with the Scala version suffix look good, actually. Just to be safe, can you check that the "flink-java" dependency is without suffix? One other thing I can imagine is a mixed-up dependency cache. Can you try to refresh all snapshot dependencies (maybe clear "~/.sbt/staging/")? It is high time for a 1.0 release, so you need not work on the SNAPSHOT versions any more. That should really solve this version-conflict pain. If we are fast tomorrow, there may be a nice surprise coming up in the next days... Greetings, Stephan On Thu, Feb 11, 2016 at 10:24 PM, Cory Monty < cory.mo...@getbraintree.com> wrote: > Hmm. We don't explicitly include "flink-annotations" and we do not > append the Scala suffix for "flink-core": > > `"org.apache.flink" % "flink-core" % "1.0-SNAPSHOT"` > > Here are the packages we currently include with a Scala suffix: > > flink-scala > flink-clients > flink-streaming-scala > flink-connector-kafka-0.8 > flink-test-utils > flink-streaming-contrib > > If there is any documentation you can point to regarding when to > include the Scala suffix on Flink packages, let me know. > > > > On Thu, Feb 11, 2016 at 2:55 PM, Stephan Ewen > wrote: > >> Hi Cory! >> >> "flink-core" and "flink-annotations" should not have Scala suffixes, >> because they do not depend on Scala. 
>> >> So far, we mark the Scala-independent projects without suffixes. Is >> that very confusing, or does that interfere with build tools? >> >> Greetings, >> Stephan >> >> >> On Thu, Feb 11, 2016 at 9:50 PM, Cory Monty < >> cory.mo...@getbraintree.com> wrote: >> >>> As of this afternoon, SBT is running into issues compiling with the >>> following error: >>> >>> [error] Modules were resolved with conflicting cross-version >>> suffixes in >>> [error]    org.scalatest:scalatest _2.10, _2.11 >>> [error]    org.apache.flink:flink-core _2.11, >>> [error]    org.apache.flink:flink-annotations _2.11, >>> java.lang.RuntimeException: Conflicting cross-version suffixes in:
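The convention Stephan describes maps directly onto sbt's %% operator, which appends the project's Scala binary version to the artifact name automatically. A sketch of how Cory's dependency list would look in build.sbt (module list taken from the thread; versions assumed):

```scala
// build.sbt — %% appends the Scala suffix (_2.11 here); plain % omits it
libraryDependencies ++= Seq(
  // Scala-independent modules: no suffix
  "org.apache.flink" %  "flink-core"                % "1.0-SNAPSHOT",
  // Scala-dependent modules: suffixed automatically via %%
  "org.apache.flink" %% "flink-scala"               % "1.0-SNAPSHOT",
  "org.apache.flink" %% "flink-clients"             % "1.0-SNAPSHOT",
  "org.apache.flink" %% "flink-streaming-scala"     % "1.0-SNAPSHOT",
  "org.apache.flink" %% "flink-connector-kafka-0.8" % "1.0-SNAPSHOT",
  "org.apache.flink" %% "flink-test-utils"          % "1.0-SNAPSHOT",
  "org.apache.flink" %% "flink-streaming-contrib"   % "1.0-SNAPSHOT"
)
```

The cross-version error in the stack trace above is exactly what surfaces when one transitive dependency (here scalatest) arrives with both _2.10 and _2.11 suffixes, so pinning everything through %% against one scalaVersion is the usual fix on the user side.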
consume kafka stream with flink
Hello, I am currently trying to develop an algorithm mining frequent item sets over a data stream. I am using Kafka to generate the stream, however I cannot manage to link Flink to Kafka. The code presented here is working but only using Flink version 0.9.1 https://github.com/dataArtisans/kafka-example/blob/master/src/main/java/com/dataartisans/ReadFromKafka.java It doesn’t work for me when using Flink 1.0-SNAPSHOT or the latest release. I tried several other things and was hoping someone could help me understand the differences between KafkaSource, FlinkKafkaConsumer, FlinkKafkaConsumer081 and FlinkKafkaConsumer082. Which one is the latest API? Is there another way to consume a Kafka stream with Flink that is even more up to date? Or should I use FlinkKafkaConsumer082 and stay with Flink version 0.9? Thank you for your help. Regards, Tanguy R.
Re: How to convert List to flink DataSet
Hello Fabian, Thank you for the response, but I have been stuck on how to iterate over the DataSet, perform operations and return a new modified DataSet similar to that of a list operation as shown below. Eg: Currently I am doing the following: for (Centroid centroid : centroids.collect()) { for (Tuple2<Integer, Point> element : clusteredPoints.collect()) { //perform necessary operations } //add elements } //return elements list It would be really nice if I could just get started. I have been trying to add an element to a DataSet using *join*, but when I print the DataSet it contains only one initial element; it prints the same value as the initial set value. for(){ newElement = new Tuple3<Centroid, Tuple2<Integer, Point>, Double>(); dataSetElement.join(env.fromElements(newElement)); dataSetElement.print(); } Unsure if I am using the right function or using join in a wrong manner. Best Regards, Subash Basnet On Wed, Feb 10, 2016 at 6:33 PM, Fabian Hueske wrote: > I would try to do the outlier computation with the DataSet API instead of > fetching the results to the client with collect(). > If you do that, you can directly use writeAsCsv because the result is > still a DataSet. > > What you have to do is to translate your findOutliers method into DataSet > API code. > > Best, Fabian > > 2016-02-10 18:29 GMT+01:00 subash basnet : > >> Hello Fabian, >> >> As written before, the code: >> >> >> >> *DataSet<Tuple2<Integer, Point>> fElements = >> env.fromCollection(findOutliers(clusteredPoints, >> finalCentroids)); fElements.writeAsCsv(outputPath, "\n", " >> "); env.execute("KMeans Example");* >> I am very new to Flink so I am not so clear about what you suggested: by >> option (1) you meant that I write my own FileWriter here rather than using >> the *writeAsCsv()* method, and for option (2) I couldn't understand where to >> compute the outlier. I would want to use the *writeAsCsv()* method but >> currently it doesn't perform the write operation and I am unable to understand >> why. 
>> >> An interesting thing I found is, when I run the *outlierDetection* class >> from Eclipse a single file *result* gets written within the kmeans >> folder, whereas in case of the default *KMeans* class it writes a result >> folder within the kmeans folder and the files with points are written >> inside the result folder. >> I give the necessary path in the arguments while running. >> Eg: file:///home/softwares/flink-0.10.0/kmeans/points >> file:///home/softwares/flink-0.10.0/kmeans/centers >> file:///home/softwares/flink-0.10.0/kmeans/result 10 >> >> Now, after I create the runnable jar file for the KMeans and outlierDetection >> classes, when I upload it to the *flink web submission client* it works fine >> for *KMeans.jar*: the folder and files get created. But in case of >> *outlierDetection.jar* no file or folder gets written inside kmeans. >> >> How is it that the outlier class is able to write the file via Eclipse but >> the outlier jar is not able to write via the flink web submission client? >> >> >> Best Regards, >> Subash Basnet >> >> On Wed, Feb 10, 2016 at 1:58 PM, Fabian Hueske wrote: >> >>> Hi Subash, >>> >>> I would not fetch the data to the client, do the computation there, and >>> send it back, just for the purpose of writing it to a file. >>> >>> Either 1) pull the results to the client and write the file from there >>> or 2) compute the outliers in the cluster. >>> I did not study your code completely, but the two nested loops and the >>> condition are a join for example. >>> >>> I would go for option 2, if possible. >>> >>> Best, Fabian >>> >>> >>> 2016-02-10 13:07 GMT+01:00 subash basnet : >>> Hello Fabian, I use the collect() method to get the elements locally and perform operations on that and return the result as a collection. The collection result is converted to the DataSet in the calling method. 
Below is the code of the *findOutliers* method:

public static List<Tuple2<Integer, Point>> findOutliers(
    DataSet<Tuple2<Integer, Point>> clusteredPoints,
    DataSet<Centroid> centroids) throws Exception {
  List<Tuple2<Integer, Point>> finalElements = new ArrayList<Tuple2<Integer, Point>>();
  *List<Tuple2<Integer, Point>> elements = clusteredPoints.collect();*
  *List<Centroid> centroidList = centroids.collect();*
  List<Tuple3<Centroid, Tuple2<Integer, Point>, Double>> elementsWithDistance =
      new ArrayList<Tuple3<Centroid, Tuple2<Integer, Point>, Double>>();
  for (Centroid centroid : centroidList) {
    elementsWithDistance = new ArrayList<Tuple3<Centroid, Tuple2<Integer, Point>, Double>>();
    double totalDistance = 0;
    int elementsCount = 0;
    for (Tuple2<Integer, Point> e : elements) {
      // compute distance
      if (e.f0 == centroid.id) {
        Tuple3<Centroid, Tuple2<Integer, Point>, Double> newElement =
            new Tuple3<Centroid, Tuple2<Integer, Point>, Double>();
        double distance = e.f1.euclideanDistance(centroid);
        totalDistance += distance;
        newElement.setFields(centroid, e, distance);
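Fabian's observation — that the two nested collect() loops plus the id condition are really a join — can be tried out with plain collections before porting it to DataSet.join(...).where(...).equalTo(...) and groupBy/reduceGroup. The Centroid and Point classes below are simplified stand-ins for the KMeans example types, not the real ones:

```scala
// Simplified stand-ins for the KMeans example's Centroid and Point types.
case class Centroid(id: Int, x: Double, y: Double)
case class Point(x: Double, y: Double) {
  def euclideanDistance(c: Centroid): Double = math.hypot(x - c.x, y - c.y)
}

val centroids = List(Centroid(1, 0.0, 0.0))
val clustered = List((1, Point(1.0, 0.0)), (1, Point(0.0, 2.0))) // (centroidId, point)

// The nested loops + "e.f0 == centroid.id" condition, expressed as a join;
// in the DataSet API: clustered.join(centroids).where(_._1).equalTo(_.id)
val withDistance = for {
  c <- centroids
  (cid, p) <- clustered if cid == c.id
} yield (c.id, p, p.euclideanDistance(c))

// totalDistance / elementsCount per centroid, as a groupBy instead of mutable counters
val avgDistanceById = withDistance.groupBy(_._1).map { case (id, rows) =>
  id -> rows.map(_._3).sum / rows.size
}
```

Once the logic is phrased this way, each piece has a direct DataSet equivalent, so the result stays a DataSet and writeAsCsv works without any collect() round-trip.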
Re: Merge or minus Dataset API missing
Ah ok, I didn't know about it! Thanks Till and Fabian! On Fri, Feb 12, 2016 at 5:11 PM, Fabian Hueske wrote: > Hi Flavio, > > If I got it right, you can use a FullOuterJoin. > It will give you both elements on a match and otherwise a left or a right > element and null. > > Best, Fabian > > 2016-02-12 16:48 GMT+01:00 Flavio Pompermaier : > >> Hi to all, >> >> I have a use case where I have to merge 2 datasets but I can't find a >> direct dataset API to do that. >> I want to execute some function when there's a match, otherwise move on >> the not-null element. >> At the moment I can do this in a fairly complicated way (I want to avoid >> broadcasting because the dataset could be big): using 2 leftOuterJoin plus >> a union. Is there a simpler way? >> >> >> Best, >> Flavio >> > >
Re: Merge or minus Dataset API missing
Hi Flavio, If I got it right, you can use a FullOuterJoin. It will give you both elements on a match and otherwise a left or a right element and null. Best, Fabian 2016-02-12 16:48 GMT+01:00 Flavio Pompermaier: > Hi to all, > > I have a use case where I have to merge 2 datasets but I can't find a > direct dataset API to do that. > I want to execute some function when there's a match, otherwise move on > the not-null element. > At the moment I can do this in a fairly complicated way (I want to avoid > broadcasting because the dataset could be big): using 2 leftOuterJoin plus > a union. Is there a simpler way? > > > Best, > Flavio >
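The fullOuterJoin semantics Fabian describes — both elements on a match, otherwise one side plus null — can be sketched with plain collections; in the DataSet API this corresponds to ds1.fullOuterJoin(ds2).where(...).equalTo(...).with(joinFunction). The string tags below are purely illustrative:

```scala
val left  = Map(1 -> "a", 2 -> "b")
val right = Map(2 -> "B", 3 -> "C")

// Full outer join: every key from either side appears once; the missing
// side is None here (Flink would hand the join function a null instead).
val merged = (left.keySet ++ right.keySet).toList.sorted.map { k =>
  (left.get(k), right.get(k)) match {
    case (Some(l), Some(r)) => s"match:$l$r" // both matched: run the merge function
    case (Some(l), None)    => s"left:$l"    // only the left element exists
    case (None, Some(r))    => s"right:$r"   // only the right element exists
    case (None, None)       => sys.error("unreachable")
  }
}
```

This is exactly why the single full outer join replaces Flavio's two leftOuterJoins plus union: one pass already produces all three cases.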
Re: Master Thesis [Apache-flink paper references]
You might want to check out the Stratosphere project web site: http://stratosphere.eu/project/publications/ -Matthias On 02/12/2016 05:52 PM, subash basnet wrote: > Hello all, > > I am currently doing master's thesis on Apache-flink. It would be really > helpful to know about the reference papers followed for the > development/background of flink. > It would help me build a solid background knowledge to analyze flink. > > Currently I am reading all the related materials found in internet and > flink/data-artisan materials provided. > Could you please suggest me. > > > > Best Regards, > Subash Basnet
Re: Master Thesis [Apache-flink paper references]
Hello Matthias, Thank you very much :) Best Regards, Subash Basnet On Fri, Feb 12, 2016 at 8:22 PM, Matthias J. Sax wrote: > You might want to check out the Stratosphere project web site: > http://stratosphere.eu/project/publications/ > > -Matthias > > On 02/12/2016 05:52 PM, subash basnet wrote: > > Hello all, > > > > I am currently doing master's thesis on Apache-flink. It would be really > > helpful to know about the reference papers followed for the > > development/background of flink. > > It would help me build a solid background knowledge to analyze flink. > > > > Currently I am reading all the related materials found in internet and > > flink/data-artisan materials provided. > > Could you please suggest me. > > > > > > > > Best Regards, > > Subash Basnet > >
writeAsCSV with partitionBy
Hello, Is there a Hive (or Spark DataFrame) partitionBy equivalent in Flink? I'm looking to save output as CSV files partitioned by two columns (date and hour). Flink's partitionBy DataSet API is more about partitioning the data on a column for further processing. I'm thinking there is no direct API to do this, but what would be the best way of achieving it? Srikanth
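Absent a direct partition-on-write API, one workable approach is to derive a Hive-style date=/hour= path from each record and write one file per partition key. In Flink this logic would live in a custom OutputFormat (or a grouped sink); the sketch below uses plain Scala IO just to show the layout, and the Row type and file names are purely illustrative:

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

// Illustrative record type; a real job would carry these fields in a Tuple or POJO.
case class Row(date: String, hour: Int, value: String)

val rows = List(
  Row("2016-02-12", 9, "a"), Row("2016-02-12", 9, "b"), Row("2016-02-12", 10, "c"))

val baseDir: Path = Files.createTempDirectory("csv-out")

// Hive-style layout: <base>/date=<d>/hour=<h>/part-0.csv, one file per partition
rows.groupBy(r => (r.date, r.hour)).foreach { case ((date, hour), group) =>
  val dir = baseDir.resolve(s"date=$date").resolve(s"hour=$hour")
  Files.createDirectories(dir)
  val csv = group.map(_.value).mkString("", "\n", "\n")
  Files.write(dir.resolve("part-0.csv"), csv.getBytes(StandardCharsets.UTF_8))
}
```

Because the directory names encode the partition columns, downstream tools that understand the Hive convention can prune partitions without reading the files.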
Re: streaming using DeserializationSchema
My input file contains newline-delimited JSON records, one per text line. The records on the Kafka topic are JSON blobs encoded to UTF8 and written as bytes. On Fri, Feb 12, 2016 at 1:41 PM, Martin Neumann wrote: > I'm trying the same thing now. > > I guess you need to read the file as byte arrays somehow to make it work. > What read function did you use? The mapper is not hard to write but the > byte array stuff gives me a headache. > > cheers Martin > > > > > On Fri, Feb 12, 2016 at 9:12 PM, Nick Dimiduk wrote: > >> Hi Martin, >> >> I have the same use case. I wanted to be able to load from dumps of data >> in the same format as is on the Kafka queue. I created a new application >> main, call it the "job" instead of the "flow". I refactored my code a bit >> for building the flow so all that can be reused via a factory method. I then >> implemented a MapFunction that simply calls my existing deserializer. >> Create a new DataStream from the flat file and tack on the MapFunction step. >> The resulting DataStream is then type-compatible with the Kafka consumer >> that starts the "flow" application, so I pass it into the factory method. >> Tweak the ParameterTool options for the "job" application, et voilà! >> >> Sorry I don't have example code for you; this would be a good example to >> contribute back to the community's example library though. >> >> Good luck! >> -n >> >> On Fri, Feb 12, 2016 at 2:25 AM, Martin Neumann wrote: >> >>> It's not only about testing, I will also need to run things against >>> different datasets. I want to reuse as much of the code as possible to load >>> the same data from a file instead of kafka. >>> >>> Is there a simple way of loading the data from a File using the same >>> conversion classes that I would use to transform them when I read them from >>> kafka or do I have to write a new avro deserializer (InputFormat). 
>>> >>> On Fri, Feb 12, 2016 at 2:06 AM, Gyula Fóra >>> wrote: Hey, A very simple thing you could do is to set up a simple kafka producer in a java program that will feed the data into a topic. This also has the additional benefit that you are actually testing against kafka. Cheers, Gyula Martin Neumann wrote (at 2016 Feb 12, Fri, 0:20): > Hej, > > I have a stream program reading data from Kafka where the data is in > avro. I have my own DeserializationSchema to deal with it. > > For testing reasons I want to read a dump from hdfs instead, is there > a way to use the same DeserializationSchema to read from an avro file > stored on hdfs? > > cheers Martin > >>> >> >
Flink packaging makes life hard for SBT fat jars
Repro at https://github.com/shikhar/flink-sbt-fatjar-troubles, run `sbt assembly` A fat jar seems like the best way to provide jobs for Flink to execute. I am declaring deps like: {noformat} "org.apache.flink" %% "flink-clients" % "1.0-SNAPSHOT" % "provided" "org.apache.flink" %% "flink-streaming-scala" % "1.0-SNAPSHOT" % "provided" "org.apache.flink" %% "flink-connector-kafka-0.8" % "1.0-SNAPSHOT" {noformat} Connectors aren't included in the distribution so can't mark the Kafka connector as 'provided'. Using sbt-assembly plugin and running the 'assembly' task, I get lots of failures because: ``` [error] deduplicate: different file contents found in the following: [error] /Users/shikhar/.ivy2/cache/org.apache.flink/flink-connector-kafka-0.8_2.11/jars/flink-connector-kafka-0.8_2.11-1.0-SNAPSHOT.jar:org/apache/flink/shaded/com/google/common/util/concurrent/package-info.class [error] /Users/shikhar/.ivy2/cache/org.apache.flink/flink-connector-kafka-base_2.11/jars/flink-connector-kafka-base_2.11-1.0-SNAPSHOT.jar:org/apache/flink/shaded/com/google/common/util/concurrent/package-info.class [error] /Users/shikhar/.ivy2/cache/org.apache.flink/flink-streaming-java_2.11/jars/flink-streaming-java_2.11-1.0-SNAPSHOT.jar:org/apache/flink/shaded/com/google/common/util/concurrent/package-info.class [error] /Users/shikhar/.ivy2/cache/org.apache.flink/flink-core/jars/flink-core-1.0-SNAPSHOT.jar:org/apache/flink/shaded/com/google/common/util/concurrent/package-info.class [error] /Users/shikhar/.ivy2/cache/org.apache.flink/flink-shaded-hadoop2/jars/flink-shaded-hadoop2-1.0-SNAPSHOT.jar:org/apache/flink/shaded/com/google/common/util/concurrent/package-info.class [error] /Users/shikhar/.ivy2/cache/org.apache.flink/flink-runtime_2.11/jars/flink-runtime_2.11-1.0-SNAPSHOT.jar:org/apache/flink/shaded/com/google/common/util/concurrent/package-info.class [error] 
/Users/shikhar/.ivy2/cache/org.apache.flink/flink-java/jars/flink-java-1.0-SNAPSHOT.jar:org/apache/flink/shaded/com/google/common/util/concurrent/package-info.class [error] /Users/shikhar/.ivy2/cache/org.apache.flink/flink-clients_2.11/jars/flink-clients_2.11-1.0-SNAPSHOT.jar:org/apache/flink/shaded/com/google/common/util/concurrent/package-info.class [error] /Users/shikhar/.ivy2/cache/org.apache.flink/flink-optimizer_2.11/jars/flink-optimizer_2.11-1.0-SNAPSHOT.jar:org/apache/flink/shaded/com/google/common/util/concurrent/package-info.class ``` I tried declaring a MergeStrategy as per https://github.com/shikhar/flink-sbt-fatjar-troubles/blob/master/build.sbt#L13-L18, which helps with the shading conflicts, but then I get lots of errors from conflicts in `commons-collections` vs `commons-beanutils` vs `commons-beanutils-core`, which are deps pulled in via Flink: ``` [error] deduplicate: different file contents found in the following: [error] /Users/shikhar/.ivy2/cache/commons-collections/commons-collections/jars/commons-collections-3.2.2.jar:org/apache/commons/collections/FastHashMap.class [error] /Users/shikhar/.ivy2/cache/commons-beanutils/commons-beanutils/jars/commons-beanutils-1.7.0.jar:org/apache/commons/collections/FastHashMap.class [error] /Users/shikhar/.ivy2/cache/commons-beanutils/commons-beanutils-core/jars/commons-beanutils-core-1.8.0.jar:org/apache/commons/collections/FastHashMap.class ``` The best way I have found to work around this for now is also mark the flink-kafka connector as a 'provided' dependency and customize flink-dist to include it :( I'd really rather not create a custom distribution. -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-packaging-makes-life-hard-for-SBT-fat-jar-s-tp4897.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
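A merge strategy covering both conflict families reported above might look like the following — a sketch for build.sbt with the sbt-assembly plugin, not a vetted fix; taking the first copy is usually acceptable here because the clashing entries are duplicate copies of the same classes (shaded Guava package-info files, and commons-collections classes re-bundled inside commons-beanutils):

```scala
// build.sbt (sbt-assembly) — sketch; adjust the patterns to your actual conflicts
assemblyMergeStrategy in assembly := {
  // duplicate shaded-Guava classes shipped in several Flink jars
  case PathList("org", "apache", "flink", "shaded", xs @ _*) =>
    MergeStrategy.first
  // commons-beanutils(-core) bundles copies of commons-collections classes
  case PathList("org", "apache", "commons", "collections", xs @ _*) =>
    MergeStrategy.first
  // everything else: defer to the plugin's default strategy
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}
```

This keeps the connector as a normal (non-provided) dependency, so no custom flink-dist is needed; the trade-off is that you are silently picking one copy of each duplicated class, which is fine only while the copies are interchangeable.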
Re: streaming using DeserializationSchema
I'm trying the same thing now. I guess you need to read the file as byte arrays somehow to make it work. What read function did you use? The mapper is not hard to write but the byte array stuff gives me a headache. cheers Martin On Fri, Feb 12, 2016 at 9:12 PM, Nick Dimiduk wrote: > Hi Martin, > > I have the same use case. I wanted to be able to load from dumps of data in > the same format as is on the Kafka queue. I created a new application main, > call it the "job" instead of the "flow". I refactored my code a bit for > building the flow so all that can be reused via a factory method. I then > implemented a MapFunction that simply calls my existing deserializer. > Create a new DataStream from the flat file and tack on the MapFunction step. > The resulting DataStream is then type-compatible with the Kafka consumer > that starts the "flow" application, so I pass it into the factory method. > Tweak the ParameterTool options for the "job" application, et voilà! > > Sorry I don't have example code for you; this would be a good example to > contribute back to the community's example library though. > > Good luck! > -n > > On Fri, Feb 12, 2016 at 2:25 AM, Martin Neumann wrote: > >> It's not only about testing, I will also need to run things against >> different datasets. I want to reuse as much of the code as possible to load >> the same data from a file instead of kafka. >> >> Is there a simple way of loading the data from a File using the same >> conversion classes that I would use to transform them when I read them from >> kafka or do I have to write a new avro deserializer (InputFormat). >> >> On Fri, Feb 12, 2016 at 2:06 AM, Gyula Fóra >> wrote: >> >>> Hey, >>> >>> A very simple thing you could do is to set up a simple kafka producer in >>> a java program that will feed the data into a topic. This also has the >>> additional benefit that you are actually testing against kafka. >>> >>> Cheers, >>> Gyula >>> >>> Martin Neumann wrote (at 2016 Feb 12, Fri, 0:20): >>> Hej, I have a stream program reading data from Kafka where the data is in avro. I have my own DeserializationSchema to deal with it. For testing reasons I want to read a dump from hdfs instead, is there a way to use the same DeserializationSchema to read from an avro file stored on hdfs? cheers Martin >>> >> >