/* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ package org.opensearch.dataprepper.model.event; import com.fasterxml.jackson.core.JsonPointer; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectReader; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.type.TypeFactory; import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import org.apache.commons.lang3.StringUtils; import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.model.event.exceptions.EventKeyNotFoundException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.StringJoiner; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; /** * A Jackson Implementation of {@link Event} interface. This implementation relies heavily on JsonNode to manage the keys of the event. *

* This implementation supports [JsonPointer](https://datatracker.ietf.org/doc/html/rfc6901) for keys to access nested structures. * For example using the key "/fizz/buzz" would allow a user to retrieve the number 42 using {@link #get(String, Class)} from the nested structure below. * Additionally, a key structure without a prefixed "/" will access the same value: "fizz/buzz" *

* { * "foo": "bar" * "fizz": { * "buzz": 42 * } * } * * @since 1.2 */ public class JacksonEvent implements Event { private static final Logger LOG = LoggerFactory.getLogger(JacksonEvent.class); private static final String SEPARATOR = "/"; private static final ObjectMapper mapper = new ObjectMapper() .registerModule(new JavaTimeModule()) .registerModule(new Jdk8Module()); // required for using Optional with Jackson. Ref: https://github.com/FasterXML/jackson-modules-java8 private static final TypeReference> MAP_TYPE_REFERENCE = new TypeReference>() { }; private final EventMetadata eventMetadata; private EventHandle eventHandle; private final JsonNode jsonNode; static final int MAX_KEY_LENGTH = 2048; static final String MESSAGE_KEY = "message"; static final String EVENT_TYPE = "event"; protected JacksonEvent(final Builder builder) { if (builder.eventMetadata == null) { this.eventMetadata = new DefaultEventMetadata.Builder() .withEventType(builder.eventType) .withTimeReceived(builder.timeReceived) .withAttributes(builder.eventMetadataAttributes) .build(); } else { this.eventMetadata = builder.eventMetadata; } this.jsonNode = getInitialJsonNode(builder.data); } protected JacksonEvent(final JacksonEvent otherEvent) { this.jsonNode = otherEvent.jsonNode.deepCopy(); this.eventMetadata = DefaultEventMetadata.fromEventMetadata(otherEvent.eventMetadata); } public static Event fromMessage(String message) { return JacksonEvent.builder() .withEventType(EVENT_TYPE) .withData(Collections.singletonMap(MESSAGE_KEY, message)) .build(); } private JsonNode getInitialJsonNode(final Object data) { if (data == null) { return mapper.valueToTree(new HashMap<>()); } else if (data instanceof String) { try { return mapper.readTree((String) data); } catch (final JsonProcessingException e) { throw new IllegalArgumentException("Unable to convert data into an event"); } } return mapper.valueToTree(data); } protected JsonNode getJsonNode() { return jsonNode; } /** * Adds or updates the key with a given value in the Event. * * @param key where the value will be set * @param value value to set the key to * @since 1.2 */ @Override public void put(final String key, final Object value) { final String trimmedKey = checkAndTrimKey(key); final LinkedList keys = new LinkedList<>(Arrays.asList(trimmedKey.split(SEPARATOR))); JsonNode parentNode = jsonNode; while (!keys.isEmpty()) { if (keys.size() == 1) { setNode(parentNode, keys.removeFirst(), value); } else { final String childKey = keys.removeFirst(); if (!childKey.isEmpty()) { parentNode = getOrCreateNode(parentNode, childKey); } } } } public void setEventHandle(EventHandle handle) { this.eventHandle = handle; } @Override public EventHandle getEventHandle() { return eventHandle; } private void setNode(final JsonNode parentNode, final String leafKey, final Object value) { final JsonNode valueNode = mapper.valueToTree(value); if (StringUtils.isNumeric(leafKey)) { ((ArrayNode) parentNode).set(Integer.parseInt(leafKey), valueNode); } else { ((ObjectNode) parentNode).set(leafKey, valueNode); } } private JsonNode getOrCreateNode(final JsonNode node, final String key) { JsonNode childNode = node.get(key); if (childNode == null) { childNode = mapper.createObjectNode(); ((ObjectNode) node).set(key, childNode); } return childNode; } /** * Retrieves the value of type clazz from the key. * * @param key the value to retrieve from * @param clazz the return type of the value * @return the value * @throws RuntimeException if it is unable to map the value to the provided clazz * @since 1.2 */ @Override public T get(final String key, final Class clazz) { final String trimmedKey = checkAndTrimKey(key); final JsonNode node = getNode(trimmedKey); if (node.isMissingNode()) { return null; } return mapNodeToObject(key, node, clazz); } private JsonNode getNode(final String key) { final JsonPointer jsonPointer = toJsonPointer(key); return jsonNode.at(jsonPointer); } private T mapNodeToObject(final String key, final JsonNode node, final Class clazz) { try { return mapper.treeToValue(node, clazz); } catch (final JsonProcessingException e) { LOG.error("Unable to map {} to {}", key, clazz, e); throw new RuntimeException(String.format("Unable to map %s to %s", key, clazz), e); } } /** * Retrieves the given key from the Event as a List * * @param key the value to retrieve from * @param clazz the return type of elements in the list * @return a List of clazz * @throws RuntimeException if it is unable to map the elements in the list to the provided clazz * @since 1.2 */ @Override public List getList(final String key, final Class clazz) { final String trimmedKey = checkAndTrimKey(key); final JsonNode node = getNode(trimmedKey); if (node.isMissingNode()) { return null; } return mapNodeToList(key, node, clazz); } private List mapNodeToList(final String key, final JsonNode node, final Class clazz) { try { final ObjectReader reader = mapper.readerFor(TypeFactory.defaultInstance().constructCollectionType(List.class, clazz)); return reader.readValue(node); } catch (final IOException e) { LOG.error("Unable to map {} to List of {}", key, clazz, e); throw new RuntimeException(String.format("Unable to map %s to %s", key, clazz), e); } } private JsonPointer toJsonPointer(final String key) { String jsonPointerExpression = SEPARATOR + key; return JsonPointer.compile(jsonPointerExpression); } /** * Deletes the key from the event. * * @param key the field to be deleted */ @Override public void delete(final String key) { final String trimmedKey = checkAndTrimKey(key); final int index = trimmedKey.lastIndexOf(SEPARATOR); JsonNode baseNode = jsonNode; String leafKey = trimmedKey; if (index != -1) { final JsonPointer jsonPointer = toJsonPointer(trimmedKey.substring(0, index)); baseNode = jsonNode.at(jsonPointer); leafKey = trimmedKey.substring(index + 1); } if (!baseNode.isMissingNode()) { ((ObjectNode) baseNode).remove(leafKey); } } @Override public String toJsonString() { return jsonNode.toString(); } @Override public String getAsJsonString(final String key) { final String trimmedKey = checkAndTrimKey(key); final JsonNode node = getNode(trimmedKey); if (node.isMissingNode()) { return null; } return node.toString(); } /** * returns a string with formatted parts replaced by their values. The input * string may contain parts with format "${.../.../...}" which are replaced * by their value in the event * * @param format string with format * @throws RuntimeException if the format is incorrect or the value is not a string */ @Override public String formatString(final String format) { return formatStringInternal(format, null); } /** * returns a string with formatted parts replaced by their values. The input * string may contain parts with format "${.../.../...}" which are replaced * by their value in the event. The input string may also contain Data Prepper expressions * such as "${getMetadata(\"some_metadata_key\")} * * @param format string with format * @throws RuntimeException if the format is incorrect or the value is not a string */ @Override public String formatString(final String format, final ExpressionEvaluator expressionEvaluator) { return formatStringInternal(format, expressionEvaluator); } private String formatStringInternal(final String format, final ExpressionEvaluator expressionEvaluator) { int fromIndex = 0; String result = ""; int position = 0; while ((position = format.indexOf("${", fromIndex)) != -1) { int endPosition = format.indexOf("}", position + 1); if (endPosition == -1) { throw new RuntimeException("Format string is not properly formed"); } result += format.substring(fromIndex, position); String name = format.substring(position + 2, endPosition); Object val; if (Objects.nonNull(expressionEvaluator) && expressionEvaluator.isValidExpressionStatement(name)) { val = expressionEvaluator.evaluate(name, this); } else { val = this.get(name, Object.class); if (val == null) { throw new EventKeyNotFoundException(String.format("The key %s could not be found in the Event when formatting", name)); } } if (Objects.nonNull(val)) { result += val.toString(); } fromIndex = endPosition + 1; } if (fromIndex < format.length()) { result += format.substring(fromIndex); } return result; } @Override public EventMetadata getMetadata() { return eventMetadata; } @Override public boolean containsKey(final String key) { final String trimmedKey = checkAndTrimKey(key); final JsonNode node = getNode(trimmedKey); return !node.isMissingNode(); } @Override public boolean isValueAList(final String key) { final String trimmedKey = checkAndTrimKey(key); final JsonNode node = getNode(trimmedKey); return node.isArray(); } @Override public Map toMap() { return mapper.convertValue(jsonNode, MAP_TYPE_REFERENCE); } private String checkAndTrimKey(final String key) { checkKey(key); return trimKey(key); } private void checkKey(final String key) { checkNotNull(key, "key cannot be null"); checkArgument(!key.isEmpty(), "key cannot be an empty string"); if (key.length() > MAX_KEY_LENGTH) { throw new IllegalArgumentException("key cannot be longer than " + MAX_KEY_LENGTH + " characters"); } if (!isValidKey(key)) { throw new IllegalArgumentException("key " + key + " must contain only alphanumeric chars with .-_ and must follow JsonPointer (ie. 'field/to/key')"); } } private String trimKey(final String key) { final String trimmedLeadingSlash = key.startsWith(SEPARATOR) ? key.substring(1) : key; return trimmedLeadingSlash.endsWith(SEPARATOR) ? trimmedLeadingSlash.substring(0, trimmedLeadingSlash.length() - 2) : trimmedLeadingSlash; } private boolean isValidKey(final String key) { for (int i = 0; i < key.length(); i++) { char c = key.charAt(i); if (!(c >= 48 && c <= 57 || c >= 65 && c <= 90 || c >= 97 && c <= 122 || c == '.' || c == '-' || c == '_' || c == '@' || c == '/')) { return false; } } return true; } /** * Constructs an empty builder. * * @return a builder * @since 1.2 */ public static Builder builder() { return new Builder() { @Override public Builder getThis() { return this; } }; } public JsonStringBuilder jsonBuilder() { return new JsonStringBuilder(this); } public static JacksonEvent fromEvent(final Event event) { if (event instanceof JacksonEvent) { return new JacksonEvent((JacksonEvent) event); } else { return JacksonEvent.builder() .withData(event.toMap()) .withEventMetadata(event.getMetadata()) .build(); } } /** * Builder for creating {@link JacksonEvent}. * * @since 1.2 */ public abstract static class Builder> { private EventMetadata eventMetadata; private Object data; private String eventType; private Instant timeReceived; private Map eventMetadataAttributes; public abstract T getThis(); /** * Sets the event type for the metadata if a {@link #withEventMetadata} is not used. * * @param eventType the event type * @return returns the builder * @since 1.2 */ public Builder withEventType(final String eventType) { this.eventType = eventType; return this; } /** * Sets the attributes for the metadata if a {@link #withEventMetadata} is not used. * * @param eventMetadataAttributes the attributes * @return returns the builder * @since 1.2 */ public Builder withEventMetadataAttributes(final Map eventMetadataAttributes) { this.eventMetadataAttributes = eventMetadataAttributes; return this; } /** * Sets the time received for the metadata if a {@link #withEventMetadata} is not used. * * @param timeReceived the time an event was received * @return returns the builder * @since 1.2 */ public Builder withTimeReceived(final Instant timeReceived) { this.timeReceived = timeReceived; return this; } /** * Sets the metadata. * * @param eventMetadata the metadata * @return returns the builder * @since 1.2 */ public Builder withEventMetadata(final EventMetadata eventMetadata) { this.eventMetadata = eventMetadata; return this; } /** * Sets the data of the event. * * @param data the data * @return returns the builder * @since 1.2 */ public Builder withData(final Object data) { this.data = data; return this; } /** * Returns a newly created {@link JacksonEvent}. * * @return an event * @since 1.2 */ public JacksonEvent build() { return new JacksonEvent(this); } } public class JsonStringBuilder extends Event.JsonStringBuilder { private final boolean RETAIN_ALL = true; private final boolean EXCLUDE_ALL = false; private final JacksonEvent event; private JsonStringBuilder(final JacksonEvent event) { checkNotNull(event, "event cannot be null"); this.event = event; } private JsonNode getBaseNode() { // Get root node. if (getRootKey() != null && !getRootKey().isEmpty() && event.containsKey(getRootKey())) { return event.getNode(getRootKey()); } return event.getJsonNode(); } public String toJsonString() { String jsonString; if (getIncludeKeys() != null && !getIncludeKeys().isEmpty()) { jsonString = searchAndFilter(getBaseNode(), "", getIncludeKeys(), RETAIN_ALL); } else if (getExcludeKeys() != null && !getExcludeKeys().isEmpty()) { jsonString = searchAndFilter(getBaseNode(), "", getExcludeKeys(), EXCLUDE_ALL); } else if (getBaseNode() !=event.getJsonNode()) { jsonString = event.getAsJsonString(getRootKey()); } else { // Some successors have its own implementation of toJsonString, such as JacksonSpan. // In such case, it's only used when the root key is not provided. // TODO: Need to check if such behaviour is expected. jsonString = event.toJsonString(); } final String tagsKey = getTagsKey(); if (tagsKey != null) { final JsonNode tagsNode = mapper.valueToTree(event.getMetadata().getTags()); return jsonString.substring(0, jsonString.length() - 1) + ",\"" + tagsKey + "\":" + tagsNode.toString() + "}"; } return jsonString; } /** * Perform DFS(Depth-first search) like traversing using recursion on the Json Tree and return the json string. * This supports filtering (to include or exclude) from a list of keys. * * @param node Root node to start traversing * @param path Json path, e.g. /foo/bar * @param filterKeys A list of filtered keys * @param filterAction Either to include (RETAIN_ALL or true) or to exclude (EXCLUDE_ALL or false) * @return a json string with filtered keys */ String searchAndFilter(JsonNode node, String path, final List filterKeys, boolean filterAction) { if (node.isArray()) { // for array node. StringJoiner sj = new StringJoiner(",", "[", "]"); node.forEach(childNode -> sj.add(searchAndFilter(childNode, path, filterKeys, filterAction))); return sj.toString(); } else { StringJoiner sj = new StringJoiner(",", "{", "}"); List valueList = new ArrayList<>(); node.properties().forEach(entry -> { String keyPath = path + SEPARATOR + entry.getKey(); // Track whether the key is found in the filter list. // Different behaviours between include and exclude action. boolean found = false; for (String key : filterKeys) { if (keyPath.equals(key)) { found = true; // To keep the order. if (filterAction == RETAIN_ALL) { valueList.add("\"" + entry.getKey() + "\":" + entry.getValue().toString()); } break; } else if (key.startsWith(keyPath)) { found = true; valueList.add("\"" + entry.getKey() + "\":" + searchAndFilter(entry.getValue(), keyPath, filterKeys, filterAction)); break; } if (key.compareTo(keyPath) > 0) { // To save the comparing. // This requires the filter keys to be sorted first. // This is done in SinkModel. break; } } if (!found && filterAction == EXCLUDE_ALL) { valueList.add("\"" + entry.getKey() + "\":" + entry.getValue().toString()); } }); valueList.forEach(value -> sj.add(value)); return sj.toString(); } } } }