Re: EXTERNAL: Re: Should I use a Sink or Connector? Or Both?

2020-03-04 Thread Arvid Heise
Hi Fernando,

How much data are you trying to write? If you just use single messages for
testing, it could be that the default bulk settings are not working well.

If so, could you please adjust the following settings and report back?

public enum SinkOption {
   BULK_FLUSH_MAX_ACTIONS,
   BULK_FLUSH_MAX_SIZE,
   BULK_FLUSH_INTERVAL
}
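
With the Table API descriptor you are using further down, these options map to
the bulkFlush* methods of the Elasticsearch descriptor. A minimal sketch of
what I mean (method names from the 1.9/1.10 descriptor API, so please
double-check them against your version; the values are only examples for
testing with single messages):

new Elasticsearch()
    .version("7")
    .host("your-es-host", 9200, "http")
    .index("transfers-sum")
    .documentType("_doc")
    .bulkFlushMaxActions(1)     // flush after every single action while testing
    .bulkFlushInterval(1000L)   // or flush at least once per second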


On Wed, Mar 4, 2020 at 3:05 PM Castro, Fernando C. <
fernando.cas...@leidos.com> wrote:

> Thank you guys. So I have no idea of why data is not being pushed to
> Elasticsearch… ☹
>
>
>
> My complete code is at
> https://stackoverflow.com/questions/60512064/flink-is-not-adding-any-data-to-elasticsearch-but-no-errors
>
> Btw, for some reason I still need to pass .documentType to the
> Elasticsearch connection descriptor (getting it from
> org.apache.flink.table.descriptors.Elasticsearch), when Elasticsearch 7
> doesn’t do types anymore.
>
>
>
> In case you can’t access stackoverflow for some reason, here is the code
> below too:
>
>
>
>
> /* This Scala source file was generated by the Gradle 'init' task. */
> package flinkNamePull
>
> import java.time.LocalDateTime
> import java.util.Properties
>
> import org.apache.flink.api.common.serialization.SimpleStringSchema
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
> import org.apache.flink.api.common.functions.RichMapFunction
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.table.api.{DataTypes, Table}
> import org.apache.flink.table.api.scala.StreamTableEnvironment
> import org.apache.flink.table.descriptors.{Elasticsearch, Json, Schema}
> import org.apache.flink.types.Row
>
> object Demo {
>
>   /** MapFunction to generate Transfers POJOs from parsed CSV data. */
>   class TransfersMapper extends RichMapFunction[String, Transfers] {
>     private var formatter = null
>
>     @throws[Exception]
>     override def open(parameters: Configuration): Unit = {
>       super.open(parameters)
>       //formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
>     }
>
>     @throws[Exception]
>     override def map(csvLine: String): Transfers = {
>       //var splitCsv = csvLine.stripLineEnd.split("\n")(1).split(",")
>       var splitCsv = csvLine.stripLineEnd.split(",")
>
>       val arrLength = splitCsv.length
>       val i = 0
>       if (arrLength != 13) {
>         for (i <- arrLength + 1 to 13) {
>           if (i == 13) {
>             splitCsv = splitCsv :+ "0.0"
>           } else {
>             splitCsv = splitCsv :+ ""
>           }
>         }
>       }
>       var trans = new Transfers()
>       trans.rowId = splitCsv(0)
>       trans.subjectId = splitCsv(1)
>       trans.hadmId = splitCsv(2)
>       trans.icuStayId = splitCsv(3)
>       trans.dbSource = splitCsv(4)
>       trans.eventType = splitCsv(5)
>       trans.prev_careUnit = splitCsv(6)
>       trans.curr_careUnit = splitCsv(7)
>       trans.prev_wardId = splitCsv(8)
>       trans.curr_wardId = splitCsv(9)
>       trans.inTime = splitCsv(10)
>       trans.outTime = splitCsv(11)
>       trans.los = splitCsv(12).toDouble
>
>       return trans
>     }
>   }
>
>   def main(args: Array[String]) {
>     // Create streaming execution environment
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.setParallelism(1)
>
>     // Set properties per KafkaConsumer API
>     val properties = new Properties()
>     properties.setProperty("bootstrap.servers", "kafka.kafka:9092")
>     properties.setProperty("group.id", "test")
>
>     // Add Kafka source to environment
>     val myKConsumer = new FlinkKafkaConsumer010[String]("raw.data3", new SimpleStringSchema(), properties)
>     // Read from beginning of topic
>     myKConsumer.setStartFromEarliest()
>
>     val streamSource = env
>       .addSource(myKConsumer)
>
>     // Transform CSV into a Transfers object
>     val streamTransfers = streamSource.map(new TransfersMapper())
>
>     // create a TableEnvironment
>     val tEnv = StreamTableEnvironment.create(env)
>
>     // register a Table
>     val tblTransfers: Table = tEnv.fromDataStream(streamTransfers)
>     tEnv.createTemporaryView("transfers", tblTransfers)
>
>     tEnv.connect(
>       new Elasticsearch()
>         .version("7")
>         .host("elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local",
>           9200, "http") // required: one or more Elasticsearch hosts to connect to
>         .index("transfers-sum")
>         .documentType("_doc") // not sure why this is still needed for ES7
>         .keyNullLiteral("n/a")
>     )
>       .withFormat(new Json().jsonSchema("{type: 'object', properties: {curr_careUnit: {type: 

Re: Teradata as JDBC Connection

2020-03-04 Thread Arvid Heise
Hi Norm,

the error message already points to the main issue: your property names are
not correct.





Unsupported property keys: drivername, update-mode, password, dburl, username

You should use the builder to properly configure the sink [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#jdbcappendtablesink
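
A rough sketch of what the builder-based approach could look like in your case
(the field names f0/f1/f2 and their types are taken from the properties listed
in your log below; the target table name, INSERT statement and batch size are
made-up examples):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;

JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
    .setDrivername("com.teradata.jdbc.TeraDriver")
    .setDBUrl("jdbc:teradata://myserver/database=mydb,LOGMECH=LDAP")
    .setUsername("...")
    .setPassword("...")
    .setQuery("INSERT INTO mytable (f0, f1, f2) VALUES (?, ?, ?)")
    .setParameterTypes(Types.STRING, Types.STRING, Types.STRING)
    .setBatchSize(100)
    .build();

tableEnv.registerTableSink(
    "teradataSink",
    new String[]{"f0", "f1", "f2"},
    new TypeInformation[]{Types.STRING, Types.STRING, Types.STRING},
    sink);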

On Thu, Mar 5, 2020 at 12:58 AM Norm Vilmer (Contractor) <
norm.vil...@wnco.com> wrote:

> Same error with this change:
>
> public class Teradata extends ConnectorDescriptor {
> /**
>  * Constructs a {@link ConnectorDescriptor}.
>  */
> public Teradata() {
> super("jdbc", 1, false);
> }
>
> @Override
> protected Map<String, String> toConnectorProperties() {
> Map<String, String> map = new HashMap<>();
> map.put(JDBCValidator.CONNECTOR_DRIVER,
> "com.teradata.jdbc.TeraDriver");
> map.put(JDBCValidator.CONNECTOR_URL,
> "jdbc:teradata://myserver/database=mydb,LOGMECH=LDAP");
> map.put(JDBCValidator.CONNECTOR_USERNAME, "...");
> map.put(JDBCValidator.CONNECTOR_PASSWORD, "...!");
> return map;
> }
> }
>
> -Original Message-
> From: Norm Vilmer (Contractor) 
> Sent: Wednesday, March 4, 2020 10:37 AM
> To: user@flink.apache.org
> Subject: EXTERNAL - Teradata as JDBC Connection
>
>
> Using Flink 1.10 and coding in Java 11, is it possible use to write to
> Teradata in append mode? MySQL, PostgreSQL, and Derby are the only
> supported drivers listed. Thanks.
>
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#connectors
>
> I created the ConnectorDescriptor below and am using it from
> tableEnvironment.connect() but get the exception shown below.
>
> public class Teradata extends ConnectorDescriptor {
> /**
>  * Constructs a {@link ConnectorDescriptor}.
>  */
> public Teradata() {
> super("jdbc", 1, false);
> }
>
> @Override
> protected Map<String, String> toConnectorProperties() {
> Map<String, String> map = new HashMap<>();
> map.put("Drivername", "com.teradata.jdbc.TeraDriver");
> map.put("DBUrl",
> "jdbc:teradata://myserver/database=mydb,LOGMECH=LDAP");
> map.put("Username", "...");
> map.put("Password", "...");
> return map;
> }
> }
>
> org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find
> a suitable table factory for
> 'org.apache.flink.table.factories.TableSinkFactory' in the classpath.
>
> Reason: No factory supports all properties.
>
> The matching candidates:
> org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
> Unsupported property keys:
> drivername
> update-mode
> password
> dburl
> username
>
> The following properties are requested:
> connector.property-version=1
> connector.type=jdbc
> dburl=jdbc:teradata://myserver/database=mydb,LOGMECH=LDAP
> drivername=com.teradata.jdbc.TeraDriver
> password=xxx
> schema.0.data-type=VARCHAR(1)
> schema.0.name=f0
> schema.1.data-type=VARCHAR(1)
> schema.1.name=f1
> schema.2.data-type=VARCHAR(1)
> schema.2.name=f2
> update-mode=append
> username=xxx
>
> The following factories have been considered:
> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
> org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
> org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceSinkFactory
> org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactory
> org.apache.flink.streaming.connectors.kafka.Kafka09TableSourceSinkFactory
>

Re: Rocksdb Serialization issue

2020-03-04 Thread Arvid Heise
Hi David,

the obvious reason is that your state stored an enum value that is not
present anymore. It tries to deserialize the 512th entry of your enum, which
is not available.

However, since it's highly unlikely that you actually have that many enum
values in the same enum class, we are actually looking at a corrupt stream,
which is hard to fix. Could you describe which state you have?

Did you upgrade Flink or your application? If it's Flink, it's a bug. If it's
the application, it may be that the state is incompatible and would need to be
migrated.

Did you restart from checkpoint or savepoint?

On Thu, Mar 5, 2020 at 1:14 AM David Morin 
wrote:

> Hello,
>
> I have this Exception in my datastream app and I can't find the root cause.
> I consume data from Kafka and it fails when I try to get a value from my
> MapState in RocksDB.
> It was working in previous release of my app but I can't find the cause of
> this error.
>
> java.lang.ArrayIndexOutOfBoundsException: 512
> at
> org.apache.flink.api.common.typeutils.base.EnumSerializer.deserialize(EnumSerializer.java:130)
> at
> org.apache.flink.api.common.typeutils.base.EnumSerializer.deserialize(EnumSerializer.java:50)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:143)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)
> at
> org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserValue(RocksDBMapState.java:344)
> at
> org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:123)
> at
> org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
> ..
>
> Flink version: 1.9.2
>
>
>


Re: How to use self defined json format when create table from kafka stream?

2020-03-04 Thread Jark Wu
Hi Lei,

Currently, Flink SQL doesn't support registering a binlog format (i.e.
defining just "order_id" and "order_no" while the json schema has other
binlog fields).
This is exactly what we want to support in FLIP-105 [1] and FLIP-95.

For now, if you want to consume such json data, you have to define the full
schema, e.g. "type", "timestamp", and so on...

Btw, what Change Data Capture (CDC) tool are you using?

Best,
Jark

[1]:
https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#
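
Just to illustrate what defining the full schema could look like for your
sample message (the nested types below are a guess on my side -- in particular
last_value/value carry both numbers and strings in your example, so you may
need to adjust those two types):

tEnv.sqlUpdate("CREATE TABLE order_binlog (\n"
    + "  `type` VARCHAR,\n"
    + "  `timestamp` BIGINT,\n"
    + "  binlog_filename VARCHAR,\n"
    + "  binlog_position BIGINT,\n"
    + "  `database` VARCHAR,\n"
    + "  table_name VARCHAR,\n"
    + "  table_id BIGINT,\n"
    + "  columns ARRAY<ROW<id INT, name VARCHAR, column_type INT, "
    + "`last_value` VARCHAR, `value` VARCHAR>>\n"
    + ") WITH (\n"
    + "  'connector.type' = 'kafka',\n"
    + "  ...\n"
    + "  'update-mode' = 'append',\n"
    + "  'format.type' = 'json',\n"
    + "  'format.derive-schema' = 'true'\n"
    + ")");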


On Thu, 5 Mar 2020 at 11:40, wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

>
> I want to register a table from mysql binlog like this:
>
> tEnv.sqlUpdate("CREATE TABLE order(\n"
> + "order_id BIGINT,\n"
> + "order_no VARCHAR,\n"
> + ") WITH (\n"
> + "'connector.type' = 'kafka',\n"
> ...
> + "'update-mode' = 'append',\n"
> + "'format.type' = 'json',\n"
> + "'format.derive-schema' = 'true'\n"
> + ")");
>
> using the following log format:
>
> {
>   "type" : "update",
>   "timestamp" : 1583373066000,
>   "binlog_filename" : "mysql-bin.000453",
>   "binlog_position" : 923020943,
>   "database" : "wms",
>   "table_name" : "t_pick_order",
>   "table_id" : 131936,
>   "columns" : [ {
> "id" : 1,
> "name" : "order_id",
> "column_type" : -5,
> "last_value" : 4606458,
> "value" : 4606458
>   }, {
> "id" : 2,
> "name" : "order_no",
> "column_type" : 12,
> "last_value" : "EDBMFSJ1S2003050006628",
> "value" : "EDBMFSJ1S2003050006628"
>   }]
> }
>
>
> Surely 'format.type' = 'json' will not parse the result as I expected.
> Is there any way I can implement this? For example, using a self-defined
> format class.
>
> Thanks,
> Lei
>
> --
> wangl...@geekplus.com.cn
>
>
>


Re: History server UI not working

2020-03-04 Thread Yang Wang
If all the REST API endpoints can be viewed successfully, then the reason may
be a stale JavaScript cache.
You could try to force a refresh (e.g. Cmd+Shift+R on Mac). It solved my
problem before.


Best,
Yang

pwestermann wrote on Wednesday, March 4, 2020 at 8:40 PM:

> We recently upgraded from Flink 1.7 to Flink 1.9.2 and the history server
> UI
> now seems to be broken. It doesn't load and always just displays a blank
> screen.
> The individual endpoints (e.g. /jobs/overview) still work.
> Could this be an issue caused by the Angular update for the regular UI?
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Hive Source With Kerberos认证问题

2020-03-04 Thread Rui Li
Could you first try the doAs approach, e.g. do the part that registers the
HiveCatalog inside UserGroupInformation.getLoginUser().doAs(), to check
whether the HiveMetaStoreClient is simply not picking up the credentials of
your logged-in user.
Also, is your Hive version really 2.1.1? The stack trace does not match the
2.1.1 code, e.g. line 562 of HiveMetaStoreClient.java:
https://github.com/apache/hive/blob/rel/release-2.1.1/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java#L562
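
A minimal sketch of what I mean by the doAs approach (the catalog name,
default database, hiveConfDir variable and Hive version are placeholders --
adjust them to your setup):

import java.security.PrivilegedExceptionAction;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.hadoop.security.UserGroupInformation;

// log in from the keytab as you already do
UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
UserGroupInformation ugi = UserGroupInformation.getLoginUser();

// register and use the HiveCatalog as the logged-in user
ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
    HiveCatalog hive = new HiveCatalog("myhive", "default", hiveConfDir, "2.1.1");
    tableEnv.registerCatalog("myhive", hive);
    tableEnv.useCatalog("myhive");
    return null;
});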

On Wed, Mar 4, 2020 at 9:17 PM 叶贤勋  wrote:

> Hi,
> The datanucleus jar issue has been resolved; previously the HMS was
> probably not being accessed through hive.metastore.uris.
> I do the Kerberos login inside HiveCatalog's open() method,
> UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
> and the login succeeds. In theory, once the Kerberos login succeeds, this
> process should have permission to access the metastore. However, the
> following error is reported when creating the metastore client.
>
> 2020-03-04 20:23:17,191 DEBUG
> org.apache.flink.table.catalog.hive.HiveCatalog   - Hive
> MetaStore Uris is thrift://***1:9083,thrift://***2:9083.
> 2020-03-04 20:23:17,192 INFO
>  org.apache.flink.table.catalog.hive.HiveCatalog   - Created
> HiveCatalog 'myhive'
> 2020-03-04 20:23:17,360 INFO
>  org.apache.hadoop.security.UserGroupInformation   - Login
> successful for user ***/dev@***.COM using keytab file
> /Users/yexianxun/IdeaProjects/flink-1.9.0/build-target/examples/hive/kerberos/key.keytab
> 2020-03-04 20:23:17,360 DEBUG
> org.apache.flink.table.catalog.hive.HiveCatalog   - login user
> by kerberos, principal is ***/dev@***.CO, login is true
> 2020-03-04 20:23:17,374 INFO
>  org.apache.curator.framework.imps.CuratorFrameworkImpl- Starting
> 2020-03-04 20:23:17,374 DEBUG org.apache.curator.CuratorZookeeperClient
>   - Starting
> 2020-03-04 20:23:17,374 DEBUG org.apache.curator.ConnectionState
>  - Starting
> 2020-03-04 20:23:17,374 DEBUG org.apache.curator.ConnectionState
>  - reset
> 2020-03-04 20:23:17,374 INFO  org.apache.zookeeper.ZooKeeper
>  - Initiating client connection,
> connectString=***1:2181,***2:2181,***3:2181 sessionTimeout=6
> watcher=org.apache.curator.ConnectionState@6b52dd31
> 2020-03-04 20:23:17,379 DEBUG
> org.apache.zookeeper.client.ZooKeeperSaslClient   - JAAS
> loginContext is: HiveZooKeeperClient
> 2020-03-04 20:23:17,381 WARN  org.apache.zookeeper.ClientCnxn
>   - SASL configuration failed:
> javax.security.auth.login.LoginException: Unable to obtain password from
> user
>  Will continue connection to Zookeeper server without SASL authentication,
> if Zookeeper server allows it.
> 2020-03-04 20:23:17,381 INFO  org.apache.zookeeper.ClientCnxn
>   - Opening socket connection to server ***1:2181
> 2020-03-04 20:23:17,381 ERROR org.apache.curator.ConnectionState
>  - Authentication failed
> 2020-03-04 20:23:17,384 INFO  org.apache.zookeeper.ClientCnxn
>   - Socket connection established to ***1:2181, initiating
> session
> 2020-03-04 20:23:17,384 DEBUG org.apache.zookeeper.ClientCnxn
>   - Session establishment request sent on ***1:2181
> 2020-03-04 20:23:17,393 INFO  org.apache.zookeeper.ClientCnxn
>   - Session establishment complete on server ***1:2181,
> sessionid = 0x16f7af0645c25a8, negotiated timeout = 4
> 2020-03-04 20:23:17,393 INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State
> change: CONNECTED
> 2020-03-04 20:23:17,397 DEBUG org.apache.zookeeper.ClientCnxn
>   - Reading reply sessionid:0x16f7af0645c25a8, packet::
> clientPath:null serverPath:null finished:false header:: 1,3  replyHeader::
> 1,292064345364,0  request:: '/hive_base,F  response::
> s{17179869635,17179869635,1527576303010,1527576303010,0,3,0,0,0,1,249117832596}
> 2020-03-04 20:23:17,400 DEBUG org.apache.zookeeper.ClientCnxn
>   - Reading reply sessionid:0x16f7af0645c25a8, packet::
> clientPath:null serverPath:null finished:false header:: 2,12  replyHeader::
> 2,292064345364,0  request:: '/hive_base/namespaces/hive/uris,F  response::
> v{'dGhyaWZ0Oi8vaHphZGctYmRtcy03LnNlcnZlci4xNjMub3JnOjkwODM=,'dGhyaWZ0Oi8vaHphZGctYmRtcy04LnNlcnZlci4xNjMub3JnOjkwODM=},s{17179869664,17179869664,1527576306106,1527576306106,0,1106,0,0,0,2,292063632993}
> 2020-03-04 20:23:17,401 INFO  hive.metastore
>  - atlasProxy is set to
> 2020-03-04 20:23:17,401 INFO  hive.metastore
>  - Trying to connect to metastore with URI thrift://
> hzadg-bdms-7.server.163.org:9083
> 2020-03-04 20:23:17,408 INFO  hive.metastore
>  - tokenStrForm should not be null for querynull
> 2020-03-04 20:23:17,432 DEBUG org.apache.thrift.transport.TSaslTransport
>  - opening transport
> org.apache.thrift.transport.TSaslClientTransport@3c69362a
> 2020-03-04 20:23:17,441 ERROR org.apache.thrift.transport.TSaslTransport
>  - SASL negotiation failure
> javax.security.sasl.SaslException: GSS initiate failed [Caused by
> GSSException: No valid credentials provided (Mechanism level: Failed to
> find any Kerberos tgt)]
>   at
> 

Re: How to test flink job recover from checkpoint

2020-03-04 Thread Eleanore Jin
Hi Zhu Zhu and Abhinav,

I am able to verify the recovery from checkpoint based on your suggestions,
thanks a lot for the help!
Eleanore

On Wed, Mar 4, 2020 at 5:40 PM Bajaj, Abhinav 
wrote:

> I implemented a custom function that throws up a runtime exception.
>
>
>
> You can extend from simpler MapFunction or more complicated
> RichParallelSourceFunction depending on your use case.
>
> You can add logic to throw a runtime exception on a certain condition in
> the map or run method.
>
> You can use a count or timer to trigger the exception.
>
>
>
> Sharing a quick handwritten example.
>
>
>
> DataStream<String> stream = ...;
>
> DataStream<String> mappedStream = stream.map(new MapFunction<String, String>() {
>   @Override
>   public String map(String value) throws Exception {
>     if (SOME_CONDITION) {
>       throw new RuntimeException("Lets test checkpointing");
>     }
>     return value;
>   }
> });
>
>
>
> ~ Abhinav Bajaj
>
>
>
>
>
> *From: *Eleanore Jin 
> *Date: *Wednesday, March 4, 2020 at 4:40 PM
> *To: *user , user-zh 
> *Subject: *How to test flink job recover from checkpoint
>
>
>
> Hi,
>
>
>
> I have a flink application and checkpoint is enabled, I am running locally
> using miniCluster.
>
>
>
> I just wonder if there is a way to simulate the failure, and verify that
> flink job restarts from checkpoint?
>
>
>
> Thanks a lot!
>
> Eleanore
>


Re: Question about runtime filter

2020-03-04 Thread Jingsong Li
Great exploration. And thanks for your information.
I believe you have a deep understanding of Flink's internal mechanism.

Best,
Jingsong Lee

On Thu, Mar 5, 2020 at 12:09 PM faaron zheng  wrote:

> I finally got the runtime filter working in 1.10. The reason why it didn't
> call the commit method is in OperatorCodeGenerator: it should call the
> endInput() method correctly in generateOneInputStreamOperator. A complete
> runtime filter process is: 1. Add the insert and remove rules to the batch
> rules. 2. The build side constructs the bloom filter and commits it. 3. The
> JobManager merges the bloom filters into a global one. 4. The probe side
> gets the global bloom filter and filters data. Although the runtime filter
> is already implemented in Blink, it doesn't have an independent commit, so
> it's a little hard to merge the whole code at once. I hope this helps if
> anyone tries to do the same thing.
>
faaron zheng wrote on Monday, March 2, 2020 at 7:52 PM:
>
>> I set sql.exec.runtime-filter.wait to true. HiveTableSource take much
>> longer time but get same result. I think the reason is not commit
>> preAggregateAccumulator. But I dont know why it happens?
>>
JingsongLee wrote on Monday, March 2, 2020 at 3:22 PM:
>>
>>> Hi,
>>>
>>> Does runtime filter probe side wait for building runtime filter?
>>> Can you check the start time of build side and probe side?
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> --
>>> From:faaron zheng 
>>> Send Time:2020年3月2日(星期一) 14:55
>>> To:user 
>>> Subject:Question about runtime filter
>>>
>>> Hi, everyone
>>>
>>> These days, I am trying to implement runtime filter in flink1.10 with
>>> flink-sql-benchmark  according to blink. I mainly change three part of
>>> flink code: add runtime filter rule; modify the code gen and bloomfilter;
>>> add some aggregatedaccumulator  methods according to accumulator. Now, It
>>> seems runtime filter works in execution graph as follows:
>>> Source: HiveTableSource(i_item_sk, i_item_id, i_rec_start_date,
>>> i_rec_end_date, i_item_desc, i_current_price, i_wholesale_cost, i_brand_id,
>>> i_brand, i_class_id, i_class, i_category_id, i_category, i_manufact_id,
>>> i_manufact, i_size, i_formulation, i_color, i_units, i_container,
>>> i_manager_id, i_product_name) TablePath: tpcds_bin_orc_2.item,
>>> PartitionPruned: false, PartitionNums: null, ProjectedFields: [0, 10, 12]
>>> -> Calc(select=[i_item_sk], where=[((i_category =
>>> _UTF-16LE'Jewelry':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND
>>> (i_class = _UTF-16LE'consignment':VARCHAR(2147483647) CHARACTER SET
>>> "UTF-16LE") AND RUNTIME_FILTER_BUILDER_0(i_item_sk))])
>>>
>>> and
>>>
>>> Source: HiveTableSource(d_date_sk, d_date_id, d_date, d_month_seq,
>>> d_week_seq, d_quarter_seq, d_year, d_dow, d_moy, d_dom, d_qoy, d_fy_year,
>>> d_fy_quarter_seq, d_fy_week_seq, d_day_name, d_quarter_name, d_holiday,
>>> d_weekend, d_following_holiday, d_first_dom, d_last_dom, d_same_day_ly,
>>> d_same_day_lq, d_current_day, d_current_week, d_current_month,
>>> d_current_quarter, d_current_year) TablePath: tpcds_bin_orc_2.date_dim,
>>> PartitionPruned: false, PartitionNums: null, ProjectedFields: [0, 3] ->
>>> Calc(select=[d_date_sk, d_month_seq], where=[RUNTIME_FILTER_2(d_date_sk)])
>>>
>>>
>>> However,the number of records sent is the same as normal.  Anyone who
>>> can give me some advices?
>>>
>>>
>>>
>>> Thanks
>>>
>>>

-- 
Best, Jingsong Lee


Re: Question about runtime filter

2020-03-04 Thread faaron zheng
I finally got the runtime filter working in 1.10. The reason why it didn't
call the commit method is in OperatorCodeGenerator: it should call the
endInput() method correctly in generateOneInputStreamOperator. A complete
runtime filter process is: 1. Add the insert and remove rules to the batch
rules. 2. The build side constructs the bloom filter and commits it. 3. The
JobManager merges the bloom filters into a global one. 4. The probe side gets
the global bloom filter and filters data. Although the runtime filter is
already implemented in Blink, it doesn't have an independent commit, so it's
a little hard to merge the whole code at once. I hope this helps if anyone
tries to do the same thing.

faaron zheng wrote on Monday, March 2, 2020 at 7:52 PM:

> I set sql.exec.runtime-filter.wait to true. HiveTableSource take much
> longer time but get same result. I think the reason is not commit
> preAggregateAccumulator. But I dont know why it happens?
>
JingsongLee wrote on Monday, March 2, 2020 at 3:22 PM:
>
>> Hi,
>>
>> Does runtime filter probe side wait for building runtime filter?
>> Can you check the start time of build side and probe side?
>>
>> Best,
>> Jingsong Lee
>>
>> --
>> From: faaron zheng
>> Send Time: Monday, March 2, 2020, 14:55
>> To:user 
>> Subject:Question about runtime filter
>>
>> Hi, everyone
>>
>> These days, I am trying to implement runtime filter in flink1.10 with
>> flink-sql-benchmark  according to blink. I mainly change three part of
>> flink code: add runtime filter rule; modify the code gen and bloomfilter;
>> add some aggregatedaccumulator  methods according to accumulator. Now, It
>> seems runtime filter works in execution graph as follows:
>> Source: HiveTableSource(i_item_sk, i_item_id, i_rec_start_date,
>> i_rec_end_date, i_item_desc, i_current_price, i_wholesale_cost, i_brand_id,
>> i_brand, i_class_id, i_class, i_category_id, i_category, i_manufact_id,
>> i_manufact, i_size, i_formulation, i_color, i_units, i_container,
>> i_manager_id, i_product_name) TablePath: tpcds_bin_orc_2.item,
>> PartitionPruned: false, PartitionNums: null, ProjectedFields: [0, 10, 12]
>> -> Calc(select=[i_item_sk], where=[((i_category =
>> _UTF-16LE'Jewelry':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND
>> (i_class = _UTF-16LE'consignment':VARCHAR(2147483647) CHARACTER SET
>> "UTF-16LE") AND RUNTIME_FILTER_BUILDER_0(i_item_sk))])
>>
>> and
>>
>> Source: HiveTableSource(d_date_sk, d_date_id, d_date, d_month_seq,
>> d_week_seq, d_quarter_seq, d_year, d_dow, d_moy, d_dom, d_qoy, d_fy_year,
>> d_fy_quarter_seq, d_fy_week_seq, d_day_name, d_quarter_name, d_holiday,
>> d_weekend, d_following_holiday, d_first_dom, d_last_dom, d_same_day_ly,
>> d_same_day_lq, d_current_day, d_current_week, d_current_month,
>> d_current_quarter, d_current_year) TablePath: tpcds_bin_orc_2.date_dim,
>> PartitionPruned: false, PartitionNums: null, ProjectedFields: [0, 3] ->
>> Calc(select=[d_date_sk, d_month_seq], where=[RUNTIME_FILTER_2(d_date_sk)])
>>
>>
>> However,the number of records sent is the same as normal.  Anyone who can
>> give me some advices?
>>
>>
>>
>> Thanks
>>
>>


How to use self defined json format when create table from kafka stream?

2020-03-04 Thread wangl...@geekplus.com.cn

I want to register a table from mysql binlog like this: 
tEnv.sqlUpdate("CREATE TABLE order(\n"
+ "order_id BIGINT,\n"
+ "order_no VARCHAR,\n"
+ ") WITH (\n"
+ "'connector.type' = 'kafka',\n"
...
+ "'update-mode' = 'append',\n"
+ "'format.type' = 'json',\n"
+ "'format.derive-schema' = 'true'\n"
+ ")");using the following log format: 
{
  "type" : "update",
  "timestamp" : 1583373066000,
  "binlog_filename" : "mysql-bin.000453",
  "binlog_position" : 923020943,
  "database" : "wms",
  "table_name" : "t_pick_order",
  "table_id" : 131936,
  "columns" : [ {
"id" : 1,
"name" : "order_id",
"column_type" : -5,
"last_value" : 4606458,
"value" : 4606458
  }, {
"id" : 2,
"name" : "order_no",
"column_type" : 12,
"last_value" : "EDBMFSJ1S2003050006628",
"value" : "EDBMFSJ1S2003050006628"
  }]
}

Surely 'format.type' = 'json' will not parse the result as I expected.
Is there any way I can implement this? For example, using a self-defined
format class.

Thanks,
Lei



wangl...@geekplus.com.cn



Re: JobMaster does not register with ResourceManager in high availability setup

2020-03-04 Thread Xintong Song
Hi Abhinav,

Do you mind sharing the complete 'jobmanager.log'?

org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Cannot serve slot
> request, no ResourceManager connected.
>
Sometimes you see this log because the ResourceManager is not yet connected
when the slot request arrives at the SlotPool. If the ResourceManager is
connected later, the SlotPool will still send the pending slot requests; in
that case you should find logs for the SlotPool requesting slots from the
ResourceManager.

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate all requires slots within timeout of 300000 ms...
>
This error message simply means that the slot requests are not satisfied in
5min. Various reasons might cause this problem.

   - The ResourceManager is not connected at all.
   - The ResourceManager is connected, but some TaskExecutors are not
   registered due to the ZK problem.
   - ZK recovery takes too much time, so that even though JM, RM, and TMs are
   all able to connect to ZK, there might not be enough time to satisfy the
   slot request before the timeout.

It would need the complete 'jobmanager.log' (at least those from the job
restart to the NoResourceAvailableException) to find out which is the case.
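
As a side note: if it turns out that ZK recovery is simply slow rather than
broken, you can give the scheduler more time by raising the slot request
timeout in flink-conf.yaml. The key below should exist in 1.7, but please
verify it for your version; the value is only an example:

slot.request.timeout: 600000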

Thank you~

Xintong Song



On Thu, Mar 5, 2020 at 7:30 AM Bajaj, Abhinav 
wrote:

> While I setup to reproduce the issue with debug logs, I would like to
> share more information I noticed in INFO logs.
>
>
>
> Below is the sequence of events/exceptions I notice during the time
> zookeeper was disrupted.
>
> I apologize in advance as they are a bit verbose.
>
>
>
>- Zookeeper seems to be down and leader election is disrupted –
>
>
>
> · 2020-02-27 06:28:23.572 [Curator-ConnectionStateManager-0]
> level=WARN  o.a.f.runtime.leaderelection.ZooKeeperLeaderElectionService  -
> Connection to ZooKeeper suspended. The contender 
> akka.tcp://flink@FOO_BAR:6126/user/resourcemanager
> no longer participates in the leader election.
>
> · 2020-02-27 06:28:23.573 [Curator-ConnectionStateManager-0]
> level=INFO  org.apache.flink.runtime.jobmaster.JobManagerRunner  -
> JobManager for job FOO_BAR (5a910928a71b469a091be168b0e74722) was revoked
> leadership at akka.tcp://flink@ FOO_BAR:6126/user/jobmanager_1.
>
> · 2020-02-27 06:28:23.573
> [flink-akka.actor.default-dispatcher-9897] level=INFO
> o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
> ResourceManager akka.tcp://flink@ FOO_BAR:6126/user/resourcemanager was
> revoked leadership. Clearing fencing token.
>
> · 2020-02-27 06:28:23.574
> [flink-akka.actor.default-dispatcher-9897] level=INFO
> o.a.flink.runtime.resourcemanager.slotmanager.SlotManager  - Suspending
> the SlotManager.
>
> · 2020-02-27 06:28:53.577 [Curator-Framework-0] level=ERROR
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Fatal error
> occurred in the cluster entrypoint.
>
> org.apache.flink.runtime.dispatcher.DispatcherException: Received an error
> from the LeaderElectionService.
>
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.handleError(Dispatcher.java:941)
>
> at
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService.unhandledError(ZooKeeperLeaderElectionService.java:416)
>
> at
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl$6.apply(CuratorFrameworkImpl.java:576)
>
> at
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl$6.apply(CuratorFrameworkImpl.java:572)
>
> at
> org.apache.flink.shaded.curator.org.apache.curator.framework.listen.ListenerContainer$1.run(ListenerContainer.java:93)
>
> at
> org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:297)
>
> at
> org.apache.flink.shaded.curator.org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:85)
>
> at
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.logError(CuratorFrameworkImpl.java:571)
>
> at
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:740)
>
> at
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:857)
>
> at
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
>
> at
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)
>
> at
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)
>
> at 

Re: How to test flink job recover from checkpoint

2020-03-04 Thread Bajaj, Abhinav
I implemented a custom function that throws up a runtime exception.

You can extend from simpler MapFunction or more complicated 
RichParallelSourceFunction depending on your use case.
You can add logic to throw a runtime exception on a certain condition in the 
map or run method.
You can use a count or timer to trigger the exception.

Sharing a quick handwritten example.

DataStream<String> stream = ...;
DataStream<String> mappedStream = stream.map(new MapFunction<String, String>() {
  @Override
  public String map(String value) throws Exception {
    if (SOME_CONDITION) {
      throw new RuntimeException("Lets test checkpointing");
    }
    return value;
  }
});

~ Abhinav Bajaj


From: Eleanore Jin 
Date: Wednesday, March 4, 2020 at 4:40 PM
To: user , user-zh 
Subject: How to test flink job recover from checkpoint

Hi,

I have a flink application and checkpoint is enabled, I am running locally 
using miniCluster.

I just wonder if there is a way to simulate the failure, and verify that flink 
job restarts from checkpoint?

Thanks a lot!
Eleanore


Re: How to test flink job recover from checkpoint

2020-03-04 Thread Zhu Zhu
Hi Eleanore,

You can change your application tasks to throw exceptions in a certain
frequency.
Alternatively, if the application has external dependencies (e.g. source),
you can trigger failures manually by manipulating the status of the
external service (e.g. shutdown the source service, or break the network
connection between the Flink app and the source service).

Thanks,
Zhu Zhu

Eleanore Jin wrote on Thursday, March 5, 2020 at 8:40 AM:

> Hi,
>
> I have a flink application and checkpoint is enabled, I am running locally
> using miniCluster.
>
> I just wonder if there is a way to simulate the failure, and verify that
> flink job restarts from checkpoint?
>
> Thanks a lot!
> Eleanore
>



Re: StreamingFileSink Not Flushing All Data

2020-03-04 Thread Austin Cawley-Edwards
Hey Kostas,

We’re a little bit off from a 1.10 update but I can certainly see if that
CompressWriterFactory might solve my use case for when we do.

If there is anything I can do to help document that feature, please let me
know.

Thanks!

Austin

On Wed, Mar 4, 2020 at 4:58 AM Kostas Kloudas  wrote:

> Hi Austin,
>
> I will have a look at your repo. In the meantime, given that [1] is
> already merged in 1.10,
> would upgrading to 1.10 and using the newly introduced
> CompressWriterFactory be an option for you?
>
> It is unfortunate that this feature was not documented.
>
> Cheers,
> Kostas
>
> [1] https://issues.apache.org/jira/browse/FLINK-13634
>
>
> On Tue, Mar 3, 2020 at 11:13 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hi all,
>>
>> Thanks for the docs pointer/ FLIP Rafi, and the workaround strategy
>> Kostas -- strange though, as I wasn't using a bounded source when I first
>> ran into this issue. I have updated the example repo to use an unbounded
>> source[1], and the same file corruption problems remain.
>>
>> Anything else I could be doing wrong with the compression stream?
>>
>> Thanks again,
>> Austin
>>
>> [1]:
>> https://github.com/austince/flink-streaming-file-sink-compression/tree/unbounded
>>
>> On Tue, Mar 3, 2020 at 3:50 AM Kostas Kloudas 
>> wrote:
>>
>>> Hi Austin and Rafi,
>>>
>>> @Rafi Thanks for providing the pointers!
>>> Unfortunately there is no progress on the FLIP (or the issue).
>>>
>>> @ Austin In the meantime, what you could do --assuming that your input
>>> is bounded --  you could simply not stop the job after the whole input is
>>> processed, then wait until the output is committed, and then cancel the
>>> job. I know and I agree that this is not an elegant solution but it is a
>>> temporary workaround.
>>>
>>> Hopefully the FLIP and related issue is going to be prioritised soon.
>>>
>>> Cheers,
>>> Kostas
>>>
>>> On Tue, Mar 3, 2020 at 8:04 AM Rafi Aroch  wrote:
>>>
 Hi,

 This happens because StreamingFileSink does not support a finite input
 stream.
 In the docs it's mentioned under "Important Considerations":

 [image: image.png]

 This behaviour often surprises users...

 There's a FLIP and an issue about fixing this. I'm not sure what's the
 status though, maybe Kostas can share.

 Thanks,
 Rafi


 On Mon, Mar 2, 2020 at 5:05 PM Austin Cawley-Edwards <
 austin.caw...@gmail.com> wrote:

> Hi Dawid and Kostas,
>
> Sorry for the late reply + thank you for the troubleshooting. I put
> together an example repo that reproduces the issue[1], because I did have
> checkpointing enabled in my previous case -- still must be doing something
> wrong with that config though.
>
> Thanks!
> Austin
>
> [1]: https://github.com/austince/flink-streaming-file-sink-compression
>
>
> On Mon, Feb 24, 2020 at 5:28 AM Kostas Kloudas 
> wrote:
>
>> Hi Austin,
>>
>> Dawid is correct in that you need to enable checkpointing for the
>> StreamingFileSink to work.
>>
>> I hope this solves the problem,
>> Kostas
>>
>> On Mon, Feb 24, 2020 at 11:08 AM Dawid Wysakowicz
>>  wrote:
>> >
>> > Hi Austing,
>> >
>> > If I am not mistaken the StreamingFileSink by default flushes on
>> checkpoints. If you don't have checkpoints enabled it might happen that 
>> not
>> all data is flushed.
>> >
>> > I think you can also adjust that behavior with:
>> >
>> > forBulkFormat(...)
>> >
>> > .withRollingPolicy(/* your custom logic */)
>> >
>> > I also cc Kostas who should be able to correct me if I am wrong.
>> >
>> > Best,
>> >
>> > Dawid
>> >
>> > On 22/02/2020 01:59, Austin Cawley-Edwards wrote:
>> >
>> > Hi there,
>> >
>> > Using Flink 1.9.1, trying to write .tgz files with the
>> StreamingFileSink#BulkWriter. It seems like flushing the output stream
>> doesn't flush all the data written. I've verified I can create valid 
>> files
>> using the same APIs and data on there own, so thinking it must be 
>> something
>> I'm doing wrong with the bulk format. I'm writing to the local 
>> filesystem,
>> with the `file://` protocol.
>> >
>> > For Tar/ Gzipping, I'm using the Apache Commons Compression
>> library, version 1.20.
>> >
>> > Here's a runnable example of the issue:
>> >
>> > import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
>> > import
>> org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
>> > import
>> org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
>> > import 

How to test flink job recover from checkpoint

2020-03-04 Thread Eleanore Jin
Hi,

I have a flink application and checkpoint is enabled, I am running locally
using miniCluster.

I just wonder if there is a way to simulate the failure, and verify that
flink job restarts from checkpoint?

Thanks a lot!
Eleanore



Rocksdb Serialization issue

2020-03-04 Thread David Morin
Hello,

I have this Exception in my datastream app and I can't find the root cause.
I consume data from Kafka and it fails when I try to get a value from my 
MapState in RocksDB.
It was working in previous release of my app but I can't find the cause of this 
error.

java.lang.ArrayIndexOutOfBoundsException: 512
at 
org.apache.flink.api.common.typeutils.base.EnumSerializer.deserialize(EnumSerializer.java:130)
at 
org.apache.flink.api.common.typeutils.base.EnumSerializer.deserialize(EnumSerializer.java:50)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:143)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)
at 
org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserValue(RocksDBMapState.java:344)
at 
org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:123)
at 
org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
..

Flink version: 1.9.2




RE: Teradata as JDBC Connection

2020-03-04 Thread Norm Vilmer (Contractor)
Same error with this change:

public class Teradata extends ConnectorDescriptor {
/**
 * Constructs a {@link ConnectorDescriptor}.
 */
public Teradata() {
super("jdbc", 1, false);
}

@Override
protected Map<String, String> toConnectorProperties() {
Map<String, String> map = new HashMap<>();
map.put(JDBCValidator.CONNECTOR_DRIVER, "com.teradata.jdbc.TeraDriver");
map.put(JDBCValidator.CONNECTOR_URL, 
"jdbc:teradata://myserver/database=mydb,LOGMECH=LDAP");
map.put(JDBCValidator.CONNECTOR_USERNAME, "...");
map.put(JDBCValidator.CONNECTOR_PASSWORD, "...!");
return map;
}
}

-Original Message-
From: Norm Vilmer (Contractor)  
Sent: Wednesday, March 4, 2020 10:37 AM
To: user@flink.apache.org
Subject: EXTERNAL - Teradata as JDBC Connection


Using Flink 1.10 and coding in Java 11, is it possible use to write to Teradata 
in append mode? MySQL, PostgreSQL, and Derby are the only supported drivers 
listed. Thanks.

https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#connectors

I created the ConnectorDescriptor below and am using it from 
tableEnvironment.connect() but get the exception shown below.

public class Teradata extends ConnectorDescriptor {
/**
 * Constructs a {@link ConnectorDescriptor}.
 */
public Teradata() {
super("jdbc", 1, false);
}

@Override
protected Map<String, String> toConnectorProperties() {
Map<String, String> map = new HashMap<>();
map.put("Drivername", "com.teradata.jdbc.TeraDriver");
map.put("DBUrl", "jdbc:teradata://myserver/database=mydb,LOGMECH=LDAP");
map.put("Username", "...");
map.put("Password", "...");
return map;
}
}

org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a 
suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' 
in the classpath.

Reason: No factory supports all properties.

The matching candidates:
org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
Unsupported property keys:
drivername
update-mode
password
dburl
username

The following properties are requested:
connector.property-version=1
connector.type=jdbc
dburl=jdbc:teradata://myserver/database=mydb,LOGMECH=LDAP
drivername=com.teradata.jdbc.TeraDriver
password=xxx
schema.0.data-type=VARCHAR(1)
schema.0.name=f0
schema.1.data-type=VARCHAR(1)
schema.1.name=f1
schema.2.data-type=VARCHAR(1)
schema.2.name=f2
update-mode=append
username=xxx

The following factories have been considered:
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceSinkFactory
org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactory
org.apache.flink.streaming.connectors.kafka.Kafka09TableSourceSinkFactory



Re: JobMaster does not register with ResourceManager in high availability setup

2020-03-04 Thread Bajaj, Abhinav
While I setup to reproduce the issue with debug logs, I would like to share 
more information I noticed in INFO logs.

Below is the sequence of events/exceptions I notice during the time zookeeper 
was disrupted.
I apologize in advance as they are a bit verbose.


  *   Zookeeper seems to be down and leader election is disrupted –



· 2020-02-27 06:28:23.572 [Curator-ConnectionStateManager-0] level=WARN 
 o.a.f.runtime.leaderelection.ZooKeeperLeaderElectionService  - Connection to 
ZooKeeper suspended. The contender 
akka.tcp://flink@FOO_BAR:6126/user/resourcemanager no longer participates in 
the leader election.

· 2020-02-27 06:28:23.573 [Curator-ConnectionStateManager-0] level=INFO 
 org.apache.flink.runtime.jobmaster.JobManagerRunner  - JobManager for job 
FOO_BAR (5a910928a71b469a091be168b0e74722) was revoked leadership at 
akka.tcp://flink@ FOO_BAR:6126/user/jobmanager_1.

· 2020-02-27 06:28:23.573 [flink-akka.actor.default-dispatcher-9897] 
level=INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - 
ResourceManager akka.tcp://flink@ FOO_BAR:6126/user/resourcemanager was revoked 
leadership. Clearing fencing token.

· 2020-02-27 06:28:23.574 [flink-akka.actor.default-dispatcher-9897] 
level=INFO  o.a.flink.runtime.resourcemanager.slotmanager.SlotManager  - 
Suspending the SlotManager.

· 2020-02-27 06:28:53.577 [Curator-Framework-0] level=ERROR 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Fatal error occurred 
in the cluster entrypoint.

org.apache.flink.runtime.dispatcher.DispatcherException: Received an error from 
the LeaderElectionService.

at 
org.apache.flink.runtime.dispatcher.Dispatcher.handleError(Dispatcher.java:941)

at 
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService.unhandledError(ZooKeeperLeaderElectionService.java:416)

at 
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl$6.apply(CuratorFrameworkImpl.java:576)

at 
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl$6.apply(CuratorFrameworkImpl.java:572)

at 
org.apache.flink.shaded.curator.org.apache.curator.framework.listen.ListenerContainer$1.run(ListenerContainer.java:93)

at 
org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:297)

at 
org.apache.flink.shaded.curator.org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:85)

at 
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.logError(CuratorFrameworkImpl.java:571)

at 
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:740)

at 
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:857)

at 
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)

at 
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)

at 
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)

at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)

at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.flink.util.FlinkException: Unhandled error in 
ZooKeeperLeaderElectionService: Background operation retry gave up

... 18 common frames omitted

Caused by: 
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException$ConnectionLossException:
 KeeperErrorCode = ConnectionLoss

at 
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException.create(KeeperException.java:99)

at 
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:728)

... 10 common frames omitted



  *   ClusterEntrypoint restarts and tries to connect to Zookeeper. It seems
it fails for some time but is able to connect later -



· 2020-02-27 06:28:56.467 [main] level=INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint  -  Starting 
StandaloneSessionClusterEntrypoint (Version: 1.7.1, Rev:89eafb4, 

Re: JobMaster does not register with ResourceManager in high availability setup

2020-03-04 Thread Bajaj, Abhinav
Thanks Xintong for pointing that out.

I will dig deeper and get back with my findings.

~ Abhinav Bajaj

From: Xintong Song 
Date: Tuesday, March 3, 2020 at 7:36 PM
To: "Bajaj, Abhinav" 
Cc: "user@flink.apache.org" 
Subject: Re: JobMaster does not register with ResourceManager in high 
availability setup

Hi Abhinav,

The JobMaster log "Connecting to ResourceManager ..." is printed after the
JobMaster retrieves the ResourceManager address from ZooKeeper. In your case, I
assume there's some ZK problem such that the JM cannot resolve the RM address.



Have you confirmed whether the ZK pods are recovered after the second 
disruption? And does the address changed?



You can also try to enable debug logs for the following components, to see if 
there's any useful information.

org.apache.flink.runtime.jobmaster

org.apache.flink.runtime.resourcemanager

org.apache.flink.runtime.highavailability

org.apache.flink.runtime.leaderretrieval

org.apache.zookeeper
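
For reference, with the default log4j setup this corresponds to adding lines
like the following to log4j.properties (a sketch -- adjust to whatever logging
backend you use):

log4j.logger.org.apache.flink.runtime.jobmaster=DEBUG
log4j.logger.org.apache.flink.runtime.resourcemanager=DEBUG
log4j.logger.org.apache.flink.runtime.highavailability=DEBUG
log4j.logger.org.apache.flink.runtime.leaderretrieval=DEBUG
log4j.logger.org.apache.zookeeper=DEBUG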



Thank you~

Xintong Song


On Wed, Mar 4, 2020 at 5:42 AM Bajaj, Abhinav 
mailto:abhinav.ba...@here.com>> wrote:
Hi,

We recently came across an issue where JobMaster does not register with 
ResourceManager in Fink high availability setup.
Let me share the details below.

Setup

  *   Flink 1.7.1
  *   K8s
  *   High availability mode with a single Jobmanager and 3 zookeeper nodes in 
quorum.

Scenario

  *   Zookeeper pods are disrupted by K8s that leads to resetting of leadership 
of JobMaster & ResourceManager and restart of the Flink job.

Observations

  *   After the first disruption of Zookeeper, JobMaster and ResourceManager 
were reset & were able to register with each other. Sharing few logs that 
confirm that. Flink job restarted successfully.

org.apache.flink.runtime.jobmaster.JobMaster  - Connecting to 
ResourceManager

o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Registering job 
manager

o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Registered job 
manager

org.apache.flink.runtime.jobmaster.JobMaster  - JobManager successfully 
registered at ResourceManager...

  *   After another disruption later on the same Flink cluster, JobMaster &
ResourceManager were not connected, the logs below can be noticed, and
eventually the scheduler times out.
org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Cannot serve slot 
request, no ResourceManager connected.

   ...

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
Could not allocate all requires slots within timeout of 300000 ms...

  *   I can confirm from the logs that both JobMaster & ResourceManager were 
running. JobMaster was trying to recover the job and ResourceManager registered 
the taskmanagers.
  *   The odd thing is that the log for JobMaster trying to connect to 
ResourceManager is missing. So I assume JobMaster didn’t try to connect to 
ResourceManager.

I can share more logs if required.

Has anyone noticed similar behavior or is this a known issue with Flink 1.7.1?
Any recommendations or suggestions on fix or workaround?

Appreciate your time and help here.

~ Abhinav Bajaj




Re: Unable to recover from savepoint and checkpoint

2020-03-04 Thread Puneet Kinra
I killed the task manager and job manager forcefully with the kill -9 command,
and while recovering
I am checking the flag returned by the isRestored method in the
initializeState function.
Anyway, I figured out the issue and fixed it. Thanks for the support.

On Tue, Mar 3, 2020 at 7:24 PM Gary Yao  wrote:

> Hi Puneet,
>
> Can you describe how you validated that the state is not restored
> properly? Specifically, how did you introduce faults to the cluster?
>
> Best,
> Gary
>
> On Tue, Mar 3, 2020 at 11:08 AM Puneet Kinra <
> puneet.ki...@customercentria.com> wrote:
>
>> Sorry for the missed information
>>
>> On recovery the value is coming as false instead of true. state.backend
>> has been configured in flink-conf.yaml along with the paths for
>> checkpointing and savepoints.
>>
>> On Tue, Mar 3, 2020 at 3:34 PM Puneet Kinra <
>> puneet.ki...@customercentria.com> wrote:
>>
>>> Hi
>>>
>>> Stuck with the simple program regarding the checkpointing Flink version
>>> I am using 1.10.0
>>>
>>> *Here I have created DummySource for testing*
>>>
>>> *DummySource*
>>> package com.nudge.stateful;
>>>
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>>>
>>> public class BeaconSource implements SourceFunction<Tuple2<Long, String>> {
>>>
>>> /**
>>> *
>>> */
>>> private static final long serialVersionUID = 1L;
>>> private Boolean isRunning=true;
>>>
>>>
>>> public BeaconSource() {
>>> super();
>>> // TODO Auto-generated constructor stub
>>> }
>>>
>>>
>>>
>>> public void cancel() {
>>> // TODO Auto-generated method stub
>>>
>>> this.isRunning=false;
>>>
>>> }
>>>
>>> public void run(SourceContext<Tuple2<Long, String>> arg0) throws Exception {
>>> // TODO Auto-generated method stub
>>> while(isRunning) {
>>> Thread.sleep(3L);
>>> arg0.collect(new Tuple2(10L,"AMQSource"));
>>> }
>>> }
>>>
>>> }
>>>
>>>
>>>
>>> ---
>>> *KeyedProcessFunction (to register the timer and update the status to
>>> true so that only one-time trigger should)*
>>>
>>>
>>> package com.nudge.stateful;
>>>
>>> import org.apache.flink.api.common.functions.IterationRuntimeContext;
>>> import org.apache.flink.api.common.functions.RuntimeContext;
>>> import org.apache.flink.api.common.state.ListState;
>>> import org.apache.flink.api.common.state.ListStateDescriptor;
>>> import org.apache.flink.api.common.state.ValueState;
>>> import org.apache.flink.api.common.state.ValueStateDescriptor;
>>> import org.apache.flink.api.common.typeinfo.TypeHint;
>>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>>> import org.apache.flink.api.java.tuple.Tuple;
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>> import org.apache.flink.api.java.tuple.Tuple3;
>>> import org.apache.flink.configuration.Configuration;
>>> import org.apache.flink.runtime.state.FunctionInitializationContext;
>>> import org.apache.flink.runtime.state.FunctionSnapshotContext;
>>> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
>>> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
>>> import org.apache.flink.streaming.api.functions.ProcessFunction;
>>> import org.apache.flink.util.Collector;
>>>
>>> import com.google.gson.JsonObject;
>>> import com.google.gson.JsonParser;
>>>
>>> import scala.collection.mutable.LinkedHashMap;
>>>
>>>
>>>
>>> import java.util.HashMap;
>>> import java.util.Map;
>>> import java.util.Map.Entry;
>>> import java.util.Set;
>>>
>>> public class TimeProcessTrigger extends
>>> KeyedProcessFunction<Tuple, Tuple2<Long, String>, String> {
>>>
>>> /**
>>> *
>>> */
>>> private static final long serialVersionUID = 1L;
>>> /**
>>> *
>>> */
>>>
>>> private transient ValueState<Boolean> contacthistory;
>>> private static final  Long  ONE_MINUTE=6L;
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> @Override
>>> public void onTimer(long timestamp,
>>> KeyedProcessFunction<Tuple, Tuple2<Long, String>, String>.OnTimerContext ctx,
>>> Collector<String> out) throws Exception {
>>> // TODO Auto-generated method stub
>>> super.onTimer(timestamp, ctx, out);
>>> System.out.println("Timer has fired for the key"+ctx.getCurrentKey());
>>> }
>>>
>>>
>>>
>>>
>>>
>>>
>>> @Override
>>> public void open(Configuration parameters) throws Exception {
>>> // TODO Auto-generated method stub
>>> super.open(parameters);
>>>
>>>
>>> ValueStateDescriptor<Boolean> descriptor = new
>>> ValueStateDescriptor<>(
>>> "contact-history", // the state name
>>> Boolean.class); // type information
>>>
>>> this.contacthistory=getRuntimeContext().getState(descriptor);
>>> }
>>>
>>>
>>>
>>>
>>>
>>>
>>> @Override
>>> public void processElement(Tuple2<Long, String> input,
>>> KeyedProcessFunction<Tuple, Tuple2<Long, String>, String>.Context ctx,
>>> Collector<String> collect)
>>> throws Exception {
>>> // TODO Auto-generated method stub
>>>
>>>
>>> System.out.println(this.contacthistory.value());
>>> Boolean value = this.contacthistory.value();
>>> if(value==null) {
>>> Long currentTime = ctx.timerService().currentProcessingTime();
>>> Long regTimer=currentTime+ONE_MINUTE;

Re: Flink's Either type information

2020-03-04 Thread Arvid Heise
Hi Jacopo,

to prevent type erasure in Java, you need to create a sub-type that
contains only reified types.

Instead of using a generic type with bound variables in

 stream.process(new MyKeyedBroadcastProcessFunction<MyLeftType, MyRightType>());

you can use

 stream.process(new MyKeyedBroadcastProcessFunction<MyLeftType, MyRightType>() {
 });

This will create an anonymous sub-type of MyKeyedBroadcastProcessFunction
that has the two types reified.

Another solution is to already create the sub type in your factory method.

  KeyedBroadcastProcessFunction<..., ..., String, Either<MyLeftType, MyRightType>>
createFunction(...) {
  return new KeyedBroadcastProcessFunction<..., ..., String, Either<MyLeftType, MyRightType>>() {
  ...
  };
 }
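
The same idea also works with a named (non-anonymous) subclass that binds the
type variables; the class name below is made up purely for illustration:

 public class MyConcreteFunction
 extends MyKeyedBroadcastProcessFunction<MyLeftType, MyRightType> {
 }

 // usage: the bound type arguments stay visible to Flink's type extraction
 stream.process(new MyConcreteFunction());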


On Wed, Mar 4, 2020 at 4:08 PM  wrote:

> Hi all,
>
>
>
> Yes my problem is that I do not create the function inline but create a
> function directly when creating the data stream job.
>
> My code (which I cannot share) is exactly like your example, Yun, are you
> aware if there is a way to prevent code erasure?
>
>
>
> Kind regards,
>
>
>
> Jacopo Gobbi
>
>
>
>
>
> *From:* Yun Gao [mailto:yungao...@aliyun.com]
> *Sent:* Freitag, 21. Februar 2020 16:00
> *To:* Robert Metzger; Gobbi, Jacopo-XT
> *Cc:* user
> *Subject:* [External] Re: Flink's Either type information
>
>
>
>   Hi Jacopo, Robert,
>
>
>
>  Very sorry for missing the previous email and not response in
> time. I think exactly as Robert has pointed out with the example: using
> inline anonymous subclass of *KeyedBroadcastProcessFunction* should not
> cause the problem. As far as I know, the possible reason that cause the
> attached exception might be that the parameter types of *Either get* erased
> due to the way to create *KeyedBroadcastProcessFunction* object. For
> example, if you first implement a generic subclass of
> *KeyedBroadcastProcessFunction* like*:*
>
>
>
>   *public class MyKeyedBroadcastProcessFunction<MyLeftType, MyRightType> extends KeyedBroadcastProcessFunction<..., Tuple2<..., String>, String, Either<MyLeftType, MyRightType>> { ... }*
>
>
>
>  and create a function object directly when constructing the
> DataStream job:
>
>
>
>  *stream.process(new MyKeyedBroadcastProcessFunction<MyLeftType, MyRightType>());*
>
>
>
>  Then *MyLeftType* and *MyRightType *will be erased and will cause
> the attached exception when Flink tries to inference the output type.
>
>
>
>  And I totally agree with Robert that attaching the corresponding
> codes would help debugging the problem.
>
>
>
>   Yours,
>
> Yun
>
>
>
>
>
> --
>
> From:Robert Metzger 
>
> Send Time:2020 Feb. 21 (Fri.) 19:47
>
> To:jacopo.gobbi 
>
> Cc:yungao.gy ; user 
>
> Subject:Re: Flink's Either type information
>
>
>
> Hey Jacopo,
>
> can you post an example to reproduce the issue? I've tried it, but it
> worked in this artificial example:
>
>
>
> MapStateDescriptor<String, String> state = new MapStateDescriptor<>("test",
> BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
> DataStream<Either<Integer, String>> result = input
>   .map((MapFunction<String, Tuple2<Integer, String>>) value ->
> Tuple2.of(0, value))
>   .returns(TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, String.class))
>   .keyBy(0).connect(input.broadcast(state))
>   .process(new KeyedBroadcastProcessFunction<Tuple, Tuple2<Integer, String>, String, Either<Integer, String>>() {
>  @Override
>  public void processElement(Tuple2<Integer, String> value,
> ReadOnlyContext ctx, Collector<Either<Integer, String>> out) throws
> Exception {
> out.collect(Either.Left(111));
>  }
>  @Override
>  public void processBroadcastElement(String value, Context ctx,
> Collector<Either<Integer, String>> out) throws Exception { }
>   });
> result.print();
>
>
>
> On Wed, Feb 19, 2020 at 6:07 PM  wrote:
>
> Yes, I create it the way you mentioned.
>
>
>
> *From:* Yun Gao [mailto:yungao...@aliyun.com]
> *Sent:* Dienstag, 18. Februar 2020 10:12
> *To:* Gobbi, Jacopo-XT; user
> *Subject:* [External] Re: Flink's Either type information
>
>
>
>   Hi Jacopo,
>
>
>
>   Could you also provide how the KeyedBroadcastProcessFunction is
> created when constructing datastream API ? For example, are you using
> something like
>
>
>
>   new KeyedBroadcastProcessFunction<..., ..., ..., Either<..., ...>>() {
>
>// Function implementation
>
>  }
>
>
>
>  or something else?
>
>
>
>  Best,
>
>   Yun
>
>
>
>
>
> --
>
> From:jacopo.gobbi 
>
> Send Time:2020 Feb. 17 (Mon.) 18:31
>
> To:user 
>
> Subject:Flink's Either type information
>
>
>
> Hi all,
>
>
>
> How can an Either value be returned by a KeyedBroadcastProcessFunction?
>
> We keep getting "InvalidTypesException: Type extraction is not possible on
> Either type as it does not contain information about the 'left' type." when
> doing: out.collect(Either.Right(myObject));
>
>
>
> Thanks,
>
>
>
> Jacopo Gobbi
>
>
>
>
>
>
>


Re: checkpoint _metadata file has >20x different in size among different check-points

2020-03-04 Thread Arvid Heise
Hi Yu,

are you using incremental checkpoints [1]? If so, then the smaller
checkpoints would be the deltas and the larger the complete state.

[1]
https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html
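
For reference, this is roughly how incremental checkpoints get enabled with the
RocksDB backend, in case you want to check whether your job does this (the
checkpoint URI is a placeholder):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementalCheckpointExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The boolean flag enables incremental checkpoints for RocksDB.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
        env.enableCheckpointing(10 * 60 * 1000); // every 10 minutes, as in the job above
        // ... build the rest of the job and call env.execute(...)
    }
}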

On Wed, Mar 4, 2020 at 6:41 PM Yu Yang  wrote:

> Hi all,
>
> We have a flink job that does check-pointing per 10 minutes. We noticed
> that for the check-points of this job,  the _metadata file size can vary a
> lot. In some checkpoint, we observe that _metadata file size was >900MB,
> while in some other check-points of the same job, the _metadata file size
> is < 4MB.  Any insights on what may cause the difference?
>
> Thank you!
>
> Regards,
> -Yu
>


Re: Very large _metadata file

2020-03-04 Thread Jacob Sevart
Kostas and Gordon,

Thanks for the suggestions! I'm on RocksDB. We don't have that setting
configured so it should be at the default 1024b. This is the full "state.*"
section showing in the JobManager UI.

[image: Screen Shot 2020-03-04 at 9.56.20 AM.png]

Jacob

On Wed, Mar 4, 2020 at 2:45 AM Tzu-Li (Gordon) Tai 
wrote:

> Hi Jacob,
>
> Apart from what Klou already mentioned, one slightly possible reason:
>
> If you are using the FsStateBackend, it is also possible that your state
> is small enough to be considered to be stored inline within the metadata
> file.
> That is governed by the "state.backend.fs.memory-threshold" configuration,
> with a default value of 1024 bytes, or can also be configured with the
> `fileStateSizeThreshold` argument when constructing the `FsStateBackend`.
> The purpose of that threshold is to ensure that the backend does not
> create a large amount of very small files, where potentially the file
> pointers are actually larger than the state itself.
>
> Cheers,
> Gordon
>
>
>
> On Wed, Mar 4, 2020 at 6:17 PM Kostas Kloudas  wrote:
>
>> Hi Jacob,
>>
>> Could you specify which StateBackend you are using?
>>
>> The reason I am asking is that, from the documentation in [1]:
>>
>> "Note that if you use the MemoryStateBackend, metadata and savepoint
>> state will be stored in the _metadata file. Since it is
>> self-contained, you may move the file and restore from any location."
>>
>> I am also cc'ing Gordon who may know a bit more about state formats.
>>
>> I hope this helps,
>> Kostas
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/savepoints.html
>> 
>>
>> On Wed, Mar 4, 2020 at 1:25 AM Jacob Sevart  wrote:
>> >
>> > Per the documentation:
>> >
>> > "The meta data file of a Savepoint contains (primarily) pointers to all
>> files on stable storage that are part of the Savepoint, in form of absolute
>> paths."
>> >
>> > I somehow have a _metadata file that's 1.9GB. Running strings on it I
>> find 962 strings, most of which look like HDFS paths, which leaves a lot of
>> that file-size unexplained. What else is in there, and how exactly could
>> this be happening?
>> >
>> > We're running 1.6.
>> >
>> > Jacob
>>
>

-- 
Jacob Sevart
Software Engineer, Safety


checkpoint _metadata file has >20x different in size among different check-points

2020-03-04 Thread Yu Yang
Hi all,

We have a flink job that does check-pointing per 10 minutes. We noticed
that for the check-points of this job,  the _metadata file size can vary a
lot. In some checkpoint, we observe that _metadata file size was >900MB,
while in some other check-points of the same job, the _metadata file size
is < 4MB.  Any insights on what may cause the difference?

Thank you!

Regards,
-Yu


Re: Building with Hadoop 3

2020-03-04 Thread Stephan Ewen
Have you tried to just export Hadoop 3's classpath to `HADOOP_CLASSPATH`
and see if that works out of the box?

If the main use case is HDFS access, then there is a fair chance it might
just work, because Flink uses only a small subset of the Hadoop FS API
which is stable between 2.x and 3.x, as far as I tried it out a while back.

On Tue, Mar 3, 2020 at 6:03 PM LINZ, Arnaud 
wrote:

> Hello,
>
> Have you shared it somewhere on the web already?
>
> Best,
>
> Arnaud
>
>
>
> *De :* vino yang 
> *Envoyé :* mercredi 4 décembre 2019 11:55
> *À :* Márton Balassi 
> *Cc :* Chesnay Schepler ; Foster, Craig <
> foscr...@amazon.com>; user@flink.apache.org; d...@flink.apache.org
> *Objet :* Re: Building with Hadoop 3
>
>
>
> Hi Marton,
>
>
>
> Thanks for your explanation. Personally, I look forward to your
> contribution!
>
>
>
> Best,
>
> Vino
>
>
>
> Márton Balassi  于2019年12月4日周三 下午5:15写道:
>
> Wearing my Cloudera hat I can tell you that we have done this exercise for
> our distros of the  3.0 and 3.1 Hadoop versions. We have not contributed
> these back just yet, but we are open to do so. If the community is
> interested we can contribute those changes back to flink-shaded and suggest
> the necessay changes to flink too. The task was not overly complex, but it
> certainly involved a bit of dependency hell. :-)
>
>
>
> Right now we are focused on internal timelines, but we could invest into
> contributing this back in the end of January timeframe if the community
> deems this a worthwhile effort.
>
>
>
> Best,
>
> Marton
>
>
>
> On Wed, Dec 4, 2019 at 10:00 AM Chesnay Schepler 
> wrote:
>
> There's no JIRA and no one actively working on it. I'm not aware of any
> investigations on the matter; hence the first step would be to just try it
> out.
>
>
>
> A flink-shaded artifact isn't a hard requirement; Flink will work with any
> 2.X hadoop distribution (provided that there aren't any dependency clashes).
>
>
>
> On 03/12/2019 18:22, Foster, Craig wrote:
>
> Hi:
>
> I don’t see a JIRA for Hadoop 3 support. I see a comment on a JIRA here
> from a year ago that no one is looking into Hadoop 3 support [1]. Is there
> a document or JIRA that now exists which would point to what needs to be
> done to support Hadoop 3? Right now builds with Hadoop 3 don’t work
> obviously because there’s no flink-shaded-hadoop-3 artifacts.
>
>
>
> Thanks!
>
> Craig
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-11086
>
>
>
>
>
>


Teradata as JDBC Connection

2020-03-04 Thread Norm Vilmer (Contractor)
Using Flink 1.10 and coding in Java 11, is it possible to write to Teradata
in append mode? MySQL, PostgreSQL, and Derby are the only supported drivers
listed. Thanks.

https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#connectors

I created the ConnectorDescriptor below and am using it from 
tableEnvironment.connect() but get the exception shown below.

public class Teradata extends ConnectorDescriptor {
/**
 * Constructs a {@link ConnectorDescriptor}.
 */
public Teradata() {
super("jdbc", 1, false);
}

@Override
protected Map<String, String> toConnectorProperties() {
Map<String, String> map = new HashMap<>();
map.put("Drivername", "com.teradata.jdbc.TeraDriver");
map.put("DBUrl", "jdbc:teradata://myserver/database=mydb,LOGMECH=LDAP");
map.put("Username", "...");
map.put("Password", "...");
return map;
}
}

org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a 
suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' 
in
the classpath.

Reason: No factory supports all properties.

The matching candidates:
org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
Unsupported property keys:
drivername
update-mode
password
dburl
username

The following properties are requested:
connector.property-version=1
connector.type=jdbc
dburl=jdbc:teradata://myserver/database=mydb,LOGMECH=LDAP
drivername=com.teradata.jdbc.TeraDriver
password=xxx
schema.0.data-type=VARCHAR(1)
schema.0.name=f0
schema.1.data-type=VARCHAR(1)
schema.1.name=f1
schema.2.data-type=VARCHAR(1)
schema.2.name=f2
update-mode=append
username=xxx

The following factories have been considered:
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceSinkFactory
org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactory
org.apache.flink.streaming.connectors.kafka.Kafka09TableSourceSinkFactory
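
For what it's worth, a sketch of the same descriptor using the property keys that
the JDBC factory listed above appears to expect in 1.10 (the 'connector.*' key
names are my assumption from the flink-jdbc descriptor API, and the table name is
a placeholder). Note that even with matching keys, whether the factory accepts a
Teradata URL at all is exactly the open question, since only MySQL, PostgreSQL
and Derby are listed as supported:

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.table.descriptors.ConnectorDescriptor;

public class Teradata extends ConnectorDescriptor {
    public Teradata() {
        super("jdbc", 1, false);
    }

    @Override
    protected Map<String, String> toConnectorProperties() {
        Map<String, String> map = new HashMap<>();
        // Assumed key names used by the JDBC connector descriptor in 1.10:
        map.put("connector.driver", "com.teradata.jdbc.TeraDriver");
        map.put("connector.url", "jdbc:teradata://myserver/database=mydb,LOGMECH=LDAP");
        map.put("connector.table", "my_table"); // placeholder
        map.put("connector.username", "...");
        map.put("connector.password", "...");
        return map;
    }
}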



CFP: Workshop on Large Scale RDF Analytics (LASCAR-20) at ESWC'20

2020-03-04 Thread Hajira Jabeen
**
We apologize for cross-postings.
We appreciate your great help in forwarding this CFP to your
colleagues and friends.
**

Call for *Papers & Posters* for the 2nd Workshop on Large Scale RDF
Analytics (LASCAR-20), collocated with the Extended Semantic Web
Conference (ESWC) 2020

Venue & Dates:
Heraklion, Crete, Greece, May 31st, 2020

Workshop Website:http://lascar.sda.tech

[Topics of interest]

LASCAR-20 seeks original articles and posters describing theoretical
and practical methods as well as techniques for performing scalable
analytics on knowledge graphs. All papers must be original and not
simultaneously submitted to another journal or conference. The
following paper and poster categories are welcome:

  * Decentralized KG data management including parsing, compression,
partitioning and smart indexing
  * Large scale KG enrichment using link prediction, entity
resolution, entity disambiguation or similarity estimation
  * Machine Learning e.g. clustering, blocking, or anomaly detection
  * Complex analytics with distributed KG embeddings
  * Connecting property Graphs with RDF and reasoning
  * Use-cases presenting semantic technologies at industry scale

[Important Dates]

Electronic submission of full papers:   March 14th, 2020
Notification of paper acceptance:   March 27th, 2020
Camera-ready of accepted papers:April 10th, 2020
Workshop day:   May 31st, 2020

[Submission Guidelines]

All papers should be formatted according to the standard LNCS Style.
All papers will be peer reviewed using the single-blind approach.
Authors of the accepted papers will be asked to register for the
workshop and will have the opportunity to present and participate in
the workshop. Long papers should not be longer than 10 pages including
the references, and short papers should not exceed 6 pages, including
all references. The accepted papers will be published online in CEUR
Workshop Proceedings (CEUR-WS.org). Proceedings will be available for
download after the conference. The pre-print will be made available
during the conference. The authors of the accepted posters will be
invited to present their posters at the workshop.

Please submit your work via EasyChair
(https://www.easychair.org/conferences/?conf=lascar20)

Dr.  Hajira Jabeen
Senior researcher,
SDA, Universität Bonn.

http://sda.cs.uni-bonn.de/people/dr-hajira-jabeen/
hajirajabeen.github.io/ 


RE: Flink's Either type information

2020-03-04 Thread jacopo.gobbi
Hi all,

Yes my problem is that I do not create the function inline but create a 
function directly when creating the data stream job.
My code (which I cannot share) is exactly like your example, Yun, are you aware 
if there is a way to prevent code erasure?

Kind regards,

Jacopo Gobbi


From: Yun Gao [mailto:yungao...@aliyun.com]
Sent: Freitag, 21. Februar 2020 16:00
To: Robert Metzger; Gobbi, Jacopo-XT
Cc: user
Subject: [External] Re: Flink's Either type information

  Hi Jacopo, Robert,

 Very sorry for missing the previous email and not responding in time. I
think it is exactly as Robert has pointed out with the example: using an inline
anonymous subclass of KeyedBroadcastProcessFunction should not cause the
problem. As far as I know, the possible reason for the attached exception might
be that the parameter types of Either get erased due to the way the
KeyedBroadcastProcessFunction object is created. For example, if you first
implement a generic subclass of KeyedBroadcastProcessFunction like:

  public class MyKeyedBroadcastProcessFunction<MyLeftType, MyRightType>
extends KeyedBroadcastProcessFunction<..., Tuple2<..., String>, String,
Either<MyLeftType, MyRightType>> { ... }

 and create a function object directly when constructing the DataStream job:

 stream.process(new MyKeyedBroadcastProcessFunction<MyLeftType, MyRightType>());

 Then MyLeftType and MyRightType will be erased and will cause the attached
exception when Flink tries to infer the output type.

 And I totally agree with Robert that attaching the corresponding code
would help in debugging the problem.

  Yours,
Yun


--
From:Robert Metzger 
Send Time:2020 Feb. 21 (Fri.) 19:47
To:jacopo.gobbi 
Cc:yungao.gy ; user 
Subject:Re: Flink's Either type information

Hey Jacopo,
can you post an example to reproduce the issue? I've tried it, but it worked in 
this artificial example:


MapStateDescriptor<String, String> state = new MapStateDescriptor<>("test",
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
DataStream<Either<Integer, String>> result = input
  .map((MapFunction<String, Tuple2<Integer, String>>) value ->
Tuple2.of(0, value))
  .returns(TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, String.class))
  .keyBy(0).connect(input.broadcast(state))
  .process(new KeyedBroadcastProcessFunction<Tuple, Tuple2<Integer, String>, String, Either<Integer, String>>() {
 @Override
 public void processElement(Tuple2<Integer, String> value,
ReadOnlyContext ctx, Collector<Either<Integer, String>> out) throws Exception {
out.collect(Either.Left(111));
 }
 @Override
 public void processBroadcastElement(String value, Context ctx,
Collector<Either<Integer, String>> out) throws Exception { }
  });
result.print();

On Wed, Feb 19, 2020 at 6:07 PM 
mailto:jacopo.go...@ubs.com>> wrote:
Yes, I create it the way you mentioned.

From: Yun Gao [mailto:yungao...@aliyun.com]
Sent: Dienstag, 18. Februar 2020 10:12
To: Gobbi, Jacopo-XT; user
Subject: [External] Re: Flink's Either type information

  Hi Jacopo,

  Could you also provide how the KeyedBroadcastProcessFunction is 
created when constructing datastream API ? For example, are you using something 
like

  new KeyedBroadcastProcessFunction<..., ..., ..., Either<..., ...>>() {
   // Function implementation
 }

 or something else?

 Best,
  Yun


--
From:jacopo.gobbi mailto:jacopo.go...@ubs.com>>
Send Time:2020 Feb. 17 (Mon.) 18:31
To:user mailto:user@flink.apache.org>>
Subject:Flink's Either type information

Hi all,

How can an Either value be returned by a KeyedBroadcastProcessFunction?
We keep getting "InvalidTypesException: Type extraction is not possible on 
Either type as it does not contain information about the 'left' type." when 
doing: out.collect(Either.Right(myObject));

Thanks,

Jacopo Gobbi




Re: EXTERNAL: Re: Should I use a Sink or Connector? Or Both?

2020-03-04 Thread Castro, Fernando C.
Thank you guys. So I have no idea of why data is not being pushed to 
Elasticsearch… ☹

My complete code is at 
https://stackoverflow.com/questions/60512064/flink-is-not-adding-any-data-to-elasticsearch-but-no-errors
Btw, for some reason I still need to pass .documentType to the Elasticsearch 
connection descriptor (getting it from 
org.apache.flink.table.descriptors.Elasticsearch), when Elasticsearch 7 doesn’t 
do types anymore.

In case you can’t access stackoverflow for some reason, here is the code below 
too:
/*
* This Scala source file was generated by the Gradle 'init' task.
*/
package flinkNamePull

import java.time.LocalDateTime
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, 
FlinkKafkaProducer010}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.descriptors.{Elasticsearch, Json, Schema}
import org.apache.flink.types.Row

object Demo {
  /**
   * MapFunction to generate Transfers POJOs from parsed CSV data.
   */
  class TransfersMapper extends RichMapFunction[String, Transfers] {
private var formatter = null

@throws[Exception]
override def open(parameters: Configuration): Unit = {
  super.open(parameters)
  //formatter = DateTimeFormat.forPattern("-MM-dd HH:mm:ss")
}

@throws[Exception]
override def map(csvLine: String): Transfers = {
  //var splitCsv = csvLine.stripLineEnd.split("\n")(1).split(",")
  var splitCsv = csvLine.stripLineEnd.split(",")

  val arrLength = splitCsv.length
  val i = 0
  if (arrLength != 13) {
for (i <- arrLength + 1 to 13) {
  if (i == 13) {
splitCsv = splitCsv :+ "0.0"
  } else {
splitCsv = splitCsv :+ ""
  }
}
  }
  var trans = new Transfers()
  trans.rowId = splitCsv(0)
  trans.subjectId = splitCsv(1)
  trans.hadmId = splitCsv(2)
  trans.icuStayId = splitCsv(3)
  trans.dbSource = splitCsv(4)
  trans.eventType = splitCsv(5)
  trans.prev_careUnit = splitCsv(6)
  trans.curr_careUnit = splitCsv(7)
  trans.prev_wardId = splitCsv(8)
  trans.curr_wardId = splitCsv(9)
  trans.inTime = splitCsv(10)
  trans.outTime = splitCsv(11)
  trans.los = splitCsv(12).toDouble

  return trans
}
  }

  def main(args: Array[String]) {
// Create streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

// Set properties per KafkaConsumer API
val properties = new Properties()
properties.setProperty("bootstrap.servers", "kafka.kafka:9092")
properties.setProperty("group.id", "test")

// Add Kafka source to environment
val myKConsumer = new FlinkKafkaConsumer010[String]("raw.data3", new 
SimpleStringSchema(), properties)
// Read from beginning of topic
myKConsumer.setStartFromEarliest()

val streamSource = env
  .addSource(myKConsumer)

// Transform CSV into a Transfers object
val streamTransfers = streamSource.map(new TransfersMapper())

// create a TableEnvironment
val tEnv = StreamTableEnvironment.create(env)

// register a Table
val tblTransfers: Table = tEnv.fromDataStream(streamTransfers)
tEnv.createTemporaryView("transfers", tblTransfers)

tEnv.connect(
  new Elasticsearch()
.version("7")

.host("elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local",
 9200, "http")   // required: one or more Elasticsearch hosts to connect to
.index("transfers-sum")
.documentType("_doc") // not sure why this is still needed for ES7
.keyNullLiteral("n/a")
)
  .withFormat(new Json().jsonSchema("{type: 'object', properties: 
{curr_careUnit: {type: 'string'}, sum: {type: 'number'}}}"))
  .withSchema(new Schema()
.field("curr_careUnit", DataTypes.STRING())
.field("sum", DataTypes.DOUBLE())
  )
  .inUpsertMode()
  .createTemporaryTable("transfersSum")

val result = tEnv.sqlQuery(
  """
|SELECT curr_careUnit, sum(los)
|FROM transfers
|GROUP BY curr_careUnit
|""".stripMargin)

result.insertInto("transfersSum")

tEnv.toRetractStream[Row](result).print() //Just to see if something is 
actually happening (and it is)

env.execute("Flink Streaming Demo Dump to Elasticsearch")
  }
}


Thank you,
Fernando


From: Jark Wu 
Date: Tuesday, March 3, 2020 at 8:51 PM
To: John Smith 
Cc: "Castro, Fernando C. [US-US]" , 
"user@flink.apache.org" 
Subject: EXTERNAL: Re: Should I use a Sink or Connector? Or 

Re: CREATE TABLE with Schema derived from format

2020-03-04 Thread Gyula Fóra
Hi!

Initially we were looking at 2), but 1) would be the best solution. I think
both would be very valuable.

My only concern related to using the Schema Registry as a Catalog is the
interaction with other Catalogs in the system. Maybe you are using a Hive
catalog to track a bunch of tables, and now you would have to switch to the
Schema Registry.
Maybe in this case it would be good to be able to import tables from one
catalog to another.

Gyula


On Wed, Mar 4, 2020 at 2:24 PM Jark Wu  wrote:

> Yes. From my perspective, deriving schema from schema registry is the most
> important use case of FLINK-16420.
>
> Some initial idea about this:
> 1) introduce a SchemaRegisteryCatalog to allow users run queries on
> existing topics without manual table definition. see FLINK-12256
> 2) provide a connector property for schema registery url to derive schema
> from it, and the CREATE TABLE statement can leave out schema part, e.g.
>
> CREATE TABLE user_behavior WITH ("connector"="kafka",
> "topic"="user_behavior", "schema.registery.url"="localhost:8081")
>
> Which way are you looking for?
>
> Best,
> Jark
>
> On Wed, 4 Mar 2020 at 19:09, Gyula Fóra  wrote:
>
>> Hi Jark,
>>
>> Thank you for the clarification this is exactly what I was looking for,
>> especially for the second part regarding schema registry integration.
>>
>> This question came up as we were investigating how the schema registry
>> integration should look like :)
>>
>> Cheers,
>> Gyula
>>
>> On Wed, Mar 4, 2020 at 12:06 PM Jark Wu  wrote:
>>
>>> Hi Gyula,
>>>
>>> That's a good point and is on the roadmap.
>>>
>>> In 1.10, JSON and CSV format can derive format schema from table schema.
>>> So you don't need to specify format schema in properties anymore if you are
>>> using 1.10.
>>>
>>> On the contrary, we are planning to derive table schema from format
>>> schema if it is specified, e.g. "format.fields", "format.avro-file-path".
>>> Furthermore, table schema can be inferenced if there is a schema
>>> registry or even read some data and infer it.
>>> I created FLINK-16420 to track this effort. But not sure we have enough
>>> time to support it before 1.11.
>>>
>>> Best,
>>> Jark
>>>
>>> [1]: https://issues.apache.org/jira/browse/FLINK-16420
>>>
>>>
>>> On Wed, 4 Mar 2020 at 18:21, Gyula Fóra  wrote:
>>>
 Hi All!

 I am wondering if it would be possible to change the CREATE TABLE
 statement so that it would also work without specifying any columns.

 The format generally defines the available columns so maybe we could
 simply use them as is if we want.

 This would be very helpful when exploring different data sources.

 Let me know what you think!
 Gyula

>>>


Re: CREATE TABLE with Schema derived from format

2020-03-04 Thread Jark Wu
Yes. From my perspective, deriving schema from schema registry is the most
important use case of FLINK-16420.

Some initial idea about this:
1) introduce a SchemaRegisteryCatalog to allow users to run queries on
existing topics without a manual table definition. See FLINK-12256
2) provide a connector property for the schema registry url to derive the schema
from it, and the CREATE TABLE statement can leave out the schema part, e.g.

CREATE TABLE user_behavior WITH ("connector"="kafka",
"topic"="user_behavior", "schema.registery.url"="localhost:8081")

Which way are you looking for?

Best,
Jark
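
For contrast, a rough sketch of what the manual definition looks like today in
1.10, executed from Java (the columns and addresses are placeholders, and the
connector/format property keys are to the best of my memory, so please
double-check them):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class ManualSchemaDdl {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Every column has to be spelled out today; FLINK-16420 / a schema registry
        // catalog would allow this schema block to be derived instead.
        tEnv.sqlUpdate(
                "CREATE TABLE user_behavior (" +
                "  user_id BIGINT," +
                "  item_id BIGINT," +
                "  behavior STRING," +
                "  ts TIMESTAMP(3)" +
                ") WITH (" +
                "  'connector.type' = 'kafka'," +
                "  'connector.version' = 'universal'," +
                "  'connector.topic' = 'user_behavior'," +
                "  'connector.properties.bootstrap.servers' = 'localhost:9092'," +
                // still required in 1.10.0, see FLINK-16125 discussed elsewhere on this list
                "  'connector.properties.zookeeper.connect' = 'localhost:2181'," +
                "  'format.type' = 'json'" +
                ")");
    }
}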

On Wed, 4 Mar 2020 at 19:09, Gyula Fóra  wrote:

> Hi Jark,
>
> Thank you for the clarification this is exactly what I was looking for,
> especially for the second part regarding schema registry integration.
>
> This question came up as we were investigating how the schema registry
> integration should look like :)
>
> Cheers,
> Gyula
>
> On Wed, Mar 4, 2020 at 12:06 PM Jark Wu  wrote:
>
>> Hi Gyula,
>>
>> That's a good point and is on the roadmap.
>>
>> In 1.10, JSON and CSV format can derive format schema from table schema.
>> So you don't need to specify format schema in properties anymore if you are
>> using 1.10.
>>
>> On the contrary, we are planning to derive table schema from format
>> schema if it is specified, e.g. "format.fields", "format.avro-file-path".
>> Furthermore, table schema can be inferenced if there is a schema registry
>> or even read some data and infer it.
>> I created FLINK-16420 to track this effort. But not sure we have enough
>> time to support it before 1.11.
>>
>> Best,
>> Jark
>>
>> [1]: https://issues.apache.org/jira/browse/FLINK-16420
>>
>>
>> On Wed, 4 Mar 2020 at 18:21, Gyula Fóra  wrote:
>>
>>> Hi All!
>>>
>>> I am wondering if it would be possible to change the CREATE TABLE
>>> statement so that it would also work without specifying any columns.
>>>
>>> The format generally defines the available columns so maybe we could
>>> simply use them as is if we want.
>>>
>>> This would be very helpful when exploring different data sources.
>>>
>>> Let me know what you think!
>>> Gyula
>>>
>>


History server UI not working

2020-03-04 Thread pwestermann
We recently upgraded from Flink 1.7 to Flink 1.9.2 and the history server UI
now seems to be broken. It doesn't load and always just displays a blank
screen. 
The individual endpoints (e.g. /jobs/overview) still work.
Could this be an issue caused by the Angular update for the regular UI?




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Flink Serialization as stable (kafka) output format?

2020-03-04 Thread Theo Diefenthal
Hi, 

Without knowing too much about Flink serialization, I know that Flink states
that it serializes POJO types much faster than even the fast Kryo for Java. I
further know that it supports schema evolution in the same way as Avro.

In our project, we have a star architecture, where one flink job produces 
results into a kafka topic and where we have multiple downstream consumers from 
that kafka topic (Mostly other flink jobs). 
For fast development cycles, we currently use JSON as output format for the 
kafka topic due to easy debugging capabilities and best migration 
possibilities. However, when scaling up, we need to switch to a more efficient 
format. Most often, Avro is mentioned in combination with a schema registry, as
it's much more efficient than JSON, where essentially each message contains the
schema as well. However, in most benchmarks, Avro turns out to be rather slow
in terms of CPU cycles (e.g. [1]).

My question(s) now: 
1. Is it reasonable to use Flink serializers as the message format in Kafka?
2. Are there any downsides to using Flink's serialization output as the format
written to Kafka?
3. Can downstream consumers that are written in Java but are not Flink
components also easily deserialize Flink-serialized POJOs? Or do they need a
dependency on at least the full flink-core?
4. Do you have benchmarks comparing Flink (de-)serialization performance to
e.g. Kryo and Avro?

The only argument I can come up with against using Flink serialization is that
we wouldn't have a schema registry. But in our case we share all our POJOs in a
jar which is used by all components, so that is kind of a schema registry
already, and if we only make Avro-compatible changes, which are also handled
well by Flink, that shouldn't be any limitation compared to Avro plus a
registry?

Best regards 
Theo 

[1] https://github.com/eishay/jvm-serializers/wiki


Re: CREATE TABLE with Schema derived from format

2020-03-04 Thread Gyula Fóra
Hi Jark,

Thank you for the clarification this is exactly what I was looking for,
especially for the second part regarding schema registry integration.

This question came up as we were investigating how the schema registry
integration should look like :)

Cheers,
Gyula

On Wed, Mar 4, 2020 at 12:06 PM Jark Wu  wrote:

> Hi Gyula,
>
> That's a good point and is on the roadmap.
>
> In 1.10, JSON and CSV format can derive format schema from table schema.
> So you don't need to specify format schema in properties anymore if you are
> using 1.10.
>
> On the contrary, we are planning to derive table schema from format schema
> if it is specified, e.g. "format.fields", "format.avro-file-path".
> Furthermore, table schema can be inferenced if there is a schema registry
> or even read some data and infer it.
> I created FLINK-16420 to track this effort. But not sure we have enough
> time to support it before 1.11.
>
> Best,
> Jark
>
> [1]: https://issues.apache.org/jira/browse/FLINK-16420
>
>
> On Wed, 4 Mar 2020 at 18:21, Gyula Fóra  wrote:
>
>> Hi All!
>>
>> I am wondering if it would be possible to change the CREATE TABLE
>> statement so that it would also work without specifying any columns.
>>
>> The format generally defines the available columns so maybe we could
>> simply use them as is if we want.
>>
>> This would be very helpful when exploring different data sources.
>>
>> Let me know what you think!
>> Gyula
>>
>


Re: CREATE TABLE with Schema derived from format

2020-03-04 Thread Jark Wu
Hi Gyula,

That's a good point and is on the roadmap.

In 1.10, JSON and CSV format can derive format schema from table schema. So
you don't need to specify format schema in properties anymore if you are
using 1.10.

On the contrary, we are planning to derive the table schema from the format schema
if it is specified, e.g. "format.fields", "format.avro-file-path".
Furthermore, the table schema can be inferred if there is a schema registry,
or we could even read some data and infer it.
I created FLINK-16420 to track this effort. But not sure we have enough
time to support it before 1.11.

Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-16420


On Wed, 4 Mar 2020 at 18:21, Gyula Fóra  wrote:

> Hi All!
>
> I am wondering if it would be possible to change the CREATE TABLE
> statement so that it would also work without specifying any columns.
>
> The format generally defines the available columns so maybe we could
> simply use them as is if we want.
>
> This would be very helpful when exploring different data sources.
>
> Let me know what you think!
> Gyula
>


Re: Very large _metadata file

2020-03-04 Thread Tzu-Li (Gordon) Tai
Hi Jacob,

Apart from what Klou already mentioned, one slightly possible reason:

If you are using the FsStateBackend, it is also possible that your state is
small enough to be considered to be stored inline within the metadata file.
That is governed by the "state.backend.fs.memory-threshold" configuration,
with a default value of 1024 bytes, or can also be configured with the
`fileStateSizeThreshold` argument when constructing the `FsStateBackend`.
The purpose of that threshold is to ensure that the backend does not create
a large amount of very small files, where potentially the file pointers are
actually larger than the state itself.
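
As a concrete illustration, a minimal sketch of setting that threshold
programmatically (the checkpoint URI and the 100 KB value are placeholders; the
exact constructor overload may differ slightly between versions):

import java.net.URI;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FsStateBackendThresholdExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // State smaller than the threshold (here 100 KB) is stored inline in the
        // checkpoint/savepoint _metadata file instead of in separate files.
        // The flink-conf.yaml equivalent is state.backend.fs.memory-threshold.
        env.setStateBackend(new FsStateBackend(new URI("hdfs:///flink/checkpoints"), 100 * 1024));
    }
}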

Cheers,
Gordon



On Wed, Mar 4, 2020 at 6:17 PM Kostas Kloudas  wrote:

> Hi Jacob,
>
> Could you specify which StateBackend you are using?
>
> The reason I am asking is that, from the documentation in [1]:
>
> "Note that if you use the MemoryStateBackend, metadata and savepoint
> state will be stored in the _metadata file. Since it is
> self-contained, you may move the file and restore from any location."
>
> I am also cc'ing Gordon who may know a bit more about state formats.
>
> I hope this helps,
> Kostas
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/savepoints.html
>
> On Wed, Mar 4, 2020 at 1:25 AM Jacob Sevart  wrote:
> >
> > Per the documentation:
> >
> > "The meta data file of a Savepoint contains (primarily) pointers to all
> files on stable storage that are part of the Savepoint, in form of absolute
> paths."
> >
> > I somehow have a _metadata file that's 1.9GB. Running strings on it I
> find 962 strings, most of which look like HDFS paths, which leaves a lot of
> that file-size unexplained. What else is in there, and how exactly could
> this be happening?
> >
> > We're running 1.6.
> >
> > Jacob
>


Re: Re: Question about StreamExecutionEnvironment in Flink 1.8 not being compatible with multiple file paths for FileInputFormat

2020-03-04 Thread 王智
My requirement is case 2; right now I am using execEnv.createInput(inputFormat()).

I will go and try env.addSource(new InputFormatSourceFunction(..)...) first.

Many thanks~








Original message


From: "JingsongLee" < lzljs3620...@aliyun.com.INVALID >

Sent: 2020/3/4 17:40

To: "user-zh" < user-zh@flink.apache.org >

Subject: Re: Question about StreamExecutionEnvironment in Flink 1.8 not being
compatible with multiple file paths for FileInputFormat



Hi,

What exactly do you need? Which of the following?
- 1. An unbounded, continuous file source that monitors directories, emits new files, and needs to support multiple directories
- 2. Just a bounded input format that needs to support multiple files

If it is 1, this is still not supported.
If it is 2, you can use env.addSource(new InputFormatSourceFunction(..)...) to read multiple files.

Best,
Jingsong Lee

--
From: 王智  Send Time: 2020-03-04 (Wed.) 17:34  To: user-zh
Subject: Question about StreamExecutionEnvironment in Flink 1.8 not being compatible with multiple file paths for FileInputFormat

I ran into a compatibility problem while implementing a custom FileInputFormat on
Flink 1.8. After a first look at the source code I am not sure whether my conclusion
is correct, and I would like to know the future plans and progress here. I would
appreciate it if someone could take a moment to point me in the right direction.
Thanks in advance~~

Question 1: Why does StreamExecutionEnvironment impose this restriction? What is the
purpose of ContinuousFileMonitoringFunction?

The relevant code is described below.

StreamExecutionEnvironment handles FileInputFormat objects specially:
if (inputFormat instanceof FileInputFormat) {
   @SuppressWarnings("unchecked")
   FileInputFormat

Re: Question on the Kafka connector parameter "connector.properties.zookeeper.connect"

2020-03-04 Thread Jark Wu
Hi Weike,

You are right. It is not needed since Kafka 0.9+. We already have an issue
to make it optional.
See https://issues.apache.org/jira/browse/FLINK-16125. We are planning to
fix it in 1.10.1 too.

Best,
Jark

On Wed, 4 Mar 2020 at 18:23, Weike Dong  wrote:

> Hi,
>
>
>
> Recently I have found that in the Flink Kafka Connector, the parameter
> "connector.properties.zookeeper.connect" is made mandatory for users.
> Therefore without it, Flink would throw an exception saying “Caused by:
> org.apache.flink.table.api.ValidationException: Could not find required
> property 'connector.properties.zookeeper.connect'”.
>
>
>
> However, as far as I know, for Kafka versions that are currently supported
> by the universal connector, this setting is not mandatory anymore. Here I
> would like know if this parameter could be changed to optional, and what
> are the negative impacts if doing so?
>
>
>
>
>
> Thanks : )
>
>
>
> Sincerely,
>
> Weike
>


Question on the Kafka connector parameter "connector.properties.zookeeper.connect"

2020-03-04 Thread Weike Dong
Hi,

 

Recently I have found that in the Flink Kafka Connector, the parameter
"connector.properties.zookeeper.connect" is made mandatory for users.
Therefore without it, Flink would throw an exception saying "Caused by:
org.apache.flink.table.api.ValidationException: Could not find required
property 'connector.properties.zookeeper.connect'".

 

However, as far as I know, for Kafka versions that are currently supported
by the universal connector, this setting is not mandatory anymore. I would
like to know whether this parameter could be changed to optional, and what
the negative impacts of doing so would be.

 



 

Thanks : )

 

Sincerely,

Weike



CREATE TABLE with Schema derived from format

2020-03-04 Thread Gyula Fóra
Hi All!

I am wondering if it would be possible to change the CREATE TABLE statement
so that it would also work without specifying any columns.

The format generally defines the available columns so maybe we could simply
use them as is if we want.

This would be very helpful when exploring different data sources.

Let me know what you think!
Gyula


Re: Very large _metadata file

2020-03-04 Thread Kostas Kloudas
Hi Jacob,

Could you specify which StateBackend you are using?

The reason I am asking is that, from the documentation in [1]:

"Note that if you use the MemoryStateBackend, metadata and savepoint
state will be stored in the _metadata file. Since it is
self-contained, you may move the file and restore from any location."

I am also cc'ing Gordon who may know a bit more about state formats.

I hope this helps,
Kostas

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/savepoints.html

On Wed, Mar 4, 2020 at 1:25 AM Jacob Sevart  wrote:
>
> Per the documentation:
>
> "The meta data file of a Savepoint contains (primarily) pointers to all files 
> on stable storage that are part of the Savepoint, in form of absolute paths."
>
> I somehow have a _metadata file that's 1.9GB. Running strings on it I find 
> 962 strings, most of which look like HDFS paths, which leaves a lot of that 
> file-size unexplained. What else is in there, and how exactly could this be 
> happening?
>
> We're running 1.6.
>
> Jacob


Re: StreamingFileSink Not Flushing All Data

2020-03-04 Thread Kostas Kloudas
Hi Austin,

I will have a look at your repo. In the meantime, given that [1] is already
merged in 1.10,
would upgrading to 1.10 and using the newly introduced
CompressWriterFactory be an option for you?

It is unfortunate that this feature was not documented.

Cheers,
Kostas

[1] https://issues.apache.org/jira/browse/FLINK-13634
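
For anyone curious what that could look like, a rough sketch of using the 1.10
CompressWriterFactory with the StreamingFileSink. The class and method names
below are from memory, based on FLINK-13634, so please double-check them against
the flink-compress module; the output path is a placeholder and a Hadoop
dependency on the classpath is assumed for the Gzip codec:

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.compress.CompressWriterFactory;
import org.apache.flink.formats.compress.CompressWriters;
import org.apache.flink.formats.compress.extractor.DefaultExtractor;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class CompressedSinkSketch {
    public static StreamingFileSink<String> buildSink() {
        // Gzip-compress each bulk part file via a Hadoop compression codec.
        CompressWriterFactory<String> writerFactory =
                CompressWriters.forExtractor(new DefaultExtractor<String>())
                        .withHadoopCompression("Gzip");
        return StreamingFileSink
                .forBulkFormat(new Path("file:///tmp/compressed-output"), writerFactory)
                .build();
    }
}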


On Tue, Mar 3, 2020 at 11:13 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hi all,
>
> Thanks for the docs pointer/ FLIP Rafi, and the workaround strategy Kostas
> -- strange though, as I wasn't using a bounded source when I first ran into
> this issue. I have updated the example repo to use an unbounded source[1],
> and the same file corruption problems remain.
>
> Anything else I could be doing wrong with the compression stream?
>
> Thanks again,
> Austin
>
> [1]:
> https://github.com/austince/flink-streaming-file-sink-compression/tree/unbounded
>
> On Tue, Mar 3, 2020 at 3:50 AM Kostas Kloudas  wrote:
>
>> Hi Austin and Rafi,
>>
>> @Rafi Thanks for providing the pointers!
>> Unfortunately there is no progress on the FLIP (or the issue).
>>
>> @ Austin In the meantime, what you could do --assuming that your input is
>> bounded --  you could simply not stop the job after the whole input is
>> processed, then wait until the output is committed, and then cancel the
>> job. I know and I agree that this is not an elegant solution but it is a
>> temporary workaround.
>>
>> Hopefully the FLIP and related issue is going to be prioritised soon.
>>
>> Cheers,
>> Kostas
>>
>> On Tue, Mar 3, 2020 at 8:04 AM Rafi Aroch  wrote:
>>
>>> Hi,
>>>
>>> This happens because StreamingFileSink does not support a finite input
>>> stream.
>>> In the docs it's mentioned under "Important Considerations":
>>>
>>> [image: image.png]
>>>
>>> This behaviour often surprises users...
>>>
>>> There's a FLIP
>>> 
>>>  and
>>> an issue  about
>>> fixing this. I'm not sure what's the status though, maybe Kostas can share.
>>>
>>> Thanks,
>>> Rafi
>>>
>>>
>>> On Mon, Mar 2, 2020 at 5:05 PM Austin Cawley-Edwards <
>>> austin.caw...@gmail.com> wrote:
>>>
 Hi Dawid and Kostas,

 Sorry for the late reply + thank you for the troubleshooting. I put
 together an example repo that reproduces the issue[1], because I did have
 checkpointing enabled in my previous case -- still must be doing something
 wrong with that config though.

 Thanks!
 Austin

 [1]: https://github.com/austince/flink-streaming-file-sink-compression


 On Mon, Feb 24, 2020 at 5:28 AM Kostas Kloudas 
 wrote:

> Hi Austin,
>
> Dawid is correct in that you need to enable checkpointing for the
> StreamingFileSink to work.
>
> I hope this solves the problem,
> Kostas
>
> On Mon, Feb 24, 2020 at 11:08 AM Dawid Wysakowicz
>  wrote:
> >
> > Hi Austing,
> >
> > If I am not mistaken the StreamingFileSink by default flushes on
> checkpoints. If you don't have checkpoints enabled it might happen that 
> not
> all data is flushed.
> >
> > I think you can also adjust that behavior with:
> >
> > forBulkFormat(...)
> >
> > .withRollingPolicy(/* your custom logic */)
> >
> > I also cc Kostas who should be able to correct me if I am wrong.
> >
> > Best,
> >
> > Dawid
> >
> > On 22/02/2020 01:59, Austin Cawley-Edwards wrote:
> >
> > Hi there,
> >
> > Using Flink 1.9.1, trying to write .tgz files with the
> StreamingFileSink#BulkWriter. It seems like flushing the output stream
> doesn't flush all the data written. I've verified I can create valid files
> using the same APIs and data on there own, so thinking it must be 
> something
> I'm doing wrong with the bulk format. I'm writing to the local filesystem,
> with the `file://` protocol.
> >
> > For Tar/ Gzipping, I'm using the Apache Commons Compression library,
> version 1.20.
> >
> > Here's a runnable example of the issue:
> >
> > import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
> > import
> org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
> > import
> org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
> > import org.apache.flink.api.common.serialization.BulkWriter;
> > import org.apache.flink.core.fs.FSDataOutputStream;
> > import org.apache.flink.core.fs.Path;
> > import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > import
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
> >
> > import java.io.FileOutputStream;
> > import java.io.IOException;
> > import java.io.Serializable;
> > import java.nio.charset.StandardCharsets;
> >

Re: Question about StreamExecutionEnvironment in Flink 1.8 not being compatible with multiple file paths for FileInputFormat

2020-03-04 Thread JingsongLee
Hi,

What exactly do you need? Which of the following?
- 1. An unbounded, continuous file source that monitors directories, emits new files, and needs to support multiple directories
- 2. Just a bounded input format that needs to support multiple files

If it is 1, this is still not supported.
If it is 2, you can use env.addSource(new InputFormatSourceFunction(..)...) to read multiple files.

Best,
Jingsong Lee
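
A rough sketch of option 2 as I understand the suggestion. TextInputFormat and
the paths are placeholders for your custom format; setFilePaths and the
addSource overload with explicit TypeInformation are my assumptions about the
1.8 API and should be double-checked (a custom FileInputFormat may also need to
override supportsMultiPaths() to return true):

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;

public class MultiPathBoundedRead {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Any FileInputFormat works here; TextInputFormat is just a stand-in.
        TextInputFormat format = new TextInputFormat(new Path("hdfs:///data/dir1"));
        format.setFilePaths("hdfs:///data/dir1", "hdfs:///data/dir2"); // multiple input paths

        TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
        DataStream<String> lines = env.addSource(
                new InputFormatSourceFunction<>(format, typeInfo), "multi-path-source", typeInfo);

        lines.print();
        env.execute("bounded multi-path read");
    }
}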


--
From:王智 
Send Time: 2020-03-04 (Wed.) 17:34
To: user-zh 
Subject: Question about StreamExecutionEnvironment in Flink 1.8 not being
compatible with multiple file paths for FileInputFormat

I ran into a compatibility problem while implementing a custom FileInputFormat on
Flink 1.8. After a first look at the source code I am not sure whether my conclusion
is correct, and I would like to know the future plans and progress here. I would
appreciate it if someone could take a moment to point me in the right direction.
Thanks in advance~~


Question 1: Why does StreamExecutionEnvironment impose this restriction? What is the
purpose of ContinuousFileMonitoringFunction?

The relevant code is described below.


StreamExecutionEnvironment handles FileInputFormat objects specially:
if (inputFormat instanceof FileInputFormat) {
   @SuppressWarnings("unchecked")
   FileInputFormat
   @SuppressWarnings("unchecked")
   FileInputFormat

[DISCUSS] FLIP-111: Docker image unification

2020-03-04 Thread Andrey Zagrebin
Hi All,

If you have ever touched the docker topic in Flink, you
probably noticed that we have multiple places in docs and repos which
address its various concerns.

We have prepared a FLIP [1] to simplify how users perceive the docker topic in
Flink. It mostly advocates an approach of extending the official Flink image
from Docker Hub. For convenience, it can come with a set of
bash utilities and documented examples of their usage. The utilities allow
to:

   - run the docker image in various modes (single job, session master,
   task manager etc)
   - customise the extending Dockerfile
   - and its entry point

Eventually, the FLIP suggests removing all other user-facing Dockerfiles
and build scripts from the Flink repo, moving all docker docs to
apache/flink-docker, and adjusting existing docker use cases to refer to this
new approach (mostly Kubernetes now).

The first contributed version of Flink docker integration also contained
example and docs for the integration with Bluemix in IBM cloud. We also
suggest to maintain it outside of Flink repository (cc Markus Müller).

Thanks,
Andrey

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-111%3A+Docker+image+unification


Question about StreamExecutionEnvironment in Flink 1.8 not being compatible with multiple file paths for FileInputFormat

2020-03-04 Thread 王智
I ran into a compatibility problem while implementing a custom FileInputFormat on
Flink 1.8. After a first look at the source code I am not sure whether my conclusion
is correct, and I would like to know the future plans and progress here. I would
appreciate it if someone could take a moment to point me in the right direction.
Thanks in advance~~


Question 1: Why does StreamExecutionEnvironment impose this restriction? What is the
purpose of ContinuousFileMonitoringFunction?

The relevant code is described below.


StreamExecutionEnvironment handles FileInputFormat objects specially:
if (inputFormat instanceof FileInputFormat) {
   @SuppressWarnings("unchecked")
   FileInputFormat