Re:Re: Questions of "State Processing API in Scala"

2020-08-31 Thread izual
I tried to fix the small mistake of sample code in State-Processor-API doc[1], 
could someone do a doc review[2] for me, thank you.


1: 
https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/state_processor_api.html#keyed-state
2: https://github.com/apache/flink/pull/13266





At 2020-01-21 15:54:56, "Tzu-Li (Gordon) Tai"  wrote:
>Hi Izual,
>
>Thanks for reporting this! I'm also forwarding this to the user mailing
>list, as that is the more suitable place for this question.
>
>I think the usability of the State Processor API in Scala is indeed
>something that hasn’t been looked at closely yet.
>
>On Tue, Jan 21, 2020 at 8:12 AM izual  wrote:
>
>> Hi community,
>>
>> When I use state in Scala, something makes confused, I followed these
>> steps to generate and read states:
>>
>> a. implements the example[1] `CountWindowAverage` in Scala(exactly same),
>> and run jobA => that makes good.
>>
>> b. execute `flink cancel -s ${JobID}` => savepoints was generated as
>> expected.
>>
>> c. implements the example[2] `StatefulFunctionWithTime` in Scala(code
>> below), and run jobB => failed, exceptions shows that "Caused by:
>> org.apache.flink.util.StateMigrationException: The new key serializer must
>> be compatible."
>>
>>
>> ReaderFunction code as below:
>>
>> ```
>>
>>   class ReaderFunction extends KeyedStateReaderFunction[Long, (Long,
>> Long)] {
>>
>> var countState: ValueState[(Long, Long)] = _
>>
>> override def open(parameters: Configuration): Unit = {
>>
>>   val stateDescriptor = new ValueStateDescriptor("average",
>> createTypeInformation[(Long, Long)])
>>
>>   countState = getRuntimeContext().getState(stateDescriptor)
>>
>> }
>>
>> override def readKey(key: Long, ctx: KeyedStateReaderFunction.Context,
>> out: Collector[(Long, Long)]): Unit = {
>>
>>   out.collect(countState.value())
>>
>> }
>>
>>   }
>>
>> ```
>>
>> d. then I try to use java.lang.Long instead of Long in key-type, and run
>> jobB => exception just disappeared and that makes good.
>>
>> This makes me confused. Did I miss some features in State-Processing-API,
>> such as `magic-implicits`?
>>
>
>This part is explainable. The "magic-implicits" actually happen in the
>DataStream Scala API.
>Any primitive Scala types will inferred and serialized as their Java
>counterparts.
>AFAIK, this would not happen in the State Processor API yet and therefore
>why you are getting the StateMigrationException.
>When using Scala types directly with the State Processor API, I would guess
>that Kryo (as a generic fallback) was being used to access state.
>This can probably be confirmed by looking at the exception stack trace. Can
>you post a full copy of that?
>
>This should be resolvable by properly supporting Scala for the State
>Processor API, but it's just that up to this point, we didn't have a plan
>for that yet.
>Can you open a JIRA for this? I think it'll be a reasonable extension to
>the API.
>
>
>>
>> And when I change `xxx.keyBy(_._1)` to `xxx.keyBy(0)`,same exception comes
>> again,this time I tried to use Tuple(java.lang.Long) or something else, but
>> does not work.
>>
>
>I'm not sure what you mean here. Where is this keyBy happening? In the
>Scala DataStream job, or the State Processor API?
>
>
>>
>> Hope.
>>
>> 1:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state
>>
>> 2:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state
>
>
>Cheers,
>Gordon


Re: SAX2 driver class org.apache.xerces.parsers.SAXParser not found

2020-08-31 Thread Averell
Hello Robert, Arvid,

As I am running on EMR, and currently AWS only supports version 1.10.
I tried both solutions that you suggested ((i) copying a SAXParser
implementation to the plugins folder and (ii) using the S3FS Plugin from
1.10.1), and both worked - I could have successful checkpoints.

However, intermittenly my checkpoints still fail (about 10%). And whenever
it fails, there are non-completed files left in S3 (attached screenshot
below).
I'm not sure whether those uncompleted files are expected, or is that a bug?

Thanks and regards,
Averell

 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Exception on s3 committer

2020-08-31 Thread Ivan Yang
Hi Yun,

Thank you so much for you suggestion.

(1) The job couldn’t restore from the last checkpoint. The exception is in my 
original email.
(2) No, I didn’t change any multipart upload settings. 
(3) The file is gone. I have another batch process that reads Flink output s3 
bucket and pushes objects to another bucket. Upon success read and write, The 
batch job will delete the file. What’s puzzling me is if Flink hasn’t 
successfully commit the multipart file, it should not be visible to the batch 
job. It looks the situation is while Flink tried to commit the multipart file, 
it crashed and restarted. The file is committed on s3 successfully, but not 
acknowledge recorded on Flink side. In between, the batch job consumed the 
file. I don’t know if that’s possible.

Thanks
Ivan

> On Aug 30, 2020, at 11:10 PM, Yun Gao  wrote:
> 
> 
> Hi Ivan,
> 
>I think there might be some points to check:
> 
>1. Is the job restored from the latest successful checkpoint after restart 
> ? 
>2. Have you ever changed the timeout settings for uncompleted multipart 
> upload ?
>3. Does cbd/late-landing/event_date=2020-08-28/event_hour=16/part-5-264804 
> exist or not ?
> 
> Best,
>  Yun
> 
> --Original Mail --
> Sender:Ivan Yang 
> Send Date:Sat Aug 29 12:43:28 2020
> Recipients:user 
> Subject:Exception on s3 committer
> Hi all,
> 
> We got this exception after a job restart. Does anyone know what may lead to 
> this situation? and how to get pass this Checkpoint issue? Prior to this, the 
> job failed due to “Checkpoint expired before completing.” We are s3 heavy, 
> writing out 10K files to s3 every 10 minutes using 
> StreamingFileSink/BulkFormat to various s3 prefixes. Thanks in advance. -Ivan
> 
> 2020-08-28 15:17:58
> java.io .IOException: Recovering commit failed for object 
> cbd/late-landing/event_date=2020-08-28/event_hour=16/part-5-264804. Object 
> does not exist and MultiPart Upload 
> 3OnIJwYXCxm8fkHpphQOiCdjgfy3jTBqBcg8SbscYJFg0Etl4GoDpPiBms9HUfF_3f7AwL5CyQF4Ne.KDIOKk4aXecP2QRkTTlbbTT8_SnS3Dky.SF7zvDuuMZP9YWlFwtT79rWErOB9K4YPIzUnc4GhUQv4AQIPDF4Nav0ppiw-
>  is not valid.
> at 
> org.apache.flink.fs.s3.common.writer.S3Committer.commitAfterRecovery(S3Committer.java:102)
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedPendingFile.commitAfterRecovery(OutputStreamBasedPartFileWriter.java:179)
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.commitRecoveredPendingFiles(Bucket.java:148)
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.(Bucket.java:122)
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:379)
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:63)
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:176)
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:164)
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:148)
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.(StreamingFileSinkHelper.java:74)
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:399)
> at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
> at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io .F

Re: Debezium Flink EMR

2020-08-31 Thread Marta Paes Moreira
Hey, Rex!

This is likely due to the tombstone records that Debezium produces for
DELETE operations (i.e. a record with the same key as the deleted row and a
value of null). These are markers for Kafka to indicate that log compaction
can remove all records for the given key, and the initial implementation of
the debezium-format can't handle them. This issue is already documented
(and solved) in [1].

In the meantime, can you try adding "tombstones.on.delete":false" to the
configuration of your Debezium MySQL connector? Marta
[1] https://issues.apache.org/jira/browse/FLINK-18705

On Tue, Sep 1, 2020 at 1:36 AM Rex Fenley  wrote:

