Skip to content

Enhance RpcClient: Provide access to message metadata #134

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 64 additions & 6 deletions src/main/java/com/rabbitmq/client/RpcClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public void handleDelivery(String consumerTag,
String replyId = properties.getCorrelationId();
BlockingCell<Object> blocker = _continuationMap.get(replyId);
_continuationMap.remove(replyId);
blocker.set(body);
blocker.set(new Response(consumerTag, envelope, properties, body));
}
}
};
Expand All @@ -205,16 +205,15 @@ public void publish(AMQP.BasicProperties props, byte[] message)
_channel.basicPublish(_exchange, _routingKey, props, message);
}

public byte[] primitiveCall(AMQP.BasicProperties props, byte[] message)
throws IOException, ShutdownSignalException, TimeoutException
{
public Response doCall(AMQP.BasicProperties props, byte[] message)
throws IOException, ShutdownSignalException, TimeoutException {
checkConsumer();
BlockingCell<Object> k = new BlockingCell<Object>();
synchronized (_continuationMap) {
_correlationId++;
String replyId = "" + _correlationId;
props = ((props==null) ? new AMQP.BasicProperties.Builder() : props.builder())
.correlationId(replyId).replyTo(_replyTo).build();
.correlationId(replyId).replyTo(_replyTo).build();
_continuationMap.put(replyId, k);
}
publish(props, message);
Expand All @@ -229,10 +228,16 @@ public byte[] primitiveCall(AMQP.BasicProperties props, byte[] message)
wrapper.initCause(sig);
throw wrapper;
} else {
return (byte[]) reply;
return (Response) reply;
}
}

public byte[] primitiveCall(AMQP.BasicProperties props, byte[] message)
throws IOException, ShutdownSignalException, TimeoutException
{
return doCall(props, message).getBody();
}

/**
* Perform a simple byte-array-based RPC roundtrip.
* @param message the byte array request message to send
Expand All @@ -246,6 +251,21 @@ public byte[] primitiveCall(byte[] message)
return primitiveCall(null, message);
}

/**
* Perform a simple byte-array-based RPC roundtrip
*
* Useful if you need to get at more than just the body of the message
*
* @param message the byte array request message to send
* @return The response object is an envelope that contains all of the data provided to the `handleDelivery` consumer
* @throws ShutdownSignalException if the connection dies during our wait
* @throws IOException if an error is encountered
* @throws TimeoutException if a response is not received within the configured timeout
*/
public Response responseCall(byte[] message) throws IOException, ShutdownSignalException, TimeoutException {
return doCall(null, message);
}

/**
* Perform a simple string-based RPC roundtrip.
* @param message the string request message to send
Expand Down Expand Up @@ -368,5 +388,43 @@ public int getCorrelationId() {
public Consumer getConsumer() {
return _consumer;
}

/**
* The response object is an envelope that contains all of the data provided to the `handleDelivery` consumer
*/
public static class Response {
protected String consumerTag;
protected Envelope envelope;
protected AMQP.BasicProperties properties;
protected byte[] body;

public Response() {
}

public Response(
final String consumerTag, final Envelope envelope, final AMQP.BasicProperties properties,
final byte[] body) {
this.consumerTag = consumerTag;
this.envelope = envelope;
this.properties = properties;
this.body = body;
}

public String getConsumerTag() {
return consumerTag;
}

public Envelope getEnvelope() {
return envelope;
}

public AMQP.BasicProperties getProperties() {
return properties;
}

public byte[] getBody() {
return body;
}
}
}