[jira] [Created] (APEXMALHAR-2433) Add readme for Windowing Benchmark

2017-03-06 Thread bright chen (JIRA)
bright chen created APEXMALHAR-2433:
---

 Summary: Add readme for Windowing Benchmark
 Key: APEXMALHAR-2433
 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2433
 Project: Apache Apex Malhar
  Issue Type: Task
Reporter: bright chen
Assignee: bright chen






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (APEXCORE-635) Proposal: Manage memory to avoid memory copy and garbage collection

2017-02-28 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15888597#comment-15888597
 ] 

bright chen commented on APEXCORE-635:
--

Yes, I agree this should be revisited after the migration to Netty; otherwise a 
lot of the work would just be wasted.
How about I build the prototype after the Netty migration?

> Proposal: Manage memory to avoid memory copy and garbage collection
> ---
>
> Key: APEXCORE-635
> URL: https://issues.apache.org/jira/browse/APEXCORE-635
> Project: Apache Apex Core
>  Issue Type: Wish
>Reporter: bright chen
>Assignee: bright chen
>

[jira] [Commented] (APEXMALHAR-2366) Apply BloomFilter to Bucket

2017-02-27 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886659#comment-15886659
 ] 

bright chen commented on APEXMALHAR-2366:
-

Hi [~bhupesh],
The only difference, as I see it, is that this BloomFilter implementation uses 
SerializationBuffer to save some copying and garbage collection; I am not sure 
how much that affects performance. Another point is that Chaitanya's BloomFilter 
is in Megh, so it would at least need to be moved into the Malhar library before 
we could use it, and I am not sure whether there are any licensing issues either.

> Apply BloomFilter to Bucket
> ---
>
> Key: APEXMALHAR-2366
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2366
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: bright chen
>Assignee: bright chen
>   Original Estimate: 192h
>  Remaining Estimate: 192h
>
> The bucket get() checks the cache and then, if the entry is not in the cache, 
> checks the stored files. Checking the files is a fairly heavy operation because 
> of the file seek.
> The chance of having to read from the files is very high if the key range is 
> large.
> I suggest applying a BloomFilter per bucket to reduce the chance of reading 
> from the files.
> If the buckets are managed by ManagedStateImpl, a bucket's set of entries can 
> grow very large and the BloomFilter may stop being useful after a while. But if 
> the buckets are managed by ManagedTimeUnifiedStateImpl, each bucket keeps a 
> bounded number of entries and a BloomFilter would be very useful.
> For the implementation:
> Guava already provides a BloomFilter whose interface is pretty simple and fits 
> our case. But Guava 11 is not compatible with Guava 14 (Guava 11 uses Sink 
> while Guava 14 uses PrimitiveSink).
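As a minimal sketch of the Guava interface mentioned above (the expected-insertions 
count and false-positive rate are only example values, and the class is illustrative, 
not the ManagedState code): record every key written into a bucket and consult 
mightContain() before paying for a file seek on get.

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;

public class BucketBloomFilterSketch
{
  private final BloomFilter<byte[]> filter =
      BloomFilter.create(Funnels.byteArrayFunnel(), 1000000, 0.01);

  public void recordKey(byte[] key)
  {
    filter.put(key);                     // remember every key written into this bucket
  }

  public boolean mayExistOnDisk(byte[] key)
  {
    return filter.mightContain(key);     // false means the key is definitely not in the files
  }
}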



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (APEXMALHAR-2395) create MultiAccumulation

2017-02-27 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen resolved APEXMALHAR-2395.
-
Resolution: Duplicate

Duplicate of https://issues.apache.org/jira/browse/APEXMALHAR-2428.

> create MultiAccumulation
> 
>
> Key: APEXMALHAR-2395
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2395
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: bright chen
>Assignee: bright chen
>
> Create MultiAccumulation, which supports SUM, MAX, MIN, and COUNT. AVG can be 
> computed from SUM and COUNT at query time.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (APEXMALHAR-2428) CompositeAccumulation for windowed operator

2017-02-27 Thread bright chen (JIRA)
bright chen created APEXMALHAR-2428:
---

 Summary: CompositeAccumulation for windowed operator
 Key: APEXMALHAR-2428
 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2428
 Project: Apache Apex Malhar
  Issue Type: New Feature
Reporter: bright chen
Assignee: bright chen






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (APEXCORE-635) Proposal: Manage memory to avoid memory copy and garbage collection

2017-02-23 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881293#comment-15881293
 ] 

bright chen commented on APEXCORE-635:
--

{quote}
Yes, an array is an object in java that may hold either primitive types or 
other objects. When an array is GC, it is GC as a single object. If an array 
holds primitive types, the array's memory is reclaimed in a single step, there 
is no GC of individual bytes/int/long in the array.
{quote}
Yes, agreed: a byte array is treated as a single object. But because the memory 
is relatively large, it probably will not be allocated in the TLA (Thread Local 
Area), which means each allocation needs to acquire a lock, so allocation is not 
a very lightweight operation.

{quote}
I was referring to Output.getBuffer() and DefaultKryoStreamCodec that uses 
Output.getBuffer(). You are right that Output.toBytes() allocates a new array 
and copies bytes from Output internal buffer to the newly allocated buffer. 
This is a bug in DefaultStatefulStreamCodec, it should use Output.getBuffer().
{quote}
Output.getBuffer() probably will not work in our case. getBuffer() returns a 
reference to the Output's internal buffer, which holds the serialized bytes. 
Since we share one Output instance across multiple tuples, that buffer is reused 
when the next tuple is serialized, which causes problems whether or not we set 
the output's position back to zero. If we set the position to zero, the 
previously serialized data is overwritten; if we leave the position unchanged, 
the serialized data of the next tuple is concatenated to the previous tuple's 
data, we no longer know the boundary of each tuple, and the buffer keeps growing, 
so the Output has to keep allocating larger buffers and copying the old data (if 
the initial buffer size is not very large).
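For illustration, a minimal standalone sketch (assuming the Kryo 2.x Output API; 
the class name and sizes are only examples) of why getBuffer() is unsafe when one 
Output is shared across tuples:

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Output;

public class SharedOutputPitfall
{
  public static void main(String[] args)
  {
    Kryo kryo = new Kryo();
    Output output = new Output(4096);        // one Output shared across tuples

    kryo.writeObject(output, "tuple-1");
    byte[] view = output.getBuffer();        // reference to the internal buffer, no copy
    int length = output.position();

    output.setPosition(0);                   // reuse the Output for the next tuple ...
    kryo.writeObject(output, "tuple-2");     // ... which overwrites the bytes 'view' points at

    // 'view' no longer holds the serialized form of "tuple-1"; only toBytes() (a copy) would
    System.out.println("first tuple length was " + length);
  }
}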

{quote}
As I pointed to you, the prototype has a significant bug (reusing byte arrays 
that are not reusable due to fire and forget netlet behavior). What I would 
like to see is a series of small prototypes that prove that the approach 
provides significant performance gains.
{quote}
Yes, the original prototype reset the buffer when it should not have. But as I 
pointed out in this proposal, the buffer is reset only after the data has been 
sent to the socket; that is why the write() method needs to be overridden.

{quote}
I don't see how without modifying the current definition of StreamCodec and 
StatefulStreamCodec the solution can be generalized. 
{quote}
Yes, agreed. But I think it is worth changing StreamCodec: as suggested in the 
proposal, it is not easy to implement customized, reusable serialization without 
memory copies using the previous interface, and that interface is not compatible 
with Kryo's.
The current implementation of StatefulStreamCodec (DefaultStatefulStreamCodec) 
does not seem natural to me. The pairs are cleared after the first tuple, and for 
every subsequent tuple the pairs have to be checked and an instance of 
DataStatePair has to be created to wrap the state (which is null except for the 
first tuple) and the data.

{quote}
It may be possible to improve DefaultStatefulStreamCodec (concrete 
implementation of StatefulStreamCodec), but it seems to require a large amount 
of modifications to netlet and buffer server that other StreamCodec 
implementations will not benefit from. 
{quote}
We can change the behavior of netlet by overriding the put() and write() 
methods, so we do not need to change code inside netlet.
Customized codecs can also benefit from it if the codec interface is changed as 
proposed here.

{quote}
Additionally, the proposal seems to require netlet client to be aware of codec 
implementation and any changes to tuple format will require changes to 
serialization and de-serialization code.
{quote}
The netlet client needs to be aware of the memory management mechanism in order 
to reset the memory, but that code is maintained in the same class (currently 
BufferServerPublisher; an extended class in the proposal), so this should be OK.
The serialization format does not have to change; it can remain compatible with 
the previous one. But if we want to take advantage of the reserve() function of 
SerializationBuffer to avoid yet another copy, the format needs to change to 
avoid variable-length fields.




> Proposal: Manage memory to avoid memory copy and garbage collection
> ---
>
> Key: APEXCORE-635
> URL: https://issues.apache.org/jira/browse/APEXCORE-635
> Project: Apache Apex Core
>  Issue Type: Wish
>Reporter: bright chen
>Assignee: bright chen
>

[jira] [Commented] (APEXCORE-635) Proposal: Manage memory to avoid memory copy and garbage collection

2017-02-13 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15864242#comment-15864242
 ] 

bright chen commented on APEXCORE-635:
--

Hi Vlad,
I think the issues you meant I had not addressed are the following:
- Please benchmark copying one large block of memory in a single step and in 
multiple steps. What is the difference (in %)? Do the same for a direct buffer.
- How will control tuples be injected into a continuous block of memory?

My comments are as follows:
- From my previous comments, the problem with the current mechanism is that it 
wastes two extra rounds of memory copying plus garbage collection; it is not a 
question of copying one big byte array in one step versus splitting it into 
multiple copies.
- Control tuples are also written to the same SerializationBuffer.

> Proposal: Manage memory to avoid memory copy and garbage collection
> ---
>
> Key: APEXCORE-635
> URL: https://issues.apache.org/jira/browse/APEXCORE-635
> Project: Apache Apex Core
>  Issue Type: Wish
>Reporter: bright chen
>Assignee: bright chen
>

[jira] [Commented] (APEXCORE-635) Proposal: Manage memory to avoid memory copy and garbage collection

2017-02-13 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15864216#comment-15864216
 ] 

bright chen commented on APEXCORE-635:
--

Hi Vlad,
- As I understand it, an array is a kind of object in Java. Is it implemented 
differently? And how can the JVM reuse allocated bytes without collecting them? 
Do you have a reference I can take a look at?
- I copied the implementation of Kryo's Output.toBytes() here (kryo-2.24.0.jar). 
From this implementation, toBytes() does allocate new memory and copy:
public byte[] toBytes () {
  byte[] newBuffer = new byte[position];
  System.arraycopy(buffer, 0, newBuffer, 0, position);
  return newBuffer;
}
- I can get numbers, but that requires at least a simple prototype. Based on my 
previous test serializing strings of 1000 ASCII characters, the performance gain 
can be around 30%.
- Yes, it is pretty complex to implement the whole proposal. But fortunately, 
most of the memory-management machinery is already implemented in Malhar; what 
remains is to add some features and integrate it into the BufferServer.

> Proposal: Manage memory to avoid memory copy and garbage collection
> ---
>
> Key: APEXCORE-635
> URL: https://issues.apache.org/jira/browse/APEXCORE-635
> Project: Apache Apex Core
>  Issue Type: Wish
>Reporter: bright chen
>Assignee: bright chen
>

[jira] [Commented] (APEXMALHAR-2406) ManagedState Issues

2017-02-13 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15864039#comment-15864039
 ] 

bright chen commented on APEXMALHAR-2406:
-

Hi Chaitanya,
I believe Siyuan and David did some work regarding bucket purge. Have you had a 
chance to talk with them?
If you have not synced with them, could you give more detail, such as which state 
implementation and which bucket assigner were used?

> ManagedState Issues
> ---
>
> Key: APEXMALHAR-2406
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2406
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Chaitanya
>Assignee: Chaitanya
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (APEXCORE-635) Proposal: Manage memory to avoid memory copy and garbage collection

2017-02-10 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15862087#comment-15862087
 ] 

bright chen commented on APEXCORE-635:
--

I think the impact of garbage collection can be significant, depending on the 
memory situation.

> Proposal: Manage memory to avoid memory copy and garbage collection
> ---
>
> Key: APEXCORE-635
> URL: https://issues.apache.org/jira/browse/APEXCORE-635
> Project: Apache Apex Core
>  Issue Type: Wish
>Reporter: bright chen
>Assignee: bright chen
>

[jira] [Commented] (APEXCORE-635) Proposal: Manage memory to avoid memory copy and garbage collection

2017-02-10 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15861966#comment-15861966
 ] 

bright chen commented on APEXCORE-635:
--

- If we use a direct buffer, it seems ByteBuffer has no way to avoid a memory 
copy. What I mean by avoiding memory copies and garbage collection is explained 
below by comparing the current and the new procedure in detail (assume each tuple 
serializes to N bytes and M tuples are written at a time):

Current procedure (using the Kryo serde as an example):
1) Kryo serde: write the data into the buffer of Kryo's Output; assume the length 
is N bytes.
2) Call Output.toBytes() to get the bytes from Kryo: requires a memory copy of N 
bytes.
3) Put the message type and partition: requires another memory copy of N bytes; 
the total length becomes N+5, and the previous N bytes need to be garbage 
collected.
4) Repeat steps 1 to 3 M times, generating M slices.
5) Copy the M slices into the ByteBuffer: M*(N+5) bytes need to be garbage 
collected.
6) Write the ByteBuffer to the socket.

New procedure (using the Kryo serde as an example):
1) Write the message type and partition to the SerializationBuffer.
2) Kryo serde: serialize into the SerializationBuffer; assume the length is N 
bytes.
3) Call SerializationBuffer.toSlice() to get the slice: this returns a wrapper 
instead of copying the data.
4) Repeat steps 1 to 3 M times, generating M slices.
5) Merge the M slices into one in most cases (when the slices are in the same 
block); if they span different blocks, each block is probably handled separately.
6) Copy or wrap the data into the ByteBuffer; a copy is probably needed if we 
want to use direct mode.
7) Write the ByteBuffer to the socket and reset the SerializationBuffer.

So, comparing the two procedures, the current one copies around 3*N*M bytes and 
garbage-collects 2*N*M bytes, while the new one copies only N*M bytes and leaves 
nothing to be garbage collected. The new approach therefore saves 2*N*M bytes of 
memory copying and garbage collection.
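For example, with N = 1000 bytes per tuple and M = 500 tuples per flush 
(illustrative numbers only), the current procedure copies roughly 
3*1000*500 = 1.5 MB and garbage-collects about 2*1000*500 = 1 MB per flush, while 
the new procedure copies only 1000*500 = 0.5 MB and leaves nothing to collect.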

- The memory of different blocks is not contiguous, since we want to allocate 
blocks dynamically, but the slices within one block are contiguous, and a block 
is much larger than a slice.

- The current fire-and-forget approach leaves no way to reuse the memory; that is 
why write() needs to be overridden as a workaround. The new implementation of 
write() resets the SerializationBuffer after the data has been written to the 
socket. The new approach does not need an acknowledgement either.

- Yes, with a variable-length encoding it is not possible to reserve the space 
for the length. But I doubt we have to use variable-length encoding: it saves at 
most 3 bytes per message but makes the parser harder, and it is not common for 
protocols to use variable-length fields; even IP uses fixed lengths. In the worst 
case, even if we do not use the reserve feature of SerializationBuffer, we still 
save 2*N*M bytes of memory copying and garbage collection.

> Proposal: Manage memory to avoid memory copy and garbage collection
> ---
>
> Key: APEXCORE-635
> URL: https://issues.apache.org/jira/browse/APEXCORE-635
> Project: Apache Apex Core
>  Issue Type: Wish
>Reporter: bright chen
>Assignee: bright chen
>

[jira] [Commented] (APEXCORE-635) Proposal: Manage memory to avoid memory copy and garbage collection

2017-02-09 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15860433#comment-15860433
 ] 

bright chen commented on APEXCORE-635:
--

The reasons for overriding BufferServerPublisher.write():
- The current implementation of write() assumes the buffers of the slices are not 
contiguous and uses a memory copy to merge the bytes before sending them to the 
socket. The new mechanism guarantees that consecutive slices in the same block 
are contiguous in memory, so no memory copy is needed to merge them.
- The current mechanism does not reuse the memory, so write() can simply send the 
data to the socket and leave it to be garbage collected. The new mechanism reuses 
the memory, so write() needs to send the data and then reset the buffer (outside 
of write() there is no way to know which data has already been sent to the 
socket).

The reasons this mechanism performs better than Kryo are:
- The memory can be reused instead of garbage collected after the data is sent to 
the socket, which avoids garbage collection.
- Unnecessary memory copies are avoided; basically all of the extra copies Kryo 
requires can be avoided. Kryo needs to copy the data after serialization because 
its internal buffer is needed for the next serialization. And for an LV 
(length-value) serialization format, Kryo's Output provides no mechanism to write 
the serialized length without a memory copy, since most of the time the length is 
only known after serialization (a fixed-width length field can instead be 
reserved and back-patched, as sketched after this list).
- The data sent to the socket can easily be merged within a block without an 
extra memory copy.
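A minimal sketch of the reserve-and-back-patch idea for an LV format, using a 
plain java.nio.ByteBuffer purely for illustration rather than the 
SerializationBuffer API: reserve a fixed-width length field, serialize the value, 
then write the length afterwards, so no second buffer and no extra copy are 
needed even though the length is only known at the end.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ReserveLengthSketch
{
  public static void main(String[] args)
  {
    ByteBuffer buffer = ByteBuffer.allocate(1024);

    int lengthPos = buffer.position();        // remember where the length field starts
    buffer.position(lengthPos + 4);           // reserve 4 bytes for a fixed-width length

    byte[] value = "example tuple".getBytes(StandardCharsets.UTF_8);
    buffer.put(value);                        // serialize the value directly into the buffer

    buffer.putInt(lengthPos, value.length);   // back-patch the reserved length field
  }
}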

> Proposal: Manage memory to avoid memory copy and garbage collection
> ---
>
> Key: APEXCORE-635
> URL: https://issues.apache.org/jira/browse/APEXCORE-635
> Project: Apache Apex Core
>  Issue Type: Wish
>Reporter: bright chen
>Assignee: bright chen
>

[jira] [Updated] (APEXMALHAR-2335) StateTracker can cause bucket memory leak

2017-02-09 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen updated APEXMALHAR-2335:

Description: 
The intention of StateTracker#bucketAccessed, as I understand it, is to keep the 
last access time of each bucket so that freeMemory can free the buckets that have 
not been accessed recently.
But there are some problems:

- StateTracker#bucketAccessed() is called on every access to a bucket. The 
current implementation uses a concurrent map and set to handle it (bucketHeap and 
bucketAccessTimes), which can be pretty heavy.

- StateTracker#run uses bucketHeap to find the buckets whose memory needs to be 
freed, but at the same time bucketHeap is constantly being changed by 
bucketAccessed(). This can cause concurrency issues even though bucketHeap is a 
concurrent set. Worse, the while loop may never end because bucketHeap changes at 
a very high frequency. The following two bugs are probably related to this:
https://issues.apache.org/jira/browse/APEXMALHAR-2333
https://issues.apache.org/jira/browse/APEXMALHAR-2334

Following are my thoughts on solving these issues:
- It is not necessary to keep a very accurate update time; it probably only needs 
to be updated once per streaming window. StateTracker does not support 
endWindow() (in fact endWindow() is triggered at the end of the application 
window), so we can collect the buckets updated during the period and update the 
times only at the end of the period.

- Keep two sets for bucketHeap, one for bucketAccessed() and one for run(), so 
the lock is only needed when switching them (see the sketch below).
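A minimal hypothetical sketch of that two-set idea (the class and method names 
are illustrative, not the StateTracker API): writers add bucket ids to the 
current set without taking a lock, and only the periodic switch performed by the 
eviction thread is synchronized.

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class AccessedBucketTracker
{
  private volatile Set<Long> active = ConcurrentHashMap.newKeySet();
  private Set<Long> spare = ConcurrentHashMap.newKeySet();

  // called on every bucket access (bucketAccessed() in StateTracker terms); no lock here
  public void recordAccess(long bucketId)
  {
    active.add(bucketId);
  }

  // called once per streaming window by the eviction thread (run() in StateTracker terms);
  // only this switch is synchronized, and the caller clears the returned set after scanning it
  public synchronized Set<Long> switchAndGetAccessed()
  {
    Set<Long> drained = active;
    active = spare;
    spare = drained;
    return drained;
  }
}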

  was:
The intention of StateTracker#bucketAccessed, as I understand it, is to keep the 
last access time of each bucket so that freeMemory can free the buckets that have 
not been accessed recently.
But there are some problems:

- StateTracker#bucketAccessed() is called on every access to a bucket. The 
current implementation uses a concurrent map and set to handle it (bucketHeap and 
bucketAccessTimes), which can be pretty heavy.

- StateTracker#run uses bucketHeap to find the buckets whose memory needs to be 
freed, but at the same time bucketHeap is constantly being changed by 
bucketAccessed(). This can cause concurrency issues even though bucketHeap is a 
concurrent set, and the while loop may never end. The following two bugs are 
probably related to this:
https://issues.apache.org/jira/browse/APEXMALHAR-2333
https://issues.apache.org/jira/browse/APEXMALHAR-2334

Following are my thoughts on solving these issues:
- It is not necessary to keep a very accurate update time; it probably only needs 
to be updated once per streaming window. StateTracker does not support 
endWindow() (in fact endWindow() is triggered at the end of the application 
window), so we can collect the buckets updated during the period and update the 
times only at the end of the period.

- Keep two sets for bucketHeap, one for bucketAccessed() and one for run(), so 
the lock is only needed when switching them.


> StateTracker can cause bucket memory leak
> -
>
> Key: APEXMALHAR-2335
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2335
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: bright chen
>Assignee: bright chen
>   Original Estimate: 144h
>  Remaining Estimate: 144h
>



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (APEXCORE-635) Proposal: Manage memory to avoid memory copy and garbage collection

2017-02-03 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXCORE-635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen updated APEXCORE-635:
-
Description: 
Manage memory to avoid memory copy and garbage collection

The aim of this proposal is to reuse memory in order to avoid garbage collection 
and unnecessary memory copies, and so increase performance. In this proposal the 
term serde means serialization and deserialization; it is the same as codec.

Currently, Apex by default uses DefaultStatefulStreamCodec for serde, which 
extends Kryo and optimizes it by replacing the class with a class id. Application 
developers can also optimize serialization by implementing the StreamCodec 
interface.

First, let's look into the default codec, DefaultStatefulStreamCodec. As I 
understand it, it basically optimizes serde by replacing the class name with a 
class id. The state information is sent only before the first tuple, so it is a 
kind of configuration for the serde. I therefore suggest separating this feature 
from the serde itself; the benefit is that a customized serde can still use it. 
Kryo also has some limitations, which I will describe later.
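As a rough illustration of the class-id idea, using plain Kryo registration 
rather than the DefaultStatefulStreamCodec implementation (the ids below are only 
example values): once a class is registered, serialized data carries the small 
integer id instead of the fully qualified class name.

import java.util.ArrayList;
import com.esotericsoftware.kryo.Kryo;

public class ClassIdSketch
{
  public static void main(String[] args)
  {
    Kryo kryo = new Kryo();
    // serialized tuples of these types now carry the ids 30 and 31, not the class names
    kryo.register(ArrayList.class, 30);
    kryo.register(String.class, 31);
  }
}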


Second, let's look at customized serdes. Standing in the application developer's 
shoes, let's look at how to implement StreamCodec, taking a simple List tuple as 
the example.

The first solution is to use Kryo directly. This is basically the same as the 
Apex default codec.

The second solution is to implement StreamCodec for String and for List, with 
ListSerde delegating Strings to StringSerde. The benefit of this solution is that 
StringSerde and ListSerde can be reused. The problem is that it needs a lot of 
temporary memory and memory copying. Below is a sample implementation.
class StringSerde {
  Slice toByteArray(String o) {
    byte[] b = o.getBytes(StandardCharsets.UTF_8);  // new bytes
    byte[] b1 = new byte[b.length + 4];             // new bytes
    // set the length of the string in the first 4 bytes
    b1[0] = (byte)(b.length >>> 24);
    b1[1] = (byte)(b.length >>> 16);
    b1[2] = (byte)(b.length >>> 8);
    b1[3] = (byte)b.length;
    System.arraycopy(b, 0, b1, 4, b.length);        // copy the string bytes
    return new Slice(b1);
  }
}

class ListSerde<T> {
  StreamCodec<T> itemSerde;  // the serde used to serialize/deserialize each item

  Slice toByteArray(List<T> list) {
    Slice[] itemSlices = new Slice[list.size()];
    int size = 0;
    int index = 0;
    for (T item : list) {
      Slice slice = itemSerde.toByteArray(item);
      size += slice.length;
      itemSlices[index++] = slice;
    }
    byte[] b = new byte[size + 4];   // allocate the memory
    // set the size of the list in the first 4 bytes
    b[0] = (byte)(list.size() >>> 24);
    b[1] = (byte)(list.size() >>> 16);
    b[2] = (byte)(list.size() >>> 8);
    b[3] = (byte)list.size();
    // copy the data from itemSlices
    int offset = 4;
    for (Slice slice : itemSlices) {
      System.arraycopy(slice.buffer, slice.offset, b, offset, slice.length);
      offset += slice.length;
    }
    return new Slice(b);
  }
}
  
From the above code we can see that roughly twice the required memory is 
allocated and the data is copied twice (one copy may be specific to String, but 
the other copy is mandatory). And when the bytes are written to the socket, none 
of the allocated memory can be reused; it all has to be garbage collected.

The tuple above has only two levels; if a tuple has n levels, n times the 
required memory is allocated and n-1 data copies are required.

The third solution could be to allocate the memory up front and then pass the 
memory and an offset to the item serde. There are some problems with this 
solution:
How does the caller pass the memory in? Our previous interface only passes the 
object and has no way to pass memory, so how the memory is passed depends on the 
implementation.
Another big problem is that it is hard to allocate the proper amount of memory up 
front (for this particular case it could probably allocate 2 times the total 
string length). Any memory allocated beyond what is required is wasted until the 
data is sent to the socket (or exact memory is allocated and the data copied, to 
avoid wasting memory). The code also needs to handle the case where the memory is 
not enough.
The fourth solution could be to treat the whole object as flat, allocate the 
memory, and handle it directly, for example as follows. This solution solves the 
problem of passing memory, but it keeps the other problems of the third solution 
and introduces some new ones:

The code cannot be reused: we already have the StringSerde, but ListSerde has to 
implement almost the same logic again.
The serializeItemToMemory() method has to be implemented separately for each item 
type.
class ListSerde<T> {
  Slice toByteArray(List<T> list) {
    byte[] b = new byte[…];  // hard to estimate the proper size
    int size = 0;
    for (T item : list) {
      int length = serializeItemToMemory(item, b, size);  // must be reimplemented per item type
      size += length;
    }
    // allocate new memory and copy the data if we don't want to waste memory
    return new Slice(b, 0, size);
  }
}
So, from the analysis of these solutions, it is not easy to implement a good, 
reusable customized serde.



Third, let's look at the Kryo serde. Kryo provides Output, so the serde for each 
field writes to the same Output. This approach solves the memory problem, but 
Output has some problems too.

Output, as a stream, can only be written continuously, which is a problem. For 
example, when serializing a String in LV format, we don't know what the length 
will be before serializing it.
Output doesn't have a cache, which 

[jira] [Assigned] (APEXCORE-635) Proposal: Manage memory to avoid memory copy and garbage collection

2017-02-02 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXCORE-635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen reassigned APEXCORE-635:


Assignee: bright chen

> Proposal: Manage memory to avoid memory copy and garbage collection
> ---
>
> Key: APEXCORE-635
> URL: https://issues.apache.org/jira/browse/APEXCORE-635
> Project: Apache Apex Core
>  Issue Type: Wish
>Reporter: bright chen
>Assignee: bright chen
>

[jira] [Created] (APEXCORE-635) Proposal: Manage memory to avoid memory copy and garbage collection

2017-02-02 Thread bright chen (JIRA)
bright chen created APEXCORE-635:


 Summary: Proposal: Manage memory to avoid memory copy and garbage 
collection
 Key: APEXCORE-635
 URL: https://issues.apache.org/jira/browse/APEXCORE-635
 Project: Apache Apex Core
  Issue Type: Wish
Reporter: bright chen


Manage memory to avoid memory copy and garbage collection

The aim of this proposal is to reuse the memory to avoid the garbage collection 
and avoid unnecessary memory copy to increase the performance. In this proposal 
the term serde means serialization and deserialization. It’s same as codec.

Currently, apex by default use DefaultStatefulStreamCodec for serde, which 
extends Kryo and optimize it by replace class by class id. And application 
developer can optimize serializer by implement interface StreamCodec. 

First, let’s look into the default codec DefaultStatefulStreamCodec. It 
basically optimize serde by replace class name by class id as my understanding. 
And the state information only send before sending first tuple, it’s kind like 
configuration for serde. So I suggest to separate this feature from serde. The 
benefit is the customized serde can still use this feature. And the kryo have 
some limitation which I’ll state later.


Second, Let’s look at the customized serde. Let’s stand from application 
developer point of view and look at how to implement StreamCodec. I take a 
simple tuple List as example.

The first solution is use kryo. This is basically same as apex default codec.

The second solution is implement StreamCodec for String and List, and ListSerde 
delegate String to StringSerde. The benefit of this solution is the StringSerde 
ListSerde can be reused. The problem is there need a lot of temporary memory 
and memory copy. Following is the sample implement.
Class StringCodec {
  Slice toByteArray(String o) {
byte[] b = o.getBytes(“UTF8”);  // new bytes
byte[] b1 = new byte[b1.length + 4];  // new bytes
set the length of the string at the first 4 bytes
System.arrayCopy(b, 0, b1, 4, b.length);   //copy bytes
return new Slice(b1);
  }

class ListSerde {
  StreamCodec itemSerde;  //the serde for serialize/deserialize item

  Slice toByteArray(List list) {
Slice[] itemSlices = new Slice[list.size()];
int size = 0;
int index = 0;
for(T item : list) {
  Slice slice = itemSerde.toByteArray(item);
  size += slice.length;
  itemSlices[index++] = slice;
}
byte[] b = new byte[size+4];   //allocated the memory
set the length of the list at the first 4 bytes
copy the data from itemSlices
return new Slice(b);
  }
}
  
from above code, we can see that around 2 times of required memory were 
allocated and data copied twice( one copy maybe special to string, but another 
copy is mandatory). And when bytes written to the socket, all allocated memory 
can’t be reused but need to be garbage collected.

The above tuple only have two levels, if the tuple have n level, n times of 
required memory would be allocated and n-1 time of data copy is required.

The third solution could be allocate memory and then pass the memory and offset 
to item serde. There are some problems for this solution:
How to pass the memory from caller? As our previous interface only pass the 
object but no way to pass memory. So the pass of memory will depends on 
implementation.
Another big problem of this solution is it hard to reallocate proper memory(For 
this special case, it probably can allocate 2 times of all string length. ). 
And the memory allocated more than required would be wasted until data send to 
the socket(or allocate exact memory and copy the data to avoid waste memory). 
And the code also need to handle the case if memory is not enough. 
The fourth solution could be to treat the whole object as flat, allocate the 
memory, and handle it directly, for example as in the code below. This solution 
solves the problem of passing memory, but it keeps the other problems of the 
third solution and introduces some new ones:

- The code can't be reused: we already have StringSerde, but ListSerde has to 
implement almost the same logic again.
- The serializeItemToMemory() method has to be implemented separately for each 
item type.
class ListSerde<T> {
  Slice toByteArray(List<T> list) {
    byte[] b = new byte[...];  // hard to estimate the proper size up front
    int size = 0;
    for (T item : list) {
      int length = serializeItemToMemory(item, b, size);
      size += length;
    }
    // allocate new memory and copy the data if we don't want to keep the wasted memory
  }
}
So, from the analysis of these solutions, it is not easy to implement a good, 
reusable, customized serde.



Third, let's look at the Kryo serde. Kryo provides Output, so each field's serde 
writes to the same Output (a sketch of this style is shown below). This approach 
solves the memory problem, but the Output has some problems too.
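
For illustration, a minimal sketch of the shared-Output style using the real 
Kryo Output API (the StringSerde/ListSerde split here is my own, not Apex code): 
every nested serde appends to the same Output, so no per-field byte arrays are created.

import com.esotericsoftware.kryo.io.Output;
import java.util.List;

// Sketch: both serdes write into the same Output, so no intermediate byte[]
// is allocated per field or per item.
class KryoStyleStringSerde
{
  void write(Output output, String value)
  {
    output.writeString(value);   // Kryo writes length + UTF-8 bytes in place
  }
}

class KryoStyleListSerde
{
  private final KryoStyleStringSerde itemSerde = new KryoStyleStringSerde();

  void write(Output output, List<String> list)
  {
    output.writeInt(list.size());
    for (String item : list) {
      itemSerde.write(output, item);
    }
  }
}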

The Output, as a stream, can only write continuously. But it would be 

[jira] [Created] (APEXMALHAR-2395) create MultiAccumulation

2017-01-26 Thread bright chen (JIRA)
bright chen created APEXMALHAR-2395:
---

 Summary: create MultiAccumulation
 Key: APEXMALHAR-2395
 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2395
 Project: Apache Apex Malhar
  Issue Type: Task
Reporter: bright chen
Assignee: bright chen


Create MultiAccumulation, which supports SUM, MAX, MIN and COUNT. AVG can be 
computed from SUM and COUNT at query time. A minimal sketch is shown below.
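
A minimal sketch of what such an accumulation state could look like (a 
hypothetical class, not the final Malhar API): it keeps SUM, MAX, MIN and COUNT 
while accumulating, and AVG is derived from SUM and COUNT only when queried.

// Hypothetical accumulation state keeping SUM, MAX, MIN and COUNT;
// AVG is derived at query time instead of being stored.
class MultiAccumulationState
{
  long sum = 0;
  long count = 0;
  long max = Long.MIN_VALUE;
  long min = Long.MAX_VALUE;

  void accumulate(long value)
  {
    sum += value;
    count++;
    max = Math.max(max, value);
    min = Math.min(min, value);
  }

  double average()
  {
    return count == 0 ? 0.0 : (double) sum / count;   // AVG = SUM / COUNT
  }
}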



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (APEXCORE-606) Suggest: Optimise Kryo Output

2017-01-09 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXCORE-606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen updated APEXCORE-606:
-
Summary: Suggest: Optimise Kryo Output  (was: Optimise Kryo Output)

> Suggest: Optimise Kryo Output
> -
>
> Key: APEXCORE-606
> URL: https://issues.apache.org/jira/browse/APEXCORE-606
> Project: Apache Apex Core
>  Issue Type: New Feature
>Reporter: bright chen
>Assignee: bright chen
>
> The kryo Output has some limitation
>   - The size of the data is limited. kryo write data to the buffer, it will 
> throw the overflow exception if the data exceed the size
>   - The Output.toBytes() will copy the data to temporary buffer and output, 
> it will  decrease the performance and introduce garbage collection.
> When I was tuning Spillable Data structure and Manage State. I create a 
> mechanism to share and reuse the memory to avoid above problem.  And it can 
> be reused in core serialization with small change. Please see jira: 
> https://issues.apache.org/jira/browse/APEXMALHAR-2190



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (APEXCORE-606) Suggestion: Optimise Kryo Output

2017-01-09 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXCORE-606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen updated APEXCORE-606:
-
Summary: Suggestion: Optimise Kryo Output  (was: Suggest: Optimise Kryo 
Output)

> Suggestion: Optimise Kryo Output
> 
>
> Key: APEXCORE-606
> URL: https://issues.apache.org/jira/browse/APEXCORE-606
> Project: Apache Apex Core
>  Issue Type: New Feature
>Reporter: bright chen
>Assignee: bright chen
>
> The kryo Output has some limitation
>   - The size of the data is limited. kryo write data to the buffer, it will 
> throw the overflow exception if the data exceed the size
>   - The Output.toBytes() will copy the data to temporary buffer and output, 
> it will  decrease the performance and introduce garbage collection.
> When I was tuning Spillable Data structure and Manage State. I create a 
> mechanism to share and reuse the memory to avoid above problem.  And it can 
> be reused in core serialization with small change. Please see jira: 
> https://issues.apache.org/jira/browse/APEXMALHAR-2190



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (APEXCORE-606) Optimise Kryo Output

2017-01-09 Thread bright chen (JIRA)
bright chen created APEXCORE-606:


 Summary: Optimise Kryo Output
 Key: APEXCORE-606
 URL: https://issues.apache.org/jira/browse/APEXCORE-606
 Project: Apache Apex Core
  Issue Type: New Feature
Reporter: bright chen
Assignee: bright chen


The Kryo Output has some limitations:
  - The size of the data is limited. Kryo writes data to a buffer and will 
throw an overflow exception if the data exceeds the buffer size.
  - Output.toBytes() copies the data to a temporary buffer and returns it, 
which decreases performance and introduces garbage collection.

When I was tuning the Spillable data structures and Managed State, I created a 
mechanism to share and reuse the memory to avoid the above problems. It can be 
reused in core serialization with a small change. Please see jira: 
https://issues.apache.org/jira/browse/APEXMALHAR-2190
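
For illustration only, a small sketch of the two limitations using the standard 
Kryo API (buffer size and data are made up): a fixed-capacity Output throws when 
the data outgrows the buffer, and toBytes() always returns a fresh copy of the written bytes.

import com.esotericsoftware.kryo.io.Output;

public class OutputLimitationsSketch
{
  public static void main(String[] args)
  {
    // Fixed-capacity Output: once the 16-byte buffer is full, further writes
    // throw a KryoException ("Buffer overflow").
    Output output = new Output(16, 16);
    output.writeString("short");            // fits
    // output.writeString("a much longer string ...");  // would overflow

    // toBytes() allocates a new byte[] and copies the written bytes into it,
    // so every call produces garbage even if the buffer could be reused.
    byte[] copy = output.toBytes();
    System.out.println(copy.length + " bytes copied");
  }
}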





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2366) Apply BloomFilter to Bucket

2016-12-08 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15732915#comment-15732915
 ] 

bright chen commented on APEXMALHAR-2366:
-

The Hadoop BloomFilter has a serialization problem. It also introduces memory 
copies and garbage collection because it has to create a Key object and copy the byte array.

I am going to implement a SliceBloomFilter for the bucket case to avoid these problems.

> Apply BloomFilter to Bucket
> ---
>
> Key: APEXMALHAR-2366
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2366
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: bright chen
>Assignee: bright chen
>   Original Estimate: 192h
>  Remaining Estimate: 192h
>
> The bucket get() will check the cache and then check from the stored files if 
> the entry is not in the cache. The checking from files is a pretty heavy 
> operation due to file seek.
> The chance of check from file is very high if the key range are large.
> Suggest to apply BloomFilter for bucket to reduce the chance read from file.
> If the buckets were managed by ManagedStateImpl, the entry of bucket would be 
> very huge and the BloomFilter maybe not useful after a while. But If the 
> buckets were managed by ManagedTimeUnifiedStateImpl, each bucket keep certain 
> amount of entry and BloomFilter would be very useful.
> For implementation:
> The Guava already have BloomFilter and the interface are pretty simple and 
> fit for our case. But Guava 11 is not compatible with Guava 14 (Guava 11 use 
> Sink while Guava 14 use PrimitiveSink).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (APEXMALHAR-2362) SpillableSetMulitmapImpl.removedSets keeps growing

2016-12-06 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen resolved APEXMALHAR-2362.
-
Resolution: Fixed

> SpillableSetMulitmapImpl.removedSets keeps growing
> --
>
> Key: APEXMALHAR-2362
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2362
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: David Yan
>Assignee: David Yan
>
> That list is only added to but not removed and it will grow over time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2366) Apply BloomFilter to Bucket

2016-12-05 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723125#comment-15723125
 ] 

bright chen commented on APEXMALHAR-2366:
-

The BloomFilter will create bits in memory, and this memory should be released 
when the Bucket is purged.

> Apply BloomFilter to Bucket
> ---
>
> Key: APEXMALHAR-2366
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2366
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: bright chen
>Assignee: bright chen
>   Original Estimate: 192h
>  Remaining Estimate: 192h
>
> The bucket get() will check the cache and then check from the stored files if 
> the entry is not in the cache. The checking from files is a pretty heavy 
> operation due to file seek.
> The chance of check from file is very high if the key range are large.
> Suggest to apply BloomFilter for bucket to reduce the chance read from file.
> If the buckets were managed by ManagedStateImpl, the entry of bucket would be 
> very huge and the BloomFilter maybe not useful after a while. But If the 
> buckets were managed by ManagedTimeUnifiedStateImpl, each bucket keep certain 
> amount of entry and BloomFilter would be very useful.
> For implementation:
> The Guava already have BloomFilter and the interface are pretty simple and 
> fit for our case. But Guava 11 is not compatible with Guava 14 (Guava 11 use 
> Sink while Guava 14 use PrimitiveSink).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2366) Apply BloomFilter to Bucket

2016-12-05 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723065#comment-15723065
 ] 

bright chen commented on APEXMALHAR-2366:
-

It would be better to use the Guava BloomFilter as it is type safe and can 
support different types of keys. But due to the compatibility problem mentioned 
in the previous comment, I'll integrate the Hadoop BloomFilter, in which the type of the key is 'Key'.

> Apply BloomFilter to Bucket
> ---
>
> Key: APEXMALHAR-2366
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2366
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: bright chen
>Assignee: bright chen
>   Original Estimate: 192h
>  Remaining Estimate: 192h
>
> The bucket get() will check the cache and then check from the stored files if 
> the entry is not in the cache. The checking from files is a pretty heavy 
> operation due to file seek.
> The chance of check from file is very high if the key range are large.
> Suggest to apply BloomFilter for bucket to reduce the chance read from file.
> If the buckets were managed by ManagedStateImpl, the entry of bucket would be 
> very huge and the BloomFilter maybe not useful after a while. But If the 
> buckets were managed by ManagedTimeUnifiedStateImpl, each bucket keep certain 
> amount of entry and BloomFilter would be very useful.
> For implementation:
> The Guava already have BloomFilter and the interface are pretty simple and 
> fit for our case. But Guava 11 is not compatible with Guava 14 (Guava 11 use 
> Sink while Guava 14 use PrimitiveSink).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2339) Windowed Operator benchmarking

2016-12-02 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15715934#comment-15715934
 ] 

bright chen commented on APEXMALHAR-2339:
-

Suggest applying a BloomFilter to the bucket to reduce the chance of seeks in files.
See https://issues.apache.org/jira/browse/APEXMALHAR-2366

> Windowed Operator benchmarking
> --
>
> Key: APEXMALHAR-2339
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2339
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: bright chen
>Assignee: bright chen
> Attachments: Screen Shot 2016-11-21 at 10.34.38 AM.png
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (APEXMALHAR-2366) Apply BloomFilter to Bucket

2016-12-02 Thread bright chen (JIRA)
bright chen created APEXMALHAR-2366:
---

 Summary: Apply BloomFilter to Bucket
 Key: APEXMALHAR-2366
 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2366
 Project: Apache Apex Malhar
  Issue Type: Improvement
Reporter: bright chen
Assignee: bright chen


The bucket get() will check the cache and then check the stored files if the 
entry is not in the cache. Checking the files is a pretty heavy operation due to 
the file seek.

The chance of checking the files is very high if the key range is large.

Suggest applying a BloomFilter to each bucket to reduce the chance of reading 
from the files (a sketch is shown below).

If the buckets are managed by ManagedStateImpl, the number of entries per bucket 
can be very large and the BloomFilter may not be useful after a while. But if 
the buckets are managed by ManagedTimeUnifiedStateImpl, each bucket keeps a 
certain amount of entries and the BloomFilter would be very useful.

For implementation:
Guava already has a BloomFilter and its interface is pretty simple and fits our 
case. But Guava 11 is not compatible with Guava 14 (Guava 11 uses Sink while 
Guava 14 uses PrimitiveSink).
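
A minimal sketch of the intended use with the Guava BloomFilter API (the bucket 
integration, sizes and helper methods are simplified and hypothetical): the 
filter is consulted before the expensive file read, and keys are added whenever they are written.

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;

// Sketch: consult a per-bucket BloomFilter before the expensive file read.
class BucketReadSketch
{
  // ~1M expected keys, 1% false positive rate (numbers are illustrative).
  private final BloomFilter<byte[]> filter =
      BloomFilter.create(Funnels.byteArrayFunnel(), 1_000_000, 0.01);

  void onPut(byte[] key)
  {
    filter.put(key);                 // remember every key written to this bucket
  }

  byte[] get(byte[] key)
  {
    byte[] fromCache = readFromCache(key);
    if (fromCache != null) {
      return fromCache;
    }
    if (!filter.mightContain(key)) {
      return null;                   // definitely not in the files: skip the seek
    }
    return readFromFiles(key);       // possible false positive: still must read
  }

  private byte[] readFromCache(byte[] key) { return null; }  // placeholder
  private byte[] readFromFiles(byte[] key) { return null; }  // placeholder
}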



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2361) Optimise SpillableWindowedKeyedStorage remove(Window) to improve the performance

2016-11-30 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15710303#comment-15710303
 ] 

bright chen commented on APEXMALHAR-2361:
-

Probably we can handle DISCARDING by updating the value to the default value 
instead of clearing the window. Then remove(Window) would only be used for windows after the allowed lateness.

> Optimise SpillableWindowedKeyedStorage remove(Window) to improve the 
> performance
> 
>
> Key: APEXMALHAR-2361
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2361
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: bright chen
>Assignee: bright chen
>   Original Estimate: 120h
>  Remaining Estimate: 120h
>
> Currently, SpillableWindowedKeyedStorage remove(Window) will go through each 
> key and mark all of them as deleted. It would be expensive when there are 
> lots of keys and especially these entry already spill out of memory (this the 
> common case when remove() was called).
> Suggest to mark whole window as deleted. When the window was marked as 
> deleted, it will not allowed to add/update any entry of this window ( this 
> should match the requirement as remove(Window) only be called after allowed 
> lateness



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2339) Windowed Operator benchmarking

2016-11-30 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15710031#comment-15710031
 ] 

bright chen commented on APEXMALHAR-2339:
-

SpillableWindowedKeyedStorage.remove(Window) will go through each entry of this 
window and mark as deleted. It would be expensive. Suggest to optimize it. 
See https://issues.apache.org/jira/browse/APEXMALHAR-2361

> Windowed Operator benchmarking
> --
>
> Key: APEXMALHAR-2339
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2339
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: bright chen
>Assignee: bright chen
> Attachments: Screen Shot 2016-11-21 at 10.34.38 AM.png
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (APEXMALHAR-2361) Optimise SpillableWindowedKeyedStorage remove(Window) to improve the performance

2016-11-30 Thread bright chen (JIRA)
bright chen created APEXMALHAR-2361:
---

 Summary: Optimise SpillableWindowedKeyedStorage remove(Window) to 
improve the performance
 Key: APEXMALHAR-2361
 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2361
 Project: Apache Apex Malhar
  Issue Type: Improvement
Reporter: bright chen
Assignee: bright chen


Currently, SpillableWindowedKeyedStorage remove(Window) will go through every 
key and mark all of them as deleted. That is expensive when there are lots of 
keys, especially when those entries have already spilled out of memory (the 
common case when remove() is called).

Suggest marking the whole window as deleted (a sketch is shown below). When the 
window is marked as deleted, it is no longer allowed to add/update any entry of 
this window (this should match the requirement, as remove(Window) is only called 
after the allowed lateness).
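
A minimal sketch of the suggested bookkeeping (hypothetical, not the actual 
SpillableWindowedKeyedStorage code): remove(Window) only records the window as 
deleted, and writes against a deleted window are rejected, so no per-key pass is needed.

import java.util.HashSet;
import java.util.Set;

// Sketch: instead of touching every key, remove(Window) just records the
// window; put/get consult the set first.
class WindowDeletionSketch<W, K, V>
{
  private final Set<W> deletedWindows = new HashSet<>();

  void remove(W window)
  {
    deletedWindows.add(window);      // O(1), no per-key iteration
  }

  void put(W window, K key, V value)
  {
    if (deletedWindows.contains(window)) {
      throw new IllegalStateException("window was removed after allowed lateness");
    }
    // ... delegate to the underlying storage ...
  }
}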



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (APEXMALHAR-2339) Windowed Operator benchmarking

2016-11-29 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15706354#comment-15706354
 ] 

bright chen edited comment on APEXMALHAR-2339 at 11/29/16 8:01 PM:
---

KeyedWindowedOperatorImpl.fireNormalTrigger(Window, boolean) currently go 
through each window and key to check value. The data collection could be very 
huge as the discard period could be relative long time. If fireOnlyUpdatedPanes 
is false probably there don't have much space to improve. But if 
fireOnlyUpdatedPanes is true, we don't have to go through the whole data 
collection. We only need to go through the window and key which handle after 
last trigger.

See https://issues.apache.org/jira/browse/APEXMALHAR-2359


was (Author: brightchen):
KeyedWindowedOperatorImpl.fireNormalTrigger(Window, boolean) currently go 
through each window and key to check value. The data collection could be very 
huge as the discard period could be relative long time. If fireOnlyUpdatedPanes 
is false probably there don't have much space to improve. But if 
fireOnlyUpdatedPanes is true, we don't have to go through the whole data 
collection. We only need to go through the window and key which handle after 
last trigger.

> Windowed Operator benchmarking
> --
>
> Key: APEXMALHAR-2339
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2339
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: bright chen
>Assignee: bright chen
> Attachments: Screen Shot 2016-11-21 at 10.34.38 AM.png
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (APEXMALHAR-2359) Optimise fire trigger to avoid go through all data

2016-11-29 Thread bright chen (JIRA)
bright chen created APEXMALHAR-2359:
---

 Summary: Optimise fire trigger to avoid go through all data
 Key: APEXMALHAR-2359
 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2359
 Project: Apache Apex Malhar
  Issue Type: Improvement
Reporter: bright chen
Assignee: bright chen


KeyedWindowedOperatorImpl.fireNormalTrigger(Window, boolean) currently goes 
through every window and key to check the value. The data collection could be 
very large, as the discard period could be relatively long. If 
fireOnlyUpdatedPanes is false there is probably not much room to improve, but if 
fireOnlyUpdatedPanes is true we don't have to go through the whole data 
collection; we only need to go through the windows and keys that were touched 
after the last trigger (a sketch of the bookkeeping is shown below).
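
A minimal sketch of the bookkeeping the suggestion implies (hypothetical names, 
not the actual operator code): record the (window, key) panes touched since the 
last trigger, and when fireOnlyUpdatedPanes is true iterate only over that set.

import java.util.HashSet;
import java.util.Set;

// Sketch: remember which (window, key) panes changed since the last trigger.
class UpdatedPaneTracker<W, K>
{
  static final class Pane<W, K>
  {
    final W window;
    final K key;

    Pane(W window, K key) { this.window = window; this.key = key; }

    @Override
    public boolean equals(Object o)
    {
      if (!(o instanceof Pane)) {
        return false;
      }
      Pane<?, ?> other = (Pane<?, ?>) o;
      return window.equals(other.window) && key.equals(other.key);
    }

    @Override
    public int hashCode() { return 31 * window.hashCode() + key.hashCode(); }
  }

  private Set<Pane<W, K>> updatedSinceLastTrigger = new HashSet<>();

  void onAccumulate(W window, K key)
  {
    updatedSinceLastTrigger.add(new Pane<>(window, key));   // called on every accumulation
  }

  // Called by the trigger: returns only the panes touched since the last
  // trigger and starts a fresh set for the next period.
  Set<Pane<W, K>> drain()
  {
    Set<Pane<W, K>> result = updatedSinceLastTrigger;
    updatedSinceLastTrigger = new HashSet<>();
    return result;
  }
}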



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2339) Windowed Operator benchmarking

2016-11-29 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15706354#comment-15706354
 ] 

bright chen commented on APEXMALHAR-2339:
-

KeyedWindowedOperatorImpl.fireNormalTrigger(Window, boolean) currently go 
through each window and key to check value. The data collection could be very 
huge as the discard period could be relative long time. If fireOnlyUpdatedPanes 
is false probably there don't have much space to improve. But if 
fireOnlyUpdatedPanes is true, we don't have to go through the whole data 
collection. We only need to go through the window and key which handle after 
last trigger.

> Windowed Operator benchmarking
> --
>
> Key: APEXMALHAR-2339
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2339
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: bright chen
>Assignee: bright chen
> Attachments: Screen Shot 2016-11-21 at 10.34.38 AM.png
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (APEXMALHAR-2358) Optimise GenericSerde to use specific serde to improve the performance

2016-11-28 Thread bright chen (JIRA)
bright chen created APEXMALHAR-2358:
---

 Summary: Optimise GenericSerde to use specific serde to improve 
the performance
 Key: APEXMALHAR-2358
 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2358
 Project: Apache Apex Malhar
  Issue Type: Improvement
Reporter: bright chen
Assignee: bright chen






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2352) Improve performance of keyed windowed operators

2016-11-23 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15691426#comment-15691426
 ] 

bright chen commented on APEXMALHAR-2352:
-

I am not very sure about the reason.
Maybe in equals(), ImmutablePair uses instanceof, which has to check the class 
hierarchy, while com.datatorrent.common.util.Pair just uses getClass() (an 
illustrative sketch of the two styles is shown below).
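
Purely as an illustration of the two equals() styles being compared (this is not 
the actual source of either Pair class): an instanceof-based equals accepts 
subclasses and performs a type-hierarchy check, while a getClass()-based equals 
is a single reference comparison of the two Class objects.

// Illustrative only: two common ways to write equals(); hashCode omitted for brevity.
class InstanceofPair
{
  final Object first;
  final Object second;

  InstanceofPair(Object first, Object second) { this.first = first; this.second = second; }

  @Override
  public boolean equals(Object o)
  {
    if (!(o instanceof InstanceofPair)) {   // accepts subclasses, walks the type check
      return false;
    }
    InstanceofPair other = (InstanceofPair) o;
    return first.equals(other.first) && second.equals(other.second);
  }
}

class GetClassPair
{
  final Object first;
  final Object second;

  GetClassPair(Object first, Object second) { this.first = first; this.second = second; }

  @Override
  public boolean equals(Object o)
  {
    if (o == null || getClass() != o.getClass()) {   // one reference comparison
      return false;
    }
    GetClassPair other = (GetClassPair) o;
    return first.equals(other.first) && second.equals(other.second);
  }
}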

> Improve performance of keyed windowed operators
> ---
>
> Key: APEXMALHAR-2352
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2352
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: bright chen
>Assignee: bright chen
>
> refer to https://issues.apache.org/jira/browse/APEXMALHAR-2339 to the keyed 
> windowed operator benchmark



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2352) Improve performance of keyed windowed operators

2016-11-22 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15688259#comment-15688259
 ] 

bright chen commented on APEXMALHAR-2352:
-

This is an issue that causes a memory leak: 
https://issues.apache.org/jira/browse/APEXMALHAR-2350

> Improve performance of keyed windowed operators
> ---
>
> Key: APEXMALHAR-2352
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2352
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: bright chen
>Assignee: bright chen
>
> refer to https://issues.apache.org/jira/browse/APEXMALHAR-2339 to the keyed 
> windowed operator benchmark



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (APEXMALHAR-2352) Improve performance of keyed windowed operators

2016-11-22 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen updated APEXMALHAR-2352:

Description: 
refer to https://issues.apache.org/jira/browse/APEXMALHAR-2339 to the keyed 
windowed operator benchmark


> Improve performance of keyed windowed operators
> ---
>
> Key: APEXMALHAR-2352
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2352
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: bright chen
>Assignee: bright chen
>
> refer to https://issues.apache.org/jira/browse/APEXMALHAR-2339 to the keyed 
> windowed operator benchmark



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (APEXMALHAR-2352) Improve performance of keyed windowed operators

2016-11-22 Thread bright chen (JIRA)
bright chen created APEXMALHAR-2352:
---

 Summary: Improve performance of keyed windowed operators
 Key: APEXMALHAR-2352
 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2352
 Project: Apache Apex Malhar
  Issue Type: Task
Reporter: bright chen
Assignee: bright chen






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2339) Windowed Operator benchmarking

2016-11-22 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15687670#comment-15687670
 ] 

bright chen commented on APEXMALHAR-2339:
-

The performance of the KeyedWindowed operator decreases rapidly as the key size 
(number of keys) increases:
  - key size is 10K: rate is around 50K/second
  - key size is 100K: rate would be less than 1k/second

> Windowed Operator benchmarking
> --
>
> Key: APEXMALHAR-2339
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2339
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: bright chen
>Assignee: bright chen
> Attachments: Screen Shot 2016-11-21 at 10.34.38 AM.png
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (APEXMALHAR-2350) The key and value stream should match with the bucket

2016-11-21 Thread bright chen (JIRA)
bright chen created APEXMALHAR-2350:
---

 Summary: The key and value stream should match with the bucket
 Key: APEXMALHAR-2350
 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2350
 Project: Apache Apex Malhar
  Issue Type: Bug
Reporter: bright chen
Assignee: bright chen


In SpillableMapImpl, the bucket that the data is put into keeps changing instead 
of being a fixed bucket. So the key stream and value stream should match the bucket.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2339) Windowed Operator benchmarking

2016-11-21 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15684495#comment-15684495
 ] 

bright chen commented on APEXMALHAR-2339:
-

The benchmark of the Windowed Operator seems fine.
I ran it for more than 30 minutes; the average rate is around 40k/second.
Following is the code of the input tuple: new 
Tuple.TimestampedTuple(System.currentTimeMillis() - 
random.nextInt(12), (long)random.nextInt(10));


> Windowed Operator benchmarking
> --
>
> Key: APEXMALHAR-2339
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2339
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: bright chen
>Assignee: bright chen
> Attachments: Screen Shot 2016-11-21 at 10.34.38 AM.png
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Issue Comment Deleted] (APEXMALHAR-2339) Windowed Operator benchmarking

2016-11-21 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen updated APEXMALHAR-2339:

Comment: was deleted

(was: KeyedWindowedOperator have memory issue. And the rate started around 
40K/second, end with around 2K/second.

See attached screen shot
)

> Windowed Operator benchmarking
> --
>
> Key: APEXMALHAR-2339
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2339
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: bright chen
>Assignee: bright chen
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (APEXMALHAR-2339) Windowed Operator benchmarking

2016-11-21 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen updated APEXMALHAR-2339:

Attachment: (was: Screen Shot 2016-11-18 at 3.49.21 PM.png)

> Windowed Operator benchmarking
> --
>
> Key: APEXMALHAR-2339
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2339
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: bright chen
>Assignee: bright chen
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (APEXMALHAR-2339) Windowed Operator benchmarking

2016-11-21 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen updated APEXMALHAR-2339:

Attachment: (was: Screen Shot 2016-11-18 at 3.49.40 PM.png)

> Windowed Operator benchmarking
> --
>
> Key: APEXMALHAR-2339
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2339
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: bright chen
>Assignee: bright chen
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2339) Windowed Operator benchmarking

2016-11-18 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15678164#comment-15678164
 ] 

bright chen commented on APEXMALHAR-2339:
-

KeyedWindowedOperator has a memory issue. The rate started around 40K/second and 
ended around 2K/second.

See the attached screenshot


> Windowed Operator benchmarking
> --
>
> Key: APEXMALHAR-2339
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2339
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: bright chen
>Assignee: bright chen
> Attachments: Screen Shot 2016-11-18 at 3.49.21 PM.png, Screen Shot 
> 2016-11-18 at 3.49.40 PM.png
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (APEXMALHAR-2339) Windowed Operator benchmarking

2016-11-18 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen updated APEXMALHAR-2339:

Attachment: Screen Shot 2016-11-18 at 3.49.21 PM.png
Screen Shot 2016-11-18 at 3.49.40 PM.png

screen shot for memory and cpu

> Windowed Operator benchmarking
> --
>
> Key: APEXMALHAR-2339
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2339
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: bright chen
>Assignee: bright chen
> Attachments: Screen Shot 2016-11-18 at 3.49.21 PM.png, Screen Shot 
> 2016-11-18 at 3.49.40 PM.png
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2339) Windowed Operator benchmarking

2016-11-18 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15678055#comment-15678055
 ] 

bright chen commented on APEXMALHAR-2339:
-

When using the default AccumulationMode DISCARDING, there is a big impact on 
performance due to AbstractWindowedOperator.clearWindowData(Window).

> Windowed Operator benchmarking
> --
>
> Key: APEXMALHAR-2339
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2339
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: bright chen
>Assignee: bright chen
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (APEXMALHAR-2339) Windowed Operator benchmarking

2016-11-18 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15677433#comment-15677433
 ] 

bright chen edited comment on APEXMALHAR-2339 at 11/18/16 6:52 PM:
---

Test result for KeyedWindowedOperatorBenchmarkApp:
  - number of keys is 1k: rate is around 200k per second
  - number of keys is 10k: rate is around 7k per second
Here the key is the String form of a number. Here is the code to emit the tuple:
data.emit(new Tuple.TimestampedTuple<>(System.currentTimeMillis() - 
random.nextInt(12), new KeyValPair("" + 
random.nextInt(1), (long)random.nextInt(100;


was (Author: brightchen):
test result for KeyedWindowedOperatorBenchmarkApp
  - number of key is 1k: rate is around 200k
  - number of key is 10k: rate is around 7k

> Windowed Operator benchmarking
> --
>
> Key: APEXMALHAR-2339
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2339
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: bright chen
>Assignee: bright chen
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2339) Windowed Operator benchmarking

2016-11-18 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15677433#comment-15677433
 ] 

bright chen commented on APEXMALHAR-2339:
-

test result for KeyedWindowedOperatorBenchmarkApp
  - number of key is 1k: rate is around 200k
  - number of key is 10k: rate is around 7k

> Windowed Operator benchmarking
> --
>
> Key: APEXMALHAR-2339
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2339
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: bright chen
>Assignee: bright chen
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (APEXMALHAR-2347) DefaultBucket.getFromReaders() throws java.util.NoSuchElementException

2016-11-18 Thread bright chen (JIRA)
bright chen created APEXMALHAR-2347:
---

 Summary: DefaultBucket.getFromReaders() throws 
java.util.NoSuchElementException
 Key: APEXMALHAR-2347
 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2347
 Project: Apache Apex Malhar
  Issue Type: Bug
Reporter: bright chen
Assignee: bright chen


java.util.NoSuchElementException
at java.util.TreeMap.key(TreeMap.java:1221)
at java.util.TreeMap.firstKey(TreeMap.java:285)
at 
org.apache.apex.malhar.lib.state.managed.Bucket$DefaultBucket.getFromReaders(Bucket.java:305)
at 
org.apache.apex.malhar.lib.state.managed.Bucket$DefaultBucket.get(Bucket.java:352)
at 
org.apache.apex.malhar.lib.state.managed.AbstractManagedStateImpl.getValueFromBucketSync(AbstractManagedStateImpl.java:289)
at 
org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl.getSync(ManagedTimeUnifiedStateImpl.java:82)
at 
org.apache.apex.malhar.lib.state.spillable.SpillableMapImpl.get(SpillableMapImpl.java:153)
at 
org.apache.apex.malhar.lib.window.impl.SpillableWindowedKeyedStorage.get(SpillableWindowedKeyedStorage.java:220)
at 
org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl.accumulateTuple(KeyedWindowedOperatorImpl.java:144)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (APEXMALHAR-2335) StateTracker can cause bucket memory leak

2016-11-17 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen updated APEXMALHAR-2335:

Summary: StateTracker can cause bucket memory leak  (was: StateTracker has 
memory leak)

> StateTracker can cause bucket memory leak
> -
>
> Key: APEXMALHAR-2335
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2335
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: bright chen
>Assignee: bright chen
>   Original Estimate: 144h
>  Remaining Estimate: 144h
>
> The intension of StateTracker#bucketAccessed as I understand was to keep the 
> last access time of each bucket so freeMemory can free the buckets which not 
> accessed recently.
> But here are some problems
> - StateTracker#bucketAccessed() will be called each time access to the 
> bucket. Current implementation use Concurrent map and set to handle 
> it(bucketHeap and bucketAccessTimes). it could be pretty heavy.
> - StateTracker#run use bucketHeap to get the bucket which need to free 
> memory, but at the same time bucketHeap was constantly changed by 
> bucketAccessed(). It could cause concurrent issue event bucketHeap is a 
> Current set. And even more the while loop could never end. Following two bug 
> probably related to this:
> https://issues.apache.org/jira/browse/APEXMALHAR-2333
> https://issues.apache.org/jira/browse/APEXMALHAR-2334
> Following are my thought to solve these issue.
> - It's not necessary to keep very accurate of update time, it probably only 
> need to be updated once for each stream window. But StateTracker don't 
> support endWindow() (in fact endWindow() was triggered at end of application 
> window). So, we can keep the bucket which updated in this period and update 
> only at the end of the period.
> - keep two sets of bucketHeap, one for bucketAccessed() and one for run(). so 
> the lock only need when switch



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (APEXMALHAR-2335) StateTracker has memory leak

2016-11-17 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen updated APEXMALHAR-2335:

Summary: StateTracker has memory leak  (was: Problems on StateTracker)

> StateTracker has memory leak
> 
>
> Key: APEXMALHAR-2335
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2335
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: bright chen
>Assignee: bright chen
>   Original Estimate: 144h
>  Remaining Estimate: 144h
>
> The intension of StateTracker#bucketAccessed as I understand was to keep the 
> last access time of each bucket so freeMemory can free the buckets which not 
> accessed recently.
> But here are some problems
> - StateTracker#bucketAccessed() will be called each time access to the 
> bucket. Current implementation use Concurrent map and set to handle 
> it(bucketHeap and bucketAccessTimes). it could be pretty heavy.
> - StateTracker#run use bucketHeap to get the bucket which need to free 
> memory, but at the same time bucketHeap was constantly changed by 
> bucketAccessed(). It could cause concurrent issue event bucketHeap is a 
> Current set. And even more the while loop could never end. Following two bug 
> probably related to this:
> https://issues.apache.org/jira/browse/APEXMALHAR-2333
> https://issues.apache.org/jira/browse/APEXMALHAR-2334
> Following are my thought to solve these issue.
> - It's not necessary to keep very accurate of update time, it probably only 
> need to be updated once for each stream window. But StateTracker don't 
> support endWindow() (in fact endWindow() was triggered at end of application 
> window). So, we can keep the bucket which updated in this period and update 
> only at the end of the period.
> - keep two sets of bucketHeap, one for bucketAccessed() and one for run(). so 
> the lock only need when switch



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (APEXMALHAR-2321) Improve Buckets memory management

2016-11-17 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen resolved APEXMALHAR-2321.
-
Resolution: Fixed

see https://issues.apache.org/jira/browse/APEXMALHAR-2335

> Improve Buckets memory management
> -
>
> Key: APEXMALHAR-2321
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2321
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: bright chen
>Assignee: bright chen
>
> Currently buckets were managed as an array. Each bucket have memory 
> limitation, and free memory will be triggered if the bucket memory usage over 
> the limitation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (APEXMALHAR-2321) Improve Buckets memory management

2016-11-17 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen updated APEXMALHAR-2321:

Description: 
Currently buckets were managed as an array. Each bucket have memory limitation, 
and free memory will be triggered if the bucket memory usage over the 
limitation.


  was:
Currently buckets were managed as an array. Each bucket have memory limitation, 
and free memory will be triggered if the bucket memory usage over the 
limitation.

- For ManagedTimeUnifiedStateImpl, the default bucket number is 345600, which 
probably too large. But it can be changed by set 
Context.OperatorContext.APPLICATION_WINDOW_COUNT

- The default maxMemorySize is zero. It's better to give a default reasonable 
value to avoid too much garbage collection 



> Improve Buckets memory management
> -
>
> Key: APEXMALHAR-2321
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2321
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: bright chen
>Assignee: bright chen
>
> Currently buckets were managed as an array. Each bucket have memory 
> limitation, and free memory will be triggered if the bucket memory usage over 
> the limitation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Issue Comment Deleted] (APEXMALHAR-2335) Problems on StateTracker

2016-11-17 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen updated APEXMALHAR-2335:

Comment: was deleted

(was: see commit on https://issues.apache.org/jira/browse/APEXMALHAR-2329)

> Problems on StateTracker
> 
>
> Key: APEXMALHAR-2335
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2335
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: bright chen
>Assignee: bright chen
>   Original Estimate: 144h
>  Remaining Estimate: 144h
>
> The intension of StateTracker#bucketAccessed as I understand was to keep the 
> last access time of each bucket so freeMemory can free the buckets which not 
> accessed recently.
> But here are some problems
> - StateTracker#bucketAccessed() will be called each time access to the 
> bucket. Current implementation use Concurrent map and set to handle 
> it(bucketHeap and bucketAccessTimes). it could be pretty heavy.
> - StateTracker#run use bucketHeap to get the bucket which need to free 
> memory, but at the same time bucketHeap was constantly changed by 
> bucketAccessed(). It could cause concurrent issue event bucketHeap is a 
> Current set. And even more the while loop could never end. Following two bug 
> probably related to this:
> https://issues.apache.org/jira/browse/APEXMALHAR-2333
> https://issues.apache.org/jira/browse/APEXMALHAR-2334
> Following are my thought to solve these issue.
> - It's not necessary to keep very accurate of update time, it probably only 
> need to be updated once for each stream window. But StateTracker don't 
> support endWindow() (in fact endWindow() was triggered at end of application 
> window). So, we can keep the bucket which updated in this period and update 
> only at the end of the period.
> - keep two sets of bucketHeap, one for bucketAccessed() and one for run(). so 
> the lock only need when switch



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (APEXMALHAR-2331) StateTracker#bucketAccessed should add bucket to bucketAccessTimes

2016-11-17 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen closed APEXMALHAR-2331.
---
Resolution: Invalid

The whole logic of bucketAccessed() changed, so this issue is not valid any more.

> StateTracker#bucketAccessed should add bucket to bucketAccessTimes
> --
>
> Key: APEXMALHAR-2331
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2331
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: bright chen
>Assignee: bright chen
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The bucket didn't add to the bucketAccessTimes, which cause lots of 
> BucketIdTimeWrapper instances created and added to bucketHeap



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (APEXMALHAR-2335) Problems on StateTracker

2016-11-10 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen updated APEXMALHAR-2335:

Summary: Problems on StateTracker  (was: Problems on 
StateTracker#bucketAccessed)

> Problems on StateTracker
> 
>
> Key: APEXMALHAR-2335
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2335
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: bright chen
>Assignee: bright chen
>   Original Estimate: 144h
>  Remaining Estimate: 144h
>
> The intension of StateTracker#bucketAccessed as I understand was to keep the 
> last access time of each bucket so freeMemory can free the buckets which not 
> accessed recently.
> But here are some problems
> - StateTracker#bucketAccessed() will be called each time access to the 
> bucket. Current implementation use Concurrent map and set to handle 
> it(bucketHeap and bucketAccessTimes). it could be pretty heavy.
> - StateTracker#run use bucketHeap to get the bucket which need to free 
> memory, but at the same time bucketHeap was constantly changed by 
> bucketAccessed(). It could cause concurrent issue event bucketHeap is a 
> Current set. And even more the while loop could never end. Following two bug 
> probably related to this:
> https://issues.apache.org/jira/browse/APEXMALHAR-2333
> https://issues.apache.org/jira/browse/APEXMALHAR-2334
> Following are my thought to solve these issue.
> - It's not necessary to keep very accurate of update time, it probably only 
> need to be updated once for each stream window. But StateTracker don't 
> support endWindow() (in fact endWindow() was triggered at end of application 
> window). So, we can keep the bucket which updated in this period and update 
> only at the end of the period.
> - keep two sets of bucketHeap, one for bucketAccessed() and one for run(). so 
> the lock only need when switch



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (APEXMALHAR-2335) Problems on StateTracker#bucketAccessed

2016-11-09 Thread bright chen (JIRA)
bright chen created APEXMALHAR-2335:
---

 Summary: Problems on StateTracker#bucketAccessed
 Key: APEXMALHAR-2335
 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2335
 Project: Apache Apex Malhar
  Issue Type: Bug
Reporter: bright chen
Assignee: bright chen


The intention of StateTracker#bucketAccessed, as I understand it, was to keep the 
last access time of each bucket so freeMemory can free the buckets which were not 
accessed recently.
But here are some problems:

- StateTracker#bucketAccessed() is called on every access to the bucket. The 
current implementation uses a concurrent map and set to handle it (bucketHeap and 
bucketAccessTimes), which could be pretty heavy.

- StateTracker#run uses bucketHeap to pick the bucket whose memory should be 
freed, but at the same time bucketHeap is constantly changed by bucketAccessed(). 
This can cause concurrency issues even though bucketHeap is a concurrent set, and 
the while loop might never end. The following two bugs are probably related to this:
https://issues.apache.org/jira/browse/APEXMALHAR-2333
https://issues.apache.org/jira/browse/APEXMALHAR-2334

Following are my thoughts on solving these issues (a sketch of the second idea is 
shown below):
- It is not necessary to keep a very accurate update time; it probably only needs 
to be updated once per streaming window. But StateTracker doesn't support 
endWindow() (in fact endWindow() is triggered at the end of the application 
window). So we can keep the buckets updated in this period and update only at the 
end of the period.

- Keep two sets for bucketHeap, one for bucketAccessed() and one for run(), so the 
lock is only needed when switching them.
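
A minimal sketch of the two-set idea (hypothetical, not the actual StateTracker 
code): the operator thread records accessed bucket ids into the current set, and 
the free-memory thread atomically swaps in an empty set before iterating, so 
neither thread iterates a set the other is mutating. A recorded access can 
occasionally land in the set that was just swapped out, which is acceptable for 
this approximate bookkeeping.

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

// Sketch: double-buffered "accessed buckets" sets; locking is replaced by an
// atomic swap performed only by the free-memory thread.
class AccessedBucketsSketch
{
  private final AtomicReference<Set<Long>> current =
      new AtomicReference<>(ConcurrentHashMap.newKeySet());

  void bucketAccessed(long bucketId)
  {
    current.get().add(bucketId);          // operator thread, per access
  }

  // Free-memory thread: take the buckets accessed in the last period and
  // leave a fresh set behind.
  Set<Long> swapForFreeMemory()
  {
    return current.getAndSet(ConcurrentHashMap.newKeySet());
  }
}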



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (APEXMALHAR-2332) StateTracker should make sure all the memory freed before remove the bucket from bucketHeap and bucketAccessTimes

2016-11-08 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen updated APEXMALHAR-2332:

Summary: StateTracker should make sure all the memory freed before remove 
the bucket from bucketHeap and bucketAccessTimes  (was: StateTracker should 
free memory after committed)

> StateTracker should make sure all the memory freed before remove the bucket 
> from bucketHeap and bucketAccessTimes
> -
>
> Key: APEXMALHAR-2332
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2332
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: bright chen
>Assignee: bright chen
>   Original Estimate: 120h
>  Remaining Estimate: 120h
>
> Current StateTracker free memory was triggered by a timer. The default the 
> timer value was DAGContext.STREAMING_WINDOW_SIZE_MILLIS.defaultValue * 
> OperatorContext.APPLICATION_WINDOW_COUNT.defaultValue. It would have memory 
> leak if the process with operator thread and memory release thread as 
> following:
> bucket1: put(), put() ... put()
> bucket2: put(), put() ... put()
> freeMemory(): {bucket removed from bucketHeap and bucketAccessTimes}
> commit: bucket1, bucket2
> in this case,  nothing was freed and the bucket can't be freed any more
> And the default value of free memory could large and memory used up even 
> before get chance of free memory. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (APEXMALHAR-2332) StateTracker should free memory after committed

2016-11-08 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen updated APEXMALHAR-2332:

Remaining Estimate: 120h  (was: 48h)
 Original Estimate: 120h  (was: 48h)
   Description: 
Currently the StateTracker free-memory pass is triggered by a timer. The default 
timer value is DAGContext.STREAMING_WINDOW_SIZE_MILLIS.defaultValue * 
OperatorContext.APPLICATION_WINDOW_COUNT.defaultValue. There would be a memory 
leak if the operator thread and the memory release thread interleave as follows:

bucket1: put(), put() ... put()
bucket2: put(), put() ... put()
freeMemory(): {bucket removed from bucketHeap and bucketAccessTimes}
commit: bucket1, bucket2

In this case, nothing is freed and the bucket can't be freed any more.

And the default free-memory interval could be large, so memory can be used up 
even before there is a chance to free it.

  was:
StateTracker#bucketAccessTimes keep the bucket access time order by access 
time. It was used by free memory thread to decide which bucket can be freed. 
So, each access to bucket include put and get should update the access time.

As bucketAccessTimes and bucketHeap are shared by two thread. update them for 
each operation could impact the performance. It better to update period. As 
Bucket don't support window operation. I am going to keep the update time and 
update when time out.


   Summary: StateTracker should free memory after committed  (was: 
StateTracker#bucketAccessed should be called each time access to the bucket)

> StateTracker should free memory after committed
> ---
>
> Key: APEXMALHAR-2332
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2332
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: bright chen
>Assignee: bright chen
>   Original Estimate: 120h
>  Remaining Estimate: 120h
>
> Current StateTracker free memory was triggered by a timer. The default the 
> timer value was DAGContext.STREAMING_WINDOW_SIZE_MILLIS.defaultValue * 
> OperatorContext.APPLICATION_WINDOW_COUNT.defaultValue. It would have memory 
> leak if the process with operator thread and memory release thread as 
> following:
> bucket1: put(), put() ... put()
> bucket2: put(), put() ... put()
> freeMemory(): {bucket removed from bucketHeap and bucketAccessTimes}
> commit: bucket1, bucket2
> in this case,  nothing was freed and the bucket can't be freed any more
> And the default value of free memory could large and memory used up even 
> before get chance of free memory. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (APEXMALHAR-2332) StateTracker#bucketAccessed should be called each time access to the bucket

2016-11-08 Thread bright chen (JIRA)
bright chen created APEXMALHAR-2332:
---

 Summary: StateTracker#bucketAccessed should be called each time 
access to the bucket
 Key: APEXMALHAR-2332
 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2332
 Project: Apache Apex Malhar
  Issue Type: Bug
Reporter: bright chen
Assignee: bright chen


StateTracker#bucketAccessTimes keeps the bucket access times ordered by access 
time. It is used by the free-memory thread to decide which buckets can be freed. 
So each access to a bucket, including put and get, should update the access time.

As bucketAccessTimes and bucketHeap are shared by two threads, updating them on 
every operation could impact performance. It is better to update them 
periodically. As Bucket doesn't support window operations, I am going to keep the 
update times and apply the updates when the period times out.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (APEXMALHAR-2331) StateTracker#bucketAccessed should add bucket to bucketAccessTimes

2016-11-08 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen reassigned APEXMALHAR-2331:
---

Assignee: bright chen

> StateTracker#bucketAccessed should add bucket to bucketAccessTimes
> --
>
> Key: APEXMALHAR-2331
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2331
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: bright chen
>Assignee: bright chen
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The bucket didn't add to the bucketAccessTimes, which cause lots of 
> BucketIdTimeWrapper instances created and added to bucketHeap



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (APEXMALHAR-2331) StateTracker#bucketAccessed should add bucket to bucketAccessTimes

2016-11-08 Thread bright chen (JIRA)
bright chen created APEXMALHAR-2331:
---

 Summary: StateTracker#bucketAccessed should add bucket to 
bucketAccessTimes
 Key: APEXMALHAR-2331
 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2331
 Project: Apache Apex Malhar
  Issue Type: Bug
Reporter: bright chen


The bucket wasn't added to bucketAccessTimes, which causes lots of 
BucketIdTimeWrapper instances to be created and added to bucketHeap.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (APEXMALHAR-2329) ManagedState benchmark should not use constant bucket

2016-11-07 Thread bright chen (JIRA)
bright chen created APEXMALHAR-2329:
---

 Summary: ManagedState benchmark should not use constant bucket
 Key: APEXMALHAR-2329
 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2329
 Project: Apache Apex Malhar
  Issue Type: Bug
Reporter: bright chen
Assignee: bright chen






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (APEXMALHAR-2327) BucketsFileSystem.writeBucketData() call Slice.toByteArray() cause allocate unnecessary memory

2016-11-03 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen updated APEXMALHAR-2327:

Remaining Estimate: 2m  (was: 120h)
 Original Estimate: 2m  (was: 120h)

> BucketsFileSystem.writeBucketData() call Slice.toByteArray() cause allocate 
> unnecessary memory
> --
>
> Key: APEXMALHAR-2327
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2327
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: bright chen
>Assignee: bright chen
>   Original Estimate: 2m
>  Remaining Estimate: 2m
>
> BucketsFileSystem.writeBucketData call Slice.toByteArray() instead of using 
> Slice.buffer to get the byte array, which create unnecessary byte array as 
> Slice.toByteArray() which create new byte array and copy the data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2327) BucketsFileSystem.writeBucketData() call Slice.toByteArray() cause allocate unnecessary memory

2016-11-03 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15634460#comment-15634460
 ] 

bright chen commented on APEXMALHAR-2327:
-

Discussed with David: instead of adding a new method append(Slice key, Slice 
value), change the existing append(byte[] key, byte[] value) into append(Slice 
key, Slice value).

> BucketsFileSystem.writeBucketData() call Slice.toByteArray() cause allocate 
> unnecessary memory
> --
>
> Key: APEXMALHAR-2327
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2327
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: bright chen
>Assignee: bright chen
>   Original Estimate: 120h
>  Remaining Estimate: 120h
>
> BucketsFileSystem.writeBucketData call Slice.toByteArray() instead of using 
> Slice.buffer to get the byte array, which create unnecessary byte array as 
> Slice.toByteArray() which create new byte array and copy the data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2327) BucketsFileSystem.writeBucketData() call Slice.toByteArray() cause allocate unnecessary memory

2016-11-03 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15634322#comment-15634322
 ] 

bright chen commented on APEXMALHAR-2327:
-

[~csingh][~davidyan] Please comment on whether it is ok to add a method to the interface.

> BucketsFileSystem.writeBucketData() call Slice.toByteArray() cause allocate 
> unnecessary memory
> --
>
> Key: APEXMALHAR-2327
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2327
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: bright chen
>Assignee: bright chen
>   Original Estimate: 120h
>  Remaining Estimate: 120h
>
> BucketsFileSystem.writeBucketData() calls Slice.toByteArray() instead of using 
> Slice.buffer to get the byte array. This allocates an unnecessary byte array, 
> since Slice.toByteArray() creates a new byte array and copies the data into it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2321) Improve Buckets memory management

2016-11-01 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15626347#comment-15626347
 ] 

bright chen commented on APEXMALHAR-2321:
-

I changed the code to use keyStream and valueStream to manage the memory. But 
since we can't force the caller to use keyStream and valueStream for memory 
management, the maps also need to be cleaned up.

[~csingh] What do you think? I also saw that you clear the fileCache and close 
readers in freeMemory(). Would it be better to move that to another function, or 
to rename this method to something like releaseResource()?

> Improve Buckets memory management
> -
>
> Key: APEXMALHAR-2321
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2321
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: bright chen
>Assignee: bright chen
>
> Currently buckets are managed as an array. Each bucket has a memory limit, and 
> freeing memory is triggered if the bucket's memory usage exceeds the limit.
> - For ManagedTimeUnifiedStateImpl, the default bucket number is 345600, which 
> is probably too large. It can be changed by setting 
> Context.OperatorContext.APPLICATION_WINDOW_COUNT.
> - The default maxMemorySize is zero. It's better to give a reasonable default 
> value to avoid too much garbage collection.
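As a rough illustration of the last point, a minimal sketch of giving 
maxMemorySize a non-zero value (the setter name is assumed from the field name 
above, not verified against the Malhar API):

ManagedTimeUnifiedStateImpl state = new ManagedTimeUnifiedStateImpl();
// assumed setter; e.g. 64 MB per bucket instead of the default of zero
state.setMaxMemorySize(64L * 1024 * 1024);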



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (APEXMALHAR-2321) Improve Buckets memory management

2016-11-01 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen updated APEXMALHAR-2321:

Description: 
Currently buckets are managed as an array. Each bucket has a memory limit, and 
freeing memory is triggered if the bucket's memory usage exceeds the limit.

- For ManagedTimeUnifiedStateImpl, the default bucket number is 345600, which is 
probably too large. It can be changed by setting 
Context.OperatorContext.APPLICATION_WINDOW_COUNT.

- The default maxMemorySize is zero. It's better to give a reasonable default 
value to avoid too much garbage collection.


  was:
Currently buckets are managed as an array. Each bucket has a memory limit, and 
freeing memory is triggered if the bucket's memory usage exceeds the limit.

- For ManagedTimeUnifiedStateImpl, the default bucket number is 345600, which is 
probably too large.

- The default maxMemorySize is zero. It's better to give a reasonable default 
value to avoid too much garbage collection.



> Improve Buckets memory management
> -
>
> Key: APEXMALHAR-2321
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2321
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: bright chen
>Assignee: bright chen
>
> Currently buckets are managed as an array. Each bucket has a memory limit, and 
> freeing memory is triggered if the bucket's memory usage exceeds the limit.
> - For ManagedTimeUnifiedStateImpl, the default bucket number is 345600, which 
> is probably too large. It can be changed by setting 
> Context.OperatorContext.APPLICATION_WINDOW_COUNT.
> - The default maxMemorySize is zero. It's better to give a reasonable default 
> value to avoid too much garbage collection.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2321) Improve Buckets memory management

2016-11-01 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15626147#comment-15626147
 ] 

bright chen commented on APEXMALHAR-2321:
-

removed the incorrect statement

> Improve Buckets memory management
> -
>
> Key: APEXMALHAR-2321
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2321
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: bright chen
>Assignee: bright chen
>
> Currently buckets are managed as an array. Each bucket has a memory limit, and 
> freeing memory is triggered if the bucket's memory usage exceeds the limit.
> - For ManagedTimeUnifiedStateImpl, the default bucket number is 345600, which 
> is probably too large. It can be changed by setting 
> Context.OperatorContext.APPLICATION_WINDOW_COUNT.
> - The default maxMemorySize is zero. It's better to give a reasonable default 
> value to avoid too much garbage collection.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (APEXMALHAR-2321) Improve Buckets memory management

2016-11-01 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen updated APEXMALHAR-2321:

Description: 
Currently buckets are managed as an array. Each bucket has a memory limit, and 
freeing memory is triggered if the bucket's memory usage exceeds the limit.

- For ManagedTimeUnifiedStateImpl, the default bucket number is 345600, which is 
probably too large.

- The default maxMemorySize is zero. It's better to give a reasonable default 
value to avoid too much garbage collection.


  was:
Currently buckets are managed as an array. Each bucket has a memory limit, and 
freeing memory is triggered if the bucket's memory usage exceeds the limit.

- For ManagedTimeUnifiedStateImpl, the default bucket number is 345600, which is 
probably too large.

- AbstractManagedStateImpl.maxMemorySize will probably be misunderstood as the 
maximum memory size of the total managed state, but in fact it is used as the 
memory size of each bucket. Better to rename it.

- The default maxMemorySize is zero. It's better to give a reasonable default 
value to avoid too much garbage collection.



> Improve Buckets memory management
> -
>
> Key: APEXMALHAR-2321
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2321
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: bright chen
>Assignee: bright chen
>
> Currently buckets are managed as an array. Each bucket has a memory limit, and 
> freeing memory is triggered if the bucket's memory usage exceeds the limit.
> - For ManagedTimeUnifiedStateImpl, the default bucket number is 345600, which 
> is probably too large.
> - The default maxMemorySize is zero. It's better to give a reasonable default 
> value to avoid too much garbage collection.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2321) Improve Buckets memory management

2016-11-01 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15626054#comment-15626054
 ] 

bright chen commented on APEXMALHAR-2321:
-

Besides releasing the memory managed by SerializationBuffer, other resources 
such as the maps also need to be released.

> Improve Buckets memory management
> -
>
> Key: APEXMALHAR-2321
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2321
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: bright chen
>Assignee: bright chen
>
> Currently buckets are managed as an array. Each bucket has a memory limit, and 
> freeing memory is triggered if the bucket's memory usage exceeds the limit.
> - For ManagedTimeUnifiedStateImpl, the default bucket number is 345600, which 
> is probably too large.
> - AbstractManagedStateImpl.maxMemorySize will probably be misunderstood as the 
> maximum memory size of the total managed state, but in fact it is used as the 
> memory size of each bucket. Better to rename it.
> - The default maxMemorySize is zero. It's better to give a reasonable default 
> value to avoid too much garbage collection.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (APEXMALHAR-2321) Improve Buckets memory management

2016-10-27 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen reassigned APEXMALHAR-2321:
---

Assignee: bright chen

> Improve Buckets memory management
> -
>
> Key: APEXMALHAR-2321
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2321
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: bright chen
>Assignee: bright chen
>
> Currently buckets are managed as an array. Each bucket has a memory limit, and 
> freeing memory is triggered if the bucket's memory usage exceeds the limit.
> - For ManagedTimeUnifiedStateImpl, the default bucket number is 345600, which 
> is probably too large.
> - AbstractManagedStateImpl.maxMemorySize will probably be misunderstood as the 
> maximum memory size of the total managed state, but in fact it is used as the 
> memory size of each bucket. Better to rename it.
> - The default maxMemorySize is zero. It's better to give a reasonable default 
> value to avoid too much garbage collection.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (APEXMALHAR-2321) Improve Buckets memory management

2016-10-27 Thread bright chen (JIRA)
bright chen created APEXMALHAR-2321:
---

 Summary: Improve Buckets memory management
 Key: APEXMALHAR-2321
 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2321
 Project: Apache Apex Malhar
  Issue Type: Improvement
Reporter: bright chen


Currently buckets are managed as an array. Each bucket has a memory limit, and 
freeing memory is triggered if the bucket's memory usage exceeds the limit.

- For ManagedTimeUnifiedStateImpl, the default bucket number is 345600, which is 
probably too large.

- AbstractManagedStateImpl.maxMemorySize will probably be misunderstood as the 
maximum memory size of the total managed state, but in fact it is used as the 
memory size of each bucket. Better to rename it.

- The default maxMemorySize is zero. It's better to give a reasonable default 
value to avoid too much garbage collection.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (APEXMALHAR-2320) FSWindowDataManager.toSlice() can cause lots of garbage collection

2016-10-27 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen reassigned APEXMALHAR-2320:
---

Assignee: bright chen

> FSWindowDataManager.toSlice() can cause lots of garbage collection
> --
>
> Key: APEXMALHAR-2320
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2320
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: bright chen
>Assignee: bright chen
>
> FSWindowDataManager.toSlice(Object) uses ByteArrayOutputStream for 
> serialization and then calls toByteArray() to get the byte array, which can 
> cause a lot of garbage collection.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (APEXMALHAR-2320) FSWindowDataManager.toSlice() can cause lots of garbage collection

2016-10-27 Thread bright chen (JIRA)
bright chen created APEXMALHAR-2320:
---

 Summary: FSWindowDataManager.toSlice() can cause lots of garbage 
collection
 Key: APEXMALHAR-2320
 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2320
 Project: Apache Apex Malhar
  Issue Type: Improvement
Reporter: bright chen


FSWindowDataManager.toSlice(Object) uses ByteArrayOutputStream for serialization 
and then calls toByteArray() to get the byte array, which can cause a lot of 
garbage collection.
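A minimal sketch of the contrast, assuming Kryo is the serializer behind 
toSlice(); the reusable-Output approach is one possible fix, not necessarily 
the one that was implemented:

// imports: com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Output,
//          com.datatorrent.netlet.util.Slice, java.io.ByteArrayOutputStream
Kryo kryo = new Kryo();
Object object = "example tuple";

// per-call pattern described above: a new stream and a full copy in toByteArray()
ByteArrayOutputStream bos = new ByteArrayOutputStream();
Output out = new Output(bos);
kryo.writeClassAndObject(out, object);
out.flush();
byte[] bytes = bos.toByteArray();   // extra allocation plus a copy on every call

// reusable pattern: one growable Output kept across calls, exposed as a Slice
Output reusable = new Output(4096, -1);   // grows as needed, never shrinks
reusable.setPosition(0);                  // rewind instead of reallocating
kryo.writeClassAndObject(reusable, object);
Slice slice = new Slice(reusable.getBuffer(), 0, reusable.position());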





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (APEXMALHAR-2317) Change SpillableBenchmarkApp to adapt the change on Spillable Data Structure

2016-10-26 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen reassigned APEXMALHAR-2317:
---

Assignee: bright chen

> Change SpillableBenchmarkApp to adapt the change on Spillable Data Structure
> 
>
> Key: APEXMALHAR-2317
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2317
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: bright chen
>Assignee: bright chen
>
> A NullPointerException is thrown due to the change in the Spillable Data 
> Structures.
> java.lang.NullPointerException
>   at 
> org.apache.apex.malhar.lib.state.managed.AbstractManagedStateImpl.getBucket(AbstractManagedStateImpl.java:325)
>   at 
> org.apache.apex.malhar.lib.state.managed.AbstractManagedStateImpl.ensureBucket(AbstractManagedStateImpl.java:331)
>   at 
> org.apache.apex.malhar.lib.state.spillable.SpillableMapImpl.setup(SpillableMapImpl.java:209)
>   at 
> com.datatorrent.benchmark.spillable.SpillableTestOperator.createWindowToCountMap(SpillableTestOperator.java:188)
>   at 
> com.datatorrent.benchmark.spillable.SpillableTestOperator.setup(SpillableTestOperator.java:85)
>   at com.datatorrent.common.util.BaseOperator.setup(BaseOperator.java:30)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (APEXMALHAR-2317) Change SpillableBenchmarkApp to adapt the change on Spillable Data Structure

2016-10-26 Thread bright chen (JIRA)
bright chen created APEXMALHAR-2317:
---

 Summary: Change SpillableBenchmarkApp to adapt the change on 
Spillable Data Structure
 Key: APEXMALHAR-2317
 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2317
 Project: Apache Apex Malhar
  Issue Type: Bug
Reporter: bright chen


A NullPointerException is thrown due to the change in the Spillable Data 
Structures.

java.lang.NullPointerException
at 
org.apache.apex.malhar.lib.state.managed.AbstractManagedStateImpl.getBucket(AbstractManagedStateImpl.java:325)
at 
org.apache.apex.malhar.lib.state.managed.AbstractManagedStateImpl.ensureBucket(AbstractManagedStateImpl.java:331)
at 
org.apache.apex.malhar.lib.state.spillable.SpillableMapImpl.setup(SpillableMapImpl.java:209)
at 
com.datatorrent.benchmark.spillable.SpillableTestOperator.createWindowToCountMap(SpillableTestOperator.java:188)
at 
com.datatorrent.benchmark.spillable.SpillableTestOperator.setup(SpillableTestOperator.java:85)
at com.datatorrent.common.util.BaseOperator.setup(BaseOperator.java:30)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (APEXMALHAR-2289) Managed State benchmark improvement

2016-10-11 Thread bright chen (JIRA)
bright chen created APEXMALHAR-2289:
---

 Summary:  Managed State benchmark improvement
 Key: APEXMALHAR-2289
 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2289
 Project: Apache Apex Malhar
  Issue Type: Improvement
Reporter: bright chen
Assignee: bright chen


It seems the State Manager slows down after running for a while. Add the 
following features to the Managed State benchmark for monitoring (a small 
sketch follows below):
1. monitor the system resources
2. monitor the garbage collection 
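A small sketch of item 2 using the standard JMX beans; this is one way to do 
it, not necessarily what the benchmark ended up using:

// imports: java.lang.management.GarbageCollectorMXBean,
//          java.lang.management.ManagementFactory, java.util.List
List<GarbageCollectorMXBean> gcBeans = ManagementFactory.getGarbageCollectorMXBeans();
long totalCount = 0;
long totalTimeMs = 0;
for (GarbageCollectorMXBean gc : gcBeans) {
  totalCount += gc.getCollectionCount();   // cumulative collections since JVM start
  totalTimeMs += gc.getCollectionTime();   // cumulative GC time in milliseconds
}
// log or emit totalCount / totalTimeMs once per application window to watch the trend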



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (APEXMALHAR-2286) Make ByteStream extends from OutputStream in order to be used by Kyro

2016-10-10 Thread bright chen (JIRA)
bright chen created APEXMALHAR-2286:
---

 Summary: Make ByteStream extends from OutputStream in order to be 
used by Kyro
 Key: APEXMALHAR-2286
 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2286
 Project: Apache Apex Malhar
  Issue Type: Bug
Reporter: bright chen
Assignee: bright chen


Make ByteStream extend OutputStream so it can be used by the Kryo serializer.
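A minimal sketch of the idea; ByteStream is described elsewhere in this digest 
as an interface, so the actual change may differ, and the write methods below 
are placeholders for the real buffer management:

public class ByteStreamAdapter extends java.io.OutputStream {
  @Override
  public void write(int b) {
    // append the single byte to the managed memory (implementation elided)
  }

  @Override
  public void write(byte[] b, int off, int len) {
    // append len bytes starting at off (implementation elided)
  }
}

// with an OutputStream-compatible stream, Kryo can serialize straight into the
// managed memory:
//   Output output = new Output(byteStream);
//   kryo.writeClassAndObject(output, tuple);
//   output.flush();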



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (APEXMALHAR-2262) lock on AbstractManagedStateImpl.getValueFromBucketSync is too wide

2016-09-22 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen updated APEXMALHAR-2262:

Summary: lock on AbstractManagedStateImpl.getValueFromBucketSync is too 
wide  (was: lock on is too AbstractManagedStateImpl.getValueFromBucketSync wide)

> lock on AbstractManagedStateImpl.getValueFromBucketSync is too wide
> ---
>
> Key: APEXMALHAR-2262
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2262
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: bright chen
>Assignee: bright chen
>
> Managed State uses a lot of locks, which could have a large impact on 
> performance.
> AbstractManagedStateImpl.getValueFromBucketSync(long, long, Slice) locks the 
> bucket to get the value. But if the value is still in memory, the lock is not 
> necessary, since flash is a ConcurrentMap.
> Probably AbstractManagedStateImpl should only lock when adding/removing a 
> bucket, and each bucket should handle read/write locking internally.
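A rough sketch of the narrower locking suggested above; the field and method 
names are illustrative, not the actual Bucket code:

Slice get(Slice key) {
  Slice cached = flash.get(key);   // no lock for the in-memory hit: flash is a ConcurrentMap
  if (cached != null) {
    return cached;
  }
  synchronized (this) {            // lock only the slow path that touches readers/files
    return getFromReaders(key);    // illustrative name for the file-backed lookup
  }
}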



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (APEXCORE-537) Slice hashCode should not depend on offset

2016-09-20 Thread bright chen (JIRA)
bright chen created APEXCORE-537:


 Summary: Slice hashCode should not depend on offset
 Key: APEXCORE-537
 URL: https://issues.apache.org/jira/browse/APEXCORE-537
 Project: Apache Apex Core
  Issue Type: Bug
Reporter: bright chen


The 
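(The description above is truncated in this digest.) Going only by the summary, 
a content-based hashCode along these lines would make two Slices with the same 
bytes but different offsets hash equally; purely illustrative:

@Override
public int hashCode() {
  int hash = 1;
  for (int i = offset; i < offset + length; i++) {
    hash = 31 * hash + buffer[i];   // hash the visible bytes only, not the offset itself
  }
  return hash;
}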



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (APEXMALHAR-2243) change name StoreOperator.setExeModeStr(String) to setExecModeStr

2016-09-19 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen reassigned APEXMALHAR-2243:
---

Assignee: bright chen

> change name StoreOperator.setExeModeStr(String) to setExecModeStr
> -
>
> Key: APEXMALHAR-2243
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2243
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: bright chen
>Assignee: bright chen
>Priority: Minor
>
> change name StoreOperator.setExeModeStr(String) to setExecModeStr for 
> consistency.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (APEXMALHAR-2243) change name StoreOperator.setExeModeStr(String) to setExecModeStr

2016-09-19 Thread bright chen (JIRA)
bright chen created APEXMALHAR-2243:
---

 Summary: change name StoreOperator.setExeModeStr(String) to 
setExecModeStr
 Key: APEXMALHAR-2243
 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2243
 Project: Apache Apex Malhar
  Issue Type: Bug
Reporter: bright chen
Priority: Minor


change name StoreOperator.setExeModeStr(String) to setExecModeStr for 
consistency.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (APEXMALHAR-2238) State management throw exception while get value with async

2016-09-14 Thread bright chen (JIRA)
bright chen created APEXMALHAR-2238:
---

 Summary: State management throw exception while get value with 
async
 Key: APEXMALHAR-2238
 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2238
 Project: Apache Apex Malhar
  Issue Type: Bug
Reporter: bright chen


It may be due to the window files having already been transferred.

Following is the exception:
2016-09-14 11:41:18,130 [2/Store:StoreOperator] ERROR engine.StreamingContainer 
run - Operator set 
[OperatorDeployInfo[id=2,name=Store,type=GENERIC,checkpoint={57d998af003b, 
0, 
0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=input,streamId=Events,sourceNodeId=1,sourcePortName=data,locality=CONTAINER_LOCAL,partitionMask=0,partitionKeys=]],outputs=[]]]
 stopped running due to an exception.
java.lang.RuntimeException: while loading 1, 345627
at 
org.apache.apex.malhar.lib.state.managed.Bucket$DefaultBucket.getValueFromTimeBucketReader(Bucket.java:348)
at 
org.apache.apex.malhar.lib.state.managed.Bucket$DefaultBucket.getFromReaders(Bucket.java:293)
at 
org.apache.apex.malhar.lib.state.managed.Bucket$DefaultBucket.get(Bucket.java:325)
at 
org.apache.apex.malhar.lib.state.managed.AbstractManagedStateImpl$ValueFetchTask.call(AbstractManagedStateImpl.java:569)
at 
org.apache.apex.malhar.lib.state.managed.AbstractManagedStateImpl$ValueFetchTask.call(AbstractManagedStateImpl.java:1)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: File target/temp/1/345627 does not 
exist
at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:520)
at 
org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:398)
at 
org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:137)
at 
org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:339)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:763)
at 
com.datatorrent.lib.fileaccess.FileAccessFSImpl.getInputStream(FileAccessFSImpl.java:115)
at 
com.datatorrent.lib.fileaccess.TFileImpl$DTFileImpl.getReader(TFileImpl.java:170)
at 
org.apache.apex.malhar.lib.state.managed.BucketsFileSystem.getReader(BucketsFileSystem.java:98)
at 
org.apache.apex.malhar.lib.state.managed.Bucket$DefaultBucket.loadFileReader(Bucket.java:373)
at 
org.apache.apex.malhar.lib.state.managed.Bucket$DefaultBucket.getValueFromTimeBucketReader(Bucket.java:343)
... 8 more
2016-09-14 11:41:18,133 [2/Store:StoreOperator] INFO  stram.StramLocalCluster 
log - container-1 msg: Stopped running due to an exception. 
java.lang.RuntimeException: while loading 1, 345627
at 
org.apache.apex.malhar.lib.state.managed.Bucket$DefaultBucket.getValueFromTimeBucketReader(Bucket.java:348)
at 
org.apache.apex.malhar.lib.state.managed.Bucket$DefaultBucket.getFromReaders(Bucket.java:293)
at 
org.apache.apex.malhar.lib.state.managed.Bucket$DefaultBucket.get(Bucket.java:325)
at 
org.apache.apex.malhar.lib.state.managed.AbstractManagedStateImpl$ValueFetchTask.call(AbstractManagedStateImpl.java:569)
at 
org.apache.apex.malhar.lib.state.managed.AbstractManagedStateImpl$ValueFetchTask.call(AbstractManagedStateImpl.java:1)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: File target/temp/1/345627 does not 
exist
at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:520)
at 
org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:398)
at 
org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:137)
at 
org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:339)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:763)
at 
com.datatorrent.lib.fileaccess.FileAccessFSImpl.getInputStream(FileAccessFSImpl.java:115)
at 
com.datatorrent.lib.fileaccess.TFileImpl$DTFileImpl.getReader(TFileImpl.java:170)
at 
org.apache.apex.malhar.lib.state.managed.BucketsFileSystem.getReader(BucketsFileSystem.java:98)
at 
org.apache.apex.malhar.lib.state.managed.Bucket$DefaultBucket.loadFileReader(Bucket.java:373)
at 

[jira] [Commented] (APEXMALHAR-2205) State management benchmark

2016-09-13 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15488539#comment-15488539
 ] 

bright chen commented on APEXMALHAR-2205:
-

I updated the description; hopefully it is what you wanted.

> State management benchmark
> --
>
> Key: APEXMALHAR-2205
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2205
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: bright chen
>Assignee: bright chen
> Attachments: ManagedStateBenchmark.png
>
>
> The key is generated by the following formula:
> (timestamp - timestamp % range) + random.nextInt(range)
> where the default value of range is 60k (1 minute).
> With the default "range" value, keys share the same prefix within one minute. 
> The process rate is around 40k/s (namely 2400k/min) and the key range is 60k, 
> so there are roughly 40 operations per key on average (namely 1 insertion and 
> another 39 updates).
> Following is the benchmark result: the highest is around 40k/s, the lowest is 
> around 28k/s, and the average is around 35.5k/s.
> See the attached "ManagedStateBenchmark.png" for a screenshot.
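A tiny sketch of the key generation described above (variable names are 
illustrative):

java.util.Random random = new java.util.Random();
int range = 60_000;   // default range: one minute in milliseconds
long timestamp = System.currentTimeMillis();
long key = (timestamp - timestamp % range) + random.nextInt(range);
// all keys generated within the same minute share the prefix (timestamp - timestamp % range)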



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (APEXMALHAR-2205) State management benchmark

2016-09-13 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen updated APEXMALHAR-2205:

Description: 
The key is generated by the following formula:
(timestamp - timestamp % range) + random.nextInt(range)
where the default value of range is 60k (1 minute).
With the default "range" value, keys share the same prefix within one minute. 
The process rate is around 40k/s (namely 2400k/min) and the key range is 60k, 
so there are roughly 40 operations per key on average (namely 1 insertion and 
another 39 updates).

Following is the benchmark result: the highest is around 40k/s, the lowest is 
around 28k/s, and the average is around 35.5k/s.
See the attached "ManagedStateBenchmark.png" for a screenshot.




  was:
Following is the benchmark result: the highest is around 40k/s, the lowest is 
around 28k/s, and the average is around 35.5k/s.
See the attached "ManagedStateBenchmark.png" for a screenshot.





> State management benchmark
> --
>
> Key: APEXMALHAR-2205
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2205
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: bright chen
>Assignee: bright chen
> Attachments: ManagedStateBenchmark.png
>
>
> The key is generated by the following formula:
> (timestamp - timestamp % range) + random.nextInt(range)
> where the default value of range is 60k (1 minute).
> With the default "range" value, keys share the same prefix within one minute. 
> The process rate is around 40k/s (namely 2400k/min) and the key range is 60k, 
> so there are roughly 40 operations per key on average (namely 1 insertion and 
> another 39 updates).
> Following is the benchmark result: the highest is around 40k/s, the lowest is 
> around 28k/s, and the average is around 35.5k/s.
> See the attached "ManagedStateBenchmark.png" for a screenshot.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (APEXMALHAR-2205) State management benchmark

2016-09-12 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen updated APEXMALHAR-2205:

Description: 
Following is the benchmark result: the highest is around 40k/s, the lowest is 
around 28k/s, and the average is around 35.5k/s.
See the attached "ManagedStateBenchmark.png" for a screenshot.




  was:
Following is the benchmark result:





> State management benchmark
> --
>
> Key: APEXMALHAR-2205
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2205
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: bright chen
>Assignee: bright chen
> Attachments: ManagedStateBenchmark.png
>
>
> Following is the benchmark result: the highest is around 40k/s, the lowest is 
> around 28k/s, and the average is around 35.5k/s.
> See the attached "ManagedStateBenchmark.png" for a screenshot.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (APEXMALHAR-2205) State management benchmark

2016-09-12 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen updated APEXMALHAR-2205:

Attachment: (was: Screen Shot 2016-09-12 at 1.53.22 PM.png)

> State management benchmark
> --
>
> Key: APEXMALHAR-2205
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2205
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: bright chen
>Assignee: bright chen
> Attachments: ManagedStateBenchmark.png
>
>
> Following is the benchmark result:



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (APEXMALHAR-2205) State management benchmark

2016-09-12 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen updated APEXMALHAR-2205:

Attachment: ManagedStateBenchmark.png

> State management benchmark
> --
>
> Key: APEXMALHAR-2205
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2205
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: bright chen
>Assignee: bright chen
> Attachments: ManagedStateBenchmark.png
>
>
> Following is the benchmark result:



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (APEXMALHAR-2205) State management benchmark

2016-09-12 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen updated APEXMALHAR-2205:

Attachment: Screen Shot 2016-09-12 at 1.53.22 PM.png

screen shot of throughput

> State management benchmark
> --
>
> Key: APEXMALHAR-2205
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2205
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: bright chen
>Assignee: bright chen
> Attachments: Screen Shot 2016-09-12 at 1.53.22 PM.png
>
>
> Following is the benchmark result:



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (APEXMALHAR-2205) State management benchmark

2016-09-12 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen updated APEXMALHAR-2205:

Description: 
Following is the benchmark result:




> State management benchmark
> --
>
> Key: APEXMALHAR-2205
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2205
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: bright chen
>Assignee: bright chen
>
> Following is the benchmark result:



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (APEXMALHAR-2190) Use reusable buffer to serial spillable data structure

2016-09-12 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen updated APEXMALHAR-2190:

Description: 
The Spillable Data Structures create a lot of temporary memory to serialize 
data, plus a lot of memory copies (see SliceUtils.concatenate(byte[], byte[])), 
which uses up memory very quickly. See APEXMALHAR-2182.

Use a shared memory buffer to avoid allocating temporary memory and copying data.

Some basic ideas (a small sketch follows below):
- The SerToLVBuffer interface provides a method serTo(T object, LengthValueBuffer 
buffer): instead of creating memory and then returning the serialized data, this 
method lets the caller pass in the buffer, so different objects, or an object 
with embedded objects, can share the same LengthValueBuffer.

- LengthValueBuffer: a buffer that manages the memory as length and value (the 
generic format of serialized data). It provides a length-placeholder mechanism 
to avoid temporary memory and data copies when the length is only known after 
the data has been serialized.

- memory management classes: the interface ByteStream and its implementations 
Block, FixedBlock and BlocksStream, which provide a mechanism to dynamically 
allocate and manage memory with the functions listed below. I tried some other 
stream mechanisms such as ByteArrayInputStream, but they cannot meet the 3rd 
criterion and do not have good performance (50% loss).
  - dynamically allocate memory
  - reset memory for reuse
  - BlocksStream makes sure the output slices will not be changed when extra 
memory is needed; Block can change the reference of an output slice's buffer if 
data was moved due to memory reallocation (BlocksStream is the better solution).
  - WindowableBlocksStream extends BlocksStream and provides a function to reset 
memory window by window instead of resetting all memory. It keeps a certain 
amount of cache (as bytes) in memory.
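A minimal sketch of the serTo() idea above; the interface shape is taken from 
the description, while ListSerde and the writeInt call on the buffer are 
assumptions for illustration only:

// caller-owned buffer: nested serializers append into the same memory, so no
// intermediate byte arrays or copies are needed
public interface SerToLVBuffer<T> {
  void serTo(T object, LengthValueBuffer buffer);
}

public class ListSerde<T> implements SerToLVBuffer<java.util.List<T>> {
  private final SerToLVBuffer<T> itemSerde;

  public ListSerde(SerToLVBuffer<T> itemSerde) {
    this.itemSerde = itemSerde;
  }

  @Override
  public void serTo(java.util.List<T> list, LengthValueBuffer buffer) {
    buffer.writeInt(list.size());    // assumed primitive writer on the buffer
    for (T item : list) {
      itemSerde.serTo(item, buffer); // items serialize into the shared buffer
    }
  }
}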

  was:
The Spillable Data Structures create a lot of temporary memory to serialize 
data, plus a lot of memory copies (see SliceUtils.concatenate(byte[], byte[])), 
which uses up memory very quickly. See APEXMALHAR-2182.

Use a shared memory buffer to avoid allocating temporary memory and copying data.

Some basic ideas:
- The SerToLVBuffer interface provides a method serTo(T object, LengthValueBuffer 
buffer): instead of creating memory and then returning the serialized data, this 
method lets the caller pass in the buffer, so different objects, or an object 
with embedded objects, can share the same LengthValueBuffer.

- LengthValueBuffer: a buffer that manages the memory as length and value (the 
generic format of serialized data). It provides a length-placeholder mechanism 
to avoid temporary memory and data copies when the length is only known after 
the data has been serialized.

- memory management classes: the interface ByteStream and its implementations 
Block, FixedBlock and BlocksStream, which provide a mechanism to dynamically 
allocate and manage memory with the functions listed below. I tried some other 
stream mechanisms such as ByteArrayInputStream, but they cannot meet the 3rd 
criterion and do not have good performance (50% loss).
  - dynamically allocate memory
  - reset memory for reuse
  - BlocksStream makes sure the output slices will not be changed when extra 
memory is needed; Block can change the reference of an output slice's buffer if 
data was moved due to memory reallocation (BlocksStream is the better solution).



> Use reusable buffer to serial spillable data structure
> --
>
> Key: APEXMALHAR-2190
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2190
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: bright chen
>Assignee: bright chen
>   Original Estimate: 240h
>  Remaining Estimate: 240h
>
> Spillable Data Structure created lots of temporary memory to serial data lot 
> of of memory copy( see SliceUtils.concatenate(byte[], byte[]). Which used up 
> memory very quickly. See APEXMALHAR-2182.
> Use a shared memory to avoid allocate temporary memory and memory copy
> some basic ideas
> - SerToLVBuffer interface provides a method serTo(T object, LengthValueBuffer 
> buffer): instead of create a memory and then return the serialized data, this 
> method let the caller pass in the buffer. So different objects or object with 
> embed objects can share the same LengthValueBuffer
> - LengthValueBuffer: It is a buffer which manage the memory as length and 
> value(which is the generic format of serialized data). which provide length 
> placeholder mechanism to avoid temporary memory and data copy when the length 
> can be know after data serialized
> - memory management classes: includes interface ByteStream and it's 
> implementations: Block, FixedBlock, BlocksStream. Which provides a mechanism 
> to dynamic allocate and manage memory. Which basically provides following 
> function. I tried other some other 

[jira] [Updated] (APEXMALHAR-2182) benchmark for spillable data structure

2016-09-12 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen updated APEXMALHAR-2182:

Description: 
The state management benchmark: 
https://issues.apache.org/jira/browse/APEXMALHAR-2205

The spillable data structures have an OutOfMemory issue due to creating lots of 
temporary memory for serialization. 
https://issues.apache.org/jira/browse/APEXMALHAR-2190 gives a solution for this 
problem.


> benchmark for spillable data structure
> --
>
> Key: APEXMALHAR-2182
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2182
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: bright chen
>Assignee: bright chen
>   Original Estimate: 96h
>  Remaining Estimate: 96h
>
> The state management benchmark: 
> https://issues.apache.org/jira/browse/APEXMALHAR-2205
> The spillable data structures have an OutOfMemory issue due to creating lots 
> of temporary memory for serialization. 
> https://issues.apache.org/jira/browse/APEXMALHAR-2190 gives a solution for 
> this problem.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2205) State management benchmark

2016-09-01 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15456794#comment-15456794
 ] 

bright chen commented on APEXMALHAR-2205:
-

The throughput (almost the same on the cluster and on a local machine) is 
around 40k/second at the peak and 28k/second at the bottom. I ran it for about 
one hour and it was pretty stable.


> State management benchmark
> --
>
> Key: APEXMALHAR-2205
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2205
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: bright chen
>Assignee: bright chen
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (APEXMALHAR-2182) benchmark for spillable data structure

2016-08-31 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen updated APEXMALHAR-2182:

Summary: benchmark for spillable data structure  (was: benchmark for 
spillable data structure  and storage)

> benchmark for spillable data structure
> --
>
> Key: APEXMALHAR-2182
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2182
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: bright chen
>Assignee: bright chen
>   Original Estimate: 96h
>  Remaining Estimate: 96h
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2190) Use reusable buffer to serial spillable data structure

2016-08-26 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15440182#comment-15440182
 ] 

bright chen commented on APEXMALHAR-2190:
-

Confirmed with David; he will use SpillableMap to store the windows.

> Use reusable buffer to serial spillable data structure
> --
>
> Key: APEXMALHAR-2190
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2190
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: bright chen
>Assignee: bright chen
>   Original Estimate: 240h
>  Remaining Estimate: 240h
>
> The Spillable Data Structures create a lot of temporary memory to serialize 
> data, plus a lot of memory copies (see SliceUtils.concatenate(byte[], byte[])), 
> which uses up memory very quickly. See APEXMALHAR-2182.
> Use a shared memory buffer to avoid allocating temporary memory and copying data.
> Some basic ideas:
> - The SerToLVBuffer interface provides a method serTo(T object, LengthValueBuffer 
> buffer): instead of creating memory and then returning the serialized data, this 
> method lets the caller pass in the buffer, so different objects, or an object 
> with embedded objects, can share the same LengthValueBuffer.
> - LengthValueBuffer: a buffer that manages the memory as length and value (the 
> generic format of serialized data). It provides a length-placeholder mechanism 
> to avoid temporary memory and data copies when the length is only known after 
> the data has been serialized.
> - memory management classes: the interface ByteStream and its implementations 
> Block, FixedBlock and BlocksStream, which provide a mechanism to dynamically 
> allocate and manage memory with the functions listed below. I tried some other 
> stream mechanisms such as ByteArrayInputStream, but they cannot meet the 3rd 
> criterion and do not have good performance (50% loss).
>   - dynamically allocate memory
>   - reset memory for reuse
>   - BlocksStream makes sure the output slices will not be changed when extra 
> memory is needed; Block can change the reference of an output slice's buffer if 
> data was moved due to memory reallocation (BlocksStream is the better solution).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (APEXMALHAR-2190) Use reusable buffer to serial spillable data structure

2016-08-24 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen updated APEXMALHAR-2190:

Summary: Use reusable buffer to serial spillable data structure  (was: Use 
shared memory to serial spillable data structure)

> Use reusable buffer to serial spillable data structure
> --
>
> Key: APEXMALHAR-2190
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2190
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: bright chen
>Assignee: bright chen
>   Original Estimate: 240h
>  Remaining Estimate: 240h
>
> The Spillable Data Structures create a lot of temporary memory to serialize 
> data, plus a lot of memory copies (see SliceUtils.concatenate(byte[], byte[])), 
> which uses up memory very quickly. See APEXMALHAR-2182.
> Use a shared memory buffer to avoid allocating temporary memory and copying data.
> Some basic ideas:
> - The SerToLVBuffer interface provides a method serTo(T object, LengthValueBuffer 
> buffer): instead of creating memory and then returning the serialized data, this 
> method lets the caller pass in the buffer, so different objects, or an object 
> with embedded objects, can share the same LengthValueBuffer.
> - LengthValueBuffer: a buffer that manages the memory as length and value (the 
> generic format of serialized data). It provides a length-placeholder mechanism 
> to avoid temporary memory and data copies when the length is only known after 
> the data has been serialized.
> - memory management classes: the interface ByteStream and its implementations 
> Block, FixedBlock and BlocksStream, which provide a mechanism to dynamically 
> allocate and manage memory with the functions listed below. I tried some other 
> stream mechanisms such as ByteArrayInputStream, but they cannot meet the 3rd 
> criterion and do not have good performance (50% loss).
>   - dynamically allocate memory
>   - reset memory for reuse
>   - BlocksStream makes sure the output slices will not be changed when extra 
> memory is needed; Block can change the reference of an output slice's buffer if 
> data was moved due to memory reallocation (BlocksStream is the better solution).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2190) Use shared memory to serial spillable data structure

2016-08-22 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15431248#comment-15431248
 ] 

bright chen commented on APEXMALHAR-2190:
-

Regarding window support for memory management:
The data could be reset window by window instead of all at once. Window support 
could be managed by outside code, for example by tying each window to one 
instance of Block or BlockStream. A more convenient option is to add a function 
that supports windows.

> Use shared memory to serial spillable data structure
> 
>
> Key: APEXMALHAR-2190
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2190
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: bright chen
>Assignee: bright chen
>   Original Estimate: 240h
>  Remaining Estimate: 240h
>
> The Spillable Data Structures create a lot of temporary memory to serialize 
> data, plus a lot of memory copies (see SliceUtils.concatenate(byte[], byte[])), 
> which uses up memory very quickly. See APEXMALHAR-2182.
> Use a shared memory buffer to avoid allocating temporary memory and copying data.
> Some basic ideas:
> - The SerToLVBuffer interface provides a method serTo(T object, LengthValueBuffer 
> buffer): instead of creating memory and then returning the serialized data, this 
> method lets the caller pass in the buffer, so different objects, or an object 
> with embedded objects, can share the same LengthValueBuffer.
> - LengthValueBuffer: a buffer that manages the memory as length and value (the 
> generic format of serialized data). It provides a length-placeholder mechanism 
> to avoid temporary memory and data copies when the length is only known after 
> the data has been serialized.
> - memory management classes: the interface ByteStream and its implementations 
> Block, FixedBlock and BlocksStream, which provide a mechanism to dynamically 
> allocate and manage memory with the functions listed below. I tried some other 
> stream mechanisms such as ByteArrayInputStream, but they cannot meet the 3rd 
> criterion and do not have good performance (50% loss).
>   - dynamically allocate memory
>   - reset memory for reuse
>   - BlocksStream makes sure the output slices will not be changed when extra 
> memory is needed; Block can change the reference of an output slice's buffer if 
> data was moved due to memory reallocation (BlocksStream is the better solution).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2193) Implement SpillableByteArrayListMultimapImpl.remove(@Nullable Object key, @Nullable Object value)

2016-08-22 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15431168#comment-15431168
 ] 

bright chen commented on APEXMALHAR-2193:
-

All the updates in fact only apply to the in-memory data. For data already 
written to file(s), only the most recent file takes effect. When getting the 
value of a key, the value is first looked up in memory, then in the files 
ordered by time descending.

I had a solution for memory management, see PR 
https://github.com/apache/apex-malhar/pull/375 and JIRA 
https://issues.apache.org/jira/browse/APEXMALHAR-2190. Right now, the reset() 
method is called by client code (after 
AbstractManagedStateImpl.beforeCheckpoint(long) is called); it probably needs 
to be integrated with state management.

> Implement SpillableByteArrayListMultimapImpl.remove(@Nullable Object key, 
> @Nullable Object value)
> -
>
> Key: APEXMALHAR-2193
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2193
> Project: Apache Apex Malhar
>  Issue Type: Sub-task
>Reporter: David Yan
>Assignee: David Yan
>
> This is needed by SpillableSessionWindowedStorage. It needs a way to remove 
> the session window given the key in its internal keyToWindowsMap



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2193) Implement SpillableByteArrayListMultimapImpl.remove(@Nullable Object key, @Nullable Object value)

2016-08-19 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15429065#comment-15429065
 ] 

bright chen commented on APEXMALHAR-2193:
-

Actually, removing an entry would be very heavy, because it not only needs to 
be removed from memory but also from the files, since the get operation reads 
from memory first and then from the files.

Suggested solutions (a small sketch follows below):
- Add a 'deleted' flag to the entry. When deleting an entry, just set this flag 
to true. On get, fetch the entry, check the flag, and return null if the 
'deleted' flag is set.

- Add another spillable set (which can be implemented by a spillable map from 
key to a deleted flag) just for deleted keys. On get, check the deleted keys; 
if deleted, return null, otherwise get from the real map. On put, set the 
value ("deleted") in the deleted map to false (same idea as solution 1) and put 
into the real map. On delete, set the value ("deleted") in the deleted map to 
true. For a multimap, generate the deleted-map key from the original key and 
value.
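A rough sketch of the second option (a tombstone map); plain HashMaps stand in 
for the spillable maps here, and all names are illustrative rather than the 
actual Spillable API:

class TombstoneMap {
  // in practice both maps would be spillable (file-backed) structures
  private final java.util.Map<String, byte[]> real = new java.util.HashMap<>();
  private final java.util.Map<String, Boolean> deleted = new java.util.HashMap<>();

  void put(String key, byte[] value) {
    deleted.put(key, Boolean.FALSE);   // clear any earlier tombstone (same idea as solution 1)
    real.put(key, value);
  }

  byte[] get(String key) {
    if (Boolean.TRUE.equals(deleted.get(key))) {
      return null;                     // hidden by the tombstone, no need to consult the files
    }
    return real.get(key);              // normal lookup: memory first, then files
  }

  void remove(String key) {
    deleted.put(key, Boolean.TRUE);    // mark deleted without rewriting any file
  }
}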

> Implement SpillableByteArrayListMultimapImpl.remove(@Nullable Object key, 
> @Nullable Object value)
> -
>
> Key: APEXMALHAR-2193
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2193
> Project: Apache Apex Malhar
>  Issue Type: Sub-task
>Reporter: David Yan
>Assignee: David Yan
>
> This is needed by SpillableSessionWindowedStorage. It needs a way to remove 
> the session window given the key in its internal keyToWindowsMap



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

