DonalEvans commented on a change in pull request #7403:
URL: https://github.com/apache/geode/pull/7403#discussion_r832426053



##########
File path: 
geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/list/LPushDUnitTest.java
##########
@@ -136,15 +138,15 @@ public void 
shouldNotLoseData_givenPrimaryServerCrashesDuringOperations()
     long length;
     for (String key : keys) {
       length = jedis.llen(key);
-      assertThat(length).isGreaterThanOrEqualTo(MINIMUM_ITERATIONS * 2 * 
PUSH_LIST_SIZE);
+      assertThat(length).isCloseTo(ITERATION_COUNT * 2 * PUSH_LIST_SIZE, 
within(6L));
       assertThat(length % 3).isEqualTo(0);
       validateListContents(key, length, keyToElementListMap);
     }
   }
 
   private void lpushPerformAndVerify(String key, List<String> elementList,
       AtomicLong runningCount) {
-    for (int i = 0; i < MINIMUM_ITERATIONS; i++) {
+    for (int i = 0; i < ITERATION_COUNT; i++) {

Review comment:
       Rather than having this test always do a fixed number of LPUSH 
operations and moving buckets while they're ongoing, it would be better to have 
a fixed number of bucket moves performed (25 seems like a reasonable number) 
and have LPUSH happening continuously until those bucket moves are finished, 
since what we really want to test here is how LPUSH behaves when we move 
buckets, so we should make sure that we always do the same number of bucket 
moves.
   
   This can be achieved by replacing the for loop in this method with a while 
loop, having an `AtomicBoolean` rather than an `AtomicInteger` as the condition 
(named something like continueRunning maybe), and setting that `AtomicBoolean` 
to false after the for loop that moves buckets completes. A similar approach is 
used in `LInsertDUnitTest`.

##########
File path: 
geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/list/LTrimExecutor.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.commands.executor.list;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_NOT_INTEGER;
+
+import java.util.List;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.redis.internal.commands.Command;
+import org.apache.geode.redis.internal.commands.executor.CommandExecutor;
+import org.apache.geode.redis.internal.commands.executor.RedisResponse;
+import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public class LTrimExecutor implements CommandExecutor {
+  private static final int startIndex = 2;
+  private static final int stopIndex = 3;
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext 
context) {
+    List<byte[]> commandElems = command.getProcessedCommand();
+    Region<RedisKey, RedisData> region = context.getRegion();
+    RedisKey key = command.getKey();

Review comment:
       Minor performance improvement, but these lines should be moved to below 
the try/catch as it's possible we return from this method before needing them.

##########
File path: 
geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisList.java
##########
@@ -221,6 +222,63 @@ public int llen() {
     return elementList.size();
   }
 
+  /**
+   * @param start the index of the first element to retain
+   * @param end the index of the last element to retain
+   * @param region the region this instance is stored in
+   * @param key the name of the list to pop from
+   * @return the element actually popped
+   */
+  public byte[] ltrim(long start, long end, Region<RedisKey, RedisData> region,

Review comment:
       This method does not need to return a byte array, as the LTRIM command 
just returns "OK" if it succeeds. This method should therefore be `public 
Void`, returning `null` unless an exception is encountered (we can't have it be 
lower-case v `void` because the `listLockedExecute()` method expects a 
function, which can't return `void` even though in this case we want to).
   
   If all of the comments for this method are taken on board, it would end up 
looking something like:
   ```
     public Void ltrim(long start, long end, Region<RedisKey, RedisData> region,
         RedisKey key) {
       int length = elementList.size();
       int boundedStart = getBoundedStartIndex(start, length);
       int boundedEnd = getBoundedEndIndex(end, length);
   
       if (boundedStart > boundedEnd || boundedStart == length) {
         // Remove everything
         region.remove(key);
         return null;
       }
   
       if (boundedStart == 0 && boundedEnd == length) {
         // No-op, return without modifying the list
         return null;
       }
   
       RetainElementsByIndexRange retainElementsByRange;
       synchronized (this) {
         if (boundedEnd < length) {
           // trim stuff at end of list
           elementList.subList(boundedEnd + 1, length).clear();
         }
         if (boundedStart > 0) {
           // trim stuff at start of list
           elementList.subList(0, boundedStart).clear();
         }
         retainElementsByRange =
             new RetainElementsByIndexRange(incrementAndGetVersion(), 
boundedStart, boundedEnd);
       }
       storeChanges(region, key, retainElementsByRange);
       return null;
     }
   ```

##########
File path: 
geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/delta/RemoveElementsByIndex.java
##########
@@ -46,6 +46,15 @@ public void add(int index) {
     indexes.add(index);
   }
 
+  public RemoveElementsByIndex(byte version, List<Integer> indexes) {
+    super(version);
+    this.indexes = indexes;
+  }
+
+  public int size() {
+    return this.indexes.size();
+  }
+

Review comment:
       Why was these methods added? They're not being called anywhere.

##########
File path: 
geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/AbstractLTrimIntegrationTest.java
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.commands.executor.list;
+
+import static 
org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertExactNumberOfArgs;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_NOT_INTEGER;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+
+public abstract class AbstractLTrimIntegrationTest implements 
RedisIntegrationTest {
+  public static final String KEY = "key";
+  public static final String PREEXISTING_VALUE = "preexistingValue";
+  private JedisCluster jedis;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort(BIND_ADDRESS, getPort()), 
REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void givenWrongNumOfArgs_returnsError() {
+    assertExactNumberOfArgs(jedis, Protocol.Command.LTRIM, 3);
+  }
+
+  @Test
+  public void withNonListKey_Fails() {
+    jedis.set("string", PREEXISTING_VALUE);
+    assertThatThrownBy(() -> jedis.ltrim("string", 0, -1))
+        .hasMessage(ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void withNonExistentKey_returnsNull() {
+    assertThat(jedis.ltrim("nonexistent", 0, -1)).isEqualTo("OK");
+  }
+
+  @Test
+  public void withNonIntegerRangeSpecifier_Fails() {
+    jedis.lpush(KEY, "e1", "e2", "e3", "e4");
+
+    assertThatThrownBy(() -> jedis.sendCommand(KEY, Protocol.Command.LTRIM, 
KEY,
+        "0", "not-an-integer"))
+            .hasMessage(ERROR_NOT_INTEGER);
+    assertThatThrownBy(() -> jedis.sendCommand(KEY, Protocol.Command.LTRIM, 
KEY,
+        "not-an-integer", "-1"))
+            .hasMessage(ERROR_NOT_INTEGER);
+    assertThatThrownBy(() -> jedis.sendCommand(KEY, Protocol.Command.LTRIM, 
KEY,
+        "not-an-integer", "not-an-integer"))
+            .hasMessage(ERROR_NOT_INTEGER);
+  }
+
+  @Test
+  public void trimsToSpecifiedRange_givenValidRange() {

Review comment:
       Rather than one big test with multiple cases, it would be better to 
break this up into individual tests, especially since the contents of the list 
is being reset between each call to LTRIM. To reduce duplication, it might be 
worth turning this into a parameterized test, along with the one below it. That 
way you could have one test that tests all of the possible combinations of 
start and end that produce non-empty ranges and one test that tests all the 
combinations that produce empty ranges.
   
   In particular it would be good to test with values for the end index that 
are equal to the size of the list, as that case is currently missing.

##########
File path: 
geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/list/LTrimDUnitTest.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.commands.executor.list;
+
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+public class LTrimDUnitTest {
+  public static final int INITIAL_LIST_SIZE = 5_000;
+
+  @Rule
+  public RedisClusterStartupRule clusterStartUp = new 
RedisClusterStartupRule();
+
+  @Rule
+  public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+  private static JedisCluster jedis;
+
+  @Before
+  public void testSetup() {
+    MemberVM locator = clusterStartUp.startLocatorVM(0);
+    clusterStartUp.startRedisVM(1, locator.getPort());
+    clusterStartUp.startRedisVM(2, locator.getPort());
+    clusterStartUp.startRedisVM(3, locator.getPort());
+    int redisServerPort = clusterStartUp.getRedisPort(1);
+    jedis = new JedisCluster(new HostAndPort(BIND_ADDRESS, redisServerPort), 
REDIS_CLIENT_TIMEOUT);
+    clusterStartUp.flushAll();
+  }
+
+  @After
+  public void tearDown() {
+    jedis.close();
+  }
+
+  @Test
+  public void shouldDistributeDataAmongCluster_andRetainDataAfterServerCrash() 
{
+    String key = makeListKeyWithHashtag(1, 
clusterStartUp.getKeyOnServer("ltrim", 1));
+    List<String> elementList = makeElementList(key, INITIAL_LIST_SIZE);
+    lpushPerformAndVerify(key, elementList);
+
+    // Remove all but last element
+    jedis.ltrim(key, INITIAL_LIST_SIZE - 1, INITIAL_LIST_SIZE);

Review comment:
       To make this test a bit more robust, it would be good to select start 
and end indexes such that we remove some elements from the start and some 
elements form the end of the list. That way we're checking that we're using 
both start and end indexes correctly in the Delta that we send.

##########
File path: 
geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/AbstractLTrimIntegrationTest.java
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.commands.executor.list;
+
+import static 
org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertExactNumberOfArgs;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_NOT_INTEGER;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+
+public abstract class AbstractLTrimIntegrationTest implements 
RedisIntegrationTest {
+  public static final String KEY = "key";
+  public static final String PREEXISTING_VALUE = "preexistingValue";
+  private JedisCluster jedis;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort(BIND_ADDRESS, getPort()), 
REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void givenWrongNumOfArgs_returnsError() {
+    assertExactNumberOfArgs(jedis, Protocol.Command.LTRIM, 3);
+  }
+
+  @Test
+  public void withNonListKey_Fails() {
+    jedis.set("string", PREEXISTING_VALUE);
+    assertThatThrownBy(() -> jedis.ltrim("string", 0, -1))
+        .hasMessage(ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void withNonExistentKey_returnsNull() {
+    assertThat(jedis.ltrim("nonexistent", 0, -1)).isEqualTo("OK");
+  }
+
+  @Test
+  public void withNonIntegerRangeSpecifier_Fails() {
+    jedis.lpush(KEY, "e1", "e2", "e3", "e4");
+
+    assertThatThrownBy(() -> jedis.sendCommand(KEY, Protocol.Command.LTRIM, 
KEY,
+        "0", "not-an-integer"))
+            .hasMessage(ERROR_NOT_INTEGER);
+    assertThatThrownBy(() -> jedis.sendCommand(KEY, Protocol.Command.LTRIM, 
KEY,
+        "not-an-integer", "-1"))
+            .hasMessage(ERROR_NOT_INTEGER);
+    assertThatThrownBy(() -> jedis.sendCommand(KEY, Protocol.Command.LTRIM, 
KEY,
+        "not-an-integer", "not-an-integer"))
+            .hasMessage(ERROR_NOT_INTEGER);
+  }
+
+  @Test
+  public void trimsToSpecifiedRange_givenValidRange() {
+    initializeTestList();
+
+    jedis.ltrim(KEY, 0, 0);
+    assertThat(jedis.llen(KEY)).isEqualTo(1);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, 0, 1);
+    assertThat(jedis.llen(KEY)).isEqualTo(2);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e3");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, 0, 2);
+    assertThat(jedis.llen(KEY)).isEqualTo(3);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 2)).isEqualTo("e2");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, 1, 2);
+    assertThat(jedis.llen(KEY)).isEqualTo(2);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e2");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, 1, -1);
+    assertThat(jedis.llen(KEY)).isEqualTo(3);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e2");
+    assertThat(jedis.lindex(KEY, 2)).isEqualTo("e1");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, 1, -2);
+    assertThat(jedis.llen(KEY)).isEqualTo(2);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e2");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, -2, -1);
+    assertThat(jedis.llen(KEY)).isEqualTo(2);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e2");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e1");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, -1, -1);
+    assertThat(jedis.llen(KEY)).isEqualTo(1);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e1");
+  }
+
+  @Test
+  public void trimsToCorrectRange_givenSpecifiersOutsideListSize() {
+    initializeTestList();
+
+    jedis.ltrim(KEY, -4, -1);
+    assertThat(jedis.llen(KEY)).isEqualTo(4);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 2)).isEqualTo("e2");
+    assertThat(jedis.lindex(KEY, 3)).isEqualTo("e1");
+
+    jedis.ltrim(KEY, -10, 10);
+    assertThat(jedis.llen(KEY)).isEqualTo(4);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 2)).isEqualTo("e2");
+    assertThat(jedis.lindex(KEY, 3)).isEqualTo("e1");
+
+    jedis.ltrim(KEY, 0, 4);
+    assertThat(jedis.llen(KEY)).isEqualTo(4);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 2)).isEqualTo("e2");
+    assertThat(jedis.lindex(KEY, 3)).isEqualTo("e1");
+
+    jedis.ltrim(KEY, 0, 10);
+    assertThat(jedis.llen(KEY)).isEqualTo(4);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 2)).isEqualTo("e2");
+    assertThat(jedis.lindex(KEY, 3)).isEqualTo("e1");
+  }
+
+  private void initializeTestList() {
+    jedis.del(KEY);
+    jedis.lpush(KEY, "e1", "e2", "e3", "e4");
+  }
+
+  @Test
+  public void removesKey_whenLastElementRemoved() {
+    final String keyWithTagForKeysCommand = "{tag}" + KEY;
+    jedis.lpush(keyWithTagForKeysCommand, "e1", "e2", "e3");
+
+    jedis.ltrim(keyWithTagForKeysCommand, 0, -4);
+    assertThat(jedis.llen(keyWithTagForKeysCommand)).isEqualTo(0L);
+    assertThat(jedis.exists(keyWithTagForKeysCommand)).isFalse();
+  }
+
+  @Test
+  public void removesKey_whenLastElementRemoved_multipleTimes() {
+    final String keyWithTagForKeysCommand = "{tag}" + KEY;
+    jedis.lpush(keyWithTagForKeysCommand, "e1", "e2", "e3");
+
+    jedis.ltrim(keyWithTagForKeysCommand, 0, -4);
+    jedis.ltrim(keyWithTagForKeysCommand, 0, -4);
+
+    assertThat(jedis.exists(keyWithTagForKeysCommand)).isFalse();
+  }
+
+  @Test
+  public void withConcurrentLPush_returnsCorrectValue() {
+    String[] valuesInitial = new String[] {"un", "deux", "trois"};
+    String[] valuesToAdd = new String[] {"plum", "peach", "orange"};
+    jedis.lpush(KEY, valuesInitial);
+
+    final AtomicReference<String> lpopReference = new AtomicReference<>();
+    new ConcurrentLoopingThreads(1000,
+        i -> jedis.lpush(KEY, valuesToAdd),
+        i -> jedis.ltrim(KEY, 0, 2),
+        i -> lpopReference.set(jedis.lpop(KEY)))
+            .runWithAction(() -> {
+              assertThat(lpopReference).satisfiesAnyOf(
+                  lpopResult -> 
assertThat(lpopReference.get()).isEqualTo("orange"),
+                  lpopResult -> 
assertThat(lpopReference.get()).isEqualTo("troix"));
+              jedis.del(KEY);
+              jedis.lpush(KEY, valuesInitial);
+            });

Review comment:
       The call to LPOP here should be removed, since we're not trying to test 
the concurrent behaviour of LPUSH, LTRIM *and* LPOP. With the test as it is, we 
could do the push, then do the pop, then do the trim, which is not what we're 
trying to test.
   
   Instead, we should be calling LRANGE in the `runWithAction` lambda and 
asserting that the contents of the list is either `{"un", "deux", "trois"}` if 
the LPUSH happened first or `{"plum", "peach", "orange"}` if the LTRIM happened 
first (but with the elements in reverse order, because LPUSH likes to 
complicate things).

##########
File path: 
geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/list/LTrimExecutor.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.commands.executor.list;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_NOT_INTEGER;
+
+import java.util.List;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.redis.internal.commands.Command;
+import org.apache.geode.redis.internal.commands.executor.CommandExecutor;
+import org.apache.geode.redis.internal.commands.executor.RedisResponse;
+import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public class LTrimExecutor implements CommandExecutor {
+  private static final int startIndex = 2;
+  private static final int stopIndex = 3;
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext 
context) {
+    List<byte[]> commandElems = command.getProcessedCommand();
+    Region<RedisKey, RedisData> region = context.getRegion();
+    RedisKey key = command.getKey();
+
+    long start;
+    long end;
+
+    try {
+      start = Coder.bytesToLong(commandElems.get(startIndex));
+      end = Coder.bytesToLong(commandElems.get(stopIndex));

Review comment:
       Having these values be `long` here but then casting them to `int` inside 
the `ltrim()` method could potentially lead to overflows, so it might be safer 
to wrap these calls to `bytesToLong()` in `narrowLongToInt()` to safely convert 
from `long` to `int` and have the values be `int` throughout.

##########
File path: 
geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisList.java
##########
@@ -356,10 +432,32 @@ protected synchronized void elementsPushHead(List<byte[]> 
elementsToAdd) {
     }
   }
 
+  public synchronized void elementsRemove(List<Integer> indexList) {
+    for (Integer element : indexList) {
+      elementList.remove(element.intValue());
+    }
+  }
+
   public synchronized void elementReplace(int index, byte[] newValue) {
     elementList.set(index, newValue);
   }
 
+  public synchronized void elementsRetainByIndexRange(int start, int end) {
+    if (start < 0) {
+      // Remove everything
+      elementList.clear();
+      return;
+    }
+
+    if (end < elementList.size()) {
+      elementList.subList(end + 1, elementList.size()).clear();
+    }
+
+    if (start > 0) {
+      elementList.subList(0, start).clear();
+    }

Review comment:
       I think it might be better to not have the checks on the values of `end` 
and `start` here, since if in the time between us sending the delta with valid 
values and the delta arriving at the secondary, those values have become 
invalid, we should probably throw an exception rather than just silently hide 
the fact, since we really don't expect that to ever happen.

##########
File path: 
geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisList.java
##########
@@ -221,6 +222,63 @@ public int llen() {
     return elementList.size();
   }
 
+  /**
+   * @param start the index of the first element to retain
+   * @param end the index of the last element to retain
+   * @param region the region this instance is stored in
+   * @param key the name of the list to pop from
+   * @return the element actually popped
+   */
+  public byte[] ltrim(long start, long end, Region<RedisKey, RedisData> region,
+      RedisKey key) {
+    int length = elementList.size();
+    int retainStart = 0;
+    int retainEnd = length;
+    int boundedStart = getBoundedStartIndex(start, length);
+    int boundedEnd = getBoundedEndIndex(end, length);
+    RetainElementsByIndexRange retainElementsByRange;
+
+    if (boundedStart > boundedEnd || boundedStart == length) {
+      // Remove everything
+      retainStart = -1;

Review comment:
       If we know we're going to remove everything, we should be able to do 
that here, by just calling `region.remove(key)` followed by returning null. 
This avoids the need to muck around with deltas or synchronization. We can also 
return early here if the range to be retained encompasses the entire list, i.e. 
`boundedStart == 0 && boundedEnd = length` since then we know we're not going 
to modify the list at all and can just return immediately.

##########
File path: 
geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisList.java
##########
@@ -221,6 +222,63 @@ public int llen() {
     return elementList.size();
   }
 
+  /**
+   * @param start the index of the first element to retain
+   * @param end the index of the last element to retain
+   * @param region the region this instance is stored in
+   * @param key the name of the list to pop from
+   * @return the element actually popped
+   */
+  public byte[] ltrim(long start, long end, Region<RedisKey, RedisData> region,
+      RedisKey key) {
+    int length = elementList.size();
+    int retainStart = 0;
+    int retainEnd = length;
+    int boundedStart = getBoundedStartIndex(start, length);
+    int boundedEnd = getBoundedEndIndex(end, length);
+    RetainElementsByIndexRange retainElementsByRange;
+
+    if (boundedStart > boundedEnd || boundedStart == length) {
+      // Remove everything
+      retainStart = -1;
+    } else {
+      if (boundedStart > retainStart) {
+        retainStart = boundedStart;
+      }
+      if (boundedEnd <= retainEnd) {
+        retainEnd = boundedEnd;
+      }

Review comment:
       Given the limits put on `boundedStart` and `boundedEnd` by the 
`getBounded***Index()` methods, I'm not sure I see the purpose of the 
`retainStart` and `retainEnd` variables here. When we get to this block, 
`boundedStart` is always either 0 or greater, meaning that in either case, 
`retainStart` will have the same value as `boundedStart`. Likewise, there's no 
situation in which `boundedEnd` is not less than or equal to `length` (the 
initial value of `retainEnd`) so `retainEnd` will always have the same value as 
`boundedEnd`. I think it would simplify things a bit to just use `boundedStart` 
and `boundedEnd` throughout and remove the `retainStart` and `retainEnd` 
variables.

##########
File path: 
geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisList.java
##########
@@ -356,10 +432,32 @@ protected synchronized void elementsPushHead(List<byte[]> 
elementsToAdd) {
     }
   }
 
+  public synchronized void elementsRemove(List<Integer> indexList) {
+    for (Integer element : indexList) {
+      elementList.remove(element.intValue());
+    }
+  }
+
   public synchronized void elementReplace(int index, byte[] newValue) {
     elementList.set(index, newValue);
   }
 
+  public synchronized void elementsRetainByIndexRange(int start, int end) {
+    if (start < 0) {
+      // Remove everything
+      elementList.clear();
+      return;
+    }

Review comment:
       We should not need to handle this case in the Delta, since if all 
elements were removed from the list, the list should have just been removed 
from the region, so there shouldn't be anything to apply the delta to.

##########
File path: 
geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisList.java
##########
@@ -258,6 +316,22 @@ public void lset(Region<RedisKey, RedisData> region, 
RedisKey key, int index, by
     return popped;
   }
 
+  private int getBoundedStartIndex(long index, int size) {
+    if (index >= 0L) {
+      return (int) Math.min(index, size);
+    } else {
+      return (int) Math.max(index + size, 0);
+    }
+  }
+
+  private int getBoundedEndIndex(long index, int size) {
+    if (index >= 0L) {
+      return (int) Math.min(index, size);
+    } else {
+      return (int) Math.max(index + size, -1);

Review comment:
       I think this is incorrect, as the end index provided by the user is 
inclusive, so an index of `-1` should be equivalent to an index of `size`, 
instead of `size - 1`, which is what would be returned here. For contrast, if 
the user directly passed `size` then this method would return `size` instead of 
`size -1`.
   
   We need to be careful with how we handle inclusive vs exclusive indexes 
here, since Java uses an exclusive index for the second argument in methods 
like `sublist()`, meaning that if start == end then the list is empty, but 
Redis uses an inclusive index for end, meaning that if start == end, then list 
has size 1. Either this method should return the inclusive end index (and be 
named to relfect that, since it's non-standard in Java) or it should always 
return the exclusive end index and the logic in methods that call it should be 
adjusted to ensure that we're not behaving incorrectly.

##########
File path: 
geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/list/LPushDUnitTest.java
##########
@@ -155,11 +157,13 @@ private void lpushPerformAndVerify(String key, 
List<String> elementList,
 
   private void validateListContents(String key, long length,
       HashMap<String, List<String>> keyToElementListMap) {
-    while (jedis.llen(key) > 0) {
+    length = jedis.llen(key);
+    while (length > 0) {
       List<String> elementList = keyToElementListMap.get(key);
       assertThat(jedis.lpop(key)).isEqualTo(elementList.get(2));
       assertThat(jedis.lpop(key)).isEqualTo(elementList.get(1));
       assertThat(jedis.lpop(key)).isEqualTo(elementList.get(0));
+      length = jedis.llen(key);

Review comment:
       This change seems unnecessary. It would probably be better to just 
remove `length` from the method signature and keep the while loop as it was.

##########
File path: 
geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/AbstractLTrimIntegrationTest.java
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.commands.executor.list;
+
+import static 
org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertExactNumberOfArgs;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_NOT_INTEGER;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+
+public abstract class AbstractLTrimIntegrationTest implements 
RedisIntegrationTest {
+  public static final String KEY = "key";
+  public static final String PREEXISTING_VALUE = "preexistingValue";
+  private JedisCluster jedis;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort(BIND_ADDRESS, getPort()), 
REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void givenWrongNumOfArgs_returnsError() {
+    assertExactNumberOfArgs(jedis, Protocol.Command.LTRIM, 3);
+  }
+
+  @Test
+  public void withNonListKey_Fails() {
+    jedis.set("string", PREEXISTING_VALUE);
+    assertThatThrownBy(() -> jedis.ltrim("string", 0, -1))
+        .hasMessage(ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void withNonExistentKey_returnsNull() {

Review comment:
       This test name should be "returnsOK" instead of "returnsNull"

##########
File path: 
geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/AbstractLTrimIntegrationTest.java
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.commands.executor.list;
+
+import static 
org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertExactNumberOfArgs;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_NOT_INTEGER;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+
+public abstract class AbstractLTrimIntegrationTest implements 
RedisIntegrationTest {
+  public static final String KEY = "key";
+  public static final String PREEXISTING_VALUE = "preexistingValue";
+  private JedisCluster jedis;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort(BIND_ADDRESS, getPort()), 
REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void givenWrongNumOfArgs_returnsError() {
+    assertExactNumberOfArgs(jedis, Protocol.Command.LTRIM, 3);
+  }
+
+  @Test
+  public void withNonListKey_Fails() {
+    jedis.set("string", PREEXISTING_VALUE);
+    assertThatThrownBy(() -> jedis.ltrim("string", 0, -1))
+        .hasMessage(ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void withNonExistentKey_returnsNull() {
+    assertThat(jedis.ltrim("nonexistent", 0, -1)).isEqualTo("OK");
+  }
+
+  @Test
+  public void withNonIntegerRangeSpecifier_Fails() {
+    jedis.lpush(KEY, "e1", "e2", "e3", "e4");
+
+    assertThatThrownBy(() -> jedis.sendCommand(KEY, Protocol.Command.LTRIM, 
KEY,
+        "0", "not-an-integer"))
+            .hasMessage(ERROR_NOT_INTEGER);
+    assertThatThrownBy(() -> jedis.sendCommand(KEY, Protocol.Command.LTRIM, 
KEY,
+        "not-an-integer", "-1"))
+            .hasMessage(ERROR_NOT_INTEGER);
+    assertThatThrownBy(() -> jedis.sendCommand(KEY, Protocol.Command.LTRIM, 
KEY,
+        "not-an-integer", "not-an-integer"))
+            .hasMessage(ERROR_NOT_INTEGER);
+  }
+
+  @Test
+  public void trimsToSpecifiedRange_givenValidRange() {
+    initializeTestList();
+
+    jedis.ltrim(KEY, 0, 0);
+    assertThat(jedis.llen(KEY)).isEqualTo(1);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, 0, 1);
+    assertThat(jedis.llen(KEY)).isEqualTo(2);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e3");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, 0, 2);
+    assertThat(jedis.llen(KEY)).isEqualTo(3);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 2)).isEqualTo("e2");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, 1, 2);
+    assertThat(jedis.llen(KEY)).isEqualTo(2);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e2");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, 1, -1);
+    assertThat(jedis.llen(KEY)).isEqualTo(3);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e2");
+    assertThat(jedis.lindex(KEY, 2)).isEqualTo("e1");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, 1, -2);
+    assertThat(jedis.llen(KEY)).isEqualTo(2);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e2");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, -2, -1);
+    assertThat(jedis.llen(KEY)).isEqualTo(2);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e2");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e1");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, -1, -1);
+    assertThat(jedis.llen(KEY)).isEqualTo(1);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e1");
+  }
+
+  @Test
+  public void trimsToCorrectRange_givenSpecifiersOutsideListSize() {
+    initializeTestList();
+
+    jedis.ltrim(KEY, -4, -1);

Review comment:
       -4 is inside the list size range, corresponding to index 0; a value of 
-5 would be required for a start index that is (effectively) negative.

##########
File path: 
geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/AbstractLTrimIntegrationTest.java
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.commands.executor.list;
+
+import static 
org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertExactNumberOfArgs;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_NOT_INTEGER;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+
+public abstract class AbstractLTrimIntegrationTest implements 
RedisIntegrationTest {
+  public static final String KEY = "key";
+  public static final String PREEXISTING_VALUE = "preexistingValue";
+  private JedisCluster jedis;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort(BIND_ADDRESS, getPort()), 
REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void givenWrongNumOfArgs_returnsError() {
+    assertExactNumberOfArgs(jedis, Protocol.Command.LTRIM, 3);
+  }
+
+  @Test
+  public void withNonListKey_Fails() {
+    jedis.set("string", PREEXISTING_VALUE);
+    assertThatThrownBy(() -> jedis.ltrim("string", 0, -1))
+        .hasMessage(ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void withNonExistentKey_returnsNull() {
+    assertThat(jedis.ltrim("nonexistent", 0, -1)).isEqualTo("OK");
+  }
+
+  @Test
+  public void withNonIntegerRangeSpecifier_Fails() {
+    jedis.lpush(KEY, "e1", "e2", "e3", "e4");
+
+    assertThatThrownBy(() -> jedis.sendCommand(KEY, Protocol.Command.LTRIM, 
KEY,
+        "0", "not-an-integer"))
+            .hasMessage(ERROR_NOT_INTEGER);
+    assertThatThrownBy(() -> jedis.sendCommand(KEY, Protocol.Command.LTRIM, 
KEY,
+        "not-an-integer", "-1"))
+            .hasMessage(ERROR_NOT_INTEGER);
+    assertThatThrownBy(() -> jedis.sendCommand(KEY, Protocol.Command.LTRIM, 
KEY,
+        "not-an-integer", "not-an-integer"))
+            .hasMessage(ERROR_NOT_INTEGER);
+  }
+
+  @Test
+  public void trimsToSpecifiedRange_givenValidRange() {
+    initializeTestList();
+
+    jedis.ltrim(KEY, 0, 0);
+    assertThat(jedis.llen(KEY)).isEqualTo(1);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, 0, 1);
+    assertThat(jedis.llen(KEY)).isEqualTo(2);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e3");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, 0, 2);
+    assertThat(jedis.llen(KEY)).isEqualTo(3);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 2)).isEqualTo("e2");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, 1, 2);
+    assertThat(jedis.llen(KEY)).isEqualTo(2);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e2");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, 1, -1);
+    assertThat(jedis.llen(KEY)).isEqualTo(3);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e2");
+    assertThat(jedis.lindex(KEY, 2)).isEqualTo("e1");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, 1, -2);
+    assertThat(jedis.llen(KEY)).isEqualTo(2);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e2");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, -2, -1);
+    assertThat(jedis.llen(KEY)).isEqualTo(2);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e2");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e1");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, -1, -1);
+    assertThat(jedis.llen(KEY)).isEqualTo(1);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e1");
+  }
+
+  @Test
+  public void trimsToCorrectRange_givenSpecifiersOutsideListSize() {
+    initializeTestList();
+
+    jedis.ltrim(KEY, -4, -1);
+    assertThat(jedis.llen(KEY)).isEqualTo(4);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 2)).isEqualTo("e2");
+    assertThat(jedis.lindex(KEY, 3)).isEqualTo("e1");
+
+    jedis.ltrim(KEY, -10, 10);
+    assertThat(jedis.llen(KEY)).isEqualTo(4);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 2)).isEqualTo("e2");
+    assertThat(jedis.lindex(KEY, 3)).isEqualTo("e1");
+
+    jedis.ltrim(KEY, 0, 4);
+    assertThat(jedis.llen(KEY)).isEqualTo(4);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 2)).isEqualTo("e2");
+    assertThat(jedis.lindex(KEY, 3)).isEqualTo("e1");
+
+    jedis.ltrim(KEY, 0, 10);
+    assertThat(jedis.llen(KEY)).isEqualTo(4);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 2)).isEqualTo("e2");
+    assertThat(jedis.lindex(KEY, 3)).isEqualTo("e1");
+  }
+
+  private void initializeTestList() {
+    jedis.del(KEY);
+    jedis.lpush(KEY, "e1", "e2", "e3", "e4");
+  }
+
+  @Test
+  public void removesKey_whenLastElementRemoved() {

Review comment:
       This test name might be better as something like 
"removesKey_whenAllElementsTrimmed"

##########
File path: 
geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/delta/RetainElementsByIndexRange.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.data.delta;
+
+import static org.apache.geode.DataSerializer.readPrimitiveInt;
+import static 
org.apache.geode.redis.internal.data.delta.DeltaType.RETAIN_ELEMENTS_BY_INDEX_RANGE;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.geode.DataSerializer;
+import org.apache.geode.redis.internal.data.AbstractRedisData;
+
+public class RetainElementsByIndexRange extends DeltaInfo {
+  private int start;
+  private int end;

Review comment:
       These can be `final`

##########
File path: 
geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/AbstractLTrimIntegrationTest.java
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.commands.executor.list;
+
+import static 
org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertExactNumberOfArgs;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_NOT_INTEGER;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+
+public abstract class AbstractLTrimIntegrationTest implements 
RedisIntegrationTest {
+  public static final String KEY = "key";
+  public static final String PREEXISTING_VALUE = "preexistingValue";
+  private JedisCluster jedis;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort(BIND_ADDRESS, getPort()), 
REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void givenWrongNumOfArgs_returnsError() {
+    assertExactNumberOfArgs(jedis, Protocol.Command.LTRIM, 3);
+  }
+
+  @Test
+  public void withNonListKey_Fails() {
+    jedis.set("string", PREEXISTING_VALUE);
+    assertThatThrownBy(() -> jedis.ltrim("string", 0, -1))
+        .hasMessage(ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void withNonExistentKey_returnsNull() {
+    assertThat(jedis.ltrim("nonexistent", 0, -1)).isEqualTo("OK");
+  }
+
+  @Test
+  public void withNonIntegerRangeSpecifier_Fails() {
+    jedis.lpush(KEY, "e1", "e2", "e3", "e4");
+
+    assertThatThrownBy(() -> jedis.sendCommand(KEY, Protocol.Command.LTRIM, 
KEY,
+        "0", "not-an-integer"))
+            .hasMessage(ERROR_NOT_INTEGER);
+    assertThatThrownBy(() -> jedis.sendCommand(KEY, Protocol.Command.LTRIM, 
KEY,
+        "not-an-integer", "-1"))
+            .hasMessage(ERROR_NOT_INTEGER);
+    assertThatThrownBy(() -> jedis.sendCommand(KEY, Protocol.Command.LTRIM, 
KEY,
+        "not-an-integer", "not-an-integer"))
+            .hasMessage(ERROR_NOT_INTEGER);
+  }
+
+  @Test
+  public void trimsToSpecifiedRange_givenValidRange() {
+    initializeTestList();
+
+    jedis.ltrim(KEY, 0, 0);
+    assertThat(jedis.llen(KEY)).isEqualTo(1);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, 0, 1);
+    assertThat(jedis.llen(KEY)).isEqualTo(2);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e3");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, 0, 2);
+    assertThat(jedis.llen(KEY)).isEqualTo(3);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 2)).isEqualTo("e2");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, 1, 2);
+    assertThat(jedis.llen(KEY)).isEqualTo(2);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e2");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, 1, -1);
+    assertThat(jedis.llen(KEY)).isEqualTo(3);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e2");
+    assertThat(jedis.lindex(KEY, 2)).isEqualTo("e1");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, 1, -2);
+    assertThat(jedis.llen(KEY)).isEqualTo(2);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e2");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, -2, -1);
+    assertThat(jedis.llen(KEY)).isEqualTo(2);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e2");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e1");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, -1, -1);
+    assertThat(jedis.llen(KEY)).isEqualTo(1);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e1");
+  }
+
+  @Test
+  public void trimsToCorrectRange_givenSpecifiersOutsideListSize() {
+    initializeTestList();
+
+    jedis.ltrim(KEY, -4, -1);
+    assertThat(jedis.llen(KEY)).isEqualTo(4);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 2)).isEqualTo("e2");
+    assertThat(jedis.lindex(KEY, 3)).isEqualTo("e1");
+
+    jedis.ltrim(KEY, -10, 10);
+    assertThat(jedis.llen(KEY)).isEqualTo(4);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 2)).isEqualTo("e2");
+    assertThat(jedis.lindex(KEY, 3)).isEqualTo("e1");
+
+    jedis.ltrim(KEY, 0, 4);
+    assertThat(jedis.llen(KEY)).isEqualTo(4);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 2)).isEqualTo("e2");
+    assertThat(jedis.lindex(KEY, 3)).isEqualTo("e1");
+
+    jedis.ltrim(KEY, 0, 10);
+    assertThat(jedis.llen(KEY)).isEqualTo(4);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 2)).isEqualTo("e2");
+    assertThat(jedis.lindex(KEY, 3)).isEqualTo("e1");
+  }
+
+  private void initializeTestList() {
+    jedis.del(KEY);
+    jedis.lpush(KEY, "e1", "e2", "e3", "e4");
+  }
+
+  @Test
+  public void removesKey_whenLastElementRemoved() {
+    final String keyWithTagForKeysCommand = "{tag}" + KEY;
+    jedis.lpush(keyWithTagForKeysCommand, "e1", "e2", "e3");

Review comment:
       A tag is not necessary for this test, so these lines can be replaced 
with a call to `initializeTestList()`.

##########
File path: 
geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/AbstractLTrimIntegrationTest.java
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.commands.executor.list;
+
+import static 
org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertExactNumberOfArgs;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_NOT_INTEGER;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+
+public abstract class AbstractLTrimIntegrationTest implements 
RedisIntegrationTest {
+  public static final String KEY = "key";
+  public static final String PREEXISTING_VALUE = "preexistingValue";
+  private JedisCluster jedis;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort(BIND_ADDRESS, getPort()), 
REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void givenWrongNumOfArgs_returnsError() {
+    assertExactNumberOfArgs(jedis, Protocol.Command.LTRIM, 3);
+  }
+
+  @Test
+  public void withNonListKey_Fails() {
+    jedis.set("string", PREEXISTING_VALUE);
+    assertThatThrownBy(() -> jedis.ltrim("string", 0, -1))
+        .hasMessage(ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void withNonExistentKey_returnsNull() {
+    assertThat(jedis.ltrim("nonexistent", 0, -1)).isEqualTo("OK");
+  }
+
+  @Test
+  public void withNonIntegerRangeSpecifier_Fails() {
+    jedis.lpush(KEY, "e1", "e2", "e3", "e4");
+
+    assertThatThrownBy(() -> jedis.sendCommand(KEY, Protocol.Command.LTRIM, 
KEY,
+        "0", "not-an-integer"))
+            .hasMessage(ERROR_NOT_INTEGER);
+    assertThatThrownBy(() -> jedis.sendCommand(KEY, Protocol.Command.LTRIM, 
KEY,
+        "not-an-integer", "-1"))
+            .hasMessage(ERROR_NOT_INTEGER);
+    assertThatThrownBy(() -> jedis.sendCommand(KEY, Protocol.Command.LTRIM, 
KEY,
+        "not-an-integer", "not-an-integer"))
+            .hasMessage(ERROR_NOT_INTEGER);
+  }
+
+  @Test
+  public void trimsToSpecifiedRange_givenValidRange() {
+    initializeTestList();
+
+    jedis.ltrim(KEY, 0, 0);
+    assertThat(jedis.llen(KEY)).isEqualTo(1);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, 0, 1);
+    assertThat(jedis.llen(KEY)).isEqualTo(2);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e3");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, 0, 2);
+    assertThat(jedis.llen(KEY)).isEqualTo(3);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 2)).isEqualTo("e2");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, 1, 2);
+    assertThat(jedis.llen(KEY)).isEqualTo(2);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e2");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, 1, -1);
+    assertThat(jedis.llen(KEY)).isEqualTo(3);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e2");
+    assertThat(jedis.lindex(KEY, 2)).isEqualTo("e1");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, 1, -2);
+    assertThat(jedis.llen(KEY)).isEqualTo(2);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e2");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, -2, -1);
+    assertThat(jedis.llen(KEY)).isEqualTo(2);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e2");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e1");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, -1, -1);
+    assertThat(jedis.llen(KEY)).isEqualTo(1);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e1");
+  }
+
+  @Test
+  public void trimsToCorrectRange_givenSpecifiersOutsideListSize() {
+    initializeTestList();
+
+    jedis.ltrim(KEY, -4, -1);
+    assertThat(jedis.llen(KEY)).isEqualTo(4);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 2)).isEqualTo("e2");
+    assertThat(jedis.lindex(KEY, 3)).isEqualTo("e1");
+
+    jedis.ltrim(KEY, -10, 10);
+    assertThat(jedis.llen(KEY)).isEqualTo(4);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 2)).isEqualTo("e2");
+    assertThat(jedis.lindex(KEY, 3)).isEqualTo("e1");
+
+    jedis.ltrim(KEY, 0, 4);
+    assertThat(jedis.llen(KEY)).isEqualTo(4);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 2)).isEqualTo("e2");
+    assertThat(jedis.lindex(KEY, 3)).isEqualTo("e1");
+
+    jedis.ltrim(KEY, 0, 10);
+    assertThat(jedis.llen(KEY)).isEqualTo(4);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 2)).isEqualTo("e2");
+    assertThat(jedis.lindex(KEY, 3)).isEqualTo("e1");
+  }
+
+  private void initializeTestList() {
+    jedis.del(KEY);
+    jedis.lpush(KEY, "e1", "e2", "e3", "e4");
+  }
+
+  @Test
+  public void removesKey_whenLastElementRemoved() {
+    final String keyWithTagForKeysCommand = "{tag}" + KEY;
+    jedis.lpush(keyWithTagForKeysCommand, "e1", "e2", "e3");
+
+    jedis.ltrim(keyWithTagForKeysCommand, 0, -4);
+    assertThat(jedis.llen(keyWithTagForKeysCommand)).isEqualTo(0L);
+    assertThat(jedis.exists(keyWithTagForKeysCommand)).isFalse();

Review comment:
       It's not necessary to call LLEN here in addition to checking if the key 
exists. Doing so is just testing the behaviour of LLEN rather than checking 
that the key is correctly removed from the region.

##########
File path: 
geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/list/LTrimDUnitTest.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.commands.executor.list;
+
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+public class LTrimDUnitTest {
+  public static final int INITIAL_LIST_SIZE = 5_000;
+
+  @Rule
+  public RedisClusterStartupRule clusterStartUp = new 
RedisClusterStartupRule();
+
+  @Rule
+  public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+  private static JedisCluster jedis;
+
+  @Before
+  public void testSetup() {
+    MemberVM locator = clusterStartUp.startLocatorVM(0);
+    clusterStartUp.startRedisVM(1, locator.getPort());
+    clusterStartUp.startRedisVM(2, locator.getPort());
+    clusterStartUp.startRedisVM(3, locator.getPort());
+    int redisServerPort = clusterStartUp.getRedisPort(1);
+    jedis = new JedisCluster(new HostAndPort(BIND_ADDRESS, redisServerPort), 
REDIS_CLIENT_TIMEOUT);
+    clusterStartUp.flushAll();
+  }
+
+  @After
+  public void tearDown() {
+    jedis.close();
+  }
+
+  @Test
+  public void shouldDistributeDataAmongCluster_andRetainDataAfterServerCrash() 
{
+    String key = makeListKeyWithHashtag(1, 
clusterStartUp.getKeyOnServer("ltrim", 1));
+    List<String> elementList = makeElementList(key, INITIAL_LIST_SIZE);
+    lpushPerformAndVerify(key, elementList);
+
+    // Remove all but last element
+    jedis.ltrim(key, INITIAL_LIST_SIZE - 1, INITIAL_LIST_SIZE);
+
+    clusterStartUp.crashVM(1); // kill primary server
+
+    assertThat(jedis.lindex(key, 0)).isEqualTo(elementList.get(0));
+    jedis.ltrim(key, 0, -2);
+    assertThat(jedis.exists(key)).isFalse();
+  }
+
+  @Test
+  public void givenBucketsMoveDuringLtrim_thenOperationsAreNotLost() throws 
Exception {
+    AtomicLong runningCount = new AtomicLong(3);

Review comment:
       This test would be better if instead of running until we've done a 
certain number of LTRIM calls, we instead do constant LTRIM calls until we've 
done a certain number of bucket moves. The LRemDUnitTest that does bucket moves 
that's being added [as part of the LREM 
PR](https://github.com/apache/geode/pull/7431/files) is a great example of how 
we want to be approaching these sort of tests, I think, so it could be helpful 
to use it as a guide.
   
   The basic idea is that we do our operation constantly until we're finished 
moving buckets, but since LTRIM removes elements, we need to reset the list to 
its initial state once it's been emptied, then continue calling LTRIM. That way 
we can be sure that we do enough bucket moves to make the test consistent and 
useful, without having to worry that we run out of elements to trim.

##########
File path: 
geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/list/LTrimDUnitTest.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.commands.executor.list;
+
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+public class LTrimDUnitTest {
+  public static final int INITIAL_LIST_SIZE = 5_000;
+
+  @Rule
+  public RedisClusterStartupRule clusterStartUp = new 
RedisClusterStartupRule();
+
+  @Rule
+  public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+  private static JedisCluster jedis;
+
+  @Before
+  public void testSetup() {
+    MemberVM locator = clusterStartUp.startLocatorVM(0);
+    clusterStartUp.startRedisVM(1, locator.getPort());
+    clusterStartUp.startRedisVM(2, locator.getPort());
+    clusterStartUp.startRedisVM(3, locator.getPort());
+    int redisServerPort = clusterStartUp.getRedisPort(1);
+    jedis = new JedisCluster(new HostAndPort(BIND_ADDRESS, redisServerPort), 
REDIS_CLIENT_TIMEOUT);
+    clusterStartUp.flushAll();
+  }
+
+  @After
+  public void tearDown() {
+    jedis.close();
+  }
+
+  @Test
+  public void shouldDistributeDataAmongCluster_andRetainDataAfterServerCrash() 
{
+    String key = makeListKeyWithHashtag(1, 
clusterStartUp.getKeyOnServer("ltrim", 1));
+    List<String> elementList = makeElementList(key, INITIAL_LIST_SIZE);
+    lpushPerformAndVerify(key, elementList);
+
+    // Remove all but last element
+    jedis.ltrim(key, INITIAL_LIST_SIZE - 1, INITIAL_LIST_SIZE);
+
+    clusterStartUp.crashVM(1); // kill primary server
+
+    assertThat(jedis.lindex(key, 0)).isEqualTo(elementList.get(0));
+    jedis.ltrim(key, 0, -2);
+    assertThat(jedis.exists(key)).isFalse();
+  }
+
+  @Test
+  public void givenBucketsMoveDuringLtrim_thenOperationsAreNotLost() throws 
Exception {
+    AtomicLong runningCount = new AtomicLong(3);
+    List<String> listHashtags = makeListHashtags();
+    List<String> keys = makeListKeys(listHashtags);
+
+    List<String> elementList1 = makeElementList(keys.get(0), 
INITIAL_LIST_SIZE);
+    List<String> elementList2 = makeElementList(keys.get(1), 
INITIAL_LIST_SIZE);
+    List<String> elementList3 = makeElementList(keys.get(2), 
INITIAL_LIST_SIZE);
+
+    lpushPerformAndVerify(keys.get(0), elementList1);
+    lpushPerformAndVerify(keys.get(1), elementList2);
+    lpushPerformAndVerify(keys.get(2), elementList3);
+
+    Runnable task1 =
+        () -> ltrimPerformAndVerify(keys.get(0), runningCount);
+    Runnable task2 =
+        () -> ltrimPerformAndVerify(keys.get(1), runningCount);
+    Runnable task3 =
+        () -> ltrimPerformAndVerify(keys.get(2), runningCount);
+
+    Future<Void> future1 = executor.runAsync(task1);
+    Future<Void> future2 = executor.runAsync(task2);
+    Future<Void> future3 = executor.runAsync(task3);
+
+    for (int i = 0; i < 100 && runningCount.get() > 0; i++) {
+      clusterStartUp.moveBucketForKey(listHashtags.get(i % 
listHashtags.size()));
+      Thread.sleep(200);
+    }
+
+    runningCount.set(0);
+
+    future1.get();
+    future2.get();
+    future3.get();
+  }
+
+  private List<String> makeListHashtags() {
+    List<String> listHashtags = new ArrayList<>();
+    listHashtags.add(clusterStartUp.getKeyOnServer("ltrim", 1));
+    listHashtags.add(clusterStartUp.getKeyOnServer("ltrim", 2));
+    listHashtags.add(clusterStartUp.getKeyOnServer("ltrim", 3));
+    return listHashtags;
+  }
+
+  private List<String> makeListKeys(List<String> listHashtags) {
+    List<String> keys = new ArrayList<>();
+    keys.add(makeListKeyWithHashtag(1, listHashtags.get(0)));
+    keys.add(makeListKeyWithHashtag(2, listHashtags.get(1)));
+    keys.add(makeListKeyWithHashtag(3, listHashtags.get(2)));
+    return keys;
+  }
+
+  private void lpushPerformAndVerify(String key, List<String> elementList) {
+    jedis.lpush(key, elementList.toArray(new String[] {}));
+
+    Long listLength = jedis.llen(key);
+    assertThat(listLength).as("Initial list lengths not equal for key %s'", 
key)
+        .isEqualTo(elementList.size());
+  }
+
+  private void ltrimPerformAndVerify(String key, AtomicLong runningCount) {
+    assertThat(jedis.llen(key)).isEqualTo(INITIAL_LIST_SIZE);
+
+    int lastElementIndex = INITIAL_LIST_SIZE - 1;
+    int i = 0;
+    while (jedis.llen(key) > 1 && runningCount.get() > 0) {
+      String expected = makeElementString(key, i);
+      try {
+        assertThat(jedis.lindex(key, lastElementIndex)).isEqualTo(expected);
+        jedis.ltrim(key, 0, lastElementIndex - 1);
+        assertThat(jedis.llen(key)).as("Key: %s ", 
key).isEqualTo(lastElementIndex);
+        lastElementIndex--;

Review comment:
       Could we choose indexes for start and end here so that we're keeping 
some middle part of the list instead of just trimming the last element? That 
way we make sure that we're exercising the whole of the Delta that we send, not 
just the end index. Maybe just trim the first and last elements?
   
   Also, it would be good to assert on the entire contents of the list (using 
LRANGE) rather than just the size, as we want to make sure that we're not 
removing the wrong elements.

##########
File path: 
geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/AbstractLTrimIntegrationTest.java
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.commands.executor.list;
+
+import static 
org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertExactNumberOfArgs;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_NOT_INTEGER;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+
+public abstract class AbstractLTrimIntegrationTest implements 
RedisIntegrationTest {
+  public static final String KEY = "key";
+  public static final String PREEXISTING_VALUE = "preexistingValue";
+  private JedisCluster jedis;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort(BIND_ADDRESS, getPort()), 
REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void givenWrongNumOfArgs_returnsError() {
+    assertExactNumberOfArgs(jedis, Protocol.Command.LTRIM, 3);
+  }
+
+  @Test
+  public void withNonListKey_Fails() {
+    jedis.set("string", PREEXISTING_VALUE);
+    assertThatThrownBy(() -> jedis.ltrim("string", 0, -1))
+        .hasMessage(ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void withNonExistentKey_returnsNull() {
+    assertThat(jedis.ltrim("nonexistent", 0, -1)).isEqualTo("OK");
+  }
+
+  @Test
+  public void withNonIntegerRangeSpecifier_Fails() {
+    jedis.lpush(KEY, "e1", "e2", "e3", "e4");
+
+    assertThatThrownBy(() -> jedis.sendCommand(KEY, Protocol.Command.LTRIM, 
KEY,
+        "0", "not-an-integer"))
+            .hasMessage(ERROR_NOT_INTEGER);
+    assertThatThrownBy(() -> jedis.sendCommand(KEY, Protocol.Command.LTRIM, 
KEY,
+        "not-an-integer", "-1"))
+            .hasMessage(ERROR_NOT_INTEGER);
+    assertThatThrownBy(() -> jedis.sendCommand(KEY, Protocol.Command.LTRIM, 
KEY,
+        "not-an-integer", "not-an-integer"))
+            .hasMessage(ERROR_NOT_INTEGER);
+  }
+
+  @Test
+  public void trimsToSpecifiedRange_givenValidRange() {
+    initializeTestList();
+
+    jedis.ltrim(KEY, 0, 0);
+    assertThat(jedis.llen(KEY)).isEqualTo(1);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, 0, 1);
+    assertThat(jedis.llen(KEY)).isEqualTo(2);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e3");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, 0, 2);
+    assertThat(jedis.llen(KEY)).isEqualTo(3);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 2)).isEqualTo("e2");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, 1, 2);
+    assertThat(jedis.llen(KEY)).isEqualTo(2);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e2");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, 1, -1);
+    assertThat(jedis.llen(KEY)).isEqualTo(3);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e2");
+    assertThat(jedis.lindex(KEY, 2)).isEqualTo("e1");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, 1, -2);
+    assertThat(jedis.llen(KEY)).isEqualTo(2);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e2");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, -2, -1);
+    assertThat(jedis.llen(KEY)).isEqualTo(2);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e2");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e1");
+
+    initializeTestList();
+
+    jedis.ltrim(KEY, -1, -1);
+    assertThat(jedis.llen(KEY)).isEqualTo(1);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e1");
+  }
+
+  @Test
+  public void trimsToCorrectRange_givenSpecifiersOutsideListSize() {
+    initializeTestList();
+
+    jedis.ltrim(KEY, -4, -1);
+    assertThat(jedis.llen(KEY)).isEqualTo(4);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 2)).isEqualTo("e2");
+    assertThat(jedis.lindex(KEY, 3)).isEqualTo("e1");
+
+    jedis.ltrim(KEY, -10, 10);
+    assertThat(jedis.llen(KEY)).isEqualTo(4);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 2)).isEqualTo("e2");
+    assertThat(jedis.lindex(KEY, 3)).isEqualTo("e1");
+
+    jedis.ltrim(KEY, 0, 4);
+    assertThat(jedis.llen(KEY)).isEqualTo(4);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 2)).isEqualTo("e2");
+    assertThat(jedis.lindex(KEY, 3)).isEqualTo("e1");
+
+    jedis.ltrim(KEY, 0, 10);
+    assertThat(jedis.llen(KEY)).isEqualTo(4);
+    assertThat(jedis.lindex(KEY, 0)).isEqualTo("e4");
+    assertThat(jedis.lindex(KEY, 1)).isEqualTo("e3");
+    assertThat(jedis.lindex(KEY, 2)).isEqualTo("e2");
+    assertThat(jedis.lindex(KEY, 3)).isEqualTo("e1");
+  }
+
+  private void initializeTestList() {
+    jedis.del(KEY);
+    jedis.lpush(KEY, "e1", "e2", "e3", "e4");
+  }
+
+  @Test
+  public void removesKey_whenLastElementRemoved() {
+    final String keyWithTagForKeysCommand = "{tag}" + KEY;
+    jedis.lpush(keyWithTagForKeysCommand, "e1", "e2", "e3");
+
+    jedis.ltrim(keyWithTagForKeysCommand, 0, -4);
+    assertThat(jedis.llen(keyWithTagForKeysCommand)).isEqualTo(0L);
+    assertThat(jedis.exists(keyWithTagForKeysCommand)).isFalse();
+  }
+
+  @Test
+  public void removesKey_whenLastElementRemoved_multipleTimes() {

Review comment:
       I'm not super clear on what this test is showing, since we already have 
a test that shows that the key is removed if all elements are trimmed, and a 
test that shows the behaviour when LTRIM is called on a non-existent key. I 
think this can probably just be removed.

##########
File path: 
geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/list/LPushDUnitTest.java
##########
@@ -114,7 +115,8 @@ public void 
givenBucketsMovedDuringLPush_elementsAreAddedAtomically()
 
     for (String key : keys) {
       long length = jedis.llen(key);
-      assertThat(length).isGreaterThanOrEqualTo(MINIMUM_ITERATIONS * 2 * 
PUSH_LIST_SIZE);
+      assertThat(length).isCloseTo(ITERATION_COUNT * 2 * PUSH_LIST_SIZE, 
within(6L));
+      assertThat(length % 3).isEqualTo(0);

Review comment:
       Agreed, this assertion should not be using `isCloseTo()` since we should 
not be seeing duplicated commands here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@geode.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to