Skip to content

Commit 55e8be4

Browse files
authored
[DE-528] non-blocking acquireHostList (#521)
* HostResolver refactoring * HostResolver refactoring * HostResolver refactoring * HostResolver refactoring * HostResolver refactoring * HostResolver refactoring * reverted acquireHostList=true in tests * VST outgoing executor (DE-696)
1 parent b73668e commit 55e8be4

24 files changed

+104
-181
lines changed

core/src/main/java/com/arangodb/ArangoDB.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import com.arangodb.entity.*;
2424
import com.arangodb.internal.ArangoDBImpl;
25+
import com.arangodb.internal.ArangoExecutorSync;
2526
import com.arangodb.internal.InternalArangoDBBuilder;
2627
import com.arangodb.internal.net.*;
2728
import com.arangodb.model.*;
@@ -363,10 +364,13 @@ public ArangoDB build() {
363364
HostHandler hostHandler = createHostHandler(hostResolver);
364365
hostHandler.setJwt(config.getJwt());
365366

367+
CommunicationProtocol protocol = protocolProvider.createProtocol(config, hostHandler);
368+
ArangoExecutorSync executor = new ArangoExecutorSync(protocol, config);
369+
hostResolver.init(executor, config.getInternalSerde());
370+
366371
return new ArangoDBImpl(
367372
config,
368-
hostResolver,
369-
protocolProvider,
373+
protocol,
370374
hostHandler
371375
);
372376
}

core/src/main/java/com/arangodb/ArangoDBException.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public ArangoDBException(final ErrorEntity errorEntity) {
3939
super(String.format("Response: %s, Error: %s - %s", errorEntity.getCode(), errorEntity.getErrorNum(),
4040
errorEntity.getErrorMessage()));
4141
this.entity = errorEntity;
42-
this.responseCode = null;
42+
this.responseCode = errorEntity.getCode();
4343
this.requestId = null;
4444
}
4545

@@ -117,6 +117,9 @@ public static ArangoDBException of(Throwable t, Long requestId) {
117117

118118
private static ArangoDBException of(String message, Throwable t, Long requestId) {
119119
Objects.requireNonNull(t);
120+
if (t instanceof CompletionException) {
121+
return of(message, t.getCause(), requestId);
122+
}
120123
Throwable cause = unwrapCause(t);
121124
String msg = message != null ? message
122125
: t.getMessage() != null ? t.getMessage()
@@ -141,7 +144,7 @@ private static ArangoDBException of(String message, Throwable t, Long requestId)
141144
}
142145

143146
private static Throwable unwrapCause(Throwable t) {
144-
if (t instanceof ArangoDBException || t instanceof CompletionException) {
147+
if (t instanceof ArangoDBException) {
145148
return unwrapCause(t.getCause());
146149
}
147150
return t;

core/src/main/java/com/arangodb/internal/ArangoDBImpl.java

+3-5
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,8 @@
2323
import com.arangodb.*;
2424
import com.arangodb.entity.*;
2525
import com.arangodb.internal.config.ArangoConfig;
26+
import com.arangodb.internal.net.CommunicationProtocol;
2627
import com.arangodb.internal.net.HostHandler;
27-
import com.arangodb.internal.net.HostResolver;
28-
import com.arangodb.internal.net.ProtocolProvider;
2928
import com.arangodb.internal.serde.SerdeUtils;
3029
import com.arangodb.model.*;
3130
import org.slf4j.Logger;
@@ -44,11 +43,10 @@ public class ArangoDBImpl extends InternalArangoDB implements ArangoDB {
4443
private final HostHandler hostHandler;
4544

4645
public ArangoDBImpl(final ArangoConfig config,
47-
final HostResolver hostResolver, final ProtocolProvider protocolProvider,
46+
final CommunicationProtocol protocol,
4847
final HostHandler hostHandler) {
49-
super(protocolProvider.createProtocol(config, hostHandler), config, config.getInternalSerde());
48+
super(protocol, config);
5049
this.hostHandler = hostHandler;
51-
hostResolver.init(executorSync(), getSerde());
5250
LOGGER.debug("ArangoDB Client is ready to use");
5351
}
5452

core/src/main/java/com/arangodb/internal/ArangoExecuteable.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,8 @@ public abstract class ArangoExecuteable implements ArangoSerdeAccessor {
3737
private final ArangoExecutorAsync executorAsync;
3838
private final InternalSerde serde;
3939

40-
protected ArangoExecuteable(final CommunicationProtocol protocol,
41-
final ArangoConfig config,
42-
final InternalSerde serde) {
43-
this(new ArangoExecutorSync(protocol, config), new ArangoExecutorAsync(protocol, config), serde);
40+
protected ArangoExecuteable(final CommunicationProtocol protocol, final ArangoConfig config) {
41+
this(new ArangoExecutorSync(protocol, config), new ArangoExecutorAsync(protocol, config), config.getInternalSerde());
4442
}
4543

4644
protected ArangoExecuteable(final ArangoExecuteable other) {

core/src/main/java/com/arangodb/internal/InternalArangoDB.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ public abstract class InternalArangoDB extends ArangoExecuteable {
5050
private static final String PATH_API_USER = "/_api/user";
5151
private static final String PATH_API_QUERY_RULES = "/_api/query/rules";
5252

53-
protected InternalArangoDB(final CommunicationProtocol protocol, final ArangoConfig config, final InternalSerde util) {
54-
super(protocol, config, util);
53+
protected InternalArangoDB(final CommunicationProtocol protocol, final ArangoConfig config) {
54+
super(protocol, config);
5555
}
5656

5757
protected InternalArangoDB(final ArangoExecuteable other) {

core/src/main/java/com/arangodb/internal/InternalArangoDBBuilder.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -337,10 +337,9 @@ protected HostResolver createHostResolver(final Collection<Host> hosts, final Co
337337
LOG.debug("Use SimpleHostResolver");
338338
return new SimpleHostResolver(new ArrayList<>(hosts));
339339
}
340-
341340
}
342341

343-
protected <C extends Connection> Collection<Host> createHostList(final ConnectionFactory connectionFactory) {
342+
protected Collection<Host> createHostList(final ConnectionFactory connectionFactory) {
344343
final Collection<Host> hostList = new ArrayList<>();
345344
for (final HostDescription host : config.getHosts()) {
346345
hostList.add(HostUtils.createHost(host, config, connectionFactory));

core/src/main/java/com/arangodb/internal/net/DirtyReadHostHandler.java

-15
Original file line numberDiff line numberDiff line change
@@ -72,27 +72,12 @@ public void reset() {
7272
determineHostHandler().reset();
7373
}
7474

75-
@Override
76-
public void confirm() {
77-
determineHostHandler().confirm();
78-
}
79-
8075
@Override
8176
public void close() throws IOException {
8277
master.close();
8378
follower.close();
8479
}
8580

86-
@Override
87-
public void closeCurrentOnError() {
88-
determineHostHandler().closeCurrentOnError();
89-
}
90-
91-
@Override
92-
public void closeCurrentOnErrorIfNotMatch(HostDescription host) {
93-
determineHostHandler().closeCurrentOnErrorIfNotMatch(host);
94-
}
95-
9681
@Override
9782
public void setJwt(String jwt) {
9883
master.setJwt(jwt);

core/src/main/java/com/arangodb/internal/net/ExtendedHostResolver.java

+23-20
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@
3333
import org.slf4j.LoggerFactory;
3434

3535
import java.util.*;
36+
import java.util.concurrent.Executors;
37+
import java.util.concurrent.ScheduledExecutorService;
38+
import java.util.concurrent.ScheduledFuture;
39+
import java.util.concurrent.TimeUnit;
3640

3741
import static com.arangodb.internal.serde.SerdeUtils.constructParametricType;
3842

@@ -48,10 +52,10 @@ public class ExtendedHostResolver implements HostResolver {
4852
private final ArangoConfig config;
4953
private final ConnectionFactory connectionFactory;
5054
private final Integer acquireHostListInterval;
51-
private long lastUpdate;
55+
private final ScheduledExecutorService scheduler;
5256
private ArangoExecutorSync executor;
5357
private InternalSerde arangoSerialization;
54-
58+
private ScheduledFuture<?> schedule;
5559

5660
public ExtendedHostResolver(final List<Host> hosts, final ArangoConfig config,
5761
final ConnectionFactory connectionFactory, Integer acquireHostListInterval) {
@@ -61,22 +65,34 @@ public ExtendedHostResolver(final List<Host> hosts, final ArangoConfig config,
6165
this.config = config;
6266
this.connectionFactory = connectionFactory;
6367

64-
lastUpdate = 0;
68+
scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
69+
Thread t = Executors.defaultThreadFactory().newThread(r);
70+
t.setDaemon(true);
71+
return t;
72+
}
73+
);
6574
}
6675

6776
@Override
6877
public void init(ArangoExecutorSync executor, InternalSerde arangoSerialization) {
6978
this.executor = executor;
7079
this.arangoSerialization = arangoSerialization;
80+
resolve();
81+
schedule = scheduler.scheduleAtFixedRate(this::resolve, acquireHostListInterval, acquireHostListInterval, TimeUnit.MILLISECONDS);
7182
}
7283

7384
@Override
74-
public HostSet resolve(boolean initial, boolean closeConnections) {
75-
76-
if (!initial && isExpired()) {
85+
public void close() {
86+
schedule.cancel(false);
87+
scheduler.shutdown();
88+
}
7789

78-
lastUpdate = System.currentTimeMillis();
90+
@Override
91+
public HostSet getHosts() {
92+
return hosts;
93+
}
7994

95+
private void resolve() {
8096
final Collection<String> endpoints = resolveFromServer();
8197
if (LOGGER.isDebugEnabled()) {
8298
LOGGER.debug("Resolve {} Endpoints", endpoints.size());
@@ -110,17 +126,11 @@ public HostSet resolve(boolean initial, boolean closeConnections) {
110126
}
111127
}
112128
hosts.clearAllMarkedForDeletion();
113-
}
114-
115-
return hosts;
116129
}
117130

118131
private Collection<String> resolveFromServer() {
119-
120132
Collection<String> response;
121-
122133
try {
123-
124134
response = executor.execute(
125135
new InternalRequest(ArangoRequestParam.SYSTEM, RequestType.GET, "/_api/cluster/endpoints"),
126136
response1 -> {
@@ -136,7 +146,6 @@ private Collection<String> resolveFromServer() {
136146
}, null);
137147
} catch (final ArangoDBException e) {
138148
final Integer responseCode = e.getResponseCode();
139-
140149
// responseCode == 403: single server < 3.7
141150
// responseCode == 501: single server >= 3.7
142151
if (responseCode != null && (responseCode == 403 || responseCode == 501)) {
@@ -145,12 +154,6 @@ private Collection<String> resolveFromServer() {
145154
throw e;
146155
}
147156
}
148-
149157
return response;
150158
}
151-
152-
private boolean isExpired() {
153-
return System.currentTimeMillis() > (lastUpdate + acquireHostListInterval);
154-
}
155-
156159
}

core/src/main/java/com/arangodb/internal/net/FallbackHostHandler.java

+3-25
Original file line numberDiff line numberDiff line change
@@ -37,16 +37,14 @@ public class FallbackHostHandler implements HostHandler {
3737
private Host current;
3838
private Host lastSuccess;
3939
private int iterations;
40-
private boolean firstOpened;
4140
private HostSet hosts;
4241

4342
public FallbackHostHandler(final HostResolver resolver) {
4443
this.resolver = resolver;
4544
lastFailExceptions = new ArrayList<>();
4645
reset();
47-
hosts = resolver.resolve(true, false);
46+
hosts = resolver.getHosts();
4847
current = lastSuccess = hosts.getHostsList().get(0);
49-
firstOpened = true;
5048
}
5149

5250
@Override
@@ -69,7 +67,7 @@ public void success() {
6967

7068
@Override
7169
public void fail(Exception exception) {
72-
hosts = resolver.resolve(false, false);
70+
hosts = resolver.getHosts();
7371
final List<Host> hostList = hosts.getHostsList();
7472
final int index = hostList.indexOf(current) + 1;
7573
final boolean inBound = index < hostList.size();
@@ -93,30 +91,10 @@ public void reset() {
9391
lastFailExceptions.clear();
9492
}
9593

96-
@Override
97-
public void confirm() {
98-
if (firstOpened) {
99-
// after first successful established connection, update host list
100-
hosts = resolver.resolve(false, false);
101-
firstOpened = false;
102-
}
103-
}
104-
10594
@Override
10695
public void close() {
10796
hosts.close();
108-
}
109-
110-
@Override
111-
public void closeCurrentOnError() {
112-
current.closeOnError();
113-
}
114-
115-
@Override
116-
public synchronized void closeCurrentOnErrorIfNotMatch(HostDescription host) {
117-
if (!host.equals(current.getDescription())) {
118-
closeCurrentOnError();
119-
}
97+
resolver.close();
12098
}
12199

122100
@Override

core/src/main/java/com/arangodb/internal/net/HostHandler.java

-6
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,8 @@ public interface HostHandler {
3939

4040
void reset();
4141

42-
void confirm();
43-
4442
void close() throws IOException;
4543

46-
void closeCurrentOnError();
47-
48-
void closeCurrentOnErrorIfNotMatch(HostDescription host);
49-
5044
void setJwt(String jwt);
5145

5246
}

core/src/main/java/com/arangodb/internal/net/HostResolver.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,12 @@
2828
*/
2929
public interface HostResolver {
3030

31-
void init(ArangoExecutorSync executorSync, InternalSerde arangoSerialization);
31+
default void init(ArangoExecutorSync executorSync, InternalSerde arangoSerialization) {
32+
}
3233

33-
HostSet resolve(boolean initial, boolean closeConnections);
34+
default void close() {
35+
}
36+
37+
HostSet getHosts();
3438

3539
}

core/src/main/java/com/arangodb/internal/net/RandomHostHandler.java

+6-20
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,15 @@ public RandomHostHandler(final HostResolver resolver, final HostHandler fallback
3939
super();
4040
this.resolver = resolver;
4141
this.fallback = fallback;
42-
current = getRandomHost(true, false);
42+
hosts = resolver.getHosts();
43+
current = getRandomHost();
4344
}
4445

4546
@Override
4647
public Host get(final HostHandle hostHandle, AccessType accessType) {
4748
if (current == null) {
48-
current = getRandomHost(false, true);
49+
hosts = resolver.getHosts();
50+
current = getRandomHost();
4951
}
5052
return current;
5153
}
@@ -68,8 +70,7 @@ public synchronized void failIfNotMatch(HostDescription host, Exception exceptio
6870
}
6971
}
7072

71-
private Host getRandomHost(final boolean initial, final boolean closeConnections) {
72-
hosts = resolver.resolve(initial, closeConnections);
73+
private Host getRandomHost() {
7374
final ArrayList<Host> hostList = new ArrayList<>(hosts.getHostsList());
7475
Collections.shuffle(hostList);
7576
return hostList.get(0);
@@ -80,25 +81,10 @@ public void reset() {
8081
fallback.reset();
8182
}
8283

83-
@Override
84-
public void confirm() {
85-
}
86-
8784
@Override
8885
public void close() {
8986
hosts.close();
90-
}
91-
92-
@Override
93-
public void closeCurrentOnError() {
94-
current.closeOnError();
95-
}
96-
97-
@Override
98-
public synchronized void closeCurrentOnErrorIfNotMatch(HostDescription host) {
99-
if (!host.equals(current.getDescription())) {
100-
closeCurrentOnError();
101-
}
87+
resolver.close();
10288
}
10389

10490
@Override

0 commit comments

Comments
 (0)