Re: Why I am getting Null pointer exception while accessing RuntimeContext in FlinkKafkaProducer010 ?

2017-04-18 Thread sohimankotia
Thanks Aljoscha for the reply.

Is there any way I can add custom metrics or a counter in
FlinkKafkaProducer010?
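
(One possible workaround, sketched here with hypothetical names: since the
RuntimeContext apparently cannot be used inside the producer, a counter can be
registered in a RichMapFunction placed just before the sink.)

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Counter;

    public class CountingMap<T> extends RichMapFunction<T, T> {
        private transient Counter recordsOut; // records headed to the producer

        @Override
        public void open(Configuration parameters) {
            recordsOut = getRuntimeContext().getMetricGroup().counter("recordsOut");
        }

        @Override
        public T map(T value) {
            recordsOut.inc(); // one increment per record passed downstream
            return value;     // pass through unchanged
        }
    }

Attached as stream.map(new CountingMap<>()).addSink(producer), this counts
every record handed to the producer without touching the producer itself.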







Re: Key by Task number

2017-04-18 Thread Telco Phone
Do you have a small example or a link to a doc that has one?



From: Chesnay Schepler
To: user@flink.apache.org
Sent: Tuesday, April 18, 2017 8:16 AM
Subject: Re: Key by Task number

If the number of combinations between partition and schemaID is limited, then
the subtask index could actually improve the distribution of values.

In any case, the easiest way to do this is to add a RichMapFunction after the
flatMap, or modify the flatMap, to also include the subtask index. Typically
this would be done by creating a Tuple2 containing the index and value.
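
A minimal sketch of that (the class name is hypothetical; T is a generic value
type):

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class AttachSubtaskIndex<T> extends RichMapFunction<T, Tuple2<Integer, T>> {
        @Override
        public Tuple2<Integer, T> map(T value) {
            // pair each value with the index of the subtask that processed it
            return Tuple2.of(getRuntimeContext().getIndexOfThisSubtask(), value);
        }
    }

The stream could then be keyed on tuple position 0 (the index) to spread the
load across subtasks.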
 
On 18.04.2017 15:43, Kamil Dziublinski wrote:

I am not sure if you really need a keyBy; your load will be distributed among
your map functions without it. But could you explain a bit what your sink is
doing?

As for setting parallelism on the consumer, remember that you won't have
higher parallelism than the number of partitions in your topic. If you have
240 partitions that's fine, but if you have fewer, the other subtasks will be
idle. Only one task can read from a given partition at a time.
On Tue, Apr 18, 2017 at 3:38 PM Telco Phone wrote:

I am trying to use the task number as a keyBy value to help fan out the work
load reading from Kafka.

Given:

    DataStream stream = env.addSource(
            new FlinkKafkaConsumer010("topicA", schema, properties))
        .setParallelism(240)
        .flatMap(new SchemaRecordSplit()).setParallelism(240)
        .name("TopicA splitter")
        .keyBy("partition", "keyByHelper", "schemaId");

    stream.addSink(new CustomMaprFsSink()).name("TopicA Sink").setParallelism(240);

In the DeserialClass I am trying to get to

    getRuntimeContext().getIndexOfThisSubtask();

which is only available in the RichSinkFunction.

The above is partition (by hour), schemaID (Avro schemaId), and I would like
to add the task number so that all 240 readers / writers have something to do.

Any ideas?

Yarn terminating TM for pmem limit cascades causing all jobs to fail

2017-04-18 Thread Shannon Carey
I'm on Flink 1.1.4. We had yet another occurrence of Yarn killing a TM due to 
exceeding pmem limits and all jobs failing as a result. I thought I had 
successfully disabled that check, but apparently the property doesn't work as 
expected in EMR.
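
For reference, the standard YARN switch for that check is the
yarn.nodemanager.pmem-check-enabled property in yarn-site.xml (on EMR it would
normally be applied through the yarn-site configuration classification, which
may be why setting it directly didn't take effect):

    <!-- Disables the NodeManager's physical-memory check that otherwise
         kills containers exceeding their pmem allocation. -->
    <property>
      <name>yarn.nodemanager.pmem-check-enabled</name>
      <value>false</value>
    </property>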

From what I can tell in the logs, it looks like after the first TM was killed 
by Yarn, the jobs failed and were retried. However, when they are retried they 
cause increased pmem load on yet another TM, which results in Yarn killing 
another TM. That caused the jobs to fail again. This happened 5 times until our 
job retry policy gave up and allowed the jobs to fail permanently. Obviously, 
this situation is very problematic because it results in the loss of all job 
state, plus it requires manual intervention to start the jobs again.

The job retries eventually fail due to, "Could not restart the job ... The slot 
in which the task was executed has been released. Probably loss of TaskManager" 
or due to "Could not restart the job … Connection unexpectedly closed by remote 
task manager … This might indicate that the remote task manager was lost." 
Those are only the final failure causes: Flink does not appear to log the cause 
of intermediate restart failures.

I assume that the messages logged by the JobManager about "Association with
remote system … has failed, address is now gated for [5000] ms. Reason is:
[Disassociated]." are due to the TM failing, and are expected/harmless?

It seems like disabling the pmem check will fix this problem, but I am
wondering if this is related:
https://flink.apache.org/faq.html#the-slot-allocated-for-my-task-manager-has-been-released-what-should-i-do
I don't see any log messages about quarantined TMs…

Do you think that increasing the # of job retries, so that the jobs don't fail
until all TMs are replaced with fresh ones, would fix this issue? The
"memory.percent-free" metric from Collectd did go down to 2-3% on the TMs 
before they failed, and shot back up to 30-40% on TM restart (though I'm not 
sure how much of that had to do with the loss of state).  So, memory usage may 
be a real problem, but we don't get an OOM exception so I'm not sure we can 
control this from the JVM perspective. Are there other memory adjustments we 
should make which would allow our TMs to run for long periods of time without 
having this problem? Is there perhaps a memory leak in RocksDB?

Thanks for any help you can provide,
Shannon


Re: Window Functions and Empty Panes

2017-04-18 Thread Kostas Kloudas
I forgot to say that timers are fault-tolerant. You set them, and Flink takes
care of checkpointing and restoring them after failure. The flag will also be
fault-tolerant as, I suppose, you will use Flink’s keyed state.

For more info, you can check the ProcessFunction documentation that Konstantin
provided. There, the example uses a value state to hold the counter; you can
do something similar to keep the flag. Keep in mind that the state will
already be scoped by key, so you do not have to worry about that either.
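
A minimal sketch of that combination (a keyed flag plus a self-re-registering
timer), written against the Flink 1.3 ProcessFunction API; the class name and
the period T are assumed for illustration:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    public class PeriodicTimerFunction<IN> extends ProcessFunction<IN, IN> {
        private static final long T = 10_000L; // the period, an assumed value
        private transient ValueState<Boolean> initialized; // keyed, so fault-tolerant

        @Override
        public void open(Configuration parameters) {
            initialized = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("timer-initialized", Boolean.class));
        }

        @Override
        public void processElement(IN value, Context ctx, Collector<IN> out) throws Exception {
            // assumes event time with timestamps assigned upstream
            if (initialized.value() == null) { // first element seen for this key
                ctx.timerService().registerEventTimeTimer(ctx.timestamp() + T);
                initialized.update(true);
            }
            out.collect(value);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<IN> out) {
            // re-register so the timer fires again after another T time units
            ctx.timerService().registerEventTimeTimer(timestamp + T);
        }
    }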

Kostas

> On Apr 18, 2017, at 11:11 PM, Kostas Kloudas wrote:
> 
> No problem! Glad I could help!
> 
> Kostas
> 
>> On Apr 18, 2017, at 11:01 PM, Ryan Conway wrote:
>> 
>> Hi Kostas,
>> 
>> Re restarting: I missed that ProcessFunction.OnTimerContext extends 
>> ProcessFunction.Context! Until now my thought was that OnTimerContext did 
>> not provide a means of restarting a timer.
>> 
>> Re initial timer, you're right, I'll just need to track a boolean in a state 
>> variable that notes whether or not the timer has been initialized. What I am 
>> not confident about is how to manage timer recovery after a node failure; I 
>> imagine it will make sense to not track this variable. I will do more 
>> research and cross that bridge when I get there.
>> 
>> So I think a process function will work just fine, here. Thank you again for 
>> your time, Kostas and Konstantin.
>> 
>> Ryan
>> 
>> On Tue, Apr 18, 2017 at 12:07 PM, Kostas Kloudas wrote:
>> Hi Ryan,
>> 
>> “A periodic window like this requires the ability to start a timer without 
>> an element and to restart a timer when fired.”
>> 
>> For the second part, i.e. “to restart a timer when fired”, you can
>> re-register the timer in the onTimer() method (set a new timer for
>> “now + T”), so that the next one fires after T time units, where T is your
>> period.
>> 
>> For the first part, where you set the initial timer for a window, this needs 
>> to have a first element right? If not, how
>> do you know the key for which to set the timer? Are all the keys known in 
>> advance?
>> 
>> Kostas
>> 
>> 
>> 
>>> On Apr 18, 2017, at 8:35 PM, Ryan Conway wrote:
>>> 
>>> A periodic window like this requires the ability to start a timer without 
>>> an element and to restart a timer when fired.
>> 
>> 
> 



Re: Window Functions and Empty Panes

2017-04-18 Thread Kostas Kloudas
Hi Ryan,

“A periodic window like this requires the ability to start a timer without an 
element and to restart a timer when fired.”

For the second part, i.e. “to restart a timer when fired”, you can re-register
the timer in the onTimer() method (set a new timer for “now + T”), so that the
next one fires after T time units, where T is your period.

For the first part, where you set the initial timer for a window, this needs to 
have a first element right? If not, how
do you know the key for which to set the timer? Are all the keys known in 
advance?

Kostas



> On Apr 18, 2017, at 8:35 PM, Ryan Conway wrote:
> 
> A periodic window like this requires the ability to start a timer without an 
> element and to restart a timer when fired.



Re: CEP timeout does not trigger under certain conditions

2017-04-18 Thread Kostas Kloudas
I just realized that the conversation was not sent to the Mailing List, so I am 
resending it.

Kostas

> On Apr 11, 2017, at 7:30 PM, vijayakumar palaniappan wrote:
> 
> Sure Thanks
> 
> On Tue, Apr 11, 2017 at 1:28 PM, Kostas Kloudas wrote:
> You are correct! I did not read your message correctly. This seems like a
> bug. You are using the 1.2 version, right?
> 
> Could you open a Jira issue?
> 
> Thanks,
> Kostas
> 
> On Apr 11, 2017 09:06, "vijayakumar palaniappan" wrote:
> TimeoutPattern does not trigger under certain conditions. Following are the
> preconditions:
> 
> - Assume a pattern of Event A followed by Event B within 2 seconds
> - PeriodicWatermarks every 1 second
> - Assume the following events have arrived:
>   - Event A-1 [time: 1 sec]
>   - Event B-1 [time: 2 sec]
>   - Event A-2 [time: 2 sec]
>   - Event A-3 [time: 5 sec]
>   - Watermark [time: 5 sec]
> 
> I would assume that after the watermark arrives, the pair A-1, B-1 is
> detected and A-2 is timed out. But the A-2 timeout does not happen.
> 
> If I use a punctuated watermark and generate a watermark for every event, it
> seems to work as expected.
> 
> To me this seems to be a bug; is my understanding correct?
> 
> 
> -- 
> Thanks,
> -Vijay
> 
> 
> 
> -- 
> Thanks,
> -Vijay
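
For reference, a minimal sketch of the pattern described above, written
against the Flink 1.2 CEP API (the Event type and its getType() accessor are
assumed):

    import org.apache.flink.cep.pattern.Pattern;
    import org.apache.flink.streaming.api.windowing.time.Time;

    // Event A followed by Event B within 2 seconds; A events that see no B
    // within the window should reach the timeout branch of the PatternStream.
    Pattern<Event, ?> pattern = Pattern.<Event>begin("A")
            .where(evt -> evt.getType().equals("A"))
            .followedBy("B")
            .where(evt -> evt.getType().equals("B"))
            .within(Time.seconds(2));

With periodic watermarks, timed-out partial matches such as A-2 above should
be emitted once a watermark past the 2-second window arrives.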



Re: Flink to S3 streaming

2017-04-18 Thread Aljoscha Krettek
Hi,
You would have to write your own SinkFunction that uses the AWS S3 SDK to
write to S3. You might be interested in the work proposed in this Jira:
https://issues.apache.org/jira/browse/FLINK-6306

As to buffering elements, I’m afraid you would also have to roll your own
solution for now. You could use the Flink state API for that:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
This even has an example of a buffering Sink.
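
A minimal, non-fault-tolerant sketch of such a sink, assuming the AWS SDK for
Java v1 (bucket, key scheme, tag, and flush threshold are all made up for
illustration; a production version would keep the buffer in Flink state as
described in the link above):

    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.AmazonS3ClientBuilder;
    import com.amazonaws.services.s3.model.*;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.Collections;
    import java.util.UUID;

    public class BufferingS3Sink extends RichSinkFunction<String> {
        private static final int FLUSH_BYTES = 5 * 1024 * 1024; // flush at ~5 MB
        private transient AmazonS3 s3;
        private transient ByteArrayOutputStream buffer;

        @Override
        public void open(Configuration parameters) {
            s3 = AmazonS3ClientBuilder.defaultClient();
            buffer = new ByteArrayOutputStream();
        }

        @Override
        public void invoke(String record) throws Exception {
            buffer.write((record + "\n").getBytes(StandardCharsets.UTF_8));
            if (buffer.size() >= FLUSH_BYTES) {
                flush();
            }
        }

        private void flush() {
            byte[] bytes = buffer.toByteArray();
            ObjectMetadata meta = new ObjectMetadata();
            meta.setContentLength(bytes.length);
            // putObject with per-object tags, which the plain S3 filesystem
            // sink does not support
            s3.putObject(new PutObjectRequest(
                    "my-bucket", "part-" + UUID.randomUUID(),
                    new ByteArrayInputStream(bytes), meta)
                .withTagging(new ObjectTagging(
                    Collections.singletonList(new Tag("source", "flink")))));
            buffer.reset();
        }
    }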

Best,
Aljoscha
> On 8. Apr 2017, at 08:09, pradeep s  wrote:
> 
> Hi,
> I have a use case to stream messages from Kafka to Amazon S3. I am not using
> the S3 file system way, since I need Object tags to be added for each object
> written in S3.
> So I am planning to use the AWS S3 SDK. But I have a question about how to
> hold the data until the message size is a few MBs and then write to S3.
> Also, what sink should be used in this case if I am using the S3 SDK to
> write to S3.
> Regards
> Pradeep S



Re: Signal Trigger

2017-04-18 Thread Aljoscha Krettek
Ah sorry, you’re right. You add the delay to the signal timestamp.

> On 4. Apr 2017, at 19:25, nragon  wrote:
> 
> Hi,
> 
> I believe this "timestamp + this.delay" is the signal event timestamp + the
> allowed lateness, which in this case I am configuring as
> EventTimeSessionSignalTrigger.of(this.lateness.toMilliseconds());
> So, if the allowed lateness is 10 seconds and the event arrived at 15:10,
> the event timer would be called at 15:20, right? That is the desired
> behaviour.
> 
> Thanks



Re: Disk I/O in Flink

2017-04-18 Thread Robert Schmidtke
Hi,

I have already looked at the UnilateralSortMerger, concluding that all I/O
eventually goes via SegmentReadRequest and SegmentWriteRequest (which in
turn use java.nio.channels.FileChannel) in AsynchronousFileIOChannel. Are
there more interaction points between Flink and the underlying file system
that I might want to consider?

Thanks!
Robert
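
As a reference for other readers, here is a toy, single-file illustration of
the three-phase external sort that Kurt describes below (read, in-memory sort,
spill, then merge). This is not Flink's actual implementation, which works on
binary MemorySegments rather than text lines:

    import java.io.BufferedReader;
    import java.io.BufferedWriter;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.*;

    public class ToyExternalSort {
        public static void sort(Path input, Path output, int maxLinesInMemory) throws IOException {
            List<Path> runs = new ArrayList<>();
            try (BufferedReader in = Files.newBufferedReader(input)) {
                List<String> chunk = new ArrayList<>();
                String line;
                while ((line = in.readLine()) != null) {
                    chunk.add(line);
                    // memory "runs out": sort the chunk and spill it to disk
                    if (chunk.size() == maxLinesInMemory) runs.add(spill(chunk));
                }
                if (!chunk.isEmpty()) runs.add(spill(chunk));
            }
            merge(runs, output); // Flink additionally caps the merge fan-in (default 128)
        }

        private static Path spill(List<String> chunk) throws IOException {
            Collections.sort(chunk);                 // in-memory sort phase
            Path run = Files.createTempFile("run", ".txt");
            Files.write(run, chunk);                 // spill phase: write the run
            chunk.clear();                           // memory is available again
            return run;
        }

        private static void merge(List<Path> runs, Path output) throws IOException {
            // k-way merge of the sorted runs via a min-heap of (head line, reader)
            PriorityQueue<AbstractMap.SimpleEntry<String, BufferedReader>> heap =
                    new PriorityQueue<>((a, b) -> a.getKey().compareTo(b.getKey()));
            for (Path run : runs) {
                BufferedReader r = Files.newBufferedReader(run);
                String head = r.readLine();
                if (head != null) heap.add(new AbstractMap.SimpleEntry<>(head, r));
            }
            try (BufferedWriter out = Files.newBufferedWriter(output)) {
                while (!heap.isEmpty()) {
                    AbstractMap.SimpleEntry<String, BufferedReader> e = heap.poll();
                    out.write(e.getKey());
                    out.newLine();
                    String next = e.getValue().readLine();
                    if (next != null) heap.add(new AbstractMap.SimpleEntry<>(next, e.getValue()));
                    else e.getValue().close();
                }
            }
        }
    }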

On Fri, Apr 7, 2017 at 5:02 PM, Kurt Young  wrote:

> Hi,
>
> You probably want to check out UnilateralSortMerger.java; this is the class
> responsible for external sort in Flink. Here is a short description of how
> it works: there are in total 3 threads working together, one for reading,
> one for sorting partial data in memory, and the last one is responsible for
> spilling. Flink will first figure out how much memory it can use during the
> in-memory sort, and manage it as MemorySegments. Once this memory runs out,
> the sorting thread will take over the memory and do the in-memory sorting
> (for more details about in-memory sorting, you can see NormalizedKeySorter).
> After this, the spilling thread will write the sorted data to disk and make
> the memory available again for reading. This is repeated until all data has
> been processed.
> Normally, the data will be read twice (once from the source, and once from
> disk) and written once, but if you spill too many files, Flink will first
> merge some of the files to make sure the last merge step does not exceed
> some limit (default 128). Hope this can help you.
>
> Best,
> Kurt
>
> On Fri, Apr 7, 2017 at 4:20 PM, Robert Schmidtke 
> wrote:
>
>> Hi,
>>
>> I'm currently examining the I/O patterns of Flink, and I'd like to know
>> when/how Flink goes to disk. Let me give an introduction of what I have
>> done so far.
>>
>> I am running TeraGen (from the Hadoop examples package) + TeraSort (
>> https://github.com/robert-schmidtke/terasort) on a 16 node cluster, each
>> node with 64 GiB of memory, 2x32 cores, and roughly half a terabyte of
>> disk. I'm using YARN and HDFS. The underlying file system is XFS.
>>
>> Now before running TeraGen and TeraSort, I reset the XFS counters to
>> zero, and after TeraGen + TeraSort are finished, I dump the XFS counters
>> again. Accumulated over the entire cluster I get 3 TiB of writes and 3.2
>> TiB of reads. What I'd have expected would be 2 TiB of writes (1 for
>> TeraGen, 1 for TeraSort) and 1 TiB of reads (during TeraSort).
>>
>> Unsatisfied by the coarseness of these numbers I developed an HDFS
>> wrapper that logs file system statistics for each call to hdfs://..., such
>> as start time/end time, no. of bytes read/written etc. I can plot these
>> numbers and see what I expect: during TeraGen I have 1 TiB of writes to
>> hdfs://..., during TeraSort I have 1 TiB of reads from and 1 TiB of writes
>> to hdfs://... So far, so good.
>>
>> Now this still did not explain the disk I/O, so I added bytecode
>> instrumentation to a range of Java classes, like FileIn/OutputStream,
>> RandomAccessFile, FileChannel, ZipFile, multiple *Buffer classes for memory
>> mapped files etc., and have the same statistics: start/end of a read
>> from/write to disk, no. of bytes involved and such. I can plot these
>> numbers too and see that the HDFS JVMs write 1 TiB of data to disk during
>> TeraGen (expected) and read and write 1 TiB from and to disk during
>> TeraSort (expected).
>>
>> Sorry for the enormous introduction, but now there's finally the
>> interesting part: Flink's JVMs read from and write to disk 1 TiB of data
>> each during TeraSort. I'm suspecting there is some sort of spilling
>> involved, potentially because I have not done the setup properly. But that
>> is not the crucial point: my statistics give a total of 3 TiB of writes to
>> disk (2 TiB for HDFS, 1 TiB for Flink), which agrees with the XFS counters
>> from above. However, my statistics only give 2 TiB of reads from disk (1
>> TiB for HDFS, 1 TiB for Flink), so I'm missing an entire TiB of reads from
>> disk somewhere. I have done the same with Hadoop TeraSort, and there I'm
>> not missing any data, meaning my statistics agree with XFS for TeraSort on
>> Hadoop, which is why I suspect there are some cases where Flink goes to
>> disk without me noticing it.
>>
>> Therefore here finally the question: in which cases does Flink go to
>> disk, and how does it do so (meaning precisely which Java classes are
>> involved, so I can check my bytecode instrumentation)? This would also
>> include any kind of resource distribution via HDFS/YARN I guess (like JAR
>> files and I don't know what). Seeing that I'm missing an amount of data
>> equal to the size of my input set I'd suspect there must be some sort of
>> shuffling/spilling at play here, but I'm not sure. Maybe there is also some
>> sort of remote I/O involved via sockets or so that I'm missing.
>>
>> Any hints as to where Flink might incur disk I/O are greatly appreciated!
>> I'm also happy with doing the digging