> Hi, getting so close but ran into another issue:
>
> Flink successfully reads changes from Debezium/Kafka and writes them to
> Elasticsearch, but there's a problem with deletions. When I DELETE a row
> from MySQL the deletion makes it successfully all the way to Elasticsearch
> which is great, but then the taskmanager suddenly dies with a null pointer
> exception. Inserts and Updates do not have the same problem. This seems
> very odd. Any help would be much appreciated. Thanks!
>
> flink-taskmanager_1| 2020-08-31 23:30:33,684 WARN
>  org.apache.flink.runtime.taskmanager.Task[] - Source:
> TableSourceScan(table=[[default_catalog, default_database,
> topic_addresses]], fields=[id, customer_id, street, city, state, zip,
> type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses],
> fields=[id, customer_id, street, city, state, zip, type]) (1/2)
> (2b79917cb528f37fad7f636740d2fdd8) switched from RUNNING to FAILED.
> flink-taskmanager_1| java.lang.NullPointerException: null
> flink-taskmanager_1| at java.lang.String.(String.java:566)
> ~[?:1.8.0_265]
> flink-taskmanager_1| at
> org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136)
> ~[flink-json-1.11.1.jar:1.11.1]
> flink-taskmanager_1| at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
> ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
> flink-taskmanager_1| at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
> ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
> flink-taskmanager_1| at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
> flink-taskmanager_1| at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
> flink-taskmanager_1| at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> flink-taskmanager_1| at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> flink-taskmanager_1| at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> flink-taskmanager_1| 2020-08-31 23:30:33,720 INFO
>  org.apache.flink.runtime.taskmanager.Task[] - Freeing
> task resources for Source: TableSourceScan(table=[[default_catalog,
> default_database, topic_addresses]], fields=[id, customer_id, street, city,
> state, zip, type]) -> Sink:
> Sink(table=[default_catalog.default_database.ESAddresses], fields=[id,
> customer_id, street, city, state, zip, type]) (1/2)
> (2b79917cb528f37fad7f636740d2fdd8).
> flink-taskmanager_1| 2020-08-31 23:30:33,728 INFO
>  org.apache.flink.runtime.taskexecutor.TaskExecutor   [] -
> Un-registering task and sending final execution state FAILED to JobManager
> for task Source: TableSourceScan(table=[[default_catalog, default_database,
> topic_addresses]], fields=[id, customer_id, street, city, state, zip,
> type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses],
> fields=[id, customer_id, street, city, state, zip, type]) (1/2)
> 2b79917cb528f37fad7f636740d2fdd8.
> flink-jobmanager_1 | 2020-08-31 23:30:33,770 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
> TableSourceScan(table=[[default_catalog, default_database,
> topic_addresses]], fields=[id, customer_id, street, city, state, zip,
> type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses],
> fields=[id, customer_id, street, city, state, zip, type]) (1/2)
> (2b79917cb528f37fad7f636740d2fdd8) switched from RUNNING to FAILED on
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@2e246b35.
> flink-jobmanager_1 | java.lang.NullPointerException: null
> flink-jobmanager_1 | at java.lang.String.(String.java:566)
> ~[?:1.8.0_265]
> flink-jobmanager

Re: Debezium Flink EMR

2020-08-31 Thread Rex Fenley
Hi, getting so close but ran into another issue:

Flink successfully reads changes from Debezium/Kafka and writes them to
Elasticsearch, but there's a problem with deletions. When I DELETE a row
from MySQL the deletion makes it successfully all the way to Elasticsearch
which is great, but then the taskmanager suddenly dies with a null pointer
exception. Inserts and Updates do not have the same problem. This seems
very odd. Any help would be much appreciated. Thanks!

flink-taskmanager_1| 2020-08-31 23:30:33,684 WARN
 org.apache.flink.runtime.taskmanager.Task[] - Source:
TableSourceScan(table=[[default_catalog, default_database,
topic_addresses]], fields=[id, customer_id, street, city, state, zip,
type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses],
fields=[id, customer_id, street, city, state, zip, type]) (1/2)
(2b79917cb528f37fad7f636740d2fdd8) switched from RUNNING to FAILED.
flink-taskmanager_1| java.lang.NullPointerException: null
flink-taskmanager_1| at java.lang.String.(String.java:566)
~[?:1.8.0_265]
flink-taskmanager_1| at
org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136)
~[flink-json-1.11.1.jar:1.11.1]
flink-taskmanager_1| at
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
flink-taskmanager_1| at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
flink-taskmanager_1| at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
flink-taskmanager_1| at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
flink-taskmanager_1| at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-taskmanager_1| at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-taskmanager_1| at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-taskmanager_1| 2020-08-31 23:30:33,720 INFO
 org.apache.flink.runtime.taskmanager.Task[] - Freeing
task resources for Source: TableSourceScan(table=[[default_catalog,
default_database, topic_addresses]], fields=[id, customer_id, street, city,
state, zip, type]) -> Sink:
Sink(table=[default_catalog.default_database.ESAddresses], fields=[id,
customer_id, street, city, state, zip, type]) (1/2)
(2b79917cb528f37fad7f636740d2fdd8).
flink-taskmanager_1| 2020-08-31 23:30:33,728 INFO
 org.apache.flink.runtime.taskexecutor.TaskExecutor   [] -
Un-registering task and sending final execution state FAILED to JobManager
for task Source: TableSourceScan(table=[[default_catalog, default_database,
topic_addresses]], fields=[id, customer_id, street, city, state, zip,
type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses],
fields=[id, customer_id, street, city, state, zip, type]) (1/2)
2b79917cb528f37fad7f636740d2fdd8.
flink-jobmanager_1 | 2020-08-31 23:30:33,770 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
TableSourceScan(table=[[default_catalog, default_database,
topic_addresses]], fields=[id, customer_id, street, city, state, zip,
type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses],
fields=[id, customer_id, street, city, state, zip, type]) (1/2)
(2b79917cb528f37fad7f636740d2fdd8) switched from RUNNING to FAILED on
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@2e246b35.
flink-jobmanager_1 | java.lang.NullPointerException: null
flink-jobmanager_1 | at java.lang.String.(String.java:566)
~[?:1.8.0_265]
flink-jobmanager_1 | at
org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136)
~[flink-json-1.11.1.jar:1.11.1]
flink-jobmanager_1 | at
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
~[?:?]
flink-jobmanager_1 | at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
~[?:?]
flink-jobmanager_1 | at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
~[?:?]
flink-jobmanager_1 | at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
~

Re: Flink SQL Streaming Join Creates Duplicates

2020-08-31 Thread Austin Cawley-Edwards
Hey Arvid,

Yes, I was able to self-answer this one. Was just confused on the
non-deterministic behavior of the FULL OUTER join statement. Thinking
through it and took a harder read through the Dynamic Tables doc section[1]
where "Result Updating" is hinted at, and the behavior makes total sense in
a streaming env.

Thanks,
Austin

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/dynamic_tables.html

On Mon, Aug 31, 2020 at 5:16 AM Arvid Heise  wrote:

