From 53567ce631b593c50c6ccd47c708e0f0b9afb73b Mon Sep 17 00:00:00 2001 From: Zac Farrell Date: Fri, 19 Feb 2016 12:19:43 -0800 Subject: [PATCH] Enhance RpcClient: Provide access to msg metadata This provides a simple way to get at the other parameters from the Consumer's `handleDelivery` method (e.g. consumerTag, envelope, properties). It is backwards compatible, and only those using the new interface `responseCall` will have access to the additional data. --- .../java/com/rabbitmq/client/RpcClient.java | 70 +++++++++++++++++-- 1 file changed, 64 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/rabbitmq/client/RpcClient.java b/src/main/java/com/rabbitmq/client/RpcClient.java index 379e36178c..230fec93b7 100644 --- a/src/main/java/com/rabbitmq/client/RpcClient.java +++ b/src/main/java/com/rabbitmq/client/RpcClient.java @@ -191,7 +191,7 @@ public void handleDelivery(String consumerTag, String replyId = properties.getCorrelationId(); BlockingCell blocker = _continuationMap.get(replyId); _continuationMap.remove(replyId); - blocker.set(body); + blocker.set(new Response(consumerTag, envelope, properties, body)); } } }; @@ -205,16 +205,15 @@ public void publish(AMQP.BasicProperties props, byte[] message) _channel.basicPublish(_exchange, _routingKey, props, message); } - public byte[] primitiveCall(AMQP.BasicProperties props, byte[] message) - throws IOException, ShutdownSignalException, TimeoutException - { + public Response doCall(AMQP.BasicProperties props, byte[] message) + throws IOException, ShutdownSignalException, TimeoutException { checkConsumer(); BlockingCell k = new BlockingCell(); synchronized (_continuationMap) { _correlationId++; String replyId = "" + _correlationId; props = ((props==null) ? new AMQP.BasicProperties.Builder() : props.builder()) - .correlationId(replyId).replyTo(_replyTo).build(); + .correlationId(replyId).replyTo(_replyTo).build(); _continuationMap.put(replyId, k); } publish(props, message); @@ -229,10 +228,16 @@ public byte[] primitiveCall(AMQP.BasicProperties props, byte[] message) wrapper.initCause(sig); throw wrapper; } else { - return (byte[]) reply; + return (Response) reply; } } + public byte[] primitiveCall(AMQP.BasicProperties props, byte[] message) + throws IOException, ShutdownSignalException, TimeoutException + { + return doCall(props, message).getBody(); + } + /** * Perform a simple byte-array-based RPC roundtrip. * @param message the byte array request message to send @@ -246,6 +251,21 @@ public byte[] primitiveCall(byte[] message) return primitiveCall(null, message); } + /** + * Perform a simple byte-array-based RPC roundtrip + * + * Useful if you need to get at more than just the body of the message + * + * @param message the byte array request message to send + * @return The response object is an envelope that contains all of the data provided to the `handleDelivery` consumer + * @throws ShutdownSignalException if the connection dies during our wait + * @throws IOException if an error is encountered + * @throws TimeoutException if a response is not received within the configured timeout + */ + public Response responseCall(byte[] message) throws IOException, ShutdownSignalException, TimeoutException { + return doCall(null, message); + } + /** * Perform a simple string-based RPC roundtrip. * @param message the string request message to send @@ -368,5 +388,43 @@ public int getCorrelationId() { public Consumer getConsumer() { return _consumer; } + + /** + * The response object is an envelope that contains all of the data provided to the `handleDelivery` consumer + */ + public static class Response { + protected String consumerTag; + protected Envelope envelope; + protected AMQP.BasicProperties properties; + protected byte[] body; + + public Response() { + } + + public Response( + final String consumerTag, final Envelope envelope, final AMQP.BasicProperties properties, + final byte[] body) { + this.consumerTag = consumerTag; + this.envelope = envelope; + this.properties = properties; + this.body = body; + } + + public String getConsumerTag() { + return consumerTag; + } + + public Envelope getEnvelope() { + return envelope; + } + + public AMQP.BasicProperties getProperties() { + return properties; + } + + public byte[] getBody() { + return body; + } + } }