What does enableObjectReuse exactly do?

2021-05-07 Thread
I wrote a streaming job in Scala, using only immutable case classes. Is it
safe to enable object reuse? Will it benefit from enabling object reuse?

I have read the documentation, but it covers neither streaming cases nor
immutable data structures.


Re: Watermark won't advance in ProcessFunction

2019-10-28 Thread
Thank you for your response. Registering a timer at Long.MaxValue works,
and I have found the mistake in my original code.

When a timer fires and there are elements in the priority queue with
timestamps greater than the current watermark, they do not get processed. A
new timer should be registered for these elements. I simply forgot to handle
these unprocessed elements.
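For anyone hitting the same issue, here is a rough sketch of the re-registration step described above (elements are plain Longs as in the test program; buffering is done in an in-memory queue for brevity, which is illustrative only and not fault tolerant):

```
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

import scala.collection.mutable

// Buffers elements, emits those at or below the watermark when a timer fires,
// and re-registers a timer for the elements that are still ahead of the watermark.
class SortingFunction extends KeyedProcessFunction[Long, Long, Long] {

  // Illustrative only: a real implementation should keep this in managed state.
  private val queue =
    mutable.PriorityQueue.empty[(Long, Long)](Ordering.by[(Long, Long), Long](_._1).reverse)

  override def processElement(value: Long,
                              ctx: KeyedProcessFunction[Long, Long, Long]#Context,
                              out: Collector[Long]): Unit = {
    val ts: Long = ctx.timestamp()
    queue.enqueue((ts, value))
    // Coalesce timers on the next watermark, as in the timer-coalescing docs.
    ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 1)
  }

  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[Long, Long, Long]#OnTimerContext,
                       out: Collector[Long]): Unit = {
    val watermark = ctx.timerService().currentWatermark()
    while (queue.nonEmpty && queue.head._1 <= watermark) {
      out.collect(queue.dequeue()._2)
    }
    // The step that was missing: elements newer than the watermark stay queued,
    // and a fresh timer is registered so they are emitted later.
    if (queue.nonEmpty) {
      ctx.timerService().registerEventTimeTimer(watermark + 1)
    }
  }
}
```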

Dian Fu wrote on Mon, Oct 28, 2019 at 4:17 PM:

> Before a program closes, it emits Long.MaxValue as the watermark, and
> that watermark triggers all the windows. That is why your
> `timeWindow` program works. However, in the first program you have
> not registered an event time timer (through context.timerService().
> registerEventTimeTimer), and there is also no onTimer logic defined
> to process it.
>
> On Oct 28, 2019, at 4:01 PM, 杨力 wrote:
>
> It seems to be the case. But when I use timeWindow or CEP with
> fromCollection, it works well. For example,
>
> ```
> sEnv.fromCollection(Seq[Long](1, 1002, 2002,
> 3002)).assignAscendingTimestamps(identity[Long])
> .keyBy(_ % 2).timeWindow(Time.seconds(1)).sum(0).print()
> ```
>
> prints
>
> ```
> 1
> 1002
> 2002
> 3002
> ```
>
> How can I implement my KeyedProcessFunction so that it works as
> expected?
>
> Dian Fu wrote on Mon, Oct 28, 2019 at 2:04 PM:
>
>> Hi,
>>
>> Watermarks are generated periodically by default in the underlying
>> implementation of `assignAscendingTimestamps`. So for your test program,
>> the watermark has not been generated yet, and I think that's the reason why
>> it's Long.MinValue.
>>
>> Regards,
>> Dian
>>
>> On Oct 28, 2019, at 11:59 AM, 杨力 wrote:
>>
>> I'm going to sort elements in a PriorityQueue and set up timers at
>> (currentWatermark + 1), following the instructions in
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/process_function.html#timer-coalescing
>> .
>>
>> However, it seems that context.timerService().currentWatermark() always
>> returns Long.MinValue and my onTimer is never called. Here's a minimal
>> program to reproduce the problem. Am I missing something?
>>
>> ```
>> val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
>> sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> sEnv.setParallelism(argOps.parallelism())
>> sEnv.fromCollection(Seq[Long](1, 2,
>> 3)).assignAscendingTimestamps(identity[Long])
>> .process(new ProcessFunction[Long, Long] {
>>   override def processElement(i: Long, context: ProcessFunction[Long,
>> Long]#Context, collector: Collector[Long]): Unit = {
>> collector.collect(context.timerService().currentWatermark())
>>   }
>> }).print()
>> sEnv.execute()
>> ```
>>
>> ```
>> -9223372036854775808
>> -9223372036854775808
>> -9223372036854775808
>> ```
>>
>>
>>
>


Re: Watermark won't advance in ProcessFunction

2019-10-28 Thread
It seems to be the case. But when I use timeWindow or CEP with
fromCollection, it works well. For example,

```
sEnv.fromCollection(Seq[Long](1, 1002, 2002, 3002))
  .assignAscendingTimestamps(identity[Long])
  .keyBy(_ % 2).timeWindow(Time.seconds(1)).sum(0).print()
```

prints

```
1
1002
2002
3002
```

How can I implement my KeyedProcessFunction so that it works as
expected?

Dian Fu wrote on Mon, Oct 28, 2019 at 2:04 PM:

> Hi,
>
> Watermarks are generated periodically by default in the underlying
> implementation of `assignAscendingTimestamps`. So for your test program,
> the watermark has not been generated yet, and I think that's the reason why
> it's Long.MinValue.
>
> Regards,
> Dian
>
> On Oct 28, 2019, at 11:59 AM, 杨力 wrote:
>
> I'm going to sort elements in a PriorityQueue and set up timers at
> (currentWatermark + 1), following the instructions in
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/process_function.html#timer-coalescing
> .
>
> However, it seems that context.timerService().currentWatermark() always
> returns Long.MinValue and my onTimer is never called. Here's a minimal
> program to reproduce the problem. Am I missing something?
>
> ```
> val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
> sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> sEnv.setParallelism(argOps.parallelism())
> sEnv.fromCollection(Seq[Long](1, 2,
> 3)).assignAscendingTimestamps(identity[Long])
> .process(new ProcessFunction[Long, Long] {
>   override def processElement(i: Long, context: ProcessFunction[Long,
> Long]#Context, collector: Collector[Long]): Unit = {
> collector.collect(context.timerService().currentWatermark())
>   }
> }).print()
> sEnv.execute()
> ```
>
> ```
> -9223372036854775808
> -9223372036854775808
> -9223372036854775808
> ```
>
>
>


Watermark won't advance in ProcessFunction

2019-10-27 Thread
I'm going to sort elements in a PriorityQueue and set up timers at
(currentWatermark + 1), following the instructions in
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/process_function.html#timer-coalescing
.

However, it seems that context.timerService().currentWatermark() always
returns Long.MinValue and my onTimer is never called. Here's a minimal
program to reproduce the problem. Am I missing something?

```
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
sEnv.setParallelism(argOps.parallelism())
sEnv.fromCollection(Seq[Long](1, 2, 3))
  .assignAscendingTimestamps(identity[Long])
  .process(new ProcessFunction[Long, Long] {
    override def processElement(i: Long, context: ProcessFunction[Long, Long]#Context,
                                collector: Collector[Long]): Unit = {
      collector.collect(context.timerService().currentWatermark())
    }
  }).print()
sEnv.execute()
```

```
-9223372036854775808
-9223372036854775808
-9223372036854775808
```


Is it possible to register a custom TypeInfoFactory without using an annotation?

2019-08-26 Thread
I'd like to provide a custom serializer for a POJO class. But that class
cannot be modified so it's not possible to add a @TypeInfo annotation to
it. Are there any other ways to register one?
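One workaround, not confirmed in this thread, is to skip annotation-based registration entirely and hand the TypeInformation to the API explicitly at the call site (in the Scala API, by putting it in implicit scope). A rough sketch with a made-up third-party class:

```
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._

// Hypothetical third-party POJO that cannot be modified or annotated.
class ExternalPojo(var id: Int, var name: String) {
  def this() = this(0, "")
}

object ExplicitTypeInfoJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Instead of relying on a @TypeInfo annotation, supply the TypeInformation
    // yourself. TypeInformation.of is used here as a stand-in; a hand-written
    // TypeInformation subclass returning a custom serializer would also fit.
    implicit val externalPojoInfo: TypeInformation[ExternalPojo] =
      TypeInformation.of(classOf[ExternalPojo])

    env
      .fromElements(new ExternalPojo(1, "a"), new ExternalPojo(2, "b"))
      .map(_.name)
      .print()

    env.execute("explicit type information example")
  }
}
```

Note that this only overrides type extraction at the call sites where the implicit (or a returns(...) call in the Java API) is in scope; it does not register a factory globally.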


Re: Job xxx not found exception when starting Flink program in Local

2018-11-17 Thread
The web UI keeps job IDs in the browser, so an open tab from a previous
cluster will keep requesting those old job IDs.
徐涛 wrote on Sun, Nov 18, 2018 at 8:12 AM:
>
> Hi Chesnay,
> Yes, it is. I later noticed that, before starting this cluster, I had used
> ./bin/start-cluster.sh to start a local cluster; after I stopped that cluster
> with ./bin/stop-cluster.sh, the problem was gone.
> But how does the later-started cluster know there is a job in the formerly
> started cluster?
>
> Best
> Henry
>
> On Nov 14, 2018, at 5:16 PM, Chesnay Schepler wrote:
>
> Did you have the WebUI open from a previous execution? If so, the UI
> might still be requesting jobs from that previous execution.
>
> On 13.11.2018 08:01, 徐涛 wrote:
>
> Hi Experts,
> When I start the Flink program locally, the following exception is thrown. I
> do not know why it happens, because it appeared all of a sudden; some hours
> ago the program could still start successfully.
> Could anyone help to explain it?
> Thanks a lot!
>
> 2018-11-13 14:48:45 [flink-akka.actor.default-dispatcher-60] ERROR 
> o.a.f.r.r.h.job.JobDetailsHandler - Exception occurred in REST handler.
> org.apache.flink.runtime.rest.NotFoundException: Job 
> 512a21d9f992d4884f836abb82c64f0d not found
> at 
> org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.lambda$handleRequest$1(AbstractExecutionGraphHandler.java:90)
>  ~[flink-runtime_2.11-1.6.2.jar:1.6.2]
> at 
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>  ~[na:1.8.0_172]
> at 
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>  ~[na:1.8.0_172]
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  ~[na:1.8.0_172]
> at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>  ~[na:1.8.0_172]
> at 
> org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache.lambda$getExecutionGraph$0(ExecutionGraphCache.java:133)
>  ~[flink-runtime_2.11-1.6.2.jar:1.6.2]
> at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  ~[na:1.8.0_172]
> at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>  [na:1.8.0_172]
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  ~[na:1.8.0_172]
> at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>  ~[na:1.8.0_172]
> at 
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
>  ~[flink-runtime_2.11-1.6.2.jar:1.6.2]
> at akka.dispatch.OnComplete.internal(Future.scala:258) 
> ~[akka-actor_2.11-2.4.20.jar:na]
> at akka.dispatch.OnComplete.internal(Future.scala:256) 
> ~[akka-actor_2.11-2.4.20.jar:na]
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186) 
> ~[akka-actor_2.11-2.4.20.jar:na]
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183) 
> ~[akka-actor_2.11-2.4.20.jar:na]
> at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:32) 
> ~[scala-library-2.11.8.jar:na]
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala) 
> ~[scala-library-2.11.8.jar:na]
> at 
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
>  ~[flink-runtime_2.11-1.6.2.jar:1.6.2]
> at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) 
> ~[scala-library-2.11.8.jar:na]
> at 
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) 
> ~[scala-library-2.11.8.jar:na]
> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534) 
> ~[akka-actor_2.11-2.4.20.jar:na]
> at 
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
>  ~[akka-actor_2.11-2.4.20.jar:na]
> at 
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
>  ~[akka-actor_2.11-2.4.20.jar:na]
> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436) 
> ~[scala-library-2.11.8.jar:na]
> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435) 
> ~[scala-library-2.11.8.jar:na]
> at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:32) 
> ~[scala-library-2.11.8.jar:na]
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala) 
> ~[scala-library-2.11.8.jar:na]
> at 
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>  ~[akka-actor_2.11-2.4.20.jar:na]
> at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>  ~[akka-actor_2.11-2.4.20.jar:na]
> at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>  ~[akka-actor_2.11-2.4.20.jar:na]
> at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>  ~[akka-actor_2.11-2.4.20.jar:na]
> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) 
> ~[scala-library-2.11.8.jar:na]
> at 
> 

Re: JMX Configuration: Missing Job Related Beans

2018-09-24 Thread
Those beans are provided by the TaskManagers, not the JobManager.

Sayat Satybaldiyev wrote on Mon, Sep 24, 2018 at 6:38 PM:

> Dear all,
>
> While configuring JMX with Flink, I don't see some of the bean metrics that
> belong to the job, in particular the number of in/out records per operator.
> I've checked the REST API and those numbers are provided there. Does Flink
> provide such beans, or is there an additional configuration for them?
>
> Here's a list of bean that I see in visual vm:
> jobmanager.Status.JVM.*
> jobmanager.job.downtime
> jobmanager.job.lastCheckpoint*
> jobmanager.job.RestaringTime
> jobmanager.job.uptime
>
> and a bunch of JVM-related ones. I've attached a screenshot from VisualVM
> to the email.
>
> Configuration for JMX in flink-conf.yaml:
> metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
> metrics.reporter.jmx.port: 9020-9040
>


Re: CombinableGroupReducer says The Iterable can be iterated over only once

2018-09-14 Thread
A java.lang.Iterable is normally expected to provide iterators again and
again, but the one Flink passes to reduce()/combine() can be traversed only
once.

On Fri, Sep 14, 2018 at 10:53 PM Alejandro Alcalde 
wrote:

> Hello all,
>
> I am trying to replicate the code in the Docs (
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/batch/dataset_transformations.html#combinable-groupreducefunctions
> )
>
> But I keep getting the following exception:
>
> The Iterable can be iterated over only once. Only the first call to
> 'iterator()' will succeed.
>
> This is what I have:
>
> class MyCombinableGroupReducer
> extends GroupReduceFunction[(Double, Double), Double]
> with GroupCombineFunction[(Double, Double), (Double, Double)] {
> import collection.JavaConverters._
> override def reduce(
>   in: java.lang.Iterable[(Double, Double)],
>   out: Collector[Double]): Unit =
>   {
> val r = in.asScala.reduce ( (a, b) =>  ///ERROR HAPPENS HERE
>   (a._1, a._2 + b._2)
> )
> out.collect(r._1 + r._2)
>   }
>
> override def combine(
>   in: lang.Iterable[(Double, Double)],
>   out: Collector[(Double, Double)]): Unit = {
>   ???
> }
>   }
>
> Where am I traversing `in` a second time? Maybe it is the call to
> `asScala`?
>
> Bests
>
> -- Alejandro Alcalde - elbauldelprogramador.com
>
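For reference, a sketch of a combinable GroupReduceFunction in the same shape as the code above, written so that each callback consumes the Iterable exactly once (the aggregation logic is illustrative and not meant as the definitive fix for the error in this thread):

```
import java.lang.{Iterable => JIterable}

import org.apache.flink.api.common.functions.{GroupCombineFunction, GroupReduceFunction}
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

class SumGroupReducer
  extends GroupReduceFunction[(Double, Double), Double]
  with GroupCombineFunction[(Double, Double), (Double, Double)] {

  override def reduce(in: JIterable[(Double, Double)], out: Collector[Double]): Unit = {
    // iterator() is called exactly once; a single pass folds the whole group.
    val (key, sum) = in.iterator().asScala
      .foldLeft((0.0, 0.0)) { case ((_, acc), (k, v)) => (k, acc + v) }
    out.collect(key + sum)
  }

  override def combine(in: JIterable[(Double, Double)],
                       out: Collector[(Double, Double)]): Unit = {
    val (key, sum) = in.iterator().asScala
      .foldLeft((0.0, 0.0)) { case ((_, acc), (k, v)) => (k, acc + v) }
    out.collect((key, sum))
  }
}
```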


Re: Can rocksDBBackend handle rescaling?

2018-09-13 Thread
A checkpoint cannot be used for rescaling. You should take a savepoint,
stop the application, and resume from the savepoint with a new parallelism.

https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/savepoints.html


On Fri, Sep 14, 2018 at 1:50 AM 廖嘉逸  wrote:

> Hi, all
>
>    I'm using Flink 1.6 and I'm going to use RocksDBBackend as the
> checkpoint state backend. Before that, I need to make sure that the
> checkpoint can handle rescaling. From the docs on Apache Flink,
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/checkpoints.html,
> it says that checkpoints do not support Flink's rescaling; however, from
> the source code, the rescaling logic seems not to be related to the state
> backend if the subtask's states are restored from files successfully.
>
>    Could anyone explain this? Or tell me if I am missing something?
>
>
>
> Best, Jiayi Liao
>
>
>
>


Re: Flink 1.6 Job fails with IllegalStateException: Buffer pool is destroyed.

2018-09-07 Thread
Thank you for your advice. I had not noticed that the log level was set to
WARN.
The INFO logs suggest that the job fails because of an Akka timeout, and the
root cause is a long GC pause.

On Fri, Sep 7, 2018 at 5:43 PM Zhijiang(wangzhijiang999) <
wangzhijiang...@aliyun.com> wrote:

> You may need to configure at least INFO level for the logger in Flink;
> currently the messages are too limited for debugging the problem.
>
> Best,
> Zhijiang
>
> ------
> From: 杨力
> Sent: Friday, September 7, 2018, 17:21
> To: Zhijiang(wangzhijiang999)
> Subject: Re: Flink 1.6 Job fails with IllegalStateException: Buffer pool is
> destroyed.
>
>
> I have checked the logs from the YARN NodeManagers, and there is no kill
> action recorded. There is no job-cancellation record in the JobManager's log either.
>
> Here are job logs retrieved from yarn.
>
> https://pastebin.com/raw/1yHLYR65
>
> Zhijiang(wangzhijiang999) wrote on Fri, Sep 7, 2018 at 3:22 PM:
> Hi,
>
> I think the problem in the attached image is not the root cause of your job
> failure. There must be other task or TaskManager failures; all the
> related tasks are then cancelled by the JobManager, and the problem in the
> attached image is just caused by the task being cancelled.
>
> You can review the JobManager's log to check whether there are any
> failures that caused the whole job to fail.
> FYI, the TaskManager may be killed by YARN for exceeding its memory limit. You
> mentioned the job fails about half an hour after it starts, so I guess there is
> the possibility that the TaskManager was killed by YARN.
>
> Best,
> Zhijiang
> --
> From: 杨力
> Sent: Friday, September 7, 2018, 13:09
> To: user
> Subject: Flink 1.6 Job fails with IllegalStateException: Buffer pool is
> destroyed.
>
> Hi all,
> I am encountering a weird problem when running Flink 1.6 in YARN per-job
> clusters.
> The job fails about half an hour after it starts. The related log is
> attached as an image.
>
> This piece of log comes from one of the TaskManagers. There are no
> other related log lines.
> No ERROR-level logs. The job just runs for tens of minutes without
> printing any logs
> and suddenly throws this exception.
>
> It is reproducible in my production environment, but not in my test
> environment.
> The 'Buffer pool is destroyed' exception is always thrown while emitting a
> latency marker.
>

>


Flink 1.6 Job fails with IllegalStateException: Buffer pool is destroyed.

2018-09-06 Thread
Hi all,
I am encountering a weird problem when running Flink 1.6 in YARN per-job
clusters.
The job fails about half an hour after it starts. The related log is
attached as an image.

This piece of log comes from one of the TaskManagers. There are no
other related log lines.
No ERROR-level logs. The job just runs for tens of minutes without printing
any logs
and suddenly throws this exception.

It is reproducible in my production environment, but not in my test
environment.
The 'Buffer pool is destroyed' exception is always thrown while emitting a
latency marker.


image.png
Description: Binary data


Re: Flink 1.6 ExecutionJobVertex.getTaskInformationOrBlobKey OutOfMemoryError

2018-08-13 Thread
Thanks for the tip! It works.

I had forgotten about the job manager's memory.

Hequn Cheng wrote on Tue, Aug 14, 2018 at 9:15 AM:

> Hi,
>
> Have you tried increasing the memory of the job master?
> If you run a Flink job on YARN, you can increase the job master's memory with
> "-yjm 1024m" [1].
>
> Best, Hequn
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/yarn_setup.html#run-a-flink-job-on-yarn
>
> On Mon, Aug 13, 2018 at 10:25 PM, 杨力  wrote:
>
>> I used to run Flink SQL in streaming mode with more than 70 SQL queries in
>> version 1.4. With so many queries loaded, akka.framesize has to be set to 200
>> MB to submit the job.
>>
>> When I am trying to run the job with Flink 1.6.0, the HTTP-based job
>> submission works perfectly, but an OutOfMemoryError is thrown when tasks are
>> being deployed.
>>
>> java.lang.OutOfMemoryError: Java heap space
>> at java.util.Arrays.copyOf(Arrays.java:3236)
>> at
>> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
>> at
>> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>> at
>> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>> at
>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>> at
>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>> at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
>> at
>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>> at
>> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:512)
>> at
>> org.apache.flink.util.SerializedValue.(SerializedValue.java:52)
>> at
>> org.apache.flink.runtime.blob.BlobWriter.serializeAndTryOffload(BlobWriter.java:99)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.getTaskInformationOrBlobKey(ExecutionJobVertex.java:393)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionVertex.createDeploymentDescriptor(ExecutionVertex.java:827)
>> at
>> org.apache.flink.runtime.executiongraph.Execution.deploy(Execution.java:580)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$scheduleEager$2(ExecutionGraph.java:963)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraph$$Lambda$105/800937955.accept(Unknown
>> Source)
>> at
>> java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656)
>> at
>> java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632)
>> at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>> at
>> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>> at
>> org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture.handleCompletedFuture(FutureUtils.java:541)
>> at
>> org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture$$Lambda$92/1432873073.accept(Unknown
>> Source)
>> at
>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>> at
>> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>> at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>> at
>> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>> at
>> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
>> at akka.dispatch.OnComplete.internal(Future.scala:259)
>> at akka.dispatch.OnComplete.internal(Future.scala:256)
>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>
>> This OOM error is raised even with a 12 GB heap. I have dug into the source
>> code and only found that ExecutionJobVertex.getTaskInformationOrBlobKey is
>> serializing a TaskInformation object, which does not seem to be a large one.
>> Can anyone help me to fix or work around the problem?
>>
>
>


Flink 1.6 ExecutionJobVertex.getTaskInformationOrBlobKey OutOfMemoryError

2018-08-13 Thread
I used to run Flink SQL in streaming mode with more than 70 SQL queries in
version 1.4. With so many queries loaded, akka.framesize has to be set to 200
MB to submit the job.

When I am trying to run the job with Flink 1.6.0, the HTTP-based job
submission works perfectly, but an OutOfMemoryError is thrown when tasks are
being deployed.

java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3236)
at
java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
at
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at
java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at
java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
at
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:512)
at
org.apache.flink.util.SerializedValue.(SerializedValue.java:52)
at
org.apache.flink.runtime.blob.BlobWriter.serializeAndTryOffload(BlobWriter.java:99)
at
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.getTaskInformationOrBlobKey(ExecutionJobVertex.java:393)
at
org.apache.flink.runtime.executiongraph.ExecutionVertex.createDeploymentDescriptor(ExecutionVertex.java:827)
at
org.apache.flink.runtime.executiongraph.Execution.deploy(Execution.java:580)
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$scheduleEager$2(ExecutionGraph.java:963)
at
org.apache.flink.runtime.executiongraph.ExecutionGraph$$Lambda$105/800937955.accept(Unknown
Source)
at
java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656)
at
java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at
org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture.handleCompletedFuture(FutureUtils.java:541)
at
org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture$$Lambda$92/1432873073.accept(Unknown
Source)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
at akka.dispatch.OnComplete.internal(Future.scala:259)
at akka.dispatch.OnComplete.internal(Future.scala:256)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)

This OOM error is raised even with a 12 GB heap. I have dug into the source
code and only found that ExecutionJobVertex.getTaskInformationOrBlobKey is
serializing a TaskInformation object, which does not seem to be a large one.
Can anyone help me to fix or work around the problem?


Jobs running on a yarn per-job cluster fail to restart when a task manager is lost

2018-05-22 Thread
Hi,

I am running a streaming job without checkpointing enabled. A failure-rate
restart strategy has been set with
StreamExecutionEnvironment.setRestartStrategy.

When a task manager is lost because of memory problems, the job manager tries
to restart the job without launching a new task manager, and fails with
NoResourceAvailableException: Not enough slots available to run the job.

The job is running on flink 1.4.2 and Hadoop 2.7.4.
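For completeness, this is roughly how a failure-rate restart strategy is configured inside the job's main method (the numbers are placeholders, not the ones used in the failing job):

```
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Allow at most 3 failures within a 5-minute window, waiting 10 seconds between attempts.
env.setRestartStrategy(
  RestartStrategies.failureRateRestart(
    3,                 // max failures per interval
    Time.minutes(5),   // measuring interval
    Time.seconds(10))) // delay between restart attempts
```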


User-defined aggregation function and parallelism

2018-04-15 Thread
I am running Flink SQL in streaming mode and implemented a UDAGG, which is
used in keyed HOP windows. But I found that the throughput decreases
dramatically when the function is used. Does a UDAGG run in parallel? Or
does it run only in one thread?

Regards,
Bill
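For context, a UDAGG of the kind described is typically a Table API AggregateFunction along these lines (names and the aggregation logic are made up; this is not the function from the job above):

```
import org.apache.flink.table.functions.AggregateFunction

// Mutable accumulator for a simple average (illustrative only).
class AvgAcc {
  var sum: Double = 0.0
  var count: Long = 0L
}

class MyAvg extends AggregateFunction[Double, AvgAcc] {

  override def createAccumulator(): AvgAcc = new AvgAcc

  override def getValue(acc: AvgAcc): Double =
    if (acc.count == 0L) 0.0 else acc.sum / acc.count

  // Called once per row of the group/window; resolved by name, not overridden.
  def accumulate(acc: AvgAcc, value: Double): Unit = {
    acc.sum += value
    acc.count += 1L
  }
}

// Registration, assuming an existing StreamTableEnvironment named tEnv:
// tEnv.registerFunction("myAvg", new MyAvg)
```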


Re: Access logs for a running Flink app in YARN cluster

2018-04-12 Thread
Maybe you can get them from YARN with its REST API.

Tao Xia wrote on Fri, Apr 13, 2018 at 8:09 AM:

> Is there a good way to access container logs from a running Flink app in a YARN
> cluster on EMR?
> You can view the logs through the YARN UI, but you cannot programmatically access
> them and send them to other services.
> The log aggregator only runs when the application finishes, or copies at a minimum
> interval of 3600 seconds. Is there any way we can get the logs more frequently?
>


Re: subquery about flink sql

2018-04-03 Thread
You should add a column
TUMBLE_ROWTIME(t, INTERVAL '60' SECOND) AS t
to the select part of your subquery.

韩宁宁 <453673...@qq.com> wrote on Tue, Apr 3, 2018 at 3:34 PM:

> Thank you for your reply.
>
> I think there is no problem with the table registration. I guess it's a
> question about the subquery.
> There is no problem executing this SQL:
> select
> user,
> count(product),
> TUMBLE_START(t, INTERVAL '60' SECOND) as wStart,
> TUMBLE_END(t, INTERVAL '60' SECOND) as wEnd
>  from myFlinkTable GROUP BY user,TUMBLE(t, INTERVAL '60' SECOND)
>
>
>
> -- Original Message --
> *From:* "李玥";
> *Sent:* Tuesday, April 3, 2018, 11:49 AM
> *To:* "韩宁宁" <453673...@qq.com>;
> *Cc:* "user"; "skycrab68";
> *Subject:* Re: subquery about flink sql
>
> The exception log tells you that your table "myFlinkTable" does not contain a
> column/field named "t". There could be something wrong with your table
> registration. It would be helpful to show us your table registration code,
> like:
>
> // register a Table
> tableEnv.registerTable("table1", ...)
> // or
> tableEnv.registerTableSource("table2", ...)
> // or
> tableEnv.registerExternalCatalog("extCat", ...)
>
>
>
>
> LiYue
> http://tig.jd.com
> liyue2...@gmail.com
>
>
>
> On Apr 3, 2018, at 11:23 AM, 韩宁宁 <453673...@qq.com> wrote:
>
> Dear All,
>   I have a question about subqueries in Flink SQL.
>   My SQL is like this:
>   select
> user,
> count(product),
> TUMBLE_START(t, INTERVAL '60' SECOND) as wStart,
> TUMBLE_END(t, INTERVAL '60' SECOND) as wEnd
> from (
> select
> distinct(user),
> product,
> amount,
> actionTime
> from myFlinkTable
> ) GROUP BY user,TUMBLE(t, INTERVAL '60' SECOND)
>
> The type information of the fields is like this:
> TypeInformation typeInfo = Types.ROW(
> new String[] {"user","product","amount","actionTime"},
> new TypeInformation[] {
> BasicTypeInfo.STRING_TYPE_INFO,
> BasicTypeInfo.INT_TYPE_INFO,
> BasicTypeInfo.INT_TYPE_INFO,
> BasicTypeInfo.LONG_TYPE_INFO,
> }
> );
>
> My data source implements DefinedRowtimeAttribute, as follows:
> @Override
> public String getRowtimeAttribute() {
> return "t";
> }
>
> I run the test code and get the following error.
> Exception in thread "main"
> org.apache.flink.table.api.ValidationException: SQL validation failed. At
> line 13, column 24: Column 't' not found in any table
> at
> org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:92)
> at
> org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:499)
> at com.didi.flink.sql.Main.main(Main.java:103)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
> Caused by: org.apache.calcite.runtime.CalciteContextException: At line 13,
> column 24: Column 't' not found in any table
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:804)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:789)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4363)
> at
> org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:258)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5018)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5000)
> at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:344)
> at
> org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:134)
> at
> org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:101)
> at org.apache.calcite.sql.SqlOperator.acceptCall(SqlOperator.java:859)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visitScoped(SqlValidatorImpl.java:5053)
> at
> org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:50)
> at
> org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:33)
> at 

Re: bad data output

2018-03-29 Thread
You can use a split operator, generating 2 streams.
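A rough sketch of that approach (the record type and the validation rule are made up; the bad stream is just printed here, but it could go to a Kafka or file sink instead):

```
import org.apache.flink.streaming.api.scala._

object SplitBadData {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Hypothetical input and validation rule, only for illustration.
    val records: DataStream[String] = env.fromElements("1", "2", "oops", "3")
    def isValid(r: String): Boolean = r.nonEmpty && r.forall(_.isDigit)

    // The expensive check runs once; each record is routed to a named output.
    val splitStream = records.split(r => if (isValid(r)) Seq("good") else Seq("bad"))

    val goodStream = splitStream.select("good")
    val badStream  = splitStream.select("bad")

    goodStream.map(_.toLong).print() // normal processing continues on the good records
    badStream.print()                // replace with a Kafka/file sink for later analysis

    env.execute("split good and bad records")
  }
}
```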

Darshan Singh wrote on Fri, Mar 30, 2018 at 2:53 AM:

> Hi
>
> I have a dataset which is almost 99% correct data. As of now, if some
> data is bad I just ignore it, log it, and return only the correct data.
> I do this inside a map function.
>
> The part which decides whether data is correct or not is an expensive one.
>
> Now I want to store the bad data somewhere so that I can analyze that
> data in the future.
>
> I could run the same calculation twice and get the correct data in the first
> pass and the bad data in the second pass.
>
> Is there a better way where I can somehow store the bad data from inside
> the map function, like sending it to Kafka, a file, etc.?
>
> Also, is there a way I could create a DataStream which gets its data
> from inside the map function (not sure this is feasible as of now)?
>
> Thanks
>


Re: Extremely large job serialization produced by union operator

2018-03-14 Thread
I understand that complex SQL queries would be translated into large DAGs.
However, the submission succeeds in my case if I don't use the union operator.
It might be a potential bug related to it. For example, the following code
submits successfully with the default limit on akka.framesize.

val sqls: Seq[String] = ...
val sink: JDBCAppendTableSink = ...

sqls foreach {
  sql =>
val table = tEnv.sqlQuery(sql)
val outputStream = tEnv.toAppendStream[Row](table) map {
  ...
}
tEnv.fromDataStream(outputStream).writeToSink(sink)
}

If I union these output streams and send them to a single sink, the size of
the serialized job will be 100 MB.

val outputStream = sqls map {
  sql =>
val table = tEnv.sqlQuery(sql)
tEnv.toAppendStream[Row](table) map {
  ...
}
} reduce {
  (a, b) => a union b
}
tEnv.fromDataStream(outputStream).writeToSink(sink)

I failed to reproduce it without the table schemas and SQL queries actually
used in my production environment. In the end I wrote my own JDBC sink with
connection pooling to mitigate this problem. Maybe someone familiar with the
implementation of the union operator can figure out what's going wrong.
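For reference, the hand-rolled sink mentioned above can be as simple as a RichSinkFunction that opens one connection per parallel instance (this sketch has no real pooling; the URL, the insert statement, and the assumption that the Row fields line up with the statement's placeholders are all illustrative):

```
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.types.Row

class SingleConnectionJdbcSink(jdbcUrl: String, insertSql: String)
  extends RichSinkFunction[Row] {

  @transient private var connection: Connection = _
  @transient private var statement: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    // One connection per parallel sink instance, so the total number of
    // connections is bounded by the sink's parallelism.
    connection = DriverManager.getConnection(jdbcUrl)
    statement = connection.prepareStatement(insertSql)
  }

  override def invoke(value: Row): Unit = {
    // Assumes the Row fields match the placeholders of insertSql in order.
    for (i <- 0 until value.getArity) {
      statement.setObject(i + 1, value.getField(i))
    }
    statement.executeUpdate()
  }

  override def close(): Unit = {
    if (statement != null) statement.close()
    if (connection != null) connection.close()
  }
}
```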

Fabian Hueske <fhue...@gmail.com> wrote on Tue, Mar 13, 2018 at 11:42 PM:

> Hi Bill,
>
> The size of the program depends on the number and complexity of the SQL queries
> that you are submitting.
> Each query might be translated into a sequence of multiple operators. Each
> operator has a string with generated code that will be compiled on the
> worker nodes. The size of the code depends on the number of fields in the
> schema.
> Operators and code are not shared across queries.
>
> Best, Fabian
>
> 2018-03-09 23:36 GMT+01:00 杨力 <bill.le...@gmail.com>:
>
>> Thank you for your response. It occurs both in a standalone cluster and a
>> yarn-cluster. I am trying to remove the business code and reproduce it with a
>> minimal demo.
>>
>>
>> On Sat, Mar 10, 2018 at 2:27 AM Piotr Nowojski <pi...@data-artisans.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Could you provide more details about your queries and setup? Logs could
>>> be helpful as well.
>>>
>>> Piotrek
>>>
>>> > On 9 Mar 2018, at 11:00, 杨力 <bill.le...@gmail.com> wrote:
>>> >
>>> > I wrote a flink-sql app with following topography.
>>> >
>>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map ->
>>> JDBCAppendTableSink
>>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map ->
>>> JDBCAppendTableSink
>>> > ...
>>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map ->
>>> JDBCAppendTableSink
>>> >
>>> > I have a dozen of TableSources And tens of SQLs. As a result, the
>>> number of JDBCAppendTableSink times parallelism, that is the number of
>>> concurrent connections to database, is too large for the database server to
>>> handle. So I tried union DataStreams before connecting them to the
>>> TableSink.
>>> >
>>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map
>>> > \
>>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map --- union ->
>>> JDBCAppendTableSink
>>> > ... /
>>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map
>>> >
>>> > With this strategy, job submission failed with an
>>> OversizedPayloadException of 104 MB. Increasing akka.framesize helps to
>>> avoid this exception, but job submission hangs and times out.
>>> >
>>> > I can't understand why a simple union operator would serialize to such
>>> a large message. Can I avoid this problem?
>>> > Or can I change some configuration to fix the submission time out?
>>> >
>>> > Regards,
>>> > Bill
>>>
>>>
>


Re: Extremely large job serialization produced by union operator

2018-03-09 Thread
Thank you for your response. It occurs both in a standalone cluster and a
yarn-cluster. I am trying to remove the business code and reproduce it with a
minimal demo.

On Sat, Mar 10, 2018 at 2:27 AM Piotr Nowojski <pi...@data-artisans.com>
wrote:

> Hi,
>
> Could you provide more details about your queries and setup? Logs could be
> helpful as well.
>
> Piotrek
>
> > On 9 Mar 2018, at 11:00, 杨力 <bill.le...@gmail.com> wrote:
> >
> > I wrote a flink-sql app with following topography.
> >
> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map ->
> JDBCAppendTableSink
> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map ->
> JDBCAppendTableSink
> > ...
> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map ->
> JDBCAppendTableSink
> >
> > I have a dozen TableSources and tens of SQL queries. As a result, the number
> of JDBCAppendTableSinks times the parallelism, that is, the number of concurrent
> connections to the database, is too large for the database server to handle. So
> I tried to union the DataStreams before connecting them to the TableSink.
> >
> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map
> > \
> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map --- union ->
> JDBCAppendTableSink
> > ... /
> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map
> >
> > With this strategy, job submission failed with an
> OversizedPayloadException of 104 MB. Increasing akka.framesize helps to
> avoid this exception, but job submission hangs and times out.
> >
> > I can't understand why a simple union operator would serialize to such a
> large message. Can I avoid this problem?
> > Or can I change some configuration to fix the submission time out?
> >
> > Regards,
> > Bill
>
>


Re: Flink 1.4 SQL API Streaming TableException

2018-03-09 Thread
To use a field in a table as a timestamp, it must be declared as a rowtime
attribute of the table.

1) Call env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime).
2) Call withRowtimeAttribute on KafkaJsonTableSourceBuilder.

Reference:
1.
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/streaming.html#time-attributes
2.
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/sourceSinks.html#configuring-a-processing-time-attribute

On Sat, Mar 10, 2018 at 4:49 AM Pavel Ciorba  wrote:

> Hi everyone!
>
> I decided to try the Time-windowed join functionality of Flink 1.4+.
>
> My SQL query is an exact copy of the example in the documentation, and the
> program reads from and writes to Kafka.
>
> I used the example from here:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/sql.html#joins
>
> Code:
> https://gist.github.com/invoker27/ecb4f4b38a52642089e41f6f49886c28
>
> Dependencies:
> compile group: 'org.apache.flink', name: 'flink-table_2.11', version:
> '1.4.0'
> compile group: 'org.apache.flink', name: 'flink-streaming-scala_2.11',
> version: '1.4.0'
> compile group: 'org.apache.flink', name:
> 'flink-connector-kafka-0.11_2.11', version: '1.4.0'
>
> Error:
> Exception in thread "main" org.apache.flink.table.api.TableException:
> Cannot generate a valid execution plan for the given query:
>
> FlinkLogicalJoin(condition=[AND(=($0, $5), >=($3, -($8, 1440)), <=($3,
> $8))], joinType=[inner])
>   FlinkLogicalTableSourceScan(table=[[TABLE1]], fields=[id, value1,
> timestamp], source=[KafkaJSONTableSource])
>   FlinkLogicalTableSourceScan(table=[[TABLE2]], fields=[id, value2,
> timestamp], source=[KafkaJSONTableSource])
>
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL
> features.
> at
> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:274)
> at
> org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:683)
> at
> org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:251)
> at org.apache.flink.table.api.Table.writeToSink(table.scala:862)
> at org.apache.flink.table.api.Table.writeToSink(table.scala:830)
> at
> com.sheffield.healthmonitoring.join.conditionalhr.JoinSQL.main(JoinSQL.java:72)
>
>
> I get the error in 1.4.0, 1.4.1 and 1.4.2, but 1.5-SNAPSHOT works.
>
> From what I can see the feature should work in 1.4.
>
> What might be the issue?
>
> Thank you!
>


Extremely large job serialization produced by union operator

2018-03-09 Thread
I wrote a flink-sql app with following topography.

KafkaJsonTableSource -> SQL -> toAppendStream -> Map -> JDBCAppendTableSink
KafkaJsonTableSource -> SQL -> toAppendStream -> Map -> JDBCAppendTableSink
...
KafkaJsonTableSource -> SQL -> toAppendStream -> Map -> JDBCAppendTableSink

I have a dozen TableSources and tens of SQL queries. As a result, the number of
JDBCAppendTableSinks times the parallelism, that is, the number of concurrent
connections to the database, is too large for the database server to handle. So
I tried to union the DataStreams before connecting them to the TableSink.

KafkaJsonTableSource -> SQL -> toAppendStream -> Map
\
KafkaJsonTableSource -> SQL -> toAppendStream -> Map --- union ->
JDBCAppendTableSink
... /
KafkaJsonTableSource -> SQL -> toAppendStream -> Map

With this strategy, job submission failed with an OversizedPayloadException
of 104 MB. Increasing akka.framesize helps to avoid this exception, but job
submission hangs and times out.

I can't understand why a simple union operator would serialize to such a
large message. Can I avoid this problem?
Or can I change some configuration to fix the submission time out?

Regards,
Bill


Re: Using time window with SQL nested query

2018-03-05 Thread
Thanks. It works. I missed it while reading the document.
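Putting the answer together for the query shape from the original message, a sketch (the 1.4-era Table API is assumed; window sizes and the HAVING condition are placeholders, and `timestamp` is back-quoted because it is a reserved word):

```
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment

// Assumes a table "source" with columns a, b and a rowtime attribute `timestamp`
// has already been registered on tEnv.
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tEnv = TableEnvironment.getTableEnvironment(env)

val result = tEnv.sqlQuery(
  """
    |SELECT a
    |FROM (
    |  SELECT a, b, COUNT(*) AS cnt,
    |         HOP_ROWTIME(`timestamp`, INTERVAL '1' MINUTE, INTERVAL '10' MINUTE) AS rowtime
    |  FROM source
    |  GROUP BY HOP(`timestamp`, INTERVAL '1' MINUTE, INTERVAL '10' MINUTE), a, b
    |)
    |GROUP BY HOP(rowtime, INTERVAL '1' MINUTE, INTERVAL '10' MINUTE), a
    |HAVING COUNT(*) > 1
  """.stripMargin)
```

The key point is that HOP_ROWTIME in the inner SELECT re-exposes a rowtime attribute, so the outer GROUP BY can define another window over it.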

Timo Walther <twal...@apache.org> wrote on Mon, Mar 5, 2018 at 9:20 PM:

> Hi Bill,
>
> you can use HOP_ROWTIME()/HOP_PROCTIME() to propagate the time attribute
> to the outer query. See also [1] for an example.
>
> Regards,
> Timo
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/sql.html#selecting-group-window-start-and-end-timestamps
>
>
>
> On 3/5/18 at 11:26 AM, 杨力 wrote:
>
>
>


Using time window with SQL nested query

2018-03-05 Thread
I tried to write a nested query with a HOP window in a streaming
environment.

Table `source` consists of 3 columns: a, b, and timestamp.

SELECT a FROM (SELECT a, COUNT(*) FROM source GROUP BY HOP(timestamp, ...,
...), a, b) GROUP BY HOP(timestamp, ..., ...), a HAVING ...

And Flink throws an exception: "Column 'timestamp' not found in any
table".

And I tried to "SELECT HOP_END(timestamp, ..., ...) AS timestamp, a,
COUNT(*)" in the inner query, getting an exception of "Window can only be
defined over a time attribute column."

Can I make the rowtime attribute propagate to the outer query, just like
chaining windows in the DataStream API?

Regards,
Bill