> Hi Austin,
>
> Do I assume correctly, that you self-answered your question? If not, could
> you please update your current progress?
>
> Best,
>
> Arvid
>
> On Thu, Aug 27, 2020 at 11:41 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Ah, I think the "Result Updating" is what got me -- INNER joins do the
>> job!
>>
>> On Thu, Aug 27, 2020 at 3:38 PM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> oops, the example query should actually be:
>>>
>>> SELECT table_1.a, table_1.b, table_2.c
>>> FROM table_1
>>> LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;
>>>
>>> and duplicate results should actually be:
>>>
>>> Record(a = "data a 1", b = "data b 1", c = "data c 1")
>>> Record(a = "data a 1", b = "data b 1", c = null)
>>> Record(a = "data a 2", b = "data b 2", c = "data c 2")
>>> Record(a = "data a 2", b = "data b 2", c = null)
>>>
>>> On Thu, Aug 27, 2020 at 3:34 PM Austin Cawley-Edwards <
>>> austin.caw...@gmail.com> wrote:
>>>
 Hey all,

 I've got a Flink 1.10 Streaming SQL job using the Blink Planner that is
 reading from a few CSV files and joins some records across them into a
 couple of data streams (yes, this could be a batch job won't get into why
 we chose streams unless it's relevant). These joins are producing some
 duplicate records, one with the joined field present and one with the
 joined field as `null`, though this happens only ~25% of the time. Reading
 the docs on joins[1], I thought this could be caused by too strict Idle
 State Retention[2], so I increased that to min, max (15min, 24h) but that
 doesn't seem to have an effect, and the problem still occurs when testing
 on a subset of data that finishes processing in under a minute.

 The query roughly looks like:

 table_1 has fields a, b
 table_2 has fields b, c

 SELECT table_1.a, table_1.b, table_1.c
 FROM table_1
 LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;

 Correct result:
 Record(a = "data a 1", b = "data b 1", c = "data c 1")
 Record(a = "data a 2", b = "data b 2", c = "data c 2")

 Results seem to be anywhere between all possible dups and the correct
 result.

 Record(a = "data a 1", b = "data b 1", c = "data c 1")
 Record(a = "data a 1", b = null, c = "data c 1")
 Record(a = "data a 2", b = "data b 2", c = "data c 2")
 Record(a = "data a 2", b = null, c = "data c 2")

 The CSV files are registered as Flink Tables with the following:

 tableEnv.connect(
 new FileSystem()
 .path(path)
 )
 .withFormat(
 new Csv()
 .quoteCharacter('"')
 .ignoreParseErrors()
 )
 .withSchema(schema)
 .inAppendMode()
 .createTemporaryTable(tableName);


 I'm creating my table environment like so:

 EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
 .useBlinkPlanner()
 .build();

 StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
 tableEnvSettings);

 TableConfig tConfig = tEnv.getConfig();
 tConfig.setIdleStateRetentionTime(Time.minutes(15), Time.hours(24));


 Is there something I'm misconfiguring or have misunderstood the docs?

 Thanks,
 Austin

 [1]:
 https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#joins
 [2]:
 https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time

>>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


Re: Debezium Flink EMR

2020-08-31 Thread Rex Fenley
Ah, my bad, thanks for pointing that out Arvid!

On Mon, Aug 31, 2020 at 12:00 PM Arvid Heise  wrote:

> Hi Rex,
>
> you still forgot
>
> 'debezium-json.schema-include' = true
>
> Please reread my mail.
>
>
> On Mon, Aug 31, 2020 at 7:55 PM Rex Fenley  wrote:
>
>> Thanks for the input, though I've certainly included a schema as is
>> reflected earlier in this thread. Including here again
>> ...
>> tableEnv.executeSql("""
>> CREATE TABLE topic_addresses (
>> -- schema is totally the same to the MySQL "addresses" table
>> id INT,
>> customer_id INT,
>> street STRING,
>> city STRING,
>> state STRING,
>> zip STRING,
>> type STRING,
>> PRIMARY KEY (id) NOT ENFORCED
>> ) WITH (
>> 'connector' = 'kafka',
>> 'topic' = 'dbserver1.inventory.addresses',
>> 'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
>> 'properties.group.id' = 'testGroup',
>> 'format' = 'debezium-json' -- using debezium-json as the format
>> )
>> """)
>>
>> val table = tableEnv.from("topic_addresses").select($"*")
>> ...
>>
>> On Mon, Aug 31, 2020 at 2:39 AM Arvid Heise  wrote:
>>
>>> Hi Rex,
>>>
>>> the connector expects a value without a schema, but the message contains
>>> a schema. You can tell Flink that the schema is included as written in the
>>> documentation [1].
>>>
>>> CREATE TABLE topic_products (
>>>   -- schema is totally the same to the MySQL "products" table
>>>   id BIGINT,
>>>   name STRING,
>>>   description STRING,
>>>   weight DECIMAL(10, 2)) WITH (
>>>  'connector' = 'kafka',
>>>  'topic' = 'products_binlog',
>>>  'properties.bootstrap.servers' = 'localhost:9092',
>>>  'properties.group.id' = 'testGroup',
>>>  'format' = 'debezium-json',
>>>  'debezium-json.schema-include' = true)
>>>
>>> @Jark Wu  , it would be probably good to make the
>>> connector more robust and catch these types of misconfigurations.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#how-to-use-debezium-format
>>>
>>> On Fri, Aug 28, 2020 at 11:56 PM Rex Fenley  wrote:
>>>
 Awesome, so that took me a step further. When running i'm receiving an
 error however. FYI, my docker-compose file is based on the Debezium mysql
 tutorial which can be found here
 https://debezium.io/documentation/reference/1.2/tutorial.html

 Part of the stack trace:

 flink-jobmanager_1 | Caused by: java.io.IOException: Corrupt
 Debezium JSON message
 '{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.addresses.Envelope"},"payload":{"before":null,"after":{"id":18,"customer_id":1004,"street":"111
 co

Re: Debezium Flink EMR

2020-08-31 Thread Arvid Heise
Hi Rex,

you still forgot

'debezium-json.schema-include' = true

Please reread my mail.


On Mon, Aug 31, 2020 at 7:55 PM Rex Fenley  wrote:

> Thanks for the input, though I've certainly included a schema as is
> reflected earlier in this thread. Including here again
> ...
> tableEnv.executeSql("""
> CREATE TABLE topic_addresses (
> -- schema is totally the same to the MySQL "addresses" table
> id INT,
> customer_id INT,
> street STRING,
> city STRING,
> state STRING,
> zip STRING,
> type STRING,
> PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'dbserver1.inventory.addresses',
> 'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
> 'properties.group.id' = 'testGroup',
> 'format' = 'debezium-json' -- using debezium-json as the format
> )
> """)
>
> val table = tableEnv.from("topic_addresses").select($"*")
> ...
>
> On Mon, Aug 31, 2020 at 2:39 AM Arvid Heise  wrote:
>
>> Hi Rex,
>>
>> the connector expects a value without a schema, but the message contains
>> a schema. You can tell Flink that the schema is included as written in the
>> documentation [1].
>>
>> CREATE TABLE topic_products (
>>   -- schema is totally the same to the MySQL "products" table
>>   id BIGINT,
>>   name STRING,
>>   description STRING,
>>   weight DECIMAL(10, 2)) WITH (
>>  'connector' = 'kafka',
>>  'topic' = 'products_binlog',
>>  'properties.bootstrap.servers' = 'localhost:9092',
>>  'properties.group.id' = 'testGroup',
>>  'format' = 'debezium-json',
>>  'debezium-json.schema-include' = true)
>>
>> @Jark Wu  , it would be probably good to make the
>> connector more robust and catch these types of misconfigurations.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#how-to-use-debezium-format
>>
>> On Fri, Aug 28, 2020 at 11:56 PM Rex Fenley  wrote:
>>
>>> Awesome, so that took me a step further. When running i'm receiving an
>>> error however. FYI, my docker-compose file is based on the Debezium mysql
>>> tutorial which can be found here
>>> https://debezium.io/documentation/reference/1.2/tutorial.html
>>>
>>> Part of the stack trace:
>>>
>>> flink-jobmanager_1 | Caused by: java.io.IOException: Corrupt
>>> Debezium JSON message
>>> '{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.addresses.Envelope"},"payload":{"before":null,"after":{"id":18,"customer_id":1004,"street":"111
>>> cool street","city":"Big
>>> City","state":"California","zip":"9","type":"BILLING"},"source":{"version":"1.2.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1598651432000,"sna

Re: Debezium Flink EMR

2020-08-31 Thread Rex Fenley
Thanks for the input, though I've certainly included a schema as is
reflected earlier in this thread. Including here again
...
tableEnv.executeSql("""
CREATE TABLE topic_addresses (
-- schema is totally the same to the MySQL "addresses" table
id INT,
customer_id INT,
street STRING,
city STRING,
state STRING,
zip STRING,
type STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'dbserver1.inventory.addresses',
'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
'properties.group.id' = 'testGroup',
'format' = 'debezium-json' -- using debezium-json as the format
)
""")

val table = tableEnv.from("topic_addresses").select($"*")
...

On Mon, Aug 31, 2020 at 2:39 AM Arvid Heise  wrote:

