Re: UTF-16 support for TextInputFormat

2018-08-13 Thread David Dreyfus
Hi Fabian,

I've added FLINK-10134
<https://issues.apache.org/jira/browse/FLINK-10134>. I'm not sure whether you'd
consider it a blocker or whether I've identified the right component.
I'm afraid I don't have the bandwidth or knowledge to make the kind of pull
request you really need. I do hope my suggestions prove a little useful.

Thank you,
David

On Fri, Aug 10, 2018 at 5:41 AM Fabian Hueske  wrote:

> Hi David,
>
> Thanks for digging into the code! I had a quick look into the classes as
> well.
> As far as I can see, your analysis is correct and the BOM handling in
> DelimitedInputFormat and TextInputFormat (and other text-based IFs such as
> CsvInputFormat) is broken.
> In fact, it's obvious that nobody has paid attention to this yet.
>
> It would be great if you could open a Jira issue and copy your analysis
> and solution proposal into it.
> While at it, we could also deprecate the (duplicated) setCharsetName()
> method from TextInputFormat and redirect it to
> DelimitedInputFormat.setCharset().
>
> Would you also be interested in contributing a fix for this problem?
>
> Best, Fabian
>
> [1]
> https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java#L95
>
> 2018-08-09 14:55 GMT+02:00 David Dreyfus :
>
>> Hi Fabian,
>>
>> Thank you for taking my email.
>> TextInputFormat.setCharsetName("UTF-16") appears to set the private
>> variable TextInputFormat.charsetName.
>> It doesn't appear to cause additional behavior that would help interpret
>> UTF-16 data.
>>
>> The method I've tested is calling
>> DelimitedInputFormat.setCharset("UTF-16"), which then sets
>> TextInputFormat.charsetName and then modifies the previously set
>> delimiterString to construct the proper byte string encoding of the
>> delimiter. This same charsetName is also used in
>> TextInputFormat.readRecord() to interpret the bytes read from the file.
>>
>> There are two problems that this implementation would seem to have when
>> using UTF-16.
>>
>>1. delimiterString.getBytes(getCharset()) in
>>DelimitedInputFormat.java will return a Big Endian byte sequence including
>>the Byte Order Mark (BOM). The actual text file will not contain a BOM at
>>each line ending, so the delimiter will never be read. Moreover, if the
>>actual byte encoding of the file is Little Endian, the bytes will be
>>interpreted incorrectly.
>>2. TextInputFormat.readRecord() will not see a BOM each time it
>>decodes a byte sequence with the String(bytes, offset, numBytes, charset)
>>call. Therefore, it will assume Big Endian, which may not always be 
>> correct.
>>
>> While there are likely many solutions, I would think that all of them
>> would have to start by reading the BOM from the file when a Split is
>> opened, using that BOM to pick a BOM-specific encoding when the caller
>> doesn't specify one, and overriding the caller's specification if the BOM
>> conflicts with it. That is, if the BOM indicates Little Endian and the
>> caller indicates UTF-16BE, Flink should rewrite the charsetName as
>> UTF-16LE.
>>
>> I hope this makes sense and that I haven't been testing incorrectly or
>> misreading the code.
>>
>> Thank you,
>> David
>>
>> On Thu, Aug 9, 2018 at 4:04 AM Fabian Hueske  wrote:
>>
>>> Hi David,
>>>
>>> Did you try to set the encoding on the TextInputFormat with
>>>
>>> TextInputFormat tif = ...
>>> tif.setCharsetName("UTF-16");
>>>
>>> Best, Fabian
>>>
>>> 2018-08-08 17:45 GMT+02:00 David Dreyfus :
>>>
>>>> Hello -
>>>>
>>>> It does not appear that Flink supports a charset encoding of "UTF-16".
>>>> In particular, it doesn't appear that Flink consumes the Byte Order Mark
>>>> (BOM) to establish whether a UTF-16 file is UTF-16LE or UTF-16BE. Are there
>>>> any plans to enhance Flink to handle UTF-16 with BOM?
>>>>
>>>> Thank you,
>>>> David
>>>>
>>>
>>>
>


Re: UTF-16 support for TextInputFormat

2018-08-09 Thread David Dreyfus
Hi Fabian,

Thank you for taking my email.
TextInputFormat.setCharsetName("UTF-16") appears to set the private
variable TextInputFormat.charsetName.
It doesn't appear to cause additional behavior that would help interpret
UTF-16 data.

The method I've tested is calling
DelimitedInputFormat.setCharset("UTF-16"), which then sets
TextInputFormat.charsetName and then modifies the previously set
delimiterString to construct the proper byte string encoding of the
delimiter. This same charsetName is also used in
TextInputFormat.readRecord() to interpret the bytes read from the file.

There are two problems that this implementation would seem to have when
using UTF-16.

   1. delimiterString.getBytes(getCharset()) in DelimitedInputFormat.java
   will return a Big Endian byte sequence including the Byte Order Mark (BOM).
   The actual text file will not contain a BOM at each line ending, so the
   delimiter will never be read. Moreover, if the actual byte encoding of the
   file is Little Endian, the bytes will be interpreted incorrectly.
   2. TextInputFormat.readRecord() will not see a BOM each time it decodes
   a byte sequence with the String(bytes, offset, numBytes, charset) call.
   Therefore, it will assume Big Endian, which may not always be correct
   (see the sketch below).
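
For illustration, here is a minimal sketch of both effects, using only the JDK
(no Flink involved), showing how Java's "UTF-16" charset behaves on encode and
decode:

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class Utf16DelimiterDemo {
    public static void main(String[] args) {
        // Problem 1: encoding the delimiter with the "UTF-16" charset prepends
        // a big-endian BOM (0xFE 0xFF) to the byte sequence.
        byte[] delimiter = "\n".getBytes(StandardCharsets.UTF_16);
        System.out.println(Arrays.toString(delimiter)); // [-2, -1, 0, 10]
        // A UTF-16BE file contains only 0x00 0x0A at each line ending, so a
        // BOM-prefixed delimiter can never match.

        // Problem 2: decoding with "UTF-16" falls back to big endian when the
        // byte sequence carries no BOM, so little-endian content is garbled.
        byte[] littleEndian = "hello".getBytes(StandardCharsets.UTF_16LE);
        System.out.println(new String(littleEndian, StandardCharsets.UTF_16));
    }
}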

While there are likely many solutions, I would think that all of them would
have to start by reading the BOM from the file when a Split is opened, using
that BOM to pick a BOM-specific encoding when the caller doesn't specify one,
and overriding the caller's specification if the BOM conflicts with it. That
is, if the BOM indicates Little Endian and the caller indicates UTF-16BE,
Flink should rewrite the charsetName as UTF-16LE.
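
To make the idea concrete, here is a rough sketch of the kind of BOM probe I
have in mind. The class and method names are hypothetical (this is not
existing Flink API), and a real implementation would also need to push the
probed bytes back or account for them in the split offset:

import java.io.IOException;
import java.io.InputStream;

// Hypothetical helper, not existing Flink API.
final class BomCharsetResolver {

    // Probe the first two bytes of a split that starts at file offset 0 and
    // derive the effective UTF-16 charset name.
    static String resolveUtf16(InputStream in, String requestedCharset) throws IOException {
        int b0 = in.read();
        int b1 = in.read();
        if (b0 == 0xFE && b1 == 0xFF) {
            return "UTF-16BE"; // BOM says big endian; overrides the caller
        }
        if (b0 == 0xFF && b1 == 0xFE) {
            return "UTF-16LE"; // BOM says little endian; overrides the caller
        }
        // No BOM: keep the caller's charset, defaulting to big endian.
        return requestedCharset != null ? requestedCharset : "UTF-16BE";
    }
}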

I hope this makes sense and that I haven't been testing incorrectly or
misreading the code.

Thank you,
David

On Thu, Aug 9, 2018 at 4:04 AM Fabian Hueske  wrote:

> Hi David,
>
> Did you try to set the encoding on the TextInputFormat with
>
> TextInputFormat tif = ...
> tif.setCharsetName("UTF-16");
>
> Best, Fabian
>
> 2018-08-08 17:45 GMT+02:00 David Dreyfus :
>
>> Hello -
>>
>> It does not appear that Flink supports a charset encoding of "UTF-16". In
>> particular, it doesn't appear that Flink consumes the Byte Order Mark (BOM)
>> to establish whether a UTF-16 file is UTF-16LE or UTF-16BE. Are there any
>> plans to enhance Flink to handle UTF-16 with BOM?
>>
>> Thank you,
>> David
>>
>
>


UTF-16 support for TextInputFormat

2018-08-08 Thread David Dreyfus
Hello -

It does not appear that Flink supports a charset encoding of "UTF-16". In
particular, it doesn't appear that Flink consumes the Byte Order Mark (BOM)
to establish whether a UTF-16 file is UTF-16LE or UTF-16BE. Are there any
plans to enhance Flink to handle UTF-16 with BOM?

Thank you,
David


Data sources and slices

2017-10-26 Thread David Dreyfus
Hello,

If I am on a cluster with 2 task managers with 64 CPUs each, I can configure
128 slots in accordance with the documentation. If I set parallelism to 128
and read a 64 MB file (one datasource with a single file), will flink really
create 500K slices? Or, will it check the default blocksize of the host it
is reading from and allocate only as many slices as there are blocks? 
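
For reference, this is the kind of setup I have in mind (the input path is
just a placeholder):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class SliceQuestion {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(128); // 2 task managers x 64 slots

        // One data source reading a single ~64 MB file.
        DataSet<String> lines = env.readTextFile("/data/input/part-0");

        lines.first(10).print(); // just enough to make the plan executable
    }
}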

If the file is on S3:
1. Does a single thread copy it to local disk and then have 128 slices
consume it?
2. Does a single thread read the file from S3 and consume it, treating
it as one slice?
3. Does flink talk to S3 and make a multi-part read to local storage and
then read from local storage in 128 slices?

If a datasource has a large number of files, does each slot read one file at
a time with a single thread, or does each slot read one part of each file
such that 128 slots consume each file one at a time?

More generally, does flink try to allocate files to slots such that each
slot reads the same volume with as long a sequential read as possible? 

How does it distinguish between reading from the local HDFS and S3, given
that they might have vastly different performance characteristics?

Thanks,
David






Re: Not enough free slots to run the job

2017-10-26 Thread David Dreyfus
Hello,

I know this is an older thread, but ...

If some slots are left empty it doesn't necessarily mean that machine
resources are wasted. Some managed memory might be unavailable, but CPU,
heap memory, network, and disk are shared across slots. To the extent there
are multiple operators executing within a slot, multiple threads are
executing and consuming those resources. It's not clear what the actual
performance degradation would be, if any. Correct?

David 





Execute multiple jobs in parallel (threading): java.io.OptionalDataException

2017-10-26 Thread David Dreyfus
Hello,

I am trying to submit multiple jobs to flink from within my Java program.
I am running into an exception that may be related:
java.io.OptionalDataException.

Should I be able to create multiple plans/jobs within a single session and
execute them concurrently?
If so, is there a working example you can point me at?

Do I share the same ExecutionEnvironment? 
It looks like calls to getExecutionEnvironment() return the same one.
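
For context, here is a rough sketch of the pattern I'm attempting. The host,
port, jar path, and input paths are placeholders, and I'm not sure this is the
intended approach:

import org.apache.flink.api.java.ExecutionEnvironment;

public class ParallelJobSubmission {
    public static void main(String[] args) throws Exception {
        Thread t1 = new Thread(() -> runJob("/data/input/a"));
        Thread t2 = new Thread(() -> runJob("/data/input/b"));
        t1.start();
        t2.start();
        t1.join();
        t2.join();
    }

    private static void runJob(String inputPath) {
        try {
            // A separate environment per thread, instead of the shared one
            // returned by getExecutionEnvironment().
            ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
                    "jobmanager-host", 6123, "/path/to/my-job.jar");
            env.readTextFile(inputPath)
               .first(10)
               .print(); // print() triggers execution of this plan
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}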

I have a number of different transformations on my data I'd like to make.
I'd rather not have to create one very large job just to get them processed in
parallel.
My cluster has enough resources that performing each job sequentially would
be very wasteful.

Thank you,
David

Failed to submit job 60f4da5cf76836fe52ceba5cebdae412 (Union4a:14:15)
java.io.OptionalDataException
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1588)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
at java.util.HashMap.readObject(HashMap.java:1407)
at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2173)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1283)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)





Re: Tasks, slots, and partitioned joins

2017-10-26 Thread David Dreyfus
Hi Fabian,

Thank you for the great, detailed answers. 
1. So, each parallel slice of the DAG is placed into one slot. The key to
high utilization is many slices of the source data (or the various methods
of repartitioning it). Yes?
2. In batch processing, are slots filled round-robin on task managers, or do
I need to tune the number of slots to load the cluster evenly?
3. Are you suggesting that I perform the join in my custom data source?
4. Looking at this sample from
org.apache.flink.optimizer.PropertyDataSourceTest

  DataSource<Tuple2<Long, String>> data =
      env.readCsvFile("/some/path").types(Long.class, String.class);

  data.getSplitDataProperties()
      .splitsPartitionedBy(0);

4.a Does this code assume that one split == one file from /some/path? If
readCsvFile splits each file, the guarantee that all keys in each part of
the file share the same partition would be violated, right?
4.b Is there a way to mark a partition number so that sources that share
partition numbers are read in parallel and joined? If I have 10,000 pairs, I
want partition 1 read from the sources at the same time.
4.c Does a downstream flatmap function get an open() call for each new
partition? Or, do I chain MapPartition directly to the datasource?

Thank you,
David





Tasks, slots, and partitioned joins

2017-10-25 Thread David Dreyfus
Hello -

I have a large number of pairs of files. For purpose of discussion:
/source1/{1..1} and /source2/{1..1}.

I want to join the files pair-wise: /source1/1 joined to /source2/1,
/source1/2 joined to /source2/2, and so on.
I then want to union the results of the pair-wise joins and perform an
aggregate.

I create a simple flink job that has four sources, two joins, and two sinks
to produce intermediate results. This represents two unrelated chains.
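
For concreteness, here is a minimal sketch of those two chains. The paths,
field types, and join key are placeholders:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class PairwiseJoinSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        buildChain(env, "/source1/1", "/source2/1", "/intermediate/1");
        buildChain(env, "/source1/2", "/source2/2", "/intermediate/2");

        env.execute("pair-wise joins");
    }

    // One chain: two sources, one join, one intermediate sink.
    private static void buildChain(ExecutionEnvironment env,
                                   String left, String right, String out) {
        DataSet<Tuple2<String, Long>> l = read(env, left);
        DataSet<Tuple2<String, Long>> r = read(env, right);

        l.join(r)
         .where(0)          // join key: first tuple field
         .equalTo(0)
         .projectFirst(0, 1)
         .projectSecond(1)
         .writeAsCsv(out);  // intermediate result
    }

    private static DataSet<Tuple2<String, Long>> read(ExecutionEnvironment env, String path) {
        return env.readCsvFile(path).types(String.class, Long.class);
    }
}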

I notice that when running this job with parallelism = 1 on a standalone
machine with one task manager and 3 slots, only one slot gets used. 

My concern is that when I scale up to a YARN cluster, flink will continue to
use one slot on one machine instead of using all slots on all machines.

Prior reading suggests all the data source subtasks are added to a default
resource group. Downstream tasks (joins and sinks) want to be colocated with
the data sources. The result is all of my tasks are executed in one slot.

Flink Stream (DataStream) offers the slotSharingGroup() function. This
doesn't seem available to the DataSet user.

*Q1:* How do I force Flink to distribute work evenly across task managers
and the slots allocated to them? If this shouldn't be a concern, please
elaborate. 

When I scale up the number of unrelated chains I notice that flink seems to
start all of them at the same time, which results in thrashing and errors -
lots of IO and errors regarding hash buffers.

*Q2:* Is there any method for controlling the scheduling of tasks so that
some finish before others start? My work around is to execute multiple,
sequential batches with results going into an intermediate directory, and
then a final job that aggregates the results. I would certainly prefer one
job that might avoid the intermediate write.

If I treat /source1 as one data source and /source2 as the second, and then
join the two, flink will shuffle and partition the files on the join key.
The /source1 and /source2 files represent this partitioning. They are reused
multiple times; thus, I shuffle and save the results, creating /source1 and
/source2.

*Q3:* Does flink have a method by which I can mark individual files (or
directories) as belonging to a particular partition so that when I try to
join them, the unnecessary shuffle and repartition is avoided?

Thank you,
David


