[ 
https://issues.apache.org/jira/browse/KAFKA-7640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719527#comment-16719527
 ] 

hitesh gollahalli bachanna commented on KAFKA-7640:
---------------------------------------------------

[~vvcephei] sure, I have attached some sceenshot that shows how the data and 
request are on different host. I will work on gathering actual debug logs from 
kafka internal class. 

There are 3 files.

1. intital_data_host.jpeg – shows that which consumer received data. The last 
column(host) shows that. But iI used the ip address to make the actual rest 
call, which are routable.

2. rest_api_call_host.jpeg – after I sent the message, I make a rest call, they 
land on some random node, host name are on the last column.

3. rest_redirect_host.jpeg – code check of the host is localSelf, if not send 
the request to ip/host to fetch the result, which is a different host compared 
to what is shown in the intital_data_host.jpeg file.

 

here is rest call code for reference 

 
{code:java}
        HostStoreInfo hostStoreInfo = 
metadataService.streamsMetadataForStoreAndKey(
                config.getMOVE_DATA_AGGS_STORE_NAME(),
                storeDpci,
                new JsonSerializer<>());
        System.out.println("host_info :" + hostStoreInfo + " for_key :" + 
storeDpci);

        MoveDataVo moveDataVo = null;
        System.out.println("localSelf :" + localSelf.host() + " remote_host:" + 
hostStoreInfo.getHost());
        
if (localSelf.host().equals(hostStoreInfo.getHost())) {
            ReadOnlyKeyValueStore<String, MoveDataVo> store = 
kafkaStreams.store(config.getMOVE_DATA_AGGS_STORE_NAME(), 
QueryableStoreTypes.<String, MoveDataVo>keyValueStore());
            if (store != null) {
                moveDataVo = store.get(storeDpci);
            }
        } else {
            String fooResourceUrl
                    = "http://"; + hostStoreInfo.getHost() + ":" + 8080 + 
"/dataaggs/v1/get_state?storedpci=" + storeDpci + "";
            ResponseEntity<MoveDataVo> response
                    = restTemplate.getForEntity(fooResourceUrl + "", 
MoveDataVo.class);
            moveDataVo = response.getBody();
            System.out.println("response_from_other: " + moveDataVo);
        }
{code}
 

 

 

> Kafka stream interactive query not returning data when state is backed by 
> rocksdb
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-7640
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7640
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.0
>            Reporter: hitesh gollahalli bachanna
>            Priority: Major
>         Attachments: intital_data_host.jpeg, rest_api_call_host.jpeg, 
> rest_redirect_host.jpeg
>
>
> I have a kafka stream app running with 36 different instance (one for each 
> partition). Each instance come up one after the other. And I am building rest 
> service on top of the state to access the data.
> Here some code that I use:
> {code:java}
> StreamsMetadata metadata = streams.metadataForKey(store, key, serializer); 
> --> call this find ouy which host has the key
> if (localSelf.host().equals(hostStoreInfo.getHost())) {
> get the key from local store
> }
> else {
> call the remote host using restTemplate
> }{code}
> The problem now is `metadata` object returned has a different host/ip but the 
> data is on a different node. I was able to see using some application logs I 
> printed. This happens every time I start my application.
> The `allMetadata` method in `KafkaStreams` class says the value will be 
> update as when the partition get reassigned. But its not happening in this 
> case. 
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to