Re: Flink with HBase

2019-05-15 Thread Chesnay Schepler
Unless I'm blind, Flink does not provide an HBase sink, so it depends on
the implementation you're using.
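
For reference, the usual way people get end-to-end exactly-once results into HBase is to make the writes idempotent: if the row key is derived deterministically from the record, replayed records after a failure simply overwrite the same cells, so Flink's checkpointing plus at-least-once delivery yields effectively exactly-once results in the table. A minimal sketch of such a sink (this is not a Flink-provided connector; the table name, column family, and the Tuple2<rowKey, value> record layout are illustrative assumptions):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class IdempotentHBaseSink extends RichSinkFunction<Tuple2<String, String>> {

    private transient Connection connection;
    private transient Table table;

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
        table = connection.getTable(TableName.valueOf("events"));   // table name is a placeholder
    }

    @Override
    public void invoke(Tuple2<String, String> record, Context context) throws Exception {
        // Row key derived deterministically from the record -> replays overwrite the same cells.
        Put put = new Put(Bytes.toBytes(record.f0));
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("value"), Bytes.toBytes(record.f1));
        table.put(put);
    }

    @Override
    public void close() throws Exception {
        if (table != null) table.close();
        if (connection != null) connection.close();
    }
}

A truly transactional sink would have to be built on something like TwoPhaseCommitSinkFunction, which is not straightforward with HBase since it has no cross-row transactions.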


On 15/05/2019 20:17, Nikhil Goyal wrote:

Hi,

Does Flink support exactly-once processing using HBase as a sink? I am
not able to find any documentation on this.


Thanks
Nikhil





Re: User Interface not showing the actual count received and produced

2019-05-15 Thread Chesnay Schepler
Flink currently does not measure incoming records for sources and
outgoing records for sinks; see
https://issues.apache.org/jira/browse/FLINK-7286.
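
Until FLINK-7286 is addressed, one workaround (a sketch, not an official feature) is to chain a small pass-through operator directly after the source or in front of the sink and register a custom counter there; the metric name is arbitrary:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class CountingPassThrough<T> extends RichMapFunction<T, T> {

    private transient Counter counter;

    @Override
    public void open(Configuration parameters) {
        counter = getRuntimeContext().getMetricGroup().counter("recordsSeen");
    }

    @Override
    public T map(T value) {
        counter.inc();   // counts every record flowing past this operator
        return value;
    }
}

Chained as env.addSource(kafkaConsumer).map(new CountingPassThrough<>()).name("kafka-records-in"), the counter then shows up for that operator in the web UI and in any configured metric reporters.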


On 15/05/2019 19:43, PoolakkalMukkath, Shakir wrote:


Hi Flink team,

I am developing a flow which uses

- FlinkKafkaConsumer010 to consume messages from Kafka, and
- FlinkKinesisProducer to produce the results to Kinesis.

In the user interface, I always see that Bytes and Records Received from
Kafka are zero even though it is receiving and processing events. The same
goes for the Kinesis sink: Bytes and Records Sent are always zero even
though it is posting events to Kinesis.


Any reason why my UI is not showing the actual count ? Thanks

Thanks,

Shakir





Re: flink 1.4.2. java.lang.IllegalStateException: Could not initialize operator state backend

2019-05-15 Thread anaray
Thank you, Andrey. The arity of the job has not changed. The issue here is that
the job runs for some time (with checkpointing enabled) and then after a while
runs into the above exception. The job keeps restarting afterwards.

One thing that I want to point out here is that we have a custom *serialization
schema* attached to *FlinkKafkaConsumer010*. After going through FLINK-8836,
I suspect the real issue is Kryo instances being shared across threads?
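
If that suspicion is right, one mitigation is to make sure each parallel source subtask works with its own Kryo instance, e.g. by keeping it transient and creating it lazily inside the deserialization schema instead of sharing a single instance via a static field or a shared helper object. A sketch (this is not anaray's actual schema; Kryo is only an example payload format here, and the DeserializationSchema import path can differ between Flink versions):

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class KryoDeserializationSchema<T> implements DeserializationSchema<T> {

    private final Class<T> clazz;

    // transient + lazily created: every deserialized copy of the schema
    // (one per source subtask / thread) gets its own Kryo instance
    private transient Kryo kryo;

    public KryoDeserializationSchema(Class<T> clazz) {
        this.clazz = clazz;
    }

    @Override
    public T deserialize(byte[] message) {
        if (kryo == null) {
            kryo = new Kryo();
            kryo.register(clazz);
        }
        try (Input input = new Input(message)) {
            return kryo.readObject(input, clazz);
        }
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeInformation.of(clazz);
    }
}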

Thanks,







--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Error While Initializing S3A FileSystem

2019-05-15 Thread Manish Bellani
Thanks, Ken. That makes sense! I'll start a new thread.

On Wed, May 15, 2019 at 7:12 PM Ken Krugler 
wrote:

> Hi Manish,
>
> It’s best to start a new thread if you have a new question - see
> https://home.apache.org/~hossman/#threadhijack for reasons why…
>
> Regards,
>
> — Ken
>
>
> On May 15, 2019, at 4:46 PM, Manish Bellani 
> wrote:
>
> Hi Ken,
>
> Thanks for the quick response, you are actually right, the job seems to be
> running even after that error appears. It was crashing earlier (due to
> fs.s3a.multipart.size being too high) and I confused it with this error
> since that was the first one popping out and OOM wasn't apparent
> immediately.
>
> I do have a subsequent question though if you don't mind me asking this
> question in the same thread. So... if I'm reading the  BucketingSink code
> correctly then if I supply the core-site.xml with following contents, would
> it not pick the *S3RecoverableWriter* code path?:
>
> 
>
> 
> fs.s3.impl
> org.apache.hadoop.fs.s3a.S3AFileSystem
> 
>
> 
> fs.s3a.fast.upload
> true
> 
> Use the incremental block upload mechanism with
> the buffering mechanism set in fs.s3a.fast.upload.buffer.
> The number of threads performing uploads in the filesystem is
> defined
> by fs.s3a.threads.max; the queue of waiting uploads limited by
> fs.s3a.max.total.tasks.
> The size of each buffer is set by fs.s3a.multipart.size.
> 
> 
>
> 
> fs.s3a.fast.upload.buffer
> array
> 
> The buffering mechanism to use when using S3A fast upload
> (fs.s3a.fast.upload=true). Values: disk, array, bytebuffer.
> This configuration option has no effect if fs.s3a.fast.upload
> is false.
>
> "disk" will use the directories listed in fs.s3a.buffer.dir as
> the location(s) to save data prior to being uploaded.
>
> "array" uses arrays in the JVM heap
>
> "bytebuffer" uses off-heap memory within the JVM.
>
> Both "array" and "bytebuffer" will consume memory in a single
> stream up to the number
> of blocks set by:
>
> fs.s3a.multipart.size * fs.s3a.fast.upload.active.blocks.
>
> If using either of these mechanisms, keep this value low
>
> The total number of threads performing work across all threads
> is set by
> fs.s3a.threads.max, with fs.s3a.max.total.tasks values setting
> the number of queued
> work items.
> 
> 
>
> 
> fs.s3a.multipart.size
> 10M
> How big (in bytes) to split upload or copy operations
> up into.
> A suffix from the set {K,M,G,T,P} may be used to scale the
> numeric value.
> 
> 
>
> 
> fs.s3a.fast.upload.active.blocks
> 8
> 
> Maximum Number of blocks a single output stream can have
> active (uploading, or queued to the central FileSystem
> instance's pool of queued operations.
>
> This stops a single stream overloading the shared thread pool.
> 
> 
>
> 
>   fs.s3a.aws.credentials.provider
>
> com.amazonaws.auth.EnvironmentVariableCredentialsProvider
> 
>
> 
>
> I say that because I don't see any files being written under `/tmp`
> directory with the pattern like ".tmp_UUID", which what
> RefCountedTmpFileCreator is supposed to create for staging writes to s3
> (which is wired in by org.apache.flink.fs.s3.common.FlinkS3FileSystem):
>
> public RefCountedFile apply(File file) throws IOException {
>File directory = this.tempDirectories[this.nextIndex()];
>
> while(true) {
> try {
> if (file == null) {
> File newFile = new File(directory, ".tmp_" +
> UUID.randomUUID());
> OutputStream out =
> Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
> return RefCountedFile.newFile(newFile, out);
> }
>
> OutputStream out = Files.newOutputStream(file.toPath(),
> StandardOpenOption.APPEND);
> return RefCountedFile.restoredFile(file, out,
> file.length());
> } catch (FileAlreadyExistsException var5) {
> }
> }
> }
>
>
> Is S3RecoverableWriter path even supported for BucketingSink?
>
> Manish
>
>
> On Wed, May 15, 2019 at 6:05 PM Ken Krugler 
> wrote:
>
>> Hi Manish,
>>
>> Are you sure this is an exception that’s actually killing the job?
>>
>> Asking because https://issues.apache.org/jira/browse/BEANUTILS-477 talks
>> about Commons Beanutils logging this exception, but it’s a warning vs.
>> something being thrown up the stack.
>>
>> — Ken
>>
>> On May 15, 2019, at 3:50 PM, Manish Bellani 
>> wrote:
>>
>> hey Friends,
>>
>> Thanks for all the work you have been doing on flink, I have been trying
>>

Re: Error While Initializing S3A FileSystem

2019-05-15 Thread Ken Krugler
Hi Manish,

It’s best to start a new thread if you have a new question - see 
https://home.apache.org/~hossman/#threadhijack 
 for reasons why…

Regards,

— Ken


> On May 15, 2019, at 4:46 PM, Manish Bellani  wrote:
> 
> Hi Ken,
> 
> Thanks for the quick response, you are actually right, the job seems to be 
> running even after that error appears. It was crashing earlier (due to 
> fs.s3a.multipart.size being too high) and I confused it with this error since 
> that was the first one popping out and OOM wasn't apparent immediately.
> 
> I do have a subsequent question though if you don't mind me asking this 
> question in the same thread. So... if I'm reading the  BucketingSink code 
> correctly then if I supply the core-site.xml with following contents, would 
> it not pick the S3RecoverableWriter code path?:
> 
> 
> 
> 
> fs.s3.impl
> org.apache.hadoop.fs.s3a.S3AFileSystem
> 
> 
> 
> fs.s3a.fast.upload
> true
> 
> Use the incremental block upload mechanism with
> the buffering mechanism set in fs.s3a.fast.upload.buffer.
> The number of threads performing uploads in the filesystem is 
> defined
> by fs.s3a.threads.max; the queue of waiting uploads limited by
> fs.s3a.max.total.tasks.
> The size of each buffer is set by fs.s3a.multipart.size.
> 
> 
> 
> 
> fs.s3a.fast.upload.buffer
> array
> 
> The buffering mechanism to use when using S3A fast upload
> (fs.s3a.fast.upload=true). Values: disk, array, bytebuffer.
> This configuration option has no effect if fs.s3a.fast.upload is 
> false.
> 
> "disk" will use the directories listed in fs.s3a.buffer.dir as
> the location(s) to save data prior to being uploaded.
> 
> "array" uses arrays in the JVM heap
> 
> "bytebuffer" uses off-heap memory within the JVM.
> 
> Both "array" and "bytebuffer" will consume memory in a single 
> stream up to the number
> of blocks set by:
> 
> fs.s3a.multipart.size * fs.s3a.fast.upload.active.blocks.
> 
> If using either of these mechanisms, keep this value low
> 
> The total number of threads performing work across all threads is 
> set by
> fs.s3a.threads.max, with fs.s3a.max.total.tasks values setting 
> the number of queued
> work items.
> 
> 
> 
> 
> fs.s3a.multipart.size
> 10M
> How big (in bytes) to split upload or copy operations up 
> into.
> A suffix from the set {K,M,G,T,P} may be used to scale the 
> numeric value.
> 
> 
> 
> 
> fs.s3a.fast.upload.active.blocks
> 8
> 
> Maximum Number of blocks a single output stream can have
> active (uploading, or queued to the central FileSystem
> instance's pool of queued operations.
> 
> This stops a single stream overloading the shared thread pool.
> 
> 
> 
> 
>   fs.s3a.aws.credentials.provider
>   com.amazonaws.auth.EnvironmentVariableCredentialsProvider
> 
> 
> 
> 
> I say that because I don't see any files being written under `/tmp` directory 
> with the pattern like ".tmp_UUID", which what RefCountedTmpFileCreator is 
> supposed to create for staging writes to s3 (which is wired in by 
> org.apache.flink.fs.s3.common.FlinkS3FileSystem): 
> 
> public RefCountedFile apply(File file) throws IOException {
>File directory = this.tempDirectories[this.nextIndex()];
> 
> while(true) {
> try {
> if (file == null) {
> File newFile = new File(directory, ".tmp_" + 
> UUID.randomUUID());
> OutputStream out = 
> Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
> return RefCountedFile.newFile(newFile, out);
> }
> 
> OutputStream out = Files.newOutputStream(file.toPath(), 
> StandardOpenOption.APPEND);
> return RefCountedFile.restoredFile(file, out, file.length());
> } catch (FileAlreadyExistsException var5) {
> }
> }
> }
> 
> 
> Is S3RecoverableWriter path even supported for BucketingSink?
> 
> Manish
> 
> 
> On Wed, May 15, 2019 at 6:05 PM Ken Krugler  > wrote:
> Hi Manish,
> 
> Are you sure this is an exception that’s actually killing the job?
> 
> Asking because https://issues.apache.org/jira/browse/BEANUTILS-477 
>  talks about Commons 
> Beanutils logging this exception, but it’s a warning vs. something being 
> thrown up the stack.
> 
> — Ken
> 
>> On May 15, 2019, at 3:50 PM, Manish Bellani > > wrote:
>> 
>> hey

Re: Error While Initializing S3A FileSystem

2019-05-15 Thread Manish Bellani
Hi Ken,

Thanks for the quick response. You are actually right, the job seems to be
running even after that error appears. It was crashing earlier (due to
fs.s3a.multipart.size being too high) and I confused that crash with this error,
since it was the first one popping up and the OOM wasn't apparent
immediately.

I do have a subsequent question, though, if you don't mind me asking it in the
same thread. So... if I'm reading the BucketingSink code correctly, then if I
supply a core-site.xml with the following contents, would it not pick the
*S3RecoverableWriter* code path?




<configuration>

    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>

    <property>
        <name>fs.s3a.fast.upload</name>
        <value>true</value>
        <description>
            Use the incremental block upload mechanism with
            the buffering mechanism set in fs.s3a.fast.upload.buffer.
            The number of threads performing uploads in the filesystem is defined
            by fs.s3a.threads.max; the queue of waiting uploads limited by
            fs.s3a.max.total.tasks.
            The size of each buffer is set by fs.s3a.multipart.size.
        </description>
    </property>

    <property>
        <name>fs.s3a.fast.upload.buffer</name>
        <value>array</value>
        <description>
            The buffering mechanism to use when using S3A fast upload
            (fs.s3a.fast.upload=true). Values: disk, array, bytebuffer.
            This configuration option has no effect if fs.s3a.fast.upload is false.

            "disk" will use the directories listed in fs.s3a.buffer.dir as
            the location(s) to save data prior to being uploaded.

            "array" uses arrays in the JVM heap

            "bytebuffer" uses off-heap memory within the JVM.

            Both "array" and "bytebuffer" will consume memory in a single stream up to the number
            of blocks set by:

            fs.s3a.multipart.size * fs.s3a.fast.upload.active.blocks.

            If using either of these mechanisms, keep this value low

            The total number of threads performing work across all threads is set by
            fs.s3a.threads.max, with fs.s3a.max.total.tasks values setting the number of queued
            work items.
        </description>
    </property>

    <property>
        <name>fs.s3a.multipart.size</name>
        <value>10M</value>
        <description>
            How big (in bytes) to split upload or copy operations up into.
            A suffix from the set {K,M,G,T,P} may be used to scale the numeric value.
        </description>
    </property>

    <property>
        <name>fs.s3a.fast.upload.active.blocks</name>
        <value>8</value>
        <description>
            Maximum Number of blocks a single output stream can have
            active (uploading, or queued to the central FileSystem
            instance's pool of queued operations.

            This stops a single stream overloading the shared thread pool.
        </description>
    </property>

    <property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>com.amazonaws.auth.EnvironmentVariableCredentialsProvider</value>
    </property>

</configuration>




I say that because I don't see any files being written under the `/tmp`
directory with a pattern like ".tmp_UUID", which is what
RefCountedTmpFileCreator is supposed to create for staging writes to S3
(and which is wired in by org.apache.flink.fs.s3.common.FlinkS3FileSystem):

public RefCountedFile apply(File file) throws IOException {
    File directory = this.tempDirectories[this.nextIndex()];

    while (true) {
        try {
            if (file == null) {
                File newFile = new File(directory, ".tmp_" + UUID.randomUUID());
                OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
                return RefCountedFile.newFile(newFile, out);
            }

            OutputStream out = Files.newOutputStream(file.toPath(), StandardOpenOption.APPEND);
            return RefCountedFile.restoredFile(file, out, file.length());
        } catch (FileAlreadyExistsException var5) {
        }
    }
}


Is S3RecoverableWriter path even supported for BucketingSink?

Manish
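
As far as I understand, the S3RecoverableWriter path is only exercised by the newer StreamingFileSink writing to the s3:// scheme that flink-s3-fs-hadoop provides; BucketingSink talks to Hadoop's S3AFileSystem (and hence core-site.xml) directly and does not go through the recoverable writer. A minimal sketch of the StreamingFileSink variant on Flink 1.7.x, with bucket and path as placeholders:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class S3SinkExample {

    public static StreamingFileSink<String> buildSink() {
        // "s3://..." is served by flink-s3-fs-hadoop and uses the RecoverableWriter machinery
        return StreamingFileSink
                .forRowFormat(new Path("s3://my-bucket/output"), new SimpleStringEncoder<String>("UTF-8"))
                .build();
    }
}

The sink is then attached with stream.addSink(S3SinkExample.buildSink()).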


On Wed, May 15, 2019 at 6:05 PM Ken Krugler 
wrote:

> Hi Manish,
>
> Are you sure this is an exception that’s actually killing the job?
>
> Asking because https://issues.apache.org/jira/browse/BEANUTILS-477 talks
> about Commons Beanutils logging this exception, but it’s a warning vs.
> something being thrown up the stack.
>
> — Ken
>
> On May 15, 2019, at 3:50 PM, Manish Bellani 
> wrote:
>
> hey Friends,
>
> Thanks for all the work you have been doing on flink, I have been trying
> to use BucketingSink (backed by S3AFileSystem) to write data to s3 and I'm
> running into some issues (which I suspect could be dependency/packaging
> related) that'd try to describe here.
>
> The data pipeline is quite simple:
>
> Kafka -> KafkaConsumer (Source) -> BucketingSink (S3AFileSystem) -> S3
>
> *Environment:*
>
> Kubernetes
> Debian
> DockerImage: flink:1.7.2-hadoop28-scala_2.11
> Java 1.8
> Hadoop Version: 2.8.5
>
> I followed this dependency section:
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/aws.html#flink-for-hadoop-27
> to place the dependenci

Re: Error While Initializing S3A FileSystem

2019-05-15 Thread Ken Krugler
Hi Manish,

Are you sure this is an exception that’s actually killing the job?

Asking because https://issues.apache.org/jira/browse/BEANUTILS-477 
 talks about Commons 
Beanutils logging this exception, but it’s a warning vs. something being thrown 
up the stack.

— Ken

> On May 15, 2019, at 3:50 PM, Manish Bellani  wrote:
> 
> hey Friends,
> 
> Thanks for all the work you have been doing on flink, I have been trying to 
> use BucketingSink (backed by S3AFileSystem) to write data to s3 and I'm 
> running into some issues (which I suspect could be dependency/packaging 
> related) that'd try to describe here.
> 
> The data pipeline is quite simple:
> 
> Kafka -> KafkaConsumer (Source) -> BucketingSink (S3AFileSystem) -> S3
> Environment:
> 
> Kubernetes
> Debian
> DockerImage: flink:1.7.2-hadoop28-scala_2.11
> Java 1.8
> Hadoop Version: 2.8.5
> I followed this dependency section: 
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/aws.html#flink-for-hadoop-27
>  
> 
>  to place the dependencies under /opt/flink/lib (with an exception that my 
> Hadoop version and it's dependencies that I pull in are different).
> 
> Here are the dependencies I'm pulling in (excerpt from my Dockerfile)
> 
> RUN cp /opt/flink/opt/flink-s3-fs-hadoop-1.7.2.jar 
> /opt/flink/lib/flink-s3-fs-hadoop-1.7.2.jar
> RUN wget -O /opt/flink/lib/hadoop-aws-2.8.5.jar 
> https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.8.5/hadoop-aws-2.8.5.jar
>  
> 
> RUN wget -O /opt/flink/lib/aws-java-sdk-s3-1.10.6.jar 
> http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.10.6/aws-java-sdk-s3-1.10.6.jar
> #Transitive 
> 
>  Dependency of aws-java-sdk-s3
> RUN wget -O /opt/flink/lib/aws-java-sdk-core-1.10.6.jar 
> http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-core/1.10.6/aws-java-sdk-core-1.10.6.jar
>  
> 
> RUN wget -O /opt/flink/lib/aws-java-sdk-kms-1.10.6.jar 
> http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-kms/1.10.6/aws-java-sdk-kms-1.10.6.jar
>  
> 
> RUN wget -O /opt/flink/lib/jackson-annotations-2.5.3.jar 
> http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.5.3/jackson-annotations-2.5.3.jar
>  
> 
> RUN wget -O /opt/flink/lib/jackson-core-2.5.3.jar 
> http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.5.3/jackson-core-2.5.3.jar
>  
> 
> RUN wget -O /opt/flink/lib/jackson-databind-2.5.3.jar 
> http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.5.3/jackson-databind-2.5.3.jar
>  
> 
> RUN wget -O /opt/flink/lib/joda-time-2.8.1.jar 
> http://central.maven.org/maven2/joda-time/joda-time/2.8.1/joda-time-2.8.1.jar 
> 
> RUN wget -O /opt/flink/lib/httpcore-4.3.3.jar 
> http://central.maven.org/maven2/org/apache/httpcomponents/httpcore/4.3.3/httpcore-4.3.3.jar
>  
> 
> RUN wget -O /opt/flink/lib/httpclient-4.3.6.jar 
> http://central.maven.org/maven2/org/apache/httpcomponents/httpclient/4.3.6/httpclient-4.3.6.jar
>  
> 
>  
> But when I submit the job, it throws this error during initialization of 
> BucketingSink/S3AFileSystem:
> 
> java.beans.IntrospectionException: bad write method arg count: public final 
> void 
> org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.AbstractConfiguration.setProperty(java.lang.String,java.lang.Object)
> at 
> java.beans.PropertyDescriptor.findPropertyType(PropertyDescriptor.java:657)
> at 
> java.beans.PropertyDescriptor.setWriteMethod(PropertyDescriptor.java:327)
> at java.beans.PropertyDescriptor.<init>(PropertyDescriptor.java:139)
> at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.FluentPropertyBeanIntrospector.createFluentPropertyDescritor(FluentPropertyBeanIntrospector.java:178)
> at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.FluentPropertyBeanIntrospector.introspe

Error While Initializing S3A FileSystem

2019-05-15 Thread Manish Bellani
hey Friends,

Thanks for all the work you have been doing on Flink. I have been trying to
use BucketingSink (backed by S3AFileSystem) to write data to S3 and I'm
running into some issues (which I suspect could be dependency/packaging
related) that I'll try to describe here.

The data pipeline is quite simple:

Kafka -> KafkaConsumer (Source) -> BucketingSink (S3AFileSystem) -> S3

*Environment:*

Kubernetes
Debian
DockerImage: flink:1.7.2-hadoop28-scala_2.11
Java 1.8
Hadoop Version: 2.8.5

I followed this dependency section:
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/aws.html#flink-for-hadoop-27
to place the dependencies under /opt/flink/lib (with the exception that the
Hadoop version and its dependencies that I pull in are different).

Here are the dependencies I'm pulling in (excerpt from my Dockerfile)

RUN cp /opt/flink/opt/flink-s3-fs-hadoop-1.7.2.jar
/opt/flink/lib/flink-s3-fs-hadoop-1.7.2.jar
RUN wget -O /opt/flink/lib/hadoop-aws-2.8.5.jar
https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.8.5/hadoop-aws-2.8.5.jar
RUN wget -O /opt/flink/lib/aws-java-sdk-s3-1.10.6.jar
http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.10.6/aws-java-sdk-s3-1.10.6.jar
#Transitive Dependency of aws-java-sdk-s3
RUN wget -O /opt/flink/lib/aws-java-sdk-core-1.10.6.jar
http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-core/1.10.6/aws-java-sdk-core-1.10.6.jar
RUN wget -O /opt/flink/lib/aws-java-sdk-kms-1.10.6.jar
http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-kms/1.10.6/aws-java-sdk-kms-1.10.6.jar
RUN wget -O /opt/flink/lib/jackson-annotations-2.5.3.jar
http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.5.3/jackson-annotations-2.5.3.jar
RUN wget -O /opt/flink/lib/jackson-core-2.5.3.jar
http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.5.3/jackson-core-2.5.3.jar
RUN wget -O /opt/flink/lib/jackson-databind-2.5.3.jar
http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.5.3/jackson-databind-2.5.3.jar
RUN wget -O /opt/flink/lib/joda-time-2.8.1.jar
http://central.maven.org/maven2/joda-time/joda-time/2.8.1/joda-time-2.8.1.jar
RUN wget -O /opt/flink/lib/httpcore-4.3.3.jar
http://central.maven.org/maven2/org/apache/httpcomponents/httpcore/4.3.3/httpcore-4.3.3.jar
RUN wget -O /opt/flink/lib/httpclient-4.3.6.jar
http://central.maven.org/maven2/org/apache/httpcomponents/httpclient/4.3.6/httpclient-4.3.6.jar



But when I submit the job, it throws this error during initialization of
BucketingSink/S3AFileSystem:

java.beans.IntrospectionException: bad write method arg count: public
final void 
org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.AbstractConfiguration.setProperty(java.lang.String,java.lang.Object)
at 
java.beans.PropertyDescriptor.findPropertyType(PropertyDescriptor.java:657)
at java.beans.PropertyDescriptor.setWriteMethod(PropertyDescriptor.java:327)
at java.beans.PropertyDescriptor.<init>(PropertyDescriptor.java:139)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.FluentPropertyBeanIntrospector.createFluentPropertyDescritor(FluentPropertyBeanIntrospector.java:178)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.FluentPropertyBeanIntrospector.introspect(FluentPropertyBeanIntrospector.java:141)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.PropertyUtilsBean.fetchIntrospectionData(PropertyUtilsBean.java:2245)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.PropertyUtilsBean.getIntrospectionData(PropertyUtilsBean.java:2226)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.PropertyUtilsBean.getPropertyDescriptor(PropertyUtilsBean.java:954)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.PropertyUtilsBean.isWriteable(PropertyUtilsBean.java:1478)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.isPropertyWriteable(BeanHelper.java:521)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.initProperty(BeanHelper.java:357)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.initBeanProperties(BeanHelper.java:273)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.initBean(BeanHelper.java:192)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper$BeanCreationContextImpl.initBean(BeanHelper.java:669)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.DefaultBeanFactory.initBeanInstance(DefaultBeanFactory.java:162)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.DefaultBeanFactory.createBean(DefaultBeanFactory.java:116)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.createBea

Applying multiple calculation on data aggregated on window

2019-05-15 Thread Soheil Pourbafrani
Hi,

In my environment I need to collect a stream of messages into windows based
on some fields as the key, and then I need to do multiple calculations that
apply to specified messages. For example, if I had the following messages
in the window:
{ts: 1, key: a, value: 10}
{ts: 1, key: b, value: 0}
{ts: 1, key: c, value: 2}
{ts: 1, key: d, value: 5}
{ts: 1, key: e, value: 6}
{ts: 1, key: f, value: 7}
{ts: 1, key: g, value: 9}

- for the keys a, b and c I need to calculate the average of the values
(12/3=4) and generate another message like {ts: 1, key: abc, value: 4}

- for the keys d and f I need to get the sum (5 + 7 = 12) and generate {ts:
1, key: fd, value: 12}

and I don't need the messages with the keys e and g

So I did the following:

raw
  .keyBy(4, 5)
  .timeWindow(Time.seconds(5))

but I don't know how Flink can help me apply this logic to the data. I
think I need to use some method other than reduce or aggregate.

Any help will be appreciated.

thanks
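
One way to express this (a sketch, assuming the messages arrive as Tuple3<ts, key, value>; the group names and the average-vs-sum split mirror the example above): map each record to its calculation group, drop the keys that belong to no group, then key by the group and run a single AggregateFunction per window.

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class GroupedWindowCalculations {

    // raw: (ts, key, value) -> (group, aggregated value) per 5-second window
    public static DataStream<Tuple2<String, Double>> apply(DataStream<Tuple3<Long, String, Double>> raw) {
        return raw
            .flatMap(new FlatMapFunction<Tuple3<Long, String, Double>, Tuple2<String, Double>>() {
                @Override
                public void flatMap(Tuple3<Long, String, Double> r, Collector<Tuple2<String, Double>> out) {
                    String group = groupOf(r.f1);
                    if (group != null) {                  // e and g are dropped here
                        out.collect(Tuple2.of(group, r.f2));
                    }
                }
            })
            .keyBy(0)                                     // key by the calculation group
            .timeWindow(Time.seconds(5))
            .aggregate(new GroupAggregate());
    }

    private static String groupOf(String key) {
        if ("a".equals(key) || "b".equals(key) || "c".equals(key)) return "abc"; // -> average
        if ("d".equals(key) || "f".equals(key)) return "fd";                     // -> sum
        return null;
    }

    // accumulator = (group, sum, count); "abc" emits the average, "fd" the plain sum
    public static class GroupAggregate implements
            AggregateFunction<Tuple2<String, Double>, Tuple3<String, Double, Long>, Tuple2<String, Double>> {

        @Override
        public Tuple3<String, Double, Long> createAccumulator() {
            return Tuple3.of("", 0.0, 0L);
        }

        @Override
        public Tuple3<String, Double, Long> add(Tuple2<String, Double> value, Tuple3<String, Double, Long> acc) {
            return Tuple3.of(value.f0, acc.f1 + value.f1, acc.f2 + 1);
        }

        @Override
        public Tuple2<String, Double> getResult(Tuple3<String, Double, Long> acc) {
            return Tuple2.of(acc.f0, "abc".equals(acc.f0) ? acc.f1 / acc.f2 : acc.f1);
        }

        @Override
        public Tuple3<String, Double, Long> merge(Tuple3<String, Double, Long> a, Tuple3<String, Double, Long> b) {
            return Tuple3.of(a.f0.isEmpty() ? b.f0 : a.f0, a.f1 + b.f1, a.f2 + b.f2);
        }
    }
}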


Preventing Late events from getting triggered for custom trigger

2019-05-15 Thread shkob1
I'm running SQL along with a custom trigger - so think about this scenario:

Select id, count(*) as count_events, atLeastOneIsTrue(booleanField) as
shouldTrigger
FROM my_table 
GROUP BY id

transforming it to a retracted stream and then filtering by the
shouldTrigger field, that works as expected.

What happens is that if I get any event for the same id past the
shouldTrigger=true condition, obviously it will trigger again (as
atLeastOneIsTrue is still TRUE). I'm trying to prevent that, so, similar to
late events on window functions in [1], I want to ensure only a single
trigger happens for any given id.

I had a thought about whether this is related to withIdleStateRetentionTime,
but it doesn't seem right to me.

Any idea how I can achieve it?

Thanks!
Shahar



[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/windows.html#allowed-lateness
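
One way to get this behaviour outside of the SQL query (a sketch; the id key selector and the row type of the retracted/filtered stream are whatever the pipeline already uses): key the filtered stream by id and suppress everything after the first emission with a small piece of keyed state.

import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;

public class FireOncePerKey<T> extends RichFilterFunction<T> {

    private transient ValueState<Boolean> alreadyFired;

    @Override
    public void open(Configuration parameters) {
        alreadyFired = getRuntimeContext().getState(
                new ValueStateDescriptor<>("alreadyFired", Types.BOOLEAN));
    }

    @Override
    public boolean filter(T value) throws Exception {
        if (alreadyFired.value() != null) {
            return false;              // this id has already produced its single trigger
        }
        alreadyFired.update(true);     // first shouldTrigger=true event for this id
        return true;
    }
}

Applied as filteredStream.keyBy(row -> extractId(row)).filter(new FireOncePerKey<>()) (extractId is a placeholder). If the id space is unbounded, the "alreadyFired" state should additionally get a TTL, or rely on idle state retention, so it does not grow forever.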




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Migrating Existing TTL State to 1.8

2019-05-15 Thread Ning Shi
Hi Andrey,

Thank you for the reply.

We are using incremental checkpointing.

Good to know that the incremental cleanup only applies to the heap state
backend. Looks like taking some downtime to take a full savepoint and restore
everything is inevitable.

Thanks,

--
Ning

On Wed, 15 May 2019 10:53:25 -0400,
Andrey Zagrebin wrote:
> 
> Hi Ning,
> 
> If you have not activated non-incremental checkpointing then taking a
> savepoint is the only way to trigger the full snapshot. In any case, it
> will take time.
> 
> The incremental cleanup strategy is applicable only for heap state backend
> and does nothing for RocksDB backend. At the moment, you can combine only
> compaction filter with full snapshotting cleanup with RocksDB backend.
> 
> Best,
> Andrey


Re: Getting java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError when stopping/canceling job.

2019-05-15 Thread John Smith
So these are the two exceptions I see in the logs...

Exception in thread "vert.x-worker-thread-0" Exception in thread
"vert.x-internal-blocking-0" java.lang.NoClassDefFoundError:
io/netty/util/concurrent/FastThreadLocal
at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:32)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException:
io.netty.util.concurrent.FastThreadLocal
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 2 more
java.lang.NoClassDefFoundError: io/netty/util/concurrent/FastThreadLocal
at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:32)
at java.lang.Thread.run(Thread.java:748)
May 15, 2019 10:42:07 AM io.vertx.core.impl.ContextImpl
SEVERE: Unhandled exception
java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError:
io/vertx/core/impl/VertxImpl$SharedWorkerPool
at
io.vertx.core.impl.VertxImpl.lambda$deleteCacheDirAndShutdown$25(VertxImpl.java:830)
at io.vertx.core.impl.ContextImpl.lambda$null$0(ContextImpl.java:284)
at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:320)
at
io.vertx.core.impl.EventLoopContext.lambda$executeAsync$0(EventLoopContext.java:38)
at
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:462)
at
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897)
at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoClassDefFoundError:
io/vertx/core/impl/VertxImpl$SharedWorkerPool
... 10 more
Caused by: java.lang.ClassNotFoundException:
io.vertx.core.impl.VertxImpl$SharedWorkerPool
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 10 more

On Wed, 15 May 2019 at 12:00, Andrey Zagrebin  wrote:

> Hi John,
>
> could you share the full stack trace or better logs?
> It looks like something is trying to be executed in vertx.io code after
> the local task has been stopped and the class loader for the user code has
> been unloaded. Maybe from some daemon thread pool.
>
> Best,
> Andrey
>
>
> On Wed, May 15, 2019 at 4:58 PM John Smith  wrote:
>
>> Hi,
>>
>> I'm using vertx.io as an async JDBC client for a RichAsyncFunction it
>> works fine but when I stop the job I get...
>>
>> java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError:
>> io/vertx/core/impl/VertxImpl$SharedWorkerPool
>>
>> Is there a way to avoid/fix this?
>>
>


Flink with HBase

2019-05-15 Thread Nikhil Goyal
Hi,

Does Flink support exactly-once processing using HBase as a sink? I am not
able to find any documentation on this.

Thanks
Nikhil


User Interface not showing the actual count received and produced

2019-05-15 Thread PoolakkalMukkath, Shakir
Hi Flink team,

I am developing a flow which uses
- FlinkKafkaConsumer010 to consume messages from Kafka, and
- FlinkKinesisProducer to produce the results to Kinesis.

In the user interface, I always see that Bytes and Records Received from Kafka are
zero even though it is receiving and processing events. The same goes for the Kinesis
sink: Bytes and Records Sent are always zero even though it is posting events to
Kinesis.

Any reason why my UI is not showing the actual count ? Thanks


Thanks,
Shakir



Re: NPE in Flink 1.8.0

2019-05-15 Thread Farouk
Hi

We had the same issue.

Make sure everything is using Flink 1.8 and not half on 1.7.2 and the other
half on 1.8.

Make sure to prune Docker images and so on, even the local Maven repo.

Farouk


Le mer. 15 mai 2019 à 18:08, Akshay Shinde  a
écrit :

> Hi
>
>
> We would highly appreciate any information on following stacktrace.
> Our flink job is simply writing data to Cassandra using cassandra sink
> and we are getting following exception -
>
> Caused by: java.lang.NullPointerException
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriters(StreamTask.java:1175)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:212)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:190)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.<init>(SourceStreamTask.java:51)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
> at
> org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1405)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:689)
> at java.lang.Thread.run(Thread.java:745)
>
>
>
> My jobs were working fine with version 1.7.2, now we upgraded our version
> to 1.8.0 then we are facing this problem.
>
> Waiting for your response.
>
>
> --
>
> Regards,
>
> Akshay Shinde
>
> SMTS, Oracle
>


NPE in Flink 1.8.0

2019-05-15 Thread Akshay Shinde
Hi


We would highly appreciate any information on the following stack trace. Our Flink
job is simply writing data to Cassandra using the Cassandra sink and we are getting
the following exception -

Caused by: java.lang.NullPointerException
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriters(StreamTask.java:1175)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:212)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:190)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.<init>(SourceStreamTask.java:51)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at 
org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1405)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:689)
at java.lang.Thread.run(Thread.java:745)



My jobs were working fine with version 1.7.2; after we upgraded to 1.8.0 we
started facing this problem.

Waiting for your response.


--

Regards,

Akshay Shinde

SMTS, Oracle

Re: Getting java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError when stopping/canceling job.

2019-05-15 Thread Andrey Zagrebin
Hi John,

could you share the full stack trace or better logs?
It looks like something is trying to be executed in vertx.io code after the
local task has been stopped and the class loader for the user code has been
unloaded. Maybe from some daemon thread pool.

Best,
Andrey
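
Along those lines, one pattern that usually avoids classloader trouble on cancel/stop is to keep the whole Vert.x lifecycle inside the rich function: create the Vertx instance and the JDBC client in open() and close both in close(), so their worker/event-loop threads are shut down while the user-code class loader is still around. A sketch (assuming the vertx-jdbc-client; the JDBC config, SQL, and types are placeholders, and this is not necessarily the exact fix for John's job):

import io.vertx.core.Vertx;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.jdbc.JDBCClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;

public class AsyncDbLookup extends RichAsyncFunction<String, String> {

    private transient Vertx vertx;
    private transient JDBCClient client;

    @Override
    public void open(Configuration parameters) {
        vertx = Vertx.vertx();
        client = JDBCClient.createShared(vertx, new JsonObject()
                .put("url", "jdbc:postgresql://db:5432/mydb")      // placeholder
                .put("driver_class", "org.postgresql.Driver"));    // placeholder
    }

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        client.queryWithParams("SELECT name FROM users WHERE id = ?", new JsonArray().add(key), ar -> {
            if (ar.succeeded() && !ar.result().getResults().isEmpty()) {
                resultFuture.complete(Collections.singleton(ar.result().getResults().get(0).getString(0)));
            } else {
                resultFuture.complete(Collections.emptyList());
            }
        });
    }

    @Override
    public void close() {
        // shut Vert.x down before the task's user-code class loader goes away
        if (client != null) client.close();
        if (vertx != null) vertx.close();
    }
}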


On Wed, May 15, 2019 at 4:58 PM John Smith  wrote:

> Hi,
>
> I'm using vertx.io as an async JDBC client for a RichAsyncFunction it
> works fine but when I stop the job I get...
>
> java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError:
> io/vertx/core/impl/VertxImpl$SharedWorkerPool
>
> Is there a way to avoid/fix this?
>


Re: flink 1.4.2. java.lang.IllegalStateException: Could not initialize operator state backend

2019-05-15 Thread Andrey Zagrebin
Hi,

I am not sure that FLINK-8836 is related to the failure in the stack trace.

You say you are using Flink in production; does that mean it always worked
and has started to fail recently?

From the stack trace, it looks like the arity of some Tuple type changed in
some operator state. The number of tuple fields could have increased after a
job restart. In that case Flink expects tuples with more fields stored in
the checkpoint and fails. Such a change would require an explicit state
migration. Could that be the case? When did the failure start to happen, and
why was the operator state restored - a job restart?

Best,
Andrey


Re: Table program cannot be compiled

2019-05-15 Thread Andrey Zagrebin
Hi, I am looping in Timo and Dawid to look at the problem.

On Tue, May 14, 2019 at 9:12 PM shkob1  wrote:

> BTW looking at past posts on this issue[1] it should have been fixed? i'm
> using version 1.7.2
> Also the recommendation was to use a custom function, though that's exactly
> what im doing with the conditionalArray function[2]
>
> Thanks!
>
> [1]
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DataStreamCalcRule-1802-quot-grows-beyond-64-KB-when-execute-long-sql-td20832.html#a20841
>
> [2]
> public class ConditionalArrayFunction extends ScalarFunction {
>
> public static final String NAME = "conditionalArray";
>
> public String[] eval(Object... keyValues) {
> if (keyValues.length == 0) {
> return new String[]{};
> }
> final List keyValuesList = Arrays.asList(keyValues);
> List trueItems = Lists.newArrayList();
> for (int i = 0; i < keyValuesList.size(); i = i + 2){
> final String key = (String)keyValuesList.get(i);
> final Object value = keyValuesList.get(i + 1);
>
> if (value != null && (boolean)value)
> trueItems.add(key);
> }
> return trueItems.toArray(new String[0]);
> }
> }
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Getting java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError when stopping/canceling job.

2019-05-15 Thread John Smith
Hi,

I'm using vertx.io as an async JDBC client for a RichAsyncFunction it works
fine but when I stop the job I get...

java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError:
io/vertx/core/impl/VertxImpl$SharedWorkerPool

Is there a way to avoid/fix this?


Re: Migrating Existing TTL State to 1.8

2019-05-15 Thread Andrey Zagrebin
Hi Ning,

If you have not activated non-incremental checkpointing then taking a
savepoint is the only way to trigger the full snapshot. In any case, it
will take time.

The incremental cleanup strategy is applicable only for heap state backend
and does nothing for RocksDB backend. At the moment, you can combine only
compaction filter with full snapshotting cleanup with RocksDB backend.

Best,
Andrey
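
For reference, a sketch of combining those cleanup strategies with the RocksDB backend on Flink 1.8 (the TTL duration and state name are placeholders, and exact builder method names can differ slightly between versions; in 1.8 the compaction filter also has to be enabled via state.backend.rocksdb.ttl.compaction.filter.enabled in flink-conf.yaml):

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

public class TtlStateExample {

    public static ValueStateDescriptor<String> descriptorWithTtl() {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.days(7))                 // placeholder TTL
                .cleanupFullSnapshot()                    // expired entries dropped when a full snapshot/savepoint is taken
                .cleanupInRocksdbCompactFilter()          // continuous cleanup during RocksDB compaction (1.8+)
                .build();

        ValueStateDescriptor<String> descriptor =
                new ValueStateDescriptor<>("my-ttl-state", String.class);
        descriptor.enableTimeToLive(ttlConfig);
        return descriptor;
    }
}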

On Fri, Mar 15, 2019 at 11:56 PM Ning Shi  wrote:

> Hi Stefan,
>
> Thank you for the confirmation.
>
> Doing a one time cleanup with full snapshot and upgrading to Flink 1.8
> could work. However, in our case, the state is quite large (TBs).
> Taking a savepoint takes over an hour, during which we have to pause
> the job or it may process more events.
>
> The JavaDoc of `cleanupFullSnapshot` [1] says "Cleanup expired state
> in full snapshot on checkpoint.". My understanding is that the only
> way to take a full snapshot with RocksDB backend is to take a
> savepoint. Is there another way to take a full checkpoint?
>
> I noticed that Flink 1.8 also added an incremental cleanup strategy
> [2] by iterating through several keys at a time for each state access.
> If I combine this with the new compaction filter cleanup strategy,
> will it eventually remove all expired state without taking a full
> snapshot for upgrade?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/state/StateTtlConfig.Builder.html#cleanupFullSnapshot--
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/state/StateTtlConfig.Builder.html#cleanupIncrementally-int-boolean-
>
> Thanks,
>
> Ning
>
>
> On Wed, Mar 13, 2019 at 11:22 AM Stefan Richter 
> wrote:
> >
> > Hi,
> >
> > If you are worried about old state, you can combine the compaction
> filter based TTL with other cleanup strategies (see docs). For example,
> setting `cleanupFullSnapshot` when you take a savepoint it will be cleared
> of any expired state and you can then use it to bring it into Flink 1.8.
> >
> > Best,
> > Stefan
>


FlinkSQL fails when rowtime meets dirty data

2019-05-15 Thread maidangdang
I use FlinkSQL to process Kafka data in the following format:
|  id |  server_time |
|  1  | 2019-05-15 10:00:00 |
|  2  | 2019-05-15 10:00:00 |
...


and I define rowtime from the  server_time field:
new Schema()
  .field("rowtime", Types.SQL_TIMESTAMP)
  .rowtime(new Rowtime().timestampsFromField("server_time"))
  .field("id", Types.STRING)
  .field("server_time", Types.STRING)


when dirty data arrives, such as :
|  id   |  server_time |
|  99  | 11.22.33.44  |


My FlinkSQL job fails with exception:
java.lang.NumberFormatException: For input string: "11.22.33.44"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:580)
at java.lang.Integer.parseInt(Integer.java:615)
at 
org.apache.calcite.avatica.util.DateTimeUtils.dateStringToUnixDate(DateTimeUtils.java:625)
at 
org.apache.calcite.avatica.util.DateTimeUtils.timestampStringToUnixDate(DateTimeUtils.java:715)
at DataStreamSourceConversion$288.processElement(Unknown Source)
at 
org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)
at 
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:187)
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:152)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:665)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)


Because my Flink job uses EXACTLY_ONCE, the job is re-executed from the last
checkpoint, consumes the dirty data again, fails again, and keeps looping like
this. I would like to ask if there are any good ways to handle this situation?


The Flink version I used was flink-1.7.2
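
One possible way to keep the job from looping on a poison record (a sketch): validate the timestamp on the DataStream before it is converted into a table with a rowtime attribute, and drop (or side-output) records that do not parse; the rowtime is then derived from the already-parsed long field. The Tuple2<id, server_time> layout mirrors the example above.

import java.sql.Timestamp;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class DropDirtyTimestamps implements FlatMapFunction<Tuple2<String, String>, Tuple2<String, Long>> {

    @Override
    public void flatMap(Tuple2<String, String> record, Collector<Tuple2<String, Long>> out) {
        try {
            // record.f1 = server_time as "yyyy-MM-dd HH:mm:ss"
            long eventTime = Timestamp.valueOf(record.f1).getTime();
            out.collect(Tuple2.of(record.f0, eventTime));
        } catch (IllegalArgumentException e) {
            // dirty record such as "11.22.33.44": drop it here
            // (or route it to a side output / dead-letter topic for inspection)
        }
    }
}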

Re: Flink and Prometheus setup in K8s

2019-05-15 Thread Wouter Zorgdrager
Hi all,

To answer my own questions, I worked on the following solution:

1) A custom Docker image which pulls the Flink image and moves the Prometheus
jar to the correct folder [1, 2].
2) I wrote Kubernetes manifests with Kubernetes service discovery configuration
[3]. Besides the 'official' Flink Kubernetes manifests, I added a TM service
which exposes all TM metrics locations so that Prometheus can scrape them.
This means that (re)scaled Flink TMs are automatically picked up by Prometheus.
The repository also includes a Grafana setup with a simple dashboard.

I thought this might be useful for other users!

Cheers,
Wouter

[1]: https://github.com/wzorgdrager/flink-k8s/blob/master/docker/Dockerfile
[2]: https://hub.docker.com/r/wzorgdrager/flink-prometheus
[3]: https://github.com/wzorgdrager/flink-k8s

Op ma 13 mei 2019 om 14:16 schreef Wouter Zorgdrager <
w.d.zorgdra...@tudelft.nl>:

> Hey all,
>
> I'm working on a deployment setup with Flink and Prometheus on Kubernetes.
> I'm running into the following issues:
>
> 1) Is it possible to use the default Flink Docker image [1] and enable the
> Prometheus reporter? Modifying the flink-config.yaml is easy, but somehow
> the Prometheus reporter jar needs to be moved within the image. This is
> easy if use my own Dockerfile (as done here [2]) , but I prefer using the
> official one.
> 2) I can define the jobmanager/taskmanager metric endpoints statically,
> but w.r.t. scaling I prefer to have these resolved/discovered dynamically.
> Did anyone get a working setup on this? I came across this resource for
> YARN [3], is there something similar for Kubernetes? Or are there any other
> ways of configuring Prometheus to pick this up automatically?
>
> Thanks a lot for your help!
>
> Kind regards,
> Wouter
>
> [1]: https://hub.docker.com/_/flink/
> [2]:
> https://github.com/mbode/flink-prometheus-example/blob/master/Dockerfile
> [3]: https://github.com/eastcirclek/flink-service-discovery
>


Checkpoints periodically fail with hdfs as the state backend - Could not flush and close the file system output stream

2019-05-15 Thread PedroMrChaves
Hello,

Every once in a while our checkpoints fail with the following exception:

AsynchronousException{java.lang.Exception: Could not materialize checkpoint
65912 for operator AGGREGATION-FILTER (2/2).}
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 65912 for
operator AGGREGATION-FILTER (2/2).
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException:
Could not flush and close the file system output stream to
hdfs:/flink/data/checkpoints/76f7b4f5c679e8f2d822c9c3c73faf5d/chk-65912/68776faf-b687-403b-ba0c-17419f8684dc
in order to obtain the stream state handle
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at 
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
at
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
... 5 more
Caused by: java.io.IOException: Could not flush and close the file system
output stream to
hdfs:/flink/data/checkpoints/76f7b4f5c679e8f2d822c9c3c73faf5d/chk-65912/68776faf-b687-403b-ba0c-17419f8684dc
in order to obtain the stream state handle
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:328)
at
org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:454)
at
org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:359)
at
org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
... 7 more
Caused by:
org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File
does not exist:
/flink/data/checkpoints/76f7b4f5c679e8f2d822c9c3c73faf5d/chk-65912/68776faf-b687-403b-ba0c-17419f8684dc
(inode 181723246) Holder DFSClient_NONMAPREDUCE_-10072319_1 does not have
any open files.
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2668)
at
org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.completeFileInternal(FSDirWriteFileOp.java:621)
at
org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.completeFile(FSDirWriteFileOp.java:601)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:2713)
at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:882)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:555)
at
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:447)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:847)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:790)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2486)

at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1489)
at org.apache.hadoop.ipc.Client.call(Client.java:1435)
at org.apache.hadoop.ipc.Client.call(Client.java:1345)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
at

Passing a custom SourceContext to a SourceFunction

2019-05-15 Thread Debasish Ghosh
Hi -

I have a custom SourceFunction ..

class MySourceFunction[T](data: Seq[T]) extends SourceFunction[T] {
  def run(ctx: SourceContext[T]): Unit = {
data.foreach(d ⇒ ctx.collect(d))
  }
}

When this function is run during job execution, the SourceContext that gets
passed serializes the data. I would like to pass a mock SourceContext
(similar to
https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java)
in the run method. How do I do this? Note that I am not invoking the run method
explicitly anywhere.

Any help will be appreciated.
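
One way to do this without going through a StreamExecutionEnvironment (a sketch, in the spirit of Flink's internal ListSourceContext): write a small collecting SourceContext in the test code, instantiate the SourceFunction directly, and call run() with that stub yourself.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

public class CollectingSourceContext<T> implements SourceFunction.SourceContext<T> {

    private final List<T> collected = new ArrayList<>();
    private final Object lock = new Object();

    public List<T> getCollected() {
        return collected;
    }

    @Override public void collect(T element) { collected.add(element); }
    @Override public void collectWithTimestamp(T element, long timestamp) { collected.add(element); }
    @Override public void emitWatermark(Watermark mark) { /* ignored in this test stub */ }
    @Override public void markAsTemporarilyIdle() { /* ignored */ }
    @Override public Object getCheckpointLock() { return lock; }
    @Override public void close() { }
}

In a test (Scala or Java) the function is then driven directly, e.g. new MySourceFunction(data).run(ctx), and the emitted elements are asserted via ctx.getCollected().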

regards.

-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg