Re: Flink on Kubernetes seems to ignore log4j.properties

2019-12-10 Thread Yang Wang
Could you find the logs under /opt/flink/log/jobmanager.log? If not, could you
share the
commands the JobManager and TaskManager are using? If the command is correct
and the log4j under /opt/flink/conf is as expected, it is curious why we
could not get the logs.


Best,
Yang

On Wed, Dec 11, 2019 at 1:24 PM Li Peng wrote:

> Ah, I see. I think the Flink app is reading files from /opt/flink/conf
> correctly as it is, since changes I make to flink-conf are picked up as
> expected. It's just the log4j properties that are either not being used, or
> don't apply to stdout or whatever source k8s uses for its logs. Given that
> the pods don't seem to have logs written to a file anywhere, contrary to the
> properties, I'm inclined to say it's the former and that the log4j
> properties just aren't being picked up. Still have no idea why, though.
>
> On Tue, Dec 10, 2019 at 6:56 PM Yun Tang  wrote:
>
>> Sure, /opt/flink/conf is mounted as a volume from the configmap.
>>
>>
>>
>> Best
>>
>> Yun Tang
>>
>>
>>
>> *From: *Li Peng 
>> *Date: *Wednesday, December 11, 2019 at 9:37 AM
>> *To: *Yang Wang 
>> *Cc: *vino yang , user 
>> *Subject: *Re: Flink on Kubernetes seems to ignore log4j.properties
>>
>>
>>
>> 1. Hey Yun, I'm calling /opt/flink/bin/standalone-job.sh and
>> /opt/flink/bin/taskmanager.sh on my job and task managers respectively.
>> It's based on the setup described here:
>> http://shzhangji.com/blog/2019/08/24/deploy-flink-job-cluster-on-kubernetes/ 
>> .
>> I haven't tried the configmap approach yet, does it also replace the conf
>> files in /opt/flink/conf?
>>
>> 2. Hey Vino, here's a sample of the Kubernetes logs:
>> https://pastebin.com/fqJrgjZu  I didn't change any patterns from the
>> default, so the string patterns should look the same, but as you can see
>> it's full of info checkpoint logs that I originally was trying to suppress.
>> Based on my log4j.properties, the level should be set to WARN. I couldn't
>> actually find any .out files on the pod, this is from the kubectl logs
>> command. I also didn't see any files in /opt/flink/log, which I thought my
>> log4j was specified to do, hence me thinking that the properties weren't
>> actually being consumed. I also have the same properties in my
>> src/main/resources folder.
>>
>> 3. Hey Yang, yes this is a standalone session cluster. I did specify in
>> the docker file to copy the log4j.properties to the /opt/flink/conf folder
>> on the image, and I confirmed that the properties are correct when I bash'd
>> into the pod and viewed them manually.
>>
>>
>>
>> Incidentally, I also tried passing the -Dlog4j.configuration argument to
>> the programs, and it doesn't work either. And based on what I'm reading on
>> jira, that option is not really supported anymore?
>>
>>
>>
>> Thanks for your responses, folks!
>>
>> Li
>>
>>
>>
>> On Mon, Dec 9, 2019 at 7:10 PM Yang Wang  wrote:
>>
>> Hi Li Peng,
>>
>>
>>
>> Are you running a standalone session cluster or a per-job cluster on
>> Kubernetes?
>>
>> If so, I think you need to check the log4j.properties in the image, not the
>> local one. The log is stored to /opt/flink/log/jobmanager.log by default.
>>
>> If you are running the active Kubernetes integration for a fresh taste, the
>> following CLI option can be used to remove the redirect:
>>
>> -Dkubernetes.container-start-command-template="%java% %classpath%
>> %jvmmem% %jvmopts% %logging% %class% %args%"
>>
>>
>>
>> Best,
>>
>> Yang
>>
>>
>>
>> On Tue, Dec 10, 2019 at 10:55 AM vino yang wrote:
>>
>> Hi Li,
>>
>>
>>
>> A potential reason could be conflicting logging frameworks. Can you share
>> the log in your .out file and let us know if the print format of the log is
>> the same as the configuration file you gave.
>>
>>
>>
>> Best,
>>
>> Vino
>>
>>
>>
>> On Tue, Dec 10, 2019 at 10:09 AM Li Peng wrote:
>>
>> Hey folks, I noticed that my kubernetes flink logs (reached via *kubectl
>> logs *) completely ignore any of the configurations I put into
>> /flink/conf/. I set the logger level to WARN, yet I still see INFO level
>> logging from flink loggers
>> like org.apache.flink.runtime.checkpoint.CheckpointCoordinator. I even
>> copied the same properties to /flink/conf/log4j-console.properties
>> and log4j-cli.properties.
>>
>>
>>
>> From what I can tell, kubernetes just listens to stdout and stderr, so
>> shouldn't the log4j.properties control output to them? Anyone seen this
>> issue before?
>>
>>
>>
>> Here is my log4j.properties:
>>
>>
>> # This affects logging for both user code and Flink
>> log4j.rootLogger=WARN, file, console, stdout
>>
>> # Uncomment this if you want to _only_ change Flink's logging
>> log4j.logger.org.apache.flink=WARN
>>
>> # The following lines keep the log level of common libraries/connectors on
>> # log level INFO. The root logger does not override this. You have to 
>> manually
>> # change the log levels here.
>> log4j.logger.akka=INFO
>> log4j.logger.org.apache.kafka=INFO
>> log4j.logger.org.apache.hadoop=INFO
>> log4j.logger.org.apache.zookeeper=INFO
>>
>> # L

Re: Event Timestamp corrupted by timezone

2019-12-10 Thread Jingsong Li
Hi Wojciech,

You can try 1.9/1.10 with the Blink planner.
As Timo said, in the new type system the timestamp is a TimestampType without a
time zone, and the Blink planner supports it.

You should also use LocalDateTime in the sink/downstream; LocalDateTime has no
time zone.
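
For reference, a minimal sketch of the kind of UTC correction Lasse describes
below for the old planner's java.sql.Timestamp output. It assumes the shift is
exactly the default zone's offset at that instant (DST edge cases aside):

import java.sql.Timestamp;
import java.util.TimeZone;

// Sketch only: undo the local-zone shift applied when the internal epoch-millis
// rowtime is converted to java.sql.Timestamp by the old planner.
public final class TimestampUtc {
    private TimestampUtc() {}

    public static long toUtcMillis(Timestamp ts) {
        long localMillis = ts.getTime();
        return localMillis + TimeZone.getDefault().getOffset(localMillis);
    }
}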

Best,
Jingsong Lee

On Tue, Dec 10, 2019 at 11:09 PM Timo Walther  wrote:

> Hi,
>
> I hope we can solve this issue with the new type system. The core
> problem is the old planner uses java.sql.Timestamp which depends on the
> timezone of the current machine. I would recommend to set everything to
> UTC if possible for now.
>
> Regards,
> Timo
>
>
> On 03.12.19 18:49, Lasse Nedergaard wrote:
> > Hi.
> >
> > We have the same challenges. I asked at Flink Forward and it's a known
> > problem. We input in UTC but Flink outputs in local machine time. We have
> > created a function that converts it back to UTC before collecting to
> > downstream.
> >
> > Med venlig hilsen / Best regards
> > Lasse Nedergaard
> >
> >
> > Den 3. dec. 2019 kl. 15.16 skrev Wojciech Indyk  > >:
> >
> >> Hi!
> >> I use Flink 1.8 with Scala. I think I've found a problem with event
> >> timestamps in TableAPI. When I mark my timestamp: Long as .rowtime and
> >> then save it back to stream as sql.Timestamp I will get wrong .getTime
> >> result. The gist for reproduction is here:
> >> https://gist.github.com/woj-i/b1dfbb71590b7f1c0c58be1f9e41c610
> >> When I change my timezone from GMT+1 to GMT, everything works OK.
> >> I've found this post from March
> >>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/event-time-timezone-is-not-correct-tt26457.html
> >> but it's not resolved. The most relevant ticket I've found
> >> https://issues.apache.org/jira/browse/FLINK-8353 seems to not include
> >> the problem I described.
> >>
> >> 1. Can you confirm it's a bug?
> >> 2. Should I post this bug somewhere to be at least planned to solve?
> >> 3. Can you recommend me a workaround for the described problem?
> >>
> >> --
> >> Kind regards/ Pozdrawiam,
> >> Wojciech Indyk
>
>

-- 
Best, Jingsong Lee


Request for removal from subscription

2019-12-10 Thread L Jainkeri, Suman (Nokia - IN/Bangalore)
Unsubscribe


Re: Flink on Kubernetes seems to ignore log4j.properties

2019-12-10 Thread Li Peng
Ah, I see. I think the Flink app is reading files from /opt/flink/conf
correctly as it is, since changes I make to flink-conf are picked up as
expected. It's just the log4j properties that are either not being used, or
don't apply to stdout or whatever source k8s uses for its logs. Given that
the pods don't seem to have logs written to a file anywhere, contrary to the
properties, I'm inclined to say it's the former and that the log4j properties
just aren't being picked up. Still have no idea why, though.

On Tue, Dec 10, 2019 at 6:56 PM Yun Tang  wrote:

> Sure, /opt/flink/conf is mounted as a volume from the configmap.
>
>
>
> Best
>
> Yun Tang
>
>
>
> *From: *Li Peng 
> *Date: *Wednesday, December 11, 2019 at 9:37 AM
> *To: *Yang Wang 
> *Cc: *vino yang , user 
> *Subject: *Re: Flink on Kubernetes seems to ignore log4j.properties
>
>
>
> 1. Hey Yun, I'm calling /opt/flink/bin/standalone-job.sh and
> /opt/flink/bin/taskmanager.sh on my job and task managers respectively.
> It's based on the setup described here:
> http://shzhangji.com/blog/2019/08/24/deploy-flink-job-cluster-on-kubernetes/ .
> I haven't tried the configmap approach yet, does it also replace the conf
> files in /opt/flink/conf?
>
> 2. Hey Vino, here's a sample of the Kubernetes logs:
> https://pastebin.com/fqJrgjZu  I didn't change any patterns from the
> default, so the string patterns should look the same, but as you can see
> it's full of info checkpoint logs that I originally was trying to suppress.
> Based on my log4j.properties, the level should be set to WARN. I couldn't
> actually find any .out files on the pod, this is from the kubectl logs
> command. I also didn't see any files in /opt/flink/log, which I thought my
> log4j was specified to do, hence me thinking that the properties weren't
> actually being consumed. I also have the same properties in my
> src/main/resources folder.
>
> 3. Hey Yang, yes this is a standalone session cluster. I did specify in
> the docker file to copy the log4j.properties to the /opt/flink/conf folder
> on the image, and I confirmed that the properties are correct when I bash'd
> into the pod and viewed them manually.
>
>
>
> Incidentally, I also tried passing the -Dlog4j.configuration argument to
> the programs, and it doesn't work either. And based on what I'm reading on
> jira, that option is not really supported anymore?
>
>
>
> Thanks for your responses, folks!
>
> Li
>
>
>
> On Mon, Dec 9, 2019 at 7:10 PM Yang Wang  wrote:
>
> Hi Li Peng,
>
>
>
> Are you running a standalone session cluster or a per-job cluster on
> Kubernetes?
>
> If so, I think you need to check the log4j.properties in the image, not the
> local one. The log is stored to /opt/flink/log/jobmanager.log by default.
>
> If you are running the active Kubernetes integration for a fresh taste, the
> following CLI option can be used to remove the redirect:
>
> -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem%
> %jvmopts% %logging% %class% %args%"
>
>
>
> Best,
>
> Yang
>
>
>
> On Tue, Dec 10, 2019 at 10:55 AM vino yang wrote:
>
> Hi Li,
>
>
>
> A potential reason could be conflicting logging frameworks. Can you share
> the log in your .out file and let us know if the print format of the log is
> the same as the configuration file you gave.
>
>
>
> Best,
>
> Vino
>
>
>
> On Tue, Dec 10, 2019 at 10:09 AM Li Peng wrote:
>
> Hey folks, I noticed that my kubernetes flink logs (reached via *kubectl
> logs *) completely ignore any of the configurations I put into
> /flink/conf/. I set the logger level to WARN, yet I still see INFO level
> logging from flink loggers
> like org.apache.flink.runtime.checkpoint.CheckpointCoordinator. I even
> copied the same properties to /flink/conf/log4j-console.properties
> and log4j-cli.properties.
>
>
>
> From what I can tell, kubernetes just listens to stdout and stderr, so
> shouldn't the log4j.properties control output to them? Anyone seen this
> issue before?
>
>
>
> Here is my log4j.properties:
>
>
> # This affects logging for both user code and Flink
> log4j.rootLogger=WARN, file, console, stdout
>
> # Uncomment this if you want to _only_ change Flink's logging
> log4j.logger.org.apache.flink=WARN
>
> # The following lines keep the log level of common libraries/connectors on
> # log level INFO. The root logger does not override this. You have to manually
> # change the log levels here.
> log4j.logger.akka=INFO
> log4j.logger.org.apache.kafka=INFO
> log4j.logger.org.apache.hadoop=INFO
> log4j.logger.org.apache.zookeeper=INFO
>
> # Log all infos in the given file
> log4j.appender.file=org.apache.log4j.FileAppender
> log4j.appender.file.file=${log.file}
> log4j.appender.file.append=false
> log4j.appender.file.layout=org.apache.log4j.PatternLayout
>> log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p 
> %-60c %x - %m%n
>
> # Log all infos to the console
> log4j.appender.console=org.apache.log4j.ConsoleAppender
> log4j.appender.console.layout=org.apache.log4j.PatternLayout
> log4

Re: Processing Events by custom rules kept in Broadcast State

2019-12-10 Thread vino yang
Hi KristoffSC,

It seems the main differences are when to parse your rules and what could
be put into the broadcast state.

IMO, all of these options can work. I prefer option 3: I'd like to parse the
rules ASAP and let them be a real rule event stream (not a ruleset stream) in
the source, and then do the real parsing in processBroadcastElement.

In short, it's my personal opinion.
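
A minimal sketch of option 3 along these lines. The Event, Rule, EvaluatedEvent,
RuleParams and RuleEngineService types are assumptions taken from the question,
not a definitive implementation:

// Sketch only: each broadcast element is assumed to already be a single parsed
// rule event (option 3), stored as one ruleName -> ruleValue pair.
public class RuleEvaluationFunction
        extends BroadcastProcessFunction<Event, Rule, EvaluatedEvent> {

    static final MapStateDescriptor<String, String> RULES_DESCRIPTOR =
            new MapStateDescriptor<>("RulesBroadcastState", Types.STRING, Types.STRING);

    private transient RuleEngineService ruleEngine;

    @Override
    public void open(Configuration parameters) {
        ruleEngine = new RuleEngineService();
    }

    @Override
    public void processBroadcastElement(Rule rule, Context ctx, Collector<EvaluatedEvent> out)
            throws Exception {
        // Store each ruleName -> ruleValue pair individually in broadcast state.
        ctx.getBroadcastState(RULES_DESCRIPTOR).put(rule.getName(), rule.getValue());
    }

    @Override
    public void processElement(Event event, ReadOnlyContext ctx, Collector<EvaluatedEvent> out)
            throws Exception {
        // Rebuild the current rule set for this event and hand it to the rule engine.
        Map<String, String> rules = new HashMap<>();
        for (Map.Entry<String, String> entry :
                ctx.getBroadcastState(RULES_DESCRIPTOR).immutableEntries()) {
            rules.put(entry.getKey(), entry.getValue());
        }
        out.collect(ruleEngine.evaluate(event, new RuleParams(rules)));
    }
}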

Best,
Vino

On Wed, Dec 11, 2019 at 6:26 AM KristoffSC wrote:

> Hi,
> I think this would be the very basic use case for Broadcast State Pattern
> but I would like to know what are the best approaches to solve this
> problem.
>
> I have an operator that extends BroadcastProcessFunction. The
> broadcastElement is an element sent as a JSON-format message by Kafka. It
> describes processing rules as a key/value mapping, like so: ruleName -
> ruleValue (both strings).
>
> In processElement method I'm delegating to my custom RuleEngineService. It
> is a class that has the "rule engine" logic and accepts received event and
> "set of processing rules" in some form.
>
> What would be the best approaches:
> 1. Keep original Json String in broadcast state. Whenever there is a new
> set
> of rules streamed by Kafka, then in processBroadcastElement method parse
> this Json, map to some RuleParams abstraction and keep it as transient
> field
> in my BroadcastProcessFunction operator. Save Json in broadcast state. Pass
> RuleParams to rule engine service.
>
> 2. Same as 1, but instead of keeping the raw JSON string in broadcast state,
> keep the already parsed JSON object, something like ObjectNode from the
> Kafka connector lib.
>
> 3. Keep each ruleName - ruleValue pair (both strings) separately in broadcast
> state. In the processBroadcastElement method parse the received JSON and
> update the state. In the processElement method take all the rules, build a
> RuleParams object (basically a map) and pass it to the rule engine.
>
> 4. Parse the JSON in the processBroadcastElement method, map it to the
> RuleParams abstraction, keeping the rules in a HashMap, and keep this
> RuleParams in broadcast state.
>
> 5. any other...
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Flink on Kubernetes seems to ignore log4j.properties

2019-12-10 Thread Yun Tang
Sure, /opt/flink/conf is mounted as a volume from the configmap.

Best
Yun Tang

From: Li Peng 
Date: Wednesday, December 11, 2019 at 9:37 AM
To: Yang Wang 
Cc: vino yang , user 
Subject: Re: Flink on Kubernetes seems to ignore log4j.properties

1. Hey Yun, I'm calling /opt/flink/bin/standalone-job.sh and 
/opt/flink/bin/taskmanager.sh on my job and task managers respectively. It's 
based on the setup described here: 
http://shzhangji.com/blog/2019/08/24/deploy-flink-job-cluster-on-kubernetes/ . 
I haven't tried the configmap approach yet, does it also replace the conf files 
in /opt/flink/conf?

2. Hey Vino, here's a sample of the Kubernetes logs: https://pastebin.com/fqJrgjZu  
I didn't change any patterns from the default, so the string patterns should 
look the same, but as you can see it's full of info checkpoint logs that I 
originally was trying to suppress. Based on my log4j.properties, the level 
should be set to WARN. I couldn't actually find any .out files on the pod, this 
is from the kubectl logs command. I also didn't see any files in 
/opt/flink/log, which I thought my log4j was specified to do, hence me thinking 
that the properties weren't actually being consumed. I also have the same 
properties in my src/main/resources folder.

3. Hey Yang, yes this is a standalone session cluster. I did specify in the 
docker file to copy the log4j.properties to the /opt/flink/conf folder on the 
image, and I confirmed that the properties are correct when I bash'd into the 
pod and viewed them manually.

Incidentally, I also tried passing the -Dlog4j.configuration argument to the 
programs, and it doesn't work either. And based on what I'm reading on jira, 
that option is not really supported anymore?

Thanks for your responses, folks!
Li

On Mon, Dec 9, 2019 at 7:10 PM Yang Wang wrote:
Hi Li Peng,

Are you running a standalone session cluster or a per-job cluster on Kubernetes?
If so, I think you need to check the log4j.properties in the image, not the local 
one. The log is stored to /opt/flink/log/jobmanager.log by default.

If you are running the active Kubernetes integration for a fresh taste, the 
following CLI option can be used to remove the redirect:
-Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% 
%jvmopts% %logging% %class% %args%"

Best,
Yang

On Tue, Dec 10, 2019 at 10:55 AM vino yang wrote:
Hi Li,

A potential reason could be conflicting logging frameworks. Can you share the 
log in your .out file and let us know if the print format of the log is the 
same as the configuration file you gave.

Best,
Vino

On Tue, Dec 10, 2019 at 10:09 AM Li Peng wrote:
Hey folks, I noticed that my kubernetes flink logs (reached via kubectl logs 
) completely ignore any of the configurations I put into 
/flink/conf/. I set the logger level to WARN, yet I still see INFO level 
logging from flink loggers like 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator. I even copied 
the same properties to /flink/conf/log4j-console.properties and 
log4j-cli.properties.

From what I can tell, kubernetes just listens to stdout and stderr, so 
shouldn't the log4j.properties control output to them? Anyone seen this issue 
before?

Here is my log4j.properties:

# This affects logging for both user code and Flink
log4j.rootLogger=WARN, file, console, stdout

# Uncomment this if you want to _only_ change Flink's logging
log4j.logger.org.apache.flink=WARN

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO

# Log all infos in the given file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p 
%-60c %x - %m%n

# Log all infos to the console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} 
%-5p %-60c %x - %m%n

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,
 file, console
log4j.logger.org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction=WARN
log4j.logger.org.apache.flink.runtime.checkpoint=WARN
Thanks,
Li


Re: Thread access and broadcast state initialization in BroadcastProcessFunction

2019-12-10 Thread vino yang
Hi kristoffSC,

>> I've noticed that all methods are called by the same thread. Would it be
always the case, or could those methods be called by different threads?

No. The open/processXXX/close methods are called at different stages of a
task thread's life cycle, and the framework must keep the call order.

>> The second thing I've noticed is that "open" method was executed only
before
the first "fast stream" element was received (before execution of
processElement method). That means that if I received the control stream
element first (the broadcast stream element) then method open would not be
called and I will not initialize my processing rule descriptor - I will
lose the event.

There is a similar question I joined that you can consider.[1]
There is also another similar question that comes from StackOverflow.[2]

Best,
Vino

[1]:
http://mail-archives.apache.org/mod_mbox/flink-user/201911.mbox/%3CCAArWwf4jmbaFeizO_YBZVBAMyiuvV95DetoVCkj4rJi4PYorpQ%40mail.gmail.com%3E
[2]:
https://stackoverflow.com/questions/54748158/how-could-flink-broadcast-state-be-initialized
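
For what it's worth, one way to sidestep the open() concern from [2] is to make
the descriptor a shared constant, so both process methods (and the .broadcast()
call) reuse it. A rough sketch of how DriveEngineRuleOperator could be
structured; the Transaction, ProcessingRule and EvaluatedTrade types are
placeholders, not the author's actual classes:

// Sketch only: the descriptor is a constant, so there is nothing to set up
// lazily in open().
public class DriveEngineRuleOperator
        extends BroadcastProcessFunction<Transaction, ProcessingRule, EvaluatedTrade> {

    public static final MapStateDescriptor<Void, ProcessingRule> RULES_DESCRIPTOR =
            new MapStateDescriptor<>(
                    "RulesBroadcastState",
                    Types.VOID,
                    TypeInformation.of(new TypeHint<ProcessingRule>() {}));

    @Override
    public void processBroadcastElement(ProcessingRule rule, Context ctx,
                                        Collector<EvaluatedTrade> out) throws Exception {
        ctx.getBroadcastState(RULES_DESCRIPTOR).put(null, rule);
    }

    @Override
    public void processElement(Transaction tx, ReadOnlyContext ctx,
                               Collector<EvaluatedTrade> out) throws Exception {
        ProcessingRule rule = ctx.getBroadcastState(RULES_DESCRIPTOR).get(null);
        if (rule == null) {
            // No rule has arrived on this subtask yet: buffer, drop, or apply a default.
            return;
        }
        // ... evaluate tx against rule and collect the result ...
    }
}

The same RULES_DESCRIPTOR instance can also be passed to
processingRulesStream.broadcast(...), so there is only one definition to keep in
sync.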


On Wed, Dec 11, 2019 at 5:56 AM KristoffSC wrote:

> Hi,
> I was playing around with BroadcastProcessFunction and I've observed a
> specific behavior.
>
> My setup:
>
> MapStateDescriptor ruleStateDescriptor = new
> MapStateDescriptor<>(
> "RulesBroadcastState",
> Types.VOID,
> TypeInformation.of(new TypeHint() {
> }));
>
> BroadcastStream processingRulesBroadcastStream =
> processingRulesStream
>.broadcast(ruleStateDescriptor);
>
>
> SingleOutputStreamOperator evaluatedTrades =
> enrichedTransactionStream
> .connect(processingRulesBroadcastStream)
> .process(new DriveEngineRuleOperator())
> .name("Drive Rule Evaluation");
>
> Where DriveEngineRuleOperator extends BroadcastProcessFunction and
> implements open, processElement and processBroadcastElement methods.
>
> I was following Flink's tutorials about broadcast state pattern and my
> "open" method looks like this:
>
> @Override
> public void open(Configuration parameters) throws Exception {
> super.open(parameters);
> processingRulesDesc = new MapStateDescriptor<>(
> "RulesBroadcastState",
> Types.VOID,
> TypeInformation.of(new TypeHint() {
> }));
>
>
> }
>
>
> I've noticed that all methods are called by the same thread. Would it be
> always the case, or could those methods be called by different threads?
>
> The second thing I've noticed is that "open" method was executed only
> before
> the first "fast stream" element was received (before execution of
> processElement method). That means that if I received the control stream
> element first (the broadcast stream element) then method open would not be
> called and I will not initialize my processing rule descriptor - I will
> lose the event.
>
> What are the good practices in this case?
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Flink ML feature

2019-12-10 Thread vino yang
Hi Benoit,

I can only try to ping @Till Rohrmann  @Kurt Young
  who may know more information to answer this question.

Best,
Vino

On Tue, Dec 10, 2019 at 7:06 PM Benoît Paris wrote:

> Is there any information as to whether Alink is going to be contributed to
> Apache Flink as the official ML Lib?
>
>
> On Tue, Dec 10, 2019 at 7:11 AM vino yang  wrote:
>
>> Hi Chandu,
>>
>> AFAIK, there is a project named Alink[1] which is the Machine Learning
>> algorithm platform based on Flink, developed by the PAI team of Alibaba
>> computing platform. FYI
>>
>> Best,
>> Vino
>>
>> [1]: https://github.com/alibaba/Alink
>>
>> On Tue, Dec 10, 2019 at 2:07 PM Tom Blackwood wrote:
>>
>>> You may try Spark ML, which is a production ready library for ML stuff.
>>>
>>> regards.
>>>
>>> On Tue, Dec 10, 2019 at 1:04 PM chandu soa  wrote:
>>>
 Hello Community,

 Can you please give me some pointers for implementing Machine Learning
 using Flink.

 I see Flink ML libraries were dropped in v1.9. It looks like the ML feature
 in Flink is going to be enhanced.

 What is the recommended approach for implementing production-grade ML
 apps using Flink? Is v1.9 OK, or should I wait for 1.10?

 Thanks,
 Chandu

>>>
>
> --
> Benoît Paris
> Ingénieur Machine Learning Explicable
> Tél : +33 6 60 74 23 00
> http://benoit.paris
> http://explicable.ml
>


Re: Apache Flink - Retries for async processing

2019-12-10 Thread Zhu Zhu
Hi M Singh,

I think you would be able to know the request failure cause and whether it
is recoverable or not.
You can handle the error as you like.
For example, if you think the error is unrecoverable, you can complete the
ResultFuture exceptionally to expose the failure to the Flink framework. If
the error is recoverable, you can just retry (or refresh the token), and
only complete the ResultFuture once it succeeds (or the timeout fires).
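
A rough sketch of that pattern. MyHttpClient, TokenProvider, Request and
Response are placeholders, send(...) is assumed to return a CompletableFuture,
and the retry policy is just an example:

// Sketch only: retry recoverable failures (refreshing the token when needed)
// and fail the ResultFuture for unrecoverable ones.
public class TokenAwareAsyncLookup extends RichAsyncFunction<Request, Response> {

    private static final int MAX_RETRIES = 3;

    private transient MyHttpClient client;
    private transient TokenProvider tokens;

    @Override
    public void open(Configuration parameters) {
        client = new MyHttpClient();      // placeholder async HTTP client
        tokens = new TokenProvider();     // placeholder token refresh service
    }

    @Override
    public void asyncInvoke(Request request, ResultFuture<Response> resultFuture) {
        attempt(request, resultFuture, 0);
    }

    private void attempt(Request request, ResultFuture<Response> resultFuture, int tries) {
        client.send(request, tokens.current()).whenComplete((response, error) -> {
            if (error == null) {
                resultFuture.complete(Collections.singleton(response));
            } else if (isTokenExpired(error) && tries < MAX_RETRIES) {
                tokens.refresh();                           // recoverable: refresh, then retry
                attempt(request, resultFuture, tries + 1);
            } else if (isRecoverable(error) && tries < MAX_RETRIES) {
                attempt(request, resultFuture, tries + 1);  // recoverable: plain retry
            } else {
                resultFuture.completeExceptionally(error);  // unrecoverable: surface to Flink
            }
        });
    }

    private boolean isTokenExpired(Throwable t) { return false; }  // placeholder check
    private boolean isRecoverable(Throwable t) { return false; }   // placeholder check
}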

Thanks,
Zhu Zhu

On Tue, Dec 10, 2019 at 8:51 PM M Singh wrote:

> Thanks Jingsong for sharing your solution.
>
> Since both refreshing the token and the actual API request can fail with
> either recoverable and unrecoverable exceptions, are there any patterns for
> retrying both and making the code robust to failures.
>
> Thanks again.
>
> On Monday, December 9, 2019, 10:08:39 PM EST, Jingsong Li <
> jingsongl...@gmail.com> wrote:
>
>
> Hi M Singh,
>
> Our internal system has this scenario too; as far as I know, Flink does not
> have this built-in mechanism in 1.9 either.
> I can share my solution:
> - In the async function, start a thread factory.
> - Send the call to the thread factory when the call has failed, and refresh
> the security token there too.
> Actually, you can deal with anything in the function, as long as you finally
> call the relevant methods of ResultFuture.
>
> Best,
> Jingsong Lee
>
> On Tue, Dec 10, 2019 at 3:25 AM M Singh  wrote:
>
> Hi Folks:
>
> I am working on a project where I will be using Flink's async processing
> capabilities.  The job has to make http request using a token.  The token
> expires periodically and needs to be refreshed.
>
> So, I was looking for patterns for handling async call failures and
> retries when the token expires.  I found this link Re: Backoff strategies
> for async IO functions?
> 
>  and
> it appears that Flink does not support retries and periodically refresh a
> security token.  I am using 1.6 at the moment but am planning to migrate to
> 1.9 soon.
>
>
>
> If there are any patterns on how to deal with this scenario, please let me
> know.
>
> Thanks
>
> Mans
>
>
>
> --
> Best, Jingsong Lee
>


Re: [DISCUSS] Adding e2e tests for Flink's Mesos integration

2019-12-10 Thread Yangze Guo
Thanks for the feedback, Yang.

Some updates I want to share in this thread.
I have built a PoC version of the Mesos e2e test with a WordCount
workflow.[1] Then I ran it in the testing environment. As the results
show here[2]:
- For pulling image from DockerHub, it took 1 minute and 21 seconds
- For building it locally, it took 2 minutes and 54 seconds.

I prefer building it locally. Although it is slower, I think the time
overhead, compared to the cost of maintaining the image on DockerHub
and the whole test process, is trivial whether building or pulling the
image.

I look forward to hearing from you. ;)

Best,
Yangze Guo

[1]https://github.com/KarmaGYZ/flink/commit/0406d942446a1b17f81d93235b21a829bf88ccf0
[2]https://travis-ci.org/KarmaGYZ/flink/jobs/623207957

On Mon, Dec 9, 2019 at 2:39 PM Yang Wang  wrote:
>
> Thanks Yangze for starting this discussion.
>
> Just share my thoughts.
>
> If the official Mesos Docker image cannot meet our requirements, I suggest 
> building the image locally.
> We have done the same thing for the YARN e2e tests. This way is more flexible 
> and easy to maintain. However,
> I have no idea how long building the Mesos image locally will take. Based on 
> previous experience with YARN, I
> think it may not take too much time.
>
>
>
> Best,
> Yang
>
> On Sat, Dec 7, 2019 at 4:25 PM Yangze Guo wrote:
>>
>> Thanks for your feedback!
>>
>> @Till
>> Regarding the time overhead, I think it mainly comes from the network
>> transmission. For building the image locally, it will download about
>> 260MB in total, including the base image and packages. For pulling from
>> DockerHub, the compressed size of the image is 347MB. Thus, I agree
>> that it is OK to build the image locally.
>>
>> @Piyush
>> Thank you for offering the help and sharing your usage scenario. In
>> current stage, I think it will be really helpful if you can compress
>> the custom image[1] or reduce the time overhead to build it locally.
>> Any ideas for improving test coverage will also be appreciated.
>>
>> [1]https://hub.docker.com/layers/karmagyz/mesos-flink/latest/images/sha256-4e1caefea107818aa11374d6ac8a6e889922c81806f5cd791ead141f18ec7e64
>>
>> Best,
>> Yangze Guo
>>
>> On Sat, Dec 7, 2019 at 3:17 AM Piyush Narang  wrote:
>> >
>> > +1 from our end as well. At Criteo, we are running some Flink jobs on 
>> > Mesos in production to compute short term features for machine learning. 
>> > We’d love to help out and contribute on this initiative.
>> >
>> > Thanks,
>> > -- Piyush
>> >
>> >
>> > From: Till Rohrmann 
>> > Date: Friday, December 6, 2019 at 8:10 AM
>> > To: dev 
>> > Cc: user 
>> > Subject: Re: [DISCUSS] Adding e2e tests for Flink's Mesos integration
>> >
>> > Big +1 for adding a fully working e2e test for Flink's Mesos integration. 
>> > Ideally we would have it ready for the 1.10 release. The lack of such a 
>> > test has bitten us already multiple times.
>> >
>> > In general I would prefer to use the official image if possible since it 
>> > frees us from maintaining our own custom image. Since Java 9 is no longer 
>> > officially supported as we opted for supporting Java 11 (LTS) it might not 
>> > be feasible, though. How much longer would building the custom image vs. 
>> > downloading the custom image from DockerHub be? Maybe it is ok to build 
>> > the image locally. Then we would not have to maintain the image.
>> >
>> > Cheers,
>> > Till
>> >
>> > On Fri, Dec 6, 2019 at 11:05 AM Yangze Guo wrote:
>> > Hi, all,
>> >
>> > Currently, there is no end-to-end test or IT case for Mesos deployment,
>> > while common deployment-related development would inevitably touch
>> > the logic of this component. Thus, some work needs to be done to
>> > guarantee the experience for both Mesos users and contributors. After
>> > an offline discussion with Till and Xintong, we have some basic ideas and
>> > would like to start a discussion thread on adding end-to-end tests for
>> > Flink's Mesos integration.
>> >
>> > As a first step, we would like to keep the scope of this contribution
>> > relatively small. This may also help us to quickly get some basic
>> > test cases that might be helpful for the upcoming 1.10 release.
>> >
>> > As far as we can think of, what needs to be done is to setup a Mesos
>> > framework during the testing and determine which tests need to be
>> > included.
>> >
>> >
>> > ** Regarding the Mesos framework, after trying out several approaches,
>> > I find that setting up Mesos in docker is probably what we want. The
>> > resources needed for building and setting up Mesos from source is
>> > probably not affordable in most of the scenarios. So, the one open
>> > question that worth discussion is the choice of Docker image. We have
>> > come up with two options.
>> >
>> > - Using official Mesos image[1]
>> > The official image was the first alternative that come to our mind,
>> > but we run into some sort of Java version compatibility problem that
>> > leads to failures o

Re: Flink on Kubernetes seems to ignore log4j.properties

2019-12-10 Thread Li Peng
1. Hey Yun, I'm calling /opt/flink/bin/standalone-job.sh and
/opt/flink/bin/taskmanager.sh on my job and task managers respectively.
It's based on the setup described here:
http://shzhangji.com/blog/2019/08/24/deploy-flink-job-cluster-on-kubernetes/ .
I haven't tried the configmap approach yet, does it also replace the conf
files in /opt/flink/conf?

2. Hey Vino, here's a sample of the Kubernetes logs:
https://pastebin.com/fqJrgjZu  I didn't change any patterns from the
default, so the string patterns should look the same, but as you can see
it's full of info checkpoint logs that I originally was trying to suppress.
Based on my log4j.properties, the level should be set to WARN. I couldn't
actually find any .out files on the pod, this is from the kubectl logs
command. I also didn't see any files in /opt/flink/log, which I thought my
log4j was specified to do, hence me thinking that the properties weren't
actually being consumed. I also have the same properties in my
src/main/resources folder.

3. Hey Yang, yes this is a standalone session cluster. I did specify in the
docker file to copy the log4j.properties to the /opt/flink/conf folder on
the image, and I confirmed that the properties are correct when I bash'd
into the pod and viewed them manually.

Incidentally, I also tried passing the -Dlog4j.configuration argument to
the programs, and it doesn't work either. And based on what I'm reading on
jira, that option is not really supported anymore?

Thanks for your responses, folks!
Li

On Mon, Dec 9, 2019 at 7:10 PM Yang Wang  wrote:

> Hi Li Peng,
>
> Are you running a standalone session cluster or a per-job cluster on
> Kubernetes?
> If so, I think you need to check the log4j.properties in the image, not the
> local one. The log is stored to /opt/flink/log/jobmanager.log by default.
>
> If you are running the active Kubernetes integration for a fresh taste, the
> following CLI option can be used to remove the redirect:
> -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem%
> %jvmopts% %logging% %class% %args%"
>
> Best,
> Yang
>
> On Tue, Dec 10, 2019 at 10:55 AM vino yang wrote:
>
>> Hi Li,
>>
>> A potential reason could be conflicting logging frameworks. Can you share
>> the log in your .out file and let us know if the print format of the log is
>> the same as the configuration file you gave.
>>
>> Best,
>> Vino
>>
>> On Tue, Dec 10, 2019 at 10:09 AM Li Peng wrote:
>>
>>> Hey folks, I noticed that my kubernetes flink logs (reached via *kubectl
>>> logs *) completely ignore any of the configurations I put
>>> into /flink/conf/. I set the logger level to WARN, yet I still see INFO
>>> level logging from flink loggers
>>> like org.apache.flink.runtime.checkpoint.CheckpointCoordinator. I even
>>> copied the same properties to /flink/conf/log4j-console.properties
>>> and log4j-cli.properties.
>>>
>>> From what I can tell, kubernetes just listens to stdout and stderr, so
>>> shouldn't the log4j.properties control output to them? Anyone seen this
>>> issue before?
>>>
>>> Here is my log4j.properties:
>>>
>>>
>>> # This affects logging for both user code and Flink
>>> log4j.rootLogger=WARN, file, console, stdout
>>>
>>> # Uncomment this if you want to _only_ change Flink's logging
>>> log4j.logger.org.apache.flink=WARN
>>>
>>> # The following lines keep the log level of common libraries/connectors on
>>> # log level INFO. The root logger does not override this. You have to 
>>> manually
>>> # change the log levels here.
>>> log4j.logger.akka=INFO
>>> log4j.logger.org.apache.kafka=INFO
>>> log4j.logger.org.apache.hadoop=INFO
>>> log4j.logger.org.apache.zookeeper=INFO
>>>
>>> # Log all infos in the given file
>>> log4j.appender.file=org.apache.log4j.FileAppender
>>> log4j.appender.file.file=${log.file}
>>> log4j.appender.file.append=false
>>> log4j.appender.file.layout=org.apache.log4j.PatternLayout
>>> log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} 
>>> %-5p %-60c %x - %m%n
>>>
>>> # Log all infos to the console
>>> log4j.appender.console=org.apache.log4j.ConsoleAppender
>>> log4j.appender.console.layout=org.apache.log4j.PatternLayout
>>> log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} 
>>> %-5p %-60c %x - %m%n
>>>
>>> # Suppress the irrelevant (wrong) warnings from the Netty channel handler
>>> log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,
>>>  file, console
>>> log4j.logger.org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction=WARN
>>> log4j.logger.org.apache.flink.runtime.checkpoint=WARN
>>>
>>> Thanks,
>>> Li
>>>
>>


Re: KeyBy/Rebalance overhead?

2019-12-10 Thread Komal Mariam
Thank you so much for the detailed reply. I understand the usage for keyBy
a lot better now.  You are correct about the time variation too. We will
apply different network settings and extend our datasets to check
performance on different use cases.

On Mon, 9 Dec 2019 at 20:45, Arvid Heise  wrote:

> Hi Komal,
>
> as a general rule of thumb, you want to avoid network shuffles as much as
> possible. As vino pointed out, you need to reshuffle, if you need to group
> by key. Another frequent usecase is for a rebalancing of data in case of a
> heavy skew. Since neither applies to you, removing the keyby is the best
> option.
>
> If you want to retain it, because you may experience skew in the future,
> there are only a couple of things you can do. You may tinker with
> networking settings to have smaller/larger network buffers (smaller = less
> latency, larger = more throughput) [1]. Of course, you get better results
> if you have a faster network (running in the cloud, you can play around
> with different adapters). Also you could try if less/more machines are
> actually faster (less machines = less network traffic, more machines = more
> compute power).
>
> In any case, your data volume is so low that I would probably not optimize
> too much. We are talking about seconds and the times may vary largely from
> run to run, because of the low data volume. If you want to test the
> throughput as a POC for a larger volume, I'd either generate a larger
> sample or replicate it to get more reliable numbers. In any case, try to
> have your final use case in mind when deciding for an option.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#configuring-the-network-buffers
>
> On Mon, Dec 9, 2019 at 10:25 AM vino yang  wrote:
>
>> Hi Komal,
>>
>> Actually, the main factor in choosing the type of partitioning
>> is your business logic. If you want to do some aggregation logic
>> based on a group, you must choose keyBy to guarantee correct
>> semantics.
>>
>> Best,
>> Vino
>>
>> On Mon, Dec 9, 2019 at 5:07 PM Komal Mariam wrote:
>>
>>> Thank you @vino yang   for the reply. I suspect
>>> keyBy will be beneficial in those cases where my subsequent operators are
>>> computationally intensive, i.e., their computation time is greater than the
>>> network reshuffling cost.
>>>
>>> Regards,
>>> Komal
>>>
>>> On Mon, 9 Dec 2019 at 15:23, vino yang  wrote:
>>>
 Hi Komal,

 keyBy (hash partitioning, a logical partition) and rebalance (a physical
 partition) are both partitioning schemes supported by Flink.[1]

 Generally speaking, partitioning may incur network communication (network
 shuffle) costs, which can add to the overall time. The example you provided may
 benefit from operator chaining[2] if you remove the keyBy operation.

 Best,
 Vino

 [1]:
 https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/#datastream-transformations
 [2]:
 https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#tasks-and-operator-chains

 On Mon, Dec 9, 2019 at 9:11 AM Komal Mariam wrote:

> Anyone?
>
> On Fri, 6 Dec 2019 at 19:07, Komal Mariam 
> wrote:
>
>> Hello everyone,
>>
>> I want to get some insights on the KeyBy (and Rebalance) operations
>> as according to my understanding they partition our tasks over the 
>> defined
>> parallelism and thus should make our pipeline faster.
>>
>> I am reading a topic which contains 170,000,000 pre-stored records
>> with 11 Kafka partitions and replication factor of 1.   Hence I use
>> .setStartFromEarliest() to read the stream.
>> My Flink is a 4 node cluster with 3 taskmanagers, each having 10
>> cores and 1 job manager with 6 cores. (10 task slots per TM hence I set
>> environment parallelism to 30).
>>
>> There are about 10,000 object IDs hence 10,000 keys.  Right now I'm
>> keeping the number of records fixed to get a handle on how fast they're
>> being processed.
>>
>> When I remove keyBy, I get the same results in 39 secs as opposed to
>> 52 secs with KeyBy. Infact, even when I vary the parallelism down to 10 
>> or
>> below I still get the same extra overhead of 9 to 13secs. My data is 
>> mostly
>> uniformly distributed on it's key so I can rule out skew.  Rebalance
>> likewise has the same latency as keyBy.
>>
>>  What I want to know is what may be causing this overhead? And is
>> there any way to decrease it?
>>
>> Here's the script I'm running for testing purposes:
>> --
>> DataStream JSONStream  = env.addSource(new
>> FlinkKafkaConsumer<>("data", new
>> JSONKeyValueDeserializationSchema(false),properties).setStartFromEarliest())
>>
>> DataStream myPoints = JSONStream.map(new jsonToPoint());
>>
>> mypoints.keyBy("oID").filter(new findDistancefromPOI());
>>
>> publi

Interval Join Late Record Metrics

2019-12-10 Thread Chris Gillespie
Hello Flink users, first time poster here.

I'm using an interval join in my Flink project; however, I haven't found
where late records get logged in metrics. Window joins have
"numLateRecordsDropped" implemented, but is there an equivalent within an
interval join?

My main use case is to track how often a record falls outside of the lower
and upper bounds when trying to join two streams. The interval join looks like
it simply short-circuits when there is a late record? Maybe I am not
understanding what defines a late record in this situation.

Is there a good way to monitor when an interval join fails to join two
streams? Currently I'm looking at the delta between two operator metrics,
but it hasn't looked that reliable so far.
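
One workaround, absent a built-in metric, is to count late elements yourself
just before the join, for example with a pass-through ProcessFunction that
compares each element's timestamp to the current watermark. A sketch under that
assumption (the element type is a placeholder):

// Sketch only: counts elements whose timestamp is already behind the watermark,
// i.e. elements the downstream interval join would consider late and drop.
public class LateElementCounter<T> extends ProcessFunction<T, T> {

    private transient Counter lateElements;

    @Override
    public void open(Configuration parameters) {
        lateElements = getRuntimeContext().getMetricGroup().counter("numLateElements");
    }

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) {
        Long ts = ctx.timestamp();
        if (ts != null && ts < ctx.timerService().currentWatermark()) {
            lateElements.inc();
        }
        out.collect(value); // pass everything through unchanged
    }
}

Applying this on both inputs before .intervalJoin(...) gives a rough view of how
many records arrive behind the watermark.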

Thanks,
Chris Gillespie


Re: Help to Understand cutoff memory

2019-12-10 Thread Theo Diefenthal
Hi Lu, 

I found this talk from the last Flink Forward in Berlin very helpful for 
understanding JVM RAM and cutoff memory [1]. Maybe it helps you understand that 
stuff better. 
In my experience on YARN, the author was totally correct. I was able to 
reproduce that: assigning about 12GB for the taskmanager memory, setting the 
cutoff to 0.15 and letting the network fraction stay at 0.1, I got roughly 
8-9GB of RAM for the actual taskmanager JVM. 
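
A back-of-the-envelope version of that calculation, as a simplification of the
1.9 formula. It assumes on-heap managed memory and the default 64 MB / 1 GB
network memory bounds:

// Sketch only: rough approximation of how a 12 GB container is split.
public class ContainerMemoryEstimate {
    public static void main(String[] args) {
        long containerMb = 12 * 1024;     // requested container size
        double cutoffRatio = 0.15;        // containerized.heap-cutoff-ratio
        long cutoffMinMb = 600;           // containerized.heap-cutoff-min
        double networkFraction = 0.1;     // taskmanager.network.memory.fraction

        long cutoffMb = Math.max(cutoffMinMb, (long) (containerMb * cutoffRatio)); // ~1843 MB
        long afterCutoffMb = containerMb - cutoffMb;                               // ~10445 MB
        long networkMb = Math.min(1024, Math.max(64,
                (long) (afterCutoffMb * networkFraction)));                        // capped at 1 GB
        long heapMb = afterCutoffMb - networkMb;                                   // ~9421 MB, roughly 9 GB

        System.out.printf("cutoff=%d MB, network=%d MB, heap=%d MB%n",
                cutoffMb, networkMb, heapMb);
    }
}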

Best regards 
Theo 

[1] https://www.youtube.com/watch?v=aq1Whga-RJ4 


Von: "Lu Niu"  
An: "user"  
Gesendet: Dienstag, 10. Dezember 2019 22:58:01 
Betreff: Help to Understand cutoff memory 

Hi, flink users 

I have some question regarding memory allocation. According to doc, 
containerized.heap-cutoff-ratio means: 

``` 
Percentage of heap space to remove from containers (YARN / Mesos), to 
compensate for other JVM memory usage 
``` 
However, I find cutoff memory is actually treated as "part of direct memory": 
https://github.com/apache/flink/blob/release-1.9.1/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java#L67 

The code above shows the max of MaxDirectMemorySize of jvm process is the sum 
of networkbuffer + cutoff. Then, there is no guarantee of a fixed headroom in 
container memory. In our case we use rocksDB memory and we found many times 
DirectMemory is close to maximum, leaving less headroom in the container for 
rocksdb. Could someone help on this? thanks 

Best 
Lu 


Processing Events by custom rules kept in Broadcast State

2019-12-10 Thread KristoffSC
Hi,
I think this would be the very basic use case for Broadcast State Pattern
but I would like to know what are the best approaches to solve this problem.

I have an operator that extends BroadcastProcessFunction. The
broadcastElement is an element sent as a JSON-format message by Kafka. It
describes processing rules as a key/value mapping, like so: ruleName -
ruleValue (both strings).

In processElement method I'm delegating to my custom RuleEngineService. It
is a class that has the "rule engine" logic and accepts received event and
"set of processing rules" in some form.

What would be the best approaches:
1. Keep original Json String in broadcast state. Whenever there is a new set
of rules streamed by Kafka, then in processBroadcastElement method parse
this Json, map to some RuleParams abstraction and keep it as transient field
in my BroadcastProcessFunction operator. Save Json in broadcast state. Pass
RuleParams to rule engine service.

2. Same as 1, but instead of keeping the raw JSON string in broadcast state,
keep the already parsed JSON object, something like ObjectNode from the
Kafka connector lib.

3. Keep each ruleName - ruleValue pair (both strings) separately in broadcast
state. In the processBroadcastElement method parse the received JSON and
update the state. In the processElement method take all the rules, build a
RuleParams object (basically a map) and pass it to the rule engine.

4. Parse the JSON in the processBroadcastElement method, map it to the
RuleParams abstraction, keeping the rules in a HashMap, and keep this
RuleParams in broadcast state.

5. any other...





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Help to Understand cutoff memory

2019-12-10 Thread Lu Niu
Hi, flink users

I have some questions regarding memory allocation. According to the doc,
containerized.heap-cutoff-ratio means:

```
Percentage of heap space to remove from containers (YARN / Mesos), to
compensate for other JVM memory usage
```
However, I find cutoff memory is actually treated as "part of direct
memory":
https://github.com/apache/flink/blob/release-1.9.1/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java#L67

The code above shows that the MaxDirectMemorySize of the JVM process is the
sum of the network buffer memory + cutoff. Then there is *no guarantee of a
fixed headroom* in the container memory. In our case we use RocksDB, and we
found that the direct memory is often close to the maximum, leaving less
headroom in the container for RocksDB. Could someone help with this? Thanks.

Best
Lu


Thread access and broadcast state initialization in BroadcastProcessFunction

2019-12-10 Thread KristoffSC
Hi,
I was playing around with BroadcastProcessFunction and I've observed a
specific behavior.

My setup:

MapStateDescriptor ruleStateDescriptor = new
MapStateDescriptor<>(
"RulesBroadcastState",
Types.VOID,
TypeInformation.of(new TypeHint() {
}));

BroadcastStream processingRulesBroadcastStream =
processingRulesStream
   .broadcast(ruleStateDescriptor);


SingleOutputStreamOperator evaluatedTrades =
enrichedTransactionStream
.connect(processingRulesBroadcastStream)
.process(new DriveEngineRuleOperator())
.name("Drive Rule Evaluation");

Where DriveEngineRuleOperator extends BroadcastProcessFunction and
implements open, processElement and processBroadcastElement methods.

I was following Flink's tutorials about broadcast state pattern and my
"open" method looks like this:

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
processingRulesDesc = new MapStateDescriptor<>(
"RulesBroadcastState",
Types.VOID,
TypeInformation.of(new TypeHint() {
}));

  
}


I've noticed that all methods are called by the same thread. Would it be
always the case, or could those methods be called by different threads?

The second thing I've noticed is that "open" method was executed only before
the first "fast stream" element was received (before execution of
processElement method). That means that if I received the control stream
element first (the broadcast stream element) then method open would not be
called and I will not initialize my processing rule descriptor - I will
lose the event.

What are the good practices in this case?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Need help using AggregateFunction instead of FoldFunction

2019-12-10 Thread Devin Bost
I did confirm that I got no resulting output after 20 seconds, and also none
after sending additional data with over a minute of waiting between batches of
data.

My code looks like this:

PulsarSourceBuilder<String> builder = PulsarSourceBuilder
  .builder(new SimpleStringSchema())
  .serviceUrl(SERVICE_URL)
  .topic(INPUT_TOPIC)
  .subscriptionName(SUBSCRIPTION_NAME);
SourceFunction<String> src = builder.build();
DataStream<String> dataStream = env.addSource(src);

DataStream<String> combinedEnvelopes = dataStream
  .map(new MapFunction<String, Tuple2<String, String>>() {
     @Override
     public Tuple2<String, String> map(String incomingMessage) throws Exception {
return mapToTuple(incomingMessage);
 }
  })
  .keyBy(0)
  //.timeWindow(Time.seconds(5))
  .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
  .aggregate(new JsonConcatenator());
//dataStream.print();

Logger logger = LoggerFactory.getLogger(SplinklerJob.class);
logger.info("Ran dataStream. Adding sink next");
combinedEnvelopes.addSink(new FlinkPulsarProducer<>(
  SERVICE_URL,
  OUTPUT_TOPIC,
  new AuthenticationDisabled(), // probably need to fix //
AuthenticationTls()
  combinedData -> combinedData.toString().getBytes(UTF_8),
  combinedData -> "test")
);
logger.info("Added sink. Executing job.");
// execute program
env.execute("Flink Streaming Java API Skeleton");


Here is the JsonConcatenator class:

private static class JsonConcatenator
      implements AggregateFunction<Tuple2<String, String>, Tuple2<String, String>, String> {
   Logger logger = LoggerFactory.getLogger(SplinklerJob.class);
   @Override
   public Tuple2<String, String> createAccumulator() {
      return new Tuple2<>("", "");
   }

   @Override
   public Tuple2<String, String> add(Tuple2<String, String> value,
         Tuple2<String, String> accumulator) {
  logger.info("Running Add on value.f0: " + value.f0 + " and
value.f1: " + value.f1);
  return new Tuple2<>(value.f0, accumulator.f1 + ", " + value.f1);
   }

   @Override
   public String getResult(Tuple2<String, String> accumulator) {
  logger.info("Running getResult on accumulator.f1: " + accumulator.f1);
  return "[" + accumulator.f1.substring(1) + "]";
   }

   @Override
   public Tuple2<String, String> merge(Tuple2<String, String> a,
         Tuple2<String, String> b) {
  // Merge is applied when you allow lateness.
  logger.info("Running merge on (a.f0: " + a.f0 + " and a.f1: " +
a.f1 + " and b.f1: " + b.f1);
  if(b.f1.charAt(0) == '['){
 logger.info("During merge, we detected the right message
starts with the '[' character. Removing it.");
 b.f1 = b.f1.substring(1);
  }
  return new Tuple2<>(a.f0, a.f1 + ", " + b.f1);
   }
}
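
As Arvid notes below, event-time session windows only fire once watermarks
advance past the gap, and the snippets above never assign timestamps or
watermarks. A possible shape for that, reusing env, dataStream and mapToTuple
from the code above; the payload timestamp accessor and the 10-second bound are
assumptions:

// Sketch only: extract an event timestamp from each tuple and emit watermarks
// so that EventTimeSessionWindows can actually trigger. extractEventTimeMillis
// is a hypothetical helper that pulls a millisecond timestamp out of the JSON.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Tuple2<String, String>> withTimestamps = dataStream
  .map(new MapFunction<String, Tuple2<String, String>>() {
     @Override
     public Tuple2<String, String> map(String incomingMessage) throws Exception {
        return mapToTuple(incomingMessage);
     }
  })
  .assignTimestampsAndWatermarks(
     new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, String>>(Time.seconds(10)) {
        @Override
        public long extractTimestamp(Tuple2<String, String> element) {
           return extractEventTimeMillis(element.f1);
        }
     });
// ...then .keyBy(0).window(EventTimeSessionWindows.withGap(Time.seconds(5)))
//         .aggregate(new JsonConcatenator()) as before.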


Devin G. Bost

Re:

getResult will only be called when the window is triggered. For a
> fixed-time window, it triggers at the end of the window.
> However, for EventTimeSessionWindows you need to have gaps in the data.
> Can you verify that there is actually a 20sec pause inbetween data points
> for your keys?
> Additionally, it may also be an issue with extracting the event time from
> the sources. Could you post the relevant code as well?
> Best,
> Arvid


On Tue, Dec 10, 2019 at 3:22 AM Arvid Heise  wrote:

> getResult will only be called when the window is triggered. For a
> fixed-time window, it triggers at the end of the window.
>
> However, for EventTimeSessionWindows you need to have gaps in the data.
> Can you verify that there is actually a 20sec pause inbetween data points
> for your keys?
> Additionally, it may also be an issue with extracting the event time from
> the sources. Could you post the relevant code as well?
>
> Best,
>
> Arvid
>
> On Mon, Dec 9, 2019 at 8:51 AM vino yang  wrote:
>
>> Hi dev,
>>
>> The time of the window may have different semantics.
>> In the session window, it's only a time gap, the size of the window is
>> driven via activity events.
>> In the tumbling or sliding window, it means the size of the window.
>>
>> For more details, please see the official documentation.[1]
>>
>> Best,
>> Vino
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html#session-windows
>>
>>
>>
>> On Fri, Dec 6, 2019 at 10:39 PM devinbost wrote:
>>
>>> I think there might be a bug in
>>> `.window(EventTimeSessionWindows.withGap(Time.seconds(5)))`
>>>  (unless I'm just not using it correctly) because I'm able to get output
>>> when I use the simpler window
>>> `.timeWindow(Time.seconds(5))`
>>> However, I don't get any output when I used the session-based window.
>>>
>>>
>>> devinbost wrote
>>> > I added logging statements everywhere in my code, and I'm able to see
>>> my
>>> > message reach the `add` method in the AggregateFunction that I
>>> > implemented,
>>> > but the getResult method is never called.
>>> >
>>> > In the code below, I also never see the:
>>> >  "Ran dataStream. Adding sink next"
>>> > line appear in my log, and the only log statements from the
>>> > JsonConcatenator
>>> > class come from the `add` method, as shown below.
>>> >
>>> >
>>> > DataStream
>>> > 
>>> >  combinedEnvelopes = dataStream
>>> > .map(new MapFunction

Apache Flink - Clarifications about late side output

2019-12-10 Thread M Singh
Hi,
I have a few questions about the late-data side output.
Here is the API:
stream
   .keyBy(...)   <-  keyed versus non-keyed windows
   .window(...)  <-  required: "assigner"
  [.trigger(...)]<-  optional: "trigger" (else default trigger)
  [.evictor(...)]<-  optional: "evictor" (else no evictor)
  [.allowedLateness(...)]<-  optional: "lateness" (else zero)
  [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
   .reduce/aggregate/fold/apply()  <-  required: "function"
  [.getSideOutput(...)]  <-  optional: "output tag"
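
A minimal sketch of that skeleton wired together; the input stream and its
Tuple2<String, Long> element type are illustrative, and timestamps/watermarks
are assumed to be assigned already:

final OutputTag<Tuple2<String, Long>> lateTag =
        new OutputTag<Tuple2<String, Long>>("late-data") {};

SingleOutputStreamOperator<Tuple2<String, Long>> counts = input
        .keyBy(0)
        .window(TumblingEventTimeWindows.of(Time.minutes(1)))
        .allowedLateness(Time.minutes(5))
        .sideOutputLateData(lateTag)
        .sum(1);

// Elements arriving after window end + allowed lateness skip the window function
// and are emitted here instead.
DataStream<Tuple2<String, Long>> droppedLate = counts.getSideOutput(lateTag);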

(Apache Flink 1.9 Documentation: Windows)

Here is the documentation:
Late elements considerations

When specifying an allowed lateness greater than 0, the window along with its 
content is kept after the watermark passes the end of the window. In these 
cases, when a late but not dropped element arrives, it could trigger another 
firing for the window. These firings are called late firings, as they are 
triggered by late events and in contrast to the main firing which is the first 
firing of the window. In case of session windows, late firings can further lead 
to merging of windows, as they may “bridge” the gap between two pre-existing, 
unmerged windows.

Attention You should be aware that the elements emitted by a late firing should 
be treated as updated results of a previous computation, i.e., your data stream 
will contain multiple results for the same computation. Depending on your 
application, you need to take these duplicated results into account or 
deduplicate them.

Questions:
1. If we have allowed lateness greater than 0 (say 5), then for an event which
arrives at window end + 3 (within the allowed lateness):
   (a) Is it considered late and included in the window function as a late firing?
   (b) Are the late firings under the control of the trigger?
   (c) If there are many events like this, are there multiple window function
   invocations?
   (d) Are these events (still within window end + allowed lateness) also emitted
   via the late-data side output?
2. If an event arrives after window end + allowed lateness:
   (a) Is it excluded from the window function but still emitted from the
   late-data side output?
   (b) And if it is emitted, is there any attribute which indicates for which
   window it was a late event?
   (c) Is there any time limit for how long the late side output remains active
   for a particular window, or are all late events channeled to it?
Thanks
Thanks
Mans






Re: Flink 'Job Cluster' mode Ui Access

2019-12-10 Thread Jatin Banger
Yes, I did.

On Tue, Dec 10, 2019 at 3:47 PM Arvid Heise  wrote:

> Hi Jatin,
>
> just to be sure. Did you increase the log level to debug [1] before
> checking for *StaticFileServerHandler*?
>
> Best,
>
> Arvid
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/logging.html#configuring-log4j
>
> On Mon, Dec 9, 2019 at 7:54 AM Jatin Banger 
> wrote:
>
>> Hi,
>>
>> I have checked the logs for the keyword *StaticFileServerHandler* in
>> them, but there were no such log lines for the Flink job cluster.
>> When I checked a Flink session cluster, I was able to find the logs for
>> the *StaticFileServerHandler* keyword.
>>
>> Can I raise this as a bug?
>>
>> Best Regards,
>> Jatin
>>
>>
>> On Thu, Dec 5, 2019 at 8:59 PM Chesnay Schepler 
>> wrote:
>>
>>> Ok, it's good to know that the WebUI files are there.
>>>
>>> Please enable DEBUG logging and try again, searching for messages from
>>> the StaticFileServerHandler.
>>>
>>> This handler logs every file that is requested (which effectively
>>> happens when the WebUI is being served); let's see what is actually being
>>> requested.
>>>
>>> On 05/12/2019 05:57, Jatin Banger wrote:
>>>
>>> I have tried that already using
>>> '$FLINK_HOME/bin/jobmanager.sh" start-foreground
>>> Ui comes fine with this one.
>>> Which means web/index.html is present.
>>>
>>>
>>> On Wed, Dec 4, 2019 at 9:01 PM Chesnay Schepler 
>>> wrote:
>>>
 hmm...this is quite odd.

 Let's try to narrow things down a bit.

 Could you try starting a local cluster (using the same distribution)
 and checking whether the UI is accessible?

 Could you also check whether the flink-dist.jar in /lib contains
 web/index.html?
 On 04/12/2019 06:02, Jatin Banger wrote:

 Hi,

 I am using flink binary directly.

 I am using this command to deploy the script.

 "$FLINK_HOME/bin/standalone-job.sh"
 start-foreground --job-classname ${ARGS_FOR_JOB}
 where ARGS_FOR_JOB contain job class name and all other necessary
 details needed by the job.

 Best regards,
 Jatin


 On Fri, Nov 29, 2019 at 4:18 PM Chesnay Schepler 
 wrote:

> To clarify, you ran "mvn package -pl flink-dist -am" to build Flink?
>
> If so, could you run that again and provide us with the maven output?
>
> On 29/11/2019 11:23, Jatin Banger wrote:
>
> Hi,
>
> @vino yang   I am using flink 1.8.1
>
> I am using the following procedure for the deployment:
>
> https://github.com/apache/flink/blob/master/flink-container/docker/README.md
>
> And i tried accessing the path you mentioned:
>
> # curl :4081/#/overview
> {"errors":["Not found."]}
>
> Best Regards,
> Jatin
>
> On Thu, Nov 28, 2019 at 10:21 PM Chesnay Schepler 
> wrote:
>
>> Could you try accessing :/#/overview ?
>>
>> The REST API is obviously accessible, and hence the WebUI should be
>> too.
>>
>> How did you setup the session cluster? Are you using some custom
>> Flink build or something, which potentially excluded flink-runtime-web 
>> from
>> the classpath?
>>
>> On 28/11/2019 10:02, Jatin Banger wrote:
>>
>> Hi,
>>
>> I checked the log file there is no error.
>> And I checked the pods internal ports by using rest api.
>>
>> # curl : 4081
>> {"errors":["Not found."]}
>> 4081 is the Ui port
>>
>> # curl :4081/config
>> {"refresh-interval":3000,"timezone-name":"Coordinated Universal
>> Time","timezone-offset":0,"flink-version":"","flink-revision":"ceba8af
>> @ 11.02.2019 @ 22:17:09 CST"}
>>
>> # curl :4081/jobs
>> {"jobs":[{"id":"___job_Id_","status":"RUNNING"}]}
>>
>> Which shows the state of the job as running.
>>
>> What else can we do ?
>>
>> Best regards,
>> Jatin
>>
>> On Thu, Nov 28, 2019 at 1:28 PM vino yang 
>> wrote:
>>
>>> Hi Jatin,
>>>
>>> Flink web UI does not depend on any deployment mode.
>>>
>>> You should check if there are error logs in the log file and the job
>>> status is running state.
>>>
>>> Best,
>>> Vino
>>>
>>> Jatin Banger wrote on Thursday, November 28, 2019 at 3:43 PM:
>>>
 Hi,

 It seems there is Web Ui for Flink Session cluster, But for Flink
 Job Cluster it is Showing

 {"errors":["Not found."]}

 Is it the expected behavior for Flink Job Cluster Mode ?

 Best Regards,
 Jatin

>>>
>>
>

>>>


Order events by field that does not represent time

2019-12-10 Thread KristoffSC
Hi,
Is it possible to use a field that does not represent a timestamp to order
events in Flink's pipeline?

In other words, I will receive a stream of events that will have a sequence
number (gaps are possible).
Can I maintain the order of those events based on this field, the same as I would
do for a time-representing field?

Regards,
Krzysztof



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Event Timestamp corrupted by timezone

2019-12-10 Thread Timo Walther

Hi,

I hope we can solve these issues with the new type system. The core 
problem is that the old planner uses java.sql.Timestamp, which depends on the 
timezone of the current machine. I would recommend setting everything to 
UTC if possible for now.


Regards,
Timo


On 03.12.19 18:49, Lasse Nedergaard wrote:

Hi.

We have the same challenges. I asked at Flink Forward and it's a known 
problem. We input in UTC but Flink outputs in local machine time. We have 
created a function that converts it back to UTC before collecting to the 
downstream.
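
A minimal sketch of that kind of workaround (this is not the actual function from this
thread; the class name is made up, and the sign of the correction is an assumption that
should be verified against a known event before relying on it):

import java.sql.Timestamp;
import java.util.TimeZone;

import org.apache.flink.api.common.functions.MapFunction;

// Re-shift a java.sql.Timestamp that was materialized in local time back to UTC millis.
public class ToUtcMillis implements MapFunction<Timestamp, Long> {
    @Override
    public Long map(Timestamp localizedTs) {
        long millis = localizedTs.getTime();
        int zoneOffsetMillis = TimeZone.getDefault().getOffset(millis);
        // Assumption: the planner subtracted the local offset on the way out,
        // so we add it back here. Flip the sign if the shift goes the other way.
        return millis + zoneOffsetMillis;
    }
}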


Med venlig hilsen / Best regards
Lasse Nedergaard


On 3 Dec 2019, at 15:16, Wojciech Indyk wrote:



Hi!
I use Flink 1.8 with Scala. I think I've found a problem with event 
timestamps in the Table API. When I mark my timestamp: Long field as .rowtime and 
then save it back to a stream as sql.Timestamp, I get a wrong .getTime 
result. The gist for reproduction is here: 
https://gist.github.com/woj-i/b1dfbb71590b7f1c0c58be1f9e41c610

When I change my timezone from GMT+1 to GMT, everything works OK.
I've found this post from March 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/event-time-timezone-is-not-correct-tt26457.html 
but it's not resolved. The most relevant ticket I've found 
https://issues.apache.org/jira/browse/FLINK-8353 seems to not include 
the problem I described.


1. Can you confirm it's a bug?
2. Should I post this bug somewhere to be at least planned to solve?
3. Can you recommend me a workaround for the described problem?

--
Kind regards/ Pozdrawiam,
Wojciech Indyk




Re: Job manager is failing to start with an S3 no key specified exception [1.7.2]

2019-12-10 Thread Andrey Zagrebin
Hi Harshith,

Could you share your full log files from the job master?
As I understand, this stack trace already belongs to a failover attempt,
what was the original cause of failover? Do you still have any other job
state in S3 for this cluster id `flink-2`?
Have you tried the latest version of Flink 1.9?

Best,
Andrey

On Mon, Dec 9, 2019 at 12:37 PM Kumar Bolar, Harshith 
wrote:

> Hi all,
>
>
>
> I'm running a standalone Flink cluster with Zookeeper and S3 for high
> availability storage. All of a sudden, the job managers started failing
> with an S3 `UnrecoverableS3OperationException` error. Here is the full
> error trace -
>
>
>
> ```
>
> java.lang.RuntimeException:
> org.apache.flink.runtime.client.JobExecutionException: Could not set up
> JobManager
>
> at
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
>
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>
> at
> akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
> not set up JobManager
>
> at
> org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:176)
>
> at
> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
>
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
>
> at
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>
> ... 7 more
>
> Caused by:
> org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$UnrecoverableS3OperationException:
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
> The specified key does not exist. (Service: Amazon S3; Status Code: 404;
> Error Code: NoSuchKey; Request ID: 1769066EBD605AB5; S3 Extended Request
> ID:
> K8jjbsE4DPAsZJDVJKBq3Nh0E0o+feafefavbvbaae+nbUTphHHw73/eafafefa+dsVMR0=),
> S3 Extended Request ID:
> lklalkioe+eae2234+nbUTphHHw73/gVSclc1o1YH7M0MeNjmXl+dsVMR0= (Path:
> s3://abc-staging/flink/jobmanagerha/flink-2/blob/job_3e16166a1122885eb6e9b2437929b266/blob_p-3b687174148e9e1dd951f2a9fbec83f4fcd5281e-b85417f69b354c83b270bf01dcf389e0)
>
> at
> org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3InputStream.lambda$openStream$1(PrestoS3FileSystem.java:908)
>
> at
> org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138)
>
> at
> org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:893)
>
> at
> org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:878)
>
> at
> org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3InputStream.seekStream(PrestoS3FileSystem.java:871)
>
> at
> org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3InputStream.lambda$read$0(PrestoS3FileSystem.java:810)
>
> at
> org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138)
>
> at
> org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3InputStream.read(PrestoS3FileSystem.java:809)
>
> ... 10 more
>
> Caused by:
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
> The specified key does not exist. (Service: Amazon S3; Status Code: 404;
> Error Code: NoSuchKey; Request ID: 1769066EBaD6aefB5; S3 Extended Request
> ID: fealloga+4rVwsF+nbUTphHHw73/gVSclc1o1YH7M0MeNjmXl+dsVMR0=), S3 Extended
> Request ID:
> K8jjbsE4DPAsZJDVJKBq3Nh0E0o+4rVwsF+nbUTphHHweafga/lc1o1YH7M0MeNjmXl+dsVMR0=
>
> at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
>
> at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
>
> at
> org.

Re: Side output question

2019-12-10 Thread Arvid Heise
There is no clear reference, as it's not a use case that has occurred yet.
I'd be careful with all metrics related to output. The shuffle service should
be fine [1], as side outputs also go over it.
I wouldn't be surprised if currentOutputWatermark is not updated, though.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#default-shuffle-service
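
For illustration, a minimal sketch of a job whose process function emits only to side
outputs and never uses the main collector; the tags, routing rule, and sample data are
made up for this example.

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputOnlySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        final OutputTag<String> errorsTag = new OutputTag<String>("errors") {};
        final OutputTag<String> restTag = new OutputTag<String>("rest") {};

        SingleOutputStreamOperator<String> routed = env
                .fromElements("ERR a", "ok b")
                .process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String value, Context ctx, Collector<String> out) {
                        // Everything goes to a side output; the main Collector is never called.
                        if (value.startsWith("ERR")) {
                            ctx.output(errorsTag, value);
                        } else {
                            ctx.output(restTag, value);
                        }
                    }
                });

        routed.getSideOutput(errorsTag).print();
        routed.getSideOutput(restTag).print();
        env.execute("side-output-only sketch");
    }
}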

On Tue, Dec 10, 2019 at 1:47 PM M Singh  wrote:

> Thanks Arvid for your answer.
>
> Can you please point me to any documentation/reference as to which metrics
> might be impacted ? Also, let me know of any other pitfall.
>
> Once again, I appreciate your help.
>
> On Tuesday, December 10, 2019, 03:23:01 AM EST, Arvid Heise <
> ar...@ververica.com> wrote:
>
>
> Hi Mans,
>
> there should be no issue to only have side-outputs in your operator. There
> should also be no big drawbacks. I guess mostly some metrics will not be
> properly populated, but you can always populate them manually or add new
> ones.
>
> Best,
>
> Arvid
>
> On Mon, Dec 2, 2019 at 8:40 PM M Singh  wrote:
>
> Hi:
>
> I am replacing SplitOperator in my flink application with a simple
> processor with side outputs.
>
> My question is: does the main stream from which we get the side
> outputs need to have any events (i.e., produced using
> collector.collect)? Or can we have all the output as side outputs? Also
> are there any pros and cons of at least one main collected output vs all
> side outputs?
>
> Thanks
>
> Mans
>
>


Re: Apache Flink - Retries for async processing

2019-12-10 Thread M Singh
Thanks Jingsong for sharing your solution.
Since both refreshing the token and the actual API request can fail with either 
recoverable or unrecoverable exceptions, are there any patterns for retrying 
both and making the code robust to failures?
Thanks again.
On Monday, December 9, 2019, 10:08:39 PM EST, Jingsong Li 
 wrote:  
 
Hi M Singh,

We have this scenario internally too. As far as I know, Flink does not have this 
mechanism built in as of 1.9 either. I can share my solution:
- In the async function, start a thread factory (an executor).
- When a call fails, resubmit it to that executor, and refresh the security token there too.
- In short, handle everything inside the function, as long as the relevant methods of 
  ResultFuture are eventually called.
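
A minimal sketch of this pattern (all class names and the HTTP helpers are assumptions
made for illustration, not code from this thread):

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class RetryingAsyncFunction extends RichAsyncFunction<String, String> {

    private static final int MAX_RETRIES = 3;
    private transient ExecutorService executor;

    @Override
    public void open(Configuration parameters) {
        executor = Executors.newFixedThreadPool(4);
    }

    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
        submit(input, resultFuture, 0);
    }

    private void submit(String input, ResultFuture<String> resultFuture, int attempt) {
        executor.submit(() -> {
            try {
                resultFuture.complete(Collections.singleton(callApi(input)));
            } catch (TokenExpiredException e) {
                refreshToken();                                // recoverable: refresh and retry
                submit(input, resultFuture, attempt);
            } catch (Exception e) {
                if (attempt < MAX_RETRIES) {
                    submit(input, resultFuture, attempt + 1);  // recoverable: plain retry
                } else {
                    resultFuture.completeExceptionally(e);     // unrecoverable: fail the record
                }
            }
        });
    }

    @Override
    public void close() {
        if (executor != null) {
            executor.shutdown();
        }
    }

    // --- hypothetical helpers standing in for the real HTTP client and token logic ---
    private String callApi(String input) throws Exception { return input; }
    private void refreshToken() {}
    private static class TokenExpiredException extends RuntimeException {}
}
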
Best,
Jingsong Lee
On Tue, Dec 10, 2019 at 3:25 AM M Singh  wrote:

Hi Folks:
I am working on a project where I will be using Flink's async processing 
capabilities. The job has to make HTTP requests using a token. The token 
expires periodically and needs to be refreshed.
So, I was looking for patterns for handling async call failures and retries 
when the token expires. I found the thread "Re: Backoff strategies for async IO 
functions?" and it appears that Flink does not support retries or periodically 
refreshing a security token. I am using 1.6 at the moment but am planning to 
migrate to 1.9 soon.

If there are any patterns on how to deal with this scenario, please let me know.
Thanks
Mans



-- 
Best, Jingsong Lee  

Re: Side output question

2019-12-10 Thread M Singh
 Thanks Arvid for your answer.
Can you please point me to any documentation/reference as to which metrics 
might be impacted ? Also, let me know of any other pitfall.
Once again, I appreciate your help.
On Tuesday, December 10, 2019, 03:23:01 AM EST, Arvid Heise 
 wrote:  
 
 Hi Mans,
there should be no issue to only have side-outputs in your operator. There 
should also be no big drawbacks. I guess mostly some metrics will not be 
properly populated, but you can always populate them manually or add new ones.
Best,
Arvid

On Mon, Dec 2, 2019 at 8:40 PM M Singh  wrote:

Hi:
I am replacing SplitOperator in my flink application with a simple processor 
with side outputs.

My question is: does the main stream from which we get the side outputs 
need to have any events (i.e., produced using collector.collect)? 
Or can we have all the output as side outputs? Also, are there any pros and 
cons of at least one main collected output vs all side outputs?
Thanks
Mans
  

Re: Flink ML feature

2019-12-10 Thread Benoît Paris
Is there any information as to whether Alink is going to be contributed to
Apache Flink as the official ML Lib?


On Tue, Dec 10, 2019 at 7:11 AM vino yang  wrote:

> Hi Chandu,
>
> AFAIK, there is a project named Alink[1] which is the Machine Learning
> algorithm platform based on Flink, developed by the PAI team of Alibaba
> computing platform. FYI
>
> Best,
> Vino
>
> [1]: https://github.com/alibaba/Alink
>
> Tom Blackwood wrote on Tuesday, December 10, 2019 at 2:07 PM:
>
>> You may try Spark ML, which is a production ready library for ML stuff.
>>
>> regards.
>>
>> On Tue, Dec 10, 2019 at 1:04 PM chandu soa  wrote:
>>
>>> Hello Community,
>>>
>>> Can you please give me some pointers for implementing Machine Learning
>>> using Flink?
>>>
>>> I see the Flink ML libraries were dropped in v1.9. It looks like the ML feature
>>> in Flink is going to be enhanced.
>>>
>>> What is the recommended approach for implementing production-grade ML-based
>>> apps using Flink? Is v1.9 OK, or should I wait for 1.10?
>>>
>>> Thanks,
>>> Chandu
>>>
>>

-- 
Benoît Paris
Ingénieur Machine Learning Explicable
Tél : +33 6 60 74 23 00
http://benoit.paris
http://explicable.ml


Re: Basic question about flink programs

2019-12-10 Thread KristoffSC
Hi Arvid Heise-3,
Thanks for your answer. I took this approach. 


I did not want to start a new thread since I wanted to avoid "subject
duplication" :)

Regards,
Krzysztof



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Need help using AggregateFunction instead of FoldFunction

2019-12-10 Thread Arvid Heise
getResult will only be called when the window is triggered. For a
fixed-time window, it triggers at the end of the window.

However, for EventTimeSessionWindows you need to have gaps in the data. Can
you verify that there is actually a 20-second pause in between data points for
your keys?
Additionally, it may also be an issue with extracting the event time from
the sources. Could you post the relevant code as well?
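
For comparison, here is a minimal event-time setup sketch (Flink 1.9 style); the record
type and values are illustrative assumptions. Without assigned timestamps and watermarks,
an event-time session window will never fire.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class EventTimeSetupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Hypothetical input: (key, eventTimeMillis) pairs.
        DataStream<Tuple2<String, Long>> events = env.fromElements(
                Tuple2.of("a", 1_000L), Tuple2.of("a", 2_000L), Tuple2.of("a", 30_000L));

        DataStream<Tuple2<String, Long>> withTimestamps = events.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(2)) {
                    @Override
                    public long extractTimestamp(Tuple2<String, Long> element) {
                        return element.f1; // the event time must come from the record itself
                    }
                });

        withTimestamps.print();
        env.execute("event-time setup sketch");
    }
}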

Best,

Arvid

On Mon, Dec 9, 2019 at 8:51 AM vino yang  wrote:

> Hi dev,
>
> The time of the window may have different semantics.
> In the session window, it's only a time gap, the size of the window is
> driven via activity events.
> In the tumbling or sliding window, it means the size of the window.
>
> For more details, please see the official documentation.[1]
>
> Best,
> Vino
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html#session-windows
>
>
>
> devinbost wrote on Friday, December 6, 2019 at 10:39 PM:
>
>> I think there might be a bug in
>> `.window(EventTimeSessionWindows.withGap(Time.seconds(5)))`
>>  (unless I'm just not using it correctly) because I'm able to get output
>> when I use the simpler window
>> `.timeWindow(Time.seconds(5))`
>> However, I don't get any output when I used the session-based window.
>>
>>
>> devinbost wrote
>> > I added logging statements everywhere in my code, and I'm able to see my
>> > message reach the `add` method in the AggregateFunction that I
>> > implemented,
>> > but the getResult method is never called.
>> >
>> > In the code below, I also never see the:
>> >  "Ran dataStream. Adding sink next"
>> > line appear in my log, and the only log statements from the
>> > JsonConcatenator
>> > class come from the `add` method, as shown below.
>> >
>> >
>> > DataStream<Tuple2<String, String>> combinedEnvelopes = dataStream
>> >     .map(new MapFunction<String, Tuple2<String, String>>() {
>> >         @Override
>> >         public Tuple2<String, String> map(String incomingMessage) throws Exception {
>> >             return mapToTuple(incomingMessage);
>> >         }
>> >     })
>> >     .keyBy(0)
>> >     .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
>> >     .aggregate(new JsonConcatenator());
>> >
>> > Logger logger = LoggerFactory.getLogger(StreamJob.class);
>> > logger.info("Ran dataStream. Adding sink next");
>> >
>> > -----
>> >
>> > private static class JsonConcatenator
>> >         implements AggregateFunction<Tuple2<String, String>, Tuple2<String, String>, String> {
>> >     Logger logger = LoggerFactory.getLogger(SplinklerJob.class);
>> >
>> >     @Override
>> >     public Tuple2<String, String> createAccumulator() {
>> >         return new Tuple2<>("", "");
>> >     }
>> >
>> >     @Override
>> >     public Tuple2<String, String> add(Tuple2<String, String> value,
>> >                                       Tuple2<String, String> accumulator) {
>> >         logger.info("Running Add on value.f0: " + value.f0 + " and value.f1: " + value.f1);
>> >         return new Tuple2<>(value.f0, accumulator.f1 + ", " + value.f1);
>> >     }
>> >
>> >     @Override
>> >     public String getResult(Tuple2<String, String> accumulator) {
>> >         logger.info("Running getResult on accumulator.f1: " + accumulator.f1);
>> >         return "[" + accumulator.f1 + "]";
>> >     }
>> >
>> >     @Override
>> >     public Tuple2<String, String> merge(Tuple2<String, String> a, Tuple2<String, String> b) {
>> >         logger.info("Running merge on (a.f0: " + a.f0 + " and a.f1: " + a.f1 + " and b.f1: " + b.f1);
>> >         return new Tuple2<>(a.f0, a.f1 + ", " + b.f1);
>> >     }
>> > }
>> >
>> >
>> >
>> >
>> > Any ideas?
>> >
>> >
>> > Chris Miller-2 wrote
>> >> I hit the same problem, as far as I can tell it should be fixed in
>> >> Pulsar 2.4.2. The release of this has already passed voting so I hope
>> it
>> >> should be available in a day or two.
>> >>
>> >> https://github.com/apache/pulsar/pull/5068
>> >
>> >
>> >
>> >
>> >
>> > --
>> > Sent from:
>> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>>
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: Flink 'Job Cluster' mode Ui Access

2019-12-10 Thread Arvid Heise
Hi Jatin,

just to be sure. Did you increase the log level to debug [1] before
checking for *StaticFileServerHandler*?

Best,

Arvid

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/logging.html#configuring-log4j

On Mon, Dec 9, 2019 at 7:54 AM Jatin Banger 
wrote:

> Hi,
>
> I have checked the logs with this keyword  *StaticFileServerHandler   *in
> it, But there were no logs coming for "Flink Job Cluster".
> Then i checked for Flink Session Cluster, i was able to find the logs for
> the *StaticFileServerHandler *keyword.
>
> Can i raise this as bug ?
>
> Best Regards,
> Jatin
>
>
> On Thu, Dec 5, 2019 at 8:59 PM Chesnay Schepler 
> wrote:
>
>> Ok, it's good to know that the WebUI files are there.
>>
>> Please enable DEBUG logging and try again, searching for messages from
>> the StaticFileServerHandler.
>>
>> This handler logs every file that is requested (which effectively happens
>> when the WebUI is being served); let's see what is actually being requested.
>>
>> On 05/12/2019 05:57, Jatin Banger wrote:
>>
>> I have tried that already using
>> '$FLINK_HOME/bin/jobmanager.sh" start-foreground
>> Ui comes fine with this one.
>> Which means web/index.html is present.
>>
>>
>> On Wed, Dec 4, 2019 at 9:01 PM Chesnay Schepler 
>> wrote:
>>
>>> hmm...this is quite odd.
>>>
>>> Let's try to narrow things down a bit.
>>>
>>> Could you try starting a local cluster (using the same distribution) and
>>> checking whether the UI is accessible?
>>>
>>> Could you also check whether the flink-dist.jar in /lib contains
>>> web/index.html?
>>> On 04/12/2019 06:02, Jatin Banger wrote:
>>>
>>> Hi,
>>>
>>> I am using flink binary directly.
>>>
>>> I am using this command to deploy the script.
>>>
>>> "$FLINK_HOME/bin/standalone-job.sh"
>>> start-foreground --job-classname ${ARGS_FOR_JOB}
>>> where ARGS_FOR_JOB contain job class name and all other necessary
>>> details needed by the job.
>>>
>>> Best regards,
>>> Jatin
>>>
>>>
>>> On Fri, Nov 29, 2019 at 4:18 PM Chesnay Schepler 
>>> wrote:
>>>
 To clarify, you ran "mvn package -pl flink-dist -am" to build Flink?

 If so, could you run that again and provide us with the maven output?

 On 29/11/2019 11:23, Jatin Banger wrote:

 Hi,

 @vino yang   I am using flink 1.8.1

 I am using the following procedure for the deployment:

 https://github.com/apache/flink/blob/master/flink-container/docker/README.md

 And i tried accessing the path you mentioned:

 # curl :4081/#/overview
 {"errors":["Not found."]}

 Best Regards,
 Jatin

 On Thu, Nov 28, 2019 at 10:21 PM Chesnay Schepler 
 wrote:

> Could you try accessing :/#/overview ?
>
> The REST API is obviously accessible, and hence the WebUI should be
> too.
>
> How did you setup the session cluster? Are you using some custom Flink
> build or something, which potentially excluded flink-runtime-web from the
> classpath?
>
> On 28/11/2019 10:02, Jatin Banger wrote:
>
> Hi,
>
> I checked the log file there is no error.
> And I checked the pods internal ports by using rest api.
>
> # curl : 4081
> {"errors":["Not found."]}
> 4081 is the Ui port
>
> # curl :4081/config
> {"refresh-interval":3000,"timezone-name":"Coordinated Universal
> Time","timezone-offset":0,"flink-version":"","flink-revision":"ceba8af
> @ 11.02.2019 @ 22:17:09 CST"}
>
> # curl :4081/jobs
> {"jobs":[{"id":"___job_Id_","status":"RUNNING"}]}
>
> Which shows the state of the job as running.
>
> What else can we do ?
>
> Best regards,
> Jatin
>
> On Thu, Nov 28, 2019 at 1:28 PM vino yang 
> wrote:
>
>> Hi Jatin,
>>
>> Flink web UI does not depend on any deployment mode.
>>
>> You should check if there are error logs in the log file and the job
>> status is running state.
>>
>> Best,
>> Vino
>>
>> Jatin Banger wrote on Thursday, November 28, 2019 at 3:43 PM:
>>
>>> Hi,
>>>
>>> It seems there is Web Ui for Flink Session cluster, But for Flink
>>> Job Cluster it is Showing
>>>
>>> {"errors":["Not found."]}
>>>
>>> Is it the expected behavior for Flink Job Cluster Mode ?
>>>
>>> Best Regards,
>>> Jatin
>>>
>>
>

>>>
>>


Re: Basic question about flink programs

2019-12-10 Thread Arvid Heise
Hi KristoffSC,

it would be better if you'd open up a new thread. It's very rare for users
to check user lists after 1 year on a regular basis.

In general, if you have a cache, you usually don't want to serialize it. So
add the cache as a field inside the respective function (rewrite a lambda
to an anonymous class) and make the field transient to avoid serialization.
Be aware that you usually want to initialize the cache in your open()
function (so you need to use a RichXFunction).
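
A minimal sketch of that pattern (the cache type and the lookup are placeholders):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class CachingEnricher extends RichMapFunction<String, String> {

    // transient: the cache is not serialized and shipped with the function
    private transient Map<String, String> cache;

    @Override
    public void open(Configuration parameters) {
        cache = new HashMap<>();   // (re)built on each parallel task after deserialization
    }

    @Override
    public String map(String key) {
        return cache.computeIfAbsent(key, this::expensiveLookup);
    }

    private String expensiveLookup(String key) {
        // placeholder for a real lookup (database, REST call, ...)
        return key.toUpperCase();
    }
}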

Best,

Arvid

On Fri, Dec 6, 2019 at 1:23 PM KristoffSC 
wrote:

> Hi,
> I'm having the same problem now. What is your approach now after gaining
> some experience?
>
> Also do you use Spring DI to setup/initialize your jobs/process functions?
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Flink authentication hbase use kerberos

2019-12-10 Thread Aljoscha Krettek
Hi,

I believe that accessing a Kerberos-secured HBase only works from a kerberized 
YARN cluster, because you need the keytab shipping. But I'm not 100% sure.

Best,
Aljoscha

> On 4. Dec 2019, at 07:41, venn  wrote:
> 
> Hi Guys:
> I wonder about, it is work that flink on yarn deploy on no 
> authentication Hadoop cluster, access hbase deploy on Kerberos authentication 
> Hadoop cluster? If work, what I need to do. I already config flink-conf-yaml 
> properties “security.kerberos.login.keytab” and 
> “security.kerberos.login.principal”.
>  
>  
> And I found the following paragraph on the Flink official website:  
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/security-kerberos.html
>   
>  
> Hadoop Security Module
> 
> This module uses the Hadoop UserGroupInformation (UGI) class to establish a 
> process-wide login user context. The login user is then used for all 
> interactions with Hadoop, including HDFS, HBase, and YARN.
> 
> If Hadoop security is enabled (in core-site.xml), the login user will have 
> whatever Kerberos credential is configured. Otherwise, the login user conveys 
> only the user identity of the OS account that launched the cluster.
> 
>  
> 
>  
> Thanks a lot !



Re: Side output question

2019-12-10 Thread Arvid Heise
Hi Mans,

there should be no issue to only have side-outputs in your operator. There
should also be no big drawbacks. I guess mostly some metrics will not be
properly populated, but you can always populate them manually or add new
ones.

Best,

Arvid

On Mon, Dec 2, 2019 at 8:40 PM M Singh  wrote:

> Hi:
>
> I am replacing SplitOperator in my flink application with a simple
> processor with side outputs.
>
> My question is: does the main stream from which we get the side
> outputs need to have any events (i.e., produced using
> collector.collect)? Or can we have all the output as side outputs? Also
> are there any pros and cons of at least one main collected output vs all
> side outputs?
>
> Thanks
>
> Mans
>