Thanks Matthias. Can you also please confirm the compatible versions of the
client dependencies . Our broker version is 10.2.1 and when i updgrade the
client library to 1.1.0, i am getting a issue with tests while starting the
embedded cluster .
Test dependencies are (kafka-stream.version is 1.1.0)

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>${kafka-stream.version}</version>
    <classifier>test</classifier>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka-stream.version}</version>
    <classifier>test</classifier>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>${kafka-stream.version}</version>
    <classifier>test</classifier>
    <scope>test</scope>
</dependency>



Test error
=======
java.lang.AbstractMethodError:
kafka.zk.EmbeddedZookeeper.kafka$utils$Logging$_setter_$loggerName_$eq(Ljava/lang/String;)V

at kafka.utils.Logging$class.$init$(Logging.scala:23)
at kafka.zk.EmbeddedZookeeper.<init>(EmbeddedZookeeper.scala:37)
at
org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.start(EmbeddedKafkaCluster.java:87)
at
org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.before(EmbeddedKafkaCluster.java:153)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:46)
at
com.github.tomakehurst.wiremock.junit.WireMockClassRule$1.evaluate(WireMockClassRule.java:70)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at
org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at
com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at
com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

On Sat, Apr 21, 2018 at 3:06 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

> You are hitting: https://issues.apache.org/jira/browse/KAFKA-6499
>
> Was fixed in 1.1 release.
>
> Thus, you can just ignore the checkpoint file. There should be no issue
> with running on Kubernetes.
>
> Also, if there is no store (independent of disk based or in-memory)
> there will be no changelog topic.
>
>
> -Matthias
>
> On 4/21/18 8:34 AM, pradeep s wrote:
> > Hi,
> > I am using kafka streams app connecting to confluent kafka
> cluster(10.2.1).
> > Application is reading messages from a topic, performing a tranformation
> > and pushing to output topic . There is no count or aggregation performed
> .
> > Have following clarifications regarding state directory.
> >
> > *1)* Will there be any data written in state directory?
> > When i verified the state directory , it was showing
> > 0
> > 0
> >
> > *2)* Application is running in kubernetes without any external volumes .
> > Will state directory cause any processing issue during kubernetes pod
> > restarts?
> >
> > *3)* Will the app creates a changelog topic since there is no in memory
> > store used in the app?
> >
> >
> > Code Snippet
> > ===========
> >
> > Stream Config
> > =============
> >
> > properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> > streamEnvironment.getKafkaBootstrapServers());
> > properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG,
> > NUMBER_OF_STREAM_THREADS);
> > properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
> COMMIT_INTERVAL_MS);
> >
> >
> > properties.put(StreamsConfig.STATE_DIR_CONFIG, STATE_DIRECTORY);
> > properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_
> EXCEPTION_HANDLER_CLASS_CONFIG,
> >         ItemDeserializationExceptionHandler.class);
> >
> >
> >
> > Stream builder
> > =============
> >
> > private void processStream(StreamsBuilder builder) {
> >     KStream<byte[], byte[]> input = builder.stream(inputTopic,
> > Consumed.with(byteArraySerde, byteArraySerde))
> >                                            .peek((key, value) ->
> > metricsClient.writeMetric(
> >
> > CountMetric.generate(METRIC_OFFER_INPUT, 1)));
> >
> >     KStream<byte[], DeserializationResultWrapper>
> > deserializationResultStream = input
> >             .mapValues(this::deserializeOffer);
> >
> >     quarantineNonDeSerializableOffers(deserializationResultStream);
> >
> >     KStream<byte[], List<TransformerResult>> trans =
> > transformOffers(deserializationResultStream);
> >
> >     produceToQuarantineTopic(trans);
> >
> >     produceToOutputTopic(trans);
> >
> > }
> >
> > private void produceToOutputTopic(KStream<byte[],
> > List<TransformerResult>> trans) {
> >     trans.filter((key, value) -> value != null
> >                                  && !value.isEmpty())
> >          .peek((key, value) ->
> > metricsClient.writeMetric(CountMetric.generate(METRIC_ITEMS_OUTPUT,
> > 1)))
> >          .flatMapValues(transformerResults ->
> transformerResults.stream()
> >
> > .map(TransformerResult::getItem)
> >
> > .filter(Objects::nonNull)
> >
> > .collect(Collectors.toCollection(ArrayList::new)))
> >          .to(outputTopic, Produced.with(byteArraySerde,
> itemEnvelopeSerde));
> > }
> >
> > private void produceToQuarantineTopic(KStream<byte[],
> > List<TransformerResult>> trans) {
> >     trans.filter((key, value) -> value == null || value.isEmpty()
> >                                  ||
> > value.stream().anyMatch(TransformerResult::hasErrors))
> >          .mapValues(val -> toQuarantineEnvelope(val, INVALID_SKU))
> >          .to(quarantineTopic, Produced.with(byteArraySerde,
> > quarantineItemEnvelopeSerde));
> > }
> >
> > Thanks
> > Pradeep
> >
>
>

Reply via email to