Re: KTable.filter usage, memory consumption and materialized view semantics

2016-06-25 Thread Guozhang Wang
Thanks! You can follow this step-by-step guide to contribute to Kafka
via GitHub.

https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes#ContributingCodeChanges-PullRequest


Guozhang


On Sat, Jun 25, 2016 at 8:40 PM, Philippe Derome  wrote:

> I have a one-liner solution for this in KTableFilter.java and about 5-6 lines
> of changes to the existing unit test KTableFilterTest.testSendingOldValue. I
> included those lines with context in the JIRA. I am struggling a bit with
> GitHub, being new to it, and with how to do a proper pull request, so
> hopefully that can be followed up by you? I had the streams test suite pass
> aside from a few cases that pertain specifically to this JIRA, as assumptions
> have now changed.
>
> On Sat, Jun 25, 2016 at 1:14 AM, Guozhang Wang  wrote:
>
> > Hi Philippe,
> >
> > Great, since you agree with my reasonings, I have created a JIRA ticket
> for
> > optimizing KTableFilter (feel free to pick it up if you are interested in
> > contributing):
> >
> > https://issues.apache.org/jira/browse/KAFKA-3902
> >
> > About case 3-c-1), what I meant is that since the "predicate returns true
> > on both", the resulting pair would just be the same as the original pair.
> >
> > About KIP-63: it is itself a rather big story, but it has one
> > correspondence to this JIRA: with caching you can dedup some records with
> > the same key. For example, if the input records to the KTable are:
> >
> > <K, 1>, <K, 2>, <K, 3>, <K, 4>, <K, 5>, <K, 6> ...
> >
> > And the KTable is materialized into a state store with a cache on top of
> > it, then the resulting downstream output could be:
> >
> > <K, {null -> 1}>, <K, {5 -> 6}> ...
> >
> > Instead of
> >
> > <K, {null -> 1}>, <K, {1 -> 2}>, <K, {2 -> 3}>, ... <K, {5 -> 6}> ...
> >
> > So if it is piped to a filter() operator, then even less data will be
> > produced.
> >
> >
> > Guozhang
> >
> >
> > On Fri, Jun 24, 2016 at 5:58 PM, Philippe Derome 
> > wrote:
> >
> > > Yes, it looks very good. Your detailed explanation is compelling enough
> > > to suggest that much of the complexity of a streams system is inherent
> > > complexity (not that I dared assume it was "easy", but I could afford to
> > > be conveniently unaware). It took me 30 minutes to grasp this latest
> > > response.
> > >
> > > There might be a typo in your email for case 3.c.1) as I would think we
> > > should send the most recent pair as opposed to original, in any event
> it
> > > does not materially impact your presentation.
> > >
> > > Your case 3a) is really what triggered my line of questioning, and I
> > > found the current behaviour vexing as it may lead to some undesirable yet
> > > necessary filter (see Michael G. Noll's fix in UserRegionLambdaExample at
> > > the very end, weeding out nulls) used when outputting to a topic or to
> > > the console. Without looking at the design, it seemed self-evident to me
> > > that the 3a) behaviour had to be implemented (from my point of view, with
> > > the code example I was looking at, it simply means never say to delete a
> > > key that was never created; simply don't "create a deleted" key).
> > >
> > > Likewise cases 3 b,c look very reasonable.
> > >
> > > Just out of curiosity, did you effectively just restate the essence of
> > > KIP-63 in a more approachable language I could understand or is KIP-63
> > > really a different beast?
> > >
> > >
> > >
> > > On Fri, Jun 24, 2016 at 5:45 PM, Guozhang Wang 
> > wrote:
> > >
> > > > Hello Philippe,
> > > >
> > > > Very good points, let me dump my thoughts about "KTable.filter"
> > > > specifically and how we can improve on that:
> > > >
> > > > 1. Some context: when a KTable participates in downstream operators
> > > > (e.g. if that operator is an aggregation), then we need to materialize
> > > > this KTable and send both its old value as well as new value as a pair
> > > > {old -> new} to the downstream operator. In practice it usually needs
> > > > to send the pair.
> > > >
> > > > So let's discuss them separately. Take the following example source
> > > > stream for your KTable:
> > > >
> > > > <K, 1>, <K, 2>, <K, 3> ...
> > > >
> > > > When the KTable needs to be materialized, it will transform the source
> > > > messages into the pairs of:
> > > >
> > > > <K, {null -> 1}>, <K, {1 -> 2}>, <K, {2 -> 3}>
> > > >
> > > > 2. If "send old value" is not enabled, then when the filter predicate
> > > > returns false, we MUST send a <K, null> to the downstream operator to
> > > > indicate that this key is being filtered out of the table. Otherwise,
> > > > for example if your filter is "value < 2", then the updated value
> > > > <K, 2> will just be filtered, resulting in incorrect semantics.
> > > >
> > > > If it returns true we should still send the original <K, V> to
> > > > downstream operators.
> > > >
> > > > 3. If "send old value" is enabled, then there are a couple of cases we
> > > > can consider:
> > > >
> > > > a. If old value is <K, null> and new value is <K, V not-null>, and
> > > > the filter predicate returns false for the new value, then it is safe
> > > > to optimize and not return anything to the downstream operator, since
> > > > we know there is no value for the key previously anyway; otherwise we
> > > > send the original pair.

Re: KTable.filter usage, memory consumption and materialized view semantics

2016-06-25 Thread Philippe Derome
I have a one-liner solution for this in KTableFilter.java and about 5-6 lines
of changes to the existing unit test KTableFilterTest.testSendingOldValue. I
included those lines with context in the JIRA. I am struggling a bit with
GitHub, being new to it, and with how to do a proper pull request, so hopefully
that can be followed up by you? I had the streams test suite pass aside from
a few cases that pertain specifically to this JIRA, as assumptions have now
changed.

On Sat, Jun 25, 2016 at 1:14 AM, Guozhang Wang  wrote:

> Hi Philippe,
>
> Great, since you agree with my reasonings, I have created a JIRA ticket for
> optimizing KTableFilter (feel free to pick it up if you are interested in
> contributing):
>
> https://issues.apache.org/jira/browse/KAFKA-3902
>
> About case 3-c-1), what I meant is that since the "predicate returns true on
> both", the resulting pair would just be the same as the original pair.
>
> About KIP-63: it is itself a rather big story, but it has one correspondence
> to this JIRA: with caching you can dedup some records with the same key.
> For example, if the input records to the KTable are:
>
> <K, 1>, <K, 2>, <K, 3>, <K, 4>, <K, 5>, <K, 6> ...
>
> And the KTable is materialized into a state store with a cache on top of it,
> then the resulting downstream output could be:
>
> <K, {null -> 1}>, <K, {5 -> 6}> ...
>
> Instead of
>
> <K, {null -> 1}>, <K, {1 -> 2}>, <K, {2 -> 3}>, ... <K, {5 -> 6}> ...
>
> So if it is piped to a filter() operator, then even less data will be
> produced.
>
>
> Guozhang
>
>
> On Fri, Jun 24, 2016 at 5:58 PM, Philippe Derome 
> wrote:
>
> > Yes, it looks very good. Your detailed explanation is compelling enough
> > to suggest that much of the complexity of a streams system is inherent
> > complexity (not that I dared assume it was "easy", but I could afford to
> > be conveniently unaware). It took me 30 minutes to grasp this latest
> > response.
> >
> > There might be a typo in your email for case 3.c.1) as I would think we
> > should send the most recent pair as opposed to original, in any event it
> > does not materially impact your presentation.
> >
> > Your case 3a) is really what triggered my line of questioning, and I found
> > the current behaviour vexing as it may lead to some undesirable yet
> > necessary filter (see Michael G. Noll's fix in UserRegionLambdaExample at
> > the very end, weeding out nulls) used when outputting to a topic or to the
> > console. Without looking at the design, it seemed self-evident to me that
> > the 3a) behaviour had to be implemented (from my point of view, with the
> > code example I was looking at, it simply means never say to delete a key
> > that was never created; simply don't "create a deleted" key).
> >
> > Likewise cases 3 b,c look very reasonable.
> >
> > Just out of curiosity, did you effectively just restate the essence of
> > KIP-63 in a more approachable language I could understand or is KIP-63
> > really a different beast?
> >
> >
> >
> > On Fri, Jun 24, 2016 at 5:45 PM, Guozhang Wang 
> wrote:
> >
> > > Hello Philippe,
> > >
> > > Very good points, let me dump my thoughts about "KTable.filter"
> > > specifically and how we can improve on that:
> > >
> > > 1. Some context: when a KTable participates in downstream operators
> > > (e.g. if that operator is an aggregation), then we need to materialize
> > > this KTable and send both its old value as well as new value as a pair
> > > {old -> new} to the downstream operator. In practice it usually needs to
> > > send the pair.
> > >
> > > So let's discuss them separately. Take the following example source
> > > stream for your KTable:
> > >
> > > <K, 1>, <K, 2>, <K, 3> ...
> > >
> > > When the KTable needs to be materialized, it will transform the source
> > > messages into the pairs of:
> > >
> > > <K, {null -> 1}>, <K, {1 -> 2}>, <K, {2 -> 3}>
> > >
> > > 2. If "send old value" is not enabled, then when the filter predicate
> > > returns false, we MUST send a <K, null> to the downstream operator to
> > > indicate that this key is being filtered out of the table. Otherwise, for
> > > example if your filter is "value < 2", then the updated value <K, 2>
> > > will just be filtered, resulting in incorrect semantics.
> > >
> > > If it returns true we should still send the original <K, V> to
> > > downstream operators.
> > >
> > > 3. If "send old value" is enabled, then there are a couple of cases we
> > > can consider:
> > >
> > > a. If old value is <K, null> and new value is <K, V not-null>, and
> > > the filter predicate returns false for the new value, then in this case
> > > it is safe to optimize and not return anything to the downstream
> > > operator, since in this case we know there is no value for the key
> > > previously anyway; otherwise we send the original pair.
> > >
> > > b. If old value is <K, V not-null> and new value is <K, null>,
> > > indicating to delete this key, and the filter predicate returns false
> > > for the old value, then in this case it is safe to optimize and not
> > > return anything to the downstream operator, since we know that the old
> > > value has already been filtered in a previous message; otherwise we send
> > > the original pair.

[jira] [Commented] (KAFKA-3902) Optimize KTable.filter() to reduce unnecessary traffic

2016-06-25 Thread Phil Derome (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15349932#comment-15349932
 ] 

Phil Derome commented on KAFKA-3902:


One-liner fix, but I am unfamiliar with GitHub and with submitting a proper
pull request. Code is below; I used 10.0.0 as a base.

Files: KTableFilter.java (one-liner), KTableFilterTest.java (4 lines of changes
or so), MockProcessorSupplier.java (new one-liner method)

KTableFilter.java
    private class KTableFilterProcessor extends AbstractProcessor<K, Change<V>> {

        @Override
        public void process(K key, Change<V> change) {
            V newValue = computeValue(key, change.newValue);
            V oldValue = sendOldValues ? computeValue(key, change.oldValue) : null;

            // unnecessary to forward here: both old and new values were filtered out
            if (sendOldValues && oldValue == null && newValue == null) return;

            context().forward(key, new Change<>(newValue, oldValue));
        }

    }

KTableFilterTest.java
    @Test
    public void testSendingOldValue() throws IOException {
        KStreamBuilder builder = new KStreamBuilder();

        String topic1 = "topic1";

        KTableImpl<String, Integer, Integer> table1 =
            (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1);
        KTableImpl<String, Integer, Integer> table2 =
            (KTableImpl<String, Integer, Integer>) table1.filter(
                new Predicate<String, Integer>() {
                    @Override
                    public boolean test(String key, Integer value) {
                        return (value % 2) == 0;
                    }
                });

        table2.enableSendingOldValues();

        MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
        MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();

        builder.addProcessor("proc1", proc1, table1.name);
        builder.addProcessor("proc2", proc2, table2.name);

        driver = new KStreamTestDriver(builder, stateDir, null, null);

        driver.process(topic1, "A", 1);
        driver.process(topic1, "B", 1);
        driver.process(topic1, "C", 1);

        proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
        proc2.checkEmpty(); // nothing downstream: all inputs are odd, hence filtered out

        driver.process(topic1, "A", 2);
        driver.process(topic1, "B", 2);

        proc1.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
        proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)"); // 2 makes it through for both A and B

        driver.process(topic1, "A", 3);

        proc1.checkAndClearProcessResult("A:(3<-2)");
        proc2.checkAndClearProcessResult("A:(null<-2)"); // 3 is filtered, so downstream A is deleted; no change for B

        driver.process(topic1, "A", null);
        driver.process(topic1, "B", null);

        proc1.checkAndClearProcessResult("A:(null<-3)", "B:(null<-2)");
        proc2.checkAndClearProcessResult("B:(null<-2)"); // only B forwards a delete; A's old value was already filtered
    }

MockProcessorSupplier.java:
public void checkEmpty() {
assertEquals("the number of outputs:", 0, processed.size());
}


> Optimize KTable.filter() to reduce unnecessary traffic
> --
>
> Key: KAFKA-3902
> URL: https://issues.apache.org/jira/browse/KAFKA-3902
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: architecture, performance
>
> {{KTable.filter()}} operator is implemented in {{KTableFilter}}, and can be 
> optimized to reduce unnecessary data traffic to downstream operators. More 
> specifically:
> 1. Some context: when a KTable participates in downstream operators (e.g. 
> if that operator is an aggregation), then we need to materialize this KTable 
> and send both its old value as well as new value as a pair {old -> new} to 
> the downstream operator. In practice it usually needs to send the pair. 
> So let's discuss them separately; take the following example source 
> stream for your KTable:
> {{<K, 1>, <K, 2>, <K, 3> ...}}
> When the KTable needs to be materialized, it will transform the source 
> messages into the pairs of:
> {{<K, \{null -> 1\}>, <K, \{1 -> 2\}>, <K, \{2 -> 3\}>}}
> 2. If "send old value" is not enabled, then when the filter predicate returns 
> false, we MUST send a <K, null> to the downstream operator to indicate that 
> this key is being filtered out of the table. Otherwise, for example if your 
> filter is "value < 2", then the updated value <K, 2> will just be filtered, 
> resulting in incorrect semantics.
> If it returns true we should still send the original <K, V> to 
> downstream operators.
> 3. If "send old value" is enabled, then there are a couple of cases we can 
> consider:
> a. If old value is <K, null> and new value is <K, V not-null>, and the 
> filter predicate returns false for the new value, then it is safe to optimize 
> and not return anything to the downstream operator, since we know there is no 
> value for the key previously anyway; otherwise we send the original pair.

[jira] [Issue Comment Deleted] (KAFKA-3902) Optimize KTable.filter() to reduce unnecessary traffic

2016-06-25 Thread Phil Derome (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Phil Derome updated KAFKA-3902:
---
Comment: was deleted

(was: Can we remove, in KTableFilterTest.testSendingOldValue, the references to 
proc2 affected by this ticket? Specifically the following ones (towards the end 
of the file):

   proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", 
"C:(null<-null)");
   proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
   proc2.checkAndClearProcessResult("A:(null<-2)");
   proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-2)");

My tentative fix invalidates these.)

> Optimize KTable.filter() to reduce unnecessary traffic
> --
>
> Key: KAFKA-3902
> URL: https://issues.apache.org/jira/browse/KAFKA-3902
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: architecture, performance
>
> {{KTable.filter()}} operator is implemented in {{KTableFilter}}, and can be 
> optimized to reduce unnecessary data traffic to downstream operators. More 
> specifically:
> 1. Some context: when a KTable participates in downstream operators (e.g. 
> if that operator is an aggregation), then we need to materialize this KTable 
> and send both its old value as well as new value as a pair {old -> new} to 
> the downstream operator. In practice it usually needs to send the pair. 
> So let's discuss them separately; take the following example source 
> stream for your KTable:
> {{<K, 1>, <K, 2>, <K, 3> ...}}
> When the KTable needs to be materialized, it will transform the source 
> messages into the pairs of:
> {{<K, \{null -> 1\}>, <K, \{1 -> 2\}>, <K, \{2 -> 3\}>}}
> 2. If "send old value" is not enabled, then when the filter predicate returns 
> false, we MUST send a <K, null> to the downstream operator to indicate that 
> this key is being filtered out of the table. Otherwise, for example if your 
> filter is "value < 2", then the updated value <K, 2> will just be filtered, 
> resulting in incorrect semantics.
> If it returns true we should still send the original <K, V> to 
> downstream operators.
> 3. If "send old value" is enabled, then there are a couple of cases we can 
> consider:
> a. If old value is <K, null> and new value is <K, V not-null>, and the 
> filter predicate returns false for the new value, then it is safe to optimize 
> and not return anything to the downstream operator, since in this case we 
> know there is no value for the key previously anyway; otherwise we send the 
> original pair.
> b. If old value is <K, V not-null> and new value is <K, null>, 
> indicating to delete this key, and the filter predicate returns false for the 
> old value, then it is safe to optimize and not return anything to the 
> downstream operator, since we know that the old value has already been 
> filtered in a previous message; otherwise we send the original pair.
> c. If both old and new values are not null, and:
> 1) the predicate returns true on both: send the original pair;
> 2) the predicate returns false on both: we can optimize and not send 
> anything;
> 3) the predicate returns true on old and false on new: send the key: 
> \{old -> null\};
> 4) the predicate returns false on old and true on new: send the key: 
> \{null -> new\};



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1554: MINOR: Follow-up from KAFKA-3842 with suggested fi...

2016-06-25 Thread bbejeck
GitHub user bbejeck opened a pull request:

https://github.com/apache/kafka/pull/1554

MINOR: Follow-up from KAFKA-3842 with suggested fixes to creating temp
directories, waitForCondition

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bbejeck/kafka follow_up_for_KAFKA-3842

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1554.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1554


commit 48336f953a8f93a705aa756aa0b95ef356599ee0
Author: bbejeck 
Date:   2016-06-26T01:24:08Z

MINOR: Follow-up from KAFKA-3842 with suggested fixes to creating temp 
directories, waitForCondition




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3842) Add Helper Functions Into TestUtils

2016-06-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3842?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15349914#comment-15349914
 ] 

ASF GitHub Bot commented on KAFKA-3842:
---

GitHub user bbejeck opened a pull request:

https://github.com/apache/kafka/pull/1554

MINOR: Follow-up from KAFKA-3842 with suggested fixes to creating temp
directories, waitForCondition

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bbejeck/kafka follow_up_for_KAFKA-3842

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1554.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1554


commit 48336f953a8f93a705aa756aa0b95ef356599ee0
Author: bbejeck 
Date:   2016-06-26T01:24:08Z

MINOR: Follow-up from KAFKA-3842 with suggested fixes to creating temp 
directories, waitForCondition




> Add Helper Functions Into TestUtils
> ---
>
> Key: KAFKA-3842
> URL: https://issues.apache.org/jira/browse/KAFKA-3842
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.10.1.0
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
> Fix For: 0.10.1.0
>
>
> Per guidance from [~guozhang] from PR #1477 move helper functions from 
> RegexSourceIntegrationTest (getProducerConfig, getConsumerConfig, 
> getStreamsConfig into TestUtils and parameterize as appropriate.  Also look 
> into adding a {{waitUntil(Condition condition)}} type construct to wait for a 
> condition to be met without relying on using Thread.sleep



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3902) Optimize KTable.filter() to reduce unnecessary traffic

2016-06-25 Thread Phil Derome (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15349872#comment-15349872
 ] 

Phil Derome commented on KAFKA-3902:


Can we remove, in KTableFilterTest.testSendingOldValue, the references to proc2 
affected by this ticket? Specifically the following ones (towards the end of the 
file):

   proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", 
"C:(null<-null)");
   proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
   proc2.checkAndClearProcessResult("A:(null<-2)");
   proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-2)");

My tentative fix invalidates these.

> Optimize KTable.filter() to reduce unnecessary traffic
> --
>
> Key: KAFKA-3902
> URL: https://issues.apache.org/jira/browse/KAFKA-3902
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: architecture, performance
>
> {{KTable.filter()}} operator is implemented in {{KTableFilter}}, and can be 
> optimized to reduce unnecessary data traffic to downstream operators. More 
> specifically:
> 1. Some context: when a KTable participates in downstream operators (e.g. 
> if that operator is an aggregation), then we need to materialize this KTable 
> and send both its old value as well as new value as a pair {old -> new} to 
> the downstream operator. In practice it usually needs to send the pair. 
> So let's discuss them separately; take the following example source 
> stream for your KTable:
> {{<K, 1>, <K, 2>, <K, 3> ...}}
> When the KTable needs to be materialized, it will transform the source 
> messages into the pairs of:
> {{<K, \{null -> 1\}>, <K, \{1 -> 2\}>, <K, \{2 -> 3\}>}}
> 2. If "send old value" is not enabled, then when the filter predicate returns 
> false, we MUST send a <K, null> to the downstream operator to indicate that 
> this key is being filtered out of the table. Otherwise, for example if your 
> filter is "value < 2", then the updated value <K, 2> will just be filtered, 
> resulting in incorrect semantics.
> If it returns true we should still send the original <K, V> to 
> downstream operators.
> 3. If "send old value" is enabled, then there are a couple of cases we can 
> consider:
> a. If old value is <K, null> and new value is <K, V not-null>, and the 
> filter predicate returns false for the new value, then it is safe to optimize 
> and not return anything to the downstream operator, since in this case we 
> know there is no value for the key previously anyway; otherwise we send the 
> original pair.
> b. If old value is <K, V not-null> and new value is <K, null>, 
> indicating to delete this key, and the filter predicate returns false for the 
> old value, then it is safe to optimize and not return anything to the 
> downstream operator, since we know that the old value has already been 
> filtered in a previous message; otherwise we send the original pair.
> c. If both old and new values are not null, and:
> 1) the predicate returns true on both: send the original pair;
> 2) the predicate returns false on both: we can optimize and not send 
> anything;
> 3) the predicate returns true on old and false on new: send the key: 
> \{old -> null\};
> 4) the predicate returns false on old and true on new: send the key: 
> \{null -> new\};



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3740) Add configs for RocksDBStores

2016-06-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15349772#comment-15349772
 ] 

ASF GitHub Bot commented on KAFKA-3740:
---

GitHub user HenryCaiHaiying opened a pull request:

https://github.com/apache/kafka/pull/1553

KAFKA-3740: Add configs for RocksDBStore

This is part I of the work to add the StreamsConfig to ProcessorContext.

We need to access StreamsConfig in the ProcessorContext so that other components 
(e.g. RocksDBWindowStore or LRUCache) can retrieve config parameters from the 
application.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HenryCaiHaiying/kafka config

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1553.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1553


commit 12973c2d090268db416417b18dfbbe3d96f6a9d5
Author: Henry Cai 
Date:   2016-06-25T20:19:42Z

KAFKA-3740: Add configs for RocksDBStore

This is part I of the work to add the StreamsConfig to ProcessorContext.

We need to access StreamsConfig in the ProcessorContext so that other components 
(e.g. RocksDBWindowStore or LRUCache) can retrieve config parameters from the 
application.
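The idea can be sketched roughly as follows; all names here (`StoreContext`, `appConfigs`, the config key) are hypothetical stand-ins for illustration, not the real Kafka Streams API:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of passing application config to a store via its context;
// interface and key names do not match the real Kafka Streams API.
public class ConfigContextSketch {
    interface StoreContext {
        Map<String, Object> appConfigs();
    }

    static final class SimpleContext implements StoreContext {
        private final Map<String, Object> configs;
        SimpleContext(Map<String, Object> configs) { this.configs = configs; }
        public Map<String, Object> appConfigs() { return configs; }
    }

    // A store that reads a tuning parameter from its context instead of a hard-coded value.
    static final class SketchStore {
        final long blockCacheSizeBytes;
        SketchStore(StoreContext context) {
            Object v = context.appConfigs().get("rocksdb.block.cache.size.bytes"); // hypothetical key
            this.blockCacheSizeBytes = (v != null) ? ((Number) v).longValue() : 16 * 1024 * 1024L; // fallback default
        }
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("rocksdb.block.cache.size.bytes", 64L * 1024 * 1024);
        SketchStore store = new SketchStore(new SimpleContext(configs));
        System.out.println(store.blockCacheSizeBytes); // 67108864
    }
}
```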




> Add configs for RocksDBStores
> -
>
> Key: KAFKA-3740
> URL: https://issues.apache.org/jira/browse/KAFKA-3740
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Henry Cai
>  Labels: api, newbie
>
> Today most of the rocksDB configs are hard written inside {{RocksDBStore}}, 
> or the default values are directly used. We need to make them configurable 
> for advanced users. For example, some default values may not work perfectly 
> for some scenarios: 
> https://github.com/HenryCaiHaiying/kafka/commit/ccc4e25b110cd33eea47b40a2f6bf17ba0924576
>  
> One way of doing that is to introduce a "RocksDBStoreConfigs" objects similar 
> to "StreamsConfig", which defines all related rocksDB options configs, that 
> can be passed as key-value pairs to "StreamsConfig".



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1553: KAFKA-3740: Add configs for RocksDBStore

2016-06-25 Thread HenryCaiHaiying
GitHub user HenryCaiHaiying opened a pull request:

https://github.com/apache/kafka/pull/1553

KAFKA-3740: Add configs for RocksDBStore

This is part I of the work to add the StreamsConfig to ProcessorContext.

We need to access StreamsConfig in the ProcessorContext so that other components 
(e.g. RocksDBWindowStore or LRUCache) can retrieve config parameters from the 
application.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HenryCaiHaiying/kafka config

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1553.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1553


commit 12973c2d090268db416417b18dfbbe3d96f6a9d5
Author: Henry Cai 
Date:   2016-06-25T20:19:42Z

KAFKA-3740: Add configs for RocksDBStore

This is part I of the work to add the StreamsConfig to ProcessorContext.

We need to access StreamsConfig in the ProcessorContext so that other components 
(e.g. RocksDBWindowStore or LRUCache) can retrieve config parameters from the 
application.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (KAFKA-3903) Convert tests to use static helper methods for Consumer/Producer/StreamsConfigs setup

2016-06-25 Thread Bill Bejeck (JIRA)
Bill Bejeck created KAFKA-3903:
--

 Summary: Convert tests to use static helper methods for 
Consumer/Producer/StreamsConfigs setup
 Key: KAFKA-3903
 URL: https://issues.apache.org/jira/browse/KAFKA-3903
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Bill Bejeck
Assignee: Bill Bejeck
 Fix For: 0.10.1.0


There are several unit/integration tests where we create 
Consumer/Producer/Streams configs. All of these calls essentially create the 
same configs over and over. We should migrate these config setups to use the 
static helper methods TestUtils.consumerConfigs, TestUtils.producerConfigs, and 
StreamsTestUtils.getStreamsConfigs.
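A helper of the proposed kind might look roughly like this; the method name and signature are assumptions for illustration (the real TestUtils helpers may differ), though the property keys used are the standard Kafka consumer config names:

```java
import java.util.Properties;

// Sketch of a shared test-config helper, in the spirit of the proposed
// TestUtils.consumerConfigs; actual signatures in Kafka's TestUtils may differ.
public class ConfigHelperSketch {
    static Properties consumerConfigs(String bootstrapServers, String groupId,
                                      String keyDeserializer, String valueDeserializer) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", keyDeserializer);
        props.put("value.deserializer", valueDeserializer);
        props.put("auto.offset.reset", "earliest"); // a common default for tests
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerConfigs("localhost:9092", "test-group",
                "org.apache.kafka.common.serialization.StringDeserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        System.out.println(props.getProperty("group.id")); // test-group
    }
}
```

Each test then calls the helper once and overrides only the properties it actually cares about.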



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: delay of producer and consumer in kafka 0.9 is too big to be accepted

2016-06-25 Thread Jay Kreps
Can you sanity check this with the end-to-end latency test that ships with
Kafka in the tools package?

https://apache.googlesource.com/kafka/+/1769642bb779921267bd57d3d338591dbdf33842/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala

On Saturday, June 25, 2016, Kafka  wrote:

> Hi all,
> my kafka cluster is composed of three brokers, each with an 8-core
> CPU, 8 GB of memory, and a 1 Gbit network card.
> with the java async client, I sent 100 messages of 1024 bytes
> each; the gap between sends is 20us. the consumer's config is:
> fetch.min.bytes is set to 1, fetch.wait.max.ms is set to 100.
> to avoid clock inconsistency between two machines, I start the
> producer and consumer on the same machine; that machine has enough
> resources to satisfy these two clients.
>
> I start the consumer before the producer on each test. with the
> sending timestamp embedded in each message, when the consumer receives a
> message I can get the consumer delay by subtracting the sending timestamp
> from the current timestamp.
> when I set acks to 0 and replicas to 2, the average producer delay
> is 2.98ms and the average consumer delay is 52.23ms.
> when I set acks to 1 and replicas to 2, the average producer delay
> is 3.9ms and the average consumer delay is 44.88ms.
> when I set acks to -1 and replicas to 2, the average producer
> delay is 1782ms and the average consumer delay is 1786ms.
>
> I have two doubts. the first is why my consumer's delay with acks
> set to 0 is larger than the consumer delay with acks set to 1.
> the second is why the delay of the producer and consumer is so big when I
> set acks to -1; I think this delay cannot be accepted.
> and I found this delay is amplified when sending more messages.
>
> any feedback is appreciated.
> thanks
>
>
>
>


delay of producer and consumer in kafka 0.9 is too big to be accepted

2016-06-25 Thread Kafka
Hi all,
my kafka cluster is composed of three brokers, each with an 8-core CPU, 
8 GB of memory, and a 1 Gbit network card.
with the java async client, I sent 100 messages of 1024 bytes each; the 
gap between sends is 20us. the consumer's config is: fetch.min.bytes is set to 
1, fetch.wait.max.ms is set to 100.
to avoid clock inconsistency between two machines, I start the producer 
and consumer on the same machine; that machine has enough resources to satisfy 
these two clients.

I start the consumer before the producer on each test. with the sending 
timestamp embedded in each message, when the consumer receives a message I can 
get the consumer delay by subtracting the sending timestamp from the current 
timestamp.
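The measurement described above (embed the send timestamp in each message, subtract it from the receive timestamp) can be sketched without any Kafka dependency; the names here are illustrative only:

```java
import java.nio.ByteBuffer;

// Sketch of the timestamp-in-payload latency measurement described above;
// no Kafka client involved, just the encode/decode arithmetic.
public class LatencySketch {
    // Producer side: prefix the payload with the send timestamp (millis).
    static byte[] encode(long sendTimestampMs, byte[] payload) {
        return ByteBuffer.allocate(8 + payload.length)
                .putLong(sendTimestampMs)
                .put(payload)
                .array();
    }

    // Consumer side: delay = receive time minus the embedded send time.
    static long delayMs(byte[] record, long receiveTimestampMs) {
        long sentAt = ByteBuffer.wrap(record).getLong();
        return receiveTimestampMs - sentAt;
    }

    public static void main(String[] args) {
        byte[] record = encode(1_000L, new byte[1024]);
        System.out.println(delayMs(record, 1_052L)); // 52
    }
}
```

Note that if producer and consumer run on different machines, this subtraction also measures clock skew, which is why running both clients on the same machine (as above) matters.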
when I set acks to 0 and replicas to 2, the average producer delay is 
2.98ms and the average consumer delay is 52.23ms.
when I set acks to 1 and replicas to 2, the average producer delay is 
3.9ms and the average consumer delay is 44.88ms.
when I set acks to -1 and replicas to 2, the average producer delay is 
1782ms and the average consumer delay is 1786ms.

I have two doubts. the first is why my consumer's delay with acks set 
to 0 is larger than the consumer delay with acks set to 1.
the second is why the delay of the producer and consumer is so big when I set 
acks to -1; I think this delay cannot be accepted.
and I found this delay is amplified when sending more messages.

any feedback is appreciated. 
thanks