Re: set flink yarn jvm options

2016-08-04 Thread Prabhu V
Thanks much Jamie,

This works just as you mentioned. Sorry, my bad; I was mistaken about this
parameter.
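
For reference, a minimal flink-conf.yaml sketch of the setting discussed in
this thread (the JVM flags below are placeholders, not the ones actually used
here):

# Passed along by the YARN client to the launched JobManager and TaskManager containers
env.java.opts: -XX:+UseG1GC -Dmy.custom.property=value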



On Thu, Aug 4, 2016 at 4:39 PM, Jamie Grier  wrote:

> The YARN client should pass these JVM options along to each of the
> launched containers.  Have you tried this?
>
> On Thu, Aug 4, 2016 at 4:07 PM, Prabhu V  wrote:
>
>>
>> This property, I believe, affects only the YARN client. I want to set JVM
>> opts on the application master and task manager containers.
>>
>> Thanks,
>> Prabhu
>>
>> On Thu, Aug 4, 2016 at 3:07 PM, Jamie Grier 
>> wrote:
>>
>>> Use *env.java.opts*
>>>
>>> This will be respected by the YARN client.
>>>
>>>
>>>
>>> On Thu, Aug 4, 2016 at 11:10 AM, Prabhu V  wrote:
>>>
 The docs mention that

 env.java.opts.jobmanager
 env.java.opts.taskmanager

 parameters are available but are ignored by the YARN client. Is there a
 way to set the JVM opts for YARN?

 Thanks,
 Prabhu

 On Wed, Aug 3, 2016 at 7:03 PM, Prabhu V  wrote:

> Hi,
>
> Is there a way to set JVM options on the YARN application master and
> task manager with Flink?
>
> Thanks,
> Prabhu
>


>>>
>>>
>>> --
>>>
>>> Jamie Grier
>>> data Artisans, Director of Applications Engineering
>>> @jamiegrier 
>>> ja...@data-artisans.com
>>>
>>>
>>
>
>
> --
>
> Jamie Grier
> data Artisans, Director of Applications Engineering
> @jamiegrier 
> ja...@data-artisans.com
>
>


Re: Having a single copy of an object read in a RichMapFunction

2016-08-04 Thread Sameer W
Theodore,

Broadcast variables do that when using the DataSet API -
http://data-artisans.com/how-to-factorize-a-700-gb-matrix-with-apache-flink/

See the following lines in the article:
"To support the above presented algorithm efficiently we had to improve
Flink’s broadcasting mechanism since it easily becomes the bottleneck of
the implementation. The enhanced Flink version can share broadcast
variables among multiple tasks running on the same machine. Sharing avoids
having to keep for each task an individual copy of the broadcasted variable
on the heap. This increases the memory efficiency significantly, especially
if the broadcasted variables can grow up to several GBs of size."

If you are using the DataStream API, then side inputs (not yet implemented)
would achieve the same as broadcast variables (
https://docs.google.com/document/d/1hIgxi2Zchww_5fWUHLoYiXwSBXjv-M5eOv-MKQYN3m4/edit#).
I use keyed connected streams in situations where I could have used side
inputs, for one of my use cases (propagating rule changes to the data).
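
For reference, a minimal sketch of the DataSet broadcast-variable pattern the
article relies on (the data and names are illustrative, not from the article):

import org.apache.flink.api.scala._
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import scala.collection.JavaConverters._

object BroadcastMatrixJob {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Stand-in for reading the matrix from HDFS: one row per element.
    val matrix: DataSet[Array[Double]] =
      env.fromElements(Array(1.0, 2.0), Array(3.0, 4.0))
    val vectors: DataSet[Array[Double]] =
      env.fromElements(Array(1.0, 1.0), Array(2.0, 0.5))

    val products = vectors
      .map(new RichMapFunction[Array[Double], Array[Double]] {
        private var rows: Seq[Array[Double]] = _

        override def open(parameters: Configuration): Unit = {
          // The broadcast variable is materialized once per machine (with the
          // improvement described above) rather than once per task.
          rows = getRuntimeContext
            .getBroadcastVariable[Array[Double]]("matrix").asScala
        }

        override def map(v: Array[Double]): Array[Double] =
          rows.map(row => row.zip(v).map { case (a, b) => a * b }.sum).toArray
      })
      .withBroadcastSet(matrix, "matrix")

    products.print()
  }
}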

Sameer




On Thu, Aug 4, 2016 at 8:56 PM, Theodore Vasiloudis <
theodoros.vasilou...@gmail.com> wrote:

> Hello all,
>
> for a prototype we are looking into, we would like to read a big matrix
> from HDFS, and for every element that comes in on a stream of vectors do a
> multiplication with the matrix. The matrix should fit in the memory of one
> machine.
>
> We can read in the matrix using a RichMapFunction, but that would mean
> that a copy of the matrix is made for each Task Slot AFAIK, if the
> RichMapFunction is instantiated once per Task Slot.
>
> So I'm wondering how we should try to address this problem: is it possible to
> have just one copy of the object in memory per TM?
>
> As a follow-up if we have more than one TM per node, is it possible to
> share memory between them? My guess is that we have to look at some
> external store for that.
>
> Cheers,
> Theo
>


Having a single copy of an object read in a RichMapFunction

2016-08-04 Thread Theodore Vasiloudis
Hello all,

For a prototype we are looking into, we would like to read a big matrix from
HDFS, and for every element that comes in on a stream of vectors do a
multiplication with the matrix. The matrix should fit in the memory of one
machine.

We can read in the matrix using a RichMapFunction, but that would mean
that a copy of the matrix is made for each Task Slot AFAIK, if the
RichMapFunction is instantiated once per Task Slot.

So I'm wondering how we should try to address this problem: is it possible to
have just one copy of the object in memory per TM?

As a follow-up if we have more than one TM per node, is it possible to
share memory between them? My guess is that we have to look at some
external store for that.
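
A sketch of one possible approach: a lazily initialized JVM-wide singleton.
All task slots of a TaskManager run in the same JVM, so this yields one copy
per TM. The object and the local file read below are stand-ins (the real job
would load from HDFS):

import org.apache.flink.api.common.functions.RichMapFunction
import scala.io.Source

// Hypothetical holder: 'lazy val' means the matrix is loaded at most once per
// JVM (i.e. once per TaskManager) and shared by every task slot in it.
object MatrixHolder {
  lazy val matrix: Array[Array[Double]] =
    Source.fromFile("/tmp/matrix.csv").getLines()   // stand-in for the HDFS read
      .map(_.split(",").map(_.toDouble))
      .toArray
}

class MultiplyWithMatrix extends RichMapFunction[Array[Double], Array[Double]] {
  override def map(vector: Array[Double]): Array[Double] =
    // Matrix-vector product against the shared, per-JVM matrix.
    MatrixHolder.matrix.map(row => row.zip(vector).map { case (a, b) => a * b }.sum)
}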

Cheers,
Theo


Re: set flink yarn jvm options

2016-08-04 Thread Jamie Grier
The YARN client should pass these JVM options along to each of the launched
containers.  Have you tried this?

On Thu, Aug 4, 2016 at 4:07 PM, Prabhu V  wrote:

>
> This property, I believe, affects only the YARN client. I want to set JVM
> opts on the application master and task manager containers.
>
> Thanks,
> Prabhu
>
> On Thu, Aug 4, 2016 at 3:07 PM, Jamie Grier 
> wrote:
>
>> Use *env.java.opts*
>>
>> This will be respected by the YARN client.
>>
>>
>>
>> On Thu, Aug 4, 2016 at 11:10 AM, Prabhu V  wrote:
>>
>>> The docs mention that
>>>
>>> env.java.opts.jobmanager
>>> env.java.opts.taskmanager
>>>
>>> parameters are available but are ignored by the YARN client. Is there a
>>> way to set the JVM opts for YARN?
>>>
>>> Thanks,
>>> Prabhu
>>>
>>> On Wed, Aug 3, 2016 at 7:03 PM, Prabhu V  wrote:
>>>
 Hi,

 Is there a way to set JVM options on the YARN application master and
 task manager with Flink?

 Thanks,
 Prabhu

>>>
>>>
>>
>>
>> --
>>
>> Jamie Grier
>> data Artisans, Director of Applications Engineering
>> @jamiegrier 
>> ja...@data-artisans.com
>>
>>
>


-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier 
ja...@data-artisans.com


Re: set flink yarn jvm options

2016-08-04 Thread Prabhu V
This property, I believe, affects only the YARN client. I want to set JVM
opts on the application master and task manager containers.

Thanks,
Prabhu

On Thu, Aug 4, 2016 at 3:07 PM, Jamie Grier  wrote:

> Use *env.java.opts*
>
> This will be respected by the YARN client.
>
>
>
> On Thu, Aug 4, 2016 at 11:10 AM, Prabhu V  wrote:
>
>> The docs mention that
>>
>> env.java.opts.jobmanager
>> env.java.opts.taskmanager
>>
>> parameters are available but are ignored by the YARN client. Is there a
>> way to set the JVM opts for YARN?
>>
>> Thanks,
>> Prabhu
>>
>> On Wed, Aug 3, 2016 at 7:03 PM, Prabhu V  wrote:
>>
>>> Hi,
>>>
>>> Is there a way to set JVM options on the YARN application master and
>>> task manager with Flink?
>>>
>>> Thanks,
>>> Prabhu
>>>
>>
>>
>
>
> --
>
> Jamie Grier
> data Artisans, Director of Applications Engineering
> @jamiegrier 
> ja...@data-artisans.com
>
>


Re: set flink yarn jvm options

2016-08-04 Thread Jamie Grier
Use *env.java.opts*

This will be respected by the YARN client.



On Thu, Aug 4, 2016 at 11:10 AM, Prabhu V  wrote:

> The docs mention that
>
> env.java.opts.jobmanager
> env.java.opts.taskmanager
>
> parameters are available but are ignored by the YARN client. Is there a
> way to set the JVM opts for YARN?
>
> Thanks,
> Prabhu
>
> On Wed, Aug 3, 2016 at 7:03 PM, Prabhu V  wrote:
>
>> Hi,
>>
>> Is there a way to set JVM options on the YARN application master and
>> task manager with Flink?
>>
>> Thanks,
>> Prabhu
>>
>
>


-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier 
ja...@data-artisans.com


Re: set flink yarn jvm options

2016-08-04 Thread Prabhu V
The docs mention that

env.java.opts.jobmanager
env.java.opts.taskmanager

parameters are available but are ignored by the YARN client. Is there a way
to set the JVM opts for YARN?

Thanks,
Prabhu

On Wed, Aug 3, 2016 at 7:03 PM, Prabhu V  wrote:

> Hi,
>
> Is there a way to set JVM options on the YARN application master and
> task manager with Flink?
>
> Thanks,
> Prabhu
>


RE: What is the recommended way to read AVRO data from Kafka using flink.

2016-08-04 Thread Alam, Zeeshan
Hi Stephan,

My AvroDeserializationSchema worked fine with a different Kafka topic; it seems
the previous Kafka topic had heterogeneous data, with both Avro- and
JSON-formatted records. Thanks for your time ☺.

Thanks & Regards
Zeeshan Alam

From: Stephan Ewen [mailto:se...@apache.org]
Sent: Thursday, August 04, 2016 6:00 PM
To: user@flink.apache.org
Subject: Re: What is the recommended way to read AVRO data from Kafka using 
flink.

Hi!

To read data from Kafka, you need a DeserializationSchema. You could create one 
that wraps the AvroInputFormat, but an AvroDeserializationSchema would simply 
be an adjustment of the AvroInputFormat to the interface of the 
DeserializationSchema.

In your Avro DeserializationSchema, you can probably create the Avro readers 
internally with an Avro schema (I believe).

Greetings,
Stephan


On Tue, Aug 2, 2016 at 4:53 PM, Alam, Zeeshan 
> wrote:
Hi Stephan,

I went through one of the old mail thread 
http://mail-archives.apache.org/mod_mbox/flink-user/201510.mbox/%3CCANC1h_vq-TVjTNhXyYLoVso7GRGkdGWioM5Ppg%3DGoQPjvigqYg%40mail.gmail.com%3E


Here it is mentioned that "When reading from Kafka you are expected to define a
DeserializationSchema. There is no out-of-the-box (de)serializer for Flink with
Kafka, but it should not be very hard to add."

I have some questions:

1. As per FLINK-3691 you are adding GenericDatumReader, so I suppose I need to
use it instead of DatumReader in my DeserializationSchema, which is required to
read data from Kafka?

2. What is the recommended way to read AVRO binary data from Kafka if I have
the AVRO schema file [*.avsc] with me? Is there a better, more efficient
approach?

3. Can AvroInputFormat be used to read Kafka data, or is a DeserializationSchema
a must for reading data from Kafka? Also, AvroInputFormat doesn't have any
Javadoc.





Thanks & Regards,
Zeeshan Alam




From: Stephan Ewen [mailto:se...@apache.org]
Sent: Tuesday, August 02, 2016 7:52 PM
To: user@flink.apache.org
Subject: Re: What is the recommended way to read AVRO data from Kafka using 
flink.

Hi!

I think this is a known limitation for Flink 1.0 and it is fixed in Flink 1.1

Here is the JIRA ticket: https://issues.apache.org/jira/browse/FLINK-3691

Here is the mail thread:
http://mail-archives.apache.org/mod_mbox/flink-user/201604.mbox/%3CCAOFSxKtJXfxRKm2=bplu+xvpwqrwd3c8ynuk3iwk9aqvgrc...@mail.gmail.com%3E

You could try and use the latest release candidate to get the fix:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-Apache-Flink-1-1-0-RC1-td12723.html

The release is also happening, so should be out in a stable release soon.

Greetings,
Stephan


On Tue, Aug 2, 2016 at 4:04 PM, Alam, Zeeshan 
> wrote:
Hi,

I am using Flink 1.0.3 and FlinkKafkaConsumer08 to read AVRO data from Kafka
with Flink. I have the AVRO schema file with me which was used to write the
data to Kafka. Here
https://ci.apache.org/projects/flink/flink-docs-release-0.8/example_connectors.html
you have mentioned that using the GenericData.Record type is possible with
Flink, but not recommended. Since the record contains the full schema, it's very
data intensive and thus probably slow to use. So what is the recommended way to
read AVRO data from Kafka using Flink?

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "dojo3x:9092,dojox:9092,dojox:9092");
    properties.setProperty("zookeeper.connect", "dojo3x:2181,dojox:2181,dojox:2181");
    properties.setProperty("group.id", "Zeeshantest");

    AvroDeserializationSchema<GenericData.Record> avroSchema =
            new AvroDeserializationSchema<>(GenericData.Record.class);
    FlinkKafkaConsumer08<GenericData.Record> kafkaConsumer =
            new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);
    DataStream<GenericData.Record> messageStream = env.addSource(kafkaConsumer);

    messageStream.rebalance().print();
    env.execute("Flink AVRO KAFKA Test");
}

This is the AvroDeserializationSchema that I am using.

public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {

    private static final long serialVersionUID = 4330538776656642778L;

    private final Class<T> avroType;
    private transient DatumReader<T> reader;
    private transient BinaryDecoder decoder;

    public AvroDeserializationSchema(Class<T> avroType) {
        this.avroType = avroType;
    }

    @Override
    public T deserialize(byte[] message) {
        ensureInitialized();
        try {
            decoder =

Re: Generate timestamps in front of event for event time windows

2016-08-04 Thread Jason Brelloch
Thanks Aljoscha,

Looking forward to the 1.1 release. I managed to solve my problem using
this example code:

https://bitbucket.org/snippets/vstoyak/o9Rqp
(courtesy of Vladimir Stoyak)

I had to create a custom window and window assigner.  Hopefully that will
help someone else.
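
For reference, the session-window approach suggested below would look roughly
like this on the 1.1 API (a sketch reusing env, inputEvents, QualifiedEvent and
GrouperFunction from the code quoted further down; note that a session window
closes after a gap of inactivity rather than a fixed span after the first
event):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Events more than 5 minutes apart (in event time) end up in separate windows.
val stream = env.fromCollection(inputEvents)
  .assignAscendingTimestamps(e => e.event.created.toEpochMilli)
  .keyBy(e => e.alertConfiguration.alertId.toString)
  .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
  .apply(new GrouperFunction)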

On Wed, Aug 3, 2016 at 8:35 PM, Aljoscha Krettek 
wrote:

> Hi,
> a watermark cannot be sent before the element that makes you send that
> watermark. A watermark of time T tells the system that no element will
> arrive in the future with timestamp T or less, thus you cannot send it
> before. It seems that what you are trying to achieve can be solved by using
> session windows, which will be part of the upcoming 1.1 release:
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html#session-windows
>
> Cheers,
> Aljoscha
>
> On Wed, 3 Aug 2016 at 12:19 Jason Brelloch  wrote:
>
>> A little more info.  Here is a simplified version of my
>> trigger: (windowConfiguration.timespan is the duration of the window)
>>
>> class CustomTrigger extends Trigger[QualifiedEvent, Window] {
>>
>>   val stateTimeDescr = new
>> ValueStateDescriptor[Long]("relevantTimestamp", classOf[Long], 0)
>>
>>   override def onElement(event: QualifiedEvent, timestamp: Long, W:
>> Window, ctx: TriggerContext): TriggerResult = {
>>
>> val relevantTimestamp = ctx.getPartitionedState(stateTimeDescr)
>> val windowConfigurationState =
>> ctx.getPartitionedState(windowConfigDescr)
>> var windowConfiguration = windowConfigurationState.value()
>> if(windowConfiguration == null) {
>>   windowConfigurationState.update(event.alertConfiguration.window.get)
>>   windowConfiguration = event.alertConfiguration.window.get
>> }
>>
>> if(relevantTimestamp.value() == 0) {
>>   ctx.registerEventTimeTimer(event.event.created.toEpochMilli +
>> windowConfiguration.timespan.toMillis)
>>   relevantTimestamp.update (event.event.created.toEpochMilli +
>> windowConfiguration.timespan.toMillis)
>> }
>>
>> TriggerResult.CONTINUE
>>   }
>>
>>   override def onEventTime(timestamp: Long, W: Window, ctx:
>> TriggerContext): TriggerResult = {
>> TriggerResult.FIRE_AND_PURGE
>>   }
>>
>>   override def onProcessingTime(timestamp: Long, W: Window, ctx:
>> TriggerContext): TriggerResult = {
>> TriggerResult.CONTINUE
>>   }
>> }
>>
>> And here is the actual window execution:
>>
>> val stream = env.fromCollection(inputEvents)
>> .assignAscendingTimestamps((e: QualifiedEvent) => {
>> e.event.created.toEpochMilli })
>> .keyBy((e: QualifiedEvent) => {
>> e.alertConfiguration.alertId.toString })
>> .window(GlobalWindows.create)
>> .trigger(ConfigurableTrigger.create)
>> .apply(new GrouperFunction).name("Grouper Function")
>>
>> Oddly enough when I do this with just a basic window function it works
>> and I only get the two events I am supposed to:
>>
>> val stream = env.fromCollection(inputEvents)
>> .assignAscendingTimestamps((e: QualifiedEvent) => {
>> e.event.created.toEpochMilli })
>> .keyBy((e: QualifiedEvent) => {
>> e.alertConfiguration.alertId.toString })
>> .timeWindow(Time.minutes(5))
>> .apply(new GrouperFunction).name("Grouper Function")
>>
>>
>> On Wed, Aug 3, 2016 at 2:29 PM, Jason Brelloch 
>> wrote:
>>
>>> Hey guys,
>>>
>>> I am trying to use event time along with a custom window to capture a
>>> subset of events.  The problem I am running into is that it seems that the
>>> event that generates the timestamp/watermark arrives in the window before
>>> the onEventTime() call that closes the window is made.  Example:
>>>
>>> Window is supposed to capture 5 minutes of events after first event
>>> arrives
>>> Event 1: timestamp 12:01 - registers event timer for 12:06
>>> Event 2: timestamp 12:03
>>> Event 3: timestamp 12:20 - fires and purges window
>>>
>>> I get all three events in the window, instead of just the two that are
>>> really within the 5 minute window.
>>>
>>> Is there some way to force the timestamp to arrive in the window before
>>> the event that generated it?
>>>
>>> Thanks!
>>>
>>> --
>>> *Jason Brelloch* | Product Developer
>>> 3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305
>>> 
>>> Subscribe to the BetterCloud Monitor
>>> 
>>>  -
>>> Get IT delivered to your inbox
>>>
>>
>>
>>
>> --
>> *Jason Brelloch* | Product Developer
>> 3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305
>> 
>> Subscribe to the BetterCloud Monitor
>> 
>>  -
>> Get IT delivered to your inbox
>>
>


-- 
*Jason Brelloch* | Product Developer
3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305

Re: What is the recommended way to read AVRO data from Kafka using flink.

2016-08-04 Thread Stephan Ewen
Hi!

To read data from Kafka, you need a DeserializationSchema. You could create
one that wraps the AvroInputFormat, but an AvroDeserializationSchema would
simply be an adjustment of the AvroInputFormat to the interface of the
DeserializationSchema.

In your Avro DeserializationSchema, you can probably create the Avro
readers internally with an Avro schema (I believe).
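
A rough sketch of such a schema, assuming Avro specific records generated from
the .avsc file (so the schema does not have to travel with every record); the
class name is made up, and the Flink and Avro APIs are the 1.0/1.1-era ones
used elsewhere in this thread:

import org.apache.avro.io.DecoderFactory
import org.apache.avro.specific.{SpecificDatumReader, SpecificRecordBase}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.streaming.util.serialization.DeserializationSchema

class SpecificAvroDeserializationSchema[T <: SpecificRecordBase](avroType: Class[T])
    extends DeserializationSchema[T] {

  // The Avro reader is not serializable, so create it lazily on the worker.
  @transient private lazy val reader = new SpecificDatumReader[T](avroType)

  override def deserialize(message: Array[Byte]): T =
    reader.read(null.asInstanceOf[T], DecoderFactory.get().binaryDecoder(message, null))

  override def isEndOfStream(nextElement: T): Boolean = false

  override def getProducedType: TypeInformation[T] = TypeExtractor.getForClass(avroType)
}

Usage would then be along the lines of new FlinkKafkaConsumer08[MyRecord]("topic",
new SpecificAvroDeserializationSchema(classOf[MyRecord]), props), where MyRecord
is the class generated from the .avsc file (hypothetical name).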

Greetings,
Stephan


On Tue, Aug 2, 2016 at 4:53 PM, Alam, Zeeshan  wrote:

> Hi Stephan,
>
>
>
> I went through one of the old mail thread
> http://mail-archives.apache.org/mod_mbox/flink-user/201510.mbox/%3CCANC1h_vq-TVjTNhXyYLoVso7GRGkdGWioM5Ppg%3DGoQPjvigqYg%40mail.gmail.com%3E
>
>
>
> Here it is mentioned that "When reading from Kafka you are expected to
> define a DeserializationSchema. There is no out-of-the-box (de)serializer for
> Flink with Kafka, but it should not be very hard to add."
>
> I have some questions:
>
> 1. As per FLINK-3691 you are adding GenericDatumReader, so I suppose I need
> to use it instead of DatumReader in my DeserializationSchema, which is
> required to read data from Kafka?
>
> 2. What is the recommended way to read AVRO binary data from Kafka if I have
> the AVRO schema file [*.avsc] with me? Is there a better, more efficient
> approach?
>
> 3. Can AvroInputFormat be used to read Kafka data, or is a
> DeserializationSchema a must for reading data from Kafka? Also,
> AvroInputFormat doesn't have any Javadoc.
>
>
>
>
>
>
>
> Thanks & Regards,
>
> Zeeshan Alam
>
>
>
>
>
>
>
> *From:* Stephan Ewen [mailto:se...@apache.org]
> *Sent:* Tuesday, August 02, 2016 7:52 PM
> *To:* user@flink.apache.org
> *Subject:* Re: What is the recommended way to read AVRO data from Kafka
> using flink.
>
>
>
> Hi!
>
>
>
> I think this is a known limitation for Flink 1.0 and it is fixed in Flink
> 1.1
>
>
>
> Here is the JIRA ticket: https://issues.apache.org/jira/browse/FLINK-3691
>
>
>
> Here is the mail thread:
>
>
> http://mail-archives.apache.org/mod_mbox/flink-user/201604.mbox/%3CCAOFSxKtJXfxRKm2=bplu+xvpwqrwd3c8ynuk3iwk9aqvgrc...@mail.gmail.com%3E
>
>
>
> You could try and use the latest release candidate to get the fix:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-Apache-Flink-1-1-0-RC1-td12723.html
>
>
>
> The release is also happening, so should be out in a stable release soon.
>
>
>
> Greetings,
>
> Stephan
>
>
>
>
>
> On Tue, Aug 2, 2016 at 4:04 PM, Alam, Zeeshan 
> wrote:
>
> Hi,
>
>
>
> I am using Flink 1.0.3 and FlinkKafkaConsumer08 to read AVRO data from Kafka
> with Flink. I have the AVRO schema file with me which was used to write the
> data to Kafka. Here
> https://ci.apache.org/projects/flink/flink-docs-release-0.8/example_connectors.html
> you have mentioned that using the GenericData.Record type is possible with
> Flink, but not recommended. Since the record contains the full schema, it's
> very data intensive and thus probably slow to use. So what is the
> recommended way to read AVRO data from Kafka using Flink?
>
>
>
> public static void main(String[] args) throws Exception {
>
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>     Properties properties = new Properties();
>     properties.setProperty("bootstrap.servers", "dojo3x:9092,dojox:9092,dojox:9092");
>     properties.setProperty("zookeeper.connect", "dojo3x:2181,dojox:2181,dojox:2181");
>     properties.setProperty("group.id", "Zeeshantest");
>
>     AvroDeserializationSchema<GenericData.Record> avroSchema =
>             new AvroDeserializationSchema<>(GenericData.Record.class);
>     FlinkKafkaConsumer08<GenericData.Record> kafkaConsumer =
>             new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);
>     DataStream<GenericData.Record> messageStream = env.addSource(kafkaConsumer);
>
>     messageStream.rebalance().print();
>     env.execute("Flink AVRO KAFKA Test");
> }
>
> This is the AvroDeserializationSchema that I am using.
>
> public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
>
>     private static final long serialVersionUID = 4330538776656642778L;
>
>     private final Class<T> avroType;
>     private transient DatumReader<T> reader;
>     private transient BinaryDecoder decoder;
>
>     public AvroDeserializationSchema(Class<T> avroType) {
>         this.avroType = avroType;
>     }
>
>     @Override
>     public T deserialize(byte[] message) {
>         ensureInitialized();
>         try {
>             decoder = DecoderFactory.get().binaryDecoder(message, decoder);
>             return reader.read(null, decoder);
>         } catch (Exception e) {
>             throw

Re: Regarding QueryableState

2016-08-04 Thread Vishnu Viswanath
Hi Ufuk,

I was able to create a QueryableState stream and query it when running in
cluster mode on my local machine, but I am unable to query the stream when
running on an AWS cluster; I get an AbstractMethodError at
kvState.getSerializedValue(serializedKeyAndNamespace) in
KvStateServerHandler.java while querying.

Not sure if I am doing something wrong here, so eagerly waiting for the
merge, to try it again :)

Thanks,
Vishnu

On Thu, Aug 4, 2016 at 7:40 AM, Ufuk Celebi  wrote:

> You can expect it to be merged by the end of this week.
>
> Note that the APIs are very low level at the moment though. In the PR
> branch you can have a look at the QueryableStateITCase for more
> details.
>
> – Ufuk
>
>
> On Thu, Aug 4, 2016 at 11:53 AM, vinay patil 
> wrote:
> > Hi All,
> >
> > For one use case I want to make use of this feature instead of querying
> > external store like Cassandra or ES.
> >
> > Can you please let me know if this is under development or present in the
> > master branch.
> >
> > It will be of great help if you can provide examples or link to the
> > documentation.
> >
> > Thanks,
> > Vinay Patil
> >
> >
> >
> > --
> > View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-QueryableState-tp8324.html
> > Sent from the Apache Flink User Mailing List archive. mailing list
> archive at Nabble.com.
>


Re: Regarding QueryableState

2016-08-04 Thread Ufuk Celebi
You can expect it to be merged by the end of this week.

Note that the APIs are very low level at the moment though. In the PR
branch you can have a look at the QueryableStateITCase for more
details.

– Ufuk


On Thu, Aug 4, 2016 at 11:53 AM, vinay patil  wrote:
> Hi All,
>
> For one use case I want to make use of this feature instead of querying
> external store like Cassandra or ES.
>
> Can you please let me know if this is under development or present in the
> master branch.
>
> It will be of great help if you can provide examples or link to the
> documentation.
>
> Thanks,
> Vinay Patil
>
>
>
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-QueryableState-tp8324.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.


Regarding QueryableState

2016-08-04 Thread vinay patil
Hi All,

For one use case I want to make use of this feature instead of querying an
external store like Cassandra or ES.

Can you please let me know if this is under development or present in the
master branch?

It will be of great help if you can provide examples or link to the
documentation.

Thanks,
Vinay Patil



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-QueryableState-tp8324.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Flink Kafka Consumer Behaviour

2016-08-04 Thread Stephan Ewen
Hi!

I have not used the Kafka Offset Checker before; maybe someone who has worked
with it can chime in.

Greetings,
Stephan


On Wed, Aug 3, 2016 at 4:59 PM, Janardhan Reddy  wrote:

> I can see that offsets are stored in ZooKeeper but are not returned when I
> query through the Kafka offset checker.
>
> Can you please tell me how to monitor kafka flink consumer lag for 0.8
> flink kafka consumer.
>
> On Wed, Aug 3, 2016 at 3:29 PM, Stephan Ewen  wrote:
>
>> Hi!
>>
>> I just checked the code. The 0.8 FlinkKafkaConsumer should always commit
>> offsets, regardless of whether checkpointing is enabled. The 0.9
>> FlinkKafkaConsumer actually does not do any periodic offset committing when
>> checkpointing is disabled.
>>
>> Greetings,
>> Stephan
>>
>> On Wed, Aug 3, 2016 at 7:36 AM, Janardhan Reddy <
>> janardhan.re...@olacabs.com> wrote:
>>
>>> Checkpointing wasn't enabled in the streaming job, but the offsets
>>> should have been committed to zookeeper.
>>>
>>> But we don't see the offsets being written to zookeeper.
>>>
>>> On Tue, Aug 2, 2016 at 7:41 PM, Till Rohrmann 
>>> wrote:
>>>
 Hi Janardhan,

 Flink should commit the current offsets to Zookeeper whenever a
 checkpoint has been completed. In case that you disabled checkpointing,
 then the offsets will be periodically committed to ZooKeeper. The default
 value is 60s.
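
For reference, a minimal sketch of turning checkpointing on so that offsets are
committed on checkpoint completion (assuming env is the
StreamExecutionEnvironment; the interval is only an example):

// Checkpoint every 60 seconds; the 0.8 Kafka consumer then commits its offsets
// to ZooKeeper each time a checkpoint completes.
env.enableCheckpointing(60000)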

 Could it be that there wasn't yet a completed checkpoint? Which version
 of Flink are you using?

 Cheers,
 Till

 On Tue, Aug 2, 2016 at 7:26 PM, Janardhan Reddy <
 janardhan.re...@olacabs.com> wrote:

> Hi,
>
> When I run the following command I am told that no topic is
> available for that consumer group.  I am using
> flink-connector-kafka-0.8_${scala.version} (2.11).
>
> ./bin/kafka-consumer-groups.sh --zookeeper <> --group <> --describe
>
> No topic available for consumer group provided
>
>
> Does the kafka consumer commit offset to the broker always ? Do we
> need to enable checkpointing for the offsets to be committed ?
>
>
> Thanks
>


>>>
>>
>


Re: Container running beyond physical memory limits when processing DataStream

2016-08-04 Thread Stephan Ewen
Hi!

The JVM is allowed 1448m of memory, and the JVM should never use more heap
than that.

The fact that the process is using more than 2GB memory in total means that
some libraries are allocating memory outside the heap.

You can activate the memory logger to diagnose that:
https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#memory-and-performance-debugging
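
A flink-conf.yaml sketch of the relevant keys (the memory-logger key names
should be double-checked against the page above; the values are only examples,
and the cutoff ratio is the knob suggested earlier in this thread):

taskmanager.debug.memory.startLogThread: true
taskmanager.debug.memory.logIntervalMs: 5000
yarn.heap-cutoff-ratio: 0.4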

Greetings,
Stephan


On Thu, Aug 4, 2016 at 3:41 AM, Jack Huang  wrote:

> Hi Max,
>
> Changing yarn.heap-cutoff-ratio seems to suffice for the time being.
> Thanks for your help.
>
> Regards,
> Jack
>
> On Tue, Aug 2, 2016 at 11:11 AM, Jack Huang  wrote:
>
>> Hi Max,
>>
>> Is there a way to limit the JVM memory usage (something like the -Xmx
>> flag) for the task manager so that it won't go over the YARN limit but will
>> just run GC until there is memory to use? Trying to allocate "enough"
>> memory for this stream task is not ideal because I could have indefinitely
>> many messages backed up in the source to be processed.
>>
>> Thanks,
>> Jack
>>
>>
>> On Tue, Aug 2, 2016 at 5:21 AM, Maximilian Michels 
>> wrote:
>>
>>> Your job creates a lot of String objects which need to be garbage
>>> collected. It could be that the JVM is not fast enough and Yarn kills
>>> the JVM for consuming too much memory.
>>>
>>> You can try two things:
>>>
>>> 1) Give the task manager more memory
>>> 2) Increase the Yarn heap cutoff ratio (e.g yarn.heap-cutoff-ratio: 0.4)
>>>
>>> If the error still occurs then we need to investigate further.
>>>
>>> Thanks,
>>> Max
>>>
>>>
>>> >
>>> >
>>> >
>>> >
>>> > On Fri, Jul 29, 2016 at 11:19 AM, Jack Huang  wrote:
>>> >>
>>> >> Hi Max,
>>> >>
>>> >> Each events are only a few hundred bytes. I am reading from a Kafka
>>> topic
>>> >> from some offset in the past, so the events should be flowing in as
>>> fast as
>>> >> Flink can process them.
>>> >>
>>> >> The entire YARN task log, which contains both JobManager and
>>> TaskManager
>>> >> outputs, is attached.
>>> >>
>>> >> Thanks a lot,
>>> >> Jack
>>> >>
>>> >>
>>> >> On Fri, Jul 29, 2016 at 2:04 AM, Maximilian Michels 
>>> >> wrote:
>>> >>>
>>> >>> Hi Jack,
>>> >>>
>>> >>> Considering the type of job you're running, you shouldn't run out of
>>> >>> memory. Could it be that the events are quite large strings? It could
>>> >>> be that the TextOutputFormat doesn't write to disk fast enough and
>>> >>> accumulates memory. Actually, it doesn't perform regular flushing
>>> >>> which could be an issue.
>>> >>>
>>> >>> I'm just guessing, we need to investigate further. Could you please
>>> >>> supply the entire JobManager log file output?
>>> >>>
>>> >>> Thanks,
>>> >>> Max
>>> >>>
>>> >>> On Fri, Jul 29, 2016 at 12:59 AM, Jack Huang 
>>> wrote:
>>> >>> > Hi all,
>>> >>> >
>>> >>> > I am running a test Flink streaming task under YARN. It reads
>>> messages
>>> >>> > from
>>> >>> > a Kafka topic and writes them to local file system.
>>> >>> >
>>> >>> > object PricerEvent {
>>> >>> > def main(args:Array[String]) {
>>> >>> > val kafkaProp = new Properties()
>>> >>> > kafkaProp.setProperty("bootstrap.servers",
>>> "localhost:6667")
>>> >>> > kafkaProp.setProperty("auto.offset.reset", "earliest")
>>> >>> >
>>> >>> > val env =
>>> StreamExecutionEnvironment.getExecutionEnvironment
>>> >>> > env.setStateBackend(new MemoryStateBackend)
>>> >>> >
>>> >>> > val wins = env.addSource(new
>>> >>> > FlinkKafkaConsumer09[String]("events",
>>> >>> > new SimpleStringSchema, kafkaProp))
>>> >>> > wins.writeAsText("/home/user/flink_out/" + new
>>> >>> > SimpleDateFormat("-MM-dd_HH-mm-ss").format(new Date))
>>> >>> >
>>> >>> > env.execute
>>> >>> > }
>>> >>> > }
>>> >>> >
>>> >>> > With the command
>>> >>> >
>>> >>> > flink run -m yarn-cluster -yn 1 -ytm 2048 -c PricerEvent
>>> >>> > /home/user/flink-example/build/libs/flink-example-1.0-all.jar
>>> >>> >
>>> >>> >
>>> >>> > The task runs fine for a moment and then terminates. I looked into
>>> the
>>> >>> > error
>>> >>> > log and found following out-of-memory error message:
>>> >>> >
>>> >>> > 2016-07-28 22:34:40,397 INFO  org.apache.flink.yarn.YarnJobManager
>>> >>> > - Container container_e05_1467433388200_0136_01_02 is completed
>>> >>> > with
>>> >>> > diagnostics: Container
>>> >>> > [pid=5832,containerID=container_e05_1467433388200_0136_01_02]
>>> is
>>> >>> > running
>>> >>> > beyond physical memory limits. Current usage: 2.3 GB of 2 GB
>>> physical
>>> >>> > memory
>>> >>> > used; 6.1 GB of 4.2 GB virtual memory used. Killing container.
>>> >>> > Dump of the process-tree for
>>> container_e05_1467433388200_0136_01_02
>>> >>> > :
>>> >>> > |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
>>> >>> > SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES)
>>> FULL_CMD_LINE
>>> >>> > |- 5838 5832 5832 5832 (java) 

Re: Parsing source JSON String as Scala Case Class

2016-08-04 Thread Stephan Ewen
If the class has non-serializable members, you need to initialize them
"lazily" when the objects are already in the distributed execution (after
serializing / distributing them).

Making a Scala 'val' a 'lazy val' often does the trick (at minimal
performance cost).
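
A minimal sketch of what that looks like for the JsonSerde mentioned below,
assuming Jackson with the jackson-module-scala dependency (everything except
the Flink DeserializationSchema interface is illustrative):

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.streaming.util.serialization.DeserializationSchema

class JsonSerde[T](clazz: Class[T]) extends DeserializationSchema[T] {

  // ObjectMapper is not serializable; 'lazy val' defers its creation until the
  // schema has been shipped to the workers, avoiding the closure cleaner error.
  @transient private lazy val mapper =
    new ObjectMapper().registerModule(DefaultScalaModule)

  override def deserialize(message: Array[Byte]): T = mapper.readValue(message, clazz)

  override def isEndOfStream(nextElement: T): Boolean = false

  override def getProducedType: TypeInformation[T] = TypeExtractor.getForClass(clazz)
}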

On Thu, Aug 4, 2016 at 3:56 AM, Jack Huang  wrote:

> Hi all,
>
> I want to read a source of JSON strings as Scala case classes. I don't want
> to have to write a serde for every case class I have. The idea is:
>
> val events = env.addSource(new FlinkKafkaConsumer09[Event]("events", new 
> JsonSerde(classOf[Event]), kafkaProp))
>
> ​
>
> I was implementing my own JsonSerde with Jackson/Gson, but in both case I
> get the error
>
> Task not serializable
> 
> org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:172)
> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164)
> 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:568)
> 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:498)
> com.fractionalmedia.stream.PricerEvent$.main(PricerEvent.scala:100)
>
> ​
> It seems that both Jackson and Gson have classes that are not serializable.
>
> I couldn't find any other solution to perform this JSON-to-Case-Class
> parsing, yet it seems a very basic need. What am I missing?
>
>
> Thanks,
> Jack
>
>
>
>


Re: Parallel execution on AllWindows

2016-08-04 Thread Andrew Ge Wu
Thanks for the quick response, everything is clear!

cheers!

Andrew
> On 03 Aug 2016, at 18:11, Aljoscha Krettek  wrote:
> 
> Hi,
> "rebalance" simply specifies the strategy to use when sending elements 
> downstream to the next operator(s). There is no interaction or competition 
> between the parallel window operator instances. Each will do windowing 
> locally based on the elements that it receives from upstream.
> 
> Cheers,
> Aljoscha
> 
> On Wed, 3 Aug 2016 at 08:26  > wrote:
> Hi Aljoscha
> 
> Thanks for the explanation.
> One other thing: when you say there is no coordination, does that mean
> rebalance() will not be honored, and each window operator instance will
> compete for the next available window?
> 
> Thanks
> 
> Andrew
> From mobile
> 
> From: Aljoscha Krettek
> Sent: Wednesday, August 3, 17:11
> Subject: Re: Parallel execution on AllWindows
> To: user@flink.apache.org 
> Hi,
> 
> if you manually force a parallelism different from 1 after a *windowAll() 
> then you will get parallel execution of your window. For example, if you do 
> this:
> 
> input.countWindowAll(100).setParallelism(5)
> 
> then you will get five parallel instances of the window operator that each 
> wait for 100 elements before they fire the window. There is no global 
> coordination between the parallel instances that would allow it to fire once 
> 100 elements are received across the parallel instances.
> 
> Cheers,
> 
> Aljoscha
> 
> On Wed, 3 Aug 2016 at 05:10 Andrew Ge Wu  > wrote:
> 
>> Hi,
>> 
>> I have a task where I want to use a count window on a stream and execute the
>> windows batch by batch.
>> 
>> Executing a count window may take some time, so I want it to be executed in
>> parallel.
>> 
>> I read this part in the documentation when I found that it automatically
>> reduced the parallelism to 1:
>> 
>> * Note: This operation can be inherently non-parallel since all elements 
>> have to pass through
>> * the same operator instance. (Only for special cases, such as aligned time 
>> windows is
>> * it possible to perform this operation in parallel).
>> 
>> (It looks like the java doc is copied from timeWindowAll)
>> 
>> If I force all window function to run in parallel, what will happen?
>> 
>> Will a time/count window be broadcast to all instances of the function, or
>> will it be sent to one of the instances so I can parallelize my work?
>> 
>> 
>> Thanks!
>> 
>> 
>> 
>> Andrew
>> 
> 
> 


-- 
Confidentiality Notice: This e-mail transmission may contain confidential 
or legally privileged information that is intended only for the individual 
or entity named in the e-mail address. If you are not the intended 
recipient, you are hereby notified that any disclosure, copying, 
distribution, or reliance upon the contents of this e-mail is strictly 
prohibited and may be unlawful. If you have received this e-mail in error, 
please notify the sender immediately by return e-mail and delete all copies 
of this message.


Re: How to avoid path conflict in zookeeper/HDFS

2016-08-04 Thread Ufuk Celebi
Hey Hironori,

the storage directories (recovery.zookeeper.storageDir,
state.backend.fs.checkpointdir) can stay the same I think (either
random or jobID-specific sub folders should be created there). The
ZooKeeper root path (recovery.zookeeper.path.root) needs to be unique
per cluster for HA.

If you upgrade to the soon-to-be-released 1.1 (the vote just passed, binaries
are being uploaded), this will be set automatically for YARN. You can
also specify it via the new CLI parameter -z (this sets
recovery.zookeeper.path.root).
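
For example, a per-cluster flink-conf.yaml sketch (the paths are placeholders):

# Must be unique per Flink cluster sharing the same ZooKeeper ensemble
recovery.zookeeper.path.root: /flink/cluster-a
# These can stay the same across clusters
recovery.zookeeper.storageDir: hdfs:///flink/recovery
state.backend.fs.checkpointdir: hdfs:///flink/checkpoints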

Hope this helps.

Ufuk

On Thu, Aug 4, 2016 at 2:53 AM, Hironori Ogibayashi
 wrote:
> Hello,
>
> I have a question about Zookeeper or HDFS paths in case of running
> Flink on YARN.
>
> In my understanding, when I run multiple Flink clusters using the same
> ZooKeeper/HDFS, I have to specify different paths, e.g.
> state.backend.fs.checkpointdir,
> recovery.zookeeper.path.root etc.
>
> If I run multiple Flink cluster jobs on YARN and want to use checkpointing or
> JobManager HA, do I need to specify different paths for each cluster/job, or
> does YARN handle this nicely?
>
> Regards,
> Hironori Ogibayashi