Re: [PR] [CELEBORN-2229][CIP-14] Add support for celeborn.client.push.maxBytesSizeInFlight in CppClient [celeborn]
RexXiong closed pull request #3568: [CELEBORN-2229][CIP-14] Add support for celeborn.client.push.maxBytesSizeInFlight in CppClient URL: https://github.com/apache/celeborn/pull/3568 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
RexXiong commented on PR #3568: URL: https://github.com/apache/celeborn/pull/3568#issuecomment-3737741705 Thanks, merged to main (v0.7.0).
afterincomparableyum commented on PR #3568: URL: https://github.com/apache/celeborn/pull/3568#issuecomment-3717040976 @HolyLow @SteNicholas @RexXiong @FMX wanted to follow up and see if any of you had a chance to look over my PR again. Thank you for your time!
afterincomparableyum commented on PR #3568: URL: https://github.com/apache/celeborn/pull/3568#issuecomment-3697624108
After taking a closer look, the Celeborn SBT CI failures do not appear to be related to my C++ code changes; they look flaky to me. The same workflows passed on my previous commit, 41ac02176d9bb00c000924218c7a2587002d9e43. I also ran the tests locally:
```
[info] Passed: Total 10, Failed 0, Errors 0, Passed 10
[info] Passed: Total 11, Failed 0, Errors 0, Passed 11
[info] ScalaTest
[info] Run completed in 1 hour, 16 seconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[info] Passed: Total 26, Failed 0, Errors 0, Passed 26
[info] Run completed in 1 hour, 1 second.
[info] Total number of tests run: 111
[info] Suites: completed 28, aborted 0
[info] Tests: succeeded 111, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 3737 s (01:02:17), completed Dec 29, 2025 9:50:22 PM
```
@HolyLow let me know if there are any other concerns with my PR that you would like me to address. Thank you for your review!
afterincomparableyum commented on PR #3568: URL: https://github.com/apache/celeborn/pull/3568#issuecomment-3692965501 @HolyLow is it safe to ignore the CI/CD failures for Celeborn CI? All of them are due to: `curl: (28) Failed to connect to archive.apache.org port 443 after 135159 ms: Connection timed out` As for the Celeborn SBT CI failures, I will look into those.
afterincomparableyum commented on PR #3568: URL: https://github.com/apache/celeborn/pull/3568#issuecomment-3691586108 @HolyLow could you please approve the GitHub workflows again? I've addressed all comments and replied to the ones for which I made no code changes. The C++ unit tests pass, and I've applied clang-format.
afterincomparableyum commented on code in PR #3568:
URL: https://github.com/apache/celeborn/pull/3568#discussion_r2647140768
##
cpp/celeborn/conf/CelebornConf.cpp:
##
@@ -162,6 +162,10 @@ CelebornConf::defaultProperties() {
kShuffleCompressionCodec,
protocol::toString(protocol::CompressionCodec::NONE)),
NUM_PROP(kShuffleCompressionZstdCompressLevel, 1),
+ STR_PROP(kClientPushBufferMaxSize, "64k"),
+ BOOL_PROP(kClientPushMaxBytesSizeInFlightEnabled, false),
+ NONE_PROP(kClientPushMaxBytesSizeInFlightTotal),
Review Comment:
My understanding is that NONE_PROP is correct: it matches Java's
.createOptional behavior.
The property is optional, and the default is computed dynamically when it is
not set.
Do you think it should be something other than NONE_PROP? NONE_PROP seems
right to me because these parameters are optional.
afterincomparableyum commented on code in PR #3568:
URL: https://github.com/apache/celeborn/pull/3568#discussion_r2647139234
##
cpp/celeborn/conf/CelebornConf.cpp:
##
@@ -267,6 +271,41 @@ long CelebornConf::clientPushLimitInFlightSleepDeltaMs()
const {
optionalProperty(kClientPushLimitInFlightSleepDeltaMs).value());
}
+int CelebornConf::clientPushBufferMaxSize() const {
+ return toCapacity(
+ optionalProperty(kClientPushBufferMaxSize).value(), CapacityUnit::BYTE);
+}
+
+bool CelebornConf::clientPushMaxBytesSizeInFlightEnabled() const {
+ return optionalProperty(kClientPushMaxBytesSizeInFlightEnabled).value() ==
+ "true";
+}
+
+long CelebornConf::clientPushMaxBytesSizeInFlightTotal() const {
+ auto optionalValue = optionalProperty(kClientPushMaxBytesSizeInFlightTotal);
+ long maxBytesSizeInFlight =
Review Comment:
This mirrors the Java client function exactly:
```
def clientPushMaxBytesSizeInFlightTotal: Long = {
  val maxBytesSizeInFlight =
    get(CLIENT_PUSH_MAX_BYTES_SIZE_IN_FLIGHT_TOTAL).getOrElse(0L)
  if (clientPushMaxBytesSizeInFlightEnabled && maxBytesSizeInFlight > 0L) {
    maxBytesSizeInFlight
  } else {
    clientPushMaxReqsInFlightTotal * clientPushBufferMaxSize
  }
}
```
Any suggestions for how to simplify it @HolyLow ?
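One possible simplification, offered only as a sketch: since the Total and PerWorker getters share the same fallback rule, both could delegate to a small helper. The name `bytesLimitOrDefault` and its parameters are hypothetical and do not exist in CelebornConf; the caller is assumed to have already parsed the optional property into a byte count.

```cpp
#include <cstdint>
#include <optional>

// Hypothetical helper, not part of CelebornConf: returns the explicitly
// configured byte limit when the feature is enabled and the value is
// positive; otherwise falls back to maxReqsInFlight * bufferMaxSize.
inline std::int64_t bytesLimitOrDefault(
    bool enabled,
    std::optional<std::int64_t> configuredBytes,
    int maxReqsInFlight,
    int bufferMaxSize) {
  // An unset property behaves like 0, matching getOrElse(0L) in the
  // quoted Java client code.
  const std::int64_t configured = configuredBytes.value_or(0);
  if (enabled && configured > 0) {
    return configured;
  }
  // Widen before multiplying so the product is not computed in int.
  return static_cast<std::int64_t>(maxReqsInFlight) * bufferMaxSize;
}
```

With such a helper, each getter would reduce to reading its own property and passing the matching request limit, which keeps the two code paths identical by construction.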
afterincomparableyum commented on code in PR #3568:
URL: https://github.com/apache/celeborn/pull/3568#discussion_r2647138332
##
cpp/celeborn/conf/CelebornConf.cpp:
##
@@ -267,6 +271,41 @@ long CelebornConf::clientPushLimitInFlightSleepDeltaMs()
const {
optionalProperty(kClientPushLimitInFlightSleepDeltaMs).value());
}
+int CelebornConf::clientPushBufferMaxSize() const {
+ return toCapacity(
+ optionalProperty(kClientPushBufferMaxSize).value(), CapacityUnit::BYTE);
+}
+
+bool CelebornConf::clientPushMaxBytesSizeInFlightEnabled() const {
+ return optionalProperty(kClientPushMaxBytesSizeInFlightEnabled).value() ==
+ "true";
+}
+
+long CelebornConf::clientPushMaxBytesSizeInFlightTotal() const {
+ auto optionalValue = optionalProperty(kClientPushMaxBytesSizeInFlightTotal);
+ long maxBytesSizeInFlight =
+ optionalValue.has_value() ? toCapacity(optionalValue.value(),
CapacityUnit::BYTE) : 0L;
+ if (clientPushMaxBytesSizeInFlightEnabled() && maxBytesSizeInFlight > 0L) {
+return maxBytesSizeInFlight;
+ }
+ // Default: maxReqsInFlightTotal * bufferMaxSize
+ return static_cast<long>(clientPushMaxReqsInFlightTotal()) *
+ clientPushBufferMaxSize();
+}
+
+long CelebornConf::clientPushMaxBytesSizeInFlightPerWorker() const {
+ auto optionalValue =
+ optionalProperty(kClientPushMaxBytesSizeInFlightPerWorker);
+ long maxBytesSizeInFlight =
Review Comment:
Any suggestions for how to simplify it, @HolyLow? The Java client implements
the function like this:
```
def clientPushMaxBytesSizeInFlightPerWorker: Long = {
  val maxBytesSizeInFlight =
    get(CLIENT_PUSH_MAX_BYTES_SIZE_IN_FLIGHT_PERWORKER).getOrElse(0L)
  if (clientPushMaxBytesSizeInFlightEnabled && maxBytesSizeInFlight > 0L) {
    maxBytesSizeInFlight
  } else {
    clientPushMaxReqsInFlightPerWorker * clientPushBufferMaxSize
  }
}
```
The logic is similar.
afterincomparableyum commented on code in PR #3568:
URL: https://github.com/apache/celeborn/pull/3568#discussion_r2647138332
##
cpp/celeborn/conf/CelebornConf.cpp:
##
@@ -267,6 +271,41 @@ long CelebornConf::clientPushLimitInFlightSleepDeltaMs()
const {
optionalProperty(kClientPushLimitInFlightSleepDeltaMs).value());
}
+int CelebornConf::clientPushBufferMaxSize() const {
+ return toCapacity(
+ optionalProperty(kClientPushBufferMaxSize).value(), CapacityUnit::BYTE);
+}
+
+bool CelebornConf::clientPushMaxBytesSizeInFlightEnabled() const {
+ return optionalProperty(kClientPushMaxBytesSizeInFlightEnabled).value() ==
+ "true";
+}
+
+long CelebornConf::clientPushMaxBytesSizeInFlightTotal() const {
+ auto optionalValue = optionalProperty(kClientPushMaxBytesSizeInFlightTotal);
+ long maxBytesSizeInFlight =
+ optionalValue.has_value() ? toCapacity(optionalValue.value(),
CapacityUnit::BYTE) : 0L;
+ if (clientPushMaxBytesSizeInFlightEnabled() && maxBytesSizeInFlight > 0L) {
+return maxBytesSizeInFlight;
+ }
+ // Default: maxReqsInFlightTotal * bufferMaxSize
+ return static_cast<long>(clientPushMaxReqsInFlightTotal()) *
+ clientPushBufferMaxSize();
+}
+
+long CelebornConf::clientPushMaxBytesSizeInFlightPerWorker() const {
+ auto optionalValue =
+ optionalProperty(kClientPushMaxBytesSizeInFlightPerWorker);
+ long maxBytesSizeInFlight =
Review Comment:
Any suggestions for how to simplify it, @HolyLow? The Java client implements
the function like this:
```
def clientPushMaxBytesSizeInFlightTotal: Long = {
  val maxBytesSizeInFlight = get(...).getOrElse(0L) // Get value or default to 0L
  if (clientPushMaxBytesSizeInFlightEnabled && maxBytesSizeInFlight > 0L) {
    maxBytesSizeInFlight
  } else {
    clientPushMaxReqsInFlightTotal * clientPushBufferMaxSize // Calculated default
  }
}
```
The logic is similar.
afterincomparableyum commented on PR #3568: URL: https://github.com/apache/celeborn/pull/3568#issuecomment-3688734770 The most recent commit should fix the C++ unit test errors and clang-format issues; the next commit will address the remaining comments.
afterincomparableyum commented on PR #3568: URL: https://github.com/apache/celeborn/pull/3568#issuecomment-3688435533 Thank you for the comments @HolyLow, I will address them soon.
HolyLow commented on code in PR #3568:
URL: https://github.com/apache/celeborn/pull/3568#discussion_r2639112073
##
cpp/celeborn/conf/CelebornConf.cpp:
##
@@ -267,6 +271,41 @@ long CelebornConf::clientPushLimitInFlightSleepDeltaMs()
const {
optionalProperty(kClientPushLimitInFlightSleepDeltaMs).value());
}
+int CelebornConf::clientPushBufferMaxSize() const {
+ return toCapacity(
+ optionalProperty(kClientPushBufferMaxSize).value(), CapacityUnit::BYTE);
+}
+
+bool CelebornConf::clientPushMaxBytesSizeInFlightEnabled() const {
+ return optionalProperty(kClientPushMaxBytesSizeInFlightEnabled).value() ==
+ "true";
+}
+
+long CelebornConf::clientPushMaxBytesSizeInFlightTotal() const {
+ auto optionalValue = optionalProperty(kClientPushMaxBytesSizeInFlightTotal);
+ long maxBytesSizeInFlight =
Review Comment:
ditto. Why is this so complicated?
HolyLow commented on code in PR #3568:
URL: https://github.com/apache/celeborn/pull/3568#discussion_r2639110842
##
cpp/celeborn/conf/CelebornConf.cpp:
##
@@ -267,6 +271,41 @@ long CelebornConf::clientPushLimitInFlightSleepDeltaMs()
const {
optionalProperty(kClientPushLimitInFlightSleepDeltaMs).value());
}
+int CelebornConf::clientPushBufferMaxSize() const {
+ return toCapacity(
+ optionalProperty(kClientPushBufferMaxSize).value(), CapacityUnit::BYTE);
+}
+
+bool CelebornConf::clientPushMaxBytesSizeInFlightEnabled() const {
+ return optionalProperty(kClientPushMaxBytesSizeInFlightEnabled).value() ==
Review Comment:
I think this is inconsistent with the existing code.
HolyLow commented on code in PR #3568:
URL: https://github.com/apache/celeborn/pull/3568#discussion_r2639110141
##
cpp/celeborn/conf/CelebornConf.cpp:
##
@@ -267,6 +271,41 @@ long CelebornConf::clientPushLimitInFlightSleepDeltaMs()
const {
optionalProperty(kClientPushLimitInFlightSleepDeltaMs).value());
}
+int CelebornConf::clientPushBufferMaxSize() const {
+ return toCapacity(
+ optionalProperty(kClientPushBufferMaxSize).value(), CapacityUnit::BYTE);
+}
+
+bool CelebornConf::clientPushMaxBytesSizeInFlightEnabled() const {
+ return optionalProperty(kClientPushMaxBytesSizeInFlightEnabled).value() ==
+ "true";
+}
+
+long CelebornConf::clientPushMaxBytesSizeInFlightTotal() const {
+ auto optionalValue = optionalProperty(kClientPushMaxBytesSizeInFlightTotal);
+ long maxBytesSizeInFlight =
+ optionalValue.has_value() ? toCapacity(optionalValue.value(),
CapacityUnit::BYTE) : 0L;
+ if (clientPushMaxBytesSizeInFlightEnabled() && maxBytesSizeInFlight > 0L) {
+return maxBytesSizeInFlight;
+ }
+ // Default: maxReqsInFlightTotal * bufferMaxSize
+ return static_cast<long>(clientPushMaxReqsInFlightTotal()) *
+ clientPushBufferMaxSize();
+}
+
+long CelebornConf::clientPushMaxBytesSizeInFlightPerWorker() const {
+ auto optionalValue =
+ optionalProperty(kClientPushMaxBytesSizeInFlightPerWorker);
+ long maxBytesSizeInFlight =
Review Comment:
Why should this be that complicated?
HolyLow commented on code in PR #3568:
URL: https://github.com/apache/celeborn/pull/3568#discussion_r2639108322
##
cpp/celeborn/client/writer/PushState.h:
##
@@ -68,15 +73,28 @@ class PushState {
std::atomic currBatchId_{1};
std::atomic totalInflightReqs_{0};
+ std::atomic totalInflightBytes_{0};
const long waitInflightTimeoutMs_;
const long deltaMs_;
const std::unique_ptr pushStrategy_;
const int maxInFlightReqsTotal_;
+ const bool maxInFlightBytesSizeEnabled_;
+ const long maxInFlightBytesSizeTotal_;
+ const long maxInFlightBytesSizePerWorker_;
utils::ConcurrentHashMap<
std::string,
std::shared_ptr>>
inflightBatchesPerAddress_;
+ std::optional
HolyLow commented on code in PR #3568:
URL: https://github.com/apache/celeborn/pull/3568#discussion_r2639106529
##
cpp/celeborn/client/tests/PushStateTest.cpp:
##
@@ -153,3 +192,67 @@ TEST_F(PushStateTest, throwException) {
}
EXPECT_TRUE(exceptionThrowed);
}
+
+TEST_F(PushStateBytesSizeTest, limitMaxInFlightByBytesSize) {
+ const std::string hostAndPushPort = "xx.xx.xx.xx:8080";
+ const int expectedAllowedBatches = 2;
+ const int addBatchCalls = expectedAllowedBatches + 1;
+ std::vector<bool> addBatchMarks(addBatchCalls, false);
+
+ std::thread addBatchThread([&]() {
+for (auto i = 0; i < addBatchCalls; i++) {
+ pushState_->addBatch(i, batchSize_, hostAndPushPort);
+ auto result = pushState_->limitMaxInFlight(hostAndPushPort);
+ addBatchMarks[i] = true;
+ if (i < expectedAllowedBatches) {
+EXPECT_FALSE(result) << "Batch " << i << " should be within limits";
+ }
+}
+ });
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(pushSleepDeltaMs_));
+ for (auto i = 0; i < expectedAllowedBatches; i++) {
+EXPECT_TRUE(addBatchMarks[i]) << "Batch " << i << " should have completed";
+ }
+
+ pushState_->removeBatch(0, hostAndPushPort);
+ addBatchThread.join();
+ EXPECT_TRUE(addBatchMarks[expectedAllowedBatches]);
+}
+
+TEST_F(PushStateBytesSizeTest, limitMaxInFlightByTotalBytesSize) {
+ const std::string hostAndPushPort1 = "xx.xx.xx.xx:8080";
+ const std::string hostAndPushPort2 = "yy.yy.yy.yy:8080";
+
+ pushState_->addBatch(0, batchSize_, hostAndPushPort1);
+ EXPECT_FALSE(pushState_->limitMaxInFlight(hostAndPushPort1));
+
+ pushState_->addBatch(1, batchSize_, hostAndPushPort2);
+ EXPECT_FALSE(pushState_->limitMaxInFlight(hostAndPushPort2));
+
+ std::atomic<bool> thirdBatchCompleted{false};
+ std::thread addBatchThread([&]() {
+pushState_->addBatch(2, batchSize_, hostAndPushPort1);
+pushState_->limitMaxInFlight(hostAndPushPort1);
+thirdBatchCompleted = true;
+ });
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(pushSleepDeltaMs_));
+ EXPECT_FALSE(thirdBatchCompleted.load())
+ << "Third batch should be blocked due to total bytes limit";
+
+ pushState_->removeBatch(0, hostAndPushPort1);
+ addBatchThread.join();
+
+ EXPECT_TRUE(thirdBatchCompleted.load());
+}
+
+TEST_F(PushStateBytesSizeTest, cleanupClearsBytesSizeTracking) {
+ const std::string hostAndPushPort = "xx.xx.xx.xx:8080";
+
+ pushState_->addBatch(0, batchSize_, hostAndPushPort);
+ pushState_->addBatch(1, batchSize_, hostAndPushPort);
+ pushState_->cleanup();
+
+ EXPECT_FALSE(pushState_->limitMaxInFlight(hostAndPushPort));
+}
Review Comment:
There should be an empty line at the end of the file.
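For readers following the quoted tests, the behavior they exercise (a caller blocks once in-flight bytes exceed a limit and resumes after a batch is removed) can be sketched roughly as below. This is a simplified, hypothetical `InFlightBytesLimiter`, not the PR's PushState: it tracks only a single total, polls with a fixed sleep, and assumes that a `false` result means "within limits", as the test expectations suggest.

```cpp
#include <atomic>
#include <chrono>
#include <cstdint>
#include <thread>

// Hypothetical, simplified limiter. The real PushState also tracks
// per-worker bytes and request counts, which are omitted here.
class InFlightBytesLimiter {
 public:
  InFlightBytesLimiter(std::int64_t maxBytes, int timeoutMs, int sleepDeltaMs)
      : maxBytes_(maxBytes), timeoutMs_(timeoutMs), sleepDeltaMs_(sleepDeltaMs) {}

  void add(std::int64_t bytes) { inFlightBytes_.fetch_add(bytes); }
  void remove(std::int64_t bytes) { inFlightBytes_.fetch_sub(bytes); }

  // Polls until in-flight bytes are at or below the limit.
  // Returns false when within limits, true if the timeout expired first.
  bool limit() const {
    using Clock = std::chrono::steady_clock;
    const auto deadline = Clock::now() + std::chrono::milliseconds(timeoutMs_);
    while (inFlightBytes_.load() > maxBytes_) {
      if (Clock::now() >= deadline) {
        return true;
      }
      std::this_thread::sleep_for(std::chrono::milliseconds(sleepDeltaMs_));
    }
    return false;
  }

 private:
  std::atomic<std::int64_t> inFlightBytes_{0};
  const std::int64_t maxBytes_;
  const int timeoutMs_;
  const int sleepDeltaMs_;
};
```

Polling keeps the sketch short; a condition variable would avoid the sleep latency at the cost of more code.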
HolyLow commented on code in PR #3568:
URL: https://github.com/apache/celeborn/pull/3568#discussion_r2639104815
##
cpp/celeborn/client/tests/PushStateTest.cpp:
##
@@ -46,6 +46,45 @@ class PushStateTest : public testing::Test {
static constexpr int pushTimeoutMs_ = 100;
static constexpr int pushSleepDeltaMs_ = 10;
static constexpr int maxReqsInFlight_ = 2;
+ static constexpr int defaultBatchSize_ = 1024;
+};
+
+class PushStateBytesSizeTest : public testing::Test {
+ protected:
+void SetUp() override {
+ conf::CelebornConf conf;
+ conf.registerProperty(
+ conf::CelebornConf::kClientPushLimitInFlightTimeoutMs,
+ std::to_string(pushTimeoutMs_));
+ conf.registerProperty(
+ conf::CelebornConf::kClientPushLimitInFlightSleepDeltaMs,
+ std::to_string(pushSleepDeltaMs_));
+ conf.registerProperty(
+ conf::CelebornConf::kClientPushMaxReqsInFlightTotal, "100");
+ conf.registerProperty(
+ conf::CelebornConf::kClientPushMaxReqsInFlightPerWorker, "100");
+ conf.registerProperty(
+ conf::CelebornConf::kClientPushMaxBytesSizeInFlightEnabled, "true");
+ conf.registerProperty(
+ conf::CelebornConf::kClientPushMaxBytesSizeInFlightTotal,
+ std::to_string(maxBytesSizeTotal_));
+ conf.registerProperty(
+ conf::CelebornConf::kClientPushMaxBytesSizeInFlightPerWorker,
+ std::to_string(maxBytesSizePerWorker_));
+ conf.registerProperty(
+ conf::CelebornConf::kClientPushBufferMaxSize,
+ std::to_string(bufferMaxSize_));
+
+ pushState_ = std::make_unique<PushState>(conf);
+}
+
+std::unique_ptr<PushState> pushState_;
+static constexpr int pushTimeoutMs_ = 100;
Review Comment:
It might be more consistent to rename the static constexpr members to the
kXxxXxx_ form.
HolyLow commented on code in PR #3568:
URL: https://github.com/apache/celeborn/pull/3568#discussion_r2639101557
##
cpp/celeborn/conf/CelebornConf.cpp:
##
@@ -309,4 +348,4 @@ int CelebornConf::shuffleCompressionZstdCompressLevel()
const {
optionalProperty(kShuffleCompressionZstdCompressLevel).value());
}
} // namespace conf
-} // namespace celeborn
Review Comment:
Why is this change necessary?
HolyLow commented on code in PR #3568:
URL: https://github.com/apache/celeborn/pull/3568#discussion_r2639099966
##
cpp/celeborn/conf/CelebornConf.cpp:
##
@@ -162,6 +162,10 @@ CelebornConf::defaultProperties() {
kShuffleCompressionCodec,
protocol::toString(protocol::CompressionCodec::NONE)),
NUM_PROP(kShuffleCompressionZstdCompressLevel, 1),
+ STR_PROP(kClientPushBufferMaxSize, "64k"),
Review Comment:
How will this be translated? Is this option tested?
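To make the question concrete, here is a minimal sketch of how a capacity string such as "64k" could be turned into a byte count. `parseCapacityBytes` is hypothetical and is not Celeborn's `toCapacity`; it assumes single-letter, case-insensitive suffixes and binary multiples (1k = 1024 bytes). Whether the C++ client's `toCapacity` accepts the same spelling is exactly what a unit test for this option would need to confirm.

```cpp
#include <cctype>
#include <cstdint>
#include <stdexcept>
#include <string>

// Hypothetical parser, not Celeborn's toCapacity: converts strings such as
// "64k" or "1m" to bytes, assuming binary multiples (1k = 1024 bytes).
inline std::int64_t parseCapacityBytes(const std::string& text) {
  std::size_t pos = 0;
  // std::stoll throws std::invalid_argument if no digits are present.
  const long long number = std::stoll(text, &pos);
  if (number < 0) {
    throw std::invalid_argument("negative capacity: " + text);
  }
  if (pos == text.size()) {
    return number;  // A bare number is interpreted as bytes.
  }
  if (pos + 1 != text.size()) {
    throw std::invalid_argument("unsupported capacity suffix: " + text);
  }
  switch (std::tolower(static_cast<unsigned char>(text[pos]))) {
    case 'b':
      return number;
    case 'k':
      return number * 1024;
    case 'm':
      return number * 1024 * 1024;
    case 'g':
      return number * 1024 * 1024 * 1024;
    default:
      throw std::invalid_argument("unsupported capacity suffix: " + text);
  }
}
```

Under these assumptions, "64k" maps to 65536 bytes.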
HolyLow commented on code in PR #3568:
URL: https://github.com/apache/celeborn/pull/3568#discussion_r2639093828
##
cpp/celeborn/client/writer/PushState.cpp:
##
@@ -147,9 +215,18 @@ std::optional PushState::getExceptionMsg()
const {
}
void PushState::cleanup() {
+ LOG(INFO) << "Cleanup " << totalInflightReqs_.load() << " requests in
flight.";
+ cleaned_ = true;
inflightBatchesPerAddress_.clear();
totalInflightReqs_ = 0;
pushStrategy_->clear();
+
+ if (maxInFlightBytesSizeEnabled_) {
+LOG(INFO) << "Cleanup " << totalInflightBytes_.load() << " bytes in
flight.";
Review Comment:
Please do not use LOG(INFO) here; it is too verbose.
HolyLow commented on code in PR #3568:
URL: https://github.com/apache/celeborn/pull/3568#discussion_r2639097118
##
cpp/celeborn/client/writer/PushState.h:
##
@@ -68,15 +73,28 @@ class PushState {
std::atomic currBatchId_{1};
std::atomic totalInflightReqs_{0};
+ std::atomic totalInflightBytes_{0};
const long waitInflightTimeoutMs_;
const long deltaMs_;
const std::unique_ptr pushStrategy_;
const int maxInFlightReqsTotal_;
+ const bool maxInFlightBytesSizeEnabled_;
+ const long maxInFlightBytesSizeTotal_;
+ const long maxInFlightBytesSizePerWorker_;
utils::ConcurrentHashMap<
std::string,
std::shared_ptr>>
inflightBatchesPerAddress_;
+ std::optional>>>
+ inflightBytesSizePerAddress_;
+ std::optional>
+ inflightBatchBytesSizes_;
folly::Synchronized> exception_;
+ volatile bool cleaned_{false};
Review Comment:
Why should this be volatile? The volatile keyword in C++ has a different
meaning from the one in Java.
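To illustrate the difference: Java's volatile gives cross-thread visibility and ordering guarantees, whereas C++ volatile neither makes accesses atomic nor synchronizes threads, so a flag read and written from different threads is usually declared as `std::atomic<bool>`. The sketch below uses a hypothetical `CleanupFlag` class standing in for the `cleaned_` member; it is not the PR's code.

```cpp
#include <atomic>

// Hypothetical stand-in for the cleaned_ flag discussed above.
class CleanupFlag {
 public:
  // Release store: writes made before this call become visible to a thread
  // that later observes the flag as true through isCleaned().
  void markCleaned() { cleaned_.store(true, std::memory_order_release); }

  // Acquire load: pairs with the release store in markCleaned().
  bool isCleaned() const { return cleaned_.load(std::memory_order_acquire); }

 private:
  std::atomic<bool> cleaned_{false};
};
```

The default sequentially consistent operations (plain `cleaned_ = true` and `cleaned_.load()`) would also be correct; the explicit memory orders only make the intended pairing visible.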
HolyLow commented on code in PR #3568:
URL: https://github.com/apache/celeborn/pull/3568#discussion_r2639092312
##
cpp/celeborn/conf/CelebornConf.cpp:
##
@@ -162,6 +162,10 @@ CelebornConf::defaultProperties() {
kShuffleCompressionCodec,
protocol::toString(protocol::CompressionCodec::NONE)),
NUM_PROP(kShuffleCompressionZstdCompressLevel, 1),
+ STR_PROP(kClientPushBufferMaxSize, "64k"),
+ BOOL_PROP(kClientPushMaxBytesSizeInFlightEnabled, false),
+ NONE_PROP(kClientPushMaxBytesSizeInFlightTotal),
Review Comment:
Why is this NONE_PROP?
afterincomparableyum commented on PR #3568: URL: https://github.com/apache/celeborn/pull/3568#issuecomment-3679668101 @HolyLow @SteNicholas @RexXiong @FMX Could you please help review this PR? Thanks a lot for helping improve this as needed!
afterincomparableyum closed pull request #3567: [CELEBORN-2229][CIP-14] Add support for celeborn.client.push.maxBytesSizeInFlight in CppClient URL: https://github.com/apache/celeborn/pull/3567
