/******************************************************************************* * Copyright 2010-2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * http://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. ******************************************************************************/ package com.amazonaws.services.cloudtrail.processinglibrary.reader; import com.amazonaws.services.cloudtrail.processinglibrary.configuration.ProcessingConfiguration; import com.amazonaws.services.cloudtrail.processinglibrary.exceptions.CallbackException; import com.amazonaws.services.cloudtrail.processinglibrary.interfaces.EventFilter; import com.amazonaws.services.cloudtrail.processinglibrary.interfaces.EventsProcessor; import com.amazonaws.services.cloudtrail.processinglibrary.interfaces.ExceptionHandler; import com.amazonaws.services.cloudtrail.processinglibrary.interfaces.ProgressReporter; import com.amazonaws.services.cloudtrail.processinglibrary.interfaces.SourceFilter; import com.amazonaws.services.cloudtrail.processinglibrary.manager.S3Manager; import com.amazonaws.services.cloudtrail.processinglibrary.manager.SqsManager; import com.amazonaws.services.cloudtrail.processinglibrary.model.CloudTrailEvent; import com.amazonaws.services.cloudtrail.processinglibrary.model.CloudTrailLog; import com.amazonaws.services.cloudtrail.processinglibrary.model.CloudTrailSource; import com.amazonaws.services.cloudtrail.processinglibrary.model.SQSBasedSource; import com.amazonaws.services.cloudtrail.processinglibrary.progress.BasicProcessLogInfo; import com.amazonaws.services.cloudtrail.processinglibrary.progress.BasicProcessSourceInfo; import com.amazonaws.services.cloudtrail.processinglibrary.progress.ProgressState; import com.amazonaws.services.cloudtrail.processinglibrary.progress.ProgressStatus; import com.amazonaws.services.cloudtrail.processinglibrary.serializer.DefaultEventSerializer; import com.amazonaws.services.cloudtrail.processinglibrary.serializer.EventSerializer; import com.amazonaws.services.cloudtrail.processinglibrary.serializer.RawLogDeliveryEventSerializer; import com.amazonaws.services.cloudtrail.processinglibrary.utils.EventBuffer; import com.amazonaws.services.cloudtrail.processinglibrary.utils.LibraryUtils; import com.amazonaws.services.sqs.model.Message; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import java.io.ByteArrayInputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.zip.GZIPInputStream; /** * EventReader is responsible for processing a stream of events. It parses each event and hands * the events to EventsProcessor to process. */ public class EventReader { private static final Log logger = LogFactory.getLog(EventReader.class); private final SourceFilter sourceFilter; private final EventFilter eventFilter; private final EventsProcessor eventsProcessor; private final ProgressReporter progressReporter; private final ExceptionHandler exceptionHandler; private ProcessingConfiguration config; private SqsManager sqsManager; private S3Manager s3Manager; /** * Jackson parser to parse CloudTrail log files. */ private ObjectMapper mapper; /** * Internal use only. * * This constructor creates an instance of EventReader object. * * @param eventsProcessor user's implementation of eventsProcessor. * @param sourceFilter user's implementation of sourceFilter. * @param eventFilter user's implementation of eventFilter. * @param progressReporter user's implementation of progressReporter. * @param exceptionHandler user's implementation of exceptionHandler. * @param sqsManager that poll message from SQS queue. * @param s3Manager that download CloudTrail log files from S3. * @param configuration user provided ProcessingConfiguration. */ public EventReader(EventsProcessor eventsProcessor, SourceFilter sourceFilter, EventFilter eventFilter, ProgressReporter progressReporter, ExceptionHandler exceptionHandler, SqsManager sqsManager, S3Manager s3Manager, ProcessingConfiguration configuration) { this.eventsProcessor = eventsProcessor; this.sourceFilter = sourceFilter; this.eventFilter = eventFilter; this.progressReporter = progressReporter; this.exceptionHandler = exceptionHandler; this.config = configuration; this.sqsManager = sqsManager; this.s3Manager = s3Manager; this.mapper = new ObjectMapper(); } /** * Poll messages from SQS queue and convert messages to CloudTrailSource. * * @return a list of {@link CloudTrailSource}. */ public List getSources() { List sqsMessages = sqsManager.pollQueue(); return sqsManager.parseMessage(sqsMessages); } /** * Retrieve S3 object URL from source then downloads the object processes each event through * call back functions. * * @param source {@link CloudTrailSource} to process. */ public void processSource (CloudTrailSource source) { boolean filterSourceOut = false; boolean downloadLogSuccess = true; boolean processSourceSuccess = false; ProgressStatus processSourceStatus = new ProgressStatus(ProgressState.processSource, new BasicProcessSourceInfo(source, processSourceSuccess)); final Object processSourceReportObject = progressReporter.reportStart(processSourceStatus); // Start to process the source try { // Apply source filter first. If source filtered out then delete source immediately and return. if (!sourceFilter.filterSource(source)) { logger.debug("AWSCloudTrailSource " + source + " has been filtered out."); processSourceSuccess = true; filterSourceOut = true; } else { int nLogFilesToProcess = ((SQSBasedSource)source).getLogs().size(); for (CloudTrailLog ctLog : ((SQSBasedSource)source).getLogs()) { //start to process the log boolean processLogSuccess = false; ProgressStatus processLogStatus = new ProgressStatus(ProgressState.processLog, new BasicProcessLogInfo(source, ctLog, processLogSuccess)); final Object processLogReportObject = progressReporter.reportStart(processLogStatus); try { byte[] s3ObjectBytes = s3Manager.downloadLog(ctLog, source); if (s3ObjectBytes == null) { downloadLogSuccess = false; continue; //Failure downloading log file. Skip it. } try (GZIPInputStream gzippedInputStream = new GZIPInputStream(new ByteArrayInputStream(s3ObjectBytes)); EventSerializer serializer = getEventSerializer(gzippedInputStream, ctLog)) { emitEvents(serializer); //decrement this value upon successfully processed a log nLogFilesToProcess --; processLogSuccess = true; } catch (IllegalArgumentException | IOException e) { LibraryUtils.handleException(exceptionHandler, processLogStatus, e, "Failed to parse log file."); } } finally { //end to process the log LibraryUtils.endToProcess(progressReporter, processLogSuccess, processLogStatus, processLogReportObject); } } if (nLogFilesToProcess == 0) { processSourceSuccess = true; } } } catch (CallbackException ex) { exceptionHandler.handleException(ex); } finally { cleanupMessage(filterSourceOut, downloadLogSuccess, processSourceSuccess, source); // end to process the source LibraryUtils.endToProcess(progressReporter, processSourceSuccess, processSourceStatus, processSourceReportObject); } } /** * Delete SQS message after processing source. * * @param progressState {@link ProgressState} either {@link ProgressState#deleteMessage}, or {@link ProgressState#deleteFilteredMessage} * @param source {@link CloudTrailSource} that contains the SQS message that will be deleted. */ private void deleteMessageAfterProcessSource(ProgressState progressState, CloudTrailSource source) { ProgressStatus deleteMessageStatus = new ProgressStatus(progressState, new BasicProcessSourceInfo(source, false)); sqsManager.deleteMessageFromQueue(((SQSBasedSource)source).getSqsMessage(), deleteMessageStatus); } /** * Clean up the message after CPL finishes the processing. *

*

  • If the source is filtered out, the message will be deleted with {@link ProgressState#deleteFilteredMessage}.
  • *
  • If the processing is successful, the message with be deleted with {@link ProgressState#deleteMessage}.
  • *
  • If the processing failed due to downloading logs, the message will not be deleted regardless of * {@link ProcessingConfiguration#isDeleteMessageUponFailure()} value. Otherwise, this property controls the * deletion decision.
  • *

    */ private void cleanupMessage(boolean filterSourceOut, boolean downloadLogsSuccess, boolean processSourceSuccess, CloudTrailSource source) { if (filterSourceOut) { deleteMessageAfterProcessSource(ProgressState.deleteFilteredMessage, source); } else if (processSourceSuccess || sqsManager.shouldDeleteMessageUponFailure(!downloadLogsSuccess)) { deleteMessageAfterProcessSource(ProgressState.deleteMessage, source); } } /** * Gets the EventSerializer based on user's configuration. * * @param inputStream the Gzipped content from CloudTrail log file. * @param ctLog CloudTrail log file. * @return parser that parses CloudTrail log file. */ private EventSerializer getEventSerializer(GZIPInputStream inputStream, CloudTrailLog ctLog) throws IOException { EventSerializer serializer; if (config.isEnableRawEventInfo()) { String logFileContent = new String(LibraryUtils.toByteArray(inputStream), StandardCharsets.UTF_8); JsonParser jsonParser = mapper.getFactory().createParser(logFileContent); serializer = new RawLogDeliveryEventSerializer(logFileContent, ctLog, jsonParser); } else { JsonParser jsonParser = mapper.getFactory().createParser(inputStream); serializer = new DefaultEventSerializer(ctLog, jsonParser); } return serializer; } /** * Filter, buffer, and emit CloudTrailEvents. * * @param serializer {@link EventSerializer} that parses CloudTrail log file. * * @throws IOException If the log cannot be read. * @throws CallbackException If an error occurs when filtering or processing events. */ private void emitEvents(EventSerializer serializer) throws IOException, CallbackException { EventBuffer eventBuffer = new EventBuffer<>(config.getMaxEventsPerEmit()); while (serializer.hasNextEvent()) { CloudTrailEvent event = serializer.getNextEvent(); try { if (eventFilter.filterEvent(event)) { eventBuffer.addEvent(event); if (eventBuffer.isBufferFull()) { eventsProcessor.process(eventBuffer.getEvents()); } } else { logger.debug("AWSCloudTrailEvent " + event + " has been filtered out."); } } catch (Exception e) { logger.error("AWSCloudTrailEvent " + event + " caused the following Exception:", e); throw e; } } //emit whatever in the buffer as last batch List events = eventBuffer.getEvents(); if (!events.isEmpty()) { eventsProcessor.process(events); } } }