[jira] [Created] (FLINK-18620) Unify behaviors of active resource managers

2020-07-16 Thread Xintong Song (Jira)
Xintong Song created FLINK-18620:


 Summary: Unify behaviors of active resource managers
 Key: FLINK-18620
 URL: https://issues.apache.org/jira/browse/FLINK-18620
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: Xintong Song
Assignee: Xintong Song


Flink supports various deployment modes: standalone, Kubernetes, Yarn & Mesos. 
For each deployment mode, a resource manager is implemented for managing the 
resources.

While the StandaloneResourceManager is quite different from the others, in that it 
cannot dynamically request and release resources, the other three 
(KubernetesResourceManager, YarnResourceManager and MesosResourceManager) share a 
great deal of logic. This common logic is currently implemented separately by each 
of the active resource managers. Such duplication adds maintenance overhead and 
amplifies stability risks.

This ticket proposes a refactoring of the resource managers, with a better 
abstraction that deduplicates the common logic and minimizes the 
deployment-specific behaviors.

This proposal is purely a refactoring effort. It does not intend to change any of 
the current resource management behaviors.

A detailed design doc and a simplified proof-of-concept implementation for the 
Kubernetes deployment are linked to this ticket.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Kinesis Performance Issue (was [VOTE] Release 1.11.0, release candidate #4)

2020-07-16 Thread Zhijiang
Hi Thomas,

Thanks for your further profiling information, and I'm glad to see we have 
pinned down the location that causes the regression.
Actually, I was also suspicious of #snapshotState in the previous discussions, 
since it can indeed take a long time and block normal operator processing.

Based on your feedback below, the sleep time during #snapshotState seems to be the 
main concern, so I also dug into the implementation of 
FlinkKinesisProducer#snapshotState:
while (producer.getOutstandingRecordsCount() > 0) {
    producer.flush();
    try {
        Thread.sleep(500);
    } catch (InterruptedException e) {
        LOG.warn("Flushing was interrupted.");
        break;
    }
}
It seems that the sleep time is mainly determined by the internal operations of 
the KinesisProducer implementation provided by amazonaws, which I am not very 
familiar with.
But I noticed there were two related upgrades in release-1.11.0: one upgrades 
amazon-kinesis-producer to 0.14.0 [1], and the other upgrades the aws-sdk version 
to 1.11.754 [2].
You mentioned that you already reverted the SDK upgrade and saw no change. 
Did you also revert [1] to verify?
[1] https://issues.apache.org/jira/browse/FLINK-17496
[2] https://issues.apache.org/jira/browse/FLINK-14881
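
As a side note, one simple way to break down where the sync time goes would be to
instrument that loop, roughly like this (only a sketch; `producer` and `LOG` are
the existing fields from the snippet above, the timing variables are made up):

long flushStart = System.currentTimeMillis();
long outstandingAtStart = producer.getOutstandingRecordsCount();
int rounds = 0;
while (producer.getOutstandingRecordsCount() > 0) {
    producer.flush();
    rounds++;
    try {
        Thread.sleep(500);
    } catch (InterruptedException e) {
        LOG.warn("Flushing was interrupted.");
        break;
    }
}
// Log how long the sync phase spent waiting for outstanding records to drain.
LOG.info("snapshotState flush took {} ms over {} rounds ({} records outstanding at start)",
        System.currentTimeMillis() - flushStart, rounds, outstandingAtStart);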

Best,
Zhijiang
--
From:Thomas Weise 
Send Time:2020年7月17日(星期五) 05:29
To:dev 
Cc:Zhijiang ; Stephan Ewen ; 
Arvid Heise ; Aljoscha Krettek 
Subject:Re: Kinesis Performance Issue (was [VOTE] Release 1.11.0, release 
candidate #4)

Sorry for the delay.

I confirmed that the regression is due to the sink (unsurprising, since
another job with the same consumer, but not the producer, runs as expected).

As promised I did CPU profiling on the problematic application, which gives
more insight into the regression [1]

The screenshots show that the average time for snapshotState increases from
~9s to ~28s. The data also shows the increase in sleep time during
snapshotState.

Does anyone, based on changes made in 1.11, have a theory why?

I had previously looked at the changes to the Kinesis connector and also
reverted the SDK upgrade, which did not change the situation.

It will likely be necessary to drill into the sink / checkpointing details
to understand the cause of the problem.

Let me know if anyone has specific questions that I can answer from the
profiling results.

Thomas

[1]
https://docs.google.com/presentation/d/159IVXQGXabjnYJk3oVm3UP2UW_5G-TGs_u9yzYb030I/edit?usp=sharing

On Mon, Jul 13, 2020 at 11:14 AM Thomas Weise  wrote:

> + dev@ for visibility
>
> I will investigate further today.
>
>
> On Wed, Jul 8, 2020 at 4:42 AM Aljoscha Krettek 
> wrote:
>
>> On 06.07.20 20:39, Stephan Ewen wrote:
>> >- Did sink checkpoint notifications change in a relevant way, for
>> example
>> > due to some Kafka issues we addressed in 1.11 (@Aljoscha maybe?)
>>
>> I think that's unrelated: the Kafka fixes were isolated in Kafka and the
>> one bug I discovered on the way was about the Task reaper.
>>
>>
>> On 07.07.20 17:51, Zhijiang wrote:
>> > Sorry for my misunderstood of the previous information, Thomas. I was
>> assuming that the sync checkpoint duration increased after upgrade as it
>> was mentioned before.
>> >
>> > If I remembered correctly, the memory state backend also has the same
>> issue? If so, we can dismiss the rocksDB state changes. As the slot sharing
>> enabled, the downstream and upstream should
>> > probably deployed into the same slot, then no network shuffle effect.
>> >
>> > I think we need to find out whether it has other symptoms changed
>> besides the performance regression to further figure out the scope.
>> > E.g. any metrics changes, the number of TaskManager and the number of
>> slots per TaskManager from deployment changes.
>> > 40% regression is really big, I guess the changes should also be
>> reflected in other places.
>> >
>> > I am not sure whether we can reproduce the regression in our AWS
>> environment by writing any Kinesis jobs, since there are also normal
>> Kinesis jobs as Thomas mentioned after upgrade.
>> > So it probably looks like to touch some corner case. I am very willing
>> to provide any help for debugging if possible.
>> >
>> >
>> > Best,
>> > Zhijiang
>> >
>> >
>> > --
>> > From:Thomas Weise 
>> > Send Time:2020年7月7日(星期二) 23:01
>> > To:Stephan Ewen 
>> > Cc:Aljoscha Krettek ; Arvid Heise <
>> ar...@ververica.com>; Zhijiang 
>> > Subject:Re: Kinesis Performance Issue (was [VOTE] Release 1.11.0,
>> release candidate #4)
>> >
>> > We are deploying our apps with FlinkK8sOperator. We have one job that
>> works as expected after the upgrade and the one discussed here that has the
>> performance regression.
>> >
>> > "The performance regression is obvious caused by long duration of sync
>> checkpoint process in Kinesis sink operator, which would block the normal
>> data processing until back 

Re: Kinesis Performance Issue (was [VOTE] Release 1.11.0, release candidate #4)

2020-07-16 Thread Thomas Weise
Sorry for the delay.

I confirmed that the regression is due to the sink (unsurprising, since
another job with the same consumer, but not the producer, runs as expected).

As promised I did CPU profiling on the problematic application, which gives
more insight into the regression [1]

The screenshots show that the average time for snapshotState increases from
~9s to ~28s. The data also shows the increase in sleep time during
snapshotState.

Does anyone, based on changes made in 1.11, have a theory why?

I had previously looked at the changes to the Kinesis connector and also
reverted the SDK upgrade, which did not change the situation.

It will likely be necessary to drill into the sink / checkpointing details
to understand the cause of the problem.

Let me know if anyone has specific questions that I can answer from the
profiling results.

Thomas

[1]
https://docs.google.com/presentation/d/159IVXQGXabjnYJk3oVm3UP2UW_5G-TGs_u9yzYb030I/edit?usp=sharing

On Mon, Jul 13, 2020 at 11:14 AM Thomas Weise  wrote:

> + dev@ for visibility
>
> I will investigate further today.
>
>
> On Wed, Jul 8, 2020 at 4:42 AM Aljoscha Krettek 
> wrote:
>
>> On 06.07.20 20:39, Stephan Ewen wrote:
>> >- Did sink checkpoint notifications change in a relevant way, for
>> example
>> > due to some Kafka issues we addressed in 1.11 (@Aljoscha maybe?)
>>
>> I think that's unrelated: the Kafka fixes were isolated in Kafka and the
>> one bug I discovered on the way was about the Task reaper.
>>
>>
>> On 07.07.20 17:51, Zhijiang wrote:
>> > Sorry for my misunderstood of the previous information, Thomas. I was
>> assuming that the sync checkpoint duration increased after upgrade as it
>> was mentioned before.
>> >
>> > If I remembered correctly, the memory state backend also has the same
>> issue? If so, we can dismiss the rocksDB state changes. As the slot sharing
>> enabled, the downstream and upstream should
>> > probably deployed into the same slot, then no network shuffle effect.
>> >
>> > I think we need to find out whether it has other symptoms changed
>> besides the performance regression to further figure out the scope.
>> > E.g. any metrics changes, the number of TaskManager and the number of
>> slots per TaskManager from deployment changes.
>> > 40% regression is really big, I guess the changes should also be
>> reflected in other places.
>> >
>> > I am not sure whether we can reproduce the regression in our AWS
>> environment by writing any Kinesis jobs, since there are also normal
>> Kinesis jobs as Thomas mentioned after upgrade.
>> > So it probably looks like to touch some corner case. I am very willing
>> to provide any help for debugging if possible.
>> >
>> >
>> > Best,
>> > Zhijiang
>> >
>> >
>> > --
>> > From:Thomas Weise 
>> > Send Time:2020年7月7日(星期二) 23:01
>> > To:Stephan Ewen 
>> > Cc:Aljoscha Krettek ; Arvid Heise <
>> ar...@ververica.com>; Zhijiang 
>> > Subject:Re: Kinesis Performance Issue (was [VOTE] Release 1.11.0,
>> release candidate #4)
>> >
>> > We are deploying our apps with FlinkK8sOperator. We have one job that
>> works as expected after the upgrade and the one discussed here that has the
>> performance regression.
>> >
>> > "The performance regression is obvious caused by long duration of sync
>> checkpoint process in Kinesis sink operator, which would block the normal
>> data processing until back pressure the source."
>> >
>> > That's a constant. Before (1.10) and upgrade have the same sync
>> checkpointing time. The question is what change came in with the upgrade.
>> >
>> >
>> >
>> > On Tue, Jul 7, 2020 at 7:33 AM Stephan Ewen  wrote:
>> >
>> > @Thomas Just one thing real quick: Are you using the standalone setup
>> scripts (like start-cluster.sh, and the former "slaves" file) ?
>> > Be aware that this is now called "workers" because of avoiding
>> sensitive names.
>> > In one internal benchmark we saw quite a lot of slowdown initially,
>> before seeing that the cluster was not a distributed cluster any more ;-)
>> >
>> >
>> > On Tue, Jul 7, 2020 at 9:08 AM Zhijiang 
>> wrote:
>> > Thanks for this kickoff and help analysis, Stephan!
>> > Thanks for the further feedback and investigation, Thomas!
>> >
>> > The performance regression is obvious caused by long duration of sync
>> checkpoint process in Kinesis sink operator, which would block the normal
>> data processing until back pressure the source.
>> > Maybe we could dig into the process of sync execution in checkpoint.
>> E.g. break down the steps inside respective operator#snapshotState to
>> statistic which operation cost most of the time, then
>> > we might probably find the root cause to bring such cost.
>> >
>> > Look forward to the further progress. :)
>> >
>> > Best,
>> > Zhijiang
>> >
>> > --
>> > From:Stephan Ewen 
>> > Send Time:2020年7月7日(星期二) 14:52
>> > To:Thomas Weise 
>> > Cc:Stephan Ewen ; Zhijiang <
>> 

Re: Using md5 hash while sinking files to s3

2020-07-16 Thread Chesnay Schepler
I only quickly skimmed the Hadoop docs and found this (although it is 
not documented very well, I might add). If this does not do the trick, 
I'd suggest reaching out to the Hadoop project, since we're using their 
S3 filesystem.
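
Putting together the settings mentioned in this thread, the S3A configuration 
would look roughly like this (the bucket name and key value are placeholders):

fs.s3a.etag.checksum.enabled: true
fs.s3a.bucket.<your-bucket>.server-side-encryption-algorithm: SSE-KMS
fs.s3a.bucket.<your-bucket>.server-side-encryption.key: <your-kms-key-arn>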


On 16/07/2020 19:32, nikita Balakrishnan wrote:

Hey Chesnay,

Thank you for getting back with that! I tried setting that too, it 
still gives me the same exception. Is there something else that I'm 
missing?
I also have 
fs.s3a.bucket..server-side-encryption-algorithm=SSE-KMS 
and fs.s3a.bucket..server-side-encryption.key set.


Is there no need to set the md5 hash value manually while sinking? The 
fs.s3a.etag.checksum.enabled: true will do it for me? And Do I need to 
specify anywhere that we have to use md5 hashing?



On Thu, Jul 16, 2020 at 12:04 AM Chesnay Schepler > wrote:


Please try configuring :

fs.s3a.etag.checksum.enabled: true


On 16/07/2020 03:11, nikita Balakrishnan wrote:

Hello team,

I’m developing a system where we are trying to sink to an immutable s3
bucket. This bucket has server side encryption set as KMS. The DataStream
sink works perfectly fine when I don’t use the immutable bucket but when I
use an immutable bucket, I get exceptions regarding multipart upload
failures. It says we need to enable md5 hashing for the put object to work.

Here’s the stack trace:

org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
exception while processing timer.
at

org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1090)
at

org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1058)
at

org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1520)
at

org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$10(StreamTask.java:1509)
at

org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
at

org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
at

org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at

org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
at

org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.flink.streaming.runtime.tasks.TimerException:
java.io.IOException: Uploading parts failed
... 11 common frames omitted
Caused by: java.io.IOException: Uploading parts failed
at

org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartUploadToComplete(RecoverableMultiPartUploadImpl.java:231)
at

org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartsUpload(RecoverableMultiPartUploadImpl.java:215)
at

org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetRecoverable(RecoverableMultiPartUploadImpl.java:151)
at

org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:123)
at

org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:56)
at

org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeForCommit(S3RecoverableFsDataOutputStream.java:167)
at

org.apache.flink.streaming.api.functions.sink.filesystem.PartFileWriter.closeForCommit(PartFileWriter.java:71)
at

org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:239)
at

org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onProcessingTime(Bucket.java:338)
at

org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onProcessingTime(Buckets.java:304)
at

org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.onProcessingTime(StreamingFileSink.java:439)
at

org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1518)
... 10 common frames omitted
Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: upload part on
raw_events/xxx/xxx/2020/07/15/20/archived-2-0.txt:
com.amazonaws.services.s3.model.AmazonS3Exception: Content-MD5 HTTP header
is required for Put Part requests with Object Lock parameters (Service:
Amazon S3; Status Code: 400; Error Code: 

Re: Using md5 hash while sinking files to s3

2020-07-16 Thread nikita Balakrishnan
Hey Chesnay,

Thank you for getting back with that! I tried setting that too, it still
gives me the same exception. Is there something else that I'm missing?
I also have fs.s3a.bucket..server-side-encryption-algorithm=SSE-KMS
and fs.s3a.bucket..server-side-encryption.key set.

Is there no need to set the MD5 hash value manually while sinking? Will
fs.s3a.etag.checksum.enabled: true do it for me? And do I need to specify
anywhere that we have to use MD5 hashing?


On Thu, Jul 16, 2020 at 12:04 AM Chesnay Schepler 
wrote:

> Please try configuring :
>
> fs.s3a.etag.checksum.enabled: true
>
>
> On 16/07/2020 03:11, nikita Balakrishnan wrote:
>
> Hello team,
>
> I’m developing a system where we are trying to sink to an immutable s3
> bucket. This bucket has server side encryption set as KMS. The DataStream
> sink works perfectly fine when I don’t use the immutable bucket but when I
> use an immutable bucket, I get exceptions regarding multipart upload
> failures. It says we need to enable md5 hashing for the put object to work.
>
> Here’s the stack trace:
>
> org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
> exception while processing timer.
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1090)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1058)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1520)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$10(StreamTask.java:1509)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: org.apache.flink.streaming.runtime.tasks.TimerException:
> java.io.IOException: Uploading parts failed
> ... 11 common frames omitted
> Caused by: java.io.IOException: Uploading parts failed
> at
> org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartUploadToComplete(RecoverableMultiPartUploadImpl.java:231)
> at
> org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartsUpload(RecoverableMultiPartUploadImpl.java:215)
> at
> org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetRecoverable(RecoverableMultiPartUploadImpl.java:151)
> at
> org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:123)
> at
> org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:56)
> at
> org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeForCommit(S3RecoverableFsDataOutputStream.java:167)
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.PartFileWriter.closeForCommit(PartFileWriter.java:71)
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:239)
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onProcessingTime(Bucket.java:338)
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onProcessingTime(Buckets.java:304)
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.onProcessingTime(StreamingFileSink.java:439)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1518)
> ... 10 common frames omitted
> Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: upload part on
> raw_events/xxx/xxx/2020/07/15/20/archived-2-0.txt:
> com.amazonaws.services.s3.model.AmazonS3Exception: Content-MD5 HTTP header
> is required for Put Part requests with Object Lock parameters (Service:
> Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: xxx;
> S3 Extended Request ID: ), S3 Extended Request ID: xx
> :InvalidRequest: Content-MD5 HTTP header is required for Put Part requests
> with Object Lock parameters (Service: Amazon S3; Status Code: 400; Error
> Code: InvalidRequest; Request ID: ; S3 Extended Request ID: )
> at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:212)
> at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
> at 

[jira] [Created] (FLINK-18619) Update training to use WatermarkStrategy

2020-07-16 Thread Seth Wiesman (Jira)
Seth Wiesman created FLINK-18619:


 Summary: Update training to use WatermarkStrategy
 Key: FLINK-18619
 URL: https://issues.apache.org/jira/browse/FLINK-18619
 Project: Flink
  Issue Type: Improvement
  Components: Documentation / Training
Reporter: Seth Wiesman
Assignee: Seth Wiesman
 Fix For: 1.12.0, 1.11.1






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-130: Support for Python DataStream API (Stateless Part)

2020-07-16 Thread Hequn Cheng
Hi,

Thanks a lot for your discussions.
I think Aljoscha makes good suggestions here! Those problematic APIs should
not be added to the new Python DataStream API.

Only one item I want to add based on the reply from Shuiqiang:
I would also tend to keep the readTextFile() method. Apart from print(),
readTextFile() can be very helpful and is frequently used when playing with
Flink.
For example, it is used in our WordCount example [1], which is almost the
first Flink program that every beginner runs.
It is more efficient for reading multi-line data than fromCollection(), and
far easier to use than Kafka, Kinesis, RabbitMQ, etc., when just playing
with Flink.
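
To make this concrete, the kind of beginner program I have in mind looks roughly
like the following with the existing Java API (only a sketch; the input path is
a placeholder):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadTextFileExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read a local text file line by line -- convenient for quickly trying things out.
        DataStream<String> lines = env.readTextFile("file:///tmp/words.txt");
        lines.print();
        env.execute("readTextFile example");
    }
}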

What do you think?

Best,
Hequn

[1]
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java


On Thu, Jul 16, 2020 at 3:37 PM Shuiqiang Chen  wrote:

> Hi Aljoscha,
>
> Thank you for your valuable comments! I agree with you that there is some
> optimization space for existing API and can be applied to the python
> DataStream API implementation.
>
> According to your comments, I have concluded them into the following parts:
>
> 1. SingleOutputStreamOperator and DataStreamSource.
> Yes, the SingleOutputStreamOperator and DataStreamSource are a bit
> redundant, so we can unify their APIs into DataStream to make it more
> clear.
>
> 2. The internal or low-level methods.
>  - DataStream.get_id(): Has been removed in the FLIP wiki page.
>  - DataStream.partition_custom(): Has been removed in the FLIP wiki page.
>  - SingleOutputStreamOperator.can_be_parallel/forceNoParallel: Has been
> removed in the FLIP wiki page.
> Sorry for mistakenly making those internal methods public, we would not
> expose them to users in the Python API.
>
> 3. "declarative" Apis.
> - KeyedStream.sum/min/max/min_by/max_by: Has been removed in the FLIP wiki
> page. They could be well covered by Table API.
>
> 4. Spelling problems.
> - StreamExecutionEnvironment.from_collections. Should be from_collection().
> - StreamExecutionEnvironment.generate_sequenece. Should be
> generate_sequence().
> Sorry for the spelling error.
>
> 5. Predefined source and sink.
> As you said, most of the predefined sources are not suitable for
> production, we can ignore them in the new Python DataStream API.
> There is one exception that maybe I think we should add the print() since
> it is commonly used by users and it is very useful for debugging jobs. We
> can add comments for the API that it should never be used for production.
> Meanwhile, as you mentioned, a good alternative that always prints on the
> client should also be supported. For this case, maybe we can add the
> collect method and return an Iterator. With the iterator, uses can print
> the content on the client. This is also consistent with the behavior in
> Table API.
>
> 6. For Row.
> Do you mean that we should not expose the Row type in Python API? Maybe I
> haven't gotten your concerns well.
> We can use tuple type in Python DataStream to support Row. (I have updated
> the example section of the FLIP to reflect the design.)
>
> Highly appreciated for your suggestions again. Looking forward to your
> feedback.
>
> Best,
> Shuiqiang
>
> Aljoscha Krettek  于2020年7月15日周三 下午5:58写道:
>
> > Hi,
> >
> > thanks for the proposal! I have some comments about the API. We should
> not
> > blindly copy the existing Java DataSteam because we made some mistakes
> with
> > that and we now have a chance to fix them and not forward them to a new
> API.
> >
> > I don't think we need SingleOutputStreamOperator, in the Scala API we
> just
> > have DataStream and the relevant methods from SingleOutputStreamOperator
> > are added to DataStream. Having this extra type is more confusing than
> > helpful to users, I think. In the same vain, I think we also don't need
> > DataStreamSource. The source methods can also just return a DataStream.
> >
> > There are some methods that I would consider internal and we shouldn't
> > expose them:
> >  - DataStream.get_id(): this is an internal method
> >  - DataStream.partition_custom(): I think adding this method was a
> mistake
> > because it's to low-level, I could be convinced otherwise
> >  - DataStream.print()/DataStream.print_to_error(): These are questionable
> > because they print to the TaskManager log. Maybe we could add a good
> > alternative that always prints on the client, similar to the Table API
> >  - DataStream.write_to_socket(): It was a mistake to add this sink on
> > DataStream it is not fault-tolerant and shouldn't be used in production
> >
> >  - KeyedStream.sum/min/max/min_by/max_by: Nowadays, the Table API should
> > be used for "declarative" use cases and I think these methods should not
> be
> > in the DataStream API
> >  - SingleOutputStreamOperator.can_be_parallel/forceNoParallel: these are
> > internal methods
> >
> >  - 

Re: Performance test Flink vs Storm

2020-07-16 Thread Prasanna kumar
Xintong Song,


   - Which version of Flink is used? *1.10*
   - Which deployment mode is used? *Standalone*
   - Which cluster mode is used? *Job*
   - Do you mean you have a 4-core/16 GB node for each task manager, and each
   task manager has 4 slots? *Yes. There are 3 task managers in total in the
   cluster. Two TMs are t2.medium machines (2 cores, 4 GB each, 1 slot per
   core). One TM is a t2.large (4 cores, 16 GB, 4 slots in the machine). Other
   jobs were running on the t2.medium TMs; the t2.large machine is where the
   performance-testing job was running.*
   - Sounds like you are running a streaming job without using any state.
   Have you tuned the managed memory fraction
   (`taskmanager.memory.managed.fraction`) to zero as suggested in the
   document[1]? *No, I have not set the managed memory fraction to 0 (the
   exact setting is spelled out below). I had set checkpointing to use the
   job manager backend.*
   - *The CPU maximum spike I spotted was 40%.*
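
For reference, the tuning referred to in that answer is a single line in
flink-conf.yaml (value as suggested in the memory tuning doc for stateless /
heap-state-backend jobs):

taskmanager.memory.managed.fraction: 0.0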

*In the meantime, I ran a more recent test only on the t2.medium machine with 2
slots per core: 1 million records at a 10k/s ingestion rate, parallelism 1.
I added a rebalance to the input stream, e.g. inputStream.rebalance().map(),
and was able to get latency in the range of 130 ms - 2 sec.*

Let me also know if there are more things to consider here.

Thanks
Prasanna.

On Thu, Jul 16, 2020 at 4:04 PM Xintong Song  wrote:

> Hi Prasanna,
>
> Trying to understand how Flink is deployed.
>
>- Which version of Flink is used?
>- Which deployment mode is used? (Standalone/Kubernetes/Yarn/Mesos)
>- Which cluster mode is used? (Job/Session)
>- Do you mean you have a 4core16gb node for each task manager, and
>each task manager has 4 slots?
>- Sounds like you are running a streaming job without using any state.
>Have you tuned the managed memory fraction
>(`taskmanager.memory.managed.fraction`) to zero as suggested in the
>document[1]?
>
> When running a stateless job or using a heap state backend
>> (MemoryStateBackend or FsStateBackend), set managed memory to zero.
>>
>
> I can see a few potential problems.
>
>- Managed memory is probably not configured. That means a significant
>fraction of memory is unused.
>- It sounds like the CPU processing time is not the bottleneck. Thus
>increasing the parallelism will not give you better performance, but will
>on the other hand increase the overhead load on the task manager.
>
> Also pulled in Becket Qin, who is the expert of Kafka connectors. Since
> you have observed lack of performance in reading from Kafka compared to
> Storm.
>
> Thank you~
>
> Xintong Song
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_tuning.html#heap-state-backend
>
> On Thu, Jul 16, 2020 at 10:35 AM Prasanna kumar <
> prasannakumarram...@gmail.com> wrote:
>
>> Hi
>>
>> Sending to you all separately as you answered one of my earlier query.
>>
>> Thanks,
>> Prasanna.
>>
>>
>> -- Forwarded message -
>> From: Prasanna kumar 
>> Date: Wed 15 Jul, 2020, 23:27
>> Subject: Performance test Flink vs Storm
>> To: , user 
>>
>>
>> Hi,
>>
>> We are testing flink and storm for our streaming pipelines on various
>> features.
>>
>> In terms of Latency,i see the flink comes up short on storm even if more
>> CPU is given to it. Will Explain in detail.
>>
>> *Machine*. t2.large 4 core 16 gb. is used for Used for flink task
>> manager and storm supervisor node.
>> *Kafka Partitions* 4
>> *Messages tested:* 1million
>> *Load* : 50k/sec
>>
>> *Scenario*:
>> Read from Kafka -> Transform (Map to a different JSON format) - > Write
>> to a Kafka topic.
>>
>> *Test 1*
>> Storm Parallelism is set as 1. There are four processes. 1 Spout (Read
>> from Kafka) and 3 bolts (Transformation and sink) .
>> Flink. Operator level parallelism not set. Task Parallelism is set as 1.
>> Task slot is 1 per core.
>>
>> Storm was 130 milliseconds faster in 1st record.
>> Storm was 20 seconds faster in 1 millionth record.
>>
>> *Test 2*
>> Storm Parallelism is set as 1. There are four processes. 1 Spout (Read
>> from Kafka) and 3 bolts (Transformation and sink)
>> Flink. Operator level parallelism not set. Task Parallelism is set as 4.
>> Task slot is 1 per core. So all cores is used.
>>
>> Storm was 180 milliseconds faster in 1st record.
>> Storm was 25 seconds faster in 1 millionth record.
>>
>> *Observations here*
>> 1) Increasing Parallelism did not increase the performance in Flink
>> rather it became 50ms to 5s slower.
>> 2) Flink is slower in Reading from Kafka compared to storm. Thats where
>> the bulk of the latency is.  for the millionth record its 19-24 seconds
>> slower.
>> 3) Once message is read, flink takes lesser time to transform and write
>> to kafka compared to storm.
>>
>> *Other Flink Config*
>> jobmanager.heap.size: 1024m
>>
>> taskmanager.memory.process.size: 1568m
>>
>> *How do we improve the latency ? *
>> *Why does latency becomes worse when parallelism is increased and matched
>> to 

Re: [DISCUSS] FLIP-129: Refactor Descriptor API to register connector in Table API

2020-07-16 Thread Jark Wu
Thank you all for the discussion!

Here are my comments:

2) I agree we should support Expression as a computed column. But I'm in
favor of Leonard's point that maybe we can also support a SQL string
expression as a computed column,
because that also keeps it aligned with DDL. The concern with Expression is
that converting an Expression to a SQL string, or (de)serializing an
Expression, is another topic that is not yet clear and may involve a lot of work.
Maybe we can support Expression later if time permits.

6,7) I still prefer the "new" keyword over a builder. I don't think
immutability is a strong reason. I care more about usability and the
experience from the users' and developers' perspective.
  - Users need to type more words when using a builder:
`KafkaConnector.newBuilder()...build()` vs `new KafkaConnector()...`
  - It's more difficult for developers to write a descriptor: 2 classes
(KafkaConnector and KafkaConnectorBuilder) + 5 more methods (builders,
schema, partitionedBy, like, etc.).
With the "new" keyword, all the common methods are defined by the
framework.
  - It's hard to have the same API style across different connectors, because
the common methods are defined by users. For example, some may have
`withSchema`, `partitionKey`, `withLike`, etc.

8) I'm -1 on `ConfigOption`. The ConfigOption is not used on `JsonFormat`,
but on the generic `Connector#option`. This doesn't work when using format
options:

new Connector("kafka")
    .option(JsonOptions.IGNORE_PARSE_ERRORS, true);
// This is wrong, because "kafka" requires "json.ignore-parse-errors" as the
// option key, not "ignore-parse-errors".



Hi Timo, regarding having a completely new stack, I have thought about that,
but I still prefer to refactor the existing stack. Reasons:
I think it will be more confusing if users see two similar stacks, and they
may run into many problems if they use the wrong class.
For example, we would have two `Schema` and `TableDescriptor` classes. The
`KafkaConnector` couldn't be used in the legacy `connect()` API, and
the legacy `Kafka` class couldn't be used in the new `createTemporaryTable()`
API.
Besides, the existing API has been deprecated in 1.11, so I think it's fine to
remove it in 1.12.


Best,
Jark


On Thu, 16 Jul 2020 at 15:26, Jingsong Li  wrote:

> Thanks for the discussion.
>
> Descriptor lacks the watermark and the computed column is too long.
>
> 1) +1 for just `column(...)`
>
> 2) +1 for being consistent with Table API, the Java Table API should be
> Expression DSL. We don't need pure string support, users should just use
> DDL instead. I think this is just a schema descriptor? The schema
> descriptor should be consistent with DDL, so, definitely, it should
> contain computed columns information.
>
> 3) +1 for not containing Schema#proctime and
> Schema#watermarkFor#boundedOutOfOrderTimestamps, we can just leave them in
> legacy apis.
>
> 6,7) +1 for removing "new" and builder and making it immutable, For Jark,
> the starting method is the static method, the others are not.
>
> 8) +1 for `ConfigOption`, this can be consistent with `WritableConfig`.
> For Leonard, I don't think user needs “json.fail-on-missing-field” rather
> than “fail-on-missing-field”, user should
> need “fail-on-missing-field” rather than “json.fail-on-missing-field", the
> recommended way is "JsonFormat.newInstance().option()", should
> configure options in the format scope.
>
> Best,
> Jingsong
>
> On Wed, Jul 15, 2020 at 9:30 PM Leonard Xu  wrote:
>
>> Thanks Jark bring this discussion and organize the FLIP document.
>>
>> Thanks Dawid and Timo for the feedback. Here are my thoughts.
>>
>> 1)  I’m +1 with using column() for both cases.
>>
>> 2) Expression DSL vs pure SQL string for computed columns
>>
>> I think we can support them both and implement the pure SQL String first,
>> I agree that Expression DSL brings more possibility and flexibility, but
>> using SQL string is a more unified way which can reuse most logic with DDL
>> like validation and persist in Catalog,
>> and Converting Expression DSL to SQL Expression is another big topic and
>> I did not figure out a feasible idea until now.
>> So, maybe we can postpone the Expression DSL support considered the
>> reality.
>>
>> 3) Methods Schema#proctime and
>> Schema#watermarkFor#boundedOutOfOrderTimestamps
>>
>>  +1 with Dawid’s proposal to offer SQL like methods.
>>  Schema()
>> .column("proctime", proctime());
>> .watermarkFor("rowtime", $("rowtime").minus(lit(3).seconds()))
>> And we can simplify watermarkFor(“colName”, Expression
>> watermarkStrategy)to watermark(“colName”, Expression watermarkStrategy), I
>> think the later one has can express the meaning of “ WATERMARK FOR
>> column_name AS watermark_strategy_expression“ well.
>>
>> 5)6)7) The new keyword vs the static method vs builder pattern
>>
>> I have not strong tendency,  the new keyword and the static method on
>> descriptor can nearly treated as a builder  and do same things like
>> builder.
>> For the builder 

[jira] [Created] (FLINK-18618) Docker e2e tests are failing on CI

2020-07-16 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-18618:


 Summary: Docker e2e tests are failing on CI
 Key: FLINK-18618
 URL: https://issues.apache.org/jira/browse/FLINK-18618
 Project: Flink
  Issue Type: Improvement
  Components: Build System / Azure Pipelines
Reporter: Chesnay Schepler






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18617) run flink with openjdk 11 get java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available error

2020-07-16 Thread dongjie.shi (Jira)
dongjie.shi created FLINK-18617:
---

 Summary: run flink with openjdk 11 get 
java.lang.UnsupportedOperationException: sun.misc.Unsafe or 
java.nio.DirectByteBuffer.<init>(long, int) not available error
 Key: FLINK-18617
 URL: https://issues.apache.org/jira/browse/FLINK-18617
 Project: Flink
  Issue Type: Bug
Reporter: dongjie.shi






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18615) run flink with openjdk11 get java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available

2020-07-16 Thread dongjie.shi (Jira)
dongjie.shi created FLINK-18615:
---

 Summary: run flink with openjdk11 get 
java.lang.UnsupportedOperationException: sun.misc.Unsafe or 
java.nio.DirectByteBuffer.<init>(long, int) not available
 Key: FLINK-18615
 URL: https://issues.apache.org/jira/browse/FLINK-18615
 Project: Flink
  Issue Type: Bug
Reporter: dongjie.shi


I have noticed https://issues.apache.org/jira/browse/FLINK-16263,

changed the Flink version to 1.11.0,

and configured

io.netty.tryReflectionSetAccessible: true

in flink-1.11.0/conf/flink-conf.yaml before starting the job manager and task 
manager,

but when I do something with Apache Arrow 0.17.0, I still get the error:

 



 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: org.apache.flink.client.program.ProgramInvocationException: 
Job failed (JobID: ca85f0a81e91a51b0ea79cd192f030d4)
 at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
 at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
 at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
 at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
 at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
 at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
 at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
 at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 
ca85f0a81e91a51b0ea79cd192f030d4)
 at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
 at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
 at 
org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:116)
 at 
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
 at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
 at 
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:699)
 at 
com.intel.analytics.zoo.serving.ClusterServing$$anonfun$run$1.apply$mcV$sp(ClusterServing.scala:62)
 at scala.util.control.Breaks.breakable(Breaks.scala:38)
 at com.intel.analytics.zoo.serving.ClusterServing$.run(ClusterServing.scala:52)
 at 
com.intel.analytics.zoo.serving.ClusterServing$.main(ClusterServing.scala:73)
 at com.intel.analytics.zoo.serving.ClusterServing.main(ClusterServing.scala)
 at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
 at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.base/java.lang.reflect.Method.invoke(Method.java:566)
 at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
 ... 8 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job 
failed (JobID: ca85f0a81e91a51b0ea79cd192f030d4)
 at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:116)
 at 
java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
 at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
 at 
java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
 at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602)
 at 
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
 at 
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
 at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
 at 
java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
 at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:309)
 at 
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
 at 
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
 at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
 at 

[jira] [Created] (FLINK-18616) Add SHOW CURRENT DDLs

2020-07-16 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-18616:


 Summary: Add SHOW CURRENT DDLs
 Key: FLINK-18616
 URL: https://issues.apache.org/jira/browse/FLINK-18616
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API
Reporter: Jingsong Lee
 Fix For: 1.12.0


Supports:

SHOW CURRENT CATALOG;

SHOW CURRENT DATABASE;



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-36 - Support Interactive Programming in Flink Table API

2020-07-16 Thread Kurt Young
Hi Xuanna,

Thanks for the detailed design doc; it clearly describes how the API looks
and how it interacts with the Flink runtime.
However, the part that relates to the SQL optimizer is somewhat blurry. To be
more precise, I have the following questions:

1. How do you identify the CachedTable? I can imagine there would be a map
representing the cache; how do you
compare the keys of that map? One approach is to compare them as Java
objects, which is simple but has
limited scope. For example, if a user creates another table through some
interface of TableEnvironment, and that table
is exactly the same as the cached one, you won't be able to identify it.
Another choice is calculating a "signature" or
"digest" of the cached table, which involves a string representation of the
whole sub-tree represented by the cached table.
I don't think Flink currently provides such a mechanism around Table,
though.

2. How does the CachedTable affect the optimizer? Specifically, will you
have a dedicated QueryOperation for it, and will you have
dedicated logical and physical RelNodes for it? I also don't see a
description of how this works with the current optimization phases,
from Operation to Calcite rel node, and then to Flink's logical and
physical nodes, which are finally translated to Flink's exec nodes.
There are also other optimizations, such as the deadlock breaker, as well as
sub-plan reuse inside the optimizer, and I'm not sure whether
the logic dealing with cached tables can be orthogonal to all of these.
Hence I'd expect a more detailed description here.

3. What's the effect of calling TableEnvironment.close()? You already
explained that this would drop all caches this table env has;
could you also explain whether other functionality still works for this table
env? For example, can users still create/drop tables/databases/functions
through this table env? What happens to the catalog and all temporary
objects of this table env?

One minor comment: I noticed you used some non-existent APIs in the examples
you gave, like table.collect(), which is a little
misleading.
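
For context, the usage the FLIP describes is roughly the following (a sketch
based on the FLIP document; cache() and CachedTable are proposed API and do not
exist in Flink yet):

Table t = tEnv.sqlQuery("SELECT id, SUM(amount) AS total FROM orders GROUP BY id");
CachedTable cached = t.cache();           // proposed: mark the intermediate result for caching
cached.executeInsert("first_sink");       // the first job that runs materializes the cache
cached.filter($("total").isGreater(lit(100)))
      .executeInsert("second_sink");      // later jobs read the cache instead of recomputing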

Best,
Kurt


On Thu, Jul 9, 2020 at 4:00 PM Xuannan Su  wrote:

> Hi folks,
>
> I'd like to revive the discussion about FLIP-36 Support Interactive
> Programming in Flink Table API
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
>
> The FLIP proposes to add support for interactive programming in Flink
> Table API. Specifically, it let users cache the intermediate
> results(tables) and use them in the later jobs to avoid recomputing the
> intermediate result(tables).
>
> I am looking forward to any opinions and suggestions from the community.
>
> Best,
> Xuannan
> On May 7, 2020, 5:40 PM +0800, Xuannan Su , wrote:
> > Hi,
> >
> > There are some feedbacks from @Timo and @Kurt in the voting thread for
> FLIP-36 and I want to share my thoughts here.
> >
> > 1. How would the FLIP-36 look like after FLIP-84?
> > I don't think FLIP-84 will affect FLIP-36 from the public API
> perspective. Users can call .cache on a table object and the cached table
> will be generated whenever the table job is triggered to execute, either by
> Table#executeInsert or StatementSet#execute. I think that FLIP-36 should
> aware of the changes made by FLIP-84, but it shouldn't be a problem. At the
> end of the day, FLIP-36 only requires the ability to add a sink to a node,
> submit a table job with multiple sinks, and replace the cached table with a
> source.
> >
> > 2. How can we support cache in a multi-statement SQL file?
> > The most intuitive way to support cache in a multi-statement SQL file is
> by using a view, where the view is corresponding to a cached table.
> >
> > 3. Unifying the cached table and materialized views
> > It is true that the cached table and the materialized view are similar
> in some way. However, I think the materialized view is a more complex
> concept. First, a materialized view requires some kind of a refresh
> mechanism to synchronize with the table. Secondly, the life cycle of a
> materialized view is longer. The materialized view should be accessible
> even after the application exits and should be accessible by another
> application, while the cached table is only accessible in the application
> where it is created. The cached table is introduced to avoid recomputation
> of an intermediate table to support interactive programming in Flink Table
> API. And I think the materialized view needs more discussion and certainly
> deserves a whole new FLIP.
> >
> > Please let me know your thought.
> >
> > Best,
> > Xuannan
> >
> On Wed, Apr 29, 2020 at 3:53 PM Xuannan Su  wrote:
> > Hi folks,
> >
> > The FLIP-36 is updated according to the discussion with Becket. In the
> meantime, any comments are very welcome.
> >
> > If there are no further comments, I would like to start the voting
> > thread by tomorrow.
> >
> > Thanks,
> > Xuannan
> >
> >
> > > On Sun, Apr 26, 2020 at 9:34 AM Xuannan Su 
> wrote:
> > > > Hi Becket,
> > > 

[jira] [Created] (FLINK-18614) Performance regression 2020.07.13 (most benchmarks)

2020-07-16 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-18614:
-

 Summary: Performance regression 2020.07.13 (most benchmarks)
 Key: FLINK-18614
 URL: https://issues.apache.org/jira/browse/FLINK-18614
 Project: Flink
  Issue Type: Bug
  Components: Benchmarks
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan


http://codespeed.dak8s.net:8000/timeline/

 

Last good commit a028ba216a35a70c4f2d337bdede195bf7701192 Jul 8 13:48:35 2020

[FLINK-18528][table] Update UNNEST to new type system

 

First bad commit 0fbea46ac0271dd84fa8acd7f99f449a9a0d458c Jul 12 21:22:22 2020

[FLINK-18552][tests] Update migration tests of 
StatefulJobWBroadcastStateMigrationITCase to cover migration till release-1.11

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-130: Support for Python DataStream API (Stateless Part)

2020-07-16 Thread Shuiqiang Chen
Hi Aljoscha,

Thank you for your valuable comments! I agree with you that there is some
optimization space for existing API and can be applied to the python
DataStream API implementation.

According to your comments, I have concluded them into the following parts:

1. SingleOutputStreamOperator and DataStreamSource.
Yes, the SingleOutputStreamOperator and DataStreamSource are a bit
redundant, so we can unify their APIs into DataStream to make it more clear.

2. The internal or low-level methods.
 - DataStream.get_id(): Has been removed in the FLIP wiki page.
 - DataStream.partition_custom(): Has been removed in the FLIP wiki page.
 - SingleOutputStreamOperator.can_be_parallel/forceNoParallel: Has been
removed in the FLIP wiki page.
Sorry for mistakenly making those internal methods public; we will not
expose them to users in the Python API.

3. "declarative" Apis.
- KeyedStream.sum/min/max/min_by/max_by: Has been removed in the FLIP wiki
page. They could be well covered by Table API.

4. Spelling problems.
- StreamExecutionEnvironment.from_collections. Should be from_collection().
- StreamExecutionEnvironment.generate_sequenece. Should be
generate_sequence().
Sorry for the spelling error.

5. Predefined source and sink.
As you said, most of the predefined sources are not suitable for
production, so we can ignore them in the new Python DataStream API.
There is one exception: I think we should add print(), since
it is commonly used and very useful for debugging jobs. We
can add a comment to the API that it should never be used in production.
Meanwhile, as you mentioned, a good alternative that always prints on the
client should also be supported. For this case, maybe we can add a
collect() method that returns an Iterator. With the iterator, users can print
the content on the client. This is also consistent with the behavior in the
Table API.
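
For reference, the Table API behavior I'm referring to looks roughly like this
on the Java side (a sketch; the query is a placeholder):

TableResult result = tEnv.executeSql("SELECT * FROM my_table");
// collect() returns a client-side iterator, so the rows are printed on the
// client rather than in the TaskManager logs.
try (CloseableIterator<Row> it = result.collect()) {
    while (it.hasNext()) {
        System.out.println(it.next());
    }
}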

6. For Row.
Do you mean that we should not expose the Row type in the Python API? Maybe I
haven't fully understood your concern.
We can use the tuple type in the Python DataStream API to support Row. (I have
updated the example section of the FLIP to reflect this design.)

Thanks again for your suggestions; they are highly appreciated. Looking forward
to your feedback.

Best,
Shuiqiang

Aljoscha Krettek  于2020年7月15日周三 下午5:58写道:

> Hi,
>
> thanks for the proposal! I have some comments about the API. We should not
> blindly copy the existing Java DataSteam because we made some mistakes with
> that and we now have a chance to fix them and not forward them to a new API.
>
> I don't think we need SingleOutputStreamOperator, in the Scala API we just
> have DataStream and the relevant methods from SingleOutputStreamOperator
> are added to DataStream. Having this extra type is more confusing than
> helpful to users, I think. In the same vain, I think we also don't need
> DataStreamSource. The source methods can also just return a DataStream.
>
> There are some methods that I would consider internal and we shouldn't
> expose them:
>  - DataStream.get_id(): this is an internal method
>  - DataStream.partition_custom(): I think adding this method was a mistake
> because it's to low-level, I could be convinced otherwise
>  - DataStream.print()/DataStream.print_to_error(): These are questionable
> because they print to the TaskManager log. Maybe we could add a good
> alternative that always prints on the client, similar to the Table API
>  - DataStream.write_to_socket(): It was a mistake to add this sink on
> DataStream it is not fault-tolerant and shouldn't be used in production
>
>  - KeyedStream.sum/min/max/min_by/max_by: Nowadays, the Table API should
> be used for "declarative" use cases and I think these methods should not be
> in the DataStream API
>  - SingleOutputStreamOperator.can_be_parallel/forceNoParallel: these are
> internal methods
>
>  - StreamExecutionEnvironment.from_parallel_collection(): I think the
> usability is questionable
>  - StreamExecutionEnvironment.from_collections -> should be called
> from_collection
>  - StreamExecutionEnvironment.generate_sequenece -> should be called
> generate_sequence
>
> I think most of the predefined sources are questionable:
>  - fromParallelCollection: I don't know if this is useful
>  - readTextFile: most of the variants are not useful/fault-tolerant
>  - readFile: same
>  - socketTextStream: also not useful except for toy examples
>  - createInput: also not useful, and it's legacy DataSet InputFormats
>
> I think we need to think hard whether we want to further expose Row in our
> APIs. I think adding it to flink-core was more an accident than anything
> else but I can see that it would be useful for Python/Java interop.
>
> Best,
> Aljoscha
>
>
> On Mon, Jul 13, 2020, at 04:38, jincheng sun wrote:
> > Thanks for bring up this DISCUSS Shuiqiang!
> >
> > +1 for the proposal!
> >
> > Best,
> > Jincheng
> >
> >
> > Xingbo Huang  于2020年7月9日周四 上午10:41写道:
> >
> > > Hi Shuiqiang,
> > >
> > > Thanks a lot for driving this discussion.
> > > Big +1 

Re: [DISCUSS] FLIP-129: Refactor Descriptor API to register connector in Table API

2020-07-16 Thread Jingsong Li
Thanks for the discussion.

The Descriptor has lacked watermark and computed column support for too long.

1) +1 for just `column(...)`

2) +1 for being consistent with the Table API; the Java Table API should use
the Expression DSL. We don't need pure string support; users should just use
DDL instead. I think this is just a schema descriptor? The schema
descriptor should be consistent with DDL, so, definitely, it should
contain computed column information.

3) +1 for not containing Schema#proctime and
Schema#watermarkFor#boundedOutOfOrderTimestamps, we can just leave them in
legacy apis.

6,7) +1 for removing "new" and the builder and making it immutable. For Jark:
the starting method is a static method, the others are not.

8) +1 for `ConfigOption`; this can be consistent with `WritableConfig`. For
Leonard: I don't think the user needs “json.fail-on-missing-field” rather than
“fail-on-missing-field”. The user should need “fail-on-missing-field” rather
than “json.fail-on-missing-field", because the recommended way is
"JsonFormat.newInstance().option()", which configures options in the
format scope.

Best,
Jingsong

On Wed, Jul 15, 2020 at 9:30 PM Leonard Xu  wrote:

> Thanks Jark bring this discussion and organize the FLIP document.
>
> Thanks Dawid and Timo for the feedback. Here are my thoughts.
>
> 1)  I’m +1 with using column() for both cases.
>
> 2) Expression DSL vs pure SQL string for computed columns
>
> I think we can support them both and implement the pure SQL String first,
> I agree that Expression DSL brings more possibility and flexibility, but
> using SQL string is a more unified way which can reuse most logic with DDL
> like validation and persist in Catalog,
> and Converting Expression DSL to SQL Expression is another big topic and I
> did not figure out a feasible idea until now.
> So, maybe we can postpone the Expression DSL support considered the
> reality.
>
> 3) Methods Schema#proctime and
> Schema#watermarkFor#boundedOutOfOrderTimestamps
>
>  +1 with Dawid’s proposal to offer SQL-like methods:
>  Schema()
> .column("proctime", proctime())
> .watermarkFor("rowtime", $("rowtime").minus(lit(3).seconds()));
> And we can simplify watermarkFor(“colName”, Expression
> watermarkStrategy) to watermark(“colName”, Expression watermarkStrategy); I
> think the latter can express the meaning of “WATERMARK FOR
> column_name AS watermark_strategy_expression“ well.
>
> 5) 6) 7) The new keyword vs. the static method vs. the builder pattern
>
> I have no strong preference; the new keyword and a static method on the
> descriptor can nearly be treated as a builder and do the same things as a
> builder.
> For the builder pattern, we would introduce six
> methods (connector.Builder(), connector.Builder.build(), format.Builder(),
> format.Builder.build(), Schema.Builder(), Schema.Builder.build()); I think
> we could avoid these unnecessary methods. I’m slightly +1 for the new
> keyword if we have to choose.
>
> 8) `Connector.option(...)` class should also accept `ConfigOption`
> I’m slightly -1 for this: ConfigOption may not work because the key of a
> format ConfigOption does not have the format prefix, e.g. FAIL_ON_MISSING_FIELD
> of JSON; we need “json.fail-on-missing-field” rather than
> “fail-on-missing-field”.
>
> public static final ConfigOption<Boolean> FAIL_ON_MISSING_FIELD =
> ConfigOptions
> .key("fail-on-missing-field")
> .booleanType()
> .defaultValue(false);
>
> WDYT?
>
> Best,
> Leonard Xu
>
>
> > 在 2020年7月15日,16:37,Timo Walther  写道:
> >
> > Hi Jark,
> >
> > thanks for working on this issue. It is time to fix this last part of
> inconsistency in the API. I also like the core parts of the FLIP, esp. that
> TableDescriptor is one entity that can be passed to different methods. Here
> is some feedback from my side:
> >
> > 1) +1 for just `column(...)`
> >
> > 2) Expression DSL vs pure SQL string for computed columns
> > I agree with Dawid. Using the Expression DSL is desirable for a
> consistent API. Furthermore, otherwise people need to register functions if
> they want to use them in an expression. Refactoring TableSchema is
> definitely on the list for 1.12. Maybe we can come up with some
> intermediate solution where we transform the expression to a SQL expression
> for the catalog. Until the discussions around FLIP-80 and
> CatalogTableSchema have been finalized.
> >
> > 3) Schema#proctime and Schema#watermarkFor#boundedOutOfOrderTimestamps
> > We should design the descriptor very close to the SQL syntax. The more
> similar the syntax, the more likely it is to keep the new descriptor API
> stable.
> >
> > 6) static method vs new keyword
> > Actually, the `new` keyword was one of the things that bothered me most
> in the old design. Fluent APIs avoid this nowadays.
> >
> > 7) make the descriptors immutable with builders
> > The descriptors are some kind of builders already. But they are not
> called "builder". Instead of coming up with the new concept of a
> "descriptor", we should use terminology that people esp. Java/Scala 

[jira] [Created] (FLINK-18613) How to support retract & upsert sink for a TableSink ?

2020-07-16 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-18613:
-

 Summary: How to support retract & upsert sink for a TableSink ?
 Key: FLINK-18613
 URL: https://issues.apache.org/jira/browse/FLINK-18613
 Project: Flink
  Issue Type: Wish
  Components: Table SQL / Planner
Reporter: hehuiyuan


Environment: Flink 1.9 / Blink planner

Hi, I want to ask a question:

I have a job that executes multiple SQL statements and uses one TableSink class:

(1) insert into table_0 select count(*) from table1;

(2) insert into table_2 select name, sum(score) from table1 group by name;

 

The TableSink implements the UpsertStreamTableSink interface.

That is OK for SQL (2), but it is not supported for SQL (1), which has no keys.

 

But I want to use a TableSink which can support both upsert and retract; can
you give me some advice? Thanks.
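
Something like the following minimal sketch is what I have in mind -- a sink that
consumes a retract stream, so it also works for queries without a unique key such
as a global count(*). The signatures are recalled from the Flink 1.9 docs and not
verified, and the real write logic is application specific (replaced by print()
here):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;

// Sketch only: the sink receives Tuple2<Boolean, Row> records, where
// f0 == true is an add/accumulate message and f0 == false is a retraction.
public class MyRetractSink implements RetractStreamTableSink<Row> {

    private String[] fieldNames;
    private TypeInformation<?>[] fieldTypes;

    @Override
    public TypeInformation<Row> getRecordType() {
        return new RowTypeInfo(fieldTypes, fieldNames);
    }

    @Override
    public String[] getFieldNames() {
        return fieldNames;
    }

    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return fieldTypes;
    }

    @Override
    public TableSink<Tuple2<Boolean, Row>> configure(String[] names, TypeInformation<?>[] types) {
        MyRetractSink copy = new MyRetractSink();
        copy.fieldNames = names;
        copy.fieldTypes = types;
        return copy;
    }

    @Override
    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        // Replace print() with the real SinkFunction that writes to the external system.
        dataStream.print();
    }
}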

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Using md5 hash while sinking files to s3

2020-07-16 Thread Chesnay Schepler

Please try configuring :

fs.s3a.etag.checksum.enabled: true


On 16/07/2020 03:11, nikita Balakrishnan wrote:

Hello team,

I’m developing a system where we are trying to sink to an immutable s3
bucket. This bucket has server side encryption set as KMS. The DataStream
sink works perfectly fine when I don’t use the immutable bucket but when I
use an immutable bucket, I get exceptions regarding multipart upload
failures. It says we need to enable md5 hashing for the put object to work.

Here’s the stack trace:

org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
exception while processing timer.
at
org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1090)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1058)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1520)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$10(StreamTask.java:1509)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.flink.streaming.runtime.tasks.TimerException:
java.io.IOException: Uploading parts failed
... 11 common frames omitted
Caused by: java.io.IOException: Uploading parts failed
at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartUploadToComplete(RecoverableMultiPartUploadImpl.java:231)
at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartsUpload(RecoverableMultiPartUploadImpl.java:215)
at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetRecoverable(RecoverableMultiPartUploadImpl.java:151)
at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:123)
at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:56)
at
org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeForCommit(S3RecoverableFsDataOutputStream.java:167)
at
org.apache.flink.streaming.api.functions.sink.filesystem.PartFileWriter.closeForCommit(PartFileWriter.java:71)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:239)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onProcessingTime(Bucket.java:338)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onProcessingTime(Buckets.java:304)
at
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.onProcessingTime(StreamingFileSink.java:439)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1518)
... 10 common frames omitted
Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: upload part on
raw_events/xxx/xxx/2020/07/15/20/archived-2-0.txt:
com.amazonaws.services.s3.model.AmazonS3Exception: Content-MD5 HTTP header
is required for Put Part requests with Object Lock parameters (Service:
Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: xxx;
S3 Extended Request ID: ), S3 Extended Request ID: xx
:InvalidRequest: Content-MD5 HTTP header is required for Put Part requests
with Object Lock parameters (Service: Amazon S3; Status Code: 400; Error
Code: InvalidRequest; Request ID: ; S3 Extended Request ID: )
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:212)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
at
org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
at
org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471)
at
org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:73)
at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:318)
at

[jira] [Created] (FLINK-18612) WordCount example failure when using relative output path

2020-07-16 Thread Zhijiang (Jira)
Zhijiang created FLINK-18612:


 Summary: WordCount example failure when using relative output path
 Key: FLINK-18612
 URL: https://issues.apache.org/jira/browse/FLINK-18612
 Project: Flink
  Issue Type: Bug
  Components: fs
Affects Versions: 1.11.0, 1.11.1
Reporter: Zhijiang
 Fix For: 1.12.0, 1.11.2


The failure log can be found here 
[log|https://pipelines.actions.githubusercontent.com/revSbsLpzrFApLL6BmCvScWt72tRe3wYUv7fCdCtThtI5bydk7/_apis/pipelines/1/runs/27244/signedlogcontent/21?urlExpires=2020-07-16T06%3A35%3A49.4559813Z=HMACV1=%2FfAsJgIlIf%2BDitViRJYh0DAGJZjJwhsCGS219ZyniAA%3D].

When executing the following commands, we can reproduce this problem locally:
* bin/start-cluster.sh
* bin/flink run -p 1 examples/streaming/WordCount.jar --input input --output 
result

It is caused by the 
[commit|https://github.com/apache/flink/commit/a2deff2967b7de423b10f7f01a41c06565c37e62#diff-2010e422f5e43a971cd7134a9e0b9a5f].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)