Re: Flink Kafka to BucketingSink to S3 - S3Exception

2018-10-30 Thread Flink Developer
Hi, thanks for the info Rafi, that seems to be related.  I hope Flink version 
1.6.2 fixes this. Has anyone encountered this before?

I would also like to note that my jar includes a core-site.xml file that uses 
*s3a*. Is this the recommended configuration to use with BucketingSink?   
Should the sink path be specified using the s3a:// scheme or the s3:// scheme? 
A sketch of how the sink is created is included after the pom list below.

<configuration>
  <property>
    <name>fs.s3.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
  <property>
    <name>fs.s3a.buffer.dir</name>
    <value>/tmp</value>
  </property>
  <property>
    <name>fs.s3a.access.key</name>
    <value>x</value>
  </property>
  <property>
    <name>fs.s3a.secret.key</name>
    <value>x</value>
  </property>
  <property>
    <name>fs.s3a.buffer.dir</name>
    <value>/tmp</value>
  </property>
</configuration>

And my pom.xml uses:

- flink-s3-fs-hadoop
- ...
- flink-statebackend-rocksdb_2.11
- ...
- hadoop-hdfs
- ...
- hadoop-common
- ...
- hadoop-core
- ...
- hadoop-aws
- ...
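
For reference, this is roughly how the sink is created in the job; the bucket and
prefix below are placeholders (not our real path), and the GZip writer setup is
omitted:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

public class SinkWiring {
    static void addS3Sink(DataStream<String> stream) {
        // Placeholder bucket/prefix; the real job also sets a custom GZip Writer.
        BucketingSink<String> sink = new BucketingSink<>("s3a://my-bucket/output");
        sink.setBatchSize(5L * 1024 * 1024);            // roll part files at ~5 MB
        sink.setBatchRolloverInterval(30L * 60 * 1000); // ...or after 30 minutes
        stream.addSink(sink);
    }
}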

‐‐‐ Original Message ‐‐‐
On Sunday, October 28, 2018 8:08 AM, Rafi Aroch  wrote:

> Hi,
>
> I'm also experiencing this with Flink 1.5.2. This is probably related to 
> BucketingSink not working properly with S3 as filesystem because of the 
> eventual-consistency of S3.
>
> I see that https://issues.apache.org/jira/browse/FLINK-9752 will be part of 
> 1.6.2 release. It might help, if you use the flink-s3-fs-hadoop (and not 
> presto).
>
> Does anyone know if this fix would solve this issue?
>
> Thanks,
> Rafi
>
> On Sun, Oct 28, 2018 at 12:08 AM Flink Developer 
>  wrote:
>
>> Hi, I'm running a scala flink app in an AWS EMR cluster (emr 5.17, hadoop 
>> 2.8.4)  with flink parallelization set to 400. The source is a Kafka topic 
>> and sinks to S3 in the format of: s3:/. 
>> There's potentially 400 files writing simultaneously.
>>
>> Configuration:
>> - Flink v1.5.2
>> - Checkpointing enabled w/ RocksDb (flink-statebackend-rocksdb_2.11, 
>> v1.6.1). Interval is every 2 mins with max concurrent set to 1. Min pause 
>> between checkpoints is 2 mins. Timeout is set to 2 mins.
>> - BucketingSink (flink-connector-filesystem_2.11, v1.6.1).
>> - Batch file size is set to 5mb.
>> - Batch rollover interval is set to 30min
>> - Writer uses GZip compression
>> - Hadoop Maven Dependencies (hadoop-hdfs v3.1.1, hadoop-common v3.1.1, 
>> hadoop-core v1.2.1, hadoop-aws v3.1.1)
>>
>> The app is able to run for hours straight, but occasionally (once or twice a 
>> day), it displays the following exception. When this happens, the app is 
>> able to recover from previous checkpoint, but I am concerned about the 
>> exception:
>>
>> Caused by: java.io.IOException: 
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>>  Not Found (Service: Amazon S3; Status Code 404; Error ... Request ID: 
>> , S3 Extended Request ID: xx
>>
>> - at 
>> com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.handleAmazonServiceException(Jets3tNativeFileSystemStore.java:446)
>>
>> - at 
>> com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.copy(Jets3tNativeFileSystemStore.java:427)
>>
>> - at 
>> com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.rename(S3NativeFileSystem.java:1297)
>>
>> - at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.rename(EmrFileSystem.java:312)
>>
>> - at 
>> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handlePendingInProgressFile(BucketingSink.java:815)
>>
>> Caused by: 
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>>  Not Found (Service: Amazon S3; Status Code 404; Error ... Request ID: 
>> , S3 Extended Request ID: xx
>>
>> - at 
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1632)
>>
>> - at 
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
>>
>> - at 
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1058)
>>
>> - at 
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
>>
>> - at 
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>>
>> - at 
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>>
>> - at 
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>>
>> - at 
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>>
>> - at 
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>>
>> - at 
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4365)
>>
>> - at 
>> 

CsvInputFormat - read header line first

2018-10-30 Thread madan
Hi,

When a CSV file is split into multiple parts, we are not sure which
part is read first. Is there any way to make sure the first part, the one
containing the header, is read first? I need to read the header line first to
store the column-name-to-index mapping and use that index while processing the
following records.

I could read the header line from the file before submitting the job to Flink,
but that way we open the file twice. Is there a better way to do this? Please suggest.
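
The workaround I have today looks roughly like the sketch below (the helper name
is made up and error handling is simplified); the obvious downside is that the
file is opened a second time:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class CsvHeaderReader {
    // Reads only the first line of the file and builds a column-name -> index map.
    public static Map<String, Integer> readHeader(String csvPath) throws Exception {
        Path path = new Path(csvPath);
        FileSystem fs = path.getFileSystem();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(fs.open(path), StandardCharsets.UTF_8))) {
            String[] columns = reader.readLine().split(",");
            Map<String, Integer> index = new HashMap<>();
            for (int i = 0; i < columns.length; i++) {
                index.put(columns[i].trim(), i);
            }
            return index;
        }
    }
}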

-- 
Thank you.


Re: How to tune Hadoop version in flink shaded jar to Hadoop version actually used?

2018-10-30 Thread 徐涛
Hi Hequn & Vino,
Finally I rebuilt Flink by changing the "hadoop.version" property in the pom 
file.
Because Flink uses the Maven Shade plugin to shade the Hadoop dependency, 
this also means I need to rebuild the shaded Hadoop jar each time I upgrade 
the Flink version.
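
For reference, the change amounts to overriding the version property in the root
pom.xml before building, roughly like this (2.7.2 being the Hadoop version
actually installed on our cluster):

<properties>
  ...
  <hadoop.version>2.7.2</hadoop.version>
  ...
</properties>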

Best
Henry

> 在 2018年10月30日,下午12:35,Hequn Cheng  写道:
> 
> Hi Henry,
> 
> You can specify a specific Hadoop version to build against:
> mvn clean install -DskipTests -Dhadoop.version=2.6.1
>  More details here[1].
> 
> Best, Hequn
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/flinkDev/building.html#hadoop-versions
>  
> 
> On Tue, Oct 30, 2018 at 10:02 AM vino yang  > wrote:
> Hi Henry,
> 
> You just need to change the node of "hadoop.version" in the parent pom file.
> 
> Thanks, vino.
> 
> 徐涛 mailto:happydexu...@gmail.com>> 于2018年10月29日周一 
> 下午11:23写道:
> Hi Vino,
>   Because I build the project with Maven, maybe I cannot use the jars 
> downloaded directly from the web.
>   If building with Maven, how can I align the Hadoop version with the 
> Hadoop version actually used?
>   Thanks a lot!!
> 
> Best 
> Henry
> 
>> 在 2018年10月26日,上午10:02,vino yang > > 写道:
>> 
>> Hi Henry,
>> 
>> When running flink on YARN, from ClusterEntrypoint the system environment 
>> info is print out.
>> One of the info is "Hadoop version: 2.4.1”, I think it is from the 
>> flink-shaded-hadoop2 jar. But actually the system Hadoop version is 2.7.2.
>> 
>> I want to know is it OK if the version is different? 
>> 
>> > I don't think it is OK, because you will use a lower version of the client 
>> > to access the higher version of the server.
>> 
>> Is it a best practice to adjust flink Hadoop version to the Hadoop version 
>> actually used?
>> 
>> > I personally recommend that you keep the two versions consistent to 
>> > eliminate the possibility of causing various potential problems. 
>> In fact, Flink provides a bundle of Hadoop 2.7.x bundles for you to 
>> download.[1]
>> 
>> [1]: 
>> https://www.apache.org/dyn/closer.lua/flink/flink-1.6.1/flink-1.6.1-bin-hadoop27-scala_2.11.tgz
>>  
>> 
>> 
>> Thanks, vino.
>> 
>> 徐涛 mailto:happydexu...@gmail.com>> 于2018年10月26日周五 
>> 上午9:13写道:
>> Hi Experts
>> When running flink on YARN, from ClusterEntrypoint the system 
>> environment info is print out.
>> One of the info is "Hadoop version: 2.4.1”, I think it is from the 
>> flink-shaded-hadoop2 jar. But actually the system Hadoop version is 2.7.2.
>> I want to know is it OK if the version is different? Is it a best 
>> practice to adjust flink Hadoop version to the Hadoop version actually used?
>> 
>> Thanks a lot.
>> 
>> Best
>> Henry
> 



Re: Flink CEP Watermark Exception

2018-10-30 Thread Austin Cawley-Edwards
Following up, we are using Flink 1.5.0 and flink-cep_2.11.

Thanks,
Austin

On Tue, Oct 30, 2018 at 3:58 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hi there,
>
>
> We have a streaming application that uses CEP processing but are getting this 
> error fairly frequently after a checkpoint fails, though not sure if it is 
> related. We have implemented both  `hashCode` and `equals()` using 
> `Objects.hash(...properties)` and basic equality, respectively. Has anyone 
> seen this before using CEP?
>
>
> Here is  the full exception:
>
>
> java.lang.RuntimeException: Exception occurred while processing valve output 
> watermark:
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalStateException: Could not find previous entry 
> with key: alertOne, value: 
> {"totalDocumentCount":12,"alertLevel":3,"id":"entity:medium:13790","predicted":4.220480902392199,"timestamp":1539700396000}
>  and timestamp: 153970079. This can indicate that either you did not 
> implement the equals() and hashCode() methods of your input elements properly 
> or that the element belonging to that entry has been already pruned.
>   at org.apache.flink.cep.nfa.SharedBuffer.put(SharedBuffer.java:107)
>   at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:566)
>   at org.apache.flink.cep.nfa.NFA.process(NFA.java:252)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:332)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:235)
>   at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>   at 
> java.util.stream.ReferencePipeline$Head.forEachOrdered(ReferencePipeline.java:590)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:234)
>   at 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
>
>
>
> Best,
>
> Austin
>
>


Flink CEP Watermark Exception

2018-10-30 Thread Austin Cawley-Edwards
Hi there,


We have a streaming application that uses CEP processing but are
getting this error fairly frequently after a checkpoint fails, though
not sure if it is related. We have implemented both  `hashCode` and
`equals()` using `Objects.hash(...properties)` and basic equality,
respectively. Has anyone seen this before using CEP?
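
For reference, the event class looks roughly like the sketch below; the class and
field names are placeholders taken from the JSON in the exception, not the exact
code:

import java.util.Objects;

public class Alert {
    private String id;
    private long timestamp;
    private int alertLevel;

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Alert)) return false;
        Alert other = (Alert) o;
        return timestamp == other.timestamp
            && alertLevel == other.alertLevel
            && Objects.equals(id, other.id);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, timestamp, alertLevel);
    }
}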


Here is  the full exception:


java.lang.RuntimeException: Exception occurred while processing valve
output watermark:
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Could not find previous
entry with key: alertOne, value:
{"totalDocumentCount":12,"alertLevel":3,"id":"entity:medium:13790","predicted":4.220480902392199,"timestamp":1539700396000}
and timestamp: 153970079. This can indicate that either you did
not implement the equals() and hashCode() methods of your input
elements properly or that the element belonging to that entry has been
already pruned.
at org.apache.flink.cep.nfa.SharedBuffer.put(SharedBuffer.java:107)
at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:566)
at org.apache.flink.cep.nfa.NFA.process(NFA.java:252)
at 
org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:332)
at 
org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:235)
at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at 
java.util.stream.ReferencePipeline$Head.forEachOrdered(ReferencePipeline.java:590)
at 
org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:234)
at 
org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
at 
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)



Best,

Austin


Table API and AVG on dates

2018-10-30 Thread Flavio Pompermaier
Hi to all,
I'm using Flink 1.6.1 and it seems that average (AVG) on dates is not
supported. Am I right?
Is there any effort to implement it?

Best,
Flavio


Re: [E] Re: Needed more information about jdbc sink in Flink SQL Client

2018-10-30 Thread Narayanan, Surat
Timo,

It helped. I implemented an AppendStreamTableSink and a table factory, and I am
now able to configure the JDBC sink.
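
In case it helps anyone searching the archive later, the factory ended up looking
roughly like the sketch below. The property keys, the INSERT statement and the
column types are simplified placeholders (not the exact code), and the factory
still has to be registered in
META-INF/services/org.apache.flink.table.factories.TableFactory:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.types.Row;

public class JdbcSinkFactory implements StreamTableSinkFactory<Row> {

    @Override
    public Map<String, String> requiredContext() {
        Map<String, String> context = new HashMap<>();
        context.put("connector.type", "jdbc"); // matches 'type: jdbc' under 'connector:' in the YAML
        return context;
    }

    @Override
    public List<String> supportedProperties() {
        List<String> properties = new ArrayList<>();
        properties.add("connector.driverName");
        properties.add("connector.dbURL");
        properties.add("connector.username");
        properties.add("connector.password");
        return properties;
    }

    @Override
    public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) {
        return JDBCAppendTableSink.builder()
            .setDrivername(properties.get("connector.driverName"))
            .setDBUrl(properties.get("connector.dbURL"))
            .setUsername(properties.get("connector.username"))
            .setPassword(properties.get("connector.password"))
            .setQuery("INSERT INTO sink_table VALUES (?, ?)") // placeholder statement
            .setParameterTypes(Types.STRING, Types.LONG)      // placeholder column types
            .build();
    }
}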
Thank you.


Regards,
N.Surat


On Mon, Oct 29, 2018 at 7:47 PM Timo Walther  wrote:

> Hi,
>
> all supported connectors and formats for the SQL Client with YAML can be
> found in the connect section [1]. However, the JDBC sink is not available
> for the SQL Client so far. It still needs to be ported, see [2].
>
> However, if you want to use it, you could implement your own table factory
> that creates an instance of the sink. This is documented here [3].
>
> I hope this helps.
>
> Regards,
> Timo
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/connect.html
> 
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/connect.html#further-tablesources-and-tablesinks
> 
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/sourceSinks.html#define-a-tablefactory
> 
>
> Am 29.10.18 um 14:55 schrieb Narayanan, Surat:
>
> Flink team,
>
> I am exploring the Flink SQL client and trying to configure JDBC Sink in
> YAML
> I only find some sample YAML configuration in the documentation.
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/sqlClient.html
> 
> Where can I find the entire definitions for that YAML configuration?
>
> tables:
>   - name: SinkTable
>     type: sink
>     update-mode: append
>     connector:
>       type: jdbc
>       driverName: org.apache.derby.jdbc.EmbeddedDriver
>       dbURL: jdbc:oracle:thin:@host:port/serviceName
>       username: dbuser
>       password: 
>
>
>
> Regards,
> N.Surat
>
>
>


[ANNOUNCE] Weekly community update #44

2018-10-30 Thread Till Rohrmann
Dear community,

this is the weekly community update thread #44. Please post any news and
updates you want to share with the community to this thread.

# Flink 1.5.5 and 1.6.2 have been released

The Flink community is proud to announce that the two bugfix releases Flink 1.5.5
[1] and Flink 1.6.2 [2] have been released.

# Integrate Flink SQL with Hive ecosystem

Xuefu proposed to integrate Flink more tightly with the Hive ecosystem. The
discussion has been concluded with a resulting design document. If you want
to learn more about these efforts please check out the ML thread [3] or the
corresponding JIRA issue [4].

# News on API changes with Scala 2.12

The community is dedicated to adding Scala 2.12 support with the upcoming
major release 1.7. Aljoscha laid out the required changes from an API
perspective when upgrading to Scala 2.12 [5].

# Changing our web technology stack

Fabian kicked off a discussion about Flink's web technology stack [6]. He
noted that the stack is quite dusty and proposed to change technologies to
React or Angular 2-7. If you have an opinion on this topic or want to help
with this effort, then please join the discussion.

# Feature freeze for 1.7

The community works hard towards the Flink 1.7 feature freeze. Currently,
the community tries to finish the last set of features (Scala 2.12 support,
state evolution and fixing local recovery). It looks promising to finish
these threads by the middle of this week.

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ANNOUNCE-Apache-Flink-1-5-5-released-tp24158.html
[2]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ANNOUNCE-Apache-Flink-1-6-2-released-tp24159.html
[3]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DISCUSS-Integrate-Flink-SQL-well-with-Hive-ecosystem-tp23721.html
[4] https://issues.apache.org/jira/browse/FLINK-10556
[5]
https://lists.apache.org/thread.html/75d12228e4a1125f17cd31ff5e7e9b95509978613f2a88ebad55e042@%3Cdev.flink.apache.org%3E
[6]
https://lists.apache.org/thread.html/68416f3dc8c18433f427bb1f1b1a3c482aebbcdeee0f4e8a2540d147@%3Cdev.flink.apache.org%3E

Cheers,
Till


Re: Job fails to restore from checkpoint in Kubernetes with FileNotFoundException

2018-10-30 Thread Till Rohrmann
As Vino pointed out, you need to configure a checkpoint directory which is
accessible from all TMs. Otherwise you won't be able to recover the state
if the task gets scheduled to a different TaskManager. Usually, people use
HDFS or S3 for that.
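
A minimal sketch of what that looks like with the RocksDB backend (the checkpoint
URI below is a placeholder; anything reachable from every TaskManager, e.g. HDFS
or S3, works):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SharedCheckpointDirExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Must be a filesystem every TaskManager can reach, not a pod-local path.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true)); // true = incremental
        env.enableCheckpointing(60_000); // interval is illustrative
        // ... build and execute the job ...
    }
}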

Cheers,
Till

On Tue, Oct 30, 2018 at 9:50 AM vino yang  wrote:

> Hi John,
>
> Is the file system configured by RocksDBStateBackend HDFS?[1]
>
> Thanks, vino.
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/state_backends.html#the-rocksdbstatebackend
>
> John Stone  于2018年10月30日周二 上午2:54写道:
>
>> I am testing Flink in a Kubernetes cluster and am finding that a job gets
>> caught in a recovery loop.  Logs show that the issue is that a checkpoint
>> cannot be found although checkpoints are being taken per the Flink web UI.
>> Any advice on how to resolve this is most appreciated.
>>
>> Note on below: I can easily replicate this with a single TaskManager (>1
>> slots) and a job parallelism of 1, or two TaskManagers (4 slots each) and a
>> job parallelism of 8.
>>
>> Setup:
>> - Flink 1.6.0
>> - Kubernetes cluster.
>> - 1 JobManager node, 2 TaskManager nodes.
>> - RocksDB backend with incremental checkpointing.
>> - There is not a persistent volume mounted on any of the three nodes.  In
>> production, we would obviously need a persistent volume on the JobManager.
>> - Job submitted and running such that the job is parallelized over both
>> nodes (i.e. each TM has 4 task slots; job parallelism = 5).
>>
>> Test:
>> - Let the job collect a few checkpoints, say, 9 checkpoints.
>> - Kill one of the two TMs (kubectl delete pods ).
>> - New TM pod starts.
>>
>> Result:
>> - After the new TM starts, the job will cycle through FAILING -> RUNNING
>> -> FAILING -> RUNNING ->...
>>
>> Relevant Information
>> Contents of JobManager's pod:
>> user@host:~$ kubectl exec -it flink-jobmanager-5bbfcb567-v299g -- /bin/sh
>> # ls flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28
>> chk-9  shared  taskowned
>> # ls
>> flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9
>> _metadata
>> # ls
>> flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/_metadata
>>
>> flink-data/dwell-sliding-window-demo/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/_metadata
>> # ls -al
>> flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/_metadata
>> -rw-r--r-- 1 flink flink 23665 Oct 29 18:07
>> flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/_metadata
>> # ls
>> flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/taskowned
>> # ls
>> flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/shared
>>
>> Stacktrace
>> A similar message is repeated over and over in the logs:
>> - "Could not restore operator state backend for
>> CoBroadcastWithNonKeyedOperator_b0eed879993a32985f1fde75e55fe3e3_(5/5)"
>> - "Could not restore operator state backend for
>> WindowOperator_31dd93ebcd1f26006d3b41a7b50b5d82_(3/5)"
>> - "Could not restore operator state backend for
>> StreamSource_ab0f3d44654e8df0b68f0b30a956403c_(2/5)"
>>
>> 2018-10-29 18:12:48,965 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
>> (f786f0c2e3a4405fe81a1eed720d5c28) switched from state RUNNING to FAILING.
>> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>> at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.util.FlinkException: Could not restore
>> operator state backend for
>> CoBroadcastWithNonKeyedOperator_b0eed879993a32985f1fde75e55fe3e3_(5/5) from
>> any of the 1 provided restore options.
>> at
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>> at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:242)
>> at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:140)
>> ... 5 more
>> Caused by: java.io.FileNotFoundException:
>> /opt/flink/flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/b8865e25-3761-4ddf-a466-f035b639184b
>> (No such file or directory)
>> at java.io.FileInputStream.open0(Native Method)
>> at 

flink-1.6.2 in standalone-job mode | Cluster initialization failed.

2018-10-30 Thread zavalit
Hi,
just tried to launch my Flink app on flink-1.6.2 and I get:

2018-10-30 11:07:19,961 ERROR
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Cluster
initialization failed.
java.lang.AbstractMethodError:
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createResourceManager(Lorg/apache/flink/configuration/Configuration;Lorg/apache/flink/runtime/clusterframework/types/ResourceID;Lorg/apache/flink/runtime/rpc/RpcService;Lorg/apache/flink/runtime/highavailability/HighAvailabilityServices;Lorg/apache/flink/runtime/heartbeat/HeartbeatServices;Lorg/apache/flink/runtime/metrics/MetricRegistry;Lorg/apache/flink/runtime/rpc/FatalErrorHandler;Lorg/apache/flink/runtime/entrypoint/ClusterInformation;Ljava/lang/String;)Lorg/apache/flink/runtime/resourcemanager/ResourceManager;
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startClusterComponents(ClusterEntrypoint.java:338)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:232)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:190)
at
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:189)
at
org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint.main(StandaloneJobClusterEntryPoint.java:176)

complete log is here:
https://gist.github.com/zavalit/4dba49cdea45c6f56f947a7dcec1a666

job manager is started with:
./bin/standalone-job.sh start-foreground --configDir conf --job-classname
MyEntryClass

the same app runs as-is on flink-1.6.1; the only thing that has changed
is the Flink version.

Thanks in advance for any insights.





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Savepoint failed with error "Checkpoint expired before completing"

2018-10-30 Thread Gagan Agrawal
Hi,
We have a flink job (flink version 1.6.1) which unions 2 streams to pass
through custom KeyedProcessFunction with RocksDB state store which final
creates another stream into Kafka. Current size of checkpoint is around
~100GB and checkpoints are saved to s3 with 5 mins interval and incremental
checkpoint enabled. Checkpoints mostly finish in less than 1 min. We are
running this job on yarn with following parameters

-yn 10  (10 task managers)
-ytm 2048 (2 GB each)
- Operator parallelism is also 10.

While trying to take a savepoint of this job, it runs for ~10 mins and then
throws the following error. It looks like the default checkpoint timeout of 10
mins is causing this. What is the recommended way to take a savepoint of such a
job? Should we increase the default checkpoint timeout of 10 mins? Also, our
state size is currently 100 GB but is expected to grow up to 1 TB. Is Flink a
good fit for use cases with state of that size? And how much time is a savepoint
expected to take with such a state size and parallelism on YARN? Any other
recommendation would be of great help.
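
For reference, the timeout in question is the one on the CheckpointConfig; a
rough sketch of raising it (the 30 min value is only illustrative):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTimeoutExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5 * 60 * 1000);                         // 5 min interval, as today
        env.getCheckpointConfig().setCheckpointTimeout(30 * 60 * 1000); // e.g. 30 min instead of 10
        // ... build and execute the job ...
    }
}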

org.apache.flink.util.FlinkException: Triggering a savepoint for the job
434398968e635a49329f59a019b41b6f failed.
at
org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:714)
at
org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:692)
at
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:979)
at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:689)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1059)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)
Caused by: java.util.concurrent.CompletionException:
java.util.concurrent.CompletionException: java.lang.Exception: Checkpoint
expired before completing
at
org.apache.flink.runtime.jobmaster.JobMaster.lambda$triggerSavepoint$13(JobMaster.java:955)
at
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at
org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortExpired(PendingCheckpoint.java:412)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$triggerCheckpoint$0(CheckpointCoordinator.java:548)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: java.lang.Exception:
Checkpoint expired before completing
at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)


Re: Unbalanced Kafka consumer consumption

2018-10-30 Thread Gerard Garcia
I think my problem is not the same. Yours is that you want to consume faster
from the partitions with more data, to avoid first draining the ones with fewer
elements, which could advance the event time too fast. Mine is that the Kafka
consumer only reads from some partitions even though it seems to have the
resources to read and process all of them at the same time.

Gerard

On Tue, Oct 30, 2018 at 9:36 AM bupt_ljy  wrote:

> Hi,
>
>If I understand your problem correctly, there is a similar JIRA
> issue FLINK-10348, reported by me. Maybe you can take a look at it.
>
>
> Jiayi Liao,Best
>
>  Original Message
> *Sender:* Gerard Garcia
> *Recipient:* fearsome.lucidity
> *Cc:* user
> *Date:* Monday, Oct 29, 2018 17:50
> *Subject:* Re: Unbalanced Kafka consumer consumption
>
> The stream is partitioned by key after ingestion at the finest granularity
> that we can (which is finer than how stream is partitioned when produced to
> kafka). It is not perfectly balanced but still is not so unbalanced to show
> this behavior (more balanced than what the lag images show).
>
> Anyway, let's assume that the problem is that the stream is so unbalanced
> that one operator subtask can't handle the ingestion rate. It is expected
> then that all the others operators reduce its ingestion rate even if they
> have resources to spare? The task is configured with processing time and
> there are no windows. If that is the case, is there a way to let operator
> subtasks process freely even if one of them is causing back pressure
> upstream?
>
> The attached images shows how Kafka lag increases while the throughput is
> stable until some operator subtasks finish.
>
> Thanks,
>
> Gerard
>
> On Fri, Oct 26, 2018 at 8:09 PM Elias Levy 
> wrote:
>
>> You can always shuffle the stream generated by the Kafka source
>> (dataStream.shuffle()) to evenly distribute records downstream.
>>
>> On Fri, Oct 26, 2018 at 2:08 AM gerardg  wrote:
>>
>>> Hi,
>>>
>>> We are experience issues scaling our Flink application and we have
>>> observed
>>> that it may be because Kafka messages consumption is not balanced across
>>> partitions. The attached image (lag per partition) shows how only one
>>> partition consumes messages (the blue one in the back) and it wasn't
>>> until
>>> it finished that the other ones started to consume at a good rate
>>> (actually
>>> the total throughput multiplied by 4 when these started) . Also, when
>>> that
>>> ones started to consume, one partition just stopped an accumulated
>>> messages
>>> back again until they finished.
>>>
>>> We don't see any resource (CPU, network, disk..) struggling in our
>>> cluster
>>> so we are not sure what could be causing this behavior. I can only assume
>>> that somehow Flink or the Kafka consumer is artificially slowing down the
>>> other partitions. Maybe due to how back pressure is handled?
>>>
>>> <
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1007/consumer_max_lag.png>
>>>
>>>
>>> Gerard
>>>
>>>
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>


Re: Job fails to restore from checkpoint in Kubernetes with FileNotFoundException

2018-10-30 Thread vino yang
Hi John,

Is the file system configured by RocksDBStateBackend HDFS?[1]

Thanks, vino.

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/state_backends.html#the-rocksdbstatebackend

John Stone  于2018年10月30日周二 上午2:54写道:

> I am testing Flink in a Kubernetes cluster and am finding that a job gets
> caught in a recovery loop.  Logs show that the issue is that a checkpoint
> cannot be found although checkpoints are being taken per the Flink web UI.
> Any advice on how to resolve this is most appreciated.
>
> Note on below: I can easily replicate this with a single TaskManager (>1
> slots) and a job parallelism of 1, or two TaskManagers (4 slots each) and a
> job parallelism of 8.
>
> Setup:
> - Flink 1.6.0
> - Kubernetes cluster.
> - 1 JobManager node, 2 TaskManager nodes.
> - RocksDB backend with incremental checkpointing.
> - There is not a persistent volume mounted on any of the three nodes.  In
> production, we would obviously need a persistent volume on the JobManager.
> - Job submitted and running such that the job is parallelized over both
> nodes (i.e. each TM has 4 task slots; job parallelism = 5).
>
> Test:
> - Let the job collect a few checkpoints, say, 9 checkpoints.
> - Kill one of the two TMs (kubectl delete pods ).
> - New TM pod starts.
>
> Result:
> - After the new TM starts, the job will cycle through FAILING -> RUNNING
> -> FAILING -> RUNNING ->...
>
> Relevant Information
> Contents of JobManager's pod:
> user@host:~$ kubectl exec -it flink-jobmanager-5bbfcb567-v299g -- /bin/sh
> # ls flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28
> chk-9  shared  taskowned
> # ls flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9
> _metadata
> # ls
> flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/_metadata
>
> flink-data/dwell-sliding-window-demo/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/_metadata
> # ls -al
> flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/_metadata
> -rw-r--r-- 1 flink flink 23665 Oct 29 18:07
> flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/_metadata
> # ls
> flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/taskowned
> # ls
> flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/shared
>
> Stacktrace
> A similar message is repeated over and over in the logs:
> - "Could not restore operator state backend for
> CoBroadcastWithNonKeyedOperator_b0eed879993a32985f1fde75e55fe3e3_(5/5)"
> - "Could not restore operator state backend for
> WindowOperator_31dd93ebcd1f26006d3b41a7b50b5d82_(3/5)"
> - "Could not restore operator state backend for
> StreamSource_ab0f3d44654e8df0b68f0b30a956403c_(2/5)"
>
> 2018-10-29 18:12:48,965 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
> (f786f0c2e3a4405fe81a1eed720d5c28) switched from state RUNNING to FAILING.
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore
> operator state backend for
> CoBroadcastWithNonKeyedOperator_b0eed879993a32985f1fde75e55fe3e3_(5/5) from
> any of the 1 provided restore options.
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:242)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:140)
> ... 5 more
> Caused by: java.io.FileNotFoundException:
> /opt/flink/flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/b8865e25-3761-4ddf-a466-f035b639184b
> (No such file or directory)
> at java.io.FileInputStream.open0(Native Method)
> at java.io.FileInputStream.open(FileInputStream.java:195)
> at java.io.FileInputStream.(FileInputStream.java:138)
> at
> org.apache.flink.core.fs.local.LocalDataInputStream.(LocalDataInputStream.java:50)
> at
> org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142)
> at
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
> at
> 

Re: Queryable state when key is UUID - getting Kyro Exception

2018-10-30 Thread bupt_ljy
Hi, Jayant
  Your code looks good to me. And I’ve tried Kryo serialization/deserialization 
of the UUID class; it all looks okay.
  I’m not very sure about this problem. Maybe you can write a very simple demo 
to check whether it works.
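
Something like the minimal demo below should be enough to test it; the proxy host
and port (9069 is the default) are assumptions here, while the job id, state name
and key are taken from your earlier snippet:

import java.util.UUID;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.queryablestate.client.QueryableStateClient;

public class QueryDemo {
    public static void main(String[] args) throws Exception {
        // Placeholder proxy host; 9069 is the default queryable state proxy port.
        QueryableStateClient client = new QueryableStateClient("localhost", 9069);

        MapStateDescriptor<UUID, String> descriptor =
            new MapStateDescriptor<>("rulePatterns", UUID.class, String.class);

        UUID key = UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d");

        CompletableFuture<MapState<UUID, String>> future = client.getKvState(
            JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"),
            "rule",
            key,
            TypeInformation.of(new TypeHint<UUID>() {}),
            descriptor);

        // Print whatever the state holds for this key (null if absent).
        System.out.println(future.get().get(key));
    }
}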


Jiayi Liao, Best


Original Message
Sender:Jayant ametawittyam...@gmail.com
Recipient:bupt_ljybupt_...@163.com
Cc:Tzu-Li (Gordon) taitzuli...@apache.org; useru...@flink.apache.org
Date:Monday, Oct 29, 2018 11:53
Subject:Re: Queryable state when key is UUID - getting Kyro Exception


Hi Jiayi,
Any further help on this?


Jayant Ameta





On Fri, Oct 26, 2018 at 9:22 AM Jayant Ameta wittyam...@gmail.com wrote:

MapStateDescriptor<UUID, String> descriptor = new 
MapStateDescriptor<>("rulePatterns", UUID.class,
 String.class);
Jayant Ameta





On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy bupt_...@163.com wrote:

Hi,
 Can you show us the descriptor in the codes below?
  client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), 
"rule",
UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
 TypeInformation.of(new TypeHint<UUID>() {}), descriptor);


Jiayi Liao, Best




Original Message
Sender:Jayant ametawittyam...@gmail.com
Recipient:bupt_ljybupt_...@163.com
Cc:Tzu-Li (Gordon) taitzuli...@apache.org; useru...@flink.apache.org
Date:Friday, Oct 26, 2018 02:26
Subject:Re: Queryable state when key is UUID - getting Kyro Exception


Also, I haven't provided any custom serializer in my flink job. Shouldn't the 
same configuration work for queryable state client?


Jayant Ameta





On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta wittyam...@gmail.com wrote:

Hi Gordon,
Following is the stack trace that I'm getting:


Exception in thread "main" java.util.concurrent.ExecutionException: 
java.lang.RuntimeException: Failed request 0.
Caused by: java.lang.RuntimeException: Failed request 0.
Caused by: java.lang.RuntimeException: Error while processing request with ID 
0. Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered 
class ID: -985346241
Serialization trace:
$outer (scala.collection.convert.Wrappers$SeqWrapper)
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at 
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
at 
org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)


I am not using any custom serialize as mentioned by Jiayi.


Jayant Ameta





On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy bupt_...@163.com wrote:

Hi Jayant,
 There should be a Serializer parameter in the constructor of the 
StateDescriptor, you should create a new serializer like this:


 new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)


By the way, can you show us your kryo exception like what Gordon said?


Jiayi Liao, Best




Original Message
Sender:Tzu-Li (Gordon) taitzuli...@apache.org
Recipient:Jayant ametawittyam...@gmail.com; bupt_ljybupt_...@163.com
Cc:useru...@flink.apache.org
Date:Thursday, Oct 25, 2018 17:18
Subject:Re: Queryable state when key is UUID - getting Kyro Exception


Hi Jayant,


What is the Kryo exception message that you are getting?


Cheers,
Gordon




On 25 October 2018 at 5:17:13 PM, Jayant Ameta (wittyam...@gmail.com) wrote:
Hi,
I've not configured any serializer in the descriptor. (Neither in flink job, 
nor in state query client).
Which serializer should I use?


Jayant Ameta





On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy bupt_...@163.com wrote:

Hi,
 It seems that your codes are right. Are you sure that you’re using the same 
Serializer as the Flink program do? Could you show the serializer in descriptor?





Re: Unbalanced Kafka consumer consumption

2018-10-30 Thread bupt_ljy
Hi,
 If I understand your problem correctly, there is a similar JIRA 
issue FLINK-10348, reported by me. Maybe you can take a look at it.


Jiayi Liao,Best


Original Message
Sender:Gerard garciager...@talaia.io
Recipient:fearsome.lucidityfearsome.lucid...@gmail.com
Cc:useru...@flink.apache.org
Date:Monday, Oct 29, 2018 17:50
Subject:Re: Unbalanced Kafka consumer consumption


The stream is partitioned by key after ingestion at the finest granularity that 
we can (which is finer than how stream is partitioned when produced to kafka). 
It is not perfectly balanced but still is not so unbalanced to show this 
behavior (more balanced than what the lag images show).


Anyway, let's assume that the problem is that the stream is so unbalanced that 
one operator subtask can't handle the ingestion rate. It is expected then that 
all the others operators reduce its ingestion rate even if they have resources 
to spare? The task is configured with processing time and there are no windows. 
If that is the case, is there a way to let operator subtasks process freely 
even if one of them is causing back pressure upstream?


The attached images show how Kafka lag increases while the throughput is stable 
until some operator subtasks finish.


Thanks,


Gerard


On Fri, Oct 26, 2018 at 8:09 PM Elias Levy fearsome.lucid...@gmail.com wrote:

You can always shuffle the stream generated by the Kafka source 
(dataStream.shuffle()) to evenly distribute records downstream.


On Fri, Oct 26, 2018 at 2:08 AM gerardg ger...@talaia.io wrote:

Hi,
 
 We are experience issues scaling our Flink application and we have observed
 that it may be because Kafka messages consumption is not balanced across
 partitions. The attached image (lag per partition) shows how only one
 partition consumes messages (the blue one in the back) and it wasn't until
 it finished that the other ones started to consume at a good rate (actually
 the total throughput multiplied by 4 when these started) . Also, when that
 ones started to consume, one partition just stopped an accumulated messages
 back again until they finished.
 
 We don't see any resource (CPU, network, disk..) struggling in our cluster
 so we are not sure what could be causing this behavior. I can only assume
 that somehow Flink or the Kafka consumer is artificially slowing down the
 other partitions. Maybe due to how back pressure is handled?
 
 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1007/consumer_max_lag.png
 
 
 Gerard
 
 
 
 
 
 --
 Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: FlinkCEP, circular references and checkpointing failures

2018-10-30 Thread Dawid Wysakowicz
This is some problem with serializing your events using Kryo. I'm adding
Gordon to cc, as he was recently working with serializers. He might give
you more insight into what is going wrong.

Best,

Dawid

On 25/10/2018 05:41, Shailesh Jain wrote:
> Hi Dawid,
>
> I've upgraded to flink 1.6.1 and rebased my changes against the tag
> 1.6.1, the only commit on top of 1.6 is this:
> https://github.com/jainshailesh/flink/commit/797e3c4af5b28263fd98fb79daaba97cabf3392c
>
> I ran two separate identical jobs (with and without checkpointing
> enabled), I'm hitting a ArrayIndexOutOfBoundsException (and sometimes
> NPE) *only when checkpointing (HDFS backend) is enabled*, with the
> below stack trace.
>
> I did see a similar problem with different operators here
> (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/IndexOutOfBoundsException-on-deserialization-after-updating-to-1-6-1-td23933.html).
> Is this a known issue which is getting addressed?
>
> Any ideas on what could be causing this?
>
> Thanks,
> Shailesh
>
>
> 2018-10-24 17:04:13,365 INFO 
> org.apache.flink.runtime.taskmanager.Task -
> SelectCepOperatorMixedTime (1/1) - SelectCepOperatorMixedTime (1/1)
> (3d984b7919342a3886593401088ca2cd) switched from RUNNING to FAILED.
> org.apache.flink.util.FlinkRuntimeException: Failure happened in
> filter function.
>     at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
>     at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
>     at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
>     at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
>     at
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
>     at
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternMixedTimeApproachOperator.processElement(AbstractKeyedCEPPatternMixedTimeApproachOperator.java:45)
>     at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>     at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>     at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.WrappingRuntimeException:
> java.lang.ArrayIndexOutOfBoundsException: -1
>     at
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:305)
>     at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
>     at
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:301)
>     at
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:291)
>     at
> org.apache.flink.cep.nfa.NFA$ConditionContext.getEventsForPattern(NFA.java:811)
>     at
> com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:70)
>     at
> com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:62)
>     at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
>     at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
>     ... 10 more
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>     at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
>     at
> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
>     at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
>     at
> org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:120)
>     at
> org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:95)
>     at
> org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:113)
>     at
> org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:49)
>     at
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
>     at
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
>     at
> org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
>     at
> org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>     at
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:303)
>     ... 18 more
>
> On Fri, Sep 28, 2018 at 11:00 AM Shailesh Jain
> mailto:shailesh.j...@stellapps.com>> wrote:
>
> Hi Dawid,
>
> Thanks for your time on this. The diff 

Re: FlinkCEP, circular references and checkpointing failures

2018-10-30 Thread Shailesh Jain
Bump.

On Thu, Oct 25, 2018 at 9:11 AM Shailesh Jain 
wrote:

> Hi Dawid,
>
> I've upgraded to flink 1.6.1 and rebased my changes against the tag 1.6.1,
> the only commit on top of 1.6 is this:
> https://github.com/jainshailesh/flink/commit/797e3c4af5b28263fd98fb79daaba97cabf3392c
>
> I ran two separate identical jobs (with and without checkpointing
> enabled), I'm hitting a ArrayIndexOutOfBoundsException (and sometimes NPE) 
> *only
> when checkpointing (HDFS backend) is enabled*, with the below stack trace.
>
> I did see a similar problem with different operators here (
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/IndexOutOfBoundsException-on-deserialization-after-updating-to-1-6-1-td23933.html).
> Is this a known issue which is getting addressed?
>
> Any ideas on what could be causing this?
>
> Thanks,
> Shailesh
>
>
> 2018-10-24 17:04:13,365 INFO
> org.apache.flink.runtime.taskmanager.Task -
> SelectCepOperatorMixedTime (1/1) - SelectCepOperatorMixedTime (1/1)
> (3d984b7919342a3886593401088ca2cd) switched from RUNNING to FAILED.
> org.apache.flink.util.FlinkRuntimeException: Failure happened in filter
> function.
> at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
> at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
> at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
> at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
> at
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
> at
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternMixedTimeApproachOperator.processElement(AbstractKeyedCEPPatternMixedTimeApproachOperator.java:45)
> at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.WrappingRuntimeException:
> java.lang.ArrayIndexOutOfBoundsException: -1
> at
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:305)
> at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
> at
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:301)
> at
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:291)
> at
> org.apache.flink.cep.nfa.NFA$ConditionContext.getEventsForPattern(NFA.java:811)
> at
> com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:70)
> at
> com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:62)
> at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
> at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
> ... 10 more
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
> at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
> at
> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
> at
> org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:120)
> at
> org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:95)
> at
> org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:113)
> at
> org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:49)
> at
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
> at
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
> at
> org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
> at
> org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
> at
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:303)
> ... 18 more
>
> On Fri, Sep 28, 2018 at 11:00 AM Shailesh Jain <
> shailesh.j...@stellapps.com> wrote:
>
>> Hi Dawid,
>>
>> Thanks for your time on this. The diff should have pointed out only the
>> top 3 commits, but since it did not, it is possible I did not rebase my
>> branch against 1.4.2 correctly. I'll check this out and get back to you if
>> I hit