How to test a stateful streaming pipeline?

2022-02-01 Thread Marcin Kuthan
Hi

This is my first question to the community, so hello everyone :) On a
daily basis I’m using Apache Beam for developing streaming pipelines, but I
would like to learn native Flink as well. I’m looking for examples of how
to write integration tests with full programmatic control over the watermark
and assertions on the results, to check whether the results are early, on-time
or late.

Let’s assume the “Word Count” aggregation in a fixed/tumbling window. The
function “myWordCount” gets a stream of data (PCollection/Dataset) and
calculates each word's cardinality in a fixed window, within the allowed
lateness. The whole pipeline and input/output are defined outside of that
function.

In the Beam API the test might look like:

words = testStreamOf[String]
  .addElementsAtTime("00:00:00", "foo")
  .addElementsAtTime("00:00:30", "bar")
  .advanceWatermarkTo("00:01:00")
  .addElementsAtTime("00:00:40", "foo") // late event
  .advanceWatermarkToInfinity()

// function under test
results = myWordCount(words, windowDuration = 1 minute)

results should inOnTimePane("00:00:00", "00:01:00") {
  containInAnyOrderAtTime(Seq(
    ("00:00:59.999", ("foo", 1L)),
    ("00:00:59.999", ("bar", 1L))
  ))
}

results should inLatePane("00:00:00", "00:01:00") {
  containSingleValueAtTime(
    "00:00:59.999", ("foo", 1L) // “foo” from the on-time pane was discarded
  )
}

I found documentation for testing stateful UDFs, but frankly speaking I
don’t know whether it is meant only for testing built-in Flink UDFs or for
custom functions as well. There is also only one test scenario example, which
is far from complete.

https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators
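
For reference, my rough understanding of the harness-based approach described
on that page is something like the sketch below. MyCountFunction is a
hypothetical KeyedProcessFunction<String, String, Long> standing in for the
function under test, and I assume the flink-streaming-java test-jar dependency
from that page is on the classpath; I am not sure yet how this maps onto
asserting the early/on-time/late panes of a window:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.Test;

public class MyCountFunctionTest {

    @Test
    public void countsWordsPerKey() throws Exception {
        // Wrap the (hypothetical) function under test in an operator and a keyed test harness.
        OneInputStreamOperatorTestHarness<String, Long> harness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        new KeyedProcessOperator<>(new MyCountFunction()),
                        word -> word,   // key selector: the word itself
                        Types.STRING);
        harness.open();

        // Full programmatic control over event time: elements carry explicit
        // timestamps and the watermark is advanced by hand.
        harness.processElement("foo", 0L);
        harness.processElement("bar", 30_000L);
        harness.processWatermark(60_000L);
        harness.processElement("foo", 40_000L); // late element

        // getOutput() holds the emitted StreamRecords and Watermarks to assert on.
        System.out.println(harness.getOutput());
        harness.close();
    }
}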

I also found the flink-spector project; its API looks promising but
unfortunately it is not actively maintained anymore:

https://github.com/ottogroup/flink-spector

Could you share some documentation/examples/sources with integration tests
for Flink streaming pipelines, please? Perhaps I've missed something :)

Thanks in advance,
Marcin


Re: GenericType problem in KeyedCoProcessFunction after disableGenericTypes()

2022-02-01 Thread Yoel Benharrous
Hi Deniz,

You could declare UUIDGenerator as a transient field and instantiate it in the
open() function.

Or, if you want to inject an arbitrary UUIDGenerator, you could provide a
supplier of UUIDGenerator that implements Serializable and invoke it in the
open() function.
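
A rough sketch of both options (the generic parameters here are placeholders
for your real key/input/output types, and SerializableSupplier is just a small
helper interface I defined for the example):

import java.io.Serializable;
import java.util.function.Supplier;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

// Helper so the injected factory itself is serializable.
interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

// Generic parameters are placeholders; use your actual key/input/output types.
public class ModelRequestProcessor
        extends KeyedCoProcessFunction<String, String, String, String> {

    // Option 2: ship a serializable factory instead of the generator itself.
    private final SerializableSupplier<UUIDGenerator> generatorSupplier;

    // Option 1: keep the generator transient so it is never serialized;
    // it is (re)created in open() on the task manager.
    private transient UUIDGenerator uuidGenerator;

    public ModelRequestProcessor(SerializableSupplier<UUIDGenerator> generatorSupplier) {
        this.generatorSupplier = generatorSupplier;
    }

    @Override
    public void open(Configuration parameters) {
        uuidGenerator = generatorSupplier.get(); // or simply: new UUIDGenerator()
    }

    @Override
    public void processElement1(String value, Context ctx, Collector<String> out) {
        out.collect(uuidGenerator.generate().toString() + "|" + value);
    }

    @Override
    public void processElement2(String value, Context ctx, Collector<String> out) {
        out.collect(value);
    }
}

It would then be wired up with, e.g., new ModelRequestProcessor(UUIDGenerator::new).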




On Tue, Feb 1, 2022, 10:01 PM Deniz Koçak  wrote:

> Hi All,
>
> We have a function extending `KeyedCoProcessFunction`, and within that
> function implementation I wanted to keep a class object as a field
> which is simply responsible for generating a UUID.
>
> We disabled Kryo fallback for generic types via
> `env.getConfig().disableGenericTypes()`.
> I am receiving the error below when I pass an instance of
> `UUIDGenerator` to the `KeyedCoProcessFunction` and try to run the
> job.
>
>
> ***
> Exception in thread "main" java.lang.UnsupportedOperationException:
> Generic types have been disabled in the ExecutionConfig and type
> java.util.UUID is treated as a generic type.
>
> ***
>
> At this point I wonder how I can/should pass an instance of
> `UUIDGenerator` to the `ModelRequestProcessor` function.
>
>
> ModelRequestProcessor Class:
> 
> public class ModelRequestProcessor extends
>         KeyedCoProcessFunction<…, ModelRequest> {
>
>     protected final UUIDGenerator uuidGenerator;
>
>     public ModelRequestProcessor(UUIDGenerator generator) {
>         this.uuidGenerator = generator;
>     }
> }
> 
>
> UUIDGenerator Class:
> 
> public class UUIDGenerator implements Serializable {
>
>     private static final long serialVersionUID = 42L;
>
>     public java.util.UUID generate() {
>         return java.util.UUID.randomUUID();
>     }
> }
> 
>


Future support for custom FileEnumerator in FileSource?

2022-02-01 Thread Kevin Lam
Hi all,

We're interested in being able to filter files using the new FileSource API.
Are there plans to add it? If there's existing work, we would be happy to
help push this forward through contributions.

It seems like things are almost there. FileSource encapsulates the filtering
functionality in FileEnumerator. However, the FileEnumerator is not
parametrizable; it's currently hard-coded. One potential way to enable
filtering files is to be able to pass a custom FileEnumerator, as in the rough
sketch below.
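
Something along these lines. Note this is only a sketch written from my
reading of the interface, so the delegate (NonSplittingRecursiveEnumerator),
the suffix-based predicate, and the exact method signatures are assumptions on
my part:

import java.io.IOException;
import java.util.Collection;
import java.util.stream.Collectors;

import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
import org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveEnumerator;
import org.apache.flink.core.fs.Path;

// Sketch only: delegate the actual enumeration, then drop splits whose path
// does not match a simple suffix filter.
public class FilteringFileEnumerator implements FileEnumerator {

    private final FileEnumerator delegate = new NonSplittingRecursiveEnumerator();
    private final String requiredSuffix;

    public FilteringFileEnumerator(String requiredSuffix) {
        this.requiredSuffix = requiredSuffix;
    }

    @Override
    public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)
            throws IOException {
        return delegate.enumerateSplits(paths, minDesiredSplits).stream()
                .filter(split -> split.path().getPath().endsWith(requiredSuffix))
                .collect(Collectors.toList());
    }
}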

Thanks in advance,
Kevin


GenericType problem in KeyedCoProcessFunction after disableGenericTypes()

2022-02-01 Thread Deniz Koçak
Hi All,

We have a function extending `KeyedCoProcessFunction`, and within that
function implementation I wanted to keep a class object as a field
which is simply responsible for generating a UUID.

We disabled Kryo fallback for generic types via
`env.getConfig().disableGenericTypes()`.
I am receiving the error below when I pass an instance of
`UUIDGenerator` to the `KeyedCoProcessFunction` and try to run the
job.

***
Exception in thread "main" java.lang.UnsupportedOperationException:
Generic types have been disabled in the ExecutionConfig and type
java.util.UUID is treated as a generic type.
***

At this point I wonder how I can/should pass an instance of
`UUIDGenerator` to the `ModelRequestProcessor` function.


ModelRequestProcessor Class:

public class ModelRequestProcessor extends
        KeyedCoProcessFunction<…, ModelRequest> {

    protected final UUIDGenerator uuidGenerator;

    public ModelRequestProcessor(UUIDGenerator generator) {
        this.uuidGenerator = generator;
    }
}


UUIDGenerator Class:

public class UUIDGenerator implements Serializable {

    private static final long serialVersionUID = 42L;

    public java.util.UUID generate() {
        return java.util.UUID.randomUUID();
    }
}



How to write a proper hashCode() for keys.

2022-02-01 Thread John Smith
Hi, I am getting java.lang.IllegalArgumentException: Key group 39 is not in
KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you're directly
using low level state access APIs, this is most likely caused by
non-deterministic shuffle key (hashCode and equals implementation).

This is my class; is my hashCode deterministic?

public final class MyEventCountKey {

    private final String countDateTime;
    private final String domain;
    private final String event;

    public MyEventCountKey(final String countDateTime, final String domain, final String event) {
        this.countDateTime = countDateTime;
        this.domain = domain;
        this.event = event;
    }

    public String getCountDateTime() {
        return countDateTime;
    }

    public String getDomain() {
        return domain;
    }

    public String getEvent() {
        return event;
    }

    @Override
    public String toString() {
        return countDateTime + "|" + domain + "|" + event;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        MyEventCountKey that = (MyEventCountKey) o;
        return countDateTime.equals(that.countDateTime) &&
                domain.equals(that.domain) &&
                event.equals(that.event);
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + countDateTime.hashCode();
        result = prime * result + domain.hashCode();
        result = prime * result + event.hashCode();
        return result;
    }
}


Memory issues with Rocksdb ColumnFamilyOptions

2022-02-01 Thread Natalie Dunn
Hi All,

I am working on trying to process a Savepoint in order to produce basic
statistics on it for monitoring. I’m running into an issue where processing a
large Savepoint runs out of memory before I can process the Savepoint
completely.

One thing I noticed in profiling the code is that there seems to be a lot of
memory given to the RocksDB ColumnFamilyOptions class, because it is producing
a lot of java.lang.ref.Finalizer objects that don’t seem to be garbage
collected.

I see in the RocksDB code that these should be closed but it doesn’t seem like 
they are being closed. 
https://github.com/facebook/rocksdb/blob/f57745814f2d9e937383b4bfea55373306182c14/java/src/main/java/org/rocksdb/AbstractNativeReference.java#L71

Is there a way to close these via the Flink API? Also, more generally, why am I 
seeing hundreds of thousands of these generated?

In case it’s helpful, here’s a genericized/simplified version of the code:


import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;

Configuration config = new Configuration();
config.setInteger("state.backend.rocksdb.files.open", 2);
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(config);
env.getConfig().enableObjectReuse();

EmbeddedRocksDBStateBackend stateBackend = new EmbeddedRocksDBStateBackend();

final EmbeddedRocksDBStateBackend configuredRocksDBStateBackend =
        stateBackend.configure(config, Thread.currentThread().getContextClassLoader());

// The below function just downloads the savepoint from our cloud storage and runs Savepoint.load()
ExistingSavepoint savepoint =
        loadSavepoint(env, configuredRocksDBStateBackend, savepointPath);

// FunctionStateReader() is a KeyedStateReaderFunction and does basic processing in readKey
DataSource source = savepoint.readKeyedState("process-1", new FunctionStateReader());

final MapOperator sizes =
        source
                .map(s -> new Metrics(s.key,
                        s.stateFields.values().stream().mapToInt(Integer::parseInt).sum(),
                        0, 0, 0, 0, 0, 0, 0))
                .returns(TypeInformation.of(new TypeHint<>() {}));

// MetricsRed() below is a RichReduceFunction
DataSet stats = sizes.reduce(new MetricsRed());


If you spot anything wrong with this approach that would cause memory issues,
please let me know; I am not 100% sure that the specific issue/question above
is the full cause of the memory issues that I have been having.

Thank you!
Natalie


Re: StateFun deployment options

2022-02-01 Thread Christopher Gustafson
Hi Igal,

I have been reading some threads, but the ones I found were fairly old and thus I
wasn't sure whether the solutions were still accurate. If you have any threads
that explain how to deploy SF as a Flink job, please do share them. I have been
trying the suggested approach from the old docs but haven't succeeded in deploying it.

Best,
Christopher



From: Igal Shilman
Sent: 1 February 2022 12:26:52
To: Christopher Gustafson
Cc: user@flink.apache.org
Subject: Re: StateFun deployment options

Hello Christopher,

The most common deployment of StateFun applications is via the community
provided Docker images[1] (and their derivations). This image captures the
optimal deployment of Flink for StateFun.
In addition, there is also an example of how to deploy these images to k8s[2].
If you are exploring SF and trying to get started, then you can simply explore
the Stateful Functions playground[3] and start up an entire env via
docker-compose.

If your env requires deploying SF as a Flink job, this is still possible; a
few folks have asked about this in the past, and I'm sure you can find these
threads in the archive. If not, let me know and I will point you in the right
direction.

[1] https://hub.docker.com/r/apache/flink-statefun
[2] 
https://github.com/apache/flink-statefun-playground/tree/release-3.1/deployments/k8s
[3] https://github.com/apache/flink-statefun-playground/tree/release-3.1

All the best!
Igal



On Tue, Feb 1, 2022 at 11:57 AM Christopher Gustafson <chr...@kth.se> wrote:

Hi,


I am looking into ways to deploy StateFun jobs. I noticed that the option of
packaging a StateFun job as a fat jar for an existing Flink cluster was
described in the 2.2 version of the docs, but not in the latest versions. Is
this way of deploying StateFun jobs still supported, and in that case are
there any additional steps that have to be taken?

Best Regards,

Christopher Gustafson


Queryable State Deprecation

2022-02-01 Thread Jatti, Karthik
Hi,

I see on the Flink Roadmap that the Queryable State API is scheduled to be
deprecated, but I couldn’t find much information on Confluence or in this mailing
list's archives to understand the background as to why it’s being deprecated
and what would be an alternative. Any pointers to help me get some more
information here would be great.

Thanks,
Karthik





Re: Show plan in UI not working.

2022-02-01 Thread John Smith
Hi, here it is: https://issues.apache.org/jira/browse/FLINK-25812

In the end, I think it looks like a JavaScript issue in the UI rather than in
the cluster.

On Tue, 25 Jan 2022 at 02:58, Ingo Bürk  wrote:

> Hi John,
>
> can you please submit this as an issue in JIRA? If you suspect it is
> related to other issues, just make a note of that in the issue as well.
> Thanks!
>
>
> Ingo
>
> On 23.01.22 18:05, John Smith wrote:
> > Just in case, but in 1.14.x, regardless of whether the job manager is the leader or
> > not, before submitting a job, if you click on "Show Plan" it just shows a
> > blank window.
> >
> > I'm assuming it's a similar issue to the deserialization ones.
>


Re: KafkaSource vs FlinkKafkaConsumer010

2022-02-01 Thread David Anderson
Before Kafka introduced their universal client, Flink had version-specific
connectors, e.g., for versions 0.8, 0.9, 0.10, and 0.11. Those were
eventually removed in favor of FlinkKafkaConsumer, which is/was backward
compatible back to Kafka version 0.10.

FlinkKafkaConsumer itself was deprecated in Flink 1.14 in favor of
KafkaSource, which implements the unified batch/streaming interface defined
in FLIP-27.

Regards,
David

On Tue, Feb 1, 2022 at 9:21 AM Francesco Guardiani 
wrote:

> I think the FlinkKafkaConsumer010 you're talking about is the old source
> API. You should use only KafkaSource now, as it uses the new source
> infrastructure.
>
> On Tue, Feb 1, 2022 at 9:02 AM HG  wrote:
>
>> Hello Francesco
>> Perhaps I copied the wrong link of 1.2.
>> But there is also
>> https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.html
>>
>> It seems there are 2 ways to use Kafka
>>
>> KafkaSource source = KafkaSource.builder()
>> .setBootstrapServers(brokers)
>> .setTopics("input-topic")
>> .setGroupId("my-group")
>> .setStartingOffsets(OffsetsInitializer.earliest())
>> .setValueOnlyDeserializer(new SimpleStringSchema())
>> .build();
>>
>> And like this:
>>
>> Properties kafkaProperties = new Properties();
>> kafkaProperties.put("bootstrap.servers",kafkaBootstrapServers);
>> kafkaProperties.put("group.id",kafkaGroupID);
>> kafkaProperties.put("auto.offset.reset",kafkaAutoOffsetReset);
>> FlinkKafkaConsumer010 kafkaConsumer = new 
>> FlinkKafkaConsumer010<>(kafkaTopic, new SimpleStringSchema(), 
>> kafkaProperties);
>> kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
>>
>>
>> There is even a FlinkKafkaConsumer011
>>
>> Which one is preferable ? Or have they different use cases?
>>
>> Regards Hans
>>
>>
>> Op di 1 feb. 2022 om 08:55 schreef Francesco Guardiani <
>> france...@ververica.com>:
>>
>>> The latter link you posted refers to a very old Flink release. You should
>>> use the first link, which refers to the latest release.
>>>
>>> FG
>>>
>>> On Tue, Feb 1, 2022 at 8:20 AM HG  wrote:
>>>
 Hello all

 I am confused.
 What is the difference between KafkaSource as defined in :
 https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/
 and FlinkKafkaConsumer010 as defined in
 https://nightlies.apache.org/flink/flink-docs-release-
 1.2/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.html
 

 When should I use which?

 Regards Hans

>>>


Re: Kafka Consumer Group Name not set if no checkpointing?

2022-02-01 Thread John Smith
Ok, that's fine. It's just that we are used to that functionality from
basically every other consumer we have, Flink or not. So we monitor the
offsets for lateness or just to take a look.

On Tue, 1 Feb 2022 at 03:38, Fabian Paul  wrote:

> Hi John,
>
> You are seeing what I described in my previous mail. The KafkaSource
> only writes consumer offsets to Kafka when a checkpoint is finished
> [1]. Flink does not leverage the offsets stored in Kafka to ensure
> exactly-once processing but it writes the last read offset to Flink's
> internal state that is part of the checkpoint.
>
> Please be also aware that it is not guaranteed that the offsets you
> are seeing with Kafka Explorer reflect the latest record read by the
> KafkaSource because the offset is committed asynchronously and we do
> not ensure that it succeeds.
>
> Maybe you can share with us why you want to inspect the progress of
> the KafkaSource with Kafka Explorer.
>
> Best,
> Fabian
>
>
> [1]
> https://github.com/apache/flink/blob/dea2b10502a493e9d4137e7d94d2dac85d9fa666/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L221
>
> On Mon, Jan 31, 2022 at 8:08 PM John Smith  wrote:
> >
> > Hi yes, see below. So it only seems to show the consumer offsets if
> checkpointing is on... That's the only diff I can see between my two
> different jobs. And the moment I enabled it on the job. It started showing
> in Kafka Explorer here: https://www.kafkatool.com/
> >
> > return KafkaSource.builder()
> > .setBootstrapServers(bootstrapServers)
> > .setTopics(topic)
> > .setValueOnlyDeserializer(new VertxJsonSchema())
> > .setGroupId(consumerGroup)
> > .setStartingOffsets(oi)
> > .setProperties(props)
> > .build();
> >
> >
> > On Mon, 31 Jan 2022 at 12:03, Fabian Paul  wrote:
> >>
> >> Hi John,
> >> First I would like to ask you to give us some more information about
> >> how you consume from Kafka with Flink. It is currently recommended to
> >> use the KafkaSource that subsumes the deprecated FlinkKafkaConsumer.
> >>
> >> One thing to note already is that by default Flink does not commit the
> >> Kafka offsets back to the topic, because it is not needed from a
> >> Flink perspective and is only supported on a best-effort basis if a
> >> checkpoint completes.
> >>
> >> I am not very familiar with Kafka Explorer but I can imagine it only
> >> shows the consumer group if there are actually committed offsets
> >>
> >> Best,
> >> Fabian
> >>
> >> On Mon, Jan 31, 2022 at 3:41 PM John Smith 
> wrote:
> >> >
> >> > Hi using flink 1.14.3, I have a job that doesn't use checkpointing. I
> have noticed that the Kafka Consumer Group is not set?
> >> >
> >> > I use Kafka Explorer to see all the consumers and when I run the job
> I don't see the consumer group. Finally I decided to enable checkpointing
> and restart the job and finally saw the consumer group.
> >> >
> >> > Is this expected behaviour?
>


Re: regarding flink metrics

2022-02-01 Thread Chesnay Schepler
Your best bet is to create a custom reporter that does this calculation.
You could either wrap the reporter, subclass it, or fork it.
In any case,
https://github.com/apache/flink/tree/master/flink-metrics/flink-metrics-datadog
should be a good starting point.
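
As a very rough illustration of the wrapping approach (only a sketch: the
delegate wiring is left open, and in practice you would instantiate it through
a MetricReporterFactory rather than a constructor):

import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;

// Wraps another reporter and is the place where per-metric tag values could
// be computed at runtime before handing the metric on. How the computed tags
// are actually attached is Datadog-specific and intentionally not shown here.
public class TaggingReporter implements MetricReporter {

    private final MetricReporter delegate;

    public TaggingReporter(MetricReporter delegate) {
        this.delegate = delegate;
    }

    @Override
    public void open(MetricConfig config) {
        delegate.open(config);
    }

    @Override
    public void close() {
        delegate.close();
    }

    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        // Runtime tag values could be derived here, e.g. from group.getAllVariables().
        delegate.notifyOfAddedMetric(metric, metricName, group);
    }

    @Override
    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
        delegate.notifyOfRemovedMetric(metric, metricName, group);
    }
}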


On 01/02/2022 13:26, Jessy Ping wrote:

Hi Team,

We are using Datadog and its HTTP reporter (packaged in the Flink image)
for sending metrics from a Flink application. We have a requirement
for setting tags with values calculated at runtime for the custom
metrics emitted from Flink. Currently, it is impossible to assign tags
at runtime. Is there a workaround for this?


Thanks
Jessy





regarding flink metrics

2022-02-01 Thread Jessy Ping
Hi Team,

We are using Datadog and its HTTP reporter (packaged in the Flink image) for
sending metrics from a Flink application. We have a requirement for
setting tags with values calculated at runtime for the custom metrics
emitted from Flink. Currently, it is impossible to assign tags at runtime.
Is there a workaround for this?

Thanks
Jessy


Re: StateFun deployment options

2022-02-01 Thread Igal Shilman
Hello Christopher,

The most common deployment of StateFun applications is via the community
provided Docker images[1] (and their derivations). This image captures the
optimal deployment of Flink for StateFun.
In addition, there is also an example of how to deploy these images to
k8s[2].
If you are exploring SF and trying to get started, then you can simply
explore the Stateful Functions playground[3] and start up an entire env via
docker-compose.

If your env requires deploying SF as a Flink job, this is still possible; a
few folks have asked about this in the past, and I'm sure you can find
these threads in the archive. If not, let me know and I will point you in
the right direction.

[1] https://hub.docker.com/r/apache/flink-statefun
[2]
https://github.com/apache/flink-statefun-playground/tree/release-3.1/deployments/k8s
[3] https://github.com/apache/flink-statefun-playground/tree/release-3.1

All the best!
Igal



On Tue, Feb 1, 2022 at 11:57 AM Christopher Gustafson  wrote:

> Hi,
>
>
> I am looking into ways to deploy StateFun jobs. I noticed that the option
> of packaging a StateFun job as a fat jar for an existing Flink cluster was
> described in the 2.2 version of the docs, but
> not in the latest versions. Is this way of deploying StateFun jobs still
> supported, and in that case are there any additional steps that have to be
> taken?
>
> Best Regards,
>
> Christopher Gustafson
>


[ANNOUNCE] Apache Flink Stateful Functions 3.2.0 released

2022-02-01 Thread Till Rohrmann
The Apache Flink community is very happy to announce the release of Apache
Flink Stateful Functions 3.2.0.

Stateful Functions is an API that simplifies building distributed stateful
applications.
It's based on functions with persistent state that can interact dynamically
with strong consistency guarantees.

Please check out the release blog post for an overview of the release:
https://flink.apache.org/news/2022/01/31/release-statefun-3.2.0.html

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Stateful Functions can be found at:
https://search.maven.org/search?q=g:org.apache.flink%20statefun

Python SDK for Stateful Functions published to the PyPI index can be found
at: https://pypi.org/project/apache-flink-statefun/

JavaScript SDK for Stateful Functions published to the NPM registry can be
found at: https://www.npmjs.com/package/apache-flink-statefun

Official Docker image for building Stateful Functions applications can be
found at:
https://hub.docker.com/r/apache/flink-statefun

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12350540

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Cheers,
Till


StateFun deployment options

2022-02-01 Thread Christopher Gustafson
Hi,


I am looking into ways to deploy StateFun jobs. I noticed that the option of
packaging a StateFun job as a fat jar for an existing Flink cluster was
described in the 2.2 version of the docs,
but not in the latest versions. Is this way of deploying StateFun jobs still
supported, and in that case are there any additional steps that have to be taken?

Best Regards,

Christopher Gustafson


Question about object reusing in Flink SQL

2022-02-01 Thread LM Kang
Hi community,


I have read a blog named “A Journey to Beating Flink's SQL Performance” [1],
which says enabling object reuse can greatly improve the performance of the
Blink planner.
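
(By “enabling object reuse” I mean the usual switch on the execution config,
i.e. something like the following.)

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Lets Flink pass records between chained operators without defensive copies.
env.getConfig().enableObjectReuse();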

But as far as I can see in the code (v1.14), there are few occurrences of
controllable object reuse in the Flink SQL-related modules.

What’s more, when object reuse is enabled, the planner deep-copies the objects
(e.g. in `CommonExecLookupJoin`), which is just the opposite of the shallow-copy
behavior in the DataStream API. The generated code is always the same, regardless
of the setting.

Am I missing something? Or has the behavior changed between versions?

Many thanks.


[1] https://www.ververica.com/blog/a-journey-to-beating-flinks-sql-performance



Re: Flink 1.14 metrics : taskmanager host name missing

2022-02-01 Thread Chesnay Schepler

There is unfortunately no knob for you to turn to get the previous behavior.

The host variable has a number of issues (being inconsistent across 
processes, not being configurable, not being stable (because we just 
forward what we get from the RPC layer)), such that at the moment it 
probably shouldn't be relied on at all.


https://issues.apache.org/jira/browse/FLINK-7198
https://issues.apache.org/jira/browse/FLINK-8358

On 01/02/2022 07:32, Mayur Gubrele wrote:

Hi,

Can someone please take a look at this? It's a major blocker for us.

Thanks,
Mayur

On Fri, Jan 21, 2022 at 2:11 PM Mayur Gubrele 
 wrote:


Hello,

We recently upgraded our Flink cluster to 1.14 and noticed that
all the taskmanager metrics we receive in our Prometheus data
source get host IPs instead of hostnames, which was the case
earlier before we moved to 1.14.

I see on the flink dashboard as well under taskmanager details,
host IPs are being populated now. This is a breaking change for
us. Some of our APIs use this host tag value which used to be the
hostname earlier.

Can you tell us if there's a way we can configure to get hostnames
instead of IPs?

Thanks,
Mayur



Re: Kafka Consumer Group Name not set if no checkpointing?

2022-02-01 Thread Fabian Paul
Hi John,

You are seeing what I described in my previous mail. The KafkaSource
only writes consumer offsets to Kafka when a checkpoint is finished
[1]. Flink does not leverage the offsets stored in Kafka to ensure
exactly-once processing but it writes the last read offset to Flink's
internal state that is part of the checkpoint.

Please be also aware that it is not guaranteed that the offsets you
are seeing with Kafka Explorer reflect the latest record read by the
KafkaSource because the offset is committed asynchronously and we do
not ensure that it succeeds.

Maybe you can share with us why you want to inspect the progress of
the KafkaSource with Kafka Explorer.

Best,
Fabian


[1] 
https://github.com/apache/flink/blob/dea2b10502a493e9d4137e7d94d2dac85d9fa666/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L221
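
In other words, you will only see committed offsets in external tools once
checkpointing is enabled, e.g. (the interval here is just an example value):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Offsets are committed to Kafka only as part of a completed checkpoint.
env.enableCheckpointing(60_000); // checkpoint every 60 seconds (example value)

If I recall correctly, the commit on checkpoint can also be switched off
entirely via the source property "commit.offsets.on.checkpoint".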

On Mon, Jan 31, 2022 at 8:08 PM John Smith  wrote:
>
> Hi yes, see below. So it only seems to show the consumer offsets if 
> checkpointing is on... That's the only diff I can see between my two 
> different jobs. And the moment I enabled it on the job. It started showing in 
> Kafka Explorer here: https://www.kafkatool.com/
>
> return KafkaSource.builder()
> .setBootstrapServers(bootstrapServers)
> .setTopics(topic)
> .setValueOnlyDeserializer(new VertxJsonSchema())
> .setGroupId(consumerGroup)
> .setStartingOffsets(oi)
> .setProperties(props)
> .build();
>
>
> On Mon, 31 Jan 2022 at 12:03, Fabian Paul  wrote:
>>
>> Hi John,
>> First I would like to ask you to give us some more information about
>> how you consume from Kafka with Flink. It is currently recommended to
>> use the KafkaSource that subsumes the deprecated FlinkKafkaConsumer.
>>
>> One thing to note already is that by default Flink does not commit the
>> Kafka offsets back to the topic, because it is not needed from a
>> Flink perspective and is only supported on a best-effort basis if a
>> checkpoint completes.
>>
>> I am not very familiar with Kafka Explorer but I can imagine it only
>> shows the consumer group if there are actually committed offsets
>>
>> Best,
>> Fabian
>>
>> On Mon, Jan 31, 2022 at 3:41 PM John Smith  wrote:
>> >
>> > Hi using flink 1.14.3, I have a job that doesn't use checkpointing. I have 
>> > noticed that the Kafka Consumer Group is not set?
>> >
>> > I use Kafka Explorer to see all the consumers and when I run the job I 
>> > don't see the consumer group. Finally I decided to enable checkpointing 
>> > and restart the job and finally saw the consumer group.
>> >
>> > Is this expected behaviour?


Re: KafkaSource vs FlinkKafkaConsumer010

2022-02-01 Thread Francesco Guardiani
I think the FlinkKafkaConsumer010 you're talking about is the old source
API. You should use only KafkaSource now, as it uses the new source
infrastructure.

On Tue, Feb 1, 2022 at 9:02 AM HG  wrote:

> Hello Francesco
> Perhaps I copied the wrong link of 1.2.
> But there is also
> https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.html
>
> It seems there are 2 ways to use Kafka
>
> KafkaSource source = KafkaSource.builder()
> .setBootstrapServers(brokers)
> .setTopics("input-topic")
> .setGroupId("my-group")
> .setStartingOffsets(OffsetsInitializer.earliest())
> .setValueOnlyDeserializer(new SimpleStringSchema())
> .build();
>
> And like this:
>
> Properties kafkaProperties = new Properties();
> kafkaProperties.put("bootstrap.servers",kafkaBootstrapServers);
> kafkaProperties.put("group.id",kafkaGroupID);
> kafkaProperties.put("auto.offset.reset",kafkaAutoOffsetReset);
> FlinkKafkaConsumer010 kafkaConsumer = new 
> FlinkKafkaConsumer010<>(kafkaTopic, new SimpleStringSchema(), 
> kafkaProperties);
> kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
>
>
> There is even a FlinkKafkaConsumer011
>
> Which one is preferable ? Or have they different use cases?
>
> Regards Hans
>
>
> Op di 1 feb. 2022 om 08:55 schreef Francesco Guardiani <
> france...@ververica.com>:
>
>> The latter link you posted refers to a very old Flink release. You should
>> use the first link, which refers to the latest release.
>>
>> FG
>>
>> On Tue, Feb 1, 2022 at 8:20 AM HG  wrote:
>>
>>> Hello all
>>>
>>> I am confused.
>>> What is the difference between KafkaSource as defined in :
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/
>>> and FlinkKafkaConsumer010 as defined in
>>> https://nightlies.apache.org/flink/flink-docs-release-
>>> 1.2/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.html
>>> 
>>>
>>> When should I use which?
>>>
>>> Regards Hans
>>>
>>


Re: KafkaSource vs FlinkKafkaConsumer010

2022-02-01 Thread HG
Hello Francesco
Perhaps I copied the wrong link of 1.2.
But there is also
https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.html

It seems there are 2 ways to use Kafka

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers(brokers)
        .setTopics("input-topic")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

And like this:

Properties kafkaProperties = new Properties();
kafkaProperties.put("bootstrap.servers", kafkaBootstrapServers);
kafkaProperties.put("group.id", kafkaGroupID);
kafkaProperties.put("auto.offset.reset", kafkaAutoOffsetReset);
FlinkKafkaConsumer010<String> kafkaConsumer =
        new FlinkKafkaConsumer010<>(kafkaTopic, new SimpleStringSchema(), kafkaProperties);
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);


There is even a FlinkKafkaConsumer011

Which one is preferable? Or do they have different use cases?

Regards Hans


Op di 1 feb. 2022 om 08:55 schreef Francesco Guardiani <
france...@ververica.com>:

> The latter link you posted refers to a very old Flink release. You should
> use the first link, which refers to the latest release.
>
> FG
>
> On Tue, Feb 1, 2022 at 8:20 AM HG  wrote:
>
>> Hello all
>>
>> I am confused.
>> What is the difference between KafkaSource as defined in :
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/
>> and FlinkKafkaConsumer010 as defined in
>> https://nightlies.apache.org/flink/flink-docs-release-
>> 1.2/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.html
>> 
>>
>> When should I use which?
>>
>> Regards Hans
>>
>