Re: [DISCUSS] Connectors and NULL handling

2019-06-23 Thread Xiaowei Jiang
Error handling policy for streaming jobs goes beyond potential corrupted
messages in the source. Users may have subtle bugs while processing some
messages, which may cause the streaming job to fail. Even though this can
be considered a bug in the user's code, users may prefer to skip such messages
(or log them) and let the job continue in some cases. This may be an
opportunity to take such cases into consideration as well.
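
To make this a bit more concrete, here is a minimal sketch of a deserialization
schema in the spirit of the Either-style record discussed in the quoted thread
below: it surfaces Kafka tombstones and corrupt messages as explicit records
instead of returning null. ParsedRecord is a hypothetical wrapper type, not an
existing Flink class, and real parsing (JSON, Avro, ...) would go where the
String decoding is.

import java.io.Serializable;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Sketch only: ParsedRecord is a hypothetical Either-like wrapper, not an existing Flink type.
public class TolerantStringSchema implements DeserializationSchema<TolerantStringSchema.ParsedRecord> {

    public static class ParsedRecord implements Serializable {
        public enum Kind { VALUE, TOMBSTONE, CORRUPT }

        public Kind kind;
        public String value;        // set when kind == VALUE
        public byte[] rawBytes;     // set when kind == CORRUPT
        public String errorMessage; // set when kind == CORRUPT

        public ParsedRecord() {}    // default constructor so Flink treats it as a POJO
    }

    @Override
    public ParsedRecord deserialize(byte[] message) {
        ParsedRecord record = new ParsedRecord();
        if (message == null) {
            // Kafka tombstone (DELETE for the key): keep it instead of silently dropping it.
            record.kind = ParsedRecord.Kind.TOMBSTONE;
            return record;
        }
        try {
            // A real schema (JSON, Avro, ...) would do the actual parsing here.
            record.kind = ParsedRecord.Kind.VALUE;
            record.value = new String(message, StandardCharsets.UTF_8);
        } catch (Exception e) {
            // Corrupt message: surface it so downstream user code can log, skip, or side-output it.
            record.kind = ParsedRecord.Kind.CORRUPT;
            record.rawBytes = message;
            record.errorMessage = e.getMessage();
        }
        return record;
    }

    @Override
    public boolean isEndOfStream(ParsedRecord nextElement) {
        return false;
    }

    @Override
    public TypeInformation<ParsedRecord> getProducedType() {
        return TypeInformation.of(ParsedRecord.class);
    }
}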

Xiaowei

On Fri, Jun 21, 2019 at 11:43 PM Rong Rong  wrote:

> Hi Aljoscha,
>
> Sorry for the late reply, I think the solution makes sense. Using a NULL
> return value to mark a message as corrupted is not a valid approach, since NULL
> values have semantic meaning not just in Kafka but also in a lot of other
> contexts.
>
> I was wondering if we can have a more meaningful interface for dealing with
> corrupted messages. I am thinking of 2 options off the top of my head:
> 1. Create some special deserializer attribute (or a special record) to
> indicate corrupted messages like you suggested; this way we can not only
> encode the deserialization error but also allow users to encode any corruption
> information for downstream processing.
> 2. Create a standard fetch error handling API on AbstractFetcher (for
> Kafka) and DataFetcher (for Kinesis); this way we can also handle errors
> other than deserialization problems, for example even lower-level
> exceptions like CRC check failures.
>
> I think either way will work. Also, as long as there's a way for end users
> to extend the error handling for message corruption, it will not
> reintroduce the problems these 2 original JIRAs were trying to address.
>
> --
> Rong
>
> On Tue, Jun 18, 2019 at 2:06 AM Aljoscha Krettek 
> wrote:
>
> > Hi All,
> >
> > Thanks to Gary, I recently came upon an interesting cluster of issues:
> >  - https://issues.apache.org/jira/browse/FLINK-3679: Allow Kafka consumer
> > to skip corrupted messages
> >  - https://issues.apache.org/jira/browse/FLINK-5583: Support flexible
> > error handling in the Kafka consumer
> >  - https://issues.apache.org/jira/browse/FLINK-11820: SimpleStringSchema
> > handle message record which value is null
> >
> > In light of the last one I’d like to look again at the first two. What
> > they introduced is that when the deserialisation schema returns NULL, the
> > Kafka consumer (and maybe also the Kinesis consumer) silently drops the
> > record. In Kafka NULL values have semantic meaning, i.e. they usually
> > encode a DELETE for the key of the message. If SimpleStringSchema returned
> > that null, our consumer would silently drop it and we would lose that
> > DELETE message. That doesn’t seem right.
> >
> > I think the right solution for null handling is to introduce a custom
> > record type that encodes both Kafka NULL values and the possibility of a
> > corrupt message that cannot be deserialised. Something like an Either type.
> > It’s then up to the application to handle those cases.
> >
> > Concretely, I want to discuss whether we should change our consumers to
> > not silently drop null records, but instead see them as errors. For
> > FLINK-11820, the solution is for users to write their own custom schema
> > that handles null values and returns a user-defined type that signals null
> > values.
> >
> > What do you think?
> >
> > Aljoscha
> >
> >
>


Re: Specifying parallelism on join operation

2019-06-23 Thread Xiaowei Jiang
You can use with(JoinFunction) to work around it. See the JavaDoc for Flink 1.8
(https://ci.apache.org/projects/flink/flink-docs-release-1.8/api/java/org/apache/flink/streaming/api/datastream/JoinedStreams.WithWindow.html):

@PublicEvolving
@Deprecated
public <T> SingleOutputStreamOperator<T> with(JoinFunction<T1,T2,T> function)

Deprecated. This method will be removed once the apply(JoinFunction) method
is fixed in the next major version of Flink (2.0).
Completes the join operation with the user function that is executed for
each combination of elements with the same key in a window.

*Note:* This is a temporary workaround while the apply(JoinFunction) method
has the wrong return type and hence does not allow one to set an
operator-specific parallelism.
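
Concretely, because with(...) returns a SingleOutputStreamOperator, you can call
setParallelism() on the joined stream. A small self-contained sketch (the input
data and join logic below are made up for illustration):

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class JoinParallelismExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> left =
            env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2));
        DataStream<Tuple2<String, Integer>> right =
            env.fromElements(Tuple2.of("a", 10), Tuple2.of("b", 20));

        left.join(right)
            .where(new KeySelector<Tuple2<String, Integer>, String>() {
                @Override
                public String getKey(Tuple2<String, Integer> value) {
                    return value.f0;
                }
            })
            .equalTo(new KeySelector<Tuple2<String, Integer>, String>() {
                @Override
                public String getKey(Tuple2<String, Integer> value) {
                    return value.f0;
                }
            })
            .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
            // Deprecated, but unlike apply(...) it returns SingleOutputStreamOperator,
            // which exposes setParallelism().
            .with(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
                @Override
                public String join(Tuple2<String, Integer> l, Tuple2<String, Integer> r) {
                    return l.f0 + ": " + (l.f1 + r.f1);
                }
            })
            .setParallelism(4) // operator-specific parallelism for the join
            .print();

        env.execute("join parallelism example");
    }
}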

On Sat, Jun 22, 2019 at 9:14 AM Roshan Naik 
wrote:

>  I cant find any place to specify the parallelism for the join here.
>
> stream1.join( stream2 )
>  .where( .. )
> .equalTo( .. )
> .window( .. )
> .apply( .. );
>
> How can we specify that ?
>
> -roshan
>


Re: [Discuss] FLIP-43: Savepoint Connector

2019-06-04 Thread Xiaowei Jiang
Hi Gordon & Seth, this looks like a very useful feature for analyzing and managing
state.
I agree that using DataSet is probably the most practical choice right now. But
in the longer term, adding Table API support for this will be nice.
When analyzing the savepoint, I assume that the state backend restores the
state first? This approach is generic and works for any state backend. However,
sometimes it may be more efficient to directly analyze the files on DFS
without copying. We can probably add an interface to allow the state backend to
optimize such behavior in the future.
Also a quick question on the example in the wiki:

    DataSet keyedState = operator.readKeyedState("uid", new ReaderFunction());

Should operator.readKeyedState be replaced with savepoint.readKeyedState here?
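
For reference, a minimal sketch of how the corrected example might read, assuming
the Savepoint entry point from the FLIP draft (imports omitted and all names
treated as assumptions here, since the exact packages and signatures were still
being decided at the time):

// Sketch only: ExistingSavepoint/Savepoint follow the FLIP draft; ReaderFunction
// and MyKeyedState are placeholders from the wiki example.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Load an existing savepoint, using the state backend that wrote it.
ExistingSavepoint savepoint =
    Savepoint.load(env, "hdfs:///path/to/savepoint", new MemoryStateBackend());

// readKeyedState is called on the savepoint handle (not on "operator"),
// identifying the operator by its uid.
DataSet<MyKeyedState> keyedState =
    savepoint.readKeyedState("uid", new ReaderFunction());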

Regards,
Xiaowei

On Tuesday, June 4, 2019, 6:56:00 AM PDT, Aljoscha Krettek 
 wrote:  
 
 +1 I think it is a very valuable new addition and we should try to not get
stuck on trying to design the perfect solution for everything

> On 4. Jun 2019, at 13:24, Tzu-Li (Gordon) Tai  wrote:
> 
> +1 to renaming it as State Processing API and adding it under the
> flink-libraries module.
> 
> I also think we can start with the development of the feature. From the
> feedback so far, it seems like we're in a good spot to add in at least the
> initial version of this API, hopefully making it ready for 1.9.0.
> 
> Cheers,
> Gordon
> 
> On Tue, Jun 4, 2019 at 7:14 PM Seth Wiesman  wrote:
> 
>> It seems like a recurring piece of feedback was a different name. I’d like
>> to propose moving the functionality to the libraries module and naming this
>> the State Processing API.
>> 
>> Seth
>> 
>>> On May 31, 2019, at 3:47 PM, Seth Wiesman  wrote:
>>> 
>>> The SavepointOutputFormat only writes out the savepoint metadata file
>> and should be mostly ignored.
>>> 
>>> The actual state is written out by stream operators and tied into the
>> flink runtime[1, 2, 3].
>>> 
>>> This is the most important part and the piece that I don’t think can be
>> reasonably extracted.
>>> 
>>> Seth
>>> 
>>> [1]
>> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/operators/KeyedStateBootstrapOperator.java#L84
>>> 
>>> [2]
>> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/SnapshotUtils.java
>>> 
>>> [3]
>> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/BoundedOneInputStreamTaskRunner.java
>>> 
 On May 31, 2019, at 3:08 PM, Jan Lukavský  wrote:
 
 Hi Seth,
 
 yes, that helped! :-)
 
 What I was looking for is essentially
>> `org.apache.flink.connectors.savepoint.output.SavepointOutputFormat`. It
>> would be great if this could be written in a way that would enable reusing
>> it in a different engine (as I mentioned - Apache Spark). There seem to be
>> some issues though. It uses interface Savepoint, which uses several other
>> objects and interfaces from Flink's runtime. Maybe some convenience API
>> might help - Apache Beam, handles operator naming, so that definitely
>> should be transitionable between systems, but I'm not sure, how to
>> construct OperatorID from this name. Would you think, that it is possible
>> to come up with something that could be used in this portable way?
 
 I understand, there are some more conditions, that need to be satisfied
>> (grouping, aggregating, ...), which would of course have to be handled by
>> the target system. But Apache Beam can help leverage that. My idea would
>> be, that there can be runner specified PTransform, that takes PCollection
>> of some tuples of `(operator name, key, state name, value1), (operator
>> name, key, state name, value2)`, and Runner's responsibility would be to
>> group/aggregate this so that it can be written by runner's provided writer
>> (output format).
 
 All of this would need a lot more design, these are just ideas of "what
>> could be possible", I was just wondering if this FLIP can make some first
>> steps towards this.
 
 Many thanks for comments,
 
 Jan
 
> On 5/31/19 8:12 PM, Seth Wiesman wrote:
> @Jan Gotcha,
> 
> So in reusing components it explicitly is not a writer. This is not a
>> savepoint output format in the way we have a parquet output format. The
>> reason for the Transform api is to hide the underlying details, it does not
>> simply append an output writer to the end of a dataset. This gets into the
>> implementation details but at a high level, the dataset is:
> 
> 1) partitioned using key groups
> 2) data is run through a standard stream operator that takes a
>> snapshot of its state after processing all records and outputs metadata
>> handles for each subtask
> 

Re: [VOTE] Release 1.8.0, release candidate #4

2019-03-26 Thread Xiaowei Jiang
 +1 (non-binding)

- checked checksums and GPG files
- build from source successfully
- run end-to-end precommit tests successfully
- run end-to-end nightly tests successfully

Xiaowei
On Tuesday, March 26, 2019, 8:09:19 PM GMT+8, Yu Li  
wrote:  
 
 +1 (non-binding)

- Checked release notes: OK
- Checked sums and signatures: OK
- Source release
    - contains no binaries: OK
    - contains no 1.8-SNAPSHOT references: OK
    - build from source: OK (8u101)
    - mvn clean verify: OK (8u101)
- Binary release
    - no examples appear to be missing
    - started a cluster; WebUI reachable, example ran successfully
    - end-to-end test (all but K8S and docker ones): OK (8u101)
- Repository appears to contain all expected artifacts

Best Regards,
Yu


On Tue, 26 Mar 2019 at 14:28, Kurt Young  wrote:

> +1 (non-binding)
>
> Checked items:
> - checked checksums and GPG files
> - verified that the source archives do not contains any binaries
> - checked that all POM files point to the same version
> - build from source successfully
>
> Best,
> Kurt
>
>
> On Tue, Mar 26, 2019 at 10:57 AM Shaoxuan Wang 
> wrote:
>
> > +1 (non-binding)
> >
> > I tested RC4 with the following items:
> > - Maven Central Repository contains all artifacts
> > - Built the source with Maven (ensured all source files have Apache
> > headers), and executed built-in tests via "mvn clean verify"
> > - Manually executed the tests in IntelliJ IDE
> > - Verify that the quickstarts for Scala and Java are working with the
> > staging repository in IntelliJ
> > - Checked the benchmark results. The perf regression of
> > tuple-key-by/statebackend/tumblingWindow are gone, but the regression on
> > serializer still exists.
> >
> > Regards,
> > Shaoxuan
> >
> > On Tue, Mar 26, 2019 at 8:06 AM jincheng sun 
> > wrote:
> >
> > > Hi Aljoscha, I think you are right, increasing the timeout config will fix
> > > this issue. This depends on the resources of Travis. I would like to share
> > > some phenomena from my test (not a Flink problem) as follows:  :-)
> > >
> > > During my testing, `mvn clean verify` and `nightly end-to-end test `
> both
> > > consume a lot of machine resources (especially memory/network), and the
> > > network bandwidth requirements of `nightly end-to-end test ` are also
> > very
> > > high. In China, I need to use VPN acceleration (100~200Kb before
> > > acceleration, 3~4Mb after acceleration). I have encountered: [Avro
> > > Confluent Schema Registry nightly end-to-end test failed after 18 minutes
> > > and 15 seconds! Test exited with exit code 1] - it took more than 18 minutes
> > > and the download failed because the network bandwidth was not enough, and it
> > > runs smoothly when using VPN acceleration. The overall end-to-end run
> > > passed twice. The Docker resource configuration was (CPUs: 7, Mem: 28.7G,
> > Swap:
> > > 3.5G). See detail log here
> > > <
> > >
> >
> https://docs.google.com/document/d/1CcyTCyZmMmP57pkKv4drjSuxW61_u78HR3q1fJJODMw/edit?usp=sharing
> > > >
> > > .
> > >
> > > Just now, I checked Travis for your last commit (Increase startup
> > > timeout in end-to-end tests); apart from the Cleanup phase, the other
> > > phases are successful. here
> > > 
> > >
> > > In order to verify that our speculation is accurate, I can run tests with
> > > 10-second and 20-second timeout configs on my repo, to see if the timeout
> > > problem recurs 100% of the time. It is already running; we are waiting for the result.
> > > 10seconds  >
> > > 20seconds  >
> > >
> > > Best,
> > > Jincheng
> > >
> > > On Tue, Mar 26, 2019 at 1:04 AM, Aljoscha Krettek wrote:
> > >
> > > > Thanks for the testing done so far!
> > > >
> > > > There has been quite some flakiness on Travis lately, see here:
> > > > https://travis-ci.org/apache/flink/branches <
> > > > https://travis-ci.org/apache/flink/branches>. I’m a bit hesitant to
> > > > release in this state. Looking at the tests you can see that all of
> the
> > > > end-to-end tests fail because waiting for the dispatcher to come up
> > times
> > > > out. I also noticed that this usually takes about 5-8 seconds on
> > Travis,
> > > so
> > > > a 10 second timeout might be a bit low. I pushed commits to increase
> > that
> > > > to 20 secs. Let’s see what will happen.
> > > >
> > > > I’ll keep you posted!
> > > > Aljoscha
> > > >
> > > > > On 25. Mar 2019, at 13:13, jincheng sun 
> > > > wrote:
> > > > >
> > > > > Great thanks for preparing the RC4 of Flink 1.8.0, Aljoscha!
> > > > >
> > > > > +1 (non-binding)
> > > > >
> > > > > I checked the functional things as follows(Without performance
> > > > > verification):
> > > > >
> > > > > 1. Checking Artifacts:
> > > > >
> > > > >    1). Download the release source code -  SUCCESS
> > > > >    2). Check Source release flink-1.8.0-src.tgz.sha512 - SUCCESS
> > > > >    3). Downl

Re: [ANNOUNCE] Contributing Alibaba's Blink

2019-01-21 Thread Xiaowei Jiang
 Thanks Stephan! We are hoping to make the process as non-disruptive as 
possible to the Flink community. Making the Blink codebase public is the first 
step that hopefully facilitates further discussions.
Xiaowei

On Monday, January 21, 2019, 11:46:28 AM PST, Stephan Ewen 
 wrote:  
 
 Dear Flink Community!

Some of you may have heard it already from announcements or from a Flink
Forward talk:
Alibaba has decided to open source its in-house improvements to Flink,
called Blink!
First of all, big thanks to the team that developed these improvements and made
this contribution possible!

Blink has some very exciting enhancements, most prominently on the Table
API/SQL side
and the unified execution of these programs. For batch (bounded) data, the
SQL execution
has full TPC-DS coverage (which is a big deal), and the execution is more
than 10x faster
than the current SQL runtime in Flink. Blink has also added support for
catalogs,
improved the failover speed of batch queries and the resource management.
It also
makes some good steps in the direction of more deeply unifying the batch
and streaming
execution.

The proposal is to merge Blink's enhancements into Flink, to give Flink's
SQL/Table API and
execution a big boost in usability and performance.

Just to avoid any confusion: This is not a suggested change of focus to
batch processing,
nor would this break with any of the streaming architecture and vision of
Flink.
This contribution follows very much the principle of "batch is a special
case of streaming".
As a special case, batch makes special optimizations possible. In its
current state,
Flink does not exploit many of these optimizations. This contribution adds
exactly these
optimizations and makes the streaming model of Flink applicable to harder
batch use cases.

Assuming that the community is excited about this as well, and in favor of
these enhancements
to Flink's capabilities, below are some thoughts on how this contribution
and integration
could work.

--- Making the code available ---

At the moment, the Blink code is in the form of a big Flink fork (rather
than isolated
patches on top of Flink), so the integration is unfortunately not as easy
as merging a
few patches or pull requests.

To support a non-disruptive merge of such a big contribution, I believe it
makes sense to make
the code of the fork available in the Flink project first.
From there on, we can start to work on the details for merging the
enhancements, including
the refactoring of the necessary parts in the Flink master and the Blink
code to make a
merge possible without repeatedly breaking compatibility.

The first question is where do we put the code of the Blink fork during the
merging procedure?
My first thought was to temporarily add a repository (like
"flink-blink-staging"), but we could
also put it into a special branch in the main Flink repository.


I will start a separate thread about discussing a possible strategy to
handle and merge
such a big contribution.

Best,
Stephan
  

Re: [DISCUSS] Support Interactive Programming in Flink Table API

2018-11-22 Thread Xiaowei Jiang
Relying on a callback for the temp table for cleanup is not very reliable.
There is no guarantee that it will be executed successfully. We may risk
leaks when that happens. I think that it's safer to have an association
between temp tables and session ids, so we can always clean up temp tables
which are no longer associated with any active sessions.
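
To illustrate the idea (all names here are hypothetical and not part of the
proposal), a session-scoped registry could track which temp tables belong to
which session, so that a periodic sweep can always reclaim tables whose owning
session is no longer active:

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: associates cached/temp tables with the session that created them,
// so a janitor can reclaim tables whose owning session is no longer active.
public class TempTableRegistry {

    /** Placeholder for whatever actually deletes the physical temp table. */
    public interface TableDropper {
        void drop(String tableName);
    }

    private final Map<String, Set<String>> tablesBySession = new ConcurrentHashMap<>();

    public void register(String sessionId, String tableName) {
        tablesBySession
            .computeIfAbsent(sessionId, id -> ConcurrentHashMap.newKeySet())
            .add(tableName);
    }

    /** Called periodically; drops every temp table whose owning session is gone. */
    public void sweep(Set<String> activeSessionIds, TableDropper dropper) {
        for (Map.Entry<String, Set<String>> entry : tablesBySession.entrySet()) {
            if (!activeSessionIds.contains(entry.getKey())) {
                entry.getValue().forEach(dropper::drop);
                tablesBySession.remove(entry.getKey());
            }
        }
    }
}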

Regards,
Xiaowei

On Thu, Nov 22, 2018 at 12:55 PM jincheng sun 
wrote:

> Hi Jiangjie&Shaoxuan,
>
> Thanks for initiating this great proposal!
>
> Interactive Programming is very useful and user friendly in case of your
> examples.
> Moreover, especially when a business has to be executed in several stages
> with dependencies,such as the pipeline of Flink ML, in order to utilize the
> intermediate calculation results we have to submit a job by env.execute().
>
> About the `cache()`  , I think is better to named `persist()`, And The
> Flink framework determines whether we internally cache in memory or persist
> to the storage system,Maybe save the data into state backend
> (MemoryStateBackend or RocksDBStateBackend etc.)
>
> BTW, from the points of my view in the future, support for streaming and
> batch mode switching in the same job will also benefit in "Interactive
> Programming",  I am looking forward to your JIRAs and FLIP!
>
> Best,
> Jincheng
>
>
> On Tue, Nov 20, 2018 at 9:56 PM, Becket Qin wrote:
>
> > Hi all,
> >
> > As a few recent email threads have pointed out, it is a promising
> > opportunity to enhance Flink Table API in various aspects, including
> > functionality and ease of use among others. One of the scenarios where we
> > feel Flink could improve is interactive programming. To explain the
> issues
> > and facilitate the discussion on the solution, we put together the
> > following document with our proposal.
> >
> >
> >
> https://docs.google.com/document/d/1d4T2zTyfe7hdncEUAxrlNOYr4e5IMNEZLyqSuuswkA0/edit?usp=sharing
> >
> > Feedback and comments are very welcome!
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
>


Re: [DISCUSS] Long-term goal of making flink-table Scala-free

2018-11-22 Thread Xiaowei Jiang
Hi Timo, thanks for driving this! I think that this is a nice thing to do.
While we are doing this, can we also keep in mind that we want to
eventually have a Table API interface-only module which users can take a
dependency on, but without including any implementation details?

Xiaowei

On Thu, Nov 22, 2018 at 6:37 PM Fabian Hueske  wrote:

> Hi Timo,
>
> Thanks for writing up this document.
> I like the new structure and agree to prioritize the porting of the
> flink-table-common classes.
> Since flink-table-runtime is (or should be) independent of the API and
> planner modules, we could start porting these classes once the code is
> split into the new module structure.
> The benefits of a Scala-free flink-table-runtime would be a Scala-free
> execution Jar.
>
> Best, Fabian
>
>
> > On Thu, Nov 22, 2018 at 10:54 AM, Timo Walther <
> twal...@apache.org
> > wrote:
>
> > Hi everyone,
> >
> > I would like to continue this discussion thread and convert the outcome
> > into a FLIP such that users and contributors know what to expect in the
> > upcoming releases.
> >
> > I created a design document [1] that clarifies our motivation why we
> > want to do this, how a Maven module structure could look like, and a
> > suggestion for a migration plan.
> >
> > It would be great to start with the efforts for the 1.8 release such
> > that new features can be developed in Java and major refactorings such
> > as improvements to the connectors and external catalog support are not
> > blocked.
> >
> > Please let me know what you think.
> >
> > Regards,
> > Timo
> >
> > [1]
> >
> >
> https://docs.google.com/document/d/1PPo6goW7tOwxmpFuvLSjFnx7BF8IVz0w3dcmPPyqvoY/edit?usp=sharing
> >
> >
> > On Jul 2, 2018, at 17:08, Fabian Hueske wrote:
> > > Hi Piotr,
> > >
> > > thanks for bumping this thread and thanks to Xingcan for the comments.
> > >
> > > I think the first step would be to separate the flink-table module into
> > > multiple sub modules. These could be:
> > >
> > > - flink-table-api: All API facing classes. Can be later divided further
> > > into Java/Scala Table API/SQL
> > > - flink-table-planning: involves all planning (basically everything we
> do
> > > with Calcite)
> > > - flink-table-runtime: the runtime code
> > >
> > > IMO, a realistic mid-term goal is to have the runtime module and
> certain
> > > parts of the planning module ported to Java.
> > > The api module will be much harder to port because of several
> > dependencies
> > > to Scala core classes (the parser framework, tree iterations, etc.).
> I'm
> > > not saying we should not port this to Java, but it is not clear to me
> > (yet)
> > > how to do it.
> > >
> > > I think flink-table-runtime should not be too hard to port. The code
> does
> > > not make use of many Scala features, i.e., it's writing very Java-like.
> > > Also, there are not many dependencies and operators can be individually
> > > ported step-by-step.
> > > For flink-table-planning, we can have certain packages that we port to
> > Java
> > > like planning rules or plan nodes. The related classes mostly extend
> > > Calcite's Java interfaces/classes and would be natural choices for
> being
> > > ported. The code generation classes will require more effort to port.
> > There
> > > are also some dependencies in planning on the api module that we would
> > need
> > > to resolve somehow.
> > >
> > > For SQL most work when adding new features is done in the planning and
> > > runtime modules. So, this separation should already reduce
> "technological
> > > dept" quite a lot.
> > > The Table API depends much more on Scala than SQL.
> > >
> > > Cheers, Fabian
> > >
> > >
> > >
> > > 2018-07-02 16:26 GMT+02:00 Xingcan Cui :
> > >
> > >> Hi all,
> > >>
> > >> I also think about this problem these days and here are my thoughts.
> > >>
> > >> 1) We must admit that it’s really a tough task to interoperate with
> Java
> > >> and Scala. E.g., they have different collection types (Scala
> collections
> > >> v.s. java.util.*) and in Java, it's hard to implement a method which
> > takes
> > >> Scala functions as parameters. Considering the major part of the code
> > base
> > >> is implemented in Java, +1 for this goal from a long-term view.
> > >>
> > >> 2) The ideal solution would be to just expose a Scala API and make all
> > the
> > >> other parts Scala-free. But I am not sure if it could be achieved even
> > in a
> > >> long-term. Thus as Timo suggested, keep the Scala codes in
> > >> "flink-table-core" would be a compromise solution.
> > >>
> > >> 3) If the community makes the final decision, maybe any new features
> > >> should be added in Java (regardless of the modules), in order to
> prevent
> > >> the Scala codes from growing.
> > >>
> > >> Best,
> > >> Xingcan
> > >>
> > >>
> > >>> On Jul 2, 2018, at 9:30 PM, Piotr Nowojski 
> > >> wrote:
> > >>> Bumping the topic.
> > >>>
> > >>> If we want to do this, the sooner we decide, the less code we will
> have
> > >> to rewrite. I have some objections/c

Re: [DISCUSS] Table API Enhancement Outline

2018-11-18 Thread Xiaowei Jiang
shorter, more explicit, and better aligned
> > with the regular window.groupBy.select aggregations that are supported
> > today.
> >
> > tab.window(Tumble ... as 'w)
> >.groupBy('w, 'k1, 'k2) // 'w should be a group key.
> >.agg('w.rowtime as 'rtime, 'k1, 'k2, agg('a))
> >
> >
> > Best, Fabian
> >
> > On Wed, Nov 14, 2018 at 8:37 AM, jincheng sun <
> > sunjincheng...@gmail.com> wrote:
> >
> >> Hi Fabian/Xiaowei,
> >>
> >> I am very sorry for my late reply! Glad to see your reply, and sounds
> >> pretty good!
> >> I agree that the approach with append(), which can clearly define the
> >> result schema, is better, as Fabian mentioned.
> >> In addition, append() can also contain non-time attributes, e.g.:
> >>
> >>    tab('name, 'age, 'address, 'rowtime)
> >>    tab.map(append(udf('name), 'address, 'rowtime)).as('col1, 'col2,
> >> 'address, 'rowtime)
> >>.window(Tumble over 5.millis on 'rowtime as 'w)
> >>.groupBy('w, 'address)
> >>
> >> In this way append() is very useful, and the behavior is very similar
> >> to withForwardedFields() in DataSet.
> >> So +1 to using the append() approach for map() & flatMap()!
> >>
> >> But how about agg() and flatAgg()? In the agg/flatAgg case I agree with
> >> Xiaowei's approach of defining the keys to be implied in the result table
> >> and appearing at the beginning, for example as follows:
> >>  tab.window(Tumble ... as 'w)
> >>.groupBy('w, 'k1, 'k2) // 'w should be a group key.
> >>.agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
> >>.select('k1, 'col1, 'w.rowtime as 'rtime)
> >>
> >> What do you think? @Fabian @Xiaowei
> >>
> >> Thanks,
> >> Jincheng
> >>
> >> On Fri, Nov 9, 2018 at 6:35 PM, Fabian Hueske wrote:
> >>
> >>> Hi Jincheng,
> >>>
> >>> Thanks for the summary!
> >>> I like the approach with append() better than the implicit forwarding
> as
> >> it
> >>> clearly indicates which fields are forwarded.
> >>> However, I don't see much benefit over the flatMap(Expression*)
> variant,
> >> as
> >>> we would still need to analyze the full expression tree to ensure that
> at
> >>> most (or exactly?) one Scalar / TableFunction is used.
> >>>
> >>> Best,
> >>> Fabian
> >>>
> >>> On Thu, Nov 8, 2018 at 7:25 PM, jincheng sun <
> >>> sunjincheng...@gmail.com> wrote:
> >>>
> >>>> Hi all,
> >>>>
> >>>> We are discussing very detailed content about this proposal. We are
> >>> trying
> >>>> to design the API in many aspects (functionality, compatibility, ease
> >> of
> >>>> use, etc.). I think this is a very good process. Only such a detailed
> >>>> discussion, In order to develop PR more clearly and smoothly in the
> >> later
> >>>> stage. I am very grateful to @Fabian and  @Xiaowei for sharing a lot
> of
> >>>> good ideas.
> >>>> About the definition of method signatures I want to share my points
> >> here
> >>>> which I am discussing with fabian in google doc (not yet completed),
> as
> >>>> follows:
> >>>>
> >>>> Assume we have a table:
> >>>> val tab = util.addTable[(Long, String)]("MyTable", 'long, 'string,
> >>>> 'proctime.proctime)
> >>>>
> >>>> Approach 1:
> >>>> case1: Map follows Source Table
> >>>> val result =
> >>>>  tab.map(udf('string)).as('proctime, 'col1, 'col2)// proctime implied
> >> in
> >>>> the output
> >>>>  .window(Tumble over 5.millis on 'proctime as 'w)
> >>>>
> >>>> case2: FatAgg follows Window (Fabian mentioned above)
> >>>> val result =
> >>>>tab.window(Tumble ... as 'w)
> >>>>   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
> >>>>   .flatAgg(tabAgg('a)).as('k1, 'k2, 'w, 'col1, 'col2)
> >>>>   .select('k1, 'col1,

Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

2018-11-17 Thread Xiaowei Jiang
Thanks Xuefu for the detailed design doc! One question on the properties
associated with the catalog objects: are we going to leave them completely
free-form, or are we going to set some standard for them? I think that the
answer may depend on whether we want to explore catalog-specific optimization
opportunities. In any case, I think that it might be helpful to standardize
as much as possible into strongly typed classes and leave these properties
for catalog-specific things. But I think that we can do it
in steps.
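
As a small illustration of that direction (purely hypothetical names, not the
classes from the design doc): the commonly shared attributes could live in
typed fields, while a string map remains only for catalog-specific extensions.

import java.util.Collections;
import java.util.Map;

// Hypothetical sketch: common attributes are strongly typed, while a free-form
// map is kept only for catalog-specific extensions.
public class CatalogTableSketch {

    private final String name;
    private final String[] columnNames;           // typed, shared by all catalogs
    private final String[] columnTypes;           // typed, shared by all catalogs
    private final Map<String, String> properties; // free-form, catalog-specific only

    public CatalogTableSketch(String name, String[] columnNames, String[] columnTypes,
                              Map<String, String> properties) {
        this.name = name;
        this.columnNames = columnNames;
        this.columnTypes = columnTypes;
        this.properties = Collections.unmodifiableMap(properties);
    }

    public String getName() { return name; }
    public String[] getColumnNames() { return columnNames; }
    public String[] getColumnTypes() { return columnTypes; }
    public Map<String, String> getProperties() { return properties; }
}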

Xiaowei

On Fri, Nov 16, 2018 at 4:00 AM Bowen Li  wrote:

> Thanks for keeping on improving the overall design, Xuefu! It looks quite
> good to me now.
>
> Would be nice that cc-ed Flink committers can help to review and confirm!
>
>
>
> One minor suggestion: Since the last section of design doc already touches
> some new sql statements, shall we add another section in our doc and
> formalize the new sql statements in SQL Client and TableEnvironment that
> are gonna come along naturally with our design? Here are some that the
> design doc mentioned and some that I came up with:
>
> To be added:
>
>- USE  - set default catalog
>- USE  - set default schema
>- SHOW CATALOGS - show all registered catalogs
>- SHOW SCHEMAS [FROM catalog] - list schemas in the current default
>catalog or the specified catalog
>- DESCRIBE VIEW view - show the view's definition in CatalogView
>- SHOW VIEWS [FROM schema/catalog.schema] - show views from current or a
>specified schema.
>
>(DDLs that can be addressed by either our design or Shuyi's DDL design)
>
>- CREATE/DROP/ALTER SCHEMA schema
>- CREATE/DROP/ALTER CATALOG catalog
>
> To be modified:
>
>- SHOW TABLES [FROM schema/catalog.schema] - show tables from current or
>a specified schema. Add 'from schema' to existing 'SHOW TABLES'
> statement
>- SHOW FUNCTIONS [FROM schema/catalog.schema] - show functions from
>current or a specified schema. Add 'from schema' to existing 'SHOW
> TABLES'
>statement'
>
>
> Thanks, Bowen
>
>
>
> On Wed, Nov 14, 2018 at 10:39 PM Zhang, Xuefu 
> wrote:
>
> > Thanks, Bowen, for catching the error. I have granted comment permission
> > with the link.
> >
> > I also updated the doc with the latest class definitions. Everyone is
> > encouraged to review and comment.
> >
> > Thanks,
> > Xuefu
> >
> > --
> > Sender:Bowen Li 
> > Sent at:2018 Nov 14 (Wed) 06:44
> > Recipient:Xuefu 
> > Cc:piotr ; dev ; Shuyi
> > Chen 
> > Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem
> >
> > Hi Xuefu,
> >
> > Currently the new design doc
> > <
> https://docs.google.com/document/d/1Y9it78yaUvbv4g572ZK_lZnZaAGjqwM_EhjdOv4yJtw/edit
> >
> > is on “view only" mode, and people cannot leave comments. Can you please
> > change it to "can comment" or "can edit" mode?
> >
> > Thanks, Bowen
> >
> >
> > On Mon, Nov 12, 2018 at 9:51 PM Zhang, Xuefu 
> > wrote:
> > Hi Piotr
> >
> > I have extracted the API portion of  the design and the google doc is
> here
> > <
> https://docs.google.com/document/d/1Y9it78yaUvbv4g572ZK_lZnZaAGjqwM_EhjdOv4yJtw/edit?usp=sharing
> >.
> > Please review and provide your feedback.
> >
> > Thanks,
> > Xuefu
> >
> > --
> > Sender:Xuefu 
> > Sent at:2018 Nov 12 (Mon) 12:43
> > Recipient:Piotr Nowojski ; dev <
> > dev@flink.apache.org>
> > Cc:Bowen Li ; Shuyi Chen 
> > Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem
> >
> > Hi Piotr,
> >
> > That sounds good to me. Let's close all the open questions ((there are a
> > couple of them)) in the Google doc and I should be able to quickly split
> > it into the three proposals as you suggested.
> >
> > Thanks,
> > Xuefu
> >
> > --
> > Sender:Piotr Nowojski 
> > Sent at:2018 Nov 9 (Fri) 22:46
> > Recipient:dev ; Xuefu 
> > Cc:Bowen Li ; Shuyi Chen 
> > Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem
> >
> > Hi,
> >
> >
> > Yes, it seems like the best solution. Maybe someone else can also
> suggests if we can split it further? Maybe changes in the interface in one
> doc, reading from hive meta store another and final storing our meta
> informations in hive meta store?
> >
> > Piotrek
> >
> > > On 9 Nov 2018, at 01:44, Zhang, Xuefu  wrote:
> > >
> > > Hi Piotr,
> > >
> > > That seems to be good idea!
> > >
> >
> > > Since the google doc for the design is currently under extensive
> review, I will leave it as it is for now. However, I'll convert it to two
> different FLIPs when the time comes.
> > >
> > > How does it sound to you?
> > >
> > > Thanks,
> > > Xuefu
> > >
> > >
> > > --
> > > Sender:Piotr Nowojski 
> > > Sent at:2018 Nov 9 (Fri) 02:31
> > > Recipient:dev 
> > > Cc:Bowen Li ; Xuefu  > >; Shuyi Chen 
> > > Subject:Re: [DISCUS

Re: [DISCUSS] Task speculative execution for Flink batch

2018-11-17 Thread Xiaowei Jiang
Thanks Yangyu for the nice design doc! One thing to consider is the
granularity of speculation. Multiple tasks may propagate data in
pipelined mode. In such a case, fixing a single task may not be enough. But
you might be able to fix this problem by increasing the granularity of
speculation. The traditional case of a single speculative task can be
considered a special case of this.
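
As a rough illustration of the detection side only (thresholds, metric names and
classes below are assumptions, not part of the design doc), the scheduler could
compare each subtask's throughput against the median of its peers before
scheduling a speculative copy:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch only: flags "long tail" candidates by comparing per-subtask throughput
// (records processed per second) against the median of all parallel instances.
public class LongTailDetector {

    private final double slowdownFactor; // e.g. 0.5 => flag subtasks running below 50% of the median
    private final double minProgress;    // don't speculate on subtasks that have barely started

    public LongTailDetector(double slowdownFactor, double minProgress) {
        this.slowdownFactor = slowdownFactor;
        this.minProgress = minProgress;
    }

    /**
     * @param throughputBySubtask records per second for each parallel subtask index
     * @param progressBySubtask   fraction of input consumed per subtask (0.0 - 1.0)
     * @return subtask indices that look like long-tail tasks and are candidates for speculation
     */
    public List<Integer> findCandidates(Map<Integer, Double> throughputBySubtask,
                                        Map<Integer, Double> progressBySubtask) {
        if (throughputBySubtask.size() < 2) {
            return Collections.emptyList();
        }
        List<Double> sorted =
            throughputBySubtask.values().stream().sorted().collect(Collectors.toList());
        double median = sorted.get(sorted.size() / 2);

        return throughputBySubtask.entrySet().stream()
            .filter(e -> progressBySubtask.getOrDefault(e.getKey(), 0.0) >= minProgress)
            .filter(e -> e.getValue() < slowdownFactor * median)
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
    }
}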

Xiaowei

On Sat, Nov 17, 2018 at 10:27 PM Tao Yangyu  wrote:

> Hi all,
>
> After refined, the detailed design doc is here:
>
> https://docs.google.com/document/d/1X_Pfo4WcO-TEZmmVTTYNn44LQg5gnFeeaeqM7ZNLQ7M/edit?usp=sharing
>
> Your kind reviews and comments are very appreciated and will help so much
> the feature to be completed.
>
> Best,
> Ryan
>
>
> On Wed, Nov 7, 2018 at 4:49 PM, Tao Yangyu wrote:
>
> > Thanks so much for your all feedbacks!
> >
> > Yes, as mentioned above by Jin Sun, the design currently targets batch to
> > explore the general framework and basic modules. The strategy could be
> also
> > applied to stream with some extended code, for example, the result
> > commitment.
> >
> > Jin Sun  于2018年11月7日周三 上午8:38写道:
> >
> >> I think this is target for batch at the very beginning, the idea should
> >> be also work for both case, with different algorithm/strategy.
> >>
> >> Ryan, since you are working on this, I will assign FLINK-10644 <
> >> https://issues.apache.org/jira/browse/FLINK-10644> to you.
> >>
> >> Jin
> >>
> >> > On Nov 6, 2018, at 4:45 AM, Till Rohrmann 
> wrote:
> >> >
> >> > Thanks for starting this discussion Ryan. I'm looking forward to your
> >> > design document about this feature. Quick question: Will it be a batch
> >> only
> >> > feature? If no, then it needs to take checkpointing into account as
> >> well.
> >> >
> >> > Cheers,
> >> > Till
> >> >
> >> > On Tue, Nov 6, 2018 at 4:29 AM zhijiang  >> .invalid>
> >> > wrote:
> >> >
> >> >> Thanks yangyu for launching this discussion.
> >> >>
> >> >> I really like this proposal. We have frequently seen this scenario, where some
> >> >> long tail tasks delay the total batch job execution time in
> >> >> production.
> >> >> We also have some thoughts on bringing in this mechanism. Looking forward to
> >> >> your detailed design doc; then we can discuss further.
> >> >>
> >> >> Best,
> >> >> Zhijiang
> >> >> --
> >> >> From: Tao Yangyu 
> >> >> Sent: Tuesday, Nov 6, 2018, 11:01
> >> >> To: dev 
> >> >> Subject: [DISCUSS] Task speculative execution for Flink batch
> >> >>
> >> >> Hi everyone,
> >> >>
> >> >> We propose task speculative execution for Flink batch in this message
> >> as
> >> >> follows.
> >> >>
> >> >> In the batch mode, the job is usually divided into multiple parallel
> >> tasks
> >> >> executed cross many nodes in the cluster. It is common to encounter
> the
> >> >> performance degradation on some nodes due to hardware problems or
> >> accident
> >> >> I/O busy and high CPU load. This kind of degradation can probably
> >> cause the
> >> >> running tasks on the node to be quite slow that is so called long
> tail
> >> >> tasks. Although the long tail tasks will not fail, they can severely
> >> affect
> >> >> the total job running time. Flink task scheduler does not take this
> >> long
> >> >> tail problem into account currently.
> >> >>
> >> >>
> >> >>
> >> >> Here we propose the speculative execution strategy to handle the
> >> problem.
> >> >> The basic idea is to run a copy of task on another node when the
> >> original
> >> >> task is identified to be long tail. In more details, the speculative
> >> task
> >> >> will be triggered when the scheduler detects that the data processing
> >> >> throughput of a task is much slower than others. The speculative task
> >> is
> >> >> executed in parallel with the original one and share the same failure
> >> retry
> >> >> mechanism. Once either task complete, the scheduler admits its output
> >> as
> >> >> the final result and cancel the other running one. The preliminary
> >> >> experiments has demonstrated the effectiveness.
> >> >>
> >> >>
> >> >> The detailed design doc will be ready soon.  Your reviews and
> comments
> >> will
> >> >> be much appreciated.
> >> >>
> >> >>
> >> >> Thanks!
> >> >>
> >> >> Ryan
> >> >>
> >> >>
> >>
> >>
>


Re: Flink sql joined with dimtable from mysql

2018-11-13 Thread Xiaowei Jiang
It was not super clear what you did. But from your description, the join
was not correct initially because the existing MySQL data was not seen by Flink
yet. Later, when updates are seen by Flink, the result will be correct. A
better place for such questions is probably the user mailing list.

Xiaowei

On Tue, Nov 13, 2018 at 6:08 PM yelun <986463...@qq.com> wrote:

> hi,
>
> I want to use Flink SQL to left join a static dimension table from MySQL
> currently, so I converted the MySQL table into a data stream to join with a
> DataStream which has been converted to a Flink table. I found that the
> real-time stream data is not joined correctly with the MySQL data at the
> beginning, but later stream records are joined correctly. So I want to ask:
> is there any good way to make the real-time stream join with the MySQL
> data which has been loaded, and to support dynamically reloading the MySQL
> data into memory once each hour. Thanks a lot.


Re: [DISCUSS] Table API Enhancement Outline

2018-11-07 Thread Xiaowei Jiang
Hi Fabian,

I think that the key question you raised is whether we allow extra parameters in
the methods map/flatMap/agg/flatAgg. I can see why allowing that may appear
more convenient in some cases. However, it might also cause some confusion
if we do that. For example, do we allow multiple UDFs in these expressions?
If we do, the semantics may be weird to define, e.g. what does
table.groupBy('k).flatAgg(TableAggA('a), TableAggB('b)) mean? Even though
not allowing it may appear less powerful, it can make things more
intuitive too. In the case of agg/flatAgg, we can define the keys to be
implied in the result table, appearing at the beginning. You can use a
select method if you want to modify this behavior. I think that eventually
we will have some API which allows other expressions as additional
parameters, but I think it's better to do that after we introduce the
concept of nested tables. A lot of things we suggested here can be
considered as special cases of that. But things are much simpler if we
leave that to later.

Regards,
Xiaowei

On Wed, Nov 7, 2018 at 5:18 PM Fabian Hueske  wrote:

> Hi,
>
> * Re emit:
> I think we should start with a well understood semantics of full
> replacement. This is how the other agg functions work.
> As was said before, there are open questions regarding an append mode
> (checkpointing, whether supporting retractions or not and if yes how to
> declare them, ...).
> Since this seems to be an optimization, I'd postpone it.
>
> * Re grouping keys:
> I don't think we should automatically add them because the result schema
> would not be intuitive.
> Would they be added at the beginning of the tuple or at the end? What
> metadata fields of windows would be added? In which order would they be
> added?
>
> However, we could support syntax like this:
> val t: Table = ???
> t
>   .window(Tumble ... as 'w)
>   .groupBy('a, 'b)
>   .flatAgg('b, 'a, myAgg(row('*)), 'w.end as 'wend, 'w.rowtime as 'rtime)
>
> The result schema would be clearly defined as [b, a, f1, f2, ..., fn, wend,
> rtime]. (f1, f2, ...fn) are the result attributes of the UDF.
>
> * Re Multi-staged evaluation:
> I think this should be an optimization that can be applied if the UDF
> implements the merge() method.
>
> Best, Fabian
>
> On Wed, Nov 7, 2018 at 8:01 AM, Shaoxuan Wang <
> wshaox...@gmail.com
> > wrote:
>
> > Hi xiaowei,
> >
> > Yes, I agree with you that the semantics of TableAggregateFunction emit
> is
> > much more complex than AggregateFunction. The fundamental difference is
> > that TableAggregateFunction emits a "table" while AggregateFunction
> outputs
> > (a column of) a "row". In the case of AggregateFunction it only has one
> > mode which is “replacing” (complete update). But for
> > TableAggregateFunction, it could be incremental (only emit the new
> updated
> > results) update or complete update (always emit the entire table when
> > “emit" is triggered).  From the performance perspective, we might want to
> > use incremental update. But we need review and design this carefully,
> > especially taking into account the cases of the failover (instead of just
> > back-up the ACC it may also needs to remember the emit offset) and
> > retractions, as the semantics of TableAggregateFunction emit are
> different
> > than other UDFs. TableFunction also emits a table, but it does not need
> to
> > worry this due to the nature of stateless.
> >
> > Regards,
> > Shaoxuan
> >
> >
> > On Tue, Nov 6, 2018 at 7:16 PM Xiaowei Jiang  wrote:
> >
> > > Hi Jincheng,
> > >
> > > Thanks for adding the public interfaces! I think that it's a very good
> > > start. There are a few points that we need to have more discussions.
> > >
> > >- TableAggregateFunction - this is a very complex beast, definitely
> > the
> > >most complex user defined objects we introduced so far. I think
> there
> > > are
> > >quite some interesting questions here. For example, do we allow
> > >multi-staged TableAggregate in this case? What is the semantics of
> > > emit? Is
> > >it amendments to the previous output, or replacing it? I think that
> > this
> > >subject itself is worth a discussion to make sure we get the details
> > > right.
> > >- GroupedTable.agg - does the group keys automatically appear in the
> > >output? how about the case of windowing aggregation?
> > >
> > > Regards,
> > > Xiaowei
> > >
> > > On Tue, Nov 

Re: [DISCUSS] Enhancing the functionality and productivity of Table API

2018-11-07 Thread Xiaowei Jiang
Hi Piotr:

I want to clarify one thing first: I think that we will keep the
interoperability between TableAPI and DataStream in any case. So users can
switch between the two whenever needed. Given that, it would still be very
helpful if users can use one API to achieve most of what they do.
Currently, TableAPI/SQL is good enough for most data analytics kinds of
scenarios, but there are some limitations that, when removed, will help us go
even further in this direction. An initial list of these is provided in
another thread. These are natural extensions to TableAPI which we need to
do just for the sake of making TableAPI more usable.

TableAPI and SQL share the same underlying implementation, so enhancement
in one will end up helping the other. I don't see them as competitive.
TableAPI is easier to extend because that we have a bit more freedom in
adding new functionalities. In reality, TableAPI can be mixed with SQL as
well.

On the implementation side, I agree that Table API/SQL and DataStream
should try to share as much as possible. But that question is orthogonal to
the API discussion.

This thread is meant to enhancing the functionalities of TableAPI. I don't
think that anyone is suggesting either reducing the effort in SQL or
DataStream. So let's focus on how we can enhance TableAPI.

Regards,
Xiaowei


Re: [DISCUSS] Enhancing the functionality and productivity of Table API

2018-11-06 Thread Xiaowei Jiang
Hi Fabian,

I totally agree with you that we should incrementally improve TableAPI. We
don't suggest that we do anything drastic such as replacing DataSet API
yet. We should see how much we can achieve by extending TableAPI cleanly.
By then, we should see if there are any natural boundaries on how far we
can get. Hopefully it should be much easier for the community to reach
consensus at that point.

As you pointed out, the APIs in DataSet have mixed flavors. Some of them
are logical while others are very physical (e.g., MapPartition). We should
really think what the use cases for such physical API are. Is it possible
that their existence points to limitations in our current optimizations? If
we do better in our optimizations, can we use more logical APIs to achieve
the task? We did some investigations for the "hard" APIs in DataSet before.
We will post some of the lessons we learned and hope to get feedback from
you guys.

Iteration is a very complex and interesting topic. I think that we can live
with optimizations stopping at the boundary of iterations (at least
initially).

Regards,
Xiaowei

On Tue, Nov 6, 2018 at 6:21 PM Fabian Hueske  wrote:

> Thanks for the replies Xiaowei and others!
>
> You are right, I did not consider the batch optimization that would be
> missing if the DataSet API would be ported to extend the DataStream API.
> By extending the scope of the Table API, we can gain a holistic logical &
> physical optimization which would be great!
> Is your plan to move all DataSet API functionality into the Table API?
> If so, do you envision any batch-related API in DataStream at all or should
> this be done by converting a batch table to DataStream? I'm asking because
> if there would be batch features in DataStream, we would need some
> optimization there as well.
>
> I think the proposed separation of Table API (stateless APIs) and
> DataStream (APIs that expose state handling) is a good idea.
> On a side note, the DataSet API discouraged state handing in user function,
> so porting this Table API would be quite "natural".
>
> As I said before, I like that we can incrementally extend the Table API.
> Map and FlatMap functions do not seem too difficult.
> Reduce, GroupReduce, Combine, GroupCombine, MapPartition might be more
> tricky, esp. if we want to support retractions.
> Iterations should be a challenge. I assume that Calcite does not support
> iterations, so we probably need to split query / program and optimize parts
> separately (IIRC, this is also how Flink's own optimizer handles this).
> To what extend are you planning to support explicit physical operations
> like partitioning, sorting or optimizer hints?
>
> I haven't had a look in the design document that you shared. Probably, I
> find answers to some of my questions there ;-)
>
> Regarding the question of SQL or Table API, I agree that extending the
> scope of the Table API does not limit the scope for SQL.
> By adding more operations to the Table API we can expand it to use case
> that are not well-served by SQL.
> As others have said, we'll of course continue to extend and improve Flink's
> SQL support (within the bounds of the standard).
>
> Best, Fabian
>
> > On Tue, Nov 6, 2018 at 10:09 AM, jincheng sun <
> > sunjincheng...@gmail.com> wrote:
>
> > Hi Jark,
> > Glad to see your feedback!
> > That's Correct, The proposal is aiming to extend the functionality for
> > Table API! I like add "drop" to fit the use case you mentioned. Not only
> > that, if a 100-columns Table. and our UDF needs these 100 columns, we
> don't
> > want to define the eval as eval(column0...column99), we prefer to define
> > eval as eval(Row)。Using it like this: table.select(udf (*)). All we also
> > need to consider if we put the columns package as a row. In a scenario
> like
> > this, we have Classification it as cloumn operation, and  list the
> changes
> > to the column operation after the map/flatMap/agg/flatAgg phase is
> > completed. And Currently,  Xiaowei has started a threading outlining
> which
> > talk about what we are proposing. Please see the detail in the mail
> thread:
> > Please see the detail in the mail thread:
> >
> >
> https://mail.google.com/mail/u/0/#search/xiaowei/FMfcgxvzLWzfvCnmvMzzSfxHTSfdwLkB
> > <
> >
> https://mail.google.com/mail/u/0/#search/xiaowei/FMfcgxvzLWzfvCnmvMzzSfxHTSfdwLkB
> > >
> >  .
> >
> > At this stage the Table API Enhancement Outline as follows:
> >
> >
> https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit?usp=sharing
> >
> > Please let we know if you have further thoughts or feedback!
> >
> > Thanks,
> > Jincheng
> >
> >
> > On Tue, Nov 6, 2018 at 3:35 PM, Jark Wu wrote:
> >
> > > Hi jingcheng,
> > >
> > > Thanks for your proposal. I think it is a helpful enhancement for
> > TableAPI
> > > which is a solid step forward for TableAPI.
> > > It doesn't weaken SQL or DataStream, because the conversion between
> > > DataStream and Table still works.
> > > People with advanced cases (e.g. co

Re: [DISCUSS] Table API Enhancement Outline

2018-11-06 Thread Xiaowei Jiang
Hi Jincheng,

Thanks for adding the public interfaces! I think that it's a very good
start. There are a few points that we need to have more discussions.

   - TableAggregateFunction - this is a very complex beast, definitely the
   most complex user-defined object we have introduced so far. I think there are
   quite some interesting questions here. For example, do we allow
   multi-staged TableAggregate in this case? What are the semantics of emit? Is
   it an amendment to the previous output, or a replacement? I think that this
   subject itself is worth a discussion to make sure we get the details right.
   - GroupedTable.agg - does the group keys automatically appear in the
   output? how about the case of windowing aggregation?

Regards,
Xiaowei

On Tue, Nov 6, 2018 at 6:25 PM jincheng sun 
wrote:

> Hi, Xiaowei,
>
> Thanks for bring up the discuss of Table API Enhancement Outline !
>
> I quickly looked at the overall content, these are good expressions of our
> offline discussions. But from the points of my view, we should add the
> usage of public interfaces that we will introduce in this propose.  So, I
> added the following usage description of  interface and operators  in
> google doc:
>
> 1. Map Operator
> Map operator is a new operator on Table. A Map operator can apply a
> scalar function and can return multiple columns. The usage is as follows:
>
>   val res = tab
>  .map(fun: ScalarFunction).as(‘a, ‘b, ‘c)
>  .select(‘a, ‘c)
>
> 2. FlatMap Operator
> FlatMap operator is a new operator on Table. A FlatMap operator can apply
> a table function and can return multiple rows. The usage is as follows:
>
>   val res = tab
>   .flatMap(fun: TableFunction).as(‘a, ‘b, ‘c)
>   .select(‘a, ‘c)
>
> 3. Agg Operator
> Agg operator is a new operator on Table/GroupedTable. An Agg operator can
> apply an aggregate function and can return multiple columns. The usage is as
> follows:
>
>val res = tab
>   .groupBy(‘a) // leave groupBy-Clause out to define global aggregates
>   .agg(fun: AggregateFunction).as(‘a, ‘b, ‘c)
>   .select(‘a, ‘c)
>
> 4.  FlatAgg Operator
> FlatAgg operator is a new operator on Table/GroupedTable. A FlatAgg
> operator can apply a table aggregate function and can return multiple rows.
> The usage is as follows:
>
> val res = tab
>.groupBy(‘a) // leave groupBy-Clause out to define global table
> aggregates
>.flatAgg(fun: TableAggregateFunction).as(‘a, ‘b, ‘c)
>.select(‘a, ‘c)
>
>   5. TableAggregateFunction
>  The behavior of table aggregates is most like what GroupReduceFunction does:
> it computes over a group of elements and outputs a group of elements.
> The TableAggregateFunction can be applied on GroupedTable.flatAgg(). The
> interface of TableAggregateFunction has a lot of content, so I don't copy
> it here, Please look at the detail in google doc:
>
> https://docs.google.com/document/d/19rVeyqveGtV33UZt72GV-DP2rLyNlfs0QNGG0xWjayY/edit
>
> I will be very appreciate to anyone for reviewing and commenting.
>
> Best,
> Jincheng
>


[DISCUSS] Table API Enhancement Outline

2018-11-05 Thread Xiaowei Jiang
Hi All,

As Jincheng brought up in the previous email, there is a set of
improvements needed to make the Table API more complete/self-contained. To give
a better overview of this, Jincheng, Jiangjie, Shaoxuan and I
discussed offline a bit and came up with an initial outline.

Table API Enhancement Outline


Please take a look and your comments are welcome!

Regards,
Xiaowei


Re: [DISCUSS] Enhancing the functionality and productivity of Table API

2018-11-05 Thread Xiaowei Jiang
Hi Fabian, these are great questions! I have some quick thoughts on some of
these.

Optimization opportunities: I think that you are right UDFs are more like
blackboxes today. However this can change if we let user develop UDFs
symbolically in the future (i.e., Flink will look inside the UDF code,
understand it and potentially do codegen to execute it). This will open the
door for potential optimizations.

Moving batch functionality to DataStream: I actually think that moving
batch functionality to Table API is probably a better idea. Currently,
DataStream API is very deterministic and imperative. On the other hand,
DataSet API has an optimizer and can choose very different execution plans.
Another distinguishing feature of DataStream API is that users get direct
access to state/statebackend, which we intentionally avoided in Table API so
far. The introduction of states is probably the biggest innovation by
Flink. At the same time, abusing state may also be the largest source of
reliability/performance issues. Shielding users away from dealing with
state directly is a key advantage in Table API. I think this is probably a
good boundary between DataStream and Table API. If users HAVE to manipulate
state explicitly, go with DataStream, otherwise, go with Table API.

Because Flink is extending into more and more scenarios (e.g., batch,
streaming & micro-service), we may inevitably end up with multiple APIs. It
appears that data analytics (batch & streaming) related applications can be
well served with Table API which not only unifies batch and streaming, but
also relieves the users from dealing with states explicitly. On the other
hand, DataStream is very convenient and powerful for micro-service kind of
applications because explicit state access may be necessary in such cases.

We will start a thread outlining what we are proposing in the Table API.

Regards,
Xiaowei

On Mon, Nov 5, 2018 at 7:03 PM Fabian Hueske  wrote:

> Hi Jincheng,
>
> Thanks for this interesting proposal.
> I like that we can push this effort forward in a very fine-grained manner,
> i.e., incrementally adding more APIs to the Table API.
>
> However, I also have a few questions / concerns.
> Today, the Table API is tightly integrated with the DataSet and DataStream
> APIs. It is very easy to convert a Table into a DataSet or DataStream and
> vice versa. This mean it is already easy to combine custom logic an
> relational operations. What I like is that several aspects are clearly
> separated like retraction and timestamp handling (see below) + all
> libraries on DataStream/DataSet can be easily combined with relational
> operations.
> I can see that adding more functionality to the Table API would remove the
> distinction between DataSet and DataStream. However, wouldn't we get a
> similar benefit by extending the DataStream API for proper support for
> bounded streams (as is the long-term goal of Flink)?
> I'm also a bit skeptical about the optimization opportunities we would
> gain. Map/FlatMap UDFs are black boxes that cannot be easily removed
> without additional information (I did some research on this a few years ago
> [1]).
>
> Moreover, I think there are a few tricky details that need to be resolved
> to enable a good integration.
>
> 1) How to deal with retraction messages? The DataStream API does not have a
> notion of retractions. How would a MapFunction or FlatMapFunction handle
> retraction? Do they need to be aware of the change flag? Custom windowing
> and aggregation logic would certainly need to have that information.
> 2) How to deal with timestamps? The DataStream API does not give access to
> timestamps. In the Table API / SQL these are exposed as regular attributes.
> How can we ensure that timestamp attributes remain valid (i.e. aligned with
> watermarks) if the output is produced by arbitrary code?
> There might be more issues of this kind.
>
> My main question would be how much would we gain with this proposal over a
> tight integration of Table API and DataStream API, assuming that batch
> functionality is moved to DataStream?
>
> Best, Fabian
>
> [1] http://stratosphere.eu/assets/papers/openingTheBlackBoxes_12.pdf
>
>
> Am Mo., 5. Nov. 2018 um 02:49 Uhr schrieb Rong Rong :
>
> > Hi Jincheng,
> >
> > Thank you for the proposal! I think being able to define a process /
> > co-process function in table API definitely opens up a whole new level of
> > applications using a unified API.
> >
> > In addition, as Tzu-Li and Hequn have mentioned, the optimization layer of
> > the Table API will already bring additional benefit over programming
> > directly on top of the DataStream/DataSet API. I am very interested and
> > looking forward to seeing the support for more complex use
> > cases, especially iterations. It will enable table API to define much
> > broader, event-driven use cases such as real-time ML prediction/training.
> >
> > As Timo mentioned, this will make Table API diverge from the SQL API. But
> > a

Re: [DISCUSS] Release 1.3.0 RC1 (Non voting, testing release candidate)

2017-05-19 Thread Xiaowei Jiang
Hi Robert,

I did the following checks and found no issues:

  - Check if checksums and GPG files match the corresponding release files
  - Verify that the source archives do not contain any binaries
  - Check if the source release is building properly with Maven (including
license header check and checkstyle). Also the tests should be executed
(mvn clean verify).
  - Check build for custom Hadoop version (2.3.0, 2.4.1, 2.6.3, 2.7.2)
  - Check build for Scala 2.11
  - Check that the README.md file is meaningful

thanks
Xiaowei

On Fri, May 19, 2017 at 6:29 PM, Chesnay Schepler 
wrote:

> Whoops, this is the PR for enabling the test:
> https://github.com/apache/flink/pull/3844
>
>
> On 19.05.2017 12:14, Robert Metzger wrote:
>
>> Thank you for all your input.
>>
>> @Chesnay, in your email you are pointing to the same PR twice:
>> This PR fixes the compilation on Windows:  (reviewed once, most recent
>> changes not reviewed)
>> https://github.com/apache/flink/pull/3854
>> This PR enables a test for savepoint compatibility: (nice to have, easy to
>> review)
>> https://github.com/apache/flink/pull/3854
>>
>> Also the "should define more than one task slot" thing is not important
>> IMO.
>>
>> I think the "empty path on windows" thing is not a release blocker.
>>
>> --
>>
>> These are the issues mentioned in the thread that are still open and
>> blockers:
>> - Add nested serializers to config snapshots of composite serializers:
>> https://github.com/apache/flink/pull/3937 has no review yet
>> - FLINK-6610: WebServer could not be created when
>> "jobmanager.web.submit.enable" is set to false
>> - FLINK-6629: ClusterClient cannot submit jobs to HA cluster if address
>> not set in configuration
>>
>>
>>
>> On Fri, May 19, 2017 at 12:17 AM, Till Rohrmann 
>> wrote:
>>
>>> I might have found another blocker:
>>> https://issues.apache.org/jira/browse/FLINK-6629.
>>>
>>> The issue is that the ClusterClient only allows to submit jobs to an HA
>>> cluster if you have specified the JobManager's address in the
>>> flink-conf.yaml or via the command line options. If no address is set,
>>> then
>>> it fails completely. If the wrong address is set, which can easily happen
>>> in an HA setting, then we are not able to find the proper connecting
>>> address for the ActorSystem. This basically voids Flink's HA
>>> capabilities.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, May 18, 2017 at 10:23 PM, Chesnay Schepler 
>>> wrote:
>>>
>>>> The test document says that the default flink-conf.yml "should define
>>>> more than one task slot", but it currently configures exactly 1 task
>>>> slot. Not sure if it is a typo in the doc though.
>>>>
>>>> On 18.05.2017 22:10, Chesnay Schepler wrote:
>>>>
>>>>> The start-cluster.sh script failed for me on Windows when executed in a
>>>>> directory containing spaces.
>>>>>
>>>>> On 18.05.2017 20:47, Chesnay Schepler wrote:
>>>>>
>>>>>> FLINK-6610 should also be fixed; it is currently not possible to
>>>>>> disable web-submissions.
>>>>>> On 18.05.2017 18:13, jincheng sun wrote:
>>>>>>
>>>>>>> Hi Robert,
>>>>>>> I have some checks to do and some test improve PRs (
>>>>>>> https://issues.apache.org/jira/browse/FLINK-6619) need be done soon.
>>>>>>>
>>>>>>> Best,
>>>>>>> SunJincheng
>>>>>>>
>>>>>>> 2017-05-18 22:17 GMT+08:00 Greg Hogan :
>>>>>>>
>>>>>>>> The following tickets for 1.3.0 have a PR in need of review:
>>>>>>>>
>>>>>>>> [FLINK-6582] [docs] Project from maven archetype is not buildable by
>>>>>>>> default
>>>>>>>> [FLINK-6616] [docs] Clarify provenance of official Docker images
>>>>>>>>
>>>>>>>> On May 18, 2017, at 5:40 AM, Fabian Hueske  wrote:
>>>>>>>>
>>>>>>>>> I have a couple of PRs ready with bugfixes that I'll try to get in
>>>>>>>>> as well. Should be done soon.
>>>>>>>>>
>>>>>>>>> 2017-05-18 11:24 GMT+02:00 Till Rohrmann :
>>>>>>>>>
>>>>>>>>>> I'd like to get a fix in for
>>>>>>>>>> https://issues.apache.org/jira/browse/FLINK-6612. This can
>>>>>>>>>> basically thwart Flink's recovery capabilities.
>>>>>>>>>>
>>>>>>>>>> On Thu, May 18, 2017 at 11:13 AM, Chesnay Schepler <
>>>>>>>>>> ches...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> This PR reduces logging noise a bit: (got +1 to merge)
>>>>>>>>>>> https://github.com/apache/flink/pull/3917
>>>>>>>>>>>
>>>>>>>>>>> This PR fixes the compilation on Windows: (reviewed once, most
>>>>>>>>>>> recent changes not reviewed)
>>>>>>>>>>> https://github.com/apache/flink/pull/3854
>>>>>>>>>>>
>>>>>>>>>>> This PR enables a test for savepoint compatibility: (nice to have,
>>>>>>>>>>> easy to review)
>>>>>>>>>>> https://github.com/apache/flink/pull/3854

[jira] [Created] (FLINK-6621) Legal check for 1.3.0 RC01 Release

2017-05-17 Thread Xiaowei Jiang (JIRA)
Xiaowei Jiang created FLINK-6621:


 Summary: Legal check for 1.3.0 RC01 Release
 Key: FLINK-6621
 URL: https://issues.apache.org/jira/browse/FLINK-6621
 Project: Flink
  Issue Type: Bug
  Components: Build System
Reporter: Xiaowei Jiang
Assignee: Xiaowei Jiang


Do pre-checks for the following:
- Check if checksums and GPG files match the corresponding release files
- Verify that the source archives do not contain any binaries
- Check if the source release is building properly with Maven (including
  license header check and checkstyle). Also the tests should be executed
  (mvn clean verify).
- Verify that the LICENSE and NOTICE files are correct for the binary and
  source releases
- All dependencies must be checked for their license and the license must be
  ASL 2.0 compatible
- Check that all POM files point to the same version (mostly relevant to
  examine quickstart artifact files)
- Read the README.md file



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] FLIP-17 Side Inputs

2017-03-23 Thread Xiaowei Jiang
Very nice discussion! The deadlock issue due to the back pressure mechanism is
temporary; it is going to be fixed once Stephan changes it to a credit-based
approach. So we probably should not base our proposal on that temporary
limitation. Once we have that issue fixed, the operator can choose not to pull
from some input and still not result in deadlock. This approach should in
general be more performant than buffering the main inputs. If we expose such
freedom to the operator (i.e. let the operator choose when to pull from an
input), it's also more flexible. For example, the operator can code the logic
to decide when a side input is ready. Another upside of this blocking approach
is that we may not need to work on buffering at all.

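To make the idea concrete, here is a purely hypothetical Java sketch of the
kind of freedom we could expose, i.e. letting the operator decide which input
the runtime should pull from next. The names are made up for this sketch and
do not exist in Flink:

interface SelectiveTwoInputFunction<IN1, IN2, OUT> {

    enum InputChoice { FIRST, SECOND, ANY }

    // consulted by the runtime before it requests the next record; with
    // credit-based flow control, an input that is not selected simply gets
    // no credit, so nothing piles up and the operator buffers nothing
    InputChoice nextInput();

    void processElement1(IN1 record, Collector<OUT> out) throws Exception;

    void processElement2(IN2 record, Collector<OUT> out) throws Exception;
}

A side-input style operator could then keep returning SECOND (the side input)
from nextInput() until its own readiness condition is met, for example a
marker record or a watermark on the side input, and switch to ANY afterwards.
The main input is neither consumed nor buffered in the meantime, and with
credit-based flow control this cannot deadlock.
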
Regards,
Xiaowei


Add partitionedKeyBy to DataStream

2016-10-20 Thread Xiaowei Jiang
After we do any interesting operations (e.g. reduce) on KeyedStream, the
result becomes DataStream. In a lot of cases, the output still has the same
or compatible keys with the KeyedStream (logically). But to do further
operations on these keys, we are forced to use keyby again. This works
semantically, but is costly in two aspects. First, it destroys the
possibility of chaining, which is one of the most important optimization
technique. Second, keyby will greatly expand the connected components of
tasks, which has implications in failover optimization.

To address this shortcoming, we propose a new operator partitionedKeyBy.

DataStream<T> {
    public <K> KeyedStream<T, K> partitionedKeyBy(KeySelector<T, K> key);
}

Semantically, DataStream.partitionedKeyBy(key) is equivalent to
DataStream.keyBy(partitionedKey) where partitionedKey is key plus the
taskid as an extra field. This guarantees that records from different tasks
will never produce the same keys.

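To make the semantics concrete, here is a rough Java sketch of an equivalent
(but less efficient) formulation with today's API. The Event type and its key
field are made up for illustration, and the sketch assumes the tagging map
chains to the producing operator; the proposed partitionedKeyBy would avoid
the extra shuffle and keep chaining intact:

// tag every record with the index of the subtask that produced it
DataStream<Tuple2<Integer, Event>> tagged = ds
    .map(new RichMapFunction<Event, Tuple2<Integer, Event>>() {
        @Override
        public Tuple2<Integer, Event> map(Event value) {
            return Tuple2.of(getRuntimeContext().getIndexOfThisSubtask(), value);
        }
    });

// keying on (subtaskIndex, userKey) is what keyBy(partitionedKey) means:
// records coming from different tasks can never produce the same key
KeyedStream<Tuple2<Integer, Event>, Tuple2<Integer, String>> keyed = tagged
    .keyBy(new KeySelector<Tuple2<Integer, Event>, Tuple2<Integer, String>>() {
        @Override
        public Tuple2<Integer, String> getKey(Tuple2<Integer, Event> v) {
            return Tuple2.of(v.f0, v.f1.key);
        }
    });
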
With this, it's possible to do

ds.keyBy(key1).reduce(func1)
.partitionedKeyBy(key1).reduce(func2)
.partitionedKeyBy(key2).reduce(func3);

Most importantly, in certain cases, we will be able to chain these into a
single vertex.

Please share your thoughts. The JIRA is at
https://issues.apache.org/jira/browse/FLINK-4855

Xiaowei


Efficient Batch Operator in Streaming

2016-10-20 Thread Xiaowei Jiang
Very often, it's more efficient to process a batch of records at once
instead of processing them one by one. We can use windows to achieve this
functionality. However, a window will store all records in state, which can
be costly. It's desirable to have an efficient implementation of a batch
operator. The batch operator works per task and behaves similarly to aligned
windows. Here is an example of what the interface looks like to a user.

interface BatchFunction<T> {
    // add the record to the buffer
    // returns whether the batch is ready to be flushed
    boolean addRecord(T record);

    // process all pending records in the buffer
    void flush(Collector<T> collector);
}

DataStream<T> ds = ...
BatchFunction<T> func = ...
ds.batch(func);

The operator calls addRecord for each record. The batch function saves the
record in its own buffer. The addRecord call returns whether the pending buffer
should be flushed. If so, the operator invokes flush.

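For illustration, here is a minimal count-based implementation of the proposed
interface (the interface itself is the proposal above, not an existing Flink
API; the flush here simply forwards the batch, a real implementation would do
the per-batch work, such as a bulk write, at that point):

class CountingBatchFunction<T> implements BatchFunction<T> {

    private final int maxBatchSize;
    private final List<T> buffer = new ArrayList<>();

    CountingBatchFunction(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    @Override
    public boolean addRecord(T record) {
        buffer.add(record);
        // ask the operator to flush once the batch is full
        return buffer.size() >= maxBatchSize;
    }

    @Override
    public void flush(Collector<T> collector) {
        for (T record : buffer) {
            collector.collect(record);
        }
        buffer.clear();
    }
}

// following the usage sketched above
ds.batch(new CountingBatchFunction<>(100));
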
Please share your thoughts. The corresponding JIRA is
https://issues.apache.org/jira/browse/FLINK-4854

Xiaowei


[jira] [Created] (FLINK-4855) Add partitionedKeyBy to DataStream

2016-10-18 Thread Xiaowei Jiang (JIRA)
Xiaowei Jiang created FLINK-4855:


 Summary: Add partitionedKeyBy to DataStream
 Key: FLINK-4855
 URL: https://issues.apache.org/jira/browse/FLINK-4855
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Reporter: Xiaowei Jiang
Assignee: MaGuowei


After we do any interesting operations (e.g. reduce) on KeyedStream, the result 
becomes DataStream. In a lot of cases, the output still has the same or 
compatible keys with the KeyedStream (logically). But to do further operations 
on these keys, we are forced to use keyby again. This works semantically, but 
is costly in two aspects. First, it destroys the possibility of chaining, which 
is one of the most important optimization technique. Second, keyby will greatly 
expand the connected components of tasks, which has implications in failover 
optimization.

To address this shortcoming, we propose a new operator partitionedKeyBy.

DataStream<T> {
    public <K> KeyedStream<T, K> partitionedKeyBy(KeySelector<T, K> key);
}

Semantically, DataStream.partitionedKeyBy(key) is equivalent to 
DataStream.keyBy(partitionedKey) where partitionedKey is key plus the taskid as 
an extra field. This guarantees that records from different tasks will never 
produce the same keys.

With this, it's possible to do

ds.keyBy(key1).reduce(func1)
.partitionedKeyBy(key1).reduce(func2)
.partitionedKeyBy(key2).reduce(func3);

Most importantly, in certain cases, we will be able to chain these into a
single vertex.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4854) Efficient Batch Operator in Streaming

2016-10-18 Thread Xiaowei Jiang (JIRA)
Xiaowei Jiang created FLINK-4854:


 Summary: Efficient Batch Operator in Streaming
 Key: FLINK-4854
 URL: https://issues.apache.org/jira/browse/FLINK-4854
 Project: Flink
  Issue Type: Improvement
Reporter: Xiaowei Jiang
Assignee: MaGuowei


Very often, it's more efficient to process a batch of records at once instead
of processing them one by one. We can use windows to achieve this functionality.
However, a window will store all records in state, which can be costly. It's
desirable to have an efficient implementation of a batch operator. The batch
operator works per task and behaves similarly to aligned windows. Here is an
example of what the interface looks like to a user.

interface BatchFunction<T> {
    // add the record to the buffer
    // returns whether the batch is ready to be flushed
    boolean addRecord(T record);

    // process all pending records in the buffer
    void flush(Collector<T> collector);
}

DataStream<T> ds = ...
BatchFunction<T> func = ...
ds.batch(func);

The operator calls addRecord for each record. The batch function saves the
record in its own buffer. The addRecord call returns whether the pending buffer
should be flushed. If so, the operator invokes flush.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)