Re: Understanding watermark

2020-01-14 Thread Cam Mach
Hi Guowei,

Thanks for your response.

From what I understand, one operator has two watermarks? If so, one
operator's output watermark would be the input watermark of the next
operator? Doesn't that sound redundant?

Or do you mean the Web UI only shows the input watermarks of every operator,
and since the source doesn't have an input watermark it shows "No
Watermark"? And should we have an output watermark for the source?

And yes, we want to understand when we should expect to see watermarks for
our "combined" sources (bounded and unbounded) in our pipeline.

If you could be more direct, it would be very helpful.

Thanks,

On Tue, Jan 14, 2020 at 5:42 PM Guowei Ma  wrote:

> Hi, Cam,
> I think you might want to know why the web page does not show the
> watermark of the source.
> Currently, the web UI only shows the "input" watermark. The source only
> outputs watermarks, so the web UI shows you "No Watermark".
> Actually, Flink has "output" watermark metrics. I think Flink should also
> show this information in the web UI. Would you mind opening a Jira to track this?
>
>
> Best,
> Guowei
>
>
> On Wed, Jan 15, 2020 at 4:05 AM Cam Mach wrote:
>
>> Hi Till,
>>
>> Thanks for your response.
>>
>> Our sources are S3 and Kinesis. We have run several tests, and we are
>> able to take a savepoint/checkpoint, but only when S3 completes reading. And
>> at that point, our pipeline has watermarks for the other operators, but not the
>> source operator. We are not running `PROCESS_CONTINUOUSLY`, so we should
>> have a watermark for the source as well, right?
>>
>> Attached is a snapshot of our pipeline.
>>
>> [image: image.png]
>>
>> Thanks
>>
>>
>>
>> On Tue, Jan 14, 2020 at 10:43 AM Till Rohrmann 
>> wrote:
>>
>>> Hi Cam,
>>>
>>> could you share a bit more detail about your job (e.g. which sources
>>> are you using, what are your settings, etc.)? Ideally, you could provide a
>>> minimal example so we can better understand the program.
>>>
>>> From a high-level perspective, there might be different problems: First
>>> of all, Flink does not support checkpointing/taking a savepoint if some of
>>> the job's operators have already terminated, IIRC. But your description
>>> points rather in the direction that your bounded source does not
>>> terminate. So maybe you are reading a file via
>>> StreamExecutionEnvironment.createFileInput
>>> with FileProcessingMode.PROCESS_CONTINUOUSLY. But these things are hard to
>>> tell without a better understanding of your job.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Mon, Jan 13, 2020 at 8:35 PM Cam Mach  wrote:
>>>
 Hello Flink expert,

 We have a pipeline that reads both bounded and unbounded sources, and our
 understanding is that when the bounded sources complete they should get a
 watermark of +inf, and then we should be able to take a savepoint and safely
 restart the pipeline. However, we have sources that never get watermarks, and
 we are confused as to what we are seeing and what we should expect.


 Cam Mach
 Software Engineer
 E-mail: cammac...@gmail.com
 Tel: 206 972 2768
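
The "+inf" watermark discussed in this thread comes from a bounded source
emitting a final Long.MAX_VALUE watermark when it finishes. A minimal sketch
of a source doing this explicitly with the SourceFunction API (the loop body
is a made-up stand-in for reading a bounded input, not the job above):

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

public class BoundedSource implements SourceFunction<String> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        long ts = 0L;
        // Stand-in for reading a bounded input such as a file or S3 listing.
        for (int i = 0; i < 100 && running; i++) {
            ctx.collectWithTimestamp("record-" + i, ts++);
        }
        // Signal completion: a Long.MAX_VALUE watermark lets downstream
        // operators fire any remaining event-time timers and windows.
        ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
    }

    @Override
    public void cancel() {
        running = false;
    }
}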




[jira] [Created] (FLINK-15596) Support key-value messages for kafka producer for flink SQL / Table

2020-01-14 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-15596:
-

 Summary: Support key-value messages for kafka producer for flink 
SQL / Table
 Key: FLINK-15596
 URL: https://issues.apache.org/jira/browse/FLINK-15596
 Project: Flink
  Issue Type: Wish
  Components: Connectors / Kafka
Reporter: hehuiyuan






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15595) Resolution order is chaotic, not as FLIP-68 defined

2020-01-14 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-15595:


 Summary: Resolution order is chaotic, not as FLIP-68 defined
 Key: FLINK-15595
 URL: https://issues.apache.org/jira/browse/FLINK-15595
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Reporter: Jingsong Lee
 Fix For: 1.10.0


First of all, the implementation is problematic. CoreModule returns 
BuiltinFunctionDefinition, which cannot be resolved in 
FunctionCatalogOperatorTable, so it will fall back to FlinkSqlOperatorTable.

Second, the set of functions defined by CoreModule is seriously incomplete; 
comparing it with FunctionCatalogOperatorTable shows that a lot are missing.

We should:
 * Resolve BuiltinFunctionDefinition correctly in 
FunctionCatalogOperatorTable.
 * Make CoreModule contain all functions in FlinkSqlOperatorTable; a simple 
way could be to provide a Calcite wrapper that wraps all functions.
 * Make PlannerContext.getBuiltinSqlOperatorTable not contain 
FlinkSqlOperatorTable; we should use one 
FunctionCatalogOperatorTable. Otherwise, there will be a lot of confusion.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15594) Streaming SQL end-to-end test (Blink planner) failed with output hash mismatch

2020-01-14 Thread PengFei Li (Jira)
PengFei Li created FLINK-15594:
--

 Summary: Streaming SQL end-to-end test (Blink planner) failed with 
output hash mismatch
 Key: FLINK-15594
 URL: https://issues.apache.org/jira/browse/FLINK-15594
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.10.0
Reporter: PengFei Li


Streaming SQL end-to-end test (Blink planner) failed with output hash mismatch 
in the instances https://api.travis-ci.org/v3/job/636999143/log.txt and 
https://api.travis-ci.org/v3/job/636999153/log.txt
{code:java}
FAIL SQL Client Kafka 0.10.2.0: Output hash mismatch.  Got 
c6c06042b89c12234c91ed0a0d9c8406, expected 1303e3aa1d7aeb63024d539f15e42dd1.
head hexdump of actual:
000   2   0   1   8   -   0   3   -   1   2   0   8   :   0   0
010   :   0   0   ,   A   l   i   c   e   ,   T   h   i   s   w
020   a   s   a   w   a   r   n   i   n   g   .   ,   2   ,
030   S   u   c   c   e   s   s   c   o   n   s   t   a   n   t
040   f   o   l   d   i   n   g   .  \n   2   0   1   8   -   0
050   3   -   1   2   0   9   :   0   0   :   0   0   ,   B   o
060   b   ,   T   h   i   s   w   a   s   a   n   o   t   h
070   e   r   w   a   r   n   i   n   g   .   ,   1   ,   S   u
080   c   c   e   s   s   c   o   n   s   t   a   n   t   f
090   o   l   d   i   n   g   .  \n   2   0   1   8   -   0   3   -
0a0   1   2   0   9   :   0   0   :   0   0   ,   S   t   e   v
0b0   e   ,   T   h   i   s   w   a   s   a   n   o   t   h
0c0   e   r   i   n   f   o   .   ,   2   ,   S   u   c   c   e
0d0   s   s   c   o   n   s   t   a   n   t   f   o   l   d
0e0   i   n   g   .  \n   2   0   1   8   -   0   3   -   1   2
0f0   0   9   :   0   0   :   0   0   ,   A   l   i   c   e   ,   T
100   h   i   s   w   a   s   a   i   n   f   o   .   ,
110   1   ,   S   u   c   c   e   s   s   c   o   n   s   t   a
120   n   t   f   o   l   d   i   n   g   .  \n
12c
[FAIL] Test script contains errors.

{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15593) add doc to remind users not using Hive aggregate functions in streaming mode

2020-01-14 Thread Bowen Li (Jira)
Bowen Li created FLINK-15593:


 Summary: add doc to remind users not using Hive aggregate 
functions in streaming mode
 Key: FLINK-15593
 URL: https://issues.apache.org/jira/browse/FLINK-15593
 Project: Flink
  Issue Type: Task
  Components: Documentation
Reporter: Bowen Li
Assignee: Bowen Li
 Fix For: 1.10.0


Note that Hive scalar and table functions implementing UDF, GenericUDF, and 
GenericUDTF interfaces should be good to run in both 
streaming and batch mode in Flink.

Because Hive functions are all built for batch processing, aggregate 
functions in Hive that implement the UDAF and GenericUDAFResolver2 
interfaces may have unpredictable behaviors when used in streaming mode in 
Flink. We advise users to use Hive aggregate functions
only in batch mode.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [Discuss] Tuning FLIP-49 configuration default values.

2020-01-14 Thread Xintong Song
Thanks for the discussion, Stephan, Till and Andrey.

+1 for the managed fraction (0.4) and process.size (1.5G).

*JVM overhead min 196 -> 192Mb (128 + 64)*
> small correction for better power-of-2 alignment of sizes
>
Sorry, this was a typo (the same goes for the Jira comment, which is
copy-pasted from it). 192mb was the value used in the tuning report.

*meta space at least 96Mb?*
> There is still a concern about JVM metaspace being just 64Mb.
> We should confirm that it is not a problem by trying to test it also with
> the SQL jobs, Blink planner.
> Also, by running tpc-ds e2e Flink tests with this setting. Basically, where
> more classes are generated/loaded.
> We can look into this tomorrow.
>
I have already tried setting the metaspace to 64Mb with the e2e tests,
where I believe various sql / blink / tpc-ds test cases are included. (See
https://travis-ci.com/flink-ci/flink/builds/142970113 )
However, I'm also ok with 96Mb, since we are increasing the process.size to
1.5G.
My original concern with a larger metaspace size was that it may result
in too small a flink.size for the out-of-box configuration on containerized
setups.

*sanity check of JVM overhead*
> When the explicitly configured process and flink memory sizes are verified
> with the JVM meta space and overhead,
> JVM overhead does not have to be the exact fraction.
> It can be just within its min/max range, similar to how it is now for
> network/shuffle memory check after FLINK-15300.
>
Also +1 for this.

Thank you~

Xintong Song
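
For concreteness, the values converging in this thread would look roughly
like the following in flink-conf.yaml (a sketch of the proposal under
discussion at this point, not final shipped defaults):

taskmanager.memory.process.size: 1536m        # proposed increase from 1024m
taskmanager.memory.managed.fraction: 0.4
taskmanager.memory.jvm-metaspace.size: 96m    # 64m vs. 96m still being verified
taskmanager.memory.jvm-overhead.min: 192m     # corrected from the 196m typo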



On Wed, Jan 15, 2020 at 6:16 AM Andrey Zagrebin 
wrote:

> Hi all,
>
> Stephan, Till and me had another offline discussion today. Here is the
> outcome of our brainstorm.
>
> *managed fraction 0.4*
> just confirmed what we already discussed here.
>
> *process.size = 1536Mb (1.5Gb)*
> We agreed to have process.size in the default settings with the explanation
> of flink.size alternative in the comment.
> The suggestion is to increase it from 1024 to 1536mb. As you can see in the
> earlier provided calculation spreadsheet,
> it will result in bigger JVM Heap and managed memory (both ~0.5Gb) for all
> new setups.
> This should provide good enough experience for trying out Flink.
>
> *JVM overhead min 196 -> 192Mb (128 + 64)*
> small correction for better power-of-2 alignment of sizes
>
> *meta space at least 96Mb?*
> There is still a concern about JVM metaspace being just 64Mb.
> We should confirm that it is not a problem by trying to test it also with
> the SQL jobs, Blink planner.
> Also, by running tpc-ds e2e Flink tests with this setting. Basically, where
> more classes are generated/loaded.
> We can look into this tomorrow.
>
> *sanity check of JVM overhead*
> When the explicitly configured process and flink memory sizes are verified
> with the JVM meta space and overhead,
> JVM overhead does not have to be the exact fraction.
> It can be just within its min/max range, similar to how it is now for
> network/shuffle memory check after FLINK-15300.
>
> Best,
> Andrey
>
> On Tue, Jan 14, 2020 at 4:30 PM Stephan Ewen  wrote:
>
> > I like the idea of having a larger default "flink.size" in the
> config.yaml.
> > Maybe we don't need to double it, but something like 1280m would be okay?
> >
> > On Tue, Jan 14, 2020 at 3:47 PM Andrey Zagrebin 
> > wrote:
> >
> > > Hi all!
> > >
> > > Great that we have already tried out new FLIP-49 with the bigger jobs.
> > >
> > > I am also +1 for the JVM metaspace and overhead changes.
> > >
> > > Regarding 0.3 vs 0.4 for managed memory, +1 for having more managed
> > memory
> > > for Rocksdb limiting case.
> > >
> > > In general, this looks mostly to be about memory distribution between
> JVM
> > > heap and managed off-heap.
> > > Comparing to the previous default setup, the JVM heap dropped
> (especially
> > > for standalone) mostly due to moving managed from heap to off-heap and
> > then
> > > also adding framework off-heap.
> > > In general, this can be the most important consequence for beginners
> and
> > > those who rely on the default configuration.
> > > Especially the legacy default configuration in standalone mode, with
> > > heap.size falling back to flink.size, but there it seems we cannot do much
> > > now.
> > >
> > > I prepared a spreadsheet
> > > <
> > >
> >
> https://docs.google.com/spreadsheets/d/1mJaMkMPfDJJ-w6nMXALYmTc4XxiV30P5U7DzgwLkSoE
> > > >
> > > to play with numbers for the setups mentioned in the report.
> > >
> > > One idea would be to set process size (or smaller flink size
> > respectively)
> > > to a bigger default number, like 2048.
> > > In this case, the abs derived default JVM heap and managed memory are
> > close
> > > to the previous defaults, especially for managed fraction 0.3.
> > > This should align the defaults with the previous standalone try-out
> > > experience where the increased off-heap memory is not strictly
> controlled
> > > by the environment anyways.
> > > The consequence for container users who relied on and updated the
> default
> > > configuration is that 

Re: [VOTE] FLIP-90: Support SQL 2016-2017 JSON functions in Flink SQL

2020-01-14 Thread Zhenghua Gao
+1 (non-binding)

*Best Regards,*
*Zhenghua Gao*


On Wed, Jan 15, 2020 at 10:11 AM Danny Chan  wrote:

> +1 (non-binding)
>
> Best,
> Danny Chan
> On Dec 31, 2019, 5:09 PM +0800, Forward Xu wrote:
> > Hi all,
> >
> > I'd like to start the vote of FLIP-90 [1] since that we have reached an
> > agreement on the design in the discussion thread [2].
> >
> > This vote will be open for at least 72 hours. Unless there is an
> objection,
> > I will try to close it by January 3, 2020 08:00 UTC if we have received
> > sufficient votes.
> >
> > Best,
> > ForwardXu
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=141724550
> > [2]
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-JSON-functions-in-Flink-SQL-td32674.html
>


[jira] [Created] (FLINK-15592) Streaming SQL throws Hive-related exception when it doesn't use any Hive table

2020-01-14 Thread Jeff Zhang (Jira)
Jeff Zhang created FLINK-15592:
--

 Summary: Streaming SQL throws Hive-related exception when it doesn't 
use any Hive table
 Key: FLINK-15592
 URL: https://issues.apache.org/jira/browse/FLINK-15592
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.10.0
Reporter: Jeff Zhang


I use the following streaming SQL to query a kafka table whose metadata is 
stored in hive metastore via HiveCatalog. But it throws a hive-related 
exception, which is very confusing.

SQL
{code}
SELECT *
FROM (
   SELECT *,
 ROW_NUMBER() OVER(
   ORDER BY event_ts) AS rownum
   FROM source_kafka)
WHERE rownum <= 10
{code}

Exception
{code}
Caused by: org.apache.flink.table.api.ValidationException: SQL validation 
failed. java.lang.reflect.InvocationTargetException
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:130)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
at 
org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:103)
... 13 more
Caused by: java.lang.RuntimeException: 
java.lang.reflect.InvocationTargetException
at 
org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeGetResultType(HiveFunctionUtils.java:77)
at 
org.apache.flink.table.planner.functions.utils.HiveAggSqlFunction.lambda$createReturnTypeInference$0(HiveAggSqlFunction.java:82)
at 
org.apache.calcite.sql.SqlOperator.inferReturnType(SqlOperator.java:470)
at 
org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:437)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:303)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:219)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
at 
org.apache.calcite.sql.SqlCallBinding.getOperandType(SqlCallBinding.java:237)
at 
org.apache.calcite.sql.type.OrdinalReturnTypeInference.inferReturnType(OrdinalReturnTypeInference.java:40)
at 
org.apache.calcite.sql.type.SqlTypeTransformCascade.inferReturnType(SqlTypeTransformCascade.java:54)
at 
org.apache.calcite.sql.SqlOperator.inferReturnType(SqlOperator.java:470)
at 
org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:437)
at 
org.apache.calcite.sql.SqlOverOperator.deriveType(SqlOverOperator.java:86)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
at 
org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:133)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:479)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4105)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3389)
at 
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at 

Re: [DISCUSS] Some feedback after trying out the new FLIP-49 memory configurations

2020-01-14 Thread Xintong Song
Thanks all for the discussion.

We agreed to have process.size in the default settings with the explanation
> of flink.size alternative in the comment.
> This way we keep -tm as a shortcut to process.size only and any
> inconsistencies fail fast as if configured in yaml.
>

The conclusion sounds good to me. I'll update the JIRA description and PR
for FLINK-15530.

+1 to resolve this discussion thread. We can keep the default values
discussion in the "tuning FLIP-49 default configuration values" ML thread.


Thank you~

Xintong Song



On Wed, Jan 15, 2020 at 5:54 AM Andrey Zagrebin 
wrote:

> Hi all,
>
> Stephan, Till and me had another offline discussion today. Here is the
> outcome of our brainstorm.
>
> We agreed to have process.size in the default settings with the explanation
> of flink.size alternative in the comment.
> This way we keep -tm as a shortcut to process.size only and any
> inconsistencies fail fast as if configured in yaml.
> I will also follow-up on the thread "[Discuss] Tuning FLIP-49 configuration
> default values" with a bit more details.
>
> If no further objections, we can conclude this last point in this
> discussion.
>
> Best,
> Andrey
>
> On Tue, Jan 14, 2020 at 4:28 PM Stephan Ewen  wrote:
>
> > I think that problem exists anyway and is independent of the "-tm"
> option.
> >
> > You can have a combination of `task.heap.size` and `managed.size` and
> > `framework.heap.size` that conflicts with `flink.size`. In that case, we
> > get an exception during the startup (incompatible memory configuration)?
> > That is the price for having these "shortcut" options (`flink.size` and
> > `process.size`). But it is a fair price, because the shortcuts are very
> > valuable to most users.
> >
> > What is added with the "-tm" setting is an easy way to get
> the
> > shortcut setting added, even if it was not in the config. So where a
> config
> > worked in standalone, it can now fail in a container setup when "-tm" is
> > used.
> > But that is expected, I guess, when you start manually tune different
> > memory types and end up defining an inconsistent combination. And it
> never
> > conflicts with the default setup, only with manually tuned regions.
> >
> > But this example shows why we need to have log output for the logic that
> > validates and configures the memory settings, so that users understand
> what
> > is happening.
> >
> > Best,
> > Stephan
> >
> >
> > On Tue, Jan 14, 2020 at 2:54 PM Till Rohrmann 
> > wrote:
> >
> > > Clearing the `flink.size` option and setting `process.size` could
> indeed
> > be
> > > a solution. The thing I'm wondering is what would happen if the user
> has
> > > configured `task.heap.size` and `managed.size` instead of `flink.size`?
> > > Would we also ignore these settings? If not, then we risk to run into
> the
> > > situation that TaskExecutorResourceUtils fails because the memory does
> > not
> > > add up. I guess the thing which bugs me a bit is the special casing
> which
> > > could lead easily into inconsistent behaviour if we don't cover all
> > cases.
> > > The consequence of using slightly different concepts (`flink.size`,
> > > `process.size`) in standalone vs. container/Yarn/Mesos mode in order to
> > > keep the status quo is an increased maintenance overhead which we
> should
> > be
> > > aware of.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Tue, Jan 14, 2020 at 3:59 AM Xintong Song 
> > > wrote:
> > >
> > > True, even if we have "process.size" rather than "flink.size" in the
> > default
> > > config file, users can still have "flink.size" in their custom config
> > > file.
> > > > If we consider "-tm" as a shortcut for users to override the TM
> memory
> > > > size, then it makes less sense to require users to remove
> "flink.size"
> > > from
> > > their config file whenever they want to use "-tm".
> > > >
> > > > I'm convinced that ignoring "flink.size" might not be a bad idea.
> > > > And I think we should also update the description of "-tm" (in
> > > > "FlinkYarnSessionCli") to explicitly mention that it would overwrite
> > > > "flink.size" and "process.size".
> > > >
> > > > Thank you~
> > > >
> > > > Xintong Song
> > > >
> > > >
> > > >
> > > > On Tue, Jan 14, 2020 at 2:24 AM Stephan Ewen 
> wrote:
> > > >
> > > > > Would be good to hear the thoughts of some more Yarn users, though.
> > > > >
> > > > > On Mon, Jan 13, 2020 at 7:23 PM Stephan Ewen 
> > wrote:
> > > > >
> > > > > > I think we need an interpretation of "-tm" regardless of what is
> in
> > > the
> > > > > > default configuration, because we can always have a modified
> > > > > configuration
> > > > > > and then a user passes the "-tm" flag.
> > > > > >
> > > > > > I kind of like the first proposal: Interpret "-tm" as "override
> > > memory
> > > > > > size config and set the Yarn TM container size." It would mean
> > > exactly
> > > > > > ignoring "taskmanager.memory.flink.size" and using the "-tm"
> value
> > > as "
> > > > > > 

[DISCUSS] Improve TableFactory

2020-01-14 Thread Jingsong Li
Hi dev,

I'd like to kick off a discussion on the improvement of TableSourceFactory
and TableSinkFactory.

Motivation:
Now the main needs and problems are:
1. Connectors can't get the TableConfig [1], and some behaviors really need to
be controlled by the user's table configuration. In the era of catalogs, we
can't put these configs in connector properties, which is too inconvenient.
2. Connectors can't know whether this is batch or stream execution mode. But
the sink implementations for batch and stream are totally different. I
understand there is an update-mode property now, but it splits batch and
stream in the catalog dimension. In fact, this information can be obtained
through the current TableEnvironment.
3. There is no interface for calling validation. Now our validations are
mostly util classes; whether they run depends on whether the connector calls
them. We also have some new validations to add, such as [2], which really
confuses users, even developers. Another problem is that our SQL updates
(DDL) do not have validation [3]. It is better to report an error when
executing DDL; otherwise it will confuse the user.

Proposed change draft for 1 and 2:

interface CatalogTableContext {
   ObjectPath getTablePath();
   CatalogTable getTable();
   ReadableConfig getTableConfig();
   boolean isStreamingMode();
}

public interface TableSourceFactory<T> extends TableFactory {

   default TableSource<T> createTableSource(CatalogTableContext context) {
      return createTableSource(context.getTablePath(), context.getTable());
   }

   ...
}

Proposed change draft for 3:

public interface TableFactory {

   TableValidators validators();

   interface TableValidators {
  ConnectorDescriptorValidator connectorValidator();
  TableSchemaValidator schemaValidator();
  FormatDescriptorValidator formatValidator();
   }
}
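
To make the first draft concrete, here is a hedged sketch of how a connector
factory might consume the proposed context (MyTableSourceFactory and the two
helper methods are hypothetical, only for illustration):

public class MyTableSourceFactory implements TableSourceFactory<Row> {

   @Override
   public TableSource<Row> createTableSource(CatalogTableContext context) {
      ReadableConfig conf = context.getTableConfig();
      // The factory can branch on the execution mode directly, instead of
      // encoding it as an update-mode property in the catalog.
      if (context.isStreamingMode()) {
         return createStreamingSource(context.getTable(), conf); // hypothetical helper
      }
      return createBatchSource(context.getTable(), conf);        // hypothetical helper
   }

   ...
}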

What do you think?

[1] https://issues.apache.org/jira/browse/FLINK-15290
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-A-mechanism-to-validate-the-precision-of-columns-for-connectors-td36552.html#a36556
[3] https://issues.apache.org/jira/browse/FLINK-15509

Best,
Jingsong Lee


Re: [VOTE] FLIP-90: Support SQL 2016-2017 JSON functions in Flink SQL

2020-01-14 Thread Danny Chan
+1 (non-binding)

Best,
Danny Chan
On Dec 31, 2019, 5:09 PM +0800, Forward Xu wrote:
> Hi all,
>
> I'd like to start the vote of FLIP-90 [1] since that we have reached an
> agreement on the design in the discussion thread [2].
>
> This vote will be open for at least 72 hours. Unless there is an objection,
> I will try to close it by January 3, 2020 08:00 UTC if we have received
> sufficient votes.
>
> Best,
> ForwardXu
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=141724550
> [2]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-JSON-functions-in-Flink-SQL-td32674.html
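
For context, FLIP-90 covers the SQL:2016 JSON functions such as JSON_EXISTS,
JSON_VALUE and JSON_QUERY. An illustrative query of the kind the FLIP would
enable; the table and column names here are made up:

SELECT
  JSON_VALUE(payload, '$.user.name') AS user_name
FROM events
WHERE JSON_EXISTS(payload, '$.user');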


[jira] [Created] (FLINK-15591) support CREATE TEMPORARY TABLE/VIEW in DDL

2020-01-14 Thread Bowen Li (Jira)
Bowen Li created FLINK-15591:


 Summary: support CREATE TEMPORARY TABLE/VIEW in DDL
 Key: FLINK-15591
 URL: https://issues.apache.org/jira/browse/FLINK-15591
 Project: Flink
  Issue Type: Task
  Components: Table SQL / API
Reporter: Bowen Li
 Fix For: 1.11.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Understanding watermark

2020-01-14 Thread Guowei Ma
Hi, Cam,
I think you might want to know why the web page does not show the watermark
of the source.
Currently, the web UI only shows the "input" watermark. The source only
outputs watermarks, so the web UI shows you "No Watermark".
Actually, Flink has "output" watermark metrics. I think Flink should also
show this information in the web UI. Would you mind opening a Jira to track this?


Best,
Guowei


On Wed, Jan 15, 2020 at 4:05 AM Cam Mach wrote:

> Hi Till,
>
> Thanks for your response.
>
> Our sources are S3 and Kinesis. We have run several tests, and we are able
> to take a savepoint/checkpoint, but only when S3 completes reading. And at
> that point, our pipeline has watermarks for the other operators, but not the
> source operator. We are not running `PROCESS_CONTINUOUSLY`, so we should
> have a watermark for the source as well, right?
>
> Attached is a snapshot of our pipeline.
>
> [image: image.png]
>
> Thanks
>
>
>
> On Tue, Jan 14, 2020 at 10:43 AM Till Rohrmann 
> wrote:
>
>> Hi Cam,
>>
>> could you share a bit more detail about your job (e.g. which sources are
>> you using, what are your settings, etc.)? Ideally, you could provide a minimal
>> example so we can better understand the program.
>>
>> From a high-level perspective, there might be different problems: First
>> of all, Flink does not support checkpointing/taking a savepoint if some of
>> the job's operators have already terminated, IIRC. But your description
>> points rather in the direction that your bounded source does not
>> terminate. So maybe you are reading a file via
>> StreamExecutionEnvironment.createFileInput
>> with FileProcessingMode.PROCESS_CONTINUOUSLY. But these things are hard to
>> tell without a better understanding of your job.
>>
>> Cheers,
>> Till
>>
>> On Mon, Jan 13, 2020 at 8:35 PM Cam Mach  wrote:
>>
>>> Hello Flink expert,
>>>
>>> We have a pipeline that reads both bounded and unbounded sources, and our
>>> understanding is that when the bounded sources complete they should get a
>>> watermark of +inf, and then we should be able to take a savepoint and safely
>>> restart the pipeline. However, we have sources that never get watermarks, and
>>> we are confused as to what we are seeing and what we should expect.
>>>
>>>
>>> Cam Mach
>>> Software Engineer
>>> E-mail: cammac...@gmail.com
>>> Tel: 206 972 2768
>>>
>>>


[jira] [Created] (FLINK-15590) add section for current catalog and current database

2020-01-14 Thread Bowen Li (Jira)
Bowen Li created FLINK-15590:


 Summary: add section for current catalog and current database
 Key: FLINK-15590
 URL: https://issues.apache.org/jira/browse/FLINK-15590
 Project: Flink
  Issue Type: Task
  Components: Deployment / Docker
Reporter: Bowen Li
Assignee: Bowen Li
 Fix For: 1.10.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15589) remove beta tag from catalog and hive doc

2020-01-14 Thread Bowen Li (Jira)
Bowen Li created FLINK-15589:


 Summary: remove beta tag from catalog and hive doc
 Key: FLINK-15589
 URL: https://issues.apache.org/jira/browse/FLINK-15589
 Project: Flink
  Issue Type: Task
  Components: Documentation
Reporter: Bowen Li
Assignee: Bowen Li
 Fix For: 1.10.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-92: JDBC catalog and Postgres catalog

2020-01-14 Thread Bowen Li
Hi devs,

I've updated the wiki according to feedbacks. Please take another look.

Thanks!
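
For readers skimming the thread: based on the property names debated below
('type' = 'jdbc', 'base-url', 'default-database'), registering such a catalog
in the SQL client YAML would look roughly like this sketch (the values are
placeholders, not final FLIP text):

catalogs:
  - name: mypg
    type: jdbc
    base-url: jdbc:postgresql://localhost:5432/
    default-database: mydb
    username: flink
    password: secret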


On Fri, Jan 10, 2020 at 2:24 PM Bowen Li  wrote:

> Thanks everyone for the prompt feedback. Please see my response below.
>
> > In Postgres, the TIME/TIMESTAMP WITH TIME ZONE has the
> java.time.Instant semantics, and should be mapped to Flink's TIME/TIMESTAMP
> WITH LOCAL TIME ZONE
>
> Zhenghua, you are right that pg's 'timestamp with timezone' should be
> translated into flink's 'timestamp with local timezone'. I can't find 'time
> with (local) timezone' though, so we may not support that type from pg in
> Flink.
>
> > I suggest that the parameters can be completely consistent with the
> JDBCTableSource / JDBCTableSink. If you take a look at the JDBC api:
> "DriverManager.getConnection".
> It allows the "default db, username, pwd" things to be optional. They can be
> included in the URL. Of course the JDBC api also allows establishing
> connections to different databases in a db instance. So I think we don't
> need to provide a "base_url", we can just provide a real "url". To be
> consistent with the JDBC api.
>
> Jingsong, what I'm saying is a builder can be added on demand later if
> enough users request it, and it doesn't need to be a core part of
> the FLIP.
>
> Besides, unfortunately Postgres doesn't allow changing databases via JDBC.
>
> JDBC provides different connection options as you mentioned, but I'd like
> to keep our design and API simple and avoid having to handle extra parsing logic.
> And it doesn't shut the door on what you proposed as a future effort.
>
> > Since the PostgreSQL does not have catalog but schema under database,
> why not mapping the PG-database to Flink catalog and PG-schema to Flink
> database
>
> Danny, because 1) there are frequent use cases where users want to switch
> databases or reference objects across databases in a pg instance, 2)
> schema is an optional namespace layer in pg; it always has a default value
> ("public") and can be invisible to users if they'd like, as shown in the
> FLIP, and 3) as you mentioned, it is specific to postgres, and I don't feel
> it's necessary to map Postgres substantially differently than other DBMSs,
> with additional complexity
>
> >'base_url' configuration: We are following the configuration format
> guideline [1] which suggest to use dash (-) instead of underline (_). And
> I'm a little confused the meaning of "base_url" at the first glance,
> another idea is split it into several configurations: 'driver', 'hostname',
> 'port'.
>
> Jark, I agreed we should use "base-url" in yaml config.
>
> I'm not sure about having hostname and port separately because you can
> specify multiple hosts with ports in jdbc, like
> "jdbc:dbms/host1:port1,host2:port2/", for connection failovers. Separating
> them would make configurations harder.
>
> I will add clear doc and example to avoid any possible confusion.
>
> > 'default-database' is optional, then which database will be used or what
> is the behavior when the default database is not selected.
>
> This should be DBMS specific. For postgres, it will be the 
> database.
>
>
> On Thu, Jan 9, 2020 at 9:48 PM Zhenghua Gao  wrote:
>
>> Hi Bowen, Thanks for driving this.
>> I think it would be very convenience to use tables in external DBs with
>> JDBC Catalog.
>>
>> I have one concern about "Flink-Postgres Data Type Mapping" part:
>>
>> In Postgres, the TIME/TIMESTAMP WITH TIME ZONE has the java.time.Instant
>> semantics,
>> and should be mapped to Flink's TIME/TIMESTAMP WITH LOCAL TIME ZONE
>>
>> *Best Regards,*
>> *Zhenghua Gao*
>>
>>
>> On Fri, Jan 10, 2020 at 11:09 AM Jingsong Li 
>> wrote:
>>
>> > Hi Bowen, thanks for reply and updating.
>> >
>> > > I don't see much value in providing a builder for jdbc catalogs, as
>> they
>> > only have 4 or 5 required params, no optional ones. I prefer users just
>> > provide a base url without default db, username, pwd so we don't need to
>> > parse url all around, as I mentioned jdbc catalog may need to establish
>> > connections to different databases in a db instance,
>> >
>> > I suggest that the parameters can be completely consistent with the
>> > JDBCTableSource / JDBCTableSink.
>> > If you take a look at the JDBC api, "DriverManager.getConnection",
>> > it allows the "default db, username, pwd" things to be optional. They can
>> > be included in the URL. Of course the JDBC api also allows establishing
>> > connections to different databases in a db instance.
>> > So I think we don't need to provide a "base_url", we can just provide a
>> > real "url",
>> > to be consistent with the JDBC api.
>> >
>> > Best,
>> > Jingsong Lee
>> >
>> > On Fri, Jan 10, 2020 at 10:34 AM Jark Wu  wrote:
>> >
>> > > Thanks Bowen for the reply,
>> > >
>> > > A user-facing JDBCCatalog and 'catalog.type' = 'jdbc'  sounds good to
>> me.
>> > >
>> > > I have some other minor comments when I went through the updated
>> > > documentation:
>> > >
>> > > 1) 'base_url' configuration: We are following the configuration format
>> > > guideline [1] 

Re: [Discuss] Tuning FLIP-49 configuration default values.

2020-01-14 Thread Andrey Zagrebin
Hi all,

Stephan, Till and me had another offline discussion today. Here is the
outcome of our brainstorm.

*managed fraction 0.4*
just confirmed what we already discussed here.

*process.size = 1536Mb (1.5Gb)*
We agreed to have process.size in the default settings with the explanation
of flink.size alternative in the comment.
The suggestion is to increase it from 1024 to 1536mb. As you can see in the
earlier provided calculation spreadsheet,
it will result in bigger JVM Heap and managed memory (both ~0.5Gb) for all
new setups.
This should provide good enough experience for trying out Flink.

*JVM overhead min 196 -> 192Mb (128 + 64)*
small correction for better power-of-2 alignment of sizes

*meta space at least 96Mb?*
There is still a concern about JVM metaspace being just 64Mb.
We should confirm that it is not a problem by trying to test it also with
the SQL jobs, Blink planner.
Also, by running tpc-ds e2e Flink tests with this setting. Basically, where
more classes are generated/loaded.
We can look into this tomorrow.

*sanity check of JVM overhead*
When the explicitly configured process and flink memory sizes are verified
with the JVM meta space and overhead,
JVM overhead does not have to be the exact fraction.
It can be just within its min/max range, similar to how it is now for
network/shuffle memory check after FLINK-15300.

Best,
Andrey

On Tue, Jan 14, 2020 at 4:30 PM Stephan Ewen  wrote:

> I like the idea of having a larger default "flink.size" in the config.yaml.
> Maybe we don't need to double it, but something like 1280m would be okay?
>
> On Tue, Jan 14, 2020 at 3:47 PM Andrey Zagrebin 
> wrote:
>
> > Hi all!
> >
> > Great that we have already tried out new FLIP-49 with the bigger jobs.
> >
> > I am also +1 for the JVM metaspace and overhead changes.
> >
> > Regarding 0.3 vs 0.4 for managed memory, +1 for having more managed
> memory
> > for Rocksdb limiting case.
> >
> > In general, this looks mostly to be about memory distribution between JVM
> > heap and managed off-heap.
> > Comparing to the previous default setup, the JVM heap dropped (especially
> > for standalone) mostly due to moving managed from heap to off-heap and
> then
> > also adding framework off-heap.
> > In general, this can be the most important consequence for beginners and
> > those who rely on the default configuration.
> > Especially the legacy default configuration in standalone mode, with
> > heap.size falling back to flink.size, but there it seems we cannot do much
> > now.
> >
> > I prepared a spreadsheet
> > <
> >
> https://docs.google.com/spreadsheets/d/1mJaMkMPfDJJ-w6nMXALYmTc4XxiV30P5U7DzgwLkSoE
> > >
> > to play with numbers for the setups mentioned in the report.
> >
> > One idea would be to set process size (or smaller flink size
> respectively)
> > to a bigger default number, like 2048.
> > In this case, the abs derived default JVM heap and managed memory are
> close
> > to the previous defaults, especially for managed fraction 0.3.
> > This should align the defaults with the previous standalone try-out
> > experience where the increased off-heap memory is not strictly controlled
> > by the environment anyways.
> > The consequence for container users who relied on and updated the default
> > configuration is that the containers will be requested with the double
> > size.
> >
> > Best,
> > Andrey
> >
> >
> > On Tue, Jan 14, 2020 at 11:20 AM Till Rohrmann 
> > wrote:
> >
> > > +1 for the JVM metaspace and overhead changes.
> > >
> > > On Tue, Jan 14, 2020 at 11:19 AM Till Rohrmann 
> > > wrote:
> > >
> > >> I guess one of the most important results of this experiment is to
> have
> > a
> > >> good tuning guide available for users who are past the initial try-out
> > >> phase because the default settings will be kind of a compromise. I
> > assume
> > >> that this is part of the outstanding FLIP-49 documentation task.
> > >>
> > >> If we limit RocksDB's memory consumption by default, then I believe
> that
> > >> 0.4 would give the better all-round experience as it leaves a bit more
> > >> memory for RocksDB. However, I'm a bit sceptical whether we should
> > optimize
> > >> the default settings for a configuration where the user still needs to
> > >> activate the strict memory limiting for RocksDB. In this case, I would
> > >> expect that the user could also adapt the managed memory fraction.
> > >>
> > >> Cheers,
> > >> Till
> > >>
> > >> On Tue, Jan 14, 2020 at 3:39 AM Xintong Song 
> > >> wrote:
> > >>
> > >>> Thanks for the feedback, Stephan and Kurt.
> > >>>
> > >>> @Stephan
> > >>>
> > >>> Regarding managed memory fraction,
> > >>> - It makes sense to keep the default value 0.4, if we assume rocksdb
> > >>> memory is limited by default.
> > >>> - AFAIK, currently rocksdb by default does not limit its memory
> usage.
> > >>> And I'm positive to change it.
> > >>> - Personally, I don't like the idea that the out-of-box experience
> > >>> (for which we set the default fraction) relies on the fact that users will
> > 

Re: [DISCUSS] Some feedback after trying out the new FLIP-49 memory configurations

2020-01-14 Thread Andrey Zagrebin
Hi all,

Stephan, Till and me had another offline discussion today. Here is the
outcome of our brainstorm.

We agreed to have process.size in the default settings with the explanation
of flink.size alternative in the comment.
This way we keep -tm as a shortcut to process.size only and any
inconsistencies fail fast as if configured in yaml.
I will also follow-up on the thread "[Discuss] Tuning FLIP-49 configuration
default values" with a bit more details.

If no further objections, we can conclude this last point in this
discussion.

Best,
Andrey

On Tue, Jan 14, 2020 at 4:28 PM Stephan Ewen  wrote:

> I think that problem exists anyway and is independent of the "-tm" option.
>
> You can have a combination of `task.heap.size` and `managed.size` and
> `framework.heap.size` that conflicts with `flink.size`. In that case, we
> get an exception during the startup (incompatible memory configuration)?
> That is the price for having these "shortcut" options (`flink.size` and
> `process.size`). But it is a fair price, because the shortcuts are very
> valuable to most users.
>
> What is added with the "-tm" setting is an easy way to get the
> shortcut setting added, even if it was not in the config. So where a config
> worked in standalone, it can now fail in a container setup when "-tm" is
> used.
> But that is expected, I guess, when you start manually tune different
> memory types and end up defining an inconsistent combination. And it never
> conflicts with the default setup, only with manually tuned regions.
>
> But this example shows why we need to have log output for the logic that
> validates and configures the memory settings, so that users understand what
> is happening.
>
> Best,
> Stephan
>
>
> On Tue, Jan 14, 2020 at 2:54 PM Till Rohrmann 
> wrote:
>
> > Clearing the `flink.size` option and setting `process.size` could indeed
> be
> > a solution. The thing I'm wondering is what would happen if the user has
> > configured `task.heap.size` and `managed.size` instead of `flink.size`?
> > Would we also ignore these settings? If not, then we risk to run into the
> > situation that TaskExecutorResourceUtils fails because the memory does
> not
> > add up. I guess the thing which bugs me a bit is the special casing which
> > could lead easily into inconsistent behaviour if we don't cover all
> cases.
> > The consequence of using slightly different concepts (`flink.size`,
> > `process.size`) in standalone vs. container/Yarn/Mesos mode in order to
> > keep the status quo is an increased maintenance overhead which we should
> be
> > aware of.
> >
> > Cheers,
> > Till
> >
> > On Tue, Jan 14, 2020 at 3:59 AM Xintong Song 
> > wrote:
> >
> > > True, even if we have "process.size" rather than "flink.size" in the
> default
> > > config file, users can still have "flink.size" in their custom config
> > file.
> > > If we consider "-tm" as a shortcut for users to override the TM memory
> > > size, then it makes less sense to require users to remove "flink.size"
> > from
> > > their config file whenever they want to use "-tm".
> > >
> > > I'm convinced that ignoring "flink.size" might not be a bad idea.
> > > And I think we should also update the description of "-tm" (in
> > > "FlinkYarnSessionCli") to explicitly mention that it would overwrite
> > > "flink.size" and "process.size".
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > >
> > > On Tue, Jan 14, 2020 at 2:24 AM Stephan Ewen  wrote:
> > >
> > > > Would be good to hear the thoughts of some more Yarn users, though.
> > > >
> > > > On Mon, Jan 13, 2020 at 7:23 PM Stephan Ewen 
> wrote:
> > > >
> > > > > I think we need an interpretation of "-tm" regardless of what is in
> > the
> > > > > default configuration, because we can always have a modified
> > > > configuration
> > > > > and then a user passes the "-tm" flag.
> > > > >
> > > > > I kind of like the first proposal: Interpret "-tm" as "override
> > memory
> > > > > size config and set the Yarn TM container size." It would mean
> > exactly
> > > > > ignoring "taskmanager.memory.flink.size" and using the "-tm" value
> > as "
> > > > > "taskmanager.memory.process.size".
> > > > > That does not sound too bad to me.
> > > > >
> > > > > Best,
> > > > > Stephan
> > > > >
> > > > >
> > > > > On Mon, Jan 13, 2020 at 5:35 PM Andrey Zagrebin <
> > azagre...@apache.org>
> > > > > wrote:
> > > > >
> > > > >> Hi all,
> > > > >>
> > > > >> While working on changing process memory to Flink memory in
> default
> > > > >> configuration, Xintong encountered a problem.
> > > > >> When -tm option is used to rewrite container memory, basically
> > process
> > > > >> memory, it can collide with the default Flink memory.
> > > > >> For legacy users it should not be a problem as we adjusted the
> > legacy
> > > > heap
> > > > >> size option to be interpreted differently for standalone and
> > container
> > > > >> modes.
> > > > >>
> > > > >> One solution could be to say in -tm docs that we rewrite both
> > 

Re: Understanding watermark

2020-01-14 Thread Cam Mach
Hi Till,

Thanks for your response.

Our sources are S3 and Kinesis. We have run several tests, and we are able
to take a savepoint/checkpoint, but only when S3 completes reading. And at
that point, our pipeline has watermarks for the other operators, but not the
source operator. We are not running `PROCESS_CONTINUOUSLY`, so we should
have a watermark for the source as well, right?

 Attached is a snapshot of our pipeline.

[image: image.png]

Thanks



On Tue, Jan 14, 2020 at 10:43 AM Till Rohrmann  wrote:

> Hi Cam,
>
> could you share a bit more detail about your job (e.g. which sources are
> you using, what are your settings, etc.)? Ideally, you could provide a minimal
> example so we can better understand the program.
>
> From a high-level perspective, there might be different problems: First of
> all, Flink does not support checkpointing/taking a savepoint if some of the
> job's operators have already terminated, IIRC. But your description points
> rather in the direction that your bounded source does not terminate. So
> maybe you are reading a file via StreamExecutionEnvironment.createFileInput
> with FileProcessingMode.PROCESS_CONTINUOUSLY. But these things are hard to
> tell without a better understanding of your job.
>
> Cheers,
> Till
>
> On Mon, Jan 13, 2020 at 8:35 PM Cam Mach  wrote:
>
>> Hello Flink expert,
>>
>> We have a pipeline that reads both bounded and unbounded sources, and our
>> understanding is that when the bounded sources complete they should get a
>> watermark of +inf, and then we should be able to take a savepoint and safely
>> restart the pipeline. However, we have sources that never get watermarks, and
>> we are confused as to what we are seeing and what we should expect.
>>
>>
>> Cam Mach
>> Software Engineer
>> E-mail: cammac...@gmail.com
>> Tel: 206 972 2768
>>
>>


Re: Understanding watermark

2020-01-14 Thread Till Rohrmann
Hi Cam,

could you share a bit more detail about your job (e.g. which sources are
you using, what are your settings, etc.)? Ideally, you could provide a minimal
example so we can better understand the program.

From a high-level perspective, there might be different problems: First of
all, Flink does not support checkpointing/taking a savepoint if some of the
job's operators have already terminated, IIRC. But your description points
rather in the direction that your bounded source does not terminate. So
maybe you are reading a file via StreamExecutionEnvironment.createFileInput
with FileProcessingMode.PROCESS_CONTINUOUSLY. But these things are hard to
tell without a better understanding of your job.

Cheers,
Till

On Mon, Jan 13, 2020 at 8:35 PM Cam Mach  wrote:

> Hello Flink expert,
>
> We have a pipeline that reads both bounded and unbounded sources, and our
> understanding is that when the bounded sources complete they should get a
> watermark of +inf, and then we should be able to take a savepoint and safely
> restart the pipeline. However, we have sources that never get watermarks, and
> we are confused as to what we are seeing and what we should expect.
>
>
> Cam Mach
> Software Engineer
> E-mail: cammac...@gmail.com
> Tel: 206 972 2768
>
>


[jira] [Created] (FLINK-15588) check that a udf registered via catalog API is not a scala inner class

2020-01-14 Thread Bowen Li (Jira)
Bowen Li created FLINK-15588:


 Summary: check that a udf registered via catalog API is not a scala 
inner class
 Key: FLINK-15588
 URL: https://issues.apache.org/jira/browse/FLINK-15588
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Reporter: Bowen Li
 Fix For: 1.11.0


Scala inner classes cannot be instantiated via reflection directly, thus they 
cannot be catalog functions stored as a full class name. 

We should check for that in the catalog API, to make sure we remind users of it 
with proper error messages.
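
For illustration, the same limitation in Java terms (a sketch, not Flink
code): a non-static inner class has no zero-argument constructor, because its
constructor implicitly takes the enclosing instance, so reflective
instantiation by class name fails.

{code:java}
public class Outer {
    public class Inner {}  // non-static inner class, like a Scala inner class

    public static void main(String[] args) throws Exception {
        Class<?> clazz = Class.forName("Outer$Inner");
        // Throws NoSuchMethodException: the only constructor is Inner(Outer).
        clazz.getDeclaredConstructor().newInstance();
    }
}
{code}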



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15587) Flink image does not run on OpenShift

2020-01-14 Thread LCID Fire (Jira)
LCID Fire created FLINK-15587:
-

 Summary: Flink image does not run on OpenShift
 Key: FLINK-15587
 URL: https://issues.apache.org/jira/browse/FLINK-15587
 Project: Flink
  Issue Type: Bug
Reporter: LCID Fire


Trying to run Flink (flink:latest) on OpenShift (3.11) does not work.

The output is:
{noformat}
Starting standalonesession daemon on host flink-jobmanager-7949d5c546-49cc8. 
/opt/flink/bin/flink-daemon.sh: line 127: 
/opt/flink/log/flink--standalonesession-0-flink-jobmanager-7949d5c546-49cc8.out:
 Permission denied
{noformat}
Since OpenShift is a locked-down container environment, the permissions the 
image expects inside the container are probably not minimal and are too broad.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [Discuss] Tuning FLIP-49 configuration default values.

2020-01-14 Thread Stephan Ewen
I like the idea of having a larger default "flink.size" in the config.yaml.
Maybe we don't need to double it, but something like 1280m would be okay?

On Tue, Jan 14, 2020 at 3:47 PM Andrey Zagrebin 
wrote:

> Hi all!
>
> Great that we have already tried out new FLIP-49 with the bigger jobs.
>
> I am also +1 for the JVM metaspace and overhead changes.
>
> Regarding 0.3 vs 0.4 for managed memory, +1 for having more managed memory
> for Rocksdb limiting case.
>
> In general, this looks mostly to be about memory distribution between JVM
> heap and managed off-heap.
> Comparing to the previous default setup, the JVM heap dropped (especially
> for standalone) mostly due to moving managed from heap to off-heap and then
> also adding framework off-heap.
> In general, this can be the most important consequence for beginners and
> those who rely on the default configuration.
> Especially the legacy default configuration in standalone mode, with heap.size
> falling back to flink.size, but there it seems we cannot do much now.
>
> I prepared a spreadsheet
> <
> https://docs.google.com/spreadsheets/d/1mJaMkMPfDJJ-w6nMXALYmTc4XxiV30P5U7DzgwLkSoE
> >
> to play with numbers for the setups mentioned in the report.
>
> One idea would be to set process size (or smaller flink size respectively)
> to a bigger default number, like 2048.
> In this case, the abs derived default JVM heap and managed memory are close
> to the previous defaults, especially for managed fraction 0.3.
> This should align the defaults with the previous standalone try-out
> experience where the increased off-heap memory is not strictly controlled
> by the environment anyways.
> The consequence for container users who relied on and updated the default
> configuration is that the containers will be requested with the double
> size.
>
> Best,
> Andrey
>
>
> On Tue, Jan 14, 2020 at 11:20 AM Till Rohrmann 
> wrote:
>
> > +1 for the JVM metaspace and overhead changes.
> >
> > On Tue, Jan 14, 2020 at 11:19 AM Till Rohrmann 
> > wrote:
> >
> >> I guess one of the most important results of this experiment is to have
> a
> >> good tuning guide available for users who are past the initial try-out
> >> phase because the default settings will be kind of a compromise. I
> assume
> >> that this is part of the outstanding FLIP-49 documentation task.
> >>
> >> If we limit RocksDB's memory consumption by default, then I believe that
> >> 0.4 would give the better all-round experience as it leaves a bit more
> >> memory for RocksDB. However, I'm a bit sceptical whether we should
> optimize
> >> the default settings for a configuration where the user still needs to
> >> activate the strict memory limiting for RocksDB. In this case, I would
> >> expect that the user could also adapt the managed memory fraction.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Tue, Jan 14, 2020 at 3:39 AM Xintong Song 
> >> wrote:
> >>
> >>> Thanks for the feedback, Stephan and Kurt.
> >>>
> >>> @Stephan
> >>>
> >>> Regarding managed memory fraction,
> >>> - It makes sense to keep the default value 0.4, if we assume rocksdb
> >>> memory is limited by default.
> >>> - AFAIK, currently rocksdb by default does not limit its memory usage.
> >>> And I'm positive to change it.
> >>> - Personally, I don't like the idea that the out-of-box experience
> >>> (for which we set the default fraction) relies on users
> >>> manually
> >>> turning another switch on.
> >>>
> >>> Regarding framework heap memory,
> >>> - The major reason we set it by default is, as you mentioned, that to
> >>> have a safe net of minimal JVM heap size.
> >>> - Also, considering the in progress FLIP-56 (dynamic slot allocation),
> >>> we want to reserve some heap memory that will not go into the slot
> >>> profiles. That's why we decide the default value according to the heap
> >>> memory usage of an empty task executor.
> >>>
> >>> @Kurt
> >>> Regarding metaspace,
> >>> - This config option ("taskmanager.memory.jvm-metaspace") only takes
> >>> effect on TMs. Currently we do not set metaspace size for JM.
> >>> - If we have the same metaspace problem on TMs, then yes, changing it
> >>> from 128M to 64M will make it worse. However, IMO 10T tpc-ds benchmark
> >>> should not be considered as out-of-box experience and it makes sense to
> >>> tune the configurations for it. I think the smaller metaspace size
> would be
> >>> a better choice for the first trying-out, where a job should not be too
> >>> complicated, the TM size could be relative small (e.g. 1g).
> >>>
> >>> Thank you~
> >>>
> >>> Xintong Song
> >>>
> >>>
> >>>
> >>> On Tue, Jan 14, 2020 at 9:38 AM Kurt Young  wrote:
> >>>
>  Hi Xintong,
> 
>  IIRC, during our tpc-ds 10T benchmark we suffered from the JM's
>  metaspace size and the full GCs
>  caused by lots of classloading of source input splits. Could you check
>  whether changing the default
>  value from 128MB to 64MB will make it worse?
> 
>  Correct me if I misunderstood anything, 

Re: [DISCUSS] Some feedback after trying out the new FLIP-49 memory configurations

2020-01-14 Thread Stephan Ewen
I think that problem exists anyway and is independent of the "-tm" option.

You can have a combination of `task.heap.size` and `managed.size` and
`framework.heap.size` that conflicts with `flink.size`. In that case, we
get an exception during the startup (incompatible memory configuration)?
That is the price for having these "shortcut" options (`flink.size` and
`process.size`). But it is a fair price, because the shortcuts are very
valuable to most users.

What is added with the "-tm" setting is an easy way to get the
shortcut setting added, even if it was not in the config. So where a config
worked in standalone, it can now fail in a container setup when "-tm" is
used.
But that is expected, I guess, when you start manually tune different
memory types and end up defining an inconsistent combination. And it never
conflicts with the default setup, only with manually tuned regions.

But this example shows why we need to have log output for the logic that
validates and configures the memory settings, so that users understand what
is happening.

Best,
Stephan
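
As an illustration of the inconsistent combination described above, a
flink-conf.yaml sketch (the numbers are made up): the fine-grained options
can sum to something that contradicts the flink.size shortcut, which fails
the startup check.

taskmanager.memory.task.heap.size: 512m
taskmanager.memory.managed.size: 512m
taskmanager.memory.framework.heap.size: 128m
taskmanager.memory.flink.size: 1024m   # inconsistent: the components above already exceed this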


On Tue, Jan 14, 2020 at 2:54 PM Till Rohrmann  wrote:

> Clearing the `flink.size` option and setting `process.size` could indeed be
> a solution. The thing I'm wondering is what would happen if the user has
> configured `task.heap.size` and `managed.size` instead of `flink.size`?
> Would we also ignore these settings? If not, then we risk running into the
> situation that TaskExecutorResourceUtils fails because the memory does not
> add up. I guess the thing which bugs me a bit is the special casing, which
> could easily lead to inconsistent behaviour if we don't cover all cases.
> The consequence of using slightly different concepts (`flink.size`,
> `process.size`) in standalone vs. container/Yarn/Mesos mode in order to
> keep the status quo is an increased maintenance overhead which we should be
> aware of.
>
> Cheers,
> Till
>
> On Tue, Jan 14, 2020 at 3:59 AM Xintong Song 
> wrote:
>
> > True, even if we have "process.size" rather than "flink.size" in the
> > default config file, users can still have "flink.size" in their custom
> > config file.
> > If we consider "-tm" as a shortcut for users to override the TM memory
> > size, then it makes less sense to require users to remove "flink.size"
> > from their config file whenever they want to use "-tm".
> >
> > I'm convinced that ignoring "flink.size" might not be a bad idea.
> > And I think we should also update the description of "-tm" (in
> > "FlinkYarnSessionCli") to explicitly mention that it would overwrite
> > "flink.size" and "process.size".
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Tue, Jan 14, 2020 at 2:24 AM Stephan Ewen  wrote:
> >
> > > Would be good to hear the thoughts of some more Yarn users, though.
> > >
> > > On Mon, Jan 13, 2020 at 7:23 PM Stephan Ewen  wrote:
> > >
> > > > I think we need an interpretation of "-tm" regardless of what is in
> the
> > > > default configuration, because we can always have a modified
> > > configuration
> > > > and then a user passes the "-tm" flag.
> > > >
> > > > I kind of like the first proposal: Interpret "-tm" as "override
> > > > memory size config and set the Yarn TM container size." It would mean
> > > > exactly ignoring "taskmanager.memory.flink.size" and using the "-tm"
> > > > value as "taskmanager.memory.process.size".
> > > > That does not sound too bad to me.
> > > >
> > > > Best,
> > > > Stephan
> > > >
> > > >
> > > > On Mon, Jan 13, 2020 at 5:35 PM Andrey Zagrebin <
> azagre...@apache.org>
> > > > wrote:
> > > >
> > > >> Hi all,
> > > >>
> > > >> While working on changing process memory to Flink memory in default
> > > >> configuration, Xintong encountered a problem.
> > >> When the -tm option is used to rewrite the container memory, basically
> > >> the process memory, it can collide with the default Flink memory.
> > >> For legacy users it should not be a problem, as we adjusted the legacy
> > >> heap size option to be interpreted differently for standalone and
> > >> container modes.
> > > >>
> > >> One solution could be to say in the -tm docs that we rewrite both
> > >> options under the hood, process and Flink memory, basically unsetting
> > >> Flink memory from the yaml config.
> > >> The downside is that this adds more magic.
> > > >>
> > >> Alternatively, we can keep the process memory in the default config
> > >> and, as mentioned before, increase it to maintain the user experience
> > >> by matching the previous default setting for the heap (now Flink
> > >> memory in standalone) size.
> > >> The Flink memory can be mentioned in the process memory comment as a
> > >> simpler alternative which does not require accounting for JVM overhead.
> > >> The downside is again more confusion while trying out Flink and tuning
> > >> memory at the same time.
> > >> On the other hand, if memory already needs to be tuned, it should
> > >> quite quickly lead to the necessity of understanding the memory model
> > >> in Flink.

Re: [Discuss] Tuning FLIP-49 configuration default values.

2020-01-14 Thread Andrey Zagrebin
Hi all!

Great that we have already tried out the new FLIP-49 setup with bigger jobs.

I am also +1 for the JVM metaspace and overhead changes.

Regarding 0.3 vs 0.4 for managed memory, +1 for having more managed memory
for the RocksDB limiting case.

In general, this looks mostly to be about memory distribution between the JVM
heap and managed off-heap memory.
Compared to the previous default setup, the JVM heap dropped (especially for
standalone), mostly due to moving managed memory from heap to off-heap and
also adding framework off-heap memory.
This can be the most important consequence for beginners and those who rely
on the default configuration, especially with the legacy default
configuration in standalone mode, where heap.size falls back to flink.size,
but there it seems we cannot do much now.

I prepared a spreadsheet to play with numbers for the setups mentioned in the
report.

One idea would be to set the process size (or, respectively, a smaller Flink
size) to a bigger default number, like 2048m.
In this case, the absolute derived default JVM heap and managed memory are
close to the previous defaults, especially for managed fraction 0.3.
This should align the defaults with the previous standalone try-out
experience, where the increased off-heap memory is not strictly controlled by
the environment anyway.
The consequence for container users who relied on the default configuration
is that containers will be requested with double the size.
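
As a rough sketch of how the dependent sizes are derived (simplified; for the
example, the JVM overhead is fixed at the proposed 196m minimum):

  flink.size   = process.size - metaspace - JVM overhead
  managed.size = flink.size * managed fraction
  JVM heap     = flink.size - managed.size - network - framework off-heap

  e.g. process.size = 2048m, metaspace = 64m, overhead = 196m
       => flink.size = 1788m; with fraction 0.3, managed ~ 536m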

Best,
Andrey


On Tue, Jan 14, 2020 at 11:20 AM Till Rohrmann  wrote:

> +1 for the JVM metaspace and overhead changes.
>
> On Tue, Jan 14, 2020 at 11:19 AM Till Rohrmann 
> wrote:
>
>> I guess one of the most important results of this experiment is to have a
>> good tuning guide available for users who are past the initial try-out
>> phase because the default settings will be kind of a compromise. I assume
>> that this is part of the outstanding FLIP-49 documentation task.
>>
>> If we limit RocksDB's memory consumption by default, then I believe that
>> 0.4 would give the better all-round experience as it leaves a bit more
>> memory for RocksDB. However, I'm a bit sceptical whether we should optimize
>> the default settings for a configuration where the user still needs to
>> activate the strict memory limiting for RocksDB. In this case, I would
>> expect that the user could also adapt the managed memory fraction.
>>
>> Cheers,
>> Till
>>
>> On Tue, Jan 14, 2020 at 3:39 AM Xintong Song 
>> wrote:
>>
>>> Thanks for the feedback, Stephan and Kurt.
>>>
>>> @Stephan
>>>
>>> Regarding managed memory fraction,
>>> - It makes sense to keep the default value 0.4, if we assume RocksDB
>>> memory is limited by default.
>>> - AFAIK, currently RocksDB by default does not limit its memory usage,
>>> and I'm positive about changing it.
>>> - Personally, I don't like the idea that the out-of-the-box experience
>>> (for which we set the default fraction) relies on users manually turning
>>> another switch on.
>>>
>>> Regarding framework heap memory,
>>> - The major reason we set it by default is, as you mentioned, to
>>> have a safety net of minimal JVM heap size.
>>> - Also, considering the in-progress FLIP-56 (dynamic slot allocation),
>>> we want to reserve some heap memory that will not go into the slot
>>> profiles. That's why we decided the default value according to the heap
>>> memory usage of an empty task executor.
>>>
>>> @Kurt
>>> Regarding metaspace,
>>> - This config option ("taskmanager.memory.jvm-metaspace") only takes
>>> effect on TMs. Currently we do not set metaspace size for JM.
>>> - If we have the same metaspace problem on TMs, then yes, changing it
>>> from 128M to 64M will make it worse. However, IMO a 10T TPC-DS benchmark
>>> should not be considered an out-of-the-box experience and it makes sense to
>>> tune the configurations for it. I think the smaller metaspace size would be
>>> a better choice for the first try-out, where a job should not be too
>>> complicated and the TM size could be relatively small (e.g. 1g).
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Tue, Jan 14, 2020 at 9:38 AM Kurt Young  wrote:
>>>
 Hi Xintong,

 IIRC, during our 10T TPC-DS benchmark we suffered from the JM's
 metaspace size and full GCs, which were
 caused by lots of class loading for source input splits. Could you check
 whether changing the default
 value from 128MB to 64MB will make it worse?

 Correct me if I misunderstood anything, also cc @Jingsong

 Best,
 Kurt


 On Tue, Jan 14, 2020 at 3:44 AM Stephan Ewen  wrote:

> Hi all!
>
> Thanks a lot, Xintong, for this thorough analysis. Based on your
> analysis,
> here are some thoughts:
>
> +1 to change default JVM metaspace size from 128MB to 64MB
> +1 to change default JVM overhead min size from 128MB to 196MB
>
> Concerning the managed memory 

[jira] [Created] (FLINK-15586) BucketingSink is ignoring plugins when trying to re-instantiate the HadoopFileSystem

2020-01-14 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-15586:
--

 Summary: BucketingSink is ignoring plugins when trying to 
re-instantiate the HadoopFileSystem
 Key: FLINK-15586
 URL: https://issues.apache.org/jira/browse/FLINK-15586
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Affects Versions: 1.10.0
Reporter: Piotr Nowojski
 Fix For: 1.10.0


{{BucketingSink#createHadoopFileSystem}} first loads 
{{org.apache.flink.core.fs.FileSystem}} through the plugin class loader 
(correctly), but later, in the "re-instantiate" branch, it ignores the plugin 
class loader/{{PluginManager}} classes altogether, which means it will be 
looking for non-existing classes in the parent class loader.
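
A minimal sketch of where this goes wrong (simplified; API and variable names 
are assumptions for illustration, not quoted from the actual code):

{code:java}
// First lookup: goes through Flink's FileSystem loading, which is
// plugin-aware once FileSystem.initialize(...) has been called.
FileSystem flinkFs = FileSystem.getUnguardedFileSystem(path.toUri());

// "Re-instantiate" branch: loads the Hadoop FileSystem class reflectively
// via the parent class loader, where plugin-provided classes (e.g. from a
// file system plugin jar) do not exist -> ClassNotFoundException.
Class<?> fsClass = Class.forName(fsClassName);
{code}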



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Some feedback after trying out the new FLIP-49 memory configurations

2020-01-14 Thread Till Rohrmann
Clearing the `flink.size` option and setting `process.size` could indeed be
a solution. The thing I'm wondering is what would happen if the user has
configured `task.heap.size` and `managed.size` instead of `flink.size`?
Would we also ignore these settings? If not, then we risk running into the
situation that TaskExecutorResourceUtils fails because the memory does not
add up. I guess the thing which bugs me a bit is the special casing, which
could easily lead to inconsistent behaviour if we don't cover all cases.
The consequence of using slightly different concepts (`flink.size`,
`process.size`) in standalone vs. container/Yarn/Mesos mode in order to
keep the status quo is an increased maintenance overhead which we should be
aware of.

Cheers,
Till

On Tue, Jan 14, 2020 at 3:59 AM Xintong Song  wrote:

> True, even if we have "process.size" rather than "flink.size" in the default
> config file, users can still have "flink.size" in their custom config file.
> If we consider "-tm" as a shortcut for users to override the TM memory
> size, then it makes less sense to require users to remove "flink.size" from
> their config file whenever they want to use "-tm".
>
> I'm convinced that ignoring "flink.size" might not be a bad idea.
> And I think we should also update the description of "-tm" (in
> "FlinkYarnSessionCli") to explicitly mention that it would overwrite
> "flink.size" and "process.size".
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Jan 14, 2020 at 2:24 AM Stephan Ewen  wrote:
>
> > Would be good to hear the thoughts of some more Yarn users, though.
> >
> > On Mon, Jan 13, 2020 at 7:23 PM Stephan Ewen  wrote:
> >
> > > I think we need an interpretation of "-tm" regardless of what is in the
> > > default configuration, because we can always have a modified
> > configuration
> > > and then a user passes the "-tm" flag.
> > >
> > > I kind of like the first proposal: Interpret "-tm" as "override memory
> > > size config and set the Yarn TM container size." It would mean exactly
> > > ignoring "taskmanager.memory.flink.size" and using the "-tm" value as
> > > "taskmanager.memory.process.size".
> > > That does not sound too bad to me.
> > >
> > > Best,
> > > Stephan
> > >
> > >
> > > On Mon, Jan 13, 2020 at 5:35 PM Andrey Zagrebin 
> > > wrote:
> > >
> > >> Hi all,
> > >>
> > >> While working on changing process memory to Flink memory in default
> > >> configuration, Xintong encountered a problem.
> > >> When the -tm option is used to rewrite the container memory, basically
> > >> the process memory, it can collide with the default Flink memory.
> > >> For legacy users it should not be a problem, as we adjusted the legacy
> > >> heap size option to be interpreted differently for standalone and
> > >> container modes.
> > >>
> > >> One solution could be to say in the -tm docs that we rewrite both
> > >> options under the hood, process and Flink memory, basically unsetting
> > >> Flink memory from the yaml config.
> > >> The downside is that this adds more magic.
> > >>
> > >> Alternatively, we can keep the process memory in the default config
> > >> and, as mentioned before, increase it to maintain the user experience
> > >> by matching the previous default setting for the heap (now Flink
> > >> memory in standalone) size.
> > >> The Flink memory can be mentioned in the process memory comment as a
> > >> simpler alternative which does not require accounting for JVM overhead.
> > >> The downside is again more confusion while trying out Flink and tuning
> > >> memory at the same time.
> > >> On the other hand, if memory already needs to be tuned, it should
> > >> quite quickly lead to the necessity of understanding the memory model
> > >> in Flink.
> > >>
> > >> Best,
> > >> Andrey
> > >>
> > >> On Thu, Jan 9, 2020 at 12:27 PM Stephan Ewen 
> wrote:
> > >>
> > >> > Great! Thanks, guys, for the continued effort on this topic!
> > >> >
> > >> > On Thu, Jan 9, 2020 at 5:27 AM Xintong Song 
> > >> wrote:
> > >> >
> > >> > > Thanks all for the discussion. I believe we have reached consensus
> > >> > > on all the open questions discussed in this thread.
> > >> > >
> > >> > > Since Andrey already created a Jira ticket for renaming the shuffle
> > >> > > memory config keys to "taskmanager.memory.network.*", I'll create a
> > >> > > ticket for the other topic that puts flink.size in flink-conf.yaml.
> > >> > >
> > >> > > Thank you~
> > >> > >
> > >> > > Xintong Song
> > >> > >
> > >> > >
> > >> > >
> > >> > > On Wed, Jan 8, 2020 at 9:39 PM Andrey Zagrebin <
> > azagre...@apache.org>
> > >> > > wrote:
> > >> > >
> > >> > > > It also looks to me that we should only swap network and memory
> > >> > > > in the option names: 'taskmanager.memory.network.*'.
> > >> > > > There is no strong consensus towards using the new 'shuffle'
> > >> > > > naming, so we can just rename it to 'taskmanager.memory.network.*'
> > >> > > > as the 'shuffle' naming has never been released.
> > >> > > > When we have 

[VOTE][RESULT] FLIP-90: Support SQL 2016-2017 JSON functions in Flink SQL

2020-01-14 Thread Forward Xu
 Hi all,

 The vote on FLIP-90 has passed, supported by 3 committers. Thanks everyone
for voting. Related discussions are in [1], [2] & [3]. The results of this
vote are in [4] & [5].

 Best,
 Forward

[1]
https://docs.google.com/document/d/1JfaFYIFOAY8P2pFhOYNCQ9RTzwF4l85_bnTvImOLKMk/edit#heading=h.76mb88ca6yjp
[2]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=141724550
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-JSON-functions-in-Flink-SQL-td32674.html
[4]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-90-Support-SQL-2016-2017-JSON-functions-in-Flink-SQL-tt36532.html
[5]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-90-Support-SQL-2016-2017-JSON-functions-in-Flink-SQL-tt36341.html


[jira] [Created] (FLINK-15585) Improve function identifier string in plan digest

2020-01-14 Thread Jark Wu (Jira)
Jark Wu created FLINK-15585:
---

 Summary: Improve function identifier string in plan digest
 Key: FLINK-15585
 URL: https://issues.apache.org/jira/browse/FLINK-15585
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: Jark Wu


Currently, we are using {{UserDefinedFunction#functionIdentifier}} as the 
identifier string of UDFs in the plan digest, for example: 

{code:java}
LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc1$8050927803993624f40152a838c98018($2)],
 rowType=...)
{code}

However, the result of {{UserDefinedFunction#functionIdentifier}} will change 
if we just add a method to UserDefinedFunction, because it uses Java 
serialization. Then we have to update 60 plan tests, which is very annoying. 

On the other hand, displaying the function identifier string in the operator 
name in the Web UI is verbose for users. 

In order to improve this situation, there are some things we can do:
1) If the UDF has a catalog function name, we can just use the catalog name as 
the digest. Otherwise, fall back to (2). 
2) If the UDF doesn't contain fields, we just use the full class name as the 
digest. Otherwise, fall back to (3).
3) Use the identifier string, which does the full serialization.
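
A minimal sketch of the proposed fallback chain (class and method names below 
are invented for illustration; imports omitted):

{code:java}
public final class FunctionDigests {

    // Hypothetical helper implementing steps (1)-(3) above.
    public static String digestOf(UserDefinedFunction udf, String catalogName) {
        if (catalogName != null) {
            return catalogName; // (1) prefer the catalog function name
        }
        if (udf.getClass().getDeclaredFields().length == 0) {
            return udf.getClass().getName(); // (2) no fields: class name is enough
        }
        return udf.functionIdentifier(); // (3) serialization-based identifier
    }
}
{code}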



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15584) Give nested data type of ROWs in ValidationException

2020-01-14 Thread Jira
Benoît Paris created FLINK-15584:


 Summary: Give nested data type of ROWs in ValidationException
 Key: FLINK-15584
 URL: https://issues.apache.org/jira/browse/FLINK-15584
 Project: Flink
  Issue Type: Improvement
Reporter: Benoît Paris


In a query like:

{code:java}
INSERT INTO baz_sink
SELECT
  a,
  ROW(b, c)
FROM foo_source
{code}

the ValidationException should include the nested data type of the ROW.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] FLIP-90: Support SQL 2016-2017 JSON functions in Flink SQL

2020-01-14 Thread Hequn Cheng
+1
Thanks a lot for driving this. @ForwardXu

Best,
Hequn

On Mon, Jan 13, 2020 at 10:07 AM Kurt Young  wrote:

> +1
>
> Best,
> Kurt
>
>
> On Tue, Jan 7, 2020 at 2:59 PM Jingsong Li  wrote:
>
> > +1 non-binding. Thanks Forward for driving this.
> >
> > Considering that it is made up of independent and well-defined pieces from
> > the SQL standard and Calcite, I think it can be started as soon as possible.
> >
> > Best,
> > Jingsong Lee
> >
> > On Tue, Dec 31, 2019 at 5:09 PM Forward Xu 
> wrote:
> >
> > > Hi all,
> > >
> > > I'd like to start the vote of FLIP-90 [1] since that we have reached an
> > > agreement on the design in the discussion thread [2].
> > >
> > > This vote will be open for at least 72 hours. Unless there is an
> > objection,
> > > I will try to close it by January 3, 2020 08:00 UTC if we have received
> > > sufficient votes.
> > >
> > > Best,
> > > ForwardXu
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=141724550
> > > [2]
> > >
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-JSON-functions-in-Flink-SQL-td32674.html
> > >
> >
> >
> > --
> > Best, Jingsong Lee
> >
>


Re: I want to subscribe to development-related discussion topics

2020-01-14 Thread tison
Forwarding to Mehmet.

Best,
tison.


On Tue, Jan 14, 2020 at 7:28 PM Benchao Li  wrote:

> Hi Mehmet,
>
> You can subscribe to the dev mailing list by sending an email to
> dev-subscr...@flink.apache.org, not dev@flink.apache.org.
> Hope this helps.
>
> On Tue, Jan 14, 2020 at 7:25 PM Mehmet Ozan Güven  wrote:
>
> >
> >
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


Re: I want to subscribe to development-related discussion topics

2020-01-14 Thread Benchao Li
Hi Mehmet,

You can subscribe to the dev mailing list by sending an email to
dev-subscr...@flink.apache.org, not dev@flink.apache.org.
Hope this helps.

On Tue, Jan 14, 2020 at 7:25 PM Mehmet Ozan Güven  wrote:

>
>

-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


I want to subscribe to development-related discussion topics

2020-01-14 Thread Mehmet Ozan Güven



[jira] [Created] (FLINK-15583) Scala walkthrough archetype does not compile on Java 11

2020-01-14 Thread Arvid Heise (Jira)
Arvid Heise created FLINK-15583:
---

 Summary: Scala walkthrough archetype does not compile on Java 11
 Key: FLINK-15583
 URL: https://issues.apache.org/jira/browse/FLINK-15583
 Project: Flink
  Issue Type: Bug
  Components: Quickstarts
Affects Versions: 1.10.0
Reporter: Arvid Heise


While compiling a project created with the walkthrough archetype, the following 
error occurs:
{noformat}
02:55:58.048 [ERROR] error: java.lang.NoClassDefFoundError: 
javax/tools/ToolProvider
02:55:58.048 [INFO] at 
scala.reflect.io.JavaToolsPlatformArchive.iterator(ZipArchive.scala:301){noformat}
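
For reproduction, something along these lines should trigger it on JDK 11 (the 
archetype coordinates are assumed from the quickstart documentation and may 
differ):

{noformat}
mvn archetype:generate -B \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-walkthrough-datastream-scala \
  -DarchetypeVersion=1.10.0 \
  -DgroupId=sample -DartifactId=sample -Dversion=0.1
cd sample && mvn clean package
{noformat}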



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [Discuss] Tuning FLIP-49 configuration default values.

2020-01-14 Thread Till Rohrmann
+1 for the JVM metaspace and overhead changes.

On Tue, Jan 14, 2020 at 11:19 AM Till Rohrmann  wrote:

> I guess one of the most important results of this experiment is to have a
> good tuning guide available for users who are past the initial try-out
> phase because the default settings will be kind of a compromise. I assume
> that this is part of the outstanding FLIP-49 documentation task.
>
> If we limit RocksDB's memory consumption by default, then I believe that
> 0.4 would give the better all-round experience as it leaves a bit more
> memory for RocksDB. However, I'm a bit sceptical whether we should optimize
> the default settings for a configuration where the user still needs to
> activate the strict memory limiting for RocksDB. In this case, I would
> expect that the user could also adapt the managed memory fraction.
>
> Cheers,
> Till
>
> On Tue, Jan 14, 2020 at 3:39 AM Xintong Song 
> wrote:
>
>> Thanks for the feedback, Stephan and Kurt.
>>
>> @Stephan
>>
>> Regarding managed memory fraction,
>> - It makes sense to keep the default value 0.4, if we assume RocksDB
>> memory is limited by default.
>> - AFAIK, currently RocksDB by default does not limit its memory usage,
>> and I'm positive about changing it.
>> - Personally, I don't like the idea that the out-of-the-box experience
>> (for which we set the default fraction) relies on users manually turning
>> another switch on.
>>
>> Regarding framework heap memory,
>> - The major reason we set it by default is, as you mentioned, to
>> have a safety net of minimal JVM heap size.
>> - Also, considering the in-progress FLIP-56 (dynamic slot allocation), we
>> want to reserve some heap memory that will not go into the slot profiles.
>> That's why we decided the default value according to the heap memory usage
>> of an empty task executor.
>>
>> @Kurt
>> Regarding metaspace,
>> - This config option ("taskmanager.memory.jvm-metaspace") only takes
>> effect on TMs. Currently we do not set metaspace size for JM.
>> - If we have the same metaspace problem on TMs, then yes, changing it
>> from 128M to 64M will make it worse. However, IMO a 10T TPC-DS benchmark
>> should not be considered an out-of-the-box experience and it makes sense to
>> tune the configurations for it. I think the smaller metaspace size would be
>> a better choice for the first try-out, where a job should not be too
>> complicated and the TM size could be relatively small (e.g. 1g).
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Tue, Jan 14, 2020 at 9:38 AM Kurt Young  wrote:
>>
>>> Hi Xintong,
>>>
>>> IIRC, during our 10T TPC-DS benchmark we suffered from the JM's metaspace
>>> size and full GCs, which were
>>> caused by lots of class loading for source input splits. Could you check
>>> whether changing the default
>>> value from 128MB to 64MB will make it worse?
>>>
>>> Correct me if I misunderstood anything, also cc @Jingsong
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Tue, Jan 14, 2020 at 3:44 AM Stephan Ewen  wrote:
>>>
 Hi all!

 Thanks a lot, Xintong, for this thorough analysis. Based on your
 analysis,
 here are some thoughts:

 +1 to change default JVM metaspace size from 128MB to 64MB
 +1 to change default JVM overhead min size from 128MB to 196MB

 Concerning the managed memory fraction, I am not sure I would change it,
 for the following reasons:

   - We should assume RocksDB will be limited to managed memory by
 default.
 This will either be active by default or we would encourage everyone to
 use
 this by default, because otherwise it is super hard to reason about the
 RocksDB footprint.
   - For standalone, a managed memory fraction of 0.3 is less than half
 of
 the managed memory from 1.9.
   - I am not sure if the managed memory fraction is a value that all
 users
 adjust immediately when scaling up the memory during their first try-out
 phase. I would assume that most users initially only adjust
 "memory.flink.size" or "memory.process.size". A value of 0.3 will lead
 to
 having too large heaps and very little RocksDB / batch memory even when
 scaling up during the initial exploration.
   - I agree, though, that 0.5 looks too aggressive, from your
 benchmarks.
 So maybe keeping it at 0.4 could work?

 And one question: Why do we set the Framework Heap by default? Is that
 so we reduce the managed memory further if less than the framework heap
 would be left from the JVM heap?

 Best,
 Stephan

 On Thu, Jan 9, 2020 at 10:54 AM Xintong Song 
 wrote:

 > Hi all,
 >
 > As described in FLINK-15145 [1], we decided to tune the default
 > configuration values of FLIP-49 with more jobs and cases.
 >
 > After spending time analyzing and tuning the configurations, I've come
 > up with several findings. To be brief, I would suggest the following
 > changes,
 > and for more details please take a look at my tuning report [2].

Re: [Discuss] Tuning FLIP-49 configuration default values.

2020-01-14 Thread Till Rohrmann
I guess one of the most important results of this experiment is to have a
good tuning guide available for users who are past the initial try-out
phase because the default settings will be kind of a compromise. I assume
that this is part of the outstanding FLIP-49 documentation task.

If we limit RocksDB's memory consumption by default, then I believe that
0.4 would give the better all-round experience as it leaves a bit more
memory for RocksDB. However, I'm a bit sceptical whether we should optimize
the default settings for a configuration where the user still needs to
activate the strict memory limiting for RocksDB. In this case, I would
expect that the user could also adapt the managed memory fraction.
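
For context, that combination would roughly look like this in flink-conf.yaml
(the RocksDB option name is quoted from memory and may differ):

  state.backend.rocksdb.memory.managed: true
  taskmanager.memory.managed.fraction: 0.4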

Cheers,
Till

On Tue, Jan 14, 2020 at 3:39 AM Xintong Song  wrote:

> Thanks for the feedback, Stephan and Kurt.
>
> @Stephan
>
> Regarding managed memory fraction,
> - It makes sense to keep the default value 0.4, if we assume RocksDB
> memory is limited by default.
> - AFAIK, currently RocksDB by default does not limit its memory usage, and
> I'm positive about changing it.
> - Personally, I don't like the idea that the out-of-the-box experience (for
> which we set the default fraction) relies on users manually turning
> another switch on.
>
> Regarding framework heap memory,
> - The major reason we set it by default is, as you mentioned, to have
> a safety net of minimal JVM heap size.
> - Also, considering the in-progress FLIP-56 (dynamic slot allocation), we
> want to reserve some heap memory that will not go into the slot profiles.
> That's why we decided the default value according to the heap memory usage
> of an empty task executor.
>
> @Kurt
> Regarding metaspace,
> - This config option ("taskmanager.memory.jvm-metaspace") only takes
> effect on TMs. Currently we do not set metaspace size for JM.
> - If we have the same metaspace problem on TMs, then yes, changing it from
> 128M to 64M will make it worse. However, IMO a 10T TPC-DS benchmark should
> not be considered an out-of-the-box experience and it makes sense to tune the
> configurations for it. I think the smaller metaspace size would be a better
> choice for the first try-out, where a job should not be too complicated and
> the TM size could be relatively small (e.g. 1g).
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Jan 14, 2020 at 9:38 AM Kurt Young  wrote:
>
>> Hi Xintong,
>>
>> IIRC, during our 10T TPC-DS benchmark we suffered from the JM's metaspace
>> size and full GCs, which were
>> caused by lots of class loading for source input splits. Could you check
>> whether changing the default
>> value from 128MB to 64MB will make it worse?
>>
>> Correct me if I misunderstood anything, also cc @Jingsong
>>
>> Best,
>> Kurt
>>
>>
>> On Tue, Jan 14, 2020 at 3:44 AM Stephan Ewen  wrote:
>>
>>> Hi all!
>>>
>>> Thanks a lot, Xintong, for this thorough analysis. Based on your
>>> analysis,
>>> here are some thoughts:
>>>
>>> +1 to change default JVM metaspace size from 128MB to 64MB
>>> +1 to change default JVM overhead min size from 128MB to 196MB
>>>
>>> Concerning the managed memory fraction, I am not sure I would change it,
>>> for the following reasons:
>>>
>>>   - We should assume RocksDB will be limited to managed memory by
>>> default.
>>> This will either be active by default or we would encourage everyone to
>>> use
>>> this by default, because otherwise it is super hard to reason about the
>>> RocksDB footprint.
>>>   - For standalone, a managed memory fraction of 0.3 is less than half of
>>> the managed memory from 1.9.
>>>   - I am not sure if the managed memory fraction is a value that all
>>> users
>>> adjust immediately when scaling up the memory during their first try-out
>>> phase. I would assume that most users initially only adjust
>>> "memory.flink.size" or "memory.process.size". A value of 0.3 will lead to
>>> having too large heaps and very little RocksDB / batch memory even when
>>> scaling up during the initial exploration.
>>>   - I agree, though, that 0.5 looks too aggressive, from your benchmarks.
>>> So maybe keeping it at 0.4 could work?
>>>
>>> And one question: Why do we set the Framework Heap by default? Is that so
>>> we reduce the managed memory further if less than the framework heap would
>>> be left from the JVM heap?
>>>
>>> Best,
>>> Stephan
>>>
>>> On Thu, Jan 9, 2020 at 10:54 AM Xintong Song 
>>> wrote:
>>>
>>> > Hi all,
>>> >
>>> > As described in FLINK-15145 [1], we decided to tune the default
>>> > configuration values of FLIP-49 with more jobs and cases.
>>> >
>>> > After spending time analyzing and tuning the configurations, I've come
>>> > up with several findings. To be brief, I would suggest the following
>>> > changes,
>>> > and for more details please take a look at my tuning report [2].
>>> >
>>> >- Change default managed memory fraction from 0.4 to 0.3.
>>> >- Change default JVM metaspace size from 128MB to 64MB.
>>> >- Change default JVM overhead min size from 128MB to 196MB.
>>> >
>>> > Looking 

[jira] [Created] (FLINK-15582) Enable batch scheduling tests in LegacySchedulerBatchSchedulingTest for DefaultScheduler as well

2020-01-14 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-15582:
---

 Summary: Enable batch scheduling tests in 
LegacySchedulerBatchSchedulingTest for DefaultScheduler as well
 Key: FLINK-15582
 URL: https://issues.apache.org/jira/browse/FLINK-15582
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.10.0
Reporter: Zhu Zhu
 Fix For: 1.10.0


{{testSchedulingOfJobWithFewerSlotsThanParallelism}} is a common case but it is 
only tested with the legacy scheduler in {{LegacySchedulerBatchSchedulingTest}} 
at the moment.
We should enable it for the DefaultScheduler as well. 
This also allows us to safely remove {{LegacySchedulerBatchSchedulingTest}} 
when removing the LegacyScheduler and related components without losing 
test coverage.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)