Let’s say I have this:

KStream<String, CallRecord>[] branches = allRecords
    .branch(
            (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callRecord.getCallCommType()),
            (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRecord.getCallCommType()),
            (imsi, callRecord) -> true
    );
KStream<String, CallRecord> callRecords = branches[0];
KStream<String, CallRecord> dataRecords = branches[1];
KStream<String, CallRecord> callRecordCounter = branches[2];

callRecordCounter
        .map((imsi, callRecord) -> new KeyValue<>("", ""))
        .countByKey(
                UnlimitedWindows.of("counter-window"),
                stringSerde
        )
        .print();

Here I have 3 branches. Branch 0 is triggered if the record type is VOICE, 
branch 1 if it is DATA. Branch 2 is supposed to be triggered for every record, 
regardless of type, so that I can then count records over a time window. But 
the problem is that branch() is implemented like this:

private class KStreamBranchProcessor extends AbstractProcessor<K, V> {
    @Override
    public void process(K key, V value) {
        for (int i = 0; i < predicates.length; i++) {
            if (predicates[i].test(key, value)) {
                // use forward with childIndex here and then break the loop
                // so that no record is going to be piped to multiple streams
                context().forward(key, value, i);
                break;
            }
        }
    }
}

Note the break: a record is forwarded only to the first branch whose predicate 
matches, so VOICE and DATA records never reach the counter branch. I’d like to 
change the behavior of branch() so that all predicates are checked and no break 
happens, say in a branchAll() method. What’s the easiest way to add this 
functionality to the DSL? I tried process(), but it doesn’t return a KStream.
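
To make the difference concrete, here is a minimal plain-Java sketch (no Kafka 
dependency) of the two dispatch behaviors. The names BranchSemantics, 
branchFirstMatch and branchAll are hypothetical and are not part of the Kafka 
Streams API; each method only returns the indexes of the branches a record 
would be forwarded to.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

public class BranchSemantics {

    // Models the current branch(): stop at the first matching predicate.
    static <K, V> List<Integer> branchFirstMatch(
            K key, V value, List<BiPredicate<K, V>> predicates) {
        List<Integer> targets = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(key, value)) {
                targets.add(i);
                break; // the record goes to exactly one branch
            }
        }
        return targets;
    }

    // Models the desired branchAll(): test every predicate, no break.
    static <K, V> List<Integer> branchAll(
            K key, V value, List<BiPredicate<K, V>> predicates) {
        List<Integer> targets = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(key, value)) {
                targets.add(i); // the record may go to several branches
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        // Same three predicates as above, with the call type as the value.
        List<BiPredicate<String, String>> predicates = Arrays.asList(
                (imsi, type) -> "VOICE".equalsIgnoreCase(type),
                (imsi, type) -> "DATA".equalsIgnoreCase(type),
                (imsi, type) -> true);

        System.out.println(branchFirstMatch("imsi-1", "VOICE", predicates)); // [0]
        System.out.println(branchAll("imsi-1", "VOICE", predicates));        // [0, 2]
    }
}
```

Within the existing DSL, a similar effect can probably be had without 
branch() at all, by calling filter() on allRecords once per predicate and 
using allRecords itself as the input of the counter, though I have not 
verified that against this version.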

Ara.



