/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.jdbc;
import org.opensearch.jdbc.internal.results.ColumnMetaData;
import org.opensearch.jdbc.internal.results.Cursor;
import org.opensearch.jdbc.internal.exceptions.ObjectClosedException;
import org.opensearch.jdbc.internal.results.Row;
import org.opensearch.jdbc.internal.results.Schema;
import org.opensearch.jdbc.logging.LoggingSource;
import org.opensearch.jdbc.logging.Logger;
import org.opensearch.jdbc.protocol.ColumnDescriptor;
import org.opensearch.jdbc.internal.JdbcWrapper;
import org.opensearch.jdbc.protocol.QueryResponse;
import org.opensearch.jdbc.protocol.exceptions.InternalServerErrorException;
import org.opensearch.jdbc.protocol.exceptions.ResponseException;
import org.opensearch.jdbc.protocol.http.JdbcCursorQueryRequest;
import org.opensearch.jdbc.protocol.http.JsonCursorHttpProtocol;
import org.opensearch.jdbc.protocol.http.JsonCursorHttpProtocolFactory;
import org.opensearch.jdbc.transport.http.HttpTransport;
import org.opensearch.jdbc.types.TypeConverter;
import org.opensearch.jdbc.types.TypeConverters;
import org.opensearch.jdbc.types.UnrecognizedOpenSearchTypeException;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.math.BigDecimal;
import java.net.URL;
import java.sql.Array;
import java.sql.Blob;
import java.sql.Clob;
import java.sql.Date;
import java.sql.NClob;
import java.sql.Ref;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.RowId;
import java.sql.SQLDataException;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLNonTransientException;
import java.sql.SQLWarning;
import java.sql.SQLXML;
import java.sql.Statement;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* OpenSearch implementation of a {@link ResultSet}
*
* Column names or labels received in APIs are treated in a
* case-sensitive manner since OpenSearch field names are
* case-sensitive.
*
* The lookup
*/
public class ResultSetImpl implements ResultSet, JdbcWrapper, LoggingSource {
// Statement that owns this result set; notified in closeX() and used to reach the
// connection when a further page is fetched.
private StatementImpl statement;
// Cursor over the rows currently held in memory; replaced when a new page is fetched
// and set to null by closeX().
protected Cursor cursor;
// Cursor id used to request further pages; when null, next() never asks for another page.
private String cursorId;
// Set to true once construction succeeds; reset to false by closeX().
private boolean open = false;
// Value reported by wasNull(); presumably maintained by the column readers, which are
// outside this section - verify.
private boolean wasNull = false;
// Set to true when next() finds no further row.
private boolean afterLast = false;
// Starts as true and is cleared the first time next() reports a row.
private boolean beforeFirst = true;
// Logger used for entry/exit tracing and passed to logAndThrowSQLException.
private Logger log;
/**
 * Creates a result set from a complete query response.
 *
 * The column descriptors, data rows and cursor id are read from
 * {@code queryResponse} and handed to the five-argument constructor.
 *
 * @param statement statement this result set belongs to
 * @param queryResponse response supplying descriptors, data rows and cursor id
 * @param log logger used by this result set
 * @throws SQLException if the delegated constructor fails
 */
public ResultSetImpl(StatementImpl statement, QueryResponse queryResponse, Logger log) throws SQLException {
    this(
            statement,
            queryResponse.getColumnDescriptors(),
            queryResponse.getDatarows(),
            queryResponse.getCursor(),
            log);
}
/**
 * Creates a result set that has no cursor id, by delegating to the
 * five-argument constructor with a {@code null} cursor id.
 *
 * NOTE(review): the generic type arguments on {@code columnDescriptors} and
 * {@code dataRows} look truncated in this signature; confirm them against the
 * upstream source before building.
 *
 * @param statement statement this result set belongs to
 * @param columnDescriptors descriptors of the result columns
 * @param dataRows row data, one entry per row
 * @param log logger used by this result set
 * @throws SQLException if the delegated constructor fails
 */
public ResultSetImpl(StatementImpl statement, List extends ColumnDescriptor> columnDescriptors,
List> dataRows, Logger log) throws SQLException {
// A null cursor id means next() will never try to fetch another page.
this(statement, columnDescriptors, dataRows, null, log);
}
/**
 * Creates an open result set over the given column descriptors and data rows.
 *
 * Each descriptor is mapped to a {@code ColumnMetaData} to form the
 * {@code Schema}, each data row is wrapped through
 * {@code getRowsFromDataRows}, and a new {@code Cursor} is created over both.
 *
 * NOTE(review): the generic type arguments on {@code columnDescriptors},
 * {@code dataRows} and the local {@code rows} look truncated here; confirm
 * them against the upstream source before building.
 *
 * @param statement statement this result set belongs to
 * @param columnDescriptors descriptors of the result columns
 * @param dataRows row data, one entry per row
 * @param cursorId cursor id for fetching further pages; when {@code null},
 *                 {@link #next()} does not request more pages
 * @param log logger used by this result set
 * @throws SQLException declared by the signature; an
 *                      {@code UnrecognizedOpenSearchTypeException} raised during
 *                      construction is wrapped in a {@code SQLException} and
 *                      handed to {@code logAndThrowSQLException}
 */
public ResultSetImpl(StatementImpl statement, List extends ColumnDescriptor> columnDescriptors,
List> dataRows, String cursorId, Logger log) throws SQLException {
this.statement = statement;
this.log = log;
final Schema schema;
try {
// Build the schema: one ColumnMetaData per column descriptor.
schema = new Schema(columnDescriptors
.stream()
.map(ColumnMetaData::new)
.collect(Collectors.toList()));
List rows = getRowsFromDataRows(dataRows);
this.cursor = new Cursor(schema, rows);
this.cursorId = cursorId;
// The result set is marked open only after the cursor has been created.
this.open = true;
} catch (UnrecognizedOpenSearchTypeException ex) {
logAndThrowSQLException(log, new SQLException("Exception creating a ResultSet.", ex));
}
}
/**
 * Advances the cursor by one row.
 *
 * When the in-memory cursor has no further row and a cursor id is set, the
 * next page is loaded through {@link #buildNextPageFromCursorId()} and the
 * advance is retried on the new cursor.
 *
 * @return {@code true} if the cursor now points at a row, {@code false} otherwise
 * @throws SQLException if {@code checkOpen()} or the page fetch fails
 */
@Override
public boolean next() throws SQLException {
    log.debug(() -> logEntry("next()"));
    checkOpen();

    boolean hasRow = cursor.next();
    if (!hasRow && this.cursorId != null) {
        // Current page is exhausted but a cursor id is available: load another page.
        log.debug(() -> logEntry("buildNextPageFromCursorId()"));
        buildNextPageFromCursorId();
        log.debug(() -> logExit("buildNextPageFromCursorId()"));
        hasRow = cursor.next();
    }

    if (!hasRow) {
        afterLast = true;
    } else {
        beforeFirst = false;
    }

    // Lambdas may only capture effectively final locals.
    final boolean outcome = hasRow;
    log.debug(() -> logExit("next", outcome));
    return hasRow;
}
/**
 * TODO: Refactor as suggested https://github.com/opendistro-for-elasticsearch/sql-jdbc/pull/76#discussion_r421571383
 *
 * Fetches the next page of rows for the current cursor id.
 *
 * Side effects: {@code cursor} is replaced by a new Cursor that keeps the
 * existing schema and holds the rows of the fetched page, and
 * {@code cursorId} is overwritten with the cursor id of the response.
 * Ideally this fetch would be delegated to Cursor and the cursor field
 * would be final.
 *
 * @throws SQLException declared by the signature; a ResponseException or
 *                      IOException raised here is wrapped in a SQLException
 *                      and handed to logAndThrowSQLException
 **/
protected void buildNextPageFromCursorId() throws SQLException {
    try {
        JdbcCursorQueryRequest pageRequest = new JdbcCursorQueryRequest(this.cursorId);
        JsonCursorHttpProtocolFactory factory = JsonCursorHttpProtocolFactory.INSTANCE;
        ConnectionImpl owningConnection = (ConnectionImpl) statement.getConnection();
        JsonCursorHttpProtocol cursorProtocol =
                factory.getProtocol(null, (HttpTransport) owningConnection.getTransport());
        QueryResponse page = cursorProtocol.execute(pageRequest);

        // An error carried in the response is raised as a server-side failure.
        if (page.getError() != null) {
            throw new InternalServerErrorException(
                    page.getError().getReason(),
                    page.getError().getType(),
                    page.getError().getDetails());
        }

        this.cursor = new Cursor(this.cursor.getSchema(), getRowsFromDataRows(page.getDatarows()));
        this.cursorId = page.getCursor();
    } catch (ResponseException | IOException failure) {
        logAndThrowSQLException(log, new SQLException("Error executing cursor query", failure));
    }
}
/**
 * Wraps every entry of {@code dataRows} in a {@code Row}.
 *
 * The conversion runs on a parallel stream and the results are collected
 * into a list.
 *
 * NOTE(review): the generic type arguments on the return type and on
 * {@code dataRows} look truncated in this signature; confirm them against
 * the upstream source before building.
 *
 * @param dataRows row data, one entry per row
 * @return list holding one {@code Row} per entry of {@code dataRows}
 */
private List getRowsFromDataRows(List> dataRows) {
return dataRows
.parallelStream()
.map(Row::new)
.collect(Collectors.toList());
}
/**
 * Closes this result set via {@link #closeX(boolean)}, passing {@code true}
 * as the {@code closeStatement} flag; entry and exit are traced at debug level.
 *
 * @throws SQLException if {@code closeX} fails
 */
@Override
public void close() throws SQLException {
    log.debug(() -> logEntry("close()"));
    final boolean closeStatement = true;
    closeX(closeStatement);
    log.debug(() -> logExit("close"));
}
/**
 * Marks this result set closed, drops the cursor and, when a statement is
 * attached, reports the closure to it.
 *
 * @param closeStatement flag forwarded unchanged to
 *                       {@code statement.resultSetClosed(this, closeStatement)}
 * @throws SQLException if the statement notification fails
 */
protected void closeX(boolean closeStatement) throws SQLException {
    this.open = false;
    this.cursor = null;
    if (statement == null) {
        // No owning statement to notify.
        return;
    }
    statement.resultSetClosed(this, closeStatement);
}
/**
 * Returns the current value of the {@code wasNull} flag.
 *
 * @return the {@code wasNull} field as it is at the time of the call
 * @throws SQLException never thrown by this implementation; declared by the interface
 */
@Override
public boolean wasNull() throws SQLException {
    return this.wasNull;
}
/**
 * Reads the column at {@code columnIndex} as a {@code String}.
 *
 * Runs {@code checkCursorOperationPossible()} first, then delegates to
 * {@code getStringX}; entry and exit are traced at debug level.
 *
 * @param columnIndex column index forwarded to {@code getStringX}
 * @return the value produced by {@code getStringX}
 * @throws SQLException if the pre-check or the read fails
 */
@Override
public String getString(int columnIndex) throws SQLException {
    log.debug(() -> logEntry("getString (%d)", columnIndex));
    checkCursorOperationPossible();
    final String result = getStringX(columnIndex);
    log.debug(() -> logExit("getString", result));
    return result;
}
/**
 * Fetches the column through {@code getObjectX} with {@code String.class}
 * as the requested type. Performs no state checks of its own.
 *
 * @param columnIndex column index forwarded to {@code getObjectX}
 * @return the value returned by {@code getObjectX}
 * @throws SQLException if {@code getObjectX} fails
 */
private String getStringX(int columnIndex) throws SQLException {
    final String text = getObjectX(columnIndex, String.class);
    return text;
}
/**
 * Reads the column at {@code columnIndex} as a {@code boolean}.
 *
 * Runs {@code checkCursorOperationPossible()} first, then delegates to
 * {@code getBooleanX}; entry and exit are traced at debug level.
 *
 * @param columnIndex column index forwarded to {@code getBooleanX}
 * @return the value produced by {@code getBooleanX}
 * @throws SQLException if the pre-check or the read fails
 */
@Override
public boolean getBoolean(int columnIndex) throws SQLException {
    log.debug(() -> logEntry("getBoolean (%d)", columnIndex));
    checkCursorOperationPossible();
    final boolean result = getBooleanX(columnIndex);
    log.debug(() -> logExit("getBoolean", result));
    return result;
}
/**
 * Fetches the column through {@code getObjectX} with {@code Boolean.class}
 * as the requested type and unboxes the result.
 *
 * @param columnIndex column index forwarded to {@code getObjectX}
 * @return the unboxed value returned by {@code getObjectX}
 * @throws SQLException if {@code getObjectX} fails
 */
private boolean getBooleanX(int columnIndex) throws SQLException {
    final Boolean boxed = getObjectX(columnIndex, Boolean.class);
    return boxed;
}
/**
 * Reads the column at {@code columnIndex} as a {@code byte}.
 *
 * Runs {@code checkCursorOperationPossible()} first, then delegates to
 * {@code getByteX}; entry and exit are traced at debug level.
 *
 * @param columnIndex column index forwarded to {@code getByteX}
 * @return the value produced by {@code getByteX}
 * @throws SQLException if the pre-check or the read fails
 */
@Override
public byte getByte(int columnIndex) throws SQLException {
    log.debug(() -> logEntry("getByte (%d)", columnIndex));
    checkCursorOperationPossible();
    final byte result = getByteX(columnIndex);
    log.debug(() -> logExit("getByte", result));
    return result;
}
/**
 * Fetches the column through {@code getObjectX} with {@code Byte.class}
 * as the requested type and unboxes the result.
 *
 * @param columnIndex column index forwarded to {@code getObjectX}
 * @return the unboxed value returned by {@code getObjectX}
 * @throws SQLException if {@code getObjectX} fails
 */
private byte getByteX(int columnIndex) throws SQLException {
    final Byte boxed = getObjectX(columnIndex, Byte.class);
    return boxed;
}
/**
 * Reads the column at {@code columnIndex} as a {@code short}.
 *
 * Runs {@code checkCursorOperationPossible()} first, then delegates to
 * {@code getShortX}; entry and exit are traced at debug level.
 *
 * @param columnIndex column index forwarded to {@code getShortX}
 * @return the value produced by {@code getShortX}
 * @throws SQLException if the pre-check or the read fails
 */
@Override
public short getShort(int columnIndex) throws SQLException {
    log.debug(() -> logEntry("getShort (%d)", columnIndex));
    checkCursorOperationPossible();
    final short result = getShortX(columnIndex);
    log.debug(() -> logExit("getShort", result));
    return result;
}
/**
 * Fetches the column through {@code getObjectX} with {@code Short.class}
 * as the requested type and unboxes the result.
 *
 * @param columnIndex column index forwarded to {@code getObjectX}
 * @return the unboxed value returned by {@code getObjectX}
 * @throws SQLException if {@code getObjectX} fails
 */
private short getShortX(int columnIndex) throws SQLException {
    final Short boxed = getObjectX(columnIndex, Short.class);
    return boxed;
}
/**
 * Reads the column at {@code columnIndex} as an {@code int}.
 *
 * Runs {@code checkCursorOperationPossible()} first, then delegates to
 * {@code getIntX}; entry and exit are traced at debug level.
 *
 * @param columnIndex column index forwarded to {@code getIntX}
 * @return the value produced by {@code getIntX}
 * @throws SQLException if the pre-check or the read fails
 */
@Override
public int getInt(int columnIndex) throws SQLException {
    log.debug(() -> logEntry("getInt (%d)", columnIndex));
    checkCursorOperationPossible();
    final int result = getIntX(columnIndex);
    log.debug(() -> logExit("getInt", result));
    return result;
}
/**
 * Fetches the column through {@code getObjectX} with {@code Integer.class}
 * as the requested type and unboxes the result.
 *
 * @param columnIndex column index forwarded to {@code getObjectX}
 * @return the unboxed value returned by {@code getObjectX}
 * @throws SQLException if {@code getObjectX} fails
 */
private int getIntX(int columnIndex) throws SQLException {
    final Integer boxed = getObjectX(columnIndex, Integer.class);
    return boxed;
}
/**
 * Reads the column at {@code columnIndex} as a {@code long}.
 *
 * Runs {@code checkCursorOperationPossible()} first, then delegates to
 * {@code getLongX}; entry and exit are traced at debug level.
 *
 * @param columnIndex column index forwarded to {@code getLongX}
 * @return the value produced by {@code getLongX}
 * @throws SQLException if the pre-check or the read fails
 */
@Override
public long getLong(int columnIndex) throws SQLException {
    log.debug(() -> logEntry("getLong (%d)", columnIndex));
    checkCursorOperationPossible();
    final long result = getLongX(columnIndex);
    log.debug(() -> logExit("getLong", result));
    return result;
}
/**
 * Fetches the column through {@code getObjectX} with {@code Long.class}
 * as the requested type and unboxes the result.
 *
 * Like the other typed helpers ({@code getIntX}, {@code getFloatX}, ...),
 * this method performs no cursor-state check of its own: the public
 * {@code getLong} overloads call {@code checkCursorOperationPossible()}
 * before delegating here, so repeating the check was redundant.
 *
 * @param columnIndex column index forwarded to {@code getObjectX}
 * @return the unboxed value returned by {@code getObjectX}
 * @throws SQLException if {@code getObjectX} fails
 */
private long getLongX(int columnIndex) throws SQLException {
    return getObjectX(columnIndex, Long.class);
}
/**
 * Reads the column at {@code columnIndex} as a {@code float}.
 *
 * Runs {@code checkCursorOperationPossible()} first, then delegates to
 * {@code getFloatX}; entry and exit are traced at debug level.
 *
 * @param columnIndex column index forwarded to {@code getFloatX}
 * @return the value produced by {@code getFloatX}
 * @throws SQLException if the pre-check or the read fails
 */
@Override
public float getFloat(int columnIndex) throws SQLException {
    log.debug(() -> logEntry("getFloat (%d)", columnIndex));
    checkCursorOperationPossible();
    final float result = getFloatX(columnIndex);
    log.debug(() -> logExit("getFloat", result));
    return result;
}
/**
 * Fetches the column through {@code getObjectX} with {@code Float.class}
 * as the requested type and unboxes the result.
 *
 * @param columnIndex column index forwarded to {@code getObjectX}
 * @return the unboxed value returned by {@code getObjectX}
 * @throws SQLException if {@code getObjectX} fails
 */
private float getFloatX(int columnIndex) throws SQLException {
    final Float boxed = getObjectX(columnIndex, Float.class);
    return boxed;
}
/**
 * Reads the column at {@code columnIndex} as a {@code double}.
 *
 * Runs {@code checkCursorOperationPossible()} first, then delegates to
 * {@code getDoubleX}; entry and exit are traced at debug level.
 *
 * @param columnIndex column index forwarded to {@code getDoubleX}
 * @return the value produced by {@code getDoubleX}
 * @throws SQLException if the pre-check or the read fails
 */
@Override
public double getDouble(int columnIndex) throws SQLException {
    log.debug(() -> logEntry("getDouble (%d)", columnIndex));
    checkCursorOperationPossible();
    final double result = getDoubleX(columnIndex);
    log.debug(() -> logExit("getDouble", result));
    return result;
}
/**
 * Fetches the column through {@code getObjectX} with {@code Double.class}
 * as the requested type and unboxes the result.
 *
 * @param columnIndex column index forwarded to {@code getObjectX}
 * @return the unboxed value returned by {@code getObjectX}
 * @throws SQLException if {@code getObjectX} fails
 */
private double getDoubleX(int columnIndex) throws SQLException {
    final Double boxed = getObjectX(columnIndex, Double.class);
    return boxed;
}
/**
 * Entry point for reading a column as a {@code BigDecimal} with a scale.
 *
 * Runs {@code checkCursorOperationPossible()} first, then delegates to
 * {@code getBigDecimalX}, which currently always throws
 * {@code SQLFeatureNotSupportedException}.
 *
 * @param columnIndex column index forwarded to {@code getBigDecimalX}
 * @param scale scale forwarded to {@code getBigDecimalX}
 * @return the value produced by {@code getBigDecimalX}
 * @throws SQLException if the pre-check fails or the feature is unsupported
 */
@Override
public BigDecimal getBigDecimal(int columnIndex, int scale) throws SQLException {
    log.debug(() -> logEntry("getBigDecimal (%d, %d)", columnIndex, scale));
    checkCursorOperationPossible();
    final BigDecimal result = getBigDecimalX(columnIndex, scale);
    log.debug(() -> logExit("getBigDecimal", result));
    return result;
}
/**
 * Placeholder for scaled {@code BigDecimal} retrieval: after
 * {@code checkOpen()} passes it always throws.
 *
 * @param columnIndex unused
 * @param scale unused
 * @return never returns normally
 * @throws SQLException from {@code checkOpen()}, otherwise always a
 *                      {@code SQLFeatureNotSupportedException}
 */
private BigDecimal getBigDecimalX(int columnIndex, int scale) throws SQLException {
    checkOpen();
    // TODO - add support?
    final String message = "BigDecimal is not supported";
    throw new SQLFeatureNotSupportedException(message);
}
/**
 * Reads the column at {@code columnIndex} as a byte array.
 *
 * Runs {@code checkCursorOperationPossible()} first, then delegates to
 * {@code getBytesX}. The exit trace logs the array and its length
 * (0 when the array is {@code null}).
 *
 * @param columnIndex column index forwarded to {@code getBytesX}
 * @return the value produced by {@code getBytesX}
 * @throws SQLException if the pre-check or the read fails
 */
@Override
public byte[] getBytes(int columnIndex) throws SQLException {
    log.debug(() -> logEntry("getBytes (%d)", columnIndex));
    checkCursorOperationPossible();
    final byte[] result = getBytesX(columnIndex);
    log.debug(() -> logExit("getBytes",
            String.format("%s, length(%s)", result, result == null ? 0 : result.length)));
    return result;
}
/**
 * Fetches the column through {@code getObjectX} with {@code byte[].class}
 * as the requested type.
 *
 * @param columnIndex column index forwarded to {@code getObjectX}
 * @return the value returned by {@code getObjectX}
 * @throws SQLException if {@code getObjectX} fails
 */
private byte[] getBytesX(int columnIndex) throws SQLException {
    // TODO - add ByteArrayType support
    final byte[] bytes = getObjectX(columnIndex, byte[].class);
    return bytes;
}
/**
 * Reads the column at {@code columnIndex} as a {@code Date}, without a calendar.
 *
 * Runs {@code checkCursorOperationPossible()} first, then delegates to
 * {@code getDateX} with a {@code null} calendar.
 *
 * @param columnIndex column index forwarded to {@code getDateX}
 * @return the value produced by {@code getDateX}
 * @throws SQLException if the pre-check or the read fails
 */
@Override
public Date getDate(int columnIndex) throws SQLException {
    log.debug(() -> logEntry("getDate (%d)", columnIndex));
    checkCursorOperationPossible();
    final Date result = getDateX(columnIndex, null);
    log.debug(() -> logExit("getDate", result));
    return result;
}
/**
 * Fetches the column through {@code getObjectX} with {@code Date.class} as
 * the requested type, passing an optional calendar as a conversion parameter.
 *
 * NOTE(review): {@code conversionParams} is declared as a raw {@code Map}
 * here; its type arguments look truncated - confirm against the upstream source.
 *
 * @param columnIndex column index forwarded to {@code getObjectX}
 * @param calendar calendar to pass along, or {@code null} for none
 * @return the value returned by {@code getObjectX}
 * @throws SQLException if {@code getObjectX} fails
 */
private Date getDateX(int columnIndex, Calendar calendar) throws SQLException {
// Stays null when no calendar is given; otherwise holds it under the key "calendar".
Map conversionParams = null;
if (calendar != null) {
conversionParams = new HashMap<>();
conversionParams.put("calendar", calendar);
}
return getObjectX(columnIndex, Date.class, conversionParams);
}
/**
 * Reads the column at {@code columnIndex} as a {@code Time}, without a calendar.
 *
 * Runs {@code checkCursorOperationPossible()} first, then delegates to
 * {@code getTimeX} with a {@code null} calendar.
 *
 * @param columnIndex column index forwarded to {@code getTimeX}
 * @return the value produced by {@code getTimeX}
 * @throws SQLException if the pre-check or the read fails
 */
@Override
public Time getTime(int columnIndex) throws SQLException {
    log.debug(() -> logEntry("getTime (%d)", columnIndex));
    checkCursorOperationPossible();
    final Time result = getTimeX(columnIndex, null);
    log.debug(() -> logExit("getTime", result));
    return result;
}
/**
 * Fetches the column through {@code getObjectX} with {@code Time.class} as
 * the requested type, passing an optional calendar as a conversion parameter.
 *
 * NOTE(review): {@code conversionParams} is declared as a raw {@code Map}
 * here; its type arguments look truncated - confirm against the upstream source.
 *
 * @param columnIndex column index forwarded to {@code getObjectX}
 * @param calendar calendar to pass along, or {@code null} for none
 * @return the value returned by {@code getObjectX}
 * @throws SQLException if {@code getObjectX} fails
 */
private Time getTimeX(int columnIndex, Calendar calendar) throws SQLException {
// Stays null when no calendar is given; otherwise holds it under the key "calendar".
Map conversionParams = null;
if (calendar != null) {
conversionParams = new HashMap<>();
conversionParams.put("calendar", calendar);
}
return getObjectX(columnIndex, Time.class, conversionParams);
}
/**
 * Reads the column at {@code columnIndex} as a {@code Timestamp}, without a calendar.
 *
 * Runs {@code checkCursorOperationPossible()} first, then delegates to
 * {@code getTimestampX} with a {@code null} calendar.
 *
 * @param columnIndex column index forwarded to {@code getTimestampX}
 * @return the value produced by {@code getTimestampX}
 * @throws SQLException if the pre-check or the read fails
 */
@Override
public Timestamp getTimestamp(int columnIndex) throws SQLException {
    log.debug(() -> logEntry("getTimestamp (%d)", columnIndex));
    checkCursorOperationPossible();
    final Timestamp result = getTimestampX(columnIndex, null);
    log.debug(() -> logExit("getTimestamp", result));
    return result;
}
/**
 * Fetches the column through {@code getObjectX} with {@code Timestamp.class}
 * as the requested type, passing an optional calendar as a conversion parameter.
 *
 * NOTE(review): {@code conversionParams} is declared as a raw {@code Map}
 * here; its type arguments look truncated - confirm against the upstream source.
 *
 * @param columnIndex column index forwarded to {@code getObjectX}
 * @param calendar calendar to pass along, or {@code null} for none
 * @return the value returned by {@code getObjectX}
 * @throws SQLException if {@code getObjectX} fails
 */
private Timestamp getTimestampX(int columnIndex, Calendar calendar) throws SQLException {
// Stays null when no calendar is given; otherwise holds it under the key "calendar".
Map conversionParams = null;
if (calendar != null) {
conversionParams = new HashMap<>();
conversionParams.put("calendar", calendar);
}
return getObjectX(columnIndex, Timestamp.class, conversionParams);
}
/**
 * Unsupported: stream access is not implemented by this result set.
 *
 * @param columnIndex unused
 * @return never returns normally
 * @throws SQLException always, as a {@code SQLFeatureNotSupportedException}
 */
@Override
public InputStream getAsciiStream(int columnIndex) throws SQLException {
    final String message = "Streams are not supported";
    throw new SQLFeatureNotSupportedException(message);
}
/**
 * Unsupported: stream access is not implemented by this result set.
 *
 * @param columnIndex unused
 * @return never returns normally
 * @throws SQLException always, as a {@code SQLFeatureNotSupportedException}
 */
@Override
public InputStream getUnicodeStream(int columnIndex) throws SQLException {
    final String message = "Streams are not supported";
    throw new SQLFeatureNotSupportedException(message);
}
/**
 * Unsupported: stream access is not implemented by this result set.
 *
 * @param columnIndex unused
 * @return never returns normally
 * @throws SQLException always, as a {@code SQLFeatureNotSupportedException}
 */
@Override
public InputStream getBinaryStream(int columnIndex) throws SQLException {
    final String message = "Streams are not supported";
    throw new SQLFeatureNotSupportedException(message);
}
/**
 * Reads the column named {@code columnLabel} as a {@code String}.
 *
 * Runs {@code checkCursorOperationPossible()}, resolves the label through
 * {@code getColumnIndex}, then delegates to {@code getStringX}.
 *
 * @param columnLabel label resolved to an index via {@code getColumnIndex}
 * @return the value produced by {@code getStringX}
 * @throws SQLException if the pre-check, the label lookup or the read fails
 */
@Override
public String getString(String columnLabel) throws SQLException {
    log.debug(() -> logEntry("getString (%s)", columnLabel));
    checkCursorOperationPossible();
    final int columnIndex = getColumnIndex(columnLabel);
    final String result = getStringX(columnIndex);
    log.debug(() -> logExit("getString", result));
    return result;
}
/**
 * Reads the column named {@code columnLabel} as a {@code boolean}.
 *
 * Runs {@code checkCursorOperationPossible()}, resolves the label through
 * {@code getColumnIndex}, then delegates to {@code getBooleanX}.
 *
 * @param columnLabel label resolved to an index via {@code getColumnIndex}
 * @return the value produced by {@code getBooleanX}
 * @throws SQLException if the pre-check, the label lookup or the read fails
 */
@Override
public boolean getBoolean(String columnLabel) throws SQLException {
    log.debug(() -> logEntry("getBoolean (%s)", columnLabel));
    checkCursorOperationPossible();
    final int columnIndex = getColumnIndex(columnLabel);
    final boolean result = getBooleanX(columnIndex);
    log.debug(() -> logExit("getBoolean", result));
    return result;
}
/**
 * Reads the column named {@code columnLabel} as a {@code byte}.
 *
 * Runs {@code checkCursorOperationPossible()}, resolves the label through
 * {@code getColumnIndex}, then delegates to {@code getByteX}.
 *
 * @param columnLabel label resolved to an index via {@code getColumnIndex}
 * @return the value produced by {@code getByteX}
 * @throws SQLException if the pre-check, the label lookup or the read fails
 */
@Override
public byte getByte(String columnLabel) throws SQLException {
    log.debug(() -> logEntry("getByte (%s)", columnLabel));
    checkCursorOperationPossible();
    final int columnIndex = getColumnIndex(columnLabel);
    final byte result = getByteX(columnIndex);
    log.debug(() -> logExit("getByte", result));
    return result;
}
/**
 * Reads the column named {@code columnLabel} as a {@code short}.
 *
 * Runs {@code checkCursorOperationPossible()}, resolves the label through
 * {@code getColumnIndex}, then delegates to {@code getShortX}.
 *
 * @param columnLabel label resolved to an index via {@code getColumnIndex}
 * @return the value produced by {@code getShortX}
 * @throws SQLException if the pre-check, the label lookup or the read fails
 */
@Override
public short getShort(String columnLabel) throws SQLException {
    log.debug(() -> logEntry("getShort (%s)", columnLabel));
    checkCursorOperationPossible();
    final int columnIndex = getColumnIndex(columnLabel);
    final short result = getShortX(columnIndex);
    log.debug(() -> logExit("getShort", result));
    return result;
}
/**
 * Reads the column named {@code columnLabel} as an {@code int}.
 *
 * Runs {@code checkCursorOperationPossible()}, resolves the label through
 * {@code getColumnIndex}, then delegates to {@code getIntX}.
 *
 * @param columnLabel label resolved to an index via {@code getColumnIndex}
 * @return the value produced by {@code getIntX}
 * @throws SQLException if the pre-check, the label lookup or the read fails
 */
@Override
public int getInt(String columnLabel) throws SQLException {
    log.debug(() -> logEntry("getInt (%s)", columnLabel));
    checkCursorOperationPossible();
    final int columnIndex = getColumnIndex(columnLabel);
    final int result = getIntX(columnIndex);
    log.debug(() -> logExit("getInt", result));
    return result;
}
/**
 * Reads the column named {@code columnLabel} as a {@code long}.
 *
 * Runs {@code checkCursorOperationPossible()}, resolves the label through
 * {@code getColumnIndex}, then delegates to {@code getLongX}.
 *
 * @param columnLabel label resolved to an index via {@code getColumnIndex}
 * @return the value produced by {@code getLongX}
 * @throws SQLException if the pre-check, the label lookup or the read fails
 */
@Override
public long getLong(String columnLabel) throws SQLException {
    log.debug(() -> logEntry("getLong (%s)", columnLabel));
    checkCursorOperationPossible();
    final int columnIndex = getColumnIndex(columnLabel);
    final long result = getLongX(columnIndex);
    log.debug(() -> logExit("getLong", result));
    return result;
}
/**
 * Reads the column named {@code columnLabel} as a {@code float}.
 *
 * Runs {@code checkCursorOperationPossible()}, resolves the label through
 * {@code getColumnIndex}, then delegates to {@code getFloatX}.
 *
 * @param columnLabel label resolved to an index via {@code getColumnIndex}
 * @return the value produced by {@code getFloatX}
 * @throws SQLException if the pre-check, the label lookup or the read fails
 */
@Override
public float getFloat(String columnLabel) throws SQLException {
    log.debug(() -> logEntry("getFloat (%s)", columnLabel));
    checkCursorOperationPossible();
    final int columnIndex = getColumnIndex(columnLabel);
    final float result = getFloatX(columnIndex);
    log.debug(() -> logExit("getFloat", result));
    return result;
}
/**
 * Reads the column named {@code columnLabel} as a {@code double}.
 *
 * Runs {@code checkCursorOperationPossible()}, resolves the label through
 * {@code getColumnIndex}, then delegates to {@code getDoubleX}.
 *
 * @param columnLabel label resolved to an index via {@code getColumnIndex}
 * @return the value produced by {@code getDoubleX}
 * @throws SQLException if the pre-check, the label lookup or the read fails
 */
@Override
public double getDouble(String columnLabel) throws SQLException {
    log.debug(() -> logEntry("getDouble (%s)", columnLabel));
    checkCursorOperationPossible();
    final int columnIndex = getColumnIndex(columnLabel);
    final double result = getDoubleX(columnIndex);
    log.debug(() -> logExit("getDouble", result));
    return result;
}
/**
 * Entry point for reading a labelled column as a {@code BigDecimal} with a scale.
 *
 * Runs {@code checkCursorOperationPossible()}, resolves the label through
 * {@code getColumnIndex}, then delegates to {@code getBigDecimalX}, which
 * currently always throws {@code SQLFeatureNotSupportedException}.
 *
 * @param columnLabel label resolved to an index via {@code getColumnIndex}
 * @param scale scale forwarded to {@code getBigDecimalX}
 * @return the value produced by {@code getBigDecimalX}
 * @throws SQLException if the pre-check or lookup fails, or the feature is unsupported
 */
@Override
public BigDecimal getBigDecimal(String columnLabel, int scale) throws SQLException {
    log.debug(() -> logEntry("getBigDecimal (%s, %d)", columnLabel, scale));
    checkCursorOperationPossible();
    final int columnIndex = getColumnIndex(columnLabel);
    final BigDecimal result = getBigDecimalX(columnIndex, scale);
    log.debug(() -> logExit("getBigDecimal", result));
    return result;
}
/**
 * Reads the column named {@code columnLabel} as a byte array.
 *
 * Runs {@code checkCursorOperationPossible()}, resolves the label through
 * {@code getColumnIndex}, then delegates to {@code getBytesX}. The exit
 * trace logs the array and its length (0 when the array is {@code null}).
 *
 * @param columnLabel label resolved to an index via {@code getColumnIndex}
 * @return the value produced by {@code getBytesX}
 * @throws SQLException if the pre-check, the label lookup or the read fails
 */
@Override
public byte[] getBytes(String columnLabel) throws SQLException {
    log.debug(() -> logEntry("getBytes (%s)", columnLabel));
    checkCursorOperationPossible();
    final int columnIndex = getColumnIndex(columnLabel);
    final byte[] result = getBytesX(columnIndex);
    log.debug(() -> logExit("getBytes",
            String.format("%s, length(%s)", result, result == null ? 0 : result.length)));
    return result;
}
/**
 * Reads the column named {@code columnLabel} as a {@code Date}, without a calendar.
 *
 * Runs {@code checkCursorOperationPossible()}, resolves the label through
 * {@code getColumnIndex}, then delegates to {@code getDateX} with a
 * {@code null} calendar.
 *
 * @param columnLabel label resolved to an index via {@code getColumnIndex}
 * @return the value produced by {@code getDateX}
 * @throws SQLException if the pre-check, the label lookup or the read fails
 */
@Override
public Date getDate(String columnLabel) throws SQLException {
    log.debug(() -> logEntry("getDate (%s)", columnLabel));
    checkCursorOperationPossible();
    final int columnIndex = getColumnIndex(columnLabel);
    final Date result = getDateX(columnIndex, null);
    log.debug(() -> logExit("getDate", result));
    return result;
}
/**
 * Reads the column named {@code columnLabel} as a {@code Time}, without a calendar.
 *
 * Runs {@code checkCursorOperationPossible()}, resolves the label through
 * {@code getColumnIndex}, then delegates to {@code getTimeX} with a
 * {@code null} calendar.
 *
 * @param columnLabel label resolved to an index via {@code getColumnIndex}
 * @return the value produced by {@code getTimeX}
 * @throws SQLException if the pre-check, the label lookup or the read fails
 */
@Override
public Time getTime(String columnLabel) throws SQLException {
    log.debug(() -> logEntry("getTime (%s)", columnLabel));
    checkCursorOperationPossible();
    final int columnIndex = getColumnIndex(columnLabel);
    final Time result = getTimeX(columnIndex, null);
    log.debug(() -> logExit("getTime", result));
    return result;
}
/**
 * Reads the column named {@code columnLabel} as a {@code Timestamp}, without a calendar.
 *
 * Runs {@code checkCursorOperationPossible()}, resolves the label through
 * {@code getColumnIndex}, then delegates to {@code getTimestampX} with a
 * {@code null} calendar.
 *
 * @param columnLabel label resolved to an index via {@code getColumnIndex}
 * @return the value produced by {@code getTimestampX}
 * @throws SQLException if the pre-check, the label lookup or the read fails
 */
@Override
public Timestamp getTimestamp(String columnLabel) throws SQLException {
    log.debug(() -> logEntry("getTimestamp (%s)", columnLabel));
    checkCursorOperationPossible();
    final int columnIndex = getColumnIndex(columnLabel);
    final Timestamp result = getTimestampX(columnIndex, null);
    log.debug(() -> logExit("getTimestamp", result));
    return result;
}
/**
 * Unsupported: stream access is not implemented by this result set.
 *
 * @param columnLabel unused
 * @return never returns normally
 * @throws SQLException always, as a {@code SQLFeatureNotSupportedException}
 */
@Override
public InputStream getAsciiStream(String columnLabel) throws SQLException {
    final String message = "Streams are not supported";
    throw new SQLFeatureNotSupportedException(message);
}
/**
 * Unsupported: stream access is not implemented by this result set.
 *
 * @param columnLabel unused
 * @return never returns normally
 * @throws SQLException always, as a {@code SQLFeatureNotSupportedException}
 */
@Override
public InputStream getUnicodeStream(String columnLabel) throws SQLException {
    final String message = "Streams are not supported";
    throw new SQLFeatureNotSupportedException(message);
}
/**
 * Unsupported: stream access is not implemented by this result set.
 *
 * @param columnLabel unused
 * @return never returns normally
 * @throws SQLException always, as a {@code SQLFeatureNotSupportedException}
 */
@Override
public InputStream getBinaryStream(String columnLabel) throws SQLException {
    final String message = "Streams are not supported";
    throw new SQLFeatureNotSupportedException(message);
}
/**
 * Reports the warnings for this result set; this implementation never has any.
 *
 * @return always {@code null} once {@code checkOpen()} has passed
 * @throws SQLException if {@code checkOpen()} fails
 */
@Override
public SQLWarning getWarnings() throws SQLException {
    checkOpen();
    final SQLWarning none = null;
    return none;
}
/**
 * Clears warnings; beyond the {@code checkOpen()} guard this does nothing.
 *
 * @throws SQLException if {@code checkOpen()} fails
 */
@Override
public void clearWarnings() throws SQLException {
    // Only the open-state guard runs; there is no warning state to reset.
    checkOpen();
}
/**
 * Unsupported: cursor names are not implemented by this result set.
 *
 * @return never returns normally
 * @throws SQLException always, as a {@code SQLFeatureNotSupportedException}
 */
@Override
public String getCursorName() throws SQLException {
    final String message = "Cursor name is not supported";
    throw new SQLFeatureNotSupportedException(message);
}
/**
 * Builds a metadata view over the schema of the current cursor.
 *
 * @return a new {@code ResultSetMetaDataImpl} created from this result set
 *         and {@code cursor.getSchema()}
 * @throws SQLException if {@code checkOpen()} fails
 */
@Override
public ResultSetMetaData getMetaData() throws SQLException {
    checkOpen();
    final Schema currentSchema = cursor.getSchema();
    return new ResultSetMetaDataImpl(this, currentSchema);
}
/**
 * Reads the column at {@code columnIndex} as an {@code Object}.
 *
 * Runs {@code checkCursorOperationPossible()} first, then delegates to
 * {@code getObjectX}. The exit trace logs the runtime class name followed
 * by the value, or the text "null" when there is no value.
 *
 * @param columnIndex column index forwarded to {@code getObjectX}
 * @return the value produced by {@code getObjectX}
 * @throws SQLException if the pre-check or the read fails
 */
@Override
public Object getObject(int columnIndex) throws SQLException {
    log.debug(() -> logEntry("getObject (%d)", columnIndex));
    checkCursorOperationPossible();
    final Object result = getObjectX(columnIndex);
    log.debug(() -> logExit("getObject",
            result == null ? "null" : "(" + result.getClass().getName() + ") " + result));
    return result;
}
/**
 * Reads the column named {@code columnLabel} as an {@code Object}.
 *
 * Runs {@code checkCursorOperationPossible()}, resolves the label through
 * {@code getColumnIndex}, then delegates to {@code getObjectX}. The exit
 * trace logs the runtime class name followed by the value, or the text
 * "null" when there is no value.
 *
 * @param columnLabel label resolved to an index via {@code getColumnIndex}
 * @return the value produced by {@code getObjectX}
 * @throws SQLException if the pre-check, the label lookup or the read fails
 */
@Override
public Object getObject(String columnLabel) throws SQLException {
    log.debug(() -> logEntry("getObject (%s)", columnLabel));
    checkCursorOperationPossible();
    final int columnIndex = getColumnIndex(columnLabel);
    final Object result = getObjectX(columnIndex);
    log.debug(() -> logExit("getObject",
            result == null ? "null" : "(" + result.getClass().getName() + ") " + result));
    return result;
}
private Object getObjectX(int columnIndex) throws SQLException {
return getObjectX(columnIndex, (Class