Re: Flink State Processor API Example - Java

2021-06-24 Thread Guowei Ma
Hi Sandeep

What I understand is that you want to manipulate the state. So I think you
could read the state with the old schema first and then write it out with the
new schema, instead of using the new schema to read data in the old schema format.
In addition, I would like to ask: do you actually want "State Schema
Evolution"? Flink currently supports schema evolution for Avro and POJO types [1],
in which case you don't need to do this manually.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/#supported-data-types-for-schema-evolution
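
For example, a minimal sketch of that read-then-rewrite approach with the State
Processor API could look like the following (the uid "my-operator-uid", the state
name "value-state" and the String/Long types are placeholders for your own job;
MapState can be read and bootstrapped the same way with a MapStateDescriptor):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class StateMigrationJob {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();

        // 1) Read the existing keyed state from the savepoint / retained checkpoint.
        ExistingSavepoint savepoint = Savepoint.load(
                bEnv, "hdfs:///flink/savepoints/savepoint-xyz", new MemoryStateBackend());
        DataSet<KeyedValue> oldState =
                savepoint.readKeyedState("my-operator-uid", new Reader());

        // 2) Transform it here (e.g. map the old schema onto the new one).
        DataSet<KeyedValue> newState =
                oldState.map((MapFunction<KeyedValue, KeyedValue>) v -> v);

        // 3) Bootstrap the transformed state and write a new savepoint.
        BootstrapTransformation<KeyedValue> transformation = OperatorTransformation
                .bootstrapWith(newState)
                .keyBy(v -> v.key)
                .transform(new Writer());
        savepoint
                .removeOperator("my-operator-uid")
                .withOperator("my-operator-uid", transformation)
                .write("hdfs:///flink/savepoints/savepoint-new");
        bEnv.execute("migrate state");
    }

    /** Simple carrier for one key and its state value. */
    public static class KeyedValue {
        public String key;
        public Long value;
    }

    /** Reads the ValueState named "value-state" for every key. */
    public static class Reader extends KeyedStateReaderFunction<String, KeyedValue> {
        private ValueState<Long> state;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("value-state", Long.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<KeyedValue> out) throws Exception {
            KeyedValue kv = new KeyedValue();
            kv.key = key;
            kv.value = state.value();
            out.collect(kv);
        }
    }

    /** Writes the (possibly migrated) values back into the same state name. */
    public static class Writer extends KeyedStateBootstrapFunction<String, KeyedValue> {
        private ValueState<Long> state;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("value-state", Long.class));
        }

        @Override
        public void processElement(KeyedValue value, Context ctx) throws Exception {
            state.update(value.value);
        }
    }
}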

Best,
Guowei


On Fri, Jun 25, 2021 at 3:04 AM Sandeep khanzode 
wrote:

> Hello,
>
> 1.] Can someone please share a working example of how to read
> ValueState and MapState from a checkpoint and update it? I
> tried to assemble a working Java example but there are only bits and pieces
> of info around.
>
> 2.] I am using Avro 1.7.7 with Flink for state entities, since versions
> beyond Avro 1.7.7 add a serialVersionUID and then I cannot replace the
> class with a new Avro schema seamlessly. If I update the Avro schema and
> the Avro Maven plugin runs, a new class with a new serialVersionUID is
> created, and that cannot be replaced in the state: the Java exception
> states that the local and state copies are different.  Any example would be
> really appreciated.
>
> Thanks,
> Sandip


Re: Flink checkpoint periodically fail

2021-06-24 Thread Guowei Ma
Hi Qihua
It seems from the second picture that the job fails because of the checkpoint
timeout (10 min). The checkpoint fails because one of your own
custom sources did not acknowledge the checkpoint.
So I think you could add some logging to your source to figure out what is
happening at that moment.
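For example, a small illustrative sketch (not your actual source) of where such
logging could go; if the snapshotState() line never appears in the TaskManager log
for a given checkpoint id, that source is the one blocking the checkpoint:

import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingSource implements SourceFunction<String>, CheckpointedFunction {

    private static final Logger LOG = LoggerFactory.getLogger(LoggingSource.class);
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            String record = fetchNextRecord();        // hypothetical fetch, may block
            synchronized (ctx.getCheckpointLock()) {  // emit under the checkpoint lock
                ctx.collect(record);
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) {
        // If this never shows up for a checkpoint id, the source is blocking the checkpoint.
        LOG.info("Taking snapshot for checkpoint {}", context.getCheckpointId());
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        LOG.info("Source (re)initialized, restored={}", context.isRestored());
    }

    @Override
    public void cancel() {
        running = false;
    }

    private String fetchNextRecord() {
        return "record";                              // placeholder for the real fetch logic
    }
}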
Best,
Guowei


On Fri, Jun 25, 2021 at 6:21 AM Qihua Yang  wrote:

> Hi,
> We are using Flink to consume data from Kafka topics and push it to an
> Elasticsearch cluster. We ran into an issue: checkpoints succeed 9 times and fail 2
> times. Those failures cause the job manager to restart. That pattern
> repeats every 20 ~ 25 minutes.
> The Flink job has 72 subtasks. For every failed checkpoint, there are a
> few subtasks that didn't acknowledge the checkpoint.
> Flink pod cpu usage and memory usage are pretty low.
> Elastic search node cpu and memory usage are also pretty low.
>
> Does anyone know why? And how to fix it?
> Attached are the graphs
>
> Thanks,
> Qihua
>


Re: Flink checkpoint periodically fail

2021-06-24 Thread JING ZHANG
Hi Qihua,
From the second picture you provided, checkpoint 53 timed out because of the
subtask whose id is 6.
Would you please provide the taskmanager log of subtask 6? We could try to
find the specific reason for the checkpoint 53 failure.
Besides, you said the checkpoint failure appears every 20~25 minutes; is
every failure due to a timeout?

Best regards,
JING ZHANG

Qihua Yang  wrote on Fri, Jun 25, 2021, at 6:21 AM:

> Hi,
> We are using Flink to consume data from Kafka topics and push it to an
> Elasticsearch cluster. We ran into an issue: checkpoints succeed 9 times and fail 2
> times. Those failures cause the job manager to restart. That pattern
> repeats every 20 ~ 25 minutes.
> The Flink job has 72 subtasks. For every failed checkpoint, there are a
> few subtasks that didn't acknowledge the checkpoint.
> Flink pod cpu usage and memory usage are pretty low.
> Elastic search node cpu and memory usage are also pretty low.
>
> Does anyone know why? And how to fix it?
> Attached are the graphs
>
> Thanks,
> Qihua
>


Re: multiple jobs in same flink app

2021-06-24 Thread Yang Wang
It is a big challenge for us if we want to support multiple jobs in the
same application.
For example:
1. If some jobs have finished, we should not run them again when the
JobManager fails over. That means we need to store the finished jobs in the HA
services.
2. Some jobs only run under specific conditions, which makes the recovery
more complicated, e.g.:

if (someCondition) {
  env.executeAsync("job1");
}
env.executeAsync("job2");



Best,
Yang

Qihua Yang  wrote on Fri, Jun 25, 2021, at 5:13 AM:

> Hi,
>
> We are using HA mode, so it looks like multiple jobs are not an option for us.
> That makes sense! Thanks for your help, guys!
>
> Thanks,
> Qihua
>
>
> On Wed, Jun 23, 2021 at 7:28 PM Yang Wang  wrote:
>
>> Robert is right. We can only support single-job submission in
>> application mode when HA mode is enabled.
>>
>> This is a known limitation of current application mode implementation.
>>
>> Best,
>> Yang
>>
>> Robert Metzger  wrote on Thu, Jun 24, 2021, at 3:54 AM:
>>
>>> Thanks a lot for checking again. I just started Flink in Application
>>> mode with a jar that contains two "executeAsync" submissions, and indeed
>>> two jobs are running.
>>>
>>> I think the problem in your case is that you are using High Availability
>>> (I guess, because there are log statements from the
>>> ZooKeeperLeaderRetrievalService). As you can see from the documentation [1]:
>>>
>>> The Application Mode allows for multi-execute() applications but
 High-Availability is not supported in these cases. High-Availability in
 Application Mode is only supported for single-execute() applications.
>>>
>>>
>>> See also: https://issues.apache.org/jira/browse/FLINK-19358
>>>
>>> Sorry again that I gave you invalid information in my first answer.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/overview/
>>>
>>>
>>>
>>>
>>> On Wed, Jun 23, 2021 at 8:50 AM Qihua Yang  wrote:
>>>
 Hi Robert,

 But I saw the Flink docs say that application mode can run multiple jobs? Or did I
 misunderstand it?

 https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/



 *Compared to the Per-Job mode, the Application Mode allows the submission 
 of applications consisting of multiple jobs. The order of job execution is 
 not affected by the deployment mode but by the call used to launch the 
 job. Using execute(), which is blocking, establishes an order and it will 
 lead to the execution of the "next" job being postponed until "this" job 
 finishes. Using executeAsync(), which is non-blocking, will lead to the 
 "next" job starting before "this" job finishes.*


 On Tue, Jun 22, 2021 at 11:13 PM Robert Metzger 
 wrote:

> Hi Qihua,
>
> Application Mode is meant for executing one job at a time, not
> multiple jobs on the same JobManager.
> If you want to do that, you need to use session mode, which allows
> managing multiple jobs on the same JobManager.
>
> On Tue, Jun 22, 2021 at 10:43 PM Qihua Yang  wrote:
>
>> Hi Arvid,
>>
>> Do you know if I can start multiple jobs for a single flink
>> application?
>>
>> Thanks,
>> Qihua
>>
>>
>> On Thu, Jun 17, 2021 at 12:11 PM Qihua Yang 
>> wrote:
>>
>>> Hi,
>>>
>>> I am using application mode.
>>>
>>> Thanks,
>>> Qihua
>>>
>>> On Thu, Jun 17, 2021 at 12:09 PM Arvid Heise 
>>> wrote:
>>>
 Hi Qihua,

 Which execution mode are you using?

 On Thu, Jun 17, 2021 at 6:48 PM Qihua Yang 
 wrote:

> Hi,
>
> Thank you for your reply. What I want is a Flink app with multiple
> jobs, each job managing one stream. Currently our Flink app has only 1
> job that
> manages multiple streams.
> I did try env.executeAsync(), but it still doesn't work. From the
> log, when the second executeAsync() was called, it shows " *Job
>  was recovered successfully.*"
> It looks like the second executeAsync() recovered the first job instead of
> starting a second job.
>
> Thanks,
> Qihua
>
>
> On Thu, Jun 17, 2021 at 7:43 AM Arvid Heise 
> wrote:
>
>> Hi,
>>
>> env.execute("Job 1"); is a blocking call. You either have to use
>> executeAsync or use a separate thread to submit the second job. If 
>> Job 1
>> finishes then this would also work by having sequential execution.
>>
>> However, I think what you actually want to do is to use the same
>> env with 2 topologies and 1 single execute like this.
>>
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> DataStream stream1 = env.addSource(new
>> SourceFunction());
>> stream1.addSink(new 

Re: PyFlink kafka producer topic override

2021-06-24 Thread Curt Buechter
Thanks. I will reconsider my architecture.

On Thu, Jun 24, 2021 at 1:37 AM Arvid Heise  wrote:

> Hi,
>
> getTargetTopic can really be used as Curt needs it. So it would be good to
> add to PyFlink as well.
>
> However, I'm a bit skeptical that Kafka can really handle that model well.
> It's usually encouraged to use rather fewer, larger topics and you'd rather
> use partitions here instead of topics. There is a huge overhead on managing
> topics and even some hard limits that can be reached quicker than it
> initially appears.
> Of course, if you want to use ACLs, afaik you cannot define them on
> partitions, so you probably do not have any choice but to do it your way
> (or consider an alternative architecture).
> It's probably also fine if you just have THE one table that you need to
> divide, but it's not a pattern that scales to more tables or to an
> ever-growing number of tenants. Partitions scale much much better.
>
> Also from painful past experiences, you should absolutely create all
> topics beforehand on your production env and really use getTargetTopic just
> for routing.
>
> On Thu, Jun 24, 2021 at 8:10 AM Dian Fu  wrote:
>
>> OK, got it. Then it seems that split streams is also not quite suitable
>> to address your requirements as you still need to iterate over each of the
>> side output corresponding to each topic.
>>
>> Regarding to getTargetTopic [1],  per my understanding, it’s not designed
>> to dynamically assign the topic for an element according to the
>> documentation. However, I’m not 100% sure about this and maybe I missed
>> something.
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaContextAware.java#L48
>>
>> Regards,
>> Dian
>>
>> On June 24, 2021, at 11:45 AM, Curt Buechter  wrote:
>>
>> Hi Dian,
>> Thanks for the reply.
>> I don't think a filter function makes sense here. I have 2,000 tenants in
>> the source database, and I want all records for a single tenant in a
>> tenant-specific topic. So, with a filter function, if I understand it
>> correctly, I would need 2,000 different filters, which isn't very practical.
>>
>> An example:
>> source_topic(tenant_id, first_name, last_name)
>>
>> destination:
>> tenant1.sink_topic (first_name, last_name)
>> tenant2.sink_topic (first_name, last_name)
>> ...
>> tenant2000.sink_topic (first_name, last_name)
>>
>> On 2021/06/24 03:18:36, Dian Fu  wrote:
>> > You are right that split is still not supported. Does it make sense for
>> you to split the stream using a filter function? There is some overhead
>> compared to the built-in stream.split as you need to provide a filter function
>> for each sub-stream, and so a record will be evaluated multiple times.>
>> >
>> > > On June 24, 2021, at 3:08 AM, Curt Buechter  wrote:>
>> > > >
>> > > Hi,>
>> > > New PyFlink user here. Loving it so far. The first major problem I've
>> run into is that I cannot create a Kafka Producer with dynamic topics. I
>> see that this has been available for quite some time in Java with Keyed
>> Serialization using the getTargetTopic method. Another way to do this in
>> Java may be with stream.split(), and adding a different sink for the split
>> streams. Stream splitting is also not available in PyFlink.>
>> > > Am I missing anything? Has anyone implemented this before in PyFlink,
>> or know of a way to make it happen?>
>> > > The use case here is that I'm using a CDC Debezium connector to
>> populate kafka topics from a multi-tenant database, and I'm trying to use
>> PyFlink to split the records into a different topic for each tenant.>
>> > > >
>> > > Thanks>
>> >
>> >
>>
>>
>>


Re: multiple jobs in same flink app

2021-06-24 Thread Qihua Yang
Hi,

We are using HA mode, so it looks like multiple jobs are not an option for us.
That makes sense! Thanks for your help, guys!

Thanks,
Qihua


On Wed, Jun 23, 2021 at 7:28 PM Yang Wang  wrote:

> Robert is right. We can only support single-job submission in
> application mode when HA mode is enabled.
>
> This is a known limitation of current application mode implementation.
>
> Best,
> Yang
>
> Robert Metzger  wrote on Thu, Jun 24, 2021, at 3:54 AM:
>
>> Thanks a lot for checking again. I just started Flink in Application mode
>> with a jar that contains two "executeAsync" submissions, and indeed two
>> jobs are running.
>>
>> I think the problem in your case is that you are using High Availability
>> (I guess, because there are log statements from the
>> ZooKeeperLeaderRetrievalService). As you can see from the documentation [1]:
>>
>> The Application Mode allows for multi-execute() applications but
>>> High-Availability is not supported in these cases. High-Availability in
>>> Application Mode is only supported for single-execute() applications.
>>
>>
>> See also: https://issues.apache.org/jira/browse/FLINK-19358
>>
>> Sorry again that I gave you invalid information in my first answer.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/overview/
>>
>>
>>
>>
>> On Wed, Jun 23, 2021 at 8:50 AM Qihua Yang  wrote:
>>
>>> Hi Robert,
>>>
>>> But I saw the Flink docs say that application mode can run multiple jobs? Or did I
>>> misunderstand it?
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/
>>>
>>>
>>>
>>> *Compared to the Per-Job mode, the Application Mode allows the submission 
>>> of applications consisting of multiple jobs. The order of job execution is 
>>> not affected by the deployment mode but by the call used to launch the job. 
>>> Using execute(), which is blocking, establishes an order and it will lead 
>>> to the execution of the "next" job being postponed until "this" job 
>>> finishes. Using executeAsync(), which is non-blocking, will lead to the 
>>> "next" job starting before "this" job finishes.*
>>>
>>>
>>> On Tue, Jun 22, 2021 at 11:13 PM Robert Metzger 
>>> wrote:
>>>
 Hi Qihua,

 Application Mode is meant for executing one job at a time, not multiple
 jobs on the same JobManager.
 If you want to do that, you need to use session mode, which allows
 managing multiple jobs on the same JobManager.

 On Tue, Jun 22, 2021 at 10:43 PM Qihua Yang  wrote:

> Hi Arvid,
>
> Do you know if I can start multiple jobs for a single flink
> application?
>
> Thanks,
> Qihua
>
>
> On Thu, Jun 17, 2021 at 12:11 PM Qihua Yang  wrote:
>
>> Hi,
>>
>> I am using application mode.
>>
>> Thanks,
>> Qihua
>>
>> On Thu, Jun 17, 2021 at 12:09 PM Arvid Heise 
>> wrote:
>>
>>> Hi Qihua,
>>>
>>> Which execution mode are you using?
>>>
>>> On Thu, Jun 17, 2021 at 6:48 PM Qihua Yang 
>>> wrote:
>>>
 Hi,

 Thank you for your reply. What I want is a Flink app with multiple
 jobs, each job managing one stream. Currently our Flink app has only 1 job
 that
 manages multiple streams.
 I did try env.executeAsync(), but it still doesn't work. From the
 log, when the second executeAsync() was called, it shows " *Job
  was recovered successfully.*"
 It looks like the second executeAsync() recovered the first job instead of
 starting a second job.

 Thanks,
 Qihua


 On Thu, Jun 17, 2021 at 7:43 AM Arvid Heise 
 wrote:

> Hi,
>
> env.execute("Job 1"); is a blocking call. You either have to use
> executeAsync or use a separate thread to submit the second job. If 
> Job 1
> finishes then this would also work by having sequential execution.
>
> However, I think what you actually want to do is to use the same
> env with 2 topologies and 1 single execute like this.
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream stream1 = env.addSource(new
> SourceFunction());
> stream1.addSink(new FlinkKafkaProducer());
> DataStream stream2 = env.addSource(new
> SourceFunction());
> stream2.addSink(new DiscardingSink<>());
> env.execute("Job 1+2");
>
> On Wed, Jun 16, 2021 at 6:04 PM Qihua Yang 
> wrote:
>
>> Hi,
>> Does anyone know how to run multiple jobs in same flink
>> application?
>> I did a simple test.  First job was started. I did see the log
>> message, but I didn't see the second job was started, even I saw the 
>> log
>> message.
>>
>> public void testJobs() throws Exception {

upgrade kafka-schema-registry-client to 6.1.2 for Flink-avro-confluent-registry

2021-06-24 Thread Lian Jiang
Hi,

I am using ConfluentRegistryAvroSerializationSchema and am blocked by a bug
mentioned in https://github.com/confluentinc/schema-registry/issues/1352
whose final fix was done in
https://github.com/confluentinc/schema-registry/pull/1839. I did not find
any easy workaround.

Flink-avro-confluent-registry (
https://mvnrepository.com/artifact/org.apache.flink/flink-avro-confluent-registry)
still uses kafka-schema-registry-client 5.5.2,

which does not have this fix. Could the next Flink release upgrade this dependency
to 6.1.2, which has the fix? In the meantime, does the Flink community have any
recommended workaround before the dependency upgrade? Thanks for any help!


Flink State Processor API Example - Java

2021-06-24 Thread Sandeep khanzode
Hello,

1.] Can someone please share a working example of how to read ValueState
and MapState from a checkpoint and update it? I tried to assemble a
working Java example but there are only bits and pieces of info around.

2.] I am using Avro 1.7.7 with Flink for state entities, since versions beyond
Avro 1.7.7 add a serialVersionUID and then I cannot replace the class with a
new Avro schema seamlessly. If I update the Avro schema and the Avro Maven
plugin runs, a new class with a new serialVersionUID is generated, and that cannot
be replaced in the state: the Java exception states that the local and state
copies are different.  Any example would be really appreciated.

Thanks,
Sandip

Re: "Legacy Source Thread" line in logs

2021-06-24 Thread Debraj Manna
Thanks Fabian again for the clarification.

On Thu, Jun 24, 2021 at 8:16 PM Fabian Paul 
wrote:

> Hi Debraj,
>
> Sorry for the confusion: the FlinkKafkaConsumer is the old source, and the
> overhauled one can be found here [1].
> You would need to replace the FlinkKafkaConsumer with the KafkaSource in
> order not to see the message anymore.
>
> Best
> Fabian
>
>
> [1]
> https://github.com/apache/flink/blob/2bd8fab01d2aba99a5f690d051e817d10d2c6f24/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java#L75
>
>


Re: Task manager got stuck not restarting due to OOM: Metaspace

2021-06-24 Thread Fritz Budiyanto
I found this configuration, which controls whether the JVM exits on OOM. Let me try it out:

taskmanager.jvm-exit-on-oom: true
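
The error below also mentions taskmanager.memory.jvm-metaspace.size, so a possible
flink-conf.yaml combination could be (the 512m value is only an example, not a
recommendation):

taskmanager.jvm-exit-on-oom: true
taskmanager.memory.jvm-metaspace.size: 512m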


> On Jun 24, 2021, at 8:07 AM, Fritz Budiyanto  wrote:
> 
> Hi,
> 
> 
> I have a job which kept on restarting due to a bug, and it brought down the
> task manager with it due to an OOM: Metaspace error. Please ignore the memory leak
> for a moment; the problem here is that the task manager does not restart and hangs,
> which reduces the overall slot capacity. We are running Flink in Kubernetes,
> and a stuck task manager is a problem as it reduces cluster capacity.
> Any idea how to make the task manager restart upon OOM? Here is the log of the task
> manager:
> 
> 
> 2021-06-24 08:59:30,437 INFO  org.apache.kafka.clients.producer.KafkaProducer 
>  [] - [Producer clientId=producer-2] Closing the Kafka producer 
> with timeoutMillis = 0 ms.
> 2021-06-24 08:59:30,437 INFO  org.apache.kafka.clients.producer.KafkaProducer 
>  [] - [Producer clientId=producer-2] Proceeding to force close 
> the producer since pending requests could not be completed within timeout 0 
> ms.
> 2021-06-24 08:59:32,200 ERROR 
> org.apache.flink.runtime.util.FatalExitExceptionHandler  [] - FATAL: 
> Thread 'AsyncOperations-thread-8' produced an uncaught exception. Stopping 
> the process...
> java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error has 
> occurred. This can mean two things: either the job requires a larger size of 
> JVM metaspace to load classes or there is a class loading leak. In the first 
> case 'taskmanager.memory.jvm-metaspace.size' configuration option should be 
> increased. If the error persists (usually in cluster after several job 
> (re-)submissions) then there is probably a class loading leak in user code or 
> some of its dependencies which has to be investigated and fixed. The task 
> executor has to be shutdown...
> 2021-06-24 08:59:41,759 INFO  
> org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager [] - 
> Shutting down TaskExecutorLocalStateStoresManager.
> 2021-06-24 08:59:46,560 INFO  
> org.apache.flink.runtime.blob.TransientBlobCache [] - Shutting 
> down BLOB cache
> 2021-06-24 08:59:45,414 INFO  
> org.apache.flink.runtime.blob.PermanentBlobCache [] - Shutting 
> down BLOB cache
> 2021-06-24 08:59:59,383 INFO  org.apache.flink.runtime.filecache.FileCache
>  [] - removed file cache directory 
> /tmp/flink-dist-cache-bb7818d6-a691-4121-9fdb-670b7d4182c5
> 
> —
> Fritz
> 



Task manager got stuck not restarting due to OOM: Metaspace

2021-06-24 Thread Fritz Budiyanto
Hi,


I have a job which kept on restarting due to a bug, and it brought down the
task manager with it due to an OOM: Metaspace error. Please ignore the memory leak for a
moment; the problem here is that the task manager does not restart and hangs, which
reduces the overall slot capacity. We are running Flink in Kubernetes, and a stuck task
manager is a problem as it reduces cluster capacity.
Any idea how to make the task manager restart upon OOM? Here is the log of the task
manager:


2021-06-24 08:59:30,437 INFO  org.apache.kafka.clients.producer.KafkaProducer   
   [] - [Producer clientId=producer-2] Closing the Kafka producer with 
timeoutMillis = 0 ms.
2021-06-24 08:59:30,437 INFO  org.apache.kafka.clients.producer.KafkaProducer   
   [] - [Producer clientId=producer-2] Proceeding to force close the 
producer since pending requests could not be completed within timeout 0 ms.
2021-06-24 08:59:32,200 ERROR 
org.apache.flink.runtime.util.FatalExitExceptionHandler  [] - FATAL: Thread 
'AsyncOperations-thread-8' produced an uncaught exception. Stopping the 
process...
java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error has 
occurred. This can mean two things: either the job requires a larger size of 
JVM metaspace to load classes or there is a class loading leak. In the first 
case 'taskmanager.memory.jvm-metaspace.size' configuration option should be 
increased. If the error persists (usually in cluster after several job 
(re-)submissions) then there is probably a class loading leak in user code or 
some of its dependencies which has to be investigated and fixed. The task 
executor has to be shutdown...
2021-06-24 08:59:41,759 INFO  
org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager [] - 
Shutting down TaskExecutorLocalStateStoresManager.
2021-06-24 08:59:46,560 INFO  org.apache.flink.runtime.blob.TransientBlobCache  
   [] - Shutting down BLOB cache
2021-06-24 08:59:45,414 INFO  org.apache.flink.runtime.blob.PermanentBlobCache  
   [] - Shutting down BLOB cache
2021-06-24 08:59:59,383 INFO  org.apache.flink.runtime.filecache.FileCache  
   [] - removed file cache directory 
/tmp/flink-dist-cache-bb7818d6-a691-4121-9fdb-670b7d4182c5

—
Fritz



Re: "Legacy Source Thread" line in logs

2021-06-24 Thread Fabian Paul
Hi Debraj,

Sorry for the confusion: the FlinkKafkaConsumer is the old source, and the
overhauled one can be found here [1].
You would need to replace the FlinkKafkaConsumer with the KafkaSource in order not
to see the message anymore.

Best
Fabian


[1] 
https://github.com/apache/flink/blob/2bd8fab01d2aba99a5f690d051e817d10d2c6f24/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java#L75
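
A rough sketch of what the replacement could look like with the new source
(Flink 1.13-style builder; the broker address, topic and group id are placeholders):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker-1:9092")
                .setTopics("my-topic")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // fromSource() uses the new unified source interface, so no "Legacy Source Thread" appears.
        DataStream<String> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "KafkaSource");
        stream.print();
        env.execute("KafkaSource example");
    }
}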



Re: "Legacy Source Thread" line in logs

2021-06-24 Thread Debraj Manna
Thanks Fabian for replying. But I am using KafkaSource only.

The code is something like below.

class MetricSource {
  final Set metricSdms = new HashSet();
  ...
   env.addSource(MetricKafkaSourceFactory.createConsumer(jobParams))

.name(MetricSource.class.getSimpleName())
.uid(MetricSource.class.getSimpleName())
.filter(sdm -> metricSdms.contains(sdm.getType()));

}

class MetricKafkaSourceFactory {
  public static FlinkKafkaConsumer
createConsumer(final Configuration jobParams) {
   ...
   return new FlinkKafkaConsumer<>(topic, new DeserializationSchema(),
props);
   }
}


On Wed, Jun 23, 2021 at 7:31 PM Fabian Paul 
wrote:

> Hi Debraj,
>
> By "Legacy Source Thread" we refer to all sources which do not implement the
> new interface yet [1]. Currently only the Hive, Kafka and File sources
> have been migrated. In general, there is no severe downside to using the
> older sources, but in the future we plan to only implement sources based on
> the new operator model.
>
> Best,
> Fabian
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>
>


Re: Re: Flink SQL writing to Hive issue

2021-06-24 Thread Geoff nie
Thank you very much! I submitted it via the sql-client; after modifying the configuration file it was submitted successfully. The partition file under the Hive table is named as follows:
part-f3fa374b-c563-49c8-bd7a-b3bd7a5fb66d-0-2


I still have two questions:
1. I created the Kafka stream table as below. Querying
kafka_table via flink-sql does return data,
but there are no files on HDFS. Why is
that?
2. Data has been written into hive_table successfully as above, but why can't flink-sql or Hive read any data from the Hive table? SELECT * FROM
hive_table WHERE dt='2021-06-21' and hr='18';
SET table.sql-dialect=default;
CREATE TABLE kafka_table (
  user_id STRING,
  order_amount DOUBLE,
  log_ts TIMESTAMP(3),
  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (
'connector'='kafka', 
'topic'='t_kafka_03', 
'scan.startup.mode'='earliest-offset', 
'properties.bootstrap.servers'='192.168.1.*:19092,192.168.1.*:19092,192.168.1.*:19092',
 
'properties.group.id' = 'testGroup10', 
'format'='json' 
);




Please help take a look. Many thanks.

















On 2021-06-24 16:12:35, "杨光跃"  wrote:
>
>
>Checkpointing: if you deploy as a jar, you can simply enable it in the code. If you submit SQL with the sql-client, you can add the following to the configuration file
>sql-client-defaults.yaml:
>configuration:
>  execution.checkpointing.interval: 1000
>杨光跃
>yangguangyuem...@163.com
>On June 24, 2021, 16:09, Geoff nie wrote:
>Thanks a lot for the reply, but after thinking it over carefully I still don't understand. Could you say specifically where to configure the parameter? Thanks!
>On 2021-06-24 14:47:24, "杨光跃" wrote:
>Partition commit requires checkpointing to be enabled; you need to configure it.
>
>
>杨光跃
>yangguangyuem...@163.com
>On June 24, 2021, 14:44, Geoff nie wrote:
>Hello! I have also run into this problem, which is similar to the issue below. Has it been solved? Many thanks.
>
>On 2021-02-14 10:43:33, "潘永克" <13311533...@163.com> wrote:
>
> Forwarded message 
>From: "潘永克" <13311533...@163.com>
>Date: 2021-02-11 11:12:39
>To: d...@flink.apache.org
>Subject: Flink SQL writing to Hive issue
>
>I'd like to ask about a Flink problem. Flink SQL can write data to a Hive table, but the data in the Hive table only ends up in
>".part-inprogress"-like files. Flink 1.12.0
>compiled against CDH 6.2.0, Hive version 2.1.1, Hadoop 3.0.0. Screenshots of the problem are below:
>Create the Hive table
>SET table.sql-dialect=hive;
>CREATE TABLE hive_table (
>user_id STRING,
>order_amount DOUBLE
>) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
>'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
>'sink.partition-commit.trigger'='partition-time',
>'sink.partition-commit.delay'='1 min',
>'sink.partition-commit.policy.kind'='metastore,success-file'
>);
>Insert data
>INSERT INTO TABLE hive_table
>SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'),
>DATE_FORMAT(log_ts, 'HH')
>FROM kafka_table;
>
>
>The files never get finalized; they always remain ".part-inprogress..." files.
>
>
>
>
>
>
>
>
>
>
>
>
>
>


Hadoop3 with Flink

2021-06-24 Thread V N, Suchithra (Nokia - IN/Bangalore)
Hello,

We are using Apache Flink 1.12.3 and are planning to use Hadoop 3. Could
you please suggest how to use Hadoop 3 with the Flink distribution?

Regards,
Suchithra




Re: How to make onTimer() trigger on a CoProcessFunction after a failure?

2021-06-24 Thread Felipe Gutierrez
So, just an update.

When I used this code (my stateful watermark) in the original application,
it seems that I can recover the latest watermark and then process the
join with the events that were stuck on it.
I don't even have to create MyCoProcessFunction to implement a low-level
join. The available .coGroup(MyCoGroupFunction) works like a charm.

Thank you again for the clarifications!
Felipe
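
For anyone reading this later, a condensed, hedged sketch of the stateful-watermark
idea (the state name and the pass-through behaviour are illustrative; see the
WatermarkStreamOperator link [2] in the quoted mail below for the full version):

import java.util.Collections;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class RestorableWatermarkOperator<T> extends AbstractStreamOperator<T>
        implements OneInputStreamOperator<T, T> {

    private transient ListState<Long> latestWatermark;

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        // Union redistribution: after a restart every parallel instance sees all stored values.
        latestWatermark = context.getOperatorStateStore().getUnionListState(
                new ListStateDescriptor<>("latest-watermark", Long.class));
        long max = Long.MIN_VALUE;
        for (Long wm : latestWatermark.get()) {
            max = Math.max(max, wm);
        }
        if (max != Long.MIN_VALUE) {
            // Re-emit the recovered watermark so downstream timers/joins can make progress.
            processWatermark(new Watermark(max));
        }
    }

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        output.collect(element); // pass records through unchanged
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // Remember the last watermark seen so it survives a failure/restore cycle.
        latestWatermark.update(Collections.singletonList(mark.getTimestamp()));
        super.processWatermark(mark);
    }
}

It could be wired into a pipeline with something like
stream.transform("restorable-watermark", stream.getType(), new RestorableWatermarkOperator<>()).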

On Mon, Jun 21, 2021 at 5:18 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hello Piotr,
>
> Could you please help me to ensure that I am implementing it in the
> correct way?
>
> I created the WatermarkFunction [1] based on Flink's FilterFunction,
> plus the WatermarkStreamOperator [2], and I am writing a unit test [3]. There
> are a few things that I am not sure how to do.
>
> How do I make the ListState a singleton across all parallel operators?
>
> When my job restarts, I don't even have to call "processWatermark(new
> Watermark(maxWatermark));" at the end of "initializeState()". I can see
> that the job processes the previous watermarks before it fails. Is it because
> the source is the one that I created at the end of the unit test, "MySource"? Or
> is it because I don't have a join in the stream pipeline? I have included the output
> of my unit test below in case you are not able to run the
> test.
>
> [1]
> https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/stream/operator/watermark/WatermarkFunction.java
> [2]
> https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/stream/operator/watermark/WatermarkStreamOperator.java
> [3]
> https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/test/java/org/sense/flink/examples/stream/operator/watermark/WatermarkStreamOperatorTest.java#L113
>
> $ cd explore-flink/docker/ops-playground-image/java/explore-flink/
> $ mvn -Dtest=WatermarkStreamOperatorTest#testRestartWithLatestWatermark
> test
>
> WatermarkStreamOperator.initializeState
> WatermarkStreamOperator.initializeState
> WatermarkStreamOperator.initializeState
> WatermarkStreamOperator.initializeState
> initializeState... 0
> initializeState... 0
> initializeState... 0
> initializeState... 0
> maxWatermark: 0
> maxWatermark: 0
> maxWatermark: 0
> maxWatermark: 0
> processing watermark: 0
> processing watermark: 0
> processing watermark: 0
> processing watermark: 0
> processing watermark: 0
> processing watermark: 0
> processing watermark: 0
> processing watermark: 0
> Attempts restart: 0
> processing watermark: 1
> processing watermark: 1
> processing watermark: 1
> processing watermark: 1
> Attempts restart: 0
> processing watermark: 2
> processing watermark: 2
> processing watermark: 2
> processing watermark: 2
> Attempts restart: 0
> processing watermark: 3
> processing watermark: 3
> processing watermark: 3
> processing watermark: 3
> Attempts restart: 0
> processing watermark: 9223372036854775807
> processing watermark: 9223372036854775807
> processing watermark: 9223372036854775807
> processing watermark: 9223372036854775807
> This exception will trigger until the reference time [2021-06-21
> 16:57:19.531] reaches the trigger time [2021-06-21 16:57:21.672] // HERE
> THE JOB IS RESTARTING
> initializeState... 1
> initializeState... 1
> initializeState... 1
> WatermarkStreamOperator.initializeState
> WatermarkStreamOperator.initializeState
> WatermarkStreamOperator.initializeState
> WatermarkStreamOperator.initializeState
> watermarkList recovered: 0
> watermarkList recovered: 0
> watermarkList recovered: 0
> watermarkList recovered: 0
> watermarkList recovered: 0
> watermarkList recovered: 1
> watermarkList recovered: 2
> initializeState... 1
> maxWatermark: 2 // HERE IS THE LATEST WATERMARK
> processing watermark: 2 // I PROCESS IT HERE
> watermarkList recovered: 0
> watermarkList recovered: 1
> watermarkList recovered: 0
> watermarkList recovered: 0
> watermarkList recovered: 1
> watermarkList recovered: 1
> watermarkList recovered: 2
> watermarkList recovered: 2
> watermarkList recovered: 2
> maxWatermark: 2
> maxWatermark: 2
> processing watermark: 2
> processing watermark: 2
> maxWatermark: 2
> processing watermark: 2
> processing watermark: 0 // IT IS ALSO PROCESSING THE OTHER WATERMARKS. WHY?
> processing watermark: 0
> processing watermark: 0
> processing watermark: 0
> Attempts restart: 1
> processing watermark: 1
> processing watermark: 1
> processing watermark: 1
> processing watermark: 1
> Attempts restart: 1
> processing watermark: 2
> processing watermark: 2
> processing watermark: 2
> processing watermark: 2
> Attempts restart: 1
> processing watermark: 3
> processing watermark: 3
> processing watermark: 3
> processing watermark: 3
> Attempts restart: 1
> processing watermark: 9223372036854775807
> processing watermark: 9223372036854775807
> processing 

Re: PyFlink kafka producer topic override

2021-06-24 Thread Arvid Heise
Hi Curt,

Upon rechecking the code, you actually don't set the topic through
KafkaContextAware but directly on the ProducerRecord returned by the
KafkaSerializationSchema.

Sorry for the confusion.

Arvid
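
For the archives, a hedged Java sketch of that per-record routing (the TenantEvent
type and the "<tenantId>.sink_topic" naming are made up for illustration; PyFlink
does not expose this yet):

import java.nio.charset.StandardCharsets;

import javax.annotation.Nullable;

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TenantRoutingSerializationSchema
        implements KafkaSerializationSchema<TenantRoutingSerializationSchema.TenantEvent> {

    /** Illustrative event type: a tenant id plus an already-serialized JSON payload. */
    public static class TenantEvent {
        public String tenantId;
        public String payloadJson;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(TenantEvent element, @Nullable Long timestamp) {
        // The topic is chosen per record, so one producer can fan out to per-tenant topics.
        String topic = element.tenantId + ".sink_topic";
        return new ProducerRecord<>(topic, element.payloadJson.getBytes(StandardCharsets.UTF_8));
    }
}

The schema is then passed to the FlinkKafkaProducer constructor that accepts a
KafkaSerializationSchema.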

On Thu, Jun 24, 2021 at 8:36 AM Arvid Heise  wrote:

> Hi,
>
> getTargetTopic can really be used as Curt needs it. So it would be good to
> add to PyFlink as well.
>
> However, I'm a bit skeptical that Kafka can really handle that model well.
> It's usually encouraged to use rather fewer, larger topics and you'd rather
> use partitions here instead of topics. There is a huge overhead on managing
> topics and even some hard limits that can be reached quicker than it
> initially appears.
> Of course, if you want to use ACLs, afaik you cannot define them on
> partitions, so you probably do not have any choice but to do it your way
> (or consider an alternative architecture).
> It's probably also fine if you just have THE one table that you need to
> divide, but it's not a pattern that scales to more tables or to an
> ever-growing number of tenants. Partitions scale much much better.
>
> Also from painful past experiences, you should absolutely create all
> topics beforehand on your production env and really use getTargetTopic just
> for routing.
>
> On Thu, Jun 24, 2021 at 8:10 AM Dian Fu  wrote:
>
>> OK, got it. Then it seems that split streams is also not quite suitable
>> to address your requirements as you still need to iterate over each of the
>> side output corresponding to each topic.
>>
>> Regarding to getTargetTopic [1],  per my understanding, it’s not designed
>> to dynamically assign the topic for an element according to the
>> documentation. However, I’m not 100% sure about this and maybe I missed
>> something.
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaContextAware.java#L48
>>
>> Regards,
>> Dian
>>
>> On June 24, 2021, at 11:45 AM, Curt Buechter  wrote:
>>
>> Hi Dian,
>> Thanks for the reply.
>> I don't think a filter function makes sense here. I have 2,000 tenants in
>> the source database, and I want all records for a single tenant in a
>> tenant-specific topic. So, with a filter function, if I understand it
>> correctly, I would need 2,000 different filters, which isn't very practical.
>>
>> An example:
>> source_topic(tenant_id, first_name, last_name)
>>
>> destination:
>> tenant1.sink_topic (first_name, last_name)
>> tenant2.sink_topic (first_name, last_name)
>> ...
>> tenant2000.sink_topic (first_name, last_name)
>>
>> On 2021/06/24 03:18:36, Dian Fu  wrote:
>> > You are right that split is still not supported. Does it make sense for
>> you to split the stream using a filter function? There is some overhead
>> compared to the built-in stream.split as you need to provide a filter function
>> for each sub-stream, and so a record will be evaluated multiple times.>
>> >
>> > > On June 24, 2021, at 3:08 AM, Curt Buechter  wrote:>
>> > > >
>> > > Hi,>
>> > > New PyFlink user here. Loving it so far. The first major problem I've
>> run into is that I cannot create a Kafka Producer with dynamic topics. I
>> see that this has been available for quite some time in Java with Keyed
>> Serialization using the getTargetTopic method. Another way to do this in
>> Java may be with stream.split(), and adding a different sink for the split
>> streams. Stream splitting is also not available in PyFlink.>
>> > > Am I missing anything? Has anyone implemented this before in PyFlink,
>> or know of a way to make it happen?>
>> > > The use case here is that I'm using a CDC Debezium connector to
>> populate kafka topics from a multi-tenant database, and I'm trying to use
>> PyFlink to split the records into a different topic for each tenant.>
>> > > >
>> > > Thanks>
>> >
>> >
>>
>>
>>


Unsubscribe

2021-06-24 Thread 王雪华


Re: Flink SQL writing to Hive issue

2021-06-24 Thread 杨光跃


Checkpointing: if you deploy as a jar, you can simply enable it in the code. If you submit SQL with the sql-client, you can add the following to the configuration file
sql-client-defaults.yaml:
configuration:
  execution.checkpointing.interval: 1000
杨光跃
yangguangyuem...@163.com
On June 24, 2021, 16:09, Geoff nie wrote:
Thanks a lot for the reply, but after thinking it over carefully I still don't understand. Could you say specifically where to configure the parameter? Thanks!

















On 2021-06-24 14:47:24, "杨光跃" wrote:
Partition commit requires checkpointing to be enabled; you need to configure it.


杨光跃
yangguangyuem...@163.com
On June 24, 2021, 14:44, Geoff nie wrote:
Hello! I have also run into this problem, which is similar to the issue below. Has it been solved? Many thanks.
















On 2021-02-14 10:43:33, "潘永克" <13311533...@163.com> wrote:



















 Forwarded message 
From: "潘永克" <13311533...@163.com>
Date: 2021-02-11 11:12:39
To: d...@flink.apache.org
Subject: Flink SQL writing to Hive issue

I'd like to ask about a Flink problem. Flink SQL can write data to a Hive table, but the data in the Hive table only ends up in
".part-inprogress"-like files. Flink 1.12.0
compiled against CDH 6.2.0, Hive version 2.1.1, Hadoop 3.0.0. Screenshots of the problem are below:
Create the Hive table
SET table.sql-dialect=hive;
CREATE TABLE hive_table (
user_id STRING,
order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.delay'='1 min',
'sink.partition-commit.policy.kind'='metastore,success-file'
);
Insert data
INSERT INTO TABLE hive_table
SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'),
DATE_FORMAT(log_ts, 'HH')
FROM kafka_table;


The files never get finalized; they always remain ".part-inprogress..." files.
















Re: Re: Flink SQL writing to Hive issue

2021-06-24 Thread Geoff nie
Thanks a lot for the reply, but after thinking it over carefully I still don't understand. Could you say specifically where to configure the parameter? Thanks!

















On 2021-06-24 14:47:24, "杨光跃" wrote:
>Partition commit requires checkpointing to be enabled; you need to configure it.
>
>
>杨光跃
>yangguangyuem...@163.com
>On June 24, 2021, 14:44, Geoff nie wrote:
>Hello! I have also run into this problem, which is similar to the issue below. Has it been solved? Many thanks.
>
>On 2021-02-14 10:43:33, "潘永克" <13311533...@163.com> wrote:
>
> Forwarded message 
>From: "潘永克" <13311533...@163.com>
>Date: 2021-02-11 11:12:39
>To: d...@flink.apache.org
>Subject: Flink SQL writing to Hive issue
>
>I'd like to ask about a Flink problem. Flink SQL can write data to a Hive table, but the data in the Hive table only ends up in
>".part-inprogress"-like files. Flink 1.12.0
>compiled against CDH 6.2.0, Hive version 2.1.1, Hadoop 3.0.0. Screenshots of the problem are below:
>Create the Hive table
>SET table.sql-dialect=hive;
>CREATE TABLE hive_table (
>user_id STRING,
>order_amount DOUBLE
>) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
>'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
>'sink.partition-commit.trigger'='partition-time',
>'sink.partition-commit.delay'='1 min',
>'sink.partition-commit.policy.kind'='metastore,success-file'
>);
>Insert data
>INSERT INTO TABLE hive_table
>SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'),
>DATE_FORMAT(log_ts, 'HH')
>FROM kafka_table;
>
>
>The files never get finalized; they always remain ".part-inprogress..." files.
>
>
>
>
>
>
>
>
>
>
>
>
>
>


How to handle failures when Flink writes to ClickHouse or Doris

2021-06-24 Thread 13631283359
Hi all,
In my recent project I use Flink to consume data from Kafka and write it to Doris. But sometimes the write to Doris fails while the Kafka data has already been consumed, which causes data loss.
How should I handle fault tolerance for this? I hope someone who knows can answer. Thanks.

Re: Flink SQL writing to Hive issue

2021-06-24 Thread 杨光跃
Partition commit requires checkpointing to be enabled; you need to configure it.


杨光跃
yangguangyuem...@163.com
On June 24, 2021, 14:44, Geoff nie wrote:
Hello! I have also run into this problem, which is similar to the issue below. Has it been solved? Many thanks.
















On 2021-02-14 10:43:33, "潘永克" <13311533...@163.com> wrote:



















 Forwarded message 
From: "潘永克" <13311533...@163.com>
Date: 2021-02-11 11:12:39
To: d...@flink.apache.org
Subject: Flink SQL writing to Hive issue

I'd like to ask about a Flink problem. Flink SQL can write data to a Hive table, but the data in the Hive table only ends up in
".part-inprogress"-like files. Flink 1.12.0
compiled against CDH 6.2.0, Hive version 2.1.1, Hadoop 3.0.0. Screenshots of the problem are below:
Create the Hive table
SET table.sql-dialect=hive;
CREATE TABLE hive_table (
user_id STRING,
order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.delay'='1 min',
'sink.partition-commit.policy.kind'='metastore,success-file'
);
Insert data
INSERT INTO TABLE hive_table
SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'),
DATE_FORMAT(log_ts, 'HH')
FROM kafka_table;


The files never get finalized; they always remain ".part-inprogress..." files.
















Re: PyFlink kafka producer topic override

2021-06-24 Thread Arvid Heise
Hi,

getTargetTopic can really be used as Curt needs it. So it would be good to
add to PyFlink as well.

However, I'm a bit skeptical that Kafka can really handle that model well.
It's usually encouraged to use rather fewer, larger topics and you'd rather
use partitions here instead of topics. There is a huge overhead on managing
topics and even some hard limits that can be reached quicker than it
initially appears.
Of course, if you want to use ACLs, afaik you cannot define them on
partitions, so you probably do not have any choice but to do it your way
(or consider an alternative architecture).
It's probably also fine if you just have THE one table that you need to
divide, but it's not a pattern that scales to more tables or to an
ever-growing number of tenants. Partitions scale much much better.

Also from painful past experiences, you should absolutely create all topics
beforehand on your production env and really use getTargetTopic just for
routing.

On Thu, Jun 24, 2021 at 8:10 AM Dian Fu  wrote:

> OK, got it. Then it seems that split streams is also not quite suitable to
> address your requirements as you still need to iterate over each of the
> side output corresponding to each topic.
>
> Regarding to getTargetTopic [1],  per my understanding, it’s not designed
> to dynamically assign the topic for an element according to the
> documentation. However, I’m not 100% sure about this and maybe I missed
> something.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaContextAware.java#L48
>
> Regards,
> Dian
>
> On June 24, 2021, at 11:45 AM, Curt Buechter  wrote:
>
> Hi Dian,
> Thanks for the reply.
> I don't think a filter function makes sense here. I have 2,000 tenants in
> the source database, and I want all records for a single tenant in a
> tenant-specific topic. So, with a filter function, if I understand it
> correctly, I would need 2,000 different filters, which isn't very practical.
>
> An example:
> source_topic(tenant_id, first_name, last_name)
>
> destination:
> tenant1.sink_topic (first_name, last_name)
> tenant2.sink_topic (first_name, last_name)
> ...
> tenant2000.sink_topic (first_name, last_name)
>
> On 2021/06/24 03:18:36, Dian Fu  wrote:
> > You are right that split is still not supported. Does it make sense for
> you to split the stream using a filter function? There is some overhead
> compared to the built-in stream.split as you need to provide a filter function
> for each sub-stream, and so a record will be evaluated multiple times.>
> >
> > > On June 24, 2021, at 3:08 AM, Curt Buechter  wrote:>
> > > >
> > > Hi,>
> > > New PyFlink user here. Loving it so far. The first major problem I've
> run into is that I cannot create a Kafka Producer with dynamic topics. I
> see that this has been available for quite some time in Java with Keyed
> Serialization using the getTargetTopic method. Another way to do this in
> Java may be with stream.split(), and adding a different sink for the split
> streams. Stream splitting is also not available in PyFlink.>
> > > Am I missing anything? Has anyone implemented this before in PyFlink,
> or know of a way to make it happen?>
> > > The use case here is that I'm using a CDC Debezium connector to
> populate kafka topics from a multi-tenant database, and I'm trying to use
> PyFlink to split the records into a different topic for each tenant.>
> > > >
> > > Thanks>
> >
> >
>
>
>


Re: Fwd: Flink SQL writing to Hive issue

2021-06-24 Thread Geoff nie
Hello! I have also run into this problem, which is similar to the issue below. Has it been solved? Many thanks.
















On 2021-02-14 10:43:33, "潘永克" <13311533...@163.com> wrote:



















 Forwarded message 
From: "潘永克" <13311533...@163.com>
Date: 2021-02-11 11:12:39
To: d...@flink.apache.org
Subject: Flink SQL writing to Hive issue

I'd like to ask about a Flink problem. Flink SQL can write data to a Hive table, but the data in the Hive table only ends up in
".part-inprogress"-like files. Flink 1.12.0
compiled against CDH 6.2.0, Hive version 2.1.1, Hadoop 3.0.0. Screenshots of the problem are below:
Create the Hive table
SET table.sql-dialect=hive;
CREATE TABLE hive_table (
  user_id STRING,
  order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='1 min',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);
Insert data
INSERT INTO TABLE hive_table
SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'),
DATE_FORMAT(log_ts, 'HH')
FROM kafka_table;


The files never get finalized; they always remain ".part-inprogress..." files.







 






 

Savepoint debugging

2021-06-24 Thread 周瑞
Hello,
  There is something wrong with my savepoint data and I want to debug it locally.
When starting the Flink program locally in IDEA, how do I configure it to restore from a specified savepoint path?

Re: PyFlink kafka producer topic override

2021-06-24 Thread Dian Fu
OK, got it. Then it seems that split streams are also not quite suitable to
address your requirements, as you would still need to iterate over each of the side
outputs corresponding to each topic.

Regarding getTargetTopic [1], per my understanding it is not designed to
dynamically assign the topic for an element, according to the documentation.
However, I'm not 100% sure about this and maybe I missed something.

[1] 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaContextAware.java#L48
 


Regards,
Dian

> On June 24, 2021, at 11:45 AM, Curt Buechter  wrote:
> 
> Hi Dian,
> Thanks for the reply.
> I don't think a filter function makes sense here. I have 2,000 tenants in the 
> source database, and I want all records for a single tenant in a 
> tenant-specific topic. So, with a filter function, if I understand it 
> correctly, I would need 2,000 different filters, which isn't very practical.
> 
> An example: 
> source_topic(tenant_id, first_name, last_name)
> 
> destination:
> tenant1.sink_topic (first_name, last_name)
> tenant2.sink_topic (first_name, last_name)
> ...
> tenant2000.sink_topic (first_name, last_name)
> 
> On 2021/06/24 03:18:36, Dian Fu  wrote:
> > You are right that split is still not supported. Does it make sense for you 
> > to split the stream using a filter function? There is some overhead 
> > compared to the built-in stream.split as you need to provide a filter function
> > for each sub-stream, and so a record will be evaluated multiple times.>
> > 
> > > On June 24, 2021, at 3:08 AM, Curt Buechter wrote:>
> > > > 
> > > Hi,> 
> > > New PyFlink user here. Loving it so far. The first major problem I've run 
> > > into is that I cannot create a Kafka Producer with dynamic topics. I see 
> > > that this has been available for quite some time in Java with Keyed 
> > > Serialization using the getTargetTopic method. Another way to do this in 
> > > Java may be with stream.split(), and adding a different sink for the 
> > > split streams. Stream splitting is also not available in PyFlink.> 
> > > Am I missing anything? Has anyone implemented this before in PyFlink, or 
> > > know of a way to make it happen?> 
> > > The use case here is that I'm using a CDC Debezium connector to populate 
> > > kafka topics from a multi-tenant database, and I'm trying to use PyFlink 
> > > to split the records into a different topic for each tenant.> 
> > > > 
> > > Thanks> 
> > 
> >