Re: Question about processing a 3-level List data type in parquet

2020-11-04 Thread Naehee Kim
Hi Jingsong,

Thanks for the feedback. Could you let me know the concept and timeline of
BulkFormat/ParquetBulkFormat and how it differs from ParquetInputFormat?

Our use case is backfill: processing parquet files whenever a data issue is
found in the normal processing of Kafka input. Thus, we want a job that can
easily switch between Kafka input and parquet file input and vice versa. I
wonder if ParquetBulkFormat can fit our use case.

Best,
Naehee

On Tue, Nov 3, 2020 at 10:09 PM Jingsong Li  wrote:

> Hi Naehee, sorry for the late reply.
>
> I think you are right, there are bugs here. We didn't think about nested
> structures very well before.
>
> Now we mainly focus on the new BulkFormat implementation, which we need to
> consider when implementing the new ParquetBulkFormat.
>
> Best,
> Jingsong
>
> On Tue, Nov 3, 2020 at 1:43 AM Naehee Kim  wrote:
>
>> Hi Jingsong,
>>
>> I am forwarding the email below to you, thinking you will have a good
>> idea about my questions below. I'd appreciate it if you give your thoughts.
>>
>> Thanks,
>> Naehee
>>
>>
>> -- Forwarded message -
>> From: Naehee Kim 
>> Date: Thu, Oct 29, 2020 at 4:38 PM
>> Subject: Question about processing a 3-level List data type in parquet
>> To: 
>>
>>
>> Hi Flink Dev Community,
>>
>> I've found RowConverter.java in flink-parquet module doesn't support
>> reading a 3-level list type in parquet though it is able to process a
>> 2-level list type.
>>
>> 3-level
>>
>> optional group my_list (LIST) {
>>   repeated group element {
>>     required binary str (UTF8);
>>   };
>> }
>>
>>
>>   2-level
>>
>> optional group my_list (LIST) {
>>   repeated int32 element;
>> }
>>
>> Reference:
>> https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
>>
>> The parquet file I am testing with was written by a Spark job and it has a
>> 3-level list type. When I try to process the parquet file, it runs into a
>> 'java.lang.ClassCastException: Expected instance of group converter but got
>> "org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter"'
>> error.
>>
>> I've tested with Flink 1.9 and checked that RowConverter.java still remains
>> the same in v1.11. To process a 3-level list, I think RowConverter.java
>> should be updated with a new TypeInfo instead of BasicArrayTypeInfo (a
>> 2-level list can be processed with BasicArrayTypeInfo). I wonder if my
>> understanding is correct and if you have any plan to support a 3-level
>> List data type in parquet.
>>
>> For your reference, here is a code snippet along with the stack trace.
>>
>> MessageType readSchema = (new AvroSchemaConverter()).convert(REPORTING_SCHEMA);
>> RowTypeInfo rowTypeInfo = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(readSchema);
>> ParquetRowInputFormat parquetInputFormat = new ParquetRowInputFormat(
>>     new Path("file:///test-file.snappy.parquet"), readSchema);
>> DataStreamSource<Row> dataSource = env.createInput(parquetInputFormat, rowTypeInfo);
>>
>> -- stack trace
>>
>> Job execution failed.
>> org.apache.flink.runtime.client.JobExecutionException:
>>  at 
>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>  at 
>> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:626)
>>  at 
>> org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:78)
>>  at 
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>  at 
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1489)
>>  at 
>> com.pinterest.xenon.flink.backfill.TestParquetSource.testParquetInsertionOriginalInputFormat(TestParquetSource.java:322)
>>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>  at 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>  at 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>  at java.lang.reflect.Method.invoke(Method.java:498)
>>  at 
>> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>>  at 
>> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>>  at 
>> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>>  at 
>> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>  at 
>> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>>  at 
>> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>>  at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>>  at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>>  at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>>  at 
>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJ

Re: I have some interesting result with my test code

2020-11-04 Thread Jark Wu
Hi Kevin,

Could you share the code of how you register the FlinkKafkaConsumer as a
table?

Regarding your initialization of FlinkKafkaConsumer, I would recommend to
setStartFromEarliest() to guarantee it consumes all the records in
partitions.
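
For reference, that would be roughly the following (a sketch in the Java API;
the thread's consumer code is Scala, and "schema"/"properties" stand in for the
existing deserialization schema and Kafka properties):

FlinkKafkaConsumer<Order> consumer =
        new FlinkKafkaConsumer<>("order", schema, properties);
// read from the earliest offset instead of setStartFromGroupOffsets(), so the
// test consumes every record already sitting in the topic
consumer.setStartFromEarliest();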

Regarding the flush(), it seems it is inside the for loop, so it is not
flushing after publishing ALL events?
I'm not experienced with the flush() API; could this method block so that the
following random events can't be published to Kafka?

Best,
Jark

On Wed, 4 Nov 2020 at 04:04, Robert Metzger  wrote:

> Hi Kevin,
> thanks a lot for posting this problem.
> I'm adding Jark to the thread, he or another committer working on Flink
> SQL can maybe provide some insights.
>
> On Tue, Nov 3, 2020 at 4:58 PM Kevin Kwon  wrote:
>
>> Looks like the event time that I've specified in the consumer is not
>> being respected. Does the timestamp assigner actually work in Kafka
>> consumers?
>>
>>   .withTimestampAssigner(new SerializableTimestampAssigner[Order] {
>> override def extractTimestamp(order: Order, recordTimestamp: Long): 
>> Long = {
>>   order.getTimestamp
>> }
>>   })
>>
>>
>> On Tue, Nov 3, 2020 at 12:01 AM Kevin Kwon  wrote:
>>
>>> Hi guys, I've recently been experimenting with an end-to-end testing
>>> environment with Kafka and Flink (1.11)
>>>
>>> I've set up an infrastructure with Docker Compose composed of a single
>>> Kafka broker / Flink (1.11) / MinIO for checkpoint saves
>>>
>>> Here's the test scenario
>>>
>>> 1. Send 1000 messages with a manual timestamp assigned to each event,
>>> increased by 100 milliseconds per loop (the first and last message have
>>> a difference of 100 seconds). There are 3 partitions for the topic I'm
>>> writing to. Below code is the test message producer using Confluent's
>>> Python SDK
>>>
>>> order_producer = get_order_producer()
>>> current_timestamp = int(round(time() * 1000))
>>> for i in range(0, 1000):
>>> order_producer.produce(
>>> topic="order",
>>> key={"key": i % 100},
>>> value={
>>> "id": 1000,
>>> "customerId": i % 10,
>>> "timestamp": current_timestamp + i * 100
>>> }
>>> )
>>> order_producer.flush()
>>>
>>>
>>> 2. Flink performs an SQL query on this stream and publishes it back to
>>> Kafka topic that has 3 partitions. Below is the SQL code
>>>
>>> | SELECT
>>> |   o.id,
>>> |   COUNT(*),
>>> |   TUMBLE_END(o.ts, INTERVAL '5' SECONDS)
>>> | FROM
>>> |   order o
>>> | GROUP BY
>>> |   o.id,
>>> |   TUMBLE(o.ts, INTERVAL '5' SECONDS)
>>>
>>> So I expect the sum of all the counts of the result to be equal to 1000
>>> but it seems that a lot of messages are missing (797 as below). I can't
>>> seem to figure out why though. I'm using event time for the environment
>>>
>>> [image: Screenshot 2020-11-02 at 23.35.23.png]
>>>
>>> *Below is the configuration code*
>>> Here's the code for the consumer settings for Kafka
>>>
>>> private def initOrderConsumer(): FlinkKafkaConsumer[Order] = {
>>>   val properties = new Properties()
>>>   properties.setProperty("bootstrap.servers", kafkaBrokers)
>>>   properties.setProperty("group.id", "awesome_order")
>>>
>>>   val kafkaConsumer = new FlinkKafkaConsumer[Order](
>>> "order",
>>> ConfluentRegistryAvroDeserializationSchema.forSpecific[Order](
>>>   classOf[Order],
>>>   kafkaSchemaRegistry
>>> ),
>>> properties
>>>   )
>>>   kafkaConsumer.setCommitOffsetsOnCheckpoints(true)
>>>   kafkaConsumer.setStartFromGroupOffsets()
>>>   kafkaConsumer.assignTimestampsAndWatermarks {
>>> WatermarkStrategy
>>>   .forBoundedOutOfOrderness[Order](Duration.ofMillis(100))
>>>   .withTimestampAssigner(new SerializableTimestampAssigner[Order] {
>>> override def extractTimestamp(order: Order, recordTimestamp: Long): 
>>> Long = {
>>>   order.getTimestamp
>>> }
>>>   })
>>>   }
>>>   kafkaConsumer
>>> }
>>>
>>> Afterwards,
>>> 1. I create a tempview from this source data stream
>>> 2. perform SQL queries on it
>>> 3. append it back to a processed datastream
>>> 4. attach the stream to kafka sink
>>>
>>> Here's the code for the producer settings for Kafka
>>>
>>> private def initProcessedModelProducer(): 
>>> FlinkKafkaProducer[ProcessedModel] = {
>>>   val properties: Properties = new Properties()
>>>   properties.put("bootstrap.servers", kafkaBrokers)
>>>   properties.put("transaction.timeout.ms", "6")
>>>
>>>   val kafkaProducer = new FlinkKafkaProducer[ProcessedModel](
>>> "processed_model",
>>> ConfluentRegistryAvroSerializationSchema.forSpecific(
>>>   classOf[ProcessedModel],
>>>   "procssed_model-value",
>>>   kafkaSchemaRegistry
>>> ),
>>> properties,
>>> null,
>>> Semantic.EXACTLY_ONCE,
>>> 5
>>>   )
>>>   kafkaProducer
>>> }
>>>
>>>
>>>
>>> *Side Note*
>>> Another interesting part is that, if I flush "after" publishing all events, 
>>> the processed event doesn't even seem to arriv

Re: Dependency injection and flink.

2020-11-04 Thread Dan Diephouse
Just want to chime in here that it would be fantastic to have a way to do DI
in Flink. Ideally the injected services themselves don't get serialized at
all, since they're just singletons in our case. E.g. we have an API client
that looks up data from our API and caches it for all the functions that
need it.
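
For illustration, a minimal sketch (under assumed names, not an official Flink
hook) of the manual-wiring pattern Arvid describes below: keep the service out
of the serialized function and obtain the singleton in open() on the task
manager. ApiClient and ApiClientHolder are hypothetical.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class EnrichFunction extends RichMapFunction<String, String> {

    // transient: never shipped with the serialized function, so the singleton
    // itself does not need to be Serializable
    private transient ApiClient client;

    @Override
    public void open(Configuration parameters) {
        // look up or build the singleton on the task manager, e.g. from a
        // static holder or a DI container bootstrapped here
        client = ApiClientHolder.getOrCreate();
    }

    @Override
    public String map(String value) {
        return client.lookup(value);
    }
}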

On Tue, Nov 3, 2020 at 12:32 AM Arvid Heise  wrote:

> Hi Santhosh,
>
> Flink does not support automatic DI on task level and there is no
> immediate plan as of now to support it out-of-the-box. In general, there
> are quite a few implications of using automatic DI in a distributed
> setting. For example, how is a singleton supposed to work? Nevertheless,
> Flink's job startup got overhauled in the last and the upcoming release, so
> it might be easier to support DI frameworks in the near future.
>
> What I usually recommend is to use automatic DI while creating the
> DataStream application and then switch to manual DI on task manager level
> (most folks confuse DI with automatic DI, but DI is a general pattern that
> is independent of any framework).
>
> Here is an example. Suppose you want to use ServiceA in some asyncIO call.
>
> DataStream<Integer> inputStream = env.addSource(...);
> AsyncFunction<Integer, String> function = new ExternalLookupFunction();
> AsyncDataStream.unorderedWait(inputStream, function, 1, TimeUnit.SECONDS).print();
>
> class ExternalLookupFunction implements AsyncFunction<Integer, String> {
>   @Autowired
>   ServiceA service; // <-- will be injected wherever the DataStream graph is created
>
>   @Override
>   public void asyncInvoke(Integer input, ResultFuture<String> resultFuture) throws IOException {
>     service.call(input, resultFuture::complete); // <-- called only on task manager
>   }
> }
>
>
> Now the question is how ServiceA is transferred from client/job manager to
> task manager. One solution is to make ServiceA Serializable and just let
> Java Serialization handle everything automatically. Alternatively, you can
> only serialize the configuration information and create the service on
> RichAsyncFunction#open.
>
> Let's see if someone else made progress on providing the initialization
> hooks as described in your linked thread. Note that the community is busy
> getting Flink 1.12 done, so it might take a while for more answers.
>
> Best,
>
> Arvid
>
> On Tue, Nov 3, 2020 at 12:03 AM santhosh venkat <
> santhoshvenkat1...@gmail.com> wrote:
>
>>
>> Hi,
>>
>> I'm trying to integrate a dependency injection framework with flink
>> within my company. When I searched the user-mailing list, I found the
>> following thread in flink which discussed about this in the past:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dependency-Injection-and-Flink-td18880.html
>>
>> Since the thread was ~2 yrs old, I'm creating this request.
>>
>> 1. How do we expect users to integrate flink with a dependency injection
>> framework. Are there any hooks/entry-points that we can use to seamlessly
>> integrate a DI-fwk with flink? How does the community recommend the
>> dependency injection integration?
>>
>> 2. Would it be possible to create the object(say spring objects) at a
>> flink-task scope ? Or all these objects(say spring) from a dependency
>> injection fwk are expected to be created at an entire process(JM/TM) level?
>>
>> Can someone please help answer the above questions and help me understand
>> the flink-guarantees better. Any help would be greatly appreciated.
>>
>> Thanks.
>>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


-- 
Dan Diephouse
@dandiep


Re: A question about flink sql retreact stream

2020-11-04 Thread Jark Wu
Thanks Henry for the detailed example,

I will explain why there are so many records at time 5.
That is because the retraction mechanism is triggered per record in Flink
SQL, so there is record amplification in your case.
At time 5, the LAST_VALUE aggregation for stream a will first emit -(1,
12345, 0) and then +(1, 12345, 0).
When the -(1, 12345, 0) arrives at the join operator, it will join the
previous 3 records in stream b and then send 3 retraction messages.
When the 3 retraction messages arrive at the sum aggregation, it
produces (F 33)(T 21)(F 21)(T 10)(F 10).
In contrast, when the +(1, 12345, 0) arrives at the join operator, it sends 3
joined accumulation messages to the sum aggregation, which produces
(T 12)(F 12)(T 23)(F 23)(T 33).

In Flink SQL, the mini-batch [1] optimization can reduce
this amplification, because it is triggered on a mini-batch of records.

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation
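
For reference, enabling mini-batch on the blink planner looks roughly like
this (a sketch against an existing tableEnv; the option keys are the
table.exec.mini-batch.* settings documented in [1], and the values are just
placeholders to tune):

// fold retractions/accumulations per batch instead of per record
TableConfig conf = tableEnv.getConfig();
conf.getConfiguration().setString("table.exec.mini-batch.enabled", "true");
conf.getConfiguration().setString("table.exec.mini-batch.allow-latency", "1 s");
conf.getConfiguration().setString("table.exec.mini-batch.size", "1000");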


On Wed, 4 Nov 2020 at 23:01, Henry Dai  wrote:

>
> Dear flink developers&users
>
> I have a question about flink sql, It gives me a lot of trouble, Thank
> you very much for some help.
>
> Lets's assume we have two data stream, `order` and `order_detail`,
> they are from mysql binlog.
>
> Table `order` schema:
> id  int primary key
> order_idint
> statusint
>
> Table `order_detail` schema:
> id   int primary key
> order_id int
> quantity  int
>
> order : order_detail = 1:N, they are joined by `order_id`
>
> think we have following data sequence, and we compute sum(quantity)
> group by order.oreder_id after they are joined
>
> time | order (id, order_id, status) | order_detail (id, order_id, quantity) | result
>  1   | (1, 12345, 0)                |                                       |
>  2   |                              | (1, 12345, 10)                        | (T 10)
>  3   |                              | (2, 12345, 11)                        | (F 10)(T 21)
>  4   |                              | (3, 12345, 12)                        | (F 21)(T 33)
>  5   | (1, 12345, 1)                |                                       | (F 33)(T 21)(F 21)(T 10)(F 10)(T 12)(F 12)(T 23)(F 23)(T 33)
>
>
> Code:
> tableEnv.registerTableSource("a", new Order());
> tableEnv.registerTableSource("b", new OrderDetail());
> Table tbl1 = tableEnv.sqlQuery("SELECT id, LAST_VALUE(order_id) AS
> order_id, LAST_VALUE(status) AS status FROM a GROUP BY id");
> tableEnv.registerTable("ax", tbl1);
> Table tbl2 = tableEnv.sqlQuery("SELECT id, LAST_VALUE(order_id) AS
> order_id, LAST_VALUE(quantity) AS quantity FROM b GROUP BY id");
> tableEnv.registerTable("bx", tbl2);
> Table table = tableEnv.sqlQuery("SELECT ax.order_id, SUM(bx.quantity)
> FROM ax  JOIN bx ON ax.order_id = bx.order_id GROUP BY ax.order_id");
>> DataStream<Tuple2<Boolean, Row>> stream =
>> tableEnv.toRetractStream(table, Row.class);
> stream.print();
>
> Result:
> (true,12345,10)
> (false,12345,10)
> (true,12345,21)
> (false,12345,21)
> (true,12345,33)
> (false,12345,33)
> (true,12345,21)
> (false,12345,21)
> (true,12345,10)
> (false,12345,10)
> (true,12345,12)
> (false,12345,12)
> (true,12345,23)
> (false,12345,23)
> (true,12345,33)
>
>
> I can't understand why Flink emits so many records at time 5.
>
> In production, we consume binlog stream from kafka, convert stream to
> flink table, after sql computation, convert result table to flink stream
> where we only
> preserve TRUE message in retract stream, and emit them to downstream
> kafka.
>
> Do we have some method to realize flink dynamic table really (I mean,
> trigger computation only once), when we receive (1,12345,1) from `order`,
> only emit (F 33)(T 33).
>
> --
> best wishes
> hengyu
>


Re: Multi-stream SQL-like processing

2020-11-04 Thread Jark Wu
Yes. The dynamism might be a problem.
Does Kafka Connect support discovering new tables and synchronizing them
dynamically?

Best,
Jark

On Thu, 5 Nov 2020 at 04:39, Krzysztof Zarzycki 
wrote:

> Hi Jark, thanks for joining the discussion!
> I understand your point of view that SQL environment is probably not the
> best for what I was looking to achieve.
> The idea of a configuration tool sounds almost perfect :) Almost , because:
> Without the "StatementSet" that you mentioned at the end I would be
> worried about resource consumption (job & task manager objects, buffers,
> connections) of having one topology per table. That would be a significant
> loss against architecture of Kafka Connect kind.
> With StatementSet I understand this is not a case, but there is another
> issue: We lose the dynamism. That is, the job won't be able to discover new
> tables. We would need to always restart the whole (reconfigured)
> StatementSet job. (Anyway, this approach sounds good enough to try it out
> in my current assignment.)
> The other issue I see is that I still need to define the DSL for the
> configuration(sth like config of KConnect). SQL will not be it, it will
> probably be barely a way to implement the tool.
>
> I would appreciate your comments, Jark.
> Also if anyone would like to add other ideas, feel welcome!
>
> Best,
> Krzysztof
>
> śr., 4 lis 2020 o 09:37 Jark Wu  napisał(a):
>
>> Hi Krzysztof,
>>
>> This is a very interesting idea.
>>
>> I think SQL is not a suitable tool for this use case, because SQL is a
>> structured query language
>>  where the table schema is fixed and never changes during job running.
>>
>> However, I think it can be a configuration tool project on top of Flink
>> SQL.
>> The configuration tool can dynamically generate all the queries according
>> to the config
>>  and submit them in one job.
>>
>> For example, if the configuration says "synchronize from mysql address
>> '' to kafka broker ''",
>> then the generated Flink SQL would look like:
>>
>> CREATE TABLE db (
>>   `database_name` STRING,
>>   `table_name` STRING,
>>   `data` BYTES  // encodes all the columns value, can be a better
>> structure for performance
>> ) WITH (
>>   connector = ...   // a new connector that scans all tables from the mysql
>> address
>>   url = 'jdbc:mysql://localhost:3306/flink-test'
>> );
>>
>> // the configuration tool will generate multiple INSERT INTO according to
>> how many tables in the DB
>> INSERT INTO kafka_table1
>> SELECT parse_data(table_name, data)   // the parse_data UDF will infer
>> schema from database
>> FROM db WHERE table = 'table1'// or schema registry and
>> deserialize the data into columns with different types.
>>
>> INSERT INTO kafka_table2
>> SELECT parse_data(table_name, data)
>> FROM db WHERE table = 'table2'
>>
>> ...
>>
>> The configuration tool can use `StatementSet` to package all the INSERT
>> INTO queries together and submit them in one job.
>> With the `StatementSet`, the job will share the common source task, so
>> the tables in MySQL are only read once.
>>
>> Best,
>> Jark
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On Tue, 3 Nov 2020 at 14:00, Krzysztof Zarzycki 
>> wrote:
>>
>>> Hi community, I would like to confront one idea with you.
>>>
>>> I was thinking that Flink SQL could be a Flink's answer for Kafka
>>> Connect (more powerful, with advantages like being decoupled from Kafka).
>>> Flink SQL would be the configuration language for Flink "connectors",
>>> sounds great!.
>>> But one thing does not allow me to implement this idea: There is no
>>> possibility to run SQL-based processing over multiple similar inputs and
>>> produce multiple similar outputs (counted in tens or hundreds).
>>> As a problem example that I need to solve, consider that I have a
>>> hundred of Kafka topics, with similar data in each. And I would like to
>>> sink them to a SQL database. With Kafka connect, I can use a single
>>> connector with JDBC sink, that properly configured will dump each topic to
>>> a separate table properly keeping the schema (based on what is in the
>>> schema registry).
>>> With Flink SQL I would need to run a query per topic/table, I believe.
>>> Similarly with sourcing data. There is this cool project
>>> flink-cdc-connectors [1] that leverages Debezium in Flink to apply CDC on
>>> SQL database, but when used with SQL, it can only pull in one table per
>>> query.
>>> These cases can be solved using the datastream API. With it I can code
>>> pulling in/pushing out multiple table streams. But then "the configuration"
>>> is a much bigger effort, because it requires using java code. And that is a
>>> few hours vs few days case, an enormous difference.
>>>
>>> So in the end some questions:
>>> * Do you know how SQL could be extended to support handling such cases
>>> elegantly, with a single job in the end?
>>> * Or do you believe SQL should not be used for that case and we should
>>> come up with a different tool and configuration language? I.e. sth lik

union stream vs multiple operators

2020-11-04 Thread Alexey Trenikhun
Hello,
I have two Kafka topics ("A" and "B") which provide structurally similar data 
but with different load patterns, for example hundreds of records per second in 
the first topic and 10 records per second in the second. Events are processed 
using the same algorithm and output to a common sink; currently my pipeline is 
something like:

Source A->T-\
   -> Sink
Source B->T-/

Instead of this pipeline I can union the two streams and send them to a common 
KeyedProcessFunction T:

Source A-\
 (union)-> T -> Sink
Source B-/
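
In code, the unioned variant would look roughly like this (a minimal sketch;
Event, the two sources, the sink, and the KeyedProcessFunction T are
placeholders, not the actual job):

DataStream<Event> a = env.addSource(sourceA);
DataStream<Event> b = env.addSource(sourceB);

a.union(b)                       // merge both topics into one stream
 .keyBy(Event::getKey)           // same keying as in the two-pipeline variant
 .process(new T())               // the common KeyedProcessFunction
 .addSink(sink);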

What are pros and cons of these approaches?

Thanks,
Alexey


Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-04 Thread Fuyao Li
Hi Flink Users and Community,

For the first part of the question, the 12 hour time difference was caused
by a time-extraction bug of my own. I can get the time translated correctly
now. The type cast problem does have some workarounds to solve it.

My major blocker right now is that the onTimer part is not properly triggered. I
guess it is caused by failing to configure the correct watermarks &
timestamp assigners. Please give me some insights.

1. If I don't chain the assignTimestampsAndWatermarks() call together
with the keyBy() and process() calls, the context.timestamp() in my
processElement() function will be null. Is this expected behavior? The
Flink examples don't chain it together. (see example here:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#using-watermark-strategies
)
2. If I use registerEventTimeTimer() in processElement(), the onTimer
method will not be triggered. However, I can trigger the onTimer method if
I simply change it to registerProcessingTimeTimer(). I am using the
settings below in the stream env.

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000L);

My code for the process chain:

retractStream
        .assignTimestampsAndWatermarks(
            WatermarkStrategy.<Tuple2<Boolean, Row>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                .withTimestampAssigner((booleanRowTuple2, timestamp) -> {
                    Row rowData = booleanRowTuple2.f1;
                    LocalDateTime headerTime = (LocalDateTime) rowData.getField(3);
                    LocalDateTime linesTime = (LocalDateTime) rowData.getField(7);

                    LocalDateTime latestDBUpdateTime = null;
                    if (headerTime != null && linesTime != null) {
                        latestDBUpdateTime = headerTime.isAfter(linesTime) ? headerTime : linesTime;
                    } else {
                        latestDBUpdateTime = (headerTime != null) ? headerTime : linesTime;
                    }
                    if (latestDBUpdateTime != null) {
                        return latestDBUpdateTime.atZone(ZoneId.of("America/Los_Angeles")).toInstant().toEpochMilli();
                    }
                    // In the worst case, we use system time instead, which should never be reached.
                    return System.currentTimeMillis();
                }))
        // .assignTimestampsAndWatermarks(new MyWaterStrategy())  // second way to create watermark, doesn't work
        .keyBy(value -> {
            // There could be null fields for the header invoice_id field
            String invoice_id_key = (String) value.f1.getField(0);
            if (invoice_id_key == null) {
                invoice_id_key = (String) value.f1.getField(4);
            }
            return invoice_id_key;
        })
        .process(new TableOutputProcessFunction())
        .name("ProcessTableOutput")
        .uid("ProcessTableOutput")
        .addSink(businessObjectSink)
        .name("businessObjectSink")
        .uid("businessObjectSink")
        .setParallelism(1);

Best regards,
Fuyao

On Mon, Nov 2, 2020 at 4:53 PM Fuyao Li  wrote:

> Hi Flink Community,
>
> I am doing some research work on Flink Datastream and Table API and I meet
> two major problems. I am using Flink 1.11.2, scala version 2.11, java 8. My
> use case looks like this. I plan to write a data processing pipeline with
> two stages. My goal is to construct a business object containing
> information from several Kafka streams with a primary key and emit the
> complete business object if such primary key doesn't  appear in the
> pipeline for 10 seconds.
>
> In the first stage, I first consume three Kafka streams and transform it
> to Flink Datastream using a deserialization schema containing some type and
> date format transformation, and then I register these data streams as Table
> and do a full outer join one by one using Table API. I also add query
> configuration for this to avoid excessive state. The primary key is also
> the join key.
>
> In the second stage, I transform the joined table to a retracted stream
> and put it into KeyedProcessFunction to generate the business object if the
> business object's primary key is inactive for 10 second.
>
> Is this way of handling the data the suggested approach? (I understand I
> can directly consume kafka data in Table API. I haven't tried that yet,
> maybe that's better?) Any suggestion is welcomed. During implementing this,
> I meet two major problems and several smaller questions under each problem.
>
>
> 1. Some type cast behavior of retracted streams I can't explain.
>
> (1) In the initial stage, I registered some field as *java.sql

Re: Multi-stream SQL-like processing

2020-11-04 Thread Krzysztof Zarzycki
Hi Jark, thanks for joining the discussion!
I understand your point of view that SQL environment is probably not the
best for what I was looking to achieve.
The idea of a configuration tool sounds almost perfect :) Almost, because:
Without the "StatementSet" that you mentioned at the end I would be worried
about the resource consumption (job & task manager objects, buffers,
connections) of having one topology per table. That would be a significant
loss compared to the Kafka Connect architecture.
With StatementSet I understand this is not the case, but there is another
issue: we lose the dynamism. That is, the job won't be able to discover new
tables. We would need to always restart the whole (reconfigured)
StatementSet job. (Anyway, this approach sounds good enough to try it out
in my current assignment.)
The other issue I see is that I still need to define the DSL for the
configuration (something like the config of Kafka Connect). SQL will not be it;
it will probably just be a way to implement the tool.
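
For reference, the StatementSet packaging Jark mentions could look roughly
like this (a sketch against the Flink 1.11 Table API, reusing the table names
from Jark's example below with a table_name filter column; the actual SQL
strings would be generated by the configuration tool):

StatementSet set = tableEnv.createStatementSet();
set.addInsertSql(
    "INSERT INTO kafka_table1 SELECT parse_data(table_name, data) FROM db WHERE table_name = 'table1'");
set.addInsertSql(
    "INSERT INTO kafka_table2 SELECT parse_data(table_name, data) FROM db WHERE table_name = 'table2'");
// one submitted job, sharing the common source task
set.execute();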

I would appreciate your comments, Jark.
Also if anyone would like to add other ideas, feel welcome!

Best,
Krzysztof

śr., 4 lis 2020 o 09:37 Jark Wu  napisał(a):

> Hi Krzysztof,
>
> This is a very interesting idea.
>
> I think SQL is not a suitable tool for this use case, because SQL is a
> structured query language
>  where the table schema is fixed and never changes during job running.
>
> However, I think it can be a configuration tool project on top of Flink
> SQL.
> The configuration tool can dynamically generate all the queries according
> to the config
>  and submit them in one job.
>
> For example, if the configuration says "synchronize from mysql address
> '' to kafka broker ''",
> then the generated Flink SQL would look like:
>
> CREATE TABLE db (
>   `database_name` STRING,
>   `table_name` STRING,
>   `data` BYTES  // encodes all the columns value, can be a better
> structure for performance
> ) WITH (
>   connector = ...   // a new connector that scans all tables from the mysql
> address
>   url = 'jdbc:mysql://localhost:3306/flink-test'
> );
>
> // the configuration tool will generate multiple INSERT INTO according to
> how many tables in the DB
> INSERT INTO kafka_table1
> SELECT parse_data(table_name, data)   // the parse_data UDF will infer
> schema from database
> FROM db WHERE table = 'table1'// or schema registry and
> deserialize the data into columns with different types.
>
> INSERT INTO kafka_table2
> SELECT parse_data(table_name, data)
> FROM db WHERE table = 'table2'
>
> ...
>
> The configuration tool can use `StatementSet` to package all the INSERT
> INTO queries together and submit them in one job.
> With the `StatementSet`, the job will share the common source task, so the
> tables in MySQL are only read once.
>
> Best,
> Jark
>
>
>
>
>
>
>
>
>
>
> On Tue, 3 Nov 2020 at 14:00, Krzysztof Zarzycki 
> wrote:
>
>> Hi community, I would like to confront one idea with you.
>>
>> I was thinking that Flink SQL could be a Flink's answer for Kafka Connect
>> (more powerful, with advantages like being decoupled from Kafka). Flink SQL
>> would be the configuration language for Flink "connectors", sounds great!.
>> But one thing does not allow me to implement this idea: There is no
>> possibility to run SQL-based processing over multiple similar inputs and
>> produce multiple similar outputs (counted in tens or hundreds).
>> As a problem example that I need to solve, consider that I have a hundred
>> of Kafka topics, with similar data in each. And I would like to sink them
>> to a SQL database. With Kafka connect, I can use a single connector with
>> JDBC sink, that properly configured will dump each topic to a separate
>> table properly keeping the schema (based on what is in the schema
>> registry).
>> With Flink SQL I would need to run a query per topic/table, I believe.
>> Similarly with sourcing data. There is this cool project
>> flink-cdc-connectors [1] that leverages Debezium in Flink to apply CDC on
>> SQL database, but when used with SQL, it can only pull in one table per
>> query.
>> These cases can be solved using the datastream API. With it I can code
>> pulling in/pushing out multiple table streams. But then "the configuration"
>> is a much bigger effort, because it requires using java code. And that is a
>> few hours vs few days case, an enormous difference.
>>
>> So in the end some questions:
>> * Do you know how SQL could be extended to support handling such cases
>> elegantly, with a single job in the end?
>> * Or do you believe SQL should not be used for that case and we should
>> come up with a different tool and configuration language? I.e. sth like
>> Kafka Connect
>> * Do you know of any other project that implements this idea?
>>
>> I definitely believe that this is a great use case for Flink to be an
>> easy-to-use ingress from/egress to Kafka/HDFS/whatever system, therefore
>> there is a need for a solution for my case.
>>
>> Thanks for answering!
>> Krzysztof
>>
>> [1] ht

Re: Error while retrieving the leader gateway after making Flink config changes

2020-11-04 Thread Claude M
This issue had to do with the update strategy for the Flink deployment.
When I changed it to the following, it worked:
  strategy:
type: RollingUpdate
rollingUpdate:
  maxSurge: 0
  maxUnavailable: 1

On Tue, Nov 3, 2020 at 1:39 PM Robert Metzger  wrote:

> Thanks a lot for providing the logs.
>
> My theory of what is happening is the following:
> 1. You are probably increasing the memory for the JobManager, when
> changing the  jobmanager.memory.flink.size configuration value
> 2. Due to this changed memory configuration, Kubernetes, Docker or the
> Linux kernel are killing your JobManager process because it allocates too
> much memory.
>
> Flink should not stop like this. Fatal errors are logged explicitly, kill
> signals are also logged.
> Can you check Kubernetes, Docker, Linux for any signs that they are
> killing your JobManager?
>
>
>
> On Tue, Nov 3, 2020 at 7:06 PM Claude M  wrote:
>
>> Thanks for your reply Robert.  Please see attached log from the job
>> manager, the last line is the only thing I see different from a pod that
>> starts up successfully.
>>
>> On Tue, Nov 3, 2020 at 10:41 AM Robert Metzger 
>> wrote:
>>
>>> Hi Claude,
>>>
>>> I agree that you should be able to restart individual pods with a
>>> changed memory configuration. Can you share the full Jobmanager log of the
>>> failed restart attempt?
>>>
>>> I don't think that the log statement you've posted explains a start
>>> failure.
>>>
>>> Regards,
>>> Robert
>>>
>>> On Tue, Nov 3, 2020 at 2:33 AM Claude M  wrote:
>>>

 Hello,

 I have Flink 1.10.2 installed in a Kubernetes cluster.
 Anytime I make a change to the flink.conf, the Flink jobmanager pod
 fails to restart.
 For example, I modified the following memory setting in the flink.conf:
 jobmanager.memory.flink.size.
 After I deploy the change, the pod fails to restart and the following
 is seen in the log:

 WARN
  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever  -
 Error while retrieving the leader gateway. Retrying to connect to
 akka.tcp://flink@flink-jobmanager:50010/user/dispatcher.

 The pod can be restored by doing one of the following but these are not
 acceptable solutions:

- Revert the changes made to the flink.conf to the previous settings
- Remove the Flink Kubernetes deployment before doing a deployment
- Delete the flink cluster folder in Zookeeper

 I don't understand why making any changes in the flink.conf causes this
 problem.
 Any help is appreciated.


 Thank You

>>>


Re: LEGACY('STRUCTURED_TYPE' to pojo

2020-11-04 Thread Rex Fenley
Thank you for the info!

Is there a timetable for when the next version with this change might be
released?

On Wed, Nov 4, 2020 at 2:44 AM Timo Walther  wrote:

> Hi Rex,
>
> sorry for the late reply. POJOs will have much better support in the
> upcoming Flink versions because they have been fully integrated with the
> new table type system mentioned in FLIP-37 [1] (e.g. support for
> immutable POJOs and nested DataTypeHints etc).
>
> For queries, scalar, and table functions you can already use the full
> POJOs within the table ecosystem.
>
> However, the only missing piece is the new translation of POJOs from
> Table API to DataStream API. This will be fixed in FLIP-136 [2]. Until
> then I would recommend to either use `Row` as the output of the table
> API or try to use a scalar function before that maps to the desired data
> structure.
>
> I hope this helps a bit.
>
> Regards,
> Timo
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-37%3A+Rework+of+the+Table+API+Type+System
> [2]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
>
> On 02.11.20 21:44, Rex Fenley wrote:
> > My jobs normally use the blink planner, I noticed with this test that
> > may not be the case.
> >
> > On Mon, Nov 2, 2020 at 12:38 PM Rex Fenley  > > wrote:
> >
> > Flink 1.11.2 with Scala 2.12
> >
> > Error:
> > [info] JobScalaTest:
> > [info] - dummy *** FAILED ***
> > [info]   org.apache.flink.table.api.ValidationException: Field types
> > of query result and registered TableSink  do not match.
> > [info] Query schema: [user: BIGINT, product: ROW<`name`
> > VARCHAR(2147483647), `id` BIGINT>, amount: INT]
> > [info] Sink schema: [user: BIGINT, product:
> > LEGACY('STRUCTURED_TYPE', 'ANY >
>  
> rO0ABXNyAB1Kb2JTY2FsYVRlc3QkJGFub24kOSQkYW5vbiQxMODHL_EXRquJAgAAeHIANm9yZy5hcGFjaGUuZmxpbmsuYXBpLnNjYWxhLnR5cGV1dGlscy5DYXNlQ2xhc3NUeXBlSW5mb46d1-iqONqQAgAMTAARUEFUVEVSTl9JTlRfRklFTER0ABlMamF2YS91dGlsL3JlZ2V4L1BhdHRlcm47TAAVUEFUVEVSTl9ORVNURURfRklFTERTcQB-AAJMAB5QQVRURVJOX05FU1RFRF9GSUVMRFNfV0lMRENBUkRxAH4AAkwAC1JFR0VYX0ZJRUxEdAASTGphdmEvbGFuZy9TdHJpbmc7TAAPUkVHRVhfSU5UX0ZJRUxEcQB-AANMABNSRUdFWF9ORVNURURfRklFTERTcQB-AANMABxSRUdFWF9ORVNURURfRklFTERTX1dJTERDQVJEcQB-AANMAA9SRUdFWF9TVFJfRklFTERxAH4AA0wABWNsYXp6dAARTGphdmEvbGFuZy9DbGFzcztMAApmaWVsZE5hbWVzdAAWTHNjYWxhL2NvbGxlY3Rpb24vU2VxO0wACmZpZWxkVHlwZXNxAH4ABVsAEnR5cGVQYXJhbVR5cGVJbmZvc3QAN1tMb3JnL2FwYWNoZS9mbGluay9hcGkvY29tbW9uL3R5cGVpbmZvL1R5cGVJbmZvcm1hdGlvbjt4cgA1b3JnLmFwYWNoZS5mbGluay5hcGkuamF2YS50eXBldXRpbHMuVHVwbGVUeXBlSW5mb0Jhc2UAAQIAAkkAC3RvdGFsRmllbGRzWwAFdHlwZXNxAH4ABnhyADNvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLkNvbXBvc2l0ZVR5cGUAAQIAAUwACXR5cGVDbGFzc3EAfgAEeHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5UeXBlSW5mb3JtYXRpb26UjchIurN66wIAAHhwdnIAC1Byb2R1Y3RJdGVtBwFtETzcflcCAAJKAAJpZEwABG5hbWVxAH4AA3hwAnVyADdbTG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5UeXBlSW5mb3JtYXRpb247uKBspBqaFLYCAAB4cAJzcgAyb3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGVpbmZvLkJhc2ljVHlwZUluZm_6BPCCpWndBgIABEwABWNsYXp6cQB-AARMAA9jb21wYXJhdG9yQ2xhc3NxAH4ABFsAF3Bvc3NpYmxlQ2FzdFRhcmdldFR5cGVzdAASW0xqYXZhL2xhbmcvQ2xhc3M7TAAKc2VyaWFsaXplcnQANkxvcmcvYXBhY2hlL2ZsaW5rL2FwaS9jb21tb24vdHlwZXV0aWxzL1R5cGVTZXJpYWxpemVyO3hxAH4ACXZyABBqYXZhLmxhbmcuU3RyaW5noPCkOHo7s0ICAAB4cHZyADtvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuU3RyaW5nQ29tcGFyYXRvcgABAgAAeHIAPm9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuYmFzZS5CYXNpY1R5cGVDb21wYXJhdG9yAAECAAJaABNhc2NlbmRpbmdDb21wYXJpc29uWwALY29tcGFyYXRvcnN0ADdbTG9yZy9hcGFjaGUvZmxpbmsvYXBpL2NvbW1vbi90eXBldXRpbHMvVHlwZUNvbXBhcmF0b3I7eHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuVHlwZUNvbXBhcmF0b3IAAQIAAHhwdXIAEltMamF2YS5sYW5nLkNsYXNzO6sW167LzVqZAgAAeHAAc3IAO29yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuYmFzZS5TdHJpbmdTZXJpYWxpemVyAAECAAB4cgBCb3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLlR5cGVTZXJpYWxpemVyU2luZ2xldG9ueamHqscud0UCAAB4cgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5UeXBlU2VyaWFsaXplcgABAgAAeHBzcgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGVpbmZvLkludGVnZXJUeXBlSW5mb5AFxEVpQEqVAgAAeHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5OdW1lcmljVHlwZUluZm-tmMZzMAKMFgIAAHhxAH4AD3ZyAA5qYXZhLmxhbmcuTG9uZzuL5JDMjyPfAgABSgAFdmFsdWV4cgAQamF2YS5sYW5nLk51bWJlcoaslR0LlOCLAgAAeHB2cgA5b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLkxvbmdDb21wYXJhdG9yAAECAAB4cQB-ABZ1cQB-ABoDdnIAD2phdmEubGFuZy5GbG9hdNrtyaLbPPDsAgABRgAFdmFsdWV4cQB-ACR2cgAQamF2YS5sYW5nLkRvdWJsZYCzwkopa_sEAgABRAAFdmFsdWV4cQB-ACR2cgATamF2YS5sYW5nLkNoYXJhY3RlcjSLR9lrGiZ4AgABQwAFdmFsdWV4cHNyADlvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuTG9uZ1NlcmlhbGl6ZXIAAQIAAHhxAH4AHXNyABdqYXZhLnV0aWwucmVnZXguUGF0dGVybkZn1WtuSQINAgACSQAFZmxhZ3NMAAdwYXR0ZXJucQB-AAN4cAB0AAZbMC05XStzcQB-ADEAdAAwKFtccHtMfV9cJF1bXHB7T

A question about flink sql retreact stream

2020-11-04 Thread Henry Dai
Dear Flink developers & users,

I have a question about Flink SQL. It gives me a lot of trouble; thank
you very much for any help.

Let's assume we have two data streams, `order` and `order_detail`; they
are from the MySQL binlog.

Table `order` schema:
    id        int primary key
    order_id  int
    status    int

Table `order_detail` schema:
    id        int primary key
    order_id  int
    quantity  int

order : order_detail = 1:N, they are joined by `order_id`

Think we have the following data sequence, and we compute sum(quantity)
group by order.order_id after they are joined:

time | order (id, order_id, status) | order_detail (id, order_id, quantity) | result
 1   | (1, 12345, 0)                |                                       |
 2   |                              | (1, 12345, 10)                        | (T 10)
 3   |                              | (2, 12345, 11)                        | (F 10)(T 21)
 4   |                              | (3, 12345, 12)                        | (F 21)(T 33)
 5   | (1, 12345, 1)                |                                       | (F 33)(T 21)(F 21)(T 10)(F 10)(T 12)(F 12)(T 23)(F 23)(T 33)


Code:
tableEnv.registerTableSource("a", new Order());
tableEnv.registerTableSource("b", new OrderDetail());
Table tbl1 = tableEnv.sqlQuery("SELECT id, LAST_VALUE(order_id) AS
order_id, LAST_VALUE(status) AS status FROM a GROUP BY id");
tableEnv.registerTable("ax", tbl1);
Table tbl2 = tableEnv.sqlQuery("SELECT id, LAST_VALUE(order_id) AS
order_id, LAST_VALUE(quantity) AS quantity FROM b GROUP BY id");
tableEnv.registerTable("bx", tbl2);
Table table = tableEnv.sqlQuery("SELECT ax.order_id, SUM(bx.quantity)
FROM ax  JOIN bx ON ax.order_id = bx.order_id GROUP BY ax.order_id");
DataStream> stream =
tableEnv.toRetractStream(table, Row.class);
stream.print();

Result:
(true,12345,10)
(false,12345,10)
(true,12345,21)
(false,12345,21)
(true,12345,33)
(false,12345,33)
(true,12345,21)
(false,12345,21)
(true,12345,10)
(false,12345,10)
(true,12345,12)
(false,12345,12)
(true,12345,23)
(false,12345,23)
(true,12345,33)


I can't understand why Flink emits so many records at time 5.

In production, we consume the binlog stream from Kafka, convert the stream to
a Flink table, and after the SQL computation convert the result table back to a
Flink stream, where we only preserve the TRUE messages of the retract stream
and emit them to downstream Kafka.

Do we have some method to realize a truly dynamic Flink table (I mean,
trigger the computation only once), so that when we receive (1,12345,1) from
`order`, we only emit (F 33)(T 33)?

-- 
best wishes
hengyu


A question of Flink SQL aggregation

2020-11-04 Thread Henry Dai
Hi,

Let's assume we have two streams, an order stream & an order detail stream;
they are from the MySQL binlog.

Table `order` schema: id primary key, order_id and order_status
Table `order_detail` schema: id primary key, order_id and quantity

One order item has several order_detail items.

if we have following data sequence:

   order


-- 
best wishes
hengyu


a couple of memory questions

2020-11-04 Thread Colletta, Edward
Using Flink 1.9.2 with FsStateBackend, Session cluster.


  1.  Does heap state get cleaned up when a job is cancelled?

We have jobs that we run on a daily basis.  We start each morning and cancel 
each evening.  We noticed that the process size does not seem to shrink.  We 
are looking at the resident size of the process with ps and also the USED 
column for Heap on the taskmanager page of the flink dashboard.

  2.  How can I examine the usage of Flink Managed Memory?

 The configuration documentation seems to indicate this is used for batch jobs, 
and we are only using the Streaming API.   I reduced 
taskmanager.memory.fraction to 0.3, but I think this is still reserving too 
much memory to an area we will not be using.




Re: Increase in parallelism has very bad impact on performance

2020-11-04 Thread Arvid Heise
Hi Sidney,

could you describe how your aggregation works and what your current pipeline
looks like? Is the aggregation partially applied before shuffling the data?
I'm a bit lost on what aggregation without keyBy looks like.

A decrease in throughput may also be a result of more overhead and less
available memory. It also depends on how long you wait with your
measurements after starting (as more parallelism = slower start). The way
you measure may greatly influence the result and might explain the
fluctuation.

Also, what does your slot distribution look like now?

Btw from your description, it still sounds like a big country may slow down
the overall process. So a histogram over the countries would be
very helpful.

On Wed, Nov 4, 2020 at 12:01 PM Sidney Feiner 
wrote:

> You're right, this is scale problem (for me that's performance).
>
> As for what you were saying about the data skew, that could be it so I
> tried running the job without using keyBy and I wrote an aggregator that
> accumulates the events per country and then wrote a FlatMap that takes that
> map and returns a stream of events per country. I was hoping that that way
> I won't have skewing problems as all the data is actually handled in the
> same tasks (and I don't mind that).
>
> But even after this change, I'm experiencing the same scaling limit.
>
> And I actually found something inefficient in my code and now that I've
> fixed it, the app seems to scale a bit better. I also decreased the time
> window which increased the scaling some more.
>
> So now I still hit a scaling limit but it seems a bit better already:
> Parallelism | Throughput/sec | Throughput/slot/sec | Increase in parallelism (%) | Increase in events (%) | % Of expected increase
> 1           | 2,630          | 2,630               | -                           | -                      | -
> 15          | 16,340         | 1,180               | 1500%                       | 621%                   | 41.4%
> 30          | 22,100         | 736                 | 200%                        | 135%                   | 67.5%
> 50          | 16,600         | 332                 | 166%                        | 75%                    | 45%
> The last column is to check how "linearly" the app actually scales. Best
> case scenario is 100% when the increase in parallelism is 200% and the
> increase in handled events increases by 200% as well.
>
> It is pretty clear to see that my app is far from scaling linearly, and
> its throughput even *decreases* from parallelism 30 to parallelism 50.
>
> What could cause these weird and unstable numbers of % in expected
> increase even though I'm not using a KeyedWindow anymore?
>
>
>
>
> *Sidney Feiner* */* Data Platform Developer
> M: +972.528197720 */* Skype: sidney.feiner.startapp
>
> [image: emailsignature]
>
> --
> *From:* Arvid Heise 
> *Sent:* Tuesday, November 3, 2020 8:54 PM
> *To:* Sidney Feiner 
> *Cc:* Yangze Guo ; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: Increase in parallelism has very bad impact on performance
>
> Hi Sidney,
>
> you might recheck your first message. Either it's incorrectly written or
> you are a victim of a fallacy.
>
> With 1 slot, you have 1.6K events per slot = 1.6K overall.
> With parallelism 5, you have 1.2K events per slot, so 6K overall. That's a
> decent speedup.
> With 10, you still have 6K overall.
>
> So you haven't experienced any performance degradation (what your title
> suggests). It's rather that you hit a practical scale-up/out boundary.
>
> Now of course, you'd like to see your system to scale beyond that 6K into
> the realm of 45k per second and I can assure you that it's well possible in
> your setup. However, we need to figure out why it's not doing it.
>
> The most likely reason that would explain the behavior is indeed data
> skew. Your observation also matches it: even though you distribute your
> job, some slots are doing much more work than other slots.
>
> So the first thing that you should do is to plot a histogram over country
> codes. What you will likely see is that 20% of all records belong to the
> same country (US?). That's where your practical scale-up boundary comes
> from. Since you group by country, there is no way to calculate it in a
> distributed manner. Also check in Flink Web UI which tasks bottlenecks. I'm
> assuming it's the window operator (or rather everything after HASH) for now.
>
> Btw concerning hash collisions: just because you have in theory some
> 26^2=676 combinations in a 2-letter ASCII string, you have <200 countries =
> unique values. Moreover, two values with the same hash is very common as
> the hash is remapped to your parallelism. So if your parallelism is 5, you
> have only 5 hash buckets where you need to put in 40 countries on average.
> Let's assume you have US, CN, UK as your countries with most entries and a
> good hash function remapped to 5 buckets, then you have 4% probability of
> having them all assigned to the same bucket, but almost 60% of two of them
> being in the same bucket.
>
> Nevertheless, even without collisions your scalability is limited by the
> largest country. That's independent of the used system and inherent to your
> query. So if you indeed see this data skew, then the best way is to modify
> the query. Possible

Re: Increase in parallelism has very bad impact on performance

2020-11-04 Thread Sidney Feiner
You're right, this is a scaling problem (for me that's performance).

As for what you were saying about the data skew, that could be it, so I tried 
running the job without using keyBy: I wrote an aggregator that accumulates 
the events per country and then a FlatMap that takes that map and returns a 
stream of events per country. I was hoping that this way I won't have skewing 
problems, as all the data is actually handled in the same tasks (and I don't 
mind that).

But even after this change, I'm experiencing the same scaling limit.

And I actually found something inefficient in my code and now that I've fixed 
it, the app seems to scale a bit better. I also decreased the time window which 
increased the scaling some more.

So now I still hit a scaling limit but it seems a bit better already:
Parallelism | Throughput/sec | Throughput/slot/sec | Increase in parallelism (%) | Increase in events (%) | % Of expected increase
1           | 2,630          | 2,630               | -                           | -                      | -
15          | 16,340         | 1,180               | 1500%                       | 621%                   | 41.4%
30          | 22,100         | 736                 | 200%                        | 135%                   | 67.5%
50          | 16,600         | 332                 | 166%                        | 75%                    | 45%

The last column is to check how "linearly" the app actually scales. The best 
case scenario is 100%: when the increase in parallelism is 200%, the number of 
handled events increases by 200% as well.

It is pretty clear to see that my app is far from scaling linearly, and its 
throughput even decreases from parallelism 30 to parallelism 50.

What could cause these weird and unstable numbers in the expected-increase 
percentage even though I'm not using a KeyedWindow anymore?





Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skype: sidney.feiner.startapp

[emailsignature]



From: Arvid Heise 
Sent: Tuesday, November 3, 2020 8:54 PM
To: Sidney Feiner 
Cc: Yangze Guo ; user@flink.apache.org 

Subject: Re: Increase in parallelism has very bad impact on performance

Hi Sidney,

you might recheck your first message. Either it's incorrectly written or you 
are a victim of a fallacy.

With 1 slot, you have 1.6K events per slot = 1.6K overall.
With parallelism 5, you have 1.2K events per slot, so 6K overall. That's a 
decent speedup.
With 10, you still have 6K overall.

So you haven't experienced any performance degradation (what your title 
suggests). It's rather that you hit a practical scale-up/out boundary.

Now of course, you'd like to see your system to scale beyond that 6K into the 
realm of 45k per second and I can assure you that it's well possible in your 
setup. However, we need to figure out why it's not doing it.

The most likely reason that would explain the behavior is indeed data skew. 
Your observation also matches it: even though you distribute your job, some 
slots are doing much more work than other slots.

So the first thing that you should do is to plot a histogram over country 
codes. What you will likely see is that 20% of all records belong to the same 
country (US?). That's where your practical scale-up boundary comes from. Since 
you group by country, there is no way to calculate it in a distributed manner. 
Also check in Flink Web UI which tasks bottlenecks. I'm assuming it's the 
window operator (or rather everything after HASH) for now.

Btw concerning hash collisions: just because you have in theory some 26^2=676 
combinations in a 2-letter ASCII string, you have <200 countries = unique 
values. Moreover, two values with the same hash is very common as the hash is 
remapped to your parallelism. So if your parallelism is 5, you have only 5 hash 
buckets where you need to put in 40 countries on average. Let's assume you have 
US, CN, UK as your countries with most entries and a good hash function 
remapped to 5 buckets, then you have 4% probability of having them all assigned 
to the same bucket, but almost 60% of two of them being in the same bucket.

Nevertheless, even without collisions your scalability is limited by the 
largest country. That's independent of the used system and inherent to your 
query. So if you indeed see this data skew, then the best way is to modify the 
query. Possible options:
- You use a more fine-grain key (country + state). That may not be possible due 
to semantics.
- You use multiple aggregation steps (country + state), then country. 
Preaggregations are always good to have.
- You can reduce data volume by filtering before HASH. (You already have a 
filter, so I'm guessing it's not a valid option)
- You preaggregate per Kafka partition key before HASH.

If you absolutely cannot make the aggregations more fine-grain, you need to use 
machines that have strong CPU slots. (it's also no use to go beyond parallelism 
of 10)
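
To make the two-step ("country + state", then country) pre-aggregation option
above concrete, a rough sketch (Java; the events stream, the Event type with
getCountry()/getState(), the count aggregation, and the 10 s window size are
placeholder assumptions, not the actual job):

// Stage 1: count per (country, state). The fine-grained key spreads a hot
// country across many parallel subtasks.
DataStream<Tuple2<String, Long>> perCountryState = events
        .map(e -> Tuple2.of(e.getCountry() + "|" + e.getState(), 1L))
        .returns(Types.TUPLE(Types.STRING, Types.LONG))
        .keyBy(t -> t.f0)
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));

// Stage 2: fold the partial counts back into one count per country.
DataStream<Tuple2<String, Long>> perCountry = perCountryState
        .map(t -> Tuple2.of(t.f0.split("\\|")[0], t.f1))
        .returns(Types.TUPLE(Types.STRING, Types.LONG))
        .keyBy(t -> t.f0)
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));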

I also noticed that you have several forward channels. There is usually no need 
for them. Task chaining is much faster. Especially if you enableObjectReuse [1].

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html


On Tue, Nov 3, 2020 at 3:14 PM Sidney Fein

Re: Missing help about run-application action in Flink CLI client

2020-11-04 Thread Flavio Pompermaier
Here it is: https://issues.apache.org/jira/browse/FLINK-19969

Best,
Flavio

On Wed, Nov 4, 2020 at 11:08 AM Kostas Kloudas  wrote:

> Could you also post the ticket here @Flavio Pompermaier  and we will
> have a look before the upcoming release.
>
> Thanks,
> Kostas
>
> On Wed, Nov 4, 2020 at 10:58 AM Chesnay Schepler 
> wrote:
> >
> > Good find, this is an oversight in the CliFrontendParser; no help is
> > printed for the run-application action.
> > Can you file a ticket?
> >
> > On 11/4/2020 10:53 AM, Flavio Pompermaier wrote:
> > > Hello everybody,
> > > I was looking into the currently supported application modes when
> > > submitting a Flink job, so I tried to use the CLI help (I'm using Flink
> > > 1.11.0) but I can't find any help about the action "run-application" at
> > > the moment... am I wrong? Is there any JIRA to address this missing
> > > documentation?
> > >
> > > Best,
> > > Flavio
> >
> >


Re: LEGACY('STRUCTURED_TYPE' to pojo

2020-11-04 Thread Timo Walther

Hi Rex,

sorry for the late reply. POJOs will have much better support in the 
upcoming Flink versions because they have been fully integrated with the 
new table type system mentioned in FLIP-37 [1] (e.g. support for 
immutable POJOs and nested DataTypeHints etc).


For queries, and for scalar and table functions, you can already use full 
POJOs within the Table ecosystem.


However, the one missing piece is the new translation of POJOs from the 
Table API to the DataStream API. This will be fixed in FLIP-136 [2]. Until 
then I would recommend either using `Row` as the output of the Table API or 
using a scalar function beforehand that maps to the desired data 
structure.
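
For the `Row` route, a minimal sketch (assuming the StreamTableEnvironment 
`tEnv` and the Order/ProductItem case classes from your test, and a query 
result called `resultTable`; the names are illustrative):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

// resultTable has the schema
// [user: BIGINT, product: ROW<`name` STRING, `id` BIGINT>, amount: INT].
val orders: DataStream[Order] = tEnv
  .toRetractStream[Row](resultTable)
  .filter(_._1) // keep inserts / updates-after, drop retractions
  .map { case (_, row) =>
    val product = row.getField(1).asInstanceOf[Row]
    Order(
      row.getField(0).asInstanceOf[Long],
      ProductItem(
        product.getField(0).asInstanceOf[String],
        product.getField(1).asInstanceOf[Long]),
      row.getField(2).asInstanceOf[Int])
  }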


I hope this helps a bit.

Regards,
Timo

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-37%3A+Rework+of+the+Table+API+Type+System
[2] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API


On 02.11.20 21:44, Rex Fenley wrote:
My jobs normally use the Blink planner; I noticed with this test that this 
may not be the case.


On Mon, Nov 2, 2020 at 12:38 PM Rex Fenley wrote:


Flink 1.11.2 with Scala 2.12

Error:
[info] JobScalaTest:
[info] - dummy *** FAILED ***
[info]   org.apache.flink.table.api.ValidationException: Field types
of query result and registered TableSink  do not match.
[info] Query schema: [user: BIGINT, product: ROW<`name`
VARCHAR(2147483647), `id` BIGINT>, amount: INT]
[info] Sink schema: [user: BIGINT, product:
LEGACY('STRUCTURED_TYPE', 'ANY'),
amount: INT]
[info]   at

org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:103)
[info]   at

org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:260)
[info]   at

org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:163)
[info]   at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
[info]   at scala.collection.Iterator.foreach(Iterator.scala:943)
[info]   at scala.collection.Iterator.foreach$(Iterator.scala:943)
[info]   at
scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
[info]   at scala.collection.IterableLike.foreach(IterableLike.scala:74)
[info]   at
scala.collection.IterableLike.foreach$(IterableLike.scala:73)
[info]   at scala.collection.AbstractIterable.foreach(Iterable.scala:56)

Code:
import com.remind.graph.people.PeopleJobScala

import org.scalatest.funsuite._
import org.scalatest.BeforeAndAfter

import org.apache.flink.streaming.api.scala.{
DataStream,
StreamExecutionEnvironment
}
import org.apache.flink.streaming.util.TestStreamEnvironment
import org.apache.flink.table.runtime.util._
import org.apache.flink.test.util.AbstractTestBase
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.api.common.state.ListState
import org.apache.flink.runtime.state.FunctionInitializationContext
import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.runtime.state.FunctionSnapshotContext
import org.apache.flink.types.Row

import java.io.Serializable;
import java.sql.Timestamp;
import java.text.SimpleDateFormat
import java.util.concurrent.atomic.AtomicInteger
import java.{util => ju}

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.Try

case class Order(user: Long, product: ProductItem, amount: Int) {
  def this() {
    this(0, null, 0)
  }

  override def toString(): String = {
    return "Order{" +
      "user=" + user +
      ", product='" + product + '\'' +
      ", amount=" + amount +
      '}';
  }
}

case class ProductItem(name: String, id: Long) {
  def this() {
    this(null, 0)
  }

  override def toString(): String = {
    return "Product{" +
      "name='" + name + '\'' +
      ", id=" + id +
      '}';
  }
}

class JobScalaTest extends AnyFunSuite with BeforeAndAfter {
  var env: StreamExecutionEnvironment = _
  var tEnv: StreamTableEnvironment = _

  before {
    this.env = StreamExecutionEnvironment.getExecutionEnvironment
    this.env.setParallelism(2)
    this.env.getConfig.enableObjectReuse()
    val setting = EnvironmentSettings.newInstance().inStreamingMode().build()
    this.tEnv = StreamTableEnvironment.create(env, setting)
  }

  after {
    StreamTestSink.clear()
    // TestValuesTableFactory.clearAllData()
  }

  def dateFrom(stringDate: String): java.sql.Date = {
    val date = new SimpleDateFormat("dd/MM/")
      .parse(stringDate)
    return new java.sql.Da

Re: Flink kafka - Message Prioritization

2020-11-04 Thread Aljoscha Krettek

I'm afraid there's nothing in Flink that would make this possible right now.

Have you thought about whether this would be possible using the vanilla 
Kafka consumer APIs? I'm not sure it's possible to read messages 
with prioritization through their APIs.


Best,
Aljoscha

On 04.11.20 08:34, Robert Metzger wrote:

Hi Vignesh,

I'm adding Aljoscha to the thread; he might have an idea how to solve this
with the existing Flink APIs (the closest idea I had was the N-ary stream
operator, but I guess that doesn't support backpressuring individual
upstream operators -- side inputs would be needed for that?).

The only somewhat feasible idea I came up with, which only makes sense if
you don't need any exactly-once guarantees, is implementing your own Kafka
connector (or forking the existing Kafka connector in Flink, in which case
you could also get exactly-once).
In this custom Kafka connector, you could, conceptually, have two Kafka
consumers, each feeding messages into its own bounded queue, and a third
thread that always empties the queues in priority order.
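
A very rough sketch of a simplified variant of that idea (polling the priority
topic first instead of the queue/thread setup), assuming String records and
that `props` already contains bootstrap servers, group id, and string
deserializers; no checkpointing, no exactly-once:

import java.time.Duration
import java.util.{Collections, Properties}
import scala.collection.JavaConverters._
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.kafka.clients.consumer.KafkaConsumer

class PrioritizingKafkaSource(props: Properties, priorityTopic: String, normalTopic: String)
    extends SourceFunction[String] {

  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    // Two independent consumers, one per topic.
    val priorityConsumer = new KafkaConsumer[String, String](props)
    val normalConsumer = new KafkaConsumer[String, String](props)
    priorityConsumer.subscribe(Collections.singletonList(priorityTopic))
    normalConsumer.subscribe(Collections.singletonList(normalTopic))

    while (running) {
      // Always drain the priority topic first ...
      val priority = priorityConsumer.poll(Duration.ofMillis(10))
      priority.iterator().asScala.foreach(r => ctx.collect(r.value()))
      // ... and only read the normal topic when nothing urgent is pending.
      if (priority.isEmpty) {
        val normal = normalConsumer.poll(Duration.ofMillis(100))
        normal.iterator().asScala.foreach(r => ctx.collect(r.value()))
      }
    }
    priorityConsumer.close()
    normalConsumer.close()
  }

  override def cancel(): Unit = running = false
}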

Best,
Robert


On Thu, Oct 29, 2020 at 1:29 PM Vignesh Ramesh 
wrote:


Hi,

I have a Flink pipeline which reads from a Kafka topic, does a map
operation (builds an Elasticsearch model), and sinks it to Elasticsearch

*Pipeline-1:*

Flink-Kafka-Connector-Consumer(topic1) (parallelism 8) -> Map (parallelism
8) -> Flink-Es-connector-Sink(es1) (parallelism 8)

Now I want some messages to be prioritized (processed quickly, not
necessarily in any order). I am okay with creating a new topic and placing
the priority messages in it, or with partition-based buckets (e.g.
https://github.com/riferrei/bucket-priority-pattern; I don't think that's
possible with the Flink Kafka connector since partition assignment happens
inside FlinkKafkaConsumerBase).

*I tried the below solution:*

I created another topic (topic2 in which i placed the priority messages)
and with it a new Flink pipeline

*Pipeline-2:*

Flink-Kafka-Connector-Consumer(topic2) (parallelism 8) -> Map (parallelism
8) -> Flink-Es-connector-Sink(es1) (parallelism 8)

But the problem is, I want to consume topic2 as soon as possible; I can
accept delay/slowness on topic1 because of that. If there is no message in
topic2, then topic1 should be given more priority. But in the above setup
both pipelines are processed equally. Increasing the parallelism of
pipeline-2 to a big number doesn't help, because when there is no message
in topic2, topic1 is still very slow (the parallelism of topic2 is
wasted).

How can I achieve this using the Flink Kafka connector? Is it possible to
achieve it in any other way?


Regards,

Vignesh







Re: Missing help about run-application action in Flink CLI client

2020-11-04 Thread Kostas Kloudas
Could you also post the ticket here @Flavio Pompermaier  and we will
have a look before the upcoming release.

Thanks,
Kostas

On Wed, Nov 4, 2020 at 10:58 AM Chesnay Schepler  wrote:
>
> Good find, this is an oversight in the CliFrontendParser; no help is
> printed for the run-application action.
> Can you file a ticket?
>
> On 11/4/2020 10:53 AM, Flavio Pompermaier wrote:
> > Hello everybody,
> > I was looking into currently supported application-modes when
> > submitting a Flink job so I tried to use the CLI help (I'm using Flink
> > 1.11.0) but I can't find any help about the action "run-application" at
> > the moment...am I wrong? Is there any JIRA to address this missing
> > documentation?
> >
> > Best,
> > Flavio
>
>


Re: Does Flink operators synchronize states?

2020-11-04 Thread Yuta Morisawa

Hi Arvid,

Thank you for your detailed answer. After reading it, I finally realized 
that I did not understand well the difference between the micro-batch 
model and the continuous (one-by-one) processing model. I am familiar with 
the micro-batch model but not with the continuous one, so I will look for 
some documentation on it. Thank you again for your answer.


Regards,
Yuta

On 2020/11/02 1:07, Arvid Heise wrote:

Hi Yuta,

there are indeed a few important differences between Spark and Flink. 
However, please also note that different APIs behave differently on both 
systems, so it would be good if you could clarify what you are doing; then 
I can go into more detail.


As a starting point, you can always check the architecture overview page 
[1] of Flink.


Then keep in mind that Flink approaches the whole scheduling from a 
streaming perspective and Spark from a batch perspective. In Flink, most 
tasks are always running with a few exceptions (pure batch API = Spark 
default), whereas in Spark tasks are usually scheduled in waves with a 
few exceptions (continuous processing in structured streaming = Flink 
default).


Note that there is also quite a bit moving in both systems. In Flink, we 
try to get rid of the old batch subsystem and fully integrate it in 
streaming, such that the actual scheduling mode is determined more 
dynamically for parts of the whole application. Think of a job where you 
need to do some batch preprocessing to build up some dictionary and then 
use it to enrich streaming data. During next year, Flink should be able 
to fully exploit the data properties of streaming and batch tasks of the 
same application. In Spark, they also seem to work towards supporting 
more complex applications in continuous processing mode (so beyond the 
current embarrassing parallel operations), for which they may also need 
to revise their scheduling model.


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/flink-architecture.html


On Fri, Oct 30, 2020 at 10:05 AM Yuta Morisawa <yu-moris...@kddi-research.jp> wrote:


Hello,

I am wondering whether Flink operators synchronize their execution
states like Apache Spark does. In Apache Spark, the master decides
everything; for example, it schedules jobs and assigns tasks to
Executors so that each job is executed in a synchronized way. But Flink
looks different: it appears that each TaskManager is dedicated to
specific operators and executes tasks asynchronously. Is this
understanding correct?

In short, I want to know how Flink assigns tasks to TaskManagers and
how it manages them, because I think it is important for performance
tuning. Could you point me to any detailed documentation?

Regards,
Yuta
-- 






Re: Missing help about run-application action in Flink CLI client

2020-11-04 Thread Chesnay Schepler
Good find, this is an oversight in the CliFrontendParser; no help is 
printed for the run-application action.

Can you file a ticket?

On 11/4/2020 10:53 AM, Flavio Pompermaier wrote:

Hello everybody,
I was looking into currently supported application-modes when 
submitting a Flink job so I tried to use the CLI help (I'm using Flink 
1.11.0) but I can't find any help about the action "run-application" at 
the moment...am I wrong? Is there any JIRA to address this missing 
documentation?


Best,
Flavio





Missing help about run-application action in Flink CLI client

2020-11-04 Thread Flavio Pompermaier
Hello everybody,
I was looking into currently supported application-modes when submitting a
Flink job so I tried to use the CLI help (I'm using Flink 1.11.0) but I can't
find any help about the action "run-application" at the moment...am I wrong?
Is there any JIRA to address this missing documentation?

Best,
Flavio


Re: Multi-stream SQL-like processing

2020-11-04 Thread Jark Wu
Hi Krzysztof,

This is a very interesting idea.

I think SQL is not a suitable tool for this use case, because SQL is a
structured query language where the table schema is fixed and never
changes while the job is running.

However, I think this could be a configuration tool project on top of Flink
SQL. The configuration tool can dynamically generate all the queries
according to the config and submit them in one job.

For example, if the configuration says "synchronize from mysql address
'' to kafka broker ''",
then the generated Flink SQL would like:

CREATE TABLE db (
  `database_name` STRING,
  `table_name` STRING,
  `data` BYTES  // encodes all the columns value, can be a better structure
for performance
) WITH (
  connector = ...   // a new connector that scans all tables from the MySQL address
  url = 'jdbc:mysql://localhost:3306/flink-test'
);

// the configuration tool will generate multiple INSERT INTO according to
how many tables in the DB
INSERT INTO kafka_table1
SELECT parse_data(table_name, data)  // the parse_data UDF will infer the schema from the
FROM db WHERE table_name = 'table1'  // database or a schema registry and deserialize the
                                     // data into columns with different types.

INSERT INTO kafka_table2
SELECT parse_data(table_name, data)
FROM db WHERE table_name = 'table2'

...

The configuration tool can use `StatementSet` to package all the INSERT
INTO queries together and submit them in one job.
With the `StatementSet`, the job will share the common source task, so the
tables in MySQL are only read once.
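
A minimal, self-contained sketch of the StatementSet mechanics (using the
built-in datagen/blackhole connectors as stand-ins, since the MySQL-scanning
connector and the parse_data UDF described above do not exist yet):

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object StatementSetSketch {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tEnv = TableEnvironment.create(settings)

    // Stand-in source and sinks so the sketch runs as-is; the configuration
    // tool would generate the real MySQL/Kafka DDL instead.
    tEnv.executeSql(
      "CREATE TABLE db (table_name STRING, `data` STRING) WITH ('connector' = 'datagen')")
    tEnv.executeSql(
      "CREATE TABLE kafka_table1 (`data` STRING) WITH ('connector' = 'blackhole')")
    tEnv.executeSql(
      "CREATE TABLE kafka_table2 (`data` STRING) WITH ('connector' = 'blackhole')")

    // One INSERT INTO per target table, packaged into a single StatementSet,
    // so they are planned together, share the common source, and run as one job.
    val stmtSet = tEnv.createStatementSet()
    stmtSet.addInsertSql(
      "INSERT INTO kafka_table1 SELECT `data` FROM db WHERE table_name = 'table1'")
    stmtSet.addInsertSql(
      "INSERT INTO kafka_table2 SELECT `data` FROM db WHERE table_name = 'table2'")
    stmtSet.execute()
  }
}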

Best,
Jark










On Tue, 3 Nov 2020 at 14:00, Krzysztof Zarzycki 
wrote:

> Hi community, I would like to confront one idea with you.
>
> Hi community, I would like to confront one idea with you.
>
> I was thinking that Flink SQL could be Flink's answer to Kafka Connect
> (more powerful, with advantages like being decoupled from Kafka). Flink SQL
> would be the configuration language for Flink "connectors", which sounds great!
> But one thing does not allow me to implement this idea: There is no
> possibility to run SQL-based processing over multiple similar inputs and
> produce multiple similar outputs (counted in tens or hundreds).
> As a problem example that I need to solve, consider that I have a hundred
> of Kafka topics, with similar data in each. And I would like to sink them
> to a SQL database. With Kafka connect, I can use a single connector with
> JDBC sink, that properly configured will dump each topic to a separate
> table properly keeping the schema (based on what is in the schema
> registry).
> With Flink SQL I would need to run a query per topic/table, I believe.
> Similarly with sourcing data. There is this cool project
> flink-cdc-connectors [1] that leverages Debezium in Flink to apply CDC on
> SQL database, but when used with SQL, it can only pull in one table per
> query.
> These cases can be solved using the DataStream API. With it I can code
> pulling in/pushing out multiple table streams. But then "the configuration"
> is a much bigger effort, because it requires writing Java code. And that is
> a few-hours versus few-days difference, an enormous one.
>
> So in the end some questions:
> * Do you know how SQL could be extended to support handling such cases
> elegantly, with a single job in the end?
> * Or do you believe SQL should not be used for that case, and we should
> come up with a different tool and configuration language, i.e. something
> like Kafka Connect?
> * Do you know of any other project that implements this idea?
>
> I definitely believe that this is a great use case for Flink to be an
> easy-to-use ingress from/egress to Kafka/HDFS/whatever system, therefore
> there is a need for a solution for my case.
>
> Thanks for answering!
> Krzysztof
>
> [1] https://github.com/ververica/flink-cdc-connectors
>