Re: convert Json to Tuple

2016-04-25 Thread Alexander Smirnov
Thanks Timur!

I should have mentioned, I need it for Java
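
For what it's worth, a rough, untested Java sketch with Jackson (assuming
jackson-databind is on the classpath; the field names "id" and "value" and the
class name are made up for illustration) would look something like this:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Parses messages like {"id": "abc", "value": 42} into Tuple2<String, Integer>.
public class JsonToTupleMapper extends RichMapFunction<String, Tuple2<String, Integer>> {

    private transient ObjectMapper mapper;

    @Override
    public void open(Configuration parameters) {
        // ObjectMapper is not serializable, so create one per parallel subtask in open()
        mapper = new ObjectMapper();
    }

    @Override
    public Tuple2<String, Integer> map(String json) throws Exception {
        JsonNode node = mapper.readTree(json);
        return new Tuple2<>(node.get("id").asText(), node.get("value").asInt());
    }
}

// usage: stream.map(new JsonToTupleMapper())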

On Mon, Apr 25, 2016 at 10:13 PM Timur Fayruzov 
wrote:

> Normally, Json4s or Jackson with its Scala module work well for JSON to
> Scala data structure conversions. However, I would not expect them to support
> a special case for tuples, since JSON key-value fields normally convert to case
> classes and JSON arrays are converted to, well, arrays. That being said,
> StackOverflow to the rescue:
> http://stackoverflow.com/questions/31909308/is-it-possible-to-parse-json-array-into-a-tuple-with-json4s
>
> I didn't try the approach myself, though.
>
> On Mon, Apr 25, 2016 at 6:50 PM, Alexander Smirnov <
> alexander.smirn...@gmail.com> wrote:
>
>> Hello everybody!
>>
>> my RMQSource function receives string with JSONs in it.
>> Because many operations in Flink rely on Tuple operations, I think it is
>> a good idea to convert JSON to Tuple.
>>
>> I believe this task has been solved already :)
>> what's the common approach for this conversion?
>>
>> Thank you,
>> Alex
>>
>
>


Re: convert Json to Tuple

2016-04-25 Thread Timur Fayruzov
Normally, Json4s or Jackson with its Scala module work well for JSON to
Scala data structure conversions. However, I would not expect them to support
a special case for tuples, since JSON key-value fields normally convert to case
classes and JSON arrays are converted to, well, arrays. That being said,
StackOverflow to the rescue:
http://stackoverflow.com/questions/31909308/is-it-possible-to-parse-json-array-into-a-tuple-with-json4s

I didn't try the approach myself, though.

On Mon, Apr 25, 2016 at 6:50 PM, Alexander Smirnov <
alexander.smirn...@gmail.com> wrote:

> Hello everybody!
>
> my RMQSource function receives string with JSONs in it.
> Because many operations in Flink rely on Tuple operations, I think it is a
> good idea to convert JSON to Tuple.
>
> I believe this task has been solved already :)
> what's the common approach for this conversion?
>
> Thank you,
> Alex
>


convert Json to Tuple

2016-04-25 Thread Alexander Smirnov
Hello everybody!

my RMQSource function receives strings with JSON in them.
Because many operations in Flink rely on Tuples, I think it is a
good idea to convert the JSON to Tuples.

I believe this task has been solved already :)
what's the common approach for this conversion?

Thank you,
Alex


Re: Flink program without a line of code

2016-04-25 Thread Alexander Smirnov
thank you so much for the responses, guys!

On Sat, Apr 23, 2016 at 12:09 AM Flavio Pompermaier 
wrote:

> Hi Alexander,
> since I was looking for something similar some days ago, here is what I
> know about this topic:
> during the Stratosphere project there were Meteor and Sopremo allowing that
> [1], but they were later dismissed in favour of a Pig integration that I don't
> know whether it was ever completed. You could give the Piglet project [2] a try,
> which allows using Pig with Spark and Flink, but I don't know how well it
> works (the Flink integration is also very recent and not documented anywhere).
>
> Best,
> Flavio
>
> [1] http://stratosphere.eu/assets/papers/Sopremo_Meteor%20BigData.pdf
> [2] https://github.com/ksattler/piglet
> On 23 Apr 2016 07:48, "Aljoscha Krettek"  wrote:
>
> Hi,
> I think if the Table API/SQL API evolves enough it should be able to
> supply a Flink program as just an SQL query with source/sink definitions.
> Hopefully, in the future. :-)
>
> Cheers,
> Aljoscha
>
> On Fri, 22 Apr 2016 at 23:10 Fabian Hueske  wrote:
>
>> Hi Alex,
>>
>> welcome to the Flink community!
>> Right now, there is no way to specify a Flink program without writing
>> code (Java, Scala, Python(beta)).
>>
>> In principle it is possible to put such functionality on top of the
>> DataStream or DataSet APIs.
>> This has been done before for other programming APIs (Flink's own
>> libraries Table API, Gelly, FlinkML, and externals Apache Beam / Google
>> DataFlow, Mahout, Cascading, ...). However, all of these are again
>> programming APIs, some specialized for certain use-cases.
>>
>> Specifying Flink programs by config files (or graphically) would require
>> a data model, a DataStream/DataSet program generator and probably a code
>> generation component.
>>
>> Best, Fabian
>>
>> 2016-04-22 18:41 GMT+02:00 Alexander Smirnov <
>> alexander.smirn...@gmail.com>:
>>
>>> Hi guys!
>>>
>>> I’m new to Flink, and actually to this mailing list as well :) this is
>>> my first message.
>>> I’m still reading the documentation and I would say Flink is an amazing
>>> system!! Thanks everybody who participated in the development!
>>>
>>> The information I didn’t find in the documentation - if it is possible
>>> to describe data(stream) transformation without any code (Java/Scala).
>>> I mean if it is possible to describe datasource functions, all of the
>>> operators, connections between them, and sinks in a plain text
>>> configuration file and then feed it to Flink.
>>> In this case it would be possible to change data flow without
>>> recompilation/redeployment.
>>>
>>> Is there a similar functionality in Flink? May be some third party
>>> plugin?
>>>
>>> Thank you,
>>> Alex
>>
>>
>>


Job hangs

2016-04-25 Thread Timur Fayruzov
Hello,

Now I'm at the stage where my job seems to completely hang. Source code is
attached (it won't compile, but I think it gives a very good idea of what
happens). Unfortunately I can't provide the datasets. Most of them are
about 100-500MM records; I try to process them on an EMR cluster with 40 tasks
and 6GB of memory each.

It was working for smaller input sizes. Any idea on what I can do
differently is appreciated.

Thanks,
Timur


FaithResolution.scala
Description: Binary data


Re: Join DataStream with dimension tables?

2016-04-25 Thread Srikanth
Aljoscha,

Looks like a potential solution. Feels a bit hacky though.

I didn't quite understand why a list-backed store is used for the static input
buffer. An (inner) join should emit only one record if there is a key match.

Is it a property of the system to emit a Long.MAX_VALUE watermark when a
finite stream source ends?
If so, can I do something like this to read the static file in parallel?
val meta = env.readTextFile("S3:///path/to/file").map(...).keyBy(...)

Shouldn't we also override the checkpoint handling of the custom operator? If so,
should the checkpoint wait/fail during the initial read phase?

Lohith,
Adding a component like Cassandra just for this feels like overkill. But
if I can't find a suitable way to do this, I might use it (or probably Redis).

Srikanth



On Fri, Apr 22, 2016 at 12:20 PM, Lohith Samaga M  wrote:

> Hi,
> Cassandra could be used as a distributed cache.
>
> Lohith.
>
> Sent from my Sony Xperia™ smartphone
>
>
>  Aljoscha Krettek wrote 
>
>
> Hi Srikanth,
> that's an interesting use case. It's not possible to do something like
> this out-of-box but I'm actually working on API for such cases.
>
> In the mean time, I programmed a short example that shows how something
> like this can be programmed using the API that is currently available. It
> requires writing a custom operator but it is still somewhat succinct:
> https://gist.github.com/aljoscha/c657b98b4017282693a67f1238c88906
>
> Please let me know if you have any questions.
>
> Cheers,
> Aljoscha
>
> On Thu, 21 Apr 2016 at 03:06 Srikanth  wrote:
>
>> Hello,
>>
>> I have a fairly typical streaming use case but not able to figure how to
>> implement it best in Flink.
>> I want to join records read from a kafka stream with one(or more)
>> dimension tables which are saved as flat files.
>>
>> As per this jira  its
>> not possible to join DataStream with DataSet.
>> These tables are too big to do a collect() and join.
>>
>> It will be good to read these files during startup, do a partitionByHash
>> and keep it cached.
>> On the DataStream may be do a keyBy and join.
>> Is something like this possible?
>>
>> Srikanth
>>
>


Re: Submit Flink Jobs to YARN running on AWS

2016-04-25 Thread Bajaj, Abhinav
Hi Fabian,

Thanks for your reply and the pointers to documentation.

In these steps, I think the Flink client is installed on the master node, 
referring to steps mentioned in Flink docs 
here.
However, the scenario I have is to run the client on my local machine and 
submit jobs remotely to the YARN Cluster (running on EMR or independently).

Let me describe in more detail here.
I am trying to submit a single Flink Job to YARN using the client, running on 
my dev machine -

./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 
./examples/batch/WordCount.jar

In my understanding, YARN (running in AWS) allocates a container for the
JobManager.
The JobManager discovers the IP and starts the Actor system. At this step, the IP
it uses is the internal IP address of the EC2 instance.

The client, running on my dev machine, is not able to connect to the Jobmanager 
for reasons explained in my mail below.

Is there a way I can set the JobManager to use the hostname and not the IP
address?

Or any other suggestions?

Thanks,
Abhi


Abhinav Bajaj
Senior Engineer
HERE Predictive Analytics
Office:  +12062092767
Mobile: +17083299516

HERE Seattle
701 Pike Street, #2000, Seattle, WA 98101, USA
47° 36' 41" N. 122° 19' 57" W
HERE Maps




From: Fabian Hueske
Reply-To: "user@flink.apache.org"
Date: Wednesday, March 9, 2016 at 12:51 AM
To: "user@flink.apache.org"
Subject: Re: Submit Flink Jobs to YARN running on AWS

Hi Abhi,

I have used Flink on EMR via YARN a couple of times without problems.
I started a Flink YARN session like this:

./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096

This will start five YARN containers (1 JobManager with 1024MB, 4 Taskmanagers 
with 4096MB). See more config options in the documentation [1].
In one of the last lines of the std-out output you should find a line that 
tells you the IP and port of the JobManager.

With the IP and port, you can submit a job as follows:

./bin/flink run -m jmIP:jmPort -p 4 jobJarFile.jar 

This will send the job to the JobManager specified by IP and port and execute 
the program with a parallelism of 4. See more config options in the 
documentation [2].

If this does not help, could you share the exact command that you use to start 
the YARN session and submit the job?

Best, Fabian

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/yarn_setup.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/cli.html

2016-03-08 0:25 GMT+01:00 Bajaj, Abhinav:
Hi,

I am a newbie to Flink and trying to use it in AWS.
I have created a YARN cluster on AWS EC2 machines.
Trying to submit Flink job to the remote YARN cluster using the Flink Client 
running on my local machine.

The JobManager starts successfully in the YARN container, but the client is not
able to connect to the JobManager.

Flink Client Logs -

13:57:34,877 INFO  org.apache.flink.yarn.FlinkYarnClient
 - Deploying cluster, current state ACCEPTED
13:57:35,951 INFO  org.apache.flink.yarn.FlinkYarnClient
 - Deploying cluster, current state ACCEPTED
13:57:37,027 INFO  org.apache.flink.yarn.FlinkYarnClient
 - YARN application has been deployed successfully.
13:57:37,100 INFO  org.apache.flink.yarn.FlinkYarnCluster   
 - Start actor system.
13:57:37,532 INFO  org.apache.flink.yarn.FlinkYarnCluster   
 - Start application client.
YARN cluster started
JobManager web interface address 
http://ec2-XX-XX-XX-XX.compute-1.amazonaws.com:8088/proxy/application_1456184947990_0003/
Waiting until all TaskManagers have connected
13:57:37,540 INFO  org.apache.flink.yarn.ApplicationClient  
 - Notification about new leader address 
akka.tcp://flink@54.35.41.12:41292/user/jobmanager 
with session ID null.
No status updates from the YARN cluster received so far. Waiting ...
13:57:37,543 INFO  org.apache.flink.yarn.ApplicationClient  
 - Received address of new leader 
akka.tcp://flink@54.35.41.12:41292/user/jobmanager 
with session ID null.
13:57:37,543 INFO  org.apache.flink.yarn.ApplicationClient  
 - Disconnect from JobManager null.
13:57:37,545 INFO  org.apache.flink.yarn.ApplicationClient  
 - Trying to register at JobManager 
akka.tcp://flink@54.35.41.12:41292/user/jobmanager.
No status updates from the YARN cluster received so far. Waiting ...

The logs of the JobManager contain the following -

21:57:39,142 ERROR 

Re: Getting java.lang.Exception when try to fetch data from Kafka

2016-04-25 Thread prateekarora
Hi

I have a Java program to send data into a Kafka topic. Below is the code for this:

private Producer<String, byte[]> producer = null;

Serializer<String> keySerializer = new StringSerializer();
Serializer<byte[]> valueSerializer = new ByteArraySerializer();
producer = new KafkaProducer<String, byte[]>(props, keySerializer,
        valueSerializer);

ProducerRecord<String, byte[]> imageRecord;
imageRecord = new ProducerRecord<String, byte[]>(streamInfo.topic,
        Integer.toString(messageKey), imageData);

producer.send(imageRecord);


Then I am trying to fetch the data in Apache Flink.
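
On the Flink side, a rough, untested sketch of the consumer (assuming the Kafka
0.8 connector is on the classpath; the topic name, broker/ZooKeeper addresses
and group id are placeholders) could look like this:

import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class KafkaImageReader {

    // Passes the raw Kafka message bytes through unchanged.
    public static class ByteArraySchema implements DeserializationSchema<byte[]> {
        @Override
        public byte[] deserialize(byte[] message) {
            return message;
        }

        @Override
        public boolean isEndOfStream(byte[] nextElement) {
            return false;
        }

        @Override
        public TypeInformation<byte[]> getProducedType() {
            return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "localhost:2181");   // placeholder addresses
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "image-consumer");

        DataStream<byte[]> images = env.addSource(
                new FlinkKafkaConsumer08<byte[]>("image-topic", new ByteArraySchema(), props));

        // just print the size of every message to verify that data arrives
        images.map(new MapFunction<byte[], Integer>() {
            @Override
            public Integer map(byte[] value) {
                return value.length;
            }
        }).print();

        env.execute("Read images from Kafka");
    }
}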

Regards
Prateek




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Getting-java-lang-Exception-when-try-to-fetch-data-from-Kafka-tp6365p6418.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: AvroWriter for Rolling sink

2016-04-25 Thread Igor Berman
Hi,
it's not a problem, I'll find time to change it (I understand the
refactoring is in master and not released yet).
I wanted to ask if it's acceptable to add the following dependency to Flink?
I mean my code reuses code in this jar (note that it's not currently present
on the Flink classpath):

<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-mapred</artifactId>
  <version>1.7.6</version>
  <classifier>hadoop2</classifier>
</dependency>


On 25 April 2016 at 16:20, Aljoscha Krettek  wrote:

> Hi,
> the code looks very good! Do you think it can be adapted to the slightly
> modified interface introduced here:
> https://issues.apache.org/jira/browse/FLINK-3637
>
> It basically requires the writer to know the write position, so that we
> can truncate to a valid position in case of failure.
>
> Cheers,
> Aljoscha
>
> On Thu, 21 Apr 2016 at 18:40 Igor Berman  wrote:
>
>> ok,
>> I have a working prototype already, if somebody is interested (attached)
>>
>> I might add it as a PR later (with tests etc)
>>
>> tested locally & with s3
>>
>>
>>
>>
>>
>>
>>
>> On 21 April 2016 at 12:01, Aljoscha Krettek  wrote:
>>
>>> Hi,
>>> as far as I know there is no one working on this. I'm only aware of
>>> someone working on an ORC (from Hive) Writer.
>>>
>>> This would be a welcome addition! I think you are already on the right
>>> track, the only thing required will probably be an AvroFileWriter and you
>>> already started looking at SequenceFileWriter, which should be similar.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Thu, 21 Apr 2016 at 09:45 Igor Berman  wrote:
>>>
 Hi All,
 Is there such implementation somewhere?(before I start to implement it
 myself, it seems not too difficult based on SequenceFileWriter example)

 anyway any ideas/pointers will be highly appreciated

 thanks in advance


>>


Re: YARN terminating TaskNode

2016-04-25 Thread Timur Fayruzov
Great answer, thank you Max for a very detailed explanation! It is illuminating
how the off-heap parameter affects the memory allocation.

I read this post:
https://blogs.oracle.com/jrockit/entry/why_is_my_jvm_process_larger_t

and the thing that jumped out at me is the allocation of memory for JNI libs. I
do use a native library in my application, which is likely the culprit. I
need to account for its memory footprint when doing my memory calculations.

Thanks,
Timur


On Mon, Apr 25, 2016 at 10:28 AM, Maximilian Michels  wrote:

> Hi Timur,
>
> Shedding some light on the memory calculation:
>
> You have a total memory size of 2500 MB for each TaskManager. The
> default for 'taskmanager.memory.fraction' is 0.7. This is the fraction
> of the memory used by the memory manager. When you have turned on
> off-heap memory, this memory is allocated off-heap. As you pointed
> out, the default Yarn cutoff ratio is 0.25.
>
> Memory cutoff for Yarn: 2500 * 0.25 MB = 625 MB
>
> Java heap size with off-heap disabled: 2500 MB - 625 MB = 1875 MB
>
> Java heap size with off-heap enabled: (2500 MB - 625 MB) * 0.3 = 562,5
> MB (~570 MB in your case)
> Off-heap memory size: (2500 MB - 625 MB) * 0.7 = 1312,5 MB
>
> The heap memory limits in your log seem to be calculated correctly.
> Note that we don't set a strict limit for the off-heap memory because
> the Flink memory manager controls the amount of memory allocated. It
> will preallocate memory when you have 'taskmanager.memory.preallocate'
> set to true. Otherwise it will allocate dynamically. Still, you should
> have about 500 MB memory left with everything allocated. There is some
> more direct (off-heap) memory allocated for the network stack
> adjustable with 'taskmanager.network.numberOfBuffers' which is set to
> 2048 by default and corresponds to 2048 * 32 KB = 64 MB memory. I
> believe this can grow up to twice of that size. Still, should be
> enough memory left.
>
> Are you running a streaming or batch job? Off-heap memory and memory
> preallocation are mostly beneficial for batch jobs which use the
> memory manager a lot for sorting, hashing and caching.
>
> For streaming I'd suggest to use Flink's defaults:
>
> taskmanager.memory.off-heap: false
> taskmanager.memory.preallocate: false
>
> Raising the cutoff ratio should prevent killing of the TaskManagers.
> As Robert mentioned, in practice the JVM tends to allocate more than
> the maximum specified heap size. You can put the following in your
> flink-conf.yaml:
>
> # slightly raise the cut off ratio (might need to be even higher)
> yarn.heap-cutoff-ratio: 0.3
>
> Thanks,
> Max
>
> On Mon, Apr 25, 2016 at 5:52 PM, Timur Fayruzov
>  wrote:
> > Hello Maximilian,
> >
> > I'm using 1.0.0 compiled with Scala 2.11 and Hadoop 2.7. I'm running
> this on
> > EMR. I didn't see any exceptions in other logs. What are the logs you are
> > interested in?
> >
> > Thanks,
> > Timur
> >
> > On Mon, Apr 25, 2016 at 3:44 AM, Maximilian Michels 
> wrote:
> >>
> >> Hi Timur,
> >>
> >> Which version of Flink are you using? Could you share the entire logs?
> >>
> >> Thanks,
> >> Max
> >>
> >> On Mon, Apr 25, 2016 at 12:05 PM, Robert Metzger 
> >> wrote:
> >> > Hi Timur,
> >> >
> >> > The reason why we only allocate 570mb for the heap is because you are
> >> > allocating most of the memory as off heap (direct byte buffers).
> >> >
> >> > In theory, the memory footprint of the JVM is limited to 570 (heap) +
> >> > 1900
> >> > (direct mem) = 2470 MB (which is below 2500). But in practice thje JVM
> >> > is
> >> > allocating more memory, causing these killings by YARN.
> >> >
> >> > I have to check the code of Flink again, because I would expect the
> >> > safety
> >> > boundary to be much larger than 30 mb.
> >> >
> >> > Regards,
> >> > Robert
> >> >
> >> >
> >> > On Fri, Apr 22, 2016 at 9:47 PM, Timur Fayruzov
> >> > 
> >> > wrote:
> >> >>
> >> >> Hello,
> >> >>
> >> >> Next issue in a string of things I'm solving is that my application
> >> >> fails
> >> >> with the message 'Connection unexpectedly closed by remote task
> >> >> manager'.
> >> >>
> >> >> Yarn log shows the following:
> >> >>
> >> >> Container
> [pid=4102,containerID=container_1461341357870_0004_01_15]
> >> >> is
> >> >> running beyond physical memory limits. Current usage: 2.5 GB of 2.5
> GB
> >> >> physical memory used; 9.0 GB of 12.3 GB virtual memory used. Killing
> >> >> container.
> >> >> Dump of the process-tree for container_1461341357870_0004_01_15 :
> >> >> |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
> >> >> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES)
> FULL_CMD_LINE
> >> >> |- 4102 4100 4102 4102 (bash) 1 7 115806208 715 /bin/bash -c
> >> >> /usr/lib/jvm/java-1.8.0/bin/java -Xms570m -Xmx570m
> >> >> -XX:MaxDirectMemorySize=1900m
> >> >>
> >> >>
> 

Re: Getting java.lang.Exception when try to fetch data from Kafka

2016-04-25 Thread prateekarora
Hi

I have a Java program that sends data into a Kafka topic using the Kafka client
API (0.8.2).

Here is a sample of the code used to send data to the Kafka topic:

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

ProducerRecord<String, byte[]> imageRecord;
imageRecord = new ProducerRecord<String, byte[]>(streamInfo.topic,
        Integer.toString(messageKey), imageData);

producer.send(imageRecord);


Regards
Prateek



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Getting-java-lang-Exception-when-try-to-fetch-data-from-Kafka-tp6365p6415.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Access to a shared resource within a mapper

2016-04-25 Thread Timur Fayruzov
Hi Fabian,

I didn't realize you meant that the lazy val should be inside the RichMapFunction
implementation; that makes sense. That's what I ended up doing already.

Thanks!
Timur

On Mon, Apr 25, 2016 at 3:34 AM, Fabian Hueske  wrote:

> Hi Timur,
>
> a TaskManager may run as many subtasks of a Map operator as it has slots.
> Each subtask of an operator runs in a different thread. Each parallel
> subtask of a Map operator has its own MapFunction object, so it should be
> possible to use a lazy val.
>
> However, you should not use static variables to hold state, because these
> are shared between all MapFunction in a TaskManager (JVM).
>
> 2016-04-22 21:21 GMT+02:00 Timur Fayruzov :
>
>> Actually, a follow-up question: is map function single-threaded (within
>> one task manager, that is). If it's not then lazy initialization wont'
>> work, is it right?
>>
>> On Fri, Apr 22, 2016 at 11:50 AM, Stephan Ewen  wrote:
>>
>>> You may also be able to initialize the client only in the parallel
>>> execution by making it a "lazy" variable in Scala.
>>>
>>> On Fri, Apr 22, 2016 at 11:46 AM, Timur Fayruzov <
>>> timur.fairu...@gmail.com> wrote:
>>>
 Outstanding! Thanks, Aljoscha.

 On Fri, Apr 22, 2016 at 2:06 AM, Aljoscha Krettek 
 wrote:

> Hi,
> you could use a RichMapFunction that has an open method:
>
> data.map(new RichMapFunction[...]() {
>   override def open(parameters: Configuration): Unit = {
> // initialize client
>   }
>
>   override def map(input: IN): OUT = {
> // use client
>   }
> })
>
> the open() method is called before any elements are passed to the
> function. The counterpart of open() is close(), which is called after all
> elements are through or if the job cancels.
>
> Cheers,
> Aljoscha
>
> On Thu, 21 Apr 2016 at 22:21 Timur Fayruzov 
> wrote:
>
>> Hello,
>>
>> I'm writing a Scala Flink application. I have a standalone process
>> that exists on every Flink node that I need to call to transform my data.
>> To access this process I need to initialize non thread-safe client 
>> first. I
>> would like to avoid initializing a client for each element being
>> transformed. A straightforward implementation would be something like 
>> this:
>> ```
>>
>> val env = ExecutionEnvironment.getExecutionEnvironment
>> val data = env.fromCollection(Seq(MyKey(Some("a")), MyKey(Some("c"))))
>> val pool  = new ArrayBlockingQueue[Client](5)
>> // pool is filled here
>> data.map(e => {
>>   val client = pool.take()
>>   val res = client.transform(e)
>>   pool.put(client)
>>   res
>> })
>>
>> ```
>> However, this causes a runtime exception with message "Task not
>> serializable", which makes sense.
>>
>> Function parameters and broadcast variables won't work either as far
>> as I understand. Is there a way to make this happen?
>>
>> Thanks,
>> Timur
>>
>

>>>
>>
>


Re: YARN terminating TaskNode

2016-04-25 Thread Timur Fayruzov
Hello Maximilian,

I'm using 1.0.0 compiled with Scala 2.11 and Hadoop 2.7. I'm running this
on EMR. I didn't see any exceptions in other logs. What are the logs you
are interested in?

Thanks,
Timur

On Mon, Apr 25, 2016 at 3:44 AM, Maximilian Michels  wrote:

> Hi Timur,
>
> Which version of Flink are you using? Could you share the entire logs?
>
> Thanks,
> Max
>
> On Mon, Apr 25, 2016 at 12:05 PM, Robert Metzger 
> wrote:
> > Hi Timur,
> >
> > The reason why we only allocate 570mb for the heap is because you are
> > allocating most of the memory as off heap (direct byte buffers).
> >
> > In theory, the memory footprint of the JVM is limited to 570 (heap) +
> 1900
> > (direct mem) = 2470 MB (which is below 2500). But in practice thje JVM is
> > allocating more memory, causing these killings by YARN.
> >
> > I have to check the code of Flink again, because I would expect the
> safety
> > boundary to be much larger than 30 mb.
> >
> > Regards,
> > Robert
> >
> >
> > On Fri, Apr 22, 2016 at 9:47 PM, Timur Fayruzov <
> timur.fairu...@gmail.com>
> > wrote:
> >>
> >> Hello,
> >>
> >> Next issue in a string of things I'm solving is that my application
> fails
> >> with the message 'Connection unexpectedly closed by remote task
> manager'.
> >>
> >> Yarn log shows the following:
> >>
> >> Container [pid=4102,containerID=container_1461341357870_0004_01_15]
> is
> >> running beyond physical memory limits. Current usage: 2.5 GB of 2.5 GB
> >> physical memory used; 9.0 GB of 12.3 GB virtual memory used. Killing
> >> container.
> >> Dump of the process-tree for container_1461341357870_0004_01_15 :
> >> |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
> >> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
> >> |- 4102 4100 4102 4102 (bash) 1 7 115806208 715 /bin/bash -c
> >> /usr/lib/jvm/java-1.8.0/bin/java -Xms570m -Xmx570m
> >> -XX:MaxDirectMemorySize=1900m
> >>
> -Dlog.file=/var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_15/taskmanager.log
> >> -Dlogback.configurationFile=file:logback.xml
> >> -Dlog4j.configuration=file:log4j.properties
> >> org.apache.flink.yarn.YarnTaskManagerRunner --configDir . 1>
> >>
> /var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_15/taskmanager.out
> >> 2>
> >>
> /var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_15/taskmanager.err
> >> |- 4306 4102 4102 4102 (java) 172258 40265 9495257088 646460
> >> /usr/lib/jvm/java-1.8.0/bin/java -Xms570m -Xmx570m
> >> -XX:MaxDirectMemorySize=1900m
> >>
> -Dlog.file=/var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_15/taskmanager.log
> >> -Dlogback.configurationFile=file:logback.xml
> >> -Dlog4j.configuration=file:log4j.properties
> >> org.apache.flink.yarn.YarnTaskManagerRunner --configDir .
> >>
> >> One thing that drew my attention is `-Xmx570m`. I expected it to be
> >> TaskManagerMemory*0.75 (due to yarn.heap-cutoff-ratio). I run the
> >> application as follows:
> >> HADOOP_CONF_DIR=/etc/hadoop/conf flink run -m yarn-cluster -yn 18 -yjm
> >> 4096 -ytm 2500 eval-assembly-1.0.jar
> >>
> >> In flink logs I do see 'Task Manager memory: 2500'. When I look at the
> >> yarn container logs on the cluster node I see that it starts with 570mb,
> >> which puzzles me. When I look at the actually allocated memory for a
> Yarn
> >> container using 'top' I see 2.2GB used. Am I interpreting these
> parameters
> >> correctly?
> >>
> >> I also have set (it failed in the same way without this as well):
> >> taskmanager.memory.off-heap: true
> >>
> >> Also, I don't understand why this happens at all. I assumed that Flink
> >> won't overcommit allocated resources and will spill to the disk when
> running
> >> out of heap memory. Appreciate if someone can shed light on this too.
> >>
> >> Thanks,
> >> Timur
> >
> >
>


Re: General Data questions - streams vs batch

2016-04-25 Thread Aljoscha Krettek
Hi,
I'll try and answer your questions separately. First, a general remark,
although Flink has the DataSet API for batch processing and the DataStream
API for stream processing we only have one underlying streaming execution
engine that is used for both. Now, regarding the questions:

1) What do you mean by "parallel into 2 streams"? Maybe that could
influence my answer but I'll just give a general answer: Flink does not
give any guarantees about the ordering of elements in a Stream or in a
DataSet. This means that merging or unioning two streams/data sets will
just mean that operations see all elements in the two merged streams but
the order in which we see them is arbitrary. This means that we don't keep
buffers based on time or size or anything.

2) The elements that flow through the topology are not tracked
individually, each operation just receives elements, updates state and
sends elements to downstream operation. In essence this means that elements
themselves don't block any resources except if they alter some kept state
in operations. If you have a stateless pipeline that only has
filters/maps/flatMaps then the amount of required resources is very low.

For a finite data set, elements are also streamed through the topology.
Only if you use operations that require grouping or sorting (such as
groupBy/reduce and join) will elements be buffered in memory or on disk
before they are processed.

To answer your last question: if you only do stateless
transformations/filters, then you are fine using either API and the
performance should be similar.

Cheers,
Aljoscha

On Sun, 24 Apr 2016 at 15:54 Konstantin Kulagin  wrote:

> Hi guys,
>
> I have some kind of general question in order to get more understanding of
> stream vs final data transformation. More specific - I am trying to
> understand 'entities' lifecycle during processing.
>
> 1) For example in a case of streams: suppose we start with some key-value
> source, parallel it into 2 streams by key. Each stream modifies entry's
> values, lets say adds some fields. And we want to merge it back later. How
> does it happen?
> Merging point will keep some finite buffer of entries? Basing on time or
> size?
>
> I understand that probably the right solution in this case would be having one
> stream and achieving more performance by increasing parallelism, but
> what if I have 2 sources from the beginning?
>
>
> 2) Also I assume that in a case of streaming each entry considered as
> 'processed' once it passes whole chain and emitted into some sink, so after
> it will not consume resources. Basically similar to what Storm is doing.
> But in a case of finite data (data sets): how big amount of data system
> will keep in memory? The whole set?
>
> I probably have some example of dataset vs stream 'mix': I need to
> *transform* big but finite chunk of data, I don't really need to do any
> 'joins', grouping or smth like that so I never need to store whole dataset
> in memory/storage. What my choice would be in this case?
>
> Thanks!
> Konstantin
>
>
>


Re: AvroWriter for Rolling sink

2016-04-25 Thread Aljoscha Krettek
Hi,
the code looks very good! Do you think it can be adapted to the slightly
modified interface introduced here:
https://issues.apache.org/jira/browse/FLINK-3637

It basically requires the writer to know the write position, so that we can
truncate to a valid position in case of failure.

Cheers,
Aljoscha

On Thu, 21 Apr 2016 at 18:40 Igor Berman  wrote:

> ok,
> I have a working prototype already, if somebody is interested (attached)
>
> I might add it as a PR later (with tests etc)
>
> tested locally & with s3
>
>
>
>
>
>
>
> On 21 April 2016 at 12:01, Aljoscha Krettek  wrote:
>
>> Hi,
>> as far as I know there is no one working on this. I'm only aware of
>> someone working on an ORC (from Hive) Writer.
>>
>> This would be a welcome addition! I think you are already on the right
>> track, the only thing required will probably be an AvroFileWriter and you
>> already started looking at SequenceFileWriter, which should be similar.
>>
>> Cheers,
>> Aljoscha
>>
>> On Thu, 21 Apr 2016 at 09:45 Igor Berman  wrote:
>>
>>> Hi All,
>>> Is there such implementation somewhere?(before I start to implement it
>>> myself, it seems not too difficult based on SequenceFileWriter example)
>>>
>>> anyway any ideas/pointers will be highly appreciated
>>>
>>> thanks in advance
>>>
>>>
>


Re: Clear irrelevant state values

2016-04-25 Thread Sowmya Vallabhajosyula
Thanks Gyula.

Yes, I am using state only in a RichFlatMapFunction. I will try to evaluate
generating events for the removal of state.
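
For reference, a rough, untested sketch of what that could look like (the
Tuple2<patientId, eventType> encoding, the "REMOVE" marker value and the toy
rules are all made-up assumptions; the function is assumed to run on a stream
keyed by patient id):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Input events are (patientId, eventType); a "REMOVE" marker clears the kept state.
public class PatientStateFlatMap
        extends RichFlatMapFunction<Tuple2<String, String>, Tuple2<String, String>> {

    private transient ValueState<String> patientState;

    @Override
    public void open(Configuration parameters) {
        patientState = getRuntimeContext().getState(
                new ValueStateDescriptor<String>("patient-state", String.class, "none"));
    }

    @Override
    public void flatMap(Tuple2<String, String> event, Collector<Tuple2<String, String>> out)
            throws Exception {
        if ("REMOVE".equals(event.f1)) {
            // removal marker generated upstream: drop the state for this patient
            patientState.clear();
            return;
        }
        // toy rule: an infection event on top of "sirs" moves the patient to "sepsis"
        String current = patientState.value();
        String updated = "INFECTION".equals(event.f1) && "sirs".equals(current) ? "sepsis" : "sirs";
        patientState.update(updated);
        out.collect(Tuple2.of(event.f0, updated));
    }
}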

Regards,
Sowmya

On Mon, Apr 25, 2016 at 5:44 PM, Gyula Fóra  wrote:

> Hi,
>
> The removal markers are just something I made up :) What I meant is that
> you can generate events in a custom source for instance that will trigger
> the removal of the state. This might be easy or hard to do depending on
> your use-case.
>
> What do you mean by custom state maintenance? As long as you are using the
> state interfaces correctly in your functions you should be fine in terms of
> scalability.
>
> Gyula
>
> Sowmya Vallabhajosyula  ezt írta (időpont: 2016.
> ápr. 25., H, 13:29):
>
>> Hi Gyula,
>>
>> Thank you so much.
>>
>> 1. Can you point me to any documentation on removal markers?
>> 2. My understanding is this implementation of custom state maintenance
>> does not impact scalability. Is that right?
>>
>> Thanks,
>> Sowmya
>>
>> On Mon, Apr 25, 2016 at 3:06 PM, Gyula Fóra  wrote:
>>
>>> Hi,
>>>
>>> (a) I think your understanding is correct, one consideration might be
>>> that if you are always sending the state to the sink, it might make sense
>>> to build it there directly using a RichSinkFunction.
>>>
>>> (b) There is no built-in support for this at the moment. What you can do
>>> yourself is to generate removal markers for the patients automatically. We
>>> could probably add this feature later which might be easier to implement
>>> for some state backends. For instance in RocksDB we could use a
>>> time-to-live database to remove states after a given period.
>>>
>>> Cheers,
>>> Gyula
>>>
>>> Sowmya Vallabhajosyula  ezt írta (időpont:
>>> 2016. ápr. 25., H, 11:22):
>>>
 Hi,

 Scenario: Health care where a list of patient events are flowing in. We
 would like to keep a Value / List state holding all events and updating the
 state based on a set of business rules. For e.g. if 4 vitals exceed range
 in 24 hours, state is sirs. If the patient is in sirs state and a source of
 infection is reported now, update state to sepsis. Attached flow for your
 reference.

 Considering the example of RideSource, if I use a RichFlatMapFunction
 to maintain this state (please let me know if this doesn't make sense),

 (a) out.collect will return the current patient state to a sink so at
 the end of every event, we know the state of the patient at this instance.
 Is my understanding right?
 (b) Let's say after x days, these events that we recorded are not valid
 anymore. How do we clear the state?

 --
 Thanks and Regards,
 Sowmya Vallabhajosyula

>>>
>>
>>
>> --
>> Thanks and Regards,
>> Sowmya Vallabhajosyula
>>
>


-- 
Thanks and Regards,
Sowmya Vallabhajosyula


Re: Flink on Yarn - ApplicationMaster command

2016-04-25 Thread Maximilian Michels
Great to hear! :)

On Sun, Apr 24, 2016 at 3:51 PM, Theofilos Kakantousis  wrote:
> Hi,
>
> The issue was a mismatch of jar versions on my client. Seems to be working
> fine now.
> Thanks again for your help!
>
> Cheers,
> Theofilos
>
>
> On 2016-04-22 18:22, Theofilos Kakantousis wrote:
>
> Hi Max,
>
> I managed to get the jobManagerAddress from FlinkYarnCluster; however, when I
> submit a job using the code below, the jobID is null.
> Is there something wrong in the way I submit the job? Otherwise, any ideas in
> which direction I should investigate further?
>
> The runBlocking call returns almost immediately. There is no indication the
> job reaches the JobManager as the last log entries for the jobmanager and
> taskmanager logs are that the processes have started successfully.
>
>
> String[] args = {""};
> File file = new File("/srv/flink/examples/ConnectedComponents.jar");
> int parallelism = 1;
> InetSocketAddress jobManagerAddress = cluster.getJobManagerAddress();
> org.apache.flink.configuration.Configuration clientConf = new
> org.apache.flink.configuration.Configuration();
> clientConf.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
> jobManagerAddress.getPort());
> clientConf.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,
> jobManagerAddress.getHostName());
> Client client = new Client(clientConf);
> try {
> PackagedProgram program = new PackagedProgram(file,
> "org.apache.flink.examples.java.graph.ConnectedComponents", args);
> client.setPrintStatusDuringExecution(true);
> JobSubmissionResult jobRes = client.runBlocking(program, parallelism);
> JobID jobID = jobRes.getJobID();
> } catch (ProgramInvocationException ex) {
> Logger.getLogger(YarnRunner.class.getName()).log(Level.SEVERE, null, ex);
> }
>
>
> Thanks,
> Theofilos
>
>
> On 2016-04-22 16:05, Maximilian Michels wrote:
>
> Hi Theofilos,
>
> Assuming you have the FlinkYarnCluster after the call to deploy(). You
> can get the JobManager address using the
>
> InetSocketAddress address = cluster.getJobManagerAddress();
>
> Then create a Configuration with this address:
>
> Configuration config = new Configuration();
> config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,
> address.getHostName());
> config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
> address.getPort());
>
> Then the client:
>
> Client client = new Client(config);
>
> Then use it to submit jobs blocking/detached, e.g.
>
> client.runBlocking(...);
> client.runDetached(...);
>
> Cheers,
> Max
>
> On Tue, Apr 19, 2016 at 11:25 PM, Theofilos Kakantousis  wrote:
>
> Hi Max,
>
> Thank you for your reply. Exactly, I want to setup the Yarn cluster and
> submit a job through code and not using cmd client.
> I had done what you suggested, I used part of the deploy method to write my
> own code that starts up the cluster which seems to be working fine.
>
> Could you point me to some examples how to use the Client you mention?
>
> Cheers,
> Theofilos
>
>
> On 2016-04-19 16:35, Maximilian Michels wrote:
>
> Hi Theofilos,
>
> I'm not sure whether I understand correctly what you are trying to do.
> I'm assuming you don't want to use the command-line client.
>
> You can setup the Yarn cluster in your code manually using the
> FlinkYarnClient class. The deploy() method will give you a
> FlinkYarnCluster which you can use to connect to the deployed cluster.
> Then get the JobManager address and use the Client class to submit
> Flink jobs to the cluster. I have to warn you that these classes are
> subject to change in Flink 1.1.0 and above.
>
> Let me know if the procedure works for you.
>
> Cheers,
> Max
>
> On Tue, Apr 19, 2016 at 2:37 PM, Theofilos Kakantousis 
> wrote:
>
> Hi everyone,
>
> I'm using Flink 0.10.1 and hadoop 2.4.0 to implement a client that
> submits a
> flink application to Yarn. To keep it simple I use the
> ConnectedComponents
> app from flink examples.
>
> I set the required properties (Resources, AM ContainerLaunchContext etc.)
> on
> the YARN client interface. What happens is the JobManager and TaskManager
> processes start and based on the logs containers are running but the
> actual
> application does not start. I'm probably missing the proper way to pass
> parameters to the ApplicationMaster and it cannot pick up the application
> it
> needs to run. Anyone knows where I could get some info on how to pass
> runtime params to the AppMaster?
>
> The ApplicationMaster launchcontainer script includes the following:
> exec /bin/bash -c "$JAVA_HOME/bin/java -Xmx1024M
> org.apache.flink.yarn.ApplicationMaster  -c
> org.apache.flink.examples.java.graph.ConnectedComponents 1>
> /tmp/stdOut5237161854714899800 2>  /tmp/stdErr606502839107545371 "
>
> Thank you,
> Theofilos
>
>
>


Re: Clear irrelevant state values

2016-04-25 Thread Sowmya Vallabhajosyula
Hi Gyula,

Thank you so much.

1. Can you point me to any documentation on removal markers?
2. My understanding is this implementation of custom state maintenance does
not impact scalability. Is that right?

Thanks,
Sowmya

On Mon, Apr 25, 2016 at 3:06 PM, Gyula Fóra  wrote:

> Hi,
>
> (a) I think your understanding is correct, one consideration might be that
> if you are always sending the state to the sink, it might make sense to
> build it there directly using a RichSinkFunction.
>
> (b) There is no built-in support for this at the moment. What you can do
> yourself is to generate removal markers for the patients automatically. We
> could probably add this feature later which might be easier to implement
> for some state backends. For instance in RocksDB we could use a
> time-to-live database to remove states after a given period.
>
> Cheers,
> Gyula
>
> Sowmya Vallabhajosyula  ezt írta (időpont: 2016.
> ápr. 25., H, 11:22):
>
>> Hi,
>>
>> Scenario: Health care where a list of patient events are flowing in. We
>> would like to keep a Value / List state holding all events and updating the
>> state based on a set of business rules. For e.g. if 4 vitals exceed range
>> in 24 hours, state is sirs. If the patient is in sirs state and a source of
>> infection is reported now, update state to sepsis. Attached flow for your
>> reference.
>>
>> Considering the example of RideSource, if I use a RichFlatMapFunction to
>> maintain this state (please let me know if this doesn't make sense),
>>
>> (a) out.collect will return the current patient state to a sink so at the
>> end of every event, we know the state of the patient at this instance. Is
>> my understanding right?
>> (b) Let's say after x days, these events that we recorded are not valid
>> anymore. How do we clear the state?
>>
>> --
>> Thanks and Regards,
>> Sowmya Vallabhajosyula
>>
>


-- 
Thanks and Regards,
Sowmya Vallabhajosyula


Re: YARN terminating TaskNode

2016-04-25 Thread Maximilian Michels
Hi Timur,

Which version of Flink are you using? Could you share the entire logs?

Thanks,
Max

On Mon, Apr 25, 2016 at 12:05 PM, Robert Metzger  wrote:
> Hi Timur,
>
> The reason why we only allocate 570mb for the heap is because you are
> allocating most of the memory as off heap (direct byte buffers).
>
> In theory, the memory footprint of the JVM is limited to 570 (heap) + 1900
> (direct mem) = 2470 MB (which is below 2500). But in practice the JVM is
> allocating more memory, causing these killings by YARN.
>
> I have to check the code of Flink again, because I would expect the safety
> boundary to be much larger than 30 mb.
>
> Regards,
> Robert
>
>
> On Fri, Apr 22, 2016 at 9:47 PM, Timur Fayruzov 
> wrote:
>>
>> Hello,
>>
>> Next issue in a string of things I'm solving is that my application fails
>> with the message 'Connection unexpectedly closed by remote task manager'.
>>
>> Yarn log shows the following:
>>
>> Container [pid=4102,containerID=container_1461341357870_0004_01_15] is
>> running beyond physical memory limits. Current usage: 2.5 GB of 2.5 GB
>> physical memory used; 9.0 GB of 12.3 GB virtual memory used. Killing
>> container.
>> Dump of the process-tree for container_1461341357870_0004_01_15 :
>> |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
>> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
>> |- 4102 4100 4102 4102 (bash) 1 7 115806208 715 /bin/bash -c
>> /usr/lib/jvm/java-1.8.0/bin/java -Xms570m -Xmx570m
>> -XX:MaxDirectMemorySize=1900m
>> -Dlog.file=/var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_15/taskmanager.log
>> -Dlogback.configurationFile=file:logback.xml
>> -Dlog4j.configuration=file:log4j.properties
>> org.apache.flink.yarn.YarnTaskManagerRunner --configDir . 1>
>> /var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_15/taskmanager.out
>> 2>
>> /var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_15/taskmanager.err
>> |- 4306 4102 4102 4102 (java) 172258 40265 9495257088 646460
>> /usr/lib/jvm/java-1.8.0/bin/java -Xms570m -Xmx570m
>> -XX:MaxDirectMemorySize=1900m
>> -Dlog.file=/var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_15/taskmanager.log
>> -Dlogback.configurationFile=file:logback.xml
>> -Dlog4j.configuration=file:log4j.properties
>> org.apache.flink.yarn.YarnTaskManagerRunner --configDir .
>>
>> One thing that drew my attention is `-Xmx570m`. I expected it to be
>> TaskManagerMemory*0.75 (due to yarn.heap-cutoff-ratio). I run the
>> application as follows:
>> HADOOP_CONF_DIR=/etc/hadoop/conf flink run -m yarn-cluster -yn 18 -yjm
>> 4096 -ytm 2500 eval-assembly-1.0.jar
>>
>> In flink logs I do see 'Task Manager memory: 2500'. When I look at the
>> yarn container logs on the cluster node I see that it starts with 570mb,
>> which puzzles me. When I look at the actually allocated memory for a Yarn
>> container using 'top' I see 2.2GB used. Am I interpreting these parameters
>> correctly?
>>
>> I also have set (it failed in the same way without this as well):
>> taskmanager.memory.off-heap: true
>>
>> Also, I don't understand why this happens at all. I assumed that Flink
>> won't overcommit allocated resources and will spill to the disk when running
>> out of heap memory. Appreciate if someone can shed light on this too.
>>
>> Thanks,
>> Timur
>
>


Re: Access to a shared resource within a mapper

2016-04-25 Thread Fabian Hueske
Hi Timur,

a TaskManager may run as many subtasks of a Map operator as it has slots.
Each subtask of an operator runs in a different thread. Each parallel
subtask of a Map operator has its own MapFunction object, so it should be
possible to use a lazy val.

However, you should not use static variables to hold state, because these
are shared between all MapFunctions in a TaskManager (JVM).

2016-04-22 21:21 GMT+02:00 Timur Fayruzov :

> Actually, a follow-up question: is map function single-threaded (within
> one task manager, that is). If it's not, then lazy initialization won't
> work, right?
>
> On Fri, Apr 22, 2016 at 11:50 AM, Stephan Ewen  wrote:
>
>> You may also be able to initialize the client only in the parallel
>> execution by making it a "lazy" variable in Scala.
>>
>> On Fri, Apr 22, 2016 at 11:46 AM, Timur Fayruzov <
>> timur.fairu...@gmail.com> wrote:
>>
>>> Outstanding! Thanks, Aljoscha.
>>>
>>> On Fri, Apr 22, 2016 at 2:06 AM, Aljoscha Krettek 
>>> wrote:
>>>
 Hi,
 you could use a RichMapFunction that has an open method:

 data.map(new RichMapFunction[...]() {
   override def open(parameters: Configuration): Unit = {
 // initialize client
   }

   override def map(input: IN): OUT = {
 // use client
   }
 })

 the open() method is called before any elements are passed to the
 function. The counterpart of open() is close(), which is called after all
 elements are through or if the job cancels.

 Cheers,
 Aljoscha

 On Thu, 21 Apr 2016 at 22:21 Timur Fayruzov 
 wrote:

> Hello,
>
> I'm writing a Scala Flink application. I have a standalone process
> that exists on every Flink node that I need to call to transform my data.
> To access this process I need to initialize non thread-safe client first. 
> I
> would like to avoid initializing a client for each element being
> transformed. A straightforward implementation would be something like 
> this:
> ```
>
> val env = ExecutionEnvironment.getExecutionEnvironment
> val data = env.fromCollection(Seq(MyKey(Some("a")), MyKey(Some("c"))))
> val pool  = new ArrayBlockingQueue[Client](5)
> // pool is filled here
> data.map(e => {
>   val client = pool.take()
>   val res = client.transform(e)
>   pool.put(client)
>   res
> })
>
> ```
> However, this causes a runtime exception with message "Task not
> serializable", which makes sense.
>
> Function parameters and broadcast variables won't work either as far
> as I understand. Is there a way to make this happen?
>
> Thanks,
> Timur
>

>>>
>>
>


Re: Custom Trigger Implementation

2016-04-25 Thread Piyush Shrivastava
Thanks a lot Kostas. This solved my problem.

Thanks and Regards,
Piyush Shrivastava
http://webograffiti.com

On Monday, 25 April 2016 3:27 PM, Kostas Kloudas 
 wrote:
 

Hi,
Let me also add that you should also override the clear() method in order to
clear your state and delete the pending timers.
Kostas

On Apr 25, 2016, at 11:52 AM, Kostas Kloudas  
wrote:

Hi Piyush,

In the onElement function, you register a timer every time you receive an
element.

When the next watermark arrives, in the flag==false case, this will lead to
every element adding a timer for its timestamp+60000 ms. The same for the
flag==true case, with a 20000 ms interval.

What you can try is to set the initial trigger for 60 sec only once, at the
first element, and then register all the subsequent timers in onEventTime
with 20 sec.

To have a look at a custom trigger example, you can look here:
https://github.com/kl0u/flink-examples/blob/master/src/main/java/com/dataartisans/flinksolo/beam_comparison/customTriggers/EventTimeTriggerWithEarlyAndLateFiring.java

I hope this helped. Let me know if you need any help.
Kostas

On Apr 25, 2016, at 11:22 AM, Piyush Shrivastava  wrote:
Hi all,

I want to implement a custom Trigger which fires a GlobalWindow after 1
minute the first time and every 20 seconds after that. I believe I cannot
get this logic right in the implementation of my custom Trigger. Please help
me with this.

Here is the code of my custom Trigger:

public class TradeTrigger<W extends Window> extends Trigger<Object, W> {

    /**
     * 
     */
    private static final long serialVersionUID = 1L;
      
    private TradeTrigger() {
    }
    
    @Override
    public TriggerResult onElement(
            Object element,
            long timestamp,
            W window,
            
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext ctx)
            throws Exception {
    
        ctx.registerEventTimeTimer(timestamp);
        return TriggerResult.CONTINUE;
        
    }

    @Override
    public TriggerResult onEventTime(
            long timestamp,
            W window,
            
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext ctx)
            throws Exception {
    
    ValueState<Boolean> state = ctx.getPartitionedState(new 
ValueStateDescriptor("flag", Boolean.TYPE, false));
        
        if(state.value()==false){
            ctx.registerEventTimeTimer(timestamp+60000);
            state.update(true);
            return TriggerResult.FIRE;
        }else{
            System.out.println(""+state.value());
            ctx.registerEventTimeTimer(timestamp+20000);
            return TriggerResult.FIRE;
        }
    }

    @Override
    public TriggerResult onProcessingTime(
            long arg0,
            W arg1,
            
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
            throws Exception {
        // TODO Auto-generated method stub
        return TriggerResult.CONTINUE;
    }
    
 
    public static <W extends Window> TradeTrigger<W> of() {
        return new TradeTrigger<>();
    }

    
}

Thanks and Regards,
Piyush Shrivastava
http://webograffiti.com





  

Re: YARN terminating TaskNode

2016-04-25 Thread Robert Metzger
Hi Timur,

The reason why we only allocate 570mb for the heap is because you are
allocating most of the memory as off heap (direct byte buffers).

In theory, the memory footprint of the JVM is limited to 570 (heap) + 1900
(direct mem) = 2470 MB (which is below 2500). But in practice the JVM is
allocating more memory, causing these killings by YARN.

I have to check the code of Flink again, because I would expect the safety
boundary to be much larger than 30 mb.

Regards,
Robert


On Fri, Apr 22, 2016 at 9:47 PM, Timur Fayruzov 
wrote:

> Hello,
>
> Next issue in a string of things I'm solving is that my application fails
> with the message 'Connection unexpectedly closed by remote task manager'.
>
> Yarn log shows the following:
>
> Container [pid=4102,containerID=container_1461341357870_0004_01_15] is
> running beyond physical memory limits. Current usage: 2.5 GB of 2.5 GB
> physical memory used; 9.0 GB of 12.3 GB virtual memory used. Killing
> container.
> Dump of the process-tree for container_1461341357870_0004_01_15 :
> |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
> |- 4102 4100 4102 4102 (bash) 1 7 115806208 715 /bin/bash -c
> /usr/lib/jvm/java-1.8.0/bin/java -Xms570m -Xmx570m
> -XX:MaxDirectMemorySize=1900m
> -Dlog.file=/var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_15/taskmanager.log
> -Dlogback.configurationFile=file:logback.xml
> -Dlog4j.configuration=file:log4j.properties
> org.apache.flink.yarn.YarnTaskManagerRunner --configDir . 1>
> /var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_15/taskmanager.out
> 2>
> /var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_15/taskmanager.err
> |- 4306 4102 4102 4102 (java) 172258 40265 9495257088 646460
> /usr/lib/jvm/java-1.8.0/bin/java -Xms570m -Xmx570m
> -XX:MaxDirectMemorySize=1900m
> -Dlog.file=/var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_15/taskmanager.log
> -Dlogback.configurationFile=file:logback.xml
> -Dlog4j.configuration=file:log4j.properties
> org.apache.flink.yarn.YarnTaskManagerRunner --configDir .
>
> One thing that drew my attention is `-Xmx570m`. I expected it to be
> TaskManagerMemory*0.75 (due to yarn.heap-cutoff-ratio). I run the
> application as follows:
> HADOOP_CONF_DIR=/etc/hadoop/conf flink run -m yarn-cluster -yn 18 -yjm
> 4096 -ytm 2500 eval-assembly-1.0.jar
>
> In flink logs I do see 'Task Manager memory: 2500'. When I look at the
> yarn container logs on the cluster node I see that it starts with 570mb,
> which puzzles me. When I look at the actually allocated memory for a Yarn
> container using 'top' I see 2.2GB used. Am I interpreting these parameters
> correctly?
>
> I also have set (it failed in the same way without this as well):
> taskmanager.memory.off-heap: true
>
> Also, I don't understand why this happens at all. I assumed that Flink
> won't overcommit allocated resources and will spill to the disk when
> running out of heap memory. Appreciate if someone can shed light on this
> too.
>
> Thanks,
> Timur
>


Re: Custom Trigger Implementation

2016-04-25 Thread Kostas Kloudas
Hi,

Let me also add that you should also override the clear() method in order to 
clear your state
and delete the pending timers.

Kostas

> On Apr 25, 2016, at 11:52 AM, Kostas Kloudas  
> wrote:
> 
> Hi Piyush,
> 
> In the onElement function, you register a timer every time you receive an 
> element. 
> 
> When the next watermark arrives, in the flag==false case, this will lead to 
> every element 
> adding a timer for its timestamp+60000 ms. The same for the flag==true case, 
> with a 20000 ms interval.
> 
> What you can try is to set the initial trigger for 60 sec only once, at the 
> first element, and then 
> register all the subsequent timers in onEventTime with 20 sec.
> 
> To have a look at a custom trigger you can look at here:
> https://github.com/kl0u/flink-examples/blob/master/src/main/java/com/dataartisans/flinksolo/beam_comparison/customTriggers/EventTimeTriggerWithEarlyAndLateFiring.java
>  
> 
> 
> I hope this helped.
> Let me know if you need any help.
> 
> Kostas
> 
>> On Apr 25, 2016, at 11:22 AM, Piyush Shrivastava > > wrote:
>> 
>> Hi all,
>> I want to implement a custom Trigger which fired a GlobalWindow in 1 minute 
>> for the first time and every 20 seconds after that.
>> I believe I cannot get this logic right in the implementation of my custom 
>> Trigger. Please help me with this.
>> 
>> Here is the code of my custom Trigger:
>> 
>> public class TradeTrigger<W extends Window> extends Trigger<Object, W> {
>> 
>> /**
>>  * 
>>  */
>> private static final long serialVersionUID = 1L;
>>   
>> private TradeTrigger() {
>> }
>> 
>> @Override
>> public TriggerResult onElement(
>> Object element,
>> long timestamp,
>> W window,
>> 
>> org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext ctx)
>> throws Exception {
>> 
>> ctx.registerEventTimeTimer(timestamp);
>> return TriggerResult.CONTINUE;
>> 
>> }
>> 
>> @Override
>> public TriggerResult onEventTime(
>> long timestamp,
>> W window,
>> 
>> org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext ctx)
>> throws Exception {
>> 
>> ValueState<Boolean> state = ctx.getPartitionedState(new 
>> ValueStateDescriptor<>("flag", Boolean.TYPE, false));
>> 
>> if(state.value()==false){
>> ctx.registerEventTimeTimer(timestamp+60000);
>> state.update(true);
>> return TriggerResult.FIRE;
>> }else{
>> System.out.println(""+state.value());
>> ctx.registerEventTimeTimer(timestamp+20000);
>> return TriggerResult.FIRE;
>> }
>> }
>> 
>> @Override
>> public TriggerResult onProcessingTime(
>> long arg0,
>> W arg1,
>> 
>> org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext 
>> arg2)
>> throws Exception {
>> // TODO Auto-generated method stub
>> return TriggerResult.CONTINUE;
>> }
>> 
>>  
>> public static <W extends Window> TradeTrigger<W> of() {
>> return new TradeTrigger<>();
>> }
>> 
>> 
>> }
>>  
>> Thanks and Regards,
>> Piyush Shrivastava 
>> 
>> http://webograffiti.com 
> 
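
A minimal sketch of the approach described above (hypothetical and untested; the class
name PeriodicTradeTrigger is illustrative, and whether clear() is an inherited hook and
whether timers can be deleted depends on the Flink version): register the initial 60 s
timer only once per key/window, and re-register a 20 s timer from onEventTime() on every
firing.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

public class PeriodicTradeTrigger<W extends Window> extends Trigger<Object, W> {

    private static final long serialVersionUID = 1L;

    private final long initialDelay;   // e.g. 60000 ms
    private final long interval;       // e.g. 20000 ms

    private final ValueStateDescriptor<Long> nextFireDescr =
            new ValueStateDescriptor<>("nextFire", Long.class, null);

    private PeriodicTradeTrigger(long initialDelay, long interval) {
        this.initialDelay = initialDelay;
        this.interval = interval;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx)
            throws Exception {
        ValueState<Long> nextFire = ctx.getPartitionedState(nextFireDescr);
        if (nextFire.value() == null) {
            // only the very first element schedules a timer: initial firing after initialDelay
            long firstFire = timestamp + initialDelay;
            nextFire.update(firstFire);
            ctx.registerEventTimeTimer(firstFire);
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        ValueState<Long> nextFire = ctx.getPartitionedState(nextFireDescr);
        if (nextFire.value() != null && time == nextFire.value()) {
            // fire now and schedule the next firing one interval later
            long next = time + interval;
            nextFire.update(next);
            ctx.registerEventTimeTimer(next);
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    // clean-up as suggested above; add @Override if your Trigger base class declares clear()
    public void clear(W window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(nextFireDescr).clear();
    }

    public static <W extends Window> PeriodicTradeTrigger<W> of(long initialDelayMs, long intervalMs) {
        return new PeriodicTradeTrigger<>(initialDelayMs, intervalMs);
    }
}

Usage would then be roughly
stream.keyBy(...).window(GlobalWindows.create()).trigger(PeriodicTradeTrigger.of(60000, 20000)).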



Re: Flink first() operator

2016-04-25 Thread Fabian Hueske
Hi Biplob,

you can also implement a generic InputFormat (IF) that wraps another IF (such as a
CsvInputFormat).
The wrapping IF forwards all calls to the wrapped IF and in addition counts
how many records were emitted (i.e., how often InputFormat.nextRecord() was
called).
Once the count reaches the threshold, it returns true from
InputFormat.reachedEnd().

Cheers, Fabian

2016-04-25 11:06 GMT+02:00 Ufuk Celebi :

> Hey Biplob,
>
> Yes, the file source will read all input. The first operator will add
> a combiner to the source for pre-aggregation and then shuffle
> everything to a single reduce instance, which emits the N first
> elements. Keep in mind that there is no strict order in which the
> records will be emitted.
>
> If you need to optimize this you could write a custom
> File/TextInputFormat, which discards the lines at the sources. You can
> have a look at these classes and then get back with questions on the
> mailing list.
>
> – Ufuk
>
> On Sat, Apr 23, 2016 at 6:37 PM, Biplob Biswas 
> wrote:
> > Hi,
> >
> > It might be a naive question, but I was concerned as I am trying to read
> from
> > a file.
> > My question is: if I have a file with n lines and I want m lines out of
> that
> > where m << n, would the first operator process only the first m lines or
> > would it go through the entire file?
> >
> > If it does go through the entire file, is there a better way to just get
> the
> > top m lines using readCsvFile function?
> >
> > Thanks & Regards
> > Biplob Biswas
>
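
A minimal sketch of the wrapping InputFormat Fabian describes (hypothetical and untested;
note that the counter is per parallel instance, so with parallelism greater than one each
source task would emit up to the configured limit, and split creation and assignment are
simply delegated to the wrapped format):

import java.io.IOException;

import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;

/** Wraps another InputFormat and reports end-of-input after a fixed number of records. */
public class LimitingInputFormat<T, S extends InputSplit> implements InputFormat<T, S> {

    private static final long serialVersionUID = 1L;

    private final InputFormat<T, S> wrapped;
    private final long limit;
    private long emitted;

    public LimitingInputFormat(InputFormat<T, S> wrapped, long limit) {
        this.wrapped = wrapped;
        this.limit = limit;
    }

    @Override
    public void configure(Configuration parameters) {
        wrapped.configure(parameters);
    }

    @Override
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
        return wrapped.getStatistics(cachedStatistics);
    }

    @Override
    public S[] createInputSplits(int minNumSplits) throws IOException {
        return wrapped.createInputSplits(minNumSplits);
    }

    @Override
    public InputSplitAssigner getInputSplitAssigner(S[] inputSplits) {
        return wrapped.getInputSplitAssigner(inputSplits);
    }

    @Override
    public void open(S split) throws IOException {
        wrapped.open(split);
    }

    @Override
    public boolean reachedEnd() throws IOException {
        // pretend the input is exhausted once the threshold is hit
        return emitted >= limit || wrapped.reachedEnd();
    }

    @Override
    public T nextRecord(T reuse) throws IOException {
        T record = wrapped.nextRecord(reuse);
        if (record != null) {
            emitted++;
        }
        return record;
    }

    @Override
    public void close() throws IOException {
        wrapped.close();
    }
}

It could then be handed to the environment roughly as
env.createInput(new LimitingInputFormat<>(csvFormat, m), typeInfoOfTheCsvRows), where
csvFormat and typeInfoOfTheCsvRows are whatever you would otherwise use for the CSV input.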


Re: Custom Trigger Implementation

2016-04-25 Thread Kostas Kloudas
Hi Piyush,

In the onElement function, you register a timer every time you receive an 
element. 

When the next watermark arrives, in the flag==false case, this will lead to
every element
adding a timer for its timestamp+60000ms. The same happens in the flag==true case, with
a 20000ms interval.

What you can try is to register the initial 60 sec timer only once, at the first element,
and then
register all subsequent 20 sec timers in onEventTime().

To have a look at a custom trigger you can look at here:
https://github.com/kl0u/flink-examples/blob/master/src/main/java/com/dataartisans/flinksolo/beam_comparison/customTriggers/EventTimeTriggerWithEarlyAndLateFiring.java
 


I hope this helped.
Let me know if you need any help.

Kostas

> On Apr 25, 2016, at 11:22 AM, Piyush Shrivastava  
> wrote:
> 
> Hi all,
> I want to implement a custom Trigger which fires a GlobalWindow after 1 minute 
> for the first time and every 20 seconds after that.
> I believe I cannot get this logic right in the implementation of my custom 
> Trigger. Please help me with this.
> 
> Here is the code of my custom Trigger:
> 
> public class TradeTrigger<W extends Window> extends Trigger<Object, W> {
> 
> /**
>  * 
>  */
> private static final long serialVersionUID = 1L;
>   
> private TradeTrigger() {
> }
> 
> @Override
> public TriggerResult onElement(
> Object element,
> long timestamp,
> W window,
> 
> org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext ctx)
> throws Exception {
> 
> ctx.registerEventTimeTimer(timestamp);
> return TriggerResult.CONTINUE;
> 
> }
> 
> @Override
> public TriggerResult onEventTime(
> long timestamp,
> W window,
> 
> org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext ctx)
> throws Exception {
> 
> ValueState<Boolean> state = ctx.getPartitionedState(new 
> ValueStateDescriptor<>("flag", Boolean.TYPE, false));
> 
> if(state.value()==false){
> ctx.registerEventTimeTimer(timestamp+60000);
> state.update(true);
> return TriggerResult.FIRE;
> }else{
> System.out.println(""+state.value());
> ctx.registerEventTimeTimer(timestamp+20000);
> return TriggerResult.FIRE;
> }
> }
> 
> @Override
> public TriggerResult onProcessingTime(
> long arg0,
> W arg1,
> 
> org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
> throws Exception {
> // TODO Auto-generated method stub
> return TriggerResult.CONTINUE;
> }
> 
>  
> public static <W extends Window> TradeTrigger<W> of() {
> return new TradeTrigger<>();
> }
> 
> 
> }
>  
> Thanks and Regards,
> Piyush Shrivastava 
> 
> http://webograffiti.com 



Re: Getting java.lang.Exception when try to fetch data from Kafka

2016-04-25 Thread Robert Metzger
Hi Prateek,

were the messages written to the Kafka topic by Flink, using the
TypeInformationKeyValueSerializationSchema? If not, the Flink
deserializer probably expects a different data format than the one the messages in the
topic were written in.

How are the messages written into the topic?


On Fri, Apr 22, 2016 at 10:21 PM, prateekarora 
wrote:

>
> Hi
>
> I am sending data using the Kafka producer API:
>
> imageRecord = new ProducerRecord<String, byte[]>(topic, messageKey, imageData);
> producer.send(imageRecord);
>
>
> And in the Flink program I try to fetch data using FlinkKafkaConsumer08. Below
> is the sample code.
>
> def main(args: Array[String]) {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   val properties = new Properties()
>   properties.setProperty("bootstrap.servers", ":9092")
>   properties.setProperty("zookeeper.connect", ":2181")
>   properties.setProperty("group.id", "test")
>
>   val readSchema = new
>
> TypeInformationKeyValueSerializationSchema[String,Array[Byte]](classOf[String],classOf[Array[Byte]],
>
> env.getConfig).asInstanceOf[KeyedDeserializationSchema[(String,Array[Byte])]]
>
>   val stream : DataStream[(String,Array[Byte])]  =
> env.addSource(new
> FlinkKafkaConsumer08[(String,Array[Byte])]("a-0",readSchema, properties))
>
>   stream.print
>   env.execute("Flink Kafka Example")
>   }
>
>
> but getting  below error :
>
> 16/04/22 13:43:39 INFO ExecutionGraph: Source: Custom Source -> Sink:
> Unnamed (1/4) (d7a151560f6eabdc587a23dc0975cb84) switched from RUNNING to
> FAILED
> 16/04/22 13:43:39 INFO ExecutionGraph: Source: Custom Source -> Sink:
> Unnamed (2/4) (d43754a27e402ed5b02a73d1c9aa3125) switched from RUNNING to
> CANCELING
>
> java.lang.Exception
> at
>
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:222)
> at
>
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.run(FlinkKafkaConsumer08.java:316)
> at
>
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
> at
>
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.EOFException
> at
>
> org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:298)
> at org.apache.flink.types.StringValue.readString(StringValue.java:771)
> at
>
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
> at
>
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
> at
>
> org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema.deserialize(TypeInformationKeyValueSerializationSchema.java:105)
> at
>
> org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema.deserialize(TypeInformationKeyValueSerializationSchema.java:39)
> at
>
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:657)
>
>
> Regards
> Prateek
>
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Getting-java-lang-Exception-when-try-to-fetch-data-from-Kafka-tp6365.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>
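
If the producer above wrote the key with the plain Kafka string serializer and the value
as raw bytes, then the topic does not contain Flink's own serialization format, which is
what TypeInformationKeyValueSerializationSchema expects, and that would explain the
EOFException during deserialization. One possible alternative (a sketch in Java, assuming
the KeyedDeserializationSchema interface of this Flink version) is to read the raw bytes
directly:

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

/** Interprets the Kafka key as a UTF-8 string and passes the value bytes through unchanged. */
public class RawKeyValueSchema implements KeyedDeserializationSchema<Tuple2<String, byte[]>> {

    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<String, byte[]> deserialize(byte[] messageKey, byte[] message,
                                              String topic, int partition, long offset) throws IOException {
        String key = messageKey == null ? null : new String(messageKey, StandardCharsets.UTF_8);
        return new Tuple2<>(key, message);
    }

    @Override
    public boolean isEndOfStream(Tuple2<String, byte[]> nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Tuple2<String, byte[]>> getProducedType() {
        return new TupleTypeInfo<Tuple2<String, byte[]>>(
                BasicTypeInfo.STRING_TYPE_INFO,
                PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
    }
}

Such a schema would be passed to the FlinkKafkaConsumer08 in place of the
TypeInformationKeyValueSerializationSchema. If reading back with
TypeInformationKeyValueSerializationSchema is the goal instead, the messages would also
have to be written with the matching schema on the producer side.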


Re: Clear irrelevant state values

2016-04-25 Thread Gyula Fóra
Hi,

(a) I think your understanding is correct, one consideration might be that
if you are always sending the state to the sink, it might make sense to
build it there directly using a RichSinkFunction.

(b) There is no built-in support for this at the moment. What you can do
yourself is to generate removal markers for the patients automatically. We
could probably add this feature later which might be easier to implement
for some state backends. For instance in RocksDB we could use a
time-to-live database to remove states after a given period.

Cheers,
Gyula

Sowmya Vallabhajosyula wrote (on Mon, Apr 25,
2016 at 11:22):

> Hi,
>
> Scenario: health care, where a stream of patient events is flowing in. We
> would like to keep a Value / List state holding all events and update the
> state based on a set of business rules. For example, if 4 vitals exceed their range
> within 24 hours, the state is SIRS. If the patient is in the SIRS state and a source of
> infection is now reported, the state is updated to sepsis. Attached is the flow for your
> reference.
>
> Considering the example of RideSource, if I use a RichFlatMapFunction to
> maintain this state (please let me know if this doesn't make sense),
>
> (a) out.collect will return the current patient state to a sink so at the
> end of every event, we know the state of the patient at this instance. Is
> my understanding right?
> (b) Let's say after x days, these events that we recorded are not valid
> anymore. How do we clear the state?
>
> --
> Thanks and Regards,
> Sowmya Vallabhajosyula
>
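
A minimal sketch of (a) together with the removal-marker idea from (b) (hypothetical and
untested; the event and state types are illustrative placeholders, a Tuple2 of patientId
and eventType, and the rule evaluation is reduced to a stub):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/**
 * Input:  (patientId, eventType), keyed by patientId before this function.
 * Output: (patientId, currentState) after every event, as in (a).
 * An eventType of "CLEAR" acts as the removal marker from (b) and drops the state.
 */
public class PatientStateFunction
        extends RichFlatMapFunction<Tuple2<String, String>, Tuple2<String, String>> {

    private static final long serialVersionUID = 1L;

    private transient ValueState<String> patientState;

    @Override
    public void open(Configuration parameters) {
        patientState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("patient-state", String.class, "NORMAL"));
    }

    @Override
    public void flatMap(Tuple2<String, String> event, Collector<Tuple2<String, String>> out)
            throws Exception {
        if ("CLEAR".equals(event.f1)) {
            // removal marker: the recorded information is no longer relevant
            patientState.clear();
            return;
        }

        // placeholder for the real rules (4 vitals out of range in 24h, source of infection, ...)
        String current = patientState.value();
        String updated = current;
        if ("VITALS_OUT_OF_RANGE".equals(event.f1)) {
            updated = "SIRS";
        } else if ("INFECTION".equals(event.f1) && "SIRS".equals(current)) {
            updated = "SEPSIS";
        }

        patientState.update(updated);
        out.collect(Tuple2.of(event.f0, updated));
    }
}

The CLEAR markers themselves could be generated upstream, for example by a periodic job
that emits one for every patient whose last activity is older than x days, until a
time-to-live mechanism becomes available in the state backends.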


Re: Thanks everyone

2016-04-25 Thread Ufuk Celebi
Thanks for sharing Prez! :-)

On Sat, Apr 23, 2016 at 7:08 AM, Márton Balassi
 wrote:
> Hi Prez,
>
> Thanks for sharing, the community is always glad to welcome new Flink users.
>
> Best,
>
> Marton
>
> On Sat, Apr 23, 2016 at 6:01 AM, Prez Cannady 
> wrote:
>>
>> We’ve completed our first full sweep on a five node Flink cluster and it
>> went beautifully.  On behalf of my team, thought I’d say thanks for all the
>> support.  Lots more learning and work to do, so we look forward to working
>> with you all.
>>
>> Prez Cannady
>> p: 617 500 3378
>> e: revp...@opencorrelate.org
>> GH: https://github.com/opencorrelate
>> LI: https://www.linkedin.com/in/revprez
>>
>>
>>
>>
>>
>>
>>
>>
>>
>