Hi Yang,

The image I am using is the 1.10.1 image provided on Docker Hub. So the options seem to be:
1. Wait for the 1.11 release
2. Build a Flink 1.10.1 image myself with a lower JDK version?
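For option 2, a minimal sketch of such a custom image might look like the following (the base image tag, paths, and layout are assumptions; the official flink:1.10.1 image also installs an entrypoint script that is omitted here):

```dockerfile
# Hypothetical sketch: run the stock Flink 1.10.1 distribution on JDK 11
# instead of the 8u252 that currently ships in the Docker Hub image.
FROM openjdk:11-jre
ENV FLINK_HOME=/opt/flink
# Assumes the Flink 1.10.1 binary release has been downloaded and unpacked
# next to this Dockerfile.
COPY flink-1.10.1 /opt/flink
WORKDIR /opt/flink
```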

Looking forward to your reply and help.

Best





a511955993
Email: [email protected]

On 2020-05-27 13:25, Yang Wang wrote:
"Broken pipe" 这个是fabric8的kubernetes-client的一个bug
你镜像的jdk版本是java 8u252吧,目前Flink on K8s不能和java 8u252一起工作,
解法是使用8u252以下的jdk版本或者升级到jdk11

在Flink 1.11里面会升级fabric8的kubernetes client依赖到最新版本来解决
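Based on the advice above, a small hedged shell helper to classify a JDK version string (as printed by `java -version` inside the image; the exact output format and the 8u252 cutoff are taken from this thread, not verified against the fabric8 issue tracker):

```shell
# Classify a JDK version string: Java 8 builds at update 252 or later are
# reported in this thread as broken with Flink 1.10 on K8s; older 8u builds
# and JDK 11+ are reported as working.
is_affected() {
  ver="$1"
  case "$ver" in
    1.8.0_*)
      update="${ver#1.8.0_}"          # e.g. "1.8.0_252" -> "252"
      if [ "$update" -ge 252 ]; then
        echo "affected"
      else
        echo "ok"
      fi
      ;;
    *)
      # JDK 9+ version strings look different (e.g. "11.0.7")
      echo "ok"
      ;;
  esac
}

is_affected "1.8.0_252"   # affected
is_affected "1.8.0_242"   # ok
is_affected "11.0.7"      # ok
```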


Best,
Yang

<[email protected]> 于2020年5月27日周三 下午12:52写道:

>
> After configuring logging per the documentation [1], the detailed error is now visible. The launch command was:
>
> /usr/local/flink/flink-1.10.1/bin/kubernetes-session.sh \
>          -Dkubernetes.cluster-id=ipcode \
>          -Dkubernetes.jobmanager.service-account=flink \
>          -Dtaskmanager.memory.process.size=4096m \
>          -Dkubernetes.taskmanager.cpu=2 \
>          -Dtaskmanager.numberOfTaskSlots=4 \
>          -Dkubernetes.namespace=flink-ipcode \
>          -Dkubernetes.rest-service.exposed.type=NodePort \
>          -Dkubernetes.container-start-command-template="%java% %classpath%
> %jvmmem% %jvmopts% %logging% %class% %args%" \
>          -Dakka.framesize=104857600b \
>          -Dkubernetes.container.image=flink:1.10.1
>
>
> The corresponding Service, Deployment, and ConfigMap were all created:
>
>
> kubectl get svc -n flink-ipcode
> NAME          TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                      AGE
> ipcode        ClusterIP   xxxxxxxxxxxxx   <none>        8081/TCP,6123/TCP,6124/TCP   21s
> ipcode-rest   NodePort    xxxxxxxxxxxx    <none>        8081:30803/TCP               21s
>
>
> Kubernetes v1.17.4 fails, while v1.15.1 succeeds.
>
> The JobManager error log on 1.17.4 is as follows:
>
>
> 2020-05-27 04:37:44,225 ERROR
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Fatal error
> occurred in the cluster entrypoint.
> org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException:
> Could not start the ResourceManager akka.tcp://[email protected]
> :6123/user/resourcemanager
>  at
> org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:193)
>  at
> org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:185)
>  at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:549)
>  at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:170)
>  at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>  at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>  at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>  at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>  at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>  at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>  at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>  at akka.actor.Actor.aroundReceive(Actor.scala:517)
>  at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>  at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>  at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>  at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>  at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>  at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>  at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>  at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>  at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: io.fabric8.kubernetes.client.KubernetesClientException:
> Operation: [list]  for kind: [Pod]  with name: [null]  in namespace:
> [flink-ipcode]  failed.
>  at
> io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:64)
>  at
> io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:72)
>  at
> io.fabric8.kubernetes.client.dsl.base.BaseOperation.listRequestHelper(BaseOperation.java:151)
>  at
> io.fabric8.kubernetes.client.dsl.base.BaseOperation.list(BaseOperation.java:614)
>  at
> io.fabric8.kubernetes.client.dsl.base.BaseOperation.list(BaseOperation.java:63)
>  at
> org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.getPodsWithLabels(Fabric8FlinkKubeClient.java:211)
>  at
> org.apache.flink.kubernetes.KubernetesResourceManager.recoverWorkerNodesFromPreviousAttempts(KubernetesResourceManager.java:233)
>  at
> org.apache.flink.kubernetes.KubernetesResourceManager.initialize(KubernetesResourceManager.java:145)
>  at
> org.apache.flink.runtime.resourcemanager.ResourceManager.startResourceManagerServices(ResourceManager.java:203)
>  at
> org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:191)
>  ... 22 more
> Caused by: java.net.SocketException: Broken pipe (Write failed)
>  at java.net.SocketOutputStream.socketWrite0(Native Method)
>  at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
>  at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
>  at sun.security.ssl.OutputRecord.writeBuffer(OutputRecord.java:431)
>  at sun.security.ssl.OutputRecord.write(OutputRecord.java:417)
>  at
> sun.security.ssl.SSLSocketImpl.writeRecordInternal(SSLSocketImpl.java:894)
>  at sun.security.ssl.SSLSocketImpl.writeRecord(SSLSocketImpl.java:865)
>  at sun.security.ssl.AppOutputStream.write(AppOutputStream.java:123)
>  at org.apache.flink.kubernetes.shadded.okio.Okio$1.write(Okio.java:79)
>  at
> org.apache.flink.kubernetes.shadded.okio.AsyncTimeout$1.write(AsyncTimeout.java:180)
>  at
> org.apache.flink.kubernetes.shadded.okio.RealBufferedSink.flush(RealBufferedSink.java:224)
>  at
> org.apache.flink.kubernetes.shadded.okhttp3.internal.http2.Http2Writer.settings(Http2Writer.java:203)
>  at
> org.apache.flink.kubernetes.shadded.okhttp3.internal.http2.Http2Connection.start(Http2Connection.java:515)
>  at
> org.apache.flink.kubernetes.shadded.okhttp3.internal.http2.Http2Connection.start(Http2Connection.java:505)
>  at
> org.apache.flink.kubernetes.shadded.okhttp3.internal.connection.RealConnection.startHttp2(RealConnection.java:298)
>  at
> org.apache.flink.kubernetes.shadded.okhttp3.internal.connection.RealConnection.establishProtocol(RealConnection.java:287)
>  at
> org.apache.flink.kubernetes.shadded.okhttp3.internal.connection.RealConnection.connect(RealConnection.java:168)
>  at
> org.apache.flink.kubernetes.shadded.okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:257)
>  at
> org.apache.flink.kubernetes.shadded.okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:135)
>  at
> org.apache.flink.kubernetes.shadded.okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:114)
>  at
> org.apache.flink.kubernetes.shadded.okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)
>  at
> org.apache.flink.kubernetes.shadded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
>  at
> org.apache.flink.kubernetes.shadded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
>  at
> org.apache.flink.kubernetes.shadded.okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
>  at
> org.apache.flink.kubernetes.shadded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
>  at
> org.apache.flink.kubernetes.shadded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
>  at
> org.apache.flink.kubernetes.shadded.okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
>  at
> org.apache.flink.kubernetes.shadded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
>  at
> org.apache.flink.kubernetes.shadded.okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:126)
>  at
> org.apache.flink.kubernetes.shadded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
>  at
> org.apache.flink.kubernetes.shadded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
>  at
> io.fabric8.kubernetes.client.utils.BackwardsCompatibilityInterceptor.intercept(BackwardsCompatibilityInterceptor.java:119)
>  at
> org.apache.flink.kubernetes.shadded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
>  at
> org.apache.flink.kubernetes.shadded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
>  at
> io.fabric8.kubernetes.client.utils.ImpersonatorInterceptor.intercept(ImpersonatorInterceptor.java:68)
>  at
> org.apache.flink.kubernetes.shadded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
>  at
> org.apache.flink.kubernetes.shadded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
>  at
> io.fabric8.kubernetes.client.utils.HttpClientUtils.lambda$createHttpClient$3(HttpClientUtils.java:110)
>  at
> org.apache.flink.kubernetes.shadded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
>  at
> org.apache.flink.kubernetes.shadded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
>  at
> org.apache.flink.kubernetes.shadded.okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:254)
>  at
> org.apache.flink.kubernetes.shadded.okhttp3.RealCall.execute(RealCall.java:92)
>  at
> io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:411)
>  at
> io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:372)
>  at
> io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:354)
>  at
> io.fabric8.kubernetes.client.dsl.base.BaseOperation.listRequestHelper(BaseOperation.java:147)
>  ... 29 more
>
>
> [1].
>
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#log-files
>
> Looking forward to your reply and help.
>
> Best
>
>
>
> On 2020-05-27 11:47, Yang Wang wrote:
> Hi,
>
> Glad to see you trying native K8s. By default the Pod logs are not written
> to the console, which is why kubectl logs shows nothing. You can follow this
> documentation [1] to configure logging and then see the actual error.
>
> Of course, first confirm that the Service, Deployment, and ConfigMap were
> created successfully; if that fails, the Flink client should print an error.
>
> [1].
>
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#log-files
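> As a sketch, the console output that documentation [1] enables amounts to
> adding a ConsoleAppender to conf/log4j.properties (Flink 1.10 ships a
> log4j 1.x config; the appender name and pattern below are illustrative):
>
> ```properties
> log4j.rootLogger=INFO, file, console
> log4j.appender.console=org.apache.log4j.ConsoleAppender
> log4j.appender.console.layout=org.apache.log4j.PatternLayout
> log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
> ```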
>
>
> Best,
> Yang
>
>
> <[email protected]> 于2020年5月27日周三 上午11:21写道:
>
> >
> >
> > Running the same steps on two Kubernetes versions gives the following results:
> >
> > v1.17.4  fails
> > v1.15.1  succeeds
> >
> >
> > The steps are as follows:
> >
> >
> > Create the RBAC resources
> >
> > rbac.yaml
> >
> > apiVersion: v1
> > kind: ServiceAccount
> > metadata:
> >  name: flink
> >  namespace: flink
> > ---
> >
> > apiVersion: rbac.authorization.k8s.io/v1
> > kind: ClusterRoleBinding
> > metadata:
> >  name: flink-role-binding
> > roleRef:
> >  apiGroup: rbac.authorization.k8s.io
> >  kind: ClusterRole
> >  name: edit
> > subjects:
> > - kind: ServiceAccount
> >  name: flink
> >  namespace: flink
> >
> >
> >
> > Then run the following command:
> >
> > /usr/local/flink/flink-1.10.1/bin/kubernetes-session.sh \
> >          -Dkubernetes.cluster-id=flink \
> >          -Dkubernetes.jobmanager.service-account=flink \
> >          -Dtaskmanager.memory.process.size=4096m \
> >          -Dkubernetes.taskmanager.cpu=2 \
> >          -Dtaskmanager.numberOfTaskSlots=4 \
> >          -Dkubernetes.namespace=flink \
> >          -Dkubernetes.rest-service.exposed.type=NodePort \
> >          -Dakka.framesize=104857600b \
> >          -Dkubernetes.container.image=flink:1.10.1
> >
> >
> > On 1.15 the cluster comes up normally. On 1.17 the pod goes into "Back-off
> > restarting failed container", and the logs contain nothing beyond the following:
> >
> > Start command : /bin/bash -c $JAVA_HOME/bin/java -classpath
> > $FLINK_CLASSPATH -Xms424m -Xmx424m
> -Dlog.file=/opt/flink/log/jobmanager.log
> > -Dlogback.configurationFile=file:/opt/flink/conf/logback.xml
> > -Dlog4j.configuration=file:/opt/flink/conf/log4j.properties
> > org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint
> > 1> /opt/flink/log/jobmanager.out 2> /opt/flink/log/jobmanager.err
> >
> >
> > I also tried submitting it as a Kubernetes Job; the YAML is:
> >
> >
> > apiVersion: batch/v1
> > kind: Job
> > metadata:
> >  name: boot-flink
> >  namespace: flink
> > spec:
> >  template:
> >    spec:
> >      serviceAccount: flink
> >      restartPolicy: OnFailure
> >      containers:
> >      - name: start
> >        image: flink:1.10.1
> >        workingDir: /opt/flink
> >        command: ["bash", "-c", "$FLINK_HOME/bin/kubernetes-session.sh \
> >          -Dkubernetes.cluster-id=roc \
> >          -Dkubernetes.jobmanager.service-account=flink \
> >          -Dtaskmanager.memory.process.size=1024m \
> >          -Dkubernetes.taskmanager.cpu=1 \
> >          -Dtaskmanager.numberOfTaskSlots=1 \
> >          -Dkubernetes.container.image=flink:1.10 \
> >          -Dkubernetes.namespace=flink"]
> >
> > This works on 1.15; on 1.17 the corresponding Service is never created, so the failure mode differs.
> >
> >
> >
> > Is there anything I have overlooked in these steps? My usage scenario is described above; any replies and explanations would be greatly appreciated.
> >
> > Looking forward to your reply and help.
> >
> > Best
> >
> >
>
