Skip to content

Support Pub/Sub for static master/replica configuration. #2773

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/main/antora/modules/ROOT/pages/redis/pubsub.adoc
Original file line number Diff line number Diff line change
@@ -12,6 +12,8 @@ This is an example of the pattern often called Publish/Subscribe (Pub/Sub for sh

The `org.springframework.data.redis.connection` and `org.springframework.data.redis.listener` packages provide the core functionality for Redis messaging.

For static Master/Replica configuration, Pub/Sub operations always use the first node because messages are not replicated between nodes.

[[redis:pubsub:publish]]
== Publishing (Sending Messages)

Original file line number Diff line number Diff line change
@@ -28,11 +28,12 @@
* Configuration class used for setting up {@link RedisConnection} via {@link RedisConnectionFactory} using the provided
* Master / Replica configuration to nodes know to not change address. Eg. when connecting to
* <a href="https://aws.amazon.com/documentation/elasticache/">AWS ElastiCache with Read Replicas</a>. <br/>
* Please also note that a Master/Replica connection cannot be used for Pub/Sub operations.
* Please also note that Pub/Sub operations for Master/Replica always use the first node.
*
* @author Mark Paluch
* @author Christoph Strobl
* @author Tamer Soliman
* @author Krzysztof Debski
* @since 2.1
*/
public class RedisStaticMasterReplicaConfiguration implements RedisConfiguration, StaticMasterReplicaConfiguration {
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.masterreplica.MasterReplica;
import io.lettuce.core.masterreplica.StatefulRedisMasterReplicaConnection;
@@ -38,6 +39,7 @@
*
* @author Mark Paluch
* @author Christoph Strobl
* @author Krzysztof Debski
* @since 2.1
*/
class StaticMasterReplicaConnectionProvider implements LettuceConnectionProvider {
@@ -68,7 +70,8 @@ class StaticMasterReplicaConnectionProvider implements LettuceConnectionProvider
public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {

if (connectionType.equals(StatefulRedisPubSubConnection.class)) {
throw new UnsupportedOperationException("Pub/Sub connections not supported with Master/Replica configurations");

return connectionType.cast(client.connectPubSub(codec, getPubSubUri()));
}

if (StatefulConnection.class.isAssignableFrom(connectionType)) {
@@ -85,6 +88,12 @@ class StaticMasterReplicaConnectionProvider implements LettuceConnectionProvider
@Override
public <T extends StatefulConnection<?, ?>> CompletionStage<T> getConnectionAsync(Class<T> connectionType) {

if (connectionType.equals(StatefulRedisPubSubConnection.class)) {

return client.connectPubSubAsync(codec, getPubSubUri())
.thenApply(connectionType::cast);
}

if (StatefulConnection.class.isAssignableFrom(connectionType)) {

CompletableFuture<? extends StatefulRedisMasterReplicaConnection<?, ?>> connection = MasterReplica
@@ -99,4 +108,8 @@ class StaticMasterReplicaConnectionProvider implements LettuceConnectionProvider

throw new UnsupportedOperationException(String.format("Connection type %s not supported", connectionType));
}

private RedisURI getPubSubUri() {
return nodes.iterator().next();
}
}
Original file line number Diff line number Diff line change
@@ -59,6 +59,7 @@
* @author Thomas Darimont
* @author Christoph Strobl
* @author Mark Paluch
* @author Krzysztof Debski
*/
@ExtendWith(LettuceConnectionFactoryExtension.class)
class LettuceConnectionFactoryTests {
@@ -427,7 +428,7 @@ void factoryUsesElastiCacheMasterReplicaConnections() {
}

@Test // DATAREDIS-1093
void pubSubDoesNotSupportMasterReplicaConnections() {
void factoryConnectionSupportsSubscriptionForMasterReplicaConnections() {

assumeTrue(String.format("No replicas connected to %s:%s.", SettingsUtils.getHost(), SettingsUtils.getPort()),
connection.info("replication").getProperty("connected_slaves", "0").compareTo("0") > 0);
@@ -441,10 +442,13 @@ void pubSubDoesNotSupportMasterReplicaConnections() {

RedisConnection connection = factory.getConnection();

assertThatThrownBy(() -> connection.pSubscribe((message, pattern) -> {}, "foo".getBytes()))
.isInstanceOf(RedisConnectionFailureException.class).hasCauseInstanceOf(UnsupportedOperationException.class);
try {
connection.pSubscribe((message, pattern) -> {}, "foo".getBytes());
assertThat(connection.isSubscribed()).isTrue();
} finally {
connection.close();
}

connection.close();
factory.destroy();
}