jsancio commented on a change in pull request #10253:
URL: https://github.com/apache/kafka/pull/10253#discussion_r586814323
##########
File path: metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java
##########
@@ -328,8 +328,21 @@ public void register(MetaLogListener listener) throws Exception {
@Override
public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
- return shared.tryAppend(nodeId, leader.epoch(), new LocalRecordBatch(
- batch.stream().map(r -> r.message()).collect(Collectors.toList())));
+ return scheduleAtomicWrite(epoch, batch);
+ }
+
+ @Override
+ public long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch) {
+ return shared.tryAppend(
+ nodeId,
+ leader.epoch(),
+ new LocalRecordBatch(
+ batch
+ .stream()
+ .map(r -> r.message())
Review comment:
Done.
##########
File path: metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java
##########
@@ -28,15 +28,21 @@
class ControllerResult<T> {
private final List<ApiMessageAndVersion> records;
private final T response;
+ private final boolean isAtomic;
public ControllerResult(T response) {
this(new ArrayList<>(), response);
}
public ControllerResult(List<ApiMessageAndVersion> records, T response) {
+ this(records, response, false);
+ }
+
+ public ControllerResult(List<ApiMessageAndVersion> records, T response, boolean isAtomic) {
Review comment:
I agree with your style preference. Changed.
##########
File path: metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
##########
@@ -135,18 +135,43 @@ public void testIncrementalAlterConfigs() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
ConfigurationControlManager manager =
new ConfigurationControlManager(new LogContext(), snapshotRegistry, CONFIGS);
- assertEquals(new ControllerResult<Map<ConfigResource, ApiError>>(Collections.singletonList(
- new ApiMessageAndVersion(new ConfigRecord().
- setResourceType(TOPIC.id()).setResourceName("mytopic").
- setName("abc").setValue("123"), (short) 0)),
- toMap(entry(BROKER0, new ApiError(
- Errors.INVALID_REQUEST, "A DELETE op was given with a non-null value.")),
- entry(MYTOPIC, ApiError.NONE))),
- manager.incrementalAlterConfigs(toMap(entry(BROKER0, toMap(
- entry("foo.bar", entry(DELETE, "abc")),
- entry("quux", entry(SET, "abc")))),
- entry(MYTOPIC, toMap(
- entry("abc", entry(APPEND, "123")))))));
+ assertEquals(
Review comment:
Yes. It should be clearer now that we have `ControllerResult::atomicOf`.
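
For readers following the thread, here is a rough sketch of how a named factory such as `atomicOf` can make the atomic flag explicit at the call site instead of hiding it in a trailing boolean constructor argument. This is an illustration only, not the code in the PR: the class name `ControllerResultSketch`, the non-atomic `of` counterpart, the accessors, and the use of `String` in place of `ApiMessageAndVersion` are all assumptions made to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-in for ControllerResult<T>.
// Records are plain Strings here (assumption) so the sketch compiles on its own.
final class ControllerResultSketch<T> {
    private final List<String> records;
    private final T response;
    private final boolean isAtomic;

    // Private constructor: callers must pick one of the named factories below.
    private ControllerResultSketch(List<String> records, T response, boolean isAtomic) {
        this.records = Collections.unmodifiableList(new ArrayList<>(records));
        this.response = response;
        this.isAtomic = isAtomic;
    }

    // Assumed non-atomic counterpart: records may be split across batches.
    static <T> ControllerResultSketch<T> of(List<String> records, T response) {
        return new ControllerResultSketch<>(records, response, false);
    }

    // Named after the factory mentioned in the review: all records are meant
    // to be written together as one atomic batch.
    static <T> ControllerResultSketch<T> atomicOf(List<String> records, T response) {
        return new ControllerResultSketch<>(records, response, true);
    }

    List<String> records() {
        return records;
    }

    T response() {
        return response;
    }

    boolean isAtomic() {
        return isAtomic;
    }
}
```

With factories like these, a test assertion can read `ControllerResultSketch.atomicOf(records, response)` and state its intent directly, which is the readability point raised in the comment above.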