// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
//	http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.

package vm

import (
	"context"
	"fmt"
	"io"
	"strings"
	"sync"
	"time"

	"github.com/firecracker-microvm/firecracker-containerd/internal"
	"github.com/hashicorp/go-multierror"
	"github.com/sirupsen/logrus"
	"golang.org/x/sync/errgroup"
)

// IOProxy is an interface to a particular implementation for initializing
// and copying the stdio of a process running in a VM. The start method is
// unexported because it is used only internally; Close and IsOpen are
// exported. The interface exists to give outside callers a choice of
// implementation when setting up a process.
type IOProxy interface {
	// start should begin initialization of stdio proxying for the provided
	// process. It returns two channels: ioInitDone indicates that io
	// initialization has completed, and ioCopyDone indicates that io copying
	// has completed. Either channel may carry an error.
	start(proc *vmProc) (ioInitDone <-chan error, ioCopyDone <-chan error)

	// Close closes the proxy.
	Close()

	// IsOpen returns true if the proxy hasn't been closed.
	IsOpen() bool
}

// IOConnector is a function that begins initializing an IO connection (i.e.
// vsock, FIFO, etc.). It returns a channel that should be published to with
// an IOConnectorResult object once initialization is complete or an error
// occurs.
//
// The return of a channel instead of plain values gives the implementation
// the freedom to do synchronous setup, asynchronous setup or a mix of both.
// // The provided context is canceled if the process fails to create/exec or // when the process exits after a successful create/exec. type IOConnector func(procCtx context.Context, logger *logrus.Entry) <-chan IOConnectorResult // IOConnectorResult represents the result of attempting to establish an IO // connection. If successful, the ReadWriteCloser should be non-nil and Err // should be nil. If unsuccessful, ReadWriteCloser should be nil and Err // should be non-nil. type IOConnectorResult struct { io.ReadWriteCloser Err error } // IOConnectorPair holds the read and write side of IOConnectors whose IO // should be proxied. type IOConnectorPair struct { ReadConnector IOConnector WriteConnector IOConnector } func (connectorPair *IOConnectorPair) proxy( ctx context.Context, logger *logrus.Entry, timeoutAfterExit time.Duration, ) (ioInitDone <-chan error, ioCopyDone <-chan error) { // initDone might not have to be buffered. We only send ioInitErr once. initDone := make(chan error, 2) copyDone := make(chan error) ioCtx, ioCancel := context.WithCancel(context.Background()) // Start the initialization process. Any synchronous setup made by the connectors will // be completed after these lines. Async setup will be done once initDone is closed in // the goroutine below. 
readerResultCh := connectorPair.ReadConnector(ioCtx, logger.WithField("direction", "read")) writerResultCh := connectorPair.WriteConnector(ioCtx, logger.WithField("direction", "write")) go func() { defer ioCancel() defer close(copyDone) var reader io.ReadCloser var writer io.WriteCloser var ioInitErr error // Send the first error we get to initDone, but consume both so we can ensure both // end up closed in the case of an error for readerResultCh != nil || writerResultCh != nil { var err error select { case readerResult := <-readerResultCh: readerResultCh = nil if err = readerResult.Err; err == nil { reader = readerResult.ReadWriteCloser } case writerResult := <-writerResultCh: writerResultCh = nil if err = writerResult.Err; err == nil { writer = writerResult.ReadWriteCloser } } if err != nil { ioInitErr = fmt.Errorf("error initializing io: %w", err) logger.WithError(ioInitErr).Error() initDone <- ioInitErr } } close(initDone) if ioInitErr != nil { logClose(logger, reader, writer) return } // IO streams have been initialized successfully // Once the proc exits, wait the provided time before forcibly closing io streams. // If the io streams close on their own before the timeout, the Close calls here // should just be no-ops. 
go func() { <-ctx.Done() time.AfterFunc(timeoutAfterExit, func() { logClose(logger, reader, writer) }) }() logger.Debug("begin copying io") defer logger.Debug("end copying io") size, err := io.CopyBuffer(writer, reader, make([]byte, internal.DefaultBufferSize)) logger.Debugf("copied %d", size) if err != nil { if strings.Contains(err.Error(), "use of closed network connection") || strings.Contains(err.Error(), "file already closed") { logger.Infof("connection was closed: %v", err) } else { logger.WithError(err).Error("error copying io") } copyDone <- err } defer logClose(logger, reader, writer) }() return initDone, copyDone } type ioConnectorSet struct { stdin *IOConnectorPair stdout *IOConnectorPair stderr *IOConnectorPair // closeMu is needed since Close() will be called from different goroutines. closeMu sync.Mutex closed bool } // NewIOConnectorProxy implements the IOProxy interface for a set of // IOConnectorPairs, one each for stdin, stdout and stderr. If any one of // those streams does not need to be proxied, the corresponding arg should // be nil. func NewIOConnectorProxy(stdin, stdout, stderr *IOConnectorPair) IOProxy { return &ioConnectorSet{ stdin: stdin, stdout: stdout, stderr: stderr, closed: false, } } func (ioConnectorSet *ioConnectorSet) Close() { ioConnectorSet.closeMu.Lock() defer ioConnectorSet.closeMu.Unlock() ioConnectorSet.closed = true } func (ioConnectorSet *ioConnectorSet) IsOpen() bool { ioConnectorSet.closeMu.Lock() defer ioConnectorSet.closeMu.Unlock() return !ioConnectorSet.closed } // start starts goroutines to copy stdio and returns two channels. // The first channel returns its initialization error. The second channel returns errors from copying. func (ioConnectorSet *ioConnectorSet) start(proc *vmProc) (ioInitDone <-chan error, ioCopyDone <-chan error) { var initErrG errgroup.Group // When one of the goroutines returns an error, we will cancel // the rest of goroutines through the ctx below. 
copyErrG, ctx := errgroup.WithContext(proc.ctx) waitErrs := func(initErrCh, copyErrCh <-chan error) { initErrG.Go(func() error { return <-initErrCh }) copyErrG.Go(func() error { return <-copyErrCh }) } if ioConnectorSet.stdin != nil { // For Stdin only, provide 0 as the timeout to wait after the proc exits before closing IO streams. // There's no reason to send stdin data to a proc that's already dead. waitErrs(ioConnectorSet.stdin.proxy(ctx, proc.logger.WithField("stream", "stdin"), 0)) } else { proc.logger.Debug("skipping proxy io for unset stdin") } if ioConnectorSet.stdout != nil { waitErrs(ioConnectorSet.stdout.proxy(ctx, proc.logger.WithField("stream", "stdout"), defaultIOFlushTimeout)) } else { proc.logger.Debug("skipping proxy io for unset stdout") } if ioConnectorSet.stderr != nil { waitErrs(ioConnectorSet.stderr.proxy(ctx, proc.logger.WithField("stream", "stderr"), defaultIOFlushTimeout)) } else { proc.logger.Debug("skipping proxy io for unset stderr") } // These channels are not buffered, since we will close them right after having one error. // Callers must read the channels. initDone := make(chan error) go func() { defer close(initDone) initDone <- initErrG.Wait() }() copyDone := make(chan error) go func() { defer close(copyDone) copyDone <- copyErrG.Wait() }() return initDone, copyDone } func logClose(logger *logrus.Entry, streams ...io.Closer) { var closeErr error for _, stream := range streams { if stream == nil { continue } err := stream.Close() if err != nil { closeErr = multierror.Append(closeErr, err) } } if closeErr != nil { logger.WithError(closeErr).Error("error closing io stream") } } // InputPair returns an IOConnectorPair from the given vsock port to // the FIFO file. 
func InputPair(src uint32, dest string) *IOConnectorPair { if dest == "" { return nil } return &IOConnectorPair{ ReadConnector: VSockAcceptConnector(src), WriteConnector: WriteFIFOConnector(dest), } } // OutputPair returns an IOConnectorPair from the given FIFO to // the vsock port. func OutputPair(src string, dest uint32) *IOConnectorPair { if src == "" { return nil } return &IOConnectorPair{ ReadConnector: ReadFIFOConnector(src), WriteConnector: VSockAcceptConnector(dest), } }