Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

2021-03-05 Thread bat man
Hi Xintong Song,
I tried using the Java options to generate a heap dump in flink-conf.yaml,
referring to the docs [1]; however, after adding this the task manager containers
are not coming up. Note that I am using EMR. Am I doing anything wrong here?

env.java.opts: "-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/tmp/dump.hprof"

Thanks,
Hemant





On Fri, Mar 5, 2021 at 3:05 PM Xintong Song  wrote:

> Hi Hemant,
>
> This exception generally suggests that the JVM is running out of heap memory.
> Per the official documentation [1], it means the amount of live data barely fits
> into the Java heap, leaving little free space for new allocations.
>
> You can try to increase the heap size following these guides [2].
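
For reference, the heap-related knobs in flink-conf.yaml look roughly like this;
the values below are placeholders to illustrate the keys, not recommendations:

taskmanager.memory.process.size: 4096m
# or, to size the task heap directly:
taskmanager.memory.task.heap.size: 2048m

These keys are described in more detail in the memory setup guide linked below [2].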
>
> If a memory leak is suspected, then to further understand where the memory is
> consumed, you may need to dump the heap on OOMs and look for unexpected
> memory usage with profiling tools.
>
> Thank you~
>
> Xintong Song
>
>
> [1]
> https://docs.oracle.com/javase/8/docs/technotes/guides/troubleshoot/memleaks002.html
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup.html
>
>
>
> On Fri, Mar 5, 2021 at 4:24 PM bat man  wrote:
>
>> Hi,
>>
>> Getting the OOM below; the job failed 4-5 times and recovered from
>> there.
>>
>> java.lang.Exception: java.lang.OutOfMemoryError: GC overhead limit exceeded
>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
>>
>> Is there any way I can debug this? Since the job started running fine after a
>> few restarts, what could be the reason behind this?
>>
>> Thanks,
>> Hemant
>>
>


Re: reading file from s3

2021-03-05 Thread Avi Levi
Does anyone by any chance have a working example (of course without the
credentials etc.) that can be shared on GitHub? Simply reading/writing a
file from/to S3.
I keep struggling with this one and getting weird exceptions.
Thanks
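
For what it's worth, a minimal sketch of the kind of job in question, assuming the
flink-s3-fs-hadoop (or presto) filesystem plugin is installed on the cluster and
that the bucket name and paths below are placeholders:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class S3ReadWriteSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read text lines from an S3 path; the s3:// scheme is served by the filesystem plugin.
        DataStream<String> lines = env.readTextFile("s3://my-bucket/input/");

        // Write the lines back to another S3 path as plain text files.
        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("s3://my-bucket/output/"), new SimpleStringEncoder<String>("UTF-8"))
                .build();
        lines.addSink(sink);

        env.execute("s3-read-write-sketch");
    }
}

Credentials would normally come from the environment/IAM role or from
flink-conf.yaml (s3.access-key / s3.secret-key), not from the job code.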

On Thu, Mar 4, 2021 at 7:30 PM Avi Levi  wrote:

> Sure, This is the full exception stacktrace:
>
> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> at
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
> at
> java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2137)
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:238)
> at
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> at
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2137)
> at
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
> at akka.dispatch.OnComplete.internal(Future.scala:264)
> at akka.dispatch.OnComplete.internal(Future.scala:261)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> at
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
> at
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
> at
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
> at
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
> at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
> at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
> at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
> at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:531)
> at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
> at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> at
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed
> by NoRestartBackoffTimeStrategy
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:665)
> at
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
> at
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
> at jdk.internal.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at 

questions about broadcasts

2021-03-05 Thread Marco Villalobos
Is it possible for an operator to receive two different kinds of
broadcasts?

Is it possible for an operator to receive two different types of streams
and a broadcast? For example, I know there is a KeyedCoProcessFunction, but
is there a version of that which can also receive broadcasts?
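
For context, the closest built-in combination I'm aware of is one keyed stream
connected with one broadcast stream via KeyedBroadcastProcessFunction. A minimal
sketch, using String for every element type purely for illustration; `events` and
`configStream` are assumed to be existing DataStream<String> instances, and the
descriptor name and the "rule" logic are made up:

MapStateDescriptor<String, String> configDescriptor =
        new MapStateDescriptor<>("config", Types.STRING, Types.STRING);

BroadcastStream<String> broadcastConfig = configStream.broadcast(configDescriptor);

DataStream<String> result = events
        .keyBy(value -> value)            // the keyed "data" stream
        .connect(broadcastConfig)         // plus one broadcast stream
        .process(new KeyedBroadcastProcessFunction<String, String, String, String>() {
            @Override
            public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                // keyed elements can read the broadcast state
                String rule = ctx.getBroadcastState(configDescriptor).get("rule");
                out.collect(value + "/" + rule);
            }

            @Override
            public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
                // broadcast elements update the broadcast state
                ctx.getBroadcastState(configDescriptor).put("rule", value);
            }
        });

For two differently-typed non-broadcast inputs plus a broadcast, there is no single
built-in function as far as I know; a common workaround is to wrap/union the two
inputs into a shared type first, or to chain two connects.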


Invitation to Beam College

2021-03-05 Thread Mara Ruvalcaba

Hi Flink community,

You are invited to improve your data processing skills with the Beam 
College webinars!


If you know about Apache Beam but haven’t used it in production yet, or 
you want to learn best practices to optimize your Beam pipelines, then 
Beam College is for you!


Beam College is a free 5-day webinar series designed to be flexible, 
so you can sign up and drop in based on the topics of your interest and 
needs. Don’t miss the opportunity to learn practical tips, experience 
interactive demos, and engage with our Beam experts!


Some of the topics we’ll cover:

    Introduction to the Data processing ecosystem
    Advanced distributed data processing with Apache Beam
    Features to scale and productionalize your business case
    Strategies for performance and cost optimization
    Best practices for debugging Beam pipelines

Check out the full curriculum at: https://beamcollege.dev/all-courses/

I hope you can join us!

Regards,

Mara.

--
Mara Ruvalcaba
COO, SG Software Guru & Nearshore Link
USA: 512 296 2884
MX: 55 5239 5502



Re: Broadcasting to multiple operators

2021-03-05 Thread David Anderson
Glad to hear it! Thanks for letting us know.

David

On Fri, Mar 5, 2021 at 10:22 PM Roger  wrote:

> Confirmed. This worked!
> Thanks!
> Roger
>
> On Fri, Mar 5, 2021 at 12:41 PM Roger  wrote:
>
>> Hey David.
>> Thank you very much for your response. This is making sense now. It was
>> confusing because I was able to use the Broadcast stream prior to adding
>> the second stream. However, now I realize that this part of the pipeline
>> occurs after the windowing so I'm not affected the same way. This is
>> definitely going to help fix my problem.
>>
>> On Fri, Mar 5, 2021 at 12:33 PM David Anderson 
>> wrote:
>>
>>> This is a watermarking issue. Whenever an operator has two or more input
>>> streams, its watermark is the minimum of watermarks of the incoming
>>> streams. In this case your broadcast stream doesn't have a watermark
>>> generator, so it is preventing the watermarks from advancing. This in turn
>>> is preventing the windows from being triggered.
>>>
>>> You should call assignTimestampsAndWatermarks on the broadcast stream.
>>> If time is irrelevant for this stream, you could do something like this:
>>>
>>> public static class ConfigStreamAssigner<T> implements AssignerWithPeriodicWatermarks<T> {
>>>   @Nullable
>>>   @Override
>>>   public Watermark getCurrentWatermark() {
>>>  return Watermark.MAX_WATERMARK;
>>>   }
>>>
>>>   @Override
>>>   public long extractTimestamp(T element, long previousElementTimestamp) {
>>>  return 0;
>>>   }
>>> }
>>>
>>>
>>> By setting the watermark for this stream to MAX_WATERMARK, you are
>>> effectively removing this stream's watermarks from consideration.
>>>
>>> Regards,
>>> David
>>>
>>> On Fri, Mar 5, 2021 at 5:48 PM Roger  wrote:
>>>
 Hello.
 I am having an issue with a Flink 1.8 pipeline when trying to consume
 broadcast state across multiple operators.  I currently
 have a working pipeline that looks like the following:

 records
 .assignTimestampsAndWatermarks(
 new BoundedOutOfOrdernessGenerator(

 Long.parseLong(properties.getProperty(("flinkMaxOutOfOrderness")
 .keyBy(new ApplicationNameKeySelector())
 .window(
 TumblingEventTimeWindows.of(

 Time.seconds(Long.parseLong(properties.getProperty("flinkWindow")
 .aggregate(new Aggregator())
 .connect(configurationBroadcastStream)
 .process(excluder)
 .addSink(KinesisProducer.createSinkFromStaticConfig(properties));

 * records are a FlinkKafkaConsumer stream
 * configurationBroadcastStream is a FlinkKafkaConsumer
 * aggregator is an AggregateFunction
 * filter is a BroadcastProcessFunction


 I now have requirements to filter out transactions at the beginning of
 the pipeline using the same broadcast stream I am consuming towards the end
 of the pipeline. I updated the pipeline to look like this:

 records
 .assignTimestampsAndWatermarks(
 new BoundedOutOfOrdernessGenerator(

 Long.parseLong(properties.getProperty(("flinkMaxOutOfOrderness")
 .connect(configurationBroadcastStream) **new**
 .process(filter) **new**
 .keyBy(new ApplicationNameKeySelector())
 .window(
 TumblingEventTimeWindows.of(

 Time.seconds(Long.parseLong(properties.getProperty("flinkWindow")
 .aggregate(new Aggregator())
 .connect(configurationBroadcastStream)
 .process(excluder)
 .addSink(KinesisProducer.createSinkFromStaticConfig(properties));

 * records are a FlinkKafkaConsumer stream
 * configurationBroadcastStream is a FlinkKafkaConsumer
 * aggregator is an AggregateFunction
 * excluder is a BroadcastProcessFunction

 With this change, the aggregated records are not making it into the
 excluder process.

 1. The aggregator add is working. I can see this in the logs.
 2. The aggregator getResult is never called. This makes me think this
 is a window issue.
 3. Both processBroadcastElement methods from the two broadcast
 functions are working and
  retrieving the broadcasted state. I see this in logging.
 4. The pipeline definitely worked prior to me adding in the second
 .connect and .process at the beginning of the pipeline.
 5. I have considered creating a new record object from the new
 process(filter) that contains the config retrieved from the broadcast
 stream along with the transactions and passing that down the pipeline but
 that is really not desirable.

 Any ideas on what might be going on here?

 Thanks!
 Roger




Re: Python DataStream API Questions -- Java/Scala Interoperability?

2021-03-05 Thread Kevin Lam
Thanks Shuiqiang! That's really helpful, we'll give the connectors a try.

On Wed, Mar 3, 2021 at 4:02 AM Shuiqiang Chen  wrote:

> Hi Kevin,
>
> Thank you for your questions. Currently, users are not able to define
> custom sources/sinks in Python. This is a great feature that would unify
> end-to-end PyFlink application development in Python, but it is a large topic
> that we have no plan to support at present.
>
> As you have noticed, `the Python DataStream API has several connectors
> [2] that use Py4J+Java gateways to interoperate with Java source/sinks`.
> These connectors are extensions of the Python abstract classes named
> `SourceFunction` and `SinkFunction`. These two classes can accept a Java
> source/sink instance and maintain it to enable the interoperation between
> Python and Java. They can also accept a string with the full name of a
> Java/Scala-defined Source/SinkFunction class and create the corresponding
> Java instance. Below are the definitions of these classes:
>
> class JavaFunctionWrapper(object):
> """
> A wrapper class that maintains a Function implemented in Java.
> """
>
> def __init__(self, j_function: Union[str, JavaObject]):
> # TODO we should move this part to the get_java_function() to perform 
> a lazy load.
> if isinstance(j_function, str):
> j_func_class = get_gateway().jvm.__getattr__(j_function)
> j_function = j_func_class()
> self._j_function = j_function
>
> def get_java_function(self):
> return self._j_function
>
>
>
> class SourceFunction(JavaFunctionWrapper):
> """
> Base class for all stream data source in Flink.
> """
>
> def __init__(self, source_func: Union[str, JavaObject]):
> """
> Constructor of SinkFunction.
>
> :param source_func: The java SourceFunction object.
> """
> super(SourceFunction, self).__init__(source_func)
>
>
> class SinkFunction(JavaFunctionWrapper):
> """
> The base class for SinkFunctions.
> """
>
> def __init__(self, sink_func: Union[str, JavaObject]):
> """
> Constructor of SinkFunction.
>
> :param sink_func: The java SinkFunction object or the full name of the
> SinkFunction class.
> """
> super(SinkFunction, self).__init__(sink_func)
>
> Therefore, you are able to define custom sources/sinks in Scala and apply
> them in Python. Here is the recommended approach for implementation:
>
> class MyBigTableSink(SinkFunction):
> def __init__(self, class_name: str):
> super(MyBigTableSink, self).__init__(class_name)
>
>
> def example():
> env = StreamExecutionEnvironment.get_execution_environment()
> env.add_jars('/the/path/of/your/MyBigTableSink.jar')
> # ...
> ds.add_sink(MyBigTableSink("com.mycompany.MyBigTableSink"))
> env.execute("Application with Custom Sink")
>
>
> if __name__ == '__main__':
> example()
>
> Remember that you must add the jar of the Scala-defined SinkFunction by
> calling `env.add_jars()` before adding the SinkFunction. And your custom
> source/sink functions must be extensions of `SourceFunction` and
> `SinkFunction`.
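
For completeness, the JVM-side class referenced by that full name
("com.mycompany.MyBigTableSink" in the example above) can be a plain SinkFunction
with a no-argument constructor. A rough sketch in Java (a Scala implementation
would look the same; the actual BigTable client code is omitted):

package com.mycompany;

import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// Referenced from Python by its fully qualified class name. A no-argument
// constructor is required because the Python wrapper instantiates the class
// via the Py4J gateway.
public class MyBigTableSink implements SinkFunction<String> {

    @Override
    public void invoke(String value, Context context) throws Exception {
        // Write `value` to BigTable here.
    }
}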
>
> Any further questions are welcomed!
>
> Best,
> Shuiqiang
>
>
> Kevin Lam  wrote on Wed, Mar 3, 2021 at 2:50 AM:
>
>> Hello everyone,
>>
>> I have some questions about the Python API that hopefully folks in the
>> Apache Flink community can help with.
>>
>> A little background, I’m interested in using the Python Datastream API
>> because of stakeholders who don’t have a background in Scala/Java, and
>> would prefer Python if possible. Our team is open to maintaining Scala
>> constructs on our end, however we are looking to expose Flink for stateful
>> streaming via a Python API to end-users.
>>
>> Questions:
>>
>> 1/ The docs mention that custom Sources and Sinks cannot be defined in
>> Python, but must be written in Java/Scala [1]. What is the recommended
>> approach for interoperating between custom sinks/sources written in Scala,
>> with the Python API? If nothing is currently supported, is it on the road
>> map?
>>
>> 2/ Also, I’ve noted that the Python DataStream API has several connectors
>> [2] that use Py4J+Java gateways to interoperate with Java source/sinks. Is
>> there a way for users to build their own connectors? What would this
>> process entail?
>>
>> Ideally, we’d like to be able to define custom sources/sinks in Scala and
>> use them in our Python API Flink Applications. For example, defining a
>> BigTable sink in Scala for use in the Python API:
>>
>>
>> [3]
>>
>> Where MyBigTableSink is just somehow importing a Scala defined sink.
>>
>> More generally, we’re interested in learning more about Scala/Python
>> interoperability in Flink, and how we can expose the power of Flink’s Scala
>> APIs to Python. Open to any suggestions, strategies, etc.
>>
>> Looking forward to any thoughts!
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html#user-defined-sources--sinks
>>
>> [2]
>> 

Running Pyflink job on K8s Flink Cluster Deployment?

2021-03-05 Thread Kevin Lam
Hello everyone,

I'm looking to run a PyFlink application in a distributed fashion on
Kubernetes, and am currently facing issues. I've successfully gotten
a Scala Flink application to run using the manifests provided at [0].

I attempted to run the application by updating the jobmanager command args
from

 args: ["standalone-job", "--job-classname", "com.job.ClassName",
, ]

to

args: ["standalone-job", "--python", "my_python_app.py", , ]

But this didn't work. It resulted in the following error:

Caused by: java.lang.LinkageError: loader constraint violation: loader
org.apache.flink.util.ChildFirstClassLoader @2d8f2f3a wants to load class
org.apache.commons.cli.Options. A different class with the same name was
previously loaded by 'app'. (org.apache.commons.cli.Options is in unnamed
module of loader 'app'

I was able to get things to 'run' by setting args to:

args: ["python", "my_python_app.py", , ]


But I'm not sure if things were running in a distributed fashion or not.

1/ Is there a good way to check if the task pods were being correctly
utilized?

2/ Are there any similar examples to [0] for how to run Pyflink jobs on
kubernetes?

Open to any suggestions you may have. Note: we'd prefer not to run using
the native K8S route outlined at [1] because we need to maintain the
ability to customize certain aspects of the deployment (eg. mounting SSDs
to some of the pods)

Thanks in advance!

[0]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#application-cluster-resource-definitions

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/native_kubernetes.html#application-mode


Re: Broadcasting to multiple operators

2021-03-05 Thread Roger
Confirmed. This worked!
Thanks!
Roger

On Fri, Mar 5, 2021 at 12:41 PM Roger  wrote:

> Hey David.
> Thank you very much for your response. This is making sense now. It was
> confusing because I was able to use the Broadcast stream prior to adding
> the second stream. However, now I realize that this part of the pipeline
> occurs after the windowing so I'm not affected the same way. This is
> definitely going to help fix my problem.
>
> On Fri, Mar 5, 2021 at 12:33 PM David Anderson 
> wrote:
>
>> This is a watermarking issue. Whenever an operator has two or more input
>> streams, its watermark is the minimum of watermarks of the incoming
>> streams. In this case your broadcast stream doesn't have a watermark
>> generator, so it is preventing the watermarks from advancing. This in turn
>> is preventing the windows from being triggered.
>>
>> You should call assignTimestampsAndWatermarks on the broadcast stream. If
>> time is irrelevant for this stream, you could do something like this:
>>
>> public static class ConfigStreamAssigner<T> implements AssignerWithPeriodicWatermarks<T> {
>>   @Nullable
>>   @Override
>>   public Watermark getCurrentWatermark() {
>>  return Watermark.MAX_WATERMARK;
>>   }
>>
>>   @Override
>>   public long extractTimestamp(T element, long previousElementTimestamp) {
>>  return 0;
>>   }
>> }
>>
>>
>> By setting the watermark for this stream to MAX_WATERMARK, you are
>> effectively removing this stream's watermarks from consideration.
>>
>> Regards,
>> David
>>
>> On Fri, Mar 5, 2021 at 5:48 PM Roger  wrote:
>>
>>> Hello.
>>> I am having an issue with a Flink 1.8 pipeline when trying to consume
>>> broadcast state across multiple operators.  I currently
>>> have a working pipeline that looks like the following:
>>>
>>> records
>>> .assignTimestampsAndWatermarks(
>>> new BoundedOutOfOrdernessGenerator(
>>>
>>> Long.parseLong(properties.getProperty(("flinkMaxOutOfOrderness")
>>> .keyBy(new ApplicationNameKeySelector())
>>> .window(
>>> TumblingEventTimeWindows.of(
>>>
>>> Time.seconds(Long.parseLong(properties.getProperty("flinkWindow")
>>> .aggregate(new Aggregator())
>>> .connect(configurationBroadcastStream)
>>> .process(excluder)
>>> .addSink(KinesisProducer.createSinkFromStaticConfig(properties));
>>>
>>> * records are a FlinkKafkaConsumer stream
>>> * configurationBroadcastStream is a FlinkKafkaConsumer
>>> * aggregator is an AggregateFunction
>>> * filter is a BroadcastProcessFunction
>>>
>>>
>>> I now have requirements to filter out transactions at the beginning of
>>> the pipeline using the same broadcast stream I am consuming towards the end
>>> of the pipeline. I updated the pipeline to look like this:
>>>
>>> records
>>> .assignTimestampsAndWatermarks(
>>> new BoundedOutOfOrdernessGenerator(
>>>
>>> Long.parseLong(properties.getProperty(("flinkMaxOutOfOrderness")
>>> .connect(configurationBroadcastStream) **new**
>>> .process(filter) **new**
>>> .keyBy(new ApplicationNameKeySelector())
>>> .window(
>>> TumblingEventTimeWindows.of(
>>>
>>> Time.seconds(Long.parseLong(properties.getProperty("flinkWindow")
>>> .aggregate(new Aggregator())
>>> .connect(configurationBroadcastStream)
>>> .process(excluder)
>>> .addSink(KinesisProducer.createSinkFromStaticConfig(properties));
>>>
>>> * records are a FlinkKafkaConsumer stream
>>> * configurationBroadcastStream is a FlinkKafkaConsumer
>>> * aggregator is an AggregateFunction
>>> * excluder is a BroadcastProcessFunction
>>>
>>> With this change, the aggregated records are not making it into the
>>> excluder process.
>>>
>>> 1. The aggregator add is working. I can see this in the logs.
>>> 2. The aggregator getResult is never called. This makes me think this is
>>> a window issue.
>>> 3. Both processBroadcastElement methods from the two broadcast functions
>>> are working and
>>>  retrieving the broadcasted state. I see this in logging.
>>> 4. The pipeline definitely worked prior to me adding in the second
>>> .connect and .process at the beginning of the pipeline.
>>> 5. I have considered creating a new record object from the new
>>> process(filter) that contains the config retrieved from the broadcast
>>> stream along with the transactions and passing that down the pipeline but
>>> that is really not desirable.
>>>
>>> Any ideas on what might be going on here?
>>>
>>> Thanks!
>>> Roger
>>>
>>>


Re: Dynamic JDBC Sink Support

2021-03-05 Thread David Anderson
Rion,

A given JdbcSink can only write to one table, but if the number of tables
involved isn't unreasonable, you could use a separate sink for each table,
and use side outputs [1] from a process function to steer each record to
the appropriate sink.

I suggest you avoid trying to implement a sink.

In general, custom sinks need to implement their own checkpointing, though
there is a generic two phase commit sink you can use as a starting point
for implementing a transactional sink. FYI, the JDBC sink has been reworked
for 1.13 to include exactly-once guarantees based on the XA standard [2].

Regards,
David

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
[2]
https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/datastream/jdbc/#jdbcsinkexactlyoncesink
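
To make the side-output idea above concrete, a rough sketch in Java (the Kotlin
equivalent is analogous). The attribute types FooAttribute/BarAttribute with
getId()/getValue(), the table names, and the SQL are hypothetical stand-ins for
the real attribute hierarchy; `attributes` is assumed to be the
DataStream<Attribute> produced by the process function:

final OutputTag<FooAttribute> fooTag = new OutputTag<FooAttribute>("foo-attributes") {};
final OutputTag<BarAttribute> barTag = new OutputTag<BarAttribute>("bar-attributes") {};

SingleOutputStreamOperator<Attribute> routed = attributes
        .process(new ProcessFunction<Attribute, Attribute>() {
            @Override
            public void processElement(Attribute value, Context ctx, Collector<Attribute> out) {
                // Steer each attribute subtype to its own side output.
                if (value instanceof FooAttribute) {
                    ctx.output(fooTag, (FooAttribute) value);
                } else if (value instanceof BarAttribute) {
                    ctx.output(barTag, (BarAttribute) value);
                }
            }
        });

JdbcConnectionOptions connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:postgresql://localhost:5432/mydb")
        .withDriverName("org.postgresql.Driver")
        .build();

routed.getSideOutput(fooTag).addSink(
        JdbcSink.sink(
                "INSERT INTO foo_attributes (id, value) VALUES (?, ?) "
                        + "ON CONFLICT (id) DO UPDATE SET value = EXCLUDED.value",
                (PreparedStatement statement, FooAttribute attr) -> {
                    statement.setString(1, attr.getId());
                    statement.setString(2, attr.getValue());
                },
                connectionOptions));

// ...and similarly routed.getSideOutput(barTag).addSink(JdbcSink.sink(...)) for bar_attributes.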

On Fri, Mar 5, 2021 at 7:34 PM Rion Williams  wrote:

> Hi all,
>
> I’ve been playing around with a proof-of-concept application with Flink to
> assist a colleague of mine. The application is fairly simple (take in a
> single input and identify various attributes about it) with the goal of
> outputting those to separate tables in Postgres:
>
> object AttributeIdentificationJob {
> @JvmStatic
> fun main(args: Array) {
> val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>
> stream
> .addSource(ReadFromKafka())
> .process(IdentifyAttributesFunction())
> .addSink(DynamicJdbcHere())
>
> // execute program
> stream.execute("Attribute Identification")
> }
> }
>
> Considering my attributes may be of varying types (all implementing an
> Attribute interface), I don't know if the existing JdbcSink functionality
> or some variant of it (i.e. one of the dynamic ones that I see listed)
> could handle this functionality. Essentially for a given "bundle" of
> records, I'd need to ensure that each respective type of attribute was
> upserted into its corresponding table within a Postgres database.
>
>> Is that something that the connector can handle on its own? Or would I
> need to implement my own RichSinkFunction> that
> could handle opening a connection to Postgres and dynamically generating
> the appropriate UPSERT statements to handle sending the records? As a
> follow up to that, if I did need to write my own RichSinkFunction, would I
>> need to implement my own checkpointing for resilience purposes or does that
> come along for the ride for RichSinkFunctions?
>
> Any insight or approaches would be welcome!
>
> Thanks,
>
> Rion
>


Re: Broadcasting to multiple operators

2021-03-05 Thread Roger
Hey David.
Thank you very much for your response. This is making sense now. It was
confusing because I was able to use the Broadcast stream prior to adding
the second stream. However, now I realize that this part of the pipeline
occurs after the windowing so I'm not affected the same way. This is
definitely going to help fix my problem.

On Fri, Mar 5, 2021 at 12:33 PM David Anderson  wrote:

> This is a watermarking issue. Whenever an operator has two or more input
> streams, its watermark is the minimum of watermarks of the incoming
> streams. In this case your broadcast stream doesn't have a watermark
> generator, so it is preventing the watermarks from advancing. This in turn
> is preventing the windows from being triggered.
>
> You should call assignTimestampsAndWatermarks on the broadcast stream. If
> time is irrelevant for this stream, you could do something like this:
>
> public static class ConfigStreamAssigner<T> implements AssignerWithPeriodicWatermarks<T> {
>   @Nullable
>   @Override
>   public Watermark getCurrentWatermark() {
>  return Watermark.MAX_WATERMARK;
>   }
>
>   @Override
>   public long extractTimestamp(T element, long previousElementTimestamp) {
>  return 0;
>   }
> }
>
>
> By setting the watermark for this stream to MAX_WATERMARK, you are
> effectively removing this stream's watermarks from consideration.
>
> Regards,
> David
>
> On Fri, Mar 5, 2021 at 5:48 PM Roger  wrote:
>
>> Hello.
>> I am having an issue with a Flink 1.8 pipeline when trying to consume
>> broadcast state across multiple operators.  I currently
>> have a working pipeline that looks like the following:
>>
>> records
>> .assignTimestampsAndWatermarks(
>> new BoundedOutOfOrdernessGenerator(
>>
>> Long.parseLong(properties.getProperty(("flinkMaxOutOfOrderness")
>> .keyBy(new ApplicationNameKeySelector())
>> .window(
>> TumblingEventTimeWindows.of(
>>
>> Time.seconds(Long.parseLong(properties.getProperty("flinkWindow")
>> .aggregate(new Aggregator())
>> .connect(configurationBroadcastStream)
>> .process(excluder)
>> .addSink(KinesisProducer.createSinkFromStaticConfig(properties));
>>
>> * records are a FlinkKafkaConsumer stream
>> * configurationBroadcastStream is a FlinkKafkaConsumer
>> * aggregator is an AggregateFunction
>> * filter is a BroadcastProcessFunction
>>
>>
>> I now have requirements to filter out transactions at the beginning of
>> the pipeline using the same broadcast stream I am consuming towards the end
>> of the pipeline. I updated the pipeline to look like this:
>>
>> records
>> .assignTimestampsAndWatermarks(
>> new BoundedOutOfOrdernessGenerator(
>>
>> Long.parseLong(properties.getProperty(("flinkMaxOutOfOrderness")
>> .connect(configurationBroadcastStream) **new**
>> .process(filter) **new**
>> .keyBy(new ApplicationNameKeySelector())
>> .window(
>> TumblingEventTimeWindows.of(
>>
>> Time.seconds(Long.parseLong(properties.getProperty("flinkWindow")
>> .aggregate(new Aggregator())
>> .connect(configurationBroadcastStream)
>> .process(excluder)
>> .addSink(KinesisProducer.createSinkFromStaticConfig(properties));
>>
>> * records are a FlinkKafkaConsumer stream
>> * configurationBroadcastStream is a FlinkKafkaConsumer
>> * aggregator is an AggregateFunction
>> * excluder is a BroadcastProcessFunction
>>
>> With this change, the aggregated records are not making it into the
>> excluder process.
>>
>> 1. The aggregator add is working. I can see this in the logs.
>> 2. The aggregator getResult is never called. This makes me think this is
>> a window issue.
>> 3. Both processBroadcastElement methods from the two broadcast functions
>> are working and
>>  retrieving the broadcasted state. I see this in logging.
>> 4. The pipeline definitely worked prior to me adding in the second
>> .connect and .process at the beginning of the pipeline.
>> 5. I have considered creating a new record object from the new
>> process(filter) that contains the config retrieved from the broadcast
>> stream along with the transactions and passing that down the pipeline but
>> that is really not desirable.
>>
>> Any ideas on what might be going on here?
>>
>> Thanks!
>> Roger
>>
>>


Re: Broadcasting to multiple operators

2021-03-05 Thread David Anderson
This is a watermarking issue. Whenever an operator has two or more input
streams, its watermark is the minimum of watermarks of the incoming
streams. In this case your broadcast stream doesn't have a watermark
generator, so it is preventing the watermarks from advancing. This in turn
is preventing the windows from being triggered.

You should call assignTimestampsAndWatermarks on the broadcast stream. If
time is irrelevant for this stream, you could do something like this:

public static class ConfigStreamAssigner<T> implements AssignerWithPeriodicWatermarks<T> {
  @Nullable
  @Override
  public Watermark getCurrentWatermark() {
     return Watermark.MAX_WATERMARK;
  }

  @Override
  public long extractTimestamp(T element, long previousElementTimestamp) {
     return 0;
  }
}


By setting the watermark for this stream to MAX_WATERMARK, you are
effectively removing this stream's watermarks from consideration.
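
In terms of wiring, the assigner goes on the configuration stream before it is
broadcast; roughly like the sketch below, with `Config` and `configDescriptor`
standing in for whatever the real element type and MapStateDescriptor are:

static BroadcastStream<Config> broadcastWithMaxWatermark(
        DataStream<Config> configStream,
        MapStateDescriptor<String, Config> configDescriptor) {
    // Attach the MAX_WATERMARK assigner first, then broadcast as before.
    return configStream
            .assignTimestampsAndWatermarks(new ConfigStreamAssigner<>())
            .broadcast(configDescriptor);
}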

Regards,
David

On Fri, Mar 5, 2021 at 5:48 PM Roger  wrote:

> Hello.
> I am having an issue with a Flink 1.8 pipeline when trying to consume
> broadcast state across multiple operators.  I currently
> have a working pipeline that looks like the following:
>
> records
> .assignTimestampsAndWatermarks(
> new BoundedOutOfOrdernessGenerator(
>
> Long.parseLong(properties.getProperty(("flinkMaxOutOfOrderness")
> .keyBy(new ApplicationNameKeySelector())
> .window(
> TumblingEventTimeWindows.of(
>
> Time.seconds(Long.parseLong(properties.getProperty("flinkWindow")
> .aggregate(new Aggregator())
> .connect(configurationBroadcastStream)
> .process(excluder)
> .addSink(KinesisProducer.createSinkFromStaticConfig(properties));
>
> * records are a FlinkKafkaConsumer stream
> * configurationBroadcastStream is a FlinkKafkaConsumer
> * aggregator is an AggregateFunction
> * filter is a BroadcastProcessFunction
>
>
> I now have requirements to filter out transactions at the beginning of the
> pipeline using the same broadcast stream I am consuming towards the end of
> the pipeline. I updated the pipeline to look like this:
>
> records
> .assignTimestampsAndWatermarks(
> new BoundedOutOfOrdernessGenerator(
>
> Long.parseLong(properties.getProperty(("flinkMaxOutOfOrderness")
> .connect(configurationBroadcastStream) **new**
> .process(filter) **new**
> .keyBy(new ApplicationNameKeySelector())
> .window(
> TumblingEventTimeWindows.of(
>
> Time.seconds(Long.parseLong(properties.getProperty("flinkWindow")
> .aggregate(new Aggregator())
> .connect(configurationBroadcastStream)
> .process(excluder)
> .addSink(KinesisProducer.createSinkFromStaticConfig(properties));
>
> * records are a FlinkKafkaConsumer stream
> * configurationBroadcastStream is a FlinkKafkaConsumer
> * aggregator is an AggregateFunction
> * excluder is a BroadcastProcessFunction
>
> With this change, the aggregated records are not making it into the
> excluder process.
>
> 1. The aggregator add is working. I can see this in the logs.
> 2. The aggregator getResult is never called. This makes me think this is a
> window issue.
> 3. Both processBroadcastElement methods from the two broadcast functions
> are working and
>  retrieving the broadcasted state. I see this in logging.
> 4. The pipeline definitely worked prior to me adding in the second
> .connect and .process at the beginning of the pipeline.
> 5. I have considered creating a new record object from the new
> process(filter) that contains the config retrieved from the broadcast
> stream along with the transactions and passing that down the pipeline but
> that is really not desirable.
>
> Any ideas on what might be going on here?
>
> Thanks!
> Roger
>
>


Dynamic JDBC Sink Support

2021-03-05 Thread Rion Williams
Hi all,

I’ve been playing around with a proof-of-concept application with Flink to
assist a colleague of mine. The application is fairly simple (take in a
single input and identify various attributes about it) with the goal of
outputting those to separate tables in Postgres:

object AttributeIdentificationJob {
@JvmStatic
fun main(args: Array) {
val stream = StreamExecutionEnvironment.getExecutionEnvironment()

stream
.addSource(ReadFromKafka())
.process(IdentifyAttributesFunction())
.addSink(DynamicJdbcHere())

// execute program
stream.execute("Attribute Identification")
}
}

Considering my attributes may be of varying types (all implementing an
Attribute interface), I don't know if the existing JdbcSink functionality
or some variant of it (i.e. one of the dynamic ones that I see listed)
could handle this functionality. Essentially for a given "bundle" of
records, I'd need to ensure that each respective type of attribute was
upserted into its corresponding table within a Postgres database.

Is that something that the connector can handle on its own? Or would I
need to implement my own RichSinkFunction> that could
handle opening a connection to Postgres and dynamically generating the
appropriate UPSERT statements to handle sending the records? As a follow up
to that, if I did need to write my own RichSinkFunction, would I need to
implement my own checkpointing for resilience purposes or does that come
along for the ride for RichSinkFunctions?

Any insight or approaches would be welcome!

Thanks,

Rion


Re: LocalWatermarkAssigner causes predicate pushdown to be skipped

2021-03-05 Thread Yuval Itzchakov
Hi Timo,
After investigating this further, this is actually not related to
implementing SupportsWatermarkPushdown.

Once I create a TableSchema for my custom source's RowData and assign it a
watermark (see my example in the original mail), the plan will always
include a LogicalWatermarkAssigner. This assigner, which sits between the
LogicalTableScan and the LogicalFilter, then prevents the HepPlanner from
applying the optimization, since the rule requires LogicalTableScan to be a
direct child of LogicalFilter. Since I have
LogicalFilter -> LogicalWatermarkAssigner -> LogicalTableScan, this won't
work.

On Fri, Mar 5, 2021 at 5:59 PM Timo Walther  wrote:

> Hi Yuval,
>
> sorry that nobody replied earlier. Somehow your email fell through the
> cracks.
>
> If I understand you correctly, you would like to implement a table
> source that implements both `SupportsWatermarkPushDown` and
> `SupportsFilterPushDown`?
>
> The current behavior might be on purpose. Filters and Watermarks are not
> very compatible. Filtering would also mean that records (from which
> watermarks could be generated) are skipped. If the filter is very
> strict, we would not generate any new watermarks and the pipeline would
> stop making progress in time.
>
> Watermark push down is only necessary, if per-partition watermarks are
> required. Otherwise the watermarks are generated in a subsequent
> operator after the source. So you can still use rowtime without
> implementing `SupportsWatermarkPushDown` in your custom source.
>
> I will loop in Shengkai, who worked on this topic recently.
>
> Regards,
> Timo
>
>
> On 04.03.21 18:52, Yuval Itzchakov wrote:
> > Bumping this up again, would appreciate any help if anyone is familiar
> > with the blink planner.
> >
> > Thanks,
> > Yuval.
> >
> > On Fri, Feb 26, 2021, 18:53 Yuval Itzchakov  > > wrote:
> >
> > Hi Jark,
> > Would appreciate your help with this.
> >
> > On Wed, Feb 24, 2021 at 12:09 PM Roman Khachatryan  > > wrote:
> >
> > Hi Yuval,
> >
> > I'm not familiar with the Blink planner but probably Jark can
> help.
> >
> > Regards,
> > Roman
> >
> >
> > On Sun, Feb 21, 2021 at 6:52 PM Yuval Itzchakov
> > mailto:yuva...@gmail.com>> wrote:
> >
> > Update: When I don't set the watermark explicitly on the
> > TableSchema, `applyWatermarkStrategy` never gets called on
> > my ScanTableSource, which does make sense. But now the
> > question is what should be done? This feels a bit
> unintuitive.
> >
> > On Sun, Feb 21, 2021 at 7:09 PM Yuval Itzchakov
> > mailto:yuva...@gmail.com>> wrote:
> >
> > Hi,
> > Flink 1.12.1, Blink Planner, Scala 2.12
> >
> > I have the following logical plan:
> >
> >
>  LogicalSink(table=[default_catalog.default_database.table], fields=[bar,
> baz, hello_world, a, b])
> > +- LogicalProject(value=[$2],
> > bar=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)],
> > baz=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)],
> > hello_world=[null:VARCHAR(2147483647) CHARACTER SET
> > "UTF-16LE"], a=[null:VARCHAR(2147483647) CHARACTER SET
> > "UTF-16LE"], b=[EMPTY_MAP()])
> > +- LogicalFilter(condition=[AND(=($4,
> > _UTF-16LE'bar'), =($34, _UTF-16LE'baz'))])
> >+- LogicalWatermarkAssigner(rowtime=[bar],
> > watermark=[$0])
> >   +- LogicalTableScan(table=[[default_catalog,
> > default_database, foo]])
> >
> > I have a custom source which creates a TableSchema based
> > on an external table. When I create the schema, I push
> > the watermark definition to the schema:
> >
> > image.png
> >
> > When the HepPlanner starts the optimization phase and
> > reaches the "PushFilterInotTableSourceScanRule", it
> > matches on the LogicalFilter in the definition. But
> > then, since the RelOptRuleOperandChildPolicy is set to
> > "SOME", it attempts to do a full match on the child
> > nodes. Since the rule is defined as so:
> >
> > image.png
> >
> > The child filter fails since the immediate child of the
> > filter is a "LocalWatermarkAssigner", and not the
> > "LogicalTableScan" which is the grandchild:
> >
> > image.png
> >
> > Is this the desired behavior? Should I create the
> > TableSchema without the row time attribute and use
> > "SupportsWatermarkPushdown" to generate the watermark
> > dynamically from the source record?
> >
> > --
> > 

RE: Need information on latency metrics

2021-03-05 Thread V N, Suchithra (Nokia - IN/Bangalore)
Hi Timo,

Yes, I have gone through the link. But for the other metrics the documentation 
has a description.
For example, 
numBytesOut - The total number of bytes this task has emitted.
lastCheckpointSize - The total size of the last checkpoint (in bytes).

For the latency metrics I don't see such a description, which makes it 
difficult to understand what the count listed for each operator represents, 
how it increments, and what the latency values mean.
It would be helpful if some more information were provided about these metrics.

-Original Message-
From: Timo Walther  
Sent: Friday, March 5, 2021 8:52 PM
To: user@flink.apache.org
Subject: Re: Need information on latency metrics

Hi Suchithra,

did you see this section in the docs?

https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html#latency-tracking

Regards,
Timo

On 05.03.21 15:31, V N, Suchithra (Nokia - IN/Bangalore) wrote:
> Hi,
> 
> I am using flink 1.12.1 and trying to explore latency metrics 
> with Prometheus. I have enabled latency metrics by adding
> "metrics.latency.interval: 1" in flink-conf.yaml.
> 
> I have submitted a flink streaming job which has
> Source->flatmap->process->sink chained into a single task, and I
> can see the latency metrics below in Prometheus.
> 
> flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_i
> ndex_latency_count
> 
> flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_i
> ndex_latency
> 
> Prometheus output :
> 
> flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency_count
> 
> flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency_count{app="met-flink-taskmanager", host="", instance=" ", job="kubernetes-pods-insecure", job_id="3ad0b4c814836aea92c48f6baf44b8bb", job_name=" ", kubernetes_pod_name="met-flink-taskmanager-5b58cdf557-l24tc", namespace="", operator_id="3d05135cf7d8f1375d8f655ba9d20255", operator_subtask_index="0", pod_template_hash="5b58cdf557", source_id="cbc357ccb763df2852fee8c4fc7d55f2", tm_id=" "}
> 27804583
> 
> flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency_count{app="met-flink-taskmanager", host="", instance=" ", job="kubernetes-pods-insecure", job_id="3ad0b4c814836aea92c48f6baf44b8bb", job_name=" ", kubernetes_pod_name="met-flink-taskmanager-5b58cdf557-l24tc", namespace=" ", operator_id="570f707193e0fe32f4d86d067aba243b", operator_subtask_index="0", pod_template_hash="5b58cdf557", source_id="cbc357ccb763df2852fee8c4fc7d55f2", tm_id=" "}
> 27804583
> 
> flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency_count{app="met-flink-taskmanager", host="", instance=" ", job="kubernetes-pods-insecure", job_id="3ad0b4c814836aea92c48f6baf44b8bb", job_name=" ", kubernetes_pod_name="met-flink-taskmanager-5b58cdf557-l24tc", namespace=" ", operator_id="ba40499bacce995f15693b1735928377", operator_subtask_index="0", pod_template_hash="5b58cdf557", source_id="cbc357ccb763df2852fee8c4fc7d55f2", tm_id=" "}
> 27804583
> 
> flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency
> 
> flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency{app="met-flink-taskmanager", host="", instance=" ", job="kubernetes-pods-insecure", job_id="3ad0b4c814836aea92c48f6baf44b8bb", job_name=" ", kubernetes_pod_name="met-flink-taskmanager-5b58cdf557-l24tc", namespace=" ", operator_id="3d05135cf7d8f1375d8f655ba9d20255", operator_subtask_index="0", pod_template_hash="5b58cdf557", quantile="0.95", source_id="cbc357ccb763df2852fee8c4fc7d55f2", tm_id=" "}
> 0
> 
> flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency{app="met-flink-taskmanager", host="", instance=" ", job="kubernetes-pods-insecure", job_id="3ad0b4c814836aea92c48f6baf44b8bb", job_name=" ", kubernetes_pod_name="met-flink-taskmanager-5b58cdf557-l24tc", namespace=" ", operator_id="3d05135cf7d8f1375d8f655ba9d20255", operator_subtask_index="0", pod_template_hash="5b58cdf557", quantile="0.98", source_id="cbc357ccb763df2852fee8c4fc7d55f2", tm_id=" "}
> 0.4217
> 
> flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency{app="met-flink-taskmanager", host="", instance=" ", job="kubernetes-pods-insecure", job_id="3ad0b4c814836aea92c48f6baf44b8bb", job_name=" ", kubernetes_pod_name="met-flink-taskmanager-5b58cdf557-l24tc", namespace=" ", operator_id="3d05135cf7d8f1375d8f655ba9d20255", operator_subtask_index="0", pod_template_hash="5b58cdf557", quantile="0.99", source_id="cbc357ccb763df2852fee8c4fc7d55f2", tm_id=" "}
> 1
> 
> Could someone please explain what is the values reported here for 
> these metrics.
> 
> Thanks
> 
> Suchithra
> 



Broadcasting to multiple operators

2021-03-05 Thread Roger
Hello.
I am having an issue with a Flink 1.8 pipeline when trying to consume
broadcast state across multiple operators.  I currently
have a working pipeline that looks like the following:

records
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessGenerator(

Long.parseLong(properties.getProperty(("flinkMaxOutOfOrderness")
.keyBy(new ApplicationNameKeySelector())
.window(
TumblingEventTimeWindows.of(

Time.seconds(Long.parseLong(properties.getProperty("flinkWindow")
.aggregate(new Aggregator())
.connect(configurationBroadcastStream)
.process(excluder)
.addSink(KinesisProducer.createSinkFromStaticConfig(properties));

* records are a FlinkKafkaConsumer stream
* configurationBroadcastStream is a FlinkKafkaConsumer
* aggregator is an AggregateFunction
* filter is a BroadcastProcessFunction


I now have requirements to filter out transactions at the beginning of the
pipeline using the same broadcast stream I am consuming towards the end of
the pipeline. I updated the pipeline to look like this:

records
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessGenerator(

Long.parseLong(properties.getProperty(("flinkMaxOutOfOrderness")
.connect(configurationBroadcastStream) **new**
.process(filter) **new**
.keyBy(new ApplicationNameKeySelector())
.window(
TumblingEventTimeWindows.of(

Time.seconds(Long.parseLong(properties.getProperty("flinkWindow")
.aggregate(new Aggregator())
.connect(configurationBroadcastStream)
.process(excluder)
.addSink(KinesisProducer.createSinkFromStaticConfig(properties));

* records are a FlinkKafkaConsumer stream
* configurationBroadcastStream is a FlinkKafkaConsumer
* aggregator is an AggregateFunction
* excluder is a BroadcastProcessFunction

With this change, the aggregated records are not making it into the
excluder process.

1. The aggregator add is working. I can see this in the logs.
2. The aggregator getResult is never called. This makes me think this is a
window issue.
3. Both processBroadcastElement methods from the two broadcast functions
are working and
 retrieving the broadcasted state. I see this in logging.
4. The pipeline definitely worked prior to me adding in the second .connect
and .process at the beginning of the pipeline.
5. I have considered creating a new record object from the new
process(filter) that contains the config retrieved from the broadcast
stream along with the transactions and passing that down the pipeline but
that is really not desirable.

Any ideas on what might be going on here?

Thanks!
Roger


Error Starting PyFlink in Kubernetes Session Cluster "Could Not Get Rest Endpoint"

2021-03-05 Thread Robert Cullen
Trying to spin up a Python Flink instance in my Kubernetes cluster with
this configuration ...

sudo ./bin/flink run \
--target kubernetes-session \
-Dkubernetes.cluster-id=flink-python \
-Dkubernetes.namespace=cmdaa \
-Dkubernetes.container.image=cmdaa/pyflink:0.0.1 \
--pyModule word_count \
--pyFiles /opt/flink-1.12.2/examples/python/table/batch/word_count.py

 ... But getting this error:

Traceback (most recent call last):
  File "/Users/admin/.pyenv/versions/3.6.10/lib/python3.6/runpy.py",
line 193, in _run_module_as_main
"__main__", mod_spec)
  File "/Users/admin/.pyenv/versions/3.6.10/lib/python3.6/runpy.py",
line 85, in _run_code
exec(code, run_globals)
  File 
"/var/folders/zz/zyxvpxvq6csfxvn_n0/T/pyflink/074c581c-5b39-4b07-ac84-55a86c46f9eb/affe3559-5364-43f1-93ef-b7f6cd9a2f77/word_count.py",
line 80, in 
word_count()
  File 
"/var/folders/zz/zyxvpxvq6csfxvn_n0/T/pyflink/074c581c-5b39-4b07-ac84-55a86c46f9eb/affe3559-5364-43f1-93ef-b7f6cd9a2f77/word_count.py",
line 74, in word_count
t_env.execute("word_count")
  File 
"/opt/flink-1.12.2/opt/python/pyflink.zip/pyflink/table/table_environment.py",
line 1276, in execute
  File 
"/opt/flink-1.12.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
line 1286, in __call__
  File "/opt/flink-1.12.2/opt/python/pyflink.zip/pyflink/util/exceptions.py",
line 147, in deco
  File "/opt/flink-1.12.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.
: java.lang.RuntimeException:
org.apache.flink.client.deployment.ClusterRetrieveException: Could not
get the rest endpoint of flink-python
at 
org.apache.flink.kubernetes.KubernetesClusterDescriptor.lambda$createClusterClientProvider$0(KubernetesClusterDescriptor.java:102)
at 
org.apache.flink.kubernetes.KubernetesClusterDescriptor.retrieve(KubernetesClusterDescriptor.java:145)
at 
org.apache.flink.kubernetes.KubernetesClusterDescriptor.retrieve(KubernetesClusterDescriptor.java:66)
at 
org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:75)
at 
org.apache.flink.table.api.internal.BatchTableEnvImpl.executePipeline(BatchTableEnvImpl.scala:347)
at 
org.apache.flink.table.api.internal.BatchTableEnvImpl.execute(BatchTableEnvImpl.scala:317)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.client.deployment.ClusterRetrieveException:
Could not get the rest endpoint of flink-python
... 17 more

org.apache.flink.client.program.ProgramAbortException
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)

-- 
Robert Cullen
240-475-4490


Re: [ANNOUNCE] Apache Flink 1.12.2 released

2021-03-05 Thread Piotr Nowojski
Thanks Roman and Yuan for your work and driving the release process :)

On Fri, 5 Mar 2021 at 15:53, Till Rohrmann  wrote:

> Great work! Thanks a lot for being our release managers Roman and Yuan and
> to everyone who has made this release possible.
>
> Cheers,
> Till
>
> On Fri, Mar 5, 2021 at 10:43 AM Yuan Mei  wrote:
>
> > Cheers!
> >
> > Thanks, Roman, for doing the most time-consuming and difficult part of
> the
> > release!
> >
> > Best,
> >
> > Yuan
> >
> > On Fri, Mar 5, 2021 at 5:41 PM Roman Khachatryan 
> wrote:
> >
> > > The Apache Flink community is very happy to announce the release of
> > Apache
> > > Flink 1.12.2, which is the second bugfix release for the Apache Flink
> > 1.12
> > > series.
> > >
> > > Apache Flink® is an open-source stream processing framework for
> > > distributed, high-performing, always-available, and accurate data
> > streaming
> > > applications.
> > >
> > > The release is available for download at:
> > > https://flink.apache.org/downloads.html
> > >
> > > Please check out the release blog post for an overview of the
> > improvements
> > > for this bugfix release:
> > > https://flink.apache.org/news/2021/03/03/release-1.12.2.html
> > >
> > > The full release notes are available in Jira:
> > >
> > >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12349502=12315522
> > >
> > > We would like to thank all contributors of the Apache Flink community
> who
> > > made this release possible!
> > >
> > > Special thanks to Yuan Mei for managing the release and PMC members
> > Robert
> > > Metzger, Chesnay Schepler and Piotr Nowojski.
> > >
> > > Regards,
> > > Roman
> > >
> >
>


Re: Flink KafkaProducer flushing on savepoints

2021-03-05 Thread Piotr Nowojski
Yes, that might be an issue. As far as I remember, the universal connector
works with Kafka 0.10.x or higher.

Piotrek

On Fri, 5 Mar 2021 at 11:20, Witzany, Tomas  wrote:

> Hi,
> thanks for your answer. It seems like it will not be possible for me to
> upgrade to the newer universal Flink producer, because of an older Kafka
> version I am reading from. So unfortunately for now I will have to go with
> the hack.
> Thanks
> --
> *From:* Piotr Nowojski 
> *Sent:* 03 March 2021 21:10
> *To:* Witzany, Tomas 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Flink KafkaProducer flushing on savepoints
>
> Hi,
>
> What Flink version and which FlinkKafkaProducer version are you using?
> `FlinkKafkaProducerBase` is no longer used in the latest version. I would
> guess some older versions, and FlinkKafkaProducer010 or later (no longer
> supported).
>
> I would suggest either to use the universal FlinkKafkaProducer
> (universal), or FlinkKafkaProducer011 (if you are using a really old Flink
> version that doesn't have the universal Kafka connector). Both of those
> should work with any Kafka version and by looking at the code it seems to
> me like neither of those has the problem you mentioned. If you select
> `org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic#AT_LEAST_ONCE`
> and disable checkpointing it should be still flushing records on savepoints.
>
> > The only thing I can think about is have checkpoints enabled with some
> very high periodicity so that they are never(almost) triggered. But this is
> a hack.
>
> Yes, it would be a hack. But it would work.
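
A minimal sketch of that workaround at the environment level; the one-year
interval and the class name below are arbitrary, the point being any interval
long enough that periodic checkpoints effectively never fire:

import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RarelyCheckpointingJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Keep checkpointing formally enabled so the Kafka producer keeps its
        // flush-on-checkpoint behaviour (which also covers savepoints), but make
        // the interval so large that periodic checkpoints effectively never run.
        env.enableCheckpointing(TimeUnit.DAYS.toMillis(365));
        // ... build the rest of the pipeline here ...
        env.execute("rarely-checkpointing-job");
    }
}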
>
> Best,
> Piotrek
>
> On Tue, 2 Mar 2021 at 12:09, Witzany, Tomas  wrote:
>
> Hi,
> I have a question about the at-least-once guarantees for Kafka producers
> when checkpointing is disabled. In our data pipeline we have a Flink job on
> an unlimited stream that originally, we had checkpoints turned on. Further
> this job is cancelled with a savepoint once a day to do some data pre and
> post-processing for the next day, afterwards this job is restarted from the
> savepoint.
>
> The issue we have is that we want to turn off checkpointing, since it
> does not give us much value and only creates extra IO. When this is done this
> message
> 
> shows up:
> "Flushing on checkpoint is enabled, but checkpointing is not enabled.
> Disabling flushing."
> This prompted us to investigate, and it seems that if you have
> checkpointing disabled, there are no at-least-once guarantees.
> 
>
> What about if you have no checkpointing, but you make savepoints that you
> restore from yourself? Savepoints are the same thing as checkpoints in the
> code. The flink producer makes it impossible to turn on flushing and have
> checkpointing disabled. I can see why this is the case as there is some
> extra synchronization overhead related to the flushing flag being on. Is
> there a way to have checkpointing disabled and have at least once
> guarantees on savepoints?
>
> The only thing I can think about is have checkpoints enabled with some
> very high periodicity so that they are never(almost) triggered. But this is
> a hack.
>
> Tomas Witzany
>
>
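
For readers who end up needing the workaround above, here is a minimal Java sketch of the "high checkpoint periodicity" hack: checkpointing is enabled with an interval so large that it effectively never fires, which keeps flush-on-checkpoint active so pending records are also flushed when a savepoint is taken. The topic, broker address and the choice of the 0.10 producer are assumptions for illustration, not details from this thread.

```java
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;

public class SavepointFlushHack {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Interval of ~1 year: checkpoints are effectively never triggered,
        // but the producer keeps its flush-on-checkpoint behaviour, which also
        // covers savepoints.
        env.enableCheckpointing(TimeUnit.DAYS.toMillis(365));

        DataStream<String> stream = env.socketTextStream("localhost", 9999); // placeholder source

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder

        FlinkKafkaProducer010<String> producer =
                new FlinkKafkaProducer010<>("output-topic", new SimpleStringSchema(), props);
        producer.setFlushOnCheckpoint(true); // at-least-once: flush pending records on checkpoints/savepoints

        stream.addSink(producer);
        env.execute("savepoint-flush-hack");
    }
}
```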


Re: Re: Independence of task parallelism

2021-03-05 Thread Piotr Nowojski
Yes, it might be the case. Hard to tell for sure without looking at the
job, metrics etc. Just be mindful of what I described, and if you want to
fine tune a job and set different parallelism values for different
operators, pay attention to where those operators are being distributed.
In practice there is usually little reason to choose (1-2-1-1-1) over
(2-2-2-2-2). Spreading the load of some operators more widely than you
strictly need is usually not an issue, and with (2-2-2-2-2) you spread the
load more evenly across task managers, which makes the job easier to
tune/analyse/optimise.

Best,
Piotrek

On Fri, 5 Mar 2021 at 09:10, Jan Nitschke wrote:

> Hey Piotr,
>
> thanks for your answer, that makes perfect sense. However, when looking at
> the number of messages being processed, we can see that both subtasks on
> task 2 will produce the same amount of messages in the (1-2-1-1-1)
> scenario, even with the first task hitting backpressure. We assume that
> this has to do with the distribution of messages between task. As messages
> are being distributed equally among subtasks in our case, would this be an
> explanation for that behavior?
>
> Best,
> Jan
>
>
> *Sent:* Wednesday, 03 March 2021, 19:53
> *From:* "Piotr Nowojski" 
> *To:* "Jan Nitschke" 
> *Cc:* "user" 
> *Subject:* Re: Independence of task parallelism
> Hi Jan,
>
> As far as I remember, Flink doesn't handle very well cases like
> (1-2-1-1-1) and two Task Managers. There are no guarantees how the
> operators/subtasks are going to be scheduled, but most likely it will be as
> you mentioned/observed. First task manager will be handling all of the
> operators, while the second task manager will only be running a single
> instance of the second operator (for load balancing reasons it would be
> better to spread the tasks across those two Task Managers more evenly).
>
> No, Flink doesn't hold any resources (appart of network buffers) per task.
> All of the available memory and CPU resources are shared across all of the
> running tasks. So in the (1-2-1-1-1) case, if the first task manager will
> be overloaded (for example if it has very few CPU cores), the second task
> will perform much better on the second task manager (which will be empty),
> causing a throughput skew. From this perspective, (2-2-2-2-2) would most
> likely be performing better, as the load would be more evenly spread.
>
> Piotrek
>
> On Sun, 28 Feb 2021 at 13:10, Jan Nitschke wrote:
>
>> Hello,
>>
>> We are working on a project where we want to gather information about the
>> job performance across different task level parallelism settings.
>> Essentially, we want to see how the throughput of a single task varies
>> across different parallelism settings, e.g. for a job of 5 tasks: 1-1-1-1-1
>> vs. 1-2-1-1-1 vs. 2-2-2-2-2.
>>
>> *We are running flink on Kubernetes, a job with 5 tasks, slot sharing is
>> enabled, operator chaining is disabled and each task manager has one slot.*
>>
>> So, the number of task managers is always the number of the highest
>> parallelism and we can fit the entire job into one task manager slot.
>>
>> We are then running the job against multiple parallelism configs (such as
>> those above), collect the relevant metrics and try to get some useful
>> information out of them.
>>
>> We are now wondering how independent our results are from one another.
>> More specifically, if we now look at the parallelism of the second task, is
>> its performance independent of the parallelism of the other tasks? So, will
>> the second task perform the same in (1-2-1-1-1) as in (2-2-2-2-2)?
>>
>> Our take on it is the following: With our setup, (1-2-1-1-1) should
>> result in one task manager holding the entire job and a second task manager
>> that only runs the second task. (2-2-2-2-2) will run two task managers with
>> the entire job. So, theoretically, the second task should have much more
>> resources available in the first setup as it has the entire resources of
>> that task manager to its disposal. Does that assumption hold or will flink
>> assign a certain amount of resources to a task in a task manager no matter
>> how many other tasks are running on that same task manager slot?
>>
>> We would highly appreciate any help.
>>
>> Best,
>> Jan
>>
>
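
To make the parallelism setups discussed in this thread concrete, here is a small, purely illustrative DataStream sketch of a (1-2-1-1-1) configuration with chaining disabled, so each operator becomes its own task; the operators themselves are placeholders.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismPerTask {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Disable chaining so every operator is scheduled as its own task.
        env.disableOperatorChaining();

        env.socketTextStream("localhost", 9999)                  // task 1
                .setParallelism(1)
                .map(s -> s.toUpperCase()).returns(Types.STRING)
                .setParallelism(2)                               // task 2 (parallelism 2)
                .filter(s -> !s.isEmpty())
                .setParallelism(1)                               // task 3
                .map(s -> s + "!").returns(Types.STRING)
                .setParallelism(1)                               // task 4
                .print()
                .setParallelism(1);                              // task 5 (sink)

        env.execute("parallelism-1-2-1-1-1");
    }
}
```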


Re: LocalWatermarkAssigner causes predicate pushdown to be skipped

2021-03-05 Thread Timo Walther

Hi Yuval,

sorry that nobody replied earlier. Somehow your email fell through the 
cracks.


If I understand you correctly, you would like to implement a table 
source that implements both `SupportsWatermarkPushDown` and 
`SupportsFilterPushDown`?


The current behavior might be on purpose. Filters and Watermarks are not 
very compatible. Filtering would also mean that records (from which 
watermarks could be generated) are skipped. If the filter is very 
strict, we would not generate any new watermarks and the pipeline would 
stop making progress in time.


Watermark push down is only necessary, if per-partition watermarks are 
required. Otherwise the watermarks are generated in a subsequent 
operator after the source. So you can still use rowtime without 
implementing `SupportsWatermarkPushDown` in your custom source.


I will loop in Shengkai, who worked on this topic recently.

Regards,
Timo
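
For readers less familiar with the planner abilities being discussed: below is a rough, hypothetical Java sketch of a custom scan source that implements both `SupportsFilterPushDown` and `SupportsWatermarkPushDown`. The actual reader and filter handling are omitted and the class name is made up, so treat it only as an outline of the interfaces, not as a statement about how the planner rules will interact.

```java
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;

/** Hypothetical source combining filter and watermark push-down (reader omitted). */
public class MyScanSource
        implements ScanTableSource, SupportsFilterPushDown, SupportsWatermarkPushDown {

    private List<ResolvedExpression> pushedFilters = Collections.emptyList();
    private WatermarkStrategy<RowData> watermarkStrategy;

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        // A real implementation would return e.g. a SourceFunctionProvider whose
        // reader applies 'pushedFilters' and emits watermarks via 'watermarkStrategy'.
        throw new UnsupportedOperationException("reader omitted in this sketch");
    }

    @Override
    public Result applyFilters(List<ResolvedExpression> filters) {
        this.pushedFilters = filters;
        // Accept nothing here; a real source would split the filters into
        // accepted and remaining expressions.
        return Result.of(Collections.emptyList(), filters);
    }

    @Override
    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
        this.watermarkStrategy = watermarkStrategy;
    }

    @Override
    public DynamicTableSource copy() {
        return new MyScanSource(); // state copy omitted in this sketch
    }

    @Override
    public String asSummaryString() {
        return "MyScanSource";
    }
}
```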


On 04.03.21 18:52, Yuval Itzchakov wrote:
Bumping this up again, would appreciate any help if anyone is familiar 
with the blink planner.


Thanks,
Yuval.

On Fri, Feb 26, 2021, 18:53 Yuval Itzchakov wrote:


Hi Jark,
Would appreciate your help with this.

On Wed, Feb 24, 2021 at 12:09 PM Roman Khachatryan <ro...@apache.org> wrote:

Hi Yuval,

I'm not familiar with the Blink planner but probably Jark can help.

Regards,
Roman


On Sun, Feb 21, 2021 at 6:52 PM Yuval Itzchakov <yuva...@gmail.com> wrote:

Update: When I don't set the watermark explicitly on the
TableSchema, `applyWatermarkStrategy` never gets called on
my ScanTableSource, which does make sense. But now the
question is what should be done? This feels a bit unintuitive.

On Sun, Feb 21, 2021 at 7:09 PM Yuval Itzchakov <yuva...@gmail.com> wrote:

Hi,
Flink 1.12.1, Blink Planner, Scala 2.12

I have the following logical plan:

  LogicalSink(table=[default_catalog.default_database.table], 
fields=[bar, baz, hello_world, a, b])
+- LogicalProject(value=[$2],
bar=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)],
baz=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)],
hello_world=[null:VARCHAR(2147483647) CHARACTER SET
"UTF-16LE"], a=[null:VARCHAR(2147483647) CHARACTER SET
"UTF-16LE"], b=[EMPTY_MAP()])
    +- LogicalFilter(condition=[AND(=($4,
_UTF-16LE'bar'), =($34, _UTF-16LE'baz'))])
       +- LogicalWatermarkAssigner(rowtime=[bar],
watermark=[$0])
          +- LogicalTableScan(table=[[default_catalog,
default_database, foo]])

I have a custom source which creates a TableSchema based
on an external table. When I create the schema, I push
the watermark definition to the schema:

image.png

When the HepPlanner starts the optimization phase and
reaches the "PushFilterInotTableSourceScanRule", it
matches on the LogicalFilter in the definition. But
then, since the RelOptRuleOperandChildPolicy is set to
"SOME", it attempts to do a full match on the child
nodes. Since the rule is defined as so:

image.png

The child filter fails since the immediate child of the
filter is a "LocalWatermarkAssigner", and not the
"LogicalTableScan" which is the grandchild:

image.png

Is this the desired behavior? Should I create the
TableSchema without the row time attribute and use
"SupportsWatermarkPushdown" to generate the watermark
dynamically from the source record?

-- 
Best Regards,

Yuval Itzchakov.



-- 
Best Regards,

Yuval Itzchakov.



-- 
Best Regards,

Yuval Itzchakov.





New settings are not honored unless checkpoint is cleared.

2021-03-05 Thread Yordan Pavlov
Hello there,
I am running Flink 1.11.3 on Kubernetes deployment. If I change a
setting and re-deploy my Flink setup, the new setting is correctly
applied in the config file but is not being honored by Flink. In other
words, I can ssh into the pod and check the config file - it has the
new setting as I would expect. However the web interface for the job
keeps showing the old configuration and Flink as a whole keeps running
with the old setting. The way to have the new setting considered is to
clear the checkpoint for the job stored in Zookeeper. Then I recover
the job using:

--fromSavepoint path_to_savepoint_or_checkpoint

My presumption is that the job configuration is stored in Zookeeper
along with other Flink data. Could someone shed some light on what I
am observing?

Thank you!


Re: Convert BIGINT to TIMESTAMP in pyflink when using datastream api

2021-03-05 Thread Timo Walther

Hi Shilpa,

Shuiqiang is right. Currently, we recommend using SQL DDL until the 
connect API is updated. See here:


https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/create/#create-table

Especially the WATERMARK section shows how to declare a rowtime attribute.

Regards,
Timo
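
As a concrete illustration of that recommendation (shown with the Java Table API; the same DDL string can be passed to PyFlink's `execute_sql`), here is a hedged sketch that turns an epoch-millisecond BIGINT into a TIMESTAMP(3) computed column and declares the watermark on it. The table name, topic and connector options are placeholders, not values from the original question.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RowtimeFromEpochMillis {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // BIGINT epoch milliseconds -> TIMESTAMP(3) computed column + watermark.
        tEnv.executeSql(
                "CREATE TABLE source_topic (\n"
                        + "  customer_id STRING,\n"
                        + "  time_in_epoch_milliseconds BIGINT,\n"
                        + "  rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(time_in_epoch_milliseconds / 1000)),\n"
                        + "  WATERMARK FOR rowtime AS rowtime - INTERVAL '10' SECOND\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'topic1',\n"
                        + "  'properties.bootstrap.servers' = 'broker:9092',\n"
                        + "  'properties.group.id' = 'topic_consumer',\n"
                        + "  'scan.startup.mode' = 'earliest-offset',\n"
                        + "  'format' = 'json'\n"
                        + ")");

        // 'rowtime' is now an event-time attribute and can be used for windowing,
        // e.g. GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE).
    }
}
```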


On 05.03.21 08:47, Shuiqiang Chen wrote:

Hi Shilpa,

There might be something wrong when defining the rowtime field with the 
Connector descriptor; it’s recommended to use SQL DDL to create tables 
and to run queries with the Table API.


Best,
Shuiqiang

Shilpa Shankar <sshan...@bandwidth.com> wrote on Thu, Mar 4, 2021 at 9:29 PM:


Hello,

We are using pyflink's datastream api v1.12.1 to consume from kafka
and want to use one of the fields to act as the "rowtime" for windowing.
We realize we need to convert BIGINT to TIMESTAMP before we use it
as "rowtime".

py4j.protocol.Py4JJavaError: An error occurred while calling o91.select.
: org.apache.flink.table.api.ValidationException: A group window
expects a time attribute for grouping in a stream environment.

But we are not sure where and how that needs to be implemented.
Some help here would be really appreciated.

Thanks,
Shilpa

import os
from pyflink.table.expressions import lit, Expression
from pyflink.datastream import StreamExecutionEnvironment,
TimeCharacteristic
from pyflink.datastream import CheckpointingMode,
ExternalizedCheckpointCleanup
from pyflink.table import StreamTableEnvironment, DataTypes,
EnvironmentSettings, CsvTableSink, TableConfig
from pyflink.table.descriptors import Schema, Rowtime, Json, Kafka
from pyflink.table.window import Slide

def main():
     env = StreamExecutionEnvironment.get_execution_environment()
     env.set_parallelism(1)
     env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

     env.enable_checkpointing(6, CheckpointingMode.EXACTLY_ONCE)
     config = env.get_checkpoint_config()

config.enable_externalized_checkpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)


     st_env = StreamTableEnvironment.create(
         env,

environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()

     )

     register_kafka_source(st_env)
     register_transactions_sink_into_csv(st_env)

     #Filter
     st_env.from_path("source") \

.window(Slide.over(lit(2).minutes).every(lit(1).minutes).on("rowtime").alias("w"))

\
         .group_by("customer_id, w") \
         .select("""customer_id as customer_id,
                  count(*) as total_counts,
                  w.start as start_time,
                  w.end as end_time
                  """) \
         .insert_into("sink_into_csv")

def register_kafka_source(st_env):
     # Add Source
     st_env.connect(
         Kafka() \
             .version("universal") \
             .topic("topic1") \
             .property("group.id ", "topic_consumer") \
             .property("security.protocol", "SASL_PLAINTEXT") \
             .property("sasl.mechanism", "PLAIN") \
             .property("bootstrap.servers", "") \
             .property("sasl.jaas.config", "") \
             .start_from_earliest()
     ).with_format(
         Json()
             .fail_on_missing_field(False)
             .schema(
             DataTypes.ROW([
                 DataTypes.FIELD("customer_id", DataTypes.STRING()),
                 DataTypes.FIELD("time_in_epoch_milliseconds",
DataTypes.BIGINT())
             ])
         )
     ).with_schema(
         Schema()
             .field("customer_id", DataTypes.STRING())
             .field("rowtime", DataTypes.BIGINT())
             .rowtime(
             Rowtime()
                 .timestamps_from_field("time_in_epoch_milliseconds")
                 .watermarks_periodic_bounded(10)
         )
     ).in_append_mode(
     ).create_temporary_table(
         "source"
     )


def register_transactions_sink_into_csv(env):
     result_file = "/opt/examples/data/output/output_file.csv"
     if os.path.exists(result_file):
         os.remove(result_file)
     env.register_table_sink("sink_into_csv",
                             CsvTableSink(["customer_id",
                                           "total_count",
                                           "start_time",
                                           "end_time"],
                                          [DataTypes.STRING(),
                                           DataTypes.DOUBLE(),
                                           DataTypes.TIMESTAMP(3),
                                           DataTypes.TIMESTAMP(3)],
  

Re: Need information on latency metrics

2021-03-05 Thread Timo Walther

Hi Suchithra,

did you see this section in the docs?

https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html#latency-tracking

Regards,
Timo
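
For reference, the settings described in that section live in flink-conf.yaml and look roughly like this; the values below are only illustrative, and the granularity line is an optional extra knob, not something taken from this thread:

    metrics.latency.interval: 60000        # emit latency markers every 60 s (value is in milliseconds)
    metrics.latency.granularity: operator  # one of: single, operator, subtask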

On 05.03.21 15:31, V N, Suchithra (Nokia - IN/Bangalore) wrote:

Hi,

I am using flink 1.12.1 version and trying to explore latency metrics 
with Prometheus. I have enabled latency metrics by adding 
“metrics.latency.interval: 1” in flink-conf.yaml.


I have submitted a flink streaming job which has 
Source->flatmap->process->sink which is chained into single task. And I 
can see below latency metrics in Prometheus.


flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency_count

flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency

Prometheus output:

flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency_count

flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency_count{app="met-flink-taskmanager",
host="", instance=" ", job="kubernetes-pods-insecure",
job_id="3ad0b4c814836aea92c48f6baf44b8bb", job_name=" ",
kubernetes_pod_name="met-flink-taskmanager-5b58cdf557-l24tc",
namespace="", operator_id="3d05135cf7d8f1375d8f655ba9d20255",
operator_subtask_index="0", pod_template_hash="5b58cdf557",
source_id="cbc357ccb763df2852fee8c4fc7d55f2", tm_id=" "}
27804583

flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency_count{app="met-flink-taskmanager",
host="", instance=" ", job="kubernetes-pods-insecure",
job_id="3ad0b4c814836aea92c48f6baf44b8bb", job_name=" ",
kubernetes_pod_name="met-flink-taskmanager-5b58cdf557-l24tc",
namespace=" ", operator_id="570f707193e0fe32f4d86d067aba243b",
operator_subtask_index="0", pod_template_hash="5b58cdf557",
source_id="cbc357ccb763df2852fee8c4fc7d55f2", tm_id=" "}
27804583

flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency_count{app="met-flink-taskmanager",
host="", instance=" ", job="kubernetes-pods-insecure",
job_id="3ad0b4c814836aea92c48f6baf44b8bb", job_name=" ",
kubernetes_pod_name="met-flink-taskmanager-5b58cdf557-l24tc",
namespace=" ", operator_id="ba40499bacce995f15693b1735928377",
operator_subtask_index="0", pod_template_hash="5b58cdf557",
source_id="cbc357ccb763df2852fee8c4fc7d55f2", tm_id=" "}
27804583

flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency

flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency{app="met-flink-taskmanager",
host="", instance=" ", job="kubernetes-pods-insecure",
job_id="3ad0b4c814836aea92c48f6baf44b8bb", job_name=" ",
kubernetes_pod_name="met-flink-taskmanager-5b58cdf557-l24tc",
namespace=" ", operator_id="3d05135cf7d8f1375d8f655ba9d20255",
operator_subtask_index="0", pod_template_hash="5b58cdf557",
quantile="0.95", source_id="cbc357ccb763df2852fee8c4fc7d55f2", tm_id=" "}
0

flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency{app="met-flink-taskmanager",
host="", instance=" ", job="kubernetes-pods-insecure",
job_id="3ad0b4c814836aea92c48f6baf44b8bb", job_name=" ",
kubernetes_pod_name="met-flink-taskmanager-5b58cdf557-l24tc",
namespace=" ", operator_id="3d05135cf7d8f1375d8f655ba9d20255",
operator_subtask_index="0", pod_template_hash="5b58cdf557",
quantile="0.98", source_id="cbc357ccb763df2852fee8c4fc7d55f2", tm_id=" "}
0.4217

flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency{app="met-flink-taskmanager",
host="", instance=" ", job="kubernetes-pods-insecure",
job_id="3ad0b4c814836aea92c48f6baf44b8bb", job_name=" ",
kubernetes_pod_name="met-flink-taskmanager-5b58cdf557-l24tc",
namespace=" ", operator_id="3d05135cf7d8f1375d8f655ba9d20255",
operator_subtask_index="0", pod_template_hash="5b58cdf557",
quantile="0.99", source_id="cbc357ccb763df2852fee8c4fc7d55f2", tm_id=" "}
1

Could someone please explain what the values reported here for these 
metrics mean?


Thanks

Suchithra





Re: How to emit after a merge?

2021-03-05 Thread Timo Walther
I don't know what the resulting plan for your query looks like. You can 
print it via `env.sqlQuery().explain()`. But I could imagine that by 
simplifying the query you would also reduce the number of retraction 
messages/operators in the pipeline.


Regards,
Timo
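
For completeness, a rough sketch of the kind of UDF discussed here: a single aggregate function that keeps the two most recent timestamps per key, so the Top-2 plus LISTAGG combination collapses into one aggregation. The class and function names are made up, and the output format (a comma-separated string) is just one possible choice.

```java
import org.apache.flink.table.functions.AggregateFunction;

/** Hypothetical UDF keeping the two most recent timestamps per group. */
public class Last2Timestamps extends AggregateFunction<String, Last2Timestamps.Acc> {

    /** Accumulator holding the latest and second-latest timestamps seen so far. */
    public static class Acc {
        public Long latest;
        public Long secondLatest;
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    // Invoked by the runtime for every input row of the group.
    public void accumulate(Acc acc, Long ts) {
        if (ts == null) {
            return;
        }
        if (acc.latest == null || ts >= acc.latest) {
            acc.secondLatest = acc.latest;
            acc.latest = ts;
        } else if (acc.secondLatest == null || ts > acc.secondLatest) {
            acc.secondLatest = ts;
        }
    }

    @Override
    public String getValue(Acc acc) {
        if (acc.latest == null) {
            return null;
        }
        return acc.secondLatest == null
                ? String.valueOf(acc.latest)
                : acc.latest + "," + acc.secondLatest;
    }
}
```

Registered via tableEnv.createTemporarySystemFunction("LAST2_TS", Last2Timestamps.class), the query could then become a plain SELECT accountId, LAST2_TS(`timestamp`) FROM transactions GROUP BY accountId, which removes the intermediate Top-N step and with it one source of retraction messages.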


On 05.03.21 13:28, Yik San Chan wrote:

Hi Timo,

If I understand correctly, the UDF only simplifies the query, but does not 
do anything functionally different. Please correct me if I am wrong, 
thank you!


Best,
Yik San

On Thu, Mar 4, 2021 at 8:34 PM Timo Walther > wrote:


Yes, implementing a UDF might be the most convenient option for some
use
cases. The accumulator of such a UDF could take the two timestamps and
perform the two aggregations at once.

The upsert-kafka connector can apply the updates to the Kafka log. If
you enable log compaction in Kafka, Kafka will clean up the log and
make
sure to only keep the most recent one.

Regards,
Timo

On 04.03.21 11:59, Yik San Chan wrote:
 > Hi Timo,
 >
 > Thanks for the reply!
 >
 >  > You could filter the deletions manually in DataStream API
before writing
 > them to Kafka.
 >
 > Yah I agree this helps the issue, though I will need to mix up
SQL and
 > DataStream API.
 >
 >  > To simplify the query you could also investigate to implement
your own
 > aggregate function and combine the Top 2 and ListAgg into one
operation.
 >
 > Do you mean implement an UDF to do so?
 >
 > Besides, is 'upsert-kafka' connector designed for this use case?
 >
 > Thank you.
 >
 > On Thu, Mar 4, 2021 at 4:41 PM Timo Walther mailto:twal...@apache.org>
 > >> wrote:
 >
 >     Hi Yik,
 >
 >     if I understand you correctly you would like to avoid the
deletions in
 >     your stream?
 >
 >     You could filter the deletions manually in DataStream API before
 >     writing
 >     them to Kafka. Semantically the deletions are required to
produce a
 >     correct result because the runtime is not aware of a key for
idempotent
 >     updates.
 >
 >     To simplify the query you could also investigate to implement
your own
 >     aggregate function and combine the Top 2 and ListAgg into one
operation.
 >
 >     Regards,
 >     Timo
 >
 >     On 28.02.21 09:55, Yik San Chan wrote:
 >      > I define a `Transaction` class:
 >      >
 >      > ```scala
 >      > case class Transaction(accountId: Long, amount: Long,
timestamp:
 >     Long)
 >      > ```
 >      >
 >      > The `TransactionSource` simply emits `Transaction` with
some time
 >      > interval. Now I want to compute the last 2 transaction
timestamp
 >     of each
 >      > account id, see code below:
 >      >
 >      > ```scala
 >      > import org.apache.flink.streaming.api.scala.{DataStream,
 >      > StreamExecutionEnvironment, _}
 >      > import org.apache.flink.table.api.EnvironmentSettings
 >      > import
org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
 >      > import org.apache.flink.walkthrough.common.entity.Transaction
 >      > import
org.apache.flink.walkthrough.common.source.TransactionSource
 >      >
 >      > object LastNJob {
 >      >
 >      >    final val QUERY =
 >      >      """
 >      >        |WITH last_n AS (
 >      >        |    SELECT accountId, `timestamp`
 >      >        |    FROM (
 >      >        |        SELECT *,
 >      >        |            ROW_NUMBER() OVER (PARTITION BY accountId
 >     ORDER BY
 >      > `timestamp` DESC) AS row_num
 >      >        |        FROM transactions
 >      >        |    )
 >      >        |    WHERE row_num <= 2
 >      >        |)
 >      >        |SELECT accountId, LISTAGG(CAST(`timestamp` AS STRING))
 >      > last2_timestamp
 >      >        |FROM last_n
 >      >        |GROUP BY accountId
 >      >        |""".stripMargin
 >      >
 >      >    def main(args: Array[String]): Unit = {
 >      >      val settings: EnvironmentSettings =
 >      >
 >   
  EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()

 >      >      val streamEnv: StreamExecutionEnvironment =
 >      > StreamExecutionEnvironment.getExecutionEnvironment
 >      >      val tableEnv: StreamTableEnvironment =
 >      > StreamTableEnvironment.create(streamEnv, settings)
 >      >
 >      >      val txnStream: DataStream[Transaction] = streamEnv
 >      >        .addSource(new TransactionSource)
 >      >        .name("transactions")
 >      >
 >      >      tableEnv.createTemporaryView("transactions", txnStream)
 >      >

Re: [DISCUSSION] Introduce a separated memory pool for the TM merge shuffle

2021-03-05 Thread Till Rohrmann
Thanks for this proposal Guowei. +1 for it.

Concerning the default size, maybe we can run some experiments and see how
the system behaves with different pool sizes.

Cheers,
Till

On Fri, Mar 5, 2021 at 2:45 PM Stephan Ewen  wrote:

> Thanks Guowei, for the proposal.
>
> As discussed offline already, I think this sounds good.
>
> One thought is that 16m sounds very small for a default read buffer pool.
> How risky do you think it is to increase this to 32m or 64m?
>
> Best,
> Stephan
>
> On Fri, Mar 5, 2021 at 4:33 AM Guowei Ma  wrote:
>
>> Hi, all
>>
>>
>> In the Flink 1.12 we introduce the TM merge shuffle. But the
>> out-of-the-box experience of using TM merge shuffle is not very good. The
>> main reason is that the default configuration always makes users encounter
>> OOM [1]. So we hope to introduce a managed memory pool for TM merge shuffle
>> to avoid the problem.
>> Goals
>>
>>1. Don't affect the streaming and pipelined-shuffle-only batch setups.
>>2. Don't mix memory with different life cycle in the same pool. E.g.,
>>write buffers needed by running tasks and read buffer needed even after
>>tasks being finished.
>>3. User can use the TM merge shuffle with default memory
>>configurations. (May need further tunings for performance optimization, 
>> but
>>should not fail with the default configurations.)
>>
>> Proposal
>>
>>1. Introduce a configuration `taskmanager.memory.network.batch-read`
>>to specify the size of this memory pool. The default value is 16m.
>>2. Allocate the pool lazily. It means that the memory pool would be
>>allocated when the TM merge shuffle is used at the first time.
>>    3. This pool size will not be added to the TM's total memory size,
>>but will be considered part of
>>`taskmanager.memory.framework.off-heap.size`. We need to check that the
>>pool size is not larger than the framework off-heap size, if TM merge
>>shuffle is enabled.
>>
>>
>> In this default configuration, the allocation of the memory pool is
>> almost impossible to fail. Currently the default framework’s off-heap
>> memory is 128m, which is mainly used by Netty. But after we introduced zero
>> copy, the usage of it has been reduced, and you can refer to the detailed
>> data [2].
>> Known Limitation
>> Usability for increasing the memory pool size
>>
>> In addition to increasing `taskmanager.memory.network.batch-read`, the
>> user may also need to adjust `taskmanager.memory.framework.off-heap.size`
>> at the same time. It also means that once the user forgets this, it is
>> likely to fail the check when allocating the memory pool.
>>
>>
>> So in the following two situations, we will still prompt the user to
>> increase the size of `framework.off-heap.size`.
>>
>>1. `taskmanager.memory.network.batch-read` is bigger than
>>`taskmanager.memory.framework.off-heap.size`
>>2. Allocating the pool encounters the OOM.
>>
>>
>> An alternative is that when the user adjusts the size of the memory pool,
>> the system automatically adjusts it. But we are not entirely sure about
>> this, given its implicit nature and the risk of complicating the memory configurations.
>> Potential memory waste
>>
>> In the first step, the memory pool will not be released once allocated. This
>> means in the first step, even if there is no subsequent batch job, the
>> pooled memory cannot be used by other consumers.
>>
>>
>> We are not releasing the pool in the first step due to the concern that
>> frequently allocating/deallocating the entire pool may increase the GC
>> pressue. Investitations on how to dynamically release the pool when it's no
>> longer needed is considered a future follow-up.
>>
>>
>> Looking forward to your feedback.
>>
>>
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-20740
>>
>> [2] https://github.com/apache/flink/pull/7368.
>> Best,
>> Guowei
>>
>


Re: [ANNOUNCE] Apache Flink 1.12.2 released

2021-03-05 Thread Till Rohrmann
Great work! Thanks a lot for being our release managers Roman and Yuan and
to everyone who has made this release possible.

Cheers,
Till

On Fri, Mar 5, 2021 at 10:43 AM Yuan Mei  wrote:

> Cheers!
>
> Thanks, Roman, for doing the most time-consuming and difficult part of the
> release!
>
> Best,
>
> Yuan
>
> On Fri, Mar 5, 2021 at 5:41 PM Roman Khachatryan  wrote:
>
> > The Apache Flink community is very happy to announce the release of
> Apache
> > Flink 1.12.2, which is the second bugfix release for the Apache Flink
> 1.12
> > series.
> >
> > Apache Flink® is an open-source stream processing framework for
> > distributed, high-performing, always-available, and accurate data
> streaming
> > applications.
> >
> > The release is available for download at:
> > https://flink.apache.org/downloads.html
> >
> > Please check out the release blog post for an overview of the
> improvements
> > for this bugfix release:
> > https://flink.apache.org/news/2021/03/03/release-1.12.2.html
> >
> > The full release notes are available in Jira:
> >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12349502&projectId=12315522
> >
> > We would like to thank all contributors of the Apache Flink community who
> > made this release possible!
> >
> > Special thanks to Yuan Mei for managing the release and PMC members
> Robert
> > Metzger, Chesnay Schepler and Piotr Nowojski.
> >
> > Regards,
> > Roman
> >
>


Need information on latency metrics

2021-03-05 Thread V N, Suchithra (Nokia - IN/Bangalore)
Hi,

I am using flink 1.12.1 version and trying to explore latency metrics with 
Prometheus. I have enabled latency metrics by adding "metrics.latency.interval: 
1" in flink-conf.yaml.
I have submitted a flink streaming job which has Source->flatmap->process->sink 
which is chained into single task. And I can see below latency metrics in 
Prometheus.

flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency_count
flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency

Prometheus output :
flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency_count

flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency_count{app="met-flink-taskmanager",
 host="", instance=" ", job="kubernetes-pods-insecure", 
job_id="3ad0b4c814836aea92c48f6baf44b8bb", job_name=" ", 
kubernetes_pod_name="met-flink-taskmanager-5b58cdf557-l24tc", namespace="", 
operator_id="3d05135cf7d8f1375d8f655ba9d20255", operator_subtask_index="0", 
pod_template_hash="5b58cdf557", source_id="cbc357ccb763df2852fee8c4fc7d55f2", 
tm_id=" "}
27804583
flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency_count{app="met-flink-taskmanager",
 host="", instance=" ", job="kubernetes-pods-insecure", 
job_id="3ad0b4c814836aea92c48f6baf44b8bb", job_name=" ", 
kubernetes_pod_name="met-flink-taskmanager-5b58cdf557-l24tc", namespace=" ", 
operator_id="570f707193e0fe32f4d86d067aba243b", operator_subtask_index="0", 
pod_template_hash="5b58cdf557", source_id="cbc357ccb763df2852fee8c4fc7d55f2", 
tm_id=" "}
27804583
flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency_count{app="met-flink-taskmanager",
 host="", instance=" ", job="kubernetes-pods-insecure", 
job_id="3ad0b4c814836aea92c48f6baf44b8bb", job_name=" ", 
kubernetes_pod_name="met-flink-taskmanager-5b58cdf557-l24tc", namespace=" ", 
operator_id="ba40499bacce995f15693b1735928377", operator_subtask_index="0", 
pod_template_hash="5b58cdf557", source_id="cbc357ccb763df2852fee8c4fc7d55f2", 
tm_id=" "}
27804583

flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency

flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency{app="met-flink-taskmanager",
 host="", instance=" ", job="kubernetes-pods-insecure", 
job_id="3ad0b4c814836aea92c48f6baf44b8bb", job_name=" ", 
kubernetes_pod_name="met-flink-taskmanager-5b58cdf557-l24tc", namespace=" ", 
operator_id="3d05135cf7d8f1375d8f655ba9d20255", operator_subtask_index="0", 
pod_template_hash="5b58cdf557", quantile="0.95", 
source_id="cbc357ccb763df2852fee8c4fc7d55f2", tm_id=" "}
0
flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency{app="met-flink-taskmanager",
 host="", instance=" ", job="kubernetes-pods-insecure", 
job_id="3ad0b4c814836aea92c48f6baf44b8bb", job_name=" ", 
kubernetes_pod_name="met-flink-taskmanager-5b58cdf557-l24tc", namespace=" ", 
operator_id="3d05135cf7d8f1375d8f655ba9d20255", operator_subtask_index="0", 
pod_template_hash="5b58cdf557", quantile="0.98", 
source_id="cbc357ccb763df2852fee8c4fc7d55f2", tm_id=" "}
0.4217
flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency{app="met-flink-taskmanager",
 host="", instance=" ", job="kubernetes-pods-insecure", 
job_id="3ad0b4c814836aea92c48f6baf44b8bb", job_name=" ", 
kubernetes_pod_name="met-flink-taskmanager-5b58cdf557-l24tc", namespace=" ", 
operator_id="3d05135cf7d8f1375d8f655ba9d20255", operator_subtask_index="0", 
pod_template_hash="5b58cdf557", quantile="0.99", 
source_id="cbc357ccb763df2852fee8c4fc7d55f2", tm_id=" "}
1

Could someone please explain what the values reported here for these metrics mean?
Thanks
Suchithra



Re: [DISCUSSION] Introduce a separated memory pool for the TM merge shuffle

2021-03-05 Thread Stephan Ewen
Thanks Guowei, for the proposal.

As discussed offline already, I think this sounds good.

One thought is that 16m sounds very small for a default read buffer pool.
How risky do you think it is to increase this to 32m or 64m?

Best,
Stephan

On Fri, Mar 5, 2021 at 4:33 AM Guowei Ma  wrote:

> Hi, all
>
>
> In the Flink 1.12 we introduce the TM merge shuffle. But the
> out-of-the-box experience of using TM merge shuffle is not very good. The
> main reason is that the default configuration always makes users encounter
> OOM [1]. So we hope to introduce a managed memory pool for TM merge shuffle
> to avoid the problem.
> Goals
>
>1. Don't affect the streaming and pipelined-shuffle-only batch setups.
>2. Don't mix memory with different life cycle in the same pool. E.g.,
>write buffers needed by running tasks and read buffer needed even after
>tasks being finished.
>3. User can use the TM merge shuffle with default memory
>configurations. (May need further tunings for performance optimization, but
>should not fail with the default configurations.)
>
> Proposal
>
>1. Introduce a configuration `taskmanager.memory.network.batch-read`
>to specify the size of this memory pool. The default value is 16m.
>2. Allocate the pool lazily. It means that the memory pool would be
>allocated when the TM merge shuffle is used at the first time.
>    3. This pool size will not be added to the TM's total memory size,
>but will be considered part of
>`taskmanager.memory.framework.off-heap.size`. We need to check that the
>pool size is not larger than the framework off-heap size, if TM merge
>shuffle is enabled.
>
>
> In this default configuration, the allocation of the memory pool is almost
> impossible to fail. Currently the default framework’s off-heap memory is
> 128m, which is mainly used by Netty. But after we introduced zero copy, the
> usage of it has been reduced, and you can refer to the detailed data [2].
> Known Limitation
> Usability for increasing the memory pool size
>
> In addition to increasing `taskmanager.memory.network.batch-read`, the
> user may also need to adjust `taskmanager.memory.framework.off-heap.size`
> at the same time. It also means that once the user forgets this, it is
> likely to fail the check when allocating the memory pool.
>
>
> So in the following two situations, we will still prompt the user to
> increase the size of `framework.off-heap.size`.
>
>1. `taskmanager.memory.network.batch-read` is bigger than
>`taskmanager.memory.framework.off-heap.size`
>2. Allocating the pool encounters the OOM.
>
>
> An alternative is that when the user adjusts the size of the memory pool,
> the system automatically adjusts it. But we are not entirely sure about
> this, given its implicit nature and the risk of complicating the memory configurations.
> Potential memory waste
>
> In the first step, the memory pool will not be released once allocated. This
> means in the first step, even if there is no subsequent batch job, the
> pooled memory cannot be used by other consumers.
>
>
> We are not releasing the pool in the first step due to the concern that
> frequently allocating/deallocating the entire pool may increase the GC
> pressure. Investigations on how to dynamically release the pool when it's no
> longer needed are considered a future follow-up.
>
>
> Looking forward to your feedback.
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-20740
>
> [2] https://github.com/apache/flink/pull/7368.
> Best,
> Guowei
>


Re: How to emit after a merge?

2021-03-05 Thread Yik San Chan
Hi Timo,

If I understand correctly, the UDF only simplifies the query, but does not do
anything functionally different. Please correct me if I am wrong, thank you!

Best,
Yik San

On Thu, Mar 4, 2021 at 8:34 PM Timo Walther  wrote:

> Yes, implementing a UDF might be the most convenient option for some use
> cases. The accumulator of such a UDF could take the two timestamps and
> perform the two aggregations at once.
>
> The upsert-kafka connector can apply the updates to the Kafka log. If
> you enable log compaction in Kafka, Kafka will clean up the log and make
> sure to only keep the most recent one.
>
> Regards,
> Timo
>
> On 04.03.21 11:59, Yik San Chan wrote:
> > Hi Timo,
> >
> > Thanks for the reply!
> >
> >  > You could filter the deletions manually in DataStream API before
> writing
> > them to Kafka.
> >
> > Yah I agree this helps the issue, though I will need to mix up SQL and
> > DataStream API.
> >
> >  > To simplify the query you could also investigate to implement your own
> > aggregate function and combine the Top 2 and ListAgg into one operation.
> >
> > Do you mean implement an UDF to do so?
> >
> > Besides, is 'upsert-kafka' connector designed for this use case?
> >
> > Thank you.
> >
> > On Thu, Mar 4, 2021 at 4:41 PM Timo Walther  > > wrote:
> >
> > Hi Yik,
> >
> > if I understand you correctly you would like to avoid the deletions
> in
> > your stream?
> >
> > You could filter the deletions manually in DataStream API before
> > writing
> > them to Kafka. Semantically the deletions are required to produce a
> > correct result because the runtime is not aware of a key for
> idempotent
> > updates.
> >
> > To simplify the query you could also investigate to implement your
> own
> > aggregate function and combine the Top 2 and ListAgg into one
> operation.
> >
> > Regards,
> > Timo
> >
> > On 28.02.21 09:55, Yik San Chan wrote:
> >  > I define a `Transaction` class:
> >  >
> >  > ```scala
> >  > case class Transaction(accountId: Long, amount: Long, timestamp:
> > Long)
> >  > ```
> >  >
> >  > The `TransactionSource` simply emits `Transaction` with some time
> >  > interval. Now I want to compute the last 2 transaction timestamp
> > of each
> >  > account id, see code below:
> >  >
> >  > ```scala
> >  > import org.apache.flink.streaming.api.scala.{DataStream,
> >  > StreamExecutionEnvironment, _}
> >  > import org.apache.flink.table.api.EnvironmentSettings
> >  > import
> org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> >  > import org.apache.flink.walkthrough.common.entity.Transaction
> >  > import
> org.apache.flink.walkthrough.common.source.TransactionSource
> >  >
> >  > object LastNJob {
> >  >
> >  >final val QUERY =
> >  >  """
> >  >|WITH last_n AS (
> >  >|SELECT accountId, `timestamp`
> >  >|FROM (
> >  >|SELECT *,
> >  >|ROW_NUMBER() OVER (PARTITION BY accountId
> > ORDER BY
> >  > `timestamp` DESC) AS row_num
> >  >|FROM transactions
> >  >|)
> >  >|WHERE row_num <= 2
> >  >|)
> >  >|SELECT accountId, LISTAGG(CAST(`timestamp` AS STRING))
> >  > last2_timestamp
> >  >|FROM last_n
> >  >|GROUP BY accountId
> >  >|""".stripMargin
> >  >
> >  >def main(args: Array[String]): Unit = {
> >  >  val settings: EnvironmentSettings =
> >  >
> >
>  EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> >  >  val streamEnv: StreamExecutionEnvironment =
> >  > StreamExecutionEnvironment.getExecutionEnvironment
> >  >  val tableEnv: StreamTableEnvironment =
> >  > StreamTableEnvironment.create(streamEnv, settings)
> >  >
> >  >  val txnStream: DataStream[Transaction] = streamEnv
> >  >.addSource(new TransactionSource)
> >  >.name("transactions")
> >  >
> >  >  tableEnv.createTemporaryView("transactions", txnStream)
> >  >
> >  >  tableEnv.executeSql(QUERY).print()
> >  >}
> >  > }
> >  > ```
> >  >
> >  > When I run the program, I get:
> >  >
> >  > ```
> >  > ++--++
> >  > | op |accountId |last2_timestamp |
> >  > ++--++
> >  > | +I |1 |  154627200 |
> >  > | +I |2 |  154627236 |
> >  > | +I |3 |  154627272 |
> >  > | +I |4 |  154627308 |
> >  > | +I |5 |  

Re: How to optimize consumption when the message queue volume is extremely large

2021-03-05 Thread allanqinjy
Thanks everyone for the answers.


allanqinjy
allanqi...@163.com
(Signature customized by NetEase Mail Master)


On 2021-03-05 19:23, smq<374060...@qq.com> wrote:
Severe backpressure is usually caused by an inefficient sink or a bottleneck in the processing logic; it has little to do with how much data the source holds. You can check in the web UI which operator causes it and then optimize that operator.



Sent from my iPhone


-- Original message --
From: 刘建刚 

Re: How to optimize consumption when the message queue volume is extremely large

2021-03-05 Thread smq
Severe backpressure is usually caused by an inefficient sink or a bottleneck in the processing logic; it has little to do with how much data the source holds. You can check in the web UI which operator causes it and then optimize that operator.



Sent from my iPhone


-- Original message --
From: 刘建刚 


Re: How to optimize consumption when the message queue volume is extremely large

2021-03-05 Thread smq
Severe backpressure is usually caused by an inefficient sink or a bottleneck in the processing logic; it has little to do with how much data the source holds.



Sent from my iPhone


-- Original message --
From: 刘建刚 

[sql] Comparing a TIMESTAMP with a string in an invalid format throws a NullPointerException

2021-03-05 Thread silence
Problem description:
When a TIMESTAMP value is compared with a string in an invalid format, a NullPointerException is thrown while the job is running.
Although such an error is caused by a mistake in the user's SQL, it only shows up at runtime, and when the SQL is long it is hard to locate the exact cause.
Could the types be validated at compile time, so that the problem is found earlier and the position in the SQL text is reported?

Example: where CURRENT_TIMESTAMP=''
         where CURRENT_TIMESTAMP='19700101'

java.lang.NullPointerException: null
at
org.apache.flink.table.data.TimestampData.compareTo(TimestampData.java:112)
at StreamExecCalc$4.processElement(Unknown Source)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at
org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:76)
at
org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
at 
java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at
java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at
java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at 
java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink KafkaProducer flushing on savepoints

2021-03-05 Thread Witzany, Tomas
Hi,
thanks for your answer. It seems like it will not be possible for me to upgrade 
to the newer universal Flink producer, because of an older Kafka version I am 
reading from. So unfortunately for now I will have to go with the hack.
Thanks

From: Piotr Nowojski 
Sent: 03 March 2021 21:10
To: Witzany, Tomas 
Cc: user@flink.apache.org 
Subject: Re: Flink KafkaProducer flushing on savepoints

Hi,

What Flink version and which FlinkKafkaProducer version are you using?  
`FlinkKafkaProducerBase` is no longer used in the latest version. I would guess 
some older versions, and FlinkKafkaProducer010 or later (no longer supported).

I would suggest either to use the universal FlinkKafkaProducer (universal), or 
FliknKafkaProducer011 (if you are using a really old Flink version that doesn't 
have the universal Kafka connector). Both of those should work with any Kafka 
version and by looking at the code it seems to me like neither of those has the 
problem you mentioned. If you select 
`org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic#AT_LEAST_ONCE`
 and disable checkpointing it should be still flushing records on savepoints.

> The only thing I can think about is have checkpoints enabled with some very 
> high periodicity so that they are never(almost) triggered. But this is a hack.

Yes, it would be a hack. But it would work.

Best,
Piotrek

On Tue, 2 Mar 2021 at 12:09, Witzany, Tomas <tomas.witz...@blindspot.ai> wrote:
Hi,
I have a question about the at-least-once guarantees for Kafka producers when 
checkpointing is disabled. In our data pipeline we have a Flink job on an 
unlimited stream that originally, we had checkpoints turned on. Further this 
job is cancelled with a savepoint once a day to do some data pre and 
post-processing for the next day, afterwards this job is restarted from the 
savepoint.

The issue we have is that we want to turn off checkpointing, since it does not 
give us much value and only creates extra IO. When this is done this 
message
 shows up:
"Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling 
flushing."
This prompted us to investigate, and it seems that if you have checkpointing 
disabled, there are no at-least-once guarantees. 


What about if you have no checkpointing, but you make savepoints that you 
restore from yourself? Savepoints are the same thing as checkpoints in the 
code. The flink producer makes it impossible to turn on flushing and have 
checkpointing disabled. I can see why this is the case as there is some extra 
synchronization overhead related to the flushing flag being on. Is there a way 
to have checkpointing disabled and have at least once guarantees on savepoints?

The only thing I can think about is have checkpoints enabled with some very 
high periodicity so that they are never(almost) triggered. But this is a hack.

Tomas Witzany


Re: How to optimize consumption when the message queue volume is extremely large

2021-03-05 Thread 刘建刚
The root cause is that the job does not have enough resources to process the large data volume; it seems the only real fix is to increase the parallelism.


allanqinjy wrote on Fri, Mar 5, 2021 at 10:48 AM:

>
>
> Hi,
>   Since the message queue volume is extremely large, the job suffers from severe backpressure after the data is ingested into Flink. Besides changing the parallelism, are there any other optimization options?
>
>
> allanqinjy
> allanqi...@163.com
> (Signature customized by NetEase Mail Master)
>
>


Re: flink sql temporal-table join IO question

2021-03-05 Thread (sender name lost to encoding corruption)
[The body of this message was garbled by a character-encoding problem; only the reference below is recoverable.]

-- Original message --
From: "user-zh"

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/legacy.html#temporal-table-function-join
flink sql temporal-table join IO question

casel.chen 

Re: [ANNOUNCE] Apache Flink 1.12.2 released

2021-03-05 Thread Yuan Mei
Cheers!

Thanks, Roman, for doing the most time-consuming and difficult part of the
release!

Best,

Yuan

On Fri, Mar 5, 2021 at 5:41 PM Roman Khachatryan  wrote:

> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.12.2, which is the second bugfix release for the Apache Flink 1.12
> series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements
> for this bugfix release:
> https://flink.apache.org/news/2021/03/03/release-1.12.2.html
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12349502&projectId=12315522
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Special thanks to Yuan Mei for managing the release and PMC members Robert
> Metzger, Chesnay Schepler and Piotr Nowojski.
>
> Regards,
> Roman
>


[ANNOUNCE] Apache Flink 1.12.2 released

2021-03-05 Thread Roman Khachatryan
The Apache Flink community is very happy to announce the release of Apache
Flink 1.12.2, which is the second bugfix release for the Apache Flink 1.12
series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements
for this bugfix release:
https://flink.apache.org/news/2021/03/03/release-1.12.2.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12349502&projectId=12315522

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Special thanks to Yuan Mei for managing the release and PMC members Robert
Metzger, Chesnay Schepler and Piotr Nowojski.

Regards,
Roman


Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

2021-03-05 Thread Xintong Song
Hi Hemant,

This exception generally suggests that JVM is running out of heap memory.
Per the official documentation [1], the amount of live data barely fits
into the Java heap having little free space for new allocations.

You can try to increase the heap size following these guides [2].

If a memory leak is suspected, to further understand where the memory is
consumed, you may need to dump the heap on OOMs and look for unexpected
memory usage with profiling tools.

Thank you~

Xintong Song


[1]
https://docs.oracle.com/javase/8/docs/technotes/guides/troubleshoot/memleaks002.html

[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup.html
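
As a concrete starting point for [2], the simplest change is usually to raise the overall TaskManager memory in flink-conf.yaml, from which the task heap is derived (the value below is only an illustration; alternatively, taskmanager.memory.task.heap.size can be set explicitly):

    taskmanager.memory.process.size: 4096m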



On Fri, Mar 5, 2021 at 4:24 PM bat man  wrote:

> Hi,
>
> Getting the below OOM but the job failed 4-5 times and recovered from
> there.
>
> java.lang.Exception: java.lang.OutOfMemoryError: GC overhead limit exceeded
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
>
> Is there any way I can debug this? Since the job started running fine after
> a few restarts, what could be the reason behind this?
>
> Thanks,
> Hemant
>


flink DataStream: does sinking a stream multiple times drive the upstream DAG repeatedly?

2021-03-05 Thread lp


I have a question about the following code snippet:

--
Properties properties = new Properties();
properties.setProperty("bootstrap.servers",kafkaAddr);
properties.setProperty("group.id",kafkaOdsGroup);
properties.setProperty("auto.offset.reset",kafkaOdsAutoOffsetReset);
   
properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,kafkaOdsPartitionDiscoverInterval);
   
properties.setProperty("transaction.timeout.ms",KafkaOdsTransactionTimeout);//kafka事务超时时间

FlinkKafkaConsumer<String> flinkKafkaConsumer = new
FlinkKafkaConsumer<>(kafkaOdsTopic, new SimpleStringSchema(), properties);
DataStreamSource<String> dataStreamSource =
env.addSource(flinkKafkaConsumer);
dataStreamSource.printToErr("1");
dataStreamSource.printToErr("2");
dataStreamSource.printToErr("3");



If I apply the same sink operation to one DataStream several times, will this cause the whole upstream DAG to be driven/executed repeatedly? Coming from a Spark mindset, I would assume the upstream DAG is re-executed for each sink.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Multiple JobManager HA set up for Standalone Kubernetes

2021-03-05 Thread Yang Wang
Hi deepthi,

Thanks for trying the Kubernetes HA service.

> Do I need standby JobManagers?
I think the answer depends on your production requirements. Usually, it is
unnecessary to have more than one JobManager, because the Kubernetes
Deployment manages the JobManager and a new one will be launched once it
crashes unexpectedly. But if you want faster recovery, standby JobManagers
could help, especially for Kubernetes node failures, since we save the time
for scheduling and launching a new pod.

> Why do the multiple JobManagers not work with the HA service?
I notice that all the JobManagers are using the "flink-jobmanager" service
name for rpc address. This should not happen. Instead, starting
JobManager with pod IP is the correct way. You could find how to set the
pod IP for "jobmanager.rpc.address" here[1]. The most important
change is to add the "--host", "$(_POD_IP_ADDRESS)" in the args.

[1].
https://issues.apache.org/jira/browse/FLINK-20982?focusedCommentId=17265715=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17265715


Best,
Yang
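
For reference, a hedged sketch of the change described in [1] for the JobManager Deployment: expose the pod IP through the Downward API and pass it with --host, so each JobManager registers its own pod IP instead of the shared "flink-jobmanager" service name. The image and names below are placeholders.

    # excerpt of the JobManager Deployment spec (illustrative only)
    containers:
      - name: jobmanager
        image: flink:1.12.2-scala_2.11   # placeholder
        args: ["jobmanager", "--host", "$(_POD_IP_ADDRESS)"]
        env:
          - name: _POD_IP_ADDRESS
            valueFrom:
              fieldRef:
                apiVersion: v1
                fieldPath: status.podIP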


deepthi Sridharan wrote on Fri, Mar 5, 2021 at 2:26 PM:

> I am trying to figure out the right architecture for running Standalone
> Kubernetes with Job manager HA. The documentation
>  
> for
> running HA seems to always suggest that there needs to be multiple
> job managers, but there isn't much instructions on how we can set that up
> since most deployment recommendations suggest running a single Job Manager.
> A few talks I found online, only refer to multiple JobManagers instances
> when running with Zookeeper HA and the ones for K8S HA seem to all have 1
> JobManager.
>
> I tried running a setup with replicaSet 2 for JobManager and I could see
> one of them getting leadership for a previously submitted job (from when
> replicaSet was 1).
>
> 2021-03-04 18:34:19,282 INFO
>  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint   [] -
> http://flink-jobmanager:8081 was granted leadership with
> leaderSessionID=3e9a9d16-dc30-4ee1-9556-43b199db826d
> 2021-03-04 18:34:20,773 INFO
>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
> [] - New leader elected e1f3af04-073b-4ab8-9261-9a18b3bf85d7 for
> flink-dks-ha1-dispatcher-leader.
> 2021-03-04 18:34:20,786 INFO
>  org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess
> [] - Start SessionDispatcherLeaderProcess.
> 2021-03-04 18:34:20,787 INFO
>  org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess
> [] - Recover all persisted job graphs.
> 2021-03-04 18:34:20,794 INFO
>  org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] -
> Retrieved job ids [7dc4e15f392ccd826a0bc95e8755b410] from
> KubernetesStateHandleStore{configMapName='flink-dks-ha1-dispatcher-leader'}
> 2021-03-04 18:34:20,794 INFO
>  org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess
> [] - Trying to recover job with job id 7dc4e15f392ccd826a0bc95e8755b410.
> 2021-03-04 18:34:20,862 INFO
>  org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess
> [] - Successfully recovered 1 persisted job graphs.
> 2021-03-04 18:34:21,134 INFO
>  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] -
> Recovering checkpoints from
> KubernetesStateHandleStore{configMapName='flink-dks-ha1-7dc4e15f392ccd826a0bc95e8755b410-jobmanager-leader'}.
> 2021-03-04 18:34:21,145 INFO
>  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] -
> Found 5 checkpoints in
> KubernetesStateHandleStore{configMapName='flink-dks-ha1-7dc4e15f392ccd826a0bc95e8755b410-jobmanager-leader'}.
>
> But, after killing the leader JobManager instance with
>
> kubectl exec {jobmanager_pod_name} -- /bin/sh -c "kill 1"
>
>
> I don't see the task managers being able to resume processing. I have 2
> Task manager pods and they both seem to be stuck here
>
> 2021-03-04 19:57:47,647
> INFO  org.apache.flink.contrib.streaming.state.RocksDBStateBackend [] -
> Using predefined options: DEFAULT.
>
> 2021-03-04 19:57:47,648
> INFO  org.apache.flink.contrib.streaming.state.RocksDBStateBackend [] -
> Using default options factory:
> DefaultConfigurableOptionsFactory{configuredOptions={}}.
>
> 2021-03-04 19:57:47,648
> INFO  org.apache.flink.contrib.streaming.state.RocksDBStateBackend [] -
> Getting managed memory shared cache for RocksDB.
>
> 2021-03-04 19:57:47,648
> INFO  org.apache.flink.contrib.streaming.state.RocksDBStateBackend [] -
> Obtained shared RocksDB cache of size 268435460 bytes
>
> while the same test with 1 JobManager shows the task managers successfully
> communicating to the restarted job manager and getting their assignments
> and start processing from saved checkpoints.
>
> Is multiple JobManagers not the recommended way to run JobManager HA for
> Kubernetes? If it is, is there any documentation on how to run multiple 

java.lang.OutOfMemoryError: GC overhead limit exceeded

2021-03-05 Thread bat man
Hi,

Getting the below OOM but the job failed 4-5 times and recovered from there.

java.lang.Exception: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded

Is there any way I can debug this? Since the job started running fine after
a few restarts, what could be the reason behind this?

Thanks,
Hemant
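
One way to see what is filling the heap, independent of the OOM itself, is to take an on-demand heap dump of a running TaskManager and inspect it in a tool such as Eclipse MAT or VisualVM. A rough sketch, assuming shell access to the TaskManager container/host and JDK tools on the image (the pid and dump path are placeholders):

    # locate the TaskManager JVM (the main class is TaskManagerRunner,
    # or YarnTaskExecutorRunner when running on YARN/EMR)
    jps -l
    # dump only live objects to a file for offline analysis
    jmap -dump:live,format=b,file=/tmp/taskmanager-heap.hprof <pid>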


Aw: Re: Independence of task parallelism

2021-03-05 Thread Jan Nitschke
Hey Piotr, 

 

thanks for your answer, that makes perfect sense. However, when looking at the number of messages being processed, we can see that both subtasks of task 2 produce the same number of messages in the (1-2-1-1-1) scenario, even with the first task hitting backpressure. We assume this has to do with how messages are distributed between tasks. Since messages are distributed equally among subtasks in our case, could this explain that behavior?

 

Best, 
Jan

 
 

Sent: Wednesday, 03 March 2021 at 19:53
From: "Piotr Nowojski" 
To: "Jan Nitschke" 
Cc: "user" 
Subject: Re: Independence of task parallelism


Hi Jan,
 

As far as I remember, Flink doesn't handle cases like (1-2-1-1-1) with two Task Managers very well. There are no guarantees about how the operators/subtasks are going to be scheduled, but most likely it will be as you mentioned/observed: the first task manager will be handling all of the operators, while the second task manager will only be running a single instance of the second operator (for load-balancing reasons it would be better to spread the tasks across those two Task Managers more evenly).

 

No, Flink doesn't hold any resources (apart from network buffers) per task. All of the available memory and CPU resources are shared across all of the running tasks. So in the (1-2-1-1-1) case, if the first task manager is overloaded (for example if it has very few CPU cores), the second task will perform much better on the second task manager (which is otherwise empty), causing a throughput skew. From this perspective, (2-2-2-2-2) would most likely perform better, as the load would be spread more evenly.
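
If the uneven slot placement itself is the problem, one flink-conf.yaml option that may help in standalone/session setups is the spread-out slot allocation strategy (treat this as a hint to verify against your Flink version, roughly 1.9.2/1.10 onwards):

    cluster.evenly-spread-out-slots: true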

 

Piotrek 

 


On Sun, 28 Feb 2021 at 13:10, Jan Nitschke  wrote:


Hello, 
 

We are working on a project where we want to gather information about job performance across different task-level parallelism settings.

Essentially, we want to see how the throughput of a single task varies across different parallelism settings, e.g. for a job of 5 tasks: 1-1-1-1-1 vs. 1-2-1-1-1 vs. 2-2-2-2-2. 

 

We are running Flink on Kubernetes with a job of 5 tasks; slot sharing is enabled, operator chaining is disabled, and each task manager has one slot.

 

So, the number of task managers always equals the highest parallelism, and we can fit the entire job into one task manager slot. 

 

We then run the job against multiple parallelism configs (such as those above), collect the relevant metrics, and try to extract some useful information from them. 

 

We are now wondering how independent our results are from one another. More specifically, if we now look at the parallelism of the second task, is its performance independent of the parallelism of the other tasks? So, will the second task perform the same in (1-2-1-1-1) as in (2-2-2-2-2)? 

 

Our take on it is the following: With our setup, (1-2-1-1-1) should result in one task manager holding the entire job and a second task manager that only runs the second task. (2-2-2-2-2) will run two task managers with the entire job. So, theoretically, the second task should have far more resources available in the first setup, as it has the entire resources of that task manager at its disposal. Does that assumption hold, or will Flink assign a fixed amount of resources to a task in a task manager no matter how many other tasks are running in that same task manager slot? 

 

We would highly appreciate any help. 

 

Best, 

Jan
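
For concreteness, a per-task parallelism configuration such as 1-2-1-1-1 is usually expressed with setParallelism() on each operator in the DataStream API. A minimal, self-contained sketch with placeholder operators and names (the real job's sources and operators will differ):

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ParallelismExperiment {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // keep each operator as its own task, as in the experiment
            env.disableOperatorChaining();

            // task 1: non-parallel source, parallelism 1
            DataStream<Long> source = env
                    .fromElements(1L, 2L, 3L, 4L, 5L)
                    .name("task-1-source");

            // task 2: parallelism 2 (the "2" in 1-2-1-1-1)
            DataStream<Long> stage2 = source
                    .map(v -> v * 2)
                    .returns(Types.LONG)
                    .setParallelism(2)
                    .name("task-2");

            // task 3: parallelism 1
            DataStream<Long> stage3 = stage2
                    .map(v -> v + 1)
                    .returns(Types.LONG)
                    .setParallelism(1)
                    .name("task-3");

            // task 4: parallelism 1
            DataStream<Long> stage4 = stage3
                    .filter(v -> v % 2 == 0)
                    .setParallelism(1)
                    .name("task-4");

            // task 5: sink, parallelism 1
            stage4.print()
                    .setParallelism(1)
                    .name("task-5-sink");

            env.execute("parallelism-experiment");
        }
    }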