Re: Checkpoint Error

2021-03-06 Thread Navneeth Krishnan
Hi All,

Any suggestions?

Thanks

On Mon, Jan 18, 2021 at 7:38 PM Navneeth Krishnan 
wrote:

> Hi All,
>
> We are running our streaming job on Flink 1.7.2 and we are noticing the
> below error. Not sure what's causing it; any pointers would help. We have
> 10 TMs checkpointing to AWS EFS.
>
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 
> 11 for operator Processor -> Sink: KafkaSink (34/42).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 11 for 
> operator Processor -> Sink: KafkaSink (34/42).
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>   ... 6 more
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
> Could not flush and close the file system output stream to 
> file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d
>  in order to obtain the stream state handle
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>   ... 5 more
> Caused by: java.io.IOException: Could not flush and close the file system 
> output stream to 
> file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d
>  in order to obtain the stream state handle
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:767)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696)
>   at 
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
>   ... 7 more
> Caused by: java.io.IOException: Stale file handle
>   at java.io.FileOutputStream.close0(Native Method)
>   at java.io.FileOutputStream.access$000(FileOutputStream.java:53)
>   at java.io.FileOutputStream$1.close(FileOutputStream.java:356)
>   at java.io.FileDescriptor.closeAll(FileDescriptor.java:212)
>   at java.io.FileOutputStream.close(FileOutputStream.java:354)
>   at 
> org.apache.flink.core.fs.local.LocalDataOutputStream.close(LocalDataOutputStream.java:62)
>   at 
> org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:312)
>   ... 12 more
>
>
> Thanks
>
>
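
For reference, the setup described in this thread - task managers checkpointing to a
shared EFS mount through a file:// URI - is typically wired up along the following
lines (a minimal sketch against the Flink 1.7 DataStream API; the interval, path, and
placeholder pipeline are illustrative, not taken from the original job):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EfsCheckpointedJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint once a minute to a directory on the EFS mount shared by all task managers.
        env.enableCheckpointing(60_000L);
        env.setStateBackend(new FsStateBackend("file:///mnt/checkpoints"));

        // Placeholder pipeline; the real job reads from Kafka and writes to a KafkaSink.
        env.fromElements("a", "b", "c").print();

        env.execute("efs-checkpoint-sketch");
    }
}

Note that the root cause at the bottom of the trace ("Stale file handle") is raised by
the operating system's NFS client while closing the checkpoint file, not by Flink itself.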


Flink fails to read from Kerberos-authenticated Hive

2021-03-06 Thread guoyb
Hi,
flink1.12.1
hive2.1.0
CDH6.2.0



The Hive cluster is secured with Kerberos.

Kerberos authentication succeeds, and the Hive metastore can be accessed normally.


Started the SQL client with sql-client.sh embedded:
Flink SQL> show tables;
dimension_table
dimension_table1
test


Flink SQL> select * from test;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.connectors.hive.FlinkHiveException: Failed to collect all 
partitions from hive metaStore


The log file
/opt/cloudera/parcels/FLINK-1.12.1-BIN-SCALA_2.11/lib/flink/log/flink-root-sql-client-cdh6.com.log shows:

2021-03-07 10:29:18.776 [main] INFO 
org.apache.flink.runtime.net.ConnectionUtils - Trying to connect to 
localhost/127.0.0.1:6123
2021-03-07 10:29:18.777 [main] INFO 
org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from 
address 'cdh6.com/192.168.31.10': Connection refused (Connection refused)
2021-03-07 10:29:18.778 [main] INFO 
org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from 
address '/127.0.0.1': Connection refused (Connection refused)
2021-03-07 10:29:18.778 [main] INFO 
org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from 
address '/fe80:0:0:0:20c:29ff:fea1:6d6b%ens33': Network is unreachable (connect 
failed)
2021-03-07 10:29:18.778 [main] INFO 
org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from 
address '/192.168.31.10': Connection refused (Connection refused)
2021-03-07 10:29:18.779 [main] INFO 
org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from 
address '/0:0:0:0:0:0:0:1%lo': Network is unreachable (connect failed)
2021-03-07 10:29:18.779 [main] INFO 
org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from 
address '/127.0.0.1': Connection refused (Connection refused)
2021-03-07 10:29:18.779 [main] INFO 
org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from 
address '/fe80:0:0:0:20c:29ff:fea1:6d6b%ens33': Network is unreachable (connect 
failed)
2021-03-07 10:29:18.779 [main] INFO 
org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from 
address '/192.168.31.10': Connection refused (Connection refused)
2021-03-07 10:29:18.780 [main] INFO 
org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from 
address '/0:0:0:0:0:0:0:1%lo': Network is unreachable (connect failed)
2021-03-07 10:29:18.780 [main] INFO 
org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from 
address '/127.0.0.1': Connection refused (Connection refused)
2021-03-07 10:29:18.780 [main] INFO 
org.apache.flink.runtime.net.ConnectionUtils - Could not connect. Waiting 
for 1600 msecs before next attempt
2021-03-07 10:29:20.381 [main] INFO 
org.apache.flink.runtime.net.ConnectionUtils - Trying to connect to 
localhost/127.0.0.1:6123
2021-03-07 10:29:20.381 [main] INFO 
org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from 
address 'cdh6.com/192.168.31.10': Connection refused (Connection refused)
2021-03-07 10:29:20.382 [main] INFO 
org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from 
address '/127.0.0.1': Connection refused (Connection refused)
2021-03-07 10:29:20.383 [main] INFO 
org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from 
address '/fe80:0:0:0:20c:29ff:fea1:6d6b%ens33': Network is unreachable (connect 
failed)
2021-03-07 10:29:20.383 [main] INFO 
org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from 
address '/192.168.31.10': Connection refused (Connection refused)
2021-03-07 10:29:20.383 [main] INFO 
org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from 
address '/0:0:0:0:0:0:0:1%lo': Network is unreachable (connect failed)
2021-03-07 10:29:20.383 [main] INFO 
org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from 
address '/127.0.0.1': Connection refused (Connection refused)
2021-03-07 10:29:20.384 [main] INFO 
org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from 
address '/fe80:0:0:0:20c:29ff:fea1:6d6b%ens33': Network is unreachable (connect 
failed)
2021-03-07 10:29:20.384 [main] INFO 
org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from 
address '/192.168.31.10': Connection refused (Connection refused)
2021-03-07 10:29:20.384 [main] INFO 
org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from 
address '/0:0:0:0:0:0:0:1%lo': Network is unreachable (connect failed)
2021-03-07 10:29:20.385 [main] INFO 
org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from 
address '/127.0.0.1': Connection refused (Connection refused)
2021-03-07 10:29:20.385 [main] INFO 
org.apache.flink.runtime.net.ConnectionUtils - Could not connect. Waiting 
for 1829 msecs before next attempt
2021-03-07 10:29:22.214 [main] INFO 
org.apache.flink.runtime.net.ConnectionUtils - Trying to connect to 
localhost/127.0.0.1:6123
2021-03-07 10:29:22.215 [main] INFO 
org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from 
address 'cdh6.com/192.168.31.10': Connection 

Re: Dynamic JDBC Sink Support

2021-03-06 Thread Maciej Obuchowski
Hey Rion,

I had exactly the same problem and implemented this functionality in
my Flink fork, with the XA sink taken from the development branch.
Since it's not only my problem, I've created a Jira task for
it - FLINK-21643 - and will provide a draft PR for it.

@David - for traditional relational databases even a relatively small
number of connections can be unreasonable here.

Thanks,
Maciej

On Fri, 5 Mar 2021 at 21:55, David Anderson  wrote:
>
> Rion,
>
> A given JdbcSink can only write to one table, but if the number of tables 
> involved isn't unreasonable, you could use a separate sink for each table, 
> and use side outputs [1] from a process function to steer each record to the 
> appropriate sink.
>
> I suggest you avoid trying to implement your own sink.
>
> In general, custom sinks need to implement their own checkpointing, though 
> there is a generic two phase commit sink you can use as a starting point for 
> implementing a transactional sink. FYI, the JDBC sink has been reworked for 
> 1.13 to include exactly-once guarantees based on the XA standard [2].
>
> Regards,
> David
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/datastream/jdbc/#jdbcsinkexactlyoncesink
>
> On Fri, Mar 5, 2021 at 7:34 PM Rion Williams  wrote:
>>
>> Hi all,
>>
>> I’ve been playing around with a proof-of-concept application with Flink to 
>> assist a colleague of mine. The application is fairly simple (take in a 
>> single input and identify various attributes about it) with the goal of 
>> outputting those to separate tables in Postgres:
>>
>> object AttributeIdentificationJob {
>>     @JvmStatic
>>     fun main(args: Array<String>) {
>>         val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>>
>>         stream
>>             .addSource(ReadFromKafka())
>>             .process(IdentifyAttributesFunction())
>>             .addSink(DynamicJdbcHere())
>>
>>         // execute program
>>         stream.execute("Attribute Identification")
>>     }
>> }
>>
>> Considering my attributes may be of varying types (all implementing an 
>> Attribute interface), I don't know if the existing JdbcSink functionality or 
>> some variant of it (i.e. one of the dynamic ones that I see listed) could 
>> handle this functionality. Essentially for a given "bundle" of records, I'd 
>> need to ensure that each respective type of attribute was upserted into its 
>> corresponding table within a Postgres database.
>>
>> Is that something that the connector can handle on its own? Or would I need
>> to implement my own RichSinkFunction<...> that could
>> handle opening a connection to Postgres and dynamically generating the
>> appropriate UPSERT statements to handle sending the records? As a follow-up
>> to that, if I did need to write my own RichSinkFunction, would I need to
>> implement my own checkpointing for resilience purposes or does that come
>> along for the ride with RichSinkFunctions?
>>
>> Any insight or approaches would be welcome!
>>
>> Thanks,
>>
>> Rion
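
For what it's worth, the fan-out David outlines above - one ProcessFunction steering each
attribute type to its own per-table JdbcSink through side outputs - could look roughly like
the Java sketch below. The attribute classes, table names, SQL, and connection details are
all made up, and it assumes the flink-connector-jdbc dependency plus a real Kafka source in
place of the placeholder:

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class AttributeFanOutSketch {

    // Hypothetical attribute types standing in for the Attribute implementations.
    public static class NameAttribute { public String id; public String name; }
    public static class AgeAttribute  { public String id; public int age; }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Side-output tag for the second attribute type (anonymous subclass keeps the generic type).
        final OutputTag<AgeAttribute> ageTag = new OutputTag<AgeAttribute>("age-attributes") {};

        // The main output carries one attribute type; everything else goes to side outputs.
        SingleOutputStreamOperator<NameAttribute> names = env
                .fromElements("record-1", "record-2")   // placeholder for the Kafka source
                .process(new ProcessFunction<String, NameAttribute>() {
                    @Override
                    public void processElement(String record, Context ctx, Collector<NameAttribute> out) {
                        // Identify attributes of the record, then steer each one to its stream.
                        NameAttribute n = new NameAttribute();
                        n.id = record; n.name = "name-of-" + record;
                        out.collect(n);

                        AgeAttribute a = new AgeAttribute();
                        a.id = record; a.age = 42;
                        ctx.output(ageTag, a);
                    }
                });

        JdbcConnectionOptions conn = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:postgresql://localhost:5432/attributes")  // illustrative
                .withDriverName("org.postgresql.Driver")
                .build();

        // One JdbcSink per target table; the upsert statements are illustrative Postgres SQL.
        names.addSink(JdbcSink.sink(
                "INSERT INTO name_attributes (id, name) VALUES (?, ?) "
                        + "ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name",
                (stmt, n) -> { stmt.setString(1, n.id); stmt.setString(2, n.name); },
                conn));

        names.getSideOutput(ageTag).addSink(JdbcSink.sink(
                "INSERT INTO age_attributes (id, age) VALUES (?, ?) "
                        + "ON CONFLICT (id) DO UPDATE SET age = EXCLUDED.age",
                (stmt, a) -> { stmt.setString(1, a.id); stmt.setInt(2, a.age); },
                conn));

        env.execute("attribute-fan-out-sketch");
    }
}

The plain JdbcSink above is at-least-once under checkpointing; the XA-based
JdbcSink.exactlyOnceSink David mentions for 1.13 can be swapped in where exactly-once is required.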


Re: reading file from s3

2021-03-06 Thread Tamir Sagi
I had a typo in my previous answer; the env name was missing an 'S':

ENABLE_BUILT_IN_PLUGIN --> ENABLE_BUILT_IN_PLUGINS
Once again, the value is the plugin jar name:
flink-s3-fs-hadoop-<version>.jar
The complete list can be found here.

You can build your own Flink image and set the environment variable in it, or
set it when you run the container.
If you execute it locally (not in a container) on a standalone cluster, make
sure this env variable is defined at the system level.

Tamir.

From: Tamir Sagi 
Sent: Saturday, March 6, 2021 7:33 PM
To: Avi Levi ; Chesnay Schepler 
Cc: user@flink.apache.org 
Subject: Re: reading file from s3

Hey Avi,

Do you use 'Hadoop S3 plugin' to read from S3?

If yes, what is its version?

If not, try to read from S3 as follows (ref):

  1.  Set an environment variable to use the hadoop plugin (it's part of the Flink
image):
key = ENABLE_BUILT_IN_PLUGIN
value = flink-s3-fs-hadoop-<version>.jar (e.g.
flink-s3-fs-hadoop-1.11.1.jar for Flink 1.11.1)
  2.  Read the file from S3:
DataSource<String> lines = env.readTextFile("s3://<path-to-file>");
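
For context, a complete minimal batch job built around that call could look roughly like
the sketch below (the bucket and key are made up; it assumes the plugin environment
variable described above is set on the image and that S3 credentials are provided via
flink-conf.yaml or the AWS default provider chain):

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;

public class S3ReadSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // The s3:// scheme is handled by the flink-s3-fs-hadoop plugin loaded at startup.
        DataSource<String> lines = env.readTextFile("s3://my-bucket/input/data.txt");

        // print() triggers execution and writes the lines to stdout.
        lines.print();
    }
}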

Tamir

From: Avi Levi 
Sent: Saturday, March 6, 2021 6:59 AM
To: Chesnay Schepler 
Cc: user@flink.apache.org 
Subject: Re: reading file from s3




Does anyone by any chance have a working example (of course without the
credentials etc.) that can be shared on GitHub? Simply reading/writing a file
from/to S3.
I keep struggling with this one and getting weird exceptions.
Thanks

On Thu, Mar 4, 2021 at 7:30 PM Avi Levi
<a...@theneura.com> wrote:
Sure, This is the full exception stacktrace:

Exception in thread "main" 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
at 
java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at 
java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2137)
at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:238)
at 
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at 
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at 
java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2137)
at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
at akka.dispatch.OnComplete.internal(Future.scala:264)
at akka.dispatch.OnComplete.internal(Future.scala:261)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
at 
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
at 
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:531)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at 
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at 
akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at 

Re: reading file from s3

2021-03-06 Thread Tamir Sagi
Hey Avi,

Do you use 'Hadoop S3 plugin' to read from S3?

If yes, what is its version?

If not, try to read from S3 as follows (ref):

  1.  Set an environment variable to use the hadoop plugin (it's part of the Flink
image):
key = ENABLE_BUILT_IN_PLUGIN
value = flink-s3-fs-hadoop-<version>.jar (e.g.
flink-s3-fs-hadoop-1.11.1.jar for Flink 1.11.1)
  2.  Read the file from S3:
DataSource<String> lines = env.readTextFile("s3://<path-to-file>");

Tamir

From: Avi Levi 
Sent: Saturday, March 6, 2021 6:59 AM
To: Chesnay Schepler 
Cc: user@flink.apache.org 
Subject: Re: reading file from s3




Does anyone by any chance have a working example (of course without the
credentials etc.) that can be shared on GitHub? Simply reading/writing a file
from/to S3.
I keep struggling with this one and getting weird exceptions.
Thanks

On Thu, Mar 4, 2021 at 7:30 PM Avi Levi
<a...@theneura.com> wrote:
Sure, This is the full exception stacktrace:

Exception in thread "main" 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
at 
java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at 
java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2137)
at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:238)
at 
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at 
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at 
java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2137)
at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
at akka.dispatch.OnComplete.internal(Future.scala:264)
at akka.dispatch.OnComplete.internal(Future.scala:261)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
at 
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
at 
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:531)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at 
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at 
akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
at 

Re: How to integrate Flink Jobs into our own system for easier management

2021-03-06 Thread tison
Yes, this can be done: program against the interfaces of FLINK and YARN or k8s to
manage metadata, manage user files, submit jobs, and manage job status afterwards.
This is how many companies have built their Flink integrations.

The front end you describe is just a friendly presentation of what those interfaces
return. Flink itself ships with a web front end that covers most of the features you
want, but it has some shortcomings for multiple jobs, especially if you are not using
session mode.

Integrate Flink's capabilities and the information of the actually deployed cluster
underneath, expose a user-friendly interactive page on top, and the conventional
web-app development approach will do.

Best,
tison.
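
As a rough illustration of the API-driven approach described above, a backend service
can drive a running session cluster through Flink's REST client. A minimal Java sketch
follows; the host, port, cluster id, and job id are made up:

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.client.JobStatusMessage;

import java.util.Collection;

public class FlinkJobAdminSketch {
    public static void main(String[] args) throws Exception {
        // Point the client at the REST endpoint of an existing session cluster.
        Configuration conf = new Configuration();
        conf.setString(RestOptions.ADDRESS, "flink-jobmanager.internal");  // hypothetical host
        conf.setInteger(RestOptions.PORT, 8081);

        try (RestClusterClient<String> client = new RestClusterClient<>(conf, "default")) {
            // "View running jobs" menu: list every job with its current state.
            Collection<JobStatusMessage> jobs = client.listJobs().get();
            for (JobStatusMessage job : jobs) {
                System.out.println(job.getJobId() + " " + job.getJobName() + " " + job.getJobState());
            }

            // "Stop job" button: cancel a job by its id (replace with a real id).
            JobID jobId = JobID.fromHexString("00000000000000000000000000000000");
            client.cancel(jobId).get();
        }
    }
}

Submission can go through the same REST endpoints (/jars/upload and /jars/:jarid/run), and
the web UI mentioned above is built on top of the same REST API, so the two stay consistent.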


Jacob <17691150...@163.com> wrote on Sat, Mar 6, 2021 at 4:00 PM:

> We currently submit Flink jobs with the flink client's run command to run our
> real-time computations. Every submission requires logging in to the prd machine
> and uploading the jar, which is rather cumbersome.
>
>
> Later on we plan to integrate the real-time computation tasks into an existing
> system of ours, wrapping the process described above and giving users buttons,
> menus, and so on. Ideally, by adding some modules and menus to that system we
> could handle all job maintenance: submitting jobs, viewing running jobs,
> stopping jobs, etc.
>
>
> The system mentioned above is a data-processing platform we built ourselves;
> real-time computation is one part of it, so we would like to integrate the
> real-time task module into it as well.
>
>
> Is this feasible?
>
> Any pointers would be much appreciated! Thanks.
>
>
>
> -
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Running Pyflink job on K8s Flink Cluster Deployment?

2021-03-06 Thread Shuiqiang Chen
Hi Kevin,

For your information, below is an example of running a PyFlink Table API
WordCount job.

1. Building a Docker image with Python and PyFlink Installed:

Dockerfile:

FROM flink:1.12.0


# install python3 and pip3
RUN apt-get update -y && \
    apt-get install -y python3.7 python3-pip python3.7-dev && \
    rm -rf /var/lib/apt/lists/*
RUN ln -s /usr/bin/python3 /usr/bin/python

# install Python Flink

RUN pip3 install apache-flink==1.12.0

2. Resource definitions:

Flink-configuration-configmap.yaml:

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
app: flink
data:
  flink-conf.yaml: |+
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 2
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
queryable-state.proxy.ports: 6125
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
parallelism.default: 2
  log4j-console.properties: |+
# This affects logging for both user code and Flink
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
rootLogger.appenderRef.rolling.ref = RollingFileAppender

# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO

# Log all infos to the console
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# Log all infos in the given rolling file
appender.rolling.name = RollingFileAppender
appender.rolling.type = RollingFile
appender.rolling.append = false
appender.rolling.fileName = ${sys:log.file}
appender.rolling.filePattern = ${sys:log.file}.%i
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size=100MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 10

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF

Job-manager-service.yaml:

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
port: 6123
  - name: blob-server
port: 6124
  - name: webui
port: 8081
  selector:
app: flink
component: jobmanager

Job-manager.yaml

apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
metadata:
  labels:
app: flink
component: jobmanager
spec:
  restartPolicy: OnFailure
  containers:
- name: jobmanager
  image: pyflink:v1
  env:
  args: ["standalone-job", "-py",
"/opt/flink/examples/python/table/batch/word_count.py"]
  ports:
- containerPort: 6123
  name: rpc
- containerPort: 6124
  name: blob-server
- containerPort: 8081
  name: webui
  livenessProbe:
tcpSocket:
  port: 6123
initialDelaySeconds: 30
periodSeconds: 60
  volumeMounts:
- name: flink-config-volume
  mountPath: /opt/flink/conf
  securityContext:
    runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
  volumes:
- name: flink-config-volume
  configMap:
name: flink-config
items:
  - key: flink-conf.yaml
path: flink-conf.yaml
  - key: log4j-console.properties
path: log4j-console.properties

Task-manager.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
matchLabels:
  app: flink
  component: taskmanager
  template:
metadata:
  labels:
app: flink
component: taskmanager
spec:
  containers:
  - name: taskmanager
image: pyflink:v1
env:
args: ["taskmanager"]
ports:
- containerPort: 6122
  name: rpc
- containerPort: 6125
  name: query-state

Re: Running Pyflink job on K8s Flink Cluster Deployment?

2021-03-06 Thread Shuiqiang Chen
Hi Kevin,

You are able to run PyFlink applications on a Kubernetes cluster; both native
k8s mode and resource definition mode are supported since release-1.12.0.
Currently, Python and PyFlink are not enabled in the official Flink docker
image, so you might need to build a custom image with Python and PyFlink
installed; please refer to Enable Python in docker.

Generally, by setting the value of the args field in
`jobmanager-application.yaml` to args: ["standalone-job", "--python",
"my_python_app.py", <optional arguments>, <job arguments>], the job manager
will try to submit a PyFlink job with the specified python file once it is
started. You can check the pod status for jobmanager and taskmanager via
`kubectl get pods [-n namespace]`. The jobmanager pod will turn to the
completed state once the job is finished, or the error state if there is
something wrong, while the taskmanager pod will always be in the running
state.

Finally, it requires you to tear down the cluster by deleting all the created
resources (the jobmanager/taskmanager jobs, the flink-conf configmap,
the jobmanager-service, etc.).

Best,
Shuiqiang



Kevin Lam  wrote on Sat, Mar 6, 2021 at 5:29 AM:

> Hello everyone,
>
> I'm looking to run a Pyflink application run in a distributed fashion,
> using kubernetes, and am currently facing issues. I've successfully gotten
> a Scala Flink Application to run using the manifests provided at [0]
>
> I attempted to run the application by updating the jobmanager command args
> from
>
>  args: ["standalone-job", "--job-classname", "com.job.ClassName",  arguments>, ]
>
> to
>
> args: ["standalone-job", "--python", "my_python_app.py",  arguments>, ]
>
> But this didn't work. It resulted in the following error:
>
> Caused by: java.lang.LinkageError: loader constraint violation: loader
> org.apache.flink.util.ChildFirstClassLoader @2d8f2f3a wants to load class
> org.apache.commons.cli.Options. A different class with the same name was
> previously loaded by 'app'. (org.apache.commons.cli.Options is in unnamed
> module of loader 'app'
>
> I was able to get things to 'run' by setting args to:
>
> args: ["python", "my_python_app.py", , ]
>
>
> But I'm not sure if things were running in a distributed fashion or not.
>
> 1/ Is there a good way to check if the task pods were being correctly
> utilized?
>
> 2/ Are there any similar examples to [0] for how to run Pyflink jobs on
> kubernetes?
>
> Open to any suggestions you may have. Note: we'd prefer not to run using
> the native K8S route outlined at [1] because we need to maintain the
> ability to customize certain aspects of the deployment (eg. mounting SSDs
> to some of the pods)
>
> Thanks in advance!
>
> [0]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#application-cluster-resource-definitions
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/native_kubernetes.html#application-mode
>
>


flink 1.12.2: how to specify the rowtime attribute when converting a DataStream to a Table?

2021-03-06 Thread Asahi Lee
Hi,
My code is as follows:
StreamExecutionEnvironment bsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv);

DataStream
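
For reference, with the 1.12 Table API a rowtime attribute is usually declared while
converting the stream, roughly as in the sketch below (the tuple source, field names,
and watermark strategy are illustrative and not taken from the snippet above):

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class RowtimeConversionSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv);

        // Illustrative (id, epoch-millis) records with event-time timestamps and watermarks.
        DataStream<Tuple2<String, Long>> stream = bsEnv
                .fromElements(Tuple2.of("a", 1_000L), Tuple2.of("b", 2_000L))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((element, recordTs) -> element.f1));

        // ".rowtime()" appends a rowtime attribute "ts" derived from the stream's timestamps.
        Table table = bsTableEnv.fromDataStream(stream, $("f0"), $("f1"), $("ts").rowtime());
        table.printSchema();
    }
}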

java options to generate heap dump in EMR not working

2021-03-06 Thread bat man
Hi,

I am trying to generate a heap dump to debug a GC overhead OOM. For that I
added the below java options in flink-conf.yaml; however, after adding this,
YARN is not able to launch the containers. The job logs show that it keeps
requesting containers from YARN, gets them, and then releases them again, and
the same cycle continues. If I remove the option from
flink-conf.yaml, the containers are launched and the job starts
processing.


env.java.opts.taskmanager: "-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/tmp/dump.hprof"

If I try this, then the YARN client does not come up -


env.java.opts: "-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/tmp/dump.hprof"

Am I doing anything wrong here?

PS: I am using EMR.

Thanks,
Hemant


How to integrate Flink Jobs into our own system for easier management

2021-03-06 Thread Jacob
We currently submit Flink jobs with the flink client's run command to run our real-time computations. Every submission requires logging in to the prd machine and uploading the jar, which is rather cumbersome.


Later on we plan to integrate the real-time computation tasks into an existing system of ours, wrapping the process described above and giving users buttons, menus, and so on. Ideally, by adding some modules and menus to that system we could handle all job maintenance: submitting jobs, viewing running jobs, stopping jobs, etc.


The system mentioned above is a data-processing platform we built ourselves; real-time computation is one part of it, so we would like to integrate the real-time task module into it as well.


Is this feasible?

Any pointers would be much appreciated! Thanks.



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/