diff --git a/docker/start_db.sh b/docker/start_db.sh index 02f48db2e..9c01447d0 100755 --- a/docker/start_db.sh +++ b/docker/start_db.sh @@ -96,3 +96,9 @@ for a in ${COORDINATORS[*]} ; do echo "$SCHEME://$a" echo "" done + +if [ "$STARTER_MODE" == "activefailover" ]; then + LEADER=$("$LOCATION"/find_active_endpoint.sh) + echo "Leader: $SCHEME://$LEADER" + echo "" +fi diff --git a/src/main/java/com/arangodb/async/internal/velocystream/VstCommunicationAsync.java b/src/main/java/com/arangodb/async/internal/velocystream/VstCommunicationAsync.java index f07eeba2a..38acb45be 100644 --- a/src/main/java/com/arangodb/async/internal/velocystream/VstCommunicationAsync.java +++ b/src/main/java/com/arangodb/async/internal/velocystream/VstCommunicationAsync.java @@ -85,8 +85,7 @@ protected CompletableFuture execute(final Request request, final VstCo } final String location = e.getLocation(); final HostDescription redirectHost = HostUtils.createFromLocation(location); - hostHandler.closeCurrentOnError(); - hostHandler.fail(e); + hostHandler.failIfNotMatch(redirectHost, e); execute(request, new HostHandle().setHost(redirectHost), attemptCount + 1) .whenComplete((v, err) -> { if (v != null) { diff --git a/src/main/java/com/arangodb/internal/http/HttpCommunication.java b/src/main/java/com/arangodb/internal/http/HttpCommunication.java index c40106b8a..5468bbbbb 100644 --- a/src/main/java/com/arangodb/internal/http/HttpCommunication.java +++ b/src/main/java/com/arangodb/internal/http/HttpCommunication.java @@ -106,8 +106,7 @@ private Response execute(final Request request, final HostHandle hostHandle, fin if (e instanceof ArangoDBRedirectException && attemptCount < 3) { final String location = ((ArangoDBRedirectException) e).getLocation(); final HostDescription redirectHost = HostUtils.createFromLocation(location); - hostHandler.closeCurrentOnError(); - hostHandler.fail(e); + hostHandler.failIfNotMatch(redirectHost, e); return execute(request, new HostHandle().setHost(redirectHost), attemptCount + 1); } else { throw e; diff --git a/src/main/java/com/arangodb/internal/net/DirtyReadHostHandler.java b/src/main/java/com/arangodb/internal/net/DirtyReadHostHandler.java index d8e3f0033..db3879d80 100644 --- a/src/main/java/com/arangodb/internal/net/DirtyReadHostHandler.java +++ b/src/main/java/com/arangodb/internal/net/DirtyReadHostHandler.java @@ -60,6 +60,11 @@ public void fail(Exception exception) { determineHostHandler().fail(exception); } + @Override + public void failIfNotMatch(HostDescription host, Exception exception) { + determineHostHandler().failIfNotMatch(host, exception); + } + @Override public void reset() { determineHostHandler().reset(); @@ -81,6 +86,11 @@ public void closeCurrentOnError() { determineHostHandler().closeCurrentOnError(); } + @Override + public void closeCurrentOnErrorIfNotMatch(HostDescription host) { + determineHostHandler().closeCurrentOnErrorIfNotMatch(host); + } + @Override public void setJwt(String jwt) { master.setJwt(jwt); diff --git a/src/main/java/com/arangodb/internal/net/FallbackHostHandler.java b/src/main/java/com/arangodb/internal/net/FallbackHostHandler.java index a1427d426..241a1b270 100644 --- a/src/main/java/com/arangodb/internal/net/FallbackHostHandler.java +++ b/src/main/java/com/arangodb/internal/net/FallbackHostHandler.java @@ -79,6 +79,13 @@ public void fail(Exception exception) { lastFailExceptions.add(exception); } + @Override + public synchronized void failIfNotMatch(HostDescription host, Exception exception) { + if (!host.equals(current.getDescription())) { + fail(exception); + } + } + @Override public void reset() { iterations = 0; @@ -104,6 +111,13 @@ public void closeCurrentOnError() { current.closeOnError(); } + @Override + public synchronized void closeCurrentOnErrorIfNotMatch(HostDescription host) { + if (!host.equals(current.getDescription())) { + closeCurrentOnError(); + } + } + @Override public void setJwt(String jwt) { hosts.setJwt(jwt); diff --git a/src/main/java/com/arangodb/internal/net/HostHandler.java b/src/main/java/com/arangodb/internal/net/HostHandler.java index 42c1e7570..1a3ad8d92 100644 --- a/src/main/java/com/arangodb/internal/net/HostHandler.java +++ b/src/main/java/com/arangodb/internal/net/HostHandler.java @@ -33,6 +33,8 @@ public interface HostHandler { void fail(Exception exception); + void failIfNotMatch(HostDescription host, Exception exception); + void reset(); void confirm(); @@ -41,6 +43,8 @@ public interface HostHandler { void closeCurrentOnError(); + void closeCurrentOnErrorIfNotMatch(HostDescription host); + void setJwt(String jwt); } diff --git a/src/main/java/com/arangodb/internal/net/RandomHostHandler.java b/src/main/java/com/arangodb/internal/net/RandomHostHandler.java index 3ab1ce7da..7e7a4464b 100644 --- a/src/main/java/com/arangodb/internal/net/RandomHostHandler.java +++ b/src/main/java/com/arangodb/internal/net/RandomHostHandler.java @@ -59,6 +59,13 @@ public void fail(Exception exception) { current = fallback.get(null, null); } + @Override + public synchronized void failIfNotMatch(HostDescription host, Exception exception) { + if (!host.equals(current.getDescription())) { + fail(exception); + } + } + private Host getRandomHost(final boolean initial, final boolean closeConnections) { hosts = resolver.resolve(initial, closeConnections); final ArrayList hostList = new ArrayList<>(hosts.getHostsList()); @@ -85,6 +92,13 @@ public void closeCurrentOnError() { current.closeOnError(); } + @Override + public synchronized void closeCurrentOnErrorIfNotMatch(HostDescription host) { + if (!host.equals(current.getDescription())) { + closeCurrentOnError(); + } + } + @Override public void setJwt(String jwt) { fallback.setJwt(jwt); diff --git a/src/main/java/com/arangodb/internal/net/RoundRobinHostHandler.java b/src/main/java/com/arangodb/internal/net/RoundRobinHostHandler.java index 99f3c24eb..b79c85c14 100644 --- a/src/main/java/com/arangodb/internal/net/RoundRobinHostHandler.java +++ b/src/main/java/com/arangodb/internal/net/RoundRobinHostHandler.java @@ -89,6 +89,11 @@ public void fail(Exception exception) { lastFailExceptions.add(exception); } + @Override + public void failIfNotMatch(HostDescription host, Exception exception) { + fail(exception); + } + @Override public void reset() { fails = 0; @@ -109,6 +114,11 @@ public void closeCurrentOnError() { currentHost.closeOnError(); } + @Override + public void closeCurrentOnErrorIfNotMatch(HostDescription host) { + closeCurrentOnError(); + } + @Override public void setJwt(String jwt) { hosts.setJwt(jwt); diff --git a/src/main/java/com/arangodb/internal/velocystream/VstCommunicationSync.java b/src/main/java/com/arangodb/internal/velocystream/VstCommunicationSync.java index 59a03a551..e01e22bc2 100644 --- a/src/main/java/com/arangodb/internal/velocystream/VstCommunicationSync.java +++ b/src/main/java/com/arangodb/internal/velocystream/VstCommunicationSync.java @@ -151,8 +151,7 @@ protected Response execute(final Request request, final VstConnectionSync connec } final String location = e.getLocation(); final HostDescription redirectHost = HostUtils.createFromLocation(location); - hostHandler.closeCurrentOnError(); - hostHandler.fail(e); + hostHandler.failIfNotMatch(redirectHost, e); return execute(request, new HostHandle().setHost(redirectHost), attemptCount + 1); } } diff --git a/src/test/java/com/arangodb/ArangoDBTest.java b/src/test/java/com/arangodb/ArangoDBTest.java index ca05e060f..ae951f34f 100644 --- a/src/test/java/com/arangodb/ArangoDBTest.java +++ b/src/test/java/com/arangodb/ArangoDBTest.java @@ -44,6 +44,9 @@ import java.util.Map; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -725,13 +728,15 @@ public void accessMultipleDatabases() { } @Test - public void queueTime() throws InterruptedException { - List threads = IntStream.range(0, 80) - .mapToObj(__ -> new Thread(() -> arangoDB.db().query("RETURN SLEEP(1)", Void.class))) + public void queueTime() throws InterruptedException, ExecutionException { + List> futures = IntStream.range(0, 80) + .mapToObj(i -> CompletableFuture.runAsync( + () -> arangoDB.db().query("RETURN SLEEP(1)", Void.class), + Executors.newFixedThreadPool(80)) + ) .collect(Collectors.toList()); - threads.forEach(Thread::start); - for (Thread it : threads) { - it.join(); + for (CompletableFuture f : futures) { + f.get(); } QueueTimeMetrics qt = arangoDB.metrics().getQueueTime(); diff --git a/src/test/java/com/arangodb/ConcurrencyTests.java b/src/test/java/com/arangodb/ConcurrencyTests.java new file mode 100644 index 000000000..62645cb82 --- /dev/null +++ b/src/test/java/com/arangodb/ConcurrencyTests.java @@ -0,0 +1,43 @@ +package com.arangodb; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +@RunWith(Parameterized.class) +public class ConcurrencyTests { + + final Protocol protocol; + + public ConcurrencyTests(Protocol protocol) { + this.protocol = protocol; + } + + @Parameterized.Parameters + public static Protocol[] protocols() { + return Protocol.values(); + } + + @Test + public void concurrentPendingRequests() throws ExecutionException, InterruptedException { + ArangoDB adb = new ArangoDB.Builder().useProtocol(protocol).build(); + List> futures = IntStream.range(0, 10) + .mapToObj(i -> CompletableFuture.runAsync( + () -> adb.db().query("RETURN SLEEP(1)", Void.class), + Executors.newFixedThreadPool(10)) + ) + .collect(Collectors.toList()); + for (CompletableFuture f : futures) { + f.get(); + } + adb.shutdown(); + } + +} diff --git a/src/test/java/com/arangodb/async/ConcurrencyTests.java b/src/test/java/com/arangodb/async/ConcurrencyTests.java new file mode 100644 index 000000000..746ef2923 --- /dev/null +++ b/src/test/java/com/arangodb/async/ConcurrencyTests.java @@ -0,0 +1,25 @@ +package com.arangodb.async; + +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class ConcurrencyTests { + + @Test + public void concurrentPendingRequests() throws ExecutionException, InterruptedException { + ArangoDBAsync adb = new ArangoDBAsync.Builder().build(); + List>> reqs = IntStream.range(0, 10) + .mapToObj(__ -> adb.db().query("RETURN SLEEP(1)", Void.class)) + .collect(Collectors.toList()); + for (CompletableFuture> req : reqs) { + req.get(); + } + adb.shutdown(); + } + +}