Re: [Flink-9407] Question about proposed ORC Sink !

2018-06-27 Thread sagar loke
Thanks @zhangminglei and @Fabian for confirming.

I also looked at the ORC parsing code, and it seems that using the struct type
is mandatory for now.

Thanks,
Sagar
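
The struct requirement confirmed in this thread can be illustrated with a small sketch. Note that `wrapAsStruct` is a hypothetical helper for illustration only, not part of the ORC or Flink API:

```java
public class OrcSchemaExample {

    // Hypothetical helper (not part of the ORC API): wraps individual field
    // definitions in the mandatory top-level struct type.
    static String wrapAsStruct(String fields) {
        return "struct<" + fields + ">";
    }

    public static void main(String[] args) {
        // "name:string,age:int" alone is not a valid top-level ORC schema;
        // the fields must be wrapped in a struct.
        System.out.println(wrapAsStruct("name:string,age:int"));
        // struct<name:string,age:int>
    }
}
```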

On Wed, Jun 27, 2018 at 12:59 AM, Fabian Hueske  wrote:

> Hi Sagar,
>
> That's more a question for the ORC community, but AFAIK, the top-level
> type is always a struct because it needs to wrap the fields, e.g.,
> struct(name:string, age:int)
>
> Best, Fabian
>
> 2018-06-26 22:38 GMT+02:00 sagar loke :
>
>> @zhangminglei,
>>
>> Question about the schema for ORC format:
>>
>> 1. Does it always need to be of the complex type "struct"?
>>
>> 2. Or can it be created with individual data types directly,
>> e.g. "name:string, age:int"?
>>
>>
>> Thanks,
>> Sagar
>>
>> On Fri, Jun 22, 2018 at 11:56 PM, zhangminglei <18717838...@163.com>
>> wrote:
>>
>>> Yes, it should be "exit". Thanks to Ted Yu, that is exactly right!
>>>
>>> Cheers
>>> Zhangminglei
>>>
>>> On Jun 23, 2018, at 12:40 PM, Ted Yu wrote:
>>>
>>> For #1, the word "exist" should be "exit", right?
>>> Thanks
>>>
>>>  Original message 
>>> From: zhangminglei <18717838...@163.com>
>>> Date: 6/23/18 10:12 AM (GMT+08:00)
>>> To: sagar loke 
>>> Cc: dev , user 
>>> Subject: Re: [Flink-9407] Question about proposed ORC Sink !
>>>
>>> Hi, Sagar.
>>>
>>> 1. It solves the issue partially meaning files which have finished
>>> checkpointing don't show .pending status but the files which were in
>>> progress
>>> when the program exists are still in .pending state.
>>>
>>>
>>> Ans:
>>>
>>> Yes. If the program exits while a checkpoint has not yet finished, those
>>> files stay in the .pending state. Under normal circumstances, a program
>>> running in a production environment will never be stopped or exit if
>>> everything is fine.
>>>
>>> 2. Ideally, writer should work with default settings correct ? Meaning
>>> we don't have to explicitly set these parameters to make it work.
>>> Is this assumption correct ?
>>>
>>>
>>> Ans:
>>>
>>> Yes, the writer should work with the default settings.
>>> Yes, we do not have to explicitly set these parameters to make it work.
>>> Yes, that assumption is correct.
>>>
>>> However, Flink is a real-time streaming framework, so under normal
>>> circumstances you would not use the default settings for a specific
>>> business case, especially when working together with an *offline end*
>>> (like Hadoop MapReduce). In that case, you need to tell the offline end
>>> when a bucket is closed and when the data for a specific bucket is
>>> ready. So, you can take a look at
>>> https://issues.apache.org/jira/browse/FLINK-9609.
>>>
>>> Cheers
>>> Zhangminglei
>>>
>>>
>>> On Jun 23, 2018, at 8:23 AM, sagar loke wrote:
>>>
>>> Hi Zhangminglei,
>>>
>>> Thanks for the reply.
>>>
>>> 1. It solves the issue partially meaning files which have finished
>>> checkpointing don't show .pending status but the files which were in
>>> progress
>>> when the program exists are still in .pending state.
>>>
>>> 2. Ideally, writer should work with default settings correct ? Meaning
>>> we don't have to explicitly set these parameters to make it work.
>>> Is this assumption correct ?
>>>
>>> Thanks,
>>> Sagar
>>>
>>> On Fri, Jun 22, 2018 at 3:25 AM, zhangminglei <18717838...@163.com>
>>> wrote:
>>>
>>>> Hi, Sagar. Please use the code below and you will see the part files
>>>> move from _part-0-107.in-progress to _part-0-107.pending and finally
>>>> to part-0-107 (for example); you need to run the program for a while.
>>>> However, we need to set some parameters, like the following; moreover,
>>>> *enableCheckpointing* is also needed. The reason you always see the
>>>> *.pending* files is that the default value of the parameters below is
>>>> 60 seconds, even if you enable checkpointing. That is why you cannot
>>>> see the finished file status until 60 seconds have passed.
>>>>
>>>> Attached is the output on my end, and you will see what you want!
>>>>
>>>> Please let me know if you still have the problem.
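
The part-file lifecycle discussed in this thread (in-progress, then pending after the inactivity threshold, then finished once a checkpoint completes, with files stuck in .pending when the job exits before a checkpoint finishes) can be sketched as a toy state machine. This is an illustration only, not Flink's actual implementation:

```java
public class BucketLifecycle {
    enum State { IN_PROGRESS, PENDING, FINISHED }

    private State state = State.IN_PROGRESS;
    private long lastWriteMillis;
    private final long inactiveThresholdMillis;

    BucketLifecycle(long inactiveThresholdMillis, long nowMillis) {
        this.inactiveThresholdMillis = inactiveThresholdMillis;
        this.lastWriteMillis = nowMillis;
    }

    // A record is written to the in-progress part file.
    void write(long nowMillis) { lastWriteMillis = nowMillis; }

    // Periodic check (cf. setInactiveBucketCheckInterval): a bucket that has
    // been inactive longer than the threshold is rolled to .pending.
    void inactivityCheck(long nowMillis) {
        if (state == State.IN_PROGRESS
                && nowMillis - lastWriteMillis >= inactiveThresholdMillis) {
            state = State.PENDING;
        }
    }

    // A completed checkpoint finalizes .pending files. If the job exits
    // before this happens, the file stays in .pending -- the behavior
    // observed in this thread.
    void checkpointComplete() {
        if (state == State.PENDING) {
            state = State.FINISHED;
        }
    }

    State state() { return state; }

    public static void main(String[] args) {
        BucketLifecycle bucket = new BucketLifecycle(2000, 0);
        bucket.write(100);
        bucket.inactivityCheck(1000);        // still active
        System.out.println(bucket.state());  // IN_PROGRESS
        bucket.inactivityCheck(2500);        // inactive past threshold
        System.out.println(bucket.state());  // PENDING
        bucket.checkpointComplete();
        System.out.println(bucket.state());  // FINISHED
    }
}
```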

Re: [Flink-9407] Question about proposed ORC Sink !

2018-06-22 Thread zhangminglei
Hi, Sagar.

> 1. It solves the issue partially meaning files which have finished 
> checkpointing don't show .pending status but the files which were in progress 
> when the program exists are still in .pending state.

Ans: 

Yes. If the program exits while a checkpoint has not yet finished, those files 
stay in the .pending state. Under normal circumstances, a program running in a 
production environment will never be stopped or exit if everything is fine.

> 2. Ideally, writer should work with default settings correct ? Meaning we 
> don't have to explicitly set these parameters to make it work. 
> Is this assumption correct ?

Ans: 

Yes, the writer should work with the default settings.
Yes, we do not have to explicitly set these parameters to make it work.
Yes, that assumption is correct.

However, Flink is a real-time streaming framework, so under normal 
circumstances you would not use the default settings for a specific business 
case, especially when working together with an offline end (like Hadoop 
MapReduce). In that case, you need to tell the offline end when a bucket is 
closed and when the data for a specific bucket is ready. So, you can take a 
look at https://issues.apache.org/jira/browse/FLINK-9609.

Cheers
Zhangminglei
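
The "tell the offline end when a bucket is ready" handshake described above is often done with an empty marker file that the downstream batch job polls for. A minimal sketch of that convention (the `_SUCCESS` file name and the helper class are illustrative assumptions, not something FLINK-9609 prescribes):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class BucketReadyMarker {

    // Assumed convention: an empty _SUCCESS file in a bucket directory
    // signals that the bucket's data is complete and safe for a downstream
    // offline (batch) job to read.
    static Path markReady(Path bucketDir) throws IOException {
        return Files.createFile(bucketDir.resolve("_SUCCESS"));
    }

    static boolean isReady(Path bucketDir) {
        return Files.exists(bucketDir.resolve("_SUCCESS"));
    }

    public static void main(String[] args) throws IOException {
        Path bucket = Files.createTempDirectory("bucket-2018-06-23");
        System.out.println(isReady(bucket)); // false: still being written
        markReady(bucket);                   // bucket closed, publish marker
        System.out.println(isReady(bucket)); // true: offline job may start
    }
}
```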


> On Jun 23, 2018, at 8:23 AM, sagar loke wrote:
> 
> Hi Zhangminglei,
> 
> Thanks for the reply.
> 
> 1. It solves the issue partially meaning files which have finished 
> checkpointing don't show .pending status but the files which were in progress 
> when the program exists are still in .pending state.
> 
> 2. Ideally, writer should work with default settings correct ? Meaning we 
> don't have to explicitly set these parameters to make it work. 
> Is this assumption correct ?
> 
> Thanks,
> Sagar
> 
> On Fri, Jun 22, 2018 at 3:25 AM, zhangminglei <18717838...@163.com> wrote:
> Hi, Sagar. Please use the code below and you will see the part files move 
> from _part-0-107.in-progress to _part-0-107.pending and finally to 
> part-0-107 (for example); you need to run the program for a while. However, 
> we need to set some parameters, like the following; moreover, 
> enableCheckpointing is also needed. The reason you always see the .pending 
> files is that the default value of the parameters below is 60 seconds, even 
> if you enable checkpointing. That is why you cannot see the finished file 
> status until 60 seconds have passed.
> 
> Attached is the output on my end, and you will see what you want!
> 
> Please let me know if you still have the problem.
> 
> Cheers
> Zhangminglei
> 
> setInactiveBucketCheckInterval(2000)
> .setInactiveBucketThreshold(2000);
> 
> public class TestOrc {
>    public static void main(String[] args) throws Exception {
>       StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>       env.setParallelism(1);
>       env.enableCheckpointing(1000);
>       env.setStateBackend(new MemoryStateBackend());
> 
>       String orcSchemaString = "struct<name:string,age:int,married:boolean>";
>       String path = "hdfs://10.199.196.0:9000/data/hive/man";
> 
>       BucketingSink<Row> bucketingSink = new BucketingSink<>(path);
> 
>       bucketingSink
>          .setWriter(new OrcFileWriter<>(orcSchemaString))
>          .setInactiveBucketCheckInterval(2000)
>          .setInactiveBucketThreshold(2000);
> 
>       DataStream<Row> dataStream = env.addSource(new ManGenerator());
> 
>       dataStream.addSink(bucketingSink);
> 
>       env.execute();
>    }
> 
>    public static class ManGenerator implements SourceFunction<Row> {
> 
>       @Override
>       public void run(SourceContext<Row> ctx) throws Exception {
>          for (int i = 0; i < 2147483000; i++) {
>             Row row = new Row(3);
>             row.setField(0, "Sagar");
>             row.setField(1, 26 + i);
>             row.setField(2, false);
>             ctx.collect(row);
>          }
>       }
> 
>       @Override
>       public void cancel() {
>       }
>    }
> }
> 
> 
> 
> 
>> On Jun 22, 2018, at 11:14 AM, sagar loke wrote:
>> 
>> Sure, we can solve it together :)
>> 
>> Are you able to reproduce it ?
>> 
>> Thanks,
>> Sagar
>> 
>> On Thu, Jun 21, 2018 at 7:28 PM zhangminglei <18717838...@163.com> wrote:
>> Sagar, flush will be called when a checkpoint is taken. Please see:
>> 
>> bucketState.currentFileValidLength = bucketState.writer.flush();
>> 
>> 
>> @Override
>> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>>    Preconditions.checkNotNull(restoredBucketStates, "The operator has not 
>> been properly initialized.");
>> 
>>    restoredBucketStates.clear();
>> 
>>    synchronized (state.bucketStates) {
>>       int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
>> 
>>       for (Map.Entry<String, BucketState<T>> bucketStateEntry : 
>> state.bucketStates.entrySet()) {
>>          BucketState<T> bucketState = bucketStateEntry.getValue();
>> 
>>          if (bucketState.isWriterOpen) {
>>             bucketState.currentFileValidLength = bucketState.writer.flush();
>>          }