Skip to content

Commit 33682d7

Browse files
committed
ReactiveMessageChannel and ReactiveSubscribableChannel
See gh-21987
1 parent ceccd9f commit 33682d7

File tree

3 files changed

+182
-0
lines changed

3 files changed

+182
-0
lines changed
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2002-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.messaging;
17+
18+
import reactor.core.publisher.Mono;
19+
20+
/**
21+
* Contract for reactive, non-blocking sending of messages.
22+
*
23+
* @author Rossen Stoyanchev
24+
* @since 5.2
25+
*/
26+
public interface ReactiveMessageChannel {
27+
28+
/**
29+
* Send a {@link Message} to this channel. If the message is sent
30+
* successfully, return {@code true}. Or if not sent due to a non-fatal
31+
* reason, return {@code false}.
32+
* @param message the message to send
33+
* @return completion {@link Mono} returning {@code true} on success,
34+
* {@code false} if not sent, or an error signal.
35+
*/
36+
Mono<Boolean> send(Message<?> message);
37+
38+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright 2002-2013 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.messaging;
18+
19+
/**
20+
* {@link MessageChannel} that maintains a registry of subscribers to handle
21+
* messages sent through this channel.
22+
*
23+
* @author Rossen Stoyanchev
24+
* @since 5.2
25+
*/
26+
public interface ReactiveSubscribableChannel extends ReactiveMessageChannel {
27+
28+
/**
29+
* Register a message handler.
30+
* @return {@code true} if the handler was subscribed or {@code false} if it
31+
* was already subscribed.
32+
*/
33+
boolean subscribe(ReactiveMessageHandler handler);
34+
35+
/**
36+
* Un-register a message handler.
37+
* @return {@code true} if the handler was un-registered, or {@code false}
38+
* if was not registered.
39+
*/
40+
boolean unsubscribe(ReactiveMessageHandler handler);
41+
42+
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Copyright 2002-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.messaging.support;
17+
18+
import java.util.Set;
19+
import java.util.concurrent.CopyOnWriteArraySet;
20+
21+
import org.apache.commons.logging.Log;
22+
import org.apache.commons.logging.LogFactory;
23+
import reactor.core.publisher.Flux;
24+
import reactor.core.publisher.Mono;
25+
26+
import org.springframework.beans.factory.BeanNameAware;
27+
import org.springframework.messaging.Message;
28+
import org.springframework.messaging.ReactiveMessageHandler;
29+
import org.springframework.messaging.ReactiveSubscribableChannel;
30+
import org.springframework.util.ObjectUtils;
31+
32+
/**
33+
* Default implementation of {@link ReactiveSubscribableChannel}.
34+
*
35+
* @author Rossen Stoyanchev
36+
* @since 5.2
37+
*/
38+
public class DefaultReactiveMessageChannel implements ReactiveSubscribableChannel, BeanNameAware {
39+
40+
private static final Mono<Boolean> SUCCESS_RESULT = Mono.just(true);
41+
42+
private static Log logger = LogFactory.getLog(DefaultReactiveMessageChannel.class);
43+
44+
45+
private final Set<ReactiveMessageHandler> handlers = new CopyOnWriteArraySet<>();
46+
47+
private String beanName;
48+
49+
50+
public DefaultReactiveMessageChannel() {
51+
this.beanName = getClass().getSimpleName() + "@" + ObjectUtils.getIdentityHexString(this);
52+
}
53+
54+
55+
/**
56+
* A message channel uses the bean name primarily for logging purposes.
57+
*/
58+
@Override
59+
public void setBeanName(String name) {
60+
this.beanName = name;
61+
}
62+
63+
/**
64+
* Return the bean name for this message channel.
65+
*/
66+
public String getBeanName() {
67+
return this.beanName;
68+
}
69+
70+
71+
@Override
72+
public boolean subscribe(ReactiveMessageHandler handler) {
73+
boolean result = this.handlers.add(handler);
74+
if (result) {
75+
if (logger.isDebugEnabled()) {
76+
logger.debug(getBeanName() + " added " + handler);
77+
}
78+
}
79+
return result;
80+
}
81+
82+
83+
@Override
84+
public boolean unsubscribe(ReactiveMessageHandler handler) {
85+
boolean result = this.handlers.remove(handler);
86+
if (result) {
87+
if (logger.isDebugEnabled()) {
88+
logger.debug(getBeanName() + " removed " + handler);
89+
}
90+
}
91+
return result;
92+
}
93+
94+
95+
@Override
96+
public Mono<Boolean> send(Message<?> message) {
97+
return Flux.fromIterable(this.handlers)
98+
.concatMap(handler -> handler.handleMessage(message))
99+
.then(SUCCESS_RESULT);
100+
}
101+
102+
}

0 commit comments

Comments
 (0)