Re: Help using HBase with Flink 1.1.4

2017-01-16 Thread Ted Yu
Logged FLINK-5517 for upgrading hbase version to 1.3.0

On Mon, Jan 16, 2017 at 5:26 PM, Ted Yu  wrote:

> hbase uses Guava 12.0.1 and Flink uses 18.0 where Stopwatch.<init>()V is
> no longer accessible.
> HBASE-14963 removes the use of Stopwatch at this location.
>
> hbase 1.3.0 RC has passed the voting period.
>
> Please use 1.3.0, where you wouldn't see the IllegalAccessError.
>
> On Mon, Jan 16, 2017 at 4:50 PM, Giuliano Caliari <
> giuliano.cali...@gmail.com> wrote:
>
>> Hello,
>>
>> I'm trying to use HBase on one of my stream transformations and I'm
>> running into the Guava/Stopwatch dependency problem
>>
>> java.lang.IllegalAccessError: tried to access method 
>> com.google.common.base.Stopwatch.<init>()V from class 
>> org.apache.hadoop.hbase.zookeeper.MetaTableLocator
>>
>>
>> Reading up on the problem, it seems that there is a way to avoid it using
>> shading:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.1/
>> setup/building.html#dependency-shading
>>
>> But I can't get it to work.
>> I followed the documented steps and it builds but when I try to run the
>> newly built version it fails when trying to connect to the Resource Manager:
>>
>> 2017-01-17 00:42:05,872 INFO  org.apache.flink.yarn.YarnClusterDescriptor
>>   - Using values:
>> 2017-01-17 00:42:05,872 INFO  org.apache.flink.yarn.YarnClusterDescriptor
>>   - TaskManager count = 4
>> 2017-01-17 00:42:05,873 INFO  org.apache.flink.yarn.YarnClusterDescriptor
>>   - JobManager memory = 1024
>> 2017-01-17 00:42:05,873 INFO  org.apache.flink.yarn.YarnClusterDescriptor
>>   - TaskManager memory = 32768
>> 2017-01-17 00:42:05,892 INFO  org.apache.hadoop.yarn.client.RMProxy
>> - Connecting to ResourceManager at /0.0.0.0:8032
>> 2017-01-17 00:42:07,023 INFO  org.apache.hadoop.ipc.Client
>>- Retrying connect to server: 0.0.0.0/0.0.0.0:8032.
>> Already tried 0 time(s); retry policy is 
>> RetryUpToMaximumCountWithFixedSleep(maxRetries=10,
>> sleepTime=1000 MILLISECONDS)
>> 2017-01-17 00:42:08,024 INFO  org.apache.hadoop.ipc.Client
>>- Retrying connect to server: 0.0.0.0/0.0.0.0:8032.
>> Already tried 1 time(s); retry policy is 
>> RetryUpToMaximumCountWithFixedSleep(maxRetries=10,
>> sleepTime=1000 MILLISECONDS)
>>
>>
>> I'm currently building version 1.1.4 of Flink based on the github repo.
>> Building it without shading (not executing `mvn clean install` on the
>> flink-dist sub-project) works fine until I try to use HBase, at which point
>> I get the Stopwatch exception.
>>
>> Has anyone been able to solve this?
>>
>> Thank you,
>>
>> Giuliano Caliari
>> --
>> --
>> Giuliano Caliari (+55 11 984898464 <+55%2011%2098489-8464>)
>> +Google
>> 
>> Twitter 
>>
>> Master Software Engineer by Escola Politécnica da USP
>> Bachelor in Computer Science by Instituto de Matemática e Estatística da
>> USP
>>
>>
>


Re: Help using HBase with Flink 1.1.4

2017-01-16 Thread Ted Yu
hbase uses Guava 12.0.1 and Flink uses 18.0 where Stopwatch.<init>()V is no
longer accessible.
HBASE-14963 removes the use of Stopwatch at this location.

hbase 1.3.0 RC has passed the voting period.

Please use 1.3.0, where you wouldn't see the IllegalAccessError.

On Mon, Jan 16, 2017 at 4:50 PM, Giuliano Caliari <
giuliano.cali...@gmail.com> wrote:

> Hello,
>
> I'm trying to use HBase on one of my stream transformations and I'm
> running into the Guava/Stopwatch dependency problem
>
> java.lang.IllegalAccessError: tried to access method 
> com.google.common.base.Stopwatch.<init>()V from class 
> org.apache.hadoop.hbase.zookeeper.MetaTableLocator
>
>
> Reading up on the problem, it seems that there is a way to avoid it using
> shading:
> https://ci.apache.org/projects/flink/flink-docs-
> release-1.1/setup/building.html#dependency-shading
>
> But I can't get it to work.
> I followed the documented steps and it builds but when I try to run the
> newly built version it fails when trying to connect to the Resource Manager:
>
> 2017-01-17 00:42:05,872 INFO  org.apache.flink.yarn.YarnClusterDescriptor
>   - Using values:
> 2017-01-17 00:42:05,872 INFO  org.apache.flink.yarn.YarnClusterDescriptor
>   - TaskManager count = 4
> 2017-01-17 00:42:05,873 INFO  org.apache.flink.yarn.YarnClusterDescriptor
>   - JobManager memory = 1024
> 2017-01-17 00:42:05,873 INFO  org.apache.flink.yarn.YarnClusterDescriptor
>   - TaskManager memory = 32768
> 2017-01-17 00:42:05,892 INFO  org.apache.hadoop.yarn.client.RMProxy
>   - Connecting to ResourceManager at /0.0.0.0:8032
> 2017-01-17 00:42:07,023 INFO  org.apache.hadoop.ipc.Client
>  - Retrying connect to server: 0.0.0.0/0.0.0.0:8032.
> Already tried 0 time(s); retry policy is 
> RetryUpToMaximumCountWithFixedSleep(maxRetries=10,
> sleepTime=1000 MILLISECONDS)
> 2017-01-17 00:42:08,024 INFO  org.apache.hadoop.ipc.Client
>  - Retrying connect to server: 0.0.0.0/0.0.0.0:8032.
> Already tried 1 time(s); retry policy is 
> RetryUpToMaximumCountWithFixedSleep(maxRetries=10,
> sleepTime=1000 MILLISECONDS)
>
>
> I'm currently building version 1.1.4 of Flink based on the github repo.
> Building it without shading (not executing `mvn clean install` on the
> flink-dist sub-project) works fine until I try to use HBase, at which point
> I get the Stopwatch exception.
>
> Has anyone been able to solve this?
>
> Thank you,
>
> Giuliano Caliari
> --
> --
> Giuliano Caliari (+55 11 984898464 <+55%2011%2098489-8464>)
> +Google
> 
> Twitter 
>
> Master Software Engineer by Escola Politécnica da USP
> Bachelor in Computer Science by Instituto de Matemática e Estatística da
> USP
>
>


Help using HBase with Flink 1.1.4

2017-01-16 Thread Giuliano Caliari
Hello,

I'm trying to use HBase on one of my stream transformations and I'm running
into the Guava/Stopwatch dependency problem

java.lang.IllegalAccessError: tried to access method
com.google.common.base.Stopwatch.<init>()V from class
org.apache.hadoop.hbase.zookeeper.MetaTableLocator


Reading up on the problem, it seems that there is a way to avoid it using
shading:
https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/building.html#dependency-shading

But I can't get it to work.
I followed the documented steps and it builds but when I try to run the
newly built version it fails when trying to connect to the Resource Manager:

2017-01-17 00:42:05,872 INFO  org.apache.flink.yarn.YarnClusterDescriptor
- Using values:
2017-01-17 00:42:05,872 INFO  org.apache.flink.yarn.YarnClusterDescriptor
- TaskManager count = 4
2017-01-17 00:42:05,873 INFO  org.apache.flink.yarn.YarnClusterDescriptor
- JobManager memory = 1024
2017-01-17 00:42:05,873 INFO  org.apache.flink.yarn.YarnClusterDescriptor
- TaskManager memory = 32768
2017-01-17 00:42:05,892 INFO  org.apache.hadoop.yarn.client.RMProxy
- Connecting to ResourceManager at /0.0.0.0:8032
2017-01-17 00:42:07,023 INFO  org.apache.hadoop.ipc.Client
 - Retrying connect to server: 0.0.0.0/0.0.0.0:8032.
Already tried 0 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
2017-01-17 00:42:08,024 INFO  org.apache.hadoop.ipc.Client
 - Retrying connect to server: 0.0.0.0/0.0.0.0:8032.
Already tried 1 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)


I'm currently building version 1.1.4 of Flink based on the github repo.
Building it without shading (not executing `mvn clean install` on the
flink-dist sub-project) works fine until I try to use HBase, at which point
I get the Stopwatch exception.

Has anyone been able to solve this?

Thank you,

Giuliano Caliari
-- 
-- 
Giuliano Caliari (+55 11 984898464)
+Google

Twitter 

Master Software Engineer by Escola Politécnica da USP
Bachelor in Computer Science by Instituto de Matemática e Estatística da USP


Three input stream operator and back pressure

2017-01-16 Thread Dmitry Golubets
Hi,

there are only *two* interfaces defined at the moment: *OneInputStreamOperator*
and *TwoInputStreamOperator*.

Is there any way to define an operator with an arbitrary number of inputs?

Another concern of mine is how to maintain *backpressure* in the operator.
Let's say I read events from two Kafka sources, both of which are ordered
by time. I want to merge them while keeping the global order. But to do that, I
need to block one input if the other one has no data yet.

Best regards,
Dmitry
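One workaround that is sometimes used (not from this thread, and not a real
blocking solution) is to connect the two streams and buffer whichever side runs
ahead. A minimal sketch, assuming a hypothetical Event POJO with a public long
timestamp field; note that it buffers rather than blocks, so it gives no true
per-input backpressure, and the buffers are not checkpointed here:

import java.util.ArrayDeque;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

// Hypothetical event type; only the event-time field matters for the merge.
class Event {
    public long timestamp;
}

public class OrderedMerge implements CoFlatMapFunction<Event, Event, Event> {

    private final ArrayDeque<Event> left = new ArrayDeque<>();
    private final ArrayDeque<Event> right = new ArrayDeque<>();

    @Override
    public void flatMap1(Event value, Collector<Event> out) {
        left.add(value);
        emitInOrder(out);
    }

    @Override
    public void flatMap2(Event value, Collector<Event> out) {
        right.add(value);
        emitInOrder(out);
    }

    // Emit in timestamp order only while both sides have buffered data; the side
    // that is ahead simply piles up in its queue instead of being blocked.
    private void emitInOrder(Collector<Event> out) {
        while (!left.isEmpty() && !right.isEmpty()) {
            if (left.peek().timestamp <= right.peek().timestamp) {
                out.collect(left.poll());
            } else {
                out.collect(right.poll());
            }
        }
    }
}

// usage: streamA.connect(streamB).flatMap(new OrderedMerge())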


Re: Fault tolerance guarantees of Elasticsearch sink in flink-elasticsearch2?

2017-01-16 Thread Andrew Roberts
Hi Gordon,

Thanks for getting back to me. The ticket looks good, but I’m going to need to 
do something similar for our homegrown sinks. It sounds like just having the 
affected sinks participate in checkpointing is enough of a solution - is there 
anything special about `SinkFunction[T]` extending `Checkpointed[S]`, or can I 
just implement it as I would for e.g. a mapping function?

Thanks,

Andrew
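For reference, a minimal sketch of that approach, assuming the pre-1.2
Checkpointed interface and a hypothetical sink that batches String records;
implementing Checkpointed on a sink looks the same as on any other rich function:

import java.util.ArrayList;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class BufferingSink extends RichSinkFunction<String>
        implements Checkpointed<ArrayList<String>> {

    private ArrayList<String> buffer = new ArrayList<>();

    @Override
    public void invoke(String value) throws Exception {
        buffer.add(value);
        if (buffer.size() >= 100) {
            flush(); // write the batch to the external system (placeholder)
        }
    }

    private void flush() {
        // ... send the buffered records to the external system, then:
        buffer.clear();
    }

    @Override
    public ArrayList<String> snapshotState(long checkpointId, long checkpointTimestamp) {
        // Pending records become part of the checkpoint; alternatively flush() here
        // so that nothing is pending at checkpoint time.
        return buffer;
    }

    @Override
    public void restoreState(ArrayList<String> state) {
        buffer = state;
    }
}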



> On Jan 13, 2017, at 4:34 PM, Tzu-Li (Gordon) Tai  wrote:
> 
> Hi Andrew,
> 
> Your observations are correct. Like you mentioned, the current problem 
> circles around how we deal with the pending buffered requests in accordance
> with Flink’s checkpointing.
> I’ve filed a JIRA for this, as well as some thoughts for the solution in the 
> description: https://issues.apache.org/jira/browse/FLINK-5487 
> . What do you think?
> 
> Thank you for bringing this up! We should probably fix this soon.
> There’s already some on-going effort in fixing some other aspects of proper 
> at-least-once support in the Elasticsearch sinks, so I believe this will be 
> brought to attention very soon too.
> 
> Cheers,
> Gordon
> 
> 
> 
> 
> On January 11, 2017 at 3:49:06 PM, Andrew Roberts (arobe...@fuze.com 
> ) wrote:
> 
>> I’m trying to understand the guarantees made by Flink’s Elasticsearch sink 
>> in terms of message delivery. According to (1), the ES sink offers 
>> at-least-once guarantees. This page doesn’t differentiate between 
>> flink-elasticsearch and flink-elasticsearch2, so I have to assume for the 
>> moment that they both offer that guarantee. However, a look at the code (2) 
>> shows that the invoke() method puts the record into a buffer, and then that 
>> buffer is flushed to elasticsearch some time later.



Re: Measuring Execution Time

2017-01-16 Thread Charith Wickramarachchi
Thanks very much.

Regards,
Charith

On Mon, Jan 16, 2017 at 12:49 PM, Ufuk Celebi  wrote:

> This is exposed via the REST API under:
>
> /jobs/:jobid/vertices/:vertexid/subtasktimes
>
> The SubtasksTimesHandler serves this endpoint.
>
> – Ufuk
>
>
> On Mon, Jan 16, 2017 at 6:20 PM, Charith Wickramarachchi
>  wrote:
> > Thanks very much. I noticed the times recorded in the web interface. I was
> > a little hesitant to use that as I might make a mistake when recording the
> > times. Could you please point me to the code in Flink where these runtimes
> > are computed?
> >
> > Thanks,
> > Charith
> >
> > On Mon, Jan 16, 2017 at 3:15 AM, Ufuk Celebi  wrote:
> >>
> >> Unfortunately no. You only get the complete execution time after the
> >> job has finished. What you can do is browse to the web interface and
> >> check the runtime for each operator there (assuming that each
> >> iterative process is a separate operator).
> >>
> >> Does this help?
> >>
> >> On Mon, Jan 16, 2017 at 7:36 AM, Charith Wickramarachchi
> >>  wrote:
> >> > Hi All,
> >> >
> >> > I have a program that executes multiple (data dependent) iterative
> jobs
> >> > one
> >> > after another in the same execution plan.
> >> >
> >> > It has the following structure
> >> >
> >> > ExecutionEnvironment env = ..
> >> >
> >> > LoadData
> >> >
> >> > Iterative Process 1
> >> >
> >> > Iterative Process 2
> >> >
> >> > ...
> >> >
> >> > Iterative Process N
> >> >
> >> > env.execute("...")
> >> >
> >> >
> >> > I want to measure the execution time for each Iterative process. Does
> >> > Flink have a direct way to measure this time? It will be great if you could
> >> > provide
> >> > a pointer.
> >> >
> >> > Thanks,
> >> > Charith
> >> >
> >> >
> >> >
> >> >
> >> > --
> >> > Charith Dhanushka Wickramaarachchi
> >> >
> >> > Tel  +1 213 447 4253
> >> > Blog  http://charith.wickramaarachchi.org/
> >> > Twitter  @charithwiki
> >> >
> >> > This communication may contain privileged or other confidential
> >> > information
> >> > and is intended exclusively for the addressee/s. If you are not the
> >> > intended
> >> > recipient/s, or believe that you may have
> >> > received this communication in error, please reply to the sender
> >> > indicating
> >> > that fact and delete the copy you received and in addition, you should
> >> > not
> >> > print, copy, retransmit, disseminate, or otherwise use the information
> >> > contained in this communication. Internet communications cannot be
> >> > guaranteed to be timely, secure, error or virus-free. The sender does
> >> > not
> >> > accept liability for any errors or omissions
> >
> >
> >
> >
> > --
> > Charith Dhanushka Wickramaarachchi
> >
> > Tel  +1 213 447 4253
> > Blog  http://charith.wickramaarachchi.org/
> > Twitter  @charithwiki
> >
> > This communication may contain privileged or other confidential
> information
> > and is intended exclusively for the addressee/s. If you are not the
> intended
> > recipient/s, or believe that you may have
> > received this communication in error, please reply to the sender
> indicating
> > that fact and delete the copy you received and in addition, you should
> not
> > print, copy, retransmit, disseminate, or otherwise use the information
> > contained in this communication. Internet communications cannot be
> > guaranteed to be timely, secure, error or virus-free. The sender does not
> > accept liability for any errors or omissions
>



-- 
Charith Dhanushka Wickramaarachchi

Tel  +1 213 447 4253
Blog  http://charith.wickramaarachchi.org/

Twitter  @charithwiki 

This communication may contain privileged or other confidential information
and is intended exclusively for the addressee/s. If you are not the
intended recipient/s, or believe that you may have
received this communication in error, please reply to the sender indicating
that fact and delete the copy you received and in addition, you should not
print, copy, retransmit, disseminate, or otherwise use the information
contained in this communication. Internet communications cannot be
guaranteed to be timely, secure, error or virus-free. The sender does not
accept liability for any errors or omissions


Re: Measuring Execution Time

2017-01-16 Thread Ufuk Celebi
This is exposed via the REST API under:

/jobs/:jobid/vertices/:vertexid/subtasktimes

The SubtasksTimesHandler serves this endpoint.

– Ufuk
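A minimal sketch of querying that endpoint from Java (host, port and IDs are
placeholders; 8081 is the default web port):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;

public class SubtaskTimes {
    public static void main(String[] args) throws Exception {
        String jobId = args[0];     // e.g. taken from the web interface
        String vertexId = args[1];
        URL url = new URL("http://localhost:8081/jobs/" + jobId
                + "/vertices/" + vertexId + "/subtasktimes");
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(url.openStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line); // JSON with per-subtask state timestamps
            }
        }
    }
}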


On Mon, Jan 16, 2017 at 6:20 PM, Charith Wickramarachchi
 wrote:
> Thanks very much. I noticed the times recorded in the web interface. I was a
> little hesitant to use that as I might make a mistake when recording the
> times. Could you please point me to the code in Flink where these runtimes are
> computed?
>
> Thanks,
> Charith
>
> On Mon, Jan 16, 2017 at 3:15 AM, Ufuk Celebi  wrote:
>>
>> Unfortunately no. You only get the complete execution time after the
>> job has finished. What you can do is browse to the web interface and
>> check the runtime for each operator there (assuming that each
>> iterative process is a separate operator).
>>
>> Does this help?
>>
>> On Mon, Jan 16, 2017 at 7:36 AM, Charith Wickramarachchi
>>  wrote:
>> > Hi All,
>> >
>> > I have a program that executes multiple (data dependent) iterative jobs
>> > one
>> > after another in the same execution plan.
>> >
>> > It has the following structure
>> >
>> > ExecutionEnvironment env = ..
>> >
>> > LoadData
>> >
>> > Iterative Process 1
>> >
>> > Iterative Process 2
>> >
>> > ...
>> >
>> > Iterative Process N
>> >
>> > env.execute("...")
>> >
>> >
>> > I want to measure the execution time for each Iterative process. Does
>> > Flink have a direct way to measure this time? It will be great if you could
>> > provide
>> > a pointer.
>> >
>> > Thanks,
>> > Charith
>> >
>> >
>> >
>> >
>> > --
>> > Charith Dhanushka Wickramaarachchi
>> >
>> > Tel  +1 213 447 4253
>> > Blog  http://charith.wickramaarachchi.org/
>> > Twitter  @charithwiki
>> >
>> > This communication may contain privileged or other confidential
>> > information
>> > and is intended exclusively for the addressee/s. If you are not the
>> > intended
>> > recipient/s, or believe that you may have
>> > received this communication in error, please reply to the sender
>> > indicating
>> > that fact and delete the copy you received and in addition, you should
>> > not
>> > print, copy, retransmit, disseminate, or otherwise use the information
>> > contained in this communication. Internet communications cannot be
>> > guaranteed to be timely, secure, error or virus-free. The sender does
>> > not
>> > accept liability for any errors or omissions
>
>
>
>
> --
> Charith Dhanushka Wickramaarachchi
>
> Tel  +1 213 447 4253
> Blog  http://charith.wickramaarachchi.org/
> Twitter  @charithwiki
>
> This communication may contain privileged or other confidential information
> and is intended exclusively for the addressee/s. If you are not the intended
> recipient/s, or believe that you may have
> received this communication in error, please reply to the sender indicating
> that fact and delete the copy you received and in addition, you should not
> print, copy, retransmit, disseminate, or otherwise use the information
> contained in this communication. Internet communications cannot be
> guaranteed to be timely, secure, error or virus-free. The sender does not
> accept liability for any errors or omissions


Re: How to read from a Kafka topic from the beginning

2017-01-16 Thread Matthias J. Sax
I would put this differently: the "auto.offset.reset" policy is only used
if there are no valid committed offsets for a topic.

See here:
http://stackoverflow.com/documentation/apache-kafka/5449/consumer-groups-and-offset-management

(don't be confused about "earliest/latest" and "smallest/largest" --
the former is for Kafka 0.9+ and the latter for 0.8.2 -- but the
mechanism is the same)

But I thought Flink does not rely on consumer offset commits but does
"manual" offset management? So I am wondering whether this property is
passed into the Kafka source operator's Kafka consumer or not.


-Matthias


On 11/17/15 2:30 AM, Robert Metzger wrote:
> Hi Will,
> 
> In Kafka's consumer configuration [1] there is a configuration
> parameter called "auto.offset.reset". Setting it to "smallest" will
> tell the consumer to start reading a topic from the smallest
> available offset.
> 
> You can pass the configuration using the properties of the Kafka
> consumer.
> 
> 
> [1] http://kafka.apache.org/documentation.html#consumerconfigs
> 
> 
> On Tue, Nov 17, 2015 at 8:55 AM, Miaoyongqiang (Will) 
> >
> wrote:
> 
> Hi,
> 
> How can I tell a “FlinkKafkaConsumer” that I want to read from a 
> topic from the beginning?
> 
> Thanks,
> 
> Will
> 
> 


Re: How to read from a Kafka topic from the beginning

2017-01-16 Thread Jonas
You also need to have a new group.id for this to work.
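Putting both hints together, a minimal sketch for the Kafka 0.9 consumer (topic,
servers and schema are placeholders; for the 0.8 consumer the reset value is
"smallest", and this assumes the property is forwarded to the underlying
consumer as discussed above):

import java.util.Properties;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ReadFromBeginning {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        // a group.id with no committed offsets, so the reset policy actually applies
        props.setProperty("group.id", "my-job-" + System.currentTimeMillis());
        props.setProperty("auto.offset.reset", "earliest");

        env.addSource(new FlinkKafkaConsumer09<>("my-topic", new SimpleStringSchema(), props))
           .print();

        env.execute("read-from-beginning");
    }
}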



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-read-from-a-Kafka-topic-from-the-beginning-tp3522p11087.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Can serialization be disabled between chains?

2017-01-16 Thread Ufuk Celebi
+1 to what Fabian said. Regarding the memory consumption: Flink's back
pressure mechanism also depends on this, because the availability of
(network) buffers determines how fast an operator can produce data. If no
buffers are available, the producing operator will slow down.

On Mon, Jan 16, 2017 at 2:32 PM, Dmitry Golubets  wrote:
> First issue is not a problem with idiomatic Scala - we make all our data
> objects immutable.
> Second.. yeah, I guess it makes sense.
> Thanks for clarification.
>
> Best regards,
> Dmitry
>
> On Mon, Jan 16, 2017 at 1:27 PM, Fabian Hueske  wrote:
>>
>> One of the reasons is to ensure that data cannot be modified after it left
>> a thread.
>> A function that emits the same object several times (in order to reduce
>> object creation & GC) might accidentally modify emitted records if they
>> were put as objects in a queue.
>> Moreover, it is easier to control the memory consumption if data is
>> serialized into a fixed number of buffers instead of being put on the JVM
>> heap.
>>
>> Best, Fabian
>>
>> 2017-01-16 14:21 GMT+01:00 Dmitry Golubets :
>>>
>>> Hi Ufuk,
>>>
>>> Do you know what's the reason for serialization of data between different
>>> threads?
>>>
>>> Also, thanks for the link!
>>>
>>> Best regards,
>>> Dmitry
>>>
>>> On Mon, Jan 16, 2017 at 1:07 PM, Ufuk Celebi  wrote:

 Hey Dmitry,

 this is not possible if I'm understanding you correctly.

 A task chain is executed by a single task thread and hence it is not
 possible to continue processing before the record "leaves" the thread,
 which only happens when the next task thread or the network stack
 consumes it.

 Hand over between chained tasks happens without serialization. Only
 data between different task threads is serialized.

 Depending on your use case the newly introduced async I/O feature
 might be worth a look (will be part of the upcoming 1.2 release):
 https://github.com/apache/flink/pull/2629
>>>
>>>
>>
>


Re: Restart the job from a checkpoint

2017-01-16 Thread Ufuk Celebi
Yes, exactly. This is a little cumbersome at the moment, but there are plans to 
improve this after 1.2 is released.

– Ufuk

On 16 January 2017 at 16:33:49, tao xiao (xiaotao...@gmail.com) wrote:
> Hi Ufuk,
>  
> Thank you for the reply. I want to know what the difference is between
> state.backend.fs.checkpoint.dir
> and state.checkpoints.dir in this case? Does state.checkpoints.dir store the
> metadata that points to the checkpoint that is stored in
> state.backend.fs.checkpoint.dir?
>  
> On Mon, 16 Jan 2017 at 19:24 Ufuk Celebi wrote:
>  
> > Hey!
> >
> > This is possible with the upcoming 1.2 version of Flink (also in the
> > current snapshot version):
> >
> > https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/checkpoints.html#externalized-checkpoints
> >   
> >
> > You have to manually activate it via the checkpoint config (see docs).
> >
> > Ping me if you have any questions.
> >
> > – Ufuk
> >
> >
> > On Mon, Jan 16, 2017 at 5:51 AM, tao xiao wrote:
> > > Hi team,
> > >
> > > Can we restart a Flink job from a previous successful checkpoint? I know we
> > > can start a Flink job from a savepoint, but I wonder if I can do it similarly
> > > by passing the checkpoint path to the flink run command to restore the job
> > > from the checkpoint.
> >
>  



Re: Regarding caching the evicted elements and re-emitting them to the next window

2017-01-16 Thread Shaoxuan Wang
Hi Abdul,
You may want to check out FLIP-13 "side output" https://goo.gl/6KSYd0 . Once
we have this feature, you should be able to collect the data to external
distributed storage and use it later on demand.
BTW, can you explain your use case in more detail, so that people here
may help you figure out a better solution (it may just need some
tuning of your query plan).

Shaoxuan


On Sat, Jan 14, 2017 at 12:22 PM, Aljoscha Krettek 
wrote:

> Hi,
> I'm afraid there is no functionality for this in Flink. What you can do,
> however, is to not evict these elements from the window buffer but instead
> ignore them when processing your elements in the WindowFunction. This way
> they will be preserved for the next firing. You have to make sure to
> eventually evict some elements, however. Otherwise you would have a memory
> leak.
>
> Aljoscha
>
> On Sun, Jan 8, 2017, 23:47 Abdul Salam Shaikh 
> wrote:
>
>> Hi,
>>
>> I am using 1.2-Snapshot version of Apache Flink which provides the new
>> enhanced Evictor functionality and using customized triggers for Global
>> Window. I have a use case where I am evicting the unwanted event(element)
>> for the current window before it is evaluated. However, I am looking for
>> options to cache this evicted element and re-use it in the next window. Is
>> there a possibility which can help me achieve this in the context of Flink
>> or in a more generic programming approach.
>>
>> Thanks in anticipation!
>>
>
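A minimal sketch of the approach Aljoscha describes, assuming a hypothetical
MyEvent type and a placeholder flag standing in for whatever marks an event as
unwanted; such elements are skipped during evaluation instead of being evicted,
so they remain in the window buffer for the next firing:

import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

// Hypothetical event type; the flag stands in for the real "unwanted" condition.
class MyEvent {
    public boolean wantedInThisFiring;
}

public class SkippingWindowFunction
        implements WindowFunction<MyEvent, Long, String, GlobalWindow> {

    @Override
    public void apply(String key, GlobalWindow window,
                      Iterable<MyEvent> elements, Collector<Long> out) {
        long count = 0;
        for (MyEvent e : elements) {
            if (!e.wantedInThisFiring) {
                // not evicted, just ignored; it stays buffered for the next firing
                continue;
            }
            count++;
        }
        out.collect(count);
    }
}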


Re: Restart the job from a checkpoint

2017-01-16 Thread tao xiao
Hi Ufuk,

Thank you for the reply. I want to know what the difference is between
state.backend.fs.checkpoint.dir
and state.checkpoints.dir in this case? Does state.checkpoint.dir store the
metadata that points to the checkpoint that is stored in
state.backend.fs.checkpoint.dir?

On Mon, 16 Jan 2017 at 19:24 Ufuk Celebi  wrote:

> Hey!
>
> This is possible with the upcoming 1.2 version of Flink (also in the
> current snapshot version):
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/checkpoints.html#externalized-checkpoints
>
> You have to manually activate it via the checkpoint config (see docs).
>
> Ping me if you have any questions.
>
> – Ufuk
>
>
> On Mon, Jan 16, 2017 at 5:51 AM, tao xiao  wrote:
> > Hi team,
> >
> > Can we restart a Flink job from a previous successful checkpoint? I know we
> > can start a Flink job from a savepoint, but I wonder if I can do it similarly
> > by passing the checkpoint path to the flink run command to restore the job
> > from the checkpoint.
>


Re: Apache Flink 1.1.4 - Java 8 - CommunityDetection.java:158 - java.lang.NullPointerException

2017-01-16 Thread Vasiliki Kalavri
Hi Miguel,

thank you for opening the issue!
Changes/improvements to the documentation are also typically handled with
JIRAs and pull requests [1]. Would you like to give it a try and improve
the community detection docs?

Cheers,
-Vasia.

[1]: https://flink.apache.org/contribute-documentation.html

On 16 January 2017 at 12:58, Miguel Coimbra 
wrote:

> Hello,
>
> I created the JIRA issue at:
> https://issues.apache.org/jira/browse/FLINK-5506
>
> Is it possible to submit suggestions to the documentation?
> If so, where can I do so?
>
> I actually did this based on the example at this page (possible Flink
> versions aside):
>
> https://flink.apache.org/news/2015/08/24/introducing-flink-
> gelly.html#use-case-music-profiles
>
> ​From the documentation I assumed that CommunityDetection has the same
> internal semantics as LabelPropagation (minus the algorithm difference itself).
> It would be relevant to mention that it is not necessary to generate IDs
> (as in the music example) and that an undirected representation of the
> graph is generated before the algorithm is executed.
>
> Kind regards,
>
>
> Miguel E. Coimbra
> Email: miguel.e.coim...@gmail.com 
> Skype: miguel.e.coimbra
>


Re: Queryable State

2017-01-16 Thread Dawid Wysakowicz
Hi Nico, Ufuk,

Thanks for diving into this issue.

@Nico

I don't think that's the problem. The code can be reproduced exactly in
Java. I am using a different constructor for ListStateDescriptor than you did:

You used:

> public ListStateDescriptor(String name, TypeInformation typeInfo)
>

While I used:

>  public ListStateDescriptor(String name, Class typeClass)


I think the problem is with the way I deserialized the value on the
QueryClient side as I tried to use:

>

KvStateRequestSerializer.deserializeList(serializedResult, {

  TypeInformation.of(new TypeHint[KeyedDataPoint[lang.Integer]]() {})

.createSerializer(new ExecutionConfig)

})


I have not checked it, but now I suspect this code would work:

> KvStateRequestSerializer.deserializeValue(serializedResult, {
>   TypeInformation.of(new
> TypeHint[util.List[KeyedDataPoint[lang.Integer]]]() {})
> .createSerializer(new ExecutionConfig)
> })


Regarding removing the queryable state list, I agree: using it seems
pointless. Moreover, while removing it I would take a second look at these
functions:

> KvStateRequestSerializer::deserializeList
>
 KvStateRequestSerializer.serializeList


I think they are not even used right now. Thanks for your time.

Regards
Dawid Wysakowicz

2017-01-16 13:25 GMT+01:00 Nico Kruber :

> Hi Dawid,
> regarding the original code, I couldn't reproduce this with the Java code I
> wrote and my guess is that the second parameter of the ListStateDescriptor
> is
> wrong:
>
>   .asQueryableState(
> "type-time-series-count",
> new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
>   "type-time-series-count",
>   classOf[KeyedDataPoint[java.lang.Integer]]))
>
> this should rather be
>
> TypeInformation.of(new TypeHint[KeyedDataPoint[lang.Integer]]() {}
>
> as in the query itself. It sounds strange to me that you don't get any
> ClassCastException or a compile-time error due to the type being wrong, but
> I lack some Scala knowledge to get to the bottom of this.
>
>
> Regarding the removal of the queryable list state "sink", I created a JIRA
> issue for it and will open a PR:
> https://issues.apache.org/jira/browse/FLINK-5507
>
>
> Nico
>
> On Saturday, 14 January 2017 14:03:41 CET Dawid Wysakowicz wrote:
> > Hi Nico,
> >
> > Recently I've tried the queryable state a bit differently, by using
> > ValueState with a value of a util.ArrayList and a ValueSerializer for
> > util.ArrayList and it works as expected.
> >
> > The non-working example you can browse here:
> > https://github.com/dawidwys/flink-intro/tree/
> c66f01117b0fe3c0adc8923000543a7
> > 0a6fe2219 The working example here:
> > https://github.com/dawidwys/flink-intro/tree/master
> > (The QueryableJob is in module flink-queryable-job and the QueryClient in
> > flink-state-server)
> >
> > Sure, I am aware of the downfall of the ListState. I need it just for
> > presentational purpose, but you may be right there might not be any
> > production use for this state and it should be removed.
> > Maybe the problem is just with the ListState and removing it would
> resolve
> > also my problem :)
> >
> > Regards
> > Dawid Wysakowicz
> >
> > 2017-01-13 18:50 GMT+01:00 Nico Kruber :
> > > Hi Dawid,
> > > I'll try to reproduce the error in the next couple of days. Can you
> also
> > > share
> > > the value deserializer you use? Also, have you tried even smaller
> examples
> > > in
> > > the meantime? Did they work?
> > >
> > > As a side-note in general regarding the queryable state "sink" using
> > > ListState
> > > (".asQueryableState(, ListStateDescriptor)"): everything that
> enters
> > > this operator will be stored forever and never cleaned. Eventually, it
> > > will
> > > pile up too much memory and is thus of limited use. Maybe it should
> even
> > > be
> > > removed from the API.
> > >
> > >
> > > Nico
> > >
> > > On Tuesday, 10 January 2017 19:43:40 CET Dawid Wysakowicz wrote:
> > > > Hey Ufuk.
> > > > Did you maybe had a while to have a look at that problem?
> > > >
> > > > 2017-01-09 10:47 GMT+01:00 Ufuk Celebi :
> > > > > Hey Dawid! Thanks for reporting this. I will try to have a look
> over
> > > > > the course of the day. From a first impression, this seems like a
> bug
> > > > > to me.
> > > > >
> > > > > On Sun, Jan 8, 2017 at 4:43 PM, Dawid Wysakowicz
> > > > >
> > > > >  wrote:
> > > > > > Hi I was experimenting with the Query State feature and I have
> some
> > > > >
> > > > > problems
> > > > >
> > > > > > querying the state.
> > > > > >
> > > > > > The code which I use to produce the queryable state is:
> > > > > > env.addSource(kafkaConsumer).map(
> > > > > >
> > > > > >   e => e match {
> > > > > >
> > > > > > case LoginClickEvent(_, t) => ("login", 1, t)
> > > > > > case LogoutClickEvent(_, t) => ("logout", 1, t)
> > > > > > case ButtonClickEvent(_, _, t) => 

Re: Can serialization be disabled between chains?

2017-01-16 Thread Dmitry Golubets
First issue is not a problem with idiomatic Scala - we make all our data
objects immutable.
Second.. yeah, I guess it makes sense.
Thanks for clarification.

Best regards,
Dmitry

On Mon, Jan 16, 2017 at 1:27 PM, Fabian Hueske  wrote:

> One of the reasons is to ensure that data cannot be modified after it left
> a thread.
> A function that emits the same object several times (in order to reduce
> object creation & GC) might accidentally modify emitted records if they
> were put as objects in a queue.
> Moreover, it is easier to control the memory consumption if data is
> serialized into a fixed number of buffers instead of being put on the JVM
> heap.
>
> Best, Fabian
>
> 2017-01-16 14:21 GMT+01:00 Dmitry Golubets :
>
>> Hi Ufuk,
>>
>> Do you know what's the reason for serialization of data between different
>> threads?
>>
>> Also, thanks for the link!
>>
>> Best regards,
>> Dmitry
>>
>> On Mon, Jan 16, 2017 at 1:07 PM, Ufuk Celebi  wrote:
>>
>>> Hey Dmitry,
>>>
>>> this is not possible if I'm understanding you correctly.
>>>
>>> A task chain is executed by a single task thread and hence it is not
>>> possible to continue processing before the record "leaves" the thread,
>>> which only happens when the next task thread or the network stack
>>> consumes it.
>>>
>>> Hand over between chained tasks happens without serialization. Only
>>> data between different task threads is serialized.
>>>
>>> Depending on your use case the newly introduced async I/O feature
>>> might be worth a look (will be part of the upcoming 1.2 release):
>>> https://github.com/apache/flink/pull/2629
>>>
>>
>>
>


Re: Can serialization be disabled between chains?

2017-01-16 Thread Fabian Hueske
One of the reasons is to ensure that data cannot be modified after it left
a thread.
A function that emits the same object several times (in order to reduce
object creation & GC) might accidentally modify emitted records if they
were put as objects in a queue.
Moreover, it is easier to control the memory consumption if data is
serialized into a fixed number of buffers instead of being put on the JVM
heap.

Best, Fabian

2017-01-16 14:21 GMT+01:00 Dmitry Golubets :

> Hi Ufuk,
>
> Do you know what's the reason for serialization of data between different
> threads?
>
> Also, thanks for the link!
>
> Best regards,
> Dmitry
>
> On Mon, Jan 16, 2017 at 1:07 PM, Ufuk Celebi  wrote:
>
>> Hey Dmitry,
>>
>> this is not possible if I'm understanding you correctly.
>>
>> A task chain is executed by a single task thread and hence it is not
>> possible to continue processing before the record "leaves" the thread,
>> which only happens when the next task thread or the network stack
>> consumes it.
>>
>> Hand over between chained tasks happens without serialization. Only
>> data between different task threads is serialized.
>>
>> Depending on your use case the newly introduced async I/O feature
>> might be worth a look (will be part of the upcoming 1.2 release):
>> https://github.com/apache/flink/pull/2629
>>
>
>


Re: Can serialization be disabled between chains?

2017-01-16 Thread Dmitry Golubets
Hi Ufuk,

Do you know what's the reason for serialization of data between different
threads?

Also, thanks for the link!

Best regards,
Dmitry

On Mon, Jan 16, 2017 at 1:07 PM, Ufuk Celebi  wrote:

> Hey Dmitry,
>
> this is not possible if I'm understanding you correctly.
>
> A task chain is executed by a single task thread and hence it is not
> possible to continue processing before the record "leaves" the thread,
> which only happens when the next task thread or the network stack
> consumes it.
>
> Hand over between chained tasks happens without serialization. Only
> data between different task threads is serialized.
>
> Depending on your use case the newly introduced async I/O feature
> might be worth a look (will be part of the upcoming 1.2 release):
> https://github.com/apache/flink/pull/2629
>


Re: Can serialization be disabled between chains?

2017-01-16 Thread Ufuk Celebi
Hey Dmitry,

this is not possible if I'm understanding you correctly.

A task chain is executed by a single task thread and hence it is not
possible to continue processing before the record "leaves" the thread,
which only happens when the next task thread or the network stack
consumes it.

Hand over between chained tasks happens without serialization. Only
data between different task threads is serialized.

Depending on your use case the newly introduced async I/O feature
might be worth a look (will be part of the upcoming 1.2 release):
https://github.com/apache/flink/pull/2629
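To make the chaining behaviour above concrete, a small sketch (names and numbers
are arbitrary): the first two maps run chained in one task thread, so records are
handed over as objects; startNewChain() forces the last map into a separate chain,
and records crossing that boundary are serialized.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.generateSequence(1, 1000)
            // these two maps are chained into one task thread:
            // records are handed over as objects, without serialization
            .map(new MapFunction<Long, Long>() {
                @Override
                public Long map(Long value) { return value * 2; }
            })
            .map(new MapFunction<Long, Long>() {
                @Override
                public Long map(Long value) { return value + 1; }
            })
            // startNewChain() puts this operator into a separate chain, i.e. a
            // separate task thread; records crossing the boundary are serialized
            .map(new MapFunction<Long, Long>() {
                @Override
                public Long map(Long value) { return value; }
            }).startNewChain()
            .print();

        env.execute("chaining-sketch");
    }
}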


Re: Queryable State

2017-01-16 Thread Nico Kruber
Hi Dawid,
regarding the original code, I couldn't reproduce this with the Java code I 
wrote and my guess is that the second parameter of the ListStateDescriptor is 
wrong:

  .asQueryableState(
"type-time-series-count",
new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
  "type-time-series-count",
  classOf[KeyedDataPoint[java.lang.Integer]]))

this should rather be 

TypeInformation.of(new TypeHint[KeyedDataPoint[lang.Integer]]() {}

as in the query itself. It sounds strange to me that you don't get any
ClassCastException or a compile-time error due to the type being wrong, but I
lack some Scala knowledge to get to the bottom of this.
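In other words, a sketch in Java, using the descriptor constructor that takes
explicit type information (KeyedDataPoint is the class from the job above):

// TypeInformation / TypeHint are in org.apache.flink.api.common.typeinfo.
ListStateDescriptor<KeyedDataPoint<Integer>> descriptor =
        new ListStateDescriptor<>(
                "type-time-series-count",
                TypeInformation.of(new TypeHint<KeyedDataPoint<Integer>>() {}));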


Regarding the removal of the queryable list state "sink", I created a JIRA 
issue for it and will open a PR:
https://issues.apache.org/jira/browse/FLINK-5507


Nico

On Saturday, 14 January 2017 14:03:41 CET Dawid Wysakowicz wrote:
> Hi Nico,
> 
> Recently I've tried the queryable state a bit differently, by using
> ValueState with a value of a util.ArrayList and a ValueSerializer for
> util.ArrayList and it works as expected.
> 
> The non-working example you can browse here:
> https://github.com/dawidwys/flink-intro/tree/c66f01117b0fe3c0adc8923000543a7
> 0a6fe2219 The working example here:
> https://github.com/dawidwys/flink-intro/tree/master
> (The QueryableJob is in module flink-queryable-job and the QueryClient in
> flink-state-server)
> 
> Sure, I am aware of the downfall of the ListState. I need it just for
> presentational purpose, but you may be right there might not be any
> production use for this state and it should be removed.
> Maybe the problem is just with the ListState and removing it would resolve
> also my problem :)
> 
> Regards
> Dawid Wysakowicz
> 
> 2017-01-13 18:50 GMT+01:00 Nico Kruber :
> > Hi Dawid,
> > I'll try to reproduce the error in the next couple of days. Can you also
> > share
> > the value deserializer you use? Also, have you tried even smaller examples
> > in
> > the meantime? Did they work?
> > 
> > As a side-note in general regarding the queryable state "sink" using
> > ListState
> > (".asQueryableState(, ListStateDescriptor)"): everything that enters
> > this operator will be stored forever and never cleaned. Eventually, it
> > will
> > pile up too much memory and is thus of limited use. Maybe it should even
> > be
> > removed from the API.
> > 
> > 
> > Nico
> > 
> > On Tuesday, 10 January 2017 19:43:40 CET Dawid Wysakowicz wrote:
> > > Hey Ufuk.
> > > Did you maybe had a while to have a look at that problem?
> > > 
> > > 2017-01-09 10:47 GMT+01:00 Ufuk Celebi :
> > > > Hey Dawid! Thanks for reporting this. I will try to have a look over
> > > > the course of the day. From a first impression, this seems like a bug
> > > > to me.
> > > > 
> > > > On Sun, Jan 8, 2017 at 4:43 PM, Dawid Wysakowicz
> > > > 
> > > >  wrote:
> > > > > Hi I was experimenting with the Query State feature and I have some
> > > > 
> > > > problems
> > > > 
> > > > > querying the state.
> > > > > 
> > > > > The code which I use to produce the queryable state is:
> > > > > env.addSource(kafkaConsumer).map(
> > > > > 
> > > > >   e => e match {
> > > > >   
> > > > > case LoginClickEvent(_, t) => ("login", 1, t)
> > > > > case LogoutClickEvent(_, t) => ("logout", 1, t)
> > > > > case ButtonClickEvent(_, _, t) => ("button", 1, t)
> > > > >   
> > > > >   }).keyBy(0).timeWindow(Time.seconds(1))
> > > > >   .reduce((e1, e2) => (e1._1, e1._2 + e2._2, Math.max(e1._3,
> > > > >   e2._3)))
> > > > >   .map(e => new KeyedDataPoint[java.lang.Integer](e._1, e._3,
> > 
> > e._2))
> > 
> > > > >   .keyBy("key")
> > > > >   .asQueryableState(
> > > > >   
> > > > > "type-time-series-count",
> > > > > new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
> > > > > 
> > > > >   "type-time-series-count",
> > > > >   classOf[KeyedDataPoint[java.lang.Integer]]))
> > > > > 
> > > > > As you see it is a rather simple job, in which I try to count events
> > 
> > of
> > 
> > > > > different types in windows and then query by event type.
> > > > > 
> > > > > In client code I do:
> > > > > // Query Flink state
> > > > > val future = client.getKvState(jobId, "type-time-series-count",
> > > > > 
> > > > > key.hashCode, seralizedKey)
> > > > > 
> > > > > // Await async result
> > > > > val serializedResult: Array[Byte] = Await.result(
> > > > > 
> > > > >   future, new FiniteDuration(
> > > > >   
> > > > > 10,
> > > > > duration.SECONDS))
> > > > > 
> > > > > // Deserialize response
> > > > > val results = deserializeResponse(serializedResult)
> > > > > 
> > > > > results
> > > > >   
> > > > >   }
> > > > > 
> > > > >   private def deserializeResponse(serializedResult: Array[Byte]):
> > > > > 

Re: Queryable State

2017-01-16 Thread Ufuk Celebi
Hey Dawid! I talked offline with Nico last week and he took this over.
He also suggested removing the list queryable state variant
altogether, which makes a lot of sense to me (at least with the current
state of things). @Nico: could you open an issue for it?

Nico also found a difference in your code in the way you get the type
information in the job vs. your code (class of vs. manual type
information), which could be the root of the initial problem.

– Ufuk


On Sat, Jan 14, 2017 at 2:03 PM, Dawid Wysakowicz
 wrote:
> Hi Nico,
>
> Recently I've tried the queryable state a bit differently, by using
> ValueState with a value of a util.ArrayList and a ValueSerializer for
> util.ArrayList and it works as expected.
>
> The non-working example you can browse here:
> https://github.com/dawidwys/flink-intro/tree/c66f01117b0fe3c0adc8923000543a70a6fe2219
> The working example here:
> https://github.com/dawidwys/flink-intro/tree/master
> (The QueryableJob is in module flink-queryable-job and the QueryClient in
> flink-state-server)
>
> Sure, I am aware of the downfall of the ListState. I need it just for
> presentational purpose, but you may be right there might not be any
> production use for this state and it should be removed.
> Maybe the problem is just with the ListState and removing it would resolve
> also my problem :)
>
> Regards
> Dawid Wysakowicz
>
>
> 2017-01-13 18:50 GMT+01:00 Nico Kruber :
>>
>> Hi Dawid,
>> I'll try to reproduce the error in the next couple of days. Can you also
>> share
>> the value deserializer you use? Also, have you tried even smaller examples
>> in
>> the meantime? Did they work?
>>
>> As a side-note in general regarding the queryable state "sink" using
>> ListState
>> (".asQueryableState(, ListStateDescriptor)"): everything that enters
>> this operator will be stored forever and never cleaned. Eventually, it
>> will
>> pile up too much memory and is thus of limited use. Maybe it should even
>> be
>> removed from the API.
>>
>>
>> Nico
>>
>> On Tuesday, 10 January 2017 19:43:40 CET Dawid Wysakowicz wrote:
>> > Hey Ufuk.
>> > Did you maybe had a while to have a look at that problem?
>> >
>> > 2017-01-09 10:47 GMT+01:00 Ufuk Celebi :
>> > > Hey Dawid! Thanks for reporting this. I will try to have a look over
>> > > the course of the day. From a first impression, this seems like a bug
>> > > to me.
>> > >
>> > > On Sun, Jan 8, 2017 at 4:43 PM, Dawid Wysakowicz
>> > >
>> > >  wrote:
>> > > > Hi I was experimenting with the Query State feature and I have some
>> > >
>> > > problems
>> > >
>> > > > querying the state.
>> > > >
>> > > > The code which I use to produce the queryable state is:
>> > > > env.addSource(kafkaConsumer).map(
>> > > >
>> > > >   e => e match {
>> > > >
>> > > > case LoginClickEvent(_, t) => ("login", 1, t)
>> > > > case LogoutClickEvent(_, t) => ("logout", 1, t)
>> > > > case ButtonClickEvent(_, _, t) => ("button", 1, t)
>> > > >
>> > > >   }).keyBy(0).timeWindow(Time.seconds(1))
>> > > >   .reduce((e1, e2) => (e1._1, e1._2 + e2._2, Math.max(e1._3,
>> > > >   e2._3)))
>> > > >   .map(e => new KeyedDataPoint[java.lang.Integer](e._1, e._3,
>> > > > e._2))
>> > > >   .keyBy("key")
>> > > >   .asQueryableState(
>> > > >
>> > > > "type-time-series-count",
>> > > > new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
>> > > >
>> > > >   "type-time-series-count",
>> > > >   classOf[KeyedDataPoint[java.lang.Integer]]))
>> > > >
>> > > > As you see it is a rather simple job, in which I try to count events
>> > > > of
>> > > > different types in windows and then query by event type.
>> > > >
>> > > > In client code I do:
>> > > > // Query Flink state
>> > > > val future = client.getKvState(jobId, "type-time-series-count",
>> > > >
>> > > > key.hashCode, seralizedKey)
>> > > >
>> > > > // Await async result
>> > > > val serializedResult: Array[Byte] = Await.result(
>> > > >
>> > > >   future, new FiniteDuration(
>> > > >
>> > > > 10,
>> > > > duration.SECONDS))
>> > > >
>> > > > // Deserialize response
>> > > > val results = deserializeResponse(serializedResult)
>> > > >
>> > > > results
>> > > >
>> > > >   }
>> > > >
>> > > >   private def deserializeResponse(serializedResult: Array[Byte]):
>> > > > util.List[KeyedDataPoint[lang
>> > > >
>> > > >   .Integer]] = {
>> > > >
>> > > > KvStateRequestSerializer.deserializeList(serializedResult,
>> > > >
>> > > > getValueSerializer())
>> > > >
>> > > >   }
>> > > >
>> > > > As I was trying to debug the issue I see the first element in list
>> > > > gets
>> > > > deserialized correctly, but it fails on the second one. It seems
>> > > > like
>> > > > the
>> > > > serialized result is broken. Do you have any idea if I am doing sth
>> > >
>> > > wrong or
>> > >
>> > > > there is some bug?
>> 

Re: Apache Flink 1.1.4 - Java 8 - CommunityDetection.java:158 - java.lang.NullPointerException

2017-01-16 Thread Miguel Coimbra
Hello,

I created the JIRA issue at:
https://issues.apache.org/jira/browse/FLINK-5506

Is it possible to submit suggestions to the documentation?
If so, where can I do so?

I actually did this based on the example at this page (possible Flink
versions aside):

https://flink.apache.org/news/2015/08/24/introducing-flink-gelly.html#use-case-music-profiles

​From the documentation I assumed that CommunityDetection has the same
internal semantics as LabelPropagation (minus the algorithm difference itself).
It would be relevant to mention that it is not necessary to generate IDs
(as in the music example) and that an undirected representation of the
graph is generated before the algorithm is executed.

Kind regards,


Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com 
Skype: miguel.e.coimbra


Re: Restart the job from a checkpoint

2017-01-16 Thread Ufuk Celebi
Hey!

This is possible with the upcoming 1.2 version of Flink (also in the
current snapshot version):
https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/checkpoints.html#externalized-checkpoints

You have to manually activate it via the checkpoint config (see docs).

Ping me if you have any questions.

– Ufuk
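A minimal sketch of the activation, assuming the Flink 1.2 API described in the
linked docs (interval and cleanup mode are placeholders):

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Retain completed checkpoints externally so the job can later be resumed from
// one, e.g. with: bin/flink run -s <path-to-checkpoint-metadata> ...
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000); // checkpoint every 60s
env.getCheckpointConfig().enableExternalizedCheckpoints(
        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);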


On Mon, Jan 16, 2017 at 5:51 AM, tao xiao  wrote:
> Hi team,
>
> Can we restart a Flink job from a previous successful checkpoint? I know we
> can start a Flink job from a savepoint, but I wonder if I can do it similarly
> by passing the checkpoint path to the flink run command to restore the job
> from the checkpoint.


Re: Events are assigned to wrong window

2017-01-16 Thread Nico
Hi Aljoscha,

I was able to identify the root cause of the problem. It is my first map
function using ValueState. But first, assignTimestampsAndWatermarks()
is called after the Kafka connector is created:

FlinkKafkaConsumer09<Car> carFlinkKafkaConsumer09 =
  new FlinkKafkaConsumer09<>("Traffic", new Car(), properties);

// extract the timestamps with a max. delay of 2s
carFlinkKafkaConsumer09.assignTimestampsAndWatermarks(new
TimestampGenerator(Time.seconds(0)));

In the map function I try to calculate the direction between two GPS
data points. For this, I store the last event in ValueState. The
function looks like this:

private static class BearingMap extends RichMapFunction<Car, Car> {

   private transient ValueState<Car> state;
   private final double maxdiff = 12; // in seconds

   @Override
   public Car map(Car destination) throws Exception {

      Car origin = state.value();

      /*
       * If the state is empty, do not compute a direction; just store the
       * event in the state.
       */
      if (origin == null) {
         state.update(destination);
         // return the Car unchanged
         return destination;
      }

      double diff = origin.getTimestamp() - destination.getTimestamp();

      System.out.println("Differenz: " + diff);

      if (Math.abs(diff) <= maxdiff * 1000) {

         /*
          * For late events that still fall within the allowed delay
          */
         if (diff > 0) {
            Car tmp = destination;
            destination = origin;
            origin = tmp;
         }

         /*
          * origin is now always the earlier event
          */
         double bearing = Helper.calculateBearing(
               origin.getLat(), origin.getLon(),
               destination.getLat(), destination.getLon());

         // update the state
         state.update(destination);

         origin.setDirection(bearing);
         return origin;
      }

      // For events that are too late, keep the current state and return it
      // without a direction
      return origin;
   }

   @Override
   public void open(Configuration parameters) throws Exception {

      ValueStateDescriptor<Car> vsd = new ValueStateDescriptor<>(
            "lastEvent",
            Car.class,
            null);

      state = getRuntimeContext().getState(vsd);
   }
}

Together with the window function:


private static class TimeWindowTest
      implements WindowFunction<Car, Set<Long>, Tuple, TimeWindow> {
   @Override
   public void apply(Tuple tuple, TimeWindow timeWindow,
         Iterable<Car> iterable, Collector<Set<Long>> collector)
         throws Exception {
      String s = "Zeitfenster: " + timeWindow.getStart() + " - "
            + timeWindow.getEnd() + "\n";
      Set<Long> timestamps = new HashSet<>();

      for (Car c : iterable) {
         timestamps.add(c.getTimestamp());
      }

      System.out.println(s + timestamps + "\n\n");
   }
}

For the following pipeline:

stream
   .filter(new FilterFunction<Car>() {
      @Override
      public boolean filter(Car car) throws Exception {
         return car.getId().equals("car.330");
      }
   })
   .keyBy("id")
   .map(new BearingMap())
   .keyBy("id")
   .window(TumblingEventTimeWindows.of(Time.seconds(10)))
   .apply(new TimeWindowTest());

So actually, when an event e1 arrives at the map operator, it is stored
in the ValueState, and only after the next element e2 arrives is e1
forwarded. This is after 5 seconds. This generates the
following outcome: one element is always around 5 seconds before the
start of the window.

Differenz: -5013.0
Differenz: -5014.0
Zeitfenster: 148456469 - 148456470 (Window times start and end)
[1484564686236, 1484564691260]


Differenz: -5009.0
Differenz: -5007.0
Zeitfenster: 148456470 - 148456471
[1484564696273, 1484564701287]


Differenz: -5005.0
Differenz: -5014.0
Zeitfenster: 148456471 - 148456472
[1484564706296, 1484564711303]


Best,

Nico



2017-01-09 16:10 GMT+01:00 Aljoscha Krettek :

> Hi,
> I'm assuming you also have the call to assignTimestampsAndWatermarks()
> somewhere in there as well, as in:
>
> stream
>   .assignTimestampsAndWatermarks(new TimestampGenerator()) // or
> somewhere else in the pipeline
>   .keyBy("id")
>   .map(...)
>   .filter(...)
>   .map(...)
>   .keyBy("areaID")
>   .map(new KeyExtractor())
>   .keyBy("f1.areaID","f0.sinterval")
>   .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>   .apply(new TrafficInformation());
>
> Just checking, to make sure. If you have this we might have to dig a
> little deeper. Could you also please trying to bring the whole output of
> your apply() method in one go, i.e. collect all the output in a String and
> then have one call to System.out.println(), it could be that the output in
> 

Re: Measuring Execution Time

2017-01-16 Thread Ufuk Celebi
Unfortunately no. You only get the complete execution time after the
job has finished. What you can do is browse to the web interface and
check the runtime for each operator there (assuming that each
iterative process is a separate operator).

Does this help?
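As a small addition: the overall runtime is available programmatically once
execute() returns, while per-operator times have to come from the web interface
(a sketch, using the job structure from the question):

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// ... load data, iterative process 1 .. N ...
JobExecutionResult result = env.execute("...");
System.out.println("Total runtime: " + result.getNetRuntime() + " ms");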

On Mon, Jan 16, 2017 at 7:36 AM, Charith Wickramarachchi
 wrote:
> Hi All,
>
> I have a program that executes multiple (data dependent) iterative jobs one
> after another in the same execution plan.
>
> It has the following structure
>
> ExecutionEnvironment env = ..
>
> LoadData
>
> Iterative Process 1
>
> Iterative Process 2
>
> ...
>
> Iterative Process N
>
> env.execute("...")
>
>
> I want to measure the execution time for each Iterative process. Does Flink
> have a direct way to measure this time? It will be great if you could provide
> a pointer.
>
> Thanks,
> Charith
>
>
>
>
> --
> Charith Dhanushka Wickramaarachchi
>
> Tel  +1 213 447 4253
> Blog  http://charith.wickramaarachchi.org/
> Twitter  @charithwiki
>
> This communication may contain privileged or other confidential information
> and is intended exclusively for the addressee/s. If you are not the intended
> recipient/s, or believe that you may have
> received this communication in error, please reply to the sender indicating
> that fact and delete the copy you received and in addition, you should not
> print, copy, retransmit, disseminate, or otherwise use the information
> contained in this communication. Internet communications cannot be
> guaranteed to be timely, secure, error or virus-free. The sender does not
> accept liability for any errors or omissions


Re: [DISCUSS] Apache Flink 1.2.0 RC0 (Non-voting testing release candidate)

2017-01-16 Thread Fabian Hueske
A user reported that outer joins on the Table API and SQL compute wrong
results:

https://issues.apache.org/jira/browse/FLINK-5498

2017-01-15 20:23 GMT+01:00 Till Rohrmann :

> I found two problematic issues with Mesos HA mode which breaks it:
>
> https://issues.apache.org/jira/browse/FLINK-5495
> https://issues.apache.org/jira/browse/FLINK-5496
>
> On Fri, Jan 13, 2017 at 11:29 AM, Fabian Hueske  wrote:
>
> > I tested the Table API / SQL a bit.
> >
> > I implemented a windowed aggregation with the streaming Table API and it
> > produced the same results as a DataStream API implementation.
> > Joining a stream with a TableFunction also seemed to work well.
> > Moreover, I checked the results of a bunch of TPC-H queries (batch SQL)
> > and all produced correct results.
> >
> >
> >
> > 2017-01-12 17:45 GMT+01:00 Till Rohrmann :
> >
> >> I'm wondering whether we should not make the webserver encryption depend on
> >> the global encryption activation and instead activate it by default.
> >>
> >> On Thu, Jan 12, 2017 at 4:54 PM, Chesnay Schepler 
> >> wrote:
> >>
> >> > FLINK-5470 is a duplicate of FLINK-5298 for which there is also an
> open
> >> PR.
> >> >
> >> > FLINK-5472 is imo invalid since the webserver does support https, you
> >> just
> >> > have to enable it as per the security documentation.
> >> >
> >> >
> >> > On 12.01.2017 16:20, Till Rohrmann wrote:
> >> >
> >> > I also found an issue:
> >> >
> >> > https://issues.apache.org/jira/browse/FLINK-5470
> >> >
> >> > I also noticed that Flink's webserver does not support https requests.
> >> It
> >> > might be worthwhile to add it, though.
> >> >
> >> > https://issues.apache.org/jira/browse/FLINK-5472
> >> >
> >> > On Thu, Jan 12, 2017 at 11:24 AM, Robert Metzger  >
> >> > wrote:
> >> >
> >> >> I also found a bunch of issues
> >> >>
> >> >> https://issues.apache.org/jira/browse/FLINK-5465
> >> >> https://issues.apache.org/jira/browse/FLINK-5462
> >> >> https://issues.apache.org/jira/browse/FLINK-5464
> >> >> https://issues.apache.org/jira/browse/FLINK-5463
> >> >>
> >> >>
> >> >> On Thu, Jan 12, 2017 at 9:56 AM, Fabian Hueske < 
> >> >> fhue...@gmail.com> wrote:
> >> >>
> >> >> > I have another bugfix for 1.2.:
> >> >> >
> >> >> > https://issues.apache.org/jira/browse/FLINK-2662 (pending PR)
> >> >> >
> >> >> > 2017-01-10 15:16 GMT+01:00 Robert Metzger < 
> >> >> rmetz...@apache.org>:
> >> >> >
> >> >> > > Hi,
> >> >> > >
> >> >> > > this depends a lot on the number of issues we find during the
> >> testing.
> >> >> > >
> >> >> > >
> >> >> > > These are the issues I found so far:
> >> >> > >
> >> >> > > https://issues.apache.org/jira/browse/FLINK-5379 (unresolved)
> >> >> > > https://issues.apache.org/jira/browse/FLINK-5383 (resolved)
> >> >> > > https://issues.apache.org/jira/browse/FLINK-5382 (resolved)
> >> >> > > https://issues.apache.org/jira/browse/FLINK-5381 (resolved)
> >> >> > > https://issues.apache.org/jira/browse/FLINK-5380 (pending PR)
> >> >> > >
> >> >> > >
> >> >> > >
> >> >> > >
> >> >> > > On Tue, Jan 10, 2017 at 11:58 AM, shijinkui <
> shijin...@huawei.com>
> >> >> > wrote:
> >> >> > >
> >> >> > > > Do we have a probable date for the 1.2 release? This month or next
> >> >> > > > month?
> >> >> > > >
> >> >> > > > -----Original Message-----
> >> >> > > > From: Robert Metzger [mailto: rmetz...@apache.org]
> >> >> > > > Sent: January 3, 2017 20:44
> >> >> > > > To: d...@flink.apache.org
> >> >> > > > Cc: user@flink.apache.org
> >> >> > > > Subject: [DISCUSS] Apache Flink 1.2.0 RC0 (Non-voting testing release
> >> >> > > > candidate)
> >> >> > > >
> >> >> > > > Hi,
> >> >> > > >
> >> >> > > > First of all, I wish everybody a happy new year 2017.
> >> >> > > >
> >> >> > > > I've set user@flink in CC so that users who are interested in
> >> >> helping
> >> >> > > > with the testing get notified. Please respond only to the dev@
> >> >> list to
> >> >> > > > keep the discussion there!
> >> >> > > >
> >> >> > > > According to the 1.2 release discussion thread, I've created a
> >> first
> >> >> > > > release candidate for Flink 1.2.
> >> >> > > > The release candidate will not be the final release, because
> I'm
> >> >> > certain
> >> >> > > > that we'll find at least one blocking issue in the candidate :)
> >> >> > > >
> >> >> > > > Therefore, the RC is meant as a testing only release candidate.
> >> >> > > > Please report every issue we need to fix before the next RC in
> >> this
> >> >> > > thread
> >> >> > > > so that we have a good overview.
> >> >> > > >
> >> >> > > > The release artifacts are located here:
> >> >> > > > http://people.apache.org/~rmetzger/flink-1.2.0-rc0/
> >> >> > > >
> >> >> > > > The maven staging repository is located here:
> >> >> > > > https://repository.apache.org/content/repositories/orgapache
> >> >> flink-
> >> >> > > >
> >> >> > > > The release commit (in branch 

Re: State in flink jobs

2017-01-16 Thread Stephan Ewen
Hi!

State is only persisted as part of checkpoints or savepoints.
If neither is used, state is not persisted across restarts.

Stephan


On Mon, Jan 16, 2017 at 8:47 AM, Janardhan Reddy <
janardhan.re...@olacabs.com> wrote:

> Hi,
>
> Is value state persisted across a job restart/redeployment without a
> savepoint if checkpointing is disabled?
>
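
For reference, a minimal sketch of enabling checkpointing with a durable state
backend so that keyed state can be restored after a failure (the checkpoint path
and the trivial pipeline are assumptions). For a planned restart or redeployment
you would additionally trigger a savepoint and resume the job from it.

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint every 60 seconds; keyed value state is then part of it.
        env.enableCheckpointing(60000);

        // Keep checkpoint data on a durable file system (path is an assumption).
        env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

        // Trivial stand-in pipeline; the real job would use keyed ValueState here.
        env.fromElements(1, 2, 3).print();

        env.execute("checkpointed job");
    }
}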


Re: How to get help on ClassCastException when re-submitting a job

2017-01-16 Thread Stephan Ewen
Hi!

I think Yury pointed out the correct diagnosis. Caching the classes across
multiple jobs in the same session can cause these types of issues.

For YARN single-job deployments, Flink 1.2 will not do any dynamic
classloading any more, but will start with everything in the application
classpath.
For YARN sessions, Flink 1.2 still uses dynamic loading, to re-use hot
containers.

Best,
Stephan



On Mon, Jan 16, 2017 at 11:07 AM, Ufuk Celebi  wrote:

> @Giuliano: any updates? Very curious to figure out what's causing
> this. As Fabian said, this is most likely a class loading issue.
> Judging from the stack trace, you are not running with YARN but a
> standalone cluster. Is that correct? Nothing changed between Flink 1.1
> and Flink 1.2 with respect to class loading for standalone clusters.
> Did you put any JARs into the lib folder of
> Flink before submitting the job?
>
> – Ufuk
>
> On Thu, Jan 12, 2017 at 7:16 PM, Yury Ruchin 
> wrote:
> > Hi,
> >
> > I'd like to chime in since I've faced the same issue running Flink
> 1.1.4. I
> > have a long-running YARN session which I use to run multiple streaming
> jobs
> > concurrently. Once, after cancelling and resubmitting the job, I saw the "X
> > cannot be cast to X" ClassCastException in the logs. I restarted the YARN
> > session, and the problem disappeared.
> >
> > The class that failed to be cast was autogenerated by the Avro compiler. I
> > know that Avro's Java binding caches schemas in a static WeakHashMap.
> > I'm wondering whether that may get in the way of Flink's classloading
> > design.
> >
> > Anyway, I would be interested in watching the issue in Flink JIRA.
> >
> > Giuliano, could you provide the issue number?
> >
> > Thanks,
> > Yury
> >
> > 2017-01-11 14:11 GMT+03:00 Fabian Hueske :
> >>
> >> Hi Guiliano,
> >>
> >> thanks for bringing up this issue.
> >> A "ClassCastException: X cannot be cast to X" often points to a
> >> classloader issue.
> >> So it might actually be a bug in Flink.
> >>
> >> I assume you submit the same application (same jar file) with the same
> >> command right?
> >> Did you cancel the job before resubmitting?
> >>
> >> Can you create a JIRA issue [1] for this bug (hit the red CREATE button
> >> on top) and include the commit hash from which you built Flink?
> >> It would be great if you could provide a short example program and
> >> instructions how to reproduce the problem.
> >>
> >> Thank you very much,
> >> Fabian
> >>
> >> [1] https://issues.apache.org/jira/browse/FLINK
> >>
> >>
> >>
> >> 2017-01-11 1:22 GMT+01:00 Giuliano Caliari  >:
> >>>
> >>> Hello,
> >>>
> >>>
> >>>
> >>> I need some guidance on how to report a bug.
> >>>
> >>>
> >>>
> >>> I’m testing version 1.2 on my local cluster and the first time I submit
> >>> the job everything works but whenever I re-submit the same job it
> fails with
> >>>
> >>> org.apache.flink.client.program.ProgramInvocationException: The
> program
> >>> execution failed: Job execution failed.
> >>>
> >>> at
> >>> org.apache.flink.client.program.ClusterClient.run(
> ClusterClient.java:427)
> >>>
> >>> at
> >>> org.apache.flink.client.program.StandaloneClusterClient.submitJob(
> StandaloneClusterClient.java:101)
> >>>
> >>> at
> >>> org.apache.flink.client.program.ClusterClient.run(
> ClusterClient.java:400)
> >>>
> >>> at
> >>> org.apache.flink.streaming.api.environment.StreamContextEnvironment.
> execute(StreamContextEnvironment.java:66)
> >>>
> >>> at
> >>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.
> execute(StreamExecutionEnvironment.scala:634)
> >>>
> >>> at au.com.my.package.pTraitor.OneTrait.execute(Traitor.scala:147)
> >>>
> >>> at
> >>> au.com.my.package.pTraitor.TraitorAppOneTrait$.
> delayedEndpoint$au$com$my$package$pTraitor$TraitorAppOneTrait$1(
> TraitorApp.scala:22)
> >>>
> >>> at
> >>> au.com.my.package.pTraitor.TraitorAppOneTrait$delayedInit$body.apply(
> TraitorApp.scala:21)
> >>>
> >>> at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
> >>>
> >>> at
> >>> scala.runtime.AbstractFunction0.apply$mcV$
> sp(AbstractFunction0.scala:12)
> >>>
> >>> at scala.App$$anonfun$main$1.apply(App.scala:76)
> >>>
> >>> at scala.App$$anonfun$main$1.apply(App.scala:76)
> >>>
> >>> at scala.collection.immutable.List.foreach(List.scala:381)
> >>>
> >>> at
> >>> scala.collection.generic.TraversableForwarder$class.
> foreach(TraversableForwarder.scala:35)
> >>>
> >>> at scala.App$class.main(App.scala:76)
> >>>
> >>> at
> >>> au.com.my.package.pTraitor.TraitorAppOneTrait$.main(
> TraitorApp.scala:21)
> >>>
> >>> at au.com.my.package.pTraitor.TraitorAppOneTrait.main(
> TraitorApp.scala)
> >>>
> >>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>>
> >>> at
> >>> sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
> >>>
> >>> at
> >>> sun.reflect.DelegatingMethodAccessorImpl.invoke(
> 

Re: How to get help on ClassCastException when re-submitting a job

2017-01-16 Thread Ufuk Celebi
@Giuliano: any updates? Very curious to figure out what's causing
this. As Fabian said, this is most likely a class loading issue.
Judging from the stack trace, you are not running with YARN but a
standalone cluster. Is that correct? Nothing changed between Flink 1.1
and Flink 1.2 with respect to class loading for standalone clusters.
Did you put any JARs into the lib folder of
Flink before submitting the job?

– Ufuk

On Thu, Jan 12, 2017 at 7:16 PM, Yury Ruchin  wrote:
> Hi,
>
> I'd like to chime in since I've faced the same issue running Flink 1.1.4. I
> have a long-running YARN session which I use to run multiple streaming jobs
> concurrently. Once, after cancelling and resubmitting the job, I saw the "X
> cannot be cast to X" ClassCastException in the logs. I restarted the YARN
> session, and the problem disappeared.
>
> The class that failed to be cast was autogenerated by the Avro compiler. I know
> that Avro's Java binding caches schemas in a static WeakHashMap.
> I'm wondering whether that may get in the way of Flink's classloading design.
>
> Anyway, I would be interested in watching the issue in Flink JIRA.
>
> Giuliano, could you provide the issue number?
>
> Thanks,
> Yury
>
> 2017-01-11 14:11 GMT+03:00 Fabian Hueske :
>>
>> Hi Guiliano,
>>
>> thanks for bringing up this issue.
>> A "ClassCastException: X cannot be cast to X" often points to a
>> classloader issue.
>> So it might actually be a bug in Flink.
>>
>> I assume you submit the same application (same jar file) with the same
>> command right?
>> Did you cancel the job before resubmitting?
>>
>> Can you create a JIRA issue [1] for this bug (hit the red CREATE button
>> on top) and include the commit hash from which you built Flink?
>> It would be great if you could provide a short example program and
>> instructions how to reproduce the problem.
>>
>> Thank you very much,
>> Fabian
>>
>> [1] https://issues.apache.org/jira/browse/FLINK
>>
>>
>>
>> 2017-01-11 1:22 GMT+01:00 Giuliano Caliari :
>>>
>>> Hello,
>>>
>>>
>>>
>>> I need some guidance on how to report a bug.
>>>
>>>
>>>
>>> I’m testing version 1.2 on my local cluster and the first time I submit
>>> the job everything works but whenever I re-submit the same job it fails with
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: The program
>>> execution failed: Job execution failed.
>>>
>>> at
>>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>>>
>>> at
>>> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
>>>
>>> at
>>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
>>>
>>> at
>>> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>>>
>>> at
>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:634)
>>>
>>> at au.com.my.package.pTraitor.OneTrait.execute(Traitor.scala:147)
>>>
>>> at
>>> au.com.my.package.pTraitor.TraitorAppOneTrait$.delayedEndpoint$au$com$my$package$pTraitor$TraitorAppOneTrait$1(TraitorApp.scala:22)
>>>
>>> at
>>> au.com.my.package.pTraitor.TraitorAppOneTrait$delayedInit$body.apply(TraitorApp.scala:21)
>>>
>>> at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>>>
>>> at
>>> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>>>
>>> at scala.App$$anonfun$main$1.apply(App.scala:76)
>>>
>>> at scala.App$$anonfun$main$1.apply(App.scala:76)
>>>
>>> at scala.collection.immutable.List.foreach(List.scala:381)
>>>
>>> at
>>> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>>>
>>> at scala.App$class.main(App.scala:76)
>>>
>>> at
>>> au.com.my.package.pTraitor.TraitorAppOneTrait$.main(TraitorApp.scala:21)
>>>
>>> at au.com.my.package.pTraitor.TraitorAppOneTrait.main(TraitorApp.scala)
>>>
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>>>
>>> at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
>>>
>>> at
>>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
>>>
>>> at
>>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
>>>
>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
>>>
>>> at
>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
>>>
>>> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
>>>
>>> at 

Re: Kafka KeyedStream source

2017-01-16 Thread Fabian Hueske
Hi Niels,

I think the biggest problem for keyed sources is that Flink must be able to
co-locate key-partitioned state with the pre-partitioned data.

This might work if the key is the partition ID, i.e., not the original key
attribute that was hashed to assign events to partitions.
Flink would need to distribute topic partitions to source functions based
on its own hash function.

However, if you would like to keyBy the original key attribute, Flink would
need to have access to the hash function that was used to assign events to
partitions.

Best,
Fabian

2017-01-15 21:48 GMT+01:00 Tzu-Li (Gordon) Tai :

> Hi Niels,
>
> If it’s only for simple data filtering that does not depend on the key, a
> simple “flatMap” or “filter" directly after the source can be chained to
> the source instances.
> This means the filter processing will be done within the same thread as the
> one fetching data from a Kafka partition, hence no unnecessary network
> transfers for this simple filtering.
> You can read more about operator chaining here: https://ci.apache.org/
> projects/flink/flink-docs-release-1.2/concepts/runtime.
> html#tasks-and-operator-chains
>
> So, to sum up: you have a FlinkKafkaConsumer as the source, apply a filter
> transformation right after it, and then a keyBy followed by your heavy,
> key-wise computations.
> Does that make sense for what you have in mind?
>
> Cheers,
> Gordon
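
A minimal sketch of that pattern, assuming the Kafka 0.9 connector, a plain
string schema, and comma-separated records whose first field is the session id
(topic name, broker address, and record layout are all assumptions about the
actual setup):

import java.util.Properties;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ChainedFilterBeforeKeyBy {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumption
        props.setProperty("group.id", "clickstream");             // assumption

        DataStream<String> clicks = env.addSource(
                new FlinkKafkaConsumer09<>("clicks", new SimpleStringSchema(), props));

        clicks
                // Chained to the source: runs in the thread that fetches the Kafka
                // partition, so no record crosses the network just to be dropped here.
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String line) {
                        return !line.isEmpty();
                    }
                })
                // Only the keyBy shuffles data, to the heavy key-wise computation.
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String line) {
                        return line.split(",")[0]; // assumed: session id is the first field
                    }
                })
                .print();

        env.execute("chained filter before keyBy");
    }
}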
>
> On January 11, 2017 at 4:11:26 PM, Niels Basjes (ni...@basjes.nl) wrote:
>
> Hi,
>
> Ok. I think I get it.
>
> WHAT IF:
> Assume we create a addKeyedSource(...) which will allow us to add a source
> that makes some guarantees about the data.
> And assume this source returns simply the Kafka partition id as the result
> of this 'hash' function.
> Then if I have 10 kafka partitions I would read these records in and I
> could filter the data more efficiently because the data would not need to
> go over the network before this filter.
> Afterwards I can scale it up to 'many' tasks for the heavier processing
> that follows.
>
> As a concept: Could that be made to work?
>
> Niels
>
> On Mon, Jan 9, 2017 at 9:14 AM, Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi Niels,
>>
>> Thank you for bringing this up. I recall there was some previous
>> discussion related to this before: [1].
>>
>> I don’t think this is possible at the moment, mainly because of how the
>> API is designed.
>>
>> On the other hand, a KeyedStream in Flink is basically just a DataStream
>> with a hash partitioner that is used when deciding which instance of the
>> following downstream operator an emitted record of the stream is sent to.
>> So, even if we have a Kafka source that directly produces a KeyedStream
>> on “addSource”, redistribution of data can still happen. I.e., if the
>> parallelism of the compute operators right after is different than the
>> number of Kafka partitions, redistribution will happen to let the key space
>> and state be evenly distributed in Flink.
>>
>> This leads to the argument that we probably need to think about whether
>> retaining the original partitioning of records in Kafka when consumed by
>> Flink is actually only a special case.
>> Flink, as a parallel compute engine, can freely adjust the parallelism of
>> its operators regardless of the parallelism of Kafka topics (rescaling
>> isn’t actually in yet, but is on the near-future roadmap).
>>
>> So, under the general case, the parallelism of a Flink operator may be
>> different than the number of Kafka partitions, and therefore redistributing
>> must occur.
>> For redistribution to not need to take place right after an already
>> partitioned Kafka topic, you’d need identical numbers of 1) Kafka
>> partitions, 2) Flink source instances consuming the partitions, and 3) the
>> parallelism of the keyed computation afterwards. This seems like a very
>> specific situation, considering that you’ll be able to rescale Flink
>> operators as the data’s key space / volume grows.
>>
>> The main observation, I think, is that Flink itself maintains how the key
>> space is partitioned within the system, which plays a crucial part in
>> rescaling. That’s why by default it doesn’t respect existing partitioning
>> of the key space in Kafka (or other external sources). Even if it initially
>> does at the beginning of a job, partitioning will most likely change as you
>> rescale your job / operators (which is a good thing, to be able to adapt).
>>
>> Cheers,
>> Gordon
>>
>> [1] http://apache-flink-mailing-list-archive.1008284.n3.
>> nabble.com/kafka-partition-assignment-td12123.html
>>
>> On January 6, 2017 at 1:38:05 AM, Niels Basjes (ni...@basjes.nl) wrote:
>>
>> Hi,
>>
>> In my scenario I have click stream data that I persist in Kafka.
>> I use the sessionId as the key to instruct Kafka to put everything with
>> the same sessionId into the same Kafka partition. That way I already have
>> all events of a visitor in a single kafka 

Re: Strategies for Complex Event Processing with guaranteed data consistency

2017-01-16 Thread Kathleen Sharp
Hi Fabian,

A case consists of all events sharing the same case id. This id is
what we initially key the stream by.

The order of these events is the trace.

For example,
caseid: case1, consisting of event1, event2, event3. Start time 11:00,
end 11:05, run time 5 minutes
caseid: case12, consisting of event1, event2, event3 Start time 11:00,
end 11:15, run time 15 minutes

These are 2 distinct cases, with the same trace (event1, event2,
event3). This trace would have 2 occurrences with a min run time of 5
minutes, max 15 and average 10.

I have implemented your 2nd suggestion for the first job. I hope I
have made the traces clearer, as I am still unsure of the best approach
here.

Thanks a lot,
Kat
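
For illustration, a minimal sketch of Fabian's second suggestion (a keyed
function with key-partitioned state that updates the case on every event and
emits the latest picture of it). Event and Case below are simplified stand-ins
for the real types:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

class Event {
    public String caseId;
    public String name;
    public long timestamp;
}

class Case {
    public String caseId;
    public List<Event> events = new ArrayList<>();

    public Case() {}
    public Case(String caseId) { this.caseId = caseId; }
}

public class CaseAssembler extends RichFlatMapFunction<Event, Case> {

    private transient ValueState<Case> caseState;

    @Override
    public void open(Configuration parameters) {
        // Keyed value state: one Case per case id, default null until the first event.
        caseState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("case", Case.class, null));
    }

    @Override
    public void flatMap(Event event, Collector<Case> out) throws Exception {
        Case current = caseState.value();
        if (current == null) {
            current = new Case(event.caseId);
        }
        current.events.add(event); // append; the case holds all events seen so far
        caseState.update(current);
        out.collect(current);      // emit the updated case, e.g. towards the sink
    }
}

Usage would be along the lines of events.keyBy("caseId").flatMap(new
CaseAssembler()), so every new event for a case id produces an updated case
record downstream.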

On Fri, Jan 13, 2017 at 10:45 PM, Fabian Hueske  wrote:
> One thing to add: the Flink KafkaProducer provides only at-least-once if
> flush-on-checkpoint is enabled [1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.1/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.html#setFlushOnCheckpoint-boolean-
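
A small sketch of that setting (topic name, broker address, and serialization
schema are assumptions; see [1] above for the method):

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class AtLeastOnceKafkaSink {

    // Attaches a Kafka sink that flushes pending records on checkpoints, which is
    // what gives the at-least-once guarantee mentioned above.
    static void addCaseSink(DataStream<String> cases) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumption

        FlinkKafkaProducer09<String> producer = new FlinkKafkaProducer09<>(
                "cases",                  // assumed topic name
                new SimpleStringSchema(), // assumed serialization of the case records
                props);
        producer.setFlushOnCheckpoint(true);

        cases.addSink(producer);
    }
}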
>
> 2017-01-13 22:02 GMT+01:00 Fabian Hueske :
>>
>> Hi Kat,
>>
>> I did not understand the difference between a case and a trace.
>> If I got it right, the goal of your first job is to assemble the
>> individual events into cases. Is a case here the last event for a case-id or
>> all events of a case-id?
>> If a case is the collection of all events (which I assume) what is the
>> difference to a trace which is also the list of events (if I got it right)?
>>
>> In any case, I think your first job can also be solved without a session
>> window (which is quite complex internally).
>> There are two options:
>> 1) use a global window [1] with a custom trigger that fires for each
>> arriving record. A global window never ends, which would be OK since
>> your cases do not end either.
>> 2) use a MapFunction with key-partitioned operator state [2]. The map
>> function would simply update the state for every new event and emit a new
>> result.
>>
>> Regarding your concern about losing data when writing to Kafka: Flink's
>> KafkaProducer provides at-least-once guarantees, which means that data might
>> be written more than once in case of a failure but won't be lost. If the
>> Kafka topic is partitioned by case-id and you only need the last record per
>> case-id, Kafka's log compaction should give you upsert semantics.
>>
>> Regarding your question "Is using state in this way a somewhat standard
>> practice, or is state intended more for recovery?":
>> Many streaming applications require state for their semantics (just like
>> yours), i.e., they need to buffer data and wait for more data to arrive. In
>> order to guarantee consistent result semantics of an application, the state
>> must not be lost and must be recovered in case of a failure. So state is not
>> intended for recovery, but recovery is needed to guarantee application
>> semantics.
>>
>> As I said before, I did not get the difference between cases and trace, so
>> I cannot really comment on the job to analyze traces.
>>
>> Hope this helps,
>> Fabian
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/windows.html#global-windows
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html#using-the-keyvalue-state-interface
>>
>> 2017-01-13 11:04 GMT+01:00 Kathleen Sharp :
>>>
>>> I have been playing around with Flink for a few weeks to try to
>>> ascertain whether or not it meets our use cases, and also what best
>>> practices we should be following. I have a few questions I would
>>> appreciate answers to.
>>>
>>>
>>> Our scenario is that we want to process a lot of event data into
>>> cases. A case is an inorder sequence of events; this event data could
>>> be quite old. We never know when a case is complete, so we just want
>>> to have the most up to date picture of what a case looks like.
>>>
>>>
>>> The inorder sequence of events of a case is called the trace. Many
>>> cases could have an identical trace. We would like to construct these
>>> traces, and do some aggregations on those (case count, average/min/max
>>> life-cycle time).
>>>
>>>
>>> We then have further downstream processing we will do on a case, some
>>> of which would require additional inputs, either from side-inputs of
>>> somehow joining data sources.
>>>
>>>
>>> We don’t really care about event time at the moment, because we just
>>> want to build cases and traces with all the data we have received.
>>>
>>>
>>> The end results should be available for our web front end via rest api.
>>>
>>>
>>> Based on the above I have the following idea for a first implementation:
>>>
>>>
>>> Kafka source -> key by case id -> session window with rocks db state
>>> backend holding case for that key -> postgres sink
>>>
>>>
>>> The reason for a session window is that, as I mentioned above, we just
>>> want to build a group