> Hi Rex,
>
> the connector expects a value without a schema, but the message contains a
> schema. You can tell Flink that the schema is included as written in the
> documentation [1].
>
> CREATE TABLE topic_products (
>   -- schema is totally the same to the MySQL "products" table
>   id BIGINT,
>   name STRING,
>   description STRING,
>   weight DECIMAL(10, 2)) WITH (
>  'connector' = 'kafka',
>  'topic' = 'products_binlog',
>  'properties.bootstrap.servers' = 'localhost:9092',
>  'properties.group.id' = 'testGroup',
>  'format' = 'debezium-json',
>  'debezium-json.schema-include' = true)
>
> @Jark Wu  , it would be probably good to make the
> connector more robust and catch these types of misconfigurations.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#how-to-use-debezium-format
>
> On Fri, Aug 28, 2020 at 11:56 PM Rex Fenley  wrote:
>
>> Awesome, so that took me a step further. When running i'm receiving an
>> error however. FYI, my docker-compose file is based on the Debezium mysql
>> tutorial which can be found here
>> https://debezium.io/documentation/reference/1.2/tutorial.html
>>
>> Part of the stack trace:
>>
>> flink-jobmanager_1 | Caused by: java.io.IOException: Corrupt Debezium
>> JSON message
>> '{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.addresses.Envelope"},"payload":{"before":null,"after":{"id":18,"customer_id":1004,"street":"111
>> cool street","city":"Big
>> City","state":"California","zip":"9","type":"BILLING"},"source":{"version":"1.2.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1598651432000,"snapshot":"false","db":"inventory","table":"addresses","server_id":223344,"gtid":null,"file":"mysql-bin.10","pos":369,"row":0,"thread":5,"query":null},"op":"c","ts_ms":1598651432407,"transaction":null}}'.
>> flink-jobmanager_1 | a

Editing Rowtime for SQL Table

2020-08-31 Thread Satyam Shekhar
Hello,

I use Flink for continuous evaluation of SQL queries on streaming data. One
of the use cases requires us to run recursive SQL queries. I am unable to
find a way to edit rowtime time attribute of the intermediate result table.

For example, let's assume that there is a table T0 with schema -
root
 |-- str1: STRING
 |-- int1: BIGINT
 |-- utime: TIMESTAMP(3)
 |-- itime: TIMESTAMP(3) *ROWTIME*

Now, let's create a view V0 -
var V0 = tEnv_.sqlQuery("select str1, int1, utime, itime from T0");

I wish to change the rowtime of V0 from itime to utime. I tried doing -

V0 = V0.addOrReplaceColumns($("utime").as("utime").rowtime());

but ran into the following exception -

org.apache.flink.table.api.ValidationException: Window properties can only
be used on windowed tables.
at
org.apache.flink.table.operations.utils.OperationTreeBuilder$NoWindowPropertyChecker.visit(OperationTreeBuilder.java:854)
~[flink-table-api-java-1.11.1.jar:1.11.1]
at
org.apache.flink.table.operations.utils.OperationTreeBuilder$NoWindowPropertyChecker.visit(OperationTreeBuilder.java:843)
~[flink-table-api-java-1.11.1.jar:1.11.1]
at
org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
~[flink-table-api-java-1.11.1.jar:1.11.1]
at
org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
~[flink-table-api-java-1.11.1.jar:1.11.1]
at
org.apache.flink.table.operations.utils.OperationTreeBuilder.lambda$project$1(OperationTreeBuilder.java:158)
~[flink-table-api-java-1.11.1.jar:1.11.1]
at java.base/java.util.ArrayList.forEach(ArrayList.java:1540) ~[na:na]
at
org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:158)
~[flink-table-api-java-1.11.1.jar:1.11.1]
at
org.apache.flink.table.operations.utils.OperationTreeBuilder.addColumns(OperationTreeBuilder.java:207)
~[flink-table-api-java-1.11.1.jar:1.11.1]
at
org.apache.flink.table.api.internal.TableImpl.addColumnsOperation(TableImpl.java:475)
~[flink-table-api-java-1.11.1.jar:1.11.1]
at
org.apache.flink.table.api.internal.TableImpl.addOrReplaceColumns(TableImpl.java:459)
~[flink-table-api-java-1.11.1.jar:1.11.1]

Any guidance on how to address this?

Regards,
Satyam


Re: FileSystemHaServices and BlobStore

2020-08-31 Thread Khachatryan Roman
+ dev

Blob store is used for jars, serialized job, and task information and logs.
You can find some information at
https://cwiki.apache.org/confluence/display/FLINK/FLIP-19%3A+Improved+BLOB+storage+architecture


I guess in your setup, Flink was able to pick up local files.
HA setup presumes that Flink can survive the loss of that JM host and its
local files.

I'm not sure about K8s native setup - probably VoidBlobStore is enough if
there is a persistent volume.
But in the general case, FileSystemBlobStore should be used to store files
on some DFS.


Regards,
Roman


On Sat, Aug 29, 2020 at 6:42 PM Alexey Trenikhun  wrote:

> Did test with streaming job and FileSystemHaService using VoidBlobStore
> (no HA Blob), looks like job was able to recover from both JM restart and
> TM restart. Any idea in what use cases HA Blob is needed?
>
> Thanks,
> Alexey
> --
> *From:* Alexey Trenikhun 
> *Sent:* Friday, August 28, 2020 11:31 AM
> *To:* Khachatryan Roman 
> *Cc:* Flink User Mail List 
> *Subject:* Re: FileSystemHaServices and BlobStore
>
> Motivation is to have k8s HA setup without extra component - Zookeeper,
> see [1]
>
> Purpose of BlobStore is vague to me, what kind of BLOBs are stored? Looks
> like  if we start job from savepoint, then persistence of BlobStore is
> not necessary, but is it needed if we recover from checkpoint?
>
> Thanks,
> Alexey
>
> [1]. https://issues.apache.org/jira/browse/FLINK-17598
>
>
> --
> *From:* Khachatryan Roman 
> *Sent:* Friday, August 28, 2020 9:24 AM
> *To:* Alexey Trenikhun 
> *Cc:* Flink User Mail List 
> *Subject:* Re: FileSystemHaServices and BlobStore
>
> Hello Alexey,
>
> I think you need FileSystemBlobStore as you are implementing HA Services,
> and BLOBs should be highly available too.
> However, I'm a bit concerned about the direction in general: it
> essentially means re-implementing ZK functionality on top of FS.
> What are the motivation and the use case?
>
> Regards,
> Roman
>
>
> On Fri, Aug 28, 2020 at 5:15 PM Alexey Trenikhun  wrote:
>
> Hello,
> I'm thinking about implementing FileSystemHaServices - single leader, but
> persistent RunningJobRegistry, CheckpointIDCounter,
> CompletedCheckpointStore and JobGraphStore. I'm not sure do you need
> FileSystemBlobStore or VoidBlobStore is enough. Can't figure out, should
> BlobStore survive JobManager crash. I see that ZookeeperHaServices use 
> FileSystemBlobStore,
> but not clear is to due to having multiple JobManagers (leader + follower)
> or necessity to preserve BLOBs on restart.
>
> Thanks,
> Alexey
>
>


[ANNOUNCE] Weekly Community Update 2020/35

2020-08-31 Thread Konstantin Knauf
Dear community,

happy to share a brief community update for the past week with configurable
memory sharing between Flink and its Python "side car", stateful Python
UDFs, an introduction of our GSoD participants and a little bit more.

Flink Development
==

* [datastream api] Dawid has started a vote to remove DataStream#fold and
DataStream#split (both already deprecated) in Flink 1.12. [1]

* [runtime] Xintong has started a discussion thread for "Intra-Slot
Management Memory Sharing", which lets users configure the fraction of
managed memory that should be used by Flink internally (RocksDB or Batch
algorithms) on the one side and the Python process on the other side. [2]

* [python] Wei Zhon has started discussion for FLIP-139, which aims to add
support for *stateful *Python UDFs for the Table API/SQL. So far, only
stateless functions are supported. [3]

* [documentation] Kartik Khare and Mohammad Haseeb Asif will work with the
Apache Flink Community to improve the documentation of Flink SQL as part of
their participation in Google Season of Docs 2020. [4]

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Remove-deprecated-DataStream-fold-and-DataStream-split-in-1-12-tp44229.html
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-141-Intra-Slot-Managed-Memory-Sharing-tp44146.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-139-General-Python-User-Defined-Aggregate-Function-on-Table-API-tp44139.html
[4]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Introducing-the-GSoD-2020-Participants-tp44144.html

