cadonna commented on code in PR #16090:
URL: https://github.com/apache/kafka/pull/16090#discussion_r1616218317


##########
streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.TaskId;
+
+/**
+ * This interface allows user code to inspect the context of a record that has 
failed processing.
+ */
+public interface ErrorHandlerContext {
+    /**
+     * Return the topic name of the current input record; could be {@code 
null} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a {@link 
Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded 
by a punctuation
+     * callback, the record won't have an associated topic.
+     * Another example is
+     * {@link 
org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier,
 String...)}
+     * (and siblings), that do not always guarantee to provide a valid topic 
name, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by 
the Kafka Streams DSL.
+     *
+     * @return the topic name
+     */
+    String topic();
+
+    /**
+     * Return the partition id of the current input record; could be {@code 
-1} if it is not

Review Comment:
   Could you please use `ID` instead of `id` here and elsewhere?
   For me it's better readable. 
   BTW, I am aware that we use `id` in some other javadocs.



##########
streams/src/main/java/org/apache/kafka/streams/errors/ProcessingExceptionHandler.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.streams.processor.api.Record;
+
+/**
+ * An interface that allows user code to inspect a record that has failed 
processing
+ */
+public interface ProcessingExceptionHandler extends Configurable {
+    /**
+     * Inspect a record and the exception received
+     *
+     * @param context processing context metadata
+     * @param record record where the exception occurred
+     * @param exception the actual exception
+     */
+    ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> 
record, Exception exception);

Review Comment:
   In Streams, we use `final` for all arguments. 
   ```suggestion
       ProcessingHandlerResponse handle(final ErrorHandlerContext context, 
final Record<?, ?> record, Exception exception);
   ```



##########
streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContextImpl.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.streams.processor.TaskId;
+
+public class ErrorHandlerContextImpl implements ErrorHandlerContext {
+    private final String topic;
+    private final int partition;
+    private final long offset;
+    private final Headers headers;
+    private final byte[] sourceRawKey;
+    private final byte[] sourceRawValue;
+    private final String processorNodeId;
+    private final TaskId taskId;
+
+    public ErrorHandlerContextImpl(final String topic,
+        final int partition,
+        final long offset,
+        final Headers headers,
+        final byte[] sourceRawKey,
+        final byte[] sourceRawValue,
+        final String processorNodeId,
+        final TaskId taskId) {

Review Comment:
   nit:
   ```suggestion
       public ErrorHandlerContextImpl(final String topic,
                                      final int partition,
                                      final long offset,
                                      final Headers headers,
                                      final byte[] sourceRawKey,
                                      final byte[] sourceRawValue,
                                      final String processorNodeId,
                                      final TaskId taskId) {
   ```



##########
streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContextImpl.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;

Review Comment:
   The implementation should go into an internal package since it is an 
implementation detail that should not be exposed to the public. Otherwise, we 
would need a KIP for it.
   You can create a package  `org.apache.kafka.streams.errors.internals`.



##########
streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContextImpl.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.streams.processor.TaskId;
+
+public class ErrorHandlerContextImpl implements ErrorHandlerContext {

Review Comment:
   I am not fond of implementations of interfaces that end in `*Impl`, because 
they do not really tell you something about what the implementation is about. 
An alternative might be `DefaultErrorHandlerContext` (as used with the 
`DefaultStateUpdater`). 



##########
streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContextImpl.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.streams.processor.TaskId;
+
+public class ErrorHandlerContextImpl implements ErrorHandlerContext {
+    private final String topic;
+    private final int partition;
+    private final long offset;
+    private final Headers headers;
+    private final byte[] sourceRawKey;
+    private final byte[] sourceRawValue;
+    private final String processorNodeId;
+    private final TaskId taskId;
+
+    public ErrorHandlerContextImpl(final String topic,
+        final int partition,
+        final long offset,
+        final Headers headers,
+        final byte[] sourceRawKey,
+        final byte[] sourceRawValue,
+        final String processorNodeId,
+        final TaskId taskId) {
+        this.topic = topic;
+        this.partition = partition;
+        this.offset = offset;
+        this.headers = headers;
+        this.sourceRawKey = sourceRawKey;
+        this.sourceRawValue = sourceRawValue;
+        this.processorNodeId = processorNodeId;
+        this.taskId = taskId;
+    }
+
+

Review Comment:
   ```suggestion
   ```



##########
streams/src/main/java/org/apache/kafka/streams/errors/ProcessingLogAndFailExceptionHandler.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+import org.apache.kafka.streams.processor.api.Record;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class ProcessingLogAndFailExceptionHandler implements 
ProcessingExceptionHandler {

Review Comment:
   Please add some java docs.



##########
streams/src/main/java/org/apache/kafka/streams/errors/ProcessingLogAndFailExceptionHandler.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+import org.apache.kafka.streams.processor.api.Record;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class ProcessingLogAndFailExceptionHandler implements 
ProcessingExceptionHandler {

Review Comment:
   Also here I propose to name it `LogAndFailProcessingExceptionHandler`. 



##########
streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.TaskId;
+
+/**
+ * This interface allows user code to inspect the context of a record that has 
failed processing.
+ */
+public interface ErrorHandlerContext {
+    /**
+     * Return the topic name of the current input record; could be {@code 
null} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a {@link 
Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded 
by a punctuation
+     * callback, the record won't have an associated topic.
+     * Another example is
+     * {@link 
org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier,
 String...)}
+     * (and siblings), that do not always guarantee to provide a valid topic 
name, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by 
the Kafka Streams DSL.
+     *
+     * @return the topic name
+     */
+    String topic();
+
+    /**
+     * Return the partition id of the current input record; could be {@code 
-1} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a {@link 
Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded 
by a punctuation
+     * callback, the record won't have an associated partition id.
+     * Another example is
+     * {@link 
org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier,
 String...)}
+     * (and siblings), that do not always guarantee to provide a valid 
partition id, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by 
the Kafka Streams DSL.
+     *
+     * @return the partition id
+     */
+    int partition();
+
+    /**
+     * Return the offset of the current input record; could be {@code -1} if 
it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a {@link 
Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded 
by a punctuation
+     * callback, the record won't have an associated offset.
+     * Another example is
+     * {@link 
org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier,
 String...)}
+     * (and siblings), that do not always guarantee to provide a valid offset, 
as they might be
+     * executed "out-of-band" due to some internal optimizations applied by 
the Kafka Streams DSL.
+     *
+     * @return the offset
+     */
+    long offset();
+
+    /**
+     * Return the headers of the current source record; could be an empty 
header if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a {@link 
Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded 
by a punctuation
+     * callback, the record might not have any associated headers.
+     * Another example is
+     * {@link 
org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier,
 String...)}
+     * (and siblings), that do not always guarantee to provide valid headers, 
as they might be
+     * executed "out-of-band" due to some internal optimizations applied by 
the Kafka Streams DSL.
+     *
+     * @return the headers
+     */
+    Headers headers();
+
+    /**
+     * Return the non-deserialized byte[] of the input message key if the 
context has been triggered by a message.
+     *
+     * <p> If this method is invoked within a {@link Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded 
by a punctuation
+     * callback, it will return null.
+     *
+     * <p> If this method is invoked in a sub-topology due to a repartition, 
the returned key would be one sent
+     * to the repartition topic.
+     *
+     * @return the raw byte of the key of the source message
+     */
+    byte[] sourceRawKey();
+
+    /**
+     * Return the non-deserialized byte[] of the input message value if the 
context has been triggered by a message.
+     *
+     * <p> If this method is invoked within a {@link Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded 
by a punctuation
+     * callback, it will return null.

Review Comment:
   ```suggestion
        * callback, it will return {@code null}.
   ```



##########
streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.TaskId;
+
+/**
+ * This interface allows user code to inspect the context of a record that has 
failed processing.
+ */
+public interface ErrorHandlerContext {
+    /**
+     * Return the topic name of the current input record; could be {@code 
null} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a {@link 
Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded 
by a punctuation
+     * callback, the record won't have an associated topic.
+     * Another example is
+     * {@link 
org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier,
 String...)}
+     * (and siblings), that do not always guarantee to provide a valid topic 
name, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by 
the Kafka Streams DSL.
+     *
+     * @return the topic name
+     */
+    String topic();
+
+    /**
+     * Return the partition id of the current input record; could be {@code 
-1} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a {@link 
Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded 
by a punctuation
+     * callback, the record won't have an associated partition id.
+     * Another example is
+     * {@link 
org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier,
 String...)}
+     * (and siblings), that do not always guarantee to provide a valid 
partition id, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by 
the Kafka Streams DSL.
+     *
+     * @return the partition id
+     */
+    int partition();
+
+    /**
+     * Return the offset of the current input record; could be {@code -1} if 
it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a {@link 
Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded 
by a punctuation
+     * callback, the record won't have an associated offset.
+     * Another example is
+     * {@link 
org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier,
 String...)}
+     * (and siblings), that do not always guarantee to provide a valid offset, 
as they might be
+     * executed "out-of-band" due to some internal optimizations applied by 
the Kafka Streams DSL.
+     *
+     * @return the offset
+     */
+    long offset();
+
+    /**
+     * Return the headers of the current source record; could be an empty 
header if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a {@link 
Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded 
by a punctuation
+     * callback, the record might not have any associated headers.
+     * Another example is
+     * {@link 
org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier,
 String...)}
+     * (and siblings), that do not always guarantee to provide valid headers, 
as they might be
+     * executed "out-of-band" due to some internal optimizations applied by 
the Kafka Streams DSL.
+     *
+     * @return the headers
+     */
+    Headers headers();
+
+    /**
+     * Return the non-deserialized byte[] of the input message key if the 
context has been triggered by a message.
+     *
+     * <p> If this method is invoked within a {@link Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded 
by a punctuation
+     * callback, it will return null.

Review Comment:
   ```suggestion
        * callback, it will return {@code null}.
   ```



##########
streams/src/main/java/org/apache/kafka/streams/errors/ProcessingLogAndContinueExceptionHandler.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+import org.apache.kafka.streams.processor.api.Record;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Processing exception handler that logs a processing exception and then
+ * signals the processing pipeline to continue processing more records.
+ */
+public class ProcessingLogAndContinueExceptionHandler implements 
ProcessingExceptionHandler {

Review Comment:
   I think, we should rename this to `LogAndContinueProcessingExceptionHandler` 
because it somehow makes more sense. It is a processing exception handler 
(`ProcessingExceptionHandler`) that logs and continues (`LogAndContinue`). 
Sorry for not noticing it earlier. You can write to the voting thread and 
communicate this minor change. I can also do it if you like.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to