// Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package streams

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/aws/go-kafka-event-source/streams/sak"
	"github.com/twmb/franz-go/pkg/kgo"
)

// ExecutionState is returned by an EventProcessor or Interjector in response to an EventContext.
// ExecutionState should not be conflated with concepts of error state, such as Success or Failure.
type ExecutionState int

const (
	// Complete signals the EventSource that the event or interjection is completely processed.
	// Once Complete is returned, the offset for the associated EventContext will be committed.
	Complete ExecutionState = 0
	// Incomplete signals the EventSource that the event or interjection is still ongoing, and
	// that your application promises to fulfill the EventContext in the future.
	// The offset for the associated EventContext will not be committed.
	Incomplete ExecutionState = 1
	// Fatal signals the EventSource that an unrecoverable error was encountered
	// while processing the event or interjection.
	Fatal ExecutionState = 2

	unknownType ExecutionState = 3
)

// AsyncJob pairs an EventContext with a finalizer to be invoked once an
// asynchronous unit of work has finished.
type AsyncJob[T any] struct {
	ctx       *EventContext[T]
	finalizer func() ExecutionState
}

func (aj AsyncJob[T]) Finalize() ExecutionState {
	return aj.finalizer()
}

type asyncCompleter[T any] struct {
	asyncJobs chan AsyncJob[T]
}

func (ac asyncCompleter[T]) AsyncComplete(j AsyncJob[T]) {
	ac.asyncJobs <- j
}

type partitionWorker[T StateStore] struct {
	eosProducer            *eosProducerPool[T]
	partitionInput         chan []*kgo.Record
	maxPending             chan struct{}
	interjectionInput      chan *interjection[T]
	eventInput             chan *EventContext[T]
	interjectionEventInput chan *EventContext[T]
	asyncCompleter         asyncCompleter[T]
	stopSignal             chan struct{}
	revokedSignal          chan struct{}
	stopped                chan struct{}
	changeLog              changeLogPartition[T]
	eventSource            *EventSource[T]
	runStatus              sak.RunStatus
	ready                  int64
	highestOffset          int64
	topicPartition         TopicPartition
	revocationWaiter       sync.WaitGroup
}

func newPartitionWorker[T StateStore](
	eventSource *EventSource[T],
	topicPartition TopicPartition,
	commitLog *eosCommitLog,
	changeLog changeLogPartition[T],
	eosProducer *eosProducerPool[T],
	waiter func()) *partitionWorker[T] {

	eosConfig := eventSource.source.config.EosConfig
	recordsInputSize := sak.Max(eosConfig.MaxBatchSize/10, 100)
	asyncSize := recordsInputSize * 4

	pw := &partitionWorker[T]{
		eventSource:    eventSource,
		topicPartition: topicPartition,
		changeLog:      changeLog,
		eosProducer:    eosProducer,
		stopSignal:     make(chan struct{}),
		revokedSignal:  make(chan struct{}, 1),
		stopped:        make(chan struct{}),
		maxPending:     make(chan struct{}, eosProducer.maxPendingItems()),
		asyncCompleter: asyncCompleter[T]{
			asyncJobs: make(chan AsyncJob[T], asyncSize),
		},
		partitionInput:         make(chan []*kgo.Record, 4),
		eventInput:             make(chan *EventContext[T], recordsInputSize),
		interjectionInput:      make(chan *interjection[T], 1),
		interjectionEventInput: make(chan *EventContext[T], 1),
		runStatus:              eventSource.runStatus.Fork(),
		highestOffset:          -1,
	}
	go pw.work(pw.eventSource.interjections, waiter, commitLog)
	return pw
}

func (pw *partitionWorker[T]) canInterject() bool {
	return atomic.LoadInt64(&pw.ready) != 0
}
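// The following is an illustrative sketch, not part of the original file: it
// shows how a handler might use the Complete/Incomplete contract together
// with the asyncCompleter above. The handler returns Incomplete immediately,
// promising to fulfill the EventContext later; once the background work is
// done, it submits an AsyncJob whose finalizer reports the terminal state,
// at which point the worker commits the offset. exampleDeferredHandler and
// the goroutine body are hypothetical names, not part of the public API.
func exampleDeferredHandler[T StateStore](ec *EventContext[T], completer asyncCompleter[T]) ExecutionState {
	go func() {
		// ... perform long-running work off the worker goroutine ...
		completer.AsyncComplete(AsyncJob[T]{
			ctx:       ec,
			finalizer: func() ExecutionState { return Complete },
		})
	}()
	return Incomplete // the offset for ec will not be committed yet
}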
func (pw *partitionWorker[T]) add(records []*kgo.Record) {
	if pw.isRevoked() {
		return
	}
	// atomic.AddInt64(&pw.pending, int64(len(records)))
	pw.partitionInput <- records
}

func (pw *partitionWorker[T]) revoke() {
	pw.runStatus.Halt()
}

type sincer struct {
	then time.Time
}

func (s sincer) String() string {
	return fmt.Sprintf("%v", time.Since(s.then))
}

func (pw *partitionWorker[T]) pushRecords() {
	for {
		select {
		case records := <-pw.partitionInput:
			if !pw.isRevoked() {
				pw.scheduleTxnAndExecution(records)
			}
		case ij := <-pw.interjectionInput:
			pw.scheduleInterjection(ij)
		case <-pw.runStatus.Done():
			log.Debugf("Closing worker for %+v", pw.topicPartition)
			pw.stopSignal <- struct{}{}
			<-pw.stopped
			close(pw.partitionInput)
			close(pw.eventInput)
			close(pw.asyncCompleter.asyncJobs)
			log.Debugf("Closed worker for %+v", pw.topicPartition)
			return
		}
	}
}

func (pw *partitionWorker[T]) scheduleTxnAndExecution(records []*kgo.Record) {
	if pw.isRevoked() {
		return
	}
	pw.revocationWaiter.Add(len(records)) // optimistically do one Add call for the whole batch
	for _, record := range records {
		if record != nil && record.Offset >= pw.highestOffset {
			ec := newEventContext(pw.runStatus.Ctx(), record, pw.changeLog.changeLogData(), pw)
			pw.maxPending <- struct{}{}
			pw.eosProducer.addEventContext(ec)
			pw.eventInput <- ec
		} else {
			// on the rare occasion this is a stale event, decrement the revocation waiter
			pw.revocationWaiter.Done()
		}
		// this is needed because, when under load, the record input may starve out interjections,
		// which have a very small input buffer
		pw.interleaveInterjection()
	}
}

func (pw *partitionWorker[T]) interleaveInterjection() {
	select {
	case ij := <-pw.interjectionInput:
		pw.scheduleInterjection(ij)
	default:
	}
}

func (pw *partitionWorker[T]) scheduleInterjection(inter *interjection[T]) {
	if pw.isRevoked() {
		if inter.callback != nil {
			inter.callback()
		}
		return
	}
	pw.revocationWaiter.Add(1)
	ec := newInterjectionContext(pw.runStatus.Ctx(), inter, pw.topicPartition, pw.changeLog.changeLogData(), pw)
	pw.maxPending <- struct{}{}
	pw.eosProducer.addEventContext(ec)
	pw.interjectionEventInput <- ec
}
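// An illustrative sketch (hypothetical helper, not in the original file) of
// the backpressure pattern used above: maxPending is a buffered channel acting
// as a counting semaphore. scheduleTxnAndExecution and scheduleInterjection
// acquire a slot by sending before an EventContext enters the pipeline, and
// processAsyncJob/handleInterjection/forwardToEventSource release the slot by
// receiving once the event completes, bounding in-flight work per worker.
func exampleBackpressure() {
	maxPending := make(chan struct{}, 2)           // capacity bounds in-flight items
	acquire := func() { maxPending <- struct{}{} } // blocks once 2 items are in flight
	release := func() { <-maxPending }

	acquire() // admit an item into the pipeline
	release() // item completed; free the slot
}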
func (pw *partitionWorker[T]) work(interjections []interjection[T], waiter func(), commitLog *eosCommitLog) {
	elapsed := sincer{time.Now()}
	// The partition is not ready to receive events as it is still bootstrapping the state store.
	// In the case where this partition was assigned due to a failure on another consumer, this could be a lengthy process.
	// If we continue to consume events for this partition, we will fill its input buffer
	// and block other partitions on this consumer, so pause the partition until the state store is bootstrapped.
	pw.eventSource.consumer.Client().PauseFetchPartitions(map[string][]int32{
		pw.topicPartition.Topic: {pw.topicPartition.Partition},
	})
	// don't start consuming until waiter() returns;
	// it will block until all changelogs for this partition are populated
	pw.highestOffset = commitLog.lastProcessed(pw.topicPartition)
	log.Debugf("partitionWorker initialized %+v with lastProcessed offset: %d in %v",
		pw.topicPartition, pw.highestOffset, elapsed)
	waiter()
	// resume the partition now that the state store is bootstrapped
	pw.eventSource.consumer.Client().ResumeFetchPartitions(map[string][]int32{
		pw.topicPartition.Topic: {pw.topicPartition.Partition},
	})
	go pw.pushRecords()
	atomic.StoreInt64(&pw.ready, 1)
	log.Debugf("partitionWorker activated %+v in %v, interjectionCount: %d",
		pw.topicPartition, elapsed, len(interjections))
	ijPtrs := sak.ToPtrSlice(interjections)
	for _, ij := range ijPtrs {
		ij.init(pw.topicPartition, pw.interjectionInput)
		ij.tick()
	}
	pw.eventSource.source.onPartitionActivated(pw.topicPartition.Partition)

	for {
		select {
		case ec := <-pw.eventInput:
			pw.handleEvent(ec)
		case ec := <-pw.interjectionEventInput:
			pw.handleInterjection(ec)
		case job := <-pw.asyncCompleter.asyncJobs:
			pw.processAsyncJob(job)
		case <-pw.stopSignal:
			for _, ij := range ijPtrs {
				ij.cancel()
			}
			go pw.waitForRevocation()
		case <-pw.revokedSignal:
			pw.stopped <- struct{}{}
			return
		}
	}
}

func (pw *partitionWorker[T]) waitForRevocation() {
	pw.revocationWaiter.Wait() // wait until all pending events have been accepted by a producerNode
	pw.revokedSignal <- struct{}{}
}

func (pw *partitionWorker[T]) processAsyncJob(job AsyncJob[T]) {
	if job.Finalize() == Complete {
		job.ctx.complete()
		<-pw.maxPending
	}
}

func (pw *partitionWorker[T]) isRevoked() bool {
	return !pw.runStatus.Running()
}

func (pw *partitionWorker[T]) handleInterjection(ec *EventContext[T]) {
	inter := ec.interjection
	pw.assignProducer(ec)
	if ec.producer == nil {
		<-pw.maxPending
		if inter.callback != nil {
			// we need to close off one-off interjections to prevent sourceConsumer from hanging
			inter.callback()
		}
	} else if inter.interject(ec) == Complete {
		ec.complete()
		<-pw.maxPending
		inter.tick()
	}
}

func (pw *partitionWorker[T]) handleEvent(ec *EventContext[T]) bool {
	pw.forwardToEventSource(ec)
	return true
}

func (pw *partitionWorker[T]) assignProducer(ec *EventContext[T]) {
	// If we stop processing async completions while waiting for a producer,
	// we could eventually deadlock with the eos producer.
	// If no producer is yet available, go ahead and process an asyncJob.
	for {
		select {
		case ec.producer = <-ec.producerChan:
			return
		case job := <-pw.asyncCompleter.asyncJobs:
			pw.processAsyncJob(job)
		}
	}
}

func (pw *partitionWorker[T]) forwardToEventSource(ec *EventContext[T]) {
	pw.assignProducer(ec)
	if ec.producer == nil {
		// if we're revoked, don't even add this to the onDeck producer
		<-pw.maxPending
		return
	}
	offset := ec.Offset()
	pw.highestOffset = offset + 1
	record, _ := ec.Input()
	if pw.eventSource.handleEvent(ec, record) == Complete {
		ec.complete()
		<-pw.maxPending
	}
}
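// An illustrative sketch (hypothetical helper, not in the original file) of
// the bootstrap gate used at the top of work(): fetches for the partition are
// paused while the state store is rebuilt so the consumer's buffers do not
// fill with records the worker cannot yet process. PauseFetchPartitions and
// ResumeFetchPartitions are the kgo.Client methods used above; client, tp,
// and bootstrap are placeholder arguments.
func examplePauseWhileBootstrapping(client *kgo.Client, tp TopicPartition, bootstrap func()) {
	parts := map[string][]int32{tp.Topic: {tp.Partition}}
	client.PauseFetchPartitions(parts)
	bootstrap() // e.g. replay the changelog into the local StateStore
	client.ResumeFetchPartitions(parts)
}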