FW: Unable to achieve Flink kafka connector exactly once delivery semantics.

2023-10-27 Thread Gopal Chennupati (gchennup)
Hi,
Can someone please help me resolve the issue below when running a Flink job,
or point me to any doc/example that describes the exactly-once delivery
guarantee semantics?

Thanks,
Gopal.

From: Gopal Chennupati (gchennup) 
Date: Friday, 27 October 2023 at 11:00 AM
To: commun...@flink.apache.org , 
u...@flink.apache.org 
Subject: Unable to achieve Flink kafka connector exactly once delivery 
semantics.
Hi Team,


I am trying to configure my Kafka sink connector with the “exactly-once” delivery
guarantee; however, the Flink job fails when I run it with this configuration.
Here is the full exception stack trace from the job logs.


[Source: SG-SGT-TransformerJob -> Map -> Sink: Writer -> Sink: Committer (5/10)#12] WARN org.apache.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean

javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=producer-sgt-4-1
  at java.management/com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436)
  at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855)
  at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955)
  at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890)
  at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320)
  at java.management/com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
  at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
  at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:433)
  at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:289)
  at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:316)
  at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:301)
  at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:55)
  at org.apache.flink.connector.kafka.sink.KafkaWriter.getOrCreateTransactionalProducer(KafkaWriter.java:332)
  at org.apache.flink.connector.kafka.sink.TransactionAborter.abortTransactionOfSubtask(TransactionAborter.java:104)
  at org.apache.flink.connector.kafka.sink.TransactionAborter.abortTransactionsWithPrefix(TransactionAborter.java:82)
  at org.apache.flink.connector.kafka.sink.TransactionAborter.abortLingeringTransactions(TransactionAborter.java:66)
  at org.apache.flink.connector.kafka.sink.KafkaWriter.abortLingeringTransactions(KafkaWriter.java:295)
  at org.apache.flink.connector.kafka.sink.KafkaWriter.<init>(KafkaWriter.java:176)
  at org.apache.flink.connector.kafka.sink.KafkaSink.createWriter(KafkaSink.java:111)
  at org.apache.flink.connector.kafka.sink.KafkaSink.createWriter(KafkaSink.java:57)
  at org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.createWriter(StatefulSinkWriterStateHandler.java:117)
  at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:146)
  at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274)
  at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734)
  at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
  at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
  at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
  at java.base/java.lang.Thread.run(Thread.java:834)


And here is the producer configuration:

KafkaSink sink = KafkaSink
    .builder()
    .setBootstrapServers(producerConfig.getProperty("bootstrap.servers"))
    .setKafkaProducerConfig(producerConfig)
    .setRecordSerializer(new GenericMessageSerialization<>(generic_key.class,
        generic_value.class,
        producer
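
For reference, a minimal sketch of an exactly-once KafkaSink configuration (topic name, serializer, transactional-id prefix, and timeout value are illustrative, not the poster's actual code). The WARN above is logged by the Kafka client when an MBean name is reused; the settings that typically matter for exactly-once are the delivery guarantee, a unique transactional-id prefix, a transaction timeout within the broker's limit, and checkpointing being enabled on the job.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class ExactlyOnceKafkaSinkSketch {

    public static KafkaSink<String> build(Properties producerConfig) {
        // Must not exceed the broker's transaction.max.timeout.ms (15 min here, illustrative).
        producerConfig.setProperty("transaction.timeout.ms", "900000");

        return KafkaSink.<String>builder()
                .setBootstrapServers(producerConfig.getProperty("bootstrap.servers"))
                .setKafkaProducerConfig(producerConfig)
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("output-topic") // illustrative topic name
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // Unique per application, so restarted or parallel jobs do not
                // abort each other's lingering transactions.
                .setTransactionalIdPrefix("my-job-sink")
                .build();
    }
}

Checkpointing must be enabled on the StreamExecutionEnvironment for the transactions to be committed, and downstream consumers should read with isolation.level=read_committed to see only committed records.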

flink-kafka-connector cannot find the topic-partition when consuming

2022-11-25 Thread 朱文忠
I enabled the option 'properties.allow.auto.create.topics' = 'true' on the kafka connector,
which is also mentioned in the documentation.
But when the FlinkKafkaConsumer starts consuming a new topic, it still fails with a topic-not-found error. Could someone help explain this?
The error is as follows:
This is my configuration:
The kafka broker also has automatic topic creation enabled.


Re: Utilizing Kafka headers in Flink Kafka connector

2022-10-13 Thread Shengkai Fang
Hi,

You can use the SQL API to read or write the headers of the Kafka record [1] if
you are using Flink SQL.

Best,
Shengkai

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/#available-metadata
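
As a rough sketch of what [1] describes (table name, topic, and field names are illustrative; header values arrive as raw bytes), the record headers can be declared as a metadata column and addressed in queries:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KafkaHeadersInSqlSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // `headers` is a connector-provided METADATA column: a map from header key to raw bytes.
        tEnv.executeSql(
                "CREATE TABLE kafka_in (" +
                "  payload STRING," +
                "  `headers` MAP<STRING, BYTES> METADATA" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'input-topic'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'properties.group.id' = 'headers-demo'," +
                "  'scan.startup.mode' = 'latest-offset'," +
                "  'format' = 'raw'" +
                ")");

        // Individual headers are then addressable as `headers`['some-key'] in queries.
        // (Streaming query, shown for illustration only.)
        tEnv.executeSql("SELECT payload, `headers`['trace-id'] AS trace_id FROM kafka_in").print();
    }
}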

Yaroslav Tkachenko wrote on Thu, Oct 13, 2022 at 02:21:

> Hi,
>
> You can implement a custom KafkaRecordDeserializationSchema (example
> https://docs.immerok.cloud/docs/cookbook/reading-apache-kafka-headers-with-apache-flink/#the-custom-deserializer)
> and just avoid emitting the record if the header value matches what you
> need.
>
> On Wed, Oct 12, 2022 at 11:04 AM Great Info  wrote:
>
>> I have some flink applications that read streams from Kafka. Recently,
>> the producer-side code has introduced some additional information in Kafka
>> headers while producing records.
>> Now I need to change my consumer-side logic to process a record only if the
>> header contains a specific value; if the header value is different from the
>> one I am looking for, I just need to move on to the next record in the stream.
>>
>> I got some sample reference code
>> but this logic needs
>> to deserialize and verify the header. Is there any simple way to ignore the
>> record before deserializing?
>>
>


Re: Utilizing Kafka headers in Flink Kafka connector

2022-10-12 Thread Yaroslav Tkachenko
Hi,

You can implement a custom KafkaRecordDeserializationSchema (example
https://docs.immerok.cloud/docs/cookbook/reading-apache-kafka-headers-with-apache-flink/#the-custom-deserializer)
and just avoid emitting the record if the header value matches what you
need.
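
For reference, a minimal sketch of such a filtering deserializer for the KafkaSource API (the header name, expected value, and String payload are illustrative assumptions):

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

/** Emits the record value as a String only when a header matches; otherwise the record is skipped. */
public class HeaderFilteringDeserializer implements KafkaRecordDeserializationSchema<String> {

    private final String headerKey;     // e.g. "record-type" (illustrative name)
    private final String expectedValue; // e.g. "v2" (illustrative value)

    public HeaderFilteringDeserializer(String headerKey, String expectedValue) {
        this.headerKey = headerKey;
        this.expectedValue = expectedValue;
    }

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> out) throws IOException {
        Header header = record.headers().lastHeader(headerKey);
        // Not collecting anything effectively drops the record.
        if (header == null
                || !expectedValue.equals(new String(header.value(), StandardCharsets.UTF_8))) {
            return;
        }
        if (record.value() == null) { // tombstone
            return;
        }
        out.collect(new String(record.value(), StandardCharsets.UTF_8));
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}

It can be wired in with KafkaSource.<String>builder()...setDeserializer(new HeaderFilteringDeserializer("record-type", "v2")). The header is inspected before the value is deserialized, so value decoding is skipped for non-matching records, although the record itself is still fetched from Kafka.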

On Wed, Oct 12, 2022 at 11:04 AM Great Info  wrote:

> I have some flink applications that read streams from Kafka. Recently,
> the producer-side code has introduced some additional information in Kafka
> headers while producing records.
> Now I need to change my consumer-side logic to process a record only if the
> header contains a specific value; if the header value is different from the
> one I am looking for, I just need to move on to the next record in the stream.
>
> I got some sample reference code
> but this logic needs to
> deserialize and verify the header. Is there any simple way to ignore the
> record before deserializing?
>


Utilizing Kafka headers in Flink Kafka connector

2022-10-12 Thread Great Info
I have some flink applications that read streams from Kafka. Recently,
the producer-side code has introduced some additional information in Kafka
headers while producing records.
Now I need to change my consumer-side logic to process a record only if the
header contains a specific value; if the header value is different from the
one I am looking for, I just need to move on to the next record in the stream.

I got some sample reference code
but this logic needs to
deserialize and verify the header. Is there any simple way to ignore the
record before deserializing?


Re: (Possible) bug in flink-kafka-connector (metrics rewriting)

2022-08-02 Thread zhanghao.chen
Hi, I suggest creating a ticket for it on 
https://issues.apache.org/jira/projects/FLINK/summary.


Best,
Zhanghao Chen

From: Valentina Predtechenskaya 
Sent: Wednesday, August 3, 2022 1:32
To: user@flink.apache.org 
Subject: (Possible) bug in flink-kafka-connector (metrics rewriting)



Hello !


I would like to report a bug with metrics registration on KafkaProducer 
initialization.

We first noticed the problem on our Flink cluster: the metric 
KafkaProducer.outgoing-byte-rate was periodically missing (equal to zero or 
near zero) on several subtasks, while at the same time other subtasks reported it 
correctly. The actual outgoing rate was the same on all subtasks, which was clear 
from, for example, the KafkaProducer.records-send-rate metric, which was fine 
on every subtask; the problem was definitely with the metric itself.


After a long investigation we found the root cause of this behavior:

  *   KafkaWriter creates an instance of FlinkKafkaInternalProducer and then 
initializes metric wrappers (Flink gauges) over the existing KafkaProducer metrics - 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L327-L330
  *   The KafkaProducer constructor itself creates a Sender to access the brokers, 
starts a thread (kafka-producer-network-thread) and runs the Sender in that separate 
thread - 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L458-L460
  *   After the Sender starts, the metrics connected with topics and brokers 
take some time to register. If they register quickly, KafkaWriter sees them 
before the end of its initialization and these metrics get wrapped as Flink 
gauges; otherwise they do not.
  *   Some KafkaProducer metrics from the producer and from the broker have the 
same name - for example, outgoing-byte-rate - 
https://docs.confluent.io/platform/current/kafka/monitoring.html#producer-metrics
  *   When two metrics have the same name, the Flink KafkaWriter overwrites the 
metric in the wrapper - 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L359-L360

I have debugged these libraries a lot and I am sure about this behavior. If, for 
example, flink-kafka-connector is patched with a condition not to initialize a metric 
when "producer-node-metrics".equals(metric.metricName().group()), all of our metrics 
are fine (outgoing-byte-rate is no longer 0).
Also, the bug does not reproduce if the cluster is not very fast (for example, on a 
local machine) and the data from the brokers only arrives after all metrics have been 
initialized in KafkaWriter.
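
For illustration only (this is not the actual KafkaWriter code), a minimal sketch of the guard described above: it skips the per-node metric group so a broker-level metric cannot overwrite the producer-level gauge with the same short name. Here kafkaMetrics would be the map returned by producer.metrics().

import java.util.Map;

import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public final class ProducerMetricWrapping {

    /** Wraps Kafka producer metrics as Flink gauges, skipping the per-node metric group. */
    public static void wrapProducerMetrics(
            Map<MetricName, ? extends Metric> kafkaMetrics, MetricGroup flinkGroup) {
        for (Map.Entry<MetricName, ? extends Metric> entry : kafkaMetrics.entrySet()) {
            MetricName name = entry.getKey();
            // Per-broker metrics share short names (e.g. outgoing-byte-rate) with the
            // producer-level metrics; skipping them avoids the overwrite described above.
            if ("producer-node-metrics".equals(name.group())) {
                continue;
            }
            Metric metric = entry.getValue();
            flinkGroup.gauge(name.name(), (Gauge<Object>) metric::metricValue);
        }
    }
}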

I suppose this is not the expected behavior, but even in the latest version of 
flink-kafka-connector the code is the same. Is there any fix, or maybe some 
workaround? To be honest, I don't really want to run my own patched version 
of the connector.

Thank you !





(Possible) bug in flink-kafka-connector (metrics rewriting)

2022-08-02 Thread Valentina Predtechenskaya

Hello !


I would like to report a bug with metrics registration on KafkaProducer 
initialization.

We first noticed the problem on our Flink cluster: the metric 
KafkaProducer.outgoing-byte-rate was periodically missing (equal to zero or 
near zero) on several subtasks, while at the same time other subtasks reported it 
correctly. The actual outgoing rate was the same on all subtasks, which was clear 
from, for example, the KafkaProducer.records-send-rate metric, which was fine 
on every subtask; the problem was definitely with the metric itself.


After a long investigation we found the root cause of this behavior:

  *   KafkaWriter creates an instance of FlinkKafkaInternalProducer and then 
initializes metric wrappers (Flink gauges) over the existing KafkaProducer metrics - 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L327-L330
  *   The KafkaProducer constructor itself creates a Sender to access the brokers, 
starts a thread (kafka-producer-network-thread) and runs the Sender in that separate 
thread - 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L458-L460
  *   After the Sender starts, the metrics connected with topics and brokers 
take some time to register. If they register quickly, KafkaWriter sees them 
before the end of its initialization and these metrics get wrapped as Flink 
gauges; otherwise they do not.
  *   Some KafkaProducer metrics from the producer and from the broker have the 
same name - for example, outgoing-byte-rate - 
https://docs.confluent.io/platform/current/kafka/monitoring.html#producer-metrics
  *   When two metrics have the same name, the Flink KafkaWriter overwrites the 
metric in the wrapper - 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L359-L360

I have debugged these libraries a lot and I am sure about this behavior. If, for 
example, flink-kafka-connector is patched with a condition not to initialize a metric 
when "producer-node-metrics".equals(metric.metricName().group()), all of our metrics 
are fine (outgoing-byte-rate is no longer 0).
Also, the bug does not reproduce if the cluster is not very fast (for example, on a 
local machine) and the data from the brokers only arrives after all metrics have been 
initialized in KafkaWriter.

I suppose this is not the expected behavior, but even in the latest version of 
flink-kafka-connector the code is the same. Is there any fix, or maybe some 
workaround? To be honest, I don't really want to run my own patched version 
of the connector.

Thank you !



"This message contains confidential information/commercial secret. If you are 
not the intended addressee of this message you may not copy, save, print or 
forward it to any third party and you are kindly requested to destroy this 
message and notify the sender thereof by email.
Данное сообщение содержит конфиденциальную информацию/информацию, являющуюся 
коммерческой тайной. Если Вы не являетесь надлежащим адресатом данного 
сообщения, Вы не вправе копировать, сохранять, печатать или пересылать его 
каким либо иным лицам. Просьба уничтожить данное сообщение и уведомить об этом 
отправителя электронным письмом."


Re: Apache Flink Kafka Connector not found Error

2021-07-19 Thread Taimoor Bhatti
THANKS!!...

This was it!! I did:-

CTRL+SHIFT+A and typed "Reload All Maven Projects"

Building the project didn't result in errors. I don't think I could've resolved 
this...

Thanks again Yun!!!

From: Yun Gao 
Sent: Monday, July 19, 2021 5:23 PM
To: Taimoor Bhatti ; user@flink.apache.org 

Subject: Re: Apache Flink Kafka Connector not found Error

Hi Taimoor,

It seems that sometimes IntelliJ does not index the project correctly; perhaps
you could choose mvn -> reimport project from the context menu.
If that still does not work, you might try removing the .idea and .iml
files and re-opening the project.

Best,
Yun

--
From:Taimoor Bhatti 
Send Time:2021 Jul. 19 (Mon.) 23:03
To:user@flink.apache.org ; Yun Gao 
Subject:Re: Apache Flink Kafka Connector not found Error

Hello Yun,
Many thanks for the reply...

For some reason I'm not able to import org.apache.flink.streaming.connectors 
within the IDE...

I get the following errors:

object connectors is not a member of package org.apache.flink.streaming
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer


"mvn clean compile" works however... (Thanks...)

Do you know why IntelliJ doesn't see the import??

Best,
Taimoor


From: Yun Gao 
Sent: Monday, July 19, 2021 3:25 PM
To: Taimoor Bhatti ; user@flink.apache.org 

Subject: Re: Apache Flink Kafka Connector not found Error

Hi Taimoor,

I think that is right regarding the provided dependency; we need to
manually include it in the classpath via the IDEA options.

And regarding the FlinkKafkaConsumer issue, I tried locally and it seems
to work after adding the import, namely:
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

Best,
Yun


--
From:Taimoor Bhatti 
Send Time:2021 Jul. 19 (Mon.) 18:43
To:user@flink.apache.org 
Subject:Apache Flink Kafka Connector not found Error

I'm having some trouble with using the Flink DataStream API with the Kafka 
Connector. There don't seem to be great resources on the internet which can 
explain the issue I'm having.


My project is here: 
https://github.com/sysarcher/flink-scala-tests

I'm unable to use FlinkKafkaConsumer 
(link: https://github.com/sysarcher/flink-scala-tests/blob/main/src/main/scala/spendreport/FraudDetectionJob.scala#L42),
which I want to try out.

I'm using IntelliJ Idea. The project was generated from the tutorial on Flink's 
website.

  *
The first problem seemed to be the provided scope, as suggested here: 
https://stackoverflow.com/a/63667067/3760442 ... Now the DataStream API (and the 
example) seem to work.
  *
The current problem is that I'm not able to use the Kafka connector which I'm 
looking to try out.

The following link was used to generate the project: 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/try-flink/datastream/#how-to-follow-along

Re: Apache Flink Kafka Connector not found Error

2021-07-19 Thread Yun Gao
Hi Taimoor,

It seems that sometimes IntelliJ does not index the project correctly; perhaps
you could choose mvn -> reimport project from the context menu.
If that still does not work, you might try removing the .idea and .iml
files and re-opening the project.

Best,
Yun


--
From:Taimoor Bhatti 
Send Time:2021 Jul. 19 (Mon.) 23:03
To:user@flink.apache.org ; Yun Gao 
Subject:Re: Apache Flink Kafka Connector not found Error

 Hello Yun,
 Many thanks for the reply...

 For some reason I'm not able to import org.apache.flink.streaming.connectors 
within the IDE...

 I get the following errors:

 object connectors is not a member of package org.apache.flink.streaming
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer


 "mvn clean compile" works however... (Thanks...)

 Do you know why IntelliJ doesn't see the import??

 Best, 
Taimoor

From: Yun Gao 
Sent: Monday, July 19, 2021 3:25 PM
To: Taimoor Bhatti ; user@flink.apache.org 

Subject: Re: Apache Flink Kafka Connector not found Error
Hi Taimoor,

I think that is right regarding the provided dependency; we need to
manually include it in the classpath via the IDEA options.

And regarding the FlinkKafkaConsumer issue, I tried locally and it seems
to work after adding the import, namely:
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

Best,
Yun


--
From:Taimoor Bhatti 
Send Time:2021 Jul. 19 (Mon.) 18:43
To:user@flink.apache.org 
Subject:Apache Flink Kafka Connector not found Error

I'm having some trouble with using the Flink DataStream API with the Kafka 
Connector. There don't seem to be great resources on the internet which can 
explain the issue I'm having. 

My project is here:  https://github.com/sysarcher/flink-scala-tests
I'm unable to use FlinkKafkaConsumer (link), which I want to try out.
I'm using IntelliJ Idea. The project was generated from the  tutorial on 
Flink's website
The First problem seemed to be the provided scope as suggested here:  
https://stackoverflow.com/a/63667067/3760442 ... Now, DataStream API (and the 
example) seem to work.
The current problem is that I'm not able to use the Kafka connector which I'm 
looking to try out.
The following link was used to generate the project:  
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/try-flink/datastream/#how-to-follow-along


 This same question was originally posted here:  
https://stackoverflow.com/q/68437215/3760442

Thanks in advance 



Re: Apache Flink Kafka Connector not found Error

2021-07-19 Thread Taimoor Bhatti
Hello Yun,
Many thanks for the reply...

For some reason I'm not able to import org.apache.flink.streaming.connectors 
within the IDE...

I get the following errors:

object connectors is not a member of package org.apache.flink.streaming
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer


"mvn clean compile" works however... (Thanks...)

Do you know why IntelliJ doesn't see the import??

Best,
Taimoor


From: Yun Gao 
Sent: Monday, July 19, 2021 3:25 PM
To: Taimoor Bhatti ; user@flink.apache.org 

Subject: Re: Apache Flink Kafka Connector not found Error

Hi Taimoor,

I think that is right regarding the provided dependency; we need to
manually include it in the classpath via the IDEA options.

And regarding the FlinkKafkaConsumer issue, I tried locally and it seems
to work after adding the import, namely:
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

Best,
Yun


--
From:Taimoor Bhatti 
Send Time:2021 Jul. 19 (Mon.) 18:43
To:user@flink.apache.org 
Subject:Apache Flink Kafka Connector not found Error

I'm having some trouble with using the Flink DataStream API with the Kafka 
Connector. There don't seem to be great resources on the internet which can 
explain the issue I'm having.


My project is here: 
https://github.com/sysarcher/flink-scala-tests

I'm unable to use FlinkKafkaConsumer 
(link: https://github.com/sysarcher/flink-scala-tests/blob/main/src/main/scala/spendreport/FraudDetectionJob.scala#L42),
which I want to try out.

I'm using IntelliJ Idea. The project was generated from the tutorial on Flink's 
website.

  *   The first problem seemed to be the provided scope, as suggested here: 
https://stackoverflow.com/a/63667067/3760442 ... Now the DataStream API (and the 
example) seem to work.
  *   The current problem is that I'm not able to use the Kafka connector which 
I'm looking to try out.

The following link was used to generate the project: 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/try-flink/datastream/#how-to-follow-along


This same question was originally posted here: 
https://stackoverflow.com/q/68437215/3760442

Thanks in advance


Re: Apache Flink Kafka Connector not found Error

2021-07-19 Thread Yun Gao
Hi Taimoor,

I think that is right regarding the provided dependency; we need to
manually include it in the classpath via the IDEA options.

And regarding the FlinkKafkaConsumer issue, I tried locally and it seems
to work after adding the import, namely:
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

Best,
Yun



--
From:Taimoor Bhatti 
Send Time:2021 Jul. 19 (Mon.) 18:43
To:user@flink.apache.org 
Subject:Apache Flink Kafka Connector not found Error

   I'm having some trouble with using the Flink DataStream API with the Kafka 
Connector. There don't seem to be great resources on the internet which can 
explain the issue I'm having. 

My project is here:  https://github.com/sysarcher/flink-scala-tests
I'm unable to use FlinkKafkaConsumer (link), which I want to try out.
I'm using IntelliJ Idea. The project was generated from the  tutorial on 
Flink's website 
The First problem seemed to be the provided scope as suggested here:  
https://stackoverflow.com/a/63667067/3760442 ... Now, DataStream API (and the 
example) seem to work.
The current problem is that I'm not able to use the Kafka connector which I'm 
looking to try out.
The following link was used to generate the project:  
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/try-flink/datastream/#how-to-follow-along
 


 This same question was originally posted here:  
https://stackoverflow.com/q/68437215/3760442

Thanks in advance 


Apache Flink Kafka Connector not found Error

2021-07-19 Thread Taimoor Bhatti
I'm having some trouble with using the Flink DataStream API with the Kafka 
Connector. There don't seem to be great resources on the internet which can 
explain the issue I'm having.


My project is here: https://github.com/sysarcher/flink-scala-tests

I'm unable to use FlinkKafkaConsumer (link), which I want to try out.

I'm using IntelliJ Idea. The project was generated from the tutorial on Flink's 
website

  *   The First problem seemed to be the provided scope as suggested here: 
https://stackoverflow.com/a/63667067/3760442 ... Now, DataStream API (and the 
example) seem to work.
  *   The current problem is that I'm not able to use the Kafka connector which 
I'm looking to try out.

The following link was used to generate the project: 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/try-flink/datastream/#how-to-follow-along


This same question was originally posted here: 
https://stackoverflow.com/q/68437215/3760442

Thanks in advance


Re: Flink-kafka-connector Consumer configuration warning

2021-04-20 Thread 范 佳兴
The flink.partition-discovery.interval-millis setting does take effect in Flink: the flink kafka 
connectors use the configured interval to fetch the partition information of the kafka topic. For the 
implementation, see the createAndStartDiscoveryLoop method in FlinkKafkaConsumerBase.

19:38:37,557 WARN  org.apache.kafka.clients.consumer.ConsumerConfig
[] - The configuration 'flink.partition-discovery.interval-millis' was
supplied but isn't a known config.

This WARN is reported by kafka; it means that kafka received the parameter but does not recognize it.
The parameter is not meant for kafka at all: when fetching the kafka partitions, a KafkaConsumer 
instance has to be created, and the configured properties are simply passed along to Kafka as well.
The warning is emitted from the config.logUnused() call inside the KafkaConsumer constructor.


On 2021/4/18 at 7:45 PM, "lp" <973182...@qq.com> wrote:

In a normally running flink 1.12 program, I see the following warning:

19:38:37,557 WARN  org.apache.kafka.clients.consumer.ConsumerConfig
[] - The configuration 'flink.partition-discovery.interval-millis' was
supplied but isn't a known config.

I have the following line of configuration:

properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,10);

According to the official docs at https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#topic-discovery:
By default, partition discovery is disabled. To enable it, set a
non-negative value for flink.partition-discovery.interval-millis in the
provided properties config, representing the discovery interval in
milliseconds.

The configuration above should be valid, so why is this warning reported?



--
Sent from: http://apache-flink.147419.n8.nabble.com/




Re: Flink-kafka-connector Consumer configuration warning

2021-04-19 Thread 飞翔
You can take a look at the source code:


These props are only the configuration variables used to initialize the FlinkKafkaConsumer; they are 
not used solely to initialize kafka - the whole props object is simply passed into the initialization 
of the kafka consumer client at the end, so it has no effect at all.
It is the same as initializing a kafka consumer yourself: if you put extra parameters into the props, 
you will also get the warning, but there is no impact whatsoever.


-- Original message --
From: "user-zh"

 According to the official docs at https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#topic-discovery:
 By default, partition discovery is disabled. To enable it, set a
 non-negative value for flink.partition-discovery.interval-millis in the
 provided properties config, representing the discovery interval in
 milliseconds.


 The configuration above should be valid, so why is this warning reported?
 
 
 
 --
 Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink-kafka-connector Consumer configuration warning

2021-04-19 Thread Paul Lam
This warning comes from the Kafka client. The configuration key is added by Flink, so Kafka does not recognize it.

Best,
Paul Lam

> On Apr 18, 2021, at 19:45, lp <973182...@qq.com> wrote:
> 
> In a normally running flink 1.12 program, I see the following warning:
> 
> 19:38:37,557 WARN  org.apache.kafka.clients.consumer.ConsumerConfig
> [] - The configuration 'flink.partition-discovery.interval-millis' was
> supplied but isn't a known config.
> 
> I have the following line of configuration:
> properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,10);
> 
> 
> According to the official docs at https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#topic-discovery:
> By default, partition discovery is disabled. To enable it, set a
> non-negative value for flink.partition-discovery.interval-millis in the
> provided properties config, representing the discovery interval in
> milliseconds.
> 
> 
> The configuration above should be valid, so why is this warning reported?
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Flink-kafka-connector Consumer configuration warning

2021-04-18 Thread lp
In a normally running flink 1.12 program, I see the following warning:

19:38:37,557 WARN  org.apache.kafka.clients.consumer.ConsumerConfig
[] - The configuration 'flink.partition-discovery.interval-millis' was
supplied but isn't a known config.

I have the following line of configuration:
properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,10);


According to the official docs at https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#topic-discovery:
By default, partition discovery is disabled. To enable it, set a
non-negative value for flink.partition-discovery.interval-millis in the
provided properties config, representing the discovery interval in
milliseconds.


The configuration above should be valid, so why is this warning reported?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Problem caused by flink-kafka-connector Producer.setWriteTimestampToKafka(true)

2021-04-16 Thread lp
I am using flink 1.12 with the flink-kafka-connector to read data from kafka topicA and then sink it
back to topicB. When the FlinkProducer that sinks to topicB is configured as follows, the program
occasionally fails with an error; after removing this setting the exception disappears. What could be the reason?


flinkKafkaProducer.setWriteTimestampToKafka(true);




--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink kafka connector occasionally fails with: Permission denied: connect

2021-04-07 Thread lp
I wrote a flink kafka connector job that consumes data from kafka topicA, processes it, and then sinks it
back to topicB. While the program is running normally, the following error occasionally appears:


java.net.SocketException: Permission denied: connect
at sun.nio.ch.Net.connect0(Native Method) ~[?:1.8.0_231]
at sun.nio.ch.Net.connect(Net.java:454) ~[?:1.8.0_231]
at sun.nio.ch.Net.connect(Net.java:446) ~[?:1.8.0_231]
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648)
~[?:1.8.0_231]
at org.apache.kafka.common.network.Selector.doConnect(Selector.java:280)
~[kafka-clients-2.4.1.jar:?]
at org.apache.kafka.common.network.Selector.connect(Selector.java:258)
~[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:951)
[kafka-clients-2.4.1.jar:?]
at 
org.apache.kafka.clients.NetworkClient.access$500(NetworkClient.java:71)
[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1122)
[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1010)
[kafka-clients-2.4.1.jar:?]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:545)
[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:321)
[kafka-clients-2.4.1.jar:?]
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
[kafka-clients-2.4.1.jar:?]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]
10:23:15,951 WARN  org.apache.kafka.clients.NetworkClient  
[] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map ->
Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-5, transactionalId=Source:
Custom Source -> Flat Map -> Map -> Sink:
Unnamed-3d05135cf7d8f1375d8f655ba9d20255-5] Error connecting to node
10.66.0.129:9092 (id: -1 rack: null)
java.net.SocketException: Permission denied: connect
at sun.nio.ch.Net.connect0(Native Method) ~[?:1.8.0_231]
at sun.nio.ch.Net.connect(Net.java:454) ~[?:1.8.0_231]
at sun.nio.ch.Net.connect(Net.java:446) ~[?:1.8.0_231]
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648)
~[?:1.8.0_231]
at org.apache.kafka.common.network.Selector.doConnect(Selector.java:280)
~[kafka-clients-2.4.1.jar:?]
at org.apache.kafka.common.network.Selector.connect(Selector.java:258)
~[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:951)
[kafka-clients-2.4.1.jar:?]
at 
org.apache.kafka.clients.NetworkClient.access$500(NetworkClient.java:71)
[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1122)
[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1010)
[kafka-clients-2.4.1.jar:?]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:545)
[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:321)
[kafka-clients-2.4.1.jar:?]
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
[kafka-clients-2.4.1.jar:?]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]
10:23:15,953 WARN  org.apache.kafka.clients.NetworkClient  
[] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map ->
Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-2, transactionalId=Source:
Custom Source -> Flat Map -> Map -> Sink:
Unnamed-3d05135cf7d8f1375d8f655ba9d20255-2] Error connecting to node
10.66.0.129:9092 (id: -1 rack: null)
java.net.SocketException: Permission denied: connect
at sun.nio.ch.Net.connect0(Native Method) ~[?:1.8.0_231]
at sun.nio.ch.Net.connect(Net.java:454) ~[?:1.8.0_231]
at sun.nio.ch.Net.connect(Net.java:446) ~[?:1.8.0_231]
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648)
~[?:1.8.0_231]
at org.apache.kafka.common.network.Selector.doConnect(Selector.java:280)
~[kafka-clients-2.4.1.jar:?]
at org.apache.kafka.common.network.Selector.connect(Selector.java:258)
~[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:951)
[kafka-clients-2.4.1.jar:?]
at 
org.apache.kafka.clients.NetworkClient.access$500(NetworkClient.java:71)
[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1122)
[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1010)
[kafka-clients-2.4.1.jar:?]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClien

Re: Flink Kafka connector in Python

2020-07-14 Thread Xingbo Huang
t;> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:78)
>>>>at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>at 
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>at 
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>at java.lang.reflect.Method.invoke(Method.java:498)
>>>>at 
>>>> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>>>at 
>>>> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>>>at 
>>>> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>>>>at 
>>>> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>>>at 
>>>> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>>>>at 
>>>> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>>>>at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: org.apache.flink.table.api.ValidationException: Could not find 
>>>> the required schema in property 'schema'.
>>>>at 
>>>> org.apache.flink.table.descriptors.SchemaValidator.validate(SchemaValidator.java:90)
>>>>at 
>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getValidatedProperties(KafkaTableSourceSinkFactoryBase.java:269)
>>>>at 
>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:158)
>>>>at 
>>>> org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49)
>>>>at 
>>>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:53)
>>>>... 13 more
>>>>
>>>>
>>>> During handling of the above exception, another exception occurred:
>>>>
>>>> Traceback (most recent call last):
>>>>   File "/home/manas/IU_workspace/Flink_POC/pyflink/main.py", line 46, in 
>>>> 
>>>> ).register_table_source(INPUT_TABLE)
>>>>   File 
>>>> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/descriptors.py",
>>>>  line 1295, in register_table_source
>>>> self._j_connect_table_descriptor.registerTableSource(name)
>>>>   File 
>>>> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py",
>>>>  line 1286, in __call__
>>>> answer, self.gateway_client, self.target_id, self.name)
>>>>   File 
>>>> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py",
>>>>  line 154, in deco
>>>> raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
>>>> pyflink.util.exceptions.TableException: 'findAndCreateTableSource failed.'
>>>>
>>>>
>>>> The relevant part seems to be *Caused by: 
>>>> org.apache.flink.table.api.ValidationException: Could not find the 
>>>> required schema in property 'schema'.*
>>>>
>>>> This is probably a basic error, but I can't figure out how I can know 
>>>> what's wrong with the schema. Is the schema not properly declared? Is some 
>>>> field missing?
>>>>
>>>> FWIW I have included the JSON and kafka connector JARs in the required 
>>>> location.
>>>>
>>>>
>>>> Regards,
>>>> Manas
>>>>
>>>>
>>>> On Tue, Jun 30, 2020 at 11:58 AM Manas Kale 
>>>> wrote:
>>>>
>>>>> Hi Xingbo,
>>>>> Thank you for the information, it certainly helps!
>>>>>
>>>>> Regards,
>>>>> Manas
>>>>>
>>>>> On Mon, Jun 29, 2020 at 6:18 PM Xingbo Huang 
>>>>> wrote:
>>>>>
>>>>>> Hi Manas,
>>>>>>
>>>>>> Since Flink 1.9, the entire architecture of PyFlink has been
>>>>>> redesigned. So the method described in the link won't work.
>>>>>> But you can use more convenient DDL[1] or descriptor[2] to read kafka
>>>>>> data. Besides, You can refer to the common questions about PyFlink[3]
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#run-a-create-statement
>>>>>> [2]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>>>>>> [3]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/python/common_questions.html
>>>>>>
>>>>>> Best,
>>>>>> Xingbo
>>>>>>
>>>>>> Manas Kale wrote on Mon, Jun 29, 2020 at 8:10 PM:
>>>>>>
>>>>>>> Hi,
>>>>>>> I want to consume and write to Kafka from Flink's python API.
>>>>>>>
>>>>>>> The only way I found to do this was through this
>>>>>>> <https://stackoverflow.com/questions/52744277/apache-flink-kafka-connector-in-python-streaming-api-cannot-load-user-class>
>>>>>>>  question
>>>>>>> on SO where the user essentially copies FlinkKafka connector JARs into 
>>>>>>> the
>>>>>>> Flink runtime's lib/ directory.
>>>>>>>
>>>>>>>- Is this the recommended method to do this? If not, what is?
>>>>>>>- Is there any official documentation for using Kafka
>>>>>>>with pyFlink? Is this officially supported?
>>>>>>>- How does the method described in the link work? Does the Flink
>>>>>>>runtime load and expose all JARs in /lib to the python script? Can I 
>>>>>>> write
>>>>>>>custom operators in Java and use those through python?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Manas
>>>>>>>
>>>>>>


Re: Getting kafka metadata in the flink kafka connector

2020-07-07 Thread Dream-底限
OK, got it.

On Tue, Jul 7, 2020 at 5:30 PM Leonard Xu  wrote:

> Yes, this will be supported in FLIP-107; for now there is no way to get this meta data. You can follow the progress of FLIP-107.
>
> Best,
> Leonard Xu
>
> > On Jul 7, 2020, at 17:26, Dream-底限 wrote:
> >
> > hi
> > Yes, I would like to get it in the following way:
> >
> > CREATE TABLE MyUserTable (key string, topic string, ... plus the other data fields) WITH
> > ('connector.type' = 'kafka','connector.version' = '0.11' ,...)
> >
> >
> > On Tue, Jul 7, 2020 at 5:19 PM Leonard Xu  wrote:
> >
> >> Hi,
> >> By kafka metadata, do you mean the meta data inside the kafka record, e.g. kafka's own timestamp and the kafka key?
> >> If so, the Table/SQL API cannot access it yet; FLIP-107 [1] will support this.
> >>
> >> Best,
> >> Leonard Xu
> >>
> >> [1]
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
> >> <
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107:+Reading+table+columns+from+different+parts+of+source+records
> >>>
> >>
> >>> On Jul 7, 2020, at 17:12, Dream-底限 wrote:
> >>>
> >>> kafka metadata
> >>
> >>
>
>


Re: Getting kafka metadata in the flink kafka connector

2020-07-07 Thread Leonard Xu
Yes, this will be supported in FLIP-107; for now there is no way to get this meta data. You can follow the progress of FLIP-107.

Best,
Leonard Xu

> On Jul 7, 2020, at 17:26, Dream-底限 wrote:
> 
> hi
> Yes, I would like to get it in the following way:
> 
> CREATE TABLE MyUserTable (key string, topic string, ... plus the other data fields) WITH
> ('connector.type' = 'kafka','connector.version' = '0.11' ,...)
> 
> 
> On Tue, Jul 7, 2020 at 5:19 PM Leonard Xu  wrote:
> 
>> Hi,
>> By kafka metadata, do you mean the meta data inside the kafka record, e.g. kafka's own timestamp and the kafka key?
>> If so, the Table/SQL API cannot access it yet; FLIP-107 [1] will support this.
>> 
>> Best,
>> Leonard Xu
>> 
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
>> <
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107:+Reading+table+columns+from+different+parts+of+source+records
>>> 
>> 
>>> On Jul 7, 2020, at 17:12, Dream-底限 wrote:
>>> 
>>> kafka metadata
>> 
>> 



Re: Getting kafka metadata in the flink kafka connector

2020-07-07 Thread Dream-底限
hi
Yes, I would like to get it in the following way:

CREATE TABLE MyUserTable (key string, topic string, ... plus the other data fields) WITH
('connector.type' = 'kafka','connector.version' = '0.11' ,...)


On Tue, Jul 7, 2020 at 5:19 PM Leonard Xu  wrote:

> Hi,
>  By kafka metadata, do you mean the meta data inside the kafka record, e.g. kafka's own timestamp and the kafka key?
> If so, the Table/SQL API cannot access it yet; FLIP-107 [1] will support this.
>
> Best,
> Leonard Xu
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
> <
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107:+Reading+table+columns+from+different+parts+of+source+records
> >
>
> > On Jul 7, 2020, at 17:12, Dream-底限 wrote:
> >
> > kafka metadata
>
>


Re: Getting kafka metadata in the flink kafka connector

2020-07-07 Thread Leonard Xu
Hi,
 By kafka metadata, do you mean the meta data inside the kafka record, e.g. kafka's own timestamp and the kafka key?
If so, the Table/SQL API cannot access it yet; FLIP-107 [1] will support this.

Best,
Leonard Xu

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
 


> On Jul 7, 2020, at 17:12, Dream-底限 wrote:
> 
> kafka metadata



Getting kafka metadata in the flink kafka connector

2020-07-07 Thread Dream-底限
hi,
Is there a way to get kafka metadata in the flink table/sql api?

tableEnvironment.sqlUpdate(CREATE TABLE MyUserTable (...) WITH
('connector.type' = 'kafka','connector.version' = '0.11' ,...))
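
FLIP-107 has since been implemented: in later Flink releases (1.12 and newer) the Kafka SQL connector can expose the record key and per-record metadata directly in the table schema. A rough sketch using the newer 'connector' = 'kafka' options (table name, fields, topic, and formats are illustrative):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KafkaMetadataTableSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // The record key is mapped via 'key.format'/'key.fields'; topic and
        // timestamp are read from connector-provided METADATA columns.
        tEnv.executeSql(
                "CREATE TABLE MyUserTable (" +
                "  user_key STRING," +
                "  event_topic STRING METADATA FROM 'topic'," +
                "  event_time TIMESTAMP(3) METADATA FROM 'timestamp'," +
                "  payload STRING" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'my-topic'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'properties.group.id' = 'metadata-demo'," +
                "  'scan.startup.mode' = 'earliest-offset'," +
                "  'key.format' = 'raw'," +
                "  'key.fields' = 'user_key'," +
                "  'value.format' = 'json'," +
                "  'value.fields-include' = 'EXCEPT_KEY'" +
                ")");
    }
}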


Re: Flink Kafka connector in Python

2020-07-06 Thread Manas Kale
38)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.table.api.ValidationException: Could not find 
>>> the required schema in property 'schema'.
>>> at 
>>> org.apache.flink.table.descriptors.SchemaValidator.validate(SchemaValidator.java:90)
>>> at 
>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getValidatedProperties(KafkaTableSourceSinkFactoryBase.java:269)
>>> at 
>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:158)
>>> at 
>>> org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49)
>>> at 
>>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:53)
>>> ... 13 more
>>>
>>>
>>> During handling of the above exception, another exception occurred:
>>>
>>> Traceback (most recent call last):
>>>   File "/home/manas/IU_workspace/Flink_POC/pyflink/main.py", line 46, in 
>>> 
>>> ).register_table_source(INPUT_TABLE)
>>>   File 
>>> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/descriptors.py",
>>>  line 1295, in register_table_source
>>> self._j_connect_table_descriptor.registerTableSource(name)
>>>   File 
>>> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py",
>>>  line 1286, in __call__
>>> answer, self.gateway_client, self.target_id, self.name)
>>>   File 
>>> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py",
>>>  line 154, in deco
>>> raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
>>> pyflink.util.exceptions.TableException: 'findAndCreateTableSource failed.'
>>>
>>>
>>> The relevant part seems to be *Caused by: 
>>> org.apache.flink.table.api.ValidationException: Could not find the required 
>>> schema in property 'schema'.*
>>>
>>> This is probably a basic error, but I can't figure out how I can know 
>>> what's wrong with the schema. Is the schema not properly declared? Is some 
>>> field missing?
>>>
>>> FWIW I have included the JSON and kafka connector JARs in the required 
>>> location.
>>>
>>>
>>> Regards,
>>> Manas
>>>
>>>
>>> On Tue, Jun 30, 2020 at 11:58 AM Manas Kale 
>>> wrote:
>>>
>>>> Hi Xingbo,
>>>> Thank you for the information, it certainly helps!
>>>>
>>>> Regards,
>>>> Manas
>>>>
>>>> On Mon, Jun 29, 2020 at 6:18 PM Xingbo Huang 
>>>> wrote:
>>>>
>>>>> Hi Manas,
>>>>>
>>>>> Since Flink 1.9, the entire architecture of PyFlink has been
>>>>> redesigned. So the method described in the link won't work.
>>>>> But you can use more convenient DDL[1] or descriptor[2] to read kafka
>>>>> data. Besides, You can refer to the common questions about PyFlink[3]
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#run-a-create-statement
>>>>> [2]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>>>>> [3]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/python/common_questions.html
>>>>>
>>>>> Best,
>>>>> Xingbo
>>>>>
>>>>> Manas Kale wrote on Mon, Jun 29, 2020 at 8:10 PM:
>>>>>
>>>>>> Hi,
>>>>>> I want to consume and write to Kafka from Flink's python API.
>>>>>>
>>>>>> The only way I found to do this was through this
>>>>>> <https://stackoverflow.com/questions/52744277/apache-flink-kafka-connector-in-python-streaming-api-cannot-load-user-class>
>>>>>>  question
>>>>>> on SO where the user essentially copies FlinkKafka connector JARs into 
>>>>>> the
>>>>>> Flink runtime's lib/ directory.
>>>>>>
>>>>>>- Is this the recommended method to do this? If not, what is?
>>>>>>- Is there any official documentation for using Kafka
>>>>>>with pyFlink? Is this officially supported?
>>>>>>- How does the method described in the link work? Does the Flink
>>>>>>runtime load and expose all JARs in /lib to the python script? Can I 
>>>>>> write
>>>>>>custom operators in Java and use those through python?
>>>>>>
>>>>>> Thanks,
>>>>>> Manas
>>>>>>
>>>>>


Re: Flink Kafka connector in Python

2020-07-03 Thread Manas Kale
gt; .start_from_latest()
>> ) \
>> .with_format(
>> Json()
>> .json_schema(
>>"{"
>> "  type: 'object',"
>> "  properties: {"
>> "lon: {"
>> "  type: 'number'"
>> "},"
>> "rideTime: {"
>> "  type: 'string',"
>> "  format: 'date-time'"
>> "}"
>> "  }"
>> "}"
>> )).register_table_sink(OUTPUT_TABLE)
>>
>> t_env.from_path(INPUT_TABLE) \
>> .insert_into(OUTPUT_TABLE)
>>
>> t_env.execute('IU pyflink job')
>>
>> *However, I am getting the following exception : *
>>
>> Traceback (most recent call last):
>>   File 
>> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py",
>>  line 147, in deco
>> return f(*a, **kw)
>>   File 
>> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py",
>>  line 328, in get_return_value
>> format(target_id, ".", name), value)
>> py4j.protocol.Py4JJavaError: An error occurred while calling 
>> o32.registerTableSource.
>> : org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
>>  at 
>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
>>  at 
>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:42)
>>  at 
>> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:78)
>>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>  at 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>  at 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>  at java.lang.reflect.Method.invoke(Method.java:498)
>>  at 
>> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>  at 
>> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>  at 
>> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>>  at 
>> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>  at 
>> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>>  at 
>> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>>  at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.table.api.ValidationException: Could not find 
>> the required schema in property 'schema'.
>>  at 
>> org.apache.flink.table.descriptors.SchemaValidator.validate(SchemaValidator.java:90)
>>  at 
>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getValidatedProperties(KafkaTableSourceSinkFactoryBase.java:269)
>>  at 
>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:158)
>>  at 
>> org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49)
>>  at 
>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:53)
>>  ... 13 more
>>
>>
>> During handling of the above exception, another exception occurred:
>>
>> Traceback (most recent call last):
>>   File "/home/manas/IU_workspace/Flink_POC/pyflink/main.py", line 46, in 
>> 
>> ).register_table_source(INPUT_TABLE)
>>   File 
>> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/descriptors.py",
>>  line 1295, in register_table_source
>> self._j_connect_table_descriptor.registerTableSource(name)
>>   File 
>> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py",
>>  line 1286, in __call__
>> answer, self.gateway_client, self.target_id, self.name)
>>   File 
>> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py",
>>  line 154, in deco
>> raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
>> pyflink.util.exceptions.TableExcepti

Re: Flink Kafka connector in Python

2020-07-02 Thread Xingbo Huang
t; org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>   at 
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>   at 
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>   at 
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>   at 
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>   at 
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.table.api.ValidationException: Could not find the 
> required schema in property 'schema'.
>   at 
> org.apache.flink.table.descriptors.SchemaValidator.validate(SchemaValidator.java:90)
>   at 
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getValidatedProperties(KafkaTableSourceSinkFactoryBase.java:269)
>   at 
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:158)
>   at 
> org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49)
>   at 
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:53)
>   ... 13 more
>
>
> During handling of the above exception, another exception occurred:
>
> Traceback (most recent call last):
>   File "/home/manas/IU_workspace/Flink_POC/pyflink/main.py", line 46, in 
> 
> ).register_table_source(INPUT_TABLE)
>   File 
> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/descriptors.py",
>  line 1295, in register_table_source
> self._j_connect_table_descriptor.registerTableSource(name)
>   File 
> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py",
>  line 1286, in __call__
> answer, self.gateway_client, self.target_id, self.name)
>   File 
> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py",
>  line 154, in deco
> raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
> pyflink.util.exceptions.TableException: 'findAndCreateTableSource failed.'
>
>
> The relevant part seems to be *Caused by: 
> org.apache.flink.table.api.ValidationException: Could not find the required 
> schema in property 'schema'.*
>
> This is probably a basic error, but I can't figure out how I can know what's 
> wrong with the schema. Is the schema not properly declared? Is some field 
> missing?
>
> FWIW I have included the JSON and kafka connector JARs in the required 
> location.
>
>
> Regards,
> Manas
>
>
> On Tue, Jun 30, 2020 at 11:58 AM Manas Kale  wrote:
>
>> Hi Xingbo,
>> Thank you for the information, it certainly helps!
>>
>> Regards,
>> Manas
>>
>> On Mon, Jun 29, 2020 at 6:18 PM Xingbo Huang  wrote:
>>
>>> Hi Manas,
>>>
>>> Since Flink 1.9, the entire architecture of PyFlink has been redesigned.
>>> So the method described in the link won't work.
>>> But you can use more convenient DDL[1] or descriptor[2] to read kafka
>>> data. Besides, You can refer to the common questions about PyFlink[3]
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#run-a-create-statement
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>>> [3]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/python/common_questions.html
>>>
>>> Best,
>>> Xingbo
>>>
>>> Manas Kale wrote on Mon, Jun 29, 2020 at 8:10 PM:
>>>
>>>> Hi,
>>>> I want to consume and write to Kafka from Flink's python API.
>>>>
>>>> The only way I found to do this was through this
>>>> <https://stackoverflow.com/questions/52744277/apache-flink-kafka-connector-in-python-streaming-api-cannot-load-user-class>
>>>>  question
>>>> on SO where the user essentially copies FlinkKafka connector JARs into the
>>>> Flink runtime's lib/ directory.
>>>>
>>>>- Is this the recommended method to do this? If not, what is?
>>>>- Is there any official documentation for using Kafka with pyFlink?
>>>>Is this officially supported?
>>>>- How does the method described in the link work? Does the Flink
>>>>runtime load and expose all JARs in /lib to the python script? Can I 
>>>> write
>>>>custom operators in Java and use those through python?
>>>>
>>>> Thanks,
>>>> Manas
>>>>
>>>


Re: Flink Kafka connector in Python

2020-07-02 Thread Manas Kale
ce/Flink_POC/pyflink/main.py", line 46,
in 
).register_table_source(INPUT_TABLE)
  File 
"/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/descriptors.py",
line 1295, in register_table_source
self._j_connect_table_descriptor.registerTableSource(name)
  File 
"/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py",
line 1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File 
"/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py",
line 154, in deco
raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
pyflink.util.exceptions.TableException: 'findAndCreateTableSource failed.'


The relevant part seems to be *Caused by:
org.apache.flink.table.api.ValidationException: Could not find the
required schema in property 'schema'.*

This is probably a basic error, but I can't figure out how I can know
what's wrong with the schema. Is the schema not properly declared? Is
some field missing?

FWIW I have included the JSON and kafka connector JARs in the required location.


Regards,
Manas


On Tue, Jun 30, 2020 at 11:58 AM Manas Kale  wrote:

> Hi Xingbo,
> Thank you for the information, it certainly helps!
>
> Regards,
> Manas
>
> On Mon, Jun 29, 2020 at 6:18 PM Xingbo Huang  wrote:
>
>> Hi Manas,
>>
>> Since Flink 1.9, the entire architecture of PyFlink has been redesigned.
>> So the method described in the link won't work.
>> But you can use more convenient DDL[1] or descriptor[2] to read kafka
>> data. Besides, You can refer to the common questions about PyFlink[3]
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#run-a-create-statement
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/python/common_questions.html
>>
>> Best,
>> Xingbo
>>
>> Manas Kale wrote on Mon, Jun 29, 2020 at 8:10 PM:
>>
>>> Hi,
>>> I want to consume and write to Kafka from Flink's python API.
>>>
>>> The only way I found to do this was through this
>>> <https://stackoverflow.com/questions/52744277/apache-flink-kafka-connector-in-python-streaming-api-cannot-load-user-class>
>>>  question
>>> on SO where the user essentially copies FlinkKafka connector JARs into the
>>> Flink runtime's lib/ directory.
>>>
>>>- Is this the recommended method to do this? If not, what is?
>>>- Is there any official documentation for using Kafka with pyFlink?
>>>Is this officially supported?
>>>- How does the method described in the link work? Does the Flink
>>>runtime load and expose all JARs in /lib to the python script? Can I 
>>> write
>>>custom operators in Java and use those through python?
>>>
>>> Thanks,
>>> Manas
>>>
>>


Re: Flink Kafka connector in Python

2020-06-30 Thread Manas Kale
Hi Xingbo,
Thank you for the information, it certainly helps!

Regards,
Manas

On Mon, Jun 29, 2020 at 6:18 PM Xingbo Huang  wrote:

> Hi Manas,
>
> Since Flink 1.9, the entire architecture of PyFlink has been redesigned.
> So the method described in the link won't work.
> But you can use more convenient DDL[1] or descriptor[2] to read kafka
> data. Besides, You can refer to the common questions about PyFlink[3]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#run-a-create-statement
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/python/common_questions.html
>
> Best,
> Xingbo
>
> Manas Kale wrote on Mon, Jun 29, 2020 at 8:10 PM:
>
>> Hi,
>> I want to consume and write to Kafka from Flink's python API.
>>
>> The only way I found to do this was through this
>> <https://stackoverflow.com/questions/52744277/apache-flink-kafka-connector-in-python-streaming-api-cannot-load-user-class>
>>  question
>> on SO where the user essentially copies FlinkKafka connector JARs into the
>> Flink runtime's lib/ directory.
>>
>>- Is this the recommended method to do this? If not, what is?
>>- Is there any official documentation for using Kafka with pyFlink?
>>Is this officially supported?
>>- How does the method described in the link work? Does the Flink
>>runtime load and expose all JARs in /lib to the python script? Can I write
>>custom operators in Java and use those through python?
>>
>> Thanks,
>> Manas
>>
>


Re: Flink Kafka connector in Python

2020-06-29 Thread Xingbo Huang
Hi Manas,

Since Flink 1.9, the entire architecture of PyFlink has been redesigned. So
the method described in the link won't work.
But you can use the more convenient DDL [1] or descriptor [2] approach to read Kafka data.
Besides, you can refer to the common questions about PyFlink [3].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#run-a-create-statement
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/python/common_questions.html
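
For reference, here is a minimal, untested sketch of the DDL approach from [1] with PyFlink 1.10. The topic, field names and connector properties below are assumptions, and the Kafka connector and JSON format JARs still need to be available to the cluster:

```
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Register a Kafka-backed table via DDL instead of the descriptor API.
t_env.sql_update("""
    CREATE TABLE input_table (
        id BIGINT,
        message STRING
    ) WITH (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.topic' = 'my-topic',
        'connector.properties.bootstrap.servers' = 'localhost:9092',
        'format.type' = 'json'
    )
""")

result = t_env.sql_query("SELECT id, message FROM input_table")
# ... define a sink for `result`, then call t_env.execute("read-from-kafka") to run the job.
```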

Best,
Xingbo

Manas Kale wrote on Mon, Jun 29, 2020 at 8:10 PM:

> Hi,
> I want to consume and write to Kafka from Flink's python API.
>
> The only way I found to do this was through this
> <https://stackoverflow.com/questions/52744277/apache-flink-kafka-connector-in-python-streaming-api-cannot-load-user-class>
>  question
> on SO where the user essentially copies FlinkKafka connector JARs into the
> Flink runtime's lib/ directory.
>
>- Is this the recommended method to do this? If not, what is?
>- Is there any official documentation for using Kafka with pyFlink? Is
>this officially supported?
>- How does the method described in the link work? Does the Flink
>runtime load and expose all JARs in /lib to the python script? Can I write
>custom operators in Java and use those through python?
>
> Thanks,
> Manas
>


Flink Kafka connector in Python

2020-06-29 Thread Manas Kale
Hi,
I want to consume and write to Kafka from Flink's python API.

The only way I found to do this was through this
<https://stackoverflow.com/questions/52744277/apache-flink-kafka-connector-in-python-streaming-api-cannot-load-user-class>
question
on SO where the user essentially copies FlinkKafka connector JARs into the
Flink runtime's lib/ directory.

   - Is this the recommended method to do this? If not, what is?
   - Is there any official documentation for using Kafka with pyFlink? Is
   this officially supported?
   - How does the method described in the link work? Does the Flink runtime
   load and expose all JARs in /lib to the python script? Can I write custom
   operators in Java and use those through python?

Thanks,
Manas


Re: Flink Kafka Connector Source Parallelism

2020-05-29 Thread Robert Metzger
Hi Mason,
your understanding is correct.
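
For what it's worth, a small sketch of that takeaway (topic name, schema, properties and the partition count of 1 are assumptions, not taken from the playground job; the usual Flink imports and an existing StreamExecutionEnvironment `env` are assumed):

```
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "click-count");

// Give the Kafka source its own parallelism that matches the topic's partition count;
// any additional source subtasks would simply stay idle.
DataStream<String> clicks = env
        .addSource(new FlinkKafkaConsumer<>("clicks", new SimpleStringSchema(), props))
        .setParallelism(1);

// Downstream operators can still run with the job's default parallelism (e.g. 2).
clicks.keyBy(value -> value).print();
```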

On Thu, May 28, 2020 at 8:23 AM Chen, Mason  wrote:

> I think I may have just answered my own question. There’s only one Kafka
> partition, so the maximum parallelism is one and it doesn’t really make
> sense to make another kafka consumer under the same group id. What threw me
> off is that there’s a 2nd subtask for the kafka source created even
> though it’s not actually doing anything. So, it seems a general statement
> can be made that (# kafka partitions) >= (# parallelism of flink kafka
> source)…well I guess you could have more parallelism than kafka partitions,
> but the extra subtasks will not be doing anything.
>
>
>
> *From: *"Chen, Mason" 
> *Date: *Wednesday, May 27, 2020 at 11:09 PM
> *To: *"user@flink.apache.org" 
> *Subject: *Flink Kafka Connector Source Parallelism
>
>
>
> Hi all,
>
>
>
> I’m currently trying to understand Flink’s Kafka Connector and how
> parallelism affects it. So, I am running the flink playground click count
> job and the parallelism is set to 2 by default.
>
>
> However, I don’t see the 2nd subtask of the Kafka Connector sending any 
> records: https://imgur.com/cA5ucSg. Do I need to rebalance after reading from 
> kafka?
>
> ```
> clicks = clicks
>    .keyBy(ClickEvent::getPage)
>    .map(new BackpressureMap())
>    .name("Backpressure");
> ```
>
>
>
> `clicks` is the kafka click stream. From my reading in the operator docs,
> it seems counterintuitive to do a `rebalance()` when I am already doing a
> `keyBy()`.
>
> So, my questions:
>
> 1. How do I make use of the 2nd subtask?
>
> 2. Does the number of partitions have some sort of correspondence with the
> parallelism of the source operator? If so, is there a general statement to
> be made about parallelism across all source operators?
>
>
>
> Thanks,
>
> Mason
>


Re: Flink Kafka Connector Source Parallelism

2020-05-28 Thread Chen, Mason
I think I may have just answered my own question. There’s only one Kafka 
partition, so the maximum parallelism is one and it doesn’t really make sense 
to make another kafka consumer under the same group id. What threw me off is 
that there’s a 2nd subtask for the kafka source created even though it’s not 
actually doing anything. So, it seems a general statement can be made that (# 
kafka partitions) >= (# parallelism of flink kafka source)…well I guess you 
could have more parallelism than kafka partitions, but the extra subtasks will 
not doing anything.

From: "Chen, Mason" 
Date: Wednesday, May 27, 2020 at 11:09 PM
To: "user@flink.apache.org" 
Subject: Flink Kafka Connector Source Parallelism

Hi all,

I’m currently trying to understand Flink’s Kafka Connector and how parallelism 
affects it. So, I am running the flink playground click count job and the 
parallelism is set to 2 by default.



However, I don’t see the 2nd subtask of the Kafka Connector sending any 
records: https://imgur.com/cA5ucSg. Do I need to rebalance after reading from 
kafka?

```
clicks = clicks
   .keyBy(ClickEvent::getPage)
   .map(new BackpressureMap())
   .name("Backpressure");
```

`clicks` is the kafka click stream. From my reading in the operator docs, it 
seems counterintuitive to do a `rebalance()` when I am already doing a 
`keyBy()`.

So, my questions:

1. How do I make use of the 2nd subtask?
2. Does the number of partitions have some sort of correspondence with the 
parallelism of the source operator? If so, is there a general statement to be 
made about parallelism across all source operators?

Thanks,
Mason


Flink Kafka Connector Source Parallelism

2020-05-28 Thread Chen, Mason
Hi all,

I’m currently trying to understand Flink’s Kafka Connector and how parallelism 
affects it. So, I am running the flink playground click count job and the 
parallelism is set to 2 by default.


However, I don’t see the 2nd subtask of the Kafka Connector sending any 
records: https://imgur.com/cA5ucSg. Do I need to rebalance after reading from 
kafka?

```
clicks = clicks
   .keyBy(ClickEvent::getPage)
   .map(new BackpressureMap())
   .name("Backpressure");
```

`clicks` is the kafka click stream. From my reading in the operator docs, it 
seems counterintuitive to do a `rebalance()` when I am already doing a 
`keyBy()`.

So, my questions:

1. How do I make use of the 2nd subtask?
2. Does the number of partitions have some sort of correspondence with the 
parallelism of the source operator? If so, is there a general statement to be 
made about parallelism across all source operators?

Thanks,
Mason


Re:Re: Error handler strategy in Flink Kafka connector with json format

2020-03-08 Thread sunfulin
yep. Glad to see the progress. 
Best







At 2020-03-09 12:44:05, "Jingsong Li"  wrote:

Hi Sunfulin,


I think this is very important too. 

There is an issue to fix this [1]. Does that meet your requirement?


[1] https://issues.apache.org/jira/browse/FLINK-15396


Best,
Jingsong Lee


On Mon, Mar 9, 2020 at 12:33 PM sunfulin  wrote:

hi , community, 
I am wondering if there is some config params with error handler strategy as 
[1] refers when defining a Kafka stream table using Flink SQL DDL. For example, 
 the following `json.parser.failure.strategy'  can be set to `silencly skip` 
that can skip the malformed dirty data process while consuming kafka records.


create table xxx (
 ... 
) with (
'connector.type' = 'kafka',
'format.type' = 'json',
'json.parser.failure.strategy' = 'silencly skip'
)  
[1] 
https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/




 





--

Best, Jingsong Lee

Re: Error handler strategy in Flink Kafka connector with json format

2020-03-08 Thread Jingsong Li
Hi Sunfulin,

I think this is very important too.
There is an issue to fix this [1]. Does that meet your requirement?

[1] https://issues.apache.org/jira/browse/FLINK-15396
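
For context, FLINK-15396 tracks letting the JSON format silently skip records it cannot parse. Once available, the DDL would presumably look roughly like the following; the exact option name below is an assumption based on that issue, so check the documentation of the Flink release you use:

create table xxx (
 ...
) with (
'connector.type' = 'kafka',
'format.type' = 'json',
'format.ignore-parse-errors' = 'true'
)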

Best,
Jingsong Lee

On Mon, Mar 9, 2020 at 12:33 PM sunfulin  wrote:

> hi , community,
> I am wondering if there is some config params with error handler strategy
> as [1] refers when defining a Kafka stream table using Flink SQL DDL. For
> example,  the following `json.parser.failure.strategy'  can be set to
> `silencly skip` that can skip the malformed dirty data process while
> consuming kafka records.
>
> create table xxx (
>  ...
> ) with (
> 'connector.type' = 'kafka',
> 'format.type' = 'json',
> 'json.parser.failure.strategy' = 'silencly skip'
> )
> [1]
> https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/
>
>
>
>


-- 
Best, Jingsong Lee


Error handler strategy in Flink Kafka connector with json format

2020-03-08 Thread sunfulin
Hi community,
I am wondering if there are config params for an error-handling strategy, as [1] describes,
when defining a Kafka stream table using Flink SQL DDL. For example, the following
`json.parser.failure.strategy` could be set to `silently skip`, so that malformed (dirty)
records are skipped while consuming Kafka records.


create table xxx (
 ...
) with (
'connector.type' = 'kafka',
'format.type' = 'json',
'json.parser.failure.strategy' = 'silently skip'
)
[1] 
https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/

Re: Flink Kafka connector consume from a single kafka partition

2020-02-21 Thread Robert Metzger
Hey Hemant,

Are you able to reconstruct the ordering of the events, for example based on
time or some sequence number?
If so, you could create as many Kafka partitions as you need (for proper
load distribution), disregarding any ordering at that point.
Then you keyBy your stream in Flink and restore the order within a window operator
(or with some custom logic in a process function).
Flink is able to handle quite large state using the RocksDB state backend.
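
To make that concrete, here is a rough, simplified sketch (the class name and the Tuple2 event model are assumptions) of buffering events per key in keyed state and emitting them once the watermark has passed their timestamp, using a KeyedProcessFunction with event-time timers:

```
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/** Buffers (payload, eventTimestamp) elements per key and emits the payloads in event-time order. */
public class SortPerKey extends KeyedProcessFunction<String, Tuple2<String, Long>, String> {

    // timestamp -> payloads seen for that timestamp (scoped to the current key)
    private transient MapState<Long, List<String>> buffer;

    @Override
    public void open(Configuration parameters) {
        MapStateDescriptor<Long, List<String>> descriptor = new MapStateDescriptor<>(
                "sort-buffer", Types.LONG, Types.LIST(Types.STRING));
        buffer = getRuntimeContext().getMapState(descriptor);
    }

    @Override
    public void processElement(Tuple2<String, Long> event, Context ctx, Collector<String> out)
            throws Exception {
        List<String> sameTimestamp = buffer.get(event.f1);
        if (sameTimestamp == null) {
            sameTimestamp = new ArrayList<>();
        }
        sameTimestamp.add(event.f0);
        buffer.put(event.f1, sameTimestamp);
        // Fire once the watermark passes this timestamp, i.e. no earlier events are expected.
        ctx.timerService().registerEventTimeTimer(event.f1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        List<String> ready = buffer.get(timestamp);
        if (ready != null) {
            for (String payload : ready) {
                out.collect(payload);
            }
            buffer.remove(timestamp);
        }
    }
}
```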

Best,
Robert


On Wed, Feb 19, 2020 at 6:34 PM hemant singh  wrote:

> Hi Arvid,
>
> Thanks for your response. I think I did not word my question properly.
> I wanted to confirm that if the data is distributed to more than one
> partition then the ordering cannot be maintained (which is documented).
> According to your response I understand if I set the parallelism to number
> of partition then each consumer will consume from one partition and
> ordering can be maintained.
>
> However, I have a question here in case my parallelism is less than number
> of partitions still I believe if I create keyedstream ordering will be
> maintained at operator level for that key. Correct me if I am wrong.
>
> Second, one issue/challenge which I see with this model is one of the
> source's frequency of pushing data is very high then one partition is
> overloaded. Hence the task which process this will be overloaded too,
> however for maintaining ordering I do not have any other options but to
> maintain data in one partition.
>
> Thanks,
> Hemant
>
> On Wed, Feb 19, 2020 at 5:54 PM Arvid Heise  wrote:
>
>> Hi Hemant,
>>
>> Flink passes your configurations to the Kafka consumer, so you could
>> check if you can subscribe to only one partition there.
>>
>> However, I would discourage that approach. I don't see the benefit to
>> just subscribing to the topic entirely and have dedicated processing for
>> the different devices.
>>
>> If you are concerned about the order, you shouldn't. Since all events of
>> a specific device-id reside in the same source partition, events are
>> in-order in Kafka (responsibility of producer, but I'm assuming that
>> because of your mail) and thus they are also in order in non-keyed streams
>> in Flink. Any keyBy on device-id or composite key involving device-id,
>> would also retain the order.
>>
>> If you have exactly one partition per device-id, you could even go with
>> `DataStreamUtil#reinterpretAsKeyedStream` to avoid any shuffling.
>>
>> Let me know if I misunderstood your use case or if you have further
>> questions.
>>
>> Best,
>>
>> Arvid
>>
>> On Wed, Feb 19, 2020 at 8:39 AM hemant singh 
>> wrote:
>>
>>> Hello Flink Users,
>>>
>>> I have a use case where I am processing metrics from different type of
>>> sources(one source will have multiple devices) and for aggregations as well
>>> as build alerts order of messages is important. To maintain customer data
>>> segregation I plan to have single topic for each customer with each source
>>> stream data to one kafka partition.
>>> To maintain ordering I am planning to push data for a single source type
>>> to single partitions. Then I can create keyedstream so that each of the
>>> device-id I have a single stream which has ordered data for each device-id.
>>>
>>> However, flink-kafka consumer I don't see that I can read from a
>>> specific partition hence flink consumer read from multiple kafka
>>> partitions. So even if I try to create a keyedstream on source type(and
>>> then write to a partition for further processing like keyedstream on
>>> device-id) I think ordering will not be maintained per source type.
>>>
>>> Only other option I feel I am left with is have single partition for the
>>> topic so that flink can subscribe to the topic and this maintains the
>>> ordering, the challenge is too many topics(as I have this configuration for
>>> multiple customers) which is not advisable for a kafka cluster.
>>>
>>> Can anyone shed some light on how to handle this use case.
>>>
>>> Thanks,
>>> Hemant
>>>
>>


Re: Flink Kafka connector consume from a single kafka partition

2020-02-19 Thread hemant singh
Hi Arvid,

Thanks for your response. I think I did not word my question properly.
I wanted to confirm that if the data is distributed to more than one
partition then the ordering cannot be maintained (which is documented).
According to your response I understand if I set the parallelism to number
of partition then each consumer will consume from one partition and
ordering can be maintained.

However, I have a question here: in case my parallelism is less than the number
of partitions, I still believe that if I create a keyed stream, ordering will be
maintained at the operator level for that key. Correct me if I am wrong.

Second, one issue/challenge which I see with this model: if one of the
sources pushes data at a very high frequency, then one partition is
overloaded. Hence the task which processes it will be overloaded too;
however, for maintaining ordering I do not have any other option but to
keep the data in one partition.

Thanks,
Hemant

On Wed, Feb 19, 2020 at 5:54 PM Arvid Heise  wrote:

> Hi Hemant,
>
> Flink passes your configurations to the Kafka consumer, so you could check
> if you can subscribe to only one partition there.
>
> However, I would discourage that approach. I don't see the benefit to just
> subscribing to the topic entirely and have dedicated processing for the
> different devices.
>
> If you are concerned about the order, you shouldn't. Since all events of a
> specific device-id reside in the same source partition, events are in-order
> in Kafka (responsibility of producer, but I'm assuming that because of your
> mail) and thus they are also in order in non-keyed streams in Flink. Any
> keyBy on device-id or composite key involving device-id, would also retain
> the order.
>
> If you have exactly one partition per device-id, you could even go with
> `DataStreamUtil#reinterpretAsKeyedStream` to avoid any shuffling.
>
> Let me know if I misunderstood your use case or if you have further
> questions.
>
> Best,
>
> Arvid
>
> On Wed, Feb 19, 2020 at 8:39 AM hemant singh  wrote:
>
>> Hello Flink Users,
>>
>> I have a use case where I am processing metrics from different type of
>> sources(one source will have multiple devices) and for aggregations as well
>> as build alerts order of messages is important. To maintain customer data
>> segregation I plan to have single topic for each customer with each source
>> stream data to one kafka partition.
>> To maintain ordering I am planning to push data for a single source type
>> to single partitions. Then I can create keyedstream so that each of the
>> device-id I have a single stream which has ordered data for each device-id.
>>
>> However, flink-kafka consumer I don't see that I can read from a specific
>> partition hence flink consumer read from multiple kafka partitions. So even
>> if I try to create a keyedstream on source type(and then write to a
>> partition for further processing like keyedstream on device-id) I think
>> ordering will not be maintained per source type.
>>
>> Only other option I feel I am left with is have single partition for the
>> topic so that flink can subscribe to the topic and this maintains the
>> ordering, the challenge is too many topics(as I have this configuration for
>> multiple customers) which is not advisable for a kafka cluster.
>>
>> Can anyone shed some light on how to handle this use case.
>>
>> Thanks,
>> Hemant
>>
>


Re: Flink Kafka connector consume from a single kafka partition

2020-02-19 Thread Arvid Heise
Hi Hemant,

Flink passes your configurations to the Kafka consumer, so you could check
if you can subscribe to only one partition there.

However, I would discourage that approach. I don't see the benefit to just
subscribing to the topic entirely and have dedicated processing for the
different devices.

If you are concerned about the order, you shouldn't. Since all events of a
specific device-id reside in the same source partition, events are in-order
in Kafka (responsibility of producer, but I'm assuming that because of your
mail) and thus they are also in order in non-keyed streams in Flink. Any
keyBy on device-id or composite key involving device-id, would also retain
the order.

If you have exactly one partition per device-id, you could even go with
`DataStreamUtils#reinterpretAsKeyedStream` to avoid any shuffling.
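
A small fragment of what that can look like (this assumes the usual Flink imports, an existing StreamExecutionEnvironment `env`, Kafka `props`, a `numberOfPartitions` variable, and that the device-id really is the first CSV field and maps to exactly one partition):

```
// Source parallelism matches the number of Kafka partitions, one device-id per partition.
DataStream<String> events = env
        .addSource(new FlinkKafkaConsumer<>("device-metrics", new SimpleStringSchema(), props))
        .setParallelism(numberOfPartitions);

// Treat the already-partitioned stream as keyed without an extra keyBy/shuffle.
// This is only safe when the external partitioning matches Flink's keying exactly.
KeyedStream<String, String> byDevice =
        DataStreamUtils.reinterpretAsKeyedStream(events, event -> event.split(",")[0]);
```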

Let me know if I misunderstood your use case or if you have further
questions.

Best,

Arvid

On Wed, Feb 19, 2020 at 8:39 AM hemant singh  wrote:

> Hello Flink Users,
>
> I have a use case where I am processing metrics from different type of
> sources(one source will have multiple devices) and for aggregations as well
> as build alerts order of messages is important. To maintain customer data
> segregation I plan to have single topic for each customer with each source
> stream data to one kafka partition.
> To maintain ordering I am planning to push data for a single source type
> to single partitions. Then I can create keyedstream so that each of the
> device-id I have a single stream which has ordered data for each device-id.
>
> However, flink-kafka consumer I don't see that I can read from a specific
> partition hence flink consumer read from multiple kafka partitions. So even
> if I try to create a keyedstream on source type(and then write to a
> partition for further processing like keyedstream on device-id) I think
> ordering will not be maintained per source type.
>
> Only other option I feel I am left with is have single partition for the
> topic so that flink can subscribe to the topic and this maintains the
> ordering, the challenge is too many topics(as I have this configuration for
> multiple customers) which is not advisable for a kafka cluster.
>
> Can anyone shed some light on how to handle this use case.
>
> Thanks,
> Hemant
>


Flink Kafka connector consume from a single kafka partition

2020-02-18 Thread hemant singh
Hello Flink Users,

I have a use case where I am processing metrics from different types of
sources (one source will have multiple devices), and for aggregations as well
as for building alerts the order of messages is important. To maintain customer data
segregation I plan to have a single topic for each customer, with each source
streaming data to one Kafka partition.
To maintain ordering I am planning to push data for a single source type to a
single partition. Then I can create a keyed stream so that for each
device-id I have a single stream with ordered data for that device-id.

However, with the Flink Kafka consumer I don't see a way to read from a specific
partition, so the Flink consumer reads from multiple Kafka partitions. So even
if I try to create a keyed stream on source type (and then write to a
partition for further processing, like a keyed stream on device-id), I think
ordering will not be maintained per source type.

The only other option I feel I am left with is to have a single partition for the
topic, so that Flink can subscribe to the topic and the ordering is maintained;
the challenge is too many topics (as I have this configuration for
multiple customers), which is not advisable for a Kafka cluster.

Can anyone shed some light on how to handle this use case?

Thanks,
Hemant


Flink Kafka Connector

2019-09-05 Thread Vishwas Siravara
Hi guys,
I am using the Flink connector for Kafka from 1.9.0.
Here is my sbt dependency:


"org.apache.flink" %% "flink-connector-kafka" % "1.9.0",


When I check the log file I see that the Kafka version is 0.10.2.0.
According to the docs, from 1.9.0 onwards the version should be
2.2.0. Why do I see this?


2019-09-06 01:41:43,534 INFO
org.apache.kafka.common.utils.AppInfoParser - Kafka version :
0.10.2.0. This

creates a big problem: I can connect to the broker, but I don't see any
messages.


Why is this?


Thanks,

Vishwas


Re: Flink Kafka Connector questions

2019-08-22 Thread 戴鑫铉
Hi Victor:
   I have received your reply. Thank you for the detailed explanation, much appreciated!

Victor Wong wrote on Fri, Aug 23, 2019 at 10:20 AM:

> Hi 鑫铉:
>   Let me try to answer.
>
>   1. Does this mean that with FlinkKafkaConsumer09 the checkpoint mechanism must be enabled, otherwise offsets are not committed to Kafka periodically?
>   According to the official documentation [1], checkpointing offsets is a Flink feature, while auto-committing offsets is a feature of the Kafka
> client; the two overlap to some extent, so the behaviour differs depending on the configuration.
>   If Flink checkpointing is not enabled, storing the offsets relies on the Kafka client's auto commit.
>   If Flink checkpointing is enabled, auto commit
> is disabled by Flink; after Flink completes a checkpoint, it decides, based on the user configuration, whether to commit the offsets to ZooKeeper (Kafka
> 0.8) or to the Kafka brokers (Kafka 0.8+).
>   Conclusion: without checkpointing, offsets are committed to Kafka periodically as long as the Kafka properties set "enable.auto.commit" to true and
> "auto.commit.interval.ms" to a value greater than 0.
>
>   2. current-offsets, committed-offsets, consumer lag:
>   According to the official documentation [2],
>   current-offsets is the latest offset Flink has read so far;
>   committed-offsets is the offset committed to ZooKeeper / the Kafka brokers;
>   consumer lag is the difference between the topic's latest offset (log end offset) and
> the committed-offsets; Flink does not provide consumer lag information, which comes from Kafka and its operational tooling.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-connector-metrics
>
>
> On 2019/8/22, 7:21 PM, "戴鑫铉"  wrote:
>
>     Hello, I am writing mainly to ask a few questions about the Flink Kafka Connector:
>
>
>     1. The official documentation mentions that even with checkpointing disabled, Flink still commits offsets periodically. But after testing and reading the source code, I found that only FlinkKafkaConsumer08 creates a committer that periodically commits offsets in createFetcher, while FlinkKafkaConsumer09 and FlinkKafkaConsumer010 seem to rely only on the commitInternalOffsetsToKafka() method to commit offsets, and that method is only called when checkpointing is enabled. Does this mean that with FlinkKafkaConsumer09 the checkpoint mechanism must be enabled, otherwise offsets are not committed to Kafka periodically?
>
>     2. I would also like to ask about current-offset and committed-offset in the Flink Kafka
> connector: is current-offset minus committed-offset the lag value? The official documentation mentions monitoring the lag, but I personally feel that current-offset and Kafka's Log
> End Offset are not the same thing. Could you explain this in detail?
>
>
>


Re: Flink Kafka Connector questions

2019-08-22 Thread Victor Wong
Hi 鑫铉:
  Let me try to answer.

  1. Does this mean that with FlinkKafkaConsumer09 the checkpoint mechanism must be enabled, otherwise offsets are not committed to Kafka periodically?
  According to the official documentation [1], checkpointing offsets is a Flink feature, while auto-committing offsets is a feature of the Kafka
client; the two overlap to some extent, so the behaviour differs depending on the configuration.
  If Flink checkpointing is not enabled, storing the offsets relies on the Kafka client's auto commit.
  If Flink checkpointing is enabled, auto commit
is disabled by Flink; after Flink completes a checkpoint, it decides, based on the user configuration, whether to commit the offsets to ZooKeeper (Kafka 0.8) or
to the Kafka brokers (Kafka 0.8+).
  Conclusion: without checkpointing, offsets are committed to Kafka periodically as long as the Kafka properties set "enable.auto.commit" to true and
"auto.commit.interval.ms" to a value greater than 0.

  2. current-offsets, committed-offsets, consumer lag:
  According to the official documentation [2],
  current-offsets is the latest offset Flink has read so far;
  committed-offsets is the offset committed to ZooKeeper / the Kafka brokers;
  consumer lag is the difference between the topic's latest offset (log end offset) and
the committed-offsets; Flink does not provide consumer lag information, which comes from Kafka and its operational tooling.

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-connector-metrics
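
To illustrate point 1, a small fragment (topic and group names are made up; the usual Flink Kafka connector imports are assumed) of the non-checkpointed case, where the periodic committing is driven purely by the Kafka client properties:

```
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-group");
// Without Flink checkpointing, these two properties control the periodic offset commits.
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "5000");

FlinkKafkaConsumer09<String> consumer =
        new FlinkKafkaConsumer09<>("my-topic", new SimpleStringSchema(), props);

// With checkpointing enabled, Flink disables auto commit and instead commits on
// completed checkpoints; this behaviour can be toggled explicitly:
// consumer.setCommitOffsetsOnCheckpoints(true);
```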


On 2019/8/22, 7:21 PM, "戴鑫铉"  wrote:

    Hello, I am writing mainly to ask a few questions about the Flink Kafka Connector:


    1. The official documentation mentions that even with checkpointing disabled, Flink still commits offsets periodically. But after testing and reading the source code, I found that only FlinkKafkaConsumer08 creates a committer that periodically commits offsets in createFetcher, while FlinkKafkaConsumer09 and FlinkKafkaConsumer010 seem to rely only on the commitInternalOffsetsToKafka() method to commit offsets, and that method is only called when checkpointing is enabled. Does this mean that with FlinkKafkaConsumer09 the checkpoint mechanism must be enabled, otherwise offsets are not committed to Kafka periodically?

    2. I would also like to ask about current-offset and committed-offset in the Flink Kafka
connector: is current-offset minus committed-offset the lag value? The official documentation mentions monitoring the lag, but I personally feel that current-offset and Kafka's Log End Offset are not the same thing. Could you explain this in detail?




Flink Kafka Connector questions

2019-08-22 Thread 戴鑫铉
Hello, I am writing mainly to ask a few questions about the Flink Kafka Connector:

1. The official documentation mentions that even with checkpointing disabled, Flink still commits offsets periodically. But after testing and reading the source code, I found that only FlinkKafkaConsumer08 creates a committer that periodically commits offsets in createFetcher, while FlinkKafkaConsumer09 and FlinkKafkaConsumer010 seem to rely only on the commitInternalOffsetsToKafka() method to commit offsets, and that method is only called when checkpointing is enabled. Does this mean that with FlinkKafkaConsumer09 the checkpoint mechanism must be enabled, otherwise offsets are not committed to Kafka periodically?

2. I would also like to ask about current-offset and committed-offset in the Flink Kafka
connector: is current-offset minus committed-offset the lag value? The official documentation mentions monitoring the lag, but I personally feel that current-offset and Kafka's Log End Offset are not the same thing. Could you explain this in detail?


Re: Does Flink Kafka connector has max_pending_offsets concept?

2019-06-06 Thread xwang355
Thanks Fabian. This is really helpful.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Does Flink Kafka connector has max_pending_offsets concept?

2019-06-06 Thread Fabian Hueske
Hi Ben,

Flink correctly maintains the offsets of all partitions that are read by a
Kafka consumer.
A checkpoint is only complete when all functions successful checkpoint
their state. For a Kafka consumer, this state is the current reading offset.
In case of a failure the offsets and the state of all functions are reset
to the last completed checkpoint.

Best, Fabian

On Wed, Jun 5, 2019 at 10:58 PM, xwang355 wrote:

> Elias Thanks for your reply. In this case,
>
> *When # of Kafka consumers  = # of partitions, and I use
> setParallelism(>1),
> something like this
> 'messageSteam.rebalance().map(lamba).setParallelism(3).print()'
> *
>
> If checkpointing is enabled, I assume Flink will commit the offsets in the
> 'right order' during checkpoint.
>
>
> For example, if a batch of offsets comprised of (1,2,3,4,5) and there are
> three worker threads(setParallelism(3)
>
> thread 1 -> 1 [stuck by a sync call]
> thread 2 -> 2, 3 [success]
> thread 3 -> 4, 5  [success]
>
> Will Flink commit 5?
>
> I just want to make sure that Flink will manage the pending offsets
> correctly so that there will be no data lost if the above code is used on
> production.
>
> Thanks again!
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Does Flink Kafka connector has max_pending_offsets concept?

2019-06-05 Thread xwang355
Elias Thanks for your reply. In this case, 

*When # of Kafka consumers  = # of partitions, and I use setParallelism(>1),
something like this
'messageSteam.rebalance().map(lamba).setParallelism(3).print()'
*

If checkpointing is enabled, I assume Flink will commit the offsets in the
'right order' during checkpoint.


For example, if a batch of offsets comprised of (1,2,3,4,5) and there are
three worker threads(setParallelism(3)

thread 1 -> 1 [stuck by a sync call]
thread 2 -> 2, 3 [success]
thread 3 -> 4, 5  [success]

Will Flink commit 5? 

I just want to make sure that Flink will manage the pending offsets
correctly so that there will be no data loss if the above code is used in
production. 

Thanks again!




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Does Flink Kafka connector has max_pending_offsets concept?

2019-06-05 Thread Elias Levy
There is no such concept in Flink.  Flink tracks offsets in its
checkpoints.  It can optionally commit offsets to Kafka, but that is only
for reporting purposes.  If you wish to lower the number of records that
get reprocessed in the case of a restart, then you must lower the
checkpoint interval.
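
A fragment of the relevant configuration (the interval values are only examples), assuming an existing StreamExecutionEnvironment `env`:

```
// A shorter checkpoint interval means fewer records are replayed from Kafka after a restart.
env.enableCheckpointing(10_000L, CheckpointingMode.EXACTLY_ONCE); // checkpoint every 10 seconds
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5_000L);
```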

On Tue, Jun 4, 2019 at 10:47 AM wang xuchen  wrote:

>
> Hi Flink users,
>
> When # of Kafka consumers  = # of partitions, and I use
> setParallelism(>1), something like this
>
> 'messageSteam.rebalance().map(lamba).setParallelism(3).print()'
>
> How do I tune # of outstanding uncommitted offset? Something similar to
>
> https://storm.apache.org/releases/1.1.2/storm-kafka-client.html in Storm.
>
> Thanks
> Ben
>


Does Flink Kafka connector has max_pending_offsets concept?

2019-06-04 Thread wang xuchen
Hi Flink users,

When # of Kafka consumers  = # of partitions, and I use setParallelism(>1),
something like this

'messageSteam.rebalance().map(lamba).setParallelism(3).print()'

How do I tune # of outstanding uncommitted offset? Something similar to

https://storm.apache.org/releases/1.1.2/storm-kafka-client.html in Storm.

Thanks
Ben


Re: Apache Flink: Kafka connector in Python streaming API, “Cannot load user class”

2018-10-11 Thread Dawid Wysakowicz
Hi Kostas,

As far as I know, you cannot just use Java classes from within the Python
API. I think the Python API does not provide a wrapper for the Kafka connector. I
am adding Chesnay to cc to correct me if I am wrong.

Best,

Dawid


On 11/10/18 12:18, Kostas Evangelou wrote:
> Hey all, 
>
> Thank you so much for your efforts. I've already posted this question
> on stack overflow, but thought I should ask here as well.
>
> I am trying out Flink's new Python streaming API and attempting to run
> my script with |./flink-1.6.1/bin/pyflink-stream.sh
> examples/read_from_kafka.py|. The python script is fairly
> straightforward, I am just trying to consume from an existing topic
> and send everything to stdout (or the *.out file in the log directory
> where the output method emits data by default).
>
> import glob
>
> import os
>
> import sys
>
> from java.util import Properties
>
> from org.apache.flink.streaming.api.functions.source import SourceFunction
>
> from org.apache.flink.streaming.api.collector.selector import
> OutputSelector
>
> from org.apache.flink.api.common.serialization import SimpleStringSchema
>
>
> directories=['/home/user/flink/flink-1.6.1/lib']
>
> for directory in directories:
>
>     for jar in glob.glob(os.path.join(directory,'*.jar')):
>
>                 sys.path.append(jar)
>
>
> from org.apache.flink.streaming.connectors.kafka import
> FlinkKafkaConsumer09
>
>
> props = Properties()
>
> config = {"bootstrap_servers": "localhost:9092",
>
>           "group_id": "flink_test",
>
>           "topics": ["TopicCategory-TopicName"]}
>
> props.setProperty("bootstrap.servers", config['bootstrap_servers'])
>
> props.setProperty("group_id", config['group_id'])
>
> props.setProperty("zookeeper.connect", "localhost:2181")
>
>
> def main(factory):
>
>     consumer = FlinkKafkaConsumer09([config["topics"]],
> SimpleStringSchema(), props)
>
>
>     env = factory.get_execution_environment()
>
>     env.add_java_source(consumer) \
>
>         .output()
>
>     env.execute()
>
>
> I grabbed a handful of jar files from the maven repos,
> namely |flink-connector-kafka-0.9_2.11-1.6.1.jar|, 
> |flink-connector-kafka-base_2.11-1.6.1.jar| and |kafka-clients-0.9.0.1.jar|and
> copied them in Flink's |lib| directory. Unless I misunderstood the
> documentation, this should suffice for Flink to load the kafka
> connector. Indeed, if I remove any of these jars the import fails, but
> this doesn't seem to be enough to actually invoke the plan. Adding a
> for loop to dynamically add these to |sys.path| didn't work either.
> Here's what gets printed in the console:
>
> Starting execution of program
>
> Failed to run plan: null
>
> Traceback (most recent call last):
>
>   File "", line 1, in 
>
>   File
> "/tmp/flink_streaming_plan_9cfed4d9-0288-429c-99ac-df02c86922ec/read_from_kafka.py",
> line 32, in main
>
>     at
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:267)
>
>     at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486)
>
>     at
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>
>     at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
>
>     at
> org.apache.flink.streaming.python.api.environment.PythonStreamExecutionEnvironment.execute(PythonStreamExecutionEnvironment.java:245)
>
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
>     at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
>     at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
>     at java.lang.reflect.Method.invoke(Method.java:498)
>
>
> org.apache.flink.client.program.ProgramInvocationException:
> org.apache.flink.client.program.ProgramInvocationException: Job
> failed. (JobID: bbcc0cb2c4fe6e3012d228b06b270eba)
>
>
> The program didn't contain a Flink job. Perhaps you forgot to call
> execute() on the execution environment.
>
>
> This is what I see in the logs:
>
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
> load user class:   
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
>
> ClassLoader info: URL ClassLoader:
>
>     file:
> '/tmp/blobStore-9f6930fa-f1cf-4851-a0bf-2e620391596f/job_ca486746e7feb42d2d162026b74e9935/blob_p-9321896d165fec27a617d44ad50e3ef09c3211d9-405ccc9b490fa1e1348f0a76b1a48887'
> (valid JAR)
>
> Class not resolvable through given classloader.
>
>     at
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:236)
>
>     at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:104)
>
>     at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
>
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>
>     at java.lang.Thread.run(Thread.java:748)
>
>
> Is there a way to fix this and make the connector available to Python?

Apache Flink: Kafka connector in Python streaming API, “Cannot load user class”

2018-10-11 Thread Kostas Evangelou
Hey all,

Thank you so much for your efforts. I've already posted this question on
stack overflow, but thought I should ask here as well.

I am trying out Flink's new Python streaming API and attempting to run my
script with ./flink-1.6.1/bin/pyflink-stream.sh examples/read_from_kafka.py.
The python script is fairly straightforward, I am just trying to consume
from an existing topic and send everything to stdout (or the *.out file in
the log directory where the output method emits data by default).

import glob

import os

import sys

from java.util import Properties

from org.apache.flink.streaming.api.functions.source import SourceFunction

from org.apache.flink.streaming.api.collector.selector import OutputSelector

from org.apache.flink.api.common.serialization import SimpleStringSchema


directories=['/home/user/flink/flink-1.6.1/lib']

for directory in directories:

for jar in glob.glob(os.path.join(directory,'*.jar')):

sys.path.append(jar)


from org.apache.flink.streaming.connectors.kafka import FlinkKafkaConsumer09


props = Properties()

config = {"bootstrap_servers": "localhost:9092",

  "group_id": "flink_test",

  "topics": ["TopicCategory-TopicName"]}

props.setProperty("bootstrap.servers", config['bootstrap_servers'])

props.setProperty("group_id", config['group_id'])

props.setProperty("zookeeper.connect", "localhost:2181")


def main(factory):

consumer = FlinkKafkaConsumer09([config["topics"]],
SimpleStringSchema(), props)


env = factory.get_execution_environment()

env.add_java_source(consumer) \

.output()

env.execute()

I grabbed a handful of jar files from the maven repos, namely
flink-connector-kafka-0.9_2.11-1.6.1.jar,
flink-connector-kafka-base_2.11-1.6.1.jar and kafka-clients-0.9.0.1.jar, and
copied them into Flink's lib directory. Unless I misunderstood the
documentation, this should suffice for Flink to load the kafka connector.
Indeed, if I remove any of these jars the import fails, but this doesn't
seem to be enough to actually invoke the plan. Adding a for loop to
dynamically add these to sys.path didn't work either. Here's what gets
printed in the console:

Starting execution of program

Failed to run plan: null

Traceback (most recent call last):

  File "", line 1, in 

  File
"/tmp/flink_streaming_plan_9cfed4d9-0288-429c-99ac-df02c86922ec/read_from_kafka.py",
line 32, in main

at
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:267)

at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486)

at
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)

at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)

at
org.apache.flink.streaming.python.api.environment.PythonStreamExecutionEnvironment.execute(PythonStreamExecutionEnvironment.java:245)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)


org.apache.flink.client.program.ProgramInvocationException:
org.apache.flink.client.program.ProgramInvocationException: Job failed.
(JobID: bbcc0cb2c4fe6e3012d228b06b270eba)


The program didn't contain a Flink job. Perhaps you forgot to call
execute() on the execution environment.

This is what I see in the logs:

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load
user class:
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09

ClassLoader info: URL ClassLoader:

file:
'/tmp/blobStore-9f6930fa-f1cf-4851-a0bf-2e620391596f/job_ca486746e7feb42d2d162026b74e9935/blob_p-9321896d165fec27a617d44ad50e3ef09c3211d9-405ccc9b490fa1e1348f0a76b1a48887'
(valid JAR)

Class not resolvable through given classloader.

at
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:236)

at
org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:104)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)

at java.lang.Thread.run(Thread.java:748)

Is there a way to fix this and make the connector available to Python?

Many thanks,
Kostas


Re: Flink Kafka connector not exist

2018-04-19 Thread Tzu-Li (Gordon) Tai
Hi Sebastien,

You need to add the dependency under a “dependencies” section, like so:


<dependencies>
    …
</dependencies>

Then it should be working.

I would also recommend using the Flink quickstart Maven templates [1], as they 
already have a well defined Maven project skeleton for Flink jobs.

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/java_api_quickstart.html

On 19 April 2018 at 10:17:11 PM, Lehuede sebastien (lehued...@gmail.com) wrote:

Hi Guys,

I have created a project with Maven to try to send data from Kafka to Flink. 
But when i try to build the project i have the following error :

[ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on 
project processing-app: Compilation failure: Compilation failure:
[ERROR] 
/env/mvn/test-0.0.1/processing-app/src/main/java/com/test/alpha/ReadFromKafka.java:[24,51]
 package org.apache.flink.streaming.connectors.kafka does not exist
[ERROR] 
/env/mvn/test-0.0.1/processing-app/src/main/java/com/test/alpha/ReadFromKafka.java:[52,63]
 cannot find symbol
[ERROR]   symbol:   class FlinkKafkaConsumer011

Here is my "pom.xml" configuration for Flink Kafka connector : 

                <properties>
                        <flink.version>1.4.2</flink.version>
                        <java.version>1.8</java.version>
                        <scala.binary.version>2.11</scala.binary.version>
                        <maven.compiler.source>${java.version}</maven.compiler.source>
                        <maven.compiler.target>${java.version}</maven.compiler.target>
                </properties>

                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
                        <version>${flink.version}</version>
                </dependency>

And here is the import line in my java file :

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

Can anyone help me with this issue?

Regards,
Sebastien

Flink Kafka connector not exist

2018-04-19 Thread Lehuede sebastien
Hi Guys,

I have created a project with Maven to try to send data from Kafka to
Flink. But when I try to build the project I have the following error:

[ERROR] Failed to execute goal
org.apache.maven.plugins:maven-compiler-plugin:3.1:compile
(default-compile) on project processing-app: Compilation failure:
Compilation failure:
[ERROR]
/env/mvn/test-0.0.1/processing-app/src/main/java/com/test/alpha/ReadFromKafka.java:[24,51]
package org.apache.flink.streaming.connectors.kafka does not exist
[ERROR]
/env/mvn/test-0.0.1/processing-app/src/main/java/com/test/alpha/ReadFromKafka.java:[52,63]
cannot find symbol
[ERROR]   symbol:   class FlinkKafkaConsumer011

Here is my "pom.xml" configuration for Flink Kafka connector :

<properties>
    <flink.version>1.4.2</flink.version>
    <java.version>1.8</java.version>
    <scala.binary.version>2.11</scala.binary.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
</properties>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

And here is the import line in my java file :

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

Can anyone help me with this issue?

Regards,
Sebastien


Re: Flink - Kafka Connector

2018-04-13 Thread Alexandru Gutan
You will be able to use it. Kafka 1.1.0 is backwards compatible with the
v1.0, 0.11 and 0.10 connectors as far as I know.

On 13 April 2018 at 15:12, Lehuede sebastien  wrote:

> Hi All,
>
> I'm very new in Flink (And on Streaming Application topic in general) so
> sorry if for my newbie question.
>
> I plan to do some test with Kafka and Flink and use the Kafka connector
> for that.
>
> I find information on this page : https://ci.apache.org/
> projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html
>
> Can you confirm i will not be able to use latest version of Kafka (1.1.0),
> Flink 1.4 and this connector to do my test ? The connector is compatible
> with Kafka 0.11 and under only ?
>
> Regards,
> Sebastien.
>
>
>


Flink - Kafka Connector

2018-04-13 Thread Lehuede sebastien
Hi All,

I'm very new to Flink (and to streaming applications in general), so
sorry for my newbie question.

I plan to do some test with Kafka and Flink and use the Kafka connector for
that.

I find information on this page :
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html

Can you confirm that I will not be able to use the latest version of Kafka (1.1.0)
with Flink 1.4 and this connector for my test? Is the connector compatible
with Kafka 0.11 and under only?

Regards,
Sebastien.


Re: Flink kafka connector with JAAS configurations crashed

2018-03-19 Thread Nico Kruber
Hi,
I'm no expert on Kafka here, but as the tasks are run on the worker
nodes (where the TaskManagers are run), please double-check whether the
file under /data/apps/spark/kafka_client_jaas.conf on these nodes also
contains the same configuration as on the node running the JobManager,
i.e. an appropriate entry for 'KafkaClient'.


Regards
Nico

On 13/03/18 08:42, sundy wrote:
> 
> Hi ,all 
> 
> I use the code below to set kafka JASS config,   the
> serverConfig.jasspath is  /data/apps/spark/kafka_client_jaas.conf,   but
> on flink standalone deployment, it crashs. I am sure the
> kafka_client_jass.conf is valid, cause other applications(Spark
> streaming) are still working fine with it. So I think it may be not the
> problem caused by kafka 0.10 client.
> 
> System.setProperty("java.security.auth.login.config", serverConfig.jasspath);
> properties.setProperty("security.protocol", "SASL_PLAINTEXT");
> properties.setProperty("sasl.mechanism", "PLAIN");
> 
> 
> Exceptions msgs are:
> 
> org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:717)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:597)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:579)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.initializeConnections(Kafka09PartitionDiscoverer.java:56)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:91)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:422)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.KafkaException: 
> java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in 
> the JAAS configuration. System property 'java.security.auth.login.config' is 
> /data/apps/spark/kafka_client_jaas.conf
>   at 
> org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:94)
>   at 
> org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:93)
>   at 
> org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:51)
>   at 
> org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:84)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:657)
>   ... 11 more
> Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' 
> entry in the JAAS configuration. System property 
> 'java.security.auth.login.config' is /data/apps/spark/kafka_client_jaas.conf
>   at 
> org.apache.kafka.common.security.JaasUtils.defaultJaasConfig(JaasUtils.java:85)
>   at 
> org.apache.kafka.common.security.JaasUtils.jaasConfig(JaasUtils.java:67)
>   at 
> org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:85)
>   ... 15 more
> 
> 
> 
> File content looks like below:
> 
> KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule
> required username="admin" password=“xxx"; };
> 
> It seems like the kafka_client_jaas.conf file has been read, but the
> KafkaClient entry could not be resolved. That’s very strange, other
> applications with the same config are working fine. And I wrote a simple
> Java code to test the the file, it works fine too.
> 
> 
> public static void main(String[] args) {
>   Map maps = new HashMap<>();
>   System.setProperty("java.security.auth.login.config",
> "/data/apps/spark/kafka_client_jaas.conf");
>   Configuration jassConfig = JaasUtils.jaasConfig(LoginType.CLIENT, maps);
>   AppConfigurationEntry object[] =
> jassConfig.getAppConfigurationEntry("KafkaClient");
>   for(AppConfigurationEntry entry : object){
>     System.out.println(entry.getOptions());
>   }
> }
> 
> 
> 
> 
>  
> 
> 
> 





Flink kafka connector with JAAS configurations crashed

2018-03-13 Thread sundy

Hi ,all 

I use the code below to set the Kafka JAAS config. The serverConfig.jasspath is
/data/apps/spark/kafka_client_jaas.conf, but on a Flink standalone deployment
it crashes. I am sure the kafka_client_jaas.conf is valid, because other
applications (Spark Streaming) are still working fine with it. So I think it may
not be a problem caused by the Kafka 0.10 client.
System.setProperty("java.security.auth.login.config", serverConfig.jasspath);
properties.setProperty("security.protocol", "SASL_PLAINTEXT");
properties.setProperty("sasl.mechanism", "PLAIN");

Exceptions msgs are:

org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at 
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:717)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:597)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:579)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.initializeConnections(Kafka09PartitionDiscoverer.java:56)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:91)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:422)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: 
java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the 
JAAS configuration. System property 'java.security.auth.login.config' is 
/data/apps/spark/kafka_client_jaas.conf
at 
org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:94)
at 
org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:93)
at 
org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:51)
at 
org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:84)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:657)
... 11 more
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' 
entry in the JAAS configuration. System property 
'java.security.auth.login.config' is /data/apps/spark/kafka_client_jaas.conf
at 
org.apache.kafka.common.security.JaasUtils.defaultJaasConfig(JaasUtils.java:85)
at 
org.apache.kafka.common.security.JaasUtils.jaasConfig(JaasUtils.java:67)
at 
org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:85)
... 15 more


The file content looks like this:

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="xxx";
};

It seems like the kafka_client_jaas.conf file has been read, but the
KafkaClient entry could not be resolved. That's very strange; other
applications with the same config are working fine. I also wrote a simple Java
program to test the file, and it works fine too:


public static void main(String[] args) {
  Map<String, Object> maps = new HashMap<>();
  System.setProperty("java.security.auth.login.config",
      "/data/apps/spark/kafka_client_jaas.conf");
  Configuration jaasConfig = JaasUtils.jaasConfig(LoginType.CLIENT, maps);
  AppConfigurationEntry[] entries = jaasConfig.getAppConfigurationEntry("KafkaClient");
  for (AppConfigurationEntry entry : entries) {
    System.out.println(entry.getOptions());
  }
}
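
A note for readers of the archive: the sketch below is only an illustration under
assumptions (Flink's FlinkKafkaConsumer010, SASL/PLAIN, hypothetical broker/topic/group
names), not a confirmed fix for this report. On a standalone cluster the JAAS system
property also has to be present in the TaskManager JVMs before any JAAS configuration is
loaded there, for example via env.java.opts in flink-conf.yaml, rather than only in the
main() method that builds the job graph.

// Sketch only. Assumptions: FlinkKafkaConsumer010, SASL/PLAIN, and that
// /data/apps/spark/kafka_client_jaas.conf is readable on every TaskManager host.
// The JAAS property can be set at JVM startup in flink-conf.yaml, for example:
//   env.java.opts: -Djava.security.auth.login.config=/data/apps/spark/kafka_client_jaas.conf
import java.util.Properties;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class SaslKafkaSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092");   // hypothetical broker list
        props.setProperty("group.id", "sasl-test");                // hypothetical group id
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "PLAIN");

        env.addSource(new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props))
           .print();
        env.execute("SASL Kafka source sketch");
    }
}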

 



 





Re: Flink-Kafka connector - partition offsets for a given timestamp?

2017-12-12 Thread Yang, Connie
Thanks, Gordon!  Will keep an eye on that!

Connie

From: "Tzu-Li (Gordon) Tai" <tzuli...@apache.org>
Date: Monday, December 11, 2017 at 5:29 PM
To: Connie Yang <cy...@ebay.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Flink-Kafka connector - partition offsets for a given timestamp?

Hi Connie,

We do have a pull request for the feature, that should almost be ready after 
rebasing: 
https://github.com/apache/flink/pull/3915,
 JIRA: 
https://issues.apache.org/jira/browse/FLINK-6352.
This means, of course, that the feature isn't part of any release yet. We can 
try to make sure this happens for Flink 1.5, for which the proposed release 
date is around February 2018.

Cheers,
Gordon

On Tue, Dec 12, 2017 at 3:53 AM, Yang, Connie 
<cy...@ebay.com<mailto:cy...@ebay.com>> wrote:
Hi,

Does Flink-Kafka connector allow job graph to consume topoics/partitions from a 
specific timestamp?

https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L469
 seems to suggest that a job graph can only start from an earliest, latest or a 
set of offsets.

KafkaConsumer API, 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1598,
 gives us a way to find partition offsets based on a timestamp.

Thanks
Connie



Re: Flink-Kafka connector - partition offsets for a given timestamp?

2017-12-11 Thread Tzu-Li (Gordon) Tai
Hi Connie,

We do have a pull request for the feature, that should almost be ready
after rebasing: https://github.com/apache/flink/pull/3915, JIRA:
https://issues.apache.org/jira/browse/FLINK-6352.
This means, of course, that the feature isn't part of any release yet. We
can try to make sure this happens for Flink 1.5, for which the proposed
release date is around February 2018.

Cheers,
Gordon

On Tue, Dec 12, 2017 at 3:53 AM, Yang, Connie <cy...@ebay.com> wrote:

> Hi,
>
>
>
> Does Flink-Kafka connector allow job graph to consume topoics/partitions
> from a specific timestamp?
>
>
>
> https://github.com/apache/flink/blob/master/flink-
> connectors/flink-connector-kafka-base/src/main/java/org/
> apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L469
> seems to suggest that a job graph can only start from an earliest, latest
> or a set of offsets.
>
>
>
> KafkaConsumer API, https://github.com/apache/kafka/blob/trunk/clients/src/
> main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1598,
> gives us a way to find partition offsets based on a timestamp.
>
>
>
> Thanks
>
> Connie
>
>


Flink-Kafka connector - partition offsets for a given timestamp?

2017-12-11 Thread Yang, Connie
Hi,

Does the Flink-Kafka connector allow a job graph to consume topics/partitions from a 
specific timestamp?

https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L469
 seems to suggest that a job graph can only start from the earliest offset, the 
latest offset, or a specific set of offsets.

KafkaConsumer API, 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1598,
 gives us a way to find partition offsets based on a timestamp.

Thanks
Connie
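
Until FLINK-6352 is part of a release, one possible workaround is sketched below. It is
only an illustration under assumptions (kafka-clients 0.10.1+ for offsetsForTimes, Flink
1.3+ for setStartFromSpecificOffsets, hypothetical topic/broker names): resolve the
offsets for the timestamp with a plain KafkaConsumer first, then hand them to the Flink
consumer as specific start offsets.

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class StartFromTimestampSketch {
    public static void main(String[] args) throws Exception {
        long startTimestamp = 1512950400000L;  // hypothetical start time (ms since epoch)
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");  // hypothetical broker
        props.setProperty("group.id", "timestamp-bootstrap");       // hypothetical group id

        // 1) Ask the brokers which offset corresponds to the timestamp in each partition.
        Map<KafkaTopicPartition, Long> startOffsets = new HashMap<>();
        try (KafkaConsumer<byte[], byte[]> probe =
                 new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            Map<TopicPartition, Long> query = new HashMap<>();
            probe.partitionsFor("my-topic").forEach(p ->
                query.put(new TopicPartition(p.topic(), p.partition()), startTimestamp));
            Map<TopicPartition, OffsetAndTimestamp> resolved = probe.offsetsForTimes(query);
            resolved.forEach((tp, oat) -> {
                if (oat != null) {  // null means no message at or after the timestamp
                    startOffsets.put(new KafkaTopicPartition(tp.topic(), tp.partition()), oat.offset());
                }
            });
        }

        // 2) Start the Flink consumer from the resolved offsets.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkKafkaConsumer010<String> source =
            new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);
        source.setStartFromSpecificOffsets(startOffsets);
        env.addSource(source).print();
        env.execute("start-from-timestamp sketch");
    }
}

Note that such start positions only apply when the job is not restored from a checkpoint
or savepoint.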


Re: Issue Faced: Not able to get the consumer offsets from Kafka when using Flink with Flink-Kafka Connector

2017-02-07 Thread MAHESH KUMAR
Thanks for the prompt reply

On Tue, Feb 7, 2017 at 10:38 AM, Robert Metzger <rmetz...@apache.org> wrote:

> Hi Mahesh,
>
> this is a known limitation of Apache Kafka: https://www.mail-
> archive.com/us...@kafka.apache.org/msg22595.html
> You could implement a tool that is manually retrieving the latest offset
> for the group from the __offsets topic.
>
> On Tue, Feb 7, 2017 at 6:24 PM, MAHESH KUMAR <r.mahesh.kumar@gmail.com
> > wrote:
>
>> Hi Team,
>>
>> Kindly let me know if I am doing something wrong.
>>
>> Kafka Version - kafka_2.11-0.10.1.1
>> Flink Version - flink-1.2.0
>> Using the latest Kafka Connector - FlinkKafkaConsumer010 -
>> flink-connector-kafka-0.10_2.11
>>
>> Issue Faced: Not able to get the consumer offsets from Kafka when using
>> Flink with Flink-Kafka Connector
>>
>> $ /work/kafka_2.11-0.10.1.1/bin/kafka-consumer-groups.sh
>> --bootstrap-server localhost:9092  --list
>> console-consumer-19886
>> console-consumer-89637
>> $
>>
>> It does not show the consumer group "test"
>>
>> For a group that does not exist, the message is as follows:
>>
>> $ /work/kafka_2.11-0.10.1.1/bin/kafka-consumer-groups.sh
>> --bootstrap-server localhost:9092 --group test1 --describe
>> Consumer group `test1` does not exist.
>> $
>>
>> For the "test" group the error message is as follows
>>
>> $ /work/kafka_2.11-0.10.1.1/bin/kafka-consumer-groups.sh
>> --bootstrap-server localhost:9092 --group test --describe
>> Error while executing consumer group command Group test with protocol
>> type '' is not a valid consumer group
>> java.lang.IllegalArgumentException: Group test with protocol type '' is
>> not a valid consumer group
>> at kafka.admin.AdminClient.describeConsumerGroup(AdminClient.scala:152)
>> at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.d
>> escribeGroup(ConsumerGroupCommand.scala:308)
>> at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.
>> describe(ConsumerGroupCommand.scala:89)
>> at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.d
>> escribe(ConsumerGroupCommand.scala:296)
>> at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:68)
>> at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
>> $
>>
>> The error is from the AdminClient.scala (https://github.com/apache/kaf
>> ka/blob/trunk/core/src/main/scala/kafka/admin/AdminClient.scala)
>>
>> if (metadata.state != "Dead" && metadata.state != "Empty" &&
>> metadata.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
>>   throw new IllegalArgumentException(s"Consumer Group $groupId with
>> protocol type '${metadata.protocolType}' is not a valid consumer group")
>>
>> Code:
>>
>> import java.util.Properties;
>> import org.apache.flink.streaming.api.datastream.DataStreamSource;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEn
>> vironment;
>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
>>
>> public class KafkaFlinkOutput {
>> private static final String LOCAL_ZOOKEEPER_HOST = "localhost:2181";
>> private static final String LOCAL_KAFKA_BROKER = "localhost:9092";
>> private static final String CONSUMER_GROUP = "test";
>>
>> public KafkaFlinkOutput() {
>> }
>>
>> public static void main(String[] args) throws Exception {
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.get
>> ExecutionEnvironment();
>> Properties kafkaProps = new Properties();
>> kafkaProps.setProperty("zookeeper.connect", "localhost:2181");
>> kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
>> kafkaProps.setProperty("group.id", "test");
>> kafkaProps.setProperty("auto.offset.reset", "latest");
>> env.enableCheckpointing(1000L);
>> FlinkKafkaConsumer010 consumer = new
>> FlinkKafkaConsumer010("testIn1", new SimpleStringSchema(), kafkaProps);
>> DataStreamSource consumerData = env.addSource(consumer);
>> consumerData.print();
>> System.out.println("Streaming Kafka in Flink");
>> env.execute("Starting now!");
>> }
>> }
>>
>> Debug Logs that

Re: Issue Faced: Not able to get the consumer offsets from Kafka when using Flink with Flink-Kafka Connector

2017-02-07 Thread Robert Metzger
Hi Mahesh,

this is a known limitation of Apache Kafka:
https://www.mail-archive.com/users@kafka.apache.org/msg22595.html
You could implement a tool that is manually retrieving the latest offset
for the group from the __offsets topic.
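
A rough sketch of such a tool follows. Rather than parsing the __consumer_offsets topic
directly, it asks the coordinator for the committed offset through the consumer API
(assumptions: kafka-clients 0.10.x, the group id used by the Flink job, and a
hypothetical topic name):

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class CommittedOffsetsTool {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test");  // the group id configured on the Flink consumer
        try (KafkaConsumer<byte[], byte[]> consumer =
                 new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            consumer.partitionsFor("testIn1").forEach(p -> {
                TopicPartition tp = new TopicPartition(p.topic(), p.partition());
                OffsetAndMetadata committed = consumer.committed(tp);  // null if nothing was committed
                System.out.println(tp + " -> "
                    + (committed == null ? "no committed offset" : committed.offset()));
            });
        }
    }
}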

On Tue, Feb 7, 2017 at 6:24 PM, MAHESH KUMAR <r.mahesh.kumar@gmail.com>
wrote:

> Hi Team,
>
> Kindly let me know if I am doing something wrong.
>
> Kafka Version - kafka_2.11-0.10.1.1
> Flink Version - flink-1.2.0
> Using the latest Kafka Connector - FlinkKafkaConsumer010 -
> flink-connector-kafka-0.10_2.11
>
> Issue Faced: Not able to get the consumer offsets from Kafka when using
> Flink with Flink-Kafka Connector
>
> $ /work/kafka_2.11-0.10.1.1/bin/kafka-consumer-groups.sh
> --bootstrap-server localhost:9092  --list
> console-consumer-19886
> console-consumer-89637
> $
>
> It does not show the consumer group "test"
>
> For a group that does not exist, the message is as follows:
>
> $ /work/kafka_2.11-0.10.1.1/bin/kafka-consumer-groups.sh
> --bootstrap-server localhost:9092 --group test1 --describe
> Consumer group `test1` does not exist.
> $
>
> For the "test" group the error message is as follows
>
> $ /work/kafka_2.11-0.10.1.1/bin/kafka-consumer-groups.sh
> --bootstrap-server localhost:9092 --group test --describe
> Error while executing consumer group command Group test with protocol type
> '' is not a valid consumer group
> java.lang.IllegalArgumentException: Group test with protocol type '' is
> not a valid consumer group
> at kafka.admin.AdminClient.describeConsumerGroup(AdminClient.scala:152)
> at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.d
> escribeGroup(ConsumerGroupCommand.scala:308)
> at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.
> describe(ConsumerGroupCommand.scala:89)
> at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.d
> escribe(ConsumerGroupCommand.scala:296)
> at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:68)
> at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
> $
>
> The error is from the AdminClient.scala (https://github.com/apache/kaf
> ka/blob/trunk/core/src/main/scala/kafka/admin/AdminClient.scala)
>
> if (metadata.state != "Dead" && metadata.state != "Empty" &&
> metadata.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
>   throw new IllegalArgumentException(s"Consumer Group $groupId with
> protocol type '${metadata.protocolType}' is not a valid consumer group")
>
> Code:
>
> import java.util.Properties;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEn
> vironment;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
>
> public class KafkaFlinkOutput {
> private static final String LOCAL_ZOOKEEPER_HOST = "localhost:2181";
> private static final String LOCAL_KAFKA_BROKER = "localhost:9092";
> private static final String CONSUMER_GROUP = "test";
>
> public KafkaFlinkOutput() {
> }
>
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env = StreamExecutionEnvironment.get
> ExecutionEnvironment();
> Properties kafkaProps = new Properties();
> kafkaProps.setProperty("zookeeper.connect", "localhost:2181");
> kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
> kafkaProps.setProperty("group.id", "test");
> kafkaProps.setProperty("auto.offset.reset", "latest");
> env.enableCheckpointing(1000L);
> FlinkKafkaConsumer010 consumer = new FlinkKafkaConsumer010("testIn1",
> new SimpleStringSchema(), kafkaProps);
> DataStreamSource consumerData = env.addSource(consumer);
> consumerData.print();
> System.out.println("Streaming Kafka in Flink");
> env.execute("Starting now!");
> }
> }
>
> Debug Logs that show that Kafka Connector does commit to Kafka:
>
> 2017-02-07 09:52:38,851 INFO  org.apache.kafka.clients.consumer.ConsumerConfig
>  - ConsumerConfig values:
> metric.reporters = []
> metadata.max.age.ms = 30
> partition.assignment.strategy = [org.apache.kafka.clients.cons
> umer.RangeAssignor]
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> max.partition.fetch.bytes = 1048576
> bootstrap.servers = [local

Issue Faced: Not able to get the consumer offsets from Kafka when using Flink with Flink-Kafka Connector

2017-02-07 Thread MAHESH KUMAR
Hi Team,

Kindly let me know if I am doing something wrong.

Kafka Version - kafka_2.11-0.10.1.1
Flink Version - flink-1.2.0
Using the latest Kafka Connector - FlinkKafkaConsumer010 -
flink-connector-kafka-0.10_2.11

Issue Faced: Not able to get the consumer offsets from Kafka when using
Flink with Flink-Kafka Connector

$ /work/kafka_2.11-0.10.1.1/bin/kafka-consumer-groups.sh --bootstrap-server
localhost:9092  --list
console-consumer-19886
console-consumer-89637
$

It does not show the consumer group "test"

For a group that does not exist, the message is as follows:

$ /work/kafka_2.11-0.10.1.1/bin/kafka-consumer-groups.sh --bootstrap-server
localhost:9092 --group test1 --describe
Consumer group `test1` does not exist.
$

For the "test" group the error message is as follows

$ /work/kafka_2.11-0.10.1.1/bin/kafka-consumer-groups.sh --bootstrap-server
localhost:9092 --group test --describe
Error while executing consumer group command Group test with protocol type
'' is not a valid consumer group
java.lang.IllegalArgumentException: Group test with protocol type '' is not
a valid consumer group
at kafka.admin.AdminClient.describeConsumerGroup(AdminClient.scala:152)
at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describeGroup(
ConsumerGroupCommand.scala:308)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.
describe(ConsumerGroupCommand.scala:89)
at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.
describe(ConsumerGroupCommand.scala:296)
at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:68)
at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
$

The error comes from AdminClient.scala (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/AdminClient.scala)

if (metadata.state != "Dead" && metadata.state != "Empty" &&
metadata.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
  throw new IllegalArgumentException(s"Consumer Group $groupId with
protocol type '${metadata.protocolType}' is not a valid consumer group")

Code:

import java.util.Properties;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaFlinkOutput {
    private static final String LOCAL_ZOOKEEPER_HOST = "localhost:2181";
    private static final String LOCAL_KAFKA_BROKER = "localhost:9092";
    private static final String CONSUMER_GROUP = "test";

    public KafkaFlinkOutput() {
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("zookeeper.connect", "localhost:2181");
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProps.setProperty("group.id", "test");
        kafkaProps.setProperty("auto.offset.reset", "latest");
        env.enableCheckpointing(1000L);
        FlinkKafkaConsumer010<String> consumer =
            new FlinkKafkaConsumer010<>("testIn1", new SimpleStringSchema(), kafkaProps);
        DataStreamSource<String> consumerData = env.addSource(consumer);
        consumerData.print();
        System.out.println("Streaming Kafka in Flink");
        env.execute("Starting now!");
    }
}

Debug Logs that show that Kafka Connector does commit to Kafka:

2017-02-07 09:52:38,851 INFO  org.apache.kafka.clients.consumer.ConsumerConfig
 - ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 30
partition.assignment.strategy = [org.apache.kafka.clients.
consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [localhost:9092]
ssl.keystore.type = JKS
enable.auto.commit = true
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id =
ssl.endpoint.identification.algorithm = null
max.poll.records = 2147483647
check.crcs = true
request.timeout.ms = 4
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class org.apache.kafka.common.serialization.
Byte

Re: Zeppelin: Flink Kafka Connector

2017-01-18 Thread Fabian Hueske
Ah, OK :-)
Thanks for reporting back!

Cheers, Fabian

2017-01-17 17:50 GMT+01:00 Neil Derraugh <
neil.derra...@intellifylearning.com>:

> I re-read that enough times and it finally made sense. I wasn’t paying
> attention and thought 0.10.2 was the Kafka version —which hasn’t been
> released yet either - ha :(.  I switched to a recent version and it’s all
> good. :)
>
> Thanks !
> Neil
>
> > On Jan 17, 2017, at 11:14 AM, Neil Derraugh <neil.derraugh@
> intellifylearning.com> wrote:
> >
> > Hi Timo & Fabian,
> >
> > Thanks for replying.  I'm using Zeppelin built off master.  And Flink 1.2
> > built off the release-1.2 branch.  Is that the right branch?
> >
> > Neil
> >
> >
> >
> > --
> > View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Zeppelin-Flink-
> Kafka-Connector-tp3p9.html
> > Sent from the Apache Flink User Mailing List archive. mailing list
> archive at Nabble.com.
>
>


Re: Zeppelin: Flink Kafka Connector

2017-01-17 Thread Neil Derraugh
I re-read that enough times and it finally made sense. I wasn’t paying 
attention and thought 0.10.2 was the Kafka version —which hasn’t been released 
yet either - ha :(.  I switched to a recent version and it’s all good. :)

Thanks !
Neil

> On Jan 17, 2017, at 11:14 AM, Neil Derraugh 
> <neil.derra...@intellifylearning.com> wrote:
> 
> Hi Timo & Fabian,
> 
> Thanks for replying.  I'm using Zeppelin built off master.  And Flink 1.2
> built off the release-1.2 branch.  Is that the right branch?
> 
> Neil
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Zeppelin-Flink-Kafka-Connector-tp3p9.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Re: Zeppelin: Flink Kafka Connector

2017-01-17 Thread Neil Derraugh
Hi Timo & Fabian,

Thanks for replying.  I'm using Zeppelin built off master.  And Flink 1.2
built off the release-1.2 branch.  Is that the right branch?

Neil



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Zeppelin-Flink-Kafka-Connector-tp3p9.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Zeppelin: Flink Kafka Connector

2017-01-17 Thread Fabian Hueske
The connectors are included in the release and available as individual
Maven artifacts.
So Flink 1.2.0 will provide a flink-connector-kafka-0.10 artifact (with
version 1.2.0).
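
For example, assuming Scala 2.11 and the 0.10 connector, the coordinate to add under the
Flink interpreter's Dependencies in Zeppelin would be (the trailing number is the Flink
version; the 0.10 in the artifact id is the Kafka version):

org.apache.flink:flink-connector-kafka-0.10_2.11:1.2.0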

2017-01-17 16:22 GMT+01:00 Foster, Craig <foscr...@amazon.com>:

> Are connectors being included in the 1.2.0 release or do you mean Kafka
> specifically?
>
>
>
> *From: *Fabian Hueske <fhue...@gmail.com>
> *Reply-To: *"user@flink.apache.org" <user@flink.apache.org>
> *Date: *Tuesday, January 17, 2017 at 7:10 AM
> *To: *"user@flink.apache.org" <user@flink.apache.org>
> *Subject: *Re: Zeppelin: Flink Kafka Connector
>
>
>
> One thing to add: Flink 1.2.0 has not been release yet.
> The FlinkKafkaConsumer010 is only available in a SNAPSHOT release or the
> first release candidate (RC0).
>
> Best, Fabian
>
>
>
> 2017-01-17 16:08 GMT+01:00 Timo Walther <twal...@apache.org>:
>
> You are using an old version of Flink (0.10.2). A FlinkKafkaConsumer010
> was not present at that time. You need to upgrade to Flink 1.2.
>
> Timo
>
>
> Am 17/01/17 um 15:58 schrieb Neil Derraugh:
>
>
>
> This is really a Zeppelin question, and I’ve already posted to the user
> list there.  I’m just trying to draw in as many relevant eyeballs as
> possible.  If you can help please reply on the Zeppelin mailing list.
>
> In my Zeppelin notebook I’m having a problem importing the Kafka streaming
> library for Flink.
>
> I added org.apache.flink:flink-connector-kafka_2.11:0.10.2 to the
> Dependencies on the Flink interpreter.
>
> The Flink interpreter runs code, just not if I have the following import.
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
>
> I get this error:
> :72: error: object FlinkKafkaConsumer010 is not a member of
> package org.apache.flink.streaming.connectors.kafka
> import org.apache.flink.streaming.connectors.kafka.
> FlinkKafkaConsumer010
>
> Am I doing something wrong here?
>
> Neil
>
>
>
>
>


Re: Zeppelin: Flink Kafka Connector

2017-01-17 Thread Foster, Craig
Are connectors being included in the 1.2.0 release or do you mean Kafka 
specifically?

From: Fabian Hueske <fhue...@gmail.com>
Reply-To: "user@flink.apache.org" <user@flink.apache.org>
Date: Tuesday, January 17, 2017 at 7:10 AM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Zeppelin: Flink Kafka Connector

One thing to add: Flink 1.2.0 has not been release yet.
The FlinkKafkaConsumer010 is only available in a SNAPSHOT release or the first 
release candidate (RC0).
Best, Fabian

2017-01-17 16:08 GMT+01:00 Timo Walther 
<twal...@apache.org<mailto:twal...@apache.org>>:
You are using an old version of Flink (0.10.2). A FlinkKafkaConsumer010 was not 
present at that time. You need to upgrade to Flink 1.2.

Timo


On 17/01/17 at 15:58, Neil Derraugh wrote:

This is really a Zeppelin question, and I’ve already posted to the user list 
there.  I’m just trying to draw in as many relevant eyeballs as possible.  If 
you can help please reply on the Zeppelin mailing list.

In my Zeppelin notebook I’m having a problem importing the Kafka streaming 
library for Flink.

I added org.apache.flink:flink-connector-kafka_2.11:0.10.2 to the Dependencies 
on the Flink interpreter.

The Flink interpreter runs code, just not if I have the following import.
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

I get this error:
:72: error: object FlinkKafkaConsumer010 is not a member of package 
org.apache.flink.streaming.connectors.kafka
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

Am I doing something wrong here?

Neil




Re: Zeppelin: Flink Kafka Connector

2017-01-17 Thread Fabian Hueske
One thing to add: Flink 1.2.0 has not been released yet.
The FlinkKafkaConsumer010 is only available in a SNAPSHOT release or the
first release candidate (RC0).

Best, Fabian

2017-01-17 16:08 GMT+01:00 Timo Walther :

> You are using an old version of Flink (0.10.2). A FlinkKafkaConsumer010
> was not present at that time. You need to upgrade to Flink 1.2.
>
> Timo
>
>
> Am 17/01/17 um 15:58 schrieb Neil Derraugh:
>
> This is really a Zeppelin question, and I’ve already posted to the user
>> list there.  I’m just trying to draw in as many relevant eyeballs as
>> possible.  If you can help please reply on the Zeppelin mailing list.
>>
>> In my Zeppelin notebook I’m having a problem importing the Kafka
>> streaming library for Flink.
>>
>> I added org.apache.flink:flink-connector-kafka_2.11:0.10.2 to the
>> Dependencies on the Flink interpreter.
>>
>> The Flink interpreter runs code, just not if I have the following import.
>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
>>
>> I get this error:
>> :72: error: object FlinkKafkaConsumer010 is not a member of
>> package org.apache.flink.streaming.connectors.kafka
>> import org.apache.flink.streaming.con
>> nectors.kafka.FlinkKafkaConsumer010
>>
>> Am I doing something wrong here?
>>
>> Neil
>>
>
>
>


Re: Zeppelin: Flink Kafka Connector

2017-01-17 Thread Timo Walther
You are using an old version of Flink (0.10.2). A FlinkKafkaConsumer010 
was not present at that time. You need to upgrade to Flink 1.2.


Timo


On 17/01/17 at 15:58, Neil Derraugh wrote:

This is really a Zeppelin question, and I’ve already posted to the user list 
there.  I’m just trying to draw in as many relevant eyeballs as possible.  If 
you can help please reply on the Zeppelin mailing list.

In my Zeppelin notebook I’m having a problem importing the Kafka streaming 
library for Flink.

I added org.apache.flink:flink-connector-kafka_2.11:0.10.2 to the Dependencies 
on the Flink interpreter.

The Flink interpreter runs code, just not if I have the following import.
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

I get this error:
:72: error: object FlinkKafkaConsumer010 is not a member of package 
org.apache.flink.streaming.connectors.kafka
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

Am I doing something wrong here?

Neil





Zeppelin: Flink Kafka Connector

2017-01-17 Thread Neil Derraugh
This is really a Zeppelin question, and I’ve already posted to the user list 
there.  I’m just trying to draw in as many relevant eyeballs as possible.  If 
you can help please reply on the Zeppelin mailing list.

In my Zeppelin notebook I’m having a problem importing the Kafka streaming 
library for Flink.

I added org.apache.flink:flink-connector-kafka_2.11:0.10.2 to the Dependencies 
on the Flink interpreter.

The Flink interpreter runs code, just not if I have the following import.
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

I get this error:
:72: error: object FlinkKafkaConsumer010 is not a member of package 
org.apache.flink.streaming.connectors.kafka
   import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

Am I doing something wrong here?

Neil

Re: Flink Kafka Connector behaviour on leader change and broker upgrade

2016-11-06 Thread Robert Metzger
Hi,
yes, the Flink Kafka connector for Kafka 0.8 handles broker leader changes
without failing. The SimpleConsumer provided by Kafka 0.8 doesn't handle
that.

The 0.9 Flink Kafka consumer also supports broker leader changes
transparently.
If you keep using the Flink Kafka 0.8 connector with a 0.9 broker, you will
need to change the protocol version of the 0.9 broker, so that the old
client can talk to it. I would recommend using matching consumer and broker
versions.

Regards,
Robert
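
As an illustration of matching versions (assuming Scala 2.11 builds; replace
<flink-version> with whichever Flink release is in use), moving the brokers from 0.8 to
0.9 would go together with switching the connector artifact:

org.apache.flink:flink-connector-kafka-0.8_2.11:<flink-version>   (0.8 brokers)
org.apache.flink:flink-connector-kafka-0.9_2.11:<flink-version>   (0.9 brokers)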


On Fri, Nov 4, 2016 at 11:19 AM, Janardhan Reddy <
janardhan.re...@olacabs.com> wrote:

> HI,
>
> Does the flink kafka connector 0.8.2 handle broker's leader change
> gracefully since simple kafka consumer should be handling leader changes
> for a partition.
>
> How would the consumer behave when upgrading the brokers from 0.8 to 0.9.
>
>
> Thanks
>


Flink Kafka Connector behaviour on leader change and broker upgrade

2016-11-04 Thread Janardhan Reddy
HI,

Does the Flink Kafka connector for Kafka 0.8.2 handle a broker leader change
gracefully, given that the SimpleConsumer should be handling leader changes for
a partition?

How would the consumer behave when upgrading the brokers from 0.8 to 0.9?


Thanks


Re: flink-kafka-connector offset management

2016-05-20 Thread Ufuk Celebi
Hey Arun!

How did you configure your Kafka source? If the offset has been
committed and you configured the source to read from the latest
offset, the message should not be re-processed.

– Ufuk
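
For illustration, a minimal sketch of a source set up so that offsets are committed back
to Kafka on completed checkpoints, with auto.offset.reset only used when no committed
offset exists (assumptions: the 0.9 connector of that era and hypothetical
broker/topic/group names; exact restart behaviour can differ across connector versions):

import java.util.Properties;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ResumeFromCommittedOffsetsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000L);  // offsets are committed on completed checkpoints

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");  // hypothetical broker
        props.setProperty("group.id", "my-group");                  // committed offsets are stored per group
        props.setProperty("auto.offset.reset", "latest");           // fallback when no committed offset exists

        env.addSource(new FlinkKafkaConsumer09<>("my-topic", new SimpleStringSchema(), props))
           .print();
        env.execute("resume-from-committed-offsets sketch");
    }
}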


On Fri, May 13, 2016 at 2:19 PM, Arun Balan <arunba...@gmail.com> wrote:
> Hi, I am trying to use the flink-kafka-connector and I notice that every
> time I restart my application it re-reads the last message on the kafka
> topic. So if the latest offset on the topic is 10, then when the application
> is restarted, kafka-connector will re-read message 10. Why is this the
> behavior? I would assume that the last message has already been read and
> offset committed. I require that messages that are already processed from
> the topic not be reprocessed. Any insight would be helpful.
>
> Thanks
> Arun Balan


flink-kafka-connector offset management

2016-05-13 Thread Arun Balan
Hi, I am trying to use the flink-kafka-connector and I notice that every
time I restart my application it re-reads the last message on the kafka
topic. So if the latest offset on the topic is 10, then when the
application is restarted, kafka-connector will re-read message 10. Why is
this the behavior? I would assume that the last message has already been
read and offset committed. I require that messages that are already
processed from the topic not be reprocessed. Any insight would be helpful.

Thanks
Arun Balan