Skip to content

Commit b1c9aa7

Browse files
authored
Pull upstream changes 2021/06 (#39)
1 parent a78c49c commit b1c9aa7

File tree

8 files changed

+97
-19
lines changed

8 files changed

+97
-19
lines changed

lambda/core/directinvoke/directinvoke.go

+11-1
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@
44
package directinvoke
55

66
import (
7+
"fmt"
78
"io"
89
"net/http"
910

1011
"github.com/go-chi/chi"
1112
"go.amzn.com/lambda/interop"
13+
"go.amzn.com/lambda/metering"
1214
)
1315

1416
const (
@@ -51,6 +53,7 @@ func ReceiveDirectInvoke(w http.ResponseWriter, r *http.Request, token interop.T
5153
return nil, interop.ErrMalformedCustomerHeaders
5254
}
5355

56+
now := metering.Monotime()
5457
inv := &interop.Invoke{
5558
ID: r.Header.Get(InvokeIDHeader),
5659
ReservationToken: chi.URLParam(r, "reservationtoken"),
@@ -64,7 +67,9 @@ func ReceiveDirectInvoke(w http.ResponseWriter, r *http.Request, token interop.T
6467
ClientContext: custHeaders.ClientContext,
6568
Payload: r.Body,
6669
CorrelationID: "invokeCorrelationID",
67-
DeadlineNs: token.DeadlineNs,
70+
DeadlineNs: fmt.Sprintf("%d", now+token.FunctionTimeout.Nanoseconds()),
71+
NeedDebugLogs: token.NeedDebugLogs,
72+
InvokeReceivedTime: now,
6873
}
6974

7075
if inv.ID != token.InvokeID {
@@ -82,6 +87,11 @@ func ReceiveDirectInvoke(w http.ResponseWriter, r *http.Request, token interop.T
8287
return nil, interop.ErrInvalidFunctionVersion
8388
}
8489

90+
if now > token.InvackDeadlineNs {
91+
renderBadRequest(w, r, interop.ErrReservationExpired.Error())
92+
return nil, interop.ErrReservationExpired
93+
}
94+
8595
w.Header().Set(VersionIDHeader, token.VersionID)
8696
w.Header().Set(ReservationTokenHeader, token.ReservationToken)
8797
w.Header().Set(InvokeIDHeader, token.InvokeID)

lambda/interop/model.go

+12-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"fmt"
99
"io"
1010
"net/http"
11+
"time"
1112

1213
"go.amzn.com/lambda/core/statejson"
1314
"go.amzn.com/lambda/fatalerror"
@@ -39,16 +40,19 @@ type Invoke struct {
3940
CorrelationID string // internal use only
4041
ReservationToken string
4142
VersionID string
43+
InvokeReceivedTime int64
4244
}
4345

4446
type Token struct {
4547
ReservationToken string
4648
InvokeID string
4749
VersionID string
48-
DeadlineNs string
50+
FunctionTimeout time.Duration
51+
InvackDeadlineNs int64
4952
TraceID string
5053
LambdaSegmentID string
5154
InvokeMetadata string
55+
NeedDebugLogs bool
5256
}
5357

5458
type ErrorResponse struct {
@@ -129,6 +133,7 @@ type DoneMetadata struct {
129133
InvokeRequestReadTimeNs int64
130134
InvokeRequestSizeBytes int64
131135
InvokeCompletionTimeNs int64
136+
InvokeReceivedTime int64
132137
}
133138

134139
type Done struct {
@@ -159,6 +164,9 @@ var ErrMalformedCustomerHeaders = fmt.Errorf("ErrMalformedCustomerHeaders")
159164
// ErrResponseSent is returned when response with given invokeID was already sent.
160165
var ErrResponseSent = fmt.Errorf("ErrResponseSent")
161166

167+
// ErrReservationExpired is returned when invoke arrived after InvackDeadline
168+
var ErrReservationExpired = fmt.Errorf("ErrReservationExpired")
169+
162170
// ErrorResponseTooLarge is returned when response Payload exceeds shared memory buffer size
163171
type ErrorResponseTooLarge struct {
164172
MaxResponseSize int
@@ -186,6 +194,9 @@ func (s *ErrorResponseTooLarge) AsInteropError() *ErrorResponse {
186194

187195
// Server implements Slicer communication protocol.
188196
type Server interface {
197+
// StartAcceptingDirectInvokes starts accepting on direct invoke socket (if one is available)
198+
StartAcceptingDirectInvokes() error
199+
189200
// SendErrorResponse sends response.
190201
// Errors returned:
191202
// ErrInvalidInvokeID - validation error indicating that provided invokeID doesn't match current invokeID

lambda/rapid/start.go

+9-3
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ func handleStart(ctx context.Context, execCtx *rapidContext, watchdog *core.Watc
359359
if !startRequest.SuppressInit {
360360
if err := doInit(ctx, execCtx, watchdog); err != nil {
361361
log.WithError(err).WithField("InvokeID", startRequest.InvokeID).Error("Init failed")
362-
doneFailMsg := generateDoneFail(execCtx, startRequest.CorrelationID, nil)
362+
doneFailMsg := generateDoneFail(execCtx, startRequest.CorrelationID, nil, 0)
363363
handleInitError(doneFailMsg, execCtx, startRequest.InvokeID, interopServer, err)
364364
return
365365
}
@@ -378,9 +378,13 @@ func handleStart(ctx context.Context, execCtx *rapidContext, watchdog *core.Watc
378378
if err := interopServer.SendDone(doneMsg); err != nil {
379379
log.Panic(err)
380380
}
381+
382+
if err := interopServer.StartAcceptingDirectInvokes(); err != nil {
383+
log.Panic(err)
384+
}
381385
}
382386

383-
func generateDoneFail(execCtx *rapidContext, correlationID string, invokeMx *rendering.InvokeRendererMetrics) *interop.DoneFail {
387+
func generateDoneFail(execCtx *rapidContext, correlationID string, invokeMx *rendering.InvokeRendererMetrics, invokeReceivedTime int64) *interop.DoneFail {
384388
errorType, found := appctx.LoadFirstFatalError(execCtx.appCtx)
385389
if !found {
386390
errorType = fatalerror.Unknown
@@ -392,6 +396,7 @@ func generateDoneFail(execCtx *rapidContext, correlationID string, invokeMx *ren
392396
Meta: interop.DoneMetadata{
393397
RuntimeRelease: appctx.GetRuntimeRelease(execCtx.appCtx),
394398
NumActiveExtensions: execCtx.registrationService.CountAgents(),
399+
InvokeReceivedTime: invokeReceivedTime,
395400
},
396401
}
397402

@@ -414,7 +419,7 @@ func handleInvoke(ctx context.Context, execCtx *rapidContext, watchdog *core.Wat
414419

415420
if err := doInvoke(ctx, execCtx, watchdog, invokeRequest, &invokeMx); err != nil {
416421
log.WithError(err).WithField("InvokeID", invokeRequest.ID).Error("Invoke failed")
417-
doneFailMsg := generateDoneFail(execCtx, invokeRequest.CorrelationID, &invokeMx)
422+
doneFailMsg := generateDoneFail(execCtx, invokeRequest.CorrelationID, &invokeMx, invokeRequest.InvokeReceivedTime)
418423
handleInvokeError(doneFailMsg, execCtx, invokeRequest.ID, interopServer, err)
419424
return
420425
}
@@ -436,6 +441,7 @@ func handleInvoke(ctx context.Context, execCtx *rapidContext, watchdog *core.Wat
436441
InvokeRequestReadTimeNs: invokeMx.ReadTime.Nanoseconds(),
437442
InvokeRequestSizeBytes: int64(invokeMx.SizeBytes),
438443
InvokeCompletionTimeNs: invokeCompletionTimeNs,
444+
InvokeReceivedTime: invokeRequest.InvokeReceivedTime,
439445
},
440446
}
441447
if execCtx.telemetryAPIEnabled {

lambda/rapidcore/server.go

+10-11
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"fmt"
1111
"io"
1212
"io/ioutil"
13+
"math"
1314
"net/http"
1415
"sync"
1516
"time"
@@ -103,6 +104,10 @@ type Server struct {
103104
runtimeState runtimeState
104105
}
105106

107+
func (s *Server) StartAcceptingDirectInvokes() error {
108+
return nil
109+
}
110+
106111
func (s *Server) setRapidPhase(phase rapidPhase) {
107112
s.mutex.Lock()
108113
defer s.mutex.Unlock()
@@ -153,13 +158,7 @@ func (s *Server) GetInvokeContext() *InvokeContext {
153158
return &ctx
154159
}
155160

156-
func (s *Server) generateInvokeDeadline() string {
157-
s.mutex.Lock()
158-
defer s.mutex.Unlock()
159-
return fmt.Sprintf("%v", time.Now().Add(s.invokeTimeout).UnixNano())
160-
}
161-
162-
func (s *Server) setNewInvokeContext(invokeID string, traceID, lambdaSegmentID, deadline string) (*ReserveResponse, error) {
161+
func (s *Server) setNewInvokeContext(invokeID string, traceID, lambdaSegmentID string) (*ReserveResponse, error) {
163162
s.mutex.Lock()
164163
defer s.mutex.Unlock()
165164

@@ -172,9 +171,10 @@ func (s *Server) setNewInvokeContext(invokeID string, traceID, lambdaSegmentID,
172171
ReservationToken: uuid.New().String(),
173172
InvokeID: invokeID,
174173
VersionID: standaloneVersionID,
175-
DeadlineNs: deadline,
174+
FunctionTimeout: s.invokeTimeout,
176175
TraceID: traceID,
177176
LambdaSegmentID: lambdaSegmentID,
177+
InvackDeadlineNs: math.MaxInt64, // no INVACK in standalone
178178
},
179179
}
180180

@@ -189,12 +189,11 @@ func (s *Server) setNewInvokeContext(invokeID string, traceID, lambdaSegmentID,
189189

190190
// Reserve allocates invoke context
191191
func (s *Server) Reserve(id string, traceID, lambdaSegmentID string) (*ReserveResponse, error) {
192-
ddl := s.generateInvokeDeadline()
193192
invokeID := uuid.New().String()
194193
if len(id) > 0 {
195194
invokeID = id
196195
}
197-
resp, err := s.setNewInvokeContext(invokeID, traceID, lambdaSegmentID, ddl)
196+
resp, err := s.setNewInvokeContext(invokeID, traceID, lambdaSegmentID)
198197
if err != nil {
199198
return nil, err
200199
}
@@ -614,7 +613,7 @@ func (s *Server) Invoke(responseWriter http.ResponseWriter, invoke *interop.Invo
614613
}
615614
}
616615

617-
invoke.DeadlineNs = reserveResp.Token.DeadlineNs
616+
invoke.DeadlineNs = fmt.Sprintf("%d", metering.Monotime()+reserveResp.Token.FunctionTimeout.Nanoseconds())
618617

619618
invokeChan := make(chan error)
620619
go func() {

lambda/rapidcore/standalone/invokeHandler.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@
44
package standalone
55

66
import (
7+
"fmt"
78
"net/http"
89

910
"go.amzn.com/lambda/interop"
11+
"go.amzn.com/lambda/metering"
1012
"go.amzn.com/lambda/rapidcore"
1113

1214
log "github.com/sirupsen/logrus"
@@ -25,7 +27,7 @@ func InvokeHandler(w http.ResponseWriter, r *http.Request, s rapidcore.InteropSe
2527
LambdaSegmentID: r.Header.Get("X-Amzn-Segment-Id"),
2628
Payload: r.Body,
2729
CorrelationID: "invokeCorrelationID",
28-
DeadlineNs: tok.DeadlineNs,
30+
DeadlineNs: fmt.Sprintf("%d", metering.Monotime()+tok.FunctionTimeout.Nanoseconds()),
2931
}
3032

3133
if err := s.FastInvoke(w, invokePayload, false); err != nil {

lambda/testdata/flowtesting.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ type MockInteropServer struct {
2424
ActiveInvokeID string
2525
}
2626

27+
// StartAcceptingDirectInvokes
28+
func (i *MockInteropServer) StartAcceptingDirectInvokes() error { return nil }
29+
2730
// SendResponse writes response to a shared memory.
2831
func (i *MockInteropServer) SendResponse(invokeID string, reader io.Reader) error {
2932
bytes, err := ioutil.ReadAll(reader)
@@ -92,7 +95,6 @@ func (m *MockInteropServer) Invoke(w http.ResponseWriter, i *interop.Invoke) err
9295

9396
func (m *MockInteropServer) Shutdown(shutdown *interop.Shutdown) *statejson.InternalStateDescription { return nil }
9497

95-
9698
// FlowTest provides configuration for tests that involve synchronization flows.
9799
type FlowTest struct {
98100
AppCtx appctx.ApplicationContext

test/integration/local_lambda/end-to-end-test.py

+43-1
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,48 @@ def test_exception_returned(self):
106106
r = requests.post("http://localhost:9002/2015-03-31/functions/function/invocations", json={})
107107
self.assertEqual(b'{"errorMessage": "Raising an exception", "errorType": "Exception", "stackTrace": [" File \\"/var/task/main.py\\", line 13, in exception_handler\\n raise Exception(\\"Raising an exception\\")\\n"]}', r.content)
108108

109+
def test_context_get_remaining_time_in_three_seconds(self):
110+
cmd = f"docker run --name remainingtimethree -d --env AWS_LAMBDA_FUNCTION_TIMEOUT=3 -v {self.path_to_binary}:/local-lambda-runtime-server -p 9004:8080 --entrypoint /local-lambda-runtime-server/aws-lambda-rie {self.image_name} {DEFAULT_1P_ENTRYPOINT} main.check_remaining_time_handler"
111+
112+
Popen(cmd.split(' ')).communicate()
113+
114+
r = requests.post("http://localhost:9004/2015-03-31/functions/function/invocations", json={})
115+
116+
# sleep 1s to give enough time for the endpoint to be up to curl
117+
time.sleep(SLEEP_TIME)
118+
# Executation time is not decided, 1.0s ~ 3.0s is a good estimation
119+
self.assertLess(int(r.content), 3000)
120+
self.assertGreater(int(r.content), 1000)
121+
122+
123+
def test_context_get_remaining_time_in_ten_seconds(self):
124+
cmd = f"docker run --name remainingtimeten -d --env AWS_LAMBDA_FUNCTION_TIMEOUT=10 -v {self.path_to_binary}:/local-lambda-runtime-server -p 9005:8080 --entrypoint /local-lambda-runtime-server/aws-lambda-rie {self.image_name} {DEFAULT_1P_ENTRYPOINT} main.check_remaining_time_handler"
125+
126+
Popen(cmd.split(' ')).communicate()
127+
128+
r = requests.post("http://localhost:9005/2015-03-31/functions/function/invocations", json={})
129+
130+
# sleep 1s to give enough time for the endpoint to be up to curl
131+
time.sleep(SLEEP_TIME)
132+
# Executation time is not decided, 8.0s ~ 10.0s is a good estimation
133+
self.assertLess(int(r.content), 10000)
134+
self.assertGreater(int(r.content), 8000)
135+
136+
137+
def test_context_get_remaining_time_in_default_deadline(self):
138+
cmd = f"docker run --name remainingtimedefault -d -v {self.path_to_binary}:/local-lambda-runtime-server -p 9006:8080 --entrypoint /local-lambda-runtime-server/aws-lambda-rie {self.image_name} {DEFAULT_1P_ENTRYPOINT} main.check_remaining_time_handler"
139+
140+
Popen(cmd.split(' ')).communicate()
141+
142+
r = requests.post("http://localhost:9006/2015-03-31/functions/function/invocations", json={})
143+
144+
# sleep 1s to give enough time for the endpoint to be up to curl
145+
time.sleep(SLEEP_TIME)
146+
# Executation time is not decided, 298.0s ~ 300.0s is a good estimation
147+
self.assertLess(int(r.content), 300000)
148+
self.assertGreater(int(r.content), 298000)
149+
150+
109151
class TestPython36Runtime(TestCase):
110152

111153
@classmethod
@@ -153,4 +195,4 @@ def test_function_name_is_overriden(self):
153195

154196

155197
if __name__ == "__main__":
156-
main()
198+
main()

test/integration/testdata/main.py

+6
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,9 @@ def assert_lambda_arn_in_context(event, context):
3535
return "My lambda ran succesfully"
3636
else:
3737
raise("Function Arn was not there")
38+
39+
40+
def check_remaining_time_handler(event, context):
41+
# Wait 1s to see if the remaining time changes
42+
time.sleep(1)
43+
return context.get_remaining_time_in_millis()

0 commit comments

Comments
 (0)