Skip to content

Pull upstream changes 2021/06 #39

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion lambda/core/directinvoke/directinvoke.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
package directinvoke

import (
"fmt"
"io"
"net/http"

"github.com/go-chi/chi"
"go.amzn.com/lambda/interop"
"go.amzn.com/lambda/metering"
)

const (
Expand Down Expand Up @@ -51,6 +53,7 @@ func ReceiveDirectInvoke(w http.ResponseWriter, r *http.Request, token interop.T
return nil, interop.ErrMalformedCustomerHeaders
}

now := metering.Monotime()
inv := &interop.Invoke{
ID: r.Header.Get(InvokeIDHeader),
ReservationToken: chi.URLParam(r, "reservationtoken"),
Expand All @@ -64,7 +67,9 @@ func ReceiveDirectInvoke(w http.ResponseWriter, r *http.Request, token interop.T
ClientContext: custHeaders.ClientContext,
Payload: r.Body,
CorrelationID: "invokeCorrelationID",
DeadlineNs: token.DeadlineNs,
DeadlineNs: fmt.Sprintf("%d", now+token.FunctionTimeout.Nanoseconds()),
NeedDebugLogs: token.NeedDebugLogs,
InvokeReceivedTime: now,
}

if inv.ID != token.InvokeID {
Expand All @@ -82,6 +87,11 @@ func ReceiveDirectInvoke(w http.ResponseWriter, r *http.Request, token interop.T
return nil, interop.ErrInvalidFunctionVersion
}

if now > token.InvackDeadlineNs {
renderBadRequest(w, r, interop.ErrReservationExpired.Error())
return nil, interop.ErrReservationExpired
}

w.Header().Set(VersionIDHeader, token.VersionID)
w.Header().Set(ReservationTokenHeader, token.ReservationToken)
w.Header().Set(InvokeIDHeader, token.InvokeID)
Expand Down
13 changes: 12 additions & 1 deletion lambda/interop/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"io"
"net/http"
"time"

"go.amzn.com/lambda/core/statejson"
"go.amzn.com/lambda/fatalerror"
Expand Down Expand Up @@ -39,16 +40,19 @@ type Invoke struct {
CorrelationID string // internal use only
ReservationToken string
VersionID string
InvokeReceivedTime int64
}

type Token struct {
ReservationToken string
InvokeID string
VersionID string
DeadlineNs string
FunctionTimeout time.Duration
InvackDeadlineNs int64
TraceID string
LambdaSegmentID string
InvokeMetadata string
NeedDebugLogs bool
}

type ErrorResponse struct {
Expand Down Expand Up @@ -129,6 +133,7 @@ type DoneMetadata struct {
InvokeRequestReadTimeNs int64
InvokeRequestSizeBytes int64
InvokeCompletionTimeNs int64
InvokeReceivedTime int64
}

type Done struct {
Expand Down Expand Up @@ -159,6 +164,9 @@ var ErrMalformedCustomerHeaders = fmt.Errorf("ErrMalformedCustomerHeaders")
// ErrResponseSent is returned when response with given invokeID was already sent.
var ErrResponseSent = fmt.Errorf("ErrResponseSent")

// ErrReservationExpired is returned when invoke arrived after InvackDeadline
var ErrReservationExpired = fmt.Errorf("ErrReservationExpired")

// ErrorResponseTooLarge is returned when response Payload exceeds shared memory buffer size
type ErrorResponseTooLarge struct {
MaxResponseSize int
Expand Down Expand Up @@ -186,6 +194,9 @@ func (s *ErrorResponseTooLarge) AsInteropError() *ErrorResponse {

// Server implements Slicer communication protocol.
type Server interface {
// StartAcceptingDirectInvokes starts accepting on direct invoke socket (if one is available)
StartAcceptingDirectInvokes() error

// SendErrorResponse sends response.
// Errors returned:
// ErrInvalidInvokeID - validation error indicating that provided invokeID doesn't match current invokeID
Expand Down
12 changes: 9 additions & 3 deletions lambda/rapid/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func handleStart(ctx context.Context, execCtx *rapidContext, watchdog *core.Watc
if !startRequest.SuppressInit {
if err := doInit(ctx, execCtx, watchdog); err != nil {
log.WithError(err).WithField("InvokeID", startRequest.InvokeID).Error("Init failed")
doneFailMsg := generateDoneFail(execCtx, startRequest.CorrelationID, nil)
doneFailMsg := generateDoneFail(execCtx, startRequest.CorrelationID, nil, 0)
handleInitError(doneFailMsg, execCtx, startRequest.InvokeID, interopServer, err)
return
}
Expand All @@ -378,9 +378,13 @@ func handleStart(ctx context.Context, execCtx *rapidContext, watchdog *core.Watc
if err := interopServer.SendDone(doneMsg); err != nil {
log.Panic(err)
}

if err := interopServer.StartAcceptingDirectInvokes(); err != nil {
log.Panic(err)
}
}

func generateDoneFail(execCtx *rapidContext, correlationID string, invokeMx *rendering.InvokeRendererMetrics) *interop.DoneFail {
func generateDoneFail(execCtx *rapidContext, correlationID string, invokeMx *rendering.InvokeRendererMetrics, invokeReceivedTime int64) *interop.DoneFail {
errorType, found := appctx.LoadFirstFatalError(execCtx.appCtx)
if !found {
errorType = fatalerror.Unknown
Expand All @@ -392,6 +396,7 @@ func generateDoneFail(execCtx *rapidContext, correlationID string, invokeMx *ren
Meta: interop.DoneMetadata{
RuntimeRelease: appctx.GetRuntimeRelease(execCtx.appCtx),
NumActiveExtensions: execCtx.registrationService.CountAgents(),
InvokeReceivedTime: invokeReceivedTime,
},
}

Expand All @@ -414,7 +419,7 @@ func handleInvoke(ctx context.Context, execCtx *rapidContext, watchdog *core.Wat

if err := doInvoke(ctx, execCtx, watchdog, invokeRequest, &invokeMx); err != nil {
log.WithError(err).WithField("InvokeID", invokeRequest.ID).Error("Invoke failed")
doneFailMsg := generateDoneFail(execCtx, invokeRequest.CorrelationID, &invokeMx)
doneFailMsg := generateDoneFail(execCtx, invokeRequest.CorrelationID, &invokeMx, invokeRequest.InvokeReceivedTime)
handleInvokeError(doneFailMsg, execCtx, invokeRequest.ID, interopServer, err)
return
}
Expand All @@ -436,6 +441,7 @@ func handleInvoke(ctx context.Context, execCtx *rapidContext, watchdog *core.Wat
InvokeRequestReadTimeNs: invokeMx.ReadTime.Nanoseconds(),
InvokeRequestSizeBytes: int64(invokeMx.SizeBytes),
InvokeCompletionTimeNs: invokeCompletionTimeNs,
InvokeReceivedTime: invokeRequest.InvokeReceivedTime,
},
}
if execCtx.telemetryAPIEnabled {
Expand Down
21 changes: 10 additions & 11 deletions lambda/rapidcore/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"io"
"io/ioutil"
"math"
"net/http"
"sync"
"time"
Expand Down Expand Up @@ -103,6 +104,10 @@ type Server struct {
runtimeState runtimeState
}

func (s *Server) StartAcceptingDirectInvokes() error {
return nil
}

func (s *Server) setRapidPhase(phase rapidPhase) {
s.mutex.Lock()
defer s.mutex.Unlock()
Expand Down Expand Up @@ -153,13 +158,7 @@ func (s *Server) GetInvokeContext() *InvokeContext {
return &ctx
}

func (s *Server) generateInvokeDeadline() string {
s.mutex.Lock()
defer s.mutex.Unlock()
return fmt.Sprintf("%v", time.Now().Add(s.invokeTimeout).UnixNano())
}

func (s *Server) setNewInvokeContext(invokeID string, traceID, lambdaSegmentID, deadline string) (*ReserveResponse, error) {
func (s *Server) setNewInvokeContext(invokeID string, traceID, lambdaSegmentID string) (*ReserveResponse, error) {
s.mutex.Lock()
defer s.mutex.Unlock()

Expand All @@ -172,9 +171,10 @@ func (s *Server) setNewInvokeContext(invokeID string, traceID, lambdaSegmentID,
ReservationToken: uuid.New().String(),
InvokeID: invokeID,
VersionID: standaloneVersionID,
DeadlineNs: deadline,
FunctionTimeout: s.invokeTimeout,
TraceID: traceID,
LambdaSegmentID: lambdaSegmentID,
InvackDeadlineNs: math.MaxInt64, // no INVACK in standalone
},
}

Expand All @@ -189,12 +189,11 @@ func (s *Server) setNewInvokeContext(invokeID string, traceID, lambdaSegmentID,

// Reserve allocates invoke context
func (s *Server) Reserve(id string, traceID, lambdaSegmentID string) (*ReserveResponse, error) {
ddl := s.generateInvokeDeadline()
invokeID := uuid.New().String()
if len(id) > 0 {
invokeID = id
}
resp, err := s.setNewInvokeContext(invokeID, traceID, lambdaSegmentID, ddl)
resp, err := s.setNewInvokeContext(invokeID, traceID, lambdaSegmentID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -614,7 +613,7 @@ func (s *Server) Invoke(responseWriter http.ResponseWriter, invoke *interop.Invo
}
}

invoke.DeadlineNs = reserveResp.Token.DeadlineNs
invoke.DeadlineNs = fmt.Sprintf("%d", metering.Monotime()+reserveResp.Token.FunctionTimeout.Nanoseconds())

invokeChan := make(chan error)
go func() {
Expand Down
4 changes: 3 additions & 1 deletion lambda/rapidcore/standalone/invokeHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
package standalone

import (
"fmt"
"net/http"

"go.amzn.com/lambda/interop"
"go.amzn.com/lambda/metering"
"go.amzn.com/lambda/rapidcore"

log "github.com/sirupsen/logrus"
Expand All @@ -25,7 +27,7 @@ func InvokeHandler(w http.ResponseWriter, r *http.Request, s rapidcore.InteropSe
LambdaSegmentID: r.Header.Get("X-Amzn-Segment-Id"),
Payload: r.Body,
CorrelationID: "invokeCorrelationID",
DeadlineNs: tok.DeadlineNs,
DeadlineNs: fmt.Sprintf("%d", metering.Monotime()+tok.FunctionTimeout.Nanoseconds()),
}

if err := s.FastInvoke(w, invokePayload, false); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion lambda/testdata/flowtesting.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ type MockInteropServer struct {
ActiveInvokeID string
}

// StartAcceptingDirectInvokes
func (i *MockInteropServer) StartAcceptingDirectInvokes() error { return nil }

// SendResponse writes response to a shared memory.
func (i *MockInteropServer) SendResponse(invokeID string, reader io.Reader) error {
bytes, err := ioutil.ReadAll(reader)
Expand Down Expand Up @@ -92,7 +95,6 @@ func (m *MockInteropServer) Invoke(w http.ResponseWriter, i *interop.Invoke) err

func (m *MockInteropServer) Shutdown(shutdown *interop.Shutdown) *statejson.InternalStateDescription { return nil }


// FlowTest provides configuration for tests that involve synchronization flows.
type FlowTest struct {
AppCtx appctx.ApplicationContext
Expand Down
44 changes: 43 additions & 1 deletion test/integration/local_lambda/end-to-end-test.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,48 @@ def test_exception_returned(self):
r = requests.post("http://localhost:9002/2015-03-31/functions/function/invocations", json={})
self.assertEqual(b'{"errorMessage": "Raising an exception", "errorType": "Exception", "stackTrace": [" File \\"/var/task/main.py\\", line 13, in exception_handler\\n raise Exception(\\"Raising an exception\\")\\n"]}', r.content)

def test_context_get_remaining_time_in_three_seconds(self):
cmd = f"docker run --name remainingtimethree -d --env AWS_LAMBDA_FUNCTION_TIMEOUT=3 -v {self.path_to_binary}:/local-lambda-runtime-server -p 9004:8080 --entrypoint /local-lambda-runtime-server/aws-lambda-rie {self.image_name} {DEFAULT_1P_ENTRYPOINT} main.check_remaining_time_handler"

Popen(cmd.split(' ')).communicate()

r = requests.post("http://localhost:9004/2015-03-31/functions/function/invocations", json={})

# sleep 1s to give enough time for the endpoint to be up to curl
time.sleep(SLEEP_TIME)
# Executation time is not decided, 1.0s ~ 3.0s is a good estimation
self.assertLess(int(r.content), 3000)
self.assertGreater(int(r.content), 1000)


def test_context_get_remaining_time_in_ten_seconds(self):
cmd = f"docker run --name remainingtimeten -d --env AWS_LAMBDA_FUNCTION_TIMEOUT=10 -v {self.path_to_binary}:/local-lambda-runtime-server -p 9005:8080 --entrypoint /local-lambda-runtime-server/aws-lambda-rie {self.image_name} {DEFAULT_1P_ENTRYPOINT} main.check_remaining_time_handler"

Popen(cmd.split(' ')).communicate()

r = requests.post("http://localhost:9005/2015-03-31/functions/function/invocations", json={})

# sleep 1s to give enough time for the endpoint to be up to curl
time.sleep(SLEEP_TIME)
# Executation time is not decided, 8.0s ~ 10.0s is a good estimation
self.assertLess(int(r.content), 10000)
self.assertGreater(int(r.content), 8000)


def test_context_get_remaining_time_in_default_deadline(self):
cmd = f"docker run --name remainingtimedefault -d -v {self.path_to_binary}:/local-lambda-runtime-server -p 9006:8080 --entrypoint /local-lambda-runtime-server/aws-lambda-rie {self.image_name} {DEFAULT_1P_ENTRYPOINT} main.check_remaining_time_handler"

Popen(cmd.split(' ')).communicate()

r = requests.post("http://localhost:9006/2015-03-31/functions/function/invocations", json={})

# sleep 1s to give enough time for the endpoint to be up to curl
time.sleep(SLEEP_TIME)
# Executation time is not decided, 298.0s ~ 300.0s is a good estimation
self.assertLess(int(r.content), 300000)
self.assertGreater(int(r.content), 298000)


class TestPython36Runtime(TestCase):

@classmethod
Expand Down Expand Up @@ -153,4 +195,4 @@ def test_function_name_is_overriden(self):


if __name__ == "__main__":
main()
main()
6 changes: 6 additions & 0 deletions test/integration/testdata/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,9 @@ def assert_lambda_arn_in_context(event, context):
return "My lambda ran succesfully"
else:
raise("Function Arn was not there")


def check_remaining_time_handler(event, context):
# Wait 1s to see if the remaining time changes
time.sleep(1)
return context.get_remaining_time_in_millis()