[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-12-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16713588#comment-16713588
 ] 

ASF GitHub Bot commented on FLINK-10457:


jihyun commented on issue #6774: [FLINK-10457] Support SequenceFile for 
StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#issuecomment-445442845
 
 
   This is what I have worked on so far. If anything needs fixing, please go 
ahead and fix it yourself.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support SequenceFile for StreamingFileSink
> --
>
> Key: FLINK-10457
> URL: https://issues.apache.org/jira/browse/FLINK-10457
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Jihyun Cho
>Priority: Major
>  Labels: pull-request-available
>
> SequenceFile is a major file format in the Hadoop ecosystem.
> It is simple to manage and easy to combine with other tools.
> So we still need the SequenceFile format, even though Parquet and ORC are 
> already supported.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-12-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16712533#comment-16712533
 ] 

ASF GitHub Bot commented on FLINK-10457:


jihyun commented on issue #6774: [FLINK-10457] Support SequenceFile for 
StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#issuecomment-445173617
 
 
   Hi @kl0u 
   Sorry for the late reply.
   I planned to work on this over the weekend, but I think it would be better 
if you did it.
   What I wanted was to be able to write files in `SequenceFileFormat` given 
only a `Path`.
   Users should not need to know about the Hadoop configuration, so I added a 
static utility method.
   If you take over, I will leave the rest of the modifications up to you. 
Thank you.
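
   The idea of a static entry point that hides the configuration from the 
caller can be sketched roughly as follows. This is only an illustration under 
assumptions: `SketchWriterFactory`, `forPath`, and the `fs.scheme` key are 
hypothetical names invented here, and a plain `Map` stands in for the Hadoop 
configuration so the snippet runs without Flink or Hadoop on the classpath.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: none of these names come from Flink or Hadoop.
final class SketchWriterFactory {
    private final Map<String, String> conf;
    private final String keyClassName;
    private final String valueClassName;

    // Full constructor for callers who do want to control the configuration.
    SketchWriterFactory(Map<String, String> conf, String keyClassName, String valueClassName) {
        this.conf = new HashMap<>(conf);
        this.keyClassName = keyClassName;
        this.valueClassName = valueClassName;
    }

    // Static convenience method: the caller passes only the target path and
    // the key/value type names; the configuration is derived internally.
    static SketchWriterFactory forPath(URI basePath, String keyClassName, String valueClassName) {
        Map<String, String> conf = new HashMap<>();
        // "fs.scheme" is a made-up key used only for this illustration.
        String scheme = basePath.getScheme() == null ? "file" : basePath.getScheme();
        conf.put("fs.scheme", scheme);
        return new SketchWriterFactory(conf, keyClassName, valueClassName);
    }

    String configured(String key) {
        return conf.get(key);
    }
}
```

   With something like this, user code only ever calls `forPath(...)`, while 
the full constructor remains available for cases that need a custom 
configuration.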
   




[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-12-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16712498#comment-16712498
 ] 

ASF GitHub Bot commented on FLINK-10457:


kl0u commented on issue #6774: [FLINK-10457] Support SequenceFile for 
StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#issuecomment-445162125
 
 
   Hi @jihyun! 
   Are you planning to work on this? 
   If you prefer I can make the necessary changes myself and then merge. 
   Let me know what you think!




[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-12-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708553#comment-16708553
 ] 

ASF GitHub Bot commented on FLINK-10457:


jihyun commented on a change in pull request #6774: [FLINK-10457] Support 
SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r238614740
 
 

 ##
 File path: 
flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java
 ##
 @@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+
+import java.io.IOException;
+
+/**
+ * A factory that creates a SequenceFile {@link BulkWriter}.
+ *
+ * @param <K> The type of key to write. It should be writable.
+ * @param <V> The type of value to write. It should be writable.
+ */
+@PublicEvolving
+public class SequenceFileWriterFactory<K extends Writable, V extends Writable> implements BulkWriter.Factory<Tuple2<K, V>> {
+   private static final long serialVersionUID = 1L;
+
+   private final SerializableHadoopConfiguration serdeHadoopConf;
+   private final Class<K> keyClass;
+   private final Class<V> valueClass;
+   private final String compressionCodecName;
+   private final SequenceFile.CompressionType compressionType;
+
+   /**
+    * Creates a new SequenceFileWriterFactory using the given builder to assemble the
+    * SequenceFileWriter.
+    *
+    * @param hadoopConf The Hadoop configuration for Sequence File Writer.
+    * @param keyClass   The class of key to write.
+    * @param valueClass The class of value to write.
+    */
+   public SequenceFileWriterFactory(Configuration hadoopConf, Class<K> keyClass, Class<V> valueClass) {
+       this(hadoopConf, keyClass, valueClass, "None", SequenceFile.CompressionType.BLOCK);
+   }
+
+   /**
+    * Creates a new SequenceFileWriterFactory using the given builder to assemble the
+    * SequenceFileWriter.
+    *
+    * @param hadoopConf           The Hadoop configuration for Sequence File Writer.
+    * @param keyClass             The class of key to write.
+    * @param valueClass           The class of value to write.
+    * @param compressionCodecName The name of compression codec.
+    */
+   public SequenceFileWriterFactory(Configuration hadoopConf, Class<K> keyClass, Class<V> valueClass, String compressionCodecName) {
+       this(hadoopConf, keyClass, valueClass, compressionCodecName, SequenceFile.CompressionType.BLOCK);
+   }
+
+   /**
+    * Creates a new SequenceFileWriterFactory using the given builder to assemble the
+    * SequenceFileWriter.
+    *
+    * @param hadoopConf           The Hadoop configuration for Sequence File Writer.
+    * @param keyClass             The class of key to write.
+    * @param valueClass           The class of value to write.
+    * @param compressionCodecName The name of compression codec.
+    * @param compressionType      The type of compression level.
+    */
+   public SequenceFileWriterFactory(Configuration hadoopConf, Class<K> keyClass, Class<V> valueClass, String compressionCodecName, SequenceFile.CompressionType compressionType) {
+       this.serdeHadoopConf = new SerializableHadoopConfiguration(hadoopConf);
+       this.keyClass = keyClass;
+       this.valueClass = valueClass;
+       this.compressionCodecName = compressionCodecName;
+       this.compressionType 

[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-12-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708536#comment-16708536
 ] 

ASF GitHub Bot commented on FLINK-10457:


jihyun commented on a change in pull request #6774: [FLINK-10457] Support 
SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r238608508
 
 

 ##
 File path: 
flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java
 ##

[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-12-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708534#comment-16708534
 ] 

ASF GitHub Bot commented on FLINK-10457:


jihyun commented on a change in pull request #6774: [FLINK-10457] Support 
SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r238608030
 
 

 ##
 File path: 
flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SerializableHadoopConfiguration.java
 ##
 @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * The wrapper class for serialization of {@link Configuration}.
+ */
+@Internal
+public class SerializableHadoopConfiguration implements Serializable {
 
 Review comment:
   OK. I'll move it into the `SequenceFileWriterFactory` class.
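
   For context, the wrapper pattern under discussion (making a 
non-`Serializable` configuration object travel with a `Serializable` factory) 
can be sketched as below. This is a minimal illustration, not the code from 
the pull request: `PlainConf` is a hypothetical stand-in for the Hadoop 
`Configuration`, and `SerializableConfWrapper` and `roundTrip` are names 
assumed here so the snippet runs with the JDK alone.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a configuration class that is not Serializable
// but can write itself to a DataOutput and read itself from a DataInput.
final class PlainConf {
    final Map<String, String> entries = new HashMap<>();

    void write(DataOutput out) throws IOException {
        out.writeInt(entries.size());
        for (Map.Entry<String, String> e : entries.entrySet()) {
            out.writeUTF(e.getKey());
            out.writeUTF(e.getValue());
        }
    }

    void readFields(DataInput in) throws IOException {
        entries.clear();
        int n = in.readInt();
        for (int i = 0; i < n; i++) {
            String key = in.readUTF();
            entries.put(key, in.readUTF());
        }
    }
}

// Serializable wrapper: the wrapped object is transient and is written and
// restored by hand in writeObject/readObject.
final class SerializableConfWrapper implements Serializable {
    private static final long serialVersionUID = 1L;

    private transient PlainConf conf;

    SerializableConfWrapper(PlainConf conf) {
        this.conf = conf;
    }

    PlainConf get() {
        return conf;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        conf.write(out);
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        conf = new PlainConf();
        conf.readFields(in);
    }

    // Helper for the illustration: serialize and deserialize in memory.
    static SerializableConfWrapper roundTrip(SerializableConfWrapper wrapper) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(wrapper);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SerializableConfWrapper) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

   Whether such a wrapper lives in its own file or as a nested class of the 
factory does not change the mechanism; nesting it only narrows its 
visibility.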




[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-12-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708532#comment-16708532
 ] 

ASF GitHub Bot commented on FLINK-10457:


jihyun commented on issue #6774: [FLINK-10457] Support SequenceFile for 
StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#issuecomment-444053307
 
 
   Sorry, I missed too many null checks.
   I'll fix them. Thanks.




[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707394#comment-16707394
 ] 

ASF GitHub Bot commented on FLINK-10457:


kl0u commented on a change in pull request #6774: [FLINK-10457] Support 
SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r238320072
 
 

 ##
 File path: 
flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java
 ##

[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707401#comment-16707401
 ] 

ASF GitHub Bot commented on FLINK-10457:


kl0u commented on a change in pull request #6774: [FLINK-10457] Support 
SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r238320587
 
 

 ##
 File path: 
flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java
 ##

[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707399#comment-16707399
 ] 

ASF GitHub Bot commented on FLINK-10457:


kl0u commented on a change in pull request #6774: [FLINK-10457] Support 
SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r238316378
 
 

 ##
 File path: 
flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriter.java
 ##
 @@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link BulkWriter} implementation that wraps a {@link SequenceFile.Writer}.
+ *
+ * @param <K> The type of key written.
+ * @param <V> The type of value written.
+ */
+@PublicEvolving
+public class SequenceFileWriter<K extends Writable, V extends Writable> implements BulkWriter<Tuple2<K, V>> {
+   private final SequenceFile.Writer writer;
+
+   public SequenceFileWriter(SequenceFile.Writer writer) {
+       this.writer = checkNotNull(writer, "sequenceFileWriter");
 
 Review comment:
   `checkNotNull(writer)` is enough: if the exception is thrown, the stack 
trace will already contain the name of the class.
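
   A small sketch of the point being made, assuming 
`java.util.Objects.requireNonNull` as a JDK stand-in for Flink's 
`Preconditions.checkNotNull` so that it runs without Flink on the classpath. 
`WriterHolder` and `traceNamesThisClass` are hypothetical names used only for 
this illustration.

```java
import java.util.Objects;

// Hypothetical stand-in for the writer wrapper from the review; not Flink's class.
final class WriterHolder {
    private final Object writer;

    WriterHolder(Object writer) {
        // No message argument: the constructor frame in the stack trace
        // already identifies where the null was rejected.
        this.writer = Objects.requireNonNull(writer);
    }

    Object writer() {
        return writer;
    }

    // Returns true if constructing with null fails with a NullPointerException
    // whose stack trace contains a frame from this class.
    static boolean traceNamesThisClass() {
        try {
            new WriterHolder(null);
            return false;
        } catch (NullPointerException e) {
            for (StackTraceElement frame : e.getStackTrace()) {
                if (frame.getClassName().equals(WriterHolder.class.getName())) {
                    return true;
                }
            }
            return false;
        }
    }
}
```

   A message string is still useful when one constructor checks several 
arguments, because the trace alone then does not say which one was null.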




[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707395#comment-16707395
 ] 

ASF GitHub Bot commented on FLINK-10457:


kl0u commented on a change in pull request #6774: [FLINK-10457] Support 
SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r238320884
 
 

 ##
 File path: 
flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java
 ##
 @@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+
+import java.io.IOException;
+
+/**
+ * A factory that creates a SequenceFile {@link BulkWriter}.
+ *
+ * @param <K> The type of key to write. It should be writable.
+ * @param <V> The type of value to write. It should be writable.
+ */
+@PublicEvolving
+public class SequenceFileWriterFactory<K extends Writable, V extends Writable> implements BulkWriter.Factory<Tuple2<K, V>> {
+   private static final long serialVersionUID = 1L;
+
+   private final SerializableHadoopConfiguration serdeHadoopConf;
+   private final Class<K> keyClass;
+   private final Class<V> valueClass;
+   private final String compressionCodecName;
+   private final SequenceFile.CompressionType compressionType;
+
+   /**
+    * Creates a new SequenceFileWriterFactory using the given builder to assemble the
+    * SequenceFileWriter.
+    *
+    * @param hadoopConf The Hadoop configuration for Sequence File Writer.
+    * @param keyClass   The class of key to write.
+    * @param valueClass The class of value to write.
+    */
+   public SequenceFileWriterFactory(Configuration hadoopConf, Class<K> keyClass, Class<V> valueClass) {
+       this(hadoopConf, keyClass, valueClass, "None", SequenceFile.CompressionType.BLOCK);
+   }
+
+   /**
+    * Creates a new SequenceFileWriterFactory using the given builder to assemble the
+    * SequenceFileWriter.
+    *
+    * @param hadoopConf           The Hadoop configuration for Sequence File Writer.
+    * @param keyClass             The class of key to write.
+    * @param valueClass           The class of value to write.
+    * @param compressionCodecName The name of compression codec.
+    */
+   public SequenceFileWriterFactory(Configuration hadoopConf, Class<K> keyClass, Class<V> valueClass, String compressionCodecName) {
+       this(hadoopConf, keyClass, valueClass, compressionCodecName, SequenceFile.CompressionType.BLOCK);
+   }
+
+   /**
+    * Creates a new SequenceFileWriterFactory using the given builder to assemble the
+    * SequenceFileWriter.
+    *
+    * @param hadoopConf           The Hadoop configuration for Sequence File Writer.
+    * @param keyClass             The class of key to write.
+    * @param valueClass           The class of value to write.
+    * @param compressionCodecName The name of compression codec.
+    * @param compressionType      The type of compression level.
+    */
+   public SequenceFileWriterFactory(Configuration hadoopConf, Class<K> keyClass, Class<V> valueClass, String compressionCodecName, SequenceFile.CompressionType compressionType) {
+       this.serdeHadoopConf = new SerializableHadoopConfiguration(hadoopConf);
+       this.keyClass = keyClass;
+       this.valueClass = valueClass;
+       this.compressionCodecName = compressionCodecName;
+       this.compressionType = 

[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707388#comment-16707388
 ] 

ASF GitHub Bot commented on FLINK-10457:


kl0u commented on a change in pull request #6774: [FLINK-10457] Support 
SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r238315346
 
 

 ##
 File path: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/FiniteTestSource.java
 ##
 @@ -16,7 +16,7 @@
  * limitations under the License.
 
 Review comment:
   I would move that into the `org.apache.flink.streaming.util` package of `flink-test-utils`.




[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707393#comment-16707393
 ] 

ASF GitHub Bot commented on FLINK-10457:


kl0u commented on a change in pull request #6774: [FLINK-10457] Support 
SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r238319765
 
 

 ##
 File path: 
flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java
 ##
 @@ -0,0 +1,137 @@

[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707398#comment-16707398
 ] 

ASF GitHub Bot commented on FLINK-10457:


kl0u commented on a change in pull request #6774: [FLINK-10457] Support 
SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r238318139
 
 

 ##
 File path: 
flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java
 ##
 @@ -0,0 +1,137 @@

[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707390#comment-16707390
 ] 

ASF GitHub Bot commented on FLINK-10457:


kl0u commented on a change in pull request #6774: [FLINK-10457] Support 
SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r238318386
 
 

 ##
 File path: 
flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java
 ##
 @@ -0,0 +1,137 @@

[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707396#comment-16707396
 ] 

ASF GitHub Bot commented on FLINK-10457:


kl0u commented on a change in pull request #6774: [FLINK-10457] Support 
SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r238321158
 
 

 ##
 File path: 
flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SerializableHadoopConfiguration.java
 ##
 @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * The wrapper class for serialization of {@link Configuration}.
+ */
+@Internal
+public class SerializableHadoopConfiguration implements Serializable {
 
 Review comment:
   This class can be package private.




[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707400#comment-16707400
 ] 

ASF GitHub Bot commented on FLINK-10457:


kl0u commented on a change in pull request #6774: [FLINK-10457] Support 
SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r238317930
 
 

 ##
 File path: 
flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java
 ##
 @@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+
+import java.io.IOException;
+
+/**
+ * A factory that creates a SequenceFile {@link BulkWriter}.
+ *
+ * @param <K> The type of key to write. It should be writable.
+ * @param <V> The type of value to write. It should be writable.
+ */
+@PublicEvolving
+public class SequenceFileWriterFactory<K extends Writable, V extends Writable> implements BulkWriter.Factory<Tuple2<K, V>> {
+   private static final long serialVersionUID = 1L;
+
+   private final SerializableHadoopConfiguration serdeHadoopConf;
+   private final Class<K> keyClass;
+   private final Class<V> valueClass;
+   private final String compressionCodecName;
+   private final SequenceFile.CompressionType compressionType;
+
+   /**
+    * Creates a new SequenceFileWriterFactory using the given builder to assemble the
+    * SequenceFileWriter.
+    *
+    * @param hadoopConf The Hadoop configuration for Sequence File Writer.
+    * @param keyClass   The class of key to write.
+    * @param valueClass The class of value to write.
+    */
+   public SequenceFileWriterFactory(Configuration hadoopConf, Class<K> keyClass, Class<V> valueClass) {
+       this(hadoopConf, keyClass, valueClass, "None", SequenceFile.CompressionType.BLOCK);
+   }
+
+   /**
+    * Creates a new SequenceFileWriterFactory using the given builder to assemble the
+    * SequenceFileWriter.
+    *
+    * @param hadoopConf           The Hadoop configuration for Sequence File Writer.
+    * @param keyClass             The class of key to write.
+    * @param valueClass           The class of value to write.
+    * @param compressionCodecName The name of compression codec.
+    */
+   public SequenceFileWriterFactory(Configuration hadoopConf, Class<K> keyClass, Class<V> valueClass, String compressionCodecName) {
+       this(hadoopConf, keyClass, valueClass, compressionCodecName, SequenceFile.CompressionType.BLOCK);
+   }
+
+   /**
+    * Creates a new SequenceFileWriterFactory using the given builder to assemble the
+    * SequenceFileWriter.
+    *
+    * @param hadoopConf           The Hadoop configuration for Sequence File Writer.
+    * @param keyClass             The class of key to write.
+    * @param valueClass           The class of value to write.
+    * @param compressionCodecName The name of compression codec.
+    * @param compressionType      The type of compression level.
+    */
+   public SequenceFileWriterFactory(Configuration hadoopConf, Class<K> keyClass, Class<V> valueClass, String compressionCodecName, SequenceFile.CompressionType compressionType) {
+       this.serdeHadoopConf = new SerializableHadoopConfiguration(hadoopConf);
+       this.keyClass = keyClass;
 
 Review comment:
   Add `checkNotNull` to all of the parameters.
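
A minimal sketch of what this suggestion could look like. Everything below is a hypothetical stand-in and not the PR's code: `CheckedWriterFactory` and its getters are invented for illustration, and `java.util.Objects.requireNonNull` is used in place of Flink's `Preconditions.checkNotNull` so that the snippet compiles without Flink on the classpath.

```java
import java.util.Objects;

/** Hypothetical stand-in for the factory under review; not the PR's actual class. */
final class CheckedWriterFactory<K, V> {
    private final Class<K> keyClass;
    private final Class<V> valueClass;
    private final String compressionCodecName;

    CheckedWriterFactory(Class<K> keyClass, Class<V> valueClass, String compressionCodecName) {
        // Each argument is validated before it is stored, so a null is
        // rejected by the constructor instead of surfacing later.
        // Objects.requireNonNull stands in for Flink's checkNotNull here.
        this.keyClass = Objects.requireNonNull(keyClass, "keyClass must not be null");
        this.valueClass = Objects.requireNonNull(valueClass, "valueClass must not be null");
        this.compressionCodecName =
                Objects.requireNonNull(compressionCodecName, "compressionCodecName must not be null");
    }

    Class<K> getKeyClass() {
        return keyClass;
    }

    Class<V> getValueClass() {
        return valueClass;
    }

    String getCompressionCodecName() {
        return compressionCodecName;
    }
}
```

With checks like these, a missing argument is reported where the factory is constructed, which is presumably easier to diagnose than a `NullPointerException` raised later when a writer is created.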



[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707392#comment-16707392
 ] 

ASF GitHub Bot commented on FLINK-10457:


kl0u commented on a change in pull request #6774: [FLINK-10457] Support 
SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r238321380
 
 

 ##
 File path: 
flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SerializableHadoopConfiguration.java
 ##
 @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * The wrapper class for serialization of {@link Configuration}.
+ */
+@Internal
+public class SerializableHadoopConfiguration implements Serializable {
+   private Configuration hadoopConf;
 
 Review comment:
   This can be `transient` so that the reader knows in advance that the field 
is not serialized.
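
A rough sketch of the pattern this comment points at, assuming the wrapper serializes its field by hand through `writeObject`/`readObject` (the `ObjectInputStream`/`ObjectOutputStream` imports in the diff suggest this, but the rest of the class is not shown here). `PlainConfig`, `SerializableConfigWrapper` and `roundTrip` are hypothetical names; `PlainConfig` stands in for Hadoop's `Configuration` so the example runs with the JDK alone.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a configuration type that is not Serializable. */
final class PlainConfig {
    final Map<String, String> entries = new HashMap<>();
}

/** Hypothetical wrapper that writes the wrapped config by hand. */
final class SerializableConfigWrapper implements Serializable {
    private static final long serialVersionUID = 1L;

    // transient: default Java serialization skips this field, which tells the
    // reader up front that writeObject/readObject below handle it instead.
    private transient PlainConfig config;

    SerializableConfigWrapper(PlainConfig config) {
        this.config = config;
    }

    PlainConfig get() {
        return config;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        // Write the entries manually as a count followed by key/value pairs.
        out.writeInt(config.entries.size());
        for (Map.Entry<String, String> entry : config.entries.entrySet()) {
            out.writeUTF(entry.getKey());
            out.writeUTF(entry.getValue());
        }
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        // Rebuild the transient field from the manually written entries.
        config = new PlainConfig();
        int size = in.readInt();
        for (int i = 0; i < size; i++) {
            String key = in.readUTF();
            String value = in.readUTF();
            config.entries.put(key, value);
        }
    }

    /** Serializes and deserializes the wrapper in memory. */
    static SerializableConfigWrapper roundTrip(SerializableConfigWrapper wrapper) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(wrapper);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SerializableConfigWrapper) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }
}
```

In this sketch the `transient` keyword is more than documentation: `PlainConfig` is not `Serializable`, so without it `defaultWriteObject()` would fail with a `NotSerializableException`.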




[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707397#comment-16707397
 ] 

ASF GitHub Bot commented on FLINK-10457:


kl0u commented on a change in pull request #6774: [FLINK-10457] Support 
SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r238321653
 
 

 ##
 File path: 
flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SerializableHadoopConfiguration.java
 ##
 @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * The wrapper class for serialization of {@link Configuration}.
+ */
+@Internal
+public class SerializableHadoopConfiguration implements Serializable {
+   private Configuration hadoopConf;
+
+   public SerializableHadoopConfiguration(Configuration hadoopConf) {
+   this.hadoopConf = hadoopConf;
+   }
+
+   public Configuration get() {
 
 Review comment:
   Same here.




[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707391#comment-16707391
 ] 

ASF GitHub Bot commented on FLINK-10457:


kl0u commented on a change in pull request #6774: [FLINK-10457] Support 
SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r238316598
 
 

 ##
 File path: 
flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriter.java
 ##
 @@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link BulkWriter} implementation that wraps a {@link SequenceFile.Writer}.
+ *
+ * @param <K> The type of key written.
+ * @param <V> The type of value written.
+ */
+@PublicEvolving
+public class SequenceFileWriter<K extends Writable, V extends Writable> implements BulkWriter<Tuple2<K, V>> {
+   private final SequenceFile.Writer writer;
+
+   public SequenceFileWriter(SequenceFile.Writer writer) {
+       this.writer = checkNotNull(writer, "sequenceFileWriter");
+   }
+
+   @Override
+   public void addElement(Tuple2<K, V> element) throws IOException {
+       writer.append(element.f0, element.f1);
+   }
+
+   @Override
+   public void flush() throws IOException {
+       writer.hflush();
 
 Review comment:
   Here `writer.hsync()` should be called, not `hflush()`.
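
The distinction behind this comment can be illustrated with a small sketch. In Hadoop's `Syncable` interface, `hflush()` flushes the client's buffer so that new readers can see the data, while `hsync()` additionally flushes the data all the way to the disk device, similar to POSIX `fsync`. The example below is only a local-filesystem analogy written against the JDK, not Hadoop code: `flush()` stands in for `hflush()` and `FileChannel.force(true)` stands in for `hsync()`. The class name `DurableFlushSketch` and the method `writeDurably` are hypothetical.

```java
import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class DurableFlushSketch {

    /** Appends a record and forces it to the storage device (the hsync() analogue). */
    static void writeDurably(Path file, String record) throws IOException {
        try (FileOutputStream fos = new FileOutputStream(file.toFile(), true);
             OutputStream buffered = new BufferedOutputStream(fos)) {
            buffered.write(record.getBytes(StandardCharsets.UTF_8));
            // Analogue of hflush(): empty the user-space buffer into the OS.
            buffered.flush();
            // Analogue of hsync(): ask the OS to persist content and metadata to the device.
            fos.getChannel().force(true);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("durable-flush", ".txt");
        writeDurably(file, "key\tvalue\n");
        System.out.println(new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim());
        Files.delete(file);
    }
}
```

Stopping after `flush()` would leave the bytes in the operating system's cache, which is roughly the gap between `hflush()` and `hsync()` that the review points at.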


[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707389#comment-16707389
 ] 

ASF GitHub Bot commented on FLINK-10457:


kl0u commented on a change in pull request #6774: [FLINK-10457] Support 
SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r238321562
 
 

 ##
 File path: 
flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SerializableHadoopConfiguration.java
 ##
 @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * The wrapper class for serialization of {@link Configuration}.
+ */
+@Internal
+public class SerializableHadoopConfiguration implements Serializable {
+   private Configuration hadoopConf;
+
+   public SerializableHadoopConfiguration(Configuration hadoopConf) {
 
 Review comment:
   This can be package private.
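
For readers unfamiliar with the pattern under review, here is a minimal sketch of a serializable wrapper around a non-serializable configuration object, with a package-private constructor as suggested. It is not the code from the pull request: `Settings` is a hypothetical stand-in for Hadoop's `Configuration` (which can write itself to a `DataOutput` and read itself from a `DataInput`), and `SerializableSettings` and `roundTrip` are illustrative names. The imports in the diff suggest the real class uses custom `writeObject`/`readObject` hooks in a similar way, but that is an assumption.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;

public class SerializableWrapperSketch {

    /** Hypothetical stand-in for a non-Serializable, Writable-style configuration. */
    static final class Settings {
        private final Map<String, String> values = new LinkedHashMap<>();

        void set(String key, String value) { values.put(key, value); }

        String get(String key) { return values.get(key); }

        void write(DataOutput out) throws IOException {
            out.writeInt(values.size());
            for (Map.Entry<String, String> e : values.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeUTF(e.getValue());
            }
        }

        void readFields(DataInput in) throws IOException {
            values.clear();
            int n = in.readInt();
            for (int i = 0; i < n; i++) {
                String key = in.readUTF();
                values.put(key, in.readUTF());
            }
        }
    }

    /** Serializable wrapper; the constructor and getter are package-private. */
    static final class SerializableSettings implements Serializable {
        private static final long serialVersionUID = 1L;

        // transient: the wrapped object is not Serializable, so it is written by hand below.
        private transient Settings settings;

        SerializableSettings(Settings settings) { this.settings = settings; }

        Settings get() { return settings; }

        private void writeObject(ObjectOutputStream out) throws IOException {
            settings.write(out);
        }

        private void readObject(ObjectInputStream in) throws IOException {
            settings = new Settings();
            settings.readFields(in);
        }
    }

    /** Round-trips a wrapper through Java serialization. */
    static SerializableSettings roundTrip(SerializableSettings original)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (SerializableSettings) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Settings settings = new Settings();
        settings.set("example.key", "example-value"); // hypothetical key
        SerializableSettings copy = roundTrip(new SerializableSettings(settings));
        System.out.println(copy.get().get("example.key"));
    }
}
```

Keeping the constructor package-private limits the wrapper to classes in the same package, which fits a class annotated `@Internal`.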


[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-12-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16705743#comment-16705743
 ] 

ASF GitHub Bot commented on FLINK-10457:


jihyun commented on issue #6774: [FLINK-10457] Support SequenceFile for 
StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#issuecomment-443413453
 
 
   Hi @kl0u 
   Thanks for your review. I've fixed the code based on your feedback.
   Please check whether the changes are okay.


[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-11-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704727#comment-16704727
 ] 

ASF GitHub Bot commented on FLINK-10457:


kl0u commented on a change in pull request #6774: [FLINK-10457] Support 
SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r237857194
 
 

 ##
 File path: 
flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java
 ##
 @@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.runtime.util.HadoopUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+
+import java.io.IOException;
+
+/**
+ * A factory that creates a SequenceFile {@link BulkWriter}.
+ *
+ * @param <K> The type of key to write. It should be writable.
+ * @param <V> The type of value to write. It should be writable.
+ */
+@PublicEvolving
+public class SequenceFileWriterFactory<K extends Writable, V extends Writable> implements BulkWriter.Factory<Tuple2<K, V>> {
+   private static final long serialVersionUID = 1L;
+
+   private final Class<K> keyClass;
+   private final Class<V> valueClass;
+   private final String compressionCodecName;
+   private final SequenceFile.CompressionType compressionType;
+
+   /**
+    * Creates a new SequenceFileWriterFactory using the given builder to assemble the
+    * SequenceFileWriter.
+    *
+    * @param keyClass The class of key to write.
+    * @param valueClass The class of value to write.
+    */
+   public SequenceFileWriterFactory(Class<K> keyClass, Class<V> valueClass) {
+       this(keyClass, valueClass, "None", SequenceFile.CompressionType.BLOCK);
+   }
+
+   /**
+    * Creates a new SequenceFileWriterFactory using the given builder to assemble the
+    * SequenceFileWriter.
+    *
+    * @param keyClass The class of key to write.
+    * @param valueClass The class of value to write.
+    * @param compressionCodecName The name of compression codec.
+    */
+   public SequenceFileWriterFactory(Class<K> keyClass, Class<V> valueClass, String compressionCodecName) {
+       this(keyClass, valueClass, compressionCodecName, SequenceFile.CompressionType.BLOCK);
+   }
+
+   /**
+    * Creates a new SequenceFileWriterFactory using the given builder to assemble the
+    * SequenceFileWriter.
+    *
+    * @param keyClass The class of key to write.
+    * @param valueClass The class of value to write.
+    * @param compressionCodecName The name of compression codec.
+    * @param compressionType The type of compression level.
+    */
+   public SequenceFileWriterFactory(Class<K> keyClass, Class<V> valueClass, String compressionCodecName, SequenceFile.CompressionType compressionType) {
+       this.keyClass = keyClass;
+       this.valueClass = valueClass;
+       this.compressionCodecName = compressionCodecName;
+       this.compressionType = compressionType;
+   }
+
+   @Override
+   public BulkWriter<Tuple2<K, V>> create(FSDataOutputStream out) throws IOException {
+       Configuration hadoopConf = HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration());
 
 Review comment:
   Here we should not get the configuration this way. 
   
   The configuration of the Sequence File Writer should be done on a per-job 
   basis, and not for the whole cluster.
   
   I would recommend passing a Hadoop configuration in the constructor and, 
   given that it is not serializable, storing it as a byte array. Then, in 
   the 

[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702990#comment-16702990
 ] 

ASF GitHub Bot commented on FLINK-10457:


kl0u commented on a change in pull request #6774: [FLINK-10457] Support 
SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r237426973
 
 

 ##
 File path: flink-formats/flink-sequencefile/pom.xml
 ##
 @@ -0,0 +1,107 @@
+
+
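+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+   <modelVersion>4.0.0</modelVersion>
+
+   <parent>
+       <groupId>org.apache.flink</groupId>
+       <artifactId>flink-formats</artifactId>
+       <version>1.7-SNAPSHOT</version>
+       <relativePath>..</relativePath>
+   </parent>
+
+   <artifactId>flink-sequencefile</artifactId>
+   <name>flink-sequencefile</name>
+
+   <packaging>jar</packaging>
+
+   <dependencies>
+
+       <dependency>
+           <groupId>org.apache.flink</groupId>
+           <artifactId>flink-core</artifactId>
+           <version>${project.version}</version>
+           <scope>provided</scope>
+       </dependency>
+
+
+       <dependency>
+           <groupId>org.apache.flink</groupId>
+           <artifactId>flink-shaded-hadoop2</artifactId>
+           <version>${project.version}</version>
+           <scope>provided</scope>
+       </dependency>
+       <dependency>
+           <groupId>org.apache.flink</groupId>
+           <artifactId>flink-hadoop-fs</artifactId>
+           <version>${project.version}</version>
+           <scope>provided</scope>
+       </dependency>
+
+
+
+
+       <dependency>
+           <groupId>org.apache.flink</groupId>
+           <artifactId>flink-test-utils_2.11</artifactId>
+           <version>${project.version}</version>
+           <scope>test</scope>
+       </dependency>
+
+       <dependency>
+           <groupId>org.apache.flink</groupId>
+           <artifactId>flink-streaming-java_2.11</artifactId>
+           <version>${project.version}</version>
+           <scope>test</scope>
+       </dependency>
+
+       <dependency>
+           <groupId>org.apache.flink</groupId>
+           <artifactId>flink-hadoop-compatibility_2.11</artifactId>
+           <version>${project.version}</version>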
 
 Review comment:
   Same here.


[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702988#comment-16702988
 ] 

ASF GitHub Bot commented on FLINK-10457:


kl0u commented on a change in pull request #6774: [FLINK-10457] Support 
SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r237426646
 
 

 ##
 File path: flink-formats/flink-sequencefile/pom.xml
 ##
 @@ -0,0 +1,107 @@
+
+
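+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+   <modelVersion>4.0.0</modelVersion>
+
+   <parent>
+       <groupId>org.apache.flink</groupId>
+       <artifactId>flink-formats</artifactId>
+       <version>1.7-SNAPSHOT</version>
+       <relativePath>..</relativePath>
+   </parent>
+
+   <artifactId>flink-sequencefile</artifactId>
+   <name>flink-sequencefile</name>
+
+   <packaging>jar</packaging>
+
+   <dependencies>
+
+       <dependency>
+           <groupId>org.apache.flink</groupId>
+           <artifactId>flink-core</artifactId>
+           <version>${project.version}</version>
+           <scope>provided</scope>
+       </dependency>
+
+
+       <dependency>
+           <groupId>org.apache.flink</groupId>
+           <artifactId>flink-shaded-hadoop2</artifactId>
+           <version>${project.version}</version>
+           <scope>provided</scope>
+       </dependency>
+       <dependency>
+           <groupId>org.apache.flink</groupId>
+           <artifactId>flink-hadoop-fs</artifactId>
+           <version>${project.version}</version>
+           <scope>provided</scope>
+       </dependency>
+
+
+
+
+       <dependency>
+           <groupId>org.apache.flink</groupId>
+           <artifactId>flink-test-utils_2.11</artifactId>
+           <version>${project.version}</version>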
 
 Review comment:
   Replace the hardcoded scala version with `_${scala.binary.version}`.


[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702989#comment-16702989
 ] 

ASF GitHub Bot commented on FLINK-10457:


kl0u commented on a change in pull request #6774: [FLINK-10457] Support 
SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r237427090
 
 

 ##
 File path: 
flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriter.java
 ##
 @@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link BulkWriter} implementation that wraps a {@link SequenceFile.Writer}.
+ *
+ * @param <K> The type of key written.
+ * @param <V> The type of value written.
+ */
+@PublicEvolving
+public class SequenceFileWriter<K extends Writable, V extends Writable> implements BulkWriter<Tuple2<K, V>> {
+   private final SequenceFile.Writer writer;
+
+   public SequenceFileWriter(SequenceFile.Writer writer) {
+       this.writer = checkNotNull(writer, "sequenceFileWriter");
+   }
+
+   @Override
+   public void addElement(Tuple2<K, V> element) throws IOException {
+
 
 Review comment:
   Remove empty line.


[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702987#comment-16702987
 ] 

ASF GitHub Bot commented on FLINK-10457:


kl0u commented on a change in pull request #6774: [FLINK-10457] Support 
SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r237426910
 
 

 ##
 File path: flink-formats/flink-sequencefile/pom.xml
 ##
 @@ -0,0 +1,107 @@
+
+
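+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+   <modelVersion>4.0.0</modelVersion>
+
+   <parent>
+       <groupId>org.apache.flink</groupId>
+       <artifactId>flink-formats</artifactId>
+       <version>1.7-SNAPSHOT</version>
+       <relativePath>..</relativePath>
+   </parent>
+
+   <artifactId>flink-sequencefile</artifactId>
+   <name>flink-sequencefile</name>
+
+   <packaging>jar</packaging>
+
+   <dependencies>
+
+       <dependency>
+           <groupId>org.apache.flink</groupId>
+           <artifactId>flink-core</artifactId>
+           <version>${project.version}</version>
+           <scope>provided</scope>
+       </dependency>
+
+
+       <dependency>
+           <groupId>org.apache.flink</groupId>
+           <artifactId>flink-shaded-hadoop2</artifactId>
+           <version>${project.version}</version>
+           <scope>provided</scope>
+       </dependency>
+       <dependency>
+           <groupId>org.apache.flink</groupId>
+           <artifactId>flink-hadoop-fs</artifactId>
+           <version>${project.version}</version>
+           <scope>provided</scope>
+       </dependency>
+
+
+
+
+       <dependency>
+           <groupId>org.apache.flink</groupId>
+           <artifactId>flink-test-utils_2.11</artifactId>
+           <version>${project.version}</version>
+           <scope>test</scope>
+       </dependency>
+
+       <dependency>
+           <groupId>org.apache.flink</groupId>
+           <artifactId>flink-streaming-java_2.11</artifactId>
+           <version>${project.version}</version>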
 
 Review comment:
   Same here.


[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702991#comment-16702991
 ] 

ASF GitHub Bot commented on FLINK-10457:


kl0u commented on a change in pull request #6774: [FLINK-10457] Support 
SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r237426794
 
 

 ##
 File path: flink-formats/flink-sequencefile/pom.xml
 ##
 @@ -0,0 +1,107 @@
+
+
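+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+   <modelVersion>4.0.0</modelVersion>
+
+   <parent>
+       <groupId>org.apache.flink</groupId>
+       <artifactId>flink-formats</artifactId>
+       <version>1.7-SNAPSHOT</version>
+       <relativePath>..</relativePath>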
 
 Review comment:
   Update the version to `1.8-SNAPSHOT`


[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702992#comment-16702992
 ] 

ASF GitHub Bot commented on FLINK-10457:


kl0u commented on a change in pull request #6774: [FLINK-10457] Support 
SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r237428128
 
 

 ##
 File path: 
flink-formats/flink-sequencefile/src/test/java/org/apache/flink/formats/sequencefile/FiniteTestSource.java
 ##
 @@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
 
 Review comment:
   This class seems to be a copy of the one in the `flink-parquet` module. It 
   may make sense to create a `utils` module to avoid this duplication.


[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-11-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683393#comment-16683393
 ] 

ASF GitHub Bot commented on FLINK-10457:


kl0u commented on issue #6774: [FLINK-10457] Support SequenceFile for 
StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#issuecomment-437795189
 
 
   Hi @jihyun! We are currently busy testing the release candidate for 
   Flink 1.7. Unfortunately, this PR will have to wait until the release happens.


[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-11-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683275#comment-16683275
 ] 

ASF GitHub Bot commented on FLINK-10457:


jihyun commented on issue #6774: [FLINK-10457] Support SequenceFile for 
StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#issuecomment-437769950
 
 
   Hi. Did you check this PR?


[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656526#comment-16656526
 ] 

ASF GitHub Bot commented on FLINK-10457:


GJL commented on issue #6774: [FLINK-10457] Support SequenceFile for 
StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#issuecomment-431306529
 
 
   I realized it's better if @kl0u reviews this since he worked on the 
StreamingFileSink.


[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-10-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646275#comment-16646275
 ] 

ASF GitHub Bot commented on FLINK-10457:


kl0u commented on issue #6774: [FLINK-10457] Support SequenceFile for 
StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#issuecomment-428912272
 
 
   Hi @jihyun! We are a bit busy right now with the upcoming feature freeze, 
   but I will review your PR as soon as possible.


[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-09-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631503#comment-16631503
 ] 

ASF GitHub Bot commented on FLINK-10457:


jihyun opened a new pull request #6774: [FLINK-10457] Support SequenceFile for 
StreamingFileSink
URL: https://github.com/apache/flink/pull/6774
 
 
   
   
   ## What is the purpose of the change
   
   This pull request adds support for the SequenceFile format to StreamingFileSink.
   
   ## Brief change log
   
   Added SequenceFileWriterFactory and SequenceFileWriter.
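
   The PR's classes are not shown in this thread, so what follows is only a rough sketch of the writer/factory shape that a bulk format for `StreamingFileSink` tends to take. The nested `BulkWriter` and `BulkWriterFactory` interfaces are local stand-ins that mirror the three methods of Flink's `BulkWriter` (`addElement`, `flush`, `finish`) and its factory's `create`, with a plain `OutputStream` in place of Flink's `FSDataOutputStream`. `KeyValue`, `KeyValueWriter`, `KeyValueWriterFactory`, and `writeAll` are hypothetical names, and the length-prefixed record layout is an assumption for illustration, not the Hadoop SequenceFile on-disk format.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class SequenceFileWriterSketch {

    /** Local stand-in with the same three methods as Flink's BulkWriter<T>. */
    interface BulkWriter<T> {
        void addElement(T element) throws IOException;
        void flush() throws IOException;
        void finish() throws IOException;
    }

    /** Local stand-in for BulkWriter.Factory<T>; takes a plain OutputStream (assumption). */
    interface BulkWriterFactory<T> {
        BulkWriter<T> create(OutputStream out) throws IOException;
    }

    /** Hypothetical key/value record; a real SequenceFile uses Hadoop Writable types. */
    static final class KeyValue {
        final long key;
        final String value;

        KeyValue(long key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Appends each record as an 8-byte key followed by a length-prefixed UTF value. */
    static final class KeyValueWriter implements BulkWriter<KeyValue> {
        private final DataOutputStream out;

        KeyValueWriter(OutputStream out) {
            this.out = new DataOutputStream(out);
        }

        @Override
        public void addElement(KeyValue element) throws IOException {
            out.writeLong(element.key);
            out.writeUTF(element.value);
        }

        @Override
        public void flush() throws IOException {
            out.flush();
        }

        @Override
        public void finish() throws IOException {
            // Flush only: in this sketch the caller owns the stream and closes it.
            out.flush();
        }
    }

    /** Creates one writer per output stream, i.e. one per part file in the sink. */
    static final class KeyValueWriterFactory implements BulkWriterFactory<KeyValue> {
        @Override
        public BulkWriter<KeyValue> create(OutputStream out) {
            return new KeyValueWriter(out);
        }
    }

    /** Helper for the demo: writes all records to an in-memory stream and returns the bytes. */
    static byte[] writeAll(KeyValue... records) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        BulkWriter<KeyValue> writer = new KeyValueWriterFactory().create(sink);
        for (KeyValue record : records) {
            writer.addElement(record);
        }
        writer.finish();
        return sink.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] bytes = writeAll(new KeyValue(1L, "a"), new KeyValue(2L, "bc"));
        System.out.println(bytes.length + " bytes written");
    }
}
```

   In a real job the factory would be handed to the sink's bulk-format builder, and the writer would presumably delegate to Hadoop's `SequenceFile.Writer` rather than to a `DataOutputStream`.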
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - org.apache.flink.formats.sequencefile.SequenceFileSinkITCase#testWriteSequenceFile
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no) yes
 - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) no
 - The serializers: (yes / no / don't know) no
 - The runtime per-record code paths (performance sensitive): (yes / no / don't know) no
 - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) no
 - The S3 file system connector: (yes / no / don't know) no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no) yes
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) not documented
   



