Re: Unbalanced distribution of keyed stream to downstream parallel operators

2022-04-01 Thread Qingsheng Ren
Hi Isidoros,

Two possible causes come to mind: 

1. Records are not evenly distributed across different keys, e.g. some 
accountIds simply have more events than others. If the record distribution is 
predictable, you can try to combine other fields or include more information 
in the key field to help balance the distribution. 

2. Keys themselves are not distributed evenly. In short, the subtask a key 
belongs to is derived from murmurHash(key.hashCode()) % maxParallelism, so if 
the key distribution is unusual, most keys can end up in the same subtask under 
this algorithm. AFAIK there is no built-in metric for the number of keys per 
subtask, but you can easily investigate it with a map function right after 
keyBy, as sketched below. 
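
A minimal sketch of both checks (assuming a POJO with a long accountId field;
KeyGroupRangeAssignment is an internal Flink utility, so treat the first method
purely as a debugging aid, not a stable public API):

import java.util.HashSet;
import java.util.Set;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyDistributionCheck {

    // Hypothetical stand-in for your POJO keyed by accountId.
    public static class AccountEvent {
        public long accountId;
    }

    // Offline check: which parallel subtask a given key would be routed to.
    public static int subtaskForKey(Object key, int maxParallelism, int parallelism) {
        return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism);
    }

    // Runtime check: place this map right after keyBy and watch how many
    // distinct keys each parallel subtask accumulates.
    public static class DistinctKeyCounter extends RichMapFunction<AccountEvent, AccountEvent> {
        private final Set<Long> seenKeys = new HashSet<>();

        @Override
        public AccountEvent map(AccountEvent value) {
            if (seenKeys.add(value.accountId)) {
                System.out.printf("subtask %d now holds %d distinct keys%n",
                        getRuntimeContext().getIndexOfThisSubtask(), seenKeys.size());
            }
            return value;
        }
    }
}

If most keys report the same subtask index, the key space itself is skewed and
enriching the key as in point 1 is usually the way forward.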

Hope this helps!

Qingsheng

> On Apr 1, 2022, at 17:37, Isidoros Ioannou  wrote:
> 
> Hello,
> 
> We are running a Flink application (version 1.13.2) that consists of a Kafka source 
> with one partition so far. 
> We then filter the data based on some conditions, map it to POJOs and 
> transform it into a KeyedStream keyed by an accountId long property of the POJO. 
> The downstream operators are 10 CEP operators that run with a parallelism of 14, 
> and maxParallelism is set to (operatorParallelism * operatorParallelism). 
> As you can see in the attached image, the events are distributed unevenly, so some 
> subtasks are busy while others are idle. 
> Is there any way to distribute the load evenly across the subtasks? Thank you in 
> advance.
> 
>  



Re: [External] Re: Potential Bug with Date Serialization for Table Stream

2022-04-01 Thread Tom Thornton
Hi Martijn,

Thank you for following up on this. We ended up changing two parts:

When creating the DataType we instead used

new AtomicDataType(new DateType(false), java.sql.Date.class);

This let us override the conversion class used by the constructor.
We also changed the logic that converts an Avro schema to type information,
using the following in place of the default LocalTimeTypeInfo[java.time.LocalDate]:

org.apache.flink.table.api.Types.SQL_DATE()

Thank you for the help. We do want to upgrade versions, which would likely
help; for now we have this workaround on the current version.
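
In case it helps others, here is a condensed, untested sketch of those two
changes (class and field names below are just placeholders; package names are
as we used them on Flink 1.11):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.types.AtomicDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.DateType;

public class DateWorkaround {

    // DATE column whose conversion class is java.sql.Date instead of the
    // default java.time.LocalDate.
    static final DataType SQL_DATE_TYPE =
            new AtomicDataType(new DateType(false), java.sql.Date.class);

    // Legacy type information used when mapping the Avro "date" logical type,
    // in place of LocalTimeTypeInfo for java.time.LocalDate.
    static final TypeInformation<java.sql.Date> SQL_DATE_INFO = Types.SQL_DATE();
}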

On Fri, Apr 1, 2022 at 4:28 AM Martijn Visser 
wrote:

> Hi Tom,
>
> Sorry for the late reply, I missed this. In the upcoming Flink 1.15, a
> number of improvements on CAST will be included [1]. Would you be able to
> test this with the current RC0 of Flink 1.15? [2]
>
> Best regards,
>
> Martijn Visser
> https://twitter.com/MartijnVisser82
> https://github.com/MartijnVisser
>
> [1] https://issues.apache.org/jira/browse/FLINK-24403
> [2] https://lists.apache.org/thread/qpzz298lh5zq5osxmoo0ky6kg0b0r5zg
>
>
> On Tue, 22 Mar 2022 at 18:06, Tom Thornton  wrote:
>
>> Hi Martijn,
>>
>> Do you know what could be causing this issue given our Flink version? Is
>> this possibly a bug with that version?
>>
>> Thanks,
>> Tom
>>
>> On Thu, Mar 17, 2022 at 9:59 AM Tom Thornton  wrote:
>>
>>> Hi Martijn,
>>>
>>> We are using 1.11.6.
>>>
>>> Thank you for the help.
>>>
>>> On Thu, Mar 17, 2022 at 1:37 AM Martijn Visser 
>>> wrote:
>>>
 Hi Tom,

 Which version of Flink are you using?

 Best regards,

 Martijn Visser
 https://twitter.com/MartijnVisser82


 On Wed, 16 Mar 2022 at 23:59, Tom Thornton  wrote:

> Per the docs,
> I'm hoping to confirm whether or not an error we are seeing is a bug with
> Flink. We have a job that uses a Kafka source to read Avro records. The
> Kafka source is converted into a StreamTableSource. We are using the
> new Blink table planner to execute SQL on the table stream. The output is
> then put in a sink back to Kafka as Avro records. Whenever a query selects
> a column that has an Avro logicalType of date, we get this error (link to
> full stack trace).
>
> Caused by: java.lang.ClassCastException: class java.sql.Date cannot be 
> cast to class java.time.LocalDate (java.sql.Date is in module java.sql of 
> loader 'platform'; java.time.LocalDate is in module java.base of loader 
> 'bootstrap')
> at 
> org.apache.flink.api.common.typeutils.base.LocalDateSerializer.copy(LocalDateSerializer.java:30)
> at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:128)
> at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:61)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:755)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712)
> at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
> at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
> at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:158)
> at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:191)
> at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:162)
> at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:374)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:190)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:608)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:574)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:752)

Re: Flink SQL and data shuffling (keyBy)

2022-04-01 Thread Marios Trivyzas
Hi!

I don't think there is a way to achieve that without resorting to the
DataStream API.
I'm not sure whether using the PARTITIONED BY clause in the CREATE statement of
the table can help to "balance" the data, see
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#partitioned-by.
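
If you do go through the DataStream API, something along these lines should
work (a rough, untested sketch assuming Flink 1.13+ where toDataStream /
fromDataStream are available; "source_table", "jdbc_sink" and the BIGINT "id"
key column are placeholders for your actual tables):

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class KeyedInsertSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Assumes the DDLs for "source_table" and "jdbc_sink" are already registered.
        Table selected = tEnv.sqlQuery("SELECT id, payload FROM source_table");

        // Round-trip through the DataStream API so that rows with the same
        // primary key end up on the same parallel subtask before the INSERT.
        DataStream<Row> keyed = tEnv.toDataStream(selected)
                .keyBy(new KeySelector<Row, Long>() {
                    @Override
                    public Long getKey(Row row) {
                        return (Long) row.getField("id");
                    }
                });

        // Back to a Table and into the JDBC sink.
        tEnv.fromDataStream(keyed).executeInsert("jdbc_sink");
    }
}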


On Thu, Mar 31, 2022 at 7:18 AM Yaroslav Tkachenko 
wrote:

> Hey everyone,
>
> I'm trying to use Flink SQL to construct a set of transformations for my
> application. Let's say the topology just has three steps:
>
> - SQL Source
> - SQL SELECT statement
> - SQL Sink (via INSERT)
>
> The sink I'm using (JDBC) would really benefit from data partitioning (by
> PK ID) to avoid conflicting transactions and deadlocks. I can force Flink
> to partition the data by the PK ID before the INSERT by resorting to
> DataStream API and leveraging the keyBy method, then transforming
> DataStream back to the Table again...
>
> Is there a simpler way to do this? I understand that, for example, a GROUP
> BY statement will probably perform similar data shuffling, but what if I
> have a simple SELECT followed by INSERT?
>
> Thank you!
>


-- 
Marios


Re: DBT-flink profile?

2022-04-01 Thread Martijn Visser
Hi Georg,

I'm not aware of anyone currently working on a DBT/Flink integration. It
would be great if someone with knowledge of and time for both DBT and Flink
would be willing to investigate or experiment with this.

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser


On Fri, 25 Mar 2022 at 15:50, Georg Heiler 
wrote:

> Hi,
>
> "Use" is perhaps not the right word (yet); "experiment" fits better. But both would
> be relevant, and in particular also the streaming option.
>
> I also just found: https://materialize.com/docs/guides/dbt/ outlining how
> dbt and streaming could potentially be married. Perhaps their integration
> could serve as an example?
>
> Best,
> Georg
>
> On Fri, Mar 25, 2022 at 05:01, Yun Gao  wrote:
>
>> Hi Georg,
>>
>> May I double-check one thing about integrating with dbt:
>> do you currently want to use it for batch jobs or streaming jobs?
>>
>> Best,
>> Yun Gao
>>
>>
>>
>> --
>> Sender:Georg Heiler
>> Date:2022/03/25 01:27:26
>> Recipient:user
>> Theme:DBT-flink profile?
>>
>> Hi,
>>
>> is anyone working on a DBT Flink plugin/profile?
>>
>> https://docs.getdbt.com/reference/profiles.yml lists many other
>> databases, and I think this kind of support would be really beneficial for
>> the SQL part of Flink.
>>
>> Best,
>> Georg
>>
>>


Re: [External] Re: Potential Bug with Date Serialization for Table Stream

2022-04-01 Thread Martijn Visser
Hi Tom,

Sorry for the late reply, I missed this. In the upcoming Flink 1.15, a
number of improvements on CAST will be included [1]. Would you be able to
test this with the current RC0 of Flink 1.15? [2]

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser

[1] https://issues.apache.org/jira/browse/FLINK-24403
[2] https://lists.apache.org/thread/qpzz298lh5zq5osxmoo0ky6kg0b0r5zg


On Tue, 22 Mar 2022 at 18:06, Tom Thornton  wrote:

> Hi Martijn,
>
> Do you know what could be causing this issue given our Flink version? Is
> this possibly a bug with that version?
>
> Thanks,
> Tom
>
> On Thu, Mar 17, 2022 at 9:59 AM Tom Thornton  wrote:
>
>> Hi Martijn,
>>
>> We are using 1.11.6.
>>
>> Thank you for the help.
>>
>> On Thu, Mar 17, 2022 at 1:37 AM Martijn Visser 
>> wrote:
>>
>>> Hi Tom,
>>>
>>> Which version of Flink are you using?
>>>
>>> Best regards,
>>>
>>> Martijn Visser
>>> https://twitter.com/MartijnVisser82
>>>
>>>
>>> On Wed, 16 Mar 2022 at 23:59, Tom Thornton  wrote:
>>>
 Per the docs,
 I'm hoping to confirm whether or not an error we are seeing is a bug with
 Flink. We have a job that uses a Kafka source to read Avro records. The
 Kafka source is converted into a StreamTableSource. We are using the
 new Blink table planner to execute SQL on the table stream. The output is
 then put in a sink back to Kafka as Avro records. Whenever a query selects
 a column that has an Avro logicalType of date, we get this error (link to
 full stack trace).

 Caused by: java.lang.ClassCastException: class java.sql.Date cannot be 
 cast to class java.time.LocalDate (java.sql.Date is in module java.sql of 
 loader 'platform'; java.time.LocalDate is in module java.base of loader 
 'bootstrap')
 at 
 org.apache.flink.api.common.typeutils.base.LocalDateSerializer.copy(LocalDateSerializer.java:30)
 at 
 org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:128)
 at 
 org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:61)
 at 
 org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:755)
 at 
 org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
 at 
 org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712)
 at 
 org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
 at 
 org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
 at 
 org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
 at 
 org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:158)
 at 
 org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:191)
 at 
 org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:162)
 at 
 org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
 at 
 org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:374)
 at 
 org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:190)
 at 
 org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:608)
 at 
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:574)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:752)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569)
 at java.base/java.lang.Thread.run(Thread.java:829)


 The avro schema definition for a date field is as follows:

 {
   "name": "date",
   "type": {
     "type": "int",
     "logicalType": "date"
   },
   "doc": "date"
 },

 Any query that selects a date column would produce the error (and any 
 query without a column with type date will work). Example of a query that 
 causes the error:

 select `date` from table1

 As suggested in the docs, I also tried this with parent-first loading and 
 got the same error. When we run the same job without the Blink table 
 planner, i.e., useOldPlanner(), we do not get this error. Is this a bug 
 with Flink? Or is there something 

Re: Question about community collaboration options

2022-04-01 Thread Martijn Visser
Hi Ted,

This is a great question. There are usually bi-weekly sync meetings to
discuss plans and progress for the next Flink release; for example, there
was a regular one for the Flink 1.15 release [1].

I do see some things that we could improve on as a Flink community. For
example, there are quite a large number of open PRs. PRs don't always get a
review or get merged. I'm hearing from other contributors that they need to
ping people in order to get attention for their PR. Some PRs are of poor
quality, because they don't adhere to the code contribution guide. I can
imagine that newly interested contributors don't know exactly where to
start.

I see other open source projects that indeed run a public Slack channel,
like Apache Airflow [2]. Kubernetes actually has a triage team and
documented processes [3].

I'm curious what other Users and Dev'ers think. What are some of the
problems that you're currently experiencing and how do you think we could
improve/solve those?

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser

[1] https://cwiki.apache.org/confluence/display/FLINK/1.15+Release
[2] https://airflow.apache.org/community/ (Under "Ask a question")
[3]
https://github.com/kubernetes/community/blob/master/sig-contributor-experience/triage-team/triage.md

On Thu, 31 Mar 2022 at 23:44, Hao t Chang  wrote:

> Hi,
>
>
>
> I have been looking into Flink and joined the mailing lists recently. I am
> trying to figure out how the community members collaborate. For example, are
> there Slack channels or weekly sync-up calls where community members
> can participate and talk with each other to brainstorm, design, and make
> decisions?
>
>
>
> Ted
>


Unbalanced distribution of keyed stream to downstream parallel operators

2022-04-01 Thread Isidoros Ioannou
Hello,

We are running a Flink application (version 1.13.2) that consists of a Kafka source
with one partition so far.
We then filter the data based on some conditions, map it to POJOs and
transform it into a KeyedStream keyed by an accountId long property of the
POJO. The downstream operators are 10 CEP operators that run with a
parallelism of 14, and maxParallelism is set to (operatorParallelism *
operatorParallelism).
As you can see in the attached image, the events are distributed unevenly, so
some subtasks are busy while others are idle.
Is there any way to distribute the load evenly across the subtasks? Thank you
in advance.
[image: Capture.PNG]


Re: Could you please give me a hand about json object in flink sql

2022-04-01 Thread Qingsheng Ren
Hi, 

I’m afraid you have to use a UDTF to parse the content and construct the final 
json string manually. The key problem is that the field “content” is actually a 
STRING, although it looks like a json object. Currently the json format 
provided by Flink could not handle this kind of field defined as STRING. Also 
considering the schema of this “content” field is not fixed across records, 
Flink SQL can’t use one DDL to consume / produce records with changing schema. 

Cheers,

Qingsheng

> On Mar 31, 2022, at 21:42, wang <24248...@163.com> wrote:
> 
> Hi dear engineer,
> 
> Thanks so much for your precious time reading my email!
> Recently I have been working with Flink SQL (version 1.13) in my project and 
> encountered a problem with JSON-formatted data; I hope you can take a look, 
> thanks! Below is a description of my issue.
> 
> I use kafka as source and sink, I define kafka source table like this:
> 
>  CREATE TABLE TableSource (
>   schema STRING,
>   payload ROW(
>   `id` STRING,
>   `content` STRING
>  )
>  )
>  WITH (
>  'connector' = 'kafka',
>  'topic' = 'topic_source',
>  'properties.bootstrap.servers' = 'localhost:9092',
>  'properties.group.id' = 'all_gp',
>  'scan.startup.mode' = 'group-offsets',
>  'format' = 'json',
>  'json.fail-on-missing-field' = 'false',
>  'json.ignore-parse-errors' = 'true'
>  );
> 
> Define the kafka sink table like this:
> 
>  CREATE TABLE TableSink (
>   `id` STRING NOT NULL,
>   `content` STRING NOT NULL
>  )
>  WITH (
>  'connector' = 'kafka',
>  'topic' = 'topic_sink',
>  'properties.bootstrap.servers' = 'localhost:9092',
>  'format' = 'json',
>  'json.fail-on-missing-field' = 'false',
>  'json.ignore-parse-errors' = 'true'
> );
> 
> 
> Then insert into TableSink with data from TableSource:
> INSERT INTO TableSink SELECT id, content FROM TableSource;
> 
> Then I use "kafka-console-producer.sh" to produce data below into topic 
> "topic_source" (TableSource):
> {"schema": "schema_infos", "payload": {"id": "1", "content": 
> "{\"name\":\"Jone\",\"age\":20}"}}
> 
> 
> Then I listen on topic_sink (TableSink) by kafka-console-consumer.sh, the 
> output is:
> {"id":"1","content":"{\"name\":\"Jone\",\"age\":20}"}
> 
> But what I want here is {"id":"1","content": {"name":"Jone","age":20}}.
> I want the value of "content" to be a JSON object, not a JSON string.
> 
> What's more, the format of "content" in TableSource is not fixed; it can 
> be any JSON-formatted (or JSON array) string, such as:
> {"schema": "schema_infos", "payload": {"id": "1", "content": 
> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30}"}}
> 
> 
> So my question is: how can I transform a JSON-format string (like 
> "{\"name\":\"Jone\",\"age\":20}") from TableSource into a JSON object 
> (like {"name":"Jone","age":20})?
> 
> 
> Thanks && Regards,
> Hunk
> 
> 
> 
> 
>  



Re: [DataStream API] Watermarks not closing TumblingEventTimeWindow with Reduce function

2022-04-01 Thread r pp
Hi~ Can you send your full code?

Ryan van Huuksloot  wrote on Thu, Mar 31, 2022 at 22:58:

> Hello!
>
> *Problem:*
> I am connecting to a Kafka Source with the Watermark Strategy below.
>
> val watermarkStrategy = WatermarkStrategy
>   .forBoundedOutOfOrderness(Duration.of(2, ChronoUnit.HOURS))
>   .withTimestampAssigner(new 
> SerializableTimestampAssigner[StarscreamEventCounter_V1] {
> override def extractTimestamp(element: StarscreamEventCounter_V1, 
> recordTimestamp: Long): Long =
>   element.envelopeTimestamp
>   })
>
> The Watermarks are correctly getting assigned.
> However, when a reduce function is used the window never terminates
> because the `ctx.getCurrentWatermark()` returns the default value of
> `-9223372036854775808` in perpetuity.
>
> This is the stream code:
>
> stream
>   .windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).reduce(_ + _)
>
> The reduce uses this overloaded operator:
>
> @JsonIgnoreProperties(ignoreUnknown = true)
> case class StarscreamEventCounter_V1(
>   envelopeTimestamp: Long,
>   numberOfEvents: Int = 1
> ) {
>   def +(that: StarscreamEventCounter_V1): StarscreamEventCounter_V1 = {
> 
> StarscreamEventCounter_V1(this.envelopeTimestamp.min(that.envelopeTimestamp), 
> that.numberOfEvents + this.numberOfEvents)
>   }
> }
>
>
> *Attempt to Solve:*
> 1. Validate that the Watermark is set on the source
> a. Set a custom trigger to emit a watermark on each event just in case
> 2. Test with aggregate / process functions
> a. Both other functions work properly - window closes and emits to a
> PrintSink
> 3. Change Watermark Generator to a custom generator
> a. Also change time horizons and let run for 1 day - window never
> closes due to the watermark being stuck at the min default. The sink never
> receives the data but the UI says there are records being output!
>
> *Hypothesis:*
> The output id of a reduce function is causing an upstream issue where the
> watermark is no longer assigned to the Window. I haven't been able to lock
> down what exactly is causing the issue though. My thought is that it might
> be a bug given it works for Aggregate/Process.
> It could be a bug in IndexedCombinedWatermarkStatus: the partial
> watermarks should not be the min default value when I set the watermark per
> event. This is what I will be looking into until I hear back. I validated
> that the watermark is set correctly in CombinedWatermarkStatus.
>
> *Tools:*
> - Flink 1.14.3
> - Scala
> - DataStream API
>
> Any assistance would be great! Happy to provide more context or
> clarification if something isn't clear!
>
> Thanks!
>
> Ryan van Huuksloot
> Data Developer | Data Platform Engineering | Streaming Capabilities
>


-- 
Best,
  pp