flink-packages.org
==

* fabricalab has published a DynamoDB streaming source on flink-packages.org.
[5]

[5] https://flink-packages.org/packages/streaming-flink-dynamodb-connector

Notable Bugs
==

* [FLINK-18934] [1.11.1] Flink's mechanism to deal with idle
sources/partitions introduced in Flink 1.11 [6] does not currently work
with co-functions, union or joins. [7]

[6]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#dealing-with-idle-sources
[7] https://issues.apache.org/jira/browse/FLINK-18934

Events, Blog Posts, Misc
===

* Dian Fu is now part of the Apache Flink PMC. Congratulations! [8]

[8]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-New-PMC-member-Dian-Fu-tp44170p44240.html

Cheers,

Konstantin

-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk


Re: Packaging multiple Flink jobs from a single IntelliJ project

2020-08-31 Thread Manas Kale
Guess I figured out a solution for the first question as well - I am
packaging multiple main() classes in the same JAR and specifying entrypoint
classes when submitting the JAR. Most of my issues stemmed from an
improperly configured POM file and a mismatch in Flink runtime versions.
I'll assume this is the recommended way to go about doing this, thanks for
reading and have a great day!

On Mon, Aug 31, 2020 at 12:03 PM Manas Kale  wrote:

> Hi,
> I solved my second issue - I was not following Maven's convention for
> placing source code (I had not placed my source in src/main/java).
> However, I still would like some help with my first question - what is the
> recommended way to set a project with multiple main() classes? At the end,
> I would like to be able to run each main() class as a separate job. Should
> I create a single JAR and specify different entrypoint classes each time or
> should I create separate JARs for each main() class?
>
> On Mon, Aug 31, 2020 at 11:13 AM Manas Kale  wrote:
>
>> Hi,
>> I have an IntelliJ project that has multiple classes with main()
>> functions. I want to package this project as a JAR that I can submit to the
>> Flink cluster and specify the entry class when I start the job. Here are my
>> questions:
>>
>>- I am not really familiar with Maven and would appreciate some
>>pointers/examples. From what I understand, I will need to use some sort of
>>transformer in the Maven shade plugin to merge all of the classes. *If
>>this is correct, can I see a small example? *
>>- Also, I can't get a single main class working:
>>
>>
>> http://maven.apache.org/POM/4.0.0"; 
>> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
>>xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
>> http://maven.apache.org/xsd/maven-4.0.0.xsd";>
>>4.0.0
>>
>>flink_summarization
>>flink_summarization
>>0.1
>>jar
>>
>>Flink Quickstart Job
>>http://www.myorganization.org
>>
>>
>>   UTF-8
>>   1.10.1
>>   1.8
>>   2.11
>>   ${java.version}
>>   ${java.version}
>>
>>
>>
>>   
>>  apache.snapshots
>>  Apache Development Snapshot Repository
>>  
>> https://repository.apache.org/content/repositories/snapshots/
>>  
>> false
>>  
>>  
>> true
>>  
>>   
>>
>>
>>
>>   
>>   
>>   
>>   
>>  org.apache.flink
>>  flink-java
>>  ${flink.version}
>>  provided
>>   
>>   
>>  org.apache.flink
>>  
>> flink-streaming-java_${scala.binary.version}
>>  ${flink.version}
>>  provided
>>   
>>
>>   
>>  org.apache.flink
>>  flink-connector-kafka_2.11
>>  ${flink.version}
>>   
>>
>>   
>>  org.apache.flink
>>  flink-state-processor-api_2.11
>>  ${flink.version}
>>  provided
>>   
>>
>>   
>>  org.apache.flink
>>  flink-connector-jdbc_2.11
>>  1.11.0
>>   
>>
>>   
>>   
>>   
>>  org.slf4j
>>  slf4j-log4j12
>>  1.7.7
>>  runtime
>>   
>>   
>>  log4j
>>  log4j
>>  1.2.17
>>  runtime
>>   
>>
>>   
>>   
>>  org.apache.flink
>>  flink-test-utils_${scala.binary.version}
>>  ${flink.version}
>>  test
>>   
>>   
>>  org.apache.flink
>>  flink-runtime_2.11
>>  ${flink.version}
>>  test
>>  tests
>>   
>>   
>>  org.apache.flink
>>  flink-streaming-java_2.11
>>  ${flink.version}
>>  test
>>  tests
>>   
>>   
>>  org.assertj
>>  assertj-core
>>  
>>  3.16.1
>>  test
>>   
>>
>>
>>
>>
>>
>>   
>>
>>  
>>  
>> org.apache.maven.plugins
>> maven-compiler-plugin
>> 3.1
>> 
>>${java.version}
>>${java.version}
>> 
>>  
>>
>>  
>>  
>>  
>> org.apache.maven.plugins
>> maven-shade-plugin
>> 3.0.0
>> 
>> 
>>false
>> 
>> 
>>
>>
>>   package
>>   
>>  shade
>>   
>>   
>>  
>> 
>>org.apache.flink:force-shading
>>com.google.code.findbugs:jsr305
>>org.slf4j:*
>>log4j:*
>> 
>>  
>>  
>> 
>>
>>*:*
>>
>>   

Re: runtime memory management

2020-08-31 Thread Xintong Song
Well, that's a long story. In general, there are 2 steps.

   1. *Which operators are deployed in the same slot?* Operators are first
   *chained*[1] together, then a *slot sharing strategy*[2] is applied by
   default.
   2. *Which task managers are slots allocated from?*
  1. For active deployments (Kubernetes, Yarn, Mesos), task
  managers are launched on demand. That means ideally you should
not have too
  many empty slots.
  2. For the standalone deployment, by default slots are allocated
  randomly from all registered task managers. You can configure[3] the
  cluster to allocate slots evenly across task managers.


Thank you~

Xintong Song


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/flink-architecture.html#tasks-and-operator-chains
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/internals/job_scheduling.html
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#cluster-evenly-spread-out-slots

On Mon, Aug 31, 2020 at 4:31 PM lec ssmi  wrote:

> Thanks.
> When the program starts, how is each operator allocated in taskmanager?
> For example, if I have 2 taskmanagers and 10 operators, 9 operators  are
> allocated to tm-A and the remaining one is placed in tm-B, the utilization
> of resources will be very low.
>
> Xintong Song  于2020年8月31日周一 下午2:45写道:
>
>> Hi,
>>
>> For a complex streaming job, is there any way to tilt the memory towards
>>> stateful operators?
>>
>> If streaming jobs are interested, the quick answer is no. Memory is
>> fetched on demand for all operators.
>>
>> Currently, only managed memory for batch jobs are pre-planned for each
>> operator.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Mon, Aug 31, 2020 at 1:33 PM lec ssmi  wrote:
>>
>>> HI:
>>>   Generally speaking, when we submitting the flink program, the number
>>> of taskmanager and the memory of each tn will be specified. And the
>>> smallest real execution unit of flink should be operator.
>>>Since the calculation logic corresponding to each operator is
>>> different, some need to save the state, and some don't.  Therefore, the
>>> memory size required by each operator should be different. How does the
>>> flink program allocate taskmanager memory to the operator by default?
>>>   In our production practice, with the increase of traffic, some
>>> operators (mainly stateful such as join and groupby) often have
>>> insufficient memory, resulting in slower calculations. The usual approach
>>> is to increase the entire taskmanager memory. But will this part of the
>>> increased memory be allocated to the map-like operator, or that the memory
>>> itself is fetched on demand  in the same taskmanager  whoever needs the
>>> memory will fetch it until the memory is used up,  in other words, there is
>>> no preset memory allocation ratio. For a complex streaming job, is there
>>> any way to tilt the memory towards stateful operators?
>>>
>>>  Thanks.
>>>
>>>
>>>
>>>


Re: Security vulnerabilities of dependencies in Flink 1.11.1

2020-08-31 Thread Arvid Heise
Hi Shravan,

we periodically bump version numbers, especially for major releases and
basic dependencies such as netty.

However, running a simple scan over dependencies is not that useful without
also checking whether the reported issues are actually triggered by code.
For example, we are not using jackson to process YAML, so that this
vulnerability is not triggered at all. If you are not ingesting Json
through table API, then the outdated jackson-databind is actually not a
security issue as well.

