need help with executeInsert, thanks!

2020-10-22 Thread ??????
I'm learning executeInsert from the documentation.


My code is:
https://paste.ubuntu.com/p/d2TDdcy7GB/


I guess the function createTemporaryView can create the table needed by the
function executeInsert.




I got:
No table was registered under the name 
`default_catalog`.`default_database`.`OutOrders`.


Could you help correct the code in the above link?


Thanks for your help
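
A minimal sketch of the idea, assuming a local setup (the table names and the 'print'
connector are illustrative, not taken from the linked paste): executeInsert("OutOrders")
only works if a table named OutOrders has already been registered in the catalog, for
example via executeSql(CREATE TABLE ...). createTemporaryView registers the source view,
not the sink.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class ExecuteInsertSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Register a source view from a DataStream (placeholder data).
        tEnv.createTemporaryView("Orders", env.fromElements(1L, 2L, 3L), $("amount"));

        // Register the sink table that executeInsert refers to; connector options are placeholders.
        tEnv.executeSql("CREATE TABLE OutOrders (amount BIGINT) WITH ('connector' = 'print')");

        Table result = tEnv.sqlQuery("SELECT amount FROM Orders");
        result.executeInsert("OutOrders");
    }
}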

Re: Flink 1.8.3 GC issues

2020-10-22 Thread Josson Paul
@Piotr Nowojski   @Nico Kruber 

An update.

I was able to figure out the problematic code. A change in the Apache Beam code
is causing this problem.





Beam introduced a lock on the "emit" in the Unbounded Source. The lock is
Flink's checkpoint lock. The same lock is used by Flink's timer service to
emit the watermarks. Flink's timer service is starved trying to get hold of
the lock and, for some reason, it never gets it. The after-effect of this
situation is that the watermark is never emitted by Flink's timer service.
Because there are no watermarks flowing through the system, sliding windows
are never closed and data accumulates in the window.
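
To illustrate the starvation pattern described above, here is a contrived sketch
(not Beam's or Flink's actual code; the class and method names are invented): a source
that emits under the shared checkpoint lock while a slow external call runs can keep the
timer service from acquiring the same lock to emit watermarks.

public class CheckpointLockContentionSketch {

    private final Object checkpointLock = new Object();

    // Source side: emitting records while holding the shared lock (the Beam 2.15-style
    // synchronized block). If a slow external call, e.g. a Redis lookup in a chained
    // transform, runs inside this region, the lock is held for a long time.
    void emitRecord(Runnable slowExternalLookup, Runnable emit) {
        synchronized (checkpointLock) {
            slowExternalLookup.run();
            emit.run();
        }
    }

    // Timer side: the processing-time timer service needs the same lock to emit
    // watermarks; under heavy contention it may effectively never get it.
    void onProcessingTime(Runnable emitWatermark) {
        synchronized (checkpointLock) {
            emitWatermark.run();
        }
    }
}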



This problem occurs only if we have external lookup calls (like Redis) that
happen before the data goes to the Sliding Window. Something like below.



KafkaSource -> Transforms (occasional Redis lookup) -> SlidingWindow -> Transforms -> Kafka Sink





https://github.com/apache/beam/blob/60e0a22ea95921636c392b5aae77cb48196dd700/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L256
This is Beam 2.4, and you can see that there is no synchronized block at
lines 257 and 270.



https://github.com/apache/beam/blob/7931ec055e2da7214c82e368ef7d7fd679faaef1/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L264
This is Beam 2.15. See the synchronized block introduced at lines 264 and
280. We are using Beam 2.15 and Flink 1.8.



Beam introduced this synchronized block because of this bug.
https://issues.apache.org/jira/browse/BEAM-3087



After I removed that synchronized keyword everything started working fine
in my application.



What do you guys think about this? Why does Beam need a synchronized block
there?



Beam is using this lock ->

https://github.com/apache/flink/blob/d54807ba10d0392a60663f030f9fe0bfa1c66754/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java#L282



Thanks,

Josson

On Mon, Sep 14, 2020 at 5:03 AM Piotr Nowojski  wrote:

> Hi Josson,
>
> The TM logs that you attached are only from a 5 minutes time period. Are
> you sure they are encompassing the period before the potential failure and
> after the potential failure? It would be also nice if you would provide the
> logs matching to the charts (like the one you were providing in the
> previous messages), to correlate events (spike in latency/GC with some
> timestamp from the logs).
>
> I was not asking necessarily to upgrade to Java9, but an updated/bug fixed
> version of Java8 [1].
>
> > 1) In Flink 1.4 set up, the data in the Heap is throttled. It never goes
> out of memory whatever be the ingestion rate. our Windows are 5
> minutes windows.
> > 2) In Flink 1.8 set up, HeapKeyedStateBackend is never throttled and
> fills up fast. When Old-gen space goes beyond 60-70% even the Mixed GC or
> Full GC doesn't reclaim space.
>
> In both cases there is the same mechanism for the backpressure. If a
> task's output runs out of buffers to put produced records, it will block
> the task. It can be that between 1.4 and 1.8, with credit based flow
> control changes, the amount of available buffers for the tasks on your
> setup has grown, so the tasks are backpressuring later. This in turn can
> sometimes mean that at any point of time there is more data buffered on the
> operator's state, like `WindowOperator`. I'm not sure what's the
> best/easiest way how to check this:
>
> 1. the amount of buffered data might be visible via metrics [2][3]
> 2. if you enable DEBUG logs, it should be visible via:
>
> > LOG.debug("Using a local buffer pool with {}-{} buffers",
> numberOfRequiredMemorySegments, maxNumberOfMemorySegments);
>
> entry logged by
> `org.apache.flink.runtime.io.network.buffer.LocalBufferPool`.
>
> Piotrek
>
> [1] https://en.wikipedia.org/wiki/Java_version_history#Java_8_updates
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html#network
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html#network
>
> On Mon, 14 Sep 2020 at 05:04, Josson Paul wrote:
>
>> @Piotr Nowojski  @Nico Kruber 
>> I have attached the  Taskmanager/GC/thread dumps in a zip file.
>>
>> I don't see any issues in the TM logs.
>> Tried to upgrade to Java 9. Flink is on top of another platform which
>> threw errors while upgrading to Java 9. I can't do much for now. We will
>> upgrade to Jdk 11 in another 2 months.
>>
>> Regarding the Heap size. The new experiment I did was on 4gb Heap on both
>> Flink 1.4 and Flink 1.8.
>>
>> Questions I am trying to get answered are
>>
>> 1) In Flink 1.4 set up, the data in the Heap is throttled. It never goes
>> out of memory whatever be the ingestion rate. our Windows are 5
>> minutes windows.
>> 2) In Flink 1.8 set up, HeapKeyedStateBackend is never throttled and
>> fills up fast. When Old-gen space goes beyond 60-70% even 

Re: Resuming Savepoint issue with upgraded Flink version 1.11.2

2020-10-22 Thread Sivaprasanna
Hi,

Have you dropped or renamed any operator from the original job? If yes, and
you are okay with discarding the state of that operator, you can submit the
job with --allowNonRestoredState or -n.
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#allowing-non-restored-state

-
Sivaprasanna

On Fri, Oct 23, 2020 at 10:48 AM Partha Mishra 
wrote:

> Hi,
>
>
>
> We are trying to save checkpoints for one of the flink job running in
> Flink version 1.9 and tried to resume the same flink job in Flink version
> 1.11.2. We are getting the below error when trying to restore the saved
> checkpoint in the newer flink version. Can
>
>
>
> Cannot map checkpoint/savepoint state for operator
> fbb4ef531e002f8fb3a2052db255adf5 to the new program, because the operator
> is not available in the new program.
>
>
>
>
>
> *Complete Stack Trace :*
>
> {​"errors":["org.apache.flink.runtime.rest.handler.RestHandlerException:
> Could not execute application.\n\tat
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:103)\n\tat
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)\n\tat
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)\n\tat
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)\n\tat
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)\n\tat
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat
> java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)\n\tat
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)\n\tat
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
> java.lang.Thread.run(Thread.java:748)\nCaused by:
> java.util.concurrent.CompletionException:
> org.apache.flink.util.FlinkRuntimeException: Could not execute
> application.\n\tat
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)\n\tat
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)\n\tat
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)\n\t...
> 7 more\nCaused by: org.apache.flink.util.FlinkRuntimeException: Could not
> execute application.\n\tat
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:81)\n\tat
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)\n\tat
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100)\n\tat
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)\n\t...
> 7 more\nCaused by:
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: Failed to execute job
> 'ST1_100Services-preprod-Tumbling-ProcessedBased'.\n\tat
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)\n\tat
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)\n\tat
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)\n\tat
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78)\n\t...
> 10 more\nCaused by: org.apache.flink.util.FlinkException: Failed to execute
> job 'ST1_100Services-preprod-Tumbling-ProcessedBased'.\n\tat
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1821)\n\tat
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)\n\tat
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)\n\tat
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697)\n\tat
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:699)\n\tat
> com.man.ceon.cep.jobs.AnalyticService$.main(AnalyticService.scala:108)\n\tat
> com.man.ceon.cep.jobs.AnalyticService.main(AnalyticService.scala)\n\tat
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat
> java.lang.reflect.Method.invoke(Method.java:498)\n\tat
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)\n\t...
> 13 more\nCaused by: 

Re: Flink - Kafka topic null error; happens only when running on cluster

2020-10-22 Thread Manas Kale
Hi Timo,
I figured it out, thanks a lot for your help.
Are there any articles detailing the pre-flight and cluster phases? I
couldn't find anything on ci.apache.org/projects/flink and I think this
behaviour should be documented as a warning/note.


On Thu, Oct 22, 2020 at 6:44 PM Timo Walther  wrote:

> Hi Manas,
>
> you can use static variable but you need to make sure that the logic to
> fill the static variable is accessible and executed in all JVMs.
>
> I assume `pipeline.properties` is in your JAR that you submit to the
> cluster right? Then you should be able to access it through a singleton
> pattern instead of a static variable access.
>
> Regards,
> Timo
>
>
> On 22.10.20 14:17, Manas Kale wrote:
> > Sorry, I messed up the code snippet in the earlier mail. The correct one
> > is :
> >
> > public static void main(String[] args) {
> > Properties prop =new Properties();
> >
> > InputStream is =
> Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
> > prop.load(is);
> >
> > HashMap strMap =new HashMap<>();
> >
> > strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));
> >
> > new Config(strMap);
> >
> > ...
> >
> > }
> >
> > public class Config {
> >
> > public static StringCONFIG_TOPIC;
> >
> > publicConfig(HashMap s) {
> >
> >  CONFIG_TOPIC = s.get("CONFIG_TOPIC");
> >
> > }
> >
> > }
> >
> > The value of CONFIG_TOPIC in a minicluster is properly loaded but null
> > when run on a cluster.
> >
> >
> > On Thu, Oct 22, 2020 at 5:42 PM Manas Kale  > > wrote:
> >
> > Hi Timo,
> > Thank you for the explanation, I can start to see why I was getting
> > an exception.
> > Are you saying that I cannot use static variables at all when trying
> > to deploy to a cluster? I would like the variables to remain static
> > and not be instance-bound as they are accessed from multiple classes.
> > Based on my understanding of what you said, I implemented the
> > following pattern:
> >
> > public static void main(String[] args) {
> > Properties prop =new Properties();
> >
> > InputStream is =
> Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
> > prop.load(is);
> >
> > strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));
> >
> > new Config(strMap, longMap);
> >
> > ...
> >
> > }
> >
> > public class Config {
> >
> > public static StringCONFIG_TOPIC;
> > public static StringCONFIG_KAFKA;
> >
> > public Config(HashMap s) {
> >  CONFIG_TOPIC = s.get("CONFIG_TOPIC");
> >  CONFIG_KAFKA = s.get("CONFIG_KAFKA");
> >
> > }
> >
> > }
> >
> > This produces the same issue. With the easier solution that you
> > listed, are you implying I use multiple instances or a singleton
> > pattern of some sort?
> >
> > On Thu, Oct 22, 2020 at 1:23 PM Timo Walther  > > wrote:
> >
> > Hi Manas,
> >
> > you need to make sure to differentiate between what Flink calls
> > "pre-flight phase" and "cluster phase".
> >
> > The pre-flight phase is were the pipeline is constructed and all
> > functions are instantiated. They are then later serialized and
> > send to
> > the cluster.
> >
> > If you are reading your properties file in the `main()` method
> > and store
> > something in static variables, the content is available locally
> > where
> > the pipeline is constructed (e.g. in the client) but when the
> > function
> > instances are send to the cluster. Those static variables are
> fresh
> > (thus empty) in the cluster JVMs. You need to either make sure
> > that the
> > properties file is read from each task manager again, or easier:
> > pass
> > the parameters as constructor parameters into the instances such
> > that
> > they are shipped together with the function itself.
> >
> > I hope this helps.
> >
> > Regards,
> > Timo
> >
> >
> > On 22.10.20 09:24, Manas Kale wrote:
> >  > Hi,
> >  > I am trying to write some data to a kafka topic and I have
> > the following
> >  > situation:
> >  >
> >  > monitorStateStream
> >  >
> >  > .process(new
> >
>  IDAP2AlarmEmitter()).name(IDAP2_ALARM_EMITTER).uid(IDAP2_ALARM_EMITTER)
> >  >
> >  > /... // Stream that outputs elements of type IDAP2Alarm/
> >  >
> >  > .addSink(getFlinkKafkaProducer(ALARMS_KAFKA,
> >  > Config.ALARMS_TOPIC)).name(ALARM_SINK).uid(ALARM_SINK);
> >  >
> >  > private static 
> > FlinkKafkaProducer getFlinkKafkaProducer(String servers,
> > String topic) {
> >  > Properties properties =new Properties();
> >  > properties.setProperty("bootstrap.servers", servers);

Resuming Savepoint issue with upgraded Flink version 1.11.2

2020-10-22 Thread Partha Mishra
Hi,

We are trying to save checkpoints for one of the flink job running in Flink 
version 1.9 and tried to resume the same flink job in Flink version 1.11.2. We 
are getting the below error when trying to restore the saved checkpoint in the 
newer flink version. Can

Cannot map checkpoint/savepoint state for operator 
fbb4ef531e002f8fb3a2052db255adf5 to the new program, because the operator is 
not available in the new program.


Complete Stack Trace :
{​"errors":["org.apache.flink.runtime.rest.handler.RestHandlerException: 
Could not execute application.\n\tat 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:103)\n\tat
 
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)\n\tat
 
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)\n\tat
 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)\n\tat
 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)\n\tat
 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat 
java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)\n\tat
 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)\n\tat
 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
 java.lang.Thread.run(Thread.java:748)\nCaused by: 
java.util.concurrent.CompletionException: 
org.apache.flink.util.FlinkRuntimeException: Could not execute 
application.\n\tat 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)\n\tat
 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)\n\tat
 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)\n\t...
 7 more\nCaused by: org.apache.flink.util.FlinkRuntimeException: Could not 
execute application.\n\tat 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:81)\n\tat
 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)\n\tat
 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100)\n\tat
 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)\n\t...
 7 more\nCaused by: org.apache.flink.client.program.ProgramInvocationException: 
The main method caused an error: Failed to execute job 
'ST1_100Services-preprod-Tumbling-ProcessedBased'.\n\tat 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)\n\tat
 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)\n\tat
 org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)\n\tat 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78)\n\t...
 10 more\nCaused by: org.apache.flink.util.FlinkException: Failed to execute 
job 'ST1_100Services-preprod-Tumbling-ProcessedBased'.\n\tat 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1821)\n\tat
 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)\n\tat
 
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)\n\tat
 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697)\n\tat
 
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:699)\n\tat
 com.man.ceon.cep.jobs.AnalyticService$.main(AnalyticService.scala:108)\n\tat 
com.man.ceon.cep.jobs.AnalyticService.main(AnalyticService.scala)\n\tat 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat
 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat
 java.lang.reflect.Method.invoke(Method.java:498)\n\tat 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)\n\t...
 13 more\nCaused by: org.apache.flink.runtime.client.JobSubmissionException: 
Failed to submit job.\n\tat 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:344)\n\tat
 
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)\n\tat
 
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)\n\tat
 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)\n\tat
 

Re: pyflink和flink版本的兼容性问题

2020-10-22 Thread zhisheng
估计可能会有问题,很多变动

whh_960101  于2020年10月23日周五 上午11:41写道:

> Hi,各位大佬,
>  想请教一下,我的flink的版本是1.10.0,pyflink版本是1.11.1,目前使用pyflink没有兼容性问题,想问一下,马上要更新的flink
> 1.12,如果在正式发布后,我只是pip install --upgrade apache-flink==1.12
> 升级pyflink到1.12.0,flink 1.10.0 版本保持不变,会存在很多兼容性问题吗


Re: Re: Re: Problem with Flink reading from multiple Kafka partitions

2020-10-22 Thread zhisheng
hi

Since you can only consume data from one partition, it is clear that the consumer only reaches one partition. I also noticed you said:

> used for local development and debugging of Kafka (connected to a bastion host xxx-b-1, forwarding 9797 to xxx-a-1)

I suggest checking whether that forwarding is the problem; it only forwards to a single node.

Best
zhisheng

Lynn Chen wrote on Fri, Oct 23, 2020 at 11:01 AM:

>
>
>
> hi, zhisheng:
>
>
> After I parse the JSON:
> (xxx, xxx, xxx, topic, partition, offset)
> =>
>
>
> (false,1603420582310,"INSERT","test3.order",2,75)
> (false,1603421312803,"INSERT","test3.order",2,76)
> (false,1603421344819,"INSERT","test3.order",2,77)
> (false,1603421344819,"INSERT","test3.order",2,78)
>
>
> I added a dozen or so more records, and all I receive is data from partition 2 (4 records); nothing from partitions 1 and 3.
>
>
> My guess:
>
>
> I exposed an external port 9797 for local development and debugging of Kafka (connected to a bastion host xxx-b-1, which forwards 9797 to xxx-a-1).
>
>
> broker1 config:
>
>
> listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797
> advertised.listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797
> listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
> security.inter.broker.protocol=PLAINTEXT
>
>
> broker2 config:
>
>
> listeners=PLAINTEXT://xxx-a-2:9092,EXTERNAL://:9797
> advertised.listeners=PLAINTEXT://xxx-a-2:9092,EXTERNAL://:9797
> listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
> security.inter.broker.protocol=PLAINTEXT
>
>
>
>
>
>
>
> broker3 config:
>
>
> listeners=PLAINTEXT://xxx-a-3:9092,EXTERNAL://:9797
> advertised.listeners=PLAINTEXT://xxx-a-3:9092,EXTERNAL://:9797
> listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
> security.inter.broker.protocol=PLAINTEXT
>
>
> My local machine connects to Kafka with:
> properties.setProperty("bootstrap.servers", "xxx-b-1:9797")
>
>
> Is it related to this configuration?
>
>
>
>
>
>
>
>
>
>
> On 2020-10-23 08:37:14, "zhisheng" wrote:
> >hi
> >
> >To troubleshoot, you can use JSONKeyValueDeserializationSchema when consuming from Kafka
> >to get the record metadata (topic/partition/offset). That way you can check which partitions your data actually comes from, and there will be no more confusion.
> >
> >eg:
> >
> >  env.addSource(new FlinkKafkaConsumer011<>(
> >parameters.get("topic"),new
> >JSONKeyValueDeserializationSchema(true),
> >buildKafkaProps(parameters))).flatMap(new
> >FlatMapFunction() {
> >@Overridepublic void flatMap(ObjectNode jsonNodes,
> >Collector collector) throws Exception {
> >System.out.println(jsonNodes.get("value"));
> >System.out.println(jsonNodes.get("metadata").get("topic").asText());
> >
> >System.out.println(jsonNodes.get("metadata").get("offset").asText());
> >
> >System.out.println(jsonNodes.get("metadata").get("partition").asText());
> >   collector.collect(jsonNodes);
> >}}).print();
> >
> >Best
> >
> >zhisheng
> >
> >
> >Lynn Chen wrote on Fri, Oct 23, 2020 at 12:13 AM:
> >
> >>
> >>
> >>
> >>
> >>
> >>
> >> hi,  Qijun Feng:
> >>
> >>
>> I ran into a similar problem. How did you end up solving it?
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
>> On 2020-04-03 09:27:52, "LakeShen" wrote:
>> >Hi Qijun,
>> >
>> >Check whether all Kafka partitions actually have data, or whether only partition 3 was written to after the timestamp 158567040L. Just my thought.
> >> >
> >> >Best,
> >> >LakeShen
> >> >
>> >Qijun Feng wrote on Thu, Apr 2, 2020 at 5:44 PM:
> >> >
> >> >> Dear All,
> >> >>
>> >> My Kafka cluster has three machines and the topic also has three partitions. When I read Kafka
>> >> messages with Flink, it feels like data is only read from one partition. At first my bootstrap.servers listed only one address;
>> >> now it lists all the addresses, and I also changed the group.id
> >> >>
> >> >>
> >> >> Properties properties = new Properties();
> >> >> properties.setProperty("bootstrap.servers", "10.216.85.201:9092,
> >> >> 10.216.77.170:9092,10.216.77.188:9092");
> >> >> properties.setProperty("group.id", "behavior-logs-aggregator");
> >> >>
> >> >> FlinkKafkaConsumer010 kafkaConsumer010 =
> >> >>new FlinkKafkaConsumer010("behavior-logs_dev",
> new
> >> >> BehaviorLogDeserializationSchema(), properties);
> >> >> kafkaConsumer010.setStartFromTimestamp(158567040L); //2020/04/01
> >> >>
>> >> The processed data is written to a database, and it looks like data is missing. The logs show the
>> >> same: only partition=3 appears, nothing for partition=1 or 2,
> >> >>
> >> >> 2020-04-02 14:54:58,532 INFO
> >> >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  -
> >> >> Consumer subtask 0 creating fetcher with offsets
> >> >> {KafkaTopicPartition{topic='behavior-logs_dev', partition=3}=38}.
> >> >>
> >> >>
>> >> Is something wrong somewhere?
> >> >>
> >> >>
> >>
>


Re: Trying to run Flink tests

2020-10-22 Thread Xintong Song
Hi Dan,

I tried with the PR you pointed out, and cannot reproduce the problem. So
it should not be related to the PR code.

I'm running with Maven 3.2.5, which is the same version that we use for
running CI tests on AZP for PRs. Your Maven log suggests the Maven version
on your machine is 3.6.3.
I'm not sure whether the Maven version is related, but maybe you can try it
out with 3.2.5. And if that turns out to work, we may file an issue with the
Apache Maven community.

Thank you~

Xintong Song



On Thu, Oct 22, 2020 at 12:31 PM Dan Hill  wrote:

> 1) I don't see anything useful in it
> 
> .
> 2) This PR .
>
> Thanks for replying, Xintong!
>
> On Wed, Oct 21, 2020 at 7:11 PM Xintong Song 
> wrote:
>
>> Hi Dan,
>>
>> It looks like while your tests are executed and passed, the java
>> processes executing those tests did not exit properly.
>> - Could you try execute the command manually and see if there's any
>> useful outputs? You can find the commands by searching "Command was" in the
>> maven logs.
>> - Quick question: which PR are you working on? By any chance you called
>> `System.exit()` in your codes?
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Thu, Oct 22, 2020 at 5:59 AM Dan Hill  wrote:
>>
>>> Sure, here's a link
>>> 
>>>  to
>>> the output.  I think for this one I used either:
>>> - `mvn package -e -X -DfailIfNoTests=false`
>>> - or added a `clean` before package.
>>>
>>> On Wed, Oct 21, 2020 at 2:24 AM Xintong Song 
>>> wrote:
>>>
 Would you be able to share the complete maven logs and the command? And
 what is the maven version?

 Thank you~

 Xintong Song



 On Wed, Oct 21, 2020 at 1:37 AM Dan Hill  wrote:

> Hi Xintong!
>
> No changes.  I tried -X and no additional log information is logged.
> -DfailIfNoTests=false does not help.  `-DskipTests` works fine.  I'm going
> to go ahead and create a PR and see if it fails.
>
> Thanks!
> - Dan
>
> On Tue, Oct 20, 2020 at 8:22 AM Xintong Song 
> wrote:
>
>> Hi Dan,
>>
>> The 'mvn package' command automatically includes 'mvn verify', which
>> triggers the test cases. You can skip the tests with 'mvn package
>> -DskipTests'. You can rely on the ci-tests running on Azure Pipeline,
>> either in your own workspace or in the PR.
>>
>> If it is intended to execute the tests locally, you can try the
>> following actions. I'm not sure whether that helps though.
>> - Try to add '-DfailIfNoTests=false' to your maven command.
>> - Execute the maven command with '-X' to print all the debug logs.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Tue, Oct 20, 2020 at 3:48 PM Dan Hill 
>> wrote:
>>
>>> I forked Flink to work on a PR.  When I run `mvn clean package` from
>>> a clean branch, Maven says the runtime tests failed but the logs do not
>>> appear to have details on the failure.  Do I have to do anything to run
>>> these?
>>>
>>>
>>>
>>> ...
>>>
>>> [INFO] Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time
>>> elapsed: 2.24 s - in org.apache.flink.runtime.taskexecutor.
>>> BackPressureSampleServiceTest
>>>
>>> [INFO] Tests run: 5, Failures: 0, Errors: 0, Skipped: 0, Time
>>> elapsed: 1.872 s - in org.apache.flink.runtime.taskexecutor.partition.
>>> PartitionTableTest
>>>
>>> [DEBUG] Forking command line: /bin/sh -c cd
>>> /Users/quietgolfer/code/dan-flink/flink/flink-runtime &&
>>> /Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home/jre/bin/java
>>> -Xms256m -Xmx2048m -Dmvn.forkNumber=9 -XX:+UseG1GC -jar
>>> /Users/quietgolfer/code/dan-flink/flink/flink-runtime/target/surefire/surefirebooter3345042301183877750.jar
>>> /Users/quietgolfer/code/dan-flink/flink/flink-runtime/target/surefire
>>> 2020-10-19T23-54-59_239-jvmRun9 surefire7884081050263655575tmp
>>> surefire_7142433009722615751420tmp
>>>
>>> [INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time
>>> elapsed: 2 s - in org.apache.flink.runtime.taskexecutor.slot.
>>> TaskSlotTest
>>>
>>> [INFO] Running org.apache.flink.runtime.taskexecutor.
>>> NettyShuffleEnvironmentConfigurationTest
>>>
>>> [INFO] Running org.apache.flink.runtime.taskexecutor.slot.
>>> TaskSlotTableImplTest
>>>
>>> [INFO] Running org.apache.flink.runtime.taskexecutor.
>>> TaskExecutorToResourceManagerConnectionTest
>>>
>>> [INFO] Running org.apache.flink.runtime.taskexecutor.slot.
>>> TimerServiceTest
>>>
>>> [INFO] Running akka.actor.RobustActorSystemTest
>>>
>>> [INFO] Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time
>>> elapsed: 2.077 

Compatibility issues between PyFlink and Flink versions

2020-10-22 Thread whh_960101
Hi everyone,
I'd like to ask: my Flink version is 1.10.0 and my PyFlink version is 1.11.1, and so far I have had no compatibility issues using PyFlink. With Flink 1.12 about to be released,
if after the official release I only run pip install --upgrade apache-flink==1.12
to upgrade PyFlink to 1.12.0 while keeping Flink 1.10.0 unchanged, will there be many compatibility problems?

Re:Re: Re: Problem with Flink reading from multiple Kafka partitions

2020-10-22 Thread Lynn Chen



hi, zhisheng:


After I parse the JSON:
(xxx, xxx, xxx, topic, partition, offset)
=>


(false,1603420582310,"INSERT","test3.order",2,75)
(false,1603421312803,"INSERT","test3.order",2,76)
(false,1603421344819,"INSERT","test3.order",2,77)
(false,1603421344819,"INSERT","test3.order",2,78)


I added a dozen or so more records, and all I receive is data from partition 2 (4 records); nothing from partitions 1 and 3.


My guess:


I exposed an external port 9797 for local development and debugging of Kafka (connected to a bastion host xxx-b-1, which forwards 9797 to xxx-a-1).


broker1 config:


listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797
advertised.listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
security.inter.broker.protocol=PLAINTEXT


broker2 config:


listeners=PLAINTEXT://xxx-a-2:9092,EXTERNAL://:9797
advertised.listeners=PLAINTEXT://xxx-a-2:9092,EXTERNAL://:9797
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
security.inter.broker.protocol=PLAINTEXT







broker3 config:


listeners=PLAINTEXT://xxx-a-3:9092,EXTERNAL://:9797
advertised.listeners=PLAINTEXT://xxx-a-3:9092,EXTERNAL://:9797
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
security.inter.broker.protocol=PLAINTEXT


My local machine connects to Kafka with:
properties.setProperty("bootstrap.servers", "xxx-b-1:9797")


Is it related to this configuration?










On 2020-10-23 08:37:14, "zhisheng" wrote:
>hi
>
>To troubleshoot, you can use JSONKeyValueDeserializationSchema when consuming from Kafka
>to get the record metadata (topic/partition/offset). That way you can check which partitions your data actually comes from, and there will be no more confusion.
>
>eg:
>
>  env.addSource(new FlinkKafkaConsumer011<>(
>parameters.get("topic"),new
>JSONKeyValueDeserializationSchema(true),
>buildKafkaProps(parameters))).flatMap(new
>FlatMapFunction() {
>@Overridepublic void flatMap(ObjectNode jsonNodes,
>Collector collector) throws Exception {
>System.out.println(jsonNodes.get("value"));
>System.out.println(jsonNodes.get("metadata").get("topic").asText());
>
>System.out.println(jsonNodes.get("metadata").get("offset").asText());
>
>System.out.println(jsonNodes.get("metadata").get("partition").asText());
>   collector.collect(jsonNodes);
>}}).print();
>
>Best
>
>zhisheng
>
>
>Lynn Chen wrote on Fri, Oct 23, 2020 at 12:13 AM:
>
>>
>>
>>
>>
>>
>>
>> hi,  Qijun Feng:
>>
>>
>> I ran into a similar problem. How did you end up solving it?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On 2020-04-03 09:27:52, "LakeShen" wrote:
>> >Hi Qijun,
>> >
>> >Check whether all Kafka partitions actually have data, or whether only partition 3 was written to after the timestamp 158567040L. Just my thought.
>> >
>> >Best,
>> >LakeShen
>> >
>> >Qijun Feng wrote on Thu, Apr 2, 2020 at 5:44 PM:
>> >
>> >> Dear All,
>> >>
>> >> My Kafka cluster has three machines and the topic also has three partitions. When I read Kafka
>> >> messages with Flink, it feels like data is only read from one partition. At first my bootstrap.servers listed only one address;
>> >> now it lists all the addresses, and I also changed the group.id
>> >>
>> >>
>> >> Properties properties = new Properties();
>> >> properties.setProperty("bootstrap.servers", "10.216.85.201:9092,
>> >> 10.216.77.170:9092,10.216.77.188:9092");
>> >> properties.setProperty("group.id", "behavior-logs-aggregator");
>> >>
>> >> FlinkKafkaConsumer010 kafkaConsumer010 =
>> >>new FlinkKafkaConsumer010("behavior-logs_dev", new
>> >> BehaviorLogDeserializationSchema(), properties);
>> >> kafkaConsumer010.setStartFromTimestamp(158567040L); //2020/04/01
>> >>
>> >> The processed data is written to a database, and it looks like data is missing. The logs show the
>> >> same: only partition=3 appears, nothing for partition=1 or 2,
>> >>
>> >> 2020-04-02 14:54:58,532 INFO
>> >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  -
>> >> Consumer subtask 0 creating fetcher with offsets
>> >> {KafkaTopicPartition{topic='behavior-logs_dev', partition=3}=38}.
>> >>
>> >>
>> >> Is something wrong somewhere?
>> >>
>> >>
>>


Re: Package incompatibility between pyflink and pyspark, e.g. pyspark uses pandas 1.1.4 while pyflink requires >=0.23<0.25

2020-10-22 Thread Xingbo Huang
Hi,
PySpark's constraint on pandas is >=0.23.2, so a plain install pulls in the latest pandas by default, which carries a lot of potential risk.
In PyFlink 1.11 the pandas constraint is pandas>=0.23.4,<=0.25.3; using a more stable pandas version avoids many risks. This version range also falls within PySpark's range, so the two can be used together.

Best,
Xingbo

xuzh wrote on Fri, Oct 23, 2020 at 9:39 AM:

> The packages referenced by pyflink and pyspark are incompatible, e.g. pyspark uses pandas 1.1.4 while pyflink requires >=0.23,<0.25.
> Is there an official plan to upgrade the pandas version?
> It would be good if the package versions could stay consistent across major releases so that pyflink and pyspark remain compatible.
> Just a personal suggestion.


Package incompatibility between pyflink and pyspark, e.g. pyspark uses pandas 1.1.4 while pyflink requires >=0.23<0.25

2020-10-22 Thread xuzh
The packages referenced by pyflink and pyspark are incompatible, e.g. pyspark uses pandas 1.1.4
while pyflink requires >=0.23,<0.25.
Is there an official plan to upgrade the pandas version?
It would be good if the package versions could stay consistent across major releases so that pyflink and pyspark remain compatible. Just a personal suggestion.


Re: Re: Problem with Flink reading from multiple Kafka partitions

2020-10-22 Thread zhisheng
hi

To troubleshoot, you can use JSONKeyValueDeserializationSchema when consuming from Kafka
to get the record metadata (topic/partition/offset). That way you can check which partitions your data actually comes from, and there will be no more confusion.

eg:

  env.addSource(new FlinkKafkaConsumer011<>(
          parameters.get("topic"),
          new JSONKeyValueDeserializationSchema(true),
          buildKafkaProps(parameters)))
      .flatMap(new FlatMapFunction<ObjectNode, ObjectNode>() {
          @Override
          public void flatMap(ObjectNode jsonNodes, Collector<ObjectNode> collector) throws Exception {
              System.out.println(jsonNodes.get("value"));
              System.out.println(jsonNodes.get("metadata").get("topic").asText());
              System.out.println(jsonNodes.get("metadata").get("offset").asText());
              System.out.println(jsonNodes.get("metadata").get("partition").asText());
              collector.collect(jsonNodes);
          }
      }).print();

Best

zhisheng


Lynn Chen wrote on Fri, Oct 23, 2020 at 12:13 AM:

>
>
>
>
>
>
> hi,  Qijun Feng:
>
>
> I ran into a similar problem. How did you end up solving it?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2020-04-03 09:27:52, "LakeShen" wrote:
> >Hi Qijun,
> >
> >Check whether all Kafka partitions actually have data, or whether only partition 3 was written to after the timestamp 158567040L. Just my thought.
> >
> >Best,
> >LakeShen
> >
> >Qijun Feng wrote on Thu, Apr 2, 2020 at 5:44 PM:
> >
> >> Dear All,
> >>
> >> My Kafka cluster has three machines and the topic also has three partitions. When I read Kafka
> >> messages with Flink, it feels like data is only read from one partition. At first my bootstrap.servers listed only one address;
> >> now it lists all the addresses, and I also changed the group.id
> >>
> >>
> >> Properties properties = new Properties();
> >> properties.setProperty("bootstrap.servers", "10.216.85.201:9092,
> >> 10.216.77.170:9092,10.216.77.188:9092");
> >> properties.setProperty("group.id", "behavior-logs-aggregator");
> >>
> >> FlinkKafkaConsumer010 kafkaConsumer010 =
> >>new FlinkKafkaConsumer010("behavior-logs_dev", new
> >> BehaviorLogDeserializationSchema(), properties);
> >> kafkaConsumer010.setStartFromTimestamp(158567040L); //2020/04/01
> >>
> >> The processed data is written to a database, and it looks like data is missing. The logs show the
> >> same: only partition=3 appears, nothing for partition=1 or 2,
> >>
> >> 2020-04-02 14:54:58,532 INFO
> >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  -
> >> Consumer subtask 0 creating fetcher with offsets
> >> {KafkaTopicPartition{topic='behavior-logs_dev', partition=3}=38}.
> >>
> >>
> >> Is something wrong somewhere?
> >>
> >>
>


Re: Re: Loading external jars for UDF registration in Flink 1.11

2020-10-22 Thread amen...@163.com
Yes, just as @chenxuying and @zhisheng said.

The approach I took is to add the required UDF jars to the classpath via the pipeline.classpaths parameter. However, when a task is assigned to a TM for execution it still has to be able to find the required UDF
jars, so in 1.11 I use the -yt parameter to upload the /plugins directory to HDFS, which solves the problem.

best,
amenhub
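
A minimal sketch of the pipeline.classpaths part (assumed usage, not the poster's actual
code; the jar URL is a placeholder). As noted above, the task managers must still be able
to resolve the same jar at runtime.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;

import java.util.Collections;

public class UdfClasspathConfig {
    public static Configuration withUdfJar() {
        Configuration conf = new Configuration();
        // "pipeline.classpaths": URLs added to the user classpath on the client side.
        conf.set(PipelineOptions.CLASSPATHS,
                Collections.singletonList("file:///opt/flink/plugins/my-udf.jar"));
        return conf;
    }
}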



 
From: zhisheng
Sent: 2020-10-22 23:28
To: user-zh
Subject: Re: Loading external jars for UDF registration in Flink 1.11
hi
 
In Flink 1.11, if you want to manage UDF jars, you should be able to control the path of the UDF
jars via the yarn-provided-lib-dirs [1] parameter. PS: this parameter is only supported in 1.11.
 
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/config.html#yarn-provided-lib-dirs
 
Best
zhisheng
 
Husky Zeng <568793...@qq.com> wrote on Thu, Oct 22, 2020 at 11:31 AM:
 
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Is-there-a-way-to-avoid-submit-hive-udf-s-resources-when-we-submit-a-job-td38204.html
>
>
>
> https://issues.apache.org/jira/browse/FLINK-19335?focusedCommentId=17199927=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17199927
>
>
>
> We are also working on a feature that loads UDFs from an HDFS path. Could you check whether this is the same problem? We could discuss it.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


How can an Oracle dimension table be registered in the Flink SQL environment?

2020-10-22 Thread Bruce
Hello, I see that the JDBC
connector currently only supports registering MySQL and PostgreSQL tables in Flink SQL. I want to register an Oracle dimension table; how can I extend the connector to achieve this?

Sent from my iPhone

Re: Re: Flink 1.11.1 REST API usage

2020-10-22 Thread amen...@163.com
So it really isn't supported. Thanks for clearing that up!



 
From: Peidian Li
Sent: 2020-10-22 19:13
To: user-zh
Subject: Re: Flink 1.11.1 REST API usage
The YARN proxy server does not support POST requests; this is a screenshot a YARN colleague sent me a few days ago:
We changed the proxy server logic to support POST requests and then it worked.


An example of stop with savepoint:
http://zjy-hadoop-prc-ct11.bj:21001/proxy/application_1600936402499_375893/jobs/790e4740baa52b43c0ceb9a5cdaf6135/stop?proxyapproved=true

Request body:
{
"drain" : true,
"targetDirectory" : "hdfs://zjyprc-hadoop/user/s_flink_tst/checkpoints4"
}

Response:
{
"request-id": "69416efc4538f56759f77a3001c38ff8"
}
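
For reference, a small client-side sketch of the same call in Java (an assumed helper,
not from this thread; host, port, job id and savepoint path are placeholders): POST
/jobs/<jobId>/stop with a JSON body containing "drain" and "targetDirectory".

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class StopWithSavepointClient {
    public static void main(String[] args) throws Exception {
        URL url = new URL("http://jobmanager-host:8081/jobs/790e4740baa52b43c0ceb9a5cdaf6135/stop");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);

        String body = "{\"drain\": true, \"targetDirectory\": \"hdfs:///user/flink/savepoints\"}";
        try (OutputStream os = conn.getOutputStream()) {
            os.write(body.getBytes(StandardCharsets.UTF_8));
        }
        // A successful request returns a JSON body containing a "request-id", as shown above.
        System.out.println("HTTP " + conn.getResponseCode());
    }
}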

On Oct 22, 2020, at 2:30 PM, Husky Zeng <568793...@qq.com> wrote:

Most of the other endpoints are not POST; you need to change the request to GET or whatever applies. Start by carefully reading the description on the page you linked and check whether the deployment has gone wrong.



--
Sent from: http://apache-flink.147419.n8.nabble.com/



Re: "stepless" sliding windows?

2020-10-22 Thread Jacob Sevart
I think the issue is you have to specify a *time *interval for "step." It
would be nice to consider the preceding N minutes as of every message. You
can somewhat approximate that using a very small step.

On Thu, Oct 22, 2020 at 2:29 AM Danny Chan  wrote:

> The SLIDING window always triggers as of each step, what do you mean by
> "stepless" ?
>
> Alex Cruise  于2020年10月21日周三 上午1:52写道:
>
>> whoops.. as usual, posting led me to find some answers myself. Does this
>> make sense given my requirements?
>>
>> Thanks!
>>
>> private class MyWindowAssigner(val windowSize: Time) : WindowAssigner<Record, TimeWindow>() {
>>     private val trigger = CountTrigger.of(1) as Trigger<Record, TimeWindow>
>>
>>     override fun assignWindows(
>>         element: Record,
>>         timestamp: Long,
>>         context: WindowAssignerContext
>>     ): MutableCollection<TimeWindow> {
>>         return mutableListOf(TimeWindow(timestamp - windowSize.toMilliseconds(), timestamp))
>>     }
>>
>>     override fun getDefaultTrigger(env: StreamExecutionEnvironment?): Trigger<Record, TimeWindow> {
>>         return trigger
>>     }
>>
>>     override fun getWindowSerializer(executionConfig: ExecutionConfig?): TypeSerializer<TimeWindow> {
>>         return TimeWindow.Serializer()
>>     }
>>
>>     override fun isEventTime(): Boolean {
>>         return true
>>     }
>> }
>>
>>
>> On Tue, Oct 20, 2020 at 9:13 AM Alex Cruise  wrote:
>>
>>> Hey folks!
>>>
>>> I have an application that wants to use "stepless" sliding windows, i.e.
>>> we produce aggregates on every event. The windows need to be of a fixed
>>> size, but to have their start and end times update continuously, and I'd
>>> like to trigger on every event. Is this a bad idea? I've googled and read
>>> the docs extensively and haven't been able to identify built-in
>>> functionality or examples that map cleanly to my requirements.
>>>
>>> OK, I just found DeltaTrigger, which looks promising... Does it make
>>> sense to write a WindowAssigner that makes a new Window on every event,
>>> allocation rates aside?
>>>
>>> Thanks!
>>>
>>> -0xe1a
>>>
>>

-- 
Jacob Sevart
Software Engineer, Safety


Re: FlinkSQL window usage question

2020-10-22 Thread hailongwang
Hi Roc,
This comes down to how ORDER BY and LIMIT are executed when used together:
1. find the LIMIT n rows first and then ORDER BY only those, not all of the data; or
2. ORDER BY all of the data first, then apply LIMIT.
As far as I can see, Flink's `StreamExecSortLimit` only guarantees that LIMIT n rows are emitted; it does not guarantee that those n rows are sorted.
If the business logic allows it, you can add an OFFSET after the LIMIT. That way `emitRecordsWithRowNumber` is used, which guarantees that the final LIMIT n rows are
ordered.
Personally I think `outputRankNumber` of `StreamExecSortLimit` should be set to true.
Not sure whether my understanding is correct, CC @Jark @godfrey




Best,
Hailong Wang




On 2020-10-22 10:09:09, "Roc Marshal" wrote:
>Hi,
>
>
>
>
>SELECT
>
>TUMBLE_START(ts, INTERVAL '1' day) as window_start,
>
>TUMBLE_END(ts, INTERVAL '1' day) as window_end,
>
>c1,
>
>sum(c2) as sc2
>
>FROM sourcetable
>
>GROUP BY TUMBLE(ts, INTERVAL '1' day), c1
>
>ORDER BY window_start, sc2 desc limit 10
>
>
>This SQL is meant to use one-day (tumbling) windows, group by c1 within the window,
>sum column c2 (as sc2), and then sort by sc2 within the window. But judging from the result, the sc2 column within a window is not ordered (neither descending nor ascending).
>Given my requirement and the way the SQL is written, could you diagnose where the problem is, or give some advice so I can find where my misunderstanding of Flink SQL lies?
>
>
>Thanks!
>
>
>Best Roc.


Re:Re: Problem with Flink reading from multiple Kafka partitions

2020-10-22 Thread Lynn Chen






hi,  Qijun Feng:


I ran into a similar problem. How did you end up solving it?

















On 2020-04-03 09:27:52, "LakeShen" wrote:
>Hi Qijun,
>
>Check whether all Kafka partitions actually have data, or whether only partition 3 was written to after the timestamp 158567040L. Just my thought.
>
>Best,
>LakeShen
>
>Qijun Feng wrote on Thu, Apr 2, 2020 at 5:44 PM:
>
>> Dear All,
>>
>> My Kafka cluster has three machines and the topic also has three partitions. When I read Kafka
>> messages with Flink, it feels like data is only read from one partition. At first my bootstrap.servers listed only one address;
>> now it lists all the addresses, and I also changed the group.id
>>
>>
>> Properties properties = new Properties();
>> properties.setProperty("bootstrap.servers", "10.216.85.201:9092,
>> 10.216.77.170:9092,10.216.77.188:9092");
>> properties.setProperty("group.id", "behavior-logs-aggregator");
>>
>> FlinkKafkaConsumer010 kafkaConsumer010 =
>>new FlinkKafkaConsumer010("behavior-logs_dev", new
>> BehaviorLogDeserializationSchema(), properties);
>> kafkaConsumer010.setStartFromTimestamp(158567040L); //2020/04/01
>>
>> The processed data is written to a database, and it looks like data is missing. The logs show the
>> same: only partition=3 appears, nothing for partition=1 or 2,
>>
>> 2020-04-02 14:54:58,532 INFO
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  -
>> Consumer subtask 0 creating fetcher with offsets
>> {KafkaTopicPartition{topic='behavior-logs_dev', partition=3}=38}.
>>
>>
>> Is something wrong somewhere?
>>
>>


Re: How should time functions such as NOW() be understood in SQL when writing a streaming app with the Table API & SQL?

2020-10-22 Thread hailongwang
 Hi Longdexin,
According to the documentation [1], the NOW() function is non-deterministic, which means it is not
constant-folded away during the RelNode optimization phase. So the function keeps updating; it is not fixed to the startup time and left unchanged.
When writing a custom UDF, you can override the method
`boolean isDeterministic()`
to decide whether it is deterministic or non-deterministic; the default is true.




[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html#temporal-functions
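
A small sketch of the UDF side (assumed code, not from this thread): a scalar function
that overrides isDeterministic() to return false, so the planner will not constant-fold it.

import org.apache.flink.table.functions.ScalarFunction;

public class CurrentMillis extends ScalarFunction {

    public long eval() {
        return System.currentTimeMillis();
    }

    @Override
    public boolean isDeterministic() {
        return false; // default is true; false prevents constant folding during optimization
    }
}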



Best,
Hailong Wang

On 2020-10-22 19:13:03, "Longdexin" <274522...@qq.com> wrote:
>Once the streaming application is running and time passes, say into the next day, will NOW() in the SQL keep updating with processing time so that the processing logic stays correct? My understanding was that the value of NOW() is fixed the moment the streaming application starts and never changes afterwards. If so, what can I do to make the time-comparison logic in the SQL keep up with the current time? Many thanks.
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: What does Kafka Error sending fetch request mean for the Kafka source?

2020-10-22 Thread John Smith
Any thoughts? This doesn't seem to create duplicates all the time, or maybe
it's unrelated, as we are still seeing the message and there are no
duplicates...

On Wed., Oct. 21, 2020, 12:09 p.m. John Smith, 
wrote:

> And yes my downstream is handling the duplicates in an idempotent way so
> we are good on that point. But just curious what the behaviour is on the
> source consumer when that error happens.
>
> On Wed, 21 Oct 2020 at 12:04, John Smith  wrote:
>
>> Hi, running Flink 1.10.0 we see these logs once in a while... 2020-10-21
>> 15:48:57,625 INFO org.apache.kafka.clients.FetchSessionHandler - [
>> Consumer clientId=consumer-2, groupId=xx-import] Error sending fetch
>> request (sessionId=806089934, epoch=INITIAL) to node 0:
>> org.apache.kafka.common.errors.DisconnectException.
>>
>> Obviously it looks like the consumer is getting disconnected and from
>> what it seems it's either a Kafka bug on the way it handles the EPOCH or
>> possibly version mismatch between client and brokers. That's fine I can
>> look at upgrading the client and/or Kafka. But I'm trying to understand
>> what happens in terms of the source and the sink. It looks let we get
>> duplicates on the sink and I'm guessing it's because the consumer is
>> failing and at that point Flink stays on that checkpoint until it can
>> reconnect and process that offset and hence the duplicates downstream?
>>
>


Re: Loading external jars for UDF registration in Flink 1.11

2020-10-22 Thread zhisheng
hi

In Flink 1.11, if you want to manage UDF jars, you should be able to control the path of the UDF
jars via the yarn-provided-lib-dirs [1] parameter. PS: this parameter is only supported in 1.11.

 [1]
https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/config.html#yarn-provided-lib-dirs

Best
zhisheng

Husky Zeng <568793...@qq.com> wrote on Thu, Oct 22, 2020 at 11:31 AM:

>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Is-there-a-way-to-avoid-submit-hive-udf-s-resources-when-we-submit-a-job-td38204.html
>
>
>
> https://issues.apache.org/jira/browse/FLINK-19335?focusedCommentId=17199927=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17199927
>
>
>
> We are also working on a feature that loads UDFs from an HDFS path. Could you check whether this is the same problem? We could discuss it.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Flink 1.10 history server cannot monitor Flink SQL jobs

2020-10-22 Thread zhisheng
Hi Robin:

1. Did you change the refresh interval? Does it never show up at all?

2. Running jobs are not displayed; you can check them directly in YARN. The history server should only display terminated jobs.

PS: a few more questions about the history server:

1. Could the list of terminated jobs support pagination? Currently all historical jobs are shown on a single page, which is very slow to open.

2. Is there a way to view the JM and TM logs of terminated jobs? The logs do exist on HDFS,
so in principle the log information could be fetched, parsed and displayed; the Spark history server can show logs of terminated jobs.


Best!
zhisheng

Robin Zhang wrote on Thu, Oct 22, 2020 at 6:11 PM:

>
> As shown in the image below: with Flink 1.10 on YARN in per-job submission mode, applications developed with the Java DataStream
> and Table APIs have their statistics pulled by the JM normally, but SQL-based jobs cannot be monitored by the history server.
> The SQL used is not exactly the one from the official site, but it is converted to a DataStream and submitted to YARN in per-job
> mode; the only difference is an extra SQL parsing step. I cannot understand
>
> why the history server does not load the job information into the target directory on HDFS. The JobManager logs and the configuration both confirm that the JM picked up the history-server-related settings.
>
> <
> http://apache-flink.147419.n8.nabble.com/file/t447/%E5%8E%86%E5%8F%B2%E6%9C%8D%E5%8A%A1%E5%99%A8.png>
>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Un-ignored Parsing Exceptions in the CsvFormat

2020-10-22 Thread Austin Cawley-Edwards
Hey Roman,

Sorry to miss this -- thanks for the confirmation and making the ticket.
I'm happy to propose a fix if someone is able to assign the ticket to me.

Best,
Austin
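
Sketching the kind of guard discussed here (the names and surrounding structure are
assumptions, not the actual RowCsvInputFormat code or the eventual FLINK-19711 patch):
in lenient mode the malformed row would be skipped instead of failing the job.

public final class LenientParseSketch {

    /** Returns false to signal "skip this record"; throws when not lenient. */
    static boolean handleUnexpectedPosition(boolean lenient, int column, String rawRow) {
        if (lenient) {
            return false; // ignoreParseErrors(): silently drop the malformed row
        }
        throw new IllegalStateException(
                "Unexpected parser position for column " + column + " of row '" + rawRow + "'");
    }
}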

On Mon, Oct 19, 2020 at 6:56 AM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Hey Austin,
>
> I think you are right. The problematic row contains an odd number of
> delimiters in which case skipFields will return -1, which in turn leads to
> an exception.
>
> I opened a bug ticket https://issues.apache.org/jira/browse/FLINK-19711
> to fix it.
>
> Regards,
> Roman
>
>
> On Fri, Oct 16, 2020 at 8:32 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hey all,
>>
>> I'm ingesting CSV files with Flink 1.10.2 using SQL and the CSV
>> Format[1].
>>
>> Even with the `ignoreParseErrors()` set, the job fails when it encounters
>> some types of malformed rows. The root cause is indeed a `ParseException`,
>> so I'm wondering if there's anything more I need to do to ignore these
>> rows. Each field in the schema is a STRING.
>>
>>
>> I've configured the CSV format and table like so:
>>
>> tableEnv.connect(
>> new FileSystem()
>> .path(path)
>> )
>> .withFormat(
>> new Csv()
>> .quoteCharacter('"')
>> .ignoreParseErrors()
>> )
>> .withSchema(schema)
>> .inAppendMode()
>>
>>
>> Shot in the dark, but should `RowCsvInputFormat#parseRecord` have a check
>> to `isLenient()` if there is an unexpected parser position?[2]
>>
>> Example error:
>>
>> 2020-10-16 12:50:18
>> org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
>> exception when processing split: null
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1098)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1066)
>> at
>> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:351)
>> Caused by: org.apache.flink.api.common.io.ParseException: Unexpected
>> parser position for column 1 of row '",
>> https://www.facebook.com/GoingOn-Networks-154758847925524/,https://www.linkedin.com/company/goingon,,
>> ""company,'
>> at
>> org.apache.flink.api.java.io.RowCsvInputFormat.parseRecord(RowCsvInputFormat.java:204)
>> at
>> org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvInputFormat.java:111)
>> at
>> org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:520)
>> at
>> org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:79)
>> at
>> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:329)
>>
>>
>> Thanks,
>> Austin
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csv-format
>> [2]:
>> https://github.com/apache/flink/blob/c09e959cf55c549ca4a3673f72deeb12a34e12f5/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java#L203-L206
>>
>


Re: NullPointerException when trying to read null array in Postgres using JDBC Connector

2020-10-22 Thread Dylan Forciea
Danny,

Thanks! I have created a new JIRA issue [1]. I’ll look into how hard it is to 
get a patch and unit test myself, although I may need a hand on the process of 
making a change to both the master branch and a release branch if it is desired 
to get a fix into 1.11.

Regards,
Dylan Forciea

[1] https://issues.apache.org/jira/browse/FLINK-19771
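
The shape of a possible fix, as a hypothetical sketch (names assumed; this is not the
actual PostgresRowConverter code): guard against the SQL NULL array before dereferencing it.

import java.sql.Array;
import java.sql.SQLException;

final class NullSafeArrayAccess {

    /** Returns null for a SQL NULL array column instead of throwing a NullPointerException. */
    static Object[] elementsOrNull(Array sqlArray) throws SQLException {
        if (sqlArray == null) {
            return null;
        }
        return (Object[]) sqlArray.getArray();
    }
}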

From: Danny Chan 
Date: Thursday, October 22, 2020 at 4:34 AM
To: Dylan Forciea 
Cc: Flink ML 
Subject: Re: NullPointerException when trying to read null array in Postgres 
using JDBC Connector

Yes, the current code throws directly for NULLs. Can you log an issue for it?

Dylan Forciea <dy...@oseberg.io> wrote on Wed, Oct 21, 2020 at 4:30 AM:
I believe I am getting an error because I have a nullable postgres array of 
text that is set to NULL that I’m reading using the JDBC SQL Connector. Is this 
something that should be allowed? Looking at the source code line below, it 
doesn’t look like the case of an array being null would be handled.

[error] Caused by: java.io.IOException: Couldn't access resultSet
[error]   at 
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:266)
[error]   at 
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:57)
[error]   at 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91)
[error]   at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
[error]   at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
[error]   at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
[error] Caused by: java.lang.NullPointerException
[error]   at 
org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.lambda$createPostgresArrayConverter$c06ce9f4$2(PostgresRowConverter.java:97)
[error]   at 
org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter.toInternal(AbstractJdbcRowConverter.java:79)
[error]   at 
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:259)
[error]   ... 5 more

Thanks,
Dylan Forciea


Re: Re: rename error in flink sql

2020-10-22 Thread Timo Walther

Hi,

sorry for the late reply. I assume the problem was in the
`tEnv.toAppendStream(result, Order.class).print();`, right?


You can also find a new example here:

https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/GettingStartedExample.java

We are in the process of adding more example code for beginners.

I will reply to your JIRA issue in the comments.

Regards,
Timo


On 21.10.20 07:17, ?? wrote:

Thanks for your help, I have finally modified it and solved it by luck.

But a new related question has come up:
https://issues.apache.org/jira/browse/FLINK-19746
Could you please have a look at it?

I know JIRA is for bugs only, but email would destroy the table I posted;
forgive me for posting it in JIRA, and close it if you think it's not a bug.

Thanks for your help


------------------ Original email ------------------
From: "??"
Date: Oct 21, 2020 (Wed) 10:54
To: "Andrey Zagrebin"
Cc: "user"; "dwysakowicz"; "twalthr"

Subject: Re: rename error in flink sql


Thanks for your replies.

um
Could you tell me where I am wrong in this code?
I'm weak in Java.

The pojo is copied from the flink official example:
https://paste.ubuntu.com/p/kRKxwBpSVK/

the test code is:
https://paste.ubuntu.com/p/yVYhZqMNq7/

Thanks for your help~!


------------------ Original email ------------------
From: "Andrey Zagrebin"
Date: Oct 20, 2020 (Tue) 11:58
To: "??"
Cc: "user"; "dwysakowicz"; "twalthr"

Subject: Re: rename error in flink sql

Hi,

I am not an SQL expert but I would not expect the original POJO to match 
the new row with the renamed field.

Maybe Timo or Dawid have to add something.

Best,
Andrey

On Tue, Oct 20, 2020 at 4:56 PM ?? wrote:



I'm learning "select" from the
official document




my code is:
https://paste.ubuntu.com/p/yVYhZqMNq7/

the error I got is:
total is not found in PojoType

It's strange, since the renamed field should be found in the pojo.

Could anyone help me?
Thanks for your help







Re: Flink - Kafka topic null error; happens only when running on cluster

2020-10-22 Thread Timo Walther

Hi Manas,

you can use static variable but you need to make sure that the logic to 
fill the static variable is accessible and executed in all JVMs.


I assume `pipeline.properties` is in your JAR that you submit to the 
cluster right? Then you should be able to access it through a singleton 
pattern instead of a static variable access.


Regards,
Timo
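
A minimal sketch of the constructor-parameter approach described above (assumed code,
not from this thread): the topic name is shipped with the serialized function instead of
living in a static field that is empty on the cluster JVMs.

import org.apache.flink.api.common.functions.RichMapFunction;

public class TopicTaggingFunction extends RichMapFunction<String, String> {

    private final String configTopic; // serialized together with the function instance

    public TopicTaggingFunction(String configTopic) {
        this.configTopic = configTopic;
    }

    @Override
    public String map(String value) {
        return configTopic + ": " + value;
    }
}

// In main() (pre-flight phase): stream.map(new TopicTaggingFunction(prop.getProperty("CONFIG_TOPIC")));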


On 22.10.20 14:17, Manas Kale wrote:
Sorry, I messed up the code snippet in the earlier mail. The correct one 
is :


public static void main(String[] args) {
Properties prop =new Properties();

InputStream is = 
Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
prop.load(is);

HashMap strMap =new HashMap<>();

strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));

new Config(strMap);

...

}

public class Config {

public static StringCONFIG_TOPIC;

publicConfig(HashMap s) {

 CONFIG_TOPIC = s.get("CONFIG_TOPIC");

}

}

The value of CONFIG_TOPIC in a minicluster is properly loaded but null 
when run on a cluster.



On Thu, Oct 22, 2020 at 5:42 PM Manas Kale > wrote:


Hi Timo,
Thank you for the explanation, I can start to see why I was getting
an exception.
Are you saying that I cannot use static variables at all when trying
to deploy to a cluster? I would like the variables to remain static
and not be instance-bound as they are accessed from multiple classes.
Based on my understanding of what you said, I implemented the
following pattern:

public static void main(String[] args) {
Properties prop =new Properties();

InputStream is = 
Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
prop.load(is);

strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));

new Config(strMap, longMap);

...

}

public class Config {

public static StringCONFIG_TOPIC;
public static StringCONFIG_KAFKA;

public Config(HashMap s) {
 CONFIG_TOPIC = s.get("CONFIG_TOPIC");
 CONFIG_KAFKA = s.get("CONFIG_KAFKA");

}

}

This produces the same issue. With the easier solution that you
listed, are you implying I use multiple instances or a singleton
pattern of some sort?

On Thu, Oct 22, 2020 at 1:23 PM Timo Walther mailto:twal...@apache.org>> wrote:

Hi Manas,

you need to make sure to differentiate between what Flink calls
"pre-flight phase" and "cluster phase".

The pre-flight phase is were the pipeline is constructed and all
functions are instantiated. They are then later serialized and
send to
the cluster.

If you are reading your properties file in the `main()` method
and store
something in static variables, the content is available locally
where
the pipeline is constructed (e.g. in the client) but when the
function
instances are send to the cluster. Those static variables are fresh
(thus empty) in the cluster JVMs. You need to either make sure
that the
properties file is read from each task manager again, or easier:
pass
the parameters as constructor parameters into the instances such
that
they are shipped together with the function itself.

I hope this helps.

Regards,
Timo


On 22.10.20 09:24, Manas Kale wrote:
 > Hi,
 > I am trying to write some data to a kafka topic and I have
the following
 > situation:
 >
 > monitorStateStream
 >
 >     .process(new
IDAP2AlarmEmitter()).name(IDAP2_ALARM_EMITTER).uid(IDAP2_ALARM_EMITTER)
 >
 >     /... // Stream that outputs elements of type IDAP2Alarm/
 >
 > .addSink(getFlinkKafkaProducer(ALARMS_KAFKA,
 > Config.ALARMS_TOPIC)).name(ALARM_SINK).uid(ALARM_SINK);
 >
 > private static 
FlinkKafkaProducer getFlinkKafkaProducer(String servers,
String topic) {
 >     Properties properties =new Properties();
 >     properties.setProperty("bootstrap.servers", servers);
 >     return new FlinkKafkaProducer(topic,
 >           (element, timestamp) -> element.serializeForKafka(),
 >           properties,
 >           FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
 > }
 >
 > /*
 > This interface is used to indicate that a class may be output
to Kafka.
 > Since Kafka treats all
 > data as bytes, classes that implement this interface have to
provide an
 > implementation for the
 > serializeForKafka() method.
 > */
 > public interface IDAP2JSONOutput {
 >
 >      // Implement serialization logic in this method.
 > ProducerRecord serializeForKafka();
 >
 > }
 >
 > public class IDAP2Alarmextends 

How to understand NOW() in SQL when using Table & SQL API to develop a streaming app?

2020-10-22 Thread Longdexin
From my point of view, the value of the NOW() function in SQL is fixed at the
time the streaming app is launched and will not change with processing
time. However, as a new Flink user, I'm not so sure of that. By the way, if
my intent is to keep the time logic updating all the time, what should I
do?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink - Kafka topic null error; happens only when running on cluster

2020-10-22 Thread Manas Kale
Sorry, I messed up the code snippet in the earlier mail. The correct one is
:

public static void main(String[] args) {
   Properties prop = new Properties();

InputStream is =
Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
prop.load(is);

HashMap strMap = new HashMap<>();

strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));

new Config(strMap);

...

}

public class Config {

public static String CONFIG_TOPIC;

public Config(HashMap s) {

CONFIG_TOPIC = s.get("CONFIG_TOPIC");

}

}

The value of CONFIG_TOPIC in a minicluster is properly loaded but null when
run on a cluster.


On Thu, Oct 22, 2020 at 5:42 PM Manas Kale  wrote:

> Hi Timo,
> Thank you for the explanation, I can start to see why I was getting an
> exception.
> Are you saying that I cannot use static variables at all when trying to
> deploy to a cluster? I would like the variables to remain static and not be
> instance-bound as they are accessed from multiple classes.
> Based on my understanding of what you said, I implemented the
> following pattern:
>
> public static void main(String[] args) {
>Properties prop = new Properties();
>
> InputStream is = 
> Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
> prop.load(is);
>
> strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));
>
> new Config(strMap, longMap);
>
> ...
>
> }
>
> public class Config {
>
> public static String CONFIG_TOPIC;
> public static String CONFIG_KAFKA;
>
> public Config(HashMap s) {
> CONFIG_TOPIC = s.get("CONFIG_TOPIC");
> CONFIG_KAFKA = s.get("CONFIG_KAFKA");
>
> }
>
> }
>
> This produces the same issue. With the easier solution that you listed,
> are you implying I use multiple instances or a singleton pattern of some
> sort?
>
> On Thu, Oct 22, 2020 at 1:23 PM Timo Walther  wrote:
>
>> Hi Manas,
>>
>> you need to make sure to differentiate between what Flink calls
>> "pre-flight phase" and "cluster phase".
>>
>> The pre-flight phase is where the pipeline is constructed and all
>> functions are instantiated. They are then later serialized and sent to
>> the cluster.
>>
>> If you are reading your properties file in the `main()` method and store
>> something in static variables, the content is available locally where
>> the pipeline is constructed (e.g. in the client), but not when the function
>> instances are sent to the cluster: those static variables are fresh
>> (thus empty) in the cluster JVMs. You need to either make sure that the
>> properties file is read again on each task manager, or, easier, pass
>> the parameters as constructor parameters into the instances so that
>> they are shipped together with the function itself.
>>
>> I hope this helps.
>>
>> Regards,
>> Timo
>>
>>
>> On 22.10.20 09:24, Manas Kale wrote:
>> > Hi,
>> > I am trying to write some data to a kafka topic and I have the
>> following
>> > situation:
>> >
>> > monitorStateStream
>> >
>> > .process(new
>> IDAP2AlarmEmitter()).name(IDAP2_ALARM_EMITTER).uid(IDAP2_ALARM_EMITTER)
>> >
>> > /... // Stream that outputs elements of type IDAP2Alarm/
>> >
>> > .addSink(getFlinkKafkaProducer(ALARMS_KAFKA,
>> > Config.ALARMS_TOPIC)).name(ALARM_SINK).uid(ALARM_SINK);
>> >
>> > private static  FlinkKafkaProducer
>> getFlinkKafkaProducer(String servers, String topic) {
>> > Properties properties =new Properties();
>> > properties.setProperty("bootstrap.servers", servers);
>> > return new FlinkKafkaProducer(topic,
>> >   (element, timestamp) -> element.serializeForKafka(),
>> >   properties,
>> >   FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
>> > }
>> >
>> > /*
>> > This interface is used to indicate that a class may be output to Kafka.
>> > Since Kafka treats all
>> > data as bytes, classes that implement this interface have to provide an
>> > implementation for the
>> > serializeForKafka() method.
>> > */
>> > public interface IDAP2JSONOutput {
>> >
>> >  // Implement serialization logic in this method.
>> > ProducerRecord serializeForKafka();
>> >
>> > }
>> >
>> > public class IDAP2Alarm extends Tuple5<...> implements IDAP2JSONOutput {
>> >
>> > private final Logger LOGGER = LoggerFactory.getLogger(IDAP2Alarm.class);
>> >
>> > @Override
>> > public ProducerRecord serializeForKafka() {
>> >  byte[] rawValue;
>> >  byte[] rawKey;
>> >  String k = getMonitorFeatureKey().getMonitorName() ;
>> >  ...
>> >
>> >  rawValue = val.getBytes();
>> >
>> >  LOGGER.info("value of alarms topic from idap2 alarm : " +
>> > Config.ALARMS_TOPIC);
>> >
>> > return new ProducerRecord<>(Config.ALARMS_TOPIC, rawKey, rawValue); //
>> Line 95
>> > }
>> >
>> > }
>> >
>> >
>> > Config.ALARMS_TOPIC is a static string that is read from a properties
>> > file. When I run this code on my IDE minicluster, it runs great with no
>> > problems. But when I submit it as a jar to the cluster, I get the
>> > following error:
>> >
>> > Caused by: java.lang.IllegalArgumentException: Topic 

Re: Flink - Kafka topic null error; happens only when running on cluster

2020-10-22 Thread Manas Kale
Hi Timo,
Thank you for the explanation, I can start to see why I was getting an
exception.
Are you saying that I cannot use static variables at all when trying to
deploy to a cluster? I would like the variables to remain static and not be
instance-bound as they are accessed from multiple classes.
Based on my understanding of what you said, I implemented the
following pattern:

public static void main(String[] args) {
   Properties prop = new Properties();

InputStream is =
Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
prop.load(is);

strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));

new Config(strMap, longMap);

...

}

public class Config {

public static String CONFIG_TOPIC;
public static String CONFIG_KAFKA;

public Config(HashMap s) {
CONFIG_TOPIC = s.get("CONFIG_TOPIC");
CONFIG_KAFKA = s.get("CONFIG_KAFKA");

}

}

This produces the same issue. With the easier solution that you listed, are
you implying I use multiple instances or a singleton pattern of some sort?

On Thu, Oct 22, 2020 at 1:23 PM Timo Walther  wrote:

> Hi Manas,
>
> you need to make sure to differentiate between what Flink calls
> "pre-flight phase" and "cluster phase".
>
> The pre-flight phase is where the pipeline is constructed and all
> functions are instantiated. They are then later serialized and sent to
> the cluster.
>
> If you are reading your properties file in the `main()` method and store
> something in static variables, the content is available locally where
> the pipeline is constructed (e.g. in the client), but not when the function
> instances are sent to the cluster: those static variables are fresh
> (thus empty) in the cluster JVMs. You need to either make sure that the
> properties file is read again on each task manager, or, easier, pass
> the parameters as constructor parameters into the instances so that
> they are shipped together with the function itself.
>
> I hope this helps.
>
> Regards,
> Timo
>
>
> On 22.10.20 09:24, Manas Kale wrote:
> > Hi,
> > I am trying to write some data to a kafka topic and I have the following
> > situation:
> >
> > monitorStateStream
> >
> > .process(new
> IDAP2AlarmEmitter()).name(IDAP2_ALARM_EMITTER).uid(IDAP2_ALARM_EMITTER)
> >
> > /... // Stream that outputs elements of type IDAP2Alarm/
> >
> > .addSink(getFlinkKafkaProducer(ALARMS_KAFKA,
> > Config.ALARMS_TOPIC)).name(ALARM_SINK).uid(ALARM_SINK);
> >
> > private static  FlinkKafkaProducer
> getFlinkKafkaProducer(String servers, String topic) {
> > Properties properties =new Properties();
> > properties.setProperty("bootstrap.servers", servers);
> > return new FlinkKafkaProducer(topic,
> >   (element, timestamp) -> element.serializeForKafka(),
> >   properties,
> >   FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
> > }
> >
> > /*
> > This interface is used to indicate that a class may be output to Kafka.
> > Since Kafka treats all
> > data as bytes, classes that implement this interface have to provide an
> > implementation for the
> > serializeForKafka() method.
> > */
> > public interface IDAP2JSONOutput {
> >
> >  // Implement serialization logic in this method.
> > ProducerRecord serializeForKafka();
> >
> > }
> >
> > public class IDAP2Alarm extends Tuple5<...> implements IDAP2JSONOutput {
> >
> > private final Logger LOGGER = LoggerFactory.getLogger(IDAP2Alarm.class);
> >
> > @Override
> > public ProducerRecord serializeForKafka() {
> >  byte[] rawValue;
> >  byte[] rawKey;
> >  String k = getMonitorFeatureKey().getMonitorName() ;
> >  ...
> >
> >  rawValue = val.getBytes();
> >
> >  LOGGER.info("value of alarms topic from idap2 alarm : " +
> > Config.ALARMS_TOPIC);
> >
> > return new ProducerRecord<>(Config.ALARMS_TOPIC, rawKey, rawValue); //
> Line 95
> > }
> >
> > }
> >
> >
> > Config.ALARMS_TOPIC is a static string that is read from a properties
> > file. When I run this code on my IDE minicluster, it runs great with no
> > problems. But when I submit it as a jar to the cluster, I get the
> > following error:
> >
> > Caused by: java.lang.IllegalArgumentException: Topic cannot be null.
> >  at
> >
> org.apache.kafka.clients.producer.ProducerRecord.(ProducerRecord.java:71)
>
> > ~[flink_POC-0.1.jar:?]
> >  at
> >
> org.apache.kafka.clients.producer.ProducerRecord.(ProducerRecord.java:133)
>
> > ~[flink_POC-0.1.jar:?]
> > *at
> >
> flink_POC.idap2.containers.IDAP2Alarm.serializeForKafka(IDAP2Alarm.java:95)
> > ~[flink_POC-0.1.jar:?]*
> >  at
> >
> flink_POC.StreamingJob.lambda$getFlinkKafkaProducer$af2c9cb2$1(StreamingJob.java:62)
>
> > ~[flink_POC-0.1.jar:?]
> >  at
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:854)
>
> > ~[flink_POC-0.1.jar:?]
> >  at
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
>
> > ~[flink_POC-0.1.jar:?]
> >  at
> >
> 

How should time functions like NOW() in SQL be understood when writing a streaming app with the Flink Table API & SQL?

2020-10-22 Thread Longdexin
Once the streaming application is up and running and time passes, say into the
next day, will NOW() in the SQL keep updating with processing time so that the
processing logic stays correct? My understanding is that the value of NOW() is
fixed at the moment the streaming application starts and never changes
afterwards. If so, what can I do to keep the time-comparison logic in the SQL
up to date? Many thanks.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: mvn clean verify - testConfigurePythonExecution failing

2020-10-22 Thread Chesnay Schepler
Try naming it PythonProgramOptionsITCase; it apparently needs a jar to 
be created first, which happens after the unit tests (tests suffixed with 
Test) have been executed.


On 10/22/2020 1:48 PM, Juha Mynttinen wrote:

Hello there,

The PR https://github.com/apache/flink/pull/13322 lately added the 
test method testConfigurePythonExecution in 
org.apache.flink.client.cli.PythonProgramOptionsTest.


"mvn clean verify" fails for me in testConfigurePythonExecution:

...
INFO] Running org.apache.flink.client.cli.PythonProgramOptionsTest
[ERROR] Tests run: 3, Failures: 0, Errors: 1, Skipped: 0, Time 
elapsed: 0.433 s <<< FAILURE! - in 
org.apache.flink.client.cli.PythonProgramOptionsTest
[ERROR] 
testConfigurePythonExecution(org.apache.flink.client.cli.PythonProgramOptionsTest) 
 Time elapsed: 0.019 s  <<< ERROR!

java.nio.file.NoSuchFileException: target/dummy-job-jar
at 
java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
at 
java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
at 
java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
at 
java.base/sun.nio.fs.UnixFileAttributeViews$Basic.readAttributes(UnixFileAttributeViews.java:55)
at 
java.base/sun.nio.fs.UnixFileSystemProvider.readAttributes(UnixFileSystemProvider.java:149)
at 
java.base/sun.nio.fs.LinuxFileSystemProvider.readAttributes(LinuxFileSystemProvider.java:99)

at java.base/java.nio.file.Files.readAttributes(Files.java:1763)
at 
java.base/java.nio.file.FileTreeWalker.getAttributes(FileTreeWalker.java:219)

at java.base/java.nio.file.FileTreeWalker.visit(FileTreeWalker.java:276)
at java.base/java.nio.file.FileTreeWalker.walk(FileTreeWalker.java:322)
at java.base/java.nio.file.Files.walkFileTree(Files.java:2716)
at java.base/java.nio.file.Files.walkFileTree(Files.java:2796)
at 
org.apache.flink.client.cli.PythonProgramOptionsTest.testConfigurePythonExecution(PythonProgramOptionsTest.java:131)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)

at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)

at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runners.Suite.runChild(Suite.java:128)
at org.junit.runners.Suite.runChild(Suite.java:27)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.apache.maven.surefire.junitcore.JUnitCore.run(JUnitCore.java:55)
at 
org.apache.maven.surefire.junitcore.JUnitCoreWrapper.createRequestAndRun(JUnitCoreWrapper.java:137)
at 
org.apache.maven.surefire.junitcore.JUnitCoreWrapper.executeEager(JUnitCoreWrapper.java:107)
at 
org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:83)
at 
org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:75)
at 
org.apache.maven.surefire.junitcore.JUnitCoreProvider.invoke(JUnitCoreProvider.java:158)
at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
at 
org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
at 
org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)

...
[ERROR] Errors:
[ERROR] PythonProgramOptionsTest.testConfigurePythonExecution:131 » 
NoSuchFile target/...




The command "find . -name dummy-job-jar" doesn't find anything. I 
didn't check any deeper why 

mvn clean verify - testConfigurePythonExecution failing

2020-10-22 Thread Juha Mynttinen
Hello there,

The PR https://github.com/apache/flink/pull/13322 lately added the test
method  testConfigurePythonExecution in
org.apache.flink.client.cli.PythonProgramOptionsTest.

"mvn clean verify" fails for me in  testConfigurePythonExecution:

...
INFO] Running org.apache.flink.client.cli.PythonProgramOptionsTest
[ERROR] Tests run: 3, Failures: 0, Errors: 1, Skipped: 0, Time elapsed:
0.433 s <<< FAILURE! - in
org.apache.flink.client.cli.PythonProgramOptionsTest
[ERROR]
testConfigurePythonExecution(org.apache.flink.client.cli.PythonProgramOptionsTest)
 Time elapsed: 0.019 s  <<< ERROR!
java.nio.file.NoSuchFileException: target/dummy-job-jar
at
java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
at
java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
at
java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
at
java.base/sun.nio.fs.UnixFileAttributeViews$Basic.readAttributes(UnixFileAttributeViews.java:55)
at
java.base/sun.nio.fs.UnixFileSystemProvider.readAttributes(UnixFileSystemProvider.java:149)
at
java.base/sun.nio.fs.LinuxFileSystemProvider.readAttributes(LinuxFileSystemProvider.java:99)
at java.base/java.nio.file.Files.readAttributes(Files.java:1763)
at
java.base/java.nio.file.FileTreeWalker.getAttributes(FileTreeWalker.java:219)
at java.base/java.nio.file.FileTreeWalker.visit(FileTreeWalker.java:276)
at java.base/java.nio.file.FileTreeWalker.walk(FileTreeWalker.java:322)
at java.base/java.nio.file.Files.walkFileTree(Files.java:2716)
at java.base/java.nio.file.Files.walkFileTree(Files.java:2796)
at
org.apache.flink.client.cli.PythonProgramOptionsTest.testConfigurePythonExecution(PythonProgramOptionsTest.java:131)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runners.Suite.runChild(Suite.java:128)
at org.junit.runners.Suite.runChild(Suite.java:27)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.apache.maven.surefire.junitcore.JUnitCore.run(JUnitCore.java:55)
at
org.apache.maven.surefire.junitcore.JUnitCoreWrapper.createRequestAndRun(JUnitCoreWrapper.java:137)
at
org.apache.maven.surefire.junitcore.JUnitCoreWrapper.executeEager(JUnitCoreWrapper.java:107)
at
org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:83)
at
org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:75)
at
org.apache.maven.surefire.junitcore.JUnitCoreProvider.invoke(JUnitCoreProvider.java:158)
at
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
at
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
at
org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
...
[ERROR] Errors:
[ERROR]   PythonProgramOptionsTest.testConfigurePythonExecution:131 »
NoSuchFile target/...



The command "find . -name dummy-job-jar" doesn't find anything. I didn't
check any deeper why things fail. Ideas? I'm running "Python 3.8.5".

Regards,
Juha


Re: Re: pyflink 1.11.0: how to supply the username and password in the Elasticsearch connector when the host requires authentication

2020-10-22 Thread Xingbo Huang
Hi,

Build and install from source. You can refer to the docs [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/flinkDev/building.html#build-pyflink

Best,
Xingbo

whh_960101  于2020年10月22日周四 下午6:47写道:

> Can we upgrade to 1.12 now? It doesn't seem to have been released yet. Is there any other workaround?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2020-10-22 16:34:56, "Yangze Guo" wrote:
> >Version 1.11 does not yet support setting username and password; these two
> >options were added to the new ES connector in 1.12 [1].
> >
> >[1] https://issues.apache.org/jira/browse/FLINK-18361
> >
> >Best,
> >Yangze Guo
> >
> >On Thu, Oct 22, 2020 at 3:47 PM whh_960101  wrote:
> >>
> >> Hi everyone: if I need to sink data to an Elasticsearch host that requires
> >> authentication, how do I supply the username and password in the elasticsearch
> >> connector? I followed the example format from the official docs but found no
> >> options for a username and password, and the program fails with Unauthorized.
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.html
> >> CREATE TABLE myUserTable (
> >>   user_id STRING,
> >>   user_name STRING
> >>   uv BIGINT,
> >>   pv BIGINT,
> >>   PRIMARY KEY (user_id) NOT ENFORCED
> >> ) WITH (
> >>   'connector' = 'elasticsearch-7',
> >>   'hosts' = 'http://localhost:9200',
> >>   'index' = 'users'
> >> );Connector Options
> >> | Option | Required | Default | Type | Description |
> >> |
> >> connector
> >> | required | (none) | String | Specify what connector to use, valid
> values are:
> >> elasticsearch-6: connect to Elasticsearch 6.x cluster
> >> elasticsearch-7: connect to Elasticsearch 7.x and later versions cluster
> >> |
> >> |
> >> hosts
> >> | required | (none) | String | One or more Elasticsearch hosts to
> connect to, e.g. 'http://host_name:9092;http://host_name:9093'. |
> >> |
> >> index
> >> | required | (none) | String | Elasticsearch index for every record.
> Can be a static index (e.g. 'myIndex') or a dynamic index (e.g.
> 'index-{log_ts|-MM-dd}'). See the following Dynamic Indexsection for
> more details. |
> >> |
> >> document-type
> >> | required in 6.x | (none) | String | Elasticsearch document type. Not
> necessary anymore in elasticsearch-7. |
> >> |
> >> document-id.key-delimiter
> >> | optional | _ | String | Delimiter for composite keys ("_" by
> default), e.g., "$" would result in IDs "KEY1$KEY2$KEY3"." |
> >> |
> >> failure-handler
> >> | optional | fail | String | Failure handling strategy in case a
> request to Elasticsearch fails. Valid strategies are:
> >> fail: throws an exception if a request fails and thus causes a job
> failure.
> >> ignore: ignores failures and drops the request.
> >> retry_rejected: re-adds requests that have failed due to queue capacity
> saturation.
> >> custom class name: for failure handling with a
> ActionRequestFailureHandler subclass.
> >> |
> >> |
> >> sink.flush-on-checkpoint
> >> | optional | true | Boolean | Flush on checkpoint or not. When
> disabled, a sink will not wait for all pending action requests to be
> acknowledged by Elasticsearch on checkpoints. Thus, a sink does NOT provide
> any strong guarantees for at-least-once delivery of action requests. |
> >> |
> >> sink.bulk-flush.max-actions
> >> | optional | 1000 | Integer | Maximum number of buffered actions per
> bulk request. Can be set to '0' to disable it. |
> >> |
> >> sink.bulk-flush.max-size
> >> | optional | 2mb | MemorySize | Maximum size in memory of buffered
> actions per bulk request. Must be in MB granularity. Can be set to '0' to
> disable it. |
> >> |
> >> sink.bulk-flush.interval
> >> | optional | 1s | Duration | The interval to flush buffered actions.
> Can be set to '0' to disable it. Note, both 'sink.bulk-flush.max-size' and
> 'sink.bulk-flush.max-actions' can be set to '0' with the flush interval set
> allowing for complete async processing of buffered actions. |
> >> |
> >> sink.bulk-flush.backoff.strategy
> >> | optional | DISABLED | String | Specify how to perform retries if any
> flush actions failed due to a temporary request error. Valid strategies are:
> >> DISABLED: no retry performed, i.e. fail after the first request error.
> >> CONSTANT: wait for backoff delay between retries.
> >> EXPONENTIAL: initially wait for backoff delay and increase
> exponentially between retries.
> >> |
> >> |
> >> sink.bulk-flush.backoff.max-retries
> >> | optional | 8 | Integer | Maximum number of backoff retries. |
> >> |
> >> sink.bulk-flush.backoff.delay
> >> | optional | 50ms | Duration | Delay between each backoff attempt. For
> CONSTANT backoff, this is simply the delay between each retry. For
> EXPONENTIAL backoff, this is the initial base delay. |
> >> |
> >> connection.max-retry-timeout
> >> | optional | (none) | Duration | Maximum timeout between retries. |
> >> |
> >> connection.path-prefix
> >> | optional | (none) | String | Prefix string to be added to every REST
> communication, e.g., '/v1' |
> >> |
> >> format
> >> | optional | json | String | Elasticsearch connector supports to
> specify a format. The format must produce a valid json document. By default
> uses built-in 'json' format. Please refer to JSON Format page for more
> details. |
> >>
> >>
> >>
> >>
> >>
> >>
> >>
>


Dependency vulnerabilities with flink 1.11.1 version

2020-10-22 Thread V N, Suchithra (Nokia - IN/Bangalore)

Hello,

We are using Apache Flink version 1.11.1. During our security scans, the 
following issues were reported by our scan tool.

1.Package : commons_codec-1.10
Severity: Medium

Description:
Apache Commons contains a flaw that is due to the Base32 codec decoding invalid 
strings instead of rejecting them. This may allow a remote attacker to tunnel 
additional information via a base 32 string that seems valid.

Path:
/opt/flink/lib/flink-table_2.11-1.11.1.jar:commons-codec
/opt/flink/lib/flink-table-blink_2.11-1.11.1.jar:commons-codec

References:
https://issues.apache.org/jira/browse/CODEC-134
https://issues.apache.org/jira/browse/HTTPCLIENT-2018

2. Package : antlr-4.7
Severity: Medium

Description:
ANTLR contains a flaw in 
runtime/Java/src/org/antlr/v4/runtime/atn/ParserATNSimulator.java that is 
triggered as it does not catch exceptions when attempting to access the 
TURN_OFF_LR_LOOP_ENTRY_BRANCH_OPT environment variable. This may allow a 
context-dependent attacker to potentially crash a process linked against the 
library.

Path:
/opt/flink/opt/flink-python_2.11-1.11.1.jar:antlr4-runtime
References:
https://github.com/antlr/antlr4/issues/2069

3. Package : mesos-1.0.1
Severity: Medium

Description:
Apache Mesos can be configured to require authentication to call the Executor 
HTTP API using JSON Web Token (JWT). In Apache Mesos versions pre-1.4.2, 1.5.0, 
1.5.1, 1.6.0 the comparison of the generated HMAC value against the provided 
signature in the JWT implementation used is vulnerable to a timing attack 
because instead of a constant-time string comparison routine a standard `==` 
operator has been used. A malicious actor can therefore abuse the timing 
difference of when the JWT validation function returns to reveal the correct 
HMAC value.
Path:
/opt/flink/lib/flink-dist_2.11-1.11.1.jar:mesos

References:
https://nvd.nist.gov/vuln/detail/CVE-2018-8023

4. Package : okhttp-3.7.0
Severity: Medium

Description:
** DISPUTED ** CertificatePinner.java in OkHttp 3.x through 3.12.0 allows 
man-in-the-middle attackers to bypass certificate pinning by changing 
SSLContext and the boolean values while hooking the application. NOTE: This id 
is disputed because some parties don't consider this is a vulnerability. Their 
rationale can be found in https://github.com/square/okhttp/issues/4967.
Path:
/opt/flink/plugins/metrics-datadog/flink-metrics-datadog-1.11.1.jar:okhttp
References:
https://nvd.nist.gov/vuln/detail/CVE-2018-20200

5. Package : commons_io-2.4
Severity: Medium

Description:
Apache Commons IO contains a flaw that allows traversing outside of a 
restricted path. The issue is due to FileNameUtils.normalize not properly 
sanitizing user input, specifically path traversal style attacks (e.g. '../'). 
With a specially crafted request, a remote attacker can disclose arbitrary 
files.
Path:
/opt/flink/lib/flink-dist_2.11-1.11.1.jar:commons-io
/opt/flink/lib/flink-table-blink_2.11-1.11.1.jar:commons-io

References:
https://issues.apache.org/jira/browse/IO-556


Please let us know your comments on these issues and fix plans.

Regards,
Suchithra


Re: Re: pyflink 1.11.0: how to supply the username and password in the Elasticsearch connector when the host requires authentication

2020-10-22 Thread whh_960101
Can we upgrade to 1.12 now? It doesn't seem to have been released yet. Is there any other workaround?















On 2020-10-22 16:34:56, "Yangze Guo" wrote:
>Version 1.11 does not yet support setting username and password; these two
>options were added to the new ES connector in 1.12 [1].
>
>[1] https://issues.apache.org/jira/browse/FLINK-18361
>
>Best,
>Yangze Guo
>
>On Thu, Oct 22, 2020 at 3:47 PM whh_960101  wrote:
>>
>> Hi everyone: if I need to sink data to an Elasticsearch host that requires
>> authentication, how do I supply the username and password in the elasticsearch
>> connector? I followed the example format from the official docs but found no
>> options for a username and password, and the program fails with Unauthorized.
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.html
>> CREATE TABLE myUserTable (
>>   user_id STRING,
>>   user_name STRING
>>   uv BIGINT,
>>   pv BIGINT,
>>   PRIMARY KEY (user_id) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'elasticsearch-7',
>>   'hosts' = 'http://localhost:9200',
>>   'index' = 'users'
>> );Connector Options
>> | Option | Required | Default | Type | Description |
>> |
>> connector
>> | required | (none) | String | Specify what connector to use, valid values 
>> are:
>> elasticsearch-6: connect to Elasticsearch 6.x cluster
>> elasticsearch-7: connect to Elasticsearch 7.x and later versions cluster
>> |
>> |
>> hosts
>> | required | (none) | String | One or more Elasticsearch hosts to connect 
>> to, e.g. 'http://host_name:9092;http://host_name:9093'. |
>> |
>> index
>> | required | (none) | String | Elasticsearch index for every record. Can be 
>> a static index (e.g. 'myIndex') or a dynamic index (e.g. 
>> 'index-{log_ts|-MM-dd}'). See the following Dynamic Indexsection for 
>> more details. |
>> |
>> document-type
>> | required in 6.x | (none) | String | Elasticsearch document type. Not 
>> necessary anymore in elasticsearch-7. |
>> |
>> document-id.key-delimiter
>> | optional | _ | String | Delimiter for composite keys ("_" by default), 
>> e.g., "$" would result in IDs "KEY1$KEY2$KEY3"." |
>> |
>> failure-handler
>> | optional | fail | String | Failure handling strategy in case a request to 
>> Elasticsearch fails. Valid strategies are:
>> fail: throws an exception if a request fails and thus causes a job failure.
>> ignore: ignores failures and drops the request.
>> retry_rejected: re-adds requests that have failed due to queue capacity 
>> saturation.
>> custom class name: for failure handling with a ActionRequestFailureHandler 
>> subclass.
>> |
>> |
>> sink.flush-on-checkpoint
>> | optional | true | Boolean | Flush on checkpoint or not. When disabled, a 
>> sink will not wait for all pending action requests to be acknowledged by 
>> Elasticsearch on checkpoints. Thus, a sink does NOT provide any strong 
>> guarantees for at-least-once delivery of action requests. |
>> |
>> sink.bulk-flush.max-actions
>> | optional | 1000 | Integer | Maximum number of buffered actions per bulk 
>> request. Can be set to '0' to disable it. |
>> |
>> sink.bulk-flush.max-size
>> | optional | 2mb | MemorySize | Maximum size in memory of buffered actions 
>> per bulk request. Must be in MB granularity. Can be set to '0' to disable 
>> it. |
>> |
>> sink.bulk-flush.interval
>> | optional | 1s | Duration | The interval to flush buffered actions. Can be 
>> set to '0' to disable it. Note, both 'sink.bulk-flush.max-size' and 
>> 'sink.bulk-flush.max-actions' can be set to '0' with the flush interval set 
>> allowing for complete async processing of buffered actions. |
>> |
>> sink.bulk-flush.backoff.strategy
>> | optional | DISABLED | String | Specify how to perform retries if any flush 
>> actions failed due to a temporary request error. Valid strategies are:
>> DISABLED: no retry performed, i.e. fail after the first request error.
>> CONSTANT: wait for backoff delay between retries.
>> EXPONENTIAL: initially wait for backoff delay and increase exponentially 
>> between retries.
>> |
>> |
>> sink.bulk-flush.backoff.max-retries
>> | optional | 8 | Integer | Maximum number of backoff retries. |
>> |
>> sink.bulk-flush.backoff.delay
>> | optional | 50ms | Duration | Delay between each backoff attempt. For 
>> CONSTANT backoff, this is simply the delay between each retry. For 
>> EXPONENTIAL backoff, this is the initial base delay. |
>> |
>> connection.max-retry-timeout
>> | optional | (none) | Duration | Maximum timeout between retries. |
>> |
>> connection.path-prefix
>> | optional | (none) | String | Prefix string to be added to every REST 
>> communication, e.g., '/v1' |
>> |
>> format
>> | optional | json | String | Elasticsearch connector supports to specify a 
>> format. The format must produce a valid json document. By default uses 
>> built-in 'json' format. Please refer to JSON Format page for more details. |
>>
>>
>>
>>
>>
>>
>>


Flink 1.10 history server cannot monitor Flink SQL jobs

2020-10-22 Thread Robin Zhang

As shown in the figure below: with Flink 1.10 submitted on YARN in per-job
mode, applications developed with the Java DataStream and Table APIs have their
statistics pulled by the JM as expected, but SQL-based jobs cannot be monitored
by the history server.
The SQL we use is not exactly the one from the official site, but it is
converted into a DataStream and submitted to YARN in on-yarn per-job mode; the
only extra step is SQL parsing. I cannot understand why the history server does
not load the job information into the target directory on HDFS. Both the
JobManager logs and the configuration confirm that the JM picked up the
history-server-related settings.


 





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Seeking advice: is there a good way to extract column-level lineage from Flink SQL?

2020-10-22 Thread Danny Chan
You can look into Calcite's metadata system; one of its metadata handlers,
RelMdColumnOrigins, can give you column lineage, provided you can get hold of
the relational expression tree for the SQL.

Best,
Danny Chan
On Oct 20, 2020, 8:43 PM +0800, dawangli wrote:
> Seeking advice: if I want to extract column-level lineage from Flink SQL, is there a good way to do it?
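
For illustration, a minimal sketch of querying column origins through Calcite's
metadata system (the handler behind it is RelMdColumnOrigins), assuming you
already have the optimized RelNode for the query; class and variable names are
placeholders.

import java.util.Set;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.metadata.RelColumnOrigin;
import org.apache.calcite.rel.metadata.RelMetadataQuery;

public final class ColumnLineage {

    // Prints the originating table/column for every output field of the plan.
    public static void printColumnOrigins(RelNode plan) {
        RelMetadataQuery mq = plan.getCluster().getMetadataQuery();
        for (int i = 0; i < plan.getRowType().getFieldCount(); i++) {
            String field = plan.getRowType().getFieldNames().get(i);
            Set<RelColumnOrigin> origins = mq.getColumnOrigins(plan, i);
            if (origins == null || origins.isEmpty()) {
                System.out.println(field + " <- (derived, no direct origin)");
                continue;
            }
            for (RelColumnOrigin origin : origins) {
                RelOptTable table = origin.getOriginTable();
                String column =
                        table.getRowType().getFieldNames().get(origin.getOriginColumnOrdinal());
                System.out.println(field + " <- " + table.getQualifiedName() + "." + column);
            }
        }
    }
}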


Re: (iceberg testing) DDL parser error when create hive catalog

2020-10-22 Thread OpenInx
Hi

I'm maintaining the Flink sink connector in the Apache Iceberg community.
Did your classpath include the correct iceberg-flink-runtime.jar?

Please follow the steps here:
https://github.com/apache/iceberg/blob/master/site/docs/flink.md

Thanks.

On Wed, Oct 21, 2020 at 10:54 PM 18717838093 <18717838...@126.com> wrote:

>
> Below is my error, thanks for the help. I would like to know: is it a bug?
>
> I followed the instructions on the website:
> https://github.com/apache/iceberg/pull/1464/files
>
>
> Flink SQL> CREATE CATALOG hive_catalog WITH (
> >   'type'='iceberg',
> >   'catalog-type'='hive',
> >   'uri'='thrift://localhost:9083',
> >   'clients'='5',
> >   'property-version'='1'
> > );
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find
> a suitable table factory for
> 'org.apache.flink.table.factories.CatalogFactory' in
> the classpath.
>
> Reason: Required context properties mismatch.
>
> The following properties are requested:
> catalog-type=hive
> clients=5
> property-version=1
> type=iceberg
> uri=thrift://localhost:9083
>
> The following factories have been considered:
> org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory
> org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
>
> 18717838093
> 18717838...@126.com
>
> 
> Signature customized by NetEase Mail Master
>
>


Re: NullPointerException when trying to read null array in Postgres using JDBC Connector

2020-10-22 Thread Danny Chan
Yes, the current code throws directly for NULLs. Could you log an issue for
that?

Dylan Forciea  于2020年10月21日周三 上午4:30写道:

> I believe I am getting an error because I have a nullable postgres array
> of text that is set to NULL that I’m reading using the JDBC SQL Connector.
> Is this something that should be allowed? Looking at the source code line
> below, it doesn’t look like the case of an array being null would be
> handled.
>
>
>
> [error] Caused by: java.io.IOException: Couldn't access resultSet
>
> [error]   at
> org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:266)
>
> [error]   at
> org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:57)
>
> [error]   at
> org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91)
>
> [error]   at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>
> [error]   at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>
> [error]   at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
>
> [error] Caused by: java.lang.NullPointerException
>
> [error]   at
> org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.lambda$createPostgresArrayConverter$c06ce9f4$2(PostgresRowConverter.java:97)
>
> [error]   at
> org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter.toInternal(AbstractJdbcRowConverter.java:79)
>
> [error]   at
> org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:259)
>
> [error]   ... 5 more
>
>
>
> Thanks,
>
> Dylan Forciea
>


Re: "stepless" sliding windows?

2020-10-22 Thread Danny Chan
The SLIDING window always triggers at each step; what do you mean by
"stepless"?

Alex Cruise  于2020年10月21日周三 上午1:52写道:

> whoops.. as usual, posting led me to find some answers myself. Does this
> make sense given my requirements?
>
> Thanks!
>
> private class MyWindowAssigner(val windowSize: Time) : WindowAssigner<Record, TimeWindow>() {
>     private val trigger = CountTrigger.of(1) as Trigger<Record, TimeWindow>
>
>     override fun assignWindows(
>         element: Record,
>         timestamp: Long,
>         context: WindowAssignerContext
>     ): MutableCollection<TimeWindow> {
>         return mutableListOf(TimeWindow(timestamp - windowSize.toMilliseconds(), timestamp))
>     }
>
>     override fun getDefaultTrigger(env: StreamExecutionEnvironment?): Trigger<Record, TimeWindow> {
>         return trigger
>     }
>
>     override fun getWindowSerializer(executionConfig: ExecutionConfig?): TypeSerializer<TimeWindow> {
>         return TimeWindow.Serializer()
>     }
>
>     override fun isEventTime(): Boolean {
>         return true
>     }
> }
>
>
> On Tue, Oct 20, 2020 at 9:13 AM Alex Cruise  wrote:
>
>> Hey folks!
>>
>> I have an application that wants to use "stepless" sliding windows, i.e.
>> we produce aggregates on every event. The windows need to be of a fixed
>> size, but to have their start and end times update continuously, and I'd
>> like to trigger on every event. Is this a bad idea? I've googled and read
>> the docs extensively and haven't been able to identify built-in
>> functionality or examples that map cleanly to my requirements.
>>
>> OK, I just found DeltaTrigger, which looks promising... Does it make
>> sense to write a WindowAssigner that makes a new Window on every event,
>> allocation rates aside?
>>
>> Thanks!
>>
>> -0xe1a
>>
>


Re: Extract column and table lineage from flink sql

2020-10-22 Thread Danny Chan
Hi, dawangli ~

Usually people build table lineage through a self-built platform, with a DB to
persist the relationships between tables. For each job, you may need to analyze
each SQL statement to determine which tables are sources and which are sinks.

E.g. the INSERT target table is a sink, and a table behind a scan or join is
a source.

If you have the rel tree, you can get the info with a shuttle; if you have an
AST instead, you can use a SqlVisitor.

Danny Chan  于2020年10月22日周四 下午5:24写道:

> Hi, dawangli ~
>
> Usually people build table lineage through a self-built platform, with a DB to
> persist the relationships between tables. For each job, you may need to analyze
> each SQL statement to determine which tables are sources and which are sinks.
>
> E.g. the INSERT target table is a sink, and a table behind a scan or join is
> a source.
>
> If you have the rel tree, you can get the info with a shuttle; if you have an
> AST instead, you can use a SqlVisitor.
>
> dawangli  于2020年10月20日周二 下午9:46写道:
>
>> I want to build a lineage system for a real-time data warehouse,how can I
>> extract table and column lineage from flink sql?
>>
>>
>>
>>
>>
>>
>>
>


Re: Problem writing to Hive with Flink SQL

2020-10-22 Thread Jingsong Li
The writer's parallelism follows the upstream parallelism.

It is the committer whose parallelism is 1.

On Thu, Oct 22, 2020 at 5:22 PM 酷酷的浑蛋  wrote:

> When writing to a Hive table in real time with Flink SQL, I found that the sink parallelism is 1.
> I looked at line 226 of the FileSystemTableSink class and it is indeed set to 1. Why is that? Writing with parallelism 1 is very slow.
>
>
>
>

-- 
Best, Jingsong Lee


Problem writing to Hive with Flink SQL

2020-10-22 Thread 酷酷的浑蛋
When writing to a Hive table in real time with Flink SQL, I found that the sink parallelism is 1.
I looked at line 226 of the FileSystemTableSink class and it is indeed set to 1. Why is that? Writing with parallelism 1 is very slow.







Re: pyflink 1.11.0: how to supply the username and password in the Elasticsearch connector when the host requires authentication

2020-10-22 Thread Yangze Guo
Version 1.11 does not yet support setting username and password; these two options were added to the new ES connector in 1.12 [1].

[1] https://issues.apache.org/jira/browse/FLINK-18361

Best,
Yangze Guo
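
For reference, once on 1.12 the DDL would roughly look like the sketch below
(shown here through the Java TableEnvironment; the same DDL string should work
from PyFlink's t_env.execute_sql). The 'username' and 'password' option names
come from FLINK-18361 and should be double-checked against the 1.12 docs.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class EsAuthExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // 'username' / 'password' are only understood by the 1.12+ connector.
        tEnv.executeSql(
                "CREATE TABLE myUserTable (\n"
                        + "  user_id STRING,\n"
                        + "  user_name STRING,\n"
                        + "  PRIMARY KEY (user_id) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'elasticsearch-7',\n"
                        + "  'hosts' = 'http://localhost:9200',\n"
                        + "  'index' = 'users',\n"
                        + "  'username' = 'elastic',\n"
                        + "  'password' = '***'\n"
                        + ")");
    }
}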

On Thu, Oct 22, 2020 at 3:47 PM whh_960101  wrote:
>
> Hi everyone: if I need to sink data to an Elasticsearch host that requires
> authentication, how do I supply the username and password in the elasticsearch
> connector? I followed the example format from the official docs but found no
> options for a username and password, and the program fails with Unauthorized.
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.html
> CREATE TABLE myUserTable (
>   user_id STRING,
>   user_name STRING
>   uv BIGINT,
>   pv BIGINT,
>   PRIMARY KEY (user_id) NOT ENFORCED
> ) WITH (
>   'connector' = 'elasticsearch-7',
>   'hosts' = 'http://localhost:9200',
>   'index' = 'users'
> );Connector Options
> | Option | Required | Default | Type | Description |
> |
> connector
> | required | (none) | String | Specify what connector to use, valid values 
> are:
> elasticsearch-6: connect to Elasticsearch 6.x cluster
> elasticsearch-7: connect to Elasticsearch 7.x and later versions cluster
> |
> |
> hosts
> | required | (none) | String | One or more Elasticsearch hosts to connect to, 
> e.g. 'http://host_name:9092;http://host_name:9093'. |
> |
> index
> | required | (none) | String | Elasticsearch index for every record. Can be a 
> static index (e.g. 'myIndex') or a dynamic index (e.g. 
> 'index-{log_ts|-MM-dd}'). See the following Dynamic Indexsection for more 
> details. |
> |
> document-type
> | required in 6.x | (none) | String | Elasticsearch document type. Not 
> necessary anymore in elasticsearch-7. |
> |
> document-id.key-delimiter
> | optional | _ | String | Delimiter for composite keys ("_" by default), 
> e.g., "$" would result in IDs "KEY1$KEY2$KEY3"." |
> |
> failure-handler
> | optional | fail | String | Failure handling strategy in case a request to 
> Elasticsearch fails. Valid strategies are:
> fail: throws an exception if a request fails and thus causes a job failure.
> ignore: ignores failures and drops the request.
> retry_rejected: re-adds requests that have failed due to queue capacity 
> saturation.
> custom class name: for failure handling with a ActionRequestFailureHandler 
> subclass.
> |
> |
> sink.flush-on-checkpoint
> | optional | true | Boolean | Flush on checkpoint or not. When disabled, a 
> sink will not wait for all pending action requests to be acknowledged by 
> Elasticsearch on checkpoints. Thus, a sink does NOT provide any strong 
> guarantees for at-least-once delivery of action requests. |
> |
> sink.bulk-flush.max-actions
> | optional | 1000 | Integer | Maximum number of buffered actions per bulk 
> request. Can be set to '0' to disable it. |
> |
> sink.bulk-flush.max-size
> | optional | 2mb | MemorySize | Maximum size in memory of buffered actions 
> per bulk request. Must be in MB granularity. Can be set to '0' to disable it. 
> |
> |
> sink.bulk-flush.interval
> | optional | 1s | Duration | The interval to flush buffered actions. Can be 
> set to '0' to disable it. Note, both 'sink.bulk-flush.max-size' and 
> 'sink.bulk-flush.max-actions' can be set to '0' with the flush interval set 
> allowing for complete async processing of buffered actions. |
> |
> sink.bulk-flush.backoff.strategy
> | optional | DISABLED | String | Specify how to perform retries if any flush 
> actions failed due to a temporary request error. Valid strategies are:
> DISABLED: no retry performed, i.e. fail after the first request error.
> CONSTANT: wait for backoff delay between retries.
> EXPONENTIAL: initially wait for backoff delay and increase exponentially 
> between retries.
> |
> |
> sink.bulk-flush.backoff.max-retries
> | optional | 8 | Integer | Maximum number of backoff retries. |
> |
> sink.bulk-flush.backoff.delay
> | optional | 50ms | Duration | Delay between each backoff attempt. For 
> CONSTANT backoff, this is simply the delay between each retry. For 
> EXPONENTIAL backoff, this is the initial base delay. |
> |
> connection.max-retry-timeout
> | optional | (none) | Duration | Maximum timeout between retries. |
> |
> connection.path-prefix
> | optional | (none) | String | Prefix string to be added to every REST 
> communication, e.g., '/v1' |
> |
> format
> | optional | json | String | Elasticsearch connector supports to specify a 
> format. The format must produce a valid json document. By default uses 
> built-in 'json' format. Please refer to JSON Format page for more details. |
>
>
>
>
>
>
>


Re: Configurable Parser

2020-10-22 Thread Timo Walther

Hi Theo,

this is indeed a difficult use case. The KafkaDeserializationSchema is 
actually meant mostly for deserialization and should not contain more 
complex logic such as joining with a different topic. You would make 
KafkaDeserializationSchema stateful.


But in your use case, I see no better alternative than making 
KafkaDeserializationSchema more complex if per-partition watermarking 
should be in place. Do the parser changes happen frequently? And how do 
you deal with the failure case when the Flink job restarts and all state 
in the deserialization schema is lost? You might need to extend the 
FlinkKafkaConsumer to add extra parser state for persistence.


Regards,
Timo
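
For illustration, a rough sketch of assigning timestamps and watermarks
downstream of the broadcast-based parsing step using the Flink 1.11
WatermarkStrategy API. ParsedEvent and its timestamp getter are assumptions;
note that this is no longer the per-partition multiplexing done inside the
Kafka source, so a bounded-out-of-orderness bound is used instead of ascending
timestamps.

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

public class DownstreamWatermarks {

    public static DataStream<ParsedEvent> withWatermarks(DataStream<ParsedEvent> parsed) {
        return parsed.assignTimestampsAndWatermarks(
                WatermarkStrategy.<ParsedEvent>forBoundedOutOfOrderness(Duration.ofSeconds(30))
                        // extract the event time that only the current parser can decode
                        .withTimestampAssigner((event, previousTimestamp) -> event.getEventTime())
                        // roughly emulates the idle-partition handling of the Kafka source
                        .withIdleness(Duration.ofMinutes(1)));
    }

    // Placeholder for the parsed record type used in this sketch.
    public static class ParsedEvent {
        private long eventTime;

        public long getEventTime() {
            return eventTime;
        }
    }
}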


On 21.10.20 17:25, Theo Diefenthal wrote:

Hi there,

In my usecase, I read data from Kafka where in each kafka partition, I 
have ascending timestamps.
Currently, I parse the data from Kafka with a custom deserialization 
schema so that after parsing, the FlinkKafkaConsumerBase can extract the 
eventtime ascending timestamps and create proper watermarks via the 
WatermarkMultiplexer (i.e. take the minimum watermark over all 
non-idling assigned partitions in that task).


Now, we have a strange modification that the parser can change at 
runtime and only via the new parser, I can extract the timestamp field 
of the received byte[]. The parser change is told to me via another 
kafka topic.
I immediately thought about: That's a perfect usecase for broadcast 
streams: I connect the parser config stream and buffer the events in the 
connect function if the old parser is not able to parse the events up 
until a new parser arrives. (Doesn't sound too good from an architecture 
standpoint all in all, but that's how it is).


My problem is the following: If I want to use broadcast stream, I must 
outsource my parser to a new pipeline step and don't parse within the 
KafkaDeserializationSchema any longer. This also means that Flink/Kafka 
can't produce the watermarks and I need to emulate the nice per 
partition ascending watermark assigner with the downstream multiplexer 
myself. Am I correct? Can I "easily" plug this timestamp assigner with 
multiplexer logic into my stream (after broadcast parsing)? Could 
it also detect idle partitions like the KafkaConsumer does? Or which way 
would you go? The only alternative I see is to greatly increase the 
complexity of my KafkaDeserializationSchema to also read another Kafka 
topic in the background and buffer elements internally as well... Sounds not 
very "flinkish".


Best regards
Theo
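
For illustration, a minimal sketch (Flink 1.11 API) of the "emulate the per-partition assigner downstream" option mentioned above could look like the code below. ParsedEvent, getEventTime() and ParsingBroadcastFunction are hypothetical placeholders, and note that watermarks are then generated per assigner subtask rather than per Kafka partition, so the ascending-timestamp assumption has to hold per subtask:

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// Broadcast-connect the raw stream with the parser-config stream, then assign
// timestamps/watermarks on the parsed output instead of inside the source.
DataStream<ParsedEvent> parsed = rawStream
        .connect(parserConfigBroadcast)
        .process(new ParsingBroadcastFunction()); // buffers events until a matching parser arrives

DataStream<ParsedEvent> withTimestamps = parsed.assignTimestampsAndWatermarks(
        WatermarkStrategy.<ParsedEvent>forMonotonousTimestamps()
                .withTimestampAssigner((event, previousTimestamp) -> event.getEventTime())
                .withIdleness(Duration.ofMinutes(1))); // marks idle subtasks, not idle Kafka partitions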




Re: flink job will restart over and over again if a taskmanager's disk damages

2020-10-22 Thread Timo Walther

Hi,

thanks for letting us know about this shortcoming.

I will link someone from the runtime team in the JIRA issue. Let's 
continue the discussion there.


Regards,
Timo

On 22.10.20 05:36, chenkaibit wrote:

Hi everyone:
  I met this Exception when a hard disk was damaged:
https://issues.apache.org/jira/secure/attachment/13009035/13009035_flink_disk_error.png 



I checked the code and found that Flink will create a temp file when 
the record length is > 5 MB:


// SpillingAdaptiveSpanningRecordDeserializer.java
if (nextRecordLength > THRESHOLD_FOR_SPILLING) {
    // create a spilling channel and put the data there
    this.spillingChannel = createSpillingChannel();

    ByteBuffer toWrite = partial.segment.wrap(partial.position, numBytesChunk);
    FileUtils.writeCompletely(this.spillingChannel, toWrite);
}


The tempDir is randomly picked from all `tempDirs`. In YARN mode, one 
`tempDir` usually corresponds to one hard disk.
In my opinion, if a hard disk is damaged, the taskmanager should pick 
another disk (tempDir) for the spilling channel rather than throw an 
IOException, which causes the Flink job to restart over and over again.


I have created a jira issue ( 
https://issues.apache.org/jira/browse/FLINK-18811 ) to track this, and 
I'm hoping someone can help review the code or discuss this issue.

thanks!
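
For illustration only, a rough sketch of the proposed fallback (the helper below is hypothetical, not Flink's actual implementation) could look like:

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.UUID;

// Hypothetical helper: try every configured temp dir before giving up, so a
// single damaged disk does not fail the spilling path and restart the job.
static FileChannel createSpillingChannelWithFallback(String[] tempDirs) throws IOException {
    IOException lastFailure = null;
    for (String dir : tempDirs) {
        File spillFile = new File(dir, UUID.randomUUID() + ".inputchannel");
        try {
            return new RandomAccessFile(spillFile, "rw").getChannel();
        } catch (IOException e) {
            lastFailure = e; // disk damaged or full -- try the next temp dir
        }
    }
    throw new IOException("No usable temp directory left for spilling", lastFailure);
}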




Re: Flink - Kafka topic null error; happens only when running on cluster

2020-10-22 Thread Timo Walther

Hi Manas,

you need to make sure to differentiate between what Flink calls 
"pre-flight phase" and "cluster phase".


The pre-flight phase is where the pipeline is constructed and all 
functions are instantiated. They are later serialized and sent to 
the cluster.


If you read your properties file in the `main()` method and store 
something in static variables, the content is available locally where 
the pipeline is constructed (e.g. in the client), but not when the function 
instances are sent to the cluster: those static variables are fresh 
(thus empty) in the cluster JVMs. You either need to make sure that the 
properties file is read again on each task manager, or, easier, pass 
the parameters as constructor arguments into the instances so that 
they are shipped together with the function itself.


I hope this helps.

Regards,
Timo
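
For illustration, a minimal sketch of the constructor-parameter approach described above, based on the snippets in the quoted mail below (the class name and the topic-taking serializeForKafka variant are hypothetical):

import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical schema that carries the topic name with it, instead of reading
// a static Config field that is empty on the task managers.
public class AlarmSerializationSchema implements KafkaSerializationSchema<IDAP2Alarm> {

    private final String topic; // serialized and shipped to the cluster with the function

    public AlarmSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(IDAP2Alarm element, Long timestamp) {
        return element.serializeForKafka(topic); // hypothetical variant that takes the topic
    }
}

// In main() (pre-flight phase): read the properties locally and pass the value in.
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", servers); // 'servers' as in the quoted getFlinkKafkaProducer(...)
FlinkKafkaProducer<IDAP2Alarm> producer = new FlinkKafkaProducer<>(
        Config.ALARMS_TOPIC,
        new AlarmSerializationSchema(Config.ALARMS_TOPIC),
        properties,
        FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);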


On 22.10.20 09:24, Manas Kale wrote:

Hi,
I am trying to write some data to a kafka topic and I have the following 
situation:


monitorStateStream
    .process(new IDAP2AlarmEmitter()).name(IDAP2_ALARM_EMITTER).uid(IDAP2_ALARM_EMITTER)
    ... // Stream that outputs elements of type IDAP2Alarm
    .addSink(getFlinkKafkaProducer(ALARMS_KAFKA, Config.ALARMS_TOPIC)).name(ALARM_SINK).uid(ALARM_SINK);


private static  FlinkKafkaProducer 
getFlinkKafkaProducer(String servers, String topic) {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", servers);
return new FlinkKafkaProducer(topic,
  (element, timestamp) -> element.serializeForKafka(),
  properties,
  FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
}

/*
This interface is used to indicate that a class may be output to Kafka. 
Since Kafka treats all
data as bytes, classes that implement this interface have to provide an 
implementation for the

serializeForKafka() method.
*/
public interface IDAP2JSONOutput {

 // Implement serialization logic in this method.
ProducerRecord serializeForKafka();

}

public class IDAP2Alarm extends Tuple5<...> implements IDAP2JSONOutput {

private final Logger LOGGER = LoggerFactory.getLogger(IDAP2Alarm.class);

@Override
public ProducerRecord serializeForKafka() {
 byte[] rawValue;
 byte[] rawKey;
 String k = getMonitorFeatureKey().getMonitorName() ;
 ...

 rawValue = val.getBytes();

 LOGGER.info("value of alarms topic from idap2 alarm : " + 
Config.ALARMS_TOPIC);


return new ProducerRecord<>(Config.ALARMS_TOPIC, rawKey, rawValue); // Line 95
}

}


Config.ALARMS_TOPIC is a static string that is read from a properties 
file. When I run this code on my IDE minicluster, it runs great with no 
problems. But when I submit it as a jar to the cluster, I get the 
following error:


Caused by: java.lang.IllegalArgumentException: Topic cannot be null.
 at 
org.apache.kafka.clients.producer.ProducerRecord.(ProducerRecord.java:71) 
~[flink_POC-0.1.jar:?]
 at 
org.apache.kafka.clients.producer.ProducerRecord.(ProducerRecord.java:133) 
~[flink_POC-0.1.jar:?]
*at 
flink_POC.idap2.containers.IDAP2Alarm.serializeForKafka(IDAP2Alarm.java:95) 
~[flink_POC-0.1.jar:?]*
 at 
flink_POC.StreamingJob.lambda$getFlinkKafkaProducer$af2c9cb2$1(StreamingJob.java:62) 
~[flink_POC-0.1.jar:?]
 at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:854) 
~[flink_POC-0.1.jar:?]
 at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99) 
~[flink_POC-0.1.jar:?]
 at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235) 
~[flink-dist_2.11-1.11.0.jar:1.11.0]
 at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) 
~[flink-dist_2.11-1.11.0.jar:1.11.0]
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) 
~[flink-dist_2.11-1.11.0.jar:1.11.0]
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) 
~[flink-dist_2.11-1.11.0.jar:1.11.0]
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) 
~[flink-dist_2.11-1.11.0.jar:1.11.0]
 at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) 
~[flink-dist_2.11-1.11.0.jar:1.11.0]
 at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) 
~[flink-dist_2.11-1.11.0.jar:1.11.0]
 at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) 
~[flink-dist_2.11-1.11.0.jar:1.11.0]
*at 
flink_POC.idap2.IDAP2AlarmEmitter.processElement(IDAP2AlarmEmitter.java:69) 
~[flink_POC-0.1.jar:?]*
*at 
flink_POC.idap2.IDAP2AlarmEmitter.processElement(IDAP2AlarmEmitter.java:25) 
~[flink_POC-0.1.jar:?]*
 at 

pyflink 1.11.0: how to provide a username and password in the Elasticsearch connector when the host requires authentication

2020-10-22 Thread whh_960101
Hi everyone: if I want to sink data to an Elasticsearch host that requires authentication, how do I provide the 
username and password in the Elasticsearch connector? I wrote my table definition following the example in the 
official documentation, but could not find any options for a username and password, and the program fails with 
an Unauthorized error.
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.html

CREATE TABLE myUserTable (
  user_id STRING,
  user_name STRING,
  uv BIGINT,
  pv BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'users'
);

Connector Options

| Option | Required | Default | Type | Description |
| connector | required | (none) | String | Specify what connector to use, valid values are: elasticsearch-6: connect to Elasticsearch 6.x cluster; elasticsearch-7: connect to Elasticsearch 7.x and later versions cluster. |
| hosts | required | (none) | String | One or more Elasticsearch hosts to connect to, e.g. 'http://host_name:9092;http://host_name:9093'. |
| index | required | (none) | String | Elasticsearch index for every record. Can be a static index (e.g. 'myIndex') or a dynamic index (e.g. 'index-{log_ts|yyyy-MM-dd}'). See the following Dynamic Index section for more details. |
| document-type | required in 6.x | (none) | String | Elasticsearch document type. Not necessary anymore in elasticsearch-7. |
| document-id.key-delimiter | optional | _ | String | Delimiter for composite keys ("_" by default), e.g., "$" would result in IDs "KEY1$KEY2$KEY3". |
| failure-handler | optional | fail | String | Failure handling strategy in case a request to Elasticsearch fails. Valid strategies are: fail: throws an exception if a request fails and thus causes a job failure; ignore: ignores failures and drops the request; retry_rejected: re-adds requests that have failed due to queue capacity saturation; custom class name: for failure handling with a ActionRequestFailureHandler subclass. |
| sink.flush-on-checkpoint | optional | true | Boolean | Flush on checkpoint or not. When disabled, a sink will not wait for all pending action requests to be acknowledged by Elasticsearch on checkpoints. Thus, a sink does NOT provide any strong guarantees for at-least-once delivery of action requests. |
| sink.bulk-flush.max-actions | optional | 1000 | Integer | Maximum number of buffered actions per bulk request. Can be set to '0' to disable it. |
| sink.bulk-flush.max-size | optional | 2mb | MemorySize | Maximum size in memory of buffered actions per bulk request. Must be in MB granularity. Can be set to '0' to disable it. |
| sink.bulk-flush.interval | optional | 1s | Duration | The interval to flush buffered actions. Can be set to '0' to disable it. Note, both 'sink.bulk-flush.max-size' and 'sink.bulk-flush.max-actions' can be set to '0' with the flush interval set, allowing for complete async processing of buffered actions. |
| sink.bulk-flush.backoff.strategy | optional | DISABLED | String | Specify how to perform retries if any flush actions failed due to a temporary request error. Valid strategies are: DISABLED: no retry performed, i.e. fail after the first request error; CONSTANT: wait for backoff delay between retries; EXPONENTIAL: initially wait for backoff delay and increase exponentially between retries. |
| sink.bulk-flush.backoff.max-retries | optional | 8 | Integer | Maximum number of backoff retries. |
| sink.bulk-flush.backoff.delay | optional | 50ms | Duration | Delay between each backoff attempt. For CONSTANT backoff, this is simply the delay between each retry. For EXPONENTIAL backoff, this is the initial base delay. |
| connection.max-retry-timeout | optional | (none) | Duration | Maximum timeout between retries. |
| connection.path-prefix | optional | (none) | String | Prefix string to be added to every REST communication, e.g., '/v1'. |
| format | optional | json | String | Elasticsearch connector supports to specify a format. The format must produce a valid json document. By default uses built-in 'json' format. Please refer to JSON Format page for more details. |






 

Flink - Kafka topic null error; happens only when running on cluster

2020-10-22 Thread Manas Kale
Hi,
I am trying to write some data to a kafka topic and I have the following
situation:

monitorStateStream

   .process(new
IDAP2AlarmEmitter()).name(IDAP2_ALARM_EMITTER).uid(IDAP2_ALARM_EMITTER)

   *... // Stream that outputs elements of type IDAP2Alarm*

.addSink(getFlinkKafkaProducer(ALARMS_KAFKA, Config.ALARMS_TOPIC)).name(
ALARM_SINK).uid(ALARM_SINK);

private static  FlinkKafkaProducer
getFlinkKafkaProducer(String servers, String topic) {
   Properties properties = new Properties();
   properties.setProperty("bootstrap.servers", servers);
   return new FlinkKafkaProducer(topic,
 (element, timestamp) -> element.serializeForKafka(),
 properties,
 FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
}

/*
This interface is used to indicate that a class may be output to
Kafka. Since Kafka treats all
data as bytes, classes that implement this interface have to
provide an implementation for the
serializeForKafka() method.
 */
public interface IDAP2JSONOutput {

// Implement serialization logic in this method.
ProducerRecord serializeForKafka();

}

public class IDAP2Alarm extends Tuple5<...> implements  IDAP2JSONOutput{

private final Logger LOGGER = LoggerFactory.getLogger(IDAP2Alarm.class);

@Override
public ProducerRecord serializeForKafka() {
byte[] rawValue;
byte[] rawKey;
String k = getMonitorFeatureKey().getMonitorName() ;
...

rawValue = val.getBytes();

LOGGER.info("value of alarms topic from idap2 alarm : " +
Config.ALARMS_TOPIC);

return new ProducerRecord<>(Config.ALARMS_TOPIC, rawKey,
rawValue); // Line 95
}

}


Config.ALARMS_TOPIC is a static string that is read from a properties file.
When I run this code on my IDE minicluster, it runs great with no problems.
But when I submit it as a jar to the cluster, I get the following error:

Caused by: java.lang.IllegalArgumentException: Topic cannot be null.
at org.apache.kafka.clients.producer.ProducerRecord.(
ProducerRecord.java:71) ~[flink_POC-0.1.jar:?]
at org.apache.kafka.clients.producer.ProducerRecord.(
ProducerRecord.java:133) ~[flink_POC-0.1.jar:?]
*at
flink_POC.idap2.containers.IDAP2Alarm.serializeForKafka(IDAP2Alarm.java:95)
~[flink_POC-0.1.jar:?]*
at flink_POC.StreamingJob.lambda$getFlinkKafkaProducer$af2c9cb2$1(
StreamingJob.java:62) ~[flink_POC-0.1.jar:?]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
.invoke(FlinkKafkaProducer.java:854) ~[flink_POC-0.1.jar:?]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
.invoke(FlinkKafkaProducer.java:99) ~[flink_POC-0.1.jar:?]
at org.apache.flink.streaming.api.functions.sink.
TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.streaming.api.operators.StreamSink.processElement(
StreamSink.java:56) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.streaming.runtime.tasks.
OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.streaming.runtime.tasks.
OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.streaming.runtime.tasks.
OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(
CountingOutput.java:52) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(
CountingOutput.java:30) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect
(TimestampedCollector.java:53) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
*at
flink_POC.idap2.IDAP2AlarmEmitter.processElement(IDAP2AlarmEmitter.java:69)
~[flink_POC-0.1.jar:?]*
*at
flink_POC.idap2.IDAP2AlarmEmitter.processElement(IDAP2AlarmEmitter.java:25)
~[flink_POC-0.1.jar:?]*
at org.apache.flink.streaming.api.operators.KeyedProcessOperator
.processElement(KeyedProcessOperator.java:85) ~[flink-dist_2.11-1.11.0.jar:
1.11.0]
at org.apache.flink.streaming.runtime.tasks.
OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask
.java:161) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.11-1.11.0
.jar:1.11.0]
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.11-1.11.0.jar:1.11
.0]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.11.0.jar:
1.11.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
StreamTask.java:345) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor

Re: Correct way to package application.properties file with Flink JAR

2020-10-22 Thread Manas Kale
Okay, I solved the other issue with viewing logs, which proved that correct,
non-null values are being loaded. I believe I have a different issue
altogether, so I will create a separate thread for that. Thanks for the help,
Chesnay!
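
For reference, a minimal sketch of loading a bundled properties file via the class loader, assuming it really ends up at the root of the jar as in the pom snippet quoted below; ClassLoader#getResourceAsStream resolves names against the classpath root regardless of the calling class's package, while Class#getResourceAsStream without a leading '/' is package-relative:

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public final class Config {

    // Sketch only: load pipeline.properties from the classpath root.
    public static Properties load() throws IOException {
        Properties props = new Properties();
        try (InputStream in = Config.class.getClassLoader()
                .getResourceAsStream("pipeline.properties")) {
            if (in == null) {
                throw new IOException("pipeline.properties not found on the classpath");
            }
            props.load(in);
        }
        return props;
    }
}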

On Thu, Oct 22, 2020 at 11:30 AM Manas Kale  wrote:

> Hi Chesnay,
> The Config reader has everything static, so I tried using
>
> Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
>
> Here's the .pom file for that file:
>
> <build>
>    <resources>
>       <resource>
>          <directory>src/main/resources</directory>
>          <includes>
>             <include>pipeline.properties</include>
>             <include>log4j.properties</include>
>          </includes>
>       </resource>
>    </resources>
>
>    ...
>
> </build>
>
>
> I can see the pipeline.properties packaged in my JAR at the root level
> (using maven package command).
> However, this works on my IDE minicluster but loads null values when
> submitted to the cluster.
> The Config class is not at the package root, rather it is a few levels
> deep. Does that make a difference and cause the getClassLoader() to treat
> an inner package as root?
>
>
>
> On Wed, Oct 21, 2020 at 6:06 PM Chesnay Schepler 
> wrote:
>
>> You could bundle said file in the jar and retrieve it via
>> getClass().getClassLoader().getResource("").
>>
>> On 10/21/2020 2:24 PM, Manas Kale wrote:
>> > Hi,
>> > I have a Flink job that I am packaging as a JAR that is submitted to
>> > the Flink cluster runtime. However, this JAR reads a few configuration
>> > values from a .properties file.
>> > What is the recommended way to package this properties file when
>> > submitting to a cluster? Do I have to copy it to a folder in my flink
>> > cluster installation?
>> >
>> > My own attempt is a somewhat convoluted method that is not working.
>> > Basically I set an environment variable that points to the properties
>> > file, and I use that at runtime to read configuration values. This
>> > works when I run it in my IDE as a minicluster but fails when I submit
>> > it to the cluster. I'm kind of stuck debugging this as for some reason
>> > I am not able to see the logs from the configuration reader class
>> > (asked a question about that in a separate thread).
>>
>>
>>


Re: Re: Flink-1.11.1 Rest API usage

2020-10-22 Thread Husky Zeng
Most of the other endpoints are not POST; you need to change the request method to GET or whatever each endpoint expects. It may help to first read the introduction on the page you linked carefully and check whether anything went wrong with your deployment.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re: Flink-1.11.1 Rest API usage

2020-10-22 Thread caozhen
Are you using a PATCH request?





amen...@163.com wrote
> Thanks for the reply,
> 
> but it doesn't seem to work. Is it because I submit to YARN in per-job mode, so the YARN-proxied URL cannot respond?
> 
> Many other endpoints I tested also return a 405 error: 'HTTP method POST is not supported by this URL'...
> 
> best,
> amenhub
> 
> 
> 
>  
> From: Husky Zeng
> Sent: 2020-10-22 11:17
> To: user-zh
> Subject: Re: Flink-1.11.1 Rest API usage
> With jobId=123456 and mode set to cancel,
>  
> you would send
> http://ip:port/../jobs/123456?mode=cancel
>  
>  
>  
>  
>  
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Logs printed when running with minicluster but not printed when submitted as a job

2020-10-22 Thread Manas Kale
Thank you Chesnay. I found the logs printed in the standalone session when I
used the CLI to submit the job. However, this only deepens the mystery of
the configuration file on the other thread - I see from the logs that the
configuration values are being read correctly, but when these values are
actually used, they are null!

On Wed, Oct 21, 2020 at 7:58 PM Manas Kale  wrote:

> I see, thanks for that clarification - I incorrectly assumed both methods
> of submission produce logs in the same place. I will have an update
> tomorrow!
>
> On Wed, Oct 21, 2020 at 6:12 PM Chesnay Schepler 
> wrote:
>
>> Hold on, let us clarify how you submit the job.
>>
>> Do you upload the jar via the WebUI, or with the CLI (e.g., ./bin/flink
>> run ...)?
>>
>> If it is the former, then it shows up in the JM logs.
>> If it is the latter, then it should appear in the logs of the client
>> (i.e., log/flink-???-client-???.log).
>>
>> On 10/21/2020 2:17 PM, Manas Kale wrote:
>>
>> Hi Chesnay,
>> I checked the JobManager logs - it's not there either.
>>
>> On Wed, Oct 21, 2020 at 3:51 PM Chesnay Schepler 
>> wrote:
>>
>>> The main method is executed in the JobManager process and never reaches
>>> the TaskExecutors (only the individual functions do).
>>> As such you have to take a peek into the JobManager logs.
>>>
>>> On 10/21/2020 11:37 AM, Manas Kale wrote:
>>>
>>> Hi,
>>> I have the following pattern:
>>>
>>> public static void main(String[] args) {
>>>
>>>     // Get the exec environment. This could be a cluster or a mini-cluster used for local development.
>>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>     // Make the Flink runtime use event time as time metric.
>>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>     // Generate a watermark every WATERMARK_PERIODICITY ms.
>>>     env.getConfig().setAutoWatermarkInterval(WATERMARK_PERIODICITY);
>>>
>>>     Config.readProperties();
>>> }
>>>
>>> class Config {
>>>
>>>     private final static Logger LOGGER = LoggerFactory.getLogger(Config.class);
>>>
>>>     // Populates variables above with values read from config file.
>>>     public static void readProperties() throws Exception {
>>>         Properties prop = new Properties();
>>>
>>>         String propFileLocation = System.getenv("FLINK_CONFIG_LOCATION");
>>>         if (propFileLocation == null) {
>>>             System.err.println("Properties file pointer env variable FLINK_CONFIG_LOCATION missing!");
>>>             System.exit(1);
>>>         }
>>>         FileInputStream is = null;
>>>         try {
>>>             is = new FileInputStream(new File(propFileLocation));
>>>         } catch (Exception e) {
>>>             System.err.println("File " + propFileLocation + " not found!");
>>>             System.exit(1);
>>>         }
>>>
>>>         prop.load(is);
>>>
>>>         *LOGGER.info(".."); // prints content read from property file*
>>>     }
>>>
>>> }
>>>
>>>
>>> When I run this program as a minicluster, I am able to see the
>>> LOGGER.info() being printed in my console.
>>> However, when I submit this job as a JAR to a Flink cluster, the Config
>>> class's *LOGGER.info() line above is never printed in the
>>> taskmanager's logs!* I don't understand why this is happening, because
>>> log statements from other operators are definitely being printed in the
>>> log files on the cluster. What am I doing wrong?
>>>
>>> My log4j.properties file is:
>>>
>>> log4j.rootLogger=INFO, console, fileAppender
>>> log4j.appender.console=org.apache.log4j.ConsoleAppender
>>> log4j.appender.console.layout=org.apache.log4j.PatternLayout
>>> log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
>>> log4j.appender.fileAppender=org.apache.log4j.RollingFileAppender
>>> log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
>>> log4j.appender.fileAppender.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
>>> log4j.appender.fileAppender.File=dataProcessingEngine.log
>>> log4j.appender.fileAppender.policies.type = Policies
>>> log4j.appender.fileAppender.policies.size.type = SizeBasedTriggeringPolicy
>>> log4j.appender.fileAppender.policies.size.size=10MB
>>> log4j.appender.fileAppender.strategy.type = DefaultRolloverStrategy
>>> log4j.appender.fileAppender.strategy.max = 5
>>>
>>>
>>> Thank you,
>>> Manas Kale
>>>
>>>
>>>
>>>
>>>
>>>
>>


Re: Correct way to package application.properties file with Flink JAR

2020-10-22 Thread Manas Kale
Hi Chesnay,
The Config reader has everything static, so I tried using

Config.class.getClassLoader().getResourceAsStream("pipeline.properties");

Here's the .pom file for that file:



   
  
<build>
   <resources>
      <resource>
         <directory>src/main/resources</directory>
         <includes>
            <include>pipeline.properties</include>
            <include>log4j.properties</include>
         </includes>
      </resource>
   </resources>

   ...

</build>




I can see the pipeline.properties packaged in my JAR at the root level
(using maven package command).
However, this works on my IDE minicluster but loads null values when
submitted to the cluster.
The Config class is not at the package root, rather it is a few levels
deep. Does that make a difference and cause the getClassLoader() to treat
an inner package as root?



On Wed, Oct 21, 2020 at 6:06 PM Chesnay Schepler  wrote:

> You could bundle said file in the jar and retrieve it via
> getClass().getClassLoader().getResource("").
>
> On 10/21/2020 2:24 PM, Manas Kale wrote:
> > Hi,
> > I have a Flink job that I am packaging as a JAR that is submitted to
> > the Flink cluster runtime. However, this JAR reads a few configuration
> > values from a .properties file.
> > What is the recommended way to package this properties file when
> > submitting to a cluster? Do I have to copy it to a folder in my flink
> > cluster installation?
> >
> > My own attempt is a somewhat convoluted method that is not working.
> > Basically I set an environment variable that points to the properties
> > file, and I use that at runtime to read configuration values. This
> > works when I run it in my IDE as a minicluster but fails when I submit
> > it to the cluster. I'm kind of stuck debugging this as for some reason
> > I am not able to see the logs from the configuration reader class
> > (asked a question about that in a separate thread).
>
>
>