Re: Flink 'Job Cluster' mode Ui Access

2019-11-27 Thread vino yang
Hi Jatin,

The Flink web UI does not depend on the deployment mode.

You should check whether there are errors in the log file and whether the job
is in RUNNING state.
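
For example, with the default REST port (an assumption; adjust if configured
differently), the job status can be checked quickly via the monitoring API:

    curl http://<jobmanager-host>:8081/jobs
    # returns each job id together with its status, e.g. RUNNING or FAILED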

Best,
Vino

Jatin Banger wrote on Thu, Nov 28, 2019 at 3:43 PM:

> Hi,
>
> It seems there is a web UI for a Flink session cluster, but for a Flink job
> cluster it shows
>
> {"errors":["Not found."]}
>
> Is this the expected behavior for Flink job cluster mode?
>
> Best Regards,
> Jatin
>


flink Connection timed out issue

2019-11-27 Thread Mr.??????
Our Flink cluster has 100+ slots across its TaskManagers, and the TaskManagers
keep failing with "Connection timed out" errors. Has anyone hit this with
Flink, and how can it be resolved?


Flink version: 1.9.1
Standalone deployment; TaskManager slots: 150+; checkpointing enabled.

taskmanager.heap.size: 1536m
taskmanager.memory.fraction: 0.1
env.java.opts.taskmanager: -XX:+UseG1GC -Xss512K
taskmanager.numberOfTaskSlots: 1
taskmanager.network.memory.fraction: 0.3
taskmanager.network.memory.min: 512mb
taskmanager.network.memory.max: 512mb



The exception:
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: 
Connection timed out (connection to '/10.20.84.44:37391')
at 
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:165)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
at 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
at 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerAdapter.exceptionCaught(ChannelHandlerAdapter.java:87)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1401)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:953)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.handleReadException(AbstractNioByteChannel.java:125)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:174)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connection timed out
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
at 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:347)
...

Flink 'Job Cluster' mode Ui Access

2019-11-27 Thread Jatin Banger
Hi,

It seems there is a web UI for a Flink session cluster, but for a Flink job
cluster it shows

{"errors":["Not found."]}

Is this the expected behavior for Flink job cluster mode?

Best Regards,
Jatin


Re: JobGraphs not cleaned up in HA mode

2019-11-27 Thread 曾祥才
The config (/flink is the NAS directory):


jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 16
web.upload.dir: /flink/webUpload
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
high-availability: zookeeper
high-availability.cluster-id: /cluster-test
high-availability.storageDir: /flink/ha
high-availability.zookeeper.quorum: :2181
high-availability.jobmanager.port: 6123
high-availability.zookeeper.path.root: /flink/risk-insight
high-availability.zookeeper.path.checkpoints: /flink/zk-checkpoints
state.backend: filesystem
state.checkpoints.dir: file:///flink/checkpoints
state.savepoints.dir: file:///flink/savepoints
state.checkpoints.num-retained: 2
jobmanager.execution.failover-strategy: region
jobmanager.archive.fs.dir: file:///flink/archive/history






------ Original Message ------
From: "Vijay Bhaskar"
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: JobGraphs not cleaned up in HA mode

2019-11-27 Thread Vijay Bhaskar
Can you share the flink configuration once?

Regards
Bhaskar

On Thu, Nov 28, 2019 at 12:09 PM 曾祥才  wrote:

> If I clean the ZooKeeper data, it runs fine. But the next time the
> jobmanager fails and is redeployed, the error occurs again.
>
>
>
>
> -- Original Message --
> *From:* "Vijay Bhaskar";
> *Sent:* Thursday, Nov 28, 2019, 3:05 PM
> *To:* "曾祥才";
> *Subject:* Re: JobGraphs not cleaned up in HA mode
>
> Again it could not find the state store file: "Caused by:
> java.io.FileNotFoundException: /flink/ha/submittedJobGraph0c6bcff01199 "
> Check why it is unable to find it.
> The better approach is: clean up the ZooKeeper state, check and correct your
> configuration, and restart the cluster.
> Otherwise it will always pick up the corrupted state from ZooKeeper and will
> never restart.
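>
> A rough sketch of that cleanup, assuming the config posted above
> (high-availability.zookeeper.path.root + cluster-id; double-check the paths
> in your own environment before deleting anything):
>
>     # drop the stale job graph references from ZooKeeper
>     zkCli.sh -server <zk-host>:2181
>     rmr /flink/risk-insight/cluster-test/jobgraphs
>
>     # drop the orphaned job graph blobs from the HA storage dir
>     rm -rf /flink/ha/submittedJobGraph*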
>
> Regards
> Bhaskar
>
> On Thu, Nov 28, 2019 at 11:51 AM 曾祥才  wrote:
>
>> I made a mistake (the earlier log was from another cluster). The full
>> exception log is:
>>
>>
>> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
>> Recovering all persisted jobs.
>> 2019-11-28 02:33:12,726 INFO
>> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl  -
>> Starting the SlotManager.
>> 2019-11-28 02:33:12,743 INFO
>> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
>> Released locks of job graph 639170a9d710bacfd113ca66b2aacefa from
>> ZooKeeper.
>> 2019-11-28 02:33:12,744 ERROR
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Fatal error
>> occurred in the cluster entrypoint.
>> org.apache.flink.runtime.dispatcher.DispatcherException: Failed to take
>> leadership with session id 607e1fe0-f1b4-4af0-af16-4137d779defb.
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$null$30(Dispatcher.java:915)
>> at
>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>> at
>> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>> at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>> at
>> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
>> at
>> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:594)
>> at
>> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: java.lang.RuntimeException:
>> org.apache.flink.util.FlinkException: Could not retrieve submitted JobGraph
>> from state handle under /639170a9d710bacfd113ca66b2aacefa. This indicates
>> that the retrieved state handle is broken. Try cleaning the state handle
>> store.
>> at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:199)
>> at
>> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
>> at
>> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>> at
>> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>> ... 7 more
>> Caused by: org.apache.flink.util.FlinkException: Could not retrieve
>> submitted JobGraph from state handle under
>> /639170a9d710bacfd113ca66b2aacefa. This indicates that the retrieved state
>> handle is broken. Try cleaning the state handle store.
>> at
>> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:190)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.recoverJob(Dispatcher.java:755)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.recoverJobGraphs(Dispatcher.java:740)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.recoverJobs(Dispatcher.java:721)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$null$27(Dispatcher.java:888)
>> at
>> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:73)
>> ... 9 more
>> Caused by: java.io.FileNotFoundException:
>> /flink/ha/submittedJobGraph0c6bcff01199 (No such file or directory)
>> at java.io.FileInputStream.open0(Native Method)
>> at java.io.FileInputStream.open(FileInputStream.java:195)
>>
>>
>>
>>
>> -- Original Message --
>> *From:* "Vijay Bhaskar";
>> *Sent:* Thursday, Nov 28, 2019, 2:46 PM
>> *To:* "曾祥才";
>> *Cc:* "User-Flink";
>> *Subject:* Re: JobGraphs not cleaned up in HA mode
>>
>> Is it filesystem or hadoop? If it's NAS, then why the exception "Caused by:
>> org.apache.hadoop.hdfs.BlockMissingException:"?
>> It seems you configured a Hadoop state store but are giving it a NAS mount.
>>
>> 

Re: JobGraphs not cleaned up in HA mode

2019-11-27 Thread 曾祥才
If I clean the ZooKeeper data, it runs fine. But the next time the jobmanager
fails and is redeployed, the error occurs again.








------ Original Message ------
From: "Vijay Bhaskar"
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: JobGraphs not cleaned up in HA mode

2019-11-27 Thread Vijay Bhaskar
Is it filesystem or hadoop? If it's NAS, then why the exception "Caused by:
org.apache.hadoop.hdfs.BlockMissingException:"?
It seems you configured a Hadoop state store but are giving it a NAS mount.

Regards
Bhaskar



On Thu, Nov 28, 2019 at 11:36 AM 曾祥才  wrote:

> /flink/checkpoints is an external persistent store (a NAS directory mounted
> on the job manager)
>
>
>
>
> -- Original Message --
> *From:* "Vijay Bhaskar";
> *Sent:* Thursday, Nov 28, 2019, 2:29 PM
> *To:* "曾祥才";
> *Cc:* "user";
> *Subject:* Re: JobGraphs not cleaned up in HA mode
>
> Following are the mandatory conditions to run in HA:
>
> a) You should have a persistent common external store that the jobmanager
> and task managers can write their state to.
> b) You should have a persistent external store for ZooKeeper to store the
> JobGraph.
>
> ZooKeeper is referring to the path
> /flink/checkpoints/submittedJobGraph480ddf9572ed to get the job graph, but
> the jobmanager is unable to find it.
> It seems /flink/checkpoints is not an external persistent store.
>
>
> Regards
> Bhaskar
>
> On Thu, Nov 28, 2019 at 10:43 AM seuzxc  wrote:
>
>> Hi, I have the same problem with Flink 1.9.1. Any solution to fix it?
>> When k8s redeploys the jobmanager, the error looks like this (it seems ZK
>> did not remove the submitted job info, but the jobmanager removed the file):
>>
>>
>> Caused by: org.apache.flink.util.FlinkException: Could not retrieve
>> submitted JobGraph from state handle under
>> /147dd022ec91f7381ad4ca3d290387e9. This indicates that the retrieved state
>> handle is broken. Try cleaning the state handle store.
>> at
>>
>> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:208)
>> at
>>
>> org.apache.flink.runtime.dispatcher.Dispatcher.recoverJob(Dispatcher.java:696)
>> at
>>
>> org.apache.flink.runtime.dispatcher.Dispatcher.recoverJobGraphs(Dispatcher.java:681)
>> at
>>
>> org.apache.flink.runtime.dispatcher.Dispatcher.recoverJobs(Dispatcher.java:662)
>> at
>>
>> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$null$26(Dispatcher.java:821)
>> at
>>
>> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:72)
>> ... 9 more
>> Caused by: org.apache.hadoop.hdfs.BlockMissingException: Could not obtain
>> block: BP-1651346363-10.20.1.81-1525354906737:blk_1083182315_9441494
>> file=/flink/checkpoints/submittedJobGraph480ddf9572ed
>> at
>>
>> org.apache.hadoop.hdfs.DFSInputStream.refetchLocations(DFSInputStream.java:1052)
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: JobGraphs not cleaned up in HA mode

2019-11-27 Thread 曾祥才
/flink/checkpoints is an external persistent store (a NAS directory mounted
on the job manager)








------ Original Message ------
From: "Vijay Bhaskar"
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: JobGraphs not cleaned up in HA mode

2019-11-27 Thread Vijay Bhaskar
Following are the mandatory conditions to run in HA:

a) You should have a persistent common external store that the jobmanager and
task managers can write their state to.
b) You should have a persistent external store for ZooKeeper to store the
JobGraph.

ZooKeeper is referring to the path
/flink/checkpoints/submittedJobGraph480ddf9572ed to get the job graph, but the
jobmanager is unable to find it.
It seems /flink/checkpoints is not an external persistent store.
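
A minimal sketch of such a setup in flink-conf.yaml (hosts and paths are
examples; any shared, persistent filesystem that every jobmanager and
taskmanager can reach works):

high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.storageDir: hdfs:///flink/ha        # shared and persistent
state.checkpoints.dir: hdfs:///flink/checkpoints      # likewise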


Regards
Bhaskar

On Thu, Nov 28, 2019 at 10:43 AM seuzxc  wrote:

> Hi, I have the same problem with Flink 1.9.1. Any solution to fix it?
> When k8s redeploys the jobmanager, the error looks like this (it seems ZK
> did not remove the submitted job info, but the jobmanager removed the file):
>
>
> Caused by: org.apache.flink.util.FlinkException: Could not retrieve
> submitted JobGraph from state handle under
> /147dd022ec91f7381ad4ca3d290387e9. This indicates that the retrieved state
> handle is broken. Try cleaning the state handle store.
> at
>
> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:208)
> at
>
> org.apache.flink.runtime.dispatcher.Dispatcher.recoverJob(Dispatcher.java:696)
> at
>
> org.apache.flink.runtime.dispatcher.Dispatcher.recoverJobGraphs(Dispatcher.java:681)
> at
>
> org.apache.flink.runtime.dispatcher.Dispatcher.recoverJobs(Dispatcher.java:662)
> at
>
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$null$26(Dispatcher.java:821)
> at
>
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:72)
> ... 9 more
> Caused by: org.apache.hadoop.hdfs.BlockMissingException: Could not obtain
> block: BP-1651346363-10.20.1.81-1525354906737:blk_1083182315_9441494
> file=/flink/checkpoints/submittedJobGraph480ddf9572ed
> at
>
> org.apache.hadoop.hdfs.DFSInputStream.refetchLocations(DFSInputStream.java:1052)
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: JobGraphs not cleaned up in HA mode

2019-11-27 Thread seuzxc
Hi, I have the same problem with Flink 1.9.1. Any solution to fix it?
When k8s redeploys the jobmanager, the error looks like this (it seems ZK did
not remove the submitted job info, but the jobmanager removed the file):


Caused by: org.apache.flink.util.FlinkException: Could not retrieve
submitted JobGraph from state handle under
/147dd022ec91f7381ad4ca3d290387e9. This indicates that the retrieved state
handle is broken. Try cleaning the state handle store.
at
org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:208)
at
org.apache.flink.runtime.dispatcher.Dispatcher.recoverJob(Dispatcher.java:696)
at
org.apache.flink.runtime.dispatcher.Dispatcher.recoverJobGraphs(Dispatcher.java:681)
at
org.apache.flink.runtime.dispatcher.Dispatcher.recoverJobs(Dispatcher.java:662)
at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$null$26(Dispatcher.java:821)
at
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:72)
... 9 more
Caused by: org.apache.hadoop.hdfs.BlockMissingException: Could not obtain
block: BP-1651346363-10.20.1.81-1525354906737:blk_1083182315_9441494
file=/flink/checkpoints/submittedJobGraph480ddf9572ed
at
org.apache.hadoop.hdfs.DFSInputStream.refetchLocations(DFSInputStream.java:1052)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: What happens to the channels when there is backpressure?

2019-11-27 Thread yingjie cao
Hi Felipe,

That depends on what you mean by 'bandwidth'. If you mean the capability
of the network stack, the answer would be no.

Here is a post about Flink network stack which may help:
https://flink.apache.org/2019/06/05/flink-network-stack.html.

Thanks,
Yingjie

Felipe Gutierrez wrote on Wed, Nov 27, 2019 at 11:13 PM:

> Hi community,
>
> I have a question about backpressure. Suppose a scenario that I have a map
> and a reducer, and the reducer is back pressuring the map operator. I know
> that the reducer is processing tuples at a lower rate than it is receiving.
>
> However, can I say that at least one channel between the map and the
> reducer is totally using its available bandwidth?
>
> My guess is it is not, at least in the beginning. But as time goes on, the
> tuples will queue up in the reducer's network buffers, and then the
> bandwidth will be at 100% usage. Am I right?
>
> Thanks,
> Felipe
>
>
>


Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

2019-11-27 Thread yingjie
Piotr is right; it depends on the data size you are reading and the memory
pressure. The memory occupied by the mmapped region can be recycled and used
by other processes if memory pressure is high; that is, other processes or
services on the same node won't be affected, because the OS will recycle the
mmapped pages if needed. But currently you can't assume a bound on the memory
used: it will use more memory as long as there is free space and there is
more new data to read.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Status stuck at "in progress"

2019-11-27 Thread Caizhi Weng
Hi,

The image in your email doesn't show up... Could you send it again as an attachment?

sun <1392427...@qq.com> wrote on Wed, Nov 27, 2019 at 8:18 PM:

>
>
>
> My status has been "in progress" the whole time; what could be the problem?
>
>
>


Re: Converting streaming to batch execution

2019-11-27 Thread Caizhi Weng
Hi Nick,

It seems to me that the slow part of the whole pipeline is the Derby sink.
Could you change it to another sink (for example, a CSV sink or even a
"discard everything" sink) and see if the throughput improves?

If this is the case, are you using the JDBC connector? If yes, you might
consider calling the `JDBCOutputFormatBuilder#setBatchInterval` method and
setting the batch interval to a larger value.
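
A minimal sketch of that setting (the flink-jdbc API in Flink 1.8/1.9; the
Derby URL, table, and columns below are made-up placeholders):

import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;

JDBCOutputFormat sink = JDBCOutputFormat.buildJDBCOutputFormat()
    .setDrivername("org.apache.derby.jdbc.ClientDriver")
    .setDBUrl("jdbc:derby://dbhost:1527/sensors")
    .setQuery("INSERT INTO samples (series_id, ts, val) VALUES (?, ?, ?)")
    .setBatchInterval(5000)  // buffer 5000 rows per executeBatch() round trip
    .finish();

// records must be converted to org.apache.flink.types.Row, then:
// rowStream.writeUsingOutputFormat(sink);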

Thanks

Nicholas Walton wrote on Thu, Nov 28, 2019 at 1:57 AM:

> Hi,
>
> I’ve been working with a pipeline that was initially aimed at processing
> high speed sensor data, but for a proof of concept I’m feeding simulated
> data from a CSV file. Each row of the file is a sample across a number of
> time series, and I’ve been using the streaming environment to process each
> column of the file in parallel. The columns are each around 6 million
> samples in length. The outputs are being sunk into a Derby database table.
>
> However, I’m not getting a very high throughput even running across two
> 32G Dell laptops with the database on a 16Mb Macbook. According to the
> metrics it seems I’m only dropping records on to the database at a rate of
> around 20-30 per second (I assume per parallel pipe of which I’m running 16
> across the two laptops). The pipeline runs a couple of windowing operations
> one of length 10 and one of length 100 with a small amount of computation,
> but it does yield a considerable amount of output a 10G CSV file yielding a
> database of around 100Gb+.
>
> I’m thinking that the slow rate is due to using stream processing to process
> a batch. So I’ve been looking at the batch support in Flink (1.8)
> intending to move the code over from stream to batch execution. However, I
> can’t make head’n tail of the batch DataSet documentation. For example, in
> the streaming environment I was setting a watermark after I read each line
> of the file to keep the time series in order, and using keyBy to split up
> the individual time series by column number. I can’t find the equivalent
> operations in the batch interface.
>
> Could someone guide me to some relevant online documentation, and before
> anyone says I have chatted with Dr Google for a good while to no
> satisfactory outcome
>
> TIA
>
> Nick Walton


Re: flink 1.9.1 state keeps growing

2019-11-27 Thread 宇张
I am using Flink's Blink Table API, with idle state retention set as:
streamTableEnv.getConfig().setIdleStateRetentionTime(Time.minutes(15),
Time.minutes(20));
I expected expired state to be cleaned up automatically, with the total state
size fluctuating within a range, but expired state is not being cleaned up, so
the state keeps growing and eventually runs out of memory. Also, earlier, for
a stream subscribed to a single topic, using subquery SQL with an outermost
processing-time window aggregation, expired state was not cleaned up either (I
am not sure whether that case was caused by misuse).

On Wed, Nov 27, 2019 at 8:18 PM Congxian Qiu  wrote:

> Hi
>
> Are you using TTL state? How are you using it, and what is the expected behavior?
>
> Best,
> Congxian
>
>
> 谷歌-akulaku wrote on Wed, Nov 27, 2019 at 5:54 PM:
>
> > Hello, I am using FlinkKafkaConsumer011 to subscribe to a topic list.
> > After setting an expiration time, expired state is not cleaned up. Is
> > there any workaround? Also, with a two-stream union, expired state is not
> > cleaned up either, while the single-stream and single-topic cases do get
> > cleaned up. Is this a bug?
> >
> >
> >
> > Sent from Mail for Windows 10
> >
> >
>


Re: ***UNCHECKED*** Error while confirming Checkpoint

2019-11-27 Thread Tony Wei
Hi Piotrek,

There is already an issue [1] and a PR for this thread. Should we mark yours
as a duplicate or as a related issue?

Best,
Tony Wei

[1] https://issues.apache.org/jira/browse/FLINK-10377

Piotr Nowojski wrote on Thu, Nov 28, 2019 at 12:17 AM:

> Hi Tony,
>
> Thanks for the explanation. Assuming that’s what’s happening, then I
> agree, this checkState should be removed. I created a ticket for this issue
> https://issues.apache.org/jira/browse/FLINK-14979
>
> Piotrek
>
> On 27 Nov 2019, at 16:28, Tony Wei  wrote:
>
> Hi Piotrek,
>
> The case here was that the first snapshot is a savepoint. I know that if
> the following checkpoint succeeded before the previous one, the previous
> one will be subsumed by JobManager. However, if that previous one is a
> savepoint, it won't be subsumed. That leads to the case that Chesnay said.
> The following checkpoint succeeded before the previous savepoint, handling
> both of their pending transaction, but savepoint still succeeded and sent
> the notification to each TaskManager. That led to this exception. Could you
> double check if this is the case? Thank you.
>
> Best,
> Tony Wei
>
> Piotr Nowojski wrote on Wed, Nov 27, 2019 at 8:50 PM:
>
>> Hi,
>>
>> Maybe Chesnay you are right, but I’m not sure. TwoPhaseCommitSink was
>> based on Pravega’s sink for Flink, which was implemented by Stephan, and it
>> has the same logic [1]. If I remember the discussions with Stephan/Till,
>> the way Flink is using Akka probably guarantees that messages will always
>> be delivered except in case of some failure, so `notifyCheckpointComplete`
>> could probably be missed only if a failure happens between the snapshot and
>> the arrival of the notification. Receiving the same notification twice
>> should be impossible (based on the knowledge passed to me from Till/Stephan).
>>
>> However, if that’s possible, then the code should be adjusted
>> accordingly. On the other hand, maybe there is no harm in relaxing the
>> contract? Even if we miss this notification (because of some re-ordering?),
>> next one will subsume the missed one and commit everything.
>>
>> Piotrek
>>
>> [1]
>> https://github.com/pravega/flink-connectors/blob/master/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java#L567
>>
>> On 27 Nov 2019, at 13:02, Chesnay Schepler  wrote:
>>
>> This looks to me like the TwoPhaseCommitSinkFunction is a bit too strict.
>> The notification for complete checkpoints is not reliable; it may be late,
>> not come at all, possibly even in different order than expected.
>>
>> As such, if you have a simple case of snapshot -> snapshot -> notify ->
>> notify, the sink will always fail with an exception.
>>
>> What it should do imo is either a) don't check that there is a pending
>> transaction or b) track the highest checkpoint id received and optionally
>> don't fail if the notification is for an older CP.
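>>
>> A rough sketch of option b) (illustrative only, not an actual Flink patch;
>> LOG and commitPendingTransactionsUpTo() are hypothetical helpers):
>>
>> private long highestCompletedCheckpoint = Long.MIN_VALUE;
>>
>> @Override
>> public void notifyCheckpointComplete(long checkpointId) throws Exception {
>>     if (checkpointId <= highestCompletedCheckpoint) {
>>         // late or re-ordered notification: already subsumed by a newer one
>>         LOG.debug("Ignoring notification for old checkpoint {}", checkpointId);
>>         return;
>>     }
>>     highestCompletedCheckpoint = checkpointId;
>>     commitPendingTransactionsUpTo(checkpointId);
>> }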
>>
>> @piotr WDYT?
>>
>> On 27/11/2019 08:59, Tony Wei wrote:
>>
>> Hi,
>>
>> As a follow-up, it seems that a savepoint can't be subsumed, so its
>> notification will still be sent to each TM.
>> Is this a bug that need to be fixed in TwoPhaseCommitSinkFunction?
>>
>> Best,
>> Tony Wei
>>
>> Tony Wei wrote on Wed, Nov 27, 2019 at 3:43 PM:
>>
>>> Hi,
>>>
>>> I want to raise this question again, since I have had this exception on
>>> my production job.
>>>
>>> The exception is as follows
>>>
>>> 2019-11-27 14:47:29
>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205)
>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.IllegalStateException: checkpoint completed, but no transaction pending
>>>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:267)
>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:822)
>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200)
>>>     ... 5 more
>>>
>>>
>>> And these are the checkpoint / savepoint before the job failed.
>>> 
>>>
>>> It seems that checkpoint #675's notification handled the savepoint #674's
>>> pending transaction holder, but savepoint #674's notification didn't get
>>> subsumed or ignored by the JM.
>>> Therefore, during checkpoint #676, some tasks got the notification
>>> before getting the 

Converting streaming to batch execution

2019-11-27 Thread Nicholas Walton
Hi,

I’ve been working with a pipeline that was initially aimed at processing high 
speed sensor data, but for a proof of concept I’m feeding simulated data from a 
CSV file. Each row of the file is a sample across a number of time series, and 
I’ve been using the streaming environment to process each column of the file in 
parallel. The columns are each around 6 million samples in length. The outputs 
are being sunk into a Derby database table.

However, I’m not getting a very high throughput even running across two 32GB
Dell laptops with the database on a 16GB MacBook. According to the metrics it 
seems I’m only dropping records on to the database at a rate of around 20-30 
per second (I assume per parallel pipe of which I’m running 16 across the two 
laptops). The pipeline runs a couple of windowing operations one of length 10 
and one of length 100 with a small amount of computation, but it does yield a 
considerable amount of output: a 10GB CSV file yields a database of around
100GB+.

I’m thinking that the slow rate is due to using stream processing to process a
batch. So I’ve been looking at the batch support in Flink (1.8), intending 
to move the code over from stream to batch execution. However, I can’t make 
head’n tail of the batch DataSet documentation. For example, in the streaming 
environment I was setting a watermark after I read each line of the file to 
keep the time series in order, and using keyBy to split up the individual time 
series by column number. I can’t find the equivalent operations in the batch 
interface.
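
From the docs I'd guess the DataSet counterparts look roughly like the sketch
below (field positions and the group reducer are placeholders I made up), but
I can't confirm it:

import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple3<Integer, Long, Double>> samples =  // (column, timestamp, value)
    env.readCsvFile("file:///data/samples.csv")
       .types(Integer.class, Long.class, Double.class);

samples.groupBy(0)                      // batch analogue of keyBy(column)
       .sortGroup(1, Order.ASCENDING)   // imposes time order within each series
       .reduceGroup(new MyWindowedComputation());  // placeholder GroupReduceFunction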

Could someone guide me to some relevant online documentation, and before anyone 
says I have chatted with Dr Google for a good while to no satisfactory outcome

TIA

Nick Walton

Re: What happens to a Source's Operator State if it stops being initialized and snapshotted? Accidentally exponential?

2019-11-27 Thread Gyula Fóra
You are right Aaron.

I would say this is by design: Flink doesn't require you to initialize state
in the open method, so it has no safe way to delete the non-referenced state.

What you can do is restore the state and clear it on all operators and not
reference it again. I know this feels like a workaround but I have no
better idea at the moment.
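
A rough sketch of that workaround (the state name is taken from Aaron's log;
the surrounding CheckpointedFunction is assumed):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;

// inside the CheckpointedFunction implementation:
@Override
public void initializeState(FunctionInitializationContext ctx) throws Exception {
    ListState<Long> legacyProgress = ctx.getOperatorStateStore()
        .getUnionListState(new ListStateDescriptor<>(
            "kafka-archive-file-progress", Long.class));
    legacyProgress.clear();  // drop restored entries so they stop re-snapshotting
    // ... initialize only the state the job still uses ...
}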

Cheers,
Gyula

On Wed, Nov 27, 2019 at 6:08 PM Aaron Levin  wrote:

> Hi,
>
> Yes, we're using UNION state. I would assume, though, that if you are
> not reading the UNION state it would either stick around as a
> constant factor in your state size, or get cleared.
>
> Looks like I should try to recreate a small example and submit a bug
> if this is true. Otherwise it's impossible to remove union state from
> your operators.
>
> On Wed, Nov 27, 2019 at 6:50 AM Congxian Qiu 
> wrote:
> >
> > Hi
> >
> > Do you use UNION state in your scenario? When using UNION state, the JM
> > may encounter OOM because each TDD will contain all the state of all
> > subtasks [1].
> >
> > [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-operator-state
> > Best,
> > Congxian
> >
> >
> > Aaron Levin wrote on Wed, Nov 27, 2019 at 3:55 AM:
> >>
> >> Hi,
> >>
> >> Some context: after a refactoring, we were unable to start our jobs.
> >> They started fine and checkpointed fine, but once the job restarted
> >> owing to a transient failure, the application was unable to start. The
> >> Job Manager was OOM'ing (even when I gave them 256GB of ram!). The
> >> `_metadata` file for the checkpoint was 1.3GB (usually 11MB). Inside
> >> the `_metadata` file we saw `- 1402496 offsets:
> >> com.stripe.flink.backfill.kafka-archive-file-progress`. This happened
> >> to be the operator state we were no longer initializing or
> >> snapshotting after the refactoring.
> >>
> >> Before I dig further into this and try to find a smaller reproducible
> >> test case I thought I would ask if someone knows what the expected
> >> behaviour is for the following scenario:
> >>
> >> suppose you have an operator (in this case a Source) which has some
> >> operator ListState. Suppose you run your flink job for some time and
> >> then later refactor your job such that you no longer use that state
> >> (so after the refactoring you're no longer initializing this operator
> >> state in initializeState, nor are you snapshotting the operator state
> >> in snapshotState). If you launch your new code from a recent
> >> savepoint, what do we expect to happen to the state? Do we anticipate
> >> the behaviour I explained above?
> >>
> >> My assumption would be that Flink would not read this state and so it
> >> would be removed from the next checkpoint or savepoint. Alternatively,
> >> I might assume it would not be read but would linger around every
> >> future checkpoint or savepoint. However, it feels like what is
> >> happening is it's not read and then possibly replicated by every
> >> instance of the task every time a checkpoint happens (hence the
> >> accidentally exponential behaviour).
> >>
> >> Thoughts?
> >>
> >> PS - in case someone asks: I was sure that we were calling `.clear()`
> >> appropriately in `snapshotState` (we, uh, already learned that lesson
> >> :D)
> >>
> >> Best,
> >>
> >> Aaron Levin
>


Re: What happens to a Source's Operator State if it stops being initialized and snapshotted? Accidentally exponential?

2019-11-27 Thread Aaron Levin
Hi,

Yes, we're using UNION state. I would assume, though, that if you are
not reading the UNION state it would either stick around as a
constant factor in your state size, or get cleared.

Looks like I should try to recreate a small example and submit a bug
if this is true. Otherwise it's impossible to remove union state from
your operators.

On Wed, Nov 27, 2019 at 6:50 AM Congxian Qiu  wrote:
>
> Hi
>
> Do you use UNION state in your scenario? When using UNION state, the JM may
> encounter OOM because each TDD will contain all the state of all subtasks [1].
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-operator-state
> Best,
> Congxian
>
>
> Aaron Levin wrote on Wed, Nov 27, 2019 at 3:55 AM:
>>
>> Hi,
>>
>> Some context: after a refactoring, we were unable to start our jobs.
>> They started fine and checkpointed fine, but once the job restarted
>> owing to a transient failure, the application was unable to start. The
>> Job Manager was OOM'ing (even when I gave them 256GB of ram!). The
>> `_metadata` file for the checkpoint was 1.3GB (usually 11MB). Inside
>> the `_metadata` file we saw `- 1402496 offsets:
>> com.stripe.flink.backfill.kafka-archive-file-progress`. This happened
>> to be the operator state we were no longer initializing or
>> snapshotting after the refactoring.
>>
>> Before I dig further into this and try to find a smaller reproducible
>> test case I thought I would ask if someone knows what the expected
>> behaviour is for the following scenario:
>>
>> suppose you have an operator (in this case a Source) which has some
>> operator ListState. Suppose you run your flink job for some time and
>> then later refactor your job such that you no longer use that state
>> (so after the refactoring you're no longer initializing this operator
>> state in initializeState, nor are you snapshotting the operator state
>> in snapshotState). If you launch your new code from a recent
>> savepoint, what do we expect to happen to the state? Do we anticipate
>> the behaviour I explained above?
>>
>> My assumption would be that Flink would not read this state and so it
>> would be removed from the next checkpoint or savepoint. Alternatively,
>> I might assume it would not be read but would linger around every
>> future checkpoint or savepoint. However, it feels like what is
>> happening is it's not read and then possibly replicated by every
>> instance of the task every time a checkpoint happens (hence the
>> accidentally exponential behaviour).
>>
>> Thoughts?
>>
>> PS - in case someone asks: I was sure that we were calling `.clear()`
>> appropriately in `snapshotState` (we, uh, already learned that lesson
>> :D)
>>
>> Best,
>>
>> Aaron Levin


Re: Apache Flink - Throttling stream flow

2019-11-27 Thread Rong Rong
Hi Mans,

Is this what you are looking for [1][2]?

--
Rong

[1] https://issues.apache.org/jira/browse/FLINK-11501
[2] https://github.com/apache/flink/pull/7679

On Mon, Nov 25, 2019 at 3:29 AM M Singh  wrote:

> Thanks Ciazhi & Thomas for your responses.
>
> I read the throttling example but want to see whether that works with a
> distributed broker like Kinesis, and how to feed throttling feedback to the
> Kinesis source so that it can vary the rate without interfering with
> watermarks, etc.
>
> Thanks again
>
> Mans
>
>
> On Monday, November 25, 2019, 05:55:21 AM EST, Thomas Julian <
> thomasjul...@zoho.com> wrote:
>
>
> related
>
> https://issues.apache.org/jira/browse/FLINK-13792
>
> Regards,
> Julian.
>
>
> On Mon, 25 Nov 2019 15:25:14 +0530, Caizhi Weng wrote:
>
> Hi,
>
> As far as I know, Flink currently doesn't have a built-in throttling
> function. You can write your own user-defined function to achieve this:
> the function simply emits what it reads in, while limiting the rate at
> which it emits records.
>
> If you're not familiar with user-defined functions, see
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html
>
> Here is a throttling iterator example:
> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/utils/ThrottledIterator.java
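>
> A minimal sketch of such a pass-through throttle (an assumption: Guava is
> on the classpath; the limit applies per parallel subtask):
>
> import com.google.common.util.concurrent.RateLimiter;
> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.apache.flink.configuration.Configuration;
>
> public class ThrottleMap<T> extends RichMapFunction<T, T> {
>     private final double recordsPerSecond;
>     private transient RateLimiter limiter;
>
>     public ThrottleMap(double recordsPerSecond) {
>         this.recordsPerSecond = recordsPerSecond;
>     }
>
>     @Override
>     public void open(Configuration parameters) {
>         limiter = RateLimiter.create(recordsPerSecond);
>     }
>
>     @Override
>     public T map(T value) {
>         limiter.acquire();  // block until a permit is available
>         return value;       // pass the record through unchanged
>     }
> }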
>
> M Singh wrote on Mon, Nov 25, 2019 at 5:50 AM:
>
> Hi:
>
> I have a Flink streaming application that invokes some other web
> services. However, the web services have limited throughput, so I wanted to
> find out if there are any recommendations on how to throttle the Flink
> datastream so that it doesn't overload the downstream services. I am
> using Kinesis as source and sink in my application.
>
> Please let me know if there are any hooks available in Flink, what
> patterns can be used, and what the best practices/pitfalls are for
> using them.
>
> Thanks
>
> Mans
>
>
>
>


Re: ***UNCHECKED*** Error while confirming Checkpoint

2019-11-27 Thread Piotr Nowojski
Hi Tony,

Thanks for the explanation. Assuming that’s what’s happening, then I agree, 
this checkState should be removed. I created a ticket for this issue 
https://issues.apache.org/jira/browse/FLINK-14979 


Piotrek

> On 27 Nov 2019, at 16:28, Tony Wei  wrote:
> 
> Hi Piotrek,
> 
> The case here was that the first snapshot is a savepoint. I know that if the 
> following checkpoint succeeded before the previous one, the previous one will 
> be subsumed by JobManager. However, if that previous one is a savepoint, it 
> won't be subsumed. That leads to the case that Chesnay said. The following 
> checkpoint succeeded before the previous savepoint, handling both of their 
> pending transaction, but savepoint still succeeded and sent the notification 
> to each TaskManager. That led to this exception. Could you double check if 
> this is the case? Thank you. 
> 
> Best,
> Tony Wei
> 
> Piotr Nowojski <pi...@ververica.com> wrote on Wed, Nov 27, 2019 at 8:50 PM:
> Hi,
> 
> Maybe Chesnay you are right, but I’m not sure. TwoPhaseCommitSink was based
> on Pravega’s sink for Flink, which was implemented by Stephan, and it has the
> same logic [1]. If I remember the discussions with Stephan/Till, the way
> Flink is using Akka probably guarantees that messages will always be
> delivered except in case of some failure, so `notifyCheckpointComplete` could
> probably be missed only if a failure happens between the snapshot and the
> arrival of the notification. Receiving the same notification twice should be
> impossible (based on the knowledge passed to me from Till/Stephan).
> 
> However, if that’s possible, then the code should be adjusted 
> accordingly. On the other hand, maybe there is no harm in relaxing the 
> contract? Even if we miss this notification (because of some re-ordering?), 
> next one will subsume the missed one and commit everything. 
> 
> Piotrek
> 
> [1] 
> https://github.com/pravega/flink-connectors/blob/master/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java#L567
>  
> 
> 
>> On 27 Nov 2019, at 13:02, Chesnay Schepler wrote:
>> 
>> This looks to me like the TwoPhaseCommitSinkFunction is a bit too strict. 
>> The notification for complete checkpoints is not reliable; it may be late, 
>> not come at all, possibly even in different order than expected.
>> 
>> As such, if you have a simple case of snapshot -> snapshot -> notify ->
>> notify, the sink will always fail with an exception.
>> 
>> What it should do imo is either a) don't check that there is a pending 
>> transaction or b) track the highest checkpoint id received and optionally 
>> don't fail if the notification is for an older CP.
>> 
>> @piotr WDYT?
>> 
>> On 27/11/2019 08:59, Tony Wei wrote:
>>> Hi, 
>>> 
>>> As a follow-up, it seems that a savepoint can't be subsumed, so its
>>> notification will still be sent to each TM.
>>> Is this a bug that need to be fixed in TwoPhaseCommitSinkFunction?
>>> 
>>> Best,
>>> Tony Wei
>>> 
>>> Tony Wei <tony19920...@gmail.com> wrote on Wed, Nov 27, 2019 at 3:43 PM:
>>> Hi, 
>>> 
>>> I want to raise this question again, since I have had this exception on my 
>>> production job.
>>> 
>>> The exception is as follows
>>>  
>>> 2019-11-27 14:47:29
>>>  
>>> java.lang.RuntimeException: Error while confirming checkpoint
>>> at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205)
>>> at 
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.IllegalStateException: checkpoint completed, but no 
>>> transaction pending
>>> at 
>>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>>> at 
>>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:267)
>>> at 
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:822)
>>> at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200)
>>> ... 5 more
>>> 
>>> And these are the checkpoint / savepoint before the job failed.
>>> 
>>> 
>>> It seems that checkpoint #675's notification handled the savepoint #674's
>>> pending transaction holder, but savepoint #674's notification didn't get
>>> subsumed or ignored by the JM.
>>> Therefore, during checkpoint #676, 

Re: ***UNCHECKED*** Error while confirming Checkpoint

2019-11-27 Thread Tony Wei
Hi Piotrek,

The case here was that the first snapshot is a savepoint. I know that if
the following checkpoint succeeded before the previous one, the previous
one will be subsumed by JobManager. However, if that previous one is a
savepoint, it won't be subsumed. That leads to the case that Chesnay said.
The following checkpoint succeeded before the previous savepoint, handling
both of their pending transaction, but savepoint still succeeded and sent
the notification to each TaskManager. That led to this exception. Could you
double check if this is the case? Thank you.

Best,
Tony Wei

Piotr Nowojski wrote on Wed, Nov 27, 2019 at 8:50 PM:

> Hi,
>
> Maybe Chesnay you are right, but I’m not sure. TwoPhaseCommitSink was
> based on Pravega’s sink for Flink, which was implemented by Stephan, and it
> has the same logic [1]. If I remember the discussions with Stephan/Till,
> the way Flink is using Akka probably guarantees that messages will always
> be delivered except in case of some failure, so `notifyCheckpointComplete`
> could probably be missed only if a failure happens between the snapshot and
> the arrival of the notification. Receiving the same notification twice
> should be impossible (based on the knowledge passed to me from Till/Stephan).
>
> However, if that’s possible, then the code should be adjusted
> accordingly. On the other hand, maybe there is no harm in relaxing the
> contract? Even if we miss this notification (because of some re-ordering?),
> next one will subsume the missed one and commit everything.
>
> Piotrek
>
> [1]
> https://github.com/pravega/flink-connectors/blob/master/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java#L567
>
> On 27 Nov 2019, at 13:02, Chesnay Schepler  wrote:
>
> This looks to me like the TwoPhaseCommitSinkFunction is a bit too strict.
> The notification for complete checkpoints is not reliable; it may be late,
> not come at all, possibly even in different order than expected.
>
> As such, if you have a simple case of snapshot -> snapshot -> notify ->
> notify, the sink will always fail with an exception.
>
> What it should do imo is either a) don't check that there is a pending
> transaction or b) track the highest checkpoint id received and optionally
> don't fail if the notification is for an older CP.
>
> @piotr WDYT?
>
> On 27/11/2019 08:59, Tony Wei wrote:
>
> Hi,
>
> As a follow-up, it seems that a savepoint can't be subsumed, so its
> notification will still be sent to each TM.
> Is this a bug that need to be fixed in TwoPhaseCommitSinkFunction?
>
> Best,
> Tony Wei
>
> Tony Wei wrote on Wed, Nov 27, 2019 at 3:43 PM:
>
>> Hi,
>>
>> I want to raise this question again, since I have had this exception on
>> my production job.
>>
>> The exception is as follows
>>
>>
>> 2019-11-27 14:47:29
>>
>> java.lang.RuntimeException: Error while confirming checkpoint
>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205)
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.IllegalStateException: checkpoint completed, but no transaction pending
>>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:267)
>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:822)
>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200)
>>     ... 5 more
>>
>>
>> And these are the checkpoint / savepoint before the job failed.
>> 
>>
>> It seems that checkpoint #675's notification handled the savepoint #674's
>> pending transaction holder, but savepoint #674's notification didn't get
>> subsumed or ignored by the JM.
>> Therefore, during checkpoint #676, some tasks got the notification before
>> getting the checkpoint barrier, which led to this exception, because there
>> was no pending transaction in the queue.
>>
>> Does anyone know the details of the subsumed-notification mechanism and
>> how the checkpoint coordinator handles this situation? Please correct me if
>> I'm wrong. Thanks.
>>
>> Best,
>> Tony Wei
>>
>> Stefan Richter wrote on Mon, Oct 8, 2018 at 5:03 PM:
>>
>>> Hi Pedro,
>>>
>>> unfortunately the interesting parts are all removed from the log, we
>>> already know about the exception itself. In particular, what I would like
>>> to see is what checkpoints have been triggered and completed before the
>>> exception happens.
>>>
>>> 

What happens to the channels when there is backpressure?

2019-11-27 Thread Felipe Gutierrez
Hi community,

I have a question about backpressure. Suppose a scenario that I have a map
and a reducer, and the reducer is back pressuring the map operator. I know
that the reducer is processing tuples at a lower rate than it is receiving.

However, can I say that at least one channel between the map and the
reducer is totally using its available bandwidth?

My guess is it is not, at least in the beginning. But as time goes on, the
tuples will queue up in the reducer's network buffers, and then the bandwidth
will be at 100% usage. Am I right?

Thanks,
Felipe


[PROPOSAL/SURVEY] Enable background cleanup for state with TTL by default

2019-11-27 Thread Andrey Zagrebin
Hi all,

We were thinking about enabling background cleanup for the state with TTL
by default:
StateTtlConfig#Builder#cleanupInBackground()

Previously, we did not have it in the first implementation of TTL, if you
remember, so technically we were a bit conservative in not enabling it by
default at once.
In general, most TTL use cases should enable background cleanup one way or
another, because it is usually needed. That means it makes sense to enable it
by default so that users do not have to care about it.
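
For reference, a minimal sketch of the call this change would make the
default behavior (state name and TTL value are just examples):

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.minutes(15))
    .cleanupInBackground()  // today explicit; proposed to become the default
    .build();

ValueStateDescriptor<Long> desc =
    new ValueStateDescriptor<>("last-seen", Long.class);
desc.enableTimeToLive(ttlConfig);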

I am starting this thread to collect feedback on this change, which we want
to include in the 1.10 release. Basically, the question is whether there have
been any problems with the background cleanup that would justify postponing
this change.

JIRA issue and PR:
https://issues.apache.org/jira/browse/FLINK-14898

I will let the thread hang for some time; if nothing speaks against it, we
will merge it next week and include it in 1.10.

Thanks,
Andrey


AW: ArrayIndexOutOfBoundException on checkpoint creation

2019-11-27 Thread theo.diefent...@scoop-software.de
Sorry, I forgot to mention the environment.
We use Flink 1.9.1 on a Cloudera CDH 6.3.1 cluster (with Hadoop 3.0.0 but
using flink-shaded Hadoop 2.8.3-7. Might this be a problem? As the error seems
to arise from Kryo, I doubt it).
Our Flink is configured with defaults. Our job uses FsStateBackend and
exactly-once processing with a Kafka source and sink.
Best regards,
Theo
-------- Original Message --------
Subject: ArrayIndexOutOfBoundException on checkpoint creation
From: Theo Diefenthal
To: user
Cc:


Hi, 

We have a pipeline with a custom ProcessFunction and state (see [1],
implemented as suggested by Fabian with a ValueState and ValueState>).
The behavior of that function works fine in our unit tests and with low load
in our test environment (100,000 records per minute). In the production
environment, we observe reproducible crashes like the one attached.
Any idea what could cause this out-of-bounds error? Every time we read the
state and modified it, we are certain that .update() was called:

2019-11-26T11:26:55+01:00 host19 java.lang.Exception: Could not materialize 
checkpoint 7 for operator our_operator) (4/8). 
2019-11-26T11:26:55+01:00 host19 at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
 
2019-11-26T11:26:55+01:00 host19 at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
 
2019-11-26T11:26:55+01:00 host19 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
2019-11-26T11:26:55+01:00 host19 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
2019-11-26T11:26:55+01:00 host19 at java.lang.Thread.run(Thread.java:745) 
2019-11-26T11:26:55+01:00 host19 Caused by: 
java.util.concurrent.ExecutionException: 
java.lang.ArrayIndexOutOfBoundsException: 67108864 
2019-11-26T11:26:55+01:00 host19 at 
java.util.concurrent.FutureTask.report(FutureTask.java:122) 
2019-11-26T11:26:55+01:00 host19 at 
java.util.concurrent.FutureTask.get(FutureTask.java:192) 
2019-11-26T11:26:55+01:00 host19 at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
 
2019-11-26T11:26:55+01:00 host19 at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
 
2019-11-26T11:26:55+01:00 host19 at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
 
2019-11-26T11:26:55+01:00 host19 ... 3 more 
2019-11-26T11:26:55+01:00 host19 Caused by: 
java.lang.ArrayIndexOutOfBoundsException: 67108864 
2019-11-26T11:26:55+01:00 host19 at 
com.esotericsoftware.kryo.util.IdentityObjectIntMap.clear(IdentityObjectIntMap.java:364)
 
2019-11-26T11:26:55+01:00 host19 at 
com.esotericsoftware.kryo.util.MapReferenceResolver.reset(MapReferenceResolver.java:47)
 
2019-11-26T11:26:55+01:00 host19 at 
com.esotericsoftware.kryo.Kryo.reset(Kryo.java:836) 
2019-11-26T11:26:55+01:00 host19 at 
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:601) 
2019-11-26T11:26:55+01:00 host19 at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:305)
 
2019-11-26T11:26:55+01:00 host19 at 
org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:116)
 
2019-11-26T11:26:55+01:00 host19 at 
org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
 
2019-11-26T11:26:55+01:00 host19 at 
org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
 
2019-11-26T11:26:55+01:00 host19 at 
org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:191)
 
2019-11-26T11:26:55+01:00 host19 at 
org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
 
2019-11-26T11:26:55+01:00 host19 at 
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
 
2019-11-26T11:26:55+01:00 host19 at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) 
2019-11-26T11:26:55+01:00 host19 at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
 
2019-11-26T11:26:55+01:00 host19 ... 5 more 
2019-11-26T11:26:55+01:00 host18 WARN  org.apache.hadoop.hdfs.DataStreamer  
 - DataStreamer Exception 
2019-11-26T11:26:55+01:00 host18 java.io.FileNotFoundException: File does not 
exist: 
/.../STATE/CHECKPOINTS/0a2e111b3a800aae0d3b49f33e0db6f3/chk-7/3da2a0a4-f5ef-4e8c-bc1a-9fe892cb0b18
 (inode 577546140) Holder DFSClient_NONMAPREDUCE_-1714419242_95 does not have 
any open files. 
2019-11-26T11:26:55+01:00 host18 at 

Re: ***UNCHECKED*** Error while confirming Checkpoint

2019-11-27 Thread Piotr Nowojski
Hi,

Maybe Chesnay you are right, but I’m not sure. TwoPhaseCommitSink was based on
Pravega’s sink for Flink, which was implemented by Stephan, and it has the same
logic [1]. If I remember the discussions with Stephan/Till, the way Flink is
using Akka probably guarantees that messages will always be delivered except in
case of some failure, so `notifyCheckpointComplete` could probably be missed
only if a failure happens between the snapshot and the arrival of the
notification. Receiving the same notification twice should be impossible (based
on the knowledge passed to me from Till/Stephan).

However, if that’s possible, then the code should be adjusted 
accordingly. On the other hand, maybe there is no harm in relaxing the 
contract? Even if we miss this notification (because of some re-ordering?), 
next one will subsume the missed one and commit everything. 

Piotrek

[1] 
https://github.com/pravega/flink-connectors/blob/master/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java#L567
 


> On 27 Nov 2019, at 13:02, Chesnay Schepler  wrote:
> 
> This looks to me like the TwoPhaseCommitSinkFunction is a bit too strict. The 
> notification for complete checkpoints is not reliable; it may be late, not 
> come at all, possibly even in different order than expected.
> 
> As such, if you have a simple case of snapshot -> snapshot -> notify ->
> notify, the sink will always fail with an exception.
> 
> What it should do imo is either a) don't check that there is a pending 
> transaction or b) track the highest checkpoint id received and optionally 
> don't fail if the notification is for an older CP.
> 
> @piotr WDYT?
> 
> On 27/11/2019 08:59, Tony Wei wrote:
>> Hi, 
>> 
>> As a follow-up, it seems that a savepoint can't be subsumed, so its
>> notification will still be sent to each TM.
>> Is this a bug that need to be fixed in TwoPhaseCommitSinkFunction?
>> 
>> Best,
>> Tony Wei
>> 
>> Tony Wei <tony19920...@gmail.com> wrote on Wed, Nov 27, 2019 at 3:43 PM:
>> Hi, 
>> 
>> I want to raise this question again, since I have had this exception on my 
>> production job.
>> 
>> The exception is as follows
>>  
>> 2019-11-27 14:47:29
>>  
>> java.lang.RuntimeException: Error while confirming checkpoint
>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205)
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.IllegalStateException: checkpoint completed, but no 
>> transaction pending
>>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:267)
>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:822)
>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200)
>>     ... 5 more
>> 
>> And these are the checkpoint / savepoint before the job failed.
>> 
>> 
>> It seems that checkpoint #675's notification handled savepoint #674's 
>> pending transaction holder, but savepoint #674's notification wasn't 
>> subsumed or ignored by the JM.
>> Therefore, during checkpoint #676, some tasks got the notification before 
>> getting the checkpoint barrier, which led to this exception, because 
>> there was no pending transaction in the queue.
>> 
>> Does anyone know the details of the subsumed-notifications mechanism and how 
>> the checkpoint coordinator handles this situation? Please correct me if I'm 
>> wrong. Thanks.
>> 
>> Best,
>> Tony Wei
>> 
>> Stefan Richter on Mon, Oct 8, 2018 at 5:03 PM wrote:
>> Hi Pedro,
>> 
>> unfortunately the interesting parts have all been removed from the log; we already 
>> know about the exception itself. In particular, what I would like to see is 
>> which checkpoints have been triggered and completed before the exception 
>> happens.
>> 
>> Best,
>> Stefan
>> 
>> > On 08.10.2018 at 10:23, PedroMrChaves wrote:
>> > 
>> > Hello,
>> > 
>> > Find attached the jobmanager.log. I've omitted the log lines from other
>> > runs, only left the job manager info and the run with the error. 
>> > 
>> > jobmanager.log
>> > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t612/jobmanager.log>
>> > 

Re: flink 1.9.1 state keeps growing

2019-11-27 Thread Congxian Qiu
Hi

Are you using TTL state? How are you using it, and what is the expected behavior?
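
For reference, a minimal sketch of declaring state with TTL (the state name and
TTL value are made up for the example). Note that cleanupFullSnapshot() only
prunes expired entries when a full snapshot is taken, so expired state can
linger in the running state backend until then:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

public class TtlStateExample {
    public static void main(String[] args) {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.hours(1))  // time-to-live after the last write
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .cleanupFullSnapshot()      // drop expired entries on full snapshots
                .build();

        ValueStateDescriptor<String> descriptor =
                new ValueStateDescriptor<>("my-ttl-state", String.class);
        descriptor.enableTimeToLive(ttlConfig);
    }
}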

Best,
Congxian


谷歌-akulaku on Wed, Nov 27, 2019 at 5:54 PM wrote:

> Hello, I use FlinkKafkaConsumer011 to subscribe to a topic list. After setting an
> expiration time, the expired state is not cleaned up; is there any way to solve this?
> Also, with a two-stream union the expired state is not cleaned up either, while in
> the single-stream, single-topic case it is cleaned up. Is this a bug?
>
>
>
> Sent from the Windows 10 Mail app
>
>


Re: checkpoint files keep growing

2019-11-27 Thread Congxian Qiu
Hi

Your image didn't come through. If the number of files keeps growing, check how many
checkpoints the job is configured to retain (that is, the number of retained
checkpoints); in theory, expired checkpoints are deleted.
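
The relevant setting is state.checkpoints.num-retained in flink-conf.yaml; the
value below is only an example:

# flink-conf.yaml: how many completed checkpoints to retain (example value)
state.checkpoints.num-retained: 3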


Best,
Congxian


sun <1392427...@qq.com> on Wed, Nov 27, 2019 at 7:36 PM wrote:

> Hello:
>
> My flink configuration is: [image]
>
> My checkpoint files keep growing.
>
> How can I solve this problem? Many thanks.
>


Always in the "in progress" state

2019-11-27 Thread sun
Why is it always in the "in progress" state?

Re: Apache Flink - Troubleshooting exception while starting the job from savepoint

2019-11-27 Thread Congxian Qiu
Hi,

As doc [1] says, we should assign uids to all stateful operators. If
you do not set a uid for an operator, Flink will generate an operatorId for
it; AFAIK, the operatorId will not change as long as the job DAG does not change.

You can skip the state of operators that do not exist in the new job (please refer
to doc [2]), and these operators will start from scratch.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/savepoints.html#should-i-assign-ids-to-all-operators-in-my-job
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/savepoints.html#what-happens-if-i-delete-an-operator-that-has-state-from-my-job
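
For illustration, a minimal sketch of assigning explicit uids (the operators
and names here are made up); state in a savepoint is matched by these uids on
restore:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c")
                .map(String::toUpperCase).uid("uppercase-map")  // stable across job versions
                .keyBy(s -> s)
                .reduce((x, y) -> x + y).uid("concat-reduce")   // savepoint state is matched by this uid
                .print();

        env.execute("uid example");
    }
}

Combined with bin/flink run -s <savepointPath> --allowNonRestoredState, state
whose uid no longer exists in the new job can then be skipped on restore.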
Best,
Congxian


M Singh on Tue, Nov 26, 2019 at 10:49 AM wrote:

> Hi Kostas/Congxian:
>
> Thanks for your response.
>
> Based on your feedback, I found that I had missed adding a uid to one of the
> stateful operators, and correcting that resolved the issue.  I still have
> stateless operators for which no uid is specified in the application.
>
> So, I thought that adding a uid was optional, and that if we don't add it and run
> another instance of the same app from a savepoint or checkpoint, it will
> pick up the state based on the generated uid.  Is that a correct
> understanding?
>
> Also, if some stateful operators have uids but some don't, will it pick up
> the state both for the operators with uids and for the non-uid ones (using the
> generated uids), provided the application has not changed?
>
> Thanks again for your response.
>
> Mans
>
> On Monday, November 25, 2019, 09:24:42 AM EST, Congxian Qiu <
> qcx978132...@gmail.com> wrote:
>
>
> Hi
>
> The problem is that the specified uid did not exist in the new job.
> 1. As far as I know, the answer is yes. There are some operators that have
> their own state (such as window state); could you please share the minimal
> code of your job?
> 2. A *truly* stateless operator does not need to have a uid, but for the reason
> described above, assigning uids to all operators is recommended.
> 3. If the previous job is still there, I'm not sure we can find the
> operatorId in the UI easily; maybe other people can answer the question.
> 4. For this purpose, maybe you can debug the savepoint metadata with the new
> job locally; maybe CheckpointMetadataLoadingTest can help.
> 5. For this problem, 1.9 is the same as 1.6.
>
>
> Best,
> Congxian
>
>
> Kostas Kloudas on Mon, Nov 25, 2019 at 9:42 PM wrote:
>
> As a side note, I am assuming that you are using the same Flink Job
> before and after the savepoint and the same Flink version.
> Am I correct?
>
> Cheers,
> Kostas
>
> On Mon, Nov 25, 2019 at 2:40 PM Kostas Kloudas  wrote:
> >
> > Hi Singh,
> >
> > This behaviour is strange.
> > One thing I can recommend, to see if the two jobs are identical, is to
> > also launch the second job without a savepoint,
> > just starting from scratch, and simply look at the web interface to see
> > if everything is there.
> >
> > Also could you please provide some code from your job, just to see if
> > there is anything problematic with the application code?
> > Normally there should be no problem with not providing UIDs for some
> > stateless operators.
> >
> > Cheers,
> > Kostas
> >
> > On Sat, Nov 23, 2019 at 11:16 AM M Singh  wrote:
> > >
> > >
> > > Hey Folks:
> > >
> > > Please let me know how to resolve this issue since using
> --allowNonRestoredState without knowing if any state will be lost seems
> risky.
> > >
> > > Thanks
> > > On Friday, November 22, 2019, 02:55:09 PM EST, M Singh <
> mans2si...@yahoo.com> wrote:
> > >
> > >
> > > Hi:
> > >
> > > I have a flink application in which some of the operators have uid and
> name and some stateless ones don't.
> > >
> > > I've taken a savepoint and tried to start another instance of the
> application from that savepoint. I get the following exception, which
> indicates that the operator is not available to the new program, even though
> the second job is the same as the first, just running from the first job's
> savepoint.
> > >
> > > Caused by: java.lang.IllegalStateException: Failed to rollback to
> checkpoint/savepoint
> s3://mybucket/state/savePoint/mysavepointfolder/66s4c6402d7532801287290436fa9fadd/savepoint-664c64-fa235d26d379.
> Cannot map checkpoint/savepoint state for operator
> d1a56c5a9ce5e3f1b03e01cac458bb4f to the new program, because the operator
> is not available in the new program. If you want to allow to skip this, you
> can set the --allowNonRestoredState option on the CLI.
> > >
> > > at
> org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:205)
> > >
> > > at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1102)
> > >
> > > at
> org.apache.flink.runtime.jobmaster.JobMaster.tryRestoreExecutionGraphFromSavepoint(JobMaster.java:1219)
> > >
> > > at
> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1143)
> > >
> > > at
> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:294)
> > >
> > > at
> 

Re: ***UNCHECKED*** Error while confirming Checkpoint

2019-11-27 Thread Chesnay Schepler
This looks to me like the TwoPhaseCommitSinkFunction is a bit too 
strict. The notification for complete checkpoints is not reliable; it 
may be late, not come at all, and may possibly even arrive in a different 
order than expected.


As such, if you have a simple case of snapshot -> snapshot -> notify -> 
notify, the sink will always fail with an exception.


What it should do imo is either a) don't check that there is a pending 
transaction or b) track the highest checkpoint id received and 
optionally don't fail if the notification is for an older CP.


@piotr WDYT?

On 27/11/2019 08:59, Tony Wei wrote:

Hi,

As the follow up, it seems that savepoints can't be subsumed, so that 
their notifications could still be sent to each TM.

Is this a bug that needs to be fixed in TwoPhaseCommitSinkFunction?

Best,
Tony Wei

Tony Wei <tony19920...@gmail.com> on Wed, Nov 27, 2019 at 3:43 PM wrote:


Hi,

I want to raise this question again, since I have had this
exception on my production job.

The exception is as follows

2019-11-27 14:47:29

java.lang.RuntimeException: Error while confirming checkpoint
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: checkpoint completed, but no
transaction pending
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:267)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:822)
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200)
    ... 5 more


And these are the checkpoint / savepoint before the job failed.
[image: checkoint.png]

It seems that checkpoint #675's notification handled the
savepoint #674's pending transaction holder, but savepoint #674's
notification wasn't subsumed or ignored by the JM.
Therefore, during checkpoint #676, some tasks got the notification
before getting the checkpoint barrier, which led to this exception,
because there was no pending transaction in the queue.

Does anyone know the details of the subsumed-notifications
mechanism and how the checkpoint coordinator handles this situation?
Please correct me if I'm wrong. Thanks.

Best,
Tony Wei

Stefan Richter <s.rich...@data-artisans.com> on Mon, Oct 8, 2018 at 5:03 PM wrote:

Hi Pedro,

unfortunately the interesting parts have all been removed from the
log; we already know about the exception itself. In
particular, what I would like to see is which checkpoints have
been triggered and completed before the exception happens.

Best,
Stefan

> On 08.10.2018 at 10:23, PedroMrChaves
<pedro.mr.cha...@gmail.com> wrote:
>
> Hello,
>
> Find attached the jobmanager.log. I've omitted the log lines
from other
> runs, only left the job manager info and the run with the
error.
>
> jobmanager.log
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t612/jobmanager.log>
>
>
>
> Thanks again for your help.
>
> Regards,
> Pedro.
>
>
>
> -
> Best Regards,
> Pedro Chaves
> --
> Sent from:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: How to recover state from savepoint on embedded mode?

2019-11-27 Thread Congxian Qiu
Hi,

You can recover from a checkpoint/savepoint as long as the JM can read from the
given path, no matter which mode the job is running in.
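
For example, resuming from a savepoint on a standalone cluster looks like this
(the paths are examples):

bin/flink run -s hdfs:///flink/savepoints/savepoint-abc123 /path/to/my-job.jar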

Best,
Congxian


Reo Lei on Tue, Nov 26, 2019 at 12:18 PM wrote:

>
>
> -- Forwarded message -
> From: Reo Lei 
> Date: Tue, Nov 26, 2019 at 9:53 AM
> Subject: Re: How to recover state from savepoint on embedded mode?
> To: Yun Tang 
>
>
> Hi Yun,
> Thanks for your reply. What I mean by "embedded mode" is the whole flink
> cluster and job, including the jobmanager, taskmanager and the job application
> itself, running within a local JVM process, i.e. using the "
> LocalStreamEnvironment" within the job. And the start command looks like
> this: "java -Xmx512M -XX:... -Dlog.file=... -cp flink-job.jar
> com.a.b.c.MyJob > /dev/null &"
>
> The reason I am not using standalone mode to run the job is that the runtime
> env doesn't have zookeeper and will not have zookeeper installed. So I need
> to depend
> on the embedded mode to run my job.
>
> BR,
> Reo
>
Yun Tang on Tue, Nov 26, 2019 at 2:38 AM wrote:
>
>> What does the embedded mode mean here? If you refer to SQL embedded mode,
>> you cannot resume from a savepoint now; if you refer to a local standalone
>> cluster, you can use `bin/flink run -s` to resume on a local cluster.
>>
>>
>>
>> Best
>>
>> Yun Tang
>>
>>
>>
>> From: Reo Lei 
>> Date: Tuesday, November 26, 2019 at 12:37 AM
>> To: "user@flink.apache.org" 
>> Subject: How to recover state from savepoint on embedded mode?
>>
>>
>>
>> Hi,
>>
>> I have a job that needs to run in embedded mode, but it needs to init some rule
>> data from a database before starting. So I used the State Processor API to
>> construct my state data and saved it to the local disk. When I wanted to use
>> this savepoint to recover my job, I found that resuming a job from a savepoint
>> requires the command `bin/flink run -s :savepointPath [:runArgs]`
>> to submit the job to a flink cluster. That means the job runs in remote
>> mode, not embedded mode.
>>
>>
>>
>> And I was wondering why I can't resume a job from a savepoint in
>> embedded mode. If that is possible, what should I do?
>>
>> BTW, if we cannot resume a job from a savepoint in embedded mode, how
>> can we know the savepoint was constructed correctly in the development
>> environment and use IDEA to debug it?
>>
>>
>>
>> BR,
>>
>> Reo
>>
>>
>>
>


Re: ArrayIndexOutOfBoundException on checkpoint creation

2019-11-27 Thread Congxian Qiu
Hi

Which version are you using now? (If it's an old version, could you please
check whether this exception is still there on Flink 1.9?) On the other hand, did
you try the RocksDBStateBackend for this?
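
For reference, a minimal sketch of switching to the RocksDBStateBackend (the
checkpoint path is an example; it requires the flink-statebackend-rocksdb
dependency):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // keep state off-heap in RocksDB; checkpoints go to the given path (example path)
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
        env.enableCheckpointing(60_000); // checkpoint every 60 seconds
        // ... build and execute the job as usual
    }
}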

Best,
Congxian


Theo Diefenthal on Tue, Nov 26, 2019 at 6:52 PM wrote:

> Hi,
>
> We have a pipeline with a custom ProcessFunction and state (see [1],
> implemented as suggested by Fabian with a ValueState and
> ValueState>)
> The behavior of that function works fine in our unit tests and with low
> load in our test environment (100,000 records per minute). In the
> production environment, we observe reproducible crashes like the attached
> one.
> Any idea what could be causing this out-of-bounds error? Every time we read
> the state and modify it, we are certain that an .update() was called:
>
> 2019-11-26T11:26:55+01:00 host19 java.lang.Exception: Could not materialize 
> checkpoint 7 for operator our_operator) (4/8).
> 2019-11-26T11:26:55+01:00 host19 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
> 2019-11-26T11:26:55+01:00 host19 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
> 2019-11-26T11:26:55+01:00 host19 at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 2019-11-26T11:26:55+01:00 host19 at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 2019-11-26T11:26:55+01:00 host19 at java.lang.Thread.run(Thread.java:745)
> 2019-11-26T11:26:55+01:00 host19 Caused by: 
> java.util.concurrent.ExecutionException: 
> java.lang.ArrayIndexOutOfBoundsException: 67108864
> 2019-11-26T11:26:55+01:00 host19 at 
> java.util.concurrent.FutureTask.report(FutureTask.java:122)
> 2019-11-26T11:26:55+01:00 host19 at 
> java.util.concurrent.FutureTask.get(FutureTask.java:192)
> 2019-11-26T11:26:55+01:00 host19 at 
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
> 2019-11-26T11:26:55+01:00 host19 at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:47)
> 2019-11-26T11:26:55+01:00 host19 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
> 2019-11-26T11:26:55+01:00 host19 ... 3 more
> 2019-11-26T11:26:55+01:00 host19 Caused by: 
> java.lang.ArrayIndexOutOfBoundsException: 67108864
> 2019-11-26T11:26:55+01:00 host19 at 
> com.esotericsoftware.kryo.util.IdentityObjectIntMap.clear(IdentityObjectIntMap.java:364)
> 2019-11-26T11:26:55+01:00 host19 at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.reset(MapReferenceResolver.java:47)
> 2019-11-26T11:26:55+01:00 host19 at 
> com.esotericsoftware.kryo.Kryo.reset(Kryo.java:836)
> 2019-11-26T11:26:55+01:00 host19 at 
> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:601)
> 2019-11-26T11:26:55+01:00 host19 at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:305)
> 2019-11-26T11:26:55+01:00 host19 at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:116)
> 2019-11-26T11:26:55+01:00 host19 at 
> org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
> 2019-11-26T11:26:55+01:00 host19 at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
> 2019-11-26T11:26:55+01:00 host19 at 
> org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:191)
> 2019-11-26T11:26:55+01:00 host19 at 
> org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
> 2019-11-26T11:26:55+01:00 host19 at 
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
> 2019-11-26T11:26:55+01:00 host19 at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 2019-11-26T11:26:55+01:00 host19 at 
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
> 2019-11-26T11:26:55+01:00 host19 ... 5 more
> 2019-11-26T11:26:55+01:00 host18 WARN  org.apache.hadoop.hdfs.DataStreamer
>- DataStreamer Exception
> 2019-11-26T11:26:55+01:00 host18 java.io.FileNotFoundException: File does not 
> exist: 
> /.../STATE/CHECKPOINTS/0a2e111b3a800aae0d3b49f33e0db6f3/chk-7/3da2a0a4-f5ef-4e8c-bc1a-9fe892cb0b18
>  (inode 577546140) Holder DFSClient_NONMAPREDUCE_-1714419242_95 does not have 
> any open files.
> 2019-11-26T11:26:55+01:00 host18 at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2782)
> 2019-11-26T11:26:55+01:00 host18 at 
> 

Re: What happens to a Source's Operator State if it stops being initialized and snapshotted? Accidentally exponential?

2019-11-27 Thread Congxian Qiu
Hi

Do you use UNION state in your scenario? When using UNION state, the JM
may encounter OOM because each TDD (TaskDeploymentDescriptor) will contain
the state of all subtasks [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-operator-state
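
For context, a minimal sketch of how union-list operator state is typically
declared (the names are made up); on restore, every subtask receives the union
of all subtasks' entries, which is what can blow up the TDDs:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class UnionStateSketch implements CheckpointedFunction {

    private transient ListState<Long> offsets;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // union redistribution: on restore, each subtask gets ALL entries
        offsets = context.getOperatorStateStore().getUnionListState(
                new ListStateDescriptor<>("offsets", Long.class));
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        offsets.clear();
        offsets.add(42L); // example entry; each subtask snapshots only its own share
    }
}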
Best,
Congxian


Aaron Levin on Wed, Nov 27, 2019 at 3:55 AM wrote:

> Hi,
>
> Some context: after a refactoring, we were unable to start our jobs.
> They started fine and checkpointed fine, but once the job restarted
> owing to a transient failure, the application was unable to start. The
> Job Manager was OOM'ing (even when I gave them 256GB of ram!). The
> `_metadata` file for the checkpoint was 1.3GB (usually 11MB). Inside
> the `_metadata` file we saw `- 1402496 offsets:
> com.stripe.flink.backfill.kafka-archive-file-progress`. This happened
> to be the operator state we were no longer initializing or
> snapshotting after the refactoring.
>
> Before I dig further into this and try to find a smaller reproducible
> test case I thought I would ask if someone knows what the expected
> behaviour is for the following scenario:
>
> suppose you have an operator (in this case a Source) which has some
> operator ListState. Suppose you run your flink job for some time and
> then later refactor your job such that you no longer use that state
> (so after the refactoring you're no longer initializing this operator
> state in initializeState, nor are you snapshotting the operator state
> in snapshotState). If you launch your new code from a recent
> savepoint, what do we expect to happen to the state? Do we anticipate
> the behaviour I explained above?
>
> My assumption would be that Flink would not read this state and so it
> would be removed from the next checkpoint or savepoint. Alternatively,
> I might assume it would not be read but would linger around every
> future checkpoint or savepoint. However, it feels like what is
> happening is it's not read and then possibly replicated by every
> instance of the task every time a checkpoint happens (hence the
> accidentally exponential behaviour).
>
> Thoughts?
>
> PS - in case someone asks: I was sure that we were calling `.clear()`
> appropriately in `snapshotState` (we, uh, already learned that lesson
> :D)
>
> Best,
>
> Aaron Levin
>


checkpoint files keep growing

2019-11-27 Thread sun
Hello:

My flink configuration is: [image]

My checkpoint files keep growing.

How can I solve this problem? Many thanks.

Re: Flink behavior as a slow consumer - out of Heap MEM

2019-11-27 Thread Robert Metzger
Hi Hanan,

Flink does handle backpressure gracefully. I guess your custom ZMQ source
is receiving events in a separate thread?
In a Flink source, the SourceContext.collect() method will not return if
the downstream operators are not able to process incoming data fast enough.

If my assumptions are right, I would suggest pulling data from ZMQ in
small batches, forwarding them to .collect(), and pausing the fetch when
collect() is blocked.
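
For illustration, a hedged sketch of such a source using the JeroMQ client
(the endpoint, socket type and one-record "batch" are assumptions, not taken
from this thread):

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

public class BackpressuredZmqSource extends RichSourceFunction<byte[]> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<byte[]> ctx) {
        try (ZContext zmq = new ZContext()) {
            ZMQ.Socket socket = zmq.createSocket(SocketType.PULL);
            socket.connect("tcp://localhost:5555"); // example endpoint
            while (running) {
                byte[] record = socket.recv(); // fetch one record at a time
                if (record != null) {
                    // blocks while downstream is busy -> natural backpressure
                    ctx.collect(record);
                }
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

Because the fetch and collect() run on the same thread, a slow downstream
pauses the ZMQ reads instead of filling the heap.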


On Tue, Nov 26, 2019 at 6:59 AM vino yang  wrote:

> Hi Hanan,
>
> Sometimes, the behavior depends on your implementation.
>
> Since it's not a built-in connector, it would be better to share your
> customized source with the community
> so that the community would be better to help you figure out where is the
> problem.
>
> WDYT?
>
> Best,
> Vino
>
Hanan Yehudai on Tue, Nov 26, 2019 at 12:27 PM wrote:
>
>> Hi, I am trying to run some performance tests on my Flink deployment.
>>
>> I am implementing an extremely simplistic use case.
>>
>> I built a ZMQ Source.
>>
>>
>>
>> The topology is ZMQ Source -> Mapper -> DiscardingSink (a sink that
>> does nothing).
>>
>>
>>
>> Data is pushed via ZMQ at a very high rate.
>>
>> When the incoming rate from ZMQ is higher than the rate Flink can keep
>> up with, I can see that the JVM heap is filling up (using Flink's metrics).
>> When the heap is fully consumed, the TM chokes, a heartbeat is missed, and
>> the job fails.
>>
>>
>>
>> I was expecting Flink to handle this type of backpressure gracefully and
>> not fail.
>>
>>
>>
>> Note: the mapper has no state to persist.
>>
>> See the Grafana charts below: on the left is the TM heap used.
>>
>> On the right is the ZMQ output rate (yellow) vs. the consuming rate
>> reported by the ZMQSource.
>>
>> 1GB is the configured heap size.
>>
>>
>>
>>


Re: Some doubts about window start time and end time

2019-11-27 Thread Jun Zhang
Hi, Caizhi:
1. If I add the offset,
window(TumblingProcessingTimeWindows.of(Time.hours(6), Time.hours(-8))),
I get an error: TumblingProcessingTimeWindows parameters must
satisfy abs(offset) < size.
2. If it is caused by not adding an offset, then why does the same code work
with no problem when I set the window size to one hour, but fail when I set
the window size to six hours?

On 11/27/2019 18:21, Caizhi Weng wrote:

Re: Reply: Reply: flink on yarn Kerberos authentication issue

2019-11-27 Thread guoshuai
Our Flink is configured with Kerberos. It accesses Kerberos-secured components
such as kafka and HBase (zk is also Kerberos-secured) and simple-auth open-source
components such as es and mysql; reads and writes all work fine. (Kerberos is all
within one cluster, and the authenticated user has permission to access hdfs, zk
and kafka.)

You said your zookeeper is also simple-auth. Two simple-auth components accessing
each other is no problem, but are you sure a simple-auth zk lets flink access the
Kerberos-secured hbase and the hdfs that stores its data? (I haven't tested this.)
1: Confirm that flink in simple mode can read from the Kerberos-secured kafka;
if so, that should rule out problems with simple-mode flink accessing Kerberos
across clusters.
2: Could it be caused by simple mode not being able to access the hdfs of the
Kerberos-secured cluster?

Try enabling the fallback to simple auth in the Kerberos cluster's core-site.xml:

<property>
  <name>ipc.client.fallback-to-simple-auth-allowed</name>
  <value>true</value>
</property>
 








On 2019-11-27 17:08:48, "venn" wrote:
>It seems our zookeeper does not have security authentication enabled; the Java
>code does not add a jaas.conf file either, and the logs contain entries showing
>that the zookeeper connection has been established.
>
>
>Actually the problem hasn't reached that step yet. My current question is: "Flink
>on yarn runs on an unauthenticated hadoop cluster; can it access hbase on a
>hadoop cluster with kerberos authentication?"
>
>There are currently two symptoms:
>   1. Submitting a job that reads the kerberos-secured hbase directly on the
>unauthenticated hadoop cluster: the logs show hadoop running in simple mode
>(the default, unauthenticated mode), with the log line "Hadoop user set
>to xxx (auth: SIMPLE)"; the job gets stuck where it reads hbase, until it times out.
>   2. Modifying core-site.xml/hdfs-site.xml on the submitting node to inject a new
>core-site.xml with the parameter "Hadoop.security.authentication = kerberos": the
>logs then show "Hadoop user set to xxx (auth: KERBEROS)", but the job stays in the
>"created" state and the log reports: "server asks us to fall back to SIMPLE auth. But
>the client is configured to only allow secure connections"
>
>
>
>
>-----Original Message-----
>From: user-zh-return-1559-wxchunjhyy=163@flink.apache.org
> on behalf of guoshuai
>Sent: Wednesday, November 27, 2019 2:26 PM
>To: user-zh@flink.apache.org
>Subject: Re: Reply: flink on yarn Kerberos authentication issue
>
>
>
>HBase authentication requires ZooKeeper and Kerberos security authentication.
>Is the "jaas.conf" file for the ZooKeeper authentication loaded as well?
>
>LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, userName,
>userKeytabFile);
>LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY,
>ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL);
>LoginUtil.login(userName, userKeytabFile, krb5File, conf);
>
>
>
>
>
>
>
>
>On 2019-11-27 14:00:15, "venn" wrote:
>>Our kafka can authenticate and consume data normally. Authenticating hbase
>>seems different from kafka: without authentication we cannot read the data,
>>and with authentication the job cannot be submitted to yarn.
>>
>>As follows:
>>>After reading the code at the corresponding location, I added the
>>>"Hadoop.security.authentication = kerberos" parameter to the Hadoop
>>>configuration file (note: the simple-auth hadoop cluster is an hdp cluster
>>>deployed with ambari, without Kerberos enabled, so the parameter
>>>"Hadoop.security.authentication" has the value simple). That makes the program
>>>pass authentication, but the flink job stays in the created state, and
>>>taskmanager.log keeps reporting "server asks us to fall back to SIMPLE auth.
>>>But the client is configured to only allow secure connections"
>>
>>
>>
>>-----Original Message-----
>>From: user-zh-return-1557-wxchunjhyy=163@flink.apache.org
>> on behalf of guoshuai
>>Sent: Wednesday, November 27, 2019 1:31 PM
>>To: user-zh@flink.apache.org
>>Subject: Re: flink on yarn Kerberos authentication issue
>>
>>Are the kerberos user's krb5.conf and user.keytab files loaded when the program
>>runs? The kerberos problem I ran into before was flink failing to get data from
>>kafka. Running in yarn-session mode, the authentication phase is completed when
>>the yarn-session is launched. In the end the problem was the kafka communication
>>protocol. You could check the configuration on the hbase side; if that really
>>cannot work, you can also decouple hbase from flink by adding a kafka in between.
>>
>>
>>
>>
>>
>>
>>
>>
>>On 2019-11-26 14:50:32, "venn" wrote:
>>>Dear experts:
>>>
>>>A question about flink authentication: Flink on yarn runs on an
>>>unauthenticated Hadoop cluster; how can it access hbase on a cluster with
>>>kerberos authentication?
>>>
>>>Below is a description of our setup and the problems we found:
>>>
>>>We have two hadoop clusters, one using Kerberos authentication mode and one
>>>using simple authentication mode. Flink 1.9.0 is deployed on the simple-auth
>>>cluster.
>>>
>>>Recently we ran into problems using flink to read hbase on the
>>>Kerberos-secured cluster. We configured the parameters
>>>security.kerberos.login.keytab and security.kerberos.login.principal in
>>>flink-conf.yaml.
>>>
>>>We planned to read hbase data synchronously in a map: take the rowkey from
>>>the input data and fetch the hbase data with a get. After the program
>>>starts, it appears "stuck" at the map operator until the hbase get times
>>>out, and no data can be read. We found log lines like these in
>>>taskmanager.log:
>>>
>>>org.apache.flink.yarn.YarnTaskExecutorRunner   - OS current user: yarn
>>>
>>>org.apache.flink.yarn.YarnTaskExecutorRunner   - current Hadoop/Kerberos
>>>user: admin (note: the login user)
>>>
>>>org.apache.flink.yarn.YarnTaskExecutorRunner   - YARN daemon is running as:
>>>admin Yarn client user obtainer: admin
>>>
>>>org.apache.flink.runtime.security.modules.HadoopModule  - Hadoop user
>>>set to admin (auth:SIMPLE)
>>>
>>>After reading the code at the corresponding location, I added the
>>>"Hadoop.security.authentication = kerberos" parameter to the Hadoop
>>>configuration file (note: the simple-auth hadoop cluster is an hdp cluster
>>>deployed with ambari, without Kerberos enabled, so the parameter
>>>"Hadoop.security.authentication" has the value simple). That makes the
>>>program pass authentication, but the flink job stays in the created state,
>>>and taskmanager.log keeps reporting "server asks us to fall back to SIMPLE
>>>auth. But the client is configured to only allow secure connections"
>>>
>>>The official documentation describes it like this:
>>>https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/security-kerberos.html
>>>
>>>
>>>Hadoop Security Module
>>>
>>>This module uses the Hadoop UserGroupInformation (UGI) class to 
>>>establish a process-wide login user context. The login user is then 
>>>used for all interactions with Hadoop, including HDFS, HBase, and YARN.
>>>
>>>If Hadoop security is enabled (in core-site.xml), the login user will 
>>>have whatever Kerberos credential is configured. Otherwise, the login 
>>>user conveys only the user identity of the OS account that launched 
>>>the
>>cluster.
>>>
>>> 
>>>
>>> 
>>>
>>> 
>>>


Re: Some doubts about window start time and end time

2019-11-27 Thread Caizhi Weng
Hi Jun,

You have to specify an offset when defining the windows. According to the
Java docs of TumblingProcessingTimeWindows: "if you are living in somewhere
which is not using UTC±00:00 time, such as China which is using UTC+08:00,
and you want a time window with size of one day, and window begins at every
00:00:00 of local time, you may use {@code of(Time.days(1), Time.hours(-8))}.
The parameter of offset is {@code Time.hours(-8)} since UTC+08:00 is 8 hours
earlier than UTC time."

Does this solve the problem?
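
One caveat for Jun's case: abs(offset) must be smaller than the window size, so
for a 6-hour window the UTC+08:00 shift can be folded modulo the size, i.e.
Time.hours(-8) becomes Time.hours(-2), which aligns windows to
00:00/06:00/12:00/18:00 local time. A minimal sketch reusing the names from
Jun's snippet below:

import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// abs(offset) < size is required; -8h and -2h are equivalent modulo the 6h size
dataStream
        .keyBy("device")
        .window(TumblingProcessingTimeWindows.of(Time.hours(6), Time.hours(-2)))
        .aggregate(new MyAggre(), new WindowResultFunction())
        .print();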

Jun Zhang <825875...@qq.com> on Wed, Nov 27, 2019 at 6:03 PM wrote:

>
>
> Hi, Caizhi:
>
> the code is like this:
>
> dataStream
>     .keyBy("device")
>     .window(TumblingProcessingTimeWindows.of(Time.hours(6)))
>     .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
>     .aggregate(new MyAggre(), new WindowResultFunction())
>     .print();
>
> I added a trigger for quick output.
>
> On 11/27/2019 17:54, Caizhi Weng wrote:
>
> Hi Jun,
>
> How do you define your window? Could you please show us the code?
>
> Thanks.
>
Jun Zhang <825875...@qq.com> on Wed, Nov 27, 2019 at 5:22 PM wrote:
>
>> Hi:
>> I defined a tumbling window and set the time size to one hour; the
>> resulting windows are [00:00:00-01:00:00], [01:00:00-02:00:00].
>> This meets my expectations, but when I set the time size to 6 hours, the
>> resulting windows are [02:00:00-08:00:00], [08:00:00-14:00:00],
>> [14:00:00-20:00:00] ...
>> But my expected windows are [00:00:00-06:00:00], [06:00:00-12:00:00] ...
>> Is it right to get such a window result?
>>
>> thanks
>>
>>
>>


Re: Some doubts about window start time and end time

2019-11-27 Thread Jun Zhang
Hi, Caizhi:

the code is like this:

dataStream
    .keyBy("device")
    .window(TumblingProcessingTimeWindows.of(Time.hours(6)))
    .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
    .aggregate(new MyAggre(), new WindowResultFunction())
    .print();

I added a trigger for quick output.

On 11/27/2019 17:54, Caizhi Weng

Re: Some doubts about window start time and end time

2019-11-27 Thread Caizhi Weng
Hi Jun,

How do you define your window? Could you please show us the code?

Thanks.

Jun Zhang <825875...@qq.com> on Wed, Nov 27, 2019 at 5:22 PM wrote:

> Hi:
> I defined a tumbling window and set the time size to one hour; the
> resulting windows are [00:00:00-01:00:00], [01:00:00-02:00:00].
> This meets my expectations, but when I set the time size to 6 hours, the
> resulting windows are [02:00:00-08:00:00], [08:00:00-14:00:00],
> [14:00:00-20:00:00] ...
> But my expected windows are [00:00:00-06:00:00], [06:00:00-12:00:00] ...
> Is it right to get such a window result?
>
> thanks
>
>
>


flink 1.9.1 state keeps growing

2019-11-27 Thread 谷歌-akulaku
Hello, I use FlinkKafkaConsumer011 to subscribe to a topic list. After setting an
expiration time, the expired state is not cleaned up; is there any way to solve this?
Also, with a two-stream union the expired state is not cleaned up either, while in
the single-stream, single-topic case it is cleaned up. Is this a bug?



Sent from the Windows 10 Mail app
发送自 Windows 10 版邮件应用



Some doubts about window start time and end time

2019-11-27 Thread Jun Zhang
Hi:
I defined a tumbling window and set the time size to one hour; the resulting
windows are [00:00:00-01:00:00], [01:00:00-02:00:00].
This meets my expectations, but when I set the time size to 6 hours, the
resulting windows are [02:00:00-08:00:00], [08:00:00-14:00:00],
[14:00:00-20:00:00] ...
But my expected windows are [00:00:00-06:00:00], [06:00:00-12:00:00] ...
Is it right to get such a window result?



thanks

Reply: Reply: flink on yarn Kerberos authentication issue

2019-11-27 Thread venn
It seems our zookeeper does not have security authentication enabled; the Java
code does not add a jaas.conf file either, and the logs contain entries showing
that the zookeeper connection has been established.


Actually the problem hasn't reached that step yet. My current question is: "Flink
on yarn runs on an unauthenticated hadoop cluster; can it access hbase on a
hadoop cluster with kerberos authentication?"

There are currently two symptoms:
1. Submitting a job that reads the kerberos-secured hbase directly on the
unauthenticated hadoop cluster: the logs show hadoop running in simple mode
(the default, unauthenticated mode), with the log line "Hadoop user set
to xxx (auth: SIMPLE)"; the job gets stuck where it reads hbase, until it times out.
2. Modifying core-site.xml/hdfs-site.xml on the submitting node to inject a new
core-site.xml with the parameter "Hadoop.security.authentication = kerberos": the
logs then show "Hadoop user set to xxx (auth: KERBEROS)", but the job stays in the
"created" state and the log reports: "server asks us to fall back to SIMPLE auth. But
the client is configured to only allow secure connections"




-----Original Message-----
From: user-zh-return-1559-wxchunjhyy=163@flink.apache.org
 on behalf of guoshuai
Sent: Wednesday, November 27, 2019 2:26 PM
To: user-zh@flink.apache.org
Subject: Re: Reply: flink on yarn Kerberos authentication issue



HBase authentication requires ZooKeeper and Kerberos security authentication.
Is the "jaas.conf" file for the ZooKeeper authentication loaded as well?

LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, userName,
userKeytabFile);
LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY,
ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL);
LoginUtil.login(userName, userKeytabFile, krb5File, conf);








On 2019-11-27 14:00:15, "venn" wrote:
>Our kafka can authenticate and consume data normally. Authenticating hbase
>seems different from kafka: without authentication we cannot read the data,
>and with authentication the job cannot be submitted to yarn.
>
>As follows:
>>After reading the code at the corresponding location, I added the
>>"Hadoop.security.authentication = kerberos" parameter to the Hadoop
>>configuration file (note: the simple-auth hadoop cluster is an hdp cluster
>>deployed with ambari, without Kerberos enabled, so the parameter
>>"Hadoop.security.authentication" has the value simple). That makes the program
>>pass authentication, but the flink job stays in the created state, and
>>taskmanager.log keeps reporting "server asks us to fall back to SIMPLE auth.
>>But the client is configured to only allow secure connections"
>
>
>
>-----Original Message-----
>From: user-zh-return-1557-wxchunjhyy=163@flink.apache.org
> on behalf of guoshuai
>Sent: Wednesday, November 27, 2019 1:31 PM
>To: user-zh@flink.apache.org
>Subject: Re: flink on yarn Kerberos authentication issue
>
>Are the kerberos user's krb5.conf and user.keytab files loaded when the program
>runs? The kerberos problem I ran into before was flink failing to get data from
>kafka. Running in yarn-session mode, the authentication phase is completed when
>the yarn-session is launched. In the end the problem was the kafka communication
>protocol. You could check the configuration on the hbase side; if that really
>cannot work, you can also decouple hbase from flink by adding a kafka in between.
>
>
>
>
>
>
>
>
>On 2019-11-26 14:50:32, "venn" wrote:
>>Dear experts:
>>
>>A question about flink authentication: Flink on yarn runs on an
>>unauthenticated Hadoop cluster; how can it access hbase on a cluster with
>>kerberos authentication?
>>
>>Below is a description of our setup and the problems we found:
>>
>>We have two hadoop clusters, one using Kerberos authentication mode and one
>>using simple authentication mode. Flink 1.9.0 is deployed on the simple-auth
>>cluster.
>>
>>Recently we ran into problems using flink to read hbase on the
>>Kerberos-secured cluster. We configured the parameters
>>security.kerberos.login.keytab and security.kerberos.login.principal in
>>flink-conf.yaml.
>>
>>We planned to read hbase data synchronously in a map: take the rowkey from
>>the input data and fetch the hbase data with a get. After the program
>>starts, it appears "stuck" at the map operator until the hbase get times
>>out, and no data can be read. We found log lines like these in
>>taskmanager.log:
>>
>>org.apache.flink.yarn.YarnTaskExecutorRunner   - OS current user: yarn
>>
>>org.apache.flink.yarn.YarnTaskExecutorRunner   - current Hadoop/Kerberos
>>user: admin (note: the login user)
>>
>>org.apache.flink.yarn.YarnTaskExecutorRunner   - YARN daemon is running as:
>>admin Yarn client user obtainer: admin
>>
>>org.apache.flink.runtime.security.modules.HadoopModule  - Hadoop user
>>set to admin (auth:SIMPLE)
>>
>>After reading the code at the corresponding location, I added the
>>"Hadoop.security.authentication = kerberos" parameter to the Hadoop
>>configuration file (note: the simple-auth hadoop cluster is an hdp cluster
>>deployed with ambari, without Kerberos enabled, so the parameter
>>"Hadoop.security.authentication" has the value simple). That makes the
>>program pass authentication, but the flink job stays in the created state,
>>and taskmanager.log keeps reporting "server asks us to fall back to SIMPLE
>>auth. But the client is configured to only allow secure connections"
>>
>>The official documentation describes it like this:
>>https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/security-kerberos.html
>>
>>
>>Hadoop Security Module
>>
>>This module uses the Hadoop UserGroupInformation (UGI) class to 
>>establish a process-wide login user context. The login user is then 
>>used for all interactions with Hadoop, including HDFS, HBase, and YARN.
>>
>>If Hadoop security is enabled (in core-site.xml), the login user will 
>>have whatever Kerberos credential is configured. Otherwise, the login 
>>user conveys only the user identity of the OS account that launched 
>>the
>cluster.
>>
>> 
>>
>> 
>>
>> 
>>


Re: ***UNCHECKED*** Error while confirming Checkpoint

2019-11-27 Thread Tony Wei
Hi,

As the follow up, it seems that savepoints can't be subsumed, so their
notifications could still be sent to each TM.
Is this a bug that needs to be fixed in TwoPhaseCommitSinkFunction?

Best,
Tony Wei

Tony Wei on Wed, Nov 27, 2019 at 3:43 PM wrote:

> Hi,
>
> I want to raise this question again, since I have had this exception on my
> production job.
>
> The exception is as follows
>
>
>> 2019-11-27 14:47:29
>
> java.lang.RuntimeException: Error while confirming checkpoint
>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205)
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.IllegalStateException: checkpoint completed, but no
>> transaction pending
>>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:267)
>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:822)
>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200)
>>     ... 5 more
>
>
> And these are the checkpoint / savepoint before the job failed.
> [image: checkoint.png]
>
> It seems that checkpoint #675's notification handled savepoint #674's
> pending transaction holder, but savepoint #674's notification wasn't
> subsumed or ignored by the JM.
> Therefore, during checkpoint #676, some tasks got the notification before
> getting the checkpoint barrier, which led to this exception, because
> there was no pending transaction in the queue.
>
> Does anyone know the details of the subsumed-notifications mechanism and
> how the checkpoint coordinator handles this situation? Please correct me if I'm
> wrong. Thanks.
>
> Best,
> Tony Wei
>
Stefan Richter on Mon, Oct 8, 2018 at 5:03 PM wrote:
>
>> Hi Pedro,
>>
>> unfortunately the interesting parts have all been removed from the log; we
>> already know about the exception itself. In particular, what I would like
>> to see is which checkpoints have been triggered and completed before the
>> exception happens.
>>
>> Best,
>> Stefan
>>
>> > On 08.10.2018 at 10:23, PedroMrChaves wrote:
>> >
>> > Hello,
>> >
>> > Find attached the jobmanager.log. I've omitted the log lines from other
>> > runs, only left the job manager info and the run with the error.
>> >
>> > jobmanager.log
>> > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t612/jobmanager.log>
>>
>> >
>> >
>> >
>> > Thanks again for your help.
>> >
>> > Regards,
>> > Pedro.
>> >
>> >
>> >
>> > -
>> > Best Regards,
>> > Pedro Chaves
>> > --
>> > Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>>