package com.amazonaws.athena.connector.lambda.data; /*- * #%L * Amazon Athena Query Federation SDK * %% * Copyright (C) 2019 Amazon Web Services * %% * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * #L% */ import io.netty.buffer.ArrowBuf; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.util.VisibleForTesting; import org.apache.arrow.vector.BigIntVector; import org.apache.arrow.vector.BitVector; import org.apache.arrow.vector.DateDayVector; import org.apache.arrow.vector.DateMilliVector; import org.apache.arrow.vector.DecimalVector; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.Float4Vector; import org.apache.arrow.vector.Float8Vector; import org.apache.arrow.vector.IntVector; import org.apache.arrow.vector.SmallIntVector; import org.apache.arrow.vector.TinyIntVector; import org.apache.arrow.vector.UInt1Vector; import org.apache.arrow.vector.UInt2Vector; import org.apache.arrow.vector.UInt4Vector; import org.apache.arrow.vector.UInt8Vector; import org.apache.arrow.vector.VarBinaryVector; import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.StructVector; import org.apache.arrow.vector.complex.impl.UnionListWriter; import org.apache.arrow.vector.complex.reader.FieldReader; import org.apache.arrow.vector.complex.writer.BaseWriter.StructWriter; import org.apache.arrow.vector.complex.writer.FieldWriter; import org.apache.arrow.vector.types.Types; import 
org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.Text;
import org.apache.commons.codec.Charsets;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * This utility class abstracts many facets of reading and writing values into Apache Arrow's FieldReader and FieldVector
 * objects.
 *
 * @note This class encourages a row-wise approach to writing results. These interfaces are often viewed as simpler than
 * the would-be columnar equivalents. Even though many of the systems that we've integrated with using this SDK do not
 * themselves support columnar access patterns, there is value in offering a variant of these mechanisms that provides
 * the skeleton for columnar writing/reading of results.
 *

* The current SDK version takes the approach that experts can drop into 'native' Apache Arrow mode and simply not use * this abstraction. This approach of making common things easy and still enabling access to a 'power user' mode is * one we'd like to stick with but we'd also like to make it easier for customers that can/want a more columnar * experience/performance to be able to do so more easily. *

* In general the abstractions provided by this utility class also come with a performance hit when compared with native, * columnar, Apache Arrow access patterns. The performance overhead primarily results from Object overhead related to boxing * and/or type conversion. The second source of overhead is the constant lookup and branching of field types, vectors, readers, * etc.. Some of this second category of overhead can be mitigated by being mindful of how you use this class but a more * ideal solution would be to offer an interface that steers you in a better direction. *

