[
https://issues.apache.org/jira/browse/KAFKA-7640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719527#comment-16719527
]
hitesh gollahalli bachanna commented on KAFKA-7640:
---------------------------------------------------
[~vvcephei] Sure, I have attached some screenshots that show how the data and
the request end up on different hosts. I will work on gathering actual debug
logs from the Kafka internal classes.
There are 3 files.
1. intital_data_host.jpeg – shows which consumer received the data; the last
column (host) identifies it. I used the IP addresses, which are routable, to
make the actual REST calls.
2. rest_api_call_host.jpeg – after I send the message, I make a REST call; the
calls land on some random node. The host names are in the last column.
3. rest_redirect_host.jpeg – the code checks whether the host is localSelf; if
not, it sends the request to that ip/host to fetch the result. That host is
different from the one shown in the intital_data_host.jpeg file.
Here is the REST call code for reference:
{code:java}
// Ask the metadata service which instance hosts this key for the store.
HostStoreInfo hostStoreInfo =
    metadataService.streamsMetadataForStoreAndKey(
        config.getMOVE_DATA_AGGS_STORE_NAME(),
        storeDpci,
        new JsonSerializer<>());
System.out.println("host_info :" + hostStoreInfo + " for_key :" + storeDpci);
MoveDataVo moveDataVo = null;
System.out.println("localSelf :" + localSelf.host() + " remote_host:" +
    hostStoreInfo.getHost());
if (localSelf.host().equals(hostStoreInfo.getHost())) {
    // The key is reported as local: read it from this instance's state store.
    ReadOnlyKeyValueStore<String, MoveDataVo> store =
        kafkaStreams.store(config.getMOVE_DATA_AGGS_STORE_NAME(),
            QueryableStoreTypes.<String, MoveDataVo>keyValueStore());
    if (store != null) {
        moveDataVo = store.get(storeDpci);
    }
} else {
    // The key is reported on another instance: forward the request over REST.
    String fooResourceUrl =
        "http://" + hostStoreInfo.getHost() + ":" + 8080 +
        "/dataaggs/v1/get_state?storedpci=" + storeDpci;
    ResponseEntity<MoveDataVo> response =
        restTemplate.getForEntity(fooResourceUrl, MoveDataVo.class);
    moveDataVo = response.getBody();
    System.out.println("response_from_other: " + moveDataVo);
}
{code}
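For comparison, here is a minimal, self-contained sketch of the same local-versus-remote decision that compares both host and port, and builds the remote URL from the reported endpoint instead of a fixed 8080. It is only an illustration and not a confirmed fix for this issue: `HostRouting`, `Endpoint`, `isLocal` and `remoteUrl` are hypothetical names, `Endpoint` stands in for the host/port pair carried by the Kafka Streams metadata, and the assumption that the REST service listens on the advertised port may not hold for this setup.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

final class HostRouting {

    /** Hypothetical stand-in for the host/port pair reported by the metadata. */
    static final class Endpoint {
        final String host;
        final int port;

        Endpoint(String host, int port) {
            this.host = Objects.requireNonNull(host, "host");
            this.port = port;
        }

        @Override
        public String toString() {
            return host + ":" + port;
        }
    }

    /** The key is treated as local only when both host and port match. */
    static boolean isLocal(Endpoint self, Endpoint owner) {
        return self.host.equalsIgnoreCase(owner.host) && self.port == owner.port;
    }

    /** Builds the REST URL for the owning instance, URL-encoding the key. */
    static String remoteUrl(Endpoint owner, String storeDpci) {
        String encodedKey = URLEncoder.encode(storeDpci, StandardCharsets.UTF_8);
        return "http://" + owner + "/dataaggs/v1/get_state?storedpci=" + encodedKey;
    }

    public static void main(String[] args) {
        // Made-up addresses, for illustration only.
        Endpoint self = new Endpoint("10.0.0.1", 8080);
        Endpoint owner = new Endpoint("10.0.0.2", 8080);
        if (isLocal(self, owner)) {
            System.out.println("read from the local store");
        } else {
            System.out.println("forward to " + remoteUrl(owner, "some-key"));
        }
    }
}
```

Comparing the port as well only matters when more than one instance can share a host name or IP. With one instance per host it behaves the same as the host-only check above.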
> Kafka stream interactive query not returning data when state is backed by
> rocksdb
> ---------------------------------------------------------------------------------
>
> Key: KAFKA-7640
> URL: https://issues.apache.org/jira/browse/KAFKA-7640
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.0.0
> Reporter: hitesh gollahalli bachanna
> Priority: Major
> Attachments: intital_data_host.jpeg, rest_api_call_host.jpeg,
> rest_redirect_host.jpeg
>
>
> I have a Kafka Streams app running with 36 different instances (one for each
> partition). The instances come up one after the other, and I am building a
> REST service on top of the state to access the data.
> Here is some code that I use:
> {code:java}
> // Find out which host has the key.
> StreamsMetadata metadata = streams.metadataForKey(store, key, serializer);
> if (localSelf.host().equals(metadata.host())) {
>     // get the key from the local store
> } else {
>     // call the remote host using restTemplate
> }
> {code}
> The problem is that the `metadata` object returned points to one host/IP,
> but the data is actually on a different node. I was able to see this using
> some application logs I printed. This happens every time I start my
> application.
> The `allMetadata` method in the `KafkaStreams` class says the value will be
> updated when partitions get reassigned, but that is not happening in this
> case.
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)