Re: Re: How to submit a JobGraph to YARN in per-job mode

2020-03-06 Thread tison
OK. First, tell us which Flink version you are using; this feels like a fairly old version, roughly 1.7.
If there is no confidentiality issue, please also share your code file and the complete error stack trace.

The problem is probably in how you initialize the YarnClusterDescriptor; the snippets you provided are too fragmentary to guess the actual cause.

Generally speaking, Flink's current client implementation is not great. Using the CLI directly does not cause many problems, but if you depend directly on abstractions such as ClusterDescriptor,
there are some implicit rules inside them; if you go down that path you may have to debug step by step against the Flink source code.

Best,
tison.


nicygan wrote on Sat, Mar 7, 2020, 3:16 PM:

> Hi tison, when the program runs to this point, it throws a NullPointerException:
> Caused by: java.lang.NullPointerException
> at
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getNodeReports(YarnClientImpl.java:506)
>
> In the getNodeReports method:
> GetClusterNodesResponse response = rmClient.getClusterNodes(request);
> rmClient is null in this statement.
>
>
>
> I see that YarnClientImpl has a start() method that assigns rmClient, but when I add a call to it, execution fails with the following error instead:
> Exception in thread "main"
> org.apache.hadoop.service.ServiceStateException:
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl cannot enter state
> STARTED from state NOTINITED
> at
> org.apache.hadoop.service.ServiceStateModel.checkStateTransition(ServiceStateModel.java:129)
> at
> org.apache.hadoop.service.ServiceStateModel.enterState(ServiceStateModel.java:111)
> at
> org.apache.hadoop.service.AbstractService.start(AbstractService.java:190)
>
>
>
>
>
>
>
>
> On 2020-03-07 11:15:10, "tison" wrote:
> >What is the error message when the submission fails?
> >
> >Best,
> >tison.
> >
> >
>nicygan wrote on Sat, Mar 7, 2020, 11:14 AM:
> >
> >> dear all:
> >>
> >>
> >> I am using Flink 1.9. I have generated a JobGraph from a Flink SQL statement, but I do not
> >> know how to submit it to YARN and run it in per-job mode. I tried to submit it by creating a
> >> YarnClusterDescriptor, but it did not work. The code is roughly as follows; does anyone have
> >> a working example to share?
> >>
> >> ..
> >> JobGraph jobGraph = env.getStreamGraph("StreamSql").getJobGraph();
> >> ..
> >> ..
> >> yarnClusterDescriptor.deployJobCluster(
> >> clusterSpecification,
> >>   jobGraph, true);
> >>
> >>
>


flink elasticsearch sink exactly-once guarantees (subject partially garbled in the archive)

2020-03-06 Thread (sender name garbled)
[The message body is garbled in the archive. The recoverable fragments ask how the Flink
Elasticsearch sink can guarantee exactly-once semantics, and whether a uuid should be used
as the document id.]
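For reference, a minimal hedged sketch (not from the original message) of the approach usually discussed for this question: with checkpointing enabled the Elasticsearch sink provides at-least-once delivery, and writes can be made effectively idempotent by indexing each record under a deterministic document id, for example a uuid carried in the record, so replays after a failure overwrite the same document instead of duplicating it. The index name and record shape below are placeholders.

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.HashMap;
import java.util.Map;

// Each record is (uuid, payload); indexing under the uuid makes a retry overwrite
// the same document rather than create a duplicate.
public class IdempotentEsSink implements ElasticsearchSinkFunction<Tuple2<String, String>> {
    @Override
    public void process(Tuple2<String, String> record, RuntimeContext ctx, RequestIndexer indexer) {
        Map<String, Object> json = new HashMap<>();
        json.put("payload", record.f1);

        IndexRequest request = Requests.indexRequest()
                .index("my-index")   // placeholder index name
                .id(record.f0)       // deterministic document id
                .source(json);
        indexer.add(request);
    }
}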





Re: Re: How to submit a JobGraph to YARN in per-job mode

2020-03-06 Thread nicygan
Hi tison, when the program runs to this point, it throws a NullPointerException:
Caused by: java.lang.NullPointerException
at 
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getNodeReports(YarnClientImpl.java:506)

In the getNodeReports method:
GetClusterNodesResponse response = rmClient.getClusterNodes(request);
rmClient is null in this statement.



I see that YarnClientImpl has a start() method that assigns rmClient, but when I add a call to it, execution fails with the following error instead:
Exception in thread "main" org.apache.hadoop.service.ServiceStateException: 
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl cannot enter state 
STARTED from state NOTINITED
at 
org.apache.hadoop.service.ServiceStateModel.checkStateTransition(ServiceStateModel.java:129)
at 
org.apache.hadoop.service.ServiceStateModel.enterState(ServiceStateModel.java:111)
at org.apache.hadoop.service.AbstractService.start(AbstractService.java:190)








On 2020-03-07 11:15:10, "tison" wrote:
>What is the error message when the submission fails?
>
>Best,
>tison.
>
>
>nicygan wrote on Sat, Mar 7, 2020, 11:14 AM:
>
>> dear all:
>>
>> I am using Flink 1.9. I have generated a JobGraph from a Flink SQL statement, but I do not
>> know how to submit it to YARN and run it in per-job mode. I tried to submit it by creating a
>> YarnClusterDescriptor, but it did not work. The code is roughly as follows; does anyone have
>> a working example to share?
>>
>> ..
>> JobGraph jobGraph = env.getStreamGraph("StreamSql").getJobGraph();
>> ..
>> ..
>> yarnClusterDescriptor.deployJobCluster(
>> clusterSpecification,
>>   jobGraph, true);
>>
>>
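For later readers, a minimal hedged sketch (not from the thread) of the Hadoop YarnClient lifecycle that the two errors above point at: a Hadoop service moves NOTINITED -> INITED -> STARTED, so init() must be called before start(), and both before RPC calls such as getNodeReports() will find a non-null rmClient.

import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class YarnClientLifecycle {
    // Calling start() without init() raises the ServiceStateException quoted above
    // ("cannot enter state STARTED from state NOTINITED"); calling neither leaves
    // rmClient null, which is the NullPointerException seen in getNodeReports().
    public static YarnClient createStartedClient() {
        YarnConfiguration yarnConfiguration = new YarnConfiguration();
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(yarnConfiguration);
        yarnClient.start();
        return yarnClient;
    }
}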


RE: Flink Conf "yarn.flink-dist-jar" Question

2020-03-06 Thread Hailu, Andreas
Hi Tison, thanks for the reply. I’ve replied to the ticket. I’ll be watching it 
as well.

// ah

From: tison 
Sent: Friday, March 6, 2020 1:40 PM
To: Hailu, Andreas [Engineering] 
Cc: user@flink.apache.org
Subject: Re: Flink Conf "yarn.flink-dist-jar" Question

FLINK-13938 seems a bit different from your requirement. The one that matches exactly is
FLINK-14964.
I'd appreciate it if you could share your opinion on the JIRA ticket.

Best,
tison.


tison <wander4...@gmail.com> wrote on Sat, Mar 7, 2020, 2:35 AM:
Yes, your requirement has been taken into consideration by the community. We
currently have an open JIRA ticket for this specific feature [1], and work on
loosening the constraint on the flink-dist jar scheme to support DFS locations
should happen.

Best,
tison.

[1] 
https://issues.apache.org/jira/browse/FLINK-13938


Hailu, Andreas <andreas.ha...@gs.com> wrote on Sat, Mar 7, 2020, 2:03 AM:
Hi,

We noticed that every time an application runs, it uploads the flink-dist
artifact to the /user/<user>/.flink HDFS directory. This causes a user disk
space quota issue, as we submit thousands of apps to our cluster an hour. We had
a similar problem with our Spark applications, where Spark uploaded the Spark
Assembly package for every app. Spark provides an argument to use a location in
HDFS for applications to leverage so they don't need to upload it for
every run, and that was our solution (see the "spark.yarn.jar" configuration if
interested.)

Looking at the Resource Orchestration Frameworks page, I see there might be a similar
concept through a "yarn.flink-dist-jar" configuration option. I wanted to place the
flink-dist package we're using in a location in HDFS and configure our jobs to point to it, e.g.

yarn.flink-dist-jar: hdfs:user/delp/.flink/flink-dist_2.11-1.9.1.jar

Am I correct in that this is what I’m looking for? I gave this a try with some 
jobs today, and based on what I’m seeing in the launch_container.sh in our YARN 
application, it still looks like it’s being uploaded:

export 
_FLINK_JAR_PATH="hdfs://d279536/user/delp/.flink/application_1583031705852_117863/flink-dist_2.11-1.9.1.jar"

How can I confirm? Or is this perhaps not config I’m looking for?

Best,
Andreas





Re: How to submit a JobGraph to YARN in per-job mode

2020-03-06 Thread tison
What is the error message when the submission fails?

Best,
tison.


nicygan wrote on Sat, Mar 7, 2020, 11:14 AM:

> dear all:
>
> I am using Flink 1.9. I have generated a JobGraph from a Flink SQL statement, but I do not
> know how to submit it to YARN and run it in per-job mode. I tried to submit it by creating a
> YarnClusterDescriptor, but it did not work. The code is roughly as follows; does anyone have
> a working example to share?
>
> ..
> JobGraph jobGraph = env.getStreamGraph("StreamSql").getJobGraph();
> ..
> ..
> yarnClusterDescriptor.deployJobCluster(
> clusterSpecification,
>   jobGraph, true);
>
>


How to submit a JobGraph to YARN in per-job mode

2020-03-06 Thread nicygan
dear all:

I am using Flink 1.9. I have generated a JobGraph from a Flink SQL statement, but I do not
know how to submit it to YARN and run it in per-job mode. I tried to submit it by creating a
YarnClusterDescriptor, but it did not work. The code is roughly as follows; does anyone have
a working example to share?

..
JobGraph jobGraph = env.getStreamGraph("StreamSql").getJobGraph();
..
..
yarnClusterDescriptor.deployJobCluster(
clusterSpecification,
  jobGraph, true);
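For later readers, a hedged sketch (assembled from this thread, not a verified working example) of how such a per-job deployment is usually wired up against the Flink 1.9 YarnClusterDescriptor. Note in particular that the YarnClient must be init()-ed and start()-ed before it is handed to the descriptor; memory sizes, slot counts and the configuration directory below are placeholders.

import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class PerJobSubmitSketch {
    public static ClusterClient<ApplicationId> submit(JobGraph jobGraph, String flinkConfDir) throws Exception {
        Configuration flinkConfig = GlobalConfiguration.loadConfiguration(flinkConfDir);
        YarnConfiguration yarnConfig = new YarnConfiguration();

        // The YarnClient must be initialized and started before the descriptor uses it.
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(yarnConfig);
        yarnClient.start();

        YarnClusterDescriptor descriptor = new YarnClusterDescriptor(
                flinkConfig, yarnConfig, flinkConfDir, yarnClient, true);

        ClusterSpecification spec = new ClusterSpecification.ClusterSpecificationBuilder()
                .setMasterMemoryMB(1024)          // placeholder sizes
                .setTaskManagerMemoryMB(1024)
                .setSlotsPerTaskManager(1)
                .createClusterSpecification();

        // detached = true, as in the snippet above
        return descriptor.deployJobCluster(spec, jobGraph, true);
    }
}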



Understanding n LIST calls as part of checkpointing

2020-03-06 Thread Piyush Narang
Hi folks,

I was trying to debug a job which was taking 20-30s to checkpoint data to Azure 
FS (compared to typically < 5s) and as part of doing so, I noticed something 
that I was trying to figure out a bit better.
Our checkpoint path is as follows: 
my_user/featureflow/foo-datacenter/cluster_name/my_flink_job/checkpoint/chk-1234

What I noticed was that while trying to take checkpoints (incremental using 
rocksDB) we make a number of List calls to Azure:
my_user/featureflow/foo-datacenter/cluster_name/my_flink_job/checkpoint
my_user/featureflow/foo-datacenter/cluster_name/my_flink_job
my_user/featureflow/foo-datacenter/cluster_name
my_user/featureflow/foo-datacenter
my_user/featureflow
my_user

Each of these calls takes a few seconds and all of them seem to add up to make 
our checkpoint take time. The part I was hoping to understand on the Flink side 
was whether the behavior of making these List calls for each parent ‘directory’ 
/ blob all the way to the top was normal / expected?

We are exploring a couple of other angles on our end (potentially flattening 
the directory / blob structure to reduce the number of these calls, is the 
latency on the Azure side expected), but along with this I was hoping to 
understand if this behavior on the Flink side is expected / if there’s 
something which we could optimize as well.

Thanks,

-- Piyush



Re: Flink Conf "yarn.flink-dist-jar" Question

2020-03-06 Thread tison
FLINK-13938 seems a bit different from your requirement. The one that
matches exactly is FLINK-14964.
I'd appreciate it if you could share your opinion on the JIRA ticket.

Best,
tison.


tison wrote on Sat, Mar 7, 2020, 2:35 AM:

> Yes, your requirement has been taken into consideration by the community.
> We currently have an open JIRA ticket for this specific feature [1], and work
> on loosening the constraint on the flink-dist jar scheme to support DFS locations
> should happen.
>
> Best,
> tison.
>
> [1] https://issues.apache.org/jira/browse/FLINK-13938
>
>
Hailu, Andreas wrote on Sat, Mar 7, 2020, 2:03 AM:
>
>> Hi,
>>
>>
>>
>> We noticed that every time an application runs, it uploads the flink-dist
>> artifact to the /user/<user>/.flink HDFS directory. This causes a user disk
>> space quota issue, as we submit thousands of apps to our cluster an hour. We
>> had a similar problem with our Spark applications, where Spark uploaded the
>> Spark Assembly package for every app. Spark provides an argument to use a
>> location in HDFS for applications to leverage so they don't need to
>> upload it for every run, and that was our solution (see the "spark.yarn.jar"
>> configuration if interested.)
>>
>>
>>
>> Looking at the Resource Orchestration Frameworks page,
>> I see there might be a similar concept through a "yarn.flink-dist-jar"
>> configuration option. I wanted to place the flink-dist package we're using
>> in a location in HDFS and configure our jobs to point to it, e.g.
>>
>>
>>
>> yarn.flink-dist-jar: hdfs:user/delp/.flink/flink-dist_2.11-1.9.1.jar
>>
>>
>>
>> Am I correct in that this is what I’m looking for? I gave this a try with
>> some jobs today, and based on what I’m seeing in the launch_container.sh in
>> our YARN application, it still looks like it’s being uploaded:
>>
>>
>>
>> export
>> _FLINK_JAR_PATH="hdfs://d279536/user/delp/.flink/application_1583031705852_117863/flink-dist_2.11-1.9.1.jar"
>>
>>
>>
>> How can I confirm? Or is this perhaps not config I’m looking for?
>>
>>
>>
>> Best,
>>
>> Andreas
>>
>> --
>>
>>
>


Re: Flink Conf "yarn.flink-dist-jar" Question

2020-03-06 Thread tison
Yes, your requirement has been taken into consideration by the community.
We currently have an open JIRA ticket for this specific feature [1], and work
on loosening the constraint on the flink-dist jar scheme to support DFS locations
should happen.

Best,
tison.

[1] https://issues.apache.org/jira/browse/FLINK-13938


Hailu, Andreas wrote on Sat, Mar 7, 2020, 2:03 AM:

> Hi,
>
>
>
> We noticed that every time an application runs, it uploads the flink-dist
> artifact to the /user/<user>/.flink HDFS directory. This causes a user disk
> space quota issue, as we submit thousands of apps to our cluster an hour. We
> had a similar problem with our Spark applications, where Spark uploaded the
> Spark Assembly package for every app. Spark provides an argument to use a
> location in HDFS for applications to leverage so they don't need to
> upload it for every run, and that was our solution (see the "spark.yarn.jar"
> configuration if interested.)
>
>
>
> Looking at the Resource Orchestration Frameworks page,
> I see there might be a similar concept through a "yarn.flink-dist-jar"
> configuration option. I wanted to place the flink-dist package we're using
> in a location in HDFS and configure our jobs to point to it, e.g.
>
>
>
> yarn.flink-dist-jar: hdfs:user/delp/.flink/flink-dist_2.11-1.9.1.jar
>
>
>
> Am I correct in that this is what I’m looking for? I gave this a try with
> some jobs today, and based on what I’m seeing in the launch_container.sh in
> our YARN application, it still looks like it’s being uploaded:
>
>
>
> export
> _FLINK_JAR_PATH="hdfs://d279536/user/delp/.flink/application_1583031705852_117863/flink-dist_2.11-1.9.1.jar"
>
>
>
> How can I confirm? Or is this perhaps not config I’m looking for?
>
>
>
> Best,
>
> Andreas
>
> --
>
>


Flink Conf "yarn.flink-dist-jar" Question

2020-03-06 Thread Hailu, Andreas
Hi,

We noticed that every time an application runs, it uploads the flink-dist
artifact to the /user/<user>/.flink HDFS directory. This causes a user disk
space quota issue, as we submit thousands of apps to our cluster an hour. We had
a similar problem with our Spark applications, where Spark uploaded the Spark
Assembly package for every app. Spark provides an argument to use a location in
HDFS for applications to leverage so they don't need to upload it for
every run, and that was our solution (see the "spark.yarn.jar" configuration if
interested.)

Looking at the Resource Orchestration Frameworks page, I see there might be a similar
concept through a "yarn.flink-dist-jar" configuration option. I wanted to place the
flink-dist package we're using in a location in HDFS and configure our jobs to point to it, e.g.

yarn.flink-dist-jar: hdfs:user/delp/.flink/flink-dist_2.11-1.9.1.jar

Am I correct in that this is what I'm looking for? I gave this a try with some 
jobs today, and based on what I'm seeing in the launch_container.sh in our YARN 
application, it still looks like it's being uploaded:

export 
_FLINK_JAR_PATH="hdfs://d279536/user/delp/.flink/application_1583031705852_117863/flink-dist_2.11-1.9.1.jar"

How can I confirm? Or is this perhaps not config I'm looking for?

Best,
Andreas





Re: History server UI not working

2020-03-06 Thread pwestermann
I am seeing this error in firefox:

ERROR TypeError: "this.statusService.configuration.features is undefined"
t http://10.25.197.60:8082/main.177039bdbab11da4f8ac.js:1
qr http://10.25.197.60:8082/main.177039bdbab11da4f8ac.js:1
Gr http://10.25.197.60:8082/main.177039bdbab11da4f8ac.js:1
ko http://10.25.197.60:8082/main.177039bdbab11da4f8ac.js:1
Oo http://10.25.197.60:8082/main.177039bdbab11da4f8ac.js:1
Bo http://10.25.197.60:8082/main.177039bdbab11da4f8ac.js:1
create http://10.25.197.60:8082/main.177039bdbab11da4f8ac.js:1
create http://10.25.197.60:8082/main.177039bdbab11da4f8ac.js:1
bootstrap http://10.25.197.60:8082/main.177039bdbab11da4f8ac.js:1
_moduleDoBootstrap
http://10.25.197.60:8082/main.177039bdbab11da4f8ac.js:1
_moduleDoBootstrap
http://10.25.197.60:8082/main.177039bdbab11da4f8ac.js:1
o http://10.25.197.60:8082/main.177039bdbab11da4f8ac.js:1
invoke http://10.25.197.60:8082/polyfills.b37850e8279bc3caafc9.js:1
onInvoke http://10.25.197.60:8082/main.177039bdbab11da4f8ac.js:1
invoke http://10.25.197.60:8082/polyfills.b37850e8279bc3caafc9.js:1
run http://10.25.197.60:8082/polyfills.b37850e8279bc3caafc9.js:1
I http://10.25.197.60:8082/polyfills.b37850e8279bc3caafc9.js:1
invokeTask http://10.25.197.60:8082/polyfills.b37850e8279bc3caafc9.js:1
onInvokeTask http://10.25.197.60:8082/main.177039bdbab11da4f8ac.js:1
invokeTask http://10.25.197.60:8082/polyfills.b37850e8279bc3caafc9.js:1
runTask http://10.25.197.60:8082/polyfills.b37850e8279bc3caafc9.js:1
g http://10.25.197.60:8082/polyfills.b37850e8279bc3caafc9.js:1
invokeTask http://10.25.197.60:8082/polyfills.b37850e8279bc3caafc9.js:1
m http://10.25.197.60:8082/polyfills.b37850e8279bc3caafc9.js:1
k http://10.25.197.60:8082/polyfills.b37850e8279bc3caafc9.js:1

And this one in Chrome (both on Mac):

main.177039bdbab11da4f8ac.js:1 ERROR TypeError: Cannot read property
'web-submit' of undefined
at new t (main.177039bdbab11da4f8ac.js:1)
at qr (main.177039bdbab11da4f8ac.js:1)
at Gr (main.177039bdbab11da4f8ac.js:1)
at ko (main.177039bdbab11da4f8ac.js:1)
at Oo (main.177039bdbab11da4f8ac.js:1)
at Object.Bo [as createRootView] (main.177039bdbab11da4f8ac.js:1)
at e.create (main.177039bdbab11da4f8ac.js:1)
at e.create (main.177039bdbab11da4f8ac.js:1)
at t.bootstrap (main.177039bdbab11da4f8ac.js:1)
at main.177039bdbab11da4f8ac.js:1

Refreshing doesn't do anything.

Thanks for looking into this,

Peter



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: History server UI not working

2020-03-06 Thread Robert Metzger
I'm also suspecting a problem with the UI update.

Do the Developer Tools of the browser show any error messages?

On Thu, Mar 5, 2020 at 7:00 AM Yang Wang  wrote:

> If all the rest api could be viewed successfully, then the reason may be
> js cache.
> You could try to force a refresh (e.g. Cmd+Shift+R on Mac). It solved my
> problem before.
>
>
> Best,
> Yang
>
> pwestermann wrote on Wed, Mar 4, 2020, 8:40 PM:
>
>> We recently upgraded from Flink 1.7 to Flink 1.9.2 and the history server
>> UI
>> now seems to be broken. It doesn't load and always just displays a blank
>> screen.
>> The individual endpoints (e.g. /jobs/overview) still work.
>> Could this be an issue caused by the Angular update for the regular UI?
>>
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: Setting the operator-id to measure percentile latency over several jobs

2020-03-06 Thread Robert Metzger
@Chesnay Schepler : Does it make sense to file a ticket
to add the operator name to the latency metrics as well?

On Thu, Mar 5, 2020 at 4:31 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> thanks! I was wondering why the operator name is not implemented for the
> latency metrics, because for the other metrics it is implemented.
> but thanks anyway!
> --
> Felipe Gutierrez
> skype: felipe.o.gutierrez
> https://felipeogutierrez.blogspot.com
>
>
> On Thu, Mar 5, 2020 at 2:06 PM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hi Felipe,
>>
>> Please find the answers to your questions below.
>>
>> > Each "operator_subtask_index" means each instance of the parallel
>> physical operator, doesn't it?
>> Yes.
>> > How can I set a fixed ID for the "operator_id" in my code so I can
>> identify quickly which operator I am measuring?
>> You are using the correct api (uid(...))
>> > What is the hash function used so I can identify my operator?
>> Flink uses
>> https://guava.dev/releases/18.0/api/docs/com/google/common/hash/Hashing.html#murmur3_128(int)
>>
>> Regards,
>> Roman
>>
>>
>> On Thu, Mar 5, 2020 at 12:45 PM Felipe Gutierrez <
>> felipe.o.gutier...@gmail.com> wrote:
>>
>>> Hi community,
>>>
>>> I am tracking the latency of operators in Flink according to this
>>> reference [1]. When I am using Prometheus+Grafana I can issue a query using
>>> "flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency"
>>> and I can check the percentiles of each "operator_id" and each
>>> "operator_subtask_index". Each "operator_subtask_index" means each instance
>>> of the parallel physical operator, doesn't it?
>>>
>>> How can I set a fixed ID for the "operator_id" in my code so I can
>>> identify quickly which operator I am measuring? I used "map(new
>>> MyMapUDF()).uid('my-operator-ID')" but it seems that there is a hash
>>> function that converts the string to a hash value. What is the hash
>>> function used so I can identify my operator? I know that I can use the Rest
>>> API [2] and if I name my operator it will have always the same hash when I
>>> restart the job, but I would like to set its name.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#latency-tracking
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#rest-api-integration
>>> --
>>> Felipe Gutierrez
>>> skype: felipe.o.gutierrez
>>> https://felipeogutierrez.blogspot.com
>>>
>>
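A hedged sketch (not from the thread) of how one could reproduce that uid-to-id mapping locally, under the assumption that Flink hashes a user-specified uid with Guava's murmur3_128 and seed 0 (as in StreamGraphHasherV2); the hex string should correspond to the operator_id reported by the latency metrics, but it is worth cross-checking against the REST API for your own job.

import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;

import java.nio.charset.StandardCharsets;

public class OperatorIdFromUid {
    // 128-bit murmur3 hash of the uid string set via .uid("...") (assumed seed 0, UTF-8).
    public static String operatorIdFor(String uid) {
        HashCode hash = Hashing.murmur3_128(0)
                .newHasher()
                .putString(uid, StandardCharsets.UTF_8)
                .hash();
        return hash.toString(); // lowercase hex representation
    }

    public static void main(String[] args) {
        System.out.println(operatorIdFor("my-operator-ID"));
    }
}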


Re: EXTERNAL: Re: Should I use a Sink or Connector? Or Both?

2020-03-06 Thread Castro, Fernando C.
Arvid, thank you that was it!
After setting these properties to my Elasticsearch connector, I was able to see 
the records upserting into ES!

.bulkFlushMaxActions(2)
.bulkFlushInterval(1000L)

Thank you,
Fernando
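For context, a small hedged sketch of where these settings live on the Table API Elasticsearch descriptor (host and index names are placeholders); with the default bulk settings, a handful of test records can simply sit in the bulk buffer, which is why nothing appeared in Elasticsearch before:

import org.apache.flink.table.descriptors.Elasticsearch;

public class EsDescriptorSketch {
    // The bulk-flush settings are chained onto the same descriptor that is passed
    // to tableEnv.connect(...); small values make single test records visible quickly.
    public static Elasticsearch descriptor() {
        return new Elasticsearch()
                .version("7")
                .host("elasticsearch-host", 9200, "http") // placeholder host
                .index("transfers-sum")
                .documentType("_doc")
                .bulkFlushMaxActions(2)    // flush after every 2 buffered actions
                .bulkFlushInterval(1000L); // and at least once per second
    }
}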


From: Arvid Heise 
Date: Thursday, March 5, 2020 at 2:27 AM
To: "Castro, Fernando C. [US-US]" 
Cc: Jark Wu , John Smith , 
"user@flink.apache.org" 
Subject: Re: EXTERNAL: Re: Should I use a Sink or Connector? Or Both?

Hi Fernando,

How much data are you trying to write? If you just use single messages for 
testing, it could be that the default bulk settings are not working well.

If so, could you please adjust the following settings and report back?

public enum SinkOption {
   BULK_FLUSH_MAX_ACTIONS,
   BULK_FLUSH_MAX_SIZE,
   BULK_FLUSH_INTERVAL
}

On Wed, Mar 4, 2020 at 3:05 PM Castro, Fernando C. <fernando.cas...@leidos.com> wrote:
Thank you guys. So I have no idea why data is not being pushed to
Elasticsearch… ☹

My complete code is at 
https://stackoverflow.com/questions/60512064/flink-is-not-adding-any-data-to-elasticsearch-but-no-errors
Btw, for some reason I still need to pass .documentType to the Elasticsearch 
connection descriptor (getting it from 
org.apache.flink.table.descriptors.Elasticsearch), when Elasticsearch 7 doesn’t 
do types anymore.

In case you can’t access stackoverflow for some reason, here is the code below 
too:
/*
* This Scala source file was generated by the Gradle 'init' task.
*/
package flinkNamePull

import java.time.LocalDateTime
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, 
FlinkKafkaProducer010}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.descriptors.{Elasticsearch, Json, Schema}
import org.apache.flink.types.Row

object Demo {
  /**
   * MapFunction to generate Transfers POJOs from parsed CSV data.
   */
  class TransfersMapper extends RichMapFunction[String, Transfers] {
private var formatter = null

@throws[Exception]
override def open(parameters: Configuration): Unit = {
  super.open(parameters)
  //formatter = DateTimeFormat.forPattern("-MM-dd HH:mm:ss")
}

@throws[Exception]
override def map(csvLine: String): Transfers = {
  //var splitCsv = csvLine.stripLineEnd.split("\n")(1).split(",")
  var splitCsv = csvLine.stripLineEnd.split(",")

  val arrLength = splitCsv.length
  val i = 0
  if (arrLength != 13) {
for (i <- arrLength + 1 to 13) {
  if (i == 13) {
splitCsv = splitCsv :+ "0.0"
  } else {
splitCsv = splitCsv :+ ""
  }
}
  }
  var trans = new Transfers()
  trans.rowId = splitCsv(0)
  trans.subjectId = splitCsv(1)
  trans.hadmId = splitCsv(2)
  trans.icuStayId = splitCsv(3)
  trans.dbSource = splitCsv(4)
  trans.eventType = splitCsv(5)
  trans.prev_careUnit = splitCsv(6)
  trans.curr_careUnit = splitCsv(7)
  trans.prev_wardId = splitCsv(8)
  trans.curr_wardId = splitCsv(9)
  trans.inTime = splitCsv(10)
  trans.outTime = splitCsv(11)
  trans.los = splitCsv(12).toDouble

  return trans
}
  }

  def main(args: Array[String]) {
// Create streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

// Set properties per KafkaConsumer API
val properties = new Properties()
properties.setProperty("bootstrap.servers", "kafka.kafka:9092")
properties.setProperty("group.id", "test")

// Add Kafka source to environment
val myKConsumer = new FlinkKafkaConsumer010[String]("raw.data3", new 
SimpleStringSchema(), properties)
// Read from beginning of topic
myKConsumer.setStartFromEarliest()

val streamSource = env
  .addSource(myKConsumer)

// Transform CSV into a Transfers object
val streamTransfers = streamSource.map(new TransfersMapper())

// create a TableEnvironment
val tEnv = StreamTableEnvironment.create(env)

// register a Table
val tblTransfers: Table = tEnv.fromDataStream(streamTransfers)
tEnv.createTemporaryView("transfers", tblTransfers)

tEnv.connect(
  new Elasticsearch()
.version("7")

.host("elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local",
 9200, "http")   // required: one or more Elasticsearch hosts to connect to
.index("transfers-sum")
.documentType("_doc") // not sure why this is still needed for ES7
.keyNullLiteral("n/a")
)
  .withFormat(new Json().jsonSchema("{type: 

Re: Flink Deployment failing with RestClientException

2020-03-06 Thread Robert Metzger
Hey Samir,

can you try setting the following configuration parameter (make sure the
JobManager log confirms that the changed value is in effect)
web.timeout: 30

This might uncover the underlying problem (as we are waiting longer for the
underlying issue to timeout).

Are you able to upgrade to the latest Flink version easily?


On Thu, Mar 5, 2020 at 7:02 PM Andrey Zagrebin 
wrote:

> Hi Samir,
>
> It may be a known issue [1][2] where some action during job submission
> takes too long time but eventually completes in job manager.
> Have you checked job manager logs whether there are any other failures,
> not “Ask timed out"?
> Have you checked Web UI whether all the jobs have been started in fact
> despite the client error?
>
> Best,
> Andrey
>
> [1] https://issues.apache.org/jira/browse/FLINK-16429
> [2] https://issues.apache.org/jira/browse/FLINK-16018
>
> On 5 Mar 2020, at 17:49, Samir Tusharbhai Chauhan <
> samir.tusharbhai.chau...@prudential.com.sg> wrote:
>
> Hi,
>
> I am having an issue where, after deploying a few jobs, submissions start failing with
> the errors below. I don't have this issue in other environments. What should I
> check first in such a scenario?
> *My environment is*
> Azure Kubernetes 1.15.7
> Flink 1.6.0
> Zookeeper 3.4.10
>
>
> The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: Could not
> submit job (JobID: e83db2da358db355ccdcf6740c6bb134)
> at
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:249)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486)
> at
> org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:77)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:432)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:804)
> at
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:280)
> at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1044)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120)
> at
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
> to submit JobGraph.
> at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:379)
> at
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> at
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:213)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
> at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.CompletionException:
> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not
> complete the operation. Exception is not retryable.
> at
> java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
> at
> java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
> at
> java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
> at
> java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
> ... 12 more
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException:
> Could not complete the operation. Exception is not retryable.
> ... 10 more
> Caused by: java.util.concurrent.CompletionException:
> org.apache.flink.runtime.rest.util.RestClientException: [Job submission
> failed.]
> at
> 

Re: How to print the aggregated state everytime it is updated?

2020-03-06 Thread Robert Metzger
Hey,

I don't think you need to use a window operator for this use case. A reduce
(or fold) operation should be enough:
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/


On Fri, Mar 6, 2020 at 11:50 AM kant kodali  wrote:

> Hi,
>
> Thanks for this. So how can I emulate an infinite window while outputting
> every second? Simply put, I want to store the state forever (say, for years), and
> since RocksDB is my state backend I am assuming I can keep the state until
> I run out of disk. However, I want to see all the updates to the state
> every second. It sounds to me like I need to have a window of one second, compute
> for that window, and pass it on to the next window, or is there some other way?
>
> Thanks
>
> On Fri, Mar 6, 2020 at 1:33 AM Congxian Qiu 
> wrote:
>
>> Hi
>>
>> From the description, you use a window operator and have set event time,
>> so you should call `DataStream.assignTimestampsAndWatermarks` to set
>> the timestamps and watermarks.
>> A window is triggered when the watermark exceeds the window end time.
>>
>> Best,
>> Congxian
>>
>>
>> kant kodali  于2020年3月4日周三 上午5:11写道:
>>
>>> Hi All,
>>>
>>> I have a custom aggregated state that is represented by a Set, and I
>>> have a stream of values coming in from Kafka where I inspect each value, compute the
>>> custom aggregation, and store it in the Set. Now, I am trying to figure out
>>> how to print the updated value every time this state is updated.
>>>
>>> Imagine I have a DataStream<Set<...>>.
>>>
>>> I tried a few things already but keep running into the following
>>> exception. Not sure why? Do I need to call assignTimestampsAndWatermarks? I
>>> thought watermarks are not mandatory in Flink, especially when I want to
>>> keep this aggregated state forever. Any simple code sample on how to print
>>> the streaming aggregated state represented by the DataStream<Set<...>> would be
>>> great! You can imagine my Set has a toString() method that takes
>>> care of printing, and I just want to see those values in stdout.
>>>
>>> Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE
>>> timestamp (= no timestamp marker). Is the time characteristic set to
>>> 'ProcessingTime', or did you forget to call
>>> 'DataStream.assignTimestampsAndWatermarks(...)'?
>>>
>>
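A hedged sketch (not from the thread) of Robert's suggestion: a keyed reduce keeps the running aggregate in keyed state (RocksDB, if that backend is configured) and emits the updated value for every incoming record, so a plain print() shows each update without any window or watermark. The sample input and key below are placeholders for the Kafka source and real key.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.HashSet;
import java.util.Set;

public class RunningSetAggregate {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(
                Tuple2.of("k1", "a"), Tuple2.of("k1", "b"), Tuple2.of("k2", "c"))
            // Wrap each value into a singleton Set so reduce can merge them.
            .map(new MapFunction<Tuple2<String, String>, Tuple2<String, Set<String>>>() {
                @Override
                public Tuple2<String, Set<String>> map(Tuple2<String, String> in) {
                    Set<String> s = new HashSet<>();
                    s.add(in.f1);
                    return Tuple2.of(in.f0, s);
                }
            })
            .keyBy(new KeySelector<Tuple2<String, Set<String>>, String>() {
                @Override
                public String getKey(Tuple2<String, Set<String>> in) {
                    return in.f0;
                }
            })
            // The reduce state lives in the configured state backend and the
            // updated Set is emitted for every incoming record.
            .reduce(new ReduceFunction<Tuple2<String, Set<String>>>() {
                @Override
                public Tuple2<String, Set<String>> reduce(
                        Tuple2<String, Set<String>> acc, Tuple2<String, Set<String>> in) {
                    acc.f1.addAll(in.f1);
                    return acc;
                }
            })
            .print();

        env.execute("running-set-aggregate");
    }
}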


Re: (DISCUSS) Flink CLI needs to load '--classpath' files

2020-03-06 Thread tison
Hi Jingsong,

I think your proposal is that "--classpath can occur after the jar file".

Generally speaking, I agree that it is a painful format requirement, since
users tend to just ignore the order in which options occur. So it is +1 from
my side to loosen the constraint.

However, on the migration and implementation side, things get a bit
tricky.

For the user interface, say we additionally allow --classpath to occur after
the jar file: at the very least, the semantics change if a user passes
--classpath intending it to be a main argument.

Besides, given that we are settled on the commons-cli library for implementing the CLI, it
would be a bit tricky to implement such special-case handling.

Incidentally, I ran into a similar CLI problem recently, so here are some of
my thoughts about it:

1. I agree that users tend to treat <arguments> the same as
[OPTIONS] and mix up the order. It would be an improvement if we loosened the
constraint.
2. Then, we still have to introduce some way for users to specify the args
for their main method.
3. To achieve 2, there is a mature solution in shell scripting
that uses a double dash (--) to signify the end of command options.
4. Now, if we keep <jar-file> as a positional argument, then to support mixed-order
positional and named arguments we might have to switch to another library such
as argparse4j, since commons-cli doesn't support positional arguments. An
alternative is to turn <jar-file> into a named argument, but then we break the
user interface twice.

Either way it would break the user interface, so we first MUST start a
discussion and see what the community thinks of it and, if so, how to
integrate it. For me, reading the docs is an easy solution that saves us from
breaking the user interface; I don't insist on loosening the constraint.

Best,
tison.


Jingsong Li wrote on Fri, Mar 6, 2020, 10:27 PM:

> Hi tison and Aljoscha,
>
> Do you think "--classpath can not be in front of jar file" is an
> improvement? Or need documentation? Because I used to be confused.
>
> Best,
> Jingsong Lee
>
> On Fri, Mar 6, 2020 at 10:22 PM tison  wrote:
>
>> I think the problem is that --classpath should be before the user jar,
>> i.e., /opt/flink/job/kafkaDemo19-1.0-SNAPSHOT.jar
>>
>> Best,
>> tison.
>>
>>
>> Aljoscha Krettek wrote on Fri, Mar 6, 2020, 10:03 PM:
>>
>>> Hi,
>>>
>>> first a preliminary question: does the jar file contain
>>> com.alibaba.fastjson.JSON? Could you maybe list the contents of the jar
>>> here?
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 06.03.20 13:25, ouywl wrote:
>>> > Hi all
>>> >   When I start a flinkcluster in session mode, It include jm/tm.
>>> And then I
>>> > submit a job like ‘bin/flink run —jobmanager “ip:8081” —class path
>>> a.jar’. Even
>>> > the a.jar in all jm/tm and ‘bin/flink’ mechine . It will throw
>>> exception “
>>> > /opt/flink/bin/flink run --jobmanager ip:8081 --class
>>> > com.netease.java.TopSpeedWindowing --parallelism 1 --detached
>>> > /opt/flink/job/kafkaDemo19-1.0-SNAPSHOT.jar --classpath
>>> > file:///opt/flink/job/fastjson-1.2.66.jar
>>> > Starting execution of program
>>> > Executing TopSpeedWindowing example with default input data set.
>>> > Use --input to specify file input.
>>> > java.lang.NoClassDefFoundError: com/alibaba/fastjson/JSON
>>> > at com.netease.java.TopSpeedWindowing.main(TopSpeedWindowing.java:98)
>>> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> > at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> > at
>>> >
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> > at java.lang.reflect.Method.invoke(Method.java:498)
>>> > at
>>> >
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>>> > at
>>> >
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>>> > at
>>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
>>> > at
>>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)”
>>> >As I read the code , flink cli have not load the —classspath jar,
>>> So It seems
>>> > a bug about the flink cli. Are you agree with me?
>>> > Best,
>>> > Ouywl
>>> >
>>>
>>
>
> --
> Best, Jingsong Lee
>


Re: (DISCUSS) Flink CLI needs to load '--classpath' files

2020-03-06 Thread Yang Wang
I think tison's answer is on point. All the Flink CLI options should be
specified before the user jar. We have a very clear help message.

Syntax: run [OPTIONS] <jar-file> <arguments>


Best,
Yang

Jingsong Li wrote on Fri, Mar 6, 2020, 10:27 PM:

> Hi tison and Aljoscha,
>
> Do you think "--classpath can not be in front of jar file" is an
> improvement? Or need documentation? Because I used to be confused.
>
> Best,
> Jingsong Lee
>
> On Fri, Mar 6, 2020 at 10:22 PM tison  wrote:
>
>> I think the problem is that --classpath should be before the user jar,
>> i.e., /opt/flink/job/kafkaDemo19-1.0-SNAPSHOT.jar
>>
>> Best,
>> tison.
>>
>>
>>> Aljoscha Krettek wrote on Fri, Mar 6, 2020, 10:03 PM:
>>
>>> Hi,
>>>
>>> first a preliminary question: does the jar file contain
>>> com.alibaba.fastjson.JSON? Could you maybe list the contents of the jar
>>> here?
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 06.03.20 13:25, ouywl wrote:
>>> > Hi all
>>> >   When I start a flinkcluster in session mode, It include jm/tm.
>>> And then I
>>> > submit a job like ‘bin/flink run —jobmanager “ip:8081” —class path
>>> a.jar’. Even
>>> > the a.jar in all jm/tm and ‘bin/flink’ mechine . It will throw
>>> exception “
>>> > /opt/flink/bin/flink run --jobmanager ip:8081 --class
>>> > com.netease.java.TopSpeedWindowing --parallelism 1 --detached
>>> > /opt/flink/job/kafkaDemo19-1.0-SNAPSHOT.jar --classpath
>>> > file:///opt/flink/job/fastjson-1.2.66.jar
>>> > Starting execution of program
>>> > Executing TopSpeedWindowing example with default input data set.
>>> > Use --input to specify file input.
>>> > java.lang.NoClassDefFoundError: com/alibaba/fastjson/JSON
>>> > at com.netease.java.TopSpeedWindowing.main(TopSpeedWindowing.java:98)
>>> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> > at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> > at
>>> >
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> > at java.lang.reflect.Method.invoke(Method.java:498)
>>> > at
>>> >
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>>> > at
>>> >
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>>> > at
>>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
>>> > at
>>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)”
>>> >As I read the code , flink cli have not load the —classspath jar,
>>> So It seems
>>> > a bug about the flink cli. Are you agree with me?
>>> > Best,
>>> > Ouywl
>>> >
>>>
>>
>
> --
> Best, Jingsong Lee
>


Re: (DISCUSS) Flink CLI needs to load '--classpath' files

2020-03-06 Thread Jingsong Li
Hi tison and Aljoscha,

Do you think "--classpath can not be in front of jar file" is an
improvement? Or need documentation? Because I used to be confused.

Best,
Jingsong Lee

On Fri, Mar 6, 2020 at 10:22 PM tison  wrote:

> I think the problem is that --classpath should be before the user jar,
> i.e., /opt/flink/job/kafkaDemo19-1.0-SNAPSHOT.jar
>
> Best,
> tison.
>
>
> Aljoscha Krettek wrote on Fri, Mar 6, 2020, 10:03 PM:
>
>> Hi,
>>
>> first a preliminary question: does the jar file contain
>> com.alibaba.fastjson.JSON? Could you maybe list the contents of the jar
>> here?
>>
>> Best,
>> Aljoscha
>>
>> On 06.03.20 13:25, ouywl wrote:
>> > Hi all
>> >   When I start a flinkcluster in session mode, It include jm/tm.
>> And then I
>> > submit a job like ‘bin/flink run —jobmanager “ip:8081” —class path
>> a.jar’. Even
>> > the a.jar in all jm/tm and ‘bin/flink’ mechine . It will throw
>> exception “
>> > /opt/flink/bin/flink run --jobmanager ip:8081 --class
>> > com.netease.java.TopSpeedWindowing --parallelism 1 --detached
>> > /opt/flink/job/kafkaDemo19-1.0-SNAPSHOT.jar --classpath
>> > file:///opt/flink/job/fastjson-1.2.66.jar
>> > Starting execution of program
>> > Executing TopSpeedWindowing example with default input data set.
>> > Use --input to specify file input.
>> > java.lang.NoClassDefFoundError: com/alibaba/fastjson/JSON
>> > at com.netease.java.TopSpeedWindowing.main(TopSpeedWindowing.java:98)
>> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> > at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> > at
>> >
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> > at java.lang.reflect.Method.invoke(Method.java:498)
>> > at
>> >
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>> > at
>> >
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>> > at
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
>> > at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)”
>> >As I read the code , flink cli have not load the —classspath jar, So
>> It seems
>> > a bug about the flink cli. Are you agree with me?
>> > Best,
>> > Ouywl
>> >
>>
>

-- 
Best, Jingsong Lee


Re: (DISCUSS) Flink CLI needs to load '--classpath' files

2020-03-06 Thread tison
It is because, in the implementation, when we parse the command line arguments the parser
"stops at non-options", i.e. at the first arbitrary token, which is the user jar. All later
arguments are regarded as args passed to the user's main method.

As for guiding users, when you run `./bin/flink run -h`, it prints

Action "run" compiles and runs a program.

  Syntax: run [OPTIONS] <jar-file> <arguments>

which explicitly explains the format.

Best,
tison.


tison wrote on Fri, Mar 6, 2020, 10:22 PM:

> I think the problem is that --classpath should be before the user jar,
> i.e., /opt/flink/job/kafkaDemo19-1.0-SNAPSHOT.jar
>
> Best,
> tison.
>
>
> Aljoscha Krettek wrote on Fri, Mar 6, 2020, 10:03 PM:
>
>> Hi,
>>
>> first a preliminary question: does the jar file contain
>> com.alibaba.fastjson.JSON? Could you maybe list the contents of the jar
>> here?
>>
>> Best,
>> Aljoscha
>>
>> On 06.03.20 13:25, ouywl wrote:
>> > Hi all
>> >   When I start a flinkcluster in session mode, It include jm/tm.
>> And then I
>> > submit a job like ‘bin/flink run —jobmanager “ip:8081” —class path
>> a.jar’. Even
>> > the a.jar in all jm/tm and ‘bin/flink’ mechine . It will throw
>> exception “
>> > /opt/flink/bin/flink run --jobmanager ip:8081 --class
>> > com.netease.java.TopSpeedWindowing --parallelism 1 --detached
>> > /opt/flink/job/kafkaDemo19-1.0-SNAPSHOT.jar --classpath
>> > file:///opt/flink/job/fastjson-1.2.66.jar
>> > Starting execution of program
>> > Executing TopSpeedWindowing example with default input data set.
>> > Use --input to specify file input.
>> > java.lang.NoClassDefFoundError: com/alibaba/fastjson/JSON
>> > at com.netease.java.TopSpeedWindowing.main(TopSpeedWindowing.java:98)
>> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> > at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> > at
>> >
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> > at java.lang.reflect.Method.invoke(Method.java:498)
>> > at
>> >
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>> > at
>> >
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>> > at
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
>> > at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)”
>> >As I read the code , flink cli have not load the —classspath jar, So
>> It seems
>> > a bug about the flink cli. Are you agree with me?
>> > Best,
>> > Ouywl
>> >
>>
>
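A hedged illustration (not from the thread) of the commons-cli behavior described above: with stopAtNonOption enabled, parsing stops at the first non-option token (the user jar), and everything after it, including a later --classpath, comes back as plain program arguments.

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;

import java.util.Arrays;

public class StopAtNonOptionDemo {
    public static void main(String[] args) throws Exception {
        Options options = new Options();
        options.addOption(Option.builder("C").longOpt("classpath").hasArg().build());

        String[] commandLine = {
                "--classpath", "file:///tmp/a.jar",   // parsed as an option (before the jar)
                "job.jar",
                "--classpath", "file:///tmp/b.jar"    // NOT parsed: just a trailing program argument
        };

        // The third argument enables stopAtNonOption, mirroring how the Flink CLI
        // treats everything after the user jar as arguments for the user's main().
        CommandLine parsed = new DefaultParser().parse(options, commandLine, true);

        System.out.println("classpath option(s): " + Arrays.toString(parsed.getOptionValues("classpath")));
        System.out.println("remaining args:      " + Arrays.toString(parsed.getArgs()));
    }
}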


Re: (DISCUSS) Flink CLI needs to load '--classpath' files

2020-03-06 Thread tison
I think the problem is that --classpath should be before the user jar,
i.e., /opt/flink/job/kafkaDemo19-1.0-SNAPSHOT.jar

Best,
tison.


Aljoscha Krettek wrote on Fri, Mar 6, 2020, 10:03 PM:

> Hi,
>
> first a preliminary question: does the jar file contain
> com.alibaba.fastjson.JSON? Could you maybe list the contents of the jar
> here?
>
> Best,
> Aljoscha
>
> On 06.03.20 13:25, ouywl wrote:
> > Hi all
> >   When I start a flinkcluster in session mode, It include jm/tm. And
> then I
> > submit a job like ‘bin/flink run —jobmanager “ip:8081” —class path
> a.jar’. Even
> > the a.jar in all jm/tm and ‘bin/flink’ mechine . It will throw exception
> “
> > /opt/flink/bin/flink run --jobmanager ip:8081 --class
> > com.netease.java.TopSpeedWindowing --parallelism 1 --detached
> > /opt/flink/job/kafkaDemo19-1.0-SNAPSHOT.jar --classpath
> > file:///opt/flink/job/fastjson-1.2.66.jar
> > Starting execution of program
> > Executing TopSpeedWindowing example with default input data set.
> > Use --input to specify file input.
> > java.lang.NoClassDefFoundError: com/alibaba/fastjson/JSON
> > at com.netease.java.TopSpeedWindowing.main(TopSpeedWindowing.java:98)
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > at
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > at java.lang.reflect.Method.invoke(Method.java:498)
> > at
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
> > at
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
> > at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
> > at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)”
> >As I read the code , flink cli have not load the —classspath jar, So
> It seems
> > a bug about the flink cli. Are you agree with me?
> > Best,
> > Ouywl
> >
>


Re: (DISCUSS) Flink CLI needs to load '--classpath' files

2020-03-06 Thread Jingsong Li
Hi ouywl,

As far as I know, "--classpath" should come before the jar file; that is:
/opt/flink/bin/flink run --jobmanager ip:8081 --class
com.netease.java.TopSpeedWindowing --parallelism 1 --detached --classpath
file:///opt/flink/job/fastjson-1.2.66.jar
/opt/flink/job/kafkaDemo19-1.0-SNAPSHOT.jar

You can have a try.

Best,
Jingsong Lee

On Fri, Mar 6, 2020 at 10:03 PM Aljoscha Krettek 
wrote:

> Hi,
>
> first a preliminary question: does the jar file contain
> com.alibaba.fastjson.JSON? Could you maybe list the contents of the jar
> here?
>
> Best,
> Aljoscha
>
> On 06.03.20 13:25, ouywl wrote:
> > Hi all
> >   When I start a flinkcluster in session mode, It include jm/tm. And
> then I
> > submit a job like ‘bin/flink run —jobmanager “ip:8081” —class path
> a.jar’. Even
> > the a.jar in all jm/tm and ‘bin/flink’ mechine . It will throw exception
> “
> > /opt/flink/bin/flink run --jobmanager ip:8081 --class
> > com.netease.java.TopSpeedWindowing --parallelism 1 --detached
> > /opt/flink/job/kafkaDemo19-1.0-SNAPSHOT.jar --classpath
> > file:///opt/flink/job/fastjson-1.2.66.jar
> > Starting execution of program
> > Executing TopSpeedWindowing example with default input data set.
> > Use --input to specify file input.
> > java.lang.NoClassDefFoundError: com/alibaba/fastjson/JSON
> > at com.netease.java.TopSpeedWindowing.main(TopSpeedWindowing.java:98)
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > at
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > at java.lang.reflect.Method.invoke(Method.java:498)
> > at
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
> > at
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
> > at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
> > at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)”
> >As I read the code , flink cli have not load the —classspath jar, So
> It seems
> > a bug about the flink cli. Are you agree with me?
> > Best,
> > Ouywl
> >
>


-- 
Best, Jingsong Lee


Re: Writing retract streams to Kafka

2020-03-06 Thread Gyula Fóra
Thanks Kurt, I came to the same conclusions after trying what Jark
provided. I can get similar behaviour if I reduce the grouping window to 1
sec but still keep the join window large.

Gyula

On Fri, Mar 6, 2020 at 3:09 PM Kurt Young  wrote:

> @Gyula Fóra  I think your query is right, we should
> produce insert only results if you have event time and watermark defined.
> I've created https://issues.apache.org/jira/browse/FLINK-16466 to track
> this issue.
>
> Best,
> Kurt
>
>
> On Fri, Mar 6, 2020 at 12:14 PM Kurt Young  wrote:
>
>> Actually, this use case led me to start thinking about one question:
>> if the watermark is enabled, could we also support GROUP BY event_time
>> instead of forcing
>> the user to define a window based on the event_time?
>>
>> GROUP BY a standalone event_time can also be treated as a special window,
>> which has
>> both start_time and end_time equal to event_time. And when the watermark
>> surpasses the event_time,
>> we can still get the complete data of such a group, do the required
>> aggregation, and then emit
>> insert-only results.
>>
>> That would ease the user's burden by not requiring them to define a window when they
>> already have event
>> time and a watermark defined.
>>
>> Best,
>> Kurt
>>
>>
>> On Fri, Mar 6, 2020 at 10:26 AM Jark Wu  wrote:
>>
>>> Hi Gyula,
>>>
>>> Does tumbling 5 seconds for aggregation meet your need? For example:
>>>
>>> INSERT INTO QueryResult
>>> SELECT q.queryId, t.itemId, TUMBLE_START(q.event_time, INTERVAL '5'
>>> SECOND), sum(t.quantity) AS quantity
>>> FROM
>>>   ItemTransactions AS t,
>>>   Queries AS q
>>> WHERE
>>>   t.itemId = q.itemId AND
>>>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND
>>> q.event_time
>>> GROUP BY
>>>   t.itemId, q.queryId, TUMBLE(q.event_time, INTERVAL '5' SECOND);
>>>
>>> Best,
>>> Jark
>>>
>>> On Thu, 5 Mar 2020 at 23:05, Gyula Fóra  wrote:
>>>
 I see, maybe I just dont understand how to properly express what I am
 trying to compute.

 Basically I want to aggregate the quantities of the transactions that
 happened in the 5 seconds before the query.
 Every query.id belongs to a single query (event_time, itemid) but
 still I have to group :/

 Gyula

 On Thu, Mar 5, 2020 at 3:45 PM Kurt Young  wrote:

> I think the issue is not caused by event time interval join, but the
> aggregation after the join:
> GROUP BY t.itemId, q.event_time, q.queryId;
>
> In this case, there is still no chance for Flink to determine whether
> the groups like (itemId, eventtime, queryId) have complete data or not.
> As a comparison, if you change the grouping key to a window which
> based only on q.event_time, then the query would emit insert only results.
>
> Best,
> Kurt
>
>
> On Thu, Mar 5, 2020 at 10:29 PM Gyula Fóra 
> wrote:
>
>> That's exactly the kind of behaviour I am looking for Kurt ("ignore
>> all delete messages").
>>
>> As for the data completion, in my above example it is basically an
>> event time interval join.
>> With watermarks defined Flink should be able to compute results once
>> in exactly the same way as for the tumbling window.
>>
>> Gyula
>>
>> On Thu, Mar 5, 2020 at 3:26 PM Kurt Young  wrote:
>>
>>> Back to this case, I assume you are expecting something like "ignore
>>> all delete messages" flag? With this
>>> flag turned on, Flink will only send insert messages which
>>> corresponding current correct results to kafka and
>>> drop all retractions and deletes on the fly.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Thu, Mar 5, 2020 at 10:24 PM Kurt Young  wrote:
>>>
 > I also don't completely understand at this point why I can write
 the result of a group, tumble window aggregate to Kafka and not this 
 window
 join / aggregate.

 If you are doing a tumble window aggregate with watermark enabled,
 Flink will only fire a final result for
 each window at once, no modification or retractions will happen
 after a window is calculated and fired.
 But with some other arbitrary aggregations, there is not enough
 information for Flink to determine whether
 the data is complete or not, so the framework will keep calculating
 results when receiving new records and
 retract earlier results by firing retraction/deletion messages.

 Best,
 Kurt


 On Thu, Mar 5, 2020 at 10:13 PM Gyula Fóra 
 wrote:

> Thanks Benoît!
>
> I can see now how I can implement this myself through the provided
> sink interfaces but I was trying to avoid having to write code for 
> this :D
> My initial motivation was to see whether we are able to write out
> any kind of table to Kafka as a simple stream of "upserts".
>

Re: Writing retract streams to Kafka

2020-03-06 Thread Kurt Young
@Gyula Fóra  I think your query is right, we should
produce insert only results if you have event time and watermark defined.
I've created https://issues.apache.org/jira/browse/FLINK-16466 to track this
issue.

Best,
Kurt


On Fri, Mar 6, 2020 at 12:14 PM Kurt Young  wrote:

> Actually, this use case led me to start thinking about one question:
> if the watermark is enabled, could we also support GROUP BY event_time instead
> of forcing
> the user to define a window based on the event_time?
>
> GROUP BY a standalone event_time can also be treated as a special window,
> which has
> both start_time and end_time equal to event_time. And when the watermark
> surpasses the event_time,
> we can still get the complete data of such a group, do the required
> aggregation, and then emit
> insert-only results.
>
> That would ease the user's burden by not requiring them to define a window when they
> already have event
> time and a watermark defined.
>
> Best,
> Kurt
>
>
> On Fri, Mar 6, 2020 at 10:26 AM Jark Wu  wrote:
>
>> Hi Gyula,
>>
>> Does tumbling 5 seconds for aggregation meet your need? For example:
>>
>> INSERT INTO QueryResult
>> SELECT q.queryId, t.itemId, TUMBLE_START(q.event_time, INTERVAL '5'
>> SECOND), sum(t.quantity) AS quantity
>> FROM
>>   ItemTransactions AS t,
>>   Queries AS q
>> WHERE
>>   t.itemId = q.itemId AND
>>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
>> GROUP BY
>>   t.itemId, q.queryId, TUMBLE(q.event_time, INTERVAL '5' SECOND);
>>
>> Best,
>> Jark
>>
>> On Thu, 5 Mar 2020 at 23:05, Gyula Fóra  wrote:
>>
>>> I see, maybe I just dont understand how to properly express what I am
>>> trying to compute.
>>>
>>> Basically I want to aggregate the quantities of the transactions that
>>> happened in the 5 seconds before the query.
>>> Every query.id belongs to a single query (event_time, itemid) but still
>>> I have to group :/
>>>
>>> Gyula
>>>
>>> On Thu, Mar 5, 2020 at 3:45 PM Kurt Young  wrote:
>>>
 I think the issue is not caused by event time interval join, but the
 aggregation after the join:
 GROUP BY t.itemId, q.event_time, q.queryId;

 In this case, there is still no chance for Flink to determine whether
 the groups like (itemId, eventtime, queryId) have complete data or not.
 As a comparison, if you change the grouping key to a window which based
 only on q.event_time, then the query would emit insert only results.

 Best,
 Kurt


 On Thu, Mar 5, 2020 at 10:29 PM Gyula Fóra 
 wrote:

> That's exactly the kind of behaviour I am looking for Kurt ("ignore
> all delete messages").
>
> As for the data completion, in my above example it is basically an
> event time interval join.
> With watermarks defined Flink should be able to compute results once
> in exactly the same way as for the tumbling window.
>
> Gyula
>
> On Thu, Mar 5, 2020 at 3:26 PM Kurt Young  wrote:
>
>> Back to this case, I assume you are expecting something like "ignore
>> all delete messages" flag? With this
>> flag turned on, Flink will only send insert messages which
>> corresponding current correct results to kafka and
>> drop all retractions and deletes on the fly.
>>
>> Best,
>> Kurt
>>
>>
>> On Thu, Mar 5, 2020 at 10:24 PM Kurt Young  wrote:
>>
>>> > I also don't completely understand at this point why I can write
>>> the result of a group, tumble window aggregate to Kafka and not this 
>>> window
>>> join / aggregate.
>>>
>>> If you are doing a tumble window aggregate with watermark enabled,
>>> Flink will only fire a final result for
>>> each window at once, no modification or retractions will happen
>>> after a window is calculated and fired.
>>> But with some other arbitrary aggregations, there is not enough
>>> information for Flink to determine whether
>>> the data is complete or not, so the framework will keep calculating
>>> results when receiving new records and
>>> retract earlier results by firing retraction/deletion messages.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Thu, Mar 5, 2020 at 10:13 PM Gyula Fóra 
>>> wrote:
>>>
 Thanks Benoît!

 I can see now how I can implement this myself through the provided
 sink interfaces but I was trying to avoid having to write code for 
 this :D
 My initial motivation was to see whether we are able to write out
 any kind of table to Kafka as a simple stream of "upserts".

 I also don't completely understand at this point why I can write
 the result of a group, tumble window aggregate to Kafka and not this 
 window
 join / aggregate.

 Cheers,
 Gyula

 On Thu, Mar 5, 2020 at 3:00 PM Benoît Paris <
 benoit.pa...@centraliens-lille.org> wrote:

> Hi Gyula,
>

Re: (DISSCUSS) flink cli need load '--classpath' files

2020-03-06 Thread Aljoscha Krettek

Hi,

first a preliminary question: does the jar file contain 
com.alibaba.fastjson.JSON? Could you maybe list the contents of the jar 
here?


Best,
Aljoscha

On 06.03.20 13:25, ouywl wrote:

Hi all
  When I start a Flink cluster in session mode, it includes JM/TM. Then I
submit a job like ‘bin/flink run --jobmanager “ip:8081” --classpath a.jar’. Even
though a.jar is on all JM/TM and ‘bin/flink’ machines, it throws the exception “
/opt/flink/bin/flink run --jobmanager ip:8081 --class
com.netease.java.TopSpeedWindowing --parallelism 1 --detached
/opt/flink/job/kafkaDemo19-1.0-SNAPSHOT.jar --classpath
file:///opt/flink/job/fastjson-1.2.66.jar
Starting execution of program
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
java.lang.NoClassDefFoundError: com/alibaba/fastjson/JSON
at com.netease.java.TopSpeedWindowing.main(TopSpeedWindowing.java:98)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)”
   As I read the code, the flink CLI does not load the --classpath jars, so it seems
like a bug in the flink CLI. Do you agree with me?
Best,
Ouywl



Re: How to register an HBase source table with Flink SQL

2020-03-06 Thread psyche19830...@163.com
It is solved now, thanks!



psyche19830...@163.com
 
From: psyche19830...@163.com
Sent: 2020-03-06 17:44
To: user-zh
Subject: How to register an HBase source table with Flink SQL
Hi all,
Our company recently wants to use Flink for stream processing, and I ran into some problems while studying how to read from and write to HBase with Flink SQL. I hope you can help.
I created a resume table in HBase's default namespace; its structure is as follows:

My Flink test code is as follows:
@Test
public void testReadFromHBase() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
    //HBaseTableSource resume = new HBaseTableSource();
    tableEnv.sqlUpdate("create table resume(\n" +
            " binfo ROW<>,\n" +
            " edu ROW<>,\n" +
            " work ROW<>\n" +
            ") with (" +
            " 'connector.type' = 'hbase'," +
            " 'connector.version' = '1.4.3'," +
            " 'connector.table-name' = 'resume'," +
            " 'connector.zookeeper.quorum' = 'flink01.flink.net:2181,flink02.flink:2181,flink03.flink:2181'," +
            " 'connector.zookeeper.znode.parent' = '/hbase'" +
            ")");
    Table table = tableEnv.sqlQuery("select * from resume");
    DataStream<Tuple2<Boolean, Row>> out = tableEnv.toRetractStream(table, Row.class);
    out.print();
    env.execute();
}
Running it reports the following error:
org.apache.flink.table.api.ValidationException: Could not map binfo column to 
the underlying physical type root
. No such field.

at 
org.apache.flink.table.utils.TypeMappingUtils.lambda$null$7(TypeMappingUtils.java:223)
at java.util.OptionalInt.orElseThrow(OptionalInt.java:189)






psyche19830...@163.com


How to register an HBase source table with Flink SQL

2020-03-06 Thread psyche19830...@163.com
Hi all,
Our company recently wants to use Flink for stream processing, and I ran into some problems while studying how to read from and write to HBase with Flink SQL. I hope you can help.
I created a resume table in HBase's default namespace; its structure is as follows:

My Flink test code is as follows:
@Test
public void testReadFromHBase() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
    //HBaseTableSource resume = new HBaseTableSource();
    tableEnv.sqlUpdate("create table resume(\n" +
            " binfo ROW<>,\n" +
            " edu ROW<>,\n" +
            " work ROW<>\n" +
            ") with (" +
            " 'connector.type' = 'hbase'," +
            " 'connector.version' = '1.4.3'," +
            " 'connector.table-name' = 'resume'," +
            " 'connector.zookeeper.quorum' = 'flink01.flink.net:2181,flink02.flink:2181,flink03.flink:2181'," +
            " 'connector.zookeeper.znode.parent' = '/hbase'" +
            ")");
    Table table = tableEnv.sqlQuery("select * from resume");
    DataStream<Tuple2<Boolean, Row>> out = tableEnv.toRetractStream(table, Row.class);
    out.print();
    env.execute();
}
Running it reports the following error:
org.apache.flink.table.api.ValidationException: Could not map binfo column to 
the underlying physical type root
. No such field.

at 
org.apache.flink.table.utils.TypeMappingUtils.lambda$null$7(TypeMappingUtils.java:223)
at java.util.OptionalInt.orElseThrow(OptionalInt.java:189)
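
A hedged sketch of what a working DDL could look like: with the Flink 1.10 HBase connector, each HBase column family is declared as a ROW type whose nested fields are the column qualifiers, plus one atomic column for the row key. The rowkey column and the qualifier names/types below (name, age, school, company) are placeholders rather than the real schema:

CREATE TABLE resume (
  rowkey STRING,
  binfo ROW<name STRING, age INT>,
  edu   ROW<school STRING>,
  work  ROW<company STRING>
) WITH (
  'connector.type' = 'hbase',
  'connector.version' = '1.4.3',
  'connector.table-name' = 'resume',
  'connector.zookeeper.quorum' = 'flink01.flink.net:2181,flink02.flink:2181,flink03.flink:2181',
  'connector.zookeeper.znode.parent' = '/hbase'
)

Declaring the families as empty ROW<> (as in the code above, whether written that way or stripped by the mail archive) leaves the planner with no physical fields to map binfo/edu/work to, which is consistent with the "No such field" error.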






psyche19830...@163.com


Re: Tumbling windows after keyBy: how can the watermark trigger only its own group instead of all groups

2020-03-06 Thread 17626017841
Hi,
keyBy assigns records with the same key to the same window instance for processing; which keys end up on which parallel instance depends on the parallelism you set for the window operator. If you want a certain key to go to its own dedicated window instance, I think you need a custom partitioner.


Best,
Sun.Zhu




17626017841
Email: 17626017...@163.com

On 2020-03-06 17:20, Utopia wrote:
Hi,

In Flink the watermark is the same across a given operator, so all keyed streams of that operator share one watermark and cannot be triggered separately.

Best regards
Utopia
On Mar 6, 2020 at 17:10 +0800, 小旋锋 wrote:
> Hi all:
> After the data stream is grouped with keyBy, each group goes into its own tumbling window:
> sourceDataStream .keyBy(id) 
> .window(TumblingEventTimeWindows.of(Time.seconds(10L))).reduce() 
> .print()
>
> Test data: ids from 1 to 1500, two records per id, and both records of an id carry the same event time; the last record has id 1 with an event time 20 seconds larger, used to trigger the window computation.
> Observed result: after that last triggering record is sent, all windows compute and emit their output.
>
> How can I configure it so that the last record with id 1 only triggers the window of the group that id=1 belongs to?
>
>
> Thanks


(DISSCUSS) flink cli need load '--classpath' files

2020-03-06 Thread ouywl






Hi all
    When I start a Flink cluster in session mode, it includes JM/TM. Then I submit a job like ‘bin/flink run --jobmanager “ip:8081” --classpath a.jar’. Even though a.jar is on all JM/TM and ‘bin/flink’ machines, it throws the exception:

/opt/flink/bin/flink run --jobmanager ip:8081 --class com.netease.java.TopSpeedWindowing --parallelism 1 --detached /opt/flink/job/kafkaDemo19-1.0-SNAPSHOT.jar --classpath file:///opt/flink/job/fastjson-1.2.66.jar
Starting execution of program
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
java.lang.NoClassDefFoundError: com/alibaba/fastjson/JSON
at com.netease.java.TopSpeedWindowing.main(TopSpeedWindowing.java:98)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)

As I read the code, the flink CLI does not load the --classpath jars, so it seems like a bug in the flink CLI. Do you agree with me?

Best,
Ouywl


Re: How do I get the value of 99th latency inside an operator?

2020-03-06 Thread Aljoscha Krettek

Hi,

I'm afraid you're correct, this is currently not exposed and you would 
have to hack around some things and/or use reflection.


AbstractStreamOperator has a field latencyStats [1], which is what holds 
the metrics. This is being updated from method 
reportOrUpdateLatencyMarker [2].


I hope that helps somewhat.

Best,
Aljoscha

[1] 
https://github.com/apache/flink/blob/ab642cb616fb909893e2c650b0b4c2aa10407e6d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L154
[2] 
https://github.com/apache/flink/blob/ab642cb616fb909893e2c650b0b4c2aa10407e6d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L702
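
For reference, a rough sketch (untested, relying on private internals that may change between Flink versions) of how one could peek at those histograms from inside a custom operator via reflection. It assumes the field is still called latencyStats both on AbstractStreamOperator and inside the LatencyStats helper, and it only yields data if metrics.latency.interval is configured so latency markers are actually emitted:

import java.lang.reflect.Field;
import java.util.Map;

import org.apache.flink.metrics.Histogram;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Used via: stream.transform("latency-peek", stream.getType(), new LatencyPeekingOperator<>())
public class LatencyPeekingOperator<T> extends AbstractStreamOperator<T>
        implements OneInputStreamOperator<T, T> {

    private long seen = 0;

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        output.collect(element); // pass records through unchanged
        if (++seen % 10_000 == 0) {
            logLatencyPercentiles(); // sample occasionally to limit reflection overhead
        }
    }

    @SuppressWarnings("unchecked")
    private void logLatencyPercentiles() throws Exception {
        // AbstractStreamOperator#latencyStats is private, hence reflection.
        Field statsField = AbstractStreamOperator.class.getDeclaredField("latencyStats");
        statsField.setAccessible(true);
        Object latencyStats = statsField.get(this);
        if (latencyStats == null) {
            return; // latency tracking not enabled
        }

        // LatencyStats keeps one Histogram per source/operator pair in a Map field (assumption).
        Field mapField = latencyStats.getClass().getDeclaredField("latencyStats");
        mapField.setAccessible(true);
        Map<String, Histogram> histograms = (Map<String, Histogram>) mapField.get(latencyStats);

        for (Map.Entry<String, Histogram> entry : histograms.entrySet()) {
            double p99 = entry.getValue().getStatistics().getQuantile(0.99);
            LOG.info("99th percentile latency for {}: {} ms", entry.getKey(), p99);
        }
    }
}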


On 05.03.20 18:17, Felipe Gutierrez wrote:

Hi community,

where in the Flink code can I get the value of the 99th percentile latency
(flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency{operator_id="93352199ce18d8917f20fdf82cedb1b4",quantile="0.99"})?

Probably I will have to hack the Flink source code to export those values
to my own operator. Nevertheless, it is what I need.

Kind Regards,
Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com



Re: The parallelism of sink is always 1 in sqlUpdate

2020-03-06 Thread Jingsong Li
Which sink do you use?
It depends on sink implementation like [1]

[1]
https://github.com/apache/flink/blob/2b13a4155fd4284f6092decba867e71eea058043/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java#L147

Best,
Jingsong Lee

On Fri, Mar 6, 2020 at 6:37 PM faaron zheng  wrote:

> Thanks for you attention.  The input of sink is 500, and there is no order
> by and limit.
>
> Jingsong Li  于 2020年3月6日周五 下午6:15写道:
>
>> Hi faaron,
>>
>> For sink parallelism.
>> - What is parallelism of the input of sink? The sink parallelism should
>> be same.
>> - Does you sql have order by or limit ?
>> Flink batch sql not support range partition now, so it will use single
>> parallelism to run order by.
>>
>> For the memory of taskmanager.
>> There is manage memory option to configure.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/ops/memory/mem_setup.html#managed-memory
>>
>> Best,
>> Jingsong Lee
>>
>> On Fri, Mar 6, 2020 at 5:38 PM faaron zheng 
>> wrote:
>>
>>> Hi all,
>>>
>>> I am trying to use flink sql to run hive task. I use tEnv.sqlUpdate to
>>> execute my sql which looks like "insert overtwrite ... select ...". But I
>>> find the parallelism of sink is always 1, it's intolerable for large data.
>>> Why it happens? Otherwise, Is there any guide to decide the memory of
>>> taskmanager when I have two huge table to hashjoin, for example, each table
>>> has several TB data?
>>>
>>> Thanks,
>>> Faaron
>>>
>>
>>
>> --
>> Best, Jingsong Lee
>>
>

-- 
Best, Jingsong Lee


Re: How to print the aggregated state everytime it is updated?

2020-03-06 Thread kant kodali
Hi,

Thanks for this. So how can I emulate an infinite window while outputting
every second? Simply put, I want to store the state forever (say, for years),
and since RocksDB is my state backend I assume I can keep the state until
I run out of disk. However, I want to see all the updates to the state
every second. It sounds to me like I need a window of one second, compute
for that window and pass the result on to the next window, or is there some other way?

Thanks
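
One way to get that behaviour without windows at all (a hedged sketch, not from this thread; the Event type and its fields are made up) is a KeyedProcessFunction that keeps the aggregate in keyed state forever and emits it from a processing-time timer every second:

import java.util.HashSet;
import java.util.Set;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class SetAggregator extends KeyedProcessFunction<String, Event, Set<String>> {

    private transient ValueState<Set<String>> aggregate;

    @Override
    public void open(Configuration parameters) {
        aggregate = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "aggregate", TypeInformation.of(new TypeHint<Set<String>>() {})));
    }

    @Override
    public void processElement(Event value, Context ctx, Collector<Set<String>> out) throws Exception {
        Set<String> current = aggregate.value();
        if (current == null) {
            current = new HashSet<>();
            // first element for this key: start the periodic 1 second emission loop
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + 1000L);
        }
        current.add(value.getId()); // replace with the real custom aggregation
        aggregate.update(current);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Set<String>> out) throws Exception {
        Set<String> current = aggregate.value();
        if (current != null) {
            out.collect(current); // emit the latest aggregate for this key
        }
        // re-arm the timer so updates keep being emitted every second
        ctx.timerService().registerProcessingTimeTimer(timestamp + 1000L);
    }
}

With RocksDB as the state backend the per-key sets live on disk, so nothing is dropped unless a state TTL is configured.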

On Fri, Mar 6, 2020 at 1:33 AM Congxian Qiu  wrote:

> Hi
>
> From the description, you use window operator, and set to event time. then
> you should call `DataStream.assignTimestampsAndWatermarks` to set the
> timestamp and watermark.
> Window is triggered when the watermark exceed the window end time
>
> Best,
> Congxian
>
>
> kant kodali  于2020年3月4日周三 上午5:11写道:
>
>> Hi All,
>>
>> I have a custom aggregated state that is represent by Set and I
>> have a stream of values coming in from Kafka where I inspect, compute the
>> custom aggregation and store it in Set. Now, I am trying to figureout
>> how do I print the updated value everytime this state is updated?
>>
>> Imagine I have a Datastream>
>>
>> I tried few things already but keep running into the following exception.
>> Not sure why? Do I need to call assignTimestampsAndWatermark? I thought
>> watermarks are not mandatory in Flink especially when I want to keep this
>> aggregated state forever. any simple code sample on how to print the
>> streaming aggregated state represented by Datastream> will be
>> great! You can imagine my Set has a toString() method that takes
>> cares of printing..and I just want to see those values in stdout.
>>
>> Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE
>> timestamp (= no timestamp marker). Is the time characteristic set to
>> 'ProcessingTime', or did you forget to call
>> 'DataStream.assignTimestampsAndWatermarks(...)'?
>>
>


Re: The parallelism of sink is always 1 in sqlUpdate

2020-03-06 Thread faaron zheng
Thanks for your attention. The parallelism of the sink's input is 500, and there is
no order by or limit.

Jingsong Li  于 2020年3月6日周五 下午6:15写道:

> Hi faaron,
>
> For sink parallelism.
> - What is parallelism of the input of sink? The sink parallelism should be
> same.
> - Does you sql have order by or limit ?
> Flink batch sql not support range partition now, so it will use single
> parallelism to run order by.
>
> For the memory of taskmanager.
> There is manage memory option to configure.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/memory/mem_setup.html#managed-memory
>
> Best,
> Jingsong Lee
>
> On Fri, Mar 6, 2020 at 5:38 PM faaron zheng  wrote:
>
>> Hi all,
>>
>> I am trying to use flink sql to run hive task. I use tEnv.sqlUpdate to
>> execute my sql which looks like "insert overtwrite ... select ...". But I
>> find the parallelism of sink is always 1, it's intolerable for large data.
>> Why it happens? Otherwise, Is there any guide to decide the memory of
>> taskmanager when I have two huge table to hashjoin, for example, each table
>> has several TB data?
>>
>> Thanks,
>> Faaron
>>
>
>
> --
> Best, Jingsong Lee
>


Re: The parallelism of sink is always 1 in sqlUpdate

2020-03-06 Thread Jingsong Li
Hi faaron,

For the sink parallelism:
- What is the parallelism of the sink's input? The sink parallelism should be
the same.
- Does your SQL have an order by or limit?
Flink batch SQL does not support range partitioning yet, so it will use a single
parallelism to run order by.

For the TaskManager memory: there is a managed memory option to configure.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/ops/memory/mem_setup.html#managed-memory

Best,
Jingsong Lee
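
As a rough illustration (the option names are from the Flink 1.10 memory setup guide linked above; the values are placeholders to adapt to the actual machines), the managed memory available to batch operators such as the hash join can be increased per TaskManager in flink-conf.yaml:

taskmanager.memory.process.size: 8192m
taskmanager.memory.managed.fraction: 0.6

A multi-TB hash join will most likely still spill to disk, but more managed memory reduces the amount of spilling.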

On Fri, Mar 6, 2020 at 5:38 PM faaron zheng  wrote:

> Hi all,
>
> I am trying to use flink sql to run hive task. I use tEnv.sqlUpdate to
> execute my sql which looks like "insert overtwrite ... select ...". But I
> find the parallelism of sink is always 1, it's intolerable for large data.
> Why it happens? Otherwise, Is there any guide to decide the memory of
> taskmanager when I have two huge table to hashjoin, for example, each table
> has several TB data?
>
> Thanks,
> Faaron
>


-- 
Best, Jingsong Lee


The parallelism of sink is always 1 in sqlUpdate

2020-03-06 Thread faaron zheng
Hi all,

I am trying to use Flink SQL to run a Hive task. I use tEnv.sqlUpdate to
execute my SQL, which looks like "insert overwrite ... select ...". But I
find the parallelism of the sink is always 1, which is intolerable for large data.
Why does this happen? Also, is there any guide for deciding the TaskManager
memory when I have two huge tables to hash join, for example when each table
has several TB of data?

Thanks,
Faaron


Re: How to print the aggregated state everytime it is updated?

2020-03-06 Thread Congxian Qiu
Hi

From the description, you use a window operator and set the time characteristic
to event time, so you should call `DataStream.assignTimestampsAndWatermarks`
to assign timestamps and watermarks.
A window is triggered when the watermark exceeds the window end time.

Best,
Congxian
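
A minimal sketch of that call (the Event type, its accessors and the 5 second out-of-orderness bound are placeholders, not something from this thread):

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// assuming `events` is the DataStream<Event> read from Kafka
DataStream<Event> withTimestamps = events.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(5)) {
            @Override
            public long extractTimestamp(Event element) {
                return element.getEventTime(); // epoch millis carried in the record
            }
        });

withTimestamps
        .keyBy(Event::getKey)
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .reduce((a, b) -> b) // placeholder aggregation
        .print();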


kant kodali  于2020年3月4日周三 上午5:11写道:

> Hi All,
>
> I have a custom aggregated state that is represent by Set and I have
> a stream of values coming in from Kafka where I inspect, compute the custom
> aggregation and store it in Set. Now, I am trying to figureout how do
> I print the updated value everytime this state is updated?
>
> Imagine I have a Datastream>
>
> I tried few things already but keep running into the following exception.
> Not sure why? Do I need to call assignTimestampsAndWatermark? I thought
> watermarks are not mandatory in Flink especially when I want to keep this
> aggregated state forever. any simple code sample on how to print the
> streaming aggregated state represented by Datastream> will be
> great! You can imagine my Set has a toString() method that takes
> cares of printing..and I just want to see those values in stdout.
>
> Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp
> (= no timestamp marker). Is the time characteristic set to
> 'ProcessingTime', or did you forget to call
> 'DataStream.assignTimestampsAndWatermarks(...)'?
>


Re: Tumbling windows after keyBy: how can the watermark trigger only its own group instead of all groups

2020-03-06 Thread Utopia
Hi,

In Flink the watermark is the same across a given operator, so all keyed streams of that operator share one watermark and cannot be triggered separately.

Best regards
Utopia
On Mar 6, 2020 at 17:10 +0800, 小旋锋 wrote:
> Hi all:
> After the data stream is grouped with keyBy, each group goes into its own tumbling window:
> sourceDataStream .keyBy(id) 
> .window(TumblingEventTimeWindows.of(Time.seconds(10L))).reduce() 
> .print()
>
> Test data: ids from 1 to 1500, two records per id, and both records of an id carry the same event time; the last record has id 1 with an event time 20 seconds larger, used to trigger the window computation.
> Observed result: after that last triggering record is sent, all windows compute and emit their output.
>
> How can I configure it so that the last record with id 1 only triggers the window of the group that id=1 belongs to?
>
>
> Thanks


Tumbling windows after keyBy: how can the watermark trigger only its own group instead of all groups

2020-03-06 Thread 小旋锋

Hi all:
After the data stream is grouped with keyBy, each group goes into its own tumbling window:
sourceDataStream .keyBy(id) .window(TumblingEventTimeWindows.of(Time.seconds(10L))).reduce() .print()

Test data: ids from 1 to 1500, two records per id, and both records of an id carry the same event time; the last record has id 1 with an event time 20 seconds larger, used to trigger the window computation.
Observed result: after that last triggering record is sent, all windows compute and emit their output.

How can I configure it so that the last record with id 1 only triggers the window of the group that id=1 belongs to?

Thanks




Re: java.time.LocalDateTime in POJO type

2020-03-06 Thread Guowei Ma
Hi KristoffSC,

As far as I know, there is no simple API that lets you directly use the
LocalTimeTypeInfo for LocalDateTime in your POJO class (maybe other guys
know one).

If the serializer/deserializer of LocalDateTime is very critical for you,
there are two options.

1. Use StreamExecutionEnvironment::registerTypeWithKryoSerializer to
register your own serializer/deserializer for LocalDateTime.class.
2. Register a new TypeInfoFactory for your POJO. [1] This could reuse
the LocalTimeTypeInfo.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/types_serialization.html#defining-type-information-using-a-factory

Best,
Guowei
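
A minimal sketch of option 1 (the choice of Kryo's JavaSerializer is only an example; any Kryo Serializer that can handle LocalDateTime works):

import java.time.LocalDateTime;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Tell Kryo how to (de)serialize the LocalDateTime fields inside the POJO.
// JavaSerializer falls back to plain Java serialization, which LocalDateTime supports.
env.registerTypeWithKryoSerializer(LocalDateTime.class, JavaSerializer.class);

Option 2 avoids Kryo entirely: a TypeInfoFactory registered for the POJO can return Types.LOCAL_DATE_TIME for that field, as described in [1].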


KristoffSC  于2020年3月5日周四 下午10:58写道:

> Thanks,
> do you have any example how I could use it?
>
> Basically I have a POJO class that has LocalDateTime filed in it.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Backpressure and 99th percentile latency

2020-03-06 Thread Felipe Gutierrez
Thanks for the clarifying answer @Zhijiang. I am going to monitor
inputQueueLength and outputQueueLength to check their relation to
backpressure, although I think it is better to use outPoolUsage and
inPoolUsage according to [1].
However, in your opinion, which is better (i.e. faster to show the effect)
for monitoring a consequence of backpressure: inputQueueLength and
outputQueueLength, or outPoolUsage and inPoolUsage? In other words, is
there a faster way to show that the latency increased due to backpressure?
Maybe by creating my own metric in my own operator or UDF?

Thanks @Arvid. In the end I want to be able to hold SLAs, and for me the
SLA would be the minimum latency. If I understood correctly, once
backpressure starts, the latency-tracking metrics are no longer a precise
indication of how much backpressure my application is suffering; they only
indicate that there is backpressure.
Which metric would you say is precise enough to tune the throughput so
that there is no backpressure? Something like: if I observe 50,000
milliseconds of latency and the normal latency is 150 milliseconds, the
throughput has to decrease by a factor of 50,000/150.

Just a note: I am not changing the throughput of the sources yet, I am
changing the size of the window without restarting the job. But I guess
they have the same meaning for this question.

[1] https://flink.apache.org/2019/07/23/flink-network-stack-2.html

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com


On Fri, Mar 6, 2020 at 8:17 AM Arvid Heise  wrote:
>
> Hi Felipe,
>
> latency under backpressure has to be carefully interpreted. Latency's 
> semantics actually require that the data source is read in a timely manner; 
> that is, there is no bottleneck in your pipeline where data is piling up.
>
> Thus, to measure latency in experiments you must ensure that the current 
> throughput is below the maximum throughput, for example by gradually 
> increasing the throughput with a generating source or through some throttles 
> on the external source. Until you reach the maximum throughput, latencies 
> semantics is exactly like you expect it. Everything after that is more or 
> less just reciprocal to backpressure.
>
> If you go away from the theoretical consideration and look at actual setups, 
> you can easily see why this distinction makes sense: if you have a 
> low-latency application, you are doomed if you have backpressure (cannot hold 
> SLAs). You would immediately rescale if you see signs of backpressure (or 
> even earlier). Then, latency always has the desired semantics.
>
> On Fri, Mar 6, 2020 at 5:55 AM Zhijiang  wrote:
>>
>> Hi Felipe,
>>
>> Try to answer your below questions.
>>
>> > I understand that I am tracking latency every 10 seconds for each physical 
>> > instance operator. Is that right?
>>
>> Generally right. The latency marker is emitted from source and flow through 
>> all the intermediate operators until sink. This interval controls the 
>> emitting frequency of source.
>>
>> > The backpressure goes away but the 99th percentile latency is still the 
>> > same. Why? Does it have no relation with each other?
>>
>> The latency might be influenced by buffer flush timeout, network transport 
>> and load, etc.  In the case of backpressure, there are huge in-flight data 
>> accumulated in network wire, so the latency marker is queuing to wait for 
>> network transport which might bring obvious delay. Even the latency marker 
>> can not be emitted in time from source because of no available buffers 
>> temporarily.
>>
>> After the backpressure goes away, that does not mean there are no 
>> accumulated buffers on network wire, just not reaching the degree of 
>> backpressure. So the latency marker still needs to be queued with 
>> accumulated buffers on the wire. And it might take some time to digest the 
>> previous accumulated buffers completed to relax the latency. I guess it 
>> might be your case. You can monitor the metrics of "inputQueueLength" and 
>> "outputQueueLength" for confirming the status. Anyway, the answer is yes 
>> that it has relation with backpressure, but might have some delay to see the 
>> changes obviously.
>>
>> >In the end I left the experiment for more than 2 hours running and only 
>> >after about 1,5 hour the 99th percentile latency got down to milliseconds. 
>> >Is that normal?
>>
>> I guess it is normal as mentioned above.  After there are no accumulated 
>> buffers in network stack completely without backpressure, it should go down 
>> to milliseconds.
>>
>> Best,
>> Zhijiang
>>
>> --
>> From:Felipe Gutierrez 
>> Send Time:2020 Mar. 6 (Fri.) 05:04
>> To:user 
>> Subject:Backpressure and 99th percentile latency
>>
>> Hi,
>>
>> I am a bit confused about the topic of tracking latency in Flink [1]. It 
>> says if I use the 

Re: How to override flink-conf parameters for SQL Client

2020-03-06 Thread Gyula Fóra
I feel that the current configuration section of the environment file
assumes too much about what the user wants to configure.
Configuring Table-specific options is one thing, but there are certainly
many cases where users deploy jobs in per-job-cluster mode, and being able
to override certain Flink configs there is very useful.

With the huge improvements to the StreamExecutionEnv configuration coming
in 1.10 this is even more so.

I would propose to introduce a distinction between the "hard-coded" configs
(the ones we have now) and arbitrary config parameters that should override
the Configuration coming from flink-conf.yaml.
It could look something like:

flink-config:
  state.backend : ROCKSDB
  ..
table-config:
  what we have now
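
For comparison, a hedged example of how such overrides already work on the regular CLI for YARN per-job clusters (the keys and values are only illustrations): any flink-conf.yaml entry can be passed as a dynamic property, e.g.

bin/flink run -m yarn-cluster -yD state.backend=rocksdb -yD state.checkpoints.dir=hdfs:///checkpoints ... myJob.jar

and the proposal above would bring the same kind of override to the SQL Client's environment file.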


On Fri, Mar 6, 2020 at 3:22 AM Jeff Zhang  wrote:

> There's 2 kinds of configuration: job level & cluster level. I am afraid
> we don't have document to differentiate that, it depends on how user
> understand these configuration. We may need to improve document on that.
>
> Kurt Young  于2020年3月6日周五 上午8:34写道:
>
>> If you already have a running flink cluster and you want submit another
>> job to this cluster, then all the configurations
>> relates to process parameters like TM memory, slot number etc are not be
>> able to modify.
>>
>> Best,
>> Kurt
>>
>>
>> On Thu, Mar 5, 2020 at 11:08 PM Gyula Fóra  wrote:
>>
>>> Kurt can you please explain which conf parameters do you mean?
>>>
>>> In regular executions (Yarn for instance) we  have dynamic config
>>> parameters overriding any flink-conf argument.
>>> So it is not about setting them in the user code but it should happen
>>> before the ClusterDescriptors are created (ie in the together with the
>>> CustomCommandLine logic)
>>>
>>> Gyula
>>>
>>> On Thu, Mar 5, 2020 at 3:49 PM Kurt Young  wrote:
>>>
 IIRC the tricky thing here is not all the config options belong to
 flink-conf.yaml can be adjust dynamically in user's program.
 So it will end up like some of the configurations can be overridden but
 some are not. The experience is not quite good for users.

 Best,
 Kurt


 On Thu, Mar 5, 2020 at 10:15 PM Jeff Zhang  wrote:

> Hi Gyula,
>
> I am doing integration Flink with Zeppelin. One feature in Zeppelin is
> that user could override any features in flink-conf.yaml. (Actually any
> features here
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html).
> Of course you can run flink sql in Zeppelin, and could also leverage other
> features of Zeppelin, like visualization.
>
> If you are interested, you could try the master branch of Zeppelin +
> this improvement PR
>
> https://github.com/apache/zeppelin
> https://github.com/apache/zeppelin/pull/3676
>
> https://github.com/apache/zeppelin/blob/master/docs/interpreter/flink.md
>
>
>
>
>
>
> Gyula Fóra  于2020年3月5日周四 下午6:51写道:
>
>> I could basically list a few things I want to set (execution.target
>> for example), but it's fair to assume that I would like to be able to set
>> anything :)
>>
>> Gyula
>>
>> On Thu, Mar 5, 2020 at 11:35 AM Jingsong Li 
>> wrote:
>>
>>> Hi Gyula,
>>>
>>> Maybe Blink planner has invoked
>>> "StreamExecutionEnvironment.configure", which planner do you use?
>>>
>>> But "StreamExecutionEnvironment.configure" is only for partial
>>> configuration, can not for all configuration in flink-conf.yaml.
>>> So what's the config do you want to set? I know some config like
>>> "taskmanager.network.blocking-shuffle.compression.enabled" can not 
>>> set
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Thu, Mar 5, 2020 at 6:17 PM Jark Wu  wrote:
>>>
 Hi Gyula,

 Flink configurations can be overrided via
 `TableConfig#getConfiguration()`, however, SQL CLI only allows to set 
 Table
 specific configs.
 I will think it as a bug/improvement of SQL CLI which should be
 fixed in 1.10.1.

 Best,
 Jark

 On Thu, 5 Mar 2020 at 18:12, Gyula Fóra 
 wrote:

> Thanks Caizhi,
>
> This seems like a pretty big shortcoming for any
> multi-user/multi-app environment. I will open a jira for this.
>
> Gyula
>
> On Thu, Mar 5, 2020 at 10:58 AM Caizhi Weng 
> wrote:
>
>> Hi Gyula.
>>
>> I'm afraid there is no way to override all Flink configurations
>> currently. SQL client yaml file can only override some of the Flink
>> configurations.
>>
>> Configuration entries indeed can only set Table specific configs,
>> while deployment entires are used to set the result fetching address 
>> and
>> port. There is currently no way to change the