[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16713588#comment-16713588 ]

ASF GitHub Bot commented on FLINK-10457:

jihyun commented on issue #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#issuecomment-445442845

This is what I've worked on. If anything needs fixing, please fix it yourself.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Support SequenceFile for StreamingFileSink
> ------------------------------------------
>
>                 Key: FLINK-10457
>                 URL: https://issues.apache.org/jira/browse/FLINK-10457
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming Connectors
>            Reporter: Jihyun Cho
>            Priority: Major
>              Labels: pull-request-available
>
> SequenceFile is a major file format in the Hadoop ecosystem.
> It is simple to manage and easy to combine with other tools.
> So we still need the SequenceFile format, even if Parquet and ORC are supported.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16712533#comment-16712533 ]

ASF GitHub Bot commented on FLINK-10457:

jihyun commented on issue #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#issuecomment-445173617

Hi @kl0u, I'm sorry for the late reply. I planned to work on this over the weekend, but I think it would be better if you did it.
What I wanted was to write files in `SequenceFileFormat` with only a `Path`. Users should not need to know about the Hadoop configuration, so I added a static utility method.
If you take over, I will leave the rest of the modifications up to you.
Thank you.
[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16712498#comment-16712498 ]

ASF GitHub Bot commented on FLINK-10457:

kl0u commented on issue #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#issuecomment-445162125

Hi @jihyun! Are you planning to work on this? If you prefer, I can make the necessary changes myself and then merge. Let me know what you think!
[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708553#comment-16708553 ]

ASF GitHub Bot commented on FLINK-10457:

jihyun commented on a change in pull request #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r238614740

## File path: flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java ##
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+
+import java.io.IOException;
+
+/**
+ * A factory that creates a SequenceFile {@link BulkWriter}.
+ *
+ * @param <K> The type of key to write. It should be writable.
+ * @param <V> The type of value to write. It should be writable.
+ */
+@PublicEvolving
+public class SequenceFileWriterFactory<K extends Writable, V extends Writable> implements BulkWriter.Factory<Tuple2<K, V>> {
+	private static final long serialVersionUID = 1L;
+
+	private final SerializableHadoopConfiguration serdeHadoopConf;
+	private final Class<K> keyClass;
+	private final Class<V> valueClass;
+	private final String compressionCodecName;
+	private final SequenceFile.CompressionType compressionType;
+
+	/**
+	 * Creates a new SequenceFileWriterFactory using the given builder to assemble the
+	 * SequenceFileWriter.
+	 *
+	 * @param hadoopConf The Hadoop configuration for Sequence File Writer.
+	 * @param keyClass The class of key to write.
+	 * @param valueClass The class of value to write.
+	 */
+	public SequenceFileWriterFactory(Configuration hadoopConf, Class<K> keyClass, Class<V> valueClass) {
+		this(hadoopConf, keyClass, valueClass, "None", SequenceFile.CompressionType.BLOCK);
+	}
+
+	/**
+	 * Creates a new SequenceFileWriterFactory using the given builder to assemble the
+	 * SequenceFileWriter.
+	 *
+	 * @param hadoopConf The Hadoop configuration for Sequence File Writer.
+	 * @param keyClass The class of key to write.
+	 * @param valueClass The class of value to write.
+	 * @param compressionCodecName The name of compression codec.
+	 */
+	public SequenceFileWriterFactory(Configuration hadoopConf, Class<K> keyClass, Class<V> valueClass, String compressionCodecName) {
+		this(hadoopConf, keyClass, valueClass, compressionCodecName, SequenceFile.CompressionType.BLOCK);
+	}
+
+	/**
+	 * Creates a new SequenceFileWriterFactory using the given builder to assemble the
+	 * SequenceFileWriter.
+	 *
+	 * @param hadoopConf The Hadoop configuration for Sequence File Writer.
+	 * @param keyClass The class of key to write.
+	 * @param valueClass The class of value to write.
+	 * @param compressionCodecName The name of compression codec.
+	 * @param compressionType The type of compression level.
+	 */
+	public SequenceFileWriterFactory(Configuration hadoopConf, Class<K> keyClass, Class<V> valueClass, String compressionCodecName, SequenceFile.CompressionType compressionType) {
+		this.serdeHadoopConf = new SerializableHadoopConfiguration(hadoopConf);
+		this.keyClass = keyClass;
+		this.valueClass = valueClass;
+		this.compressionCodecName = compressionCodecName;
+		this.compressionType
[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708536#comment-16708536 ]

ASF GitHub Bot commented on FLINK-10457:

jihyun commented on a change in pull request #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r238608508

## File path: flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java ##
@@ -0,0 +1,137 @@
[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708534#comment-16708534 ]

ASF GitHub Bot commented on FLINK-10457:

jihyun commented on a change in pull request #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r238608030

## File path: flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SerializableHadoopConfiguration.java ##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * The wrapper class for serialization of {@link Configuration}.
+ */
+@Internal
+public class SerializableHadoopConfiguration implements Serializable {

Review comment:
OK. I'll move it into the `SequenceFileWriterFactory` class.
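The change agreed on above (keeping the serialization wrapper inside the factory rather than as a separate top-level class) can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code from the pull request: `PlainConfig` is a hypothetical stand-in for a configuration type that is not `java.io.Serializable`, and `WriterFactorySketch` and `SerializableConfig` are likewise invented names. The wrapper writes the entries by hand in `writeObject` and rebuilds them in `readObject`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a configuration class that is not Serializable. */
class PlainConfig {
    private final Map<String, String> entries = new HashMap<>();

    void set(String key, String value) { entries.put(key, value); }

    String get(String key) { return entries.get(key); }

    Map<String, String> asMap() { return entries; }
}

/** Hypothetical factory that keeps the wrapper as a private nested class. */
class WriterFactorySketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final SerializableConfig conf;

    WriterFactorySketch(PlainConfig conf) {
        this.conf = new SerializableConfig(conf);
    }

    String option(String key) { return conf.get().get(key); }

    /** Wrapper that serializes the entries by hand, since PlainConfig cannot be serialized. */
    private static final class SerializableConfig implements Serializable {
        private static final long serialVersionUID = 1L;

        // transient: default serialization skips this field, we write it ourselves
        private transient PlainConfig conf;

        SerializableConfig(PlainConfig conf) { this.conf = conf; }

        PlainConfig get() { return conf; }

        private void writeObject(ObjectOutputStream out) throws IOException {
            out.defaultWriteObject();
            Map<String, String> map = conf.asMap();
            out.writeInt(map.size());
            for (Map.Entry<String, String> e : map.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeUTF(e.getValue());
            }
        }

        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            conf = new PlainConfig();
            int size = in.readInt();
            for (int i = 0; i < size; i++) {
                String key = in.readUTF();
                String value = in.readUTF();
                conf.set(key, value);
            }
        }
    }

    /** Serializes the factory to bytes and reads it back, as shipping it to a worker would. */
    static WriterFactorySketch roundTrip(WriterFactorySketch factory) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(factory);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (WriterFactorySketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }
}
```

Nesting the wrapper keeps it out of the module's public surface, which seems to be the point of the review request; whether the real class should stay `@Internal` or become private is a decision for the PR itself.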
[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708532#comment-16708532 ]

ASF GitHub Bot commented on FLINK-10457:

jihyun commented on issue #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#issuecomment-444053307

Sorry, I missed too many null checks. I'll fix it.
Thanks.
[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707394#comment-16707394 ]

ASF GitHub Bot commented on FLINK-10457:

kl0u commented on a change in pull request #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r238320072

## File path: flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java ##
@@ -0,0 +1,137 @@
[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707401#comment-16707401 ]

ASF GitHub Bot commented on FLINK-10457:

kl0u commented on a change in pull request #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r238320587

## File path: flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java ##
@@ -0,0 +1,137 @@
[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707399#comment-16707399 ]

ASF GitHub Bot commented on FLINK-10457:

kl0u commented on a change in pull request #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r238316378

## File path: flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriter.java ##
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link BulkWriter} implementation that wraps a {@link SequenceFile.Writer}.
+ *
+ * @param <K> The type of key written.
+ * @param <V> The type of value written.
+ */
+@PublicEvolving
+public class SequenceFileWriter<K extends Writable, V extends Writable> implements BulkWriter<Tuple2<K, V>> {
+	private final SequenceFile.Writer writer;
+
+	public SequenceFileWriter(SequenceFile.Writer writer) {
+		this.writer = checkNotNull(writer, "sequenceFileWriter");

Review comment:
`checkNotNull(writer)` is enough because, if the exception is thrown, the stack trace will contain the name of the class.
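The reviewer's point about the stack trace can be checked with a small stand-alone sketch. It uses `java.util.Objects.requireNonNull` as a stand-in for Flink's `Preconditions.checkNotNull` (an assumption made so the example runs without Flink on the classpath), and `WriterHolder` is a hypothetical class, not part of the pull request.

```java
import java.util.Objects;

/** Hypothetical holder that null-checks its constructor argument without a message. */
class WriterHolder {
    private final Object writer;

    WriterHolder(Object writer) {
        // No message: the exception's stack trace already contains a WriterHolder frame.
        this.writer = Objects.requireNonNull(writer);
    }

    /** Returns true if any frame of the throwable's stack trace belongs to the given class. */
    static boolean traceMentions(Throwable t, String className) {
        for (StackTraceElement frame : t.getStackTrace()) {
            if (frame.getClassName().equals(className)) {
                return true;
            }
        }
        return false;
    }
}

class NullCheckDemo {
    public static void main(String[] args) {
        try {
            new WriterHolder(null);
        } catch (NullPointerException e) {
            // The constructor frame identifies where the null came from.
            System.out.println(WriterHolder.traceMentions(e, WriterHolder.class.getName()));
        }
    }
}
```

A message argument still helps when one constructor checks several parameters, since the trace alone names the class and line but not the offending argument; with a single parameter, as in the reviewed constructor, it adds little.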
[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707395#comment-16707395 ]

ASF GitHub Bot commented on FLINK-10457:

kl0u commented on a change in pull request #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r238320884

## File path: flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java ##
@@ -0,0 +1,137 @@
[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707388#comment-16707388 ]

ASF GitHub Bot commented on FLINK-10457:

kl0u commented on a change in pull request #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r238315346

## File path: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/FiniteTestSource.java ##
@@ -16,7 +16,7 @@
  * limitations under the License.

Review comment:
I would move that into the `org.apache.flink.streaming.util` package of `flink-test-utils`.
[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707393#comment-16707393 ]

ASF GitHub Bot commented on FLINK-10457:

kl0u commented on a change in pull request #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r238319765

## File path: flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java ##
@@ -0,0 +1,137 @@
[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707398#comment-16707398 ]

ASF GitHub Bot commented on FLINK-10457:

kl0u commented on a change in pull request #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r238318139

## File path: flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java ##
@@ -0,0 +1,137 @@
[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707390#comment-16707390 ]

ASF GitHub Bot commented on FLINK-10457:

kl0u commented on a change in pull request #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r238318386

## File path: flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java ##
@@ -0,0 +1,137 @@
[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707396#comment-16707396 ]

ASF GitHub Bot commented on FLINK-10457:

kl0u commented on a change in pull request #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r238321158

## File path: flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SerializableHadoopConfiguration.java ##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * The wrapper class for serialization of {@link Configuration}.
+ */
+@Internal
+public class SerializableHadoopConfiguration implements Serializable {

Review comment:
This class can be package private.
[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707400#comment-16707400 ]

ASF GitHub Bot commented on FLINK-10457:

kl0u commented on a change in pull request #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r238317930

## File path: flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java ##
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+
+import java.io.IOException;
+
+/**
+ * A factory that creates a SequenceFile {@link BulkWriter}.
+ *
+ * @param <K> The type of key to write. It should be writable.
+ * @param <V> The type of value to write. It should be writable.
+ */
+@PublicEvolving
+public class SequenceFileWriterFactory<K extends Writable, V extends Writable> implements BulkWriter.Factory<Tuple2<K, V>> {
+	private static final long serialVersionUID = 1L;
+
+	private final SerializableHadoopConfiguration serdeHadoopConf;
+	private final Class<K> keyClass;
+	private final Class<V> valueClass;
+	private final String compressionCodecName;
+	private final SequenceFile.CompressionType compressionType;
+
+	/**
+	 * Creates a new SequenceFileWriterFactory using the given builder to assemble the
+	 * SequenceFileWriter.
+	 *
+	 * @param hadoopConf The Hadoop configuration for Sequence File Writer.
+	 * @param keyClass The class of key to write.
+	 * @param valueClass The class of value to write.
+	 */
+	public SequenceFileWriterFactory(Configuration hadoopConf, Class<K> keyClass, Class<V> valueClass) {
+		this(hadoopConf, keyClass, valueClass, "None", SequenceFile.CompressionType.BLOCK);
+	}
+
+	/**
+	 * Creates a new SequenceFileWriterFactory using the given builder to assemble the
+	 * SequenceFileWriter.
+	 *
+	 * @param hadoopConf The Hadoop configuration for Sequence File Writer.
+	 * @param keyClass The class of key to write.
+	 * @param valueClass The class of value to write.
+	 * @param compressionCodecName The name of compression codec.
+	 */
+	public SequenceFileWriterFactory(Configuration hadoopConf, Class<K> keyClass, Class<V> valueClass, String compressionCodecName) {
+		this(hadoopConf, keyClass, valueClass, compressionCodecName, SequenceFile.CompressionType.BLOCK);
+	}
+
+	/**
+	 * Creates a new SequenceFileWriterFactory using the given builder to assemble the
+	 * SequenceFileWriter.
+	 *
+	 * @param hadoopConf The Hadoop configuration for Sequence File Writer.
+	 * @param keyClass The class of key to write.
+	 * @param valueClass The class of value to write.
+	 * @param compressionCodecName The name of compression codec.
+	 * @param compressionType The type of compression level.
+	 */
+	public SequenceFileWriterFactory(Configuration hadoopConf, Class<K> keyClass, Class<V> valueClass, String compressionCodecName, SequenceFile.CompressionType compressionType) {
+		this.serdeHadoopConf = new SerializableHadoopConfiguration(hadoopConf);
+		this.keyClass = keyClass;

Review comment:
Add `checkNotNull` to all of the parameters.
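
The `checkNotNull` request can be illustrated with a small standalone sketch. It is not the PR's code: `WriterSettings` is a hypothetical class, and `java.util.Objects.requireNonNull` is used here as a standard-library stand-in for Flink's `Preconditions.checkNotNull`, so that the example runs without Flink on the classpath. The point is only that each constructor argument is validated at assignment time, so a bad argument fails at construction rather than later when a writer is created.

```java
import java.util.Objects;

// Small driver showing that a null argument is rejected at construction time.
class NullCheckDemo {
    public static void main(String[] args) {
        WriterSettings ok = new WriterSettings(String.class, Integer.class, "None");
        System.out.println(ok.describe());
        try {
            new WriterSettings(null, Integer.class, "None");
        } catch (NullPointerException e) {
            System.out.println("rejected null argument: " + e.getMessage());
        }
    }
}

// Hypothetical settings holder; the field names mirror the factory's parameters.
final class WriterSettings {
    private final Class<?> keyClass;
    private final Class<?> valueClass;
    private final String compressionCodecName;

    WriterSettings(Class<?> keyClass, Class<?> valueClass, String compressionCodecName) {
        // Stand-in for checkNotNull: throws NullPointerException with the given message.
        this.keyClass = Objects.requireNonNull(keyClass, "keyClass");
        this.valueClass = Objects.requireNonNull(valueClass, "valueClass");
        this.compressionCodecName = Objects.requireNonNull(compressionCodecName, "compressionCodecName");
    }

    String describe() {
        return keyClass.getSimpleName() + "/" + valueClass.getSimpleName() + "/" + compressionCodecName;
    }
}
```

Passing the parameter name as the message makes the resulting exception point directly at the offending argument.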
[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707392#comment-16707392 ]

ASF GitHub Bot commented on FLINK-10457:

kl0u commented on a change in pull request #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r238321380

## File path: flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SerializableHadoopConfiguration.java ##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * The wrapper class for serialization of {@link Configuration}.
+ */
+@Internal
+public class SerializableHadoopConfiguration implements Serializable {
+	private Configuration hadoopConf;

Review comment:
This can be `transient` so that the reader knows in advance that the field is not serialized.
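
The `transient` suggestion in the review comment above can be sketched with plain Java serialization. This is an illustration under stated assumptions, not the PR's implementation: `PlainConf` is a hypothetical stand-in for a configuration class that does not implement `java.io.Serializable`, and `SerializableConfWrapper` is a made-up name. The wrapper marks the field `transient`, which tells default serialization (and the reader) to skip it, and then writes and restores the contents by hand in `writeObject` and `readObject`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Driver: serializes a wrapper to bytes and reads it back.
class TransientDemo {
    static SerializableConfWrapper roundTrip(SerializableConfWrapper wrapper) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(wrapper);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SerializableConfWrapper) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        PlainConf conf = new PlainConf();
        conf.entries.put("example.key", "example-value"); // hypothetical key and value
        SerializableConfWrapper copy = roundTrip(new SerializableConfWrapper(conf));
        System.out.println(copy.get().entries);
    }
}

// Hypothetical stand-in for a configuration type that is NOT java.io.Serializable.
class PlainConf {
    final Map<String, String> entries = new HashMap<>();
}

// Wrapper that carries the non-serializable object across Java serialization.
class SerializableConfWrapper implements Serializable {
    private static final long serialVersionUID = 1L;

    // transient: default serialization skips this field; the hooks below handle it.
    private transient PlainConf conf;

    SerializableConfWrapper(PlainConf conf) {
        this.conf = conf;
    }

    PlainConf get() {
        return conf;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        // Write the entries manually: count first, then key/value pairs.
        out.writeInt(conf.entries.size());
        for (Map.Entry<String, String> entry : conf.entries.entrySet()) {
            out.writeUTF(entry.getKey());
            out.writeUTF(entry.getValue());
        }
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        // Rebuild the transient field from the manually written entries.
        conf = new PlainConf();
        int count = in.readInt();
        for (int i = 0; i < count; i++) {
            String key = in.readUTF();
            String value = in.readUTF();
            conf.entries.put(key, value);
        }
    }
}
```

Without the `transient` keyword, default serialization would try to serialize the field itself and fail with `NotSerializableException` for a type like `PlainConf`.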
[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707397#comment-16707397 ]

ASF GitHub Bot commented on FLINK-10457:

kl0u commented on a change in pull request #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r238321653

## File path: flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SerializableHadoopConfiguration.java ##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * The wrapper class for serialization of {@link Configuration}.
+ */
+@Internal
+public class SerializableHadoopConfiguration implements Serializable {
+	private Configuration hadoopConf;
+
+	public SerializableHadoopConfiguration(Configuration hadoopConf) {
+		this.hadoopConf = hadoopConf;
+	}
+
+	public Configuration get() {

Review comment:
Same here.
[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707391#comment-16707391 ]

ASF GitHub Bot commented on FLINK-10457:

kl0u commented on a change in pull request #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r238316598

## File path: flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriter.java ##
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link BulkWriter} implementation that wraps a {@link SequenceFile.Writer}.
+ *
+ * @param <K> The type of key written.
+ * @param <V> The type of value written.
+ */
+@PublicEvolving
+public class SequenceFileWriter<K extends Writable, V extends Writable> implements BulkWriter<Tuple2<K, V>> {
+	private final SequenceFile.Writer writer;
+
+	public SequenceFileWriter(SequenceFile.Writer writer) {
+		this.writer = checkNotNull(writer, "sequenceFileWriter");
+	}
+
+	@Override
+	public void addElement(Tuple2<K, V> element) throws IOException {
+		writer.append(element.f0, element.f1);
+	}
+
+	@Override
+	public void flush() throws IOException {
+		writer.hflush();

Review comment:
Here `writer.hsync()` should be called, not `hflush()`.
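
As background to the `hsync()` versus `hflush()` comment above: in Hadoop's `Syncable` contract, `hflush()` pushes buffered data out so that new readers can see it, while `hsync()` additionally asks for the data to reach the storage device, similar to POSIX `fsync`. The sketch below is only a standard-library analogy and does not use Hadoop at all: `OutputStream.flush()` plays the role of the weaker call and `FileDescriptor.sync()` the stronger one. The class name, method name, and temp-file prefix are made up for illustration.

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

class DurableFlushDemo {
    // Writes one record to a temp file, forces it to storage, and returns what was read back.
    static String writeDurably(String record) {
        try {
            Path file = Files.createTempFile("durable-flush-demo", ".txt");
            try (FileOutputStream out = new FileOutputStream(file.toFile())) {
                out.write(record.getBytes(StandardCharsets.UTF_8));
                out.flush();        // weaker step: empties this stream's own buffers, no durability promise
                out.getFD().sync(); // stronger step: asks the OS to write the data to the device
            }
            String readBack = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
            Files.delete(file);
            return readBack;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.print(writeDurably("key-1\tvalue-1\n"));
    }
}
```

A writer whose `flush()` is expected to leave data safely on disk would want the stronger of the two calls, which is the substance of the reviewer's request.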
[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707389#comment-16707389 ]

ASF GitHub Bot commented on FLINK-10457:

kl0u commented on a change in pull request #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r238321562

## File path: flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SerializableHadoopConfiguration.java ##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * The wrapper class for serialization of {@link Configuration}.
+ */
+@Internal
+public class SerializableHadoopConfiguration implements Serializable {
+    private Configuration hadoopConf;
+
+    public SerializableHadoopConfiguration(Configuration hadoopConf) {

Review comment:
This can be package private.
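The diff above is cut off at the constructor, so the rest of the class is not visible here. Its imports (`ObjectInputStream`, `ObjectOutputStream`) suggest the usual pattern of a `Serializable` wrapper with hand-written `writeObject`/`readObject` around a non-serializable object. A minimal sketch of that pattern follows, with a package-private constructor as the reviewer suggests. `Settings` is a hypothetical stand-in for Hadoop's `Configuration`, and all names are illustrative rather than taken from the PR.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class SerializableWrapperSketch {

    /** Hypothetical stand-in for a non-serializable settings class. */
    static final class Settings {
        final Map<String, String> entries = new HashMap<>();
    }

    /** Wrapper that serializes the wrapped settings by hand. */
    static final class SerializableSettings implements Serializable {
        private static final long serialVersionUID = 1L;

        // transient: default Java serialization skips this field.
        private transient Settings settings;

        // Package-private: only code in the same package can construct the wrapper.
        SerializableSettings(Settings settings) {
            this.settings = settings;
        }

        Settings get() {
            return settings;
        }

        private void writeObject(ObjectOutputStream out) throws IOException {
            out.defaultWriteObject();
            out.writeInt(settings.entries.size());
            for (Map.Entry<String, String> e : settings.entries.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeUTF(e.getValue());
            }
        }

        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            settings = new Settings();
            int n = in.readInt();
            for (int i = 0; i < n; i++) {
                String key = in.readUTF();
                settings.entries.put(key, in.readUTF());
            }
        }
    }

    /** Serializes and deserializes the wrapper, returning the copy. */
    static SerializableSettings roundTrip(SerializableSettings original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SerializableSettings) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Settings s = new Settings();
        s.entries.put("example.key", "example-value"); // made-up key for the demo
        System.out.println(roundTrip(new SerializableSettings(s)).get().entries);
    }
}
```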
[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16705743#comment-16705743 ]

ASF GitHub Bot commented on FLINK-10457:

jihyun commented on issue #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#issuecomment-443413453

Hi @kl0u
Thanks for your review. I've fixed the code based on your feedback. Please check whether the changes are okay.
[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704727#comment-16704727 ]

ASF GitHub Bot commented on FLINK-10457:

kl0u commented on a change in pull request #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r237857194

## File path: flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java ##
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.runtime.util.HadoopUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+
+import java.io.IOException;
+
+/**
+ * A factory that creates a SequenceFile {@link BulkWriter}.
+ *
+ * @param <K> The type of key to write. It should be writable.
+ * @param <V> The type of value to write. It should be writable.
+ */
+@PublicEvolving
+public class SequenceFileWriterFactory<K extends Writable, V extends Writable> implements BulkWriter.Factory<Tuple2<K, V>> {
+    private static final long serialVersionUID = 1L;
+
+    private final Class<K> keyClass;
+    private final Class<V> valueClass;
+    private final String compressionCodecName;
+    private final SequenceFile.CompressionType compressionType;
+
+    /**
+     * Creates a new SequenceFileWriterFactory using the given builder to assemble the
+     * SequenceFileWriter.
+     *
+     * @param keyClass The class of key to write.
+     * @param valueClass The class of value to write.
+     */
+    public SequenceFileWriterFactory(Class<K> keyClass, Class<V> valueClass) {
+        this(keyClass, valueClass, "None", SequenceFile.CompressionType.BLOCK);
+    }
+
+    /**
+     * Creates a new SequenceFileWriterFactory using the given builder to assemble the
+     * SequenceFileWriter.
+     *
+     * @param keyClass The class of key to write.
+     * @param valueClass The class of value to write.
+     * @param compressionCodecName The name of compression codec.
+     */
+    public SequenceFileWriterFactory(Class<K> keyClass, Class<V> valueClass, String compressionCodecName) {
+        this(keyClass, valueClass, compressionCodecName, SequenceFile.CompressionType.BLOCK);
+    }
+
+    /**
+     * Creates a new SequenceFileWriterFactory using the given builder to assemble the
+     * SequenceFileWriter.
+     *
+     * @param keyClass The class of key to write.
+     * @param valueClass The class of value to write.
+     * @param compressionCodecName The name of compression codec.
+     * @param compressionType The type of compression level.
+     */
+    public SequenceFileWriterFactory(Class<K> keyClass, Class<V> valueClass, String compressionCodecName, SequenceFile.CompressionType compressionType) {
+        this.keyClass = keyClass;
+        this.valueClass = valueClass;
+        this.compressionCodecName = compressionCodecName;
+        this.compressionType = compressionType;
+    }
+
+    @Override
+    public BulkWriter<Tuple2<K, V>> create(FSDataOutputStream out) throws IOException {
+        Configuration hadoopConf = HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration());

Review comment:
Here we should not get the configuration this way. The configuration of the Sequence File Writer should be done on a per-job basis, and not for the whole cluster. I would recommend passing a Hadoop configuration in the constructor and, given that it is not serializable, storing it as a byte array. Then, in the
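The review comment above is truncated in this archive, so the rest of the suggestion is unknown. The part that is visible (take the configuration in the constructor, keep it as a byte array because it is not serializable) could look roughly like the sketch below. `JobConf` is a hypothetical stand-in for a config class that can write itself to a data stream, `WriterFactory` and `restoreConf()` are invented names, and restoring the config just before a writer is created is an assumption, not something the source states.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.TreeMap;

public class ConfAsBytesSketch {

    /** Hypothetical stand-in for a non-serializable, per-job configuration. */
    static final class JobConf {
        final Map<String, String> entries = new TreeMap<>();

        void write(DataOutputStream out) throws IOException {
            out.writeInt(entries.size());
            for (Map.Entry<String, String> e : entries.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeUTF(e.getValue());
            }
        }

        void readFields(DataInputStream in) throws IOException {
            entries.clear();
            int n = in.readInt();
            for (int i = 0; i < n; i++) {
                String key = in.readUTF();
                entries.put(key, in.readUTF());
            }
        }
    }

    /** Hypothetical factory: receives the config in the constructor, keeps only its bytes. */
    static final class WriterFactory implements Serializable {
        private static final long serialVersionUID = 1L;
        private final byte[] confBytes;

        WriterFactory(JobConf conf) {
            this.confBytes = toBytes(conf);
        }

        private static byte[] toBytes(JobConf conf) {
            try {
                ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                conf.write(new DataOutputStream(buffer));
                return buffer.toByteArray();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        /** Rebuilds the config from the stored bytes (assumed to happen before creating a writer). */
        JobConf restoreConf() {
            try {
                JobConf conf = new JobConf();
                conf.readFields(new DataInputStream(new ByteArrayInputStream(confBytes)));
                return conf;
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    /** Sends the factory through Java serialization, as shipping it to a worker would. */
    static WriterFactory roundTrip(WriterFactory factory) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(factory);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                return (WriterFactory) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        JobConf conf = new JobConf();
        conf.entries.put("compression.codec", "none"); // made-up key for the demo
        System.out.println(roundTrip(new WriterFactory(conf)).restoreConf().entries);
    }
}
```

Because the factory holds only a `byte[]`, it serializes with the default mechanism, and each job carries its own settings instead of reading a cluster-wide configuration at writer creation time.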
[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702990#comment-16702990 ]

ASF GitHub Bot commented on FLINK-10457:

kl0u commented on a change in pull request #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r237426973

## File path: flink-formats/flink-sequencefile/pom.xml ##
@@ -0,0 +1,107 @@
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-formats</artifactId>
+        <version>1.7-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-sequencefile</artifactId>
+    <name>flink-sequencefile</name>
+
+    <packaging>jar</packaging>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-hadoop2</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-hadoop-fs</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+
+
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils_2.11</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.11</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-hadoop-compatibility_2.11</artifactId>
+            <version>${project.version}</version>

Review comment:
Same here.
[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702988#comment-16702988 ]

ASF GitHub Bot commented on FLINK-10457:

kl0u commented on a change in pull request #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r237426646

## File path: flink-formats/flink-sequencefile/pom.xml ##
@@ -0,0 +1,107 @@
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-formats</artifactId>
+        <version>1.7-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-sequencefile</artifactId>
+    <name>flink-sequencefile</name>
+
+    <packaging>jar</packaging>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-hadoop2</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-hadoop-fs</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+
+
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils_2.11</artifactId>
+            <version>${project.version}</version>

Review comment:
Replace the hardcoded scala version with `_${scala.binary.version}`.
[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702989#comment-16702989 ]

ASF GitHub Bot commented on FLINK-10457:

kl0u commented on a change in pull request #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r237427090

## File path: flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriter.java ##
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link BulkWriter} implementation that wraps a {@link SequenceFile.Writer}.
+ *
+ * @param <K> The type of key written.
+ * @param <V> The type of value written.
+ */
+@PublicEvolving
+public class SequenceFileWriter<K extends Writable, V extends Writable> implements BulkWriter<Tuple2<K, V>> {
+    private final SequenceFile.Writer writer;
+
+    public SequenceFileWriter(SequenceFile.Writer writer) {
+        this.writer = checkNotNull(writer, "sequenceFileWriter");
+    }
+
+    @Override
+    public void addElement(Tuple2<K, V> element) throws IOException {
+

Review comment:
Remove empty line.
[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702987#comment-16702987 ]

ASF GitHub Bot commented on FLINK-10457:

kl0u commented on a change in pull request #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r237426910

## File path: flink-formats/flink-sequencefile/pom.xml ##
@@ -0,0 +1,107 @@
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-formats</artifactId>
+        <version>1.7-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-sequencefile</artifactId>
+    <name>flink-sequencefile</name>
+
+    <packaging>jar</packaging>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-hadoop2</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-hadoop-fs</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+
+
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils_2.11</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.11</artifactId>
+            <version>${project.version}</version>

Review comment:
Same here.
[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702991#comment-16702991 ]

ASF GitHub Bot commented on FLINK-10457:

kl0u commented on a change in pull request #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r237426794

## File path: flink-formats/flink-sequencefile/pom.xml ##
@@ -0,0 +1,107 @@
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-formats</artifactId>
+        <version>1.7-SNAPSHOT</version>
+        <relativePath>..</relativePath>

Review comment:
Update the version to `1.8-SNAPSHOT`
[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702992#comment-16702992 ]

ASF GitHub Bot commented on FLINK-10457:

kl0u commented on a change in pull request #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r237428128

## File path: flink-formats/flink-sequencefile/src/test/java/org/apache/flink/formats/sequencefile/FiniteTestSource.java ##
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one

Review comment:
This class seems like a copy from the `flink-parquet` module. It may make sense to create a `utils` module to avoid this duplication.
[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683393#comment-16683393 ]

ASF GitHub Bot commented on FLINK-10457:

kl0u commented on issue #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#issuecomment-437795189

Hi @jihyun ! Currently we are busy with testing the release candidate for Flink-1.7. Unfortunately, this PR will have to wait until the release happens.
[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16683275#comment-16683275 ]

ASF GitHub Bot commented on FLINK-10457:

jihyun commented on issue #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#issuecomment-437769950

Hi. Did you check this PR?
[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656526#comment-16656526 ]

ASF GitHub Bot commented on FLINK-10457:

GJL commented on issue #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#issuecomment-431306529

I realized it's better if @kl0u reviews this since he worked on the StreamingFileSink.
[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646275#comment-16646275 ]

ASF GitHub Bot commented on FLINK-10457:

kl0u commented on issue #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#issuecomment-428912272

Hi @jihyun ! We are a bit busy now with the upcoming feature freeze. But I will review your PR as soon as possible.
[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631503#comment-16631503 ]

ASF GitHub Bot commented on FLINK-10457:

jihyun opened a new pull request #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774

## What is the purpose of the change

This pull request adds support for the SequenceFile format to StreamingFileSink.

## Brief change log

Added SequenceFileWriterFactory and SequenceFileWriter.

## Verifying this change

This change added tests and can be verified as follows:

- org.apache.flink.formats.sequencefile.SequenceFileSinkITCase#testWriteSequenceFile

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / no) yes
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) no
- The serializers: (yes / no / don't know) no
- The runtime per-record code paths (performance sensitive): (yes / no / don't know) no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) no
- The S3 file system connector: (yes / no / don't know) no

## Documentation

- Does this pull request introduce a new feature? (yes / no) yes
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) not documented