Re: Flink cannot recognize catalog set by registerCatalog.

2019-08-12 Thread Simon Su
Hi Jark


Thanks for your reply. 


It’s weird that in this case the tableEnv provides an API called 
“registerCatalog”, but it does not work in some cases (like mine).
Do you think it’s feasible to unify this behavior? I think the documentation is 
necessary, but a unified way to use tableEnv is also important.


Thanks,
Simon


On 08/13/2019 12:27, Jark Wu wrote:
I think we might need to improve the javadoc of 
tableEnv.registerTableSource/registerTableSink. 
Currently, the comment says 


"Registers an external TableSink with already configured field names and field 
types in this TableEnvironment's catalog."


But, what catalog? The current one or the default in-memory one? 
I think it would be better to improve the description and add a NOTE on it. 


Regards,
Jark


On Tue, 13 Aug 2019 at 10:52, Xuefu Z  wrote:

Yes, tableEnv.registerTable(_) etc. always registers in the default catalog.
To create a table in your custom catalog, you could use
tableEnv.sqlUpdate("create table ...").

Thanks,
Xuefu

On Mon, Aug 12, 2019 at 6:17 PM Simon Su  wrote:

> Hi Xuefu
>
> Thanks for you reply.
>
> Actually I have tried it as your advises. I have tried to call
> tableEnv.useCatalog and useDatabase. Also I have tried to use
> “catalogname.databasename.tableName”  in SQL. I think the root cause is
> that when I call tableEnv.registerTableSource, it’s always use a “build-in”
> Catalog and Database rather than the custom one. So if I want to use a
> custom one, I have to write code like this:
>
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
> EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .withBuiltInCatalogName("ca1")
> .withBuiltInDatabaseName("db1")
> .build());
>
>
> As Dawid said, if I want to store in my custom catalog, I can call
> catalog.createTable or using DDL.
>
> Thanks,
> SImon
>
> On 08/13/2019 02:55,Xuefu Z  wrote:
>
> Hi Simon,
>
> Thanks for reporting the problem. There is some rough edges around catalog
> API and table environments, and we are improving post 1.9 release.
>
> Nevertheless, tableEnv.registerCatalog() is just to put a new catalog in
> Flink's CatalogManager, It doens't change the default catalog/database as
> you expected. To switch to your newly registered catalog, you could call
> tableEnv.useCatalog() and .useDatabase().
>
> As an alternative, you could fully qualify your table name with a
> "catalog.db.table" syntax without switching current catalog/database.
>
> Please try those and let me know if you find new problems.
>
> Thanks,
> Xuefu
>
>
>
> On Mon, Aug 12, 2019 at 12:38 AM Simon Su  wrote:
>
>> Hi All
>> I want to use a custom catalog by setting the name “ca1” and create a
>> database under this catalog. When I submit the
>> SQL, and it raises the error like :
>>
>>
>> Exception in thread "main"
>> org.apache.flink.table.api.ValidationException: SQL validation failed. From
>> line 1, column 98 to line 1, column 116: Object 'orderstream' not found
>> within 'ca1.db1'
>> at
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:125)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:82)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:154)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:89)
>> at
>> org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:130)
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)
>> at sqlrunner.RowTimeTest.memoryCatalog(RowTimeTest.java:126)
>> at sqlrunner.RowTimeTest.main(RowTimeTest.java:137)
>> Caused by: org.apache.calcite.runtime.CalciteContextException: From line
>> 1, column 98 to line 1, column 116: Object 'orderstream' not found within
>> 'ca1.db1'
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> at
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>> at
>> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
>> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4805)
>> at
>> org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:166)
>> at
>> org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
>> at
>> 

Re: Flink cannot recognize catalog set by registerCatalog.

2019-08-12 Thread Jark Wu
I think we might need to improve the javadoc of
tableEnv.registerTableSource/registerTableSink.
Currently, the comment says

"Registers an external TableSink with already configured field names and
field types in this TableEnvironment's catalog."

But, what catalog? The current one or the default in-memory one?
I think it would be better to improve the description and add a NOTE on
it.

Regards,
Jark

On Tue, 13 Aug 2019 at 10:52, Xuefu Z  wrote:

> Yes, tableEnv.registerTable(_) etc always registers in the default catalog.
> To create table in your custom catalog, you could use
> tableEnv.sqlUpdate("create table ").
>
> Thanks,
> Xuefu
>
> On Mon, Aug 12, 2019 at 6:17 PM Simon Su  wrote:
>
> > Hi Xuefu
> >
> > Thanks for you reply.
> >
> > Actually I have tried it as your advises. I have tried to call
> > tableEnv.useCatalog and useDatabase. Also I have tried to use
> > “catalogname.databasename.tableName”  in SQL. I think the root cause is
> > that when I call tableEnv.registerTableSource, it’s always use a
> “build-in”
> > Catalog and Database rather than the custom one. So if I want to use a
> > custom one, I have to write code like this:
> >
> > StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
> > EnvironmentSettings.newInstance()
> > .useBlinkPlanner()
> > .inStreamingMode()
> > .withBuiltInCatalogName("ca1")
> > .withBuiltInDatabaseName("db1")
> > .build());
> >
> >
> > As Dawid said, if I want to store in my custom catalog, I can call
> > catalog.createTable or using DDL.
> >
> > Thanks,
> > SImon
> >
> > On 08/13/2019 02:55,Xuefu Z 
> wrote:
> >
> > Hi Simon,
> >
> > Thanks for reporting the problem. There is some rough edges around
> catalog
> > API and table environments, and we are improving post 1.9 release.
> >
> > Nevertheless, tableEnv.registerCatalog() is just to put a new catalog in
> > Flink's CatalogManager, It doens't change the default catalog/database as
> > you expected. To switch to your newly registered catalog, you could call
> > tableEnv.useCatalog() and .useDatabase().
> >
> > As an alternative, you could fully qualify your table name with a
> > "catalog.db.table" syntax without switching current catalog/database.
> >
> > Please try those and let me know if you find new problems.
> >
> > Thanks,
> > Xuefu
> >
> >
> >
> > On Mon, Aug 12, 2019 at 12:38 AM Simon Su  wrote:
> >
> >> Hi All
> >> I want to use a custom catalog by setting the name “ca1” and create
> a
> >> database under this catalog. When I submit the
> >> SQL, and it raises the error like :
> >>
> >>
> >> Exception in thread "main"
> >> org.apache.flink.table.api.ValidationException: SQL validation failed.
> From
> >> line 1, column 98 to line 1, column 116: Object 'orderstream' not found
> >> within 'ca1.db1'
> >> at
> >>
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:125)
> >> at
> >>
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:82)
> >> at
> >>
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:154)
> >> at
> >>
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:89)
> >> at
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:130)
> >> at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)
> >> at sqlrunner.RowTimeTest.memoryCatalog(RowTimeTest.java:126)
> >> at sqlrunner.RowTimeTest.main(RowTimeTest.java:137)
> >> Caused by: org.apache.calcite.runtime.CalciteContextException: From line
> >> 1, column 98 to line 1, column 116: Object 'orderstream' not found
> within
> >> 'ca1.db1'
> >> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> >> at
> >>
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> >> at
> >>
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> >> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> >> at
> >>
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
> >> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
> >> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
> >> at
> >>
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4805)
> >> at
> >>
> org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:166)
> >> at
> >>
> org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
> >> at
> >>
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> >> at
> >>
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
> >> at
> >>
> 

Re: Flink cannot recognize catalog set by registerCatalog.

2019-08-12 Thread Xuefu Z
Yes, tableEnv.registerTable(_) etc. always registers in the default catalog.
To create a table in your custom catalog, you could use
tableEnv.sqlUpdate("create table ...").

Thanks,
Xuefu

On Mon, Aug 12, 2019 at 6:17 PM Simon Su  wrote:

> Hi Xuefu
>
> Thanks for you reply.
>
> Actually I have tried it as your advises. I have tried to call
> tableEnv.useCatalog and useDatabase. Also I have tried to use
> “catalogname.databasename.tableName”  in SQL. I think the root cause is
> that when I call tableEnv.registerTableSource, it’s always use a “build-in”
> Catalog and Database rather than the custom one. So if I want to use a
> custom one, I have to write code like this:
>
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
> EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .withBuiltInCatalogName("ca1")
> .withBuiltInDatabaseName("db1")
> .build());
>
>
> As Dawid said, if I want to store in my custom catalog, I can call
> catalog.createTable or using DDL.
>
> Thanks,
> SImon
>
> On 08/13/2019 02:55,Xuefu Z  wrote:
>
> Hi Simon,
>
> Thanks for reporting the problem. There is some rough edges around catalog
> API and table environments, and we are improving post 1.9 release.
>
> Nevertheless, tableEnv.registerCatalog() is just to put a new catalog in
> Flink's CatalogManager, It doens't change the default catalog/database as
> you expected. To switch to your newly registered catalog, you could call
> tableEnv.useCatalog() and .useDatabase().
>
> As an alternative, you could fully qualify your table name with a
> "catalog.db.table" syntax without switching current catalog/database.
>
> Please try those and let me know if you find new problems.
>
> Thanks,
> Xuefu
>
>
>
> On Mon, Aug 12, 2019 at 12:38 AM Simon Su  wrote:
>
>> Hi All
>> I want to use a custom catalog by setting the name “ca1” and create a
>> database under this catalog. When I submit the
>> SQL, and it raises the error like :
>>
>>
>> Exception in thread "main"
>> org.apache.flink.table.api.ValidationException: SQL validation failed. From
>> line 1, column 98 to line 1, column 116: Object 'orderstream' not found
>> within 'ca1.db1'
>> at
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:125)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:82)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:154)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:89)
>> at
>> org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:130)
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)
>> at sqlrunner.RowTimeTest.memoryCatalog(RowTimeTest.java:126)
>> at sqlrunner.RowTimeTest.main(RowTimeTest.java:137)
>> Caused by: org.apache.calcite.runtime.CalciteContextException: From line
>> 1, column 98 to line 1, column 116: Object 'orderstream' not found within
>> 'ca1.db1'
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> at
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>> at
>> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
>> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4805)
>> at
>> org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:166)
>> at
>> org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
>> at
>> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3109)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3091)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3363)
>> at
>> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>> at
>> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
>> at
>> 

Re: Flink cannot recognize catalog set by registerCatalog.

2019-08-12 Thread Simon Su
Hi Xuefu


Thanks for your reply. 


Actually I have tried it as you advised. I have tried to call 
tableEnv.useCatalog and useDatabase. I have also tried to use 
“catalogname.databasename.tableName” in SQL. I think the root cause is that 
when I call tableEnv.registerTableSource, it always uses a “built-in”
Catalog and Database rather than the custom one. So if I want to use a custom 
one, I have to write code like this:


StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.withBuiltInCatalogName("ca1")
.withBuiltInDatabaseName("db1")
.build());


As Dawid said, if I want to store tables in my custom catalog, I can call 
catalog.createTable or use DDL.


Thanks,
Simon


On 08/13/2019 02:55, Xuefu Z wrote:
Hi Simon,


Thanks for reporting the problem. There are some rough edges around the catalog API 
and table environments, and we are improving them after the 1.9 release.


Nevertheless, tableEnv.registerCatalog() is just to put a new catalog in 
Flink's CatalogManager; it doesn't change the default catalog/database as you 
expected. To switch to your newly registered catalog, you could call 
tableEnv.useCatalog() and .useDatabase().


As an alternative, you could fully qualify your table name with a 
"catalog.db.table" syntax without switching current catalog/database.


Please try those and let me know if you find new problems.


Thanks,
Xuefu







On Mon, Aug 12, 2019 at 12:38 AM Simon Su  wrote:

Hi All
I want to use a custom catalog by setting the name “ca1” and create a 
database under this catalog. When I submit the
SQL, and it raises the error like :


Exception in thread "main" org.apache.flink.table.api.ValidationException: 
SQL validation failed. From line 1, column 98 to line 1, column 116: Object 
'orderstream' not found within 'ca1.db1'
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:125)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:82)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:154)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:89)
at 
org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:130)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)
at sqlrunner.RowTimeTest.memoryCatalog(RowTimeTest.java:126)
at sqlrunner.RowTimeTest.main(RowTimeTest.java:137)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, 
column 98 to line 1, column 116: Object 'orderstream' not found within 'ca1.db1'
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4805)
at 
org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:166)
at 
org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3109)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3091)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3363)
at 
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:930)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:637)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:122)
... 7 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: 

Re: JDBC sink for streams API

2019-08-12 Thread Zili Chen
Hi Eduardo,

JDBCSinkFunction is a package-private class which you can make use of
through JDBCAppendTableSink. A typical statement could be:

JDBCAppendTableSink.builder()
   . ...   // set driver name, DB URL, insert query, parameter types, etc.
   .build()
   .consumeDataStream(upstream);

JDBCUpsertTableSink and JDBCTableSourceSinkFactory are also worth a
look.

The successor of OutputFormatSinkFunction is StreamingFileSink, but it
seems outside your requirements.
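
For example, a minimal sketch (this assumes Flink 1.9's flink-jdbc module and a DataStream<Row> named upstream; the driver, URL, query and batch size are placeholders, not a recommendation):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;

JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
        .setDrivername("org.postgresql.Driver")                      // placeholder driver
        .setDBUrl("jdbc:postgresql://localhost:5432/mydb")           // placeholder URL
        .setQuery("INSERT INTO events (id, payload) VALUES (?, ?)")  // placeholder statement
        .setParameterTypes(Types.INT, Types.STRING)
        .setBatchSize(100)
        .build();

// consumeDataStream() wires the package-private JDBCSinkFunction for you,
// so there is no need to copy its code.
sink.consumeDataStream(upstream);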

Best,
tison.


Eduardo Winpenny Tejedor  wrote on Tue, Aug 13, 2019 at 7:59 AM:

> Hi all,
>
> Could someone point me to the current advised way of adding a JDBC sink?
>
> Online I've seen one can DataStream.writeUsingOutputFormat() however I see
> the OutputFormatSinkFunction is deprecated.
>
> I can also see a JDBCSinkFunction (or JDBCUpsertSinkFunction) but that is
> "package private" so I can't use it myself. JDBCAppendSinkTable makes use
> of it but that doesn't work with the Streams API...I tried simply copying
> its code into a class of my own alas the flush method in JDBCOutputFormat
> is also "package private"...
>
> JDBC does not receive a lot of attention in Fabian's and Vasili's Stream
> Processing with Apache Flink or in the online documentation - is there a
> reason for this? Is it bad practice?
>
>
> Regards,
> Eduardo
>


JDBC sink for streams API

2019-08-12 Thread Eduardo Winpenny Tejedor
Hi all,

Could someone point me to the current advised way of adding a JDBC sink?

Online I've seen that one can use DataStream.writeUsingOutputFormat(); however, I see
that OutputFormatSinkFunction is deprecated.

I can also see a JDBCSinkFunction (or JDBCUpsertSinkFunction), but that is
"package private" so I can't use it myself. JDBCAppendTableSink makes use
of it, but that doesn't work with the Streams API... I tried simply copying
its code into a class of my own, but alas the flush method in JDBCOutputFormat
is also "package private"...

JDBC does not receive a lot of attention in Fabian's and Vasiliki's Stream
Processing with Apache Flink or in the online documentation - is there a
reason for this? Is it bad practice?


Regards,
Eduardo


Re: How can I pass multiple java options in standalone mode ?

2019-08-12 Thread Zili Chen
Hi Vishwas,

Replacing ',' with ' ' (space) should work.
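
For example, the entry from your mail would become something like the following (a sketch only; the {{ flink_installed_dir }} template is kept from your original config):

env.java.opts: "-Djava.security.auth.login.config={{ flink_installed_dir }}/kafka-jaas.conf -Djava.security.krb5.conf={{ flink_installed_dir }}/krb5.conf"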

Best,
tison.


Vishwas Siravara  wrote on Tue, Aug 13, 2019 at 6:50 AM:

> Hi guys,
> I have this entry in flink-conf.yaml file for jvm options.
> env.java.opts: "-Djava.security.auth.login.config={{ flink_installed_dir
> }}/kafka-jaas.conf,-Djava.security.krb5.conf={{ flink_installed_dir
> }}/krb5.conf"
>
> Is this supposed to be a , separated list ? I get a parse exception when
> the cluster starts.
>
> Thanks,
> Vishwas
>


How can I pass multiple java options in standalone mode ?

2019-08-12 Thread Vishwas Siravara
Hi guys,
I have this entry in flink-conf.yaml file for jvm options.
env.java.opts: "-Djava.security.auth.login.config={{ flink_installed_dir
}}/kafka-jaas.conf,-Djava.security.krb5.conf={{ flink_installed_dir
}}/krb5.conf"

Is this supposed to be a comma-separated list? I get a parse exception when
the cluster starts.

Thanks,
Vishwas


Re: Status of the Integration of Flink with Hive

2019-08-12 Thread David Morin
Thanks a lot, Bowen.
I've started reading these docs. Really helpful. They give a good description of
the Hive integration in Flink and how to use it.
I'll continue my development.
See you soon.


On Mon, Aug 12, 2019 at 8:55 PM, Bowen Li  wrote:

> Hi David,
>
> Check out Hive related documentations:
>
> -
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/catalog.html
> -
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive_integration.html
> -
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive_integration_example.html
>
> Note
> - I just merged a PR restructuring Hive related docs today, changes should
> reflect on the website in a day or so
> - I didn't find release-1.9-snapshot's doc, so just reference
> release-1.10-snapshot's doc for now. 1.9 rc2 has been released, official
> 1.9 should be out soon
> - Hive features are in beta in 1.9
>
> Feel free to open tickets if you have feature requests.
>
>
> On Fri, Aug 9, 2019 at 8:00 AM David Morin 
> wrote:
>
>> Hi,
>>
>> I want to connect my Flink streaming job to Hive.
>> At the moment, what is the best way to connect to Hive.
>> Some features seems to be in development.
>> Some really cool features have been described here:
>> https://fr.slideshare.net/BowenLi9/integrating-flink-with-hive-xuefu-zhang-and-bowen-li-seattle-flink-meetup-feb-2019
>> My first need is to read and update Hive metadata.
>> Concerning the Hive data I can store them directly in HDFS (as Orc
>> format) in a first step.
>> thx.
>>
>> David
>>
>>


[ANNOUNCE] Seattle Flink Meetup at Uber on 8/22

2019-08-12 Thread Bowen Li
Hi all!

Join our next Seattle Flink Meetup at Uber Seattle, featuring talks of
[Flink + Kappa+ @ Uber] and [Flink + Pulsar for streaming-first, unified
data processing].

- TALK #1: Moving from Lambda and Kappa Architectures to Kappa+ with Flink
at Uber
- TALK #2: When Apache Pulsar meets Apache Flink

Check out event details and RSVP at
https://www.meetup.com/seattle-flink/events/263782233/ . See you soon!

Bowen


Re: Flink cannot recognize catalog set by registerCatalog.

2019-08-12 Thread Xuefu Z
Hi Simon,

Thanks for reporting the problem. There are some rough edges around the catalog
API and table environments, and we are improving them after the 1.9 release.

Nevertheless, tableEnv.registerCatalog() is just to put a new catalog in
Flink's CatalogManager; it doesn't change the default catalog/database as
you expected. To switch to your newly registered catalog, you could call
tableEnv.useCatalog() and .useDatabase().

As an alternative, you could fully qualify your table name with a
"catalog.db.table" syntax without switching current catalog/database.

Please try those and let me know if you find new problems.

Thanks,
Xuefu



On Mon, Aug 12, 2019 at 12:38 AM Simon Su  wrote:

> Hi All
> I want to use a custom catalog by setting the name “ca1” and create a
> database under this catalog. When I submit the
> SQL, and it raises the error like :
>
>
> Exception in thread "main"
> org.apache.flink.table.api.ValidationException: SQL validation failed. From
> line 1, column 98 to line 1, column 116: Object 'orderstream' not found
> within 'ca1.db1'
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:125)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:82)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:154)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:89)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:130)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)
> at sqlrunner.RowTimeTest.memoryCatalog(RowTimeTest.java:126)
> at sqlrunner.RowTimeTest.main(RowTimeTest.java:137)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line
> 1, column 98 to line 1, column 116: Object 'orderstream' not found within
> 'ca1.db1'
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4805)
> at
> org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:166)
> at
> org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
> at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3109)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3091)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3363)
> at
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
> at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
> at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:930)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:637)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:122)
> ... 7 more
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Object
> 'orderstream' not found within 'ca1.db1'
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
> at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
> ... 26 more
>
> It seems that Calcite cannot find the source object as expected, After I
> debug the code I found that when using 

Re: Status of the Integration of Flink with Hive

2019-08-12 Thread Bowen Li
Hi David,

Check out Hive related documentations:

-
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/catalog.html
-
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive_integration.html
-
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive_integration_example.html

Note
- I just merged a PR restructuring the Hive-related docs today; the changes should
be reflected on the website in a day or so
- I didn't find release-1.9-snapshot's doc, so just reference
release-1.10-snapshot's doc for now. 1.9 rc2 has been released, official
1.9 should be out soon
- Hive features are in beta in 1.9

Feel free to open tickets if you have feature requests.


On Fri, Aug 9, 2019 at 8:00 AM David Morin 
wrote:

> Hi,
>
> I want to connect my Flink streaming job to Hive.
> At the moment, what is the best way to connect to Hive.
> Some features seems to be in development.
> Some really cool features have been described here:
> https://fr.slideshare.net/BowenLi9/integrating-flink-with-hive-xuefu-zhang-and-bowen-li-seattle-flink-meetup-feb-2019
> My first need is to read and update Hive metadata.
> Concerning the Hive data I can store them directly in HDFS (as Orc format)
> in a first step.
> thx.
>
> David
>
>


Custom Windows/Triggers/Evictors examples or tutorials

2019-08-12 Thread Yoandy Rodríguez
Hello Again,

I'm looking for some examples of how to implement custom
windows/triggers/evictors using Flink 1.6.4; a simple search gives me
surprisingly little on the subject. Can you point me to some repos,
talks, or tutorials?

-- 
Best Regards
Yoandy Rodríguez




Re: Kafka ProducerFencedException after checkpointing

2019-08-12 Thread Matthias J. Sax

`transaction.timeout.ms` is a producer setting, thus you can increase
it accordingly.

Note that brokers bound the range via `transaction.max.timeout.ms`;
thus, you may need to increase this broker config, too.
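
As an illustration, a minimal sketch of raising the producer-side timeout when creating the Flink Kafka producer (the topic name, schema, broker address and the 1-hour value are just examples; the broker's `transaction.max.timeout.ms` must be at least as large):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "broker:9092");   // example broker address
producerProps.setProperty("transaction.timeout.ms", "3600000");  // 1 hour, producer side

FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
        "output-topic",                                           // example topic
        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
        producerProps,
        FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);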


-Matthias

On 8/12/19 2:43 AM, Piotr Nowojski wrote:
> Hi,
> 
> Ok, I see. You can try to rewrite your logic (or maybe records
> schema by adding some ID fields) to manually deduplicating the
> records after processing them with at least once semantic. Such
> setup is usually simpler, with slightly better throughput and
> significantly better latency (end-to-end exactly once latency is
> limited by checkpointing time).
> 
> Piotrek
> 
>> On 12 Aug 2019, at 11:12, Tony Wei > > wrote:
>> 
>> Hi Piotr,
>> 
>> Thanks a lot. I need exactly once in my use case, but instead of 
>> having the risk of losing data, at least once is more acceptable
>> when error occurred.
>> 
>> Best, Tony Wei
>> 
>> Piotr Nowojski > > 於 2019年8月12日 週一 下午3:27寫道:
>> 
>> Hi,
>> 
>> Yes, if it’s due to transaction timeout you will lose the data.
>> 
>> Whether can you fallback to at least once, that depends on
>> Kafka, not on Flink, since it’s the Kafka that timeouts those 
>> transactions and I don’t see in the documentation anything that 
>> could override this [1]. You might try disabling the mechanism
>> via setting 
>> `transaction.abort.timed.out.transaction.cleanup.interval.ms 
>> 
`
>>
>> 
or `transaction.remove.expired.transaction.cleanup.interval.ms
>> `
,
>>
>> 
but that’s question more to Kafka guys. Maybe Becket could help
>> with this.
>> 
>> Also it MIGHT be that Kafka doesn’t remove records from the
>> topics when aborting the transaction and MAYBE you can still
>> access them via “READ_UNCOMMITTED” mode. But that’s again,
>> question to Kafka.
>> 
>> Sorry that I can not help more.
>> 
>> If you do not care about exactly once, why don’t you just set
>> the connector to at least once mode?
>> 
>> Piotrek
>> 
>>> On 12 Aug 2019, at 06:29, Tony Wei >> > wrote:
>>> 
>>> Hi,
>>> 
>>> I had the same exception recently. I want to confirm that if
>>> it is due to transaction timeout, then I will lose those data.
>>> Am I right? Can I make it fall back to at least once semantic
>>> in this situation?
>>> 
>>> Best, Tony Wei
>>> 
>>> Piotr Nowojski >> > 於 2018年3月21日 週三 下午10:28 寫道:
>>> 
>>> Hi,
>>> 
>>> But that’s exactly the case: producer’s transaction timeout 
>>> starts when the external transaction starts - but 
>>> FlinkKafkaProducer011 keeps an active Kafka transaction for the
>>> whole period between checkpoints.
>>> 
>>> As I wrote in the previous message:
>>> 
 in case of failure, your timeout must also be able to cover
>>> the additional downtime required for the successful job 
>>> restart. Thus you should increase your timeout accordingly.
>>> 
>>> I think that 15 minutes timeout is a way too small value. If 
>>> your job fails because of some intermittent failure (for 
>>> example worker crash/restart), you will only have a couple of 
>>> minutes for a successful Flink job restart. Otherwise you will
>>> lose some data (because of the transaction timeouts).
>>> 
>>> Piotrek
>>> 
 On 21 Mar 2018, at 10:30, Dongwon Kim >>> > wrote:
 
 Hi Piotr,
 
 Now my streaming pipeline is working without retries. I
 decreased Flink's checkpoint interval from 15min to 10min as
 you suggested [see screenshot_10min_ckpt.png].
 
 I though that producer's transaction timeout starts when the 
 external transaction starts. The truth is that Producer's
 transaction timeout starts after the last external checkpoint
 is committed. Now that I have 15min for Producer's
 transaction timeout and 10min for Flink's checkpoint
 interval, and every checkpoint takes less than 5 minutes,
 everything is working fine. Am I right?
 
 Anyway thank you very much for the detailed explanation!
 
 Best,
 
 Dongwon
 
 
 
 On Tue, Mar 20, 2018 at 8:10 PM, Piotr Nowojski 
 mailto:pi...@data-artisans.com>> 
 wrote:
 
 Hi,
 
 Please increase transaction.timeout.ms 
  to a greater value or 
 decrease Flink’s checkpoint interval, I’m pretty sure the
 issue here is that those two values are overlapping. I think
 that’s even visible on the screenshots. First checkpoint
 completed started at 14:28:48 and ended at 14:30:43, while
 the second one started at 14:45:53 and ended at 14:49:16.
 That gives you minimal transaction duration of 15 minutes and
 10 seconds, with maximal transaction duration of 21 minutes.
 
 In HAPPY 

I'm not able to make a stream-stream Time windows JOIN in Flink SQL

2019-08-12 Thread Theo Diefenthal
Hi there, 

Currently, I'm trying to write a SQL query that executes a
time-windowed/bounded JOIN on two data streams.

Suppose I have stream1 with attribute id, ts, user and stream2 with attribute 
id, ts, userName. I want to receive the natural JOIN of both streams with 
events of the same day. 

In Oracle (With a ts column as number instead of Timestamp, for historical 
reasons), I do the following: 

SELECT * 
FROM STREAM1 
JOIN STREAM2 ON STREAM1. "user" = STREAM2. "userName" 
AND TRUNC ( TO_DATE ( '19700101' , 'YYYYMMDD' ) + ( 1 / 24 / 60 / 60 / 1000 ) * STREAM1. "ts" )
  = TRUNC ( TO_DATE ( '19700101' , 'YYYYMMDD' ) + ( 1 / 24 / 60 / 60 / 1000 ) * STREAM2. "ts" ); 
which yields 294 rows with my test data (14 elements from stream1 match to 21 
elements in stream2 on the one day of test data). Now I want to query the same 
in Flink. So I registered both streams as tables and properly registered the 
event-time attribute (by specifying ts.rowtime as a table column). 

My goal is to produce a time-windowed JOIN so that, if both streams advance 
their watermark far enough, an element is written out into an append only 
stream. 

First try (to conform time-bounded-JOIN conditions): 
SELECT s1.id, s2.id 
FROM STREAM1 AS s1 
JOIN STREAM2 AS s2 
ON s1.`user` = s2.userName 
AND s1.ts BETWEEN s2.ts - INTERVAL '24' HOUR AND s2.ts + INTERVAL '24' HOUR 
AND s2.ts BETWEEN s1.ts - INTERVAL '24' HOUR AND s1.ts + INTERVAL '24' HOUR 
AND TUMBLE_START(s1.ts, INTERVAL '1' DAY ) = TUMBLE_START(s2.ts, INTERVAL '1' 
DAY ) -- Reduce to matchings on the same day. 
This yielded the exception "Rowtime attributes must not be in the input rows 
of a regular join. As a workaround you can cast the time attributes of input 
tables to TIMESTAMP before." So I'm still in the area of regular joins, not 
time-windowed JOINs, even though I added explicit BETWEEN bounds for both input 
streams! 

Then I found [1], which really is my query but without the last condition 
(reduce to matching on the same day). I tried this one as well, just to have a 
starting point, but the error is the same. 
I then reduced the Condition to just one time bound: 
SELECT s1.id, s2.id 
FROM STREAM1 AS s1 
JOIN STREAM2 AS s2 
ON s1.`user` = s2.userName 
AND s1.ts BETWEEN s2.ts - INTERVAL '24' HOUR AND s2.ts + INTERVAL '24' HOUR 
which runs as a query but doesn't produce any results, most likely because 
Flink still treats it as a regular join instead of a time-windowed JOIN and 
doesn't emit any results. (FYI, after executing the query, I convert the Table 
back to a stream via tEnv.toAppendStream, and I use Flink 1.8.0 for tests.) 

My questions are now: 
1. How do I see if Flink treats my table result as a regular JOIN result or a 
time-bounded JOIN? 
2. What is the proper way to formulate my initial query, finding all matching 
events within the same tumbling window? 

Best regards 
Theo Diefenthal 

[1] https://de.slideshare.net/FlinkForward/flink-forward-berlin-2018-xingcan-cui-stream-join-in-flink-from-discrete-to-continuous-115374183
(Slide 18)


Re: Why available task slots are not leveraged for pipeline?

2019-08-12 Thread Zhu Zhu
Hi Cam,

Zili is correct.
Each shared slot can host at most one instance of each different
task (JobVertex), so you will have at most 13 tasks in each slot.
As shown in
https://ci.apache.org/projects/flink/flink-docs-release-1.8/concepts/runtime.html#task-slots-and-resources
.

To specify parallelism individually, you can invoke setParallelism on
each operator, as sketched below.
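
A small sketch of both knobs, setParallelism and slotSharingGroup (the MySource/MyMapper/MySink classes here are placeholders, not from Cam's job):

DataStream<String> source = env
        .addSource(new MySource())        // placeholder source (implements SourceFunction<String>)
        .setParallelism(5);               // per-operator parallelism

source
        .map(new MyMapper())              // placeholder mapper
        .setParallelism(60)
        .slotSharingGroup("heavy")        // puts this operator in its own sharing group;
                                          // downstream operators inherit it unless they set their own
        .addSink(new MySink())            // placeholder sink
        .setParallelism(5);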

Thanks,
Zhu Zhu

Zili Chen  wrote on Mon, Aug 12, 2019 at 8:00 PM:

> Hi Cam,
>
> If you set parallelism to 60, then you would make use of all 60 slots you
> have and
> for you case, each slot executes a chained operator contains 13 tasks. It
> is not
> the case one slot executes at least 60 sub-tasks.
>
> Best,
> tison.
>
>
> Cam Mach  于2019年8月12日周一 下午7:55写道:
>
>> Hi Zhu and Abhishek,
>>
>> Thanks for your response and pointers. It's correct, the count of
>> parallelism will be the number of slot used for a pipeline. And, the number
>> (or count) of the parallelism is also used to generate number of sub-tasks
>> for each operator. In my case, I have parallelism of 60, it generates 60
>> sub-tasks for each operator. And so it'll be too much for one slot execute
>> at least 60 sub-tasks. I am wondering if there is a way we can set number
>> of generated sub-tasks, different than number of parallelism?
>>
>> Cam Mach
>> Software Engineer
>> E-mail: cammac...@gmail.com
>> Tel: 206 972 2768
>>
>>
>>
>> On Sun, Aug 11, 2019 at 10:37 PM Zhu Zhu  wrote:
>>
>>> Hi Cam,
>>> This case is expected due to slot sharing.
>>> A slot can be shared by one instance of different tasks. So the used
>>> slot is count of your max parallelism of a task.
>>> You can specify the shared group with slotSharingGroup(String
>>> slotSharingGroup) on operators.
>>>
>>> Thanks,
>>> Zhu Zhu
>>>
>>> Abhishek Jain  于2019年8月12日周一 下午1:23写道:
>>>
 What you'se seeing is likely operator chaining. This is the default
 behaviour of grouping sub tasks to avoid transer overhead (from one slot to
 another). You can disable chaining if you need to. Please refer task
 and operator chains
 
 .

 - Abhishek

 On Mon, 12 Aug 2019 at 09:56, Cam Mach  wrote:

> Hello Flink expert,
>
> I have a cluster with 10 Task Managers, configured with 6 task slot
> each, and a pipeline that has 13 tasks/operators with parallelism of 5. 
> But
> when running the pipeline I observer that only  5 slots are being used, 
> the
> other 55 slots are available/free. It should use all of my slots, right?
> since I have 13 (tasks) x 5 = 65 sub-tasks? What are the configuration 
> that
> I missed in order to leverage all of the available slots for my pipelines?
>
> Thanks,
> Cam
>
>



Re: Why available task slots are not leveraged for pipeline?

2019-08-12 Thread Zili Chen
Hi Cam,

If you set parallelism to 60, then you would make use of all 60 slots you
have, and in your case each slot executes a chained operator containing 13 tasks.
It is not the case that one slot executes at least 60 sub-tasks.

Best,
tison.


Cam Mach  wrote on Mon, Aug 12, 2019 at 7:55 PM:

> Hi Zhu and Abhishek,
>
> Thanks for your response and pointers. It's correct, the count of
> parallelism will be the number of slot used for a pipeline. And, the number
> (or count) of the parallelism is also used to generate number of sub-tasks
> for each operator. In my case, I have parallelism of 60, it generates 60
> sub-tasks for each operator. And so it'll be too much for one slot execute
> at least 60 sub-tasks. I am wondering if there is a way we can set number
> of generated sub-tasks, different than number of parallelism?
>
> Cam Mach
> Software Engineer
> E-mail: cammac...@gmail.com
> Tel: 206 972 2768
>
>
>
> On Sun, Aug 11, 2019 at 10:37 PM Zhu Zhu  wrote:
>
>> Hi Cam,
>> This case is expected due to slot sharing.
>> A slot can be shared by one instance of different tasks. So the used slot
>> is count of your max parallelism of a task.
>> You can specify the shared group with slotSharingGroup(String
>> slotSharingGroup) on operators.
>>
>> Thanks,
>> Zhu Zhu
>>
>> Abhishek Jain  于2019年8月12日周一 下午1:23写道:
>>
>>> What you'se seeing is likely operator chaining. This is the default
>>> behaviour of grouping sub tasks to avoid transer overhead (from one slot to
>>> another). You can disable chaining if you need to. Please refer task
>>> and operator chains
>>> 
>>> .
>>>
>>> - Abhishek
>>>
>>> On Mon, 12 Aug 2019 at 09:56, Cam Mach  wrote:
>>>
 Hello Flink expert,

 I have a cluster with 10 Task Managers, configured with 6 task slot
 each, and a pipeline that has 13 tasks/operators with parallelism of 5. But
 when running the pipeline I observer that only  5 slots are being used, the
 other 55 slots are available/free. It should use all of my slots, right?
 since I have 13 (tasks) x 5 = 65 sub-tasks? What are the configuration that
 I missed in order to leverage all of the available slots for my pipelines?

 Thanks,
 Cam


>>>


Re: Why available task slots are not leveraged for pipeline?

2019-08-12 Thread Cam Mach
Hi Zhu and Abhishek,

Thanks for your response and pointers. It's correct, the parallelism count
will be the number of slots used for a pipeline. And the parallelism count
is also used to generate the number of sub-tasks for each operator. In my case,
I have a parallelism of 60, so it generates 60 sub-tasks for each operator,
and it would be too much for one slot to execute at least 60 sub-tasks. I am
wondering if there is a way we can set the number of generated sub-tasks
independently of the parallelism?

Cam Mach
Software Engineer
E-mail: cammac...@gmail.com
Tel: 206 972 2768



On Sun, Aug 11, 2019 at 10:37 PM Zhu Zhu  wrote:

> Hi Cam,
> This case is expected due to slot sharing.
> A slot can be shared by one instance of different tasks. So the used slot
> is count of your max parallelism of a task.
> You can specify the shared group with slotSharingGroup(String
> slotSharingGroup) on operators.
>
> Thanks,
> Zhu Zhu
>
> Abhishek Jain  于2019年8月12日周一 下午1:23写道:
>
>> What you'se seeing is likely operator chaining. This is the default
>> behaviour of grouping sub tasks to avoid transer overhead (from one slot to
>> another). You can disable chaining if you need to. Please refer task and
>> operator chains
>> 
>> .
>>
>> - Abhishek
>>
>> On Mon, 12 Aug 2019 at 09:56, Cam Mach  wrote:
>>
>>> Hello Flink expert,
>>>
>>> I have a cluster with 10 Task Managers, configured with 6 task slot
>>> each, and a pipeline that has 13 tasks/operators with parallelism of 5. But
>>> when running the pipeline I observer that only  5 slots are being used, the
>>> other 55 slots are available/free. It should use all of my slots, right?
>>> since I have 13 (tasks) x 5 = 65 sub-tasks? What are the configuration that
>>> I missed in order to leverage all of the available slots for my pipelines?
>>>
>>> Thanks,
>>> Cam
>>>
>>>
>>


Changing the way keys are defined breaks savepoints

2019-08-12 Thread Andrea Gallina

Hi everyone,

I have a job running in production whose structure is approximately this:

stream
    .filter(inboundData -> inboundData.hasToBeFiltered())
    .keyBy("myKey")
    .process(doSomething());

I've recently decided to test the extent to which I can change a job's 
structure without breaking backward compatibility; more specifically, 
I've tried to change the way the key is defined in the keyBy() operator 
by defining it as a lambda function rather than by field expression. The 
modified structure would therefore look like this:


stream
    .filter(inboundData -> inboundData.hasToBeFiltered())
    .keyBy(inboundData -> inboundData.getMyKey())
    .process(doSomething());

I then tried to run the new job by restoring the savepoint taken with 
the old structure, but I get a state migration exception:


org.apache.flink.util.StateMigrationException: The new key serializer 
must be compatible


Now this was a bit unexpected since changing the way a key is defined 
does not seem like a breaking change (unlike changing the actual key 
used for partitioning).


Is this an expected behavior or am I missing something?

Thanks





Reply: Flink 1.10 reports an error when connecting to Hive

2019-08-12 Thread 苏 欣
Thanks, everyone, for the suggestions. After I added the lzo jar, the error no longer occurs and I can read the data of the Hive table.

I had assumed that flink-shaded-hadoop-2-uber contained all the Hadoop-related packages, so I hadn't considered that a dependency might be missing.

The missing pom content is attached below:


<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-lzo</artifactId>
    <version>0.4.13</version>
</dependency>






Sent from Mail for Windows 10




From: zhisheng 
Sent: Saturday, August 10, 2019 5:58:02 PM
To: user-zh 
Subject: Re: Flink 1.10 reports an error when connecting to Hive

Hi Su Xin,
I suggest first checking whether the final jar you built contains
com.hadoop.compression.lzo.LzoCodec.

苏 欣  wrote on Fri, Aug 9, 2019 at 5:41 PM:

> I'm using Flink 1.10-snapshot to connect to Hive 1.1.0-cdh5.4.7; the big data cluster has Kerberos authentication.
>
> I connect to Hive the 1.2.1 way. The HiveCatalog can retrieve the table schema, but the job fails at startup with an error, the same in both standalone and YARN modes.
> Has anyone run into this kind of problem?
>
> 报错信息如下:
> 
> The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: Could not
> retrieve the execution result. (JobID: 3f3033f7076c332529f3ac8250713889)
> at
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:243)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
> at
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
> at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
> at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
> at com.sean.HiveCatalogExample.main(HiveCatalogExample.java:49)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
> at
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
> at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
> to submit JobGraph.
> at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:370)
> at
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> at
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:211)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
> at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.rest.util.RestClientException:
> [Internal server error.,  org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
> job.
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$2(Dispatcher.java:333)
> at
> 

Re: Why does Job Manager die/restart when Task Manager dies/restarts?

2019-08-12 Thread Cam Mach
Hi Zhu,

Looks like it's expected. Those are the cases that happened to our
cluster.

Thanks for your response, Zhu

Cam



On Sun, Aug 11, 2019 at 10:53 PM Zhu Zhu  wrote:

> Another possibility is the JM is killed externally, e.g. K8s may kill
> JM/TM if it exceeds the resource limit.
>
> Thanks,
> Zhu Zhu
>
> Zhu Zhu  于2019年8月12日周一 下午1:45写道:
>
>> Hi Cam,
>>
>> Flink master should not die when getting disconnected with task managers.
>> It may exit for cases below:
>> 1. when the job terminated(FINISHED/FAILED/CANCELED). If you job is
>> configured with no restart retry, a TM failure can cause the job to be
>> FAILED.
>> 2. JM lost HA leadership, e.g. lost connection to ZK
>> 3. encounters other unexpected fatal errors. In this case we need to
>> check the log to see what happens then
>>
>> Thanks,
>> Zhu Zhu
>>
>> Cam Mach  于2019年8月12日周一 下午12:15写道:
>>
>>> Hello Flink experts,
>>>
>>> We are running Flink under Kubernetes and see that Job Manager
>>> die/restarted whenever Task Manager die/restarted or couldn't get connected
>>> each other. Is there any specific configurations/parameters that we need to
>>> turn on to stop this? Or this is expected?
>>>
>>> Thanks,
>>> Cam
>>>
>>>


Re: Kafka ProducerFencedException after checkpointing

2019-08-12 Thread Piotr Nowojski
Hi,

OK, I see. You can try to rewrite your logic (or maybe the records' schema, by adding 
some ID fields) to manually deduplicate the records after processing them 
with at-least-once semantics. Such a setup is usually simpler, with slightly 
better throughput and significantly better latency (end-to-end exactly-once 
latency is limited by the checkpointing time).
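
For instance, a minimal deduplication sketch for this approach, assuming each record carries a unique id field (the Event type and its getId() are placeholders; state TTL/cleanup is left out):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class DeduplicateById extends RichFlatMapFunction<Event, Event> {
    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        // one boolean of keyed state per record id
        seen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("seen", Boolean.class));
    }

    @Override
    public void flatMap(Event event, Collector<Event> out) throws Exception {
        if (seen.value() == null) {   // first time this id is seen
            seen.update(true);
            out.collect(event);
        }
        // duplicates are silently dropped
    }
}

// usage: stream.keyBy(Event::getId).flatMap(new DeduplicateById());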

Piotrek

> On 12 Aug 2019, at 11:12, Tony Wei  wrote:
> 
> Hi Piotr,
> 
> Thanks a lot. I need exactly once in my use case, but instead of having the 
> risk of losing data, at least once is more acceptable when error occurred.
> 
> Best,
> Tony Wei
> 
> Piotr Nowojski  wrote on Mon, Aug 12, 2019 at 3:27 PM:
> Hi,
> 
> Yes, if it’s due to transaction timeout you will lose the data.
> 
> Whether can you fallback to at least once, that depends on Kafka, not on 
> Flink, since it’s the Kafka that timeouts those transactions and I don’t see 
> in the documentation anything that could override this [1]. You might try 
> disabling the mechanism via setting 
> `transaction.abort.timed.out.transaction.cleanup.interval.ms` or 
> `transaction.remove.expired.transaction.cleanup.interval.ms`, but 
> that’s question more to Kafka guys. Maybe Becket could help with this.
> 
> Also it MIGHT be that Kafka doesn’t remove records from the topics when 
> aborting the transaction and MAYBE you can still access them via 
> “READ_UNCOMMITTED” mode. But that’s again, question to Kafka. 
> 
> Sorry that I can not help more.
> 
> If you do not care about exactly once, why don’t you just set the connector 
> to at least once mode?
> 
> Piotrek
> 
>> On 12 Aug 2019, at 06:29, Tony Wei > > wrote:
>> 
>> Hi,
>> 
>> I had the same exception recently. I want to confirm that if it is due to 
>> transaction timeout,
>> then I will lose those data. Am I right? Can I make it fall back to at least 
>> once semantic in
>> this situation?
>> 
>> Best,
>> Tony Wei
>> 
>> Piotr Nowojski  wrote on Wed, Mar 21, 2018 at 10:28 PM:
>> Hi,
>> 
>> But that’s exactly the case: producer’s transaction timeout starts when the 
>> external transaction starts - but FlinkKafkaProducer011 keeps an active 
>> Kafka transaction for the whole period between checkpoints.
>> 
>> As I wrote in the previous message:
>> 
>> > in case of failure, your timeout must also be able to cover the additional 
>> > downtime required for the successful job restart. Thus you should increase 
>> > your timeout accordingly.
>> 
>> I think that 15 minutes timeout is a way too small value. If your job fails 
>> because of some intermittent failure (for example worker crash/restart), you 
>> will only have a couple of minutes for a successful Flink job restart. 
>> Otherwise you will lose some data (because of the transaction timeouts).
>> 
>> Piotrek
>> 
>>> On 21 Mar 2018, at 10:30, Dongwon Kim wrote:
>>> 
>>> Hi Piotr,
>>> 
>>> Now my streaming pipeline is working without retries. 
>>> I decreased Flink's checkpoint interval from 15min to 10min as you 
>>> suggested [see screenshot_10min_ckpt.png].
>>> 
>>> I thought that the producer's transaction timeout starts when the external 
>>> transaction starts.
>>> The truth is that the producer's transaction timeout starts after the last 
>>> external checkpoint is committed.
>>> Now that I have 15min for Producer's transaction timeout and 10min for 
>>> Flink's checkpoint interval, and every checkpoint takes less than 5 
>>> minutes, everything is working fine.
>>> Am I right?
>>> 
>>> Anyway thank you very much for the detailed explanation!
>>> 
>>> Best,
>>> 
>>> Dongwon
>>> 
>>> 
>>> 
>>> On Tue, Mar 20, 2018 at 8:10 PM, Piotr Nowojski wrote:
>>> Hi,
>>> 
>>> Please increase transaction.timeout.ms to a greater value or decrease 
>>> Flink’s checkpoint interval; I’m pretty sure the issue here is that those 
>>> two values are overlapping. I think that’s even visible on the screenshots: 
>>> the first completed checkpoint started at 14:28:48 and ended at 14:30:43, 
>>> while the second one started at 14:45:53 and ended at 14:49:16. That gives 
>>> you a minimal transaction duration of 15 minutes and 10 seconds, and a 
>>> maximal transaction duration of 21 minutes.
>>> 
>>> In the HAPPY SCENARIO (without any failure and restarting), you should assume 
>>> that your timeout interval covers, with some safety margin, the period 
>>> between the start of a checkpoint and the end of the NEXT checkpoint, since 
>>> this is the upper bound on how long the transaction might be used. In your 
>>> case at least ~25 minutes.
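
For reference, a minimal sketch of raising the producer-side timeout; one hour is only an illustrative value, and the broker-side transaction.max.timeout.ms must be at least as large:

    import java.util.Properties;

    Properties producerConfig = new Properties();
    producerConfig.setProperty("bootstrap.servers", "kafka-broker:9092"); // illustrative address
    // Must comfortably cover: checkpoint interval + checkpoint duration
    // + the downtime needed for a successful job restart after a failure.
    producerConfig.setProperty("transaction.timeout.ms", String.valueOf(60 * 60 * 1000)); // 1 hour

    // producerConfig is then passed to the FlinkKafkaProducer011 constructor
    // together with Semantic.EXACTLY_ONCE.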
>>> 
>>> On top of that, as described in the docs, 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance

Re: some slots are not available when job is not running

2019-08-12 Thread Xintong Song
Hi,

It would be good if you could provide the job manager and task manager log
files, so that others can analyze the problem.

Thank you~

Xintong Song



On Mon, Aug 12, 2019 at 10:12 AM pengcheng...@bonc.com.cn <
pengcheng...@bonc.com.cn> wrote:

> Hi all,
> Some slots are not available when the job is not running.
> I got a TM dump while the job was not running and analyzed it with *Eclipse
> Memory Analyzer*. Here are some of the results that look useful:
>
>
>- org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread
>@ 0x7f9442c8
>
> Kafka 0.10 Fetcher for Source: Custom Source -> Map -> Filter -> Map ->
> Timestamps/Watermarks -> from: (ORD_ID, O_ORDERKEY_INT, O_ORDERKEY_VARCHAR,
> O_ORDERKEY_LONG, O_ORDERKEY_DOUBLE, O_CUSTKEY, O_ORDERSTATUS, O_TOTALPRICE,
> O_ORDERDATE, O_ORDERPRIORITY, O_CLERK, O_SHIPPRIORITY, O_COMMENT,
> O_TIMESTAMP1, O_TIMESTAMP2, O_DATE, O_TIMESTAMP, O_DATE_EVENT,
> O_TIMESTAMP_EVENT, OVER_TIME) -> where: (AND(LIKE(O_ORDERPRIORITY,
> _UTF-16LE'%M%'), <=(O_CLERK, _UTF-16LE'Clerk#00144'), >=(O_CLERK,
> _UTF-16LE'Clerk#00048'), =(O_ORDERSTATUS, _UTF-16LE'O'))), select:
> (O_CLERK, O_ORDERDATE, CAST(_UTF-16LE'O') AS O_ORDERSTATUS,
> O_ORDERPRIORITY, OVER_TIME, EXTRACT(FLAG(MONTH), O_DATE) AS $f5, O_COMMENT)
> -> time attribute: (OVER_TIME) (2/8) 272 1,281,344 
> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader
> @ 0x7f944200 true
>
>- org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread
>@ 0x7f9442f5a630
>
> Kafka 0.10 Fetcher for Source: Custom Source -> Map -> Filter -> Map ->
> from: (ORD_ID, O_ORDERKEY_INT, O_ORDERKEY_VARCHAR, O_ORDERKEY_LONG,
> O_ORDERKEY_DOUBLE, O_CUSTKEY, O_ORDERSTATUS, O_TOTALPRICE, O_ORDERDATE,
> O_ORDERPRIORITY, O_CLERK, O_SHIPPRIORITY, O_COMMENT, O_TIMESTAMP1,
> O_TIMESTAMP2, O_DATE, O_TIMESTAMP, O_DATE_EVENT, O_TIMESTAMP_EVENT,
> OVER_TIME) -> where: (AND(LIKE(O_ORDERPRIORITY, _UTF-16LE'3%'),
> =(O_ORDERSTATUS, _UTF-16LE'F'), <=(O_CLERK, _UTF-16LE'Clerk#06144'),
> >=(O_CLERK, _UTF-16LE'Clerk#00048'))), select: (O_CLERK, O_ORDERDATE,
> CAST(_UTF-16LE'F') AS O_ORDERSTATUS, OVER_TIME, EXTRACT(FLAG(MONTH),
> O_ORDERDATE) AS $f4, O_COMMENT, O_CUSTKEY) (2/8) 272 1,274,312 
> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader
> @ 0x7f9442f5a268 true
>
>- org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread
>@ 0x7f94f4f048a8
>
> Kafka 0.10 Fetcher for Source: Custom Source -> Map -> Filter -> Map ->
> from: (ORD_ID, O_ORDERKEY_INT, O_ORDERKEY_VARCHAR, O_ORDERKEY_LONG,
> O_ORDERKEY_DOUBLE, O_CUSTKEY, O_ORDERSTATUS, O_TOTALPRICE, O_ORDERDATE,
> O_ORDERPRIORITY, O_CLERK, O_SHIPPRIORITY, O_COMMENT, O_TIMESTAMP1,
> O_TIMESTAMP2, O_DATE, O_TIMESTAMP, O_DATE_EVENT, O_TIMESTAMP_EVENT,
> OVER_TIME) -> where: (AND(IN(TRIM(FLAG(BOTH), _UTF-16LE' ',
> O_ORDERPRIORITY), _UTF-16LE'1-URGENT', _UTF-16LE'3-MEDIUM',
> _UTF-16LE'5-LOW'), >=(O_CLERK, _UTF-16LE'Clerk#24400'),
> >(EXTRACT(FLAG(MONTH), O_ORDERDATE), 6), <>(O_ORDERSTATUS, _UTF-16LE'O'),
> <=(O_ORDERKEY_INT, 12889))), select: (O_ORDERKEY_INT, O_CUSTKEY,
> O_ORDERSTATUS, O_ORDERDATE, O_CLERK, O_DATE, OVER_TIME) (2/8) 272
> 1,274,184 
> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader
> @ 0x7f94f4f04800 true
>
>- org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread
>@ 0x7f94441a1aa8
>
> Kafka 0.10 Fetcher for Source: Custom Source -> Map -> Filter -> Map ->
> from: (ORD_ID, O_ORDERKEY_INT, O_ORDERKEY_VARCHAR, O_ORDERKEY_LONG,
> O_ORDERKEY_DOUBLE, O_CUSTKEY, O_ORDERSTATUS, O_TOTALPRICE, O_ORDERDATE,
> O_ORDERPRIORITY, O_CLERK, O_SHIPPRIORITY, O_COMMENT, O_TIMESTAMP1,
> O_TIMESTAMP2, O_DATE, O_TIMESTAMP, O_DATE_EVENT, O_TIMESTAMP_EVENT,
> OVER_TIME) -> where: (AND(LIKE(O_ORDERPRIORITY, _UTF-16LE'%M%'),
> <=(O_CLERK, _UTF-16LE'Clerk#00144'), >=(O_CLERK,
> _UTF-16LE'Clerk#00048'), =(O_ORDERSTATUS, _UTF-16LE'O'))), select:
> (O_CLERK, O_ORDERDATE, CAST(_UTF-16LE'O') AS O_ORDERSTATUS,
> O_ORDERPRIORITY, CEIL(MOD(O_CUSTKEY, EXTRACT(FLAG(MONTH), O_ORDERDATE))) AS
> $f4, OVER_TIME, EXTRACT(FLAG(MONTH), O_DATE) AS $f6, O_COMMENT) (2/8) 272
> 1,263,416 
> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader
> @ 0x7f94441a1a00 true
>
>- org.apache.kafka.common.utils.KafkaThread @ 0x7f91de001b78 »
>
> kafka-producer-network-thread | producer-1 184 342,912 
> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader
> @ 0x7f91de40 true
>
>- org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread
>@ 0x7f947a57d290
>
> Kafka 0.10 Fetcher for Source: Custom Source -> Map -> Filter -> Map ->
> from: (ORD_ID, PS_PARTKEY, PS_SUPPKEY, PS_AVAILQTY, PS_SUPPLYCOST,
> PS_COMMENT, PS_INT, PS_LONG, PS_DOUBLE8, PS_DOUBLE14, PS_DOUBLE15,
> PS_NUMBER1, PS_NUMBER2, 

Re: Re: Some TaskManager slots are unavailable even though no job is running

2019-08-12 Thread Xintong Song
Your problem description is rather general; it would be best to provide some detailed information and logs, so that others can help you.
For example: which version of Flink you are using, which mode it runs in (perjob / session), which environment it runs on (standalone / yarn /
mesos / k8s), and how you determined that the slots were not released.


Thank you~

Xintong Song



On Mon, Aug 12, 2019 at 3:57 AM pengcheng...@bonc.com.cn <
pengcheng...@bonc.com.cn> wrote:

> Hi, thanks. The image indeed does not display properly, but that's fine, the image is not very important.
>
>
>
> pengcheng...@bonc.com.cn
>
> From: Xintong Song
> Sent: 2019-08-09 20:09
> To: user-zh
> Subject: Re: Some TaskManager slots are unavailable even though no job is running
> Hi,
>
> The image in the email does not show up. Image attachments on the Flink mailing list are somewhat problematic; if it is a screenshot, it is best to upload it somewhere else and paste the link.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Fri, Aug 9, 2019 at 10:06 AM pengcheng...@bonc.com.cn <
> pengcheng...@bonc.com.cn> wrote:
>
> > Hi experts,
> >
> > Is anyone familiar with this situation? After the job finishes, some slots are not released.
> >
> >
> > As shown in the picture:
> >
> >
> >
> >
> > --
> > pengcheng...@bonc.com.cn
> >
>


Re: Kafka ProducerFencedException after checkpointing

2019-08-12 Thread Tony Wei
Hi Piotr,

Thanks a lot. I need exactly-once in my use case, but rather than risk
losing data, at-least-once is more acceptable when an error occurs.

Best,
Tony Wei

Piotr Nowojski wrote on Mon, 12 Aug 2019 at 15:27:

> Hi,
>
> Yes, if it’s due to transaction timeout you will lose the data.
>
> Whether you can fall back to at-least-once depends on Kafka, not on
> Flink, since it’s Kafka that times out those transactions and I don’t
> see in the documentation anything that could override this [1]. You might
> try disabling the mechanism via setting `
> transaction.abort.timed.out.transaction.cleanup.interval.ms` or `
> transaction.remove.expired.transaction.cleanup.interval.ms`, but that’s a
> question more for the Kafka guys. Maybe Becket could help with this.
>
> Also it MIGHT be that Kafka doesn’t remove records from the topics when
> aborting the transaction, and MAYBE you can still access them via
> “READ_UNCOMMITTED” mode. But that’s, again, a question for Kafka.
>
> Sorry that I can not help more.
>
> If you do not care about exactly once, why don’t you just set the
> connector to at least once mode?
>
> Piotrek
>
> On 12 Aug 2019, at 06:29, Tony Wei  wrote:
>
> Hi,
>
> I had the same exception recently. I want to confirm that, if it is due to a
> transaction timeout,
> then I will lose that data. Am I right? Can I make it fall back to
> at-least-once semantics in
> this situation?
>
> Best,
> Tony Wei
>
Piotr Nowojski wrote on Wed, 21 Mar 2018 at 22:28:
>
>> Hi,
>>
>> But that’s exactly the case: producer’s transaction timeout starts when
>> the external transaction starts - but FlinkKafkaProducer011 keeps an active
>> Kafka transaction for the whole period between checkpoints.
>>
>> As I wrote in the previous message:
>>
>> > in case of failure, your timeout must also be able to cover the
>> additional downtime required for the successful job restart. Thus you
>> should increase your timeout accordingly.
>>
>> I think that a 15-minute timeout is way too small a value. If your job
>> fails because of some intermittent failure (for example a worker
>> crash/restart), you will only have a couple of minutes for a successful
>> Flink job restart. Otherwise you will lose some data (because of the
>> transaction timeouts).
>>
>> Piotrek
>>
>> On 21 Mar 2018, at 10:30, Dongwon Kim  wrote:
>>
>> Hi Piotr,
>>
>> Now my streaming pipeline is working without retries.
>> I decreased Flink's checkpoint interval from 15min to 10min as you
>> suggested [see screenshot_10min_ckpt.png].
>>
>> I thought that the producer's transaction timeout starts when the external
>> transaction starts.
>> The truth is that the producer's transaction timeout starts after the last
>> external checkpoint is committed.
>> Now that I have 15min for Producer's transaction timeout and 10min for
>> Flink's checkpoint interval, and every checkpoint takes less than 5
>> minutes, everything is working fine.
>> Am I right?
>>
>> Anyway thank you very much for the detailed explanation!
>>
>> Best,
>>
>> Dongwon
>>
>>
>>
>> On Tue, Mar 20, 2018 at 8:10 PM, Piotr Nowojski 
>> wrote:
>>
>>> Hi,
>>>
>>> Please increase transaction.timeout.ms to a greater value or decrease
>>> Flink’s checkpoint interval; I’m pretty sure the issue here is that those
>>> two values are overlapping. I think that’s even visible on the screenshots:
>>> the first completed checkpoint started at 14:28:48 and ended at 14:30:43, while
>>> the second one started at 14:45:53 and ended at 14:49:16. That gives you a
>>> minimal transaction duration of 15 minutes and 10 seconds, and a maximal
>>> transaction duration of 21 minutes.
>>>
>>> In the HAPPY SCENARIO (without any failure and restarting), you should
>>> assume that your timeout interval covers, with some safety margin, the
>>> period between the start of a checkpoint and the end of the NEXT checkpoint,
>>> since this is the upper bound on how long the transaction might be used. In
>>> your case at least ~25 minutes.
>>>
>>> On top of that, as described in the docs,
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance,
>>> in case of failure, your timeout must also be able to cover the additional
>>> downtime required for the successful job restart. Thus you should increase
>>> your timeout accordingly.
>>>
>>> Piotrek
>>>
>>>
>>> On 20 Mar 2018, at 11:58, Dongwon Kim  wrote:
>>>
>>> Hi Piotr,
>>>
>>> We have set producer's [transaction.timeout.ms] to 15 minutes and have
>>> used the default setting for broker (15 mins).
>>> As Flink's checkpoint interval is 15 minutes, it is not a situation
>>> where Kafka's timeout is smaller than Flink's checkpoint interval.
>>> As our first checkpoint just takes 2 minutes, it seems like the transaction
>>> is not committed properly.
>>>
>>> Best,
>>>
>>> - Dongwon
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Mar 20, 2018 at 6:32 PM, Piotr Nowojski wrote:
>>>
 Hi,

 What’s your Kafka’s transaction timeout setting? Please both check
 

some slots are not available when job is not running

2019-08-12 Thread pengcheng...@bonc.com.cn
Hi all,
Some slots are not available when the job is not running.
I got a TM dump while the job was not running and analyzed it with Eclipse Memory 
Analyzer. Here are some of the results that look useful:

org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread @ 
0x7f9442c8
Kafka 0.10 Fetcher for Source: Custom Source -> Map -> Filter -> Map -> 
Timestamps/Watermarks -> from: (ORD_ID, O_ORDERKEY_INT, O_ORDERKEY_VARCHAR, 
O_ORDERKEY_LONG, O_ORDERKEY_DOUBLE, O_CUSTKEY, O_ORDERSTATUS, O_TOTALPRICE, 
O_ORDERDATE, O_ORDERPRIORITY, O_CLERK, O_SHIPPRIORITY, O_COMMENT, O_TIMESTAMP1, 
O_TIMESTAMP2, O_DATE, O_TIMESTAMP, O_DATE_EVENT, O_TIMESTAMP_EVENT, OVER_TIME) 
-> where: (AND(LIKE(O_ORDERPRIORITY, _UTF-16LE'%M%'), <=(O_CLERK, 
_UTF-16LE'Clerk#00144'), >=(O_CLERK, _UTF-16LE'Clerk#00048'), 
=(O_ORDERSTATUS, _UTF-16LE'O'))), select: (O_CLERK, O_ORDERDATE, 
CAST(_UTF-16LE'O') AS O_ORDERSTATUS, O_ORDERPRIORITY, OVER_TIME, 
EXTRACT(FLAG(MONTH), O_DATE) AS $f5, O_COMMENT) -> time attribute: (OVER_TIME) 
(2/8)  272  1,281,344
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader @ 0x7f944200  true
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread @ 
0x7f9442f5a630
Kafka 0.10 Fetcher for Source: Custom Source -> Map -> Filter -> Map -> from: 
(ORD_ID, O_ORDERKEY_INT, O_ORDERKEY_VARCHAR, O_ORDERKEY_LONG, 
O_ORDERKEY_DOUBLE, O_CUSTKEY, O_ORDERSTATUS, O_TOTALPRICE, O_ORDERDATE, 
O_ORDERPRIORITY, O_CLERK, O_SHIPPRIORITY, O_COMMENT, O_TIMESTAMP1, 
O_TIMESTAMP2, O_DATE, O_TIMESTAMP, O_DATE_EVENT, O_TIMESTAMP_EVENT, OVER_TIME) 
-> where: (AND(LIKE(O_ORDERPRIORITY, _UTF-16LE'3%'), =(O_ORDERSTATUS, 
_UTF-16LE'F'), <=(O_CLERK, _UTF-16LE'Clerk#06144'), >=(O_CLERK, 
_UTF-16LE'Clerk#00048'))), select: (O_CLERK, O_ORDERDATE, 
CAST(_UTF-16LE'F') AS O_ORDERSTATUS, OVER_TIME, EXTRACT(FLAG(MONTH), 
O_ORDERDATE) AS $f4, O_COMMENT, O_CUSTKEY) 
(2/8)  272  1,274,312
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader @ 0x7f9442f5a268  true
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread @ 
0x7f94f4f048a8
Kafka 0.10 Fetcher for Source: Custom Source -> Map -> Filter -> Map -> from: 
(ORD_ID, O_ORDERKEY_INT, O_ORDERKEY_VARCHAR, O_ORDERKEY_LONG, 
O_ORDERKEY_DOUBLE, O_CUSTKEY, O_ORDERSTATUS, O_TOTALPRICE, O_ORDERDATE, 
O_ORDERPRIORITY, O_CLERK, O_SHIPPRIORITY, O_COMMENT, O_TIMESTAMP1, 
O_TIMESTAMP2, O_DATE, O_TIMESTAMP, O_DATE_EVENT, O_TIMESTAMP_EVENT, OVER_TIME) 
-> where: (AND(IN(TRIM(FLAG(BOTH), _UTF-16LE' ', O_ORDERPRIORITY), 
_UTF-16LE'1-URGENT', _UTF-16LE'3-MEDIUM', _UTF-16LE'5-LOW'), >=(O_CLERK, 
_UTF-16LE'Clerk#24400'), >(EXTRACT(FLAG(MONTH), O_ORDERDATE), 6), 
<>(O_ORDERSTATUS, _UTF-16LE'O'), <=(O_ORDERKEY_INT, 12889))), select: 
(O_ORDERKEY_INT, O_CUSTKEY, O_ORDERSTATUS, O_ORDERDATE, O_CLERK, O_DATE, 
OVER_TIME) 
(2/8)  272  1,274,184
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader @ 0x7f94f4f04800  true
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread @ 
0x7f94441a1aa8
Kafka 0.10 Fetcher for Source: Custom Source -> Map -> Filter -> Map -> from: 
(ORD_ID, O_ORDERKEY_INT, O_ORDERKEY_VARCHAR, O_ORDERKEY_LONG, 
O_ORDERKEY_DOUBLE, O_CUSTKEY, O_ORDERSTATUS, O_TOTALPRICE, O_ORDERDATE, 
O_ORDERPRIORITY, O_CLERK, O_SHIPPRIORITY, O_COMMENT, O_TIMESTAMP1, 
O_TIMESTAMP2, O_DATE, O_TIMESTAMP, O_DATE_EVENT, O_TIMESTAMP_EVENT, OVER_TIME) 
-> where: (AND(LIKE(O_ORDERPRIORITY, _UTF-16LE'%M%'), <=(O_CLERK, 
_UTF-16LE'Clerk#00144'), >=(O_CLERK, _UTF-16LE'Clerk#00048'), 
=(O_ORDERSTATUS, _UTF-16LE'O'))), select: (O_CLERK, O_ORDERDATE, 
CAST(_UTF-16LE'O') AS O_ORDERSTATUS, O_ORDERPRIORITY, CEIL(MOD(O_CUSTKEY, 
EXTRACT(FLAG(MONTH), O_ORDERDATE))) AS $f4, OVER_TIME, EXTRACT(FLAG(MONTH), 
O_DATE) AS $f6, O_COMMENT) 
(2/8)  272  1,263,416
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader @ 0x7f94441a1a00  true
org.apache.kafka.common.utils.KafkaThread @ 0x7f91de001b78 »
kafka-producer-network-thread | producer-1  184  342,912
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader @ 0x7f91de40  true
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread @ 
0x7f947a57d290
Kafka 0.10 Fetcher for Source: Custom Source -> Map -> Filter -> Map -> from: 
(ORD_ID, PS_PARTKEY, PS_SUPPKEY, PS_AVAILQTY, PS_SUPPLYCOST, PS_COMMENT, 
PS_INT, PS_LONG, PS_DOUBLE8, PS_DOUBLE14, PS_DOUBLE15, PS_NUMBER1, PS_NUMBER2, 
PS_NUMBER3, PS_NUMBER4, PS_DATE, PS_TIMESTAMP, PS_DATE_EVENT, 
PS_TIMESTAMP_EVENT, OVER_TIME) -> select: (PS_PARTKEY, PS_SUPPKEY, PS_AVAILQTY, 
PS_COMMENT, OVER_TIME) 
(1/8)  272  243,512
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader @ 0x7f947a57c6d0  true
akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread @ 
0x7f91e0da4c40 »

Re: Flink cannot recognized catalog set by registerCatalog.

2019-08-12 Thread Dawid Wysakowicz
Hi Simon,

First of all, for a more thorough discussion you might want to have a look
at this thread:
https://lists.apache.org/thread.html/b450df1a7bf10187301820e529cbc223ce63f233c1af0f0c7415e62b@%3Cdev.flink.apache.org%3E

TL;DR: All objects registered with registerTable/registerTableSource are
temporary objects that do not have a serializable form and can therefore
only be stored in an in-memory catalog. useCatalog/useDatabase are
experimental APIs in the upcoming 1.9 release.

If you want to be sure that tables are stored in a given catalog, you can
either register them directly via tEnv.getCatalog().createTable() or you
can try using SQL DDL.
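
A minimal sketch of the DDL route on 1.9, reusing the table environment from the code below; the column list and the WITH options are placeholders that need to be adapted to the actual connector and format:

    // Make the custom catalog/database current, then create the table there via DDL.
    tableEnv.useCatalog("ca1");
    tableEnv.useDatabase("db1");

    tableEnv.sqlUpdate(
        "CREATE TABLE orderstream (" +
        "  ORD_ID BIGINT," +
        "  O_ORDERSTATUS VARCHAR" +
        ") WITH (" +
        "  'connector.type' = 'kafka'," +   // placeholder options; see the connector
        "  'connector.topic' = 'orders'," + // documentation for the full set of
        "  'format.type' = 'csv'" +         // required keys
        ")");

    // Afterwards the table should resolve as ca1.db1.orderstream.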

Best,

Dawid

On 12/08/2019 09:37, Simon Su wrote:
> Hi All
>     I want to use a custom catalog by setting the name “ca1” and
> create a database under this catalog. When I submit the 
> SQL, it raises an error like:
>
>     Exception in thread "main"
> org.apache.flink.table.api.ValidationException: SQL validation failed.
> From line 1, column 98 to line 1, column 116: Object 'orderstream' not
> found within 'ca1.db1'
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:125)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:82)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:154)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:89)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:130)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)
> at sqlrunner.RowTimeTest.memoryCatalog(RowTimeTest.java:126)
> at sqlrunner.RowTimeTest.main(RowTimeTest.java:137)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From
> line 1, column 98 to line 1, column 116: Object 'orderstream' not
> found within 'ca1.db1'
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4805)
> at
> org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:166)
> at
> org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
> at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3109)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3091)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3363)
> at
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
> at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
> at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:930)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:637)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:122)
> ... 7 more
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException:
> Object 'orderstream' not found within 'ca1.db1'
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
> at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
> ... 26 more
>    
> It seems that Calcite cannot find the source object as expected, After
> I debug the code I found that 

Flink cannot recognized catalog set by registerCatalog.

2019-08-12 Thread Simon Su
Hi All
I want to use a custom catalog by setting the name “ca1” and create a 
database under this catalog. When I submit the 
SQL, it raises an error like:


Exception in thread "main" org.apache.flink.table.api.ValidationException: 
SQL validation failed. From line 1, column 98 to line 1, column 116: Object 
'orderstream' not found within 'ca1.db1'
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:125)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:82)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:154)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:89)
at 
org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:130)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)
at sqlrunner.RowTimeTest.memoryCatalog(RowTimeTest.java:126)
at sqlrunner.RowTimeTest.main(RowTimeTest.java:137)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, 
column 98 to line 1, column 116: Object 'orderstream' not found within 'ca1.db1'
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4805)
at 
org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:166)
at 
org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3109)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3091)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3363)
at 
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:930)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:637)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:122)
... 7 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Object 
'orderstream' not found within 'ca1.db1'
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
... 26 more
   
It seems that Calcite cannot find the source object as expected. After debugging 
the code I found that when using tableEnv.registerTableSource or 
registerTableSink, it will use a built-in catalog with a hard-coded catalog name 
( default-catalog ) and database name ( default_database ), while 
tableEnv.registerCatalog cannot change this behavior. So is this 
reasonable behavior? If I don’t want to use the default built-in catalog and 
database, is there any other way to do this?


    GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("ca1");
    tableEnv.registerCatalog(catalog.getName(), catalog); // does not change the built-in catalog!
    tableEnv.useCatalog(catalog.getName());
    catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), "comment"), true);
    tableEnv.useDatabase("db1");

    tableEnv.connect(sourceKafka)
        .withFormat(csv)
        .withSchema(schema2)
        .inAppendMode()
        .registerTableSource("orderstream");
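
For completeness, a hedged sketch of the other option mentioned in the replies above (creating the table object directly in the custom catalog); `orderSource` stands for an already-built TableSource, and the exact classes (ConnectorCatalogTable, ObjectPath) should be double-checked against the 1.9 catalog API:

    import org.apache.flink.table.catalog.ConnectorCatalogTable;
    import org.apache.flink.table.catalog.ObjectPath;

    // Put the source table straight into ca1.db1 instead of the built-in catalog.
    catalog.createTable(
            new ObjectPath("db1", "orderstream"),
            ConnectorCatalogTable.source(orderSource, false), // false = streaming table
            false);                                           // do not ignore if it already exists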


Re: Can the Flink source be built without compiling the Scala code?

2019-08-12 Thread chaojianok
The Scala versions are inconsistent: your local machine has Scala 2.13.0, while the Flink source targets Scala 2.11. Just keep your local Scala version consistent with the Scala version that Flink 1.7 targets; the suggestion is to install Scala 2.11 locally and recompile Flink 1.7.
On 2019-08-12 15:15:38, "苟刚" wrote:
>
>
>
>Hi,All:
>
>
>  When I try to compile the Flink 1.7 source code, I run into the following error. I am not very familiar with Scala, so I am not sure whether it is caused by a version problem. Also, can the Scala modules be excluded from the build?
> Local Scala version: 2.13.0
>JDK version: 1.8.0_91
>[ERROR] Failed to execute goal 
>org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) 
>on project flink-scala_2.11: Compilation failure: Compilation failure:
>[ERROR] 
>/Users/gang.gou/work/git/github/flink/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java:[67,44]
> incompatible types: 
>cannot infer type arguments for org.apache.flink.api.java.typeutils.runtime.EitherSerializer<>
>[ERROR] reason: no instance(s) of type variable(s) L,R,T,T exist 
>so that org.apache.flink.api.java.typeutils.runtime.EitherSerializer conforms to org.apache.flink.api.common.typeutils.TypeSerializer>
>[ERROR] 
>/Users/gang.gou/work/git/github/flink/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java:[78,86]
> incompatible types: 
>org.apache.flink.api.common.typeutils.TypeSerializer> cannot be converted to org.apache.flink.api.java.typeutils.runtime.EitherSerializer
>[ERROR] -> [Help 1]
>[ERROR] 
>[ERROR] To see the full stack trace of the errors, re-run Maven with the -e 
>switch.
>[ERROR] Re-run Maven using the -X switch to enable full debug logging.
>[ERROR] 
>[ERROR] For more information about the errors and possible solutions, please 
>read the following articles:
>[ERROR] [Help 1] 
>http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
>[ERROR] 
>[ERROR] After correcting the problems, you can resume the build with the 
>command
>[ERROR]   mvn  -rf :flink-scala_2.11
>
>
>Process finished with exit code 1
>
>
>
>
>--
>Best Wishes
>   Galen.K


Re: Kafka ProducerFencedException after checkpointing

2019-08-12 Thread Piotr Nowojski
Hi,

Yes, if it’s due to transaction timeout you will lose the data.

Whether you can fall back to at-least-once depends on Kafka, not on Flink, 
since it’s Kafka that times out those transactions and I don’t see in the 
documentation anything that could override this [1]. You might try disabling 
the mechanism via setting 
`transaction.abort.timed.out.transaction.cleanup.interval.ms` or 
`transaction.remove.expired.transaction.cleanup.interval.ms`, but that’s a 
question more for the Kafka guys. Maybe Becket could help with this.

Also it MIGHT be that Kafka doesn’t remove records from the topics when 
aborting the transaction, and MAYBE you can still access them via 
“READ_UNCOMMITTED” mode. But that’s, again, a question for Kafka. 

Sorry that I can not help more.

If you do not care about exactly once, why don’t you just set the connector to 
at least once mode?

Piotrek

> On 12 Aug 2019, at 06:29, Tony Wei  wrote:
> 
> Hi,
> 
> I had the same exception recently. I want to confirm that, if it is due to a 
> transaction timeout,
> then I will lose that data. Am I right? Can I make it fall back to 
> at-least-once semantics in
> this situation?
> 
> Best,
> Tony Wei
> 
> Piotr Nowojski <pi...@data-artisans.com> wrote on Wed, 21 Mar 2018 at 22:28:
> Hi,
> 
> But that’s exactly the case: producer’s transaction timeout starts when the 
> external transaction starts - but FlinkKafkaProducer011 keeps an active Kafka 
> transaction for the whole period between checkpoints.
> 
> As I wrote in the previous message:
> 
> > in case of failure, your timeout must also be able to cover the additional 
> > downtime required for the successful job restart. Thus you should increase 
> > your timeout accordingly.
> 
> I think that a 15-minute timeout is way too small a value. If your job fails 
> because of some intermittent failure (for example a worker crash/restart), you 
> will only have a couple of minutes for a successful Flink job restart. 
> Otherwise you will lose some data (because of the transaction timeouts).
> 
> Piotrek
> 
>> On 21 Mar 2018, at 10:30, Dongwon Kim wrote:
>> 
>> Hi Piotr,
>> 
>> Now my streaming pipeline is working without retries. 
>> I decreased Flink's checkpoint interval from 15min to 10min as you suggested 
>> [see screenshot_10min_ckpt.png].
>> 
>> I thought that the producer's transaction timeout starts when the external 
>> transaction starts.
>> The truth is that the producer's transaction timeout starts after the last 
>> external checkpoint is committed.
>> Now that I have 15min for Producer's transaction timeout and 10min for 
>> Flink's checkpoint interval, and every checkpoint takes less than 5 minutes, 
>> everything is working fine.
>> Am I right?
>> 
>> Anyway thank you very much for the detailed explanation!
>> 
>> Best,
>> 
>> Dongwon
>> 
>> 
>> 
>> On Tue, Mar 20, 2018 at 8:10 PM, Piotr Nowojski wrote:
>> Hi,
>> 
>> Please increase transaction.timeout.ms to a greater value or decrease 
>> Flink’s checkpoint interval; I’m pretty sure the issue here is that those 
>> two values are overlapping. I think that’s even visible on the screenshots: 
>> the first completed checkpoint started at 14:28:48 and ended at 14:30:43, 
>> while the second one started at 14:45:53 and ended at 14:49:16. That gives 
>> you a minimal transaction duration of 15 minutes and 10 seconds, and a 
>> maximal transaction duration of 21 minutes.
>> 
>> In the HAPPY SCENARIO (without any failure and restarting), you should assume 
>> that your timeout interval covers, with some safety margin, the period 
>> between the start of a checkpoint and the end of the NEXT checkpoint, since 
>> this is the upper bound on how long the transaction might be used. In your 
>> case at least ~25 minutes.
>> 
>> On top of that, as described in the docs, 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance,
>> in case of failure, your timeout must also be able to cover the 
>> additional downtime required for the successful job restart. Thus you should 
>> increase your timeout accordingly. 
>> 
>> Piotrek
>> 
>> 
>>> On 20 Mar 2018, at 11:58, Dongwon Kim wrote:
>>> 
>>> Hi Piotr,
>>> 
>>> We have set producer's [transaction.timeout.ms] to 15 minutes and have used 
>>> the default setting for the broker (15 mins).
>>> As Flink's checkpoint interval is 15 minutes, it is not a situation where 
>>> Kafka's timeout is smaller than Flink's checkpoint interval.
>>> As our first checkpoint just takes 2 minutes, it seems like the transaction 
>>> is not committed properly.
>>> 
>>> Best,
>>> 
>>> - Dongwon
>>> 
>>> 
>>> 
>>> 
>>> 
>>> On Tue, Mar 20, 2018 at 6:32 PM, Piotr Nowojski wrote:

Scylla connector

2019-08-12 Thread Lian Jiang
Hi,

I am new to Flink. Is there a Scylla connector equivalent to the Cassandra
connector:
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/connectors/cassandra.html?
Or can Flink use Scylla as a sink via the Cassandra connector? Thanks.