// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package core import ( "errors" "sync" "time" "go.amzn.com/lambda/core/statejson" ) // Suspendable on operator condition. type Suspendable interface { SuspendUnsafe() Release() Lock() Unlock() } // ManagedThread is suspendable on operator condition. type ManagedThread struct { operatorCondition *sync.Cond operatorConditionValue bool } // SuspendUnsafe suspends ManagedThread on operator condition. This allows thread // to be suspended and then resumed from the main thread. // It's marked Unsafe because ManagedThread should be locked before SuspendUnsafe is called func (s *ManagedThread) SuspendUnsafe() { for !s.operatorConditionValue { s.operatorCondition.Wait() } s.operatorConditionValue = false // reset back to false } // Release releases operator condition. This allows thread // to be suspended and then resumed from the main thread. func (s *ManagedThread) Release() { s.operatorCondition.L.Lock() defer s.operatorCondition.L.Unlock() s.operatorConditionValue = true s.operatorCondition.Signal() } // Lock ManagedThread condvar mutex func (s *ManagedThread) Lock() { s.operatorCondition.L.Lock() } // Unlock ManagedThread condvar mutex func (s *ManagedThread) Unlock() { s.operatorCondition.L.Unlock() } // NewManagedThread returns new ManagedThread instance. func NewManagedThread() *ManagedThread { return &ManagedThread{ operatorCondition: sync.NewCond(&sync.Mutex{}), operatorConditionValue: false, } } // ErrNotAllowed returned on illegal state transition var ErrNotAllowed = errors.New("State transition is not allowed") // ErrConcurrentStateModification returned when we've detected an invalid state transision caused by concurrent modification var ErrConcurrentStateModification = errors.New("Concurrent state modification") // RuntimeState is runtime state machine interface. type RuntimeState interface { InitError() error Ready() error RestoreReady() error InvocationResponse() error InvocationErrorResponse() error ResponseSent() error Name() string } type disallowEveryTransitionByDefault struct{} func (s *disallowEveryTransitionByDefault) InitError() error { return ErrNotAllowed } func (s *disallowEveryTransitionByDefault) Ready() error { return ErrNotAllowed } func (s *disallowEveryTransitionByDefault) RestoreReady() error { return ErrNotAllowed } func (s *disallowEveryTransitionByDefault) InvocationResponse() error { return ErrNotAllowed } func (s *disallowEveryTransitionByDefault) InvocationErrorResponse() error { return ErrNotAllowed } func (s *disallowEveryTransitionByDefault) ResponseSent() error { return ErrNotAllowed } // Runtime is runtime object. type Runtime struct { ManagedThread Suspendable currentState RuntimeState stateLastModified time.Time responseTime time.Time RuntimeStartedState RuntimeState RuntimeInitErrorState RuntimeState RuntimeReadyState RuntimeState RuntimeRunningState RuntimeState RuntimeRestoreReadyState RuntimeState RuntimeRestoringState RuntimeState RuntimeInvocationResponseState RuntimeState RuntimeInvocationErrorResponseState RuntimeState RuntimeResponseSentState RuntimeState } // Release ... func (s *Runtime) Release() { s.ManagedThread.Release() } // SetState ... func (s *Runtime) SetState(state RuntimeState) { s.ManagedThread.Lock() defer s.ManagedThread.Unlock() s.setStateUnsafe(state) } func (s *Runtime) setStateUnsafe(state RuntimeState) { s.currentState = state s.stateLastModified = time.Now() } // GetState ... func (s *Runtime) GetState() RuntimeState { s.ManagedThread.Lock() defer s.ManagedThread.Unlock() return s.currentState } // Ready delegates to state implementation. func (s *Runtime) Ready() error { s.ManagedThread.Lock() defer s.ManagedThread.Unlock() return s.currentState.Ready() } func (s *Runtime) RestoreReady() error { s.ManagedThread.Lock() defer s.ManagedThread.Unlock() return s.currentState.RestoreReady() } // InvocationResponse delegates to state implementation. func (s *Runtime) InvocationResponse() error { s.ManagedThread.Lock() defer s.ManagedThread.Unlock() return s.currentState.InvocationResponse() } // InvocationErrorResponse delegates to state implementation. func (s *Runtime) InvocationErrorResponse() error { s.ManagedThread.Lock() defer s.ManagedThread.Unlock() return s.currentState.InvocationErrorResponse() } // InitError delegates to state implementation. func (s *Runtime) InitError() error { s.ManagedThread.Lock() defer s.ManagedThread.Unlock() return s.currentState.InitError() } // ResponseSent delegates to state implementation. func (s *Runtime) ResponseSent() error { s.ManagedThread.Lock() defer s.ManagedThread.Unlock() err := s.currentState.ResponseSent() if err == nil { s.responseTime = time.Now() } return err } // GetRuntimeDescription returns runtime description object for debugging purposes func (s *Runtime) GetRuntimeDescription() statejson.RuntimeDescription { s.ManagedThread.Lock() defer s.ManagedThread.Unlock() res := statejson.RuntimeDescription{ State: statejson.StateDescription{ Name: s.currentState.Name(), LastModified: s.stateLastModified.UnixNano() / int64(time.Millisecond), }, } if !s.responseTime.IsZero() { res.State.ResponseTimeNs = s.responseTime.UnixNano() } return res } // NewRuntime returns new Runtime instance. func NewRuntime(initFlow InitFlowSynchronization, invokeFlow InvokeFlowSynchronization) *Runtime { runtime := &Runtime{ ManagedThread: NewManagedThread(), } runtime.RuntimeStartedState = &RuntimeStartedState{runtime: runtime, initFlow: initFlow} runtime.RuntimeInitErrorState = &RuntimeInitErrorState{runtime: runtime, initFlow: initFlow} runtime.RuntimeReadyState = &RuntimeReadyState{runtime: runtime} runtime.RuntimeRunningState = &RuntimeRunningState{runtime: runtime, invokeFlow: invokeFlow} runtime.RuntimeInvocationResponseState = &RuntimeInvocationResponseState{runtime: runtime, invokeFlow: invokeFlow} runtime.RuntimeInvocationErrorResponseState = &RuntimeInvocationErrorResponseState{runtime: runtime, invokeFlow: invokeFlow} runtime.RuntimeResponseSentState = &RuntimeResponseSentState{runtime: runtime, invokeFlow: invokeFlow} runtime.RuntimeRestoreReadyState = &RuntimeRestoreReadyState{} runtime.RuntimeRestoringState = &RuntimeRestoringState{runtime: runtime, initFlow: initFlow} runtime.setStateUnsafe(runtime.RuntimeStartedState) return runtime } // RuntimeStartedState runtime started state. type RuntimeStartedState struct { disallowEveryTransitionByDefault runtime *Runtime initFlow InitFlowSynchronization } // Ready call when runtime init done. func (s *RuntimeStartedState) Ready() error { s.runtime.setStateUnsafe(s.runtime.RuntimeReadyState) // runtime called /next without calling /restore/next // that means it's not interested in restore phase err := s.initFlow.RuntimeRestoreReady() if err != nil { return err } err = s.initFlow.RuntimeReady() if err != nil { return err } s.runtime.ManagedThread.SuspendUnsafe() if s.runtime.currentState != s.runtime.RuntimeReadyState && s.runtime.currentState != s.runtime.RuntimeRunningState { return ErrConcurrentStateModification } s.runtime.setStateUnsafe(s.runtime.RuntimeRunningState) return nil } func (s *RuntimeStartedState) RestoreReady() error { s.runtime.setStateUnsafe(s.runtime.RuntimeRestoreReadyState) err := s.initFlow.RuntimeRestoreReady() if err != nil { return err } s.runtime.ManagedThread.SuspendUnsafe() if s.runtime.currentState != s.runtime.RuntimeRestoreReadyState && s.runtime.currentState != s.runtime.RuntimeRestoringState { return ErrConcurrentStateModification } s.runtime.setStateUnsafe(s.runtime.RuntimeRestoringState) return nil } // InitError move runtime to init error state. func (s *RuntimeStartedState) InitError() error { s.runtime.setStateUnsafe(s.runtime.RuntimeInitErrorState) return nil } // Name ... func (s *RuntimeStartedState) Name() string { return RuntimeStartedStateName } type RuntimeRestoringState struct { disallowEveryTransitionByDefault runtime *Runtime initFlow InitFlowSynchronization } // Runtime is healthy after restore and called /next func (s *RuntimeRestoringState) Ready() error { s.runtime.setStateUnsafe(s.runtime.RuntimeReadyState) err := s.initFlow.RuntimeReady() if err != nil { return err } s.runtime.ManagedThread.SuspendUnsafe() if s.runtime.currentState != s.runtime.RuntimeReadyState && s.runtime.currentState != s.runtime.RuntimeRunningState { return ErrConcurrentStateModification } s.runtime.setStateUnsafe(s.runtime.RuntimeRunningState) return nil } // Runtime has thrown an exception when executing restore hooks and called /init/error func (s *RuntimeRestoringState) InitError() error { s.runtime.setStateUnsafe(s.runtime.RuntimeInitErrorState) return nil } func (s *RuntimeRestoringState) Name() string { return RuntimeRestoringStateName } // RuntimeInitErrorState runtime started state. type RuntimeInitErrorState struct { disallowEveryTransitionByDefault runtime *Runtime initFlow InitFlowSynchronization } // Name ... func (s *RuntimeInitErrorState) Name() string { return RuntimeInitErrorStateName } // RuntimeReadyState runtime ready state. type RuntimeReadyState struct { disallowEveryTransitionByDefault runtime *Runtime } func (s *RuntimeReadyState) Ready() error { s.runtime.ManagedThread.SuspendUnsafe() if s.runtime.currentState != s.runtime.RuntimeReadyState && s.runtime.currentState != s.runtime.RuntimeRunningState { return ErrConcurrentStateModification } s.runtime.setStateUnsafe(s.runtime.RuntimeRunningState) return nil } // Name ... func (s *RuntimeReadyState) Name() string { return RuntimeReadyStateName } // RuntimeRunningState runtime ready state. type RuntimeRunningState struct { disallowEveryTransitionByDefault runtime *Runtime invokeFlow InvokeFlowSynchronization } func (s *RuntimeRunningState) Ready() error { return nil } // InvocationResponse call when runtime response is available. func (s *RuntimeRunningState) InvocationResponse() error { s.runtime.setStateUnsafe(s.runtime.RuntimeInvocationResponseState) return nil } // InvocationErrorResponse call when runtime error response is available. func (s *RuntimeRunningState) InvocationErrorResponse() error { s.runtime.setStateUnsafe(s.runtime.RuntimeInvocationErrorResponseState) return nil } // Name ... func (s *RuntimeRunningState) Name() string { return RuntimeRunningStateName } type RuntimeRestoreReadyState struct { disallowEveryTransitionByDefault } func (s *RuntimeRestoreReadyState) Name() string { return RuntimeRestoreReadyStateName } // RuntimeInvocationResponseState runtime response is available. // Start state for runtime response submission. type RuntimeInvocationResponseState struct { disallowEveryTransitionByDefault runtime *Runtime invokeFlow InvokeFlowSynchronization } // ResponseSent completes RuntimeInvocationResponseState. func (s *RuntimeInvocationResponseState) ResponseSent() error { s.runtime.setStateUnsafe(s.runtime.RuntimeResponseSentState) return s.invokeFlow.RuntimeResponse(s.runtime) } // Name ... func (s *RuntimeInvocationResponseState) Name() string { return RuntimeInvocationResponseStateName } // RuntimeInvocationErrorResponseState runtime response is available. // Start state for runtime error response submission. type RuntimeInvocationErrorResponseState struct { disallowEveryTransitionByDefault runtime *Runtime invokeFlow InvokeFlowSynchronization } // ResponseSent completes RuntimeInvocationErrorResponseState. func (s *RuntimeInvocationErrorResponseState) ResponseSent() error { s.runtime.setStateUnsafe(s.runtime.RuntimeResponseSentState) return s.invokeFlow.RuntimeResponse(s.runtime) } // Name ... func (s *RuntimeInvocationErrorResponseState) Name() string { return RuntimeInvocationErrorResponseStateName } // RuntimeResponseSentState ends started runtime response or runtime error response submission. type RuntimeResponseSentState struct { disallowEveryTransitionByDefault runtime *Runtime invokeFlow InvokeFlowSynchronization } // Ready call when runtime ready. func (s *RuntimeResponseSentState) Ready() error { s.runtime.setStateUnsafe(s.runtime.RuntimeReadyState) if err := s.invokeFlow.RuntimeReady(s.runtime); err != nil { return err } s.runtime.ManagedThread.SuspendUnsafe() if s.runtime.currentState != s.runtime.RuntimeReadyState && s.runtime.currentState != s.runtime.RuntimeRunningState { return ErrConcurrentStateModification } s.runtime.setStateUnsafe(s.runtime.RuntimeRunningState) return nil } // Name ... func (s *RuntimeResponseSentState) Name() string { return RuntimeResponseSentStateName }