Skip to content

Commit 5fa7250

Browse files
committed
added content encoding compression to HttpConnection
1 parent d8ffcc2 commit 5fa7250

File tree

6 files changed

+301
-19
lines changed

6 files changed

+301
-19
lines changed

core/src/main/java/com/arangodb/ArangoDB.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -656,7 +656,7 @@ public Builder compressionThreshold(Integer threshold) {
656656
/**
657657
* Sets the compression level. (default: {@code 6})
658658
*
659-
* @param level compression level
659+
* @param level compression level between 0 and 9
660660
* @return {@link ArangoDB.Builder}
661661
* @since ArangoDB 3.12
662662
*/

http/src/main/java/com/arangodb/http/HttpConnection.java

Lines changed: 34 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,10 @@
2020

2121
package com.arangodb.http;
2222

23-
import com.arangodb.ArangoDBException;
24-
import com.arangodb.ContentType;
25-
import com.arangodb.PackageVersion;
26-
import com.arangodb.Protocol;
23+
import com.arangodb.*;
2724
import com.arangodb.arch.UnstableApi;
2825
import com.arangodb.config.HostDescription;
26+
import com.arangodb.http.compression.Encoder;
2927
import com.arangodb.internal.InternalRequest;
3028
import com.arangodb.internal.InternalResponse;
3129
import com.arangodb.internal.RequestType;
@@ -37,6 +35,7 @@
3735
import io.netty.handler.ssl.ClientAuth;
3836
import io.netty.handler.ssl.IdentityCipherSuiteFilter;
3937
import io.netty.handler.ssl.JdkSslContext;
38+
import io.vertx.core.MultiMap;
4039
import io.vertx.core.Vertx;
4140
import io.vertx.core.VertxOptions;
4241
import io.vertx.core.buffer.Buffer;
@@ -75,10 +74,12 @@ public class HttpConnection implements Connection {
7574
private static final String CONTENT_TYPE_VPACK = "application/x-velocypack";
7675
private static final String USER_AGENT = getUserAgent();
7776
private static final AtomicInteger THREAD_COUNT = new AtomicInteger();
78-
private final ContentType contentType;
7977
private String auth;
78+
private final int compressionThreshold;
79+
private final Encoder encoder;
8080
private final WebClient client;
8181
private final Integer timeout;
82+
private final MultiMap commonHeaders = MultiMap.caseInsensitiveMultiMap();
8283
private final Vertx vertx;
8384

8485
private static String getUserAgent() {
@@ -88,7 +89,23 @@ private static String getUserAgent() {
8889
HttpConnection(final ArangoConfig config, final HostDescription host) {
8990
super();
9091
Protocol protocol = config.getProtocol();
91-
contentType = ContentTypeFactory.of(protocol);
92+
ContentType contentType = ContentTypeFactory.of(protocol);
93+
if (contentType == ContentType.VPACK) {
94+
commonHeaders.add(HttpHeaders.ACCEPT.toString(), CONTENT_TYPE_VPACK);
95+
commonHeaders.add(HttpHeaders.CONTENT_TYPE.toString(), CONTENT_TYPE_VPACK);
96+
} else if (contentType == ContentType.JSON) {
97+
commonHeaders.add(HttpHeaders.ACCEPT.toString(), CONTENT_TYPE_APPLICATION_JSON_UTF8);
98+
commonHeaders.add(HttpHeaders.CONTENT_TYPE.toString(), CONTENT_TYPE_APPLICATION_JSON_UTF8);
99+
} else {
100+
throw new IllegalArgumentException("Unsupported protocol: " + protocol);
101+
}
102+
compressionThreshold = config.getCompressionThreshold();
103+
Compression compression = config.getCompression();
104+
encoder = Encoder.of(compression, config.getCompressionLevel());
105+
if (encoder.getFormat() != null) {
106+
commonHeaders.add(HttpHeaders.ACCEPT_ENCODING.toString(), encoder.getFormat());
107+
}
108+
commonHeaders.add("x-arango-driver", USER_AGENT);
92109
timeout = config.getTimeout();
93110
vertx = Vertx.vertx(new VertxOptions().setPreferNativeTransport(true).setEventLoopPoolSize(1));
94111
vertx.runOnContext(e -> {
@@ -127,6 +144,9 @@ private static String getUserAgent() {
127144
.setDefaultHost(host.getHost())
128145
.setDefaultPort(host.getPort());
129146

147+
if (compression != Compression.NONE) {
148+
webClientOptions.setTryUseCompression(true);
149+
}
130150

131151
if (Boolean.TRUE.equals(config.getUseSsl())) {
132152
SSLContext ctx;
@@ -236,24 +256,20 @@ public void doExecute(@UnstableApi final InternalRequest request, @UnstableApi f
236256
HttpRequest<Buffer> httpRequest = client
237257
.request(requestTypeToHttpMethod(request.getRequestType()), path)
238258
.timeout(timeout);
239-
if (contentType == ContentType.VPACK) {
240-
httpRequest.putHeader("Accept", CONTENT_TYPE_VPACK);
241-
}
259+
260+
httpRequest.putHeaders(commonHeaders);
242261
addHeader(request, httpRequest);
243262
httpRequest.putHeader(HttpHeaders.AUTHORIZATION.toString(), auth);
244-
httpRequest.putHeader("x-arango-driver", USER_AGENT);
245263

246264
byte[] reqBody = request.getBody();
247265
Buffer buffer;
248-
if (reqBody != null) {
249-
buffer = Buffer.buffer(reqBody);
250-
if (contentType == ContentType.VPACK) {
251-
httpRequest.putHeader(HttpHeaders.CONTENT_TYPE.toString(), CONTENT_TYPE_VPACK);
252-
} else {
253-
httpRequest.putHeader(HttpHeaders.CONTENT_TYPE.toString(), CONTENT_TYPE_APPLICATION_JSON_UTF8);
254-
}
255-
} else {
266+
if (reqBody == null) {
256267
buffer = Buffer.buffer();
268+
} else if (reqBody.length > compressionThreshold) {
269+
httpRequest.putHeader(HttpHeaders.CONTENT_ENCODING.toString(), encoder.getFormat());
270+
buffer = encoder.encode(reqBody);
271+
} else {
272+
buffer = Buffer.buffer(reqBody);
257273
}
258274

259275
try {
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.arangodb.http.compression;
2+
3+
import com.arangodb.Compression;
4+
import io.netty.handler.codec.compression.ZlibWrapper;
5+
import io.vertx.core.buffer.Buffer;
6+
7+
public interface Encoder {
8+
Buffer encode(byte[] data);
9+
10+
String getFormat();
11+
12+
static Encoder of(Compression compression, int level) {
13+
if (level < 0 || level > 9) {
14+
throw new IllegalArgumentException("compression level: " + level + " (expected: 0-9)");
15+
}
16+
17+
switch (compression) {
18+
case GZIP:
19+
return new ZlibEncoder(ZlibWrapper.GZIP, level, "gzip");
20+
case DEFLATE:
21+
return new ZlibEncoder(ZlibWrapper.ZLIB, level, "deflate");
22+
case NONE:
23+
return new NoopEncoder();
24+
default:
25+
throw new IllegalArgumentException("Unsupported compression: " + compression);
26+
}
27+
}
28+
}
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
/*
2+
* Copyright 2012 The Netty Project
3+
*
4+
* The Netty Project licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
17+
package com.arangodb.http.compression;
18+
19+
import io.netty.buffer.ByteBuf;
20+
import io.netty.buffer.ByteBufAllocator;
21+
import io.netty.handler.codec.compression.CompressionException;
22+
import io.netty.handler.codec.compression.ZlibWrapper;
23+
import io.netty.util.internal.*;
24+
import io.netty.util.internal.logging.InternalLogger;
25+
import io.netty.util.internal.logging.InternalLoggerFactory;
26+
27+
import java.util.zip.CRC32;
28+
import java.util.zip.Deflater;
29+
30+
/**
31+
* Compresses a {@link ByteBuf} using the deflate algorithm.
32+
*/
33+
class JdkZlibEncoder {
34+
35+
private static final InternalLogger logger = InternalLoggerFactory.getInstance(JdkZlibEncoder.class);
36+
37+
/**
38+
* Maximum initial size for temporary heap buffers used for the compressed output. Buffer may still grow beyond
39+
* this if necessary.
40+
*/
41+
private static final int MAX_INITIAL_OUTPUT_BUFFER_SIZE;
42+
/**
43+
* Max size for temporary heap buffers used to copy input data to heap.
44+
*/
45+
private static final int MAX_INPUT_BUFFER_SIZE;
46+
private static final ByteBuf EMPTY_BUF;
47+
48+
private final ZlibWrapper wrapper;
49+
private final Deflater deflater;
50+
51+
/*
52+
* GZIP support
53+
*/
54+
private final CRC32 crc = new CRC32();
55+
private static final byte[] gzipHeader = {0x1f, (byte) 0x8b, Deflater.DEFLATED, 0, 0, 0, 0, 0, 0, 0};
56+
57+
static {
58+
MAX_INITIAL_OUTPUT_BUFFER_SIZE = SystemPropertyUtil.getInt(
59+
"io.netty.jdkzlib.encoder.maxInitialOutputBufferSize",
60+
65536);
61+
MAX_INPUT_BUFFER_SIZE = SystemPropertyUtil.getInt(
62+
"io.netty.jdkzlib.encoder.maxInputBufferSize",
63+
65536);
64+
65+
if (logger.isDebugEnabled()) {
66+
logger.debug("-Dio.netty.jdkzlib.encoder.maxInitialOutputBufferSize={}", MAX_INITIAL_OUTPUT_BUFFER_SIZE);
67+
logger.debug("-Dio.netty.jdkzlib.encoder.maxInputBufferSize={}", MAX_INPUT_BUFFER_SIZE);
68+
}
69+
70+
EMPTY_BUF = allocateByteBuf(0);
71+
}
72+
73+
private static ByteBuf allocateByteBuf(int len) {
74+
return ByteBufAllocator.DEFAULT.heapBuffer(len);
75+
}
76+
77+
private static ByteBuf allocateByteBuf() {
78+
return ByteBufAllocator.DEFAULT.heapBuffer();
79+
}
80+
81+
82+
/**
83+
* Creates a new zlib encoder with the specified {@code compressionLevel}
84+
* and the specified wrapper.
85+
*
86+
* @param compressionLevel {@code 1} yields the fastest compression and {@code 9} yields the
87+
* best compression. {@code 0} means no compression. The default
88+
* compression level is {@code 6}.
89+
* @throws CompressionException if failed to initialize zlib
90+
*/
91+
JdkZlibEncoder(ZlibWrapper wrapper, int compressionLevel) {
92+
ObjectUtil.checkInRange(compressionLevel, 0, 9, "compressionLevel");
93+
ObjectUtil.checkNotNull(wrapper, "wrapper");
94+
95+
if (wrapper == ZlibWrapper.ZLIB_OR_NONE) {
96+
throw new IllegalArgumentException(
97+
"wrapper '" + ZlibWrapper.ZLIB_OR_NONE + "' is not " +
98+
"allowed for compression.");
99+
}
100+
101+
this.wrapper = wrapper;
102+
deflater = new Deflater(compressionLevel, wrapper != ZlibWrapper.ZLIB);
103+
}
104+
105+
ByteBuf encode(byte[] in) {
106+
if (in.length == 0) {
107+
return EMPTY_BUF;
108+
}
109+
110+
ByteBuf out = allocateBuffer(in.length);
111+
encodeSome(in, out);
112+
finishEncode(out);
113+
return out;
114+
}
115+
116+
private void encodeSome(byte[] in, ByteBuf out) {
117+
if (wrapper == ZlibWrapper.GZIP) {
118+
out.writeBytes(gzipHeader);
119+
}
120+
if (wrapper == ZlibWrapper.GZIP) {
121+
crc.update(in, 0, in.length);
122+
}
123+
124+
deflater.setInput(in);
125+
for (; ; ) {
126+
deflate(out);
127+
if (!out.isWritable()) {
128+
out.ensureWritable(out.writerIndex());
129+
} else if (deflater.needsInput()) {
130+
break;
131+
}
132+
}
133+
}
134+
135+
private ByteBuf allocateBuffer(int length) {
136+
int sizeEstimate = (int) Math.ceil(length * 1.001) + 12;
137+
switch (wrapper) {
138+
case GZIP:
139+
sizeEstimate += gzipHeader.length;
140+
break;
141+
case ZLIB:
142+
sizeEstimate += 2; // first two magic bytes
143+
break;
144+
default:
145+
throw new IllegalArgumentException();
146+
}
147+
// sizeEstimate might overflow if close to 2G
148+
if (sizeEstimate < 0 || sizeEstimate > MAX_INITIAL_OUTPUT_BUFFER_SIZE) {
149+
// can always expand later
150+
return allocateByteBuf(MAX_INITIAL_OUTPUT_BUFFER_SIZE);
151+
}
152+
return allocateByteBuf(sizeEstimate);
153+
}
154+
155+
private void finishEncode(ByteBuf out) {
156+
ByteBuf footer = allocateByteBuf();
157+
deflater.finish();
158+
while (!deflater.finished()) {
159+
deflate(footer);
160+
}
161+
if (wrapper == ZlibWrapper.GZIP) {
162+
int crcValue = (int) crc.getValue();
163+
int uncBytes = deflater.getTotalIn();
164+
footer.writeByte(crcValue);
165+
footer.writeByte(crcValue >>> 8);
166+
footer.writeByte(crcValue >>> 16);
167+
footer.writeByte(crcValue >>> 24);
168+
footer.writeByte(uncBytes);
169+
footer.writeByte(uncBytes >>> 8);
170+
footer.writeByte(uncBytes >>> 16);
171+
footer.writeByte(uncBytes >>> 24);
172+
}
173+
out.writeBytes(footer);
174+
deflater.reset();
175+
crc.reset();
176+
}
177+
178+
private void deflate(ByteBuf out) {
179+
int numBytes;
180+
do {
181+
int writerIndex = out.writerIndex();
182+
numBytes = deflater.deflate(
183+
out.array(), out.arrayOffset() + writerIndex, out.writableBytes(), Deflater.SYNC_FLUSH);
184+
out.writerIndex(writerIndex + numBytes);
185+
} while (numBytes > 0);
186+
}
187+
188+
void close() {
189+
deflater.reset();
190+
deflater.end();
191+
}
192+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.arangodb.http.compression;
2+
3+
import io.vertx.core.buffer.Buffer;
4+
5+
class NoopEncoder implements Encoder {
6+
@Override
7+
public Buffer encode(byte[] data) {
8+
return Buffer.buffer(data);
9+
}
10+
11+
@Override
12+
public String getFormat() {
13+
return null;
14+
}
15+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package com.arangodb.http.compression;
2+
3+
import io.netty.buffer.ByteBuf;
4+
import io.netty.handler.codec.compression.ZlibWrapper;
5+
import io.vertx.core.buffer.Buffer;
6+
7+
class ZlibEncoder implements Encoder {
8+
private final ZlibWrapper wrapper;
9+
private final int level;
10+
private final String format;
11+
12+
ZlibEncoder(ZlibWrapper wrapper, int level, String format) {
13+
this.wrapper = wrapper;
14+
this.level = level;
15+
this.format = format;
16+
}
17+
18+
@Override
19+
public Buffer encode(byte[] data) {
20+
JdkZlibEncoder encoder = new JdkZlibEncoder(wrapper, level);
21+
ByteBuf bb = encoder.encode(data);
22+
Buffer out = Buffer.buffer(bb);
23+
encoder.close();
24+
return out;
25+
}
26+
27+
@Override
28+
public String getFormat() {
29+
return format;
30+
}
31+
}

0 commit comments

Comments
 (0)