[geode-kafka-connector] branch dependabot/maven/org.apache.geode-geode-core-1.15.0 created (now d071717)
This is an automated email from the ASF dual-hosted git repository. github-bot pushed a change to branch dependabot/maven/org.apache.geode-geode-core-1.15.0 in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git at d071717 Bump geode-core from 1.12.6 to 1.15.0 No new revisions were added by this update.
[geode] branch support/1.15 updated (62e60b5062 -> d1910cf1ee)
This is an automated email from the ASF dual-hosted git repository. mkevo pushed a change to branch support/1.15 in repository https://gitbox.apache.org/repos/asf/geode.git from 62e60b5062 GEODE-10420: Finish distribute() work if interrupted (#7854) new bc47b30c0b GEODE-10412: Clear expired tombstones during region destroy (#7838) new d1910cf1ee GEODE-10401: Configurable .drf recovery HashMap overflow threshold (#7828) The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../cache/versions/TombstoneDUnitTest.java | 42 +++ .../apache/geode/internal/cache/DiskStoreImpl.java | 61 +++-- .../geode/internal/cache/TombstoneService.java | 6 ++- .../OplogEntryIdSetDrfHashSetThresholdTest.java| 62 ++ .../geode/internal/cache/TombstoneServiceTest.java | 46 +--- 5 files changed, 193 insertions(+), 24 deletions(-) create mode 100644 geode-core/src/test/java/org/apache/geode/internal/cache/OplogEntryIdSetDrfHashSetThresholdTest.java
[geode] 02/02: GEODE-10401: Configurable .drf recovery HashMap overflow threshold (#7828)
This is an automated email from the ASF dual-hosted git repository. mkevo pushed a commit to branch support/1.15 in repository https://gitbox.apache.org/repos/asf/geode.git commit d1910cf1ee18aae749da3553b3537d74b7ef5cae Author: Jakov Varenina <62134331+jvaren...@users.noreply.github.com> AuthorDate: Wed Jul 27 15:08:53 2022 +0200 GEODE-10401: Configurable .drf recovery HashMap overflow threshold (#7828) Configurable with the jvm parameter: gemfire.disk.drfHashMapOverflowThreshold Default value: 805306368 When configured threshold value is reached, then server will overflow to the new hashmap during the recovery of .drf files. Warning: If you set threshold parameter over 805306368, then uneeded delay will happen due to bug in fastutil dependency. --- .../apache/geode/internal/cache/DiskStoreImpl.java | 61 +++-- .../OplogEntryIdSetDrfHashSetThresholdTest.java| 62 ++ 2 files changed, 107 insertions(+), 16 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java index 9dee1c1c77..b544ca065f 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java @@ -180,6 +180,13 @@ public class DiskStoreImpl implements DiskStore { public static final String RECOVER_VALUES_SYNC_PROPERTY_NAME = GeodeGlossary.GEMFIRE_PREFIX + "disk.recoverValuesSync"; + /** + * When configured threshold value is reached, then server will overflow to + * the new hashmap during the recovery of .drf files + */ + public static final String DRF_HASHMAP_OVERFLOW_THRESHOLD_NAME = + GeodeGlossary.GEMFIRE_PREFIX + "disk.drfHashMapOverflowThreshold"; + /** * Allows recovering values for LRU regions. By default values are not recovered for LRU regions * during recovery. @@ -187,6 +194,10 @@ public class DiskStoreImpl implements DiskStore { public static final String RECOVER_LRU_VALUES_PROPERTY_NAME = GeodeGlossary.GEMFIRE_PREFIX + "disk.recoverLruValues"; + static final long DRF_HASHMAP_OVERFLOW_THRESHOLD_DEFAULT = 805306368; + static final long DRF_HASHMAP_OVERFLOW_THRESHOLD = + Long.getLong(DRF_HASHMAP_OVERFLOW_THRESHOLD_NAME, DRF_HASHMAP_OVERFLOW_THRESHOLD_DEFAULT); + boolean RECOVER_VALUES = getBoolean(DiskStoreImpl.RECOVER_VALUE_PROPERTY_NAME, true); boolean RECOVER_VALUES_SYNC = getBoolean(DiskStoreImpl.RECOVER_VALUES_SYNC_PROPERTY_NAME, false); @@ -3525,31 +3536,49 @@ public class DiskStoreImpl implements DiskStore { } try { -if (id > 0 && id <= 0xL) { - currentInts.get().add((int) id); +if (shouldOverflow(id)) { + overflowToNewHashMap(id); } else { - currentLongs.get().add(id); + if (id > 0 && id <= 0xL) { +this.currentInts.get().add((int) id); + } else { +this.currentLongs.get().add(id); + } } } catch (IllegalArgumentException illegalArgumentException) { // See GEODE-8029. -// Too many entries on the accumulated drf files, overflow and continue. +// Too many entries on the accumulated drf files, overflow next [Int|Long]OpenHashSet and +// continue. +overflowToNewHashMap(id); + } +} + +boolean shouldOverflow(final long id) { + if (id > 0 && id <= 0xL) { +return currentInts.get().size() == DRF_HASHMAP_OVERFLOW_THRESHOLD; + } else { +return currentLongs.get().size() == DRF_HASHMAP_OVERFLOW_THRESHOLD; + } +} + +void overflowToNewHashMap(final long id) { + if (DRF_HASHMAP_OVERFLOW_THRESHOLD == DRF_HASHMAP_OVERFLOW_THRESHOLD_DEFAULT) { logger.warn( "There is a large number of deleted entries within the disk-store, please execute an offline compaction."); + } -// Overflow to the next [Int|Long]OpenHashSet and continue. -if (id > 0 && id <= 0xL) { - IntOpenHashSet overflownHashSet = new IntOpenHashSet((int) INVALID_ID); - allInts.add(overflownHashSet); - currentInts.set(overflownHashSet); + if (id > 0 && id <= 0xL) { +IntOpenHashSet overflownHashSet = new IntOpenHashSet((int) INVALID_ID); +allInts.add(overflownHashSet); +currentInts.set(overflownHashSet); - currentInts.get().add((int) id); -} else { - LongOpenHashSet overflownHashSet = new LongOpenHashSet((int) INVALID_ID); - allLongs.add(overflownHashSet); - currentLongs.set(overflownHashSet); +currentInts.get().add((int) id); + } else { +LongOpenHashSet overflownHashSet = new LongOpenHashSet((int) INVALID_ID); +allL
[geode] 01/02: GEODE-10412: Clear expired tombstones during region destroy (#7838)
This is an automated email from the ASF dual-hosted git repository. mkevo pushed a commit to branch support/1.15 in repository https://gitbox.apache.org/repos/asf/geode.git commit bc47b30c0bb57a9c9437234a8fc8530f1daf14bc Author: Jakov Varenina <62134331+jvaren...@users.noreply.github.com> AuthorDate: Mon Aug 29 17:08:26 2022 +0200 GEODE-10412: Clear expired tombstones during region destroy (#7838) * GEODE-10412: Clear expired tombstones during region destroy The issue: During region destroy operation, the expired tombstones aren't cleared when non-expired ones are available. Later, these expired tombstones prevent all other regions' tombstones from being cleared from memory, causing many issues (memory and disk exhaustion). The solution: When a region is destroyed, it must clear all the related expired and non-expired tombstones from memory. * Add distributed test that reproduce the issue * Update after review --- .../cache/versions/TombstoneDUnitTest.java | 42 .../geode/internal/cache/TombstoneService.java | 6 ++- .../geode/internal/cache/TombstoneServiceTest.java | 46 ++ 3 files changed, 86 insertions(+), 8 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/versions/TombstoneDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/versions/TombstoneDUnitTest.java index bbcf0ca6ec..d4bd0cc89b 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/versions/TombstoneDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/versions/TombstoneDUnitTest.java @@ -14,11 +14,13 @@ */ package org.apache.geode.internal.cache.versions; +import static org.apache.geode.cache.RegionShortcut.PARTITION_PERSISTENT; import static org.apache.geode.cache.RegionShortcut.REPLICATE; import static org.apache.geode.cache.RegionShortcut.REPLICATE_PERSISTENT; import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort; import static org.apache.geode.internal.cache.InitialImageOperation.GIITestHookType.DuringApplyDelta; import static org.apache.geode.internal.cache.InitialImageOperation.resetAllGIITestHooks; +import static org.apache.geode.internal.cache.TombstoneService.EXPIRED_TOMBSTONE_LIMIT; import static org.apache.geode.test.awaitility.GeodeAwaitility.await; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; @@ -121,6 +123,35 @@ public class TombstoneDUnitTest implements Serializable { }); } + @Test + public void testTombstoneExpiredAndNonExpiredAreClearedAfterRegionIsDestroyed() { +VM vm0 = VM.getVM(0); + +vm0.invoke(() -> { + // reduce timeout so that tombstone is immediately marked as expired + TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT = 100; + createCacheAndRegion(PARTITION_PERSISTENT); + region.put("K1", "V1"); + region.destroy("K1"); +}); + +vm0.invoke(() -> { + waitForScheduledTombstoneCount(0); + // increase timeout so that next tombstone doesn't expire + TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT = 15; + region.put("K1", "V1"); + region.destroy("K1"); + + region.destroyRegion(); + // force expiry of batch - there is only one expired tombstone at this moment + EXPIRED_TOMBSTONE_LIMIT = 1; +}); + +vm0.invoke(() -> { + createCacheAndRegion(PARTITION_PERSISTENT); + checkExpiredTombstones(0); +}); + } @Test public void testWhenAnOutOfRangeTimeStampIsSeenWeExpireItInReplicateTombstoneSweeper() { @@ -562,6 +593,17 @@ public class TombstoneDUnitTest implements Serializable { } } + private void waitForScheduledTombstoneCount(int count) { +LocalRegion region = (LocalRegion) cache.getRegion(REGION_NAME); +await().until(() -> ((InternalCache) cache).getTombstoneService().getSweeper(region).tombstones +.size() == count); + } + + private void checkExpiredTombstones(int count) { +await().until( +() -> ((InternalCache) cache).getTombstoneService().getScheduledTombstoneCount() == count); + } + private void performGC(int count) throws Exception { ((InternalCache) cache).getTombstoneService().forceBatchExpirationForTests(count); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TombstoneService.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TombstoneService.java index 242a3ff20b..dc3532a734 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TombstoneService.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TombstoneService.java @@ -926,7 +926,11 @@ public class TombstoneService { * @return true if predicate ever returned true */ private boolean removeIf(Predicate predicate) { - return remove
[geode] branch support/1.15 updated (5364cce7cd -> 62e60b5062)
This is an automated email from the ASF dual-hosted git repository. alberto pushed a change to branch support/1.15 in repository https://gitbox.apache.org/repos/asf/geode.git from 5364cce7cd GEODE-10415: bump dependencies due to vulnerability scan (#7855) new 0bd51e8d5d GEODE-10155: Avoid threads hanging when function execution times-out (#7493) new 73aa4b3f72 GEODE-10323: Remove schedule threads in MemoryAllocatorImpl constructor (#7715) new e72da0d984 GEODE-10323: Add small changes after review (#7819) new 982333d4ba GEODE-10346: Correct description of batch-time-interval in doc. (#7742) new b64cdd1473 GEODE-10348: Fix documentation on conflation (#7746) new 5ee944475a GEODE-10352: Update Ruby version in Geode doc preview tool (#7753) new 3ada8fe80f GEODE-10403: Fix distributed deadlock with stop gw sender (#7830) new 8b751804c5 GEODE-10417: Fix NullPointerException in WAN replication (#7845) new 62e60b5062 GEODE-10420: Finish distribute() work if interrupted (#7854) The 9 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: dev-tools/docker/docs/Dockerfile | 18 +- ...nctionExecutionNoSingleHopDistributedTest.java} | 895 ++--- .../cache/execute/PRClientServerTestBase.java | 351 .../internal/ClusterOperationExecutors.java| 9 +- .../geode/internal/cache/BucketRegionQueue.java| 4 + .../geode/internal/cache/EntryEventImpl.java | 5 +- .../PartitionedRegionFunctionResultSender.java | 77 +- .../internal/cache/wan/AbstractGatewaySender.java | 58 +- .../internal/offheap/MemoryAllocatorImpl.java | 80 +- .../internal/offheap/NonRealTimeStatsUpdater.java | 47 ++ .../geode/internal/offheap/OffHeapStorage.java | 19 +- .../internal/cache/BucketRegionQueueJUnitTest.java | 56 +- .../PartitionedRegionFunctionResultSenderTest.java | 174 .../cache/wan/AbstractGatewaySenderTest.java | 170 .../serial/SerialGatewaySenderQueueJUnitTest.java | 28 +- .../internal/offheap/MemoryAllocatorJUnitTest.java | 14 +- .../internal/offheap/OffHeapHelperJUnitTest.java | 3 +- .../OffHeapRegionEntryHelperInstanceTest.java | 3 +- .../internal/offheap/OffHeapStorageJUnitTest.java | 4 +- .../OffHeapStorageNonRuntimeStatsJUnitTest.java| 3 +- .../offheap/OffHeapStoredObjectJUnitTest.java | 3 +- .../conflate_multisite_gateway_queue.html.md.erb | 4 +- geode-docs/images/MultiSite-4.gif | Bin 4991 -> 0 bytes geode-docs/images_svg/MultiSite-4.svg | 4 + geode-docs/reference/topics/cache_xml.html.md.erb | 4 +- .../gfsh/command-pages/alter.html.md.erb | 2 +- .../gfsh/command-pages/create.html.md.erb | 4 +- .../internal/cache/functions/TestFunction.java | 23 +- .../geode/test/junit/rules/ServerStarterRule.java | 9 + .../sanctioned-geode-dunit-serializables.txt | 2 +- .../geode/internal/cache/wan/WANTestBase.java | 7 + ...llelGatewaySenderOperationsDistributedTest.java | 86 +- 32 files changed, 1379 insertions(+), 787 deletions(-) rename geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/{PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest.java => PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest.java} (53%) create mode 100644 geode-core/src/main/java/org/apache/geode/internal/offheap/NonRealTimeStatsUpdater.java create mode 100644 geode-core/src/test/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionResultSenderTest.java delete mode 100644 geode-docs/images/MultiSite-4.gif create mode 100644 geode-docs/images_svg/MultiSite-4.svg
[geode] 01/09: GEODE-10155: Avoid threads hanging when function execution times-out (#7493)
This is an automated email from the ASF dual-hosted git repository. alberto pushed a commit to branch support/1.15 in repository https://gitbox.apache.org/repos/asf/geode.git commit 0bd51e8d5db2eb0c0eb77bae0d8f2d312ae200cf Author: Alberto Gomez AuthorDate: Mon Jun 6 20:18:46 2022 +0200 GEODE-10155: Avoid threads hanging when function execution times-out (#7493) * GEODE-10155: Avoid threads hanging when function execution times-out * GEODE-10155: Updated after review * GEODE-10155: More changes after review * GEODE-10155: Changes after more reviews * GEODE-10155: Some more changes after review * GEODE-10155: More changes after review * GEODE-10155: More clean-up after review --- ...nctionExecutionNoSingleHopDistributedTest.java} | 895 ++--- .../cache/execute/PRClientServerTestBase.java | 351 .../PartitionedRegionFunctionResultSender.java | 77 +- .../PartitionedRegionFunctionResultSenderTest.java | 174 .../internal/cache/functions/TestFunction.java | 23 +- .../geode/test/junit/rules/ServerStarterRule.java | 9 + .../sanctioned-geode-dunit-serializables.txt | 2 +- 7 files changed, 864 insertions(+), 667 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest.java similarity index 53% rename from geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest.java rename to geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest.java index a76a7157b8..1c66b3d2e0 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest.java @@ -14,10 +14,9 @@ */ package org.apache.geode.internal.cache.execute; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.apache.geode.test.awaitility.GeodeAwaitility.await; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.Serializable; import java.util.ArrayList; @@ -27,6 +26,8 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; import org.apache.logging.log4j.Logger; import org.junit.Test; @@ -37,6 +38,7 @@ import org.junit.runners.Parameterized.UseParametersRunnerFactory; import org.apache.geode.cache.CacheClosedException; import org.apache.geode.cache.Region; +import org.apache.geode.cache.client.ServerConnectivityException; import org.apache.geode.cache.execute.Execution; import org.apache.geode.cache.execute.Function; import org.apache.geode.cache.execute.FunctionAdapter; @@ -50,11 +52,10 @@ import org.apache.geode.distributed.DistributedSystem; import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.internal.cache.functions.TestFunction; import org.apache.geode.logging.internal.log4j.api.LogService; -import org.apache.geode.test.dunit.Assert; +import org.apache.geode.test.awaitility.GeodeAwaitility; import org.apache.geode.test.dunit.AsyncInvocation; import org.apache.geode.test.dunit.IgnoredException; import org.apache.geode.test.dunit.SerializableRunnableIF; -import org.apache.geode.test.dunit.ThreadUtils; import org.apache.geode.test.dunit.Wait; import org.apache.geode.test.dunit.WaitCriterion; import org.apache.geode.test.junit.categories.ClientServerTest; @@ -64,7 +65,7 @@ import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactor @Category({ClientServerTest.class, FunctionServiceTest.class}) @RunWith(Parameterized.class) @UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class) -public class PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest +public class PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest extends PRClientServerTestBase { private static final Logger logger = LogService.getLogger(); @@ -77,7 +78,7 @@ public class PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest private static final int retryCount = 0; - public PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest() { + public PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest() { super(); } @@ -87,10 +88,10 @@ public cla
[geode] 06/09: GEODE-10352: Update Ruby version in Geode doc preview tool (#7753)
This is an automated email from the ASF dual-hosted git repository. alberto pushed a commit to branch support/1.15 in repository https://gitbox.apache.org/repos/asf/geode.git commit 5ee944475a9fc2dd6934276dfcc6b1724f636bcd Author: Alberto Gomez AuthorDate: Fri Jun 3 20:50:13 2022 +0200 GEODE-10352: Update Ruby version in Geode doc preview tool (#7753) The script to preview the documentation of Geode (./preview-user-guide.sh) is not working anymore. The following error appears while running it (unless you have a previously downloaded geodedocs/temp docker image in your local docker repo): ERROR: Error installing elasticsearch: The last version of faraday (>= 0) to support your Ruby & RubyGems was 1.10.0. Try installing it with `gem install faraday -v 1.10.0` and then running the current command again faraday requires Ruby version >= 2.6. The current ruby version is 2.5.9.229. That error prevents the preview script to work. It is needed to update the docker image used to preview the documentation to use a Ruby version >= 2.6. The versions of other gems also need to be updated after the change of the Ruby version. --- dev-tools/docker/docs/Dockerfile | 18 +- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/dev-tools/docker/docs/Dockerfile b/dev-tools/docker/docs/Dockerfile index 31f841cb6f..7833db9629 100644 --- a/dev-tools/docker/docs/Dockerfile +++ b/dev-tools/docker/docs/Dockerfile @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -FROM ruby:2.5 +FROM ruby:2.6.8 LABEL Vendor="Apache Geode" LABEL version=unstable @@ -25,16 +25,16 @@ RUN curl -fsSL https://deb.nodesource.com/setup_16.x | bash - ; \ apt-get install -y nodejs RUN gem install bundler:1.17.3 \ rake multi_json:1.13.1 \ -elasticsearch:2.0.2 \ +elasticsearch:7.5.0 \ multipart-post:2.0.0 \ -faraday:0.15.4 \ +faraday:0.17.4 \ libv8:3.16.14.15 \ -mini_portile2:2.5.0 \ -racc:1.5.2 \ -nokogiri:1.11.2 \ -mimemagic:0.3.9 \ -puma:4.3.8 \ -rack:2.1.4 \ +mini_portile2:2.8.0 \ +racc:1.6.0 \ +nokogiri:1.13.3 \ +mimemagic:0.3.10 \ +puma:5.6.2 \ +rack:2.2.3 \ smtpapi:0.1.0 \ sendgrid-ruby:1.1.6 \ therubyracer:0.12.2
[geode] 03/09: GEODE-10323: Add small changes after review (#7819)
This is an automated email from the ASF dual-hosted git repository. alberto pushed a commit to branch support/1.15 in repository https://gitbox.apache.org/repos/asf/geode.git commit e72da0d984a89203a3a3d5746d509d2c2c1409c1 Author: Alberto Gomez AuthorDate: Fri Jul 15 08:28:09 2022 +0200 GEODE-10323: Add small changes after review (#7819) --- .../geode/internal/offheap/MemoryAllocatorImpl.java| 18 ++ .../internal/offheap/MemoryAllocatorJUnitTest.java | 9 + .../geode/internal/offheap/OffHeapHelperJUnitTest.java | 3 ++- .../offheap/OffHeapRegionEntryHelperInstanceTest.java | 3 ++- .../internal/offheap/OffHeapStorageJUnitTest.java | 3 ++- .../internal/offheap/OffHeapStoredObjectJUnitTest.java | 3 ++- 6 files changed, 27 insertions(+), 12 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/offheap/MemoryAllocatorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/offheap/MemoryAllocatorImpl.java index 70f8f4ab0e..d78bc40fb8 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/offheap/MemoryAllocatorImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/offheap/MemoryAllocatorImpl.java @@ -187,7 +187,7 @@ public class MemoryAllocatorImpl implements MemoryAllocator { } } return create(oooml, stats, slabCount, offHeapMemorySize, maxSlabSize, slabs, null, -null, () -> null); +null, () -> new DummyNonRealTimeStatsUpdater()); } private void reuse(OutOfOffHeapMemoryListener oooml, OffHeapMemoryStats newStats, @@ -240,9 +240,7 @@ public class MemoryAllocatorImpl implements MemoryAllocator { } void start() { -if (nonRealTimeStatsUpdater != null) { - nonRealTimeStatsUpdater.start(updateOffHeapStatsFrequencyMs); -} +nonRealTimeStatsUpdater.start(updateOffHeapStatsFrequencyMs); } public List getLostChunks(InternalCache cache) { @@ -540,4 +538,16 @@ public class MemoryAllocatorImpl implements MemoryAllocator { public MemoryInspector getMemoryInspector() { return memoryInspector; } + + public static class DummyNonRealTimeStatsUpdater extends NonRealTimeStatsUpdater { +public DummyNonRealTimeStatsUpdater() { + super(null); +} + +@Override +void start(int frequency) {} + +@Override +void stop() {}; + } } diff --git a/geode-core/src/test/java/org/apache/geode/internal/offheap/MemoryAllocatorJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/offheap/MemoryAllocatorJUnitTest.java index 2626fd051b..b080b7d14c 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/offheap/MemoryAllocatorJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/offheap/MemoryAllocatorJUnitTest.java @@ -14,6 +14,7 @@ */ package org.apache.geode.internal.offheap; +import static org.apache.geode.internal.offheap.MemoryAllocatorImpl.DummyNonRealTimeStatsUpdater; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -74,7 +75,7 @@ public class MemoryAllocatorJUnitTest { try { MemoryAllocatorImpl.create(listener, stats, 10, 950, 100, null, size -> { throw new OutOfMemoryError("expected"); -}, null, () -> null); +}, null, () -> new DummyNonRealTimeStatsUpdater()); } catch (OutOfMemoryError expected) { } assertTrue(listener.isClosed()); @@ -99,7 +100,7 @@ public class MemoryAllocatorJUnitTest { } }; MemoryAllocatorImpl.create(listener, stats, 10, 950, MAX_SLAB_SIZE, null, factory, null, -() -> null); +() -> new MemoryAllocatorImpl.DummyNonRealTimeStatsUpdater()); } catch (OutOfMemoryError expected) { } assertTrue(listener.isClosed()); @@ -111,7 +112,7 @@ public class MemoryAllocatorJUnitTest { SlabFactory factory = SlabImpl::new; MemoryAllocator ma = MemoryAllocatorImpl.create(listener, stats, 10, 950, 100, null, factory, null, - () -> null); + () -> new DummyNonRealTimeStatsUpdater()); try { assertFalse(listener.isClosed()); assertFalse(stats.isClosed()); @@ -138,7 +139,7 @@ public class MemoryAllocatorJUnitTest { stats2 = new NullOffHeapMemoryStats(); MemoryAllocator ma2 = MemoryAllocatorImpl.create(listener, stats2, 10, 950, 100, null, factory, null, -() -> null); +() -> new DummyNonRealTimeStatsUpdater()); assertSame(ma, ma2); assertTrue(stats.isClosed()); assertFalse(listener.isClosed()); diff --git a/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapHelperJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapHelperJUnitTest.java index 7019848647..8d649d0d1b 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/offh
[geode] 09/09: GEODE-10420: Finish distribute() work if interrupted (#7854)
This is an automated email from the ASF dual-hosted git repository. alberto pushed a commit to branch support/1.15 in repository https://gitbox.apache.org/repos/asf/geode.git commit 62e60b5062f4c164f3a8af0b52fbc1d5e39fb6aa Author: Alberto Gomez AuthorDate: Mon Sep 12 15:12:14 2022 +0200 GEODE-10420: Finish distribute() work if interrupted (#7854) It is possible that an event of which a gateway sender is to be notified is lost if during the process the thread is interrupted. The reason is that the distribute() method in the AbstractGatewaySender when it catches the InterruptedException at some point, just returns, but does not put the event in the queue and neither drops it. The fix consists of handling the event correctly (put it in the queue or drop it) if the InterruptedException is caught but when the method returns set again the interrupt flag so that the caller is aware. --- .../geode/internal/cache/EntryEventImpl.java | 5 +- .../internal/cache/wan/AbstractGatewaySender.java | 20 ++- .../cache/wan/AbstractGatewaySenderTest.java | 170 + 3 files changed, 186 insertions(+), 9 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java index 16adbeca7a..6a521becbe 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java @@ -340,8 +340,9 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent, op = other.op; distributedMember = other.distributedMember; filterInfo = other.filterInfo; -keyInfo = other.keyInfo.isDistKeyInfo() ? new DistTxKeyInfo((DistTxKeyInfo) other.keyInfo) -: new KeyInfo(other.keyInfo); +keyInfo = +other.getKeyInfo().isDistKeyInfo() ? new DistTxKeyInfo((DistTxKeyInfo) other.getKeyInfo()) +: new KeyInfo(other.getKeyInfo()); if (other.getRawCallbackArgument() instanceof GatewaySenderEventCallbackArgument) { keyInfo.setCallbackArg((new GatewaySenderEventCallbackArgument( (GatewaySenderEventCallbackArgument) other.getRawCallbackArgument(; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java index 47fa99e4a0..73d85dd5ac 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java @@ -1039,6 +1039,7 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di List allRemoteDSIds, boolean isLastEventInTransaction) { final boolean isDebugEnabled = logger.isDebugEnabled(); +boolean wasInterrupted = false; // released by this method or transfers ownership to TmpQueueEvent @Released @@ -1153,15 +1154,17 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di } } if (enqueuedAllTempQueueEvents) { - try { -while (!getLifeCycleLock().readLock().tryLock(10, TimeUnit.MILLISECONDS)) { - if (!getIsRunningAndDropEventIfNotRunning(event, isDebugEnabled, clonedEvent)) { -return; + while (true) { +try { + while (!getLifeCycleLock().readLock().tryLock(10, TimeUnit.MILLISECONDS)) { +if (!getIsRunningAndDropEventIfNotRunning(event, isDebugEnabled, clonedEvent)) { + return; +} } + break; +} catch (InterruptedException e) { + wasInterrupted = true; } - } catch (InterruptedException e) { -Thread.currentThread().interrupt(); -return; } } } @@ -1210,6 +1213,9 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di if (freeClonedEvent) { clonedEvent.release(); // fix for bug 48035 } + if (wasInterrupted) { +Thread.currentThread().interrupt(); + } } } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderTest.java index aac5f0d3c0..d57ba5f999 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderTest.java @@ -18,15 +18,28 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java
[geode] 05/09: GEODE-10348: Fix documentation on conflation (#7746)
This is an automated email from the ASF dual-hosted git repository. alberto pushed a commit to branch support/1.15 in repository https://gitbox.apache.org/repos/asf/geode.git commit b64cdd1473de46ec32064e06fb7349f499347676 Author: Alberto Gomez AuthorDate: Thu Jun 2 19:55:19 2022 +0200 GEODE-10348: Fix documentation on conflation (#7746) The Geode documentation states on conflation: "When an update is added to a queue that has conflation enabled, if there is already an update message in the queue for the entry key, then the existing message assumes the value of the new update and the new update is dropped, as shown here for key A." Nevertheless, that is not correct. The actual behavior is the following: "When an update is added to a queue that has conflation enabled, if there is already an update message in the queue for the entry key, then the existing message is removed and the new update is added to the end of the queue, as shown here for key A." The text has been updated as well as the the figure with the example. --- .../events/conflate_multisite_gateway_queue.html.md.erb | 4 ++-- geode-docs/images/MultiSite-4.gif| Bin 4991 -> 0 bytes geode-docs/images_svg/MultiSite-4.svg| 4 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/geode-docs/developing/events/conflate_multisite_gateway_queue.html.md.erb b/geode-docs/developing/events/conflate_multisite_gateway_queue.html.md.erb index 5fa8df905c..4d2ed5f0bc 100644 --- a/geode-docs/developing/events/conflate_multisite_gateway_queue.html.md.erb +++ b/geode-docs/developing/events/conflate_multisite_gateway_queue.html.md.erb @@ -25,9 +25,9 @@ Conflating a queue improves distribution performance. When conflation is enabled **Note:** Do not use conflation if your receiving applications depend on the specific ordering of entry modifications, or if they need to be notified of every change to an entry. -Conflation is most useful when a single entry is updated frequently, but other sites only need to know the current value of the entry (rather than the value of each update). When an update is added to a queue that has conflation enabled, if there is already an update message in the queue for the entry key, then the existing message assumes the value of the new update and the new update is dropped, as shown here for key A. +Conflation is most useful when a single entry is updated frequently, but other sites only need to know the current value of the entry (rather than the value of each update). When an update is added to a queue that has conflation enabled, if there is already an update message in the queue for the entry key, then the existing message is removed and the new update is added to the end of the queue, as shown here for key A. - + **Note:** This method of conflation is different from the one used for server-to-client subscription queue conflation and peer-to-peer distribution within a cluster. diff --git a/geode-docs/images/MultiSite-4.gif b/geode-docs/images/MultiSite-4.gif deleted file mode 100644 index c4a4b7d16f..00 Binary files a/geode-docs/images/MultiSite-4.gif and /dev/null differ diff --git a/geode-docs/images_svg/MultiSite-4.svg b/geode-docs/images_svg/MultiSite-4.svg new file mode 100644 index 00..e58a4e6a02 --- /dev/null +++ b/geode-docs/images_svg/MultiSite-4.svg @@ -0,0 +1,4 @@ + + +http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd";> +http://www.w3.org/2000/svg"; xmlns:xlink="http://www.w3.org/1999/xlink"; version="1.1" width="624px" height="391px" viewBox="-0.5 -0.5 624 391" content="
[geode] 08/09: GEODE-10417: Fix NullPointerException in WAN replication (#7845)
This is an automated email from the ASF dual-hosted git repository. alberto pushed a commit to branch support/1.15 in repository https://gitbox.apache.org/repos/asf/geode.git commit 8b751804c5110507b156cc42f4beda713f2fcccd Author: Alberto Gomez AuthorDate: Wed Sep 7 13:32:43 2022 +0200 GEODE-10417: Fix NullPointerException in WAN replication (#7845) * GEODE-10417: Fix NullPointerException in WAN replication When the WAN group-transa$ction-events feature is enabled in a parallel gateway sender, it is possible to get a NullPointerException when retrieving events from the queue to complete a transaction if the event in the queue is null. If this situation is reached then the gateway sender dispatcher will not dispatch queue events anymore and therefore the WAN replication will not progress. This happens because the predicates that check if elements in the queue contain a transactionId are not protected against the event being null. A null check has been added before the predicates are invoked so that in case of a null event, the predicate is not invoked and the event is skipped from the checking. * GEODE-10417: Change assertEquals to assertThat --- .../geode/internal/cache/BucketRegionQueue.java| 4 ++ .../internal/cache/BucketRegionQueueJUnitTest.java | 56 +++--- .../serial/SerialGatewaySenderQueueJUnitTest.java | 28 +-- 3 files changed, 66 insertions(+), 22 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java index 71be25a4e8..1a7746610c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java @@ -466,6 +466,10 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue { List elementsMatching = new ArrayList<>(); for (final Object key : eventSeqNumDeque) { Object object = optimalGet(key); +if (object == null) { + continue; +} + if (matchingPredicate.test(object)) { elementsMatching.add(object); eventSeqNumDeque.remove(key); diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java index 6643695ea0..1dcbdef6c5 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java @@ -17,7 +17,6 @@ package org.apache.geode.internal.cache; import static org.apache.geode.cache.Region.SEPARATOR; import static org.apache.geode.internal.statistics.StatisticsClockFactory.disabledClock; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -175,22 +174,65 @@ public class BucketRegionQueueJUnitTest { List objects = bucketRegionQueue.getElementsMatching(hasTransactionIdPredicate, isLastEventInTransactionPredicate); -assertEquals(2, objects.size()); -assertEquals(objects, Arrays.asList(event1, event3)); +assertThat(objects.size()).isEqualTo(2); +assertThat(objects).isEqualTo(Arrays.asList(event1, event3)); objects = bucketRegionQueue.getElementsMatching(hasTransactionIdPredicate, isLastEventInTransactionPredicate); -assertEquals(1, objects.size()); -assertEquals(objects, Arrays.asList(event7)); +assertThat(objects.size()).isEqualTo(1); +assertThat(objects).isEqualTo(Arrays.asList(event7)); hasTransactionIdPredicate = ParallelGatewaySenderQueue.getHasTransactionIdPredicate(tx2); objects = bucketRegionQueue.getElementsMatching(hasTransactionIdPredicate, isLastEventInTransactionPredicate); -assertEquals(2, objects.size()); -assertEquals(objects, Arrays.asList(event2, event4)); +assertThat(objects.size()).isEqualTo(2); +assertThat(objects).isEqualTo(Arrays.asList(event2, event4)); } + @Test + public void testGetElementsMatchingWithParallelGatewaySenderQueuePredicatesObjectReadNullDoesNotThrowException() + throws ForceReattemptException { + ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender); + +TransactionId tx1 = new TXId(null, 1); +TransactionId tx2 = new TXId(null, 2); +TransactionId tx3 = new TXId(null, 3); + +GatewaySenderEventImpl event1 = createMockGatewaySenderEvent(1, tx1, false); +GatewaySenderEventImpl eventNotInTransaction1 = createMockGatewaySenderEvent(2, null, false); +GatewaySenderEventImpl event2 = createMockGa
[geode] 07/09: GEODE-10403: Fix distributed deadlock with stop gw sender (#7830)
This is an automated email from the ASF dual-hosted git repository. alberto pushed a commit to branch support/1.15 in repository https://gitbox.apache.org/repos/asf/geode.git commit 3ada8fe80f7569874f643c32d585658e98f4615a Author: Alberto Gomez AuthorDate: Thu Aug 18 09:42:27 2022 +0200 GEODE-10403: Fix distributed deadlock with stop gw sender (#7830) There is a distributed deadlock that can appear when stopping the gateway sender if a race condition happens in which the stop gateway sender command gets blocked indefinitely trying to get the size of the queue from remote peers (ParallelGatewaySenderQueue.size() call) and also one call to store one event in the queue tries to get the lifecycle lock (acquired by the gateway sender command). These two calls could get into a deadlock under heavy load and make the system unresponsive for any traffic request (get, put, ...). In order to avoid it, in the storage of the event in the gateway sender queue (AbstractGatewaySender.distribute() call), instead to trying to get the lifecycle lock without any timeout, a try with a timeout is added. If the try returns false it is checked if the gateway sender is running. If it is not running, the event is dropped and there is no need to get the lock. Otherwise, the lifecycle lock acquire is retried until it succeeds or the gateway sender is stopped. --- .../internal/ClusterOperationExecutors.java| 9 ++- .../internal/cache/wan/AbstractGatewaySender.java | 52 + .../geode/internal/cache/wan/WANTestBase.java | 7 ++ ...llelGatewaySenderOperationsDistributedTest.java | 86 +- 4 files changed, 133 insertions(+), 21 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java index ba25e3b539..7c45bbb9e3 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java @@ -25,12 +25,14 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.logging.log4j.Logger; import org.apache.geode.CancelException; import org.apache.geode.InternalGemFireError; import org.apache.geode.SystemFailure; +import org.apache.geode.annotations.internal.MutableForTesting; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.distributed.internal.membership.gms.messages.ViewAckMessage; import org.apache.geode.internal.logging.CoreLoggingExecutors; @@ -167,6 +169,8 @@ public class ClusterOperationExecutors implements OperationExecutors { private SerialQueuedExecutorPool serialQueuedExecutorPool; + @MutableForTesting + public static final AtomicInteger maxPrThreadsForTest = new AtomicInteger(-1); ClusterOperationExecutors(DistributionStats stats, InternalDistributedSystem system) { @@ -252,10 +256,11 @@ public class ClusterOperationExecutors implements OperationExecutors { this::doWaitingThread, stats.getWaitingPoolHelper(), threadMonitor); -if (MAX_PR_THREADS > 1) { +int maxPrThreads = maxPrThreadsForTest.get() > 0 ? maxPrThreadsForTest.get() : MAX_PR_THREADS; +if (maxPrThreads > 1) { partitionedRegionPool = CoreLoggingExecutors.newThreadPoolWithFeedStatistics( - MAX_PR_THREADS, INCOMING_QUEUE_LIMIT, stats.getPartitionedRegionQueueHelper(), + maxPrThreads, INCOMING_QUEUE_LIMIT, stats.getPartitionedRegionQueueHelper(), "PartitionedRegion Message Processor", thread -> stats.incPartitionedRegionThreadStarts(), this::doPartitionRegionThread, stats.getPartitionedRegionPoolHelper(), diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java index e5fa44a83e..47fa99e4a0 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.logging.log4j.Logger; @@ -238,6 +239,9 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di protected bo
[geode] 04/09: GEODE-10346: Correct description of batch-time-interval in doc. (#7742)
This is an automated email from the ASF dual-hosted git repository. alberto pushed a commit to branch support/1.15 in repository https://gitbox.apache.org/repos/asf/geode.git commit 982333d4ba532c922ac9f176e72fb6816f68ddb8 Author: Alberto Gomez AuthorDate: Mon Jun 6 08:54:33 2022 +0200 GEODE-10346: Correct description of batch-time-interval in doc. (#7742) --- geode-docs/reference/topics/cache_xml.html.md.erb | 4 ++-- geode-docs/tools_modules/gfsh/command-pages/alter.html.md.erb | 2 +- geode-docs/tools_modules/gfsh/command-pages/create.html.md.erb | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/geode-docs/reference/topics/cache_xml.html.md.erb b/geode-docs/reference/topics/cache_xml.html.md.erb index 9ab1ba7e6f..d1eeaa3e1c 100644 --- a/geode-docs/reference/topics/cache_xml.html.md.erb +++ b/geode-docs/reference/topics/cache_xml.html.md.erb @@ -267,7 +267,7 @@ Configures a gateway sender to distribute region events to another <%=vars.produ batch-time-interval -Maximum number of milliseconds that can elapse between sending batches. +Maximum amount of time, in ms, that can elapse before a batch is delivered, when no events are found in the queue to reach the batch-size. 1000 @@ -539,7 +539,7 @@ Configures a queue for sending region events to an AsyncEventListener implementa batch-time-interval -Maximum number of milliseconds that can elapse between sending batches. +Maximum amount of time, in ms, that can elapse before a batch is delivered, when no events are found in the queue to reach the batch-size. 5 diff --git a/geode-docs/tools_modules/gfsh/command-pages/alter.html.md.erb b/geode-docs/tools_modules/gfsh/command-pages/alter.html.md.erb index 7c4ac2f653..14f4758b97 100644 --- a/geode-docs/tools_modules/gfsh/command-pages/alter.html.md.erb +++ b/geode-docs/tools_modules/gfsh/command-pages/alter.html.md.erb @@ -238,7 +238,7 @@ The required option, `--id`, identifies the gateway sender to be altered. ‑‑batch-time-interval -Maximum time, in milliseconds, that can elapse between sending batches. +Maximum amount of time, in ms, that can elapse before a batch is delivered, when no events are found in the queue to reach the batch-size. ‑‑gateway-event-filter diff --git a/geode-docs/tools_modules/gfsh/command-pages/create.html.md.erb b/geode-docs/tools_modules/gfsh/command-pages/create.html.md.erb index c8503ade89..b946ab6a3b 100644 --- a/geode-docs/tools_modules/gfsh/command-pages/create.html.md.erb +++ b/geode-docs/tools_modules/gfsh/command-pages/create.html.md.erb @@ -132,7 +132,7 @@ create async-event-queue --id=value --listener=value [--groups=value(,value)*] ‑‑batch-time-interval -Maximum amount of time, in ms, that can elapse before a batch is delivered. +Maximum amount of time, in ms, that can elapse before a batch is delivered, when no events are found in the queue to reach the batch-size. 5 @@ -585,7 +585,7 @@ create gateway-sender --id=value --remote-distributed-system-id=value ‑‑batch-time-interval -Maximum number of milliseconds that can elapse between sending batches. +Maximum amount of time, in ms, that can elapse before a batch is delivered, when no events are found in the queue to reach the batch-size. 1000
[geode] 02/09: GEODE-10323: Remove schedule threads in MemoryAllocatorImpl constructor (#7715)
This is an automated email from the ASF dual-hosted git repository. alberto pushed a commit to branch support/1.15 in repository https://gitbox.apache.org/repos/asf/geode.git commit 73aa4b3f72a5f517c9a281d476513a2886743299 Author: Alberto Gomez AuthorDate: Wed Jun 22 18:01:10 2022 +0200 GEODE-10323: Remove schedule threads in MemoryAllocatorImpl constructor (#7715) * GEODE-10323: Remove schedule threads in MemoryAllocatorImpl The scheduled executor used in MemoryAllocatorImpl was scheduled in the constructor. This provoked intermittent failures in OffHeapStorageJUnitTest testCreateOffHeapStorage test cases due to a race condition. The scheduling has been moved to a new method (start()) in the MemoryAllocatorImpl class that is in turn invoked in the create() static method. * GEODE-10323: Extract update stats code to new class --- .../internal/offheap/MemoryAllocatorImpl.java | 70 +++--- .../internal/offheap/NonRealTimeStatsUpdater.java | 47 +++ .../geode/internal/offheap/OffHeapStorage.java | 19 ++ .../internal/offheap/MemoryAllocatorJUnitTest.java | 13 ++-- .../internal/offheap/OffHeapHelperJUnitTest.java | 2 +- .../OffHeapRegionEntryHelperInstanceTest.java | 2 +- .../internal/offheap/OffHeapStorageJUnitTest.java | 3 +- .../OffHeapStorageNonRuntimeStatsJUnitTest.java| 3 +- .../offheap/OffHeapStoredObjectJUnitTest.java | 2 +- 9 files changed, 102 insertions(+), 59 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/offheap/MemoryAllocatorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/offheap/MemoryAllocatorImpl.java index 4e433e4b10..70f8f4ab0e 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/offheap/MemoryAllocatorImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/offheap/MemoryAllocatorImpl.java @@ -20,10 +20,8 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import org.apache.logging.log4j.Logger; @@ -39,7 +37,6 @@ import org.apache.geode.internal.cache.RegionEntry; import org.apache.geode.internal.lang.SystemProperty; import org.apache.geode.internal.offheap.annotations.OffHeapIdentifier; import org.apache.geode.internal.offheap.annotations.Unretained; -import org.apache.geode.logging.internal.executors.LoggingExecutors; import org.apache.geode.logging.internal.log4j.api.LogService; import org.apache.geode.util.internal.GeodeGlossary; @@ -64,14 +61,14 @@ public class MemoryAllocatorImpl implements MemoryAllocator { SystemProperty.getProductIntegerProperty( "off-heap-stats-update-frequency-ms").orElse(360); - private final ScheduledExecutorService updateNonRealTimeStatsExecutor; - - private final ScheduledFuture updateNonRealTimeStatsFuture; + private final NonRealTimeStatsUpdater nonRealTimeStatsUpdater; private volatile OffHeapMemoryStats stats; private volatile OutOfOffHeapMemoryListener ooohml; + private final int updateOffHeapStatsFrequencyMs; + OutOfOffHeapMemoryListener getOutOfOffHeapMemoryListener() { return ooohml; } @@ -98,20 +95,17 @@ public class MemoryAllocatorImpl implements MemoryAllocator { public static MemoryAllocator create(OutOfOffHeapMemoryListener ooohml, OffHeapMemoryStats stats, int slabCount, long offHeapMemorySize, long maxSlabSize, - int updateOffHeapStatsFrequencyMs) { + Supplier updateOffHeapStatsFrequencyMsSupplier, + Supplier nonRealTimeStatsUpdaterSupplier) { return create(ooohml, stats, slabCount, offHeapMemorySize, maxSlabSize, null, -SlabImpl::new, updateOffHeapStatsFrequencyMs); +SlabImpl::new, updateOffHeapStatsFrequencyMsSupplier, nonRealTimeStatsUpdaterSupplier); } - public static MemoryAllocator create(OutOfOffHeapMemoryListener ooohml, OffHeapMemoryStats stats, - int slabCount, long offHeapMemorySize, long maxSlabSize) { -return create(ooohml, stats, slabCount, offHeapMemorySize, maxSlabSize, null, -SlabImpl::new, UPDATE_OFF_HEAP_STATS_FREQUENCY_MS); - } - - private static MemoryAllocatorImpl create(OutOfOffHeapMemoryListener ooohml, + static MemoryAllocatorImpl create(OutOfOffHeapMemoryListener ooohml, OffHeapMemoryStats stats, int slabCount, long offHeapMemorySize, long maxSlabSize, - Slab[] slabs, SlabFactory slabFactory, int updateOffHeapStatsFrequencyMs) { + Slab[] slabs, SlabFactory slabFactory, + Supplier updateOffHeapStatsFrequencyMsSupplier, + Supplier nonRealTimeStatsUpdaterSupplier) { MemoryAllocatorImpl result = singleton; boolean created = false; try { @@ -155,7 +149,10 @@
[geode] branch develop updated: GEODE-10421: Improve start gw sender with clean-queue (#7856)
This is an automated email from the ASF dual-hosted git repository. mivanac pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git The following commit(s) were added to refs/heads/develop by this push: new c4e5a034d8 GEODE-10421: Improve start gw sender with clean-queue (#7856) c4e5a034d8 is described below commit c4e5a034d8cccb0a2814221d0cd3a8c5242e913d Author: Mario Ivanac <48509724+miva...@users.noreply.github.com> AuthorDate: Fri Sep 16 10:40:20 2022 +0200 GEODE-10421: Improve start gw sender with clean-queue (#7856) * GEODE-10421: added check gw status * GEODE-10421: added TC * GEODE-10421: add document impacts * GEODE-10421: update after comments --- .../geode/management/internal/i18n/CliStrings.java | 7 ++ .../gfsh/command-pages/start.html.md.erb | 2 +- .../cli/commands/StartGatewaySenderCommand.java| 38 + .../StartGatewaySenderCommandDUnitTest.java| 91 ++ 4 files changed, 137 insertions(+), 1 deletion(-) diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java b/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java index f533c77960..3734eedce4 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java @@ -3007,6 +3007,13 @@ public class CliStrings { public static final String GATEWAY_RECEIVER_IS_NOT_AVAILABLE_ON_MEMBER_0 = "GatewayReceiver is not available on member {0}"; + public static final String START_GATEWAYSENDER_REJECTED = "Command rejected. Reasons:"; + + public static final String REJECT_START_GATEWAYSENDER_REASON = "Reasons command is rejected"; + + public static final String EXECUTE_ON_ALL_GATEWAYSENDER_MEMBERS = + "Command must be executed on all members on which gateway sender is created"; + public static final String GATEWAY_SENDER_IS_NOT_AVAILABLE = "GatewaySender is not available"; public static final String GATEWAY_SENDER_0_IS_ALREADY_STARTED_ON_MEMBER_1 = "GatewaySender {0} is already started on member {1}"; diff --git a/geode-docs/tools_modules/gfsh/command-pages/start.html.md.erb b/geode-docs/tools_modules/gfsh/command-pages/start.html.md.erb index f25b6b5be0..b41bfb02bb 100644 --- a/geode-docs/tools_modules/gfsh/command-pages/start.html.md.erb +++ b/geode-docs/tools_modules/gfsh/command-pages/start.html.md.erb @@ -121,7 +121,7 @@ start gateway-sender --id=value [--groups=value(,value)*] [--members=value(,valu | ‑‑id | *Required.* ID of the GatewaySender. | | | ‑‑groups | Group(s) of members on which to start the Gateway Sender. | | | ‑‑members | Member(s) on which to start the Gateway Sender. | | -| ‑‑clean-queues | Option to clean existing queue at start of the Gateway Sender. This option is only applicable for Gateway Senders with enabled persistence. | false | +| ‑‑clean-queues | Option to clean existing queue at start of the Gateway Sender. This option can be executed only on all members on which Gateway Sender is created. | false | **Example Commands:** diff --git a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StartGatewaySenderCommand.java b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StartGatewaySenderCommand.java index 996e85be8e..d02277d4ac 100644 --- a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StartGatewaySenderCommand.java +++ b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StartGatewaySenderCommand.java @@ -77,6 +77,44 @@ public class StartGatewaySenderCommand extends GfshCommand { return ResultModel.createError(CliStrings.NO_MEMBERS_FOUND_MESSAGE); } +if (cleanQueues) { + + GatewaySenderMXBean bean; + boolean commandRejected = false; + + ResultModel rejectResultModel = + ResultModel.createError(CliStrings.START_GATEWAYSENDER_REJECTED); + TabularResultModel rejectResultData = + rejectResultModel.addTable(CliStrings.REJECT_START_GATEWAYSENDER_REASON); + + Set allServers = findMembers(null, null); + + for (DistributedMember member : allServers) { +if (cache.getDistributedSystem().getDistributedMember().getId().equals(member.getId())) { + bean = service.getLocalGatewaySenderMXBean(senderId); +} else { + ObjectName objectName = service.getGatewaySenderMBeanName(member, senderId); + bean = service.getMBeanProxy(objectName, GatewaySenderMXBean.class); +} +if (bean != null) { + if (!dsMembers.contains(member)) { +return ResultModel.createError(CliStrings.EXECUTE_ON_ALL_GATEWAYSENDER_MEMBERS); + } + + if (bean.isRunning()) { +commandRe