diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/DefaultLockRepository.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/DefaultLockRepository.java
index e442a6d1259..d8dd4ee94a6 100644
--- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/DefaultLockRepository.java
+++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/DefaultLockRepository.java
@@ -62,6 +62,7 @@
  * @author Gary Russell
  * @author Alexandre Strubel
  * @author Ruslan Stelmachenko
+ * @author Eddie Cho
  *
  * @since 4.3
  */
@@ -389,9 +390,9 @@ public void close() {
 	}
 
 	@Override
-	public void delete(String lock) {
-		this.defaultTransactionTemplate.executeWithoutResult(
-				transactionStatus -> this.template.update(this.deleteQuery, this.region, lock, this.id));
+	public boolean delete(String lock) {
+		return this.defaultTransactionTemplate.execute(
+				transactionStatus -> this.template.update(this.deleteQuery, this.region, lock, this.id)) > 0;
 	}
 
 	@Override
diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/JdbcLockRegistry.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/JdbcLockRegistry.java
index cae7d922d7f..e966d6bb428 100644
--- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/JdbcLockRegistry.java
+++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/JdbcLockRegistry.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-2023 the original author or authors.
+ * Copyright 2016-2024 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
 package org.springframework.integration.jdbc.lock;
 
 import java.time.Duration;
+import java.util.ConcurrentModificationException;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -56,6 +57,7 @@
  * @author Unseok Kim
  * @author Christian Tzolov
  * @author Myeonghyeon Lee
+ * @author Eddie Cho
  *
  * @since 4.3
  */
@@ -305,12 +307,20 @@ public void unlock() {
 			try {
 				while (true) {
 					try {
-						this.mutex.delete(this.path);
-						return;
+						if (this.mutex.delete(this.path)) {
+							return;
+						}
+						else {
+							throw new ConcurrentModificationException("Lock was released in the store due to expiration. " +
+									"The integrity of data protected by this lock may have been compromised.");
+						}
 					}
 					catch (TransientDataAccessException | TransactionTimedOutException | TransactionSystemException e) {
 						// try again
 					}
+					catch (ConcurrentModificationException e) {
+						throw e;
+					}
 					catch (Exception e) {
 						throw new DataAccessResourceFailureException("Failed to release mutex at " + this.path, e);
 					}
diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/LockRepository.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/LockRepository.java
index b0a6a902fda..247e6fccb5e 100644
--- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/LockRepository.java
+++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/LockRepository.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-2021 the original author or authors.
+ * Copyright 2016-2024 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@
  * @author Dave Syer
  * @author Alexandre Strubel
  * @author Artem Bilan
+ * @author Eddie Cho
  *
  * @since 4.3
  */
@@ -41,8 +42,9 @@ public interface LockRepository extends Closeable {
 	/**
 	 * Remove a lock from this repository.
 	 * @param lock the lock to remove.
+	 * @return deleted or not.
 	 */
-	void delete(String lock);
+	boolean delete(String lock);
 
 	/**
 	 * Remove all the expired locks.
diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryDelegateTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryDelegateTests.java
index 2ff6a720d8c..28f5db6ffff 100644
--- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryDelegateTests.java
+++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryDelegateTests.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2020-2022 the original author or authors.
+ * Copyright 2020-2024 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -17,7 +17,6 @@
 package org.springframework.integration.jdbc.lock;
 
 import java.util.Random;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -31,17 +30,17 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 /**
  * @author Olivier Hubaut
  * @author Fran Aranda
+ * @author Eddie Cho
  *
  * @since 5.2.11
  */
-public class JdbcLockRegistryDelegateTests {
+class JdbcLockRegistryDelegateTests {
 
 	private JdbcLockRegistry registry;
 
@@ -56,7 +55,7 @@ public void clear() {
 	}
 
 	@Test
-	public void testLessAmountOfUnlockThanLock() {
+	void testLessAmountOfUnlockThanLock() {
 		final Random random = new Random();
 		final int lockCount = random.nextInt(5) + 1;
 		final int unlockCount = random.nextInt(lockCount);
@@ -73,11 +72,13 @@ public void testLessAmountOfUnlockThanLock() {
 	}
 
 	@Test
-	public void testSameAmountOfUnlockThanLock() {
+	void testSameAmountOfUnlockThanLock() {
 		final Random random = new Random();
 		final int lockCount = random.nextInt(5) + 1;
 
 		final Lock lock = registry.obtain("foo");
+		when(repository.delete(anyString())).thenReturn(true);
+
 		for (int i = 0; i < lockCount; i++) {
 			lock.tryLock();
 		}
@@ -89,17 +90,13 @@ public void testSameAmountOfUnlockThanLock() {
 	}
 
 	@Test
-	public void testTransientDataAccessException() {
+	void testTransientDataAccessException() {
 		final Lock lock = registry.obtain("foo");
 		lock.tryLock();
 
-		final AtomicBoolean shouldThrow = new AtomicBoolean(true);
-		doAnswer(invocation -> {
-			if (shouldThrow.getAndSet(false)) {
-				throw mock(TransientDataAccessException.class);
-			}
-			return null;
-		}).when(repository).delete(anyString());
+		when(repository.delete(anyString()))
+				.thenThrow(mock(TransientDataAccessException.class))
+				.thenReturn(true);
 
 		lock.unlock();
 
@@ -107,17 +104,13 @@ public void testTransientDataAccessException() {
 	}
 
 	@Test
-	public void testTransactionTimedOutException() {
+	void testTransactionTimedOutException() {
 		final Lock lock = registry.obtain("foo");
 		lock.tryLock();
 
-		final AtomicBoolean shouldThrow = new AtomicBoolean(true);
-		doAnswer(invocation -> {
-			if (shouldThrow.getAndSet(false)) {
-				throw mock(TransactionTimedOutException.class);
-			}
-			return null;
-		}).when(repository).delete(anyString());
+		when(repository.delete(anyString()))
+				.thenThrow(TransactionTimedOutException.class)
+				.thenReturn(true);
 
 		lock.unlock();
 
@@ -125,17 +118,13 @@ public void testTransactionTimedOutException() {
 	}
 
 	@Test
-	public void testTransactionSystemException() {
+	void testTransactionSystemException() {
 		final Lock lock = registry.obtain("foo");
 		lock.tryLock();
 
-		final AtomicBoolean shouldThrow = new AtomicBoolean(true);
-		doAnswer(invocation -> {
-			if (shouldThrow.getAndSet(false)) {
-				throw mock(TransactionSystemException.class);
-			}
-			return null;
-		}).when(repository).delete(anyString());
+		when(repository.delete(anyString()))
+				.thenThrow(TransactionSystemException.class)
+				.thenReturn(true);
 
 		lock.unlock();
 
diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryDifferentClientTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryDifferentClientTests.java
index bcd040e6514..394bf935e24 100644
--- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryDifferentClientTests.java
+++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryDifferentClientTests.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-2022 the original author or authors.
+ * Copyright 2016-2024 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
 package org.springframework.integration.jdbc.lock;
 
 import java.util.ArrayList;
+import java.util.ConcurrentModificationException;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
@@ -45,18 +46,20 @@
 import org.springframework.util.StopWatch;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /**
  * @author Dave Syer
  * @author Artem Bilan
  * @author Glenn Renfro
  * @author Alexandre Strubel
+ * @author Eddie Cho
  *
  * @since 4.3
  */
 @SpringJUnitConfig(locations = "JdbcLockRegistryTests-context.xml")
 @DirtiesContext
-public class JdbcLockRegistryDifferentClientTests {
+class JdbcLockRegistryDifferentClientTests {
 
 	private static final Log LOGGER = LogFactory.getLog(JdbcLockRegistryDifferentClientTests.class);
 
@@ -92,7 +95,7 @@ public void close() {
 	}
 
 	@Test
-	public void testSecondThreadLoses() throws Exception {
+	void testSecondThreadLoses() throws Exception {
 		for (int i = 0; i < 100; i++) {
 			final JdbcLockRegistry registry1 = this.registry;
 			final JdbcLockRegistry registry2 = this.child.getBean(JdbcLockRegistry.class);
@@ -129,7 +132,7 @@ public void testSecondThreadLoses() throws Exception {
 	}
 
 	@Test
-	public void testBothLock() throws Exception {
+	void testBothLock() throws Exception {
 		for (int i = 0; i < 100; i++) {
 			final JdbcLockRegistry registry1 = this.registry;
 			final JdbcLockRegistry registry2 = this.child.getBean(JdbcLockRegistry.class);
@@ -185,7 +188,7 @@ public void testBothLock() throws Exception {
 	}
 
 	@Test
-	public void testOnlyOneLock() throws Exception {
+	void testOnlyOneLock() throws Exception {
 		for (int i = 0; i < 100; i++) {
 			final BlockingQueue<String> locked = new LinkedBlockingQueue<>();
 			final CountDownLatch latch = new CountDownLatch(20);
@@ -231,7 +234,7 @@ public void testOnlyOneLock() throws Exception {
 	}
 
 	@Test
-	public void testExclusiveAccess() throws Exception {
+	void testExclusiveAccess() throws Exception {
 		DefaultLockRepository client1 = new DefaultLockRepository(dataSource);
 		client1.setApplicationContext(this.context);
 		client1.afterPropertiesSet();
@@ -281,7 +284,7 @@ public void testExclusiveAccess() throws Exception {
 	}
 
 	@Test
-	public void testOutOfDateLockTaken() throws Exception {
+	void testOutOfDateLockTaken() throws Exception {
 		DefaultLockRepository client1 = new DefaultLockRepository(dataSource);
 		client1.setTimeToLive(100);
 		client1.setApplicationContext(this.context);
@@ -314,7 +317,7 @@ public void testOutOfDateLockTaken() throws Exception {
 				});
 		assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
 		data.add(2);
-		lock1.unlock();
+		assertThatThrownBy(lock1::unlock).isInstanceOf(ConcurrentModificationException.class);
 		for (int i = 0; i < 2; i++) {
 			Integer integer = data.poll(10, TimeUnit.SECONDS);
 			assertThat(integer).isNotNull();
@@ -323,7 +326,7 @@ public void testOutOfDateLockTaken() throws Exception {
 	}
 
 	@Test
-	public void testRenewLock() throws Exception {
+	void testRenewLock() throws Exception {
 		DefaultLockRepository client1 = new DefaultLockRepository(dataSource);
 		client1.setTimeToLive(500);
 		client1.setApplicationContext(this.context);
diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryTests.java
index 85879f923ac..e4190770f96 100644
--- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryTests.java
+++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryTests.java
@@ -16,6 +16,7 @@
 
 package org.springframework.integration.jdbc.lock;
 
+import java.util.ConcurrentModificationException;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.CountDownLatch;
@@ -53,12 +54,13 @@
  * @author Stefan Vassilev
  * @author Alexandre Strubel
  * @author Unseok Kim
+ * @author Eddie Cho
  *
  * @since 4.3
  */
 @SpringJUnitConfig
 @DirtiesContext
-public class JdbcLockRegistryTests {
+class JdbcLockRegistryTests {
 
 	private final AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
 
@@ -84,7 +86,7 @@ public void clear() {
 	}
 
 	@Test
-	public void testLock() throws Exception {
+	void testLock() throws Exception {
 		for (int i = 0; i < 10; i++) {
 			Lock lock = this.registry.obtain("foo");
 			lock.lock();
@@ -102,7 +104,7 @@ public void testLock() throws Exception {
 	}
 
 	@Test
-	public void testLockInterruptibly() throws Exception {
+	void testLockInterruptibly() throws Exception {
 		for (int i = 0; i < 10; i++) {
 			Lock lock = this.registry.obtain("foo");
 			lock.lockInterruptibly();
@@ -116,7 +118,7 @@ public void testLockInterruptibly() throws Exception {
 	}
 
 	@Test
-	public void testReentrantLock() {
+	void testReentrantLock() {
 		for (int i = 0; i < 10; i++) {
 			Lock lock1 = this.registry.obtain("foo");
 			lock1.lock();
@@ -133,7 +135,7 @@ public void testReentrantLock() {
 	}
 
 	@Test
-	public void testReentrantLockInterruptibly() throws Exception {
+	void testReentrantLockInterruptibly() throws Exception {
 		for (int i = 0; i < 10; i++) {
 			Lock lock1 = this.registry.obtain("foo");
 			lock1.lockInterruptibly();
@@ -150,7 +152,7 @@ public void testReentrantLockInterruptibly() throws Exception {
 	}
 
 	@Test
-	public void testReentrantLockAfterExpiration() throws Exception {
+	void testReentrantLockAfterExpiration() throws Exception {
 		DefaultLockRepository client = new DefaultLockRepository(dataSource);
 		client.setTimeToLive(1);
 		client.setApplicationContext(this.context);
@@ -172,7 +174,7 @@ public void testReentrantLockAfterExpiration() throws Exception {
 	}
 
 	@Test
-	public void testTwoLocks() throws Exception {
+	void testTwoLocks() throws Exception {
 		for (int i = 0; i < 10; i++) {
 			Lock lock1 = this.registry.obtain("foo");
 			lock1.lockInterruptibly();
@@ -189,7 +191,7 @@ public void testTwoLocks() throws Exception {
 	}
 
 	@Test
-	public void testTwoThreadsSecondFailsToGetLock() throws Exception {
+	void testTwoThreadsSecondFailsToGetLock() throws Exception {
 		final Lock lock1 = this.registry.obtain("foo");
 		lock1.lockInterruptibly();
 		final AtomicBoolean locked = new AtomicBoolean();
@@ -215,7 +217,7 @@ public void testTwoThreadsSecondFailsToGetLock() throws Exception {
 	}
 
 	@Test
-	public void testTwoThreads() throws Exception {
+	void testTwoThreads() throws Exception {
 		final Lock lock1 = this.registry.obtain("foo");
 		final AtomicBoolean locked = new AtomicBoolean();
 		final CountDownLatch latch1 = new CountDownLatch(1);
@@ -247,7 +249,7 @@ public void testTwoThreads() throws Exception {
 	}
 
 	@Test
-	public void testTwoThreadsDifferentRegistries() throws Exception {
+	void testTwoThreadsDifferentRegistries() throws Exception {
 		for (int i = 0; i < 100; i++) {
 
 			final JdbcLockRegistry registry1 = new JdbcLockRegistry(this.client);
@@ -289,7 +291,7 @@ public void testTwoThreadsDifferentRegistries() throws Exception {
 	}
 
 	@Test
-	public void testTwoThreadsWrongOneUnlocks() throws Exception {
+	void testTwoThreadsWrongOneUnlocks() throws Exception {
 		final Lock lock = this.registry.obtain("foo");
 		lock.lockInterruptibly();
 		final AtomicBoolean locked = new AtomicBoolean();
@@ -314,7 +316,7 @@ public void testTwoThreadsWrongOneUnlocks() throws Exception {
 	}
 
 	@Test
-	public void testLockRenew() {
+	void testLockRenew() {
 		final Lock lock = this.registry.obtain("foo");
 
 		assertThat(lock.tryLock()).isTrue();
@@ -327,7 +329,7 @@ public void testLockRenew() {
 	}
 
 	@Test
-	public void testLockRenewLockNotOwned() {
+	void testLockRenewLockNotOwned() {
 		this.registry.obtain("foo");
 
 		assertThatExceptionOfType(IllegalMonitorStateException.class)
@@ -335,7 +337,7 @@ public void testLockRenewLockNotOwned() {
 	}
 
 	@Test
-	public void concurrentObtainCapacityTest() throws InterruptedException {
+	void concurrentObtainCapacityTest() throws InterruptedException {
 		final int KEY_CNT = 500;
 		final int CAPACITY_CNT = 179;
 		final int THREAD_CNT = 4;
@@ -371,7 +373,7 @@ public void concurrentObtainCapacityTest() throws InterruptedException {
 	}
 
 	@Test
-	public void concurrentObtainRemoveOrderTest() throws InterruptedException {
+	void concurrentObtainRemoveOrderTest() throws InterruptedException {
 		final int THREAD_CNT = 2;
 		final int DUMMY_LOCK_CNT = 3;
 
@@ -415,7 +417,7 @@ public void concurrentObtainRemoveOrderTest() throws InterruptedException {
 	}
 
 	@Test
-	public void concurrentObtainAccessRemoveOrderTest() throws InterruptedException {
+	void concurrentObtainAccessRemoveOrderTest() throws InterruptedException {
 		final int THREAD_CNT = 2;
 		final int DUMMY_LOCK_CNT = 3;
 
@@ -465,7 +467,7 @@ public void concurrentObtainAccessRemoveOrderTest() throws InterruptedException
 	}
 
 	@Test
-	public void setCapacityTest() {
+	void setCapacityTest() {
 		final int CAPACITY_CNT = 4;
 		registry.setCacheCapacity(CAPACITY_CNT);
 
@@ -506,6 +508,51 @@ void noTableThrowsExceptionOnStart() {
 		}
 	}
 
+	@Test
+	void testUnlockAfterLockStatusHasBeenExpiredAndLockHasBeenAcquiredByAnotherProcess() throws Exception {
+		int ttl = 100;
+		DefaultLockRepository client1 = new DefaultLockRepository(dataSource);
+		client1.setApplicationContext(this.context);
+		client1.setTimeToLive(ttl);
+		client1.afterPropertiesSet();
+		client1.afterSingletonsInstantiated();
+		DefaultLockRepository client2 = new DefaultLockRepository(dataSource);
+		client2.setApplicationContext(this.context);
+		client2.setTimeToLive(ttl);
+		client2.afterPropertiesSet();
+		client2.afterSingletonsInstantiated();
+		JdbcLockRegistry process1Registry = new JdbcLockRegistry(client1);
+		JdbcLockRegistry process2Registry = new JdbcLockRegistry(client2);
+		Lock lock1 = process1Registry.obtain("foo");
+		Lock lock2 = process2Registry.obtain("foo");
+
+		lock1.lock();
+		Thread.sleep(ttl);
+		assertThat(lock2.tryLock()).isTrue();
+
+		assertThatExceptionOfType(ConcurrentModificationException.class)
+				.isThrownBy(lock1::unlock);
+		lock2.unlock();
+	}
+
+	@Test
+	void testUnlockAfterLockStatusHasBeenExpiredAndDeleted() throws Exception {
+		DefaultLockRepository client = new DefaultLockRepository(dataSource);
+		client.setApplicationContext(this.context);
+		client.setTimeToLive(100);
+		client.afterPropertiesSet();
+		client.afterSingletonsInstantiated();
+		JdbcLockRegistry registry = new JdbcLockRegistry(client);
+		Lock lock = registry.obtain("foo");
+
+		lock.lock();
+		Thread.sleep(200);
+		client.deleteExpired();
+
+		assertThatExceptionOfType(ConcurrentModificationException.class)
+				.isThrownBy(lock::unlock);
+	}
+
 	@SuppressWarnings("unchecked")
 	private static Map<String, Lock> getRegistryLocks(JdbcLockRegistry registry) {
 		return TestUtils.getPropertyValue(registry, "locks", Map.class);
diff --git a/spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java b/spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java
index 7f73eebbc7c..1b420adea7f 100644
--- a/spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java
+++ b/spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java
@@ -18,6 +18,7 @@
 
 import java.text.SimpleDateFormat;
 import java.util.Collections;
+import java.util.ConcurrentModificationException;
 import java.util.Date;
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -500,12 +501,12 @@ private void removeLockKey() {
 					return;
 				}
 				else if (Boolean.FALSE.equals(unlinkResult)) {
-					throw new IllegalStateException("Lock was released in the store due to expiration. " +
+					throw new ConcurrentModificationException("Lock was released in the store due to expiration. " +
 							"The integrity of data protected by this lock may have been compromised.");
 				}
 			}
 			if (!removeLockKeyInnerDelete()) {
-				throw new IllegalStateException("Lock was released in the store due to expiration. " +
+				throw new ConcurrentModificationException("Lock was released in the store due to expiration. " +
 						"The integrity of data protected by this lock may have been compromised.");
 			}
 		}
diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/RedisLockRegistryTests.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/RedisLockRegistryTests.java
index a0874de6202..b04659b6d0d 100644
--- a/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/RedisLockRegistryTests.java
+++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/RedisLockRegistryTests.java
@@ -16,6 +16,7 @@
 
 package org.springframework.integration.redis.util;
 
+import java.util.ConcurrentModificationException;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
@@ -52,8 +53,8 @@
 import org.springframework.integration.test.util.TestUtils;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
 import static org.assertj.core.api.Assertions.assertThatNoException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.Mockito.mock;
 
 /**
@@ -64,6 +65,7 @@
  * @author Unseok Kim
  * @author Artem Vozhdayenko
  * @author Anton Gabov
+ * @author Eddie Cho
  *
  * @since 4.0
  *
@@ -115,6 +117,19 @@ void testLock(RedisLockType testRedisLockType) {
 		registry.destroy();
 	}
 
+	@ParameterizedTest
+	@EnumSource(RedisLockType.class)
+	void testUnlockAfterLockStatusHasBeenExpired(RedisLockType testRedisLockType) throws InterruptedException {
+		RedisLockRegistry registry = new RedisLockRegistry(redisConnectionFactory, this.registryKey, 100);
+		registry.setRedisLockType(testRedisLockType);
+		Lock lock = registry.obtain("foo");
+		lock.lock();
+		Thread.sleep(200);
+
+		assertThatThrownBy(lock::unlock).isInstanceOf(ConcurrentModificationException.class);
+		registry.destroy();
+	}
+
 	@ParameterizedTest
 	@EnumSource(RedisLockType.class)
 	void testLockInterruptibly(RedisLockType testRedisLockType) throws Exception {
@@ -398,9 +413,9 @@ void testExceptionOnExpire(RedisLockType testRedisLockType) throws Exception {
 		Lock lock1 = registry.obtain("foo");
 		assertThat(lock1.tryLock()).isTrue();
 		waitForExpire("foo");
-		assertThatIllegalStateException()
-				.isThrownBy(lock1::unlock)
-				.withMessageContaining("Lock was released in the store due to expiration.");
+		assertThatThrownBy(lock1::unlock)
+				.isInstanceOf(ConcurrentModificationException.class)
+				.hasMessageContaining("Lock was released in the store due to expiration.");
 		registry.destroy();
 	}
 
diff --git a/src/reference/antora/modules/ROOT/pages/jdbc/lock-registry.adoc b/src/reference/antora/modules/ROOT/pages/jdbc/lock-registry.adoc
index 1147b5263fb..067a3e6c66e 100644
--- a/src/reference/antora/modules/ROOT/pages/jdbc/lock-registry.adoc
+++ b/src/reference/antora/modules/ROOT/pages/jdbc/lock-registry.adoc
@@ -57,3 +57,5 @@ For example, an insert query for PostgreSQL hint can be configured like this:
 lockRepository.setInsertQuery(lockRepository.getInsertQuery() + " ON CONFLICT DO NOTHING");
 ----
 
+Starting with version 6.4, the `LockRepository.delete()` method return the result of removing ownership of a distributed lock.
+And the `JdbcLockRegistry.JdbcLock.unlock()` method throws `ConcurrentModificationException` if the ownership of the lock is expired.
\ No newline at end of file
diff --git a/src/reference/antora/modules/ROOT/pages/redis.adoc b/src/reference/antora/modules/ROOT/pages/redis.adoc
index 251d27026ef..545777fb361 100644
--- a/src/reference/antora/modules/ROOT/pages/redis.adoc
+++ b/src/reference/antora/modules/ROOT/pages/redis.adoc
@@ -855,3 +855,5 @@ Default.
 
 The pub-sub is preferred mode - less network chatter between client Redis server, and more performant - the lock is acquired immediately when subscription is notified about unlocking in the other process.
 However, the Redis does not support pub-sub in the Master/Replica connections (for example in AWS ElastiCache environment), therefore a busy-spin mode is chosen as a default to make the registry working in any environment.
+
+Starting with version 6.4, instead of throwing `IllegalStateException`, the `RedisLockRegistry.RedisLock.unlock()` method throws `ConcurrentModificationException` if the ownership of the lock is expired.
\ No newline at end of file
diff --git a/src/reference/antora/modules/ROOT/pages/whats-new.adoc b/src/reference/antora/modules/ROOT/pages/whats-new.adoc
index 9328aec8297..02dd1c76b81 100644
--- a/src/reference/antora/modules/ROOT/pages/whats-new.adoc
+++ b/src/reference/antora/modules/ROOT/pages/whats-new.adoc
@@ -32,7 +32,15 @@ The `LobHandler` (and respective API) has been deprecated for removal in Spring
 Respective option on `JdbcMessageStore` (and similar) have been deprecated as well.
 The byte array handling for serialized message is fully deferred to JDBC driver.
 
+The `LockRepository.delete()` method return the result of removing ownership of a distributed lock.
+And the `JdbcLockRegistry.JdbcLock.unlock()` method throws `ConcurrentModificationException` if the ownership of the lock is expired.
+
 [[x6.4-zeromq-changes]]
 === ZeroMQ Changes
 
-The outbound component `ZeroMqMessageHandler` (and respective API) can now bind a TCP port instead of connecting to a given URL.
\ No newline at end of file
+The outbound component `ZeroMqMessageHandler` (and respective API) can now bind a TCP port instead of connecting to a given URL.
+
+[[x6.4-redis-changes]]
+=== Redis Changes
+
+Instead of throwing `IllegalStateException`, the `RedisLockRegistry.RedisLock.unlock()` method throws `ConcurrentModificationException` if the ownership of the lock is expired.
\ No newline at end of file