Re: Re: flink hive streaming query returns no data

2020-10-28 Thread Jingsong Li
Watch out for the time zone: the SQL layer uses UTC-based long (epoch millisecond) values by default.
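An illustrative sketch (not from the original reply): the SQL layer works with epoch-millisecond values interpreted in UTC, so a timestamp taken from local wall-clock time can land in a dt/h/m partition that is off by the zone offset. A minimal plain-Java check of that offset:

import java.time.Instant;
import java.time.ZoneId;

public class TimeZoneCheck {
    public static void main(String[] args) {
        long ts = System.currentTimeMillis();
        // the same long value, rendered in UTC and in the machine's local zone
        System.out.println("UTC:   " + Instant.ofEpochMilli(ts));
        System.out.println("Local: " + Instant.ofEpochMilli(ts).atZone(ZoneId.systemDefault()));
        // e.g. in UTC+8 the two renderings differ by 8 hours, which is roughly how far
        // a partition-time commit can appear to lag behind local expectations
    }
}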

On Thu, Oct 29, 2020 at 12:12 PM hdxg1101300...@163.com <
hdxg1101300...@163.com> wrote:

> I set sink.partition-commit.trigger to process-time and the data shows up;
> but after I later made the source emit watermarks, it still did not work;
> val dataStream = streamEnv.addSource(new MySource)
>
> .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps[UserInfo]()
>   .withTimestampAssigner(new SerializableTimestampAssigner[UserInfo] {
> override def extractTimestamp(element: UserInfo, recordTimestamp:
> Long): Long = element.getTs.getTime
>   }))
> The ts field of the generated UserInfo objects is a timestamp, so the watermark uses the values extracted from it
>
>
>
> hdxg1101300...@163.com
>
> From: Jingsong Li
> Sent: 2020-10-28 16:29
> To: user-zh
> Subject: Re: flink hive streaming query returns no data
> Hi,
>
> Your source does not seem to be producing watermarks, so:
>
> you could either make the source emit proper watermarks, or use the default value proc-time for 'sink.partition-commit.trigger'.
>
> Best,
> Jingsong
>
> On Wed, Oct 28, 2020 at 4:13 PM hdxg1101300...@163.com <
> hdxg1101300...@163.com> wrote:
>
> > Hi:
> > I am currently using Flink 1.11.2 with Hive 1.1.0;
> > When I use flink hive streaming and write data to Hive following the example,
> > I can see that files have been produced under the target directory, but a Hive query returns no data;
> > It seems the partition information is not committed to the Hive metastore, although the docs say this is supported; it does not work for me.
> > Below is my code
> >  object StreamMain {
> >   def main(args: Array[String]): Unit = {
> > val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> > streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> > streamEnv.setParallelism(3)
> >
> > val tableEnvSettings = EnvironmentSettings.newInstance()
> >   .useBlinkPlanner()
> >   .inStreamingMode()
> >   .build()
> >
> > val tableEnv = StreamTableEnvironment.create(streamEnv,
> tableEnvSettings)
> >
> >
>  
> tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
> CheckpointingMode.EXACTLY_ONCE)
> >
> >
>  
> tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
> Duration.ofSeconds(20))
> >
> > val dataStream = streamEnv.addSource(new MySource)
> >
> > val catalogName = "my_catalog"
> > val catalog = new HiveCatalog(
> >   catalogName,  // catalog name
> >   "yutest",// default database
> >
> >   "D:\\IdeaProjects\\dc\\dc_hive\\src\\main\\resources\\",  // Hive
> config (hive-site.xml) directory
> >   "1.1.0"   // Hive version
> > )
> > tableEnv.registerCatalog(catalogName, catalog)
> > tableEnv.useCatalog(catalogName)
> >
> > tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
> > tableEnv.useDatabase("yutest")
> >
> >
> > tableEnv.createTemporaryView("users", dataStream)
> > tableEnv.executeSql("DROP TABLE IF EXISTS  fs_table ")
> > //  if the corresponding table already exists in Hive, this part can be omitted
> > val hiveSql = """CREATE external TABLE fs_table (
> > user_id STRING,
> > order_amount DOUBLE
> >   )
> >   partitioned by(
> >   dt string,
> >   h string,
> >   m string) stored as parquet
> >   TBLPROPERTIES (
> >
> > 'partition.time-extractor.timestamp-pattern'='$dt
> $h:$m:00',
> > 'sink.partition-commit.delay'='0s',
> > 'sink.partition-commit.trigger'='partition-time',
> >
> >
>  'sink.partition-commit.policy.kind'='metastore,success-file'
> >   )""".stripMargin
> > tableEnv.executeSql(hiveSql)
> >
> >
> > val insertSql = "insert into  fs_table SELECT userId, amount, " + "
> DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm')
> FROM users"
> > tableEnv.executeSql(insertSql)
> >   }
> > }
> > public class MySource implements SourceFunction<UserInfo> {
> > private volatile boolean run = true;
> > String userids[] = {
> >
> > "4760858d-2bec-483c-a535-291de04b2247",
> "67088699-d4f4-43f2-913c-481bff8a2dc5",
> >
> > "72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb",
> "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
> >
> > "aabbaa50-72f4-495c-b3a1-70383ee9d6a4",
> "3218bbb9-5874-4d37-a82d-3e35e52d1702",
> >
> > "3ebfb9602ac07779||3ebfe9612a007979",
> "aec20d52-c2eb-4436-b121-c29ad4097f6c",
> >
> > "e7e896cd939685d7||e7e8e6c1930689d7",
> "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"
> > };
> >
> > @Override
> >
> > public void run(SourceFunction.SourceContext<UserInfo>
> sourceContext) throws Exception {
> >
> > while (run) {
> >
> > String userid = userids[(int) (Math.random() *
> (userids.length - 1))];
> > UserInfo userInfo = new UserInfo();
> > userInfo.setUserId(userid);
> > userInfo.setAmount(Math.random() * 100);
> > userInfo.setTs(new Timestamp(System.currentTimeMillis()));
> > sourceContext.collect(userInfo);
> > Thread.sleep(100);
> > }
> > }
> >
> > @Override
> > public void cancel() {
> > run = 

Re: Re: flink hive streaming query returns no data

2020-10-28 Thread hdxg1101300...@163.com
I set sink.partition-commit.trigger to process-time and the data shows up;
but after I later made the source emit watermarks, it still did not work;
val dataStream = streamEnv.addSource(new MySource)
  
.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps[UserInfo]()
  .withTimestampAssigner(new SerializableTimestampAssigner[UserInfo] {
override def extractTimestamp(element: UserInfo, recordTimestamp: 
Long): Long = element.getTs.getTime
  }))
The ts field of the generated UserInfo objects is a timestamp, so the watermark uses the values extracted from it



hdxg1101300...@163.com
 
From: Jingsong Li
Sent: 2020-10-28 16:29
To: user-zh
Subject: Re: flink hive streaming query returns no data
Hi,
 
Your source does not seem to be producing watermarks, so:
you could either make the source emit proper watermarks, or use the default value proc-time for 'sink.partition-commit.trigger'.
 
Best,
Jingsong
 
On Wed, Oct 28, 2020 at 4:13 PM hdxg1101300...@163.com <
hdxg1101300...@163.com> wrote:
 
> Hi:
> I am currently using Flink 1.11.2 with Hive 1.1.0;
> When I use flink hive streaming and write data to Hive following the example,
> I can see that files have been produced under the target directory, but a Hive query returns no data;
> It seems the partition information is not committed to the Hive metastore, although the docs say this is supported; it does not work for me.
> Below is my code
>  object StreamMain {
>   def main(args: Array[String]): Unit = {
> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> streamEnv.setParallelism(3)
>
> val tableEnvSettings = EnvironmentSettings.newInstance()
>   .useBlinkPlanner()
>   .inStreamingMode()
>   .build()
>
> val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)
>
> 
> tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
>  CheckpointingMode.EXACTLY_ONCE)
>
> 
> tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
>  Duration.ofSeconds(20))
>
> val dataStream = streamEnv.addSource(new MySource)
>
> val catalogName = "my_catalog"
> val catalog = new HiveCatalog(
>   catalogName,  // catalog name
>   "yutest",// default database
>
>   "D:\\IdeaProjects\\dc\\dc_hive\\src\\main\\resources\\",  // Hive 
> config (hive-site.xml) directory
>   "1.1.0"   // Hive version
> )
> tableEnv.registerCatalog(catalogName, catalog)
> tableEnv.useCatalog(catalogName)
>
> tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
> tableEnv.useDatabase("yutest")
>
>
> tableEnv.createTemporaryView("users", dataStream)
> tableEnv.executeSql("DROP TABLE IF EXISTS  fs_table ")
> //  if the corresponding table already exists in Hive, this part can be omitted
> val hiveSql = """CREATE external TABLE fs_table (
> user_id STRING,
> order_amount DOUBLE
>   )
>   partitioned by(
>   dt string,
>   h string,
>   m string) stored as parquet
>   TBLPROPERTIES (
>
> 'partition.time-extractor.timestamp-pattern'='$dt 
> $h:$m:00',
> 'sink.partition-commit.delay'='0s',
> 'sink.partition-commit.trigger'='partition-time',
>
> 
> 'sink.partition-commit.policy.kind'='metastore,success-file'
>   )""".stripMargin
> tableEnv.executeSql(hiveSql)
>
>
> val insertSql = "insert into  fs_table SELECT userId, amount, " + " 
> DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') 
> FROM users"
> tableEnv.executeSql(insertSql)
>   }
> }
> public class MySource implements SourceFunction<UserInfo> {
> private volatile boolean run = true;
> String userids[] = {
>
> "4760858d-2bec-483c-a535-291de04b2247", 
> "67088699-d4f4-43f2-913c-481bff8a2dc5",
>
> "72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb", 
> "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
>
> "aabbaa50-72f4-495c-b3a1-70383ee9d6a4", 
> "3218bbb9-5874-4d37-a82d-3e35e52d1702",
>
> "3ebfb9602ac07779||3ebfe9612a007979", 
> "aec20d52-c2eb-4436-b121-c29ad4097f6c",
>
> "e7e896cd939685d7||e7e8e6c1930689d7", 
> "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"
> };
>
> @Override
>
> public void run(SourceFunction.SourceContext<UserInfo> sourceContext) 
> throws Exception {
>
> while (run) {
>
> String userid = userids[(int) (Math.random() * (userids.length - 
> 1))];
> UserInfo userInfo = new UserInfo();
> userInfo.setUserId(userid);
> userInfo.setAmount(Math.random() * 100);
> userInfo.setTs(new Timestamp(System.currentTimeMillis()));
> sourceContext.collect(userInfo);
> Thread.sleep(100);
> }
> }
>
> @Override
> public void cancel() {
> run = false;
> }
> }
> public class UserInfo implements Serializable {
> private String userId;
> private Double amount;
> private Timestamp ts;
>
> public String getUserId() {
> return userId;
> }
>
> public void setUserId(String userId) {
> this.userId = userId;
> }
>
> public 

Re: flink1.11 kafka connector

2020-10-28 Thread Jark Wu
This is not supported yet. You could open an issue in the community and see whether it can make it into 1.12.

Best,
Jark


On Thu, 29 Oct 2020 at 11:26, Dream-底限  wrote:

> Hi,
> I looked at the Kafka sink provided out of the box; it offers two ways of distributing records (listed below). For the second one,
> is there a way to guarantee that all changes for a given primary key are sent to the same Kafka
> partition? Or does the community plan to support the native hash(key)-to-partition mapping?
>
>    - fixed: each Flink partition writes to at most one Kafka partition.
>    - round-robin: Flink partitions are distributed to Kafka partitions in a round-robin fashion.
>
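An illustrative sketch, not an answer from the thread: as far as I know, the Kafka table connector also accepts the class name of a custom FlinkKafkaPartitioner via the 'sink.partitioner' option, so key-based placement can be approximated with something like the class below. The class name and hashing choice are assumptions, not an official recommendation.

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

import java.util.Arrays;

// Routes every record with the same serialized key to the same Kafka partition.
public class KeyHashPartitioner<T> extends FlinkKafkaPartitioner<T> {

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        if (key == null) {
            return partitions[0]; // no key: fall back to a fixed partition
        }
        // stable, non-negative hash of the serialized key
        return partitions[(Arrays.hashCode(key) & Integer.MAX_VALUE) % partitions.length];
    }
}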


Re: How to deploy dynamically generated flink jobs?

2020-10-28 Thread Alexander Bagerman
I did try it, but this option seems to be for a third-party jar. In my case
I would need to specify/ship a jar that contains the code where the job is
being constructed. I'm not clear on
1. how to point to the containing jar
2. how to test such a submission from my project running in Eclipse
Alex

On Wed, Oct 28, 2020 at 8:21 PM Yun Gao  wrote:

> Hi Alexander,
>
> The signature of the createRemoteEnvironment is
>
> public static StreamExecutionEnvironment createRemoteEnvironment(
>   String host, int port, String... jarFiles);
>
> This can also ship the jars to be executed to the remote cluster. Could you 
> try also passing the jar files to the remote environment?
>
>
> Best,
>
>  Yun
>
> --
> Sender:Alexander Bagerman
> Date:2020/10/29 10:43:16
> Recipient:
> Theme:How to deploy dynamically generated flink jobs?
>
>
>
> Hi,
>
> I am trying to build a functionality to dynamically configure a flink job
> (Java) in my code based on some additional metadata and submit it to a
> flink running in a session cluster.
>
> Flink version is 1.11.2
>
> The problem I have is how to provide a packed job to the cluster. When I
> am trying the following code
>
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.createRemoteEnvironment(hostName, hostPort);
> ... configuring job workflow here...
> env.execute(jobName);
>
> I am getting ClassNotFoundException stating that code for my mapping
> functions did not make it to the cluster. Which makes sense.
>
> What would be the right way to deploy dynamically configured flink jobs
> which are not packaged as a jar file but rather generated ad-hoc?
>
> Thanks
>
>


Re: How to deploy dynamically generated flink jobs?

2020-10-28 Thread Yun Gao
Hi Alexander, 

The signature of the createRemoteEnvironment is 
public static StreamExecutionEnvironment createRemoteEnvironment(
  String host, int port, String... jarFiles);
This can also ship the jars to be executed to the remote cluster. Could you 
try also passing the jar files to the remote environment?
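A minimal sketch of that suggestion (host, port and jar path are placeholders; the jar must contain the classes of the functions used in the job):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RemoteSubmit {
    public static void main(String[] args) throws Exception {
        // ship the jar that holds the job-building code and its UDF classes
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
                "jobmanager-host", 8081, "/path/to/my-job-classes.jar");

        env.fromElements("a", "b", "c")
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.toUpperCase(); // this class must be packaged in the shipped jar
                    }
                })
                .print();

        env.execute("dynamically-built-job");
    }
}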

Best,
 Yun
--
Sender:Alexander Bagerman
Date:2020/10/29 10:43:16
Recipient:
Theme:How to deploy dynamically generated flink jobs?

Hi,
I am trying to build a functionality to dynamically configure a flink job 
(Java) in my code based on some additional metadata and submit it to a flink 
running in a session cluster.
Flink version is 1.11.2
The problem I have is how to provide a packed job to the cluster. When I am 
trying the following code
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment(hostName, hostPort);
... configuring job workflow here...
env.execute(jobName);

I am getting ClassNotFoundException stating that code for my mapping functions 
did not make it to the cluster. Which makes sense.
What would be the right way to deploy dynamically configured flink jobs which 
are not packaged as a jar file but rather generated ad-hoc?
Thanks



Re: No pooled slot available and request to ResourceManager for new slot failed

2020-10-28 Thread Yangze Guo
Hi,

What is the parallelism of your job? How many slots does it request in total?
If convenient, please also share the JobManager log to help with troubleshooting.

Best,
Yangze Guo

On Thu, Oct 29, 2020 at 10:07 AM marble.zh...@coinflex.com.INVALID
 wrote:
>
> Hi all.
>
> There is only one job; the JobManager and TaskManager each have 3 GB of total memory, with one TaskManager and 10 slots in total. Why do I still get this error?
>
> Caused by:
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> No pooled slot available and request to ResourceManager for new slot failed
> ... 27 more
>
> Any suggestions? Thanks.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


How to deploy dynamically generated flink jobs?

2020-10-28 Thread Alexander Bagerman
Hi,

I am trying to build a functionality to dynamically configure a flink job
(Java) in my code based on some additional metadata and submit it to a
flink running in a session cluster.

Flink version is 1.11.2

The problem I have is how to provide a packed job to the cluster. When I am
trying the following code

StreamExecutionEnvironment env =
StreamExecutionEnvironment.createRemoteEnvironment(hostName,
hostPort);
... configuring job workflow here...
env.execute(jobName);

I am getting ClassNotFoundException stating that code for my mapping
functions did not make it to the cluster. Which makes sense.

What would be the right way to deploy dynamically configured flink jobs
which are not packaged as a jar file but rather generated ad-hoc?

Thanks


Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-28 Thread Jingsong Li
+1 to remove the Bucketing Sink.

Thanks for the effort on ORC and `HadoopPathBasedBulkFormatBuilder`, I
think it's safe to get rid of the old Bucketing API with them.

Best,
Jingsong

On Thu, Oct 29, 2020 at 3:06 AM Kostas Kloudas  wrote:

> Thanks for the discussion!
>
> From this thread I do not see any objection with moving forward with
> removing the sink.
> Given this I will open a voting thread tomorrow.
>
> Cheers,
> Kostas
>
> On Wed, Oct 28, 2020 at 6:50 PM Stephan Ewen  wrote:
> >
> > +1 to remove the Bucketing Sink.
> >
> > It has been very common in the past to remove code that was deprecated
> for multiple releases in favor of reducing baggage.
> > Also in cases that had no perfect drop-in replacement, but needed users
> to forward fit the code.
> > I am not sure I understand why this case is so different.
> >
> > Why the Bucketing Sink should be thrown out, in my opinion:
> >
> > The Bucketing sink makes it easier for users to add general Hadoop
> writes.
> But the price is that it easily leads to data loss, because it assumes
> flush()/sync() work reliably on Hadoop, which they don't (HDFS
> works somewhat, S3 does not work at all).
> > I think the Bucketing sink is a trap for users, that's why it was
> deprecated long ago.
> >
> > The StreamingFileSink covers the majority of cases from the Bucketing
> Sink.
> > It does have some friction when adding/wrapping some general Hadoop
> writers. Parts will be solved with the transactional sink work.
> > If something is missing and blocking users, we can prioritize adding it
> to the Streaming File Sink. Also that is something we did before and it
> helped being pragmatic with moving forward, rather than being held back by
> "maybe there is something we don't know".
> >
> >
> >
> >
> > On Wed, Oct 28, 2020 at 12:36 PM Chesnay Schepler 
> wrote:
> >>
> >> Then we can't remove it, because there is no way for us to ascertain
> >> whether anyone is still using it.
> >>
> >> Sure, the user ML is the best we got, but you can't argue that we don't
> >> want any users to be affected and then use an imperfect mean to find
> users.
> >> If you are fine with relying on the user ML, then you _are_ fine with
> >> removing it at the cost of friction for some users.
> >>
> >> To be clear, I, personally, don't have a problem with removing it (we
> >> have removed other connectors in the past that did not have a migration
> >> plan), I just reject the argumentation.
> >>
> >> On 10/28/2020 12:21 PM, Kostas Kloudas wrote:
> >> > No, I do not think that "we are fine with removing it at the cost of
> >> > friction for some users".
> >> >
> >> > I believe that this can be another discussion that we should have as
> >> > soon as we establish that someone is actually using it. The point I am
> >> > trying to make is that if no user is using it, we should remove it and
> >> > not leave unmaintained code around.
> >> >
> >> > On Wed, Oct 28, 2020 at 12:11 PM Chesnay Schepler 
> wrote:
> >> >> The alternative could also be to use a different argument than "no
> one
> >> >> uses it", e.g., we are fine with removing it at the cost of friction
> for
> >> >> some users because there are better alternatives.
> >> >>
> >> >> On 10/28/2020 10:46 AM, Kostas Kloudas wrote:
> >> >>> I think that the mailing lists is the best we can do and I would say
> >> >>> that they seem to be working pretty well (e.g. the recent Mesos
> >> >>> discussion).
> >> >>> Of course they are not perfect but the alternative would be to never
> >> >>> remove anything user facing until the next major release, which I
> find
> >> >>> pretty strict.
> >> >>>
> >> >>> On Wed, Oct 28, 2020 at 10:04 AM Chesnay Schepler <
> ches...@apache.org> wrote:
> >>  If the conclusion is that we shouldn't remove it if _anyone_ is
> using
> >>  it, then we cannot remove it because the user ML obviously does not
> >>  reach all users.
> >> 
> >>  On 10/28/2020 9:28 AM, Kostas Kloudas wrote:
> >> > Hi all,
> >> >
> >> > I am bringing this up again to see if there are any users actively
> >> > using the BucketingSink.
> >> > So far, if I am not mistaken (and really sorry if I forgot
> anything),
> >> > it is only a discussion between devs about the potential problems
> of
> >> > removing it. I totally understand Chesnay's concern about not
> >> > providing compatibility with the StreamingFileSink (SFS) and if
> there
> >> > are any users, then we should not remove it without trying to
> find a
> >> > solution for them.
> >> >
> >> > But if there are no users then I would still propose to remove the
> >> > module, given that I am not aware of any efforts to provide
> >> > compatibility with the SFS any time soon.
> >> > The reasons for removing it also include the facts that we do not
> >> > actively maintain it and we do not add new features. As for
> potential
> >> > missing features in the SFS compared to the BucketingSink that was

Re: How to understand NOW() in SQL when using Table & SQL API to develop a streaming app?

2020-10-28 Thread Jark Wu
issue created: https://issues.apache.org/jira/browse/FLINK-19861



On Wed, 28 Oct 2020 at 11:00, Danny Chan  wrote:

> Our behavior also conflicts with the SQL standard, we should also mention
> this in the document.
>
> Till Rohrmann  于2020年10月27日周二 下午10:37写道:
>
>> Thanks for the clarification. This improvement would be helpful, I
>> believe.
>>
>> Cheers,
>> Till
>>
>> On Tue, Oct 27, 2020 at 1:19 PM Jark Wu  wrote:
>>
>>> Hi Till,
>>>
>>> The documentation mentions that "this function is not deterministic"
>>> where the "not deterministic" means the value of this function is not
>>> deterministic for every record.
>>> However, this is not very clear for users. I think we can improve the
>>> documentation.
>>>
>>> Best,
>>> Jark
>>>
>>> On Tue, 27 Oct 2020 at 15:59, Till Rohrmann 
>>> wrote:
>>>
 Quick question Jark: Is this difference in behaviour documented? I
 couldn't find it in the docs.

 Cheers,
 Till

 On Tue, Oct 27, 2020 at 7:30 AM Jark Wu  wrote:

> Hi Longdexin,
>
> In traditional batch sql, NOW() is executed and determined before the
> job is submitted and will not change for every processed record.
> However, this doesn't make much sense in streaming sql, therefore,
> NOW() function in Flink is executed for every record.
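(An illustrative query, not from the original mail; the table and column names are made up. In a streaming job,

  SELECT user_id, NOW() AS seen_at FROM user_actions;

gives each incoming row the NOW() value evaluated at the moment that row is processed, whereas a batch engine following the SQL standard would evaluate it once per statement.)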
>
> Best,
> Jark
>
> On Fri, 23 Oct 2020 at 16:30, Till Rohrmann 
> wrote:
>
>> Hi Longdexin,
>>
>> thanks for reaching out to the Flink community. I am pulling in Jark
>> who might be able to help you with this question.
>>
>> Cheers,
>> Till
>>
>> On Thu, Oct 22, 2020 at 2:56 PM Longdexin <274522...@qq.com> wrote:
>>
>>> From my point of view, the value of the NOW() function in SQL is fixed at
>>> the time the streaming app is launched and will not change with processing
>>> time. However, as a new Flink user, I'm not so sure of that. By the way, if
>>> my intent is to have the time value keep updating all the time, what
>>> should I do?
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>


No pooled slot available and request to ResourceManager for new slot failed

2020-10-28 Thread marble.zh...@coinflex.com.INVALID
Hi all,

I have already configured taskmanager.numberOfTaskSlots to 8, but I still hit the exception below.
I allocated 3 GB of total memory each for the job and task managers. Any suggestions? Thanks.

Caused by:
org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException:
Could not fulfill slot request 294b93e601744edd7be66dec41e8d8ed. Requested
resource profile (ResourceProfile{UNKNOWN}) is unfulfillable.
at
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.lambda$fulfillPendingSlotRequestWithPendingTaskManagerSlot$11(SlotManagerImpl.java:883)
at
org.apache.flink.util.OptionalConsumer.ifNotPresent(OptionalConsumer.java:52)
at
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.fulfillPendingSlotRequestWithPendingTaskManagerSlot(SlotManagerImpl.java:879)
at
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.lambda$internalRequestSlot$9(SlotManagerImpl.java:866)
at
org.apache.flink.util.OptionalConsumer.ifNotPresent(OptionalConsumer.java:52)
at
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.internalRequestSlot(SlotManagerImpl.java:866)
at
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:386)
... 26 more




--
Sent from: http://apache-flink.147419.n8.nabble.com/


No pooled slot available and request to ResourceManager for new slot failed

2020-10-28 Thread marble.zh...@coinflex.com.INVALID
Hi all.

There is only one job; the JobManager and TaskManager each have 3 GB of total memory, with one TaskManager and 10 slots in total. Why do I still get this error?

Caused by:
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
No pooled slot available and request to ResourceManager for new slot failed
... 27 more

Any suggestions? Thanks.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink cluster startup error

2020-10-28 Thread ????????????
Hello,
  I am deploying a Flink cluster (version 1.9.3) on three Linux machines.
  Issue 1: running ./start-cluster.sh reports "Name or service not knownname ce-hjjcgl-svr-02"; svr-01 and
svr-03 start, but svr-02 does not:
 [appuser@ce-hjjcgl-svr-01 bin]$ ./start-cluster.sh
 Starting cluster.
 Starting standalonesession daemon on host ce-hjjcgl-svr-01.
 : Name or service not knownname ce-hjjcgl-svr-02
 Starting taskexecutor daemon on host ce-hjjcgl-svr-03.





 Issue 2: running ./start-cluster.sh on ce-hjjcgl-svr-02 gives:
 [appuser@ce-hjjcgl-svr-02 bin]$ ./start-cluster.sh
Starting cluster.
[INFO] 1 instance(s) of standalonesession are already running on 
ce-hjjcgl-svr-02.
Starting standalonesession daemon on host ce-hjjcgl-svr-02.
: Name or service not knownname ce-hjjcgl-svr-02




 Passwordless SSH is configured between the nodes; the hosts file contains:
 127.0.0.1 localhost localhost.localdomain localhost4 
localhost4.localdomain4
::1localhost localhost.localdomain 
localhost6 localhost6.localdomain6
10.212.139.219 ce-hjjcgl-svr-01
10.212.139.220 ce-hjjcgl-svr-02
10.212.139.221 ce-hjjcgl-svr-03

The masters file:
ce-hjjcgl-svr-01:8081

The slaves file:
ce-hjjcgl-svr-02
ce-hjjcgl-svr-03


2. The flink-conf.yaml configuration:

env.java.home: /usr/java/jdk1.8.0_202/


jobmanager.rpc.address: ce-hjjcgl-svr-02

  jobmanager.rpc.port: 6123



What could be the cause? Thanks.

Choosing an approach for dimension-table joins

2020-10-28 Thread zjfpla...@hotmail.com
Hi,
A question for everyone about dimension-table (lookup) joins: what are the respective use cases and pros and cons of a Temporal
table versus a RichAsyncFunction, and are there any problems with savepoints/checkpoints when a job is stopped and restarted?
Feedback based on real production usage would be much appreciated, many thanks.



zjfpla...@hotmail.com


Re: Kubernetes Job Cluster, does it autoterminate?

2020-10-28 Thread Matthias Pohl
Hi Ruben,
thanks for reaching out to us. Flink's native Kubernetes Application mode
[1] might be what you're looking for.

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#flink-kubernetes-application
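A sketch of how such a job could be launched in application mode on Flink 1.11 (cluster id, image name and jar path below are placeholders, not from this thread):

./bin/flink run-application -t kubernetes-application \
    -Dkubernetes.cluster-id=my-batch-job \
    -Dkubernetes.container.image=my-registry/my-flink-job:latest \
    local:///opt/flink/usrlib/my-job.jar

In this mode a dedicated JobManager is created per job and, as far as I understand, Flink tears the Kubernetes resources down once the job reaches a terminal state, which gives the auto-termination behaviour asked about below.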

On Wed, Oct 28, 2020 at 11:50 AM Ruben Laguna 
wrote:

> Hi,
>
> First time user , I'm just evaluating Flink at the moment, and I was
> reading
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html#deploy-job-cluster
> and I don't fully understand if a Job Cluster will autoterminate after
> the job is completed (for at batch job) ?
>
> The examples look to me like the task manager pods will continue
> running, as they are configured as a Deployment.
>
> So is there any way to achieve "autotermination" or am I supposed to
> monitor the job status externally (like from airflow) and delete the
> JobManager and TaskManager kubernetes resources from there?
>
> --
> /Rubén Laguna
>


Re: Could you add some example about this document? Thanks`1

2020-10-28 Thread Robert Metzger
Hi,
from the messages you've sent on the user@ mailing list in the recent
weeks, I see that you are in the process of learning Flink. The Flink
community won't be able to provide you with full, runnable examples for
every method Flink provides.
Rather, we have a few running examples, and conceptual explanations and
small API snippets in our documentation.

I haven't checked all your questions in detail, but I would generally
recommend you to put more effort in trying to figure out problems yourself
before posting to this mailing list.
One starting point for learning Flink is the Flink Training:
https://flink.apache.org/training.html
Also, Flink provides a number of runnable TableAPI examples in
"flink-examples-table".

I'm sorry that not all your questions have been answered here on the list.
The developers sometimes need to prioritize where they spend their time
answering user questions, I hope you understand.

Best regards,
Robert


On Sun, Oct 25, 2020 at 5:01 AM 大森林  wrote:

> Dear Mr  Timo Walther:
>
> I'm learning the document
> 
> and have sent 3 mails about
> executeinsert/flataggregate/GroupBy Window Aggregation from the document
> to
> user@flink.apache.org
> each of which is provided with full code, but no replies.
>
> I found
> the authors of above 3 parts are NOT in the mailing list
> Of course I understand they are very busy now.
>
> Could you add some completed examples(java style) of above 3 parts in
> flink github?
>
> Much Thanks for your help.
>
>
>
>
>


Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-28 Thread Kostas Kloudas
Thanks for the discussion!

From this thread I do not see any objection with moving forward with
removing the sink.
Given this I will open a voting thread tomorrow.

Cheers,
Kostas

On Wed, Oct 28, 2020 at 6:50 PM Stephan Ewen  wrote:
>
> +1 to remove the Bucketing Sink.
>
> It has been very common in the past to remove code that was deprecated for 
> multiple releases in favor of reducing baggage.
> Also in cases that had no perfect drop-in replacement, but needed users to 
> forward fit the code.
> I am not sure I understand why this case is so different.
>
> Why the Bucketing Sink should be thrown out, in my opinion:
>
> The Bucketing sink makes it easier for users to add general Hadoop writes.
> But the price is that it easily leads to data loss, because it assumes 
> flush()/sync() work reliably on Hadoop, which they don't (HDFS 
> works somewhat, S3 does not work at all).
> I think the Bucketing sink is a trap for users, that's why it was deprecated 
> long ago.
>
> The StreamingFileSink covers the majority of cases from the Bucketing Sink.
> It does have some friction when adding/wrapping some general Hadoop writers. 
> Parts will be solved with the transactional sink work.
> If something is missing and blocking users, we can prioritize adding it to 
> the Streaming File Sink. Also that is something we did before and it helped 
> being pragmatic with moving forward, rather than being held back by "maybe 
> there is something we don't know".
>
>
>
>
> On Wed, Oct 28, 2020 at 12:36 PM Chesnay Schepler  wrote:
>>
>> Then we can't remove it, because there is no way for us to ascertain
>> whether anyone is still using it.
>>
>> Sure, the user ML is the best we got, but you can't argue that we don't
>> want any users to be affected and then use an imperfect mean to find users.
>> If you are fine with relying on the user ML, then you _are_ fine with
>> removing it at the cost of friction for some users.
>>
>> To be clear, I, personally, don't have a problem with removing it (we
>> have removed other connectors in the past that did not have a migration
>> plan), I just reject the argumentation.
>>
>> On 10/28/2020 12:21 PM, Kostas Kloudas wrote:
>> > No, I do not think that "we are fine with removing it at the cost of
>> > friction for some users".
>> >
>> > I believe that this can be another discussion that we should have as
>> > soon as we establish that someone is actually using it. The point I am
>> > trying to make is that if no user is using it, we should remove it and
>> > not leave unmaintained code around.
>> >
>> > On Wed, Oct 28, 2020 at 12:11 PM Chesnay Schepler  
>> > wrote:
>> >> The alternative could also be to use a different argument than "no one
>> >> uses it", e.g., we are fine with removing it at the cost of friction for
>> >> some users because there are better alternatives.
>> >>
>> >> On 10/28/2020 10:46 AM, Kostas Kloudas wrote:
>> >>> I think that the mailing lists is the best we can do and I would say
>> >>> that they seem to be working pretty well (e.g. the recent Mesos
>> >>> discussion).
>> >>> Of course they are not perfect but the alternative would be to never
>> >>> remove anything user facing until the next major release, which I find
>> >>> pretty strict.
>> >>>
>> >>> On Wed, Oct 28, 2020 at 10:04 AM Chesnay Schepler  
>> >>> wrote:
>>  If the conclusion is that we shouldn't remove it if _anyone_ is using
>>  it, then we cannot remove it because the user ML obviously does not
>>  reach all users.
>> 
>>  On 10/28/2020 9:28 AM, Kostas Kloudas wrote:
>> > Hi all,
>> >
>> > I am bringing this up again to see if there are any users actively
>> > using the BucketingSink.
>> > So far, if I am not mistaken (and really sorry if I forgot anything),
>> > it is only a discussion between devs about the potential problems of
>> > removing it. I totally understand Chesnay's concern about not
>> > providing compatibility with the StreamingFileSink (SFS) and if there
>> > are any users, then we should not remove it without trying to find a
>> > solution for them.
>> >
>> > But if there are no users then I would still propose to remove the
>> > module, given that I am not aware of any efforts to provide
>> > compatibility with the SFS any time soon.
>> > The reasons for removing it also include the facts that we do not
>> > actively maintain it and we do not add new features. As for potential
>> > missing features in the SFS compared to the BucketingSink that was
>> > mentioned before, I am not aware of any fundamental limitations and
>> > even if there are, I would assume that the solution is not to direct
>> > the users to a deprecated sink but rather try to increase the
>> > functionality of the actively maintained one.
>> >
>> > Please keep in mind that the BucketingSink is deprecated since FLINK
>> > 1.9 and there is a new File Sink that is coming as part of 

Re: [EXTERNAL] Re: Native K8S Jobmanager restarts and job never recovers

2020-10-28 Thread Bohinski, Kevin
Hi Yang,

Thanks again for all the help!

We are still seeing this with 1.11.2 and ZK.
Looks like others are seeing this as well and they found a solution 
https://translate.google.com/translate?hl=en=zh-CN=https://cloud.tencent.com/developer/article/1731416=search

Should this solution be added to 1.12?

Best
kevin

On 2020/08/14 02:48:50, Yang Wang <d...@gmail.com> wrote:
> Hi kevin,
>
> Thanks for sharing more information. You are right. Actually, "too old
> resource version" is caused by a bug
> of fabric8 kubernetes-client[1]. It has been fixed in v4.6.1. And we have
> bumped the kubernetes-client version
> to v4.9.2 in Flink release-1.11. Also it has been backported to release
> 1.10 and will be included in the next
> minor release version (1.10.2).
>
> BTW, if you really want all your jobs recovered when the jobmanager crashes,
> you still need to configure Zookeeper high availability.
>
> [1]. https://github.com/fabric8io/kubernetes-client/pull/1800
>
>
> Best,
> Yang
>
> Bohinski, Kevin <ke...@comcast.com> wrote on Fri, Aug 14, 2020 at 6:32 AM:
>
> > Might be useful>
> >>
> > https://stackoverflow.com/a/61437982>
> >>
> >>
> >>
> > Best,>
> >>
> > kevin>
> >>
> >>
> >>
> >>
> >>
> > *From: *"Bohinski, Kevin" mailto:ke...@comcast.com>>>
> > *Date: *Thursday, August 13, 2020 at 6:13 PM>
> > *To: *Yang Wang mailto:da...@gmail.com>>>
> > *Cc: *"user@flink.apache.org" 
> > mailto:us...@flink.apache.org>>>
> > *Subject: *Re: [EXTERNAL] Re: Native K8S Jobmanager restarts and job>
> > never recovers>
> >>
> >>
> >>
> > Hi>
> >>
> >>
> >>
> > Got the logs on crash, hopefully they help.>
> >>
> >>
> >>
> > 2020-08-13 22:00:40,336 ERROR>
> > org.apache.flink.kubernetes.KubernetesResourceManager[] - Fatal>
> > error occurred in ResourceManager.>
> >>
> > io.fabric8.kubernetes.client.KubernetesClientException: too old resource>
> > version: 8617182 (8633230)>
> >>
> > at>
> > io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259)>
> > [flink-dist_2.11-1.11.1.jar:1.11.1]>
> >>
> > at>
> > org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323)>
> > [flink-dist_2.11-1.11.1.jar:1.11.1]>
> >>
> > at>
> > org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219)>
> > [flink-dist_2.11-1.11.1.jar:1.11.1]>
> >>
> > at>
> > org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105)>
> > [flink-dist_2.11-1.11.1.jar:1.11.1]>
> >>
> > at>
> > org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)>
> > [flink-dist_2.11-1.11.1.jar:1.11.1]>
> >>
> > at>
> > org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214)>
> > [flink-dist_2.11-1.11.1.jar:1.11.1]>
> >>
> > at>
> > org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206)>
> > [flink-dist_2.11-1.11.1.jar:1.11.1]>
> >>
> > at>
> > org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)>
> > [flink-dist_2.11-1.11.1.jar:1.11.1]>
> >>
> > at>
> > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)>
> > [?:1.8.0_262]>
> >>
> > at>
> > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)>
> > [?:1.8.0_262]>
> >>
> > at java.lang.Thread.run(Thread.java:748) [?:1.8.0_262]>
> >>
> > 2020-08-13 22:00:40,337 ERROR>
> > org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal>
> > error occurred in the cluster entrypoint.>
> >>
> > io.fabric8.kubernetes.client.KubernetesClientException: too old resource>
> > version: 8617182 (8633230)>
> >>
> > at>
> > io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259)>
> > [flink-dist_2.11-1.11.1.jar:1.11.1]>
> >>
> > at>
> > org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323)>
> > [flink-dist_2.11-1.11.1.jar:1.11.1]>
> >>
> > at>
> > org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219)>
> > [flink-dist_2.11-1.11.1.jar:1.11.1]>
> >>
> > at>
> > org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105)>
> > [flink-dist_2.11-1.11.1.jar:1.11.1]>
> >>
> > at>
> > org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)>
> > [flink-dist_2.11-1.11.1.jar:1.11.1]>
> >>
> > at>
> > 

Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-28 Thread Stephan Ewen
+1 to remove the Bucketing Sink.

It has been very common in the past to remove code that was deprecated for
multiple releases in favor of reducing baggage.
Also in cases that had no perfect drop-in replacement, but needed users to
forward fit the code.
I am not sure I understand why this case is so different.

Why the Bucketing Sink should be thrown out, in my opinion:

The Bucketing sink makes it easier for users to add general Hadoop writes.
But the price is that it easily leads to data loss, because it assumes
flush()/sync() work reliably on Hadoop, which they don't (HDFS
works somewhat, S3 does not work at all).
I think the Bucketing sink is a trap for users, that's why it was
deprecated long ago.

The StreamingFileSink covers the majority of cases from the Bucketing Sink.
It does have some friction when adding/wrapping some general Hadoop
writers. Parts will be solved with the transactional sink work.
If something is missing and blocking users, we can prioritize adding it to
the Streaming File Sink. Also that is something we did before and it helped
being pragmatic with moving forward, rather than being held back by "maybe
there is something we don't know".
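For migrating users, a minimal row-format sketch of the replacement (path and rolling thresholds are illustrative; exactly-once delivery additionally requires checkpointing to be enabled):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("hdfs:///tmp/output"), new SimpleStringEncoder<String>("UTF-8"))
        .withRollingPolicy(DefaultRollingPolicy.builder()
                .withRolloverInterval(15 * 60 * 1000L)   // roll a new part file every 15 minutes
                .withInactivityInterval(5 * 60 * 1000L)  // ...or after 5 minutes without records
                .withMaxPartSize(128 * 1024 * 1024L)     // ...or once a part reaches 128 MB
                .build())
        .build();

// stream.addSink(sink);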




On Wed, Oct 28, 2020 at 12:36 PM Chesnay Schepler 
wrote:

> Then we can't remove it, because there is no way for us to ascertain
> whether anyone is still using it.
>
> Sure, the user ML is the best we got, but you can't argue that we don't
> want any users to be affected and then use an imperfect mean to find users.
> If you are fine with relying on the user ML, then you _are_ fine with
> removing it at the cost of friction for some users.
>
> To be clear, I, personally, don't have a problem with removing it (we
> have removed other connectors in the past that did not have a migration
> plan), I just reject the argumentation.
>
> On 10/28/2020 12:21 PM, Kostas Kloudas wrote:
> > No, I do not think that "we are fine with removing it at the cost of
> > friction for some users".
> >
> > I believe that this can be another discussion that we should have as
> > soon as we establish that someone is actually using it. The point I am
> > trying to make is that if no user is using it, we should remove it and
> > not leave unmaintained code around.
> >
> > On Wed, Oct 28, 2020 at 12:11 PM Chesnay Schepler 
> wrote:
> >> The alternative could also be to use a different argument than "no one
> >> uses it", e.g., we are fine with removing it at the cost of friction for
> >> some users because there are better alternatives.
> >>
> >> On 10/28/2020 10:46 AM, Kostas Kloudas wrote:
> >>> I think that the mailing lists is the best we can do and I would say
> >>> that they seem to be working pretty well (e.g. the recent Mesos
> >>> discussion).
> >>> Of course they are not perfect but the alternative would be to never
> >>> remove anything user facing until the next major release, which I find
> >>> pretty strict.
> >>>
> >>> On Wed, Oct 28, 2020 at 10:04 AM Chesnay Schepler 
> wrote:
>  If the conclusion is that we shouldn't remove it if _anyone_ is using
>  it, then we cannot remove it because the user ML obviously does not
>  reach all users.
> 
>  On 10/28/2020 9:28 AM, Kostas Kloudas wrote:
> > Hi all,
> >
> > I am bringing this up again to see if there are any users actively
> > using the BucketingSink.
> > So far, if I am not mistaken (and really sorry if I forgot anything),
> > it is only a discussion between devs about the potential problems of
> > removing it. I totally understand Chesnay's concern about not
> > providing compatibility with the StreamingFileSink (SFS) and if there
> > are any users, then we should not remove it without trying to find a
> > solution for them.
> >
> > But if there are no users then I would still propose to remove the
> > module, given that I am not aware of any efforts to provide
> > compatibility with the SFS any time soon.
> > The reasons for removing it also include the facts that we do not
> > actively maintain it and we do not add new features. As for potential
> > missing features in the SFS compared to the BucketingSink that was
> > mentioned before, I am not aware of any fundamental limitations and
> > even if there are, I would assume that the solution is not to direct
> > the users to a deprecated sink but rather try to increase the
> > functionality of the actively maintained one.
> >
> > Please keep in mind that the BucketingSink is deprecated since FLINK
> > 1.9 and there is a new File Sink that is coming as part of FLIP-143
> > [1].
> > Again, if there are any active users who cannot migrate easily, then
> > we cannot remove it before trying to provide a smooth migration path.
> >
> > Thanks,
> > Kostas
> >
> > [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
> >
> > On Fri, Oct 16, 2020 at 4:36 PM Chesnay 

Re: LocalBufferPool deadlock

2020-10-28 Thread hailongwang
Hi,
This is most likely back pressure from a downstream operator. You can use the inPool (input buffer usage) metrics to find which operator is the bottleneck and then address it accordingly.




Best,
Hailong Wang
On 2020-10-27 18:57:55, "1548069580" <1548069...@qq.com> wrote:
>Hi all:
>I recently ran into a problem: with the upstream under back pressure, after the job runs for a while both upstream and downstream data flow stall. A jstack dump shows the source operator is blocked, while the downstream is observed to be waiting for data as well. The stack is as follows:
>"Legacy Source Thread - Source: Custom Source (1/2)" #95 prio=5 os_prio=0 
>tid=0x7fafa4018000 nid=0x57d waiting on condition [0x7fb03d48a000]
> java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for <0x00074afaf508 (a 
> java.util.concurrent.CompletableFuture$Signaller)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
>   at 
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>   at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>   at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:241)
>   at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:210)
>   at 
> org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:189)
>   at 
> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:151)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:122)
>   at 
> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>   - locked <0x00074a5f6a98 (a java.lang.Object)
>   at 
> com.jd.bdp.flink.sink.jimdb.common.SourceTimeMillisMock.run(SourceTimeMillisMock.java:25)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:200)
>
>
>My initial take is that it is stuck acquiring a memory segment, but the Flink UI shows plenty of Memory Segments 
>Available. How should I analyze and resolve this situation? Thanks a lot.


Re: flinkSQL: setting different windows for a join

2020-10-28 Thread hailongwang
Hi s_hongliang,
1. With the DataStream API, you can use state to store the table that needs to be joined against and configure a TTL for it.
2. With SQL:
2.1. Store the data to be joined in HBase or MySQL, make sure it only contains the current day's data, and join it in SQL as a Temporal Table [1].
2.2. Use a temporal table function [2], set the state retention time, and filter out rows that match yesterday's data (see the sketch after the links).
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html#temporal-table
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html#temporal-table-function
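A minimal sketch of option 2.2 (ratesHistory, Orders, their columns and the "Rates" function name are illustrative, not from the question; $ is org.apache.flink.table.api.Expressions.$):

// Java side: expose the dimension table as a temporal table function,
// keyed by currency and versioned by its update_time attribute
TemporalTableFunction rates =
        ratesHistory.createTemporalTableFunction($("update_time"), $("currency"));
tableEnv.registerFunction("Rates", rates);

-- SQL side: each order row joins the dimension version valid at its own time
SELECT o.order_id, o.amount * r.rate AS converted
FROM Orders AS o,
     LATERAL TABLE (Rates(o.order_time)) AS r
WHERE o.currency = r.currency;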




Best,
Hailong Wang
On 2020-10-28 15:06:39, "奔跑的小飞袁" wrote:
>hello
>We have a business scenario involving a join between two dynamic tables: one table is fully dynamic and needs to join against only the current day's data of the other dynamic table. Is this kind of join scenario supported?
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Watermarks not generated under parallelism

2020-10-28 Thread hailongwang
Hi BenChen,
1. Make sure the operator that needs the watermark is not connected to the operators before it via a Forward (one-to-one) channel.
2. If it is a connector you implemented yourself, you could periodically check and call SourceFunction.SourceContext#markAsTemporarilyIdle to mark the subtask as 
idle; as far as I can see, the Kafka connector currently only performs that check at startup (a sketch follows below).
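A minimal sketch of point 2 (the record type, poll logic and sleep interval are made up):

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class IdleAwareSource implements SourceFunction<String> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            String record = pollExternalSystem(); // placeholder fetch
            if (record != null) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(record);
                }
            } else {
                // nothing to read for this subtask: mark it idle so the
                // downstream watermark calculation can ignore it for now
                ctx.markAsTemporarilyIdle();
                Thread.sleep(1000L);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    private String pollExternalSystem() {
        return null; // placeholder
    }
}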
Best,
Hailong Wang
On 2020-10-28 17:54:22, "BenChen" wrote:
>Hi all, Flink 1.11 added WatermarkStrategy (with idleness handling) to deal with the case where one parallel instance receives no data and the
>watermark therefore never advances. Does Flink 1.10 have any mechanism to solve this problem? Thanks
>
>
>| |
>BenChen
>|
>|
>haibin...@163.com
>|
>Signature customized by NetEase Mail Master
>


Re: Tumbling window updates lag further and further behind as the job runs, while sliding windows do not

2020-10-28 Thread hailongwang
Hi marble,
Since you keep accumulating with an aggregate function inside the window, you could speed things up with the filesystem state 
backend, at the cost of relatively higher memory usage; with the RocksDB backend every record incurs a put and a get I/O operation, so it is slower (see the sketch below).
As for why the 24 h size / 2 s slide window shows no delay while the 5 min window with a 1 s continuous 
trigger does: the two behave differently, so they are not really comparable.
For the second one, the trigger relies on registered timers, so it has to fire every second (with processing time), which may be too frequent.
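A minimal sketch of switching a job to the filesystem (heap) backend as suggested above (the checkpoint URI is a placeholder):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// keep the working state on the JVM heap; only checkpoints go to the filesystem
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

The same can be done cluster-wide with state.backend: filesystem in flink-conf.yaml.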




Best,
Hailong Wang
On 2020-10-28 16:21:24, "marble.zh...@coinflex.com.INVALID" wrote:
>Hi all.
>
>I am using a tumbling window,
>ds.keyBy(CandleView::getMarketCode)
>.timeWindow(Time.minutes(5L))
>   
>.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
>.aggregate(new OhlcAggregateFunction(), new
>OhlcWindowFunction())
>.addSink(new PgSink(jdbcUrl, userName, password,
>candle_table_5m))
>.name(candle_table_5m);
>
>Sliding Window:
>
>ds.keyBy(CandleView::getMarketCode)
>.timeWindow(Time.hours(24L), Time.seconds(2))
>.aggregate(new OhlcAggregateFunction(), new
>TickerWindowFunction())
>.addSink(new PgSink(jdbcUrl, userName, password,
>candle_table_24h))
>.name(candle_table_24h);
>
>One is a 5-minute window, the other a 24-hour sliding window. The 24-hour window is updated to the latest time, but the 5-minute one lags further and further behind: after less than 2 hours of running, it is already almost 20 minutes behind, i.e. nearly 4 windows.
>Both are based on the same dataStream.
>
>Any suggestions, or am I using something incorrectly? Thanks
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: NoResourceAvailableException

2020-10-28 Thread Khachatryan Roman
Hi Alexander,

Thanks for sharing,

I see a lot of exceptions in the logs, particularly
*Caused by: java.net.BindException: Could not start actor system on any
port in port range 6123

which means that there's probably more than one instance running and is
likely the root cause.
So it makes sense to make sure that the previous attempts cleaned up.

Regards,
Roman


On Tue, Oct 20, 2020 at 12:08 AM Alexander Semeshchenko 
wrote:

> Hi Roman,
> I made the cluster: 1 master, 2 worker. All - 8 cpu, 32 g RAM . Red Hat
> Enterprise Linux Server release 7.9 (Maipo)
> vsmart-f01 - master
> vsmart-f02 - worker
> vsmart-f03 - worker
> taskmanager.numberOfTaskSlots for each node is: 8
>
> Then:
> *[flink@vsmart-f01 flink-1.11.1]$ ./bin/start-cluster.sh *
> *Starting cluster.*
> *[INFO] 1 instance(s) of standalonesession are already running on
> vsmart-f01.*
> *Starting standalonesession daemon on host vsmart-f01.*
> *flink@10.92.194.19 's password: *
> *[INFO] 1 instance(s) of taskexecutor are already running on vsmart-f02.*
> *Starting taskexecutor daemon on host vsmart-f02.*
> *flink@10.92.194.20 's password: *
> *Starting taskexecutor daemon on host vsmart-f03.*
>
> The cluster start up, running WordCount from master:
> *./bin/flink run -c org.apache.flink.examples.java.wordcount.WordCount
>  ./examples/batch/WordCount.jar  --output file:/tmp/wordcount_out*
>
> After 5 min. the job was canceled.
> In the screenshot appeared that was never assigned taskmanager for the job
> operator.
> I've put the 3 logs(  from each node) here.
>
> Thanks and Best Regards.
> Alex
>
>
> On Mon, Oct 19, 2020 at 5:47 PM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hi Alex,
>>
>> This message isn't actually a problem - netty can't find the native
>> transports and falls back to nio-based one.
>> Does increasing taskmanager.numberOfTaskSlots in flink-conf.yaml help?
>> Can you share the full logs in DEBUG mode?
>>
>> Regards,
>> Roman
>>
>>
>> On Mon, Oct 19, 2020 at 6:14 PM Alexander Semeshchenko 
>> wrote:
>>
>>> thank you for your response.
>>>
>>> taskmanager has 1 slot , 1 slot free but WordCount job never change its
>>> status from "Created".
>>> After more less 5 min. job is canceled.
>>> I attached screenshot of taskmanager.
>>>
>>> Best Regards
>>> Alexander
>>>
>>> On Wed, Oct 14, 2020 at 6:13 PM Khachatryan Roman <
>>> khachatryan.ro...@gmail.com> wrote:
>>>
 Hi,
 Thanks for sharing the details and sorry for the late reply.
 You can check the number of free slots in the task manager in the web
 UI (http://localhost:8081/#/task-manager by default).
 Before running the program, there should be 1 TM with 1 slot available
 which should be free (with default settings).

 If there are other jobs, you can increase slots per TM by increasing
 taskmanager.numberOfTaskSlots in flink-conf.yaml [1].

 [1]
 https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-numberoftaskslots
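For reference, a sketch of the relevant flink-conf.yaml entries for a standalone cluster (values are illustrative only):

taskmanager.numberOfTaskSlots: 8      # slots offered by each TaskManager
parallelism.default: 4                # should not exceed the total slots across all TaskManagers
taskmanager.memory.process.size: 3g   # total memory of each TaskManager process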

 Regards,
 Roman


 On Wed, Oct 14, 2020 at 6:56 PM Alexander Semeshchenko <
 as77...@gmail.com> wrote:

> Hi, is there any news about my issue "Flink -
>  NoResourceAvailableException " post - installed WordCount job ?
> Best
>
> On Fri, Oct 9, 2020 at 10:19 AM Alexander Semeshchenko <
> as77...@gmail.com> wrote:
>
>> Yes, I made the following accions:
>> -   download Flink
>> -   ./bin/start-cluster.sh.
>> -   ./bin/flink run ./examples/streaming/WordCount.jar
>> 
>> Then, tried to increase values for > ulimit , VM memory values...
>> Below I put the logs messages.
>>
>> It's strange, as I could run the same job on my MacBook (8 CPU, 16 GB RAM)
>> and on a k8s cluster (4 CPU, 8 GB RAM).
>>
>> Thanks
>>
>>
>>
>> On Fri, Oct 9, 2020 at 3:32 AM Khachatryan Roman <
>> khachatryan.ro...@gmail.com> wrote:
>>
>>> I assume that before submitting a job you started a cluster with
>>> default settings with ./bin/start-cluster.sh.
>>>
>>> Did you submit any other jobs?
>>> Can you share the logs from log folder?
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Wed, Oct 7, 2020 at 11:03 PM Alexander Semeshchenko <
>>> as77...@gmail.com> wrote:
>>>

 

 Installing (download & tar zxf) Apache Flink 1.11.1 and running: 
 ./bin/flink
 run examples/streaming/WordCount.jar it show on the nice message
 after more less 5 min. the trying of submitting:  Caused by:
 org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
 Could not allocate the required slot within slot request timeout. 
 Please
 make sure that the cluster has enough resources. at
 

Re: Building Flink on VirtualBox VM failing

2020-10-28 Thread Khachatryan Roman
The values printed by the OOM killer seem indeed strange. But from the line
above the memory usage seems fine: rss=2440960.
Running the given command I see only one forked process.
Probably, this is an issue with the OOM killer running in a VM on a Windows host.
Can you try with OOM killer disabled?

Regards,
Roman


On Fri, Oct 23, 2020 at 3:02 PM Juha Mynttinen 
wrote:

> I'm trying again running the tests, now I have four cores
> (previously five) and 12 GB RAM (previously 8 GB). I'm still hit by the OOM
> killer.
>
> The command I'm running is:
>
> mvn -Dflink.forkCount=1 -Dflink.forkCountTestPackage=1 clean verify
>
> [INFO] BUILD FAILURE
> [INFO]
> 
> [INFO] Total time: 01:17 h
> [INFO] Finished at: 2020-10-23T15:36:50+03:00
> [INFO] Final Memory: 180M/614M
> [INFO]
> 
> [ERROR] Failed to execute goal
> org.apache.maven.plugins:maven-surefire-plugin:2.22.1:test
> (integration-tests) on project flink-tests: There are test failures.
> [ERROR]
> [ERROR] Please refer to
> /home/juha/git/flink/flink-tests/target/surefire-reports for the individual
> test results.
> [ERROR] Please refer to dump files (if any exist) [date].dump,
> [date]-jvmRun[N].dump and [date].dumpstream.
> [ERROR] ExecutionException The forked VM terminated without properly
> saying goodbye. VM crash or System.exit called?
> [ERROR] Command was /bin/sh -c cd /home/juha/git/flink/flink-tests/target
> && /usr/lib/jvm/java-11-openjdk-amd64/bin/java -Xms2048m -Xmx2048m
> -Dmvn.forkNumber=1 -XX:+UseG1GC -jar
> /home/juha/git/flink/flink-tests/target/surefire/surefirebooter15842756015305201470.jar
> /home/juha/git/flink/flink-tests/target/surefire
> 2020-10-23T14-19-18_685-jvmRun1 surefire394592676817174474tmp
> surefire_117413817767116882164827tmp
> [ERROR] Error occurred in starting fork, check output in log
> [ERROR] Process Exit Code: 137
> [ERROR] Crashed tests:
> [ERROR]
> org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase
> [ERROR] org.apache.maven.surefire.booter.SurefireBooterForkException:
> ExecutionException The forked VM terminated without properly saying
> goodbye. VM crash or System.exit called?
> [ERROR] Command was /bin/sh -c cd /home/juha/git/flink/flink-tests/target
> && /usr/lib/jvm/java-11-openjdk-amd64/bin/java -Xms2048m -Xmx2048m
> -Dmvn.forkNumber=1 -XX:+UseG1GC -jar
> /home/juha/git/flink/flink-tests/target/surefire/surefirebooter15842756015305201470.jar
> /home/juha/git/flink/flink-tests/target/surefire
> 2020-10-23T14-19-18_685-jvmRun1 surefire394592676817174474tmp
> surefire_117413817767116882164827tmp
> [ERROR] Error occurred in starting fork, check output in log
> [ERROR] Process Exit Code: 137
> [ERROR] Crashed tests:
> [ERROR]
> org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase
> [ERROR] at
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.awaitResultsDone(ForkStarter.java:510)
> [ERROR] at
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.runSuitesForkPerTestSet(ForkStarter.java:457)
> [ERROR] at
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:298)
> [ERROR] at
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:246)
> [ERROR] at
> org.apache.maven.plugin.surefire.AbstractSurefireMojo.executeProvider(AbstractSurefireMojo.java:1183)
> [ERROR] at
> org.apache.maven.plugin.surefire.AbstractSurefireMojo.executeAfterPreconditionsChecked(AbstractSurefireMojo.java:1011)
> [ERROR] at
> org.apache.maven.plugin.surefire.AbstractSurefireMojo.execute(AbstractSurefireMojo.java:857)
> [ERROR] at
> org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:132)
> [ERROR] at
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208)
> [ERROR] at
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
> [ERROR] at
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
> [ERROR] at
> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116)
> [ERROR] at
> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:80)
> [ERROR] at
> org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build(SingleThreadedBuilder.java:51)
> [ERROR] at
> org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:120)
> [ERROR] at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:355)
> [ERROR] at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:155)
> [ERROR] at org.apache.maven.cli.MavenCli.execute(MavenCli.java:584)
> [ERROR] at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:216)
> [ERROR] at org.apache.maven.cli.MavenCli.main(MavenCli.java:160)
> [ERROR] at
> 

Re: Will there be official support for a kafka lag metric?

2020-10-28 Thread silence
hi zhisheng
I found two related reference blog posts; take a look:
https://blog.csdn.net/a1240466196/article/details/107853926
https://www.jianshu.com/p/c7515bdde1f7



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Will there be official support for a kafka lag metric?

2020-10-28 Thread zhisheng
hi, silence

For the first approach you mentioned, I don't think it can be done inside Flink: Flink only has access to the offsets of the records
it consumes, not to the latest offset of each partition of the topic in Kafka, so it cannot compute the per-partition lag by subtraction and therefore cannot obtain the lag of the whole topic.

The second approach is feasible: you can instrument your own job (reporting the difference between the current system time and the event time of the consumed records), have each parallel instance report it, and then read the maximum partition delay of the job off the monitoring page.
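A minimal sketch of that kind of instrumentation (the metric name and the MyRecord type with an eventTime field in epoch milliseconds are made up):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;

public class EventTimeLagReporter extends RichMapFunction<MyRecord, MyRecord> {
    private volatile long lastLagMs;

    @Override
    public void open(Configuration parameters) {
        // exposed per parallel instance; dashboards can then take the max over subtasks
        getRuntimeContext().getMetricGroup()
                .gauge("eventTimeLagMs", (Gauge<Long>) () -> lastLagMs);
    }

    @Override
    public MyRecord map(MyRecord record) {
        lastLagMs = System.currentTimeMillis() - record.eventTime; // wall clock minus event time
        return record;
    }
}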

Best!
zhisheng

silence  于2020年10月28日周三 下午7:55写道:

> Currently there can be lag when consuming from Kafka, so I would like to use Flink metrics to report and monitor the Kafka consumption delay.
> There are mainly two cases:
> 1. The difference between the offset currently consumed by the group and the latest offset of the corresponding topic, i.e. the amount of backlog.
> 2. The difference between the timestamp of the latest consumed record and the current system time, i.e. the consumption time delay.
>
> Monitoring Kafka lag is very important for the stable operation of real-time jobs.
> Some implementations based on modifying the source code can be found online, but modifying the source code makes later Flink upgrades harder, so I still hope the community can consider supporting this officially.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: About watermarks not being generated under parallelism

2020-10-28 Thread zhisheng
hi,Benchen

You could consider adding a rebalance() after the source operator.
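A minimal sketch of that suggestion (MySource, MyEvent and MyWatermarkAssigner are assumed placeholders): rebalance() redistributes records round-robin, so every downstream parallel instance keeps receiving data and can emit watermarks.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RebalanceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<MyEvent> stream = env
                .addSource(new MySource())
                .rebalance() // spread records across all subtasks before watermark assignment
                .assignTimestampsAndWatermarks(new MyWatermarkAssigner());
        stream.print();
        env.execute("rebalance-example");
    }
}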

Best!
zhisheng

Shubin Ruan  于2020年10月28日周三 下午7:36写道:

> You could consider handling it at the data source:
>
>
> Set a time threshold; if the data under some key has not been updated for longer than the threshold, generate a watermark based on the system's processing time according to some logic and send it downstream.
> On 2020-10-28 18:54:22, "BenChen" wrote:
> >Hi
> >all, in Flink 1.11 a WatermarkStrategy was added to handle the case where a parallel subtask with no data prevents watermarks from being triggered. Does Flink 1.10 have any mechanism to solve this problem? Thanks
> >
> >
> >| |
> >BenChen
> >|
> >|
> >haibin...@163.com
> >|
> >Signature customized by NetEase Mail Master
> >
>


Re: Can Flink dynamically adjust job parallelism?

2020-10-28 Thread zhisheng
Probably not supported.

ZT.Ren <18668118...@163.com> 于2020年10月28日周三 下午3:53写道:

> As I recall, some version after Flink 1.9 supports dynamically adjusting the parallelism, but I have forgotten how to use it. Could someone give me a pointer? Many thanks


Will there be an official plan to support a kafka lag metric?

2020-10-28 Thread silence
Currently there can be lag when consuming from Kafka, so I would like to use Flink metrics to report and monitor the Kafka consumption delay.
There are mainly two cases:
1. The difference between the offset currently consumed by the group and the latest offset of the corresponding topic, i.e. the amount of backlog.
2. The difference between the timestamp of the latest consumed record and the current system time, i.e. the consumption time delay.

Monitoring Kafka lag is very important for the stable operation of real-time jobs.
Some implementations based on modifying the source code can be found online, but modifying the source code makes later Flink upgrades harder, so I still hope the community can consider supporting this officially.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Passing a variable via an operator's constructor does not take effect

2020-10-28 Thread Shubin Ruan
hi, could you paste part of the code so we can take a look?







On 2020-10-28 17:29:58, "freeza1...@outlook.com" wrote:
>hi all:
>I defined a flatMap and passed an int variable through its constructor. At the outermost level I defined two streams, and when defining the streams this variable was passed in via .flatMap(int).
>There are currently two different flatMaps whose constructors were given two different values for this int.
>When data flows through these two operators, I find that the int variable does not change. How should I pass a variable to an operator?
>
>
>
>freeza1...@outlook.com


Re: About watermarks not being generated under parallelism

2020-10-28 Thread Shubin Ruan
You could consider handling it at the data source:


Set a time threshold; if the data under some key has not been updated for longer than the threshold, generate a watermark based on the system's processing time according to some logic and send it downstream.
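A minimal sketch of that idea for Flink 1.10, based on AssignerWithPeriodicWatermarks (the record type MyEvent, its getEventTime() getter and both thresholds are assumptions for illustration):

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Sketch only: if no record has been seen for longer than the threshold, advance
// the watermark based on processing time so an idle subtask does not hold back
// downstream windows.
public class IdleAwareWatermarkAssigner implements AssignerWithPeriodicWatermarks<MyEvent> {

    private static final long IDLE_THRESHOLD_MS = 60_000;        // assumed idleness threshold
    private static final long MAX_OUT_OF_ORDERNESS_MS = 5_000;   // assumed out-of-orderness

    private long maxTimestamp = Long.MIN_VALUE;
    private long lastRecordProcessingTime = System.currentTimeMillis();

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        lastRecordProcessingTime = System.currentTimeMillis();
        maxTimestamp = Math.max(maxTimestamp, element.getEventTime());
        return element.getEventTime();
    }

    @Override
    public Watermark getCurrentWatermark() {
        long now = System.currentTimeMillis();
        if (now - lastRecordProcessingTime > IDLE_THRESHOLD_MS) {
            // no data for a while: fall back to processing time
            return new Watermark(now - MAX_OUT_OF_ORDERNESS_MS);
        }
        if (maxTimestamp == Long.MIN_VALUE) {
            return new Watermark(Long.MIN_VALUE);
        }
        return new Watermark(maxTimestamp - MAX_OUT_OF_ORDERNESS_MS);
    }
}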
On 2020-10-28 18:54:22, "BenChen" wrote:
>Hi
>all, in Flink 1.11 a WatermarkStrategy was added to handle the case where a parallel subtask with no data prevents watermarks from being triggered. Does Flink 1.10 have any mechanism to solve this problem? Thanks
>
>
>| |
>BenChen
>|
>|
>haibin...@163.com
>|
>Signature customized by NetEase Mail Master
>


Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-28 Thread Chesnay Schepler
Then we can't remove it, because there is no way for us to ascertain 
whether anyone is still using it.


Sure, the user ML is the best we've got, but you can't argue that we don't 
want any users to be affected and then use an imperfect means to find users.
If you are fine with relying on the user ML, then you _are_ fine with 
removing it at the cost of friction for some users.


To be clear, I, personally, don't have a problem with removing it (we 
have removed other connectors in the past that did not have a migration 
plan), I just reject the argumentation.


On 10/28/2020 12:21 PM, Kostas Kloudas wrote:

No, I do not think that "we are fine with removing it at the cost of
friction for some users".

I believe that this can be another discussion that we should have as
soon as we establish that someone is actually using it. The point I am
trying to make is that if no user is using it, we should remove it and
not leave unmaintained code around.

On Wed, Oct 28, 2020 at 12:11 PM Chesnay Schepler  wrote:

The alternative could also be to use a different argument than "no one
uses it", e.g., we are fine with removing it at the cost of friction for
some users because there are better alternatives.

On 10/28/2020 10:46 AM, Kostas Kloudas wrote:

I think that the mailing lists is the best we can do and I would say
that they seem to be working pretty well (e.g. the recent Mesos
discussion).
Of course they are not perfect but the alternative would be to never
remove anything user facing until the next major release, which I find
pretty strict.

On Wed, Oct 28, 2020 at 10:04 AM Chesnay Schepler  wrote:

If the conclusion is that we shouldn't remove it if _anyone_ is using
it, then we cannot remove it because the user ML obviously does not
reach all users.

On 10/28/2020 9:28 AM, Kostas Kloudas wrote:

Hi all,

I am bringing this up again to see if there are any users actively
using the BucketingSink.
So far, if I am not mistaken (and really sorry if I forgot anything),
it is only a discussion between devs about the potential problems of
removing it. I totally understand Chesnay's concern about not
providing compatibility with the StreamingFileSink (SFS) and if there
are any users, then we should not remove it without trying to find a
solution for them.

But if there are no users then I would still propose to remove the
module, given that I am not aware of any efforts to provide
compatibility with the SFS any time soon.
The reasons for removing it also include the facts that we do not
actively maintain it and we do not add new features. As for potential
missing features in the SFS compared to the BucketingSink that was
mentioned before, I am not aware of any fundamental limitations and
even if there are, I would assume that the solution is not to direct
the users to a deprecated sink but rather try to increase the
functionality of the actively maintained one.

Please keep in mind that the BucketingSink is deprecated since FLINK
1.9 and there is a new File Sink that is coming as part of FLIP-143
[1].
Again, if there are any active users who cannot migrate easily, then
we cannot remove it before trying to provide a smooth migration path.

Thanks,
Kostas

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API

On Fri, Oct 16, 2020 at 4:36 PM Chesnay Schepler  wrote:

@Seth: Earlier in this discussion it was said that the BucketingSink
would not be usable in 1.12 .

On 10/16/2020 4:25 PM, Seth Wiesman wrote:

+1 It has been deprecated for some time and the StreamingFileSink has
stabilized with a large number of formats and features.

Plus, the bucketing sink only implements a small number of stable
interfaces[1]. I would expect users to continue to use the bucketing sink
from the 1.11 release with future versions for some time.

Seth

https://github.com/apache/flink/blob/2ff3b771cbb091e1f43686dd8e176cea6d435501/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L170-L172

On Thu, Oct 15, 2020 at 2:57 PM Kostas Kloudas  wrote:


@Arvid Heise I also do not remember exactly what were all the
problems. The fact that we added some more bulk formats to the
streaming file sink definitely reduced the non-supported features. In
addition, the latest discussion I found on the topic was [1] and the
conclusion of that discussion seems to be to remove it.

Currently, I cannot find any obvious reason why keeping the
BucketingSink, apart from the fact that we do not have a migration
plan unfortunately. This is why I posted this to dev@ and user@.

Cheers,
Kostas

[1]
https://lists.apache.org/thread.html/r799be74658bc7e169238cc8c1e479e961a9e85ccea19089290940ff0%40%3Cdev.flink.apache.org%3E

On Wed, Oct 14, 2020 at 8:03 AM Arvid Heise  wrote:

I remember this conversation popping up a few times already and I'm in
general a big fan of removing BucketingSink.

However, until now there were a few features lacking in StreamingFileSink
that 

Re: Unsubscribe

2020-10-28 Thread Shubin Ruan
Send an email to user-zh-unsubscr...@flink.apache.org to complete the unsubscription.














On 2020-10-28 19:20:27, "李国鹏" wrote:
>Unsubscribe


Re: Unsubscribe

2020-10-28 Thread hailongwang
Hi,
To unsubscribe, please send an email to user-zh-unsubscr...@flink.apache.org. For more details see [1].
[1] https://flink.apache.org/community.html#mailing-lists


Best,
Hailong Wang
On 2020-10-28 18:20:27, "李国鹏" wrote:
>Unsubscribe


Unsubscribe

2020-10-28 Thread 李国鹏
Unsubscribe

Re: RestClusterClient and classpath

2020-10-28 Thread Flavio Pompermaier
I'm running the code from Eclipse; the jar exists and it contains the
classes Flink is not finding... maybe I can try IntelliJ in the
afternoon.

On Wed, Oct 28, 2020 at 12:13 PM Chesnay Schepler 
wrote:

> @Kostas: Ah, I missed that.
>
> @Flavio: the only alternative I can think of is that your jar does not contain
> the classes, or does not exist at all on the machine your application is run
> on.
>
> On 10/28/2020 12:08 PM, Kostas Kloudas wrote:
> > Hi all,
> >
> > I will have a look in the whole stack trace in a bit.
> >
> > @Chesnay Schepler I think that we are setting the correct classloader
> > during jobgraph creation [1]. Is that what you mean?
> >
> > Cheers,
> > Kostas
> >
> > [1]
> https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java#L122
> >
> > On Wed, Oct 28, 2020 at 11:02 AM Flavio Pompermaier
> >  wrote:
> >> Always the same problem.
> >>
> >> Caused by: java.lang.ClassNotFoundException: it.test.XXX
> >> at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
> >> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
> >> at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
> >> at
> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
> >> at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
> >> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
> >> ... 10 more
> >>
> >> I've also tried with
> >>
> >>  flinkConf.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER,
> "parent-first");
> >>
> >> but nothing changes.
> >>
> >> On Wed, Oct 28, 2020 at 10:34 AM Chesnay Schepler 
> wrote:
> >>> hmm..it appears as if PackagedProgramUtils#createJobGraph does some
> things outside the usercode classlodaer (getPipelineFromProgram()),
> specifically the call to the main method.
> >>>
> >>> @klou This seems like wrong behavior?
> >>>
> >>> @Flavio What you could try in the meantime is wrap the call to
> createJobGraph like this:
> >>>
> >>> final ClassLoader contextClassLoader =
> Thread.currentThread().getContextClassLoader();
> >>> try {
> >>>
>  
> Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
> >>> // do tstuff
> >>> } finally {
> >>> Thread.currentThread().setContextClassLoader(contextClassLoader);
> >>> }
> >>>
> >>>
> >>> On 10/28/2020 10:12 AM, Flavio Pompermaier wrote:
> >>>
> >>> Any help here?  How can I understand why the classes inside the jar
> are not found when creating the PackagedProgram?
> >>>
> >>> On Tue, Oct 27, 2020 at 11:04 AM Flavio Pompermaier <
> pomperma...@okkam.it> wrote:
>  In the logs I see that the jar is the classpath (I'm trying to debug
> the program from the IDE)..isn'it?
> 
>  Classpath: [file:/tmp/job-bundle.jar]
>  ...
> 
>  Best,
>  Flavio
> 
>  On Tue, Oct 27, 2020 at 10:39 AM Chesnay Schepler 
> wrote:
> > * your JobExecutor is _not_ putting it on the classpath.
> >
> > On 10/27/2020 10:36 AM, Chesnay Schepler wrote:
> >
> > Well it happens on the client before you even hit the
> RestClusterClient, so I assume that either your jar is not packaged
> correctly or you your JobExecutor is putting it on the classpath.
> >
> > On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:
> >
> > Sure. Here it is (org.apache.flink.client.cli.JobExecutor is my main
> class I'm trying to use as a client towards the Flink cluster - session
> mode).
> > it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).
> >
> > The code of getBatchEnv is:
> >
> > @Deprecated
> >public static BatchEnv getBatchEnv() {
> >  // TODO use the following when ready to convert from/to
> datastream
> >  // return
> getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
> >  ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> >  BatchTableEnvironment ret = BatchTableEnvironment.create(env);
> >  customizeEnv(ret);
> >  return new BatchEnv(env, ret);
> >}
> >
> >private static void customizeEnv(TableEnvironment ret) {
> >  final Configuration conf = ret.getConfig().getConfiguration();
> >  //
> conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
> 2);
> >  conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
> >  conf.setString(BlobServerOptions.STORAGE_DIRECTORY,
> FLINK_TEST_TMP_DIR);
> >  // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
> 4); //NOSONAR
> >  //
> conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
> 0.4f);//NOSONAR
> >  // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX,
> 32768 * 2);//NOSONAR
> >  // 

A question about Checkpoint size

2020-10-28 Thread gsralex
Hi, All
The Web UI usually shows the checkpoint size as around 400MB, but looking at the actual size on HDFS it is less than 1MB (_metadata).
I would like to ask why the difference in size is so large?

Re: RestClusterClient and classpath

2020-10-28 Thread Chesnay Schepler

@Kostas: Ah, I missed that.

@Flavio: the only alternative I can think of is that your jar does not contain the 
classes, or does not exist at all on the machine your application is run on.


On 10/28/2020 12:08 PM, Kostas Kloudas wrote:

Hi all,

I will have a look in the whole stack trace in a bit.

@Chesnay Schepler I think that we are setting the correct classloader
during jobgraph creation [1]. Is that what you mean?

Cheers,
Kostas

[1] 
https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java#L122

On Wed, Oct 28, 2020 at 11:02 AM Flavio Pompermaier
 wrote:

Always the same problem.

Caused by: java.lang.ClassNotFoundException: it.test.XXX
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
at 
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
... 10 more

I've also tried with

 flinkConf.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first");

but nothing changes.

On Wed, Oct 28, 2020 at 10:34 AM Chesnay Schepler  wrote:

hmm..it appears as if PackagedProgramUtils#createJobGraph does some things 
outside the usercode classlodaer (getPipelineFromProgram()), specifically the 
call to the main method.

@klou This seems like wrong behavior?

@Flavio What you could try in the meantime is wrap the call to createJobGraph 
like this:

final ClassLoader contextClassLoader = 
Thread.currentThread().getContextClassLoader();
try {

Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
// do tstuff
} finally {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}


On 10/28/2020 10:12 AM, Flavio Pompermaier wrote:

Any help here?  How can I understand why the classes inside the jar are not 
found when creating the PackagedProgram?

On Tue, Oct 27, 2020 at 11:04 AM Flavio Pompermaier  
wrote:

In the logs I see that the jar is the classpath (I'm trying to debug the 
program from the IDE)..isn'it?

Classpath: [file:/tmp/job-bundle.jar]
...

Best,
Flavio

On Tue, Oct 27, 2020 at 10:39 AM Chesnay Schepler  wrote:

* your JobExecutor is _not_ putting it on the classpath.

On 10/27/2020 10:36 AM, Chesnay Schepler wrote:

Well it happens on the client before you even hit the RestClusterClient, so I 
assume that either your jar is not packaged correctly or you your JobExecutor 
is putting it on the classpath.

On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:

Sure. Here it is (org.apache.flink.client.cli.JobExecutor is my main class I'm 
trying to use as a client towards the Flink cluster - session mode).
it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).

The code of getBatchEnv is:

@Deprecated
   public static BatchEnv getBatchEnv() {
 // TODO use the following when ready to convert from/to datastream
 // return 
getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 BatchTableEnvironment ret = BatchTableEnvironment.create(env);
 customizeEnv(ret);
 return new BatchEnv(env, ret);
   }

   private static void customizeEnv(TableEnvironment ret) {
 final Configuration conf = ret.getConfig().getConfiguration();
 // 
conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 
2);
 conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
 conf.setString(BlobServerOptions.STORAGE_DIRECTORY, FLINK_TEST_TMP_DIR);
 // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4); //NOSONAR
 // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 
0.4f);//NOSONAR
 // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 32768 * 
2);//NOSONAR
 // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 32768 * 
2);// NOSONAR
 conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0);// NOSONAR
 conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");// NOSONAR
 conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");// NOSONAR
 conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");// NOSONAR
 conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10));// NOSONAR
 final List kryoSerializers = new ArrayList<>();
 kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class, 
JodaDateTimeSerializer.class));
 kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class, 
TBaseSerializer.class));
 kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class, 
TBaseSerializer.class));
 conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, 

Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-28 Thread Chesnay Schepler
The alternative could also be to use a different argument than "no one 
uses it", e.g., we are fine with removing it at the cost of friction for 
some users because there are better alternatives.


On 10/28/2020 10:46 AM, Kostas Kloudas wrote:

I think that the mailing lists is the best we can do and I would say
that they seem to be working pretty well (e.g. the recent Mesos
discussion).
Of course they are not perfect but the alternative would be to never
remove anything user facing until the next major release, which I find
pretty strict.

On Wed, Oct 28, 2020 at 10:04 AM Chesnay Schepler  wrote:

If the conclusion is that we shouldn't remove it if _anyone_ is using
it, then we cannot remove it because the user ML obviously does not
reach all users.

On 10/28/2020 9:28 AM, Kostas Kloudas wrote:

Hi all,

I am bringing this up again to see if there are any users actively
using the BucketingSink.
So far, if I am not mistaken (and really sorry if I forgot anything),
it is only a discussion between devs about the potential problems of
removing it. I totally understand Chesnay's concern about not
providing compatibility with the StreamingFileSink (SFS) and if there
are any users, then we should not remove it without trying to find a
solution for them.

But if there are no users then I would still propose to remove the
module, given that I am not aware of any efforts to provide
compatibility with the SFS any time soon.
The reasons for removing it also include the facts that we do not
actively maintain it and we do not add new features. As for potential
missing features in the SFS compared to the BucketingSink that was
mentioned before, I am not aware of any fundamental limitations and
even if there are, I would assume that the solution is not to direct
the users to a deprecated sink but rather try to increase the
functionality of the actively maintained one.

Please keep in mind that the BucketingSink is deprecated since FLINK
1.9 and there is a new File Sink that is coming as part of FLIP-143
[1].
Again, if there are any active users who cannot migrate easily, then
we cannot remove it before trying to provide a smooth migration path.

Thanks,
Kostas

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API

On Fri, Oct 16, 2020 at 4:36 PM Chesnay Schepler  wrote:

@Seth: Earlier in this discussion it was said that the BucketingSink
would not be usable in 1.12 .

On 10/16/2020 4:25 PM, Seth Wiesman wrote:

+1 It has been deprecated for some time and the StreamingFileSink has
stabilized with a large number of formats and features.

Plus, the bucketing sink only implements a small number of stable
interfaces[1]. I would expect users to continue to use the bucketing sink
from the 1.11 release with future versions for some time.

Seth

https://github.com/apache/flink/blob/2ff3b771cbb091e1f43686dd8e176cea6d435501/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L170-L172

On Thu, Oct 15, 2020 at 2:57 PM Kostas Kloudas  wrote:


@Arvid Heise I also do not remember exactly what were all the
problems. The fact that we added some more bulk formats to the
streaming file sink definitely reduced the non-supported features. In
addition, the latest discussion I found on the topic was [1] and the
conclusion of that discussion seems to be to remove it.

Currently, I cannot find any obvious reason why keeping the
BucketingSink, apart from the fact that we do not have a migration
plan unfortunately. This is why I posted this to dev@ and user@.

Cheers,
Kostas

[1]
https://lists.apache.org/thread.html/r799be74658bc7e169238cc8c1e479e961a9e85ccea19089290940ff0%40%3Cdev.flink.apache.org%3E

On Wed, Oct 14, 2020 at 8:03 AM Arvid Heise  wrote:

I remember this conversation popping up a few times already and I'm in
general a big fan of removing BucketingSink.

However, until now there were a few features lacking in StreamingFileSink
that are present in BucketingSink and that are being actively used (I

can't

exactly remember them now, but I can look it up if everyone else is also
suffering from bad memory). Did we manage to add them in the meantime? If
not, then it feels rushed to remove it at this point.

On Tue, Oct 13, 2020 at 2:33 PM Kostas Kloudas 

wrote:

@Chesnay Schepler  Off the top of my head, I cannot find an easy way
to migrate from the BucketingSink to the StreamingFileSink. It may be
possible but it will require some effort because the logic would be
"read the old state, commit it, and start fresh with the
StreamingFileSink."

On Tue, Oct 13, 2020 at 2:09 PM Aljoscha Krettek 
wrote:

On 13.10.20 14:01, David Anderson wrote:

I thought this was waiting on FLIP-46 -- Graceful Shutdown

Handling --

and

in fact, the StreamingFileSink is mentioned in that FLIP as a

motivating

use case.

Ah yes, I see FLIP-147 as a more general replacement for FLIP-46.

Thanks

for the reminder, we should close FLIP-46 now with an 

Re: RestClusterClient and classpath

2020-10-28 Thread Kostas Kloudas
Hi all,

I will have a look in the whole stack trace in a bit.

@Chesnay Schepler I think that we are setting the correct classloader
during jobgraph creation [1]. Is that what you mean?

Cheers,
Kostas

[1] 
https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java#L122

On Wed, Oct 28, 2020 at 11:02 AM Flavio Pompermaier
 wrote:
>
> Always the same problem.
>
> Caused by: java.lang.ClassNotFoundException: it.test.XXX
> at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
> at 
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
> at 
> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
> at 
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
> ... 10 more
>
> I've also tried with
>
> flinkConf.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first");
>
> but nothing changes.
>
> On Wed, Oct 28, 2020 at 10:34 AM Chesnay Schepler  wrote:
>>
>> hmm..it appears as if PackagedProgramUtils#createJobGraph does some things 
>> outside the usercode classlodaer (getPipelineFromProgram()), specifically 
>> the call to the main method.
>>
>> @klou This seems like wrong behavior?
>>
>> @Flavio What you could try in the meantime is wrap the call to 
>> createJobGraph like this:
>>
>> final ClassLoader contextClassLoader = 
>> Thread.currentThread().getContextClassLoader();
>> try {
>>
>> Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
>>// do tstuff
>> } finally {
>>Thread.currentThread().setContextClassLoader(contextClassLoader);
>> }
>>
>>
>> On 10/28/2020 10:12 AM, Flavio Pompermaier wrote:
>>
>> Any help here?  How can I understand why the classes inside the jar are not 
>> found when creating the PackagedProgram?
>>
>> On Tue, Oct 27, 2020 at 11:04 AM Flavio Pompermaier  
>> wrote:
>>>
>>> In the logs I see that the jar is the classpath (I'm trying to debug the 
>>> program from the IDE)..isn'it?
>>>
>>> Classpath: [file:/tmp/job-bundle.jar]
>>> ...
>>>
>>> Best,
>>> Flavio
>>>
>>> On Tue, Oct 27, 2020 at 10:39 AM Chesnay Schepler  
>>> wrote:

 * your JobExecutor is _not_ putting it on the classpath.

 On 10/27/2020 10:36 AM, Chesnay Schepler wrote:

 Well it happens on the client before you even hit the RestClusterClient, 
 so I assume that either your jar is not packaged correctly or you your 
 JobExecutor is putting it on the classpath.

 On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:

 Sure. Here it is (org.apache.flink.client.cli.JobExecutor is my main class 
 I'm trying to use as a client towards the Flink cluster - session mode).
 it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).

 The code of getBatchEnv is:

 @Deprecated
   public static BatchEnv getBatchEnv() {
 // TODO use the following when ready to convert from/to datastream
 // return 
 getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
 ExecutionEnvironment env = 
 ExecutionEnvironment.getExecutionEnvironment();
 BatchTableEnvironment ret = BatchTableEnvironment.create(env);
 customizeEnv(ret);
 return new BatchEnv(env, ret);
   }

   private static void customizeEnv(TableEnvironment ret) {
 final Configuration conf = ret.getConfig().getConfiguration();
 // 
 conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
  2);
 conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
 conf.setString(BlobServerOptions.STORAGE_DIRECTORY, 
 FLINK_TEST_TMP_DIR);
 // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4); 
 //NOSONAR
 // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 
 0.4f);//NOSONAR
 // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 32768 * 
 2);//NOSONAR
 // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 32768 * 
 2);// NOSONAR
 conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0);// 
 NOSONAR
 conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");// NOSONAR
 conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");// NOSONAR
 conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");// NOSONAR
 conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10));// 
 NOSONAR
 final List kryoSerializers = new ArrayList<>();
 kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class, 
 JodaDateTimeSerializer.class));
 kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class, 
 

Re: adding core-site xml to flink1.11

2020-10-28 Thread Shachar Carmeli
10x

On 2020/10/27 10:42:40, Robert Metzger  wrote: 
> Hi,
> 
> it seems that this is what you have to do for now. However, I see that it
> would be nice if Flink would allow reading from multiple configuration
> files, so that you can have a "common configuration" and a "per cluster"
> configuration.
> 
> I filed a JIRA ticket for a feature request:
> https://issues.apache.org/jira/browse/FLINK-19828
> 
> 
> On Tue, Oct 27, 2020 at 10:54 AM Shachar Carmeli 
> wrote:
> 
> > Hi,
> > Thank you for your reply,
> > WE are deploying on kubernetes and the xml is part of the  common config
> > map to all flink jobs we have(or at least was for previous versions)
> >
> > This means that we need to duplicate the configuration in the
> > flink-conf.yaml for each job
> > instead of having a common configmap
> >
> > Thanks,
> > Shachar
> >
> > On 2020/10/27 08:48:17, Robert Metzger  wrote:
> > > Hi Shachar,
> > >
> > > Why do you want to use the core-site.xml to configure the file system?
> > >
> > > Since we are adding the file systems as plugins, their initialization is
> > > customized. It might be the case that we are intentionally ignoring xml
> > > configurations from the classpath.
> > > You can configure the filesystem in the flink-conf.yaml file.
> > >
> > >
> > > On Sun, Oct 25, 2020 at 7:56 AM Shachar Carmeli 
> > > wrote:
> > >
> > > > Hi,
> > > > I'm trying to define filesystem to flink 1.11 using core-site.xml
> > > > I tried adding in the flink-conf.yaml env.hadoop.conf.dir and I see it
> > is
> > > > added to the classpath
> > > > also adding environment variable HADOOP_CONF_DIR didn't help
> > > >
> > > > The flink 1.11.2 is running on docker using kubernetes
> > > >
> > > > I added hadoop using plugin as mentioned in
> > > >
> > https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/s3.html#hadooppresto-s3-file-systems-plugins
> > > >
> > > > when configure the parameters manually I can connect to the local s3a
> > > > server
> > > > So it looks like the flink is not reading the core-site.xml file
> > > >
> > > > please advise
> > > >
> > > > Thanks,
> > > > Shachar
> > > >
> > >
> >
> 


About watermarks not being generated under parallelism

2020-10-28 Thread BenChen
Hi 
all, in Flink 1.11 a WatermarkStrategy was added to handle the case where a parallel subtask with no data prevents watermarks from being triggered. Does Flink 1.10 have any mechanism to solve this problem? Thanks


| |
BenChen
|
|
haibin...@163.com
|
Signature customized by NetEase Mail Master



Fwd: Kubernetes Job Cluster, does it autoterminate?

2020-10-28 Thread Ruben Laguna
Hi,

First time user , I'm just evaluating Flink at the moment, and I was
reading 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html#deploy-job-cluster
and I don't fully understand whether a Job Cluster will autoterminate after
the job is completed (for a batch job)?

The examples look to me like the task manager pods will continue
running, as they are configured as a Deployment.

So is there any way to achieve "autotermination" or am I supposed to
monitor the job status externally (like from airflow) and delete the
JobManager and TaskManager kubernetes resources from there?

--
/Rubén Laguna


Re: RestClusterClient and classpath

2020-10-28 Thread Flavio Pompermaier
Always the same problem.

Caused by: java.lang.ClassNotFoundException: it.test.XXX
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
at
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
... 10 more

I've also tried with

flinkConf.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first");

but nothing changes.

On Wed, Oct 28, 2020 at 10:34 AM Chesnay Schepler 
wrote:

> hmm..it appears as if PackagedProgramUtils#createJobGraph does some things
> outside the usercode classlodaer (getPipelineFromProgram()), specifically
> the call to the main method.
>
> @klou This seems like wrong behavior?
>
> @Flavio What you could try in the meantime is wrap the call to
> createJobGraph like this:
>
> final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
> try {
>     Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
>     // do stuff
> } finally {
>     Thread.currentThread().setContextClassLoader(contextClassLoader);
> }
>
>
> On 10/28/2020 10:12 AM, Flavio Pompermaier wrote:
>
> Any help here?  How can I understand why the classes inside the jar are
> not found when creating the PackagedProgram?
>
> On Tue, Oct 27, 2020 at 11:04 AM Flavio Pompermaier 
> wrote:
>
>> In the logs I see that the jar is the classpath (I'm trying to debug the
>> program from the IDE)..isn'it?
>>
>> Classpath: [file:/tmp/job-bundle.jar]
>> ...
>>
>> Best,
>> Flavio
>>
>> On Tue, Oct 27, 2020 at 10:39 AM Chesnay Schepler 
>> wrote:
>>
>>> * your JobExecutor is _not_ putting it on the classpath.
>>>
>>> On 10/27/2020 10:36 AM, Chesnay Schepler wrote:
>>>
>>> Well it happens on the client before you even hit the RestClusterClient,
>>> so I assume that either your jar is not packaged correctly or you your
>>> JobExecutor is putting it on the classpath.
>>>
>>> On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:
>>>
>>> Sure. Here it is (org.apache.flink.client.cli.JobExecutor is my main
>>> class I'm trying to use as a client towards the Flink cluster - session
>>> mode).
>>> it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).
>>>
>>> The code of getBatchEnv is:
>>>
>>> @Deprecated
>>>   public static BatchEnv getBatchEnv() {
>>> // TODO use the following when ready to convert from/to datastream
>>> // return
>>> getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
>>> ExecutionEnvironment env =
>>> ExecutionEnvironment.getExecutionEnvironment();
>>> BatchTableEnvironment ret = BatchTableEnvironment.create(env);
>>> customizeEnv(ret);
>>> return new BatchEnv(env, ret);
>>>   }
>>>
>>>   private static void customizeEnv(TableEnvironment ret) {
>>> final Configuration conf = ret.getConfig().getConfiguration();
>>> //
>>> conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
>>> 2);
>>> conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
>>> conf.setString(BlobServerOptions.STORAGE_DIRECTORY,
>>> FLINK_TEST_TMP_DIR);
>>> // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4);
>>> //NOSONAR
>>> // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
>>> 0.4f);//NOSONAR
>>> // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 32768
>>> * 2);//NOSONAR
>>> // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 32768
>>> * 2);// NOSONAR
>>> conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0);//
>>> NOSONAR
>>> conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");// NOSONAR
>>> conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");// NOSONAR
>>> conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");// NOSONAR
>>> conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10));//
>>> NOSONAR
>>> final List kryoSerializers = new ArrayList<>();
>>> kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class,
>>> JodaDateTimeSerializer.class));
>>> kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class,
>>> TBaseSerializer.class));
>>> kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class,
>>> TBaseSerializer.class));
>>> conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, kryoSerializers);
>>>
>>>   }
>>>
>>> Classpath: [file:/tmp/job-bundle.jar]
>>>
>>> System.out: (none)
>>>
>>> System.err: (none)
>>> at
>>> org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245)
>>> at
>>> org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164)
>>> at
>>> 

Re: Native memory allocation (mmap) failed to map 1006567424 bytes

2020-10-28 Thread Ori Popowski
Hi Xintong,

See here:

# Top memory users
ps auxwww --sort -rss | head -10
USER   PID %CPU %MEMVSZ   RSS TTY  STAT START   TIME COMMAND
yarn 20339 35.8 97.0 128600192 126672256 ? Sl   Oct15 5975:47
/etc/alternatives/jre/bin/java -Xmx54760833024 -Xms54760833024 -XX:Max
root  5245  0.1  0.4 5580484 627436 ?  Sl   Jul30 144:39
/etc/alternatives/jre/bin/java -Xmx1024m -XX:+ExitOnOutOfMemoryError -X
hadoop5252  0.1  0.4 7376768 604772 ?  Sl   Jul30 153:22
/etc/alternatives/jre/bin/java -Xmx1024m -XX:+ExitOnOutOfMemoryError -X
yarn 26857  0.3  0.2 4214784 341464 ?  Sl   Sep17 198:43
/etc/alternatives/jre/bin/java -Dproc_nodemanager -Xmx2048m -XX:OnOutOf
root  5519  0.0  0.2 5658624 269344 ?  Sl   Jul30  45:21
/usr/bin/java -Xmx1500m -Xms300m -XX:+ExitOnOutOfMemoryError -XX:MinHea
root  1781  0.0  0.0 172644  8096 ?Ss   Jul30   2:06
/usr/lib/systemd/systemd-journald
root  4801  0.0  0.0 2690260 4776 ?Ssl  Jul30   4:42
/usr/bin/amazon-ssm-agent
root  6566  0.0  0.0 164672  4116 ?R00:30   0:00 ps auxwww
--sort -rss
root  6532  0.0  0.0 183124  3592 ?S00:30   0:00
/usr/sbin/CROND -n

On Wed, Oct 28, 2020 at 11:34 AM Xintong Song  wrote:

> Hi Ori,
>
> The error message suggests that there's not enough physical memory on the
> machine to satisfy the allocation. This does not necessarily mean a managed
> memory leak. Managed memory leak is only one of the possibilities. There
> are other potential reasons, e.g., another process/container on the machine
> used more memory than expected, Yarn NM is not configured with enough
> memory reserved for the system processes, etc.
>
> I would suggest to first look into the machine memory usages, see whether
> the Flink process indeed uses more memory than expected. This could be
> achieved via:
> - Run the `top` command
> - Look into the `/proc/meminfo` file
> - Any container memory usage metrics that are available to your Yarn
> cluster
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Oct 27, 2020 at 6:21 PM Ori Popowski  wrote:
>
>> After the job is running for 10 days in production, TaskManagers start
>> failing with:
>>
>> Connection unexpectedly closed by remote task manager
>>
>> Looking in the machine logs, I can see the following error:
>>
>> = Java processes for user hadoop =
>> OpenJDK 64-Bit Server VM warning: INFO:
>> os::commit_memory(0x7fb4f401, 1006567424, 0) failed; error='Cannot
>> allocate memory' (err
>> #
>> # There is insufficient memory for the Java Runtime Environment to
>> continue.
>> # Native memory allocation (mmap) failed to map 1006567424 bytes for
>> committing reserved memory.
>> # An error report file with more information is saved as:
>> # /mnt/tmp/hsperfdata_hadoop/hs_err_pid6585.log
>> === End java processes for user hadoop ===
>>
>> In addition, the metrics for the TaskManager show very low Heap memory
>> consumption (20% of Xmx).
>>
>> Hence, I suspect there is a memory leak in the TaskManager's Managed
>> Memory.
>>
>> This my TaskManager's memory detail:
>> flink process 112g
>> framework.heap.size 0.2g
>> task.heap.size 50g
>> managed.size 54g
>> framework.off-heap.size 0.5g
>> task.off-heap.size 1g
>> network 2g
>> XX:MaxMetaspaceSize 1g
>>
>> As you can see, the managed memory is 54g, so it's already high (my
>> managed.fraction is set to 0.5).
>>
>> I'm running Flink 1.10. Full job details attached.
>>
>> Can someone advise what would cause a managed memory leak?
>>
>>
>>


Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-28 Thread Kostas Kloudas
I think that the mailing lists is the best we can do and I would say
that they seem to be working pretty well (e.g. the recent Mesos
discussion).
Of course they are not perfect but the alternative would be to never
remove anything user facing until the next major release, which I find
pretty strict.

On Wed, Oct 28, 2020 at 10:04 AM Chesnay Schepler  wrote:
>
> If the conclusion is that we shouldn't remove it if _anyone_ is using
> it, then we cannot remove it because the user ML obviously does not
> reach all users.
>
> On 10/28/2020 9:28 AM, Kostas Kloudas wrote:
> > Hi all,
> >
> > I am bringing this up again to see if there are any users actively
> > using the BucketingSink.
> > So far, if I am not mistaken (and really sorry if I forgot anything),
> > it is only a discussion between devs about the potential problems of
> > removing it. I totally understand Chesnay's concern about not
> > providing compatibility with the StreamingFileSink (SFS) and if there
> > are any users, then we should not remove it without trying to find a
> > solution for them.
> >
> > But if there are no users then I would still propose to remove the
> > module, given that I am not aware of any efforts to provide
> > compatibility with the SFS any time soon.
> > The reasons for removing it also include the facts that we do not
> > actively maintain it and we do not add new features. As for potential
> > missing features in the SFS compared to the BucketingSink that was
> > mentioned before, I am not aware of any fundamental limitations and
> > even if there are, I would assume that the solution is not to direct
> > the users to a deprecated sink but rather try to increase the
> > functionality of the actively maintained one.
> >
> > Please keep in mind that the BucketingSink is deprecated since FLINK
> > 1.9 and there is a new File Sink that is coming as part of FLIP-143
> > [1].
> > Again, if there are any active users who cannot migrate easily, then
> > we cannot remove it before trying to provide a smooth migration path.
> >
> > Thanks,
> > Kostas
> >
> > [1] 
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
> >
> > On Fri, Oct 16, 2020 at 4:36 PM Chesnay Schepler  wrote:
> >> @Seth: Earlier in this discussion it was said that the BucketingSink
> >> would not be usable in 1.12 .
> >>
> >> On 10/16/2020 4:25 PM, Seth Wiesman wrote:
> >>> +1 It has been deprecated for some time and the StreamingFileSink has
> >>> stabilized with a large number of formats and features.
> >>>
> >>> Plus, the bucketing sink only implements a small number of stable
> >>> interfaces[1]. I would expect users to continue to use the bucketing sink
> >>> from the 1.11 release with future versions for some time.
> >>>
> >>> Seth
> >>>
> >>> https://github.com/apache/flink/blob/2ff3b771cbb091e1f43686dd8e176cea6d435501/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L170-L172
> >>>
> >>> On Thu, Oct 15, 2020 at 2:57 PM Kostas Kloudas  wrote:
> >>>
>  @Arvid Heise I also do not remember exactly what were all the
>  problems. The fact that we added some more bulk formats to the
>  streaming file sink definitely reduced the non-supported features. In
>  addition, the latest discussion I found on the topic was [1] and the
>  conclusion of that discussion seems to be to remove it.
> 
>  Currently, I cannot find any obvious reason why keeping the
>  BucketingSink, apart from the fact that we do not have a migration
>  plan unfortunately. This is why I posted this to dev@ and user@.
> 
>  Cheers,
>  Kostas
> 
>  [1]
>  https://lists.apache.org/thread.html/r799be74658bc7e169238cc8c1e479e961a9e85ccea19089290940ff0%40%3Cdev.flink.apache.org%3E
> 
>  On Wed, Oct 14, 2020 at 8:03 AM Arvid Heise  wrote:
> > I remember this conversation popping up a few times already and I'm in
> > general a big fan of removing BucketingSink.
> >
> > However, until now there were a few features lacking in 
> > StreamingFileSink
> > that are present in BucketingSink and that are being actively used (I
>  can't
> > exactly remember them now, but I can look it up if everyone else is also
> > suffering from bad memory). Did we manage to add them in the meantime? 
> > If
> > not, then it feels rushed to remove it at this point.
> >
> > On Tue, Oct 13, 2020 at 2:33 PM Kostas Kloudas 
>  wrote:
> >> @Chesnay Schepler  Off the top of my head, I cannot find an easy way
> >> to migrate from the BucketingSink to the StreamingFileSink. It may be
> >> possible but it will require some effort because the logic would be
> >> "read the old state, commit it, and start fresh with the
> >> StreamingFileSink."
> >>
> >> On Tue, Oct 13, 2020 at 2:09 PM Aljoscha Krettek 
> >> wrote:
> >>> On 13.10.20 14:01, David Anderson wrote:
>  

Re: Native memory allocation (mmap) failed to map 1006567424 bytes

2020-10-28 Thread Xintong Song
Hi Ori,

The error message suggests that there's not enough physical memory on the
machine to satisfy the allocation. This does not necessarily mean a managed
memory leak. Managed memory leak is only one of the possibilities. There
are other potential reasons, e.g., another process/container on the machine
used more memory than expected, Yarn NM is not configured with enough
memory reserved for the system processes, etc.

I would suggest to first look into the machine memory usages, see whether
the Flink process indeed uses more memory than expected. This could be
achieved via:
- Run the `top` command
- Look into the `/proc/meminfo` file
- Any container memory usage metrics that are available to your Yarn cluster

Thank you~

Xintong Song



On Tue, Oct 27, 2020 at 6:21 PM Ori Popowski  wrote:

> After the job is running for 10 days in production, TaskManagers start
> failing with:
>
> Connection unexpectedly closed by remote task manager
>
> Looking in the machine logs, I can see the following error:
>
> = Java processes for user hadoop =
> OpenJDK 64-Bit Server VM warning: INFO:
> os::commit_memory(0x7fb4f401, 1006567424, 0) failed; error='Cannot
> allocate memory' (err
> #
> # There is insufficient memory for the Java Runtime Environment to
> continue.
> # Native memory allocation (mmap) failed to map 1006567424 bytes for
> committing reserved memory.
> # An error report file with more information is saved as:
> # /mnt/tmp/hsperfdata_hadoop/hs_err_pid6585.log
> === End java processes for user hadoop ===
>
> In addition, the metrics for the TaskManager show very low Heap memory
> consumption (20% of Xmx).
>
> Hence, I suspect there is a memory leak in the TaskManager's Managed
> Memory.
>
> This my TaskManager's memory detail:
> flink process 112g
> framework.heap.size 0.2g
> task.heap.size 50g
> managed.size 54g
> framework.off-heap.size 0.5g
> task.off-heap.size 1g
> network 2g
> XX:MaxMetaspaceSize 1g
>
> As you can see, the managed memory is 54g, so it's already high (my
> managed.fraction is set to 0.5).
>
> I'm running Flink 1.10. Full job details attached.
>
> Can someone advise what would cause a managed memory leak?
>
>
>


Re: RestClusterClient and classpath

2020-10-28 Thread Chesnay Schepler
hmm... it appears as if PackagedProgramUtils#createJobGraph does some 
things outside the usercode classloader (getPipelineFromProgram()), 
specifically the call to the main method.


@klou This seems like wrong behavior?

@Flavio What you could try in the meantime is wrap the call to 
createJobGraph like this:


final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
    Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
    // do stuff
} finally {
    Thread.currentThread().setContextClassLoader(contextClassLoader);
}
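Applied to the call in question, the wrap could look roughly like this (a sketch only, reusing the packagedProgram and flinkConf variables from this thread; the parallelism value is an assumption):

final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
final JobGraph jobGraph;
try {
    Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
    // build the JobGraph while the user-code classloader is the context classloader
    jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, flinkConf, 1, false);
} finally {
    Thread.currentThread().setContextClassLoader(contextClassLoader);
}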


On 10/28/2020 10:12 AM, Flavio Pompermaier wrote:
Any help here?  How can I understand why the classes inside the jar 
are not found when creating the PackagedProgram?


On Tue, Oct 27, 2020 at 11:04 AM Flavio Pompermaier 
mailto:pomperma...@okkam.it>> wrote:


In the logs I see that the jar is the classpath (I'm trying to
debug the program from the IDE)..isn'it?

Classpath: [file:/tmp/job-bundle.jar]
...

Best,
Flavio

On Tue, Oct 27, 2020 at 10:39 AM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

* your JobExecutor is _not_ putting it on the classpath.

On 10/27/2020 10:36 AM, Chesnay Schepler wrote:

Well it happens on the client before you even hit the
RestClusterClient, so I assume that either your jar is not
packaged correctly or you your JobExecutor is putting it on
the classpath.

On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:

Sure. Here it is (org.apache.flink.client.cli.JobExecutor is
my main class I'm trying to use as a client towards the
Flink cluster - session mode).
it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).

The code of getBatchEnv is:

@Deprecated
  public static BatchEnv getBatchEnv() {
    // TODO use the following when ready to convert from/to
datastream
    // return
getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
    ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment ret =
BatchTableEnvironment.create(env);
    customizeEnv(ret);
    return new BatchEnv(env, ret);
  }

  private static void customizeEnv(TableEnvironment ret) {
    final Configuration conf =
ret.getConfig().getConfiguration();
    //

conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
2);
    conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
conf.setString(BlobServerOptions.STORAGE_DIRECTORY,
FLINK_TEST_TMP_DIR);
    //
conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
4); //NOSONAR
    //
conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
0.4f);//NOSONAR
    //
conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX,
32768 * 2);//NOSONAR
    //
conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT,
32768 * 2);// NOSONAR
conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT,
0);// NOSONAR
conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");// NOSONAR
conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");// NOSONAR
conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");// NOSONAR
    conf.set(ClientOptions.CLIENT_TIMEOUT,
Duration.ofMinutes(10));// NOSONAR
    final List kryoSerializers = new ArrayList<>();
kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class,
JodaDateTimeSerializer.class));
kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class,
TBaseSerializer.class));
kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class,
TBaseSerializer.class));
conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS,
kryoSerializers);

  }

Classpath: [file:/tmp/job-bundle.jar]

System.out: (none)

System.err: (none)
at

org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245)
at

org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164)
at

org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
at

org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109)
at
org.apache.flink.client.cli.JobExecutor.main(JobExecutor.java:42)
Caused by: java.lang.NoClassDefFoundError: it/test/MyOb
at

it.okkam.datalinks.flink.DatalinksExecutionEnvironment.customizeEnv(DatalinksExecutionEnvironment.java:116)
at


Tumbling window updates lag longer and longer as the job runs, sliding windows do not

2020-10-28 Thread marble.zh...@coinflex.com.INVALID
Hi everyone.

I am using a tumbling window:
ds.keyBy(CandleView::getMarketCode)
        .timeWindow(Time.minutes(5L))
        .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
        .aggregate(new OhlcAggregateFunction(), new OhlcWindowFunction())
        .addSink(new PgSink(jdbcUrl, userName, password, candle_table_5m))
        .name(candle_table_5m);

Sliding Window:

ds.keyBy(CandleView::getMarketCode)
        .timeWindow(Time.hours(24L), Time.seconds(2))
        .aggregate(new OhlcAggregateFunction(), new TickerWindowFunction())
        .addSink(new PgSink(jdbcUrl, userName, password, candle_table_24h))
        .name(candle_table_24h);

One is a 5-minute window and the other is a 24-hour sliding window. The 24-hour window is always updated to the latest time, but the 5-minute one lags further and further behind: with the job running for less than 2 hours, it is already almost 20 minutes behind, i.e. nearly 4 windows.
Both are based on the same dataStream.

Are there any suggestions, or did I use something incorrectly? Thanks



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Passing a variable via an operator's constructor does not take effect

2020-10-28 Thread freeza1...@outlook.com
hi all:
I defined a flatMap and passed an int variable through its constructor. At the outermost level I defined two streams, and when defining the streams this variable was passed in via .flatMap(int).
There are currently two different flatMaps whose constructors were given two different values for this int.
When data flows through these two operators, I find that the int variable does not change. How should I pass a variable to an operator?
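A minimal sketch of the setup described above (the class name MyFlatMap and the field name are assumptions): the int is captured when the function object is constructed and serialized together with it, so each .flatMap(new MyFlatMap(n)) call carries its own copy of the value.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.util.Collector;

public class MyFlatMap extends RichFlatMapFunction<String, String> {

    private final int tag; // set once in the constructor, shipped with the serialized function

    public MyFlatMap(int tag) {
        this.tag = tag;
    }

    @Override
    public void flatMap(String value, Collector<String> out) {
        out.collect(tag + ":" + value);
    }
}

// At the outermost level, each stream gets its own instance with its own value, e.g.:
// streamA.flatMap(new MyFlatMap(1));
// streamB.flatMap(new MyFlatMap(2));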



freeza1...@outlook.com


Re: RestClusterClient and classpath

2020-10-28 Thread Flavio Pompermaier
Any help here?  How can I understand why the classes inside the jar are not
found when creating the PackagedProgram?
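For context, a self-contained sketch of the client-side flow under discussion (the entry-point class name and parallelism are assumptions, not taken from this thread):

import java.io.File;

import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;

public class JobGraphSketch {
    public static void main(String[] args) throws Exception {
        Configuration flinkConf = new Configuration();

        // build a PackagedProgram from the fat jar that contains the user classes
        PackagedProgram program = PackagedProgram.newBuilder()
                .setJarFile(new File("/tmp/job-bundle.jar"))
                .setEntryPointClassName("com.example.MyJobMain") // assumed entry point
                .build();

        // createJobGraph runs the program's main method to build the pipeline,
        // so the user classes must be resolvable at this point
        JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, flinkConf, 1, false);
        System.out.println("Generated JobGraph: " + jobGraph.getJobID());
    }
}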

On Tue, Oct 27, 2020 at 11:04 AM Flavio Pompermaier 
wrote:

> In the logs I see that the jar is the classpath (I'm trying to debug the
> program from the IDE)..isn'it?
>
> Classpath: [file:/tmp/job-bundle.jar]
> ...
>
> Best,
> Flavio
>
> On Tue, Oct 27, 2020 at 10:39 AM Chesnay Schepler 
> wrote:
>
>> * your JobExecutor is _not_ putting it on the classpath.
>>
>> On 10/27/2020 10:36 AM, Chesnay Schepler wrote:
>>
>> Well it happens on the client before you even hit the RestClusterClient,
>> so I assume that either your jar is not packaged correctly or you your
>> JobExecutor is putting it on the classpath.
>>
>> On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:
>>
>> Sure. Here it is (org.apache.flink.client.cli.JobExecutor is my main
>> class I'm trying to use as a client towards the Flink cluster - session
>> mode).
>> it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).
>>
>> The code of getBatchEnv is:
>>
>> @Deprecated
>>   public static BatchEnv getBatchEnv() {
>> // TODO use the following when ready to convert from/to datastream
>> // return
>> getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
>> ExecutionEnvironment env =
>> ExecutionEnvironment.getExecutionEnvironment();
>> BatchTableEnvironment ret = BatchTableEnvironment.create(env);
>> customizeEnv(ret);
>> return new BatchEnv(env, ret);
>>   }
>>
>>   private static void customizeEnv(TableEnvironment ret) {
>> final Configuration conf = ret.getConfig().getConfiguration();
>> //
>> conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
>> 2);
>> conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
>> conf.setString(BlobServerOptions.STORAGE_DIRECTORY,
>> FLINK_TEST_TMP_DIR);
>> // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4);
>> //NOSONAR
>> // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
>> 0.4f);//NOSONAR
>> // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 32768
>> * 2);//NOSONAR
>> // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 32768 *
>> 2);// NOSONAR
>> conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0);//
>> NOSONAR
>> conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");// NOSONAR
>> conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");// NOSONAR
>> conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");// NOSONAR
>> conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10));//
>> NOSONAR
>> final List kryoSerializers = new ArrayList<>();
>> kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class,
>> JodaDateTimeSerializer.class));
>> kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class,
>> TBaseSerializer.class));
>> kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class,
>> TBaseSerializer.class));
>> conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, kryoSerializers);
>>
>>   }
>>
>> Classpath: [file:/tmp/job-bundle.jar]
>>
>> System.out: (none)
>>
>> System.err: (none)
>> at
>> org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245)
>> at
>> org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164)
>> at
>> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
>> at
>> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109)
>> at org.apache.flink.client.cli.JobExecutor.main(JobExecutor.java:42)
>> Caused by: java.lang.NoClassDefFoundError: it/test/MyOb
>> at
>> it.okkam.datalinks.flink.DatalinksExecutionEnvironment.customizeEnv(DatalinksExecutionEnvironment.java:116)
>> at
>> it.okkam.datalinks.flink.DatalinksExecutionEnvironment.getBatchEnv(DatalinksExecutionEnvironment.java:95)
>> at
>> it.okkam.datalinks.flink.jobs.EnsReconciliator.main(EnsReconciliator.java:73)
>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>> Method)
>> at
>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>> at
>> org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)
>> ... 3 more
>> Caused by: java.lang.ClassNotFoundException: it/test/MyOb
>> at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>> at
>> 

Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-28 Thread Chesnay Schepler
If the conclusion is that we shouldn't remove it if _anyone_ is using 
it, then we cannot remove it because the user ML obviously does not 
reach all users.


On 10/28/2020 9:28 AM, Kostas Kloudas wrote:

Hi all,

I am bringing this up again to see if there are any users actively
using the BucketingSink.
So far, if I am not mistaken (and really sorry if I forgot anything),
it is only a discussion between devs about the potential problems of
removing it. I totally understand Chesnay's concern about not
providing compatibility with the StreamingFileSink (SFS) and if there
are any users, then we should not remove it without trying to find a
solution for them.

But if there are no users then I would still propose to remove the
module, given that I am not aware of any efforts to provide
compatibility with the SFS any time soon.
The reasons for removing it also include the facts that we do not
actively maintain it and we do not add new features. As for potential
missing features in the SFS compared to the BucketingSink that was
mentioned before, I am not aware of any fundamental limitations and
even if there are, I would assume that the solution is not to direct
the users to a deprecated sink but rather try to increase the
functionality of the actively maintained one.

Please keep in mind that the BucketingSink is deprecated since FLINK
1.9 and there is a new File Sink that is coming as part of FLIP-143
[1].
Again, if there are any active users who cannot migrate easily, then
we cannot remove it before trying to provide a smooth migration path.

Thanks,
Kostas

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API

On Fri, Oct 16, 2020 at 4:36 PM Chesnay Schepler  wrote:

@Seth: Earlier in this discussion it was said that the BucketingSink
would not be usable in 1.12 .

On 10/16/2020 4:25 PM, Seth Wiesman wrote:

+1 It has been deprecated for some time and the StreamingFileSink has
stabilized with a large number of formats and features.

Plus, the bucketing sink only implements a small number of stable
interfaces[1]. I would expect users to continue to use the bucketing sink
from the 1.11 release with future versions for some time.

Seth

https://github.com/apache/flink/blob/2ff3b771cbb091e1f43686dd8e176cea6d435501/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L170-L172

On Thu, Oct 15, 2020 at 2:57 PM Kostas Kloudas  wrote:


@Arvid Heise I also do not remember exactly what were all the
problems. The fact that we added some more bulk formats to the
streaming file sink definitely reduced the non-supported features. In
addition, the latest discussion I found on the topic was [1] and the
conclusion of that discussion seems to be to remove it.

Currently, I cannot find any obvious reason why keeping the
BucketingSink, apart from the fact that we do not have a migration
plan unfortunately. This is why I posted this to dev@ and user@.

Cheers,
Kostas

[1]
https://lists.apache.org/thread.html/r799be74658bc7e169238cc8c1e479e961a9e85ccea19089290940ff0%40%3Cdev.flink.apache.org%3E

On Wed, Oct 14, 2020 at 8:03 AM Arvid Heise  wrote:

I remember this conversation popping up a few times already and I'm in
general a big fan of removing BucketingSink.

However, until now there were a few features lacking in StreamingFileSink
that are present in BucketingSink and that are being actively used (I

can't

exactly remember them now, but I can look it up if everyone else is also
suffering from bad memory). Did we manage to add them in the meantime? If
not, then it feels rushed to remove it at this point.

On Tue, Oct 13, 2020 at 2:33 PM Kostas Kloudas 

wrote:

@Chesnay Schepler  Off the top of my head, I cannot find an easy way
to migrate from the BucketingSink to the StreamingFileSink. It may be
possible but it will require some effort because the logic would be
"read the old state, commit it, and start fresh with the
StreamingFileSink."

On Tue, Oct 13, 2020 at 2:09 PM Aljoscha Krettek 
wrote:

On 13.10.20 14:01, David Anderson wrote:

I thought this was waiting on FLIP-46 -- Graceful Shutdown Handling --
and in fact, the StreamingFileSink is mentioned in that FLIP as a
motivating use case.

Ah yes, I see FLIP-147 as a more general replacement for FLIP-46. Thanks
for the reminder, we should close FLIP-46 now with an explanatory
message to avoid confusion.

--

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng






Re: flink1.11 log shipping

2020-10-28 Thread m13162790856
We also collect and ship logs this way. ES only keeps the most recent month of data; we do not keep the full history.


On 2020-10-27 20:48, zhisheng wrote:


A quick question: roughly how many jobs does your cluster run? Users may print raw records into the logs, so the data volume really is quite large -- how much does it cost per month to ship all the logs to ES?
Storm☀️ wrote on Tue, Oct 27, 2020 at 8:37 PM: > We also use a Kafka appender to ship the logs and then provide log search in ES. > > > > -- > Sent from: 
http://apache-flink.147419.n8.nabble.com/ >

the remote task manager was lost

2020-10-28 Thread guanxianchun
Flink version: flink-1.11
taskmanager memory: 8G
jobmanager memory: 2G
akka.ask.timeout: 20s
akka.retry-gate-closed-for: 5000
client.timeout: 600s

After running for a while, the job reports "the remote task manager was lost". The error message is as follows:
2020-10-28 00:25:30,608 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed
checkpoint 411 for job 031e5f122711786fcc11ee6eb47291fa (2703770 bytes in
336 ms).
2020-10-28 00:27:30,273 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering
checkpoint 412 (type=CHECKPOINT) @ 1603816050239 for job
031e5f122711786fcc11ee6eb47291fa.
2020-10-28 00:27:30,776 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed
checkpoint 412 for job 031e5f122711786fcc11ee6eb47291fa (3466688 bytes in
509 ms).
2020-10-28 00:29:30,246 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering
checkpoint 413 (type=CHECKPOINT) @ 1603816170239 for job
031e5f122711786fcc11ee6eb47291fa.
2020-10-28 00:29:30,597 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed
checkpoint 413 for job 031e5f122711786fcc11ee6eb47291fa (2752681 bytes in
334 ms).
2020-10-28 00:29:47,353 WARN  akka.remote.ReliableDeliverySupervisor
  
[] - Association with remote system
[akka.tcp://fl...@hadoop01.dev.test.cn:13912] has failed, address is now
gated for [5000] ms. Reason: [Disassociated] 
2020-10-28 00:29:47,353 WARN  akka.remote.ReliableDeliverySupervisor
  
[] - Association with remote system
[akka.tcp://flink-metr...@hadoop01.dev.test.cn:31260] has failed, address is
now gated for [5000] ms. Reason: [Disassociated] 
2020-10-28 00:29:47,377 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] -
KeyedProcess -> async wait operator -> Map (1/3)
(f84731e57528b326ad15ddc17821d1b8) switched from RUNNING to FAILED on
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@538198b8.
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
Connection unexpectedly closed by remote task manager
'hadoop01.dev.test.cn/192.168.1.21:7527'. This might indicate that the
remote task manager was lost.
at
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:144)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:97)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1416)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:912)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:816)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at

Re: [BULK]Re: [SURVEY] Remove Mesos support

2020-10-28 Thread Till Rohrmann
Hi Oleksandr,

yes you are right. The biggest problem is at the moment the lack of test
coverage and thereby confidence to make changes. We have some e2e tests
which you can find here [1]. These tests are, however, quite coarse grained
and are missing a lot of cases. One idea would be to add a Mesos e2e test
based on Flink's end-to-end test framework [2]. I think what needs to be
done there is to add a Mesos resource and a way to submit jobs to a Mesos
cluster to write e2e tests.

[1] https://github.com/apache/flink/tree/master/flink-jepsen
[2]
https://github.com/apache/flink/tree/master/flink-end-to-end-tests/flink-end-to-end-tests-common

Cheers,
Till

On Tue, Oct 27, 2020 at 12:29 PM Oleksandr Nitavskyi 
wrote:

> Hello Xintong,
>
> Thanks for the insights and support.
>
> Browsing the Mesos backlog, we didn't identify anything critical that is
> left there.
>
> I see that there were quite a lot of contributions to Flink on Mesos
> in the recent versions:
> https://github.com/apache/flink/commits/master/flink-mesos.
> We plan to validate the current Flink master (or release 1.12 branch) on our
> Mesos setup. In case of any issues, we will try to propose changes.
> My feeling is that our test results shouldn't affect the Flink 1.12
> release cycle. And if any resulting commits land in 1.12.1, it
> should be totally fine.
>
> In the future, we would be glad to help you guys with any
> maintenance-related questions. One of the highest priorities around this
> component seems to be the development of the full e2e test.
>
> Kind Regards
> Oleksandr Nitavskyi
> 
> From: Xintong Song 
> Sent: Tuesday, October 27, 2020 7:14 AM
> To: dev ; user 
> Cc: Piyush Narang 
> Subject: [BULK]Re: [SURVEY] Remove Mesos support
>
> Hi Piyush,
>
> Thanks a lot for sharing the information. It would be a great relief that
> you are good with Flink on Mesos as is.
>
> As for the jira issues, I believe the most essential ones should have
> already been resolved. You may find some remaining open issues here [1],
> but not all of them are necessary if we decide to keep Flink on Mesos as is.
>
> At the moment and in the short future, I think helps are mostly needed on
> testing the upcoming release 1.12 with Mesos use cases. The community is
> currently actively preparing the new release, and hopefully we could come
> up with a release candidate early next month. It would be greatly
> appreciated if your folks, as experienced Flink on Mesos users, can help with
> verifying the release candidates.
>
>
> Thank you~
>
> Xintong Song
>
> [1]
> https://issues.apache.org/jira/browse/FLINK-17402?jql=project%20%3D%20FLINK%20AND%20component%20%3D%20%22Deployment%20%2F%20Mesos%22%20AND%20status%20%3D%20Open
>
> On Tue, Oct 27, 2020 at 2:58 AM Piyush Narang  p.nar...@criteo.com>> wrote:
>
> Hi Xintong,
>
>
>
> Do you have any jiras that cover any of the items on 1 or 2? I can reach
> out to folks internally and see if I can get some folks to commit to
> helping out.
>
>
>
> To cover the other qs:
>
>   *   Yes, we’ve not got a plan at the moment to get off Mesos. We use
> Yarn for some our Flink workloads when we can. Mesos is only used when we
> need streaming capabilities in our WW dcs (as our Yarn is centralized in
> one DC)
>   *   We’re currently on Flink 1.9 (old planner). We have a plan to bump
> to 1.11 / 1.12 this quarter.
>   *   We typically upgrade once every 6 months to a year (not every
> release). We’d like to speed up the cadence but we’re not there yet.
>   *   We’d largely be good with keeping Flink on Mesos as-is and
> functional while missing out on some of the newer features. We understand
> the pain on the communities side and we can take on the work if we see some
> fancy improvement in Flink on Yarn / K8s that we want in Mesos to put in
> the request to port it over.
>
>
>
> Thanks,
>
>
>
> -- Piyush
>
>
>
>
>
> From: Xintong Song 
> Date: Sunday, October 25, 2020 at 10:57 PM
> To: dev , user 
> Cc: Lasse Nedergaard , Piyush Narang 
> Subject: Re: [SURVEY] Remove Mesos support
>
>
>
> Thanks for sharing the information with us, Piyush an Lasse.
>
>
>
> @Piyush
>
>
>
> Thanks for offering the help. IMO, there are currently several problems
> that make supporting Flink on Mesos 

Re: flink hive Streaming查询不到数据的问题

2020-10-28 Thread Jingsong Li
Hi,

你的Source看起来并没有产出watermark,所以:
你可以考虑使得Source产出正确的watermark,或者使用'sink.partition-commit.trigger'的默认值proc-time。

Best,
Jingsong

On Wed, Oct 28, 2020 at 4:13 PM hdxg1101300...@163.com <
hdxg1101300...@163.com> wrote:

> 你好:
> 我现在在使用flink 1.11.2版本 hive1.1.0 版本;
> 当我在使用flink hive streaming的使用发现按照 示例写数据到hive
> 可以看到指定目录下已经生产了文件,但是使用hive查询没有数据;
> 好像是分区信息没有提交到hive  meta store;但是官网已经说实现了这个功能;我操作却不行
> 下面是我的代码
>  object StreamMain {
>   def main(args: Array[String]): Unit = {
> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> streamEnv.setParallelism(3)
>
> val tableEnvSettings = EnvironmentSettings.newInstance()
>   .useBlinkPlanner()
>   .inStreamingMode()
>   .build()
>
> val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)
>
> 
> tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
>  CheckpointingMode.EXACTLY_ONCE)
>
> 
> tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
>  Duration.ofSeconds(20))
>
> val dataStream = streamEnv.addSource(new MySource)
>
> val catalogName = "my_catalog"
> val catalog = new HiveCatalog(
>   catalogName,  // catalog name
>   "yutest",// default database
>
>   "D:\\IdeaProjects\\dc\\dc_hive\\src\\main\\resources\\",  // Hive 
> config (hive-site.xml) directory
>   "1.1.0"   // Hive version
> )
> tableEnv.registerCatalog(catalogName, catalog)
> tableEnv.useCatalog(catalogName)
>
> tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
> tableEnv.useDatabase("yutest")
>
>
> tableEnv.createTemporaryView("users", dataStream)
> tableEnv.executeSql("DROP TABLE IF EXISTS  fs_table ")
> //  如果hive中已经存在了相应的表,则这段代码省略
> val hiveSql = """CREATE external TABLE fs_table (
> user_id STRING,
> order_amount DOUBLE
>   )
>   partitioned by(
>   dt string,
>   h string,
>   m string) stored as parquet
>   TBLPROPERTIES (
>
> 'partition.time-extractor.timestamp-pattern'='$dt 
> $h:$m:00',
> 'sink.partition-commit.delay'='0s',
> 'sink.partition-commit.trigger'='partition-time',
>
> 
> 'sink.partition-commit.policy.kind'='metastore,success-file'
>   )""".stripMargin
> tableEnv.executeSql(hiveSql)
>
>
> val insertSql = "insert into  fs_table SELECT userId, amount, " + " 
> DATE_FORMAT(ts, '-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') 
> FROM users"
> tableEnv.executeSql(insertSql)
>   }
> }
> public class MySource implements SourceFunction {
> private volatile boolean run = true;
> String userids[] = {
>
> "4760858d-2bec-483c-a535-291de04b2247", 
> "67088699-d4f4-43f2-913c-481bff8a2dc5",
>
> "72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb", 
> "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
>
> "aabbaa50-72f4-495c-b3a1-70383ee9d6a4", 
> "3218bbb9-5874-4d37-a82d-3e35e52d1702",
>
> "3ebfb9602ac07779||3ebfe9612a007979", 
> "aec20d52-c2eb-4436-b121-c29ad4097f6c",
>
> "e7e896cd939685d7||e7e8e6c1930689d7", 
> "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"
> };
>
> @Override
>
> public void run(SourceFunction.SourceContext sourceContext) 
> throws Exception {
>
> while (run) {
>
> String userid = userids[(int) (Math.random() * (userids.length - 
> 1))];
> UserInfo userInfo = new UserInfo();
> userInfo.setUserId(userid);
> userInfo.setAmount(Math.random() * 100);
> userInfo.setTs(new Timestamp(System.currentTimeMillis()));
> sourceContext.collect(userInfo);
> Thread.sleep(100);
> }
> }
>
> @Override
> public void cancel() {
> run = false;
> }
> }
> public class UserInfo implements Serializable {
> private String userId;
> private Double amount;
> private Timestamp ts;
>
> public String getUserId() {
> return userId;
> }
>
> public void setUserId(String userId) {
> this.userId = userId;
> }
>
> public Double getAmount() {
> return amount;
> }
>
> public void setAmount(Double amount) {
> this.amount = amount;
> }
>
> public Timestamp getTs() {
> return ts;
> }
>
> public void setTs(Timestamp ts) {
> this.ts = ts;
> }
> }
>
> hive (yutest)>
>  >
>  > show partitions fs_table;
> OK
> partition
> Time taken: 20.214 seconds
>
> --
> hdxg1101300...@163.com
>


-- 
Best, Jingsong Lee


Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-28 Thread Kostas Kloudas
Hi all,

I am bringing this up again to see if there are any users actively
using the BucketingSink.
So far, if I am not mistaken (and really sorry if I forgot anything),
it is only a discussion between devs about the potential problems of
removing it. I totally understand Chesnay's concern about not
providing compatibility with the StreamingFileSink (SFS) and if there
are any users, then we should not remove it without trying to find a
solution for them.

But if there are no users then I would still propose to remove the
module, given that I am not aware of any efforts to provide
compatibility with the SFS any time soon.
The reasons for removing it also include the facts that we do not
actively maintain it and we do not add new features. As for potential
missing features in the SFS compared to the BucketingSink that was
mentioned before, I am not aware of any fundamental limitations and
even if there are, I would assume that the solution is not to direct
the users to a deprecated sink but rather try to increase the
functionality of the actively maintained one.

Please keep in mind that the BucketingSink is deprecated since FLINK
1.9 and there is a new File Sink that is coming as part of FLIP-143
[1].
Again, if there are any active users who cannot migrate easily, then
we cannot remove it before trying to provide a smooth migration path.

Thanks,
Kostas

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API

On Fri, Oct 16, 2020 at 4:36 PM Chesnay Schepler  wrote:
>
> @Seth: Earlier in this discussion it was said that the BucketingSink
> would not be usable in 1.12 .
>
> On 10/16/2020 4:25 PM, Seth Wiesman wrote:
> > +1 It has been deprecated for some time and the StreamingFileSink has
> > stabilized with a large number of formats and features.
> >
> > Plus, the bucketing sink only implements a small number of stable
> > interfaces[1]. I would expect users to continue to use the bucketing sink
> > from the 1.11 release with future versions for some time.
> >
> > Seth
> >
> > https://github.com/apache/flink/blob/2ff3b771cbb091e1f43686dd8e176cea6d435501/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L170-L172
> >
> > On Thu, Oct 15, 2020 at 2:57 PM Kostas Kloudas  wrote:
> >
> >> @Arvid Heise I also do not remember exactly what were all the
> >> problems. The fact that we added some more bulk formats to the
> >> streaming file sink definitely reduced the non-supported features. In
> >> addition, the latest discussion I found on the topic was [1] and the
> >> conclusion of that discussion seems to be to remove it.
> >>
> >> Currently, I cannot find any obvious reason why keeping the
> >> BucketingSink, apart from the fact that we do not have a migration
> >> plan unfortunately. This is why I posted this to dev@ and user@.
> >>
> >> Cheers,
> >> Kostas
> >>
> >> [1]
> >> https://lists.apache.org/thread.html/r799be74658bc7e169238cc8c1e479e961a9e85ccea19089290940ff0%40%3Cdev.flink.apache.org%3E
> >>
> >> On Wed, Oct 14, 2020 at 8:03 AM Arvid Heise  wrote:
> >>> I remember this conversation popping up a few times already and I'm in
> >>> general a big fan of removing BucketingSink.
> >>>
> >>> However, until now there were a few features lacking in StreamingFileSink
> >>> that are present in BucketingSink and that are being actively used (I
> >> can't
> >>> exactly remember them now, but I can look it up if everyone else is also
> >>> suffering from bad memory). Did we manage to add them in the meantime? If
> >>> not, then it feels rushed to remove it at this point.
> >>>
> >>> On Tue, Oct 13, 2020 at 2:33 PM Kostas Kloudas 
> >> wrote:
>  @Chesnay Schepler  Off the top of my head, I cannot find an easy way
>  to migrate from the BucketingSink to the StreamingFileSink. It may be
>  possible but it will require some effort because the logic would be
>  "read the old state, commit it, and start fresh with the
>  StreamingFileSink."
> 
>  On Tue, Oct 13, 2020 at 2:09 PM Aljoscha Krettek 
>  wrote:
> > On 13.10.20 14:01, David Anderson wrote:
> >> I thought this was waiting on FLIP-46 -- Graceful Shutdown Handling --
> >> and in fact, the StreamingFileSink is mentioned in that FLIP as a
> >> motivating use case.
> > Ah yes, I see FLIP-147 as a more general replacement for FLIP-46. Thanks
> > for the reminder, we should close FLIP-46 now with an explanatory
> > message to avoid confusion.
> >>>
> >>> --
> >>>
> >>> Arvid Heise | Senior Java Developer
> >>>
> >>> 
> >>>
> >>> Follow us @VervericaData
> >>>
> >>> --
> >>>
> >>> Join Flink Forward  - The Apache Flink
> >>> Conference
> >>>
> >>> Stream Processing | Event Driven | Real Time
> >>>
> >>> --
> >>>
> >>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> >>>
> >>> --
> >>> Ververica GmbH
> >>> 

flink hive Streaming查询不到数据的问题

2020-10-28 Thread hdxg1101300...@163.com
你好:
我现在在使用flink 1.11.2版本 hive1.1.0 版本;
当我在使用flink hive streaming的使用发现按照 示例写数据到hive 可以看到指定目录下已经生产了文件,但是使用hive查询没有数据;
好像是分区信息没有提交到hive  meta store;但是官网已经说实现了这个功能;我操作却不行
下面是我的代码
 object StreamMain {
  def main(args: Array[String]): Unit = {
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamEnv.setParallelism(3)

val tableEnvSettings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()
val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)

tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
 CheckpointingMode.EXACTLY_ONCE)

tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
 Duration.ofSeconds(20))

val dataStream = streamEnv.addSource(new MySource)

val catalogName = "my_catalog"
val catalog = new HiveCatalog(
  catalogName,  // catalog name
  "yutest",// default database
  "D:\\IdeaProjects\\dc\\dc_hive\\src\\main\\resources\\",  // Hive config 
(hive-site.xml) directory
  "1.1.0"   // Hive version
)
tableEnv.registerCatalog(catalogName, catalog)
tableEnv.useCatalog(catalogName)

tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
tableEnv.useDatabase("yutest")


tableEnv.createTemporaryView("users", dataStream)
tableEnv.executeSql("DROP TABLE IF EXISTS  fs_table ")
//  如果hive中已经存在了相应的表,则这段代码省略
val hiveSql = """CREATE external TABLE fs_table (
user_id STRING,
order_amount DOUBLE
  )
  partitioned by(
  dt string,
  h string,
  m string) stored as parquet
  TBLPROPERTIES (
'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00',
'sink.partition-commit.delay'='0s',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.policy.kind'='metastore,success-file'
  )""".stripMargin
tableEnv.executeSql(hiveSql)

val insertSql = "insert into  fs_table SELECT userId, amount, " + " 
DATE_FORMAT(ts, '-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') 
FROM users"
tableEnv.executeSql(insertSql)
  }
}
public class MySource implements SourceFunction {
private volatile boolean run = true;
String userids[] = {
"4760858d-2bec-483c-a535-291de04b2247", 
"67088699-d4f4-43f2-913c-481bff8a2dc5",
"72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb", 
"dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
"aabbaa50-72f4-495c-b3a1-70383ee9d6a4", 
"3218bbb9-5874-4d37-a82d-3e35e52d1702",
"3ebfb9602ac07779||3ebfe9612a007979", 
"aec20d52-c2eb-4436-b121-c29ad4097f6c",
"e7e896cd939685d7||e7e8e6c1930689d7", 
"a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"
};

@Override
public void run(SourceFunction.SourceContext sourceContext) 
throws Exception {

while (run) {
String userid = userids[(int) (Math.random() * (userids.length - 
1))];
UserInfo userInfo = new UserInfo();
userInfo.setUserId(userid);
userInfo.setAmount(Math.random() * 100);
userInfo.setTs(new Timestamp(System.currentTimeMillis()));
sourceContext.collect(userInfo);
Thread.sleep(100);
}
}

@Override
public void cancel() {
run = false;
}
}
public class UserInfo implements Serializable {
private String userId;
private Double amount;
private Timestamp ts;

public String getUserId() {
return userId;
}

public void setUserId(String userId) {
this.userId = userId;
}

public Double getAmount() {
return amount;
}

public void setAmount(Double amount) {
this.amount = amount;
}

public Timestamp getTs() {
return ts;
}

public void setTs(Timestamp ts) {
this.ts = ts;
}
}

hive (yutest)>
 >
 > show partitions fs_table;
OK
partition
Time taken: 20.214 seconds



hdxg1101300...@163.com


Re: Re: Question about dimension-table joins in flink-sql

2020-10-28 Thread 史 正超
The job I wrote recently is similar to yours, except that I join two tables: a MySQL dimension table and a binlog stream table. At first I used a plain left join and found that results were only computed when the binlog stream table had data.
Later I rewrote it as a nested query: first an inner join (a direct join) with the MySQL dimension table, then an outer query that left joins the stream table. Now it works as expected: results are computed even when the binlog stream table has no data.
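
A rough sketch of the nested-query pattern described above; all table and column names here (mysql_dim_user, mysql_dim_dept, binlog_order_stream, user_id, amount) are hypothetical and not taken from the original job:

-- join the MySQL dimension side first (direct/inner join), then left join
-- the binlog stream table in an outer query; names below are made up
SELECT d.user_id, d.user_name, SUM(b.amount) AS total_amount
FROM (
    SELECT u.user_id, u.user_name
    FROM mysql_dim_user AS u
    JOIN mysql_dim_dept AS dep ON u.dept_id = dep.dept_id
) AS d
LEFT JOIN binlog_order_stream AS b ON d.user_id = b.user_id
GROUP BY d.user_id, d.user_name;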

From: Jark Wu 
Sent: 2020-10-28 07:24
To: user-zh 
Subject: Re: Re: Question about dimension-table joins in flink-sql

As I understand it, you want a full per-user performance result table once a day, without
overwriting the previous day's results. To me that is a batch requirement, because the
source data needs to be re-read, and by definition there is a one-day delay.

If you can accept overwriting the previous results, you could probably use the mysql-cdc
connector to obtain a full + incremental streaming source for users.
For example, mysql_cdc_users would read the users table in MySQL, and you could then left
join it with the performance stream and so on, updating each user's performance in place.

insert into mysql_users_latest_kpi
select * from mysql_cdc_users AS u left join kpi_stream AS k on u.memberId
= k.rowkey

mysql_users_latest_kpi would then hold each user's latest performance, continuously
updated in place as new performance records arrive.

Best,
Jark




On Wed, 28 Oct 2020 at 14:45, 夜思流年梦  wrote:

> Batch processing can indeed solve this kind of problem, it just is no longer real-time;
> the main goal is to solve this kind of reporting problem with flink-sql.
> One more question: does flink-sql plan to address this kind of problem? I feel the scenario
> is actually quite common: a dimension table left joins a stream table, so the full dimension
> data is joined against the stream -- you get the real-time statistics from the stream table
> while the dimension data stays continuously (or periodically) updated.
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2020-10-27 17:24:05, "Jark Wu" wrote:
> >I think this looks more like a periodically scheduled batch job. With stream processing you
> >can only keep reading increments of the employee table; you cannot read a full snapshot
> >every day. Wouldn't flink batch plus a scheduler be a better fit?
> >
> >Best,
> >Jark
> >
> >On Tue, 27 Oct 2020 at 16:08, 夜思流年梦  wrote:
> >
> >> We are currently building a real-time data warehouse and have run into a problem:
> >> for example, a report covering the performance of all employees needs to join 1 employee
> >> dimension table with 4 performance-related stream tables.
> >> In plain SQL the join would look like:
> >>
> >> dimension table left join stream table 1
> >> left join stream table 2
> >> left join stream table 3
> >> left join stream table 4
> >>
> >> Because the temporal join in flink-sql does not support a dimension table on the left
> >> side left joining a stream table,
> >>
> >> we can only join with the stream table on the left and the dimension table on the right,
> >> i.e.: select  * from  table a  left join dim_XXX  FOR SYSTEM_TIME AS OF
> >> a.proctime as c on a.memberId=c.rowkey
> >>
> >> The problem is that employees with no performance records today then have no statistics
> >> at all. With only one stream table to join, I could fill in empty rows for employees
> >> without data when producing the report; but since 4 stream tables have to be joined,
> >> only employees with data in all four stream tables end up with a row. That is the
> >> problem: the final report only contains employees who have data in all 4 stream tables.
> >>
> >> I asked about this before and the answer was a stream-stream join, but that has the
> >> same issue: a final result only appears when both sides have data, and the employee
> >> dimension table hardly changes anyway. Since I can no longer find that earlier email,
> >> I am asking again: is there a good solution for this scenario (dimension table on the
> >> left, left joining stream tables)?
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
>


Setting different windows for join operations in flinkSQL

2020-10-28 Thread 奔跑的小飞袁
hello
We have a business scenario involving a join between two dynamic tables: one table is fully dynamic and needs to be joined against only the current day's data of the other dynamic table. Is this kind of join scenario supported?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Can Flink adjust job parallelism dynamically?

2020-10-28 Thread ZT.Ren
As I recall, some version after Flink 1.9 supports adjusting parallelism dynamically, but I have forgotten how to use it. Could someone point me in the right direction? Many thanks.

Re: flinksql does not support the % operator

2020-10-28 Thread Danny Chan
% is non-standard SQL syntax and is not recommended.
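
If cherry-picking is not an option, the standard MOD() built-in function expresses the same predicate and works on 1.11; this reuses the Orders table and column a from the example quoted below:

-- standard-SQL equivalent of "WHERE a % 2 = 0"
SELECT * FROM Orders WHERE MOD(a, 2) = 0;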

Benchao Li wrote on Mon, Oct 26, 2020 at 9:26 PM:

> On 1.11 this cannot be enabled through configuration. You can cherry-pick this PR [1] onto the 1.11 branch and build it yourself to get % support on 1.11.
>
> [1] https://github.com/apache/flink/pull/12818
>
> 夜思流年梦 wrote on Mon, Oct 26, 2020 at 4:16 PM:
>
> > Flink version 1.11
> > Currently flink-sql does not seem to support the remainder operator; it fails with an error:
> > for example: SELECT * FROM Orders WHERE a % 2 = 0
> > Percent remainder '%' is not allowed under the current SQL conformance
> > level
> >
> >
> > Looking at the Flink issues, someone has already hit this, and it is said to be fixed in 1.12.
> >
> >
> >
> >
> > My question: on version 1.11, what does flink-sql need in order to support the % operator? Can it be done by modifying a config file, e.g. flink-conf.yaml?
>
>
>
> --
>
> Best,
> Benchao Li
>


Re: Does batch processing have backpressure?

2020-10-28 Thread Danny Chan
Yes. The backpressure mechanism relies on the runtime's network buffers and is independent of batch vs. streaming.

请叫我雷锋 <854194...@qq.com> wrote on Tue, Oct 27, 2020 at 8:02 PM:

> As per the subject.


Re: Working with bounded Datastreams - Flink 1.11.1

2020-10-28 Thread Danny Chan
In SQL, you can use an over window to deduplicate the messages by the id
[1], but I'm not sure whether the DataStream API has operators with the same semantics.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
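
A minimal sketch of the deduplication pattern from [1], adapted to the vehicle example in the question below; the table name vehicle_stats, its columns, and the proctime time attribute are assumptions, not part of the original job:

-- keep only the latest record per vehicleId, following the pattern in [1]
SELECT vehicleId, cnt, fuel, avgFuel
FROM (
    SELECT *,
        ROW_NUMBER() OVER (PARTITION BY vehicleId ORDER BY proctime DESC) AS rownum
    FROM vehicle_stats
)
WHERE rownum = 1;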

s_penakalap...@yahoo.com wrote on Wed, Oct 28, 2020 at 12:34 PM:

> Hi All,
>
> Request your inputs please.
>
> Regards,
> Sunitha
>
> On Tuesday, October 27, 2020, 01:01:41 PM GMT+5:30,
> s_penakalap...@yahoo.com  wrote:
>
>
> Hi Team,
>
> I want to use the Flink DataStream API for batch operations involving huge
> data. I tried to calculate count and average on the whole DataStream
> without using a window function.
>
>  Approach I tried to calculate count on the datastream:
> 1> Read data from table (say past 2 days of data) as Datastream
> 2> apply Key operation on the datastream
> 3> then use reduce function to find count, sum and average.
>
> I have written output to file and also inserted into table: sample data
> from file is:
>
> vehicleId=aa, count=1, fuel=10, avgFuel=0.0
> vehicleId=dd, count=1, fuel=7, avgFuel=0.0
> vehicleId=dd, count=2, fuel=22, avgFuel=11.0
> vehicleId=dd, count=3, fuel=42, avgFuel=14.0
> vehicleId=ee, count=1, fuel=0, avgFuel=0.0
>
> What I am looking for is: when there are multiple records with the same
> vehicle Id, only the final record has the correct values (like
> vehicleId=dd).
> Is there any way to get only one final record for each vehicle as shown
> below:
> vehicleId=aa, count=1, fuel=10, avgFuel=0.0
> vehicleId=dd, count=3, fuel=42, avgFuel=14.0
> vehicleId=ee, count=1, fuel=0, avgFuel=0.0
>
> I would also like some help on how to sort the whole DataStream based on one
> attribute. Say we have X records in one batch job; I would like to sort and
> fetch the record at position X-2 per vehicle.
>
> Regards,
> Sunitha.
>
>


Re: Re: Question about dimension-table joins in flink-sql

2020-10-28 Thread Jark Wu
As I understand it, you want a full per-user performance result table once a day, without
overwriting the previous day's results. To me that is a batch requirement, because the
source data needs to be re-read, and by definition there is a one-day delay.

If you can accept overwriting the previous results, you could probably use the mysql-cdc
connector to obtain a full + incremental streaming source for users.
For example, mysql_cdc_users would read the users table in MySQL, and you could then left
join it with the performance stream and so on, updating each user's performance in place.

insert into mysql_users_latest_kpi
select * from mysql_cdc_users AS u left join kpi_stream AS k on u.memberId
= k.rowkey

mysql_users_latest_kpi would then hold each user's latest performance, continuously
updated in place as new performance records arrive.

Best,
Jark
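
For reference, a hedged sketch of what the mysql_cdc_users table definition mentioned above might look like; it assumes the separately released flink-connector-mysql-cdc dependency is on the classpath, and every connection property below is a placeholder:

-- hypothetical DDL for the mysql_cdc_users table; all connection values are placeholders
CREATE TABLE mysql_cdc_users (
    memberId STRING,
    user_name STRING,
    PRIMARY KEY (memberId) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'flink',
    'password' = '******',
    'database-name' = 'mydb',
    'table-name' = 'users'
);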




On Wed, 28 Oct 2020 at 14:45, 夜思流年梦  wrote:

> Batch processing can indeed solve this kind of problem, it just is no longer real-time;
> the main goal is to solve this kind of reporting problem with flink-sql.
> One more question: does flink-sql plan to address this kind of problem? I feel the scenario
> is actually quite common: a dimension table left joins a stream table, so the full dimension
> data is joined against the stream -- you get the real-time statistics from the stream table
> while the dimension data stays continuously (or periodically) updated.
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2020-10-27 17:24:05, "Jark Wu" wrote:
> >I think this looks more like a periodically scheduled batch job. With stream processing you
> >can only keep reading increments of the employee table; you cannot read a full snapshot
> >every day. Wouldn't flink batch plus a scheduler be a better fit?
> >
> >Best,
> >Jark
> >
> >On Tue, 27 Oct 2020 at 16:08, 夜思流年梦  wrote:
> >
> >> We are currently building a real-time data warehouse and have run into a problem:
> >> for example, a report covering the performance of all employees needs to join 1 employee
> >> dimension table with 4 performance-related stream tables.
> >> In plain SQL the join would look like:
> >>
> >> dimension table left join stream table 1
> >> left join stream table 2
> >> left join stream table 3
> >> left join stream table 4
> >>
> >> Because the temporal join in flink-sql does not support a dimension table on the left
> >> side left joining a stream table,
> >>
> >> we can only join with the stream table on the left and the dimension table on the right,
> >> i.e.: select  * from  table a  left join dim_XXX  FOR SYSTEM_TIME AS OF
> >> a.proctime as c on a.memberId=c.rowkey
> >>
> >> The problem is that employees with no performance records today then have no statistics
> >> at all. With only one stream table to join, I could fill in empty rows for employees
> >> without data when producing the report; but since 4 stream tables have to be joined,
> >> only employees with data in all four stream tables end up with a row. That is the
> >> problem: the final report only contains employees who have data in all 4 stream tables.
> >>
> >> I asked about this before and the answer was a stream-stream join, but that has the
> >> same issue: a final result only appears when both sides have data, and the employee
> >> dimension table hardly changes anyway. Since I can no longer find that earlier email,
> >> I am asking again: is there a good solution for this scenario (dimension table on the
> >> left, left joining stream tables)?
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
>


Re: Re: Question about dimension-table joins in flink-sql

2020-10-28 Thread 夜思流年梦
Batch processing can indeed solve this kind of problem, it just is no longer real-time;
the main goal is to solve this kind of reporting problem with flink-sql.
One more question: does flink-sql plan to address this kind of problem? I feel the scenario
is actually quite common: a dimension table left joins a stream table, so the full dimension
data is joined against the stream -- you get the real-time statistics from the stream table
while the dimension data stays continuously (or periodically) updated.

















On 2020-10-27 17:24:05, "Jark Wu" wrote:
>I think this looks more like a periodically scheduled batch job. With stream processing you
>can only keep reading increments of the employee table; you cannot read a full snapshot
>every day. Wouldn't flink batch plus a scheduler be a better fit?
>
>Best,
>Jark
>
>On Tue, 27 Oct 2020 at 16:08, 夜思流年梦  wrote:
>
>> We are currently building a real-time data warehouse and have run into a problem:
>> for example, a report covering the performance of all employees needs to join 1 employee
>> dimension table with 4 performance-related stream tables.
>> In plain SQL the join would look like:
>>
>> dimension table left join stream table 1
>> left join stream table 2
>> left join stream table 3
>> left join stream table 4
>>
>> Because the temporal join in flink-sql does not support a dimension table on the left
>> side left joining a stream table,
>>
>> we can only join with the stream table on the left and the dimension table on the right,
>> i.e.: select  * from  table a  left join dim_XXX  FOR SYSTEM_TIME AS OF
>> a.proctime as c on a.memberId=c.rowkey
>>
>> The problem is that employees with no performance records today then have no statistics
>> at all. With only one stream table to join, I could fill in empty rows for employees
>> without data when producing the report; but since 4 stream tables have to be joined,
>> only employees with data in all four stream tables end up with a row. That is the
>> problem: the final report only contains employees who have data in all 4 stream tables.
>>
>> I asked about this before and the answer was a stream-stream join, but that has the
>> same issue: a final result only appears when both sides have data, and the employee
>> dimension table hardly changes anyway. Since I can no longer find that earlier email,
>> I am asking again: is there a good solution for this scenario (dimension table on the
>> left, left joining stream tables)?
>>
>>
>>
>>
>>
>>
>>
>>