* An issue has been opened to track the creation of a columnar variant of this utility: * https://github.com/awslabs/aws-athena-query-federation/issues/1 */ public class BlockUtils { public static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC"); /** * Creates a new Block with a single column and populated with the provided values. * * @param allocator The BlockAllocator to use when creating the Block. * @param columnName The name of the single column in the Block's Schema. * @param type The Apache Arrow Type of the column. * @param values The values to write to the new Block. Each value will be its own row. * @return The newly created Block with a single column Schema at populated with the provided values. */ public static Block newBlock(BlockAllocator allocator, String columnName, ArrowType type, Object... values) { return newBlock(allocator, columnName, type, Arrays.asList(values)); } /** * Creates a new Block with a single column and populated with the provided values. * * @param allocator The BlockAllocator to use when creating the Block. * @param columnName The name of the single column in the Block's Schema. * @param type The Apache Arrow Type of the column. * @param values The values to write to the new Block. Each value will be its own row. * @return The newly created Block with a single column Schema at populated with the provided values. */ public static Block newBlock(BlockAllocator allocator, String columnName, ArrowType type, Collection values) { SchemaBuilder schemaBuilder = new SchemaBuilder(); schemaBuilder.addField(columnName, type); Schema schema = schemaBuilder.build(); Block block = allocator.createBlock(schema); int count = 0; for (Object next : values) { try { setValue(block.getFieldVector(columnName), count++, next); } catch (Exception ex) { throw new RuntimeException("Error for " + type + " " + columnName + " " + next, ex); } } block.setRowCount(count); return block; } /** * Creates a new, empty, Block with a single column. 
* * @param allocator The BlockAllocator to use when creating the Block. * @param columnName The name of the single column in the Block's Schema. * @param type The Apache Arrow Type of the column. * @return The newly created, empty, Block with a single column Schema. */ public static Block newEmptyBlock(BlockAllocator allocator, String columnName, ArrowType type) { SchemaBuilder schemaBuilder = new SchemaBuilder(); schemaBuilder.addField(columnName, type); Schema schema = schemaBuilder.build(); return allocator.createBlock(schema); } /** * Used to set complex values (Struct, List, etc...) on the provided FieldVector. * * @param vector The FieldVector into which we should write the provided value. * @param pos The row number that the value should be written to. * @param resolver The FieldResolver that can be used to map your value to the complex type (mostly for Structs, Maps). * @param value The value to write. * @note This method incurs more Object overhead (heap churn) than using Arrow's native interface. Users of this Utility * should weigh their performance needs vs. the readability / ease of use. */ public static void setComplexValue(FieldVector vector, int pos, FieldResolver resolver, Object value) { if (vector instanceof ListVector) { if (value != null) { UnionListWriter writer = ((ListVector) vector).getWriter(); writer.setPosition(pos); writeList(vector.getAllocator(), writer, vector.getField(), pos, ((List) value).iterator(), resolver); ((ListVector) vector).setNotNull(pos); } } else if (vector instanceof StructVector) { StructWriter writer = ((StructVector) vector).getWriter(); writer.setPosition(pos); writeStruct(vector.getAllocator(), writer, vector.getField(), pos, value, resolver); } else { throw new RuntimeException("Unsupported 'Complex' vector " + vector.getClass().getSimpleName() + " for field " + vector.getField().getName()); } } /** * Used to set values (Int, BigInt, Bit, etc...) on the provided FieldVector. 
* * @param vector The FieldVector into which we should write the provided value. * @param pos The row number that the value should be written to. * @param value The value to write. * @note This method incurs more Object overhead (heap churn) than using Arrow's native interface. Users of this Utility * should weigh their performance needs vs. the readability / ease of use. */ public static void setValue(FieldVector vector, int pos, Object value) { try { if (value == null) { setNullValue(vector, pos); return; } //TODO: add all types switch (vector.getMinorType()) { case DATEMILLI: if (value instanceof Date) { ((DateMilliVector) vector).setSafe(pos, ((Date) value).getTime()); } else if (value instanceof LocalDateTime) { ((DateMilliVector) vector).setSafe( pos, ((LocalDateTime) value).atZone(UTC_ZONE_ID).toInstant().toEpochMilli()); } else { ((DateMilliVector) vector).setSafe(pos, (long) value); } break; case DATEDAY: if (value instanceof Date) { org.joda.time.Days days = org.joda.time.Days.daysBetween(EPOCH, new org.joda.time.DateTime(((Date) value).getTime())); ((DateDayVector) vector).setSafe(pos, days.getDays()); } if (value instanceof LocalDate) { int days = (int) ((LocalDate) value).toEpochDay(); ((DateDayVector) vector).setSafe(pos, days); } else if (value instanceof Long) { ((DateDayVector) vector).setSafe(pos, ((Long) value).intValue()); } else { ((DateDayVector) vector).setSafe(pos, (int) value); } break; case FLOAT8: ((Float8Vector) vector).setSafe(pos, (double) value); break; case FLOAT4: ((Float4Vector) vector).setSafe(pos, (float) value); break; case INT: if (value != null && value instanceof Long) { //This may seem odd at first but many frameworks (like Presto) use long as the preferred //native java type for representing integers. We do this to keep type conversions simple. 
((IntVector) vector).setSafe(pos, ((Long) value).intValue()); } else { ((IntVector) vector).setSafe(pos, (int) value); } break; case TINYINT: if (value instanceof Byte) { ((TinyIntVector) vector).setSafe(pos, (byte) value); } else { ((TinyIntVector) vector).setSafe(pos, (int) value); } break; case SMALLINT: if (value instanceof Short) { ((SmallIntVector) vector).setSafe(pos, (short) value); } else { ((SmallIntVector) vector).setSafe(pos, (int) value); } break; case UINT1: ((UInt1Vector) vector).setSafe(pos, (int) value); break; case UINT2: ((UInt2Vector) vector).setSafe(pos, (int) value); break; case UINT4: ((UInt4Vector) vector).setSafe(pos, (int) value); break; case UINT8: ((UInt8Vector) vector).setSafe(pos, (int) value); break; case BIGINT: ((BigIntVector) vector).setSafe(pos, (long) value); break; case VARBINARY: ((VarBinaryVector) vector).setSafe(pos, (byte[]) value); break; case DECIMAL: DecimalVector dVector = ((DecimalVector) vector); if (value instanceof Double) { BigDecimal bdVal = new BigDecimal((double) value); bdVal = bdVal.setScale(dVector.getScale(), RoundingMode.HALF_UP); dVector.setSafe(pos, bdVal); } else { BigDecimal scaledValue = ((BigDecimal) value).setScale(dVector.getScale(), RoundingMode.HALF_UP); ((DecimalVector) vector).setSafe(pos, scaledValue); } break; case VARCHAR: if (value instanceof Text) { ((VarCharVector) vector).setSafe(pos, (Text) value); } else { // always fall back to the object's toString() ((VarCharVector) vector).setSafe(pos, value.toString().getBytes(Charsets.UTF_8)); } break; case BIT: if (value instanceof Integer && (int) value > 0) { ((BitVector) vector).setSafe(pos, 1); } else if (value instanceof Boolean && (boolean) value) { ((BitVector) vector).setSafe(pos, 1); } else { ((BitVector) vector).setSafe(pos, 0); } break; default: throw new IllegalArgumentException("Unknown type " + vector.getMinorType()); } } catch (RuntimeException ex) { String fieldName = (vector != null) ? 
vector.getField().getName() : "null_vector"; throw new RuntimeException("Unable to set value for field " + fieldName + " using value " + value, ex); } } /** * Used to convert a specific row in the provided Block to a human readable string. This is useful for diagnostic * logging. * * @param block The Block to read the row from. * @param row The row number to read. * @return The human readable String representation of the requested row. */ public static String rowToString(Block block, int row) { if (row > block.getRowCount()) { throw new IllegalArgumentException(row + " exceeds available rows " + block.getRowCount()); } StringBuilder sb = new StringBuilder(); for (FieldReader nextReader : block.getFieldReaders()) { try { nextReader.setPosition(row); if (sb.length() > 0) { sb.append(", "); } sb.append("["); sb.append(nextReader.getField().getName()); sb.append(" : "); sb.append(fieldToString(nextReader)); sb.append("]"); } catch (RuntimeException ex) { throw new RuntimeException("Error processing field " + nextReader.getField().getName(), ex); } } return sb.toString(); } /** * Used to convert a single cell for the given FieldReader to a human readable string. * * @param reader The FieldReader from which we should read the current cell. This means the position to be read should * have been set on the reader before calling this method. * @return The human readable String representation of the value at the FieldReaders current position. 
*/ public static String fieldToString(FieldReader reader) { switch (reader.getMinorType()) { case DATEDAY: return String.valueOf(reader.readInteger()); case DATEMILLI: return String.valueOf(reader.readLocalDateTime()); case FLOAT8: case FLOAT4: case UINT4: case UINT8: case INT: case BIGINT: case VARCHAR: case BIT: return String.valueOf(reader.readObject()); case DECIMAL: return String.valueOf(reader.readBigDecimal()); case SMALLINT: return String.valueOf(reader.readShort()); case TINYINT: case UINT1: return Integer.valueOf(reader.readByte()).toString(); case UINT2: return Integer.valueOf(reader.readCharacter()).toString(); case VARBINARY: return bytesToHex(reader.readByteArray()); case STRUCT: StringBuilder sb = new StringBuilder(); sb.append("{"); for (Field child : reader.getField().getChildren()) { if (sb.length() > 3) { sb.append(","); } sb.append("["); sb.append(child.getName()); sb.append(" : "); sb.append(fieldToString(reader.reader(child.getName()))); sb.append("]"); } sb.append("}"); return sb.toString(); case LIST: StringBuilder sbList = new StringBuilder(); sbList.append("{"); while (reader.next()) { if (sbList.length() > 1) { sbList.append(","); } sbList.append(fieldToString(reader.reader())); } sbList.append("}"); return sbList.toString(); default: Object obj = reader.readObject(); return reader.getMinorType() + " - " + ((obj != null) ? obj.getClass().toString() : "null") + "[ " + String.valueOf(obj) + " ]"; } } /** * Copies a inclusive range of rows from one block to another. * * @param srcBlock The source Block to copy the range of rows from. * @param dstBlock The destination Block to copy the range of rows to. * @param firstRow The first row we'd like to copy. * @param lastRow The last row we'd like to copy. * @return The number of rows that were copied. 
*/ public static int copyRows(Block srcBlock, Block dstBlock, int firstRow, int lastRow) { if (firstRow > lastRow || lastRow > srcBlock.getRowCount() - 1) { throw new RuntimeException("src has " + srcBlock.getRowCount() + " but requested copy of " + firstRow + " to " + lastRow); } for (FieldReader src : srcBlock.getFieldReaders()) { int dstOffset = dstBlock.getRowCount(); for (int i = firstRow; i <= lastRow; i++) { FieldVector dst = dstBlock.getFieldVector(src.getField().getName()); src.setPosition(i); setValue(dst, dstOffset++, src.readObject()); } } int rowsCopied = 1 + (lastRow - firstRow); dstBlock.setRowCount(dstBlock.getRowCount() + rowsCopied); return rowsCopied; } /** * Checks if a row is null by checking that all fields in that row are null (aka not set). * * @param block The Block we'd like to check. * @param row The row number we'd like to check. * @return True if the entire row is null (aka all fields null/unset), False if any field has a non-null value. */ public static boolean isNullRow(Block block, int row) { if (row > block.getRowCount() - 1) { throw new RuntimeException("block has " + block.getRowCount() + " rows but requested to check " + row); } //If any column is non-null then return false for (FieldReader src : block.getFieldReaders()) { src.setPosition(row); if (src.isSet()) { return false; } } return true; } /** * Used to write a List value. * * @param allocator The BlockAllocator which can be used to generate Apache Arrow Buffers for types * which require conversion to an Arrow Buffer before they can be written using the FieldWriter. * @param writer The FieldWriter for the List field we'd like to write into. * @param field The Schema details of the List Field we are writing into. * @param pos The position (row) in the Apache Arrow batch we are writing to. * @param value An iterator to the collection of values we want to write into the row. 
* @param resolver The field resolver that can be used to extract individual values from the value iterator. */ @VisibleForTesting protected static void writeList(BufferAllocator allocator, FieldWriter writer, Field field, int pos, Iterator value, FieldResolver resolver) { //Apache Arrow List types have a single 'special' child field which gives us the concrete type of the values //stored in the list. Field child = null; if (field.getChildren() != null && !field.getChildren().isEmpty()) { child = field.getChildren().get(0); } //Mark the beginning of the list, this is essentially how Apache Arrow handles the variable length nature //of lists. writer.startList(); Iterator itr = value; while (itr.hasNext()) { //For each item in the iterator, attempt to write it to the list. Object val = itr.next(); if (val != null) { switch (Types.getMinorTypeForArrowType(child.getType())) { case LIST: try { writeList(allocator, (FieldWriter) writer.list(), child, pos, ((List) val).iterator(), resolver); } catch (Exception ex) { throw ex; } break; case STRUCT: writeStruct(allocator, writer.struct(), child, pos, val, resolver); break; default: writeListValue(writer, child.getType(), allocator, val); break; } } } writer.endList(); } /** * Used to write a Struct value. * * @param allocator The BlockAllocator which can be used to generate Apache Arrow Buffers for types * which require conversion to an Arrow Buffer before they can be written using the FieldWriter. * @param writer The FieldWriter for the Struct field we'd like to write into. * @param field The Schema details of the Struct Field we are writing into. * @param pos The position (row) in the Apache Arrow batch we are writing to. * @param value The value we'd like to write as a struct. * @param resolver The field resolver that can be used to extract individual Struct fields from the value. 
*/ @VisibleForTesting protected static void writeStruct(BufferAllocator allocator, StructWriter writer, Field field, int pos, Object value, FieldResolver resolver) { //We expect null writes to have been handled earlier so this is a no-op. if (value == null) { return; } //Indicate the beginning of the struct value, this is how Apache Arrow handles the variable length of Struct types. writer.start(); for (Field nextChild : field.getChildren()) { //For each child field that comprises the struct, attempt to extract and write the corresponding value //using the FieldResolver. Object childValue = resolver.getFieldValue(nextChild, value); switch (Types.getMinorTypeForArrowType(nextChild.getType())) { case LIST: writeList(allocator, (FieldWriter) writer.list(nextChild.getName()), nextChild, pos, ((List) childValue).iterator(), resolver); break; case STRUCT: writeStruct(allocator, writer.struct(nextChild.getName()), nextChild, pos, childValue, resolver); break; default: writeStructValue(writer, nextChild, allocator, childValue); break; } } writer.end(); } @VisibleForTesting /** * Maps an Arrow Type to a Java class. 
* @param minorType * @return Java class mapping the Arrow type */ public static Class getJavaType(Types.MinorType minorType) { switch (minorType) { case DATEMILLI: return LocalDateTime.class; case TINYINT: case UINT1: return Byte.class; case SMALLINT: return Short.class; case UINT2: return Character.class; case DATEDAY: return LocalDate.class; case INT: case UINT4: return Integer.class; case UINT8: case BIGINT: return Long.class; case DECIMAL: return BigDecimal.class; case FLOAT4: return Float.class; case FLOAT8: return Double.class; case VARCHAR: return String.class; case VARBINARY: return byte[].class; case BIT: return Boolean.class; case LIST: return List.class; case STRUCT: return Map.class; default: throw new IllegalArgumentException("Unknown type " + minorType); } } /** * Used to write an individual value into a List field, multiple calls to this method per-cell are expected in order * to write the N values of a list of size N. * * @param writer The FieldWriter (already positioned at the row and list entry number) that we want to write into. * @param type The concrete type of the List's values. * @param allocator The BlockAllocator that can be used for allocating Arrow Buffers for fields which require conversion * to Arrow Buff before being written. * @param value The value to write. * @note This method and its Struct complement violate the DRY mantra because ListWriter and StructWriter don't share * a meaningful ancestor despite having identical methods. This requires us to either further wrap and abstract the writer * or duplicate come code. In a future release we hope to have contributed a better option to Apache Arrow which allows * us to simplify this method. 
*/ protected static void writeListValue(FieldWriter writer, ArrowType type, BufferAllocator allocator, Object value) { try { //TODO: add all types switch (Types.getMinorTypeForArrowType(type)) { case DATEMILLI: if (value instanceof Date) { writer.writeDateMilli(((Date) value).getTime()); } else { writer.writeDateMilli((long) value); } break; case DATEDAY: if (value instanceof Date) { org.joda.time.Days days = org.joda.time.Days.daysBetween(EPOCH, new org.joda.time.DateTime(((Date) value).getTime())); writer.writeDateDay(days.getDays()); } else if (value instanceof LocalDate) { int days = (int) ((LocalDate) value).toEpochDay(); writer.writeDateDay(days); } else if (value instanceof Long) { writer.writeDateDay(((Long) value).intValue()); } else { writer.writeDateDay((int) value); } break; case FLOAT8: writer.float8().writeFloat8((double) value); break; case FLOAT4: writer.float4().writeFloat4((float) value); break; case INT: if (value != null && value instanceof Long) { //This may seem odd at first but many frameworks (like Presto) use long as the preferred //native java type for representing integers. We do this to keep type conversions simple. 
writer.integer().writeInt(((Long) value).intValue()); } else { writer.integer().writeInt((int) value); } break; case TINYINT: writer.tinyInt().writeTinyInt((byte) value); break; case SMALLINT: writer.smallInt().writeSmallInt((short) value); break; case UINT1: writer.uInt1().writeUInt1((byte) value); break; case UINT2: writer.uInt2().writeUInt2((char) value); break; case UINT4: writer.uInt4().writeUInt4((int) value); break; case UINT8: writer.uInt8().writeUInt8((long) value); break; case BIGINT: writer.bigInt().writeBigInt((long) value); break; case VARBINARY: if (value instanceof ArrowBuf) { ArrowBuf buf = (ArrowBuf) value; writer.varBinary().writeVarBinary(0, buf.capacity(), buf); } else if (value instanceof byte[]) { byte[] bytes = (byte[]) value; try (ArrowBuf buf = allocator.buffer(bytes.length)) { buf.writeBytes(bytes); writer.varBinary().writeVarBinary(0, buf.readableBytes(), buf); } } break; case DECIMAL: int scale = ((ArrowType.Decimal) type).getScale(); if (value instanceof Double) { int precision = ((ArrowType.Decimal) type).getPrecision(); BigDecimal bdVal = new BigDecimal((double) value); bdVal = bdVal.setScale(scale, RoundingMode.HALF_UP); writer.decimal().writeDecimal(bdVal); } else { BigDecimal scaledValue = ((BigDecimal) value).setScale(scale, RoundingMode.HALF_UP); writer.decimal().writeDecimal(scaledValue); } break; case VARCHAR: if (value instanceof ArrowBuf) { ArrowBuf buf = (ArrowBuf) value; writer.varChar().writeVarChar(0, buf.readableBytes(), buf); } else if (value instanceof byte[]) { byte[] bytes = (byte[]) value; try (ArrowBuf buf = allocator.buffer(bytes.length)) { buf.writeBytes(bytes); writer.varChar().writeVarChar(0, buf.readableBytes(), buf); } } else { // always fall back to the object's toString() byte[] bytes = value.toString().getBytes(Charsets.UTF_8); try (ArrowBuf buf = allocator.buffer(bytes.length)) { buf.writeBytes(bytes); writer.varChar().writeVarChar(0, buf.readableBytes(), buf); } } break; case BIT: if (value instanceof 
Integer && (int) value > 0) { writer.bit().writeBit(1); } else if (value instanceof Boolean && (boolean) value) { writer.bit().writeBit(1); } else { writer.bit().writeBit(0); } break; default: throw new IllegalArgumentException("Unknown type " + type); } } catch (RuntimeException ex) { String fieldName = (writer.getField() != null) ? writer.getField().getName() : "null_vector"; throw new RuntimeException("Unable to write value for field " + fieldName + " using value " + value, ex); } } /** * Used to write a value into a specific child field within a Struct. Multiple calls to this method per-cell are * expected in order to write to all N fields of a Struct. * * @param writer The FieldWriter (already positioned at the row and list entry number) that we want to write into. * @param field The child field we are attempting to write into. * @param allocator The BlockAllocator that can be used for allocating Arrow Buffers for fields which require conversion * to Arrow Buff before being written. * @param value The value to write. * @note This method and its List complement violate the DRY mantra because ListWriter and StructWriter don't share * a meaningful ancestor despite having identical methods. This requires us to either further wrap and abstract the writer * or duplicate come code. In a future release we hope to have contributed a better option to Apache Arrow which allows * us to simplify this method. 
*/ @VisibleForTesting protected static void writeStructValue(StructWriter writer, Field field, BufferAllocator allocator, Object value) { if (value == null) { return; } ArrowType type = field.getType(); try { switch (Types.getMinorTypeForArrowType(type)) { case DATEMILLI: if (value instanceof Date) { writer.dateMilli(field.getName()).writeDateMilli(((Date) value).getTime()); } else { writer.dateMilli(field.getName()).writeDateMilli((long) value); } break; case DATEDAY: if (value instanceof Date) { org.joda.time.Days days = org.joda.time.Days.daysBetween(EPOCH, new org.joda.time.DateTime(((Date) value).getTime())); writer.dateDay(field.getName()).writeDateDay(days.getDays()); } else if (value instanceof LocalDate) { int days = (int) ((LocalDate) value).toEpochDay(); writer.dateDay(field.getName()).writeDateDay(days); } else if (value instanceof Long) { writer.dateDay(field.getName()).writeDateDay(((Long) value).intValue()); } else { writer.dateDay(field.getName()).writeDateDay((int) value); } break; case FLOAT8: writer.float8(field.getName()).writeFloat8((double) value); break; case FLOAT4: writer.float4(field.getName()).writeFloat4((float) value); break; case INT: if (value != null && value instanceof Long) { //This may seem odd at first but many frameworks (like Presto) use long as the preferred //native java type for representing integers. We do this to keep type conversions simple. 
writer.integer(field.getName()).writeInt(((Long) value).intValue()); } else { writer.integer(field.getName()).writeInt((int) value); } break; case TINYINT: writer.tinyInt(field.getName()).writeTinyInt((byte) value); break; case SMALLINT: writer.smallInt(field.getName()).writeSmallInt((short) value); break; case UINT1: writer.uInt1(field.getName()).writeUInt1((byte) value); break; case UINT2: writer.uInt2(field.getName()).writeUInt2((char) value); break; case UINT4: writer.uInt4(field.getName()).writeUInt4((int) value); break; case UINT8: writer.uInt8(field.getName()).writeUInt8((long) value); break; case BIGINT: writer.bigInt(field.getName()).writeBigInt((long) value); break; case VARBINARY: if (value instanceof ArrowBuf) { ArrowBuf buf = (ArrowBuf) value; writer.varBinary(field.getName()).writeVarBinary(0, buf.capacity(), buf); } else if (value instanceof byte[]) { byte[] bytes = (byte[]) value; try (ArrowBuf buf = allocator.buffer(bytes.length)) { buf.writeBytes(bytes); writer.varBinary(field.getName()).writeVarBinary(0, buf.readableBytes(), buf); } } break; case DECIMAL: int scale = ((ArrowType.Decimal) type).getScale(); int precision = ((ArrowType.Decimal) type).getPrecision(); if (value instanceof Double) { BigDecimal bdVal = new BigDecimal((double) value); bdVal = bdVal.setScale(scale, RoundingMode.HALF_UP); writer.decimal(field.getName(), scale, precision).writeDecimal(bdVal); } else { BigDecimal scaledValue = ((BigDecimal) value).setScale(scale, RoundingMode.HALF_UP); writer.decimal(field.getName(), scale, precision).writeDecimal(scaledValue); } break; case VARCHAR: if (value instanceof String) { byte[] bytes = ((String) value).getBytes(Charsets.UTF_8); try (ArrowBuf buf = allocator.buffer(bytes.length)) { buf.writeBytes(bytes); writer.varChar(field.getName()).writeVarChar(0, buf.readableBytes(), buf); } } else if (value instanceof ArrowBuf) { ArrowBuf buf = (ArrowBuf) value; writer.varChar(field.getName()).writeVarChar(0, buf.readableBytes(), buf); } else 
if (value instanceof byte[]) { byte[] bytes = (byte[]) value; try (ArrowBuf buf = allocator.buffer(bytes.length)) { buf.writeBytes(bytes); writer.varChar(field.getName()).writeVarChar(0, buf.readableBytes(), buf); } } break; case BIT: if (value instanceof Integer && (int) value > 0) { writer.bit(field.getName()).writeBit(1); } else if (value instanceof Boolean && (boolean) value) { writer.bit(field.getName()).writeBit(1); } else { writer.bit(field.getName()).writeBit(0); } break; default: throw new IllegalArgumentException("Unknown type " + type); } } catch (RuntimeException ex) { throw new RuntimeException("Unable to write value for field " + field.getName() + " using value " + value, ex); } } /** * Used to mark a particular cell as null. * * @param vector The FieldVector to write the null value to. * @param pos The position (row) in the FieldVector to mark as null. */ private static void setNullValue(FieldVector vector, int pos) { switch (vector.getMinorType()) { case DATEMILLI: ((DateMilliVector) vector).setNull(pos); break; case DATEDAY: ((DateDayVector) vector).setNull(pos); break; case FLOAT8: ((Float8Vector) vector).setNull(pos); break; case FLOAT4: ((Float4Vector) vector).setNull(pos); break; case INT: ((IntVector) vector).setNull(pos); break; case TINYINT: ((TinyIntVector) vector).setNull(pos); break; case SMALLINT: ((SmallIntVector) vector).setNull(pos); break; case UINT1: ((UInt1Vector) vector).setNull(pos); break; case UINT2: ((UInt2Vector) vector).setNull(pos); break; case UINT4: ((UInt4Vector) vector).setNull(pos); break; case UINT8: ((UInt8Vector) vector).setNull(pos); break; case BIGINT: ((BigIntVector) vector).setNull(pos); break; case VARBINARY: ((VarBinaryVector) vector).setNull(pos); break; case DECIMAL: ((DecimalVector) vector).setNull(pos); break; case VARCHAR: ((VarCharVector) vector).setNull(pos); break; case BIT: ((BitVector) vector).setNull(pos); break; default: throw new IllegalArgumentException("Unknown type " + vector.getMinorType()); } 
} /** * In some filtering situations it can be useful to 'unset' a row as an indication to a later processing stage * that the row is irrelevant. The mechanism by which we 'unset' a row is actually field type specific and as such * this method is not supported for all field types. * * @param row The row number to unset in the provided Block. * @param block The Block where we'd like to unset the specified row. */ public static void unsetRow(int row, Block block) { for (FieldVector vector : block.getFieldVectors()) { switch (vector.getMinorType()) { case DATEDAY: ((DateDayVector) vector).setNull(row); break; case DATEMILLI: ((DateMilliVector) vector).setNull(row); break; case TINYINT: ((TinyIntVector) vector).setNull(row); break; case UINT1: ((UInt1Vector) vector).setNull(row); break; case SMALLINT: ((SmallIntVector) vector).setNull(row); break; case UINT2: ((UInt2Vector) vector).setNull(row); break; case UINT4: ((UInt4Vector) vector).setNull(row); break; case INT: ((IntVector) vector).setNull(row); break; case UINT8: ((UInt8Vector) vector).setNull(row); break; case BIGINT: ((BigIntVector) vector).setNull(row); break; case FLOAT4: ((Float4Vector) vector).setNull(row); break; case FLOAT8: ((Float8Vector) vector).setNull(row); break; case DECIMAL: ((DecimalVector) vector).setNull(row); break; case VARBINARY: ((VarBinaryVector) vector).setNull(row); break; case VARCHAR: ((VarCharVector) vector).setNull(row); break; case BIT: ((BitVector) vector).setNull(row); break; case STRUCT: ((StructVector) vector).setNull(row); break; case LIST: UnionListWriter writer = ((ListVector) vector).getWriter(); writer.setPosition(row); writer.startList(); writer.endList(); writer.setValueCount(0); break; default: throw new IllegalArgumentException("Unknown type " + vector.getMinorType()); } } } public static final org.joda.time.MutableDateTime EPOCH = new org.joda.time.MutableDateTime(); static { EPOCH.setDate(0); } private BlockUtils() {} private static final char[] HEX_ARRAY = 
"0123456789ABCDEF".toCharArray(); private static String bytesToHex(byte[] bytes) { char[] hexChars = new char[bytes.length * 2]; for (int j = 0; j < bytes.length; j++) { int v = bytes[j] & 0xFF; hexChars[j * 2] = HEX_ARRAY[v >>> 4]; hexChars[j * 2 + 1] = HEX_ARRAY[v & 0x0F]; } return new String(hexChars); } }