Re: [DISCUSS] Proposal to support disk spilling in HeapKeyedStateBackend

2019-05-29 Thread Stefan Richter
Hi Yu,

Sorry for the late reaction. As already discussed internally, I think this is a 
very good proposal and design that can help to improve a major limitation of 
the current state backend. I think that most discussion is happening in the 
design doc and I left my comments there. Looking forward to seeing this 
integrated with Flink soon!

Best,
Stefan

> On 24. May 2019, at 14:50, Yu Li  wrote:
> 
> Hi All,
> 
> As mentioned in our talk[1] given at Flink Forward China 2018, we have improved 
> HeapKeyedStateBackend to support disk spilling and put it in production here 
> in Alibaba for last year's Singles' Day. Now we're ready to upstream our work 
> and the design doc is up for review[2]. Please let us know your thoughts on 
> the feature; any comments are welcome and appreciated.
> 
> We plan to keep the discussion open for at least 72 hours, and will create 
> umbrella jira and subtasks if no objections. Thanks.
> 
> Below is a brief description about the motivation of the work, FYI:
> 
> HeapKeyedStateBackend is one of the two KeyedStateBackends in Flink. Since 
> state lives as Java objects on the heap in HeapKeyedStateBackend and 
> de/serialization only happens during state snapshot and restore, it 
> outperforms RocksDBKeyedStateBackend when all data can reside in memory.
> However, along with this advantage, HeapKeyedStateBackend also has its 
> shortcomings, and the most painful one is the difficulty of estimating the 
> maximum heap size (Xmx) to set: we will suffer from GC impact once the 
> heap memory is not enough to hold all state data. There are several 
> (inevitable) causes for such a scenario, including (but not limited to):
> * Memory overhead of the Java object representation (tens of times the 
> serialized data size).
> * Data floods caused by burst traffic.
> * Data accumulation caused by source malfunction.
> To resolve this problem, we propose a solution that supports spilling state 
> data to disk before heap memory is exhausted. We will monitor the heap usage, 
> choose the coldest data to spill, and reload it automatically when heap memory 
> is regained after data removal or TTL expiration.
> 
> [1] https://files.alicdn.com/tpsservice/1df9ccb8a7b6b2782a558d3c32d40c19.pdf 
> 
> [2] 
> https://docs.google.com/document/d/1rtWQjIQ-tYWt0lTkZYdqTM6gQUleV8AXrfTOyWUZMf4/edit?usp=sharing
>  
> 
> Best Regards,
> Yu



Re: RocksDB native checkpoint time

2019-05-03 Thread Stefan Richter
Hi,

out of curiosity, does it happen with jobs that have a large number of states 
(column families) or also for jobs with few column families and just “big state”?

Best,
Stefan

> On 3. May 2019, at 11:04, Piotr Nowojski  wrote:
> 
> Hi Gyula,
> 
> Have you read our tuning guide?
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#tuning-rocksdb
>  
> 
> 
> The synchronous part is mostly about flushing data to disk, so you could try to 
> optimise your setup with that in mind: limiting the size of the page cache, 
> speeding up the writes (using more/faster disks…), etc. Maybe you can also 
> look at online resources on how to speed up calls to 
> `org.rocksdb.Checkpoint#create`.
> 
> Piotrek
> 
>> On 3 May 2019, at 10:30, Gyula Fóra wrote:
>> 
>> Hi!
>> 
>> Does anyone know what parameters might affect the RocksDB native checkpoint 
>> time? (basically the sync part of the rocksdb incremental snapshots)
>> 
>> It seems to take 60-70 secs in some cases for larger state sizes, and I 
>> wonder if there is anything we could tune to reduce this. Maybe it's only a 
>> matter of size, I don't know.
>> 
>> Any ideas would be appreciated :)
>> Gyula
> 



Re: kafka partitions, data locality

2019-04-26 Thread Stefan Richter
Hi Sergey,

The point why this is flagged as beta is actually less about stability and more 
about the fact that this is supposed to be more of a "power user" feature, 
because bad things can happen if your data is not 100% correctly partitioned in 
the same way as Flink would partition it. This is why typically you should only 
use it if the data was partitioned by Flink and you are very sure what you are 
doing, because there is not really something we can do at the API level to 
protect you from mistakes in using this feature. Eventually some runtime 
exceptions might show you that something is going wrong, but that is not 
exactly a good user experience.
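
For reference, using the feature looks roughly like this (the Event type and the
key extractor are placeholders; DataStreamUtils.reinterpretAsKeyedStream is the
experimental entry point in the DataStream API that Dawid mentions below):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;

// the events must already be partitioned by clientId exactly as Flink's
// key-group assignment would partition them
DataStream<Event> source = ...; // e.g. the Kafka source; Event/getClientId are placeholders

KeyedStream<Event, String> keyed =
    DataStreamUtils.reinterpretAsKeyedStream(source, event -> event.getClientId());
// keyed state and windows can now be used without an extra shuffle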

On a different note, there is currently one open issue [1] to be aware 
of in connection with this feature and operator chaining, but at the same time 
this is something that should not be hard to fix for the next minor release.

Best,
Stefan

[1] 
https://issues.apache.org/jira/browse/FLINK-12296?focusedCommentId=16824945&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16824945

> On 26. Apr 2019, at 09:48, Smirnov Sergey Vladimirovich (39833) wrote:
> 
> Hi,
>  
> Dawid, great, thanks!
> Any plans to make it stable? 1.9?
>  
>  
> Regards,
> Sergey
>  
> From: Dawid Wysakowicz [mailto:dwysakow...@apache.org] 
> Sent: Thursday, April 25, 2019 10:54 AM
> To: Smirnov Sergey Vladimirovich (39833) ; Ken Krugler 
> 
> Cc: user@flink.apache.org; d...@flink.apache.org
> Subject: Re: kafka partitions, data locality
>  
> Hi Smirnov,
> 
> Actually there is a way to tell Flink that data is already partitioned. You 
> can try the reinterpretAsKeyedStream[1] method. I must warn you though this 
> is an experimental feature.
> 
> Best,
> 
> Dawid
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/experimental.html#experimental-features
>  
> 
> On 19/04/2019 11:48, Smirnov Sergey Vladimirovich (39833) wrote:
> Hi Ken,
>  
> It’s a bad story for us: even for a small window we have dozens of 
> thousands of events per job, with 10x peaks or even more. And the number of 
> jobs is known to be high. So instead of N operations (our producer/consumer 
> mechanism), with shuffling/resorting (the current Flink implementation) it will be 
> N*ln(N) - a tenfold loss of execution speed!
> For all: what should my next step be? Contribute to Apache Flink? The issues backlog?
>  
>  
> With best regards,
> Sergey
> From: Ken Krugler [mailto:kkrugler_li...@transpac.com]
> Sent: Wednesday, April 17, 2019 9:23 PM
> To: Smirnov Sergey Vladimirovich (39833)
> Subject: Re: kafka partitions, data locality
>  
> Hi Sergey,
>  
> As you surmised, once you do a keyBy/max on the Kafka topic, to group by 
> clientId and find the max, then the topology will have a partition/shuffle to 
> it.
>  
> This is because Flink doesn’t know that client ids don’t span Kafka 
> partitions.
>  
> I don’t know of any way to tell Flink that the data doesn’t need to be 
> shuffled. There was a discussion about adding a keyByWithoutPartitioning a while 
> back, but I don’t think that support was ever added.
>  
> A simple ProcessFunction with MapState (clientId -> max) should allow you to 
> do the same thing without too much custom code. In order to support 
> windowing, you’d use triggers to flush state/emit results.
>  
> — Ken
>  
>  
> On Apr 17, 2019, at 2:33 AM, Smirnov Sergey Vladimirovich (39833) wrote:
>  
> Hello,
>  
> We are planning to use Apache Flink as a core component of our new streaming 
> system for internal processes (finance, banking business) based on Apache 
> Kafka.
> So we are starting some research with Apache Flink, and one of the questions 
> that arises during that work is how Flink handles data locality.
> I'll try to explain: suppose we have a Kafka topic with some kind of events, 
> and these events are grouped by topic partition so that the handler (or a job 
> worker) consuming messages from a partition has all the necessary information 
> for further processing. 
> As an example, say we have clients' payment transactions in a Kafka topic. We 
> group by clientId (transactions with the same clientId go to the same 
> Kafka topic partition) and the task is to find the max transaction per client in 
> sliding windows. In terms of map/reduce there is no need to shuffle data 
> between all topic consumers; maybe it's worth doing within each consumer to 
> gain some speedup by increasing the number of executors within each partition's 
> data.
> And my question is how Flink will work in this case. Does it shuffle all data, 
> or does it have some settings to avoid this extra unnecessary 

Re: Fast restart of a job with a large state

2019-04-18 Thread Stefan Richter
Hi,

If rescaling is the problem, let me clarify that you can currently rescale from 
savepoints and all types of checkpoints (including incremental). If that was 
the only problem, then there is nothing to worry about - the documentation is 
only a bit conservative about this because we will not commit to an API guarantee 
that all future types of checkpoints will be rescalable. But currently they all 
are, and this is also very unlikely to change anytime soon.

Paul, just to comment on your suggestion as well, local recovery would only 
help with failover: 1) it does not help for restarts by the user, and 2) it also 
does not work for rescaling ((2) is a consequence of (1) because failover never 
rescales, only restarts).
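
For completeness, a minimal sketch of the setup that the checkpoint-based restart
discussed in this thread relies on - incremental RocksDB checkpoints plus retained
(externalized) checkpoints. The checkpoint URI and interval are placeholders, and
error handling is omitted:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// incremental checkpoints: only new/changed SST files are uploaded instead of the full state
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
env.enableCheckpointing(60_000);

// keep the last checkpoint after cancellation so it can be used to restart the job
env.getCheckpointConfig().enableExternalizedCheckpoints(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);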

Best,
Stefan

> On 18. Apr 2019, at 12:07, Paul Lam  wrote:
> 
> The URL in my previous mail is wrong, and it should be: 
> 
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#task-local-recovery
>  
> 
> 
> Best,
> Paul Lam
> 
>> On 18 Apr 2019, at 18:04, Paul Lam wrote:
>> 
>> Hi,
>> 
>> Have you tried task local recovery [1]?
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#retained-checkpoints
>>  
>> 
>> 
>> Best,
>> Paul Lam
>> 
>>> On 17 Apr 2019, at 17:46, Sergey Zhemzhitsky wrote:
>>> 
>>> Hi Flinkers,
>>> 
>>> Operating different flink jobs I've discovered that job restarts with
>>> a pretty large state (in my case this is up to 100GB+) take quite a
>>> lot of time. For example, to restart a job (e.g. to update it) the
>>> savepoint is created, and in case of savepoints all the state seems to
>>> be pushed into the distributed store (hdfs in my case) when stopping a
>>> job and pulling this state back when starting the new version of the
>>> job.
>>> 
>>> What I've found by the moment trying to speed up job restarts is:
>>> - using external retained checkpoints [1]; the drawback is that the
>>> job cannot be rescaled during restart
>>> - using external state and storage with the stateless jobs; the
>>> drawback is the necessity of additional network hops to this storage.
>>> 
>>> So I'm wondering whether there are any best practices community knows
>>> and uses to cope with the cases like this?
>>> 
>>> [1] 
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#retained-checkpoints
>>>  
>>> 
>> 
> 



Re: Query on job restoration using relocated savepoint

2019-04-11 Thread Stefan Richter
Hi,

the first case sounds like you made a mistake when editing the paths manually 
and deleted one or more bytes that were not part of the path and thus 
corrupted the meta data. For the second approach, of course you also need to 
replace the paths after reading and before rewriting the metadata. This 
approach is basically the programmatic version of your first attempt, but using 
Flink’s code to avoid the pitfalls of corrupting the file.

Best,
Stefan 

> On 10. Apr 2019, at 19:16, Parth Sarathy  wrote:
> 
> Hi All,
>   We are trying to restore a job using relocated savepoint
> files. As pointed out in the FAQs of savepoint documentation, savepoints
> have absolute paths recorded in them and hence a simple relocation to
> restore the job would fail. As directed in the documentation we tried out
> the simple way to refactor the paths by editing them manually, but the job
> submission failed with an IllegalStateException as noted below :
> Caused by: java.lang.IllegalStateException: Reading invalid
> OperatorStateHandle, type: 50
>at
> org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.deserializeOperatorStateHandle(SavepointV2Serializer.java:499)
> 
> We then went ahead and gave a swing at the second prescribed option of
> utilizing the SavepointV2Serializer for deserializing and serializing the
> metadata file. Even with this approach we observed that the generated
> metadata file still referenced the old absolute path. We are stuck in a
> predicament as of now. How can we set / change the absolute paths
> present in the metadata file using the prescribed SavepointV2Serializer?
> It’d be helpful if you could provide some insight into this.
> 
> Thanks,
> Parth Sarathy
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Query on job restoration using relocated savepoint

2019-04-11 Thread Stefan Richter
Small correction on the first case: more likely you changed the path 
string, but I think those are prefixed by the string length, so that would 
require manual adjustment as well to avoid corrupting the metadata.

> On 11. Apr 2019, at 14:42, Stefan Richter  wrote:
> 
> Hi,
> 
> the first case sounds like you made a mistake when editing the paths manually 
> and deleted one or more bytes that were not part of the path and thus 
> corrupted the meta data. For the second approach, of course you also need to 
> replace the paths after reading and before rewriting the metadata. This 
> approach is basically the programmatic version of your first attempt, but 
> using Flink’s code to avoid the pitfalls of corrupting the file.
> 
> Best,
> Stefan 
> 
>> On 10. Apr 2019, at 19:16, Parth Sarathy  
>> wrote:
>> 
>> Hi All,
>>  We are trying to restore a job using relocated savepoint
>> files. As pointed out in the FAQs of savepoint documentation, savepoints
>> have absolute paths recorded in them and hence a simple relocation to
>> restore the job would fail. As directed in the documentation we tried out
>> the simple way to refactor the paths by editing them manually, but the job
>> submission failed with an IllegalStateException as noted below :
>> Caused by: java.lang.IllegalStateException: Reading invalid
>> OperatorStateHandle, type: 50
>>   at
>> org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.deserializeOperatorStateHandle(SavepointV2Serializer.java:499)
>> 
>> We then went ahead and gave a swing at the second prescribed option of
>> utilizing the SavepointV2Serializer for deserializing and serializing the
>> metadata file. Even with this approach we observed that the generated
>> metadata file still referenced the old absolute path. We are stuck in a
>> predicament as of now. How can we set / change the absolute paths
>> present in the metadata file using the prescribed SavepointV2Serializer?
>> It’d be helpful if you could provide some insight into this.
>> 
>> Thanks,
>> Parth Sarathy
>> 
>> 
>> 
>> --
>> Sent from: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> 



Re: [ANNOUNCE] Apache Flink 1.8.0 released

2019-04-10 Thread Stefan Richter
Congrats and thanks to Aljoscha for managing the release!

Best,
Stefan

> On 10. Apr 2019, at 13:01, Biao Liu  wrote:
> 
> Great news! Thanks Aljoscha and all the contributors.
> 
> Till Rohrmann wrote on Wed, 10 Apr 2019 at 18:11:
> Thanks a lot to Aljoscha for being our release manager and to the community 
> making this release possible!
> 
> Cheers,
> Till
> 
> On Wed, Apr 10, 2019 at 12:09 PM Hequn Cheng wrote:
> Thanks a lot for the great release Aljoscha!
> Also thanks for the work by the whole community. :-)
> 
> Best, Hequn
> 
> On Wed, Apr 10, 2019 at 6:03 PM Fabian Hueske wrote:
> Congrats to everyone!
> 
> Thanks Aljoscha and all contributors.
> 
> Cheers, Fabian
> 
> On Wed, 10 Apr 2019 at 11:54, Congxian Qiu wrote:
> Cool!
> 
> Thanks Aljoscha a lot for being our release manager, and all the others who 
> make this release possible.
> 
> Best, Congxian
> On Apr 10, 2019, 17:47 +0800, Jark Wu wrote:
> > Cheers!
> >
> > Thanks Aljoscha and all others who make 1.8.0 possible.
> >
> > On Wed, 10 Apr 2019 at 17:33, vino yang wrote:
> >
> > > Great news!
> > >
> > > Thanks Aljoscha for being the release manager and thanks to all the
> > > contributors!
> > >
> > > Best,
> > > Vino
> > >
> > > Driesprong, Fokko wrote on Wed, 10 Apr 2019 at 16:54:
> > >
> > > > Great news! Great effort by the community to make this happen. Thanks 
> > > > all!
> > > >
> > > > Cheers, Fokko
> > > >
> > > > On Wed, 10 Apr 2019 at 10:50, Shaoxuan Wang wrote:
> > > >
> > > > > Thanks Aljoscha and all others who made contributions to FLINK 1.8.0.
> > > > > Looking forward to FLINK 1.9.0.
> > > > >
> > > > > Regards,
> > > > > Shaoxuan
> > > > >
> > > > > On Wed, Apr 10, 2019 at 4:31 PM Aljoscha Krettek wrote:
> > > > >
> > > > > > The Apache Flink community is very happy to announce the release of
> > > > > Apache
> > > > > > Flink 1.8.0, which is the next major release.
> > > > > >
> > > > > > Apache Flink® is an open-source stream processing framework for
> > > > > > distributed, high-performing, always-available, and accurate data
> > > > > streaming
> > > > > > applications.
> > > > > >
> > > > > > The release is available for download at:
> > > > > > https://flink.apache.org/downloads.html 
> > > > > > 
> > > > > >
> > > > > > Please check out the release blog post for an overview of the
> > > > > improvements
> > > > > > for this bugfix release:
> > > > > > https://flink.apache.org/news/2019/04/09/release-1.8.0.html 
> > > > > > 
> > > > > >
> > > > > > The full release notes are available in Jira:
> > > > > >
> > > > > >
> > > > >
> > > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12344274
> > > > > >
> > > > > > We would like to thank all contributors of the Apache Flink 
> > > > > > community
> > > > who
> > > > > > made this release possible!
> > > > > >
> > > > > > Regards,
> > > > > > Aljoscha
> > > > >
> > > >
> > >



Re: Migrating Existing TTL State to 1.8

2019-03-13 Thread Stefan Richter
Hi,

If you are worried about old state, you can combine the compaction filter based 
TTL with other cleanup strategies (see docs). For example, with 
`cleanupFullSnapshot` set, a savepoint you take will be cleared of any 
expired state, and you can then use it to bring the state into Flink 1.8.
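
For illustration, a minimal sketch of a TTL configuration with the full-snapshot
cleanup enabled (the TTL duration and state name are placeholders):

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(7))   // placeholder TTL
    .cleanupFullSnapshot()      // drop expired entries when a full snapshot/savepoint is taken
    .build();

ValueStateDescriptor<String> descriptor =
    new ValueStateDescriptor<>("my-ttl-state", String.class);
descriptor.enableTimeToLive(ttlConfig);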

Best,
Stefan

> On 13. Mar 2019, at 13:41, Ning Shi  wrote:
> 
> Just wondering if anyone has any insights into the new TTL state cleanup 
> feature mentioned below.
> 
> Thanks,
> 
> —
> Ning
> 
> On Mar 11, 2019, at 1:15 PM, Ning Shi wrote:
> 
>> It's exciting to see TTL state cleanup feature in 1.8. I have a question 
>> regarding the migration of existing TTL state to the newer version.
>> 
>> Looking at the Pull Request [1] that introduced this feature, it seems 
>> that Flink is leveraging RocksDB's compaction filter to remove stale state. 
>> I assume this means that state will only be cleaned on compaction. If I have 
>> a significant amount of stale TTL state, some of which may already have been 
>> compacted to higher levels, upgrading to 1.8 may not clean it. Is 
>> this assumption correct? If so, is the best approach to take a full 
>> snapshot/checkpoint and restore it to 1.8 to have the state cleaned on 
>> initialization?
>> 
>> Thanks,
>> 
>> [1] https://github.com/dataArtisans/frocksdb/pull/1 
>> 
>> 
>> --
>> Ning



Re: Partitions and the number of cores/executors

2019-03-13 Thread Stefan Richter
Hi,

Your assumption is right. Parallel processing is based on splitting inputs, and 
each split is only processed by one task instance at a time.

Best,
Stefan

> On 13. Mar 2019, at 09:52, mbilalce@gmail.com wrote:
> 
> Hi,
> 
> I am working with the Gelly graph library, but I think the question is applicable 
> in general. I just want to confirm that a single data partition in Flink is 
> processed by only a single executor/core, i.e. multiple executors can't be 
> utilized to process a single partition in parallel. So, if I need to 
> utilize higher parallelism I should simply have an equal number of 
> partitions. 
> 
> I suppose the reason for this is to avoid any kind of locking that might be 
> required to process a data partition with multiple executors. 
> 
> 
> Thanks
> 
> - Bilal
> 



Re: Will state TTL support event time cleanup in 1.8?

2019-03-13 Thread Stefan Richter
TTL based on event time is not part of 1.8, but likely to be part of 1.9.

> On 13. Mar 2019, at 13:17, Sergei Poganshev  wrote:
> 
> Do improvements introduced in 
> https://issues.apache.org/jira/browse/FLINK-10471 
>  add support for event 
> time TTL?



Re: Understanding timestamp and watermark assignment errors

2019-03-13 Thread Stefan Richter
Hi,

I think this looks like the same problem as in this issue: 
https://issues.apache.org/jira/browse/FLINK-11420 


Best,
Stefan


> On 13. Mar 2019, at 09:41, Konstantin Knauf  wrote:
> 
> Hi Andrew, 
> 
> generally, this looks like a concurrency problem. 
> 
> Are you using asynchronous checkpointing? If so, could you check if this 
> issue also occurs with synchronous checkpointing. There have been reports 
> recently, that there might be a problem with some Kryo types.
> 
> Can you set the logging level to DEBUG? We have some checks enabled in that 
> case in the Kryo serializer to verify that the KryoSerializer is really 
> concurrently accessed.
> 
> Are you using any Scala types, in particular collections or "Try"?
> 
> Cheers, 
> 
> Konstantin
> 
> On Sat, Mar 9, 2019 at 6:22 AM Andrew Roberts wrote:
> This is with flink 1.6.4. I was on 1.6.2 and saw Kryo issues in many more 
> circumstances. 
> 
> On Mar 8, 2019, at 4:25 PM, Konstantin Knauf wrote:
> 
>> Hi Andrew, 
>> 
>> which Flink version do you use? This sounds a bit like 
>> https://issues.apache.org/jira/browse/FLINK-8836 
>> .
>> 
>> Cheers, 
>> 
>> Konstantin
>> 
>> On Thu, Mar 7, 2019 at 5:52 PM Andrew Roberts wrote:
>> Hello,
>> 
>> I’m trying to convert some of our larger stateful computations into 
>> something that aligns more with the Flink windowing framework, and 
>> particularly, start using “event time” instead of “ingest time” as a time 
>> characteristics.
>> 
>> My data is coming in from Kafka (0.8.2.2, using the out-of-the-box Kafka 
>> source), and while my data is generally time-ordered, there are some 
>> upstream races, so I’m attempting to assign timestamps and watermarks using 
>> BoundedOutOfOrdernessTimestampExtractor, and a lateness of 30 seconds. When 
>> I assign timestamps directly in the Kafka sources (I’m also connecting two 
>> Kafka streams here) using 
>> FlinkKafkaConsumer.assignTimestampsAndWatermarks(), things work ok, but my 
>> extractor has to do a bunch of “faking” because not every record that is 
>> produced will have a valid timestamp - for example, a record that can’t be 
>> parsed won’t.
>> 
>> When I assign timestamps downstream, after filtering the stream down to just 
>> records that are going to be windowed, I see errors in my Flink job:
>> 
>> java.io.IOException: Exception while applying AggregateFunction in 
>> aggregating state
>> at 
>> org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107)
>> at 
>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:358)
>> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>> at 
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>> at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>> at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>> at 
>> com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:625)
>> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>> at 
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217)
>> at 
>> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>> at 
>> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>> at scala.collection.immutable.List.foreach(List.scala:392)
>> at 
>> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
>> at 
>> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
>> at 
>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>> at 
>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>> at 
>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:465)
>> at 
>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:341)
>> at 
>> org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:105)
>> ... 6 more
>> 
>> I am calling aggregate() on my windows, but otherwise 

Re: [ANNOUNCE] New Flink PMC member Thomas Weise

2019-02-12 Thread Stefan Richter
Congrats Thomas!

Best,
Stefan

> On 12.02.2019 at 11:20, Stephen Connolly wrote:
> 
> Congratulations to Thomas. I see that this is not his first time in the PMC 
> rodeo... also somebody needs to update LDAP as he's not on 
> https://people.apache.org/phonebook.html?pmc=flink yet!
> 
> -stephenc
> 
> On Tue, 12 Feb 2019 at 09:59, Fabian Hueske  > wrote:
> Hi everyone,
> 
> On behalf of the Flink PMC I am happy to announce Thomas Weise as a new 
> member of the Apache Flink PMC.
> 
> Thomas is a long time contributor and member of our community. 
> He is starting and participating in lots of discussions on our mailing lists, 
> working on topics that are of joint interest of Flink and Beam, and giving 
> talks on Flink at many events.
> 
> Please join me in welcoming and congratulating Thomas!
> 
> Best,
> Fabian



Re: H-A Deployment : Job / task manager configuration

2019-02-06 Thread Stefan Richter
Hi,

You only need to do the configuration in conf/flink-conf.yaml on the job 
manager. The configuration will be shipped to the TMs.
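
For illustration, the relevant keys in conf/flink-conf.yaml could look roughly like
this (hostnames and paths are placeholders; the cluster-id is optional):

high-availability: zookeeper
high-availability.zookeeper.quorum: zk-host1:2181,zk-host2:2181,zk-host3:2181
high-availability.storageDir: hdfs:///flink/ha/
high-availability.cluster-id: /my-flink-cluster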

Best,
Stefan 

> On 5. Feb 2019, at 16:59, bastien dine  wrote:
> 
> Hello everyone,
> 
> I would like to know what exactly I need to configure on my job / task 
> managers for an H-A deployment
> The document 
> (https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/jobmanager_high_availability.html)
> is not really clear about this.
> Does conf/masters need to be on the job and task managers? Or only on the task managers, 
> to find the job manager(s)?
> And does the HA ZooKeeper setting in conf/flink-conf.yaml need to be set 
> only on the job manager, or on the task managers too?
> Knowing exactly where we need to configure things will help us understand a 
> little more about the interaction between job manager / task manager / ZK.
> 
> Best Regards,
> Bastien
> 
> --
> 
> Bastien DINE
> Data Architect / Software Engineer / Sysadmin
> bastiendine.io 



Re: JDBCAppendTableSink on Data stream

2019-02-06 Thread Stefan Richter
Hi,

That should be no problem; for example, the `JDBCAppendTableSinkTest` uses 
it with a data stream as well.
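
A rough sketch of how that can look (the connection settings and insert query are
placeholders; the sink consumes a DataStream of Rows):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

DataStream<Row> rows = ...; // map the Kafka records into Rows first

JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
    .setDrivername("org.postgresql.Driver")
    .setDBUrl("jdbc:postgresql://db-host:5432/mydb")             // placeholder
    .setUsername("user")
    .setPassword("secret")
    .setQuery("INSERT INTO events (id, payload) VALUES (?, ?)")  // placeholder
    .setParameterTypes(Types.INT, Types.STRING)
    .setBatchSize(1000)
    .build();

sink.emitDataStream(rows);

Regarding exactly-once: this sink on its own does not give transactional guarantees,
so relying on idempotent writes, as you suggest, is the usual approach.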

Best,
Stefan

> On 6. Feb 2019, at 07:29, Chirag Dewan  wrote:
> 
> Hi,
> 
> In the documentation, the JDBC sink is only mentioned as a sink on the Table 
> API/stream. 
> 
> Can I use the same sink with a Data stream as well?
> 
> My use case is to read the data from Kafka and send the data to Postgres.
> 
> I was also hoping to achieve Exactly-Once since these will mainly be 
> Idempotent writes on a table.
> 
> Any help much appreciated. 
> 
> Thanks,
> 
> Chirag



Re: improper use of releaseJob() without a matching number of createTmpFiles() calls for jobId

2019-01-22 Thread Stefan Richter
Hi,

Which version of Flink are you using? This issue 
https://issues.apache.org/jira/browse/FLINK-10283 shows that a similar 
problem was fixed in 1.6.1 and 1.7. If you use a newer version and still 
encounter the problem, you can reopen the issue and describe how this is still 
a problem for you.

Best,
Stefan 

> On 22. Jan 2019, at 13:49, Chang Liu  wrote:
> 
> I have tried another way, it is not working as well:
> 
> def evaluationStream(
> indicatorsStream: DataStream[Indicators],
> scenarios: Set[Scenario]): DataStream[Evaluation] =
>   indicatorsStream.map(new IndicatorsToTxEval(scenarios))
> 
> class IndicatorsToTxEval(
> scenarios: Set[Scenario])
>   extends MapFunction[Indicators, Evaluation] {
> 
>   override def map(indicators: Indicators): Evaluation =
> Evaluation(indicators.id, evaluateScenarios(indicators, scenarios))
> }
> 
> Best regards/祝好,
> 
> Chang Liu 刘畅
> 
> 
>> On 22 Jan 2019, at 13:33, Chang Liu wrote:
>> 
>> Ok, I think I found where is the issue, but I don’t understand why.
>> 
>> I have a method:
>> 
>> def evaluationStream(
>> indicatorsStream: DataStream[Indicators],
>> scenarios: Set[Scenario]): DataStream[Evaluation] =
>>   indicatorsStream.map { indicators =>
>> Evaluation(indicators.id, evaluateScenarios(indicators, scenarios))
>>   }
>> 
>> And this is how I am using it:
>> 
>> lazy indicatorsStream: DataStream[Indicators] = ...
>> 
>> lazy val scenarios: Set[Scenario] = loadScenarios(...)
>> 
>> lazy val evalStream: DataStream[Evaluation] = 
>> evaluationStream(indicatorsStream, scenarios).print()
>> 
>> 
>> The problem is caused by the scenarios, which is passed as an argument of 
>> the method evaluationStream. But it is not working.
>> 
>> It will work if I do it in the following way:
>> 
>> lazy val scenarios: Set[Scenario] = Set(S1, S2, ...)
>> 
>> def evaluationStream(indicatorsStream: DataStream[Indicators]): 
>> DataStream[Evaluation] =
>>   indicatorsStream.map { indicators =>
>> Evaluation(indicators.id, evaluateScenarios(indicators, scenarios))
>>   }
>> 
>> where the scenarios is not passed as a method argument but is a static 
>> object variable.
>> 
>> But this is not what I want, I would like to have a configurable scenarios 
>> which I can load from config file instead of a static object variable.
>> 
>> Any idea why this is happening? I also have other code where I am also 
>> passing arguments and using them as part of my data flow, and it works 
>> just fine.
>> 
>> Many Thanks.
>> 
>> Best regards/祝好,
>> 
>> Chang Liu 刘畅
>> 
>> 
>>> On 22 Jan 2019, at 10:47, Chang Liu wrote:
>>> 
>>> Dear community,
>>> 
>>> I am having a problem releasing the job.
>>> 
>>> 2019-01-22 10:42:50.098  WARN [Source: Custom Source -> Kafka -> 
>>> ConstructTxSepa -> FilterOutFailures -> ObtainActualTxSepa -> TxSepaStream 
>>> -> TxStream -> IndicatorsEvalStream -> TxEvalStream -> Sink: Print to Std. 
>>> Out (2/4)] [FileCache] - improper use of releaseJob() without a matching 
>>> number of createTmpFiles() calls for jobId 9e1557723a065925e01c7749899547fb
>>>  
>>> I searched online but only found this: 
>>> https://stackoverflow.com/questions/52135604/fix-improper-use-of-releasejob-without-a-matching-number-of-createtmpfiles
>>>  
>>> 
>>> 
>>> However, these warnings keep popping up and the job cannot be 
>>> released, so my data flow is not working.
>>> 
>>> But if I remove my last operator, it works just fine. My last 
>>> operator is just doing some map operation. I am wondering what could be 
>>> the cause of this issue?
>>> 
>>> Many Thanks :)
>>> 
>>> Best regards/祝好,
>>> 
>>> Chang Liu 刘畅
>>> 
>>> 
>> 
> 



Re: [SURVEY] Custom RocksDB branch

2019-01-22 Thread Stefan Richter
+1 from me as well.

> On 22. Jan 2019, at 10:47, aitozi  wrote:
> 
> +1 from my side, since we rely on this feature to implement real state
> TTL.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Subtask much slower than the others when creating checkpoints

2019-01-15 Thread Stefan Richter
Hi,

I have seen a few cases where for certain jobs a small imbalance in the state 
partition assignment did cascade into a larger imbalance of the job. If your 
max parallelism mod parallelism is not 0, it means that some tasks have one 
partition more than others. Depending on how many partitions you have 
assigned to each task, in the most extreme case - when every task has 1 key group, 
except for one that has 2 - the imbalance can be 100%. Maybe you could check for 
that, especially if you were running at a different parallelism in production 
and stress testing. This would also explain why any checkpoint duration is 
longer for a task, because it would have much more state - assuming that the 
load is roughly balanced between partitions.
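
If you want to check this quickly, the key-group distribution per subtask can be
computed with Flink's own assignment logic, roughly like this (maxParallelism and
parallelism are placeholders for your job's values):

import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

int maxParallelism = 128;
int parallelism = 6;

for (int subtask = 0; subtask < parallelism; subtask++) {
    KeyGroupRange range = KeyGroupRangeAssignment
        .computeKeyGroupRangeForOperatorIndex(maxParallelism, parallelism, subtask);
    System.out.println("subtask " + subtask + " -> " + range.getNumberOfKeyGroups() + " key groups");
}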

Best,
Stefan 

> On 15. Jan 2019, at 11:42, Bruno Aranda  wrote:
> 
> Hi,
> 
> Just an update from our side. We couldn't find anything specific in the logs 
> and the problem is not easy reproducible. This week, the system is running 
> fine, which makes me suspicious as well of some resourcing issue. But so far, 
> we haven't been able to find the reason though we have discarded a few 
> things. We consume from Kafka, and the load was properly balanced. We 
> couldn't find a relationship between rate and the task manager checkpoint 
> being slower. The problem could happen even at the times of day when we get 
> fewer messages. After a Flink session restart (using AWS EMR), another TM in a 
> different machine could have been the one with the longer checkpoints.
> 
> We are now trying to reproduce the problem in a different cluster by trying 
> to send the data that was crossing the system while we saw the problems and 
> see if we can identify something specific to it. But our data is pretty 
> uniform, so not sure, and so far we have only seen this problem in our Prod 
> environment and not when running stress tests with much higher load.
> 
> Will come back if we figure anything out.
> 
> Thanks,
> 
> Bruno
> 
> On Tue, 15 Jan 2019 at 10:33, Till Rohrmann wrote:
> Same here Pasquale, the logs on DEBUG log level could be helpful. My guess 
> would be that the respective tasks are overloaded or there is some resource 
> congestion (network, disk, etc).
> 
> You should see in the web UI the number of incoming and outgoing events. It 
> would be good to check that the events are similarly sized and can be 
> computed in roughly the same time.
> 
> Cheers,
> Till
> 
> On Mon, Jan 14, 2019 at 4:07 PM Pasquale Vazzana wrote:
> I have the same problem, even more impactful. Some subtasks stall forever 
> quite consistently.
> I am using Flink 1.7.1, but I've tried downgrading to 1.6.3 and it didn't 
> help.
> The Backend doesn't seem to make any difference, I've tried Memory, FS and 
> RocksDB back ends but nothing changes. I've also tried to change the medium, 
> local spinning disk, SAN or mounted fs but nothing helps.
> Parallelism is the only thing which mitigates the stalling, when I set 1 
> everything works but if I increase the number of parallelism then everything 
> degrades, 10 makes it very slow 30 freezes it.
> It's always one of two subtasks, most of them does the checkpoint in few 
> milliseconds but there is always at least one which stalls for minutes until 
> it times out. The Alignment seems to be a problem.
> I've been wondering whether some Kafka partitions were empty, but there is 
> not much data skew and the keyBy uses the same key strategy as the Kafka 
> partitions. I've tried to use murmur2 for hashing but it didn't help either.
> The subtask that seems causing problems seems to be a CoProcessFunction.
> I am going to debug Flink but since I'm relatively new to it, it might take a 
> while so any help will be appreciated. 
> 
> Pasquale
> 
> 
> From: Till Rohrmann
> Sent: 08 January 2019 17:35
> To: Bruno Aranda
> Cc: user@flink.apache.org
> Subject: Re: Subtask much slower than the others when creating checkpoints
> 
> Hi Bruno,
> 
> there are multiple reasons why one of the subtasks can take longer for 
> checkpointing. It looks as if there is not much data skew since the state 
> sizes are relatively equal. It also looks as if the individual tasks all 
> start at the same time with the checkpointing which indicates that there 
> mustn't be a lot of back-pressure on the DAG (or all tasks were equally 
> back-pressured). This narrows the problem cause down to the asynchronous 
> write operation. One potential problem could be if the external system to 
> which you write your checkpoint data has some kind of I/O limit/quota. Maybe 
> the sum of write accesses depletes the maximum quota you have. You could try 
> whether running the job with a lower parallelism solves the problems.
> 
> For further debugging it could be helpful to get access to the logs of the 
> JobManager and the TaskManagers on DEBUG log 

Re: Bug in RocksDB timer service

2019-01-15 Thread Stefan Richter
Hi,

I have never seen this before. I would assume to see this exception because the 
write batch is flushed and contained a write against a column family that does 
not exist (anymore). However, we initialize everything relevant in 
RocksDBCachingPriorityQueueSet as final (CF handle) and never drop any column 
families or exchange db instances that are used with the writebatch, except 
after timer service and writebatch are already closed, in dispose(). Would be 
nice if they had added the name of the missing CF to the exception. The last 
remove is not necessarily the culprit, it is just what happened to trigger the 
flush, but it could be the culprit because any batched op could be. If you 
observe it near checkpoints and watermarks, that is not surprising because 
those are two points where flushes are likely to happen. Do you have any custom 
modifications that can drop column families? I cannot see where a CF 
could get lost in the vanilla Flink code. Is there any other particular 
circumstance around this happening, e.g. like first flush after a restore or 
something like that?

Best,
Stefan

> On 15. Jan 2019, at 09:48, Gyula Fóra  wrote:
> 
> Hi!
> 
> Lately I seem to be hitting a bug in the rocksdb timer service. This happens 
> mostly at checkpoints but sometimes even at watermark:
> 
> java.lang.RuntimeException: Exception occurred while processing valve output 
> watermark: 
>   at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler1.handleWatermark(StreamTwoInputProcessor.java:330)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:220)
>   at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:117)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.util.FlinkRuntimeException: 
> org.rocksdb.RocksDBException: Invalid column family specified in write batch
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.removeFromRocksDB(RocksDBCachingPriorityQueueSet.java:333)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.poll(RocksDBCachingPriorityQueueSet.java:166)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.poll(RocksDBCachingPriorityQueueSet.java:56)
>   at 
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.poll(KeyGroupPartitionedPriorityQueue.java:97)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:249)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775)
>   at 
> com.king.rbea.backend.operators.scriptexecution.RbeaOperator.processWatermark(RbeaOperator.java:193)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark1(AbstractStreamOperator.java:793)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler1.handleWatermark(StreamTwoInputProcessor.java:327)
>   ... 7 more
> Caused by: org.rocksdb.RocksDBException: Invalid column family specified in 
> write batch
>   at org.rocksdb.RocksDB.write0(Native Method)
>   at org.rocksdb.RocksDB.write(RocksDB.java:602)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper.flush(RocksDBWriteBatchWrapper.java:95)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper.remove(RocksDBWriteBatchWrapper.java:89)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.removeFromRocksDB(RocksDBCachingPriorityQueueSet.java:331)
> 
> Has anyone seen this yet?
> Dont remember seeing this before 1.7
> 
> Gyula



Re: Flink Streaming Job Task is getting cancelled silently and causing job to restart

2019-01-09 Thread Stefan Richter
Hi,

Could you also provide the job master log?

Best,
Stefan

> On 9. Jan 2019, at 12:02, sohimankotia  wrote:
> 
> Hi,
> 
> I am running Flink Streaming Job with 1.5.5 version.
> 
> - Job is basically reading from Kafka , windowing on 2 minutes , and writing
> to hdfs using AvroBucketing Sink .
> - Job is running with parallelism 132
> - Checkpointing is enabled with interval of 1 minute.
> - Savepoint is enabled and getting triggered every 30 min .
> 
> 
> Few Modified Properties :
> 
> akka.ask.timeout: 15min
> akka.client.timeout: 900s
> akka.lookup.timeout: 60s
> akka.tcp.timeout : 900s
> 
> akka.watch.heartbeat.interval: 120s
> akka.watch.heartbeat.pause: 900s
> 
> Issues :
> 
> The job is getting restarted 3 to 4 times every day (at random times). It simply
> says attempting to cancel task. No exception or logging. I tried to set 
> 
> log4j.logger.org.apache.flink.runtime.taskmanager.Task=DEBUG,file  
> 
> But nothing important is getting logged. 
> 
> Enabling DEBUG at the Flink level makes the streaming application too slow
> (so I cannot do that).
> 
> Attaching Task logs .
> 
> task.gz
> 
>   
> 
> 
> Thanks
> Sohi
> 
> 
> 
> 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: windowAll and AggregateFunction

2019-01-09 Thread Stefan Richter
Hi,

I think your expectation about windowAll is wrong, from the method 
documentation: “Note: This operation is inherently non-parallel since all 
elements have to pass through the same operator instance” and I also cannot 
think of a way in which the windowing API would support your use case without a 
shuffle. You could probably build the functionality by hand though, but I 
guess this is not quite what you want.
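
If you do want to build it by hand, a rough sketch of the two-stage pattern could
look like the following. The Hll type with add/merge is a hypothetical stand-in for
whatever HyperLogLog implementation you use, and the window size and number of
pre-aggregation keys are placeholders:

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

DataStream<String> ids = ...; // ids with timestamps/watermarks already assigned

DataStream<Hll> distinctPerMinute = ids
    // stage 1: spread the input over a few keys so partial sketches are built in parallel
    .keyBy(id -> Math.abs(id.hashCode()) % 4)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new AggregateFunction<String, Hll, Hll>() {
        @Override public Hll createAccumulator() { return new Hll(); }
        @Override public Hll add(String value, Hll acc) { acc.add(value); return acc; }
        @Override public Hll getResult(Hll acc) { return acc; }
        @Override public Hll merge(Hll a, Hll b) { return a.merge(b); }
    })
    // stage 2: merge the handful of partial sketches in a single, non-parallel step
    .windowAll(TumblingEventTimeWindows.of(Time.minutes(1)))
    .reduce((a, b) -> a.merge(b));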

Best,
Stefan

> On 9. Jan 2019, at 13:43, CPC  wrote:
> 
> Hi all,
> 
> In our implementation, we are consuming from Kafka and calculating distinct 
> counts with HyperLogLog. We are using the windowAll function with a custom 
> AggregateFunction, but the Flink runtime shows somewhat unexpected behavior at 
> runtime. Our sources run with parallelism 4, and I expect the add function to 
> run after the sources to calculate partial results, and at the end of the window I 
> expect it to send 4 HLL objects to a single operator to merge there (merge 
> function). Instead, it sends all data to a single instance and calls the add 
> function there. 
> 
> Is there any way to make Flink behave like this? I mean, calculate partial 
> results after consuming from Kafka with the parallelism of the sources, without 
> shuffling (so some part of the calculation can be done in parallel), and 
> merge those partial results with a merge function?
> 
> Thank you in advance...



Re: Can checkpoints be used to migrate jobs between Flink versions ?

2019-01-09 Thread Stefan Richter
Hi,

I would assume that this should currently work because the format of basic 
savepoints and checkpoints is the same right now. The restriction in the doc is 
probably there in case that the checkpoint format will diverge more in the 
future.
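
For illustration, resuming from a retained checkpoint uses the same mechanism as
resuming from a savepoint; the path, parallelism, and jar name below are placeholders:

./bin/flink run -s hdfs:///flink/checkpoints/<job-id>/chk-42 -p 8 my-job.jar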

Best,
Stefan

> On 9. Jan 2019, at 13:12, Edward Rojas  wrote:
> 
> Hello,
> 
> For upgrading jobs between Flink versions I follow the guide in the doc
> here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/upgrading.html#upgrading-the-flink-framework-version
> 
> It states that we should always use savepoints for this procedure, I
> followed it and it works perfectly. 
> 
> I just would like to know if there is a reason why is not advised to use
> checkpoints for this procedure.
> 
> Say for example that the job has externalized checkpoints with
> RETAIN_ON_CANCELLATION policy, one could cancel the job before the upgrade
> and use the retained checkpoint to restart the job from it once the Flink
> cluster is upgraded... or maybe I'm missing something ?
> 
> I performed some tests and we are able to upgrade using checkpoint, by
> passing the checkpoint path in the "flink run -s" parameter.
> 
> Could you help to clarify if this is advised (and supported) or we should
> stick to the use of savepoints for this kind of manipulations ?
> 
> 
> Thanks in advance for your help.
> 
> Edward
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Zookeeper shared by Flink and Kafka

2019-01-09 Thread Stefan Richter
Hi,

That is more a ZK question than a Flink question, but I don’t think there is a 
problem.

Best,
Stefan

> On 9. Jan 2019, at 13:31, min@ubs.com wrote:
> 
> Hi,
>  
> I am new to Flink.
>  
> I have a question:
> Can a zookeeper cluster be shared by a flink cluster and a kafka cluster?
>  
> Regards,
>  
> Min
> 


Re: Error while reading from hadoop sequence file

2018-12-13 Thread Stefan Richter
Hi,

In that case, are you sure that your Flink version corresponds to the version 
of the flink-hadoop-compatibility jar? It seems that you are using Flink 1.7 
for the jar and your cluster needs to run that version as well. IIRC, this 
particular class was introduced with 1.7, so using a different version of other 
jars would be expected to give you this exception.

Best,
Stefan

> On 12. Dec 2018, at 08:34, Akshay Mendole  wrote:
> 
> Hi Stefen,
> You are correct. I logged the error messages incorrectly in 
> my previous mail. 
> 
> When I execute this code snippet
> DataSource> input = 
> env.createInput(HadoopInputs.readSequenceFile(Text.class, Text.class, 
> ravenDataDir));
> I got this error
> The type returned by the input format could not be automatically determined. 
> Please specify the TypeInformation of the produced type explicitly by using 
> the 'createInput(InputFormat, TypeInformation)' method instead.
>   
> org.apache.flink.api.java.ExecutionEnvironment.createInput(ExecutionEnvironment.java:551)
>   flipkart.EnrichementFlink.main(EnrichementFlink.java:31)
> 
> When I gave TypeInfomation manually,
> DataSource> input = 
> env.createInput(HadoopInputs.readSequenceFile(Text.class, Text.class, 
> ravenDataDir),
> TypeInformation.of(new TypeHint>() {
> }));
> I started getting this error message
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error.
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:426)
>   at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:816)
>   at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:290)
>   at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:216)
>   at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1053)
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1129)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1129)
> Caused by: java.lang.RuntimeException: Could not load the TypeInformation for 
> the class 'org.apache.hadoop.io.Writable'. You may be missing the 
> 'flink-hadoop-compatibility' dependency.
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.createHadoopWritableTypeInfo(TypeExtractor.java:2082)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1701)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1643)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:921)
> 
> 
> When I copied flink-hadoop-compatibility_2.11-1.7.0.jar to flink lib 
> directory and executed,
> I got this error message
> java.lang.NoClassDefFoundError: 
> org/apache/flink/api/common/typeutils/TypeSerializerSnapshot
>   at 
> org.apache.flink.api.java.typeutils.WritableTypeInfo.createSerializer(WritableTypeInfo.java:111)
>   at 
> org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:107)
>   at 
> org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:52)
>   at 
> org.apache.flink.optimizer.postpass.JavaApiPostPass.createSerializer(JavaApiPostPass.java:283)
>   at 
> org.apache.flink.optimizer.postpass.JavaApiPostPass.traverseChannel(JavaApiPostPass.java:252)
>   at 
> org.apache.flink.optimizer.postpass.JavaApiPostPass.traverse(JavaApiPostPass.java:97)
>   at 
> org.apache.flink.optimizer.postpass.JavaApiPostPass.postPass(JavaApiPostPass.java:81)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:527)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:399)
>   at 
> org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:379)
>   at 
> org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:906)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:473)
>   at 

Re: CodeCache is full - Issues with job deployments

2018-12-13 Thread Stefan Richter
Hi,

Thanks for analyzing the problem. If it turns out that there is a problem with 
the termination of the Kafka sources, could you please open an issue for that 
with your results?

Best,
Stefan

> On 11. Dec 2018, at 19:04, PedroMrChaves  wrote:
> 
> Hello Stefan,
> 
> Thank you for the reply.
> 
> I've taken a heap dump from a development cluster using jmap and analysed
> it. To do the tests we restarted the cluster and then left a job running for
> a few minutes. After that, we restarted the job a couple of times and
> stopped it. After leaving the cluster with no running jobs for 20 min we
> toke a heap dump.
> 
> We've found out that a thread which consumes data from Kafka was still
> running, with a lot of finalizer calls, as depicted below. 
> 
> 
> 
>  
> 
> I will deploy a job without a Kafka consumer to see if the code cache still
> increases (all of our clusters have problems with the code cache;
> coincidentally, all of the deployed jobs read from Kafka).
> 
> 
> Best Regards,
> Pedro Chaves
> 
> 
> 
> -
> Best Regards,
> Pedro Chaves
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: CodeCache is full - Issues with job deployments

2018-12-11 Thread Stefan Richter
Hi,

in general, Flink uses a user-code class loader for job specific code and the 
lifecycle of the class loader should end with the job. This usually means that 
job related code can be removed after the job is finished. However, objects 
of a class that was loaded by the user-code class loader should no longer be 
referenced from anywhere after the job finished, or else the user-code class 
loader cannot be freed. Whether that is the case depends on the user code and the 
used dependencies, e.g. the user code might register some objects somewhere and 
not remove them by the end of the job. This would prevent freeing the 
user-code class loader and result in a leak. To figure out the root cause, you can 
analyse a heap dump for leaking class loaders; e.g. [1] and other sources on 
the web go deeper into this topic.

Best,
Stefan

[1] http://java.jiderhamn.se/category/classloader-leaks/ 


> On 11. Dec 2018, at 12:56, PedroMrChaves  wrote:
> 
> Hello,
> 
> Every time I deploy a Flink job the code cache increases, which is expected.
> However, when I stop and start the job, or it restarts, the code cache
> continues to increase.
> 
> Screenshot_2018-12-11_at_11.png
> 
>   
> 
> 
> I've added the flags "-XX:+PrintCompilation -XX:ReservedCodeCacheSize=350m
> -XX:-UseCodeCacheFlushing" to Flink taskmanagers and jobmanagers, but the
> cache doesn't decrease very much, as it is depicted in the screenshot above.
> Even if I stop all the jobs, the cache doesn't decrease. 
> 
> This gets to a point where I get the error "CodeCache is full. Compiler has
> been disabled".
> 
> I've attached the taskmanagers output with the "XX:+PrintCompilation" flag
> activated.
> 
> flink-flink-taskexecutor.out
> 
>   
> 
> Flink: 1.6.2
> Java:  openjdk version "1.8.0_191"
> 
> Best Regards,
> Pedro Chaves.
> 
> 
> 
> 
> -
> Best Regards,
> Pedro Chaves
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Error while reading from hadoop sequence file

2018-12-11 Thread Stefan Richter
Hi,

I am a bit confused by the explanation: the exception that you mentioned, is it 
happening in the first code snippet (with the TypeInformation.of(…)) or the 
second one? From looking into the code, I would expect the exception can only 
happen in the second snippet (without TypeInformation), but I am also wondering 
what the exception is for the first snippet then, because from the code I think 
the exception cannot be the same but something different, see:

https://github.com/apache/flink/blob/70b2029f8a3d4ca2d3cb7bd7fddac9bb5b3e8f07/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java#L551
 


Vs

https://github.com/apache/flink/blob/70b2029f8a3d4ca2d3cb7bd7fddac9bb5b3e8f07/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java#L577
 


Can you please clarify? I would expect that it should work once you call the 
method and provide the type info, or else what exactly is the exception there.

Best,
Stefan

> On 10. Dec 2018, at 13:35, Akshay Mendole  wrote:
> 
> Hi,
>I have been facing issues while trying to read from a hdfs sequence file.
> 
> This is my code snippet
> DataSource<Tuple2<Text, Text>> input = env
>     .createInput(HadoopInputs.readSequenceFile(Text.class, Text.class, ravenDataDir),
>         TypeInformation.of(new TypeHint<Tuple2<Text, Text>>() {}));
> 
> Upon executing this in yarn cluster mode, I am getting following error
> The type returned by the input format could not be automatically determined. 
> Please specify the TypeInformation of the produced type explicitly by using 
> the 'createInput(InputFormat, TypeInformation)' method instead.
>   
> org.apache.flink.api.java.ExecutionEnvironment.createInput(ExecutionEnvironment.java:551)
>   flipkart.EnrichementFlink.main(EnrichementFlink.java:31)
> 
> 
> When I add the TypeInformation myself as follows, I run into the same issue.
> DataSource<Tuple2<Text, Text>> input = env
>     .createInput(HadoopInputs.readSequenceFile(Text.class, Text.class, ravenDataDir));
> 
> 
> 
> When I add these libraries in the lib folder, 
> flink-hadoop-compatibility_2.11-1.7.0.jar
> 
> 
> the error changes to this
> 
> java.lang.NoClassDefFoundError: 
> org/apache/flink/api/common/typeutils/TypeSerializerSnapshot
>   at 
> org.apache.flink.api.java.typeutils.WritableTypeInfo.createSerializer(WritableTypeInfo.java:111)
>   at 
> org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:107)
>   at 
> org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:52)
>   at 
> org.apache.flink.optimizer.postpass.JavaApiPostPass.createSerializer(JavaApiPostPass.java:283)
>   at 
> org.apache.flink.optimizer.postpass.JavaApiPostPass.traverseChannel(JavaApiPostPass.java:252)
>   at 
> org.apache.flink.optimizer.postpass.JavaApiPostPass.traverse(JavaApiPostPass.java:97)
>   at 
> org.apache.flink.optimizer.postpass.JavaApiPostPass.postPass(JavaApiPostPass.java:81)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:527)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:399)
>   at 
> org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:379)
>   at 
> org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:906)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:473)
>   at 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
> 
> 
> Can someone help me resolve this issue?
> 
> Thanks,
> Akshay
> 
> 
> 



Re: Very slow checkpoints occasionally occur

2018-12-11 Thread Stefan Richter
Hi,

Looking at the numbers, it seems to me that checkpoint execution (the times of the
sync and async parts) is always reasonably fast once the checkpoint is executed on
the task, but there are changes in the alignment time and in the time from
triggering a checkpoint to executing it. Since you are using windows, and looking
at the way the state size behaves before and after the problem, I might have a
suggestion as to what could cause it. Before and during the problematic checkpoints
the state size is rising; after the problem is gone, the state size is significantly
smaller. Could it be that, as time progresses or jumps, there is a spike in session
window triggering? When time moves, it is possible that suddenly a lot of windows
fire, and when a checkpoint barrier arrives after the firing was triggered, it has
to wait until all window firing is completed for consistency reasons. This would
also explain the backpressure that you observe during this period, coming from a
lot of expensive window firings, and future events/checkpoints can only proceed
when the firing is done. You could investigate whether that is what is happening
and maybe take measures to avoid it, but that is highly dependent on your job logic.

Best,
Stefan 

> On 11. Dec 2018, at 10:26, Dongwon Kim  wrote:
> 
> Hi all,
> 
> We're facing the same problem mentioned in [1] - Very slow checkpoint 
> attempts of few tasks cause checkpoint failures and, furthermore, incur high 
> back pressure.
> We're running our Flink jobs on a cluster where
> - 2 masters + 8 worker nodes
> - all nodes, even masters, are equipped with SSD's
> - we have a separate cluster for Kafka
> - we depend largely on Hadoop-2.7.3; YARN for deploying per-job clusters and 
> HDFS for storing checkpoints and savepoints
> - All SSD's of each node serve as local-dirs for YARN NM and data-dirs for 
> HDFS DN
> - we use RocksDB state backend
> - we use the latest version, flink-1.7.0
> - we trigger checkpoints every 30 minutes and the size of state is not that 
> large as shown in the attached screenshot.
> 
> The job itself recovers from checkpoint failures and back pressure after a 
> while; [2] shows that the job recovers after three failed checkpoints.
> 
> Below is part of JM log message:
> 2018-12-10 17:24:36,150 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 14 @ 1544430276096 for job e0cf3843cba85e8fdd5570ba18970d33.
> 2018-12-10 17:24:57,912 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
> checkpoint 14 for job e0cf3843cba85e8fdd5570ba18970d33 (43775252946 bytes in 
> 21781 ms).
> 2018-12-10 17:54:36,133 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 15 @ 1544432076096 for job e0cf3843cba85e8fdd5570ba18970d33.
> 2018-12-10 18:04:36,134 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 15 
> of job e0cf3843cba85e8fdd5570ba18970d33 expired before completing.
> 2018-12-10 18:24:36,156 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 16 @ 1544433876096 for job e0cf3843cba85e8fdd5570ba18970d33.
> 2018-12-10 18:34:36,157 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 16 
> of job e0cf3843cba85e8fdd5570ba18970d33 expired before completing.
> 2018-12-10 18:54:36,138 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 17 @ 1544435676096 for job e0cf3843cba85e8fdd5570ba18970d33.
> 2018-12-10 19:04:36,139 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 17 
> of job e0cf3843cba85e8fdd5570ba18970d33 expired before completing.
> 2018-12-10 19:15:44,849 WARN  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late 
> message for now expired checkpoint attempt 15 from 
> e81a7fb90d05da2bcec02a34d6f821e3 of job e0cf3843cba85e8fdd5570ba18970d33.
> 2018-12-10 19:16:37,822 WARN  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late 
> message for now expired checkpoint attempt 16 from 
> e81a7fb90d05da2bcec02a34d6f821e3 of job e0cf3843cba85e8fdd5570ba18970d33.
> 2018-12-10 19:17:12,974 WARN  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late 
> message for now expired checkpoint attempt 17 from 
> e81a7fb90d05da2bcec02a34d6f821e3 of job e0cf3843cba85e8fdd5570ba18970d33.
> 2018-12-10 19:24:36,147 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 18 @ 1544437476096 for job e0cf3843cba85e8fdd5570ba18970d33.
> 2018-12-10 19:32:05,869 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
> checkpoint 18 for job e0cf3843cba85e8fdd5570ba18970d33 (52481833829 bytes in 
> 449738 ms).
> #15, #16, and #17 fail due to a single task e81a7fb90d05da2bcec02a34d6f821e3, 
> which is a 

Re: After job cancel, leftover ZK state prevents job manager startup

2018-12-11 Thread Stefan Richter
Hi,

Thanks for reporting the problem, I think the exception trace looks indeed very 
similar to traces in the discussion for FLINK-10184. I will pull in Till who 
worked on the fix to hear his opinion. Maybe the current fix only made the 
problem less likely to appear but is not complete, yet?

Best,
Stefan

> On 11. Dec 2018, at 05:19, Micah Wylde  wrote:
> 
> Hello,
> 
> We've been seeing an issue with several Flink 1.5.4 clusters that looks like 
> this:
> 
> 1. Job is cancelled with a savepoint
> 2. The jar is deleted from our HA blobstore (S3)
> 3. The jobgraph in ZK is *not* deleted
> 4. We restart the cluster
> 5. Startup fails in recovery because the jar is not available, with the 
> stacktrace:
> 
> 00:13:58.486 ERROR o.a.f.r.e.ClusterEntrypoint - Fatal error occurred in the 
> cluster entrypoint.
> {{ java.lang.RuntimeException: 
> org.apache.flink.runtime.client.JobExecutionException: Could not set up 
> JobManager}}
> {{ at 
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)}}
> {{ at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)}}
> {{ at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)}}
> {{ at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)}}
> {{ at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)}}
> {{ at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)}}
> {{ at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)}}
> {{ at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)}}Caused
>  by: java.lang.Exception: Cannot set up the user code libraries: No such file 
> or directory: 
> s3://streamingplatform-production/{JOB_NAME}/flink/highavailability/{JOB_NAME}/blob/job_5a3fe2c00c05efd3a552a1c6707d2c10/blob_p-6d585831f5c947335ac505b400cf8f3630cc706a-42355c2885b668b0bc5e15b856141b0
> 
> This superficially seems similar to several issues that have apparently been 
> fixed in 1.5.4, like FLINK-10255 and FLINK-10184.
> 
> Has anybody else seen this issue on 1.5.4 (or later) clusters? Or any advice 
> for debugging?
> 
> Thanks,
> Micah



Re: Failed to resume job from checkpoint

2018-12-10 Thread Stefan Richter
Hi,

good that you found the cause of the problem in your configuration settings, but
unfortunately I cannot yet follow your reasoning. Can you explain why the code would
fail for a “slow” HDFS? If no local recovery is happening (local recovery only
applies to a job failover with local recovery activated), the job will always first
download all files from HDFS to your local disk. After that, it will hard link the
files on local disk into another directory. I would assume that all HDFS problems
like slowdowns would show up in the part that downloads the files to local disk.
But your exception comes after that, when the file supposedly was already copied.
So I don’t understand how you think this is connected; can you please explain it in
more detail?

For your second question, Flink currently assumes that your HDFS (or whatever 
checkpoint filesystem you use) is stable, highly available storage and that 
files do not “get lost”. It can tolerate temporary outages through multiple 
restart attempts, but your setup of the checkpoint directory should prevent 
data loss.

Best,
Stefan 

> On 9. Dec 2018, at 14:05, Ben Yan  wrote:
> 
> hi,
> 
> 1. I took a closer look at the relevant code around
> RocksDBIncrementalRestoreOperation::restoreInstanceDirectoryFromPath and did some
> verification. I found that this problem is likely related to file system
> connection restrictions. At first I was worried that my HDFS would be overloaded
> due to a large number of connections, so I configured the following related
> parameters:
> 
> fs.<scheme>.limit.total: (number, 0/-1 mean no limit)
> fs.<scheme>.limit.input: (number, 0/-1 mean no limit)
> fs.<scheme>.limit.output: (number, 0/-1 mean no limit)
> fs.<scheme>.limit.timeout: (milliseconds, 0 means infinite)
> fs.<scheme>.limit.stream-timeout: (milliseconds, 0 means infinite)
> 
> Since I added the above configuration, this problem has begun to appear! When I
> removed it, the problem disappeared. I think that when Flink is configured with
> file system connection restrictions, the mechanism for recovering from checkpoints
> needs to be improved: jobs should be allowed to recover from checkpoints more
> slowly under connection restrictions, rather than failing directly with the above
> exceptions.
> 
> 2. After the job has been running for a long time, if the state data stored in
> the state backend (such as HDFS) is lost for some reason, is there any other way
> to restore that state quickly, for example through some kind of offline task that
> rebuilds the state from offline data, so that the streaming job can be relaunched
> from the recovered state?
> 
> Best
> Ben
> 
> Ben Yan <yan.xiao.bin.m...@gmail.com> wrote on Sat, Dec 8, 2018 at 11:08 AM:
> I hava already tested it.
> 
> [root@node ~]#ll 
> /mnt/yarn/local/usercache/yarn/appcache/application_1544101169829_0038/
> total 32
> drwxr-xr-x  2 yarn hadoop 4096 Dec  8 02:29 
> blobStore-273cf1a6-0f98-4c86-801e-5d76fef66a58
> drwxr-xr-x  2 yarn hadoop 4096 Dec  8 02:29 
> blobStore-992562a5-f42f-43f7-90de-a415b4dcd398
> drwx--x---  4 yarn hadoop 4096 Dec  8 02:29 
> container_e73_1544101169829_0038_01_59
> drwx--x--- 13 yarn hadoop 4096 Dec  8 02:29 filecache
> drwxr-xr-x  2 yarn hadoop 4096 Dec  8 02:29 
> flink-dist-cache-6d8dab0c-4034-4bbe-a9b9-b524cf6856e3
> drwxr-xr-x  8 yarn hadoop 4096 Dec  8 02:29 
> flink-io-6fba8471-4d84-4c13-9e3c-ef3891b366f0
> drwxr-xr-x  4 yarn hadoop 4096 Dec  8 02:29 localState
> drwxr-xr-x  2 yarn hadoop 4096 Dec  8 02:29 
> rocksdb-lib-7ef4471db8d3b8c1bdcfa4dba4d95a36
> 
> And the derectory "flink-io-6fba8471-4d84-4c13-9e3c-ef3891b366f0" does not 
> exist.
> 
> [root@node ~]#ll 
> /mnt/yarn/local/usercache/yarn/appcache/application_1544101169829_0038/
> total 12
> drwx--x--- 13 yarn hadoop 4096 Dec  8 02:29 filecache
> drwxr-xr-x  2 yarn hadoop 4096 Dec  8 02:53 localState
> drwxr-xr-x  2 yarn hadoop 4096 Dec  8 02:53 
> rocksdb-lib-7ef4471db8d3b8c1bdcfa4dba4d95a36
> 
> Ben Yan <yan.xiao.bin.m...@gmail.com> wrote on Sat, Dec 8, 2018 at 12:23 AM:
> Thank you for your advice! I will check this out next, and I will sync the 
> information at any time with new progress.
> 
> Stefan Richter <s.rich...@data-artisans.com> wrote on Sat, Dec 8, 2018 at 12:05 AM:
> I think then you need to investigate what goes wrong in 
> RocksDBIncrementalRestoreOperation::restoreInstanceDirectoryFromPath. If you 
> look at the code it lists the files in a directory and tries to hard link 
> them into another directory, and I would only expect to see the mentioned 
> exception if the original file that we try to link does not exist. However, 
> imo it must exist because we list it in the directory right before the link 
> attempt

Re: Failed to resume job from checkpoint

2018-12-07 Thread Stefan Richter
I think then you need to investigate what goes wrong in
RocksDBIncrementalRestoreOperation::restoreInstanceDirectoryFromPath. If you look at
the code, it lists the files in a directory and tries to hard link them into another
directory, and I would only expect to see the mentioned exception if the original
file that we try to link does not exist. However, in my opinion it must exist,
because we list it in the directory right before the link attempt and Flink does not
delete anything in the meantime. So the question is: why can a file that was listed
before suddenly disappear when it is hard linked? The only potential problem could
be in the path transformations and concatenations, but they look good to me and also
pass all tests, including end-to-end tests that do exactly such a restore. I suggest
either observing the created files and what happens with the one mentioned in the
exception, or introducing debug logging in the code, in particular a check whether
the listed file (the link target) exists before linking, which it should in my
opinion because it is listed in the directory.

> On 7. Dec 2018, at 16:33, Ben Yan  wrote:
> 
> The version of the recovered checkpoint is also 1.7.0 . 
> 
> Stefan Richter <s.rich...@data-artisans.com> wrote on Fri, Dec 7, 2018 at 11:06 PM:
> Just to clarify, the checkpoint from which you want to resume in 1.7, was 
> that taken by 1.6 or by 1.7? So far this is a bit mysterious because it says 
> FileNotFound, but the whole iteration is driven by listing the existing 
> files. Can you somehow monitor which files and directories are created during 
> the restore attempt?
> 
>> On 7. Dec 2018, at 15:53, Ben Yan > <mailto:yan.xiao.bin.m...@gmail.com>> wrote:
>> 
>> hi ,Stefan
>> 
>> Thank you for your explanation. I used Flink 1.6.2 without any problems. I have
>> tested version 1.7.0 a few times, but every time I resume from the checkpoint,
>> the job shows the exception I posted earlier, which makes the job unrecoverable.
>> And I checked all the logs; apart from this exception, there are no other
>> exceptions.
>> 
>> The following is all the logs when an exception occurs:
>> 2018-12-06 22:53:41,282 INFO  
>> org.apache.flink.runtime.executiongraph.ExecutionGraph- KeyedProcess 
>> (120/138) (25ab0c8d0bc657860b766fa4c8d85a42) switched from DEPLOYING to 
>> RUNNING.
>> 2018-12-06 22:53:41,285 INFO  
>> org.apache.flink.runtime.executiongraph.ExecutionGraph- KeyedProcess 
>> (2/138) (f770d22a976463d90fb4349d1c8521b8) switched from RUNNING to FAILED.
>> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>>  at 
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>  at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
>> state backend for 
>> KeyedProcessOperator_e528d5d97ea2d7cefbcf6ff5b46354d5_(2/138) from any of 
>> the 1 provided restore options.
>>  at 
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>>  at 
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:284)
>>  at 
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
>>  ... 5 more
>> Caused by: java.nio.file.NoSuchFileException: 
>> /mnt/yarn/local/usercache/yarn/appcache/application_1544101169829_0004/flink-io-133c16c5-4565-4014-b769-2a978af8e772/job_6e40c9381aa12f69b6ac182c91d993f5_op_KeyedProcessOperator_e528d5d97ea2d7cefbcf6ff5b46354d5__2_138__uuid_ab38b75f-77d7-4124-a410-6444b35d232d/db/000495.sst
>>  -> 
>> /mnt/yarn/local/usercache/yarn/appcache/application_1544101169829_0004/flink-io-133c16c5-4565-4014-b769-2a978af8e772/job_6e40c9381aa12f69b6ac182c91d993f5_op_KeyedProcessOperator_e528d5d97ea2d7cefbcf6ff5b46354d5__2_138__uuid_ab38b75f-77d7-4124-a410-6444b35d232d/cf45eae8-d5d4-4f04-8bf9-8d54ac078769/000495.sst
>>  at 
>> sun.nio.fs.UnixException.translateToIOException(Uni

Re: Failed to resume job from checkpoint

2018-12-07 Thread Stefan Richter
apache.flink.runtime.executiongraph.ExecutionGraph- Job 
> Flink-Job-Offline (6e40c9381aa12f69b6ac182c91d993f5) switched from state 
> RUNNING to FAILING.
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for 
> KeyedProcessOperator_e528d5d97ea2d7cefbcf6ff5b46354d5_(2/138) from any of the 
> 1 provided restore options.
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:284)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
>   ... 5 more
> Caused by: java.nio.file.NoSuchFileException: 
> /mnt/yarn/local/usercache/yarn/appcache/application_1544101169829_0004/flink-io-133c16c5-4565-4014-b769-2a978af8e772/job_6e40c9381aa12f69b6ac182c91d993f5_op_KeyedProcessOperator_e528d5d97ea2d7cefbcf6ff5b46354d5__2_138__uuid_ab38b75f-77d7-4124-a410-6444b35d232d/db/000495.sst
>  -> 
> /mnt/yarn/local/usercache/yarn/appcache/application_1544101169829_0004/flink-io-133c16c5-4565-4014-b769-2a978af8e772/job_6e40c9381aa12f69b6ac182c91d993f5_op_KeyedProcessOperator_e528d5d97ea2d7cefbcf6ff5b46354d5__2_138__uuid_ab38b75f-77d7-4124-a410-6444b35d232d/cf45eae8-d5d4-4f04-8bf9-8d54ac078769/000495.sst
>   at 
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>   at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>   at 
> sun.nio.fs.UnixFileSystemProvider.createLink(UnixFileSystemProvider.java:476)
>   at java.nio.file.Files.createLink(Files.java:1086)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBKeyedStateBackend.java:1238)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreLocalStateIntoFullInstance(RocksDBKeyedStateBackend.java:1186)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBKeyedStateBackend.java:916)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restore(RocksDBKeyedStateBackend.java:864)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:525)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:147)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
>   ... 7 more
> 2018-12-06 22:53:41,287 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: 
> topic.rate (1/16) (5637f1c3568ca7c29db002e579c05546) switched from RUNNING to 
> CANCELING.
> 
> 
> Best, 
> Ben
> 
> Stefan Richter <s.rich...@data-artisans.com> wrote on Fri, Dec 7, 2018 at 10:00 PM:
> Hi,
> 
> From what I can see in the log here, it looks like your RocksDB is not 
> recovering from local but from a remote filesystem. This recovery basically 
> has steps:
> 
> 1: Create a temporary directory (in your example, this is the dir that ends 
> …/5683a26f-cde2-406d-b4cf-3c6c3976f8ba) and download all the files, mainly 
> sst files from remote fs to the temporary directory in local fs.
> 
> 2: List all the downloaded files in the temporary directory and either 
> hardlink (for sst files) or copy (for all other files) the listed files into 
> the new RocksDb instance path (the path that ends with …/db)
> 
> 3: Open the new db from the instance path, delete the temporary directory.
> 
> Now what is very surprising here is that it claims some

Re: Failed to resume job from checkpoint

2018-12-07 Thread Stefan Richter
Hi,

From what I can see in the log here, it looks like your RocksDB is not 
recovering from local but from a remote filesystem. This recovery basically has 
steps:

1: Create a temporary directory (in your example, this is the dir that ends 
…/5683a26f-cde2-406d-b4cf-3c6c3976f8ba) and download all the files, mainly sst 
files from remote fs to the temporary directory in local fs.

2: List all the downloaded files in the temporary directory and either hardlink 
(for sst files) or copy (for all other files) the listed files into the new 
RocksDb instance path (the path that ends with …/db)

3: Open the new db from the instance path, delete the temporary directory.
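
As a rough illustration of step 2 (this is just the idea in plain java.nio, not Flink's actual restore code):

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

final class RestoreStepTwoSketch {

    static void linkOrCopy(Path tempRestoreDir, Path dbInstancePath) throws IOException {
        Files.createDirectories(dbInstancePath);
        try (DirectoryStream<Path> files = Files.newDirectoryStream(tempRestoreDir)) {
            for (Path file : files) {
                Path target = dbInstancePath.resolve(file.getFileName());
                if (file.getFileName().toString().endsWith(".sst")) {
                    // Hard linking requires the listed source file to still exist,
                    // which is exactly what the reported NoSuchFileException is about.
                    Files.createLink(target, file);
                } else {
                    Files.copy(file, target);
                }
            }
        }
    }
}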

Now what is very surprising here is that it claims some file was not found (it is
not clear which one, but I assume the downloaded file). However, how the file can be
lost between downloading/listing and the attempt to hard link it is very mysterious.
Can you check the logs for any other exceptions, and can you check which files exist
during the recovery (e.g. what is downloaded, whether the instance path is there,
…)? For now, I cannot see how a listed file could suddenly disappear; Flink only
deletes the temporary directory once recovery has completed or failed.

Also: is this problem deterministic or was this a singularity? Did you use a 
different Flink version before (which worked)?

Best,
Stefan

> On 7. Dec 2018, at 11:28, Ben Yan  wrote:
> 
> Hi. I am using flink-1.7.0 with RocksDB and HDFS as the state backend, but
> recently I found the following exception when the job resumed from a checkpoint.
> Task-local state is always considered a secondary copy; the ground truth of the
> checkpoint state is the primary copy in the distributed store. But it seems that
> the job did not recover from HDFS and failed directly. I hope someone can give me
> advice or hints about the problem I encountered.
> 
> 
> 2018-12-06 22:54:04,171 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- KeyedProcess 
> (3/138) (5d96a585130f7a21f22f82f79941fb1d) switched from RUNNING to FAILED.
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for 
> KeyedProcessOperator_e528d5d97ea2d7cefbcf6ff5b46354d5_(3/138) from any of the 
> 1 provided restore options.
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:284)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
>   ... 5 more
> Caused by: java.nio.file.NoSuchFileException: 
> /mnt/yarn/local/usercache/yarn/appcache/application_1544101169829_0004/flink-io-0115e9d6-a816-4b65-8944-1423f0fdae58/job_6e40c9381aa12f69b6ac182c91d993f5_op_KeyedProcessOperator_e528d5d97ea2d7cefbcf6ff5b46354d5__3_138__uuid_1c6a5a11-caaf-4564-b3d0-9c7dadddc390/db/000495.sst
>  -> 
> /mnt/yarn/local/usercache/yarn/appcache/application_1544101169829_0004/flink-io-0115e9d6-a816-4b65-8944-1423f0fdae58/job_6e40c9381aa12f69b6ac182c91d993f5_op_KeyedProcessOperator_e528d5d97ea2d7cefbcf6ff5b46354d5__3_138__uuid_1c6a5a11-caaf-4564-b3d0-9c7dadddc390/5683a26f-cde2-406d-b4cf-3c6c3976f8ba/000495.sst
>   at 
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>   at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>   at 
> sun.nio.fs.UnixFileSystemProvider.createLink(UnixFileSystemProvider.java:476)
>   at java.nio.file.Files.createLink(Files.java:1086)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBKeyedStateBackend.java:1238)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreLocalStateIntoFullInstance(RocksDBKeyedStateBackend.java:1186)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBKeyedStateBackend.java:916)
>   at 
> 

Re: IntervalJoin is stucked in rocksdb'seek for too long time in flink-1.6.2

2018-11-22 Thread Stefan Richter
Btw how did you make sure that it is stuck in the seek call and that the trace 
does not show different invocations of seek? This can indicate that seek is 
slow, but is not yet proof that you are stuck.

> On 22. Nov 2018, at 13:01, liujiangang  wrote:
> 
> This is not my case. Thank you.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: IntervalJoin is stucked in rocksdb'seek for too long time in flink-1.6.2

2018-11-22 Thread Stefan Richter
Hi,

are your RocksDB instances running on local SSDs or on something like EBS? If 
have previously seen cases where this happened because some EBS quota was 
exhausted and the performance got throttled.

Best,
Stefan

> On 22. Nov 2018, at 09:51, liujiangang  wrote:
> 
> Thank you very much. Some additional details: each record is 20 KB, the
> parallelism is 500, and each taskmanager has 10 GB of memory. The memory is
> enough, and I think the parallelism is big enough. Only the intervalJoin thread is
> beyond 100% CPU because of RocksDB's seek. I am confused about why RocksDB's seek
> takes such a long time but gets no result. I don't know how to debug RocksDB in
> Flink.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: ClassNotFoundException: org.apache.kafka.common.metrics.stats.Rate$1

2018-11-20 Thread Stefan Richter
Hi,

It should be as easy as making sure that there is a jar with the missing class 
in the class path of your user-code class loader.

Best,
Stefan

> On 20. Nov 2018, at 14:32, Avi Levi  wrote:
> 
> looking at the log file of the taskexecutor I see this exception
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.kafka.common.metrics.stats.Rate$1
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at 
> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   ... 22 more
> anyone know what should I do to avoid it ?



Re: Tentative release date for 1.6.3

2018-11-20 Thread Stefan Richter
Hi,

there is no release date for 1.6.3, yet.

Best,
Stefan

> On 20. Nov 2018, at 12:18, Shailesh Jain  wrote:
> 
> Hi,
> 
> Is the tentative release date for 1.6.3 decided?
> 
> Thanks,
> Shailesh



Re: FlinkCEP, circular references and checkpointing failures

2018-11-08 Thread Stefan Richter
Sure, it is already merged as FLINK-10816.

Best,
Stefan

> On 8. Nov 2018, at 11:53, Shailesh Jain  wrote:
> 
> Thanks a lot for looking into this issue Stefan.
> 
> Could you please let me know the issue ID once you open it? It'll help me 
> understand the problem better, and also I could do a quick test in our 
> environment once the issue is resolved.
> 
> Thanks,
> Shailesh
> 
> On Wed, Nov 7, 2018, 10:46 PM Till Rohrmann <trohrm...@apache.org> wrote:
> Really good finding Stefan!
> 
> On Wed, Nov 7, 2018 at 5:28 PM Stefan Richter <s.rich...@data-artisans.com> wrote:
> Hi,
> 
> I think I can already spot the problem: LockableTypeSerializer.duplicate() is 
> not properly implemented because it also has to call duplicate() on the 
> element serialiser that is passed into the constructor of the new instance. I 
> will open an issue and fix the problem.
> 
> Best,
> Stefan
> 
>> On 7. Nov 2018, at 17:17, Till Rohrmann <trohrm...@apache.org> wrote:
>> 
>> Hi Shailesh,
>> 
>> could you maybe provide us with an example program which is able to 
>> reproduce this problem? This would help the community to better debug the 
>> problem. It looks not right and might point towards a bug in Flink. Thanks a 
>> lot!
>> 
>> Cheers,
>> Till
>> 
>> On Tue, Oct 30, 2018 at 9:10 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>> This is some problem with serializing your events using Kryo. I'm adding 
>> Gordon to cc, as he was recently working with serializers. He might give you 
>> more insights what is going wrong.
>> 
>> Best,
>> 
>> Dawid
>> 
>> On 25/10/2018 05:41, Shailesh Jain wrote:
>>> Hi Dawid,
>>> 
>>> I've upgraded to flink 1.6.1 and rebased by changes against the tag 1.6.1, 
>>> the only commit on top of 1.6 is this: 
>>> https://github.com/jainshailesh/flink/commit/797e3c4af5b28263fd98fb79daaba97cabf3392c
>>> 
>>> I ran two separate identical jobs (with and without checkpointing enabled), 
>>> I'm hitting a ArrayIndexOutOfBoundsException (and sometimes NPE) only when 
>>> checkpointing (HDFS backend) is enabled, with the below stack trace.
>>> 
>>> I did see a similar problem with different operators here 
>>> (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/IndexOutOfBoundsException-on-deserialization-after-updating-to-1-6-1-td23933.html
>>> ).
>>>  Is this a known issue which is getting addressed?
>>> 
>>> Any ideas on what could be causing this?
>>> 
>>> Thanks,
>>> Shailesh
>>> 
>>> 
>>> 2018-10-24 17:04:13,365 INFO  org.apache.flink.runtime.taskmanager.Task 
>>> - SelectCepOperatorMixedTime (1/1) - 
>>> SelectCepOperatorMixedTime (1/1) (3d984b7919342a3886593401088ca2cd) 
>>> switched from RUNNING to FAILED.
>>> org.apache.flink.util.FlinkRuntimeException: Failure happened in filter 
>>> function.
>>> at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
>>> at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
>>> at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
>>> at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
>>> at 
>>> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
>>> at 
>>> org.apache.flink.cep.operator.AbstractKeyedCEPPatternMixedTimeApproachOperator.processElement(AbstractKeyedCEPPatternMixedTimeApproachOperator.java:45)
>>> at 
>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.util.WrappingRuntimeException: 
>>> java.lang.ArrayIndexOutOfBound

Re: FlinkCEP, circular references and checkpointing failures

2018-11-07 Thread Stefan Richter
Hi,

I think I can already spot the problem: LockableTypeSerializer.duplicate() is 
not properly implemented because it also has to call duplicate() on the element 
serialiser that is passed into the constructor of the new instance. I will open 
an issue and fix the problem.
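
For readers following the thread, the fix boils down to the following pattern (a simplified sketch, not the exact Flink code; the class here only illustrates the duplicate() contract for a wrapping serializer):

import org.apache.flink.api.common.typeutils.TypeSerializer;

public final class WrappingSerializerSketch<E> {

    private final TypeSerializer<E> elementSerializer;

    WrappingSerializerSketch(TypeSerializer<E> elementSerializer) {
        this.elementSerializer = elementSerializer;
    }

    public WrappingSerializerSketch<E> duplicate() {
        // The wrapped element serializer must be duplicated as well, otherwise a
        // single stateful inner serializer instance can be shared across threads.
        TypeSerializer<E> duplicatedElementSerializer = elementSerializer.duplicate();
        return duplicatedElementSerializer == elementSerializer
                ? this
                : new WrappingSerializerSketch<>(duplicatedElementSerializer);
    }
}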

Best,
Stefan

> On 7. Nov 2018, at 17:17, Till Rohrmann  wrote:
> 
> Hi Shailesh,
> 
> could you maybe provide us with an example program which is able to reproduce 
> this problem? This would help the community to better debug the problem. It 
> looks not right and might point towards a bug in Flink. Thanks a lot!
> 
> Cheers,
> Till
> 
> On Tue, Oct 30, 2018 at 9:10 AM Dawid Wysakowicz  > wrote:
> This is some problem with serializing your events using Kryo. I'm adding 
> Gordon to cc, as he was recently working with serializers. He might give you 
> more insights what is going wrong.
> 
> Best,
> 
> Dawid
> 
> On 25/10/2018 05:41, Shailesh Jain wrote:
>> Hi Dawid,
>> 
>> I've upgraded to flink 1.6.1 and rebased by changes against the tag 1.6.1, 
>> the only commit on top of 1.6 is this: 
>> https://github.com/jainshailesh/flink/commit/797e3c4af5b28263fd98fb79daaba97cabf3392c
>>  
>> 
>> 
>> I ran two separate identical jobs (with and without checkpointing enabled), 
>> I'm hitting a ArrayIndexOutOfBoundsException (and sometimes NPE) only when 
>> checkpointing (HDFS backend) is enabled, with the below stack trace.
>> 
>> I did see a similar problem with different operators here 
>> (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/IndexOutOfBoundsException-on-deserialization-after-updating-to-1-6-1-td23933.html
>>  
>> ).
>>  Is this a known issue which is getting addressed?
>> 
>> Any ideas on what could be causing this?
>> 
>> Thanks,
>> Shailesh
>> 
>> 
>> 2018-10-24 17:04:13,365 INFO  org.apache.flink.runtime.taskmanager.Task  
>>- SelectCepOperatorMixedTime (1/1) - 
>> SelectCepOperatorMixedTime (1/1) (3d984b7919342a3886593401088ca2cd) switched 
>> from RUNNING to FAILED.
>> org.apache.flink.util.FlinkRuntimeException: Failure happened in filter 
>> function.
>> at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
>> at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
>> at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
>> at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
>> at 
>> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
>> at 
>> org.apache.flink.cep.operator.AbstractKeyedCEPPatternMixedTimeApproachOperator.processElement(AbstractKeyedCEPPatternMixedTimeApproachOperator.java:45)
>> at 
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>> at 
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.util.WrappingRuntimeException: 
>> java.lang.ArrayIndexOutOfBoundsException: -1
>> at 
>> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:305)
>> at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
>> at 
>> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:301)
>> at 
>> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:291)
>> at 
>> org.apache.flink.cep.nfa.NFA$ConditionContext.getEventsForPattern(NFA.java:811)
>> at 
>> com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:70)
>> at 
>> com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:62)
>> at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
>> at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
>> ... 10 more
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>> at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
>> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
>> at 
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
>> at 
>> 

Re: Error restoring from savepoint while there's no modification to the job

2018-10-15 Thread Stefan Richter
Hi,

I see; then the important question for me is whether the problem exists on the
release/master code or just on your branches. Of course we can hardly give any
advice for custom builds and without any code. In general, you should debug in
HeapKeyedStateBackend lines 774-776 (the write part) and check against lines
472-474 (the read part). What happens there is very straightforward: remember the
offset of the output stream, then write the key-group. The read part then seeks
to the remembered offset and reads the key-group. They must match.
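
To illustrate what "they must match" means, here is a minimal, self-contained sketch of the write/read symmetry (illustrative only, not the actual HeapKeyedStateBackend code):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public final class KeyGroupOffsetSketch {

    public static void main(String[] args) throws IOException {
        int numKeyGroups = 4;
        long[] offsets = new long[numKeyGroups];

        // Write part: remember the offset of every key-group, then write it.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        for (int keyGroup = 0; keyGroup < numKeyGroups; keyGroup++) {
            offsets[keyGroup] = out.size();
            out.writeInt(keyGroup);
            out.writeUTF("state-of-key-group-" + keyGroup);
        }
        out.flush();
        byte[] snapshot = bytes.toByteArray();

        // Read part: seek to the remembered offset and expect the same key-group id.
        for (int keyGroup = 0; keyGroup < numKeyGroups; keyGroup++) {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(snapshot));
            in.skipBytes((int) offsets[keyGroup]);
            int restoredKeyGroup = in.readInt();
            if (restoredKeyGroup != keyGroup) {
                throw new IllegalStateException("Unexpected key-group in restore.");
            }
            System.out.println(keyGroup + " -> " + in.readUTF());
        }
    }
}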

Best,
Stefan

> On 15. Oct 2018, at 11:35, Averell  wrote:
> 
> Hi Kostas, Stefan,
> 
> The problem doesn't come on all of my builds, so it is a little bit
> difficult to track. Are there any specific classes that I can turn DEBUG on
> to help in finding the problem? (Turning DEBUG on globally seems too much).
> Will try to minimize the code and post it.
> 
> One more point that I notice is the error doesn't stay on one single
> operator but changes from time to time (even within the same build). For
> example, the previous exception I quoted was with a Window operator, while
> the one below is with CoStreamFlatMap.
> 
> Thanks and best regards,
> Averell
> 
> Caused by: java.lang.Exception: Exception while creating
> StreamOperatorStateContext.
>   at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
>   at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
> state backend for *CoStreamFlatMap*_68cd726422cf10170c4d6c7fd52ed309_(12/64)
> from any of the 1 provided restore options.
>   at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>   at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:279)
>   at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:133)
>   ... 5 more
> Caused by: java.lang.IllegalStateException: Unexpected key-group in restore.
>   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>   at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:475)
>   at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:438)
>   at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:377)
>   at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:105)
>   at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
>   at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
>   ... 7 more
> 
> 
> 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Error restoring from savepoint while there's no modification to the job

2018-10-15 Thread Stefan Richter
Hi,

I think it is rather unlikely that this is the problem because it should  give 
a different kind of exception. Would it be possible to provide a minimal and 
self-contained example code for a problematic job?

Best,
Stefan

> On 15. Oct 2018, at 08:29, Averell  wrote:
> 
> Hi everyone,
> 
> In the StreamExecutionEnvironment.createFileInput method, a file source is
> created as follows:
> 
>   SingleOutputStreamOperator<OUT> source = addSource(monitoringFunction, sourceName)
>           .transform("Split Reader: " + sourceName, typeInfo, reader);
> 
> Does this create two different operators? If yes, then it seems impossible
> to assign a UID to the 1st operator. And might it be the cause for my
> problem?
> 
> Thanks and best regards,
> Averell
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Flink leaves a lot RocksDB sst files in tmp directory

2018-10-12 Thread Stefan Richter
Hi,

Can you maybe show us what is inside one of the directory instances? Furthermore,
your TM logs show multiple instances of OutOfMemoryErrors, so that might also be a
problem. Also, how was the job moved? If a TM is killed, it of course cannot clean
up. That is why the data goes to the tmp dir, so that the OS can eventually take
care of it; in container environments this dir should always be cleaned anyway.

Best,
Stefan

> On 11. Oct 2018, at 10:15, Sayat Satybaldiyev  wrote:
> 
> Thank you Piotr for the reply! We didn't run this job on the previous version 
> of Flink. Unfortunately, I don't have a log file from JM only TM logs. 
> 
> https://drive.google.com/file/d/14QSVeS4c0EETT6ibK3m_TMgdLUwD6H1m/view?usp=sharing
>  
> 
> 
> On Wed, Oct 10, 2018 at 10:08 AM Piotr Nowojski  > wrote:
> Hi,
> 
> Was this happening in older Flink version? Could you post in what 
> circumstances the job has been moved to a new TM (full job manager logs and 
> task manager logs would be helpful)? I’m suspecting that those leftover files 
> might have something to do with local recovery.
> 
> Piotrek 
> 
>> On 9 Oct 2018, at 15:28, Sayat Satybaldiyev > > wrote:
>> 
>> After digging more in the log, I think it's more likely a bug. I've grepped the
>> log by job id and found that under normal circumstances the TM is supposed to
>> delete the flink-io files. For some reason, it doesn't delete the files that were
>> listed above.
>> 
>> 2018-10-08 22:10:25,865 INFO  
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
>> Deleting existing instance base directory 
>> /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_92266bd138cd7d51ac7a63beeb86d5f5__1_1__uuid_bf69685b-78d3-431c-88be-b3f26db05566.
>> 2018-10-08 22:10:25,867 INFO  
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
>> Deleting existing instance base directory 
>> /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_14630a50145935222dbee3f1bcfdc2a6__1_1__uuid_47cd6e95-144a-4c52-a905-52966a5e9381.
>> 2018-10-08 22:10:25,874 INFO  
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
>> Deleting existing instance base directory 
>> /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_7185aa35d035b12c70cf490077378540__1_1__uuid_7c539a96-a247-4299-b1a0-01df713c3c34.
>> 2018-10-08 22:17:38,680 INFO  
>> org.apache.flink.runtime.taskexecutor.TaskExecutor- Close 
>> JobManager connection for job a5b223c7aee89845f9aed24012e46b7e.
>> org.apache.flink.util.FlinkException: JobManager responsible for 
>> a5b223c7aee89845f9aed24012e46b7e lost the leadership.
>> org.apache.flink.util.FlinkException: JobManager responsible for 
>> a5b223c7aee89845f9aed24012e46b7e lost the leadership.
>> 2018-10-08 22:17:38,686 INFO  
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
>> Deleting existing instance base directory 
>> /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_7185aa35d035b12c70cf490077378540__1_1__uuid_2e88c56a-2fc2-41f2-a1b9-3b0594f660fb.
>> org.apache.flink.util.FlinkException: JobManager responsible for 
>> a5b223c7aee89845f9aed24012e46b7e lost the leadership.
>> 2018-10-08 22:17:38,691 INFO  
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
>> Deleting existing instance base directory 
>> /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_92266bd138cd7d51ac7a63beeb86d5f5__1_1__uuid_b44aecb7-ba16-4aa4-b709-31dae7f58de9.
>> org.apache.flink.util.FlinkException: JobManager responsible for 
>> a5b223c7aee89845f9aed24012e46b7e lost the leadership.
>> org.apache.flink.util.FlinkException: JobManager responsible for 
>> a5b223c7aee89845f9aed24012e46b7e lost the leadership.
>> org.apache.flink.util.FlinkException: JobManager responsible for 
>> a5b223c7aee89845f9aed24012e46b7e lost the leadership.
>> 
>> 
>> On Tue, Oct 9, 2018 at 2:33 PM Sayat Satybaldiyev > > wrote:
>> Dear all,
>> 
>> While running Flink 1.6.1 with RocksDB as a backend and hdfs as checkpoint 
>> FS, I've noticed that after a job has moved to a different host it leaves 
>> quite a huge state in temp folder(1.2TB in total). The files are not used as 
>> TM is not running a job on the current host. 
>> 
>> The job a5b223c7aee89845f9aed24012e46b7e had been running on the host but 
>> then it was moved to a different TM. I'm wondering is it intended behavior 
>> or a possible bug?
>> 
>> I've attached files that are left and not used by a job in PrintScreen.
> 



Re: Large rocksdb state restore/checkpoint duration behavior

2018-10-10 Thread Stefan Richter
Hi,

I would assume that the problem about blocked processing during a checkpoint is 
caused by [1], because you mentioned the use of RocksDB incremental checkpoints 
and it could be that you use it in combination with heap-based timers. This is 
the one combination that currently still uses a synchronous checkpointing path 
for the timers, and if you have many timers, this can block the pipeline.
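
If that combination turns out to be the cause, one possible mitigation is to let RocksDB also store the timers instead of keeping them on the heap (assuming the option exists in the Flink version in use; please check the documentation of your release), e.g. in flink-conf.yaml:

state.backend.rocksdb.timer-service.factory: ROCKSDB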

For the cancellation problem, as seen in the stack trace, I would assume it is 
because of [2]. In a nutshell: if the wall clock or event time changes, 
multiple timers can trigger (it can be a lot, also depending on how big the 
change is) and currently this loop does not check the task’s cancellation 
status and will only terminate when all onTimer calls have been handled.

If you have problems with slow savepoints, you can also try to restore from the
externalised handle of an incremental checkpoint and see if that works better.

Best,
Stefan

[1] https://issues.apache.org/jira/browse/FLINK-10026 

[2] https://issues.apache.org/jira/browse/FLINK-9845

> On 10. Oct 2018, at 12:39, Aminouvic  wrote:
> 
> Hi,
> 
> We are using Flink 1.6.1 on yarn with rocksdb as backend incrementally
> checkpointed to hdfs (for data and timers).
> 
> The job reads events from kafka (~1 billion event per day), constructs user
> sessions using an EventTimeSessionWindow coupled with a late firing trigger
> and WindowFunction with AggregatingState (few minutes gap, 1 day allowed
> lateness, ~1TB state ) to produces results back into kafka (~200 millions
> event per day).
> 
> When trying to restart the job for maintenance (stopped the cluster for 1
> hour), the restore duration took several hours.
> 
> Task metrics showed that no new data was read from Kafka, but the job
> produced data out.
> 
> Also, sometimes, the job seems to freeze (no data in/out) while performing
> long checkpoints (~20 minutes)
> 
> When we try to cancel the job it takes several minutes before stopping and
> logs show the following :
> :
> 2018-10-09 11:53:53,348 WARN  org.apache.flink.runtime.taskmanager.Task   
>  
> - Task 'window-function -> (Sink: kafka-sink, Sink: kafka-late-sink) (1/60)'
> did not react to cancelling signal for 30 seconds, but is stuck in method:
> org.rocksdb.RocksDB.get(Native Method)
> org.rocksdb.RocksDB.get(RocksDB.java:810)
> org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:120)
> org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:112)
> org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:61)
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:452)
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:746)
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> java.lang.Thread.run(Thread.java:745)
> 
> Any ideas on this ?
> 
> Regards,
> Amine
> 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Error restoring from savepoint while there's no modification to the job

2018-10-10 Thread Stefan Richter
Hi,

adding to Dawid's questions, it would also be very helpful to know which Flink
version was used to create the savepoint, which Flink version was used in the
restore attempt, and whether the savepoint was moved or modified. Outside of
potential conflicts with those things, I would not expect anything like this.

Best,
Stefan

> On 10. Oct 2018, at 09:51, Dawid Wysakowicz  wrote:
> 
> Hi Averell,
> 
> Are you trying to scale the job up, meaning do you increase the job
> parallelism? Have you increased the job's max parallelism by chance? If so,
> this is not supported. The max parallelism parameter is used to create
> key groups that can then be assigned to parallel operators. This
> parameter cannot be changed for a job that shall be restored.
> 
> If this is not the case, maybe Stefan (cc'd) has some ideas about what could go
> wrong.
> 
> Best,
> 
> Dawid
> 
> 
> On 10/10/18 09:23, Averell wrote:
>> Hi everyone,
>> 
>> I'm getting the following error when trying to restore from a savepoint.
>> Here below is the output from flink bin, and in the attachment is a TM log.
>> I didn't have any change in the app before and after savepoint. All Window
>> operators have been assigned unique ID string.
>> 
>> Could you please help give a look?
>> 
>> Thanks and best regards,
>> Averell
>> 
>> taskmanager.gz
>> 
>> 
>> org.apache.flink.client.program.ProgramInvocationException: Job failed.
>> (JobID: 606ad5239f5e23cedb85d3e75bf76463)
>>  at
>> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
>>  at
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
>>  at
>> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>>  at
>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:664)
>>  at
>> com.nbnco.csa.analysis.copper.sdc.flink.StreamingSdcWithAverageByDslam$.main(StreamingSdcWithAverageByDslam.scala:442)
>>  at
>> com.nbnco.csa.analysis.copper.sdc.flink.StreamingSdcWithAverageByDslam.main(StreamingSdcWithAverageByDslam.scala)
>>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>  at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>  at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>  at java.lang.reflect.Method.invoke(Method.java:498)
>>  at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>>  at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>>  at
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>>  at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
>>  at 
>> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
>>  at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>>  at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>>  at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>>  at java.security.AccessController.doPrivileged(Native Method)
>>  at javax.security.auth.Subject.doAs(Subject.java:422)
>>  at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
>>  at
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>> execution failed.
>>  at
>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>  at
>> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
>>  ... 22 more
>> Caused by: java.lang.Exception: Exception while creating
>> StreamOperatorStateContext.
>>  at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
>>  at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
>>  at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>>  at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>  at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
>> state backend for WindowOperator_b7287b12f90aa788ab162856424c6d40_(8/64)
>> from any of the 1 provided restore options.
>>  at
>> 

Re: ***UNCHECKED*** Error while confirming Checkpoint

2018-10-08 Thread Stefan Richter
Hi Pedro,

unfortunately the interesting parts have all been removed from the log; we already 
know about the exception itself. In particular, what I would like to see is 
which checkpoints were triggered and completed before the exception happened.

Best,
Stefan

> Am 08.10.2018 um 10:23 schrieb PedroMrChaves :
> 
> Hello,
> 
> Find attached the jobmanager.log. I've omitted the log lines from other
> runs, only left the job manager info and the run with the error. 
> 
> jobmanager.log
> 
>   
> 
> 
> 
> Thanks again for your help.
> 
> Regards,
> Pedro.
> 
> 
> 
> -
> Best Regards,
> Pedro Chaves
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Data loss when restoring from savepoint

2018-10-04 Thread Stefan Richter
Hi,

> Am 04.10.2018 um 16:08 schrieb Juho Autio :
> 
> > you could take a look at Bravo [1] to query your savepoints and to check if 
> > the state in the savepoint complete w.r.t your expectations
> 
> > Thanks. I'm not 100% sure if this is the case, but to me it seemed like the missed 
> ids were being logged by the reducer soon after the job had started (after 
> restoring a savepoint). But on the other hand, after that I also made another 
> savepoint & restored that, so what I could check is: does that next savepoint 
> have the missed ids that were logged (a couple of minutes before the 
> savepoint was created, so there should've been more than enough time to add 
> them to the state before the savepoint was triggered) or not. Any way, if I 
> would be able to verify with Bravo that the ids are missing from the 
> savepoint (even though reduced logged that it saw them), would that help in 
> figuring out where they are lost? Is there some major difference compared to 
> just looking at the final output after window has been triggered?


I think that makes a difference. For example, you can investigate if there is a 
state loss or a problem with the windowing. In the savepoint you could see 
which keys exists and to which windows they are assigned. Also just to make 
sure there is no misunderstanding: only elements that are in the state at the 
start of a savepoint are expected to be part of the savepoint; all elements 
between start and completion of the savepoint are not expected to be part of 
the savepoint.

> 
> > I also doubt that the problem is about backpressure after restore, because 
> > the job will only continue running after the state restore is already 
> > completed.
> 
> Yes, I'm not suspecting that the state restoring would be the problem either. 
> My concern was about backpressure possibly messing with the updates of 
> reducing state? I would tend to suspect that updating the state consistently 
> is what fails, where heavy load / backpressure might be a factor.


Why do you assume that backpressure would influence your updates? Updates to 
each local state still happen event by event, in a single reading/writing thread.

> 
> On Thu, Oct 4, 2018 at 4:18 PM Stefan Richter  <mailto:s.rich...@data-artisans.com>> wrote:
> Hi,
> 
> you could take a look at Bravo [1] to query your savepoints and to check if 
> the state in the savepoint complete w.r.t your expectations. I somewhat doubt 
> that there is a general problem with the state/savepoints because many users 
> are successfully running it on a large state and I am not aware of any data 
> loss problems, but nothing is impossible. What the savepoint does is also 
> straight forward: iterate a db snapshot and write all key/value pairs to 
> disk, so all data that was in the db at the time of the savepoint, should 
> show up. I also doubt that the problem is about backpressure after restore, 
> because the job will only continue running after the state restore is already 
> completed. Did you check if you are using exactly-one-semantics or 
> at-least-once semantics? Also did you check that the kafka consumer start 
> position is configured properly [2]? Are watermarks generated as expected 
> after restore?
> 
> One more unrelated high-level comment that I have: for a granularity of 24h 
> windows, I wonder if it would not make sense to use a batch job instead?
> 
> Best,
> Stefan
> 
> [1] https://github.com/king/bravo <https://github.com/king/bravo>
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>  
> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration>
> 
>> Am 04.10.2018 um 14:53 schrieb Juho Autio > <mailto:juho.au...@rovio.com>>:
>> 
>> Thanks for the suggestions!
>> 
>> > In general, it would be tremendously helpful to have a minimal working 
>> > example which allows to reproduce the problem.
>> 
>> Definitely. The problem with reproducing has been that this only seems to 
>> happen in the bigger production data volumes.
>> 
>> That's why I'm hoping to find a way to debug this with the production data. 
>> With that it seems to consistently cause some misses every time the job is 
>> killed/restored.
>> 
>> > check if it happens for shorter windows, like 1h etc
>> 
>> What would be the benefit of that compared to 24h window?
>> 
>> > simplify the job to not use a reduce window but simply a time window which 
>> > outputs the window events. Then counting the input and output events 
>> > should allow you to verify

Re: Data loss when restoring from savepoint

2018-10-04 Thread Stefan Richter
Hi,

you could take a look at Bravo [1] to query your savepoints and to check whether the 
state in the savepoint is complete w.r.t. your expectations. I somewhat doubt that 
there is a general problem with the state/savepoints because many users are 
successfully running it on large state and I am not aware of any data loss 
problems, but nothing is impossible. What the savepoint does is also 
straightforward: iterate a db snapshot and write all key/value pairs to disk, so 
all data that was in the db at the time of the savepoint should show up. I also 
doubt that the problem is about backpressure after restore, because the job 
will only continue running after the state restore is already completed. Did 
you check whether you are using exactly-once or at-least-once semantics? 
Also, did you check that the Kafka consumer start position is configured 
properly [2]? Are watermarks generated as expected after restore?
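
For reference, here is a minimal sketch of the two settings mentioned above; the 
topic, group id, bootstrap servers and the 0.11 connector version are placeholders 
and not taken from this thread (imports omitted, as in the other snippets here):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// exactly-once checkpointing; the 60s interval is only an example value
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
props.setProperty("group.id", "my-group");            // placeholder

FlinkKafkaConsumer011<String> consumer =
    new FlinkKafkaConsumer011<>("my-topic", new SimpleStringSchema(), props);
// only used when there is no restored state; offsets stored in a savepoint
// take precedence on restore
consumer.setStartFromGroupOffsets();

env.addSource(consumer);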

One more unrelated high-level comment that I have: for a granularity of 24h 
windows, I wonder if it would not make sense to use a batch job instead?

Best,
Stefan

[1] https://github.com/king/bravo
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
 


> Am 04.10.2018 um 14:53 schrieb Juho Autio :
> 
> Thanks for the suggestions!
> 
> > In general, it would be tremendously helpful to have a minimal working 
> > example which allows to reproduce the problem.
> 
> Definitely. The problem with reproducing has been that this only seems to 
> happen in the bigger production data volumes.
> 
> That's why I'm hoping to find a way to debug this with the production data. 
> With that it seems to consistently cause some misses every time the job is 
> killed/restored.
> 
> > check if it happens for shorter windows, like 1h etc
> 
> What would be the benefit of that compared to 24h window?
> 
> > simplify the job to not use a reduce window but simply a time window which 
> > outputs the window events. Then counting the input and output events should 
> > allow you to verify the results. If you are not seeing missing events, then 
> > it could have something to do with the reducing state used in the reduce 
> > function.
> 
> Hm, maybe, but I'm not sure how useful that would be, because it wouldn't yet 
> prove that it's related to reducing: not having a reduce function could also 
> mean a smaller load on the job, which alone might be enough to keep the 
> problem from manifesting.
> 
> Is there a way to debug what goes into the reducing state (including what 
> gets removed or overwritten and what restored), if that makes sense..? Maybe 
> some suitable logging could be used to prove that the lost data is written to 
> the reducing state (or at least asked to be written), but not found any more 
> when the window closes and state is flushed?
> 
> On configuration once more, we're using RocksDB state backend with 
> asynchronous incremental checkpointing. The state is restored from savepoints 
> though, we haven't been using those checkpoints in these tests (although they 
> could be used in case of crashes – but we haven't had those now).
> 
> On Thu, Oct 4, 2018 at 3:25 PM Till Rohrmann  > wrote:
> Hi Juho,
> 
> another idea to further narrow down the problem could be to simplify the job 
> to not use a reduce window but simply a time window which outputs the window 
> events. Then counting the input and output events should allow you to verify 
> the results. If you are not seeing missing events, then it could have 
> something to do with the reducing state used in the reduce function.
> 
> In general, it would be tremendously helpful to have a minimal working 
> example which allows to reproduce the problem.
> 
> Cheers,
> Till
> 
> On Thu, Oct 4, 2018 at 2:02 PM Andrey Zagrebin  > wrote:
> Hi Juho,
> 
> can you try to reduce the job to minimal reproducible example and share the 
> job and input?
> 
> For example:
> - some simple records as input, e.g. tuples of primitive types saved as cvs
> - minimal deduplication job which processes them and misses records
> - check if it happens for shorter windows, like 1h etc
> - setup which you use for the job, ideally locally reproducible or cloud
> 
> Best,
> Andrey
> 
>> On 4 Oct 2018, at 11:13, Juho Autio > > wrote:
>> 
>> Sorry to insist, but we seem to be blocked for any serious usage of state in 
>> Flink if we can't rely on it to not miss data in case of restore.
>> 
>> Would anyone have suggestions for how to troubleshoot this? So far I have 
>> verified with DEBUG logs that our reduce function gets to process also the 
>> data that is missing from window output.
>> 
>> On Mon, Oct 1, 2018 at 11:56 AM Juho Autio > > wrote:
>> Hi 

Re: Rocksdb Metrics

2018-09-25 Thread Stefan Richter
Hi,

this feature is tracked here https://issues.apache.org/jira/browse/FLINK-10423 


Best,
Stefan

> Am 25.09.2018 um 17:51 schrieb Sayat Satybaldiyev :
> 
> Flink provides a rich number of metrics. However, I didn't find any metrics 
> for the RocksDB state backend, neither in the metrics docs nor in the JMX MBeans. 
> 
> Are there any metrics for the RocksDB backend that Flink exposes?



Re: ArrayIndexOutOfBoundsException

2018-09-25 Thread Stefan Richter
You only need to update the Flink jars; the job requires no update. I think you 
also cannot start from this checkpoint/savepoint after the upgrade because it 
seems to be corrupted by the bug. You need to use an older checkpoint/savepoint 
to restart.
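
For reference, resubmitting from an older savepoint (or externalized checkpoint) 
is done with the -s flag of the CLI; the paths below are placeholders:

./bin/flink run -s hdfs:///flink/savepoints/savepoint-xyz /path/to/your-job.jar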

Best,
Stefan

> Am 25.09.2018 um 16:53 schrieb Alexander Smirnov 
> :
> 
> Thanks Stefan.
> 
> is it only the Flink runtime that should be updated, or should the job be 
> recompiled too?
> Is there a workaround to start the job without upgrading Flink?
> 
> Alex
> 
> On Tue, Sep 25, 2018 at 5:48 PM Stefan Richter  <mailto:s.rich...@data-artisans.com>> wrote:
> Hi,
> 
> this problem looks like https://issues.apache.org/jira/browse/FLINK-8836 
> <https://issues.apache.org/jira/browse/FLINK-8836> which would also match to 
> your Flink version. I suggest to update to 1.4.3 or higher to avoid the issue 
> in the future.
> 
> Best,
> Stefan
> 
> 
>> Am 25.09.2018 um 16:37 schrieb Alexander Smirnov 
>> mailto:alexander.smirn...@gmail.com>>:
>> 
>> I'm getting an exception on job starting from a savepoint. Why that could 
>> happen?
>> 
>> Flink 1.4.2
>> 
>> 
>> java.lang.IllegalStateException: Could not initialize operator state backend.
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:301)
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>> at java.util.ArrayList.elementData(ArrayList.java:418)
>> at java.util.ArrayList.get(ArrayList.java:431)
>> at 
>> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>> at 
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
>> at 
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>> at 
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>> at 
>> org.apache.flink.runtime.state.DefaultOperatorStateBackend.deserializeStateValues(DefaultOperatorStateBackend.java:584)
>> at 
>> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:399)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
>> ... 6 more
> 



Re: ArrayIndexOutOfBoundsException

2018-09-25 Thread Stefan Richter
Hi,

this problem looks like https://issues.apache.org/jira/browse/FLINK-8836 
 which would also match to 
your Flink version. I suggest to update to 1.4.3 or higher to avoid the issue 
in the future.

Best,
Stefan

> Am 25.09.2018 um 16:37 schrieb Alexander Smirnov 
> :
> 
> I'm getting an exception on job starting from a savepoint. Why that could 
> happen?
> 
> Flink 1.4.2
> 
> 
> java.lang.IllegalStateException: Could not initialize operator state backend.
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:301)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
> at java.util.ArrayList.elementData(ArrayList.java:418)
> at java.util.ArrayList.get(ArrayList.java:431)
> at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
> at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
> at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
> at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
> at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.deserializeStateValues(DefaultOperatorStateBackend.java:584)
> at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:399)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
> ... 6 more



Re: ***UNCHECKED*** Error while confirming Checkpoint

2018-09-25 Thread Stefan Richter
Hi,

I cannot spot anything bad or "wrong" about your job configuration. Maybe you 
can try to save and send the logs if it happens again? Did you observe this 
only once, often, or is it something that is reproducible?

Best,
Stefan

> Am 24.09.2018 um 10:15 schrieb PedroMrChaves :
> 
> Hello Stefan, 
> 
> Thank you for the help.
> 
> I've actually lost those logs due to several cluster restarts that we did,
> which caused log rotation (limit = 5 versions).
> The log lines that I've posted were the only ones that showed signs of
> a problem. 
> 
> *The configuration of the job is as follows:*
> 
> private static final int DEFAULT_MAX_PARALLELISM = 16;
>private static final int CHECKPOINTING_INTERVAL = 1000;
>private static final int MIN_PAUSE_BETWEEN_CHECKPOINTS = 1000;
>private static final int CHECKPOINT_TIMEOUT = 6;
>private static final int INTERVAL_BETWEEN_RESTARTS = 120; 
> (...)
> 
>  environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>environment.setMaxParallelism(DEFAULT_MAX_PARALLELISM);
>environment.enableCheckpointing(CHECKPOINTING_INTERVAL,
> CheckpointingMode.EXACTLY_ONCE);
> 
> environment.getCheckpointConfig().setMinPauseBetweenCheckpoints(MIN_PAUSE_BETWEEN_CHECKPOINTS);
> 
> environment.getCheckpointConfig().setCheckpointTimeout(CHECKPOINT_TIMEOUT);
>environment.setRestartStrategy(RestartStrategies.noRestart());
>environment.setParallelism(parameters.getInt(JOB_PARALLELISM));
> 
> The Kafka consumer/producer configuration is:
> 
>properties.put("value.deserializer",
> "org.apache.kafka.common.serialization.StringDeserializer");
>properties.put("max.request.size","1579193");
>properties.put("processing.guarantee","exactly_once");
>properties.put("isolation.level","read_committed");/
> 
> 
> 
> -
> Best Regards,
> Pedro Chaves
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: ***UNCHECKED*** Error while confirming Checkpoint

2018-09-21 Thread Stefan Richter
Hi,

could you provide some logs for this problematic job because I would like to 
double check the reason why this violated precondition did actually happen?

Thanks,
Stefan

> Am 20.09.2018 um 17:24 schrieb Stefan Richter :
> 
> FYI, here a link to my PR: https://github.com/apache/flink/pull/6723
> 
>> Am 20.09.2018 um 14:52 schrieb Stefan Richter :
>> 
>> Hi,
>> 
>> I think the failing precondition is too strict because sometimes a 
>> checkpoint can overtake another checkpoint and in that case the commit is 
>> already subsumed. I will open a Jira and PR with a fix.
>> 
>> Best,
>> Stefan
>> 
>>> Am 19.09.2018 um 10:04 schrieb PedroMrChaves :
>>> 
>>> Hello,
>>> 
>>> I have a running Flink job that reads data form one Kafka topic, applies
>>> some transformations and writes data back into another Kafka topic. The job
>>> sometimes restarts due to the following error:
>>> 
>>> /java.lang.RuntimeException: Error while confirming checkpoint
>>>  at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
>>>  at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>  at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>  at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>  at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.IllegalStateException: checkpoint completed, but no
>>> transaction pending
>>>  at
>>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>>>  at
>>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:258)
>>>  at
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:130)
>>>  at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:650)
>>>  at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1255)
>>>  ... 5 more
>>> 2018-09-18 22:00:10,716 INFO 
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph- Could not
>>> restart the job Alert_Correlation (3c60b8670c81a629716bb2e42334edea) because
>>> the restart strategy prevented it.
>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>  at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
>>>  at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>  at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>  at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>  at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.IllegalStateException: checkpoint completed, but no
>>> transaction pending
>>>  at
>>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>>>  at
>>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:258)
>>>  at
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:130)
>>>  at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:650)
>>>  at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1255)
>>>  ... 5 more/
>>> 
>>> My state is very small for this particular job, just a few KBs.
>>> 
>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t612/Screen_Shot_2018-09-19_at_09.png>
>>>  
>>> 
>>> 
>>> Flink Version: 1.4.2
>>> State Backend: hadoop 2.8
>>> 
>>> Regards,
>>> Pedro Chaves
>>> 
>>> 
>>> 
>>> -
>>> Best Regards,
>>> Pedro Chaves
>>> --
>>> Sent from: 
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>> 
> 



Re: Flink can't initialize operator state backend when starting from checkpoint

2018-09-21 Thread Stefan Richter
Hi,

that is correct. If you are using custom serializers you should double check 
their correctness, maybe using our test base for type serializers. Another 
reason could be that you modified the job in a way that silently changed the 
schema somehow. Concurrent use of serializers across different threads can also 
cause problems like this and I think there was a bug in 1.4 around this topic. 
I suggest that you also update to a newer version, at least the latest bugfix 
release.
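
As a rough sketch, a test based on the serializer test base looks roughly like the 
following; MyType and MyTypeSerializer are placeholders for your own classes, and 
the exact set of abstract methods may differ slightly between Flink versions:

public class MyTypeSerializerTest extends SerializerTestBase<MyType> {

    @Override
    protected TypeSerializer<MyType> createSerializer() {
        return new MyTypeSerializer();
    }

    @Override
    protected int getLength() {
        return -1; // -1 for variable-length records
    }

    @Override
    protected Class<MyType> getTypeClass() {
        return MyType.class;
    }

    @Override
    protected MyType[] getTestData() {
        // a few representative instances, including edge cases
        return new MyType[] { new MyType("a"), new MyType("b") };
    }
}

The base class then runs the generic serialize/deserialize/copy round-trip tests 
against this data.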

Best,
Stefan

> Am 21.09.2018 um 10:26 schrieb vino yang :
> 
> Hi Qingxiang,
> 
> Several days ago, Stefan described the causes of this anomaly in a problem 
> similar to this:
> Typically, these problems have been observed when something was wrong with a 
> serializer or a stateful serializer was used from multiple threads.
> 
> Thanks, vino.
> 
> Marvin777 <xymaqingxiang...@gmail.com>
> wrote on Fri, 21 Sep 2018 at 15:20:
> Hi all,
> 
> When the Flink (1.4.2) job starts, it can find the checkpoint files on HDFS, but 
> an exception occurs during deserialization:
> 
> 
> 
> Do you have any insight on this?
> 
> Thanks,
> Qingxiang Ma



Re: multiple flink applications on yarn are shown in one application.

2018-09-21 Thread Stefan Richter
Hi,

I see from your commands that you are using the same jar file twice, so I first 
want to double-check how you even determine which job should be started. I am 
also adding Till (in CC); depending on your answer to my first question, he 
might have some additional thoughts.

Best,
Stefan

> Am 21.09.2018 um 04:15 schrieb weilongxing :
> 
> Thank you for your help.
> 
> It is per-job mode if I submit the job detached.
> 
> However, I submitted these two jobs using the commands below. 
> > /data/apps/opt/flink/bin/flink run -m yarn-cluster -d 
> > /data/apps/opt/fluxion/fluxion-flink.jar 
> > /data/apps/conf/fluxion//job_submit_rpt_company_performan.conf
> > /data/apps/opt/flink/bin/flink run -m yarn-cluster -d 
> > /data/apps/opt/fluxion/fluxion-flink.jar 
> > /data/apps/conf/fluxion//job_submit_rpt_company_user_s.conf
> 
> I browsed the YARN applications. As the picture shows, I got 2 applications (0013 
> / 0012), but the job in both applications is the same. I can’t find the job that 
> was submitted second. The job in application_XXX_0013 should be 
> rpt_company_user_s. This does not happen in session mode. 
> 
> Best
> LX
> 
> 
> 
> 
> 
> 
>> On 20 Sep 2018, at 19:13, Stefan Richter <s.rich...@data-artisans.com> wrote:
>> 
>> Hi,
>> 
>> currently, Flink still has to use session mode under the hood if you submit 
>> the job in attached mode. The reason is that the job could consist of 
>> multiple parts that need to run one after the other. This will be changed 
>> in the future, and it also should not happen if you
>> submit the job detached.
>> 
>> Best,
>> Stefan
>> 
>>> Am 20.09.2018 um 10:29 schrieb weilongxing >> <mailto:weilongx...@aicaigroup.com>>:
>>> 
>>> I am new to Flink. I am using Flink on yarn per-job. I submitted two 
>>> applications.
>>> 
>>> > /data/apps/opt/flink/bin/flink run -m yarn-cluster  
>>> > /data/apps/opt/fluxion/fluxion-flink.jar 
>>> > /data/apps/conf/fluxion//job_submit_rpt_company_user_s.conf
>>> 
>>> > /data/apps/opt/flink/bin/flink run -m yarn-cluster  
>>> > /data/apps/opt/fluxion/fluxion-flink.jar 
>>> > /data/apps/conf/fluxion//job_submit_rpt_company_performan_.conf
>>> 
>>> I can see these two applications on YARN. I noticed that the application 
>>> name is “flink session cluster” rather than “flink per-job”. Is that right?
>>> 
>>> 
>>> However, I can see both flink jobs in each yarn application. Is that right?
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> And finally, I wanted to kill one job, so I killed one YARN application. 
>>> However, that YARN application was killed but both Flink jobs restarted in 
>>> the other YARN application. I want to kill one and keep the other. In my 
>>> opinion, there should be one job per application, and the job should be killed 
>>> when the YARN application is killed.
>>> 
>>> 
>>> 
>>> 
>>> I think the problem is that these two applications should be “flink per-job” 
>>> rather than “flink session cluster”, but I don’t know why they become “flink 
>>> session cluster”. Can anybody help? Thanks.
>>> 
>>> 
>>> 
>> 
> 



Re: Flink takes 40ms ~ 100ms to proceed from one operator to another?

2018-09-20 Thread Stefan Richter
Oh yes exactly, enable is right.

> Am 20.09.2018 um 17:48 schrieb Hequn Cheng :
> 
> Hi Stefan,
> 
> Do you mean enable object reuse?
> If you want to reduce latency between chained operators, you can also try to 
> disable object-reuse:
> 
> On Thu, Sep 20, 2018 at 10:37 PM Stefan Richter  <mailto:s.rich...@data-artisans.com>> wrote:
> Sorry, forgot the link for reference [1], which is 
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/datastream_api.html#controlling-latency
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/datastream_api.html#controlling-latency>
> 
>> Am 20.09.2018 um 16:36 schrieb Stefan Richter > <mailto:s.rich...@data-artisans.com>>:
>> 
>> Hi,
>> 
>> you provide not very much information for this question, e.g. what and how 
>> exactly your measure, if this is a local or distributed setting etc. I 
>> assume that it is distributed and that the cause for your observation is the 
>> buffer timeout, i.e. the maximum time that Flink waits until sending a 
>> buffer with just one element, which happens to be 100ms by default. You can 
>> decrease this value to some extend, at to cost of potential loss in 
>> throughput, but I think even values around 5-10ms are ok-ish. See [1] for 
>> more details. If you want to reduce latency between chained operators, you 
>> can also try to disable object-reuse:
>> 
>> StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> env.getConfig().enableObjectReuse();
>> 
>> Best,
>> Stefan
>> 
>>> Am 20.09.2018 um 16:03 schrieb James Yu >> <mailto:cyu...@gmail.com>>:
>>> 
>>> The previous email seems unable to display embedded images, let me put on 
>>> the links.
>>> Hi,
>>> 
>>> My team and I try to measure total time spent on our flink job and found 
>>> out that Flink takes 40ms ~ 100ms to proceed from one operator to another. 
>>> I wonder how can we reduce this transition time.
>>> 
>>> Following DAG represents our job:
>>> https://drive.google.com/open?id=1wNR8po-SooAfYtCxU3qUDm4-hm16tclV 
>>> <https://drive.google.com/open?id=1wNR8po-SooAfYtCxU3qUDm4-hm16tclV> 
>>> 
>>> 
>>> and here is the screenshot of our log:
>>> https://drive.google.com/open?id=14PTViZMkhagxeNjOb4R8u3BEr7Ym1xBi 
>>> <https://drive.google.com/open?id=14PTViZMkhagxeNjOb4R8u3BEr7Ym1xBi> 
>>> 
>>> at 19:37:04.564, the job is leaving "Source: Custom Source -> Flat Map"
>>> at 19:37:04.605, the job is entering "Co-Flat Map"
>>> at 19:37:04.605, the job is leaving "Co-Flat Map"
>>> at 19:37:04.705, the job is entering "Co-Flat Map -> "
>>> at 19:37:04.708, the job is leaving "Co-Flat Map -> ..."
>>> 
>>> both "Co-Flat Map" finishes merely instantly, while most of the execution 
>>> time is spent on the transition. Any idea?
>>> 
>>> 
>>> This is a UTF-8 formatted mail
>>> ---
>>> James C.-C.Yu
>>> +886988713275
>>> +8615618429976
>>> 
>> 
> 



Re: ***UNCHECKED*** Error while confirming Checkpoint

2018-09-20 Thread Stefan Richter
FYI, here a link to my PR: https://github.com/apache/flink/pull/6723

> Am 20.09.2018 um 14:52 schrieb Stefan Richter :
> 
> Hi,
> 
> I think the failing precondition is too strict because sometimes a checkpoint 
> can overtake another checkpoint and in that case the commit is already 
> subsumed. I will open a Jira and PR with a fix.
> 
> Best,
> Stefan
> 
>> Am 19.09.2018 um 10:04 schrieb PedroMrChaves :
>> 
>> Hello,
>> 
>> I have a running Flink job that reads data form one Kafka topic, applies
>> some transformations and writes data back into another Kafka topic. The job
>> sometimes restarts due to the following error:
>> 
>> /java.lang.RuntimeException: Error while confirming checkpoint
>>   at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
>>   at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>   at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>   at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>   at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.IllegalStateException: checkpoint completed, but no
>> transaction pending
>>   at
>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>>   at
>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:258)
>>   at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:130)
>>   at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:650)
>>   at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1255)
>>   ... 5 more
>> 2018-09-18 22:00:10,716 INFO 
>> org.apache.flink.runtime.executiongraph.ExecutionGraph- Could not
>> restart the job Alert_Correlation (3c60b8670c81a629716bb2e42334edea) because
>> the restart strategy prevented it.
>> java.lang.RuntimeException: Error while confirming checkpoint
>>   at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
>>   at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>   at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>   at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>   at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.IllegalStateException: checkpoint completed, but no
>> transaction pending
>>   at
>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>>   at
>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:258)
>>   at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:130)
>>   at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:650)
>>   at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1255)
>>   ... 5 more/
>> 
>> My state is very small for this particular job, just a few KBs.
>> 
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t612/Screen_Shot_2018-09-19_at_09.png>
>>  
>> 
>> 
>> Flink Version: 1.4.2
>> State Backend: hadoop 2.8
>> 
>> Regards,
>> Pedro Chaves
>> 
>> 
>> 
>> -
>> Best Regards,
>> Pedro Chaves
>> --
>> Sent from: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> 



Re: Unit/Integration test for stateful source

2018-09-20 Thread Stefan Richter
Hi,

maybe you can use AbstractStreamOperatorTestHarness to test your source, 
including the snapshotting. You can take a look at the tests of some other 
source, e.g. StatefulSequenceSourceTest#testCheckpointRestore.
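
A rough sketch of that pattern, assuming a user-defined MySource whose offsets are 
kept in operator state (the harness constructor and snapshot/restore signatures 
differ slightly between Flink versions, so treat this as an outline rather than 
exact code):

MySource source = new MySource();
AbstractStreamOperatorTestHarness<String> harness =
    new AbstractStreamOperatorTestHarness<>(new StreamSource<>(source), 1, 1, 0);
harness.open();

// let the source emit a few elements, then take a snapshot
OperatorSubtaskState snapshot = harness.snapshot(0L, 0L);

// second instance, restored from the snapshot
MySource restoredSource = new MySource();
AbstractStreamOperatorTestHarness<String> restoredHarness =
    new AbstractStreamOperatorTestHarness<>(new StreamSource<>(restoredSource), 1, 1, 0);
restoredHarness.initializeState(snapshot);
restoredHarness.open();

// assert that restoredSource continues from the snapshotted offsets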

Best,
Stefan

> Am 20.09.2018 um 15:29 schrieb Darshan Singh :
> 
> Hi,
> 
> I am writing a stateful source very similar to the Kafka base consumer but not as 
> generic. I was looking at how we can write unit tests and integration 
> tests for it. I looked at the kafka-connector-based unit tests. It 
> seems that there are too many external things at play there, like lots of mock 
> classes and even an AbstractFetcher being created. 
> 
> So I sort of dropped the idea of checking that the snapshot basically 
> clears the offsets as well as stores them. 
> 
> But doing the integration tests is hard as well, especially as I want to see 
> if it picks up offsets properly. I can run it manually and check, but I was 
> hoping to get some directions on integration tests for a stateful source.
> 
> Thanks



Re: Flink takes 40ms ~ 100ms to proceed from one operator to another?

2018-09-20 Thread Stefan Richter
Sorry, forgot the link for reference [1], which is 
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/datastream_api.html#controlling-latency

> Am 20.09.2018 um 16:36 schrieb Stefan Richter :
> 
> Hi,
> 
> you provide not very much information for this question, e.g. what and how 
> exactly your measure, if this is a local or distributed setting etc. I assume 
> that it is distributed and that the cause for your observation is the buffer 
> timeout, i.e. the maximum time that Flink waits until sending a buffer with 
> just one element, which happens to be 100ms by default. You can decrease this 
> value to some extend, at to cost of potential loss in throughput, but I think 
> even values around 5-10ms are ok-ish. See [1] for more details. If you want 
> to reduce latency between chained operators, you can also try to disable 
> object-reuse:
> 
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().enableObjectReuse();
> 
> Best,
> Stefan
> 
>> Am 20.09.2018 um 16:03 schrieb James Yu > <mailto:cyu...@gmail.com>>:
>> 
>> The previous email seems unable to display embedded images, let me put on 
>> the links.
>> Hi,
>> 
>> My team and I try to measure total time spent on our flink job and found out 
>> that Flink takes 40ms ~ 100ms to proceed from one operator to another. I 
>> wonder how can we reduce this transition time.
>> 
>> Following DAG represents our job:
>> https://drive.google.com/open?id=1wNR8po-SooAfYtCxU3qUDm4-hm16tclV 
>> <https://drive.google.com/open?id=1wNR8po-SooAfYtCxU3qUDm4-hm16tclV> 
>> 
>> 
>> and here is the screenshot of our log:
>> https://drive.google.com/open?id=14PTViZMkhagxeNjOb4R8u3BEr7Ym1xBi 
>> <https://drive.google.com/open?id=14PTViZMkhagxeNjOb4R8u3BEr7Ym1xBi> 
>> 
>> at 19:37:04.564, the job is leaving "Source: Custom Source -> Flat Map"
>> at 19:37:04.605, the job is entering "Co-Flat Map"
>> at 19:37:04.605, the job is leaving "Co-Flat Map"
>> at 19:37:04.705, the job is entering "Co-Flat Map -> "
>> at 19:37:04.708, the job is leaving "Co-Flat Map -> ..."
>> 
>> both "Co-Flat Map" finishes merely instantly, while most of the execution 
>> time is spent on the transition. Any idea?
>> 
>> 
>> This is a UTF-8 formatted mail
>> ---
>> James C.-C.Yu
>> +886988713275
>> +8615618429976
>> 
> 



Re: Flink takes 40ms ~ 100ms to proceed from one operator to another?

2018-09-20 Thread Stefan Richter
Hi,

you do not provide very much information for this question, e.g. what and how 
exactly you measure, whether this is a local or distributed setting, etc. I assume 
that it is distributed and that the cause for your observation is the buffer 
timeout, i.e. the maximum time that Flink waits before sending a buffer with 
just one element, which happens to be 100ms by default. You can decrease this 
value to some extent, at the cost of a potential loss in throughput, but I think 
even values around 5-10ms are ok-ish. See [1] for more details. If you want to 
reduce latency between chained operators, you can also try to disable 
object-reuse:

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();
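
For completeness, the buffer timeout mentioned above can be lowered on the same 
environment (5 ms is only an example value):

env.setBufferTimeout(5); // trade a bit of throughput for lower latency between operators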

Best,
Stefan

> Am 20.09.2018 um 16:03 schrieb James Yu :
> 
> The previous email seems unable to display embedded images, let me put on the 
> links.
> Hi,
> 
> My team and I try to measure total time spent on our flink job and found out 
> that Flink takes 40ms ~ 100ms to proceed from one operator to another. I 
> wonder how can we reduce this transition time.
> 
> Following DAG represents our job:
> https://drive.google.com/open?id=1wNR8po-SooAfYtCxU3qUDm4-hm16tclV 
>  
> 
> 
> and here is the screenshot of our log:
> https://drive.google.com/open?id=14PTViZMkhagxeNjOb4R8u3BEr7Ym1xBi 
>  
> 
> at 19:37:04.564, the job is leaving "Source: Custom Source -> Flat Map"
> at 19:37:04.605, the job is entering "Co-Flat Map"
> at 19:37:04.605, the job is leaving "Co-Flat Map"
> at 19:37:04.705, the job is entering "Co-Flat Map -> "
> at 19:37:04.708, the job is leaving "Co-Flat Map -> ..."
> 
> both "Co-Flat Map" finishes merely instantly, while most of the execution 
> time is spent on the transition. Any idea?
> 
> 
> This is a UTF-8 formatted mail
> ---
> James C.-C.Yu
> +886988713275
> +8615618429976
> 



Re: ***UNCHECKED*** Error while confirming Checkpoint

2018-09-20 Thread Stefan Richter
Hi,

I think the failing precondition is too strict because sometimes a checkpoint 
can overtake another checkpoint and in that case the commit is already 
subsumed. I will open a Jira and PR with a fix.

Best,
Stefan

> Am 19.09.2018 um 10:04 schrieb PedroMrChaves :
> 
> Hello,
> 
> I have a running Flink job that reads data form one Kafka topic, applies
> some transformations and writes data back into another Kafka topic. The job
> sometimes restarts due to the following error:
> 
> /java.lang.RuntimeException: Error while confirming checkpoint
>at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
>at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalStateException: checkpoint completed, but no
> transaction pending
>at
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>at
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:258)
>at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:130)
>at
> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:650)
>at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1255)
>... 5 more
> 2018-09-18 22:00:10,716 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Could not
> restart the job Alert_Correlation (3c60b8670c81a629716bb2e42334edea) because
> the restart strategy prevented it.
> java.lang.RuntimeException: Error while confirming checkpoint
>at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
>at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalStateException: checkpoint completed, but no
> transaction pending
>at
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>at
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:258)
>at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:130)
>at
> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:650)
>at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1255)
>... 5 more/
> 
> My state is very small for this particular job, just a few KBs.
> 
> 
>  
> 
> 
> Flink Version: 1.4.2
> State Backend: hadoop 2.8
> 
> Regards,
> Pedro Chaves
> 
> 
> 
> -
> Best Regards,
> Pedro Chaves
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Trying to figure out why a slot takes a long time to checkpoint

2018-09-20 Thread Stefan Richter
A debug log for state backend and checkpoint coordinator could also help. 
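
Assuming the default log4j.properties setup, something along these lines should 
cover the checkpoint coordinator and the state backends (the package names follow 
the usual Flink source layout and may need adjusting for your version):

log4j.logger.org.apache.flink.runtime.checkpoint=DEBUG
log4j.logger.org.apache.flink.runtime.state=DEBUG
log4j.logger.org.apache.flink.contrib.streaming.state=DEBUG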

> Am 20.09.2018 um 14:19 schrieb Stefan Richter :
> 
> Hi,
> 
> if some tasks take like 50 minutes, could you wait until such a checkpoint is 
> in progress and (let’s say after 10 minutes) log into the node and create a 
> (or multiple over time) thread-dump(s) for the JVM that runs the slow 
> checkpointing task. This could help to figure out where it is stuck or 
> waiting.
> 
> Best,
> Stefan
> 
>> Am 19.09.2018 um 22:30 schrieb Julio Biason > <mailto:julio.bia...@azion.com>>:
>> 
>> Hey guys,
>> 
>> So, switching to Ceph/S3 didn't shed any new light on the issue. Although 
>> the times are a bit higher, just a few slots are taking a magnitude longer 
>> to save. So I changed the logs for DEBUG.
>> 
>> The problem is: I'm not seeing anything that seems relevant; only pings from 
>> ZooKeeper, heartbeats and the S3 disconnecting from being idle.
>> 
>> Is there anything else that I should change to DEBUG? Akka? Kafka? Hadoop? 
>> ZooKeeper? (Those are, by the default config, bumped to INFO)
>> 
>> All of those?
>> 
>> On Tue, Sep 18, 2018 at 12:34 PM, Julio Biason > <mailto:julio.bia...@azion.com>> wrote:
>> Hey TIll (and others),
>> 
>> We don't have debug logs yet, but we decided to remove a related component: 
>> HDFS.
>> 
>> We are moving the storage to our Ceph install (using S3), which is running 
>> for longer than our HDFS install and we know, for sure, it runs without any 
>> problems (specially 'cause we have more people that understand Ceph than 
>> people that know HDFS at this point).
>> 
>> If, for some reason, the problem persists, we know it's not the underlying 
>> storage and may be something with our pipeline itself. I'll enable debug 
>> logs, then.
>> 
>> On Tue, Sep 18, 2018 at 4:20 AM, Till Rohrmann > <mailto:trohrm...@apache.org>> wrote:
>> This behavior seems very odd Julio. Could you indeed share the debug logs of 
>> all Flink processes in order to see why things are taking so long?
>> 
>> The checkpoint size of task #8 is twice as big as the second biggest 
>> checkpoint. But this should not cause an increase in checkpoint time of a 
>> factor of 8.
>> 
>> Cheers,
>> Till
>> 
>> On Mon, Sep 17, 2018 at 5:25 AM Renjie Liu > <mailto:liurenjie2...@gmail.com>> wrote:
>> Hi, Julio:
>> This happens frequently? What state backend do you use? The async checkpoint 
>> duration and sync checkpoint duration seems normal compared to others, it 
>> seems that most of the time are spent acking the checkpoint.
>> 
>> On Sun, Sep 16, 2018 at 9:24 AM vino yang > <mailto:yanghua1...@gmail.com>> wrote:
>> Hi Julio,
>> 
>> Yes, it seems that fifty-five minutes is really long. 
>> However, it is linear with the time and size of the previous task adjacent 
>> to it in the diagram. 
>> I think your real concern is why Flink accesses HDFS so 
>> slowly. 
>> You can enable the DEBUG log to see if you can find any clues, or post the log 
>> to the mailing list so that others can help analyze the problem for you.
>> 
>> Thanks, vino.
>> 
>> Julio Biason <julio.bia...@azion.com>
>> wrote on Sat, 15 Sep 2018 at 07:03:
>> (Just an addendum: Although it's not a huge problem -- we can always 
>> increase the checkpoint timeout time -- this anomalous situation makes me 
>> think there is something wrong in our pipeline or in our cluster, and that 
>> is what is making the checkpoint creation go crazy.)
>> 
>> On Fri, Sep 14, 2018 at 8:00 PM, Julio Biason > <mailto:julio.bia...@azion.com>> wrote:
>> Hey guys,
>> 
>> On our pipeline, we have a single slot that it's taking longer to create the 
>> checkpoint compared to other slots and we are wondering what could be 
>> causing it.
>> 
>> The operator in question is the window metric -- the only element in the 
>> pipeline that actually uses the state. While the other slots take 7 mins to 
>> create the checkpoint, this one -- and only this one -- takes 55mins.
>> 
>> Is there something I should look at to understand what's going on?
>> 
>> (We are storing all checkpoints in HDFS, in case that helps.)
>> 
>> -- 
>> Julio Biason, Sofware Engineer
>> AZION  |  Deliver. Accelerate. Protect.
>> Office: +55 51 3083 8101   |  Mobile: +55 51  
>> 99907 0554
>> 
>> 
>> 
>> -- 
>> Julio Biason, Sofware Engineer
>> AZION  |  Deliver. Accelerate. Protect.
>> Office: +55 51 3083 8101   |  Mobile: +55 51  
>> 99907 0554
>> -- 
>> Liu, Renjie
>> Software Engineer, MVAD
>> 
>> 
>> 
>> -- 
>> Julio Biason, Sofware Engineer
>> AZION  |  Deliver. Accelerate. Protect.
>> Office: +55 51 3083 8101   |  Mobile: +55 51  
>> 99907 0554
>> 
>> 
>> 
>> -- 
>> Julio Biason, Sofware Engineer
>> AZION  |  Deliver. Accelerate. Protect.
>> Office: +55 51 3083 8101   |  Mobile: +55 51  
>> 99907 0554
> 



Re: Trying to figure out why a slot takes a long time to checkpoint

2018-09-20 Thread Stefan Richter
Hi,

if some tasks take like 50 minutes, could you wait until such a checkpoint is 
in progress and (let’s say after 10 minutes) log into the node and create a (or 
multiple over time) thread-dump(s) for the JVM that runs the slow checkpointing 
task. This could help to figure out where it is stuck or waiting.
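
For example, on the machine running the slow subtask (the JDK tools jps and jstack 
are assumed to be on the path; the output file name is just an example):

jps -m                                   # find the TaskManager JVM pid
jstack <pid> > taskmanager-dump-1.txt    # repeat a few times during the checkpoint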

Best,
Stefan

> Am 19.09.2018 um 22:30 schrieb Julio Biason :
> 
> Hey guys,
> 
> So, switching to Ceph/S3 didn't shed any new light on the issue. Although 
> the times are a bit higher, just a few slots are taking a magnitude longer to 
> save. So I changed the logs for DEBUG.
> 
> The problem is: I'm not seeing anything that seems relevant; only pings from 
> ZooKeeper, heartbeats and the S3 disconnecting from being idle.
> 
> Is there anything else that I should change to DEBUG? Akka? Kafka? Hadoop? 
> ZooKeeper? (Those are, by the default config, bumped to INFO)
> 
> All of those?
> 
> On Tue, Sep 18, 2018 at 12:34 PM, Julio Biason  > wrote:
> Hey TIll (and others),
> 
> We don't have debug logs yet, but we decided to remove a related component: 
> HDFS.
> 
> We are moving the storage to our Ceph install (using S3), which is running 
> for longer than our HDFS install and we know, for sure, it runs without any 
> problems (specially 'cause we have more people that understand Ceph than 
> people that know HDFS at this point).
> 
> If, for some reason, the problem persists, we know it's not the underlying 
> storage and may be something with our pipeline itself. I'll enable debug 
> logs, then.
> 
> On Tue, Sep 18, 2018 at 4:20 AM, Till Rohrmann  > wrote:
> This behavior seems very odd Julio. Could you indeed share the debug logs of 
> all Flink processes in order to see why things are taking so long?
> 
> The checkpoint size of task #8 is twice as big as the second biggest 
> checkpoint. But this should not cause an increase in checkpoint time of a 
> factor of 8.
> 
> Cheers,
> Till
> 
> On Mon, Sep 17, 2018 at 5:25 AM Renjie Liu  > wrote:
> Hi, Julio:
> This happens frequently? What state backend do you use? The async checkpoint 
> duration and sync checkpoint duration seems normal compared to others, it 
> seems that most of the time are spent acking the checkpoint.
> 
> On Sun, Sep 16, 2018 at 9:24 AM vino yang  > wrote:
> Hi Julio,
> 
> Yes, it seems that fifty-five minutes is really long. 
> However, it is linear with the time and size of the previous task adjacent to 
> it in the diagram. 
> I think your real concern is why Flink accesses HDFS so 
> slowly. 
> You can enable the DEBUG log to see if you can find any clues, or post the log 
> to the mailing list so that others can help analyze the problem for you.
> 
> Thanks, vino.
> 
> Julio Biason <julio.bia...@azion.com>
> wrote on Sat, 15 Sep 2018 at 07:03:
> (Just an addendum: Although it's not a huge problem -- we can always increase 
> the checkpoint timeout time -- this anomalous situation makes me think there 
> is something wrong in our pipeline or in our cluster, and that is what is 
> making the checkpoint creation go crazy.)
> 
> On Fri, Sep 14, 2018 at 8:00 PM, Julio Biason  > wrote:
> Hey guys,
> 
> On our pipeline, we have a single slot that it's taking longer to create the 
> checkpoint compared to other slots and we are wondering what could be causing 
> it.
> 
> The operator in question is the window metric -- the only element in the 
> pipeline that actually uses the state. While the other slots take 7 mins to 
> create the checkpoint, this one -- and only this one -- takes 55mins.
> 
> Is there something I should look at to understand what's going on?
> 
> (We are storing all checkpoints in HDFS, in case that helps.)
> 
> -- 
> Julio Biason, Sofware Engineer
> AZION  |  Deliver. Accelerate. Protect.
> Office: +55 51 3083 8101   |  Mobile: +55 51  
> 99907 0554
> 
> 
> 
> -- 
> Julio Biason, Sofware Engineer
> AZION  |  Deliver. Accelerate. Protect.
> Office: +55 51 3083 8101   |  Mobile: +55 51  
> 99907 0554
> -- 
> Liu, Renjie
> Software Engineer, MVAD
> 
> 
> 
> -- 
> Julio Biason, Sofware Engineer
> AZION  |  Deliver. Accelerate. Protect.
> Office: +55 51 3083 8101   |  Mobile: +55 51  
> 99907 0554
> 
> 
> 
> -- 
> Julio Biason, Sofware Engineer
> AZION  |  Deliver. Accelerate. Protect.
> Office: +55 51 3083 8101   |  Mobile: +55 51  
> 99907 0554



Re: How to get the location of keytab when using flink on yarn

2018-09-20 Thread Stefan Richter
Hi,

maybe Aljoscha or Eron (both in CC) can help you with this problem, I think 
they might know best about the Kerberos security.

Best,
Stefan

> Am 20.09.2018 um 11:20 schrieb 杨光 :
> 
> Hi,
> I am using the "per-job YARN session" mode to deploy a Flink job on YARN, and my 
> Flink version is 1.4.1.
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/security-kerberos.html
>  
> 
> 
> My use case: the YARN cluster where the Flink job runs does not have Kerberos 
> enabled in core-site.xml, but I am trying to connect to an HBase cluster 
> that has Kerberos enabled. So I have to use the 
> loginUserFromKeytab() method to initialize the Kerberos information before 
> initializing the HBase connection. 
> 
>  UserGroupInformation.loginUserFromKeytab(user, keytabLocation);
> 
> So how can I get the keytab location in my user code, or are there any better 
> ideas for solving the HBase Kerberos problem on a YARN cluster that is not 
> using Kerberos mode?
>   
>  THANKS



Re: Writer has already been opened on BucketingSink to S3

2018-09-20 Thread Stefan Richter
Hi,

thanks for putting some effort into debugging the problem. Could you open a 
Jira with the problem and your analysis so that we can discuss how to proceed 
with it?

Best,
Stefan

> Am 18.09.2018 um 23:16 schrieb Chengzhi Zhao :
> 
> After checking the code, I think the issue might be related to 
> AvroKeyValueSinkWriter.java, leading to the writer not being closed 
> completely. I also noticed this change, which affects 1.5+: 
> https://github.com/apache/flink/commit/915213c7afaf3f9d04c240f43d88710280d844e3#diff-86c35c993fdb0c482544951b376e5ea6
>  
> 
> 
> @Override
> public void close() throws IOException {
> if (keyValueWriter != null) {
> keyValueWriter.close();
> } else {
> // need to make sure we close this if we never created the Key/Value 
> Writer.
> super.close();
> }
> }
> I created my own AvroKeyValueSinkWriter class and implemented the close logic 
> similarly to v1.4; it seems to be running fine now. 
> @Override
> public void close() throws IOException {
> try {
> super.close();
> } finally {
> if (keyValueWriter != null) {
> keyValueWriter.close();
> }
> }
> }
> 
> I am curious if anyone has had a similar issue. I'd appreciate any insights 
> on it. Thanks! 
> 
> Best,
> Chengzhi
> 
> On Mon, Sep 17, 2018 at 12:01 PM Chengzhi Zhao  > wrote:
> Hi Flink Community,
> 
> I am using flink 1.6.0 and I am using BucketingSink to S3.
> 
> After the application had been running for a while (~20 mins), I got an exception: 
> java.lang.IllegalStateException: Writer has already been opened
> 
> I have attached the job manager log and the sink code is here:
> val avroOutputPath = output
> var properties = new util.HashMap[String, String]()
> val stringSchema = Schema.create(Type.STRING)
> val keySchema = stringSchema.toString
> val valueSchema = schema.toString
> 
> val compress = true
> properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema)
> properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema)
> properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, compress.toString)
> properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, 
> DataFileConstants.SNAPPY_CODEC)
> 
> val sink = new BucketingSink[tuple.Tuple2[String, R]](output)
> sink.setBucketer(new DateTimeBucketer("-MM-dd/HH/"))
> sink.setBatchSize(1024 * 1024 * batchSize) // this is 64 MB,
> sink.setPendingSuffix(".avro")
> sink.setBatchRolloverInterval(20 * 60 * 1000)
> 
> def getWriter(): BucketingSink[tuple.Tuple2[String, R]] = {
>   val writer = new AvroKeyValueSinkWriter[String, R](properties)
>   sink.setWriter(writer)
>   sink
> }
> Any suggestions on why this could happen and how to debug it? Thanks for your 
> help in advance! 
> 
> Regards,
> Chengzhi
> 
> 



Re: S3 connector Hadoop class mismatch

2018-09-20 Thread Stefan Richter
Hi,

I could not find any open Jira for the problem you describe. Could you please 
open one?

Best,
Stefan

> Am 19.09.2018 um 09:54 schrieb Paul Lam :
> 
> Hi, 
> 
> I’m using StreamingFileSink of 1.6.0 to write logs to S3 and encounter a 
> classloader problem. It seems that there are conflicts in 
> flink-shaded-hadoop2-uber-1.6.0.jar and flink-s3-fs-hadoop-1.6.0.jar, and 
> maybe related to class loading orders. 
> 
> Did anyone meet this problem? Thanks a lot!
> 
> The stack traces are as below:
> 
> java.io.IOException: java.lang.RuntimeException: class 
> org.apache.hadoop.security.LdapGroupsMapping not 
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.GroupMappingServiceProvider
>   at 
> org.apache.flink.fs.s3hadoop.shaded.org.apache.flink.runtime.fs.hdfs.AbstractFileSystemFactory.create(AbstractFileSystemFactory.java:62)
>   at 
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:395)
>   at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.(Buckets.java:111)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:254)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: java.lang.RuntimeException: class 
> org.apache.hadoop.security.LdapGroupsMapping not 
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.GroupMappingServiceProvider
>   at 
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2246)
>   at 
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.Groups.(Groups.java:108)
>   at 
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.Groups.(Groups.java:102)
>   at 
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:450)
>   at 
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:309)
>   at 
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:276)
>   at 
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:832)
>   at 
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:802)
>   at 
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:675)
>   at 
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:177)
>   at 
> org.apache.flink.fs.s3hadoop.shaded.org.apache.flink.runtime.fs.hdfs.AbstractFileSystemFactory.create(AbstractFileSystemFactory.java:57)
>   ... 13 more
> Caused by: java.lang.RuntimeException: class 
> org.apache.hadoop.security.LdapGroupsMapping not 
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.GroupMappingServiceProvider
>   at 
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2240)
>   ... 23 more
> 
> 
> Best,
> Paul Lam
> 
> 
> 



Re: multiple flink applications on yarn are shown in one application.

2018-09-20 Thread Stefan Richter
Hi,

currently, Flink still has to use session mode under the hood if you submit the 
job in attached mode. The reason is that the job could consist of multiple 
parts that need to run one after the other. This will be changed in the 
future, and it also should not happen if you submit the job detached.

Best,
Stefan
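
For illustration, a detached per-job submission would just add the -d flag to the commands from the question (paths below are taken from that example):

/data/apps/opt/flink/bin/flink run -d -m yarn-cluster \
    /data/apps/opt/fluxion/fluxion-flink.jar \
    /data/apps/conf/fluxion/job_submit_rpt_company_user_s.conf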

> Am 20.09.2018 um 10:29 schrieb weilongxing :
> 
> I am new to Flink. I am using Flink on yarn per-job. I submitted two 
> applications.
> 
> > /data/apps/opt/flink/bin/flink run -m yarn-cluster  
> > /data/apps/opt/fluxion/fluxion-flink.jar 
> > /data/apps/conf/fluxion//job_submit_rpt_company_user_s.conf
> 
> > /data/apps/opt/flink/bin/flink run -m yarn-cluster  
> > /data/apps/opt/fluxion/fluxion-flink.jar 
> > /data/apps/conf/fluxion//job_submit_rpt_company_performan_.conf
> 
> I can see these two applications on YARN. I noticed that the application name 
> is “flink session cluster” rather than “flink per-job”. Is that right?
> 
> 
> However, I can see both flink jobs in each yarn application. Is that right?
> 
> 
> 
> 
> 
> 
> And finally, I wanted to kill one job, so I killed one YARN application. However, 
> that YARN application was killed but both Flink jobs restarted in the other 
> YARN application. I want to kill one and keep the other. In my opinion, there should be one 
> job per application, and the job should be killed when the YARN application is 
> killed.
> 
> 
> 
> 
> I think the problem is that these two applications should be “flink per-job” 
> rather than “flink session cluster”, but I don’t know why they become “flink 
> session-cluster”. Can anybody help? Thanks.
> 
> 
> 



Re: Checkpointing not working

2018-09-20 Thread Stefan Richter
Hi,

in the absence of any logs, my guess would be that your checkpoints are just 
not able to complete within 10 seconds; the state might be too large or the 
network and fs too slow. Are you using full or incremental checkpoints? For your 
relatively small interval, I suggest that you try using incremental checkpoints. 
I still think that your timeout and interval are pretty ambitious.

Best,
Stefan
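
For illustration, a minimal sketch of enabling incremental RocksDB checkpoints with a more relaxed interval and timeout (the checkpoint path and the concrete numbers are placeholders):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // the second constructor argument = true enables incremental checkpoints
        env.setStateBackend(new RocksDBStateBackend("s3://my-bucket/checkpoints", true));
        env.enableCheckpointing(60_000);                              // e.g. once per minute instead of every 30s
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60_000);  // allow up to 10 minutes instead of 10s
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);
        // ... build and execute the actual pipeline here
    }
}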

> Am 20.09.2018 um 10:17 schrieb vino yang :
> 
> Hi Yubraj,
> 
> Can you set your log print level to DEBUG and share it with us or share a 
> screenshot of your Flink web UI checkpoint information?
> 
> Thanks, vino.
> 
> Jörn Franke mailto:jornfra...@gmail.com>> 
> 于2018年9月19日周三 下午2:37写道:
> What do the logfiles say?
> 
> How does the source code looks like?
> 
> Is it really needed to do checkpointing every 30 seconds?
> 
> On 19. Sep 2018, at 08:25, yuvraj singh <19yuvrajsing...@gmail.com 
> > wrote:
> 
>> Hi , 
>> 
>> I am doing checkpointing using S3 and RocksDB.
>> I am checkpointing every 30 seconds and the timeout is 10 seconds.
>> 
>> Most of the time it is failing with "Failure Time: 11:53:17, Cause: 
>> Checkpoint expired before completing".
>> I increased the timeout as well, but it is still not working for me.
>> 
>> Please suggest.
>> 
>> Thanks 
>> Yubraj Singh 



Re: Why FlinkKafkaConsumer08 does not support exactly once or at least once semantic?

2018-09-20 Thread Stefan Richter
Hi,

I think this part of the documentation is talking about KafkaProducer, and you 
are reading in the source code of KafkaConsumer.

Best,
Stefan
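
For illustration, a minimal sketch of the consumer side (topic, group id and addresses are placeholders; the SimpleStringSchema package differs slightly between Flink versions). With checkpointing enabled, the partition offsets become part of Flink's checkpoints and are restored on failure, which is what provides the at-least-once/exactly-once guarantee on the source side even with Kafka 0.8:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;

public class Kafka08Source {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // offsets are snapshotted as part of each checkpoint
        env.enableCheckpointing(60_000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");  // placeholder
        props.setProperty("zookeeper.connect", "zk:2181");      // required by the 0.8 consumer
        props.setProperty("group.id", "my-group");               // placeholder

        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer08<>("my-topic", new SimpleStringSchema(), props));
        stream.print();
        env.execute("kafka-08-source");
    }
}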

> Am 20.09.2018 um 10:48 schrieb 徐涛 :
> 
> Hi All,
>   In document of Flink 1.6, it says that "Before 0.9 Kafka did not 
> provide any mechanisms to guarantee at-least-once or exactly-once semantics”
>   I read the source code of FlinkKafkaConsumer08, and the comment says:
> “Please note that Flink snapshots the offsets internally as part of its 
> distributed checkpoints. The offsets
>  * committed to Kafka / ZooKeeper are only to bring the outside view of 
> progress in sync with Flink's view
>  * of the progress. That way, monitoring and other jobs can get a view of how 
> far the Flink Kafka consumer
>  * has consumed a topic"
>   Obviously, the kafka partition offsets are checkpointed periodically. 
> And when some error happens, the data are read from kafka, continued from the 
> checkpointed offset. Then source and other operator states restart from the 
> same checkpoint. Then why does the document say “Before 0.9 Kafka did not 
> provide any mechanisms to guarantee at-least-once or exactly-once semantics” ?
> 
>   Thanks a lot.
> 
> 
> Best
> Henry



Re: Flink 1.5.2 process keeps reference to deleted blob files.

2018-09-20 Thread Stefan Richter
Hi,

I think it would be very helpful if you could identify what data is behind the 
dangling file. For example, I could imagine that it can be a jar file that was 
used by the TM and some classes are still in use or loaded by a classloader 
that was not yet GCed. Depending on that, there could be a problem in the user 
code, in Flink’s classloading, or with the blob storage. I would suggest opening 
a Jira issue and supplying as much information about the dangling file as 
possible (e.g. maybe concluding from the log what blob key was mapped to what 
file, from the size, or by peeking at the content).

Best,
Stefan

> Am 19.09.2018 um 16:04 schrieb Piotr Szczepanek :
> 
> Hello,
> 
> we are using YarnClusterClient for job submission. After successful/failed
> job execution it looks like blob file for that job is deleted, but there is
> still some handle from Flink process to that file. As a result the file is
> not removed from the machine and we faced a "no space left on device" error.
> Restarting the Flink cluster moved the situation back to normal, but we are
> submitting quite a huge number of jobs and frequent cluster restarts are not a
> solution.
> 
> Results of lsof are:
> During job execution:
> lsof /flinkDir | grep job_dbafb671b0d60ed8a8ec2651fe59303b
> java11883  yarn  memREG  253,2 112384928 109973177
> /flinkDir/yarn/../application_1536668870638_/blobStore-a1bcdbd4-5388-4c56-8052-6051f5af38dd/job_dbafb671b0d60ed8a8ec2651fe59303b/blob_p-8771d9ccac35e28d8571ac8957feaaecdebaeadd-7748aec7fe7369ca26181d0f94b1a578
> java11883  yarn 1837r   REG  253,2 112384928 109973177
> /flinkDir/yarn/../application_1536668870638_/blobStore-a1bcdbd4-5388-4c56-8052-6051f5af38dd/job_dbafb671b0d60ed8a8ec2651fe59303b/blob_p-8771d9ccac35e28d8571ac8957feaaecdebaeadd-7748aec7fe7369ca26181d0f94b1a578
> 
> After job execution:
> lsof /flinkDir | grep job_dbafb671b0d60ed8a8ec2651fe59303b
> java11883  yarn  DELREG  253,2   109973177
> /flinkDir/yarn/../application_1536668870638_/blobStore-a1bcdbd4-5388-4c56-8052-6051f5af38dd/job_dbafb671b0d60ed8a8ec2651fe59303b/blob_p-8771d9ccac35e28d8571ac8957feaaecdebaeadd-7748aec7fe7369ca26181d0f94b1a578
> java11883  yarn 1837r   REG  253,2 112384928 109973177
> /flinkDir/yarn/../application_1536668870638_/blobStore-a1bcdbd4-5388-4c56-8052-6051f5af38dd/job_dbafb671b0d60ed8a8ec2651fe59303b/blob_p-8771d9ccac35e28d8571ac8957feaaecdebaeadd-7748aec7fe7369ca26181d0f94b1a578
> *(deleted)*
> 
> So the blob file is marked as deleted but it's still present, as there is
> still some handle from the Flink container process. 
> Can you please advise how we can avoid that situation, or whether it is caused by
> some bug in Flink?
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: How to use checkpoint in flink1.5.3

2018-09-20 Thread Stefan Richter
Hi,

did you introduce some custom modifications to the code? Your stack trace does 
not match the lines in the code of release-1.5.3, e.g. line 230 is not in 
method internalTimeServiceManager(…) which makes it hard to draw any 
conclusions.

Best,
Stefan

> Am 19.09.2018 um 14:03 schrieb spoon_lz <971066...@qq.com>:
> 
> I used Flink 1.3.2 before, and recently upgraded to Flink 1.5.3. I found
> that the new version has changed checkpointing, and there was a problem after
> modifying the code.
>    My code looks like:
> 
>   * RocksDBStateBackend rocksDBStateBackend = new
> RocksDBStateBackend("hdfs://hdfs_1/demo/demo-fs-checkpoints/cpk/1_3",
> true);*
> 
>    The job runs normally when submitted to the cluster; I then stop it
> and try to resume it like this:
> 
> */home/flink-1.5.3/bin/flink run -d -m yarn-cluster --yarnname test01 -ytm
> 4096 -yjm 1024 -s
> hdfs://hdfs_1/demo/demo-fs-checkpoints/cpk/1_3/ff31023d23e568290d5596a34aa3cad6/chk-3
>  
> /home/flink/jar/test01.jar*
> 
>    This job can be submitted, but fails with the following error:
> 
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>   at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:197)
>   at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:231)
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:715)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>   at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:230)
>   at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:160)
>   ... 5 more
> 2018-09-19 19:44:26,518 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job test01 
> (f1960bf42cdb35b7f3ee958e06d9e3cf) switched from state RUNNING to FAILING.
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>   at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:197)
>   at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:231)
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:715)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>   at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:230)
>   at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:160)
> 
>  I could not find the cause of the error in the source code, and the official
> documentation does not have many instructions for the use of checkpoints. 
>  Can anyone help me?
> 
> 
> 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Trying to figure out why a slot takes a long time to checkpoint

2018-09-18 Thread Stefan Richter
Adding to my previous email, I am starting to doubt the explanation a little bit 
because the alignment times are also very low. Could it be possible that it 
takes very long for the checkpoint operation (for whatever reason) to acquire 
the checkpointing lock?

> Am 18.09.2018 um 11:58 schrieb Stefan Richter :
> 
> Hi,
> 
> from your screenshot, it looks like everything is running fine as soon as the 
> snapshots are actually running, sync and async part times are normal. So I 
> think the explanation is the time that the checkpoint barrier needs to reach 
> this particular operator. It seems like there is a large queue of events in 
> the buffers of that operator, in front of the barrier, and/or the operator is 
> very slow at processing events. Given the breakdown at hand, the time must be 
> spent between the triggering of the checkpoint and the point where it reaches 
> the operator that lags behind.
> 
> Best,
> Stefan
> 
>> Am 18.09.2018 um 09:39 schrieb Renjie Liu > <mailto:liurenjie2...@gmail.com>>:
>> 
>> I think what's weird is that none of the three stages (alignment, sync cp, 
>> async cp) takes much time.
>> 
>> On Tue, Sep 18, 2018 at 3:20 PM Till Rohrmann > <mailto:trohrm...@apache.org>> wrote:
>> This behavior seems very odd Julio. Could you indeed share the debug logs of 
>> all Flink processes in order to see why things are taking so long?
>> 
>> The checkpoint size of task #8 is twice as big as the second biggest 
>> checkpoint. But this should not cause an increase in checkpoint time of a 
>> factor of 8.
>> 
>> Cheers,
>> Till
>> 
>> On Mon, Sep 17, 2018 at 5:25 AM Renjie Liu > <mailto:liurenjie2...@gmail.com>> wrote:
>> Hi, Julio:
>> Does this happen frequently? What state backend do you use? The async checkpoint 
>> duration and sync checkpoint duration seem normal compared to the others; it 
>> seems that most of the time is spent acking the checkpoint.
>> 
>> On Sun, Sep 16, 2018 at 9:24 AM vino yang > <mailto:yanghua1...@gmail.com>> wrote:
>> Hi Julio,
>> 
>> Yes, it seems that fifty-five minutes is really long. 
>> However, it is linear with the time and size of the previous task adjacent 
>> to it in the diagram. 
>> I think your real concern is why Flink accesses HDFS so 
>> slowly. 
>> You can enable the DEBUG log to see if you can find any clues, or post the log 
>> to the mailing list so that others can help analyze the problem for you.
>> 
>> Thanks, vino.
>> 
>> Julio Biason mailto:julio.bia...@azion.com>> 
>> 于2018年9月15日周六 上午7:03写道:
>> (Just an addendum: Although it's not a huge problem -- we can always 
>> increase the checkpoint timeout time -- this anomalous situation makes me 
>> think there is something wrong in our pipeline or in our cluster, and that 
>> is what is making the checkpoint creation go crazy.)
>> 
>> On Fri, Sep 14, 2018 at 8:00 PM, Julio Biason > <mailto:julio.bia...@azion.com>> wrote:
>> Hey guys,
>> 
>> On our pipeline, we have a single slot that is taking longer to create the 
>> checkpoint compared to the other slots, and we are wondering what could be 
>> causing it.
>> 
>> The operator in question is the window metric -- the only element in the 
>> pipeline that actually uses the state. While the other slots take 7 mins to 
>> create the checkpoint, this one -- and only this one -- takes 55mins.
>> 
>> Is there something I should look at to understand what's going on?
>> 
>> (We are storing all checkpoints in HDFS, in case that helps.)
>> 
>> -- 
>> Julio Biason, Sofware Engineer
>> AZION  |  Deliver. Accelerate. Protect.
>> Office: +55 51 3083 8101   |  Mobile: +55 51  
>> 99907 0554
>> 
>> 
>> 
>> -- 
>> Julio Biason, Sofware Engineer
>> AZION  |  Deliver. Accelerate. Protect.
>> Office: +55 51 3083 8101   |  Mobile: +55 51  
>> 99907 0554
>> -- 
>> Liu, Renjie
>> Software Engineer, MVAD
>> -- 
>> Liu, Renjie
>> Software Engineer, MVAD
> 



Re: Trying to figure out why a slot takes a long time to checkpoint

2018-09-18 Thread Stefan Richter
Hi,

from your screenshot, it looks like everything is running fine as soon as the 
snapshots are actually running, sync and async part times are normal. So I 
think the explanation is the time that the checkpoint barrier needs to reach 
this particular operator. It seems like there is a large queue of events in the 
buffers of that operator, in front of the barrier, and/or the operator is very 
slow at processing events. Given the breakdown at hand, the time must be spent 
between the triggering of the checkpoint and the point where it reaches the 
operator that lags behind.

Best,
Stefan

> Am 18.09.2018 um 09:39 schrieb Renjie Liu :
> 
> I think what's weird is that none of the three stages (alignment, sync cp, 
> async cp) takes much time.
> 
> On Tue, Sep 18, 2018 at 3:20 PM Till Rohrmann  > wrote:
> This behavior seems very odd Julio. Could you indeed share the debug logs of 
> all Flink processes in order to see why things are taking so long?
> 
> The checkpoint size of task #8 is twice as big as the second biggest 
> checkpoint. But this should not cause an increase in checkpoint time of a 
> factor of 8.
> 
> Cheers,
> Till
> 
> On Mon, Sep 17, 2018 at 5:25 AM Renjie Liu  > wrote:
> Hi, Julio:
> Does this happen frequently? What state backend do you use? The async checkpoint 
> duration and sync checkpoint duration seem normal compared to the others; it 
> seems that most of the time is spent acking the checkpoint.
> 
> On Sun, Sep 16, 2018 at 9:24 AM vino yang  > wrote:
> Hi Julio,
> 
> Yes, it seems that fifty-five minutes is really long. 
> However, it is linear with the time and size of the previous task adjacent to 
> it in the diagram. 
> I think your real concern is why Flink accesses HDFS so 
> slowly. 
> You can enable the DEBUG log to see if you can find any clues, or post the log 
> to the mailing list so that others can help analyze the problem for you.
> 
> Thanks, vino.
> 
> Julio Biason mailto:julio.bia...@azion.com>> 
> 于2018年9月15日周六 上午7:03写道:
> (Just an addendum: Although it's not a huge problem -- we can always increase 
> the checkpoint timeout time -- this anomalous situation makes me think there 
> is something wrong in our pipeline or in our cluster, and that is what is 
> making the checkpoint creation go crazy.)
> 
> On Fri, Sep 14, 2018 at 8:00 PM, Julio Biason  > wrote:
> Hey guys,
> 
> On our pipeline, we have a single slot that is taking longer to create the 
> checkpoint compared to the other slots, and we are wondering what could be causing 
> it.
> 
> The operator in question is the window metric -- the only element in the 
> pipeline that actually uses the state. While the other slots take 7 mins to 
> create the checkpoint, this one -- and only this one -- takes 55mins.
> 
> Is there something I should look at to understand what's going on?
> 
> (We are storing all checkpoints in HDFS, in case that helps.)
> 
> -- 
> Julio Biason, Sofware Engineer
> AZION  |  Deliver. Accelerate. Protect.
> Office: +55 51 3083 8101   |  Mobile: +55 51  
> 99907 0554
> 
> 
> 
> -- 
> Julio Biason, Sofware Engineer
> AZION  |  Deliver. Accelerate. Protect.
> Office: +55 51 3083 8101   |  Mobile: +55 51  
> 99907 0554
> -- 
> Liu, Renjie
> Software Engineer, MVAD
> -- 
> Liu, Renjie
> Software Engineer, MVAD



Re: Flink 1.3.2 RocksDB map segment fail if configured as state backend

2018-09-17 Thread Stefan Richter
Hi,

I think the exception pretty much says what is wrong: the native library cannot 
be mapped into the process because of an access rights problem. Please make 
sure that your /tmp path has exec rights (i.e. it is not mounted with noexec).

Best,
Stefan
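
For illustration, two possible workarounds, both of which are assumptions about your environment and need checking with your administrators: remount /tmp with exec rights, or point Flink's temporary directories to a filesystem that allows executing mapped libraries.

# remount /tmp with exec rights (temporary until reboot unless /etc/fstab is changed as well)
sudo mount -o remount,exec /tmp

# or, in flink-conf.yaml, point Flink's temp directories to an exec-enabled path
# (key name as used by Flink 1.3.x; the path is a placeholder)
taskmanager.tmp.dirs: /data/flink/tmp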

> Am 17.09.2018 um 11:37 schrieb Andrea Spina :
> 
> Hi everybody,
> 
> I run with a Flink 1.3.2 installation on a Red Hat Enterprise Linux Server 
> and I'm not able to set rocksdb as state.backend due to this error whenever I 
> try to deploy any job:
> 
> java.lang.IllegalStateException: Could not initialize keyed state backend.
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:678)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:666)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Could not load the native RocksDB library
> at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.ensureRocksDBIsLoaded(RocksDBStateBackend.java:560)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:298)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:756)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
> ... 6 more
> Caused by: java.lang.UnsatisfiedLinkError: 
> /tmp/rocksdb-lib-ab7e3d3688fe883981ec37668bf2cbc3/librocksdbjni-linux64.so: 
> /tmp/rocksdb-lib-ab7e3d3688fe883981ec37668bf2cbc3/librocksdbjni-linux64.so: 
> failed to map segment from shared object: Operation not permitted
> at java.lang.ClassLoader$NativeLibrary.load(Native Method)
> at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
> at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
> at java.lang.Runtime.load0(Runtime.java:809)
> at java.lang.System.load(System.java:1086)
> at 
> org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)
> at 
> org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.ensureRocksDBIsLoaded(RocksDBStateBackend.java:537)
> ... 9 more
> 
> Machine detail
> 
> NAME="Red Hat Enterprise Linux Server"
> VERSION="7.5 (Maipo)"
> ID="rhel"
> ID_LIKE="fedora"
> 
> Cpu architecture (cat /proc/cpuinfo)
> 
> processor: 0
> vendor_id: GenuineIntel
> cpu family: 6
> model: 45
> model name: Intel(R) Xeon(R) CPU E5-2660 0 @ 2.20GHz
> stepping: 2
> microcode: 0x710
> cpu MHz: 2200.000
> cache size: 20480 KB
> physical id: 0
> siblings: 4
> core id: 0
> cpu cores: 4
> apicid: 0
> initial apicid: 0
> fpu: yes
> fpu_exception: yes
> cpuid level: 13
> wp: yes
> flags: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov 
> pat pse36 clflush dts mmx fxsr sse sse2 ss ht syscall nx rdtscp lm 
> constant_tsc arch_perfmon pebs bts nopl xtopology tsc_reliable nonstop_tsc 
> aperfmperf pni pclmulqdq ssse3 cx16 pcid sse4_1 sse4_2 x2apic popcnt aes 
> xsave avx hypervisor lahf_lm epb dtherm ida arat pln pts
> bogomips: 4400.00
> clflush size: 64
> cache_alignment: 64
> address sizes: 40 bits physical, 48 bits virtual
> 
> I found similar errors related to different systems [1] and similar problems 
> with RocksDB related to endianness [2], but mine sounds different.
> Since upgrading to a newer Flink version might be painful at the moment, is there any 
> reason behind this exception, and does a workaround exist?
> 
> Thank you so much,
> 
> Andrea
> 
> [1] - 
> https://communities.ca.com/thread/241773398-em-failed-to-start-tmplibrocksdbjni8883977260053861907so-failed-to-map-segment-from-shared-object-operation-not-permitted
>  
> 
> 
> [2] - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Unable-to-use-Flink-RocksDB-state-backend-due-to-endianness-mismatch-td13473.html
>  
> 
> 
> -- 
> Andrea Spina
> Software Engineer @ Radicalbit Srl 
> Via Borsieri 41, 20159, Milano - IT



Re: After OutOfMemoryError State can not be readed

2018-09-07 Thread Stefan Richter
Hi,

what I can say is that any failures like OOMs should not corrupt checkpoint 
files, because only successfully completed checkpoints are used for recovery by 
the job manager. Just to get a bit more info, are you using full or incremental 
checkpoints? Unfortunately, it is a bit hard to say from the given information 
what the cause of the problem is. Typically, these problems have been observed 
when something was wrong with a serializer or a stateful serializer was used 
from multiple threads.

Best,
Stefan 

> Am 07.09.2018 um 05:04 schrieb vino yang :
> 
> Hi Edward,
> 
> From this log (Caused by: java.io.EOFException), it seems that the state 
> metadata file has been corrupted.
> But I can't confirm it; maybe Stefan knows more details, so I am pinging him for you.
> 
> Thanks, vino.
> 
> Edward Rojas mailto:edward.roja...@gmail.com>> 
> 于2018年9月7日周五 上午1:22写道:
> Hello all,
> 
> We are running Flink 1.5.3 on Kubernetes with RocksDB as statebackend. 
> When performing some load testing we got an /OutOfMemoryError: native memory
> exhausted/, causing the job to fail and be restarted.
> 
> After the Taskmanager is restarted, the job is recovered from a Checkpoint,
> but it seems that there is a problem when trying to access the state. We got
> the error from the *onTimer* function of a *onProcessingTime*.
> 
> Would it be possible that the OOM error caused a corrupted state to be
> checkpointed?
> 
> We get Exceptions like:
> 
> TimerException{java.lang.RuntimeException: Error while retrieving data from
> RocksDB.}
> at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:522)
> at java.util.concurrent.FutureTask.run(FutureTask.java:277)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:191)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1160)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
> at java.lang.Thread.run(Thread.java:811)
> Caused by: java.lang.RuntimeException: Error while retrieving data from
> RocksDB.
> at
> org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:89)
> at com.xxx.ProcessFunction.*onTimer*(ProcessFunction.java:279)
> at
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:94)
> at
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.*onProcessingTime*(KeyedProcessOperator.java:78)
> at
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.*onProcessingTime*(HeapInternalTimerService.java:266)
> at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
> ... 7 more
> Caused by: java.io.EOFException
> at java.io.DataInputStream.readFully(DataInputStream.java:208)
> at java.io.DataInputStream.readUTF(DataInputStream.java:618)
> at java.io.DataInputStream.readUTF(DataInputStream.java:573)
> at
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:381)
> at
> org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:87)
> ... 12 more
> 
> 
> Thanks in advance for any help
> 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
> 



Re: Increased Size of Incremental Checkpoint

2018-09-06 Thread Stefan Richter
Hi,

you should expect that the size can vary for some checkpoints, even if the 
change rate is constant. Some checkpoints will upload compacted replacements 
for previous checkpoints to prevent the checkpoint history from growing 
without bounds. Whenever that happens, the checkpoint does some „extra work“ 
by re-uploading compacted/merged versions of previous deltas.

Best,
Stefan 

> Am 05.09.2018 um 22:29 schrieb burgesschen :
> 
> Hi guys,
> I enabled incremental Flink checkpoints for my Flink job. I had the job read
> messages at a stable rate. For each message, the Flink job stores something
> in the keyed state. My question is: since the state size grows by the same
> amount every minute, shouldn't the incremental checkpoint size remain
> relatively constant as well? How come it is increasing as shown in the picture?
> Thank you!
> 
> 
> 
>  
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Error while trigger checkpoint due to Kyro Exception

2018-08-19 Thread Stefan Richter
Hi,

this problem is fixed in Flink >= 1.4.3, see 
https://issues.apache.org/jira/browse/FLINK-8836 
.

Best,
Stefan

> Am 18.08.2018 um 05:07 schrieb Bruce Qiu :
> 
> Hi Community,
> I am using Flink 1.4.2 to do stream processing. I fetch data from Kafka 
> and write Parquet files to HDFS. In the previous environment, the Kafka topic 
> had 192 partitions and I set the source parallelism to 192; the application 
> worked fine. But recently we increased the Kafka partitions to 384, so I 
> changed the source parallelism to 384. After I made this change, the 
> application throws the exception below, and the checkpoint always fails. 
> Also, I see that the backpressure is very high in the ColFlatMap stage. My 
> application’s DAG is shown below. Can someone help me with this exception? Thanks 
> a lot.
>  
> DAG Stage
>  
> 
>  
>  
> Exception stack trace:
>  
> java.lang.Exception: Error while triggering checkpoint 109 for Source: Custom 
> Source (257/384)
>   at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1210)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not perform checkpoint 109 for operator 
> Source: Custom Source (257/384).
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:544)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:111)
>   at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1199)
>   ... 5 more
> Caused by: java.lang.Exception: Could not complete snapshot 109 for operator 
> Source: Custom Source (257/384).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:378)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1089)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1038)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:671)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:607)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:538)
>   ... 7 more
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>   at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>   at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:625)
>   at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>   at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:175)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:104)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:30)
>   at 
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:74)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend$PartitionableListState.(DefaultOperatorStateBackend.java:448)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend$PartitionableListState.deepCopy(DefaultOperatorStateBackend.java:460)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:220)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:363)
>   ... 12 more
>  
>  
> Regards,
> Bruce
> 



Re: Need a clarification about removing a stateful operator

2018-08-17 Thread Stefan Richter
Hi,

it will not be transported. The JM does the state assignment to create the 
deployment information for all tasks. It will just exclude the state for 
operators that are not present, so that state will no longer be contained in 
your next checkpoints.

Best,
Stefan
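
For reference, the "explicitly agree to discard the state" part quoted from the docs corresponds to the --allowNonRestoredState (shorthand: -n) flag when resuming from a savepoint; a sketch where the savepoint path and jar name are placeholders:

./bin/flink run -s hdfs:///flink/savepoints/savepoint-abc123 --allowNonRestoredState my-job.jar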

> Am 17.08.2018 um 09:26 schrieb Tony Wei :
> 
> Hi Chesnay,
> 
> Thanks for your quick reply. I have another question. Will the state, which 
> is ignored, be transported
> to TMs from DFS? Or will it be detected by the JM's checkpoint coordinator, with 
> only those states required
> by operators transported to each TM?
> 
> Best,
> Tony Wei
> 
> 2018-08-17 14:38 GMT+08:00 Chesnay Schepler  >:
> The state won't exist in the snapshot.
> 
> 
> On 17.08.2018 04:38, Tony Wei wrote:
>> Hi all,
>> 
>> I'm confused about the description in documentation. [1]
>> 
>> Removing a stateful operator: The state of the removed operator is lost 
>> unless
>> another operator takes it over. When starting the upgraded application, you 
>> have
>> to explicitly agree to discard the state.
>> Does that mean if I take a full snapshot (e.g. savepoint) after restoring by 
>> explicitly agreeing to
>> discard the state, then the state won't exist in that snapshot? Or does it 
>> just mean ignore the
>> state but the state still exist forever, unless I explicitly purge that 
>> state by using state operator?
>> 
>> And could this behavior differ between different state backend (Memory, FS, 
>> RocksDB) ?
>> 
>> Many thanks,
>> Tony Wei
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-master/ops/upgrading.html#application-topology
>>  
>> 
> 



Re: Window state with rocksdb backend

2018-08-09 Thread Stefan Richter
Hi,

it is not quite clear to me what your window function is doing, so sharing some 
(pseudo) code would be helpful. Is it ever calling an update function for the 
state you are trying to modify? From the information I have, it seems not to be 
the case, and that is a wrong use of the API, which requires you to call the 
update method. This wrong use somewhat seems to work for heap-based backends, 
because you are manipulating the objects directly (for efficiency reasons, 
otherwise we would always have to make deep defensive copies), but this will not 
work for RocksDB because you always just work on a deserialized copy of the 
ground truth, and that is why updates are explicit.

Best,
Stefan
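
For illustration, a minimal sketch of the explicit read-modify-update pattern using the per-window state of a ProcessWindowFunction (the types and the counting logic are just assumptions to keep the example small): with RocksDB, value() hands you a deserialized copy, so the modified object must be written back with update().

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class FireCountingWindowFunction
        extends ProcessWindowFunction<Long, String, String, TimeWindow> {

    private final ValueStateDescriptor<Long> firesDesc =
            new ValueStateDescriptor<>("fires", Long.class);

    @Override
    public void process(String key, Context ctx, Iterable<Long> elements,
                        Collector<String> out) throws Exception {
        ValueState<Long> firesState = ctx.windowState().getState(firesDesc);
        Long fires = firesState.value();   // with RocksDB this is only a copy
        fires = (fires == null) ? 1L : fires + 1;
        firesState.update(fires);          // required: write the modified copy back
        out.collect(key + " fired " + fires + " times for window " + ctx.window());
    }
}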

> Am 09.08.2018 um 10:36 schrieb 祁明良 :
> 
> Hi all,
> 
> This is mingliang, I got a problem with rocksdb backend.
> 
> I'm currently using a 15 min SessionWindow which also fires every 10s; there's 
> no pre-aggregation, so the input of the WindowFunction is the whole 
> Iterator of input objects.
> For the window operator, I assume this collection is also state that is maintained 
> by Flink.
> Then, in each 10s firing, the window function takes the objects out of the 
> iterator and does some updates, and in the next firing I assume I would get the 
> updated value of that object.
> With the filesystem backend this was successful, but it eats a lot of memory and 
> finally I got a GC overhead limit error; then I switched to the RocksDB backend, and the 
> problem is that the object in the next firing round is not updated by the previous 
> firing round.
> Do I have to do something additional with the RocksDB backend in this case?
> 
> Thanks in advance
> Mingliang
> 
> 
> This communication may contain privileged or other confidential information 
> of Red. If you have received it in error, please advise the sender by reply 
> e-mail and immediately delete the message and any attachments without copying 
> or disclosing the contents. Thank you.



Re: Permissions to delete Checkpoint on cancel

2018-07-23 Thread Stefan Richter
Hi,

ok, let me briefly explain the differences between the local working directory, 
the checkpoint directory, and the savepoint directory, and also outline their 
best practices/requirements/tradeoffs. A first easy comment is that checkpoints 
and savepoints typically have similar requirements and most users write them 
to the same fs. The working directory, i.e. the directory for spilling or where 
RocksDB operates, is transient; it does not require replication because it is 
not part of the fault tolerance strategy. Here the main concern is speed, and 
that is why it is ideally a local, physically attached disk on the TM machine.

In contrast to that, checkpoints and savepoints are part of the fault tolerance 
strategy and that is why they typically should be on fault tolerant file 
systems. In database terms, think of checkpoints as a recovery mechanism and 
savepoints as backups. As we usually want to survive node failures, those file 
systems should be fault tolerant/replicated, and also accessible for read/write 
from all TMs and the JM. TMs obviously need to write the data, and read in 
recovery. Under node failures, this means that a TM might have to read state 
that was written on a different machine, that is why TMs should be able to 
access the files written by other TMs. The JM is responsible for deleting 
checkpoints, because TMs might go down and that is why the JM needs access as 
well.

Those requirements typically hold for most Flink users. However, you might get 
away with certain particular trade-offs. You can write checkpoints to local 
disk if:

- Everything runs on one machine, or

- (not sure somebody ever did this, but it could work)
1) You will do the cleanup of old checkpoints manually (because JM cannot reach 
them), e.g. with scripts and
2) You will never try to rescale from a checkpoint and 
3) Tasks will never migrate to a different machine. You ignore node/disk/etc 
failures, and ensure that your job „owns" the cluster with no other jobs 
running in parallel. This means accepting data loss in the previous cases.

Typically, it should be ok to use a dfs only for checkpoints and savepoints; 
the local working directories should not go to dfs, or else things will slow 
down dramatically. If you are just worried about recovery times, you might want 
to take a look at the local recovery feature [1], which keeps a secondary copy 
of the state on local disk for faster restores, but still ensures fault 
tolerance with a primary copy in dfs.

Best,
Stefan

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/state/large_state_tuning.html#task-local-recovery
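
For illustration, a minimal sketch of such a setup in flink-conf.yaml (paths are placeholders; the local-recovery key assumes Flink 1.5+):

state.backend: rocksdb
state.backend.rocksdb.localdir: /mnt/local-ssd/flink/rocksdb   # transient working directory, fast local disk
state.checkpoints.dir: hdfs:///flink/checkpoints               # fault tolerant, reachable by all TMs and the JM
state.savepoints.dir: hdfs:///flink/savepoints
state.backend.local-recovery: true                             # optional: secondary local copy for faster restores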

> Am 23.07.2018 um 14:18 schrieb ashish pok :
> 
> Sorry,
> 
> Just a follow-up: in the absence of a NAS, is the best option here then to put 
> both checkpoints and savepoints on HDFS, with the state backend using local SSDs?
> 
> We were trying to not even hit HDFS other than for savepoints.
> 
> 
> - Ashish
> On Monday, July 23, 2018, 7:45 AM, ashish pok  wrote:
> 
> Stefan,
> 
> I did have the first point at the back of my mind. I was under the impression, 
> though, that for checkpoints the cleanup would be done by the TMs, as they are 
> the ones taking them.
> 
> So for a standalone cluster with its own ZooKeeper for JM high availability, 
> a NAS is a must-have? We were going to go with local checkpoints and access 
> to remote HDFS for savepoints. This sounds like it will be a bad idea then. 
> Unfortunately we can’t run on YARN, and a NAS is also a no-no in one of our 
> datacenters - there is a mountain of security compliance to climb before we 
> will be allowed in production if we need to go that route.
> 
> Thanks, Ashish
> 
> On Monday, July 23, 2018, 5:10 AM, Stefan Richter 
>  wrote:
> 
> Hi,
> 
> I am wondering how this can even work properly if you are using a local fs 
> for checkpoints instead of a distributed fs. First, what happens under node 
> failures, if the SSD becomes unavailable or if a task gets scheduled to a 
> different machine, and can no longer access the disk with the  corresponding 
> state data, or if you want to scale-out. Second, the same problem is also 
> what you can observe with the job manager: how could the checkpoint 
> coordinator, that runs on the JM, access a file on a local FS on a different 
> node to cleanup the checkpoint data? The purpose of using a distributed fs 
> here is that all TM and the JM can access the checkpoint files.
> 
> Best,
> Stefan
> 
> > Am 22.07.2018 um 19:03 schrieb Ashish Pokharel  > <mailto:ashish...@yahoo.com>>:
> > 
> > All,
> > 
> > We recently moved our Checkpoint directory from HDFS to local SSDs mounted 
> > on Data Nodes (we were starting to see perf impacts on checkpoints etc as 
> > complex ML apps were spinning up more and more in YARN). This worked great 
> > other than the fact that w

Re: Flink job hangs using rocksDb as backend

2018-07-23 Thread Stefan Richter
Hi,

yes, timers cannot easily fire in parallel to event processing for correctness 
reasons because they both manipulate the state and there should be a distinct 
order of operations. If it is literally stuck, then it is obviously a problem. 
From the stack trace it looks pretty clear that the culprit would be RocksDB, 
if that is where it blocks. I cannot remember any report of a similar problem 
so far, and we have been running this version of RocksDB for quite some time with 
many users. At the same time, I feel like many people are using SSDs for local 
storage these days. You could run the JVM with a tool that allows you to also 
get the native traces and system calls to see where RocksDB is potentially 
stuck. Something we could eventually try is updating the RocksDB version, but 
that is currently still blocked by a performance regression in newer RocksDB 
versions, see https://github.com/facebook/rocksdb/issues/3865 
<https://github.com/facebook/rocksdb/issues/3865>.

Best,
Stefan 
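
For illustration, one way to reduce the sheer number of distinct timers is to coalesce them by rounding the registration timestamp, e.g. to full minutes, so that many events of the same key reuse a single timer. A minimal sketch assuming Flink 1.5+'s KeyedProcessFunction and event time with assigned timestamps (the String types and the 24h delay are placeholders):

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class CoalescedTimerFunction extends KeyedProcessFunction<String, String, String> {

    private static final long ONE_DAY_MS = 24 * 60 * 60 * 1000L;

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        // round the target time down to a full minute so repeated events of the same
        // key hit the same timestamp; registering an existing timestamp again is a no-op
        long target = ctx.timestamp() + ONE_DAY_MS;
        long coalesced = (target / 60_000) * 60_000;
        ctx.timerService().registerEventTimeTimer(coalesced);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        out.collect("timer fired for key " + ctx.getCurrentKey() + " at " + timestamp);
    }
}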

> Am 23.07.2018 um 12:56 schrieb shishal singh :
> 
> Thanks Stefan,
> 
> You are correct, I learned the hard way that when timers fire, the operator stops 
> processing new events until all timer callbacks complete. This is 
> the point when I decided to isolate the problem by scheduling only 5-6K 
> timers in total, so that even if it is taking time in timers, it should progress 
> after a reasonable period of time. But even after I left it running the whole 
> night, the watermark didn't progress at all and the CPU still showed 100% usage 
> without any error log (either JM or TM). The stack trace I shared is the one I 
> took in the morning. 
> 
> Also to isolate any problem with elastic sink, I removed sink and just did 
> stream.print() at the end.
> 
> I am using  spinning disk and set following option 
> 
> setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED); // Also 
> tried SPINNING_DISK_OPTIMIZED_HIGH_MEM
> 
> My cluster setup has 3 nodes (private cloud machines with 4 CPU cores 
> each) and 1 TM with 4 slots running on each node. The JobManager and 
> Hadoop are also running on the same 3 nodes. 
> 
> My job graph look like this:
> 
> 
> I am using following config with checkpointing interval of 10min and hadoop 
> to store checkpoint.
> 
>  RocksDBStateBackend backend = new 
> RocksDBStateBackend(baseDir+"/checkpoints", true);
> 
> backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
> env.setStateBackend(backend);
> env.enableCheckpointing(intervalMilli);
> 
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(intervalMilli);
> env.getCheckpointConfig().setCheckpointTimeout(timeoutMilli);
> 
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> env.getCheckpointConfig().enableExternalizedCheckpoints( 
> CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> 
> 
> The last thing I intend to try is using the FsStateBackend to be sure whether it is 
> a RocksDB-related issue, but the problem is that sometimes the issue only gets 
> reproduced after a couple of days. 
> 
> Regards,
> Shishal
> 
> 
> On Mon, Jul 23, 2018 at 10:08 AM Stefan Richter  <mailto:s.rich...@data-artisans.com>> wrote:
> Hi,
> 
> let me first clarify what you mean by „stuck“, just because your job stops 
> consuming events for some time does not necessarily mean that it is „stuck“. 
> That is very hard to evaluate from the information we have so far, because 
> from the stack trace you cannot conclude that the thread is „stuck“, because 
> it looks like it is just processing firing timers. And while timers are 
> firing, the pipeline will stop consuming further events until all timers have 
> been processed. Even if your thread dump looks the same all the time, it 
> could just be that you observe the same call (the most expensive one) across 
> multiple invocations and is not necessarily an indicator for the thread being 
> stuck. Attaching a sampler or introducing logging to one of the seemingly 
> stuck task JVMs could clarify this a bit more. For now I am assuming that it 
> makes progress but spends a lot of work on timers. Why you might experience 
> this randomly is, for example, if your watermark makes a bigger jump and many 
> (or all) of your timers suddenly fire. From the perspective of consuming 
> events, this could look like being stuck.
> In case that the job really is stuck in the strict sense, it does not look 
> like a Flink problem because your threads are in some call against RocksDB. 
> Since we are not aware of any similar problem from the mailing list, a setup 
> problem would be the most likel

Re: Permissions to delete Checkpoint on cancel

2018-07-23 Thread Stefan Richter
Hi,

I am wondering how this can even work properly if you are using a local fs for 
checkpoints instead of a distributed fs. First, what happens under node 
failures, if the SSD becomes unavailable or if a task gets scheduled to a 
different machine, and can no longer access the disk with the  corresponding 
state data, or if you want to scale-out. Second, the same problem is also what 
you can observe with the job manager: how could the checkpoint coordinator, 
that runs on the JM, access a file on a local FS on a different node to cleanup 
the checkpoint data? The purpose of using a distributed fs here is that all TM 
and the JM can access the checkpoint files.

Best,
Stefan

> Am 22.07.2018 um 19:03 schrieb Ashish Pokharel :
> 
> All,
> 
> We recently moved our Checkpoint directory from HDFS to local SSDs mounted on 
> Data Nodes (we were starting to see perf impacts on checkpoints etc as 
> complex ML apps were spinning up more and more in YARN). This worked great 
> other than the fact that when jobs are being canceled or canceled with 
> Savepoint, local data is not being cleaned up. In HDFS, Checkpoint 
> directories were cleaned up on Cancel and Cancel with Savepoints as far as I 
> can remember. I am wondering if it is permissions issue. Local disks have RWX 
> permissions for both yarn and flink headless users (flink headless user 
> submits the apps to YARN using our CICD pipeline). 
> 
> Appreciate any pointers on this.
> 
> Thanks, Ashish



Re: Memory Leak in ProcessingTimeSessionWindow

2018-07-23 Thread Stefan Richter
Hi,

for most windows, all state is cleared through FIRE_AND_PURGE, except for 
windows that are subtypes of merging windows, such as session windows. Here, 
the state still remembers the window itself until the watermark passes the 
session timeout+allowed lateness. This is done so that elements that fall into 
the window after firing can still resurrect the window’s information, see 
WindowOperator.clearAllState(). Only after that, all state from the session 
window is removed. Looping in Aljoscha, who might have more ideas about the 
best ways to implement your use case.

Best,
Stefan
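
For illustration, a minimal sketch of the explicit-cleanup style that the thread below ends up with ("solution 3"): keep the per-key aggregate in keyed state, move a single cleanup timer forward with every element, and clear all state for the key in onTimer so that expired keys leave nothing behind. The types, the 15 min gap and the summing logic are assumptions, and it assumes Flink 1.5+'s KeyedProcessFunction.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class SessionAggregator extends KeyedProcessFunction<String, Long, Long> {

    private static final long SESSION_GAP_MS = 15 * 60 * 1000L;

    private transient ValueState<Long> sumState;
    private transient ValueState<Long> timerState;

    @Override
    public void open(Configuration parameters) {
        sumState = getRuntimeContext().getState(new ValueStateDescriptor<>("sum", Long.class));
        timerState = getRuntimeContext().getState(new ValueStateDescriptor<>("timer", Long.class));
    }

    @Override
    public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
        Long sum = sumState.value();
        sumState.update(sum == null ? value : sum + value);

        // move the single cleanup timer for this key forward with every new element
        Long oldTimer = timerState.value();
        if (oldTimer != null) {
            ctx.timerService().deleteEventTimeTimer(oldTimer);
        }
        long newTimer = ctx.timestamp() + SESSION_GAP_MS;
        ctx.timerService().registerEventTimeTimer(newTimer);
        timerState.update(newTimer);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
        Long sum = sumState.value();
        if (sum != null) {
            out.collect(sum);
        }
        // drop all state for this key so that expired keys do not accumulate
        sumState.clear();
        timerState.clear();
    }
}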

> Am 22.07.2018 um 18:19 schrieb Ashish Pokharel :
> 
> One more attempt to get some feedback on this. It basically boils down to 
> using the high-level Window API in scenarios where keys are unbounded / infinite 
> but can be expired after a certain time. From what we have observed (solution 2 
> below), some properties of keys are still in state (guessing the key itself and 
> watermarks etc). Is there any way to clean these up, as the FIRE_AND_PURGE trigger 
> doesn’t seem to do it? I am of the opinion that even if we end up using HDFS- or 
> RocksDB-backed state, we would still want to clean those up. 
> Any suggestions on this before we start re-writing our apps to start using 
> low-level Process APIs in general? 
> 
> Thanks, Ashish
> 
>> On Jul 2, 2018, at 10:47 AM, ashish pok > > wrote:
>> 
>> All,
>> 
>> I have been doing a little digging on this and to Stefan's earlier point 
>> incrementing memory (not necessarily leak) was essentially because of keys 
>> that were incrementing as I was using time buckets concatenated with actual 
>> key to make unique sessions.
>> 
>> Taking a couple of steps back, use case is very simple tumbling window of 15 
>> mins by keys. Stream can be viewed simply as:
>> 
>> ||
>> 
>> We have a few of these type of pipelines and one catch here is we wanted to 
>> create an app which can process historical and current data. HIstorical data 
>> is mainly because users adhoc request for "backfill". In order to easily 
>> manage processing pipeline, we are making no distinction between historical 
>> and current data as processing is based on event time. 
>> 
>> 1) Of course, easiest way to solve this problem is to create TumblingWindow 
>> of 15mins with some allowed lateness. One issue here was watermarks are 
>> moved forward and backfill data appeared to be viewed as late arrival data, 
>> which is a correct behavior from Flink perspective but seems to be causing 
>> issues in how we are trying to handle streams.
>> 
>> 2) Another issue is our data collectors are highly distributed - we 
>> regularly get data from later event time buckets faster than older buckets. 
>> Also, it is also more consistent to actually create 15min buckets using 
>> concept of Session instead. So I am creating a key with 
>> | and a session gap of say 10 mins. This works 
>> perfectly from business logic perspective. However, now I am introducing 
>> quite a lot of keys which based on my heap dumps seem to be hanging around 
>> causing memory issues.
>> 
>> 3) We converted the apps to a Process function and manage all states using 
>> key defined in step (2) and registering/unregistering timeouts. 
>> 
>> Solution (3) seems to be working pretty stable from memory perspective. 
>> However, it just feels like with so much high-level APIs available, we are 
>> not using them properly and keep reverting back to low level Process APIs - 
>> in the last month we have migrated about 5 or 6 apps to Process now :) 
>> 
>> For solution (2) it feels like any other Session aggregation use case will 
>> have the issue of keys hanging around (eg: for click streams with user 
>> sessions etc). Isn't there a way to clear those session windows? Sorry, I 
>> just feel like we are missing something simple and have been reverting to 
>> low level APIs instead.
>> 
>> Thanks, Ashish
> 



Re: Events can overtake watermarks

2018-07-23 Thread Stefan Richter
Hi,

events overtaking watermarks doesn’t sound like a „wrong“ behaviour; only 
watermarks overtaking events would be bad. Do you think this only started with 
Flink 1.5? To me this does not sound like a problem, but I am not sure if it is 
intended. Looping in Aljoscha, just in case.

Best,
Stefan

> Am 22.07.2018 um 22:19 schrieb Gyula Fóra :
> 
> Hi,
> In 1.5.1 I have noticed some strange behaviour that happens quite frequently 
> and I just want to double check with you that this is intended.
> 
> If I have a non-parallel source that takes the following actions:
> 
> emit: event1
> emit: watermark1
> emit: event2
> 
> it can happen that a downstream operators receives watermark1 after event2. 
> It doesn't happen very often but definitely seems to happen sometimes.
> 
> Maybe this is a property of the broadcastEmit(..) method but it seems a 
> little funny :)
> 
> Thanks for the clarification!
> 
> Gyula



Re: Flink job hangs using rocksDb as backend

2018-07-23 Thread Stefan Richter
sDBMapState.java:102)
>   at 
> org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>   at 
> nl.ing.gmtt.observer.analyser.job.rtpe.process.RtpeProcessFunction.onTimer(RtpeProcessFunction.java:99)
>   at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:75)
>   at 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
>   - locked <0x000302b404a0> (a java.lang.Object)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:748)
> 
>Locked ownable synchronizers:
> 
> 
> Regards,
> Shishal
> 
> 
> On Thu, Jul 12, 2018 at 4:11 PM Stefan Richter  <mailto:s.rich...@data-artisans.com>> wrote:
> Hi,
> 
> Did you check the metrics for the garbage collector? Being stuck with high CPU 
> consumption and lots of timers sounds like there could be a possible problem, 
> because timers are currently on-heap objects, but we are working on 
> RocksDB-based timers right now.
> 
> Best,
> Stefan
> 
>> Am 12.07.2018 um 14:54 schrieb shishal singh > <mailto:shisha...@gmail.com>>:
>> 
>> Thanks Stefan/Stephan/Nico,
>> 
>> Indeed there are 2 problems. For the 2nd problem, I am almost certain that the 
>> explanation given by Stephan is true, as in my case the number of 
>> timers is in the millions. (Each is for a different key, so I guess coalescing is not 
>> an option for me). 
>> 
>> If I simplify my problem, each day I receive millions of events (10-20M) and 
>> I have to schedule a timer for the next day at 8 AM to check if matching events are 
>> there; if not, I have to send it to the Elastic sink as an alert. I suspected that 
>> having so many timers fire at the same time could cause my job to hang, so I 
>> am now scheduling timers randomly between 8 AM and 10 AM. But my job still 
>> hangs after some time. One more thing I noticed is that when my job 
>> hangs, CPU utilization shoots to almost 100%.
>> I tried to isolate the problem by removing the ES sink and just doing stream.print(), 
>> and yet the problem persists. 
>> 
>> In my current setup, I am running a standalone cluster of 3 machine (All 
>> three server has Task manger, Job manager and Hadoop on it). So I am not 
>> using EBS for rocksDB.
>> 
>> Also, I verified that when the job hangs, even the timers are not being called, 
>> as I have debug statements in the timers, and the only logs I see at that time are 
>> the following:
>> 
>> 2018-07-12 14:35:30,423 DEBUG 
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got 
>> ping response for sessionid: 0x2648355f7c6010f after 11ms
>> 2018-07-12 14:35:31,957 DEBUG 
>> org.apache.flink.runtime.taskmanager.TaskManager  - Sending 
>> heartbeat to JobManager
>> 2018-07-12 14:35:36,946 DEBUG 
>> org.apache.flink.runtime.taskmanager.TaskManager  - Sending 
>> heartbeat to JobManager
>> 2018-07-12 14:35:41,963 DEBUG 
>> org.apache.flink.runtime.taskmanager.TaskManager  - Sending 
>> heartbeat to JobManager
>> 2018-07-12 14:35:43,775 DEBUG 
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got 
>> ping response for sessionid: 0x2648355f7c6010f after 10ms
>> 2018-07-12 14:35:46,946 DEBUG 
>> org.apache.flink.runtime.taskmanager.TaskManager  - Sending 
>> heartbeat to JobManager
>> 2018-07-12 14:35:51,954 DEBUG 
>> org.apache.flink.runtime.taskmanager.TaskManager 

Re: Flink job hangs using rocksDb as backend

2018-07-12 Thread Stefan Richter
Hi,

Did you check the metrics for the garbage collector? Being stuck with high CPU 
consumption and lots of timers sounds like there could be a possible problem, 
because timers are currently on-heap objects, but we are working on 
RocksDB-based timers right now.
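
As an aside on the coalescing point quoted below: Flink keeps at most one timer per key and
timestamp, so rounding the registration time to a coarser resolution lets repeated
registrations for the same key collapse into a single timer. A minimal, purely illustrative
sketch inside a ProcessFunction (retentionMillis is a placeholder):

// Round the target time down to full minutes; re-registering the same rounded
// timestamp for the same key does not create an additional timer.
long target = ctx.timestamp() + retentionMillis;
long coalesced = target - (target % 60_000);
ctx.timerService().registerEventTimeTimer(coalesced);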

Best,
Stefan

> Am 12.07.2018 um 14:54 schrieb shishal singh :
> 
> Thanks Stefan/Stephan/Nico,
> 
> Indeed there are 2 problems. For the 2nd problem, I am almost certain that the 
> explanation given by Stephan is true, as in my case the number of 
> timers is in the millions. (Each is for a different key, so I guess coalescing is not 
> an option for me). 
> 
> If I simplify my problem, each day I receive millions of events (10-20M) and 
> I have to schedule a timer for the next day at 8 AM to check if matching events are 
> there; if not, I have to send it to the Elastic sink as an alert. I suspected that 
> having so many timers fire at the same time could cause my job to hang, so I am 
> now scheduling timers randomly between 8 AM and 10 AM. But my job still 
> hangs after some time. One more thing I noticed is that when my job 
> hangs, CPU utilization shoots to almost 100%.
> I tried to isolate the problem by removing the ES sink and just doing stream.print(), 
> and yet the problem persists. 
> 
> In my current setup, I am running a standalone cluster of 3 machine (All 
> three server has Task manger, Job manager and Hadoop on it). So I am not 
> using EBS for rocksDB.
> 
> Also, I verified that when the job hangs, even the timers are not being called, as 
> I have debug statements in the timers, and the only logs I see at that time are 
> the following:
> 
> 2018-07-12 14:35:30,423 DEBUG 
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got ping 
> response for sessionid: 0x2648355f7c6010f after 11ms
> 2018-07-12 14:35:31,957 DEBUG 
> org.apache.flink.runtime.taskmanager.TaskManager  - Sending 
> heartbeat to JobManager
> 2018-07-12 14:35:36,946 DEBUG 
> org.apache.flink.runtime.taskmanager.TaskManager  - Sending 
> heartbeat to JobManager
> 2018-07-12 14:35:41,963 DEBUG 
> org.apache.flink.runtime.taskmanager.TaskManager  - Sending 
> heartbeat to JobManager
> 2018-07-12 14:35:43,775 DEBUG 
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got ping 
> response for sessionid: 0x2648355f7c6010f after 10ms
> 2018-07-12 14:35:46,946 DEBUG 
> org.apache.flink.runtime.taskmanager.TaskManager  - Sending 
> heartbeat to JobManager
> 2018-07-12 14:35:51,954 DEBUG 
> org.apache.flink.runtime.taskmanager.TaskManager  - Sending 
> heartbeat to JobManager
> 2018-07-12 14:35:56,967 DEBUG 
> org.apache.flink.runtime.taskmanager.TaskManager  - Sending 
> heartbeat to JobManager
> 2018-07-12 14:35:57,127 DEBUG 
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got ping 
> response for sessionid: 0x2648355f7c6010f after 8ms
> 2018-07-12 14:36:01,944 DEBUG 
> org.apache.flink.runtime.taskmanager.TaskManager  - Sending 
> heartbeat to JobManager
> 2018-07-12 14:36:06,955 DEBUG 
> org.apache.flink.runtime.taskmanager.TaskManager  - Sending 
> heartbeat to JobManager
> 2018-07-12 14:36:08,287 DEBUG 
> org.apache.flink.runtime.taskmanager.TaskManager  - Receiver 
> TriggerCheckpoint 155@1531398968248 for d9af2f1da87b7268cc03e152a6179eae.
> 2018-07-12 14:36:08,287 DEBUG org.apache.flink.runtime.taskmanager.Task   
>   - Invoking async call Checkpoint Trigger for Source: Event 
> Source -> filter (1/1) (d9af2f1da87b7268cc03e152a6179eae). on task Source: 
> Event Source -> filter (1/1)
> 2018-07-12 14:36:10,476 DEBUG 
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got ping 
> response for sessionid: 0x2648355f7c6010f after 10ms
> 2018-07-12 14:36:11,957 DEBUG 
> org.apache.flink.runtime.taskmanager.TaskManager  - Sending 
> heartbeat to JobManager
> 
> As I expected, checkpoints also start to fail during this time.
> 
> My Job Graph is pretty much simple : Source-->filter-- times>--->Sink
> 
> 
> Regards,
> Shishal
> 
> 
> On Thu, Jul 12, 2018 at 9:54 AM Stefan Richter  <mailto:s.rich...@data-artisans.com>> wrote:
> Hi,
> 
adding to what has already been said, I think there can be two orthogonal 
problems here: i) why is your job slowing down/getting stuck? and ii) why is 
cancellation blocked? As for ii), I think Stephan already gave the right reason: 
shutdown can take longer and that is what gets the TM killed.
> 
> A more interesting question could still be i), why is your job slowing down 
until shutdown in the first place. I have two questions here. First, are you 
> runn

Re: help understand/debug high memory footprint on jobmanager

2018-06-29 Thread Stefan Richter
Hi Steven,

from your analysis, I would conclude the following problem: ExecutionVertexes 
hold Executions, which are bootstrapped with the state (in the form of the map of 
state handles) when the job is initialized from a checkpoint/savepoint. The Execution 
holds a reference to this state even when the task is already running. I would 
assume it is safe to set the reference to the TaskStateSnapshot to null at the end 
of the deploy() method so that it can be GC’ed. From the provided stats, I cannot say 
if maybe the JM is also holding references to too many ExecutionVertexes, but 
that would be a different story.

Best,
Stefan

> Am 29.06.2018 um 01:29 schrieb Steven Wu :
> 
> First, some context about the job
> * embarrassingly parallel: all operators are chained together
> * parallelism is over 1,000
> * stateless except for Kafka source operators. checkpoint size is 8.4 MB.
> * set "state.backend.fs.memory-threshold" so that only jobmanager writes to 
> S3 to checkpoint
> * internal checkpoint with 10 checkpoints retained in history
> 
> We don't expect jobmanager to use much memory at all. But it seems that this 
> high memory footprint (or leak) happened occasionally, maybe under certain 
> conditions. Any hypothesis?
> 
> Thanks,
> Steven
> 
> 
> 41,567 ExecutionVertex objects retained 9+ GB of memory
> 
> 
> 
> Expanded one ExecutionVertex; it seems to be storing the Kafka offsets for the 
> source operator.
> 
> 



Re: Storing Streaming Data into Static source

2018-06-26 Thread Stefan Richter
Hi,

I think this is not really a Flink-related question. In any case, you might 
want to specify a bit more what you mean by „better", because usually there is 
no strict „better", only trade-offs, and what is „better" for somebody else might not be 
„better" for you.

Best,
Stefan

> Am 26.06.2018 um 12:54 schrieb Rad Rad :
> 
> Hi all, 
> 
> 
> Kindly, I want to save streaming data subscribed from Kafka into a
> static data source. Which is better, MongoDB or PostgreSQL? 
> 
> 
> Radhya. 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: String Interning

2018-06-26 Thread Stefan Richter
Hi,

you can enable object reuse via the execution config [1]: „By default, objects 
are not reused in Flink. Enabling the object reuse mode will instruct the 
runtime to reuse user objects for better performance. Keep in mind that this 
can lead to bugs when the user-code function of an operation is not aware of 
this behavior.“.
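
For illustration, a minimal sketch (the event class is a placeholder, not your actual type)
of combining object reuse with interning the key wherever new String instances are created,
e.g. in the constructor used by your Kafka deserialization schema:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class InterningExample {

    // Placeholder event type: intern the key on construction so that duplicates of the
    // <100 possible values collapse to a single String instance per JVM. This has to be
    // re-applied wherever deserialization creates fresh Strings (e.g. after a network
    // shuffle), because the serializer will not preserve interning for you.
    public static class KeyedMessage {
        public final String key;
        public final byte[] payload;

        public KeyedMessage(String key, byte[] payload) {
            this.key = key.intern();
            this.payload = payload;
        }
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // With object reuse, records are handed over between chained operators without
        // being copied, so the interned instance survives within a chain. Only safe if
        // your functions neither mutate nor buffer their input records.
        env.getConfig().enableObjectReuse();
    }
}

Whether re-applying the interning after shuffles (e.g. in a map right after a keyBy) pays
off depends on how many duplicate instances actually show up in your profiling.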

Best,
Stefan

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/execution_configuration.html
 


> Am 22.06.2018 um 20:09 schrieb Martin, Nick :
> 
> I have a job where I read data from Kafka, do some processing on it, and 
> write it to a database. When I read data out of Kafka, I put it into an 
> object that has a String field based on the Kafka message key. The possible 
> values for the message key are tightly constrained so there are fewer than 
> 100 possible unique key values. Profiling of the Flink job shows millions of 
> in flight stream elements, with an equal number of Strings, but I know all 
> the strings are duplicates of a small number of unique values. So it’s an 
> ideal use case for String interning. I’ve tried to use interning in the 
> constructors for the message elements, but I suspect that I need to do 
> something to preserve the interning when Flink serializes/deserializes 
> objects when passing them between operators. What’s the best way to 
> accomplish that?
>  
>  
>  
> 



Re: Memory Leak in ProcessingTimeSessionWindow

2018-06-20 Thread Stefan Richter
Hi,

it is possible that the number of processing time timers can grow, because 
internal timers are scoped by time, key, and namespace (typically this means 
„window“, because each key can be part of multiple windows). So if the number 
of keys in your application is steadily growing this can happen. 

To analyse the heap dump, I usually take the following approach:
- obviously include only reachable objects. If dumps are very big, try to limit 
the size or to trigger the OOM earlier by configuring a lower heap size. It 
should still give you the problematic object accumulation, if there is one.
- look at the statistics of „heavy hitter“ classes, i.e. classes for which the 
instances contribute the most to the overall heap consumption. Sometimes this 
will show you classes that are also part of classes that rank higher up, e.g. 
1st place could be string, and second place char[]. But you can figure that out 
in the next step.
- explore the instances of the top heavy hitter class(es). If there is a leak, 
if you just randomly sample into some objects, the likelihood is usually *very* 
high that you catch an object that is part of the leak (as determined in the 
next step). Otherwise just repeat and sample another object.
- inspect the object instance and follow the reference links to the parent 
objects in the object graph that hold a reference to the leak object candidate. 
You will typically end up in some array where the leak accumulates. Inspect the 
object holding references to the leaking objects. You can see the field values 
and this can help to determine if the collection of objects is justified or if 
data is actually leaking. So in your case, you can start from some 
InternalTimer or Window object, backwards through the reference chain to see 
what class is holding onto them and why (e.g. should they already be gone 
w.r.t. to their timestamp). Walking through the references should be supported 
by all major heap analysis tools, including JVisualVM that comes with your JDK. 
You can also use OQL[1] to query for timers or windows that should already be 
gone.
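
For example (class names as they show up in your dump; they differ between Flink versions),
an OQL query in VisualVM along the lines of `select t from
org.apache.flink.streaming.api.operators.InternalTimer t` lists the timer instances, and a
`where` clause on their timestamp field can narrow this down to timers that should already
have fired.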

Overall I think it could at least be helpful to see the statistics for heavy 
hitter classes and screenshots of representative reference chains to objects to 
figure out the problem cause. If it is not possible to share heap dumps, 
unfortunately I think giving you this strategy is currently the best I can 
offer to help. 

Best,
Stefan


[1] https://blogs.oracle.com/sundararajan/querying-java-heap-with-oql 
<https://blogs.oracle.com/sundararajan/querying-java-heap-with-oql>

> Am 20.06.2018 um 02:33 schrieb ashish pok :
> 
> All, 
> 
> I took a few heap dumps (when app restarts and at 2 hour intervals) using 
> jmap; they are 5GB to 8GB. I did some comparisons and what I can see is that the heap 
> shows the counts of data tuples (basically instances of the objects maintained as 
> state) going up slowly. 
> 
> The only things I could possibly relate that to were 
> streaming.api.operators.InternalTimer and 
> streaming.api.windowing.windows.TimeWindow; both were trending up as well. 
> There are definitely a lot more windows created than the increments I could 
> notice but nevertheless those objects are trending up. Input stream has a 
> very consistent sine wave throughput. So it really doesn't make sense for 
> windows and tuples to keep trending up. There is also no event storm or 
> anything of that sort (ie. source stream has been very steady as far as 
> throughput is concerned).
> 
> Here is a plot of heap utilization:
> 
> <1529454480422blob.jpg>
> So it has a typical sine wave pattern, which is definitely expected as the input 
> stream has the same pattern, but the source doesn't have a trend upwards like the heap 
> utilization shown above. Screenshot above is showing spike from 60% 
> utilization to 80% and trend keeps going up until an issue occurs that resets 
> the app.
> 
> Since processing is based on ProcessingTime, I really would have expected 
> memory to reach a steady state and remain sort of flat from a trending 
> perspective. 
> 
> Appreciate any pointers anyone might have.
> 
> Thanks, Ashish
> 
> On Monday, June 18, 2018, 12:54:03 PM EDT, ashish pok  
> wrote:
> 
> 
> Right, that's where I am headed now but was wondering if there are any “gotchas” I 
> am missing before I try and dig into a few gigs of heap dump. 
> 
> 
> Thanks, Ashish
> 
> 
> Sent from Yahoo Mail for iPhone <https://overview.mail.yahoo.com/?.src=iOS>
> 
> On Monday, June 18, 2018, 3:37 AM, Stefan Richter 
>  wrote:
> 
> Hi,
> 
> can you take a heap dump from a JVM that runs into the problem and share it 
> with us? That would make finding the cause a lot easier.
> 
> Best,
> Stefan
> 
>> Am 15.06.2018 um 23:01 schrieb ashish pok > <mailto:ashish...@yahoo.c

Re: Memory Leak in ProcessingTimeSessionWindow

2018-06-18 Thread Stefan Richter
Hi,

can you take a heap dump from a JVM that runs into the problem and share it 
with us? That would make finding the cause a lot easier.

Best,
Stefan

> Am 15.06.2018 um 23:01 schrieb ashish pok :
> 
> All,
> 
> I have another slow Memory Leak situation using basic TimeSession Window 
> (earlier it was GlobalWindow related that Fabian helped clarify). 
> 
> I have a very simple data pipeline:
> 
> DataStream processedData = rawTuples
>   
> .window(ProcessingTimeSessionWindows.withGap(Time.seconds(AppConfigs.getWindowSize(780
>  
>   .trigger(new ProcessingTimePurgeTrigger())
>   .apply(new IPSLAMetricWindowFn())
>   .name("windowFunctionTuple")
>   .map(new TupleToPlatformEventMapFn())
>   .name("mapTupleEvent")
>   ;
>   
> 
> I initially didn't even have ProcessingTimePurgeTrigger and it was using the 
> default Trigger. In an effort to fix this issue, I created my own Trigger 
> from the default ProcessingTimeTrigger with a simple override of the onProcessingTime 
> method (essentially replacing FIRE with FIRE_AND_PURGE):
> 
> @Override
>   public TriggerResult onProcessingTime(long time, TimeWindow window, 
> TriggerContext ctx) {
>   return TriggerResult.FIRE_AND_PURGE;
>   }
> 
> This seems to have done nothing (it may have delayed the issue by a couple of hours - 
> not certain). But I still see heap utilization creep up slowly until it 
> eventually reaches a point where GC starts to take too long, and then the 
> dreaded OOM. 
> 
> For completeness, here is my Window Function (still using the old function 
> interface). It creates a few metrics for reporting and applies logic by looping 
> over the Iterable. NO states are explicitly kept in this function; I basically 
> needed a RichWindowFunction to generate metrics.
> 
> public class IPSLAMetricWindowFn extends RichWindowFunction BasicFactTuple, String, TimeWindow> {
> 
>   private static final long serialVersionUID = 1L;
>   
>   private static Logger logger = 
> LoggerFactory.getLogger(IPSLAMetricWindowFn.class);
>   
>   private Meter in;
>   
>   private Meter out;
> 
>   private Meter error;
>   
>   @Override
>   public void open(Configuration conf) throws Exception {
>   this.in = getRuntimeContext()
> .getMetricGroup()
> .addGroup(AppConstants.APP_METRICS.PROCESS)
> .meter(AppConstants.APP_METRICS.IN, new 
> MeterView(AppConstants.APP_METRICS.INTERVAL_30));
>   this.out = getRuntimeContext()
> .getMetricGroup()
> .addGroup(AppConstants.APP_METRICS.PROCESS)
> .meter(AppConstants.APP_METRICS.OUT, new 
> MeterView(AppConstants.APP_METRICS.INTERVAL_30));
>   this.error = getRuntimeContext()
> .getMetricGroup()
> .addGroup(AppConstants.APP_METRICS.PROCESS)
> .meter(AppConstants.APP_METRICS.ERROR, new 
> MeterView(AppConstants.APP_METRICS.INTERVAL_30));
>   super.open(conf);
>   }
> 
>   @Override
>   public void apply(String key, TimeWindow window, 
> Iterable events, Collector collector) 
> throws Exception {
>   }
> }
> 
> 
> Appreciate any pointers on what could be causing leaks here. This seems 
> pretty straight-forward.
> 
> Thanks, Ashish
> 



Re: Clarity on Flink 1.5 Rescale mechanism

2018-06-12 Thread Stefan Richter
Hi,

it means that you can now modify the parallelism of a running job with a new 
„modify" command in the CLI, see [1]. Adding task managers will add their 
offered slots to the pool of available slots; it will not automatically change 
the parallelism.
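
For example (the job id is a placeholder), rescaling a running job to a parallelism of 10
looks roughly like `bin/flink modify <jobId> -p 10`.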

Best,
Stefan

[1] https://ci.apache.org/projects/flink/flink-docs-master/ops/cli.html 


> Am 12.06.2018 um 11:50 schrieb Sampath Bhat :
> 
> Hello
> In flink 1.5 release notes -
> https://flink.apache.org/news/2018/05/25/release-1.5.0.html#release-notes
> 
> Various Other Features and Improvements:
> Applications can be rescaled without manually triggering a savepoint. Under
> the hood, Flink will still take a savepoint, stop the application, and
> rescale it to the new parallelism.
> 
> What exactly does this statement mean?
> How to rescale the application ?
> If I'm running my Flink job with a parallelism of 5 and then I
> execute the following command:
> flink run  -p 10, will the application rescale to a
> parallelism of 10?
> 
> Adding on: what will the behavior in Flink 1.5 be if I increase the number
> of task managers in a Flink cluster?
> 
> So if I have a Flink job running on a Flink cluster with 1 task manager and
> I increase the number of task managers to 2, will Flink rescale the Flink
> job as well, or will the Flink job be unaffected?



Re: Stopping of a streaming job empties state store on HDFS

2018-06-11 Thread Stefan Richter
Hi,

> Am 08.06.2018 um 01:16 schrieb Peter Zende :
> 
> Hi all,
> 
> We have a streaming pipeline (Flink 1.4.2) for which we implemented stoppable 
> sources to be able to gracefully exit from the job with Yarn state 
> "finished/succeeded".
> This works fine; however, after creating a savepoint, stopping the job (stop 
> event) and restarting it, we noticed that the RocksDB state hasn't been 
> recovered. It looks like it's because the state directory on HDFS was 
> emptied after issuing a stop event. This isn't the case when we cancel the 
> job, but we'd like to distinguish between job failures and stop events. After 
> reading some related tickets (e.g. FLINK-4201, FLINK-5007) it's still not 
> clear why this is the intended behavior.
> Should we use cancel instead?

Savepoints should _not_ be cleaned up in case of stop or cancellation, 
checkpoints should be cleaned up. Where are you storing the created savepoints? 
They should not go into the checkpoint directory. Stop is intended to be a more 
„graceful“ variant of cancel, but I think it is rarely used with Flink. I would 
prefer cancel except if you really require to use stoppable for some particular 
reason.
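
For example, `bin/flink cancel -s [targetDirectory] <jobId>` cancels the job and writes the
savepoint to the given directory, which should be separate from your configured checkpoint
directory.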

> When we back up the local state directory, stop the job, copy back the 
> directory and start a new job from the savepoint, it works fine.
> Another issue is that when we restart the job with different sources (1st job: 
> HDFS and Kafka, 2nd job: Kafka), each having uids set, the recovery from the 
> savepoint doesn't fail but the local state isn't restored. Is there any trick 
> besides setting allowNonRestoredState?


I need to clarify here, when you say „each having uids set“, do you set the 
same uids for both types of sources? The uid must match, because Flink will 
reassign the state in a restore based on the uids, i.e. state x goes to the 
operator with the same uid as the uid of the operator that created it in the 
previous job. The flag allowNonRestoredState has the purpose to tolerate that 
some state from a checkpoint/savepoint does not find a matching operator to 
which it should be assigned (no operator with matching uid exists in the 
jobgraph). For example, you want this if you removed operators from the job.
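
For illustration, a minimal sketch (topic, schema, and the stateful operator are
placeholders) of pinning uids so that state can be re-matched between the old and the new
job:

DataStream<String> events = env
    .addSource(new FlinkKafkaConsumer011<>("events", new SimpleStringSchema(), kafkaProps))
    .uid("kafka-source");          // must be the same uid string in both jobs

events
    .keyBy(value -> value)
    .map(new MyStatefulEnricher()) // placeholder for your stateful operator
    .uid("enricher");

When the HDFS source is removed in the second job, its state simply has no matching uid
anymore, which is what --allowNonRestoredState (-n on flink run) is meant to tolerate; it
does not change how the remaining state is assigned.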

Best,
Stefan



Re: Having a backoff while experiencing checkpointing failures

2018-06-11 Thread Stefan Richter
Hi,

I think the behaviour of min_pause_between_checkpoints is either buggy or we 
should at least discuss if it would not be better to respect a pause also for 
failed checkpoints. As far as I know there is no ongoing work to add backoff, 
so I suggest you open a jira issue and make a case for this.
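
For reference, a minimal sketch of the knobs that exist today (as discussed, the min pause
is currently only respected after successfully completed checkpoints):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);                                  // trigger a checkpoint every 60s
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);  // only counted from completed checkpoints
env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000);   // expire checkpoints that take too long
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);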

Best,
Stefan

> Am 08.06.2018 um 06:30 schrieb vipul singh :
> 
> Hello all,
> 
> Are there any recommendations on using a backoff when experiencing 
> checkpointing failures?
> What we have seen is that when a checkpoint starts to expire, the next checkpoint 
> doesn't care about the previous failure and starts soon after. We 
> experimented with min_pause_between_checkpoints, however that seems to only 
> work for successful checkpoints (the same is discussed on this thread 
> )
> 
> Are there any recommendations on how to have a backoff, or is there something 
> in the works to add a backoff in case of checkpointing failures? This seems very 
> valuable in case of checkpointing to an external location like S3, where one 
> can potentially be throttled or get errors like TooBusyException from S3 (for 
> example, like in this JIRA )
> 
> Please let us know!
> Thanks,
> Vipul



Re: Kryo Exception

2018-05-25 Thread Stefan Richter
I agree, it looks like one of the two mentioned issues.

> Am 25.05.2018 um 06:15 schrieb sihua zhou :
> 
> Hi Gordon,
> 
> I think this might not be caused by 
> https://issues.apache.org/jira/browse/FLINK-9263 
> , the bug in FLINK-9263 
> should only cause problems with Operator State, but in this case the exceptions are 
> thrown from HeapValueState, which is a type of Keyed State.
> 
> Best, Sihua
> 
> 
> 
> On 05/25/2018 11:48,Tzu-Li (Gordon) Tai 
>  wrote: 
> Hi,
> 
> FYI, this is the JIRA ticket for the issue: 
> https://issues.apache.org/jira/browse/FLINK-8836 
> 
> Yes, this seems to be only included in 1.5.0 (to be released), and 1.4.3 
> (there has been no discussion on releasing that yet).
> 
> It could also be possible that the reported issue was caused by 
> https://issues.apache.org/jira/browse/FLINK-9263 
>  (which has a fix included 
> in 1.5.0)?
> 
> Cheers,
> Gordon
> On 25 May 2018 at 11:39:03 AM, sihua zhou (summerle...@163.com 
> ) wrote:
> 
>> Hi,
>> this looks like the bug "when duplicating a KryoSerializer does not 
>> duplicate registered default serializers", and this has been fixed on the 
>> master, 1.5.0, and 1.4.x branches. But, unfortunately, it is not included in 
>> 1.4.2 (because this bug was discovered after the 1.4.2 release). @Stefan please 
>> correct me if I'm wrong.
>> 
>> 
>> Best, Sihua
>> 
>> On 05/25/2018 05:55,Ya-Te Wong 
>>  wrote:
>> Hello,
>> 
>> We're using Flink version 1.4.2.
>> Our Flink job runs pretty well most of the time. But sometimes we see 
>> exceptions in the Kryo serializer.
>> The timing on when the exceptions would occur seems pretty random.
>> Sometimes we don't see any exceptions for 5 days. Sometimes we get 
>> exceptions within hours.
>> I have captured the stack traces of the last 3 times that the exceptions 
>> occurred. They are not exactly the same. The commonality is that our code in 
>> onTimer() is triggered as part of watermark handling. Our code then tried to 
>> get from the copy-on-write state table and eventually exception occurred in 
>> Kryo.
>> 
>> Has anyone seen something like that before?
>> 
>> Thanks,
>> 
>> 
>> 
>> [ Exception #1 ]
>> 
>> java.lang.RuntimeException: Exception occurred while processing valve output 
>> watermark:
>> at 
>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler2.handleWatermark(StreamTwoInputProcessor.java:400)
>> at 
>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>> at 
>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>> at 
>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:252)
>> at 
>> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: com.esotericsoftware.kryo.KryoException: 
>> java.lang.IndexOutOfBoundsException: Index: 3, Size: 0
>> Serialization trace:
>> timestamp (com.mycompany.datascience.datatypes.SensorStateEvent)
>> at 
>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>> at 
>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>> at 
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
>> at 
>> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>> at 
>> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>> at scala.collection.immutable.List.foreach(List.scala:392)
>> at 
>> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>> at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
>> at 
>> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
>> at 
>> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
>> at 
>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:282)
>> at 
>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:306)
>> at 
>> org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
>> at 
>> 

Re: Missing MapState when Timer fires after restored state

2018-05-18 Thread Stefan Richter
Hi,

I had a look at the logs from the restoring job and couldn’t find anything 
suspicious in them. Everything looks as expected and the state files are 
properly found and transferred from S3. We are including rescaling in some 
end-to-end tests now, so let’s see what happens. 
If you say that you can reproduce the problem, does that mean reproducing from 
the single existing checkpoint, or also creating other problematic checkpoints? 
I am asking because maybe a log from the job that produces the problematic 
checkpoint might be more helpful. You can create a ticket if you want.

Best,
Stefan

> Am 18.05.2018 um 09:02 schrieb Juho Autio <juho.au...@rovio.com>:
> 
> I see. I appreciate keeping this option available even if it's "beta". The 
> current situation could be documented better, though.
> 
> As long as rescaling from checkpoint is not officially supported, I would put 
> it behind a flag similar to --allowNonRestoredState. The flag could be called 
> --allowRescalingRestoredCheckpointState, for example. This would make sure 
> that users are aware that what they're using is experimental and might have 
> unexpected effects.
> 
> As for the bug I faced, indeed I was able to reproduce it consistently. And I 
> have provided TRACE-level logs personally to Stefan. If there is no Jira 
> ticket for this yet, would you like me to create one?
> 
> On Thu, May 17, 2018 at 1:00 PM, Stefan Richter <s.rich...@data-artisans.com 
> <mailto:s.rich...@data-artisans.com>> wrote:
> Hi,
> 
> > 
> > This raises a couple of questions:
> > - Is it a bug though, that the state restoring goes wrong like it does for 
> > my job? Based on my experience it seems like rescaling sometimes works, but 
> > then you can have these random errors.
> 
> If there is a problem, I would still consider it a bug because it should work 
> correctly.
> 
> > - If it's not supported properly, why not refuse to restore a checkpoint if 
> > it would require rescaling?
> 
> It should work properly, but I would prefer to keep this at the level of a 
> "hidden feature“ until it gets some more exposure and some questions 
> about the future differences between savepoints and checkpoints are 
> solved. 
> 
> > - We have sometimes had Flink jobs where the state has become so heavy that 
> > cancelling with a savepoint times out & fails. Incremental checkpoints are 
> > still working because they don't timeout as long as the state is growing 
> > linearly. In that case if we want to scale up (for example to enable 
> > successful savepoint creation ;) ), the only thing we can do is to restore 
> > from the latest checkpoint. But then we have no way to scale up by 
> > increasing the cluster size, because we can't create a savepoint with a 
> > smaller cluster but on the other hand can't restore a checkpoint to a 
> > bigger cluster, if rescaling from a checkpoint is not supposed to be relied 
> > on. So in this case we're stuck and forced to start from an empty state?
> 
> IMO there is a very good chance that this will simply become a normal feature 
> in the near future.
> 
> Best,
> Stefan
> 
> 



Re: are there any ways to test the performance of rocksdb state backend?

2018-05-18 Thread Stefan Richter
Hi,

Sihua is right, of course we would like to update our RocksDB version but are 
currently blocked on a performance regression. Here is our issue in the RocksDB 
tracker for this: https://github.com/facebook/rocksdb/issues/3865 
 .

Best,
Stefan

> Am 18.05.2018 um 08:52 schrieb sihua zhou :
> 
> 
> Hi makeyang,
> there are some cases under 
> _org.apache.flink.contrib.streaming.state.benchmark.*_ that you can refer to. 
> But, I not sure whether it's possible to upgrade the RocksDB to any higher 
> version because the regression of the merge operator, the comments in this PR 
> https://github.com/apache/flink/pull/5937 
>  may also give you some help.
> 
> Best, Sihua
> 
> On 05/18/2018 11:05,makeyang 
>  wrote: 
> I'd like to integrate a newer version of RocksDB with Flink. I'd like to know
> if there are existing tools/ways to benchmark the performance of the RocksDB
> state backend to see if there is a performance improvement or drop.
> 
> 
> MaKeyang
> TIG.JD.COM
> 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Missing MapState when Timer fires after restored state

2018-05-16 Thread Stefan Richter
Hi,

I think that is very good to know and fix. It feels a bit like not-so-nice 
API design in RocksDB that iterators require checking two methods, and 
the documentation of this is also newer than most of our RocksDB code, so an 
update there clearly makes sense.

@Sihua: if you want to fix this problem, can you also search for other usages 
of this `isValid` flag that should be covered. I will review a PR as soon as I 
can.
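
For illustration, a minimal sketch (not the actual Flink code) of the pattern the RocksDB
wiki recommends, i.e. checking status() once the iterator stops being valid:

import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

public final class SafeIteration {

    static void forEachEntry(RocksIterator iterator) throws RocksDBException {
        for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
            byte[] key = iterator.key();
            byte[] value = iterator.value();
            // ... process the key/value pair ...
        }
        // isValid() == false can mean "end of data" or an internal error;
        // status() throws a RocksDBException in the latter case.
        iterator.status();
    }
}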

Best,
Stefan 

> Am 16.05.2018 um 08:40 schrieb sihua zhou <summerle...@163.com>:
> 
> Hi,
> I have had a brief look through the code related to the restoring of incremental 
> checkpoints; no obvious bug could be found. But there is a suspicious 
> loophole that may lead to data loss; the suspicious code is pasted below.
> 
> 
> while (iterator.isValid()) {
> 
>int keyGroup = 0;
>for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
>   keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j];
>}
> 
>if (stateBackend.keyGroupRange.contains(keyGroup)) {
>   stateBackend.db.put(targetColumnFamilyHandle,
>  iterator.key(), iterator.value());
>}
> 
>iterator.next();
> }
> 
> we only use the iterator.isValid() to check whether we have reached the end 
> of the iterator, but if we refer to RocksDB's wiki 
> https://github.com/facebook/rocksdb/wiki/Iterator#error-handling 
> <https://github.com/facebook/rocksdb/wiki/Iterator#error-handling> we can 
> find that iterator.isValid() is indeed not enough; it may also return false 
> because there is an internal error in RocksDB. So, a safer way is to 
> call iterator.status() to check whether everything is ok. I want to 
> fire a PR for this now, because in RocksDBMapState (which we guarantee to support) 
> we also use the iterator without checking iterator.status().
> @Stefan What do you think?  
> 
> Best, Sihua
> On 05/16/2018 10:22,sihua zhou<summerle...@163.com> 
> <mailto:summerle...@163.com> wrote: 
> Hi Juho,
> if I'm not misunderstand, you saied your're rescaling the job from the 
> checkpoint? If yes, I think that behavior is not guaranteed yet, you can find 
> this on the doc 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/checkpoints.html#difference-to-savepoints
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/checkpoints.html#difference-to-savepoints>.
>  So, I not sure whether this is a "bug" at current stage(personally I'd like 
> to dig it out because currently we also use the checkpoint like the way you 
> are) ...
> 
> Best, Sihua
> 
> On 05/16/2018 01:46,Juho Autio<juho.au...@rovio.com> 
> <mailto:juho.au...@rovio.com> wrote: 
> I was able to reproduce this error.
> 
> I just happened to notice an important detail about the original failure:
> - checkpoint was created with a 1-node cluster (parallelism=8)
> - restored on a 2-node cluster (parallelism=16), caused that null exception
> 
> I tried restoring again from the problematic checkpoint again
> - restored on a 1-node cluster, no problems
> - restored on a 2-node cluster, getting the original error!
> 
> So now I have a way to reproduce the bug. To me it seems like the checkpoint 
> itself is fine. The bug seems to be in redistributing the state of a restored 
> checkpoint to a higher parallelism. I only tested each cluster size once (as 
> described above) so it could also be coincidence, but seems at least likely 
> now that it's about the state redistribution.
> 
> I'll try to follow up with those TRACE-level logs tomorrow. Today I tried 
> adding these to the logback.xml, but I didn't get anything else but INFO 
> level logs:
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> Maybe I need to edit the log4j.properties instead(?). Indeed it's Flink 
> 1.5-SNAPSHOT and the package has all of these in the conf/ dir:
> 
> log4j-cli.properties
> log4j-console.properties
> log4j.properties
> log4j-yarn-session.properties
> logback-console.xml
> logback.xml
> logback-yarn.xml
> 
> On Tue, May 15, 2018 at 11:49 AM, Stefan Richter <s.rich...@data-artisans.com 
> <mailto:s.rich...@data-artisans.com>> wrote:
> Hi,
> 
>> Am 15.05.2018 um 10:34 schrieb Juho Autio <juho.au...@rovio.com 
>> <mailto:juho.au...@rovio.com>>:
>> 
>> Ok, that should be possible to provide. Are there any specific packages to 
>> set on trace level? Maybe just go with org.apache.flink.* on TRACE?
> 
> The following packages would be helpful:
> 
> org.apache.flink.contrib.streami

Re: Missing MapState when Timer fires after restored state

2018-05-15 Thread Stefan Richter
Hi,

> Am 15.05.2018 um 10:34 schrieb Juho Autio <juho.au...@rovio.com>:
> 
> Ok, that should be possible to provide. Are there any specific packages to 
> set on trace level? Maybe just go with org.apache.flink.* on TRACE?

The following packages would be helpful:

org.apache.flink.contrib.streaming.state.*
org.apache.flink.runtime.state.*
org.apache.flink.runtime.checkpoint.*
org.apache.flink.streaming.api.operators.*
org.apache.flink.streaming.runtime.tasks.*

> 
> > did the „too many open files“ problem only happen with local recovery 
> > (asking since it should actually not add to the amount of open files)
> 
> I think it happened in various places, maybe not when restoring.. Anyway, if 
> the situation is like that, the system is pretty much unusable (on the OS level), 
> so it shouldn't matter too much which operation of the application it causes 
> to fail? Anyway, I'll try to grab & share all log lines that say "Too Many 
> Open Files"..
> 
> > and did you deactivate it on the second cluster for the restart or change 
> > your OS settings?
> 
> No, didn't change anything except for increasing the ulimit on OS to prevent 
> this from happening again. Note that the system only ran out of files after 
> ~11 days of uptime. During that time there had been some local recoveries. 
> This makes me wonder though, could it be that many local recoveries 
> eventually caused this – could it be that in the occasion of local recovery 
> some "old" files are left open, making the system eventually run out of files?


From the way how local recovery works with incremental RocksDB checkpoints, I 
would not assume that it is the cause of the problem. In this particular case, 
the number of opened files on a local FS should not be higher than the number 
without local recovery. Maybe it is just a matter of the OS limit and the 
number of operators with a RocksDB backend running on the machine and the 
amount of files managed by all those RocksDB instances that simply exceed the 
limit. If you have an overview how many parallel operator instances with keyed 
state were running on the machine and assume some reasonable number of files 
per RocksDB instance and the limit configured in your OS, could that be the 
case?
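
Just as a rough, purely illustrative calculation: 10 RocksDB-backed operator instances on
one machine, each keeping a few hundred SST and log/manifest files open, already puts you
in the range of several thousand descriptors, which can exceed a default ulimit of 1024 or
4096 before sockets and checkpoint streams are even counted.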

> 
> Thanks!

Thanks for your help!

> 
> On Tue, May 15, 2018 at 11:17 AM, Stefan Richter <s.rich...@data-artisans.com 
> <mailto:s.rich...@data-artisans.com>> wrote:
> Btw having a trace level log of a restart from a problematic checkpoint could 
> actually be helpful if we cannot find the problem from the previous points. 
> This can give a more detailed view of what checkpoint files are mapped to 
> which operator.
> 
> I have one more question: did the „too many open files“ problem only 
> happen with local recovery (asking since it should actually not add to the 
> amount of open files), and did you deactivate it on the second cluster for 
> the restart or change your OS settings?
> 
> 
>> Am 15.05.2018 um 10:09 schrieb Stefan Richter <s.rich...@data-artisans.com 
>> <mailto:s.rich...@data-artisans.com>>:
>> 
>> What I would like to see from the logs is (also depending a bit on your log 
>> level):
>> 
>> - all exceptions.
>> - in which context exactly the „too many open files“ problem occurred, 
>> because I think for checkpoint consistency it should not matter as a 
>> checkpoint with such a problem should never succeed.
>> - files that are written for checkpoints/savepoints.
>> - completed checkpoints/savepoints ids.
>> - the restored checkpoint/savepoint id.
>> - files that are loaded on restore.
>> 
>>> Am 15.05.2018 um 10:02 schrieb Juho Autio <juho.au...@rovio.com 
>>> <mailto:juho.au...@rovio.com>>:
>>> 
>>> Thanks all. I'll have to see about sharing the logs & configuration..
>>> 
>>> Is there something special that you'd like to see from the logs? It may be 
>>> easier for me to get specific lines and obfuscate sensitive information 
>>> instead of trying to do that for the full logs.
>>> 
>>> We basically have: RocksDBStateBackend with 
>>> enableIncrementalCheckpointing=true, external state path on s3.
>>> 
>>> The code that we use is:
>>> 
>>> env.setStateBackend(getStateBackend(statePath, new 
>>> RocksDBStateBackend(statePath, true)));
>>> 
>>> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(params.getLong("checkpoint.minPause",
>>>  60 * 1000));
>>> 
>>> env.getCheckpointConfig().setMaxConcurrentCheckpoints(params.getInt("checkpoint.maxConcurr

Re: Missing MapState when Timer fires after restored state

2018-05-15 Thread Stefan Richter
Btw having a trace level log of a restart from a problematic checkpoint could 
actually be helpful if we cannot find the problem from the previous points. 
This can give a more detailed view of what checkpoint files are mapped to which 
operator.

I have one more question: did the „too many open files“ problem only 
happen with local recovery (asking since it should actually not add to the 
amount of open files), and did you deactivate it on the second cluster for the 
restart or change your OS settings?

> Am 15.05.2018 um 10:09 schrieb Stefan Richter <s.rich...@data-artisans.com>:
> 
> What I would like to see from the logs is (also depending a bit on your log 
> level):
> 
> - all exceptions.
> - in which context exactly the „too many open files“ problem occurred, 
> because I think for checkpoint consistency it should not matter as a 
> checkpoint with such a problem should never succeed.
> - files that are written for checkpoints/savepoints.
> - completed checkpoints/savepoints ids.
> - the restored checkpoint/savepoint id.
> - files that are loaded on restore.
> 
>> Am 15.05.2018 um 10:02 schrieb Juho Autio <juho.au...@rovio.com 
>> <mailto:juho.au...@rovio.com>>:
>> 
>> Thanks all. I'll have to see about sharing the logs & configuration..
>> 
>> Is there something special that you'd like to see from the logs? It may be 
>> easier for me to get specific lines and obfuscate sensitive information 
>> instead of trying to do that for the full logs.
>> 
>> We basically have: RocksDBStateBackend with 
>> enableIncrementalCheckpointing=true, external state path on s3.
>> 
>> The code that we use is:
>> 
>> env.setStateBackend(getStateBackend(statePath, new 
>> RocksDBStateBackend(statePath, true)));
>> 
>> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(params.getLong("checkpoint.minPause",
>>  60 * 1000));
>> 
>> env.getCheckpointConfig().setMaxConcurrentCheckpoints(params.getInt("checkpoint.maxConcurrent",
>>  1));
>> 
>> env.getCheckpointConfig().setCheckpointTimeout(params.getLong("checkpoint.timeout",
>>  10 * 60 * 1000));
>> 
>> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>> 
>> The problematic state that we tried to use was a checkpoint created with 
>> this conf.
>> 
>> > Are you using the local recovery feature?
>> 
>> Yes, and in this particular case the job was constantly failing/restarting 
>> because of Too Many Open Files. So we terminated the cluster entirely, 
>> created a new one, and launched a new job by specifying the latest 
>> checkpoint path to restore state from.
>> 
>> This is the only time I have seen this error happen with timer state. I 
>> still have that bad checkpoint data on s3, so I might be able to try to 
>> restore it again if needed to debug it. But that would require some 
>> tweaking, because I don't want to tangle with the same kafka consumer group 
>> offsets or send old data again to production endpoint.
>> 
>> Please keep in mind that there was that Too Many Open Files issue on the 
>> cluster that created the problematic checkpoint, if you think that's 
>> relevant.
>> 
>> On Tue, May 15, 2018 at 10:39 AM, Stefan Richter 
>> <s.rich...@data-artisans.com <mailto:s.rich...@data-artisans.com>> wrote:
>> Hi,
>> 
>> I agree, this looks like a bug. Can you tell us your exact configuration of 
>> the state backend, e.g. if you are using incremental checkpoints or not. Are 
>> you using the local recovery feature? Are you restarting the job from a 
>> checkpoint or a savepoint? Can you provide logs for both the job that failed 
>> and the restarted job?
>> 
>> Best,
>> Stefan
>> 
>> 
>>> Am 14.05.2018 um 13:00 schrieb Juho Autio <juho.au...@rovio.com 
>>> <mailto:juho.au...@rovio.com>>:
>>> 
>>> We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear old 
>>> state. After restoring state from a checkpoint, it seems like a timer had 
>>> been restored, but not the data that was expected to be in a related 
>>> MapState if such timer has been added.
>>> 
>>> The way I see this is that there's a bug, either of these:
>>> - The writing of timers & map states to Flink state is not synchronized (or 
>>> maybe there are no such guarantees by design?)
>>> - Flink may restore a checkpoint th

Re: Missing MapState when Timer fires after restored state

2018-05-15 Thread Stefan Richter
What I would like to see from the logs is (also depending a bit on your log 
level):

- all exceptions.
- in which context exactly the „too many open files“ problem occurred, because 
I think for checkpoint consistency it should not matter as a checkpoint with 
such a problem should never succeed.
- files that are written for checkpoints/savepoints.
- completed checkpoints/savepoints ids.
- the restored checkpoint/savepoint id.
- files that are loaded on restore.

> Am 15.05.2018 um 10:02 schrieb Juho Autio <juho.au...@rovio.com>:
> 
> Thanks all. I'll have to see about sharing the logs & configuration..
> 
> Is there something special that you'd like to see from the logs? It may be 
> easier for me to get specific lines and obfuscate sensitive information 
> instead of trying to do that for the full logs.
> 
> We basically have: RocksDBStateBackend with 
> enableIncrementalCheckpointing=true, external state path on s3.
> 
> The code that we use is:
> 
> env.setStateBackend(getStateBackend(statePath, new 
> RocksDBStateBackend(statePath, true)));
> 
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(params.getLong("checkpoint.minPause",
>  60 * 1000));
> 
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(params.getInt("checkpoint.maxConcurrent",
>  1));
> 
> env.getCheckpointConfig().setCheckpointTimeout(params.getLong("checkpoint.timeout",
>  10 * 60 * 1000));
> 
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> 
> The problematic state that we tried to use was a checkpoint created with this 
> conf.
> 
> > Are you using the local recovery feature?
> 
> Yes, and in this particular case the job was constantly failing/restarting 
> because of Too Many Open Files. So we terminated the cluster entirely, 
> created a new one, and launched a new job by specifying the latest checkpoint 
> path to restore state from.
> 
> This is the only time I have seen this error happen with timer state. I still 
> have that bad checkpoint data on s3, so I might be able to try to restore it 
> again if needed to debug it. But that would require some tweaking, because I 
> don't want to tangle with the same kafka consumer group offsets or send old 
> data again to production endpoint.
> 
> Please keep in mind that there was that Too Many Open Files issue on the 
> cluster that created the problematic checkpoint, if you think that's relevant.
> 
> On Tue, May 15, 2018 at 10:39 AM, Stefan Richter <s.rich...@data-artisans.com 
> <mailto:s.rich...@data-artisans.com>> wrote:
> Hi,
> 
> I agree, this looks like a bug. Can you tell us your exact configuration of 
> the state backend, e.g. if you are using incremental checkpoints or not. Are 
> you using the local recovery feature? Are you restarting the job from a 
> checkpoint or a savepoint? Can you provide logs for both the job that failed 
> and the restarted job?
> 
> Best,
> Stefan
> 
> 
>> Am 14.05.2018 um 13:00 schrieb Juho Autio <juho.au...@rovio.com 
>> <mailto:juho.au...@rovio.com>>:
>> 
>> We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear old 
>> state. After restoring state from a checkpoint, it seems like a timer had 
>> been restored, but not the data that was expected to be in a related 
>> MapState if such timer has been added.
>> 
>> The way I see this is that there's a bug, either of these:
>> - The writing of timers & map states to Flink state is not synchronized (or 
>> maybe there are no such guarantees by design?)
>> - Flink may restore a checkpoint that is actually corrupted/incomplete
>> 
>> Our code (simplified):
>> 
>> private MapState<String, String> mapState;
>> 
>> public void processElement(..) {
>> mapState.put("lastUpdated", ctx.timestamp().toString());
>> ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 
>> stateRetentionMillis);
>> }
>> 
>> public void onTimer(long timestamp, OnTimerContext ctx, ..) {
>> long lastUpdated = Long.parseLong(mapState.get("lastUpdated"));
>> if (timestamp >= lastUpdated + stateRetentionMillis) {
>> mapState.clear();
>> }
>> }
>> 
>> Normally this "just works". As you can see, it shouldn't be possible that 
>> "lastUpdated" doesn't exist in state if timer was registered and onTimer 
>> gets called.
>> 
>> However, after restoring state from a checkpoint, t

Re: Missing MapState when Timer fires after restored state

2018-05-15 Thread Stefan Richter
Hi,

I agree, this looks like a bug. Can you tell us your exact configuration of the 
state backend, e.g. if you are using incremental checkpoints or not. Are you 
using the local recovery feature? Are you restarting the job from a checkpoint 
or a savepoint? Can you provide logs for both the job that failed and the 
restarted job?

Best,
Stefan

> Am 14.05.2018 um 13:00 schrieb Juho Autio :
> 
> We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear old 
> state. After restoring state from a checkpoint, it seems like a timer had 
> been restored, but not the data that was expected to be in a related MapState 
> if such timer has been added.
> 
> The way I see this is that there's a bug, either of these:
> - The writing of timers & map states to Flink state is not synchronized (or 
> maybe there are no such guarantees by design?)
> - Flink may restore a checkpoint that is actually corrupted/incomplete
> 
> Our code (simplified):
> 
> private MapState mapState;
> 
> public void processElement(..) {
> mapState.put("lastUpdated", ctx.timestamp().toString());
> ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 
> stateRetentionMillis);
> }
> 
> public void onTimer(long timestamp, OnTimerContext ctx, ..) {
> long lastUpdated = Long.parseLong(mapState.get("lastUpdated"));
> if (timestamp >= lastUpdated + stateRetentionMillis) {
> mapState.clear();
> }
> }
> 
> Normally this "just works". As you can see, it shouldn't be possible that 
> "lastUpdated" doesn't exist in state if timer was registered and onTimer gets 
> called.
> 
> However, after restoring state from a checkpoint, the job kept failing with 
> this error:
> 
> Caused by: java.lang.NumberFormatException: null
> at java.lang.Long.parseLong(Long.java:552)
> at java.lang.Long.parseLong(Long.java:631)
> at ..EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
> ..
> 
> So apparently onTimer was called but lastUpdated wasn't found in the MapState.
> 
> The background for restoring state in this case is not entirely clean. There 
> was an OS level issue "Too many open files" after running a job for ~11 days. 
> To fix that, we replaced the cluster with a new one and launched the Flink 
> job again. State was successfully restored from the latest checkpoint that 
> had been created by the "problematic execution". Now, I'm assuming that if 
> the state wouldn't have been created successfully, restoring wouldn't succeed 
> either – correct? This is just to rule out that the issue with state didn't 
> happen because the checkpoint files were somehow corrupted due to the Too 
> many open files problem.
> 
> Thank you all for your continued support!
> 
> P.S. I would be very much interested to hear if there's some cleaner way to 
> achieve this kind of TTL for keyed state in Flink.


