[jira] [Created] (FLINK-10134) UTF-16 support for TextInputFormat

2018-08-13 Thread David Dreyfus (JIRA)
David Dreyfus created FLINK-10134:
-

 Summary: UTF-16 support for TextInputFormat
 Key: FLINK-10134
 URL: https://issues.apache.org/jira/browse/FLINK-10134
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.4.2
Reporter: David Dreyfus


It does not appear that Flink supports a charset encoding of "UTF-16". In 
particular, it doesn't appear that Flink consumes the Byte Order Mark (BOM) to 
establish whether a UTF-16 file is UTF-16LE or UTF-16BE.
 
TextInputFormat.setCharset("UTF-16") calls DelimitedInputFormat.setCharset(), 
which sets TextInputFormat.charsetName and then modifies the previously set 
delimiterString to construct the proper byte encoding of the delimiter. This 
same charsetName is also used in TextInputFormat.readRecord() to interpret the 
bytes read from the file.
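For illustration, a minimal sketch of how a caller would hit this path (the 
input path below is hypothetical):

{code:java}
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;

TextInputFormat format = new TextInputFormat(new Path("file:///data/utf16-input.txt"));
// Delegates to DelimitedInputFormat.setCharset(), which re-encodes the delimiter
// and records the charsetName later used by readRecord().
format.setCharset("UTF-16");
{code}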
 
There are two problems that this implementation would seem to have when using 
UTF-16:
 # delimiterString.getBytes(getCharset()) in DelimitedInputFormat.java will 
return a Big Endian byte sequence that includes the Byte Order Mark (BOM); see 
the demonstration after this list. The actual text file will not contain a BOM 
at each line ending, so the delimiter will never be matched. Moreover, if the 
actual byte encoding of the file is Little Endian, the bytes will be 
interpreted incorrectly.
 # TextInputFormat.readRecord() will not see a BOM each time it decodes a byte 
sequence with the String(bytes, offset, numBytes, charset) call. Therefore, it 
will assume Big Endian, which may not always be correct. [1]

[1] [https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java#L95]
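The first problem can be confirmed with plain Java, independent of Flink:

{code:java}
import java.nio.charset.StandardCharsets;

public class Utf16DelimiterDemo {
    public static void main(String[] args) {
        // The endianness-unspecified "UTF-16" charset prepends a BOM and
        // encodes Big Endian.
        byte[] delimiter = "\n".getBytes(StandardCharsets.UTF_16);
        for (byte b : delimiter) {
            System.out.printf("%02X ", b);
        }
        // Prints: FE FF 00 0A -- the BOM (FE FF) followed by a Big Endian
        // newline, a sequence that never occurs at line endings in the file.
    }
}
{code}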

 
While there are likely many solutions, I would think that all of them would 
have to start by reading the BOM from the file when a Split is opened. If the 
caller doesn't specify an endianness, the BOM should determine the effective 
encoding; if the BOM conflicts with the caller's specification, the BOM should 
win. That is, if the BOM indicates Little Endian and the caller indicates 
UTF-16BE, Flink should rewrite the charsetName as UTF-16LE.
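A minimal sketch of that BOM-to-charset mapping (illustrative only, not Flink 
API; the method name and signature are made up):

{code:java}
// Derive the effective charset from the first bytes of a split.
// head holds the first bytes read when the split is opened.
static String charsetFromBom(byte[] head, String requestedCharset) {
    if (head.length >= 2) {
        int b0 = head[0] & 0xFF;
        int b1 = head[1] & 0xFF;
        if (b0 == 0xFE && b1 == 0xFF) {
            return "UTF-16BE"; // BOM FE FF: Big Endian wins over the caller
        }
        if (b0 == 0xFF && b1 == 0xFE) {
            return "UTF-16LE"; // BOM FF FE: Little Endian wins over the caller
        }
    }
    return requestedCharset; // no BOM: honor the caller's charset
}
{code}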
I hope this makes sense and that I haven't been testing incorrectly or 
misreading the code.
 
I've verified the problem on version 1.4.2. I believe the problem exists on all 
versions. 





[jira] [Created] (FLINK-7926) Bug in Hybrid Hash Join: Request to spill a partition with less than two buffers.

2017-10-25 Thread David Dreyfus (JIRA)
David Dreyfus created FLINK-7926:


 Summary: Bug in Hybrid Hash Join: Request to spill a partition 
with less than two buffers.
 Key: FLINK-7926
 URL: https://issues.apache.org/jira/browse/FLINK-7926
 Project: Flink
  Issue Type: Bug
  Components: Distributed Coordination
Affects Versions: 1.3.2
 Environment: standalone execution on MacBook Pro
in flink-conf.yaml, taskmanager.numberOfTaskSlots changed from 1 to 3.
taskmanager.heap.mb = 1024
taskmanager.memory.preallocate = false
taskmanager.numberOfTaskSlots = 3
Reporter: David Dreyfus


The following exception is thrown as the number of tasks increases.
{code:java}
10/25/2017 14:26:16 LeftOuterJoin(Join at with(JoinOperatorSetsBase.java:232))(1/1) switched to FAILED
java.lang.RuntimeException: Bug in Hybrid Hash Join: Request to spill a partition with less than two buffers.
    at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:302)
    at org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1231)
    at org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:1053)
    at org.apache.flink.runtime.operators.hash.MutableHashTable.insertIntoTable(MutableHashTable.java:978)
    at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:938)
    at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:631)
    at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
    at org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashJoinIterator.callWithNextKey(NonReusingBuildSecondHashJoinIterator.java:114)
    at org.apache.flink.runtime.operators.AbstractOuterJoinDriver.run(AbstractOuterJoinDriver.java:160)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)

10/25/2017 14:26:16 Job execution switched to status FAILING.
java.lang.RuntimeException: Bug in Hybrid Hash Join: Request to spill a partition with less than two buffers.
    at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:302)
    at org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1231)
    at org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:1053)
    at org.apache.flink.runtime.operators.hash.MutableHashTable.insertIntoTable(MutableHashTable.java:978)
    at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:938)
    at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:631)
    at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
    at org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashJoinIterator.callWithNextKey(NonReusingBuildSecondHashJoinIterator.java:114)
    at org.apache.flink.runtime.operators.AbstractOuterJoinDriver.run(AbstractOuterJoinDriver.java:160)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)
{code}

I run with the following command:

{code:java}
flink run -c com.northbay.union.Union3 \
  ./FlinkWordCount/target/FlinkWordCount-1.0-SNAPSHOT.jar \
  --left /Users/user/Documents/Flink/Quickstart/Files/manysmall \
  --right /Users/user/Documents/Flink/Quickstart/Files/manysmall \
  --output /tmp/test6d_nomatch --output2 /tmp/test6d --filecount 50
{code}

The files submitted are all CSV files with rows of (int, string, short).

This is the code (break it out into 3 separate files before using). The idea 
behind this test is to compare (hash-join) pairs of files and combine their 
results.

{code:java}
package com.northbay.hashcount;

public class DeviceRecord1 {
    public int device;
    public String fingerprint;
    public short dma;
    public boolean match;

    public DeviceRecord1() {
    }

    public DeviceRecord1(DeviceRecord old) {
        this.device = old.device;
        this.fingerprint = old.fingerprint;
        this.dma = old.dma;
        this.match = false;
    }

    public DeviceRecord1(int device, String fingerprint, short dma) {
        this.device = device;
        this.fingerprint = fingerprint;
        this.dma = dma;
        this.match =