Re: Queryable state when key is UUID - getting Kyro Exception

2018-10-25 Thread Jayant Ameta
MapStateDescriptor<UUID, String> descriptor = new MapStateDescriptor<>("rulePatterns", UUID.class, String.class);

Jayant Ameta


On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy  wrote:

> Hi,
>
>Can you show us the descriptor in the codes below?
>
> client.getKvState(JobID.fromHexString(
> "c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
>
> UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
>> TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
>>
>>
> Jiayi Liao, Best
>
>
>  Original Message
> *Sender:* Jayant Ameta
> *Recipient:* bupt_ljy
> *Cc:* Tzu-Li (Gordon) Tai; user
> *Date:* Friday, Oct 26, 2018 02:26
> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>
> Also, I haven't provided any custom serializer in my flink job. Shouldn't
> the same configuration work for queryable state client?
>
> Jayant Ameta
>
>
> On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta  wrote:
>
>> Hi Gordon,
>> Following is the stack trace that I'm getting:
>>
>> *Exception in thread "main" java.util.concurrent.ExecutionException:
>> java.lang.RuntimeException: Failed request 0.*
>> * Caused by: java.lang.RuntimeException: Failed request 0.*
>> * Caused by: java.lang.RuntimeException: Error while processing request
>> with ID 0. Caused by: com.esotericsoftware.kryo.KryoException: Encountered
>> unregistered class ID: -985346241*
>> *Serialization trace:*
>> *$outer (scala.collection.convert.Wrappers$SeqWrapper)*
>> * at
>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)*
>> * at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)*
>> * at
>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)*
>> * at
>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)*
>> * at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)*
>> * at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)*
>> * at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)*
>> * at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)*
>> * at
>> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)*
>> * at
>> org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)*
>> * at
>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)*
>> * at
>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)*
>> * at
>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)*
>> * at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)*
>> * at java.util.concurrent.FutureTask.run(FutureTask.java:266)*
>> * at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)*
>> * at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)*
>> * at java.lang.Thread.run(Thread.java:748)*
>>
>> I am not using any custom serializer as mentioned by Jiayi.
>>
>> Jayant Ameta
>>
>>
>> On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy  wrote:
>>
>>> Hi  Jayant,
>>>
>>>   There should be a Serializer parameter in the constructor of the
>>> StateDescriptor, you should create a new serializer like this:
>>>
>>>
>>>new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)
>>>
>>>
>>>  By the way, can you show us your kryo exception like what Gordon said?
>>>
>>>
>>> Jiayi Liao, Best
>>>
>>>
>>>
>>>  Original Message
>>> *Sender:* Tzu-Li (Gordon) Tai
>>> *Recipient:* Jayant Ameta; bupt_ljy<
>>> bupt_...@163.com>
>>> *Cc:* user
>>> *Date:* Thursday, Oct 25, 2018 17:18
>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>>>
>>> Hi Jayant,
>>>
>>> What is the Kryo exception message that you are getting?
>>>
>>> Cheers,
>>> Gordon
>>>
>>>
>>> On 25 October 2018 at 5:17:13 PM, Jayant Ameta (wittyam...@gmail.com)
>>> wrote:
>>>
>>> Hi,
>>> I've not configured any serializer in the descriptor. (Neither in flink
>>> job, nor in state query client).
>>> Which serializer should I use?
>>>
>>> Jayant Ameta
>>>
>>>
>>> On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy  wrote:
>>>
 Hi,

It seems that your codes are right. Are you sure that you’re using
 the same Serializer as the Flink program do? Could you show the serializer
 in descriptor?



 Jiayi Liao, Best

  Original Message
 *Sender:* Jayant Ameta
 *Recipient:* user
 *Date:* Thursday, Oct 25, 2018 14:17
 *Subject:* Queryable state when key is UUID - getting Kyro Exception

 I get Kyro exception when querying the state.

 Key: UUID
 MapState<UUID, String>

 Client code snippet:

 CompletableFuture<MapState<UUID, String>> 

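For reference, here is a minimal, self-contained sketch that pulls the pieces of this thread together: the same MapStateDescriptor (with key and value TypeInformation spelled out) is used when building the query, so the client derives the same Kryo serializer for the UUID key as the job does. The proxy host/port (9069 is the default proxy port) and the final lookup/print are assumptions for illustration, not taken from the thread.

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.queryablestate.client.QueryableStateClient;

public class QueryableStateUuidSketch {

    // The descriptor from the thread, with explicit TypeInformation so the
    // job side and the client side build identical serializers for UUID keys.
    static MapStateDescriptor<UUID, String> rulePatterns() {
        return new MapStateDescriptor<>(
                "rulePatterns",
                TypeInformation.of(UUID.class),
                TypeInformation.of(String.class));
    }

    public static void main(String[] args) throws Exception {
        // Proxy host and port are assumptions for this sketch.
        QueryableStateClient client = new QueryableStateClient("localhost", 9069);

        UUID key = UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d");
        CompletableFuture<MapState<UUID, String>> future = client.getKvState(
                JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"),
                "rule",
                key,
                TypeInformation.of(new TypeHint<UUID>() {}),
                rulePatterns());

        MapState<UUID, String> mapState = future.get(10, TimeUnit.SECONDS);
        System.out.println(mapState.get(key));
        client.shutdownAndWait();
    }
}
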
Re: Queryable state when key is UUID - getting Kyro Exception

2018-10-25 Thread bupt_ljy
Hi,
 Can you show us the descriptor in the codes below?
  client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), 
"rule",
UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
 TypeInformation.of(new TypeHint<UUID>() {}), descriptor);


Jiayi Liao, Best




Original Message
Sender: Jayant Ameta <wittyam...@gmail.com>
Recipient: bupt_ljy <bupt_...@163.com>
Cc: Tzu-Li (Gordon) Tai <tzuli...@apache.org>; user <u...@flink.apache.org>
Date: Friday, Oct 26, 2018 02:26
Subject: Re: Queryable state when key is UUID - getting Kyro Exception


Also, I haven't provided any custom serializer in my flink job. Shouldn't the 
same configuration work for queryable state client?


Jayant Ameta





On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta wittyam...@gmail.com wrote:

Hi Gordon,
Following is the stack trace that I'm getting:


Exception in thread "main" java.util.concurrent.ExecutionException: 
java.lang.RuntimeException: Failed request 0.
Caused by: java.lang.RuntimeException: Failed request 0.
Caused by: java.lang.RuntimeException: Error while processing request with ID 
0. Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered 
class ID: -985346241
Serialization trace:
$outer (scala.collection.convert.Wrappers$SeqWrapper)
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at 
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
at 
org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)


I am not using any custom serializer as mentioned by Jiayi.


Jayant Ameta





On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy bupt_...@163.com wrote:

Hi Jayant,
 There should be a Serializer parameter in the constructor of the 
StateDescriptor, you should create a new serializer like this:


 new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)


By the way, can you show us your kryo exception like what Gordon said?


Jiayi Liao, Best




Original Message
Sender: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Recipient: Jayant Ameta <wittyam...@gmail.com>; bupt_ljy <bupt_...@163.com>
Cc: user <u...@flink.apache.org>
Date: Thursday, Oct 25, 2018 17:18
Subject: Re: Queryable state when key is UUID - getting Kyro Exception


Hi Jayant,


What is the Kryo exception message that you are getting?


Cheers,
Gordon




On 25 October 2018 at 5:17:13 PM, Jayant Ameta (wittyam...@gmail.com) wrote:
Hi,
I've not configured any serializer in the descriptor. (Neither in flink job, 
nor in state query client).
Which serializer should I use?


Jayant Ameta





On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy bupt_...@163.com wrote:

Hi,
 It seems that your codes are right. Are you sure that you’re using the same 
Serializer as the Flink program do? Could you show the serializer in descriptor?




Jiayi Liao, Best


Original Message
Sender: Jayant Ameta <wittyam...@gmail.com>
Recipient: user <u...@flink.apache.org>
Date: Thursday, Oct 25, 2018 14:17
Subject: Queryable state when key is UUID - getting Kyro Exception


I get Kyro exception when querying the state.


Key: UUID
MapState<UUID, String>


Client code snippet:


CompletableFuture<MapState<UUID, String>> resultFuture =
 client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), 
"rule",
 UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
 TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);


Any better way to query it?




Jayant Ameta

Re: How to tune Hadoop version in flink shaded jar to Hadoop version actually used?

2018-10-25 Thread vino yang
Hi Henry,

When running Flink on YARN, the system environment info is printed out
from ClusterEntrypoint.
One of the entries is "Hadoop version: 2.4.1”; I think it comes from the
flink-shaded-hadoop2 jar. But actually the system Hadoop version is 2.7.2.

I want to know is it OK if the version is different?

*> I don't think it is OK, because you will use a lower version of the
client to access the higher version of the server.*

Is it a best practice to adjust flink Hadoop version to the Hadoop version
actually used?

*> I personally recommend that you keep the two versions consistent to
eliminate the possibility of causing various potential problems. *
*In fact, Flink provides a bundle of Hadoop 2.7.x bundles for you to
download.[1]*

[1]:
https://www.apache.org/dyn/closer.lua/flink/flink-1.6.1/flink-1.6.1-bin-hadoop27-scala_2.11.tgz

Thanks, vino.

徐涛 wrote on Fri, Oct 26, 2018 at 9:13 AM:

> Hi Experts
> When running flink on YARN, from ClusterEntrypoint the system
> environment info is print out.
> One of the info is "Hadoop version: 2.4.1”, I think it is from the
> flink-shaded-hadoop2 jar. But actually the system Hadoop version is 2.7.2.
> I want to know is it OK if the version is different? Is it a best
> practice to adjust flink Hadoop version to the Hadoop version actually used?
>
> Thanks a lot.
>
> Best
> Henry


Re: Parallelize an incoming stream into 5 streams with the same data

2018-10-25 Thread Hequn Cheng
Hi Vijay,

Option 1 is the right answer: `keyBy1` and `keyBy2` each contain all of the data in
`inputStream`.
Option 2 replicates all data to every task, and option 3 splits the data into
smaller groups without duplication.

Best, Hequn

On Fri, Oct 26, 2018 at 7:01 AM Vijay Balakrishnan 
wrote:

> Hi,
> I need to broadcast/parallelize an incoming stream(inputStream) into 5
> streams with the same data. Each stream is keyed by different keys to do
> various grouping operations on the set.
>
> Do I just use inputStream.keyBy(5 diff keys) and then just use the
> DataStream to perform windowing/grouping operations ?
>
> *DataStream inputStream= ...*
> *DataStream  keyBy1 = inputStream.keyBy((d) -> d._1);*
> *DataStream  keyBy2 = inputStream.keyBy((d) -> d._2);*
>
> *DataStream out1Stream = keyBy1.flatMap(new Key1Function());// do
> windowing/grouping operations in this function*
> *DataStream out2Stream = keyBy2.flatMap(new Key2Function());// do
> windowing/grouping operations in this function*
>
> out1Stream.print();
> out2Stream.addSink(new Out2Sink());
>
> Will this work ?
>
> Or do I use the keyBy Stream with a broadcast function like this:
>
> *BroadcastStream broadCastStream = inputStream.broadcast(..);*
> *DataStream out1Stream = keyBy1.connect(broadCastStream)*
> * .process(new KeyedBroadcastProcessFunction...)*
>
> *DataStream out2Stream = keyBy2.connect(broadCastStream)*
> * .process(new KeyedBroadcastProcessFunction...)*
>
> Or do I need to use split:
>
> *SplitStream source = inputStream.split(new MyOutputSelector());*
> *source.select("").flatMap(new Key1Function()).addSink(out1Sink);*
> source.select("").flatMap(new Key2Function()).addSink(out2Sink);
>
>
> static final class MyOutputSelector implements OutputSelector<Long> {
> List<String> outputs = new ArrayList<String>();
> public Iterable<String> select(Long value) {
> outputs.add("");
> return outputs;
> }
> }
> TIA,
>

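To make option 1 concrete, here is a minimal runnable sketch of one input stream consumed by two independent keyBy/window pipelines. The tuple type, the keys, and the 10-second window are assumptions for illustration, not details from the question.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class MultiKeyBySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, String>> input = env.fromElements(
                Tuple2.of("k1", "v1"), Tuple2.of("k2", "v2"), Tuple2.of("k1", "v3"));

        // Each keyBy reads the full input stream; no broadcast or split is needed.
        DataStream<Tuple2<String, String>> byFirstField = input
                .keyBy(0)
                .timeWindow(Time.seconds(10))
                .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + "|" + b.f1));

        DataStream<Tuple2<String, String>> bySecondField = input
                .keyBy(1)
                .timeWindow(Time.seconds(10))
                .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + "|" + b.f1));

        byFirstField.print();
        bySecondField.print();
        env.execute("one input stream, several keyed views");
    }
}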

Re: Accumulating a batch

2018-10-25 Thread Hequn Cheng
Hi Austin,

You can use a GroupBy window[1], such as a TUMBLE window. The window size can be
defined either as a time interval or as a row-count interval. You can also define
your own user-defined aggregate functions[2] to be used in the window.

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#tumble-tumbling-windows
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#aggregation-functions

On Fri, Oct 26, 2018 at 5:08 AM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hi there,
>
> Is it possible to use an AggregationFunction to accumulate n values in a
> buffer until a threshold is met, then stream down all records in the batch?
>
> Thank you!
> Austin Cawley-Edwards
>

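A minimal sketch of the row-count tumbling window mentioned above, using the Table API on a small in-memory stream. The field names, the 100-row window size, and the sum aggregate are assumptions for illustration.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RowCountWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        DataStream<Tuple2<String, Integer>> input = env.fromElements(
                Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3));

        // A processing-time attribute is appended so a row-count window can be used.
        tEnv.registerDataStream("records", input, "key, val, proctime.proctime");

        // Tumbling count window: one aggregate is emitted per 100 buffered rows per key.
        Table result = tEnv.scan("records")
                .window(Tumble.over("100.rows").on("proctime").as("w"))
                .groupBy("w, key")
                .select("key, val.sum as total");

        tEnv.toAppendStream(result, Row.class).print();
        env.execute("row-count tumble sketch");
    }
}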

How to tune Hadoop version in flink shaded jar to Hadoop version actually used?

2018-10-25 Thread 徐涛
Hi Experts
When running Flink on YARN, the system environment info is printed out 
from ClusterEntrypoint.
One of the entries is "Hadoop version: 2.4.1”; I think it comes from the 
flink-shaded-hadoop2 jar. But actually the system Hadoop version is 2.7.2.
I want to know is it OK if the version is different? Is it a best 
practice to adjust flink Hadoop version to the Hadoop version actually used?

Thanks a lot.

Best
Henry

Re: RocksDB State Backend Exception

2018-10-25 Thread Ning Shi
Hi Andrey,

Thank you for the explanation. I think you are right. It is either
kStaleFile or kNoSpace. We found the cause of the issue, even though we
still don't know how to explain it.

We set the java.io.tmpdir to an EBS-backed drive instead of the
default and the exception started happening. The issue was gone after we
changed it back to use the default.

Thanks,

On Thu, Oct 25, 2018 at 02:23:31PM +0200, Andrey Zagrebin wrote:
> Hi Ning,
>
> The problem here first of all is that RocksDB java JNI client diverged from 
> RocksDB cpp code in status.h,
> as mentioned in the Flink issue you refer to.
>
> Flink 1.6 uses RocksDB 5.7.5 java client.
> The JNI code there misses these status subcodes:
> kNoSpace = 4,
> kDeadlock = 5,
> kStaleFile = 6,
> kMemoryLimit = 7
> which could be potential problems in the job.
>
> kNoSpace is only one of them.
> Another probable cause could be kStaleFile, some file system IO problem.
> kDeadlock seems to be used only with transactions, so not relevant.
> kMemoryLimit means that write batch exceeded max size, but we do not have 
> limit for it as I understand.
>
> It would be easier to debug if RocksDB JNI client would at least log the 
> unknown subcode but I do not see any easy way to log it in the current
> version, without rebuilding RocksDB and subsequently Flink.
>
> In master branch, java Status and status.h are also unsynced. You could 
> report this issue in RocksDB repo, along with extending exception logging 
> message with the number of unknown error code. Flink community plans to 
> upgrade to the latest RocksDB version again in one of the next Flink releases.
>
> Best,
> Andrey

--
Ning

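Since the symptom went away once java.io.tmpdir pointed back at the default, one option is to pin RocksDB's working directory explicitly instead of relying on java.io.tmpdir at all. A minimal sketch follows; the checkpoint URI, the local path, and the toy pipeline are assumptions for illustration.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbLocalDirSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoints go to a durable filesystem, while RocksDB's working files
        // go to an explicitly chosen local path rather than java.io.tmpdir.
        RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");
        backend.setDbStoragePath("/mnt/local-disk/flink-rocksdb");
        env.setStateBackend(backend);

        env.fromElements(1, 2, 3)
           .keyBy(v -> v % 2)
           .reduce((a, b) -> a + b)
           .print();
        env.execute("rocksdb local dir sketch");
    }
}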

Parallelize an incoming stream into 5 streams with the same data

2018-10-25 Thread Vijay Balakrishnan
Hi,
I need to broadcast/parallelize an incoming stream(inputStream) into 5
streams with the same data. Each stream is keyed by different keys to do
various grouping operations on the set.

Do I just use inputStream.keyBy(5 diff keys) and then just use the
DataStream to perform windowing/grouping operations ?

*DataStream inputStream= ...*
*DataStream  keyBy1 = inputStream.keyBy((d) -> d._1);*
*DataStream  keyBy2 = inputStream.keyBy((d) -> d._2);*

*DataStream out1Stream = keyBy1.flatMap(new Key1Function());// do
windowing/grouping operations in this function*
*DataStream out2Stream = keyBy2.flatMap(new Key2Function());// do
windowing/grouping operations in this function*

out1Stream.print();
out2Stream.addSink(new Out2Sink());

Will this work ?

Or do I use the keyBy Stream with a broadcast function like this:

*BroadcastStream broadCastStream = inputStream.broadcast(..);*
*DataStream out1Stream = keyBy1.connect(broadCastStream)*
* .process(new KeyedBroadcastProcessFunction...)*

*DataStream out2Stream = keyBy2.connect(broadCastStream)*
* .process(new KeyedBroadcastProcessFunction...)*

Or do I need to use split:

*SplitStream source = inputStream.split(new MyOutputSelector());*
*source.select("").flatMap(new Key1Function()).addSink(out1Sink);*
source.select("").flatMap(new Key2Function()).addSink(out2Sink);


static final class MyOutputSelector implements OutputSelector<Long> {
List<String> outputs = new ArrayList<String>();
public Iterable<String> select(Long value) {
outputs.add("");
return outputs;
}
}
TIA,


Flink on Azure HDInsight cluster with WASB storage

2018-10-25 Thread Fakrudeen Ali Ahmed
Hi there,

https://stackoverflow.com/questions/52996054/resource-changed-on-src-filesystem-in-azure-flink

We are unable to start flink on Azure Hadoop cluster [on top of WASB]. This 
throws:

Application application_1539730571763_0046 failed 1 times (global limit =5; 
local limit is =1) due to AM Container for appattempt_1539730571763_0046_01 
exited with exitCode: -1000
Failing this attempt.
Diagnostics: [2018-10-23 23:31:31.563]Resource 
wasb://realtime-id-graph-2018-10-16t22-41-48-7...@fakrudeenstorage.blob.core.windows.net/user/aliahmed/.flink/application_1539730571763_0046/logback.xml
 changed on src filesystem (expected 1536955416000, was 1540337488000
java.io.IOException: Resource 
wasb://realtime-id-graph-2018-10-16t22-41-48-7...@fakrudeenstorage.blob.core.windows.net/user/aliahmed/.flink/application_1539730571763_0046/logback.xml
 changed on src filesystem (expected 1536955416000, was 1540337488000
at org.apache.hadoop.yarn.util.FSDownload.verifyAndCopy(FSDownload.java:273)

Is Flink not supported on Azure Hadoop cluster?

It looks like this file timestamp changes in WASB during file copy and YARN is 
complaining about original timestamp.

Thanks,
-Fakrudeen


Accumulating a batch

2018-10-25 Thread Austin Cawley-Edwards
Hi there,

Is it possible to use an AggregationFunction to accumulate n values in a
buffer until a threshold is met, then stream down all records in the batch?

Thank you!
Austin Cawley-Edwards


Re: Queryable state when key is UUID - getting Kyro Exception

2018-10-25 Thread Jayant Ameta
Also, I haven't provided any custom serializer in my flink job. Shouldn't
the same configuration work for queryable state client?

Jayant Ameta


On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta  wrote:

> Hi Gordon,
> Following is the stack trace that I'm getting:
>
> *Exception in thread "main" java.util.concurrent.ExecutionException:
> java.lang.RuntimeException: Failed request 0.*
> * Caused by: java.lang.RuntimeException: Failed request 0.*
> * Caused by: java.lang.RuntimeException: Error while processing request
> with ID 0. Caused by: com.esotericsoftware.kryo.KryoException: Encountered
> unregistered class ID: -985346241*
> *Serialization trace:*
> *$outer (scala.collection.convert.Wrappers$SeqWrapper)*
> * at
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)*
> * at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)*
> * at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)*
> * at
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)*
> * at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)*
> * at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)*
> * at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)*
> * at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)*
> * at
> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)*
> * at
> org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)*
> * at
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)*
> * at
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)*
> * at
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)*
> * at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)*
> * at java.util.concurrent.FutureTask.run(FutureTask.java:266)*
> * at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)*
> * at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)*
> * at java.lang.Thread.run(Thread.java:748)*
>
> I am not using any custom serializer as mentioned by Jiayi.
>
> Jayant Ameta
>
>
> On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy  wrote:
>
>> Hi  Jayant,
>>
>>   There should be a Serializer parameter in the constructor of the
>> StateDescriptor, you should create a new serializer like this:
>>
>>
>>new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)
>>
>>
>>  By the way, can you show us your kryo exception like what Gordon said?
>>
>>
>> Jiayi Liao, Best
>>
>>
>>
>>  Original Message
>> *Sender:* Tzu-Li (Gordon) Tai
>> *Recipient:* Jayant Ameta; bupt_ljy<
>> bupt_...@163.com>
>> *Cc:* user
>> *Date:* Thursday, Oct 25, 2018 17:18
>> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>>
>> Hi Jayant,
>>
>> What is the Kryo exception message that you are getting?
>>
>> Cheers,
>> Gordon
>>
>>
>> On 25 October 2018 at 5:17:13 PM, Jayant Ameta (wittyam...@gmail.com)
>> wrote:
>>
>> Hi,
>> I've not configured any serializer in the descriptor. (Neither in flink
>> job, nor in state query client).
>> Which serializer should I use?
>>
>> Jayant Ameta
>>
>>
>> On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy  wrote:
>>
>>> Hi,
>>>
>>>It seems that your codes are right. Are you sure that you’re using
>>> the same Serializer as the Flink program do? Could you show the serializer
>>> in descriptor?
>>>
>>>
>>>
>>> Jiayi Liao, Best
>>>
>>>  Original Message
>>> *Sender:* Jayant Ameta
>>> *Recipient:* user
>>> *Date:* Thursday, Oct 25, 2018 14:17
>>> *Subject:* Queryable state when key is UUID - getting Kyro Exception
>>>
>>> I get Kyro exception when querying the state.
>>>
>>> Key: UUID
>>> MapState<UUID, String>
>>>
>>> Client code snippet:
>>>
>>> CompletableFuture<MapState<UUID, String>> resultFuture =
>>> 
>>> client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), 
>>> "rule",
>>> UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
>>> TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
>>> MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);
>>>
>>>
>>> Any better way to query it?
>>>
>>>
>>> Jayant Ameta
>>>
>>


Re: Checkpoint acknowledge takes too long

2018-10-25 Thread Hequn Cheng
Hi Henry,

Thanks for letting us know.

On Thu, Oct 25, 2018 at 7:34 PM 徐涛  wrote:

> Hi Hequn & Kien,
> Finally the problem is solved.
> It is due to slow sink writes. Because the job only has 2 tasks, I checked
> the backpressure and found that the source has high backpressure, so I tried
> to improve the sink write speed. After that the end-to-end duration is below 1s
> and the checkpoint timeout is fixed.
>
> Best
> Henry
>
>
> On 24 Oct 2018, at 10:43 PM, 徐涛 wrote:
>
> Hequn & Kien,
> Thanks a lot for your help, I will try it later.
>
> Best
> Henry
>
>
> On 24 Oct 2018, at 8:18 PM, Hequn Cheng wrote:
>
> Hi Henry,
>
> @Kien is right. Take a thread dump to see what was doing in the
> TaskManager. Also check whether gc happens frequently.
>
> Best, Hequn
>
>
> On Wed, Oct 24, 2018 at 5:03 PM 徐涛  wrote:
>
>> Hi
>> I am running a flink application with parallelism 64, I left the
>> checkpoint timeout at the default value, which is 10 minutes, the state size is
>> less than 1MB, and I am using the FsStateBackend.
>> The application triggers some checkpoints but all of them fail
>> due to "Checkpoint expired before completing”. I checked the checkpoint
>> history and found that 63 subtasks acknowledge, but one is left n/a, and
>> also the alignment duration is quite long, about 5m27s.
>> I want to know why one subtask does not acknowledge. And
>> because the alignment duration is long, what will influence the alignment
>> duration?
>> Thank a lot.
>>
>> Best
>> Henry
>
>
>
>
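The real fix in this thread was the slow sink, but while such a bottleneck is being investigated the checkpoint timeout itself can also be raised so that checkpoints stop expiring in the meantime. A minimal sketch of the relevant knobs follows; the interval, the 30-minute timeout, and the toy pipeline are assumptions, not settings taken from the thread.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every minute, but allow up to 30 minutes (instead of the
        // default 10) before a checkpoint is declared expired, and keep some
        // breathing room between consecutive checkpoints.
        env.enableCheckpointing(60_000L);
        env.getCheckpointConfig().setCheckpointTimeout(30 * 60 * 1000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000L);

        env.fromElements(1, 2, 3).map(i -> i * 2).print();
        env.execute("checkpoint config sketch");
    }
}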


Flink yarn -kill

2018-10-25 Thread Mikhail Pryakhin
Hi Flink community,
Could you please help me clarify the following question:
When a streaming job running in YARN gets manually killed via the yarn -kill 
command, is there any way to take a savepoint or run other clean-up actions before 
the job manager is killed? 

Kind Regards,
Mike Pryakhin





Flink-1.6.1 :: HighAvailability :: ZooKeeperRunningJobsRegistry

2018-10-25 Thread Mikhail Pryakhin
Hi Flink experts!

When a streaming job with Zookeeper-HA enabled gets cancelled, the 
job-related Zookeeper nodes are not removed. Is there a reason behind that? 
I noticed that Zookeeper paths are created of type "Container Node" (an 
Ephemeral node that can have nested nodes) and fall back to Persistent node 
type in case Zookeeper doesn't support this sort of nodes. 
But anyway, it is worth removing the job Zookeeper node when a job is 
cancelled, isn't it?

Thank you in advance!

Kind Regards,
Mike Pryakhin





Re: Question over Incremental Snapshot vs Full Snapshot in rocksDb state backend

2018-10-25 Thread Andrey Zagrebin
Hi Chandan,

> 1. Why did we took 2 different approaches using different RocksDB apis ?
> We could have used Checkpoint api of RocksDB for fullSnapshot as well .

The reason here is partially historical. Full snapshot in RocksDB backend was 
implemented before incremental and rescaling for incremental snapshot but after 
heap backend. Full snapshot in RocksDB uses approach close to heap backend 
because Flink community plans to support the unified format for savepoints. The 
unified format would make it possible to switch backends and restore from 
savepoint. The formats still differ due to backend specifics to optimise 
snapshotting and restore but it is technically possible to unify them in future.

> 2. Is there any specific reason to use Snapshot API of rocksDB  over 
> Checkpoint api of RocksDB for fullSnapshot?

I think Checkpoint API produces separate SST file list to copy them to HDFS in 
case of incremental snapshot.

Full snapshot does not need the file list, it just needs an iterator over 
snapshotted (frozen) data. Internally RocksDB just hard-links immutable already 
existing SST files and iterates their data for Snapshot API.

Best,
Andrey


> On 24 Oct 2018, at 18:40, chandan prakash  wrote:
> 
> Thanks Tzu-Li for redirecting.
> Would also like to be corrected if my any inference from the code is 
> incorrect or incomplete.
> I am sure it will help to clear doubts of more developers like me  :)
> Thanks in advance.
> 
> Regards,
> Chandan
> 
> 
> On Wed, Oct 24, 2018 at 9:19 PM Tzu-Li (Gordon) Tai wrote:
> Hi,
> 
> I’m forwarding this question to Stefan (cc’ed).
> He would most likely be able to answer your question, as he has done 
> substantial work in the RocksDB state backends.
> 
> Cheers,
> Gordon
> 
> 
> On 24 October 2018 at 8:47:24 PM, chandan prakash (chandanbaran...@gmail.com 
> ) wrote:
> 
>> Hi,
>> I am new to Flink.
>> Was looking into the code to understand how Flink does FullSnapshot and 
>> Incremental Snapshot using RocksDB
>> 
>> What I understood:
>> 1. For a full snapshot, we call the RocksDB snapshot API, which is basically an 
>> iterator handle to the entries in the RocksDB instance. We iterate over every 
>> entry one by one and serialize that to some distributed file system. 
>> Similarly in restore for fullSnapshot, we read the file to get every entry 
>> and apply that to the rocksDb instance one by one to fully construct the db 
>> instance.
>> 
>> 2. On the other hand, for an incremental snapshot, we rely on the RocksDB 
>> Checkpoint api to copy the sst files to HDFS/S3 incrementally.
>> Similarly on restore, we copy the sst files to local directory and 
>> instantiate rocksDB instance with the path of the directory.
>> 
>> My Question is:
>> 1. Why did we took 2 different approaches using different RocksDB apis ?
>> We could have used Checkpoint api of RocksDB for fullSnapshot as well .
>> 2. Is there any specific reason to use Snapshot API of rocksDB  over 
>> Checkpoint api of RocksDB for fullSnapshot?
>> 
>> I am sure, I am missing some important point, really curious to know that.
>> Any explanation will be really great. Thanks in advance.
>> 
>> 
>> Regards,
>> Chandan
>> 
>> 
>> 
>> 
>> 
>> --
>> Chandan Prakash
>> 
> 
> 
> -- 
> Chandan Prakash
> 

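To make the contrast between the two RocksDB APIs concrete, here is a small standalone sketch against the RocksDB JNI client itself (it needs the rocksdbjni dependency and is not Flink-internal code); the paths and the single put are assumptions for illustration.

import org.rocksdb.Checkpoint;
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;
import org.rocksdb.Snapshot;

public class RocksDbSnapshotVsCheckpoint {
    public static void main(String[] args) throws Exception {
        RocksDB.loadLibrary();
        try (Options opts = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(opts, "/tmp/rocksdb-demo")) {

            db.put("k".getBytes(), "v".getBytes());

            // 1) Snapshot API: a frozen, iterable view of the current data.
            //    This is the style full snapshots use: iterate and serialize entries.
            Snapshot snapshot = db.getSnapshot();
            try (ReadOptions ro = new ReadOptions().setSnapshot(snapshot);
                 RocksIterator it = db.newIterator(ro)) {
                int entries = 0;
                for (it.seekToFirst(); it.isValid(); it.next()) {
                    entries++; // a full snapshot would write it.key()/it.value() here
                }
                System.out.println("entries seen through the snapshot iterator: " + entries);
            } finally {
                db.releaseSnapshot(snapshot);
            }

            // 2) Checkpoint API: hard-links the immutable SST files into a directory,
            //    which is what incremental snapshots then upload file by file.
            //    The target directory must not exist yet.
            try (Checkpoint checkpoint = Checkpoint.create(db)) {
                checkpoint.createCheckpoint("/tmp/rocksdb-demo-checkpoint");
            }
        }
    }
}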


Re: RocksDB State Backend Exception

2018-10-25 Thread Andrey Zagrebin
Hi Ning,

The problem here first of all is that RocksDB java JNI client diverged from 
RocksDB cpp code in status.h,
as mentioned in the Flink issue you refer to.

Flink 1.6 uses RocksDB 5.7.5 java client. 
The JNI code there misses these status subcodes:
kNoSpace = 4,
kDeadlock = 5,
kStaleFile = 6,
kMemoryLimit = 7
which could be potential problems in the job.

kNoSpace is only one of them.
Another probable cause could be kStaleFile, some file system IO problem.
kDeadlock seems to be used only with transactions, so not relevant.
kMemoryLimit means that write batch exceeded max size, but we do not have limit 
for it as I understand.

It would be easier to debug if RocksDB JNI client would at least log the 
unknown subcode but I do not see any easy way to log it in the current version, 
without rebuilding RocksDB and subsequently Flink.

In master branch, java Status and status.h are also unsynced. You could report 
this issue in RocksDB repo, along with extending exception logging message with 
the number of unknown error code. Flink community plans to upgrade to the 
latest RocksDB version again in one of the next Flink releases.

Best,
Andrey

> On 25 Oct 2018, at 04:31, Ning Shi  wrote:
> 
> Hi,
> 
> We are doing some performance testing on a 12 node cluster with 8 task
> slots per TM. Every 15 minutes or so, the job would run into the
> following exception.
> 
> java.lang.IllegalArgumentException: Illegal value provided for SubCode.
>   at org.rocksdb.Status$SubCode.getSubCode(Status.java:109)
>   at org.rocksdb.Status.<init>(Status.java:30)
>   at org.rocksdb.RocksDB.put(Native Method)
>   at org.rocksdb.RocksDB.put(RocksDB.java:511)
>   at 
> org.apache.flink.contrib.streaming.state.AbstractRocksDBAppendingState.updateInternal(AbstractRocksDBAppendingState.java:80)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBReducingState.add(RocksDBReducingState.java:99)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:358)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:745)
> 
> I saw an outstanding issue with similar exception in [1]. The ticket
> description suggests that it was due to out of disk error, but in our
> case, we have plenty of disk left on all TMs.
> 
> Has anyone run into this before? If so, is there a fix or workaround?
> 
> Thanks,
> 
> [1] https://issues.apache.org/jira/browse/FLINK-9233
> 
> --
> Ning



Re: Java Table API and external catalog bug?

2018-10-25 Thread Fabian Hueske
IIRC, that was recently fixed.
Might come out with 1.6.2 / 1.7.0.

Cheers, Fabian


Flavio Pompermaier wrote on Thu, 25 Oct 2018, 14:09:

> Ok thanks! I wasn't aware of this..that would be undoubtedly useful ;)
> On Thu, Oct 25, 2018 at 2:00 PM Timo Walther  wrote:
>
>> Hi Flavio,
>>
>> the external catalog support is not feature complete yet. I think you can
>> only specify the catalog when reading from a table but `insertInto` does
>> not consider the catalog name.
>>
>> Regards,
>> TImo
>>
>>
>> Am 25.10.18 um 10:04 schrieb Flavio Pompermaier:
>>
>> Any other help here? is this a bug or something wrong in my code?
>>
>> On Tue, Oct 23, 2018 at 9:02 AM Flavio Pompermaier 
>> wrote:
>>
>>> I've tried with t2, test.t2 and test.test.t2.
>>>
>>> On Mon, 22 Oct 2018, 19:26 Zhang, Xuefu, 
>>> wrote:
>>>
 Have you tried "t2" instead of "test.t2"? There is a possibility that
 catalog name isn't part of the table name in the table API.

 Thanks,
 Xuefu

 --
 Sender:Flavio Pompermaier 
 Sent at:2018 Oct 22 (Mon) 23:06
 Recipient:user 
 Subject:Java Table API and external catalog bug?

 Hi to all,
 I've tried to register an external catalog and use it with the Table
 API in Flink 1.6.1.
 The following (Java) test job cannot write to a sink using insertInto
 because Flink cannot find the table by id (test.t2). Am I doing something
 wrong or is this a bug?

 This is my Java test class:

 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
 import org.apache.flink.table.catalog.ExternalCatalogTable;
 import org.apache.flink.table.catalog.InMemoryExternalCatalog;
 import org.apache.flink.table.descriptors.Csv;
 import org.apache.flink.table.descriptors.FileSystem;
 import org.apache.flink.table.descriptors.FormatDescriptor;
 import org.apache.flink.table.descriptors.Schema;
 import org.apache.flink.table.sinks.CsvTableSink;

 public class CatalogExperiment {
   public static void main(String[] args) throws Exception {
 // create an external catalog
 final String outPath = "file:/tmp/file2.txt";
 InMemoryExternalCatalog catalog = new
 InMemoryExternalCatalog("test");
 FileSystem connDescIn = new FileSystem().path(
 "file:/tmp/file-test.txt");
 FileSystem connDescOut = new FileSystem().path(outPath);
 FormatDescriptor csvDesc = new Csv()//
 .field("a", "string")//
 .field("b", "string")//
 .field("c", "string")//
 .fieldDelimiter("\t");
 Schema schemaDesc = new Schema()//
 .field("a", "string")//
 .field("b", "string")//
 .field("c", "string");
 ExternalCatalogTable t1 = ExternalCatalogTable.builder(connDescIn)//
 .withFormat(csvDesc)//
 .withSchema(schemaDesc)//
 .asTableSource();
 ExternalCatalogTable t2 =
 ExternalCatalogTable.builder(connDescOut)//
 .withFormat(csvDesc)//
 .withSchema(schemaDesc)//
 .asTableSink();
 catalog.createTable("t1", t1, true);
 catalog.createTable("t2", t2, true);

 final  ExecutionEnvironment env =
 ExecutionEnvironment.getExecutionEnvironment();
 final BatchTableEnvironment btEnv =
 TableEnvironment.getTableEnvironment(env);
 btEnv.registerExternalCatalog("test", catalog);
 // this does not work ---
 btEnv.scan("test", "t1").insertInto("test.t2"); //ERROR: No table
 was registered under the name test.t2
 // this works ---
 btEnv.scan("test", "t1").writeToSink(new CsvTableSink(outPath,
 "\t", 1, WriteMode.OVERWRITE));
 env.execute();
   }
 }


 Best,
 Flavio


>>
>>
>

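One possible workaround, given that `insertInto` does not (yet) accept a catalog-qualified name: register the sink table directly on the TableEnvironment under a plain name and insert into that. This is an assumption-based sketch of that idea (Flink 1.6 APIs, made runnable with an in-memory source standing in for the external catalog table), not something confirmed in the thread.

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;

public class CatalogWorkaroundSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment btEnv = TableEnvironment.getTableEnvironment(env);

        // Stand-in for the table that would be read from the external catalog.
        DataSet<Tuple3<String, String, String>> data =
                env.fromElements(Tuple3.of("x", "y", "z"));
        Table t1 = btEnv.fromDataSet(data, "a, b, c");

        // Register the sink under a plain, non catalog-qualified name ...
        btEnv.registerTableSink(
                "t2",
                new String[] {"a", "b", "c"},
                new TypeInformation<?>[] {
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO},
                new CsvTableSink("file:/tmp/file2.txt", "\t", 1, WriteMode.OVERWRITE));

        // ... and insertInto() that name works, since no catalog prefix is needed.
        t1.insertInto("t2");
        env.execute();
    }
}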

Re: Java Table API and external catalog bug?

2018-10-25 Thread Flavio Pompermaier
Ok thanks! I wasn't aware of this..that would be undoubtedly useful ;)
On Thu, Oct 25, 2018 at 2:00 PM Timo Walther  wrote:

> Hi Flavio,
>
> the external catalog support is not feature complete yet. I think you can
> only specify the catalog when reading from a table but `insertInto` does
> not consider the catalog name.
>
> Regards,
> TImo
>
>
> Am 25.10.18 um 10:04 schrieb Flavio Pompermaier:
>
> Any other help here? is this a bug or something wrong in my code?
>
> On Tue, Oct 23, 2018 at 9:02 AM Flavio Pompermaier 
> wrote:
>
>> I've tried with t2, test.t2 and test.test.t2.
>>
>> On Mon, 22 Oct 2018, 19:26 Zhang, Xuefu,  wrote:
>>
>>> Have you tried "t2" instead of "test.t2"? There is a possibility that
>>> catalog name isn't part of the table name in the table API.
>>>
>>> Thanks,
>>> Xuefu
>>>
>>> --
>>> Sender:Flavio Pompermaier 
>>> Sent at:2018 Oct 22 (Mon) 23:06
>>> Recipient:user 
>>> Subject:Java Table API and external catalog bug?
>>>
>>> Hi to all,
>>> I've tried to register an external catalog and use it with the Table API
>>> in Flink 1.6.1.
>>> The following (Java) test job cannot write to a sink using insertInto
>>> because Flink cannot find the table by id (test.t2). Am I doing something
>>> wrong or is this a bug?
>>>
>>> This is my Java test class:
>>>
>>> import org.apache.flink.api.java.ExecutionEnvironment;
>>> import org.apache.flink.core.fs.FileSystem.WriteMode;
>>> import org.apache.flink.table.api.TableEnvironment;
>>> import org.apache.flink.table.api.java.BatchTableEnvironment;
>>> import org.apache.flink.table.catalog.ExternalCatalogTable;
>>> import org.apache.flink.table.catalog.InMemoryExternalCatalog;
>>> import org.apache.flink.table.descriptors.Csv;
>>> import org.apache.flink.table.descriptors.FileSystem;
>>> import org.apache.flink.table.descriptors.FormatDescriptor;
>>> import org.apache.flink.table.descriptors.Schema;
>>> import org.apache.flink.table.sinks.CsvTableSink;
>>>
>>> public class CatalogExperiment {
>>>   public static void main(String[] args) throws Exception {
>>> // create an external catalog
>>> final String outPath = "file:/tmp/file2.txt";
>>> InMemoryExternalCatalog catalog = new
>>> InMemoryExternalCatalog("test");
>>> FileSystem connDescIn = new FileSystem().path(
>>> "file:/tmp/file-test.txt");
>>> FileSystem connDescOut = new FileSystem().path(outPath);
>>> FormatDescriptor csvDesc = new Csv()//
>>> .field("a", "string")//
>>> .field("b", "string")//
>>> .field("c", "string")//
>>> .fieldDelimiter("\t");
>>> Schema schemaDesc = new Schema()//
>>> .field("a", "string")//
>>> .field("b", "string")//
>>> .field("c", "string");
>>> ExternalCatalogTable t1 = ExternalCatalogTable.builder(connDescIn)//
>>> .withFormat(csvDesc)//
>>> .withSchema(schemaDesc)//
>>> .asTableSource();
>>> ExternalCatalogTable t2 = ExternalCatalogTable.builder(connDescOut)//
>>> .withFormat(csvDesc)//
>>> .withSchema(schemaDesc)//
>>> .asTableSink();
>>> catalog.createTable("t1", t1, true);
>>> catalog.createTable("t2", t2, true);
>>>
>>> final  ExecutionEnvironment env =
>>> ExecutionEnvironment.getExecutionEnvironment();
>>> final BatchTableEnvironment btEnv =
>>> TableEnvironment.getTableEnvironment(env);
>>> btEnv.registerExternalCatalog("test", catalog);
>>> // this does not work ---
>>> btEnv.scan("test", "t1").insertInto("test.t2"); //ERROR: No table
>>> was registered under the name test.t2
>>> // this works ---
>>> btEnv.scan("test", "t1").writeToSink(new CsvTableSink(outPath, "\t",
>>> 1, WriteMode.OVERWRITE));
>>> env.execute();
>>>   }
>>> }
>>>
>>>
>>> Best,
>>> Flavio
>>>
>>>
>
>


Re: BucketingSink capabilities for DataSet API

2018-10-25 Thread Andrey Zagrebin
Hi Rafi,

At the moment I do not see any support of Parquet in DataSet API except 
HadoopOutputFormat, mentioned in stack overflow question. I have cc’ed Fabian 
and Aljoscha, maybe they could provide more information.

Best,
Andrey

> On 25 Oct 2018, at 13:08, Rafi Aroch  wrote:
> 
> Hi,
> 
> I'm writing a Batch job which reads Parquet, does some aggregations and 
> writes back as Parquet files.
> I would like the output to be partitioned by year, month, day by event time. 
> Similarly to the functionality of the BucketingSink.
> 
> I was able to achieve the reading/writing to/from Parquet by using the 
> hadoop-compatibility features.
> I couldn't find a way to partition the data by year, month, day to create a 
> folder hierarchy accordingly. Everything is written to a single directory.
> 
> I could find an unanswered question about this issue: 
> https://stackoverflow.com/questions/52204034/apache-flink-does-dataset-api-support-writing-output-to-individual-file-partit
>  
> 
> 
> Can anyone suggest a way to achieve this? Maybe there's a way to integrate 
> the BucketingSink with the DataSet API? Another solution?
> 
> Rafi


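Until there is proper partitioned-output support in the DataSet API, one crude workaround is to collect the distinct event dates first and then issue one filtered write per date into a year=/month=/day= directory. The sketch below is purely illustrative (a tuple stand-in instead of Parquet, writeAsText instead of a Parquet sink, and it re-scans the input once per date), not a recommended or confirmed approach.

import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem.WriteMode;

public class PartitionedOutputSketch {

    private static LocalDate day(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC).toLocalDate();
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the real Parquet input: (eventTimeMillis, payload).
        DataSet<Tuple2<Long, String>> events = env.fromElements(
                Tuple2.of(1540425600000L, "a"), Tuple2.of(1540512000000L, "b"));

        // First pass: find which day-partitions exist.
        List<String> days = events
                .map(t -> day(t.f0).toString())
                .distinct()
                .collect();

        // Second pass: one filtered write per day -> year=/month=/day= layout.
        for (String d : days) {
            LocalDate date = LocalDate.parse(d);
            events.filter(t -> day(t.f0).toString().equals(d))
                  .writeAsText(String.format("/tmp/out/year=%d/month=%02d/day=%02d",
                          date.getYear(), date.getMonthValue(), date.getDayOfMonth()),
                      WriteMode.OVERWRITE);
        }
        env.execute("partitioned output sketch");
    }
}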

Re: Java Table API and external catalog bug?

2018-10-25 Thread Timo Walther

Hi Flavio,

the external catalog support is not feature complete yet. I think you 
can only specify the catalog when reading from a table but `insertInto` 
does not consider the catalog name.


Regards,
TImo


Am 25.10.18 um 10:04 schrieb Flavio Pompermaier:

Any other help here? is this a bug or something wrong in my code?

On Tue, Oct 23, 2018 at 9:02 AM Flavio Pompermaier <pomperma...@okkam.it> wrote:


I've tried with t2, test.t2 and test.test.t2.

On Mon, 22 Oct 2018, 19:26 Zhang, Xuefu <xuef...@alibaba-inc.com> wrote:

Have you tried "t2" instead of "test.t2"? There is a
possibility that catalog name isn't part of the table name in
the table API.

Thanks,
Xuefu

--
Sender: Flavio Pompermaier <pomperma...@okkam.it>
Sent at: 2018 Oct 22 (Mon) 23:06
Recipient: user <user@flink.apache.org>
Subject:Java Table API and external catalog bug?

Hi to all,
I've tried to register an external catalog and use it with
the Table API in Flink 1.6.1.
The following (Java) test job cannot write to a sink using
insertInto because Flink cannot find the table by id
(test.t2). Am I doing something wrong or is this a bug?

This is my Java test class:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.catalog.ExternalCatalogTable;
import org.apache.flink.table.catalog.InMemoryExternalCatalog;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.FormatDescriptor;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.sinks.CsvTableSink;

public class CatalogExperiment {
  public static void main(String[] args) throws Exception {
    // create an external catalog
    final String outPath = "file:/tmp/file2.txt";
    InMemoryExternalCatalog catalog = new
InMemoryExternalCatalog("test");
    FileSystem connDescIn = new
FileSystem().path("file:/tmp/file-test.txt");
    FileSystem connDescOut = new FileSystem().path(outPath);
    FormatDescriptor csvDesc = new Csv()//
        .field("a", "string")//
        .field("b", "string")//
        .field("c", "string")//
        .fieldDelimiter("\t");
    Schema schemaDesc = new Schema()//
        .field("a", "string")//
        .field("b", "string")//
        .field("c", "string");
    ExternalCatalogTable t1 =
ExternalCatalogTable.builder(connDescIn)//
        .withFormat(csvDesc)//
.withSchema(schemaDesc)//
        .asTableSource();
    ExternalCatalogTable t2 =
ExternalCatalogTable.builder(connDescOut)//
        .withFormat(csvDesc)//
.withSchema(schemaDesc)//
        .asTableSink();
    catalog.createTable("t1", t1, true);
    catalog.createTable("t2", t2, true);

    final  ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
    final BatchTableEnvironment btEnv =
TableEnvironment.getTableEnvironment(env);
btEnv.registerExternalCatalog("test", catalog);
    // this does not work
---
    btEnv.scan("test", "t1").insertInto("test.t2");
//ERROR: No table was registered under the name test.t2
    // this works ---
    btEnv.scan("test", "t1").writeToSink(new
CsvTableSink(outPath, "\t", 1, WriteMode.OVERWRITE));
    env.execute();
  }
}


Best,
Flavio






Re: Checkpoint acknowledge takes too long

2018-10-25 Thread 徐涛
Hi Hequn & Kien,
Finally the problem is solved.
It is due to slow sink writes. Because the job only has 2 tasks, I 
checked the backpressure and found that the source has high backpressure, so I tried 
to improve the sink write speed. After that the end-to-end duration is below 1s and 
the checkpoint timeout is fixed.

Best
Henry


> On 24 Oct 2018, at 10:43 PM, 徐涛 wrote:
> 
> Hequn & Kien,
>   Thanks a lot for your help, I will try it later.
> 
> Best
> Henry
> 
> 
>> On 24 Oct 2018, at 8:18 PM, Hequn Cheng wrote:
>> 
>> Hi Henry,
>> 
>> @Kien is right. Take a thread dump to see what was doing in the TaskManager. 
>> Also check whether gc happens frequently.
>> 
>> Best, Hequn
>>  
>> 
>> On Wed, Oct 24, 2018 at 5:03 PM 徐涛 wrote:
>> Hi 
>> I am running a flink application with parallelism 64, I left the 
>> checkpoint timeout at the default value, which is 10 minutes, the state size is less 
>> than 1MB, and I am using the FsStateBackend.
>> The application triggers some checkpoints but all of them fail due 
>> to "Checkpoint expired before completing”. I checked the checkpoint history and 
>> found that 63 subtasks acknowledge, but one is left n/a, and also the 
>> alignment duration is quite long, about 5m27s.
>> I want to know why one subtask does not acknowledge. And 
>> because the alignment duration is long, what will influence the alignment 
>> duration?
>> Thank a lot.
>> 
>> Best
>> Henry
> 



BucketingSink capabilities for DataSet API

2018-10-25 Thread Rafi Aroch
Hi,

I'm writing a Batch job which reads Parquet, does some aggregations and
writes back as Parquet files.
I would like the output to be partitioned by year, month, day by event
time. Similarly to the functionality of the BucketingSink.

I was able to achieve the reading/writing to/from Parquet by using the
hadoop-compatibility features.
I couldn't find a way to partition the data by year, month, day to create a
folder hierarchy accordingly. Everything is written to a single directory.

I could find an unanswered question about this issue:
https://stackoverflow.com/questions/52204034/apache-flink-does-dataset-api-support-writing-output-to-individual-file-partit

Can anyone suggest a way to achieve this? Maybe there's a way to integrate
the BucketingSink with the DataSet API? Another solution?

Rafi


Re: HighAvailability :: FsNegativeRunningJobsRegistry

2018-10-25 Thread Chesnay Schepler
The release process for 1.6.2 is currently ongoing and will hopefully be 
finished within the next days.

In the mean-time you could use 1.6.2-rc1 artifacts:
binaries: https://dist.apache.org/repos/dist/dev/flink/flink-1.6.2/
maven: 
https://repository.apache.org/content/repositories/orgapacheflink-1186


On 25.10.2018 12:36, Mikhail Pryakhin wrote:

Hi all,
I’ve realised that the feature I requested information about hasn’t 
been released yet.
Could you please reveal when approximately the release-1.6.2-rc1 is 
going to be rolled out?


Thank you.

Kind Regards,
Mike Pryakhin

On 24 Oct 2018, at 16:12, Mikhail Pryakhin wrote:


Hi guys,
I'm trying to substitute Zookeeper-based HA registry with YARN-based 
HA registry. (The idea was taken from the issue 
https://issues.apache.org/jira/browse/FLINK-5254)
In Flink 1.6.1, there exists an 
org.apache.flink.runtime.highavailability.FsNegativeRunningJobsRegistry 
which claims to be tailored towards YARN deployment mode. I've looked 
through org.apache.flink.configuration.HighAvailabilityOptions in 
order to figure out how to enable YARN-based HA registry but haven't 
found anything about it. The Flink documentation mentions nothing 
about it either.


Do I miss something? Is there a way to use this exact registry for 
YARN deployments?


Thank you in advance.

Kind Regards,
Mike Pryakhin







Re: Queryable state when key is UUID - getting Kyro Exception

2018-10-25 Thread Jayant Ameta
Hi Gordon,
Following is the stack trace that I'm getting:

*Exception in thread "main" java.util.concurrent.ExecutionException:
java.lang.RuntimeException: Failed request 0.*
* Caused by: java.lang.RuntimeException: Failed request 0.*
* Caused by: java.lang.RuntimeException: Error while processing request
with ID 0. Caused by: com.esotericsoftware.kryo.KryoException: Encountered
unregistered class ID: -985346241*
*Serialization trace:*
*$outer (scala.collection.convert.Wrappers$SeqWrapper)*
* at
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)*
* at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)*
* at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)*
* at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)*
* at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)*
* at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)*
* at
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)*
* at
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)*
* at
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)*
* at
org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)*
* at
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)*
* at
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)*
* at
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)*
* at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)*
* at java.util.concurrent.FutureTask.run(FutureTask.java:266)*
* at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)*
* at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)*
* at java.lang.Thread.run(Thread.java:748)*

I am not using any custom serializer as mentioned by Jiayi.

Jayant Ameta


On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy  wrote:

> Hi  Jayant,
>
>   There should be a Serializer parameter in the constructor of the
> StateDescriptor, you should create a new serializer like this:
>
>
>new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)
>
>
>  By the way, can you show us your kryo exception like what Gordon said?
>
>
> Jiayi Liao, Best
>
>
>
>  Original Message
> *Sender:* Tzu-Li (Gordon) Tai
> *Recipient:* Jayant Ameta; bupt_ljy
> *Cc:* user
> *Date:* Thursday, Oct 25, 2018 17:18
> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>
> Hi Jayant,
>
> What is the Kryo exception message that you are getting?
>
> Cheers,
> Gordon
>
>
> On 25 October 2018 at 5:17:13 PM, Jayant Ameta (wittyam...@gmail.com)
> wrote:
>
> Hi,
> I've not configured any serializer in the descriptor. (Neither in flink
> job, nor in state query client).
> Which serializer should I use?
>
> Jayant Ameta
>
>
> On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy  wrote:
>
>> Hi,
>>
>>It seems that your codes are right. Are you sure that you’re using the
>> same Serializer as the Flink program do? Could you show the serializer in
>> descriptor?
>>
>>
>>
>> Jiayi Liao, Best
>>
>>  Original Message
>> *Sender:* Jayant Ameta
>> *Recipient:* user
>> *Date:* Thursday, Oct 25, 2018 14:17
>> *Subject:* Queryable state when key is UUID - getting Kyro Exception
>>
>> I get Kyro exception when querying the state.
>>
>> Key: UUID
>> MapState<UUID, String>
>>
>> Client code snippet:
>>
>> CompletableFuture<MapState<UUID, String>> resultFuture =
>> 
>> client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), 
>> "rule",
>> UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
>> TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
>> MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);
>>
>>
>> Any better way to query it?
>>
>>
>> Jayant Ameta
>>
>


Re: HighAvailability :: FsNegativeRunningJobsRegistry

2018-10-25 Thread Mikhail Pryakhin
Hi all, 
I’ve realised that the feature I requested information about hasn’t been 
released yet.
Could you please reveal when approximately the release-1.6.2-rc1 is going to be 
rolled out?

Thank you.

Kind Regards,
Mike Pryakhin

> On 24 Oct 2018, at 16:12, Mikhail Pryakhin  wrote:
> 
> Hi guys, 
> I'm trying to substitute Zookeeper-based HA registry with YARN-based HA 
> registry. (The idea was taken from the issue 
> https://issues.apache.org/jira/browse/FLINK-5254 
> )
> In Flink 1.6.1, there exists an 
> org.apache.flink.runtime.highavailability.FsNegativeRunningJobsRegistry which 
> claims to be tailored towards YARN deployment mode. I've looked through 
> org.apache.flink.configuration.HighAvailabilityOptions in order to figure out 
> how to enable YARN-based HA registry but haven't found anything about it. The 
> Flink documentation mentions nothing about it either. 
> 
> Do I miss something? Is there a way to use this exact registry for YARN 
> deployments?
> 
> Thank you in advance.
> 
> Kind Regards,
> Mike Pryakhin
> 





Re: Queryable state when key is UUID - getting Kyro Exception

2018-10-25 Thread bupt_ljy
Hi Jayant,
 There should be a Serializer parameter in the constructor of the 
StateDescriptor, you should create a new serializer like this:


 new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)


By the way, can you show us your kryo exception like what Gordon said?


Jiayi Liao, Best




Original Message
Sender: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Recipient: Jayant Ameta <wittyam...@gmail.com>; bupt_ljy <bupt_...@163.com>
Cc: user <u...@flink.apache.org>
Date: Thursday, Oct 25, 2018 17:18
Subject: Re: Queryable state when key is UUID - getting Kyro Exception


Hi Jayant,


What is the Kryo exception message that you are getting?


Cheers,
Gordon




On 25 October 2018 at 5:17:13 PM, Jayant Ameta (wittyam...@gmail.com) wrote:
Hi,
I've not configured any serializer in the descriptor. (Neither in flink job, 
nor in state query client).
Which serializer should I use?


Jayant Ameta





On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy bupt_...@163.com wrote:

Hi,
 It seems that your codes are right. Are you sure that you’re using the same 
Serializer as the Flink program do? Could you show the serializer in descriptor?




Jiayi Liao, Best


Original Message
Sender: Jayant Ameta <wittyam...@gmail.com>
Recipient: user <u...@flink.apache.org>
Date: Thursday, Oct 25, 2018 14:17
Subject: Queryable state when key is UUID - getting Kyro Exception


I get Kyro exception when querying the state.


Key: UUID
MapState<UUID, String>


Client code snippet:


CompletableFuture<MapState<UUID, String>> resultFuture =
 client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), 
"rule",
 UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
 TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);


Any better way to query it?




Jayant Ameta

Re: Queryable state when key is UUID - getting Kyro Exception

2018-10-25 Thread Tzu-Li (Gordon) Tai
Hi Jayant,

What is the Kryo exception message that you are getting?

Cheers,
Gordon


On 25 October 2018 at 5:17:13 PM, Jayant Ameta (wittyam...@gmail.com) wrote:

Hi,
I've not configured any serializer in the descriptor. (Neither in flink job, 
nor in state query client).
Which serializer should I use?

Jayant Ameta


On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy  wrote:
Hi,
   It seems that your codes are right. Are you sure that you’re using the same 
Serializer as the Flink program do? Could you show the serializer in 
descriptor? 


Jiayi Liao, Best

 Original Message 
Sender: Jayant Ameta
Recipient: user
Date: Thursday, Oct 25, 2018 14:17
Subject: Queryable state when key is UUID - getting Kyro Exception

I get Kyro exception when querying the state.
 
Key: UUID
MapState<UUID, String>

Client code snippet:

CompletableFuture<MapState<UUID, String>> resultFuture =
client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), 
"rule",
UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);

Any better way to query it?


Jayant Ameta


Re: Queryable state when key is UUID - getting Kyro Exception

2018-10-25 Thread Jayant Ameta
Hi,
I've not configured any serializer in the descriptor (neither in the Flink
job nor in the state query client).
Which serializer should I use?

Jayant Ameta


On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy  wrote:

> Hi,
>
> It seems that your code is right. Are you sure that you're using the
> same serializer as the Flink program does? Could you show the serializer in
> the descriptor?
>
>
>
> Jiayi Liao, Best
>
>  Original Message
> *Sender:* Jayant Ameta
> *Recipient:* user
> *Date:* Thursday, Oct 25, 2018 14:17
> *Subject:* Queryable state when key is UUID - getting Kyro Exception
>
> I get a Kryo exception when querying the state.
>
> Key: UUID
> MapState<UUID, String>
>
> Client code snippet:
>
> CompletableFuture<MapState<UUID, String>> resultFuture =
> 
> client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), 
> "rule",
> UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
> TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
> MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);
>
>
> Any better way to query it?
>
>
> Jayant Ameta
>


Re: Queryable state when key is UUID - getting Kyro Exception

2018-10-25 Thread bupt_ljy
Hi,
It seems that your code is right. Are you sure that you're using the same 
serializer as the Flink program does? Could you show the serializer in the descriptor?




Jiayi Liao, Best


Original Message
Sender: Jayant Ameta wittyam...@gmail.com
Recipient: user u...@flink.apache.org
Date: Thursday, Oct 25, 2018 14:17
Subject: Queryable state when key is UUID - getting Kyro Exception


I get a Kryo exception when querying the state.


Key: UUID
MapState<UUID, String>


Client code snippet:


CompletableFuture<MapState<UUID, String>> resultFuture =
 client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), 
"rule",
 UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
 TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);


Any better way to query it?




Jayant Ameta

Re: Java Table API and external catalog bug?

2018-10-25 Thread Flavio Pompermaier
Any other help here? Is this a bug, or is something wrong in my code?
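
For what it's worth, one hedged workaround sketch (assuming the 1.6 Table API only
resolves insertInto() targets against tables registered directly in the
TableEnvironment, which is not confirmed here) is to register the CSV sink under a
plain name and insert into that name. The snippet reuses btEnv, outPath, and the
imports from the program quoted below, plus
org.apache.flink.api.common.typeinfo.BasicTypeInfo and TypeInformation; "t2Sink" is
a hypothetical name.

// Hypothetical workaround, not verified against Flink 1.6.1.
btEnv.registerTableSink(
    "t2Sink",                                             // hypothetical sink name
    new String[] {"a", "b", "c"},
    new TypeInformation<?>[] {
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO},
    new CsvTableSink(outPath, "\t", 1, WriteMode.OVERWRITE));
btEnv.scan("test", "t1").insertInto("t2Sink");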

On Tue, Oct 23, 2018 at 9:02 AM Flavio Pompermaier 
wrote:

> I've tried with t2, test.t2 and test.test.t2.
>
> On Mon, 22 Oct 2018, 19:26 Zhang, Xuefu,  wrote:
>
>> Have you tried "t2" instead of "test.t2"? There is a possibility that the
>> catalog name isn't part of the table name in the Table API.
>>
>> Thanks,
>> Xuefu
>>
>> --
>> Sender: Flavio Pompermaier 
>> Sent at: 2018 Oct 22 (Mon) 23:06
>> Recipient: user 
>> Subject: Java Table API and external catalog bug?
>>
>> Hi to all,
>> I've tried to register an external catalog and use it with the Table API
>> in Flink 1.6.1.
>> The following (Java) test job cannot write to a sink using insertInto
>> because Flink cannot find the table by id (test.t2). Am I doing something
>> wrong or is this a bug?
>>
>> This is my Java test class:
>>
>> import org.apache.flink.api.java.ExecutionEnvironment;
>> import org.apache.flink.core.fs.FileSystem.WriteMode;
>> import org.apache.flink.table.api.TableEnvironment;
>> import org.apache.flink.table.api.java.BatchTableEnvironment;
>> import org.apache.flink.table.catalog.ExternalCatalogTable;
>> import org.apache.flink.table.catalog.InMemoryExternalCatalog;
>> import org.apache.flink.table.descriptors.Csv;
>> import org.apache.flink.table.descriptors.FileSystem;
>> import org.apache.flink.table.descriptors.FormatDescriptor;
>> import org.apache.flink.table.descriptors.Schema;
>> import org.apache.flink.table.sinks.CsvTableSink;
>>
>> public class CatalogExperiment {
>>   public static void main(String[] args) throws Exception {
>> // create an external catalog
>> final String outPath = "file:/tmp/file2.txt";
>> InMemoryExternalCatalog catalog = new InMemoryExternalCatalog("test");
>> FileSystem connDescIn = new
>> FileSystem().path("file:/tmp/file-test.txt");
>> FileSystem connDescOut = new FileSystem().path(outPath);
>> FormatDescriptor csvDesc = new Csv()//
>> .field("a", "string")//
>> .field("b", "string")//
>> .field("c", "string")//
>> .fieldDelimiter("\t");
>> Schema schemaDesc = new Schema()//
>> .field("a", "string")//
>> .field("b", "string")//
>> .field("c", "string");
>> ExternalCatalogTable t1 = ExternalCatalogTable.builder(connDescIn)//
>> .withFormat(csvDesc)//
>> .withSchema(schemaDesc)//
>> .asTableSource();
>> ExternalCatalogTable t2 = ExternalCatalogTable.builder(connDescOut)//
>> .withFormat(csvDesc)//
>> .withSchema(schemaDesc)//
>> .asTableSink();
>> catalog.createTable("t1", t1, true);
>> catalog.createTable("t2", t2, true);
>>
>> final  ExecutionEnvironment env =
>> ExecutionEnvironment.getExecutionEnvironment();
>> final BatchTableEnvironment btEnv =
>> TableEnvironment.getTableEnvironment(env);
>> btEnv.registerExternalCatalog("test", catalog);
>> // this does not work ---
>> btEnv.scan("test", "t1").insertInto("test.t2"); //ERROR: No table was
>> registered under the name test.t2
>> // this works ---
>> btEnv.scan("test", "t1").writeToSink(new CsvTableSink(outPath, "\t",
>> 1, WriteMode.OVERWRITE));
>> env.execute();
>>   }
>> }
>>
>>
>> Best,
>> Flavio
>>
>>


Re: Task manager count goes the expand then converge process when running flink on YARN

2018-10-25 Thread Till Rohrmann
Hi Henry,

since version 1.5 you don't need to specify the number of TaskManagers to
start, because the system will figure this out. Moreover, in version 1.5.x
and 1.6.x it is recommended to set the number of slots per TaskManager to 1
since we did not properly support TaskManagers with multiple task slots. The problem
was that we started a separate TaskManager for every incoming slot request,
even though there might still be some free slots left. This has been fixed
by FLINK-9455 [1]. The fix will be released with the upcoming major
Flink release, 1.7.

[1] https://issues.apache.org/jira/browse/FLINK-9455

Cheers,
Till

On Thu, Oct 25, 2018 at 5:58 AM vino yang  wrote:

> Hi Henry,
>
> The phenomenon you described does exist; it is a bug, but I can't remember
> its JIRA number.
>
> Thanks, vino.
>
> 徐涛  于2018年10月24日周三 下午11:27写道:
>
>> Hi experts,
>> I am running a Flink job on YARN in job cluster mode. The job is divided
>> into 2 tasks, and the following are some configs of the job:
>> parallelism.default => 16
>> taskmanager.numberOfTaskSlots => 8
>> -yn => 2
>>
>> When the program starts, I found that the number of TaskManagers is not set
>> immediately; it first expands and then converges. I recorded the numbers during
>> the process:
>>     Task Managers   Task Slots   Available Task Slots
>> 1.        14            104              88
>> 2.        15            120             104
>> 3.        16            128             112
>> 4.         6             48              32
>> 5.         3             24               8
>> 6.         2             16               0
>>
>> The final state is correct. There are 2 tasks, 32 subtasks in total; due
>> to slot sharing, 16 slots are enough, and with 8 task slots per TM,
>> 2 TMs are needed.
>> I have the following question:
>> *Since I specify -yn=2, why doesn't Flink directly allocate 2 TMs instead of
>> going through this expand-then-converge process? Why does it request as many as
>> 16 TaskManagers? If this is not necessary, how can I avoid it?*
>>
>> Thanks a lot!
>>
>> Best
>> Henry
>>
>


Queryable state when key is UUID - getting Kyro Exception

2018-10-25 Thread Jayant Ameta
I get a Kryo exception when querying the state.

Key: UUID
MapState<UUID, String>

Client code snippet:

CompletableFuture<MapState<UUID, String>> resultFuture =
client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"),
"rule",
UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);


Any better way to query it?


Jayant Ameta
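
As a closing illustration, here is a minimal end-to-end sketch of the query path
discussed in this thread. It is a sketch under stated assumptions, not taken verbatim
from the thread: the proxy address localhost:9069, the descriptor name "rules", and
the class and variable names are placeholders, and the descriptor is built with an
explicit UUID serializer as suggested earlier in the thread.

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.queryablestate.client.QueryableStateClient;

public class QueryUuidStateSketch {
  public static void main(String[] args) throws Exception {
    ExecutionConfig config = new ExecutionConfig();

    // Queryable state proxy address: placeholder host and the default proxy port.
    QueryableStateClient client = new QueryableStateClient("localhost", 9069);

    // Descriptor using the same Kryo-backed UUID serializer the job falls back to.
    MapStateDescriptor<UUID, String> descriptor = new MapStateDescriptor<>(
        "rules",                                                     // placeholder descriptor name
        new GenericTypeInfo<>(UUID.class).createSerializer(config),  // UUID key serializer
        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(config));    // String value serializer

    CompletableFuture<MapState<UUID, String>> resultFuture = client.getKvState(
        JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"),
        "rule",
        UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
        TypeInformation.of(new TypeHint<UUID>() {}),
        descriptor);

    // Block for up to 10 seconds, then read back the value for the queried key.
    MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);
    System.out.println(mapState.get(UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d")));

    client.shutdownAndWait(); // assumed shutdown call; check the client API of your Flink version
  }
}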