Nevertheless, the respective teams will take a closer look at the report
though. If we see that the vulnerabilities are actively used, then we will
bump soonish.

How do these potential vulnerabilities affect your operations? I'd assume
that most users run isolated Flink clusters if not isolated applications.
Then, the netty vulnerability could never be exploited because netty ports
should not be exposed. On the other hand, if your Flink cluster is fully
exposed, then you may have bigger problems then the dependencies.

Best,

Arvid

On Mon, Aug 31, 2020 at 9:13 AM shravan 
wrote:

> issues.docx
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2439/issues.docx>
>
>
> Hello,
>
> We are using Apache Flink 1.11.1 version and our security scans report the
> following issues.
> Please let us know your comments on these security vulnerabilities and fix
> plans for them.
>
> PFA a word document with details in regard to CVE numbers, details, and
> it's
> severity.
>
> Issues in a nutshell,
> 1. Flink-shaded-netty, has netty 4.1.39 which is vulnerable
> 2. Flink-shaded-jackson, has snakeyaml 1.24 which is vulnerable
> 3. Flink-table, has vulnerable version of Jackson-databind in table APIs
>
> Looking forward on a response.
>
> Thanks,
> Shravan
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Idle stream does not advance watermark in connected stream

2020-08-31 Thread Dawid Wysakowicz
Hey Arvid,

The problem is that the StreamStatus.IDLE is set on the Task level. It
is not propagated to the operator. Combining of the Watermark for a
TwoInputStreamOperator happens in the AbstractStreamOperator:

    public void processWatermark(Watermark mark) throws Exception {
        if (timeServiceManager != null) {
            timeServiceManager.advanceWatermark(mark);
        }
        output.emitWatermark(mark);
    }

    public void processWatermark1(Watermark mark) throws Exception {
        input1Watermark = mark.getTimestamp();
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            processWatermark(new Watermark(combinedWatermark));
        }
    }

    public void processWatermark2(Watermark mark) throws Exception {
        input2Watermark = mark.getTimestamp();
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            processWatermark(new Watermark(combinedWatermark));
        }
    }

There we do not know that e.g. the whole input 1 is idle. Therefore if
we do not receive any Watermarks from it (it became IDLE) we do not
progress the Watermark starting from any two input operator. We are
missing similar handling of the IDLE status from the task level which
works well for one input operators and multiple parallel upstream instances.

Best,

Dawid

On 31/08/2020 11:05, Arvid Heise wrote:
> Hi Aljoscha,
>
> I don't quite follow your analysis. If both sources are configured
> with idleness, they should send a periodic watermark on timeout.
> So the code that you posted would receive watermarks on the idle
> source and thus advance watermarks periodically.
>
> If an idle source does not emit a watermark at all, then I'm curious
> why it's not mapped to StreamStatus.IDLE [1], which would trigger the
> desired behavior.
>
> [1]
> https://github.com/apache/flink/blob/72cd5921684e6daac4a7dd791669898b56d5/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java#L79
>
> On Wed, Aug 26, 2020 at 5:46 PM Aljoscha Krettek  > wrote:
>
> Yes, I'm afraid this analysis is correct. The StreamOperator,
> AbstractStreamOperator to be specific, computes the combined
> watermarks
> from both inputs here:
> 
> https://github.com/apache/flink/blob/f0ed29c06d331892a06ee9bddea4173d6300835d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L573.
>
> The operator layer is not aware of idleness so it will never
> notice. The
> idleness only works on the level of inputs but is never forwarded
> to an
> operator itself.
>
> To fix this we would have to also make operators aware of idleness
> such
> that they can take this into account when computing the combined
> output
> watermark.
>
> Best,
> Aljoscha
>
> On 26.08.20 10:02, Dawid Wysakowicz wrote:
> > Hi Kien,
> >
> > I am afraid this is a valid bug. I am not 100% sure but the way I
> > understand the code the idleness mechanism applies to input
> channels,
> > which means e.g. when multiple parallell instances shuffle its
> results
> > to downstream operators.
> >
> > In case of a two input operator, combining the watermark of two
> > different upstream operators happens inside of the operator itself.
> > There we do not have the idleness status. We do not have a
> status that a
> > whole upstream operator became idle. That's definitely a
> bug/limitation.
> >
> > I'm also cc'ing Aljoscha who could maybe confirm my analysis.
> >
> > Best,
> >
> > Dawid
> >
> > On 24/08/2020 16:00, Truong Duc Kien wrote:
> >> Hi all,
> >> We are testing the new Idleness detection feature in Flink 1.11,
> >> however, it does not work as we expected:
> >> When we connect two data streams, of which one is idle, the output
> >> watermark CoProcessOperator does not increase, hence the program
> >> cannot progress.
> >>
> >> I've made a small project to illustrate the problem. The watermark
> >> received by the sink does not increase at all until the idle
> source is
> >> stopped.
> >>
> >> https://github.com/kien-truong/flink-idleness-testing
> >>
> >> Is this a bug or does the idleness detection not support this
> use case ?
> >>
> >> Regards.
> >> Kien
> >
>
>
>
> -- 
>
> Arvid Heise| Senior Java Developer
>
> 
>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward - The Apache
> FlinkConference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
>
> Ververica GmbHRegistered at Amtsg

Re: Issues with Flink Batch and Hadoop dependency

2020-08-31 Thread Arvid Heise
Hi Dan,

Your approach in general is good. You might want to use the bundled hadoop
uber jar [1] to save some time if you find the appropriate version. You can
also build your own version and include it then in lib/.

In general, I'd recommend moving away from sequence files. As soon as you
change your records minimally, everything falls apart. Going with
established binary formats like Avro or Parquet is usually desired also
because of the additional tooling and pays quickly off in the long run.

[1] https://flink.apache.org/downloads.html#additional-components

On Sat, Aug 29, 2020 at 10:50 PM Dan Hill  wrote:

> I was able to get a basic version to work by including a bunch of hadoop
> and s3 dependencies in the job jar and hacking in some hadoop config
> values.  It's probably not optimal but it looks like I'm unblocked.
>
> On Fri, Aug 28, 2020 at 12:11 PM Dan Hill  wrote:
>
>> I'm assuming I have a simple, common setup problem.  I've spent 6 hours
>> debugging and haven't been able to figure it out.  Any help would be
>> greatly appreciated.
>>
>>
>> *Problem*
>> I have a Flink Streaming job setup that writes SequenceFiles in S3.  When
>> I try to create a Flink Batch job to read these Sequence files, I get the
>> following error:
>>
>> NoClassDefFoundError: org/apache/hadoop/mapred/FileInputFormat
>>
>> It fails on this readSequenceFile.
>>
>> env.createInput(HadoopInputs.readSequenceFile(Text.class,
>> ByteWritable.class, INPUT_FILE))
>>
>> If I directly depend on org-apache-hadoop/hadoop-mapred when building the
>> job, I get the following error when trying to run the job:
>>
>> Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No
>> FileSystem for scheme "s3"
>> at
>> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3332)
>> at
>> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352)
>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
>> at
>> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403)
>> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371)
>> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:477)
>> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
>> at
>> org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:209)
>> at
>> org.apache.hadoop.mapred.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:48)
>> at
>> org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:254)
>> at
>> org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:150)
>> at
>> org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:58)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:257)
>>
>>
>> *Extra context*
>> I'm using this Helm chart 
>> for creating Flink.  I'm using v1.10.1.
>>
>>
>> *Questions*
>> Are there any existing projects that read batch Hadoop file formats from
>> S3?
>>
>> I've looked at these instructions for Hadoop Integration
>> .
>> I'm assuming my configuration is wrong.  I'm also assuming I need the
>> hadoop dependency properly setup in the jobmanager and taskmanager (not in
>> the job itself).  If I use this Helm chart, do I need to download a hadoop
>> common jar into the Flink images for jobmanager and taskmanager?  Are there
>> pre-built images which I can use that already have the dependencies setup?
>>
>>
>> - Dan
>>
>

-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Debezium Flink EMR

2020-08-31 Thread Arvid Heise
Hi Rex,

the connector expects a value without a schema, but the message contains a
schema. You can tell Flink that the schema is included as written in the
documentation [1].

CREATE TABLE topic_products (
  -- schema is totally the same to the MySQL "products" table
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)) WITH (
 'connector' = 'kafka',
 'topic' = 'products_binlog',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'debezium-json',
 'debezium-json.schema-include' = true)

@Jark Wu  , it would be probably good to make the
connector more robust and catch these types of misconfigurations.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#how-to-use-debezium-format

On Fri, Aug 28, 2020 at 11:56 PM Rex Fenley  wrote:

> Awesome, so that took me a step further. When running i'm receiving an
> error however. FYI, my docker-compose file is based on the Debezium mysql
> tutorial which can be found here
> https://debezium.io/documentation/reference/1.2/tutorial.html
>
> Part of the stack trace:
>
> flink-jobmanager_1 | Caused by: java.io.IOException: Corrupt Debezium
> JSON message
> '{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.addresses.Envelope"},"payload":{"before":null,"after":{"id":18,"customer_id":1004,"street":"111
> cool street","city":"Big
> City","state":"California","zip":"9","type":"BILLING"},"source":{"version":"1.2.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1598651432000,"snapshot":"false","db":"inventory","table":"addresses","server_id":223344,"gtid":null,"file":"mysql-bin.10","pos":369,"row":0,"thread":5,"query":null},"op":"c","ts_ms":1598651432407,"transaction":null}}'.
> flink-jobmanager_1 | at
> org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136)
> ~[flink-json-1.11.1.jar:1.11.1]
> flink-jobmanager_1 | at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
> ~[?:?]
> flink-jobmanager_1 | at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
> ~[?:?]
> flink-jobmanager_1 | at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> ~[?:?]
> flink-jobmanager_1 | at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)

Re: Flink Migration

2020-08-31 Thread Arvid Heise
Hi Navneeth,

if everything worked before and you just experience later issues, it would
be interesting to know if your state size grew over time. An application
over time usually needs gradually more resources. If the user base of your
company grows, so does the amount of messages (be it click stream, page
impressions, or any kind of transactions). Often time, also the operator
state grows. Sometimes, it's just that the events themselves become more
complex and thus you need more overall bandwidth. This means that from time
to time, you need to increase the memory of Flink (for state) or the number
of compute nodes (to handle more events). In the same way, you need to make
sure that your sink scales as well.

If you fail to keep up with the demand, the application gradually becomes
more unstable (for example by running out of memory repeatedly). I'm
suspecting that this may happen in your case.

First, it's important to understand what the bottleneck is. Web UI should
help to narrow it down quickly. You can also share your insights and we can
discuss further strategies.

If nothing works out, I also recommend an upgrade. Your best migration path
would be to use Flink 1.7, which should allow a smoother transition for
state [1]. I'd guess that afterwards, you should be able to migrate to 1.11
with almost no code changes.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/custom_serialization.html#migrating-from-deprecated-serializer-snapshot-apis-before-flink-17

On Fri, Aug 28, 2020 at 1:43 PM Yun Tang  wrote:

> Hi Navneeth
>
> First of all, I suggest to upgrade Flink version to latest version.
> And you could refer here [1] for the savepoint compatibility when
> upgrading Flink.
>
> For the problem that cannot connect address, you could login your pod and
> run 'nslookup jobmanager' to see whether the host could be resolved.
> You can also check the service of 'jobmanager' whether work as expected
> via 'kubectl get svc' .
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#compatibility-table
>
> Best
> Yun Tang
>
> --
> *From:* Navneeth Krishnan 
> *Sent:* Friday, August 28, 2020 17:00
> *To:* user 
> *Subject:* Flink Migration
>
> Hi All,
>
> We are currently on a very old version of flink 1.4.0 and it has worked
> pretty well. But lately we have been facing checkpoint timeout issues. We
> would like to minimize any changes to the current pipelines and go ahead
> with the migration. With that said our first pick was to migrate to 1.5.6
> and later migrate to a newer version.
>
> Do you guys think a more recent version like 1.6 or 1.7 might work? We did
> try 1.8 but it requires some changes in the pipelines.
>
> When we tried 1.5.6 with docker compose we were unable to get the task
> manager attached to jobmanager. Are there some specific configurations
> required for newer versions?
>
> Logs:
>
> 8-28 07:36:30.834 [main] INFO
> org.apache.flink.runtime.util.LeaderRetrievalUtils  - TaskManager will
> try to connect for 1 milliseconds before falling back to heuristics
>
> 2020-08-28 07:36:30.853 [main] INFO
> org.apache.flink.runtime.net.ConnectionUtils  - Retrieved new target
> address jobmanager/172.21.0.8:6123.
>
> 2020-08-28 07:36:31.279 [main] INFO
> org.apache.flink.runtime.net.ConnectionUtils  - Trying to connect to
> address jobmanager/172.21.0.8:6123
>
> 2020-08-28 07:36:31.280 [main] INFO
> org.apache.flink.runtime.net.ConnectionUtils  - Failed to connect from
> address 'e6f9104cdc61/172.21.0.9': Connection refused (Connection refused)
>
> 2020-08-28 07:36:31.281 [main] INFO
> org.apache.flink.runtime.net.ConnectionUtils  - Failed to connect from
> address '/172.21.0.9': Connection refused (Connection refused)
>
> 2020-08-28 07:36:31.281 [main] INFO
> org.apache.flink.runtime.net.ConnectionUtils  - Failed to connect from
> address '/172.21.0.9': Connection refused (Connection refused)
>
> 2020-08-28 07:36:31.282 [main] INFO
> org.apache.flink.runtime.net.ConnectionUtils  - Failed to connect from
> address '/127.0.0.1': Invalid argument (connect failed)
>
> 2020-08-28 07:36:31.283 [main] INFO
> org.apache.flink.runtime.net.ConnectionUtils  - Failed to connect from
> address '/172.21.0.9': Connection refused (Connection refused)
>
> 2020-08-28 07:36:31.284 [main] INFO
> org.apache.flink.runtime.net.ConnectionUtils  - Failed to connect from
> address '/127.0.0.1': Invalid argument (connect failed)
>
> 2020-08-28 07:36:31.684 [main] INFO
> org.apache.flink.runtime.net.ConnectionUtils  - Trying to connect to
> address jobmanager/172.21.0.8:6123
>
> 2020-08-28 07:36:31.686 [main] INFO
> org.apache.flink.runtime.net.ConnectionUtils  - Failed to connect from
> address 'e6f9104cdc61/172.21.0.9': Connection refused (Connection refused)
>
> 2020-08-28 07:36:31.687 [main] INFO
> org.apache.flink.runtime.net.ConnectionUtils  - Failed to connect from
> address '/172.21.0.9': Connection refused (Connection refuse

Re: Flink SQL Streaming Join Creates Duplicates

2020-08-31 Thread Arvid Heise
Hi Austin,

Do I assume correctly, that you self-answered your question? If not, could
you please update your current progress?

Best,

Arvid

On Thu, Aug 27, 2020 at 11:41 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Ah, I think the "Result Updating" is what got me -- INNER joins do the
> job!
>
> On Thu, Aug 27, 2020 at 3:38 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> oops, the example query should actually be:
>>
>> SELECT table_1.a, table_1.b, table_2.c
>> FROM table_1
>> LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;
>>
>> and duplicate results should actually be:
>>
>> Record(a = "data a 1", b = "data b 1", c = "data c 1")
>> Record(a = "data a 1", b = "data b 1", c = null)
>> Record(a = "data a 2", b = "data b 2", c = "data c 2")
>> Record(a = "data a 2", b = "data b 2", c = null)
>>
>> On Thu, Aug 27, 2020 at 3:34 PM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Hey all,
>>>
>>> I've got a Flink 1.10 Streaming SQL job using the Blink Planner that is
>>> reading from a few CSV files and joins some records across them into a
>>> couple of data streams (yes, this could be a batch job won't get into why
>>> we chose streams unless it's relevant). These joins are producing some
>>> duplicate records, one with the joined field present and one with the
>>> joined field as `null`, though this happens only ~25% of the time. Reading
>>> the docs on joins[1], I thought this could be caused by too strict Idle
>>> State Retention[2], so I increased that to min, max (15min, 24h) but that
>>> doesn't seem to have an effect, and the problem still occurs when testing
>>> on a subset of data that finishes processing in under a minute.
>>>
>>> The query roughly looks like:
>>>
>>> table_1 has fields a, b
>>> table_2 has fields b, c
>>>
>>> SELECT table_1.a, table_1.b, table_1.c
>>> FROM table_1
>>> LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;
>>>
>>> Correct result:
>>> Record(a = "data a 1", b = "data b 1", c = "data c 1")
>>> Record(a = "data a 2", b = "data b 2", c = "data c 2")
>>>
>>> Results seem to be anywhere between all possible dups and the correct
>>> result.
>>>
>>> Record(a = "data a 1", b = "data b 1", c = "data c 1")
>>> Record(a = "data a 1", b = null, c = "data c 1")
>>> Record(a = "data a 2", b = "data b 2", c = "data c 2")
>>> Record(a = "data a 2", b = null, c = "data c 2")
>>>
>>> The CSV files are registered as Flink Tables with the following:
>>>
>>> tableEnv.connect(
>>> new FileSystem()
>>> .path(path)
>>> )
>>> .withFormat(
>>> new Csv()
>>> .quoteCharacter('"')
>>> .ignoreParseErrors()
>>> )
>>> .withSchema(schema)
>>> .inAppendMode()
>>> .createTemporaryTable(tableName);
>>>
>>>
>>> I'm creating my table environment like so:
>>>
>>> EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
>>> .useBlinkPlanner()
>>> .build();
>>>
>>> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
>>> tableEnvSettings);
>>>
>>> TableConfig tConfig = tEnv.getConfig();
>>> tConfig.setIdleStateRetentionTime(Time.minutes(15), Time.hours(24));
>>>
>>>
>>> Is there something I'm misconfiguring or have misunderstood the docs?
>>>
>>> Thanks,
>>> Austin
>>>
>>> [1]:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#joins
>>> [2]:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time
>>>
>>

-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Idle stream does not advance watermark in connected stream

2020-08-31 Thread Arvid Heise
Hi Aljoscha,

I don't quite follow your analysis. If both sources are configured with
idleness, they should send a periodic watermark on timeout.
So the code that you posted would receive watermarks on the idle source and
thus advance watermarks periodically.

If an idle source does not emit a watermark at all, then I'm curious why
it's not mapped to StreamStatus.IDLE [1], which would trigger the desired
behavior.

[1]
https://github.com/apache/flink/blob/72cd5921684e6daac4a7dd791669898b56d5/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java#L79

On Wed, Aug 26, 2020 at 5:46 PM Aljoscha Krettek 
wrote:

> Yes, I'm afraid this analysis is correct. The StreamOperator,
> AbstractStreamOperator to be specific, computes the combined watermarks
> from both inputs here:
>
> https://github.com/apache/flink/blob/f0ed29c06d331892a06ee9bddea4173d6300835d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L573.
>
> The operator layer is not aware of idleness so it will never notice. The
> idleness only works on the level of inputs but is never forwarded to an
> operator itself.
>
> To fix this we would have to also make operators aware of idleness such
> that they can take this into account when computing the combined output
> watermark.
>
> Best,
> Aljoscha
>
> On 26.08.20 10:02, Dawid Wysakowicz wrote:
> > Hi Kien,
> >
> > I am afraid this is a valid bug. I am not 100% sure but the way I
> > understand the code the idleness mechanism applies to input channels,
> > which means e.g. when multiple parallell instances shuffle its results
> > to downstream operators.
> >
> > In case of a two input operator, combining the watermark of two
> > different upstream operators happens inside of the operator itself.
> > There we do not have the idleness status. We do not have a status that a
> > whole upstream operator became idle. That's definitely a bug/limitation.
> >
> > I'm also cc'ing Aljoscha who could maybe confirm my analysis.
> >
> > Best,
> >
> > Dawid
> >
> > On 24/08/2020 16:00, Truong Duc Kien wrote:
> >> Hi all,
> >> We are testing the new Idleness detection feature in Flink 1.11,
> >> however, it does not work as we expected:
> >> When we connect two data streams, of which one is idle, the output
> >> watermark CoProcessOperator does not increase, hence the program
> >> cannot progress.
> >>
> >> I've made a small project to illustrate the problem. The watermark
> >> received by the sink does not increase at all until the idle source is
> >> stopped.
> >>
> >> https://github.com/kien-truong/flink-idleness-testing
> >>
> >> Is this a bug or does the idleness detection not support this use case ?
> >>
> >> Regards.
> >> Kien
> >
>
>

-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Implementation of setBufferTimeout(timeoutMillis)

2020-08-31 Thread Pankaj Chand
Thank you so much, Yun! It is exactly what I needed.

On Mon, Aug 31, 2020 at 1:50 AM Yun Gao  wrote:

> Hi Pankaj,
>
> I think it should be in
> org.apache.flink.runtime.io.network.api.writer.RecordWriter$OutputFlusher.
>
> Best,
>  Yun
>
>
>
> --
> Sender:Pankaj Chand
> Date:2020/08/31 02:40:15
> Recipient:user
> Theme:Implementation of setBufferTimeout(timeoutMillis)
>
> Hello,
>
> The documentation gives the following two sample lines for setting the
> buffer timeout for the streaming environment or transformation.
>
>
>
> *env.setBufferTimeout(timeoutMillis);env.generateSequence(1,10).map(new
> MyMapper()).setBufferTimeout(timeoutMillis);*
>
> I have been trying to find where (file and method) in the Flink source
> code are the buffers being flushed by iteratively referring to the value of
> timeoutMillis (or the default value), but have been unsuccessful. Please
> help.
>
> Thanks,
>
> Pankaj
>
>


Re: runtime memory management

2020-08-31 Thread lec ssmi
Thanks.
When the program starts, how is each operator allocated in taskmanager?
For example, if I have 2 taskmanagers and 10 operators, 9 operators  are
allocated to tm-A and the remaining one is placed in tm-B, the utilization
of resources will be very low.

Xintong Song  于2020年8月31日周一 下午2:45写道:

> Hi,
>
> For a complex streaming job, is there any way to tilt the memory towards
>> stateful operators?
>
> If streaming jobs are interested, the quick answer is no. Memory is
> fetched on demand for all operators.
>
> Currently, only managed memory for batch jobs are pre-planned for each
> operator.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Aug 31, 2020 at 1:33 PM lec ssmi  wrote:
>
>> HI:
>>   Generally speaking, when we submitting the flink program, the number of
>> taskmanager and the memory of each tn will be specified. And the smallest
>> real execution unit of flink should be operator.
>>Since the calculation logic corresponding to each operator is
>> different, some need to save the state, and some don't.  Therefore, the
>> memory size required by each operator should be different. How does the
>> flink program allocate taskmanager memory to the operator by default?
>>   In our production practice, with the increase of traffic, some
>> operators (mainly stateful such as join and groupby) often have
>> insufficient memory, resulting in slower calculations. The usual approach
>> is to increase the entire taskmanager memory. But will this part of the
>> increased memory be allocated to the map-like operator, or that the memory
>> itself is fetched on demand  in the same taskmanager  whoever needs the
>> memory will fetch it until the memory is used up,  in other words, there is
>> no preset memory allocation ratio. For a complex streaming job, is there
>> any way to tilt the memory towards stateful operators?
>>
>>  Thanks.
>>
>>
>>
>>


Security vulnerabilities of dependencies in Flink 1.11.1

2020-08-31 Thread shravan
issues.docx

  

Hello,

We are using Apache Flink 1.11.1 version and our security scans report the
following issues.  
Please let us know your comments on these security vulnerabilities and fix
plans for them.

PFA a word document with details in regard to CVE numbers, details, and it's
severity.

Issues in a nutshell,
1. Flink-shaded-netty, has netty 4.1.39 which is vulnerable
2. Flink-shaded-jackson, has snakeyaml 1.24 which is vulnerable
3. Flink-table, has vulnerable version of Jackson-databind in table APIs

Looking forward on a response.

Thanks,
Shravan



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/