Skip to content

[DE-82] Feature/overload-control #419

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Jan 14, 2022
2 changes: 1 addition & 1 deletion .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ jobs:
if: ${{ cancelled() || failure() }}
uses: actions/upload-artifact@master
with:
name: logs.tgz
name: logs-${{github.job}}.tgz
path: ./logs.tgz

# test encodeURIComponent() and normalize('NFC') comparing to Javascript behavior
Expand Down
19 changes: 18 additions & 1 deletion src/main/java/com/arangodb/ArangoDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,17 @@ public Builder loadBalancingStrategy(final LoadBalancingStrategy loadBalancingSt
return this;
}

/**
* Setting the amount of samples kept for queue time metrics
*
* @param responseQueueTimeSamples amount of samples to keep
* @return {@link ArangoDB.Builder}
*/
public Builder responseQueueTimeSamples(final Integer responseQueueTimeSamples) {
setResponseQueueTimeSamples(responseQueueTimeSamples);
return this;
}

/**
* Register a custom {@link VPackSerializer} for a specific type to be used within the internal serialization
* process.
Expand Down Expand Up @@ -658,7 +669,8 @@ public synchronized ArangoDB build() {
protocol,
hostResolver,
hostHandler,
new ArangoContext());
new ArangoContext(),
responseQueueTimeSamples, timeout);
}

}
Expand Down Expand Up @@ -705,6 +717,11 @@ default ArangoDatabase db(String name) {
*/
ArangoDatabase db(DbName dbName);

/**
* @return entry point for accessing client metrics
*/
ArangoMetrics metrics();

/**
* Creates a new database with the given name.
*
Expand Down
34 changes: 34 additions & 0 deletions src/main/java/com/arangodb/ArangoMetrics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* DISCLAIMER
*
* Copyright 2016 ArangoDB GmbH, Cologne, Germany
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Copyright holder is ArangoDB GmbH, Cologne, Germany
*/

package com.arangodb;

/**
* Interface for accessing metrics.
*
* @author Michele Rastelli
* @since ArangoDB 3.9
*/
public interface ArangoMetrics {
/**
* @return queue time metrics
*/
QueueTimeMetrics getQueueTime();
}
45 changes: 45 additions & 0 deletions src/main/java/com/arangodb/QueueTimeMetrics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* DISCLAIMER
*
* Copyright 2016 ArangoDB GmbH, Cologne, Germany
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Copyright holder is ArangoDB GmbH, Cologne, Germany
*/

package com.arangodb;

import com.arangodb.model.QueueTimeSample;

/**
* Interface for accessing queue time latency metrics, reported by the "X-Arango-Queue-Time-Seconds" response header.
* This header contains the most recent request (de)queuing time (in seconds) as tracked by the server’s scheduler.
*
* @author Michele Rastelli
* @see <a href="https://www.arangodb.com/docs/stable/http/general.html#overload-control">API Documentation</a>
* @since ArangoDB 3.9
*/
public interface QueueTimeMetrics {

/**
* @return all the n values observed
*/
QueueTimeSample[] getValues();

/**
* @return the average of the last n values observed, 0.0 if no value has been observed (i.e. in ArangoDB versions
* prior to 3.9).
*/
double getAvg();
}
25 changes: 20 additions & 5 deletions src/main/java/com/arangodb/async/ArangoDBAsync.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@

package com.arangodb.async;

