Need Help/Code Examples with reading/writing Parquet File with Flink ?

2018-04-17 Thread sohimankotia
Hi ..

I have a file in HDFS in the format file.snappy.parquet. Can someone please
point me to / help with a code example of reading Parquet files?
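Not part of the original message — a minimal sketch of one approach commonly used at the time: the Flink 1.4 batch API with the Hadoop compatibility layer (flink-hadoop-compatibility) plus parquet-avro on the classpath. The HDFS path and class name are placeholders.

```java
// Sketch (untested): read a snappy-compressed Parquet file as Avro
// GenericRecords via Flink's Hadoop-compatibility input format.
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.parquet.avro.AvroParquetInputFormat;

public class ReadParquetSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        Job job = Job.getInstance();
        // placeholder path
        FileInputFormat.addInputPath(job, new Path("hdfs:///data/file.snappy.parquet"));

        // Parquet's key type is Void; each value is one Avro GenericRecord.
        HadoopInputFormat<Void, GenericRecord> hadoopInput =
                new HadoopInputFormat<>(new AvroParquetInputFormat<GenericRecord>(),
                        Void.class, GenericRecord.class, job);

        DataSet<Tuple2<Void, GenericRecord>> records = env.createInput(hadoopInput);
        records.map(t -> t.f1.toString()).print();
    }
}
```

Snappy decompression is handled by the Parquet reader itself, so nothing extra is needed for the `.snappy.parquet` suffix.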


-Sohi



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING

2018-04-17 Thread Miguel Coimbra
Hello James,

Thanks for the information.
I noticed something suspicious as well: I have chains of operators where
the first operator will ingest the expected number of records but will not
emit any, leaving the following operator empty in a "RUNNING" state.
For example:



I will get back if I find out more.


Best regards,

Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com 

On 17 April 2018 at 20:59, James Yu  wrote:

> Miguel, my colleague and I ran into the same problem yesterday.
> We were expecting Flink to get 4 inputs from Kafka and write the inputs to
> Cassandra, but the operators got stuck after the 1st input was written into
> Cassandra.
> This is how the DAG looks:
> Source: Custom Source -> Map -> (Sink: Unnamed, Sink: Cassandra Sink)
> After we disable the auto chaining (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/#task-chaining-and-resource-groups),
> all 4 inputs are read from Kafka and written into Cassandra.
> We are still figuring out why the chaining causes the blocking.
>
>
> This is a UTF-8 formatted mail
> ---
> James C.-C.Yu
> +886988713275
>
> 2018-04-18 6:57 GMT+08:00 Miguel Coimbra :
>
>> Chesnay, following your suggestions I got access to the web interface and
>> also took a closer look at the debugging logs.
>> I have noticed one problem regarding the web interface port - it keeps
>> changing port now and then during my Java program's execution.
>>
>> Not sure if that is due to my program launching several job executions
>> sequentially, but the fact is that it happened.
>> Since I am accessing the web interface via tunneling, it becomes rather
>> cumbersome to keep adapting it.
>>
>> Another particular problem I'm noticing is that this exception frequently
>> pops up (debugging with log4j):
>>
>> 00:17:54,368 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Releasing slot with slot request id 9055ef473251505dac04c99727106dc9.
>> org.apache.flink.util.FlinkException: Slot is being returned to the SlotPool.
>> at org.apache.flink.runtime.jobmaster.slotpool.SlotPool$ProviderAndOwner.returnAllocatedSlot(SlotPool.java:1521)
>> at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$releaseSlot$0(SingleLogicalSlot.java:130)
>> at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>> at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834)
>> at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155)
>> at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
>> at org.apache.flink.runtime.executiongraph.Execution.releaseAssignedResource(Execution.java:1239)
>> at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:946)
>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1588)
>> at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:593)
>> at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66)
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
>> at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> Don't know if the internals of Flink are explicitly using an exception
>> for control flow, but there are several occurrences of 

Re: assign time attribute after first window group when using Flink SQL

2018-04-17 Thread Ivan Wang
Thanks Fabian. I tried to use "rowtime" and Flink gave me the exception below:

*Exception in thread "main" org.apache.flink.table.api.ValidationException:
SlidingGroupWindow('w2, 'end, 150.rows, 1.rows) is invalid: Event-time
grouping windows on row intervals in a stream environment are currently not
supported.*

Then I tried OverWindows; luckily, they serve my requirement as well.
Now my table query is as below:

.window(Tumble.over("15.seconds").on("timeMill").as("w1"))
.groupBy("symbol, w1")
.select("(w1.rowtime) as end, symbol, price.max as p_max, price.min as p_min")
.window(Over.partitionBy("symbol").orderBy("end").preceding("149.rows").as("w2"))
.select("symbol as symbol_, end, p_max.max over w2 as max, p_min.min over w2 as min");


It works and I can get what I want. However, the result is not ordered by
the rowtime (here I use "end" as an alias). Is this the default behavior, and is
there any way to get the result ordered?

Below is the entire requirement,

Basically there's one raw stream (r1), and I group it first by time as w1,
then by window count as w2. I'd like to compare the "price" field in every
raw event with the same field in the closest preceding event in w2.
If the condition is met, I'd like to use the price value and timestamp in that
event to get one matching event from another raw stream (r2).

CEP sounds like a good idea. But I need to refer to an event in the other stream
(r2) in the current pattern condition (r1). Is it possible to do this using CEP?

Thanks
Ivan



On Mon, Apr 16, 2018 at 4:01 PM, Fabian Hueske  wrote:

> Sorry, I forgot to CC the user mailing list in my reply.
>
> 2018-04-12 17:27 GMT+02:00 Fabian Hueske :
>
>> Hi,
>>
>> Assuming you are using event time, the right function to generate a row
>> time attribute from a window would be "w1.rowtime" instead of "w1.start".
>>
>> The reason why Flink is picky about this is that we must ensure that the
>> result rows of the windows are aligned with the watermarks of the stream.
>>
>> Best, Fabian
>>
>>
>> Ivan Wang  schrieb am So., 8. Apr. 2018, 22:26:
>>
>>> Hi all,
>>>
>>>
>>>
>>> I'd like to use 2 window groups in a chain in my program, as below.
>>>
>>>
>>>
>>> Table myTable = cTable
>>>     .window(Tumble.over("15.seconds").on("timeMill").as("w1"))
>>>     .groupBy("symbol, w1")
>>>     .select("w1.start as start, w1.end as end, symbol, price.max as p_max, price.min as p_min")
>>>     .window(Slide.over("150.rows").every("1.rows").on("start").as("w2"))
>>>     .groupBy("symbol, w2")
>>>     .select("w2.start, w2.end, symbol, p_max.max, p_min.min");
>>>
>>>
>>>
>>>
>>>
>>> However, it throws an error: SlidingGroupWindow('w2, 'start, 150.rows, 1.rows) is invalid: Sliding window expects a time attribute for grouping in a stream environment.
>>>
>>>  at org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:149)
>>>
>>>  at org.apache.flink.table.plan.logical.WindowAggregate.validate(operators.scala:658)
>>>
>>>  at org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1159)
>>>
>>>  at org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1179)
>>>
>>>  at minno.gundam.ReadPattern.main(ReadPattern.java:156)
>>>
>>>
>>>
>>> Is there any way to assign time attribute after the first groupBy (w1)?
>>>
>>>
>>>
>>> Thanks
>>>
>>> Ivan
>>>
>>>
>>>
>>>
>


Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING

2018-04-17 Thread James Yu
Miguel, my colleague and I ran into the same problem yesterday.
We were expecting Flink to get 4 inputs from Kafka and write the inputs to
Cassandra, but the operators got stuck after the 1st input was written into
Cassandra.
This is how the DAG looks:
Source: Custom Source -> Map -> (Sink: Unnamed, Sink: Cassandra Sink)
After we disable the auto chaining (
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/#task-chaining-and-resource-groups),
all 4 inputs are read from Kafka and written into Cassandra.
We are still figuring out why the chaining causes the blocking.
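For reference, a sketch (untested) of the two ways chaining can be turned off in the DataStream API of that era; the job body here is a stand-in, not the original pipeline.

```java
// Sketch: disabling operator chaining globally or per operator.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Option 1: disable chaining for the whole job.
        env.disableOperatorChaining();

        // Option 2: disable chaining for one operator only.
        DataStream<String> mapped = env.fromElements("a", "b", "c")
                .map(String::toUpperCase)
                .disableChaining(); // this map is not chained to its neighbors

        mapped.print();
        env.execute("chaining demo");
    }
}
```

`startNewChain()` is a milder alternative to `disableChaining()`: it breaks the chain before the operator but still allows chaining after it.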


This is a UTF-8 formatted mail
---
James C.-C.Yu
+886988713275

2018-04-18 6:57 GMT+08:00 Miguel Coimbra :

> Chesnay, following your suggestions I got access to the web interface and
> also took a closer look at the debugging logs.
> I have noticed one problem regarding the web interface port - it keeps
> changing port now and then during my Java program's execution.
>
> Not sure if that is due to my program launching several job executions
> sequentially, but the fact is that it happened.
> Since I am accessing the web interface via tunneling, it becomes rather
> cumbersome to keep adapting it.
>
> Another particular problem I'm noticing is that this exception frequently
> pops up (debugging with log4j):
>
> 00:17:54,368 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Releasing slot with slot request id 9055ef473251505dac04c99727106dc9.
> org.apache.flink.util.FlinkException: Slot is being returned to the SlotPool.
> at org.apache.flink.runtime.jobmaster.slotpool.SlotPool$ProviderAndOwner.returnAllocatedSlot(SlotPool.java:1521)
> at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$releaseSlot$0(SingleLogicalSlot.java:130)
> at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
> at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834)
> at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155)
> at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
> at org.apache.flink.runtime.executiongraph.Execution.releaseAssignedResource(Execution.java:1239)
> at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:946)
> at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1588)
> at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:593)
> at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
> at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> Don't know if the internals of Flink are explicitly using an exception for
> control flow, but there are several occurrences of this as time goes by.
>
> Regarding my program itself, I've achieved some progress.
> In my program I need to run a series of Flink jobs, and I need to take
> extra care to make sure no DataSet instance from job *i* is being used in
> an operator in job *i + 1*.
> I believe this was generating the waiting scenarios I describe in an
> earlier email.
> The bottom line is to be extra careful about when job executions are
> actually triggered and to make sure that a DataSet which will need to be
> used in different Flink jobs is available for example as a file in
> secondary 

Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING

2018-04-17 Thread Miguel Coimbra
Chesnay, following your suggestions I got access to the web interface and
also took a closer look at the debugging logs.
I have noticed one problem regarding the web interface port - it keeps
changing port now and then during my Java program's execution.

Not sure if that is due to my program launching several job executions
sequentially, but the fact is that it happened.
Since I am accessing the web interface via tunneling, it becomes rather
cumbersome to keep adapting it.

Another particular problem I'm noticing is that this exception frequently
pops up (debugging with log4j):

00:17:54,368 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Releasing slot with slot request id 9055ef473251505dac04c99727106dc9.
org.apache.flink.util.FlinkException: Slot is being returned to the SlotPool.
at org.apache.flink.runtime.jobmaster.slotpool.SlotPool$ProviderAndOwner.returnAllocatedSlot(SlotPool.java:1521)
at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$releaseSlot$0(SingleLogicalSlot.java:130)
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834)
at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155)
at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
at org.apache.flink.runtime.executiongraph.Execution.releaseAssignedResource(Execution.java:1239)
at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:946)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1588)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:593)
at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Don't know if the internals of Flink are explicitly using an exception for
control flow, but there are several occurrences of this as time goes by.

Regarding my program itself, I've achieved some progress.
In my program I need to run a series of Flink jobs, and I need to take
extra care to make sure no DataSet instance from job *i* is being used in
an operator in job *i + 1*.
I believe this was generating the waiting scenarios I describe in an
earlier email.
The bottom line is to be extra careful about when job executions are
actually triggered and to make sure that a DataSet which will need to be
used in different Flink jobs is available for example as a file in
secondary storage (possibly masked as a memory-mapping) and is exclusively
read from that source.
This means ensuring the job that originally produces a DataSet (for reuse
on a later job) assigns to it a DataSink for secondary storage.

I'm going to keep digging, taking this into account - I will report back if I
manage to fix everything or find a new problem.

Thanks again,



Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com 

On 16 April 2018 at 10:26, Chesnay Schepler  wrote:

> ah yes, currently when you use that method the UI is started on a random
> port. I'm currently fixing that in this PR
>  that will be merged today.
> For now you can enable logging and search for something along the lines of
> "http://: was granted leadership".
>
> Sorry for the inconvenience.
>
> On 16.04.2018 15:04, Miguel Coimbra wrote:
>
> Thanks for the suggestions 

Re: Flink/Kafka POC performance issue

2018-04-17 Thread TechnoMage
Also, I note that none of the operations show any back pressure issues, and the 
records out from the Kafka connector slow down to a crawl.  Are there any known 
issues with Kafka throughput that could be the issue rather than Flink?  I have 
a Java program that monitors the test; it reads all the Kafka messages in 
about 34 min, while the Flink job has yet to read all the Kafka messages 
1hr40min later.

Michael

> On Apr 17, 2018, at 12:58 PM, TechnoMage  wrote:
> 
> Also, I note some messages in the log about my java class not being a valid 
> POJO because it is missing accessors for a field.  Would this impact 
> performance significantly?
> 
> Michael
> 
>> On Apr 17, 2018, at 12:54 PM, TechnoMage > > wrote:
>> 
>> No checkpoints are active.
>> I will try that back end.
>> Yes, using JSONObject subclass for most of the intermediate state, with JSON 
>> strings in and out of Kafka.  I will look at the config page for how to 
>> enable that.
>> 
>> Thank you,
>> Michael
>> 
>>> On Apr 17, 2018, at 12:51 PM, Stephan Ewen >> > wrote:
>>> 
>>> A few ideas how to start debugging this:
>>> 
>>>   - Try deactivating checkpoints. Without that, no work goes into 
>>> persisting rocksdb data to the checkpoint store.
>>>   - Try to swap RocksDB for the FsStateBackend - that reduces serialization 
>>> cost for moving data between heap and offheap (rocksdb).
>>>   - Do you have some expensive types (JSON, etc)? Try activating object 
>>> reuse (which avoids some extra defensive copies)
>>> 
>>> On Tue, Apr 17, 2018 at 5:50 PM, TechnoMage >> > wrote:
>>> Memory use is steady throughout the job, but the CPU utilization drops off 
>>> a cliff.  I assume this is because it becomes I/O bound shuffling managed 
>>> state.
>>> 
>>> Are there any metrics on managed state that can help in evaluating what to 
>>> do next?
>>> 
>>> Michael
>>> 
>>> 
 On Apr 17, 2018, at 7:11 AM, Michael Latta > wrote:
 
 Thanks for the suggestion. The task manager is configured for 8GB of heap, 
 and gets to about 8.3 total. Other java processes (job manager and Kafka). 
 Add a few more. I will check it again but the instances have 16GB same as 
 my laptop that completes the test in <90 min. 
 
 Michael
 
 Sent from my iPad
 
 On Apr 16, 2018, at 10:53 PM, Niclas Hedhman > wrote:
 
> 
> Have you checked memory usage? It could be as simple as either having 
> memory leaks, or aggregating more than you think (sometimes not obvious 
> how much is kept around in memory for longer than one first thinks). If 
> possible, connect FlightRecorder or similar tool and keep an eye on 
> memory. Additionally, I don't have AWS experience to talk of, but IF AWS 
> swaps RAM to disk like regular Linux, then that might be triggered if 
> your JVM heap is bigger than can be handled within the available RAM.
> 
> On Tue, Apr 17, 2018 at 9:26 AM, TechnoMage  > wrote:
> I am doing a short Proof of Concept for using Flink and Kafka in our 
> product.  On my laptop I can process 10M inputs in about 90 min.  On 2 
> different EC2 instances (m4.xlarge and m5.xlarge both 4core 16GB ram and 
> ssd storage) I see the process hit a wall around 50min into the test and 
> short of 7M events processed.  This is running zookeeper, kafka broker, 
> flink all on the same server in all cases.  My goal is to measure single 
> node vs. multi-node and test horizontal scalability, but I would like to 
> figure out why it hits a wall first.  I have the task manager configured 
> with 6 slots and the job has 5 parallelism.  The laptop has 8 threads, 
> and the EC2 instances have 4 threads. On smaller data sets and in the 
> beginning of each test the EC2 instances outpace the laptop.  I will try 
> again with an m5.2xlarge which has 8 threads and 32GB ram to see if that 
> works better for this workload.  Any pointers or ways to get metrics that 
> would help diagnose this would be appreciated.
> 
> Michael
> 
> 
> 
> 
> -- 
> Niclas Hedhman, Software Developer
> http://polygene.apache.org  - New Energy for 
> Java
>>> 
>>> 
>> 
> 



Re: Flink/Kafka POC performance issue

2018-04-17 Thread TechnoMage
Also, I note some messages in the log about my java class not being a valid 
POJO because it is missing accessors for a field.  Would this impact 
performance significantly?
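For context on the log message above: yes, it can matter, because a type that fails Flink's POJO analysis is serialized with the slower generic (Kryo) path. A sketch of the shape the analyzer expects; the `Event` class and its fields are made-up names, not from the actual job.

```java
// Sketch: a Flink-friendly POJO. Non-public fields need BOTH a getter and a
// setter, and a public no-argument constructor is required; otherwise Flink
// falls back to generic serialization.
class Event {
    // Public fields are fine as-is.
    public String symbol;

    // Non-public fields need matching accessors.
    private double price;

    public Event() {} // required no-arg constructor

    public double getPrice() { return price; }
    public void setPrice(double price) { this.price = price; }
}

public class PojoDemo {
    public static void main(String[] args) {
        Event e = new Event();
        e.symbol = "ABC";
        e.setPrice(10.5);
        System.out.println(e.symbol + " " + e.getPrice()); // prints "ABC 10.5"
    }
}
```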

Michael

> On Apr 17, 2018, at 12:54 PM, TechnoMage  wrote:
> 
> No checkpoints are active.
> I will try that back end.
> Yes, using JSONObject subclass for most of the intermediate state, with JSON 
> strings in and out of Kafka.  I will look at the config page for how to 
> enable that.
> 
> Thank you,
> Michael
> 
>> On Apr 17, 2018, at 12:51 PM, Stephan Ewen > > wrote:
>> 
>> A few ideas how to start debugging this:
>> 
>>   - Try deactivating checkpoints. Without that, no work goes into persisting 
>> rocksdb data to the checkpoint store.
>>   - Try to swap RocksDB for the FsStateBackend - that reduces serialization 
>> cost for moving data between heap and offheap (rocksdb).
>>   - Do you have some expensive types (JSON, etc)? Try activating object 
>> reuse (which avoids some extra defensive copies)
>> 
>> On Tue, Apr 17, 2018 at 5:50 PM, TechnoMage > > wrote:
>> Memory use is steady throughout the job, but the CPU utilization drops off a 
>> cliff.  I assume this is because it becomes I/O bound shuffling managed 
>> state.
>> 
>> Are there any metrics on managed state that can help in evaluating what to 
>> do next?
>> 
>> Michael
>> 
>> 
>>> On Apr 17, 2018, at 7:11 AM, Michael Latta >> > wrote:
>>> 
>>> Thanks for the suggestion. The task manager is configured for 8GB of heap, 
>>> and gets to about 8.3 total. Other java processes (job manager and Kafka). 
>>> Add a few more. I will check it again but the instances have 16GB same as 
>>> my laptop that completes the test in <90 min. 
>>> 
>>> Michael
>>> 
>>> Sent from my iPad
>>> 
>>> On Apr 16, 2018, at 10:53 PM, Niclas Hedhman >> > wrote:
>>> 
 
 Have you checked memory usage? It could be as simple as either having 
 memory leaks, or aggregating more than you think (sometimes not obvious 
 how much is kept around in memory for longer than one first thinks). If 
 possible, connect FlightRecorder or similar tool and keep an eye on 
 memory. Additionally, I don't have AWS experience to talk of, but IF AWS 
 swaps RAM to disk like regular Linux, then that might be triggered if your 
 JVM heap is bigger than can be handled within the available RAM.
 
 On Tue, Apr 17, 2018 at 9:26 AM, TechnoMage > wrote:
 I am doing a short Proof of Concept for using Flink and Kafka in our 
 product.  On my laptop I can process 10M inputs in about 90 min.  On 2 
 different EC2 instances (m4.xlarge and m5.xlarge both 4core 16GB ram and 
 ssd storage) I see the process hit a wall around 50min into the test and 
 short of 7M events processed.  This is running zookeeper, kafka broker, 
 flink all on the same server in all cases.  My goal is to measure single 
 node vs. multi-node and test horizontal scalability, but I would like to 
 figure out why it hits a wall first.  I have the task manager configured 
 with 6 slots and the job has 5 parallelism.  The laptop has 8 threads, and 
 the EC2 instances have 4 threads. On smaller data sets and in the beginning 
 of each test the EC2 instances outpace the laptop.  I will try again with 
 an m5.2xlarge which has 8 threads and 32GB ram to see if that works better 
 for this workload.  Any pointers or ways to get metrics that would help 
 diagnose this would be appreciated.
 
 Michael
 
 
 
 
 -- 
 Niclas Hedhman, Software Developer
 http://polygene.apache.org  - New Energy for 
 Java
>> 
>> 
> 



Re: Flink/Kafka POC performance issue

2018-04-17 Thread TechnoMage
No checkpoints are active.
I will try that back end.
Yes, using JSONObject subclass for most of the intermediate state, with JSON 
strings in and out of Kafka.  I will look at the config page for how to enable 
that.

Thank you,
Michael

> On Apr 17, 2018, at 12:51 PM, Stephan Ewen  wrote:
> 
> A few ideas how to start debugging this:
> 
>   - Try deactivating checkpoints. Without that, no work goes into persisting 
> rocksdb data to the checkpoint store.
>   - Try to swap RocksDB for the FsStateBackend - that reduces serialization 
> cost for moving data between heap and offheap (rocksdb).
>   - Do you have some expensive types (JSON, etc)? Try activating object reuse 
> (which avoids some extra defensive copies)
> 
> On Tue, Apr 17, 2018 at 5:50 PM, TechnoMage  > wrote:
> Memory use is steady throughout the job, but the CPU utilization drops off a 
> cliff.  I assume this is because it becomes I/O bound shuffling managed state.
> 
> Are there any metrics on managed state that can help in evaluating what to do 
> next?
> 
> Michael
> 
> 
>> On Apr 17, 2018, at 7:11 AM, Michael Latta > > wrote:
>> 
>> Thanks for the suggestion. The task manager is configured for 8GB of heap, 
>> and gets to about 8.3 total. Other java processes (job manager and Kafka). 
>> Add a few more. I will check it again but the instances have 16GB same as my 
>> laptop that completes the test in <90 min. 
>> 
>> Michael
>> 
>> Sent from my iPad
>> 
>> On Apr 16, 2018, at 10:53 PM, Niclas Hedhman > > wrote:
>> 
>>> 
>>> Have you checked memory usage? It could be as simple as either having 
>>> memory leaks, or aggregating more than you think (sometimes not obvious how 
>>> much is kept around in memory for longer than one first thinks). If 
>>> possible, connect FlightRecorder or similar tool and keep an eye on memory. 
>>> Additionally, I don't have AWS experience to talk of, but IF AWS swaps RAM 
>>> to disk like regular Linux, then that might be triggered if your JVM heap 
>>> is bigger than can be handled within the available RAM.
>>> 
>>> On Tue, Apr 17, 2018 at 9:26 AM, TechnoMage >> > wrote:
>>> I am doing a short Proof of Concept for using Flink and Kafka in our 
>>> product.  On my laptop I can process 10M inputs in about 90 min.  On 2 
>>> different EC2 instances (m4.xlarge and m5.xlarge both 4core 16GB ram and 
>>> ssd storage) I see the process hit a wall around 50min into the test and 
>>> short of 7M events processed.  This is running zookeeper, kafka broker, 
>>> flink all on the same server in all cases.  My goal is to measure single 
>>> node vs. multi-node and test horizontal scalability, but I would like to 
>>> figure out why it hits a wall first.  I have the task manager configured 
>>> with 6 slots and the job has 5 parallelism.  The laptop has 8 threads, and 
>>> the EC2 instances have 4 threads. On smaller data sets and in the beginning 
>>> of each test the EC2 instances outpace the laptop.  I will try again with 
>>> an m5.2xlarge which has 8 threads and 32GB ram to see if that works better 
>>> for this workload.  Any pointers or ways to get metrics that would help 
>>> diagnose this would be appreciated.
>>> 
>>> Michael
>>> 
>>> 
>>> 
>>> 
>>> -- 
>>> Niclas Hedhman, Software Developer
>>> http://polygene.apache.org  - New Energy for 
>>> Java
> 
> 



Re: Flink/Kafka POC performance issue

2018-04-17 Thread Stephan Ewen
A few ideas how to start debugging this:

  - Try deactivating checkpoints. Without that, no work goes into
persisting rocksdb data to the checkpoint store.
  - Try to swap RocksDB for the FsStateBackend - that reduces serialization
cost for moving data between heap and offheap (rocksdb).
  - Do you have some expensive types (JSON, etc)? Try activating object
reuse (which avoids some extra defensive copies)
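The three suggestions above can be sketched as configuration code (assuming the StreamExecutionEnvironment setup of that Flink version; the checkpoint path is a placeholder):

```java
// Sketch: heap-based FsStateBackend instead of RocksDB, object reuse on,
// and checkpointing simply not enabled.
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DebugSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Keep state on the JVM heap, avoiding ser/de between heap and
        // off-heap RocksDB.
        env.setStateBackend(new FsStateBackend("file:///tmp/flink-state"));

        // Avoid defensive copies of expensive types (e.g. JSON objects)
        // between chained operators. Only safe if user functions do not
        // cache or mutate their input records.
        env.getConfig().enableObjectReuse();

        // Checkpoints stay off as long as enableCheckpointing(...) is
        // never called.
    }
}
```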

On Tue, Apr 17, 2018 at 5:50 PM, TechnoMage  wrote:

> Memory use is steady throughout the job, but the CPU utilization drops off
> a cliff.  I assume this is because it becomes I/O bound shuffling managed
> state.
>
> Are there any metrics on managed state that can help in evaluating what to
> do next?
>
> Michael
>
>
> On Apr 17, 2018, at 7:11 AM, Michael Latta  wrote:
>
> Thanks for the suggestion. The task manager is configured for 8GB of heap,
> and gets to about 8.3 total. Other java processes (job manager and Kafka).
> Add a few more. I will check it again but the instances have 16GB same as
> my laptop that completes the test in <90 min.
>
> Michael
>
> Sent from my iPad
>
> On Apr 16, 2018, at 10:53 PM, Niclas Hedhman  wrote:
>
>
> Have you checked memory usage? It could be as simple as either having
> memory leaks, or aggregating more than you think (sometimes not obvious how
> much is kept around in memory for longer than one first thinks). If
> possible, connect FlightRecorder or similar tool and keep an eye on memory.
> Additionally, I don't have AWS experience to talk of, but IF AWS swaps RAM
> to disk like regular Linux, then that might be triggered if your JVM heap
> is bigger than can be handled within the available RAM.
>
> On Tue, Apr 17, 2018 at 9:26 AM, TechnoMage  wrote:
>
>> I am doing a short Proof of Concept for using Flink and Kafka in our
>> product.  On my laptop I can process 10M inputs in about 90 min.  On 2
>> different EC2 instances (m4.xlarge and m5.xlarge both 4core 16GB ram and
>> ssd storage) I see the process hit a wall around 50min into the test and
>> short of 7M events processed.  This is running zookeeper, kafka broker,
>> flink all on the same server in all cases.  My goal is to measure single
>> node vs. multi-node and test horizontal scalability, but I would like to
>> figure out why it hits a wall first.  I have the task manager configured
>> with 6 slots and the job has 5 parallelism.  The laptop has 8 threads, and
>> the EC2 instances have 4 threads. On smaller data sets and in the beginning
>> of each test the EC2 instances outpace the laptop.  I will try again with
>> an m5.2xlarge which has 8 threads and 32GB ram to see if that works better
>> for this workload.  Any pointers or ways to get metrics that would help
>> diagnose this would be appreciated.
>>
>> Michael
>>
>>
>
>
> --
> Niclas Hedhman, Software Developer
> http://polygene.apache.org - New Energy for Java
>
>
>


Re: InterruptedException when async function is cancelled

2018-04-17 Thread Stephan Ewen
Agreed.

It is fixed in 1.5 and in the 1.4.x branch. The fix came after 1.4.2, so it's
not released as of now.
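
For anyone hitting this on an affected version: the timeout Timo refers to below is a regular flink-conf.yaml entry. The value here is only an example (the setting is in milliseconds; the default is around 7500 ms, matching the "7 seconds" mentioned):

```yaml
# flink-conf.yaml -- give timer services more time to finish on cancellation.
# 30000 ms is purely illustrative; pick a value that fits your jobs.
task.cancellation.timers.timeout: 30000
```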

On Tue, Apr 17, 2018 at 7:47 PM, Ken Krugler 
wrote:

> Hi Timo,
>
> [Resending from an address the Apache list server likes…]
>
> I discussed this with Till during Flink Forward, and he said it looks like
> the expected result when cancelling, as that will cause all operators to be
> interrupted, which in turn generates the stack trace I’m seeing.
>
> As to whether it’s a bug or not - I guess not.
>
> But it would be nice if something like this (expected action) wasn’t being
> logged as an error.
>
> Regards,
>
> — Ken
>
>
> On Mar 26, 2018, at 3:19 AM, Timo Walther  wrote:
>
> Hi Ken,
>
> as you can see here [1], Flink interrupts the timer service after a
> certain timeout. If you want to get rid of the exception, you should
> increase "task.cancellation.timers.timeout" in the configuration.
>
> Actually, the default is already set to 7 seconds. So your exception
> should not be thrown so quickly. For me this looks like a bug but please
> let us know if setting the timeout higher solved your problem.
>
> Regards,
> Timo
>
>
> [1] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L358
>
>
> On 21.03.18 at 23:29, Ken Krugler wrote:
>
> Hi all,
>
> When I cancel a job that has async functions, I see this sequence in the
> TaskManager logs:
>
> 2018-03-21 14:51:34,471 INFO  org.apache.flink.runtime.taskmanager.Task
>   - Attempting to cancel task AsyncFunctionName (1/1) (
> fcb7bbe7cd89f1167f8a656b0f2fdaf9).
> 2018-03-21 14:51:34,471 INFO  org.apache.flink.runtime.taskmanager.Task
>   - AsyncFunctionName (1/1) (
> fcb7bbe7cd89f1167f8a656b0f2fdaf9) switched from RUNNING to CANCELING.
> 2018-03-21 14:51:34,471 INFO  org.apache.flink.runtime.taskmanager.Task
>   - Triggering cancellation of task
> code AsyncFunctionName (1/1) (fcb7bbe7cd89f1167f8a656b0f2fdaf9).
>
> And then less than a second later...
>
> 2018-03-21 14:51:35,315 ERROR 
> org.apache.flink.streaming.runtime.tasks.StreamTask
>   - Could not shut down timer service
> java.lang.InterruptedException
> at java.util.concurrent.locks.AbstractQueuedSynchronizer$
> ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2067)
> at java.util.concurrent.ThreadPoolExecutor.awaitTermination(
> ThreadPoolExecutor.java:1465)
> at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.
> shutdownAndAwaitPending(SystemProcessingTimeService.java:197)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:317)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:747)
>
> Followed shortly thereafter by a call to the async function’s close()
> method, which logs:
>
> 2018-03-21 14:51:35,334 DEBUG com.scaleunlimited.utils.ThreadedExecutor
>- Shutting down the AsyncFunctionName thread pool
>
> And finally…
>
> 2018-03-21 14:51:35,340 INFO  org.apache.flink.runtime.taskmanager.Task
>   - AsyncFunctionName (1/1) (
> fcb7bbe7cd89f1167f8a656b0f2fdaf9) switched from CANCELING to CANCELED.
> 2018-03-21 14:51:35,340 INFO  org.apache.flink.runtime.taskmanager.Task
>   - Freeing task resources for AsyncFunctionName (1/1) (
> fcb7bbe7cd89f1167f8a656b0f2fdaf9).
>
> I’ve looked through the code, and I don’t see any place where I’m
> interrupting any threads. When I shut down my own thread pool, interrupts
> will be generated, but only for threads used by my pool, and this happens
> after the InterruptedException.
>
> Is this a known issue? Or is there something I can to do to avoid it?
>
> Thanks,
>
> — Ken
>
> --
> Ken Krugler
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
>
>
>
> 
> http://about.me/kkrugler
> +1 530-210-6378
>
>
>
>
>


Re: CaseClassSerializer and/or TraversableSerializer may still not be threadsafe?

2018-04-17 Thread Stephan Ewen
Thanks for reporting this, also thanks for checking out that this works
with RocksDB and also with synchronous checkpoints.

I would assume that this issue lies not in the serializer itself, but in
accidental sharing in the FsStateBackend async snapshots.
Do you know if the issue still exists in Flink 1.4.2?
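
To illustrate what "accidental sharing" would mean here (a generic sketch, not Flink internals): if an asynchronous snapshot holds references to the same mutable objects the running task keeps mutating, the snapshot can be corrupted mid-write. Taking a synchronous deep copy before handing state to the async writer avoids that:

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only (plain Java, not Flink code): a snapshot taken as a
// synchronous deep copy is immune to later mutations by the task thread.
public class SnapshotCopy {
    // Deep-copies the state before handing it to an asynchronous writer.
    public static List<int[]> snapshot(List<int[]> state) {
        List<int[]> copy = new ArrayList<>();
        for (int[] entry : state) {
            copy.add(entry.clone()); // copy each mutable element, not just the list
        }
        return copy;
    }

    public static void main(String[] args) {
        List<int[]> state = new ArrayList<>();
        state.add(new int[]{1, 2, 3});

        List<int[]> snap = snapshot(state);

        // The task keeps mutating its state after the snapshot was taken...
        state.get(0)[0] = 99;
        state.add(new int[]{4});

        // ...but the snapshot is unaffected.
        System.out.println(snap.size() + " " + snap.get(0)[0]); // prints "1 1"
    }
}
```

If the copy were shallow (sharing the int[] entries), the mutation would leak into the snapshot, which is the kind of bug the stack trace above is symptomatic of.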

On Tue, Apr 17, 2018 at 6:14 PM, joshlemer  wrote:

> Hello all, I am running Flink 1.4.0 on Amazon EMR, and find that
> asynchronous
> snapshots fail when using the Filesystem back-end. Synchronous snapshots
> succeed, and RocksDB snapshots succeed (both async and sync), but async
> Filesystem snapshots fail with this error:
>
> java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
> at java.util.ArrayList.rangeCheck(ArrayList.java:657)
> at java.util.ArrayList.set(ArrayList.java:448)
> at
> com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(
> MapReferenceResolver.java:56)
> at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:875)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:710)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(
> KryoSerializer.java:189)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.
> copy(CaseClassSerializer.scala:101)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.
> copy(CaseClassSerializer.scala:32)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$
> anonfun$copy$1.apply(TraversableSerializer.scala:69)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$
> anonfun$copy$1.apply(TraversableSerializer.scala:69)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(
> TraversableSerializer.scala:69)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(
> TraversableSerializer.scala:33)
> at
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(
> CopyOnWriteStateTable.java:282)
> at
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(
> CopyOnWriteStateTable.java:306)
> at
> org.apache.flink.runtime.state.heap.HeapValueState.
> value(HeapValueState.java:55)
> at
> net.districtm.segmentsync.processing.JoinSegmentMappingWithSegmentA
> ssignments.enqueueSegmentAssignment(JoinSegmentMappingWithSegmentA
> ssignments.scala:102)
> at
> net.districtm.segmentsync.processing.JoinSegmentMappingWithSegmentA
> ssignments.processElement2(JoinSegmentMappingWithSegmentA
> ssignments.scala:218)
> at
> net.districtm.segmentsync.processing.JoinSegmentMappingWithSegmentA
> ssignments.processElement2(JoinSegmentMappingWithSegmentA
> ssignments.scala:76)
> at
> org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.
> processElement2(KeyedCoProcessOperator.java:86)
> at
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.
> processInput(StreamTwoInputProcessor.java:270)
> at
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(
> TwoInputStreamTask.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:264)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
>
> This stack trace occurs when I am trying to access the value of a
>
> `ValueState[scala.collection.mutable.PriorityQueue[(
> AJavaObjectThatUsesTwitterChillProtoSerialization,
> Long, scala.collection.mutable.BitSet)]` while a checkpoint is going on.
>
> I have found similar errors occurring in already-fixed tickets like this
> one:
> https://issues.apache.org/jira/browse/FLINK-7484
> which is part of this umbrella issue:
> https://issues.apache.org/jira/browse/FLINK-7830
>
> However these tickets are apparently resolved, maybe the bug has not been
> completely fixed? Or maybe I am making a mistake in programming? When I get
> the value of the state, I do mutate it, and I also mutate the
> mutable.BitSet
> before persisting again. But as far as I know this is perfectly ok by flink
> yes?
>
> Thanks for any help or pointers!
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: InterruptedException when async function is cancelled

2018-04-17 Thread Ken Krugler
Hi Timo,

[Resending from an address the Apache list server likes…]

I discussed this with Till during Flink Forward, and he said it looks like the 
expected result when cancelling, as that will cause all operators to be 
interrupted, which in turn generates the stack trace I’m seeing.

As to whether it’s a bug or not - I guess not.

But it would be nice if something like this (expected action) wasn’t being 
logged as an error.

Regards,

— Ken


> On Mar 26, 2018, at 3:19 AM, Timo Walther wrote:
> 
> Hi Ken,
> 
> as you can see here [1], Flink interrupts the timer service after a certain 
> timeout. If you want to get rid of the exception, you should increase 
> "task.cancellation.timers.timeout" in the configuration.
> 
> Actually, the default is already set to 7 seconds. So your exception should 
> not be thrown so quickly. For me this looks like a bug but please let us know 
> if setting the timeout higher solved your problem.
> 
> Regards,
> Timo
> 
> 
> [1] 
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L358
>  
> 
> 
> 
> On 21.03.18 at 23:29, Ken Krugler wrote:
>> Hi all,
>> 
>> When I cancel a job that has async functions, I see this sequence in the 
>> TaskManager logs:
>> 
>> 2018-03-21 14:51:34,471 INFO  org.apache.flink.runtime.taskmanager.Task  
>>- Attempting to cancel task AsyncFunctionName (1/1) 
>> (fcb7bbe7cd89f1167f8a656b0f2fdaf9).
>> 2018-03-21 14:51:34,471 INFO  org.apache.flink.runtime.taskmanager.Task  
>>- AsyncFunctionName (1/1) (fcb7bbe7cd89f1167f8a656b0f2fdaf9) 
>> switched from RUNNING to CANCELING.
>> 2018-03-21 14:51:34,471 INFO  org.apache.flink.runtime.taskmanager.Task  
>>- Triggering cancellation of task code AsyncFunctionName 
>> (1/1) (fcb7bbe7cd89f1167f8a656b0f2fdaf9).
>> 
>> And then less than a second later...
>> 
>> 2018-03-21 14:51:35,315 ERROR 
>> org.apache.flink.streaming.runtime.tasks.StreamTask   - Could not 
>> shut down timer service
>> java.lang.InterruptedException
>>  at 
>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2067)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1465)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>  at java.lang.Thread.run(Thread.java:747)
>> 
>> Followed shortly thereafter by a call to the async function’s close() 
>> method, which logs:
>> 
>> 2018-03-21 14:51:35,334 DEBUG com.scaleunlimited.utils.ThreadedExecutor  
>> - Shutting down the AsyncFunctionName thread pool 
>> 
>> And finally…
>> 
>> 2018-03-21 14:51:35,340 INFO  org.apache.flink.runtime.taskmanager.Task  
>>- AsyncFunctionName (1/1) (fcb7bbe7cd89f1167f8a656b0f2fdaf9) 
>> switched from CANCELING to CANCELED.
>> 2018-03-21 14:51:35,340 INFO  org.apache.flink.runtime.taskmanager.Task  
>>- Freeing task resources for AsyncFunctionName (1/1) 
>> (fcb7bbe7cd89f1167f8a656b0f2fdaf9).
>> 
>> I’ve looked through the code, and I don’t see any place where I’m 
>> interrupting any threads. When I shut down my own thread pool, interrupts 
>> will be generated, but only for threads used by my pool, and this happens 
>> after the InterruptedException.
>> 
>> Is this a known issue? Or is there something I can to do to avoid it?
>> 
>> Thanks,
>> 
>> — Ken 
>> 
>> --
>> Ken Krugler
>> http://www.scaleunlimited.com 
>> custom big data solutions & training
>> Hadoop, Cascading, Cassandra & Solr
>> 
> 


http://about.me/kkrugler 
+1 530-210-6378







CaseClassSerializer and/or TraversableSerializer may still not be threadsafe?

2018-04-17 Thread joshlemer
Hello all, I am running Flink 1.4.0 on Amazon EMR, and find that asynchronous
snapshots fail when using the Filesystem back-end. Synchronous snapshots
succeed, and RocksDB snapshots succeed (both async and sync), but async
Filesystem snapshots fail with this error:

java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
at java.util.ArrayList.rangeCheck(ArrayList.java:657)
at java.util.ArrayList.set(ArrayList.java:448)
at
com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:56)
at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:875)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:710)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
at
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
at
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
at
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
at
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at
org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
at
org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
at
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:282)
at
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:306)
at
org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
at
net.districtm.segmentsync.processing.JoinSegmentMappingWithSegmentAssignments.enqueueSegmentAssignment(JoinSegmentMappingWithSegmentAssignments.scala:102)
at
net.districtm.segmentsync.processing.JoinSegmentMappingWithSegmentAssignments.processElement2(JoinSegmentMappingWithSegmentAssignments.scala:218)
at
net.districtm.segmentsync.processing.JoinSegmentMappingWithSegmentAssignments.processElement2(JoinSegmentMappingWithSegmentAssignments.scala:76)
at
org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement2(KeyedCoProcessOperator.java:86)
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:270)
at
org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)

This stack trace occurs when I am trying to access the value of a

`ValueState[scala.collection.mutable.PriorityQueue[(AJavaObjectThatUsesTwitterChillProtoSerialization,
Long, scala.collection.mutable.BitSet)]` while a checkpoint is going on.

I have found similar errors occurring in already-fixed tickets like this
one:
https://issues.apache.org/jira/browse/FLINK-7484 
which is part of this umbrella issue:
https://issues.apache.org/jira/browse/FLINK-7830

However these tickets are apparently resolved, maybe the bug has not been
completely fixed? Or maybe I am making a mistake in programming? When I get
the value of the state, I do mutate it, and I also mutate the mutable.BitSet
before persisting again. But as far as I know this is perfectly ok by flink
yes?

Thanks for any help or pointers! 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink/Kafka POC performance issue

2018-04-17 Thread TechnoMage
Memory use is steady throughout the job, but the CPU utilization drops off a 
cliff.  I assume this is because it becomes I/O bound shuffling managed state.

Are there any metrics on managed state that can help in evaluating what to do 
next?

Michael

> On Apr 17, 2018, at 7:11 AM, Michael Latta  wrote:
> 
> Thanks for the suggestion. The task manager is configured for 8GB of heap, 
> and gets to about 8.3 total. Other Java processes (job manager and Kafka) 
> add a few more. I will check it again but the instances have 16GB, same as my 
> laptop that completes the test in <90 min. 
> 
> Michael
> 
> Sent from my iPad
> 
On Apr 16, 2018, at 10:53 PM, Niclas Hedhman wrote:
> 
>> 
>> Have you checked memory usage? It could be as simple as either having memory 
>> leaks, or aggregating more than you think (sometimes not obvious how much is 
>> kept around in memory for longer than one first thinks). If possible, 
>> connect FlightRecorder or similar tool and keep an eye on memory. 
>> Additionally, I don't have AWS experience to talk of, but IF AWS swaps RAM 
>> to disk like regular Linux, then that might be triggered if your JVM heap is 
>> bigger than can be handled within the available RAM.
>> 
>> On Tue, Apr 17, 2018 at 9:26 AM, TechnoMage wrote:
>> I am doing a short Proof of Concept for using Flink and Kafka in our 
>> product.  On my laptop I can process 10M inputs in about 90 min.  On 2 
>> different EC2 instances (m4.xlarge and m5.xlarge both 4core 16GB ram and ssd 
>> storage) I see the process hit a wall around 50min into the test and short 
>> of 7M events processed.  This is running zookeeper, kafka broker, flink all 
>> on the same server in all cases.  My goal is to measure single node vs. 
>> multi-node and test horizontal scalability, but I would like to figure out 
>> why it hits a wall first.  I have the task manager configured with 6 slots 
>> and the job has 5 parallelism.  The laptop has 8 threads, and the EC2 
>> instances have 4 threads. On smaller data sets and in the beginning of each 
>> test the EC2 instances outpace the laptop.  I will try again with an 
>> m5.2xlarge which has 8 threads and 32GB ram to see if that works better for 
>> this workload.  Any pointers or ways to get metrics that would help diagnose 
>> this would be appreciated.
>> 
>> Michael
>> 
>> 
>> 
>> 
>> -- 
>> Niclas Hedhman, Software Developer
>> http://polygene.apache.org - New Energy for Java



Flink job testing with

2018-04-17 Thread Chauvet, Thomas
Hi everybody,

I would like to test a kafka / flink process in scala. I would like to proceed 
as in the integration testing documentation : 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html#integration-testing
 with Kafka as source and sink.

For example, I have a topic kafka as source for flink (I use 
FlinkKafkaConsumer011), then I do some process with Flink, then I send the 
stream to Kafka (FlinkKafkaProducer011).

Any idea on how to do that ? Or better, any example ?

Thanks
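
Not a full answer, but one pattern the linked testing docs encourage can be sketched generically (plain Java below, without the actual Flink/Kafka APIs; the class and method names are made up for illustration): keep the record transformation separate from the Kafka wiring, so the same logic can be unit-tested against in-memory collections and only the end-to-end path needs a real or embedded Kafka.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Sketch of the "separate logic from wiring" testing pattern. In production
// the same transform would sit between a FlinkKafkaConsumer011 source and a
// FlinkKafkaProducer011 sink; in tests it is driven with plain lists.
public class PipelineLogic {
    // The business logic under test (trivial example: uppercase each record).
    public static String transform(String record) {
        return record.toUpperCase();
    }

    // Test harness: run the logic over a list instead of a Kafka topic.
    public static List<String> runOn(List<String> input) {
        return input.stream().map(PipelineLogic::transform).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(runOn(Arrays.asList("a", "b"))); // prints "[A, B]"
    }
}
```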


Re: Flink/Kafka POC performance issue

2018-04-17 Thread Michael Latta
Thanks for the suggestion. The task manager is configured for 8GB of heap, and 
gets to about 8.3 total. Other Java processes (job manager and Kafka) add a 
few more. I will check it again but the instances have 16GB, same as my laptop 
that completes the test in <90 min. 

Michael

Sent from my iPad

> On Apr 16, 2018, at 10:53 PM, Niclas Hedhman  wrote:
> 
> 
> Have you checked memory usage? It could be as simple as either having memory 
> leaks, or aggregating more than you think (sometimes not obvious how much is 
> kept around in memory for longer than one first thinks). If possible, connect 
> FlightRecorder or similar tool and keep an eye on memory. Additionally, I 
> don't have AWS experience to talk of, but IF AWS swaps RAM to disk like 
> regular Linux, then that might be triggered if your JVM heap is bigger than 
> can be handled within the available RAM.
> 
>> On Tue, Apr 17, 2018 at 9:26 AM, TechnoMage  wrote:
>> I am doing a short Proof of Concept for using Flink and Kafka in our 
>> product.  On my laptop I can process 10M inputs in about 90 min.  On 2 
>> different EC2 instances (m4.xlarge and m5.xlarge both 4core 16GB ram and ssd 
>> storage) I see the process hit a wall around 50min into the test and short 
>> of 7M events processed.  This is running zookeeper, kafka broker, flink all 
>> on the same server in all cases.  My goal is to measure single node vs. 
>> multi-node and test horizontal scalability, but I would like to figure out 
>> why it hits a wall first.  I have the task manager configured with 6 slots 
>> and the job has 5 parallelism.  The laptop has 8 threads, and the EC2 
>> instances have 4 threads. On smaller data sets and in the beginning of each 
>> test the EC2 instances outpace the laptop.  I will try again with an 
>> m5.2xlarge which has 8 threads and 32GB ram to see if that works better for 
>> this workload.  Any pointers or ways to get metrics that would help diagnose 
>> this would be appreciated.
>> 
>> Michael
>> 
> 
> 
> 
> -- 
> Niclas Hedhman, Software Developer
> http://polygene.apache.org - New Energy for Java


Re: How to configure the reporting interval for the flink metric monitoring.

2018-04-17 Thread Chesnay Schepler
You can configure the interval by setting 
metrics.reporter.<name>.interval as described in the metrics documentation.
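
Concretely, for a Graphite reporter the flink-conf.yaml entries look roughly like this (the reporter name "grph", the host/port, and the 10-second interval are all just examples):

```yaml
# flink-conf.yaml -- example Graphite reporter with a shorter interval.
metrics.reporters: grph
metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.grph.host: localhost
metrics.reporter.grph.port: 2003
metrics.reporter.grph.interval: 10 SECONDS
```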


On 17.04.2018 13:40, Ganesh Manal wrote:


HI,

When I configure the counter ( in default system metric ).

I could see the counter getting monitored on reporting tool ( graphite 
in my case ).


But the default reporting interval is 60 seconds.

Is there a way to configure the interval for metric reporting ?

Thanks & Regards,
Ganesh Manal





How to configure the reporting interval for the flink metric monitoring.

2018-04-17 Thread Ganesh Manal
HI,

When I configure the counter ( in default system metric ).
I could see the counter getting monitored on reporting tool ( graphite in my 
case ).
But the default reporting interval is 60 seconds.
Is there a way to configure the interval for metric reporting ?

Thanks & Regards,
Ganesh Manal



Re: State-machine-based search logic in Flink ?

2018-04-17 Thread Fabian Hueske
Hi Esa,

What do you mean by "individual searches in the Table API"?
There is some work (a pending PR [1]) to integrate the MATCH_RECOGNIZE
clause (SQL 2016) [2] into Flink's SQL which basically adds a SQL syntax
for the CEP library.

Best, Fabian

[1] https://github.com/apache/flink/pull/4502
[2] https://modern-sql.com/feature/match_recognize

2018-04-17 10:07 GMT+02:00 Esa Heikkinen :

> Hi
>
>
>
> I am not sure I have understood everything, but is it possible to build
> some kind of state-machine-based search logic, for example on top of the
> individual searches in the Table API (using CsvTableSource)?
>
>
>
> Best, Esa
>
>
>


State-machine-based search logic in Flink ?

2018-04-17 Thread Esa Heikkinen
Hi

I am not sure I have understood everything, but is it possible to build some kind
of state-machine-based search logic, for example on top of the individual searches
in the Table API (using CsvTableSource)?

Best, Esa



rest.port is reset to 0 by YarnEntrypointUtils

2018-04-17 Thread Dongwon Kim
Hi,

I'm trying to launch a dispatcher on top of YARN by executing "yarn-session.sh" 
on the command line.

To access the rest endpoint outside the cluster, I need to assign a port from 
an allowed range.

YarnEntrypointUtils, however, sets rest.port to 0 for random binding.

Is there any reason for this?


Best,

- Dongwon

Re: How to rebalance a table without converting to dataset

2018-04-17 Thread Shuyi Chen
Hi Darshan, thanks for raising the problem. We do have similar use of
rebalancing in Flink SQL, where we want to rebalance the Kafka input with
more partitions to increase parallelism in streaming.

As Fabian suggests, rebalancing is not relation algebra. The closest use of
the operation I can find in databases is in Vertica (REBALANCE_TABLE),
but it's used more as a one-time rebalance operation of the data after
adding/removing nodes. However, on the data processing side, I think this
might deserve more attention because we can't easily modify the input data
source, e.g. number of Kafka partitions.

The closest I can think of to enable this feature is DDL in SQL, e.g.
something like ALTER TABLE REBALANCE in SQL. With this DDL statement, it
will cause a rebalance() call when StreamTableSource.getDataStream
or BatchTableSource.getDataSet is invoked. In such a case, we don't need to
touch the parser or planner in Calcite. For the table API, a possible
solution would be to add a rebalance() api in table API, but will need to
close out the LogicalPlan every time rebalance() is called, so we won't
need to touch the Calcite planner.
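
For readers unfamiliar with the operator: DataSet#rebalance() redistributes records round-robin across the parallel partitions, so each partition ends up with an (almost) equal share regardless of input skew. A standalone sketch of that distribution (plain Java, not the Flink API):

```java
import java.util.Arrays;

// Conceptual model of round-robin rebalancing: record i goes to partition
// i % numPartitions, so partition counts differ by at most one.
public class RoundRobin {
    // Returns how many of `numRecords` records each partition receives.
    public static int[] distribute(int numRecords, int numPartitions) {
        int[] counts = new int[numPartitions];
        for (int i = 0; i < numRecords; i++) {
            counts[i % numPartitions]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 10 records over 4 partitions: shares differ by at most one.
        System.out.println(Arrays.toString(distribute(10, 4))); // prints "[3, 3, 2, 2]"
    }
}
```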

On Mon, Apr 16, 2018 at 5:41 AM, Fabian Hueske  wrote:

> Hi Darshan,
>
> You are right. there's currently no rebalancing operation on the Table
> API.
> I see that this might be a good feature, not sure though how easy it would
> be to integrate because we need to pass it through the Calcite optimizer
> and rebalancing is not a relational operation.
>
> For now, converting to DataSet and back to Table is the only option.
>
> Best, Fabian
>
> 2018-04-13 14:33 GMT+02:00 Darshan Singh :
>
>> Hi
>>
>> I have a table and I want to rebalance the data so that each partition is
>> equal. I can convert to a dataset, rebalance, and then convert back to a table.
>>
>> I couldn't find any rebalance in the Table API. Does anyone know a better
>> way to rebalance table data?
>>
>> Thanks
>>
>
>


-- 
"So you have to trust that the dots will somehow connect in your future."


Re: Kafka topic partition skewness causes watermark not being emitted

2018-04-17 Thread Juho Autio
A possible workaround while waiting for FLINK-5479, if someone is hitting
the same problem: we chose to send "heartbeat" messages periodically to all
topics & partitions found on our Kafka. We do that through the service that
normally writes to our Kafka. This way every partition always has some
~recent timestamps.
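
The scheduling side of that workaround can be sketched as follows (plain Java, no Kafka client; `send` is a hypothetical stand-in for the real producer call that writes the heartbeat record to a topic partition):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of a service-side heartbeat loop: periodically emit a small
// heartbeat record to every known partition so each one always carries a
// recent event timestamp, keeping per-partition watermarks advancing.
public class HeartbeatEmitter {
    private final Map<Integer, Long> lastHeartbeat = new HashMap<>();

    // Stand-in for the real producer.send(topic, partition, heartbeat).
    void send(int partition, long timestamp) {
        lastHeartbeat.put(partition, timestamp);
    }

    // Called on a timer (e.g. from a ScheduledExecutorService): one
    // heartbeat per partition, stamped with the current event time.
    public void emitHeartbeats(List<Integer> partitions, long now) {
        for (int p : partitions) {
            send(p, now);
        }
    }

    public Long lastSeen(int partition) {
        return lastHeartbeat.get(partition);
    }

    public static void main(String[] args) {
        HeartbeatEmitter e = new HeartbeatEmitter();
        e.emitHeartbeats(Arrays.asList(0, 1, 2), System.currentTimeMillis());
        System.out.println(e.lastHeartbeat.size()); // prints "3"
    }
}
```

The downstream watermark assigner then only needs heartbeat records to carry ordinary timestamps; they can be filtered out after timestamp extraction.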

On Wed, Dec 13, 2017 at 1:06 PM, Gerard Garcia  wrote:

> Thanks Gordon.
>
> Don't worry, I'll be careful to not have empty partitions until the next
> release.
> Also, I'll keep an eye to FLINK-5479 and if at some point I see that
> there is a fix and the issue bothers us too much I'll try to apply the
> patch myself to the latest stable release.
>
> Gerard
>
> On Wed, Dec 13, 2017 at 10:31 AM, Tzu-Li (Gordon) Tai wrote:
>
>> Hi,
>>
>> I've just elevated FLINK-5479 to BLOCKER for 1.5.
>>
>> Unfortunately, AFAIK there is no easy workaround solution for this issue
>> yet in the releases so far.
>> The min watermark logic that controls per-partition watermark emission is
>> hidden inside the consumer, making it hard to work around it.
>>
>> One possible solution I can imagine, but perhaps not that trivial, is to
>> inject some special marker event into all partitions periodically.
>> The watermark assigner should be able to recognize this special marker
>> and try to provide some watermark for it.
>> Another option is that I can provide some patch you can apply for a
>> custom build of the Kafka connector that handles partition idleness
>> properly.
>> However, given that we're aiming for a faster release cycle for Flink 1.5
>> (proposed release date is Feb. 2018), it might not be worth the extra
>> maintenance effort on your side of a custom build.
>>
>> Best,
>> Gordon
>>
>>
>> On Tue, Dec 12, 2017 at 9:28 PM, gerardg  wrote:
>>
>>> I'm also affected by this behavior. There are no updates in FLINK-5479
>>> but
>>> did you manage to find a way to workaround this?
>>>
>>> Thanks,
>>>
>>> Gerard
>>>
>>>
>>>
>>> --
>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>
>>
>