diff --git a/kernel_test.go b/kernel_test.go index 357ed19..c772158 100644 --- a/kernel_test.go +++ b/kernel_test.go @@ -1,9 +1,12 @@ package main import ( - "errors" + "encoding/json" + "fmt" + "io/ioutil" + "log" "os" - "sync" + "strings" "testing" "time" @@ -15,6 +18,21 @@ const ( success = "\u2713" ) +const ( + connectionFile = "fixtures/connection_file.json" + sessionID = "ba65a05c-106a-4799-9a94-7f5631bbe216" +) + +var ( + connectionKey string + transport string + ip string + shellPort int + iopubPort int +) + +//============================================================================== + func TestMain(m *testing.M) { os.Exit(runTest(m)) } @@ -22,25 +40,58 @@ func TestMain(m *testing.M) { // runTest initializes the environment for the tests and allows for // the proper exit if the test fails or succeeds. func runTest(m *testing.M) int { + // Parse the connection info. + var connInfo ConnectionInfo + + connData, err := ioutil.ReadFile(connectionFile) + if err != nil { + log.Fatal(err) + } + + if err = json.Unmarshal(connData, &connInfo); err != nil { + log.Fatal(err) + } + + // Store the connection parameters globally for use by the test client. + connectionKey = connInfo.Key + transport = connInfo.Transport + ip = connInfo.IP + shellPort = connInfo.ShellPort + iopubPort = connInfo.IOPubPort // Start the kernel. - go runKernel("fixtures/connection_file.json") + go runKernel(connectionFile) return m.Run() } //============================================================================== -// TestEvaluate tests the evaluation of consecutive cells.. +// TestEvaluate tests the evaluation of consecutive cells. func TestEvaluate(t *testing.T) { cases := []struct { - Input string + Input []string Output string }{ - {"import \"fmt\"\na := 1\nfmt.Println(a)", "1\n"}, - {"a = 2\nfmt.Println(a)", "2\n"}, - {"func myFunc(x int) int {\nreturn x+1\n}\nfmt.Println(\"func defined\")", "func dfined\n"}, - {"b := myFunc(1)\nfmt.Println(b)", "2\n"}, + {[]string{ + "import \"fmt\"", + "a := 1", + "fmt.Println(a)", + }, "1\n"}, + {[]string{ + "a = 2", + "fmt.Println(a)", + }, "2\n"}, + {[]string{ + "func myFunc(x int) int {", + " return x+1", + "}", + "fmt.Println(\"func defined\")", + }, "func defined\n"}, + {[]string{ + "b := myFunc(1)", + "fmt.Println(b)", + }, "2\n"}, } t.Logf("Should be able to evaluate valid code in notebook cells.") @@ -51,7 +102,7 @@ func TestEvaluate(t *testing.T) { t.Logf(" Evaluating code snippet %d/%d.", k+1, len(cases)) // Get the result. - result := testEvaluate(t, tc.Input, k) + result := testEvaluate(t, strings.Join(tc.Input, "\n")) // Compare the result. if result != tc.Output { @@ -63,137 +114,336 @@ func TestEvaluate(t *testing.T) { } // testEvaluate evaluates a cell. -func testEvaluate(t *testing.T, codeIn string, testCaseIndex int) string { - - // Define the shell socket. - addrShell := "tcp://127.0.0.1:57503" - addrIO := "tcp://127.0.0.1:40885" +func testEvaluate(t *testing.T, codeIn string) string { + client, closeClient := newTestJupyterClient(t) + defer closeClient() // Create a message. - msg, err := NewMsg("execute_request", ComposedMsg{}) + request, err := NewMsg("execute_request", ComposedMsg{}) if err != nil { - t.Fatal("Create New Message:", err) + t.Fatalf("\t%s NewMsg: %s", failure, err) } // Fill in remaining header information. - msg.Header.Session = "ba65a05c-106a-4799-9a94-7f5631bbe216" - msg.Header.Username = "blah" + request.Header.Session = sessionID + request.Header.Username = "KernelTester" // Fill in Metadata. - msg.Metadata = make(map[string]interface{}) + request.Metadata = make(map[string]interface{}) // Fill in content. content := make(map[string]interface{}) content["code"] = codeIn content["silent"] = false - msg.Content = content + request.Content = content + + reply, pub := client.performJupyterRequest(t, request, 10*time.Second) + + assertMsgTypeEquals(t, reply, "execute_reply") + + content = getMsgContentAsJSONObject(t, reply) + status := getString(t, "content", content, "status") + + if status != "ok" { + t.Fatalf("\t%s Execution encountered error [%s]: %s", failure, content["ename"], content["evalue"]) + } + + for _, pubMsg := range pub { + if pubMsg.Header.MsgType == "execute_result" { + content = getMsgContentAsJSONObject(t, pubMsg) + + bundledMIMEData := getJSONObject(t, "content", content, "data") + textRep := getString(t, "content[\"data\"]", bundledMIMEData, "text/plain") + + return textRep + } + } + + return "" +} + +// TestPanicGeneratesError tests that executing code with an un-recovered panic properly generates both +// an error "execute_reply" and publishes an "error" message. +func TestPanicGeneratesError(t *testing.T) { + client, closeClient := newTestJupyterClient(t) + defer closeClient() + + // Create a message. + request, err := NewMsg("execute_request", ComposedMsg{}) + if err != nil { + t.Fatalf("\t%s NewMsg: %s", failure, err) + } + + // Fill in remaining header information. + request.Header.Session = sessionID + request.Header.Username = "KernelTester" + + // Fill in Metadata. + request.Metadata = make(map[string]interface{}) + + // Fill in content. + content := make(map[string]interface{}) + content["code"] = "panic(\"Error\")" + content["silent"] = false + request.Content = content + + reply, pub := client.performJupyterRequest(t, request, 10*time.Second) + + assertMsgTypeEquals(t, reply, "execute_reply") + + content = getMsgContentAsJSONObject(t, reply) + status := getString(t, "content", content, "status") + + if status != "error" { + t.Fatalf("\t%s Execution did not raise expected error", failure) + } + + var foundPublishedError bool + for _, pubMsg := range pub { + if pubMsg.Header.MsgType == "error" { + foundPublishedError = true + break + } + } + + if !foundPublishedError { + t.Fatalf("\t%s Execution did not publish an expected \"error\" message", failure) + } +} + +//============================================================================== + +// testJupyterClient holds references to the 2 sockets it uses to communicate with the kernel. +type testJupyterClient struct { + shellSocket *zmq.Socket + ioSocket *zmq.Socket +} + +// newTestJupyterClient creates and connects a fresh client to the kernel. Upon error, newTestJupyterClient +// will Fail the test. +func newTestJupyterClient(t *testing.T) (testJupyterClient, func()) { + t.Helper() + + addrShell := fmt.Sprintf("%s://%s:%d", transport, ip, shellPort) + addrIO := fmt.Sprintf("%s://%s:%d", transport, ip, iopubPort) // Prepare the shell socket. - sock, err := zmq.NewSocket(zmq.REQ) + shell, err := zmq.NewSocket(zmq.REQ) if err != nil { - t.Fatal("NewSocket:", err) + t.Fatalf("\t%s NewSocket: %s", failure, err) } - defer sock.Close() - if err = sock.Connect(addrShell); err != nil { - t.Fatal("sock.Connect:", err) + if err = shell.Connect(addrShell); err != nil { + t.Fatalf("\t%s shell.Connect: %s", failure, err) } - // Prepare the IOPub subscriber. - sockIO, err := zmq.NewSocket(zmq.SUB) + // Prepare the IOPub socket. + iopub, err := zmq.NewSocket(zmq.SUB) if err != nil { - t.Fatal("NewSocket:", err) + t.Fatalf("\t%s NewSocket: %s", failure, err) } - defer sockIO.Close() - if err = sockIO.Connect(addrIO); err != nil { - t.Fatal("sockIO.Connect:", err) + if err = iopub.Connect(addrIO); err != nil { + t.Fatalf("\t%s iopub.Connect: %s", failure, err) } - sockIO.SetSubscribe("") + if err = iopub.SetSubscribe(""); err != nil { + t.Fatalf("\t%s iopub.SetSubscribe: %s", failure, err) + } - // Start the subscriber. - quit := make(chan struct{}) - var result string - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - for { - select { - - case <-quit: - return - - default: - msgParts, err := sockIO.RecvMessageBytes(0) - if err != nil { - t.Fatal("sockIO.RecvMessageBytes:", err) - } - - msgParsed, _, err := WireMsgToComposedMsg(msgParts, []byte("a0436f6c-1916-498b-8eb9-e81ab9368e84")) - if err != nil { - t.Fatal("WireMsgToComposedMsg:", err) - } - - if msgParsed.Header.MsgType == "execute_result" { - content, ok := msgParsed.Content.(map[string]interface{}) - if !ok { - t.Fatal("msgParsed.Content.(map[string]interface{})", errors.New("Could not cast type")) - } - data, ok := content["data"] - if !ok { - t.Fatal("content[\"data\"]", errors.New("Data field not present")) - } - dataMap, ok := data.(map[string]interface{}) - if !ok { - t.Fatal("data.(map[string]string)", errors.New("Could not cast type")) - } - rawResult, ok := dataMap["text/plain"] - if !ok { - t.Fatal("dataMap[\"text/plain\"]", errors.New("text/plain field not present")) - } - result, ok = rawResult.(string) - if !ok { - t.Fatal("rawResult.(string)", errors.New("Could not cast result as string")) - } - return - } - } + // Wait for a second to give the tcp connection time to complete to avoid missing the early pub messages. + time.Sleep(1 * time.Second) + + return testJupyterClient{shell, iopub}, func() { + if err := shell.Close(); err != nil { + t.Errorf("\t%s shell.Close: %s", failure, err) } - }() + if err = iopub.Close(); err != nil { + t.Errorf("\t%s iopub.Close: %s", failure, err) + } + } +} - time.Sleep(1 * time.Second) +// sendShellRequest sends a message to the kernel over the shell channel. Upon error, sendShellRequest +// will Fail the test. +func (client *testJupyterClient) sendShellRequest(t *testing.T, request ComposedMsg) { + t.Helper() - // Send the execute request. - if _, err := sock.Send("", zmq.SNDMORE); err != nil { - t.Fatal("sock.Send:", err) + if _, err := client.shellSocket.Send("", zmq.SNDMORE); err != nil { + t.Fatalf("\t%s shellSocket.Send: %s", failure, err) } - msgParts, err := msg.ToWireMsg([]byte("a0436f6c-1916-498b-8eb9-e81ab9368e84")) + reqMsgParts, err := request.ToWireMsg([]byte(connectionKey)) if err != nil { - t.Fatal("msg.ToWireMsg:", err) + t.Fatalf("\t%s request.ToWireMsg: %s", failure, err) } - if _, err = sock.SendMessage(msgParts); err != nil { - t.Fatal("sock.SendMessage:", err) + if _, err = client.shellSocket.SendMessage(reqMsgParts); err != nil { + t.Fatalf("\t%s shellSocket.SendMessage: %s", failure, err) } +} + +// recvShellReply tries to read a reply message from the shell channel. It will timeout after the given +// timeout delay. Upon error or timeout, recvShellReply will Fail the test. +func (client *testJupyterClient) recvShellReply(t *testing.T, timeout time.Duration) (reply ComposedMsg) { + t.Helper() + + ch := make(chan ComposedMsg) - // Wait for the result. If we timeout, kill the subscriber. - done := make(chan struct{}) go func() { - wg.Wait() - close(done) + repMsgParts, err := client.shellSocket.RecvMessageBytes(0) + if err != nil { + t.Fatalf("\t%s Shell socket RecvMessageBytes: %s", failure, err) + } + + msgParsed, _, err := WireMsgToComposedMsg(repMsgParts, []byte(connectionKey)) + if err != nil { + t.Fatalf("\t%s Could not parse wire message: %s", failure, err) + } + + ch <- msgParsed }() - // Compare the result to the expect and clean up. select { - case <-done: - return result - case <-time.After(10 * time.Second): - close(quit) - t.Fatalf("[test case %d] Evaution timed out!", testCaseIndex+1) + case reply = <-ch: + case <-time.After(timeout): + t.Fatalf("\t%s recvShellReply timed out", failure) } - return "" + return +} + +// recvIOSub tries to read a published message from the IOPub channel. It will timeout after the given +// timeout delay. Upon error or timeout, recvIOSub will Fail the test. +func (client *testJupyterClient) recvIOSub(t *testing.T, timeout time.Duration) (sub ComposedMsg) { + t.Helper() + + ch := make(chan ComposedMsg) + + go func() { + repMsgParts, err := client.ioSocket.RecvMessageBytes(0) + if err != nil { + t.Fatalf("\t%s IOPub socket RecvMessageBytes: %s", failure, err) + } + + msgParsed, _, err := WireMsgToComposedMsg(repMsgParts, []byte(connectionKey)) + if err != nil { + t.Fatalf("\t%s Could not parse wire message: %s", failure, err) + } + + ch <- msgParsed + }() + + select { + case sub = <-ch: + case <-time.After(timeout): + t.Fatalf("\t%s recvIOSub timed out", failure) + } + + return +} + +// performJupyterRequest preforms a request and awaits a reply on the shell channel. Additionally all messages on the +// IOPub channel between the opening 'busy' messages and closing 'idle' message are captured and returned. The request +// will timeout after the given timeout delay. Upon error or timeout, request will Fail the test. +func (client *testJupyterClient) performJupyterRequest(t *testing.T, request ComposedMsg, timeout time.Duration) (reply ComposedMsg, pub []ComposedMsg) { + t.Helper() + + client.sendShellRequest(t, request) + reply = client.recvShellReply(t, timeout) + + // Read the expected 'busy' message and ensure it is in fact, a 'busy' message. + subMsg := client.recvIOSub(t, 1*time.Second) + assertMsgTypeEquals(t, subMsg, "status") + + subData := getMsgContentAsJSONObject(t, subMsg) + execState := getString(t, "content", subData, "execution_state") + + if execState != kernelBusy { + t.Fatalf("\t%s Expected a 'busy' status message but got '%s'", failure, execState) + } + + // Read messages from the IOPub channel until an 'idle' message is received. + for { + subMsg = client.recvIOSub(t, 100*time.Millisecond) + + // If the message is a 'status' message, ensure it is an 'idle' status. + if subMsg.Header.MsgType == "status" { + subData = getMsgContentAsJSONObject(t, subMsg) + execState = getString(t, "content", subData, "execution_state") + + if execState != kernelIdle { + t.Fatalf("\t%s Expected a 'idle' status message but got '%s'", failure, execState) + } + + // Break from the loop as we don't expect any other IOPub messages after the 'idle'. + break + } + + // Add the message to the pub collection. + pub = append(pub, subMsg) + } + + return +} + +// assertMsgTypeEquals is a test helper that fails the test if the message header's MsgType is not the +// expectedType. +func assertMsgTypeEquals(t *testing.T, msg ComposedMsg, expectedType string) { + t.Helper() + + if msg.Header.MsgType != expectedType { + t.Fatalf("\t%s Expected message of type '%s' but was '%s'", failure, expectedType, msg.Header.MsgType) + } +} + +// getMsgContentAsJSONObject is a test helper that fails the rest if the message content is not a +// map[string]interface{} and returns the content as a map[string]interface{} if it is of the correct type. +func getMsgContentAsJSONObject(t *testing.T, msg ComposedMsg) map[string]interface{} { + t.Helper() + + content, ok := msg.Content.(map[string]interface{}) + if !ok { + t.Fatalf("\t%s Message content is not a JSON object", failure) + } + + return content +} + +// getString is a test helper that retrieves a value as a string from the content at the given key. If the key +// does not exist in the content map or the value is not a string this will fail the test. The jsonObjectName +// parameter is a string used to name the content for more helpful fail messages. +func getString(t *testing.T, jsonObjectName string, content map[string]interface{}, key string) string { + t.Helper() + + raw, ok := content[key] + if !ok { + t.Fatalf("\t%s %s[\"%s\"] field not present", failure, jsonObjectName, key) + } + + value, ok := raw.(string) + if !ok { + t.Fatalf("\t%s %s[\"%s\"] is not a string", failure, jsonObjectName, key) + } + + return value +} + +// getJSONObject is a test helper that retrieves a value as a map[string]interface{} from the content at the given key. +// If the key does not exist in the content map or the value is not a map[string]interface{} this will fail the test. +// The jsonObjectName parameter is a string used to name the content for more helpful fail messages. +func getJSONObject(t *testing.T, jsonObjectName string, content map[string]interface{}, key string) map[string]interface{} { + t.Helper() + + raw, ok := content[key] + if !ok { + t.Fatalf("\t%s %s[\"%s\"] field not present", failure, jsonObjectName, key) + } + + value, ok := raw.(map[string]interface{}) + if !ok { + t.Fatalf("\t%s %s[\"%s\"] is not a JSON object", failure, jsonObjectName, key) + } + + return value }