Re: Flink on Yarn resource arrangement

2019-11-13 Thread vino yang
Hi Alex,

Which Flink version are you using?

AFAIK, since Flink 1.8+, the config option "-yn" for Flink on YARN job
cluster mode does not take effect (it is always overridden).

So the config options "-ys" and "-p" decide the number of TMs.

In the first example, -p(20)/-ys(3) is rounded up to 7 TMs to satisfy the
required parallelism, so the total slots are 7 * 3 = 21.

In the second example, -p(20)/-ys(4) = 5 TMs, so the total slots are 5 * 4 = 20.
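
As a back-of-the-envelope check (an illustration only, not Flink's actual
scheduler code; it assumes one vcore per slot plus one extra container and
vcore for the JobManager, which matches the numbers reported below):

public class YarnResourceMath {
    public static void main(String[] args) {
        int parallelism = 20;                                            // -p
        int slotsPerTm = 3;                                              // -ys (first example)
        int taskManagers = (parallelism + slotsPerTm - 1) / slotsPerTm;  // ceil(20/3) = 7
        int totalSlots = taskManagers * slotsPerTm;                      // 7 * 3 = 21
        int containers = taskManagers + 1;                               // + JobManager = 8
        int vcores = totalSlots + 1;                                     // + JobManager = 22
        System.out.printf("TMs=%d slots=%d containers=%d vcores=%d%n",
                taskManagers, totalSlots, containers, vcores);
    }
}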

Best,
Vino

qq <471237...@qq.com> wrote on Thu, Nov 14, 2019, at 3:26 PM:

> Hi all,
>
>    Could you explain in detail how resources are managed for a Flink job
> on YARN?
>
>   I used the command “-p 20 -yn 5 -ys 3 -yjm 2048m -ytm 2048m” to run a flink
> job. I got:
> containers: 8, vcores: 22
> Task Managers: 7, Total Task Slots: 21
>
>
> I used the command “-p 20 -yn 7 -ys 4 -yjm 2048m -ytm 2048m” to run a flink
> job. I got:
> containers: 6, vcores: 21
> Total Task Slots: 20, Task Managers: 5
>
> Could you give the exact resource formula? Thanks very much.
>
>
>
>
> Alex Fu
> 2019/11/14


Re: Flink-JDBC JDBCUpsertTableSink keyFields Problem

2019-11-13 Thread Polarisary
My SQL is a regular insert, like “insert into sink_table select c1,c2,c3 from
source_table”.
I want to know in which cases it will be judged append-only. Is there
documentation for this?

Many thanks!





> On Nov 14, 2019, at 10:05 AM, 张万新 wrote:
> 
> Yes, it's related to your SQL: flink checks the plan of your SQL to judge 
> whether your job is append-only or has updates. If your job is append-only, 
> that means no results need to be updated.
> 
> If you still have problems, please post your SQL and the complete error 
> message to help people understand your use case.
> 
> Polarisary <polaris...@gmail.com> wrote on Wed, Nov 13, 2019, at 6:43 PM:
> Hi
> When I use the flink-jdbc JDBCUpsertTableSink to sink to MySQL, isAppendOnly 
> is modified to true, and keyFields is modified to null by StreamExecSink, but 
> I want to upsert.
> Is this related to the SQL?
> 
> The stack trace is as follows:
> at sink.JDBCUpsertTableSink.setIsAppendOnly(JDBCUpsertTableSink.java:105)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:98)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
>   at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
>   at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:348)
>   at task.Device.main(Device.java:77)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
> 
> Hoping for a reply!
> Many thanks
> 



Flink on Yarn resource arrangement

2019-11-13 Thread qq
Hi all,

   Could you explain in detail how resources are managed for a Flink job on
YARN?

  I used the command “-p 20 -yn 5 -ys 3 -yjm 2048m -ytm 2048m” to run a flink
job. I got:
containers: 8, vcores: 22
Task Managers: 7, Total Task Slots: 21


I used the command “-p 20 -yn 7 -ys 4 -yjm 2048m -ytm 2048m” to run a flink
job. I got:
containers: 6, vcores: 21
Total Task Slots: 20, Task Managers: 5

Could you give the exact resource formula? Thanks very much.



Alex Fu
2019/11/14

Re: Initialization of broadcast state before processing main stream

2019-11-13 Thread vino yang
Hi Vasily,

Currently, Flink does not coordinate a regular stream with a broadcast
stream; they are both just streams. Your scenario for using the broadcast
state is a special one. In the more general scenario, the state to be
broadcast is itself an unbounded stream, and state events may be broadcast
downstream at any time, so the broadcast cannot be waited on to finish
before playing the regular stream events.

For your scenario:


   - you can change the storage of your dimension table, e.g. to Redis or
   MySQL and so on, and do a stream/dimension-table join;
   - you can inject a control event into your broadcast stream to mark that
   the stream has ended, and let the fact stream wait until it receives the
   control event (see the sketch below). Or you can introduce a third-party
   coordinator, e.g. ZooKeeper, to coordinate them; however, that would make
   your solution more complex.
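
A minimal sketch of the control-event option (the tuple types, the
"__END_OF_DIM__" marker, and the in-memory buffer are illustrative choices;
a production job should keep the buffer in checkpointed operator state):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Facts are (key, value) pairs, dimension records are (key, label) pairs.
public class BufferUntilDimReady
        extends BroadcastProcessFunction<Tuple2<String, Long>, Tuple2<String, String>, String> {

    private final MapStateDescriptor<String, String> dimDescriptor =
            new MapStateDescriptor<>("dim-table", String.class, String.class);

    private transient List<Tuple2<String, Long>> buffer;
    private transient boolean dimReady;

    @Override
    public void processBroadcastElement(Tuple2<String, String> dim, Context ctx,
                                        Collector<String> out) throws Exception {
        if ("__END_OF_DIM__".equals(dim.f0)) {
            // The injected control event: the dimension is fully loaded,
            // so emit everything buffered so far.
            dimReady = true;
            for (Tuple2<String, Long> fact : buffered()) {
                out.collect(fact.f0 + " -> " + ctx.getBroadcastState(dimDescriptor).get(fact.f0));
            }
            buffered().clear();
        } else {
            ctx.getBroadcastState(dimDescriptor).put(dim.f0, dim.f1);
        }
    }

    @Override
    public void processElement(Tuple2<String, Long> fact, ReadOnlyContext ctx,
                               Collector<String> out) throws Exception {
        if (!dimReady) {
            buffered().add(fact);  // hold fact records until the dimension is loaded
        } else {
            out.collect(fact.f0 + " -> " + ctx.getBroadcastState(dimDescriptor).get(fact.f0));
        }
    }

    private List<Tuple2<String, Long>> buffered() {
        if (buffer == null) {
            buffer = new ArrayList<>();
        }
        return buffer;
    }
}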

Best,
Vino


Vasily Melnik wrote on Thu, Nov 14, 2019, at 1:28 PM:

> Hi all.
>
> In our task we have two Kafka topics:
> - one with fact stream (web traffic)
> - one with dimension
>
> We would like to put dimension data into broadcast state and look it up
> against facts. But we see that not all dimension records are put into state
> before the first fact record is processed, so the lookup returns no data.
>
> The question is: how could we read the fact topic with some "delay" to give
> the dimension enough time to initialize the state?
>
>
> Best regards,
> Vasily Melnik
>


Initialization of broadcast state before processing main stream

2019-11-13 Thread Vasily Melnik
Hi all.

In our task we have two Kafka topics:
- one with fact stream (web traffic)
- one with dimension

We would like to put dimension data into broadcast state and look it up
against facts. But we see that not all dimension records are put into state
before the first fact record is processed, so the lookup returns no data.

The question is: how could we read the fact topic with some "delay" to give
the dimension enough time to initialize the state?


Best regards,
Vasily Melnik


Re: How to catch up on previous data after a streaming job fails

2019-11-13 Thread Dian Fu
If you are using event time, the watermark is computed from the events
themselves and has nothing to do with the system time, so restoring from the
last checkpoint is enough. Why do you think there would be a problem?
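
For reference, the restore itself works like resuming from a savepoint (a
sketch; the checkpoint path and jar name are illustrative, and it assumes
externalized/retained checkpoints are enabled):

./bin/flink run -s hdfs:///flink/checkpoints/<job-id>/chk-42 myjob.jar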

> On Nov 13, 2019, at 8:29 PM, 柯桂强 wrote:
> 
> I have a streaming job that failed, and its checkpoint/savepoint was retained. I want to restore from the last checkpoint, but the job uses event time, so data that falls outside the window would be dropped. One idea I had: process the data from before the restart with a batch job first, then run the stream job. I'd like to ask whether this plan is feasible; it feels like bounding the batch range and stitching it seamlessly onto the subsequent stream processing is a rather hard problem.



Re: Flink (Local) Environment Thread Leaks?

2019-11-13 Thread tison
It is because MiniCluster starts a SystemResourcesCounter for gathering
metrics but has no
logic to shut it down. Thus the thread leak remains after the cluster exits.

Best,
tison.


tison wrote on Thu, Nov 14, 2019, at 10:21 AM:

> We found this issue previously.
>
> In our case, where the leaked thread comes from is tracked at
> https://issues.apache.org/jira/browse/FLINK-14565
>
> Best,
> tison.
>
>
vino yang wrote on Thu, Nov 14, 2019, at 10:15 AM:
>
>> Hi Theo,
>>
>> If you think there is a thread leakage problem, you can create a JIRA
>> issue and write a detailed description.
>>
>> Ping @Gary Yao and @Zhu Zhu to
>> help locate and analyze this problem.
>>
>> Best,
>> Vino
>>
Theo Diefenthal wrote on Thu, Nov 14, 2019, at 3:16 AM:
>>
>>> I included a Solr End2End test in my project, inheriting from Junit 4
>>> SolrCloudTestCase.
>>>
>>> The solr-test-framework for junit 4 makes use of 
>>> com.carrotsearch.randomizedtesting
>>> which automatically tests for thread leakages on test end. In my other
>>> projects, that tool doesn't produce any problems.
>>> When used in a test together with a Flink LocalExecutionEnvironment, it
>>> will prevent the test from succeeding due to the following error at the
>>> shutdown phase:
>>>
>>> com.carrotsearch.randomizedtesting.ThreadLeakError: 3 threads leaked
>>> from SUITE scope at somepackage.E2ETest:
>>>1) Thread[id=170, name=FlinkCompletableFutureDelayScheduler-thread-1,
>>> state=TIMED_WAITING, group=TGRP-E2ETest]
>>> at sun.misc.Unsafe.park(Native Method)
>>> at
>>> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>>> at
>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>>> at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
>>> at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
>>>2) Thread[id=29, name=metrics-meter-tick-thread-2, state=WAITING,
>>> group=TGRP-E2ETest]
>>> at sun.misc.Unsafe.park(Native Method)
>>> at
>>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>> at
>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>>> at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088)
>>> at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
>>>3) Thread[id=28, name=metrics-meter-tick-thread-1,
>>> state=TIMED_WAITING, group=TGRP-E2ETest]
>>> at sun.misc.Unsafe.park(Native Method)
>>> at
>>> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>>> at
>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>>> at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
>>> at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>> at __randomizedtesting.SeedInfo.seed([CC6ED531AFECBAF6]:0)
>>>
>>> Note that I can suppress the errors easily via setting
>>> @ThreadLeakScope(ThreadLeakScope.Scope.NONE) in my tests, but I just want
>>> to point out possible thread leaks in the mailing list here. As the
>>> first thread is named FlinkCompletableFutureDelayScheduler, I suspect that
>>> Flink doesn't shut down some of its multitude of threads nicely in a local
>>> execution environment. My question: Is that some kind of problem / thread
>>> leakage in Flink or is it just a false warning?
>>>
>>>
>>>
>>>


Re: Flink (Local) Environment Thread Leaks?

2019-11-13 Thread tison
We found this issue previously.

In our case, where the leaked thread comes from is tracked at
https://issues.apache.org/jira/browse/FLINK-14565

Best,
tison.


vino yang wrote on Thu, Nov 14, 2019, at 10:15 AM:

> Hi Theo,
>
> If you think there is a thread leakage problem, you can create a JIRA
> issue and write a detailed description.
>
> Ping @Gary Yao and @Zhu Zhu to
> help locate and analyze this problem.
>
> Best,
> Vino
>
Theo Diefenthal wrote on Thu, Nov 14, 2019, at 3:16 AM:
>
>> I included a Solr End2End test in my project, inheriting from Junit 4
>> SolrCloudTestCase.
>>
>> The solr-test-framework for junit 4 makes use of 
>> com.carrotsearch.randomizedtesting
>> which automatically tests for thread leakages on test end. In my other
>> projects, that tool doesn't produce any problems.
>> When used in a test together with a Flink LocalExecutionEnvironment, it
>> will prevent the test from succeeding due to the following error at the
>> shutdown phase:
>>
>> com.carrotsearch.randomizedtesting.ThreadLeakError: 3 threads leaked from
>> SUITE scope at somepackage.E2ETest:
>>1) Thread[id=170, name=FlinkCompletableFutureDelayScheduler-thread-1,
>> state=TIMED_WAITING, group=TGRP-E2ETest]
>> at sun.misc.Unsafe.park(Native Method)
>> at
>> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>> at
>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
>> at
>> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>>2) Thread[id=29, name=metrics-meter-tick-thread-2, state=WAITING,
>> group=TGRP-E2ETest]
>> at sun.misc.Unsafe.park(Native Method)
>> at
>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>> at
>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
>> at
>> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>>3) Thread[id=28, name=metrics-meter-tick-thread-1,
>> state=TIMED_WAITING, group=TGRP-E2ETest]
>> at sun.misc.Unsafe.park(Native Method)
>> at
>> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>> at
>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
>> at
>> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>>
>> at __randomizedtesting.SeedInfo.seed([CC6ED531AFECBAF6]:0)
>>
>> Note that I can suppress the errors easily via setting
>> @ThreadLeakScope(ThreadLeakScope.Scope.NONE) in my tests, but I just want
>> to point out possible thread leaks in the mailing list here. As the
>> first thread is named FlinkCompletableFutureDelayScheduler, I suspect that
>> Flink doesn't shut down some of its multitude of threads nicely in a local
>> execution environment. My question: Is that some kind of problem / thread
>> leakage in Flink or is it just a false warning?
>>
>>
>>
>>


Re: Flink (Local) Environment Thread Leaks?

2019-11-13 Thread vino yang
Hi Theo,

If you think there is a thread leakage problem, you can create a JIRA issue
and write a detailed description.

Ping @Gary Yao and @Zhu Zhu to
help locate and analyze this problem.

Best,
Vino

Theo Diefenthal wrote on Thu, Nov 14, 2019, at 3:16 AM:

> I included a Solr End2End test in my project, inheriting from Junit 4
> SolrCloudTestCase.
>
> The solr-test-framework for junit 4 makes use of 
> com.carrotsearch.randomizedtesting
> which automatically tests for thread leakages on test end. In my other
> projects, that tool doesn't produce any problems.
> When used in a test together with a Flink LocalExecutionEnvironment, it
> will prevent the test from succeeding due to the following error at the
> shutdown phase:
>
> com.carrotsearch.randomizedtesting.ThreadLeakError: 3 threads leaked from
> SUITE scope at somepackage.E2ETest:
>1) Thread[id=170, name=FlinkCompletableFutureDelayScheduler-thread-1,
> state=TIMED_WAITING, group=TGRP-E2ETest]
> at sun.misc.Unsafe.park(Native Method)
> at
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
> at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>2) Thread[id=29, name=metrics-meter-tick-thread-2, state=WAITING,
> group=TGRP-E2ETest]
> at sun.misc.Unsafe.park(Native Method)
> at
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
> at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>3) Thread[id=28, name=metrics-meter-tick-thread-1, state=TIMED_WAITING,
> group=TGRP-E2ETest]
> at sun.misc.Unsafe.park(Native Method)
> at
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
> at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
> at __randomizedtesting.SeedInfo.seed([CC6ED531AFECBAF6]:0)
>
> Note that I can suppress the errors easily via setting
> @ThreadLeakScope(ThreadLeakScope.Scope.NONE) in my tests, but I just want
> to point out possible thread leaks in the mailing list here. As the first
> thread is named FlinkCompletableFutureDelayScheduler, I suspect that Flink
> doesn't shut down some of its multitude of threads nicely in a local
> execution environment. My question: Is that some kind of problem / thread
> leakage in Flink or is it just a false warning?
>
>
>
>


Re: Deleting Cassandra records in Flink

2019-11-13 Thread 163
Dear,

Hello,

We customized a Cassandra sink using the DataStax Cassandra Driver to implement the delete operation.
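
A minimal sketch of such a sink (the contact point, keyspace, and table are
illustrative; the key point is that the non-serializable driver objects are
created in open() on the TaskManager rather than in the constructor, which is
the usual cause of the NotSerializableException mentioned below):

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class CassandraDeleteSink extends RichSinkFunction<String> {

    // transient: the driver's Cluster/Session are not serializable, so they
    // must not be part of the serialized function state.
    private transient Cluster cluster;
    private transient Session session;

    @Override
    public void open(Configuration parameters) {
        cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
        session = cluster.connect();
    }

    @Override
    public void invoke(String id, Context context) {
        // Delete the row whose primary key equals the incoming id.
        session.execute("DELETE FROM my_keyspace.my_table WHERE id = ?", id);
    }

    @Override
    public void close() {
        if (session != null) session.close();
        if (cluster != null) cluster.close();
    }
}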

> On Nov 13, 2019, at 3:54 PM, 陈程程  wrote:
> 
> Hi all,
> We can insert rows into Cassandra via CassandraSink, but how do we delete a row?
> I tried writing a custom sink, but it failed (NotSerializableException). How exactly can a row be deleted?
> Thanks, 程程



Re: Flink-JDBC JDBCUpsertTableSink keyFields Problem

2019-11-13 Thread 张万新
Yes, it's related to your SQL: flink checks the plan of your SQL to judge
whether your job is append-only or has updates. If your job is append-only,
that means no results need to be updated.

If you still have problems, please post your SQL and the complete error
message to help people understand your use case.
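
For illustration, a sketch of the two cases with the Flink 1.9 Blink planner
(the tables are assumed to be registered already; the actual judgement is
made on the optimized plan):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UpdateModeExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // 1) A plain projection only ever emits insertions, so the planner
        //    marks the sink append-only and sets no keyFields.
        tEnv.sqlUpdate(
                "INSERT INTO sink_table SELECT c1, c2, c3 FROM source_table");

        // 2) A grouped aggregation keeps refining its result per key, so the
        //    plan contains updates and upsert keys can be derived from the
        //    GROUP BY columns.
        tEnv.sqlUpdate(
                "INSERT INTO sink_table SELECT c1, COUNT(*) AS cnt FROM source_table GROUP BY c1");
    }
}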

Polarisary wrote on Wed, Nov 13, 2019, at 6:43 PM:

> Hi
> When I use the flink-jdbc JDBCUpsertTableSink to sink to MySQL,
> isAppendOnly is modified to true, and keyFields is modified to null by
> StreamExecSink, but I want to upsert.
> Is this related to the SQL?
>
> The stack trace is as follows:
> at sink.JDBCUpsertTableSink.setIsAppendOnly(JDBCUpsertTableSink.java:105)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:98)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:348)
> at task.Device.main(Device.java:77)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>
> Hoping for a reply!
> Many thanks
>
>


Flink (Local) Environment Thread Leaks?

2019-11-13 Thread Theo Diefenthal
I included a Solr End2End test in my project, inheriting from Junit 4 
SolrCloudTestCase. 

The solr-test-framework for junit 4 makes use of 
com.carrotsearch.randomizedtesting which automatically tests for thread 
leakages on test end. In my other projects, that tool doesn't produce any 
problems. 
When used in a test together with a Flink LocalExecutionEnvironment, it will 
prevent the test from succeeding due to the following error at the shutdown phase: 

com.carrotsearch.randomizedtesting.ThreadLeakError: 3 threads leaked from SUITE 
scope at somepackage.E2ETest: 
1) Thread[id=170, name=FlinkCompletableFutureDelayScheduler-thread-1, 
state=TIMED_WAITING, group=TGRP-E2ETest] 
at sun.misc.Unsafe.park(Native Method) 
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) 
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
 
at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
 
at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
 
at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074) 
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748) 
2) Thread[id=29, name=metrics-meter-tick-thread-2, state=WAITING, 
group=TGRP-E2ETest] 
at sun.misc.Unsafe.park(Native Method) 
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) 
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
 
at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088)
 
at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
 
at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074) 
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748) 
3) Thread[id=28, name=metrics-meter-tick-thread-1, state=TIMED_WAITING, 
group=TGRP-E2ETest] 
at sun.misc.Unsafe.park(Native Method) 
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) 
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
 
at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
 
at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
 
at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074) 
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748) 

at __randomizedtesting.SeedInfo.seed([CC6ED531AFECBAF6]:0) 

Note that I can suppress the errors easily via setting 
@ThreadLeakScope(ThreadLeakScope.Scope.NONE) in my tests, but I just want to 
point out possible thread leaks on the mailing list here. As the first thread 
is named FlinkCompletableFutureDelayScheduler, I suspect that Flink doesn't 
shut down some of its multitude of threads cleanly in a local execution 
environment. My question: Is that some kind of problem / thread leakage in 
Flink, or is it just a false warning? 
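
For readers who hit the same error, the suppression mentioned above is
applied at the suite level (a sketch; the class and base class are taken
from the description above):

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
import org.apache.solr.cloud.SolrCloudTestCase;

// Disables randomizedtesting's thread-leak check for this suite only.
@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
public class E2ETest extends SolrCloudTestCase {
    // ... tests that start a Flink local execution environment ...
}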





Re: Flink 1.7.2: How to use custom state in a Table API custom aggregate function

2019-11-13 Thread Yuan,Youjun
This scenario should be computable with standard SQL. The general idea is:
1. The inner query computes each device's per-minute maximum temperature: max(temp) AS max_temperature plus a tumble window.
2. The middle query uses a row OVER window to get the current minute's max_temperature together with the sum of the adjacent two minutes' maxima, i.e. SUM(max_temperature) AS
sum_temperature.
3. The outermost query simply selects 2 * max_temperature - sum_temperature, which is exactly the difference between the adjacent two minutes' maximum temperatures that you need.

Assume the input messages have three fields:
ts: timestamp
deviceid: device id
temp: device temperature

The complete SQL is as follows:
INSERT INTO mysink 
SELECT ts, deviceid,  2 * max_temperature - sum_temperature AS diff_temperature 
FROM ( 
SELECT  deviceid, ts, max_temperature, SUM(max_temperature) OVER 
(PARTITION BY deviceid ORDER BY ts ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS 
sum_temperature 
FROM (
SELECT TUMBLE_ROWTIME(rowtime, INTERVAL '60' SECOND) as ts, 
deviceid, max(temp) AS max_temperature  from mysrc group by TUMBLE(rowtime, 
INTERVAL '60' SECOND), deviceid
)
)

I used the following test data:
"2,dev1,1.2",
"5,dev1,1.3",
"6,dev1,1.4",
"10,dev1,1.5",
"11,dev1,1.6",
"12,dev1,1.7"
Running it produced the following results:
{"deviceid":"dev1","diff_temperature":1.3,"ts":5}
{"deviceid":"dev1","diff_temperature":0.3,"ts":11}
{"deviceid":"dev1","diff_temperature":0.1,"ts":17}

If you want to fully verify my approach, you can:
1. Log in to http://creek.baidubce.com/
2. In the job-definition input box, enter the job definition (JSON) at the end of this mail.
3. Click "generate executable"; in the pop-up dialog choose your machine's OS and CPU arch, then confirm.
Wait patiently for a few seconds and the system will generate a complete executable; run it directly and you can see the results on the console. If you need to verify with more data, change the source Type to STDIN so that you can feed your data from the command line.

Job definition (JSON):
{
"comment": {
"description": "The inner query computes each device's per-minute maximum temperature (max + tumble window); the outer query computes the same device's maximum-temperature difference between adjacent minutes (row over window). The difference is computed as: current window max temperature x 2 - the sum of the two adjacent windows' max temperatures. This example uses preconfigured input data, i.e. source type=COLLECTION; to try more inputs, change the type to STDIN to read data from standard input.",
"input example": "1000,dev1,2.3",
"output example": {"deviceid":"dev1","diff_temperature":1.3,"ts":5}
},
"sources": [{
"schema": {
"format": "CSV",
"fields": [{
"name": "ts",
"type": "SQL_TIMESTAMP"
},
{
"name": "deviceid",
"type": "STRING"
},
{
"name": "temp",
"type": "DOUBLE"
}]
},
"watermark": 0,
"name": "mysrc",
"eventTime": "ts",
"type": "COLLECTION",
"attr": {
"input": [
"1,dev1,1.1",
"2,dev1,1.2",
"5,dev1,1.3",
"6,dev1,1.4",
"10,dev1,1.5",
"11,dev1,1.6",
"12,dev1,1.7"
]
}
}],
"sink": {
"schema": {
"format": "JSON"
},
"name": "mysink",
"type": "STDOUT"
},
"name": "demojob",
"timeType": "EVENTTIME",
"sql": "INSERT INTO mysink SELECT ts, deviceid,  2 * max_temperature - 
sum_temperature AS diff_temperature FROM ( SELECT  deviceid, ts, 
max_temperature, SUM(max_temperature) OVER (PARTITION BY deviceid ORDER BY ts 
ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS sum_temperature FROM (SELECT 
TUMBLE_ROWTIME(rowtime, INTERVAL '60' SECOND) as ts, deviceid, max(temp) AS 
max_temperature  from mysrc group by TUMBLE(rowtime, INTERVAL '60' SECOND), 
deviceid)) "
}



-----Original Message-----
From: Chennet Steven
Sent: Wednesday, November 13, 2019 3:36 PM
To: user-zh@flink.apache.org
Subject: Re: Flink 1.7.2: How to use custom state in a Table API custom aggregate function

The scenario: per one-minute window, compute each sensor's maximum temperature, and at the same time compute the difference between the current minute's and the previous minute's maximum temperature, using flink-sql. I wanted to write a custom Table UDAF and use State in it to store the previous minute's maximum temperature, but found that the RuntimeContext in the FunctionContext of the UDAF's open function is private and cannot be used. Meanwhile, DataView is
a property of the UDAF's ACC, and a new ACC is created for every window, so the previous window's result cannot be carried over to the next window via ACC/DataView. Is my understanding correct?
May I ask what the FlinkSQL approach is for computing the difference of aggregate values between two windows?

From stevenchen
 webchat 38798579

From: Dian Fu
Sent: Thursday, November 7, 2019 19:41
To: user-zh@flink.apache.org
Subject: Re: Flink 1.7.2: How to use custom state in a Table API custom aggregate function

You can refer to an existing example in the Flink code base: https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MaxWithRetractAggFunction.java

> On Nov 7, 2019, at 7:06 PM, Chennet Steven wrote:
>
> In flink-table-common of Flink 1.9 I found the DataView interface and its
> subclasses ListView and MapView, but I don't understand how to use them in
> a user-defined function.
> Could you give an example or a link to test code?
>
> From stevenchen
> webchat 38798579
>
> 
> From: wenlong.lwl
> Sent: Thursday, November 7, 2019 2:13:43 PM
> To: user-zh@flink.apache.org
> Subject: Re: Flink 1.7.2: How to use custom state in a Table API custom aggregate function
>
> You can try 1.9, which introduced the DataView mechanism; state can now be used in the Acc.
>
> On Thu, 7 Nov 2019 at 09:22, Chennet Steven  wrote:
>
>> I tried to use State in a Flink user-defined aggregate function and found
>> that the RuntimeContext cannot be obtained via the FunctionContext in the
>> open function.
>> How can State be used in an aggregate function?
>>
>>
>> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo,
>> 

Re: Running flink example programs-WordCount

2019-11-13 Thread RAMALINGESWARA RAO THOTTEMPUDI
Respected Sir,

This is my code using Flink Gelly:

for (Vertex vertex : node_set) {
    System.out.println(vertex);
    graph_copy.removeVertex(vertex);
    temp.add(vertex);
}

For some reason .removeVertex is not working, even when given correct parameters.
Please help. Given a list of edges, I want to recursively remove vertices with
some degree i.
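
One thing worth checking: Gelly's Graph is immutable, so (as far as I know)
removeVertex returns a new Graph rather than modifying graph_copy in place.
A sketch of the loop with the result reassigned (the Long type parameters
are illustrative):

for (Vertex<Long, Long> vertex : node_set) {
    System.out.println(vertex);
    // removeVertex does not mutate the receiver; keep the returned graph.
    graph_copy = graph_copy.removeVertex(vertex);
    temp.add(vertex);
}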


From: "RAMALINGESWARA RAO THOTTEMPUDI"  
To: "user"  
Sent: Wednesday, November 13, 2019 8:44:32 PM 
Subject: Re: Running flink example programs-WordCount 

Respected Sir,

This is my code using Flink Gelly:

for (Vertex vertex : node_set) {
    System.out.println(vertex);
    graph_copy.removeVertex(vertex);
    temp.add(vertex);
}

For some reason .removeVertex is not working, even when given correct parameters.
Please help. Given a list of edges, I want to recursively remove vertices with
some degree i.

regards 
TR RAO 


From: "RAMALINGESWARA RAO THOTTEMPUDI"  
To: "user"  
Sent: Monday, August 26, 2019 6:15:51 PM 
Subject: Running flink example programs-WordCount 

Hi, 
I am using the command 

" ./bin/flink run ./examples/batch/WordCount.jar --input 
/home/trrao/Desktop/ram2.txt --output /home/trrao/Desktop/ramop.txt " 

But I am getting "Caused by: java.net.ConnectException: connection refused".

Kindly give the correct command to run the wordcount example in the flink batch examples.

regards 
TR RAO 

From: "RAMALINGESWARA RAO THOTTEMPUDI"  
To: "Vishwas Siravara"  
Cc: "user"  
Sent: Tuesday, August 20, 2019 9:51:09 PM 
Subject: Re: Configuring logback 

Hi 

How do I build a Graph using Flink Gelly from a text file that consists of an
edge list?

regards 
TR RAO 


From: "Vishwas Siravara"  
To: "user"  
Sent: Tuesday, August 20, 2019 9:17:01 PM 
Subject: Configuring logback 

Hi guys,
I am using logback for my application logs. I have logback.xml as part of my
fat jar that I submit to flink via the command line: flink run "...". When I
run my application from the IDE, the appenders are what I have set in my
logback config, but when I run from the command line, the appender defaults to
the root one in the flink installation directory. How can I make sure that my
application logs go to the correct appender? Here is my logback.xml file,
which is available in the classpath.

[logback.xml omitted: the mailing-list archive stripped the XML tags. The
recoverable structure: a rolling file appender writing INFO logs to
${APP_LOG_ROOT}service.log, archived to
${APP_LOG_ROOT}Archive/service.%d{-MM-dd_HH}.log.gz with window indices 0-5
and encoder pattern "%d{-MM-dd_HH:mm:ss.SSS} %p %c | %m%n" (UTF-8); an
equivalent appender writing ERROR logs to ${APP_LOG_ROOT}service-error.log;
and a console appender with pattern "%d{-MM-dd_HH:mm:ss.SSS} [%thread]
%-5level %logger{5} - %m%n".]

Thanks, 
Vishwas 



Re: Running flink example programs-WordCount

2019-11-13 Thread RAMALINGESWARA RAO THOTTEMPUDI
Respected Sir,

This is my code using Flink Gelly:

for (Vertex vertex : node_set) {
    System.out.println(vertex);
    graph_copy.removeVertex(vertex);
    temp.add(vertex);
}

For some reason .removeVertex is not working, even when given correct parameters.
Please help. Given a list of edges, I want to recursively remove vertices with
some degree i.

regards 
TR RAO 


From: "RAMALINGESWARA RAO THOTTEMPUDI"  
To: "user"  
Sent: Monday, August 26, 2019 6:15:51 PM 
Subject: Running flink example programs-WordCount 

Hi, 
I am using the command 

" ./bin/flink run ./examples/batch/WordCount.jar --input 
/home/trrao/Desktop/ram2.txt --output /home/trrao/Desktop/ramop.txt " 

But I am getting "Caused by: java.net.ConnectException: connection refused".

Kindly give the correct command to run the wordcount example in the flink batch examples.

regards 
TR RAO 

From: "RAMALINGESWARA RAO THOTTEMPUDI"  
To: "Vishwas Siravara"  
Cc: "user"  
Sent: Tuesday, August 20, 2019 9:51:09 PM 
Subject: Re: Configuring logback 

Hi 

How do I build a Graph using Flink Gelly from a text file that consists of an
edge list?

regards 
TR RAO 


From: "Vishwas Siravara"  
To: "user"  
Sent: Tuesday, August 20, 2019 9:17:01 PM 
Subject: Configuring logback 

Hi guys,
I am using logback for my application logs. I have logback.xml as part of my
fat jar that I submit to flink via the command line: flink run "...". When I
run my application from the IDE, the appenders are what I have set in my
logback config, but when I run from the command line, the appender defaults to
the root one in the flink installation directory. How can I make sure that my
application logs go to the correct appender? Here is my logback.xml file,
which is available in the classpath.

[logback.xml omitted: the mailing-list archive stripped the XML tags. The
recoverable structure: a rolling file appender writing INFO logs to
${APP_LOG_ROOT}service.log, archived to
${APP_LOG_ROOT}Archive/service.%d{-MM-dd_HH}.log.gz with window indices 0-5
and encoder pattern "%d{-MM-dd_HH:mm:ss.SSS} %p %c | %m%n" (UTF-8); an
equivalent appender writing ERROR logs to ${APP_LOG_ROOT}service-error.log;
and a console appender with pattern "%d{-MM-dd_HH:mm:ss.SSS} [%thread]
%-5level %logger{5} - %m%n".]

Thanks, 
Vishwas 



How to catch up on previous data after a streaming job fails

2019-11-13 Thread 柯桂强
I have a streaming job that failed, and its checkpoint/savepoint was retained. I want to restore from the last checkpoint, but the job uses event time, so data that falls outside the window would be dropped. One idea I had: process the data from before the restart with a batch job first, then run the stream job. I'd like to ask whether this plan is feasible; it feels like bounding the batch range and stitching it seamlessly onto the subsequent stream processing is a rather hard problem.

Re: Flink 1.7.2: How to use custom state in a Table API custom aggregate function

2019-11-13 Thread Dian Fu
1) In the Table API & SQL, the RuntimeContext is not exposed to users, which is why it is private.
2) For the difference of aggregate values between windows, you can check whether CEP satisfies the requirement; see the docs:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/match_recognize.html
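
Since an example of the DataView mechanism was asked for downthread, here is
a minimal sketch under Flink 1.9 (an illustrative count-distinct UDAF, not
the temperature use case; note the accumulator and its views are still per
group/window, so this alone does not carry state across windows):

import org.apache.flink.table.api.dataview.MapView;
import org.apache.flink.table.functions.AggregateFunction;

public class CountDistinct extends AggregateFunction<Long, CountDistinct.Acc> {

    // The MapView field is backed by the state backend when the function
    // runs in a streaming group aggregation.
    public static class Acc {
        public MapView<String, Boolean> seen = new MapView<>();
        public long count = 0L;
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    public void accumulate(Acc acc, String value) throws Exception {
        if (value != null && !acc.seen.contains(value)) {
            acc.seen.put(value, true);  // stored in state, not on the heap
            acc.count++;
        }
    }

    @Override
    public Long getValue(Acc acc) {
        return acc.count;
    }
}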

> On Nov 13, 2019, at 3:35 PM, Chennet Steven wrote:
> 
> The scenario: per one-minute window, compute each sensor's maximum temperature, and at the same time compute the difference between the current minute's and the previous minute's maximum temperature, using flink-sql. I wanted to write a custom Table UDAF and use State in it to store the previous minute's maximum temperature, but found that the RuntimeContext in the FunctionContext of the UDAF's open function is private and cannot be used. Meanwhile, DataView is
> a property of the UDAF's ACC, and a new ACC is created for every window, so the previous window's result cannot be carried over to the next window via ACC/DataView. Is my understanding correct?
> May I ask what the FlinkSQL approach is for computing the difference of aggregate values between two windows?
> 
> From stevenchen
> webchat 38798579
> 
> From: Dian Fu
> Sent: Thursday, November 7, 2019 19:41
> To: user-zh@flink.apache.org
> Subject: Re: Flink 1.7.2: How to use custom state in a Table API custom aggregate function
> 
> You can refer to an existing example in the Flink code base: https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MaxWithRetractAggFunction.java
>  
> 
>> On Nov 7, 2019, at 7:06 PM, Chennet Steven wrote:
>> 
>> In flink-table-common of Flink 1.9 I found the DataView interface and its subclasses ListView and MapView, but I don't understand how to use them in a user-defined function.
>> Could you give an example or a link to test code?
>> 
>> From stevenchen
>>webchat 38798579
>> 
>> 
>> From: wenlong.lwl
>> Sent: Thursday, November 7, 2019 2:13:43 PM
>> To: user-zh@flink.apache.org
>> Subject: Re: Flink 1.7.2: How to use custom state in a Table API custom aggregate function
>> 
>> You can try 1.9, which introduced the DataView mechanism; state can now be used in the Acc.
>> 
>> On Thu, 7 Nov 2019 at 09:22, Chennet Steven  wrote:
>> 
>>> I tried to use State in a Flink user-defined aggregate function and found that the RuntimeContext cannot be obtained via the FunctionContext in the open function.
>>> How can State be used in an aggregate function?
>>> 
>>> 
>>> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo,
>>> TypeInformation}
>>> import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
>>> import org.apache.flink.api.java.typeutils.TupleTypeInfo
>>> import org.apache.flink.table.functions.{AggregateFunction,
>>> FunctionContext}
>>> import java.lang.{Iterable => JIterable}
>>> 
>>> 
>>> class IntDiffSumAccumulator extends JTuple2[Int, Boolean]
>>> 
>>> class IntDiffSumFunction extends AggregateFunction[Int,
>>> IntDiffSumAccumulator] {
>>> 
>>> override def open(context: FunctionContext): Unit = {
>>>   // Flink 1.7.2: the RuntimeContext cannot be obtained here, so there is no way to initialize State
>>>   //getRuntimeContext.getState(desc)
>>>   val a = this.hashCode()
>>>   print(s"hashCode:$a")
>>>   super.open(context)
>>> }
>>> 
>>> override def createAccumulator(): IntDiffSumAccumulator = {
>>>   val acc = new IntDiffSumAccumulator()
>>>   acc.f0 = 0
>>>   acc.f1 = false
>>>   acc
>>> }
>>> 
>>> def accumulate(accumulator: IntDiffSumAccumulator, value: Int): Unit = {
>>>   accumulator.f0 += value
>>>   accumulator.f1 = true
>>> }
>>> 
>>> override def getValue(accumulator: IntDiffSumAccumulator): Int = {
>>>   if (accumulator.f1) {
>>> 
>>> accumulator.f0
>>>   } else {
>>> Int.MinValue
>>>   }
>>> }
>>> 
>>> def merge(acc: IntDiffSumAccumulator, its:
>>> JIterable[IntDiffSumAccumulator]) = {
>>>   val iter = its.iterator()
>>>   while (iter.hasNext) {  // fixed: `while (true)` would throw NoSuchElementException
>>> val a = iter.next()
>>> if (a.f1) {
>>>   acc.f0 += a.f0
>>>   acc.f1 = true
>>> }
>>>   }
>>> }
>>> 
>>> def resetAccumulator(acc: IntDiffSumAccumulator) = {
>>>   acc.f0 = 0
>>>   acc.f1 = false
>>> }
>>> 
>>> override def getAccumulatorType: TypeInformation[IntDiffSumAccumulator] =
>>>   new TupleTypeInfo[IntDiffSumAccumulator](BasicTypeInfo.INT_TYPE_INFO,
>>> BasicTypeInfo.BOOLEAN_TYPE_INFO)
>>> }
>>> 
>>> 
>>> From stevenchen
>>>webchat 38798579
>>> 
>>> 
>>> 
> 



Flink-JDBC JDBCUpsertTableSink keyFields Problem

2019-11-13 Thread Polarisary
Hi,
When I use the flink-jdbc JDBCUpsertTableSink to sink to MySQL, isAppendOnly
is modified to true, and keyFields is modified to null by StreamExecSink, but I
want to upsert.
Is this related to the SQL?

The stack trace is as follows:
at sink.JDBCUpsertTableSink.setIsAppendOnly(JDBCUpsertTableSink.java:105)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:98)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:348)
at task.Device.main(Device.java:77)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)

Hoping for a reply!
Many thanks