import com.arangodb.ArangoDBException;
import com.arangodb.ArangoSerializationAccessor;
import com.arangodb.DbName;
import com.arangodb.Protocol;
import com.arangodb.*;
import com.arangodb.async.internal.ArangoDBAsyncImpl;
import com.arangodb.async.internal.velocystream.VstCommunicationAsync;
import com.arangodb.async.internal.velocystream.VstConnectionFactoryAsync;
Expand Down Expand Up @@ -110,6 +107,11 @@ default ArangoDatabaseAsync db(final String name) {
*/
ArangoDatabaseAsync db(final DbName dbName);

/**
* @return entry point for accessing client metrics
*/
ArangoMetrics metrics();

/**
* Creates a new database
*
Expand Down Expand Up @@ -497,6 +499,17 @@ public Builder acquireHostList(final Boolean acquireHostList) {
return this;
}

/**
* Setting the amount of samples kept for queue time metrics
*
* @param responseQueueTimeSamples amount of samples to keep
* @return {@link ArangoDBAsync.Builder}
*/
public Builder responseQueueTimeSamples(final Integer responseQueueTimeSamples) {
setResponseQueueTimeSamples(responseQueueTimeSamples);
return this;
}

/**
* Sets the load balancing strategy to be used in an ArangoDB cluster setup.
* In case of Active-Failover deployment set to {@link LoadBalancingStrategy#NONE} or not set at all, since that
Expand Down Expand Up @@ -838,7 +851,9 @@ public synchronized ArangoDBAsync build() {
syncHostResolver,
asyncHostHandler,
syncHostHandler,
new ArangoContext());
new ArangoContext(),
responseQueueTimeSamples,
timeout);
}

private VstCommunicationAsync.Builder asyncBuilder(final HostHandler hostHandler) {
Expand Down
16 changes: 13 additions & 3 deletions src/main/java/com/arangodb/async/internal/ArangoDBAsyncImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package com.arangodb.async.internal;

import com.arangodb.ArangoDBException;
import com.arangodb.ArangoMetrics;
import com.arangodb.DbName;
import com.arangodb.async.ArangoDBAsync;
import com.arangodb.async.ArangoDatabaseAsync;
Expand Down Expand Up @@ -68,18 +69,22 @@ public ArangoDBAsyncImpl(
final HostResolver syncHostResolver,
final HostHandler asyncHostHandler,
final HostHandler syncHostHandler,
final ArangoContext context
final ArangoContext context,
final int responseQueueTimeSamples,
final int timeoutMs
) {

super(new ArangoExecutorAsync(asyncCommBuilder.build(util.get(Serializer.INTERNAL)), util, new DocumentCache()), util, context);
super(new ArangoExecutorAsync(asyncCommBuilder.build(util.get(Serializer.INTERNAL)), util, new DocumentCache(),
new QueueTimeMetricsImpl(responseQueueTimeSamples), timeoutMs), util, context);

final VstCommunication<Response, VstConnectionSync> cacheCom = syncCommBuilder.build(util.get(Serializer.INTERNAL));

cp = new VstProtocol(cacheCom);
this.asyncHostHandler = asyncHostHandler;
this.syncHostHandler = syncHostHandler;

ArangoExecutorSync arangoExecutorSync = new ArangoExecutorSync(cp, util, new DocumentCache());
ArangoExecutorSync arangoExecutorSync = new ArangoExecutorSync(cp, util, new DocumentCache(),
new QueueTimeMetricsImpl(responseQueueTimeSamples), timeoutMs);
asyncHostResolver.init(arangoExecutorSync, util.get(Serializer.INTERNAL));
syncHostResolver.init(arangoExecutorSync, util.get(Serializer.INTERNAL));

Expand Down Expand Up @@ -121,6 +126,11 @@ public ArangoDatabaseAsync db(final DbName name) {
return new ArangoDatabaseAsyncImpl(this, name);
}

@Override
public ArangoMetrics metrics() {
return new ArangoMetricsImpl(executor.getQueueTimeMetrics());
}

@Override
public CompletableFuture<Boolean> createDatabase(final DbName name) {
return createDatabase(new DBCreateOptions().name(name));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.arangodb.async.internal.velocystream.VstCommunicationAsync;
import com.arangodb.internal.ArangoExecutor;
import com.arangodb.internal.DocumentCache;
import com.arangodb.internal.QueueTimeMetricsImpl;
import com.arangodb.internal.net.HostHandle;
import com.arangodb.internal.util.ArangoSerializationFactory;
import com.arangodb.velocystream.Request;
Expand All @@ -44,8 +45,8 @@ public class ArangoExecutorAsync extends ArangoExecutor {
private final ExecutorService outgoingExecutor = Executors.newSingleThreadExecutor();

public ArangoExecutorAsync(final VstCommunicationAsync communication, final ArangoSerializationFactory util,
final DocumentCache documentCache) {
super(util, documentCache);
final DocumentCache documentCache, final QueueTimeMetricsImpl qtMetrics, final int timeoutMs) {
super(util, documentCache, qtMetrics, timeoutMs);
this.communication = communication;
}

Expand All @@ -67,8 +68,11 @@ private <T> CompletableFuture<T> execute(
final HostHandle hostHandle) {

return CompletableFuture.completedFuture(null)
.thenComposeAsync((it) -> communication.execute(request, hostHandle), outgoingExecutor)
.thenApplyAsync(responseDeserializer::deserialize);
.thenComposeAsync((it) -> communication.execute(interceptRequest(request), hostHandle), outgoingExecutor)
.thenApplyAsync(response -> {
interceptResponse(response);
return responseDeserializer.deserialize(response);
});
}

public void disconnect() {
Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/arangodb/entity/CursorEntity.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public Map<String, String> getMeta() {
public Map<String, String> cleanupMeta(Map<String, String> meta) {
meta.remove("Content-Length");
meta.remove("Transfer-Encoding");
meta.remove("X-Arango-Queue-Time-Seconds");
return meta;
}

Expand Down
9 changes: 7 additions & 2 deletions src/main/java/com/arangodb/internal/ArangoDBImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ public class ArangoDBImpl extends InternalArangoDB<ArangoExecutorSync> implement

public ArangoDBImpl(final VstCommunicationSync.Builder vstBuilder, final HttpCommunication.Builder httpBuilder,
final ArangoSerializationFactory util, final Protocol protocol, final HostResolver hostResolver,
final HostHandler hostHandler, final ArangoContext context) {
final HostHandler hostHandler, final ArangoContext context, int responseQueueTimeSamples, final int timeoutMs) {

super(new ArangoExecutorSync(
createProtocol(vstBuilder, httpBuilder, util.get(Serializer.INTERNAL), protocol),
util,
new DocumentCache()),
new DocumentCache(), new QueueTimeMetricsImpl(responseQueueTimeSamples), timeoutMs),
util,
context);

Expand Down Expand Up @@ -140,6 +140,11 @@ public ArangoDatabase db(final DbName dbName) {
return new ArangoDatabaseImpl(this, dbName).setCursorInitializer(cursorInitializer);
}

@Override
public ArangoMetrics metrics() {
return new ArangoMetricsImpl(executor.getQueueTimeMetrics());
}

@Override
public Boolean createDatabase(final DbName dbName) throws ArangoDBException {
return createDatabase(new DBCreateOptions().name(dbName));
Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/arangodb/internal/ArangoDefaults.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,6 @@ private ArangoDefaults() {
public static final boolean DEFAULT_ACQUIRE_HOST_LIST = false;
public static final int DEFAULT_ACQUIRE_HOST_LIST_INTERVAL = 60 * 60 * 1000; // hour
public static final LoadBalancingStrategy DEFAULT_LOAD_BALANCING_STRATEGY = LoadBalancingStrategy.NONE;
public static final int DEFAULT_RESPONSE_QUEUE_TIME_SAMPLES = 10;

}
1 change: 1 addition & 0 deletions src/main/java/com/arangodb/internal/ArangoErrors.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,6 @@ private ArangoErrors() {
public static final Integer ERROR_ARANGO_DATA_SOURCE_NOT_FOUND = 1203;
public static final Integer ERROR_ARANGO_DATABASE_NOT_FOUND = 1228;
public static final Integer ERROR_GRAPH_NOT_FOUND = 1924;
public static final Integer QUEUE_TIME_VIOLATED = 21004;

}
24 changes: 23 additions & 1 deletion src/main/java/com/arangodb/internal/ArangoExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@

package com.arangodb.internal;

import com.arangodb.QueueTimeMetrics;
import com.arangodb.entity.Entity;
import com.arangodb.internal.util.ArangoSerializationFactory;
import com.arangodb.internal.util.ArangoSerializationFactory.Serializer;
import com.arangodb.velocypack.exception.VPackException;
import com.arangodb.velocystream.Request;
import com.arangodb.velocystream.Response;

import java.lang.reflect.ParameterizedType;
Expand Down Expand Up @@ -69,12 +71,17 @@ private boolean isInternal(final Type type) {
}

private final DocumentCache documentCache;
private final QueueTimeMetricsImpl qtMetrics;
private final ArangoSerializationFactory util;
private final String timeoutS;

protected ArangoExecutor(final ArangoSerializationFactory util, final DocumentCache documentCache) {
protected ArangoExecutor(final ArangoSerializationFactory util, final DocumentCache documentCache,
final QueueTimeMetricsImpl qtMetrics, final int timeoutMs) {
super();
this.documentCache = documentCache;
this.qtMetrics = qtMetrics;
this.util = util;
timeoutS = timeoutMs >= 1000 ? Integer.toString(timeoutMs / 1000) : null;
}

public DocumentCache documentCache() {
Expand All @@ -85,4 +92,19 @@ public interface ResponseDeserializer<T> {
T deserialize(Response response) throws VPackException;
}

protected final void interceptResponse(Response response) {
String queueTime = response.getMeta().get("X-Arango-Queue-Time-Seconds");
if (queueTime != null) {
qtMetrics.add(Double.parseDouble(queueTime));
}
}

protected final Request interceptRequest(Request request) {
request.putHeaderParam("X-Arango-Queue-Time-Seconds", timeoutS);
return request;
}

public QueueTimeMetrics getQueueTimeMetrics() {
return qtMetrics;
}
}
7 changes: 4 additions & 3 deletions src/main/java/com/arangodb/internal/ArangoExecutorSync.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ public class ArangoExecutorSync extends ArangoExecutor {
private final CommunicationProtocol protocol;

public ArangoExecutorSync(final CommunicationProtocol protocol, final ArangoSerializationFactory util,
final DocumentCache documentCache) {
super(util, documentCache);
final DocumentCache documentCache, final QueueTimeMetricsImpl qtMetrics, final int timeoutMs) {
super(util, documentCache, qtMetrics, timeoutMs);
this.protocol = protocol;
}

Expand All @@ -68,7 +68,8 @@ public <T> T execute(

try {

final Response response = protocol.execute(request, hostHandle);
final Response response = protocol.execute(interceptRequest(request), hostHandle);
interceptResponse(response);
T deserialize = responseDeserializer.deserialize(response);

if (deserialize instanceof MetaAware) {
Expand Down
Loading