Skip to content

Shutdown executor and provide new endpoint constructor #388

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 4, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -16,4 +16,8 @@ public String getProtocol() {
}

public abstract Consumer<String> createConsumer(SubscriptionSession session);

public void shutdown() {
// do nothing
}
}
Original file line number Diff line number Diff line change
@@ -20,4 +20,8 @@ default void onStop(SubscriptionSession session, OperationMessage message) {
default void onTerminate(SubscriptionSession session, OperationMessage message) {
// do nothing
}
}

default void shutdown() {
// do nothing
}
}
Original file line number Diff line number Diff line change
@@ -61,4 +61,8 @@ void abort(SubscriptionSession session) {
future.cancel(true);
}
}

void shutdown() {
this.executor.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@ public class ApolloSubscriptionProtocolFactory extends SubscriptionProtocolFacto
public static final int KEEP_ALIVE_INTERVAL = 15;
@Getter private final GraphQLObjectMapper objectMapper;
private final ApolloCommandProvider commandProvider;
private KeepAliveSubscriptionConnectionListener keepAlive;

public ApolloSubscriptionProtocolFactory(
GraphQLObjectMapper objectMapper,
@@ -67,7 +68,8 @@ public ApolloSubscriptionProtocolFactory(
if (keepAliveInterval != null
&& listeners.stream()
.noneMatch(KeepAliveSubscriptionConnectionListener.class::isInstance)) {
listeners.add(new KeepAliveSubscriptionConnectionListener(keepAliveInterval));
keepAlive = new KeepAliveSubscriptionConnectionListener(keepAliveInterval);
listeners.add(keepAlive);
}
commandProvider =
new ApolloCommandProvider(
@@ -81,4 +83,11 @@ public ApolloSubscriptionProtocolFactory(
public Consumer<String> createConsumer(SubscriptionSession session) {
return new ApolloSubscriptionConsumer(session, objectMapper, commandProvider);
}

@Override
public void shutdown() {
if (keepAlive != null) {
keepAlive.shutdown();
}
}
}
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@
public class KeepAliveSubscriptionConnectionListener
implements ApolloSubscriptionConnectionListener {

private final ApolloSubscriptionKeepAliveRunner keepAliveRunner;
protected final ApolloSubscriptionKeepAliveRunner keepAliveRunner;

public KeepAliveSubscriptionConnectionListener() {
this(Duration.ofSeconds(15));
@@ -35,4 +35,10 @@ public void onStop(SubscriptionSession session, OperationMessage message) {
public void onTerminate(SubscriptionSession session, OperationMessage message) {
keepAliveRunner.abort(session);
}

@Override
public void shutdown() {
keepAliveRunner.shutdown();
}

}
Original file line number Diff line number Diff line change
@@ -116,6 +116,24 @@ public GraphQLWebsocketServlet(
.collect(toList());
}

public GraphQLWebsocketServlet(
GraphQLInvoker graphQLInvoker,
GraphQLSubscriptionInvocationInputFactory invocationInputFactory,
GraphQLObjectMapper graphQLObjectMapper,
List<SubscriptionProtocolFactory> subscriptionProtocolFactory,
SubscriptionProtocolFactory fallbackSubscriptionProtocolFactory) {

this.subscriptionProtocolFactories = subscriptionProtocolFactory;
this.fallbackSubscriptionProtocolFactory = fallbackSubscriptionProtocolFactory;

allSubscriptionProtocols =
Stream.concat(
subscriptionProtocolFactories.stream(),
Stream.of(fallbackSubscriptionProtocolFactory))
.map(SubscriptionProtocolFactory::getProtocol)
.collect(toList());
}

@Override
public void onOpen(Session session, EndpointConfig endpointConfig) {
final WebSocketSubscriptionProtocolFactory subscriptionProtocolFactory =
@@ -234,6 +252,12 @@ public void beginShutDown() {
log.error("GraphQLWebsocketServlet did not shut down cleanly!");
sessionSubscriptionCache.clear();
}

for (SubscriptionProtocolFactory protocolFactory : subscriptionProtocolFactories) {
protocolFactory.shutdown();
}

fallbackSubscriptionProtocolFactory.shutdown();
}

isShutDown.set(true);