Re: Flink Kubernetes Operator podTemplate and 'app' pod label bug?

2023-01-23 Thread Andrew Otto
Thanks all, I'm using other labels instead.  Specifically, I'm using the
component label to select the pods I need for my NetworkPolicies.
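
For example, a NetworkPolicy selecting Flink pods by the component label might
look like this (a sketch with hypothetical names and rules, not my exact policy):

```yaml
# Sketch only: selects Flink jobmanager pods via the 'component' label that
# the operator/native integration applies; names and namespace are hypothetical.
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: allow-taskmanager-to-jobmanager
  namespace: flink-app0
spec:
  podSelector:
    matchLabels:
      component: jobmanager
  ingress:
    - from:
        - podSelector:
            matchLabels:
              component: taskmanager
```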

- I agree that it would probably be best if flink k8s native did not use
this label.

- It would be nice if there was a common label applied to all pods created
by Flink and the Flink Kubernetes operator.  I tried to bikeshed one but didn't
come up with anything great.  The app label as-is doesn't work because it
appends the Helm release name.  Something like 'engine: flink'?  Not sure.
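
For reference, my rough understanding of why the release name shows up (a
sketch of the behavior, not the actual Flink source): the 'app' label is simply
set to the cluster id, which for operator deployments is the FlinkDeployment
name, so anything in the podTemplate gets overridden:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: mimics how Flink's native/standalone Kubernetes integration
// builds common pod labels. The 'app' label is set to the cluster id (the
// FlinkDeployment name), overriding any 'app' value from the podTemplate.
public class LabelSketch {
    public static Map<String, String> commonLabels(String clusterId) {
        Map<String, String> labels = new HashMap<>();
        labels.put("app", clusterId);                  // overrides podTemplate 'app'
        labels.put("type", "flink-native-kubernetes"); // assumption: constant label in native mode
        return labels;
    }

    public static void main(String[] args) {
        // The FlinkDeployment name already embeds the Helm release name,
        // which is why pods end up with app=flink-app-flink-example.
        System.out.println(commonLabels("flink-app-flink-example").get("app"));
    }
}
```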

Anyway, thank you!


On Fri, Jan 20, 2023 at 2:46 AM Gyula Fóra  wrote:

> To clarify this logic is inherited from the Flink Native Kubernetes
> integration itself. The operator specific labels we use are already fully
> qualified.
> I agree that this could be improved in Flink by a better label.
>
> Cheers,
> Gyula
>
> On Thu, Jan 19, 2023 at 11:00 PM Mason Chen 
> wrote:
>
>> @Andrew I was also confused by this earlier.  FYI, here is the line where it
>> is referenced:
>> https://github.com/apache/flink-kubernetes-operator/blame/7d5bf9536bdfbf86de5803766b28e503cd32ee04/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/utils/StandaloneKubernetesUtils.java#L43
>>
>> On Thu, Jan 19, 2023 at 1:59 PM Őrhidi Mátyás 
>> wrote:
>>
>>> On a side note, we should probably use a qualified label name instead of
>>> the pretty common 'app' here. WDYT Gyula?
>>>
>>> On Thu, Jan 19, 2023 at 1:48 PM Gyula Fóra  wrote:
>>>
 Hi!

 The app label itself is used by Flink internally for a different
 purpose, so it’s overridden. This is completely expected.

 I think it would be better to use some other label :)

 Cheers,
 Gyula

 On Thu, 19 Jan 2023 at 19:02, Andrew Otto  wrote:

> Hello!
>
> I'm seeing an unexpected label value assignment happening, and I'm not
> sure how it's happening.  It is possible it is in my own helm charts and
> templates somewhere, but I'm not seeing it, so I'm beginning to think this
> is happening in the FlinkDeployment CRD in the operator code somewhere.
>
> I'm using the FlinkDeployment podTemplate to add an 'app' label:
>
> podTemplate:
>   apiVersion: v1
>   kind: Pod
>   metadata:
>     labels:
>       app: flink-app
>       release: flink-example
> ...
>
> I also have this app label set in the FlinkDeployment labels:
>
> kind: FlinkDeployment
> metadata:
>   name: flink-app-flink-example
>   labels:
>     app: flink-app
>     chart: flink-app-0.1.1
>     release: flink-example
>
> Since I've set app: flink-app in the podTemplate, I would expect all
> pods to get this label.  The FlinkDeployment resource has this label
> value as expected.  However, in the pods, as well as in the Deployment
> created by the FlinkDeployment, I see:
>
> $ kubectl -n flink-app0 describe deployments flink-app-flink-example
> ...
> Name:   flink-app-flink-example
> Namespace:  flink-app0
> CreationTimestamp:  Thu, 19 Jan 2023 12:42:05 -0500
> Labels: app=flink-app-flink-example
> component=jobmanager
> ...
>
> Pod Template:
>   Labels:   app=flink-app-flink-example
> component=jobmanager
> release=flink-example
> ...
>
>
> $ kubectl -n flink-app0 describe pod
> flink-app-flink-example-d974cb595-788ch
> ...
> Labels:   app=flink-app-flink-example
>   component=jobmanager
>   pod-template-hash=d974cb595
>   release=flink-example
> ...
>
>
> I'd expect the app label to be 'flink-app' for at least the Deployment
> PodTemplate and the Pod, if not the Deployment itself too.
>
> Something is overriding the app label in the podTemplate, and I don't
> think it's my chart or installation.  I looked in the
> flink-kubernetes-operator code and didn't find where this was happening
> either.  I am not setting e.g. kubernetes.jobmanager.labels.
>
> Is this expected?
>
> Thank you!
>
> -Andrew Otto
>  Wikimedia Foundation
>


Re: Job gets stuck when using kafka transactions and eventually crashes

2023-01-23 Thread Vishal Surana
Could it be that Flink is unable to commit offsets to Kafka? I know that Flink's 
checkpoint mechanism isn't tied to its ability to commit offsets, but at the same 
time, we've seen that the job can take hours to commit offsets while 
checkpoints go through successfully during that period. But with Kafka 
transactions enabled, the offset commit is now required to happen.

Thanks,
Vishal
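
A possibly related config sketch, assuming the job uses Flink's exactly-once
Kafka sink: each transaction spans a checkpoint, so the producer's
transaction.timeout.ms must comfortably exceed the checkpoint interval, or the
broker can abort transactions while the job appears stuck. Values below are
illustrative assumptions, not a verified fix for this job:

```yaml
# Sketch only. Checkpoint interval (flink-conf.yaml); every in-flight
# transaction stays open until the next checkpoint completes:
execution.checkpointing.interval: 1 min
execution.checkpointing.unaligned: true

# Producer property (set on the sink itself, not in flink-conf.yaml);
# must exceed the checkpoint interval and be <= the broker's
# transaction.max.timeout.ms (broker default is 15 min):
# transaction.timeout.ms: 900000
```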
On 23 Jan 2023 at 12:18 PM +0530, Vishal Surana , wrote:
> My job runs fine when running without kafka transactions. The source and sink 
> are kafka in my job with a couple of RocksDB based stateful operators taking 
> 100GB each.
>
> When I enable kafka transactions, things go well initially and we can see 
> high throughput as well. However, after a few hours, the job seems to get 
> stuck as it's unable to commit the transaction, due to which it's unable to 
> consume any more messages as we've enabled exactly once processing with 
> unaligned checkpoints. The number of hours it takes might vary but it always 
> happens and eventually the job crashes with this exception:
>
> ERROR org.apache.kafka.common.utils.KafkaThread : - Uncaught exception in 
> thread 'kafka-producer-network-thread | 
> producer-TRANSACTION_ID_PREFIX-1-17060':
> java.lang.OutOfMemoryError: Direct buffer memory
> at java.nio.Bits.reserveMemory(Bits.java:175)
> at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
> at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242)
> at sun.nio.ch.IOUtil.write(IOUtil.java:164)
> at sun.nio.ch.IOUtil.write(IOUtil.java:130)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:493)
> at java.nio.channels.SocketChannel.write(SocketChannel.java:507)
> at org.apache.kafka.common.network.PlaintextTransportLayer.write(PlaintextTransportLayer.java:152)
> at org.apache.kafka.common.network.ByteBufferSend.writeTo(ByteBufferSend.java:58)
> at org.apache.kafka.common.network.NetworkSend.writeTo(NetworkSend.java:41)
> at org.apache.kafka.common.network.KafkaChannel.write(KafkaChannel.java:430)
> at org.apache.kafka.common.network.Selector.write(Selector.java:644)
> at org.apache.kafka.common.network.Selector.attemptWrite(Selector.java:637)
> at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:593)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
> at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:327)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:242)
> at java.lang.Thread.run(Thread.java:829)
>
> What seems to be happening all of a sudden? Any suggestions on how to fix it?
>
> --
> Regards,
> Vishal