Re: flink-1.6.1 :: job deployment :: detached mode

2018-11-07 Thread Mikhail Pryakhin
Hi Till,

Thank you for your reply. Yes, I’ve upgraded to the latest Flink 1.6.2 and the problem is still there; please find the log file attached.

flink-job-1.6.2.log
Description: Binary data

Kind Regards,
Mike Pryakhin

On 7 Nov 2018, at 18:46, Till Rohrmann wrote:

Hi Mike,

Have you tried whether the problem also occurs with Flink 1.6.2? If yes, then please share with us the Flink logs with DEBUG log level to further debug the problem.

Cheers,
Till

On Fri, Oct 26, 2018 at 5:46 PM Mikhail Pryakhin wrote:

Hi community!

Right after I upgraded flink to flink-1.6.1, I get an exception during job deployment to a YARN cluster. The job is submitted with ZooKeeper HA enabled, in detached mode.

The flink yaml contains the following properties:

high-availability: zookeeper
high-availability.zookeeper.quorum: 
high-availability.zookeeper.storageDir: hdfs:///
high-availability.zookeeper.path.root: 
high-availability.zookeeper.path.namespace: 

The job is deployed via a flink CLI command like the following:

"${FLINK_HOME}/bin/flink" run \
    -m yarn-cluster \
    -ynm "${JOB_NAME}-${JOB_VERSION}" \
    -yn "${tm_containers}" \
    -ys "${tm_slots}" \
    -ytm "${tm_memory}" \
    -yjm "${jm_memory}" \
    -p "${parallelism}" \
    -yqu "${queue}" \
    -yt "${YARN_APP_PATH}" \
    -c "${MAIN_CLASS}" \
    -yst \
    -yd \
    ${class_path} \
    "${YARN_APP_PATH}"/"${APP_JAR}"

After the job has been successfully deployed, I get the following exception:

2018-10-26 18:29:17,781 | ERROR | Curator-Framework-0 | org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl | Background exception was not retry-able or retry gave up
java.lang.InterruptedException
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:502)
    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1406)
    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1097)
    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1130)
    at org.apache.flink.shaded.curator.org.apache.curator.utils.ZKPaths.mkdirs(ZKPaths.java:274)
    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CreateBuilderImpl$7.performBackgroundOperation(CreateBuilderImpl.java:561)
    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.OperationAndData.callPerformBackgroundOperation(OperationAndData.java:72)
    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:831)
    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)
    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

If the job is deployed in "attached mode" everything goes fine.
Kind Regards,
Mike Pryakhin







Implementation error: Unhandled exception - "Implementation error: Unhandled exception."

2018-11-07 Thread Richard Deurwaarder
Hello,

We have a Flink job / cluster running in Kubernetes, on Flink 1.6.2 (but the
same happens in 1.6.0 and 1.6.1). To upgrade our job we use the REST API.

Every so often the jobmanager seems to be stuck in a crashing state and the
logs show me this stack trace:

2018-11-07 18:43:05,815 [flink-scheduler-1] ERROR
org.apache.flink.runtime.rest.handler.cluster.ClusterOverviewHandler -
Implementation error: Unhandled exception.
akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka://flink/user/dispatcher#1016927511]] after [1 ms].
Sender[null] sent message of type
"org.apache.flink.runtime.rpc.messages.Implementation error: Unhandled
exception.".
at
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
at
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
at
scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
at
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
at
akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
at
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
at
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
at java.lang.Thread.run(Thread.java:748)

If I restart the jobmanager everything is fine afterwards, but the
jobmanager will not restart by itself.

What might've caused this and is this something we can prevent?

Richard


RE: Kubernetes Job Cluster - Checkpointing with Parallelism > 1

2018-11-07 Thread Thad Truman
Upgrading to Flink 1.6.2 from 1.6.0 appears to fix this, after making sure each 
job is writing checkpoints to a unique directory, since the job IDs all match.
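For reference, a minimal sketch of pointing each job at its own checkpoint directory; the RocksDB backend, HDFS path, and job name here are assumptions for illustration, not Thad's actual setup:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PerJobCheckpointDir {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint once a minute.
        env.enableCheckpointing(60_000);
        // Hypothetical per-job path: because every standalone job cluster reports the
        // same job ID, giving each job its own directory keeps checkpoints from colliding.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints/job-a", true));

        env.fromElements(1, 2, 3).print();
        env.execute("job-a");
    }
}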

Thad Truman | Software Engineer | Neovest, Inc.
A: 1145 S 800 E, Ste 310 Orem, UT 84097
T: +1 801 900 2480
E: ttru...@neovest.com

Support Desk: supp...@neovest.com / +1 800 433 4276







From: Thad Truman
Sent: Tuesday, November 6, 2018 9:38 AM
To: user@flink.apache.org
Subject: Kubernetes Job Cluster - Checkpointing with Parallelism > 1

Hi all,

We are trying to configure checkpointing (RocksDB) for Flink job clusters in
k8s. As described here, we have a parallelism value that is used as the
-Dparallelism.default arg in the job manager template, as well as the replicas
value in the task manager template. For jobs where the parallelism value is set
to 1, checkpointing works great. But when we set the parallelism value to
anything > 1, we get the below error:

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
Could not allocate all requires slots within timeout of 30 ms. Slots 
required: 4, slots allocated: 1
at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$scheduleEager$3(ExecutionGraph.java:984)
at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture.handleCompletedFuture(FutureUtils.java:534)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:770)
at akka.dispatch.OnComplete.internal(Future.scala:258)
at akka.dispatch.OnComplete.internal(Future.scala:256)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
at 
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at 
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at 
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
at 
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
at 
scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at 
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
at 
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
at java.lang.Thread.run(Thread.java:748)


Any ideas on how we can remediate this?

Thanks,

Thad Truman | Software Engineer | Neovest, Inc.
A: 1145 S 800 E, Ste 310 Orem, UT 84097
T: +1 801 900 2480
E: ttru...@neovest.com

Support Desk: 

Re: Error after upgrading to Flink 1.6.2

2018-11-07 Thread Flavio Pompermaier
Hi Till,
we are not using HBase at the moment. We managed to run the job successfully,
but it was a pain to find the right combination of dependencies,
library shading and the right HADOOP_CLASSPATH.
The problem was the combination of parquet, jaxrs, hadoop and jackson.
Moreover we had to run the cluster with parent-first class loading in order
to make it run.

However, we still have the big problem of submitting jobs via the REST
API (as I wrote in another thread, it seems that there's no way to execute
any code after env.execute() when using the REST API).
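For illustration, here is a minimal, hypothetical job (not Flavio's code) showing the behaviour he describes: when the jar is submitted through the REST API or web UI, Flink only extracts and runs the job graph, so statements after env.execute() are never reached:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PostExecuteExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3).print();

        env.execute("rest-submission-example");

        // Reached when started with 'bin/flink run', but skipped when the jar is
        // submitted via the REST API / web UI, where only the extracted job graph runs.
        System.out.println("post-processing after execute()");
    }
}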

Best,
Flavio

On Wed, Nov 7, 2018 at 6:15 PM Till Rohrmann  wrote:

> Hi Flavio,
>
> I haven't seen this problem before. Are you using Flink's HBase connector?
> According to similar problems with Spark one needs to make sure that the
> hbase jars are on the classpath [1, 2]. If not, then it might be a problem
> with the MR1 version 2.6.0-mr1-cdh5.11.2 which caused problems for CDH 5.2
> [2]. It could also be worthwhile to try it out with the latest CDH version.
>
> [1]
> https://stackoverflow.com/questions/34901331/spark-hbase-error-java-lang-illegalstateexception-unread-block-data
> [2]
> https://mapr.com/community/s/question/0D50L6BIthGSAT/javalangillegalstateexception-unread-block-data-when-running-spark-with-yarn
> [3]
> https://issues.apache.org/jira/browse/SPARK-1867?focusedCommentId=14322647=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-14322647
>
> Cheers,
> Till
>
> On Wed, Nov 7, 2018 at 12:05 PM Flavio Pompermaier 
> wrote:
>
>> I forgot to mention that I'm using Flink 1.6.2 compiled for cloudera CDH
>> 5.11.2:
>>
>> /opt/shared/devel/apache-maven-3.3.9/bin/mvn clean install
>> -Dhadoop.version=2.6.0-cdh5.11.2 -Dhbase.version=1.2.0-cdh5.11.2
>> -Dhadoop.core.version=2.6.0-mr1-cdh5.11.2 -DskipTests -Pvendor-repos
>>
>> On Wed, Nov 7, 2018 at 11:48 AM Flavio Pompermaier 
>> wrote:
>>
>>> Hi to all,
>>> we tried to upgrade our jobs to Flink 1.6.2 but now we get the following
>>> error (we saw a similar issue with spark that was caused by different java
>>> version on the cluster servers so we checked them and they are all to the
>>> same version - oracle-8-191):
>>>
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot 
>>> initialize task 'DataSink (Parquet write: 
>>> hdfs:/rivela/1/1/0_staging/parquet)': Deserializing the OutputFormat 
>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@54a4c7c8) 
>>> failed: unread block data
>>> at 
>>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
>>> at 
>>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
>>> at 
>>> org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1151)
>>> at 
>>> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1131)
>>> at 
>>> org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:294)
>>> at 
>>> org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:157)
>>> ... 10 more
>>> Caused by: java.lang.Exception: Deserializing the OutputFormat 
>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@54a4c7c8) 
>>> failed: unread block data
>>> at 
>>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
>>> at 
>>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
>>> ... 15 more
>>> Caused by: java.lang.IllegalStateException: unread block data
>>> at 
>>> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2783)
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1605)
>>> at 
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
>>> at 
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>>> at 
>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:502)
>>> at 
>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:489)
>>> at 
>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:477)
>>> at 
>>> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:438)
>>> at 
>>> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
>>> at 
>>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
>>> ... 16 more
>>>
>>>
>>> Has anyone faced this problem before? How can we try to solve it?
>>> 

Re: FlinkCEP, circular references and checkpointing failures

2018-11-07 Thread Till Rohrmann
Really good finding Stefan!

On Wed, Nov 7, 2018 at 5:28 PM Stefan Richter 
wrote:

> Hi,
>
> I think I can already spot the problem: LockableTypeSerializer.duplicate()
> is not properly implemented because it also has to call duplicate() on the
> element serialiser that is passed into the constructor of the new instance.
> I will open an issue and fix the problem.
>
> Best,
> Stefan
>
> On 7. Nov 2018, at 17:17, Till Rohrmann  wrote:
>
> Hi Shailesh,
>
> could you maybe provide us with an example program which is able to
> reproduce this problem? This would help the community to better debug the
> problem. It looks not right and might point towards a bug in Flink. Thanks
> a lot!
>
> Cheers,
> Till
>
> On Tue, Oct 30, 2018 at 9:10 AM Dawid Wysakowicz 
> wrote:
>
>> This is some problem with serializing your events using Kryo. I'm adding
>> Gordon to cc, as he was recently working with serializers. He might give
>> you more insights what is going wrong.
>>
>> Best,
>>
>> Dawid
>> On 25/10/2018 05:41, Shailesh Jain wrote:
>>
>> Hi Dawid,
>>
>> I've upgraded to flink 1.6.1 and rebased my changes against the tag
>> 1.6.1, the only commit on top of 1.6 is this:
>> https://github.com/jainshailesh/flink/commit/797e3c4af5b28263fd98fb79daaba97cabf3392c
>>
>> I ran two separate identical jobs (with and without checkpointing
>> enabled), I'm hitting a ArrayIndexOutOfBoundsException (and sometimes NPE) 
>> *only
>> when checkpointing (HDFS backend) is enabled*, with the below stack
>> trace.
>>
>> I did see a similar problem with different operators here (
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/IndexOutOfBoundsException-on-deserialization-after-updating-to-1-6-1-td23933.html).
>> Is this a known issue which is getting addressed?
>>
>> Any ideas on what could be causing this?
>>
>> Thanks,
>> Shailesh
>>
>>
>> 2018-10-24 17:04:13,365 INFO
>> org.apache.flink.runtime.taskmanager.Task -
>> SelectCepOperatorMixedTime (1/1) - SelectCepOperatorMixedTime (1/1)
>> (3d984b7919342a3886593401088ca2cd) switched from RUNNING to FAILED.
>> org.apache.flink.util.FlinkRuntimeException: Failure happened in filter
>> function.
>> at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
>> at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
>> at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
>> at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
>> at
>> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
>> at
>> org.apache.flink.cep.operator.AbstractKeyedCEPPatternMixedTimeApproachOperator.processElement(AbstractKeyedCEPPatternMixedTimeApproachOperator.java:45)
>> at
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>> at
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.util.WrappingRuntimeException:
>> java.lang.ArrayIndexOutOfBoundsException: -1
>> at
>> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:305)
>> at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
>> at
>> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:301)
>> at
>> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:291)
>> at
>> org.apache.flink.cep.nfa.NFA$ConditionContext.getEventsForPattern(NFA.java:811)
>> at
>> com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:70)
>> at
>> com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:62)
>> at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
>> at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
>> ... 10 more
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>> at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
>> at
>> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
>> at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
>> at
>> org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:120)
>> at
>> org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:95)
>> at
>> 

Re: Error after upgrading to Flink 1.6.2

2018-11-07 Thread Till Rohrmann
Hi Flavio,

I haven't seen this problem before. Are you using Flink's HBase connector?
According to similar problems with Spark one needs to make sure that the
hbase jars are on the classpath [1, 2]. If not, then it might be a problem
with the MR1 version 2.6.0-mr1-cdh5.11.2 which caused problems for CDH 5.2
[2]. It could also be worthwhile to try it out with the latest CDH version.

[1]
https://stackoverflow.com/questions/34901331/spark-hbase-error-java-lang-illegalstateexception-unread-block-data
[2]
https://mapr.com/community/s/question/0D50L6BIthGSAT/javalangillegalstateexception-unread-block-data-when-running-spark-with-yarn
[3]
https://issues.apache.org/jira/browse/SPARK-1867?focusedCommentId=14322647=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-14322647

Cheers,
Till

On Wed, Nov 7, 2018 at 12:05 PM Flavio Pompermaier 
wrote:

> I forgot to mention that I'm using Flink 1.6.2 compiled for cloudera CDH
> 5.11.2:
>
> /opt/shared/devel/apache-maven-3.3.9/bin/mvn clean install
> -Dhadoop.version=2.6.0-cdh5.11.2 -Dhbase.version=1.2.0-cdh5.11.2
> -Dhadoop.core.version=2.6.0-mr1-cdh5.11.2 -DskipTests -Pvendor-repos
>
> On Wed, Nov 7, 2018 at 11:48 AM Flavio Pompermaier 
> wrote:
>
>> Hi to all,
>> we tried to upgrade our jobs to Flink 1.6.2 but now we get the following
>> error (we saw a similar issue with spark that was caused by different java
>> version on the cluster servers so we checked them and they are all to the
>> same version - oracle-8-191):
>>
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot 
>> initialize task 'DataSink (Parquet write: 
>> hdfs:/rivela/1/1/0_staging/parquet)': Deserializing the OutputFormat 
>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@54a4c7c8) 
>> failed: unread block data
>>  at 
>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
>>  at 
>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
>>  at 
>> org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1151)
>>  at 
>> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1131)
>>  at 
>> org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:294)
>>  at 
>> org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:157)
>>  ... 10 more
>> Caused by: java.lang.Exception: Deserializing the OutputFormat 
>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@54a4c7c8) 
>> failed: unread block data
>>  at 
>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
>>  at 
>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
>>  ... 15 more
>> Caused by: java.lang.IllegalStateException: unread block data
>>  at 
>> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2783)
>>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1605)
>>  at 
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>>  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
>>  at 
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>>  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>>  at 
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:502)
>>  at 
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:489)
>>  at 
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:477)
>>  at 
>> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:438)
>>  at 
>> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
>>  at 
>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
>>  ... 16 more
>>
>>
>> Has anyone faced this problem before? How can we try to solve it?
>> Best,Flavio
>>
>
>


Re: flink-1.6.2 in standalone-job mode | Cluster initialization failed.

2018-11-07 Thread Till Rohrmann
Hi Zavalit,

the AbstractMethodError indicates that there must be some kind of version
conflict. From Flink 1.6.1 to 1.6.2 we modified the signature of
`ClusterEntrypoint#createResourceManager` which causes the problem if you
mix up versions. Could you check that you don't mix Flink 1.6.1 and 1.6.2
classes. Please also make sure that you don't bundle Flink runtime classes
in your job jar. If you do this, then please recompile the job with the
Flink version or remove these classes.

Cheers,
Till

On Tue, Oct 30, 2018 at 12:21 PM zavalit  wrote:

> Hi,
> just tried to launch flink app in flink-1.6.2 and get
>
> 2018-10-30 11:07:19,961 ERROR
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Cluster
> initialization failed.
> java.lang.AbstractMethodError:
>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createResourceManager(Lorg/apache/flink/configuration/Configuration;Lorg/apache/flink/runtime/clusterframework/types/ResourceID;Lorg/apache/flink/runtime/rpc/RpcService;Lorg/apache/flink/runtime/highavailability/HighAvailabilityServices;Lorg/apache/flink/runtime/heartbeat/HeartbeatServices;Lorg/apache/flink/runtime/metrics/MetricRegistry;Lorg/apache/flink/runtime/rpc/FatalErrorHandler;Lorg/apache/flink/runtime/entrypoint/ClusterInformation;Ljava/lang/String;)Lorg/apache/flink/runtime/resourcemanager/ResourceManager;
> at
>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startClusterComponents(ClusterEntrypoint.java:338)
> at
>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:232)
> at
>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:190)
> at
>
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at
>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:189)
> at
>
> org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint.main(StandaloneJobClusterEntryPoint.java:176)
>
> complete log is here:
> https://gist.github.com/zavalit/4dba49cdea45c6f56f947a7dcec1a666
>
> job manager is started with:
> ./bin/standalone-job.sh start-foreground --configDir conf --job-classname
> MyEntryClass
>
> the same app runs as-is in flink-1.6.1; the only thing that has changed
> is the flink version
>
> thx in advance, for any insides
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: InterruptedException when async function is cancelled

2018-11-07 Thread Anil
Hi Till,
Thanks for the reply. Is there any particular patch I can use, as
upgrading to Flink 1.6 is not an option for me at the moment?
Regards,
Anil.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Unbalanced Kafka consumer consumption

2018-11-07 Thread Till Rohrmann
Hi Gerard,

the behaviour you are describing sounds odd to me. I have a couple of
questions:

1. Which Flink and Kafka version are you using?
2. How many partitions do you have? --> Try to set the parallelism of your
job to the number of partitions. That way, you will have one partition per
source task (a minimal sketch follows this list).
3. How are the source operators distributed? Are they running on different
nodes?
4. What do you mean with "until it (the blue one) was finished consuming
the partition"? I assume that you don't ingest into the Kafka topic live
but want to read persisted data.
5. Are you using Flink's metrics to monitor the different source tasks?
Check what the source operator's output rate is (should be visible from the
web UI).
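A minimal sketch of point 2; the connector version (0.11), topic name, broker address and partition count are assumptions for illustration:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class OnePartitionPerSourceTask {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // hypothetical broker
        props.setProperty("group.id", "my-consumer-group");   // hypothetical group

        int numPartitions = 10; // assumed number of partitions of the topic

        env.addSource(new FlinkKafkaConsumer011<>("my-topic", new SimpleStringSchema(), props))
           .setParallelism(numPartitions) // one Kafka partition per source subtask
           .print();

        env.execute("kafka-balanced-consumption");
    }
}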

Cheers,
Till

On Tue, Oct 30, 2018 at 10:27 AM Gerard Garcia  wrote:

> I think my problem is not the same, yours is that you want to consume from
> partitions with more data faster to avoid consuming first the one with less
> elements which could advance the event time too fast. Mine is that Kafka
> only consumes from some partitions even if it seems that it has resources
> to read and process from all of them at the same time.
>
> Gerard
>
> On Tue, Oct 30, 2018 at 9:36 AM bupt_ljy  wrote:
>
>> Hi,
>>
>>If I understand your problem correctly, there is a similar JIRA
>> issue FLINK-10348, reported by me. Maybe you can take a look at it.
>>
>>
>> Jiayi Liao,Best
>>
>>  Original Message
>> *Sender:* Gerard Garcia
>> *Recipient:* fearsome.lucidity
>> *Cc:* user
>> *Date:* Monday, Oct 29, 2018 17:50
>> *Subject:* Re: Unbalanced Kafka consumer consumption
>>
>> The stream is partitioned by key after ingestion at the finest
>> granularity that we can (which is finer than how stream is partitioned when
>> produced to kafka). It is not perfectly balanced but still is not so
>> unbalanced to show this behavior (more balanced than what the lag images
>> show).
>>
>> Anyway, let's assume that the problem is that the stream is so unbalanced
>> that one operator subtask can't handle the ingestion rate. Is it expected
>> then that all the other operators reduce their ingestion rate even if they
>> have resources to spare?
>> there are no windows. If that is the case, is there a way to let operator
>> subtasks process freely even if one of them is causing back pressure
>> upstream?
>>
>> The attached images shows how Kafka lag increases while the throughput is
>> stable until some operator subtasks finish.
>>
>> Thanks,
>>
>> Gerard
>>
>> On Fri, Oct 26, 2018 at 8:09 PM Elias Levy 
>> wrote:
>>
>>> You can always shuffle the stream generated by the Kafka source
>>> (dataStream.shuffle()) to evenly distribute records downstream.
>>>
>>> On Fri, Oct 26, 2018 at 2:08 AM gerardg  wrote:
>>>
 Hi,

 We are experiencing issues scaling our Flink application and we have
 observed that it may be because Kafka message consumption is not balanced
 across partitions. The attached image (lag per partition) shows how only one
 partition consumes messages (the blue one in the back) and it wasn't until
 it finished that the other ones started to consume at a good rate (actually
 the total throughput multiplied by 4 when these started). Also, when those
 ones started to consume, one partition just stopped and accumulated messages
 back again until they finished.

 We don't see any resource (CPU, network, disk..) struggling in our
 cluster
 so we are not sure what could be causing this behavior. I can only
 assume
 that somehow Flink or the Kafka consumer is artificially slowing down
 the
 other partitions. Maybe due to how back pressure is handled?

 <
 http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1007/consumer_max_lag.png>


 Gerard





 --
 Sent from:
 http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

>>>


flink run from savepoint

2018-11-07 Thread Cussac, Franck
Hi,

I'm working with Flink 1.5.0 and I try to run a job from a savepoint. My 
jobmanager is dockerized and I try to run my flink job in another container.
The command:

flink run -m jobmanager:8081 myJar.jar

works fine, but when I try to run a job from a savepoint, I get an Internal Server Error.

Here is my command to run the flink job, and the stack trace:

flink run -m jobmanager:8081 -s file:/tmp/test/savepoint/ myJar.jar
Starting execution of program


The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: Could not retrieve 
the execution result.
   at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:258)
   at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
   at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
   at 
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:498)
   at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
   at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
   at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
   at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781)
   at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275)
   at 
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
   at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
   at 
org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
   at 
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
   at 
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
submit JobGraph.
   at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$5(RestClusterClient.java:357)
   at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
   at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
   at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
   at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
   at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:214)
   at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
   at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
   at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
   at 
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
   at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
   at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
   at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not 
complete the operation. Exception is not retryable.
   at 
java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
   at 
java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
   at 
java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
   at 
java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
   ... 12 more
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: 
Could not complete the operation. Exception is not retryable.
   ... 10 more
Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.runtime.rest.util.RestClientException: [Internal server 

Re: FlinkCEP, circular references and checkpointing failures

2018-11-07 Thread Stefan Richter
Hi,

I think I can already spot the problem: LockableTypeSerializer.duplicate() is 
not properly implemented because it also has to call duplicate() on the element 
serialiser that is passed into the constructor of the new instance. I will open 
an issue and fix the problem.
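For reference, a sketch of what the corrected method could look like inside LockableTypeSerializer (an approximation of the fix described above, not necessarily the exact code that was committed):

// Fragment of LockableTypeSerializer<E>: duplicate the wrapped element serializer too,
// so that a stateful (e.g. Kryo-based) serializer is never shared between threads.
@Override
public LockableTypeSerializer<E> duplicate() {
    TypeSerializer<E> duplicatedElementSerializer = elementSerializer.duplicate();
    // Stateless serializers return themselves from duplicate(), so we can do the same.
    return duplicatedElementSerializer == elementSerializer
            ? this
            : new LockableTypeSerializer<>(duplicatedElementSerializer);
}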

Best,
Stefan

> On 7. Nov 2018, at 17:17, Till Rohrmann  wrote:
> 
> Hi Shailesh,
> 
> could you maybe provide us with an example program which is able to reproduce 
> this problem? This would help the community to better debug the problem. It 
> looks not right and might point towards a bug in Flink. Thanks a lot!
> 
> Cheers,
> Till
> 
> On Tue, Oct 30, 2018 at 9:10 AM Dawid Wysakowicz  > wrote:
> This is some problem with serializing your events using Kryo. I'm adding 
> Gordon to cc, as he was recently working with serializers. He might give you 
> more insights what is going wrong.
> 
> Best,
> 
> Dawid
> 
> On 25/10/2018 05:41, Shailesh Jain wrote:
>> Hi Dawid,
>> 
>> I've upgraded to flink 1.6.1 and rebased my changes against the tag 1.6.1, 
>> the only commit on top of 1.6 is this: 
>> https://github.com/jainshailesh/flink/commit/797e3c4af5b28263fd98fb79daaba97cabf3392c
>>  
>> 
>> 
>> I ran two separate identical jobs (with and without checkpointing enabled), 
>> I'm hitting a ArrayIndexOutOfBoundsException (and sometimes NPE) only when 
>> checkpointing (HDFS backend) is enabled, with the below stack trace.
>> 
>> I did see a similar problem with different operators here 
>> (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/IndexOutOfBoundsException-on-deserialization-after-updating-to-1-6-1-td23933.html
>>  
>> ).
>>  Is this a known issue which is getting addressed?
>> 
>> Any ideas on what could be causing this?
>> 
>> Thanks,
>> Shailesh
>> 
>> 
>> 2018-10-24 17:04:13,365 INFO  org.apache.flink.runtime.taskmanager.Task  
>>- SelectCepOperatorMixedTime (1/1) - 
>> SelectCepOperatorMixedTime (1/1) (3d984b7919342a3886593401088ca2cd) switched 
>> from RUNNING to FAILED.
>> org.apache.flink.util.FlinkRuntimeException: Failure happened in filter 
>> function.
>> at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
>> at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
>> at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
>> at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
>> at 
>> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
>> at 
>> org.apache.flink.cep.operator.AbstractKeyedCEPPatternMixedTimeApproachOperator.processElement(AbstractKeyedCEPPatternMixedTimeApproachOperator.java:45)
>> at 
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>> at 
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.util.WrappingRuntimeException: 
>> java.lang.ArrayIndexOutOfBoundsException: -1
>> at 
>> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:305)
>> at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
>> at 
>> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:301)
>> at 
>> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:291)
>> at 
>> org.apache.flink.cep.nfa.NFA$ConditionContext.getEventsForPattern(NFA.java:811)
>> at 
>> com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:70)
>> at 
>> com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:62)
>> at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
>> at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
>> ... 10 more
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>> at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
>> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
>> at 
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
>> at 
>> 

Re: Queryable state when key is UUID - getting Kyro Exception

2018-11-07 Thread Till Rohrmann
Hi Jayant, could you check that the UUID key on the TM is actually
serialized using a Kryo serializer? You can do this by setting a breakpoint
in the constructor of the `AbstractKeyedStateBackend`.

Cheers,
Till
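For reference, a minimal client-side sketch that pulls together the snippets quoted below; the proxy host and port are assumptions, and the explicit GenericTypeInfo mirrors the generic (Kryo) type information a UUID key falls back to on the job side:

import java.util.UUID;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.queryablestate.client.QueryableStateClient;

public class QueryUuidKeyedState {
    public static void main(String[] args) throws Exception {
        // Hypothetical task manager host and default queryable-state proxy port.
        QueryableStateClient client = new QueryableStateClient("taskmanager-host", 9069);

        // The job keys by UUID, which Flink treats as a generic type (Kryo),
        // so the client must use the matching type information for the key.
        GenericTypeInfo<UUID> keyType = new GenericTypeInfo<>(UUID.class);
        MapStateDescriptor<UUID, String> descriptor =
                new MapStateDescriptor<>("rulePatterns", UUID.class, String.class);

        UUID key = UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d");
        CompletableFuture<MapState<UUID, String>> future =
                client.getKvState(
                        JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"),
                        "rule",
                        key,
                        keyType,
                        descriptor);

        // Block for the result of this single lookup.
        System.out.println(future.get().get(key));
    }
}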

On Tue, Oct 30, 2018 at 9:44 AM bupt_ljy  wrote:

> Hi, Jayant
>
> Your code looks good to me. And I’ve tried the serialize/deserialize
> of Kryo on UUID class, it all looks okay.
>
> I’m not very sure about this problem. Maybe you can write a very
> simple demo to try if it works.
>
>
> Jiayi Liao, Best
>
>  Original Message
> *Sender:* Jayant Ameta
> *Recipient:* bupt_ljy
> *Cc:* Tzu-Li (Gordon) Tai; user >
> *Date:* Monday, Oct 29, 2018 11:53
> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>
> Hi Jiayi,
> Any further help on this?
>
> Jayant Ameta
>
>
> On Fri, Oct 26, 2018 at 9:22 AM Jayant Ameta  wrote:
>
>> MapStateDescriptor<UUID, String> descriptor = new
>> MapStateDescriptor<>("rulePatterns", UUID.class, String.class);
>>
>> Jayant Ameta
>>
>>
>> On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy  wrote:
>>
>>> Hi,
>>>
>>>Can you show us the descriptor in the codes below?
>>>
>>> client.getKvState(JobID.fromHexString(
>>> "c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
>>>
>>> UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
 TypeInformation.of(new TypeHint() {}), descriptor);


>>> Jiayi Liao, Best
>>>
>>>
>>>  Original Message
>>> *Sender:* Jayant Ameta
>>> *Recipient:* bupt_ljy
>>> *Cc:* Tzu-Li (Gordon) Tai; user<
>>> user@flink.apache.org>
>>> *Date:* Friday, Oct 26, 2018 02:26
>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>>>
>>> Also, I haven't provided any custom serializer in my flink job.
>>> Shouldn't the same configuration work for queryable state client?
>>>
>>> Jayant Ameta
>>>
>>>
>>> On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta 
>>> wrote:
>>>
 Hi Gordon,
 Following is the stack trace that I'm getting:

 *Exception in thread "main" java.util.concurrent.ExecutionException:
 java.lang.RuntimeException: Failed request 0.*
 * Caused by: java.lang.RuntimeException: Failed request 0.*
 * Caused by: java.lang.RuntimeException: Error while processing request
 with ID 0. Caused by: com.esotericsoftware.kryo.KryoException: Encountered
 unregistered class ID: -985346241*
 *Serialization trace:*
 *$outer (scala.collection.convert.Wrappers$SeqWrapper)*
 * at
 com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)*
 * at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)*
 * at
 com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)*
 * at
 com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)*
 * at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)*
 * at
 org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)*
 * at
 org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)*
 * at
 org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)*
 * at
 org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)*
 * at
 org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)*
 * at
 org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)*
 * at
 org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)*
 * at
 org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)*
 * at
 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)*
 * at java.util.concurrent.FutureTask.run(FutureTask.java:266)*
 * at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)*
 * at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)*
 * at java.lang.Thread.run(Thread.java:748)*

 I am not using any custom serialize as mentioned by Jiayi.

 Jayant Ameta


 On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy  wrote:

> Hi  Jayant,
>
>   There should be a Serializer parameter in the constructor of the
> StateDescriptor, you should create a new serializer like this:
>
>
>new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)
>
>
>  By the way, can you show us your kryo exception like what Gordon said?
>
>
> Jiayi Liao, Best
>
>
>
>  Original Message
> *Sender:* Tzu-Li (Gordon) Tai
> *Recipient:* Jayant Ameta; bupt_ljy<
> bupt_...@163.com>
> *Cc:* user
> *Date:* Thursday, Oct 

Re: FlinkCEP, circular references and checkpointing failures

2018-11-07 Thread Till Rohrmann
Hi Shailesh,

could you maybe provide us with an example program which is able to
reproduce this problem? This would help the community to better debug the
problem. It looks not right and might point towards a bug in Flink. Thanks
a lot!

Cheers,
Till

On Tue, Oct 30, 2018 at 9:10 AM Dawid Wysakowicz 
wrote:

> This is some problem with serializing your events using Kryo. I'm adding
> Gordon to cc, as he was recently working with serializers. He might give
> you more insights what is going wrong.
>
> Best,
>
> Dawid
> On 25/10/2018 05:41, Shailesh Jain wrote:
>
> Hi Dawid,
>
> I've upgraded to flink 1.6.1 and rebased my changes against the tag 1.6.1,
> the only commit on top of 1.6 is this:
> https://github.com/jainshailesh/flink/commit/797e3c4af5b28263fd98fb79daaba97cabf3392c
>
> I ran two separate identical jobs (with and without checkpointing
> enabled), I'm hitting a ArrayIndexOutOfBoundsException (and sometimes NPE) 
> *only
> when checkpointing (HDFS backend) is enabled*, with the below stack trace.
>
> I did see a similar problem with different operators here (
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/IndexOutOfBoundsException-on-deserialization-after-updating-to-1-6-1-td23933.html).
> Is this a known issue which is getting addressed?
>
> Any ideas on what could be causing this?
>
> Thanks,
> Shailesh
>
>
> 2018-10-24 17:04:13,365 INFO
> org.apache.flink.runtime.taskmanager.Task -
> SelectCepOperatorMixedTime (1/1) - SelectCepOperatorMixedTime (1/1)
> (3d984b7919342a3886593401088ca2cd) switched from RUNNING to FAILED.
> org.apache.flink.util.FlinkRuntimeException: Failure happened in filter
> function.
> at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
> at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
> at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
> at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
> at
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
> at
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternMixedTimeApproachOperator.processElement(AbstractKeyedCEPPatternMixedTimeApproachOperator.java:45)
> at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.WrappingRuntimeException:
> java.lang.ArrayIndexOutOfBoundsException: -1
> at
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:305)
> at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
> at
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:301)
> at
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:291)
> at
> org.apache.flink.cep.nfa.NFA$ConditionContext.getEventsForPattern(NFA.java:811)
> at
> com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:70)
> at
> com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:62)
> at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
> at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
> ... 10 more
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
> at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
> at
> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
> at
> org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:120)
> at
> org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:95)
> at
> org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:113)
> at
> org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:49)
> at
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
> at
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
> at
> org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
> at
> 

Re: InterruptedException when async function is cancelled

2018-11-07 Thread Till Rohrmann
Hi Anil,

as Stephan stated, the fix is not included in Flink 1.4.2 but in the later
version of Flink. Can you upgrade to Flink 1.5.5 or Flink 1.6.2 to check
whether the problem still occurs?

Cheers,
Till

On Sun, Oct 28, 2018 at 8:55 AM Anil  wrote:

> I do see the same error but in a different situation. I'm not cancelling
> the job. Below is my error stack trace. SwiglobeZoneFromLatLong is my UDF
> name. Is this error something ignorable? I'm using flink 1.4.2.
> Thanks in advance.
>
> ```
> {"debug_level":"ERROR","debug_timestamp":"2018-10-28
> 06:40:20,838","debug_thread":"Source: Custom Source -> from: (event,
> proctime) -> select: (proctime, CityFromLatLong1(event.latLong.lat,
> event.latLong.lng) AS $f1, SwiglobeZoneFromLatLong(event.latLong.lat,
> event.latLong.lng) AS $f2, event.listingDataEventStats.serviceableRestCount
> AS $f3) (1/8)","debug_file":"StreamTask.java",
> "debug_line":"326","debug_message":"Could not shut down timer service",
> "job_name": "7bb4d4a4-f85a-429e-92e8-0c887f9b8cbd" }
> java.lang.InterruptedException
> at
>
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2067)
> at
>
> java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1475)
> at
>
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> ```
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: RocksDB checkpointing dir per TM

2018-11-07 Thread Till Rohrmann
This is a very good point Elias. We actually forgot to add these options to
the configuration documentation after a refactoring. I will fix it.

Cheers,
Till

On Fri, Oct 26, 2018 at 8:27 PM Elias Levy 
wrote:

> There is also state.backend.rocksdb.localdir. Oddly, I can find the
> documentation for it in the 1.5 docs, but not in the 1.6 docs.
> The option is still in master, and it is used.
>
> On Fri, Oct 26, 2018 at 3:01 AM Andrey Zagrebin 
> wrote:
>
>> Hi Taher,
>>
>> TMs keep state locally while running, in this case RocksDB files already
>> belong to TM.
>> You can point it to the same NVME disk location on each node, relevant
>> Flink options here are:
>> - io.tmp.dirs
>> - taskmanager.state.local.root-dirs
>> This data is transient and has temporary nature. It does not survive a
>> job failure.
>>
>> The checkpoint is a logical snapshot of the operator state for all
>> involved TMs,
>> so it belongs to the job and usually uploaded to a distributed file
>> system available on all TMs.
>> The location is set in Flink option ‘state.checkpoints.dir'.
>> This way job can restore from it with different set of TMs.
>>
>> Best,
>> Andrey
>>
>> > On 26 Oct 2018, at 08:29, Taher Koitawala 
>> wrote:
>> >
>> > Hi All,
>> >   Our current cluster configuration uses one HDD which is
>> mainly for root and an other NVME disk per node, [1]we want make sure all
>> TMs write their own RocksDB files to the NVME disk only, how do we do that?
>> >
>> > [2] Is it also possible to specify multiple directories per TMs so that
>> we have an even spread when the RocksDB files are written?
>> >
>> > Thanks,
>> > Taher Koitawala
>>
>>


Re: Reply: Flink1.6.0 submit job and got "No content to map due to end-of-input" Error

2018-11-07 Thread Till Rohrmann
Hi Jeroen,

could you maybe share the Flink logs with us to further debug the problem?

Cheers,
Till

On Fri, Oct 26, 2018 at 3:56 PM Jeroen Steggink | knowsy 
wrote:

> Hi,
>
> I'm running Flink 1.5.4 and all dependencies in the job rely on 1.5.4.
> However, I still get this error. According to the JIRA issue it should be
> fixed in 1.5.4 as well.
>
> Since I'm using Apache Beam to build the jar, I can't move to version
> 1.6.x.
>
> What could it be?
>
> Cheers,
>
> Jeroen
> On 07-Sep-18 17:52, Till Rohrmann wrote:
>
> Hi Gongsen,
>
> Chesnay found and fixed the problem:
> https://issues.apache.org/jira/browse/FLINK-10293.
>
> Cheers,
> Till
>
>
> On Wed, Sep 5, 2018 at 10:00 AM 潘 功森  wrote:
>
>> Hi  Chesney,
>>
>>I am sure both the client and cluster were upgraded to 1.6.0, because if I
>> used “./flink run XXX.jar” to submit a job it works fine. You can see the UI
>> below.
>>
>>But when I used createRemoteEnvironment locally, it
>> failed. It confused me a lot.
>>
>>
>>
>>
>> --
>> *发件人:* Chesnay Schepler 
>> *发送时间:* Wednesday, September 5, 2018 3:23:23 PM
>> *收件人:* 潘 功森; vino yang; d...@flink.apache.org
>> *抄送:* user
>> *主题:* Re: 答复: Flink1.6.0 submit job and got "No content to map due to
>> end-of-input" Error
>>
>> Did you upgrade both the client and cluster to 1.6.0? The server returned
>> a completely empty response which shouldn't be possible if it runs 1.6.0.
>>
>> On 05.09.2018 07:27, 潘 功森 wrote:
>>
>> Hi  Vino,
>>
>>
>>
>> Below are the dependencies I used, please have a look.
>>
>>
>>
>> I found it also included flink-connector-kafka-0.10_2.11-1.6.0.jar and
>> flink-connector-kafka-0.9_2.11-1.6.0.jar, and I don’t know if it has any
>> effect?
>>
>>
>>
>> yours,
>>
>> Gongsen
>>
>>
>>
>> Sent from the Mail app for Windows 10
>>
>>
>> --
>> *From:* vino yang
>> *Sent:* Wednesday, September 5, 2018 10:35:59 AM
>> *To:* d...@flink.apache.org
>> *Cc:* user
>> *Subject:* Re: Flink1.6.0 submit job and got "No content to map due to
>> end-of-input" Error
>>
>> Hi Pangongsen,
>>
>> Do you upgrade the Flink-related dependencies you use at the same time?
>> In other words, is the dependency consistent with the flink version?
>>
>> Thanks, vino.
>>
>> ? ??  于2018年9月4日周二 下午10:07写道:
>>
>>> Hi all,
>>>  I use the below way to submit a jar to Flink:
>>>
>>> StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.createRemoteEnvironment(config.clusterIp,
>>>
>>> config.clusterPort,
>>>
>>> config.clusterFlinkJar);
>>>
>>>
>>> I used Flink 1.3.2 before, and it worked fine. But I upgraded it to
>>> 1.6.0, and I got the error below:
>>>
>>> 2018-09-04 21:38:32.039 [ERROR] [flink-rest-client-netty-19-1]
>>> org.apache.flink.runtime.rest.RestClient - Unexpected plain-text response:
>>>
>>> 2018-09-04 21:38:32.137 [ERROR] [flink-rest-client-netty-18-1]
>>> org.apache.flink.runtime.rest.RestClient - Response was not valid JSON.
>>>
>>> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException:
>>> No content to map due to end-of-input
>>>
>>>
>>> Could you give me some advice to fix it?
>>>
>>> yours,
>>> Gongsen
>>>
>>>
>>
>


Re: Flink Task Allocation on Nodes

2018-11-07 Thread Till Rohrmann
Hi Sayat,

at the moment it is not possible to control the scheduling behaviour of
Flink. In the future, we plan to add some kind of hints which controls
whether tasks of a job get spread out or will be packed on as few nodes as
possible.

Cheers,
Till

On Fri, Oct 26, 2018 at 2:06 PM Kien Truong  wrote:

> Hi,
>
> There are couple of reasons:
>
> - Easier resource allocation and isolation: one faulty job doesn't affect
> another.
>
> - Mix and match of Flink version: you can leave the old stable jobs run
> with the old Flink version, and use the latest version of Flink for new
> jobs.
>
> - Faster metrics collection: Flink generates a lots of metrics, by keeping
> each cluster small, our Prometheus instance can scrape their metrics a lot
> faster.
>
>
> Regards,
>
> Kien
>
>
> On 10/26/2018 2:50 PM, Sayat Satybaldiyev wrote:
>
Thanks for the advice, Kien. Could you please share more details on why it's
best to allocate a separate cluster for each job?
>
> On Wed, Oct 24, 2018 at 3:23 PM Kien Truong 
> wrote:
>
>> Hi,
>>
>> You can have multiple Flink clusters on the same set of physical
>> machines. In our experience, it's best to deploy a separate Flink
>> cluster for each job and adjust the resources accordingly.
>>
>> Best regards,
>> Kien
>>
>> On Oct 24, 2018 at 20:17, >
>> wrote:
>>
>> The Flink cluster is standalone with HA configuration. It has 6 task managers
>> and each has 8 slots. Overall, 48 slots for the cluster.
>>
>> >>If your cluster only has one task manager with one slot on each node,
>> then the job should be spread evenly.
>> Agreed, this would solve the issue. However, the cluster is running other
>> jobs, and in that case there won't be hardware resources for them.
>>
>> On Wed, Oct 24, 2018 at 2:20 PM Kien Truong 
>> wrote:
>>
>>> Hi,
>>>
>>> How are your task managers deployed?
>>>
>>> If your cluster only has one task manager with one slot on each node,
>>> then the job should be spread evenly.
>>>
>>> Regards,
>>>
>>> Kien
>>>
>>> On 10/24/2018 4:35 PM, Sayat Satybaldiyev wrote:
>>> > Is there any way to tell Flink not to allocate all parallel tasks
>>> > on one node?  We have a stateless Flink job that reads from a 10-partition
>>> > topic and has a parallelism of 6. The Flink job manager
>>> > allocates all 6 parallel operators to one machine, causing all traffic
>>> > from Kafka to be handled by only one machine. We have a cluster of 6 nodes,
>>> > and ideally one parallel operator would be spread to each machine. Is there a
>>> > way to do that in Flink?
>>>
>>


Re: Run a Flink job: REST/ binary client

2018-11-07 Thread Flavio Pompermaier
After a painful migration to Flink 1.6.2 we were able to run one of the
jobs.
Unfortunately we faced the same behaviour: all the code after the first
env.execute() is not executed if the job is called from the REST services or
from the web UI, while everything works fine if running the job using
'bin/flink run' from a shell.

Any solution to this?

On Tue, Nov 6, 2018 at 4:55 PM Flavio Pompermaier 
wrote:

> Hi to all,
> I'm using Flink 1.3.2. If executing a job using bin/flink run everything
> goes well.
> If executing via the job manager's REST service (/jars/:jarid/run), the job
> writes to the sink but fails to return from env.execute(), and all the code
> after it is not executed.
>
> Is this a known issue? Was it resolved in Flink 1.6.2?
>
> Best,
> Flavio
>


Re: HA jobmanagers redirect to ip address of leader instead of hostname

2018-11-07 Thread Till Rohrmann
Hi Jeroen,

this sounds like a bug in Flink, in that we sometimes return IP addresses
instead of hostnames. Could you tell me which Flink version you are using?
In the current version, the redirect address and the address retrieved from
ZooKeeper should actually be the same.

In the future, we plan to remove the redirect message and simply forward
the request to the current leader. This should hopefully avoid these kinds
of problems.

Cheers,
Till

On Fri, Oct 26, 2018 at 1:40 PM Jeroen Steggink | knowsy 
wrote:

> Hi,
>
> I'm having some trouble with Flink jobmanagers in an HA setup within
> OpenShift.
>
> I have three jobmanagers, a Zookeeper cluster and a loadbalancer
> (Openshift/Kubernetes Route) for the web ui / rest server on the
> jobmanagers. Everything works fine, as long as the loadbalancer connects
> to the leader. However, when the leader changes and the loadbalancer
> connects to a non-leader, the jobmanager redirects to a leader using the
> ip address of the host. Since the routing in our network is done using
> hostnames, it doesn't know how to find the node using the ip address and
> results in a timeout.
>
> So I have a few questions:
> 1. Why is Flink using the IP addresses instead of the hostnames which are
> configured in the config? Other times it does use the hostname, like the
> info sent to ZooKeeper.
> 2. Is there another way of coping with connections to non-leaders
> instead of redirects? Maybe proxying through a non-leader to the leader?
>
> Cheers,
> Jeroen
>
>


Re: Counting DataSet in DataFlow

2018-11-07 Thread bastien dine
Hi Fabian,
Thanks for the response, I am going to use the second solution !
Regards,
Bastien

--

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


Le mer. 7 nov. 2018 à 14:16, Fabian Hueske  a écrit :

> Another option for certain tasks is to work with broadcast variables [1].
> The value could be used to configure two filters.
>
> DataSet input = 
> DataSet count = input.map(-> 1L).sum()
> DataSet input.filter(if cnt == 0).withBroadcastSet("cnt",
> count).doSomething
> DataSet input.filter(if cnt != 0).withBroadcastSet("cnt",
> count).doSomethingElse
>
> Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/#broadcast-variables
>
>
> Am Mi., 7. Nov. 2018 um 14:10 Uhr schrieb Fabian Hueske  >:
>
>> Hi,
>>
>> Counting always requires a job to be executed.
>> Not sure if this is what you want to do, but if you want to prevent
>> getting an empty result due to an empty cross input, you can use a
>> mapPartition() with parallelism 1 to emit a special record, in case the
>> MapPartitionFunction didn't see any data.
>>
>> Best, Fabian
>>
>> Am Mi., 7. Nov. 2018 um 14:02 Uhr schrieb bastien dine <
>> bastien.d...@gmail.com>:
>>
>>> Hello,
>>>
>>> I would like a way to count a dataset to check whether it is empty or
>>> not. But .count() triggers an execution, and I do not want a separate job
>>> execution plan, as this will trigger multiple readings.
>>> I would like to have something like:
>>>
>>> Source -> map -> count -> if 0 -> do something
>>>                           if not -> do something else
>>>
>>>
>>> More concretely, I would like to check if one of my datasets is empty before
>>> doing a cross operation.
>>>
>>> Thanks,
>>> Bastien
>>>
>>>
>>>


Re: RichInputFormat working differently in eclipse and in flink cluster

2018-11-07 Thread Till Rohrmann
Hi Teena,

which Flink version are you using? Have you tried whether this happens with
the latest release 1.6.2 as well?

Cheers,
Till

On Fri, Oct 26, 2018 at 1:17 PM Teena Kappen // BPRISE <
teena.kap...@bprise.com> wrote:

> Hi all,
>
>
>
> I have implemented a RichInputFormat for reading the result of aggregation
> queries in Elasticsearch. There are around 10 buckets, which are of
> type JSON array. Note: this is a one-time response.
>
>
>
> My idea here is to iterate these arrays in parallel. Here is the pseudo
> code.
>
>
>
> public void configure(Configuration parameters) {
>
> System.out.println("configure");
>
> }
>
>
>
> public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
>
> }
>
>
>
> public ResponseInputSplit[] createInputSplits(int minNumSplits){
>
> System.out.println("createInputSplits");
>
>
>
> //read from elastic
>
> // add buckets to array
>
> }
>
>
>
> public InputSplitAssigner getInputSplitAssigner(ResponseInputSplit[]
> inputSplits) {
>
> //this is default
>
> System.out.println("getInputSplitAssigner");
>
> return new DefaultInputSplitAssigner(inputSplits);
>
> }
>
>
>
> public void open(ResponseInputSplit split) {
>
> //read buckets
>
> }
>
>
>
> public boolean reachedEnd(){
>
> System.out.println("reachedEnd");
>
> }
>
>
>
> public Bounce nextRecord(Bounce reuse) {
>
> }
>
>
>
> public void close(){
>
> }
>
>
>
> // my main method,
>
> ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
>
>
> DataSet<Bounce> bounce_data_set = env.createInput(new
> MyInputDataSetInputFormat());
>
>
>
> When running in Eclipse, it executes createInputSplits and the results
> look fine. Logs are given below.
>
> Output is:
>
> configure
>
> Connected to JobManager at
> Actor[akka://flink/user/jobmanager_1#-1685591882] with leader session id...
>
> configure
>
> createInputSplits
>
>
>
> When submitting the job to the Flink cluster, it doesn’t execute the ‘configure’ and
> ‘createInputSplits’ methods. Instead it goes directly to the nextRecord
> function. Logs are given below.
>
> Output is:
>
> Starting execution of program
>
> configure
>
> Submitting job with JobID: 47526660fc9a463cad4bee04a4ba99d9. Waiting for
> job completion.
>
> Connected to JobManager at Actor[akka.tcp://flink@:xxx
> /user/jobmanager#1219973491] with leader session id...
>
> 10/26/2018 15:05:57 Job execution switched to status RUNNING.
>
> 10/26/2018 15:05:57 DataSource (at
> createInput(ExecutionEnvironment.java:547) ())(1/1) switched to SCHEDULED
>
> 10/26/2018 15:05:57 DataSource (at
> createInput(ExecutionEnvironment.java:547) ())(1/1) switched to DEPLOYING
>
> 10/26/2018 15:06:00 DataSource (at
> createInput(ExecutionEnvironment.java:547) ())(1/1) switched to RUNNING
>
> 10/26/2018 15:06:00 DataSource (at
> createInput(ExecutionEnvironment.java:547) ())(1/1) switched to FAILED
>
> java.lang.NullPointerException
>
>at com.xxx.test.
> MyInputDataSetInputFormat.nextRecord(MyInputDataSetInputFormat.java:143)
>
>
>
> Regards,
>
> Teena
>
>
>


State processing and testing utilities (Bravo)

2018-11-07 Thread Gyula Fóra
Hey all!

I just wanted to give you a quick update on the bravo project.

Bravo contains a bunch of useful utilities for processing the
checkpoint/savepoint state of a streaming job as Flink DataSets (batch).
The end goal of the project is to be contributed to Flink once we are happy
with it, but for that we need support from users to try it and work with it
first :)

Over the past weeks I have worked on a bunch of useful fixes and features
to make it nice, and we have already had a number of people trying it
successfully to analyze the state of streaming applications.

I suggest you all give it a look and try it if it seems useful:
https://github.com/king/bravo

The library is also accessible from Maven central now :)

The bravo project also contains a set of super nice (according to me)
testing utilities for testing stateful pipelines end-to-end including
validating the state itself. These utilities are encapsulated in the
bravo-test-utils subproject.

Cheers
Gyula


Re: Counting DataSet in DataFlow

2018-11-07 Thread Fabian Hueske
Another option for certain tasks is to work with broadcast variables [1].
The value could be used to configure two filters.

DataSet input = 
DataSet count = input.map(-> 1L).sum()
DataSet input.filter(if cnt == 0).withBroadcastSet("cnt",
count).doSomething
DataSet input.filter(if cnt != 0).withBroadcastSet("cnt",
count).doSomethingElse

Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/#broadcast-variables
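
For illustration, a minimal runnable sketch of this broadcast-variable pattern (the
fromElements() stand-in source, the class name, and the "cnt" broadcast name are
assumptions made for the example, not taken from the original code):

import java.util.List;

import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class BroadcastCountSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> input = env.fromElements("a", "b", "c"); // stand-in for the real source

        // Reduce the dataset to a single element holding its record count.
        DataSet<Long> count = input.map(value -> 1L).reduce((a, b) -> a + b);

        // Broadcast the count to a filter, so no separate job execution is needed.
        DataSet<String> nonEmptyBranch = input
                .filter(new RichFilterFunction<String>() {
                    private boolean inputNonEmpty;

                    @Override
                    public void open(Configuration parameters) {
                        // the broadcast dataset arrives as a List; an empty input means an empty list
                        List<Long> cnt = getRuntimeContext().getBroadcastVariable("cnt");
                        inputNonEmpty = !cnt.isEmpty() && cnt.get(0) > 0L;
                    }

                    @Override
                    public boolean filter(String value) {
                        return inputNonEmpty; // keep records only if the input was non-empty
                    }
                })
                .withBroadcastSet(count, "cnt"); // note: the API takes (dataSetToBroadcast, name)

        nonEmptyBranch.print(); // print() triggers the single execution for this sketch
    }
}

A second filter with the opposite condition, fed by the same broadcast set, would cover
the "empty input" branch.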


Am Mi., 7. Nov. 2018 um 14:10 Uhr schrieb Fabian Hueske :

> Hi,
>
> Counting always requires a job to be executed.
> Not sure if this is what you want to do, but if you want to prevent getting
> an empty result due to an empty cross input, you can use a mapPartition()
> with parallelism 1 to emit a special record, in case the
> MapPartitionFunction didn't see any data.
>
> Best, Fabian
>
> Am Mi., 7. Nov. 2018 um 14:02 Uhr schrieb bastien dine <
> bastien.d...@gmail.com>:
>
>> Hello,
>>
>> I would like a way to count a dataset to check whether it is empty or not.
>> But .count() triggers an execution, and I do not want a separate job
>> execution plan, as this will trigger multiple readings.
>> I would like to have something like:
>>
>> Source -> map -> count -> if 0 -> do something
>>                           if not -> do something else
>>
>>
>> More concretely, I would like to check if one of my datasets is empty before
>> doing a cross operation.
>>
>> Thanks,
>> Bastien
>>
>>
>>


How to run Flink 1.6 job cluster in "standalone" mode?

2018-11-07 Thread Hao Sun
"Standalone" here I mean job-mananger + taskmanager on the same JVM. I have
an issue to debug on our K8S environment, I can not reproduce it in local
docker env or Intellij. If JM and TM are running in different VMs, it makes
things harder to debug.

Or is there a way to debug a job running with the JM and TM on different VMs?
Is reverting to a session cluster the only way to get the JM and TM on the same VM?


Re: Counting DataSet in DataFlow

2018-11-07 Thread Fabian Hueske
Hi,

Counting always requires a job to be executed.
Not sure if this is what you want to do, but if you want to prevent getting
an empty result due to an empty cross input, you can use a mapPartition()
with parallelism 1 to emit a special record, in case the
MapPartitionFunction didn't see any data.
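
For illustration, a minimal sketch of that trick (the String element type and the
"__EMPTY__" marker value are assumptions made for the example):

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;

public class EmptyInputMarkerSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> input = env.fromElements("a", "b", "c"); // stand-in for the real source

        // With parallelism 1, the single MapPartitionFunction instance sees the whole dataset,
        // so it can forward every record and append a marker only if it saw no data at all.
        DataSet<String> guarded = input
                .mapPartition(new MapPartitionFunction<String, String>() {
                    @Override
                    public void mapPartition(Iterable<String> values, Collector<String> out) {
                        boolean sawData = false;
                        for (String value : values) {
                            sawData = true;
                            out.collect(value); // pass the real records through unchanged
                        }
                        if (!sawData) {
                            out.collect("__EMPTY__"); // special record signalling empty input
                        }
                    }
                })
                .setParallelism(1);

        guarded.print(); // triggers execution for this sketch
    }
}

The downstream cross input is then never empty; it only has to treat the marker record
specially.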

Best, Fabian

Am Mi., 7. Nov. 2018 um 14:02 Uhr schrieb bastien dine <
bastien.d...@gmail.com>:

> Hello,
>
> I would like a way to count a dataset to check whether it is empty or not.
> But .count() triggers an execution, and I do not want a separate job
> execution plan, as this will trigger multiple readings.
> I would like to have something like:
>
> Source -> map -> count -> if 0 -> do something
>                           if not -> do something else
>
>
> More concretely, I would like to check if one of my datasets is empty before
> doing a cross operation.
>
> Thanks,
> Bastien
>
>
>


Counting DataSet in DataFlow

2018-11-07 Thread bastien dine
Hello,

I would like a way to count a dataset to check whether it is empty or not.
But .count() triggers an execution, and I do not want a separate job
execution plan, as this will trigger multiple readings.
I would like to have something like:

Source -> map -> count -> if 0 -> do something
                          if not -> do something else


More concretely, I would like to check if one of my datasets is empty before
doing a cross operation.

Thanks,
Bastien


Error after upgrading to Flink 1.6.2

2018-11-07 Thread Flavio Pompermaier
Hi to all,
we tried to upgrade our jobs to Flink 1.6.2 but now we get the following
error (we saw a similar issue with Spark that was caused by different Java
versions on the cluster servers, so we checked them and they are all at the
same version - oracle-8-191):

Caused by: org.apache.flink.runtime.client.JobExecutionException:
Cannot initialize task 'DataSink (Parquet write:
hdfs:/rivela/1/1/0_staging/parquet)': Deserializing the OutputFormat
(org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@54a4c7c8)
failed: unread block data
at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
at 
org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1151)
at 
org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1131)
at 
org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:294)
at 
org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:157)
... 10 more
Caused by: java.lang.Exception: Deserializing the OutputFormat
(org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@54a4c7c8)
failed: unread block data
at 
org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
... 15 more
Caused by: java.lang.IllegalStateException: unread block data
at 
java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2783)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1605)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:502)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:489)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:477)
at 
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:438)
at 
org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
at 
org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
... 16 more


Has anyone faced this problem before? How can we try to solve it?
Best,
Flavio


RE: Stopping a streaming app from its own code : behaviour change from 1.3 to 1.6

2018-11-07 Thread LINZ, Arnaud
FYI, the code below terminates with version 1.6.0 but does not terminate with 1.6.1. I suspect 
it’s a bug rather than a new feature.

From: LINZ, Arnaud
Sent: Wednesday, November 7, 2018 11:14 AM
To: 'user' 
Subject: RE: Stopping a streaming app from its own code : behaviour change from 
1.3 to 1.6


Hello,



This has nothing to do with HA. All my unit tests involving a streaming app now 
fail with “infinite execution”.
This simple code never ends:
@Test
public void testFlink162() throws Exception {
    // get the execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // get input data
    final DataStreamSource<String> text = env.addSource(new SourceFunction<String>() {
        @Override
        public void run(final SourceContext<String> ctx) throws Exception {
            for (int count = 0; count < 5; count++) {
                ctx.collect(String.valueOf(count));
            }
        }

        @Override
        public void cancel() {
        }
    });
    text.print().setParallelism(1);
    env.execute("Simple Test");
    // Never ends!
}
Is this really a new feature or a critical bug?
In the log, the task executor is stopped
[2018-11-07 11:11:23,608] INFO Stopped TaskExecutor 
akka://flink/user/taskmanager_0. 
(org.apache.flink.runtime.taskexecutor.TaskExecutor:330)
But execute() does not return.

Arnaud

Log is :
[2018-11-07 11:11:11,432] INFO Running job on local embedded Flink mini cluster 
(org.apache.flink.streaming.api.environment.LocalStreamEnvironment:114)
[2018-11-07 11:11:11,449] INFO Starting Flink Mini Cluster 
(org.apache.flink.runtime.minicluster.MiniCluster:227)
[2018-11-07 11:11:11,636] INFO Starting Metrics Registry 
(org.apache.flink.runtime.minicluster.MiniCluster:238)
[2018-11-07 11:11:11,652] INFO No metrics reporter configured, no metrics will 
be exposed/reported. (org.apache.flink.runtime.metrics.MetricRegistryImpl:113)
[2018-11-07 11:11:11,703] INFO Starting RPC Service(s) 
(org.apache.flink.runtime.minicluster.MiniCluster:249)
[2018-11-07 11:11:12,244] INFO Slf4jLogger started 
(akka.event.slf4j.Slf4jLogger:92)
[2018-11-07 11:11:12,264] INFO Starting high-availability services 
(org.apache.flink.runtime.minicluster.MiniCluster:290)
[2018-11-07 11:11:12,367] INFO Created BLOB server storage directory 
C:\Users\alinz\AppData\Local\Temp\blobStore-fd104a2d-caaf-4740-a762-d292cb2ed108
 (org.apache.flink.runtime.blob.BlobServer:141)
[2018-11-07 11:11:12,379] INFO Started BLOB server at 0.0.0.0:64504 - max 
concurrent requests: 50 - max backlog: 1000 
(org.apache.flink.runtime.blob.BlobServer:203)
[2018-11-07 11:11:12,380] INFO Starting ResourceManger 
(org.apache.flink.runtime.minicluster.MiniCluster:301)
[2018-11-07 11:11:12,409] INFO Starting RPC endpoint for 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at 
akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864 . 
(org.apache.flink.runtime.rpc.akka.AkkaRpcService:224)
[2018-11-07 11:11:12,432] INFO Proposing leadership to contender 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager@5b1f29fa
 @ akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864 
(org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:274)
[2018-11-07 11:11:12,439] INFO ResourceManager 
akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864 was 
granted leadership with fencing token 86394924fb97bad612b67f526f84406f 
(org.apache.flink.runtime.resourcemanager.StandaloneResourceManager:953)
[2018-11-07 11:11:12,440] INFO Starting the SlotManager. 
(org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:185)
[2018-11-07 11:11:12,442] INFO Received confirmation of leadership for leader 
akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864 , 
session=12b67f52-6f84-406f-8639-4924fb97bad6 
(org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:229)
[2018-11-07 11:11:12,452] INFO Created BLOB cache storage directory 
C:\Users\alinz\AppData\Local\Temp\blobStore-b2618f73-5ec6-4fdf-ad43-1da6d6c19a4f
 (org.apache.flink.runtime.blob.PermanentBlobCache:107)
[2018-11-07 11:11:12,454] INFO Created BLOB cache storage directory 
C:\Users\alinz\AppData\Local\Temp\blobStore-df6c61d2-3c51-4335-a96e-6b00c82e4d90
 (org.apache.flink.runtime.blob.TransientBlobCache:107)
[2018-11-07 11:11:12,454] INFO Starting 1 TaskManger(s) 
(org.apache.flink.runtime.minicluster.MiniCluster:316)
[2018-11-07 11:11:12,460] INFO Starting TaskManager with ResourceID: 
e84ce076-ec5e-48d6-90dc-4b18ba7c5757 
(org.apache.flink.runtime.taskexecutor.TaskManagerRunner:352)
[2018-11-07 11:11:12,531] INFO Temporary file directory 
'C:\Users\alinz\AppData\Local\Temp': total 476 GB, usable 149 GB (31,30% 
usable) 

RE: Stopping a streaming app from its own code : behaviour change from 1.3 to 1.6

2018-11-07 Thread LINZ, Arnaud
Hello,



This has nothing to do with HA. All my unit tests involving a streaming app now 
fail with “infinite execution”.
This simple code never ends:
@Test
public void testFlink163() throws Exception {
    // get the execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // get input data
    final DataStreamSource<String> text = env.addSource(new SourceFunction<String>() {
        @Override
        public void run(final SourceContext<String> ctx) throws Exception {
            for (int count = 0; count < 5; count++) {
                ctx.collect(String.valueOf(count));
            }
        }

        @Override
        public void cancel() {
        }
    });
    text.print().setParallelism(1);
    env.execute("Simple Test");
    // Never ends!
}
Is this really a new feature or a critical bug?
In the log, the task executor is stopped
[2018-11-07 11:11:23,608] INFO Stopped TaskExecutor 
akka://flink/user/taskmanager_0. 
(org.apache.flink.runtime.taskexecutor.TaskExecutor:330)
But execute() does not return.

Arnaud

Log is :
[2018-11-07 11:11:11,432] INFO Running job on local embedded Flink mini cluster 
(org.apache.flink.streaming.api.environment.LocalStreamEnvironment:114)
[2018-11-07 11:11:11,449] INFO Starting Flink Mini Cluster 
(org.apache.flink.runtime.minicluster.MiniCluster:227)
[2018-11-07 11:11:11,636] INFO Starting Metrics Registry 
(org.apache.flink.runtime.minicluster.MiniCluster:238)
[2018-11-07 11:11:11,652] INFO No metrics reporter configured, no metrics will 
be exposed/reported. (org.apache.flink.runtime.metrics.MetricRegistryImpl:113)
[2018-11-07 11:11:11,703] INFO Starting RPC Service(s) 
(org.apache.flink.runtime.minicluster.MiniCluster:249)
[2018-11-07 11:11:12,244] INFO Slf4jLogger started 
(akka.event.slf4j.Slf4jLogger:92)
[2018-11-07 11:11:12,264] INFO Starting high-availability services 
(org.apache.flink.runtime.minicluster.MiniCluster:290)
[2018-11-07 11:11:12,367] INFO Created BLOB server storage directory 
C:\Users\alinz\AppData\Local\Temp\blobStore-fd104a2d-caaf-4740-a762-d292cb2ed108
 (org.apache.flink.runtime.blob.BlobServer:141)
[2018-11-07 11:11:12,379] INFO Started BLOB server at 0.0.0.0:64504 - max 
concurrent requests: 50 - max backlog: 1000 
(org.apache.flink.runtime.blob.BlobServer:203)
[2018-11-07 11:11:12,380] INFO Starting ResourceManger 
(org.apache.flink.runtime.minicluster.MiniCluster:301)
[2018-11-07 11:11:12,409] INFO Starting RPC endpoint for 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at 
akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864 . 
(org.apache.flink.runtime.rpc.akka.AkkaRpcService:224)
[2018-11-07 11:11:12,432] INFO Proposing leadership to contender 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager@5b1f29fa @ 
akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864 
(org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:274)
[2018-11-07 11:11:12,439] INFO ResourceManager 
akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864 was 
granted leadership with fencing token 86394924fb97bad612b67f526f84406f 
(org.apache.flink.runtime.resourcemanager.StandaloneResourceManager:953)
[2018-11-07 11:11:12,440] INFO Starting the SlotManager. 
(org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:185)
[2018-11-07 11:11:12,442] INFO Received confirmation of leadership for leader 
akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864 , 
session=12b67f52-6f84-406f-8639-4924fb97bad6 
(org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:229)
[2018-11-07 11:11:12,452] INFO Created BLOB cache storage directory 
C:\Users\alinz\AppData\Local\Temp\blobStore-b2618f73-5ec6-4fdf-ad43-1da6d6c19a4f
 (org.apache.flink.runtime.blob.PermanentBlobCache:107)
[2018-11-07 11:11:12,454] INFO Created BLOB cache storage directory 
C:\Users\alinz\AppData\Local\Temp\blobStore-df6c61d2-3c51-4335-a96e-6b00c82e4d90
 (org.apache.flink.runtime.blob.TransientBlobCache:107)
[2018-11-07 11:11:12,454] INFO Starting 1 TaskManger(s) 
(org.apache.flink.runtime.minicluster.MiniCluster:316)
[2018-11-07 11:11:12,460] INFO Starting TaskManager with ResourceID: 
e84ce076-ec5e-48d6-90dc-4b18ba7c5757 
(org.apache.flink.runtime.taskexecutor.TaskManagerRunner:352)
[2018-11-07 11:11:12,531] INFO Temporary file directory 
'C:\Users\alinz\AppData\Local\Temp': total 476 GB, usable 149 GB (31,30% 
usable) (org.apache.flink.runtime.taskexecutor.TaskManagerServices:720)
[2018-11-07 11:11:12,757] INFO Allocated 396 MB for network buffer pool (number 
of memory segments: 12686, bytes per segment: 32768). 
(org.apache.flink.runtime.io.network.buffer.NetworkBufferPool:114)
[2018-11-07 11:11:12,765] INFO Could not load Queryable State Client Proxy. 
Probable reason: flink-queryable-state-runtime is not in the classpath. To 

flink job restarts when flink cluster restarts?

2018-11-07 Thread Chang Liu
Hi,

I have a question regarding whether the currently running jobs will restart if I 
stop and start the Flink cluster.

1. Let’s say I just have a standalone one-node cluster.
2. I have several Flink jobs already running on the cluster.
3. If I do a bin/cluster-stop.sh and then do a bin/cluster-start.sh, will the 
previously running jobs restart again?

OR

Before I do bin/cluster-stop.sh, I have to take a savepoint for each of the jobs.
After bin/cluster-start.sh is finished, I have to start each job I want to 
restart from the savepoint triggered before.

Many thanks in advance :)

Best regards/祝好,

Chang Liu 刘畅




Re: Always trigger calculation of a tumble window in Flink SQL

2018-11-07 Thread Piotr Nowojski
Hi,

You would have to register timers (probably based on event time).

Your operator would be a vastly simplified window operator, where for a given 
window you keep the emitted record from your SQL, something like:

MapState<Long, Row> emittedRecords; // map: window start -> emitted record

When you process elements, you just put them into this map. To emit the 
results, you register event-time timers, and when a timer fires, you search 
in the map for the latest record matching the timer's event time (there might 
be many elements in the map, some of them older and some of them newer than the 
fired timer). You can/should also prune the state in the same timer - for 
example, after emitting the result, drop all of the windows older than the timer.
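
For illustration only, a rough sketch of such an operator as a KeyedProcessFunction (the
String key, the Row payload, the fixed window size, and the class name are assumptions
made for the example; it assumes the SQL result was converted to a keyed DataStream with
event-time timestamps assigned to every element):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class RepeatingWindowEmitter extends KeyedProcessFunction<String, Row, Row> {

    private static final long WINDOW_SIZE = 60_000L; // assumed 1-minute tumble windows

    private transient MapState<Long, Row> emittedRecords; // window start -> record emitted by the SQL query

    @Override
    public void open(Configuration parameters) {
        emittedRecords = getRuntimeContext().getMapState(
                new MapStateDescriptor<Long, Row>("emittedRecords", Long.class, Row.class));
    }

    @Override
    public void processElement(Row value, Context ctx, Collector<Row> out) throws Exception {
        long windowStart = ctx.timestamp() - (ctx.timestamp() % WINDOW_SIZE);
        emittedRecords.put(windowStart, value);
        // make sure a timer fires at the end of this element's window
        ctx.timerService().registerEventTimeTimer(windowStart + WINDOW_SIZE - 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Row> out) throws Exception {
        // emit the latest record that is not newer than the fired timer
        long latestStart = Long.MIN_VALUE;
        Row latest = null;
        for (Map.Entry<Long, Row> entry : emittedRecords.entries()) {
            if (entry.getKey() <= timestamp && entry.getKey() > latestStart) {
                latestStart = entry.getKey();
                latest = entry.getValue();
            }
        }
        if (latest != null) {
            out.collect(latest);
        }

        // prune all windows older than the one just emitted
        List<Long> toRemove = new ArrayList<>();
        for (Long windowStart : emittedRecords.keys()) {
            if (windowStart < latestStart) {
                toRemove.add(windowStart);
            }
        }
        for (Long windowStart : toRemove) {
            emittedRecords.remove(windowStart);
        }

        // re-register a timer for the next window so a result is emitted even without new input
        ctx.timerService().registerEventTimeTimer(timestamp + WINDOW_SIZE);
    }
}

Note that event-time timers only fire when the watermark advances, so a completely idle
source would still hold back the periodic output.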

Piotrek

> On 7 Nov 2018, at 02:55, yinhua.dai  wrote:
> 
> Hi Piotr,
> 
> Can you elaborate more on the solution with the custom operator?
> I don't think there will be any records from the SQL query if no input data
> is coming in within the time window, even if we convert the result to a
> datastream.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Flink Kafka to BucketingSink to S3 - S3Exception

2018-11-07 Thread Flink Developer
Thank you Addison and Ravi for the detailed info.

Hi Addison, it sounds like StreamingFileSink is promising and will be available 
in Flink 1.7. From the mailing list, it looks like Flink 1.7 RC is now 
available for use.

Some questions for you... in your use case, is your source Kafka and is the 
Flink app running high parallelism (>300)? Are you able to run with 
StreamingFileSink to S3 for multiple days without failure? When using 
StreamingFileSink, what type of configuration did you use? Thank you.

‐‐‐ Original Message ‐‐‐
On Monday, November 5, 2018 8:23 AM, Addison Higham  wrote:

> Hi there,
>
> This is going to be a bit of a long post, but I think there has been a lot of 
> confusion around S3, so I am going to go over everything I know in hopes that 
> it helps.
>
> As mentioned by Rafi, the BucketingSink does not work for file systems like 
> S3, as the bucketing sink makes some assumptions that are incorrect for 
> eventually consistent file systems as well as for file systems that don't 
> have certain atomic operations, which leads to inconsistency (see 
> https://issues.apache.org/jira/browse/FLINK-6306). This has been poorly 
> documented in the docs, so I think a lot of people have tried to use S3 only 
> to face issues. There is a plan for moving forward however.
>
> However, that plan does NOT include "fixing" the BucketingSink. Instead, a 
> new API - the StreamingFileSink, first introduced in Flink 1.6 - is the 
> replacement for BucketingSink and is planned to (eventually) fix the problem. 
> The first release of StreamingFileSink in the 1.6 branch didn't support S3. 
> This was originally seen as a bug that would be fixed in Flink 1.6.2, 
> however, once all the work was done to add support for S3, it seems it was 
> decided not to backport the fix (see this thread: 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/FLINK-9752-s3-recoverable-writer-not-actually-fixed-in-1-6-2-td24925.html).
>  This means that flink 1.6.2 does NOT fix the S3 issue, but the fix will be 
> included in 1.7, which is currently in feature freeze and will hopefully have 
> an RC in the next couple of weeks.
>
> But yes, if you need S3 support ASAP, you are in a bit of a pickle. My team 
> is in that situation, so these are the options as we saw them:
>
> 0. Wait for flink 1.7
> 1. Run and build your own flink from master or flink-1.7 branches which has 
> support for S3 and StreamingFileSink
> 2. Write our own custom sink for s3 (probably with some caveats)
> 3. Backport the changes into flink 1.6
>
> We really didn't want to wait for 1.7, as that would make our delivery 
> timeline not great. We didn't love the idea of running an unreleased 
> version of Flink in production either. As we looked into writing something 
> ourselves, it became clear
> pretty quickly that we could fairly easily get an output sink to a file that 
> would be at-least-once delivery to a file, but exactly-once delivery would be 
> significantly more difficult. That is actually okay for our use case, but we 
> decided we would rather not have to
> revisit this later on and change all the code and then run a one-off job to 
> remove dupes. Instead, we decided to backport the changes into the 1.6 branch. 
> Luckily, we already build our own Flink, so we had that tooling already. The 
> backport took a few hours (it was fairly complicated to get all the changes), 
> but we seem to have gotten everything
> working. The backport is here: 
> https://github.com/instructure/flink/tree/s3_recover_backport. Our plan is to 
> use that backport until 1.7 is stable, then we can upgrade without 
> (hopefully) having to change any code. We still recognize there is a 
> possibility for bugs in the backport, but
> for us that is mitigated by the fact that we are okay with at-least-once and 
> if all else fails, we have a period of transition where we have this data 
> being written in another location we can fall back to.
>
> So yeah, to reiterate, no out-of-the-box S3 stuff works ATM, but that should 
> hopefully be fixed *soon*. If you can wait, that is the easiest; if you 
> can't, building either your own custom sink or your own Flink with the 
> backport isn't a terrible option.
>
> Hope that helps!
>
> Addison
>
> On Sun, Nov 4, 2018 at 3:09 AM Flink Developer  
> wrote:
>
>> Hi Ravi, some questions:
>>
>> - Is this using Flink 1.6.2 with dependencies (flink-s3-fs-hadoop, 
>> flink-statebackend-rocksdb, hadoop-common, hadoop-aws, hadoop-hdfs, 
>> hadoop-common) ? If so, could you please share your dependency versioning?
>> - Does this use a kafka source with high flink parallelism (~400) for all 
>> kafka partitions and does it run continuously for several days?
>> - Could you please share your checkpoint interval configuration, batch file 
>> size, batch rollover interval configuration, and sink prefix (s3:// ,  
>> s3a://)
>>
>> Thank you
>> ‐‐‐ Original Message ‐‐‐
>> On Saturday, November 3, 2018 7:18 AM, Ravi