@zhangminglei,
Question about the schema for ORC format:
1. Does it always need to be of the complex type "struct<...>"?
2. Or can it be created with individual data types directly,
e.g. "name:string, age:int"?
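Regarding question 2, a hedged note: ORC's `TypeDescription.fromString` parses a single type description. As far as I know, a bare column list such as `name:string, age:int` does not parse on its own, and named columns are normally expressed as a root `struct<...>`. If the writer passes the schema string straight to `TypeDescription.fromString` (as `FlinkOrcWriterV1` further down in this thread does), a caller could still accept the flat form and wrap it before constructing the writer. `OrcSchemas.toStructSchema` below is a hypothetical helper, not part of Flink or ORC:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper (not part of Flink or ORC): wraps a flat column list such as
 * "name:string, age:int" into the struct<...> form assumed to be required by the writer.
 */
final class OrcSchemas {
    private OrcSchemas() {}

    static String toStructSchema(String columns) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        int depth = 0; // nesting inside <...> or (...), so map<string,int> is not split
        for (char c : columns.toCharArray()) {
            if (c == '<' || c == '(') {
                depth++;
            } else if (c == '>' || c == ')') {
                depth--;
            }
            if (c == ',' && depth == 0) {
                addField(fields, current.toString());
                current.setLength(0);
            } else if (!Character.isWhitespace(c)) {
                current.append(c); // whitespace is dropped from the type string
            }
        }
        addField(fields, current.toString());
        if (fields.isEmpty()) {
            throw new IllegalArgumentException("No columns given");
        }
        return "struct<" + String.join(",", fields) + ">";
    }

    private static void addField(List<String> fields, String field) {
        if (field.isEmpty()) {
            return; // tolerate a trailing comma
        }
        if (field.indexOf(':') <= 0) {
            throw new IllegalArgumentException("Expected name:type but got '" + field + "'");
        }
        fields.add(field);
    }
}
```

For example, `toStructSchema("name:string, age:int")` yields `struct<name:string,age:int>`. Splitting only at nesting depth 0 keeps nested types such as `map<string,int>` or `decimal(10,2)` intact.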
Thanks,
Sagar
On Fri, Jun 22, 2018 at 11:56 PM, zhangminglei <[email protected]> wrote:
> Yes, it should be "exit". Thanks, Ted Yu. Exactly right!
>
> Cheers
> Zhangminglei
>
> On Jun 23, 2018, at 12:40 PM, Ted Yu <[email protected]> wrote:
>
> For #1, the word "exist" should be "exit", right?
> Thanks
>
> -------- Original message --------
> From: zhangminglei <[email protected]>
> Date: 6/23/18 10:12 AM (GMT+08:00)
> To: sagar loke <[email protected]>
> Cc: dev <[email protected]>, user <[email protected]>
> Subject: Re: [Flink-9407] Question about proposed ORC Sink !
>
> Hi, Sagar.
>
> 1. It solves the issue partially, meaning files which have finished
> checkpointing don't show the .pending status, but the files which were in
> progress when the program exits are still in the .pending state.
>
>
> Ans:
>
> Yes. If the program exits before an ongoing checkpoint has finished, the
> files that checkpoint would have confirmed stay in the .pending state.
> Under normal circumstances, programs running in a production environment
> are never stopped or exited as long as everything is fine.
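The behavior described in this answer can be modeled with a little bookkeeping. The sketch below is a simplified, hypothetical model loosely following the `pendingFilesPerCheckpoint` map used in `BucketingSink.snapshotState()` (quoted later in this thread); it is not Flink code and performs no renames on HDFS. A file that is closed after the last completed checkpoint is never confirmed, so it keeps its `.pending` suffix if the job exits:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical, simplified model of checkpoint-gated part files (not Flink code). */
final class PendingFileTracker {
    private List<String> pendingFiles = new ArrayList<>();                  // closed since the last snapshot
    private final TreeMap<Long, List<String>> pendingPerCheckpoint = new TreeMap<>();
    private final List<String> finished = new ArrayList<>();

    /** A part file was closed for writing: it becomes pending. */
    void onFileClosed(String name) {
        pendingFiles.add(name);
    }

    /** Checkpoint taken: the files closed so far are tied to this checkpoint ID. */
    void snapshot(long checkpointId) {
        pendingPerCheckpoint.put(checkpointId, pendingFiles);
        pendingFiles = new ArrayList<>();
    }

    /** Checkpoint confirmed: files from this and all earlier checkpoints become finished. */
    void notifyCheckpointComplete(long checkpointId) {
        NavigableMap<Long, List<String>> confirmed = pendingPerCheckpoint.headMap(checkpointId, true);
        for (List<String> files : confirmed.values()) {
            finished.addAll(files);
        }
        confirmed.clear(); // headMap is a view, so this also removes the entries from the TreeMap
    }

    List<String> finished() {
        return new ArrayList<>(finished);
    }

    /** Files that would keep the .pending suffix if the job stopped now. */
    List<String> stillPending() {
        List<String> result = new ArrayList<>();
        for (List<String> files : pendingPerCheckpoint.values()) {
            result.addAll(files);
        }
        result.addAll(pendingFiles);
        return result;
    }

    public static void main(String[] args) {
        PendingFileTracker tracker = new PendingFileTracker();
        tracker.onFileClosed("part-0-0");
        tracker.snapshot(1);
        tracker.notifyCheckpointComplete(1); // part-0-0 is confirmed
        tracker.onFileClosed("part-0-1");    // closed, but the job exits before another checkpoint completes
        System.out.println("finished=" + tracker.finished() + " pending=" + tracker.stillPending());
    }
}
```

Running `main` prints `finished=[part-0-0] pending=[part-0-1]`: the second file was closed, but no checkpoint completed before the program exited.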
>
> 2. Ideally, the writer should work with the default settings, correct? Meaning
> we don't have to explicitly set these parameters to make it work.
> Is this assumption correct?
>
>
> Ans:
>
> Yes, the writer should work with the default settings, and you do not
> have to set these parameters explicitly to make it work. Your assumption
> is correct.
>
> However, Flink is a real-time streaming framework, so for a specific
> business use case you usually won't rely on the default settings,
> especially when working together with an *offline end* (such as Hadoop
> MapReduce). In that case, you need to tell the offline end when a bucket
> is closed and when the data for that bucket is ready. See
> https://issues.apache.org/jira/browse/FLINK-9609.
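To illustrate why the offline end needs an explicit signal, here is a hypothetical check a batch job could run over the file names in one bucket, assuming BucketingSink's default naming seen in this thread (`_part-0-107.in-progress`, then `_part-0-107.pending`, then `part-0-107`). It is only a heuristic: a bucket with no in-progress or pending files right now may still receive a new part file later, which is the kind of gap FLINK-9609 is meant to address. `BucketReadiness` is not a Flink API:

```java
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical heuristic (not a Flink API) for an offline consumer, assuming the default naming
 * "_part-0-107.in-progress" -> "_part-0-107.pending" -> "part-0-107".
 */
final class BucketReadiness {
    private BucketReadiness() {}

    /** True if the bucket has at least one finished part file and nothing in progress or pending. */
    static boolean isBucketReady(List<String> fileNames) {
        boolean sawFinishedPart = false;
        for (String name : fileNames) {
            if (name.endsWith(".in-progress") || name.endsWith(".pending")) {
                return false; // still being written, or waiting for a checkpoint to confirm it
            }
            if (name.startsWith("part-")) {
                sawFinishedPart = true;
            }
        }
        return sawFinishedPart;
    }

    public static void main(String[] args) {
        System.out.println(isBucketReady(Arrays.asList("part-0-106", "_part-0-107.pending")));
    }
}
```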
>
> Cheers
> Zhangminglei
>
>
> On Jun 23, 2018, at 8:23 AM, sagar loke <[email protected]> wrote:
>
> Hi Zhangminglei,
>
> Thanks for the reply.
>
> 1. It solves the issue partially, meaning files which have finished
> checkpointing don't show the .pending status, but the files which were in
> progress when the program exits are still in the .pending state.
>
> 2. Ideally, the writer should work with the default settings, correct? Meaning
> we don't have to explicitly set these parameters to make it work.
> Is this assumption correct?
>
> Thanks,
> Sagar
>
> On Fri, Jun 22, 2018 at 3:25 AM, zhangminglei <[email protected]> wrote:
>
>> Hi, Sagar. Please use the code below and you will see the part file status
>> change from _part-0-107.in-progress to _part-0-107.pending and finally
>> to part-0-107 (for example); you need to let the program run for a while.
>> However, we need to set some parameters, like the following. Moreover,
>> *enableCheckpointing* IS also needed. I know why you always see the
>> *.pending* files: the default value of the parameters below is 60 seconds,
>> even if you enable checkpointing. That is why you cannot see the
>> finished file status until 60 seconds have passed.
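A rough way to see where the 60 seconds come from, under assumptions stated in the code comments: the sink checks for inactive buckets every check interval, closes a bucket once it has been idle longer than the threshold, and a pending file is finished when the next checkpoint completes (checkpoint duration ignored). The worst-case delay after the last write to a bucket is then roughly the sum of the three intervals. `FinishDelay` is a hypothetical helper for the arithmetic only:

```java
/**
 * Hypothetical back-of-the-envelope helper. Assumes: the sink checks for inactive buckets every
 * checkIntervalMs; a bucket idle for longer than thresholdMs is closed at the next check; the
 * resulting pending file is finished when the next checkpoint completes; checkpoints are triggered
 * every checkpointIntervalMs; checkpoint duration is ignored.
 */
final class FinishDelay {
    private FinishDelay() {}

    static long worstCaseMs(long thresholdMs, long checkIntervalMs, long checkpointIntervalMs) {
        // Up to thresholdMs + checkIntervalMs until the bucket is closed (file becomes pending),
        // then up to checkpointIntervalMs until a checkpoint confirms it (file becomes finished).
        return thresholdMs + checkIntervalMs + checkpointIntervalMs;
    }

    public static void main(String[] args) {
        System.out.println(worstCaseMs(60_000, 60_000, 1_000)); // 60 s defaults
        System.out.println(worstCaseMs(2_000, 2_000, 1_000));   // settings from the example
    }
}
```

With the 60 s defaults and a 1 s checkpoint interval this gives 121000 ms; with the 2000 ms settings from the example it gives 5000 ms.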
>>
>> Attached is the result on my end, and you will see what you want!
>>
>> Please let me know if you still have the problem.
>>
>> Cheers
>> Zhangminglei
>>
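>> bucketingSink
>>     .setInactiveBucketCheckInterval(2000) // check for inactive buckets every 2 s
>>     .setInactiveBucketThreshold(2000);    // close a bucket after 2 s without writes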
>>
>>
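>> import org.apache.flink.runtime.state.memory.MemoryStateBackend;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>> import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
>> import org.apache.flink.types.Row;
>> // OrcFileWriter comes from the FLINK-9407 pull request; it is not included in Flink 1.4.2.
>>
>> public class TestOrc {
>>     public static void main(String[] args) throws Exception {
>>         StreamExecutionEnvironment env =
>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>         env.setParallelism(1);
>>         // Pending part files are only moved to "finished" when a checkpoint completes.
>>         env.enableCheckpointing(1000);
>>         env.setStateBackend(new MemoryStateBackend());
>>
>>         String orcSchemaString = "struct<name:string,age:int,married:boolean>";
>>         String path = "hdfs://10.199.196.0:9000/data/hive/man";
>>
>>         BucketingSink<Row> bucketingSink = new BucketingSink<>(path);
>>
>>         bucketingSink
>>             .setWriter(new OrcFileWriter<>(orcSchemaString))
>>             // Both defaults are 60 s; lower them so idle buckets are closed quickly.
>>             .setInactiveBucketCheckInterval(2000)
>>             .setInactiveBucketThreshold(2000);
>>
>>         DataStream<Row> dataStream = env.addSource(new ManGenerator());
>>         dataStream.addSink(bucketingSink);
>>
>>         env.execute();
>>     }
>>
>>     public static class ManGenerator implements SourceFunction<Row> {
>>         // Set to false by cancel() so that run() can return.
>>         private volatile boolean running = true;
>>
>>         @Override
>>         public void run(SourceContext<Row> ctx) throws Exception {
>>             for (int i = 0; running && i < 2147483000; i++) {
>>                 Row row = new Row(3);
>>                 row.setField(0, "Sagar");
>>                 row.setField(1, 26 + i);
>>                 row.setField(2, false);
>>                 // Emit under the checkpoint lock so records and checkpoints do not interleave.
>>                 synchronized (ctx.getCheckpointLock()) {
>>                     ctx.collect(row);
>>                 }
>>             }
>>         }
>>
>>         @Override
>>         public void cancel() {
>>             running = false;
>>         }
>>     }
>> }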
>>
>> <filestatus.jpg>
>>
>>
>>
>> On Jun 22, 2018, at 11:14 AM, sagar loke <[email protected]> wrote:
>>
>> Sure, we can solve it together :)
>>
>> Are you able to reproduce it?
>>
>> Thanks,
>> Sagar
>>
>> On Thu, Jun 21, 2018 at 7:28 PM zhangminglei <[email protected]> wrote:
>>
>>> Sagar, flush() is called when a checkpoint is taken. Please see
>>>
>>> bucketState.currentFileValidLength = bucketState.writer.flush();
>>>
>>>
>>>
>>> @Override
>>> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>>>     Preconditions.checkNotNull(restoredBucketStates,
>>>         "The operator has not been properly initialized.");
>>>
>>>     restoredBucketStates.clear();
>>>
>>>     synchronized (state.bucketStates) {
>>>         int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
>>>
>>>         for (Map.Entry<String, BucketState<T>> bucketStateEntry :
>>>                 state.bucketStates.entrySet()) {
>>>             BucketState<T> bucketState = bucketStateEntry.getValue();
>>>
>>>             // Flush the open part file and record how many bytes are valid.
>>>             if (bucketState.isWriterOpen) {
>>>                 bucketState.currentFileValidLength = bucketState.writer.flush();
>>>             }
>>>
>>>             // Files closed since the last checkpoint are tied to this checkpoint ID.
>>>             synchronized (bucketState.pendingFilesPerCheckpoint) {
>>>                 bucketState.pendingFilesPerCheckpoint.put(
>>>                     context.getCheckpointId(), bucketState.pendingFiles);
>>>             }
>>>             bucketState.pendingFiles = new ArrayList<>();
>>>         }
>>>         restoredBucketStates.add(state);
>>>
>>>         if (LOG.isDebugEnabled()) {
>>>             LOG.debug("{} idx {} checkpointed {}.",
>>>                 getClass().getSimpleName(), subtaskIdx, state);
>>>         }
>>>     }
>>> }
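The `currentFileValidLength` recorded above is what makes recovery possible: as far as I know, on restore BucketingSink cuts the in-progress file back to that length (via `FileSystem.truncate()` when the Hadoop version supports it, otherwise by writing a `.valid-length` marker file). The in-memory class below is a hypothetical illustration of that idea only; it does not implement Flink's `Writer` interface. In `FlinkOrcWriterV1` from this thread, `flush()` returns `orcWriter.writeIntermediateFooter()`, which presumably plays the same role for ORC:

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/**
 * Hypothetical in-memory stand-in for a part file (not Flink's Writer interface), illustrating
 * the valid-length idea: the length returned by flush() at checkpoint time is the length the
 * file is cut back to on recovery.
 */
final class InMemoryPartFile {
    private final ByteArrayOutputStream out = new ByteArrayOutputStream();

    void write(String record) {
        byte[] bytes = (record + "\n").getBytes(StandardCharsets.UTF_8);
        out.write(bytes, 0, bytes.length);
    }

    /** Nothing is buffered in this toy, so flushing just reports the current length. */
    long flush() {
        return out.size();
    }

    /** Contents after recovery: everything written after the checkpointed length is dropped. */
    byte[] recoverTo(long validLength) {
        if (validLength < 0 || validLength > out.size()) {
            throw new IllegalArgumentException("Invalid length: " + validLength);
        }
        return Arrays.copyOf(out.toByteArray(), (int) validLength);
    }

    public static void main(String[] args) {
        InMemoryPartFile file = new InMemoryPartFile();
        file.write("a");
        file.write("b");
        long validLength = file.flush(); // recorded in the checkpoint
        file.write("c");                  // written after the checkpoint, then the job fails
        System.out.print(new String(file.recoverTo(validLength), StandardCharsets.UTF_8));
    }
}
```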
>>>
>>>
>>>
>>> On Jun 22, 2018, at 10:21 AM, sagar loke <[email protected]> wrote:
>>>
>>> Thanks for replying.
>>>
>>> Yes, I tried different checkpoint intervals, e.g. 20, 100, and 5000 ms:
>>>
>>> env.enableCheckpointing(100);
>>>
>>> But in all cases, I still see the .pending state.
>>>
>>> I'm not sure whether it's related to the flush() method of OrcFileWriter,
>>> which might somehow not be getting called.
>>>
>>> Thanks,
>>> Sagar
>>>
>>> On Thu, Jun 21, 2018 at 7:17 PM zhangminglei <[email protected]>
>>> wrote:
>>>
>>>> Hi, Sagar,
>>>>
>>>> Please take a look at BucketingSink. It says that a file keeps the
>>>> .pending status if you DO NOT take a checkpoint. The doc says that when a
>>>> checkpoint is successful, the currently pending files are moved to {@code
>>>> finished}.
>>>> Try again. I think you should call the method below and see what
>>>> happens. In any case, I will also try it and see whether it works.
>>>> Please let me know if you still get the error.
>>>>
>>>> env.enableCheckpointing(200);
>>>>
>>>> /**
>>>>  * The suffix for {@code pending} part files. These are closed files that we are
>>>>  * not currently writing to (inactive or reached {@link #batchSize}), but which
>>>>  * were not yet confirmed by a checkpoint.
>>>>  */
>>>> private static final String DEFAULT_PENDING_SUFFIX = ".pending";
>>>>
>>>>  * <p>Part files can be in one of three states: {@code in-progress}, {@code pending}
>>>>  * or {@code finished}. The reason for this is how the sink works together with the
>>>>  * checkpointing mechanism to provide exactly-once semantics and fault-tolerance.
>>>>  * The part file that is currently being written to is {@code in-progress}. Once
>>>>  * a part file is closed for writing it becomes {@code pending}. When a
>>>>  * checkpoint is successful the currently pending files will be moved to {@code finished}.
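The three states in that Javadoc can be sketched as a small state machine. This is a hypothetical model, not Flink code; the file names assume the default `_` prefix and the `.in-progress` and `.pending` suffixes that appear in the file names earlier in this thread:

```java
/**
 * Hypothetical model of the three part-file states from the BucketingSink Javadoc (not Flink code).
 * File names assume the default "_" prefix and ".in-progress" / ".pending" suffixes.
 */
enum PartFileState {
    IN_PROGRESS, PENDING, FINISHED;

    /** Name of the file on disk for a part such as "part-0-107" in this state. */
    String fileName(String partName) {
        switch (this) {
            case IN_PROGRESS:
                return "_" + partName + ".in-progress";
            case PENDING:
                return "_" + partName + ".pending";
            default:
                return partName;
        }
    }

    /** The sink stops writing to the file (rolled over or bucket inactive). */
    PartFileState onClose() {
        if (this != IN_PROGRESS) {
            throw new IllegalStateException("Only an in-progress file can be closed, not " + this);
        }
        return PENDING;
    }

    /** A checkpoint completed; only pending files move on, in-progress files are unaffected. */
    PartFileState onCheckpointComplete() {
        return this == PENDING ? FINISHED : this;
    }
}
```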
>>>>
>>>>
>>>> Cheers
>>>> Zhangminglei
>>>>
>>>>
>>>>
>>>> On Jun 22, 2018, at 4:46 AM, sagar loke <[email protected]> wrote:
>>>>
>>>> Thanks, Zhangminglei, for the quick response.
>>>>
>>>> I tried the above code and I am seeing another issue: the files
>>>> created on HDFS are always in the *.pending* state.
>>>>
>>>> Let me know if you can reproduce it.
>>>>
>>>> Thanks,
>>>> Sagar
>>>>
>>>> On Thu, Jun 21, 2018 at 3:20 AM, zhangminglei <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi, Sagar,
>>>>>
>>>>> I did a local test and it seems to work fine for me. The PR for
>>>>> [FLINK-9407] will be updated.
>>>>>
>>>>> I will push the newest code to the PR soon; below is the example I
>>>>> used for my test. You can check it again. Hope you enjoy it!
>>>>>
>>>>> Cheers
>>>>> Zhangminglei.
>>>>>
>>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>>>>> import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
>>>>> import org.apache.flink.types.Row;
>>>>> // OrcFileWriter comes from the FLINK-9407 pull request; it is not included in Flink 1.4.2.
>>>>>
>>>>> public class TestOrc {
>>>>>     public static void main(String[] args) throws Exception {
>>>>>         StreamExecutionEnvironment env =
>>>>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>         env.setParallelism(1);
>>>>>
>>>>>         String orcSchemaString = "struct<name:string,age:int,married:boolean>";
>>>>>         String path = "hdfs://10.199.196.0:9000/data/hive/man";
>>>>>
>>>>>         BucketingSink<Row> bucketingSink = new BucketingSink<>(path);
>>>>>         bucketingSink.setWriter(new OrcFileWriter<>(orcSchemaString));
>>>>>
>>>>>         // Emits three rows and then finishes.
>>>>>         DataStream<Row> dataStream = env.addSource(new ManGenerator());
>>>>>         dataStream.addSink(bucketingSink);
>>>>>
>>>>>         env.execute();
>>>>>     }
>>>>>
>>>>>     public static class ManGenerator implements SourceFunction<Row> {
>>>>>
>>>>>         @Override
>>>>>         public void run(SourceContext<Row> ctx) throws Exception {
>>>>>             for (int i = 0; i < 3; i++) {
>>>>>                 Row row = new Row(3);
>>>>>                 row.setField(0, "Sagar");
>>>>>                 row.setField(1, 26 + i);
>>>>>                 row.setField(2, false);
>>>>>                 ctx.collect(row);
>>>>>             }
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public void cancel() {
>>>>>             // Nothing to do: run() returns after three rows.
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>> On Jun 21, 2018, at 1:47 AM, sagar loke <[email protected]> wrote:
>>>>>
>>>>> Hi Zhangminglei,
>>>>>
>>>>> Question about https://issues.apache.org/jira/browse/FLINK-9407
>>>>>
>>>>> I tried to use the code from the PR and ran it on a local HDFS cluster to
>>>>> write some ORC data.
>>>>>
>>>>> But somehow this code is failing with the following error:
>>>>>
>>>>>
>>>>>
>>>>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
>>>>> Failed to CREATE_FILE /tmp/hivedatanew2/2018-06-20/10/34/_part-0-0.in-progress
>>>>> for DFSClient_NONMAPREDUCE_73219864_36 on 127.0.0.1 because this file lease
>>>>> is currently owned by DFSClient_NONMAPREDUCE_-1374584007_36 on 127.0.0.1
>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2500)
>>>>>
>>>>>
>>>>> I understand that this error is related to Hadoop, but somehow I get
>>>>> this error only when executing the code from this PR.
>>>>>
>>>>> I had created a very crude way to write ORC files to HDFS, as follows.
>>>>> The code below works all right and does not throw the above error.
>>>>>
>>>>> import org.apache.flink.streaming.connectors.fs.Writer;
>>>>> import org.apache.hadoop.conf.Configuration;
>>>>> import org.apache.hadoop.fs.FileSystem;
>>>>> import org.apache.hadoop.fs.Path;
>>>>> import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
>>>>> import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
>>>>> import org.apache.orc.OrcFile;
>>>>> import org.apache.orc.TypeDescription;
>>>>>
>>>>> import java.io.IOException;
>>>>>
>>>>> public class FlinkOrcWriterV1<T> implements Writer<T> {
>>>>>
>>>>>     private transient org.apache.orc.Writer orcWriter;
>>>>>     String schema;
>>>>>     TypeDescription typeDescriptionschema; // e.g. "struct<x:int,y:int>"
>>>>>     String basePath;
>>>>>
>>>>>     public FlinkOrcWriterV1(String schema) {
>>>>>         this.schema = schema;
>>>>>         this.typeDescriptionschema = TypeDescription.fromString(schema);
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void open(FileSystem fs, Path path) throws IOException {
>>>>>         Configuration conf = new Configuration();
>>>>>         // Note: the path supplied by BucketingSink is ignored; every file
>>>>>         // is written to this fixed location instead.
>>>>>         orcWriter = OrcFile.createWriter(
>>>>>             new Path("hdfs://localhost:9000/tmp/hivedata3/"),
>>>>>             OrcFile.writerOptions(conf).setSchema(typeDescriptionschema));
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public long flush() throws IOException {
>>>>>         // Writes an intermediate footer and returns an offset at which the file is valid ORC.
>>>>>         return orcWriter.writeIntermediateFooter();
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public long getPos() throws IOException {
>>>>>         // Raw (deserialized) data size used as a stand-in for the position;
>>>>>         // it may only be accurate once the writer is closed.
>>>>>         return orcWriter.getRawDataSize();
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void close() throws IOException {
>>>>>         orcWriter.close();
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void write(T element) throws IOException {
>>>>>         // Crude placeholder: ignores `element` and writes ten sample (x, y) rows
>>>>>         // into a fresh batch on every call; assumes the first two columns are ints.
>>>>>         VectorizedRowBatch batch = typeDescriptionschema.createRowBatch(10);
>>>>>         LongColumnVector x = (LongColumnVector) batch.cols[0];
>>>>>         LongColumnVector y = (LongColumnVector) batch.cols[1];
>>>>>         for (int r = 0; r < 10; ++r) {
>>>>>             int row = batch.size++;
>>>>>             x.vector[row] = r;
>>>>>             y.vector[row] = r * 3;
>>>>>             // If the batch is full, write it out and start over.
>>>>>             if (batch.size == batch.getMaxSize()) {
>>>>>                 orcWriter.addRowBatch(batch);
>>>>>                 batch.reset();
>>>>>             }
>>>>>         }
>>>>>         if (batch.size != 0) {
>>>>>             orcWriter.addRowBatch(batch);
>>>>>             batch.reset();
>>>>>         }
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public FlinkOrcWriterV1<T> duplicate() {
>>>>>         return new FlinkOrcWriterV1<>(schema);
>>>>>     }
>>>>> }
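The batching loop in `write()` above follows a common pattern: buffer rows, hand over a batch when it is full, and push out the remainder at the end. A writer would more typically keep one batch across `write()` calls and add one row per element, rather than allocating a fresh batch and writing sample rows on every call. The generic sketch below shows only that pattern; `RowBatcher` is hypothetical, a `List` stands in for ORC's `VectorizedRowBatch`, and the `sink` callback stands in for `orcWriter.addRowBatch`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical sketch of the fill-then-flush batching pattern. A List stands in for ORC's
 * VectorizedRowBatch and `sink` stands in for orcWriter.addRowBatch(batch).
 */
final class RowBatcher<R> implements AutoCloseable {
    private final int maxSize;
    private final Consumer<List<R>> sink;
    private List<R> batch;

    RowBatcher(int maxSize, Consumer<List<R>> sink) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        this.maxSize = maxSize;
        this.sink = sink;
        this.batch = new ArrayList<>(maxSize);
    }

    /** Called once per incoming element, like Writer.write(T); the batch lives across calls. */
    void add(R row) {
        batch.add(row);
        if (batch.size() == maxSize) {
            emit(); // the batch is full: write it out and start over
        }
    }

    /** Hands over a partially filled batch, e.g. before a checkpoint or on close. */
    void flush() {
        if (!batch.isEmpty()) {
            emit();
        }
    }

    private void emit() {
        sink.accept(batch);
        batch = new ArrayList<>(maxSize); // fresh list, since the sink may keep a reference
    }

    @Override
    public void close() {
        flush();
    }
}
```

In a real `Writer`, both `flush()` before a checkpoint and `close()` would presumably need to drain the partial batch so that the reported length covers every element written so far.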
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>> I'm not sure whether the error is related to any of the Hadoop dependencies
>>>>> or something else.
>>>>>
>>>>> Can you please look into it and let me know if you can reproduce it on
>>>>> your end too?
>>>>>
>>>>> By the way, these are the dependencies in my project:
>>>>>
>>>>> dependencies {
>>>>>     compile 'org.apache.flink:flink-java:1.4.2'
>>>>>     compile 'org.apache.flink:flink-runtime_2.11:1.4.2'
>>>>>     compile 'org.apache.flink:flink-streaming-java_2.11:1.4.2'
>>>>>     compile 'org.apache.flink:flink-connector-kafka-0.11_2.11:1.4.2'
>>>>>     compile 'org.apache.flink:flink-connector-elasticsearch5_2.11:1.4.2'
>>>>>     compile 'io.confluent:kafka-avro-serializer:3.3.0'
>>>>>     compile 'org.apache.flink:flink-avro:1.4.2'
>>>>>     compile group: 'org.apache.kafka', name: 'kafka_2.11', version: '1.1.0'
>>>>>     compile group: 'org.apache.flink', name: 'flink-shaded-hadoop2', version: '1.4.2'
>>>>>     compile 'org.apache.flink:flink-connector-filesystem_2.11:1.4.2'
>>>>>     compile group: 'org.apache.flink', name: 'flink-jdbc', version: '1.4.2'
>>>>>     compile group: 'org.apache.flink', name: 'flink-table_2.11', version: '1.4.2'
>>>>>     compile group: 'org.apache.orc', name: 'orc-core', version: '1.5.1'
>>>>>     compile group: 'org.apache.parquet', name: 'parquet-avro', version: '1.10.0'
>>>>>     compile group: 'org.apache.parquet', name: 'parquet-common', version: '1.10.0'
>>>>>     compile group: 'org.apache.flink', name: 'flink-orc_2.11', version: '1.4.2'
>>>>>     testCompile group: 'junit', name: 'junit', version: '4.12'
>>>>> }
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Thanks,
>>>>> Sagar.
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Regards,
>>>> SAGAR.
>>>>
>>>>
>>>> --
>>> Cheers,
>>> Sagar
>>>
>>>
>>> --
>> Cheers,
>> Sagar
>>
>>
>>
>
>
> --
> Regards,
> SAGAR.
>
>
>
>
--
Regards,
SAGAR.