Skip to content

Commit 13652ae

Browse files
committed
Add Haskell RPC example
1 parent e713516 commit 13652ae

File tree

3 files changed

+106
-1
lines changed

3 files changed

+106
-1
lines changed

haskell/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ Code examples are executed via `runhaskell`:
4343

4444
[Tutorial six: RPC](http://www.rabbitmq.com/tutorial-six-python.html)
4545

46-
TBD
46+
runhaskell rpcServer.hs
47+
runhaskell rpcClient.hs
4748

4849
To learn more, see [Network.AMQP](https://github.com/hreinhardt/amqp).

haskell/rpcClient.hs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
#!/usr/bin/env stack
2+
-- stack --install-ghc runghc --package bytestring --package text --package amqp --package uuid
3+
{-# LANGUAGE OverloadedStrings #-}
4+
5+
import Control.Concurrent (MVar, newEmptyMVar, putMVar,
6+
takeMVar)
7+
import Control.Monad (when)
8+
import qualified Data.ByteString.Lazy.Char8 as BL
9+
import Data.Maybe (fromJust)
10+
import Data.Text (Text)
11+
import Data.UUID (toText)
12+
import Data.UUID.V4 (nextRandom)
13+
import Network.AMQP
14+
15+
type QueueName = Text
16+
17+
main :: IO ()
18+
main = do
19+
conn <- openConnection "127.0.0.1" "/" "guest" "guest"
20+
ch <- openChannel conn
21+
22+
putStrLn " [x] Requesting fib(30)"
23+
res <- callFib ch rpcQueue 30
24+
putStrLn $ " [.] Got '" ++ show res ++ "'"
25+
26+
closeConnection conn
27+
where
28+
rpcQueue = "rpc_queue"
29+
30+
callFib :: Channel -> QueueName -> Int -> IO Int
31+
callFib ch queue n = do
32+
cid <- genCorrelationId
33+
rqn <- declareReplyQueue
34+
35+
let body = BL.pack . show $ n
36+
let message = newMsg {msgCorrelationID = Just cid, msgReplyTo = Just rqn, msgBody = body}
37+
publishMsg ch "" queue message
38+
39+
m <- newEmptyMVar
40+
consumeMsgs ch rqn Ack $ handleResponse cid m
41+
42+
res <- takeMVar m
43+
return res
44+
where
45+
genCorrelationId = toText <$> nextRandom
46+
declareReplyQueue = do
47+
let opts = newQueue {queueAutoDelete = True, queueExclusive = True}
48+
(rqn, _, _) <- declareQueue ch opts
49+
return rqn
50+
51+
handleResponse :: Text -> MVar Int -> (Message, Envelope) -> IO ()
52+
handleResponse corrId m (msg, envelope) = do
53+
let msgCorrId = fromJust (msgCorrelationID msg)
54+
when (msgCorrId == corrId) $ do
55+
res <- readIO (BL.unpack . msgBody $ msg)
56+
putMVar m res
57+
ackEnv envelope

haskell/rpcServer.hs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
#!/usr/bin/env stack
2+
-- stack --install-ghc runghc --package bytestring --package text --package amqp
3+
{-# LANGUAGE OverloadedStrings #-}
4+
5+
import Control.Concurrent (MVar, newEmptyMVar, putMVar,
6+
takeMVar)
7+
import qualified Data.ByteString.Lazy.Char8 as BL
8+
import Data.Maybe (fromJust)
9+
import Network.AMQP
10+
11+
main :: IO ()
12+
main = do
13+
conn <- openConnection "127.0.0.1" "/" "guest" "guest"
14+
ch <- openChannel conn
15+
16+
qos ch 0 1 False
17+
declareQueue ch newQueue {queueName = rpcQueue}
18+
19+
m <- newEmptyMVar
20+
consumeMsgs ch rpcQueue Ack $ handleRequest ch m
21+
putStrLn " [x] Awaiting RPC requests"
22+
takeMVar m
23+
24+
closeConnection conn
25+
where
26+
rpcQueue = "rpc_queue"
27+
28+
handleRequest :: Channel -> MVar () -> (Message, Envelope) -> IO ()
29+
handleRequest ch m (msg, envelope) = do
30+
n <- readIO . BL.unpack . msgBody $ msg
31+
putStrLn $ " [.] fib(" ++ show n ++ ")"
32+
33+
let result = fib n
34+
let response = newMsg { msgCorrelationID = msgCorrelationID msg
35+
, msgBody = BL.pack . show $ result
36+
}
37+
publishMsg ch "" replyTo response
38+
ackEnv envelope
39+
putMVar m ()
40+
where
41+
replyTo = fromJust $ msgReplyTo msg
42+
43+
fib :: Int -> Int
44+
fib n
45+
| n >= 2 = fib (n - 1) + fib (n - 2)
46+
| n == 1 = 1
47+
| otherwise = 0

0 commit comments

Comments
 (0)