Re: Joining two kafka streams
Hi Tzu-Li, huge thanks for the input. I'll try to implement a prototype of your idea and see if it meets my requirements. On 9 January 2017 at 08:02, Tzu-Li (Gordon) Tai wrote: > Hi Igor! > > What you can actually do is let a single FlinkKafkaConsumer consume from > both topics, producing a single DataStream which you can keyBy afterwards. > All versions of the FlinkKafkaConsumer support consuming multiple Kafka > topics simultaneously. This is logically the same as a union followed by a > keyBy, like what you described. > > Note that this approach requires that the records in both of your Kafka > topics are of the same type when consumed into Flink (e.g., the same POJO > class, or simply both as Strings). > If that isn’t possible and you have different data types / schemas for the > topics, you’d probably need to use “connect” and then a keyBy. > > If you’re applying a window directly after joining the two topic streams, > you could also use a window join: > > dataStream.join(otherStream) > .where().equalTo() > .window(TumblingEventTimeWindows.of(Time.seconds(3))) > .apply (new JoinFunction () {...}); > > The “where” specifies how to select the key from the first stream, and > “equalTo” from the second one. > > Hope this helps, let me know if you have other questions! > > Cheers, > Gordon > > On January 9, 2017 at 4:06:34 AM, igor.berman (igor.ber...@gmail.com) > wrote: > > Hi, > I have a use case where I need to join two Kafka topics together by some > fields. > In general, I could put the content of one topic into the other and partition > by > the same key, but I can't touch those two topics (i.e. there are other > consumers > of those topics). On the other hand, it's essential to process the same keys > on > the same "thread" to achieve locality and to avoid races when working with > the same key from different machines/threads. > > My idea is to use a union of the two streams and then key by the field, > but is there a better approach to achieve "locality"?
> > Any input will be appreciated. > Igor
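Gordon's point that a multi-topic consumer (or a union) followed by keyBy gives Igor the locality he wants can be illustrated without Flink. The plain-Java sketch below is not Flink's API: it models the two topics as lists, unions them, and routes each record by a hash of its key. `Event` and `KeyedUnionSketch` are hypothetical names, and the hash-modulo partitioner is a simplification (Flink's own keyBy partitioning is more involved and hashes keys into key groups), but it shows the property that matters: records with the same key, from either topic, land in the same partition and are therefore handled by the same parallel instance.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical record type: both topics must deserialize to this same type for a plain union.
final class Event {
    final String topic;
    final String key;
    final String payload;

    Event(String topic, String key, String payload) {
        this.topic = topic;
        this.key = key;
        this.payload = payload;
    }
}

final class KeyedUnionSketch {
    // Simplified stand-in for keyBy partitioning: equal keys always map to the same index.
    static int partitionFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Union both topics into one stream, then route each event by its key only.
    static Map<Integer, List<Event>> unionAndKeyBy(List<Event> topicA, List<Event> topicB, int parallelism) {
        List<Event> union = new ArrayList<>(topicA);
        union.addAll(topicB);
        Map<Integer, List<Event>> partitions = new HashMap<>();
        for (Event e : union) {
            partitions.computeIfAbsent(partitionFor(e.key, parallelism), p -> new ArrayList<>()).add(e);
        }
        return partitions;
    }
}
```

Because routing depends only on the key, the two topics never need to be rewritten into one; the same reasoning is why "connect" followed by a keyBy works for differently typed topics, as long as both sides extract the same key.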
Re: Event processing time with lateness
Thanks, Kostas. On 3 June 2016 at 16:47, Kostas Kloudas <k.klou...@data-artisans.com> wrote: > Hi Igor, > > To handle late events in Flink you would have to implement your own custom > trigger. > > To see a relatively more complex example of such a trigger and how to > implement it, > you can have a look at this implementation: > https://github.com/dataArtisans/beam_comp/blob/master/src/main/java/com/dataartisans/beam_comparison/customTriggers/EventTimeTriggerWithEarlyAndLateFiring.java > > which implements the trigger described in this article (before the > conclusions section): > http://data-artisans.com/why-apache-beam/ > > Thanks, > Kostas > > On Jun 3, 2016, at 2:55 PM, Igor Berman <igor.ber...@gmail.com> wrote: > > Hi, > > according to a presentation by Tyler Akidau > https://docs.google.com/presentation/d/13YZy2trPugC8Zr9M8_TfSApSCZBGUDZdzi-WUz95JJw/present > Flink > supports late arrivals for window processing, while I've seen several > questions on the user list regarding late arrivals where the answer was sort of > "not for all use cases". > Can somebody clarify? > > The interesting case for me: I'm doing event-time processing and I want > to aggregate by tumbling window. The events come from Kafka and might be > late. Currently we define the lateness threshold with the watermark (e.g.
5 mins). > > After the window triggers, I want to save the aggregated result to some persistent > storage (Redis/HBase) together with the start timestamp of the window. > > After this grace period, if I understand correctly, a late event won't be > aggregated into the existing window; rather, the trigger will call the > aggregate function with only 1 element inside (the late one). > > So if my window function saves to persistent storage, it will overwrite the > aggregated result with a new one that has only 1 element inside. > > What I want to achieve is that a late arrival triggers the window function > with all elements (the late one + all the others), so that the aggregated result is > complete. > > Think of the use case of page visit counts per minute, where, due > to some problems, page visit events might arrive late. > > Thanks in advance
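Igor's requirement (a late element should re-fire the window with all of its elements, not just the late one) amounts to keeping window state alive for an extra lateness period after the watermark has passed the window end. The plain-Java simulation below, which is not Flink code, sketches that behaviour under simplifying assumptions: a single key, a count aggregate, and a hypothetical firing rule where a window fires once the watermark reaches its end. In Flink itself this is what a custom trigger like the one Kostas linked has to implement; later Flink releases also added `allowedLateness(...)` on windowed streams for this purpose.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified event-time tumbling window (not a Flink API) that keeps state for an
// allowed lateness and re-emits the FULL aggregate whenever a late event arrives.
final class AccumulatingWindowSketch {
    private final long sizeMs;
    private final long allowedLatenessMs;
    private final TreeMap<Long, Long> countsByWindowStart = new TreeMap<>();
    private long watermark = Long.MIN_VALUE;
    // Each firing as "windowStart=count"; a store keyed by window start would upsert these.
    final List<String> emitted = new ArrayList<>();

    AccumulatingWindowSketch(long sizeMs, long allowedLatenessMs) {
        this.sizeMs = sizeMs;
        this.allowedLatenessMs = allowedLatenessMs;
    }

    // Start of the tumbling window containing the timestamp (also works for negative values).
    static long windowStart(long timestamp, long sizeMs) {
        return timestamp - Math.floorMod(timestamp, sizeMs);
    }

    void onEvent(long timestamp) {
        long start = windowStart(timestamp, sizeMs);
        long end = start + sizeMs;
        if (watermark >= end + allowedLatenessMs) {
            return; // beyond the allowed lateness: state was purged, event is dropped
        }
        long count = countsByWindowStart.merge(start, 1L, Long::sum);
        if (watermark >= end) {
            emitted.add(start + "=" + count); // late firing carries the complete count
        }
    }

    void onWatermark(long newWatermark) {
        long previous = watermark;
        watermark = Math.max(watermark, newWatermark);
        // On-time firing for every window whose end was passed by this watermark advance.
        for (Map.Entry<Long, Long> e : countsByWindowStart.entrySet()) {
            long end = e.getKey() + sizeMs;
            if (end > previous && end <= watermark) {
                emitted.add(e.getKey() + "=" + e.getValue());
            }
        }
        // Purge windows whose end + allowed lateness has been reached.
        countsByWindowStart.headMap(watermark - allowedLatenessMs - sizeMs, true).clear();
    }
}
```

With one-minute windows and five minutes of lateness, two on-time events produce `0=2`, and a third event arriving after the watermark produces `0=3` rather than `0=1`; an upsert keyed by the window start then leaves the store with the complete count.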
Event processing time with lateness
Hi, according to a presentation by Tyler Akidau (https://docs.google.com/presentation/d/13YZy2trPugC8Zr9M8_TfSApSCZBGUDZdzi-WUz95JJw/present), Flink supports late arrivals for window processing, while I've seen several questions on the user list regarding late arrivals where the answer was sort of "not for all use cases". Can somebody clarify? The interesting case for me: I'm doing event-time processing and I want to aggregate by tumbling window. The events come from Kafka and might be late. Currently we define the lateness threshold with the watermark (e.g. 5 mins). After the window triggers, I want to save the aggregated result to some persistent storage (Redis/HBase) together with the start timestamp of the window. After this grace period, if I understand correctly, a late event won't be aggregated into the existing window; rather, the trigger will call the aggregate function with only 1 element inside (the late one). So if my window function saves to persistent storage, it will overwrite the aggregated result with a new one that has only 1 element inside. What I want to achieve is that a late arrival triggers the window function with all elements (the late one + all the others), so that the aggregated result is complete. Think of the use case of page visit counts per minute, where, due to some problems, page visit events might arrive late. Thanks in advance
Re: writing tests for my program
Thanks Alexander, I'll take a look. On 10 May 2016 at 13:07, lofifnc wrote: > Hi, > > Some shameless self-promotion: > > You can also check out: > https://github.com/ottogroup/flink-spector > which has the goal of removing such hurdles when testing Flink programs. > > Best, > Alex
Re: writing tests for my program
Answering my own question: testing a streaming environment should be done with StreamingProgramTestBase & TestStreamEnvironment, which live in the test package of the flink-streaming-java project, so they aren't directly available? Project owners, why not move those two to flink-test-utils? Or am I misunderstanding something? On 9 May 2016 at 19:37, Igor Berman <igor.ber...@gmail.com> wrote: > Any idea how to handle the following? (The message is clear, but I'm not sure > what I need to do.) > I'm opening a "generic" environment in my code > (StreamExecutionEnvironment.getExecutionEnvironment()) > > and JavaProgramTestBase configures a TestEnvironment... > so what should I do to support custom tests? > > > > > Error: > The LocalStreamEnvironment cannot be used when submitting a program > through a client, or running in a TestEnvironment context. > org.apache.flink.api.common.InvalidProgramException: The > LocalStreamEnvironment cannot be used when submitting a program through a > client, or running in a TestEnvironment context.
> at > org.apache.flink.streaming.api.environment.LocalStreamEnvironment.<init>(LocalStreamEnvironment.java:67) > at > org.apache.flink.streaming.api.environment.LocalStreamEnvironment.<init>(LocalStreamEnvironment.java:57) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment(StreamExecutionEnvironment.java:1299) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment(StreamExecutionEnvironment.java:1285) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:1271) > at com.x.x.StreamConsumer.configureEnv(StreamConsumer.java:150) > > > The error happens when using > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > in my program. > > > The test is something like: > > public class StreamConsumerTest extends JavaProgramTestBase { > > @Override > protected void preSubmit() throws Exception { > > } > > @Override > protected void postSubmit() throws Exception { > } > @Override > protected void testProgram() throws Exception { > StreamConsumer.main(new String[] { > "--" + StreamConsumer.CONF_PARAM, "conf/test", > }); > } > > }
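Independently of which test base is used, a common way to make a job like `StreamConsumer` testable is to keep environment creation in `main` and move the pipeline wiring into a method that receives its environment as a parameter, so a test can hand in its own. The plain-Java sketch below only models that structure: `PipelineEnv`, `InMemoryEnv` and `StreamConsumerSketch` are hypothetical names, not Flink classes. In a real job the parameter would be a `StreamExecutionEnvironment`, and only `main` would call `getExecutionEnvironment()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for an execution environment: a source of records and a sink.
interface PipelineEnv {
    List<String> readSource();
    Consumer<String> sink();
}

final class StreamConsumerSketch {
    // Business logic as a plain function: unit-testable without any environment at all.
    static final Function<String, String> NORMALIZE = s -> s.trim().toLowerCase();

    // Pipeline wiring receives its environment instead of creating one itself.
    static void buildAndRun(PipelineEnv env) {
        for (String record : env.readSource()) {
            env.sink().accept(NORMALIZE.apply(record));
        }
    }
}

// In-memory environment a test can pass in (hypothetical).
final class InMemoryEnv implements PipelineEnv {
    private final List<String> input;
    final List<String> output = new ArrayList<>();

    InMemoryEnv(List<String> input) {
        this.input = input;
    }

    public List<String> readSource() {
        return input;
    }

    public Consumer<String> sink() {
        return output::add;
    }
}
```

The design choice is simply that nothing below `main` decides which environment it runs in, which avoids the "cannot be used in a TestEnvironment context" clash reported above.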
writing tests for my program
Any idea how to handle the following? (The message is clear, but I'm not sure what I need to do.) I'm opening a "generic" environment in my code (StreamExecutionEnvironment.getExecutionEnvironment()) and JavaProgramTestBase configures a TestEnvironment... so what should I do to support custom tests? Error: The LocalStreamEnvironment cannot be used when submitting a program through a client, or running in a TestEnvironment context. org.apache.flink.api.common.InvalidProgramException: The LocalStreamEnvironment cannot be used when submitting a program through a client, or running in a TestEnvironment context. at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.<init>(LocalStreamEnvironment.java:67) at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.<init>(LocalStreamEnvironment.java:57) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment(StreamExecutionEnvironment.java:1299) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment(StreamExecutionEnvironment.java:1285) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:1271) at com.x.x.StreamConsumer.configureEnv(StreamConsumer.java:150) The error happens when using StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); in my program. The test is something like: public class StreamConsumerTest extends JavaProgramTestBase { @Override protected void preSubmit() throws Exception { } @Override protected void postSubmit() throws Exception { } @Override protected void testProgram() throws Exception { StreamConsumer.main(new String[] { "--" + StreamConsumer.CONF_PARAM, "conf/test", }); } }
general design questions when using flink
1. Suppose I have a stream of different events (A, B, C). Each event type needs its own processing pipeline. What is the recommended approach for splitting pipelines per event type? I can put a filter operator at the beginning, I can set up a different job per event type, or I can hold each event type in a different topic.
2. Suppose I have a bug in a business-logic operator. Currently, when such an operator throws an NPE or any other error, the whole job fails (with something like: unable to forward event to next operator). So my question is: what is the best approach to sink such events somewhere else so that the Flink job is more robust? I can wrap an Event -> EventTag operator into Event -> EventTagOrError and then install a filter that routes the errors into a log/special topic etc., but then I'll need this for every operator. Another approach I thought of: I could extend map/flatMapFunction with a kind of try-catch wrapper around the business-logic operator, but then it's not clear how to pass the messages that caused such an error into some special sink.
I would like to hear what the "best" practices are for those cases, or at least hear some thoughts. Thanks in advance
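For question 2, the try-catch wrapper idea can be made generic so it does not have to be rewritten for every operator: wrap the business-logic function once, return either its result or the failed input together with the exception, and let a downstream filter send failures to a dead-letter sink. The plain-Java sketch below shows only the wrapping and the split; `Result`, `SafeFunctions.safely` and `failedInputs` are hypothetical helpers, and only unchecked exceptions are caught because `java.util.function.Function` cannot throw checked ones. In a Flink job the same wrapper could sit inside a map function, followed by the two filters described in the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Either a successful output or the input that failed, plus the exception it caused.
final class Result<I, O> {
    final I input;
    final O value;
    final RuntimeException error;

    private Result(I input, O value, RuntimeException error) {
        this.input = input;
        this.value = value;
        this.error = error;
    }

    static <I, O> Result<I, O> ok(I input, O value) {
        return new Result<>(input, value, null);
    }

    static <I, O> Result<I, O> failed(I input, RuntimeException error) {
        return new Result<>(input, null, error);
    }

    boolean isOk() {
        return error == null;
    }
}

final class SafeFunctions {
    // Wraps business logic once; failures become data instead of failing the whole job.
    static <I, O> Function<I, Result<I, O>> safely(Function<I, O> logic) {
        return input -> {
            try {
                return Result.ok(input, logic.apply(input));
            } catch (RuntimeException e) {
                return Result.failed(input, e);
            }
        };
    }

    // What a downstream "error" filter would forward to a log or special topic.
    static <I, O> List<I> failedInputs(List<Result<I, O>> results) {
        List<I> failed = new ArrayList<>();
        for (Result<I, O> r : results) {
            if (!r.isOk()) {
                failed.add(r.input);
            }
        }
        return failed;
    }
}
```

Keeping the original input in the failure case is what makes it possible to ship the offending message to the special sink, which was the missing piece in the try-catch approach.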
Re: s3 checkpointing issue
I think I've had this issue too and fixed it as Ufuk suggested, in core-site.xml, with something like: property fs.s3a.buffer.dir set to /tmp. On 4 May 2016 at 11:10, Ufuk Celebi wrote: > Hey Chen Qin, > > this seems to be an issue with the S3 file system. The root cause is: > > Caused by: java.lang.NullPointerException at > > org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:268) > at > org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344) > at > org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416) > at > org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198) > at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87) > at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:907) at > org.apache.hadoop.fs.FileSystem.create(FileSystem.java:888) at > org.apache.hadoop.fs.FileSystem.create(FileSystem.java:785) at > > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:404) > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:48) > at > org.apache.flink.runtime.state.filesystem.FsStateBackend$FsCheckpointStateOutputStream.flush(FsStateBackend.java:489) > ... 25 more > > From [1] it looks like you have to specify > > fs.s3a.buffer.dir > > in the Hadoop configuration (where you set the S3 file system). > > The expected value is a comma-separated list of local directories used > to buffer results prior to transmitting them to S3 (for large files). > > Does this fix the issue? Please report back so that we can include it in > the "common issues" section of the AWS docs.
> > – Ufuk > > [1] http://deploymentzone.com/2015/12/20/s3a-on-spark-on-aws-ec2/ > > > On Wed, May 4, 2016 at 2:41 AM, Chen Qin wrote: > > Hi there, > > > > I ran a test job with the file state backend and saved checkpoints to S3 (via > s3a). > > > > The job crashes when a checkpoint is triggered. Looking into the S3 directory and > listing > > objects, I found that the directory was created successfully but all checkpoint > > directories are empty. > > > > The host running the task manager shows the following error. > > > > Received error response: > com.amazonaws.services.s3.model.AmazonS3Exception: > > Status Code: 404, AWS Service: null, AWS Request ID: CF1845CA84E07549, > AWS > > Error Code: null, AWS Error Message: Not Found, S3 Extended Request > ID:x > > > > Has anyone met this issue before? > > > > flink 1.0.0 > > scala 2.10 > > hadoop-aws 2.7.2 > > aws-java-sdk 1.7.4 > > > > > > Thanks, > > Chen > > > > Attached is the full log shown on the web dashboard when the job was canceled. > > java.lang.RuntimeException: Error triggering a checkpoint as the result > of > > receiving checkpoint barrier at > > > org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:681) > > at > > > org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:674) > > at > > > org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203) > > at > > > org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129) > > at > > > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:175) > > at > > > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65) > > at > > > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224) > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) at > > java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException: > Could > > not open output stream for state backend at > > >
org.apache.flink.runtime.state.filesystem.FsStateBackend$FsCheckpointStateOutputStream.flush(FsStateBackend.java:498) > > at > > > org.apache.flink.runtime.state.filesystem.FsStateBackend$FsCheckpointStateOutputStream.write(FsStateBackend.java:444) > > at java.io.DataOutputStream.write(DataOutputStream.java:88) at > > java.io.DataOutputStream.write(DataOutputStream.java:88) at > > org.apache.flink.types.StringValue.writeString(StringValue.java:813) at > > > org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:64) > > at > > > org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28) > > at > > > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124) > > at > > > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30) > > at > > > org.apache.flink.runtime.state.ArrayListSerializer.serialize(ArrayListSerializer.java:78) > > at > > > org.apache.flink.runtime.state.ArrayListSerializer.serialize(ArrayListSerializer.java:27) > > at > > >
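For reference, the fix Ufuk and Igor describe corresponds to a property like the following in the Hadoop `core-site.xml` that the S3A file system reads. This is a minimal fragment, assuming `/tmp` (the value Igor used) is a writable local directory on every TaskManager host; per Ufuk's note, several directories can be given as a comma-separated list.

```xml
<configuration>
  <!-- Local directories where S3A buffers data before uploading it to S3 -->
  <property>
    <name>fs.s3a.buffer.dir</name>
    <value>/tmp</value>
  </property>
</configuration>
```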
Re: Measuring latency in a DataStream
1. Why are you doing a join instead of something like System.currentTimeMillis()? At the end you have a tuple of your data with a timestamp anyway... so why not just wrap your data in a Tuple2 with the additional info of the creation timestamp? 2. Are you sure that the consumer/producer machines' clocks are in sync? You can use NTP for this. On 2 May 2016 at 20:02, Robert Schmidtke wrote: > Hi everyone, > > I have implemented a way to measure latency in a DataStream (I hope): I'm > consuming a Kafka topic and I'm union'ing the resulting stream with a > custom source that emits a (machine-local) timestamp every 1000ms (using > currentTimeMillis). On the consuming end I'm distinguishing between the > Kafka events and the timestamps. When encountering a timestamp, I take the > difference of the processing machine's local time and the timestamp found > in the stream, expecting a positive difference (with the processing > machine's timestamp being larger than the timestamp found in the stream). > However, the opposite is the case. Now I am wondering when events are > actually processed. > > Unioning the stream from Kafka + my custom source and batching them in 10s windows > (which is what I do), I expect 10 timestamps with ascending values and a > rough gap of 1000ms in the stream: > > https://github.com/robert-schmidtke/HiBench/blob/flink-streaming/src/streambench/flinkbench/src/main/scala/com/intel/hibench/streambench/flink/microbench/RunBenchWithInit.scala#L68 > > On the receiving end I again take the currentTimeMillis in my fold > function, expecting the resulting value to be larger (most of the time) > than the timestamps encountered in the stream: > > https://github.com/robert-schmidtke/HiBench/blob/flink-streaming/src/streambench/flinkbench/src/main/scala/com/intel/hibench/streambench/flink/microbench/NumericCalcJob.scala#L53 > > The system clocks are in sync up to 1ms. > > Maybe I am not clear about when certain timestamps are created (i.e.
when > the UDFs are invoked) or how windows are processed. Any advice is greatly > appreciated, also alternative approaches to calculating latency. > > I'm on Flink 0.10.2 by the way. > > Thanks in advance for the help! > > Robert > > -- > My GPG Key ID: 336E2680 >
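Igor's first suggestion (attach the creation time to each record instead of joining with a separate timestamp stream) could look roughly like the plain-Java sketch below. `Stamped` is a hypothetical wrapper playing the role of the `Tuple2` he mentions, and the clock is passed in as a `LongSupplier` so the arithmetic can be tested; in a job it would be `System::currentTimeMillis` on the producing and the consuming machine respectively. As his second point says, the result is only meaningful if those clocks are synchronized: a negative value indicates clock skew, not a measurement of processing.

```java
import java.util.function.LongSupplier;

// Hypothetical wrapper pairing a payload with the wall-clock time at which it was created.
final class Stamped<T> {
    final T value;
    final long createdAtMillis;

    Stamped(T value, long createdAtMillis) {
        this.value = value;
        this.createdAtMillis = createdAtMillis;
    }

    // Called at the source, e.g. with System::currentTimeMillis.
    static <T> Stamped<T> now(T value, LongSupplier clock) {
        return new Stamped<>(value, clock.getAsLong());
    }

    // Called at the sink; a negative result means the two machines' clocks disagree.
    long latencyMillis(LongSupplier clock) {
        return clock.getAsLong() - createdAtMillis;
    }
}
```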
Re: AvroWriter for Rolling sink
[FLINK-3854] Support Avro key-value rolling sink writer #1953 On 27 April 2016 at 19:29, Igor Berman <igor.ber...@gmail.com> wrote: > Hi Aljoscha, > > avro-mapred jar contains different M/R output formats for avro, and their > writers > it's primary used in M/R jobs that produce avro output > see some details here : https://avro.apache.org/docs/1.7.6/mr.html > > I have extracted(kind of copy-pasted+adjustments) some of the classes from > there to remove this dependency and it's seems to work with basic scenario > I still want to write it as if it was created with M/R job to be > compatible with this library(e.g. key-value pairs are wrapped into > AvroKeyValue object) so that it's not important if Flink or regular M/R > created this files, still their consumer can read them in a same way > WDYT? > > > > > > On 27 April 2016 at 11:27, Aljoscha Krettek <aljos...@apache.org> wrote: > >> Hi, >> which code did you reuse from there? I asked Robert and I think it is >> somewhat problematic to add these somewhat bigger dependencies. >> >> Cheers, >> Aljoscha >> >> On Mon, 25 Apr 2016 at 21:24 Igor Berman <igor.ber...@gmail.com> wrote: >> >>> Hi, >>> it's not a problem, I'll find time to change it(I understand the >>> refactoring is in master and not released yet). >>> Wanted to ask if it's acceptable to add following dependency to flink? >>> I mean my code reused code in this jar(pay attention it's not present >>> currently in flink classpath) >>> >>> org.apache.avro >>> avro-mapred >>> 1.7.6 >>> hadoop2 >>> >>> >>> On 25 April 2016 at 16:20, Aljoscha Krettek <aljos...@apache.org> wrote: >>> >>>> Hi, >>>> the code looks very good! Do you think it can be adapted to the >>>> slightly modified interface introduced here: >>>> https://issues.apache.org/jira/browse/FLINK-3637 >>>> >>>> It basically requires the writer to know the write position, so that we >>>> can truncate to a valid position in case of failure. 
>>>> >>>> Cheers, >>>> Aljoscha >>>> >>>> On Thu, 21 Apr 2016 at 18:40 Igor Berman <igor.ber...@gmail.com> wrote: >>>> >>>>> ok, >>>>> I have working prototype already, if somebody is interested(attached) >>>>> >>>>> I might add it as PR latter(with tests etc) >>>>> >>>>> tested locally & with s3 >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> On 21 April 2016 at 12:01, Aljoscha Krettek <aljos...@apache.org> >>>>> wrote: >>>>> >>>>>> Hi, >>>>>> as far as I know there is no one working on this. I'm only aware of >>>>>> someone working on an ORC (from Hive) Writer. >>>>>> >>>>>> This would be a welcome addition! I think you are already on the >>>>>> right track, the only thing required will probably be an AvroFileWriter >>>>>> and >>>>>> you already started looking at SequenceFileWriter, which should be >>>>>> similar. >>>>>> >>>>>> Cheers, >>>>>> Aljoscha >>>>>> >>>>>> On Thu, 21 Apr 2016 at 09:45 Igor Berman <igor.ber...@gmail.com> >>>>>> wrote: >>>>>> >>>>>>> Hi All, >>>>>>> Is there such implementation somewhere?(before I start to implement >>>>>>> it myself, it seems not too difficult based on SequenceFileWriter >>>>>>> example) >>>>>>> >>>>>>> anyway any ideas/pointers will be highly appreciated >>>>>>> >>>>>>> thanks in advance >>>>>>> >>>>>>> >>>>> >>> >
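Aljoscha's remark about FLINK-3637 (the writer must know its write position so that the sink can truncate to a valid offset after a failure) boils down to counting the bytes that have actually been handed to the underlying stream. The plain-Java sketch below illustrates only that idea; it is not the Flink `Writer` interface, and `PositionTrackingOutputStream` is a hypothetical name. A real Avro writer would additionally have to flush its own buffers (for example at a block boundary) before the reported position is safe to truncate to.

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Counts bytes passed to the wrapped stream so a writer can report its current position.
final class PositionTrackingOutputStream extends FilterOutputStream {
    private long position;

    PositionTrackingOutputStream(OutputStream out) {
        super(out);
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
        position++;
    }

    // FilterOutputStream.write(byte[]) delegates here, so both array variants are counted.
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len);
        position += len;
    }

    long getPosition() {
        return position;
    }
}
```

Overriding the array variant also avoids `FilterOutputStream`'s default byte-by-byte copying, so the wrapper adds only a counter to each write.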
Re: AvroWriter for Rolling sink
Hi Aljoscha, the avro-mapred jar contains different M/R output formats for Avro and their writers; it's primarily used in M/R jobs that produce Avro output. See some details here: https://avro.apache.org/docs/1.7.6/mr.html I have extracted (kind of copy-pasted + adjustments) some of the classes from there to remove this dependency, and it seems to work with a basic scenario. I still want to write the output as if it was created by an M/R job, to be compatible with this library (e.g. key-value pairs are wrapped into an AvroKeyValue object), so that it doesn't matter whether Flink or a regular M/R job created these files; their consumers can still read them in the same way. WDYT? On 27 April 2016 at 11:27, Aljoscha Krettek <aljos...@apache.org> wrote: > Hi, > which code did you reuse from there? I asked Robert and I think it is > somewhat problematic to add these somewhat bigger dependencies. > > Cheers, > Aljoscha > > On Mon, 25 Apr 2016 at 21:24 Igor Berman <igor.ber...@gmail.com> wrote: > >> Hi, >> it's not a problem, I'll find time to change it(I understand the >> refactoring is in master and not released yet). >> Wanted to ask if it's acceptable to add following dependency to flink? >> I mean my code reused code in this jar(pay attention it's not present >> currently in flink classpath) >> >> org.apache.avro >> avro-mapred >> 1.7.6 >> hadoop2 >> >> >> On 25 April 2016 at 16:20, Aljoscha Krettek <aljos...@apache.org> wrote: >> >>> Hi, >>> the code looks very good! Do you think it can be adapted to the slightly >>> modified interface introduced here: >>> https://issues.apache.org/jira/browse/FLINK-3637 >>> >>> It basically requires the writer to know the write position, so that we >>> can truncate to a valid position in case of failure.
>>> >>> Cheers, >>> Aljoscha >>> >>> On Thu, 21 Apr 2016 at 18:40 Igor Berman <igor.ber...@gmail.com> wrote: >>> >>>> ok, >>>> I have working prototype already, if somebody is interested(attached) >>>> >>>> I might add it as PR latter(with tests etc) >>>> >>>> tested locally & with s3 >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> On 21 April 2016 at 12:01, Aljoscha Krettek <aljos...@apache.org> >>>> wrote: >>>> >>>>> Hi, >>>>> as far as I know there is no one working on this. I'm only aware of >>>>> someone working on an ORC (from Hive) Writer. >>>>> >>>>> This would be a welcome addition! I think you are already on the right >>>>> track, the only thing required will probably be an AvroFileWriter and you >>>>> already started looking at SequenceFileWriter, which should be similar. >>>>> >>>>> Cheers, >>>>> Aljoscha >>>>> >>>>> On Thu, 21 Apr 2016 at 09:45 Igor Berman <igor.ber...@gmail.com> >>>>> wrote: >>>>> >>>>>> Hi All, >>>>>> Is there such implementation somewhere?(before I start to implement >>>>>> it myself, it seems not too difficult based on SequenceFileWriter >>>>>> example) >>>>>> >>>>>> anyway any ideas/pointers will be highly appreciated >>>>>> >>>>>> thanks in advance >>>>>> >>>>>> >>>> >>
Re: AvroWriter for Rolling sink
Hi, it's not a problem, I'll find time to change it (I understand the refactoring is in master and not released yet). I wanted to ask whether it's acceptable to add the following dependency to Flink. I mean my code reused code from this jar (note that it's not currently on the Flink classpath): org.apache.avro:avro-mapred:1.7.6 (classifier hadoop2). On 25 April 2016 at 16:20, Aljoscha Krettek <aljos...@apache.org> wrote: > Hi, > the code looks very good! Do you think it can be adapted to the slightly > modified interface introduced here: > https://issues.apache.org/jira/browse/FLINK-3637 > > It basically requires the writer to know the write position, so that we > can truncate to a valid position in case of failure. > > Cheers, > Aljoscha > > On Thu, 21 Apr 2016 at 18:40 Igor Berman <igor.ber...@gmail.com> wrote: > >> ok, >> I have working prototype already, if somebody is interested(attached) >> >> I might add it as PR latter(with tests etc) >> >> tested locally & with s3 >> >> >> >> >> >> >> >> On 21 April 2016 at 12:01, Aljoscha Krettek <aljos...@apache.org> wrote: >> >>> Hi, >>> as far as I know there is no one working on this. I'm only aware of >>> someone working on an ORC (from Hive) Writer. >>> >>> This would be a welcome addition! I think you are already on the right >>> track, the only thing required will probably be an AvroFileWriter and you >>> already started looking at SequenceFileWriter, which should be similar. >>> >>> Cheers, >>> Aljoscha >>> >>> On Thu, 21 Apr 2016 at 09:45 Igor Berman <igor.ber...@gmail.com> wrote: >>> >>>> Hi All, >>>> Is there such implementation somewhere?(before I start to implement it >>>> myself, it seems not too difficult based on SequenceFileWriter example) >>>> >>>> anyway any ideas/pointers will be highly appreciated >>>> >>>> thanks in advance >>>> >>>> >>
Re: AvroWriter for Rolling sink
OK, I already have a working prototype, if somebody is interested (attached). I might add it as a PR later (with tests etc.). Tested locally & with S3. On 21 April 2016 at 12:01, Aljoscha Krettek <aljos...@apache.org> wrote: > Hi, > as far as I know there is no one working on this. I'm only aware of > someone working on an ORC (from Hive) Writer. > > This would be a welcome addition! I think you are already on the right > track, the only thing required will probably be an AvroFileWriter and you > already started looking at SequenceFileWriter, which should be similar. > > Cheers, > Aljoscha > > On Thu, 21 Apr 2016 at 09:45 Igor Berman <igor.ber...@gmail.com> wrote: > >> Hi All, >> Is there such implementation somewhere?(before I start to implement it >> myself, it seems not too difficult based on SequenceFileWriter example) >> >> anyway any ideas/pointers will be highly appreciated >> >> thanks in advance >> >> package org.apache.flink.streaming.connectors.fs.avro; /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.
*/ import java.io.IOException; import java.util.Map; import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileConstants; import org.apache.avro.generic.GenericData; import org.apache.avro.hadoop.file.HadoopCodecFactory; import org.apache.avro.hadoop.io.AvroDatumConverter; import org.apache.avro.hadoop.io.AvroDatumConverterFactory; import org.apache.avro.hadoop.io.AvroSerialization; import org.apache.avro.mapreduce.AvroJob; import org.apache.avro.mapreduce.AvroKeyValueRecordWriter; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.InputTypeConfigurable; import org.apache.flink.api.java.typeutils.TupleTypeInfoBase; import org.apache.flink.streaming.connectors.fs.Writer; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.mapred.JobConf; /** * Implementation of AvroKeyValue writer that can be used in Sink. 
* You'll need dependency(pay attention to classifier, it works only for hadoop2) {@code first thing to add avro mapred dependency org.apache.avro avro-mapred 1.7.6 hadoop2 } And then: {@code RollingSink<Tuple2<AvroKey , AvroValue>> sink = new RollingSink<Tuple2<AvroKey , AvroValue>>("/tmp/path"); sink.setBucketer(new DateTimeBucketer("-MM-dd-HH-mm")); Map<String,String> properties = new HashMap<>(); Schema longSchema = Schema.create(Type.LONG); String keySchema = longSchema.toString(); properties.put("avro.schema.output.key", keySchema); String valueSchema = longSchema.toString(); properties.put("avro.schema.output.value", valueSchema); properties.put(FileOutputFormat.COMPRESS, Boolean.toString(true)); properties.put(FileOutputFormat.COMPRESS_CODEC, SnappyCodec.class.getName()); sink.setWriter(new AvroSinkWriter<AvroKey , AvroValue>(properties)); sink.setBatchSize(1024 * 1024 * 64); // this is 64 MB } to test with s3: {@code create core-site.xml(I haven't other way to test locally) fs.s3.impl org.apache.hadoop.fs.s3a.S3AFileSystem fs.s3a.access.key xxx fs.s3a.secret.key yyy fs.s3a.buffer.dir /tmp and add following dependencies(not sure what is best option here): org.apache.hadoop hadoop-aws 2.7.0 provided guava com.google.guava } */ public class AvroSinkWriter<K, V> implements Writer<Tuple2<K, V>>, InputTypeConfigurable { private static final long serialVersionUID = 1L; private transient FSDataOutputStream outputStream; private transient AvroKeyValueRecordWriter<K, V> writer; private Class keyClass; private Class valueClass; private final Map<String, String> properties; /** * C'tor for the writer * * You can provide different properties that will be used to configure avro key-value writer as simple properties map(see example above) * @param propert
Re: providing java system arguments(-D) to specific job
For the record (this works at the task manager level): in conf/flink-conf.yaml set env.java.opts: -Dmy-prop=bla -Dmy-prop2=bla2 On 17 April 2016 at 16:25, Igor Berman <igor.ber...@gmail.com> wrote: > How do I provide Java arguments while submitting a job? Suppose I have some > legacy component that depends on Java argument configuration. > > I suppose Flink reuses the same JVM for all jobs, so in general I can start the > task manager with the desired arguments, but then my jobs can't each have > different system arguments. > > Any suggestions?
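On the code side, the legacy component then reads these values as ordinary JVM system properties. A minimal sketch, assuming the property name `my-prop` from the example above and a hypothetical fallback default; `LegacyConfig` is an illustrative name. Since all jobs running in a TaskManager share its JVM, every one of them sees the same value, which is exactly the limitation raised in the question.

```java
// Reads a JVM-wide system property, e.g. one set via env.java.opts: -Dmy-prop=bla.
final class LegacyConfig {
    static final String MY_PROP = "my-prop";

    // Falls back to a default when the TaskManager JVM was started without -Dmy-prop.
    static String myProp() {
        return System.getProperty(MY_PROP, "default");
    }
}
```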
Providing Java system arguments (-D) to a specific job
How do I provide Java arguments when submitting a job? Suppose I have some legacy component that depends on Java argument configuration. I suppose Flink reuses the same JVM for all jobs, so in general I can start the task manager with the desired arguments, but then my jobs can't each have different system arguments. Any suggestions?
Re: Accessing StateBackend snapshots outside of Flink
Thanks a lot for the info; it seems not too complex. I'll try to write a simple tool to read this state. Aljoscha, does the key reflect the unique ID of the operator in some way? Or is the key just the "name" that is passed to ValueStateDescriptor? Thanks in advance. On 15 April 2016 at 15:10, Stephan Ewen wrote: > One thing to add is that you can always trigger a persistent checkpoint > via the "savepoints" feature: > https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/savepoints.html > > On Fri, Apr 15, 2016 at 10:24 AM, Aljoscha Krettek wrote: >> Hi, >> for RocksDB we simply use a TypeSerializer to serialize the key and value >> to a byte[] array and store that in RocksDB. For a ListState, we serialize >> the individual elements using a TypeSerializer and store them in a >> comma-separated list in RocksDB. The snapshots of RocksDB that we write to >> HDFS are regular backups of a RocksDB database, as described here: >> https://github.com/facebook/rocksdb/wiki/How-to-backup-RocksDB%3F. It >> should be possible to read them from HDFS and restore them to a RocksDB >> database as described in the linked documentation. >> >> tl;dr As long as you know the type of values stored in the state, you >> should be able to read them from RocksDB and deserialize the values using a >> TypeSerializer. >> >> One more bit of information: internally, the state is keyed by (key, >> namespace) -> value, where namespace can be an arbitrary type that has a >> TypeSerializer. We use this to store window state that is local to both the >> key and the current window. For state that you store in a user-defined function, >> the namespace will always be null, and that will be serialized by a >> VoidSerializer that simply always writes a "0" byte. >> >> Cheers, >> Aljoscha >> >> On Fri, 15 Apr 2016 at 00:18 igor.berman wrote: >> >>> Hi, >>> we are evaluating Flink for a new solution, and several people raised a >>> concern >>> about coupling too much to Flink: >>> 1. we understand that if we want to get full fault tolerance and the best >>> performance, we'll need to use Flink managed state (probably the RocksDB >>> backend >>> due to the volume of state) >>> 2. but then, if we later find that Flink doesn't answer our needs (for any >>> reason), we'll need to extract this state in some way (since it's the >>> only >>> source of consistent state) >>> In general, I'd like to be able to take a snapshot of the backend and try to >>> read >>> it... do you think that will be a trivial task? >>> Say I'm holding list state per partitioned key; would it be easy to >>> take the >>> RocksDB file and open it? >>> >>> Any thoughts on how I can convince people on our team? >>> >>> Thanks in advance!
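To make the layout Aljoscha describes concrete, here is a stdlib-only Java sketch. It is not Flink code. `DataOutputStream` stands in for a `TypeSerializer`, the key is assumed to be a `long`, and the list elements are assumed to be `Long`s. The separator constant and class name are hypothetical. The exact bytes the RocksDB backend writes may differ, so treat this as an illustration of the idea rather than a reader for real snapshots.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustration only: java.io streams stand in for Flink's TypeSerializer. */
public class ListStateLayoutSketch {

    // Hypothetical separator written between list elements (the "comma").
    static final byte SEPARATOR = ',';

    /** Key bytes: the serialized key, then a null namespace written as a single 0 byte. */
    static byte[] encodeKey(long key) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(key);
            out.writeByte(0); // plays the role of VoidSerializer for the null namespace
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Value bytes: each element serialized in turn, with one SEPARATOR byte between elements. */
    static byte[] encodeList(List<Long> elements) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (int i = 0; i < elements.size(); i++) {
                if (i > 0) {
                    out.writeByte(SEPARATOR);
                }
                out.writeLong(elements.get(i));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads elements one by one and skips the separator after each; never splits on ',' bytes. */
    static List<Long> decodeList(byte[] value) {
        List<Long> result = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(value))) {
            while (in.available() > 0) {
                result.add(in.readLong());
                if (in.available() > 0) {
                    in.readByte(); // skip the separator
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        byte[] key = encodeKey(7L);
        byte[] value = encodeList(Arrays.asList(1L, 44L, 300L));
        System.out.println("key bytes: " + key.length);     // 8 for the long + 1 for the namespace
        System.out.println("value bytes: " + value.length); // 3 * 8 + 2 separators
        System.out.println("decoded: " + decodeList(value));
    }
}
```

The element 44 is chosen on purpose. Its last serialized byte equals ',' (ASCII 44), which is why the decoder reads elements sequentially instead of splitting on the separator.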
Re: Flink event processing immediate feedback
Yes, indeed, this is the direction we are currently trying. Thanks. On 14 April 2016 at 18:31, Aljoscha Krettek <aljos...@apache.org> wrote: > It could still work as I described, by using a message queue to do the > communication between Flink and the front end. > > On Thu, 14 Apr 2016 at 17:30 Igor Berman <igor.ber...@gmail.com> wrote: > >> Hi Aljoscha, >> thanks for the response. >> >> "Synchronous" in our case means that a request by the end-client to the >> frontend (say, some REST call) needs to wait until processing in the >> backend (Flink) is done and should return a response (e.g. an alert) back to the >> end-client (i.e. end-client -> frontend -> kafka -> flink). >> Those requests are a minority and will be distinguished by a different type of >> request to the frontend or by different parameters. >> >> Might Flink not be the best option for this use case? >> >> Igor >> >> On 14 April 2016 at 16:37, Aljoscha Krettek <aljos...@apache.org> wrote: >> >>> Hi, >>> what do you mean by "synchronous"? If I understood it correctly, then >>> some events entering the Flink pipeline would trigger an alert while some >>> others would not trigger an alert. How would the component that receives >>> such alerts know when to wait and when not to wait? >>> >>> As I see it, you can push these alerts into some sort of message queue >>> and have a request in the front-end that returns as soon as something is >>> available in that queue. Then you display it and start a new query that >>> returns as soon as more data is available in the alert queue. >>> >>> Cheers, >>> Aljoscha >>> >>> On Thu, 7 Apr 2016 at 09:57 igor.berman <igor.ber...@gmail.com> wrote: >>> >>>> Hi, >>>> Suppose I have a web-facing frontend that gets a stream of events (HTTP >>>> calls). I >>>> need to process the event stream, do some aggregations over those events >>>> and >>>> write aggregated statistics to HBase; so far Flink seems like a perfect >>>> match. >>>> However, in some cases an event should trigger an alert, and the frontend >>>> needs to >>>> get this alert in a synchronous way; here I'm a bit lost. I thought about >>>> the following kind of flow: >>>> frontend -> queue -> flink -> redis (pub/sub) <- frontend >>>> >>>> I.e. I have two major use cases: async aggregated analytics/stats >>>> computing >>>> and a "synchronous" response to the frontend. The frontend might be Node/Play or >>>> any >>>> other technology that won't have a problem "waiting" for the >>>> response, so >>>> the only question is how to implement this feedback. >>>> Might it be some kind of Sink? >>>> >>>> Any ideas would be appreciated, >>>> Igor
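Here is a minimal sketch of the long-polling pattern Aljoscha describes. It uses an in-memory `BlockingQueue` as a stand-in for the real alert queue (Kafka, Redis pub/sub, or similar). The class name and timeouts are hypothetical. In a real setup, the `publish` side would live in a Flink sink and `poll` would sit behind the front end's HTTP endpoint.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class AlertLongPoll {

    // In-memory stand-in for the alert queue the Flink job writes to.
    private final BlockingQueue<String> alerts = new LinkedBlockingQueue<>();

    /** Producer side: what a sink at the end of the Flink pipeline would do. */
    public void publish(String alert) {
        alerts.add(alert);
    }

    /**
     * One long-poll request from the front end: waits until at least one alert is
     * available or the timeout expires, then returns everything queued so far.
     * An empty list means "nothing yet"; the client just issues the next request.
     */
    public List<String> poll(long timeout, TimeUnit unit) {
        List<String> batch = new ArrayList<>();
        try {
            String first = alerts.poll(timeout, unit);
            if (first != null) {
                batch.add(first);
                alerts.drainTo(batch); // grab anything else that is already waiting
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        AlertLongPoll queue = new AlertLongPoll();

        // Simulated Flink job emitting two alerts on another thread.
        Thread job = new Thread(() -> {
            queue.publish("alert-1");
            queue.publish("alert-2");
        });
        job.start();
        job.join();

        // Front end: show what arrived, then start the next query.
        System.out.println(queue.poll(1, TimeUnit.SECONDS));
        System.out.println(queue.poll(100, TimeUnit.MILLISECONDS)); // times out, prints []
    }
}
```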
Re: Flink event processing immediate feedback
Hi Aljoscha, thanks for the response. "Synchronous" in our case means that a request by the end-client to the frontend (say, some REST call) needs to wait until processing in the backend (Flink) is done and should return a response (e.g. an alert) back to the end-client (i.e. end-client -> frontend -> kafka -> flink). Those requests are a minority and will be distinguished by a different type of request to the frontend or by different parameters. Might Flink not be the best option for this use case? Igor On 14 April 2016 at 16:37, Aljoscha Krettek wrote: > Hi, > what do you mean by "synchronous"? If I understood it correctly, then some > events entering the Flink pipeline would trigger an alert while some others > would not trigger an alert. How would the component that receives such > alerts know when to wait and when not to wait? > > As I see it, you can push these alerts into some sort of message queue and > have a request in the front-end that returns as soon as something is > available in that queue. Then you display it and start a new query that > returns as soon as more data is available in the alert queue. > > Cheers, > Aljoscha > > On Thu, 7 Apr 2016 at 09:57 igor.berman wrote: > >> Hi, >> Suppose I have a web-facing frontend that gets a stream of events (HTTP >> calls). I >> need to process the event stream, do some aggregations over those events >> and >> write aggregated statistics to HBase; so far Flink seems like a perfect >> match. >> However, in some cases an event should trigger an alert, and the frontend needs >> to >> get this alert in a synchronous way; here I'm a bit lost. I thought about >> the following kind of flow: >> frontend -> queue -> flink -> redis (pub/sub) <- frontend >> >> I.e. I have two major use cases: async aggregated analytics/stats >> computing >> and a "synchronous" response to the frontend. The frontend might be Node/Play or any >> other technology that won't have a problem "waiting" for the response, >> so >> the only question is how to implement this feedback. >> Might it be some kind of Sink? >> >> Any ideas would be appreciated, >> Igor
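If the end-client really has to wait for its own alert, one common approach can help. It is an assumption here, not something proposed in the thread: tag each request with an ID and have the front end wait on a future keyed by that ID. The sketch below keeps the pending futures in an in-memory map. In the flow above, `onResult` would be driven by whatever consumes the job's output (for example, the Redis subscriber), and the request ID would have to travel through Kafka and Flink along with the event. All names are hypothetical.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical bridge that makes an asynchronous pipeline look synchronous to a REST caller. */
public class SyncOverAsync {

    // Requests waiting for an answer from the pipeline, keyed by request ID.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Frontend: register interest before sending the event into the pipeline. */
    public CompletableFuture<String> register(String requestId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    /** Called by whatever consumes the job's output; unknown IDs are ignored. */
    public void onResult(String requestId, String response) {
        CompletableFuture<String> future = pending.remove(requestId);
        if (future != null) {
            future.complete(response);
        }
    }

    /** Blocks the request handler until the job answers or the timeout expires. */
    public Optional<String> await(String requestId, CompletableFuture<String> future, long timeoutMs) {
        try {
            return Optional.of(future.get(timeoutMs, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Optional.empty();
        } catch (TimeoutException | ExecutionException e) {
            return Optional.empty();
        } finally {
            pending.remove(requestId); // avoid leaking entries for abandoned requests
        }
    }

    public static void main(String[] args) {
        SyncOverAsync bridge = new SyncOverAsync();

        CompletableFuture<String> f1 = bridge.register("req-1");
        // Simulated Flink output arriving on another thread.
        new Thread(() -> bridge.onResult("req-1", "ALERT for req-1")).start();
        System.out.println(bridge.await("req-1", f1, 1000));

        CompletableFuture<String> f2 = bridge.register("req-2");
        System.out.println(bridge.await("req-2", f2, 100)); // nobody answers, so it times out
    }
}
```

Registering before sending the event matters. Otherwise a fast pipeline could answer before the frontend starts waiting, and `onResult` would find no pending entry.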
Re: [ANNOUNCE] Flink 1.0.0 has been released
Congratulations! Very nice work and very interesting features. One question regarding CEP: do you think it's feasible to define a pattern over a window of one month or even more? Is there a detailed explanation of how these partial states are saved? I mean that the events that create a "funnel" might be separated by very long periods of inactivity/noise. On 8 March 2016 at 17:17, Kostas Tzoumas wrote: > Hi everyone! > > As you might have noticed, Apache Flink 1.0.0 has been released and > announced! > > You can read more about the release at the ASF blog and the Flink blog > - > https://blogs.apache.org/foundation/entry/the_apache_software_foundation_announces88 > - http://flink.apache.org/news/2016/03/08/release-1.0.0.html > > Don't forget to retweet and spread the news :-) > - https://twitter.com/TheASF/status/707174116969857024 > - https://twitter.com/ApacheFlink/status/707175973482012672 > > Check out the changelog and the migration guide, download the release, and > check out the documentation > - http://flink.apache.org/blog/release_1.0.0-changelog_known_issues.html > - > https://cwiki.apache.org/confluence/display/FLINK/Migration+Guide%3A+0.10.x+to+1.0.x > - https://cwiki.apache.org/confluence/display/FLINK/Stability+Annotations > - http://flink.apache.org/downloads.html > - https://ci.apache.org/projects/flink/flink-docs-release-1.0/ > > Many congratulations to the Flink community for making this happen! > > Best, > Kostas >