Re: Joining two kafka streams

2017-01-09 Thread Igor Berman
Hi Tzu-Li,
Huge thanks for the input, I'll try to implement a prototype of your idea and
see if it answers my requirements.


On 9 January 2017 at 08:02, Tzu-Li (Gordon) Tai  wrote:

> Hi Igor!
>
> What you can actually do is let a single FlinkKafkaConsumer consume from
> both topics, producing a single DataStream which you can keyBy afterwards.
> All versions of the FlinkKafkaConsumer support consuming multiple Kafka
> topics simultaneously. This is logically the same as union and then a
> keyBy, like what you described.
>
> Note that this approach requires that the records in both of your Kafka
> topics are of the same type when consumed into Flink (e.g., the same POJO
> classes, or simply both as Strings, etc.).
> If that isn’t possible and you have different data types / schemas for the
> topics, you’d probably need to use “connect” and then a keyBy.
>
> If you’re applying a window directly after joining the two topic streams,
> you could also use a window join:
>
> dataStream.join(otherStream)
> .where().equalTo()
> .window(TumblingEventTimeWindows.of(Time.seconds(3)))
> .apply (new JoinFunction () {...});
>
> The “where” specifies how to select the key from the first stream, and
> “equalTo” the second one.
>
> Hope this helps, let me know if you have other questions!
>
> Cheers,
> Gordon
>
> On January 9, 2017 at 4:06:34 AM, igor.berman (igor.ber...@gmail.com)
> wrote:
>
> Hi,
> I have a use case where I need to join two Kafka topics by some fields.
> In general, I could push the content of one topic into the other and
> partition by the same key, but I can't touch those two topics (i.e. there
> are other consumers of those topics). On the other hand, it's essential to
> process the same keys on the same "thread" to achieve locality and avoid
> races when working with the same key from different machines/threads.
>
> My idea is to use a union of the two streams and then key by the field,
> but is there a better approach to achieve "locality"?
>
> Any input will be appreciated,
> Igor
>
>
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Joining-two-kafka-streams-tp10912.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>
>
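
For illustration, a minimal sketch of the approach Gordon describes: a single
FlinkKafkaConsumer subscribed to both topics produces one DataStream, which is
then keyed by the shared field. The topic names, the use of String records, the
Kafka 0.9 consumer version, and the key extraction are assumptions made for the
example only.

import java.util.Arrays;
import java.util.Properties;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class TwoTopicKeyByJob {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092");
		props.setProperty("group.id", "two-topic-keyby");

		// One consumer subscribed to both topics yields a single DataStream.
		DataStream<String> events = env.addSource(
				new FlinkKafkaConsumer09<>(Arrays.asList("topicA", "topicB"),
						new SimpleStringSchema(), props));

		// keyBy the shared field so all records with the same key are handled by
		// the same parallel instance (locality, no races across machines/threads).
		events.keyBy(new KeySelector<String, String>() {
			@Override
			public String getKey(String value) {
				return value.split(",")[0]; // assumption: the key is the first CSV field
			}
		}).print();

		env.execute("Two Kafka topics into one keyed stream");
	}
}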


Re: Event processing time with lateness

2016-06-03 Thread Igor Berman
thanks Kostas

On 3 June 2016 at 16:47, Kostas Kloudas <k.klou...@data-artisans.com> wrote:

> Hi Igor,
>
> To handle late events in Flink you would have to implement your own custom
> trigger.
>
> To see a relatively more complex example of such a trigger and how to
> implement it, you can have a look at this implementation:
> https://github.com/dataArtisans/beam_comp/blob/master/src/main/java/com/dataartisans/beam_comparison/customTriggers/EventTimeTriggerWithEarlyAndLateFiring.java
>
> It implements the trigger described in this article (before the
> conclusions section):
> http://data-artisans.com/why-apache-beam/
>
> Thanks,
> Kostas
>
> On Jun 3, 2016, at 2:55 PM, Igor Berman <igor.ber...@gmail.com> wrote:
>
> Hi
>
> According to Tyler Akidau's presentation
> https://docs.google.com/presentation/d/13YZy2trPugC8Zr9M8_TfSApSCZBGUDZdzi-WUz95JJw/present
> Flink supports late arrivals for window processing, yet I've seen several
> questions on the user list regarding late arrivals where the answer was,
> roughly, "not for all use cases".
> Can somebody clarify?
>
> The interesting case for me: I have event-time processing, while I want to
> aggregate by tumbling windows. The events come from Kafka and might be
> late. Currently we define the lateness threshold with a watermark (e.g. 5 mins).
>
> After the window triggers, I want to save the aggregated result to some
> persistent storage (Redis/HBase) together with the start timestamp of the window.
>
> After this grace period, if I understand correctly, a late event won't be
> aggregated into the existing window; instead the trigger will call the
> aggregate function with only one element inside (the late one).
>
> So if my window function saves into persistent storage, it will override the
> aggregated result with a new one that has only one element inside.
>
> What I want to achieve is that a late arrival triggers the window function
> with all elements (the late one + all the others) so that the aggregated
> result is complete.
>
> You can think of the use case of page-visit counts per minute, where due to
> some problems page-visit events might arrive late.
>
> thanks in advance
>
>
>
>
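
As an illustration of the custom-trigger route Kostas points to, here is a
minimal sketch of a trigger that fires at the end of the window and then fires
again, with the full window contents, for each late element. It is written
against the later Flink 1.x Trigger API, so method names may differ slightly in
older releases; treat it as a sketch rather than the implementation referenced
above. Because it returns FIRE rather than FIRE_AND_PURGE, the window keeps its
elements, so every late firing re-emits the complete aggregate and an
idempotent sink keyed by the window start timestamp simply overwrites the
previous value. Newer Flink versions also expose allowedLateness() on windowed
streams, which provides essentially this behaviour out of the box.

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class EventTimeTriggerWithLateFiring extends Trigger<Object, TimeWindow> {

	private static final long serialVersionUID = 1L;

	@Override
	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
		if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
			// The element is late: fire again so the window function sees the
			// complete contents (all previous elements plus this late one).
			return TriggerResult.FIRE;
		}
		ctx.registerEventTimeTimer(window.maxTimestamp());
		return TriggerResult.CONTINUE;
	}

	@Override
	public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
		// Regular end-of-window firing; contents are kept for possible late firings.
		return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
	}

	@Override
	public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
		return TriggerResult.CONTINUE;
	}

	@Override
	public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
		ctx.deleteEventTimeTimer(window.maxTimestamp());
	}
}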


Event processing time with lateness

2016-06-03 Thread Igor Berman
Hi

according to Tyler Akidau's presentation
https://docs.google.com/presentation/d/13YZy2trPugC8Zr9M8_TfSApSCZBGUDZdzi-WUz95JJw/present
Flink supports late arrivals for window processing, yet I've seen several
questions on the user list regarding late arrivals where the answer was,
roughly, "not for all use cases".
Can somebody clarify?

The interesting case for me: I have event-time processing, while I want to
aggregate by tumbling windows. The events come from Kafka and might be late.
Currently we define the lateness threshold with a watermark (e.g. 5 mins).

After the window triggers, I want to save the aggregated result to some
persistent storage (Redis/HBase) together with the start timestamp of the
window.

After this grace period, if I understand correctly, a late event won't be
aggregated into the existing window; instead the trigger will call the
aggregate function with only one element inside (the late one).

So if my window function saves into persistent storage, it will override the
aggregated result with a new one that has only one element inside.

What I want to achieve is that a late arrival triggers the window function
with all elements (the late one + all the others) so that the aggregated
result is complete.

You can think of the use case of page-visit counts per minute, where due to
some problems page-visit events might arrive late.

thanks in advance


Re: writing tests for my program

2016-05-10 Thread Igor Berman
thanks Alexander, I'll take a look


On 10 May 2016 at 13:07, lofifnc  wrote:

> Hi,
>
> Some shameless self-promotion:
>
> You can also check out:
> https://github.com/ottogroup/flink-spector
> which has the goal of removing such hurdles when testing Flink programs.
>
> Best,
> Alex
>
>
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/writing-tests-for-my-program-tp6784p6801.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: writing tests for my program

2016-05-09 Thread Igor Berman
Answering my own question:
testing a streaming environment should be done with StreamingProgramTestBase
& TestStreamEnvironment, which are present in the test package
of the flink-streaming-java project, so they aren't directly available.

Project owners, why not move the above two to flink-test-utils? Or am I
missing something?



On 9 May 2016 at 19:37, Igor Berman <igor.ber...@gmail.com> wrote:

> Any idea how to handle the following? (The message is clear, but I'm not
> sure what I need to do.)
> I'm opening a "generic" environment in my code
> (StreamExecutionEnvironment.getExecutionEnvironment()),
>
> and JavaProgramTestBase configures a TestEnvironment...
> so what should I do to support custom tests?
>
>
>
>
> Error:
> The LocalStreamEnvironment cannot be used when submitting a program
> through a client, or running in a TestEnvironment context.
> org.apache.flink.api.common.InvalidProgramException: The
> LocalStreamEnvironment cannot be used when submitting a program through a
> client, or running in a TestEnvironment context.
> at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.<init>(LocalStreamEnvironment.java:67)
> at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.<init>(LocalStreamEnvironment.java:57)
> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment(StreamExecutionEnvironment.java:1299)
> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment(StreamExecutionEnvironment.java:1285)
> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:1271)
> at com.x.x.StreamConsumer.configureEnv(StreamConsumer.java:150)
>
>
> The error happens when using
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> in my program
>
>
> test is something like:
>
> public class StreamConsumerTest extends JavaProgramTestBase {
>
>     @Override
>     protected void preSubmit() throws Exception {
>     }
>
>     @Override
>     protected void postSubmit() throws Exception {
>     }
>
>     @Override
>     protected void testProgram() throws Exception {
>         StreamConsumer.main(new String[] {
>             "--" + StreamConsumer.CONF_PARAM, "conf/test",
>         });
>     }
> }
>
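
One way around the clash described in this thread, sketched below with
hypothetical class names: keep the pipeline construction in a method that takes
the StreamExecutionEnvironment as a parameter, so a plain JUnit test can run it
on a local environment and collect results through a test sink. This is not the
StreamingProgramTestBase/TestStreamEnvironment approach mentioned above, just a
simpler pattern that avoids the TestEnvironment conflict entirely.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.junit.Assert;
import org.junit.Test;

public class StreamConsumerPipelineTest {

	// Collects results; static because sink instances are serialized and run in
	// the task threads of the local mini cluster.
	private static final List<String> RESULTS = new CopyOnWriteArrayList<>();

	// Hypothetical refactoring of the job: pipeline construction receives the
	// environment instead of calling getExecutionEnvironment() itself.
	private static DataStream<String> buildPipeline(StreamExecutionEnvironment env) {
		return env.fromElements("a", "b", "c")
				.map(new MapFunction<String, String>() {
					@Override
					public String map(String value) {
						return value.toUpperCase();
					}
				});
	}

	@Test
	public void testPipeline() throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

		buildPipeline(env).addSink(new SinkFunction<String>() {
			@Override
			public void invoke(String value) {
				RESULTS.add(value);
			}
		});

		env.execute("pipeline test");

		Assert.assertTrue(RESULTS.containsAll(Arrays.asList("A", "B", "C")));
	}
}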
>


writing tests for my program

2016-05-09 Thread Igor Berman
Any idea how to handle the following? (The message is clear, but I'm not sure
what I need to do.)
I'm opening a "generic" environment in my code
(StreamExecutionEnvironment.getExecutionEnvironment()),

and JavaProgramTestBase configures a TestEnvironment...
so what should I do to support custom tests?




Error:
The LocalStreamEnvironment cannot be used when submitting a program through
a client, or running in a TestEnvironment context.
org.apache.flink.api.common.InvalidProgramException: The
LocalStreamEnvironment cannot be used when submitting a program through a
client, or running in a TestEnvironment context.
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.<init>(LocalStreamEnvironment.java:67)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.<init>(LocalStreamEnvironment.java:57)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment(StreamExecutionEnvironment.java:1299)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment(StreamExecutionEnvironment.java:1285)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:1271)
at com.x.x.StreamConsumer.configureEnv(StreamConsumer.java:150)


The error happens when using
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
in my program


test is something like:

public class StreamConsumerTest extends JavaProgramTestBase {

    @Override
    protected void preSubmit() throws Exception {
    }

    @Override
    protected void postSubmit() throws Exception {
    }

    @Override
    protected void testProgram() throws Exception {
        StreamConsumer.main(new String[] {
            "--" + StreamConsumer.CONF_PARAM, "conf/test",
        });
    }
}


general design questions when using flink

2016-05-06 Thread Igor Berman
1. Suppose I have a stream of different events (A, B, C). Each event type will
need its own processing pipeline.
What is the recommended approach for splitting the pipelines per event type? I
could add a filter operator at the beginning, set up a different job per event
type, or keep each event type in a different topic.

2. Suppose I have a bug in a business-logic operator. Currently, when such an
operator throws an NPE or any other error, the whole job fails (with something
like: unable to forward event to next operator).
So my question is: what is the best approach for sinking such events somewhere
else so that the Flink job is more robust? I can wrap an Event -> EventTag
operator into Event -> EventTagOrError and then install a filter that routes
the Errors into a log/special topic etc., but then I'd need this for every
operator.
Another approach I thought of: I might extend the map/flatMap functions with a
kind of try-catch wrapper over the business-logic operator, but then it's not
clear how to pass the messages that caused the error to some special sink.

I would like to hear what the "best" practices are for these cases, or at
least hear some thoughts.

thanks in advance
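
For the second question, a minimal sketch of the try-catch wrapper idea: a
generic FlatMapFunction that wraps the business-logic MapFunction, tags
failures instead of failing the job, and lets two filters route good results
and errors to separate sinks. The SafeMap/Tagged names are hypothetical,
introduced only for this example.

import java.io.Serializable;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.util.Collector;

public class SafeMap<IN, OUT> implements FlatMapFunction<IN, SafeMap.Tagged<IN, OUT>> {

	private static final long serialVersionUID = 1L;

	// Either a successful result or the failed input plus the error message.
	public static class Tagged<IN, OUT> implements Serializable {
		public OUT result;
		public IN failedInput;
		public String error;

		public boolean isError() {
			return error != null;
		}
	}

	private final MapFunction<IN, OUT> delegate;

	public SafeMap(MapFunction<IN, OUT> delegate) {
		this.delegate = delegate;
	}

	@Override
	public void flatMap(IN value, Collector<Tagged<IN, OUT>> out) {
		Tagged<IN, OUT> tagged = new Tagged<>();
		try {
			tagged.result = delegate.map(value);
		} catch (Exception e) {
			// Don't fail the job: tag the input with the error instead.
			tagged.failedInput = value;
			tagged.error = e.getMessage();
		}
		out.collect(tagged);
	}
}

Usage would look like tagged = events.flatMap(new SafeMap<>(new MyTagMapper())),
with tagged.filter(t -> !t.isError()) continuing the normal pipeline and
tagged.filter(Tagged::isError) going to a log/error-topic sink; with generic
type parameters Flink's type extraction may additionally need a .returns(...)
hint at the call site.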


Re: s3 checkpointing issue

2016-05-04 Thread Igor Berman
I think I've had this issue too and fixed it as Ufuk suggested
in core-site.xml

something like:

<property>
	<name>fs.s3a.buffer.dir</name>
	<value>/tmp</value>
</property>


On 4 May 2016 at 11:10, Ufuk Celebi  wrote:

> Hey Chen Qin,
>
> this seems to be an issue with the S3 file system. The root cause is:
>
>  Caused by: java.lang.NullPointerException at
>
> org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:268)
> at
> org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344)
> at
> org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416)
> at
> org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198)
> at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87)
> at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410)
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:907) at
> org.apache.hadoop.fs.FileSystem.create(FileSystem.java:888) at
> org.apache.hadoop.fs.FileSystem.create(FileSystem.java:785) at
>
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:404)
> at
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:48)
> at
> org.apache.flink.runtime.state.filesystem.FsStateBackend$FsCheckpointStateOutputStream.flush(FsStateBackend.java:489)
> ... 25 more
>
> From [1] it looks like you have to specify
>
> fs.s3a.buffer.dir
>
> in the Hadoop configuration (where you set the S3 file system).
>
> The expected value is a comma-separated list of local directories used
> to buffer results prior to transmitting them to S3 (for large files).
>
> Does this fix the issue? Please report back so that we can include it in
> the "common issues" section of the AWS docs.
>
> – Ufuk
>
> [1] http://deploymentzone.com/2015/12/20/s3a-on-spark-on-aws-ec2/
>
>
> On Wed, May 4, 2016 at 2:41 AM, Chen Qin  wrote:
> > Hi there,
> >
> > I run a test job with filestatebackend and save checkpoints on s3 (via
> s3a)
> >
> > The job crashes when a checkpoint is triggered. Looking into the s3
> > directory and listing objects, I found that the directories are created
> > successfully but all checkpoint directories are empty.
> >
> > The host running task manager shows following error.
> >
> > Received error response:
> com.amazonaws.services.s3.model.AmazonS3Exception:
> > Status Code: 404, AWS Service: null, AWS Request ID: CF1845CA84E07549,
> AWS
> > Error Code: null, AWS Error Message: Not Found, S3 Extended Request
> ID:x
> >
> > Has anyone met this issue before?
> >
> > flink 1.0.0
> > scala 2.10
> > hadoop-aws 2.7.2
> > aws-java-sdk 1.7.4
> >
> >
> > Thanks,
> > Chen
> >
> > Attached full log that shows on web dashboard when job canceled.
> > java.lang.RuntimeException: Error triggering a checkpoint as the result
> of
> > receiving checkpoint barrier at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:681)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:674)
> > at
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
> > at
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
> > at
> >
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:175)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) at
> > java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException:
> Could
> > not open output stream for state backend at
> >
> org.apache.flink.runtime.state.filesystem.FsStateBackend$FsCheckpointStateOutputStream.flush(FsStateBackend.java:498)
> > at
> >
> org.apache.flink.runtime.state.filesystem.FsStateBackend$FsCheckpointStateOutputStream.write(FsStateBackend.java:444)
> > at java.io.DataOutputStream.write(DataOutputStream.java:88) at
> > java.io.DataOutputStream.write(DataOutputStream.java:88) at
> > org.apache.flink.types.StringValue.writeString(StringValue.java:813) at
> >
> org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:64)
> > at
> >
> org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
> > at
> >
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
> > at
> >
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
> > at
> >
> org.apache.flink.runtime.state.ArrayListSerializer.serialize(ArrayListSerializer.java:78)
> > at
> >
> org.apache.flink.runtime.state.ArrayListSerializer.serialize(ArrayListSerializer.java:27)
> > at
> >
> 

Re: Measuring latency in a DataStream

2016-05-02 Thread Igor Berman
1. Why are you doing a join instead of something like
System.currentTimeMillis()? At the end you have a tuple of your data with a
timestamp anyway... so why not just wrap your data in a Tuple2 with the
additional info of the creation timestamp?

2. Are you sure that the consumer/producer machines' clocks are in sync?
You can use NTP for this.

On 2 May 2016 at 20:02, Robert Schmidtke  wrote:

> Hi everyone,
>
> I have implemented a way to measure latency in a DataStream (I hope): I'm
> consuming a Kafka topic and I'm union'ing the resulting stream with a
> custom source that emits a (machine-local) timestamp every 1000ms (using
> currentTimeMillis). On the consuming end I'm distinguishing between the
> Kafka events and the timestamps. When encountering a timestamp, I take the
> difference of the processing machine's local time and the timestamp found
> in the stream, expecting a positive difference (with the processing
> machine's timestamp being larger than the timestamp found in the stream).
> However, the opposite is the case. Now I am wondering about when events are
> actually processed.
>
> Unioning the stream from Kafka and my custom source, and batching them in 10s
> windows (which is what I do), I expect 10 timestamps with ascending values and
> a rough gap of 1000ms in the stream:
>
> https://github.com/robert-schmidtke/HiBench/blob/flink-streaming/src/streambench/flinkbench/src/main/scala/com/intel/hibench/streambench/flink/microbench/RunBenchWithInit.scala#L68
>
> On the receiving end I again take the currentTimeMillis in my fold
> function, expecting the resulting value to be larger (most of the time)
> than the timestamps encountered in the stream:
>
> https://github.com/robert-schmidtke/HiBench/blob/flink-streaming/src/streambench/flinkbench/src/main/scala/com/intel/hibench/streambench/flink/microbench/NumericCalcJob.scala#L53
>
> The system clocks are in sync up to 1ms.
>
> Maybe I am not clear about when certain timestamps are created (i.e. when
> the UDFs are invoked) or how windows are processed. Any advice is greatly
> appreciated, also alternative approaches to calculating latency.
>
> I'm on Flink 0.10.2 by the way.
>
> Thanks in advance for the help!
>
> Robert
>
> --
> My GPG Key ID: 336E2680
>
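
A minimal sketch of the wrapping idea from point 1: attach a creation timestamp
to each record where it enters the pipeline and compute the latency where it is
consumed. The String record type and helper class are assumptions for the
example; clock skew between machines still matters, as noted in point 2.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;

public class LatencyWrapping {

	// Wrap each record with the wall-clock time at which it entered the pipeline.
	public static DataStream<Tuple2<String, Long>> stamp(DataStream<String> events) {
		return events.map(new MapFunction<String, Tuple2<String, Long>>() {
			@Override
			public Tuple2<String, Long> map(String value) {
				return new Tuple2<>(value, System.currentTimeMillis());
			}
		});
	}

	// At the consuming end, latency is simply "now minus creation timestamp".
	public static DataStream<Long> latencies(DataStream<Tuple2<String, Long>> stamped) {
		return stamped.map(new MapFunction<Tuple2<String, Long>, Long>() {
			@Override
			public Long map(Tuple2<String, Long> value) {
				return System.currentTimeMillis() - value.f1;
			}
		});
	}
}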


Re: AvroWriter for Rolling sink

2016-04-30 Thread Igor Berman
[FLINK-3854] Support Avro key-value rolling sink writer #1953

On 27 April 2016 at 19:29, Igor Berman <igor.ber...@gmail.com> wrote:

> Hi Aljoscha,
>
> The avro-mapred jar contains the different M/R output formats for Avro, and
> their writers; it's primarily used in M/R jobs that produce Avro output.
> See some details here: https://avro.apache.org/docs/1.7.6/mr.html
>
> I have extracted (kind of copy-pasted + adjusted) some of the classes from
> there to remove this dependency, and it seems to work for the basic scenario.
> I still want to write the files as if they were created by an M/R job, to be
> compatible with this library (e.g. key-value pairs are wrapped into an
> AvroKeyValue object), so that it doesn't matter whether Flink or a regular
> M/R job created the files; their consumer can read them the same way.
> WDYT?
>
>
>
>
>
> On 27 April 2016 at 11:27, Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> Hi,
>> which code did you reuse from there? I asked Robert and I think it is
>> somewhat problematic to add these somewhat bigger dependencies.
>>
>> Cheers,
>> Aljoscha
>>
>> On Mon, 25 Apr 2016 at 21:24 Igor Berman <igor.ber...@gmail.com> wrote:
>>
>>> Hi,
>>> it's not a problem, I'll find time to change it(I understand the
>>> refactoring is in master and not released yet).
>>> Wanted to ask if it's acceptable to add following dependency to flink?
>>> I mean my code reused code in this jar(pay attention it's not present
>>> currently in flink classpath)
>>> <dependency>
>>> 	<groupId>org.apache.avro</groupId>
>>> 	<artifactId>avro-mapred</artifactId>
>>> 	<version>1.7.6</version>
>>> 	<classifier>hadoop2</classifier>
>>> </dependency>
>>>
>>> On 25 April 2016 at 16:20, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>
>>>> Hi,
>>>> the code looks very good! Do you think it can be adapted to the
>>>> slightly modified interface introduced here:
>>>> https://issues.apache.org/jira/browse/FLINK-3637
>>>>
>>>> It basically requires the writer to know the write position, so that we
>>>> can truncate to a valid position in case of failure.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Thu, 21 Apr 2016 at 18:40 Igor Berman <igor.ber...@gmail.com> wrote:
>>>>
>>>>> ok,
>>>>> I have working prototype already, if somebody is interested(attached)
>>>>>
>>>>> I might add it as PR latter(with tests etc)
>>>>>
>>>>> tested locally & with s3
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On 21 April 2016 at 12:01, Aljoscha Krettek <aljos...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>> as far as I know there is no one working on this. I'm only aware of
>>>>>> someone working on an ORC (from Hive) Writer.
>>>>>>
>>>>>> This would be a welcome addition! I think you are already on the
>>>>>> right track, the only thing required will probably be an AvroFileWriter 
>>>>>> and
>>>>>> you already started looking at SequenceFileWriter, which should be 
>>>>>> similar.
>>>>>>
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>>
>>>>>> On Thu, 21 Apr 2016 at 09:45 Igor Berman <igor.ber...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>> Is there such implementation somewhere?(before I start to implement
>>>>>>> it myself, it seems not too difficult based on SequenceFileWriter 
>>>>>>> example)
>>>>>>>
>>>>>>> anyway any ideas/pointers will be highly appreciated
>>>>>>>
>>>>>>> thanks in advance
>>>>>>>
>>>>>>>
>>>>>
>>>
>


Re: AvroWriter for Rolling sink

2016-04-27 Thread Igor Berman
Hi Aljoscha,

The avro-mapred jar contains the different M/R output formats for Avro, and
their writers; it's primarily used in M/R jobs that produce Avro output.
See some details here: https://avro.apache.org/docs/1.7.6/mr.html

I have extracted (kind of copy-pasted + adjusted) some of the classes from
there to remove this dependency, and it seems to work for the basic scenario.
I still want to write the files as if they were created by an M/R job, to be
compatible with this library (e.g. key-value pairs are wrapped into an
AvroKeyValue object), so that it doesn't matter whether Flink or a regular
M/R job created the files; their consumer can read them the same way.
WDYT?





On 27 April 2016 at 11:27, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
> which code did you reuse from there? I asked Robert and I think it is
> somewhat problematic to add these somewhat bigger dependencies.
>
> Cheers,
> Aljoscha
>
> On Mon, 25 Apr 2016 at 21:24 Igor Berman <igor.ber...@gmail.com> wrote:
>
>> Hi,
>> it's not a problem, I'll find time to change it(I understand the
>> refactoring is in master and not released yet).
>> Wanted to ask if it's acceptable to add following dependency to flink?
>> I mean my code reused code in this jar(pay attention it's not present
>> currently in flink classpath)
>> <dependency>
>> 	<groupId>org.apache.avro</groupId>
>> 	<artifactId>avro-mapred</artifactId>
>> 	<version>1.7.6</version>
>> 	<classifier>hadoop2</classifier>
>> </dependency>
>>
>> On 25 April 2016 at 16:20, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>>> Hi,
>>> the code looks very good! Do you think it can be adapted to the slightly
>>> modified interface introduced here:
>>> https://issues.apache.org/jira/browse/FLINK-3637
>>>
>>> It basically requires the writer to know the write position, so that we
>>> can truncate to a valid position in case of failure.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Thu, 21 Apr 2016 at 18:40 Igor Berman <igor.ber...@gmail.com> wrote:
>>>
>>>> ok,
>>>> I have working prototype already, if somebody is interested(attached)
>>>>
>>>> I might add it as PR latter(with tests etc)
>>>>
>>>> tested locally & with s3
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On 21 April 2016 at 12:01, Aljoscha Krettek <aljos...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> as far as I know there is no one working on this. I'm only aware of
>>>>> someone working on an ORC (from Hive) Writer.
>>>>>
>>>>> This would be a welcome addition! I think you are already on the right
>>>>> track, the only thing required will probably be an AvroFileWriter and you
>>>>> already started looking at SequenceFileWriter, which should be similar.
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>
>>>>> On Thu, 21 Apr 2016 at 09:45 Igor Berman <igor.ber...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi All,
>>>>>> Is there such implementation somewhere?(before I start to implement
>>>>>> it myself, it seems not too difficult based on SequenceFileWriter 
>>>>>> example)
>>>>>>
>>>>>> anyway any ideas/pointers will be highly appreciated
>>>>>>
>>>>>> thanks in advance
>>>>>>
>>>>>>
>>>>
>>


Re: AvroWriter for Rolling sink

2016-04-25 Thread Igor Berman
Hi,
it's not a problem, I'll find time to change it (I understand the
refactoring is in master and not released yet).
I wanted to ask: is it acceptable to add the following dependency to Flink?
I mean, my code reuses code from this jar (note that it's not currently
present on the Flink classpath):

<dependency>
	<groupId>org.apache.avro</groupId>
	<artifactId>avro-mapred</artifactId>
	<version>1.7.6</version>
	<classifier>hadoop2</classifier>
</dependency>

On 25 April 2016 at 16:20, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
> the code looks very good! Do you think it can be adapted to the slightly
> modified interface introduced here:
> https://issues.apache.org/jira/browse/FLINK-3637
>
> It basically requires the writer to know the write position, so that we
> can truncate to a valid position in case of failure.
>
> Cheers,
> Aljoscha
>
> On Thu, 21 Apr 2016 at 18:40 Igor Berman <igor.ber...@gmail.com> wrote:
>
>> ok,
>> I have working prototype already, if somebody is interested(attached)
>>
>> I might add it as PR latter(with tests etc)
>>
>> tested locally & with s3
>>
>>
>>
>>
>>
>>
>>
>> On 21 April 2016 at 12:01, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>>> Hi,
>>> as far as I know there is no one working on this. I'm only aware of
>>> someone working on an ORC (from Hive) Writer.
>>>
>>> This would be a welcome addition! I think you are already on the right
>>> track, the only thing required will probably be an AvroFileWriter and you
>>> already started looking at SequenceFileWriter, which should be similar.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Thu, 21 Apr 2016 at 09:45 Igor Berman <igor.ber...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>> Is there such implementation somewhere?(before I start to implement it
>>>> myself, it seems not too difficult based on SequenceFileWriter example)
>>>>
>>>> anyway any ideas/pointers will be highly appreciated
>>>>
>>>> thanks in advance
>>>>
>>>>
>>


Re: AvroWriter for Rolling sink

2016-04-21 Thread Igor Berman
ok,
I have a working prototype already, if somebody is interested (attached).

I might add it as a PR later (with tests etc).

tested locally & with S3







On 21 April 2016 at 12:01, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
> as far as I know there is no one working on this. I'm only aware of
> someone working on an ORC (from Hive) Writer.
>
> This would be a welcome addition! I think you are already on the right
> track, the only thing required will probably be an AvroFileWriter and you
> already started looking at SequenceFileWriter, which should be similar.
>
> Cheers,
> Aljoscha
>
> On Thu, 21 Apr 2016 at 09:45 Igor Berman <igor.ber...@gmail.com> wrote:
>
>> Hi All,
>> Is there such implementation somewhere?(before I start to implement it
>> myself, it seems not too difficult based on SequenceFileWriter example)
>>
>> anyway any ideas/pointers will be highly appreciated
>>
>> thanks in advance
>>
>>
package org.apache.flink.streaming.connectors.fs.avro;

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.IOException;
import java.util.Map;

import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.generic.GenericData;
import org.apache.avro.hadoop.file.HadoopCodecFactory;
import org.apache.avro.hadoop.io.AvroDatumConverter;
import org.apache.avro.hadoop.io.AvroDatumConverterFactory;
import org.apache.avro.hadoop.io.AvroSerialization;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyValueRecordWriter;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapred.JobConf;

/**
 * Implementation of AvroKeyValue writer that can be used in Sink.
 * 
 You'll need this dependency (pay attention to the classifier; it works only for hadoop2)
 
 {@code
 
 first thing is to add the avro-mapred dependency
	<dependency>
		<groupId>org.apache.avro</groupId>
		<artifactId>avro-mapred</artifactId>
		<version>1.7.6</version>
		<classifier>hadoop2</classifier>
	</dependency>
}
		
 And then:
  
 {@code
 RollingSink<Tuple2<AvroKey<Long>, AvroValue<Long>>> sink = new RollingSink<Tuple2<AvroKey<Long>, AvroValue<Long>>>("/tmp/path");
 sink.setBucketer(new DateTimeBucketer("-MM-dd-HH-mm"));
 Map<String,String> properties = new HashMap<>();
 Schema longSchema = Schema.create(Type.LONG);
 String keySchema = longSchema.toString();
 properties.put("avro.schema.output.key", keySchema);
 String valueSchema = longSchema.toString();
 properties.put("avro.schema.output.value", valueSchema);
 properties.put(FileOutputFormat.COMPRESS, Boolean.toString(true));
 properties.put(FileOutputFormat.COMPRESS_CODEC, SnappyCodec.class.getName());

 sink.setWriter(new AvroSinkWriter<AvroKey<Long>, AvroValue<Long>>(properties));
 sink.setBatchSize(1024 * 1024 * 64); // this is 64 MB
 }
 
 
 to test with s3:

{@code
	create core-site.xml (I haven't found another way to test locally)

	<configuration>
	<property>
	  <name>fs.s3.impl</name>
	  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
	</property>
	<property>
	  <name>fs.s3a.access.key</name>
	  <value>xxx</value>
	</property>
	<property>
	  <name>fs.s3a.secret.key</name>
	  <value>yyy</value>
	</property>
	<property>
		<name>fs.s3a.buffer.dir</name>
		<value>/tmp</value>
	</property>
	</configuration>


and add the following dependencies (not sure what the best option is here):
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-aws</artifactId>
			<version>2.7.0</version>
			<scope>provided</scope>
			<exclusions>
				<exclusion>
					<artifactId>guava</artifactId>
					<groupId>com.google.guava</groupId>
				</exclusion>
			</exclusions>
		</dependency>

 }
 
 */
public class AvroSinkWriter<K, V> implements Writer<Tuple2<K, V>>, InputTypeConfigurable {
	private static final long serialVersionUID = 1L;

	private transient FSDataOutputStream outputStream;

	private transient AvroKeyValueRecordWriter<K, V> writer;

	private Class<K> keyClass;

	private Class<V> valueClass;

	private final Map<String, String> properties;

	/**
	 * C'tor for the writer
	 * 
	 * You can provide different properties that will be used to configure the Avro key-value writer, as a simple properties map (see the example above).
	 * @param propert

Re: providing java system arguments(-D) to specific job

2016-04-17 Thread Igor Berman
For the sake of history (this sets them at the task-manager level):
in conf/flink-conf.yaml
env.java.opts: -Dmy-prop=bla -Dmy-prop2=bla2


On 17 April 2016 at 16:25, Igor Berman <igor.ber...@gmail.com> wrote:

> How do I provide Java system arguments while submitting a job? Suppose I have
> some legacy component that depends on Java system-property configuration.
>
> I suppose Flink reuses the same JVM for all jobs, so in general I could start
> the task manager with the desired arguments, but then my jobs can't have
> different system arguments.
>
> any suggestions?
>
>
>
>
>
>
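
Since env.java.opts applies to the whole task manager JVM, a per-job
alternative (a different technique than -D flags, sketched here with
hypothetical parameter names) is to pass job-specific settings as program
arguments via ParameterTool and register them as global job parameters, so
every operator of that job can read them at runtime.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PerJobParametersExample {

	public static void main(String[] args) throws Exception {
		// e.g. bin/flink run job.jar --my-prop bla --my-prop2 bla2
		final ParameterTool params = ParameterTool.fromArgs(args);

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// Visible to all operators of this job only, not to other jobs on the TM.
		env.getConfig().setGlobalJobParameters(params);

		env.fromElements(1, 2, 3)
			.map(new RichMapFunction<Integer, String>() {
				@Override
				public String map(Integer value) {
					ParameterTool p = (ParameterTool)
							getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
					return value + "-" + p.get("my-prop", "default");
				}
			})
			.print();

		env.execute("per-job parameters");
	}
}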


providing java system arguments(-D) to specific job

2016-04-17 Thread Igor Berman
How do I provide java arguments while submitting job? Suppose I have some
legacy component that is dependent on java argument configuration.

I suppose Flink reuses same jvm for all jobs, so in general I can start
task manager with desired arguments, but then all my jobs can't have
different system arguments.

any suggestions?


Re: Accessing StateBackend snapshots outside of Flink

2016-04-16 Thread Igor Berman
thanks a lot for the info, it seems not too complex;
I'll try to write a simple tool to read this state.

Aljoscha, does the key reflect a unique id of the operator in some way? Or is
the key just a "name" that is passed to the ValueStateDescriptor?

thanks in advance


On 15 April 2016 at 15:10, Stephan Ewen  wrote:

> One thing to add is that you can always trigger a persistent checkpoint
> via the "savepoints" feature:
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/savepoints.html
>
>
>
> On Fri, Apr 15, 2016 at 10:24 AM, Aljoscha Krettek 
> wrote:
>
>> Hi,
>> for RocksDB we simply use a TypeSerializer to serialize the key and value
>> to a byte[] array and store that in RocksDB. For a ListState, we serialize
>> the individual elements using a TypeSerializer and store them in a
>> comma-separated list in RocksDB. The snapshots of RocksDB that we write to
>> HDFS are regular backups of a RocksDB database, as described here:
>> https://github.com/facebook/rocksdb/wiki/How-to-backup-RocksDB%3F. You
>> should be able to read them from HDFS and restore them to a RocksDB
>> database as described in the linked documentation.
>>
>> tl;dr As long as you know the type of values stored in the state you
>> should be able to read them from RocksDB and deserialize the values using
>> TypeSerializer.
>>
>> One more bit of information: Internally the state is keyed by (key,
>> namespace) -> value where namespace can be an arbitrary type that has a
>> TypeSerializer. We use this to store window state that is both local to key
>> and the current window. For state that you store in a user-defined function
>> the namespace will always be null and that will be serialized by a
>> VoidSerializer that simply always writes a "0" byte.
>>
>> Cheers,
>> Aljoscha
>>
>> On Fri, 15 Apr 2016 at 00:18 igor.berman  wrote:
>>
>>> Hi,
>>> we are evaluating Flink for new solution and several people raised
>>> concern
>>> of coupling too much to Flink -
>>> 1. we understand that if we want to get full fault tolerance and best
>>> performance we'll need to use Flink managed state(probably RocksDB
>>> backend
>>> due to volume of state)
>>> 2. but then if we later find that Flink doesn't answer our needs (for any
>>> reason) - we'll need to extract this state in some way(since it's the
>>> only
>>> source of consistent state)
>>> In general I'd like to be able to take a snapshot of the backend and try
>>> to read it... do you think it will be a trivial task?
>>> say If I'm holding list state per partitioned key, would it be easy to
>>> take
>>> RocksDb file and open it?
>>>
>>> any thoughts regarding how I can convince people in our team?
>>>
>>> thanks in advance!
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Accessing-StateBackend-snapshots-outside-of-Flink-tp6116.html
>>> Sent from the Apache Flink User Mailing List archive. mailing list
>>> archive at Nabble.com.
>>>
>>
>
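
As a rough illustration of the "simple tool" idea, here is a sketch of reading
a restored RocksDB backup and deserializing the values with the matching Flink
TypeSerializer, along the lines Aljoscha describes. It is written against a
recent rocksdbjni and Flink API, assumes the backup has already been restored
to a local directory and that the state values are Longs, and it ignores the
exact key layout, which depends on the Flink version and backend internals, so
treat it as a starting point rather than a finished tool.

import java.io.ByteArrayInputStream;

import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;

public class RocksDbStateDump {

	public static void main(String[] args) throws Exception {
		RocksDB.loadLibrary();

		// Assumption: the RocksDB backup was already restored to this local path.
		try (RocksDB db = RocksDB.openReadOnly("/tmp/restored-rocksdb");
				RocksIterator it = db.newIterator()) {
			for (it.seekToFirst(); it.isValid(); it.next()) {
				byte[] valueBytes = it.value();

				// Deserialize with the same serializer Flink used when writing the
				// state (assumption: the state holds Long values).
				Long value = LongSerializer.INSTANCE.deserialize(
						new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));

				System.out.println("key bytes: " + it.key().length + ", value: " + value);
			}
		}
	}
}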


Re: Flink event processing immediate feedback

2016-04-14 Thread Igor Berman
Yes, indeed this is the direction we are currently trying.

thanks


On 14 April 2016 at 18:31, Aljoscha Krettek <aljos...@apache.org> wrote:

> Could still be, as I described it by using a message queue to do the
> communication between Flink and the front end.
>
> On Thu, 14 Apr 2016 at 17:30 Igor Berman <igor.ber...@gmail.com> wrote:
>
>> Hi Aljoscha,
>> thanks for the response
>>
>> Synchronous - in our case means that request by end-client to
>> frontend(say some REST call) needs to wait until processing in
>> backend(Flink) is done and should return response(e.g. alert) back to
>> end-client(i.e. end-client -> frontend -> kafka-> flink)
>> those request are minority and will be defined by different type of
>> request to frontend or different parameters
>>
>> Might be Flink is not best option for this use case?
>>
>> Igor
>>
>>
>>
>>
>>
>> On 14 April 2016 at 16:37, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>>> Hi,
>>> what do you mean by "synchronous". If I understood it correctly then
>>> some events entering the Flink pipeline would trigger an alert while some
>>> others would not trigger an alert. How would the component that receives
>>> such alerts know when to wait and when to don't wait.
>>>
>>>  As I see it you can push these alerts into some sort of message queue
>>> and have a request in the front-end that returns as soon as something is
>>> available in that queue. Then you display it and start a new query that
>>> returns as soon as more data is available in the alert queue.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Thu, 7 Apr 2016 at 09:57 igor.berman <igor.ber...@gmail.com> wrote:
>>>
>>>> Hi,
>>>> Suppose I have web facing frontend that gets stream of events(http
>>>> calls). I
>>>> need to process event stream and do some aggregations over those events
>>>> and
>>>> write aggregated statistics to Hbase - so far Flink seems as perfect
>>>> match.
>>>> However in some cases event should trigger some alert and frontend
>>>> needs to
>>>> get this alert in synchronous way - here I'm a bit lost. I thought about
>>>> some kind of following flow:
>>>> frontend -> queue -> flink -> redis(pub/sub)<- frontend
>>>>
>>>> I.e. I have two major use cases - async aggregated analytics/stats
>>>> computing
>>>> and "synchronous" response to frontend. Frontend might be node/play or
>>>> any
>>>> other technology that won't have a problem of "waiting" for the
>>>> response, so
>>>> the only question - how to implement this feedback ?
>>>> Might be some kind of Sink?
>>>>
>>>> Any ideas would be appreciated,
>>>> Igor
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-event-processing-immediate-feedback-tp5978.html
>>>> Sent from the Apache Flink User Mailing List archive. mailing list
>>>> archive at Nabble.com.
>>>>
>>>
>>
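
To make the "flink -> redis(pub/sub) <- frontend" leg of the proposed flow
concrete, here is a minimal sketch of a sink that publishes alert strings to a
Redis channel using the Jedis client; the frontend request handler would
subscribe to that channel (or block on a Redis list) until the alert for its
request arrives. The channel name, host, and the choice of Jedis are
assumptions made for the example.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import redis.clients.jedis.Jedis;

// Publishes every alert string to a Redis pub/sub channel.
public class RedisAlertSink extends RichSinkFunction<String> {

	private static final long serialVersionUID = 1L;

	private final String host;
	private final String channel;
	private transient Jedis jedis;

	public RedisAlertSink(String host, String channel) {
		this.host = host;
		this.channel = channel;
	}

	@Override
	public void open(Configuration parameters) {
		jedis = new Jedis(host);
	}

	@Override
	public void invoke(String alert) {
		// The waiting frontend handler is subscribed to this channel.
		jedis.publish(channel, alert);
	}

	@Override
	public void close() {
		if (jedis != null) {
			jedis.close();
		}
	}
}

Usage would be something like alerts.addSink(new RedisAlertSink("localhost", "alerts")).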


Re: Flink event processing immediate feedback

2016-04-14 Thread Igor Berman
Hi Aljoscha,
thanks for the response

Synchronous - in our case it means that a request by the end-client to the
frontend (say some REST call) needs to wait until processing in the backend
(Flink) is done, and a response (e.g. an alert) should be returned to the
end-client (i.e. end-client -> frontend -> kafka -> flink).
Those requests are a minority and will be distinguished by a different type of
request to the frontend or by different parameters.

Maybe Flink is not the best option for this use case?

Igor





On 14 April 2016 at 16:37, Aljoscha Krettek  wrote:

> Hi,
> what do you mean by "synchronous". If I understood it correctly then some
> events entering the Flink pipeline would trigger an alert while some others
> would not trigger an alert. How would the component that receives such
> alerts know when to wait and when to don't wait.
>
>  As I see it you can push these alerts into some sort of message queue and
> have a request in the front-end that returns as soon as something is
> available in that queue. Then you display it and start a new query that
> returns as soon as more data is available in the alert queue.
>
> Cheers,
> Aljoscha
>
> On Thu, 7 Apr 2016 at 09:57 igor.berman  wrote:
>
>> Hi,
>> Suppose I have web facing frontend that gets stream of events(http
>> calls). I
>> need to process event stream and do some aggregations over those events
>> and
>> write aggregated statistics to Hbase - so far Flink seems as perfect
>> match.
>> However in some cases event should trigger some alert and frontend needs
>> to
>> get this alert in synchronous way - here I'm a bit lost. I thought about
>> some kind of following flow:
>> frontend -> queue -> flink -> redis(pub/sub)<- frontend
>>
>> I.e. I have two major use cases - async aggregated analytics/stats
>> computing
>> and "synchronous" response to frontend. Frontend might be node/play or any
>> other technology that won't have a problem of "waiting" for the response,
>> so
>> the only question - how to implement this feedback ?
>> Might be some kind of Sink?
>>
>> Any ideas would be appreciated,
>> Igor
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-event-processing-immediate-feedback-tp5978.html
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive at Nabble.com.
>>
>


Re: [ANNOUNCE] Flink 1.0.0 has been released

2016-03-08 Thread Igor Berman
Congratulations!
Very nice work, very interesting features.

One question regarding CEP: do you think it's feasible to define a pattern
over a window of 1 month or even more?
Is there some deeper explanation of how the partial states are saved?
I mean, the events that make up a "funnel" might be separated by very large
periods of inactivity/noise.



On 8 March 2016 at 17:17, Kostas Tzoumas  wrote:

> Hi everyone!
>
> As you might have noticed, Apache Flink 1.0.0 has been released and
> announced!
>
> You can read more about the release at the ASF blog and the Flink blog
> -
> https://blogs.apache.org/foundation/entry/the_apache_software_foundation_announces88
> - http://flink.apache.org/news/2016/03/08/release-1.0.0.html
>
> Don't forget to retweet and spread the news :-)
> - https://twitter.com/TheASF/status/707174116969857024
> - https://twitter.com/ApacheFlink/status/707175973482012672
>
> Check out the changelog and the migration guide, download the release, and
> check out the documentation
> - http://flink.apache.org/blog/release_1.0.0-changelog_known_issues.html
> -
> https://cwiki.apache.org/confluence/display/FLINK/Migration+Guide%3A+0.10.x+to+1.0.x
> - https://cwiki.apache.org/confluence/display/FLINK/Stability+Annotations
> - http://flink.apache.org/downloads.html
> - https://ci.apache.org/projects/flink/flink-docs-release-1.0/
>
> Many congratulations to the Flink community for making this happen!
>
> Best,
> Kostas
>
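
For reference, the kind of long-horizon pattern Igor asks about would look
roughly like the sketch below, written against the Flink 1.0-era CEP API
(FilterFunction-based conditions); the PageVisit event type and its fields are
hypothetical. How well a one-month within() window behaves in practice depends
on how the partially matched states are stored, which is exactly the open
question above.

import java.util.Map;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

public class MonthLongFunnel {

	// Hypothetical event type: a page visit with a user id and a visit type.
	public static class PageVisit {
		public String userId;
		public String type;
	}

	public static DataStream<String> funnels(DataStream<PageVisit> visits) {
		// "signup" followed (possibly much later) by "purchase", within 30 days.
		Pattern<PageVisit, ?> funnel = Pattern.<PageVisit>begin("signup")
			.where(new FilterFunction<PageVisit>() {
				@Override
				public boolean filter(PageVisit visit) {
					return "signup".equals(visit.type);
				}
			})
			.followedBy("purchase")
			.where(new FilterFunction<PageVisit>() {
				@Override
				public boolean filter(PageVisit visit) {
					return "purchase".equals(visit.type);
				}
			})
			.within(Time.days(30)); // the long horizon in question

		PatternStream<PageVisit> matches = CEP.pattern(visits.keyBy("userId"), funnel);

		return matches.select(new PatternSelectFunction<PageVisit, String>() {
			@Override
			public String select(Map<String, PageVisit> pattern) {
				return "funnel completed for user " + pattern.get("signup").userId;
			}
		});
	}
}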