Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 Thread Till Rohrmann
Congrats, Dian!

Cheers,
Till

On Fri, Aug 28, 2020 at 8:33 AM Wei Zhong  wrote:

> Congratulations Dian!
>
> > 在 2020年8月28日,14:29,Jingsong Li  写道:
> >
> > Congratulations , Dian!
> >
> > Best, Jingsong
> >
> > On Fri, Aug 28, 2020 at 11:06 AM Walter Peng  > wrote:
> > congrats!
> >
> > Yun Tang wrote:
> > > Congratulations , Dian!
> >
> >
> > --
> > Best, Jingsong Lee
>
>


Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 Thread Wei Zhong
Congratulations Dian!

> 在 2020年8月28日,14:29,Jingsong Li  写道:
> 
> Congratulations , Dian!
> 
> Best, Jingsong
> 
> On Fri, Aug 28, 2020 at 11:06 AM Walter Peng  > wrote:
> congrats!
> 
> Yun Tang wrote:
> > Congratulations , Dian!
> 
> 
> -- 
> Best, Jingsong Lee



Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 Thread Jingsong Li
Congratulations, Dian!

Best, Jingsong

On Fri, Aug 28, 2020 at 11:06 AM Walter Peng  wrote:

> congrats!
>
> Yun Tang wrote:
> > Congratulations , Dian!
>


-- 
Best, Jingsong Lee


Re: Not able to Assign Watermark in Flink 1.11

2020-08-27 Thread Khachatryan Roman
Hi Anuj Jain,

You need to provide the type parameter when calling
WatermarkStrategy.forBoundedOutOfOrderness like this:

bookingFlowConsumer.assignTimestampsAndWatermarks(WatermarkStrategy.<GenericRecord>forBoundedOutOfOrderness(Duration.ofMinutes(15))

Regards,
Roman
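
For completeness, the corrected assignment spelled out in full might look roughly like
this (a sketch, assuming the event type is GenericRecord and reusing the
deserialization schema from the quoted code below):

FlinkKafkaConsumer<GenericRecord> bookingFlowConsumer = new FlinkKafkaConsumer<>(
        topics,
        new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
        properties);

bookingFlowConsumer.assignTimestampsAndWatermarks(
        WatermarkStrategy.<GenericRecord>forBoundedOutOfOrderness(Duration.ofMinutes(15))
                // lambda form of SerializableTimestampAssigner<GenericRecord>
                .withTimestampAssigner((genericRecord, recordTimestamp) ->
                        (long) genericRecord.get("event_ts")));

Without the explicit <GenericRecord>, the strategy is inferred as WatermarkStrategy<Object>,
which is why withTimestampAssigner cannot be resolved in the quoted snippet.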


On Fri, Aug 28, 2020 at 6:49 AM aj  wrote:

>
> I am getting this error when trying to assign watermark in Flink 1.11
>
> *"Cannot resolve method 'withTimestampAssigner(anonymous
> org.apache.flink.api.common.eventtime.SerializableTimestampAssigner)'"*
>
> FlinkKafkaConsumer bookingFlowConsumer = new 
> FlinkKafkaConsumer(topics,
> new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
> properties);
>
> bookingFlowConsumer.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMinutes(15))
> .withTimestampAssigner(new SerializableTimestampAssigner() 
> {
>   @Override
>   public long extractTimestamp(GenericRecord genericRecord, long l) {
> return (long)genericRecord.get("event_ts");
>   }
> }));
>
>
> What is wrong with this?
>
> In Flink 1.9 I was using this function and it was working fine
>
> public static class SessionAssigner implements 
> AssignerWithPunctuatedWatermarks {
>
>   @Override
>   public long extractTimestamp(GenericRecord record, long 
> previousElementTimestamp) {
> long timestamp = (long) record.get("event_ts");
> // LOGGER.info("timestamp", timestamp);
> return timestamp;
>   }
>
>   @Override
>   public Watermark checkAndGetNextWatermark(GenericRecord record, long 
> extractedTimestamp) {
> // simply emit a watermark with every event
> // LOGGER.info("extractedTimestamp ", extractedTimestamp);
> return new Watermark(extractedTimestamp);
>   }
> }
>
>
> --
> Thanks & Regards,
> Anuj Jain
>
>
>
> 
>


user@flink.apache.org

2020-08-27 Thread Danny Chan
Hi, Sofya T. Irwin ~

Can you share your use case and why you need a time-windowed join there?

Currently the SQL time-windowed join is not supported yet, and I would like to hear
from you whether it is necessary to support it in SQL.


Sofya T. Irwin  wrote on Thu, Jul 30, 2020 at 10:44 PM:

> Hi,
> I'm trying to investigate a SQL job using a time-windowed join that is
> exhibiting a large, growing state. The join syntax is most similar to
> the interval join (
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html
> ).
>
> A few questions:
> 1. Am I correct in understanding that State TTL is generally not
> applicable for TableAPI&SQL? So we cannot use State TTL to limit state size
> for a join?
>
> 2. It seems that Flink should be able to expire state even without
> explicit settings based on this: "In TableAPI&SQL and DataStream, the
> window aggregation and time-windowed join will clear expired state using
> Timers which is triggered by watermark."  (
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/join-state-TTL-td34760.html
> )
>
> To clarify: Does the above mean that Flink is expected to detect expired
> state and clear it without explicit configuration to allow it to do so?
>
> 3. I've looked into setting the idle state retention time. From what I can
> understand, this particular setting is appropriate for my use case.
> "TableConfig#setIdleStateRetentionTime in TableAPI&SQL is a job level
> configuration which will enable state ttl for all non-time-based operator
> states." (
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/join-state-TTL-td34760.html
> )
>
> To clarify: Would enabling this setting control state growth? Is this only
> available for blink planner? Currently we are using the StreamPlanner. Is
> there any way to ensure that idle state has limited retention for
> applications using the StreamPlanner?
>
> Thanks ahead,
> Sofya
>
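
For reference, the idle state retention mentioned in the quoted questions can be
configured roughly like this (a minimal sketch, assuming the Blink planner and the
Flink 1.10/1.11 Table API; the retention values are placeholders):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

TableConfig tConfig = tEnv.getConfig();
// state for a key becomes eligible for cleanup after 12 idle hours and is removed after at most 24
tConfig.setIdleStateRetentionTime(Time.hours(12), Time.hours(24));

As the quoted thread notes, this applies to non-time-based operator state; a
time-windowed join is expected to clean up its state via watermark-triggered timers.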


Re: Different deserialization schemas for Kafka keys and values

2020-08-27 Thread Manas Kale
Hi Robert,
Thanks for the info!

On Thu, Aug 27, 2020 at 8:01 PM Robert Metzger  wrote:

> Hi,
>
> Check out the KafkaDeserializationSchema (
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html#the-deserializationschema)
> which allows you to deserialize the key and value bytes coming from Kafka.
>
> Best,
> Robert
>
>
> On Thu, Aug 27, 2020 at 1:56 PM Manas Kale  wrote:
>
>> Hi,
>> I have a kafka topic on which the key is serialized in a custom format
>> and the value is serialized as JSON. How do I create a FlinkKafakConsumer
>> that has different deserialization schemas for the key and value? Here's
>> what I tried:
>>
>> FlinkKafkaConsumer> advancedFeatureData = new 
>> FlinkKafkaConsumer<>(ADVANCED_FEATURES_TOPIC, new 
>> TypeInformationKeyValueSerializationSchema(
>> TypeInformation.of(new TypeHint() {}),
>> TypeInformation.of(new TypeHint() {}),
>> env.getConfig()
>> ), properties);
>>
>> However, I get the error:
>> com.esotericsoftware.kryo.KryoException: Encountered unregistered class
>> ID: 121
>> at
>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>> at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
>> at
>> org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema.deserialize(TypeInformationKeyValueSerializationSchema.java:112)
>> at
>> org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema.deserialize(TypeInformationKeyValueSerializationSchema.java:43)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:718)
>> at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>> at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>
>> Is there something I am missing with my approach or am I supposed to use
>> a completely different class than
>> TypeInformationKeyValueSerializationSchema?
>>
>
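
As a rough illustration of the suggestion above, a KafkaDeserializationSchema exposes the
raw key and value bytes of each record, so the two can be decoded independently. This is
only a sketch: MyKey, MyValue and MyKeyDecoder are hypothetical stand-ins for the custom
key format, and Jackson is just one possible choice for the JSON value.

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class CustomKeyJsonValueSchema
        implements KafkaDeserializationSchema<Tuple2<MyKey, MyValue>> {

    private transient ObjectMapper mapper;

    @Override
    public boolean isEndOfStream(Tuple2<MyKey, MyValue> nextElement) {
        return false; // unbounded stream
    }

    @Override
    public Tuple2<MyKey, MyValue> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        MyKey key = MyKeyDecoder.decode(record.key());                   // custom key format (hypothetical)
        MyValue value = mapper.readValue(record.value(), MyValue.class); // JSON-encoded value
        return Tuple2.of(key, value);
    }

    @Override
    public TypeInformation<Tuple2<MyKey, MyValue>> getProducedType() {
        return new TupleTypeInfo<>(TypeInformation.of(MyKey.class), TypeInformation.of(MyValue.class));
    }
}

It would then replace the key/value schema from the quoted code, e.g.
new FlinkKafkaConsumer<>(topic, new CustomKeyJsonValueSchema(), properties).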


Not able to Assign Watermark in Flink 1.11

2020-08-27 Thread aj
I am getting this error when trying to assign watermark in Flink 1.11

*"Cannot resolve method 'withTimestampAssigner(anonymous
org.apache.flink.api.common.eventtime.SerializableTimestampAssigner)'"*

FlinkKafkaConsumer bookingFlowConsumer = new
FlinkKafkaConsumer(topics,
new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
properties);

bookingFlowConsumer.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMinutes(15))
.withTimestampAssigner(new SerializableTimestampAssigner() {
  @Override
  public long extractTimestamp(GenericRecord genericRecord, long l) {
return (long)genericRecord.get("event_ts");
  }
}));


What is wrong with this?

In Flink 1.9 I was using this function and it was working fine

public static class SessionAssigner implements
AssignerWithPunctuatedWatermarks {

  @Override
  public long extractTimestamp(GenericRecord record, long
previousElementTimestamp) {
long timestamp = (long) record.get("event_ts");
// LOGGER.info("timestamp", timestamp);
return timestamp;
  }

  @Override
  public Watermark checkAndGetNextWatermark(GenericRecord record, long
extractedTimestamp) {
// simply emit a watermark with every event
// LOGGER.info("extractedTimestamp ", extractedTimestamp);
return new Watermark(extractedTimestamp);
  }
}


-- 
Thanks & Regards,
Anuj Jain






Re: Performance issue associated with managed RocksDB memory

2020-08-27 Thread Yun Tang
Hi Juha

Thanks for your enthusiasm in digging into this problem, and sorry for jumping into 
this thread late to share something about the write buffer manager in RocksDB.

First of all, the reason you see the poor performance is that the write buffer 
manager has been assigned a much lower limit (due to the small managed memory 
size on that slot) than is actually needed. The competition for memory between 
different column families leads RocksDB to switch the active memtable to an 
immutable memtable earlier than necessary, which causes the poor performance as it 
increases the write amplification.

To keep the memory from exceeding the limit, the write buffer manager decides 
whether to flush a memtable in advance, which is the check you found: 
mutable_memtable_memory_usage() > mutable_limit_ [1]; the memory usage 
includes arena blocks that are allocated but not yet used.
Speaking of the arena, RocksDB's memory allocator, I need to correct 
one thing in your thread: the block cache does not allocate any memory itself; all 
memory is allocated from the arena.

The core idea of how RocksDB limits memory usage: the arena allocates memory, 
the write buffer manager decides when to switch memtables to control the active 
memory usage, and the write buffer manager also accounts its allocated memory against 
the cache. The underlying block cache evicts memory based on the accounting from the 
write buffer manager plus the cached data blocks, filters and indexes.

By default, arena_block_size is not configured and defaults to 1/8 of the write 
buffer size [2]. The default write buffer size is 64MB, which is why you 
could find "Options.arena_block_size: 8388608" in your logs.
As you can see, RocksDB assumes it can use a 64MB write buffer by default. 
However, Flink needs to control the total memory usage and has to configure the 
write buffer manager based on the managed memory. From your logs, "Write buffer 
is using 16789472 bytes out of a total of 17895697", I believe the managed 
memory of that slot (managed memory size / number of slots in one TM) is quite 
small. If we had 1 slot and 1GB for the task manager, the managed memory would be 
near 300MB, which is fine for the default RocksDB configuration. However, you only 
have about 90MB of managed memory for that slot. When you enable managed 
memory control on RocksDB, it tries its best to limit the total memory of all 
RocksDB instances within one slot to under 90MB. Once you disable the managed 
memory control over RocksDB, each RocksDB instance could use about 
64*2+8=136MB; since you have two operators here, they could use more than 
200MB in one slot.

There are several ways to mitigate this regression:

  1.  Increase the overall managed memory size for one slot.
  2.  Increase the write buffer ratio.
  3.  Set the arena_block_size explicitly instead of the default 8MB to avoid 
unwanted flushes in advance:

  e.g.:   new ColumnFamilyOptions().setArenaBlockSize(2 * 1024 * 1024L);
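
A sketch of how option 3 above could be wired into the state backend, assuming the
RocksDBOptionsFactory interface from Flink 1.10/1.11 (the checkpoint path is a
placeholder; please double-check the exact signatures against your Flink version):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
RocksDBStateBackend backend = new RocksDBStateBackend("file:///tmp/checkpoints", true);
backend.setRocksDBOptions(new RocksDBOptionsFactory() {
    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions; // keep Flink's defaults at the DB level
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // smaller arena blocks make premature "Write Buffer Full" flushes less likely
        return currentOptions.setArenaBlockSize(2 * 1024 * 1024L);
    }
});
env.setStateBackend(backend);

Options 1 and 2 are plain configuration changes (the taskmanager.memory.managed.size and
state.backend.rocksdb.memory.write-buffer-ratio settings, respectively).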

[1] 
https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/include/rocksdb/write_buffer_manager.h#L47
[2] 
https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/db/column_family.cc#L196

Best
Yun Tang


From: Juha Mynttinen 
Sent: Monday, August 24, 2020 15:56
To: user@flink.apache.org 
Subject: Re: Performance issue associated with managed RocksDB memory

The issue can be reproduced by using certain combinations of the value of
RocksDBOptions.WRITE_BUFFER_RATIO (default 0.5) and the Flink job
parallelism.

Examples that break:
* Parallelism 1 and WRITE_BUFFER_RATIO 0.1
* Parallelism 5 and the default WRITE_BUFFER_RATIO 0.5

Examples that work:
* Parallelism 1 and WRITE_BUFFER_RATIO 0.5, duration 34164 ms

In a working case (parallelism 1, WRITE_BUFFER_RATIO 0.2) RocksDB log looks
like this (right after the uninteresting bootup messages):

2020/08/21-12:55:41.693771 7f56e6643700 [db/db_impl.cc:1546] Created column
family [valueState] (ID 1)
2020/08/21-12:55:42.213743 7f56e6643700 [db/db_impl_write.cc:1103] Flushing
column family with largest mem table size. Write buffer is using 16789472
bytes out of a total of 17895697.
2020/08/21-12:55:42.213799 7f56e6643700 [db/db_impl_write.cc:1423]
[valueState] New memtable created with log file: #3. Immutable memtables: 0.
2020/08/21-12:55:42.213924 7f56deffd700 (Original Log Time
2020/08/21-12:55:42.213882) [db/db_impl_compaction_flush.cc:1560] Calling
FlushMemTableToOutputFile with column family [valueState], flush slots
available 1, compaction slots available 1, flush slots scheduled 1,
compaction slots scheduled 0
2020/08/21-12:55:42.213927 7f56deffd700 [db/flush_job.cc:304] [valueState]
[JOB 2] Flushing memtable with next log file: 3
2020/08/21-12:55:42.213969 7f56deffd700 EVENT_LOG_v1 {"time_micros":
1598003742213958, "job": 2, "event": "flush_started", "num_memtables": 1,
"num_entries": 170995, "num_deletes": 0, "memory_usage": 8399008,
"flush_reason": "Write Buffer Full"}
2020/08/21-12:55:42.213973 7f56deffd70

Re: Debezium Flink EMR

2020-08-27 Thread Jark Wu
Hi,

This is a known issue in 1.11.0, and has been fixed in 1.11.1.


Best,
Jark

On Fri, 28 Aug 2020 at 06:52, Rex Fenley  wrote:

> Hi again!
>
> I'm testing this out locally in Docker on Flink 1.11 first to get my bearings
> before downgrading to 1.10 and figuring out how to replace the Debezium
> connector. However, I'm getting the following error
> ```
> Provided trait [BEFORE_AND_AFTER] can't satisfy required trait
> [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
> ```
>
> Any suggestions for me to fix this?
>
> code:
>
> val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
> val blinkStreamSettings =
> EnvironmentSettings
> .newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build()
> val tableEnv = StreamTableEnvironment.create(bsEnv, blinkStreamSettings)
>
> // Table from Debezium mysql example docker:
> //
> +-+-+--+-+-++
> // | Field | Type | Null | Key | Default | Extra |
> //
> +-+-+--+-+-++
> // | id | int(11) | NO | PRI | NULL | auto_increment |
> // | customer_id | int(11) | NO | MUL | NULL | |
> // | street | varchar(255) | NO | | NULL | |
> // | city | varchar(255) | NO | | NULL | |
> // | state | varchar(255) | NO | | NULL | |
> // | zip | varchar(255) | NO | | NULL | |
> // | type | enum('SHIPPING','BILLING','LIVING') | NO | | NULL | |
> //
> +-+-+--+-+-++
>
> tableEnv.executeSql("""
> CREATE TABLE topic_addresses (
> -- schema is totally the same to the MySQL "addresses" table
> id INT,
> customer_id INT,
> street STRING,
> city STRING,
> state STRING,
> zip STRING,
> type STRING,
> PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'dbserver1.inventory.addresses',
> 'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
> 'properties.group.id' = 'testGroup',
> 'format' = 'debezium-json' -- using debezium-json as the format
> )
> """)
>
> val table = tableEnv.from("topic_addresses").select($"*")
>
> // Defining a PK automatically puts it in Upsert mode, which we want.
> // TODO: type should be a keyword, is that acceptable by the DDL?
> tableEnv.executeSql("""
> CREATE TABLE ESAddresses (
> id INT,
> customer_id INT,
> street STRING,
> city STRING,
> state STRING,
> zip STRING,
> type STRING,
> PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
> 'connector' = 'elasticsearch-7',
> 'hosts' = 'http://flink-jdbc-test_graph-elasticsearch_1:9200',
> 'index' = 'flinkaddresses',
> 'format' = 'json'
> )
> """)
>
> table.executeInsert("ESAddresses").print()
>
> Thanks!
>
> On Thu, Aug 27, 2020 at 11:53 AM Rex Fenley  wrote:
>
>> Thanks!
>>
>> On Thu, Aug 27, 2020 at 5:33 AM Jark Wu  wrote:
>>
>>> Hi,
>>>
>>> Regarding the performance difference, the proposed way will have one
>>> more stateful operator (deduplication) than the native 1.11 cdc support.
>>> The overhead of the deduplication operator is just similar to a simple
>>> group by aggregate (max on each non-key column).
>>>
>>> Best,
>>> Jark
>>>
>>> On Tue, 25 Aug 2020 at 02:21, Rex Fenley  wrote:
>>>
 Thank you so much for the help!

 On Mon, Aug 24, 2020 at 4:08 AM Marta Paes Moreira 
 wrote:

> Yes — you'll get the full row in the payload; and you can also access
> the change operation, which might be useful in your case.
>
> About performance, I'm summoning Kurt and @Jark Wu  to
> the thread, who will be able to give you a more complete answer and likely
> also some optimization tips for your specific use case.
>
> Marta
>
> On Fri, Aug 21, 2020 at 8:55 PM Rex Fenley  wrote:
>
>> Yup! This definitely helps and makes sense.
>>
>> The 'after' payload comes with all data from the row right? So
>> essentially inserts and updates I can insert/replace data by pk and null
>> values I just delete by pk, and then I can build out the rest of my joins
>> like normal.
>>
>> Are there any performance implications of doing it this way that is
>> different from the out-of-the-box 1.11 solution?
>>
>> On Fri, Aug 21, 2020 at 2:28 AM Marta Paes Moreira <
>> ma...@ververica.com> wrote:
>>
>>> Hi, Rex.
>>>
>>> Part of what enabled CDC support in Flink 1.11 was the refactoring
>>> of the table source interfaces (FLIP-95 [1]), and the new 
>>> ScanTableSource
>>> [2], which allows to emit bounded/unbounded streams with insert, update 
>>> and
>>> delete rows.
>>>
>>> In theory, you could consume data generated with Debezium as regular
>>> JSON-encoded events before Flink 1.11 — there just wasn't a convenient 
>>> way
>>> to really treat it as "changelog". As a workaround, what you can do in
>>> Flink 1.10 is process these messages as JSON and extract the "after" 
>>> field
>>> from

Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 Thread Walter Peng

congrats!

Yun Tang wrote:

Congratulations , Dian!


Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 Thread Yun Tang
Congratulations, Dian!

Best
Yun Tang

From: Yang Wang 
Sent: Friday, August 28, 2020 10:28
To: Arvid Heise 
Cc: Benchao Li ; dev ; user-zh 
; Dian Fu ; user 

Subject: Re: [ANNOUNCE] New PMC member: Dian Fu

Congratulations Dian !


Best,
Yang

Arvid Heise <ar...@ververica.com> wrote on Fri, Aug 28, 2020 at 1:39 AM:
Congrats Dian :)

On Thu, Aug 27, 2020 at 5:01 PM Benchao Li <libenc...@apache.org> wrote:
Congratulations Dian!

Cranmer, Danny  wrote on Thu, Aug 27, 2020 at 10:55 PM:
Congratulations Dian! :D

On 27/08/2020, 15:25, "Robert Metzger" <rmetz...@apache.org> wrote:


Congratulations Dian!

On Thu, Aug 27, 2020 at 3:09 PM Congxian Qiu <qcx978132...@gmail.com> wrote:

> Congratulations Dian
> Best,
> Congxian
>
>
> Xintong Song mailto:tonysong...@gmail.com>> 
于2020年8月27日周四 下午7:50写道:
>
> > Congratulations Dian~!
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Thu, Aug 27, 2020 at 7:42 PM Jark Wu 
mailto:imj...@gmail.com>> wrote:
> >
> > > Congratulations Dian!
> > >
> > > Best,
> > > Jark
> > >
> > > On Thu, 27 Aug 2020 at 19:37, Leonard Xu 
mailto:xbjt...@gmail.com>> wrote:
> > >
> > > > Congrats, Dian!  Well deserved.
> > > >
> > > > Best
> > > > Leonard
> > > >
> > > > > 在 2020年8月27日,19:34,Kurt Young 
mailto:ykt...@gmail.com>> 写道:
> > > > >
> > > > > Congratulations Dian!
> > > > >
> > > > > Best,
> > > > > Kurt
> > > > >
> > > > >
> > > > > On Thu, Aug 27, 2020 at 7:28 PM Rui Li 
mailto:lirui.fu...@gmail.com>>
> > wrote:
> > > > >
> > > > >> Congratulations Dian!
> > > > >>
> > > > >> On Thu, Aug 27, 2020 at 5:39 PM Yuan Mei 
mailto:yuanmei.w...@gmail.com>>
> > > > wrote:
> > > > >>
> > > > >>> Congrats!
> > > > >>>
> > > > >>> On Thu, Aug 27, 2020 at 5:38 PM Xingbo Huang 
mailto:hxbks...@gmail.com>
> >
> > > > wrote:
> > > > >>>
> > > >  Congratulations Dian!
> > > > 
> > > >  Best,
> > > >  Xingbo
> > > > 
> > > >  jincheng sun 
mailto:sunjincheng...@gmail.com>> 于2020年8月27日周四 
下午5:24写道:
> > > > 
> > > > > Hi all,
> > > > >
> > > > > On behalf of the Flink PMC, I'm happy to announce that Dian Fu
> is
> > > now
> > > > > part of the Apache Flink Project Management Committee (PMC).
> > > > >
> > > > > Dian Fu has been very active on PyFlink component, working on
> > > various
> > > > > important features, such as the Python UDF and Pandas
> > integration,
> > > > and
> > > > > keeps checking and voting for our releases, and also has
> > > successfully
> > > > > produced two releases(1.9.3&1.11.1) as RM, currently working 
as
> > RM
> > > > to push
> > > > > forward the release of Flink 1.12.
> > > > >
> > > > > Please join me in congratulating Dian Fu for becoming a Flink
> PMC
> > > > > Member!
> > > > >
> > > > > Best,
> > > > > Jincheng(on behalf of the Flink PMC)
> > > > >
> > > > 
> > > > >>
> > > > >> --
> > > > >> Best regards!
> > > > >> Rui Li
> > > > >>
> > > >
> > > >
> > >
> >
>



--

Best,
Benchao Li


--

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) 
Cheng


Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 Thread Yang Wang
Congratulations Dian !


Best,
Yang

Arvid Heise  wrote on Fri, Aug 28, 2020 at 1:39 AM:

> Congrats Dian :)
>
> On Thu, Aug 27, 2020 at 5:01 PM Benchao Li  wrote:
>
>> Congratulations Dian!
>>
>> Cranmer, Danny  于2020年8月27日周四 下午10:55写道:
>>
>>> Congratulations Dian! :D
>>>
>>> On 27/08/2020, 15:25, "Robert Metzger"  wrote:
>>>
>>>
>>> Congratulations Dian!
>>>
>>> On Thu, Aug 27, 2020 at 3:09 PM Congxian Qiu 
>>> wrote:
>>>
>>> > Congratulations Dian
>>> > Best,
>>> > Congxian
>>> >
>>> >
>>> > Xintong Song  于2020年8月27日周四 下午7:50写道:
>>> >
>>> > > Congratulations Dian~!
>>> > >
>>> > > Thank you~
>>> > >
>>> > > Xintong Song
>>> > >
>>> > >
>>> > >
>>> > > On Thu, Aug 27, 2020 at 7:42 PM Jark Wu 
>>> wrote:
>>> > >
>>> > > > Congratulations Dian!
>>> > > >
>>> > > > Best,
>>> > > > Jark
>>> > > >
>>> > > > On Thu, 27 Aug 2020 at 19:37, Leonard Xu 
>>> wrote:
>>> > > >
>>> > > > > Congrats, Dian!  Well deserved.
>>> > > > >
>>> > > > > Best
>>> > > > > Leonard
>>> > > > >
>>> > > > > > 在 2020年8月27日,19:34,Kurt Young  写道:
>>> > > > > >
>>> > > > > > Congratulations Dian!
>>> > > > > >
>>> > > > > > Best,
>>> > > > > > Kurt
>>> > > > > >
>>> > > > > >
>>> > > > > > On Thu, Aug 27, 2020 at 7:28 PM Rui Li <
>>> lirui.fu...@gmail.com>
>>> > > wrote:
>>> > > > > >
>>> > > > > >> Congratulations Dian!
>>> > > > > >>
>>> > > > > >> On Thu, Aug 27, 2020 at 5:39 PM Yuan Mei <
>>> yuanmei.w...@gmail.com>
>>> > > > > wrote:
>>> > > > > >>
>>> > > > > >>> Congrats!
>>> > > > > >>>
>>> > > > > >>> On Thu, Aug 27, 2020 at 5:38 PM Xingbo Huang <
>>> hxbks...@gmail.com
>>> > >
>>> > > > > wrote:
>>> > > > > >>>
>>> > > > >  Congratulations Dian!
>>> > > > > 
>>> > > > >  Best,
>>> > > > >  Xingbo
>>> > > > > 
>>> > > > >  jincheng sun  于2020年8月27日周四
>>> 下午5:24写道:
>>> > > > > 
>>> > > > > > Hi all,
>>> > > > > >
>>> > > > > > On behalf of the Flink PMC, I'm happy to announce that
>>> Dian Fu
>>> > is
>>> > > > now
>>> > > > > > part of the Apache Flink Project Management Committee
>>> (PMC).
>>> > > > > >
>>> > > > > > Dian Fu has been very active on PyFlink component,
>>> working on
>>> > > > various
>>> > > > > > important features, such as the Python UDF and Pandas
>>> > > integration,
>>> > > > > and
>>> > > > > > keeps checking and voting for our releases, and also
>>> has
>>> > > > successfully
>>> > > > > > produced two releases(1.9.3&1.11.1) as RM, currently
>>> working as
>>> > > RM
>>> > > > > to push
>>> > > > > > forward the release of Flink 1.12.
>>> > > > > >
>>> > > > > > Please join me in congratulating Dian Fu for becoming
>>> a Flink
>>> > PMC
>>> > > > > > Member!
>>> > > > > >
>>> > > > > > Best,
>>> > > > > > Jincheng(on behalf of the Flink PMC)
>>> > > > > >
>>> > > > > 
>>> > > > > >>
>>> > > > > >> --
>>> > > > > >> Best regards!
>>> > > > > >> Rui Li
>>> > > > > >>
>>> > > > >
>>> > > > >
>>> > > >
>>> > >
>>> >
>>>
>>>
>>
>> --
>>
>> Best,
>> Benchao Li
>>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


Re: OOM error for heap state backend.

2020-08-27 Thread Congxian Qiu
Hi
   The stack trace says that the job failed while restoring from a
checkpoint/savepoint. If you encounter this during a failover, maybe you can try
to find out the root cause of the failover itself first.
   As for the stack trace: when restoring the `HeapPriorityQueue`, Flink
ensures there is enough capacity via resizeQueueArray [1] (which copies the array);
maybe this is the problem. Could you please take a heap dump when the process
exits with OOM?

[1]
https://github.com/apache/flink/blob/5e0b7970a9aea74aba4ebffaa75c37e960799b93/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapPriorityQueue.java#L151

Best,
Congxian
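
One way to capture the heap dump Congxian asks for is to let the JVM write one
automatically on OOM; a minimal sketch of the flink-conf.yaml entry, assuming a HotSpot
JVM (the dump path is a placeholder):

env.java.opts.taskmanager: -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/taskmanager-oom.hprof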


Robert Metzger  wrote on Thu, Aug 27, 2020 at 10:59 PM:

> Hi Vishwas,
>
> Your scenario sounds like RocksDB would actually be recommended. I would
> always suggest to start with RocksDB, unless your state is really small
> compared to the available memory, or you need to optimize for performance.
> But maybe your job is running fine with RocksDB (performance wise), then
> there's no need to go into the details of heap memory management with Flink.
>
>
>
> On Wed, Aug 26, 2020 at 7:21 PM Vishwas Siravara 
> wrote:
>
>> Thanks Andrey,
>> My question is related to
>>
>> The FsStateBackend is encouraged for:
>>
>>- Jobs with large state, long windows, large key/value states.
>>- All high-availability setups.
>>
>> How large is large state without any overhead added by the framework?
>>
>> Best,
>> Vishwas
>>
>> On Wed, Aug 26, 2020 at 12:10 PM Andrey Zagrebin 
>> wrote:
>>
>>> Hi Vishwas,
>>>
>>>  is this quantifiable with respect to JVM heap size on a single node
 without the node being used for other tasks ?
>>>
>>>
>>> I don't quite understand this question. I believe the recommendation in
>>> docs has the same reason: use larger state objects so that the Java object
>>> overhead pays off.
>>> RocksDB keeps state in memory and on disk in the serialized form.
>>> Therefore it usually has a smaller footprint.
>>> Other jobs in the same task manager can potentially use other state
>>> backend depending on their state requirements.
>>> All tasks in the same task manager share the JVM heap as the task
>>> manager runs one JVM system process on the machine where it is deployed to.
>>>
>>> Best,
>>> Andrey
>>>
>>> On Wed, Aug 26, 2020 at 6:52 PM Vishwas Siravara 
>>> wrote:
>>>
 Hi Andrey,
 Thanks for getting back to me so quickly. The screenshots are for 1GB
 heap, the keys for the state are 20 character strings(20 bytes, we don't
 have multi byte characters) . So the overhead seems to be quite large(4x)
 even in comparison to the checkpoint size(which already adds an overhead) .
 In this document [1] it says use FS/Heap backend for large states, is this
 quantifiable with respect to JVM heap size on a single node without the
 node being used for other tasks ?
 I have attached GC log for TM and JM



 [1]
 https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-fsstatebackend

 Best,
 Vishwas

 On Wed, Aug 26, 2020 at 11:29 AM Andrey Zagrebin 
 wrote:

> Hi Vishwas,
>
> I believe the screenshots are from a heap size of 1GB?
>
> There are indeed many internal Flink state objects. They are overhead
> which is required for Flink to organise and track the state on-heap.
> Depending on the actual size of your state objects, the overhead may
> be relatively large or compared to the actual state size.
> For example, if you just keep integers in your state then overhead is
> probably a couple of times larger.
> It is not easy to estimate the exact on-heap size without thorough
> analysis.
>
> The checkpoint has little overhead and includes only actual state data
> - your serialized state objects which are probably smaller than their heap
> representation.
>
> So my guess is that the heap representation of the state is much
> bigger compared to the checkpoint size.
>
> I also cc other people who might add more thoughts about on-heap state
> size.
>
> You could also provide GC logs as Xintong suggested.
>
> Best,
> Andrey
>
> On Wed, Aug 26, 2020 at 4:21 PM Vishwas Siravara 
> wrote:
>
>> Hi Andrey and Xintong. 2.5 GB is from the flink web UI(
>> checkpoint size). I took a heap dump and I could not find any memory leak
>> from user code. I see the similar behaviour on smaller heap size, on a 
>> 1GB
>> heap , the state size from checkpoint UI is 180 MB. Attaching some
>> screenshots of heap profiles if it helps. So when the state grows GC 
>> takes
>> a long time and sometimes the job manager removes TM slot because of
>> 1ms timeout and tries to restore the task in another task manager, 
>> this
>> creates a cascading effect and affects other jobs running on the cluster.
>> My tests were run in a single node cluster with 1 TM and

Re: Debezium Flink EMR

2020-08-27 Thread Rex Fenley
Hi again!

I'm testing this out locally in Docker on Flink 1.11 first to get my bearings
before downgrading to 1.10 and figuring out how to replace the Debezium
connector. However, I'm getting the following error
```
Provided trait [BEFORE_AND_AFTER] can't satisfy required trait
[ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
```

Any suggestions for me to fix this?

code:

val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val blinkStreamSettings =
EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
val tableEnv = StreamTableEnvironment.create(bsEnv, blinkStreamSettings)

// Table from Debezium mysql example docker:
//
+-+-+--+-+-++
// | Field | Type | Null | Key | Default | Extra |
//
+-+-+--+-+-++
// | id | int(11) | NO | PRI | NULL | auto_increment |
// | customer_id | int(11) | NO | MUL | NULL | |
// | street | varchar(255) | NO | | NULL | |
// | city | varchar(255) | NO | | NULL | |
// | state | varchar(255) | NO | | NULL | |
// | zip | varchar(255) | NO | | NULL | |
// | type | enum('SHIPPING','BILLING','LIVING') | NO | | NULL | |
//
+-+-+--+-+-++

tableEnv.executeSql("""
CREATE TABLE topic_addresses (
-- schema is totally the same to the MySQL "addresses" table
id INT,
customer_id INT,
street STRING,
city STRING,
state STRING,
zip STRING,
type STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'dbserver1.inventory.addresses',
'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
'properties.group.id' = 'testGroup',
'format' = 'debezium-json' -- using debezium-json as the format
)
""")

val table = tableEnv.from("topic_addresses").select($"*")

// Defining a PK automatically puts it in Upsert mode, which we want.
// TODO: type should be a keyword, is that acceptable by the DDL?
tableEnv.executeSql("""
CREATE TABLE ESAddresses (
id INT,
customer_id INT,
street STRING,
city STRING,
state STRING,
zip STRING,
type STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://flink-jdbc-test_graph-elasticsearch_1:9200',
'index' = 'flinkaddresses',
'format' = 'json'
)
""")

table.executeInsert("ESAddresses").print()

Thanks!

On Thu, Aug 27, 2020 at 11:53 AM Rex Fenley  wrote:

> Thanks!
>
> On Thu, Aug 27, 2020 at 5:33 AM Jark Wu  wrote:
>
>> Hi,
>>
>> Regarding the performance difference, the proposed way will have one more
>> stateful operator (deduplication) than the native 1.11 cdc support.
>> The overhead of the deduplication operator is just similar to a simple
>> group by aggregate (max on each non-key column).
>>
>> Best,
>> Jark
>>
>> On Tue, 25 Aug 2020 at 02:21, Rex Fenley  wrote:
>>
>>> Thank you so much for the help!
>>>
>>> On Mon, Aug 24, 2020 at 4:08 AM Marta Paes Moreira 
>>> wrote:
>>>
 Yes — you'll get the full row in the payload; and you can also access
 the change operation, which might be useful in your case.

 About performance, I'm summoning Kurt and @Jark Wu  to
 the thread, who will be able to give you a more complete answer and likely
 also some optimization tips for your specific use case.

 Marta

 On Fri, Aug 21, 2020 at 8:55 PM Rex Fenley  wrote:

> Yup! This definitely helps and makes sense.
>
> The 'after' payload comes with all data from the row right? So
> essentially inserts and updates I can insert/replace data by pk and null
> values I just delete by pk, and then I can build out the rest of my joins
> like normal.
>
> Are there any performance implications of doing it this way that is
> different from the out-of-the-box 1.11 solution?
>
> On Fri, Aug 21, 2020 at 2:28 AM Marta Paes Moreira <
> ma...@ververica.com> wrote:
>
>> Hi, Rex.
>>
>> Part of what enabled CDC support in Flink 1.11 was the refactoring of
>> the table source interfaces (FLIP-95 [1]), and the new ScanTableSource
>> [2], which allows to emit bounded/unbounded streams with insert, update 
>> and
>> delete rows.
>>
>> In theory, you could consume data generated with Debezium as regular
>> JSON-encoded events before Flink 1.11 — there just wasn't a convenient 
>> way
>> to really treat it as "changelog". As a workaround, what you can do in
>> Flink 1.10 is process these messages as JSON and extract the "after" 
>> field
>> from the payload, and then apply de-duplication [3] to keep only the last
>> row.
>>
>> The DDL for your source table would look something like:
>>
>> CREATE TABLE tablename ( *... * after ROW(`field1` DATATYPE,
>> `field2` DATATYPE, ...) ) WITH ( 'connector' = 'kafka', 'format' =
>> 'json', ... );
>> Hope this helps!
>>
>> M

Re: Flink SQL Streaming Join Creates Duplicates

2020-08-27 Thread Austin Cawley-Edwards
Ah, I think the "Result Updating" is what got me -- INNER joins do the job!
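
For reference, the inner-join form that resolved it might look roughly like this (a
sketch, reusing tEnv and the registered tables from the quoted code below):

Table joined = tEnv.sqlQuery(
    "SELECT table_1.a, table_1.b, table_2.c " +
    "FROM table_1 " +
    "INNER JOIN table_2 ON table_1.b = table_2.b");

With the left outer join, the result is updating: a row with a null right side is emitted
first and retracted once the matching table_2 row arrives, which is presumably why the
append-only output showed both versions of each record.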

On Thu, Aug 27, 2020 at 3:38 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> oops, the example query should actually be:
>
> SELECT table_1.a, table_1.b, table_2.c
> FROM table_1
> LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;
>
> and duplicate results should actually be:
>
> Record(a = "data a 1", b = "data b 1", c = "data c 1")
> Record(a = "data a 1", b = "data b 1", c = null)
> Record(a = "data a 2", b = "data b 2", c = "data c 2")
> Record(a = "data a 2", b = "data b 2", c = null)
>
> On Thu, Aug 27, 2020 at 3:34 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hey all,
>>
>> I've got a Flink 1.10 Streaming SQL job using the Blink Planner that is
>> reading from a few CSV files and joins some records across them into a
>> couple of data streams (yes, this could be a batch job won't get into why
>> we chose streams unless it's relevant). These joins are producing some
>> duplicate records, one with the joined field present and one with the
>> joined field as `null`, though this happens only ~25% of the time. Reading
>> the docs on joins[1], I thought this could be caused by too strict Idle
>> State Retention[2], so I increased that to min, max (15min, 24h) but that
>> doesn't seem to have an effect, and the problem still occurs when testing
>> on a subset of data that finishes processing in under a minute.
>>
>> The query roughly looks like:
>>
>> table_1 has fields a, b
>> table_2 has fields b, c
>>
>> SELECT table_1.a, table_1.b, table_1.c
>> FROM table_1
>> LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;
>>
>> Correct result:
>> Record(a = "data a 1", b = "data b 1", c = "data c 1")
>> Record(a = "data a 2", b = "data b 2", c = "data c 2")
>>
>> Results seem to be anywhere between all possible dups and the correct
>> result.
>>
>> Record(a = "data a 1", b = "data b 1", c = "data c 1")
>> Record(a = "data a 1", b = null, c = "data c 1")
>> Record(a = "data a 2", b = "data b 2", c = "data c 2")
>> Record(a = "data a 2", b = null, c = "data c 2")
>>
>> The CSV files are registered as Flink Tables with the following:
>>
>> tableEnv.connect(
>> new FileSystem()
>> .path(path)
>> )
>> .withFormat(
>> new Csv()
>> .quoteCharacter('"')
>> .ignoreParseErrors()
>> )
>> .withSchema(schema)
>> .inAppendMode()
>> .createTemporaryTable(tableName);
>>
>>
>> I'm creating my table environment like so:
>>
>> EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
>> .useBlinkPlanner()
>> .build();
>>
>> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
>> tableEnvSettings);
>>
>> TableConfig tConfig = tEnv.getConfig();
>> tConfig.setIdleStateRetentionTime(Time.minutes(15), Time.hours(24));
>>
>>
>> Is there something I'm misconfiguring or have misunderstood the docs?
>>
>> Thanks,
>> Austin
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#joins
>> [2]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time
>>
>


Re: Flink SQL Streaming Join Creates Duplicates

2020-08-27 Thread Austin Cawley-Edwards
oops, the example query should actually be:

SELECT table_1.a, table_1.b, table_2.c
FROM table_1
LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;

and duplicate results should actually be:

Record(a = "data a 1", b = "data b 1", c = "data c 1")
Record(a = "data a 1", b = "data b 1", c = null)
Record(a = "data a 2", b = "data b 2", c = "data c 2")
Record(a = "data a 2", b = "data b 2", c = null)

On Thu, Aug 27, 2020 at 3:34 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hey all,
>
> I've got a Flink 1.10 Streaming SQL job using the Blink Planner that is
> reading from a few CSV files and joins some records across them into a
> couple of data streams (yes, this could be a batch job won't get into why
> we chose streams unless it's relevant). These joins are producing some
> duplicate records, one with the joined field present and one with the
> joined field as `null`, though this happens only ~25% of the time. Reading
> the docs on joins[1], I thought this could be caused by too strict Idle
> State Retention[2], so I increased that to min, max (15min, 24h) but that
> doesn't seem to have an effect, and the problem still occurs when testing
> on a subset of data that finishes processing in under a minute.
>
> The query roughly looks like:
>
> table_1 has fields a, b
> table_2 has fields b, c
>
> SELECT table_1.a, table_1.b, table_1.c
> FROM table_1
> LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;
>
> Correct result:
> Record(a = "data a 1", b = "data b 1", c = "data c 1")
> Record(a = "data a 2", b = "data b 2", c = "data c 2")
>
> Results seem to be anywhere between all possible dups and the correct
> result.
>
> Record(a = "data a 1", b = "data b 1", c = "data c 1")
> Record(a = "data a 1", b = null, c = "data c 1")
> Record(a = "data a 2", b = "data b 2", c = "data c 2")
> Record(a = "data a 2", b = null, c = "data c 2")
>
> The CSV files are registered as Flink Tables with the following:
>
> tableEnv.connect(
> new FileSystem()
> .path(path)
> )
> .withFormat(
> new Csv()
> .quoteCharacter('"')
> .ignoreParseErrors()
> )
> .withSchema(schema)
> .inAppendMode()
> .createTemporaryTable(tableName);
>
>
> I'm creating my table environment like so:
>
> EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .build();
>
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
> tableEnvSettings);
>
> TableConfig tConfig = tEnv.getConfig();
> tConfig.setIdleStateRetentionTime(Time.minutes(15), Time.hours(24));
>
>
> Is there something I'm misconfiguring or have misunderstood the docs?
>
> Thanks,
> Austin
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#joins
> [2]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time
>


Flink SQL Streaming Join Creates Duplicates

2020-08-27 Thread Austin Cawley-Edwards
Hey all,

I've got a Flink 1.10 Streaming SQL job using the Blink Planner that is
reading from a few CSV files and joins some records across them into a
couple of data streams (yes, this could be a batch job won't get into why
we chose streams unless it's relevant). These joins are producing some
duplicate records, one with the joined field present and one with the
joined field as `null`, though this happens only ~25% of the time. Reading
the docs on joins[1], I thought this could be caused by too strict Idle
State Retention[2], so I increased that to min, max (15min, 24h) but that
doesn't seem to have an effect, and the problem still occurs when testing
on a subset of data that finishes processing in under a minute.

The query roughly looks like:

table_1 has fields a, b
table_2 has fields b, c

SELECT table_1.a, table_1.b, table_1.c
FROM table_1
LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;

Correct result:
Record(a = "data a 1", b = "data b 1", c = "data c 1")
Record(a = "data a 2", b = "data b 2", c = "data c 2")

Results seem to be anywhere between all possible dups and the correct
result.

Record(a = "data a 1", b = "data b 1", c = "data c 1")
Record(a = "data a 1", b = null, c = "data c 1")
Record(a = "data a 2", b = "data b 2", c = "data c 2")
Record(a = "data a 2", b = null, c = "data c 2")

The CSV files are registered as Flink Tables with the following:

tableEnv.connect(
new FileSystem()
.path(path)
)
.withFormat(
new Csv()
.quoteCharacter('"')
.ignoreParseErrors()
)
.withSchema(schema)
.inAppendMode()
.createTemporaryTable(tableName);


I'm creating my table environment like so:

EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.build();

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
tableEnvSettings);

TableConfig tConfig = tEnv.getConfig();
tConfig.setIdleStateRetentionTime(Time.minutes(15), Time.hours(24));


Is there something I'm misconfiguring or have misunderstood the docs?

Thanks,
Austin

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#joins
[2]:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time


Re: Failures due to inevitable high backpressure

2020-08-27 Thread Arvid Heise
Hi Hubert,

The most straightforward reason for backpressure is under-provisioning of
the cluster. An application usually needs gradually more resources over
time. If the user base of your company grows, so does the number of
messages (be it click streams, page impressions, or any kind of
transactions). Often, the operator state grows as well. Sometimes it's
just that the events themselves become more complex and thus you need more
overall bandwidth. This means that from time to time you need to increase
the memory of Flink (for state) or the number of compute nodes (to handle
more events). In the same way, you need to make sure that your sink scales
as well.

If you fail to keep up with the demand, the application gradually becomes
more unstable, and at some point you end up in the vicious cycle you
described, where the system cannot even catch up during off-hours when the
number of events is small.

First, it's important to understand what the bottleneck is. The Web UI
should help to narrow it down quickly.
Second, if a TM becomes unresponsive, chances are high that memory ran out
(on- or off-heap), so it might be enough to increase memory. In any case,
I'd expect one of the TM logs to show an exception. You could also profile
the GC time of the TMs.
Third, you might also want to check your state size. If it grows over time,
it might be some kind of leak (logic errors are also common, where too
much is held in state and never evicted).
Fourth, closely monitor how the application behaves during recovery. Is it
making progress at all, or stalling at the same point?
Fifth, it might be worthwhile to add a compute node to the cluster just so
that everything runs again, and remove it later. If you now have 2 days of
data in need of reprocessing to catch up, even the aforementioned tweaks
may not be enough.

Best,

Arvid
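
David's suggestion below (making the checkpoint timeout effectively infinite) can be
expressed roughly like this; a sketch, assuming the DataStream CheckpointConfig API, with
placeholder values:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // trigger a checkpoint every minute
// let slow checkpoints complete instead of timing out repeatedly under backpressure
env.getCheckpointConfig().setCheckpointTimeout(Long.MAX_VALUE);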

On Wed, Aug 26, 2020 at 10:01 PM David Anderson 
wrote:

> One other thought: some users experiencing this have found it preferable
> to increase the checkpoint timeout to the point where it is effectively
> infinite. Checkpoints that can't timeout are likely to eventually complete,
> which is better than landing in the vicious cycle you described.
>
> David
>
> On Wed, Aug 26, 2020 at 7:41 PM David Anderson 
> wrote:
>
>> You should begin by trying to identify the cause of the backpressure,
>> because the appropriate fix depends on the details.
>>
>> Possible causes that I have seen include:
>>
>> - the job is inadequately provisioned
>> - blocking i/o is being done in a user function
>> - a huge number of timers are firing simultaneously
>> - event time skew between different sources is causing large amounts of
>> state to be buffered
>> - data skew (a hot key) is overwhelming one subtask or slot
>> - external systems can't keep up (e.g., a sink)
>> - lengthy GC pauses caused by running lots of slots per TM with the
>> FsStateBackend
>> - contention for critical resources (e.g., using a NAS as the local disk
>> for RocksDB)
>>
>> Unaligned checkpoints [1], new in Flink 1.11, should address this problem
>> in some cases, depending on the root cause. But first you should try to
>> figure out why you have high backpressure, because a number of the causes
>> listed above won't be helped by changing to unaligned checkpoints.
>>
>> Best,
>> David
>>
>> [1]
>> https://flink.apache.org/news/2020/07/06/release-1.11.0.html#unaligned-checkpoints-beta
>>
>> On Wed, Aug 26, 2020 at 6:06 PM Hubert Chen 
>> wrote:
>>
>>> Hello,
>>>
>>> My Flink application has entered into a bad state and I was wondering if
>>> I could get some advice on how to resolve the issue.
>>>
>>> The sequence of events that led to a bad state:
>>>
>>> 1. A failure occurs (e.g., TM timeout) within the cluster
>>> 2. The application successfully recovers from the last completed
>>> checkpoint
>>> 3. The application consumes events from Kafka as quickly as it can. This
>>> introduces high backpressure.
>>> 4. A checkpoint is triggered
>>> 5. Another failure occurs (e.g., TM timeout, checkpoint timeout, Kafka
>>> transaction timeout) and the application loops back to step #2. This
>>> creates a vicious cycle where no progress is made.
>>>
>>> I believe the underlying issue is the application experiencing high
>>> backpressure. This can cause the TM to not respond to heartbeats or cause
>>> long checkpoint durations due to delayed processing of the checkpoint.
>>>
>>> I'm confused on the best next steps to take. How do I ensure that
>>> heartbeats and checkpoints successfully complete when experiencing
>>> inevitable high packpressure?
>>>
>>> Best,
>>> Hubert
>>>
>>

-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy

Re: Debezium Flink EMR

2020-08-27 Thread Rex Fenley
Thanks!

On Thu, Aug 27, 2020 at 5:33 AM Jark Wu  wrote:

> Hi,
>
> Regarding the performance difference, the proposed way will have one more
> stateful operator (deduplication) than the native 1.11 cdc support.
> The overhead of the deduplication operator is just similar to a simple
> group by aggregate (max on each non-key column).
>
> Best,
> Jark
>
> On Tue, 25 Aug 2020 at 02:21, Rex Fenley  wrote:
>
>> Thank you so much for the help!
>>
>> On Mon, Aug 24, 2020 at 4:08 AM Marta Paes Moreira 
>> wrote:
>>
>>> Yes — you'll get the full row in the payload; and you can also access
>>> the change operation, which might be useful in your case.
>>>
>>> About performance, I'm summoning Kurt and @Jark Wu  to
>>> the thread, who will be able to give you a more complete answer and likely
>>> also some optimization tips for your specific use case.
>>>
>>> Marta
>>>
>>> On Fri, Aug 21, 2020 at 8:55 PM Rex Fenley  wrote:
>>>
 Yup! This definitely helps and makes sense.

 The 'after' payload comes with all data from the row right? So
 essentially inserts and updates I can insert/replace data by pk and null
 values I just delete by pk, and then I can build out the rest of my joins
 like normal.

 Are there any performance implications of doing it this way that is
 different from the out-of-the-box 1.11 solution?

 On Fri, Aug 21, 2020 at 2:28 AM Marta Paes Moreira 
 wrote:

> Hi, Rex.
>
> Part of what enabled CDC support in Flink 1.11 was the refactoring of
> the table source interfaces (FLIP-95 [1]), and the new ScanTableSource
> [2], which allows to emit bounded/unbounded streams with insert, update 
> and
> delete rows.
>
> In theory, you could consume data generated with Debezium as regular
> JSON-encoded events before Flink 1.11 — there just wasn't a convenient way
> to really treat it as "changelog". As a workaround, what you can do in
> Flink 1.10 is process these messages as JSON and extract the "after" field
> from the payload, and then apply de-duplication [3] to keep only the last
> row.
>
> The DDL for your source table would look something like:
>
> CREATE TABLE tablename ( *... * after ROW(`field1` DATATYPE, `field2`
> DATATYPE, ...) ) WITH ( 'connector' = 'kafka', 'format' = 'json', ...
> );
> Hope this helps!
>
> Marta
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/table/connector/source/ScanTableSource.html
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#deduplication
>
>
> On Fri, Aug 21, 2020 at 10:28 AM Chesnay Schepler 
> wrote:
>
>> @Jark Would it be possible to use the 1.11 debezium support in 1.10?
>>
>> On 20/08/2020 19:59, Rex Fenley wrote:
>>
>> Hi,
>>
>> I'm trying to set up Flink with Debezium CDC Connector on AWS EMR,
>> however, EMR only supports Flink 1.10.0, whereas Debezium Connector 
>> arrived
>> in Flink 1.11.0, from looking at the documentation.
>>
>> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html
>>
>> I'm wondering what alternative solutions are available for connecting
>> Debezium to Flink? Is there an open source Debezium connector that works
>> with Flink 1.10.0? Could I potentially pull the code out for the 1.11.0
>> Debezium connector and compile it in my project using Flink 1.10.0 api?
>>
>> For context, I plan on doing some fairly complicated long lived
>> stateful joins / materialization using the Table API over data ingested
>> from Postgres and possibly MySQL.
>>
>> Appreciate any help, thanks!
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com  |  BLOG
>>   |  FOLLOW US
>>   |  LIKE US
>> 
>>
>>
>>

 --

 Rex Fenley  |  Software Engineer - Mobile and Backend


 Remind.com  |  BLOG 
  |  FOLLOW US   |  LIKE US
 

>>>
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com  |  BLOG 
>>  |  FOLLOW US   |  LIKE US
>> 
>>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com  |  BLOG 
 |

Re: SAX2 driver class org.apache.xerces.parsers.SAXParser not found

2020-08-27 Thread Arvid Heise
Hi Averell,

This is a known bug [1] caused by the AWS S3 library in use not respecting
the classloader [2].

The best solution is to upgrade to 1.10.1 (or take the s3-hadoop jar from
1.10.1). Don't try to put Xerces manually anywhere.

[1] https://issues.apache.org/jira/browse/FLINK-16014
[2] https://github.com/aws/aws-sdk-java/issues/2242

On Thu, Aug 27, 2020 at 4:34 PM Robert Metzger  wrote:

> Hi,
> I guess you've loaded the S3 filesystem using the s3 FS plugin.
>
> You need to put the right jar file containing the SAX2 driver class into
> the plugin directory where you've also put the S3 filesystem plugin.
> You can probably find out the name of the right sax2 jar file from your
> local setup where everything is working.
>
> I hope that helps!
>
> Best,
> Robert
>
> On Thu, Aug 27, 2020 at 1:38 PM Averell  wrote:
>
>> Hello,
>>
>> I have a Flink 1.10 job which runs in AWS EMR, checkpointing to S3a as well
>> as writing output to S3a using StreamingFileSink. The job runs well until I
>> add the Java Hadoop property /-Dfs.s3a.acl.default=BucketOwnerFullControl/.
>> After that, the checkpoint process fails to complete.
>>
>> /Caused by: org.xml.sax.SAXException: SAX2 driver class
>> org.apache.xerces.parsers.SAXParser not found/
>> I tried to add a jar file with that class
>> (https://mvnrepository.com/artifact/xerces/xercesImpl/2.12.0) to my
>> flink/lib/ directory, then got the same error but different stacktrace:
>> /Caused by: org.apache.flink.util.SerializedThrowable: SAX2 driver class
>> org.apache.xerces.parsers.SAXParser not found/
>>
>> This seems to be a dependencies conflict, but I couldn't track its root.
>> In my IDE I didn't have any dependencies issue, while I couldn't find
>> SAXParser in the dependencies tree.
>>
>> *Here is the stacktrace when the jar file is not there:*
>> /Caused by: org.apache.hadoop.fs.s3a.AWSClientIOException: getFileStatus
>> on
>>
>> s3a://mybucket/checkpoint/a9502b1c81ced10dfcbb21ac43f03e61/chk-2/41f51c24-60fd-474b-9f89-3d65d87037c7:
>> com.amazonaws.SdkClientException: Couldn't initialize a SAX driver to
>> create
>> an XMLReader: Couldn't initialize a SAX driver to create an XMLReader
>> at
>> org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:177)
>> at
>> org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:145)
>> at
>>
>> org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2251)
>> at
>>
>> org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
>> at
>>
>> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
>> at
>> org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:749)
>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169)
>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149)
>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038)
>> at
>>
>> org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:141)
>> at
>>
>> org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:37)
>> at
>>
>> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.create(PluginFileSystemFactory.java:164)
>> at
>>
>> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126)
>> at
>>
>> org.apache.flink.core.fs.EntropyInjector.createEntropyAware(EntropyInjector.java:61)
>> at
>>
>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:356)
>> ... 17 more
>> Caused by: com.amazonaws.SdkClientException: Couldn't initialize a SAX
>> driver to create an XMLReader
>> at
>>
>> com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser.(XmlResponsesSaxParser.java:118)
>> at
>>
>> com.amazonaws.services.s3.model.transform.Unmarshallers$ListObjectsV2Unmarshaller.unmarshall(Unmarshallers.java:87)
>> at
>>
>> com.amazonaws.services.s3.model.transform.Unmarshallers$ListObjectsV2Unmarshaller.unmarshall(Unmarshallers.java:77)
>> at
>>
>> com.amazonaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:62)
>> at
>>
>> com.amazonaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:31)
>> at
>>
>> com.amazonaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:70)
>> at
>>
>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1554)
>> at
>>
>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1272)
>> at
>>
>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
>> at
>>
>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.d

Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 Thread Arvid Heise
Congrats Dian :)

On Thu, Aug 27, 2020 at 5:01 PM Benchao Li  wrote:

> Congratulations Dian!
>
> Cranmer, Danny  于2020年8月27日周四 下午10:55写道:
>
>> Congratulations Dian! :D
>>
>> On 27/08/2020, 15:25, "Robert Metzger"  wrote:
>>
>>
>>
>>
>> Congratulations Dian!
>>
>> On Thu, Aug 27, 2020 at 3:09 PM Congxian Qiu 
>> wrote:
>>
>> > Congratulations Dian
>> > Best,
>> > Congxian
>> >
>> >
>> > Xintong Song  于2020年8月27日周四 下午7:50写道:
>> >
>> > > Congratulations Dian~!
>> > >
>> > > Thank you~
>> > >
>> > > Xintong Song
>> > >
>> > >
>> > >
>> > > On Thu, Aug 27, 2020 at 7:42 PM Jark Wu  wrote:
>> > >
>> > > > Congratulations Dian!
>> > > >
>> > > > Best,
>> > > > Jark
>> > > >
>> > > > On Thu, 27 Aug 2020 at 19:37, Leonard Xu 
>> wrote:
>> > > >
>> > > > > Congrats, Dian!  Well deserved.
>> > > > >
>> > > > > Best
>> > > > > Leonard
>> > > > >
>> > > > > > 在 2020年8月27日,19:34,Kurt Young  写道:
>> > > > > >
>> > > > > > Congratulations Dian!
>> > > > > >
>> > > > > > Best,
>> > > > > > Kurt
>> > > > > >
>> > > > > >
>> > > > > > On Thu, Aug 27, 2020 at 7:28 PM Rui Li <
>> lirui.fu...@gmail.com>
>> > > wrote:
>> > > > > >
>> > > > > >> Congratulations Dian!
>> > > > > >>
>> > > > > >> On Thu, Aug 27, 2020 at 5:39 PM Yuan Mei <
>> yuanmei.w...@gmail.com>
>> > > > > wrote:
>> > > > > >>
>> > > > > >>> Congrats!
>> > > > > >>>
>> > > > > >>> On Thu, Aug 27, 2020 at 5:38 PM Xingbo Huang <
>> hxbks...@gmail.com
>> > >
>> > > > > wrote:
>> > > > > >>>
>> > > > >  Congratulations Dian!
>> > > > > 
>> > > > >  Best,
>> > > > >  Xingbo
>> > > > > 
>> > > > >  jincheng sun  于2020年8月27日周四
>> 下午5:24写道:
>> > > > > 
>> > > > > > Hi all,
>> > > > > >
>> > > > > > On behalf of the Flink PMC, I'm happy to announce that
>> Dian Fu
>> > is
>> > > > now
>> > > > > > part of the Apache Flink Project Management Committee
>> (PMC).
>> > > > > >
>> > > > > > Dian Fu has been very active on PyFlink component,
>> working on
>> > > > various
>> > > > > > important features, such as the Python UDF and Pandas
>> > > integration,
>> > > > > and
>> > > > > > keeps checking and voting for our releases, and also has
>> > > > successfully
>> > > > > > produced two releases(1.9.3&1.11.1) as RM, currently
>> working as
>> > > RM
>> > > > > to push
>> > > > > > forward the release of Flink 1.12.
>> > > > > >
>> > > > > > Please join me in congratulating Dian Fu for becoming a
>> Flink
>> > PMC
>> > > > > > Member!
>> > > > > >
>> > > > > > Best,
>> > > > > > Jincheng(on behalf of the Flink PMC)
>> > > > > >
>> > > > > 
>> > > > > >>
>> > > > > >> --
>> > > > > >> Best regards!
>> > > > > >> Rui Li
>> > > > > >>
>> > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>>
>
> --
>
> Best,
> Benchao Li
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Example flink run with security options? Running on k8s in my case

2020-08-27 Thread Nico Kruber
Actually, your curl command may be incorrect since you didn't specify https as 
the protocol: Its man page says:
> If you specify URL without protocol:// prefix, curl will attempt to guess
> what protocol you might want. It will then default to HTTP but try other
> protocols based on often-used host name prefixes. For example, for host
> names starting with "ftp." curl will assume you want to speak FTP.

So I guess it wasn't actually using that and failed to connect. Unfortunately, 
an empty response doesn't tell you much since it could have established a 
connection which was then reset by the server. Please use one of Flink's REST 
endpoints[1] to verify - these should have some content in the response.

It may also be useful to pair curl with `--verbose` for more output and also 
look at the JM logs for any such problems.
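For example (--insecure skips certificate validation just for this connectivity test; 
use --cacert flink.cer instead to actually verify the self-signed certificate):

curl --verbose --insecure https://tls-flink-cluster-1-11-jobmanager:8081/v1/jobs/overview

A working REST endpoint should return a small JSON document rather than an empty response.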


I'm not sure how the GCP flink operator sets things up, but if submitting the 
job is independent of starting the JM and TM pods, you don't need any of the 
internal SSL configuration parameters for submitting a job. This is a per-
cluster setting!


As for the certificate generation: I'm not sure "myhost.company.org,ip:127.0.0.1" 
would work here if the client is accessing the JM via the name 
"tls-flink-cluster-1-11-jobmanager". I'm not 100% sure, but I would 
assume there is verification of the actual hostname that the certificate is 
supposed to secure. What you were saying when creating it was that the hostname is 
either "myhost.company.org" or "127.0.0.1", which is not correct in the 
non-local case.
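
For example, the REST keystore could be regenerated with the Kubernetes service name 
added to the SAN, roughly along these lines (reusing the aliases and passwords from this 
thread; the extra DNS entry is the assumption here):

keytool -genkeypair -alias flink.rest -keystore rest.keystore \
  -dname "CN=tls-flink-cluster-1-11-jobmanager" \
  -ext "SAN=dns:tls-flink-cluster-1-11-jobmanager,dns:myhost.company.org,ip:127.0.0.1" \
  -storepass rest_keystore_password -keyalg RSA -keysize 4096 -storetype PKCS12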


Just one further note here: Because setting up SSL can be difficult, our 
Ververica Platform (also on the free-to-use community edition) comes with a 
SSL setup [2] that you can enable with a click of a button and it just works 
as expected. Maybe also something to check out (not just for configuring SSL). 
Feel free to contact me personally for more in this regard.


Nico


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/
rest_api.html
[2] https://docs.ververica.com/user_guide/deployments/
configure_flink.html#ssl-tls-setup

On Thursday, 27 August 2020 13:36:45 CEST Adam Roberts wrote:
> Hey folks, outside of Kubernetes things are great yep, with the same
> generated files. 
> So to share what I'm doing a little more... and I've modified things to be
> more inline with the current docs 
> keytool -genkeypair -alias flink.internal -keystore internal.keystore -dname
> "CN=flink.internal" -storepass internal_store_password -keyalg RSA -keysize
> 4096 -storetype PKCS12 keytool -genkeypair -alias flink.rest -keystore
> rest.keystore -dname "CN=myhost.company.org" -ext
> "SAN=dns:myhost.company.org,ip:127.0.0.1" -storepass rest_keystore_password
> -keyalg RSA -keysize 4096 -storetype PKCS12 keytool -exportcert -keystore
> rest.keystore -alias flink.rest -storepass rest_keystore_password -file
> flink.cer keytool -importcert -keystore rest.truststore -alias flink.rest
> -storepass rest_truststore_password -file flink.cer -noprompt kubectl
> delete secret flink-tls-secret-2
> # Create the simpler secret from main docs for Flink
> cat << EOF | kubectl create -n abp -f -
>   apiVersion: v1
>   kind: Secret
>   type: Opaque
>   metadata:
> name: flink-tls-secret-2
>   data:
> rest.keystore: $(cat ./rest.keystore | base64 | tr -d '\n')
> rest.truststore: $(cat ./rest.truststore | base64 | tr -d '\n')
> internal.keystore: $(cat ./internal.keystore | base64 | tr -d '\n')
> internal.truststore: $(cat ./internal.keystore | base64 | tr -d '\n')
> EOF
>  
> I run this script to get flink-tls-secret-2 with those files in, the keytool
> commands should be familiar since they're from the Flink 1.11 security
> docs). 
> Note I don't have a file called internal.truststore but neither do the docs,
> they mention file.truststore but don't tell me how that's made...maybe this
> is the problem? But things are fine with my normal Flink outside of
> Kubernetes set up. 
> The Job CustomResource does:
> 
> apiVersion: batch/v1
> kind: Job
> metadata:
>   name: sample-job
>   labels:
> app: flink-job
> spec:
>   template:
> spec:
>   # Run as flink user
>   securityContext:
> runAsUser: 
> runAsGroup: 
>   containers:
>   - name: wordcount
> # Replace this to be a Docker image with your built Flink app at a
> known location # Your build of Flink should be based on
> https://github.com/apache/flink-docker/tree/master/1.11/scala_2.12-java8-de
> bian # with a modification to the Dockerfile to add your jar in (with a
> COPY) image: adamroberts/mycoolflink:latest
> - /opt/flink/bin/flink
> - run
> - -D security.ssl.internal.enabled=true
> - -D security.ssl.rest.enabled=true
> - -D security.ssl.rest.keystore=/etc/flink-secrets/rest.keystore
> - -D security.ssl.rest.truststore=/etc/flink-secrets/rest.truststore
> - -D security.ssl.rest.keystore-password=rest_keystore_pass

Re: SDK vs Connectors

2020-08-27 Thread Robert Metzger
Hi Prasanna,

(General remark: For such questions, please send the email only to
user@flink.apache.org. There's no need to email to dev@ as well.)

I don't think Flink can do much if the library you are using isn't throwing
exceptions. Maybe the library has other means of error reporting (a
callback to register, or it returns futures) which you can integrate with
Flink.
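
For illustration, the flush-on-checkpoint pattern mentioned later in this thread, combined
with future-based error reporting, could look roughly like the sketch below. AsyncPublisher
is a hypothetical stand-in for an asynchronous SNS client wrapper; it is not part of the
AWS SDK, so you would implement it around your SDK calls.

import java.io.Serializable;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class SnsSink extends RichSinkFunction<String> implements CheckpointedFunction {

    /** Hypothetical stand-in for an async SNS client, e.g. a thin wrapper around publishAsync. */
    public interface AsyncPublisher extends Serializable {
        CompletableFuture<Void> publish(String message);
    }

    private final AsyncPublisher publisher;
    private transient Set<CompletableFuture<Void>> pending;
    private transient volatile Throwable asyncError;

    public SnsSink(AsyncPublisher publisher) {
        this.publisher = publisher;
    }

    @Override
    public void open(Configuration parameters) {
        pending = ConcurrentHashMap.newKeySet();
    }

    @Override
    public void invoke(String message, Context context) {
        checkAsyncError();
        CompletableFuture<Void> future = publisher.publish(message);
        pending.add(future);
        future.whenComplete((ignored, error) -> {
            pending.remove(future);
            if (error != null) {
                // surfaced on the next invoke/snapshot, which fails the job and triggers recovery
                asyncError = error;
            }
        });
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Block the checkpoint until every message sent so far is acknowledged; otherwise a
        // successful checkpoint could "forget" records that were never delivered.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).get();
        checkAsyncError();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        // No operator state needed for this simple flush-on-checkpoint approach.
    }

    private void checkAsyncError() {
        if (asyncError != null) {
            throw new RuntimeException("Publishing to SNS failed", asyncError);
        }
    }
}

Because the checkpoint only completes once all in-flight publishes are acknowledged, replay
after a failure gives you the at-least-once delivery discussed below.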



On Sun, Aug 23, 2020 at 4:34 PM Prasanna kumar <
prasannakumarram...@gmail.com> wrote:

> Thanks for the Reply Yun,
>
> I see that when I publish the messages to SNS from the map operator, in case
> of any errors I find that the checkpointing mechanism takes care of "no data
> loss".
>
> One scenario I could not replicate is when the method from the SDK is unable
> to send messages to SNS but remains silent, not throwing any
> errors/exceptions. In this case we cannot confirm the "at least once"
> guarantee of delivery of messages.
>
> Prasanna.
>
> On Sun 23 Aug, 2020, 07:51 Yun Gao,  wrote:
>
>> Hi Prasanna,
>>
>>1) Semantically both a) and b) would be Ok. If the Custom sink could
>> be chained with the map operator (I assume the map operator is the
>> "Processing" in the graph), there should be also no much difference
>> physically, if they could not chain, then writting a custom sink would
>> cause another pass of network transferring, but the custom sink would be
>> run in a different thread, thus much more computation resources could be
>> exploited.
>>2) To achieve at-least-once, you need to implment the
>> "CheckpointedFunction" interface, and ensures flushing all the data to the
>> outside systems when snapshotting states. Since if the checkpointing
>> succeed, the previous data will not be replayed after failover, thus these
>> pieces of data need to be ensured written out before the checkpoint
>> succeeds.
>>3) From my side I don't think there are significant disadvantages of
>> writing custom sink functions.
>>
>> Best,
>>  Yun
>>
>>
>> --
>> Sender:Prasanna kumar
>> Date:2020/08/22 02:00:51
>> Recipient:user; 
>> Theme:SDK vs Connectors
>>
>> Hi Team,
>>
>> Following is the pipeline
>> Kafka => Processing => SNS Topics .
>>
>> Flink Does not provide a SNS connector out of the box.
>>
>> a) I implemented the above by using AWS SDK and published the messages in
>> the Map operator itself.
>> The pipeline is working well. I see messages flowing to SNS topics.
>>
>> b) Another approach is that I could write a custom sink function and
>> still publish to SNS using SDK in this stage.
>>
>> Questions
>> 1) What would be the primary difference between approach a) and b). Is
>> there any significant advantage of one over the other ?
>>
>> 2) Would at least once guarantee be confirmed if we follow the above
>> approach?
>>
>> 3) Would there be any significant disadvantages(rather what we need to be
>> careful ) of writing our custom sink functions ?
>>
>> Thanks,
>> Prasanna.
>>
>>


Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 Thread Benchao Li
Congratulations Dian!

Cranmer, Danny  于2020年8月27日周四 下午10:55写道:

> Congratulations Dian! :D
>
> On 27/08/2020, 15:25, "Robert Metzger"  wrote:
>
>
>
>
> Congratulations Dian!
>
> On Thu, Aug 27, 2020 at 3:09 PM Congxian Qiu 
> wrote:
>
> > Congratulations Dian
> > Best,
> > Congxian
> >
> >
> > Xintong Song  于2020年8月27日周四 下午7:50写道:
> >
> > > Congratulations Dian~!
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > >
> > > On Thu, Aug 27, 2020 at 7:42 PM Jark Wu  wrote:
> > >
> > > > Congratulations Dian!
> > > >
> > > > Best,
> > > > Jark
> > > >
> > > > On Thu, 27 Aug 2020 at 19:37, Leonard Xu 
> wrote:
> > > >
> > > > > Congrats, Dian!  Well deserved.
> > > > >
> > > > > Best
> > > > > Leonard
> > > > >
> > > > > > 在 2020年8月27日,19:34,Kurt Young  写道:
> > > > > >
> > > > > > Congratulations Dian!
> > > > > >
> > > > > > Best,
> > > > > > Kurt
> > > > > >
> > > > > >
> > > > > > On Thu, Aug 27, 2020 at 7:28 PM Rui Li <
> lirui.fu...@gmail.com>
> > > wrote:
> > > > > >
> > > > > >> Congratulations Dian!
> > > > > >>
> > > > > >> On Thu, Aug 27, 2020 at 5:39 PM Yuan Mei <
> yuanmei.w...@gmail.com>
> > > > > wrote:
> > > > > >>
> > > > > >>> Congrats!
> > > > > >>>
> > > > > >>> On Thu, Aug 27, 2020 at 5:38 PM Xingbo Huang <
> hxbks...@gmail.com
> > >
> > > > > wrote:
> > > > > >>>
> > > > >  Congratulations Dian!
> > > > > 
> > > > >  Best,
> > > > >  Xingbo
> > > > > 
> > > > >  jincheng sun  于2020年8月27日周四
> 下午5:24写道:
> > > > > 
> > > > > > Hi all,
> > > > > >
> > > > > > On behalf of the Flink PMC, I'm happy to announce that
> Dian Fu
> > is
> > > > now
> > > > > > part of the Apache Flink Project Management Committee
> (PMC).
> > > > > >
> > > > > > Dian Fu has been very active on PyFlink component,
> working on
> > > > various
> > > > > > important features, such as the Python UDF and Pandas
> > > integration,
> > > > > and
> > > > > > keeps checking and voting for our releases, and also has
> > > > successfully
> > > > > > produced two releases(1.9.3&1.11.1) as RM, currently
> working as
> > > RM
> > > > > to push
> > > > > > forward the release of Flink 1.12.
> > > > > >
> > > > > > Please join me in congratulating Dian Fu for becoming a
> Flink
> > PMC
> > > > > > Member!
> > > > > >
> > > > > > Best,
> > > > > > Jincheng(on behalf of the Flink PMC)
> > > > > >
> > > > > 
> > > > > >>
> > > > > >> --
> > > > > >> Best regards!
> > > > > >> Rui Li
> > > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>
>

-- 

Best,
Benchao Li


Re: OOM error for heap state backend.

2020-08-27 Thread Robert Metzger
Hi Vishwas,

Your scenario sounds like RocksDB would actually be recommended. I would
always suggest starting with RocksDB, unless your state is really small
compared to the available memory, or you need to optimize for performance.
If your job runs fine with RocksDB (performance-wise), then
there's no need to go into the details of heap memory management with Flink.
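
For completeness, switching to RocksDB can be as small a change as the sketch below
(the checkpoint path is a placeholder and the flink-statebackend-rocksdb dependency is
assumed to be on the classpath):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackedJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // true enables incremental checkpoints, which helps keep checkpoints small for large state
        env.setStateBackend(new RocksDBStateBackend("s3a://my-bucket/checkpoints", true));

        // Placeholder pipeline; the actual job logic stays unchanged.
        env.fromElements(1, 2, 3).print();

        env.execute("rocksdb-backed-job");
    }
}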



On Wed, Aug 26, 2020 at 7:21 PM Vishwas Siravara 
wrote:

> Thanks Andrey,
> My question is related to
>
> The FsStateBackend is encouraged for:
>
>- Jobs with large state, long windows, large key/value states.
>- All high-availability setups.
>
> How large is large state without any overhead added by the framework?
>
> Best,
> Vishwas
>
> On Wed, Aug 26, 2020 at 12:10 PM Andrey Zagrebin 
> wrote:
>
>> Hi Vishwas,
>>
>>  is this quantifiable with respect to JVM heap size on a single node
>>> without the node being used for other tasks ?
>>
>>
>> I don't quite understand this question. I believe the recommendation in
>> docs has the same reason: use larger state objects so that the Java object
>> overhead pays off.
>> RocksDB keeps state in memory and on disk in the serialized form.
>> Therefore it usually has a smaller footprint.
>> Other jobs in the same task manager can potentially use other state
>> backend depending on their state requirements.
>> All tasks in the same task manager share the JVM heap as the task manager
>> runs one JVM system process on the machine where it is deployed to.
>>
>> Best,
>> Andrey
>>
>> On Wed, Aug 26, 2020 at 6:52 PM Vishwas Siravara 
>> wrote:
>>
>>> Hi Andrey,
>>> Thanks for getting back to me so quickly. The screenshots are for 1GB
>>> heap, the keys for the state are 20 character strings(20 bytes, we don't
>>> have multi byte characters) . So the overhead seems to be quite large(4x)
>>> even in comparison to the checkpoint size(which already adds an overhead) .
>>> In this document [1] it says use FS/Heap backend for large states, is this
>>> quantifiable with respect to JVM heap size on a single node without the
>>> node being used for other tasks ?
>>> I have attached GC log for TM and JM
>>>
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-fsstatebackend
>>>
>>> Best,
>>> Vishwas
>>>
>>> On Wed, Aug 26, 2020 at 11:29 AM Andrey Zagrebin 
>>> wrote:
>>>
 Hi Vishwas,

 I believe the screenshots are from a heap size of 1GB?

 There are indeed many internal Flink state objects. They are overhead
 which is required for Flink to organise and track the state on-heap.
 Depending on the actual size of your state objects, the overhead may be
 relatively large or compared to the actual state size.
 For example, if you just keep integers in your state then overhead is
 probably a couple of times larger.
 It is not easy to estimate exactly on-heap size without through
 analysis.

 The checkpoint has little overhead and includes only actual state data
 - your serialized state objects which are probably smaller than their heap
 representation.

 So my guess is that the heap representation of the state is much bigger
 compared to the checkpoint size.

 I also cc other people who might add more thoughts about on-heap state
 size.

 You could also provide GC logs as Xintong suggested.

 Best,
 Andrey

 On Wed, Aug 26, 2020 at 4:21 PM Vishwas Siravara 
 wrote:

> Hi Andrey and Xintong. 2.5 GB is from the flink web UI(
> checkpoint size). I took a heap dump and I could not find any memory leak
> from user code. I see the similar behaviour on smaller heap size, on a 1GB
> heap , the state size from checkpoint UI is 180 MB. Attaching some
> screenshots of heap profiles if it helps. So when the state grows GC takes
> a long time and sometimes the job manager removes TM slot because of
> 1ms timeout and tries to restore the task in another task manager, 
> this
> creates a cascading effect and affects other jobs running on the cluster.
> My tests were run in a single node cluster with 1 TM and 4 task slots with
> a parallelism of 4.
>
> Best,
> Vishwas
>
> On Tue, Aug 25, 2020 at 10:02 AM Andrey Zagrebin 
> wrote:
>
>> Hi Vishwas,
>>
>> If you use Flink 1.7, check the older memory model docs [1] because
>> you referred to the new memory model of Flink 1.10 in your reference 2.
>> Could you also share a screenshot where you get the state size of 2.5
>> GB? Do you mean Flink WebUI?
>> Generally, it is quite hard to estimate the on-heap size of state
>> java objects. I never heard about such a Flink metric.
>>
>> Best,
>> Andrey
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html
>>
>> On Mon, Aug 24, 2020 at 4:05 AM Xintong Song 
>> wrote:
>>>

Re: Default Flink Metrics Graphite

2020-08-27 Thread Robert Metzger
I don't think these error messages give us a hint as to why you can't see the
metrics (because they are about registering metrics, not reporting them).

Are you sure you are using the right configuration parameters for Flink
1.10, that all required JARs are in the lib/ folder (on all machines), and
that your Graphite setup is working (for example, have you confirmed that you
can show any metrics at all in the Graphite UI)?
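
For reference, a Flink 1.10 setup could look roughly like the following in flink-conf.yaml
(host and port are placeholders; the flink-metrics-graphite-1.10.0.jar is copied from opt/
into lib/ on every machine):

metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.grph.host: graphite.example.com
metrics.reporter.grph.port: 2003
metrics.reporter.grph.protocol: TCP
metrics.reporter.grph.interval: 60 SECONDS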


On Thu, Aug 27, 2020 at 2:05 AM Vijayendra Yadav 
wrote:

> Hi Chesnay and Dawid,
>
> I see multiple entries as following in Log:
>
> 2020-08-26 23:46:19,105 WARN
> org.apache.flink.runtime.metrics.MetricRegistryImpl - Error while
> registering metric: numRecordsIn.
> java.lang.IllegalArgumentException: A metric named
> ip-99--99-99.taskmanager.container_1596056409708_1570_01_06.vdcs-kafka-flink-test.Map.0.numRecordsIn
> already exists
> at com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:91)
> 2020-08-26 23:46:19,094 WARN
> org.apache.flink.runtime.metrics.MetricRegistryImpl - Error while
> registering metric: numRecordsOut.
> java.lang.IllegalArgumentException: A metric named
> ip-99--99-999.taskmanager.container_1596056409708_1570_01_05.vdcs-kafka-flink-test.Map.2.numRecordsOut
> already exists
> at com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:91)
> at
> org.apache.flink.dropwizard.ScheduledDropwizardReporter.notifyOfAddedMetric(ScheduledDropwizardReporter.java:131)
> at com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:91)
> at
> org.apache.flink.dropwizard.ScheduledDropwizardReporter.notifyOfAddedMetric(ScheduledDropwizardReporter.java:131)
> at
> org.apache.flink.runtime.metrics.MetricRegistryImpl.register(MetricRegistryImpl.java:343)
> at
> org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:426)
> at
> org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.counter(AbstractMetricGroup.java:359)
> at
> org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.counter(AbstractMetricGroup.java:349)
> at
> org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup.(OperatorIOMetricGroup.java:41)
> at
> org.apache.flink.runtime.metrics.groups.OperatorMetricGroup.(OperatorMetricGroup.java:48)
> at
> org.apache.flink.runtime.metrics.groups.TaskMetricGroup.lambda$getOrAddOperator$0(TaskMetricGroup.java:154)
> at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
> at
> org.apache.flink.runtime.metrics.groups.TaskMetricGroup.getOrAddOperator(TaskMetricGroup.java:154)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:180)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setup(AbstractUdfStreamOperator.java:82)
> at
> org.apache.flink.streaming.api.operators.SimpleOperatorFactory.createStreamOperator(SimpleOperatorFactory.java:75)
> at
> org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:48)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:429)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:144)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:433)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> at java.lang.Thread.run(Thread.java:748)
> Regards,
> Vijay
>
>
> On Wed, Aug 26, 2020 at 7:53 AM Chesnay Schepler 
> wrote:
>
>> metrics.reporter.grph.class:
>> org.apache.flink.metrics.graphite.GraphiteReporter
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#graphite-orgapacheflinkmetricsgraphitegraphitereporter
>>
>> On 26/08/2020 16:40, Vijayendra Yadav wrote:
>>
>> Hi Dawid,
>>
>> I have 1.10.0 version of flink. What is alternative for this version ?
>>
>> Regards,
>> Vijay
>>
>>
>> On Aug 25, 2020, at 11:44 PM, Dawid Wysakowicz 
>>  wrote:
>>
>> 
>>
>> Hi Vijay,
>>
>> I think the problem might be that you are using a wrong version of the
>> reporter.
>>
>> You say you used flink-metrics-graphite-1.10.0.jar from 1.10 as a plugin,
>> but it was migrated to plugins in 1.11 only[1].
>>
>> I'd recommend trying it out with the same 1.11 version of Flink and
>> Graphite reporter.
>>
>> Best,
>>
>> Dawid
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-16965
>> On 26/08/2020 08:04, Vijayendra Yadav wrote:
>>
>> Hi Nikola,
>>
>> To rule out any other cluster issues, I have tried it in my local now.
>> Steps as follows, but don't see any metrics yet.
>>
>> 1) Set up local Graphite
>>

Re: SAX2 driver class org.apache.xerces.parsers.SAXParser not found

2020-08-27 Thread Robert Metzger
Hi,
I guess you've loaded the S3 filesystem using the s3 FS plugin.

You need to put the right jar file containing the SAX2 driver class into
the plugin directory where you've also put the S3 filesystem plugin.
You can probably find out the name of the right sax2 jar file from your
local setup where everything is working.
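
For example, assuming the standard s3-fs-hadoop plugin layout and the xercesImpl jar
mentioned earlier in the thread (adjust names and versions to your setup), it could look
like:

mkdir -p plugins/s3-fs-hadoop
cp opt/flink-s3-fs-hadoop-1.10.0.jar plugins/s3-fs-hadoop/
# the jar that provides org.apache.xerces.parsers.SAXParser
cp xercesImpl-2.12.0.jar plugins/s3-fs-hadoop/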

I hope that helps!

Best,
Robert

On Thu, Aug 27, 2020 at 1:38 PM Averell  wrote:

> Hello,
>
> I have a Flink 1.10 job which runs in AWS EMR, checkpointing to S3a as well
> as writing output to S3a using StreamingFileSink. The job runs well until I
> add the Java Hadoop properties:  /-Dfs.s3a.acl.default=
> BucketOwnerFullControl/. Since after that, the checkpoint process fails to
> complete.
>
> /Caused by: org.xml.sax.SAXException: SAX2 driver class
> org.apache.xerces.parsers.SAXParser not found/
> I tried to add a jar file with that class
> (https://mvnrepository.com/artifact/xerces/xercesImpl/2.12.0) to my
> flink/lib/ directory, then got the same error but different stacktrace:
> /Caused by: org.apache.flink.util.SerializedThrowable: SAX2 driver class
> org.apache.xerces.parsers.SAXParser not found/
>
> This seems to be a dependencies conflict, but I couldn't track its root.
> In my IDE I didn't have any dependencies issue, while I couldn't find
> SAXParser in the dependencies tree.
>
> *Here is the stacktrace when the jar file is not there:*
> /Caused by: org.apache.hadoop.fs.s3a.AWSClientIOException: getFileStatus on
>
> s3a://mybucket/checkpoint/a9502b1c81ced10dfcbb21ac43f03e61/chk-2/41f51c24-60fd-474b-9f89-3d65d87037c7:
> com.amazonaws.SdkClientException: Couldn't initialize a SAX driver to
> create
> an XMLReader: Couldn't initialize a SAX driver to create an XMLReader
> at
> org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:177)
> at
> org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:145)
> at
>
> org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2251)
> at
>
> org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
> at
>
> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
> at
> org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:749)
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169)
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149)
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038)
> at
>
> org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:141)
> at
>
> org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:37)
> at
>
> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.create(PluginFileSystemFactory.java:164)
> at
>
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126)
> at
>
> org.apache.flink.core.fs.EntropyInjector.createEntropyAware(EntropyInjector.java:61)
> at
>
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:356)
> ... 17 more
> Caused by: com.amazonaws.SdkClientException: Couldn't initialize a SAX
> driver to create an XMLReader
> at
>
> com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser.(XmlResponsesSaxParser.java:118)
> at
>
> com.amazonaws.services.s3.model.transform.Unmarshallers$ListObjectsV2Unmarshaller.unmarshall(Unmarshallers.java:87)
> at
>
> com.amazonaws.services.s3.model.transform.Unmarshallers$ListObjectsV2Unmarshaller.unmarshall(Unmarshallers.java:77)
> at
>
> com.amazonaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:62)
> at
>
> com.amazonaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:31)
> at
>
> com.amazonaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:70)
> at
>
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1554)
> at
>
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1272)
> at
>
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
> at
>
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
> at
>
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
> at
>
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
> at
>
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
> at
>
> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
> at
> com.amazonaws.http.AmazonHttpClient.e

Re: Different deserialization schemas for Kafka keys and values

2020-08-27 Thread Robert Metzger
Hi,

Check out the KafkaDeserializationSchema (
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html#the-deserializationschema)
which allows you to deserialize the key and value bytes coming from Kafka.
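
For illustration, a minimal sketch of such a schema could look like the following (here the
JSON value is read into a Jackson JsonNode, the key decoding is a placeholder for your
custom format, and the Jackson dependency is assumed to be on the classpath):

import java.nio.charset.StandardCharsets;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class CustomKeyJsonValueSchema
        implements KafkaDeserializationSchema<Tuple2<String, JsonNode>> {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public boolean isEndOfStream(Tuple2<String, JsonNode> nextElement) {
        return false;
    }

    @Override
    public Tuple2<String, JsonNode> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        // Replace this line with whatever your custom key format needs.
        String key = record.key() == null ? null : new String(record.key(), StandardCharsets.UTF_8);
        // The value bytes are plain JSON.
        JsonNode value = MAPPER.readTree(record.value());
        return Tuple2.of(key, value);
    }

    @Override
    public TypeInformation<Tuple2<String, JsonNode>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple2<String, JsonNode>>() {});
    }
}

It can then be passed directly to the consumer, e.g.
new FlinkKafkaConsumer<>(ADVANCED_FEATURES_TOPIC, new CustomKeyJsonValueSchema(), properties).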

Best,
Robert


On Thu, Aug 27, 2020 at 1:56 PM Manas Kale  wrote:

> Hi,
> I have a kafka topic on which the key is serialized in a custom format and
> the value is serialized as JSON. How do I create a FlinkKafakConsumer that
> has different deserialization schemas for the key and value? Here's what I
> tried:
>
> FlinkKafkaConsumer> advancedFeatureData = new 
> FlinkKafkaConsumer<>(ADVANCED_FEATURES_TOPIC, new 
> TypeInformationKeyValueSerializationSchema(
> TypeInformation.of(new TypeHint() {}),
> TypeInformation.of(new TypeHint() {}),
> env.getConfig()
> ), properties);
>
> However, I get the error:
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class
> ID: 121
> at
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
> at
> org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema.deserialize(TypeInformationKeyValueSerializationSchema.java:112)
> at
> org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema.deserialize(TypeInformationKeyValueSerializationSchema.java:43)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:718)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>
> Is there something I am missing with my approach or am I supposed to use a
> completely different class than TypeInformationKeyValueSerializationSchema?
>


Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 Thread Robert Metzger
Congratulations Dian!

On Thu, Aug 27, 2020 at 3:09 PM Congxian Qiu  wrote:

> Congratulations Dian
> Best,
> Congxian
>
>
> Xintong Song  于2020年8月27日周四 下午7:50写道:
>
> > Congratulations Dian~!
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Thu, Aug 27, 2020 at 7:42 PM Jark Wu  wrote:
> >
> > > Congratulations Dian!
> > >
> > > Best,
> > > Jark
> > >
> > > On Thu, 27 Aug 2020 at 19:37, Leonard Xu  wrote:
> > >
> > > > Congrats, Dian!  Well deserved.
> > > >
> > > > Best
> > > > Leonard
> > > >
> > > > > 在 2020年8月27日,19:34,Kurt Young  写道:
> > > > >
> > > > > Congratulations Dian!
> > > > >
> > > > > Best,
> > > > > Kurt
> > > > >
> > > > >
> > > > > On Thu, Aug 27, 2020 at 7:28 PM Rui Li 
> > wrote:
> > > > >
> > > > >> Congratulations Dian!
> > > > >>
> > > > >> On Thu, Aug 27, 2020 at 5:39 PM Yuan Mei 
> > > > wrote:
> > > > >>
> > > > >>> Congrats!
> > > > >>>
> > > > >>> On Thu, Aug 27, 2020 at 5:38 PM Xingbo Huang  >
> > > > wrote:
> > > > >>>
> > > >  Congratulations Dian!
> > > > 
> > > >  Best,
> > > >  Xingbo
> > > > 
> > > >  jincheng sun  于2020年8月27日周四 下午5:24写道:
> > > > 
> > > > > Hi all,
> > > > >
> > > > > On behalf of the Flink PMC, I'm happy to announce that Dian Fu
> is
> > > now
> > > > > part of the Apache Flink Project Management Committee (PMC).
> > > > >
> > > > > Dian Fu has been very active on PyFlink component, working on
> > > various
> > > > > important features, such as the Python UDF and Pandas
> > integration,
> > > > and
> > > > > keeps checking and voting for our releases, and also has
> > > successfully
> > > > > produced two releases(1.9.3&1.11.1) as RM, currently working as
> > RM
> > > > to push
> > > > > forward the release of Flink 1.12.
> > > > >
> > > > > Please join me in congratulating Dian Fu for becoming a Flink
> PMC
> > > > > Member!
> > > > >
> > > > > Best,
> > > > > Jincheng(on behalf of the Flink PMC)
> > > > >
> > > > 
> > > > >>
> > > > >> --
> > > > >> Best regards!
> > > > >> Rui Li
> > > > >>
> > > >
> > > >
> > >
> >
>


Re: Resource leak in DataSourceNode?

2020-08-27 Thread Robert Metzger
Hi Mark,

Thanks a lot for your message and the good investigation! I believe you've
found a bug in Flink. I filed an issue for the problem:
https://issues.apache.org/jira/browse/FLINK-19064.

Would you be interested in opening a pull request to fix this?
Otherwise, I'm sure a committer will pick up the issue soon.

I'm not aware of a simple workaround for the problem.

Best,
Robert


On Wed, Aug 26, 2020 at 4:05 PM Mark Davis  wrote:

> Hi,
>
> I am trying to investigate a problem with non-released resources in my
> application.
>
> I have a stateful application which submits Flink DataSet jobs using code
> very similar to the code in CliFrontend.
> I noticed that I am getting a lot of non-closed connections to my data
> store (HBase in my case). The connections are held by the application, not
> by the jobs themselves.
>
> I am using HBaseRowDataInputFormat and it seems that HBase connections
> opened in the configure() method during job graph creation (before the
> job is executed) are not closed. My search led me to the method
> DataSourceNode.computeOperatorSpecificDefaultEstimates(DataStatistics),
> where I see that the format is not closed after being configured.
>
> Is that correct? How can I overcome this issue?
>
> My application is long running that is probably why I observe the resource
> leak. Would I spawn a new JVM to run jobs this problem would not be
> noticeable.
>
> Thank you!
>
> Cheers,
>   Marc
>


Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 Thread Congxian Qiu
Congratulations Dian
Best,
Congxian


Xintong Song  于2020年8月27日周四 下午7:50写道:

> Congratulations Dian~!
>
> Thank you~
>
> Xintong Song
>
>
>
> On Thu, Aug 27, 2020 at 7:42 PM Jark Wu  wrote:
>
> > Congratulations Dian!
> >
> > Best,
> > Jark
> >
> > On Thu, 27 Aug 2020 at 19:37, Leonard Xu  wrote:
> >
> > > Congrats, Dian!  Well deserved.
> > >
> > > Best
> > > Leonard
> > >
> > > > 在 2020年8月27日,19:34,Kurt Young  写道:
> > > >
> > > > Congratulations Dian!
> > > >
> > > > Best,
> > > > Kurt
> > > >
> > > >
> > > > On Thu, Aug 27, 2020 at 7:28 PM Rui Li 
> wrote:
> > > >
> > > >> Congratulations Dian!
> > > >>
> > > >> On Thu, Aug 27, 2020 at 5:39 PM Yuan Mei 
> > > wrote:
> > > >>
> > > >>> Congrats!
> > > >>>
> > > >>> On Thu, Aug 27, 2020 at 5:38 PM Xingbo Huang 
> > > wrote:
> > > >>>
> > >  Congratulations Dian!
> > > 
> > >  Best,
> > >  Xingbo
> > > 
> > >  jincheng sun  于2020年8月27日周四 下午5:24写道:
> > > 
> > > > Hi all,
> > > >
> > > > On behalf of the Flink PMC, I'm happy to announce that Dian Fu is
> > now
> > > > part of the Apache Flink Project Management Committee (PMC).
> > > >
> > > > Dian Fu has been very active on PyFlink component, working on
> > various
> > > > important features, such as the Python UDF and Pandas
> integration,
> > > and
> > > > keeps checking and voting for our releases, and also has
> > successfully
> > > > produced two releases(1.9.3&1.11.1) as RM, currently working as
> RM
> > > to push
> > > > forward the release of Flink 1.12.
> > > >
> > > > Please join me in congratulating Dian Fu for becoming a Flink PMC
> > > > Member!
> > > >
> > > > Best,
> > > > Jincheng(on behalf of the Flink PMC)
> > > >
> > > 
> > > >>
> > > >> --
> > > >> Best regards!
> > > >> Rui Li
> > > >>
> > >
> > >
> >
>


Re: Async IO with SQL API

2020-08-27 Thread Jark Wu
Hi,

Sorry for the late reply.

AFAIK, it's impossible to do Async IO with the pure Table API / SQL on the 1.9
old planner.
A doable way is to convert the Table into a DataStream and apply an AsyncFunction
on it.
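
For illustration, a rough 1.9-style sketch of that workaround could look like the following
(the source, field names, and the lookup call are placeholders for the real Kafka-backed
table and third-party client):

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class AsyncEnrichmentSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Placeholder source; in the real job this would be the Kafka-backed table.
        DataStream<String> ids = env.fromElements("id-1", "id-2");
        tEnv.registerDataStream("events", ids, "id");

        Table table = tEnv.sqlQuery("SELECT id FROM events");
        DataStream<Row> rows = tEnv.toAppendStream(table, Row.class);

        DataStream<Row> enriched = AsyncDataStream.unorderedWait(
                rows,
                new AsyncFunction<Row, Row>() {
                    @Override
                    public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) {
                        // Replace with a real non-blocking call to the third-party service.
                        CompletableFuture
                                .supplyAsync(() -> "looked-up-value-for-" + input.getField(0))
                                .thenAccept(extra -> resultFuture.complete(
                                        Collections.singleton(Row.of(input.getField(0), extra))));
                    }
                },
                1, TimeUnit.SECONDS,   // timeout per request
                100)                   // max concurrent requests
                .returns(Types.ROW(Types.STRING, Types.STRING));

        // Register the enriched stream again so the GROUP BY queries can run on it in SQL.
        tEnv.registerDataStream("enriched_events", enriched, "id, extra");
        enriched.print();

        env.execute("async-enrichment-sketch");
    }
}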

Best,
Jark

On Thu, 20 Aug 2020 at 00:35, Spurthi Chaganti 
wrote:

> Thank you Till for your response.
>
> We haven't completely evaluated migration to blink planner as it was not
> prioritized, certainly a task we would look into if we arent able to get a
> solution using SQL api.
>
> On Wed, Aug 19, 2020 at 3:23 AM Till Rohrmann 
> wrote:
>
>> Hi Spurthi,
>>
>> thanks for reaching out to the Flink community. Have you tried using the
>> Blink planner where these features are available?
>>
>> Pulling in Jark and Timo who worked on this feature and who might be able
>> to explain to you the rationale behind making the LookupableTableSource and
>> AsyncTableFunction a Blink planner feature only.
>>
>> Cheers,
>> Till
>>
>> On Tue, Aug 18, 2020 at 8:11 PM Spurthi Chaganti <
>> spurthi.chaga...@gmail.com> wrote:
>>
>>> Hello folks,
>>>
>>> We are using flink 1.9 SQL API and we are NOT using blink planner.
>>> Our platform users express their flink jobs as SQL queries. We currently
>>> have a use case of asynchronously lookup data from third parties for every
>>> event we read from kafka stream and populate additional fields which we use
>>> later in our group bys.
>>>
>>> We were trying to prototype the best way we can asynchronously enrich
>>> our events and two options we could think of:
>>> 1. UDTF extending AsyncTableFunction and users can use these
>>> functions as part of SQL query. But it seems like AsyncTableFunction cannot
>>> be used as UDTF currently.
>>> 2. LookupableTableSource to access data in Temporal table but its
>>> only supported in blink.
>>>
>>>  Can you please suggest any other options I can try out?
>>>
>>> Thanks,
>>> -spurthi
>>>
>>
>
> --
> -spurthi
>


Re: Debezium Flink EMR

2020-08-27 Thread Jark Wu
Hi,

Regarding the performance difference, the proposed way will have one more
stateful operator (deduplication) than the native 1.11 cdc support.
The overhead of the deduplication operator is just similar to a simple
group by aggregate (max on each non-key column).

Best,
Jark

On Tue, 25 Aug 2020 at 02:21, Rex Fenley  wrote:

> Thank you so much for the help!
>
> On Mon, Aug 24, 2020 at 4:08 AM Marta Paes Moreira 
> wrote:
>
>> Yes — you'll get the full row in the payload; and you can also access the
>> change operation, which might be useful in your case.
>>
>> About performance, I'm summoning Kurt and @Jark Wu  to
>> the thread, who will be able to give you a more complete answer and likely
>> also some optimization tips for your specific use case.
>>
>> Marta
>>
>> On Fri, Aug 21, 2020 at 8:55 PM Rex Fenley  wrote:
>>
>>> Yup! This definitely helps and makes sense.
>>>
>>> The 'after' payload comes with all data from the row right? So
>>> essentially inserts and updates I can insert/replace data by pk and null
>>> values I just delete by pk, and then I can build out the rest of my joins
>>> like normal.
>>>
>>> Are there any performance implications of doing it this way that is
>>> different from the out-of-the-box 1.11 solution?
>>>
>>> On Fri, Aug 21, 2020 at 2:28 AM Marta Paes Moreira 
>>> wrote:
>>>
 Hi, Rex.

 Part of what enabled CDC support in Flink 1.11 was the refactoring of
 the table source interfaces (FLIP-95 [1]), and the new ScanTableSource
 [2], which allows to emit bounded/unbounded streams with insert, update and
 delete rows.

 In theory, you could consume data generated with Debezium as regular
 JSON-encoded events before Flink 1.11 — there just wasn't a convenient way
 to really treat it as "changelog". As a workaround, what you can do in
 Flink 1.10 is process these messages as JSON and extract the "after" field
 from the payload, and then apply de-duplication [3] to keep only the last
 row.

 The DDL for your source table would look something like:

>>>> CREATE TABLE tablename (
>>>>   ...
>>>>   after ROW(`field1` DATATYPE, `field2` DATATYPE, ...)
>>>> ) WITH (
>>>>   'connector' = 'kafka',
>>>>   'format' = 'json',
>>>>   ...
>>>> );
 Hope this helps!

 Marta

 [1]
 https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
 [2]
 https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/table/connector/source/ScanTableSource.html
 [3]
 https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#deduplication


 On Fri, Aug 21, 2020 at 10:28 AM Chesnay Schepler 
 wrote:

> @Jark Would it be possible to use the 1.11 debezium support in 1.10?
>
> On 20/08/2020 19:59, Rex Fenley wrote:
>
> Hi,
>
> I'm trying to set up Flink with Debezium CDC Connector on AWS EMR,
> however, EMR only supports Flink 1.10.0, whereas Debezium Connector 
> arrived
> in Flink 1.11.0, from looking at the documentation.
>
> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html
>
> I'm wondering what alternative solutions are available for connecting
> Debezium to Flink? Is there an open source Debezium connector that works
> with Flink 1.10.0? Could I potentially pull the code out for the 1.11.0
> Debezium connector and compile it in my project using Flink 1.10.0 api?
>
> For context, I plan on doing some fairly complicated long lived
> stateful joins / materialization using the Table API over data ingested
> from Postgres and possibly MySQL.
>
> Appreciate any help, thanks!
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com  |  BLOG 
>  |  FOLLOW US   |  LIKE US
> 
>
>
>
>>>
>>> --
>>>
>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>
>>>
>>> Remind.com  |  BLOG 
>>>  |  FOLLOW US   |  LIKE US
>>> 
>>>
>>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com  |  BLOG   |
>  FOLLOW US   |  LIKE US
> 
>


Re: [Survey] Demand collection for stream SQL window join

2020-08-27 Thread Jark Wu
Thanks for the survey!

I'm also interested in the use cases of the DataStream window join.
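
For readers who haven't used it, a minimal sketch of the DataStream tumbling-window join in
question could look like this (keys, element types, and the processing-time window are just
placeholders):

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowJoinSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Two small placeholder streams; in practice these would be unbounded sources.
        DataStream<Tuple2<String, Integer>> orders =
                env.fromElements(Tuple2.of("k1", 1), Tuple2.of("k2", 2));
        DataStream<Tuple2<String, String>> users =
                env.fromElements(Tuple2.of("k1", "alice"), Tuple2.of("k2", "bob"));

        orders.join(users)
              .where(new KeySelector<Tuple2<String, Integer>, String>() {
                  @Override
                  public String getKey(Tuple2<String, Integer> order) { return order.f0; }
              })
              .equalTo(new KeySelector<Tuple2<String, String>, String>() {
                  @Override
                  public String getKey(Tuple2<String, String> user) { return user.f0; }
              })
              .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
              .apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, String>, String>() {
                  @Override
                  public String join(Tuple2<String, Integer> order, Tuple2<String, String> user) {
                      return user.f1 + " -> " + order.f1;
                  }
              })
              .print();

        env.execute("tumbling-window-join-sketch");
    }
}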

Best,
Jark

On Thu, 27 Aug 2020 at 14:40, Danny Chan  wrote:

> Hi users, here I want to collect some use cases for the window join[1],
> which is a supported feature on the DataStream API. The purpose is to decide
> whether to also support it on the SQL side. For example, a join of two
> tumbling windows may look like this:
>
> ```sql
> select ... window_start, window_end
> from TABLE(
>   TUMBLE(
> DATA => TABLE table_a,
> TIMECOL => DESCRIPTOR(rowtime),
> SIZE => INTERVAL '1' MINUTE)) tumble_a
> [LEFT | RIGHT | FULL OUTER] JOIN TABLE(
>   TUMBLE(
> DATA => TABLE table_b,
> TIMECOL => DESCRIPTOR(rowtime),
> SIZE => INTERVAL '1' MINUTE)) tumble_b
> on tumble_a.col1 = tumble_b.col1 and ...
> ```
>
> I had some discussions offline with some companies (Tencent, Bytedance and
> Meituan), and it seems that the interval join is the most common case. The
> window join cases are very few, so I'm looking forward to some feedback here.
>
> Especially, it would be appreciated if you could share your use cases for the
> window join (using the Flink DataStream API or written in other programs) and
> why the window join is a must (i.e. cannot be replaced with a normal stream
> join or an interval join).
>
> Thanks in advance ~
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/joining.html
>
> Best,
> Danny Chan
>


Different deserialization schemas for Kafka keys and values

2020-08-27 Thread Manas Kale
Hi,
I have a Kafka topic on which the key is serialized in a custom format and
the value is serialized as JSON. How do I create a FlinkKafkaConsumer that
has different deserialization schemas for the key and value? Here's what I
tried:

FlinkKafkaConsumer> advancedFeatureData =
new FlinkKafkaConsumer<>(ADVANCED_FEATURES_TOPIC, new
TypeInformationKeyValueSerializationSchema(
TypeInformation.of(new TypeHint() {}),
TypeInformation.of(new TypeHint() {}),
env.getConfig()
), properties);

However, I get the error:
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID:
121
at
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
at
org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema.deserialize(TypeInformationKeyValueSerializationSchema.java:112)
at
org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema.deserialize(TypeInformationKeyValueSerializationSchema.java:43)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:718)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)

Is there something I am missing with my approach or am I supposed to use a
completely different class than TypeInformationKeyValueSerializationSchema?


Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 Thread Xintong Song
Congratulations Dian~!

Thank you~

Xintong Song



On Thu, Aug 27, 2020 at 7:42 PM Jark Wu  wrote:

> Congratulations Dian!
>
> Best,
> Jark
>
> On Thu, 27 Aug 2020 at 19:37, Leonard Xu  wrote:
>
> > Congrats, Dian!  Well deserved.
> >
> > Best
> > Leonard
> >
> > > 在 2020年8月27日,19:34,Kurt Young  写道:
> > >
> > > Congratulations Dian!
> > >
> > > Best,
> > > Kurt
> > >
> > >
> > > On Thu, Aug 27, 2020 at 7:28 PM Rui Li  wrote:
> > >
> > >> Congratulations Dian!
> > >>
> > >> On Thu, Aug 27, 2020 at 5:39 PM Yuan Mei 
> > wrote:
> > >>
> > >>> Congrats!
> > >>>
> > >>> On Thu, Aug 27, 2020 at 5:38 PM Xingbo Huang 
> > wrote:
> > >>>
> >  Congratulations Dian!
> > 
> >  Best,
> >  Xingbo
> > 
> >  jincheng sun  于2020年8月27日周四 下午5:24写道:
> > 
> > > Hi all,
> > >
> > > On behalf of the Flink PMC, I'm happy to announce that Dian Fu is
> now
> > > part of the Apache Flink Project Management Committee (PMC).
> > >
> > > Dian Fu has been very active on PyFlink component, working on
> various
> > > important features, such as the Python UDF and Pandas integration,
> > and
> > > keeps checking and voting for our releases, and also has
> successfully
> > > produced two releases(1.9.3&1.11.1) as RM, currently working as RM
> > to push
> > > forward the release of Flink 1.12.
> > >
> > > Please join me in congratulating Dian Fu for becoming a Flink PMC
> > > Member!
> > >
> > > Best,
> > > Jincheng(on behalf of the Flink PMC)
> > >
> > 
> > >>
> > >> --
> > >> Best regards!
> > >> Rui Li
> > >>
> >
> >
>


Re: JSON to Parquet

2020-08-27 Thread Averell
Hi Dawid,

Thanks for the suggestion. So, basically I'll need to use the JSON connector
to get the JSON strings into Rows, and then go from Rows to Parquet records
using the Parquet connector?
I have never tried the Table API in the past, as I have been using the
Streaming API only. I will follow your suggestion now.
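
For anyone following along, a rough Flink 1.11-style sketch of that combination could look
like the following (table names, columns, topic, and path are placeholders; the streaming
file sink needs checkpointing enabled to commit files):

CREATE TABLE json_source (
  id BIGINT,
  payload STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'input-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);

CREATE TABLE parquet_sink (
  id BIGINT,
  payload STRING
) WITH (
  'connector' = 'filesystem',
  'path' = 's3a://my-bucket/output',
  'format' = 'parquet'
);

INSERT INTO parquet_sink SELECT id, payload FROM json_source;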

Thanks for your help.
Regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 Thread Jark Wu
Congratulations Dian!

Best,
Jark

On Thu, 27 Aug 2020 at 19:37, Leonard Xu  wrote:

> Congrats, Dian!  Well deserved.
>
> Best
> Leonard
>
> > 在 2020年8月27日,19:34,Kurt Young  写道:
> >
> > Congratulations Dian!
> >
> > Best,
> > Kurt
> >
> >
> > On Thu, Aug 27, 2020 at 7:28 PM Rui Li  wrote:
> >
> >> Congratulations Dian!
> >>
> >> On Thu, Aug 27, 2020 at 5:39 PM Yuan Mei 
> wrote:
> >>
> >>> Congrats!
> >>>
> >>> On Thu, Aug 27, 2020 at 5:38 PM Xingbo Huang 
> wrote:
> >>>
>  Congratulations Dian!
> 
>  Best,
>  Xingbo
> 
>  jincheng sun  于2020年8月27日周四 下午5:24写道:
> 
> > Hi all,
> >
> > On behalf of the Flink PMC, I'm happy to announce that Dian Fu is now
> > part of the Apache Flink Project Management Committee (PMC).
> >
> > Dian Fu has been very active on PyFlink component, working on various
> > important features, such as the Python UDF and Pandas integration,
> and
> > keeps checking and voting for our releases, and also has successfully
> > produced two releases(1.9.3&1.11.1) as RM, currently working as RM
> to push
> > forward the release of Flink 1.12.
> >
> > Please join me in congratulating Dian Fu for becoming a Flink PMC
> > Member!
> >
> > Best,
> > Jincheng(on behalf of the Flink PMC)
> >
> 
> >>
> >> --
> >> Best regards!
> >> Rui Li
> >>
>
>


SAX2 driver class org.apache.xerces.parsers.SAXParser not found

2020-08-27 Thread Averell
Hello,

I have a Flink 1.10 job which runs in AWS EMR, checkpointing to S3a as well
as writing output to S3a using StreamingFileSink. The job runs well until I
add the Java Hadoop property /-Dfs.s3a.acl.default=BucketOwnerFullControl/.
After that, the checkpoint process fails to
complete.

/Caused by: org.xml.sax.SAXException: SAX2 driver class
org.apache.xerces.parsers.SAXParser not found/
I tried to add a jar file with that class
(https://mvnrepository.com/artifact/xerces/xercesImpl/2.12.0) to my
flink/lib/ directory, then got the same error but different stacktrace:
/Caused by: org.apache.flink.util.SerializedThrowable: SAX2 driver class
org.apache.xerces.parsers.SAXParser not found/

This seems to be a dependency conflict, but I couldn't track down its root cause.
In my IDE I didn't have any dependency issues, and I couldn't find
SAXParser in the dependency tree.

*Here is the stacktrace when the jar file is not there:*
/Caused by: org.apache.hadoop.fs.s3a.AWSClientIOException: getFileStatus on
s3a://mybucket/checkpoint/a9502b1c81ced10dfcbb21ac43f03e61/chk-2/41f51c24-60fd-474b-9f89-3d65d87037c7:
com.amazonaws.SdkClientException: Couldn't initialize a SAX driver to create
an XMLReader: Couldn't initialize a SAX driver to create an XMLReader
at
org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:177)
at
org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:145)
at
org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2251)
at
org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
at
org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
at
org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:749)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038)
at
org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:141)
at
org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:37)
at
org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.create(PluginFileSystemFactory.java:164)
at
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126)
at
org.apache.flink.core.fs.EntropyInjector.createEntropyAware(EntropyInjector.java:61)
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:356)
... 17 more
Caused by: com.amazonaws.SdkClientException: Couldn't initialize a SAX
driver to create an XMLReader
at
com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser.(XmlResponsesSaxParser.java:118)
at
com.amazonaws.services.s3.model.transform.Unmarshallers$ListObjectsV2Unmarshaller.unmarshall(Unmarshallers.java:87)
at
com.amazonaws.services.s3.model.transform.Unmarshallers$ListObjectsV2Unmarshaller.unmarshall(Unmarshallers.java:77)
at
com.amazonaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:62)
at
com.amazonaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:31)
at
com.amazonaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:70)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1554)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1272)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
at
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
at
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
at
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
at
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4266)
at
com.amazonaws.services.s3.AmazonS3Client.listObjectsV2(AmazonS3Client.java:876)
at
org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$listObjects$5(S3AFileSystem.java:1262)
at
org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
at
org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoke

Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 Thread Leonard Xu
Congrats, Dian!  Well deserved.

Best
Leonard

> 在 2020年8月27日,19:34,Kurt Young  写道:
> 
> Congratulations Dian!
> 
> Best,
> Kurt
> 
> 
> On Thu, Aug 27, 2020 at 7:28 PM Rui Li  wrote:
> 
>> Congratulations Dian!
>> 
>> On Thu, Aug 27, 2020 at 5:39 PM Yuan Mei  wrote:
>> 
>>> Congrats!
>>> 
>>> On Thu, Aug 27, 2020 at 5:38 PM Xingbo Huang  wrote:
>>> 
 Congratulations Dian!
 
 Best,
 Xingbo
 
 jincheng sun  于2020年8月27日周四 下午5:24写道:
 
> Hi all,
> 
> On behalf of the Flink PMC, I'm happy to announce that Dian Fu is now
> part of the Apache Flink Project Management Committee (PMC).
> 
> Dian Fu has been very active on PyFlink component, working on various
> important features, such as the Python UDF and Pandas integration, and
> keeps checking and voting for our releases, and also has successfully
> produced two releases(1.9.3&1.11.1) as RM, currently working as RM to push
> forward the release of Flink 1.12.
> 
> Please join me in congratulating Dian Fu for becoming a Flink PMC
> Member!
> 
> Best,
> Jincheng(on behalf of the Flink PMC)
> 
 
>> 
>> --
>> Best regards!
>> Rui Li
>> 



RE: Example flink run with security options? Running on k8s in my case

2020-08-27 Thread Adam Roberts
Hey folks, outside of Kubernetes things are great yep, with the same generated files.
 
So, to share what I'm doing a little more... I've modified things to be more in line with the current docs.
 
keytool -genkeypair -alias flink.internal -keystore internal.keystore -dname "CN=flink.internal" -storepass internal_store_password -keyalg RSA -keysize 4096 -storetype PKCS12
keytool -genkeypair -alias flink.rest -keystore rest.keystore -dname "CN=myhost.company.org" -ext "SAN=dns:myhost.company.org,ip:127.0.0.1" -storepass rest_keystore_password -keyalg RSA -keysize 4096 -storetype PKCS12
keytool -exportcert -keystore rest.keystore -alias flink.rest -storepass rest_keystore_password -file flink.cer
keytool -importcert -keystore rest.truststore -alias flink.rest -storepass rest_truststore_password -file flink.cer -noprompt
kubectl delete secret flink-tls-secret-2
# Create the simpler secret from main docs for Flink
cat << EOF | kubectl create -n abp -f -
  apiVersion: v1
  kind: Secret
  type: Opaque
  metadata:
    name: flink-tls-secret-2
  data:
    rest.keystore: $(cat ./rest.keystore | base64 | tr -d '\n')
    rest.truststore: $(cat ./rest.truststore | base64 | tr -d '\n')
    internal.keystore: $(cat ./internal.keystore | base64 | tr -d '\n')
    internal.truststore: $(cat ./internal.keystore | base64 | tr -d '\n')
EOF
 
I run this script to get flink-tls-secret-2 with those files in (the keytool commands should be familiar since they're from the Flink 1.11 security docs).
 
Note I don't have a file called internal.truststore, but neither do the docs; they mention file.truststore but don't tell me how that's made... maybe this is the problem? But things are fine with my normal Flink setup outside of Kubernetes.
 
The Job CustomResource does:
apiVersion: batch/v1
kind: Job
metadata:
  name: sample-job
  labels:
    app: flink-job
spec:
  template:
    spec:
      # Run as flink user
      securityContext:
        runAsUser: 
        runAsGroup: 
      containers:
      - name: wordcount
        # Replace this to be a Docker image with your built Flink app at a known location
        # Your build of Flink should be based on https://github.com/apache/flink-docker/tree/master/1.11/scala_2.12-java8-debian
        # with a modification to the Dockerfile to add your jar in (with a COPY)
        image: adamroberts/mycoolflink:latest
        - /opt/flink/bin/flink
        - run
        - -D security.ssl.internal.enabled=true
        - -D security.ssl.rest.enabled=true
        - -D security.ssl.rest.keystore=/etc/flink-secrets/rest.keystore
        - -D security.ssl.rest.truststore=/etc/flink-secrets/rest.truststore
        - -D security.ssl.rest.keystore-password=rest_keystore_password
        - -D security.ssl.rest.key-password=rest_keystore_password
        - -D security.ssl.rest.truststore-password=rest_truststore_password
        - -D security.ssl.internal.keystore=/etc/flink-secrets/internal.keystore
        - -D security.ssl.internal.truststore=/etc/flink-secrets/internal.keystore
        - -D security.ssl.internal.keystore-password=internal_store_password
        - -D security.ssl.internal.key-password=internal_store_password
        - -D security.ssl.internal.truststore-password=internal_store_password
        - -m
        - tls-flink-cluster-1-11-jobmanager:8081
        - /opt/flink/examples/batch/WordCount.jar
        - --input
        - /opt/flink/NOTICE
        volumeMounts:
          - name: flink-secret-volume
            mountPath: /etc/flink-secrets
      volumes:
      - name: flink-secret-volume
        secret:
          secretName: flink-tls-secret-2
      restartPolicy: Never
 
If I modify that to be a simple curl image but keep the secrets mounted in, I can kubectl exec in and curl the JobManager at tls-flink-cluster-1-11-jobmanager:8081 - I get no response, but I do get an error if I go to a different port or URL.
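 
As a sanity check from that debug pod, here is a minimal sketch in plain Java (no Flink dependencies; the host, port, path and password are just the ones from my config above, so adjust if yours differ) that loads the mounted rest.truststore and attempts a TLS handshake against the rest endpoint:

import java.io.FileInputStream;
import java.security.KeyStore;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.TrustManagerFactory;

public class RestTlsCheck {

    public static void main(String[] args) throws Exception {
        // Load the truststore mounted into the pod from the flink-tls-secret-2 secret.
        KeyStore trustStore = KeyStore.getInstance("PKCS12");
        try (FileInputStream in = new FileInputStream("/etc/flink-secrets/rest.truststore")) {
            trustStore.load(in, "rest_truststore_password".toCharArray());
        }

        TrustManagerFactory tmf =
                TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(trustStore);

        SSLContext ctx = SSLContext.getInstance("TLS");
        ctx.init(null, tmf.getTrustManagers(), null);

        // This only exercises the trust chain, not the hostname in the certificate.
        try (SSLSocket socket = (SSLSocket) ctx.getSocketFactory()
                .createSocket("tls-flink-cluster-1-11-jobmanager", 8081)) {
            socket.startHandshake();
            System.out.println("TLS handshake OK: " + socket.getSession().getCipherSuite());
        }
    }
}

That is only a handshake check, not a full REST call.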
 
The secrets do look ok inside the container too.
 
The Cluster spec looks like this now:
 
apiVersion: flinkoperator.k8s.io/v1beta1
kind: FlinkCluster
metadata:
  name: tls-flink-cluster-1-11
spec:
  jobManager:
    volumeMounts:
    - name: flink-secret-volume
      mountPath: /etc/flink-secrets
    volumes:
    - name: flink-secret-volume
      secret:
        secretName: flink-tls-secret-2
    resources:
      limits:
        memory: 600Mi
        cpu: "1.0"
  taskManager:
    volumeMounts:
      - name: flink-secret-volume
        mountPath: /etc/flink-secrets
    volumes:
    - name: flink-secret-volume
      secret:
        secretName: flink-tls-secret-2
    replicas: 1
    resources:
      limits:
        memory: 1Gi
        cpu: "1.0"
  image:
    name: adamroberts/mycoolflink:latest
  flinkProperties:
    # https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-ssl.html is helpful for this part.
    web.submit.enable: "false"
    security.ssl.rest.enabled: "true"
    security.ssl.rest.keystore: "/etc/flink-secrets/rest.keystore"
    security.ssl.rest.truststore: "/etc/flink-secrets/rest.truststore"
    security.ssl.rest.keystore-password: "rest

Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 Thread Kurt Young
Congratulations Dian!

Best,
Kurt


On Thu, Aug 27, 2020 at 7:28 PM Rui Li  wrote:

> Congratulations Dian!
>
> On Thu, Aug 27, 2020 at 5:39 PM Yuan Mei  wrote:
>
>> Congrats!
>>
>> On Thu, Aug 27, 2020 at 5:38 PM Xingbo Huang  wrote:
>>
>>> Congratulations Dian!
>>>
>>> Best,
>>> Xingbo
>>>
>>> jincheng sun  于2020年8月27日周四 下午5:24写道:
>>>
 Hi all,

 On behalf of the Flink PMC, I'm happy to announce that Dian Fu is now
 part of the Apache Flink Project Management Committee (PMC).

 Dian Fu has been very active on PyFlink component, working on various
 important features, such as the Python UDF and Pandas integration, and
 keeps checking and voting for our releases, and also has successfully
 produced two releases(1.9.3&1.11.1) as RM, currently working as RM to push
 forward the release of Flink 1.12.

 Please join me in congratulating Dian Fu for becoming a Flink PMC
 Member!

 Best,
 Jincheng(on behalf of the Flink PMC)

>>>
>
> --
> Best regards!
> Rui Li
>


Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 Thread Rui Li
Congratulations Dian!

On Thu, Aug 27, 2020 at 5:39 PM Yuan Mei  wrote:

> Congrats!
>
> On Thu, Aug 27, 2020 at 5:38 PM Xingbo Huang  wrote:
>
>> Congratulations Dian!
>>
>> Best,
>> Xingbo
>>
>> jincheng sun  于2020年8月27日周四 下午5:24写道:
>>
>>> Hi all,
>>>
>>> On behalf of the Flink PMC, I'm happy to announce that Dian Fu is now
>>> part of the Apache Flink Project Management Committee (PMC).
>>>
>>> Dian Fu has been very active on PyFlink component, working on various
>>> important features, such as the Python UDF and Pandas integration, and
>>> keeps checking and voting for our releases, and also has successfully
>>> produced two releases(1.9.3&1.11.1) as RM, currently working as RM to push
>>> forward the release of Flink 1.12.
>>>
>>> Please join me in congratulating Dian Fu for becoming a Flink PMC Member!
>>>
>>> Best,
>>> Jincheng(on behalf of the Flink PMC)
>>>
>>

-- 
Best regards!
Rui Li


Re: Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 Thread Yangze Guo
Congrats Dian!

Best,
Yangze Guo

On Thu, Aug 27, 2020 at 6:26 PM Zhu Zhu  wrote:
>
> Congratulations Dian!
>
> Thanks,
> Zhu
>
> Zhijiang  于2020年8月27日周四 下午6:04写道:
>
> > Congrats, Dian!
> >
> > --
> > From:Yun Gao 
> > Send Time:2020年8月27日(星期四) 17:44
> > To:dev ; Dian Fu ; user <
> > user@flink.apache.org>; user-zh 
> > Subject:Re: Re: [ANNOUNCE] New PMC member: Dian Fu
> >
> > Congratulations Dian !
> >
> >  Best
> >  Yun
> >
> >
> > --
> > Sender:Marta Paes Moreira
> > Date:2020/08/27 17:42:34
> > Recipient:Yuan Mei
> > Cc:Xingbo Huang; jincheng sun > >; dev; Dian Fu; user<
> > user@flink.apache.org>; user-zh
> > Theme:Re: [ANNOUNCE] New PMC member: Dian Fu
> >
> > Congrats, Dian!
> > On Thu, Aug 27, 2020 at 11:39 AM Yuan Mei  wrote:
> >
> > Congrats!
> > On Thu, Aug 27, 2020 at 5:38 PM Xingbo Huang  wrote:
> >
> > Congratulations Dian!
> >
> > Best,
> > Xingbo
> > jincheng sun  于2020年8月27日周四 下午5:24写道:
> >
> > Hi all,
> >
> >
> > On behalf of the Flink PMC, I'm happy to announce that Dian Fu is now part 
> > of the Apache Flink Project Management Committee (PMC).
> >
> >
> > Dian Fu has been very active on PyFlink component, working on various 
> > important features, such as the Python UDF and Pandas integration, and 
> > keeps checking and voting for our releases, and also has successfully 
> > produced two releases(1.9.3&1.11.1) as RM, currently working as RM to push 
> > forward the release of Flink 1.12.
> >
> > Please join me in congratulating Dian Fu for becoming a Flink PMC Member!
> >
> > Best,
> > Jincheng(on behalf of the Flink PMC)
> >
> >
> >


Re: Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 Thread Zhu Zhu
Congratulations Dian!

Thanks,
Zhu

Zhijiang  于2020年8月27日周四 下午6:04写道:

> Congrats, Dian!
>
> --
> From:Yun Gao 
> Send Time:2020年8月27日(星期四) 17:44
> To:dev ; Dian Fu ; user <
> user@flink.apache.org>; user-zh 
> Subject:Re: Re: [ANNOUNCE] New PMC member: Dian Fu
>
> Congratulations Dian !
>
>  Best
>  Yun
>
>
> --
> Sender:Marta Paes Moreira
> Date:2020/08/27 17:42:34
> Recipient:Yuan Mei
> Cc:Xingbo Huang; jincheng sun >; dev; Dian Fu; user<
> user@flink.apache.org>; user-zh
> Theme:Re: [ANNOUNCE] New PMC member: Dian Fu
>
> Congrats, Dian!
> On Thu, Aug 27, 2020 at 11:39 AM Yuan Mei  wrote:
>
> Congrats!
> On Thu, Aug 27, 2020 at 5:38 PM Xingbo Huang  wrote:
>
> Congratulations Dian!
>
> Best,
> Xingbo
> jincheng sun  于2020年8月27日周四 下午5:24写道:
>
> Hi all,
>
>
> On behalf of the Flink PMC, I'm happy to announce that Dian Fu is now part of 
> the Apache Flink Project Management Committee (PMC).
>
>
> Dian Fu has been very active on PyFlink component, working on various 
> important features, such as the Python UDF and Pandas integration, and keeps 
> checking and voting for our releases, and also has successfully produced two 
> releases(1.9.3&1.11.1) as RM, currently working as RM to push forward the 
> release of Flink 1.12.
>
> Please join me in congratulating Dian Fu for becoming a Flink PMC Member!
>
> Best,
> Jincheng(on behalf of the Flink PMC)
>
>
>


Re: Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 Thread Zhijiang
Congrats, Dian!


--
From:Yun Gao 
Send Time:2020年8月27日(星期四) 17:44
To:dev ; Dian Fu ; user 
; user-zh 
Subject:Re: Re: [ANNOUNCE] New PMC member: Dian Fu

Congratulations Dian !

 Best
 Yun


--
Sender:Marta Paes Moreira
Date:2020/08/27 17:42:34
Recipient:Yuan Mei
Cc:Xingbo Huang; jincheng sun; 
dev; Dian Fu; 
user; user-zh
Theme:Re: [ANNOUNCE] New PMC member: Dian Fu

Congrats, Dian!
On Thu, Aug 27, 2020 at 11:39 AM Yuan Mei  wrote:

Congrats!
On Thu, Aug 27, 2020 at 5:38 PM Xingbo Huang  wrote:

Congratulations Dian!

Best,
Xingbo
jincheng sun  于2020年8月27日周四 下午5:24写道:

Hi all,

On behalf of the Flink PMC, I'm happy to announce that Dian Fu is now part of 
the Apache Flink Project Management Committee (PMC).

Dian Fu has been very active on PyFlink component, working on various important 
features, such as the Python UDF and Pandas integration, and keeps checking and 
voting for our releases, and also has successfully produced two 
releases(1.9.3&1.11.1) as RM, currently working as RM to push forward the 
release of Flink 1.12.

Please join me in congratulating Dian Fu for becoming a Flink PMC Member!

Best,
Jincheng(on behalf of the Flink PMC)



Re: Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 Thread Yun Gao
Congratulations Dian !

 Best
 Yun


--
Sender:Marta Paes Moreira
Date:2020/08/27 17:42:34
Recipient:Yuan Mei
Cc:Xingbo Huang; jincheng sun; 
dev; Dian Fu; 
user; user-zh
Theme:Re: [ANNOUNCE] New PMC member: Dian Fu

Congrats, Dian!
On Thu, Aug 27, 2020 at 11:39 AM Yuan Mei  wrote:

Congrats!
On Thu, Aug 27, 2020 at 5:38 PM Xingbo Huang  wrote:

Congratulations Dian!

Best,
Xingbo
jincheng sun  于2020年8月27日周四 下午5:24写道:

Hi all,

On behalf of the Flink PMC, I'm happy to announce that Dian Fu is now part of 
the Apache Flink Project Management Committee (PMC).

Dian Fu has been very active on PyFlink component, working on various important 
features, such as the Python UDF and Pandas integration, and keeps checking and 
voting for our releases, and also has successfully produced two 
releases(1.9.3&1.11.1) as RM, currently working as RM to push forward the 
release of Flink 1.12.

Please join me in congratulating Dian Fu for becoming a Flink PMC Member!

Best,
Jincheng(on behalf of the Flink PMC)


Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 Thread Paul Lam
Congrats, Dian!

Best,
Paul Lam

> 2020年8月27日 17:42,Marta Paes Moreira  写道:
> 
> Congrats, Dian!
> 
> On Thu, Aug 27, 2020 at 11:39 AM Yuan Mei wrote:
> Congrats!
> 
> On Thu, Aug 27, 2020 at 5:38 PM Xingbo Huang wrote:
> Congratulations Dian!
> 
> Best,
> Xingbo
> 
> jincheng sun 于2020年8月27日周四 下午5:24写道:
> Hi all,
> 
> On behalf of the Flink PMC, I'm happy to announce that Dian Fu is now part of 
> the Apache Flink Project Management Committee (PMC).
> 
> Dian Fu has been very active on PyFlink component, working on various 
> important features, such as the Python UDF and Pandas integration, and keeps 
> checking and voting for our releases, and also has successfully produced two 
> releases(1.9.3&1.11.1) as RM, currently working as RM to push forward the 
> release of Flink 1.12.
> 
> Please join me in congratulating Dian Fu for becoming a Flink PMC Member!
> 
> Best,
> Jincheng(on behalf of the Flink PMC)



Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 Thread Marta Paes Moreira
Congrats, Dian!

On Thu, Aug 27, 2020 at 11:39 AM Yuan Mei  wrote:

> Congrats!
>
> On Thu, Aug 27, 2020 at 5:38 PM Xingbo Huang  wrote:
>
>> Congratulations Dian!
>>
>> Best,
>> Xingbo
>>
>> jincheng sun  于2020年8月27日周四 下午5:24写道:
>>
>>> Hi all,
>>>
>>> On behalf of the Flink PMC, I'm happy to announce that Dian Fu is now
>>> part of the Apache Flink Project Management Committee (PMC).
>>>
>>> Dian Fu has been very active on PyFlink component, working on various
>>> important features, such as the Python UDF and Pandas integration, and
>>> keeps checking and voting for our releases, and also has successfully
>>> produced two releases(1.9.3&1.11.1) as RM, currently working as RM to push
>>> forward the release of Flink 1.12.
>>>
>>> Please join me in congratulating Dian Fu for becoming a Flink PMC Member!
>>>
>>> Best,
>>> Jincheng(on behalf of the Flink PMC)
>>>
>>


Re: [DISCUSS] Remove Kafka 0.10.x connector (and possibly 0.11.x)

2020-08-27 Thread Paul Lam
Hi,

I think it’s okay, given that when upgrading to 1.12 we can either migrate to the universal connector or keep using the compatible 0.10/0.11 connectors from the 1.11 release, as Chesnay mentioned.

IIUC, the migration process to the universal connector would be (please correct 
me if I’m wrong):
1. Stop the job with a savepoint, committing the offset to Kafka brokers.
2. Modify the user code to migrate to the universal connector, and change the source operator id to discard the old connector state.
3. Start the job with the savepoint, and read Kafka from group offsets.
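
A minimal sketch of step 2, assuming the job currently uses FlinkKafkaConsumer011, that the universal flink-connector-kafka dependency is on the classpath, and that the topic, group id and broker address below are placeholders:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class UniversalConnectorMigration {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder broker address
        props.setProperty("group.id", "my-consumer-group");    // same group the old job committed to

        // Universal connector class (replaces FlinkKafkaConsumer011).
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props);
        // Step 3: resume from the offsets committed to the brokers when the job was stopped.
        consumer.setStartFromGroupOffsets();

        DataStream<String> stream = env
                .addSource(consumer)
                // New uid so the old connector state in the savepoint is discarded.
                .uid("kafka-source-universal");

        stream.print();
        env.execute("universal-kafka-connector-migration");
    }
}

When resubmitting from the savepoint, the run command would also need --allowNonRestoredState, since the old source state is intentionally left unrestored.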

Best,
Paul Lam

> 2020年8月27日 16:27,Aljoscha Krettek  写道:
> 
> @Konstantin: Yes, I'm talking about dropping those modules. We don't have any 
> special code for supporting Kafka 0.10/0.11 in the "modern" connector, that 
> comes from the Kafka Consumer/Producer code we're using.
> 
> @Paul: The modern Kafka connector works with Kafka brokers as far back as 
> 0.10, would that be enough or do you still think we should have the actual 
> Kafka 0.10 Consumer code in Flink as well.
> 
> Best,
> Aljoscha
> 
> On 25.08.20 23:15, Chesnay Schepler wrote:
>> +1 to remove both the 1.10 and 1.11 connectors.
>> The connectors have not been actively developed for some time. They are 
>> basically just sitting around causing noise by causing test instabilities 
>> and eating CI time.
>> It would  also allow us to really simplify the module structure of the Kafka 
>> connectors.
>> Users may continue to use the 1.11 version of the connectors with future 
>> Flink versions, and we may even provide critical bug fixes in a 1.11 bugfix 
>> release (albeit unlikely).
>> While ultimately this is a separate topic I would also be in favor of 
>> removing any migration paths we have from 0.11 to the universal connector;
>> as these are already present in 1.11 users may migrate to the universal 
>> connector before jumping to Flink 1.12+.
>> On 25/08/2020 18:49, Konstantin Knauf wrote:
>>> Hi Aljoscha,
>>> 
>>> I am assuming you're asking about dropping the 
>>> flink-connector-kafka-0.10/0.11 modules, right? Or are you talking about 
>>> removing support for Kafka 0.10/0.11 from the universal connector?
>>> 
>>> I am in favor of removing flink-connector-kafka-0.10/0.11 in the next 
>>> release. These modules would still be available in Flink 1.11- as a 
>>> reference, and could be used with Flink 1.12+ with small or no 
>>> modifications. To my knowledge, you also use the universal Kafka connector 
>>> with 0.10 brokers, but there might be a performance penalty if I remember 
>>> correctly. In general, I find it important to continuously reduce baggage 
>>> that accumulates over time and this seems like a good opportunity.
>>> 
>>> Best,
>>> 
>>> Konstantin
>>> 
>>> On Tue, Aug 25, 2020 at 4:59 AM Paul Lam wrote:
>>> 
>>> Hi Aljoscha,
>>> 
>>> I'm lightly leaning towards keeping the 0.10 connector, for Kafka
>>> 0.10 still has a steady user base in my observation.
>>> 
>>> But if we drop 0.10 connector, can we ensure the users would be
>>> able to smoothly migrate to 0.11 connector/universal connector?
>>> 
>>> If I remember correctly, the universal connector is compatible
>>> with 0.10 brokers, but I want to double check that.
>>> 
>>> Best,
>>> Paul Lam
>>> 
 2020年8月24日 22:46,Aljoscha Krettek 写道:
 
 Hi all,
 
 this thought came up on FLINK-17260 [1] but I think it would be a
 good idea in general. The issue reminded us that Kafka didn't
 have an idempotent/fault-tolerant Producer before Kafka 0.11.0.
 By now we have had the "modern" Kafka connector that roughly
 follows new Kafka releases for a while and this one supports
 Kafka cluster versions as far back as 0.10.2.0 (I believe).
 
 What are your thoughts on removing support for older Kafka
 versions? And yes, I know that we had multiple discussions like
 this in the past but I'm trying to gauge the current sentiment.
 
 I'm cross-posting to the user-ml since this is important for both
 users and developers.
 
 Best,
 Aljoscha
 
 [1] https://issues.apache.org/jira/browse/FLINK-17260
>>> 
>>> 
>>> 
>>> -- 
>>> 
>>> Konstantin Knauf
>>> 
>>> https://twitter.com/snntrable
>>> 
>>> https://github.com/knaufk



Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 Thread Yuan Mei
Congrats!

On Thu, Aug 27, 2020 at 5:38 PM Xingbo Huang  wrote:

> Congratulations Dian!
>
> Best,
> Xingbo
>
> jincheng sun  于2020年8月27日周四 下午5:24写道:
>
>> Hi all,
>>
>> On behalf of the Flink PMC, I'm happy to announce that Dian Fu is now
>> part of the Apache Flink Project Management Committee (PMC).
>>
>> Dian Fu has been very active on PyFlink component, working on various
>> important features, such as the Python UDF and Pandas integration, and
>> keeps checking and voting for our releases, and also has successfully
>> produced two releases(1.9.3&1.11.1) as RM, currently working as RM to push
>> forward the release of Flink 1.12.
>>
>> Please join me in congratulating Dian Fu for becoming a Flink PMC Member!
>>
>> Best,
>> Jincheng(on behalf of the Flink PMC)
>>
>


Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 Thread Xingbo Huang
Congratulations Dian!

Best,
Xingbo

jincheng sun  于2020年8月27日周四 下午5:24写道:

> Hi all,
>
> On behalf of the Flink PMC, I'm happy to announce that Dian Fu is now part
> of the Apache Flink Project Management Committee (PMC).
>
> Dian Fu has been very active on PyFlink component, working on various
> important features, such as the Python UDF and Pandas integration, and
> keeps checking and voting for our releases, and also has successfully
> produced two releases(1.9.3&1.11.1) as RM, currently working as RM to push
> forward the release of Flink 1.12.
>
> Please join me in congratulating Dian Fu for becoming a Flink PMC Member!
>
> Best,
> Jincheng(on behalf of the Flink PMC)
>


[ANNOUNCE] New PMC member: Dian Fu

2020-08-27 Thread jincheng sun
Hi all,

On behalf of the Flink PMC, I'm happy to announce that Dian Fu is now part
of the Apache Flink Project Management Committee (PMC).

Dian Fu has been very active on PyFlink component, working on various
important features, such as the Python UDF and Pandas integration, and
keeps checking and voting for our releases, and also has successfully
produced two releases(1.9.3&1.11.1) as RM, currently working as RM to push
forward the release of Flink 1.12.

Please join me in congratulating Dian Fu for becoming a Flink PMC Member!

Best,
Jincheng(on behalf of the Flink PMC)


Re: [DISCUSS] Remove Kafka 0.10.x connector (and possibly 0.11.x)

2020-08-27 Thread Aljoscha Krettek
@Konstantin: Yes, I'm talking about dropping those modules. We don't 
have any special code for supporting Kafka 0.10/0.11 in the "modern" 
connector; that support comes from the Kafka Consumer/Producer code we're using.


@Paul: The modern Kafka connector works with Kafka brokers as far back 
as 0.10 - would that be enough, or do you still think we should have the 
actual Kafka 0.10 Consumer code in Flink as well?


Best,
Aljoscha

On 25.08.20 23:15, Chesnay Schepler wrote:

+1 to remove both the 1.10 and 1.11 connectors.

The connectors have not been actively developed for some time. They are 
basically just sitting around causing noise by causing test 
instabilities and eating CI time.
It would  also allow us to really simplify the module structure of the 
Kafka connectors.


Users may continue to use the 1.11 version of the connectors with future 
Flink versions, and we may even provide critical bug fixes in a 1.11 
bugfix release (albeit unlikely).


While ultimately this is a separate topic I would also be in favor of 
removing any migration paths we have from 0.11 to the universal connector;
as these are already present in 1.11 users may migrate to the universal 
connector before jumping to Flink 1.12+.


On 25/08/2020 18:49, Konstantin Knauf wrote:

Hi Aljoscha,

I am assuming you're asking about dropping the 
flink-connector-kafka-0.10/0.11 modules, right? Or are you talking 
about removing support for Kafka 0.10/0.11 from the universal connector?


I am in favor of removing flink-connector-kafka-0.10/0.11 in the next 
release. These modules would still be available in Flink 1.11- as a 
reference, and could be used with Flink 1.12+ with small or no 
modifications. To my knowledge, you also use the universal Kafka 
connector with 0.10 brokers, but there might be a performance 
penalty if I remember correctly. In general, I find it important 
to continuously reduce baggage that accumulates over time and this 
seems like a good opportunity.


Best,

Konstantin

On Tue, Aug 25, 2020 at 4:59 AM Paul Lam wrote:


    Hi Aljoscha,

    I'm lightly leaning towards keeping the 0.10 connector, for Kafka
    0.10 still has a steady user base in my observation.

    But if we drop 0.10 connector, can we ensure the users would be
    able to smoothly migrate to 0.11 connector/universal connector?

    If I remember correctly, the universal connector is compatible
    with 0.10 brokers, but I want to double check that.

    Best,
    Paul Lam


    2020年8月24日 22:46,Aljoscha Krettek 写道:

    Hi all,

    this thought came up on FLINK-17260 [1] but I think it would be a
    good idea in general. The issue reminded us that Kafka didn't
    have an idempotent/fault-tolerant Producer before Kafka 0.11.0.
    By now we have had the "modern" Kafka connector that roughly
    follows new Kafka releases for a while and this one supports
    Kafka cluster versions as far back as 0.10.2.0 (I believe).

    What are your thoughts on removing support for older Kafka
    versions? And yes, I know that we had multiple discussions like
    this in the past but I'm trying to gauge the current sentiment.

    I'm cross-posting to the user-ml since this is important for both
    users and developers.

    Best,
    Aljoscha

    [1] https://issues.apache.org/jira/browse/FLINK-17260




--

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk








Re: [ANNOUNCE] Apache Flink 1.10.2 released

2020-08-27 Thread Zhijiang
Congrats, thanks for the release manager work Zhu Zhu and everyone involved in!

Best,
Zhijiang
--
From:liupengcheng 
Send Time:2020年8月26日(星期三) 19:37
To:dev ; Xingbo Huang 
Cc:Guowei Ma ; user-zh ; Yangze 
Guo ; Dian Fu ; Zhu Zhu 
; user 
Subject:Re: [ANNOUNCE] Apache Flink 1.10.2 released

Thanks ZhuZhu for managing this release and everyone who contributed to this.

Best,
Pengcheng

 在 2020/8/26 下午7:06,“Congxian Qiu” 写入:

Thanks ZhuZhu for managing this release and everyone else who contributed
to this release!

Best,
Congxian


Xingbo Huang  于2020年8月26日周三 下午1:53写道:

> Thanks Zhu for the great work and everyone who contributed to this 
release!
>
> Best,
> Xingbo
>
> Guowei Ma  于2020年8月26日周三 下午12:43写道:
>
>> Hi,
>>
>> Thanks a lot for being the release manager Zhu Zhu!
>> Thanks everyone contributed to this!
>>
>> Best,
>> Guowei
>>
>>
>> On Wed, Aug 26, 2020 at 11:18 AM Yun Tang  wrote:
>>
>>> Thanks for Zhu's work to manage this release and everyone who
>>> contributed to this!
>>>
>>> Best,
>>> Yun Tang
>>> 
>>> From: Yangze Guo 
>>> Sent: Tuesday, August 25, 2020 14:47
>>> To: Dian Fu 
>>> Cc: Zhu Zhu ; dev ; user <
>>> user@flink.apache.org>; user-zh 
>>> Subject: Re: [ANNOUNCE] Apache Flink 1.10.2 released
>>>
>>> Thanks a lot for being the release manager Zhu Zhu!
>>> Congrats to all others who have contributed to the release!
>>>
>>> Best,
>>> Yangze Guo
>>>
>>> On Tue, Aug 25, 2020 at 2:42 PM Dian Fu  wrote:
>>> >
>>> > Thanks ZhuZhu for managing this release and everyone else who
>>> contributed to this release!
>>> >
>>> > Regards,
>>> > Dian
>>> >
>>> > 在 2020年8月25日,下午2:22,Till Rohrmann  写道:
>>> >
>>> > Great news. Thanks a lot for being our release manager Zhu Zhu and to
>>> all others who have contributed to the release!
>>> >
>>> > Cheers,
>>> > Till
>>> >
>>> > On Tue, Aug 25, 2020 at 5:37 AM Zhu Zhu  wrote:
>>> >>
>>> >> The Apache Flink community is very happy to announce the release of
>>> Apache Flink 1.10.2, which is the first bugfix release for the Apache 
Flink
>>> 1.10 series.
>>> >>
>>> >> Apache Flink® is an open-source stream processing framework for
>>> distributed, high-performing, always-available, and accurate data 
streaming
>>> applications.
>>> >>
>>> >> The release is available for download at:
>>> >> https://flink.apache.org/downloads.html
>>> >>
>>> >> Please check out the release blog post for an overview of the
>>> improvements for this bugfix release:
>>> >> https://flink.apache.org/news/2020/08/25/release-1.10.2.html
>>> >>
>>> >> The full release notes are available in Jira:
>>> >>
>>> 
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12347791
>>> >>
>>> >> We would like to thank all contributors of the Apache Flink community
>>> who made this release possible!
>>> >>
>>> >> Thanks,
>>> >> Zhu
>>> >
>>> >
>>>
>>