[jira] [Updated] (KAFKA-10048) Possible data gap for a consumer after a failover when using MM2

2020-05-27 Thread Andre Araujo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andre Araujo updated KAFKA-10048:
-
Flags:   (was: Patch)

> Possible data gap for a consumer after a failover when using MM2
> 
>
> Key: KAFKA-10048
> URL: https://issues.apache.org/jira/browse/KAFKA-10048
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.0
>Reporter: Andre Araujo
>Priority: Major
>
> I've been looking at some MM2 scenarios and identified a situation where 
> consumers can miss some data in the event of a failover.
>  
> When a consumer subscribes to a topic for the first time and commits offsets, 
> the offsets for every existing partition of that topic will be saved to the 
> cluster's {{__consumer_offsets}} topic. Even if a partition is completely 
> empty, the offset {{0}} will still be saved for the consumer's consumer group.
>  
> When MM2 is replicating the checkpoints to the remote cluster, though, it 
> [ignores anything that has an offset equal to 
> zero|https://github.com/apache/kafka/blob/856e36651203b03bf9a6df2f2d85a356644cbce3/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L135],
>  replicating offsets only for partitions that contain data.
>  
> This can lead to a gap in the data consumed by consumers in the following 
> scenario:
>  # Topic is created on the source cluster.
>  # MM2 is configured to replicate the topic and consumer groups.
>  # Producers start to produce data to the source topic, but for some reason 
> some partitions do not get data initially while others do (skewed keyed 
> messages or bad luck).
>  # Consumers start to consume data from that topic and their consumer groups' 
> offsets are replicated to the target cluster, *but only for partitions that 
> contain data*. The consumers are using the default setting auto.offset.reset 
> = latest.
>  # A consumer failover to the second cluster is performed (for whatever 
> reason), and the offset translation steps are completed. The consumers are 
> not restarted yet.
>  # The producers continue to produce data to the source cluster topic and now 
> produce data to the partitions that were empty before.
>  # *After* the producers start producing data, consumers are started on the 
> target cluster and start consuming.
> For the partitions that already had data before the failover, everything 
> works fine. The consumer offsets will have been translated correctly and the 
> consumers will start consuming from the correct position.
> For the partitions that were empty before the failover, though, any data 
> written by the producers to those partitions *after the failover but before 
> the consumers start* will be completely missed, since the consumers will jump 
> straight to the latest offset when they start due to the lack of a zero 
> offset stored locally on the target cluster.
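
To illustrate the scenario, a minimal sketch (an approximation of the linked check, not the actual MirrorCheckpointTask code) of the offset-zero filtering and its consequence for a failed-over consumer:

{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CheckpointFilterSketch {

    // Approximates the linked check: checkpoints are only emitted for partitions
    // whose committed upstream offset is greater than zero.
    static Map<TopicPartition, OffsetAndMetadata> checkpointable(
            Map<TopicPartition, OffsetAndMetadata> committed) {
        Map<TopicPartition, OffsetAndMetadata> result = new HashMap<>();
        committed.forEach((tp, om) -> {
            if (om.offset() > 0) {      // offset 0 (empty partition) is skipped
                result.put(tp, om);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        Map<TopicPartition, OffsetAndMetadata> committed = new HashMap<>();
        committed.put(new TopicPartition("topic", 0), new OffsetAndMetadata(42L)); // had data
        committed.put(new TopicPartition("topic", 1), new OffsetAndMetadata(0L));  // empty partition
        // Only partition 0 is checkpointed. Partition 1 gets no translated offset on the
        // target cluster, so a failed-over consumer with auto.offset.reset=latest jumps to
        // the log end and skips anything produced there before it starts.
        System.out.println(checkpointable(committed));
    }
}
{code}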



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10048) Possible data gap for a consumer after a failover when using MM2

2020-05-27 Thread Andre Araujo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andre Araujo updated KAFKA-10048:
-
Flags: Patch

> Possible data gap for a consumer after a failover when using MM2
> 
>
> Key: KAFKA-10048
> URL: https://issues.apache.org/jira/browse/KAFKA-10048
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.0
>Reporter: Andre Araujo
>Priority: Major
>
> I've been looking at some MM2 scenarios and identified a situation where 
> consumers can miss some data in the event of a failover.
>  
> When a consumer subscribes to a topic for the first time and commits offsets, 
> the offsets for every existing partition of that topic will be saved to the 
> cluster's {{__consumer_offsets}} topic. Even if a partition is completely 
> empty, the offset {{0}} will still be saved for the consumer's consumer group.
>  
> When MM2 is replicating the checkpoints to the remote cluster, though, it 
> [ignores anything that has an offset equal to 
> zero|https://github.com/apache/kafka/blob/856e36651203b03bf9a6df2f2d85a356644cbce3/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L135],
>  replicating offsets only for partitions that contain data.
>  
> This can lead to a gap in the data consumed by consumers in the following 
> scenario:
>  # Topic is created on the source cluster.
>  # MM2 is configured to replicate the topic and consumer groups.
>  # Producers start to produce data to the source topic, but for some reason 
> some partitions do not get data initially while others do (skewed keyed 
> messages or bad luck).
>  # Consumers start to consume data from that topic and their consumer groups' 
> offsets are replicated to the target cluster, *but only for partitions that 
> contain data*. The consumers are using the default setting auto.offset.reset 
> = latest.
>  # A consumer failover to the second cluster is performed (for whatever 
> reason), and the offset translation steps are completed. The consumers are 
> not restarted yet.
>  # The producers continue to produce data to the source cluster topic and now 
> produce data to the partitions that were empty before.
>  # *After* the producers start producing data, consumers are started on the 
> target cluster and start consuming.
> For the partitions that already had data before the failover, everything 
> works fine. The consumer offsets will have been translated correctly and the 
> consumers will start consuming from the correct position.
> For the partitions that were empty before the failover, though, any data 
> written by the producers to those partitions *after the failover but before 
> the consumers start* will be completely missed, since the consumers will jump 
> straight to the latest offset when they start due to the lack of a zero 
> offset stored locally on the target cluster.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10049) KTable-KTable Foreign Key join throwing Serialization Exception

2020-05-27 Thread Amit Chauhan (Jira)
Amit Chauhan created KAFKA-10049:


 Summary: KTable-KTable Foreign Key join throwing Serialization Exception
 Key: KAFKA-10049
 URL: https://issues.apache.org/jira/browse/KAFKA-10049
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.5.0
Reporter: Amit Chauhan


 

{code:java}
public static void main(String[] args) {

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application-2");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new JSONSerdeComp<>().getClass());
    props.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    StreamsBuilder builder = new StreamsBuilder();
    KTable<String, OrderObject> ordersTable = builder.table(TOPIC_Agora);
    KTable<String, StockMarketData> stockTable = builder.table(TOPIC_Stock_Data);

    KTable<String, EnrichedOrder> enriched = ordersTable.leftJoin(stockTable,
            OrderObject::getSymbol,
            new ValueJoiner<OrderObject, StockMarketData, EnrichedOrder>() {

                @Override
                public EnrichedOrder apply(OrderObject order, StockMarketData stock) {
                    EnrichedOrder enOrder = EnrichedOrder.builder()
                            .orderId(order.getOrderId())
                            .execPrice(order.getPrice())
                            .symbol(order.getSymbol())
                            .quanity(order.getQuanity())
                            .side(order.getSide())
                            .filledQty(order.getFilledQty())
                            .leaveQty(order.getLeaveQty())
                            .index(order.getIndex())
                            .vWaprelative(order.getVWaprelative())
                            .stockAsk(stock != null ? stock.getAsk().doubleValue() : 0.0)
                            .stockBid(stock != null ? stock.getBid().doubleValue() : 0.0)
                            .stockLast(stock != null ? stock.getLast().doubleValue() : 0.0)
                            .stockClose(stock != null ? stock.getClose().doubleValue() : 0.0)
                            .build();
                    return enOrder;
                }
            },
            Materialized.with(Serdes.String(), new JSONSerdeComp<>()));

    enriched.toStream().foreach(new ForeachAction<String, EnrichedOrder>() {
        @Override
        public void apply(String arg0, EnrichedOrder arg1) {
            logger.info(String.format("key = %s, value = %s", arg0, arg1));
        }
    });

    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start();

    Runtime.getRuntime().addShutdownHook(new Thread(() -> streams.close()));
}
{code}

Maven dependencies:

{code:xml}
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.5.0</version>
</dependency>
{code}
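
A side note on serdes: the serialization exception attached to this issue suggests providing serdes via method parameters rather than relying on the application-wide defaults. A minimal sketch of that for the source tables, reusing the reporter's JSONSerdeComp, OrderObject and StockMarketData classes (so it does not compile standalone, and it is not a confirmed fix for the foreign-key join problem):

{code:java}
// Sketch only: build the source tables with explicit serdes instead of the
// application-wide defaults. JSONSerdeComp, OrderObject and StockMarketData
// are the reporter's own classes and are assumed to exist.
StreamsBuilder builder = new StreamsBuilder();
KTable<String, OrderObject> ordersTable = builder.table(TOPIC_Agora,
        Consumed.with(Serdes.String(), new JSONSerdeComp<OrderObject>()));
KTable<String, StockMarketData> stockTable = builder.table(TOPIC_Stock_Data,
        Consumed.with(Serdes.String(), new JSONSerdeComp<StockMarketData>()));
{code}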



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10049) KTable-KTable Foreign Key join throwing Serialization Exception

2020-05-27 Thread Amit Chauhan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Chauhan updated KAFKA-10049:
-
Description: 
 I want to make use of the _KTable-KTable_ foreign-key join feature released in 
*_2.5.0_*, but I am facing an issue while running the code.
{code:java}
 

 public static void main(String[] args) {

 Properties props = new Properties();
 props.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"my-stream-processing-application-2");
 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
 props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
 props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new 
JSONSerdeComp<>().getClass());
 props.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp");
 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

 StreamsBuilder builder = new StreamsBuilder();
 KTable ordersTable = builder.table(TOPIC_Agora);
 KTable stockTable = builder.table(TOPIC_Stock_Data);

 KTable enriched = ordersTable.leftJoin(stockTable, 
OrderObject:: getSymbol, new ValueJoiner() {

@Override
public EnrichedOrder apply(OrderObject order, StockMarketData 
stock) {
EnrichedOrder enOrder = EnrichedOrder.builder()
.orderId(order.getOrderId())
.execPrice(order.getPrice())
.symbol(order.getSymbol())
.quanity(order.getQuanity())
.side(order.getSide())
.filledQty(order.getFilledQty())
.leaveQty(order.getLeaveQty())
.index(order.getIndex())
.vWaprelative(order.getVWaprelative())
.stockAsk(stock!=null?stock.getAsk().doubleValue():0.0)
.stockBid(stock!=null?stock.getBid().doubleValue():0.0)

.stockLast(stock!=null?stock.getLast().doubleValue():0.0)

.stockClose(stock!=null?stock.getClose().doubleValue():0.0)
.build();
return enOrder;
}
} , Materialized.with(Serdes.String(), new JSONSerdeComp<>()));

 enriched.toStream().foreach(new ForeachAction() {
 @Override
public void apply(String arg0, EnrichedOrder arg1) {

 logger.info(String.format("key = %s, value = %s", arg0, arg1));
}
});

 KafkaStreams streams = new KafkaStreams(builder.build(), props);
 streams.start();

 Runtime.getRuntime().addShutdownHook(new Thread(() -> streams.close()));
}



 


<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.5.0</version>
</dependency>

{code}

  was:
 

{code}

 

{{public static void main(String[] args) \{

 Properties props = new Properties();
 props.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"my-stream-processing-application-2");
 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
 props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
 props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new 
JSONSerdeComp<>().getClass());
 props.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp");
 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

 StreamsBuilder builder = new StreamsBuilder();
 KTable ordersTable = builder.table(TOPIC_Agora);
 KTable stockTable = builder.table(TOPIC_Stock_Data);

 KTable enriched = ordersTable.leftJoin(stockTable, 
OrderObject:: getSymbol, new ValueJoiner() {

@Override
public EnrichedOrder apply(OrderObject order, StockMarketData 
stock) {
EnrichedOrder enOrder = EnrichedOrder.builder()
.orderId(order.getOrderId())
.execPrice(order.getPrice())
.symbol(order.getSymbol())
.quanity(order.getQuanity())
.side(order.getSide())
.filledQty(order.getFilledQty())
.leaveQty(order.getLeaveQty())
.index(order.getIndex())
.vWaprelative(order.getVWaprelative())
.stockAsk(stock!=null?stock.getAsk().doubleValue():0.0)
.stockBid(stock!=null?stock.getBid().doubleValue():0.0)

.stockLast(stock!=null?stock.getLast().doubleValue():0.0)

.stockClose(stock!=null?stock.getClose().doubleValue():0.0)
.build();
return enOrder;
}
} , Materialized.with(Serdes.String(), new JSONSerdeComp<>()));

 enriched.toStream().foreach(new ForeachAction() \{
 @Override
public void apply(String arg0, Enrich

[jira] [Updated] (KAFKA-10049) KTable-KTable Foreign Key join throwing Serialization Exception

2020-05-27 Thread Amit Chauhan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Chauhan updated KAFKA-10049:
-
Description: 
 I want to make use of the _KTable-KTable_ foreign-key join feature released in 
*_2.5.0_*, but I am facing an issue while running the code.
{code:java}
 

 public static void main(String[] args) {

 Properties props = new Properties();
 props.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"my-stream-processing-application-2");
 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
 props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
 props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new 
JSONSerdeComp<>().getClass());
 props.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp");
 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

 StreamsBuilder builder = new StreamsBuilder();
 KTable ordersTable = builder.table(TOPIC_Agora);
 KTable stockTable = builder.table(TOPIC_Stock_Data);

 KTable enriched = ordersTable.leftJoin(stockTable, 
OrderObject:: getSymbol, new ValueJoiner() {

@Override
public EnrichedOrder apply(OrderObject order, StockMarketData 
stock) {
EnrichedOrder enOrder = EnrichedOrder.builder()
.orderId(order.getOrderId())
.execPrice(order.getPrice())
.symbol(order.getSymbol())
.quanity(order.getQuanity())
.side(order.getSide())
.filledQty(order.getFilledQty())
.leaveQty(order.getLeaveQty())
.index(order.getIndex())
.vWaprelative(order.getVWaprelative())
.stockAsk(stock!=null?stock.getAsk().doubleValue():0.0)
.stockBid(stock!=null?stock.getBid().doubleValue():0.0)

.stockLast(stock!=null?stock.getLast().doubleValue():0.0)

.stockClose(stock!=null?stock.getClose().doubleValue():0.0)
.build();
return enOrder;
}
} , Materialized.with(Serdes.String(), new JSONSerdeComp<>()));

 enriched.toStream().foreach(new ForeachAction() {
 @Override
public void apply(String arg0, EnrichedOrder arg1) {

 logger.info(String.format("key = %s, value = %s", arg0, arg1));
}
});

 KafkaStreams streams = new KafkaStreams(builder.build(), props);
 streams.start();

 Runtime.getRuntime().addShutdownHook(new Thread(() -> streams.close()));
}



 


<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.5.0</version>
</dependency>


{code}

*+Exception:+*

{code}
18:49:31.525 
[my-stream-processing-application-2-37cfd34a-6eb4-411a-a2dc-faa9194ce04e-StreamThread-1]
 ERROR org.apache.kafka.streams.processor.internals.ProcessorStateManager - 
stream-thread 
[my-stream-processing-application-2-37cfd34a-6eb4-411a-a2dc-faa9194ce04e-StreamThread-1]
 task [0_0] Failed to flush state store orders-STATE-STORE-00: 
org.apache.kafka.streams.errors.StreamsException: ClassCastException while 
producing data to a sink topic. A serializer (key: 
org.apache.kafka.common.serialization.StringSerializer / value: 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer)
 is not compatible to the actual key or value type (key type: java.lang.String 
/ value type: 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper). 
Change the default Serdes in StreamConfig or provide correct Serdes via method 
parameters (for example if using the DSL, `#to(String topic, Produced 
produced)` with 
`Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).
at 
org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94) 
~[kafka-streams-2.5.0.jar:?]
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
 ~[kafka-streams-2.5.0.jar:?]
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
 ~[kafka-streams-2.5.0.jar:?]
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
 ~[kafka-streams-2.5.0.jar:?]
at 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:157)
 ~[kafka-streams-2.5.0.jar:?]
at 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:71)
 ~[kafka-streams

[jira] [Commented] (KAFKA-9891) Invalid state store content after task migration with exactly_once and standby replicas

2020-05-27 Thread Mateusz Jadczyk (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17117509#comment-17117509
 ] 

Mateusz Jadczyk commented on KAFKA-9891:


[~mjsax] If I'm not mistaken, these logs indicate a bug, as both threads (on the 
same instance) operate on _/tmp/kafka-streams/instance-2/test-app/0_1/_ at the 
same time: 
https://issues.apache.org/jira/browse/KAFKA-9891?focusedCommentId=17115352&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17115352

> Invalid state store content after task migration with exactly_once and 
> standby replicas
> ---
>
> Key: KAFKA-9891
> URL: https://issues.apache.org/jira/browse/KAFKA-9891
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.1, 2.4.1
>Reporter: Mateusz Jadczyk
>Assignee: Boyang Chen
>Priority: Blocker
> Attachments: failedtest, failedtest2, failedtest3, failedtest3_bug, 
> state_store_operations.txt, tasks_assignment.txt
>
>
> We have a simple command id deduplication mechanism (very similar to the one 
> from Kafka Streams examples) based on Kafka Streams State Stores. It stores 
> command ids from the past hour in _persistentWindowStore_. We encountered a 
> problem with the store if there's an exception thrown later in that topology.
>  We run 3 nodes using docker, each with multiple threads set for this 
> particular Streams Application.
> The business flow is as follows (performed within a single subtopology):
>  *  a valid command is sent with command id 
> (_mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_). NODE 1 is running an active 
> task 1_2. First node in the topology analyses if this is a duplicate by 
> checking in the state store (_COMMAND_ID_STORE_), if not puts the command id 
> in the state store and processes the command properly.
>  * an invalid command is sent with the same key but new command id 
> (_mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc_). Again, check for the 
> duplicated command id is performed, it's not a duplicate, command id is put 
> into the state store. Next node in the topology throws an exception which 
> causes an error on NODE 1 for task 1_2. As a result, transaction is aborted, 
> offsets are not committed. I double checked for the changelog topic - 
> relevant messages are not committed. Therefore, the changelog topic contains 
> only the first command id _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f,_ and 
> not the one which caused a failure.
>  * in the meantime a standby task 1_2 running on NODE 3 replicated 
> _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_ command id into a local 
> _COMMAND_ID_STORE_
>  * standby task 1_2 on NODE 3 Thread-2 takes over the task as an active one. 
> It checks if this command id is a duplicate - no, it isn't - tries to process 
> the faulty command and throws an exception. Again, transaction aborted, all 
> looks fine.
>  * NODE 3 Thread-1 takes over. It checks for the duplicate. To our surprise, 
> *it is a duplicate!* Even though the transaction has been aborted and the 
> changelog doesn't contain this command id: 
> _mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc._
>  
> After digging into the Streams logs and some discussion on ([Stack 
> Overflow|https://stackoverflow.com/questions/61247789/invalid-state-store-content-after-aborted-transaction-with-exactly-once-and-stan])
>  we concluded it has something to do with checkpoint files. Here are the 
> detailed logs relevant to checkpoint files.
>  
> {code:java}
> NODE_3 2020-04-15 21:06:14.470 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
> [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:19.413 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
> [1_2] Restoring state store COMMAND_ID_STORE from changelog topic 
> Processor-COMMAND_ID_STORE-changelog at checkpoint null
> NODE_3 2020-04-15 21:06:28.470 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
> standby-task [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:29.634 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file 
> /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:06:29.640 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file 
> /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp 
> /tmp/kafka-streams/Process
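
For context, a rough sketch of the window-store based command-id deduplication described at the top of this issue (assumed names and store wiring; this is not the reporter's code):

{code:java}
import java.time.Duration;
import java.time.Instant;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

// Assumed names; sketch only. Drops a record if its command id was already seen
// within the last hour, otherwise remembers the id and forwards the record.
public class CommandIdDeduplicator implements Transformer<String, String, KeyValue<String, String>> {

    private ProcessorContext context;
    private WindowStore<String, Long> commandIdStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        this.commandIdStore = (WindowStore<String, Long>) context.getStateStore("COMMAND_ID_STORE");
    }

    @Override
    public KeyValue<String, String> transform(String key, String commandId) {
        long now = context.timestamp();
        Instant from = Instant.ofEpochMilli(now - Duration.ofHours(1).toMillis());
        Instant to = Instant.ofEpochMilli(now);
        try (WindowStoreIterator<Long> seen = commandIdStore.fetch(commandId, from, to)) {
            if (seen.hasNext()) {
                return null;                         // duplicate: do not forward
            }
        }
        commandIdStore.put(commandId, now, now);     // remember this command id
        return KeyValue.pair(key, commandId);        // first occurrence: forward
    }

    @Override
    public void close() { }
}
{code}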

[jira] [Created] (KAFKA-10050) kafka_log4j_appender.py broken on JDK11

2020-05-27 Thread Nikolay Izhikov (Jira)
Nikolay Izhikov created KAFKA-10050:
---

 Summary: kafka_log4j_appender.py broken on JDK11
 Key: KAFKA-10050
 URL: https://issues.apache.org/jira/browse/KAFKA-10050
 Project: Kafka
  Issue Type: Bug
Reporter: Nikolay Izhikov
Assignee: Nikolay Izhikov


kafka_log4j_appender.py is broken on JDK 11.

{noformat}
[INFO:2020-05-27 02:31:27,662]: RunnerClient: 
kafkatest.tests.tools.log4j_appender_test.Log4jAppenderTest.test_log4j_appender.security_protocol=SSL:
 Data: None

SESSION REPORT (ALL TESTS)
ducktape version: 0.7.7
session_id:   2020-05-27--002
run time: 1 minute 41.177 seconds
tests run:4
passed:   0
failed:   4
ignored:  0

test_id:
kafkatest.tests.tools.log4j_appender_test.Log4jAppenderTest.test_log4j_appender.security_protocol=SASL_PLAINTEXT
status: FAIL
run time:   27.509 seconds


KafkaLog4jAppender-0-140270269628496-worker-1: Traceback (most recent call 
last):
  File 
"/usr/local/lib/python2.7/dist-packages/ducktape/services/background_thread.py",
 line 36, in _protected_worker
self._worker(idx, node)
  File "/opt/kafka-dev/tests/kafkatest/services/kafka_log4j_appender.py", line 
42, in _worker
cmd = self.start_cmd(node)
  File "/opt/kafka-dev/tests/kafkatest/services/kafka_log4j_appender.py", line 
48, in start_cmd
cmd = fix_opts_for_new_jvm(node)
  File "/opt/kafka-dev/tests/kafkatest/services/kafka/util.py", line 36, in 
fix_opts_for_new_jvm
if node.version == LATEST_0_8_2 or node.version == LATEST_0_9 or 
node.version == LATEST_0_10_0 or node.version == LATEST_0_10_1 or node.version 
== LATEST_0_10_2 or node.version == LATEST_0_11_0 or node.version == LATEST_1_0:
AttributeError: 'ClusterNode' object has no attribute 'version'

Traceback (most recent call last):
  File 
"/usr/local/lib/python2.7/dist-packages/ducktape/tests/runner_client.py", line 
132, in run
data = self.run_test()
  File 
"/usr/local/lib/python2.7/dist-packages/ducktape/tests/runner_client.py", line 
189, in run_test
return self.test_context.function(self.test)
  File "/usr/local/lib/python2.7/dist-packages/ducktape/mark/_mark.py", line 
428, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/opt/kafka-dev/tests/kafkatest/tests/tools/log4j_appender_test.py", 
line 84, in test_log4j_appender
self.appender.wait()
  File 
"/usr/local/lib/python2.7/dist-packages/ducktape/services/background_thread.py",
 line 72, in wait
self._propagate_exceptions()
  File 
"/usr/local/lib/python2.7/dist-packages/ducktape/services/background_thread.py",
 line 98, in _propagate_exceptions
raise Exception(self.errors)
Exception: KafkaLog4jAppender-0-140270269628496-worker-1: Traceback (most 
recent call last):
  File 
"/usr/local/lib/python2.7/dist-packages/ducktape/services/background_thread.py",
 line 36, in _protected_worker
self._worker(idx, node)
  File "/opt/kafka-dev/tests/kafkatest/services/kafka_log4j_appender.py", line 
42, in _worker
cmd = self.start_cmd(node)
  File "/opt/kafka-dev/tests/kafkatest/services/kafka_log4j_appender.py", line 
48, in start_cmd
cmd = fix_opts_for_new_jvm(node)
  File "/opt/kafka-dev/tests/kafkatest/services/kafka/util.py", line 36, in 
fix_opts_for_new_jvm
if node.version == LATEST_0_8_2 or node.version == LATEST_0_9 or 
node.version == LATEST_0_10_0 or node.version == LATEST_0_10_1 or node.version 
== LATEST_0_10_2 or node.version == LATEST_0_11_0 or node.version == LATEST_1_0:
AttributeError: 'ClusterNode' object has no attribute 'version'



test_id:
kafkatest.tests.tools.log4j_appender_test.Log4jAppenderTest.test_log4j_appender.security_protocol=SASL_SSL
status: FAIL
run time:   28.121 seconds


KafkaLog4jAppender-0-140270269498000-worker-1: Traceback (most recent call 
last):
  File 
"/usr/local/lib/python2.7/dist-packages/ducktape/services/background_thread.py",
 line 36, in _protected_worker
self._worker(idx, node)
  File "/opt/kafka-dev/tests/kafkatest/services/kafka_log4j_appender.py", line 
42, in _worker
cmd = self.start_cmd(node)
  File "/opt/kafka-dev/tests/kafkatest/services/kafka_log4j_appender.py", line 
48, in start_cmd
cmd = fix_opts_for_new_jvm(node)
  File "/opt/kafka-dev/tests/kafkatest/services/kafka/util.py", line 36, in 
fix_opts_for_new_jvm
if node.version == LATEST_0_8_2 or node.version == LATEST_0_9 or 
node.version == LATEST_0_10_0 or node.version == LATEST_0_10_1 or node.version 
== LATEST_0_10_2 or node.version == LATEST_0_11_0 or node.version == LATEST_1_0:
AttributeError: 'ClusterNode' object has no attribute 'version'

Traceback (most recent call last

[GitHub] [kafka] nizhikov opened a new pull request #8731: KAFKA-10050: kafka_log4j_appender.py fixed for JDK11

2020-05-27 Thread GitBox


nizhikov opened a new pull request #8731:
URL: https://github.com/apache/kafka/pull/8731


   kafka_log4j_appender.py was broken on JDK11 by 
#befd80b38d3ccb1aa0c6d99a899129fd5cf27774.
   This fix just sets the node version for the log4j appender.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10049) KTable-KTable Foreign Key join throwing Serialization Exception

2020-05-27 Thread Amit Chauhan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Chauhan updated KAFKA-10049:
-
Description: 
 I want to make use of the _KTable-KTable_ Foreign Key join feature released in 
*_2.5.0_*, but I am facing an issue while running the code. 
{code:java}
 

 public static void main(String[] args) {

 Properties props = new Properties();
 props.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"my-stream-processing-application-2");
 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
 props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
 props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new 
JSONSerdeComp<>().getClass());
 props.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp");
 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

 StreamsBuilder builder = new StreamsBuilder();
 KTable ordersTable = builder.table(TOPIC_Agora);
 KTable stockTable = builder.table(TOPIC_Stock_Data);

 KTable enriched = ordersTable.leftJoin(stockTable, 
OrderObject:: getSymbol, new ValueJoiner() {

@Override
public EnrichedOrder apply(OrderObject order, StockMarketData 
stock) {
EnrichedOrder enOrder = EnrichedOrder.builder()
.orderId(order.getOrderId())
.execPrice(order.getPrice())
.symbol(order.getSymbol())
.quanity(order.getQuanity())
.side(order.getSide())
.filledQty(order.getFilledQty())
.leaveQty(order.getLeaveQty())
.index(order.getIndex())
.vWaprelative(order.getVWaprelative())
.stockAsk(stock!=null?stock.getAsk().doubleValue():0.0)
.stockBid(stock!=null?stock.getBid().doubleValue():0.0)

.stockLast(stock!=null?stock.getLast().doubleValue():0.0)

.stockClose(stock!=null?stock.getClose().doubleValue():0.0)
.build();
return enOrder;
}
} , Materialized.with(Serdes.String(), new JSONSerdeComp<>()));

 enriched.toStream().foreach(new ForeachAction() {
 @Override
public void apply(String arg0, EnrichedOrder arg1) {

 logger.info(String.format("key = %s, value = %s", arg0, arg1));
}
});

 KafkaStreams streams = new KafkaStreams(builder.build(), props);
 streams.start();

 Runtime.getRuntime().addShutdownHook(new Thread(() -> streams.close()));
}



 


<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.5.0</version>
</dependency>


{code}
*+Exception:+*
{code:java}
18:49:31.525 
[my-stream-processing-application-2-37cfd34a-6eb4-411a-a2dc-faa9194ce04e-StreamThread-1]
 ERROR org.apache.kafka.streams.processor.internals.ProcessorStateManager - 
stream-thread 
[my-stream-processing-application-2-37cfd34a-6eb4-411a-a2dc-faa9194ce04e-StreamThread-1]
 task [0_0] Failed to flush state store orders-STATE-STORE-00: 
org.apache.kafka.streams.errors.StreamsException: ClassCastException while 
producing data to a sink topic. A serializer (key: 
org.apache.kafka.common.serialization.StringSerializer / value: 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer)
 is not compatible to the actual key or value type (key type: java.lang.String 
/ value type: 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper). 
Change the default Serdes in StreamConfig or provide correct Serdes via method 
parameters (for example if using the DSL, `#to(String topic, Produced 
produced)` with 
`Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).
at 
org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94) 
~[kafka-streams-2.5.0.jar:?]
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
 ~[kafka-streams-2.5.0.jar:?]
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
 ~[kafka-streams-2.5.0.jar:?]
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
 ~[kafka-streams-2.5.0.jar:?]
at 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:157)
 ~[kafka-streams-2.5.0.jar:?]
at 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:71)

[GitHub] [kafka] viktorsomogyi commented on a change in pull request #4090: [KAFKA-6084] Propagate JSON parsing errors in ReassignPartitionsCommand

2020-05-27 Thread GitBox


viktorsomogyi commented on a change in pull request #4090:
URL: https://github.com/apache/kafka/pull/4090#discussion_r430997918



##
File path: core/src/main/scala/kafka/utils/Json.scala
##
@@ -65,6 +63,16 @@ object Json {
 catch { case e: JsonProcessingException => Left(e) }
   }
 
+  /**
+* Parse a JSON string into a JsonValue if possible. It returns an `Either` 
where `Left` will be an exception and
+* `Right` is the `JsonValue`.
+* @param input a JSON string to parse
+* @return An `Either` which in case of `Left` means an exception and 
`Right` is the actual return value.
+*/
+  def tryParseFull(input: String): Either[JsonProcessingException, JsonValue] =
+try Right(mapper.readTree(input)).map(JsonValue(_))

Review comment:
   I don't think that's possible in this case: with the String parameter you 
pass in the JSON string itself, which can either be `null` (in which case an 
`IllegalArgumentException` will be thrown) or some value that may or may not 
be parseable, and then the method returns with `Either`.
   In the cases where you read the JSON in from an InputStream or a Reader it 
is possible to return `null`, as you can terminate the stream without providing 
any actual content (but with the String you provide the content itself).
   If you want, I can handle the possible `null`, but I don't think it's 
possible with `readTree(String)`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] viktorsomogyi commented on a change in pull request #4090: [KAFKA-6084] Propagate JSON parsing errors in ReassignPartitionsCommand

2020-05-27 Thread GitBox


viktorsomogyi commented on a change in pull request #4090:
URL: https://github.com/apache/kafka/pull/4090#discussion_r430997918



##
File path: core/src/main/scala/kafka/utils/Json.scala
##
@@ -65,6 +63,16 @@ object Json {
 catch { case e: JsonProcessingException => Left(e) }
   }
 
+  /**
+* Parse a JSON string into a JsonValue if possible. It returns an `Either` 
where `Left` will be an exception and
+* `Right` is the `JsonValue`.
+* @param input a JSON string to parse
+* @return An `Either` which in case of `Left` means an exception and 
`Right` is the actual return value.
+*/
+  def tryParseFull(input: String): Either[JsonProcessingException, JsonValue] =
+try Right(mapper.readTree(input)).map(JsonValue(_))

Review comment:
   I don't think that's possible in this case: with the String parameter you 
pass in the JSON string itself, which can either be `null` (in which case an 
`IllegalArgumentException` will be thrown) or some value that may or may not 
be parseable, and then the method returns with `Either`.
   In the cases where you read the JSON in from an InputStream or a Reader it 
is possible to return `null`, as you can terminate the stream without providing 
any actual content (but with the String you provide the content itself).
   If you'd like, I can handle the `null` in the spirit of defensive 
programming, but I don't think it's possible with `readTree(String)`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] viktorsomogyi commented on a change in pull request #4090: [KAFKA-6084] Propagate JSON parsing errors in ReassignPartitionsCommand

2020-05-27 Thread GitBox


viktorsomogyi commented on a change in pull request #4090:
URL: https://github.com/apache/kafka/pull/4090#discussion_r431001766



##
File path: core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
##
@@ -1609,14 +1610,15 @@ object ReassignPartitionsCommand extends Logging {
   }
 
   def parsePartitionReassignmentData(jsonData: String): (Seq[(TopicPartition, 
Seq[Int])], Map[TopicPartitionReplica, String]) = {
-Json.parseFull(jsonData) match {
-  case Some(js) =>
+Json.tryParseFull(jsonData) match {
+  case Right(js) =>
 val version = js.asJsonObject.get("version") match {
   case Some(jsonValue) => jsonValue.to[Int]
   case None => EarliestVersion
 }
 parsePartitionReassignmentData(version, js)
-  case None => throw new AdminOperationException("The input string is not 
a valid JSON")
+  case Left(f) =>
+throw f

Review comment:
   In this case I deliberately don't handle the exception, in order to 
propagate it to the user who called this command. The purpose is to surface 
the JSON parsing error so they know why (and where) the JSON was invalid. I 
could wrap this in an AdminOperationException, but since it is intended for 
the user and not for catching, I think it can be left as it is.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-05-27 Thread GitBox


chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r431001869



##
File path: core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
##
@@ -33,11 +33,15 @@ import scala.math.{max, min}
  */
 private[group] class DelayedJoin(coordinator: GroupCoordinator,
  group: GroupMetadata,
- rebalanceTimeout: Long) extends 
DelayedOperation(rebalanceTimeout, Some(group.lock)) {
+ rebalanceTimeout: Long) extends 
DelayedOperation(rebalanceTimeout, None) {
 
-  override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, 
forceComplete _)
-  override def onExpiration() = coordinator.onExpireJoin()
-  override def onComplete() = coordinator.onCompleteJoin(group)
+  /**
+   * It controls the lock manually since GroupCoordinator#onCompleteJoin() 
invoked by onComplete() can't be within a
+   * group lock since GroupCoordinator#onCompleteJoin() tries to complete 
delayed requests.
+   */
+  override def tryComplete(): Boolean = if 
(group.inLock(group.hasAllMembersJoined)) forceComplete() else false

Review comment:
   > So, we probably want to keep doing that.
   
   It may produce a deadlock if we hold the group lock during 
```GroupCoordinator.onCompleteJoin```.
   
   ```GroupCoordinator.onCompleteJoin``` may append a record to 
```__consumer_offsets``` (see 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1168),
 and hence it will try to complete other delayed joins whose groups belong to 
the same partition of ```__consumer_offsets```.
   
   That is why I make ```DelayedJoin``` control the group lock manually. Also, 
```GroupCoordinator.onCompleteJoin``` is used only by ```DelayedJoin```, so it 
should be fine to change the group-lock behavior in this case.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-05-27 Thread GitBox


chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r431002494



##
File path: core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
##
@@ -33,11 +33,15 @@ import scala.math.{max, min}
  */
 private[group] class DelayedJoin(coordinator: GroupCoordinator,
  group: GroupMetadata,
- rebalanceTimeout: Long) extends 
DelayedOperation(rebalanceTimeout, Some(group.lock)) {
+ rebalanceTimeout: Long) extends 
DelayedOperation(rebalanceTimeout, None) {
 
-  override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, 
forceComplete _)
-  override def onExpiration() = coordinator.onExpireJoin()
-  override def onComplete() = coordinator.onCompleteJoin(group)
+  /**
+   * It controls the lock manually since GroupCoordinator#onCompleteJoin() 
invoked by onComplete() can't be within a
+   * group lock since GroupCoordinator#onCompleteJoin() tries to complete 
delayed requests.
+   */
+  override def tryComplete(): Boolean = if 
(group.inLock(group.hasAllMembersJoined)) forceComplete() else false

Review comment:
   > I am thinking that we could do the following.
   
   Copy that.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-05-27 Thread GitBox


chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r431003374



##
File path: core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
##
@@ -33,11 +33,15 @@ import scala.math.{max, min}
  */
 private[group] class DelayedJoin(coordinator: GroupCoordinator,
  group: GroupMetadata,
- rebalanceTimeout: Long) extends 
DelayedOperation(rebalanceTimeout, Some(group.lock)) {
+ rebalanceTimeout: Long) extends 
DelayedOperation(rebalanceTimeout, None) {
 
-  override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, 
forceComplete _)
-  override def onExpiration() = coordinator.onExpireJoin()
-  override def onComplete() = coordinator.onCompleteJoin(group)
+  /**
+   * It controls the lock manually since GroupCoordinator#onCompleteJoin() 
invoked by onComplete() can't be within a
+   * group lock since GroupCoordinator#onCompleteJoin() tries to complete 
delayed requests.
+   */
+  override def tryComplete(): Boolean = if 
(group.inLock(group.hasAllMembersJoined)) forceComplete() else false

Review comment:
   > So, we probably want to keep doing that.
   
   hmmm, I misunderstood your point. Please ignore my first comment :)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-4090) JVM runs into OOM if (Java) client uses a SSL port without setting the security protocol

2020-05-27 Thread Marouane RAJI (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17117609#comment-17117609
 ] 

Marouane RAJI commented on KAFKA-4090:
--

Hey, 
 We ran into this issue with Samza jobs using a Kafka client with TLS enabled. 
When producing data, Samza uses two clients, a producer and an admin client 
(set up as a consumer to fetch metadata, etc.). We hadn't set up the consumer 
side properly and it was silently failing, causing the job to use a lot of 
memory (and in some cases die because of OOM). 
 We tested PR [https://github.com/apache/kafka/pull/8066] and it seems to work 
properly. We successfully tested the TLS use case and had no OOM when the 
consumer client was not set up properly (we got exceptions reporting the 
misconfiguration, as per PR #8066).

Please let us know if we can help in any way. 

> JVM runs into OOM if (Java) client uses a SSL port without setting the 
> security protocol
> 
>
> Key: KAFKA-4090
> URL: https://issues.apache.org/jira/browse/KAFKA-4090
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.1, 0.10.0.1, 2.1.0
>Reporter: Jaikiran Pai
>Assignee: Alexandre Dupriez
>Priority: Major
>
> Quoting from the mail thread that was sent to Kafka mailing list:
> {quote}
> We have been using Kafka 0.9.0.1 (server and Java client libraries). So far 
> we had been using it with plaintext transport but recently have been 
> considering upgrading to using SSL. It mostly works except that a 
> mis-configured producer (and even consumer) causes a hard to relate 
> OutOfMemory exception and thus causing the JVM in which the client is 
> running, to go into a bad state. We can consistently reproduce that OOM very 
> easily. We decided to check if this is something that is fixed in 0.10.0.1 so 
> upgraded one of our test systems to that version (both server and client 
> libraries) but still see the same issue. Here's how it can be easily 
> reproduced
> 1. Enable SSL listener on the broker via server.properties, as per the Kafka 
> documentation
> {code}
> listeners=PLAINTEXT://:9092,SSL://:9093
> ssl.keystore.location=
> ssl.keystore.password=pass
> ssl.key.password=pass
> ssl.truststore.location=
> ssl.truststore.password=pass
> {code}
> 2. Start zookeeper and kafka server
> 3. Create a "oom-test" topic (which will be used for these tests):
> {code}
> kafka-topics.sh --zookeeper localhost:2181 --create --topic oom-test  
> --partitions 1 --replication-factor 1
> {code}
> 4. Create a simple producer which sends a single message to the topic via 
> Java (new producer) APIs:
> {code}
> public class OOMTest {
> public static void main(final String[] args) throws Exception {
> final Properties kafkaProducerConfigs = new Properties();
> // NOTE: Intentionally use a SSL port without specifying 
> security.protocol as SSL
> 
> kafkaProducerConfigs.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> "localhost:9093");
> 
> kafkaProducerConfigs.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> 
> kafkaProducerConfigs.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>  StringSerializer.class.getName());
> try (KafkaProducer<String, String> producer = new 
> KafkaProducer<>(kafkaProducerConfigs)) {
> System.out.println("Created Kafka producer");
> final String topicName = "oom-test";
> final String message = "Hello OOM!";
> // send a message to the topic
> final Future<RecordMetadata> recordMetadataFuture = 
> producer.send(new ProducerRecord<>(topicName, message));
> final RecordMetadata sentRecordMetadata = 
> recordMetadataFuture.get();
> System.out.println("Sent message '" + message + "' to topic '" + 
> topicName + "'");
> }
> System.out.println("Tests complete");
> }
> }
> {code}
> Notice that the server URL is using a SSL endpoint localhost:9093 but isn't 
> specifying any of the other necessary SSL configs like security.protocol.
> 5. For the sake of easily reproducing this issue run this class with a max 
> heap size of 256MB (-Xmx256M). Running this code throws up the following 
> OutOfMemoryError in one of the Sender threads:
> {code}
> 18:33:25,770 ERROR [KafkaThread] - Uncaught exception in 
> kafka-producer-network-thread | producer-1:
> java.lang.OutOfMemoryError: Java heap space
> at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
> at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
> at 
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
> at 
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)

[jira] [Comment Edited] (KAFKA-4090) JVM runs into OOM if (Java) client uses a SSL port without setting the security protocol

2020-05-27 Thread Marouane RAJI (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17117609#comment-17117609
 ] 

Marouane RAJI edited comment on KAFKA-4090 at 5/27/20, 10:11 AM:
-

Hey, 
 We ran into this issue with Samza jobs using a Kafka client with TLS enabled. 
When producing data, Samza uses two clients, a producer and an admin client 
(set up as a consumer to fetch metadata, etc.). We hadn't set up the consumer 
side properly and it was silently failing, causing the job to use a lot of 
memory (and in some cases die because of OOM). 
 We tested PR [https://github.com/apache/kafka/pull/8066] and it seems to work 
properly. We successfully tested the TLS use case and had no OOM when the 
consumer client was not set up properly (we got exceptions reporting the 
misconfiguration, as per PR #8066).

Please let us know if we can help in any way to move this forward. 


was (Author: rmarou):
Hey, 
 We ran into this issue with Samza jobs using a Kafka client with TLS enabled. 
When producing data, Samza uses two clients, a producer and Admin (setup as a 
consumer to fetch meta-data etc..). We haven't setup the consumer side properly 
and it was silently failing, and causing the job to use a lot of memory (and in 
some cases die because of OOM). 
 We tested PR [https://github.com/apache/kafka/pull/8066] and it seems to work 
properly. We successfully tested the TLS usecase and we had no OOM when we 
don't setup the consumer client properly (we had exceptions reporting the 
mis-config as per PR#8066).

Please let us know if we can help in anyway ? 

> JVM runs into OOM if (Java) client uses a SSL port without setting the 
> security protocol
> 
>
> Key: KAFKA-4090
> URL: https://issues.apache.org/jira/browse/KAFKA-4090
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.1, 0.10.0.1, 2.1.0
>Reporter: Jaikiran Pai
>Assignee: Alexandre Dupriez
>Priority: Major
>
> Quoting from the mail thread that was sent to Kafka mailing list:
> {quote}
> We have been using Kafka 0.9.0.1 (server and Java client libraries). So far 
> we had been using it with plaintext transport but recently have been 
> considering upgrading to using SSL. It mostly works except that a 
> mis-configured producer (and even consumer) causes a hard to relate 
> OutOfMemory exception and thus causing the JVM in which the client is 
> running, to go into a bad state. We can consistently reproduce that OOM very 
> easily. We decided to check if this is something that is fixed in 0.10.0.1 so 
> upgraded one of our test systems to that version (both server and client 
> libraries) but still see the same issue. Here's how it can be easily 
> reproduced
> 1. Enable SSL listener on the broker via server.properties, as per the Kafka 
> documentation
> {code}
> listeners=PLAINTEXT://:9092,SSL://:9093
> ssl.keystore.location=
> ssl.keystore.password=pass
> ssl.key.password=pass
> ssl.truststore.location=
> ssl.truststore.password=pass
> {code}
> 2. Start zookeeper and kafka server
> 3. Create a "oom-test" topic (which will be used for these tests):
> {code}
> kafka-topics.sh --zookeeper localhost:2181 --create --topic oom-test  
> --partitions 1 --replication-factor 1
> {code}
> 4. Create a simple producer which sends a single message to the topic via 
> Java (new producer) APIs:
> {code}
> public class OOMTest {
> public static void main(final String[] args) throws Exception {
> final Properties kafkaProducerConfigs = new Properties();
> // NOTE: Intentionally use a SSL port without specifying 
> security.protocol as SSL
> 
> kafkaProducerConfigs.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> "localhost:9093");
> 
> kafkaProducerConfigs.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> 
> kafkaProducerConfigs.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>  StringSerializer.class.getName());
> try (KafkaProducer<String, String> producer = new 
> KafkaProducer<>(kafkaProducerConfigs)) {
> System.out.println("Created Kafka producer");
> final String topicName = "oom-test";
> final String message = "Hello OOM!";
> // send a message to the topic
> final Future<RecordMetadata> recordMetadataFuture = 
> producer.send(new ProducerRecord<>(topicName, message));
> final RecordMetadata sentRecordMetadata = 
> recordMetadataFuture.get();
> System.out.println("Sent message '" + message + "' to topic '" + 
> topicName + "'");
> }
> System.out.println("Tests complete");
> }
> }
> {code}
> Notice that the server URL is using a SSL endpoint localhost:9093 bu

[jira] [Created] (KAFKA-10051) kafka_2.11 2.5.0 isn't available on Maven Central

2020-05-27 Thread Stefan Zwanenburg (Jira)
Stefan Zwanenburg created KAFKA-10051:
-

 Summary: kafka_2.11 2.5.0 isn't available on Maven Central
 Key: KAFKA-10051
 URL: https://issues.apache.org/jira/browse/KAFKA-10051
 Project: Kafka
  Issue Type: Bug
  Components: packaging
Affects Versions: 2.5.0
Reporter: Stefan Zwanenburg


I'm using Spring Boot in a project, and I tried updating it to version 2.3.0, 
which internally depends on all sorts of Kafka artifacts version 2.5.0.

One of these artifacts is _kafka_2.11_, which doesn't appear to be available on 
Maven Central. This means my builds fail, unless I somehow force dependencies 
on versions of Kafka artifacts that are available on Maven Central.

Links:
 - [Search for kafka_2.11 version 2.5.0 on Maven 
Central|https://search.maven.org/search?q=a:kafka_2.11%20AND%20v:2.5.0]

PS: Not sure what the priority ought to be, but it's definitely blocking me in 
my work :)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-10036) Improve error message if user violates `Supplier` pattern

2020-05-27 Thread Alex Sun (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alex Sun reassigned KAFKA-10036:


Assignee: Alex Sun

> Improve error message if user violates `Supplier` pattern
> -
>
> Key: KAFKA-10036
> URL: https://issues.apache.org/jira/browse/KAFKA-10036
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Alex Sun
>Priority: Minor
>  Labels: newbie
>
> Using the Processor API, users need to pass in a `ProcessorSupplier` that 
> must return a new `Processor` instance each time `get()` is called.
> Users violate this rule on a regular basis and return the same instance on 
> `get()`. This mistake leads to a (not very informative) 
> `NullPointerException` during runtime. (Cf: 
> [https://stackoverflow.com/questions/61790984/kafka-stream-forward-method-throwing-nullpointerexception-because-processornode/61978396)]
> We could improve the error message by checking if `currentNode()` returns 
> `null` 
> ([https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L183)]
>  and throw an informative error message for this case.
> Furthermore, we could do a "sanity" check within `KafkaStreams` constructor 
> before we start the process threads: we get all `Suppliers` for the 
> `Topology` and call `get()` two times on each supplier to compare if the 
> returned object references are different – if they are the same, we throw an 
> informative error message.
> We should improve the JavaDocs, too: 
> [https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java#L34]
>  (also for `Transformer` et al. – it seems to be too subtle what "new" means. 
> Similarly for 
> [https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/Topology.java]
>  and 
> [https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java]
>  (`process()`, `transform()` etc.)
> Furthermore, we should improve the docs: to explain the supplier pattern 
> explicitly: 
> [https://kafka.apache.org/25/documentation/streams/developer-guide/processor-api.html]
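
For illustration, a minimal sketch of the proposed sanity check (a hypothetical helper, not existing Kafka code) that calls `get()` twice and compares the returned references:

{code:java}
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;

// Hypothetical helper sketching the proposed check: a supplier that returns the
// same Processor object twice violates the supplier pattern described above.
final class SupplierPatternCheck {

    static <K, V> void verifyNewInstanceOnEachCall(ProcessorSupplier<K, V> supplier) {
        Processor<K, V> first = supplier.get();
        Processor<K, V> second = supplier.get();
        if (first == second) {
            throw new IllegalArgumentException(
                "ProcessorSupplier must return a new Processor instance on every call to get(), "
                    + "but the same instance was returned twice");
        }
    }

    private SupplierPatternCheck() { }
}
{code}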



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] tombentley commented on pull request #8699: KAFKA-9673: Filter and Conditional SMTs

2020-05-27 Thread GitBox


tombentley commented on pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#issuecomment-634585597


   Rebased for conflict.
   
   @kkonstantine I've addressed those first comments, thanks! Still some work 
on the integration test (not passing when run via gradle).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-10051) kafka_2.11 2.5.0 isn't available on Maven Central

2020-05-27 Thread Tom Bentley (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tom Bentley resolved KAFKA-10051.
-
Resolution: Won't Fix

Kafka 2.5.0 dropped support for Scala 2.11.

> kafka_2.11 2.5.0 isn't available on Maven Central
> -
>
> Key: KAFKA-10051
> URL: https://issues.apache.org/jira/browse/KAFKA-10051
> Project: Kafka
>  Issue Type: Bug
>  Components: packaging
>Affects Versions: 2.5.0
>Reporter: Stefan Zwanenburg
>Priority: Blocker
>
> I'm using Spring Boot in a project, and I tried updating it to version 2.3.0, 
> which internally depends on all sorts of Kafka artifacts at version 2.5.0.
> One of these artifacts is _kafka_2.11_, which doesn't appear to be available 
> on Maven Central. This means my builds fail, unless I somehow force 
> dependencies on versions of Kafka artifacts that are available on Maven 
> Central.
> Links:
>  - [Search for kafka_2.11 version 2.5.0 on Maven 
> Central|https://search.maven.org/search?q=a:kafka_2.11%20AND%20v:2.5.0]
> PS: Not sure what the priority ought to be, but it's definitely blocking me 
> in my work :)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10051) kafka_2.11 2.5.0 isn't available on Maven Central

2020-05-27 Thread Stefan Zwanenburg (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17117661#comment-17117661
 ] 

Stefan Zwanenburg commented on KAFKA-10051:
---

Ah. The release notes mentioned a deprecation, which led me to believe there 
might be a grace period after which it would be dropped entirely. In that case, 
I'll open an issue with Spring Boot. Thank you!

> kafka_2.11 2.5.0 isn't available on Maven Central
> -
>
> Key: KAFKA-10051
> URL: https://issues.apache.org/jira/browse/KAFKA-10051
> Project: Kafka
>  Issue Type: Bug
>  Components: packaging
>Affects Versions: 2.5.0
>Reporter: Stefan Zwanenburg
>Priority: Blocker
>
> I'm using Spring Boot in a project, and I tried updating it to version 2.3.0, 
> which internally depends on all sorts of Kafka artifacts at version 2.5.0.
> One of these artifacts is _kafka_2.11_, which doesn't appear to be available 
> on Maven Central. This means my builds fail, unless I somehow force 
> dependencies on versions of Kafka artifacts that are available on Maven 
> Central.
> Links:
>  - [Search for kafka_2.11 version 2.5.0 on Maven 
> Central|https://search.maven.org/search?q=a:kafka_2.11%20AND%20v:2.5.0]
> PS: Not sure what the priority ought to be, but it's definitely blocking me 
> in my work :)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10050) kafka_log4j_appender.py broken on JDK11

2020-05-27 Thread Nikolay Izhikov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17117673#comment-17117673
 ] 

Nikolay Izhikov commented on KAFKA-10050:
-

Results with the patch:
{noformat}
nizhikov@kafka:~/kafka$ 
TC_PATHS="tests/kafkatest/tests/tools/log4j_appender_test.py" bash 
tests/docker/run_tests.sh

> Configure project :
Building project 'core' with Scala version 2.13.2
Building project 'streams-scala' with Scala version 2.13.2

Deprecated Gradle features were used in this build, making it incompatible with 
Gradle 7.0.
Use '--warning-mode all' to show the individual deprecation warnings.
See 
https://docs.gradle.org/6.4.1/userguide/command_line_interface.html#sec:command_line_warnings

BUILD SUCCESSFUL in 1s
111 actionable tasks: 2 executed, 109 up-to-date
docker exec ducker01 bash -c "cd /opt/kafka-dev && ducktape --cluster-file 
/opt/kafka-dev/tests/docker/build/cluster.json  
./tests/kafkatest/tests/tools/log4j_appender_test.py"
[INFO:2020-05-27 04:25:23,949]: starting test run with session id 
2020-05-27--006...
[INFO:2020-05-27 04:25:23,949]: running 4 tests...
[INFO:2020-05-27 04:25:23,949]: Triggering test 1 of 4...
[INFO:2020-05-27 04:25:23,961]: RunnerClient: Loading test {'directory': 
'/opt/kafka-dev/tests/kafkatest/tests/tools', 'file_name': 
'log4j_appender_test.py', 'method_name': 'test_log4j_appender', 'cls_name': 
'Log4jAppenderTest', 'injected_args': {'security_protocol': 'SASL_PLAINTEXT'}}
[INFO:2020-05-27 04:25:23,966]: RunnerClient: 
kafkatest.tests.tools.log4j_appender_test.Log4jAppenderTest.test_log4j_appender.security_protocol=SASL_PLAINTEXT:
 Setting up...
/usr/local/lib/python2.7/dist-packages/paramiko/kex_ecdh_nist.py:39: 
CryptographyDeprecationWarning: encode_point has been deprecated on 
EllipticCurvePublicNumbers and will be removed in a future version. Please use 
EllipticCurvePublicKey.public_bytes to obtain both compressed and uncompressed 
point encoding.
  m.add_string(self.Q_C.public_numbers().encode_point())
/usr/local/lib/python2.7/dist-packages/paramiko/kex_ecdh_nist.py:94: 
CryptographyDeprecationWarning: Support for unsafe construction of public 
numbers from encoded data will be removed in a future version. Please use 
EllipticCurvePublicKey.from_encoded_point
  self.curve, Q_S_bytes
/usr/local/lib/python2.7/dist-packages/paramiko/kex_ecdh_nist.py:109: 
CryptographyDeprecationWarning: encode_point has been deprecated on 
EllipticCurvePublicNumbers and will be removed in a future version. Please use 
EllipticCurvePublicKey.public_bytes to obtain both compressed and uncompressed 
point encoding.
  hm.add_string(self.Q_C.public_numbers().encode_point())
[INFO:2020-05-27 04:25:27,733]: RunnerClient: 
kafkatest.tests.tools.log4j_appender_test.Log4jAppenderTest.test_log4j_appender.security_protocol=SASL_PLAINTEXT:
 Running...
[INFO:2020-05-27 04:25:50,830]: RunnerClient: 
kafkatest.tests.tools.log4j_appender_test.Log4jAppenderTest.test_log4j_appender.security_protocol=SASL_PLAINTEXT:
 PASS
[INFO:2020-05-27 04:25:50,831]: RunnerClient: 
kafkatest.tests.tools.log4j_appender_test.Log4jAppenderTest.test_log4j_appender.security_protocol=SASL_PLAINTEXT:
 Tearing down...
[INFO:2020-05-27 04:25:58,992]: RunnerClient: 
kafkatest.tests.tools.log4j_appender_test.Log4jAppenderTest.test_log4j_appender.security_protocol=SASL_PLAINTEXT:
 Summary: 
[INFO:2020-05-27 04:25:58,993]: RunnerClient: 
kafkatest.tests.tools.log4j_appender_test.Log4jAppenderTest.test_log4j_appender.security_protocol=SASL_PLAINTEXT:
 Data: None
[INFO:2020-05-27 04:25:59,009]: 
~
[INFO:2020-05-27 04:25:59,010]: Triggering test 2 of 4...
[INFO:2020-05-27 04:25:59,020]: RunnerClient: Loading test {'directory': 
'/opt/kafka-dev/tests/kafkatest/tests/tools', 'file_name': 
'log4j_appender_test.py', 'method_name': 'test_log4j_appender', 'cls_name': 
'Log4jAppenderTest', 'injected_args': {'security_protocol': 'SASL_SSL'}}
[INFO:2020-05-27 04:25:59,024]: RunnerClient: 
kafkatest.tests.tools.log4j_appender_test.Log4jAppenderTest.test_log4j_appender.security_protocol=SASL_SSL:
 Setting up...
/usr/local/lib/python2.7/dist-packages/paramiko/kex_ecdh_nist.py:39: 
CryptographyDeprecationWarning: encode_point has been deprecated on 
EllipticCurvePublicNumbers and will be removed in a future version. Please use 
EllipticCurvePublicKey.public_bytes to obtain both compressed and uncompressed 
point encoding.
  m.add_string(self.Q_C.public_numbers().encode_point())
/usr/local/lib/python2.7/dist-packages/paramiko/kex_ecdh_nist.py:94: 
CryptographyDeprecationWarning: Support for unsafe construction of public 
numbers from encoded data will be removed in a future version. Please use 
EllipticCurvePublicKey.from_encoded_point
  self.curve, Q_S_bytes
/usr/local/lib/python2.7/dist-packages/paramiko/kex_ecdh_nist.py:109: 
CryptographyDeprecationWarning: encode_point has bee

[GitHub] [kafka] nizhikov commented on pull request #8731: KAFKA-10050: kafka_log4j_appender.py fixed for JDK11

2020-05-27 Thread GitBox


nizhikov commented on pull request #8731:
URL: https://github.com/apache/kafka/pull/8731#issuecomment-634599948


   Hello, @ijuma 
   
   I found that `kafka_log4j_appender.py` is broken for JDK11.
   It was broken by my patch - befd80b38d3ccb1aa0c6d99a899129fd5cf27774
   
   I've prepared a one-liner fix.
   
   Can you please take a look?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #8728: MINOR: Slight MetadataCache tweaks to avoid unnecessary work

2020-05-27 Thread GitBox


ijuma commented on pull request #8728:
URL: https://github.com/apache/kafka/pull/8728#issuecomment-634623654


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #8726: MINOR: Remove unused `Json.legacyEncodeAsString`

2020-05-27 Thread GitBox


ijuma commented on pull request #8726:
URL: https://github.com/apache/kafka/pull/8726#issuecomment-634638112


   Unrelated test failures.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #8726: MINOR: Remove unused `Json.legacyEncodeAsString`

2020-05-27 Thread GitBox


ijuma merged pull request #8726:
URL: https://github.com/apache/kafka/pull/8726


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #8731: KAFKA-10050: kafka_log4j_appender.py fixed for JDK11

2020-05-27 Thread GitBox


ijuma commented on pull request #8731:
URL: https://github.com/apache/kafka/pull/8731#issuecomment-634644538


   Thanks for the PR. Can you elaborate on how the original change broke it? I 
don't see anything obviously related.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)

2020-05-27 Thread Ella Kurginyan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17117750#comment-17117750
 ] 

Ella Kurginyan commented on KAFKA-7500:
---

Hi [~ryannedolan]. I can't find the LegacyReplicationPolicy class now. Could you 
tell me where it is? I can't find it :(

> MirrorMaker 2.0 (KIP-382)
> -
>
> Key: KAFKA-7500
> URL: https://issues.apache.org/jira/browse/KAFKA-7500
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect, mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ryanne Dolan
>Assignee: Ryanne Dolan
>Priority: Major
>  Labels: pull-request-available, ready-to-commit
> Fix For: 2.4.0
>
> Attachments: Active-Active XDCR setup.png
>
>
> Implement a drop-in replacement for MirrorMaker leveraging the Connect 
> framework.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0]
> [https://github.com/apache/kafka/pull/6295]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] nizhikov commented on pull request #8731: KAFKA-10050: kafka_log4j_appender.py fixed for JDK11

2020-05-27 Thread GitBox


nizhikov commented on pull request #8731:
URL: https://github.com/apache/kafka/pull/8731#issuecomment-634652827


   I added a call to `fix_opts_for_new_jvm` 
[here](https://github.com/apache/kafka/commit/befd80b38d3ccb1aa0c6d99a899129fd5cf27774#diff-ad7ee04aff5cb79b1478d038d59d4bbeR48).
   
   But the `node` variable in `kafka_log4j_appender.py` doesn't have a 
'version' attribute.
   The fix just adds this attribute based on the corresponding Kafka version.
   
   ```
 File "/opt/kafka-dev/tests/kafkatest/services/kafka_log4j_appender.py", 
line 48, in start_cmd
   cmd = fix_opts_for_new_jvm(node)
 File "/opt/kafka-dev/tests/kafkatest/services/kafka/util.py", line 36, in 
fix_opts_for_new_jvm
   if node.version == LATEST_0_8_2 or node.version == LATEST_0_9 or 
node.version == LATEST_0_10_0 or node.version == LATEST_0_10_1 or node.version 
== LATEST_0_10_2 or node.version == LATEST_0_11_0 or node.version == LATEST_1_0:
   AttributeError: 'ClusterNode' object has no attribute 'version'
   ```




This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] nizhikov edited a comment on pull request #8731: KAFKA-10050: kafka_log4j_appender.py fixed for JDK11

2020-05-27 Thread GitBox


nizhikov edited a comment on pull request #8731:
URL: https://github.com/apache/kafka/pull/8731#issuecomment-634652827


   I added a call to `fix_opts_for_new_jvm` 
[here](https://github.com/apache/kafka/commit/befd80b38d3ccb1aa0c6d99a899129fd5cf27774#diff-ad7ee04aff5cb79b1478d038d59d4bbeR48).
   
   But the `node` variable in `kafka_log4j_appender.py` doesn't have a 
'version' attribute.
   The fix just adds this attribute based on the corresponding Kafka version.
   
   ```
 File "/opt/kafka-dev/tests/kafkatest/services/kafka_log4j_appender.py", 
line 48, in start_cmd
   cmd = fix_opts_for_new_jvm(node)
 File "/opt/kafka-dev/tests/kafkatest/services/kafka/util.py", line 36, in 
fix_opts_for_new_jvm
   if node.version == LATEST_0_8_2 or node.version == LATEST_0_9 or 
node.version == LATEST_0_10_0 or node.version == LATEST_0_10_1 or node.version 
== LATEST_0_10_2 or node.version == LATEST_0_11_0 or node.version == LATEST_1_0:
   AttributeError: 'ClusterNode' object has no attribute 'version'
   ```




This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10007) Kafka consumer offset reset despite recent group activity

2020-05-27 Thread Raman Gupta (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17117760#comment-17117760
 ] 

Raman Gupta commented on KAFKA-10007:
-

It certainly could be. I'm not certain KAFKA-9543 listed too many specific 
conditions -- just some vague race conditions. Is there a way to tell for sure 
if this is the same problem or not?

> Kafka consumer offset reset despite recent group activity
> -
>
> Key: KAFKA-10007
> URL: https://issues.apache.org/jira/browse/KAFKA-10007
> Project: Kafka
>  Issue Type: Bug
>Reporter: Raman Gupta
>Priority: Major
>
> I was running a Kafka 2.3.0 broker with the default values for 
> `offset.retention.minutes` (which should be 7 days as of 2.0.0). I deployed a 
> 2.4.1 broker, along with a change in setting `offset.retention.minutes` to 14 
> days, as I have several low-traffic topics in which exactly-once processing 
> is desired.
> As I understand it, with https://issues.apache.org/jira/browse/KAFKA-4682 and 
> KIP-211, offsets should no longer be expired based on the last commit 
> timestamp, but instead on the last time the group transitioned into an Empty 
> state.
> However, the behavior I saw from Kafka upon broker shutdown was that the 
> offsets were expired for a group when as far as I can tell, they should not 
> have been. See these logs from during the cluster recycle -- during this time 
> the consumer, configured with the static group membership protocol, is always 
> running:
> {code}
> < offsets.retention.minutes using default value>>
> [2020-05-10 05:37:01,070] <>
> << Starting broker-0 on 2.4.1 with protocol version 2.3, 
> offsets.retention.minutes = 10080 >>
> kafka-0   [2020-05-10 05:37:39,682] INFO starting 
> (kafka.server.KafkaServer)
> kafka-0   [2020-05-10 05:39:42,680] INFO [GroupCoordinator 0]: Loading 
> group metadata for produs-cis-CisFileEventConsumer with generation 27 
> (kafka.coordinator.group.GroupCoordinator)
> << Recycling broker-1 on 2.4.1, protocol version 2.3, 
> offsets.retention.minutes = 10080, looks like the consumer fails because of 
> the broker going down, and kafka-0 reports: >>
> kafka-0   [2020-05-10 05:45:14,121] INFO [GroupCoordinator 0]: Member 
> cis-9c5d994c5-7hpqt-efced5ca-0b81-4720-992d-bdd8612519b3 in group 
> produs-cis-CisFileEventConsumer has failed, removing it from the group 
> (kafka.coordinator.group.GroupCoordinator)
> kafka-0   [2020-05-10 05:45:14,124] INFO [GroupCoordinator 0]: Preparing 
> to rebalance group produs-cis-CisFileEventConsumer in state 
> PreparingRebalance with old generation 27 (__consumer_offsets-17) (reason: 
> removing member cis-9c5d994c5-7hpqt-efced5ca-0b81-4720-992d-bdd8612519b3 on 
> heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
> kafka-0   [2020-05-10 05:45:19,479] INFO [GroupCoordinator 0]: Member 
> cis-9c5d994c5-sknlk-2b9ed8bf-348c-4a10-97d3-5f2caccce7df in group 
> produs-cis-CisFileEventConsumer has failed, removing it from the group 
> (kafka.coordinator.group.GroupCoordinator)
> kafka-0   [2020-05-10 05:45:19,482] INFO [GroupCoordinator 0]: Group 
> produs-cis-CisFileEventConsumer with generation 28 is now empty 
> (__consumer_offsets-17) (kafka.coordinator.group.GroupCoordinator)
> << and now kafka-1 starts up again, the offsets are expired >>
> kafka-1   [2020-05-10 05:46:11,229] INFO starting 
> (kafka.server.KafkaServer)
> ...
> kafka-0   [2020-05-10 05:47:42,303] INFO [GroupCoordinator 0]: Preparing 
> to rebalance group produs-cis-CisFileEventConsumer in state 
> PreparingRebalance with old generation 28 (__consumer_offsets-17) (reason: 
> Adding new member cis-9c5d994c5-sknlk-1194b4b6-81ae-4a78-89a7-c610cf8c65be 
> with group instanceid Some(cis-9c5d994c5-sknlk)) 
> (kafka.coordinator.group.GroupCoordinator)
> kafka-0   [2020-05-10 05:47:47,611] INFO [GroupMetadataManager 
> brokerId=0] Removed 43 expired offsets in 13 milliseconds. 
> (kafka.coordinator.group.GroupMetadataManager)
> kafka-0   [2020-05-10 05:48:12,308] INFO [GroupCoordinator 0]: Stabilized 
> group produs-cis-CisFileEventConsumer generation 29 (__consumer_offsets-17) 
> (kafka.coordinator.group.GroupCoordinator)
> kafka-0   [2020-05-10 05:48:12,311] INFO [GroupCoordinator 0]: Assignment 
> received from leader for group produs-cis-CisFileEventConsumer for generation 
> 29 (kafka.coordinator.group.GroupCoordinator)
> {code}
> The group becomes empty at 2020-05-10 05:45:19,482, and then the offsets are 
> expired about two minutes later at 05:47:47,611. I can't see any reason based 
> on my understanding of how things work for this to have happened, other than 
> it being a bug of some type?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] wj1918 commented on pull request #2880: New transform: ExtractFields

2020-05-27 Thread GitBox


wj1918 commented on pull request #2880:
URL: https://github.com/apache/kafka/pull/2880#issuecomment-634666303


   @dbtucker @ewencp Do we still need this PR?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10007) Kafka consumer offset reset despite recent group activity

2020-05-27 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17117792#comment-17117792
 ] 

Ismael Juma commented on KAFKA-10007:
-

Do you see segment rolling happening in the broker shortly before the offset 
out of range in the consumer?

> Kafka consumer offset reset despite recent group activity
> -
>
> Key: KAFKA-10007
> URL: https://issues.apache.org/jira/browse/KAFKA-10007
> Project: Kafka
>  Issue Type: Bug
>Reporter: Raman Gupta
>Priority: Major
>
> I was running a Kafka 2.3.0 broker with the default values for 
> `offset.retention.minutes` (which should be 7 days as of 2.0.0). I deployed a 
> 2.4.1 broker, along with a change in setting `offset.retention.minutes` to 14 
> days, as I have several low-traffic topics in which exactly-once processing 
> is desired.
> As I understand it, with https://issues.apache.org/jira/browse/KAFKA-4682 and 
> KIP-211, offsets should no longer be expired based on the last commit 
> timestamp, but instead on the last time the group transitioned into an Empty 
> state.
> However, the behavior I saw from Kafka upon broker shutdown was that the 
> offsets were expired for a group when as far as I can tell, they should not 
> have been. See these logs from during the cluster recycle -- during this time 
> the consumer, configured with the static group membership protocol, is always 
> running:
> {code}
> < offsets.retention.minutes using default value>>
> [2020-05-10 05:37:01,070] <>
> << Starting broker-0 on 2.4.1 with protocol version 2.3, 
> offsets.retention.minutes = 10080 >>
> kafka-0   [2020-05-10 05:37:39,682] INFO starting 
> (kafka.server.KafkaServer)
> kafka-0   [2020-05-10 05:39:42,680] INFO [GroupCoordinator 0]: Loading 
> group metadata for produs-cis-CisFileEventConsumer with generation 27 
> (kafka.coordinator.group.GroupCoordinator)
> << Recycling broker-1 on 2.4.1, protocol version 2.3, 
> offsets.retention.minutes = 10080, looks like the consumer fails because of 
> the broker going down, and kafka-0 reports: >>
> kafka-0   [2020-05-10 05:45:14,121] INFO [GroupCoordinator 0]: Member 
> cis-9c5d994c5-7hpqt-efced5ca-0b81-4720-992d-bdd8612519b3 in group 
> produs-cis-CisFileEventConsumer has failed, removing it from the group 
> (kafka.coordinator.group.GroupCoordinator)
> kafka-0   [2020-05-10 05:45:14,124] INFO [GroupCoordinator 0]: Preparing 
> to rebalance group produs-cis-CisFileEventConsumer in state 
> PreparingRebalance with old generation 27 (__consumer_offsets-17) (reason: 
> removing member cis-9c5d994c5-7hpqt-efced5ca-0b81-4720-992d-bdd8612519b3 on 
> heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
> kafka-0   [2020-05-10 05:45:19,479] INFO [GroupCoordinator 0]: Member 
> cis-9c5d994c5-sknlk-2b9ed8bf-348c-4a10-97d3-5f2caccce7df in group 
> produs-cis-CisFileEventConsumer has failed, removing it from the group 
> (kafka.coordinator.group.GroupCoordinator)
> kafka-0   [2020-05-10 05:45:19,482] INFO [GroupCoordinator 0]: Group 
> produs-cis-CisFileEventConsumer with generation 28 is now empty 
> (__consumer_offsets-17) (kafka.coordinator.group.GroupCoordinator)
> << and now kafka-1 starts up again, the offsets are expired >>
> kafka-1   [2020-05-10 05:46:11,229] INFO starting 
> (kafka.server.KafkaServer)
> ...
> kafka-0   [2020-05-10 05:47:42,303] INFO [GroupCoordinator 0]: Preparing 
> to rebalance group produs-cis-CisFileEventConsumer in state 
> PreparingRebalance with old generation 28 (__consumer_offsets-17) (reason: 
> Adding new member cis-9c5d994c5-sknlk-1194b4b6-81ae-4a78-89a7-c610cf8c65be 
> with group instanceid Some(cis-9c5d994c5-sknlk)) 
> (kafka.coordinator.group.GroupCoordinator)
> kafka-0   [2020-05-10 05:47:47,611] INFO [GroupMetadataManager 
> brokerId=0] Removed 43 expired offsets in 13 milliseconds. 
> (kafka.coordinator.group.GroupMetadataManager)
> kafka-0   [2020-05-10 05:48:12,308] INFO [GroupCoordinator 0]: Stabilized 
> group produs-cis-CisFileEventConsumer generation 29 (__consumer_offsets-17) 
> (kafka.coordinator.group.GroupCoordinator)
> kafka-0   [2020-05-10 05:48:12,311] INFO [GroupCoordinator 0]: Assignment 
> received from leader for group produs-cis-CisFileEventConsumer for generation 
> 29 (kafka.coordinator.group.GroupCoordinator)
> {code}
> The group becomes empty at 2020-05-10 05:45:19,482, and then the offsets are 
> expired about two minutes later at 05:47:47,611. I can't see any reason based 
> on my understanding of how things work for this to have happened, other than 
> it being a bug of some type?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on pull request #8705: KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException

2020-05-27 Thread GitBox


ijuma commented on pull request #8705:
URL: https://github.com/apache/kafka/pull/8705#issuecomment-634692630


   @rajinisivaram We also retain a reference to the `NetworkReceive`, which is 
probably a bigger deal, right?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8697: KAFKA-9983: KIP-613, add INFO level e2e latency metrics

2020-05-27 Thread GitBox


vvcephei commented on pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#issuecomment-634694919


   Test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

2020-05-27 Thread GitBox


rhauch commented on pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#issuecomment-634695460


   @xiaodongdu please address the conflicts.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #8705: KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException

2020-05-27 Thread GitBox


ijuma commented on pull request #8705:
URL: https://github.com/apache/kafka/pull/8705#issuecomment-634696095


   This change is probably OK, but the way we call `close` while iterating over 
`completedReceives` seems a bit fragile. It would probably be safer to collect 
the items we need to close and close them in a separate iteration. What do you 
think @rajinisivaram?
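   For illustration, a minimal sketch of the collect-then-close pattern being suggested; the names and predicate below are placeholders, not the actual `Selector`/`SocketServer` code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Nothing is closed while the source collection is being iterated; items that need
// closing are remembered and closed in a separate pass afterwards.
final class CollectThenClose {
    static <T> void processThenClose(final Iterable<T> completedReceives,
                                     final Predicate<T> needsClose,   // placeholder condition
                                     final Consumer<T> process,
                                     final Consumer<T> close) {
        final List<T> toClose = new ArrayList<>();
        for (final T receive : completedReceives) {
            if (needsClose.test(receive)) {
                toClose.add(receive);            // defer the close
            } else {
                process.accept(receive);
            }
        }
        for (final T receive : toClose) {        // second pass, after the iteration has finished
            close.accept(receive);
        }
    }
}
```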



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #6824: KAFKA-7523: Add ConnectedStoreProvider to Processor API

2020-05-27 Thread GitBox


vvcephei commented on pull request #6824:
URL: https://github.com/apache/kafka/pull/6824#issuecomment-634696168







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #8705: KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException

2020-05-27 Thread GitBox


ijuma commented on pull request #8705:
URL: https://github.com/apache/kafka/pull/8705#issuecomment-634696784


   One more thing, can we improve `KafkaChannel.hashCode/equals` to avoid 
unnecessary work? The calls to `Objects.equals` and `Objects.hash` seem 
pointless.
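   For illustration, a sketch of the kind of simplification being suggested, using a stand-in class with a single `id` field (the actual `KafkaChannel` fields may differ):

```java
// Objects.hash(id) allocates a varargs array on every call, and Objects.equals(id, that.id)
// adds a null check for a field that is never null; direct field access avoids both.
final class ChannelIdExample {
    private final String id;

    ChannelIdExample(final String id) {
        this.id = id;
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        return id.equals(((ChannelIdExample) o).id);
    }
}
```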



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #8504: KAFKA-9298: reuse mapped stream error in joins

2020-05-27 Thread GitBox


bbejeck commented on pull request #8504:
URL: https://github.com/apache/kafka/pull/8504#issuecomment-634699859


   Java 14 passed, 
   Java 11 failed with 
   ```Execution failed for task ':connect:mirror:integrationTest'.
   21:42:58 > Process 'Gradle Test Executor 48' finished with non-zero exit 
value 1
   ```
   Java 8 failed with 
`org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest > 
shouldUpgradeFromEosAlphaToEosBeta[true] FAILED`
   
   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8716: KAFKA-6145: KIP-441: Fix assignor config passthough

2020-05-27 Thread GitBox


vvcephei commented on pull request #8716:
URL: https://github.com/apache/kafka/pull/8716#issuecomment-634702382


   The test failures were the result of extra tests that were added on trunk 
after this branch. The branch builder does a merge with trunk before running 
the tests. I'm rebasing and fixing the tests.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10017) Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta

2020-05-27 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10017:
-
Priority: Blocker  (was: Major)

> Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta
> ---
>
> Key: KAFKA-10017
> URL: https://issues.apache.org/jira/browse/KAFKA-10017
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Assignee: Matthias J. Sax
>Priority: Blocker
>  Labels: flaky-test, unit-test
>
> Creating a new ticket for this since the root cause is different than 
> https://issues.apache.org/jira/browse/KAFKA-9966
> With injectError = true:
> h3. Stacktrace
> java.lang.AssertionError: Did not receive all 20 records from topic 
> multiPartitionOutputTopic within 6 ms Expected: is a value equal to or 
> greater than <20> but: <15> was less than <20> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] tombentley commented on pull request #8312: KAFKA-9432 automated protocol for DescribeConfigs

2020-05-27 Thread GitBox


tombentley commented on pull request #8312:
URL: https://github.com/apache/kafka/pull/8312#issuecomment-634705442


   @mimaison I've fixed those, if you want to make a 2nd pass. I guess I should 
add a test to `KafkaAdminClientTest` too.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10017) Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta

2020-05-27 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10017:
-
Affects Version/s: 2.6.0

> Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta
> ---
>
> Key: KAFKA-10017
> URL: https://issues.apache.org/jira/browse/KAFKA-10017
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Sophie Blee-Goldman
>Assignee: Matthias J. Sax
>Priority: Blocker
>  Labels: flaky-test, unit-test
>
> Creating a new ticket for this since the root cause is different than 
> https://issues.apache.org/jira/browse/KAFKA-9966
> With injectError = true:
> h3. Stacktrace
> java.lang.AssertionError: Did not receive all 20 records from topic 
> multiPartitionOutputTopic within 6 ms Expected: is a value equal to or 
> greater than <20> but: <15> was less than <20> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10017) Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta

2020-05-27 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17117819#comment-17117819
 ] 

John Roesler commented on KAFKA-10017:
--

I've just marked this as a blocker for 2.6.0. It's been failing with very high 
frequency on branch builds, and it looks like it may reveal an actual bug in 
EOS Beta.

> Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta
> ---
>
> Key: KAFKA-10017
> URL: https://issues.apache.org/jira/browse/KAFKA-10017
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Sophie Blee-Goldman
>Assignee: Matthias J. Sax
>Priority: Blocker
>  Labels: flaky-test, unit-test
>
> Creating a new ticket for this since the root cause is different than 
> https://issues.apache.org/jira/browse/KAFKA-9966
> With injectError = true:
> h3. Stacktrace
> java.lang.AssertionError: Did not receive all 20 records from topic 
> multiPartitionOutputTopic within 6 ms Expected: is a value equal to or 
> greater than <20> but: <15> was less than <20> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rhauch commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

2020-05-27 Thread GitBox


rhauch commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r431194915



##
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
##
@@ -65,4 +66,12 @@ default void validateReconfiguration(Map configs) 
throws ConfigExcept
 default void reconfigure(Map configs) {
 }
 
+/**
+ * Provides context labels for the service or library exposing metrics
+ *
+ * @param metricsContext the metric context
+ */
+@InterfaceStability.Evolving
+default void contextChange(MetricsContext metricsContext) {

Review comment:
   If there is no clear call pattern, then it's fine to not say anything. 
However, `JmxReporter.contextChange(...)` seems to assume that 
`contextChange(...)` will be called before any metrics are added via 
`init(...)`.
   
   If that call pattern is true, then I think we should document it. If it's 
also true it can be called later, then mention this as well. For example, the 
JavaDoc text on `contextChange(...)` could be something like:
   
   > Sets the context labels for the service or library that is exposing 
metrics. 
   > This will be called before {@link #init(List)} and may be called anytime 
after that.
   
   WDYT?
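   For illustration, a minimal sketch of a reporter relying on that ordering; class and package names follow the KIP-606 proposal as reviewed in this PR and may differ from what is finally merged:

```java
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;

// Assumes contextChange(...) is invoked before init(...), as the suggested JavaDoc states,
// so the context labels are already available when the initial metrics are registered.
public class ContextAwareReporter implements MetricsReporter {
    private volatile MetricsContext context;

    @Override
    public void contextChange(final MetricsContext metricsContext) {
        this.context = metricsContext;                 // remember the labels for tagging metrics
    }

    @Override
    public void init(final List<KafkaMetric> metrics) {
        // Defensive check in case the ordering guarantee is weaker than assumed.
        if (context != null) {
            // e.g. attach the context labels to every metric exported below
        }
        metrics.forEach(this::metricChange);
    }

    @Override
    public void metricChange(final KafkaMetric metric) { /* export or update the metric */ }

    @Override
    public void metricRemoval(final KafkaMetric metric) { /* drop the metric */ }

    @Override
    public void close() { }

    @Override
    public void configure(final Map<String, ?> configs) { }
}
```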





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-05-27 Thread GitBox


chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r431206147



##
File path: 
core/src/test/scala/unit/kafka/coordinator/group/DelayedJoinTest.scala
##
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
+
+import kafka.utils.MockTime
+import org.easymock.EasyMock
+import org.junit.{Assert, Test}
+
+class DelayedJoinTest {

Review comment:
   make sure ```DelayedJoinTest``` does not cause deadlock 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-05-27 Thread GitBox


chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r431208549



##
File path: core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
##
@@ -33,11 +34,40 @@ import scala.math.{max, min}
  */
 private[group] class DelayedJoin(coordinator: GroupCoordinator,
  group: GroupMetadata,
- rebalanceTimeout: Long) extends 
DelayedOperation(rebalanceTimeout, Some(group.lock)) {
+ rebalanceTimeout: Long) extends 
DelayedOperation(rebalanceTimeout, None) {
 
-  override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, 
forceComplete _)
-  override def onExpiration() = coordinator.onExpireJoin()
-  override def onComplete() = coordinator.onCompleteJoin(group)
+  /**
+   * The delayed requests should be completed without holding group lock so we 
keep those partitions and then
+   * complete them after releasing lock.
+   */
+  private[group] var partitionsToComplete: 
scala.collection.Map[TopicPartition, LeaderHWChange] = Map.empty
+
+  /**
+   * It controls the lock manually since GroupCoordinator#onCompleteJoin() 
invoked by onComplete() can't be within a
+   * group lock since GroupCoordinator#onCompleteJoin() tries to complete 
delayed requests.
+   *
+   */
+  override def tryComplete(): Boolean = try group.inLock {
+/**
+ * holds the group lock for both the "group.hasAllMembersJoined" check and 
the call to forceComplete()
+ */
+if (group.hasAllMembersJoined) forceComplete()

Review comment:
   @junrao both methods are executed with the lock held





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-05-27 Thread GitBox


chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r431209368



##
File path: core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
##
@@ -33,11 +34,40 @@ import scala.math.{max, min}
  */
 private[group] class DelayedJoin(coordinator: GroupCoordinator,
  group: GroupMetadata,
- rebalanceTimeout: Long) extends 
DelayedOperation(rebalanceTimeout, Some(group.lock)) {
+ rebalanceTimeout: Long) extends 
DelayedOperation(rebalanceTimeout, None) {
 
-  override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, 
forceComplete _)
-  override def onExpiration() = coordinator.onExpireJoin()
-  override def onComplete() = coordinator.onCompleteJoin(group)
+  /**
+   * The delayed requests should be completed without holding group lock so we 
keep those partitions and then
+   * complete them after releasing lock.
+   */
+  private[group] var partitionsToComplete: 
scala.collection.Map[TopicPartition, LeaderHWChange] = Map.empty
+
+  /**
+   * It controls the lock manually since GroupCoordinator#onCompleteJoin() 
invoked by onComplete() can't be within a
+   * group lock since GroupCoordinator#onCompleteJoin() tries to complete 
delayed requests.
+   *
+   */
+  override def tryComplete(): Boolean = try group.inLock {
+/**
+ * holds the group lock for both the "group.hasAllMembersJoined" check and 
the call to forceComplete()
+ */
+if (group.hasAllMembersJoined) forceComplete()
+else false
+  } finally completeDelayedRequests()
+  override def onExpiration(): Unit = coordinator.onExpireJoin()
+  override def onComplete(): Unit = try partitionsToComplete = 
coordinator.onCompleteJoin(group)
+  finally completeDelayedRequests()
+
+  /**
+   * try to complete delayed requests only if the caller does not hold the 
group lock.
+   * This method is called by following cases:
+   * 1) tryComplete -> hold lock -> onComplete -> release lock -> 
completeDelayedRequests
+   * 2) onComplete -> completeDelayedRequests
+   */
+  private[group] def completeDelayedRequests(): Unit = if 
(!group.lock.isHeldByCurrentThread) {

Review comment:
   This is a workaround to deal with a deadlock caused by taking multiple 
group locks.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

2020-05-27 Thread GitBox


rhauch commented on pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#issuecomment-634721500


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-05-27 Thread GitBox


chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r431210497



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -65,13 +65,24 @@ import scala.compat.java8.OptionConverters._
 /*
  * Result metadata of a log append operation on the log
  */
-case class LogAppendResult(info: LogAppendInfo, exception: Option[Throwable] = 
None) {
+case class LogAppendResult(info: LogAppendInfo,
+   exception: Option[Throwable] = None,
+   leaderHWChange: LeaderHWChange = 
LeaderHWChange.None) {
   def error: Errors = exception match {
 case None => Errors.NONE
 case Some(e) => Errors.forException(e)
   }
 }
 
+/**
+ * a flag indicting whether the HWM has been changed.
+ */
+sealed trait LeaderHWChange

Review comment:
   this enum type is more readable than ```boolean```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on pull request #8376: KAFKA-9724 Newer clients not always sending fetch request to older brokers

2020-05-27 Thread GitBox


mumrah commented on pull request #8376:
URL: https://github.com/apache/kafka/pull/8376#issuecomment-634738632


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-27 Thread GitBox


feyman2016 commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r431247169



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
##
@@ -261,6 +261,42 @@ public void 
shouldNotAllowToResetWhenIntermediateTopicAbsent() throws Exception
 Assert.assertEquals(1, exitCode);
 }
 
+public void testResetWhenLongSessionTimeoutConfiguredWithForceOption() 
throws Exception {
+appID = testId + "-with-force-option";
+streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + 
STREAMS_CONSUMER_TIMEOUT * 100);
+
+// Run
+streams = new 
KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
+streams.start();
+final List> result = 
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, 
OUTPUT_TOPIC, 10);
+
+streams.close();
+
+// RESET
+streams = new 
KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
+streams.cleanUp();
+
+// Reset would fail since long session timeout has been configured
+final boolean cleanResult = tryCleanGlobal(false, null, null);
+Assert.assertEquals(false, cleanResult);
+
+// Reset will success with --force, it will force delete active 
members on broker side
+cleanGlobal(false, "--force", null);
+
+waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * 
CLEANUP_CONSUMER_TIMEOUT);
+
+assertInternalTopicsGotDeleted(null);
+
+// RE-RUN

Review comment:
   This is to verify that, after active members have been successfully 
force-removed, re-running the streams application can send exactly the same 
records to the output topic again.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on a change in pull request #8720: KAFKA-9971: Error Reporting in Sink Connectors

2020-05-27 Thread GitBox


rhauch commented on a change in pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#discussion_r431212650



##
File path: 
connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.sink;
+
+import java.util.concurrent.Future;
+
+/**
+ * Component that the sink task can use as it {@link 
SinkTask#put(Collection)}.
+ * Reporter of problematic records and the corresponding problems.
+ *
+ * @since 2.6
+ */
+public interface ErrantRecordReporter {
+
+  /**
+   * Report a problematic record and the corresponding error to be written to 
the sink
+   * connector's dead letter queue (DLQ).
+   *
+   * This call is asynchronous and returns a {@link 
java.util.concurrent.Future Future}.
+   * Invoking {@link java.util.concurrent.Future#get() get()} on this future 
will block until the
+   * record has been written or throw any exception that occurred while 
sending the record.
+   * If you want to simulate a simple blocking call you can call the 
get() method
+   * immediately.
+   *
+   * Connect guarantees that sink records reported through this reporter will 
be written to the error topic
+   * before the framework calls the {@link SinkTask#preCommit(Map)} method and 
therefore before

Review comment:
   You need to qualify `Map` or import it:
   ```suggestion
  * before the framework calls the {@link 
SinkTask#preCommit(java.util.Map)} method and therefore before
   ```

##
File path: 
connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.sink;
+
+import java.util.concurrent.Future;
+
+/**
+ * Component that the sink task can use as it {@link 
SinkTask#put(Collection)}.

Review comment:
   This is not legal JavaDoc:
   ```suggestion
* Component that the sink task can use as it {@link 
SinkTask#put(java.util.Collection)}.
   ```
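   For illustration, a rough sketch of how a sink task might use the reporter whose JavaDoc is quoted above; how the reporter is obtained (`context.errantRecordReporter()` below) and the exact `report(...)` signature are assumptions based on KIP-610 as reviewed in this PR:

```java
import java.util.Collection;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

// Records that fail sink-specific processing are handed to the reporter instead of
// failing the task; report(...) is asynchronous, so calling get() on the returned
// Future would make it effectively synchronous.
public abstract class ExampleSinkTask extends SinkTask {
    private ErrantRecordReporter reporter;

    @Override
    public void put(final Collection<SinkRecord> records) {
        if (reporter == null) {
            reporter = context.errantRecordReporter();   // assumed accessor; may be null on older workers
        }
        for (final SinkRecord record : records) {
            try {
                process(record);                          // hypothetical sink-specific write
            } catch (final RuntimeException e) {
                if (reporter != null) {
                    reporter.report(record, e);           // send to the error handling chain / DLQ
                } else {
                    throw e;                              // no reporter available: fail as before
                }
            }
        }
    }

    protected abstract void process(SinkRecord record);
}
```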

##
File path: 
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
##
@@ -100,4 +110,29 @@ public String toString() {
 ", timestampType=" + timestampType +
 "} " + super.toString();
 }
+
+public class InternalSinkRecord extends SinkRecord {
+
+ConsumerRecord originalRecord;

Review comment:
   When you move `InternalSinkRecord` to the `runtime` module, be sure to 
make this `private final`.

##
File path: 
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
##
@@ -68,6 +70,14 @@ public SinkRecord newRecord(String topic, Integer 
kafkaPartition, Schema keySche
 return new SinkRecord(topic, kafkaPartition, keySchema, key, 
valueSchema, value, kafkaOffset(), timestamp, timestampType, headers);
 }
 
+public InternalSinkRecord newRecord(String topic, Integer kafkaPartition, 
Schema keySchema, Object key, Schema valueSchema, Object value,
+long kafkaOffset, Long timestamp,
+TimestampType timestampType, 
Iterable headers,
+ConsumerRecord 
originalRecord) {
+return new InternalSinkRecord(topic, kafkaPartition, keySchema, key, 
valueSchema, value,
+kafkaOffset, timestamp, timestampType, headers, origin

[GitHub] [kafka] rhauch commented on pull request #8699: KAFKA-9673: Filter and Conditional SMTs

2020-05-27 Thread GitBox


rhauch commented on pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#issuecomment-634766420


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji merged pull request #7886: KAFKA-9353: Add groupInstanceId to DescribeGroup for better visibility

2020-05-27 Thread GitBox


hachikuji merged pull request #7886:
URL: https://github.com/apache/kafka/pull/7886


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on pull request #8705: KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException

2020-05-27 Thread GitBox


rajinisivaram commented on pull request #8705:
URL: https://github.com/apache/kafka/pull/8705#issuecomment-634769086


   @ijuma Based on our discussion, I have added 
`Selector#clearCompletedSends()` and `Selector#clearCompletedReceives()` for 
SocketServer to clear buffers after they are processed. Also updated 
KafkaChannel. Can you review please? Thank you.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-9353) Add groupInstanceId to DescribeGroup for better visibility

2020-05-27 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-9353.

Fix Version/s: 2.6.0
   Resolution: Fixed

> Add groupInstanceId to DescribeGroup for better visibility
> --
>
> Key: KAFKA-9353
> URL: https://issues.apache.org/jira/browse/KAFKA-9353
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: dengziming
>Assignee: dengziming
>Priority: Minor
> Fix For: 2.6.0
>
>
> KAFKA-8538 already added `group.instance.id` to `MemberDescription`, but it was 
> never printed when describing a group, so this change simply prints it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

2020-05-27 Thread GitBox


xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r431267044



##
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
##
@@ -65,4 +66,12 @@ default void validateReconfiguration(Map configs) 
throws ConfigExcept
 default void reconfigure(Map configs) {
 }
 
+/**
+ * Provides context labels for the service or library exposing metrics
+ *
+ * @param metricsContext the metric context
+ */
+@InterfaceStability.Evolving
+default void contextChange(MetricsContext metricsContext) {

Review comment:
   Sounds good. Updated javadoc.
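
   For readers following the KIP-606 discussion, a minimal sketch of a reporter consuming the new callback. This assumes the final API shape, in particular that `MetricsContext#contextLabels()` returns the label map; it is an illustration, not code from this PR.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;

// Illustrative reporter that records the context labels it is handed.
public class ContextLoggingReporter implements MetricsReporter {
    private Map<String, String> labels = Collections.emptyMap();

    @Override
    public void contextChange(MetricsContext metricsContext) {
        // Labels such as the namespace or cluster id supplied by the runtime.
        labels = metricsContext.contextLabels();
        System.out.println("metrics context labels: " + labels);
    }

    @Override
    public void init(List<KafkaMetric> metrics) { }

    @Override
    public void metricChange(KafkaMetric metric) { }

    @Override
    public void metricRemoval(KafkaMetric metric) { }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}
```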





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-27 Thread GitBox


feyman2016 commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r431266629



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -3623,22 +3641,26 @@ public RemoveMembersFromConsumerGroupResult 
removeMembersFromConsumerGroup(Strin
 ConsumerGroupOperationContext, 
RemoveMembersFromConsumerGroupOptions> context =
 new ConsumerGroupOperationContext<>(groupId, options, deadline, 
future);
 
-Call findCoordinatorCall = getFindCoordinatorCall(context,
-() -> getRemoveMembersFromGroupCall(context));
+List members;
+if (options.removeAll()) {
+members = getMembersFromGroup(groupId);
+} else {
+members = 
options.members().stream().map(MemberToRemove::toMemberIdentity).collect(Collectors.toList());
+}
+Call findCoordinatorCall = getFindCoordinatorCall(context, () -> 
getRemoveMembersFromGroupCall(context, members));
 runnable.call(findCoordinatorCall, startFindCoordinatorMs);
 
 return new RemoveMembersFromConsumerGroupResult(future, 
options.members());

Review comment:
   --- If option.members() is empty, it implies that we do a removeAll()
   => Yes, that is correct.
   
   --- hence, should we pass in members into the RemoveMembersFromConsumerGroupResult 
instead of options.members()
   => The `members` list is of type `List<MemberIdentity>`, and `MemberIdentity` 
contains the field `memberId`, which supports removing dynamic members, while 
`options.members()` is of type `Set<MemberToRemove>`, and `MemberToRemove` only 
supports specifying static members for removal. In 
`RemoveMembersFromConsumerGroupResult` we follow the same convention as in 
`RemoveMembersFromConsumerGroupOptions`: an empty `members` implies `removeAll`.
   We handle it this way because in the non-`removeAll` scenario we only remove 
static members, while in the `removeAll` scenario we may remove both static and 
dynamic members.
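
   For context, a hedged sketch of how the two modes look from the caller's side under KIP-571 (the group id, bootstrap server, and `group.instance.id` below are placeholders; the no-argument options constructor triggering `removeAll` is the behavior under review in this PR):

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.MemberToRemove;
import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;

public class RemoveMembersSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // Remove one static member, identified by its group.instance.id.
            admin.removeMembersFromConsumerGroup("my-group",
                new RemoveMembersFromConsumerGroupOptions(
                    Collections.singleton(new MemberToRemove("instance-1"))))
                .all().get();

            // removeAll mode: no members supplied, so every member of the
            // group (static and dynamic) is removed.
            admin.removeMembersFromConsumerGroup("my-group",
                new RemoveMembersFromConsumerGroupOptions())
                .all().get();
        }
    }
}
```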





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-4793) Kafka Connect: POST /connectors/(string: name)/restart doesn't start failed tasks

2020-05-27 Thread Jun Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17117896#comment-17117896
 ] 

Jun Wang commented on KAFKA-4793:
-

{{Look at the pause endpoint: it pauses its tasks.}}

{noformat}
PUT /connectors/{name}/pause - pause the connector and its tasks, which stops 
message processing until the connector is resumed 
{noformat}
The existing restart endpoint should restart the tasks as well, to be consistent 
with the pause endpoint.

Suggest fixing the existing restart endpoint instead of creating a new endpoint, 
so that a KIP can be avoided.

{noformat}
POST /connectors/{name}/restart - restart a connector (typically because it has 
failed)
{noformat}
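
A hedged illustration of exercising those two REST endpoints from Java (the worker URL and connector name are placeholders, not taken from this ticket):

{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ConnectorLifecycleSketch {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        String base = "http://localhost:8083/connectors/my-connector";

        // Pause the connector and its tasks.
        HttpRequest pause = HttpRequest.newBuilder(URI.create(base + "/pause"))
                .PUT(HttpRequest.BodyPublishers.noBody()).build();
        System.out.println(client.send(pause, HttpResponse.BodyHandlers.ofString()).statusCode());

        // Restart the connector; whether failed tasks restart too is what this ticket discusses.
        HttpRequest restart = HttpRequest.newBuilder(URI.create(base + "/restart"))
                .POST(HttpRequest.BodyPublishers.noBody()).build();
        System.out.println(client.send(restart, HttpResponse.BodyHandlers.ofString()).statusCode());
    }
}
{code}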
 

> Kafka Connect: POST /connectors/(string: name)/restart doesn't start failed 
> tasks
> -
>
> Key: KAFKA-4793
> URL: https://issues.apache.org/jira/browse/KAFKA-4793
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Gwen Shapira
>Priority: Major
>  Labels: needs-kip
>
> Sometimes tasks stop due to repeated failures. Users will want to restart the 
> connector and have it retry after fixing an issue. 
> We expected "POST /connectors/(string: name)/restart" to cause retry of 
> failed tasks, but this doesn't appear to be the case.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9878) Block AddPartitionsToTxn call until the txn markers are committed

2020-05-27 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen reassigned KAFKA-9878:
--

Assignee: HaiyuanZhao

> Block AddPartitionsToTxn call until the txn markers are committed
> -
>
> Key: KAFKA-9878
> URL: https://issues.apache.org/jira/browse/KAFKA-9878
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, producer 
>Reporter: Boyang Chen
>Assignee: HaiyuanZhao
>Priority: Major
>
> Currently the EndTxn call from the producer returns as soon as the control 
> record is written to the txn coordinator log. The ongoing transaction then 
> moves to a pending state while it waits for all txn markers to be propagated. 
> In the meantime, the producer client starts another transaction but is 
> rejected repeatedly until the pending state is resolved, which means 
> unnecessary round trips and extra burden on the broker from handling 
> repetitive requests.
> To avoid this situation, we should make the producer client wait for txn 
> marker completion instead. This should yield better performance overall, as 
> no back-off needs to be triggered before a subsequent transaction can begin.
> In addition, we could batch-complete the AddPartitionsToTxn results if more 
> than one request is buffered in the queue.
> The third change is on the client side: maintain the futures of the 
> AddPartitionsToTxn calls so that more in-flight changes can be made as needed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #8709: KAFKA-9952; Remove immediate fetch completion logic on high watermark updates

2020-05-27 Thread GitBox


hachikuji commented on a change in pull request #8709:
URL: https://github.com/apache/kafka/pull/8709#discussion_r431288110



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -109,9 +109,19 @@ case class LogReadResult(info: FetchDataInfo,
   def withEmptyFetchInfo: LogReadResult =
 copy(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, 
MemoryRecords.EMPTY))
 
-  override def toString =
-s"Fetch Data: [$info], HW: [$highWatermark], leaderLogStartOffset: 
[$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " +
-s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: 
[$fetchTimeMs], readSize: [$readSize], lastStableOffset: [$lastStableOffset], 
error: [$error]"
+  override def toString = {
+"LogReadResult(" +
+  s"info=$info, " +
+  s"highWatermark=$highWatermark, " +
+  s"leaderLogStartOffset=$leaderLogStartOffset, " +
+  s"leaderLogEndOffset=$leaderLogEndOffset, " +
+  s"followerLogStartOffset=$followerLogStartOffset, " +
+  s"fetchTimeMs=$fetchTimeMs, " +
+  s"preferredReadReplica=$preferredReadReplica, " +
+  s"lastStableOffset=$lastStableOffset, " +
+  s"error=$error" +
+  ")"

Review comment:
   Ok, I added something like that.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10049) KTable-KTable Foreign Key join throwing Serialization Exception

2020-05-27 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10049:

Priority: Major  (was: Blocker)

> KTable-KTable Foreign Key join throwing Serialization Exception 
> 
>
> Key: KAFKA-10049
> URL: https://issues.apache.org/jira/browse/KAFKA-10049
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Amit Chauhan
>Priority: Major
>
>  I want to make use of _KTable-KTable_ Foreign Key join feature released in 
> *_2.5.0_* but facing issue while running the code. 
> {code:java}
>  
>  public static void main(String[] args) {
>  Properties props = new Properties();
>  props.put(StreamsConfig.APPLICATION_ID_CONFIG, 
> "my-stream-processing-application-2");
>  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
> Serdes.String().getClass());
>  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new 
> JSONSerdeComp<>().getClass());
>  props.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp");
>  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>  StreamsBuilder builder = new StreamsBuilder();
>  KTable ordersTable = builder. OrderObject>table(TOPIC_Agora);
>  KTable stockTable = builder. StockMarketData>table(TOPIC_Stock_Data);
>  KTable enriched = 
> ordersTable.leftJoin(stockTable, OrderObject:: getSymbol, new 
> ValueJoiner() {
> @Override
> public EnrichedOrder apply(OrderObject order, StockMarketData 
> stock) {
> EnrichedOrder enOrder = EnrichedOrder.builder()
> .orderId(order.getOrderId())
> .execPrice(order.getPrice())
> .symbol(order.getSymbol())
> .quanity(order.getQuanity())
> .side(order.getSide())
> .filledQty(order.getFilledQty())
> .leaveQty(order.getLeaveQty())
> .index(order.getIndex())
> .vWaprelative(order.getVWaprelative())
> 
> .stockAsk(stock!=null?stock.getAsk().doubleValue():0.0)
> 
> .stockBid(stock!=null?stock.getBid().doubleValue():0.0)
> 
> .stockLast(stock!=null?stock.getLast().doubleValue():0.0)
> 
> .stockClose(stock!=null?stock.getClose().doubleValue():0.0)
> .build();
> return enOrder;
> }
> } , Materialized.with(Serdes.String(), new JSONSerdeComp<>()));
>  enriched.toStream().foreach(new ForeachAction() \{
>  @Override
> public void apply(String arg0, EnrichedOrder arg1) {
>  logger.info(String.format("key = %s, value = %s", arg0, arg1));
> }
> });
>  KafkaStreams streams = new KafkaStreams(builder.build(), props);
>  streams.start();
>  Runtime.getRuntime().addShutdownHook(new Thread(() -> streams.close()));
> }}}
>  
> 
> org.apache.kafka
> kafka-clients
> 2.5.0
> 
> 
> org.apache.kafka
> kafka-streams
> 2.5.0
> 
> {code}
> *+Exception:+*
> {code:java}
> 18:49:31.525 
> [my-stream-processing-application-2-37cfd34a-6eb4-411a-a2dc-faa9194ce04e-StreamThread-1]
>  ERROR org.apache.kafka.streams.processor.internals.ProcessorStateManager - 
> stream-thread 
> [my-stream-processing-application-2-37cfd34a-6eb4-411a-a2dc-faa9194ce04e-StreamThread-1]
>  task [0_0] Failed to flush state store orders-STATE-STORE-00: 
> org.apache.kafka.streams.errors.StreamsException: ClassCastException 
> while producing data to a sink topic. A serializer (key: 
> org.apache.kafka.common.serialization.StringSerializer / value: 
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer)
>  is not compatible to the actual key or value type (key type: 
> java.lang.String / value type: 
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper).
>  Change the default Serdes in StreamConfig or provide correct Serdes via 
> method parameters (for example if using the DSL, `#to(String topic, 
> Produced produced)` with 
> `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).
> at 
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
>  ~[kafka-streams-2.5.0.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
>  ~[kafka-streams-2.5.0.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.Proce

[GitHub] [kafka] rhauch commented on a change in pull request #8699: KAFKA-9673: Filter and Conditional SMTs

2020-05-27 Thread GitBox


rhauch commented on a change in pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#discussion_r431281159



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java
##
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.transforms.Filter;
+import org.apache.kafka.connect.transforms.predicates.HasHeaderKey;
+import org.apache.kafka.connect.transforms.predicates.RecordIsTombstone;
+import org.apache.kafka.connect.transforms.predicates.TopicNameMatches;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static java.util.Collections.singletonMap;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.PREDICATES_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.TRANSFORMS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * An integration test for connectors with transformations
+ */
+@Category(IntegrationTest.class)
+public class TransformationIntegrationTest {
+
+private static final int NUM_RECORDS_PRODUCED = 2000;
+private static final int NUM_TOPIC_PARTITIONS = 3;
+private static final long RECORD_TRANSFER_DURATION_MS = 
TimeUnit.SECONDS.toMillis(30);
+private static final long OBSERVED_RECORDS_DURATION_MS = 
TimeUnit.SECONDS.toMillis(60);
+private static final int NUM_TASKS = 3;
+private static final int NUM_WORKERS = 3;
+private static final String CONNECTOR_NAME = "simple-conn";
+private static final String SINK_CONNECTOR_CLASS_NAME = 
MonitorableSinkConnector.class.getSimpleName();
+private static final String SOURCE_CONNECTOR_CLASS_NAME = 
MonitorableSourceConnector.class.getSimpleName();
+
+private EmbeddedConnectCluster connect;
+private ConnectorHandle connectorHandle;
+
+@Before
+public void setup() {
+// setup Connect worker properties
+Map exampleWorkerProps = new HashMap<>();
+exampleWorkerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, 
String.valueOf(5_000));
+
+// setup Kafka broker properties
+Properties exampleBrokerProps = new Properties();
+exampleBrokerProps.put("auto.create.topics.enable", "false");
+
+// build a Connect cluster backed by Kafka and Zk
+connect = new EmbeddedConnectCluster.Builder()
+.name("connect-cluster")
+.numWorkers(NUM_WORKERS)
+.numBrokers(1)
+.workerProps(exampleWorkerProps)
+.brokerProps(exampleBrokerProps)
+.build();
+
+// start the clusters
+connect.start();

Review comment:
   Should we wait until all brokers and Connect workers are available, via 
something like:
   ```
   connect.assertions().assertExactlyNumBrokersAreUp(numBrokers, 
"Brokers did not start in time.");
   connect.assertions().assertExactlyNumWorkersAreUp(numWorkers, 
"Worker did not start in time.");
   ```

##
File path: 
connect/api/src/main/java/org/apache/kafka/connect/transforms/predi

[jira] [Updated] (KAFKA-10049) KTable-KTable Foreign Key join throwing Serialization Exception

2020-05-27 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10049:
-
Affects Version/s: 2.6.0

> KTable-KTable Foreign Key join throwing Serialization Exception 
> 
>
> Key: KAFKA-10049
> URL: https://issues.apache.org/jira/browse/KAFKA-10049
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0, 2.6.0
>Reporter: Amit Chauhan
>Priority: Blocker
>
>  I want to make use of _KTable-KTable_ Foreign Key join feature released in 
> *_2.5.0_* but facing issue while running the code. 
> {code:java}
>  
>  public static void main(String[] args) {
>  Properties props = new Properties();
>  props.put(StreamsConfig.APPLICATION_ID_CONFIG, 
> "my-stream-processing-application-2");
>  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
> Serdes.String().getClass());
>  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new 
> JSONSerdeComp<>().getClass());
>  props.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp");
>  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>  StreamsBuilder builder = new StreamsBuilder();
>  KTable ordersTable = builder. OrderObject>table(TOPIC_Agora);
>  KTable stockTable = builder. StockMarketData>table(TOPIC_Stock_Data);
>  KTable enriched = 
> ordersTable.leftJoin(stockTable, OrderObject:: getSymbol, new 
> ValueJoiner() {
> @Override
> public EnrichedOrder apply(OrderObject order, StockMarketData 
> stock) {
> EnrichedOrder enOrder = EnrichedOrder.builder()
> .orderId(order.getOrderId())
> .execPrice(order.getPrice())
> .symbol(order.getSymbol())
> .quanity(order.getQuanity())
> .side(order.getSide())
> .filledQty(order.getFilledQty())
> .leaveQty(order.getLeaveQty())
> .index(order.getIndex())
> .vWaprelative(order.getVWaprelative())
> 
> .stockAsk(stock!=null?stock.getAsk().doubleValue():0.0)
> 
> .stockBid(stock!=null?stock.getBid().doubleValue():0.0)
> 
> .stockLast(stock!=null?stock.getLast().doubleValue():0.0)
> 
> .stockClose(stock!=null?stock.getClose().doubleValue():0.0)
> .build();
> return enOrder;
> }
> } , Materialized.with(Serdes.String(), new JSONSerdeComp<>()));
>  enriched.toStream().foreach(new ForeachAction() \{
>  @Override
> public void apply(String arg0, EnrichedOrder arg1) {
>  logger.info(String.format("key = %s, value = %s", arg0, arg1));
> }
> });
>  KafkaStreams streams = new KafkaStreams(builder.build(), props);
>  streams.start();
>  Runtime.getRuntime().addShutdownHook(new Thread(() -> streams.close()));
> }}}
>  
> 
> org.apache.kafka
> kafka-clients
> 2.5.0
> 
> 
> org.apache.kafka
> kafka-streams
> 2.5.0
> 
> {code}
> *+Exception:+*
> {code:java}
> 18:49:31.525 
> [my-stream-processing-application-2-37cfd34a-6eb4-411a-a2dc-faa9194ce04e-StreamThread-1]
>  ERROR org.apache.kafka.streams.processor.internals.ProcessorStateManager - 
> stream-thread 
> [my-stream-processing-application-2-37cfd34a-6eb4-411a-a2dc-faa9194ce04e-StreamThread-1]
>  task [0_0] Failed to flush state store orders-STATE-STORE-00: 
> org.apache.kafka.streams.errors.StreamsException: ClassCastException 
> while producing data to a sink topic. A serializer (key: 
> org.apache.kafka.common.serialization.StringSerializer / value: 
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer)
>  is not compatible to the actual key or value type (key type: 
> java.lang.String / value type: 
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper).
>  Change the default Serdes in StreamConfig or provide correct Serdes via 
> method parameters (for example if using the DSL, `#to(String topic, 
> Produced produced)` with 
> `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).
> at 
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
>  ~[kafka-streams-2.5.0.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
>  ~[kafka-streams-2.5.0.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.Processor

[jira] [Updated] (KAFKA-10049) KTable-KTable Foreign Key join throwing Serialization Exception

2020-05-27 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10049:
-
Priority: Blocker  (was: Major)

> KTable-KTable Foreign Key join throwing Serialization Exception 
> 
>
> Key: KAFKA-10049
> URL: https://issues.apache.org/jira/browse/KAFKA-10049
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Amit Chauhan
>Priority: Blocker
>
>  I want to make use of _KTable-KTable_ Foreign Key join feature released in 
> *_2.5.0_* but facing issue while running the code. 
> {code:java}
>  
>  public static void main(String[] args) {
>  Properties props = new Properties();
>  props.put(StreamsConfig.APPLICATION_ID_CONFIG, 
> "my-stream-processing-application-2");
>  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
> Serdes.String().getClass());
>  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new 
> JSONSerdeComp<>().getClass());
>  props.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp");
>  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>  StreamsBuilder builder = new StreamsBuilder();
>  KTable ordersTable = builder. OrderObject>table(TOPIC_Agora);
>  KTable stockTable = builder. StockMarketData>table(TOPIC_Stock_Data);
>  KTable enriched = 
> ordersTable.leftJoin(stockTable, OrderObject:: getSymbol, new 
> ValueJoiner() {
> @Override
> public EnrichedOrder apply(OrderObject order, StockMarketData 
> stock) {
> EnrichedOrder enOrder = EnrichedOrder.builder()
> .orderId(order.getOrderId())
> .execPrice(order.getPrice())
> .symbol(order.getSymbol())
> .quanity(order.getQuanity())
> .side(order.getSide())
> .filledQty(order.getFilledQty())
> .leaveQty(order.getLeaveQty())
> .index(order.getIndex())
> .vWaprelative(order.getVWaprelative())
> 
> .stockAsk(stock!=null?stock.getAsk().doubleValue():0.0)
> 
> .stockBid(stock!=null?stock.getBid().doubleValue():0.0)
> 
> .stockLast(stock!=null?stock.getLast().doubleValue():0.0)
> 
> .stockClose(stock!=null?stock.getClose().doubleValue():0.0)
> .build();
> return enOrder;
> }
> } , Materialized.with(Serdes.String(), new JSONSerdeComp<>()));
>  enriched.toStream().foreach(new ForeachAction() \{
>  @Override
> public void apply(String arg0, EnrichedOrder arg1) {
>  logger.info(String.format("key = %s, value = %s", arg0, arg1));
> }
> });
>  KafkaStreams streams = new KafkaStreams(builder.build(), props);
>  streams.start();
>  Runtime.getRuntime().addShutdownHook(new Thread(() -> streams.close()));
> }}}
>  
> 
> org.apache.kafka
> kafka-clients
> 2.5.0
> 
> 
> org.apache.kafka
> kafka-streams
> 2.5.0
> 
> {code}
> *+Exception:+*
> {code:java}
> 18:49:31.525 
> [my-stream-processing-application-2-37cfd34a-6eb4-411a-a2dc-faa9194ce04e-StreamThread-1]
>  ERROR org.apache.kafka.streams.processor.internals.ProcessorStateManager - 
> stream-thread 
> [my-stream-processing-application-2-37cfd34a-6eb4-411a-a2dc-faa9194ce04e-StreamThread-1]
>  task [0_0] Failed to flush state store orders-STATE-STORE-00: 
> org.apache.kafka.streams.errors.StreamsException: ClassCastException 
> while producing data to a sink topic. A serializer (key: 
> org.apache.kafka.common.serialization.StringSerializer / value: 
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer)
>  is not compatible to the actual key or value type (key type: 
> java.lang.String / value type: 
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper).
>  Change the default Serdes in StreamConfig or provide correct Serdes via 
> method parameters (for example if using the DSL, `#to(String topic, 
> Produced produced)` with 
> `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).
> at 
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
>  ~[kafka-streams-2.5.0.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
>  ~[kafka-streams-2.5.0.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.Processor

[jira] [Commented] (KAFKA-10049) KTable-KTable Foreign Key join throwing Serialization Exception

2020-05-27 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17117922#comment-17117922
 ] 

John Roesler commented on KAFKA-10049:
--

I've marked this as a 2.6 blocker. It's not immediately clear to me what the 
root cause might be.

> KTable-KTable Foreign Key join throwing Serialization Exception 
> 
>
> Key: KAFKA-10049
> URL: https://issues.apache.org/jira/browse/KAFKA-10049
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0, 2.6.0
>Reporter: Amit Chauhan
>Priority: Blocker
>
>  I want to make use of _KTable-KTable_ Foreign Key join feature released in 
> *_2.5.0_* but facing issue while running the code. 
> {code:java}
>  
>  public static void main(String[] args) {
>  Properties props = new Properties();
>  props.put(StreamsConfig.APPLICATION_ID_CONFIG, 
> "my-stream-processing-application-2");
>  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
> Serdes.String().getClass());
>  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new 
> JSONSerdeComp<>().getClass());
>  props.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp");
>  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>  StreamsBuilder builder = new StreamsBuilder();
>  KTable ordersTable = builder. OrderObject>table(TOPIC_Agora);
>  KTable stockTable = builder. StockMarketData>table(TOPIC_Stock_Data);
>  KTable enriched = 
> ordersTable.leftJoin(stockTable, OrderObject:: getSymbol, new 
> ValueJoiner() {
> @Override
> public EnrichedOrder apply(OrderObject order, StockMarketData 
> stock) {
> EnrichedOrder enOrder = EnrichedOrder.builder()
> .orderId(order.getOrderId())
> .execPrice(order.getPrice())
> .symbol(order.getSymbol())
> .quanity(order.getQuanity())
> .side(order.getSide())
> .filledQty(order.getFilledQty())
> .leaveQty(order.getLeaveQty())
> .index(order.getIndex())
> .vWaprelative(order.getVWaprelative())
> 
> .stockAsk(stock!=null?stock.getAsk().doubleValue():0.0)
> 
> .stockBid(stock!=null?stock.getBid().doubleValue():0.0)
> 
> .stockLast(stock!=null?stock.getLast().doubleValue():0.0)
> 
> .stockClose(stock!=null?stock.getClose().doubleValue():0.0)
> .build();
> return enOrder;
> }
> } , Materialized.with(Serdes.String(), new JSONSerdeComp<>()));
>  enriched.toStream().foreach(new ForeachAction() \{
>  @Override
> public void apply(String arg0, EnrichedOrder arg1) {
>  logger.info(String.format("key = %s, value = %s", arg0, arg1));
> }
> });
>  KafkaStreams streams = new KafkaStreams(builder.build(), props);
>  streams.start();
>  Runtime.getRuntime().addShutdownHook(new Thread(() -> streams.close()));
> }}}
>  
> 
> org.apache.kafka
> kafka-clients
> 2.5.0
> 
> 
> org.apache.kafka
> kafka-streams
> 2.5.0
> 
> {code}
> *+Exception:+*
> {code:java}
> 18:49:31.525 
> [my-stream-processing-application-2-37cfd34a-6eb4-411a-a2dc-faa9194ce04e-StreamThread-1]
>  ERROR org.apache.kafka.streams.processor.internals.ProcessorStateManager - 
> stream-thread 
> [my-stream-processing-application-2-37cfd34a-6eb4-411a-a2dc-faa9194ce04e-StreamThread-1]
>  task [0_0] Failed to flush state store orders-STATE-STORE-00: 
> org.apache.kafka.streams.errors.StreamsException: ClassCastException 
> while producing data to a sink topic. A serializer (key: 
> org.apache.kafka.common.serialization.StringSerializer / value: 
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer)
>  is not compatible to the actual key or value type (key type: 
> java.lang.String / value type: 
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper).
>  Change the default Serdes in StreamConfig or provide correct Serdes via 
> method parameters (for example if using the DSL, `#to(String topic, 
> Produced produced)` with 
> `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).
> at 
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
>  ~[kafka-streams-2.5.0.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(Proce

[GitHub] [kafka] ijuma commented on a change in pull request #8709: KAFKA-9952; Remove immediate fetch completion logic on high watermark updates

2020-05-27 Thread GitBox


ijuma commented on a change in pull request #8709:
URL: https://github.com/apache/kafka/pull/8709#discussion_r431300357



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -109,9 +109,19 @@ case class LogReadResult(info: FetchDataInfo,
   def withEmptyFetchInfo: LogReadResult =
 copy(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, 
MemoryRecords.EMPTY))
 
-  override def toString =
-s"Fetch Data: [$info], HW: [$highWatermark], leaderLogStartOffset: 
[$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " +
-s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: 
[$fetchTimeMs], readSize: [$readSize], lastStableOffset: [$lastStableOffset], 
error: [$error]"
+  override def toString = {
+"LogReadResult(" +
+  s"info=$info, " +
+  s"highWatermark=$highWatermark, " +
+  s"leaderLogStartOffset=$leaderLogStartOffset, " +
+  s"leaderLogEndOffset=$leaderLogEndOffset, " +
+  s"followerLogStartOffset=$followerLogStartOffset, " +
+  s"fetchTimeMs=$fetchTimeMs, " +
+  s"preferredReadReplica=$preferredReadReplica, " +
+  s"lastStableOffset=$lastStableOffset, " +
+  s"error=$error" +
+  ")"

Review comment:
   Sweet!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #8709: KAFKA-9952; Remove immediate fetch completion logic on high watermark updates

2020-05-27 Thread GitBox


ijuma commented on pull request #8709:
URL: https://github.com/apache/kafka/pull/8709#issuecomment-634803238


   Argh:
   
   > 09:53:20 [Error] 
/home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.12/core/src/main/scala/kafka/utils/CoreUtils.scala:326:
 value productElementName is not a member of Product
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #8709: KAFKA-9952; Remove immediate fetch completion logic on high watermark updates

2020-05-27 Thread GitBox


ijuma commented on pull request #8709:
URL: https://github.com/apache/kafka/pull/8709#issuecomment-634803985


   Looks like `productToString` only works with Scala 2.13. :(



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-27 Thread GitBox


feyman2016 commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r431302352



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResult.java
##
@@ -51,9 +52,21 @@
 if (throwable != null) {
 result.completeExceptionally(throwable);
 } else {
-for (MemberToRemove memberToRemove : memberInfos) {
-if (maybeCompleteExceptionally(memberErrors, 
memberToRemove.toMemberIdentity(), result)) {
-return;
+if (removeAll()) {

Review comment:
   Because in the non-`removeAll` scenario we put the members to be 
deleted in `RemoveMembersFromConsumerGroupResult#memberInfos`, while in the 
`removeAll` scenario we don't (the members to be deleted are determined in the 
private method `KafkaAdminClient#getMembersFromGroup`). 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on a change in pull request #8699: KAFKA-9673: Filter and Conditional SMTs

2020-05-27 Thread GitBox


tombentley commented on a change in pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#discussion_r431305276



##
File path: 
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.predicates;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+/**
+ * A predicate which is true for records with at least one header with the 
configured name.
+ * @param <R> The type of connect record.
+ */
+public class HasHeaderKey<R extends ConnectRecord<R>> implements Predicate<R> {
+
+private static final String NAME_CONFIG = "name";
+private static final ConfigDef CONFIG_DEF = new 
ConfigDef().define(NAME_CONFIG, ConfigDef.Type.STRING, null,

Review comment:
   Ah, thanks, I'd not realised that was the point of 
`ConfigDef.NO_DEFAULT_VALUE`. 
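
   For context, a minimal sketch of the difference (the property name and doc string are illustrative): with `ConfigDef.NO_DEFAULT_VALUE` the property becomes mandatory, so a missing value fails validation instead of silently falling back to `null`.

```java
import java.util.Collections;

import org.apache.kafka.common.config.ConfigDef;

public class RequiredConfigSketch {
    static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define("name", ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE,
                new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM,
                "The header key that must be present on matching records.");

    public static void main(String[] args) {
        // Throws ConfigException because "name" has no default and was not supplied.
        CONFIG_DEF.parse(Collections.emptyMap());
    }
}
```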





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-27 Thread GitBox


feyman2016 commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r431305795



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResult.java
##
@@ -66,6 +79,9 @@
  * Returns the selected member future.
  */
 public KafkaFuture memberResult(MemberToRemove member) {
+if (removeAll()) {
+throw new IllegalArgumentException("The method: memberResult is 
not applicable in 'removeAll' mode");

Review comment:
   Since in the `removeAll` scenario we don't save the members to be 
deleted in `RemoveMembersFromConsumerGroupResult`, I think calling 
`memberResult` isn't applicable there.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-10049) KTable-KTable Foreign Key join throwing Serialization Exception

2020-05-27 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-10049:
---

Assignee: John Roesler

> KTable-KTable Foreign Key join throwing Serialization Exception 
> 
>
> Key: KAFKA-10049
> URL: https://issues.apache.org/jira/browse/KAFKA-10049
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0, 2.6.0
>Reporter: Amit Chauhan
>Assignee: John Roesler
>Priority: Blocker
>
>  I want to make use of _KTable-KTable_ Foreign Key join feature released in 
> *_2.5.0_* but facing issue while running the code. 
> {code:java}
>  
>  public static void main(String[] args) {
>  Properties props = new Properties();
>  props.put(StreamsConfig.APPLICATION_ID_CONFIG, 
> "my-stream-processing-application-2");
>  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
> Serdes.String().getClass());
>  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new 
> JSONSerdeComp<>().getClass());
>  props.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp");
>  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>  StreamsBuilder builder = new StreamsBuilder();
>  KTable ordersTable = builder. OrderObject>table(TOPIC_Agora);
>  KTable stockTable = builder. StockMarketData>table(TOPIC_Stock_Data);
>  KTable enriched = 
> ordersTable.leftJoin(stockTable, OrderObject:: getSymbol, new 
> ValueJoiner() {
> @Override
> public EnrichedOrder apply(OrderObject order, StockMarketData 
> stock) {
> EnrichedOrder enOrder = EnrichedOrder.builder()
> .orderId(order.getOrderId())
> .execPrice(order.getPrice())
> .symbol(order.getSymbol())
> .quanity(order.getQuanity())
> .side(order.getSide())
> .filledQty(order.getFilledQty())
> .leaveQty(order.getLeaveQty())
> .index(order.getIndex())
> .vWaprelative(order.getVWaprelative())
> 
> .stockAsk(stock!=null?stock.getAsk().doubleValue():0.0)
> 
> .stockBid(stock!=null?stock.getBid().doubleValue():0.0)
> 
> .stockLast(stock!=null?stock.getLast().doubleValue():0.0)
> 
> .stockClose(stock!=null?stock.getClose().doubleValue():0.0)
> .build();
> return enOrder;
> }
> } , Materialized.with(Serdes.String(), new JSONSerdeComp<>()));
>  enriched.toStream().foreach(new ForeachAction() \{
>  @Override
> public void apply(String arg0, EnrichedOrder arg1) {
>  logger.info(String.format("key = %s, value = %s", arg0, arg1));
> }
> });
>  KafkaStreams streams = new KafkaStreams(builder.build(), props);
>  streams.start();
>  Runtime.getRuntime().addShutdownHook(new Thread(() -> streams.close()));
> }}}
>  
> 
> org.apache.kafka
> kafka-clients
> 2.5.0
> 
> 
> org.apache.kafka
> kafka-streams
> 2.5.0
> 
> {code}
> *+Exception:+*
> {code:java}
> 18:49:31.525 
> [my-stream-processing-application-2-37cfd34a-6eb4-411a-a2dc-faa9194ce04e-StreamThread-1]
>  ERROR org.apache.kafka.streams.processor.internals.ProcessorStateManager - 
> stream-thread 
> [my-stream-processing-application-2-37cfd34a-6eb4-411a-a2dc-faa9194ce04e-StreamThread-1]
>  task [0_0] Failed to flush state store orders-STATE-STORE-00: 
> org.apache.kafka.streams.errors.StreamsException: ClassCastException 
> while producing data to a sink topic. A serializer (key: 
> org.apache.kafka.common.serialization.StringSerializer / value: 
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer)
>  is not compatible to the actual key or value type (key type: 
> java.lang.String / value type: 
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper).
>  Change the default Serdes in StreamConfig or provide correct Serdes via 
> method parameters (for example if using the DSL, `#to(String topic, 
> Produced produced)` with 
> `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).
> at 
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
>  ~[kafka-streams-2.5.0.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
>  ~[kafka-streams-2.5.0.jar:?]
> at 
> org.apa

[jira] [Created] (KAFKA-10052) Flaky Test InternalTopicsIntegrationTest.testCreateInternalTopicsWithFewerReplicasThanBrokers

2020-05-27 Thread Sophie Blee-Goldman (Jira)
Sophie Blee-Goldman created KAFKA-10052:
---

 Summary: Flaky Test 
InternalTopicsIntegrationTest.testCreateInternalTopicsWithFewerReplicasThanBrokers
 Key: KAFKA-10052
 URL: https://issues.apache.org/jira/browse/KAFKA-10052
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Sophie Blee-Goldman
 Fix For: 2.6.0


h3. Stacktrace

 
{code:java}
java.lang.NullPointerException
  at 
org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.assertTopicSettings(EmbeddedConnectClusterAssertions.java:207)
 
  at 
org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.assertInternalTopicSettings(InternalTopicsIntegrationTest.java:148)
 
  at
org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.testCreateInternalTopicsWithFewerReplicasThanBrokers(InternalTopicsIntegrationTest.java:118){code}
 

 

[https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6539/testReport/junit/org.apache.kafka.connect.integration/InternalTopicsIntegrationTest/testCreateInternalTopicsWithFewerReplicasThanBrokers/]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-10047) Unnecessary widening of (int to long) scope in FloatSerializer

2020-05-27 Thread Guru Tahasildar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guru Tahasildar reassigned KAFKA-10047:
---

Assignee: Guru Tahasildar

> Unnecessary widening of (int to long) scope in FloatSerializer
> --
>
> Key: KAFKA-10047
> URL: https://issues.apache.org/jira/browse/KAFKA-10047
> Project: Kafka
>  Issue Type: Task
>  Components: clients
>Reporter: Guru Tahasildar
>Assignee: Guru Tahasildar
>Priority: Trivial
>
> The following code is present in FloatSerializer:
> {code}
> long bits = Float.floatToRawIntBits(data);
> return new byte[] {
> (byte) (bits >>> 24),
> (byte) (bits >>> 16),
> (byte) (bits >>> 8),
> (byte) bits
> };
> {code}
> {{Float.floatToRawIntBits()}} returns an {{int}}, but the result is assigned 
> to a {{long}}, so there is an unnecessary widening. The wider type is not needed 
> for any subsequent operation, hence the code can be changed to use {{int}}.
> I would like to volunteer to make this change.
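
A sketch of what the narrowed version could look like (illustrative only; the class name is hypothetical and the actual patch may differ):

{code:java}
import org.apache.kafka.common.serialization.Serializer;

public class NarrowedFloatSerializer implements Serializer<Float> {
    @Override
    public byte[] serialize(String topic, Float data) {
        if (data == null)
            return null;

        // floatToRawIntBits already returns an int, so no long is needed.
        int bits = Float.floatToRawIntBits(data);
        return new byte[] {
            (byte) (bits >>> 24),
            (byte) (bits >>> 16),
            (byte) (bits >>> 8),
            (byte) bits
        };
    }
}
{code}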



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] tombentley commented on a change in pull request #8699: KAFKA-9673: Filter and Conditional SMTs

2020-05-27 Thread GitBox


tombentley commented on a change in pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#discussion_r431308724



##
File path: 
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatches.java
##
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.predicates;
+
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.util.RegexValidator;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+/**
+ * A predicate which is true for records with a topic name that matches the 
configured regular expression.
+ * @param <R> The type of connect record.
+ */
+public class TopicNameMatches<R extends ConnectRecord<R>> implements Predicate<R> {
+
+private static final String PATTERN_CONFIG = "pattern";
+private static final ConfigDef CONFIG_DEF = new 
ConfigDef().define(PATTERN_CONFIG, ConfigDef.Type.STRING, ".*",
+new RegexValidator(), ConfigDef.Importance.MEDIUM,
+"A Java regular expression for matching against the name of a 
record's topic.");

Review comment:
   I changed it to `.*` only when I realised that the default had to be 
valid and before I knew about `NO_DEFAULT_VALUE`, so using `NO_DEFAULT_VALUE` 
is good. Thanks!
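
   For readers unfamiliar with the KIP-585 wiring this PR implements, a hedged sketch of attaching such a predicate to a `Filter` transformation in connector configuration (the aliases, topic pattern, and property values are illustrative):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PredicateWiringSketch {
    public static void main(String[] args) {
        // Drop records from topics whose name matches foo-.*
        Map<String, String> props = new LinkedHashMap<>();
        props.put("transforms", "dropFoo");
        props.put("transforms.dropFoo.type", "org.apache.kafka.connect.transforms.Filter");
        props.put("transforms.dropFoo.predicate", "isFoo");
        props.put("predicates", "isFoo");
        props.put("predicates.isFoo.type",
                  "org.apache.kafka.connect.transforms.predicates.TopicNameMatches");
        props.put("predicates.isFoo.pattern", "foo-.*");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```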





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-27 Thread GitBox


feyman2016 commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r431312462



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -3623,22 +3641,26 @@ public RemoveMembersFromConsumerGroupResult 
removeMembersFromConsumerGroup(Strin
 ConsumerGroupOperationContext, 
RemoveMembersFromConsumerGroupOptions> context =
 new ConsumerGroupOperationContext<>(groupId, options, deadline, 
future);
 
-Call findCoordinatorCall = getFindCoordinatorCall(context,
-() -> getRemoveMembersFromGroupCall(context));
+List members;
+if (options.removeAll()) {
+members = getMembersFromGroup(groupId);
+} else {
+members = 
options.members().stream().map(MemberToRemove::toMemberIdentity).collect(Collectors.toList());
+}
+Call findCoordinatorCall = getFindCoordinatorCall(context, () -> 
getRemoveMembersFromGroupCall(context, members));
 runnable.call(findCoordinatorCall, startFindCoordinatorMs);
 
 return new RemoveMembersFromConsumerGroupResult(future, 
options.members());
 }
 
-private Call 
getRemoveMembersFromGroupCall(ConsumerGroupOperationContext, RemoveMembersFromConsumerGroupOptions> context) {
+private Call 
getRemoveMembersFromGroupCall(ConsumerGroupOperationContext,
+RemoveMembersFromConsumerGroupOptions> context, 
List members) {

Review comment:
   Fixed





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10049) KTable-KTable Foreign Key join throwing Serialization Exception

2020-05-27 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17117933#comment-17117933
 ] 

Matthias J. Sax commented on KAFKA-10049:
-

Assigned the ticket to [~vvcephei] for now. Not sure if [~abellemare] would be 
interested to pick it up? Think we should try to fix it for 2.6.0 if possible 
(and 2.5.1). We have like 2 weeks till code freeze.

> KTable-KTable Foreign Key join throwing Serialization Exception 
> 
>
> Key: KAFKA-10049
> URL: https://issues.apache.org/jira/browse/KAFKA-10049
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0, 2.6.0
>Reporter: Amit Chauhan
>Assignee: John Roesler
>Priority: Blocker
>
>  I want to make use of _KTable-KTable_ Foreign Key join feature released in 
> *_2.5.0_* but facing issue while running the code. 
> {code:java}
> public static void main(String[] args) {
>     Properties props = new Properties();
>     props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application-2");
>     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>     props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
>     props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new JSONSerdeComp<>().getClass());
>     props.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp");
>     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> 
>     StreamsBuilder builder = new StreamsBuilder();
>     KTable<String, OrderObject> ordersTable = builder.<String, OrderObject>table(TOPIC_Agora);
>     KTable<String, StockMarketData> stockTable = builder.<String, StockMarketData>table(TOPIC_Stock_Data);
> 
>     KTable<String, EnrichedOrder> enriched = ordersTable.leftJoin(stockTable, OrderObject::getSymbol,
>         new ValueJoiner<OrderObject, StockMarketData, EnrichedOrder>() {
>             @Override
>             public EnrichedOrder apply(OrderObject order, StockMarketData stock) {
>                 EnrichedOrder enOrder = EnrichedOrder.builder()
>                     .orderId(order.getOrderId())
>                     .execPrice(order.getPrice())
>                     .symbol(order.getSymbol())
>                     .quanity(order.getQuanity())
>                     .side(order.getSide())
>                     .filledQty(order.getFilledQty())
>                     .leaveQty(order.getLeaveQty())
>                     .index(order.getIndex())
>                     .vWaprelative(order.getVWaprelative())
>                     .stockAsk(stock != null ? stock.getAsk().doubleValue() : 0.0)
>                     .stockBid(stock != null ? stock.getBid().doubleValue() : 0.0)
>                     .stockLast(stock != null ? stock.getLast().doubleValue() : 0.0)
>                     .stockClose(stock != null ? stock.getClose().doubleValue() : 0.0)
>                     .build();
>                 return enOrder;
>             }
>         }, Materialized.with(Serdes.String(), new JSONSerdeComp<>()));
> 
>     enriched.toStream().foreach(new ForeachAction<String, EnrichedOrder>() {
>         @Override
>         public void apply(String arg0, EnrichedOrder arg1) {
>             logger.info(String.format("key = %s, value = %s", arg0, arg1));
>         }
>     });
> 
>     KafkaStreams streams = new KafkaStreams(builder.build(), props);
>     streams.start();
>     Runtime.getRuntime().addShutdownHook(new Thread(() -> streams.close()));
> }
> 
> <dependency>
>     <groupId>org.apache.kafka</groupId>
>     <artifactId>kafka-clients</artifactId>
>     <version>2.5.0</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.kafka</groupId>
>     <artifactId>kafka-streams</artifactId>
>     <version>2.5.0</version>
> </dependency>
> {code}
> *+Exception:+*
> {code:java}
> 18:49:31.525 
> [my-stream-processing-application-2-37cfd34a-6eb4-411a-a2dc-faa9194ce04e-StreamThread-1]
>  ERROR org.apache.kafka.streams.processor.internals.ProcessorStateManager - 
> stream-thread 
> [my-stream-processing-application-2-37cfd34a-6eb4-411a-a2dc-faa9194ce04e-StreamThread-1]
>  task [0_0] Failed to flush state store orders-STATE-STORE-00: 
> org.apache.kafka.streams.errors.StreamsException: ClassCastException 
> while producing data to a sink topic. A serializer (key: 
> org.apache.kafka.common.serialization.StringSerializer / value: 
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer)
>  is not compatible to the actual key or value type (key type: 
> java.lang.String / value type: 
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper).
>  Change the default Serdes in StreamConfig or provide correct Serdes via 
> method parameters (for example if using the DSL, `#to(String topic, 
> Produced produced)` with 
> `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).
> at 
> org.apache.kafka.streams.processor.internals.SinkNode
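
For what it's worth, the generic suggestion in the error message above is to supply Serdes explicitly at the operation instead of relying on the configured defaults. A minimal sketch of that pattern, reusing the reporter's `enriched` table and `JSONSerdeComp` serde from the snippet above (the output topic name is an assumption, and this only illustrates the error message's advice; it is not a confirmed fix for this ticket, since the failing serializer here belongs to Streams' internal foreign-key subscription topic):

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Produced;

// Illustrative only: write the joined table to an output topic with explicit Serdes
// rather than depending on DEFAULT_KEY/VALUE_SERDE_CLASS_CONFIG.
enriched.toStream().to("enriched-orders",
        Produced.with(Serdes.String(), new JSONSerdeComp<EnrichedOrder>()));
{code}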

[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-27 Thread GitBox


feyman2016 commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r431313074



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupOptions.java
##
@@ -37,7 +38,15 @@ public RemoveMembersFromConsumerGroupOptions(Collection<MemberToRemove> members)
 this.members = new HashSet<>(members);

Review comment:
   Makes sense. It now throws an exception if an empty members collection is provided.
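
A minimal sketch of what that constructor validation might look like (the exact exception type and message used in the PR are assumptions here):

{code:java}
public RemoveMembersFromConsumerGroupOptions(Collection<MemberToRemove> members) {
    // Assumed check: an explicitly empty member collection is rejected, so "remove all
    // members" has to be requested through the dedicated remove-all option instead.
    if (members.isEmpty()) {
        throw new IllegalArgumentException("Invalid empty members has been provided");
    }
    this.members = new HashSet<>(members);
}
{code}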





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-27 Thread GitBox


feyman2016 commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r431313236



##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##
@@ -379,6 +380,22 @@ private static MetadataResponse 
prepareMetadataResponse(Cluster cluster, Errors
 MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED);
 }
 
+private static DescribeGroupsResponseData prepareDescribeGroupsResponseData(String groupId, List<String> groupInstances,

Review comment:
   Fixed

##
File path: 
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
##
@@ -1017,47 +1017,70 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
   assertTrue(0 == list1.errors().get().size())
   assertTrue(0 == list1.valid().get().size())
   val testTopicName = "test_topic"
+  val testTopicName1 = testTopicName + "1"
+  val testTopicName2 = testTopicName + "2"
   val testNumPartitions = 2
-  client.createTopics(Collections.singleton(
-new NewTopic(testTopicName, testNumPartitions, 1.toShort))).all().get()
-  waitForTopics(client, List(testTopicName), List())
+
+  client.createTopics(util.Arrays.asList(new NewTopic(testTopicName, 
testNumPartitions, 1.toShort),

Review comment:
   Fixed





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8716: KAFKA-6145: KIP-441: Fix assignor config passthough

2020-05-27 Thread GitBox


vvcephei commented on pull request #8716:
URL: https://github.com/apache/kafka/pull/8716#issuecomment-634820186


   The only failures were:
   
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false]
   
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax opened a new pull request #8732: KAFKA-10017: disable flaky EosBetaUpgradeIntegrationTest to stabilize build

2020-05-27 Thread GitBox


mjsax opened a new pull request #8732:
URL: https://github.com/apache/kafka/pull/8732


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8697: KAFKA-9983: KIP-613, add INFO level e2e latency metrics

2020-05-27 Thread GitBox


vvcephei commented on pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#issuecomment-634821709


   The only failing tests were unrelated:
   
org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.testCreateInternalTopicsWithFewerReplicasThanBrokers
   
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false]
   
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
   
   Note that the java 8 build is not actually running.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8697: KAFKA-9983: KIP-613, add INFO level e2e latency metrics

2020-05-27 Thread GitBox


vvcephei commented on pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#issuecomment-634823280


   Unbelievable. The java 8 build started running right after I mentioned that 
it's not running.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #8716: KAFKA-6145: KIP-441: Fix assignor config passthough

2020-05-27 Thread GitBox


guozhangwang commented on a change in pull request #8716:
URL: https://github.com/apache/kafka/pull/8716#discussion_r431322682



##
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##
@@ -1148,6 +1148,9 @@ private void verifyMaxInFlightRequestPerConnection(final 
Object maxInFlightReque
 consumerProps.put(REPLICATION_FACTOR_CONFIG, 
getInt(REPLICATION_FACTOR_CONFIG));
 consumerProps.put(APPLICATION_SERVER_CONFIG, 
getString(APPLICATION_SERVER_CONFIG));
 consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, 
getInt(NUM_STANDBY_REPLICAS_CONFIG));
+consumerProps.put(ACCEPTABLE_RECOVERY_LAG_CONFIG, 
getLong(ACCEPTABLE_RECOVERY_LAG_CONFIG));
+consumerProps.put(MAX_WARMUP_REPLICAS_CONFIG, 
getInt(MAX_WARMUP_REPLICAS_CONFIG));
+consumerProps.put(PROBING_REBALANCE_INTERVAL_MS_CONFIG, 
getLong(PROBING_REBALANCE_INTERVAL_MS_CONFIG));

Review comment:
   Regarding KAFKA-10046, in current trunk we already have some logic that 
assumes the default partition grouper is always used, so I'd suggest we just 
bite the bullet and remove it in 2.6.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-27 Thread GitBox


mjsax commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r431322560



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
##
@@ -261,6 +261,42 @@ public void 
shouldNotAllowToResetWhenIntermediateTopicAbsent() throws Exception
 Assert.assertEquals(1, exitCode);
 }
 
+public void testResetWhenLongSessionTimeoutConfiguredWithForceOption() 
throws Exception {
+appID = testId + "-with-force-option";
+streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + 
STREAMS_CONSUMER_TIMEOUT * 100);
+
+// Run
+streams = new 
KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
+streams.start();
+final List> result = 
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, 
OUTPUT_TOPIC, 10);
+
+streams.close();
+
+// RESET
+streams = new 
KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
+streams.cleanUp();
+
+// Reset would fail since long session timeout has been configured
+final boolean cleanResult = tryCleanGlobal(false, null, null);
+Assert.assertEquals(false, cleanResult);
+
+// Reset will succeed with --force, which force-deletes active members on the broker side
+cleanGlobal(false, "--force", null);
+
+waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * 
CLEANUP_CONSUMER_TIMEOUT);
+
+assertInternalTopicsGotDeleted(null);
+
+// RE-RUN

Review comment:
   Seems redundant, as this is tested elsewhere. The purpose of this test is 
to verify `--force` itself, and these additional checks have nothing to do with 
`--force` IMHO. It seems best to keep the test to a "minimum". 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #8709: KAFKA-9952; Remove immediate fetch completion logic on high watermark updates

2020-05-27 Thread GitBox


hachikuji commented on pull request #8709:
URL: https://github.com/apache/kafka/pull/8709#issuecomment-634825301


   Bummer. Guess I will revert the commit.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-27 Thread GitBox


mjsax commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r431323350



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -3623,22 +3641,26 @@ public RemoveMembersFromConsumerGroupResult 
removeMembersFromConsumerGroup(Strin
 ConsumerGroupOperationContext, 
RemoveMembersFromConsumerGroupOptions> context =
 new ConsumerGroupOperationContext<>(groupId, options, deadline, 
future);
 
-Call findCoordinatorCall = getFindCoordinatorCall(context,
-() -> getRemoveMembersFromGroupCall(context));
+List members;
+if (options.removeAll()) {
+members = getMembersFromGroup(groupId);
+} else {
+members = 
options.members().stream().map(MemberToRemove::toMemberIdentity).collect(Collectors.toList());
+}
+Call findCoordinatorCall = getFindCoordinatorCall(context, () -> 
getRemoveMembersFromGroupCall(context, members));
 runnable.call(findCoordinatorCall, startFindCoordinatorMs);
 
 return new RemoveMembersFromConsumerGroupResult(future, 
options.members());

Review comment:
   Thanks for clarifying.
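
For readers following KIP-571, the two branches in the hunk above correspond roughly to the following Admin client usage, sketched under the assumption that the no-argument options constructor is what selects the "remove all members" path:

{code:java}
import java.util.Collections;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.MemberToRemove;
import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;

// Remove a single, explicitly named (static) member from the group.
admin.removeMembersFromConsumerGroup("my-group",
        new RemoveMembersFromConsumerGroupOptions(
                Collections.singleton(new MemberToRemove("instance-1"))));

// Remove all active members of the group (assumed here to be the no-arg options form).
admin.removeMembersFromConsumerGroup("my-group",
        new RemoveMembersFromConsumerGroupOptions());
{code}

Here "my-group", "instance-1", and the `admin` instance are placeholders.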





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang merged pull request #8221: KAFKA-9561: update task input partitions after rebalance

2020-05-27 Thread GitBox


guozhangwang merged pull request #8221:
URL: https://github.com/apache/kafka/pull/8221


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #8221: KAFKA-9561: update task input partitions after rebalance

2020-05-27 Thread GitBox


guozhangwang commented on pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#issuecomment-634827126


   Thanks for your patience, @avalsa! Merged to trunk.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-9561) Update task input partitions when topic metadata changes

2020-05-27 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-9561.
--
Resolution: Fixed

> Update task input partitions when topic metadata changes
> 
>
> Key: KAFKA-9561
> URL: https://issues.apache.org/jira/browse/KAFKA-9561
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Assignee: Stepanenko Vyacheslav
>Priority: Major
>  Labels: beginner, newbie, stream
> Fix For: 2.6.0
>
>
> With https://issues.apache.org/jira/browse/KAFKA-9545, we exposed the 
> possibility that a task could stay alive throughout a rebalance while its 
> input partitions actually change. For example, a regex-subscribed source 
> could match different topics when topics/partitions are added or removed. We 
> need to consider adding support to expand/shrink the partitions across 
> rebalances to keep task information consistent with the subscription data.
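
To illustrate the scenario described above, a regex-subscribed source in Streams keeps matching topics that are created later, so a running task's input partitions can change across rebalances. A minimal sketch (the topic pattern and names are assumptions):

{code:java}
import java.util.regex.Pattern;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();
// Subscribes to every topic matching the pattern; a topic created later
// (e.g. "orders-2") also matches, changing the task's input partitions.
KStream<String, String> orders = builder.stream(Pattern.compile("orders-.*"));
orders.foreach((key, value) -> System.out.println(key + " -> " + value));
{code}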



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on pull request #8697: KAFKA-9983: KIP-613, add INFO level e2e latency metrics

2020-05-27 Thread GitBox


ableegoldman commented on pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#issuecomment-634827871


   > The java 8 build started running right after I mentioned that it's not 
running
   
   I hope you'll choose to use these new powers for good



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-27 Thread GitBox


mjsax commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r431327264



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResult.java
##
@@ -51,9 +52,21 @@
 if (throwable != null) {
 result.completeExceptionally(throwable);
 } else {
-for (MemberToRemove memberToRemove : memberInfos) {
-if (maybeCompleteExceptionally(memberErrors, 
memberToRemove.toMemberIdentity(), result)) {
-return;
+if (removeAll()) {

Review comment:
   Well, while `memberInfo` is empty in the `removeAll` case, I am still 
wondering whether the `removeAll` code path would also work for the other case?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-27 Thread GitBox


mjsax commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r431329158



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResult.java
##
@@ -66,6 +79,9 @@
  * Returns the selected member future.
  */
 public KafkaFuture memberResult(MemberToRemove member) {
+if (removeAll()) {
+throw new IllegalArgumentException("The method: memberResult is 
not applicable in 'removeAll' mode");

Review comment:
   I see. Makes sense.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #6824: KAFKA-7523: Add ConnectedStoreProvider to Processor API

2020-05-27 Thread GitBox


mjsax commented on pull request #6824:
URL: https://github.com/apache/kafka/pull/6824#issuecomment-634834525


   Java 8: 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false]
   
   Java 11 and 14 passed. Merging this.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



