Continuous Reading of File using FileSource does not process the existing files in version 1.17

2024-01-05 Thread Prasanna kumar
Hi Flink Community,


I hope this email finds you well. I am currently in the process of
migrating my Flink application from version 1.12.7 to 1.17.2 and have
encountered a behavior issue with the FileSource while reading data from an
S3 bucket.

 In the previous version (1.12.7), I was utilizing the readFile method with
the TextInputFormat to continuously monitor the S3 bucket for any updates
or new files added at a specified time interval. The code snippet for this
was as follows:

streamExecutionEnvironment
    .readFile(new TextInputFormat(new Path("s3://my-s3-path")),
              "s3://my-s3-path",
              FileProcessingMode.PROCESS_CONTINUOUSLY,
              1)
    .setParallelism(1);



Now, after migrating to Flink 1.17.2, I have switched to using the
FileSource for continuous monitoring. The code snippet for this is as
follows:




FileSource<String> fileSource = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path("s3://my-s3-path"))
    .monitorContinuously(Duration.ofMillis(1))
    .build();




streamExecutionEnvironment
    .fromSource(fileSource, WatermarkStrategy.noWatermarks(), "filesource")
    .uid("filesource")
    .setParallelism(1);

While this setup successfully detects new files added to the S3 bucket, it
seems to miss changes made to existing files. I am unsure if this is expected
behavior in Flink 1.17.2 or if there is a configuration detail I might be
overlooking.

 Any guidance or suggestions on resolving this issue would be greatly
appreciated.

Thanks,
Prasanna


Java 21 for flink

2023-07-07 Thread Prasanna kumar
Hi all,

Java 21 plans to support lightweight threads (virtual threads, formerly called
fibers) based on Project Loom, which will increase concurrency to a great extent.

Is there any plan for Flink to leverage it?

Thanks,
Prasanna.


Snappy Compression for Checkpoints

2023-01-05 Thread Prasanna kumar
Hello Flink Community ,



We are running jobs on Flink version 1.12.7 that read from Kafka, apply some
rules (stored in broadcast state) and then write to Kafka. This is a very
low-latency, high-throughput pipeline, and we have set up at-least-once
semantics.



Checkpoint Configuration Used

   1. We cannot tolerate many duplicates during restarts, so we have set a
   checkpoint interval of 3s. (We cannot increase it any further since we
   process tens of thousands of records per second.)
   2. The checkpointing target location is AWS S3.
   3. Max concurrent checkpoints is 1.
   4. Time between checkpoints is 500ms.

Earlier we had around 10 rule objects stored in broadcast state. Recently we
enabled 80 rule objects. Since the increase, we are seeing a lot of
checkpoints in progress (earlier we rarely saw this in the metrics dashboard).
The parallelism of the broadcast function is around 10 and the present
checkpoint size is 64 KB.



Since we expect the number of rule objects to grow to 1000 and beyond within a
year, we are looking at ways to improve checkpoint performance. We cannot use
incremental checkpoints since they are supported only with RocksDB, and the
development effort there is a little higher. Looking at easier solutions first,
we tried enabling "SnapshotCompression", but we did not see any decrease in
checkpoint size.
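
For reference, this is roughly the toggle we tried (a minimal sketch; it is the
setUseSnapshotCompression flag on ExecutionConfig, which enables Snappy
compression for full checkpoints and savepoints):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable snapshot compression (Snappy) via the ExecutionConfig.
env.getConfig().setUseSnapshotCompression(true);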



I have a few questions on this:

   1. Does SnapshotCompression work in version 1.12.7?
   2. If yes, how much size reduction could we expect once it is enabled,
   and at what data size does the compression take effect? Is there a
   threshold beyond which compression starts to apply?



Apart from the questions above, you are welcome to suggest any config
changes that could bring improvements.



Thanks & Regards,

Prasanna


Regarding Flink Upgrades

2022-11-02 Thread Prasanna kumar
Hi Community,

Currently we are using version 1.12.7 and it is running without any issues.
We also see that version 1.17 is set to be released early next year.

That means we would be 5 versions behind.

1) How far can we safely lag behind the current Flink version?

2) If we face an issue like the Log4j vulnerability we faced last year, would
it be fixed for older versions, and if so, up to which version?

This is what I see on the update policy on old releases.  (
https://flink.apache.org/downloads.html)

[image: image.png]

 Would this hold good going forward as well?

Thanks,
Prasanna.


Re: [Security] - Critical OpenSSL Vulnerability

2022-11-01 Thread Prasanna kumar
Could we also get an emergency patch for version 1.12 as well? Upgrading Flink
to a newer version in production on short notice would be high in effort and
long in duration.

Thanks,
Prasanna

On Tue, Nov 1, 2022 at 11:30 AM Prasanna kumar <
prasannakumarram...@gmail.com> wrote:

> Is Flink version 1.12 also affected?
>
> Thanks,
> Prasanna.
>
> On Tue, Nov 1, 2022 at 10:40 AM Mason Chen  wrote:
>
>> Hi Tamir and Martjin,
>>
>> We have also noticed this internally. So far, we have found that the
>> *latest* Flink Java 11/Scala 2.12 docker images *1.14, 1.15, and 1.16*
>> are affected, which all have the *openssl 3.0.2 *dependency. It would be
>> good to discuss an emergency release when this patch comes out tomorrow, as
>> it is the highest priority level from their severity rating.
>>
>> Best,
>> Mason
>>
>> On Mon, Oct 31, 2022 at 1:10 PM Martijn Visser 
>> wrote:
>>
>>> Hi Tamir,
>>>
>>> That depends on a) if Flink is vulnerable and b) if yes, how vulnerable
>>> that would be.
>>>
>>> Best regards,
>>>
>>> Martijn
>>>
>>> Op ma 31 okt. 2022 om 19:22 schreef Tamir Sagi <
>>> tamir.s...@niceactimize.com>
>>>
>>>> Hey all,
>>>>
>>>> Following that link
>>>>
>>>> https://mta.openssl.org/pipermail/openssl-announce/2022-October/000238.html
>>>>
>>>> due to critical vulnerability , there will be an important release of
>>>> OpenSSl v3.0.7 tomorrow November 1st.
>>>>
>>>> Is there any plan to update Flink with the newest version?
>>>>
>>>> Thanks.
>>>> Tamir
>>>>
>>>>
>>>> Confidentiality: This communication and any attachments are intended
>>>> for the above-named persons only and may be confidential and/or legally
>>>> privileged. Any opinions expressed in this communication are not
>>>> necessarily those of NICE Actimize. If this communication has come to you
>>>> in error you must take no action based on it, nor must you copy or show it
>>>> to anyone; please delete/destroy and inform the sender by e-mail
>>>> immediately.
>>>> Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
>>>> Viruses: Although we have taken steps toward ensuring that this e-mail
>>>> and attachments are free from any virus, we advise that in keeping with
>>>> good computing practice the recipient should ensure they are actually virus
>>>> free.
>>>>
>>> --
>>> Martijn
>>> https://twitter.com/MartijnVisser82
>>> https://github.com/MartijnVisser
>>>
>>


Re: [Security] - Critical OpenSSL Vulnerability

2022-11-01 Thread Prasanna kumar
Is Flink version 1.12 also affected?

Thanks,
Prasanna.

On Tue, Nov 1, 2022 at 10:40 AM Mason Chen  wrote:

> Hi Tamir and Martjin,
>
> We have also noticed this internally. So far, we have found that the
> *latest* Flink Java 11/Scala 2.12 docker images *1.14, 1.15, and 1.16*
> are affected, which all have the *openssl 3.0.2 *dependency. It would be
> good to discuss an emergency release when this patch comes out tomorrow, as
> it is the highest priority level from their severity rating.
>
> Best,
> Mason
>
> On Mon, Oct 31, 2022 at 1:10 PM Martijn Visser 
> wrote:
>
>> Hi Tamir,
>>
>> That depends on a) if Flink is vulnerable and b) if yes, how vulnerable
>> that would be.
>>
>> Best regards,
>>
>> Martijn
>>
>> Op ma 31 okt. 2022 om 19:22 schreef Tamir Sagi <
>> tamir.s...@niceactimize.com>
>>
>>> Hey all,
>>>
>>> Following that link
>>>
>>> https://mta.openssl.org/pipermail/openssl-announce/2022-October/000238.html
>>>
>>> due to critical vulnerability , there will be an important release of
>>> OpenSSl v3.0.7 tomorrow November 1st.
>>>
>>> Is there any plan to update Flink with the newest version?
>>>
>>> Thanks.
>>> Tamir
>>>
>>>
>>> Confidentiality: This communication and any attachments are intended for
>>> the above-named persons only and may be confidential and/or legally
>>> privileged. Any opinions expressed in this communication are not
>>> necessarily those of NICE Actimize. If this communication has come to you
>>> in error you must take no action based on it, nor must you copy or show it
>>> to anyone; please delete/destroy and inform the sender by e-mail
>>> immediately.
>>> Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
>>> Viruses: Although we have taken steps toward ensuring that this e-mail
>>> and attachments are free from any virus, we advise that in keeping with
>>> good computing practice the recipient should ensure they are actually virus
>>> free.
>>>
>> --
>> Martijn
>> https://twitter.com/MartijnVisser82
>> https://github.com/MartijnVisser
>>
>


Re: Question About Histograms

2022-04-04 Thread Prasanna kumar
Anil,

Flink histograms are actually exported as summaries. You need to override the
provided Prometheus Histogram class so that it writes into distinct buckets in
Prometheus. Then you can write PromQL queries to calculate the different
quantiles accordingly. Checkpointing the histograms is not a recommended
option/solution.
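
For reference, registering a Flink histogram backed by a SlidingTimeWindowReservoir
looks roughly like this (a minimal sketch only, assuming the flink-metrics-dropwizard
wrapper is on the classpath; the class, metric name and window length are
illustrative, and the reporter still exports it as a summary unless customized
as described above):

import com.codahale.metrics.SlidingTimeWindowReservoir;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.metrics.Histogram;

import java.util.concurrent.TimeUnit;

public class LatencyTrackingMapper extends RichMapFunction<Long, Long> {

    private transient Histogram latencyHistogram;

    @Override
    public void open(Configuration parameters) {
        // Window length is illustrative; a 30-day reservoir would be very large.
        com.codahale.metrics.Histogram dropwizardHistogram =
                new com.codahale.metrics.Histogram(
                        new SlidingTimeWindowReservoir(15, TimeUnit.MINUTES));
        latencyHistogram = getRuntimeContext()
                .getMetricGroup()
                .histogram("eventLatency", new DropwizardHistogramWrapper(dropwizardHistogram));
    }

    @Override
    public Long map(Long latencyMillis) {
        // Record the latency of each element into the reservoir-backed histogram.
        latencyHistogram.update(latencyMillis);
        return latencyMillis;
    }
}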

Thanks,
Prasanna.

On Tue, 5 Apr 2022, 01:26 Anil K,  wrote:

> Hi,
> I was doing some experimentation using Histograms, had a few questions
> mostly related to fault tolerance and restarts. I am looking for a way to
> calculate p95 over 30days. Since histograms are pushed as a summary into
> prometheus, will not be able to do the aggregation for 30 days at
> Prometheus' side from smaller windows. Also there is a high chance that
> the job may restart in between.
>
> So Are Histograms Checkpointed ? I am looking for a very large
> SlidingTimeWindowReservoir of 30days and I don't want to lose the histogram
> updates till then and start from scratch after restart.
>
> If Histograms are not checkpointed and are reset, are there any other
> possible ways to calculate Histograms/P95 on a large window like 30days?
>
> Thanks & Regards
> Anil
>
>
>
>
>


Re: CVE-2021-44228 - Log4j2 vulnerability

2021-12-13 Thread Prasanna kumar
Chesnay, thank you for the clarification.

On Mon, Dec 13, 2021 at 6:55 PM Chesnay Schepler  wrote:

> The flink-shaded-zookeeper jars do not contain log4j.
>
> On 13/12/2021 14:11, Prasanna kumar wrote:
>
> Does Zookeeper have this vulnerable dependency? I see references to
> log4j in the shaded Zookeeper jar included as part of the Flink distribution.
>
> On Mon, Dec 13, 2021 at 1:40 PM Timo Walther  wrote:
>
>> While we are working to upgrade the affected dependencies of all
>> components, we recommend users follow the advisory of the Apache Log4j
>> Community. Also Ververica platform can be patched with a similar approach:
>>
>> To configure the JVMs used by Ververica Platform, you can pass custom
>> Java options via the JAVA_TOOL_OPTIONS environment variable. Add the
>> following to your platform values.yaml, or append to the existing value
>> of JAVA_TOOL_OPTIONS if you are using it already there, then redeploy
>> the platform with Helm:
>> env:
>>- name: JAVA_TOOL_OPTIONS
>>  value: -Dlog4j2.formatMsgNoLookups=true
>>
>>
>> For any questions, please contact us via our support portal.
>>
>> Regards,
>> Timo
>>
>> On 11.12.21 06:45, narasimha wrote:
>> > Folks, what about the veverica platform. Is there any
>> mitigation around it?
>> >
>> > On Fri, Dec 10, 2021 at 3:32 PM Chesnay Schepler > > <mailto:ches...@apache.org>> wrote:
>> >
>> > I would recommend to modify your log4j configurations to set
>> > log4j2.formatMsgNoLookups to true.
>> >
>> > As far as I can tell this is equivalent to upgrading log4j, which
>> > just disabled this lookup by default.
>> >
>> > On 10/12/2021 10:21, Richard Deurwaarder wrote:
>> >> Hello,
>> >>
>> >> There has been a log4j2 vulnerability made public
>> >> https://www.randori.com/blog/cve-2021-44228/
>> >> <https://www.randori.com/blog/cve-2021-44228/> which is making
>> >> some waves :)
>> >> This post even explicitly mentions Apache Flink:
>> >>
>> https://securityonline.info/apache-log4j2-remote-code-execution-vulnerability-alert/
>> >> <
>> https://securityonline.info/apache-log4j2-remote-code-execution-vulnerability-alert/
>> >
>> >>
>> >> And fortunately, I saw this was already on your radar:
>> >> https://issues.apache.org/jira/browse/FLINK-25240
>> >> <https://issues.apache.org/jira/browse/FLINK-25240>
>> >>
>> >> What would the advice be for flink users? Do you expect to push a
>> >> minor to fix this? Or is it advisable to upgrade to the latest
>> >> log4j2 version manually for now?
>> >>
>> >> Thanks for any advice!
>> >
>> >
>> >
>> >
>> > --
>> > A.Narasimha Swamy
>>
>>
>


Re: CVE-2021-44228 - Log4j2 vulnerability

2021-12-13 Thread Prasanna kumar
Does Zookeeper have this vulnerable dependency? I see references to
log4j in the shaded Zookeeper jar included as part of the Flink distribution.

On Mon, Dec 13, 2021 at 1:40 PM Timo Walther  wrote:

> While we are working to upgrade the affected dependencies of all
> components, we recommend users follow the advisory of the Apache Log4j
> Community. Also Ververica platform can be patched with a similar approach:
>
> To configure the JVMs used by Ververica Platform, you can pass custom
> Java options via the JAVA_TOOL_OPTIONS environment variable. Add the
> following to your platform values.yaml, or append to the existing value
> of JAVA_TOOL_OPTIONS if you are using it already there, then redeploy
> the platform with Helm:
> env:
>- name: JAVA_TOOL_OPTIONS
>  value: -Dlog4j2.formatMsgNoLookups=true
>
>
> For any questions, please contact us via our support portal.
>
> Regards,
> Timo
>
> On 11.12.21 06:45, narasimha wrote:
> > Folks, what about the veverica platform. Is there any
> mitigation around it?
> >
> > On Fri, Dec 10, 2021 at 3:32 PM Chesnay Schepler  > > wrote:
> >
> > I would recommend to modify your log4j configurations to set
> > log4j2.formatMsgNoLookups to true.
> >
> > As far as I can tell this is equivalent to upgrading log4j, which
> > just disabled this lookup by default.
> >
> > On 10/12/2021 10:21, Richard Deurwaarder wrote:
> >> Hello,
> >>
> >> There has been a log4j2 vulnerability made public
> >> https://www.randori.com/blog/cve-2021-44228/
> >>  which is making
> >> some waves :)
> >> This post even explicitly mentions Apache Flink:
> >>
> https://securityonline.info/apache-log4j2-remote-code-execution-vulnerability-alert/
> >> <
> https://securityonline.info/apache-log4j2-remote-code-execution-vulnerability-alert/
> >
> >>
> >> And fortunately, I saw this was already on your radar:
> >> https://issues.apache.org/jira/browse/FLINK-25240
> >> 
> >>
> >> What would the advice be for flink users? Do you expect to push a
> >> minor to fix this? Or is it advisable to upgrade to the latest
> >> log4j2 version manually for now?
> >>
> >> Thanks for any advice!
> >
> >
> >
> >
> > --
> > A.Narasimha Swamy
>
>


Re: Kryo Serialization issues in Flink Jobs.

2021-11-01 Thread Prasanna kumar
Any thoughts on these ?

Thanks,
Prasanna.

On Sat, Oct 30, 2021 at 7:25 PM Prasanna kumar <
prasannakumarram...@gmail.com> wrote:

> Hi ,
>
> We have the following Flink Job that processes records from kafka based on
> the rules we get from S3 files into broadcasted state.
> Earlier we were able to spin a job with any number of task parallelism
> without any issues.
> Recently we made changes to the Broadcast state Structure and it is
> working well upto parallelism of 3.
> If we give parallelism of 4 or more , we end up getting
> serialization exceptions which result in job failure. ( Block 4 as in the
> Image)
> Also If the leader job manager dies and a new one comes up , the other
> jobs are restarted automatically but this job dies of serialization issues.
> But when we start manually with a parallelism <= 3, it is working.
>
> Programmatically this code is working when we tested with all possible
> test cases.
> How do we debug serialization issues that we face.
> I have attached the exception of logs and the code related to it.
>
> Let me know if any more details are required.
>
>
>
>
>
> *KRYO SERIALIZER INITIALISATION*
>
> Class unmodifiableCollectionsSerializer =
> Class.forName("java.util.Collections$UnmodifiableCollection");
> env.getConfig().addDefaultKryoSerializer(
> unmodifiableCollectionsSerializer,
> UnmodifiableCollectionsSerializer.class
> );
>
>
> *CONFIGSTATE INTERFACE(USED IN BROADCAST STATE)*
>
> public interface EventConfigState {
>
>
> void createOrUpdateState(String key, DataPair dataPair);
>
> List executeRule(InputMessage inputMessage);
>
> Map> getCurrentState();
> }
>
>
> *DERIVED EVENT CONFIG STATE IMPLEMENTATION*
>
> public class DerivedEventConfigState  implements EventConfigState {
>
> Logger logger = LoggerFactory.getLogger(DerivedEventConfigState.class);
> private Map> derivedConfigMap;
>
> public DerivedEventConfigState() {
> derivedConfigMap = new HashMap<>();
> }
>
> public void createOrUpdateState(String key, DataPair dataPair) {
>
> derivedConfigMap.putIfAbsent(key, new HashSet<>());
> if (derivedConfigMap.get(key).contains(dataPair)) {
> derivedConfigMap.get(key).remove(dataPair);
> }
> derivedConfigMap.get(key).add(dataPair);
> }
>
> @Override
> public List executeRule(InputMessage inputMessage) {
>
> String key = inputMessage.getKey();
> List outputMessageList = new ArrayList<>();
>
> if (derivedConfigMap.size() == 0) {
> logger.error("DerivedEventConfigMap is empty");
> return outputMessageList;
> }
>
> if ( derivedConfigMap.get(key) == null) {
> return outputMessageList;
> }
>
> for (DataPair dataPair : derivedConfigMap.get(key)) {
> IRule rule = dataPair.getRule();
> if (rule.isSatisfied(inputMessage)) {
>
> IMessageBuilder messageBuilder =
> MessageBuilderFactory.getMessageBuilder("OutputMessage");
> OutputMessage outputMessage = messageBuilder.build(
> inputMessage,
> dataPair.getEventMessageDefinition()
> );
> outputMessageList.add(outputMessage);
> }
> }
> return outputMessageList;
> }
>
> @Override
> public Map> getCurrentState() {
> return Collections.unmodifiableMap(derivedConfigMap);
> }
>
> @Override
> public String toString() {
> return "DerivedEventConfigState{"
> + "derivedConfigMap=" + derivedConfigMap
> + '}';
> }
> }
>
>
> Attached are three exceptions thrown randomly.
>
> *Caused by: org.apache.flink.util.SerializedThrowable: TABLE-OP at
> java.net.URLClassLoader.findClass(Unknown Source) ~[?:?] at
> java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?] at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2] at
> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2] at
> org.apache.flink.util.FlinkUserCodeCl

Kryo Serialization issues in Flink Jobs.

2021-10-30 Thread Prasanna kumar
Hi ,

We have the following Flink job that processes records from Kafka based on
the rules we load from S3 files into broadcast state.
Earlier we were able to spin up a job with any task parallelism without any
issues. Recently we made changes to the broadcast state structure, and it
works well up to a parallelism of 3.
If we use a parallelism of 4 or more, we end up getting serialization
exceptions which result in job failure (Block 4 in the image).
Also, if the leader JobManager dies and a new one comes up, the other jobs
are restarted automatically, but this job dies with serialization issues.
When we restart it manually with a parallelism <= 3, it works.

Programmatically this code works; we tested it with all possible test cases.
How do we debug the serialization issues that we face?
I have attached the exception logs and the code related to them.

Let me know if any more details are required.





*KRYO SERIALIZER INITIALISATION*

Class<?> unmodifiableCollectionsSerializer =
        Class.forName("java.util.Collections$UnmodifiableCollection");
env.getConfig().addDefaultKryoSerializer(
        unmodifiableCollectionsSerializer,
        UnmodifiableCollectionsSerializer.class);


*CONFIGSTATE INTERFACE(USED IN BROADCAST STATE)*

public interface EventConfigState {

    void createOrUpdateState(String key, DataPair dataPair);

    List<OutputMessage> executeRule(InputMessage inputMessage);

    Map<String, Set<DataPair>> getCurrentState();
}


*DERIVED EVENT CONFIG STATE IMPLEMENTATION*

public class DerivedEventConfigState implements EventConfigState {

    Logger logger = LoggerFactory.getLogger(DerivedEventConfigState.class);
    private Map<String, Set<DataPair>> derivedConfigMap;

    public DerivedEventConfigState() {
        derivedConfigMap = new HashMap<>();
    }

    public void createOrUpdateState(String key, DataPair dataPair) {

        derivedConfigMap.putIfAbsent(key, new HashSet<>());
        if (derivedConfigMap.get(key).contains(dataPair)) {
            derivedConfigMap.get(key).remove(dataPair);
        }
        derivedConfigMap.get(key).add(dataPair);
    }

    @Override
    public List<OutputMessage> executeRule(InputMessage inputMessage) {

        String key = inputMessage.getKey();
        List<OutputMessage> outputMessageList = new ArrayList<>();

        if (derivedConfigMap.size() == 0) {
            logger.error("DerivedEventConfigMap is empty");
            return outputMessageList;
        }

        if (derivedConfigMap.get(key) == null) {
            return outputMessageList;
        }

        for (DataPair dataPair : derivedConfigMap.get(key)) {
            IRule rule = dataPair.getRule();
            if (rule.isSatisfied(inputMessage)) {

                IMessageBuilder messageBuilder =
                        MessageBuilderFactory.getMessageBuilder("OutputMessage");
                OutputMessage outputMessage = messageBuilder.build(
                        inputMessage,
                        dataPair.getEventMessageDefinition()
                );
                outputMessageList.add(outputMessage);
            }
        }
        return outputMessageList;
    }

    @Override
    public Map<String, Set<DataPair>> getCurrentState() {
        return Collections.unmodifiableMap(derivedConfigMap);
    }

    @Override
    public String toString() {
        return "DerivedEventConfigState{"
                + "derivedConfigMap=" + derivedConfigMap
                + '}';
    }
}


Attached are three exceptions thrown randomly.

Caused by: org.apache.flink.util.SerializedThrowable: TABLE-OP
    at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
    at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at java.lang.Class.forName0(Native Method) ~[?:?]
    at java.lang.Class.forName(Unknown Source) ~[?:?]
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)

Re: Flink support for Kafka versions

2021-10-26 Thread Prasanna kumar
Hi ,

We are using Kafka broker version 2.4.1.1.
The kafka-clients 2.4.1.1 jar that comes with the Flink Kafka connector was
recently flagged with a high-severity security issue.
So we excluded that dependency and overrode it with the kafka-clients 2.8.1
jar, and it works fine with the 2.4.1.1 broker (since it is backward
compatible).

1) If we tried to connect to a broker running version 2.8.1 (with the
kafka-clients 2.8.1 jar override), would it work or would it throw errors
because of Scala 2.11?
2) https://issues.apache.org/jira/browse/FLINK-20845 is marked as done but
the fix version is marked as 1.15.0. Wouldn't this change be available for
1.12.x?
3) https://issues.apache.org/jira/browse/FLINK-14105 is marked as done but
the fix version is marked as 1.14.0. Wouldn't this change be available for
1.12.x?

Thanks,
Prasanna.

On Wed, Apr 21, 2021 at 11:03 PM Arvid Heise  wrote:

> I'm wondering if we could shade scala 1.13 dependencies inside the Kafka
> connector? Then we would be independent of the rather big FLINK-20845.
>
> On Tue, Apr 20, 2021 at 5:54 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hi Prasanna,
>>
>> It looks like the Kafka 2.5.0 connector upgrade is tied to dropping
>> support for Scala 2.11. The best place to track that would be the ticket
>> for Scala 2.13 support, FLINK-13414 [1], and its subtask FLINK-20845 [2].
>>
>> I have listed FLINK-20845 as a blocker for FLINK-19168 for better
>> visibility.
>>
>> Best,
>> Austin
>>
>> [1]: https://issues.apache.org/jira/browse/FLINK-13414
>> [2]: https://issues.apache.org/jira/browse/FLINK-20845
>>
>> On Tue, Apr 20, 2021 at 9:08 AM Prasanna kumar <
>> prasannakumarram...@gmail.com> wrote:
>>
>>> Hi Flinksters,
>>>
>>> We are researching about if we could use the latest version of kafka
>>> (2.6.1 or 2.7.0)
>>>
>>> Since we are using Flink as a processor , we came across this
>>> https://issues.apache.org/jira/browse/FLINK-19168.
>>>
>>> It says that it does not support version 2.5.0 and beyond.
>>>
>>> That was created 8 months back , just checking if there is any effort on
>>> that front.
>>>
>>> Thanks,
>>> Prasanna
>>>
>>


Re: How to refresh topics to ingest with KafkaSource?

2021-10-14 Thread Prasanna kumar
Yes, you are right.

We tested recently and found that the Flink jobs do not pick up new topics
that are created matching the pattern provided to the Flink Kafka consumer.
The topics are set only when the job starts.

Prasanna.

On Fri, 15 Oct 2021, 05:44 Preston Price,  wrote:

> Okay so topic discovery is possible with topic patterns, and maybe topic
> lists. However I don't believe it's possible to change the configured topic
> list, or topic pattern after the source is created.
>
> On Thu, Oct 14, 2021, 3:52 PM Denis Nutiu  wrote:
>
>> There is a setting for dynamic topic discovery
>> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#topic-and-partition-discovery
>>
>> Best,
>>
>> Denis
>>
>> On Fri, Oct 15, 2021 at 12:48 AM Denis Nutiu 
>> wrote:
>>
>>> Hi,
>>>
>>> In my experience with the librdkafka client and the Go wrapper, the
>>> topic-pattern subscribe is reactive. The Flink Kafka connector might behave
>>> similarly.
>>>
>>> Best,
>>> Denis
>>>
>>> On Fri, Oct 15, 2021 at 12:34 AM Preston Price 
>>> wrote:
>>>
 No, the topic-pattern won't work for my case. Topics that I should
 subscribe to can be enabled/disabled based on settings I read from another
 system, so there's no way to craft a single regular expression that would
 fit the state of all potential topics. Additionally the documentation you
 linked seems to suggest that the regular expression is evaluated only once
 "when the job starts running". My understanding is it would not pick up new
 topics that match the pattern after the job starts.


 On Wed, Oct 13, 2021 at 8:51 PM Caizhi Weng 
 wrote:

> Hi!
>
> I suppose you want to read from different topics every now and then?
> Does the topic-pattern option [1] in Table API Kafka connector meet your
> needs?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/kafka/#topic-pattern
>
> Preston Price  于2021年10月14日周四 上午1:34写道:
>
>> The KafkaSource, and KafkaSourceBuilder appear to prevent users from
>> providing their own KafkaSubscriber. Am I overlooking something?
>>
>> In my case I have an external system that controls which topics we
>> should be ingesting, and it can change over time. I need to add, and 
>> remove
>> topics as we refresh configuration from this external system without 
>> having
>> to stop and start our Flink job. Initially it appeared I could accomplish
>> this by providing my own implementation of the `KafkaSubscriber` 
>> interface,
>> which would be invoked periodically as configured by the `
>> partition.discovery.interval.ms` property. However there is no way
>> to provide my implementation to the KafkaSource since the constructor for
>> KafkaSource is package protected, and the KafkaSourceBuilder does not
>> supply a way to provide the `KafkaSubscriber`.
>>
>> How can I accomplish a period refresh of the topics to ingest?
>>
>> Thanks
>>
>>
>>
>>>
>>> --
>>> Regards,
>>> Denis Nutiu
>>>
>>
>>
>> --
>> Regards,
>> Denis Nutiu
>>
>


Does Flink 1.12.2 support Zookeeper version 3.6+

2021-09-30 Thread Prasanna kumar
Hi ,

Does Flink 1.12.2 support Zookeeper version 3.6+ ?

If we add the ZooKeeper 3.6 jar to the Flink image, would it be able to
connect?

The following link mentions only zk 3.5 or 3.4
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/ha/zookeeper_ha/#zookeeper-versions


Thanks,
Prasanna.


Exploring Flink for a HTTP delivery service.

2021-08-14 Thread Prasanna kumar
Hi,

Aim: Building an event delivery service.
Scale: Peak load 50k messages/sec, average load 5k messages/sec, expected to
grow every passing month.
Unique customer endpoints: 10k+
Unique events (Kafka topics): 500+
Unique tenants: 30k+
Subscription level: Events are generated for tenants. Customers may subscribe
a) entirely to an event, or b) at tenant level (5 tenants or 100 tenants), or
c) even at sub-tenant level (Tenant 2, Dept 100, 200, 300).
*Other Requirements*:
1) Batching events based on quantity or a minimum threshold time, whichever
comes first. Example: 1000 messages or 1 sec.
2) Message size < 1 KB

*Possible Solutions:*

1) Build an app using reactive programming say vert.x/spring reactive etc
2) Use apache flink

*Flink Solution *
RDS : Has the subscription connection details

[image: Flink HTTP Publisher.png]

2a ) Use DB and Look UP Cache to retrieve Configurations

(i)   Stream the data from Kafka.
(ii)  For every message flowing in, query RDS (Postgres), get the
connection/subscription details, and apply filters. [Use a lookup cache to
improve performance.]
(iii a) If it's a streaming customer, form the message with appropriate
authentication details.
(iii b) If it's a batch customer, push the message to the state backend.
Once the maximum message count or the minimum threshold batch time is reached,
retrieve the messages and form a single batch message with appropriate
authentication details.
(iv) Send the message and endpoint info to an async sink, which delivers to
customers. In case of failure, write to a dead-letter queue where customers
can poll later.


2b ) Load Configurations to BroadCastState and Update it in a regular
interval

(i) Stream the data from Kafka.
(ii) Periodically query the proxy API (on top of RDS) to get the latest
added/updated subscription/connection details.
(iii) For every message flowing in from Kafka, check against the
broadcasted configuration to find the customers subscribed to the event,
their filter requirements and connection details.
(iv a) If it's a streaming customer, form the message with appropriate
authentication details.
(iv b) If it's a batch customer, push the message to the state backend (a
sketch of this batching step follows below). Once the maximum message count
or the minimum threshold batch time is reached, retrieve the messages and
form a single batch message with appropriate authentication details.
(v) Send the message and endpoint info to an async sink, which delivers to
customers. In case of failure, write to a dead-letter queue where customers
can poll later.
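
As a rough illustration of the batching step in (iv b) above (a minimal sketch
only; it assumes a stream keyed by subscription, String payloads, and
illustrative thresholds), a KeyedProcessFunction with ListState and a
processing-time timer could flush on whichever threshold is hit first:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class CountOrTimeBatcher extends KeyedProcessFunction<String, String, List<String>> {

    private static final int MAX_BATCH_SIZE = 1000;   // illustrative threshold
    private static final long MAX_WAIT_MS = 1000L;    // illustrative threshold

    private transient ListState<String> buffer;
    private transient ValueState<Integer> count;
    private transient ValueState<Long> pendingTimer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffer", String.class));
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Integer.class));
        pendingTimer = getRuntimeContext().getState(
                new ValueStateDescriptor<>("pendingTimer", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<List<String>> out)
            throws Exception {
        buffer.add(value);
        int n = (count.value() == null ? 0 : count.value()) + 1;
        count.update(n);

        // First element of a batch: arm the time-based flush.
        if (pendingTimer.value() == null) {
            long ts = ctx.timerService().currentProcessingTime() + MAX_WAIT_MS;
            ctx.timerService().registerProcessingTimeTimer(ts);
            pendingTimer.update(ts);
        }
        // Size-based flush.
        if (n >= MAX_BATCH_SIZE) {
            flush(ctx, out);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<String>> out)
            throws Exception {
        // Time-based flush.
        flush(ctx, out);
    }

    private void flush(Context ctx, Collector<List<String>> out) throws Exception {
        List<String> batch = new ArrayList<>();
        Iterable<String> buffered = buffer.get();
        if (buffered != null) {
            for (String element : buffered) {
                batch.add(element);
            }
        }
        if (!batch.isEmpty()) {
            out.collect(batch);
        }
        buffer.clear();
        count.clear();
        if (pendingTimer.value() != null) {
            ctx.timerService().deleteProcessingTimeTimer(pendingTimer.value());
            pendingTimer.clear();
        }
    }
}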

*Questions : *
1) Batching is an aggregation operation. But what I have seen in the
examples of windowing is that they compute a count/max/min operation over the
particular window. So could the batching be implemented via a windowing
mechanism?

2) Is it a good design to have both batched delivery and per-event delivery
in the same job, or should they be separate jobs?

3) Is the performance of broadcast state better than a lookup cache?
(Personally I have implemented broadcast state for another purpose and am not
sure about the performance of querying the DB + lookup cache.)

4) I read this:
" The working copy of broadcast state is always on the heap; not in
RocksDB. So, it has to be small enough to fit in memory. Furthermore, each
instance will copy all of the broadcast state into its checkpoints, so all
checkpoints and savepoints will have *n* copies of the broadcast state
(where *n* is the parallelism).
If you are able to key partition this data, then you might not need to
broadcast it. It sounds like it might be per-employee data that could be
keyed by the employeeId. But if not, then you'll have to keep it small
enough to fit into memory. "

Using keyed process broadcast looks better than using non-keyed, as the same
data is not replicated across all the parallel operators.
A caveat here is that the load across all subscriptions is not the same,
so if we key the stream we might end up with an unbalanced job.
Thoughts on this?

5) Latency must be minimal, so the first thought is to store the
messages to be batched in the HashMapStateBackend.
But storing both the config state and the data on the heap might increase
memory usage a lot, since there could be a huge spike in load. Are
there any other things that need to be factored in?

6) Auto-scaling capability would save a lot of cost because of the consistent
load patterns with occasional spikes. Though reactive scaling was introduced
in Flink 1.13, we don't know whether it is battle-hardened.

7) After looking at the solutions, does Flink seem to be a natural fit for
this use case in comparison to the Spring Reactor framework / Vert.x?
One thing we see from the documentation is that Spring Reactive can auto-scale
very well, but we would need to work on fault tolerance/stability on the
dev side, which Flink is good at.

Spring reactor is new to us and we wanted to 

Re: Topic assignment across Flink Kafka Consumer

2021-08-04 Thread Prasanna kumar
Robert

We are checking it using the metric
flink_taskmanager_job_task_operator_KafkaConsumer_assigned_partitions{jobname="SPECIFICJOBNAME"}

This metric gives the number of partitions assigned to each task (Kafka
consumer operator).

Prasanna.


On Wed, Aug 4, 2021 at 8:59 PM Robert Metzger  wrote:

> Hi Prasanna,
>
> How are you checking the assignment of Kafka partitions to the consumers?
>
> The FlinkKafkaConsumer doesn't have a rebalance() method, this is a
> generic concept of the DataStream API. Is it possible that you are
> somehow partitioning your data in your Flink job, and this is causing the
> data distribution issues you are observing?
>
>
> On Wed, Aug 4, 2021 at 4:00 PM Prasanna kumar <
> prasannakumarram...@gmail.com> wrote:
>
>> Robert
>>
>> When we apply a rebalance method to the kafka consumer, it is assigning
>> partitions of various topics evenly.
>>
>> But my only concern is that the rebalance method might have a performance
>> impact .
>>
>> Thanks,
>> Prasanna.
>>
>>
>> On Wed, Aug 4, 2021 at 5:55 PM Prasanna kumar <
>> prasannakumarram...@gmail.com> wrote:
>>
>>> Robert,
>>>
>>> Flink version 1.12.2.
>>> Flink connector Kafka Version 2..12
>>>
>>> The partitions are assigned equally if we are reading from a single
>>> topic.
>>>
>>> Our Use case is to read from multiple topics [topics r4 regex pattern]
>>> we use 6 topics and 1 partition per topic for this job.
>>>
>>> In this case , few of the kafka consumer tasks are not allocated.
>>>
>>> Thanks,
>>> Prasanna.
>>>
>>> On Tue, 20 Jul 2021, 17:44 Robert Metzger,  wrote:
>>>
>>>> Hi Prasanna,
>>>> which Flink version and Kafka connector are you using? (the
>>>> "KafkaSource" or "FlinkKafkaConsumer"?)
>>>>
>>>> The partition assignment for the FlinkKafkaConsumer is defined here:
>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java#L27-L43
>>>>
>>>>
>>>> I assume all your topics have one partition only. Still, the
>>>> "startIndex" should be determined based on the hash of the topic name. My
>>>> only explanation is that your unlucky with the distribution of the hashes.
>>>> If this leads to performance issues, consider using topics with
>>>> multiple partitions, change the name of the topics or increase the
>>>> parallelism of your consumer.
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Jul 20, 2021 at 7:53 AM Prasanna kumar <
>>>> prasannakumarram...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> We have a Flink job reading from multiple Kafka topics based on a
>>>>> regex pattern.
>>>>>
>>>>> What we have found out is that the topics are not shared between the
>>>>> kafka consumers in an even manner .
>>>>>
>>>>> Example if there are 8 topics and 4 kafka consumer operators . 1
>>>>> consumer is assigned 6 topics , 2 consumers assigned 1 each and the last
>>>>> consumer is not assigned at all.
>>>>>
>>>>> This leads to inadequate usage of the resources.
>>>>>
>>>>> I could not find any setting/configuration which would make them as
>>>>> even as possible.
>>>>>
>>>>> Let me know if there's a way to do the same.
>>>>>
>>>>> Thanks,
>>>>> Prasanna.
>>>>>
>>>>


Re: Topic assignment across Flink Kafka Consumer

2021-08-04 Thread Prasanna kumar
Robert

When we apply a rebalance() after the Kafka consumer, the load from the
various topics is distributed evenly across the downstream subtasks.

My only concern is that the rebalance method might have a performance
impact.

Thanks,
Prasanna.


On Wed, Aug 4, 2021 at 5:55 PM Prasanna kumar 
wrote:

> Robert,
>
> Flink version 1.12.2.
> Flink connector Kafka Version 2..12
>
> The partitions are assigned equally if we are reading from a single topic.
>
> Our Use case is to read from multiple topics [topics r4 regex pattern] we
> use 6 topics and 1 partition per topic for this job.
>
> In this case , few of the kafka consumer tasks are not allocated.
>
> Thanks,
> Prasanna.
>
> On Tue, 20 Jul 2021, 17:44 Robert Metzger,  wrote:
>
>> Hi Prasanna,
>> which Flink version and Kafka connector are you using? (the "KafkaSource"
>> or "FlinkKafkaConsumer"?)
>>
>> The partition assignment for the FlinkKafkaConsumer is defined here:
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java#L27-L43
>>
>>
>> I assume all your topics have one partition only. Still, the "startIndex"
>> should be determined based on the hash of the topic name. My only
>> explanation is that your unlucky with the distribution of the hashes.
>> If this leads to performance issues, consider using topics with multiple
>> partitions, change the name of the topics or increase the parallelism of
>> your consumer.
>>
>>
>>
>>
>> On Tue, Jul 20, 2021 at 7:53 AM Prasanna kumar <
>> prasannakumarram...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> We have a Flink job reading from multiple Kafka topics based on a regex
>>> pattern.
>>>
>>> What we have found out is that the topics are not shared between the
>>> kafka consumers in an even manner .
>>>
>>> Example if there are 8 topics and 4 kafka consumer operators . 1
>>> consumer is assigned 6 topics , 2 consumers assigned 1 each and the last
>>> consumer is not assigned at all.
>>>
>>> This leads to inadequate usage of the resources.
>>>
>>> I could not find any setting/configuration which would make them as even
>>> as possible.
>>>
>>> Let me know if there's a way to do the same.
>>>
>>> Thanks,
>>> Prasanna.
>>>
>>


Re: Topic assignment across Flink Kafka Consumer

2021-08-04 Thread Prasanna kumar
Robert,

Flink version 1.12.2.
Flink Kafka connector version 2.12.

The partitions are assigned equally if we are reading from a single topic.

Our use case is to read from multiple topics [matched by a regex pattern]; we
use 6 topics and 1 partition per topic for this job.

In this case, a few of the Kafka consumer tasks are not allocated any topics.

Thanks,
Prasanna.

On Tue, 20 Jul 2021, 17:44 Robert Metzger,  wrote:

> Hi Prasanna,
> which Flink version and Kafka connector are you using? (the "KafkaSource"
> or "FlinkKafkaConsumer"?)
>
> The partition assignment for the FlinkKafkaConsumer is defined here:
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java#L27-L43
>
>
> I assume all your topics have one partition only. Still, the "startIndex"
> should be determined based on the hash of the topic name. My only
> explanation is that your unlucky with the distribution of the hashes.
> If this leads to performance issues, consider using topics with multiple
> partitions, change the name of the topics or increase the parallelism of
> your consumer.
>
>
>
>
> On Tue, Jul 20, 2021 at 7:53 AM Prasanna kumar <
> prasannakumarram...@gmail.com> wrote:
>
>> Hi,
>>
>> We have a Flink job reading from multiple Kafka topics based on a regex
>> pattern.
>>
>> What we have found out is that the topics are not shared between the
>> kafka consumers in an even manner .
>>
>> Example if there are 8 topics and 4 kafka consumer operators . 1
>> consumer is assigned 6 topics , 2 consumers assigned 1 each and the last
>> consumer is not assigned at all.
>>
>> This leads to inadequate usage of the resources.
>>
>> I could not find any setting/configuration which would make them as even
>> as possible.
>>
>> Let me know if there's a way to do the same.
>>
>> Thanks,
>> Prasanna.
>>
>


Topic assignment across Flink Kafka Consumer

2021-07-19 Thread Prasanna kumar
Hi,

We have a Flink job reading from multiple Kafka topics based on a regex
pattern.

What we have found is that the topics are not shared among the Kafka
consumers in an even manner.

For example, if there are 8 topics and 4 Kafka consumer operators: 1
consumer is assigned 6 topics, 2 consumers are assigned 1 each, and the last
consumer is not assigned any topic at all.

This leads to inadequate usage of the resources.

I could not find any setting/configuration which would make them as even as
possible.

Let me know if there's a way to do the same.

Thanks,
Prasanna.


Re: Metric counter gets reset when leader jobmanager changes in Flink native K8s HA solution

2021-06-14 Thread Prasanna kumar
Amit,

This is expected behaviour for a counter. If you need the total count
irrespective of restarts, aggregate functions need to be applied to the
counter, e.g. sum(rate(counter_name[5m])).
https://prometheus.io/docs/prometheus/latest/querying/functions/

Prasanna.

On Tue, Jun 15, 2021 at 8:25 AM Amit Bhatia 
wrote:

> Hi,
>
> We have configured jobmanager HA with flink 1.12.1 on the k8s environment.
> We have implemented a HA solution using Native K8s HA solution (
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink).
> We have used deployment controller for both jobmanager & taskmanager pods.
>
> So whenever a leader jobmanager crashes and the same jobmanager becomes
> leader again then everything works fine but whenever a leader jobmanager
> crashes and some other standby jobmanager becomes leader then metric count
> gets reset and it starts the request count again from 1. Is it the expected
> behaviour ? or is there any specific configuration required so that even if
> the leader jobmanager changes then instead of resetting the metric count it
> continues the count.
>
> Regards,
> Amit
>


Sometimes Counter Metrics getting Stuck and not increasing

2021-05-21 Thread Prasanna kumar
Hi,

We are publishing around 200 kinds of events for 15000 customers.
Source: Kafka topics. Sink: Amazon SNS topics.
We are collecting metrics in the combination [Event, Consumer,
PublishResult] (the publish result can be published or error).
So the metric count is on the order of 200*15000*2 = 6 million, and the
growth rate is expected to be around 20% per year.

We are using the Prometheus reporter for scraping metrics.
Yesterday I found that for one customer the metric got stuck at a
particular value.
But while printing the value of "counterEventPublishedMapKey" in the logs, we
are getting correctly increasing values.
There are no other warnings or errors in the logs.
Other similar metrics are being scraped without any issues.

The Performance of the overall pipeline is good and we don't see any other
issues.
JM Memory is 4Gb (Metaspace is 1gb)
TM Memory is 4gb (Heap is 3gb)
Flink Version 1.12.2
Ran with Parallelism of 2.

There are other jobs where we have used similar structure to publish
metrics through RichMap Function and they are running successfully.

1) Can we rely on metrics reported in this fashion?
2) Has anyone faced this kind of scenario before?
3) What do we do if one particular counter metric gets stuck like this
(while other counter metrics are working)?
4) Could you point out whether the code structure is the reason for this?



public class SNSPublisher implements EventJobs {

  private static FlinkKafkaConsumer
getFlinkKafkaConsumer(ParameterTool configParams) {
...
  }

  @Override
  public void execute(ParameterTool configParams) throws Exception {


final StreamExecutionEnvironment env =
KafkaUtil.createStreamExecutionEnvironment(configParams);

// Enabling Checkpointing

env.enableCheckpointing(1000);
String checkpointingDirectory =
configParams.get(AppConstant.CHECKPOINTING_DIRECTORY);
env.setStateBackend(new FsStateBackend(checkpointingDirectory,true));
Class unmodifiableCollectionsSerializer =
Class.forName("java.util.Collections$UnmodifiableCollection");
env.getConfig().addDefaultKryoSerializer(unmodifiableCollectionsSerializer,
UnmodifiableCollectionsSerializer.class);
CheckpointConfig config = env.getCheckpointConfig();

config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
config.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);

FlinkKafkaConsumer flinkKafkaConsumer = getFlinkKafkaConsumer(configParams);

DataStream> inputStream =
env.addSource(flinkKafkaConsumer)
.uid(configParams.get(AppConstant.STATE_KAFKA_SOURCE_UUID))
.name(AppConstant.SOURCE);
DataStream eventStream = inputStream
.map((MapFunction, Event>) value ->
value.getField(1));
SNSMessagePublisherFunction snsMessagePublisherFunction = new
SNSMessagePublisherFunction(configParams);

SNSMessagePublisher snsMessagePublisher = new
SNSMessagePublisherImpl(snsMessagePublisherFunction);

DataStream> result =
snsMessagePublisher.publish(eventStream);

result
.keyBy(resultTuple -> getMapEventCounterKey(resultTuple))
.map(
new RichMapFunction, String>() {

  private transient MapState
counterEventPublishedMapState;

  @Override
  public void open(Configuration parameters) throws Exception {
super.open(parameters);
MapStateDescriptor stateDescriptor =
new MapStateDescriptor(
"counterEventPublishedMapState",
String.class,
Counter.class);

counterEventPublishedMapState =
getRuntimeContext().getMapState(stateDescriptor);
  }

  @Override
  public String map(Tuple2 tuple2) throws Exception {

Event event = tuple2.getField(1);

MetricGroup metricGroup = getRuntimeContext().getMetricGroup();

Counter counter;
String counterName = getResultofSNSPublish(tuple2);
String counterEventMapKey = getMapEventCounterKey(tuple2);

if
(counterEventPublishedMapState.contains(counterEventMapKey)) {
  counter =
counterEventPublishedMapState.get(counterEventMapKey);
} else {
  counter =
  metricGroup
  .addGroup(AppConstant.EVENT, event.fetchEventType())
  .addGroup(AppConstant.CONSUMER,
event.fetchConsumerID())
  .counter(counterName);
}
counter.inc();
Long counterValue = counter.getCount();
counterEventPublishedMapState.put(counterEventMapKey, counter);

return new StringBuilder("counterEventPublishedMapKey:")
.append(counterEventMapKey)
.append(AppConstant.COLON)
  

Presence of Jars in Flink reg security

2021-05-04 Thread Prasanna kumar
Hi Flinksters,

Our repo, which is a Maven-based Java (Flink) project, went through an SCA
scan using the WhiteSource tool, and the following HIGH-severity issues were
reported. The flagged vulnerable jar is not found when we build the
dependency tree of the project.

Could anyone let us know if Flink uses these anywhere?

+--++
| Library  | Severity   |
+==++
| xercesImpl-2.9.1.jar | HIGH   |
+--++
- Artifact ID: xercesImpl
- Group ID: xerces
- Library Version: 2.9.1
- Library Path:
/var/lib/jenkins/workspace/branch/latest/?/.m2/repository/xerces/xercesImpl/2.9.1/xercesImpl-2.9.1.jar
- Dependency: None
- Type: MAVEN_ARTIFACT
- Description: XMLscanner.java in Apache Xerces2 Java Parser before
2.12.0, as used in the Java Runtime Environment (JRE) in IBM Java 5.0
before 5.0 SR16-FP3, 6 before 6 SR14, 6.0.1 before 6.0.1 SR6, and 7
before 7 SR5 as well as Oracle Java SE 7u40 and earlier, Java SE 6u60
and earlier, Java SE 5.0u51 and earlier, JRockit R28.2.8 and earlier,
JRockit R27.7.6 and earlier, Java SE Embedded 7u40 and earlier, and
possibly other products allows remote attackers to cause a denial of
service via vectors related to XML attribute names.
- Suggested Fix: Upgrade to version xerces:xercesImpl:Xerces-J_2_12_0


+---++
| Library   | Severity   |
+===++
| struts-core-1.3.8.jar | HIGH   |
+---++
- Artifact ID: struts-core
- Group ID: org.apache.struts
- Library Version: 1.3.8
- Library Path:
/var/lib/jenkins/workspace/branchlatest/?/.m2/repository/org/apache/struts/struts-core/1.3.8/struts-core-1.3.8.jar
- Dependency: None
- Type: MAVEN_ARTIFACT
- Description: ActionServlet.java in Apache Struts 1 1.x through
1.3.10 does not properly restrict the Validator configuration, which
allows remote attackers to conduct cross-site scripting (XSS) attacks
or cause a denial of service via crafted input, a related issue to
CVE-2015-0899.
- Suggested Fix: Replace or update the following file: 
ActionServlet.java

+--++
| Library  | Severity   |
+==++
| plexus-utils-3.0.jar | HIGH   |
+--++
- Artifact ID: plexus-utils
- Group ID: org.codehaus.plexus
- Library Version: 3.0
- Library Path:
/var/lib/jenkins/workspace/branchlatest/?/.m2/repository/org/codehaus/plexus/plexus-utils/3.0/plexus-utils-3.0.jar
- Dependency: None
- Type: MAVEN_ARTIFACT
- Description: Security vulnerability found in plexus-utils before
3.0.24. XML injection found in XmlWriterUtil.java.
- Suggested Fix: Upgrade to version 3.0.24

Thanks,

Prasanna.


Flink support for Kafka versions

2021-04-20 Thread Prasanna kumar
Hi Flinksters,

We are researching about if we could use the latest version of kafka (2.6.1
or 2.7.0)

Since we are using Flink as a processor , we came across this
https://issues.apache.org/jira/browse/FLINK-19168.

It says that it does not support version 2.5.0 and beyond.

That was created 8 months back , just checking if there is any effort on
that front.

Thanks,
Prasanna


Flink Metrics

2021-02-28 Thread Prasanna kumar
Hi flinksters,

Scenario: We have CDC messages from our RDBMS (various tables) flowing to
Kafka. Our Flink job reads the CDC messages and creates events based on
certain rules.

I am using Prometheus and Grafana.

Following are the metrics that I need to calculate:

A) Number of CDC messages per table.
B) Number of events created per event type.
C) Average/P99/P95 latency (event created ts - cdc operation ts)

For A and B, I created counters and am able to see the metrics flowing into
Prometheus. A few questions I have here:

1) How do I create labels for counters in Flink? I did not find an easy
method to do it. Right now I see that I need to create separate counters for
each table and event type. I referred to one of the community discussions
[1]; a rough sketch of that per-group pattern is below, after the references.
Is there any way apart from this?

2) When the job gets restarted, the counters go back to 0. How do I prevent
that and get continuity?

For C, I calculated the latency in code for each event and assigned it to a
histogram. A few questions I have here:

3) I read in a few blogs [2] that a histogram is the best way to get
latencies. Is there any better idea?

4) How do I create buckets for various ranges? I also read in a community
email that Flink implements histograms as summaries. I also need to be able
to see the latencies across timelines.

[1]
https://stackoverflow.com/questions/58456830/how-to-use-multiple-counters-in-flink
[2] https://povilasv.me/prometheus-tracking-request-duration/
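
The per-group pattern referred to in question 1 looks roughly like this (a
minimal sketch inside a RichFunction; the group and counter names are
illustrative, not taken from any existing code). With the Prometheus reporter,
key/value groups defined this way show up as labels:

// Inside open()/map() of a RichFunction; "table" becomes a label key and
// tableName its value in the exported Prometheus metric.
Counter cdcCounter = getRuntimeContext()
        .getMetricGroup()
        .addGroup("table", tableName)
        .counter("cdcMessages");
cdcCounter.inc();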

Thanks,
Prasanna.


Re: Using Prometheus Client Metrics in Flink

2021-02-27 Thread Prasanna kumar
Rion,

Regarding the second question, you can aggregate by using the sum function:
sum(metric_name{job_name="JOBNAME"}). This works if you are using the
counter metric.

Prasanna.

On Sat, Feb 27, 2021 at 9:01 PM Rion Williams  wrote:

> Hi folks,
>
> I’ve just recently started working with Flink and I was in the process of
> adding some metrics through my existing pipeline with the hopes of building
> some Grafana dashboards with them to help with observability.
>
> Initially I looked at the built-in Flink metrics that were available, but
> I didn’t see an easy mechanism for setting/using labels with them.
> Essentially, I have two properties for my messages coming through the
> pipeline that I’d like to be able to keep track of (tenant/source) across
> several metrics (e.g. total_messages with tenant / source labels, etc.). I
> didn’t see an easy way to adjust this out of the box, or wasn’t aware of a
> good pattern for handling these.
>
> I had previously used the Prometheus Client metrics [0] to accomplish this
> in the past but I wasn’t entirely sure how it would/could mesh with Flink.
> Does anyone have experience in working with these or know if they are
> supported?
>
> Secondly, when using the Flink metrics, I noticed I was receiving a
> separate metric for each task that was being spun up. Is there an “easy
> button” to handle aggregating these to ensure that a single metric (e.g.
> total_messages) reflects the total processed across all of the tasks
> instead of each individual one?
>
> Any recommendations / resources / advice would be greatly appreciated!
>
> Thanks,
>
> Rion
>
> [0] : https://prometheus.io/docs/instrumenting/clientlibs/
>


Re: Routing events to different kafka topics dynamically

2020-12-03 Thread Prasanna kumar
Thanks Till,

We were able to deduce the topics by extending the KafkaSerializationSchema
class.
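
For anyone searching later, the shape of it is roughly the following (a minimal
sketch only; the Event type and its accessors are illustrative placeholders,
not the actual classes):

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;

public class DynamicTopicSerializationSchema implements KafkaSerializationSchema<Event> {

    @Override
    public ProducerRecord<byte[], byte[]> serialize(Event element, Long timestamp) {
        // The target topic is deduced from a field of the message itself.
        String topic = element.getTargetTopic();
        byte[] value = element.toJson().getBytes(StandardCharsets.UTF_8);
        return new ProducerRecord<>(topic, value);
    }
}

The schema instance is then passed to the FlinkKafkaProducer constructor that
takes a default topic, the schema, the producer properties and a semantic.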

Prasanna.

On Wed, Dec 2, 2020 at 11:18 PM Till Rohrmann  wrote:

> Hi Prasanna,
>
> I believe that what Aljoscha suggestd in the linked discussion is still
> the best way to go forward. Given your description of the problem this
> should actually be pretty straightforward as you can deduce the topic from
> the message. Hence, you just need to create the ProducerRecord with the
> right target topic you extracted from the record/message.
>
> Cheers,
> Till
>
> On Wed, Dec 2, 2020 at 5:28 PM Prasanna kumar <
> prasannakumarram...@gmail.com> wrote:
>
>> Hi,
>>
>> Events need to be routed to different kafka topics dynamically based upon
>> some info in the message.
>>
>> We have implemented using KeyedSerializationSchema similar to
>> https://stackoverflow.com/questions/49508508/apache-flink-how-to-sink-events-to-different-kafka-topics-depending-on-the-even.
>> But its deprecated and we cannot use it for production.
>>
>> I looked at the alternative KafkaSerializationSchema but there i do not
>> see an option there.
>>
>> Then i stumbled upon this
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Usage-of-KafkaDeserializationSchema-and-KafkaSerializationSchema-td32347.html.
>> which asks us to use KafkaContextAware.
>>
>> Is there a more intuitive/easier way to do the same ?
>>
>> Thanks,
>> Prasanna.
>>
>>
>>


Routing events to different kafka topics dynamically

2020-12-02 Thread Prasanna kumar
Hi,

Events need to be routed to different kafka topics dynamically based upon
some info in the message.

We have implemented using KeyedSerializationSchema similar to
https://stackoverflow.com/questions/49508508/apache-flink-how-to-sink-events-to-different-kafka-topics-depending-on-the-even.
But it is deprecated and we cannot use it in production.

I looked at the alternative KafkaSerializationSchema but I do not see such an
option there.

Then I stumbled upon this
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Usage-of-KafkaDeserializationSchema-and-KafkaSerializationSchema-td32347.html.
which asks us to use KafkaContextAware.

Is there a more intuitive/easier way to do the same ?

Thanks,
Prasanna.


Re: Caching

2020-11-26 Thread Prasanna kumar
Navneeth,

Thanks for posting this question.

This looks like our future scenario where we might end up with.

We are working on a Similar problem statement with two differences.

1) The cache items would not change frequently, say at most once per month or
a few times per year, and the number of entities in the cache would not be
more than 1000 (say, Java objects).

2) The event load we are looking at is around 10-50k/sec.

We are using the broadcast state mechanism for the same.
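Roughly, the shape of it is the broadcast state pattern below; a minimal
sketch, assuming hypothetical Rule and Event POJOs, with imports and the two
input streams omitted:

MapStateDescriptor<String, Rule> ruleDescriptor =
        new MapStateDescriptor<>("rules", Types.STRING, Types.POJO(Rule.class));

BroadcastStream<Rule> ruleBroadcast = ruleStream.broadcast(ruleDescriptor);

DataStream<Event> output = eventStream
        .connect(ruleBroadcast)
        .process(new BroadcastProcessFunction<Event, Rule, Event>() {

            @Override
            public void processElement(Event event, ReadOnlyContext ctx, Collector<Event> out)
                    throws Exception {
                // Read-only lookup against the broadcast "cache" on the high-volume side.
                Rule rule = ctx.getBroadcastState(ruleDescriptor).get(event.getRuleKey());
                if (rule != null) {
                    out.collect(event);
                }
            }

            @Override
            public void processBroadcastElement(Rule rule, Context ctx, Collector<Event> out)
                    throws Exception {
                // Rare updates (a few times a year for us) replace the cached entry.
                ctx.getBroadcastState(ruleDescriptor).put(rule.getKey(), rule);
            }
        });

Since our cache is small (around 1000 objects), the per-parallel-instance copy
that broadcast state keeps is not a concern for us; with millions of entries,
as in your case, that duplication is exactly the memory problem you describe.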

Prasanna.

On Thu 26 Nov, 2020, 14:01 Navneeth Krishnan, 
wrote:

> Hi All,
>
> We have a flink streaming job processing around 200k events per second.
> The job requires a lot of less frequently changing data (sort of static but
> there will be some changes over time, say 5% change once per day or so).
> There are about 12 caches with some containing approximately 20k
> entries whereas a few with about 2 million entries.
>
> In the current implementation we are using in-memory lazy loading static
> cache to populate the data and the initialization happens in open function.
> The reason to choose this approach is because we have allocated around 4GB
> extra memory per TM for these caches and if a TM has 6 slots the cache can
> be shared.
>
> Now the issue we have with this approach is everytime when a container is
> restarted or a new job is deployed it has to populate the cache again.
> Sometimes this lazy loading takes a while and it causes back pressure as
> well. We were thinking to move this logic to the broadcast stream but since
> the data has to be stored per slot it would increase the memory consumption
> by a lot.
>
> Another option that we were thinking of is to replace the current near/far
> cache, which uses a REST API to load the data, with a Redis-based near/far cache.
> This will definitely reduce the overall loading time but still not the
> perfect solution.
>
> Are there any recommendations on how this can be achieved effectively?
> Also how is everyone overcoming this problem?
>
> Thanks,
> Navneeth
>
>


Dynamic Kafka Source

2020-09-26 Thread Prasanna kumar
Hi,

My requirement has been captured by the following stack overflow question.

https://stackoverflow.com/questions/61876849/custom-kafka-source-on-apache-flink

Could anyone take a shot at it ?

Thanks,
Prasanna.


Re: SDK vs Connectors

2020-08-23 Thread Prasanna kumar
Thanks for the Reply Yun,

I see that when I publish the messages to SNS from the map operator, in case
of any errors the checkpointing mechanism takes care of "no data loss".

One scenario I could not replicate is the SDK method being unable to send
messages to SNS while staying silent and not throwing any errors/exceptions.
In that case we may not be able to confirm the "at least once" guarantee of
message delivery.
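If we do end up moving the publish into a custom sink, my reading of Yun's
point 2 below is something like this outline (buffering is simplified, the
class name is mine, and the SnsClient/PublishRequest calls are the AWS SDK v2
ones; treat it as a sketch rather than tested code):

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.PublishRequest;

// At-least-once outline: anything still buffered is flushed before a checkpoint
// completes, so nothing buffered is lost if the job fails over afterwards.
public class SnsSink extends RichSinkFunction<String> implements CheckpointedFunction {

    private final String topicArn;
    private transient SnsClient sns;
    private transient List<String> buffer;

    public SnsSink(String topicArn) {
        this.topicArn = topicArn;
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        sns = SnsClient.create();
        buffer = new ArrayList<>();
    }

    @Override
    public void invoke(String value, Context context) {
        buffer.add(value);
        if (buffer.size() >= 100) {  // arbitrary batch size
            flush();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) {
        flush();  // ensure everything is written out before the checkpoint succeeds
    }

    private void flush() {
        for (String msg : buffer) {
            sns.publish(PublishRequest.builder().topicArn(topicArn).message(msg).build());
        }
        buffer.clear();
    }
}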

Prasanna.

On Sun 23 Aug, 2020, 07:51 Yun Gao,  wrote:

> Hi Prasanna,
>
>    1) Semantically both a) and b) would be OK. If the custom sink could be
> chained with the map operator (I assume the map operator is the
> "Processing" in the graph), there should also be not much difference
> physically; if they could not chain, then writing a custom sink would
> cause another pass of network transfer, but the custom sink would be
> run in a different thread, so more computation resources could be
> exploited.
>    2) To achieve at-least-once, you need to implement the
> "CheckpointedFunction" interface and ensure all the data is flushed to the
> outside systems when snapshotting state. If the checkpoint
> succeeds, the previous data will not be replayed after failover, thus these
> pieces of data need to be written out before the checkpoint
> succeeds.
>    3) From my side I don't think there are significant disadvantages to
> writing custom sink functions.
>
> Best,
>  Yun
>
>
> --
> Sender:Prasanna kumar
> Date:2020/08/22 02:00:51
> Recipient:user; 
> Theme:SDK vs Connectors
>
> Hi Team,
>
> Following is the pipeline
> Kafka => Processing => SNS Topics .
>
> Flink Does not provide a SNS connector out of the box.
>
> a) I implemented the above by using AWS SDK and published the messages in
> the Map operator itself.
> The pipeline is working well. I see messages flowing to SNS topics.
>
> b) Another approach is that I could write a custom sink function and still
> publish to SNS using SDK in this stage.
>
> Questions
> 1) What would be the primary difference between approach a) and b). Is
> there any significant advantage of one over the other ?
>
> 2) Would at least once guarantee be confirmed if we follow the above
> approach?
>
> 3) Would there be any significant disadvantages(rather what we need to be
> careful ) of writing our custom sink functions ?
>
> Thanks,
> Prasanna.
>
>


Re: AWS EMR deployment error : NoClassDefFoundError org/apache/flink/api/java/typeutils/ResultTypeQueryable

2020-08-21 Thread Prasanna kumar
Manas,

One option you could try is to set the scope of the required Flink
dependencies to "compile" rather than "provided".

Prasanna.

On Fri, Aug 21, 2020 at 1:47 PM Chesnay Schepler  wrote:

> If this class cannot be found on the classpath then chances are Flink is
> completely missing from the classpath.
>
> I haven't worked with EMR, but my guess is that you did not submit things
> correctly.
>
> From the EMR documentation I could gather that the submission should work
> without the submitted jar bundling all of Flink;
>
> given that you jar works in a local cluster that part should not be the
> problem.
>
> On 21/08/2020 08:16, Manas Kale wrote:
>
> Hi,
> I am trying to deploy a Flink jar on AWS EMR service. I have ensured that
> Flink v1.10.0 is used in my pom file as that's the version supported by
> EMR. However, I get the following error:
>
> Exception in thread "main" java.lang.NoClassDefFoundError: 
> org/apache/flink/api/java/typeutils/ResultTypeQueryable
>   at java.lang.ClassLoader.defineClass1(Native Method)
>   at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
>   at 
> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>   at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
>   at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:348)
>   at org.apache.hadoop.util.RunJar.run(RunJar.java:232)
>   at org.apache.hadoop.util.RunJar.main(RunJar.java:153)
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.api.java.typeutils.ResultTypeQueryable
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>   ... 15 more
>
> Also, if I deploy this on my local Flink cluster (v1.10.1) it works.
> I'm not sure what could be the cause. Could it be because of misconfigured
> classes bundled in the final JAR file or something that was patched in v
> 1.10.1?
>
>
>


SDK vs Connectors

2020-08-21 Thread Prasanna kumar
Hi Team,

Following is the pipeline
Kafka => Processing => SNS Topics .

Flink Does not provide a SNS connector out of the box.

a) I implemented the above by using AWS SDK and published the messages in
the Map operator itself.
The pipeline is working well. I see messages flowing to SNS topics.

b) Another approach is that I could write a custom sink function and still
publish to SNS using SDK in this stage.

Questions
1) What would be the primary difference between approach a) and b). Is
there any significant advantage of one over the other ?

2) Would at least once guarantee be confirmed if we follow the above
approach?

3) Would there be any significant disadvantages(rather what we need to be
careful ) of writing our custom sink functions ?

Thanks,
Prasanna.


Flink Sinks

2020-07-17 Thread Prasanna kumar
Hi ,

I did not find an out-of-the-box Flink sink connector for HTTP or SQS.

Has anyone implemented one?
Also wanted to know, if we write a custom sink function, whether it would
affect exactly-once semantic guarantees?


Thanks ,
Prasanna


Re: Performance test Flink vs Storm

2020-07-16 Thread Prasanna kumar
Hi,

After setting taskmanager.memory.managed.fraction to 0, I see that the JVM
heap memory increased from 512 MB to 1 GB.
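For reference, on our setup that is just one line in flink-conf.yaml, since
the job is stateless and does not use RocksDB:

taskmanager.memory.managed.fraction: 0.0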

Earlier I was getting a maximum throughput of 4-6k records per second on the
Kafka source for an ingestion rate of 12k+/second. Now I see that improved to
11k per task (parallelism of 1) and 16.5k+ per second when run with a
parallelism of 2 (8.25k per task).

The maximum memory used during the run was 500 mb of heap space.

From this exercise, I understand that increasing the JVM memory would
directly increase throughput. Am I correct?

Our goal is to test for 100k ingestion per second and then try to calculate
the cost for 1 million per second (hoping it's a linear relation).

I also saw that CPU utilisation peaked at 50% during the same run.

1) Let me know what you think of the same, as I would continue to test.

2) Is there a benchmark for the number of records handled per Kafka
connector task for a particular JVM heap size?

Thanks,
Prasanna

On Fri 17 Jul, 2020, 06:18 Xintong Song,  wrote:

> *I had set Checkpoint to use the Job manager backend.*
>
> Jobmanager backend also runs in JVM heap space and does not use managed
> memory. Setting managed memory fraction to 0 will give you larger JVM heap
> space, thus lesser GC pressure.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Thu, Jul 16, 2020 at 10:38 PM Prasanna kumar <
> prasannakumarram...@gmail.com> wrote:
>
>>
>> Xintong Song,
>>
>>
>>- Which version of Flink is used?*1.10*
>>- Which deployment mode is used? *Standalone*
>>- Which cluster mode is used? *Job*
>>- Do you mean you have a 4core16gb node for each task manager, and
>>each task manager has 4 slots? *Yeah*. *There are totally 3
>>taskmanagers in the cluster.  2TMs are t2.medium machine 2 core 4 gb per
>>machine. 1 slot per core. 1TM is t2.large 4core 16gb . 4slots in the
>>machine. There were other jobs running in the t2.medium TMs. T2.large
>>machine is where the performance testing job was running. *
>>- Sounds like you are running a streaming job without using any
>>state. Have you tuned the managed memory fraction
>>(`taskmanager.memory.managed.fraction`) to zero as suggested in the
>>document[1]?  *No i have not set the
>>taskmanager.memory.network.fraction to 0. I had set Checkpoint to use the
>>Job manager backend. *
>>- *The CPU maximum spike i spotted was 40%. *
>>
>> *Between i did some latest test only on t2.medium machine with 2 slots
>> per core. 1million records with 10k/s ingestion rate. Parallelism was 1. *
>> *I added rebalance to the inputstream.   ex: *
>> inputStream.rebalance().map()
>> *I was able to get latency in the range 130ms - 2sec.*
>>
>> Let me also know if there are more things to consider here.
>>
>> Thanks
>> Prasanna.
>>
>> On Thu, Jul 16, 2020 at 4:04 PM Xintong Song 
>> wrote:
>>
>>> Hi Prasanna,
>>>
>>> Trying to understand how Flink is deployed.
>>>
>>>- Which version of Flink is used?
>>>- Which deployment mode is used? (Standalone/Kubernetes/Yarn/Mesos)
>>>- Which cluster mode is used? (Job/Session)
>>>- Do you mean you have a 4core16gb node for each task manager, and
>>>each task manager has 4 slots?
>>>- Sounds like you are running a streaming job without using any
>>>state. Have you tuned the managed memory fraction
>>>(`taskmanager.memory.managed.fraction`) to zero as suggested in the
>>>document[1]?
>>>
>>> When running a stateless job or using a heap state backend
>>>> (MemoryStateBackend or FsStateBackend), set managed memory to zero.
>>>>
>>>
>>> I can see a few potential problems.
>>>
>>>- Managed memory is probably not configured. That means a
>>>significant fraction of memory is unused.
>>>- It sounds like the CPU processing time is not the bottleneck. Thus
>>>increasing the parallelism will not give you better performance, but will
>>>on the other hand increase the overhead load on the task manager.
>>>
>>> Also pulled in Becket Qin, who is the expert of Kafka connectors. Since
>>> you have observed lack of performance in reading from Kafka compared to
>>> Storm.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_tuning.html#heap-state-backend
>>>
>>> On Thu, Jul 16, 2020 at 10:35 AM Prasanna kumar <
>>> prasannakumarram

Re: Performance test Flink vs Storm

2020-07-16 Thread Prasanna kumar
Xintong Song,


   - Which version of Flink is used?*1.10*
   - Which deployment mode is used? *Standalone*
   - Which cluster mode is used? *Job*
   - Do you mean you have a 4core16gb node for each task manager, and each
   task manager has 4 slots? *Yeah*. *There are totally 3 taskmanagers in
   the cluster.  2TMs are t2.medium machine 2 core 4 gb per machine. 1 slot
   per core. 1TM is t2.large 4core 16gb . 4slots in the machine. There were
   other jobs running in the t2.medium TMs. T2.large machine is where the
   performance testing job was running. *
   - Sounds like you are running a streaming job without using any state.
   Have you tuned the managed memory fraction
   (`taskmanager.memory.managed.fraction`) to zero as suggested in the
   document[1]?  *No i have not set the
   taskmanager.memory.network.fraction to 0. I had set Checkpoint to use the
   Job manager backend. *
   - *The CPU maximum spike i spotted was 40%. *

*Between i did some latest test only on t2.medium machine with 2 slots per
core. 1million records with 10k/s ingestion rate. Parallelism was 1. *
*I added rebalance to the inputstream.   ex: *inputStream.rebalance().map()
*I was able to get latency in the range 130ms - 2sec.*

Let me also know if there are more things to consider here.

Thanks
Prasanna.

On Thu, Jul 16, 2020 at 4:04 PM Xintong Song  wrote:

> Hi Prasanna,
>
> Trying to understand how Flink is deployed.
>
>- Which version of Flink is used?
>- Which deployment mode is used? (Standalone/Kubernetes/Yarn/Mesos)
>- Which cluster mode is used? (Job/Session)
>- Do you mean you have a 4core16gb node for each task manager, and
>each task manager has 4 slots?
>- Sounds like you are running a streaming job without using any state.
>Have you tuned the managed memory fraction
>(`taskmanager.memory.managed.fraction`) to zero as suggested in the
>document[1]?
>
> When running a stateless job or using a heap state backend
>> (MemoryStateBackend or FsStateBackend), set managed memory to zero.
>>
>
> I can see a few potential problems.
>
>- Managed memory is probably not configured. That means a significant
>fraction of memory is unused.
>- It sounds like the CPU processing time is not the bottleneck. Thus
>increasing the parallelism will not give you better performance, but will
>on the other hand increase the overhead load on the task manager.
>
> Also pulled in Becket Qin, who is the expert of Kafka connectors. Since
> you have observed lack of performance in reading from Kafka compared to
> Storm.
>
> Thank you~
>
> Xintong Song
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_tuning.html#heap-state-backend
>
> On Thu, Jul 16, 2020 at 10:35 AM Prasanna kumar <
> prasannakumarram...@gmail.com> wrote:
>
>> Hi
>>
>> Sending to you all separately as you answered one of my earlier query.
>>
>> Thanks,
>> Prasanna.
>>
>>
>> -- Forwarded message -
>> From: Prasanna kumar 
>> Date: Wed 15 Jul, 2020, 23:27
>> Subject: Performance test Flink vs Storm
>> To: , user 
>>
>>
>> Hi,
>>
>> We are testing flink and storm for our streaming pipelines on various
>> features.
>>
>> In terms of Latency,i see the flink comes up short on storm even if more
>> CPU is given to it. Will Explain in detail.
>>
>> *Machine*. t2.large 4 core 16 gb. is used for Used for flink task
>> manager and storm supervisor node.
>> *Kafka Partitions* 4
>> *Messages tested:* 1million
>> *Load* : 50k/sec
>>
>> *Scenario*:
>> Read from Kafka -> Transform (Map to a different JSON format) - > Write
>> to a Kafka topic.
>>
>> *Test 1*
>> Storm Parallelism is set as 1. There are four processes. 1 Spout (Read
>> from Kafka) and 3 bolts (Transformation and sink) .
>> Flink. Operator level parallelism not set. Task Parallelism is set as 1.
>> Task slot is 1 per core.
>>
>> Storm was 130 milliseconds faster in 1st record.
>> Storm was 20 seconds faster in 1 millionth record.
>>
>> *Test 2*
>> Storm Parallelism is set as 1. There are four processes. 1 Spout (Read
>> from Kafka) and 3 bolts (Transformation and sink)
>> Flink. Operator level parallelism not set. Task Parallelism is set as 4.
>> Task slot is 1 per core. So all cores is used.
>>
>> Storm was 180 milliseconds faster in 1st record.
>> Storm was 25 seconds faster in 1 millionth record.
>>
>> *Observations here*
>> 1) Increasing Parallelism did not increase the performance in Flink
>> rather it became 50ms to 5s slower.
>> 2) Flink is s

Performance test Flink vs Storm

2020-07-15 Thread Prasanna kumar
Hi,

We are testing flink and storm for our streaming pipelines on various
features.

In terms of latency, I see that Flink comes up short against Storm even if
more CPU is given to it. I will explain in detail.

*Machine*: t2.large, 4 cores, 16 GB, used for the Flink task manager and the
Storm supervisor node.
*Kafka Partitions* 4
*Messages tested:* 1million
*Load* : 50k/sec

*Scenario*:
Read from Kafka -> Transform (Map to a different JSON format) - > Write to
a Kafka topic.

*Test 1*
Storm Parallelism is set as 1. There are four processes. 1 Spout (Read from
Kafka) and 3 bolts (Transformation and sink) .
Flink. Operator level parallelism not set. Task Parallelism is set as 1.
Task slot is 1 per core.

Storm was 130 milliseconds faster in 1st record.
Storm was 20 seconds faster in 1 millionth record.

*Test 2*
Storm Parallelism is set as 1. There are four processes. 1 Spout (Read from
Kafka) and 3 bolts (Transformation and sink)
Flink. Operator level parallelism not set. Task Parallelism is set as 4.
Task slot is 1 per core. So all cores is used.

Storm was 180 milliseconds faster in 1st record.
Storm was 25 seconds faster in 1 millionth record.

*Observations here*
1) Increasing parallelism did not increase the performance in Flink; rather,
it became 50 ms to 5 s slower.
2) Flink is slower in reading from Kafka compared to Storm. That's where the
bulk of the latency is; for the millionth record it is 19-24 seconds slower.
3) Once a message is read, Flink takes less time to transform and write to
Kafka compared to Storm.

*Other Flink Config*
jobmanager.heap.size: 1024m

taskmanager.memory.process.size: 1568m

*How do we improve the latency?*
*Why does latency become worse when parallelism is increased and matched to
partitions?*

Thanks,
Prasanna.


Check pointing for simple pipeline

2020-07-07 Thread Prasanna kumar
Hi ,

I have a pipeline: Source -> Map (JSON transform) -> Sink.

Both source and sink are Kafka.

What is the best checkpointing mechanism?

Is setting incremental checkpoints a good option? What should I be careful
of?

I am running it on aws emr.

Will checkpointing slow down the pipeline?

Thanks,
Prasanna.


Flink Parallelism for various type of transformation

2020-07-06 Thread Prasanna kumar
Hi ,

I used t2.medium machines for the task manager nodes. Each has 2 CPUs and
4 GB of memory.

But the task manager screen shows that there are 4 slots.

Generally we should match the number of slots to the number of cores.

[image: image.png]

Our pipeline is Source -> Simple Transform -> Sink.

What happens when we have more slots than cores in the following scenarios?
1) The transform just changes the JSON format.

2) The transformation is done by hitting another server (an HTTP request).

Thanks,
Prasanna.


Re: Is Flink HIPAA certified

2020-07-01 Thread Prasanna kumar
Thanks Marta,

Prasanna.

On Wed 1 Jul, 2020, 11:35 Marta Paes Moreira,  wrote:

> Hi, Prasanna.
>
> We're not aware of any Flink users in the US healthcare space (as far as I
> know).
>
> I'm looping in Ryan from AWS, as he might be able to tell you more about
> how you can become HIPAA-compliant with Flink [1].
>
> Marta
>
> [1]
> https://docs.aws.amazon.com/kinesisanalytics/latest/java/akda-java-compliance.html
>
> On Sat, Jun 27, 2020 at 9:41 AM Prasanna kumar <
> prasannakumarram...@gmail.com> wrote:
>
>> Hi Community ,
>>
>> Could anyone let me know if Flink is used in US healthcare tech space ?
>>
>> Thanks,
>> Prasanna.
>>
>


Is Flink HIPAA certified

2020-06-27 Thread Prasanna kumar
Hi Community ,

Could anyone let me know if Flink is used in US healthcare tech space ?

Thanks,
Prasanna.


Re: Dynamic rescaling in Flink

2020-06-14 Thread Prasanna kumar
Thanks Xintong and Yu Yang for the replies,

I see AWS supports deploying Flink on EMR out of the box, and there they have
an option for EMR cluster scaling based on the load.

Is this not equivalent to dynamic rescaling?

[image: Screen Shot 2020-06-15 at 9.23.24 AM.png]


https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-managed-scaling.html

Let me know your thoughts on the same.

Prasanna.


On Wed, Jun 10, 2020 at 7:33 AM Xintong Song  wrote:

> Hi Prasanna,
>
> Flink does not support dynamic rescaling at the moment.
>
> AFAIK, there are some companies in China already have solutions for
> dynamic scaling Flink jobs (Alibaba, 360, etc.), but none of them are yet
> available to the community version. These solutions rely on an external
> system to monitor the workload and rescale the job accordingly. In case of
> rescaling, it requires a full stop of the data processing, then rescale,
> then recover from the most recent checkpoint.
>
> The Flink community is also preparing a declarative resource management
> approach, which should allow the job to dynamically adapt to the available
> resources (e.g., add/reduce pods on kubernetes). AFAIK, this is still in
> the design discussion.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Jun 10, 2020 at 2:44 AM Prasanna kumar <
> prasannakumarram...@gmail.com> wrote:
>
>> Hi all,
>>
>> Does flink support dynamic scaling. Say try to add/reduce nodes based
>> upon incoming load.
>>
>> Because our use case is such that we get peak loads for 4 hours and then
>> medium loads for 8 hours and then light to no load for rest 2 hours.
>>
>> Or peak load would be atleast 5 times the medium load.
>>
>> Has anyone used flink in these type of scenario? We are looking at flink
>> for it's low latency performance.
>>
>> Earlier I worked with Spark+YARN which provides a features to dynamicaly
>> add/reduce executors.
>>
>> Wanted to know the same on flink.
>>
>> Thanks,
>> Prasanna
>>
>


Dynamic rescaling in Flink

2020-06-09 Thread Prasanna kumar
Hi all,

Does Flink support dynamic scaling, i.e. adding/removing nodes based upon the
incoming load?

Our use case is such that we get peak loads for 4 hours, then medium loads for
8 hours, and then light to no load for the remaining 2 hours.

The peak load would be at least 5 times the medium load.

Has anyone used Flink in this type of scenario? We are looking at Flink for
its low-latency performance.

Earlier I worked with Spark + YARN, which provides a feature to dynamically
add/remove executors.

Wanted to know the same for Flink.

Thanks,
Prasanna


Re: Multiple Sinks for a Single Source

2020-06-03 Thread Prasanna kumar
Piotr and Alexander ,

I have fixed the programmatic error in the filter method and it is working now.

Thanks for the detailed help from both of you. I am able to add the sinks
based on the JSON and create the DAG.
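For the record, the working version replaces the lambda with a small named
FilterFunction and chains the sink onto the filtered stream; a rough sketch
(the event-type check against the raw JSON string is simplified here):

import org.apache.flink.api.common.functions.FilterFunction;

// FilterFunction already extends Serializable and this class only holds a String,
// so the ClosureCleaner has nothing non-serializable to complain about.
public class EventTypeFilter implements FilterFunction<String> {

    private final String eventType;

    public EventTypeFilter(String eventType) {
        this.eventType = eventType;
    }

    @Override
    public boolean filter(String message) {
        // Simplified: the real code parses the JSON and compares the eventType field.
        return message.contains(eventType);
    }
}

and, inside the loop over the registry, the sink is added to the filtered
stream rather than to inputStream directly:

inputStream
    .filter(new EventTypeFilter(record.getEventType()))
    .addSink(fkp)
    .name(record.getOutputTopic());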

Thanks,
Prasanna.

On Wed, Jun 3, 2020 at 4:51 PM Piotr Nowojski  wrote:

> Hi Prasanna,
>
> 1.
>
> > The object probably contains or references non serializable fields.
>
> That should speak for itself. Flink was not able to distribute your code
> to the worker nodes.
>
> You have used a lambda function that turned out to be non serialisable.
> You should unit test your code and in this case add a
> serialisation/deserialisation round trip unit test for the filter function.
> For starters I would suggest to not use lambda function, but a full/proper
> named class and work from there.
>
> 2.
>
> Can you not create an array/map/collection of OutputTags corresponding to
> the sinks/topics combinations? One OutputTag per sink(/topic), and use
> this array/map/collection inside your process function?
>
> Piotrek
>
> On 2 Jun 2020, at 13:49, Prasanna kumar 
> wrote:
>
> Hi ,
>
> I have a Event router Registry as this. By reading this as input i need to
> create a Job which would redirect the messages to the correct sink as per
> condition.
>
> {
>   "eventRouterRegistry": [
> { "eventType": "biling", "outputTopic": "billing" },
> { "eventType": "cost", "outputTopic": "cost" },
> { "eventType": "marketing", "outputTopic": "marketing" }
>   ]
> }
>
> I tried the following two approaches.
> *1) Using the Filter method*
>
> public static void setupRouterJobsFilter(
> List<eventRouterRegistry> registryList, StreamExecutionEnvironment env) {
>
>    Properties props = new Properties();
>    props.put("bootstrap.servers", BOOTSTRAP_SERVER);
>    props.put("client.id", "flink-example1");
>
>    FlinkKafkaConsumer011<String> fkC =
>        new FlinkKafkaConsumer011<>("EVENTTOPIC", new SimpleStringSchema(), props);
>
>    DataStream<String> inputStream = env.addSource(fkC).name("EVENTTOPIC");
>
>    for (eventRouterRegistry record : registryList) {
>       System.out.print(record.getEventType() + " <==> " + record.getOutputTopic());
>
>       FlinkKafkaProducer011<String> fkp =
>           new FlinkKafkaProducer011<>(record.getOutputTopic(), new SimpleStringSchema(), props);
>
>       inputStream.filter(msg -> msg.equals(record.getEventType()));
>       //sideOutputStream.print();
>       inputStream.addSink(fkp).name(record.getOutputTopic());
>    }
> }
>
> Here am getting the following error.
>  ./bin/flink run -c firstflinkpackage.GenericStreamRouter
> ../../myrepository/flink001/target/flink001-1.0.jar
> Starting execution of program
> ---
>  The program finished with the following exception:
>
> The implementation of the FilterFunction is not serializable. The object
> probably contains or references non serializable fields.
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
>
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1559)
>
> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
>
> org.apache.flink.streaming.api.datastream.DataStream.filter(DataStream.java:686)
>
> firstflinkpackage.GenericStreamRouter.setupRouterJobsFilter(GenericStreamRouter.java:118)
> firstflinkpackage.GenericStreamRouter.main(GenericStreamRouter.java:93)
>
> Looks like I should not use  record.getEventType() as this is outside of
> the stream.
>
> Is there a any way to use external variable here mainly to Generalise the
> process.
>
> *2) Using the Side Output method*
>
> Following code is my attempt in creating a generic way for sideoutput
> creation.
>
> I am able to create the Sink Streams based on the list
> (eventRouterRegistry).
>
> But i could not generalise the Output tag creation.
> The issue here is the output tag is fixed.
> My output tag need to be the Event Type and that needs to be in Process
> Function too.
>
> How do i implement. Should I write my own process function ?
>
>  public static void setupRouterJobs(
>  List registryList, StreamExecutionEnvironment env) {
>
>Properties props = new Properties();
>props.put("bootstrap.servers", BOOTSTRAP_SERVER);
>props.put("client.id", "flink-example1");
>
>FlinkKafkaConsumer011 fkC =
>new FlinkKafkaCons

NoResourceAvailableException and JobNotFound Errors

2020-06-02 Thread Prasanna kumar
Hi ,

I am running flink locally in my machine with following configurations.

# The RPC port where the JobManager is reachable.

jobmanager.rpc.port: 6123


# The heap size for the JobManager JVM

jobmanager.heap.size: 1024m


# The heap size for the TaskManager JVM

taskmanager.heap.size: 1024m


# The number of task slots that each TaskManager offers. Each slot runs one
parallel pipeline.

taskmanager.numberOfTaskSlots: 8


# The parallelism used for programs that did not specify and other
parallelism.

parallelism.default: 1

When I run my program I end up getting:

Caused by:
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Could not allocate all requires slots within timeout of 30 ms. Slots
required: 1, slots allocated: 0

at
org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$scheduleEager$3(ExecutionGraph.java:991)

at
java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)

at
java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)

at
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)

JobManager Logs

2020-06-02 23:25:09,992 ERROR
org.apache.flink.runtime.rest.handler.job.JobDetailsHandler   -
Exception occurred in REST handler.
org.apache.flink.runtime.rest.NotFoundException: Job
be3d6b9751b6e9c509b9bedeb581a72e not found

Caused by: org.apache.flink.runtime.messages.FlinkJobNotFoundException:
Could not find Flink job (be3d6b9751b6e9c509b9bedeb581a72e)
at 
org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGatewayFuture(Dispatcher.java:766)
at 
org.apache.flink.runtime.dispatcher.Dispatcher.requestJob(Dispatcher.java:485)

 Finally its shutdown

2020-06-02 23:30:05,427 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
Stopping checkpoint coordinator for job
c23a172cda6cc659296af6452ff57f45.
2020-06-02 23:30:05,427 INFO
org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore
 - Shutting down
2020-06-02 23:30:05,428 INFO
org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Job
c23a172cda6cc659296af6452ff57f45 reached globally terminal state
FAILED.
2020-06-02 23:30:05,449 INFO
org.apache.flink.runtime.jobmaster.JobMaster  -
Stopping the JobMaster for job Flink Streaming Single
Environment(c23a172cda6cc659296af6452ff57f45).
2020-06-02 23:30:05,450 INFO
org.apache.flink.runtime.jobmaster.JobMaster  - Close
ResourceManager connection 9da4590b1bbc3c104e70e270988db461:
JobManager is shutting down..
2020-06-02 23:30:05,450 INFO
org.apache.flink.runtime.jobmaster.slotpool.SlotPool  -
Suspending SlotPool.
2020-06-02 23:30:05,450 INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
Disconnect job manager
0...@akka.tcp://flink@localhost:6123/user/jobmanager_0
for job c23a172cda6cc659296af6452ff57f45 from the resource manager.
2020-06-02 23:30:05,451 INFO
org.apache.flink.runtime.jobmaster.slotpool.SlotPool  -
Stopping SlotPool.
2020-06-02 23:30:05,451 INFO
org.apache.flink.runtime.jobmaster.JobManagerRunner   -
JobManagerRunner already shutdown.


Thanks,
Prasanna.


Re: Multiple Sinks for a Single Source

2020-06-02 Thread Prasanna kumar
Hi ,

I have an event router registry like the one below. Reading it as input, I
need to create a job which redirects the messages to the correct sink as per
the conditions.

{
  "eventRouterRegistry": [
{ "eventType": "biling", "outputTopic": "billing" },
{ "eventType": "cost", "outputTopic": "cost" },
{ "eventType": "marketing", "outputTopic": "marketing" }
  ]
}

I tried the following two approaches.
*1) Using the Filter method*

public static void setupRouterJobsFilter(
        List<eventRouterRegistry> registryList, StreamExecutionEnvironment env) {

    Properties props = new Properties();
    props.put("bootstrap.servers", BOOTSTRAP_SERVER);
    props.put("client.id", "flink-example1");

    FlinkKafkaConsumer011<String> fkC =
            new FlinkKafkaConsumer011<>("EVENTTOPIC", new SimpleStringSchema(), props);

    DataStream<String> inputStream = env.addSource(fkC).name("EVENTTOPIC");

    for (eventRouterRegistry record : registryList) {
        System.out.print(record.getEventType() + " <==> " + record.getOutputTopic());

        FlinkKafkaProducer011<String> fkp =
                new FlinkKafkaProducer011<>(record.getOutputTopic(), new SimpleStringSchema(), props);

        inputStream.filter(msg -> msg.equals(record.getEventType()));
        //sideOutputStream.print();
        inputStream.addSink(fkp).name(record.getOutputTopic());
    }
}

Here I am getting the following error:

 ./bin/flink run -c firstflinkpackage.GenericStreamRouter
../../myrepository/flink001/target/flink001-1.0.jar

Starting execution of program

---

 The program finished with the following exception:


The implementation of the FilterFunction is not serializable. The object
probably contains or references non serializable fields.

org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1559)

org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)

org.apache.flink.streaming.api.datastream.DataStream.filter(DataStream.java:686)

firstflinkpackage.GenericStreamRouter.setupRouterJobsFilter(GenericStreamRouter.java:118)

firstflinkpackage.GenericStreamRouter.main(GenericStreamRouter.java:93)

It looks like I should not use record.getEventType() here, as it comes from
outside the stream.

Is there any way to use an external variable here, mainly to generalise the
process?

*2) Using the Side Output method*

The following code is my attempt at creating a generic way of side output
creation.

I am able to create the sink streams based on the list (eventRouterRegistry).

But I could not generalise the output tag creation.
The issue here is that the output tag is fixed.
My output tag needs to be the event type, and that needs to be available in
the ProcessFunction too.

How do I implement this? Should I write my own process function?

public static void setupRouterJobs(
        List<eventRouterRegistry> registryList, StreamExecutionEnvironment env) {

    Properties props = new Properties();
    props.put("bootstrap.servers", BOOTSTRAP_SERVER);
    props.put("client.id", "flink-example1");

    FlinkKafkaConsumer011<String> fkC =
            new FlinkKafkaConsumer011<>("EVENTTOPIC", new SimpleStringSchema(), props);

    DataStream<String> inputStream = env.addSource(fkC).name("EVENTTOPIC");

    // Even if I try to generalise the OutputTag here, how do I do it inside the ProcessFunction?
    final OutputTag<String> outputTag = new OutputTag<String>("side-output") {};

    SingleOutputStreamOperator<String> mainDataStream = inputStream.process(
            new ProcessFunction<String, String>() {

                @Override
                public void processElement(String value, Context ctx, Collector<String> out)
                        throws Exception {
                    // emit data to side output
                    ctx.output(outputTag, value);
                }
            });

    for (eventRouterRegistry record : registryList) {
        System.out.print(record.getEventType() + " <==> " + record.getOutputTopic());

        FlinkKafkaProducer011<String> fkp =
                new FlinkKafkaProducer011<>(record.getOutputTopic(), new SimpleStringSchema(), props);

        DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
        sideOutputStream.print();
        sideOutputStream.addSink(fkp).name(record.getOutputTopic());
    }
}
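One way I can think of to generalise this is to build one OutputTag per event
type up front and keep them in a map that the ProcessFunction consults; a
rough sketch continuing from the code above (extractEventType() is only a
placeholder for parsing the event type out of the JSON payload):

Map<String, OutputTag<String>> tagsByEventType = new HashMap<>();
for (eventRouterRegistry record : registryList) {
    tagsByEventType.put(
            record.getEventType(),
            new OutputTag<String>(record.getEventType()) {});
}

SingleOutputStreamOperator<String> mainDataStream = inputStream.process(
        new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) {
                // Placeholder: parse the JSON and pull out the event type.
                String eventType = extractEventType(value);
                OutputTag<String> tag = tagsByEventType.get(eventType);
                if (tag != null) {
                    ctx.output(tag, value);  // route to the matching side output
                }
            }
        });

for (eventRouterRegistry record : registryList) {
    FlinkKafkaProducer011<String> fkp =
            new FlinkKafkaProducer011<>(record.getOutputTopic(), new SimpleStringSchema(), props);
    mainDataStream
            .getSideOutput(tagsByEventType.get(record.getEventType()))
            .addSink(fkp)
            .name(record.getOutputTopic());
}

Would something along these lines be a reasonable pattern, or is a custom
process function still needed?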


Thanks,
Prasanna.



On Thu, May 28, 2020 at 8:24 PM Prasanna kumar <
prasannakumarram...@gmail.com> wrote:

> Alexander,
>
> Thanks for the reply. Will implement and come back in case of any
> questions.
>
> Prasanna.
>
> On Thu, May 28, 2020 at 5:06 PM Alexander Fedulov 
> wrote:
>
>> Hi Prasanna,
>>
>> if the set of all possible sinks is known in advance, side outputs will
>> be generic enough to express your requirements. Side output produces a
>> stream. Create all of the side output tags, associate each of them with one
>

Re: Creating Kafka Topic dynamically in Flink

2020-06-01 Thread Prasanna kumar
Leonard,

Thanks for the reply; I will look into those options.
But as for the original question, could we create a topic dynamically when
required?

Prasanna.

On Mon, Jun 1, 2020 at 2:18 PM Leonard Xu  wrote:

> Hi, kumar
>
> Flink supports consuming from / producing to multiple Kafka topics [1]. In your
> case you can implement KeyedSerializationSchema (legacy interface) or
> KafkaSerializationSchema[2] to make one producer instance support sending data
> to multiple topics. There is an ITCase you can reference[3].
>
>
> Best,
> Leonard Xu
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-producer
> [2]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java
>
> [3]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java#L126
>
>
> On 1 Jun 2020, at 15:35, Prasanna kumar  wrote:
>
> Hi,
>
> I have Use Case where i read events from a Single kafka Stream comprising
> of JSON messages.
>
> Requirement is to split the stream into multiple output streams based on
> some criteria say based on Type of Event or Based on Type and Customer
> associated with the event.
>
> We could achieve the splitting of stream using Side outputs as i have seen
> in the documentation.
>
> Our business environment is such that there could be new event types
> flowing in and would the Flink Kafka producer create the topics dynamically
> based on the inflowing events. I did not see any documentation saying
> that it could create.
>
> Or should it be always pre created by running a script separately. (Not a
> good scalable practice in our case)
>
> Thanks,
> Prasanna.
>
>
>


Creating Kafka Topic dynamically in Flink

2020-06-01 Thread Prasanna kumar
Hi,

I have a use case where I read events from a single Kafka stream comprising
JSON messages.

The requirement is to split the stream into multiple output streams based on
some criteria, say the type of event, or the type and the customer associated
with the event.

We could achieve the splitting of the stream using side outputs, as I have
seen in the documentation.

Our business environment is such that there could be new event types flowing
in; would the Flink Kafka producer create the topics dynamically based on the
incoming events? I did not see any documentation saying that it could.

Or should the topics always be pre-created by running a script separately?
(Not a scalable practice in our case.)

Thanks,
Prasanna.


Re: Need Help on Flink suitability to our usecase

2020-05-29 Thread Prasanna kumar
Thanks Robert for the reply.

On Fri 29 May, 2020, 12:31 Robert Metzger,  wrote:

> Hey Prasanna,
>
> (Side note: there is not need to send this email to multiple mailing
> lists. The user@ list is the right one)
>
> Let me quickly go through your questions:
>
> Is this usecase suited for flink ?
>
>
> Based on the details you've provided: Yes
> What you also need to consider are the hardware requirements you'll have
> for processing such amounts of data. I can strongly recommend setting up a
> small demo environment to measure the throughput of a smaller Flink cluster
> (say 10 machines).
>
> 1) If you do not have any consistency guarantees (data loss is
> acceptable), and you have good infrastructure in place to deploy and
> monitor such microservices then a microservice might also be an option.
> Flink is pretty well suited for heavy IO use-cases. Afaik Netflix has
> talked at several Flink Forward conferences about similar cases (check
> Youtube for recorded talks)
>
> 2) It should not be too difficult to build a small, generic framework on
> top of the Flink APIs
>
> 3) If you are deploying Flink on a resource manager like Kubernetes or
> YARN, they will take care of recovering your cluster if it goes down. Your
> recovery time will mostly depend on the state size that you are
> checkpointing (and the ability of your resource manager to bring up new
> resources). I don't think you'll be able to recover in < 500 milliseconds,
> but within a few seconds.
> I don't think that the other frameworks you are looking at are going to be
> much better at this.
>
> Best,
> Robert
>
> On Tue, May 19, 2020 at 1:28 PM Prasanna kumar <
> prasannakumarram...@gmail.com> wrote:
>
>> Hi,
>>
>> I have the following usecase to implement in my organization.
>>
>> Say there is huge relational database(1000 tables for each of our 30k
>> customers) in our monolith setup
>>
>> We want to reduce the load on the DB and prevent the applications from
>> hitting it for latest events. So an extract is done from redo logs on to
>> kafka.
>>
>> We need to set up a streaming platform based on the table updates that
>> happen(read from kafka) , we need to form events and send it consumer.
>>
>> Each consumer may be interested in same table but different
>> updates/columns respective of their business needs and then deliver it to
>> their endpoint/kinesis/SQS/a kafka topic.
>>
>> So the case here is *1* table update : *m* events : *n* sink.
>> Peak Load expected is easily a  100k-million table updates per second(all
>> customers put together)
>> Latency expected by most customers is less than a second. Mostly in
>> 100-500ms.
>>
>> Is this usecase suited for flink ?
>>
>> I went through the Flink book and documentation. These are the following
>> questions i have
>>
>> 1). If we have situation like this *1* table update : *m* events : *n*
>> sink , is it better to write our micro service on our own or it it better
>> to implement through flink.
>>   1 a)  How does checkpointing happens if we have *1* input: *n*
>> output situations.
>>   1 b)  There are no heavy transformations maximum we might do is to
>> check the required columns are present in the db updates and decide whether
>> to create an event. So there is an alternative thought process to write a
>> service in node since it more IO and less process.
>>
>> 2)  I see that we are writing a Job and it is deployed and flink takes
>> care of the rest in handling parallelism, latency and throughput.
>>  But what i need is to write a generic framework so that we should be
>> able to handle any table structure. we should not end up writing one job
>> driver for each case.
>> There are at least 200 type of events in the existing monolith system
>> which might move to this new system once built.
>>
>> 3)  How do we maintain flink cluster HA . From the book , i get that
>> internal task level failures are handled gracefully in flink.  But what if
>> the flink cluster goes down, how do we make sure its HA ?
>> I had earlier worked with spark and we had issues managing it. (Not
>> much problem was there since there the latency requirement is 15 min and we
>> could make sure to ramp another one up within that time).
>> These are absolute realtime cases and we cannot miss even one
>> message/event.
>>
>> There are also thoughts whether to use kafka streams/apache storm for the
>> same. [They are investigated by different set of folks]
>>
>> Thanks,
>> Prasanna.
>>
>


Re: Multiple Sinks for a Single Source

2020-05-28 Thread Prasanna kumar
Alexander,

Thanks for the reply. Will implement and come back in case of any
questions.

Prasanna.

On Thu, May 28, 2020 at 5:06 PM Alexander Fedulov 
wrote:

> Hi Prasanna,
>
> if the set of all possible sinks is known in advance, side outputs will be
> generic enough to express your requirements. Side output produces a stream.
> Create all of the side output tags, associate each of them with one sink,
> add conditional logic around `ctx.output(outputTag, ... *)*;`  to decide
> where to dispatch the messages  (see [1]), collect to none or many side
> outputs, depending on your logic.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
>
> --
>
> Alexander Fedulov | Solutions Architect
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
>
> On Tue, May 26, 2020 at 2:57 PM Prasanna kumar <
> prasannakumarram...@gmail.com> wrote:
>
>> Piotr,
>>
>> There is an event and subscriber registry as JSON file which has the
>> table event mapping and event-subscriber mapping as mentioned below.
>>
>> Based on the set JSON , we need to job to go through the table updates
>> and create events and for each event there is a way set how to sink them.
>>
>> The sink streams have to be added based on this JSON. Thats what i
>> mentioned as no predefined sink in code earlier.
>>
>> You could see that each event has different set of sinks.
>>
>> Just checking how much generic could Side-output streams be ?.
>>
>> Source -> generate events -> (find out sinks dynamically in code ) ->
>> write to the respective sinks.
>>
>> {
>>   " tablename ": "source.table1",
>>   "events": [
>> {
>>   "operation": "update",
>>   "eventstobecreated": [
>> {
>>   "eventname": "USERUPDATE",
>>   "Columnoperation": "and",
>>   "ColumnChanges": [
>> {
>>   "columnname": "name"
>> },
>> {
>>   "columnname": "loginenabled",
>>   "value": "Y"
>> }
>>   ],
>>   "Subscribers": [
>> {
>>   "customername": "c1",
>>   "method": "Kafka",
>>   "methodparams": {
>> "topicname": "USERTOPIC"
>>   }
>> },
>> {
>>   "customername": "c2",
>>   "method": "S3",
>>   "methodparams": {
>> "folder": "aws://folderC2"
>>   }}, ]}]
>> },
>> {
>>   "operation": "insert",
>>   "eventstobecreated": [
>>   "eventname": "USERINSERT",
>>   "operation": "insert",
>>   "Subscribers": [
>> {
>>   "teamname": "General",
>>   "method": "Kafka",
>>   "methodparams": {
>> "topicname": "new_users"
>>   }
>> },
>> {
>>   "teamname": "General",
>>   "method": "kinesis",
>>   "methodparams": {
>> "URL": "new_users",
>> "username": "uname",
>> "password":  "pwd"
>>   }}, ]}]
>> },
>> {
>>   "operation": "delete",
>>   "eventstobecreated": [
>> {
>>   "eventname": "USERDELETE",
>>   "Subscribers": [
>> {
>>   "customername": "c1",
>>   "method": "Kafka",
>>   "methodparams": {
>> "topicname": "USERTOPIC"
>>   }
>>

Re: Multiple Sinks for a Single Source

2020-05-26 Thread Prasanna kumar
Piotr,

There is an event and subscriber registry as a JSON file, which has the
table-event mapping and the event-subscriber mapping as shown below.

Based on this JSON, we need the job to go through the table updates and create
events, and for each event it specifies how to sink them.

The sink streams have to be added based on this JSON. That's what I meant
earlier by the sinks not being predefined in code.

You can see that each event has a different set of sinks.

Just checking how generic side-output streams could be.

Source -> generate events -> (find out sinks dynamically in code) -> write
to the respective sinks.

{
  "tablename": "source.table1",
  "events": [
    {
      "operation": "update",
      "eventstobecreated": [
        {
          "eventname": "USERUPDATE",
          "Columnoperation": "and",
          "ColumnChanges": [
            { "columnname": "name" },
            { "columnname": "loginenabled", "value": "Y" }
          ],
          "Subscribers": [
            {
              "customername": "c1",
              "method": "Kafka",
              "methodparams": { "topicname": "USERTOPIC" }
            },
            {
              "customername": "c2",
              "method": "S3",
              "methodparams": { "folder": "aws://folderC2" }
            }
          ]
        }
      ]
    },
    {
      "operation": "insert",
      "eventstobecreated": [
        {
          "eventname": "USERINSERT",
          "Subscribers": [
            {
              "teamname": "General",
              "method": "Kafka",
              "methodparams": { "topicname": "new_users" }
            },
            {
              "teamname": "General",
              "method": "kinesis",
              "methodparams": {
                "URL": "new_users",
                "username": "uname",
                "password": "pwd"
              }
            }
          ]
        }
      ]
    },
    {
      "operation": "delete",
      "eventstobecreated": [
        {
          "eventname": "USERDELETE",
          "Subscribers": [
            {
              "customername": "c1",
              "method": "Kafka",
              "methodparams": { "topicname": "USERTOPIC" }
            },
            {
              "customername": "c4",
              "method": "Kafka",
              "methodparams": { "topicname": "deleterecords" }
            }
          ]
        }
      ]
    }
  ]
}

Please let me know your thoughts on this.

Thanks,
Prasanna.

On Tue, May 26, 2020 at 5:34 PM Piotr Nowojski  wrote:

> Hi,
>
> I’m not sure if I fully understand what do you mean by
>
> > The point is the sink are not predefined.
>
> You must know before submitting the job, what sinks are going to be used
> in the job. You can have some custom logic, that would filter out records
> before writing them to the sinks, as I proposed before, or you could use
> side outputs [1] would be better suited to your use case?
>
> Piotrek
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
>
> On 26 May 2020, at 12:20, Prasanna kumar 
> wrote:
>
> Thanks Piotr for the Reply.
>
> I will explain my requirement in detail.
>
> Table Updates -> Generate Business Events -> Subscribers
>
> *Source Side*
> There are CDC of 100 tables which the framework needs to listen to.
>
> *Event Table Mapping*
>
> There would be Event associated with table in a *m:n* fashion.
>
> say there are tables TA, TB, TC.
>
> EA, EA2 and EA3 are generated from TA (based on conditions)
> EB generated from TB (based on conditions)
> EC generated from TC (no conditions.)
>
> Say there are events EA,EB,EC generated from the tables TA, TB, TC
>
> *Event Sink Mapping*
>
> EA has following sinks. kafka topic SA,SA2,SAC.
> EB has following sinks. kafka topic SB , S3 sink and a rest endpoint RB.
> EC has only rest endpoint RC.
>
> The point is the sink are not predefined. [. But i onl

Re: Multiple Sinks for a Single Source

2020-05-26 Thread Prasanna kumar
Thanks Piotr for the Reply.

I will explain my requirement in detail.

Table Updates -> Generate Business Events -> Subscribers

*Source Side*
There are CDC of 100 tables which the framework needs to listen to.

*Event Table Mapping*

There would be Event associated with table in a *m:n* fashion.

say there are tables TA, TB, TC.

EA, EA2 and EA3 are generated from TA (based on conditions)
EB generated from TB (based on conditions)
EC generated from TC (no conditions.)

Say there are events EA,EB,EC generated from the tables TA, TB, TC

*Event Sink Mapping*

EA has following sinks. kafka topic SA,SA2,SAC.
EB has following sinks. kafka topic SB , S3 sink and a rest endpoint RB.
EC has only rest endpoint RC.

The point is that the sinks are not predefined. [But I only see examples
online where the Flink code has an explicit myStream.addSink(sink2); ]
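To make the mapping above concrete, the wiring I picture is roughly the
following (sketch only: isType() is a placeholder for inspecting the event,
and the S3/REST sinks would be custom SinkFunctions we write ourselves):

DataStream<String> events = ...; // business events generated from the CDC stream

events.filter(e -> isType(e, "EA"))
      .addSink(new FlinkKafkaProducer011<>("SA", new SimpleStringSchema(), props));
events.filter(e -> isType(e, "EA"))
      .addSink(new FlinkKafkaProducer011<>("SA2", new SimpleStringSchema(), props));
// ... and SAC the same way

events.filter(e -> isType(e, "EB"))
      .addSink(new FlinkKafkaProducer011<>("SB", new SimpleStringSchema(), props));
events.filter(e -> isType(e, "EB")).addSink(new S3SinkFunction());           // custom sink
events.filter(e -> isType(e, "EB")).addSink(new RestPostSinkFunction("RB")); // custom sink

events.filter(e -> isType(e, "EC")).addSink(new RestPostSinkFunction("RC"));

Whether each of these filter/sink pairs is written by hand or generated from a
registry is exactly the part I want to keep generic.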

We expect around 500 types of events in our platform in another 2 years'
time.

We are looking at writing a generic job for this, rather than writing one for
each new case.

Let me know your thoughts on Flink's suitability for this requirement.

Thanks
Prasanna.


On Tue, May 26, 2020 at 3:34 PM Piotr Nowojski  wrote:

> Hi,
>
> You could easily filter/map/process the streams differently before writing
> them to the sinks. Building on top of my previous example, this also should
> work fine:
>
>
> DataStream myStream = env.addSource(…).foo().bar() // for custom source,
> but any ;
>
> myStream.baz().addSink(sink1);
> myStream.addSink(sink2);
> myStream.qux().quuz().corge().addSink(sink3);
>
> Where foo/bar/baz/quz/quuz/corge are any stream processing functions that
> you wish. `foo` and `bar` would be applied once to the stream, before it’s
> going to be split to different sinks, while `baz`, `qux`, `quuz` and
> `corge` would be applied to only of the sinks AFTER splitting.
>
> In your case, it could be:
>
> myStream.filter(...).addSink(sink1);
> myStream.addSink(sink2);
> myStream.addSink(sink3);
>
> So sink2 and sink3 would get all of the records, while sink1 only a
> portion of them.
>
> Piotrek
>
>
> On 26 May 2020, at 06:45, Prasanna kumar 
> wrote:
>
> Piotr,
>
> Thanks for the reply.
>
> There is one other case, where some events have to be written to multiple
> sinks and while other have to be written to just one sink.
>
> How could i have a common codeflow/DAG for the same ?
>
> I do not want multiple jobs to do the same want to accomplish in a single
> job .
>
> Could i add Stream code "myStream.addSink(sink1)" under a conditional
> operator such as 'if' to determine .
>
> But i suppose here the stream works differently compared to normal code
> processing.
>
> Prasanna.
>
>
> On Mon 25 May, 2020, 23:37 Piotr Nowojski,  wrote:
>
>> Hi,
>>
>> To the best of my knowledge the following pattern should work just fine:
>>
>> DataStream myStream = env.addSource(…).foo().bar() // for custom source,
>> but any ;
>> myStream.addSink(sink1);
>> myStream.addSink(sink2);
>> myStream.addSink(sink3);
>>
>> All of the records from `myStream` would be passed to each of the sinks.
>>
>> Piotrek
>>
>> > On 24 May 2020, at 19:34, Prasanna kumar 
>> wrote:
>> >
>> > Hi,
>> >
>> > There is a single source of events for me in my system.
>> >
>> > I need to process and send the events to multiple destination/sink at
>> the same time.[ kafka topic, s3, kinesis and POST to HTTP endpoint ]
>> >
>> > I am able send to one sink.
>> >
>> > By adding more sink stream to the source stream could we achieve it .
>> Are there any shortcomings.
>> >
>> > Please let me know if any one here has successfully implemented one .
>> >
>> > Thanks,
>> > Prasanna.
>>
>>
>


Re: Multiple Sinks for a Single Source

2020-05-25 Thread Prasanna kumar
Piotr,

Thanks for the reply.

There is one other case, where some events have to be written to multiple
sinks while others have to be written to just one sink.

How could I have a common code flow/DAG for the same?

I do not want multiple jobs doing this; I want to accomplish it in a single
job.

Could I add the stream code "myStream.addSink(sink1)" under a conditional
operator such as 'if' to decide this?

But I suppose the stream works differently here compared to normal code
processing.

Prasanna.


On Mon 25 May, 2020, 23:37 Piotr Nowojski,  wrote:

> Hi,
>
> To the best of my knowledge the following pattern should work just fine:
>
> DataStream myStream = env.addSource(…).foo().bar() // for custom source,
> but any ;
> myStream.addSink(sink1);
> myStream.addSink(sink2);
> myStream.addSink(sink3);
>
> All of the records from `myStream` would be passed to each of the sinks.
>
> Piotrek
>
> > On 24 May 2020, at 19:34, Prasanna kumar 
> wrote:
> >
> > Hi,
> >
> > There is a single source of events for me in my system.
> >
> > I need to process and send the events to multiple destination/sink at
> the same time.[ kafka topic, s3, kinesis and POST to HTTP endpoint ]
> >
> > I am able send to one sink.
> >
> > By adding more sink stream to the source stream could we achieve it .
> Are there any shortcomings.
> >
> > Please let me know if any one here has successfully implemented one .
> >
> > Thanks,
> > Prasanna.
>
>


Multiple Sinks for a Single Source

2020-05-24 Thread Prasanna kumar
Hi,

There is a single source of events for me in my system.

I need to process and send the events to multiple destinations/sinks at the
same time [a Kafka topic, S3, Kinesis and a POST to an HTTP endpoint].

I am able to send to one sink.

Could we achieve this by adding more sinks to the source stream? Are there
any shortcomings?

Please let me know if anyone here has successfully implemented this.

Thanks,
Prasanna.


Need Help on Flink suitability to our usecase

2020-05-19 Thread Prasanna kumar
Hi,

I have the following usecase to implement in my organization.

Say there is a huge relational database (1000 tables for each of our 30k
customers) in our monolith setup.

We want to reduce the load on the DB and prevent the applications from
hitting it for the latest events, so an extract is done from the redo logs
onto Kafka.

We need to set up a streaming platform: based on the table updates that
happen (read from Kafka), we need to form events and send them to consumers.

Each consumer may be interested in the same table but in different
updates/columns depending on their business needs, and we then deliver the
events to their endpoint/Kinesis/SQS/a Kafka topic.

So the case here is *1* table update : *m* events : *n* sinks.
The expected peak load is easily 100k to a million table updates per second
(all customers put together).
The latency expected by most customers is less than a second, mostly in the
100-500 ms range.

Is this use case suited for Flink?

I went through the Flink book and documentation. These are the questions I
have:

1) If we have a situation like this, *1* table update : *m* events : *n*
sinks, is it better to write our own microservice or is it better to
implement it through Flink?
  1 a)  How does checkpointing happen if we have *1* input : *n* output
situations?
  1 b)  There are no heavy transformations; the maximum we might do is check
that the required columns are present in the DB updates and decide whether
to create an event. So there is an alternative thought process to write a
service in Node, since it is more I/O and less processing.

2)  I see that we write a job, it is deployed, and Flink takes care of the
rest in handling parallelism, latency and throughput.
 But what I need is to write a generic framework so that we are able to
handle any table structure; we should not end up writing one job driver for
each case.
There are at least 200 types of events in the existing monolith system
which might move to this new system once built.

3)  How do we maintain Flink cluster HA? From the book, I get that internal
task-level failures are handled gracefully in Flink. But what if the Flink
cluster goes down; how do we make sure it is HA?
I had earlier worked with Spark and we had issues managing it. (It was not
much of a problem there, since the latency requirement was 15 minutes and we
could make sure to ramp another cluster up within that time.)
These are absolute real-time cases and we cannot miss even one
message/event.

There are also thoughts on whether to use Kafka Streams/Apache Storm for the
same. [They are being investigated by a different set of folks.]

Thanks,
Prasanna.


flink setup errors

2020-05-17 Thread Prasanna kumar
I tried to setup flink locally as mentioned in the link
https://ci.apache.org/projects/flink/flink-docs-stable/dev/projectsetup/java_api_quickstart.html
.

I ended up getting the following error:

[INFO] Generating project in Interactive mode

[WARNING] No archetype found in remote catalog. Defaulting to internal
catalog

[WARNING] Archetype not found in any catalog. Falling back to central
repository.

[WARNING] Add a repository with id 'archetype' in your settings.xml if
archetype's repository is elsewhere.

Downloading from central:
http://repo.maven.apache.org/maven2/org/apache/flink/flink-quickstart-java/1.10.0/flink-quickstart-java-1.10.0.pom

[INFO]


[INFO] BUILD FAILURE

[INFO]


[INFO] Total time: 49.262 s

[INFO] Finished at: 2020-05-18T10:28:08+05:30

[INFO]


[ERROR] Failed to execute goal
org.apache.maven.plugins:maven-archetype-plugin:3.1.2:generate (default-cli)
on project standalone-pom: The desired archetype does not exist
(org.apache.flink:flink-quickstart-java:1.10.0) -> [Help 1]

[ERROR]

[ERROR] To see the full stack trace of the errors, re-run Maven with the -e
switch.

[ERROR] Re-run Maven using the -X switch to enable full debug logging.

[ERROR]

[ERROR] For more information about the errors and possible solutions,
please read the following articles:

[ERROR] [Help 1]
http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException




This is my settings.xml in my .m2/ directory.



myown







 http://repo.maven.apache.org/maven2/

central











myown





Tried to run using second option

curl https://flink.apache.org/q/quickstart.sh | bash -s 1.10.0

Still ended up getting same error.

Thanks
Prasanna.