Hello,
These changes result in the following error:
$ lzop -d part-1-0
lzop: part-1-0: not a lzop file
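
import java.io.IOException;
import java.io.OutputStream;

import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;

// KafkaRecord is the application's own record type.
public class BulkRecordLZOSerializer implements BulkWriter<KafkaRecord> {
    private final CompressionOutputStream compressedStream;

    public BulkRecordLZOSerializer(OutputStream stream) {
        CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
        try {
            compressedStream = factory
                .getCodecByClassName("com.hadoop.compression.lzo.LzoCodec")
                .createOutputStream(stream);
        } catch (IOException e) {
            // Keep the original exception as the cause.
            throw new IllegalStateException("Unable to create LZO OutputStream", e);
        }
    }

    @Override
    public void addElement(KafkaRecord record) throws IOException {
        // One record per line.
        compressedStream.write(record.getValue());
        compressedStream.write('\n');
    }

    @Override
    public void finish() throws IOException {
        compressedStream.flush();
        compressedStream.finish();
    }

    @Override
    public void flush() throws IOException {
        compressedStream.flush();
    }
}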
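
A possible explanation for the "not a lzop file" message, offered as a guess rather than a confirmed diagnosis: as far as I understand hadoop-lzo, LzoCodec emits a raw LZO block stream, while the lzop tool expects the lzop container format, which starts with a fixed 9-byte magic header. The sketch below checks whether a file begins with that magic. The class name LzopMagicCheck and its method are hypothetical names made up for this illustration; nothing here comes from hadoop-lzo or Flink.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;

public class LzopMagicCheck {
    // Magic bytes at the start of an lzop container file.
    private static final byte[] LZOP_MAGIC = {
        (byte) 0x89, 0x4c, 0x5a, 0x4f, 0x00, 0x0d, 0x0a, 0x1a, 0x0a
    };

    /** Returns true if the stream starts with the lzop magic bytes. */
    public static boolean startsWithLzopMagic(InputStream in) throws IOException {
        byte[] header = new byte[LZOP_MAGIC.length];
        int off = 0;
        // read() may return fewer bytes than requested, so loop until full or EOF.
        while (off < header.length) {
            int n = in.read(header, off, header.length - off);
            if (n < 0) {
                return false; // shorter than the magic: cannot be an lzop file
            }
            off += n;
        }
        return Arrays.equals(header, LZOP_MAGIC);
    }

    public static void main(String[] args) throws IOException {
        if (args.length == 0) {
            System.out.println("usage: java LzopMagicCheck <file>");
            return;
        }
        try (InputStream in = Files.newInputStream(Paths.get(args[0]))) {
            System.out.println(startsWithLzopMagic(in)
                ? "starts with lzop magic"
                : "does not start with lzop magic");
        }
    }
}
```

Running it against part-1-0 would at least show whether the header is missing entirely or whether the file is an lzop container that is damaged further in.
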
On Mon, Oct 21, 2019 at 11:17 PM Ravi Bhushan Ratnakar <[email protected]> wrote:
> Hi,
>
> It seems that you want to use "com.hadoop.compression.lzo.LzoCodec"
> instead of "com.hadoop.compression.lzo.*LzopCodec*" in the line below.
>
> compressedStream =
> factory.getCodecByClassName("com.hadoop.compression.lzo.LzopCodec").createOutputStream(stream);
>
>
> Regarding the "lzop: unexpected end of file" problem, please add
> "compressedStream.flush()" to the method below to flush any leftover data
> before finishing.
>
> public void finish() throws IOException {
>     compressedStream.flush();
>     compressedStream.finish();
> }
>
>
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/serialization/BulkWriter.html#finish--
>
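
To illustrate why a compressed stream that is never finished tends to fail with an "unexpected end" style error, here is a minimal sketch. It uses java.util.zip.GZIPOutputStream as a stand-in for the LZO stream. That substitution is an assumption made so the example runs on a plain JDK without the hadoop-lzo native libraries, and it does not reproduce LzopCodec's exact behavior. The class and method names (FinishDemo, compress, decodesCleanly) are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class FinishDemo {
    /** Compresses one newline-terminated record; calls finish() only when asked to. */
    public static byte[] compress(String line, boolean callFinish) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        // syncFlush = true so that flush() pushes pending compressed data to the sink.
        GZIPOutputStream gz = new GZIPOutputStream(sink, true);
        gz.write(line.getBytes(StandardCharsets.UTF_8));
        gz.write('\n');
        gz.flush();
        if (callFinish) {
            // Writes the final block and trailer without closing the sink.
            gz.finish();
        }
        return sink.toByteArray();
    }

    /** Returns true if the bytes decode as a complete gzip stream. */
    public static boolean decodesCleanly(byte[] data) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
            byte[] buf = new byte[1024];
            while (in.read(buf) != -1) {
                // Drain the stream; only completeness matters here.
            }
            return true;
        } catch (IOException e) {
            // A truncated stream surfaces as an EOFException, which is an IOException.
            return false;
        }
    }
}
```

With this stand-in, flush() alone does not produce a complete stream; only the finish() call does. Whether the same holds for the LZO codecs, or whether they need something beyond finish(), is not something this sketch can confirm.
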
> Regards,
> Ravi
>
> On Tue, Oct 22, 2019 at 4:10 AM amran dean <[email protected]> wrote:
>
>> Hello,
>> I'm using BulkWriter to write newline-delimited, LZO-compressed files.
>> The logic is very straightforward (See code below).
>>
>> I am experiencing an issue decompressing the files created in this
>> manner, consistently getting "lzop: unexpected end of file". Is this an
>> issue with the caller of BulkWriter?
>>
>> As an aside, using com.hadoop.compression.lzo.LzoCodec instead results
>> in gibberish. I'm very confused about what is going on.
>>
>> private final CompressionOutputStream compressedStream;
>>
>> public BulkRecordLZOSerializer(OutputStream stream) {
>>     CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
>>     try {
>>         compressedStream =
>>             factory.getCodecByClassName("com.hadoop.compression.lzo.LzopCodec").createOutputStream(stream);
>>     } catch (IOException e) {
>>         throw new IllegalStateException("Unable to create LZO OutputStream");
>>     }
>> }
>>
>> public void addElement(KafkaRecord record) throws IOException {
>>     compressedStream.write(record.getValue());
>>     compressedStream.write('\n');
>> }
>>
>> public void finish() throws IOException {
>>     compressedStream.finish();
>> }
>>
>> public void flush() throws IOException {
>>     compressedStream.flush();
>> }
>>
>>